*/
#define DEBUG_SUBSYSTEM S_RPC
+#ifndef __KERNEL__
+#include <errno.h>
+#include <signal.h>
+#include <liblustre.h>
+#endif
#include <linux/obd_support.h>
#include <linux/obd_class.h>
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
struct ptlrpc_connection *c;
- struct lustre_peer peer;
+ struct ptlrpc_peer peer;
int err;
- err = kportal_uuid_to_peer(uuid->uuid, &peer);
+ err = ptlrpc_uuid_to_peer(uuid, &peer);
if (err != 0) {
CERROR("cannot find peer %s!\n", uuid->uuid);
return NULL;
void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,struct obd_uuid *uuid)
{
- struct lustre_peer peer;
+ struct ptlrpc_peer peer;
int err;
- err = kportal_uuid_to_peer(uuid->uuid, &peer);
+ err = ptlrpc_uuid_to_peer (uuid, &peer);
if (err != 0) {
CERROR("cannot find peer %s!\n", uuid->uuid);
return;
}
- memcpy(&conn->c_peer, &peer, sizeof(peer));
+ memcpy (&conn->c_peer, &peer, sizeof (peer));
return;
}
LASSERT(desc->bd_connection);
- /* If PtlMDUnlink succeeds, then it hasn't completed yet. If it
- * fails, the bulk finished _just_ in time (after the timeout
- * fired but before we got this far) and we'll let it live.
+ /* If PtlMDUnlink succeeds, then bulk I/O on the MD hasn't
+ * even started yet. XXX where do we kunmup the thing?
+ *
+ * If it fail with PTL_MD_BUSY, then the network is still
+ * reading/writing the buffers and we must wait for it to
+ * complete (which it will within finite time, most
+ * probably with failure; we really need portals error
+ * events to detect that).
+ *
+ * Otherwise (PTL_INV_MD) it completed after the bd_flags
+ * test above!
*/
- if (PtlMDUnlink(desc->bd_md_h) != 0) {
+ if (PtlMDUnlink(desc->bd_md_h) != PTL_OK) {
CERROR("Near-miss on OST %s -- need to adjust "
"obd_timeout?\n",
desc->bd_connection->c_remote_uuid.uuid);
return;
}
+ /* We must take it off the imp_replay_list first. Otherwise, we'll set
+ * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
+ if (request->rq_import) {
+ unsigned long flags = 0;
+ if (!locked)
+ spin_lock_irqsave(&request->rq_import->imp_lock, flags);
+ list_del_init(&request->rq_list);
+ if (!locked)
+ spin_unlock_irqrestore(&request->rq_import->imp_lock,
+ flags);
+ }
+
if (atomic_read(&request->rq_refcount) != 0) {
CERROR("freeing request %p (%d->%s:%d) with refcount %d\n",
request, request->rq_reqmsg->opc,
request->rq_connection->c_remote_uuid.uuid,
request->rq_import->imp_client->cli_request_portal,
atomic_read (&request->rq_refcount));
- /* LBUG(); */
+ LBUG();
}
if (request->rq_repmsg != NULL) {
request->rq_reqmsg = NULL;
}
- if (request->rq_import) {
- unsigned long flags = 0;
- if (!locked)
- spin_lock_irqsave(&request->rq_import->imp_lock, flags);
- list_del_init(&request->rq_list);
- if (!locked)
- spin_unlock_irqrestore(&request->rq_import->imp_lock,
- flags);
- }
-
ptlrpc_put_connection(request->rq_connection);
OBD_FREE(request, sizeof(*request));
EXIT;
}
if (req->rq_flags & PTL_RPC_FL_RESEND) {
- ENTRY;
DEBUG_REQ(D_ERROR, req, "RESEND:");
GOTO(out, rc = 1);
}
}
/* Abort this request and cleanup any resources associated with it. */
-static int ptlrpc_abort(struct ptlrpc_request *request)
+int ptlrpc_abort(struct ptlrpc_request *request)
{
/* First remove the ME for the reply; in theory, this means
* that we can tear down the buffer safely. */
LASSERT(spin_is_locked(&imp->imp_lock));
#endif
- CDEBUG(D_HA, "committing for last_committed "LPU64"\n",
- imp->imp_peer_committed_transno);
+ CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
+ imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
void ptlrpc_continue_req(struct ptlrpc_request *req)
{
- ENTRY;
DEBUG_REQ(D_HA, req, "continuing delayed request");
req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
wake_up(&req->rq_wait_for_rep);
- EXIT;
}
void ptlrpc_resend_req(struct ptlrpc_request *req)
{
- ENTRY;
DEBUG_REQ(D_HA, req, "resending");
req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
req->rq_flags |= PTL_RPC_FL_RESEND;
req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
wake_up(&req->rq_wait_for_rep);
- EXIT;
}
void ptlrpc_restart_req(struct ptlrpc_request *req)
{
- ENTRY;
DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
req->rq_status = -ERESTARTSYS;
req->rq_flags |= PTL_RPC_FL_RESTART;
req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
wake_up(&req->rq_wait_for_rep);
- EXIT;
}
static int expired_request(void *data)
init_waitqueue_head(&req->rq_wait_for_rep);
- spin_lock_irqsave(&imp->imp_lock, flags);
- req->rq_xid = HTON__u32(++imp->imp_last_xid);
- spin_unlock_irqrestore(&imp->imp_lock, flags);
+ req->rq_xid = HTON__u32(ptlrpc_next_xid());
/* for distributed debugging */
req->rq_reqmsg->status = HTON__u32(current->pid);
- CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n",
- NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
- conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc));
+ CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%s:"LPX64
+ ":%d\n", NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
+ conn->c_peer.peer_ni->pni_name, conn->c_peer.peer_nid,
+ NTOH__u32(req->rq_reqmsg->opc));
spin_lock_irqsave(&imp->imp_lock, flags);
lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_request,
interrupted_request, req);
}
+#ifdef __KERNEL__
l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
+#else
+ {
+ extern int reply_in_callback(ptl_event_t *ev);
+ ptl_event_t reply_ev;
+ PtlEQWait(req->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h, &reply_ev);
+ reply_in_callback(&reply_ev);
+ }
+#endif
+
DEBUG_REQ(D_NET, req, "-- done sleeping");
spin_lock_irqsave(&imp->imp_lock, flags);
/* Don't resend if we were interrupted. */
if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) ==
PTL_RPC_FL_RESEND) {
+ if (req->rq_flags & PTL_RPC_FL_NO_RESEND) {
+ ptlrpc_abort(req); /* clean up reply buffers */
+ req->rq_flags &= ~PTL_RPC_FL_NO_RESEND;
+ GOTO(out, rc = -ETIMEDOUT);
+ }
req->rq_flags &= ~PTL_RPC_FL_RESEND;
lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
DEBUG_REQ(D_HA, req, "resending: ");
* ptlrpc_queue_wait must (and does) hold imp_lock while testing this
* flag and then putting requests on sending_list or delayed_list.
*/
- spin_lock_irqsave(&imp->imp_lock, flags);
- imp->imp_flags |= IMP_INVALID;
- spin_unlock_irqrestore(&imp->imp_lock, flags);
+ if ((imp->imp_flags & IMP_REPLAYABLE) == 0) {
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ imp->imp_flags |= IMP_INVALID;
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ }
list_for_each_safe(tmp, n, &imp->imp_sending_list) {
struct ptlrpc_request *req =