Fix up the spinlock situation around rq->rq_list and the lists it lives on.
author adilger <adilger>
Thu, 29 Aug 2002 07:11:32 +0000 (07:11 +0000)
committer adilger <adilger>
Thu, 29 Aug 2002 07:11:32 +0000 (07:11 +0000)
We avoid a deadlock in ptlrpc_free_committed() when it calls ptlrpc_free_req(),
and likewise when ptlrpc_cleanup_client() calls ptlrpc_free_req(): conn->c_lock
is now dropped around each ptlrpc_free_req() call, and the walk of the affected
list is restarted once the lock has been re-acquired.
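
This follows the usual pattern: unlink the entry while the lock is held, drop
the lock for the call that could deadlock, then re-take the lock and restart
the scan, since any cached iterator may have gone stale in the meantime.  A
minimal userspace sketch of the pattern, with a pthread mutex standing in for
the spinlock (node, free_node and the committed flag are illustrative, not
Lustre API):

    #include <pthread.h>
    #include <stdlib.h>

    struct node {
            struct node *next;
            int committed;                  /* stand-in for the rq_transno check */
    };

    static struct node *head;
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    /* Stand-in for ptlrpc_free_req(): assume it may sleep or take 'lock'
     * itself, so it must never be called with 'lock' held. */
    static void free_node(struct node *n)
    {
            free(n);
    }

    static void free_committed(void)
    {
            struct node **p;

            pthread_mutex_lock(&lock);
    restart:
            for (p = &head; *p != NULL; p = &(*p)->next) {
                    struct node *n = *p;

                    if (!n->committed)
                            continue;

                    *p = n->next;                   /* unlink while locked */
                    pthread_mutex_unlock(&lock);    /* drop lock for the free */
                    free_node(n);
                    pthread_mutex_lock(&lock);
                    goto restart;                   /* iterator is stale now */
            }
            pthread_mutex_unlock(&lock);
    }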

A note for Mike, left in ll_recover() for when it is re-enabled:

   We should not hold a spinlock over such a lengthy operation.
   If necessary, drop spinlock, do operation, re-get spinlock, restart loop.
   If we need to avoid re-processing items, then delete them from the list
   as they are replayed and re-add at the tail of this list, so the next
   item to process will always be at the head of the list.
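
A minimal userspace sketch of what the note proposes, again with a pthread
mutex standing in for the spinlock and a kernel-style circular doubly-linked
list; the item type, the done flag, and replay() are all illustrative:

    #include <pthread.h>

    struct item {
            struct item *prev, *next;
            int done;
    };

    static struct item list = { &list, &list, 1 };  /* circular, kernel-style */
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    static void del(struct item *i)
    {
            i->prev->next = i->next;
            i->next->prev = i->prev;
    }

    static void add_tail(struct item *i)
    {
            i->prev = list.prev;
            i->next = &list;
            list.prev->next = i;
            list.prev = i;
    }

    /* The lengthy operation: may sleep, send RPCs, etc.
     * Must be entered with 'lock' dropped. */
    static void replay(struct item *i)
    {
            (void)i;
    }

    static void recover_all(void)
    {
            pthread_mutex_lock(&lock);
            /* Unprocessed items are always at the head, so stop as soon as
             * the head has already been processed (or the list is empty). */
            while (list.next != &list && !list.next->done) {
                    struct item *i = list.next;

                    del(i);                         /* remove before unlocking */
                    pthread_mutex_unlock(&lock);
                    replay(i);                      /* lock dropped for the op */
                    pthread_mutex_lock(&lock);
                    i->done = 1;
                    add_tail(i);                    /* finished items go last */
            }
            pthread_mutex_unlock(&lock);
    }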

lustre/include/linux/lustre_net.h
lustre/llite/recover.c
lustre/ptlrpc/client.c

diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h
index 5cdddee..e77f0aa 100644
@@ -37,14 +37,14 @@ struct ptlrpc_connection {
         struct list_head        c_link;
         struct lustre_peer      c_peer;
         __u8                    c_local_uuid[37];  /* XXX do we need this? */
-        __u8                    c_remote_uuid[37]; 
+        __u8                    c_remote_uuid[37];
 
         int                     c_level;
         __u32                   c_generation;  /* changes upon new connection */
         __u32                   c_epoch;       /* changes when peer changes */
-        __u32                   c_bootcount;   /* peer's boot count */ 
+        __u32                   c_bootcount;   /* peer's boot count */
 
-        spinlock_t              c_lock;
+        spinlock_t              c_lock;        /* also protects req->rq_list */
         __u32                   c_xid_in;
         __u32                   c_xid_out;
 
@@ -53,11 +53,11 @@ struct ptlrpc_connection {
         __u64                   c_remote_conn;
         __u64                   c_remote_token;
 
-        __u64                   c_last_xid;
-        __u64                   c_last_committed;
-        struct list_head        c_delayed_head; /* delayed until post-recovery */
-        struct list_head        c_sending_head;
-        struct list_head        c_dying_head;
+        __u64                   c_last_xid;    /* protected by c_lock */
+        __u64                   c_last_committed;/* protected by c_lock */
+        struct list_head        c_delayed_head;/* delayed until post-recovery */
+        struct list_head        c_sending_head;/* protected by c_lock */
+        struct list_head        c_dying_head;  /* protected by c_lock */
         struct recovd_data      c_recovd_data;
 
         struct list_head        c_clients; /* XXXshaver will be c_imports */
@@ -75,10 +75,10 @@ struct ptlrpc_client {
         __u32                     cli_target_devno;
 
         struct ptlrpc_connection *cli_connection;
-        
+
         void                     *cli_data;
         struct semaphore          cli_rpc_sem; /* limits outstanding requests */
-        
+
         struct list_head          cli_client_chain;
         char                     *cli_name;
 };
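
The header comments above record which fields c_lock covers, and client.c
below relies on the matching contract "caller must hold conn->c_lock".  One
way to make such contracts checkable at runtime, purely as an illustrative
sketch and not something this commit does, is to track the owning thread next
to the lock:

    #include <assert.h>
    #include <pthread.h>

    struct owned_lock {
            pthread_mutex_t mutex;
            pthread_t       owner;
            int             held;
    };

    static void ol_lock(struct owned_lock *l)
    {
            pthread_mutex_lock(&l->mutex);
            l->owner = pthread_self();
            l->held = 1;
    }

    static void ol_unlock(struct owned_lock *l)
    {
            l->held = 0;
            pthread_mutex_unlock(&l->mutex);
    }

    /* Call at the top of any function commented "caller must hold ...".
     * The reads are only meaningful when the caller does hold the lock,
     * which is exactly the case the assert is meant to document. */
    static void ol_assert_held(struct owned_lock *l)
    {
            assert(l->held && pthread_equal(l->owner, pthread_self()));
    }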
diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c
index cdc9488..8de4907 100644
@@ -73,10 +73,15 @@ int ll_recover(struct ptlrpc_client *cli)
                 if (req->rq_flags & PTL_RPC_FL_REPLAY) {
                         CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n",
                                req->rq_xid, conn->c_last_xid);
-                        rc = ptlrpc_replay_req(req); 
-                        if (rc) { 
-                                CERROR("recovery replay error %d for req %Ld\n", 
-                                       rc, req->rq_xid); 
+#error We should not hold a spinlock over such a lengthy operation.
+#error If necessary, drop spinlock, do operation, re-get spinlock, restart loop.
+#error If we need to avoid re-processing items, then delete them from the list
+#error as they are replayed and re-add at the tail of this list, so the next
+#error item to process will always be at the head of the list.
+                        rc = ptlrpc_replay_req(req);
+                        if (rc) {
+                                CERROR("recovery replay error %d for req %Ld\n",
+                                       rc, req->rq_xid);
                                 GOTO(out, rc);
                         }
                 }
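
The #error directives make recover.c fail to compile, so the note is an
un-ignorable TODO: the locking problem has to be dealt with before ll_recover()
can be re-enabled.  A hypothetical softer variant of the same idiom keeps the
file building until a configuration macro flips:

    /* Hypothetical variant, not in the commit: keep the file compiling
     * while recovery is disabled, but refuse to build a configuration
     * that re-enables it without fixing the locking. */
    #ifdef LL_RECOVER_ENABLED
    #error ll_recover() must not hold conn->c_lock across ptlrpc_replay_req()
    #endif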
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index cce98a1..bcdcc4a 100644
@@ -331,18 +331,19 @@ static int ptlrpc_abort(struct ptlrpc_request *request)
         return 0;
 }
 
-/* caller must lock cli */
+/* caller must hold conn->c_lock */
 void ptlrpc_free_committed(struct ptlrpc_connection *conn)
 {
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
 
+restart:
         list_for_each_safe(tmp, saved, &conn->c_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
-                if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) { 
-                        CDEBUG(D_INFO, "Retaining request %Ld for replay\n",
-                               req->rq_xid);
+                if (req->rq_flags & PTL_RPC_FL_REPLAY) {
+                        CDEBUG(D_INFO, "Keeping req %p xid %Ld for replay\n",
+                               req, req->rq_xid);
                         continue;
                 }
 
@@ -350,17 +351,23 @@ void ptlrpc_free_committed(struct ptlrpc_connection *conn)
                 if (req->rq_transno > conn->c_last_committed)
                         break;
 
-                CDEBUG(D_INFO, "Marking request xid %Ld as committed ("
-                       "transno=%Lu, last_committed=%Lu\n",
+                CDEBUG(D_INFO, "Marking request %p xid %Ld as committed "
+                       "transno=%Lu, last_committed=%Lu\n", req,
                        (long long)req->rq_xid, (long long)req->rq_transno,
                        (long long)conn->c_last_committed);
                 if (atomic_dec_and_test(&req->rq_refcount)) {
-                        /* we do this to prevent free_req deadlock */
-                        list_del_init(&req->rq_list); 
                         req->rq_client = NULL;
+
+                        /* We do this to prevent free_req deadlock.  Restarting
+                         * after each removal is not so bad, as we are almost
+                         * always deleting the first item in the list.
+                         */
+                        spin_unlock(&conn->c_lock);
                         ptlrpc_free_req(req);
+                        spin_lock(&conn->c_lock);
+                        goto restart;
                 } else {
-                        list_del_init(&req->rq_list);
+                        list_del(&req->rq_list);
                         list_add(&req->rq_list, &conn->c_dying_head);
                 }
         }
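
The goto restart is needed because list_for_each_safe() only guards against
the deletion the loop body itself performs: its cached next pointer ('saved'
above, 'n' in the macro) is unprotected once conn->c_lock is dropped, and
another thread may free or move that request in the window.  For reference,
the 2.4-era macro:

    /* 'n' caches the next pointer so that the current entry may safely
     * be deleted, but nothing protects 'n' itself once the lock guarding
     * the list is dropped. */
    #define list_for_each_safe(pos, n, head) \
            for (pos = (head)->next, n = pos->next; pos != (head); \
                 pos = n, n = pos->next)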
@@ -381,6 +388,7 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
                 return;
         }
 
+restart1:
         spin_lock(&conn->c_lock);
         list_for_each_safe(tmp, saved, &conn->c_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
@@ -389,8 +397,11 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
                 CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
                 list_del_init(&req->rq_list);
                 req->rq_client = NULL;
-                ptlrpc_free_req(req); 
+                spin_unlock(&conn->c_lock);
+                ptlrpc_free_req(req);
+                goto restart1;
         }
+restart2:
         list_for_each_safe(tmp, saved, &conn->c_dying_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 if (req->rq_client != cli)
@@ -398,7 +409,10 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
                 CERROR("Request %p is on the dying list at cleanup!\n", req);
                 list_del_init(&req->rq_list);
                 req->rq_client = NULL;
+                spin_unlock(&conn->c_lock);
                 ptlrpc_free_req(req); 
+                spin_lock(&conn->c_lock);
+                goto restart2;
         }
         spin_unlock(&conn->c_lock);
 
@@ -485,24 +499,23 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                req->rq_connection->c_level);
 
         /* XXX probably both an import and connection level are needed */
-        if (req->rq_level > req->rq_connection->c_level) { 
+        if (req->rq_level > conn->c_level) { 
                 CERROR("process %d waiting for recovery (%d > %d)\n", 
-                       current->pid, req->rq_level, req->rq_connection->c_level);
+                       current->pid, req->rq_level, conn->c_level);
 
                 spin_lock(&conn->c_lock);
-                list_del_init(&req->rq_list);
+                list_del(&req->rq_list);
                 list_add_tail(&req->rq_list, &conn->c_delayed_head);
                 spin_unlock(&conn->c_lock);
 
                 lwi = LWI_INTR(NULL, NULL);
                 rc = l_wait_event(req->rq_wait_for_rep,
-                                  req->rq_level <= req->rq_connection->c_level,
-                                  &lwi);
+                                  req->rq_level <= conn->c_level, &lwi);
 
                 spin_lock(&conn->c_lock);
                 list_del_init(&req->rq_list);
                 spin_unlock(&conn->c_lock);
-                
+
                 if (rc)
                         RETURN(rc);
 
@@ -522,7 +535,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         }
 
         spin_lock(&conn->c_lock);
-        list_del_init(&req->rq_list);
+        list_del(&req->rq_list);
         list_add_tail(&req->rq_list, &conn->c_sending_head);
         spin_unlock(&conn->c_lock);
 
@@ -567,7 +580,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         spin_lock(&conn->c_lock);
         conn->c_last_xid = req->rq_repmsg->last_xid;
         conn->c_last_committed = req->rq_repmsg->last_committed;
-        ptlrpc_free_committed(conn); 
+        ptlrpc_free_committed(conn);
         spin_unlock(&conn->c_lock);
 
         EXIT;
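
Several hunks above also replace list_del_init() with plain list_del() where
the request is immediately re-added to another list, since re-initializing the
entry's pointers is wasted work in that case.  The difference, in a simplified
userspace rendition of the kernel's list macros (written as functions here for
readability):

    #include <stddef.h>

    struct list_head {
            struct list_head *next, *prev;
    };

    static void __list_del(struct list_head *prev, struct list_head *next)
    {
            next->prev = prev;
            prev->next = next;
    }

    /* list_del(): unlink only.  The entry's own pointers are left
     * dangling, so it must be re-added (or never touched again) before
     * any further list operation on it. */
    static void list_del(struct list_head *entry)
    {
            __list_del(entry->prev, entry->next);
    }

    /* list_del_init(): unlink and point the entry back at itself, so
     * that list_empty(entry) and a second deletion remain safe. */
    static void list_del_init(struct list_head *entry)
    {
            __list_del(entry->prev, entry->next);
            entry->next = entry;
            entry->prev = entry;
    }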