Branch b1_8_gate
author    vitaly <vitaly>    Tue, 18 Nov 2008 21:40:04 +0000 (21:40 +0000)
committer vitaly <vitaly>    Tue, 18 Nov 2008 21:40:04 +0000 (21:40 +0000)
b=16129
i=adilger
i=green

- a high priority request list is added to the service;
- once a lock is canceled, all IO requests under this lock, including
  newly arriving ones, are moved into this list;
- PING requests are also added into this list;
- once a lock cancel timeout occurs, the timeout is prolonged
  if there is an IO RPC in flight under this lock;
- another request list is added to the export and is used to speed up
  RPC-lock matching (see the sketch below).
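
For illustration only (not part of this patch), here is a rough sketch of
the matching callback the LDLM code below relies on: a server request type
registers hpreq_lock_match() in req->rq_ops, and the callback reports
whether a queued RPC runs under the given lock. The example_* names and the
object-id comparison are hypothetical; only req->rq_ops->hpreq_lock_match()
and its (req, lock) signature appear in the patch itself.

/*
 * Hypothetical sketch of an hpreq_lock_match() callback.  A real
 * implementation (e.g. for OST reads/writes) would also compare the
 * I/O extent and the lock mode; here only the object id is checked.
 */
static int example_hpreq_lock_match(struct ptlrpc_request *req,
                                    struct ldlm_lock *lock)
{
        /* assumed helper: the object id the RPC targets, extracted from
         * the request body when it was queued on exp_queued_rpc */
        __u64 oid = example_req_object_id(req);

        /* for OST object locks the first component of the resource name
         * is the object id */
        return lock->l_resource->lr_name.name[0] == oid;
}

A request whose rq_ops carries such a callback can then be found by
ldlm_lock_busy() and ldlm_lock_reorder_req() in the hunks below.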

lustre/ldlm/ldlm_lockd.c

index 28fa0f2..ef6c4a0 100644
@@ -236,6 +236,31 @@ static int expired_lock_main(void *arg)
         RETURN(0);
 }
 
+/**
+ * Check if there is a request in the export request list
+ * which prevents the lock canceling.
+ */
+static int ldlm_lock_busy(struct ldlm_lock *lock)
+{
+        struct ptlrpc_request *req;
+        int match = 0;
+        ENTRY;
+
+        if (lock->l_export == NULL)
+                return 0;
+
+        spin_lock(&lock->l_export->exp_lock);
+        list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+                if (req->rq_ops->hpreq_lock_match) {
+                        match = req->rq_ops->hpreq_lock_match(req, lock);
+                        if (match)
+                                break;
+                }
+        }
+        spin_unlock(&lock->l_export->exp_lock);
+        RETURN(match);
+}
+
 /* This is called from within a timer interrupt and cannot schedule */
 static void waiting_locks_callback(unsigned long unused)
 {
@@ -245,11 +270,33 @@ static void waiting_locks_callback(unsigned long unused)
         while (!list_empty(&waiting_locks_list)) {
                 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                   l_pending_chain);
-
                 if (cfs_time_after(lock->l_callback_timeout, cfs_time_current())
                     || (lock->l_req_mode == LCK_GROUP))
                         break;
 
+                /* Check if we need to prolong timeout */
+                if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+                    ldlm_lock_busy(lock)) {
+                        int cont = 1;
+
+                        if (lock->l_pending_chain.next == &waiting_locks_list)
+                                cont = 0;
+
+                        LDLM_LOCK_GET(lock);
+                        spin_unlock_bh(&waiting_locks_spinlock);
+                        LDLM_DEBUG(lock, "prolong the busy lock");
+                        ldlm_refresh_waiting_lock(lock);
+                        spin_lock_bh(&waiting_locks_spinlock);
+
+                        if (!cont) {
+                                LDLM_LOCK_PUT(lock);
+                                break;
+                        }
+
+                        LDLM_LOCK_PUT(lock);
+                        continue;
+                }
+                lock->l_resource->lr_namespace->ns_timeouts++;
                 LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                            "evicting client at %s ",
                            cfs_time_current_sec()- lock->l_enqueued_time.tv_sec,
@@ -317,15 +364,21 @@ static void waiting_locks_callback(unsigned long unused)
  */
 static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
-        int timeout;
+        cfs_time_t timeout;
         cfs_time_t timeout_rounded;
 
         if (!list_empty(&lock->l_pending_chain))
                 return 0;
 
-        timeout = ldlm_get_enq_timeout(lock);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
+            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+                timeout = 2;
+        else
+                timeout = ldlm_get_enq_timeout(lock);
 
-        lock->l_callback_timeout = cfs_time_shift(timeout);
+        timeout = cfs_time_shift(timeout);
+        if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
+                lock->l_callback_timeout = timeout;
 
         timeout_rounded = round_timeout(lock->l_callback_timeout);
 
@@ -457,7 +510,6 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
         LDLM_DEBUG(lock, "refreshed");
         return 1;
 }
-
 #else /* !__KERNEL__ */
 
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
@@ -604,6 +656,30 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
         RETURN(rc);
 }
 
+/**
+ * Check if there are requests in the export request list which prevent
+ * the lock canceling and make these requests high priority ones.
+ */
+static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
+{
+        struct ptlrpc_request *req;
+        ENTRY;
+
+        if (lock->l_export == NULL) {
+                LDLM_DEBUG(lock, "client lock: no-op");
+                RETURN_EXIT;
+        }
+
+        spin_lock(&lock->l_export->exp_lock);
+        list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+                if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+                    req->rq_ops->hpreq_lock_match(req, lock))
+                        ptlrpc_hpreq_reorder(req);
+        }
+        spin_unlock(&lock->l_export->exp_lock);
+        EXIT;
+}
+
 /*
  * ->l_blocking_ast() method for server-side locks. This is invoked when newly
  * enqueued server lock conflicts with given one.
@@ -631,6 +707,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LASSERT(lock);
         LASSERT(data != NULL);
 
+        ldlm_lock_reorder_req(lock);
+
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size,
                               NULL);
@@ -2072,7 +2150,7 @@ static int ldlm_setup(void)
                                 ldlm_callback_handler, "ldlm_cbd",
                                 ldlm_svc_proc_dir, NULL,
                                 ldlm_min_threads, ldlm_max_threads,
-                                "ldlm_cb");
+                                "ldlm_cb", NULL);
 
         if (!ldlm_state->ldlm_cb_service) {
                 CERROR("failed to start service\n");
@@ -2086,7 +2164,7 @@ static int ldlm_setup(void)
                                 ldlm_cancel_handler, "ldlm_canceld",
                                 ldlm_svc_proc_dir, NULL,
                                 ldlm_min_threads, ldlm_max_threads,
-                                "ldlm_cn");
+                                "ldlm_cn", NULL);
 
         if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");