if (atomic_dec_and_test(&request->rq_refcount))
ptlrpc_free_req(request);
+ else
+ DEBUG_REQ(D_INFO, request, "refcount now %u",
+ atomic_read(&request->rq_refcount));
}
void ptlrpc_free_req(struct ptlrpc_request *request)
req = list_entry(tmp, struct ptlrpc_request, rq_list);
if (req->rq_flags & PTL_RPC_FL_REPLAY) {
- CDEBUG(D_INFO, "Keeping req %p xid "LPD64" for replay\n",
- req, req->rq_xid);
+ DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
continue;
}
if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
- CDEBUG(D_INFO, "Keeping in-flight req %p xid "LPD64
- " for replay\n", req, req->rq_xid);
+ DEBUG_REQ(D_HA, req, "keeping (in-flight)");
continue;
}
if (req->rq_transno > conn->c_last_committed)
break;
- CDEBUG(D_INFO, "Marking request %p xid %Ld as committed "
- "transno=%Lu, last_committed=%Lu\n", req,
- (long long)req->rq_xid, (long long)req->rq_transno,
- (long long)conn->c_last_committed);
+ DEBUG_REQ(D_HA, req, "committing (last_committed %Lu)",
+ (long long)conn->c_last_committed);
if (atomic_dec_and_test(&req->rq_refcount)) {
/* We do this to prevent free_req deadlock.
* Restarting after each removal is not so bad, as we are
req->rq_replen, req->rq_repmsg->status);
spin_lock(&conn->c_lock);
- conn->c_last_xid = req->rq_repmsg->last_xid;
- conn->c_last_committed = req->rq_repmsg->last_committed;
- ptlrpc_free_committed(conn);
+
+ /* Requests that aren't from replayable imports, or which don't have
+ * transno information, can be "committed" early.
+ */
+
+ if ((req->rq_import->imp_flags & IMP_REPLAYABLE) == 0 ||
+ req->rq_repmsg->transno == 0) {
+ /* This import doesn't support replay, so we can just "commit"
+ * this request now.
+ */
+ DEBUG_REQ(D_HA, req, "not replayable, committing:");
+ list_del_init(&req->rq_list);
+ spin_unlock(&conn->c_lock);
+ ptlrpc_req_finished(req); /* Must be called unlocked. */
+ spin_lock(&conn->c_lock);
+ }
+
+ /* Replay-enabled imports return commit-status information. */
+ if (req->rq_import->imp_flags & IMP_REPLAYABLE) {
+ /* XXX this needs to be per-import, or multiple MDS services on
+ * XXX the same system are going to interfere messily with each
+ * XXX others' transno spaces.
+ */
+ conn->c_last_xid = req->rq_repmsg->last_xid;
+ conn->c_last_committed = req->rq_repmsg->last_committed;
+ ptlrpc_free_committed(conn);
+ }
+
spin_unlock(&conn->c_lock);
EXIT;
int ptlrpc_replay_req(struct ptlrpc_request *req)
{
- int rc = 0, old_level, old_status;
+ int rc = 0, old_level, old_status = 0;
// struct ptlrpc_client *cli = req->rq_import->imp_client;
struct l_wait_info lwi;
ENTRY;
/* temporarily set request to RECOVD level (reset at out:) */
old_level = req->rq_level;
- old_status = req->rq_repmsg->status;
+ if (req->rq_flags & PTL_RPC_FL_REPLIED)
+ old_status = req->rq_repmsg->status;
req->rq_level = LUSTRE_CONN_RECOVD;
rc = ptl_send_rpc(req);
if (rc) {
if (req->rq_replay_cb)
req->rq_replay_cb(req);
- if (req->rq_repmsg->status != old_status) {
+ if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
+ req->rq_repmsg->status != old_status) {
DEBUG_REQ(D_HA, req, "status %d, old was %d",
req->rq_repmsg->status, old_status);
}