cl->cli_name = name;
}
-struct obd_uuid *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
-{
- return &req->rq_connection->c_remote_uuid;
-}
-
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
struct ptlrpc_connection *c;
request->rq_request_portal = imp->imp_client->cli_request_portal;
request->rq_reply_portal = imp->imp_client->cli_reply_portal;
- request->rq_connection = ptlrpc_connection_addref(imp->imp_connection);
-
spin_lock_init(&request->rq_lock);
INIT_LIST_HEAD(&request->rq_list);
+ INIT_LIST_HEAD(&request->rq_replay_list);
init_waitqueue_head(&request->rq_reply_waitq);
request->rq_xid = ptlrpc_next_xid();
atomic_set(&request->rq_refcount, 1);
LASSERT (status != NULL);
*status = 0;
- /* A new import, or one that has been cleaned up.
- */
if (imp->imp_state == LUSTRE_IMP_NEW) {
DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
*status = -EIO;
+ LBUG();
+ }
+ else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
+ DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
+ *status = -EIO;
}
/*
* If the import has been invalidated (such as by an OST failure), the
err = req->rq_repmsg->status;
if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
- DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR");
+ DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d",
+ err);
RETURN(err < 0 ? err : -EINVAL);
}
RETURN(err);
}
-static int after_reply(struct ptlrpc_request *req, int *restartp)
+static int after_reply(struct ptlrpc_request *req)
{
unsigned long flags;
struct obd_import *imp = req->rq_import;
LASSERT(!req->rq_receiving_reply);
LASSERT(req->rq_replied);
- if (restartp != NULL)
- *restartp = 0;
-
/* NB Until this point, the whole of the incoming message,
* including buflens, status etc is in the sender's byte order. */
ptlrpc_request_handle_notconn(req);
- if (req->rq_err)
- RETURN(-EIO);
-
- if (req->rq_no_resend)
- RETURN(rc); /* -ENOTCONN */
-
- if (req->rq_resend) {
- if (restartp == NULL)
- LBUG(); /* async resend not supported yet */
- spin_lock_irqsave (&req->rq_lock, flags);
- req->rq_resend = 0;
- spin_unlock_irqrestore (&req->rq_lock, flags);
- *restartp = 1;
- lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
- DEBUG_REQ(D_HA, req, "resending: ");
- RETURN(0);
- }
-
- CERROR("request should be err or resend: %p\n", req);
- LBUG();
+ RETURN(rc);
}
if (req->rq_import->imp_replayable) {
int rc;
ENTRY;
- LASSERT(req->rq_send_state == LUSTRE_IMP_FULL);
LASSERT(req->rq_phase == RQ_PHASE_NEW);
req->rq_phase = RQ_PHASE_RPC;
}
if (req->rq_phase == RQ_PHASE_RPC) {
- int do_restart = 0;
if (req->rq_waiting || req->rq_resend) {
int status;
spin_lock_irqsave(&imp->imp_lock, flags);
if (req->rq_resend) {
lustre_msg_add_flags(req->rq_reqmsg,
MSG_RESENT);
- spin_lock_irqsave(&req->rq_lock, flags);
- req->rq_resend = 0;
- spin_unlock_irqrestore(&req->rq_lock,
- flags);
-
ptlrpc_unregister_reply(req);
if (req->rq_bulk) {
__u64 old_xid = req->rq_xid;
list_del_init(&req->rq_list);
spin_unlock_irqrestore(&imp->imp_lock, flags);
- req->rq_status = after_reply(req, &do_restart);
- if (do_restart) {
+ req->rq_status = after_reply(req);
+ if (req->rq_resend) {
+ /* Add this req to the delayed list so
+ it can be errored if the import is
+ evicted after recovery. */
spin_lock_irqsave (&req->rq_lock, flags);
- req->rq_resend = 1; /* ugh */
- spin_unlock_irqrestore (&req->rq_lock, flags);
+ list_add_tail(&req->rq_list,
+ &imp->imp_delayed_list);
+ spin_unlock_irqrestore(&req->rq_lock, flags);
continue;
}
if (req->rq_bulk != NULL)
ptlrpc_unregister_bulk (req);
+ req->rq_phase = RQ_PHASE_COMPLETE;
+
if (req->rq_interpret_reply != NULL) {
int (*interpreter)(struct ptlrpc_request *,void *,int) =
req->rq_interpret_reply;
imp->imp_connection->c_peer.peer_nid,
req->rq_reqmsg->opc);
- req->rq_phase = RQ_PHASE_COMPLETE;
set->set_remaining--;
}
RETURN(1);
/* If this request is for recovery or other primordial tasks,
- * don't go back to sleep, and don't start recovery again.. */
- if (req->rq_send_state != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov)
+ * then error it out here. */
+ if (req->rq_send_state != LUSTRE_IMP_FULL ||
+ imp->imp_obd->obd_no_recov) {
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_status = -ETIMEDOUT;
+ req->rq_err = 1;
+ spin_unlock_irqrestore (&req->rq_lock, flags);
RETURN(1);
+ }
ptlrpc_fail_import(imp, req->rq_import_generation);
list_entry(tmp, struct ptlrpc_request, rq_set_chain);
/* request in-flight? */
- if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
+ if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting
+ && !req->rq_resend) ||
(req->rq_phase == RQ_PHASE_BULK)))
continue;
unsigned long flags = 0;
if (!locked)
spin_lock_irqsave(&request->rq_import->imp_lock, flags);
- list_del_init(&request->rq_list);
+ list_del_init(&request->rq_replay_list);
if (!locked)
spin_unlock_irqrestore(&request->rq_import->imp_lock,
flags);
if (request->rq_bulk != NULL)
ptlrpc_free_bulk(request->rq_bulk);
- ptlrpc_put_connection(request->rq_connection);
OBD_FREE(request, sizeof(*request));
EXIT;
}
__ptlrpc_req_finished(request, 0);
}
-static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
-{
- OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
- request->rq_reqmsg = NULL;
- request->rq_reqlen = 0;
-}
-
/* Disengage the client's reply buffer from the network
* NB does _NOT_ unregister any client-side bulk.
* IDEMPOTENT, but _not_ safe against concurrent callers.
imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
/* XXX ok to remove when 1357 resolved - rread 05/29/03 */
LASSERT(req != last_req);
free_req:
if (req->rq_commit_cb != NULL)
req->rq_commit_cb(req);
- list_del_init(&req->rq_list);
+ list_del_init(&req->rq_replay_list);
__ptlrpc_req_finished(req, 1);
}
{
unsigned long flags;
- DEBUG_REQ(D_HA, req, "resending");
+ DEBUG_REQ(D_HA, req, "going to resend");
req->rq_reqmsg->handle.cookie = 0;
- ptlrpc_put_connection(req->rq_connection);
- req->rq_connection =
- ptlrpc_connection_addref(req->rq_import->imp_connection);
req->rq_status = -EAGAIN;
spin_lock_irqsave (&req->rq_lock, flags);
LASSERT(spin_is_locked(&imp->imp_lock));
#endif
+ /* don't re-add requests that have been replayed */
+ if (!list_empty(&req->rq_replay_list))
+ return;
+
LASSERT(imp->imp_replayable);
/* Balanced in ptlrpc_free_committed, usually. */
ptlrpc_request_addref(req);
list_for_each_prev(tmp, &imp->imp_replay_list) {
struct ptlrpc_request *iter =
- list_entry(tmp, struct ptlrpc_request, rq_list);
+ list_entry(tmp, struct ptlrpc_request, rq_replay_list);
/* We may have duplicate transnos if we create and then
* open a file, or for closes retained if to match creating
continue;
}
- list_add(&req->rq_list, &iter->rq_list);
+ list_add(&req->rq_replay_list, &iter->rq_replay_list);
return;
}
- list_add_tail(&req->rq_list, &imp->imp_replay_list);
+ list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
}
int ptlrpc_queue_wait(struct ptlrpc_request *req)
struct l_wait_info lwi;
struct obd_import *imp = req->rq_import;
unsigned long flags;
- int do_restart = 0;
int timeout = 0;
ENTRY;
list_add_tail(&req->rq_list, &imp->imp_delayed_list);
spin_unlock_irqrestore(&imp->imp_lock, flags);
- DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
- current->comm, req->rq_send_state, imp->imp_state);
+ DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
+ current->comm,
+ ptlrpc_import_state_name(req->rq_send_state),
+ ptlrpc_import_state_name(imp->imp_state));
lwi = LWI_INTR(interrupted_request, req);
rc = l_wait_event(req->rq_reply_waitq,
(req->rq_send_state == imp->imp_state ||
req->rq_err),
&lwi);
- DEBUG_REQ(D_HA, req, "\"%s\" awake: (%d > %d or %d == 1)",
- current->comm, imp->imp_state, req->rq_send_state,
+ DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
+ current->comm,
+ ptlrpc_import_state_name(imp->imp_state),
+ ptlrpc_import_state_name(req->rq_send_state),
req->rq_err);
spin_lock_irqsave(&imp->imp_lock, flags);
GOTO(out, rc);
}
+ if (req->rq_resend) {
+ lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
+
+ if (req->rq_bulk != NULL)
+ ptlrpc_unregister_bulk (req);
+
+ DEBUG_REQ(D_HA, req, "resending: ");
+ }
+
/* XXX this is the same as ptlrpc_set_wait */
LASSERT(list_empty(&req->rq_list));
list_add_tail(&req->rq_list, &imp->imp_sending_list);
/* ...unless we were specifically told otherwise. */
if (req->rq_no_resend)
GOTO(out, rc = -ETIMEDOUT);
- spin_lock_irqsave (&req->rq_lock, flags);
- req->rq_resend = 0;
- spin_unlock_irqrestore (&req->rq_lock, flags);
- lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
-
- if (req->rq_bulk != NULL)
- ptlrpc_unregister_bulk (req);
-
- DEBUG_REQ(D_HA, req, "resending: ");
spin_lock_irqsave(&imp->imp_lock, flags);
goto restart;
}
GOTO(out, rc = req->rq_status);
}
- rc = after_reply (req, &do_restart);
+ rc = after_reply (req);
/* NB may return +ve success rc */
- if (do_restart) {
- if (req->rq_bulk != NULL)
- ptlrpc_unregister_bulk (req);
- DEBUG_REQ(D_HA, req, "resending: ");
+ if (req->rq_resend) {
spin_lock_irqsave(&imp->imp_lock, flags);
goto restart;
}
RETURN(rc);
}
-int ptlrpc_replay_req(struct ptlrpc_request *req)
-{
- int rc = 0, old_state, old_status = 0;
- // struct ptlrpc_client *cli = req->rq_import->imp_client;
- struct l_wait_info lwi;
- ENTRY;
-
- LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
-
- /* I don't touch rq_phase here, so the debug log can show what
- * state it was left in */
-
- /* Not handling automatic bulk replay yet (or ever?) */
- LASSERT(req->rq_bulk == NULL);
-
- DEBUG_REQ(D_NET, req, "about to replay");
-
- /* Update request's state, since we might have a new connection. */
- ptlrpc_put_connection(req->rq_connection);
- req->rq_connection =
- ptlrpc_connection_addref(req->rq_import->imp_connection);
-
- /* temporarily set request to REPLAY level---not strictly
- * necessary since ptl_send_rpc doesn't check state, but let's
- * be consistent.*/
- old_state = req->rq_send_state;
-
- /*
- * Q: "How can a req get on the replay list if it wasn't replied?"
- * A: "If we failed during the replay of this request, it will still
- * be on the list, but rq_replied will have been reset to 0."
- */
- if (req->rq_replied)
- old_status = req->rq_repmsg->status;
- req->rq_send_state = LUSTRE_IMP_REPLAY;
- rc = ptl_send_rpc(req);
- if (rc) {
- CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
- ptlrpc_cleanup_request_buf(req);
- // up(&cli->cli_rpc_sem);
- GOTO(out, rc = -rc);
- }
-
- CDEBUG(D_OTHER, "-- sleeping\n");
- lwi = LWI_INTR(NULL, NULL); /* XXX needs timeout, nested recovery */
- l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
- CDEBUG(D_OTHER, "-- done\n");
-
- // up(&cli->cli_rpc_sem);
+struct ptlrpc_replay_async_args {
+ int praa_old_state;
+ int praa_old_status;
+};
- /* If the reply was received normally, this just grabs the spinlock
- * (ensuring the reply callback has returned), sees that
- * req->rq_receiving_reply is clear and returns. */
- ptlrpc_unregister_reply (req);
+static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
+ void * data, int rc)
+{
+ struct ptlrpc_replay_async_args *aa = data;
+ struct obd_import *imp = req->rq_import;
+ unsigned long flags;
- if (!req->rq_replied) {
- CERROR("Unknown reason for wakeup\n");
- /* XXX Phil - I end up here when I kill obdctl */
- /* ...that's because signals aren't all masked in
- * l_wait_event() -eeb */
- GOTO(out, rc = -EINTR);
- }
+ atomic_dec(&imp->imp_replay_inflight);
#if SWAB_PARANOIA
/* Clear reply swab mask; this is a new reply in sender's byte order */
CERROR("unpack_rep failed: %d\n", rc);
GOTO(out, rc = -EPROTO);
}
-#if 0
- /* FIXME: Enable when BlueArc makes new release */
- if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
- req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
- CERROR("invalid packet type received (type=%u)\n",
- req->rq_repmsg->type);
- GOTO(out, rc = -EPROTO);
- }
-#endif
if (req->rq_repmsg->type == PTL_RPC_MSG_ERR &&
req->rq_repmsg->status == -ENOTCONN)
/* The transno had better not change over replay. */
LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
- CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid);
+ DEBUG_REQ(D_HA, req, "got rep");
/* let the callback do fixups, possibly including in the request */
if (req->rq_replay_cb)
req->rq_replay_cb(req);
- if (req->rq_replied && req->rq_repmsg->status != old_status) {
+ if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
- req->rq_repmsg->status, old_status);
+ req->rq_repmsg->status, aa->praa_old_status);
} else {
/* Put it back for re-replay. */
- req->rq_status = old_status;
+ req->rq_repmsg->status = aa->praa_old_status;
}
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ imp->imp_last_replay_transno = req->rq_transno;
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+ /* continue with recovery */
+ rc = ptlrpc_import_recovery_state_machine(imp);
out:
- req->rq_send_state = old_state;
+ req->rq_send_state = aa->praa_old_state;
+
+ if (rc != 0)
+ /* this replay failed, so restart recovery */
+ ptlrpc_connect_import(imp, NULL);
+
RETURN(rc);
}
+
+int ptlrpc_replay_req(struct ptlrpc_request *req)
+{
+ struct ptlrpc_replay_async_args *aa;
+ ENTRY;
+
+ LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
+
+ /* Not handling automatic bulk replay yet (or ever?) */
+ LASSERT(req->rq_bulk == NULL);
+
+ DEBUG_REQ(D_HA, req, "REPLAY");
+
+ LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
+ aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
+ memset(aa, 0, sizeof *aa);
+
+ /* Prepare request to be resent with ptlrpcd */
+ aa->praa_old_state = req->rq_send_state;
+ req->rq_send_state = LUSTRE_IMP_REPLAY;
+ req->rq_phase = RQ_PHASE_NEW;
+ /*
+ * Q: "How can a req get on the replay list if it wasn't replied?"
+ * A: "If we failed during the replay of this request, it will still
+ * be on the list, but rq_replied will have been reset to 0."
+ */
+ if (req->rq_replied) {
+ aa->praa_old_status = req->rq_repmsg->status;
+ req->rq_status = 0;
+ req->rq_replied = 0;
+ }
+
+ req->rq_interpret_reply = ptlrpc_replay_interpret;
+ atomic_inc(&req->rq_import->imp_replay_inflight);
+ ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
+
+ ptlrpcd_add_req(req);
+ RETURN(0);
+}
+
void ptlrpc_abort_inflight(struct obd_import *imp)
{
unsigned long flags;