Whamcloud - gitweb
LU-925 agl: async glimpse lock process in CLIO stack
[fs/lustre-release.git] / lustre / osc / osc_request.c
index f4f2c7a..1f25a97 100644 (file)
@@ -3178,7 +3178,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
 
 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                             obd_enqueue_update_f upcall, void *cookie,
-                            int *flags, int rc)
+                            int *flags, int agl, int rc)
 {
         int intent = *flags & LDLM_FL_HAS_INTENT;
         ENTRY;
@@ -3196,7 +3196,8 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                 }
         }
 
-        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
+        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
+            (rc == 0)) {
                 *flags |= LDLM_FL_LVB_READY;
                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
@@ -3214,6 +3215,9 @@ static int osc_enqueue_interpret(const struct lu_env *env,
         struct ldlm_lock *lock;
         struct lustre_handle handle;
         __u32 mode;
+        struct ost_lvb *lvb;
+        __u32 lvb_len;
+        int *flags = aa->oa_flags;
 
         /* Make a local copy of a lock handle and a mode, because aa->oa_*
          * might be freed anytime after lock upcall has been called. */
@@ -3233,13 +3237,20 @@ static int osc_enqueue_interpret(const struct lu_env *env,
         /* Let CP AST to grant the lock first. */
         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 
+        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
+                lvb = NULL;
+                lvb_len = 0;
+        } else {
+                lvb = aa->oa_lvb;
+                lvb_len = sizeof(*aa->oa_lvb);
+        }
+
         /* Complete obtaining the lock procedure. */
         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
-                                   mode, aa->oa_flags, aa->oa_lvb,
-                                   sizeof(*aa->oa_lvb), &handle, rc);
+                                   mode, flags, lvb, lvb_len, &handle, rc);
         /* Complete osc stuff. */
-        rc = osc_enqueue_fini(req, aa->oa_lvb,
-                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
+                              flags, aa->oa_agl, rc);
 
         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
@@ -3263,8 +3274,9 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                         struct lov_oinfo *loi, int flags,
                         struct ost_lvb *lvb, __u32 mode, int rc)
 {
+        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+
         if (rc == ELDLM_OK) {
-                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
                 __u64 tmp;
 
                 LASSERT(lock != NULL);
@@ -3285,13 +3297,21 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                                    lock->l_policy_data.l_extent.end);
                 }
                 ldlm_lock_allow_match(lock);
-                LDLM_LOCK_PUT(lock);
         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
+                LASSERT(lock != NULL);
                 loi->loi_lvb = *lvb;
+                ldlm_lock_allow_match(lock);
                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                 rc = ELDLM_OK;
         }
+
+        if (lock != NULL) {
+                if (rc != ELDLM_OK)
+                        ldlm_lock_fail_match(lock, rc);
+
+                LDLM_LOCK_PUT(lock);
+        }
 }
 EXPORT_SYMBOL(osc_update_enqueue);
 
@@ -3310,11 +3330,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ldlm_enqueue_info *einfo,
                      struct lustre_handle *lockh,
-                     struct ptlrpc_request_set *rqset, int async)
+                     struct ptlrpc_request_set *rqset, int async, int agl)
 {
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req = NULL;
         int intent = *flags & LDLM_FL_HAS_INTENT;
+        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
         ldlm_mode_t mode;
         int rc;
         ENTRY;
@@ -3348,13 +3369,20 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
         mode = einfo->ei_mode;
         if (einfo->ei_mode == LCK_PR)
                 mode |= LCK_PW;
-        mode = ldlm_lock_match(obd->obd_namespace,
-                               *flags | LDLM_FL_LVB_READY, res_id,
+        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                                einfo->ei_type, policy, mode, lockh, 0);
         if (mode) {
                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
 
-                if (osc_set_lock_data_with_check(matched, einfo)) {
+                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
+                        /* For AGL, if enqueue RPC is sent but the lock is not
+                         * granted, then skip to process this strpe.
+                         * Return -ECANCELED to tell the caller. */
+                        ldlm_lock_decref(lockh, mode);
+                        LDLM_LOCK_PUT(matched);
+                        RETURN(-ECANCELED);
+                } else if (osc_set_lock_data_with_check(matched, einfo)) {
+                        *flags |= LDLM_FL_LVB_READY;
                         /* addref the lock only if not async requests and PW
                          * lock is matched whereas we asked for PR. */
                         if (!rqset && einfo->ei_mode != mode)
@@ -3368,16 +3396,17 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                         /* We already have a lock, and it's referenced */
                         (*upcall)(cookie, ELDLM_OK);
 
-                        /* For async requests, decref the lock. */
                         if (einfo->ei_mode != mode)
                                 ldlm_lock_decref(lockh, LCK_PW);
                         else if (rqset)
+                                /* For async requests, decref the lock. */
                                 ldlm_lock_decref(lockh, einfo->ei_mode);
                         LDLM_LOCK_PUT(matched);
                         RETURN(ELDLM_OK);
-                } else
+                } else {
                         ldlm_lock_decref(lockh, mode);
-                LDLM_LOCK_PUT(matched);
+                        LDLM_LOCK_PUT(matched);
+                }
         }
 
  no_match:
@@ -3416,6 +3445,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                         aa->oa_cookie = cookie;
                         aa->oa_lvb    = lvb;
                         aa->oa_lockh  = lockh;
+                        aa->oa_agl    = !!agl;
 
                         req->rq_interpret_reply =
                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
@@ -3429,7 +3459,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                 RETURN(rc);
         }
 
-        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
+        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
         if (intent)
                 ptlrpc_req_finished(req);
 
@@ -3451,7 +3481,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
-                              rqset, rqset != NULL);
+                              rqset, rqset != NULL, 0);
         RETURN(rc);
 }