Whamcloud - gitweb
LU-933 mdc: allow disabling the mdc_rpc_lock
authorLiang Zhen <liang@whamcloud.com>
Fri, 3 Feb 2012 15:43:55 +0000 (23:43 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 11 Jun 2012 13:13:36 +0000 (09:13 -0400)
It is desirable to allow disabling the client mdc_{get,put}_rpc_lock()
in order to allow clients to send multiple filesystem-modifying RPCs
at the same time. While this would break MDS recovery (due to
insufficient transaction slots in the MDS last_rcvd file) it would
allow a smaller number of clients to generate a much higher RPC load
on the MDS. This is ideal for MDS/RPC load testing purposes, and can
also be used to help evaluate the potential benefits of implementing
the multi-slot last_rcvd feature.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I39be390ed37f0327bf7eb8015d528195b8568e0b
Reviewed-on: http://review.whamcloud.com/2084
Tested-by: Hudson
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/include/libcfs/libcfs_fail.h
libcfs/include/libcfs/user-prim.h
lustre/include/lustre_mdc.h
lustre/include/obd_support.h
lustre/tests/sanity.sh

index b60a313..6dda76b 100644 (file)
@@ -74,35 +74,51 @@ enum {
                               (cfs_fail_loc & CFS_FAIL_MASK_LOC) ==           \
                               ((id) & CFS_FAIL_MASK_LOC))
 
                               (cfs_fail_loc & CFS_FAIL_MASK_LOC) ==           \
                               ((id) & CFS_FAIL_MASK_LOC))
 
-static inline int cfs_fail_check_set(__u32 id, __u32 value, int set)
+static inline int cfs_fail_check_set(__u32 id, __u32 value,
+                                    int set, int quiet)
 {
 {
-        int ret = 0;
-
-        if (unlikely(CFS_FAIL_PRECHECK(id) &&
-                     (ret = __cfs_fail_check_set(id, value, set))))
-                LCONSOLE_INFO("*** cfs_fail_loc=%x, val=%u***\n", id, value);
-
-        return ret;
+       int ret = 0;
+
+       if (unlikely(CFS_FAIL_PRECHECK(id) &&
+                    (ret = __cfs_fail_check_set(id, value, set)))) {
+               if (quiet) {
+                       CDEBUG(D_INFO, "*** cfs_fail_loc=%x, val=%u***\n",
+                              id, value);
+               } else {
+                       LCONSOLE_INFO("*** cfs_fail_loc=%x, val=%u***\n",
+                                     id, value);
+               }
+       }
+
+       return ret;
 }
 
 /* If id hit cfs_fail_loc, return 1, otherwise return 0 */
 #define CFS_FAIL_CHECK(id) \
 }
 
 /* If id hit cfs_fail_loc, return 1, otherwise return 0 */
 #define CFS_FAIL_CHECK(id) \
-        cfs_fail_check_set(id, 0, CFS_FAIL_LOC_NOSET)
+       cfs_fail_check_set(id, 0, CFS_FAIL_LOC_NOSET, 0)
+#define CFS_FAIL_CHECK_QUIET(id) \
+       cfs_fail_check_set(id, 0, CFS_FAIL_LOC_NOSET, 1)
 
 /* If id hit cfs_fail_loc and cfs_fail_val == (-1 or value) return 1,
  * otherwise return 0 */
 #define CFS_FAIL_CHECK_VALUE(id, value) \
 
 /* If id hit cfs_fail_loc and cfs_fail_val == (-1 or value) return 1,
  * otherwise return 0 */
 #define CFS_FAIL_CHECK_VALUE(id, value) \
-        cfs_fail_check_set(id, value, CFS_FAIL_LOC_VALUE)
+       cfs_fail_check_set(id, value, CFS_FAIL_LOC_VALUE, 0)
+#define CFS_FAIL_CHECK_VALUE_QUIET(id, value) \
+       cfs_fail_check_set(id, value, CFS_FAIL_LOC_VALUE, 1)
 
 /* If id hit cfs_fail_loc, cfs_fail_loc |= value and return 1,
  * otherwise return 0 */
 #define CFS_FAIL_CHECK_ORSET(id, value) \
 
 /* If id hit cfs_fail_loc, cfs_fail_loc |= value and return 1,
  * otherwise return 0 */
 #define CFS_FAIL_CHECK_ORSET(id, value) \
-        cfs_fail_check_set(id, value, CFS_FAIL_LOC_ORSET)
+       cfs_fail_check_set(id, value, CFS_FAIL_LOC_ORSET, 0)
+#define CFS_FAIL_CHECK_ORSET_QUIET(id, value) \
+       cfs_fail_check_set(id, value, CFS_FAIL_LOC_ORSET, 1)
 
 /* If id hit cfs_fail_loc, cfs_fail_loc = value and return 1,
  * otherwise return 0 */
 #define CFS_FAIL_CHECK_RESET(id, value) \
 
 /* If id hit cfs_fail_loc, cfs_fail_loc = value and return 1,
  * otherwise return 0 */
 #define CFS_FAIL_CHECK_RESET(id, value) \
-        cfs_fail_check_set(id, value, CFS_FAIL_LOC_RESET)
+       cfs_fail_check_set(id, value, CFS_FAIL_LOC_RESET, 0)
+#define CFS_FAIL_CHECK_RESET_QUIET(id, value) \
+       cfs_fail_check_set(id, value, CFS_FAIL_LOC_RESET, 1)
 
 static inline int cfs_fail_timeout_set(__u32 id, __u32 value, int ms, int set)
 {
 
 static inline int cfs_fail_timeout_set(__u32 id, __u32 value, int ms, int set)
 {
index baad28a..76e6c33 100644 (file)
@@ -98,8 +98,10 @@ typedef long cfs_task_state_t;
 #define CFS_TASK_UNINT          (1)
 #define CFS_TASK_RUNNING        (2)
 
 #define CFS_TASK_UNINT          (1)
 #define CFS_TASK_RUNNING        (2)
 
+static inline void cfs_schedule(void)                  {}
+static inline void cfs_schedule_timeout(int64_t t)     {}
 
 
-/* 
+/*
  * Lproc
  */
 typedef int (cfs_read_proc_t)(char *page, char **start, off_t off,
  * Lproc
  */
 typedef int (cfs_read_proc_t)(char *page, char **start, off_t off,
index a90dfb8..8b10c64 100644 (file)
@@ -70,10 +70,13 @@ struct ptlrpc_request;
 struct obd_device;
 
 struct mdc_rpc_lock {
 struct obd_device;
 
 struct mdc_rpc_lock {
-        cfs_mutex_t           rpcl_mutex;
-        struct lookup_intent *rpcl_it;
+       cfs_mutex_t             rpcl_mutex;
+       struct lookup_intent    *rpcl_it;
+       int                     rpcl_fakes;
 };
 
 };
 
+#define MDC_FAKE_RPCL_IT ((void *)0x2c0012bfUL)
+
 static inline void mdc_init_rpc_lock(struct mdc_rpc_lock *lck)
 {
         cfs_mutex_init(&lck->rpcl_mutex);
 static inline void mdc_init_rpc_lock(struct mdc_rpc_lock *lck)
 {
         cfs_mutex_init(&lck->rpcl_mutex);
@@ -81,25 +84,67 @@ static inline void mdc_init_rpc_lock(struct mdc_rpc_lock *lck)
 }
 
 static inline void mdc_get_rpc_lock(struct mdc_rpc_lock *lck,
 }
 
 static inline void mdc_get_rpc_lock(struct mdc_rpc_lock *lck,
-                                    struct lookup_intent *it)
+                                   struct lookup_intent *it)
 {
 {
-        ENTRY;
-        if (!it || (it->it_op != IT_GETATTR && it->it_op != IT_LOOKUP)) {
-                cfs_mutex_lock(&lck->rpcl_mutex);
-                LASSERT(lck->rpcl_it == NULL);
-                lck->rpcl_it = it;
-        }
+       ENTRY;
+
+       if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP))
+               return;
+
+       /* This would normally block until the existing request finishes.
+        * If fail_loc is set it will block until the regular request is
+        * done, then set rpcl_it to MDC_FAKE_RPCL_IT.  Once that is set
+        * it will only be cleared when all fake requests are finished.
+        * Only when all fake requests are finished can normal requests
+        * be sent, to ensure they are recoverable again. */
+ again:
+       cfs_mutex_lock(&lck->rpcl_mutex);
+
+       if (CFS_FAIL_CHECK_QUIET(OBD_FAIL_MDC_RPCS_SEM)) {
+               lck->rpcl_it = MDC_FAKE_RPCL_IT;
+               lck->rpcl_fakes++;
+               cfs_mutex_unlock(&lck->rpcl_mutex);
+               return;
+       }
+
+       /* This will only happen when the CFS_FAIL_CHECK() was
+        * just turned off but there are still requests in progress.
+        * Wait until they finish.  It doesn't need to be efficient
+        * in this extremely rare case, just have low overhead in
+        * the common case when it isn't true. */
+       while (unlikely(lck->rpcl_it == MDC_FAKE_RPCL_IT)) {
+               cfs_mutex_unlock(&lck->rpcl_mutex);
+               cfs_schedule_timeout(cfs_time_seconds(1) / 4);
+               goto again;
+       }
+
+       LASSERT(lck->rpcl_it == NULL);
+       lck->rpcl_it = it;
 }
 
 static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck,
 }
 
 static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck,
-                                    struct lookup_intent *it)
+                                   struct lookup_intent *it)
 {
 {
-        if (!it || (it->it_op != IT_GETATTR && it->it_op != IT_LOOKUP)) {
-                LASSERT(it == lck->rpcl_it);
-                lck->rpcl_it = NULL;
-                cfs_mutex_unlock(&lck->rpcl_mutex);
-        }
-        EXIT;
+       if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP))
+               goto out;
+
+       if (lck->rpcl_it == MDC_FAKE_RPCL_IT) { /* OBD_FAIL_MDC_RPCS_SEM */
+               cfs_mutex_lock(&lck->rpcl_mutex);
+
+               LASSERTF(lck->rpcl_fakes > 0, "%d\n", lck->rpcl_fakes);
+               lck->rpcl_fakes--;
+
+               if (lck->rpcl_fakes == 0)
+                       lck->rpcl_it = NULL;
+
+       } else {
+               LASSERTF(it == lck->rpcl_it, "%p != %p\n", it, lck->rpcl_it);
+               lck->rpcl_it = NULL;
+       }
+
+       cfs_mutex_unlock(&lck->rpcl_mutex);
+ out:
+       EXIT;
 }
 
 static inline void mdc_update_max_ea_from_body(struct obd_export *exp,
 }
 
 static inline void mdc_update_max_ea_from_body(struct obd_export *exp,
index 3fe05d4..3ebcf91 100644 (file)
@@ -383,6 +383,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDC_ENQUEUE_PAUSE       0x801
 #define OBD_FAIL_MDC_OLD_EXT_FLAGS       0x802
 #define OBD_FAIL_MDC_GETATTR_ENQUEUE     0x803
 #define OBD_FAIL_MDC_ENQUEUE_PAUSE       0x801
 #define OBD_FAIL_MDC_OLD_EXT_FLAGS       0x802
 #define OBD_FAIL_MDC_GETATTR_ENQUEUE     0x803
+#define OBD_FAIL_MDC_RPCS_SEM           0x804
 
 #define OBD_FAIL_MGS                     0x900
 #define OBD_FAIL_MGS_ALL_REQUEST_NET     0x901
 
 #define OBD_FAIL_MGS                     0x900
 #define OBD_FAIL_MGS_ALL_REQUEST_NET     0x901
index 6214363..4500346 100644 (file)
@@ -8416,6 +8416,32 @@ test_181() { # bug 22177
 }
 run_test 181 "Test open-unlinked dir ========================"
 
 }
 run_test 181 "Test open-unlinked dir ========================"
 
+test_182() {
+       # disable MDC RPC lock wouldn't crash client
+       local fcount=1000
+       local tcount=4
+
+       mkdir -p $DIR/$tdir || error "creating dir $DIR/$tdir"
+#define OBD_FAIL_MDC_RPCS_SEM          0x804
+       $LCTL set_param fail_loc=0x804
+
+       for (( i=0; i < $tcount; i++ )) ; do
+               mkdir $DIR/$tdir/$i
+               createmany -o $DIR/$tdir/$i/f- $fcount &
+       done
+       wait
+
+       for (( i=0; i < $tcount; i++ )) ; do
+               unlinkmany $DIR/$tdir/$i/f- $fcount &
+       done
+       wait
+
+       rm -rf $DIR/$tdir
+
+       $LCTL set_param fail_loc=0
+}
+run_test 182 "Disable MDC RPCs semaphore wouldn't crash client ================"
+
 # OST pools tests
 POOL=${POOL:-cea1}
 TGT_COUNT=$OSTCOUNT
 # OST pools tests
 POOL=${POOL:-cea1}
 TGT_COUNT=$OSTCOUNT