Whamcloud - gitweb
LU-11276 ldlm: fix lock convert races 66/36466/11
author Vitaly Fertman <c17818@cray.com>
Wed, 16 Oct 2019 16:07:56 +0000 (19:07 +0300)
committer Oleg Drokin <green@whamcloud.com>
Thu, 23 Jan 2020 06:19:46 +0000 (06:19 +0000)
The blocking cb may be triggered in parallel, and the convert logic
of the DOM lock must be prepared for the cancel_bits having already
been zeroed by the first executor.

As there may be several parallel blocking cb executors and several
conversion callers, each requesting different inode bits, set up
the following logic:
- the lock keeps the aggregated set of bits requested for cancelling
  by different parties, where 0 means the whole lock is to be
  cancelled, and where the CBPENDING flag means there is a canceling
  job pending;
- once completed, the cancel_bits are zeroed and the CBPENDING flag
  is dropped, meaning the next request will be a part of the next job;
- once a local lock is converted, its state is changed appropriately
  and no cleanup is left for the interpret time as the lock is ready
  for the next usage;
- as the lock is unlocked during the conversion, more bits to cancel
  may appear; check for this and repeat the conversion if needed;
- let only 1 conversion executor work at a time; the others wait,
  similar to ldlm_cli_cancel();
- there are others who may want to cancel unused locks (cancel_lru,
  cancel_resource_local), consider CANCELING as a request to cancel
  the full lock independently of the cancel_bits;

Some cleanups are done:
- move the cache drop logic to the CANCELING part of the blocking cb
  from the BLOCKING one;
- remove the convert RPC interpret, as the lock cleanups are already
  done in advance; the convert RPC is re-sendable and an error means
  there is a serious network problem;

Test-Parameters: testlist=racer,racer,racer
Signed-off-by: Vitaly Fertman <c17818@cray.com>
Change-Id: I901de34241704ed801152f071cb7f610fe6f4bfe
Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/36466
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andriy Skulysh <c17819@cray.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/namei.c

index 4f285f8..7f1fef3 100644 (file)
@@ -676,7 +676,6 @@ enum ldlm_cancel_flags {
        LCF_ASYNC       = 0x1, /* Cancel locks asynchronously. */
        LCF_LOCAL       = 0x2, /* Cancel locks locally, not notifing server */
        LCF_BL_AST      = 0x4, /* Cancel LDLM_FL_BL_AST locks in the same RPC */
-       LCF_CONVERT     = 0x8, /* Try to convert IBITS lock before cancel */
 };
 
 struct ldlm_flock {
@@ -1694,7 +1693,9 @@ int ldlm_cli_enqueue_local(const struct lu_env *env,
                           void *data, __u32 lvb_len, enum lvb_type lvb_type,
                           const __u64 *client_cookie,
                           struct lustre_handle *lockh);
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags);
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits);
+int ldlm_cli_convert(struct ldlm_lock *lock,
+                    enum ldlm_cancel_flags cancel_flags);
 int ldlm_cli_update_pool(struct ptlrpc_request *req);
 int ldlm_cli_cancel(const struct lustre_handle *lockh,
                    enum ldlm_cancel_flags cancel_flags);
@@ -1720,8 +1721,8 @@ int ldlm_cli_cancel_list(struct list_head *head, int count,
                         enum ldlm_cancel_flags flags);
 
 int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop);
-int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits);
-int ldlm_cli_dropbits_list(struct list_head *converts, __u64 drop_bits);
+int ldlm_cli_inodebits_convert(struct ldlm_lock *lock,
+                              enum ldlm_cancel_flags cancel_flags);
 
 /** @} ldlm_cli_api */
 
index c8c9fa5..095c6e5 100644 (file)
@@ -382,7 +382,10 @@ void ldlm_ibits_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
                                     union ldlm_policy_data *lpolicy)
 {
        lpolicy->l_inodebits.bits = wpolicy->l_inodebits.bits;
-       lpolicy->l_inodebits.try_bits = wpolicy->l_inodebits.try_bits;
+       /**
+        * try_bits are to be handled outside of generic write_to_local due
+        * to different behavior on a server and client.
+        */
 }
 
 void ldlm_ibits_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
@@ -424,93 +427,103 @@ int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop)
 EXPORT_SYMBOL(ldlm_inodebits_drop);
 
 /* convert single lock */
-int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits)
+int ldlm_cli_inodebits_convert(struct ldlm_lock *lock,
+                              enum ldlm_cancel_flags cancel_flags)
 {
-       struct lustre_handle lockh;
+       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+       struct ldlm_lock_desc ld = { { 0 } };
+       __u64 drop_bits, new_bits;
        __u32 flags = 0;
        int rc;
 
        ENTRY;
 
-       LASSERT(drop_bits);
-       LASSERT(!lock->l_readers && !lock->l_writers);
+       check_res_locked(lock->l_resource);
 
-       LDLM_DEBUG(lock, "client lock convert START");
+       /* Lock is being converted already */
+       if (ldlm_is_converting(lock)) {
+               if (!(cancel_flags & LCF_ASYNC)) {
+                       struct l_wait_info lwi = { 0 };
 
-       ldlm_lock2handle(lock, &lockh);
-       lock_res_and_lock(lock);
-       /* check if all bits are blocked */
-       if (!(lock->l_policy_data.l_inodebits.bits & ~drop_bits)) {
-               unlock_res_and_lock(lock);
-               /* return error to continue with cancel */
-               GOTO(exit, rc = -EINVAL);
+                       unlock_res_and_lock(lock);
+                       l_wait_event(lock->l_waitq,
+                                    is_lock_converted(lock), &lwi);
+                       lock_res_and_lock(lock);
+               }
+               RETURN(0);
        }
 
-       /* check if no common bits, consider this as successful convert */
-       if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
-               unlock_res_and_lock(lock);
-               GOTO(exit, rc = 0);
-       }
+       /* lru_cancel may happen in parallel and call ldlm_cli_cancel_list()
+        * independently.
+        */
+       if (ldlm_is_canceling(lock))
+               RETURN(-EINVAL);
 
-       /* check if there is race with cancel */
-       if (ldlm_is_canceling(lock) || ldlm_is_cancel(lock)) {
-               unlock_res_and_lock(lock);
-               GOTO(exit, rc = -EINVAL);
-       }
+       /* no need in only local convert */
+       if (lock->l_flags & (LDLM_FL_LOCAL_ONLY | LDLM_FL_CANCEL_ON_BLOCK))
+               RETURN(-EINVAL);
 
-       /* clear cbpending flag early, it is safe to match lock right after
-        * client convert because it is downgrade always.
-        */
-       ldlm_clear_cbpending(lock);
-       ldlm_clear_bl_ast(lock);
+       drop_bits = lock->l_policy_data.l_inodebits.cancel_bits;
+       /* no cancel bits - means that caller needs full cancel */
+       if (drop_bits == 0)
+               RETURN(-EINVAL);
+
+       new_bits = lock->l_policy_data.l_inodebits.bits & ~drop_bits;
+       /* check if all lock bits are dropped, proceed with cancel */
+       if (!new_bits)
+               RETURN(-EINVAL);
+
+       /* check if no dropped bits, consider this as successful convert */
+       if (lock->l_policy_data.l_inodebits.bits == new_bits)
+               RETURN(0);
 
-       /* If lock is being converted already, check drop bits first */
-       if (ldlm_is_converting(lock)) {
-               /* raced lock convert, lock inodebits are remaining bits
-                * so check if they are conflicting with new convert or not.
-                */
-               if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
-                       unlock_res_and_lock(lock);
-                       GOTO(exit, rc = 0);
-               }
-               /* Otherwise drop new conflicting bits in new convert */
-       }
        ldlm_set_converting(lock);
-       /* from all bits of blocking lock leave only conflicting */
-       drop_bits &= lock->l_policy_data.l_inodebits.bits;
-       /* save them in cancel_bits, so l_blocking_ast will know
-        * which bits from the current lock were dropped. */
-       lock->l_policy_data.l_inodebits.cancel_bits = drop_bits;
-       /* Finally clear these bits in lock ibits */
-       ldlm_inodebits_drop(lock, drop_bits);
-       unlock_res_and_lock(lock);
        /* Finally call cancel callback for remaining bits only.
         * It is important to have converting flag during that
         * so blocking_ast callback can distinguish convert from
         * cancels.
         */
-       if (lock->l_blocking_ast)
-               lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
-                                    LDLM_CB_CANCELING);
-
+       ld.l_policy_data.l_inodebits.cancel_bits = drop_bits;
+       unlock_res_and_lock(lock);
+       lock->l_blocking_ast(lock, &ld, lock->l_ast_data, LDLM_CB_CANCELING);
        /* now notify server about convert */
-       rc = ldlm_cli_convert(lock, &flags);
-       if (rc) {
-               lock_res_and_lock(lock);
-               if (ldlm_is_converting(lock)) {
-                       ldlm_clear_converting(lock);
-                       ldlm_set_cbpending(lock);
-                       ldlm_set_bl_ast(lock);
-               }
-               unlock_res_and_lock(lock);
-               GOTO(exit, rc);
-       }
-       EXIT;
-exit:
-       LDLM_DEBUG(lock, "client lock convert END");
-       return rc;
-}
+       rc = ldlm_cli_convert_req(lock, &flags, new_bits);
+       lock_res_and_lock(lock);
+       if (rc)
+               GOTO(full_cancel, rc);
+
+       /* Finally clear these bits in lock ibits */
+       ldlm_inodebits_drop(lock, drop_bits);
 
+       /* Being locked again check if lock was canceled, it is important
+        * to do and don't drop cbpending below
+        */
+       if (ldlm_is_canceling(lock))
+               GOTO(full_cancel, rc = -EINVAL);
+
+       /* also check again if more bits to be cancelled appeared */
+       if (drop_bits != lock->l_policy_data.l_inodebits.cancel_bits)
+               GOTO(clear_converting, rc = -EAGAIN);
+
+       /* clear cbpending flag early, it is safe to match lock right after
+        * client convert because it is downgrade always.
+        */
+       ldlm_clear_cbpending(lock);
+       ldlm_clear_bl_ast(lock);
+       spin_lock(&ns->ns_lock);
+       if (list_empty(&lock->l_lru))
+               ldlm_lock_add_to_lru_nolock(lock);
+       spin_unlock(&ns->ns_lock);
+
+       /* the job is done, zero the cancel_bits. If more conflicts appear,
+        * it will result in another cycle of ldlm_cli_inodebits_convert().
+        */
+full_cancel:
+       lock->l_policy_data.l_inodebits.cancel_bits = 0;
+clear_converting:
+       ldlm_clear_converting(lock);
+       RETURN(rc);
+}
 
 int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock)
 {
index 15789bc..be72ad1 100644 (file)
@@ -192,6 +192,7 @@ int ldlm_bl_thread_wakeup(void);
 
 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
+void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
 
 #ifdef HAVE_SERVER_SUPPORT
 /* ldlm_plain.c */
@@ -385,6 +386,17 @@ static inline bool is_bl_done(struct ldlm_lock *lock)
        return bl_done;
 }
 
+static inline bool is_lock_converted(struct ldlm_lock *lock)
+{
+       bool ret = 0;
+
+       lock_res_and_lock(lock);
+       ret = (lock->l_policy_data.l_inodebits.cancel_bits == 0);
+       unlock_res_and_lock(lock);
+
+       return ret;
+}
+
 typedef void (*ldlm_policy_wire_to_local_t)(const union ldlm_wire_policy_data *,
                                            union ldlm_policy_data *);
 typedef void (*ldlm_policy_local_to_wire_t)(const union ldlm_policy_data *,
index 12b9034..e208689 100644 (file)
@@ -1359,6 +1359,9 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                                     &lock->l_policy_data);
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                lock->l_req_extent = lock->l_policy_data.l_extent;
+       else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS)
+               lock->l_policy_data.l_inodebits.try_bits =
+                       dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
 
 existing_lock:
        cookie = req;
@@ -1589,6 +1592,8 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
        struct obd_export *exp = req->rq_export;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
+       __u64 bits;
+       __u64 new_bits;
        int rc;
 
        ENTRY;
@@ -1605,62 +1610,61 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
        dlm_rep->lock_flags = dlm_req->lock_flags;
 
        lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
-       if (lock) {
-               __u64 bits;
-               __u64 new;
+       if (!lock) {
+               LDLM_DEBUG_NOLOCK("server lock is canceled already");
+               req->rq_status = ELDLM_NO_LOCK_DATA;
+               RETURN(0);
+       }
 
-               bits = lock->l_policy_data.l_inodebits.bits;
-               new = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
-               LDLM_DEBUG(lock, "server-side convert handler START");
+       LDLM_DEBUG(lock, "server-side convert handler START");
 
-               if (ldlm_is_cancel(lock)) {
-                       LDLM_ERROR(lock, "convert on canceled lock!");
-                       rc = ELDLM_NO_LOCK_DATA;
-               } else if (dlm_req->lock_desc.l_req_mode !=
-                          lock->l_granted_mode) {
-                       LDLM_ERROR(lock, "lock mode differs!");
-                       rc = ELDLM_NO_LOCK_DATA;
-               } else if (bits == new) {
-                       /*
-                        * This can be valid situation if CONVERT RPCs are
-                        * re-ordered. Just finish silently
-                        */
-                       LDLM_DEBUG(lock, "lock is converted already!");
-                       rc = ELDLM_OK;
-               } else {
-                       lock_res_and_lock(lock);
-                       if (ldlm_is_waited(lock))
-                               ldlm_del_waiting_lock(lock);
+       lock_res_and_lock(lock);
+       bits = lock->l_policy_data.l_inodebits.bits;
+       new_bits = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
 
-                       ldlm_clear_cbpending(lock);
-                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
-                       ldlm_inodebits_drop(lock, bits & ~new);
+       if (ldlm_is_cancel(lock)) {
+               LDLM_DEBUG(lock, "convert on canceled lock!");
+               unlock_res_and_lock(lock);
+               GOTO(out_put, rc = ELDLM_NO_LOCK_DATA);
+       }
 
-                       ldlm_clear_blocking_data(lock);
-                       unlock_res_and_lock(lock);
+       if (dlm_req->lock_desc.l_req_mode != lock->l_granted_mode) {
+               LDLM_ERROR(lock, "lock mode differs!");
+               unlock_res_and_lock(lock);
+               GOTO(out_put, rc = -EPROTO);
+       }
 
-                       ldlm_reprocess_all(lock->l_resource, NULL);
-                       rc = ELDLM_OK;
-               }
+       if (bits == new_bits) {
+               /*
+                * This can be valid situation if CONVERT RPCs are
+                * re-ordered. Just finish silently
+                */
+               LDLM_DEBUG(lock, "lock is converted already!");
+               unlock_res_and_lock(lock);
+       } else {
+               if (ldlm_is_waited(lock))
+                       ldlm_del_waiting_lock(lock);
 
-               if (rc == ELDLM_OK) {
-                       dlm_rep->lock_handle = lock->l_remote_handle;
-                       ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
-                                       &dlm_rep->lock_desc.l_policy_data);
-               }
+               ldlm_clear_cbpending(lock);
+               lock->l_policy_data.l_inodebits.cancel_bits = 0;
+               ldlm_inodebits_drop(lock, bits & ~new_bits);
 
-               LDLM_DEBUG(lock, "server-side convert handler END, rc = %d",
-                          rc);
-               LDLM_LOCK_PUT(lock);
-       } else {
-               rc = ELDLM_NO_LOCK_DATA;
-               LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d",
-                                 rc);
+               ldlm_clear_blocking_data(lock);
+               unlock_res_and_lock(lock);
+
+               ldlm_reprocess_all(lock->l_resource, NULL);
        }
 
+       dlm_rep->lock_handle = lock->l_remote_handle;
+       ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
+                                       &dlm_rep->lock_desc.l_policy_data);
+       rc = ELDLM_OK;
+       EXIT;
+out_put:
+       LDLM_DEBUG(lock, "server-side convert handler END, rc = %d", rc);
+       LDLM_LOCK_PUT(lock);
        req->rq_status = rc;
-
-       RETURN(0);
+       return 0;
 }
 
 /**
@@ -1798,23 +1802,17 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 #endif /* HAVE_SERVER_SUPPORT */
 
 /**
- * Callback handler for receiving incoming blocking ASTs.
- *
- * This can only happen on client side.
+ * Server may pass additional information about blocking lock.
+ * For IBITS locks it is conflicting bits which can be used for
+ * lock convert instead of cancel.
  */
-void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
-                            struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
 {
-       int do_ast;
-
-       ENTRY;
-
-       LDLM_DEBUG(lock, "client blocking AST callback handler");
-
-       lock_res_and_lock(lock);
+       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
-       /* set bits to cancel for this lock for possible lock convert */
-       if (ns_is_client(ns) && (lock->l_resource->lr_type == LDLM_IBITS)) {
+       check_res_locked(lock->l_resource);
+       if (ns_is_client(ns) && ld &&
+           (lock->l_resource->lr_type == LDLM_IBITS)) {
                /*
                 * Lock description contains policy of blocking lock,
                 * and its cancel_bits is used to pass conflicting bits.
@@ -1826,18 +1824,43 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                 * never use cancel bits from different resource, full cancel
                 * is to be used.
                 */
-               if (ld && ld->l_policy_data.l_inodebits.bits &&
+               if (ld->l_policy_data.l_inodebits.cancel_bits &&
                    ldlm_res_eq(&ld->l_resource.lr_name,
-                               &lock->l_resource->lr_name))
-                       lock->l_policy_data.l_inodebits.cancel_bits =
+                               &lock->l_resource->lr_name) &&
+                   !(ldlm_is_cbpending(lock) &&
+                     lock->l_policy_data.l_inodebits.cancel_bits == 0)) {
+                       /* always combine conflicting ibits */
+                       lock->l_policy_data.l_inodebits.cancel_bits |=
                                ld->l_policy_data.l_inodebits.cancel_bits;
-               /*
-                * if there is no valid ld and lock is cbpending already
-                * then cancel_bits should be kept, otherwise it is zeroed.
-                */
-               else if (!ldlm_is_cbpending(lock))
+               } else {
+                       /* If cancel_bits are not obtained or
+                        * if the lock is already CBPENDING and
+                        * has no cancel_bits set
+                        * - the full lock is to be cancelled
+                        */
                        lock->l_policy_data.l_inodebits.cancel_bits = 0;
+               }
        }
+}
+
+/**
+ * Callback handler for receiving incoming blocking ASTs.
+ *
+ * This can only happen on client side.
+ */
+void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+                            struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+{
+       int do_ast;
+
+       ENTRY;
+
+       LDLM_DEBUG(lock, "client blocking AST callback handler");
+
+       lock_res_and_lock(lock);
+
+       /* get extra information from desc if any */
+       ldlm_bl_desc2lock(ld, lock);
        ldlm_set_cbpending(lock);
 
        do_ast = (!lock->l_readers && !lock->l_writers);
@@ -1959,6 +1982,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 * Let ldlm_cancel_lru() be fast.
                 */
                ldlm_lock_remove_from_lru(lock);
+               ldlm_bl_desc2lock(&dlm_req->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }
@@ -2011,6 +2035,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
 {
+       struct ldlm_lock_desc *ld = &dlm_req->lock_desc;
        int rc = -ENOSYS;
 
        ENTRY;
@@ -2034,8 +2059,15 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                        ktime_add(lock->l_last_used,
                                  ktime_set(ns->ns_dirty_age_limit, 0)))) {
                unlock_res_and_lock(lock);
-               if (ldlm_bl_to_thread_lock(ns, NULL, lock))
-                       ldlm_handle_bl_callback(ns, NULL, lock);
+
+               /* For MDS glimpse it is always DOM lock, set corresponding
+                * cancel_bits to perform lock convert if needed
+                */
+               if (lock->l_resource->lr_type == LDLM_IBITS)
+                       ld->l_policy_data.l_inodebits.cancel_bits =
+                                                       MDS_INODELOCK_DOM;
+               if (ldlm_bl_to_thread_lock(ns, ld, lock))
+                       ldlm_handle_bl_callback(ns, ld, lock);
 
                EXIT;
                return;
index ac40691..691f10c 100644 (file)
@@ -735,6 +735,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
        if ((*flags) & LDLM_FL_AST_SENT) {
                lock_res_and_lock(lock);
+               ldlm_bl_desc2lock(&reply->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
@@ -1134,128 +1135,6 @@ out:
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 
 /**
- * Client-side lock convert reply handling.
- *
- * Finish client lock converting, checks for concurrent converts
- * and clear 'converting' flag so lock can be placed back into LRU.
- */
-static int lock_convert_interpret(const struct lu_env *env,
-                                 struct ptlrpc_request *req,
-                                 void *args, int rc)
-{
-       struct ldlm_async_args *aa = args;
-       struct ldlm_lock *lock;
-       struct ldlm_reply *reply;
-
-       ENTRY;
-
-       lock = ldlm_handle2lock(&aa->lock_handle);
-       if (!lock) {
-               LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
-                       aa->lock_handle.cookie);
-               RETURN(-ESTALE);
-       }
-
-       LDLM_DEBUG(lock, "CONVERTED lock:");
-
-       if (rc != ELDLM_OK)
-               GOTO(out, rc);
-
-       reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-       if (reply == NULL)
-               GOTO(out, rc = -EPROTO);
-
-       if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
-               LDLM_ERROR(lock,
-                          "convert ACK with wrong lock cookie %#llx but cookie %#llx from server %s id %s\n",
-                          aa->lock_handle.cookie, reply->lock_handle.cookie,
-                          req->rq_export->exp_client_uuid.uuid,
-                          libcfs_id2str(req->rq_peer));
-               GOTO(out, rc = ELDLM_NO_LOCK_DATA);
-       }
-
-       lock_res_and_lock(lock);
-       /*
-        * Lock convert is sent for any new bits to drop, the converting flag
-        * is dropped when ibits on server are the same as on client. Meanwhile
-        * that can be so that more later convert will be replied first with
-        * and clear converting flag, so in case of such race just exit here.
-        * if lock has no converting bits then
-        */
-       if (!ldlm_is_converting(lock)) {
-               LDLM_DEBUG(lock,
-                          "convert ACK for lock without converting flag, reply ibits %#llx",
-                          reply->lock_desc.l_policy_data.l_inodebits.bits);
-       } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
-                  lock->l_policy_data.l_inodebits.bits) {
-               /*
-                * Compare server returned lock ibits and local lock ibits
-                * if they are the same we consider convertion is done,
-                * otherwise we have more converts inflight and keep
-                * converting flag.
-                */
-               LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
-                          reply->lock_desc.l_policy_data.l_inodebits.bits);
-       } else {
-               ldlm_clear_converting(lock);
-
-               /*
-                * Concurrent BL AST may arrive and cause another convert
-                * or cancel so just do nothing here if bl_ast is set,
-                * finish with convert otherwise.
-                */
-               if (!ldlm_is_bl_ast(lock)) {
-                       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
-
-                       /*
-                        * Drop cancel_bits since there are no more converts
-                        * and put lock into LRU if it is still not used and
-                        * is not there yet.
-                        */
-                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
-                       if (!lock->l_readers && !lock->l_writers &&
-                           !ldlm_is_canceling(lock)) {
-                               spin_lock(&ns->ns_lock);
-                               /* there is check for list_empty() inside */
-                               ldlm_lock_remove_from_lru_nolock(lock);
-                               ldlm_lock_add_to_lru_nolock(lock);
-                               spin_unlock(&ns->ns_lock);
-                       }
-               }
-       }
-       unlock_res_and_lock(lock);
-out:
-       if (rc) {
-               int flag;
-
-               lock_res_and_lock(lock);
-               if (ldlm_is_converting(lock)) {
-                       ldlm_clear_converting(lock);
-                       ldlm_set_cbpending(lock);
-                       ldlm_set_bl_ast(lock);
-                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
-               }
-               unlock_res_and_lock(lock);
-
-               /*
-                * fallback to normal lock cancel. If rc means there is no
-                * valid lock on server, do only local cancel
-                */
-               if (rc == ELDLM_NO_LOCK_DATA)
-                       flag = LCF_LOCAL;
-               else
-                       flag = LCF_ASYNC;
-
-               rc = ldlm_cli_cancel(&aa->lock_handle, flag);
-               if (rc < 0)
-                       LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
-                                  rc);
-       }
-       LDLM_LOCK_PUT(lock);
-       RETURN(rc);
-}
-
-/**
  * Client-side IBITS lock convert.
  *
  * Inform server that lock has been converted instead of canceling.
@@ -1267,19 +1146,15 @@ out:
  * is made asynchronous.
  *
  */
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits)
 {
        struct ldlm_request *body;
        struct ptlrpc_request *req;
-       struct ldlm_async_args *aa;
        struct obd_export *exp = lock->l_conn_export;
 
        ENTRY;
 
-       if (exp == NULL) {
-               LDLM_ERROR(lock, "convert must not be called on local locks.");
-               RETURN(-EINVAL);
-       }
+       LASSERT(exp != NULL);
 
        /*
         * this is better to check earlier and it is done so already,
@@ -1310,8 +1185,7 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
        body->lock_desc.l_req_mode = lock->l_req_mode;
        body->lock_desc.l_granted_mode = lock->l_granted_mode;
 
-       body->lock_desc.l_policy_data.l_inodebits.bits =
-                                       lock->l_policy_data.l_inodebits.bits;
+       body->lock_desc.l_policy_data.l_inodebits.bits = new_bits;
        body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
 
        body->lock_flags = ldlm_flags_to_wire(*flags);
@@ -1331,10 +1205,6 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
                lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                                     LDLM_CONVERT - LDLM_FIRST_OPC);
 
-       aa = ptlrpc_req_async_args(aa, req);
-       ldlm_lock2handle(lock, &aa->lock_handle);
-       req->rq_interpret_reply = lock_convert_interpret;
-
        ptlrpcd_add_req(req);
        RETURN(0);
 }
@@ -1587,6 +1457,27 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        RETURN(0);
 }
 
+int ldlm_cli_convert(struct ldlm_lock *lock,
+                    enum ldlm_cancel_flags cancel_flags)
+{
+       int rc = -EINVAL;
+
+       LASSERT(!lock->l_readers && !lock->l_writers);
+       LDLM_DEBUG(lock, "client lock convert START");
+
+       if (lock->l_resource->lr_type == LDLM_IBITS) {
+               lock_res_and_lock(lock);
+               do {
+                       rc = ldlm_cli_inodebits_convert(lock, cancel_flags);
+               } while (rc == -EAGAIN);
+               unlock_res_and_lock(lock);
+       }
+
+       LDLM_DEBUG(lock, "client lock convert END");
+       RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_cli_convert);
+
 /**
  * Client side lock cancel.
  *
@@ -1611,20 +1502,9 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                RETURN(0);
        }
 
-       /* Convert lock bits instead of cancel for IBITS locks */
-       if (cancel_flags & LCF_CONVERT) {
-               LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
-               LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
-
-               rc = ldlm_cli_dropbits(lock,
-                               lock->l_policy_data.l_inodebits.cancel_bits);
-               if (rc == 0) {
-                       LDLM_LOCK_RELEASE(lock);
-                       RETURN(0);
-               }
-       }
-
        lock_res_and_lock(lock);
+       LASSERT(!ldlm_is_converting(lock));
+
        /* Lock is being canceled and the caller doesn't want to wait */
        if (ldlm_is_canceling(lock)) {
                if (cancel_flags & LCF_ASYNC) {
@@ -1637,16 +1517,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                RETURN(0);
        }
 
-       /*
-        * Lock is being converted, cancel it immediately.
-        * When convert will end, it releases lock and it will be gone.
-        */
-       if (ldlm_is_converting(lock)) {
-               /* set back flags removed by convert */
-               ldlm_set_cbpending(lock);
-               ldlm_set_bl_ast(lock);
-       }
-
        ldlm_set_canceling(lock);
        unlock_res_and_lock(lock);
 
@@ -2018,8 +1888,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                        /* No locks which got blocking requests. */
                        LASSERT(!ldlm_is_bl_ast(lock));
 
-                       if (!ldlm_is_canceling(lock) &&
-                           !ldlm_is_converting(lock))
+                       if (!ldlm_is_canceling(lock))
                                break;
 
                        /*
@@ -2077,7 +1946,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
 
                lock_res_and_lock(lock);
                /* Check flags again under the lock. */
-               if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
+               if (ldlm_is_canceling(lock) ||
                    ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
                        /*
                         * Another thread is removing lock from LRU, or
@@ -2207,11 +2076,10 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                        continue;
 
                /*
-                * If somebody is already doing CANCEL, or blocking AST came,
-                * or lock is being converted then skip this lock.
+                * If somebody is already doing CANCEL, or blocking AST came
+                * then skip this lock.
                 */
-               if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock) ||
-                   ldlm_is_converting(lock))
+               if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
                        continue;
 
                if (lockmode_compat(lock->l_granted_mode, mode))
@@ -2237,7 +2105,6 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                /* See CBPENDING comment in ldlm_cancel_lru */
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
                                 lock_flags;
-
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, cancels);
                LDLM_LOCK_GET(lock);
index fc406a9..5628adf 100644 (file)
@@ -430,11 +430,10 @@ int ll_md_need_convert(struct ldlm_lock *lock)
        return !!(bits);
 }
 
-int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
                       void *data, int flag)
 {
        struct lustre_handle lockh;
-       __u64 bits = lock->l_policy_data.l_inodebits.bits;
        int rc;
 
        ENTRY;
@@ -444,17 +443,21 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
        {
                __u64 cancel_flags = LCF_ASYNC;
 
-               if (ll_md_need_convert(lock)) {
-                       cancel_flags |= LCF_CONVERT;
-                       /* For lock convert some cancel actions may require
-                        * this lock with non-dropped canceled bits, e.g. page
-                        * flush for DOM lock. So call ll_lock_cancel_bits()
-                        * here while canceled bits are still set.
-                        */
-                       bits = lock->l_policy_data.l_inodebits.cancel_bits;
-                       if (bits & MDS_INODELOCK_DOM)
-                               ll_lock_cancel_bits(lock, MDS_INODELOCK_DOM);
+               /* if lock convert is not needed then still have to
+                * pass lock via ldlm_cli_convert() to keep all states
+                * correct, set cancel_bits to full lock bits to cause
+                * full cancel to happen.
+                */
+               if (!ll_md_need_convert(lock)) {
+                       lock_res_and_lock(lock);
+                       lock->l_policy_data.l_inodebits.cancel_bits =
+                                       lock->l_policy_data.l_inodebits.bits;
+                       unlock_res_and_lock(lock);
                }
+               rc = ldlm_cli_convert(lock, cancel_flags);
+               if (!rc)
+                       RETURN(0);
+               /* continue with cancel otherwise */
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh, cancel_flags);
                if (rc < 0) {
@@ -464,24 +467,34 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                break;
        }
        case LDLM_CB_CANCELING:
+       {
+               __u64 to_cancel = lock->l_policy_data.l_inodebits.bits;
+
                /* Nothing to do for non-granted locks */
                if (!ldlm_is_granted(lock))
                        break;
 
-               if (ldlm_is_converting(lock)) {
-                       /* this is called on already converted lock, so
-                        * ibits has remained bits only and cancel_bits
-                        * are bits that were dropped.
-                        * Note that DOM lock is handled prior lock convert
-                        * and is excluded here.
+               /* If 'ld' is supplied then bits to be cancelled are passed
+                * implicitly by lock converting and cancel_bits from 'ld'
+                * should be used. Otherwise full cancel is being performed
+                * and lock inodebits are used.
+                *
+                * Note: we cannot rely on cancel_bits in lock itself at this
+                * moment because they can be changed by a concurrent thread,
+                * so ldlm_cli_inodebits_convert() passes cancel bits implicitly
+                * in the 'ld' parameter.
+                */
+               if (ld) {
+                       /* partial bits cancel allowed only during convert */
+                       LASSERT(ldlm_is_converting(lock));
+                       /* mask cancel bits by lock bits so that no unused
+                        * bits are passed to ll_lock_cancel_bits()
                         */
-                       bits = lock->l_policy_data.l_inodebits.cancel_bits &
-                               ~MDS_INODELOCK_DOM;
-               } else {
-                       LASSERT(ldlm_is_canceling(lock));
+                       to_cancel &= ld->l_policy_data.l_inodebits.cancel_bits;
                }
-               ll_lock_cancel_bits(lock, bits);
+               ll_lock_cancel_bits(lock, to_cancel);
                break;
+       }
        default:
                LBUG();
        }