Whamcloud - gitweb
LU-8650 mdt: enable REP-ACK for DNE 07/22807/4
authorLai Siyao <lai.siyao@intel.com>
Thu, 29 Sep 2016 12:48:47 +0000 (20:48 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 9 May 2017 03:44:14 +0000 (03:44 +0000)
LU-7903 reveals that REP-ACK is disabled in 2.8, this was
introduced in LU-3538 http://review.whamcloud.com/12530
which is to support DNE Commit-on-Sharing, but it disabled
REP-ACK, while Commit-on-Sharing doesn't take effect for
local operations (operation which involves only one MDT)
either, this may cause single MDT recovery fail.

To fix this, we need to enable REP-ACK, and also make sure
http://review.whamcloud.com/12530 work as designed:
1. save local locks upon unlock as before, but don't convert
   locks into COS mode.
2. reply_out_callback() wakes up ptlrpc_handle_rs(), if
   reply was committed, decref locks like before.
3. otherwise for uncommitted reply convert locks to COS mode,
   and later when it's committed, ptlrpc_commit_replies()
   wakes up ptlrpc_handle_rs() again, which will decref locks
   like before.

In short, local locks will be converted into COS mode upon
REP-ACK, and transaction commit will decref locks.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: Id21681017573b50e071dd8b5a4d65489843781a1
Reviewed-on: https://review.whamcloud.com/22807
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lib.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_recovery.c
lustre/ptlrpc/service.c

index 1619459..6cc0c91 100644 (file)
@@ -668,6 +668,8 @@ struct ptlrpc_reply_state {
         unsigned long          rs_committed:1;/* the transaction was committed
                                                  and the rs was dispatched
                                                  by ptlrpc_commit_replies */
+       unsigned long           rs_convert_lock:1; /* need to convert saved
+                                                   * locks to COS mode */
        atomic_t                rs_refcount;    /* number of users */
        /** Number of locks awaiting client ACK */
        int                     rs_nlocks;
@@ -2250,8 +2252,8 @@ struct ptlrpc_service_conf {
  *
  * @{
  */
-void ptlrpc_save_lock(struct ptlrpc_request *req,
-                      struct lustre_handle *lock, int mode, int no_ack);
+void ptlrpc_save_lock(struct ptlrpc_request *req, struct lustre_handle *lock,
+                     int mode, bool no_ack, bool convert_lock);
 void ptlrpc_commit_replies(struct obd_export *exp);
 void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs);
 void ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs);
index 268ae92..66a88d4 100644 (file)
@@ -716,7 +716,10 @@ int server_disconnect_export(struct obd_export *exp)
                spin_lock(&svcpt->scp_rep_lock);
 
                list_del_init(&rs->rs_exp_list);
+
                spin_lock(&rs->rs_lock);
+               /* clear rs_convert_lock to make sure rs is handled and put */
+               rs->rs_convert_lock = 0;
                ptlrpc_schedule_difficult_reply(rs);
                spin_unlock(&rs->rs_lock);
 
index aa586ec..d2e1367 100644 (file)
@@ -2387,49 +2387,66 @@ static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
  * \see ldlm_blocking_ast_nocheck
  */
 int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, int flag)
+                    void *data, int flag)
 {
-        struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
-        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
-        int rc;
-        ENTRY;
+       struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       bool commit_async = false;
+       int rc;
+       ENTRY;
 
-        if (flag == LDLM_CB_CANCELING)
-                RETURN(0);
+       if (flag == LDLM_CB_CANCELING)
+               RETURN(0);
 
-        lock_res_and_lock(lock);
-        if (lock->l_blocking_ast != mdt_blocking_ast) {
-                unlock_res_and_lock(lock);
-                RETURN(0);
-        }
+       lock_res_and_lock(lock);
+       if (lock->l_blocking_ast != mdt_blocking_ast) {
+               unlock_res_and_lock(lock);
+               RETURN(0);
+       }
+       /* There is no lock conflict if l_blocking_lock == NULL,
+        * it indicates a blocking ast sent from ldlm_lock_decref_internal
+        * when the last reference to a local lock was released */
        if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
            lock->l_blocking_lock != NULL) {
-               if (mdt_cos_is_enabled(mdt) &&
-                   lock->l_client_cookie !=
-                   lock->l_blocking_lock->l_client_cookie)
-                       mdt_set_lock_sync(lock);
-               else if (mdt_slc_is_enabled(mdt) &&
-                        ldlm_is_cos_incompat(lock->l_blocking_lock))
+               if (mdt_cos_is_enabled(mdt)) {
+                       if (lock->l_client_cookie !=
+                           lock->l_blocking_lock->l_client_cookie)
+                               mdt_set_lock_sync(lock);
+               else if (mdt_slc_is_enabled(mdt) &&
+                          ldlm_is_cos_incompat(lock->l_blocking_lock)) {
                        mdt_set_lock_sync(lock);
+                       /*
+                        * we may do extra commit here, but there is a small
+                        * window to miss a commit: lock was unlocked (saved),
+                        * then a conflict lock queued and we come here, but
+                        * REP-ACK not received, so lock was not converted to
+                        * COS mode yet.
+                        * Fortunately this window is quite small, so the
+                        * extra commit should be rare (not to say distributed
+                        * operation is rare too).
+                        */
+                       commit_async = true;
+               }
+       } else if (lock->l_req_mode == LCK_COS &&
+                  lock->l_blocking_lock != NULL) {
+               commit_async = true;
        }
-        rc = ldlm_blocking_ast_nocheck(lock);
 
-        /* There is no lock conflict if l_blocking_lock == NULL,
-         * it indicates a blocking ast sent from ldlm_lock_decref_internal
-         * when the last reference to a local lock was released */
-        if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
-                struct lu_env env;
+       rc = ldlm_blocking_ast_nocheck(lock);
+
+       if (commit_async) {
+               struct lu_env env;
 
                rc = lu_env_init(&env, LCT_LOCAL);
                if (unlikely(rc != 0))
                        CWARN("%s: lu_env initialization failed, cannot "
                              "start asynchronous commit: rc = %d\n",
                              obd->obd_name, rc);
-                else
-                        mdt_device_commit_async(&env, mdt);
-                lu_env_fini(&env);
-        }
-        RETURN(rc);
+               else
+                       mdt_device_commit_async(&env, mdt);
+               lu_env_fini(&env);
+       }
+       RETURN(rc);
 }
 
 /*
@@ -2811,10 +2828,8 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                        struct mdt_device *mdt = info->mti_mdt;
                        struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
-                       int cos;
-
-                       cos = (mdt_cos_is_enabled(mdt) ||
-                              mdt_slc_is_enabled(mdt));
+                       bool cos = mdt_cos_is_enabled(mdt);
+                       bool convert_lock = !cos && mdt_slc_is_enabled(mdt);
 
                        LASSERTF(lock != NULL, "no lock for cookie %#llx\n",
                                 h->cookie);
@@ -2822,14 +2837,15 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                        /* there is no request if mdt_object_unlock() is called
                         * from mdt_export_cleanup()->mdt_add_dirty_flag() */
                        if (likely(req != NULL)) {
-                               CDEBUG(D_HA, "request = %p reply state = %p"
-                                      " transno = %lld\n", req,
-                                      req->rq_reply_state, req->rq_transno);
+                               LDLM_DEBUG(lock, "save lock request %p reply "
+                                       "state %p transno %lld\n", req,
+                                       req->rq_reply_state, req->rq_transno);
                                if (cos) {
                                        ldlm_lock_downgrade(lock, LCK_COS);
                                        mode = LCK_COS;
                                }
-                               ptlrpc_save_lock(req, h, mode, cos);
+                               ptlrpc_save_lock(req, h, mode, cos,
+                                                convert_lock);
                        } else {
                                mdt_fid_unlock(h, mode);
                        }
index cc55a08..15fdd61 100644 (file)
@@ -157,12 +157,13 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                       oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_opc,
                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
 
-                for (i = 0; i < oldrep->rs_nlocks; i++)
-                        ptlrpc_save_lock(req, &oldrep->rs_locks[i],
-                                         oldrep->rs_modes[i], 0);
-                oldrep->rs_nlocks = 0;
+               for (i = 0; i < oldrep->rs_nlocks; i++)
+                       ptlrpc_save_lock(req, &oldrep->rs_locks[i],
+                                        oldrep->rs_modes[i], oldrep->rs_no_ack,
+                                        oldrep->rs_convert_lock);
+               oldrep->rs_nlocks = 0;
 
-                DEBUG_REQ(D_HA, req, "stole locks for");
+               DEBUG_REQ(D_HA, req, "stole locks for");
                spin_lock(&oldrep->rs_lock);
                ptlrpc_schedule_difficult_reply(oldrep);
                spin_unlock(&oldrep->rs_lock);
index a23ffe4..221974e 100644 (file)
@@ -176,24 +176,25 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
  * Puts a lock and its mode into reply state assotiated to request reply.
  */
 void
-ptlrpc_save_lock(struct ptlrpc_request *req,
-                 struct lustre_handle *lock, int mode, int no_ack)
+ptlrpc_save_lock(struct ptlrpc_request *req, struct lustre_handle *lock,
+                int mode, bool no_ack, bool convert_lock)
 {
-        struct ptlrpc_reply_state *rs = req->rq_reply_state;
-        int                        idx;
+       struct ptlrpc_reply_state *rs = req->rq_reply_state;
+       int idx;
 
-        LASSERT(rs != NULL);
-        LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
+       LASSERT(rs != NULL);
+       LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
 
-        if (req->rq_export->exp_disconnected) {
-                ldlm_lock_decref(lock, mode);
-        } else {
-                idx = rs->rs_nlocks++;
-                rs->rs_locks[idx] = *lock;
-                rs->rs_modes[idx] = mode;
-                rs->rs_difficult = 1;
-                rs->rs_no_ack = !!no_ack;
-        }
+       if (req->rq_export->exp_disconnected) {
+               ldlm_lock_decref(lock, mode);
+       } else {
+               idx = rs->rs_nlocks++;
+               rs->rs_locks[idx] = *lock;
+               rs->rs_modes[idx] = mode;
+               rs->rs_difficult = 1;
+               rs->rs_no_ack = no_ack;
+               rs->rs_convert_lock = convert_lock;
+       }
 }
 EXPORT_SYMBOL(ptlrpc_save_lock);
 
@@ -204,7 +205,7 @@ struct ptlrpc_hr_thread {
        int                             hrt_id;         /* thread ID */
        spinlock_t                      hrt_lock;
        wait_queue_head_t               hrt_waitq;
-       struct list_head                        hrt_queue;      /* RS queue */
+       struct list_head                hrt_queue;
        struct ptlrpc_hr_partition      *hrt_partition;
 };
 
@@ -2161,10 +2162,10 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
 {
        struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
        struct ptlrpc_service     *svc = svcpt->scp_service;
-        struct obd_export         *exp;
-        int                        nlocks;
-        int                        been_handled;
-        ENTRY;
+       struct obd_export         *exp;
+       int                        nlocks;
+       int                        been_handled;
+       ENTRY;
 
        exp = rs->rs_export;
 
@@ -2172,58 +2173,91 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
        LASSERT(rs->rs_scheduled);
        LASSERT(list_empty(&rs->rs_list));
 
-       spin_lock(&exp->exp_lock);
-       /* Noop if removed already */
-       list_del_init(&rs->rs_exp_list);
-       spin_unlock(&exp->exp_lock);
-
-        /* The disk commit callback holds exp_uncommitted_replies_lock while it
-         * iterates over newly committed replies, removing them from
-         * exp_uncommitted_replies.  It then drops this lock and schedules the
-         * replies it found for handling here.
-         *
-         * We can avoid contention for exp_uncommitted_replies_lock between the
-         * HRT threads and further commit callbacks by checking rs_committed
-         * which is set in the commit callback while it holds both
-         * rs_lock and exp_uncommitted_reples.
-         *
-         * If we see rs_committed clear, the commit callback _may_ not have
-         * handled this reply yet and we race with it to grab
-         * exp_uncommitted_replies_lock before removing the reply from
-         * exp_uncommitted_replies.  Note that if we lose the race and the
-         * reply has already been removed, list_del_init() is a noop.
-         *
-         * If we see rs_committed set, we know the commit callback is handling,
-         * or has handled this reply since store reordering might allow us to
-         * see rs_committed set out of sequence.  But since this is done
-         * holding rs_lock, we can be sure it has all completed once we hold
-         * rs_lock, which we do right next.
-         */
+       /* The disk commit callback holds exp_uncommitted_replies_lock while it
+        * iterates over newly committed replies, removing them from
+        * exp_uncommitted_replies.  It then drops this lock and schedules the
+        * replies it found for handling here.
+        *
+        * We can avoid contention for exp_uncommitted_replies_lock between the
+        * HRT threads and further commit callbacks by checking rs_committed
+        * which is set in the commit callback while it holds both
+        * rs_lock and exp_uncommitted_reples.
+        *
+        * If we see rs_committed clear, the commit callback _may_ not have
+        * handled this reply yet and we race with it to grab
+        * exp_uncommitted_replies_lock before removing the reply from
+        * exp_uncommitted_replies.  Note that if we lose the race and the
+        * reply has already been removed, list_del_init() is a noop.
+        *
+        * If we see rs_committed set, we know the commit callback is handling,
+        * or has handled this reply since store reordering might allow us to
+        * see rs_committed set out of sequence.  But since this is done
+        * holding rs_lock, we can be sure it has all completed once we hold
+        * rs_lock, which we do right next.
+        */
        if (!rs->rs_committed) {
+               /* if rs was commited, no need to convert locks, don't check
+                * rs_committed here because rs may never be added into
+                * exp_uncommitted_replies and this flag never be set, see
+                * target_send_reply() */
+               if (rs->rs_convert_lock &&
+                   rs->rs_transno > exp->exp_last_committed) {
+                       struct ldlm_lock *lock;
+
+                       spin_lock(&rs->rs_lock);
+                       if (rs->rs_convert_lock &&
+                           rs->rs_transno > exp->exp_last_committed) {
+                               nlocks = rs->rs_nlocks;
+                               while (nlocks-- > 0)
+                                       rs->rs_modes[nlocks] = LCK_COS;
+                               nlocks = rs->rs_nlocks;
+                               rs->rs_convert_lock = 0;
+                               /* clear rs_scheduled so that commit callback
+                                * can schedule again */
+                               rs->rs_scheduled = 0;
+                               spin_unlock(&rs->rs_lock);
+
+                               while (nlocks-- > 0) {
+                                       lock = ldlm_handle2lock(
+                                                       &rs->rs_locks[nlocks]);
+                                       LASSERT(lock != NULL);
+                                       ldlm_lock_downgrade(lock, LCK_COS);
+                                       LDLM_LOCK_PUT(lock);
+                               }
+                               RETURN(0);
+                       }
+                       spin_unlock(&rs->rs_lock);
+               }
+
                spin_lock(&exp->exp_uncommitted_replies_lock);
                list_del_init(&rs->rs_obd_list);
                spin_unlock(&exp->exp_uncommitted_replies_lock);
        }
 
+       spin_lock(&exp->exp_lock);
+       /* Noop if removed already */
+       list_del_init(&rs->rs_exp_list);
+       spin_unlock(&exp->exp_lock);
+
        spin_lock(&rs->rs_lock);
 
-        been_handled = rs->rs_handled;
-        rs->rs_handled = 1;
+       been_handled = rs->rs_handled;
+       rs->rs_handled = 1;
 
-        nlocks = rs->rs_nlocks;                 /* atomic "steal", but */
-        rs->rs_nlocks = 0;                      /* locks still on rs_locks! */
+       nlocks = rs->rs_nlocks;                 /* atomic "steal", but */
+       rs->rs_nlocks = 0;                      /* locks still on rs_locks! */
 
-        if (nlocks == 0 && !been_handled) {
-                /* If we see this, we should already have seen the warning
-                 * in mds_steal_ack_locks()  */
+       if (nlocks == 0 && !been_handled) {
+               /* If we see this, we should already have seen the warning
+                * in mds_steal_ack_locks()  */
                CDEBUG(D_HA, "All locks stolen from rs %p x%lld.t%lld"
                       " o%d NID %s\n",
                       rs,
                       rs->rs_xid, rs->rs_transno, rs->rs_opc,
                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
-        }
+       }
 
-        if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
+       if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
                spin_unlock(&rs->rs_lock);
 
                if (!been_handled && rs->rs_on_net) {
@@ -2239,6 +2273,7 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
        }
 
        rs->rs_scheduled = 0;
+       rs->rs_convert_lock = 0;
 
        if (!rs->rs_on_net) {
                /* Off the net */