From: adilger
Date: Thu, 29 Aug 2002 07:11:32 +0000 (+0000)
Subject: Fix up the spinlock situation around rq->rq_list and the lists it lives on.
X-Git-Tag: 0.5.5~15
X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=0ba38e94c4bf2f4e74bf6a287edf97a2616328ca;p=fs%2Flustre-release.git

Fix up the spinlock situation around rq->rq_list and the lists it lives on.
We avoid a deadlock in ptlrpc_free_committed() when it calls
ptlrpc_free_req(), and likewise when ptlrpc_cleanup_client() calls
ptlrpc_free_req().

A note for Mike in ll_recover(), for when it is re-enabled: we should not
hold a spinlock over such a lengthy operation.  If necessary, drop the
spinlock, do the operation, re-get the spinlock, and restart the loop.  If
we need to avoid re-processing items, delete them from the list as they are
replayed and re-add them at the tail of the list, so the next item to
process is always at the head of the list.
---
diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h
index 5cdddee..e77f0aa 100644
--- a/lustre/include/linux/lustre_net.h
+++ b/lustre/include/linux/lustre_net.h
@@ -37,14 +37,14 @@ struct ptlrpc_connection {
         struct list_head      c_link;
         struct lustre_peer    c_peer;
         __u8                  c_local_uuid[37]; /* XXX do we need this? */
-        __u8                  c_remote_uuid[37];
+        __u8                  c_remote_uuid[37];
 
         int                   c_level;
         __u32                 c_generation;  /* changes upon new connection */
         __u32                 c_epoch;       /* changes when peer changes */
-        __u32                 c_bootcount;   /* peer's boot count */
+        __u32                 c_bootcount;   /* peer's boot count */
 
-        spinlock_t            c_lock;
+        spinlock_t            c_lock;        /* also protects req->rq_list */
 
         __u32                 c_xid_in;
         __u32                 c_xid_out;
@@ -53,11 +53,11 @@ struct ptlrpc_connection {
         __u64                 c_remote_conn;
         __u64                 c_remote_token;
 
-        __u64                 c_last_xid;
-        __u64                 c_last_committed;
-        struct list_head      c_delayed_head; /* delayed until post-recovery */
-        struct list_head      c_sending_head;
-        struct list_head      c_dying_head;
+        __u64                 c_last_xid;      /* protected by c_lock */
+        __u64                 c_last_committed;/* protected by c_lock */
+        struct list_head      c_delayed_head;/* delayed until post-recovery */
+        struct list_head      c_sending_head;/* protected by c_lock */
+        struct list_head      c_dying_head;  /* protected by c_lock */
         struct recovd_data    c_recovd_data;
 
         struct list_head      c_clients; /* XXXshaver will be c_imports */
@@ -75,10 +75,10 @@ struct ptlrpc_client {
         __u32                     cli_target_devno;
 
         struct ptlrpc_connection *cli_connection;
-        
+
         void                     *cli_data;
         struct semaphore          cli_rpc_sem; /* limits outstanding requests */
-        
+
         struct list_head          cli_client_chain;
         char                     *cli_name;
 };
diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c
index cdc9488..8de4907 100644
--- a/lustre/llite/recover.c
+++ b/lustre/llite/recover.c
@@ -73,10 +73,15 @@ int ll_recover(struct ptlrpc_client *cli)
         if (req->rq_flags & PTL_RPC_FL_REPLAY) {
                 CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n",
                        req->rq_xid, conn->c_last_xid);
-                rc = ptlrpc_replay_req(req);
-                if (rc) {
-                        CERROR("recovery replay error %d for req %Ld\n",
-                               rc, req->rq_xid);
+#error We should not hold a spinlock over such a lengthy operation.
+#error If necessary, drop spinlock, do operation, re-get spinlock, restart loop.
+#error If we need to avoid re-processing items, then delete them from the list
+#error as they are replayed and re-add at the tail of this list, so the next
+#error item to process will always be at the head of the list.
+                rc = ptlrpc_replay_req(req);
+                if (rc) {
+                        CERROR("recovery replay error %d for req %Ld\n",
+                               rc, req->rq_xid);
                         GOTO(out, rc);
                 }
         }
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index cce98a1..bcdcc4a 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -331,18 +331,19 @@ static int ptlrpc_abort(struct ptlrpc_request *request)
         return 0;
 }
 
-/* caller must lock cli */
+/* caller must hold conn->c_lock */
 void ptlrpc_free_committed(struct ptlrpc_connection *conn)
 {
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
 
+restart:
         list_for_each_safe(tmp, saved, &conn->c_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
-                if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) {
-                        CDEBUG(D_INFO, "Retaining request %Ld for replay\n",
-                               req->rq_xid);
+                if (req->rq_flags & PTL_RPC_FL_REPLAY) {
+                        CDEBUG(D_INFO, "Keeping req %p xid %Ld for replay\n",
+                               req, req->rq_xid);
                         continue;
                 }
 
@@ -350,17 +351,23 @@ void ptlrpc_free_committed(struct ptlrpc_connection *conn)
                 if (req->rq_transno > conn->c_last_committed)
                         break;
 
-                CDEBUG(D_INFO, "Marking request xid %Ld as committed ("
-                       "transno=%Lu, last_committed=%Lu\n",
+                CDEBUG(D_INFO, "Marking request %p xid %Ld as committed "
+                       "transno=%Lu, last_committed=%Lu\n", req,
                        (long long)req->rq_xid, (long long)req->rq_transno,
                        (long long)conn->c_last_committed);
                 if (atomic_dec_and_test(&req->rq_refcount)) {
-                        /* we do this to prevent free_req deadlock */
-                        list_del_init(&req->rq_list);
                         req->rq_client = NULL;
+
+                        /* We do this to prevent free_req deadlock. Restarting
+                         * after each removal is not so bad, as we are almost
+                         * always deleting the first item in the list.
+                         */
+                        spin_unlock(&conn->c_lock);
                         ptlrpc_free_req(req);
+                        spin_lock(&conn->c_lock);
+                        goto restart;
                 } else {
-                        list_del_init(&req->rq_list);
+                        list_del(&req->rq_list);
                         list_add(&req->rq_list, &conn->c_dying_head);
                 }
         }
@@ -381,6 +388,7 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
                 return;
         }
 
+restart1:
         spin_lock(&conn->c_lock);
         list_for_each_safe(tmp, saved, &conn->c_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
@@ -389,8 +397,11 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
                 CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
                 list_del_init(&req->rq_list);
                 req->rq_client = NULL;
-                ptlrpc_free_req(req);
+                spin_unlock(&conn->c_lock);
+                ptlrpc_free_req(req);
+                goto restart1;
         }
+restart2:
         list_for_each_safe(tmp, saved, &conn->c_dying_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 if (req->rq_client != cli)
@@ -398,7 +409,10 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
                 CERROR("Request %p is on the dying list at cleanup!\n", req);
                 list_del_init(&req->rq_list);
                 req->rq_client = NULL;
+                spin_unlock(&conn->c_lock);
                 ptlrpc_free_req(req);
+                spin_lock(&conn->c_lock);
+                goto restart2;
         }
 
         spin_unlock(&conn->c_lock);
@@ -485,24 +499,23 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                req->rq_connection->c_level);
 
         /* XXX probably both an import and connection level are needed */
-        if (req->rq_level > req->rq_connection->c_level) {
+        if (req->rq_level > conn->c_level) {
                 CERROR("process %d waiting for recovery (%d > %d)\n",
-                       current->pid, req->rq_level, req->rq_connection->c_level);
+                       current->pid, req->rq_level, conn->c_level);
 
                 spin_lock(&conn->c_lock);
-                list_del_init(&req->rq_list);
+                list_del(&req->rq_list);
                 list_add_tail(&req->rq_list, &conn->c_delayed_head);
                 spin_unlock(&conn->c_lock);
 
                 lwi = LWI_INTR(NULL, NULL);
                 rc = l_wait_event(req->rq_wait_for_rep,
-                                  req->rq_level <= req->rq_connection->c_level,
-                                  &lwi);
+                                  req->rq_level <= conn->c_level, &lwi);
 
                 spin_lock(&conn->c_lock);
                 list_del_init(&req->rq_list);
                 spin_unlock(&conn->c_lock);
-        
+
                 if (rc)
                         RETURN(rc);
 
@@ -522,7 +535,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         }
 
         spin_lock(&conn->c_lock);
-        list_del_init(&req->rq_list);
+        list_del(&req->rq_list);
         list_add_tail(&req->rq_list, &conn->c_sending_head);
         spin_unlock(&conn->c_lock);
 
@@ -567,7 +580,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         spin_lock(&conn->c_lock);
         conn->c_last_xid = req->rq_repmsg->last_xid;
         conn->c_last_committed = req->rq_repmsg->last_committed;
-        ptlrpc_free_committed(conn);
+        ptlrpc_free_committed(conn);
         spin_unlock(&conn->c_lock);
 
         EXIT;
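
For reference, a minimal sketch of how the locking note for ll_recover() above
might look once that code is re-enabled.  This is not part of the patch: the
helper name ll_replay_sending_list() is invented, and it assumes the replay
loop walks conn->c_sending_head and that parking each replayed request on a
local list (and splicing it back at the tail afterwards) is an acceptable way
to avoid re-processing after each restart.

/* Illustrative sketch only -- not part of this change.  Drop c_lock around
 * the lengthy ptlrpc_replay_req() call, and park each replayed request on a
 * local list so that restarting from the head of c_sending_head never
 * processes the same request twice.
 */
static int ll_replay_sending_list(struct ptlrpc_connection *conn)
{
        struct list_head *tmp, *pos;
        struct ptlrpc_request *req;
        LIST_HEAD(replayed);
        int rc = 0;

        spin_lock(&conn->c_lock);
restart:
        list_for_each_safe(tmp, pos, &conn->c_sending_head) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);

                if (!(req->rq_flags & PTL_RPC_FL_REPLAY))
                        continue;

                /* Move the request off c_sending_head before dropping the
                 * lock, so the restart below cannot find it again. */
                list_del(&req->rq_list);
                list_add_tail(&req->rq_list, &replayed);

                spin_unlock(&conn->c_lock);
                rc = ptlrpc_replay_req(req);
                spin_lock(&conn->c_lock);

                if (rc) {
                        CERROR("recovery replay error %d for req %Ld\n",
                               rc, req->rq_xid);
                        break;
                }
                goto restart;
        }
        /* Re-add the replayed requests at the tail of the sending list. */
        list_splice(&replayed, conn->c_sending_head.prev);
        spin_unlock(&conn->c_lock);

        return rc;
}

Because every replayed request leaves c_sending_head before the lock is
dropped, the restart simply rescans from the head and finds only unprocessed
work; splicing the local list back at the tail gives the "re-add at the tail"
ordering described in the note.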