Whamcloud - gitweb
LU-10664 dom: non-blocking enqueue for DOM locks
[fs/lustre-release.git] / lustre / mdc / mdc_dev.c
index cbe0201..75d44ba 100644 (file)
@@ -106,7 +106,6 @@ int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp,
 
        rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
                             res_id, type, policy, mode, lockh, match_flags);
-
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);
 
@@ -302,9 +301,8 @@ void mdc_lock_lockless_cancel(const struct lu_env *env,
  * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
  * and ldlm_lock caches.
  */
-static int mdc_dlm_blocking_ast0(const struct lu_env *env,
-                                struct ldlm_lock *dlmlock,
-                                int flag)
+static int mdc_dlm_canceling(const struct lu_env *env,
+                            struct ldlm_lock *dlmlock)
 {
        struct cl_object *obj = NULL;
        int result = 0;
@@ -313,11 +311,8 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env,
 
        ENTRY;
 
-       LASSERT(flag == LDLM_CB_CANCELING);
-       LASSERT(dlmlock != NULL);
-
        lock_res_and_lock(dlmlock);
-       if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
+       if (!ldlm_is_granted(dlmlock)) {
                dlmlock->l_ast_data = NULL;
                unlock_res_and_lock(dlmlock);
                RETURN(0);
@@ -358,13 +353,13 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env,
 }
 
 int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
-                         struct ldlm_lock_desc *new, void *data, int flag)
+                         struct ldlm_lock_desc *new, void *data, int reason)
 {
        int rc = 0;
 
        ENTRY;
 
-       switch (flag) {
+       switch (reason) {
        case LDLM_CB_BLOCKING: {
                struct lustre_handle lockh;
 
@@ -395,7 +390,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
                        break;
                }
 
-               rc = mdc_dlm_blocking_ast0(env, dlmlock, flag);
+               rc = mdc_dlm_canceling(env, dlmlock);
                cl_env_put(env, &refcheck);
                break;
        }
@@ -443,6 +438,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
                        attr->cat_kms = size;
                        setkms = 1;
                }
+               ldlm_lock_allow_match_locked(dlmlock);
        }
 
        /* The size should not be less than the kms */
@@ -492,7 +488,7 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 
        /* Lock must have been granted. */
        lock_res_and_lock(dlmlock);
-       if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
+       if (ldlm_is_granted(dlmlock)) {
                struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
 
                /* extend the lock extent, otherwise it will have problem when
@@ -518,7 +514,7 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 
 /**
  * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
- * received from a server, or after osc_enqueue_base() matched a local DLM
+ * received from a server, or after mdc_enqueue_send() matched a local DLM
  * lock.
  */
 static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh,
@@ -576,53 +572,66 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh,
        RETURN(rc);
 }
 
+/* This is needed only for old servers (before 2.14) support */
 int mdc_fill_lvb(struct ptlrpc_request *req, struct ost_lvb *lvb)
 {
        struct mdt_body *body;
 
+       /* get LVB data from mdt_body otherwise */
        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
        if (!body)
                RETURN(-EPROTO);
 
-       lvb->lvb_mtime = body->mbo_mtime;
-       lvb->lvb_atime = body->mbo_atime;
-       lvb->lvb_ctime = body->mbo_ctime;
-       lvb->lvb_blocks = body->mbo_dom_blocks;
-       lvb->lvb_size = body->mbo_dom_size;
+       if (!(body->mbo_valid & OBD_MD_DOM_SIZE))
+               RETURN(-EPROTO);
 
+       mdc_body2lvb(body, lvb);
        RETURN(0);
 }
 
-int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
-                    void *cookie, struct lustre_handle *lockh,
-                    enum ldlm_mode mode, __u64 *flags, int errcode)
+int mdc_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
+                    osc_enqueue_upcall_f upcall, void *cookie,
+                    struct lustre_handle *lockh, enum ldlm_mode mode,
+                    __u64 *flags, int errcode)
 {
        struct osc_lock *ols = cookie;
-       struct ldlm_lock *lock;
+       bool glimpse = *flags & LDLM_FL_HAS_INTENT;
        int rc = 0;
 
        ENTRY;
 
-       /* The request was created before ldlm_cli_enqueue call. */
-       if (errcode == ELDLM_LOCK_ABORTED) {
+       /* needed only for glimpse from an old server (< 2.14) */
+       if (glimpse && !exp_connect_dom_lvb(exp))
+               rc = mdc_fill_lvb(req, &ols->ols_lvb);
+
+       if (glimpse && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;
 
                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-               LASSERT(rep != NULL);
-
-               rep->lock_policy_res2 =
-                       ptlrpc_status_ntoh(rep->lock_policy_res2);
-               if (rep->lock_policy_res2)
-                       errcode = rep->lock_policy_res2;
-
-               rc = mdc_fill_lvb(req, &ols->ols_lvb);
+               if (likely(rep)) {
+                       rep->lock_policy_res2 =
+                               ptlrpc_status_ntoh(rep->lock_policy_res2);
+                       if (rep->lock_policy_res2)
+                               errcode = rep->lock_policy_res2;
+               } else {
+                       rc = -EPROTO;
+               }
                *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
+               struct ldlm_lock *lock;
+
                /* Callers have references, should be valid always */
                lock = ldlm_handle2lock(lockh);
-               LASSERT(lock);
 
-               rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
+               /* At this point ols_lvb must be filled with correct LVB either
+                * by mdc_fill_lvb() above or by ldlm_cli_enqueue_fini().
+                * DoM uses l_ost_lvb to store LVB data, so copy it here from
+                * just updated ols_lvb.
+                */
+               lock_res_and_lock(lock);
+               memcpy(&lock->l_ost_lvb, &ols->ols_lvb,
+                      sizeof(lock->l_ost_lvb));
+               unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                *flags |= LDLM_FL_LVB_READY;
        }
@@ -646,6 +655,10 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
+       struct ldlm_enqueue_info einfo = {
+               .ei_type = aa->oa_type,
+               .ei_mode = mode,
+       };
 
        ENTRY;
 
@@ -661,7 +674,8 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
-        * osc_enqueue_fini(). */
+        * mdc_enqueue_fini().
+        */
        ldlm_lock_addref(lockh, mode);
 
        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
@@ -671,12 +685,12 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 
        /* Complete obtaining the lock procedure. */
-       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
-                                  aa->oa_mode, aa->oa_flags, NULL, 0,
-                                  lockh, rc);
+       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
+                                  aa->oa_lvb, aa->oa_lvb ?
+                                  sizeof(*aa->oa_lvb) : 0, lockh, rc);
        /* Complete mdc stuff. */
-       rc = mdc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
-                             aa->oa_flags, rc);
+       rc = mdc_enqueue_fini(aa->oa_exp, req, aa->oa_upcall, aa->oa_cookie,
+                             lockh, mode, aa->oa_flags, rc);
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
@@ -694,8 +708,7 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
  * release locks just after they are obtained. */
 int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                     struct ldlm_res_id *res_id, __u64 *flags,
-                    union ldlm_policy_data *policy,
-                    struct ost_lvb *lvb, int kms_valid,
+                    union ldlm_policy_data *policy, struct ost_lvb *lvb,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo, int async)
 {
@@ -708,6 +721,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        __u64 match_flags = *flags;
        LIST_HEAD(cancels);
        int rc, count;
+       int lvb_size;
+       bool compat_glimpse = glimpse && !exp_connect_dom_lvb(exp);
 
        ENTRY;
 
@@ -715,12 +730,9 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
 
+       match_flags |= LDLM_FL_LVB_READY;
        if (glimpse)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
-       /* DOM locking uses LDLM_FL_KMS_IGNORE to mark locks wich have no valid
-        * LVB information, e.g. canceled locks or locks of just pruned object,
-        * such locks should be skipped.
-        */
        mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
                               einfo->ei_type, policy, mode, &lockh);
        if (mode) {
@@ -751,7 +763,9 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);
 
-       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT);
+       /* Glimpse is intent on old server */
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), compat_glimpse ?
+                                  &RQF_LDLM_INTENT : &RQF_LDLM_ENQUEUE);
        if (req == NULL)
                RETURN(-ENOMEM);
 
@@ -769,20 +783,28 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                RETURN(rc);
        }
 
-       /* pack the intent */
-       lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
-       lit->opc = glimpse ? IT_GLIMPSE : IT_BRW;
-
-       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0);
-       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
-       ptlrpc_request_set_replen(req);
+       if (compat_glimpse) {
+               /* pack the glimpse intent */
+               lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+               lit->opc = IT_GLIMPSE;
+       }
 
        /* users of mdc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;
-       /* All MDC IO locks are intents */
-       *flags |= LDLM_FL_HAS_INTENT;
-       rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, NULL,
-                             0, LVB_T_NONE, &lockh, async);
+
+       if (compat_glimpse) {
+               req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+               req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
+               lvb_size = 0;
+       } else {
+               lvb_size = sizeof(*lvb);
+               req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                    lvb_size);
+       }
+       ptlrpc_request_set_replen(req);
+
+       rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
+                             lvb_size, LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
@@ -796,7 +818,7 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                        aa->oa_cookie = cookie;
                        aa->oa_speculative = false;
                        aa->oa_flags = flags;
-                       aa->oa_lvb = lvb;
+                       aa->oa_lvb = compat_glimpse ? NULL : lvb;
 
                        req->rq_interpret_reply = mdc_enqueue_interpret;
                        ptlrpcd_add_req(req);
@@ -806,7 +828,7 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                RETURN(rc);
        }
 
-       rc = mdc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
+       rc = mdc_enqueue_fini(exp, req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, rc);
        ptlrpc_req_finished(req);
        RETURN(rc);
@@ -893,8 +915,7 @@ enqueue_base:
        mdc_lock_build_policy(env, lock, policy);
        LASSERT(!oscl->ols_speculative);
        result = mdc_enqueue_send(env, osc_export(osc), resname,
-                                 &oscl->ols_flags, policy,
-                                 &oscl->ols_lvb, osc->oo_oinfo->loi_kms_valid,
+                                 &oscl->ols_flags, policy, &oscl->ols_lvb,
                                  upcall, cookie, &oscl->ols_einfo, async);
        if (result == 0) {
                if (osc_lock_is_lockless(oscl)) {
@@ -1458,7 +1479,7 @@ static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
         * so init it here with given osc_object.
         */
        mdc_set_dom_lock_data(lock, cl2osc(obj));
-       RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
+       RETURN(mdc_dlm_canceling(env, lock));
 }
 
 static const struct cl_object_operations mdc_ops = {