struct lu_buf cl_buf;
/** size of layout in lov_mds_md format. */
size_t cl_size;
+ /** size of DoM component if exists or zero otherwise */
+ u32 cl_dom_comp_size;
/** Layout generation. */
u32 cl_layout_gen;
/** whether layout is a composite one */
return ex1->start <= ex2->start && ex1->end >= ex2->end;
}
+int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop);
+
#endif
/** @} LDLM */
#define OBD_MD_FLUID (0x00000200ULL) /* user ID */
#define OBD_MD_FLGID (0x00000400ULL) /* group ID */
#define OBD_MD_FLFLAGS (0x00000800ULL) /* flags word */
+#define OBD_MD_DOM_SIZE (0X00001000ULL) /* Data-on-MDT component size */
#define OBD_MD_FLNLINK (0x00002000ULL) /* link count */
#define OBD_MD_FLGENER (0x00004000ULL) /* generation number */
/*#define OBD_MD_FLINLINE (0x00008000ULL) inline data. used until 1.6.5 */
__u32 mbo_uid_h; /* high 32-bits of uid, for FUID */
__u32 mbo_gid_h; /* high 32-bits of gid, for FUID */
__u32 mbo_projid;
- __u64 mbo_padding_6; /* also fix lustre_swab_mdt_body */
- __u64 mbo_padding_7;
- __u64 mbo_padding_8;
+ __u64 mbo_dom_size; /* size of DOM component */
+ __u64 mbo_dom_blocks; /* blocks consumed by DOM component */
+ __u64 mbo_padding_8; /* also fix lustre_swab_mdt_body */
__u64 mbo_padding_9;
__u64 mbo_padding_10;
}; /* 216 */
wpolicy->l_inodebits.bits = lpolicy->l_inodebits.bits;
wpolicy->l_inodebits.try_bits = lpolicy->l_inodebits.try_bits;
}
+
+/**
+ * Remove the inodebits in \a to_drop from the policy of \a lock.
+ *
+ * check_res_locked() is called on the lock's resource first, so the caller
+ * is expected to hold the resource lock. If none of the requested bits is
+ * set on the lock, a warning is logged and the lock is left untouched.
+ * Otherwise the lock is unlinked from its resource list, the bits are
+ * cleared and the lock is re-added through ldlm_grant_lock_with_skiplist()
+ * so that its place in the list matches the reduced bit set.
+ *
+ * \param[in] lock	lock whose inodebits are reduced
+ * \param[in] to_drop	mask of inodebits to clear
+ *
+ * \retval 0	always
+ */
+int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop)
+{
+	ENTRY;
+
+	check_res_locked(lock->l_resource);
+
+	/* Just return if there are no conflicting bits */
+	if ((lock->l_policy_data.l_inodebits.bits & to_drop) == 0) {
+		/* no trailing newline, like the other LDLM_* messages */
+		LDLM_WARN(lock, "try to drop unset bits %#llx/%#llx",
+			  lock->l_policy_data.l_inodebits.bits, to_drop);
+		/* nothing to do */
+		RETURN(0);
+	}
+
+	/* remove lock from a skiplist and put it in the new place
+	 * according to the new inodebits */
+	ldlm_resource_unlink_lock(lock);
+	lock->l_policy_data.l_inodebits.bits &= ~to_drop;
+	ldlm_grant_lock_with_skiplist(lock);
+	RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_inodebits_drop);
LDLM_WORK_GL_AST
} ldlm_desc_ast_t;
+void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock);
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list);
int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
enum req_location loc, void *data, int size);
* Add a lock to granted list on a resource maintaining skiplist
* correctness.
*/
-static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
+void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
- struct sl_insert_point prev;
- ENTRY;
+ struct sl_insert_point prev;
- LASSERT(lock->l_req_mode == lock->l_granted_mode);
+ LASSERT(lock->l_req_mode == lock->l_granted_mode);
- search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
- ldlm_granted_list_add_lock(lock, &prev);
- EXIT;
+ search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
+ ldlm_granted_list_add_lock(lock, &prev);
}
/**
return lu_fid_eq(&ll_i2info(inode)->lli_fid, opaque);
}
+/**
+ * Sync and drop the cached pages covering the Data-on-MDT component of
+ * \a inode. Called from ll_md_blocking_ast() when the lock being handled
+ * carries MDS_INODELOCK_DOM.
+ *
+ * The affected range is [0, cl_dom_comp_size - 1], with the component size
+ * taken from cl_object_layout_get(). The range is synced with
+ * CL_FSYNC_DISCARD when ldlm_is_discard_data() is true for \a lock and with
+ * CL_FSYNC_LOCAL otherwise; the pages are then truncated from the mapping.
+ *
+ * \param[in] inode	inode whose DoM data is handled
+ * \param[in] lock	DLM lock being cancelled
+ *
+ * \retval 0		inode has no cl_object attached
+ * \retval -ENODATA	layout could not be fetched
+ * \retval -EINVAL	layout reports no DoM component
+ * \retval other	PTR_ERR() of a failed cl_env_get(), or the result of
+ *			cl_sync_file_range()
+ */
+int ll_dom_lock_cancel(struct inode *inode, struct ldlm_lock *lock)
+{
+	struct ll_inode_info *lli = ll_i2info(inode);
+	struct cl_layout layout = { .cl_layout_gen = 0, };
+	enum cl_fsync_mode sync_mode;
+	struct lu_env *env;
+	loff_t last;
+	__u16 refcheck;
+	int rc;
+
+	ENTRY;
+
+	if (!lli->lli_clob)
+		RETURN(0);
+
+	env = cl_env_get(&refcheck);
+	if (IS_ERR(env))
+		RETURN(PTR_ERR(env));
+
+	rc = cl_object_layout_get(env, lli->lli_clob, &layout);
+	if (rc) {
+		CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
+		       PFID(ll_inode2fid(inode)));
+		rc = -ENODATA;
+	} else if (layout.cl_dom_comp_size == 0) {
+		CDEBUG(D_INODE, "DOM lock without DOM layout for "DFID"\n",
+		       PFID(ll_inode2fid(inode)));
+		rc = -EINVAL;
+	} else {
+		/* last byte offset covered by the DoM component */
+		last = layout.cl_dom_comp_size - 1;
+
+		if (ldlm_is_discard_data(lock))
+			sync_mode = CL_FSYNC_DISCARD;
+		else
+			sync_mode = CL_FSYNC_LOCAL;
+
+		rc = cl_sync_file_range(inode, 0, last, sync_mode, 1);
+		/* pages are dropped regardless of the sync result */
+		truncate_inode_pages_range(inode->i_mapping, 0, last);
+	}
+	cl_env_put(env, &refcheck);
+	RETURN(rc);
+}
+
int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
void *data, int flag)
{
struct inode *inode = ll_inode_from_resource_lock(lock);
__u64 bits = lock->l_policy_data.l_inodebits.bits;
- /* Inode is set to lock->l_resource->lr_lvb_inode
- * for mdc - bug 24555 */
- LASSERT(lock->l_ast_data == NULL);
-
if (inode == NULL)
break;
}
if (bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
- MDS_INODELOCK_LAYOUT | MDS_INODELOCK_PERM))
+ MDS_INODELOCK_LAYOUT | MDS_INODELOCK_PERM |
+ MDS_INODELOCK_DOM))
ll_have_md_lock(inode, &bits, LCK_MINMODE);
+ if (bits & MDS_INODELOCK_DOM) {
+ rc = ll_dom_lock_cancel(inode, lock);
+ if (rc < 0)
+ CDEBUG(D_INODE, "cannot flush DoM data "
+ DFID": rc = %d\n",
+ PFID(ll_inode2fid(inode)), rc);
+ lock_res_and_lock(lock);
+ ldlm_set_kms_ignore(lock);
+ unlock_res_and_lock(lock);
+ bits &= ~MDS_INODELOCK_DOM;
+ }
+
if (bits & MDS_INODELOCK_LAYOUT) {
struct cl_object_conf conf = {
.coc_opc = OBJECT_CONF_INVALIDATE,
cl->cl_size = lov_comp_md_size(lsm);
cl->cl_layout_gen = lsm->lsm_layout_gen;
- cl->cl_is_composite = lsm_is_composite(lsm->lsm_magic);
+ if (lsm_is_composite(lsm->lsm_magic)) {
+ struct lov_stripe_md_entry *lsme = lsm->lsm_entries[0];
+ cl->cl_is_composite = true;
+
+ if (lsme_is_dom(lsme))
+ cl->cl_dom_comp_size = lsme->lsme_extent.e_end;
+ }
rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
lov_lsm_put(lsm);
einfo->ei_cbdata = osc; /* value to be put into ->l_ast_data */
}
-static int mdc_set_dom_lock_data(struct ldlm_lock *lock, void *data)
+static void mdc_lock_lvb_update(const struct lu_env *env,
+ struct osc_object *osc,
+ struct ldlm_lock *dlmlock,
+ struct ost_lvb *lvb);
+
+static int mdc_set_dom_lock_data(const struct lu_env *env,
+ struct ldlm_lock *lock, void *data)
{
+ struct osc_object *obj = data;
int set = 0;
LASSERT(lock != NULL);
+ LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
lock_res_and_lock(lock);
-
- if (lock->l_ast_data == NULL)
+ if (lock->l_ast_data == NULL) {
lock->l_ast_data = data;
+ mdc_lock_lvb_update(env, obj, lock, NULL);
+ }
+
if (lock->l_ast_data == data)
set = 1;
return set;
}
-int mdc_dom_lock_match(struct obd_export *exp, struct ldlm_res_id *res_id,
+int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp,
+ struct ldlm_res_id *res_id,
enum ldlm_type type, union ldlm_policy_data *policy,
enum ldlm_mode mode, __u64 *flags, void *data,
struct lustre_handle *lockh, int unref)
struct ldlm_lock *lock = ldlm_handle2lock(lockh);
LASSERT(lock != NULL);
- if (!mdc_set_dom_lock_data(lock, data)) {
+ if (!mdc_set_dom_lock_data(env, lock, data)) {
ldlm_lock_decref(lockh, rc);
rc = 0;
}
/* If we're trying to read, we also search for an existing PW lock. The
* VFS and page cache already protect us locally, so lots of readers/
* writers can share a single PW lock. */
- mode = mdc_dom_lock_match(osc_export(obj), resname, LDLM_IBITS, policy,
- LCK_PR | LCK_PW, &flags, obj, &lockh,
+ mode = mdc_dom_lock_match(env, osc_export(obj), resname, LDLM_IBITS,
+ policy, LCK_PR | LCK_PW, &flags, obj, &lockh,
dap_flags & OSC_DAP_FL_CANCELING);
if (mode != 0) {
lock = ldlm_handle2lock(&lockh);
dlmlock->l_ast_data = NULL;
cl_object_get(obj);
}
+ ldlm_set_kms_ignore(dlmlock);
unlock_res_and_lock(dlmlock);
/* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
*
* Called under lock and resource spin-locks.
*/
-static void mdc_lock_lvb_update(const struct lu_env *env,
- struct osc_object *osc,
- struct ldlm_lock *dlmlock,
- struct ost_lvb *lvb)
+void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
+ struct ldlm_lock *dlmlock, struct ost_lvb *lvb)
{
struct cl_object *obj = osc2cl(osc);
struct lov_oinfo *oinfo = osc->oo_oinfo;
attr->cat_kms = size;
} else {
LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu,"
- " leaving kms=%llu, end=%llu",
- lvb->lvb_size, oinfo->loi_kms,
- dlmlock->l_policy_data.l_extent.end);
+ " leaving kms=%llu",
+ lvb->lvb_size, oinfo->loi_kms);
}
}
cl_object_attr_update(env, obj, attr, valid);
lvb->lvb_mtime = body->mbo_mtime;
lvb->lvb_atime = body->mbo_atime;
lvb->lvb_ctime = body->mbo_ctime;
- lvb->lvb_blocks = body->mbo_blocks;
- lvb->lvb_size = body->mbo_size;
+ lvb->lvb_blocks = body->mbo_dom_blocks;
+ lvb->lvb_size = body->mbo_dom_size;
+
RETURN(0);
}
* when other sync requests do not get released lock from a client, the client
* is excluded from the cluster -- such scenarious make the life difficult, so
* release locks just after they are obtained. */
-int mdc_enqueue_send(struct obd_export *exp, struct ldlm_res_id *res_id,
- __u64 *flags, union ldlm_policy_data *policy,
+int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
+ struct ldlm_res_id *res_id, __u64 *flags,
+ union ldlm_policy_data *policy,
struct ost_lvb *lvb, int kms_valid,
osc_enqueue_upcall_f upcall, void *cookie,
struct ldlm_enqueue_info *einfo, int async)
ENTRY;
- if (!kms_valid)
- goto no_match;
-
mode = einfo->ei_mode;
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
RETURN(ELDLM_OK);
matched = ldlm_handle2lock(&lockh);
- if (mdc_set_dom_lock_data(matched, einfo->ei_cbdata)) {
+ if (ldlm_is_kms_ignore(matched))
+ goto no_match;
+
+ if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) {
*flags |= LDLM_FL_LVB_READY;
/* We already have a lock, and it's referenced. */
ldlm_lock_decref(&lockh, mode);
LDLM_LOCK_PUT(matched);
RETURN(ELDLM_OK);
- } else {
- ldlm_lock_decref(&lockh, mode);
- LDLM_LOCK_PUT(matched);
}
+no_match:
+ ldlm_lock_decref(&lockh, mode);
+ LDLM_LOCK_PUT(matched);
}
-no_match:
if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
RETURN(-ENOLCK);
fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname);
mdc_lock_build_policy(env, policy);
LASSERT(!oscl->ols_speculative);
- result = mdc_enqueue_send(osc_export(osc), resname, &oscl->ols_flags,
- policy, &oscl->ols_lvb,
- osc->oo_oinfo->loi_kms_valid,
+ result = mdc_enqueue_send(env, osc_export(osc), resname,
+ &oscl->ols_flags, policy,
+ &oscl->ols_lvb, osc->oo_oinfo->loi_kms_valid,
upcall, cookie, &oscl->ols_einfo, async);
if (result == 0) {
if (osc_lock_is_lockless(oscl)) {
return osc_attr_get(env, obj, attr);
}
+/**
+ * Per-lock callback passed to ldlm_resource_iterate() by mdc_object_prune().
+ *
+ * Detaches \a lock from the object \a data: when the lock points at \a data,
+ * or has no l_ast_data and is not yet flagged kms_ignore, its l_ast_data is
+ * cleared and the kms_ignore flag is set.
+ *
+ * \param[in] lock	lock being visited
+ * \param[in] data	object pointer to detach from the lock
+ *
+ * \retval LDLM_ITER_CONTINUE	always
+ */
+static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
+{
+	bool detach;
+
+	ENTRY;
+
+	CDEBUG(D_DLMTRACE, "obj: %p/%p, lock %p\n",
+	       data, lock->l_ast_data, lock);
+
+	LASSERT(lock->l_granted_mode == lock->l_req_mode);
+
+	if (lock->l_ast_data == data)
+		detach = true;
+	else
+		detach = lock->l_ast_data == NULL &&
+			 !ldlm_is_kms_ignore(lock);
+
+	if (detach) {
+		lock->l_ast_data = NULL;
+		ldlm_set_kms_ignore(lock);
+	}
+	RETURN(LDLM_ITER_CONTINUE);
+}
+
+/**
+ * coo_prune method of mdc_ops: run mdc_object_ast_clear() on every lock of
+ * the resource named after this object.
+ *
+ * \param[in] env	execution environment; supplies the scratch resource
+ *			name buffer
+ * \param[in] obj	cl_object being pruned
+ *
+ * \retval 0	always
+ */
+int mdc_object_prune(const struct lu_env *env, struct cl_object *obj)
+{
+	struct osc_object *osc = cl2osc(obj);
+	struct ldlm_res_id *resname = &osc_env_info(env)->oti_resname;
+	struct ldlm_namespace *ns;
+
+	/* DLM locks don't hold a reference of osc_object so we have to
+	 * clear it before the object is being destroyed. */
+	osc_build_res_name(osc, resname);
+
+	ns = osc_export(osc)->exp_obd->obd_namespace;
+	ldlm_resource_iterate(ns, resname, mdc_object_ast_clear, osc);
+
+	return 0;
+}
+
static const struct cl_object_operations mdc_ops = {
.coo_page_init = osc_page_init,
.coo_lock_init = mdc_lock_init,
.coo_attr_update = osc_attr_update,
.coo_glimpse = osc_object_glimpse,
.coo_req_attr_set = mdc_req_attr_set,
- .coo_prune = osc_object_prune,
+ .coo_prune = mdc_object_prune,
};
static const struct osc_object_operations mdc_object_ops = {
extern struct lu_device_type mdc_device_type;
int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
struct ldlm_lock_desc *new, void *data, int flag);
-
+int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data);
+int mdc_fill_lvb(struct ptlrpc_request *req, struct ost_lvb *lvb);
#endif
struct ldlm_request *lockreq;
struct ldlm_reply *lockrep;
struct ldlm_lock *lock;
+ struct mdt_body *body = NULL;
void *lvb_data = NULL;
__u32 lvb_len = 0;
+
ENTRY;
LASSERT(rc >= 0);
/* We know what to expect, so we do any byte flipping required here */
if (it_has_reply_body(it)) {
- struct mdt_body *body;
-
body = req_capsule_server_get(pill, &RMF_MDT_BODY);
if (body == NULL) {
CERROR ("Can't swab mdt_body\n");
* client still does this checking in case it's talking with an old
* server. - Jinshan */
lock = ldlm_handle2lock(lockh);
- if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
+ if (lock == NULL)
+ RETURN(rc);
+
+ if (ldlm_has_layout(lock) && lvb_data != NULL &&
!(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
void *lmm;
ldlm_it2str(it->it_op), lvb_len);
OBD_ALLOC_LARGE(lmm, lvb_len);
- if (lmm == NULL) {
- LDLM_LOCK_PUT(lock);
- RETURN(-ENOMEM);
- }
+ if (lmm == NULL)
+ GOTO(out_lock, rc = -ENOMEM);
+
memcpy(lmm, lvb_data, lvb_len);
/* install lvb_data */
if (lmm != NULL)
OBD_FREE_LARGE(lmm, lvb_len);
}
- if (lock != NULL)
- LDLM_LOCK_PUT(lock);
+
+ if (ldlm_has_dom(lock)) {
+ LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
+
+ body = req_capsule_server_get(pill, &RMF_MDT_BODY);
+ if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
+ LDLM_ERROR(lock, "%s: DoM lock without size.\n",
+ exp->exp_obd->obd_name);
+ GOTO(out_lock, rc = -EPROTO);
+ }
+
+ LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
+ ldlm_it2str(it->it_op), body->mbo_dom_size);
+
+ rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
+ }
+out_lock:
+ LDLM_LOCK_PUT(lock);
RETURN(rc);
}
rc = obd_get_request_slot(&obddev->u.cli);
if (rc != 0) {
mdc_put_mod_rpc_slot(req, it);
- mdc_clear_replay_flag(req, 0);
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
- }
+ mdc_clear_replay_flag(req, 0);
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+ }
+ }
- rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
+ /* With Data-on-MDT the glimpse callback is needed too.
+ * It is set here in advance but not in mdc_finish_enqueue()
+ * to avoid possible races. It is safe to have glimpse handler
+ * for non-DOM locks and costs nothing.*/
+ if (einfo->ei_cb_gl == NULL)
+ einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
+
+ rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
0, lvb_type, lockh, 0);
- if (!it) {
- /* For flock requests we immediatelly return without further
- delay and let caller deal with the rest, since rest of
- this function metadata processing makes no sense for flock
+ if (!it) {
+ /* For flock requests we immediately return without further
+ delay and let caller deal with the rest, since rest of
+ this function metadata processing makes no sense for flock
requests anyway. But in case of problem during comms with
Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
can not rely on caller and this mainly for F_UNLCKs
.ei_mode = it_to_lock_mode(it),
.ei_cb_bl = cb_blocking,
.ei_cb_cp = ldlm_completion_ast,
+ .ei_cb_gl = mdc_ldlm_glimpse_ast,
};
struct lustre_handle lockh;
int rc = 0;
RETURN(rc);
}
+ /* With Data-on-MDT the glimpse callback is needed too.
+ * It is set here in advance but not in mdc_finish_enqueue()
+ * to avoid possible races. It is safe to have glimpse handler
+ * for non-DOM locks and costs nothing.*/
+ if (minfo->mi_einfo.ei_cb_gl == NULL)
+ minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
+
rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
&flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
if (rc < 0) {
RETURN(rc);
}
+/**
+ * Pack size attributes into the reply.
+ *
+ * For a regular file with a valid LOV EA whose layout has a DoM entry, and
+ * only when \a dom_lock is true, calls mdt_dom_object_size() on the reply
+ * RMF_MDT_BODY. In every other case the reply body is left untouched.
+ *
+ * \param[in] info	thread info; info->mti_attr must have LA_MODE valid
+ *			(asserted)
+ * \param[in] fid	FID handed to mdt_dom_object_size()
+ * \param[in] dom_lock	callers pass whether MDS_INODELOCK_DOM is among the
+ *			lock bits being returned
+ *
+ * \retval 0		layout has a DoM entry (size is packed only when
+ *			\a dom_lock is true)
+ * \retval -ENODATA	not a regular file, or no LOV EA available
+ * \retval -ENOENT	layout has no DoM entry
+ */
+int mdt_pack_size2body(struct mdt_thread_info *info,
+ const struct lu_fid *fid, bool dom_lock)
+{
+ struct mdt_body *b;
+ struct md_attr *ma = &info->mti_attr;
+ int dom_stripe;
+
+ ENTRY;
+
+ LASSERT(ma->ma_attr.la_valid & LA_MODE);
+
+ if (!S_ISREG(ma->ma_attr.la_mode) ||
+ !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
+ RETURN(-ENODATA);
+
+ dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm);
+ /* no DoM stripe, no size in reply */
+ if (dom_stripe == LMM_NO_DOM)
+ RETURN(-ENOENT);
+
+ /* no DoM lock, no size in reply */
+ if (!dom_lock)
+ RETURN(0);
+
+ /* The layout has a DoM entry and dom_lock is true at this point, so
+ * return the size in the reply body.
+ * NOTE(review): neither the body pointer nor the result of
+ * mdt_dom_object_size() is checked here; 0 is returned regardless. */
+ b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+
+ mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
+ RETURN(0);
+}
+
#ifdef CONFIG_FS_POSIX_ACL
/*
* Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
/* layout lock must be granted in a best-effort way
* for IT operations */
LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
- if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
- exp_connect_layout(info->mti_exp) &&
- S_ISREG(lu_object_attr(&child->mot_obj)) &&
+ if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
!mdt_object_remote(child) && ldlm_rep != NULL) {
- /* try to grant layout lock for regular file. */
- try_bits = MDS_INODELOCK_LAYOUT;
+ if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
+ exp_connect_layout(info->mti_exp)) {
+ /* try to grant layout lock for regular file. */
+ try_bits = MDS_INODELOCK_LAYOUT;
+ }
+ /* Acquire DOM lock in advance for data-on-mdt file */
+ if (child != parent)
+ try_bits |= MDS_INODELOCK_DOM;
}
if (try_bits != 0) {
"Lock res_id: "DLDLMRES", fid: "DFID"\n",
PLDLMRES(lock->l_resource),
PFID(mdt_object_fid(child)));
+
+ if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
+ mdt_object_exists(child) && !mdt_object_remote(child) &&
+ child != parent) {
+ LDLM_LOCK_PUT(lock);
+ mdt_object_put(info->mti_env, child);
+ /* NB: call the mdt_pack_size2body always after
+ * mdt_object_put(), that is why this special
+ * exit path is used. */
+ rc = mdt_pack_size2body(info, child_fid,
+ child_bits & MDS_INODELOCK_DOM);
+ if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
+ /* DOM lock was taken in advance but this is
+ * not DoM file. Drop the lock. */
+ lock_res_and_lock(lock);
+ ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
+ unlock_res_and_lock(lock);
+ }
+
+ GOTO(out_parent, rc = 0);
+ }
}
if (lock)
LDLM_LOCK_PUT(lock);
m->mdt_skip_lfsck = 1;
}
+ /* DoM files get IO lock at open by default */
+ m->mdt_opts.mo_dom_lock = 1;
+
m->mdt_squash.rsi_uid = 0;
m->mdt_squash.rsi_gid = 0;
INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
unsigned int mo_user_xattr:1,
mo_acl:1,
mo_cos:1,
- mo_evict_tgt_nids:1;
+ mo_evict_tgt_nids:1,
+ mo_dom_lock:1;
} mdt_opts;
/* mdt state flags */
unsigned long mdt_state;
return exp_connect_flags(exp) & OBD_CONNECT_DIR_STRIPE;
}
+enum { /* layout classification returned by mdt_lmm_dom_entry() */
+ LMM_NO_DOM, /* not a COMP_V1 layout, or first entry is not LOV_PATTERN_MDT */
+ LMM_DOM_ONLY, /* DoM first entry and no OST object in the other entries */
+ LMM_DOM_OST /* DoM first entry plus at least one object on an OST */
+};
+
+/*
+ * XXX Look into layout in MDT layer. This must be done in LOD.
+ *
+ * Classify the layout \a lmm with respect to Data-on-MDT:
+ *  - LMM_NO_DOM   if it is not a LOV_MAGIC_COMP_V1 layout or its first
+ *                 entry does not have the LOV_PATTERN_MDT pattern;
+ *  - LMM_DOM_OST  if the first entry is DoM and any later entry has an
+ *                 object whose l_ost_idx is not (__u32)-1;
+ *  - LMM_DOM_ONLY if the first entry is DoM and no such object exists.
+ *
+ * NOTE(review): all layout fields are read without byte-order conversion;
+ * confirm the buffer is in CPU order when it reaches this helper.
+ */
+static inline int mdt_lmm_dom_entry(struct lov_mds_md *lmm)
+{
+	struct lov_comp_md_v1 *comp;
+	struct lov_mds_md *entry;
+	int idx;
+
+	/* anything but a composite layout is reported as having no DoM */
+	if (lmm->lmm_magic != LOV_MAGIC_COMP_V1)
+		return LMM_NO_DOM;
+
+	comp = (struct lov_comp_md_v1 *)lmm;
+
+	/* DoM entry is the first entry always */
+	entry = (struct lov_mds_md *)((char *)comp +
+				      comp->lcm_entries[0].lcme_offset);
+	if (lov_pattern(entry->lmm_pattern) != LOV_PATTERN_MDT)
+		return LMM_NO_DOM;
+
+	/* scan the remaining entries for any object placed on an OST */
+	for (idx = 1; idx < comp->lcm_entry_count; idx++) {
+		int stripe;
+
+		entry = (struct lov_mds_md *)((char *)comp +
+					comp->lcm_entries[idx].lcme_offset);
+		for (stripe = 0; stripe < entry->lmm_stripe_count; stripe++) {
+			if (entry->lmm_objects[stripe].l_ost_idx !=
+			    (__u32)-1UL)
+				return LMM_DOM_OST;
+		}
+	}
+
+	return LMM_DOM_ONLY;
+}
+
__u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag);
void mdt_set_disposition(struct mdt_thread_info *info,
struct ldlm_reply *rep, __u64 op_flag);
struct mdt_object *o, struct lu_nodemap *nodemap);
#endif
void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
- const struct lu_attr *attr, const struct lu_fid *fid);
-
+ const struct lu_attr *attr, const struct lu_fid *fid);
+int mdt_pack_size2body(struct mdt_thread_info *info,
+ const struct lu_fid *fid, bool dom_lock);
int mdt_getxattr(struct mdt_thread_info *info);
int mdt_reint_setxattr(struct mdt_thread_info *info,
struct mdt_lock_handle *lh);
mdt_dom_discard_data(mti, mdt_object_fid(mo));
}
+int mdt_dom_object_size(const struct lu_env *env, struct mdt_device *mdt,
+ const struct lu_fid *fid, struct mdt_body *mb,
+ bool dom_lock);
+bool mdt_dom_client_has_lock(struct mdt_thread_info *info,
+ const struct lu_fid *fid);
/* grants */
long mdt_grant_connect(const struct lu_env *env, struct obd_export *exp,
u64 want, bool conservative);
lock_res(res);
res_lvb = res->lr_lvb_data;
- mb->mbo_size = res_lvb->lvb_size;
- mb->mbo_blocks = res_lvb->lvb_blocks;
+ mb->mbo_dom_size = res_lvb->lvb_size;
+ mb->mbo_dom_blocks = res_lvb->lvb_blocks;
mb->mbo_mtime = res_lvb->lvb_mtime;
mb->mbo_ctime = res_lvb->lvb_ctime;
mb->mbo_atime = res_lvb->lvb_atime;
CDEBUG(D_DLMTRACE, "size %llu\n", res_lvb->lvb_size);
mb->mbo_valid |= OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME |
- OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ OBD_MD_DOM_SIZE;
unlock_res(res);
}
fid_build_reg_res_name(fid, &resid);
res = ldlm_resource_get(mdt->mdt_namespace, NULL, &resid,
LDLM_IBITS, 1);
- if (IS_ERR(res) || res->lr_lvb_data == NULL)
+ if (IS_ERR(res))
RETURN(-ENOENT);
- /* if there is no DOM bit in the lock then glimpse is needed
- * to return valid size */
- if (!dom_lock) {
- rc = mdt_do_glimpse(env, mdt->mdt_namespace, res);
- if (rc < 0)
- GOTO(out, rc);
- }
-
/* Update lvbo data if DoM lock returned or if LVB is not yet valid. */
if (dom_lock || !mdt_dom_lvb_is_valid(res))
mdt_dom_lvbo_update(res, NULL, NULL, false);
mdt_lvb2body(res, mb);
-out:
ldlm_resource_putref(res);
RETURN(rc);
}
__u64 tmpflags = 0;
enum ldlm_error err;
- rc = policy(lock, &tmpflags, 0, &err, NULL);
+ rc = policy(lock, &tmpflags, LDLM_PROCESS_RESCAN, &err, NULL);
check_res_locked(res);
}
unlock_res(res);
struct ldlm_res_id *res_id = &info->mti_res_id;
struct lustre_handle dom_lh;
__u64 flags = LDLM_FL_AST_DISCARD_DATA;
- __u64 rc = 0;
+ int rc = 0;
policy->l_inodebits.bits = MDS_INODELOCK_DOM;
policy->l_inodebits.try_bits = 0;
ldlm_lock_decref(&dom_lh, LCK_PW);
}
+/**
+ * Check if the client issuing the current request already holds a DoM lock
+ * on the given resource.
+ *
+ * Looks for a PW lock with MDS_INODELOCK_DOM on the resource named after
+ * \a fid and compares the export handle cookie of the matched lock with
+ * that of info->mti_exp. The scratch buffers info->mti_policy and
+ * info->mti_res_id are overwritten.
+ *
+ * \param[in] info	thread info of the request being processed
+ * \param[in] fid	FID of the object to check
+ *
+ * \retval true		a matching lock owned by the same export was found
+ * \retval false	no lock matched, the handle could not be resolved,
+ *			the lock has no export, or it belongs to another
+ *			export
+ */
+bool mdt_dom_client_has_lock(struct mdt_thread_info *info,
+			     const struct lu_fid *fid)
+{
+	struct mdt_device *mdt = info->mti_mdt;
+	union ldlm_policy_data *policy = &info->mti_policy;
+	struct ldlm_res_id *res_id = &info->mti_res_id;
+	struct lustre_handle lockh;
+	enum ldlm_mode mode;
+	struct ldlm_lock *lock;
+	bool same_client;
+
+	policy->l_inodebits.bits = MDS_INODELOCK_DOM;
+	/* mti_policy is reused scratch space: do not pass stale try_bits */
+	policy->l_inodebits.try_bits = 0;
+	fid_build_reg_res_name(fid, res_id);
+
+	/* NOTE(review): no decref follows, so LDLM_FL_TEST_LOCK presumably
+	 * matches without taking a mode reference -- confirm. */
+	mode = ldlm_lock_match(mdt->mdt_namespace, LDLM_FL_BLOCK_GRANTED |
+			       LDLM_FL_TEST_LOCK, res_id, LDLM_IBITS, policy,
+			       LCK_PW, &lockh, 0);
+
+	/* There is no other PW lock on this object; finished. */
+	if (mode == 0)
+		return false;
+
+	lock = ldlm_handle2lock(&lockh);
+	if (lock == NULL)
+		return false;
+
+	/* check if lock from the same client; a lock without an export
+	 * cannot belong to the requesting client */
+	same_client = lock->l_export != NULL &&
+		      lock->l_export->exp_handle.h_cookie ==
+		      info->mti_exp->exp_handle.h_cookie;
+	LDLM_LOCK_PUT(lock);
+	return same_client;
+}
+
}
LPROC_SEQ_FOPS(mdt_sync_count);
+/**
+ * Show handler of the "dom_lock" parameter: prints 1 when
+ * mdt_opts.mo_dom_lock is set and 0 otherwise, followed by a newline.
+ *
+ * \param[in] m		seq_file whose private data is the obd_device
+ * \param[in] data	unused
+ *
+ * \retval 0	always
+ */
+static int mdt_dom_lock_seq_show(struct seq_file *m, void *data)
+{
+	struct obd_device *obd = m->private;
+	struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+	unsigned int enabled = mdt->mdt_opts.mo_dom_lock != 0;
+
+	seq_printf(m, "%u\n", enabled);
+	return 0;
+}
+
+/**
+ * Write handler of the "dom_lock" parameter: parses the user buffer as a
+ * signed 64-bit integer and stores whether it is non-zero in
+ * mdt_opts.mo_dom_lock.
+ *
+ * \param[in] file	file whose private data is the seq_file
+ * \param[in] buffer	user buffer with the new value
+ * \param[in] count	number of bytes in \a buffer
+ * \param[in] off	unused
+ *
+ * \retval count	on success
+ * \retval other	the non-zero result of lprocfs_str_to_s64()
+ */
+static ssize_t
+mdt_dom_lock_seq_write(struct file *file, const char __user *buffer,
+		       size_t count, loff_t *off)
+{
+	struct seq_file *seq = file->private_data;
+	struct obd_device *obd = seq->private;
+	struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+	__s64 parsed;
+	int rc;
+
+	rc = lprocfs_str_to_s64(buffer, count, &parsed);
+	if (rc != 0)
+		return rc;
+
+	/* single-bit field: any non-zero input enables the option */
+	mdt->mdt_opts.mo_dom_lock = (parsed != 0) ? 1 : 0;
+	return count;
+}
+LPROC_SEQ_FOPS(mdt_dom_lock);
+
LPROC_SEQ_FOPS_RO_TYPE(mdt, recovery_status);
LPROC_SEQ_FOPS_RO_TYPE(mdt, num_exports);
LPROC_SEQ_FOPS_RO_TYPE(mdt, target_instance);
.fops = &mdt_async_commit_count_fops },
{ .name = "sync_count",
.fops = &mdt_sync_count_fops },
+ { .name = "dom_lock",
+ .fops = &mdt_dom_lock_fops },
{ NULL }
};
{
struct md_attr *ma = &info->mti_attr;
__u64 open_flags = info->mti_spec.sp_cr_flags;
+ __u64 trybits = 0;
enum ldlm_mode lm = LCK_CR;
bool acq_lease = !!(open_flags & MDS_OPEN_LEASE);
bool try_layout = false;
bool create_layout = false;
int rc = 0;
+ int dom_stripes = LMM_NO_DOM;
+ bool dom_lock = false;
+
ENTRY;
*ibits = 0;
if (exp_connect_layout(info->mti_exp) && !create_layout &&
ma->ma_need & MA_LOV)
try_layout = true;
+
+ /* DoM files can have just MDT stripe or combined MDT + OST
+ * stripes.
+ * - In the first case the open for read/write will do IO to
+ * the MDT stripe and it makes sense to take IO lock in
+ * advance along with OPEN even if it is blocking lock.
+ * - In the second case it is just size of MDT stripe and it
+ * is quite unlikely that client will write into it, though
+ * it may read it. So IO lock will be taken optionally if it
+ * is non-blocking one.
+ */
+ if (ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)
+ dom_stripes = mdt_lmm_dom_entry(ma->ma_lmm);
+
+ if (dom_stripes == LMM_DOM_ONLY &&
+ info->mti_mdt->mdt_opts.mo_dom_lock != 0 &&
+ !mdt_dom_client_has_lock(info, mdt_object_fid(obj)))
+ dom_lock = true;
}
if (acq_lease) {
try_layout = false;
lhc = &info->mti_lh[MDT_LH_LOCAL];
+ } else if (dom_lock) {
+ lm = (open_flags & FMODE_WRITE) ? LCK_PW : LCK_PR;
+ *ibits = MDS_INODELOCK_DOM;
+ try_layout = false;
}
+
CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
PFID(mdt_object_fid(obj)),
atomic_read(&obj->mot_open_count), lm);
* lock for each open.
* However this is a double-edged sword because changing
* permission will revoke huge # of LOOKUP locks. */
- rc = mdt_object_lock_try(info, obj, lhc, ibits,
- MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_LOOKUP, false);
- } else if (*ibits != 0) {
- rc = mdt_object_lock(info, obj, lhc, *ibits);
+ trybits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
}
- CDEBUG(D_INODE, "%s: Requested bits lock:"DFID ", ibits = %#llx"
+ if (trybits != 0)
+ rc = mdt_object_lock_try(info, obj, lhc, ibits, trybits, false);
+ else if (*ibits != 0)
+ rc = mdt_object_lock(info, obj, lhc, *ibits);
+
+ CDEBUG(D_INODE, "%s: Requested bits lock:"DFID ", ibits = %#llx/%#llx"
", open_flags = %#llo, try_layout = %d : rc = %d\n",
mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(obj)),
- *ibits, open_flags, try_layout, rc);
+ *ibits, trybits, open_flags, try_layout, rc);
/* will change layout, revoke layout locks by enqueuing EX lock. */
if (rc == 0 && create_layout) {
if (ibits == 0 || rc == -MDT_EREMOTE_OPEN)
RETURN_EXIT;
- if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) {
+ if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT) &&
+ !(ibits & MDS_INODELOCK_DOM)) {
/* for the open request, the lock will only return to client
* if open or layout lock is granted. */
rc = 1;
mdt_object_open_unlock(info, o, lhc, ibits, rc);
out:
mdt_object_put(env, o);
+ if (rc == 0) {
+ rc = mdt_pack_size2body(info, rr->rr_fid2,
+ ibits & MDS_INODELOCK_DOM);
+ LASSERT(ergo(ibits & MDS_INODELOCK_DOM, !rc));
+ rc = 0;
+ }
out_parent_put:
if (parent != NULL)
mdt_object_put(env, parent);
mdt_object_open_unlock(info, child, lhc, ibits, result);
out_child:
mdt_object_put(info->mti_env, child);
+ if (result == 0) {
+ rc = mdt_pack_size2body(info, child_fid,
+ ibits & MDS_INODELOCK_DOM);
+ LASSERT(ergo(ibits & MDS_INODELOCK_DOM, !rc));
+ rc = 0;
+ }
out_parent:
mdt_object_unlock_put(info, parent, lh, result || !created);
out:
__swab32s(&b->mbo_uid_h);
__swab32s(&b->mbo_gid_h);
__swab32s(&b->mbo_projid);
- CLASSERT(offsetof(typeof(*b), mbo_padding_6) != 0);
- CLASSERT(offsetof(typeof(*b), mbo_padding_7) != 0);
+ __swab64s(&b->mbo_dom_size);
+ __swab64s(&b->mbo_dom_blocks);
CLASSERT(offsetof(typeof(*b), mbo_padding_8) != 0);
CLASSERT(offsetof(typeof(*b), mbo_padding_9) != 0);
CLASSERT(offsetof(typeof(*b), mbo_padding_10) != 0);
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_net.h>
-#include <lustre/lustre_lfsck_user.h>
#include <lustre_disk.h>
+#include <uapi/linux/lustre/lustre_lfsck_user.h>
+
#include <lustre_disk.h>
#include <uapi/linux/lustre/lustre_lfsck_user.h>
+
void lustre_assert_wire_constants(void)
{
/* Wire protocol assertions generated by 'wirecheck'
(long long)MDS_ATTR_FROM_OPEN);
LASSERTF(MDS_ATTR_BLOCKS == 0x0000000000008000ULL, "found 0x%.16llxULL\n",
(long long)MDS_ATTR_BLOCKS);
-
LASSERTF(MDS_ATTR_PROJID == 0x0000000000010000ULL, "found 0x%.16llxULL\n",
(long long)MDS_ATTR_PROJID);
LASSERTF(FLD_QUERY == 900, "found %lld\n",
OBD_MD_DEFAULT_MEA);
LASSERTF(OBD_MD_FLOSTLAYOUT == (0x0080000000000000ULL), "found 0x%.16llxULL\n",
OBD_MD_FLOSTLAYOUT);
-
LASSERTF(OBD_MD_FLPROJID == (0x0100000000000000ULL), "found 0x%.16llxULL\n",
OBD_MD_FLPROJID);
CLASSERT(OBD_FL_INLINEDATA == 0x00000001);
(long long)(int)offsetof(struct mdt_body, mbo_projid));
LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_projid) == 4, "found %lld\n",
(long long)(int)sizeof(((struct mdt_body *)0)->mbo_projid));
- LASSERTF((int)offsetof(struct mdt_body, mbo_padding_6) == 176, "found %lld\n",
- (long long)(int)offsetof(struct mdt_body, mbo_padding_6));
- LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_padding_6) == 8, "found %lld\n",
- (long long)(int)sizeof(((struct mdt_body *)0)->mbo_padding_6));
- LASSERTF((int)offsetof(struct mdt_body, mbo_padding_7) == 184, "found %lld\n",
- (long long)(int)offsetof(struct mdt_body, mbo_padding_7));
- LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_padding_7) == 8, "found %lld\n",
- (long long)(int)sizeof(((struct mdt_body *)0)->mbo_padding_7));
+ LASSERTF((int)offsetof(struct mdt_body, mbo_dom_size) == 176, "found %lld\n",
+ (long long)(int)offsetof(struct mdt_body, mbo_dom_size));
+ LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_dom_size) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct mdt_body *)0)->mbo_dom_size));
+ LASSERTF((int)offsetof(struct mdt_body, mbo_dom_blocks) == 184, "found %lld\n",
+ (long long)(int)offsetof(struct mdt_body, mbo_dom_blocks));
+ LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_dom_blocks) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct mdt_body *)0)->mbo_dom_blocks));
LASSERTF((int)offsetof(struct mdt_body, mbo_padding_8) == 192, "found %lld\n",
(long long)(int)offsetof(struct mdt_body, mbo_padding_8));
LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_padding_8) == 8, "found %lld\n",
(long long)(int)offsetof(struct llog_setattr64_rec, lsr_valid));
LASSERTF((int)sizeof(((struct llog_setattr64_rec *)0)->lsr_valid) == 8, "found %lld\n",
(long long)(int)sizeof(((struct llog_setattr64_rec *)0)->lsr_valid));
+ LASSERTF((int)offsetof(struct llog_setattr64_rec, lsr_tail) == 56, "found %lld\n",
+ (long long)(int)offsetof(struct llog_setattr64_rec, lsr_tail));
+ LASSERTF((int)sizeof(((struct llog_setattr64_rec *)0)->lsr_tail) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct llog_setattr64_rec *)0)->lsr_tail));
LASSERTF((int)offsetof(struct llog_setattr64_rec_v2, lsr_projid) == 56, "found %lld\n",
(long long)(int)offsetof(struct llog_setattr64_rec_v2, lsr_projid));
LASSERTF((int)sizeof(((struct llog_setattr64_rec_v2 *)0)->lsr_projid) == 4, "found %lld\n",
(long long)(int)sizeof(((struct llog_setattr64_rec_v2 *)0)->lsr_projid));
- LASSERTF((int)offsetof(struct llog_setattr64_rec_v2, lsr_tail) == 80, "found %lld\n",
- (long long)(int)offsetof(struct llog_setattr64_rec_v2, lsr_tail));
- LASSERTF((int)sizeof(((struct llog_setattr64_rec_v2 *)0)->lsr_tail) == 8, "found %lld\n",
- (long long)(int)sizeof(((struct llog_setattr64_rec_v2 *)0)->lsr_tail));
/* Checks for struct llog_size_change_rec */
LASSERTF((int)sizeof(struct llog_size_change_rec) == 64, "found %lld\n",
(long long)(int)offsetof(posix_acl_xattr_header, a_version));
LASSERTF((int)sizeof(((posix_acl_xattr_header *)0)->a_version) == 4, "found %lld\n",
(long long)(int)sizeof(((posix_acl_xattr_header *)0)->a_version));
-#ifndef HAVE_STRUCT_POSIX_ACL_XATTR
LASSERTF((int)offsetof(posix_acl_xattr_header, a_entries) == 4, "found %lld\n",
(long long)(int)offsetof(posix_acl_xattr_header, a_entries));
LASSERTF((int)sizeof(((posix_acl_xattr_header *)0)->a_entries) == 0, "found %lld\n",
(long long)(int)sizeof(((posix_acl_xattr_header *)0)->a_entries));
-#endif
#endif /* CONFIG_FS_POSIX_ACL */
/* Checks for struct link_ea_header */
test_smallio() {
OSC="mdc"
run_smalliomany $DOM
+ echo "### Data-on-MDT files, no IO lock on open ###"
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=0 # second $DOM pass runs with dom_lock=0
+ OSC="mdc"
+ run_smalliomany $DOM
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=1 # dom_lock=1 again before the $NORM pass
OSC="osc"
run_smalliomany $NORM
}
test_mdtest() {
OSC="mdc"
run_MDtest $DOM
+ # Second DoM pass: same workload with mdt.*.dom_lock=0 on $SINGLEMDS.
+ # Banner text matches the other test_* functions ("no IO lock on open").
+ echo "### Data-on-MDT files, no IO lock on open ###"
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=0
+ OSC="mdc"
+ run_MDtest $DOM
+ # dom_lock is set to 1 again before the pass on $NORM
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=1
+ echo "### Normal files, $OSTCOUNT OSTs ###"
OSC="osc"
run_MDtest $NORM
}
test_IOR() {
OSC="mdc"
run_IOR $DOM
+ echo "### Data-on-MDT files, no IO lock on open ###"
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=0 # second $DOM pass runs with dom_lock=0
+ OSC="mdc"
+ run_IOR $DOM
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=1 # dom_lock=1 again before the $NORM pass
OSC="osc"
run_IOR $NORM
}
test_dbench() {
OSC="mdc"
run_dbench $DOM
+ echo "### Data-on-MDT files, no IO lock on open ###"
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=0 # second $DOM pass runs with dom_lock=0
+ OSC="mdc"
+ run_dbench $DOM
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=1 # dom_lock=1 again before the $NORM pass
OSC="osc"
run_dbench $NORM
}
test_smf() {
OSC="mdc"
run_smallfile $DOM
+ echo "### Data-on-MDT files, no IO lock on open ###"
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=0
+ OSC="mdc"
+ run_smallfile $DOM
+ do_facet $SINGLEMDS lctl set_param -n mdt.*.dom_lock=1
OSC="osc"
run_smallfile $NORM
mkdir -p $DIR/$tdir
- $SETSTRIPE -E 1024K -L mdt $dom
+ $LFS setstripe -E 1024K -L mdt $dom
lctl set_param -n mdc.*.stats=clear
dd if=/dev/zero of=$dom bs=4096 count=1 || return 1
cat $dom > /dev/null
- local reads=$(lctl get_param -n mdc.*.stats | \
- awk '/ost_read/ {print $2}')
+ local reads=$(lctl get_param -n mdc.*.stats |
+ awk '/ost_read/ {print $2}')
[ -z $reads ] || error "Unexpected $reads READ RPCs"
ls $dom
rm -f $dom
}
run_test 271a "DoM: data is cached for read after write"
+test_271b() {
+ local dom=$DIR/$tdir/dom
+
+ mkdir -p $DIR/$tdir
+
+ $LFS setstripe -E 1024K -L mdt -E EOF $dom
+
+ lctl set_param -n mdc.*.stats=clear
+ dd if=/dev/zero of=$dom bs=4096 count=1 || return 1
+ cancel_lru_locks mdc
+ $CHECKSTAT -t file -s 4096 $dom || error "stat #1 fails"
+ # second stat to check size is cached on client
+ $CHECKSTAT -t file -s 4096 $dom || error "stat #2 fails"
+ local gls=$(lctl get_param -n mdc.*.stats |
+ awk '/ldlm_glimpse/ {print $2}')
+ [ -z $gls ] || error "Unexpected $gls glimpse RPCs"
+ rm -f $dom
+}
+run_test 271b "DoM: no glimpse RPC for stat (DoM only file)"
+
+test_271ba() {
+ local dom=$DIR/$tdir/dom
+
+ mkdir -p $DIR/$tdir
+
+ $LFS setstripe -E 1024K -L mdt -E EOF $dom
+
+ lctl set_param -n mdc.*.stats=clear
+ lctl set_param -n osc.*.stats=clear
+ dd if=/dev/zero of=$dom bs=2048K count=1 || return 1
+ cancel_lru_locks mdc
+ $CHECKSTAT -t file -s 2097152 $dom || error "stat"
+ # second stat to check size is cached on client
+ $CHECKSTAT -t file -s 2097152 $dom || error "stat"
+ local gls=$(lctl get_param -n mdc.*.stats |
+ awk '/ldlm_glimpse/ {print $2}')
+ [ -z $gls ] || error "Unexpected $gls glimpse RPCs"
+ local gls=$(lctl get_param -n osc.*.stats |
+ awk '/ldlm_glimpse/ {print $2}')
+ [ -z $gls ] || error "Unexpected $gls OSC glimpse RPCs"
+ rm -f $dom
+}
+run_test 271ba "DoM: no glimpse RPC for stat (combined file)"
+
+test_271c() { # compares mdc ldlm_ibits_enqueue counts with dom_lock=0 and then dom_lock=1
+ # test to be enabled with lock_convert; while the skip below succeeds,
+ # nothing after it runs
+ skip "skipped until lock convert will be implemented" && return
+
+ local dom=$DIR/$tdir/dom
+
+ mkdir -p $DIR/$tdir
+
+ $LFS setstripe -E 1024K -L mdt $DIR/$tdir
+
+ local mdtidx=$($LFS getstripe -M $DIR/$tdir)
+ local facet=mds$((mdtidx + 1)) # facet name is mds<index + 1>
+
+ cancel_lru_locks mdc
+ do_facet $facet lctl set_param -n mdt.*.dom_lock=0 # first pass: dom_lock=0
+ createmany -o $dom 1000
+ lctl set_param -n mdc.*.stats=clear
+ smalliomany -w $dom 1000 200
+ lctl get_param -n mdc.*.stats # print the stats into the test output
+ local enq=$(lctl get_param -n mdc.*.stats |
+ awk '/ldlm_ibits_enqueue/ {print $2}')
+ # Each file has 1 open and 1 IO enqueue (2000 total), plus 1 getxattr for
+ # security.capability (about 3000); NOTE(review): check only needs >= 2000
+ [ $enq -ge 2000 ] || error "Too few enqueues $enq, expected > 2000"
+ unlinkmany $dom 1000
+
+ cancel_lru_locks mdc
+ do_facet $facet lctl set_param -n mdt.*.dom_lock=1 # second pass: dom_lock=1
+ createmany -o $dom 1000
+ lctl set_param -n mdc.*.stats=clear
+ smalliomany -w $dom 1000 200
+ lctl get_param -n mdc.*.stats # print the stats into the test output
+ local enq_2=$(lctl get_param -n mdc.*.stats |
+ awk '/ldlm_ibits_enqueue/ {print $2}')
+ # Expect about 1000 fewer enqueue RPCs, because a single enqueue now
+ # serves both the OPEN and the IO lock.
+ [ $((enq - enq_2)) -ge 1000 ] ||
+ error "Too many enqueues $enq_2, expected about $((enq - 1000))"
+ unlinkmany $dom 1000
+ return 0
+}
+run_test 271c "DoM: IO lock at open saves enqueue RPCs"
+
cleanup_test_300() {
trap 0
umask $SAVE_UMASK
}
run_test 93 "alloc_rr should not allocate on same ost"
+# Data-on-MDT tests
+
+# Currently returns right after the skip. The body writes 4KiB through
+# $DIR2, stats the file twice through $DIR, and expects no ldlm_glimpse
+# entry in the mdc stats after the first stat and exactly "1" after the
+# second.
+test_100a() {
+ skip "Reserved for glimpse-ahead" && return
+ mkdir -p $DIR/$tdir
+
+ $LFS setstripe -E 1024K -L mdt -E EOF $DIR/$tdir/dom
+
+ lctl set_param -n mdc.*.stats=clear
+ dd if=/dev/zero of=$DIR2/$tdir/dom bs=4096 count=1 || return 1
+
+ $CHECKSTAT -t file -s 4096 $DIR/$tdir/dom || error "stat #1"
+ # first stat from server should return size data and save glimpse
+ local reads=$(lctl get_param -n mdc.*.stats |
+ awk '/ldlm_glimpse/ {print $2}')
+ # quoted: the awk output may be empty or contain several words
+ [ -z "$reads" ] || error "Unexpected $reads glimpse RPCs"
+ # second stat to check size is NOT cached on client without IO lock
+ $CHECKSTAT -t file -s 4096 $DIR/$tdir/dom || error "stat #2"
+
+ reads=$(lctl get_param -n mdc.*.stats |
+ awk '/ldlm_glimpse/ {print $2}')
+ [ "1" == "$reads" ] || error "Expect 1 glimpse RPCs but got $reads"
+ # remove the file by its full path; no $dom variable exists here
+ rm -f $DIR/$tdir/dom
+}
+run_test 100a "DoM: glimpse RPCs for stat without IO lock (DoM only file)"
+
+test_100b() {
+ mkdir -p $DIR/$tdir
+
+ $SETSTRIPE -E 1024K -L mdt -E EOF $DIR/$tdir/dom
+
+ lctl set_param -n mdc.*.stats=clear
+ dd if=/dev/zero of=$DIR2/$tdir/dom bs=4096 count=1 || return 1
+ cancel_lru_locks mdc
+ # first stat data from server should have size
+ $CHECKSTAT -t file -s 4096 $DIR/$tdir/dom || error "stat #1"
+ # second stat to check size is cached on client
+ $CHECKSTAT -t file -s 4096 $DIR/$tdir/dom || error "stat #2"
+
+ local reads=$(lctl get_param -n mdc.*.stats | \
+ awk '/ldlm_glimpse/ {print $2}')
+ # both stats should cause no glimpse requests
+ [ -z $reads ] || error "Unexpected $reads glimpse RPCs"
+ rm -f $dom
+}
+run_test 100b "DoM: no glimpse RPC for stat with IO lock (DoM only file)"
+
+test_100c() {
+ mkdir -p $DIR/$tdir
+
+ $SETSTRIPE -E 1024K -L mdt -E EOF $DIR/$tdir/dom
+
+ lctl set_param -n mdc.*.stats=clear
+ lctl set_param -n osc.*.stats=clear
+ dd if=/dev/zero of=$DIR2/$tdir/dom bs=2048K count=1 || return 1
+
+ # check that size is merged from MDT and OST correctly
+ $CHECKSTAT -t file -s 2097152 $DIR/$tdir/dom ||
+ error "Wrong size from stat #1"
+
+ local reads=$(lctl get_param -n osc.*.stats | grep ldlm_glimpse | wc -l)
+ [ $reads -eq 0 ] && error "Expect OST glimpse RPCs but got none"
+
+ rm -f $dom
+}
+run_test 100c "DoM: write vs stat without IO lock (combined file)"
+
+# Write 2MiB through $DIR2, truncate the file back to 4096 bytes, then stat
+# it through $DIR; the stat must report 4096 bytes and the osc stats must
+# contain at least one ldlm_glimpse line.
+test_100d() {
+ mkdir -p $DIR/$tdir
+
+ $LFS setstripe -E 1024K -L mdt -E EOF $DIR/$tdir/dom
+
+ dd if=/dev/zero of=$DIR2/$tdir/dom bs=2048K count=1 || return 1
+ # NOTE(review): only the mdc stats are cleared here, while the check
+ # below reads the osc stats - confirm that this is intended.
+ lctl set_param -n mdc.*.stats=clear
+ $TRUNCATE $DIR2/$tdir/dom 4096
+
+ # check that reported size is valid after file grows to OST and
+ # is truncated back to MDT stripe size
+ $CHECKSTAT -t file -s 4096 $DIR/$tdir/dom ||
+ error "Wrong size from stat #1"
+
+ # number of osc stats lines that mention ldlm_glimpse
+ local reads=$(lctl get_param -n osc.*.stats | grep ldlm_glimpse | wc -l)
+ [ $reads -eq 0 ] && error "Expect OST glimpse but got none"
+
+ # remove the file by its full path; no $dom variable exists here
+ rm -f $DIR/$tdir/dom
+}
+run_test 100d "DoM: write+truncate vs stat without IO lock (combined file)"
+
+
+test_101a() {
+ $LFS setstripe -E 1024K -L mdt -E EOF $DIR1/$tfile
+ lctl set_param -n mdc.*.stats=clear
+ # to get layout
+ $CHECKSTAT -t file $DIR1/$tfile
+ # open + IO lock
+ dd if=/dev/zero of=$DIR1/$tfile bs=4096 count=1 || error "Write fails"
+ # must discard pages
+ rm $DIR2/$tfile || error "Unlink fails"
+ local writes=$(lctl get_param -n mdc.*.stats | grep ost_write | wc -l)
+ [ $writes -eq 0 ] || error "Found WRITE RPC but expect none"
+}
+run_test 101a "Discard DoM data on unlink"
+
+test_101b() {
+ $LFS setstripe -E 1024K -L mdt -E EOF $DIR1/$tfile
+ touch $DIR1/${tfile}_2
+ lctl set_param -n mdc.*.stats=clear
+ # to get layout
+ $CHECKSTAT -t file $DIR1/$tfile
+ # open + IO lock
+ dd if=/dev/zero of=$DIR1/$tfile bs=4096 count=1 || error "Write fails"
+ # must discard pages
+ mv $DIR2/${tfile}_2 $DIR2/$tfile || error "Rename fails"
+ local writes=$(lctl get_param -n mdc.*.stats | grep ost_write | wc -l)
+ [ $writes -eq 0 ] || error "Found WRITE RPC but expect none"
+}
+run_test 101b "Discard DoM data on rename"
+
+test_101c() { # file written via $DIR1 is unlinked via $DIR2 while a background multiop runs on it
+ $LFS setstripe -E 1024K -L mdt -E EOF $DIR1/$tfile
+ lctl set_param -n mdc.*.stats=clear
+ # stat once, to get the layout
+ $CHECKSTAT -t file $DIR1/$tfile
+ # the write takes open + IO lock
+ dd if=/dev/zero of=$DIR1/$tfile bs=4096 count=1 || error "Write fails"
+
+ $MULTIOP $DIR1/$tfile O_c & # NOTE(review): O_c presumably open, wait for signal, close - confirm
+ MULTIOP_PID=$!
+ sleep 2 # presumably gives the background multiop time to start
+ rm $DIR2/$tfile > /dev/null || error "Unlink fails"
+ kill -USR1 $MULTIOP_PID || return 2 # returns 2 if the signal cannot be delivered
+ wait $MULTIOP_PID || return 3 # returns 3 if multiop exits with a non-zero status
+ local writes=$(lctl get_param -n mdc.*.stats | grep ost_write | wc -l)
+ [ $writes -eq 0 ] || error "Found WRITE RPC but expect none"
+}
+run_test 101c "Discard DoM data on close-unlink"
+
log "cleanup: ======================================================"
# kill and wait in each test only guarentee script finish, but command in script
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
-#include <lustre/lustre_idl.h>
-#include <lustre/lustre_lfsck_user.h>
-#include <linux/lustre_disk.h>
+#include <linux/lustre/lustre_idl.h>
+#include <linux/lustre/lustre_lfsck_user.h>
+#include <linux/lustre/lustre_disk.h>
#define BLANK_LINE() \
do { \
CHECK_MEMBER(mdt_body, mbo_uid_h);
CHECK_MEMBER(mdt_body, mbo_gid_h);
CHECK_MEMBER(mdt_body, mbo_projid);
- CHECK_MEMBER(mdt_body, mbo_padding_6);
- CHECK_MEMBER(mdt_body, mbo_padding_7);
+ CHECK_MEMBER(mdt_body, mbo_dom_size);
+ CHECK_MEMBER(mdt_body, mbo_dom_blocks);
CHECK_MEMBER(mdt_body, mbo_padding_8);
CHECK_MEMBER(mdt_body, mbo_padding_9);
CHECK_MEMBER(mdt_body, mbo_padding_10);
#include <stdio.h>
#include <string.h>
-#include <lustre/lustre_idl.h>
-#include <lustre/lustre_lfsck_user.h>
-#include <linux/lustre_disk.h>
+#include <linux/lustre/lustre_idl.h>
+#include <linux/lustre/lustre_lfsck_user.h>
+#include <linux/lustre/lustre_disk.h>
#define LASSERT(cond) if (!(cond)) { printf("failed " #cond "\n"); ret = 1; }
#define LASSERTF(cond, fmt, ...) if (!(cond)) { printf("failed '" #cond "'" fmt, ## __VA_ARGS__);ret = 1;}
/*
* Compile-time LASSERT, which verifies correctness at compile-time rather
* than runtime. If "cond" is true, then there are two different cases
- * ("(non-zero)" and "0"). If "cond" is false, then there are two identical cases
- * ("0" and "0"), which is an error that causes the compiler to complain.
+ * ("(non-zero)" and "0"). If "cond" is false, then there are two identical
+ * cases ("0" and "0"), which is an error that causes the compiler to complain.
*/
#define CLASSERT(cond) do {switch (1) {case (cond): case 0: break; } } while (0)
(long long)(int)offsetof(struct mdt_body, mbo_projid));
LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_projid) == 4, "found %lld\n",
(long long)(int)sizeof(((struct mdt_body *)0)->mbo_projid));
- LASSERTF((int)offsetof(struct mdt_body, mbo_padding_6) == 176, "found %lld\n",
- (long long)(int)offsetof(struct mdt_body, mbo_padding_6));
- LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_padding_6) == 8, "found %lld\n",
- (long long)(int)sizeof(((struct mdt_body *)0)->mbo_padding_6));
- LASSERTF((int)offsetof(struct mdt_body, mbo_padding_7) == 184, "found %lld\n",
- (long long)(int)offsetof(struct mdt_body, mbo_padding_7));
- LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_padding_7) == 8, "found %lld\n",
- (long long)(int)sizeof(((struct mdt_body *)0)->mbo_padding_7));
+ LASSERTF((int)offsetof(struct mdt_body, mbo_dom_size) == 176, "found %lld\n",
+ (long long)(int)offsetof(struct mdt_body, mbo_dom_size));
+ LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_dom_size) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct mdt_body *)0)->mbo_dom_size));
+ LASSERTF((int)offsetof(struct mdt_body, mbo_dom_blocks) == 184, "found %lld\n",
+ (long long)(int)offsetof(struct mdt_body, mbo_dom_blocks));
+ LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_dom_blocks) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct mdt_body *)0)->mbo_dom_blocks));
LASSERTF((int)offsetof(struct mdt_body, mbo_padding_8) == 192, "found %lld\n",
(long long)(int)offsetof(struct mdt_body, mbo_padding_8));
LASSERTF((int)sizeof(((struct mdt_body *)0)->mbo_padding_8) == 8, "found %lld\n",
(long long)(int)offsetof(struct llog_setattr64_rec, lsr_valid));
LASSERTF((int)sizeof(((struct llog_setattr64_rec *)0)->lsr_valid) == 8, "found %lld\n",
(long long)(int)sizeof(((struct llog_setattr64_rec *)0)->lsr_valid));
+ LASSERTF((int)offsetof(struct llog_setattr64_rec, lsr_tail) == 56, "found %lld\n",
+ (long long)(int)offsetof(struct llog_setattr64_rec, lsr_tail));
+ LASSERTF((int)sizeof(((struct llog_setattr64_rec *)0)->lsr_tail) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct llog_setattr64_rec *)0)->lsr_tail));
LASSERTF((int)offsetof(struct llog_setattr64_rec_v2, lsr_projid) == 56, "found %lld\n",
(long long)(int)offsetof(struct llog_setattr64_rec_v2, lsr_projid));
LASSERTF((int)sizeof(((struct llog_setattr64_rec_v2 *)0)->lsr_projid) == 4, "found %lld\n",
(long long)(int)sizeof(((struct llog_setattr64_rec_v2 *)0)->lsr_projid));
- LASSERTF((int)offsetof(struct llog_setattr64_rec_v2, lsr_tail) == 80, "found %lld\n",
- (long long)(int)offsetof(struct llog_setattr64_rec_v2, lsr_tail));
- LASSERTF((int)sizeof(((struct llog_setattr64_rec_v2 *)0)->lsr_tail) == 8, "found %lld\n",
- (long long)(int)sizeof(((struct llog_setattr64_rec_v2 *)0)->lsr_tail));
/* Checks for struct llog_size_change_rec */
LASSERTF((int)sizeof(struct llog_size_change_rec) == 64, "found %lld\n",