Whamcloud - gitweb
LU-10175 ldlm: IBITS lock convert instead of cancel 02/30202/15
authorMikhal Pershin <mike.pershin@intel.com>
Fri, 3 Nov 2017 07:08:19 +0000 (10:08 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Sun, 6 May 2018 03:57:34 +0000 (03:57 +0000)
For IBITS lock it is possible to drop just conflicting
bits and keep lock itself instead of cancelling it.
Lock convert is only bits downgrade on client and then
on server.
Patch implements lock convert during blocking AST.

Test-Parameters: mdssizegb=20 testlist=sanity-dom,dom-performance,racer,racer,racer
Signed-off-by: Mikhal Pershin <mike.pershin@intel.com>
Change-Id: Id073f14ec3283479a9a48fd80090ecd3b0967b6d
Reviewed-on: https://review.whamcloud.com/30202
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
12 files changed:
lustre/include/lustre_dlm.h
lustre/include/lustre_dlm_flags.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/namei.c
lustre/mdt/mdt_io.c
lustre/target/tgt_handler.c
lustre/tests/sanity.sh

index d8ecc3b..5fc4c5d 100644 (file)
@@ -649,6 +649,7 @@ enum ldlm_cancel_flags {
        LCF_ASYNC       = 0x1, /* Cancel locks asynchronously. */
        LCF_LOCAL       = 0x2, /* Cancel locks locally, not notifing server */
        LCF_BL_AST      = 0x4, /* Cancel LDLM_FL_BL_AST locks in the same RPC */
+       LCF_CONVERT     = 0x8, /* Try to convert IBITS lock before cancel */
 };
 
 struct ldlm_flock {
@@ -1603,6 +1604,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                           void *data, __u32 lvb_len, enum lvb_type lvb_type,
                           const __u64 *client_cookie,
                           struct lustre_handle *lockh);
+int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags);
 int ldlm_cli_update_pool(struct ptlrpc_request *req);
 int ldlm_cli_cancel(const struct lustre_handle *lockh,
                    enum ldlm_cancel_flags cancel_flags);
@@ -1626,6 +1628,11 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
 int ldlm_cli_cancel_list(struct list_head *head, int count,
                         struct ptlrpc_request *req,
                         enum ldlm_cancel_flags flags);
+
+int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop);
+int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits);
+int ldlm_cli_dropbits_list(struct list_head *converts, __u64 drop_bits);
+
 /** @} ldlm_cli_api */
 
 /* mds/handler.c */
index dc978d4..865fdee 100644 (file)
@@ -26,7 +26,7 @@
 #ifndef LDLM_ALL_FLAGS_MASK
 
 /** l_flags bits marked as "all_flags" bits */
-#define LDLM_FL_ALL_FLAGS_MASK          0x00FFFFFFC08F932FULL
+#define LDLM_FL_ALL_FLAGS_MASK          0x00FFFFFFC28F932FULL
 
 /** extent, mode, or resource changed */
 #define LDLM_FL_LOCK_CHANGED            0x0000000000000001ULL // bit   0
 #define ldlm_set_cos_incompat(_l)      LDLM_SET_FLAG((_l), 1ULL << 24)
 #define ldlm_clear_cos_incompat(_l)    LDLM_CLEAR_FLAG((_l), 1ULL << 24)
 
+/*
+ * Flag indicates that lock is being converted (downgraded) during the blocking
+ * AST instead of cancelling. Used for IBITS locks now and drops conflicting
+ * bits only keepeing other.
+ */
+#define LDLM_FL_CONVERTING              0x0000000002000000ULL /* bit  25 */
+#define ldlm_is_converting(_l)          LDLM_TEST_FLAG((_l), 1ULL << 25)
+#define ldlm_set_converting(_l)         LDLM_SET_FLAG((_l), 1ULL << 25)
+#define ldlm_clear_converting(_l)       LDLM_CLEAR_FLAG((_l), 1ULL << 25)
+
 /**
  * Part of original lockahead implementation, OBD_CONNECT_LOCKAHEAD_OLD.
  * Reserved temporarily to allow those implementations to keep working.
index cf0281f..c9b467e 100644 (file)
@@ -2320,8 +2320,11 @@ static inline bool ldlm_extent_equal(const struct ldlm_extent *ex1,
 }
 
 struct ldlm_inodebits {
-        __u64 bits;
-       __u64 try_bits; /* optional bits to try */
+       __u64 bits;
+       union {
+               __u64 try_bits; /* optional bits to try */
+               __u64 cancel_bits; /* for lock convert */
+       };
 };
 
 struct ldlm_flock_wire {
index da561d8..269c49a 100644 (file)
@@ -213,11 +213,39 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
        check_res_locked(res);
 
        if (intention == LDLM_PROCESS_RESCAN) {
-               *err = ELDLM_LOCK_ABORTED;
+               struct list_head *bl_list;
+
+               if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                       bl_list = NULL;
+                       *err = ELDLM_LOCK_WOULDBLOCK;
+               } else {
+                       bl_list = work_list;
+                       *err = ELDLM_LOCK_ABORTED;
+               }
 
                LASSERT(lock->l_policy_data.l_inodebits.bits != 0);
 
-               rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL);
+               /* It is possible that some of granted locks was not canceled
+                * but converted and is kept in granted queue. So there is
+                * a window where lock with 'ast_sent' might become granted
+                * again. Meanwhile a new lock may appear in that window and
+                * conflicts with the converted lock so the following scenario
+                * is possible:
+                *
+                * 1) lock1 conflicts with lock2
+                * 2) bl_ast was sent for lock2
+                * 3) lock3 comes and conflicts with lock2 too
+                * 4) no bl_ast sent because lock2->l_bl_ast_sent is 1
+                * 5) lock2 was converted for lock1 but not for lock3
+                * 6) lock1 granted, lock3 still is waiting for lock2, but
+                *    there will never be another bl_ast for that
+                *
+                * To avoid this scenario the work_list is used below to collect
+                * any blocked locks from granted queue during every reprocess
+                * and bl_ast will be sent if needed.
+                */
+               rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock,
+                                                bl_list);
                if (!rc)
                        RETURN(LDLM_ITER_STOP);
                rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL);
@@ -282,7 +310,14 @@ void ldlm_ibits_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
        wpolicy->l_inodebits.try_bits = lpolicy->l_inodebits.try_bits;
 }
 
-int ldlm_inodebits_drop(struct ldlm_lock *lock,  __u64 to_drop)
+/**
+ * Attempt to convert already granted IBITS lock with several bits set to
+ * a lock with less bits (downgrade).
+ *
+ * Such lock conversion is used to keep lock with non-blocking bits instead of
+ * cancelling it, introduced for better support of DoM files.
+ */
+int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop)
 {
        ENTRY;
 
@@ -304,3 +339,84 @@ int ldlm_inodebits_drop(struct ldlm_lock *lock,  __u64 to_drop)
        RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_inodebits_drop);
+
+/* convert single lock */
+int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits)
+{
+       struct lustre_handle lockh;
+       __u32 flags = 0;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(drop_bits);
+       LASSERT(!lock->l_readers && !lock->l_writers);
+
+       LDLM_DEBUG(lock, "client lock convert START");
+
+       ldlm_lock2handle(lock, &lockh);
+       lock_res_and_lock(lock);
+       /* check if all bits are cancelled */
+       if (!(lock->l_policy_data.l_inodebits.bits & ~drop_bits)) {
+               unlock_res_and_lock(lock);
+               /* return error to continue with cancel */
+               GOTO(exit, rc = -EINVAL);
+       }
+
+       /* check if there is race with cancel */
+       if (ldlm_is_canceling(lock) || ldlm_is_cancel(lock)) {
+               unlock_res_and_lock(lock);
+               GOTO(exit, rc = -EINVAL);
+       }
+
+       /* clear cbpending flag early, it is safe to match lock right after
+        * client convert because it is downgrade always.
+        */
+       ldlm_clear_cbpending(lock);
+       ldlm_clear_bl_ast(lock);
+
+       /* If lock is being converted already, check drop bits first */
+       if (ldlm_is_converting(lock)) {
+               /* raced lock convert, lock inodebits are remaining bits
+                * so check if they are conflicting with new convert or not.
+                */
+               if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
+                       unlock_res_and_lock(lock);
+                       GOTO(exit, rc = 0);
+               }
+               /* Otherwise drop new conflicting bits in new convert */
+       }
+       ldlm_set_converting(lock);
+       /* from all bits of blocking lock leave only conflicting */
+       drop_bits &= lock->l_policy_data.l_inodebits.bits;
+       /* save them in cancel_bits, so l_blocking_ast will know
+        * which bits from the current lock were dropped. */
+       lock->l_policy_data.l_inodebits.cancel_bits = drop_bits;
+       /* Finally clear these bits in lock ibits */
+       ldlm_inodebits_drop(lock, drop_bits);
+       unlock_res_and_lock(lock);
+       /* Finally call cancel callback for remaining bits only.
+        * It is important to have converting flag during that
+        * so blocking_ast callback can distinguish convert from
+        * cancels.
+        */
+       if (lock->l_blocking_ast)
+               lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
+                                    LDLM_CB_CANCELING);
+
+       /* now notify server about convert */
+       rc = ldlm_cli_convert(lock, &flags);
+       if (rc) {
+               lock_res_and_lock(lock);
+               ldlm_clear_converting(lock);
+               ldlm_set_cbpending(lock);
+               ldlm_set_bl_ast(lock);
+               unlock_res_and_lock(lock);
+               LASSERT(list_empty(&lock->l_lru));
+               GOTO(exit, rc);
+       }
+       EXIT;
+exit:
+       LDLM_DEBUG(lock, "client lock convert END");
+       return rc;
+}
index 0afd9e2..572b686 100644 (file)
@@ -175,6 +175,7 @@ void ldlm_lock_destroy_nolock(struct ldlm_lock *lock);
 
 int ldlm_export_cancel_blocked_locks(struct obd_export *exp);
 int ldlm_export_cancel_locks(struct obd_export *exp);
+void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock);
 
 /* ldlm_lockd.c */
 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
index b5c68b2..0b75a03 100644 (file)
@@ -868,7 +868,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
         } else if (ns_is_client(ns) &&
                    !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
-                  !ldlm_is_bl_ast(lock)) {
+                  !ldlm_is_bl_ast(lock) &&
+                  !ldlm_is_converting(lock)) {
 
                 LDLM_DEBUG(lock, "add lock into lru list");
 
@@ -1923,8 +1924,6 @@ restart:
        if (!list_empty(&bl_ast_list)) {
                unlock_res(res);
 
-               LASSERT(intention == LDLM_PROCESS_RECOVERY);
-
                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
                                       LDLM_WORK_BL_AST);
 
@@ -2065,6 +2064,13 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        unlock_res_and_lock(lock);
 
        ldlm_lock2desc(lock->l_blocking_lock, &d);
+       /* copy blocking lock ibits in cancel_bits as well,
+        * new client may use them for lock convert and it is
+        * important to use new field to convert locks from
+        * new servers only
+        */
+       d.l_policy_data.l_inodebits.cancel_bits =
+               lock->l_blocking_lock->l_policy_data.l_inodebits.bits;
 
        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
@@ -2398,6 +2404,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
          * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
                 LDLM_ERROR(lock, "lock still has references");
+               unlock_res_and_lock(lock);
                 LBUG();
         }
 
index e485b30..b502b17 100644 (file)
@@ -1348,6 +1348,22 @@ existing_lock:
                 LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                 rc = -ENOTCONN;
        } else if (ldlm_is_ast_sent(lock)) {
+               /* fill lock desc for possible lock convert */
+               if (lock->l_blocking_lock &&
+                   lock->l_resource->lr_type == LDLM_IBITS) {
+                       struct ldlm_lock *bl_lock = lock->l_blocking_lock;
+                       struct ldlm_lock_desc *rep_desc = &dlm_rep->lock_desc;
+
+                       LDLM_DEBUG(lock,
+                                  "save blocking bits %llx in granted lock",
+                                  bl_lock->l_policy_data.l_inodebits.bits);
+                       /* If lock is blocked then save blocking ibits
+                        * in returned lock policy for the possible lock
+                        * convert on a client.
+                        */
+                       rep_desc->l_policy_data.l_inodebits.cancel_bits =
+                               bl_lock->l_policy_data.l_inodebits.bits;
+               }
                dlm_rep->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
                 if (lock->l_granted_mode == lock->l_req_mode) {
                         /*
@@ -1436,14 +1452,15 @@ existing_lock:
                        }
                }
 
-                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
-                        ldlm_reprocess_all(lock->l_resource);
+               if (!err && !ldlm_is_cbpending(lock) &&
+                   dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
+                       ldlm_reprocess_all(lock->l_resource);
 
-                LDLM_LOCK_RELEASE(lock);
-        }
+               LDLM_LOCK_RELEASE(lock);
+       }
 
-        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
-                          lock, rc);
+       LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
+                         lock, rc);
 
         return rc;
 }
@@ -1452,9 +1469,74 @@ existing_lock:
  * Main LDLM entry point for server code to process lock conversion requests.
  */
 int ldlm_handle_convert0(struct ptlrpc_request *req,
-                         const struct ldlm_request *dlm_req)
+                        const struct ldlm_request *dlm_req)
 {
-       RETURN(-ENOTSUPP);
+       struct obd_export *exp = req->rq_export;
+       struct ldlm_reply *dlm_rep;
+       struct ldlm_lock *lock;
+       int rc;
+
+       ENTRY;
+
+       if (exp && exp->exp_nid_stats && exp->exp_nid_stats->nid_ldlm_stats)
+               lprocfs_counter_incr(exp->exp_nid_stats->nid_ldlm_stats,
+                                    LDLM_CONVERT - LDLM_FIRST_OPC);
+
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc)
+               RETURN(rc);
+
+       dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       dlm_rep->lock_flags = dlm_req->lock_flags;
+
+       lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
+       if (lock) {
+               __u64 bits;
+               __u64 new;
+
+               bits = lock->l_policy_data.l_inodebits.bits;
+               new = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
+               LDLM_DEBUG(lock, "server-side convert handler START");
+
+               if (ldlm_is_cancel(lock)) {
+                       LDLM_ERROR(lock, "convert on canceled lock!");
+                       req->rq_status = LUSTRE_EINVAL;
+               } else if (dlm_req->lock_desc.l_req_mode !=
+                          lock->l_granted_mode) {
+                       LDLM_ERROR(lock, "lock mode differs!");
+                       req->rq_status = LUSTRE_EINVAL;
+               } else if (bits == new) {
+                       /* This can be valid situation if CONVERT RPCs are
+                        * re-ordered. Just finish silently*/
+                       LDLM_DEBUG(lock, "lock is converted already!");
+                       req->rq_status = 0;
+               } else {
+                       lock_res_and_lock(lock);
+                       if (ldlm_is_waited(lock))
+                               ldlm_del_waiting_lock(lock);
+
+                       ldlm_clear_cbpending(lock);
+                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
+                       ldlm_inodebits_drop(lock, bits & ~new);
+                       lock->l_bl_ast_run = 0;
+                       ldlm_clear_ast_sent(lock);
+                       unlock_res_and_lock(lock);
+                       req->rq_status = 0;
+
+                       dlm_rep->lock_handle = lock->l_remote_handle;
+                       ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
+                                       &dlm_rep->lock_desc.l_policy_data);
+
+                       ldlm_reprocess_all(lock->l_resource);
+               }
+               LDLM_DEBUG(lock, "server-side convert handler END");
+               LDLM_LOCK_PUT(lock);
+       } else {
+               req->rq_status = LUSTRE_EINVAL;
+               LDLM_DEBUG_NOLOCK("server-side convert handler END");
+       }
+
+       RETURN(0);
 }
 
 /**
@@ -1585,6 +1667,24 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
         LDLM_DEBUG(lock, "client blocking AST callback handler");
 
         lock_res_and_lock(lock);
+
+       /* set bits to cancel for this lock for possible lock convert */
+       if (ns_is_client(ns) && (lock->l_resource->lr_type == LDLM_IBITS)) {
+               /* Lock description contains policy of blocking lock,
+                * and its cancel_bits is used to pass conflicting bits.
+                * NOTE: ld can be NULL or can be not NULL but zeroed if
+                * passed from ldlm_bl_thread_blwi(), check below used bits
+                * in ld to make sure it is valid description.
+                */
+               if (ld && ld->l_policy_data.l_inodebits.bits)
+                       lock->l_policy_data.l_inodebits.cancel_bits =
+                               ld->l_policy_data.l_inodebits.cancel_bits;
+               /* if there is no valid ld and lock is cbpending already
+                * then cancel_bits should be kept, otherwise it is zeroed.
+                */
+               else if (!ldlm_is_cbpending(lock))
+                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
+       }
        ldlm_set_cbpending(lock);
 
        if (ldlm_is_cancel_on_block(lock))
index 3cdeea7..61be33c 100644 (file)
@@ -678,19 +678,19 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                                                &lock->l_policy_data);
                }
 
-                if (type != LDLM_PLAIN)
-                        LDLM_DEBUG(lock,"client-side enqueue, new policy data");
-        }
+               if (type != LDLM_PLAIN)
+                       LDLM_DEBUG(lock,"client-side enqueue, new policy data");
+       }
 
        if ((*flags) & LDLM_FL_AST_SENT) {
-                lock_res_and_lock(lock);
+               lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
-                unlock_res_and_lock(lock);
-                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
-        }
+               unlock_res_and_lock(lock);
+               LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
+       }
 
-        /* If the lock has already been granted by a completion AST, don't
-         * clobber the LVB with an older one. */
+       /* If the lock has already been granted by a completion AST, don't
+        * clobber the LVB with an older one. */
        if (lvb_len > 0) {
                /* We must lock or a racing completion might update lvb without
                 * letting us know and we'll clobber the correct value.
@@ -1037,6 +1037,177 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 
 /**
+ * Client-side lock convert reply handling.
+ *
+ * Finish client lock converting, checks for concurrent converts
+ * and clear 'converting' flag so lock can be placed back into LRU.
+ */
+static int lock_convert_interpret(const struct lu_env *env,
+                                 struct ptlrpc_request *req,
+                                 struct ldlm_async_args *aa, int rc)
+{
+       struct ldlm_lock *lock;
+       struct ldlm_reply *reply;
+
+       ENTRY;
+
+       lock = ldlm_handle2lock(&aa->lock_handle);
+       if (!lock) {
+               LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
+                       aa->lock_handle.cookie);
+               RETURN(-ESTALE);
+       }
+
+       LDLM_DEBUG(lock, "CONVERTED lock:");
+
+       if (rc != ELDLM_OK)
+               GOTO(out, rc);
+
+       reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       if (reply == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
+               LDLM_ERROR(lock, "convert ACK with wrong lock cookie %#llx"
+                          " but cookie %#llx from server %s id %s\n",
+                          aa->lock_handle.cookie, reply->lock_handle.cookie,
+                          req->rq_export->exp_client_uuid.uuid,
+                          libcfs_id2str(req->rq_peer));
+               GOTO(out, rc = -ESTALE);
+       }
+
+       lock_res_and_lock(lock);
+       /* Lock convert is sent for any new bits to drop, the converting flag
+        * is dropped when ibits on server are the same as on client. Meanwhile
+        * that can be so that more later convert will be replied first with
+        * and clear converting flag, so in case of such race just exit here.
+        * if lock has no converting bits then  */
+       if (!ldlm_is_converting(lock)) {
+               LDLM_DEBUG(lock, "convert ACK for lock without converting flag,"
+                          " reply ibits %#llx",
+                          reply->lock_desc.l_policy_data.l_inodebits.bits);
+       } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
+                  lock->l_policy_data.l_inodebits.bits) {
+               /* Compare server returned lock ibits and local lock ibits
+                * if they are the same we consider convertion is done,
+                * otherwise we have more converts inflight and keep
+                * converting flag.
+                */
+               LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
+                          reply->lock_desc.l_policy_data.l_inodebits.bits);
+       } else {
+               ldlm_clear_converting(lock);
+
+               /* Concurrent BL AST has arrived, it may cause another convert
+                * or cancel so just exit here.
+                */
+               if (!ldlm_is_bl_ast(lock)) {
+                       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+
+                       /* Drop cancel_bits since there are no more converts
+                        * and put lock into LRU if it is not there yet.
+                        */
+                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
+                       spin_lock(&ns->ns_lock);
+                       if (!list_empty(&lock->l_lru))
+                               ldlm_lock_remove_from_lru_nolock(lock);
+                       ldlm_lock_add_to_lru_nolock(lock);
+                       spin_unlock(&ns->ns_lock);
+               }
+       }
+       unlock_res_and_lock(lock);
+out:
+       if (rc) {
+               lock_res_and_lock(lock);
+               if (ldlm_is_converting(lock)) {
+                       LASSERT(list_empty(&lock->l_lru));
+                       ldlm_clear_converting(lock);
+                       ldlm_set_cbpending(lock);
+                       ldlm_set_bl_ast(lock);
+               }
+               unlock_res_and_lock(lock);
+       }
+
+       LDLM_LOCK_PUT(lock);
+       RETURN(rc);
+}
+
+/**
+ * Client-side IBITS lock convert.
+ *
+ * Inform server that lock has been converted instead of canceling.
+ * Server finishes convert on own side and does reprocess to grant
+ * all related waiting locks.
+ *
+ * Since convert means only ibits downgrading, client doesn't need to
+ * wait for server reply to finish local converting process so this request
+ * is made asynchronous.
+ *
+ */
+int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
+{
+       struct ldlm_request *body;
+       struct ptlrpc_request *req;
+       struct ldlm_async_args *aa;
+       struct obd_export *exp = lock->l_conn_export;
+
+       ENTRY;
+
+       if (exp == NULL) {
+               LDLM_ERROR(lock, "convert must not be called on local locks.");
+               RETURN(-EINVAL);
+       }
+
+       if (lock->l_resource->lr_type != LDLM_IBITS) {
+               LDLM_ERROR(lock, "convert works with IBITS locks only.");
+               RETURN(-EINVAL);
+       }
+
+       LDLM_DEBUG(lock, "client-side convert");
+
+       req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                       &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
+                                       LDLM_CONVERT);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+       body->lock_handle[0] = lock->l_remote_handle;
+
+       body->lock_desc.l_req_mode = lock->l_req_mode;
+       body->lock_desc.l_granted_mode = lock->l_granted_mode;
+
+       body->lock_desc.l_policy_data.l_inodebits.bits =
+                                       lock->l_policy_data.l_inodebits.bits;
+       body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
+
+       body->lock_flags = ldlm_flags_to_wire(*flags);
+       body->lock_count = 1;
+
+       ptlrpc_request_set_replen(req);
+
+       /* That could be useful to use cancel portals for convert as well
+        * as high-priority handling. This will require changes in
+        * ldlm_cancel_handler to understand convert RPC as well.
+        *
+        * req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
+        * req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+        */
+       ptlrpc_at_set_req_timeout(req);
+
+       if (exp->exp_obd->obd_svc_stats != NULL)
+               lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
+                                    LDLM_CONVERT - LDLM_FIRST_OPC);
+
+       aa = ptlrpc_req_async_args(req);
+       ldlm_lock2handle(lock, &aa->lock_handle);
+       req->rq_interpret_reply = (ptlrpc_interpterer_t)lock_convert_interpret;
+
+       ptlrpcd_add_req(req);
+       RETURN(0);
+}
+
+/**
  * Cancel locks locally.
  * Returns:
  * \retval LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL RPC to the server
@@ -1291,6 +1462,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock;
        struct list_head cancels = LIST_HEAD_INIT(cancels);
+
        ENTRY;
 
        lock = ldlm_handle2lock_long(lockh, 0);
@@ -1299,6 +1471,19 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                RETURN(0);
        }
 
+       /* Convert lock bits instead of cancel for IBITS locks */
+       if (cancel_flags & LCF_CONVERT) {
+               LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
+               LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
+
+               rc = ldlm_cli_dropbits(lock,
+                               lock->l_policy_data.l_inodebits.cancel_bits);
+               if (rc == 0) {
+                       LDLM_LOCK_RELEASE(lock);
+                       RETURN(0);
+               }
+       }
+
        lock_res_and_lock(lock);
        /* Lock is being canceled and the caller doesn't want to wait */
        if (ldlm_is_canceling(lock)) {
@@ -1314,6 +1499,15 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                RETURN(0);
        }
 
+       /* Lock is being converted, cancel it immediately.
+        * When convert will end, it releases lock and it will be gone.
+        */
+       if (ldlm_is_converting(lock)) {
+               /* set back flags removed by convert */
+               ldlm_set_cbpending(lock);
+               ldlm_set_bl_ast(lock);
+       }
+
        ldlm_set_canceling(lock);
        unlock_res_and_lock(lock);
 
@@ -1680,7 +1874,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
 
                        /* Somebody is already doing CANCEL. No need for this
                         * lock in LRU, do not traverse it again. */
-                       if (!ldlm_is_canceling(lock))
+                       if (!ldlm_is_canceling(lock) ||
+                           !ldlm_is_converting(lock))
                                break;
 
                        ldlm_lock_remove_from_lru_nolock(lock);
@@ -1723,7 +1918,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
 
                lock_res_and_lock(lock);
                /* Check flags again under the lock. */
-               if (ldlm_is_canceling(lock) ||
+               if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
                    ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
                        /* Another thread is removing lock from LRU, or
                         * somebody is already doing CANCEL, or there
index 5fc7c29..0e098a3 100644 (file)
@@ -223,179 +223,254 @@ int ll_dom_lock_cancel(struct inode *inode, struct ldlm_lock *lock)
        RETURN(rc);
 }
 
-int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                      void *data, int flag)
+void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel)
 {
-       struct lustre_handle lockh;
+       struct inode *inode = ll_inode_from_resource_lock(lock);
+       __u64 bits = to_cancel;
        int rc;
-       ENTRY;
 
-       switch (flag) {
-       case LDLM_CB_BLOCKING:
-               ldlm_lock2handle(lock, &lockh);
-               rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
-               if (rc < 0) {
-                       CDEBUG(D_INODE, "ldlm_cli_cancel: rc = %d\n", rc);
-                       RETURN(rc);
-               }
-               break;
-       case LDLM_CB_CANCELING: {
-               struct inode *inode = ll_inode_from_resource_lock(lock);
-               __u64 bits = lock->l_policy_data.l_inodebits.bits;
+       if (inode == NULL)
+               return;
 
-               if (inode == NULL)
-                       break;
+       if (!fid_res_name_eq(ll_inode2fid(inode),
+                            &lock->l_resource->lr_name)) {
+               LDLM_ERROR(lock, "data mismatch with object "DFID"(%p)",
+                          PFID(ll_inode2fid(inode)), inode);
+               LBUG();
+       }
 
-               /* Invalidate all dentries associated with this inode */
-               LASSERT(ldlm_is_canceling(lock));
+       if (bits & MDS_INODELOCK_XATTR) {
+               if (S_ISDIR(inode->i_mode))
+                       ll_i2info(inode)->lli_def_stripe_offset = -1;
+               ll_xattr_cache_destroy(inode);
+               bits &= ~MDS_INODELOCK_XATTR;
+       }
+
+       /* For OPEN locks we differentiate between lock modes
+        * LCK_CR, LCK_CW, LCK_PR - bug 22891 */
+       if (bits & MDS_INODELOCK_OPEN)
+               ll_have_md_lock(inode, &bits, lock->l_req_mode);
+
+       if (bits & MDS_INODELOCK_OPEN) {
+               fmode_t fmode;
 
-               if (!fid_res_name_eq(ll_inode2fid(inode),
-                                    &lock->l_resource->lr_name)) {
-                       LDLM_ERROR(lock, "data mismatch with object "DFID"(%p)",
-                                  PFID(ll_inode2fid(inode)), inode);
+               switch (lock->l_req_mode) {
+               case LCK_CW:
+                       fmode = FMODE_WRITE;
+                       break;
+               case LCK_PR:
+                       fmode = FMODE_EXEC;
+                       break;
+               case LCK_CR:
+                       fmode = FMODE_READ;
+                       break;
+               default:
+                       LDLM_ERROR(lock, "bad lock mode for OPEN lock");
                        LBUG();
                }
 
-               if (bits & MDS_INODELOCK_XATTR) {
-                       if (S_ISDIR(inode->i_mode))
-                               ll_i2info(inode)->lli_def_stripe_offset = -1;
-                       ll_xattr_cache_destroy(inode);
-                       bits &= ~MDS_INODELOCK_XATTR;
-               }
+               ll_md_real_close(inode, fmode);
 
-               /* For OPEN locks we differentiate between lock modes
-                * LCK_CR, LCK_CW, LCK_PR - bug 22891 */
-               if (bits & MDS_INODELOCK_OPEN)
-                       ll_have_md_lock(inode, &bits, lock->l_req_mode);
-
-               if (bits & MDS_INODELOCK_OPEN) {
-                       fmode_t fmode;
-
-                       switch (lock->l_req_mode) {
-                       case LCK_CW:
-                               fmode = FMODE_WRITE;
-                               break;
-                       case LCK_PR:
-                               fmode = FMODE_EXEC;
-                               break;
-                       case LCK_CR:
-                               fmode = FMODE_READ;
-                               break;
-                       default:
-                               LDLM_ERROR(lock, "bad lock mode for OPEN lock");
-                               LBUG();
-                       }
+               bits &= ~MDS_INODELOCK_OPEN;
+       }
 
-                       ll_md_real_close(inode, fmode);
+       if (bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+                   MDS_INODELOCK_LAYOUT | MDS_INODELOCK_PERM |
+                   MDS_INODELOCK_DOM))
+               ll_have_md_lock(inode, &bits, LCK_MINMODE);
 
-                       bits &= ~MDS_INODELOCK_OPEN;
-               }
+       if (bits & MDS_INODELOCK_DOM) {
+               rc =  ll_dom_lock_cancel(inode, lock);
+               if (rc < 0)
+                       CDEBUG(D_INODE, "cannot flush DoM data "
+                              DFID": rc = %d\n",
+                              PFID(ll_inode2fid(inode)), rc);
+               lock_res_and_lock(lock);
+               ldlm_set_kms_ignore(lock);
+               unlock_res_and_lock(lock);
+       }
 
-               if (bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
-                           MDS_INODELOCK_LAYOUT | MDS_INODELOCK_PERM |
-                           MDS_INODELOCK_DOM))
-                       ll_have_md_lock(inode, &bits, LCK_MINMODE);
-
-               if (bits & MDS_INODELOCK_DOM) {
-                       rc =  ll_dom_lock_cancel(inode, lock);
-                       if (rc < 0)
-                               CDEBUG(D_INODE, "cannot flush DoM data "
-                                      DFID": rc = %d\n",
-                                      PFID(ll_inode2fid(inode)), rc);
-                       lock_res_and_lock(lock);
-                       ldlm_set_kms_ignore(lock);
-                       unlock_res_and_lock(lock);
-                       bits &= ~MDS_INODELOCK_DOM;
-               }
+       if (bits & MDS_INODELOCK_LAYOUT) {
+               struct cl_object_conf conf = {
+                       .coc_opc = OBJECT_CONF_INVALIDATE,
+                       .coc_inode = inode,
+               };
 
-               if (bits & MDS_INODELOCK_LAYOUT) {
-                       struct cl_object_conf conf = {
-                               .coc_opc = OBJECT_CONF_INVALIDATE,
-                               .coc_inode = inode,
-                       };
-
-                       rc = ll_layout_conf(inode, &conf);
-                       if (rc < 0)
-                               CDEBUG(D_INODE, "cannot invalidate layout of "
-                                      DFID": rc = %d\n",
-                                      PFID(ll_inode2fid(inode)), rc);
-               }
+               rc = ll_layout_conf(inode, &conf);
+               if (rc < 0)
+                       CDEBUG(D_INODE, "cannot invalidate layout of "
+                              DFID": rc = %d\n",
+                              PFID(ll_inode2fid(inode)), rc);
+       }
 
-               if (bits & MDS_INODELOCK_UPDATE) {
-                       struct ll_inode_info *lli = ll_i2info(inode);
-                       lli->lli_update_atime = 1;
-               }
+       if (bits & MDS_INODELOCK_UPDATE) {
+               struct ll_inode_info *lli = ll_i2info(inode);
 
-               if ((bits & MDS_INODELOCK_UPDATE) && S_ISDIR(inode->i_mode)) {
-                       struct ll_inode_info *lli = ll_i2info(inode);
+               lli->lli_update_atime = 1;
+       }
 
-                       CDEBUG(D_INODE, "invalidating inode "DFID" lli = %p, "
-                              "pfid  = "DFID"\n", PFID(ll_inode2fid(inode)),
-                              lli, PFID(&lli->lli_pfid));
-                       truncate_inode_pages(inode->i_mapping, 0);
+       if ((bits & MDS_INODELOCK_UPDATE) && S_ISDIR(inode->i_mode)) {
+               struct ll_inode_info *lli = ll_i2info(inode);
 
-                       if (unlikely(!fid_is_zero(&lli->lli_pfid))) {
-                               struct inode *master_inode = NULL;
-                               unsigned long hash;
+               CDEBUG(D_INODE, "invalidating inode "DFID" lli = %p, "
+                      "pfid  = "DFID"\n", PFID(ll_inode2fid(inode)),
+                      lli, PFID(&lli->lli_pfid));
+               truncate_inode_pages(inode->i_mapping, 0);
 
-                               /* This is slave inode, since all of the child
-                                * dentry is connected on the master inode, so
-                                * we have to invalidate the negative children
-                                * on master inode */
-                               CDEBUG(D_INODE, "Invalidate s"DFID" m"DFID"\n",
-                                      PFID(ll_inode2fid(inode)),
-                                      PFID(&lli->lli_pfid));
+               if (unlikely(!fid_is_zero(&lli->lli_pfid))) {
+                       struct inode *master_inode = NULL;
+                       unsigned long hash;
 
-                               hash = cl_fid_build_ino(&lli->lli_pfid,
+                       /* This is slave inode, since all of the child dentry
+                        * is connected on the master inode, so we have to
+                        * invalidate the negative children on master inode */
+                       CDEBUG(D_INODE, "Invalidate s"DFID" m"DFID"\n",
+                              PFID(ll_inode2fid(inode)), PFID(&lli->lli_pfid));
+
+                       hash = cl_fid_build_ino(&lli->lli_pfid,
                                        ll_need_32bit_api(ll_i2sbi(inode)));
 
-                               /* Do not lookup the inode with ilookup5,
-                                * otherwise it will cause dead lock,
-                                *
-                                * 1. Client1 send chmod req to the MDT0, then
-                                * on MDT0, it enqueues master and all of its
-                                * slaves lock, (mdt_attr_set() ->
-                                * mdt_lock_slaves()), after gets master and
-                                * stripe0 lock, it will send the enqueue req
-                                * (for stripe1) to MDT1, then MDT1 finds the
-                                * lock has been granted to client2. Then MDT1
-                                * sends blocking ast to client2.
-                                *
-                                * 2. At the same time, client2 tries to unlink
-                                * the striped dir (rm -rf striped_dir), and
-                                * during lookup, it will hold the master inode
-                                * of the striped directory, whose inode state
-                                * is NEW, then tries to revalidate all of its
-                                * slaves, (ll_prep_inode()->ll_iget()->
-                                * ll_read_inode2()-> ll_update_inode().). And
-                                * it will be blocked on the server side because
-                                * of 1.
-                                *
-                                * 3. Then the client get the blocking_ast req,
-                                * cancel the lock, but being blocked if using
-                                * ->ilookup5()), because master inode state is
-                                *  NEW. */
-                               master_inode = ilookup5_nowait(inode->i_sb,
-                                                   hash, ll_test_inode_by_fid,
+                       /* Do not lookup the inode with ilookup5, otherwise
+                        * it will cause dead lock,
+                        * 1. Client1 send chmod req to the MDT0, then on MDT0,
+                        * it enqueues master and all of its slaves lock,
+                        * (mdt_attr_set() -> mdt_lock_slaves()), after gets
+                        * master and stripe0 lock, it will send the enqueue
+                        * req (for stripe1) to MDT1, then MDT1 finds the lock
+                        * has been granted to client2. Then MDT1 sends blocking
+                        * ast to client2.
+                        * 2. At the same time, client2 tries to unlink
+                        * the striped dir (rm -rf striped_dir), and during
+                        * lookup, it will hold the master inode of the striped
+                        * directory, whose inode state is NEW, then tries to
+                        * revalidate all of its slaves, (ll_prep_inode()->
+                        * ll_iget()->ll_read_inode2()-> ll_update_inode().).
+                        * And it will be blocked on the server side because
+                        * of 1.
+                        * 3. Then the client get the blocking_ast req, cancel
+                        * the lock, but being blocked if using ->ilookup5()),
+                        * because master inode state is NEW. */
+                       master_inode = ilookup5_nowait(inode->i_sb, hash,
+                                                       ll_test_inode_by_fid,
                                                        (void *)&lli->lli_pfid);
-                               if (master_inode) {
-                                       ll_invalidate_negative_children(
-                                                               master_inode);
-                                       iput(master_inode);
-                               }
-                       } else {
-                               ll_invalidate_negative_children(inode);
+                       if (master_inode) {
+                               ll_invalidate_negative_children(master_inode);
+                               iput(master_inode);
                        }
+               } else {
+                       ll_invalidate_negative_children(inode);
                }
+       }
 
-               if ((bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM)) &&
-                   inode->i_sb->s_root != NULL &&
-                   inode != inode->i_sb->s_root->d_inode)
-                       ll_invalidate_aliases(inode);
+       if ((bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM)) &&
+           inode->i_sb->s_root != NULL &&
+           inode != inode->i_sb->s_root->d_inode)
+               ll_invalidate_aliases(inode);
 
-               iput(inode);
+       iput(inode);
+}
+
+/* Check if the given lock may be downgraded instead of canceling and
+ * that convert is really needed. */
+int ll_md_need_convert(struct ldlm_lock *lock)
+{
+       struct inode *inode;
+       __u64 wanted = lock->l_policy_data.l_inodebits.cancel_bits;
+       __u64 bits = lock->l_policy_data.l_inodebits.bits & ~wanted;
+       enum ldlm_mode mode = LCK_MINMODE;
+
+       if (!wanted || !bits || ldlm_is_cancel(lock))
+               return 0;
+
+       /* do not convert locks other than DOM for now */
+       if (!((bits | wanted) & MDS_INODELOCK_DOM))
+               return 0;
+
+       /* We may have already remaining bits in some other lock so
+        * lock convert will leave us just extra lock for the same bit.
+        * Check if client has other lock with the same bits and the same
+        * or lower mode and don't convert if any.
+        */
+       switch (lock->l_req_mode) {
+       case LCK_PR:
+               mode = LCK_PR;
+       case LCK_PW:
+               mode |= LCK_CR;
                break;
+       case LCK_CW:
+               mode = LCK_CW;
+       case LCK_CR:
+               mode |= LCK_CR;
+               break;
+       default:
+               /* do not convert other modes */
+               return 0;
        }
+
+       /* is lock is too old to be converted? */
+       lock_res_and_lock(lock);
+       if (ktime_after(ktime_get(),
+                       ktime_add(lock->l_last_used,
+                                 ktime_set(10, 0)))) {
+               unlock_res_and_lock(lock);
+               return 0;
+       }
+       unlock_res_and_lock(lock);
+
+       inode = ll_inode_from_resource_lock(lock);
+       ll_have_md_lock(inode, &bits, mode);
+       iput(inode);
+       return !!(bits);
+}
+
+int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                      void *data, int flag)
+{
+       struct lustre_handle lockh;
+       __u64 bits = lock->l_policy_data.l_inodebits.bits;
+       int rc;
+
+       ENTRY;
+
+       switch (flag) {
+       case LDLM_CB_BLOCKING:
+       {
+               __u64 cancel_flags = LCF_ASYNC;
+
+               if (ll_md_need_convert(lock)) {
+                       cancel_flags |= LCF_CONVERT;
+                       /* For lock convert some cancel actions may require
+                        * this lock with non-dropped canceled bits, e.g. page
+                        * flush for DOM lock. So call ll_lock_cancel_bits()
+                        * here while canceled bits are still set.
+                        */
+                       bits = lock->l_policy_data.l_inodebits.cancel_bits;
+                       if (bits & MDS_INODELOCK_DOM)
+                               ll_lock_cancel_bits(lock, MDS_INODELOCK_DOM);
+               }
+               ldlm_lock2handle(lock, &lockh);
+               rc = ldlm_cli_cancel(&lockh, cancel_flags);
+               if (rc < 0) {
+                       CDEBUG(D_INODE, "ldlm_cli_cancel: rc = %d\n", rc);
+                       RETURN(rc);
+               }
+               break;
+       }
+       case LDLM_CB_CANCELING:
+               if (ldlm_is_converting(lock)) {
+                       /* this is called on already converted lock, so
+                        * ibits has remained bits only and cancel_bits
+                        * are bits that were dropped.
+                        * Note that DOM lock is handled prior lock convert
+                        * and is excluded here.
+                        */
+                       bits = lock->l_policy_data.l_inodebits.cancel_bits &
+                               ~MDS_INODELOCK_DOM;
+               } else {
+                       LASSERT(ldlm_is_canceling(lock));
+               }
+               ll_lock_cancel_bits(lock, bits);
+               break;
        default:
                LBUG();
        }
index dfeb102..83c968f 100644 (file)
@@ -1055,7 +1055,7 @@ int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
        if (flags & LDLM_FL_RESENT) {
                rc = LDLM_ITER_CONTINUE;
        } else {
-               __u64 tmpflags = 0;
+               __u64 tmpflags = LDLM_FL_BLOCK_NOWAIT;
                enum ldlm_error err;
 
                rc = policy(lock, &tmpflags, LDLM_PROCESS_RESCAN, &err, NULL);
index 24e6936..3b8454d 100644 (file)
@@ -1371,7 +1371,7 @@ int tgt_cp_callback(struct tgt_session_info *tsi)
 /* generic LDLM target handler */
 struct tgt_handler tgt_dlm_handlers[] = {
 TGT_DLM_HDL    (HABEO_CLAVIS,  LDLM_ENQUEUE,           tgt_enqueue),
-TGT_DLM_HDL_VAR(HABEO_CLAVIS,  LDLM_CONVERT,           tgt_convert),
+TGT_DLM_HDL    (HABEO_CLAVIS,  LDLM_CONVERT,           tgt_convert),
 TGT_DLM_HDL_VAR(0,             LDLM_BL_CALLBACK,       tgt_bl_callback),
 TGT_DLM_HDL_VAR(0,             LDLM_CP_CALLBACK,       tgt_cp_callback)
 };
index 29def34..c48a44d 100755 (executable)
@@ -16558,9 +16558,6 @@ test_271ba() {
 run_test 271ba "DoM: no glimpse RPC for stat (combined file)"
 
 test_271c() {
-       # test to be enabled with lock_convert
-       skip "skipped until lock convert will be implemented"
-
        [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.10.55) ] &&
                skip "Need MDS version at least 2.10.55"