Whamcloud - gitweb
LU-9504 ptlrpc: REP-ACK hr may race with trans commit 07/27207/9
authorLai Siyao <lai.siyao@intel.com>
Thu, 18 May 2017 16:27:31 +0000 (00:27 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 19 Jun 2017 16:27:40 +0000 (16:27 +0000)
REP-ACK hr may race with transaction commit, and the latter will
release saved locks, so in REP-ACK hr we need to get locks early to
convert them to COS mode safely.

But the locks got may be decrefed and canceled, in this case it
can't be converted to COS mode, remove an assert in
ldlm_lock_downgrade() for this.

Also protect mdt_steal_ack_locks() with rs_lock because it may also
race with REP-ACK hr. And move ldlm_lock_decref() outside of locks
because it may sleep.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: Ia9a3ba6a83689c0552ae8aaf2eb735c3f06b62e2
Reviewed-on: https://review.whamcloud.com/27207
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/ldlm/ldlm_lock.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_recovery.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/service.c

index 034ef30..7708e27 100644 (file)
@@ -2534,8 +2534,10 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
 /**
  * Downgrade an exclusive lock.
  *
- * A fast variant of ldlm_lock_convert for convertion of exclusive
- * locks. The convertion is always successful.
+ * A fast variant of ldlm_lock_convert for convertion of exclusive locks. The
+ * convertion may fail if lock was canceled before downgrade, but it doesn't
+ * indicate any problem, because such lock has no reader or writer, and will
+ * be released soon.
  * Used by Commit on Sharing (COS) code.
  *
  * \param lock A lock to convert
@@ -2543,25 +2545,34 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
  */
 void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 {
-        ENTRY;
+       ENTRY;
 
-        LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
-        LASSERT(new_mode == LCK_COS);
+       LASSERT(new_mode == LCK_COS);
 
-        lock_res_and_lock(lock);
-        ldlm_resource_unlink_lock(lock);
-        /*
-         * Remove the lock from pool as it will be added again in
-         * ldlm_grant_lock() called below.
-         */
-        ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+       lock_res_and_lock(lock);
 
-        lock->l_req_mode = new_mode;
-        ldlm_grant_lock(lock, NULL);
-        unlock_res_and_lock(lock);
-        ldlm_reprocess_all(lock->l_resource);
+       if (!(lock->l_granted_mode & (LCK_PW | LCK_EX))) {
+               unlock_res_and_lock(lock);
 
-        EXIT;
+               LASSERT(lock->l_granted_mode == LCK_MINMODE);
+               LDLM_DEBUG(lock, "lock was canceled before downgrade");
+               RETURN_EXIT;
+       }
+
+       ldlm_resource_unlink_lock(lock);
+       /*
+        * Remove the lock from pool as it will be added again in
+        * ldlm_grant_lock() called below.
+        */
+       ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+       lock->l_req_mode = new_mode;
+       ldlm_grant_lock(lock, NULL);
+
+       unlock_res_and_lock(lock);
+
+       ldlm_reprocess_all(lock->l_resource);
+
+       EXIT;
 }
 EXPORT_SYMBOL(ldlm_lock_downgrade);
 
index 1b58617..6035012 100644 (file)
@@ -2893,8 +2893,11 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                                        ldlm_lock_downgrade(lock, LCK_COS);
                                        mode = LCK_COS;
                                }
-                               ptlrpc_save_lock(req, h, mode, cos,
-                                                convert_lock);
+                               if (req->rq_export->exp_disconnected)
+                                       mdt_fid_unlock(h, mode);
+                               else
+                                       ptlrpc_save_lock(req, h, mode, cos,
+                                                        convert_lock);
                        } else {
                                mdt_fid_unlock(h, mode);
                        }
index 15fdd61..ed61a2e 100644 (file)
@@ -126,52 +126,62 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt)
 static void mdt_steal_ack_locks(struct ptlrpc_request *req)
 {
        struct ptlrpc_service_part *svcpt;
-       struct obd_export          *exp = req->rq_export;
-       struct list_head           *tmp;
-       struct ptlrpc_reply_state  *oldrep;
-       int                         i;
+       struct obd_export *exp = req->rq_export;
+       struct list_head *tmp;
+       struct ptlrpc_reply_state *rs;
+       int i;
 
        /* CAVEAT EMPTOR: spinlock order */
        spin_lock(&exp->exp_lock);
        list_for_each(tmp, &exp->exp_outstanding_replies) {
-               oldrep = list_entry(tmp, struct ptlrpc_reply_state,
+               rs = list_entry(tmp, struct ptlrpc_reply_state,
                                    rs_exp_list);
 
-               if (oldrep->rs_xid != req->rq_xid)
+               if (rs->rs_xid != req->rq_xid)
                        continue;
 
-               if (oldrep->rs_opc != lustre_msg_get_opc(req->rq_reqmsg))
+               if (rs->rs_opc != lustre_msg_get_opc(req->rq_reqmsg))
                        CERROR("%s: Resent req xid %llu has mismatched opc: "
                               "new %d old %d\n", exp->exp_obd->obd_name,
                               req->rq_xid, lustre_msg_get_opc(req->rq_reqmsg),
-                              oldrep->rs_opc);
-
-               svcpt = oldrep->rs_svcpt;
-               spin_lock(&svcpt->scp_rep_lock);
+                              rs->rs_opc);
 
-               list_del_init(&oldrep->rs_exp_list);
+               svcpt = rs->rs_svcpt;
 
                CDEBUG(D_HA, "Stealing %d locks from rs %p x%lld.t%lld"
                       " o%d NID %s\n",
-                      oldrep->rs_nlocks, oldrep,
-                      oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_opc,
+                      rs->rs_nlocks, rs,
+                      rs->rs_xid, rs->rs_transno, rs->rs_opc,
                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
 
-               for (i = 0; i < oldrep->rs_nlocks; i++)
-                       ptlrpc_save_lock(req, &oldrep->rs_locks[i],
-                                        oldrep->rs_modes[i], oldrep->rs_no_ack,
-                                        oldrep->rs_convert_lock);
-               oldrep->rs_nlocks = 0;
+               spin_lock(&svcpt->scp_rep_lock);
+               list_del_init(&rs->rs_exp_list);
+
+               spin_lock(&rs->rs_lock);
+               for (i = 0; i < rs->rs_nlocks; i++)
+                       ptlrpc_save_lock(req, &rs->rs_locks[i],
+                                        rs->rs_modes[i], rs->rs_no_ack,
+                                        rs->rs_convert_lock);
+               rs->rs_nlocks = 0;
 
                DEBUG_REQ(D_HA, req, "stole locks for");
-               spin_lock(&oldrep->rs_lock);
-               ptlrpc_schedule_difficult_reply(oldrep);
-               spin_unlock(&oldrep->rs_lock);
+               ptlrpc_schedule_difficult_reply(rs);
+               spin_unlock(&rs->rs_lock);
 
                spin_unlock(&svcpt->scp_rep_lock);
                break;
        }
        spin_unlock(&exp->exp_lock);
+
+       /* if exp_disconnected, decref stolen locks */
+       if (exp->exp_disconnected) {
+               rs = req->rq_reply_state;
+
+               for (i = 0; i < rs->rs_nlocks; i++)
+                       ldlm_lock_decref(&rs->rs_locks[i], rs->rs_modes[i]);
+
+               rs->rs_nlocks = 0;
+       }
 }
 
 __u64 mdt_req_from_lrd(struct ptlrpc_request *req,
index e7b04f0..d720645 100644 (file)
@@ -2525,6 +2525,7 @@ int req_capsule_server_grow(struct req_capsule *pill,
 
                 nrs->rs_difficult = 1;
                 nrs->rs_no_ack = rs->rs_no_ack;
+               nrs->rs_convert_lock = rs->rs_convert_lock;
                 for (i = 0; i < rs->rs_nlocks; i++) {
                         nrs->rs_locks[i] = rs->rs_locks[i];
                         nrs->rs_modes[i] = rs->rs_modes[i];
index 04a9147..5daa2ec 100644 (file)
@@ -185,16 +185,12 @@ ptlrpc_save_lock(struct ptlrpc_request *req, struct lustre_handle *lock,
        LASSERT(rs != NULL);
        LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
 
-       if (req->rq_export->exp_disconnected) {
-               ldlm_lock_decref(lock, mode);
-       } else {
-               idx = rs->rs_nlocks++;
-               rs->rs_locks[idx] = *lock;
-               rs->rs_modes[idx] = mode;
-               rs->rs_difficult = 1;
-               rs->rs_no_ack = no_ack;
-               rs->rs_convert_lock = convert_lock;
-       }
+       idx = rs->rs_nlocks++;
+       rs->rs_locks[idx] = *lock;
+       rs->rs_modes[idx] = mode;
+       rs->rs_difficult = 1;
+       rs->rs_no_ack = no_ack;
+       rs->rs_convert_lock = convert_lock;
 }
 EXPORT_SYMBOL(ptlrpc_save_lock);
 
@@ -2208,13 +2204,28 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
                if (rs->rs_convert_lock &&
                    rs->rs_transno > exp->exp_last_committed) {
                        struct ldlm_lock *lock;
+                       struct ldlm_lock *ack_locks[RS_MAX_LOCKS] = { NULL };
 
                        spin_lock(&rs->rs_lock);
                        if (rs->rs_convert_lock &&
                            rs->rs_transno > exp->exp_last_committed) {
                                nlocks = rs->rs_nlocks;
-                               while (nlocks-- > 0)
+                               while (nlocks-- > 0) {
+                                       /*
+                                        * NB don't assume rs is always handled
+                                        * by the same service thread (see
+                                        * ptlrpc_hr_select, so REP-ACK hr may
+                                        * race with trans commit, while the
+                                        * latter will release locks, get locks
+                                        * here early to convert to COS mode
+                                        * safely.
+                                        */
+                                       lock = ldlm_handle2lock(
+                                                       &rs->rs_locks[nlocks]);
+                                       LASSERT(lock);
+                                       ack_locks[nlocks] = lock;
                                        rs->rs_modes[nlocks] = LCK_COS;
+                               }
                                nlocks = rs->rs_nlocks;
                                rs->rs_convert_lock = 0;
                                /* clear rs_scheduled so that commit callback
@@ -2223,9 +2234,7 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
                                spin_unlock(&rs->rs_lock);
 
                                while (nlocks-- > 0) {
-                                       lock = ldlm_handle2lock(
-                                                       &rs->rs_locks[nlocks]);
-                                       LASSERT(lock != NULL);
+                                       lock = ack_locks[nlocks];
                                        ldlm_lock_downgrade(lock, LCK_COS);
                                        LDLM_LOCK_PUT(lock);
                                }