Whamcloud - gitweb
LU-56 obdclass: SMP improvement for lu_key
authorLiang Zhen <liang@whamcloud.com>
Thu, 17 May 2012 06:47:15 +0000 (14:47 +0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 28 Jun 2012 03:37:30 +0000 (23:37 -0400)
ptlrpc service threads need to call lu_context_init/fini in each
loop (for each RPC), this could be a big performance issue on
fat SMP machine if we always add lu_context to remember list
because operations on lu_context with LCT_REMEMBER are serialized
by one spinlock and we need to lock/unlock it for multiple times
for each RPC.

But we found LCT_REMEMBER is abused at here, it's impossible that
server stack is being unloaded but service threads are still
running, so we can simply remove LCT_REMEMBER flag from ->rq_session,
and made some small changes to bypass global lock in lu_context_init
and lu_context_fini if w/o LCT_REMEMBER.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I5875a90365a103707526483047ec7628f6964a56
Reviewed-on: http://review.whamcloud.com/2824
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/obdclass/lu_object.c
lustre/ptlrpc/service.c

index fe1d9fc..9d6746d 100644 (file)
@@ -1284,13 +1284,14 @@ static void key_fini(struct lu_context *ctx, int index)
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
                 lu_ref_del(&key->lct_reference, "ctx", ctx);
                 cfs_atomic_dec(&key->lct_used);
-                LASSERT(key->lct_owner != NULL);
-                if (!(ctx->lc_tags & LCT_NOREF)) {
-                        LASSERT(cfs_module_refcount(key->lct_owner) > 0);
-                        cfs_module_put(key->lct_owner);
-                }
-                ctx->lc_value[index] = NULL;
-        }
+
+               LASSERT(key->lct_owner != NULL);
+               if ((ctx->lc_tags & LCT_NOREF) == 0) {
+                       LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
+                       cfs_module_put(key->lct_owner);
+               }
+               ctx->lc_value[index] = NULL;
+       }
 }
 
 /**
@@ -1455,17 +1456,16 @@ EXPORT_SYMBOL(lu_context_key_revive);
 
 static void keys_fini(struct lu_context *ctx)
 {
-        int i;
+       int     i;
 
-        cfs_spin_lock(&lu_keys_guard);
-        if (ctx->lc_value != NULL) {
-                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
-                        key_fini(ctx, i);
-                OBD_FREE(ctx->lc_value,
-                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
-                ctx->lc_value = NULL;
-        }
-        cfs_spin_unlock(&lu_keys_guard);
+       if (ctx->lc_value == NULL)
+               return;
+
+       for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
+               key_fini(ctx, i);
+
+       OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+       ctx->lc_value = NULL;
 }
 
 static int keys_fill(struct lu_context *ctx)
@@ -1514,17 +1514,11 @@ static int keys_fill(struct lu_context *ctx)
 
 static int keys_init(struct lu_context *ctx)
 {
-        int result;
-
-        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
-        if (likely(ctx->lc_value != NULL))
-                result = keys_fill(ctx);
-        else
-                result = -ENOMEM;
+       OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+       if (likely(ctx->lc_value != NULL))
+               return keys_fill(ctx);
 
-        if (result != 0)
-                keys_fini(ctx);
-        return result;
+       return -ENOMEM;
 }
 
 /**
@@ -1532,6 +1526,8 @@ static int keys_init(struct lu_context *ctx)
  */
 int lu_context_init(struct lu_context *ctx, __u32 tags)
 {
+       int     rc;
+
         memset(ctx, 0, sizeof *ctx);
         ctx->lc_state = LCS_INITIALIZED;
         ctx->lc_tags = tags;
@@ -1539,9 +1535,15 @@ int lu_context_init(struct lu_context *ctx, __u32 tags)
                 cfs_spin_lock(&lu_keys_guard);
                 cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
                 cfs_spin_unlock(&lu_keys_guard);
-        } else
-                CFS_INIT_LIST_HEAD(&ctx->lc_remember);
-        return keys_init(ctx);
+       } else {
+               CFS_INIT_LIST_HEAD(&ctx->lc_remember);
+       }
+
+       rc = keys_init(ctx);
+       if (rc != 0)
+               lu_context_fini(ctx);
+
+       return rc;
 }
 EXPORT_SYMBOL(lu_context_init);
 
@@ -1550,12 +1552,19 @@ EXPORT_SYMBOL(lu_context_init);
  */
 void lu_context_fini(struct lu_context *ctx)
 {
-        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
-        ctx->lc_state = LCS_FINALIZED;
-        keys_fini(ctx);
-        cfs_spin_lock(&lu_keys_guard);
-        cfs_list_del_init(&ctx->lc_remember);
-        cfs_spin_unlock(&lu_keys_guard);
+       LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
+       ctx->lc_state = LCS_FINALIZED;
+
+       if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
+               LASSERT(cfs_list_empty(&ctx->lc_remember));
+               keys_fini(ctx);
+
+       } else { /* could race with key degister */
+               cfs_spin_lock(&lu_keys_guard);
+               keys_fini(ctx);
+               cfs_list_del_init(&ctx->lc_remember);
+               cfs_spin_unlock(&lu_keys_guard);
+       }
 }
 EXPORT_SYMBOL(lu_context_fini);
 
index c95f76e..13f699e 100644 (file)
@@ -1746,8 +1746,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                                    at_get(&svcpt->scp_at_estimate));
         }
 
-        rc = lu_context_init(&request->rq_session,
-                             LCT_SESSION|LCT_REMEMBER|LCT_NOREF);
+       rc = lu_context_init(&request->rq_session, LCT_SESSION | LCT_NOREF);
         if (rc) {
                 CERROR("Failure to initialize session: %d\n", rc);
                 goto out_req;