Whamcloud - gitweb
b=22244 delegate lock cancel to blocking thread
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 4c7c2f0..f808843 100644 (file)
@@ -433,8 +433,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
 }
 
 static void failed_lock_cleanup(struct ldlm_namespace *ns,
-                                struct ldlm_lock *lock,
-                                struct lustre_handle *lockh, int mode)
+                                struct ldlm_lock *lock, int mode)
 {
         int need_cancel = 0;
 
@@ -447,25 +446,31 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                  * bl_ast and -EINVAL reply is sent to server anyways.
                  * bug 17645 */
                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
-                                 LDLM_FL_ATOMIC_CB;
+                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
                 need_cancel = 1;
         }
         unlock_res_and_lock(lock);
 
-        if (need_cancel) {
+        if (need_cancel)
                 LDLM_DEBUG(lock,
                            "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | "
-                           "LDLM_FL_ATOMIC_CB");
-                ldlm_lock_decref_and_cancel(lockh, mode);
-        } else {
+                           "LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING");
+        else
                 LDLM_DEBUG(lock, "lock was granted or failed in race");
-                ldlm_lock_decref(lockh, mode);
-        }
+
+        ldlm_lock_decref_internal(lock, mode);
 
         /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
          *       from llite/file.c/ll_file_flock(). */
+        /* This code makes for the fact that we do not have blocking handler on
+         * a client for flock locks. As such this is the place where we must
+         * completely kill failed locks. (interrupted and those that
+         * were waiting to be granted when server evicted us. */
         if (lock->l_resource->lr_type == LDLM_FLOCK) {
-                ldlm_lock_destroy(lock);
+                lock_res_and_lock(lock);
+                ldlm_resource_unlink_lock(lock);
+                ldlm_lock_destroy_nolock(lock);
+                unlock_res_and_lock(lock);
         }
 }
 
@@ -614,7 +619,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                         int err = lock->l_completion_ast(lock, *flags, NULL);
                         if (!rc)
                                 rc = err;
-                        if (rc && type != LDLM_FLOCK) /* bug 9425, bug 10250 */
+                        if (rc)
                                 cleanup_phase = 1;
                 }
         }
@@ -629,7 +634,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
         EXIT;
 cleanup:
         if (cleanup_phase == 1 && rc)
-                failed_lock_cleanup(ns, lock, lockh, mode);
+                failed_lock_cleanup(ns, lock, mode);
         /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
         LDLM_LOCK_PUT(lock);
         LDLM_LOCK_RELEASE(lock);
@@ -816,7 +821,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                                                 LUSTRE_DLM_VERSION,
                                                 LDLM_ENQUEUE);
                 if (req == NULL) {
-                        failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
+                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
                         LDLM_LOCK_RELEASE(lock);
                         RETURN(-ENOMEM);
                 }
@@ -1071,7 +1076,7 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req,
 /* Prepare and send a batched cancel rpc, it will include count lock handles
  * of locks given in @head. */
 int ldlm_cli_cancel_req(struct obd_export *exp, cfs_list_t *cancels,
-                        int count, int flags)
+                        int count, ldlm_cancel_flags_t flags)
 {
         struct ptlrpc_request *req = NULL;
         struct obd_import *imp;
@@ -1125,7 +1130,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, cfs_list_t *cancels,
                 ldlm_cancel_pack(req, cancels, count);
 
                 ptlrpc_request_set_replen(req);
-                if (flags & LDLM_FL_ASYNC) {
+                if (flags & LCF_ASYNC) {
                         ptlrpcd_add_req(req, PSCOPE_OTHER);
                         sent = count;
                         GOTO(out, 0);
@@ -1256,7 +1261,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 flags = ns_connect_lru_resize(ns) ?
                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
-                                               LDLM_FL_BL_AST, flags);
+                                               LCF_BL_AST, flags);
         }
         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(0);
@@ -1264,7 +1269,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
 
 /* XXX until we will have compound requests and can cut cancels from generic rpc
  * we need send cancels with LDLM_FL_BL_AST flag as separate rpc */
-static int ldlm_cancel_list(cfs_list_t *cancels, int count, int flags)
+int ldlm_cli_cancel_list_local(cfs_list_t *cancels, int count,
+                               ldlm_cancel_flags_t flags)
 {
         CFS_LIST_HEAD(head);
         struct ldlm_lock *lock, *next;
@@ -1275,13 +1281,13 @@ static int ldlm_cancel_list(cfs_list_t *cancels, int count, int flags)
                 if (left-- == 0)
                         break;
 
-                if (flags & LDLM_FL_LOCAL_ONLY) {
+                if (flags & LCF_LOCAL) {
                         rc = LDLM_FL_LOCAL_ONLY;
                         ldlm_lock_cancel(lock);
                 } else {
                         rc = ldlm_cli_cancel_local(lock);
                 }
-                if (!(flags & LDLM_FL_BL_AST) && (rc == LDLM_FL_BL_AST)) {
+                if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
                         LDLM_DEBUG(lock, "Cancel lock separately");
                         cfs_list_del_init(&lock->l_bl_ast);
                         cfs_list_add(&lock->l_bl_ast, &head);
@@ -1467,8 +1473,8 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
  *
  * flags & LDLM_CANCEL_AGED -   cancel alocks according to "aged policy".
  */
-int ldlm_cancel_lru_local(struct ldlm_namespace *ns, cfs_list_t *cancels,
-                          int count, int max, int cancel_flags, int flags)
+static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, cfs_list_t *cancels,
+                                 int count, int max, int flags)
 {
         ldlm_cancel_lru_policy_t pf;
         struct ldlm_lock *lock, *next;
@@ -1578,14 +1584,25 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, cfs_list_t *cancels,
                 unused--;
         }
         cfs_spin_unlock(&ns->ns_unused_lock);
-        RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
+        RETURN(added);
+}
+
+int ldlm_cancel_lru_local(struct ldlm_namespace *ns, cfs_list_t *cancels,
+                          int count, int max, ldlm_cancel_flags_t cancel_flags,
+                          int flags)
+{
+        int added;
+        added = ldlm_prepare_lru_list(ns, cancels, count, max, flags);
+        if (added <= 0)
+                return added;
+        return ldlm_cli_cancel_list_local(cancels, added, cancel_flags);
 }
 
 /* when called with LDLM_ASYNC the blocking callback will be handled
  * in a thread and this function will return after the thread has been
  * asked to call the callback.  when called with LDLM_SYNC the blocking
  * callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t mode,
                     int flags)
 {
         CFS_LIST_HEAD(cancels);
@@ -1593,19 +1610,16 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
         ENTRY;
 
 #ifndef __KERNEL__
-        sync = LDLM_SYNC; /* force to be sync in user space */
+        mode = LDLM_SYNC; /* force to be sync in user space */
 #endif
-        count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags);
-        if (sync == LDLM_ASYNC) {
-                rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
-                if (rc == 0)
-                        RETURN(count);
-        }
+        /* Just prepare the list of locks, do not actually cancel them yet.
+         * Locks are cancelled later in a separate thread. */
+        count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
+        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, mode);
+        if (rc == 0)
+                RETURN(count);
 
-        /* If an error occured in ASYNC mode, or this is SYNC mode,
-         * cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
-        RETURN(count);
+        RETURN(0);
 }
 
 /* Find and cancel locally unused locks found on resource, matched to the
@@ -1615,7 +1629,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                                cfs_list_t *cancels,
                                ldlm_policy_data_t *policy,
                                ldlm_mode_t mode, int lock_flags,
-                               int cancel_flags, void *opaque)
+                               ldlm_cancel_flags_t cancel_flags, void *opaque)
 {
         struct ldlm_lock *lock;
         int count = 0;
@@ -1630,13 +1644,8 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                         continue;
                 }
 
-                if (lock->l_readers || lock->l_writers) {
-                        if (cancel_flags & LDLM_FL_WARN) {
-                                LDLM_ERROR(lock, "lock in use");
-                                //LBUG();
-                        }
+                if (lock->l_readers || lock->l_writers)
                         continue;
-                }
 
                 /* If somebody is already doing CANCEL, or blocking ast came,
                  * skip this lock. */
@@ -1665,7 +1674,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
         }
         unlock_res(res);
 
-        RETURN(ldlm_cancel_list(cancels, count, cancel_flags));
+        RETURN(ldlm_cli_cancel_list_local(cancels, count, cancel_flags));
 }
 
 /* If @req is NULL, send CANCEL request to server with handles of locks
@@ -1675,7 +1684,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
  * buffer at the offset @off.
  * Destroy @cancels at the end. */
 int ldlm_cli_cancel_list(cfs_list_t *cancels, int count,
-                         struct ptlrpc_request *req, int flags)
+                         struct ptlrpc_request *req, ldlm_cancel_flags_t flags)
 {
         struct ldlm_lock *lock;
         int res = 0;
@@ -1723,7 +1732,9 @@ int ldlm_cli_cancel_list(cfs_list_t *cancels, int count,
 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                                     const struct ldlm_res_id *res_id,
                                     ldlm_policy_data_t *policy,
-                                    ldlm_mode_t mode, int flags, void *opaque)
+                                    ldlm_mode_t mode,
+                                    ldlm_cancel_flags_t flags,
+                                    void *opaque)
 {
         struct ldlm_resource *res;
         CFS_LIST_HEAD(cancels);
@@ -1765,11 +1776,11 @@ static inline int have_no_nsresource(struct ldlm_namespace *ns)
 /* Cancel all locks on a namespace (or a specific resource, if given)
  * that have 0 readers/writers.
  *
- * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
+ * If flags & LCF_LOCAL, throw the locks away without trying
  * to notify the server. */
 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                            const struct ldlm_res_id *res_id,
-                           int flags, void *opaque)
+                           ldlm_cancel_flags_t flags, void *opaque)
 {
         int i;
         ENTRY;
@@ -1911,12 +1922,17 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
         RETURN(rc);
 }
 
-/* non-blocking function to manipulate a lock whose cb_data is being put away.*/
-void ldlm_resource_iterate(struct ldlm_namespace *ns,
-                           const struct ldlm_res_id *res_id,
-                           ldlm_iterator_t iter, void *data)
+/* non-blocking function to manipulate a lock whose cb_data is being put away.
+ * return  0:  find no resource
+ *       > 0:  must be LDLM_ITER_STOP/LDLM_ITER_CONTINUE.
+ *       < 0:  errors
+ */
+int ldlm_resource_iterate(struct ldlm_namespace *ns,
+                          const struct ldlm_res_id *res_id,
+                          ldlm_iterator_t iter, void *data)
 {
         struct ldlm_resource *res;
+        int rc;
         ENTRY;
 
         if (ns == NULL) {
@@ -1925,16 +1941,14 @@ void ldlm_resource_iterate(struct ldlm_namespace *ns,
         }
 
         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
-        if (res == NULL) {
-                EXIT;
-                return;
-        }
+        if (res == NULL)
+                RETURN(0);
 
         LDLM_RESOURCE_ADDREF(res);
-        ldlm_resource_foreach(res, iter, data);
+        rc = ldlm_resource_foreach(res, iter, data);
         LDLM_RESOURCE_DELREF(res);
         ldlm_resource_putref(res);
-        EXIT;
+        RETURN(rc);
 }
 
 /* Lock replay */
@@ -2102,6 +2116,10 @@ int ldlm_replay_locks(struct obd_import *imp)
 
         LASSERT(cfs_atomic_read(&imp->imp_replay_inflight) == 0);
 
+        /* don't replay locks if import failed recovery */
+        if (imp->imp_vbr_failed)
+                RETURN(0);
+
         /* ensure this doesn't fall to 0 before all have been queued */
         cfs_atomic_inc(&imp->imp_replay_inflight);