Whamcloud - gitweb
LU-8347 ldlm: granting conflicting locks
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 7f96019..c2a420d 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2014, Intel Corporation.
+ * Copyright (c) 2010, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #include <linux/kthread.h>
+#include <linux/list.h>
 #include <libcfs/libcfs.h>
 #include <lustre_dlm.h>
 #include <obd_class.h>
-#include <libcfs/list.h>
 #include "ldlm_internal.h"
 
 static int ldlm_num_threads;
-CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
-                "number of DLM service threads to start");
+module_param(ldlm_num_threads, int, 0444);
+MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
 
 static char *ldlm_cpts;
-CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
-               "CPU partitions ldlm threads should run on");
+module_param(ldlm_cpts, charp, 0444);
+MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
 
 static struct mutex    ldlm_ref_mutex;
 static int ldlm_refcount;
@@ -68,7 +64,7 @@ struct ldlm_cb_async_args {
 
 static struct ldlm_state *ldlm_state;
 
-inline cfs_time_t round_timeout(cfs_time_t timeout)
+static inline cfs_time_t round_timeout(cfs_time_t timeout)
 {
         return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
 }
@@ -206,7 +202,7 @@ static int expired_lock_main(void *arg)
 
                        lock = list_entry(expired->next, struct ldlm_lock,
                                          l_pending_chain);
-                       if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
+                       if ((void *)lock < LP_POISON + PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
                                CERROR("free lock on elt list %p\n", lock);
@@ -214,7 +210,7 @@ static int expired_lock_main(void *arg)
                        }
                        list_del_init(&lock->l_pending_chain);
                        if ((void *)lock->l_export <
-                            LP_POISON + PAGE_CACHE_SIZE &&
+                            LP_POISON + PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                       lock->l_export);
@@ -659,31 +655,33 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
  * Perform lock cleanup if AST reply came with error.
  */
 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
-                                 struct ptlrpc_request *req, int rc,
-                                 const char *ast_type)
+                                struct ptlrpc_request *req, int rc,
+                                const char *ast_type)
 {
        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
 
        if (!req->rq_replied || (rc && rc != -EINVAL)) {
                if (lock->l_export && lock->l_export->exp_libclient) {
-                       LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
-                                  " timeout, just cancelling lock", ast_type,
+                       LDLM_DEBUG(lock,
+                                  "%s AST (req@%p x%llu) to liblustre client (nid %s) timeout, just cancelling lock",
+                                  ast_type, req, req->rq_xid,
                                   libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else if (ldlm_is_cancel(lock)) {
-                       LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
-                                  "cancel was received (AST reply lost?)",
-                                  ast_type, libcfs_nid2str(peer.nid));
+                       LDLM_DEBUG(lock,
+                                  "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
+                                  ast_type, req, req->rq_xid,
+                                  libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else {
-                       LDLM_ERROR(lock, "client (nid %s) %s %s AST "
-                                  "(req status %d rc %d), evict it",
+                       LDLM_ERROR(lock,
+                                  "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
                                   libcfs_nid2str(peer.nid),
                                   req->rq_replied ? "returned error from" :
                                   "failed to reply to",
-                                  ast_type,
+                                  ast_type, req, req->rq_xid,
                                   (req->rq_repmsg != NULL) ?
                                   lustre_msg_get_status(req->rq_repmsg) : 0,
                                   rc);
@@ -695,12 +693,12 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
        if (rc == -EINVAL) {
                struct ldlm_resource *res = lock->l_resource;
 
-               LDLM_DEBUG(lock, "client (nid %s) returned %d"
-                          " from %s AST - normal race",
+               LDLM_DEBUG(lock,
+                          "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
                           libcfs_nid2str(peer.nid),
                           req->rq_repmsg ?
                           lustre_msg_get_status(req->rq_repmsg) : -1,
-                          ast_type);
+                          ast_type, req, req->rq_xid);
                if (res) {
                        /* update lvbo to return proper attributes.
                         * see bug 23174 */
@@ -852,6 +850,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 /* Don't need to do anything here. */
                 RETURN(0);
 
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
+               LDLM_DEBUG(lock, "dropping BL AST");
+               RETURN(0);
+       }
+
         LASSERT(lock);
         LASSERT(data != NULL);
         if (lock->l_export->exp_obd->obd_recovering != 0)
@@ -903,6 +906,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LDLM_DEBUG(lock, "server preparing blocking AST");
 
         ptlrpc_request_set_replen(req);
+       ldlm_set_cbpending(lock);
        if (instant_cancel) {
                unlock_res_and_lock(lock);
                ldlm_lock_cancel(lock);
@@ -1265,23 +1269,6 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 GOTO(out, rc = -EPROTO);
         }
 
-#if 0
-        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
-           against server's _CONNECT_SUPPORTED flags? (I don't want to use
-           ibits for mgc/mgs) */
-
-        /* INODEBITS_INTEROP: Perform conversion from plain lock to
-         * inodebits lock if client does not support them. */
-       if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
-            (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
-                dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
-                dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
-                        MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
-                if (dlm_req->lock_desc.l_req_mode == LCK_PR)
-                        dlm_req->lock_desc.l_req_mode = LCK_CR;
-        }
-#endif
-
        if (unlikely((flags & LDLM_FL_REPLAY) ||
                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
                 /* Find an existing lock in the per-export lock hash */
@@ -1291,8 +1278,8 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
                                        (void *)&dlm_req->lock_handle[0]);
                 if (lock != NULL) {
-                        DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
-                                  LPX64, lock->l_handle.h_cookie);
+                       DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx",
+                                 lock->l_handle.h_cookie);
                        flags |= LDLM_FL_RESENT;
                         GOTO(existing_lock, rc = 0);
                }
@@ -1355,6 +1342,14 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
         * without them. */
        lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
                                              LDLM_FL_INHERIT_MASK);
+
+       ldlm_convert_policy_to_local(req->rq_export,
+                                    dlm_req->lock_desc.l_resource.lr_type,
+                                    &dlm_req->lock_desc.l_policy_data,
+                                    &lock->l_policy_data);
+       if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+               lock->l_req_extent = lock->l_policy_data.l_extent;
+
 existing_lock:
 
         if (flags & LDLM_FL_HAS_INTENT) {
@@ -1376,14 +1371,6 @@ existing_lock:
                         GOTO(out, rc);
         }
 
-        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
-                ldlm_convert_policy_to_local(req->rq_export,
-                                          dlm_req->lock_desc.l_resource.lr_type,
-                                          &dlm_req->lock_desc.l_policy_data,
-                                          &lock->l_policy_data);
-        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
-                lock->l_req_extent = lock->l_policy_data.l_extent;
-
        err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
        if (err) {
                if ((int)err < 0)
@@ -1441,7 +1428,7 @@ existing_lock:
                if (unlikely(!ldlm_is_cancel_on_block(lock) ||
                              !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
                         CERROR("Granting sync lock to libclient. "
-                               "req fl %d, rep fl %d, lock fl "LPX64"\n",
+                              "req fl %d, rep fl %d, lock fl %#llx\n",
                                dlm_req->lock_flags, dlm_rep->lock_flags,
                                lock->l_flags);
                         LDLM_ERROR(lock, "sync lock");
@@ -1451,7 +1438,7 @@ existing_lock:
                                it = req_capsule_client_get(&req->rq_pill,
                                                            &RMF_LDLM_INTENT);
                                if (it != NULL) {
-                                       CERROR("This is intent %s ("LPU64")\n",
+                                       CERROR("This is intent %s (%llu)\n",
                                               ldlm_it2str(it->opc), it->opc);
                                }
                        }
@@ -1515,7 +1502,7 @@ existing_lock:
                        }
                }
 
-               if (rc != 0) {
+               if (rc != 0 && !(flags & LDLM_FL_RESENT)) {
                        if (lock->l_export) {
                                ldlm_lock_cancel(lock);
                        } else {
@@ -1671,7 +1658,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
                 if (!lock) {
                         LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
-                                          "lock (cookie "LPU64")",
+                                         "lock (cookie %llu)",
                                           dlm_req->lock_handle[i].cookie);
                         continue;
                 }
@@ -2135,11 +2122,11 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req)
 }
 
 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
-                                        const char *msg, int rc,
-                                        struct lustre_handle *handle)
+                                       const char *msg, int rc,
+                                       const struct lustre_handle *handle)
 {
         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
-                  "%s: [nid %s] [rc %d] [lock "LPX64"]",
+                 "%s: [nid %s] [rc %d] [lock %#llx]",
                   msg, libcfs_id2str(req->rq_peer), rc,
                   handle ? handle->cookie : 0);
         if (req->rq_no_reply)
@@ -2258,7 +2245,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
         if (!lock) {
-                CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
+               CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
                        "disappeared\n", dlm_req->lock_handle[0].cookie);
                 rc = ldlm_callback_reply(req, -EINVAL);
                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
@@ -2274,28 +2261,27 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         lock_res_and_lock(lock);
        lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
                                              LDLM_FL_AST_MASK);
-        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
-                /* If somebody cancels lock and cache is already dropped,
-                 * or lock is failed before cp_ast received on client,
-                 * we can tell the server we have no lock. Otherwise, we
-                 * should send cancel after dropping the cache. */
+       if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
+               /* If somebody cancels lock and cache is already dropped,
+                * or lock is failed before cp_ast received on client,
+                * we can tell the server we have no lock. Otherwise, we
+                * should send cancel after dropping the cache. */
                if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
-                   ldlm_is_failed(lock)) {
-                        LDLM_DEBUG(lock, "callback on lock "
-                                   LPX64" - lock disappeared\n",
-                                   dlm_req->lock_handle[0].cookie);
-                        unlock_res_and_lock(lock);
-                        LDLM_LOCK_RELEASE(lock);
-                        rc = ldlm_callback_reply(req, -EINVAL);
-                        ldlm_callback_errmsg(req, "Operate on stale lock", rc,
-                                             &dlm_req->lock_handle[0]);
-                        RETURN(0);
-                }
+                    ldlm_is_failed(lock)) {
+                       LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared",
+                                  dlm_req->lock_handle[0].cookie);
+                       unlock_res_and_lock(lock);
+                       LDLM_LOCK_RELEASE(lock);
+                       rc = ldlm_callback_reply(req, -EINVAL);
+                       ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+                                            &dlm_req->lock_handle[0]);
+                       RETURN(0);
+               }
                /* BL_AST locks are not needed in LRU.
                 * Let ldlm_cancel_lru() be fast. */
-                ldlm_lock_remove_from_lru(lock);
+               ldlm_lock_remove_from_lru(lock);
                ldlm_set_bl_ast(lock);
-        }
+       }
         unlock_res_and_lock(lock);
 
         /* We want the ost thread to get this reply so that it can respond
@@ -2360,7 +2346,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                 struct ldlm_request *dlm_req;
 
                 CERROR("%s from %s arrived at %lu with bad export cookie "
-                       LPU64"\n",
+                      "%llu\n",
                        ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
                        libcfs_nid2str(req->rq_peer.nid),
                        req->rq_arrival_time.tv_sec,
@@ -2420,7 +2406,7 @@ static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
                 if (lustre_handle_equal(&dlm_req->lock_handle[i],
                                         &lockh)) {
                         DEBUG_REQ(D_RPCTRACE, req,
-                                  "Prio raised by lock "LPX64".", lockh.cookie);
+                                 "Prio raised by lock %#llx.", lockh.cookie);
 
                         rc = 1;
                         break;
@@ -2537,15 +2523,15 @@ static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
 void ldlm_revoke_export_locks(struct obd_export *exp)
 {
        struct list_head  rpc_list;
-        ENTRY;
+       ENTRY;
 
        INIT_LIST_HEAD(&rpc_list);
-        cfs_hash_for_each_empty(exp->exp_lock_hash,
-                                ldlm_revoke_lock_cb, &rpc_list);
-        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
-                          LDLM_WORK_REVOKE_AST);
+       cfs_hash_for_each_nolock(exp->exp_lock_hash,
+                                ldlm_revoke_lock_cb, &rpc_list, 0);
+       ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+                         LDLM_WORK_REVOKE_AST);
 
-        EXIT;
+       EXIT;
 }
 EXPORT_SYMBOL(ldlm_revoke_export_locks);
 #endif /* HAVE_SERVER_SUPPORT */
@@ -2600,7 +2586,6 @@ static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
 
 /* This only contains temporary data until the thread starts */
 struct ldlm_bl_thread_data {
-       char                    bltd_name[CFS_CURPROC_COMM_MAX];
        struct ldlm_bl_pool     *bltd_blp;
        struct completion       bltd_comp;
        int                     bltd_num;
@@ -2608,19 +2593,32 @@ struct ldlm_bl_thread_data {
 
 static int ldlm_bl_thread_main(void *arg);
 
-static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
+static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
 {
        struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
        struct task_struct *task;
 
        init_completion(&bltd.bltd_comp);
-       bltd.bltd_num = atomic_read(&blp->blp_num_threads);
-       snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
-               "ldlm_bl_%02d", bltd.bltd_num);
-       task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
+
+       bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
+       if (bltd.bltd_num >= blp->blp_max_threads) {
+               atomic_dec(&blp->blp_num_threads);
+               return 0;
+       }
+
+       LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
+       if (check_busy &&
+           atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
+               atomic_dec(&blp->blp_num_threads);
+               return 0;
+       }
+
+       task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
+                          bltd.bltd_num);
        if (IS_ERR(task)) {
                CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
-                      atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+                      bltd.bltd_num, PTR_ERR(task));
+               atomic_dec(&blp->blp_num_threads);
                return PTR_ERR(task);
        }
        wait_for_completion(&bltd.bltd_comp);
@@ -2632,12 +2630,11 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
 static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
                                      struct ldlm_bl_work_item *blwi)
 {
-       int busy = atomic_read(&blp->blp_busy_threads);
-
-       if (busy >= blp->blp_max_threads)
+       if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
                return 0;
 
-       if (busy < atomic_read(&blp->blp_num_threads))
+       if (atomic_read(&blp->blp_busy_threads) <
+           atomic_read(&blp->blp_num_threads))
                return 0;
 
        if (blwi != NULL && (blwi->blwi_ns == NULL ||
@@ -2726,9 +2723,6 @@ static int ldlm_bl_thread_main(void *arg)
 
        blp = bltd->bltd_blp;
 
-       atomic_inc(&blp->blp_num_threads);
-       atomic_inc(&blp->blp_busy_threads);
-
        complete(&bltd->bltd_comp);
        /* cannot use bltd after this, it is only on caller's stack */
 
@@ -2740,29 +2734,28 @@ static int ldlm_bl_thread_main(void *arg)
 
                rc = ldlm_bl_get_work(blp, &blwi, &exp);
 
-               if (rc == 0) {
-                       atomic_dec(&blp->blp_busy_threads);
+               if (rc == 0)
                        l_wait_event_exclusive(blp->blp_waitq,
                                               ldlm_bl_get_work(blp, &blwi,
                                                                &exp),
                                               &lwi);
-                       atomic_inc(&blp->blp_busy_threads);
-               }
+               atomic_inc(&blp->blp_busy_threads);
 
                if (ldlm_bl_thread_need_create(blp, blwi))
                        /* discard the return value, we tried */
-                       ldlm_bl_thread_start(blp);
+                       ldlm_bl_thread_start(blp, true);
 
                if (exp)
                        rc = ldlm_bl_thread_exports(blp, exp);
                else if (blwi)
                        rc = ldlm_bl_thread_blwi(blp, blwi);
 
+               atomic_dec(&blp->blp_busy_threads);
+
                if (rc == LDLM_ITER_STOP)
                        break;
        }
 
-       atomic_dec(&blp->blp_busy_threads);
        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
        RETURN(0);
@@ -3042,7 +3035,7 @@ static int ldlm_setup(void)
        }
 
        for (i = 0; i < blp->blp_min_threads; i++) {
-               rc = ldlm_bl_thread_start(blp);
+               rc = ldlm_bl_thread_start(blp, false);
                if (rc < 0)
                        GOTO(out, rc);
        }