LU-8612 mdt: maintain Sync-on-Lock-Cancel locks per target 90/22490/2
author Lai Siyao <lai.siyao@intel.com>
Wed, 14 Sep 2016 10:11:56 +0000 (18:11 +0800)
committer Oleg Drokin <oleg.drokin@intel.com>
Wed, 5 Oct 2016 03:51:22 +0000 (03:51 +0000)
Sync-on-Lock-Cancel locks should be maintained per target, so that
if there is more than one target (MDT) on one server, transaction
commits on different targets won't interfere with each other.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: Ib2834c64089f18fb9d2e06d64f770ddaf9c2e2d3
Reviewed-on: http://review.whamcloud.com/22490
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lu_target.h
lustre/mdt/mdt_handler.c
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c
lustre/target/tgt_main.c

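The sketch below is a standalone user-space C model of the change, not
Lustre code: each target carries its own lut_slc_locks list and
lut_slc_locks_guard, so cancelling committed Sync-on-Lock-Cancel locks
on one target never walks another target's list. The struct layout,
helper bodies and main() are simplified assumptions for illustration;
only the per-target split and the field/function names mirror the
patch below.

/* Minimal sketch of per-target SLC lock bookkeeping (not Lustre code). */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct slc_lock {
	unsigned long long	 l_transno;	/* commit that releases it */
	struct slc_lock		*l_next;
};

struct lu_target_sketch {
	const char		*lut_name;
	pthread_mutex_t		 lut_slc_locks_guard;	/* per-target guard */
	struct slc_lock		*lut_slc_locks;		/* per-target list */
};

/* Save a lock on the target it belongs to. */
static void tgt_save_slc_lock(struct lu_target_sketch *lut,
			      unsigned long long transno)
{
	struct slc_lock *lock = calloc(1, sizeof(*lock));

	if (lock == NULL)
		return;
	lock->l_transno = transno;
	pthread_mutex_lock(&lut->lut_slc_locks_guard);
	lock->l_next = lut->lut_slc_locks;
	lut->lut_slc_locks = lock;
	pthread_mutex_unlock(&lut->lut_slc_locks_guard);
}

/* Cancel (here: free) every saved lock whose transno has committed. */
static void tgt_cancel_slc_locks(struct lu_target_sketch *lut,
				 unsigned long long committed)
{
	struct slc_lock **prev, *lock;

	pthread_mutex_lock(&lut->lut_slc_locks_guard);
	prev = &lut->lut_slc_locks;
	while ((lock = *prev) != NULL) {
		if (lock->l_transno <= committed) {
			*prev = lock->l_next;
			printf("%s: cancel slc lock, transno %llu\n",
			       lut->lut_name, lock->l_transno);
			free(lock);
		} else {
			prev = &lock->l_next;
		}
	}
	pthread_mutex_unlock(&lut->lut_slc_locks_guard);
}

int main(void)
{
	struct lu_target_sketch mdt0 = { "MDT0000",
					 PTHREAD_MUTEX_INITIALIZER, NULL };
	struct lu_target_sketch mdt1 = { "MDT0001",
					 PTHREAD_MUTEX_INITIALIZER, NULL };

	tgt_save_slc_lock(&mdt0, 10);
	tgt_save_slc_lock(&mdt1, 11);

	/* A commit on MDT0000 no longer touches MDT0001's locks. */
	tgt_cancel_slc_locks(&mdt0, 10);
	tgt_cancel_slc_locks(&mdt1, 11);
	return 0;
}
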
lustre/include/lu_target.h
index e8f09e7..e66f29d 100644
@@ -165,6 +165,10 @@ struct lu_target {
        unsigned long           **lut_reply_bitmap;
        /** target sync count, used for debug & test */
        atomic_t                 lut_sync_count;
+
+       /** cross MDT locks which should trigger Sync-on-Lock-Cancel */
+       spinlock_t               lut_slc_locks_guard;
+       struct list_head         lut_slc_locks;
 };
 
 /* number of slots in reply bitmap */
@@ -433,8 +437,9 @@ int tgt_hpreq_handler(struct ptlrpc_request *req);
 
 /* target/tgt_main.c */
 void tgt_boot_epoch_update(struct lu_target *lut);
-void tgt_save_slc_lock(struct ldlm_lock *lock, __u64 transno);
-void tgt_discard_slc_lock(struct ldlm_lock *lock);
+void tgt_save_slc_lock(struct lu_target *lut, struct ldlm_lock *lock,
+                      __u64 transno);
+void tgt_discard_slc_lock(struct lu_target *lut, struct ldlm_lock *lock);
 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
                           struct obd_export *exp, __u64 transno);
 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp);
lustre/mdt/mdt_handler.c
index 3e44b43..9fd6efc 100644
@@ -2392,11 +2392,15 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                break;
        }
        case LDLM_CB_CANCELING: {
+               struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+               struct mdt_device *mdt =
+                               mdt_dev(obd->obd_lu_dev->ld_site->ls_top_dev);
+
                LDLM_DEBUG(lock, "Revoke remote lock\n");
 
                /* discard slc lock here so that it can be cleaned anytime,
                 * especially for cleanup_resource() */
-               tgt_discard_slc_lock(lock);
+               tgt_discard_slc_lock(&mdt->mdt_lut, lock);
 
                /* once we cache lock, l_ast_data is set to mdt_object */
                if (lock->l_ast_data != NULL) {
@@ -2405,9 +2409,6 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                        rc = lu_env_init(&env, LCT_MD_THREAD);
                        if (unlikely(rc != 0)) {
-                               struct obd_device *obd;
-
-                               obd = ldlm_lock_to_ns(lock)->ns_obd;
                                CWARN("%s: lu_env initialization failed, object"
                                      "%p "DFID" is leaked!\n",
                                      obd->obd_name, mo,
@@ -2807,7 +2808,8 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info,
                        struct ptlrpc_request *req = mdt_info_req(info);
 
                        LASSERT(req != NULL);
-                       tgt_save_slc_lock(lock, req->rq_transno);
+                       tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock,
+                                         req->rq_transno);
                        ldlm_lock_decref(h, mode);
                }
                h->cookie = 0ull;
lustre/target/tgt_internal.h
index 8d2fc37..2a2aa36 100644
@@ -288,5 +288,5 @@ void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new);
 int top_trans_create_tmt(const struct lu_env *env,
                         struct top_thandle *top_th);
 
-void tgt_cancel_slc_locks(__u64 transno);
+void tgt_cancel_slc_locks(struct lu_target *tgt, __u64 transno);
 #endif /* _TG_INTERNAL_H */
lustre/target/tgt_lastrcvd.c
index e44346a..3e4bd00 100644
@@ -800,7 +800,7 @@ static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
                spin_unlock(&ccb->llcc_tgt->lut_translock);
 
                ptlrpc_commit_replies(ccb->llcc_exp);
-               tgt_cancel_slc_locks(ccb->llcc_transno);
+               tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
        } else {
                spin_unlock(&ccb->llcc_tgt->lut_translock);
        }
lustre/target/tgt_main.c
index e32e558..871335d 100644
 #include "tgt_internal.h"
 #include "../ptlrpc/ptlrpc_internal.h"
 
-static spinlock_t uncommitted_slc_locks_guard;
-static struct list_head uncommitted_slc_locks;
-
 /*
- * Save cross-MDT lock in uncommitted_slc_locks.
+ * Save cross-MDT lock in lut_slc_locks.
  *
  * Lock R/W count is not saved, but released in unlock (not canceled remotely),
  * instead only a refcount is taken, so that the remote MDT where the object
  * resides can detect conflict with this lock there.
  *
+ * \param lut target
  * \param lock cross-MDT lock to save
  * \param transno when the transaction with this transno is committed, this lock
  *               can be canceled.
  */
-void tgt_save_slc_lock(struct ldlm_lock *lock, __u64 transno)
+void tgt_save_slc_lock(struct lu_target *lut, struct ldlm_lock *lock,
+                      __u64 transno)
 {
-       spin_lock(&uncommitted_slc_locks_guard);
+       spin_lock(&lut->lut_slc_locks_guard);
        lock_res_and_lock(lock);
        if (ldlm_is_cbpending(lock)) {
                /* if it was canceld by server, don't save, because remote MDT
@@ -65,27 +64,27 @@ void tgt_save_slc_lock(struct ldlm_lock *lock, __u64 transno)
                 * both use this lock, and save it after use, so for the second
                 * one, just put the refcount. */
                if (list_empty(&lock->l_slc_link))
-                       list_add_tail(&lock->l_slc_link,
-                                     &uncommitted_slc_locks);
+                       list_add_tail(&lock->l_slc_link, &lut->lut_slc_locks);
                else
                        LDLM_LOCK_PUT(lock);
        }
        unlock_res_and_lock(lock);
-       spin_unlock(&uncommitted_slc_locks_guard);
+       spin_unlock(&lut->lut_slc_locks_guard);
 }
 EXPORT_SYMBOL(tgt_save_slc_lock);
 
 /*
- * Discard cross-MDT lock from uncommitted_slc_locks.
+ * Discard cross-MDT lock from lut_slc_locks.
  *
- * This is called upon BAST, just remove lock from uncommitted_slc_locks and put
- * lock refcount. The BAST will cancel this lock.
+ * This is called upon BAST, just remove lock from lut_slc_locks and put lock
+ * refcount. The BAST will cancel this lock.
  *
+ * \param lut target
  * \param lock cross-MDT lock to discard
  */
-void tgt_discard_slc_lock(struct ldlm_lock *lock)
+void tgt_discard_slc_lock(struct lu_target *lut, struct ldlm_lock *lock)
 {
-       spin_lock(&uncommitted_slc_locks_guard);
+       spin_lock(&lut->lut_slc_locks_guard);
        lock_res_and_lock(lock);
        /* may race with tgt_cancel_slc_locks() */
        if (lock->l_transno != 0) {
@@ -96,26 +95,26 @@ void tgt_discard_slc_lock(struct ldlm_lock *lock)
                LDLM_LOCK_PUT(lock);
        }
        unlock_res_and_lock(lock);
-       spin_unlock(&uncommitted_slc_locks_guard);
+       spin_unlock(&lut->lut_slc_locks_guard);
 }
 EXPORT_SYMBOL(tgt_discard_slc_lock);
 
 /*
  * Cancel cross-MDT locks upon transaction commit.
  *
- * Remove cross-MDT locks from uncommitted_slc_locks, cancel them and put lock
- * refcount.
+ * Remove cross-MDT locks from lut_slc_locks, cancel them and put lock refcount.
  *
+ * \param lut target
  * \param transno transaction with this number was committed.
  */
-void tgt_cancel_slc_locks(__u64 transno)
+void tgt_cancel_slc_locks(struct lu_target *lut, __u64 transno)
 {
        struct ldlm_lock *lock, *next;
        LIST_HEAD(list);
        struct lustre_handle lockh;
 
-       spin_lock(&uncommitted_slc_locks_guard);
-       list_for_each_entry_safe(lock, next, &uncommitted_slc_locks,
+       spin_lock(&lut->lut_slc_locks_guard);
+       list_for_each_entry_safe(lock, next, &lut->lut_slc_locks,
                                 l_slc_link) {
                lock_res_and_lock(lock);
                LASSERT(lock->l_transno != 0);
@@ -134,7 +133,7 @@ void tgt_cancel_slc_locks(__u64 transno)
                list_move(&lock->l_slc_link, &list);
                unlock_res_and_lock(lock);
        }
-       spin_unlock(&uncommitted_slc_locks_guard);
+       spin_unlock(&lut->lut_slc_locks_guard);
 
        list_for_each_entry_safe(lock, next, &list, l_slc_link) {
                list_del_init(&lock->l_slc_link);
@@ -182,6 +181,9 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
        spin_lock_init(&lut->lut_flags_lock);
        lut->lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
 
+       spin_lock_init(&lut->lut_slc_locks_guard);
+       INIT_LIST_HEAD(&lut->lut_slc_locks);
+
        /* last_rcvd initialization is needed by replayable targets only */
        if (!obd->obd_replayable)
                RETURN(0);
@@ -412,9 +414,6 @@ int tgt_mod_init(void)
 
        update_info_init();
 
-       spin_lock_init(&uncommitted_slc_locks_guard);
-       INIT_LIST_HEAD(&uncommitted_slc_locks);
-
        RETURN(0);
 }