Whamcloud - gitweb
Branch b1_6
author bobijam <bobijam>
Tue, 23 Dec 2008 07:26:20 +0000 (07:26 +0000)
committer bobijam <bobijam>
Tue, 23 Dec 2008 07:26:20 +0000 (07:26 +0000)
b=17046
i=johann
i=oleg.drokin (green)

Restructure the flock completion AST to bring it in line with ldlm_completion_ast().

lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_request.c

index c77b40c..10be102 100644 (file)
@@ -101,11 +101,12 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         LASSERT(list_empty(&lock->l_flock_waitq));
 
         list_del_init(&lock->l_res_link);
-        if (flags == LDLM_FL_WAIT_NOREPROC) {
+        if (flags == LDLM_FL_WAIT_NOREPROC &&
+            !(lock->l_flags & LDLM_FL_FAILED)) {
                 /* client side - set a flag to prevent sending a CANCEL */
                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
 
-                /* when reaching here, it is under lock_res_and_lock(). Thus, 
+                /* when reaching here, it is under lock_res_and_lock(). Thus,
                    need call the nolock version of ldlm_lock_decref_internal*/
                 ldlm_lock_decref_internal_nolock(lock, mode);
         }
@@ -370,7 +371,8 @@ reprocess:
                                         NULL, 0);
                         lock_res_and_lock(req);
                         if (!new2) {
-                                ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+                                ldlm_flock_destroy(req, lock->l_granted_mode,
+                                                   *flags);
                                 *err = -ENOLCK;
                                 RETURN(LDLM_ITER_STOP);
                         }
@@ -391,14 +393,15 @@ reprocess:
                 new2->l_conn_export = lock->l_conn_export;
                 if (lock->l_export != NULL) {
                         new2->l_export = class_export_get(lock->l_export);
-                        if (new2->l_export->exp_lock_hash && 
+                        if (new2->l_export->exp_lock_hash &&
                             hlist_unhashed(&new2->l_exp_hash))
                                 lustre_hash_add(new2->l_export->exp_lock_hash,
                                                 &new2->l_remote_handle,
                                                 &new2->l_exp_hash);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
-                        ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
+                        ldlm_lock_addref_internal_nolock(new2,
+                                                         lock->l_granted_mode);
 
                 /* insert new2 at lock */
                 ldlm_resource_add_lock(res, ownlocks, new2);
@@ -423,7 +426,7 @@ reprocess:
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
                 if (first_enq) {
                         /* If this is an unlock, reprocess the waitq and
-                         * send completions ASTs for locks that can now be 
+                         * send completions ASTs for locks that can now be
                          * granted. The only problem with doing this
                          * reprocessing here is that the completion ASTs for
                          * newly granted locks will be sent before the unlock
@@ -433,7 +436,7 @@ reprocess:
                          * ldlm_reprocess_queue. */
                         if ((mode == LCK_NL) && overlaps) {
                                 struct list_head rpc_list
-                                                    = CFS_LIST_HEAD_INIT(rpc_list);
+                                                 = CFS_LIST_HEAD_INIT(rpc_list);
                                 int rc;
 restart:
                                 ldlm_reprocess_queue(res, &res->lr_waiting,
@@ -494,17 +497,26 @@ ldlm_flock_interrupted_wait(void *data)
         EXIT;
 }
 
+/**
+ * Flock completion callback function.
+ *
+ * \param lock [in,out]: A lock to be handled
+ * \param flags    [in]: flags
+ * \param *data    [in]: ldlm_run_cp_ast_work() will use ldlm_cb_set_arg
+ *
+ * \retval 0    : success
+ * \retval <0   : failure
+ */
 int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
-        struct ldlm_namespace *ns;
-        cfs_flock_t *getlk = lock->l_ast_data;
-        struct ldlm_flock_wait_data fwd;
-        struct obd_device *obd;
-        struct obd_import *imp = NULL;
-        ldlm_error_t err;
-        int rc = 0;
-        struct l_wait_info lwi;
+        cfs_flock_t                    *getlk = lock->l_ast_data;
+        struct obd_device              *obd;
+        struct obd_import              *imp = NULL;
+        struct ldlm_flock_wait_data     fwd;
+        struct l_wait_info              lwi;
+        ldlm_error_t                    err;
+        int                             rc = 0;
         ENTRY;
 
         CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
@@ -515,11 +527,12 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
          * holding the lock even if app still believes it has it, since
          * server already dropped it anyway. Only for granted locks too. */
         lock_res_and_lock(lock);
-        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) == 
+        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
             (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                 unlock_res_and_lock(lock);
                 if (lock->l_req_mode == lock->l_granted_mode &&
-                    lock->l_granted_mode != LCK_NL)
+                    lock->l_granted_mode != LCK_NL &&
+                    NULL == data)
                         ldlm_lock_decref_internal(lock, lock->l_req_mode);
                 RETURN(0);
         }
@@ -528,20 +541,25 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
-                       LDLM_FL_BLOCK_CONV)))
-                goto  granted;
+                       LDLM_FL_BLOCK_CONV))) {
+                if (NULL == data)
+                        /* mds granted the lock in the reply */
+                        goto granted;
+                /* CP AST RPC: lock was granted, wake up its waiter */
+                cfs_waitq_signal(&lock->l_waitq);
+                RETURN(0);
+        }
 
         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                    "sleeping");
-
         fwd.fwd_lock = lock;
         obd = class_exp2obd(lock->l_conn_export);
 
-        /* if this is a local lock, then there is no import */
-        if (obd != NULL)
+        /* if this is a local lock, there is no import */
+        if (NULL != obd)
                 imp = obd->u.cli.cl_import;
 
-        if (imp != NULL) {
+        if (NULL != imp) {
                 spin_lock(&imp->imp_lock);
                 fwd.fwd_generation = imp->imp_generation;
                 spin_unlock(&imp->imp_lock);
@@ -550,27 +568,31 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
 
         /* Go to sleep until the lock is granted. */
-        rc = l_wait_event(lock->l_waitq,
-                          ((lock->l_req_mode == lock->l_granted_mode) ||
-                           lock->l_destroyed), &lwi);
+        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
+
+        if (rc) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
+                           rc);
+                RETURN(rc);
+        }
 
-        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
-        RETURN(rc);
 granted:
         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
-        LDLM_DEBUG(lock, "client-side enqueue granted");
-        ns = lock->l_resource->lr_namespace;
-        lock_res_and_lock(lock);
 
-        /* before flock's complete ast gets here, the flock
-         * can possibly be freed by another thread
-         */
-        if (lock->l_destroyed) {
-                LDLM_DEBUG(lock, "already destroyed by another thread");
+        lock_res_and_lock(lock);
+        if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                 unlock_res_and_lock(lock);
-                RETURN(0);
+                RETURN(-EIO);
         }
+        if (rc) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
+                           rc);
+                unlock_res_and_lock(lock);
+                RETURN(rc);
+        }
+
+        LDLM_DEBUG(lock, "client-side enqueue granted");
 
         /* take lock off the deadlock detection waitq. */
         spin_lock(&ldlm_flock_waitq_lock);
@@ -583,8 +605,9 @@ granted:
         if (flags & LDLM_FL_TEST_LOCK) {
                 /* fcntl(F_GETLK) request */
                 /* The old mode was saved in getlk->fl_type so that if the mode
-                 * in the lock changes we can decref the approprate refcount. */
-                ldlm_flock_destroy(lock, cfs_flock_type(getlk), LDLM_FL_WAIT_NOREPROC);
+                 * in the lock changes we can decref the appropriate refcount.*/
+                ldlm_flock_destroy(lock, cfs_flock_type(getlk),
+                                   LDLM_FL_WAIT_NOREPROC);
                 switch (lock->l_granted_mode) {
                 case LCK_PR:
                         cfs_flock_set_type(getlk, F_RDLCK);
@@ -595,17 +618,18 @@ granted:
                 default:
                         cfs_flock_set_type(getlk, F_UNLCK);
                 }
-                cfs_flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
-                cfs_flock_set_start(getlk, (loff_t)lock->l_policy_data.l_flock.start);
-                cfs_flock_set_end(getlk, (loff_t)lock->l_policy_data.l_flock.end);
+                cfs_flock_set_pid(getlk,
+                                  (pid_t)lock->l_policy_data.l_flock.pid);
+                cfs_flock_set_start(getlk,
+                                    (loff_t)lock->l_policy_data.l_flock.start);
+                cfs_flock_set_end(getlk,
+                                  (loff_t)lock->l_policy_data.l_flock.end);
         } else {
                 int noreproc = LDLM_FL_WAIT_NOREPROC;
 
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process. */
                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
-                if (flags == 0)
-                        cfs_waitq_signal(&lock->l_waitq);
         }
         unlock_res_and_lock(lock);
         RETURN(0);
index c316918..fee4b24 100644 (file)
@@ -45,19 +45,19 @@ extern struct list_head ldlm_cli_namespace_list;
 
 static inline atomic_t *ldlm_namespace_nr(ldlm_side_t client)
 {
-        return client == LDLM_NAMESPACE_SERVER ? 
+        return client == LDLM_NAMESPACE_SERVER ?
                 &ldlm_srv_namespace_nr : &ldlm_cli_namespace_nr;
 }
 
 static inline struct list_head *ldlm_namespace_list(ldlm_side_t client)
 {
-        return client == LDLM_NAMESPACE_SERVER ? 
+        return client == LDLM_NAMESPACE_SERVER ?
                 &ldlm_srv_namespace_list : &ldlm_cli_namespace_list;
 }
 
 static inline struct semaphore *ldlm_namespace_lock(ldlm_side_t client)
 {
-        return client == LDLM_NAMESPACE_SERVER ? 
+        return client == LDLM_NAMESPACE_SERVER ?
                 &ldlm_srv_namespace_lock : &ldlm_cli_namespace_lock;
 }
 
@@ -75,11 +75,11 @@ enum {
         LDLM_CANCEL_LRUR   = 1 << 3  /* Cancel locks from lru resize. */
 };
 
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync, 
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
                     int flags);
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                           int count, int max, int cancel_flags, int flags);
-int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns, int count, int max, 
+int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns, int count, int max,
                              int flags);
 extern int ldlm_enqueue_min;
 int ldlm_get_enq_timeout(struct ldlm_lock *lock);
@@ -88,7 +88,7 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock);
 int ldlm_resource_putref_locked(struct ldlm_resource *res);
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
-void ldlm_namespace_free_prior(struct ldlm_namespace *ns, 
+void ldlm_namespace_free_prior(struct ldlm_namespace *ns,
                                struct obd_import *imp, int force);
 void ldlm_namespace_free_post(struct ldlm_namespace *ns);
 
@@ -235,3 +235,17 @@ typedef enum ldlm_policy_res ldlm_policy_res_t;
                 return rc;                                                  \
         }                                                                   \
         struct __##var##__dummy_write {;} /* semicolon catcher */
+
+static inline int is_granted_or_cancelled(struct ldlm_lock *lock)
+{
+        int ret = 0;
+
+        lock_res_and_lock(lock);
+        if (((lock->l_req_mode == lock->l_granted_mode) &&
+             !(lock->l_flags & LDLM_FL_CP_REQD)) ||
+            (lock->l_flags & LDLM_FL_FAILED))
+                ret = 1;
+        unlock_res_and_lock(lock);
+
+        return ret;
+}
index f073a85..a39dc71 100644 (file)
@@ -116,20 +116,6 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock)
 }
 EXPORT_SYMBOL(ldlm_get_enq_timeout);
 
-static int is_granted_or_cancelled(struct ldlm_lock *lock)
-{
-        int ret = 0;
-
-        lock_res_and_lock(lock);
-        if (((lock->l_req_mode == lock->l_granted_mode) &&
-             !(lock->l_flags & LDLM_FL_CP_REQD)) ||
-            (lock->l_flags & LDLM_FL_FAILED))
-                ret = 1;
-        unlock_res_and_lock(lock);
-
-        return ret;
-}
-
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         /* XXX ALLOCATE - 160 bytes */
@@ -188,7 +174,7 @@ noreproc:
                 spin_unlock(&imp->imp_lock);
         }
 
-        if (ns_is_client(lock->l_resource->lr_namespace) && 
+        if (ns_is_client(lock->l_resource->lr_namespace) &&
             lock->l_resource->lr_type == LDLM_EXTENT &&
             OBD_FAIL_CHECK(OBD_FAIL_LDLM_INTR_CP_AST | OBD_FAIL_ONCE)) {
                 obd_fail_loc = OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE;
@@ -196,7 +182,7 @@ noreproc:
                 rc = -EINTR;
         } else {
                 /* Go to sleep until the lock is granted or cancelled. */
-                rc = l_wait_event(lock->l_waitq, 
+                rc = l_wait_event(lock->l_waitq,
                                   is_granted_or_cancelled(lock), &lwi);
         }
 
@@ -364,20 +350,20 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
         /* Set a flag to prevent us from sending a CANCEL (bug 407) */
         lock_res_and_lock(lock);
         /* Check that lock is not granted or failed, we might race. */
-        if ((lock->l_req_mode != lock->l_granted_mode) && 
+        if ((lock->l_req_mode != lock->l_granted_mode) &&
             !(lock->l_flags & LDLM_FL_FAILED)) {
                 /* Make sure that this lock will not be found by raced
-                 * bl_ast and -EINVAL reply is sent to server anyways. 
+                 * bl_ast and -EINVAL reply is sent to server anyways.
                  * bug 17645 */
-                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED | 
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
                                  LDLM_FL_ATOMIC_CB;
                 need_cancel = 1;
         }
         unlock_res_and_lock(lock);
-  
+
         if (need_cancel) {
-                LDLM_DEBUG(lock, 
-                           "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | " 
+                LDLM_DEBUG(lock,
+                           "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | "
                            "LDLM_FL_ATOMIC_CB");
                 ldlm_lock_decref_and_cancel(lockh, mode);
         } else {
@@ -1932,10 +1918,10 @@ static int replay_lock_interpret(struct ptlrpc_request *req,
         lock->l_remote_handle = reply->lock_handle;
 
         /* Key change rehash lock in per-export hash with new key */
-       exp = req->rq_export;
+        exp = req->rq_export;
         if (exp && exp->exp_lock_hash)
                 lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key,
-                                      &lock->l_remote_handle,
+                                       &lock->l_remote_handle,
                                        &lock->l_exp_hash);
 
         LDLM_DEBUG(lock, "replayed lock:");