struct list_head c_link;
struct lustre_peer c_peer;
__u8 c_local_uuid[37]; /* XXX do we need this? */
- __u8 c_remote_uuid[37];
+ __u8 c_remote_uuid[37];
int c_level;
__u32 c_generation; /* changes upon new connection */
__u32 c_epoch; /* changes when peer changes */
- __u32 c_bootcount; /* peer's boot count */
+ __u32 c_bootcount; /* peer's boot count */
- spinlock_t c_lock;
+ spinlock_t c_lock; /* also protects req->rq_list */
__u32 c_xid_in;
__u32 c_xid_out;
__u64 c_remote_conn;
__u64 c_remote_token;
- __u64 c_last_xid;
- __u64 c_last_committed;
- struct list_head c_delayed_head; /* delayed until post-recovery */
- struct list_head c_sending_head;
- struct list_head c_dying_head;
+ __u64 c_last_xid; /* protected by c_lock */
+ __u64 c_last_committed;/* protected by c_lock */
+ struct list_head c_delayed_head;/* delayed until post-recovery */
+ struct list_head c_sending_head;/* protected by c_lock */
+ struct list_head c_dying_head; /* protected by c_lock */
struct recovd_data c_recovd_data;
struct list_head c_clients; /* XXXshaver will be c_imports */
__u32 cli_target_devno;
struct ptlrpc_connection *cli_connection;
-
+
void *cli_data;
struct semaphore cli_rpc_sem; /* limits outstanding requests */
-
+
struct list_head cli_client_chain;
char *cli_name;
};
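
The comments added to this struct pin down the locking discipline: c_lock now covers the transaction counters (c_last_xid, c_last_committed) as well as the sending and dying request lists. As a minimal sketch of that discipline (the helper name is hypothetical, not part of the patch), even a plain read of a protected field should bracket the access with the lock:

/* Hypothetical helper, for illustration only: c_last_committed is
 * documented above as protected by c_lock, so take the spinlock even
 * for a simple read. */
static inline __u64 conn_last_committed(struct ptlrpc_connection *conn)
{
        __u64 last;

        spin_lock(&conn->c_lock);
        last = conn->c_last_committed;
        spin_unlock(&conn->c_lock);
        return last;
}
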
if (req->rq_flags & PTL_RPC_FL_REPLAY) {
CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n",
req->rq_xid, conn->c_last_xid);
- rc = ptlrpc_replay_req(req);
- if (rc) {
- CERROR("recovery replay error %d for req %Ld\n",
- rc, req->rq_xid);
+#error We should not hold a spinlock over such a lengthy operation.
+#error If necessary, drop spinlock, do operation, re-get spinlock, restart loop.
+#error If we need to avoid re-processing items, then delete them from the list
+#error as they are replayed and re-add them at the tail of this list, so the next
+#error item to process will always be at the head of the list.
+ rc = ptlrpc_replay_req(req);
+ if (rc) {
+ CERROR("recovery replay error %d for req %Ld\n",
+ rc, req->rq_xid);
GOTO(out, rc);
}
}
return 0;
}
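
The #error annotations above flag that ptlrpc_replay_req() is called with c_lock held, and sketch the expected shape of the fix: drop the spinlock around the lengthy operation, re-take it, and restart the scan. The skeleton below illustrates that pattern using the surrounding code's names; it is not the committed fix. It parks each visited request on a private list so the scan terminates and each request is replayed at most once:

/* Illustration only: scan c_sending_head, dropping c_lock around each
 * replay as the #error notes suggest.  Visited requests move to a
 * private list and are spliced back at the end, preserving order. */
static int replay_sending_list(struct ptlrpc_connection *conn)
{
        struct list_head done;
        struct ptlrpc_request *req;
        int rc = 0;

        INIT_LIST_HEAD(&done);
        spin_lock(&conn->c_lock);
        while (!list_empty(&conn->c_sending_head)) {
                req = list_entry(conn->c_sending_head.next,
                                 struct ptlrpc_request, rq_list);
                list_del(&req->rq_list);
                list_add_tail(&req->rq_list, &done);
                if (!(req->rq_flags & PTL_RPC_FL_REPLAY))
                        continue;
                spin_unlock(&conn->c_lock);     /* replay may sleep */
                rc = ptlrpc_replay_req(req);
                spin_lock(&conn->c_lock);
                if (rc)
                        break;
        }
        list_splice(&done, &conn->c_sending_head);
        spin_unlock(&conn->c_lock);
        return rc;
}
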
-/* caller must lock cli */
+/* caller must hold conn->c_lock */
void ptlrpc_free_committed(struct ptlrpc_connection *conn)
{
struct list_head *tmp, *saved;
struct ptlrpc_request *req;
+restart:
list_for_each_safe(tmp, saved, &conn->c_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) {
- CDEBUG(D_INFO, "Retaining request %Ld for replay\n",
- req->rq_xid);
+ if (req->rq_flags & PTL_RPC_FL_REPLAY) {
+ CDEBUG(D_INFO, "Keeping req %p xid %Ld for replay\n",
+ req, req->rq_xid);
continue;
}
if (req->rq_transno > conn->c_last_committed)
break;
- CDEBUG(D_INFO, "Marking request xid %Ld as committed ("
- "transno=%Lu, last_committed=%Lu\n",
+ CDEBUG(D_INFO, "Marking request %p xid %Ld as committed "
+ "transno=%Lu, last_committed=%Lu\n", req,
(long long)req->rq_xid, (long long)req->rq_transno,
(long long)conn->c_last_committed);
if (atomic_dec_and_test(&req->rq_refcount)) {
- /* we do this to prevent free_req deadlock */
- list_del_init(&req->rq_list);
req->rq_client = NULL;
+
+ /* We do this to prevent free_req deadlock. Restarting
+ * after each removal is not so bad, as we are almost
+ * always deleting the first item in the list.
+ */
+ spin_unlock(&conn->c_lock);
ptlrpc_free_req(req);
+ spin_lock(&conn->c_lock);
+ goto restart;
} else {
- list_del_init(&req->rq_list);
+ list_del(&req->rq_list);
list_add(&req->rq_list, &conn->c_dying_head);
}
}
return;
}
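
Note the calling convention this creates: per the updated comment the caller must hold conn->c_lock, but ptlrpc_free_committed() may drop and re-take the lock internally while freeing a request, so the caller cannot assume the lock was held continuously across the call. A usage sketch, mirroring the reply-handling hunk at the end of this patch:

/* Usage sketch: update the committed bookkeeping under c_lock, then
 * let ptlrpc_free_committed() reap requests.  It may release c_lock
 * internally, so nothing else may rely on it being held throughout. */
spin_lock(&conn->c_lock);
conn->c_last_xid = req->rq_repmsg->last_xid;
conn->c_last_committed = req->rq_repmsg->last_committed;
ptlrpc_free_committed(conn);
spin_unlock(&conn->c_lock);
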
+restart1:
spin_lock(&conn->c_lock);
list_for_each_safe(tmp, saved, &conn->c_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
list_del_init(&req->rq_list);
req->rq_client = NULL;
- ptlrpc_free_req(req);
+ spin_unlock(&conn->c_lock);
+ ptlrpc_free_req(req);
+ goto restart1;
}
+restart2:
list_for_each_safe(tmp, saved, &conn->c_dying_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
if (req->rq_client != cli)
CERROR("Request %p is on the dying list at cleanup!\n", req);
list_del_init(&req->rq_list);
req->rq_client = NULL;
+ spin_unlock(&conn->c_lock);
ptlrpc_free_req(req);
+ spin_lock(&conn->c_lock);
+ goto restart2;
}
spin_unlock(&conn->c_lock);
req->rq_connection->c_level);
/* XXX probably both an import and connection level are needed */
- if (req->rq_level > req->rq_connection->c_level) {
+ if (req->rq_level > conn->c_level) {
CERROR("process %d waiting for recovery (%d > %d)\n",
- current->pid, req->rq_level, req->rq_connection->c_level);
+ current->pid, req->rq_level, conn->c_level);
spin_lock(&conn->c_lock);
- list_del_init(&req->rq_list);
+ list_del(&req->rq_list);
list_add_tail(&req->rq_list, &conn->c_delayed_head);
spin_unlock(&conn->c_lock);
lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(req->rq_wait_for_rep,
- req->rq_level <= req->rq_connection->c_level,
- &lwi);
+ req->rq_level <= conn->c_level, &lwi);
spin_lock(&conn->c_lock);
list_del_init(&req->rq_list);
spin_unlock(&conn->c_lock);
-
+
if (rc)
RETURN(rc);
}
spin_lock(&conn->c_lock);
- list_del_init(&req->rq_list);
+ list_del(&req->rq_list);
list_add_tail(&req->rq_list, &conn->c_sending_head);
spin_unlock(&conn->c_lock);
spin_lock(&conn->c_lock);
conn->c_last_xid = req->rq_repmsg->last_xid;
conn->c_last_committed = req->rq_repmsg->last_committed;
- ptlrpc_free_committed(conn);
+ ptlrpc_free_committed(conn);
spin_unlock(&conn->c_lock);
EXIT;