Whamcloud - gitweb
b=21128 send cancel rpc in a separate thread
authorVitaly Fertman <Vitaly.Fertman@Sun.COM>
Wed, 26 May 2010 20:51:03 +0000 (00:51 +0400)
committerJohann Lombardi <johann@sun.com>
Fri, 28 May 2010 11:30:55 +0000 (13:30 +0200)
i=oleg.drokin
i=rahul

run sync ldlm_bl_to_thread_list() in separate thread to save stack space.

lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c

index e068ba5..12dc18c 100644 (file)
@@ -134,7 +134,7 @@ void ldlm_cancel_locks_for_export(struct obd_export *export);
 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                            struct ldlm_lock *lock);
 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                           struct list_head *cancels, int count);
+                           struct list_head *cancels, int count, int mode);
 
 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
index 80b204b..a656fa8 100644 (file)
@@ -133,9 +133,21 @@ struct ldlm_bl_work_item {
         struct ldlm_lock        *blwi_lock;
         struct list_head        blwi_head;
         int                     blwi_count;
+        struct completion       blwi_comp;
+        atomic_t                blwi_ref_count;
 };
 
 #ifdef __KERNEL__
+static inline void ldlm_bl_work_item_get(struct ldlm_bl_work_item *blwi)
+{
+        atomic_inc(&blwi->blwi_ref_count);
+}
+
+static inline void ldlm_bl_work_item_put(struct ldlm_bl_work_item *blwi)
+{
+        if (atomic_dec_and_test(&blwi->blwi_ref_count))
+                OBD_FREE(blwi, sizeof(*blwi));
+}
 
 static inline int have_expired_locks(void)
 {
@@ -1580,20 +1592,21 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 }
 
 #ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+static int __ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_bl_work_item *blwi,
                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
-                             struct list_head *cancels, int count)
+                             struct list_head *cancels, int count, int mode)
 {
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
-        struct ldlm_bl_work_item *blwi;
         ENTRY;
 
-        if (cancels && count == 0)
+        if (cancels && count == 0) {
+                if (mode == LDLM_ASYNC)
+                        OBD_FREE(blwi, sizeof(*blwi));
                 RETURN(0);
+        }
 
-        OBD_ALLOC(blwi, sizeof(*blwi));
-        if (blwi == NULL)
-                RETURN(-ENOMEM);
+        init_completion(&blwi->blwi_comp);
+        atomic_set(&blwi->blwi_ref_count, 1);
 
         blwi->blwi_ns = ns;
         if (ld != NULL)
@@ -1605,6 +1618,7 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
         } else {
                 blwi->blwi_lock = lock;
         }
+
         spin_lock(&blp->blp_lock);
         if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
@@ -1613,28 +1627,60 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
                 /* other blocking callbacks are added to the regular list */
                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
         }
-        cfs_waitq_signal(&blp->blp_waitq);
         spin_unlock(&blp->blp_lock);
 
+        if (mode == LDLM_SYNC) {
+                /* keep ref count as object is on this stack for SYNC call */
+                ldlm_bl_work_item_get(blwi);
+                cfs_waitq_signal(&blp->blp_waitq);
+                wait_for_completion(&blwi->blwi_comp);
+        } else {
+                cfs_waitq_signal(&blp->blp_waitq);
+        }
+
         RETURN(0);
 }
+
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+                             struct list_head *cancels, int count, int mode)
+{
+        ENTRY;
+
+        if (mode == LDLM_SYNC) {
+                /* if it is synchronous call do minimum mem alloc, as it could
+                 * be triggered from kernel shrinker
+                 */
+                struct ldlm_bl_work_item blwi;
+                memset(&blwi, 0, sizeof(blwi));
+                /* have extra ref as this obj is on stack */
+                RETURN(__ldlm_bl_to_thread(ns, &blwi, ld, lock, cancels, count, mode));
+        } else {
+                struct ldlm_bl_work_item *blwi;
+                OBD_ALLOC(blwi, sizeof(*blwi));
+                if (blwi == NULL)
+                        RETURN(-ENOMEM);
+
+                RETURN(__ldlm_bl_to_thread(ns, blwi, ld, lock, cancels, count, mode));
+        }
+}
 #endif
 
 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                            struct ldlm_lock *lock)
 {
 #ifdef __KERNEL__
-        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
 #else
         RETURN(-ENOSYS);
 #endif
 }
 
 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                           struct list_head *cancels, int count)
+                           struct list_head *cancels, int count, int mode)
 {
 #ifdef __KERNEL__
-        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
+        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
 #else
         RETURN(-ENOSYS);
 #endif
@@ -1977,7 +2023,8 @@ static int ldlm_bl_thread_main(void *arg)
                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                 blwi->blwi_lock);
                 }
-                OBD_FREE(blwi, sizeof(*blwi));
+                complete(&blwi->blwi_comp);
+                ldlm_bl_work_item_put(blwi);
         }
 
         atomic_dec(&blp->blp_busy_threads);
index 773dbd6..b92c925 100644 (file)
@@ -1466,7 +1466,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
  * in a thread and this function will return after the thread has been
  * asked to call the callback.  when called with LDLM_SYNC the blocking
  * callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t mode,
                     int flags)
 {
         CFS_LIST_HEAD(cancels);
@@ -1474,19 +1474,15 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
         ENTRY;
 
 #ifndef __KERNEL__
-        sync = LDLM_SYNC; /* force to be sync in user space */
+        mode = LDLM_SYNC; /* force to be sync in user space */
 #endif
         count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags);
-        if (sync == LDLM_ASYNC) {
-                rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
-                if (rc == 0)
-                        RETURN(count);
-        }
 
-        /* If an error occured in ASYNC mode, or this is SYNC mode,
-         * cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
-        RETURN(count);
+        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, mode);
+        if (rc == 0)
+                RETURN(count);
+
+        RETURN(0);
 }
 
 /* Find and cancel locally unused locks found on resource, matched to the