const struct ldlm_request *dlm_req,
const struct ldlm_callback_suite *cbs);
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
- enum ldlm_type type, __u8 with_policy,
- enum ldlm_mode mode, __u64 *flags, void *lvb,
- __u32 lvb_len,
+ struct ldlm_enqueue_info *einfo, __u8 with_policy,
+ __u64 *flags, void *lvb, __u32 lvb_len,
const struct lustre_handle *lockh, int rc);
int ldlm_cli_enqueue_local(const struct lu_env *env,
struct ldlm_namespace *ns,
return !!(exp_connect_flags2(exp) & OBD_CONNECT2_LSEEK);
}
+static inline int exp_connect_dom_lvb(struct obd_export *exp)
+{
+ return !!(exp_connect_flags2(exp) & OBD_CONNECT2_DOM_LVB);
+}
+
enum {
/* archive_ids in array format */
KKUC_CT_DATA_ARRAY_MAGIC = 0x092013cea,
#define OBD_CONNECT2_FIDMAP 0x10000ULL /* FID map */
#define OBD_CONNECT2_GETATTR_PFID 0x20000ULL /* pack parent FID in getattr */
#define OBD_CONNECT2_LSEEK 0x40000ULL /* SEEK_HOLE/DATA RPC */
+#define OBD_CONNECT2_DOM_LVB 0x80000ULL /* pack DOM glimpse data in LVB */
/* XXX README XXX:
* Please DO NOT add flag values here before first ensuring that this same
* flag value is not in use on some other branch. Please clear any such
OBD_CONNECT2_CRUSH | \
OBD_CONNECT2_ENCRYPT | \
OBD_CONNECT2_GETATTR_PFID |\
- OBD_CONNECT2_LSEEK)
+ OBD_CONNECT2_LSEEK | OBD_CONNECT2_DOM_LVB)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
if (*ldlm_flags & LDLM_FL_BLOCK_NOWAIT)
RETURN(-EWOULDBLOCK);
- /* Combined DOM lock came across GROUP
- * DOM lock, it makes the thread to be
- * blocked for a long time, not allowed,
- * the trybits to be used instead.
- * Not combined DOM lock is requested by
- * client, and have to wait for long
- * until re-worked to a non-intent
- * request). */
- if ((req_bits & MDS_INODELOCK_DOM) &&
+ /* Local combined DOM lock came across
+ * GROUP DOM lock, it makes the thread
+ * to be blocked for a long time, not
+ * allowed, the trybits to be used
+ * instead.
+ */
+ if (!req->l_export &&
+ (req_bits & MDS_INODELOCK_DOM) &&
(req_bits & ~MDS_INODELOCK_DOM))
- RETURN(-EPROTO);
+ LBUG();
goto skip_work_list;
}
}
}
-static bool ldlm_request_slot_needed(enum ldlm_type type)
+static bool ldlm_request_slot_needed(struct ldlm_enqueue_info *einfo)
{
- return type == LDLM_FLOCK || type == LDLM_IBITS;
+ /* exclude EXTENT locks and DOM-only IBITS locks because they
+ * are asynchronous and don't wait on server being blocked.
+ */
+ return einfo->ei_type == LDLM_FLOCK ||
+ (einfo->ei_type == LDLM_IBITS &&
+ einfo->ei_inodebits != MDS_INODELOCK_DOM);
}
/**
* Called after receiving reply from server.
*/
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
- enum ldlm_type type, __u8 with_policy,
- enum ldlm_mode mode, __u64 *flags, void *lvb,
+ struct ldlm_enqueue_info *einfo,
+ __u8 with_policy, __u64 *ldlm_flags, void *lvb,
__u32 lvb_len, const struct lustre_handle *lockh,
int rc)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
const struct lu_env *env = NULL;
- int is_replay = *flags & LDLM_FL_REPLAY;
+ int is_replay = *ldlm_flags & LDLM_FL_REPLAY;
struct ldlm_lock *lock;
struct ldlm_reply *reply;
int cleanup_phase = 1;
ENTRY;
- if (ldlm_request_slot_needed(type))
+ if (ldlm_request_slot_needed(einfo))
obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
ptlrpc_put_mod_rpc_slot(req);
lock = ldlm_handle2lock(lockh);
/* ldlm_cli_enqueue is holding a reference on this lock. */
if (!lock) {
- LASSERT(type == LDLM_FLOCK);
+ LASSERT(einfo->ei_type == LDLM_FLOCK);
RETURN(-ENOLCK);
}
lock->l_remote_handle = reply->lock_handle;
}
- *flags = ldlm_flags_from_wire(reply->lock_flags);
+ *ldlm_flags = ldlm_flags_from_wire(reply->lock_flags);
lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
LDLM_FL_INHERIT_MASK);
unlock_res_and_lock(lock);
CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: %#llx\n",
- lock, reply->lock_handle.cookie, *flags);
+ lock, reply->lock_handle.cookie, *ldlm_flags);
/*
* If enqueue returned a blocked lock but the completion handler has
* already run, then it fixed up the resource and we don't need to do it
* again.
*/
- if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+ if ((*ldlm_flags) & LDLM_FL_LOCK_CHANGED) {
int newmode = reply->lock_desc.l_req_mode;
LASSERT(!is_replay);
&lock->l_policy_data);
}
- if (type != LDLM_PLAIN)
+ if (einfo->ei_type != LDLM_PLAIN)
LDLM_DEBUG(lock,
"client-side enqueue, new policy data");
}
- if ((*flags) & LDLM_FL_AST_SENT) {
+ if ((*ldlm_flags) & LDLM_FL_AST_SENT) {
lock_res_and_lock(lock);
ldlm_bl_desc2lock(&reply->lock_desc, lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
}
if (!is_replay) {
- rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+ rc = ldlm_lock_enqueue(env, ns, &lock, NULL, ldlm_flags);
if (lock->l_completion_ast != NULL) {
- int err = lock->l_completion_ast(lock, *flags, NULL);
+ int err = lock->l_completion_ast(lock, *ldlm_flags,
+ NULL);
if (!rc)
rc = err;
EXIT;
cleanup:
if (cleanup_phase == 1 && rc)
- failed_lock_cleanup(ns, lock, mode);
+ failed_lock_cleanup(ns, lock, einfo->ei_mode);
/* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
LDLM_LOCK_PUT(lock);
LDLM_LOCK_RELEASE(lock);
/* extended LDLM opcodes in client stats */
if (exp->exp_obd->obd_svc_stats != NULL) {
- bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-
- /* OST glimpse has no intent buffer */
- if (req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
- RCL_CLIENT)) {
- struct ldlm_intent *it;
-
- it = req_capsule_client_get(&req->rq_pill,
- &RMF_LDLM_INTENT);
- glimpse = (it && (it->opc == IT_GLIMPSE));
- }
-
- if (!glimpse)
- ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
- else
+ /* glimpse is intent with no intent buffer */
+ if (*flags & LDLM_FL_HAS_INTENT &&
+ !req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
+ RCL_CLIENT))
lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
PTLRPC_LAST_CNTR +
LDLM_GLIMPSE_ENQUEUE);
+ else
+ ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
}
/* It is important to obtain modify RPC slot first (if applicable), so
if (einfo->ei_enq_slot)
ptlrpc_get_mod_rpc_slot(req);
- if (ldlm_request_slot_needed(einfo->ei_type)) {
+ if (ldlm_request_slot_needed(einfo)) {
rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
if (rc) {
if (einfo->ei_enq_slot)
rc = ptlrpc_queue_wait(req);
- err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
- einfo->ei_mode, flags, lvb, lvb_len,
- lockh, rc);
+ err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
+ lvb, lvb_len, lockh, rc);
/*
* If ldlm_cli_enqueue_fini did not find the lock, we need to free
OBD_CONNECT2_ASYNC_DISCARD |
OBD_CONNECT2_PCC |
OBD_CONNECT2_CRUSH | OBD_CONNECT2_LSEEK |
- OBD_CONNECT2_GETATTR_PFID;
+ OBD_CONNECT2_GETATTR_PFID |
+ OBD_CONNECT2_DOM_LVB;
#ifdef HAVE_LRU_RESIZE_SUPPORT
if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
res_id, type, policy, mode, lockh, match_flags);
-
if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
RETURN(rc);
* Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
* and ldlm_lock caches.
*/
-static int mdc_dlm_blocking_ast0(const struct lu_env *env,
- struct ldlm_lock *dlmlock,
- int flag)
+static int mdc_dlm_canceling(const struct lu_env *env,
+ struct ldlm_lock *dlmlock)
{
struct cl_object *obj = NULL;
int result = 0;
ENTRY;
- LASSERT(flag == LDLM_CB_CANCELING);
- LASSERT(dlmlock != NULL);
-
lock_res_and_lock(dlmlock);
- if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
+ if (!ldlm_is_granted(dlmlock)) {
dlmlock->l_ast_data = NULL;
unlock_res_and_lock(dlmlock);
RETURN(0);
}
int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
- struct ldlm_lock_desc *new, void *data, int flag)
+ struct ldlm_lock_desc *new, void *data, int reason)
{
int rc = 0;
ENTRY;
- switch (flag) {
+ switch (reason) {
case LDLM_CB_BLOCKING: {
struct lustre_handle lockh;
break;
}
- rc = mdc_dlm_blocking_ast0(env, dlmlock, flag);
+ rc = mdc_dlm_canceling(env, dlmlock);
cl_env_put(env, &refcheck);
break;
}
attr->cat_kms = size;
setkms = 1;
}
+ ldlm_lock_allow_match_locked(dlmlock);
}
/* The size should not be less than the kms */
/* Lock must have been granted. */
lock_res_and_lock(dlmlock);
- if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
+ if (ldlm_is_granted(dlmlock)) {
struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
/* extend the lock extent, otherwise it will have problem when
/**
* Lock upcall function that is executed either when a reply to ENQUEUE rpc is
- * received from a server, or after osc_enqueue_base() matched a local DLM
+ * received from a server, or after mdc_enqueue_send() matched a local DLM
* lock.
*/
static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh,
RETURN(rc);
}
+/* This is needed only for old servers (before 2.14) support */
int mdc_fill_lvb(struct ptlrpc_request *req, struct ost_lvb *lvb)
{
struct mdt_body *body;
+ /* get LVB data from mdt_body otherwise */
body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
if (!body)
RETURN(-EPROTO);
- lvb->lvb_mtime = body->mbo_mtime;
- lvb->lvb_atime = body->mbo_atime;
- lvb->lvb_ctime = body->mbo_ctime;
- lvb->lvb_blocks = body->mbo_dom_blocks;
- lvb->lvb_size = body->mbo_dom_size;
+ if (!(body->mbo_valid & OBD_MD_DOM_SIZE))
+ RETURN(-EPROTO);
+ mdc_body2lvb(body, lvb);
RETURN(0);
}
-int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
- void *cookie, struct lustre_handle *lockh,
- enum ldlm_mode mode, __u64 *flags, int errcode)
+int mdc_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
+ osc_enqueue_upcall_f upcall, void *cookie,
+ struct lustre_handle *lockh, enum ldlm_mode mode,
+ __u64 *flags, int errcode)
{
struct osc_lock *ols = cookie;
- struct ldlm_lock *lock;
+ bool glimpse = *flags & LDLM_FL_HAS_INTENT;
int rc = 0;
ENTRY;
- /* The request was created before ldlm_cli_enqueue call. */
- if (errcode == ELDLM_LOCK_ABORTED) {
+ /* needed only for glimpse from an old server (< 2.14) */
+ if (glimpse && !exp_connect_dom_lvb(exp))
+ rc = mdc_fill_lvb(req, &ols->ols_lvb);
+
+ if (glimpse && errcode == ELDLM_LOCK_ABORTED) {
struct ldlm_reply *rep;
rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- LASSERT(rep != NULL);
-
- rep->lock_policy_res2 =
- ptlrpc_status_ntoh(rep->lock_policy_res2);
- if (rep->lock_policy_res2)
- errcode = rep->lock_policy_res2;
-
- rc = mdc_fill_lvb(req, &ols->ols_lvb);
+ if (likely(rep)) {
+ rep->lock_policy_res2 =
+ ptlrpc_status_ntoh(rep->lock_policy_res2);
+ if (rep->lock_policy_res2)
+ errcode = rep->lock_policy_res2;
+ } else {
+ rc = -EPROTO;
+ }
*flags |= LDLM_FL_LVB_READY;
} else if (errcode == ELDLM_OK) {
+ struct ldlm_lock *lock;
+
/* Callers have references, should be valid always */
lock = ldlm_handle2lock(lockh);
- LASSERT(lock);
- rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
+ /* At this point ols_lvb must be filled with correct LVB either
+ * by mdc_fill_lvb() above or by ldlm_cli_enqueue_fini().
+ * DoM uses l_ost_lvb to store LVB data, so copy it here from
+ * just updated ols_lvb.
+ */
+ lock_res_and_lock(lock);
+ memcpy(&lock->l_ost_lvb, &ols->ols_lvb,
+ sizeof(lock->l_ost_lvb));
+ unlock_res_and_lock(lock);
LDLM_LOCK_PUT(lock);
*flags |= LDLM_FL_LVB_READY;
}
struct ldlm_lock *lock;
struct lustre_handle *lockh = &aa->oa_lockh;
enum ldlm_mode mode = aa->oa_mode;
+ struct ldlm_enqueue_info einfo = {
+ .ei_type = aa->oa_type,
+ .ei_mode = mode,
+ };
ENTRY;
/* Take an additional reference so that a blocking AST that
* ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
* to arrive after an upcall has been executed by
- * osc_enqueue_fini(). */
+ * mdc_enqueue_fini().
+ */
ldlm_lock_addref(lockh, mode);
/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* Complete obtaining the lock procedure. */
- rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
- aa->oa_mode, aa->oa_flags, NULL, 0,
- lockh, rc);
+ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
+ aa->oa_lvb, aa->oa_lvb ?
+ sizeof(*aa->oa_lvb) : 0, lockh, rc);
/* Complete mdc stuff. */
- rc = mdc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
- aa->oa_flags, rc);
+ rc = mdc_enqueue_fini(aa->oa_exp, req, aa->oa_upcall, aa->oa_cookie,
+ lockh, mode, aa->oa_flags, rc);
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
* release locks just after they are obtained. */
int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
struct ldlm_res_id *res_id, __u64 *flags,
- union ldlm_policy_data *policy,
- struct ost_lvb *lvb, int kms_valid,
+ union ldlm_policy_data *policy, struct ost_lvb *lvb,
osc_enqueue_upcall_f upcall, void *cookie,
struct ldlm_enqueue_info *einfo, int async)
{
__u64 match_flags = *flags;
LIST_HEAD(cancels);
int rc, count;
+ int lvb_size;
+ bool compat_glimpse = glimpse && !exp_connect_dom_lvb(exp);
ENTRY;
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
+ match_flags |= LDLM_FL_LVB_READY;
if (glimpse)
match_flags |= LDLM_FL_BLOCK_GRANTED;
- /* DOM locking uses LDLM_FL_KMS_IGNORE to mark locks wich have no valid
- * LVB information, e.g. canceled locks or locks of just pruned object,
- * such locks should be skipped.
- */
mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
einfo->ei_type, policy, mode, &lockh);
if (mode) {
if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
RETURN(-ENOLCK);
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT);
+ /* Glimpse is intent on old server */
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), compat_glimpse ?
+ &RQF_LDLM_INTENT : &RQF_LDLM_ENQUEUE);
if (req == NULL)
RETURN(-ENOMEM);
RETURN(rc);
}
- /* pack the intent */
- lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
- lit->opc = glimpse ? IT_GLIMPSE : IT_BRW;
-
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0);
- req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
- ptlrpc_request_set_replen(req);
+ if (compat_glimpse) {
+ /* pack the glimpse intent */
+ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+ lit->opc = IT_GLIMPSE;
+ }
/* users of mdc_enqueue() can pass this flag for ldlm_lock_match() */
*flags &= ~LDLM_FL_BLOCK_GRANTED;
- /* All MDC IO locks are intents */
- *flags |= LDLM_FL_HAS_INTENT;
- rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, NULL,
- 0, LVB_T_NONE, &lockh, async);
+
+ if (compat_glimpse) {
+ req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
+ lvb_size = 0;
+ } else {
+ lvb_size = sizeof(*lvb);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ lvb_size);
+ }
+ ptlrpc_request_set_replen(req);
+
+ rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
+ lvb_size, LVB_T_OST, &lockh, async);
if (async) {
if (!rc) {
struct osc_enqueue_args *aa;
aa->oa_cookie = cookie;
aa->oa_speculative = false;
aa->oa_flags = flags;
- aa->oa_lvb = lvb;
+ aa->oa_lvb = compat_glimpse ? NULL : lvb;
req->rq_interpret_reply = mdc_enqueue_interpret;
ptlrpcd_add_req(req);
RETURN(rc);
}
- rc = mdc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
+ rc = mdc_enqueue_fini(exp, req, upcall, cookie, &lockh, einfo->ei_mode,
flags, rc);
ptlrpc_req_finished(req);
RETURN(rc);
mdc_lock_build_policy(env, lock, policy);
LASSERT(!oscl->ols_speculative);
result = mdc_enqueue_send(env, osc_export(osc), resname,
- &oscl->ols_flags, policy,
- &oscl->ols_lvb, osc->oo_oinfo->loi_kms_valid,
+ &oscl->ols_flags, policy, &oscl->ols_lvb,
upcall, cookie, &oscl->ols_einfo, async);
if (result == 0) {
if (osc_lock_is_lockless(oscl)) {
* so init it here with given osc_object.
*/
mdc_set_dom_lock_data(lock, cl2osc(obj));
- RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
+ RETURN(mdc_dlm_canceling(env, lock));
}
static const struct cl_object_operations mdc_ops = {
}
#endif
+static inline void mdc_body2lvb(struct mdt_body *body, struct ost_lvb *lvb)
+{
+ LASSERT(body->mbo_valid & OBD_MD_DOM_SIZE);
+ lvb->lvb_mtime = body->mbo_mtime;
+ lvb->lvb_atime = body->mbo_atime;
+ lvb->lvb_ctime = body->mbo_ctime;
+ lvb->lvb_blocks = body->mbo_dom_blocks;
+ lvb->lvb_size = body->mbo_dom_size;
+}
+
static inline unsigned long hash_x_index(__u64 hash, int hash64)
{
if (BITS_PER_LONG == 32 && hash64)
LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
ldlm_it2str(it->it_op), body->mbo_dom_size);
- rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
+ lock_res_and_lock(lock);
+ mdc_body2lvb(body, &lock->l_ost_lvb);
+ ldlm_lock_allow_match_locked(lock);
+ unlock_res_and_lock(lock);
}
out_lock:
LDLM_LOCK_PUT(lock);
if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
rc = -ETIMEDOUT;
- rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
- &flags, NULL, 0, lockh, rc);
+ rc = ldlm_cli_enqueue_fini(exp, req, einfo, 1, &flags, NULL, 0,
+ lockh, rc);
if (rc < 0) {
CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
exp->exp_obd->obd_name, rc);
} else {
rc = err_serious(-EFAULT);
}
+ } else if (ldesc->l_resource.lr_type == LDLM_IBITS &&
+ ldesc->l_policy_data.l_inodebits.bits == MDS_INODELOCK_DOM) {
+ struct ldlm_reply *rep;
+
+ /* No intent was provided but INTENT flag is set along with
+ * DOM bit, this is considered as GLIMPSE request.
+ * This logic is common for MDT and OST glimpse
+ */
+ mdt_ptlrpc_stats_update(req, IT_GLIMPSE);
+ rc = mdt_glimpse_enqueue(info, ns, lockp, flags);
+ /* Check whether the reply has been packed successfully. */
+ if (req->rq_repmsg != NULL) {
+ rep = req_capsule_server_get(info->mti_pill,
+ &RMF_DLM_REP);
+ rep->lock_policy_res2 =
+ ptlrpc_status_hton(rep->lock_policy_res2);
+ }
} else {
/* No intent was provided */
req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
return rc;
}
-static void mdt_lvb2body(struct ldlm_resource *res, struct mdt_body *mb)
+static void mdt_lvb2reply(struct ldlm_resource *res, struct mdt_body *mb,
+ struct ost_lvb *lvb)
{
struct ost_lvb *res_lvb;
lock_res(res);
res_lvb = res->lr_lvb_data;
- mb->mbo_dom_size = res_lvb->lvb_size;
- mb->mbo_dom_blocks = res_lvb->lvb_blocks;
- mb->mbo_mtime = res_lvb->lvb_mtime;
- mb->mbo_ctime = res_lvb->lvb_ctime;
- mb->mbo_atime = res_lvb->lvb_atime;
-
+ if (lvb)
+ *lvb = *res_lvb;
+
+ if (mb) {
+ mb->mbo_dom_size = res_lvb->lvb_size;
+ mb->mbo_dom_blocks = res_lvb->lvb_blocks;
+ mb->mbo_mtime = res_lvb->lvb_mtime;
+ mb->mbo_ctime = res_lvb->lvb_ctime;
+ mb->mbo_atime = res_lvb->lvb_atime;
+ mb->mbo_valid |= OBD_MD_FLATIME | OBD_MD_FLCTIME |
+ OBD_MD_FLMTIME | OBD_MD_DOM_SIZE;
+ }
CDEBUG(D_DLMTRACE, "size %llu\n", res_lvb->lvb_size);
-
- mb->mbo_valid |= OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME |
- OBD_MD_DOM_SIZE;
unlock_res(res);
}
if (dom_lock || !mdt_dom_lvb_is_valid(res))
mdt_dom_lvbo_update(res, NULL, NULL, false);
- mdt_lvb2body(res, mb);
+ mdt_lvb2reply(res, mb, NULL);
ldlm_resource_putref(res);
RETURN(rc);
}
ldlm_processing_policy policy;
struct ldlm_reply *rep;
struct mdt_body *mbo;
+ struct ost_lvb *lvb;
+ bool old_client = !exp_connect_dom_lvb(mti->mti_exp);
int rc;
ENTRY;
policy = ldlm_get_processing_policy(res);
LASSERT(policy != NULL);
- req_capsule_set_size(mti->mti_pill, &RMF_MDT_MD, RCL_SERVER, 0);
- req_capsule_set_size(mti->mti_pill, &RMF_ACL, RCL_SERVER, 0);
+ if (unlikely(old_client)) {
+ req_capsule_set_size(mti->mti_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+ req_capsule_set_size(mti->mti_pill, &RMF_ACL, RCL_SERVER, 0);
+ } else {
+ req_capsule_set_size(mti->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
+ sizeof(*lvb));
+ }
rc = req_capsule_server_pack(mti->mti_pill);
if (rc)
RETURN(err_serious(rc));
rep = req_capsule_server_get(mti->mti_pill, &RMF_DLM_REP);
- if (rep == NULL)
- RETURN(-EPROTO);
- mbo = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
- if (mbo == NULL)
- RETURN(-EPROTO);
+ if (unlikely(old_client)) {
+ mbo = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
+ LASSERT(mbo);
+ lvb = NULL;
+ } else {
+ lvb = req_capsule_server_get(mti->mti_pill, &RMF_DLM_LVB);
+ LASSERT(lvb);
+ mbo = NULL;
+ }
lock_res(res);
/* Check if this is a resend case (MSG_RESENT is set on RPC) and a
if (rc == -ENOENT) {
/* We are racing with unlink(); just return -ENOENT */
rep->lock_policy_res2 = ptlrpc_status_hton(-ENOENT);
- rc = 0;
} else if (rc == -EINVAL) {
/* this is possible is client lock has been cancelled but
* still exists on server. If that lock was found on server
* as only conflicting lock then the client has already
* size authority and glimpse is not needed. */
CDEBUG(D_DLMTRACE, "Glimpse from the client owning lock\n");
- rc = 0;
} else if (rc < 0) {
RETURN(rc);
}
/* LVB can be without valid data in case of DOM */
if (!mdt_dom_lvb_is_valid(res))
mdt_dom_lvbo_update(res, lock, NULL, false);
- mdt_lvb2body(res, mbo);
+ mdt_lvb2reply(res, mbo, lvb);
+
RETURN(rc);
}
GOTO(out_fail, rc);
mdt_dom_disk_lvbo_update(mti->mti_env, mo, res, false);
}
- mdt_lvb2body(res, mbo);
+ mdt_lvb2reply(res, mbo, NULL);
out_fail:
rep->lock_policy_res2 = clear_serious(rc);
if (rep->lock_policy_res2) {
return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock);
}
+ /* Always prefer DoM LVB data because layout is never returned in
+ * LVB when lock bits are combined with DoM, this is either GETATTR
+ * or OPEN enqueue. Meanwhile GL AST can be issued on such combined
+ * lock bits and it uses LVB for DoM data.
+ */
if (ldlm_has_dom(lock))
return sizeof(struct ost_lvb);
GOTO(out, rc = -ENOMEM);
}
- /* LVB for DoM lock is needed only for glimpse,
- * don't fill DoM data if there is layout lock */
+ /* DOM LVB is used by glimpse and IO completion when
+ * DoM bits is always alone.
+ * If DoM bit is combined with any other bit then it is
+ * intent OPEN or GETATTR lock which is not filling
+ * LVB buffer in reply neither for DoM nor for LAYOUT.
+ */
if (ldlm_has_dom(lock)) {
struct ldlm_resource *res = lock->l_resource;
int lvb_len = sizeof(struct ost_lvb);
if (!mdt_dom_lvb_is_valid(res))
- mdt_dom_lvbo_update(lock->l_resource, lock, NULL, 0);
-
- if (lvb_len > *lvblen)
- lvb_len = *lvblen;
+ mdt_dom_lvbo_update(res, lock, NULL, 0);
+ LASSERT(*lvblen >= lvb_len);
lock_res(res);
memcpy(lvb, res->lr_lvb_data, lvb_len);
unlock_res(res);
-
GOTO(out, rc = lvb_len);
}
"fidmap", /* 0x10000 */
"getattr_pfid", /* 0x20000 */
"lseek", /* 0x40000 */
+ "dom_lvb", /* 0x80000 */
NULL
};
struct ost_lvb *lvb = aa->oa_lvb;
__u32 lvb_len = sizeof(*lvb);
__u64 flags = 0;
+ struct ldlm_enqueue_info einfo = {
+ .ei_type = aa->oa_type,
+ .ei_mode = mode,
+ };
ENTRY;
}
/* Complete obtaining the lock procedure. */
- rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
- aa->oa_mode, aa->oa_flags, lvb, lvb_len,
- lockh, rc);
+ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
+ lvb, lvb_len, lockh, rc);
/* Complete osc stuff. */
rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
aa->oa_flags, aa->oa_speculative, rc);
if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
RETURN(-ENOLCK);
- if (intent) {
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_LDLM_ENQUEUE_LVB);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
- sizeof *lvb);
- ptlrpc_request_set_replen(req);
- }
-
/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
*flags &= ~LDLM_FL_BLOCK_GRANTED;
req->rq_interpret_reply = osc_enqueue_interpret;
ptlrpc_set_add_req(rqset, req);
- } else if (intent) {
- ptlrpc_req_finished(req);
}
RETURN(rc);
}
rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
flags, speculative, rc);
- if (intent)
- ptlrpc_req_finished(req);
RETURN(rc);
}
policy->l_extent.end |= ~PAGE_MASK;
/* Next, search for already existing extent locks that will cover us */
- /* If we're trying to read, we also search for an existing PW lock. The
- * VFS and page cache already protect us locally, so lots of readers/
- * writers can share a single PW lock. */
- rc = mode;
- if (mode == LCK_PR)
- rc |= LCK_PW;
-
rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
- res_id, type, policy, rc, lockh,
+ res_id, type, policy, mode, lockh,
match_flags);
if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
RETURN(rc);
OBD_CONNECT2_GETATTR_PFID);
LASSERTF(OBD_CONNECT2_LSEEK == 0x40000ULL, "found 0x%.16llxULL\n",
OBD_CONNECT2_LSEEK);
+ LASSERTF(OBD_CONNECT2_DOM_LVB == 0x80000ULL, "found 0x%.16llxULL\n",
+ OBD_CONNECT2_DOM_LVB);
LASSERTF(OBD_CKSUM_CRC32 == 0x00000001UL, "found 0x%.8xUL\n",
(unsigned)OBD_CKSUM_CRC32);
LASSERTF(OBD_CKSUM_ADLER == 0x00000002UL, "found 0x%.8xUL\n",
(long long)(int)sizeof(((struct ldlm_inodebits *)0)->try_bits));
LASSERTF((int)offsetof(struct ldlm_inodebits, li_gid) == 16, "found %lld\n",
(long long)(int)offsetof(struct ldlm_inodebits, li_gid));
-
LASSERTF((int)sizeof(((struct ldlm_inodebits *)0)->li_gid) == 8, "found %lld\n",
(long long)(int)sizeof(((struct ldlm_inodebits *)0)->li_gid));
struct qsd_async_args *aa = (struct qsd_async_args *)arg;
struct ldlm_reply *lockrep;
__u64 flags = LDLM_FL_HAS_INTENT;
+ struct ldlm_enqueue_info einfo = {
+ .ei_type = LDLM_PLAIN,
+ .ei_mode = LCK_CR,
+ };
ENTRY;
LASSERT(aa->aa_exp);
req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
- rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
- &flags, (void *)aa->aa_lvb,
- sizeof(struct lquota_lvb), lockh, rc);
+ rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, &einfo, 0, &flags,
+ aa->aa_lvb, sizeof(*(aa->aa_lvb)),
+ lockh, rc);
if (rc < 0) {
/* the lock has been destroyed, forget about the lock handle */
memset(lockh, 0, sizeof(*lockh));
test_sanityn()
{
SANITYN_ONLY=${SANITYN_ONLY:-"1 2 4 5 6 7 8 9 10 11 12 14 17 19 20 \
- 23 27 39 51a 51c 51d 107"}
+ 23 27 39 51a 51c 51d"}
+
+ if [[ $MDS1_VERSION -ge $(version_code 2.13.55) ]]; then
+ SANITYN_ONLY+=" 107"
+ fi
+
SANITYN_REPEAT=${SANITYN_REPEAT:-1}
# XXX: to fix 60
ONLY=$SANITYN_ONLY ONLY_REPEAT=$SANITYN_REPEAT OSC="mdc" DOM="yes" \
CHECK_DEFINE_64X(OBD_CONNECT2_FIDMAP);
CHECK_DEFINE_64X(OBD_CONNECT2_GETATTR_PFID);
CHECK_DEFINE_64X(OBD_CONNECT2_LSEEK);
+ CHECK_DEFINE_64X(OBD_CONNECT2_DOM_LVB);
CHECK_VALUE_X(OBD_CKSUM_CRC32);
CHECK_VALUE_X(OBD_CKSUM_ADLER);
OBD_CONNECT2_GETATTR_PFID);
LASSERTF(OBD_CONNECT2_LSEEK == 0x40000ULL, "found 0x%.16llxULL\n",
OBD_CONNECT2_LSEEK);
+ LASSERTF(OBD_CONNECT2_DOM_LVB == 0x80000ULL, "found 0x%.16llxULL\n",
+ OBD_CONNECT2_DOM_LVB);
LASSERTF(OBD_CKSUM_CRC32 == 0x00000001UL, "found 0x%.8xUL\n",
(unsigned)OBD_CKSUM_CRC32);
LASSERTF(OBD_CKSUM_ADLER == 0x00000002UL, "found 0x%.8xUL\n",
LASSERTF((int)sizeof(((struct ldlm_inodebits *)0)->li_gid) == 8, "found %lld\n",
(long long)(int)sizeof(((struct ldlm_inodebits *)0)->li_gid));
-
/* Checks for struct ldlm_flock_wire */
LASSERTF((int)sizeof(struct ldlm_flock_wire) == 32, "found %lld\n",
(long long)(int)sizeof(struct ldlm_flock_wire));