Whamcloud - gitweb
* Add timeouts for blocking-AST callbacks.
authorshaver <shaver>
Wed, 21 Aug 2002 20:49:11 +0000 (20:49 +0000)
committershaver <shaver>
Wed, 21 Aug 2002 20:49:11 +0000 (20:49 +0000)
* Add fail_loc support for dropping a blocking AST reply or callback on the
  OSC.

lustre/include/linux/lustre_dlm.h
lustre/include/linux/obd_support.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c

index 5c3413c..cd18fa2 100644 (file)
@@ -119,6 +119,9 @@ struct ldlm_lock {
         struct list_head      l_childof;
         struct list_head      l_res_link; /*position in one of three res lists*/
         struct list_head      l_inode_link; /* position in inode info list */
+        struct list_head      l_export_chain; /* per-export chain of locks */
+        struct list_head      l_pending_chain; /* locks with callbacks pending */
+        unsigned long         l_callback_timeout;
 
         ldlm_mode_t           l_req_mode;
         ldlm_mode_t           l_granted_mode;
index e9cd118..c7d1328 100644 (file)
@@ -93,6 +93,8 @@ extern char obd_recovery_upcall[128];
 #define OBD_FAIL_OSC                     0x400
 #define OBD_FAIL_OSC_BRW_READ_BULK       0x401
 #define OBD_FAIL_OSC_BRW_WRITE_BULK      0x402
+#define OBD_FAIL_OSC_LOCK_BL_AST         0x403
+#define OBD_FAIL_OSC_LOCK_BL_REPLY       0x404
 
 /* preparation for a more advanced failure testbed (not functional yet) */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
index fea73d7..c2f9f25 100644 (file)
@@ -226,6 +226,8 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         INIT_LIST_HEAD(&lock->l_children);
         INIT_LIST_HEAD(&lock->l_res_link);
         INIT_LIST_HEAD(&lock->l_inode_link);
+        INIT_LIST_HEAD(&lock->l_export_chain);
+        INIT_LIST_HEAD(&lock->l_pending_chain);
         init_waitqueue_head(&lock->l_waitq);
 
         if (parent != NULL) {
index f62bb91..4d687a3 100644 (file)
@@ -35,6 +35,93 @@ extern struct list_head ldlm_namespace_list;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
+inline unsigned long round_timeout(unsigned long timeout)
+{
+        return ((timeout / HZ) + 1) * HZ;
+}
+
+static void waiting_locks_callback(unsigned long unused)
+{
+        CERROR("lock(s) expired! need to start recovery!\n");
+}
+
+static struct list_head waiting_locks_list;
+static spinlock_t waiting_locks_spinlock;
+static struct timer_list waiting_locks_timer;
+/*
+ * Indicate that we're waiting for a client to call us back cancelling a given
+ * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
+ * timer to fire appropriately.  (We round up to the next second, to avoid floods
+ * of timer firings during periods of high lock contention and traffic.
+ */
+int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+{
+        unsigned long timeout_rounded;
+        ENTRY;
+        
+        LASSERT(list_empty(&lock->l_pending_chain));
+        
+        CERROR("waiting for lock %p\n", lock);
+        spin_lock_bh(&waiting_locks_spinlock);
+        lock->l_callback_timeout = jiffies + (obd_timeout * HZ);
+        
+        timeout_rounded = round_timeout(lock->l_callback_timeout);
+        
+        if (timeout_rounded < waiting_locks_timer.expires ||
+            !timer_pending(&waiting_locks_timer)) {
+                CERROR("adjusting timer! (%0lx/%0lx < %0lx)\n",
+                       lock->l_callback_timeout, timeout_rounded,
+                       waiting_locks_timer.expires);
+                mod_timer(&waiting_locks_timer, timeout_rounded);
+        }
+        list_add(&lock->l_pending_chain, waiting_locks_list.prev); /* FIFO */
+        spin_unlock_bh(&waiting_locks_spinlock);
+        RETURN(1);
+}
+
+/*
+ * Remove a lock from the pending list, likely because it had its cancellation
+ * callback arrive without incident.  This adjusts the lock-timeout timer if
+ * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
+ */
+int ldlm_del_waiting_lock(struct ldlm_lock *lock)
+{
+        struct list_head *list_next;
+
+        ENTRY;
+        
+        spin_lock_bh(&waiting_locks_spinlock);
+
+        if (list_empty(&lock->l_pending_chain)) {
+                spin_unlock_bh(&waiting_locks_spinlock);
+                RETURN(0);
+        }
+
+        CERROR("no longer waiting for lock %p\n", lock);
+
+        list_next = lock->l_pending_chain.next;
+        if (lock->l_pending_chain.prev == &waiting_locks_list) {
+                /* Removing the head of the list, adjust timer. */
+                if (list_next == &waiting_locks_list) {
+                        /* No more, just cancel. */
+                        CERROR("no more locks waiting, stopping timer\n");
+                        del_timer(&waiting_locks_timer);
+                } else {
+                        struct ldlm_lock *next;
+                        next = list_entry(list_next, struct ldlm_lock,
+                                          l_pending_chain);
+                        CERROR("adjusting timer (%0lx %0lx)\n",
+                               waiting_locks_timer.expires,
+                               round_timeout(next->l_callback_timeout));
+                        mod_timer(&waiting_locks_timer,
+                                  round_timeout(next->l_callback_timeout));
+                }
+        }
+        list_del_init(&lock->l_pending_chain);
+        spin_unlock_bh(&waiting_locks_spinlock);
+        RETURN(1);
+}
+
 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                                     struct ldlm_lock_desc *desc,
                                     void *data, __u32 data_len)
@@ -59,6 +146,7 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LDLM_DEBUG(lock, "server preparing blocking AST");
         req->rq_replen = lustre_msg_size(0, NULL);
 
+        ldlm_add_waiting_lock(lock);
         rc = ptlrpc_queue_wait(req);
         rc = ptlrpc_check_status(req, rc);
         ptlrpc_free_req(req);
@@ -211,6 +299,8 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
                 LDLM_DEBUG(lock, "server-side convert handler START");
                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                   &dlm_rep->lock_flags);
+                if (ldlm_del_waiting_lock(lock))
+                        CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
                 req->rq_status = 0;
         }
         if (ptlrpc_reply(req->rq_svc, req) != 0)
@@ -249,6 +339,8 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         } else {
                 LDLM_DEBUG(lock, "server-side cancel handler START");
                 ldlm_lock_cancel(lock);
+                if (ldlm_del_waiting_lock(lock))
+                        CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
                 req->rq_status = 0;
         }
 
@@ -271,6 +363,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
         int rc, do_ast;
         ENTRY;
 
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_REPLY, 0);
+
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(-ENOMEM);
@@ -278,6 +372,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_AST, 0);
+
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
@@ -480,6 +576,12 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
                 }
         }
 
+        INIT_LIST_HEAD(&waiting_locks_list);
+        spin_lock_init(&waiting_locks_spinlock);
+        waiting_locks_timer.function = waiting_locks_callback;
+        waiting_locks_timer.data = 0;
+        init_timer(&waiting_locks_timer);
+        
         RETURN(0);
 
  out_thread: