struct list_head *imp_replay_cursor;
/** @} */
+ /** List of not replied requests */
+ struct list_head imp_unreplied_list;
+ /** Known maximal replied XID */
+ __u64 imp_known_replied_xid;
+
/** obd device for this import */
struct obd_device *imp_obd;
union ptlrpc_async_args cr_async_args;
/** Opaq data for replay and commit callbacks. */
void *cr_cb_data;
+ /** Link to the imp->imp_unreplied_list */
+ struct list_head cr_unreplied_list;
/**
* Commit callback, called when request is committed and about to be
* freed.
#define rq_resend_cb rq_cli.cr_resend_cb
#define rq_async_args rq_cli.cr_async_args
#define rq_cb_data rq_cli.cr_cb_data
+#define rq_unreplied_list rq_cli.cr_unreplied_list
#define rq_commit_cb rq_cli.cr_commit_cb
#define rq_replay_cb rq_cli.cr_replay_cb
INIT_LIST_HEAD(&imp->imp_sending_list);
INIT_LIST_HEAD(&imp->imp_delayed_list);
INIT_LIST_HEAD(&imp->imp_committed_list);
+ INIT_LIST_HEAD(&imp->imp_unreplied_list);
+ imp->imp_known_replied_xid = 0;
imp->imp_replay_cursor = &imp->imp_committed_list;
spin_lock_init(&imp->imp_lock);
imp->imp_last_success_conn = 0;
spin_unlock(&pool->prp_lock);
}
+void ptlrpc_add_unreplied(struct ptlrpc_request *req)
+{
+ struct obd_import *imp = req->rq_import;
+ struct list_head *tmp;
+ struct ptlrpc_request *iter;
+
+ assert_spin_locked(&imp->imp_lock);
+ LASSERT(list_empty(&req->rq_unreplied_list));
+
+ /* unreplied list is sorted by xid in ascending order */
+ list_for_each_prev(tmp, &imp->imp_unreplied_list) {
+ iter = list_entry(tmp, struct ptlrpc_request,
+ rq_unreplied_list);
+
+ LASSERT(req->rq_xid != iter->rq_xid);
+ if (req->rq_xid < iter->rq_xid)
+ continue;
+ list_add(&req->rq_unreplied_list, &iter->rq_unreplied_list);
+ return;
+ }
+ list_add(&req->rq_unreplied_list, &imp->imp_unreplied_list);
+}
+
+void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req)
+{
+ req->rq_xid = ptlrpc_next_xid();
+ ptlrpc_add_unreplied(req);
+}
+
+static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
+{
+ spin_lock(&req->rq_import->imp_lock);
+ ptlrpc_assign_next_xid_nolock(req);
+ spin_unlock(&req->rq_import->imp_lock);
+}
+
int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode, char **bufs,
struct ptlrpc_cli_ctx *ctx)
ptlrpc_at_set_req_timeout(request);
lustre_msg_set_opc(request->rq_reqmsg, opcode);
+ ptlrpc_assign_next_xid(request);
RETURN(0);
EXIT;
}
+__u64 ptlrpc_known_replied_xid(struct obd_import *imp)
+{
+ struct ptlrpc_request *req;
+
+ assert_spin_locked(&imp->imp_lock);
+ if (list_empty(&imp->imp_unreplied_list))
+ return 0;
+
+ req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
+ rq_unreplied_list);
+ LASSERTF(req->rq_xid >= 1, "XID:"LPU64"\n", req->rq_xid);
+
+ if (imp->imp_known_replied_xid < req->rq_xid - 1)
+ imp->imp_known_replied_xid = req->rq_xid - 1;
+
+ return req->rq_xid - 1;
+}
+
/**
* Callback function called when client receives RPC reply for \a req.
* Returns 0 on success or error code.
else
req->rq_sent = now + req->rq_nr_resend;
+ /* Resend for EINPROGRESS will use a new XID */
+ spin_lock(&imp->imp_lock);
+ list_del_init(&req->rq_unreplied_list);
+ spin_unlock(&imp->imp_lock);
+
RETURN(0);
}
static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
struct obd_import *imp = req->rq_import;
- struct list_head *tmp;
- __u64 min_xid = ~0ULL;
+ __u64 min_xid = 0;
int rc;
ENTRY;
spin_lock(&imp->imp_lock);
- /* the very first time we assign XID. it's important to assign XID
- * and put it on the list atomically, so that the lowest assigned
- * XID is always known. this is vital for multislot last_rcvd */
- if (req->rq_send_state == LUSTRE_IMP_REPLAY) {
- LASSERT(req->rq_xid != 0);
- } else {
- LASSERT(req->rq_xid == 0);
- req->rq_xid = ptlrpc_next_xid();
- }
+ LASSERT(req->rq_xid != 0);
+ LASSERT(!list_empty(&req->rq_unreplied_list));
if (!req->rq_generation_set)
req->rq_import_generation = imp->imp_generation;
list_add_tail(&req->rq_list, &imp->imp_sending_list);
atomic_inc(&req->rq_import->imp_inflight);
- /* find the lowest unreplied XID */
- list_for_each(tmp, &imp->imp_delayed_list) {
- struct ptlrpc_request *r;
- r = list_entry(tmp, struct ptlrpc_request, rq_list);
- if (r->rq_xid < min_xid)
- min_xid = r->rq_xid;
- }
- list_for_each(tmp, &imp->imp_sending_list) {
- struct ptlrpc_request *r;
- r = list_entry(tmp, struct ptlrpc_request, rq_list);
- if (r->rq_xid < min_xid)
- min_xid = r->rq_xid;
- }
+ /* find the known replied XID from the unreplied list, CONNECT
+ * and DISCONNECT requests are skipped to make the sanity check
+ * on server side happy. see process_req_last_xid().
+ *
+ * For CONNECT: Because replay requests have lower XID, it'll
+ * break the sanity check if CONNECT bump the exp_last_xid on
+ * server.
+ *
+ * For DISCONNECT: Since client will abort inflight RPC before
+ * sending DISCONNECT, DISCONNECT may carry an XID which higher
+ * than the inflight RPC.
+ */
+ if (!ptlrpc_req_is_connect(req) && !ptlrpc_req_is_disconnect(req))
+ min_xid = ptlrpc_known_replied_xid(imp);
spin_unlock(&imp->imp_lock);
- if (likely(min_xid != ~0ULL))
- lustre_msg_set_last_xid(req->rq_reqmsg, min_xid - 1);
+ lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
lustre_msg_set_status(req->rq_reqmsg, current_pid());
list_del_init(&req->rq_list);
atomic_dec(&imp->imp_inflight);
}
+ list_del_init(&req->rq_unreplied_list);
spin_unlock(&imp->imp_lock);
atomic_dec(&set->set_remaining);
if (!locked)
spin_lock(&request->rq_import->imp_lock);
list_del_init(&request->rq_replay_list);
+ list_del_init(&request->rq_unreplied_list);
if (!locked)
spin_unlock(&request->rq_import->imp_lock);
}
LASSERT(bd != NULL);
- if (!req->rq_resend || req->rq_nr_resend != 0) {
+ if (!req->rq_resend) {
/* this request has a new xid, just use it as bulk matchbits */
req->rq_mbits = req->rq_xid;
}
/**
+ * Add all replay requests back to unreplied list before start replay,
+ * so that we can make sure the known replied XID is always increased
+ * only even if when replaying requests.
+ */
+static void ptlrpc_prepare_replay(struct obd_import *imp)
+{
+ struct ptlrpc_request *req;
+
+ if (imp->imp_state != LUSTRE_IMP_REPLAY ||
+ imp->imp_resend_replay)
+ return;
+
+ /* If the server was restart during repaly, the requests may
+ * have been added to the unreplied list in former replay. */
+ spin_lock(&imp->imp_lock);
+
+ list_for_each_entry(req, &imp->imp_committed_list, rq_replay_list) {
+ if (list_empty(&req->rq_unreplied_list))
+ ptlrpc_add_unreplied(req);
+ }
+
+ list_for_each_entry(req, &imp->imp_replay_list, rq_replay_list) {
+ if (list_empty(&req->rq_unreplied_list))
+ ptlrpc_add_unreplied(req);
+ }
+
+ imp->imp_known_replied_xid = ptlrpc_known_replied_xid(imp);
+ spin_unlock(&imp->imp_lock);
+}
+
+/**
* interpret_reply callback for connect RPCs.
* Looks into returned status of connect operation and decides
* what to do with the import - i.e enter recovery, promote it to
}
finish:
+ ptlrpc_prepare_replay(imp);
rc = ptlrpc_import_recovery_state_machine(imp);
if (rc == -ENOTCONN) {
CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
lustre_msghdr_set_flags(request->rq_reqmsg,
imp->imp_msghdr_flags);
- if (request->rq_nr_resend != 0) {
+ /* If it's the first time to resend the request for EINPROGRESS,
+ * we need to allocate a new XID (see after_reply()), it's different
+ * from the resend for reply timeout. */
+ if (request->rq_nr_resend != 0 &&
+ list_empty(&request->rq_unreplied_list)) {
+ __u64 min_xid = 0;
/* resend for EINPROGRESS, allocate new xid to avoid reply
* reconstruction */
- request->rq_xid = ptlrpc_next_xid();
+ spin_lock(&imp->imp_lock);
+ ptlrpc_assign_next_xid_nolock(request);
+ request->rq_mbits = request->rq_xid;
+ min_xid = ptlrpc_known_replied_xid(imp);
+ spin_unlock(&imp->imp_lock);
+
+ lustre_msg_set_last_xid(request->rq_reqmsg, min_xid);
DEBUG_REQ(D_RPCTRACE, request, "Allocating new xid for "
"resend on EINPROGRESS");
- }
-
- if (request->rq_bulk != NULL) {
+ } else if (request->rq_bulk != NULL) {
ptlrpc_set_bulk_mbits(request);
lustre_msg_set_mbits(request->rq_reqmsg, request->rq_mbits);
}
+ if (list_empty(&request->rq_unreplied_list) ||
+ request->rq_xid <= imp->imp_known_replied_xid) {
+ DEBUG_REQ(D_ERROR, request, "xid: "LPU64", replied: "LPU64", "
+ "list_empty:%d\n", request->rq_xid,
+ imp->imp_known_replied_xid,
+ list_empty(&request->rq_unreplied_list));
+ LBUG();
+ }
+
/** For enabled AT all request should have AT_SUPPORT in the
* FULL import state when OBD_CONNECT_AT is set */
LASSERT(AT_OFF || imp->imp_state != LUSTRE_IMP_FULL ||
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
void ptlrpc_resend_req(struct ptlrpc_request *request);
void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req);
+void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req);
+__u64 ptlrpc_known_replied_xid(struct obd_import *imp);
+void ptlrpc_add_unreplied(struct ptlrpc_request *req);
/* events.c */
int ptlrpc_init_portals(void);
INIT_LIST_HEAD(&cr->cr_set_chain);
INIT_LIST_HEAD(&cr->cr_ctx_chain);
+ INIT_LIST_HEAD(&cr->cr_unreplied_list);
init_waitqueue_head(&cr->cr_reply_waitq);
init_waitqueue_head(&cr->cr_set_waitq);
}
INIT_LIST_HEAD(&sr->sr_hist_list);
}
+static inline bool ptlrpc_req_is_connect(struct ptlrpc_request *req)
+{
+ if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CONNECT ||
+ lustre_msg_get_opc(req->rq_reqmsg) == OST_CONNECT ||
+ lustre_msg_get_opc(req->rq_reqmsg) == MGS_CONNECT)
+ return true;
+ else
+ return false;
+}
+
+static inline bool ptlrpc_req_is_disconnect(struct ptlrpc_request *req)
+{
+ if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_DISCONNECT ||
+ lustre_msg_get_opc(req->rq_reqmsg) == OST_DISCONNECT ||
+ lustre_msg_get_opc(req->rq_reqmsg) == MGS_DISCONNECT)
+ return true;
+ else
+ return false;
+}
+
#endif /* PTLRPC_INTERNAL_H */
/* If need to resend the last sent transno (because a reconnect
* has occurred), then stop on the matching req and send it again.
- * If, however, the last sent transno has been committed then we
+ * If, however, the last sent transno has been committed then we
* continue replay from the next request. */
if (req != NULL && imp->imp_resend_replay)
lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
spin_lock(&imp->imp_lock);
+ /* The resend replay request may have been removed from the
+ * unreplied list. */
+ if (req != NULL && imp->imp_resend_replay &&
+ list_empty(&req->rq_unreplied_list))
+ ptlrpc_add_unreplied(req);
+
imp->imp_resend_replay = 0;
spin_unlock(&imp->imp_lock);
- if (req != NULL) {
- rc = ptlrpc_replay_req(req);
- if (rc) {
- CERROR("recovery replay error %d for req "
- LPU64"\n", rc, req->rq_xid);
- RETURN(rc);
- }
- *inflight = 1;
- }
- RETURN(rc);
+ if (req != NULL) {
+ /* The request should have been added back in unreplied list
+ * by ptlrpc_prepare_replay(). */
+ LASSERT(!list_empty(&req->rq_unreplied_list));
+
+ rc = ptlrpc_replay_req(req);
+ if (rc) {
+ CERROR("recovery replay error %d for req "
+ LPU64"\n", rc, req->rq_xid);
+ RETURN(rc);
+ }
+ *inflight = 1;
+ }
+ RETURN(rc);
}
/**
RETURN(h);
}
+static int process_req_last_xid(struct ptlrpc_request *req)
+{
+ __u64 last_xid;
+ ENTRY;
+
+ /* check request's xid is consistent with export's last_xid */
+ last_xid = lustre_msg_get_last_xid(req->rq_reqmsg);
+ if (last_xid > req->rq_export->exp_last_xid)
+ req->rq_export->exp_last_xid = last_xid;
+
+ if (req->rq_xid == 0 ||
+ (req->rq_xid <= req->rq_export->exp_last_xid)) {
+ DEBUG_REQ(D_ERROR, req, "Unexpected xid %llx vs. "
+ "last_xid %llx\n", req->rq_xid,
+ req->rq_export->exp_last_xid);
+ /* Some request is allowed to be sent during replay,
+ * such as OUT update requests, FLD requests, so it
+ * is possible that replay requests has smaller XID
+ * than the exp_last_xid.
+ *
+ * Some non-replay requests may have smaller XID as
+ * well:
+ *
+ * - Client send a no_resend RPC, like statfs;
+ * - The RPC timedout (or some other error) on client,
+ * then it's removed from the unreplied list;
+ * - Client send some other request to bump the
+ * exp_last_xid on server;
+ * - The former RPC got chance to be processed;
+ */
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
+ req->rq_status = -EPROTO;
+ RETURN(ptlrpc_error(req));
+ }
+ }
+
+ /* try to release in-memory reply data */
+ if (tgt_is_multimodrpcs_client(req->rq_export)) {
+ tgt_handle_received_xid(req->rq_export,
+ lustre_msg_get_last_xid(req->rq_reqmsg));
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) &
+ (MSG_RESENT | MSG_REPLAY)))
+ tgt_handle_tag(req->rq_export,
+ lustre_msg_get_tag(req->rq_reqmsg));
+ }
+ RETURN(0);
+}
+
int tgt_request_handle(struct ptlrpc_request *req)
{
struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
struct lu_target *tgt;
int request_fail_id = 0;
__u32 opc = lustre_msg_get_opc(msg);
+ struct obd_device *obd;
int rc;
-
+ bool is_connect = false;
ENTRY;
/* Refill the context, to make sure all thread keys are allocated */
* target, otherwise that should be connect operation */
if (opc == MDS_CONNECT || opc == OST_CONNECT ||
opc == MGS_CONNECT) {
+ is_connect = true;
req_capsule_set(&req->rq_pill, &RQF_CONNECT);
rc = target_handle_connect(req);
if (rc != 0) {
GOTO(out, rc);
}
- /* check request's xid is consistent with export's last_xid */
- if (req->rq_export != NULL) {
- __u64 last_xid = lustre_msg_get_last_xid(req->rq_reqmsg);
- if (last_xid != 0)
- req->rq_export->exp_last_xid = last_xid;
- if (req->rq_xid == 0 ||
- req->rq_xid <= req->rq_export->exp_last_xid) {
- DEBUG_REQ(D_ERROR, req,
- "Unexpected xid %llx vs. last_xid %llx\n",
- req->rq_xid, req->rq_export->exp_last_xid);
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 93, 0)
- LBUG();
-#endif
- req->rq_status = -EPROTO;
- rc = ptlrpc_error(req);
+ /* Skip last_xid processing for the recovery thread, otherwise, the
+ * last_xid on same request could be processed twice: first time when
+ * processing the incoming request, second time when the request is
+ * being processed by recovery thread. */
+ obd = class_exp2obd(req->rq_export);
+ if (is_connect) {
+ /* reset the exp_last_xid on each connection. */
+ req->rq_export->exp_last_xid = 0;
+ } else if (obd->obd_recovery_data.trd_processing_task !=
+ current_pid()) {
+ rc = process_req_last_xid(req);
+ if (rc)
GOTO(out, rc);
- }
}
request_fail_id = tgt->lut_request_fail_id;
tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
- /* try to release in-memory reply data */
- if (tgt_is_multimodrpcs_client(req->rq_export)) {
- tgt_handle_received_xid(req->rq_export,
- lustre_msg_get_last_xid(req->rq_reqmsg));
- if (!(lustre_msg_get_flags(req->rq_reqmsg) &
- (MSG_RESENT | MSG_REPLAY)))
- tgt_handle_tag(req->rq_export,
- lustre_msg_get_tag(req->rq_reqmsg));
- }
-
h = tgt_handler_find_check(req);
if (IS_ERR(h)) {
req->rq_status = PTR_ERR(h);