#include <lu_object.h>
#include <uapi/linux/lnet/lnet-types.h>
#include "ptlrpc_internal.h"
+#include <linux/delay.h>
/* The following are visible and mutable through /sys/module/ptlrpc */
int test_req_buffer_pressure = 0;
}
}
+static void ptlrpc_add_exp_list_nolock(struct ptlrpc_request *req,
+ struct obd_export *export, bool hp)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ if (hp)
+ list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+ else
+ list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+ if (tag && export->exp_used_slots)
+ set_bit(tag - 1, export->exp_used_slots);
+}
+
+static void ptlrpc_del_exp_list(struct ptlrpc_request *req)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ spin_lock(&req->rq_export->exp_rpc_lock);
+ list_del_init(&req->rq_exp_list);
+ if (tag && !req->rq_obsolete && req->rq_export->exp_used_slots)
+ clear_bit(tag - 1, req->rq_export->exp_used_slots);
+ spin_unlock(&req->rq_export->exp_rpc_lock);
+}
+
/** Change request export and move hp request from old export to new */
void ptlrpc_request_change_export(struct ptlrpc_request *req,
struct obd_export *export)
if (req->rq_export != NULL) {
LASSERT(!list_empty(&req->rq_exp_list));
/* remove rq_exp_list from last export */
- spin_lock(&req->rq_export->exp_rpc_lock);
- list_del_init(&req->rq_exp_list);
- spin_unlock(&req->rq_export->exp_rpc_lock);
- /*
- * export has one reference already, so it`s safe to
+ ptlrpc_del_exp_list(req);
+ /* export has one reference already, so it's safe to
* add req to export queue here and get another
* reference for request later
*/
spin_lock(&export->exp_rpc_lock);
- if (req->rq_ops != NULL) /* hp request */
- list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
- else
- list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+ ptlrpc_add_exp_list_nolock(req, export, req->rq_ops != NULL);
spin_unlock(&export->exp_rpc_lock);
class_export_rpc_dec(req->rq_export);
return tmp;
}
+#ifdef HAVE_SERVER_SUPPORT
+static void ptlrpc_server_mark_obsolete(struct ptlrpc_request *req)
+{
+ req->rq_obsolete = 1;
+}
+
+static void
+ptlrpc_server_mark_in_progress_obsolete(struct ptlrpc_request *req)
+{
+ struct ptlrpc_request *tmp = NULL;
+ __u16 tag;
+
+ if (!tgt_is_increasing_xid_client(req->rq_export) ||
+ req->rq_export->exp_used_slots == NULL)
+ return;
+
+ tag = lustre_msg_get_tag(req->rq_reqmsg);
+ if (tag == 0)
+ return;
+
+ if (!test_bit(tag - 1, req->rq_export->exp_used_slots))
+ return;
+
+ /* This list should not be longer than max_requests in
+ * flights on the client, so it is not all that long.
+ * Also we only hit this codepath in case of a resent
+ * request which makes it even more rarely hit */
+ list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs, rq_exp_list) {
+ if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+ req->rq_xid > tmp->rq_xid)
+ ptlrpc_server_mark_obsolete(tmp);
+
+ }
+ list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs, rq_exp_list) {
+ if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+ req->rq_xid > tmp->rq_xid)
+ ptlrpc_server_mark_obsolete(tmp);
+ }
+}
+#endif
+
/**
* Check if a request should be assigned with a high priority.
*
if (req->rq_ops && req->rq_ops->hpreq_fini)
req->rq_ops->hpreq_fini(req);
- spin_lock(&req->rq_export->exp_rpc_lock);
- list_del_init(&req->rq_exp_list);
- spin_unlock(&req->rq_export->exp_rpc_lock);
+ ptlrpc_del_exp_list(req);
}
EXIT;
}
hp = rc > 0;
ptlrpc_nrs_req_initialize(svcpt, req, hp);
- if (req->rq_export != NULL) {
+ while (req->rq_export != NULL) {
struct obd_export *exp = req->rq_export;
/*
* atomically
*/
spin_lock_bh(&exp->exp_rpc_lock);
+#ifdef HAVE_SERVER_SUPPORT
+ ptlrpc_server_mark_in_progress_obsolete(req);
+#endif
orig = ptlrpc_server_check_resend_in_progress(req);
+ if (orig && OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
+ spin_unlock_bh(&exp->exp_rpc_lock);
+
+ OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+ msleep(4 * MSEC_PER_SEC);
+ continue;
+ }
+
if (orig && likely(atomic_inc_not_zero(&orig->rq_refcount))) {
bool linked;
ptlrpc_at_add_timed(orig);
ptlrpc_server_drop_request(orig);
ptlrpc_nrs_req_finalize(req);
+
+ /* don't mark slot unused for resend in progress */
+ req->rq_obsolete = 1;
+
RETURN(-EBUSY);
}
- if (hp || req->rq_ops != NULL)
- list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
- else
- list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+ ptlrpc_add_exp_list_nolock(req, exp, hp || req->rq_ops != NULL);
+
spin_unlock_bh(&exp->exp_rpc_lock);
+ break;
}
/*