struct list_head *imp_replay_cursor;
/** @} */
- /** List of not replied requests */
- struct list_head imp_unreplied_list;
- /** Known maximal replied XID */
- __u64 imp_known_replied_xid;
-
/** obd device for this import */
struct obd_device *imp_obd;
union ptlrpc_async_args cr_async_args;
/** Opaq data for replay and commit callbacks. */
void *cr_cb_data;
- /** Link to the imp->imp_unreplied_list */
- struct list_head cr_unreplied_list;
/**
* Commit callback, called when request is committed and about to be
* freed.
#define rq_resend_cb rq_cli.cr_resend_cb
#define rq_async_args rq_cli.cr_async_args
#define rq_cb_data rq_cli.cr_cb_data
-#define rq_unreplied_list rq_cli.cr_unreplied_list
#define rq_commit_cb rq_cli.cr_commit_cb
#define rq_replay_cb rq_cli.cr_replay_cb
INIT_LIST_HEAD(&imp->imp_sending_list);
INIT_LIST_HEAD(&imp->imp_delayed_list);
INIT_LIST_HEAD(&imp->imp_committed_list);
- INIT_LIST_HEAD(&imp->imp_unreplied_list);
- imp->imp_known_replied_xid = 0;
imp->imp_replay_cursor = &imp->imp_committed_list;
spin_lock_init(&imp->imp_lock);
imp->imp_last_success_conn = 0;
spin_unlock(&pool->prp_lock);
}
-void ptlrpc_add_unreplied(struct ptlrpc_request *req)
-{
- struct obd_import *imp = req->rq_import;
- struct list_head *tmp;
- struct ptlrpc_request *iter;
-
- assert_spin_locked(&imp->imp_lock);
- LASSERT(list_empty(&req->rq_unreplied_list));
-
- /* unreplied list is sorted by xid in ascending order */
- list_for_each_prev(tmp, &imp->imp_unreplied_list) {
- iter = list_entry(tmp, struct ptlrpc_request,
- rq_unreplied_list);
-
- LASSERT(req->rq_xid != iter->rq_xid);
- if (req->rq_xid < iter->rq_xid)
- continue;
- list_add(&req->rq_unreplied_list, &iter->rq_unreplied_list);
- return;
- }
- list_add(&req->rq_unreplied_list, &imp->imp_unreplied_list);
-}
-
-void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req)
-{
- req->rq_xid = ptlrpc_next_xid();
- ptlrpc_add_unreplied(req);
-}
-
-static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
-{
- spin_lock(&req->rq_import->imp_lock);
- ptlrpc_assign_next_xid_nolock(req);
- spin_unlock(&req->rq_import->imp_lock);
-}
-
static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode,
int count, __u32 *lengths, char **bufs,
ptlrpc_at_set_req_timeout(request);
lustre_msg_set_opc(request->rq_reqmsg, opcode);
- ptlrpc_assign_next_xid(request);
RETURN(0);
out_ctx:
EXIT;
}
-__u64 ptlrpc_known_replied_xid(struct obd_import *imp)
-{
- struct ptlrpc_request *req;
-
- assert_spin_locked(&imp->imp_lock);
- if (list_empty(&imp->imp_unreplied_list))
- return 0;
-
- req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
- rq_unreplied_list);
- LASSERTF(req->rq_xid >= 1, "XID:"LPU64"\n", req->rq_xid);
-
- if (imp->imp_known_replied_xid < req->rq_xid - 1)
- imp->imp_known_replied_xid = req->rq_xid - 1;
-
- return req->rq_xid - 1;
-}
-
/**
* Callback function called when client receives RPC reply for \a req.
* Returns 0 on success or error code.
else
req->rq_sent = now + req->rq_nr_resend;
- /* Resend for EINPROGRESS will use a new XID */
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_unreplied_list);
- spin_unlock(&imp->imp_lock);
-
RETURN(0);
}
static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
struct obd_import *imp = req->rq_import;
- __u64 min_xid = 0;
+ struct list_head *tmp;
+ __u64 min_xid = ~0ULL;
int rc;
ENTRY;
spin_lock(&imp->imp_lock);
- LASSERT(req->rq_xid != 0);
- LASSERT(!list_empty(&req->rq_unreplied_list));
+	/* The very first time we assign an XID. It's important to assign
+	 * the XID and put the request on the list atomically, so that the
+	 * lowest assigned XID is always known. This is vital for multislot
+	 * last_rcvd. */
+ if (req->rq_send_state == LUSTRE_IMP_REPLAY) {
+ LASSERT(req->rq_xid != 0);
+ } else {
+ LASSERT(req->rq_xid == 0);
+ req->rq_xid = ptlrpc_next_xid();
+ }
if (!req->rq_generation_set)
req->rq_import_generation = imp->imp_generation;
list_add_tail(&req->rq_list, &imp->imp_sending_list);
atomic_inc(&req->rq_import->imp_inflight);
- /* find the known replied XID from the unreplied list, CONNECT
- * and DISCONNECT requests are skipped to make the sanity check
- * on server side happy. see process_req_last_xid().
- *
- * For CONNECT: Because replay requests have lower XID, it'll
- * break the sanity check if CONNECT bump the exp_last_xid on
- * server.
- *
- * For DISCONNECT: Since client will abort inflight RPC before
- * sending DISCONNECT, DISCONNECT may carry an XID which higher
- * than the inflight RPC.
- */
- if (!ptlrpc_req_is_connect(req) && !ptlrpc_req_is_disconnect(req))
- min_xid = ptlrpc_known_replied_xid(imp);
+ /* find the lowest unreplied XID */
+ list_for_each(tmp, &imp->imp_delayed_list) {
+ struct ptlrpc_request *r;
+ r = list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (r->rq_xid < min_xid)
+ min_xid = r->rq_xid;
+ }
+ list_for_each(tmp, &imp->imp_sending_list) {
+ struct ptlrpc_request *r;
+ r = list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (r->rq_xid < min_xid)
+ min_xid = r->rq_xid;
+ }
spin_unlock(&imp->imp_lock);
- lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
+ if (likely(min_xid != ~0ULL))
+ lustre_msg_set_last_xid(req->rq_reqmsg, min_xid - 1);
lustre_msg_set_status(req->rq_reqmsg, current_pid());
list_del_init(&req->rq_list);
atomic_dec(&imp->imp_inflight);
}
- list_del_init(&req->rq_unreplied_list);
spin_unlock(&imp->imp_lock);
atomic_dec(&set->set_remaining);
if (!locked)
spin_lock(&request->rq_import->imp_lock);
list_del_init(&request->rq_replay_list);
- list_del_init(&request->rq_unreplied_list);
if (!locked)
spin_unlock(&request->rq_import->imp_lock);
}
LASSERT(bd != NULL);
- if (!req->rq_resend) {
+ if (!req->rq_resend || req->rq_nr_resend != 0) {
/* this request has a new xid, just use it as bulk matchbits */
req->rq_mbits = req->rq_xid;
}
/**
- * Add all replay requests back to unreplied list before start replay,
- * so that we can make sure the known replied XID is always increased
- * only even if when replaying requests.
- */
-static void ptlrpc_prepare_replay(struct obd_import *imp)
-{
- struct ptlrpc_request *req;
-
- if (imp->imp_state != LUSTRE_IMP_REPLAY ||
- imp->imp_resend_replay)
- return;
-
- /* If the server was restart during repaly, the requests may
- * have been added to the unreplied list in former replay. */
- spin_lock(&imp->imp_lock);
-
- list_for_each_entry(req, &imp->imp_committed_list, rq_replay_list) {
- if (list_empty(&req->rq_unreplied_list))
- ptlrpc_add_unreplied(req);
- }
-
- list_for_each_entry(req, &imp->imp_replay_list, rq_replay_list) {
- if (list_empty(&req->rq_unreplied_list))
- ptlrpc_add_unreplied(req);
- }
-
- imp->imp_known_replied_xid = ptlrpc_known_replied_xid(imp);
- spin_unlock(&imp->imp_lock);
-}
-
-/**
* interpret_reply callback for connect RPCs.
* Looks into returned status of connect operation and decides
* what to do with the import - i.e enter recovery, promote it to
}
finish:
- ptlrpc_prepare_replay(imp);
rc = ptlrpc_import_recovery_state_machine(imp);
if (rc == -ENOTCONN) {
CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
lustre_msghdr_set_flags(request->rq_reqmsg,
imp->imp_msghdr_flags);
- /* If it's the first time to resend the request for EINPROGRESS,
- * we need to allocate a new XID (see after_reply()), it's different
- * from the resend for reply timeout. */
- if (request->rq_nr_resend != 0 &&
- list_empty(&request->rq_unreplied_list)) {
- __u64 min_xid = 0;
+ if (request->rq_nr_resend != 0) {
/* resend for EINPROGRESS, allocate new xid to avoid reply
* reconstruction */
- spin_lock(&imp->imp_lock);
- ptlrpc_assign_next_xid_nolock(request);
- request->rq_mbits = request->rq_xid;
- min_xid = ptlrpc_known_replied_xid(imp);
- spin_unlock(&imp->imp_lock);
-
- lustre_msg_set_last_xid(request->rq_reqmsg, min_xid);
+ request->rq_xid = ptlrpc_next_xid();
DEBUG_REQ(D_RPCTRACE, request, "Allocating new xid for "
"resend on EINPROGRESS");
- } else if (request->rq_bulk != NULL) {
- ptlrpc_set_bulk_mbits(request);
- lustre_msg_set_mbits(request->rq_reqmsg, request->rq_mbits);
}
- if (list_empty(&request->rq_unreplied_list) ||
- request->rq_xid <= imp->imp_known_replied_xid) {
- DEBUG_REQ(D_ERROR, request, "xid: "LPU64", replied: "LPU64", "
- "list_empty:%d\n", request->rq_xid,
- imp->imp_known_replied_xid,
- list_empty(&request->rq_unreplied_list));
- LBUG();
+ if (request->rq_bulk != NULL) {
+ ptlrpc_set_bulk_mbits(request);
+ lustre_msg_set_mbits(request->rq_reqmsg, request->rq_mbits);
}
/** For enabled AT all request should have AT_SUPPORT in the
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
void ptlrpc_resend_req(struct ptlrpc_request *request);
void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req);
-void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req);
-__u64 ptlrpc_known_replied_xid(struct obd_import *imp);
-void ptlrpc_add_unreplied(struct ptlrpc_request *req);
/* events.c */
int ptlrpc_init_portals(void);
INIT_LIST_HEAD(&cr->cr_set_chain);
INIT_LIST_HEAD(&cr->cr_ctx_chain);
- INIT_LIST_HEAD(&cr->cr_unreplied_list);
init_waitqueue_head(&cr->cr_reply_waitq);
init_waitqueue_head(&cr->cr_set_waitq);
}
INIT_LIST_HEAD(&sr->sr_hist_list);
}
-static inline bool ptlrpc_req_is_connect(struct ptlrpc_request *req)
-{
- if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CONNECT ||
- lustre_msg_get_opc(req->rq_reqmsg) == OST_CONNECT ||
- lustre_msg_get_opc(req->rq_reqmsg) == MGS_CONNECT)
- return true;
- else
- return false;
-}
-
-static inline bool ptlrpc_req_is_disconnect(struct ptlrpc_request *req)
-{
- if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_DISCONNECT ||
- lustre_msg_get_opc(req->rq_reqmsg) == OST_DISCONNECT ||
- lustre_msg_get_opc(req->rq_reqmsg) == MGS_DISCONNECT)
- return true;
- else
- return false;
-}
-
#endif /* PTLRPC_INTERNAL_H */
/* If need to resend the last sent transno (because a reconnect
* has occurred), then stop on the matching req and send it again.
- * If, however, the last sent transno has been committed then we
+ * If, however, the last sent transno has been committed then we
* continue replay from the next request. */
if (req != NULL && imp->imp_resend_replay)
lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
spin_lock(&imp->imp_lock);
- /* The resend replay request may have been removed from the
- * unreplied list. */
- if (req != NULL && imp->imp_resend_replay &&
- list_empty(&req->rq_unreplied_list))
- ptlrpc_add_unreplied(req);
-
imp->imp_resend_replay = 0;
spin_unlock(&imp->imp_lock);
- if (req != NULL) {
- /* The request should have been added back in unreplied list
- * by ptlrpc_prepare_replay(). */
- LASSERT(!list_empty(&req->rq_unreplied_list));
-
- rc = ptlrpc_replay_req(req);
- if (rc) {
- CERROR("recovery replay error %d for req "
- LPU64"\n", rc, req->rq_xid);
- RETURN(rc);
- }
- *inflight = 1;
- }
- RETURN(rc);
+ if (req != NULL) {
+ rc = ptlrpc_replay_req(req);
+ if (rc) {
+ CERROR("recovery replay error %d for req "
+ LPU64"\n", rc, req->rq_xid);
+ RETURN(rc);
+ }
+ *inflight = 1;
+ }
+ RETURN(rc);
}
/**
RETURN(h);
}
-static int process_req_last_xid(struct ptlrpc_request *req)
-{
- __u64 last_xid;
- ENTRY;
-
- /* check request's xid is consistent with export's last_xid */
- last_xid = lustre_msg_get_last_xid(req->rq_reqmsg);
- if (last_xid > req->rq_export->exp_last_xid)
- req->rq_export->exp_last_xid = last_xid;
-
- if (req->rq_xid == 0 ||
- (req->rq_xid <= req->rq_export->exp_last_xid)) {
- DEBUG_REQ(D_ERROR, req, "Unexpected xid %llx vs. "
- "last_xid %llx\n", req->rq_xid,
- req->rq_export->exp_last_xid);
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 93, 0)
- /* This LBUG() can be triggered in following case:
- *
- * - Client send a no_resend RPC, like statfs;
- * - The RPC timedout (or some other error) on client,
- * then it's removed from the unreplied list;
- * - Client send some other request to bump the
- * exp_last_xid on server;
- * - The former RPC got chance to be processed;
- * - LBUG();
- *
- * Let's keep this for debug purpose for now, and it
- * should be removed when release.
- */
- LBUG();
-#endif
- req->rq_status = -EPROTO;
- RETURN(ptlrpc_error(req));
- }
-
- /* try to release in-memory reply data */
- if (tgt_is_multimodrpcs_client(req->rq_export)) {
- tgt_handle_received_xid(req->rq_export,
- lustre_msg_get_last_xid(req->rq_reqmsg));
- if (!(lustre_msg_get_flags(req->rq_reqmsg) &
- (MSG_RESENT | MSG_REPLAY)))
- tgt_handle_tag(req->rq_export,
- lustre_msg_get_tag(req->rq_reqmsg));
- }
- RETURN(0);
-}
-
int tgt_request_handle(struct ptlrpc_request *req)
{
struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
struct lu_target *tgt;
int request_fail_id = 0;
__u32 opc = lustre_msg_get_opc(msg);
- struct obd_device *obd;
int rc;
- bool is_connect = false;
+
ENTRY;
/* Refill the context, to make sure all thread keys are allocated */
* target, otherwise that should be connect operation */
if (opc == MDS_CONNECT || opc == OST_CONNECT ||
opc == MGS_CONNECT) {
- is_connect = true;
req_capsule_set(&req->rq_pill, &RQF_CONNECT);
rc = target_handle_connect(req);
if (rc != 0) {
GOTO(out, rc);
}
- /* Skip last_xid processing for the recovery thread, otherwise, the
- * last_xid on same request could be processed twice: first time when
- * processing the incoming request, second time when the request is
- * being processed by recovery thread. */
- obd = class_exp2obd(req->rq_export);
- if (is_connect) {
- /* reset the exp_last_xid on each connection. */
- req->rq_export->exp_last_xid = 0;
- } else if (obd->obd_recovery_data.trd_processing_task !=
- current_pid()) {
- rc = process_req_last_xid(req);
- if (rc)
+ /* check request's xid is consistent with export's last_xid */
+ if (req->rq_export != NULL) {
+ __u64 last_xid = lustre_msg_get_last_xid(req->rq_reqmsg);
+ if (last_xid != 0)
+ req->rq_export->exp_last_xid = last_xid;
+ if (req->rq_xid == 0 ||
+ req->rq_xid <= req->rq_export->exp_last_xid) {
+ DEBUG_REQ(D_ERROR, req,
+ "Unexpected xid %llx vs. last_xid %llx\n",
+ req->rq_xid, req->rq_export->exp_last_xid);
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 93, 0)
+ LBUG();
+#endif
+ req->rq_status = -EPROTO;
+ rc = ptlrpc_error(req);
GOTO(out, rc);
+ }
}
request_fail_id = tgt->lut_request_fail_id;
tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
+ /* try to release in-memory reply data */
+ if (tgt_is_multimodrpcs_client(req->rq_export)) {
+ tgt_handle_received_xid(req->rq_export,
+ lustre_msg_get_last_xid(req->rq_reqmsg));
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) &
+ (MSG_RESENT | MSG_REPLAY)))
+ tgt_handle_tag(req->rq_export,
+ lustre_msg_get_tag(req->rq_reqmsg));
+ }
+
h = tgt_handler_find_check(req);
if (IS_ERR(h)) {
req->rq_status = PTR_ERR(h);