spin_unlock(&pool->prp_lock);
}
+void ptlrpc_add_unreplied(struct ptlrpc_request *req)
+{
+ struct obd_import *imp = req->rq_import;
+ struct list_head *tmp;
+ struct ptlrpc_request *iter;
+
+ assert_spin_locked(&imp->imp_lock);
+ LASSERT(list_empty(&req->rq_unreplied_list));
+
+ /* unreplied list is sorted by xid in ascending order */
+ list_for_each_prev(tmp, &imp->imp_unreplied_list) {
+ iter = list_entry(tmp, struct ptlrpc_request,
+ rq_unreplied_list);
+
+ LASSERT(req->rq_xid != iter->rq_xid);
+ if (req->rq_xid < iter->rq_xid)
+ continue;
+ list_add(&req->rq_unreplied_list, &iter->rq_unreplied_list);
+ return;
+ }
+ list_add(&req->rq_unreplied_list, &imp->imp_unreplied_list);
+}
+
+void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req)
+{
+ req->rq_xid = ptlrpc_next_xid();
+ ptlrpc_add_unreplied(req);
+}
+
+static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
+{
+ spin_lock(&req->rq_import->imp_lock);
+ ptlrpc_assign_next_xid_nolock(req);
+ spin_unlock(&req->rq_import->imp_lock);
+}
+
static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode,
int count, __u32 *lengths, char **bufs,
ptlrpc_at_set_req_timeout(request);
lustre_msg_set_opc(request->rq_reqmsg, opcode);
+ ptlrpc_assign_next_xid(request);
RETURN(0);
out_ctx:
EXIT;
}
+__u64 ptlrpc_known_replied_xid(struct obd_import *imp)
+{
+ struct ptlrpc_request *req;
+
+ assert_spin_locked(&imp->imp_lock);
+ if (list_empty(&imp->imp_unreplied_list))
+ return 0;
+
+ req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
+ rq_unreplied_list);
+ LASSERTF(req->rq_xid >= 1, "XID:"LPU64"\n", req->rq_xid);
+
+ if (imp->imp_known_replied_xid < req->rq_xid - 1)
+ imp->imp_known_replied_xid = req->rq_xid - 1;
+
+ return req->rq_xid - 1;
+}
+
/**
* Callback function called when client receives RPC reply for \a req.
* Returns 0 on success or error code.
else
req->rq_sent = now + req->rq_nr_resend;
+ /* Resend for EINPROGRESS will use a new XID */
+ spin_lock(&imp->imp_lock);
+ list_del_init(&req->rq_unreplied_list);
+ spin_unlock(&imp->imp_lock);
+
RETURN(0);
}
static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
struct obd_import *imp = req->rq_import;
- struct list_head *tmp;
- __u64 min_xid = ~0ULL;
+ __u64 min_xid = 0;
int rc;
ENTRY;
spin_lock(&imp->imp_lock);
- /* the very first time we assign XID. it's important to assign XID
- * and put it on the list atomically, so that the lowest assigned
- * XID is always known. this is vital for multislot last_rcvd */
- if (req->rq_send_state == LUSTRE_IMP_REPLAY) {
- LASSERT(req->rq_xid != 0);
- } else {
- LASSERT(req->rq_xid == 0);
- req->rq_xid = ptlrpc_next_xid();
- }
+ LASSERT(req->rq_xid != 0);
+ LASSERT(!list_empty(&req->rq_unreplied_list));
if (!req->rq_generation_set)
req->rq_import_generation = imp->imp_generation;
list_add_tail(&req->rq_list, &imp->imp_sending_list);
atomic_inc(&req->rq_import->imp_inflight);
- /* find the lowest unreplied XID */
- list_for_each(tmp, &imp->imp_delayed_list) {
- struct ptlrpc_request *r;
- r = list_entry(tmp, struct ptlrpc_request, rq_list);
- if (r->rq_xid < min_xid)
- min_xid = r->rq_xid;
- }
- list_for_each(tmp, &imp->imp_sending_list) {
- struct ptlrpc_request *r;
- r = list_entry(tmp, struct ptlrpc_request, rq_list);
- if (r->rq_xid < min_xid)
- min_xid = r->rq_xid;
- }
+ /* find the known replied XID from the unreplied list, CONNECT
+ * and DISCONNECT requests are skipped to make the sanity check
+ * on server side happy. see process_req_last_xid().
+ *
+ * For CONNECT: Because replay requests have lower XID, it'll
+ * break the sanity check if CONNECT bump the exp_last_xid on
+ * server.
+ *
+ * For DISCONNECT: Since client will abort inflight RPC before
+ * sending DISCONNECT, DISCONNECT may carry an XID which higher
+ * than the inflight RPC.
+ */
+ if (!ptlrpc_req_is_connect(req) && !ptlrpc_req_is_disconnect(req))
+ min_xid = ptlrpc_known_replied_xid(imp);
spin_unlock(&imp->imp_lock);
- if (likely(min_xid != ~0ULL))
- lustre_msg_set_last_xid(req->rq_reqmsg, min_xid - 1);
+ lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
lustre_msg_set_status(req->rq_reqmsg, current_pid());
list_del_init(&req->rq_list);
atomic_dec(&imp->imp_inflight);
}
+ list_del_init(&req->rq_unreplied_list);
spin_unlock(&imp->imp_lock);
atomic_dec(&set->set_remaining);
if (!locked)
spin_lock(&request->rq_import->imp_lock);
list_del_init(&request->rq_replay_list);
+ list_del_init(&request->rq_unreplied_list);
if (!locked)
spin_unlock(&request->rq_import->imp_lock);
}
LASSERT(bd != NULL);
- if (!req->rq_resend || req->rq_nr_resend != 0) {
+ if (!req->rq_resend) {
/* this request has a new xid, just use it as bulk matchbits */
req->rq_mbits = req->rq_xid;