Whamcloud - gitweb
LU-874 ldlm: prioritize LDLM_CANCEL requests
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index a3f96fc..d77a893 100644 (file)
@@ -28,6 +28,9 @@
 /*
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -66,6 +69,11 @@ extern cfs_mem_cache_t *ldlm_lock_slab;
 static cfs_semaphore_t  ldlm_ref_sem;
 static int ldlm_refcount;
 
+struct ldlm_cb_async_args {
+        struct ldlm_cb_set_arg *ca_set_arg;
+        struct ldlm_lock       *ca_lock;
+};
+
 /* LDLM state */
 
 static struct ldlm_state *ldlm_state;
@@ -271,7 +279,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
 /* This is called from within a timer interrupt and cannot schedule */
 static void waiting_locks_callback(unsigned long unused)
 {
-        struct ldlm_lock *lock, *last = NULL;
+        struct ldlm_lock *lock;
 
 repeat:
         cfs_spin_lock_bh(&waiting_locks_spinlock);
@@ -347,8 +355,6 @@ repeat:
                            libcfs_nid2str(
                                    lock->l_export->exp_connection->c_peer.nid));
 
-                last = lock;
-
                 /* no needs to take an extra ref on the lock since it was in
                  * the waiting_locks_list and ldlm_add_waiting_lock()
                  * already grabbed a ref */
@@ -641,30 +647,27 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
 static int ldlm_cb_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *data, int rc)
 {
-        struct ldlm_cb_set_arg *arg;
-        struct ldlm_lock *lock;
+        struct ldlm_cb_async_args *ca   = data;
+        struct ldlm_lock          *lock = ca->ca_lock;
+        struct ldlm_cb_set_arg    *arg  = ca->ca_set_arg;
         ENTRY;
 
-        LASSERT(data != NULL);
-
-        arg = req->rq_async_args.pointer_arg[0];
-        lock = req->rq_async_args.pointer_arg[1];
         LASSERT(lock != NULL);
         if (rc != 0) {
                 rc = ldlm_handle_ast_error(lock, req, rc,
                                            arg->type == LDLM_BL_CALLBACK
                                            ? "blocking" : "completion");
+                if (rc == -ERESTART)
+                        cfs_atomic_inc(&arg->restart);
         }
-
         LDLM_LOCK_RELEASE(lock);
 
-        if (rc == -ERESTART)
-                cfs_atomic_set(&arg->restart, 1);
-
+        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+                cfs_waitq_signal(&arg->waitq);
         RETURN(0);
 }
 
-static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                                           struct ldlm_cb_set_arg *arg,
                                           struct ldlm_lock *lock,
                                           int instant_cancel)
@@ -676,12 +679,11 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
                 rc = ptl_send_rpc(req, 1);
                 ptlrpc_req_finished(req);
                 if (rc == 0)
-                        /* If we cancelled the lock, we need to restart
-                         * ldlm_reprocess_queue */
-                        cfs_atomic_set(&arg->restart, 1);
+                        cfs_atomic_inc(&arg->restart);
         } else {
                 LDLM_LOCK_GET(lock);
-                ptlrpc_set_add_req(arg->set, req);
+                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+                cfs_atomic_inc(&arg->rpcs);
         }
 
         RETURN(rc);
@@ -704,7 +706,11 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
         cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
         cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
                                 rq_exp_list) {
-                if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+                /* Do not process requests that were not yet added to there
+                 * incoming queue or were already removed from there for
+                 * processing */
+                if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
+                    req->rq_ops->hpreq_lock_match &&
                     req->rq_ops->hpreq_lock_match(req, lock))
                         ptlrpc_hpreq_reorder(req);
         }
@@ -723,6 +729,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                              struct ldlm_lock_desc *desc,
                              void *data, int flag)
 {
+        struct ldlm_cb_async_args *ca;
         struct ldlm_cb_set_arg *arg = data;
         struct ldlm_request    *body;
         struct ptlrpc_request  *req;
@@ -749,8 +756,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        req->rq_async_args.pointer_arg[0] = arg;
-        req->rq_async_args.pointer_arg[1] = lock;
+        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+        ca = ptlrpc_req_async_args(req);
+        ca->ca_set_arg = arg;
+        ca->ca_lock = lock;
+
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
 
@@ -801,7 +811,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
@@ -811,6 +821,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct ldlm_cb_set_arg *arg = data;
         struct ldlm_request    *body;
         struct ptlrpc_request  *req;
+        struct ldlm_cb_async_args *ca;
         long                    total_enqueue_wait;
         int                     instant_cancel = 0;
         int                     rc = 0;
@@ -839,8 +850,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 RETURN(rc);
         }
 
-        req->rq_async_args.pointer_arg[0] = arg;
-        req->rq_async_args.pointer_arg[1] = lock;
+        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+        ca = ptlrpc_req_async_args(req);
+        ca->ca_set_arg = arg;
+        ca->ca_lock = lock;
+
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
@@ -851,10 +865,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (lock->l_resource->lr_lvb_len) {
                 void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
 
-                lock_res_and_lock(lock);
+                lock_res(lock->l_resource);
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
-                unlock_res_and_lock(lock);
+                unlock_res(lock->l_resource);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
@@ -912,7 +926,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
@@ -956,12 +970,20 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
                                      LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
 
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ELDLM_NO_LOCK_DATA)
+        /* Update the LVB from disk if the AST failed (this is a legal race)
+         *
+         * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA.
+         * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
+         *   when inode is cleared. LU-274
+         */
+        if (rc == -ELDLM_NO_LOCK_DATA) {
                 LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
-        else if (rc != 0)
+                ldlm_res_lvbo_update(res, NULL, 1);
+        } else if (rc != 0) {
                 rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
-        else
+        } else {
                 rc = ldlm_res_lvbo_update(res, req, 1);
+        }
 
         ptlrpc_req_finished(req);
         if (rc == -ERESTART)
@@ -1137,13 +1159,14 @@ existing_lock:
                  * local_lock_enqueue by the policy function. */
                 cookie = req;
         } else {
-                lock_res_and_lock(lock);
+                /* based on the assumption that lvb size never changes during
+                 * resource life time otherwise it need resource->lr_lock's
+                 * protection */
                 if (lock->l_resource->lr_lvb_len) {
                         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
                                              RCL_SERVER,
                                              lock->l_resource->lr_lvb_len);
                 }
-                unlock_res_and_lock(lock);
 
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
@@ -1154,7 +1177,10 @@ existing_lock:
         }
 
         if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
-                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+                ldlm_convert_policy_to_local(
+                                          dlm_req->lock_desc.l_resource.lr_type,
+                                          &dlm_req->lock_desc.l_policy_data,
+                                          &lock->l_policy_data);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 lock->l_req_extent = lock->l_policy_data.l_extent;
 
@@ -1244,24 +1270,28 @@ existing_lock:
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                            "(err=%d, rc=%d)", err, rc);
 
-                lock_res_and_lock(lock);
                 if (rc == 0) {
                         if (lock->l_resource->lr_lvb_len > 0) {
+                                /* MDT path won't handle lr_lvb_data, so
+                                 * lock/unlock better be contained in the
+                                 * if block */
                                 void *lvb;
 
                                 lvb = req_capsule_server_get(&req->rq_pill,
                                                              &RMF_DLM_LVB);
                                 LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                          req, lock);
-
+                                lock_res(lock->l_resource);
                                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                                        lock->l_resource->lr_lvb_len);
+                                unlock_res(lock->l_resource);
                         }
                 } else {
+                        lock_res_and_lock(lock);
                         ldlm_resource_unlink_lock(lock);
                         ldlm_lock_destroy_nolock(lock);
+                        unlock_res_and_lock(lock);
                 }
-                unlock_res_and_lock(lock);
 
                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
@@ -1522,7 +1552,10 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
 
         if (lock->l_resource->lr_type != LDLM_PLAIN) {
-                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+                ldlm_convert_policy_to_local(
+                                          dlm_req->lock_desc.l_resource.lr_type,
+                                          &dlm_req->lock_desc.l_policy_data,
+                                          &lock->l_policy_data);
                 LDLM_DEBUG(lock, "completion AST, new policy data");
         }
 
@@ -1572,7 +1605,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
          * l_ast_data */
         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
 
-        ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
+        ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
@@ -2066,6 +2099,90 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
+                                        struct ldlm_lock *lock)
+{
+        struct ldlm_request *dlm_req;
+        struct lustre_handle lockh;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+        if (dlm_req == NULL)
+                RETURN(0);
+
+        ldlm_lock2handle(lock, &lockh);
+        for (i = 0; i < dlm_req->lock_count; i++) {
+                if (lustre_handle_equal(&dlm_req->lock_handle[i],
+                                        &lockh)) {
+                        DEBUG_REQ(D_RPCTRACE, req,
+                                  "Prio raised by lock "LPX64".", lockh.cookie);
+
+                        rc = 1;
+                        break;
+                }
+        }
+
+        RETURN(rc);
+
+}
+
+static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        /* no prolong in recovery */
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                RETURN(0);
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+        if (dlm_req == NULL)
+                RETURN(-EFAULT);
+
+        for (i = 0; i < dlm_req->lock_count; i++) {
+                struct ldlm_lock *lock;
+
+                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+                if (lock == NULL)
+                        continue;
+
+                rc = !!(lock->l_flags & LDLM_FL_AST_SENT);
+                if (rc)
+                        LDLM_DEBUG(lock, "hpreq cancel lock");
+                LDLM_LOCK_PUT(lock);
+
+                if (rc)
+                        break;
+        }
+
+        RETURN(rc);
+}
+
+static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
+        .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
+        .hpreq_check      = ldlm_cancel_hpreq_check
+};
+
+static int ldlm_hpreq_handler(struct ptlrpc_request *req)
+{
+        ENTRY;
+
+        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+
+        if (req->rq_export == NULL)
+                RETURN(0);
+
+        if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
+                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+                req->rq_ops = &ldlm_cancel_hpreq_ops;
+        }
+        RETURN(0);
+}
+
 int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                         cfs_hlist_node_t *hnode, void *data)
 
@@ -2115,7 +2232,8 @@ void ldlm_revoke_export_locks(struct obd_export *exp)
         CFS_INIT_LIST_HEAD(&rpc_list);
         cfs_hash_for_each_empty(exp->exp_lock_hash,
                                 ldlm_revoke_lock_cb, &rpc_list);
-        ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
+        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+                          LDLM_WORK_REVOKE_AST);
 
         EXIT;
 }
@@ -2164,7 +2282,7 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
         int rc;
 
         cfs_init_completion(&bltd.bltd_comp);
-        rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+        rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
         if (rc < 0) {
                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
                        cfs_atomic_read(&blp->blp_num_threads), rc);
@@ -2200,32 +2318,31 @@ static int ldlm_bl_thread_main(void *arg)
         while (1) {
                 struct l_wait_info lwi = { 0 };
                 struct ldlm_bl_work_item *blwi = NULL;
+                int busy;
 
                 blwi = ldlm_bl_get_work(blp);
 
                 if (blwi == NULL) {
-                        int busy;
-
                         cfs_atomic_dec(&blp->blp_busy_threads);
                         l_wait_event_exclusive(blp->blp_waitq,
                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
                                          &lwi);
                         busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
-
-                        if (blwi->blwi_ns == NULL)
-                                /* added by ldlm_cleanup() */
-                                break;
-
-                        /* Not fatal if racy and have a few too many threads */
-                        if (unlikely(busy < blp->blp_max_threads &&
-                            busy >= cfs_atomic_read(&blp->blp_num_threads)))
-                                /* discard the return value, we tried */
-                                ldlm_bl_thread_start(blp);
                 } else {
-                        if (blwi->blwi_ns == NULL)
-                                /* added by ldlm_cleanup() */
-                                break;
+                        busy = cfs_atomic_read(&blp->blp_busy_threads);
                 }
+
+                if (blwi->blwi_ns == NULL)
+                        /* added by ldlm_cleanup() */
+                        break;
+
+                /* Not fatal if racy and have a few too many threads */
+                if (unlikely(busy < blp->blp_max_threads &&
+                             busy >= cfs_atomic_read(&blp->blp_num_threads) &&
+                             !blwi->blwi_mem_pressure))
+                        /* discard the return value, we tried */
+                        ldlm_bl_thread_start(blp);
+
                 if (blwi->blwi_mem_pressure)
                         cfs_memory_pressure_set();
 
@@ -2300,7 +2417,7 @@ void ldlm_put_ref(void)
  * Export handle<->lock hash operations.
  */
 static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, void *key, unsigned mask)
+ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
 }
@@ -2324,7 +2441,7 @@ ldlm_export_lock_keycpy(cfs_hlist_node_t *hnode, void *key)
 }
 
 static int
-ldlm_export_lock_keycmp(void *key, cfs_hlist_node_t *hnode)
+ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode)
 {
         return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
 }
@@ -2453,7 +2570,7 @@ static int ldlm_setup(void)
                                 ldlm_min_threads, ldlm_max_threads,
                                 "ldlm_cn",
                                 LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
-                                NULL);
+                                ldlm_hpreq_handler);
 
         if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");
@@ -2497,7 +2614,7 @@ static int ldlm_setup(void)
         cfs_spin_lock_init(&waiting_locks_spinlock);
         cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
 
-        rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
+        rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                 GOTO(out_thread, rc);
@@ -2583,7 +2700,7 @@ static int ldlm_cleanup(void)
         RETURN(0);
 }
 
-int __init ldlm_init(void)
+int ldlm_init(void)
 {
         cfs_init_mutex(&ldlm_ref_sem);
         cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
@@ -2616,7 +2733,7 @@ int __init ldlm_init(void)
         return 0;
 }
 
-void __exit ldlm_exit(void)
+void ldlm_exit(void)
 {
         int rc;
         if (ldlm_refcount)