out if people complain.
- Add some helpers for accessing the lower (general) and upper (op-specific)
portions of the flags.
- Use an op flag (MDS_OPEN_HAS_EA) rather than bufcount checking to determine
whether or not an open has an EA for delayed-create.
- Remove basically-unused rq_reply_md_h.
- Pack open-replay state in extra message buffer, instead of dangling it off the
request.
- Add DEBUG_REQ for consistent and clean request-dumping, and use it somewhat
liberally.
- Teach reint_link to be tolerant of -EEXIST if replaying.
- Likewise for reint_unlink and -ENOENT.
- Remove connection locking from ptlrpc_check_reply. We don't lock in
queue_wait, so if it's needed I need a better story anyway, and in the interim
this will fix a guaranteed deadlock in replay.
- Factor sending-queue request disposition checks for legibility and reuse.
- Dump retained-request state before starting replay, to aid debugging.
__u64 cookie; /* security token */
__u32 magic;
__u32 type;
- __u32 version;
+ __u32 flags;
__u32 opc;
__u64 last_xid;
__u64 last_committed;
__u32 buflens[0];
};
+/* Flags that are operation-specific go in the top 16 bits. */
+#define MSG_OP_FLAG_MASK 0xffff0000
+#define MSG_OP_FLAG_SHIFT 16
+
+/* Flags that apply to all requests are in the bottom 16 bits */
+#define MSG_GEN_FLAG_MASK 0x0000ffff
+
+/* Return the general flags of @msg, i.e. the bits of msg->flags selected by
+ * MSG_GEN_FLAG_MASK (the bottom 16 bits). */
+static inline u16 lustre_msg_get_flags(struct lustre_msg *msg)
+{
+        u16 gen_flags = (u16)(msg->flags & MSG_GEN_FLAG_MASK);
+
+        return gen_flags;
+}
+
+/* Replace the general (bottom 16 bits) flags of @msg with @flags, leaving
+ * the op-specific bits in the top half untouched. */
+static inline void lustre_msg_set_flags(struct lustre_msg *msg, u16 flags)
+{
+        msg->flags = (msg->flags & ~MSG_GEN_FLAG_MASK) | flags;
+}
+
+/* Return the operation-specific flags of @msg: the top 16 bits of
+ * msg->flags, shifted down by MSG_OP_FLAG_SHIFT. */
+static inline u16 lustre_msg_get_op_flags(struct lustre_msg *msg)
+{
+        u16 op_flags = (u16)(msg->flags >> MSG_OP_FLAG_SHIFT);
+
+        return op_flags;
+}
+
+/* Replace the operation-specific (top 16 bits) flags of @msg with @flags,
+ * leaving the general flags in the bottom half untouched. */
+static inline void lustre_msg_set_op_flags(struct lustre_msg *msg, u16 flags)
+{
+        msg->flags &= ~MSG_OP_FLAG_MASK;
+        /* Widen before shifting: a u16 is promoted to (signed) int, and
+         * shifting a value with bit 15 set left by 16 would overflow it. */
+        msg->flags |= ((__u32)flags << MSG_OP_FLAG_SHIFT);
+}
+
#define CONNMGR_REPLY 0
#define CONNMGR_CONNECT 1
__u32 generation;
};
+#define MDS_OPEN_HAS_EA 1 /* this open has an EA, for a delayed create*/
+
/* MDS update records */
/* incoming reply */
ptl_md_t rq_reply_md;
- ptl_handle_md_t rq_reply_md_h; /* we can lose this: set, never read */
ptl_handle_me_t rq_reply_me_h;
/* outgoing req/rep */
struct obd_import *rq_import;
struct ptlrpc_service *rq_svc;
- void (*rq_replay_cb)(struct ptlrpc_request *, void *);
- void *rq_replay_cb_data;
+ void (*rq_replay_cb)(struct ptlrpc_request *);
};
+/*
+ * Log a message about request @req through CDEBUG() at debug mask @level.
+ * The caller's @fmt and arguments are printed first, followed by a fixed
+ * suffix: xid, transno, opcode (-1 when there is no request message), remote
+ * uuid, request portal, request/reply lengths and request flags.
+ *
+ * @req is evaluated several times, so it must not have side effects.  It is
+ * dereferenced through rq_connection and rq_import without NULL checks.
+ *
+ * NOTE(review): rq_reqlen/rq_replen are assumed to be the fields behind
+ * "lens %d/%d" -- confirm against struct ptlrpc_request.
+ */
+#define DEBUG_REQ(level, req, fmt, args...)                                    \
+do {                                                                           \
+        CDEBUG(level,                                                          \
+               "@@@ " fmt " req x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d fl %x\n", \
+               ## args, (req)->rq_xid, (req)->rq_transno,                      \
+               (req)->rq_reqmsg ? (req)->rq_reqmsg->opc : -1,                  \
+               (req)->rq_connection->c_remote_uuid,                            \
+               (req)->rq_import->imp_client->cli_request_portal,               \
+               (req)->rq_reqlen, (req)->rq_replen, (req)->rq_flags);           \
+} while (0)
+
struct ptlrpc_bulk_page {
struct ptlrpc_bulk_desc *bp_desc;
struct list_head bp_link;
RETURN(0);
}
-static void mdc_replay_open(struct ptlrpc_request *req, void *data)
+struct replay_open_data { /* open-replay state, kept in a buffer of the request message */
+ struct lustre_handle *fh; /* handle that mdc_replay_open() overwrites with the reply's handle */
+};
+
+static void mdc_replay_open(struct ptlrpc_request *req)
{
- struct lustre_handle *fh = data;
+ int offset;
+ struct replay_open_data *saved;
struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 0);
+ if (lustre_msg_get_op_flags(req->rq_reqmsg) & MDS_OPEN_HAS_EA)
+ offset = 2;
+ else
+ offset = 1;
+
+ saved = lustre_msg_buf(req->rq_reqmsg, offset);
mds_unpack_body(body);
CDEBUG(D_HA, "updating from "LPD64"/"LPD64" to "LPD64"/"LPD64"\n",
- fh->addr, fh->cookie, body->handle.addr, body->handle.cookie);
- memcpy(fh, &body->handle, sizeof(*fh));
+ saved->fh->addr, saved->fh->cookie,
+ body->handle.addr, body->handle.cookie);
+ memcpy(saved->fh, &body->handle, sizeof(body->handle));
}
int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
struct ptlrpc_request **request)
{
struct mds_body *body;
- int rc, size[2] = {sizeof(*body)}, bufcount = 1;
+ struct replay_open_data *replay_data;
+ int rc, size[3] = {sizeof(*body), sizeof(*replay_data)}, bufcount = 2;
struct ptlrpc_request *req;
ENTRY;
if (lsm) {
- bufcount = 2;
- // size[1] = mdc->cl_max_mds_easize; soon...
+ bufcount = 3;
+ size[2] = size[1]; /* shuffle the spare data along */
+
size[1] = lsm->lsm_mds_easize;
}
if (!req)
GOTO(out, rc = -ENOMEM);
+ if (lsm)
+ lustre_msg_set_op_flags(req->rq_reqmsg, MDS_OPEN_HAS_EA);
+
+
req->rq_flags |= PTL_RPC_FL_REPLAY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
/* If open is replayed, we need to fix up the fh. */
req->rq_replay_cb = mdc_replay_open;
- req->rq_replay_cb_data = fh;
-
+ replay_data = lustre_msg_buf(req->rq_reqmsg, lsm ? 2 : 1);
+ replay_data->fh = fh;
+
EXIT;
out:
*request = req;
GOTO(out_free, rc = PTR_ERR(de));
/* check if this inode has seen a delayed object creation */
- if (req->rq_reqmsg->bufcount > 1) {
+ if (lustre_msg_get_op_flags(req->rq_reqmsg) & MDS_OPEN_HAS_EA) {
struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, 1);
rc = mds_store_ea(mds, req, body, de, lmm);
CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
if (!inode) {
- CDEBUG(D_INODE, "child doesn't exist (dir %ld, name %s\n",
- dir->i_ino, rec->ur_name);
+ if (rec->ur_opcode & REINT_REPLAYING) {
+ CDEBUG(D_INODE,
+ "child missing (%ld/%s); OK for REPLAYING\n",
+ dir->i_ino, rec->ur_name);
+ rc = 0;
+ } else {
+ CDEBUG(D_INODE,
+ "child doesn't exist (dir %ld, name %s)\n",
+ dir->i_ino, rec->ur_name);
+ rc = -ENOENT;
+ }
/* going to out_unlink_cancel causes an LBUG, don't know why */
- GOTO(out_unlink_dchild, rc = -ENOENT);
+ GOTO(out_unlink_dchild, rc);
}
if (offset) {
}
}
if (rec->ur_opcode & REINT_REPLAYING) {
+ /* XXX verify that the link is to the right file? */
rc = 0;
CDEBUG(D_INODE,
"child exists (dir %ld, name %s) (REPLAYING)\n",
}
EXIT;
-
-
out_link_dchild:
l_dput(dchild);
out_link_tgt_dir:
spin_unlock(&conn->c_lock);
request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
- request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
request->rq_reqmsg->opc = HTON__u32(opcode);
+ request->rq_reqmsg->flags = 0;
ptlrpc_hdl2req(request, &imp->imp_handle);
RETURN(request);
if (req->rq_repmsg != NULL) {
struct ptlrpc_connection *conn = req->rq_import->imp_connection;
- spin_lock(&conn->c_lock);
if (req->rq_level > conn->c_level) {
CDEBUG(D_HA,
"rep to xid "LPD64" op %d to %s:%d: "
req->rq_import->imp_client->cli_request_portal,
req->rq_level, conn->c_level);
req->rq_repmsg = NULL;
- spin_unlock(&conn->c_lock);
GOTO(out, rc = 0);
}
- spin_unlock(&conn->c_lock);
req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
req->rq_flags |= PTL_RPC_FL_REPLIED;
GOTO(out, rc = 1);
#define EIO_IF_INVALID(conn, req) \
if ((conn->c_flags & CONN_INVALID) || \
(req->rq_import->imp_flags & IMP_INVALID)) { \
- CERROR("req xid "LPD64" op %d to %s:%d: %s_INVALID\n", \
- (unsigned long long)req->rq_xid, req->rq_reqmsg->opc, \
- req->rq_connection->c_remote_uuid, \
- req->rq_import->imp_client->cli_request_portal, \
- (conn->c_flags & CONN_INVALID) ? "CONN_" : "IMP_"); \
+ DEBUG_REQ(D_ERROR, req, "%s_INVALID:", \
+ (conn->c_flags & CONN_INVALID) ? "CONN" : "IMP"); \
spin_unlock(&conn->c_lock); \
RETURN(-EIO); \
}
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
- CDEBUG(D_NET, "subsys: %s req "LPD64" opc %d level %d, conn level %d\n",
- cli->cli_name, req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
- req->rq_connection->c_level);
+ DEBUG_REQ(D_HA, req, "subsys: %s:", cli->cli_name);
/* XXX probably both an import and connection level are needed */
if (req->rq_level > conn->c_level) {
list_add_tail(&req->rq_list, &conn->c_delayed_head);
spin_unlock(&conn->c_lock);
- CDEBUG(D_HA, "req xid "LPD64" op %d to %s:%d: waiting for "
- "recovery (%d < %d)\n",
- (unsigned long long)req->rq_xid, req->rq_reqmsg->opc,
- req->rq_connection->c_remote_uuid,
- req->rq_import->imp_client->cli_request_portal,
- req->rq_level, conn->c_level);
-
+ DEBUG_REQ(D_HA, req, "waiting for recovery: (%d < %d)",
+ req->rq_level, conn->c_level);
lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(req->rq_wait_for_rep,
(req->rq_level <= conn->c_level) ||
/* the sleep below will time out, triggering recovery */
}
- CDEBUG(D_NET, "-- sleeping on req xid "LPD64" op %d to %s:%d\n",
- (unsigned long long)req->rq_xid, req->rq_reqmsg->opc,
- req->rq_connection->c_remote_uuid,
- req->rq_import->imp_client->cli_request_portal);
+ DEBUG_REQ(D_NET, req, "-- sleeping");
lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
interrupted_request,req);
l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
- CDEBUG(D_NET, "-- done sleeping on req xid "LPD64" op %d to %s:%d\n",
- (unsigned long long)req->rq_xid, req->rq_reqmsg->opc,
- req->rq_connection->c_remote_uuid,
- req->rq_import->imp_client->cli_request_portal);
+ DEBUG_REQ(D_NET, req, "-- done sleeping");
if (req->rq_flags & PTL_RPC_FL_ERR) {
ptlrpc_abort(req);
if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) ==
PTL_RPC_FL_RESEND) {
req->rq_flags &= ~PTL_RPC_FL_RESEND;
- CDEBUG(D_HA, "resending req xid "LPD64" op %d to %s:%d\n",
- (unsigned long long)req->rq_xid, req->rq_reqmsg->opc,
- req->rq_connection->c_remote_uuid,
- req->rq_import->imp_client->cli_request_portal);
+ DEBUG_REQ(D_HA, req, "resending: ");
goto resend;
}
/* let the callback do fixups, possibly including in the request */
if (req->rq_replay_cb)
- req->rq_replay_cb(req, req->rq_replay_cb_data);
+ req->rq_replay_cb(req);
if (req->rq_repmsg->status == 0) {
CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
/* add a ref, which will be balanced in request_out_callback */
atomic_inc(&request->rq_refcount);
if (request->rq_replen != 0) {
-
/* request->rq_repmsg is set only when the reply comes in, in
* client_packet_callback() */
if (request->rq_reply_md.start) {
request->rq_reply_md.eventq = reply_in_eq;
rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
- PTL_UNLINK, &request->rq_reply_md_h);
+ PTL_UNLINK, NULL);
if (rc != PTL_OK) {
CERROR("PtlMDAttach failed: %d\n", rc);
LBUG();
RETURN(0);
}
+#define REPLAY_COMMITTED 0 /* Fully processed (commit + reply) */
+#define REPLAY_REPLAY 1 /* Forced-replay (e.g. open) */
+#define REPLAY_RESEND 2 /* Resend required. */
+#define REPLAY_RESEND_IGNORE 3 /* Resend, ignore the reply (already saw it) */
+#define REPLAY_RESTART 4 /* Have to restart the call, sorry! */
+
+/*
+ * Classify @req for replay, given the last xid (@last_xid) recorded for the
+ * connection.  Returns one of the REPLAY_* values:
+ *
+ *   REPLAY_REPLAY         PTL_RPC_FL_REPLAY is set: always replay.
+ *   REPLAY_COMMITTED      committed and reply seen: nothing more to do.
+ *   REPLAY_RESTART        committed but reply never seen: restart the call.
+ *   REPLAY_RESEND_IGNORE  uncommitted, reply seen: resend, ignore new reply.
+ *   REPLAY_RESEND         uncommitted, no reply seen: resend.
+ *
+ * A request counts as committed when its xid is <= @last_xid.
+ */
+static int replay_state(struct ptlrpc_request *req, __u64 last_xid)
+{
+        int saw_reply;
+        int committed;
+
+        /* Forced replay takes precedence over everything else. */
+        if (req->rq_flags & PTL_RPC_FL_REPLAY)
+                return REPLAY_REPLAY;
+
+        saw_reply = (req->rq_flags & PTL_RPC_FL_REPLIED) != 0;
+        committed = !(req->rq_xid > last_xid);
+
+        if (committed)
+                return saw_reply ? REPLAY_COMMITTED : REPLAY_RESTART;
+
+        return saw_reply ? REPLAY_RESEND_IGNORE : REPLAY_RESEND;
+}
+
+/*
+ * Map a REPLAY_* state to a printable name for debug output.  The table is
+ * indexed by the numeric value of the REPLAY_* constants (0..4).  Any value
+ * outside the table yields "UNKNOWN".  The returned string is static and
+ * must not be modified or freed.
+ */
+static char *replay_state2str(int state)
+{
+        static char *state_strings[] = {
+                "COMMITTED", "REPLAY", "RESEND", "RESEND_IGNORE", "RESTART"
+        };
+        static char *unknown_state = "UNKNOWN";
+
+        /* Valid indices are 0 .. count - 1, so state == count must be
+         * rejected as well (hence >=, not >). */
+        if (state < 0 ||
+            state >= (int)(sizeof(state_strings) / sizeof(state_strings[0])))
+                return unknown_state;
+
+        return state_strings[state];
+}
+
int ptlrpc_replay(struct ptlrpc_connection *conn)
{
int rc = 0;
CDEBUG(D_HA, "connection %p to %s has last_xid "LPD64"\n",
conn, conn->c_remote_uuid, conn->c_last_xid);
+ list_for_each(tmp, &conn->c_sending_head) {
+ int state;
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ state = replay_state(req, conn->c_last_xid);
+ DEBUG_REQ(D_HA, req, "SENDING: %s: ", replay_state2str(state));
+ }
+
+ list_for_each(tmp, &conn->c_delayed_head) {
+ int state;
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ state = replay_state(req, conn->c_last_xid);
+ DEBUG_REQ(D_HA, req, "DELAYED: ");
+ }
+
list_for_each_safe(tmp, pos, &conn->c_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- /* replay what needs to be replayed */
- if (req->rq_flags & PTL_RPC_FL_REPLAY) {
- CDEBUG(D_HA, "FL_REPLAY: xid "LPD64" transno "LPD64" op %d @ %d\n",
- req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
- req->rq_import->imp_client->cli_request_portal);
+ switch (replay_state(req, conn->c_last_xid)) {
+ case REPLAY_REPLAY:
+ DEBUG_REQ(D_HA, req, "REPLAY:");
rc = ptlrpc_replay_req(req);
#if 0
#error We should not hold a spinlock over such a lengthy operation.
rc, req->rq_xid);
GOTO(out, rc);
}
- }
+ break;
- /* server has seen req, we have reply: skip */
- if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid <= conn->c_last_xid) {
- CDEBUG(D_HA, "REPLIED SKIP: xid "LPD64" transno "
- LPD64" op %d @ %d\n",
- req->rq_xid, req->rq_repmsg->transno,
- req->rq_reqmsg->opc,
- req->rq_import->imp_client->cli_request_portal);
- continue;
- }
- /* server has lost req, we have reply: resend, ign reply */
- if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid > conn->c_last_xid) {
- CDEBUG(D_HA, "REPLIED RESEND: xid "LPD64" transno "
- LPD64" op %d @ %d\n",
- req->rq_xid, req->rq_repmsg->transno,
- req->rq_reqmsg->opc,
- req->rq_import->imp_client->cli_request_portal);
+ case REPLAY_COMMITTED:
+ DEBUG_REQ(D_HA, req, "COMMITTED:");
+ /* XXX commit now? */
+ break;
+
+ case REPLAY_RESEND_IGNORE:
+ DEBUG_REQ(D_HA, req, "RESEND_IGNORE:");
rc = ptlrpc_replay_req(req);
if (rc) {
CERROR("request resend error %d for req %Ld\n",
rc, req->rq_xid);
GOTO(out, rc);
}
- }
+ break;
- /* server has seen req, we have lost reply: -ERESTARTSYS */
- if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid <= conn->c_last_xid) {
- CDEBUG(D_HA, "RESTARTSYS: xid "LPD64" op %d @ %d\n",
- req->rq_xid, req->rq_reqmsg->opc,
- req->rq_import->imp_client->cli_request_portal);
+ case REPLAY_RESTART:
+ DEBUG_REQ(D_HA, req, "RESTART:");
ptlrpc_restart_req(req);
- }
+ break;
- /* service has not seen req, no reply: resend */
- if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid > conn->c_last_xid) {
- CDEBUG(D_HA, "RESEND: xid "LPD64" transno "LPD64
- " op %d @ %d\n", req->rq_xid,
- req->rq_repmsg ? req->rq_repmsg->transno : 0,
- req->rq_reqmsg->opc,
- req->rq_import->imp_client->cli_request_portal);
+ case REPLAY_RESEND:
+ DEBUG_REQ(D_HA, req, "RESEND:");
ptlrpc_resend_req(req);
+ break;
+
+ default:
+ LBUG();
}
}
CERROR("recovery complete on conn %p(%s), waking delayed reqs\n",
conn, conn->c_remote_uuid);
- /* Finally, continue what we delayed since recovery started */
+ /* Finally, continue processing requests that blocked for recovery. */
list_for_each_safe(tmp, pos, &conn->c_delayed_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ DEBUG_REQ(D_HA, req, "WAKING: ");
ptlrpc_continue_req(req);
}
goto out;
}
- if (request->rq_reqmsg->version != PTLRPC_MSG_VERSION) {
- CERROR("wrong lustre_msg version %d: ptl %d from "LPX64" xid "
- LPD64"\n",
- request->rq_reqmsg->version, svc->srv_req_portal,
- event->initiator.nid, request->rq_xid);
- goto out;
- }
-
CDEBUG(D_NET, "got req "LPD64" (md: %p + %d)\n", request->rq_xid,
event->mem_desc.start, event->offset);