LASSERT(ops->add_kiov_frag != NULL);
+ if (max_brw > PTLRPC_BULK_OPS_COUNT)
+ RETURN(NULL);
+
+ if (nfrags > LNET_MAX_IOV * max_brw)
+ RETURN(NULL);
+
OBD_ALLOC_PTR(desc);
if (!desc)
return NULL;
desc->bd_portal = portal;
desc->bd_type = type;
desc->bd_md_count = 0;
+ desc->bd_nob_last = LNET_MTU;
desc->bd_frag_ops = ops;
LASSERT(max_brw > 0);
desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
kiov = &desc->bd_vec[desc->bd_iov_count];
+ if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+ ((desc->bd_nob_last + len) > LNET_MTU)) {
+ desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+ desc->bd_md_count++;
+ desc->bd_nob_last = 0;
+ LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+ }
+
+ desc->bd_nob_last += len;
desc->bd_nob += len;
if (pin)
LASSERT(desc != NULL);
LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
- LASSERT(desc->bd_md_count == 0); /* network hands off */
+ LASSERT(desc->bd_refs == 0); /* network hands off */
LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
LASSERT(desc->bd_frag_ops != NULL);
*/
void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
{
- __u32 serv_est;
- int idx;
- struct imp_at *at;
-
LASSERT(req->rq_import);
if (AT_OFF) {
req->rq_timeout = req->rq_import->imp_server_timeout ?
obd_timeout / 2 : obd_timeout;
} else {
- at = &req->rq_import->imp_at;
+ struct imp_at *at = &req->rq_import->imp_at;
+ timeout_t serv_est;
+ int idx;
+
idx = import_at_get_index(req->rq_import,
req->rq_request_portal);
serv_est = at_get(&at->iat_service_estimate[idx]);
+ /*
+ * Currently a 32 bit value is sent over the
+ * wire for rq_timeout so please don't change this
+ * to time64_t. The work for LU-1158 will in time
+ * replace rq_timeout with a 64 bit nanosecond value
+ */
req->rq_timeout = at_est2timeout(serv_est);
}
/*
* We could get even fancier here, using history to predict increased
* loading...
- */
-
- /*
+ *
* Let the server know what this RPC timeout is by putting it in the
* reqmsg
*/
/* Adjust max service estimate based on server value */
static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
- unsigned int serv_est)
+ timeout_t serv_est)
{
int idx;
- unsigned int oldse;
+ timeout_t oldse;
struct imp_at *at;
LASSERT(req->rq_import);
void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
timeout_t service_timeout)
{
- unsigned int nl, oldnl;
- struct imp_at *at;
time64_t now = ktime_get_real_seconds();
+ struct imp_at *at;
+ timeout_t oldnl;
+ timeout_t nl;
LASSERT(req->rq_import);
* resent time, but server sent back service time of original
* RPC.
*/
- CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
- D_ADAPTTO : D_WARNING,
- "Reported service time %u > total measured time %lld\n",
- service_timeout, now - req->rq_sent);
+ CDEBUG_LIMIT((lustre_msg_get_flags(req->rq_reqmsg) &
+ MSG_RESENT) ? D_ADAPTTO : D_WARNING,
+ "Reported service time %u > total measured time %lld\n",
+ service_timeout, now - req->rq_sent);
return;
}
- /* Network latency is total time less server processing time */
- nl = max_t(int, now - req->rq_sent -
- service_timeout, 0) + 1; /* st rounding */
+ /* Network latency is total time less server processing time,
+ * st rounding
+ */
+ nl = max_t(timeout_t, now - req->rq_sent - service_timeout, 0) + 1;
at = &req->rq_import->imp_at;
oldnl = at_measured(&at->iat_net_latency, nl);
*/
void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
{
- struct list_head *l, *tmp;
struct ptlrpc_request *req;
LASSERT(pool != NULL);
spin_lock(&pool->prp_lock);
- list_for_each_safe(l, tmp, &pool->prp_req_list) {
- req = list_entry(l, struct ptlrpc_request, rq_list);
+ while ((req = list_first_entry_or_null(&pool->prp_req_list,
+ struct ptlrpc_request,
+ rq_list))) {
list_del(&req->rq_list);
LASSERT(req->rq_reqbuf);
LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
void ptlrpc_add_unreplied(struct ptlrpc_request *req)
{
struct obd_import *imp = req->rq_import;
- struct list_head *tmp;
struct ptlrpc_request *iter;
assert_spin_locked(&imp->imp_lock);
LASSERT(list_empty(&req->rq_unreplied_list));
/* unreplied list is sorted by xid in ascending order */
- list_for_each_prev(tmp, &imp->imp_unreplied_list) {
- iter = list_entry(tmp, struct ptlrpc_request,
- rq_unreplied_list);
-
+ list_for_each_entry_reverse(iter, &imp->imp_unreplied_list,
+ rq_unreplied_list) {
LASSERT(req->rq_xid != iter->rq_xid);
if (req->rq_xid < iter->rq_xid)
continue;
}
if (fail_t) {
- *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
+ *fail_t = ktime_get_real_seconds() +
+ PTLRPC_REQ_LONG_UNLINK;
if (fail2_t)
*fail2_t = ktime_get_real_seconds() +
- LONG_UNLINK;
+ PTLRPC_REQ_LONG_UNLINK;
/*
* The RPC is infected, let the test to change the
LASSERT(!request->rq_pool);
sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
out_free:
+ atomic_dec(&imp->imp_reqs);
class_import_put(imp);
return rc;
if (request) {
ptlrpc_cli_req_init(request);
- LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
+ LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
LASSERT(imp != LP_POISON);
LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
imp->imp_client);
LASSERT(imp->imp_client != LP_POISON);
request->rq_import = class_import_get(imp);
+ atomic_inc(&imp->imp_reqs);
} else {
CERROR("request allocation out of memory\n");
}
return request;
}
+/*
+ * Kick an idle import back into the connect state machine.
+ *
+ * Called (under a new request's import reference) when the import may
+ * have been disconnected for idleness.  Holding the reference while we
+ * check avoids racing with a concurrent disconnect; the IDLE check is
+ * re-done under imp_lock to serialize against
+ * ptlrpc_disconnect_idle_interpret() setting state = IDLE.
+ *
+ * Note the asymmetric locking: on the IDLE path
+ * ptlrpc_connect_import_locked() releases imp_lock for us, so only the
+ * non-IDLE path unlocks explicitly.
+ *
+ * Returns 0 if the import was not idle or reconnect was initiated,
+ * or the negative error from ptlrpc_connect_import_locked().
+ */
+static int ptlrpc_reconnect_if_idle(struct obd_import *imp)
+{
+ int rc;
+
+ /*
+ * initiate connection if needed when the import has been
+ * referenced by the new request to avoid races with disconnect.
+ * serialize this check against conditional state=IDLE
+ * in ptlrpc_disconnect_idle_interpret()
+ */
+ spin_lock(&imp->imp_lock);
+ if (imp->imp_state == LUSTRE_IMP_IDLE) {
+ imp->imp_generation++;
+ imp->imp_initiated_at = imp->imp_generation;
+ imp->imp_state = LUSTRE_IMP_NEW;
+
+ /* connect_import_locked releases imp_lock */
+ rc = ptlrpc_connect_import_locked(imp);
+ if (rc)
+ return rc;
+ ptlrpc_pinger_add_import(imp);
+ } else {
+ spin_unlock(&imp->imp_lock);
+ }
+ return 0;
+}
+
/**
* Helper function for creating a request.
* Calls __ptlrpc_request_alloc to allocate new request sturcture and inits
if (!request)
return NULL;
- /*
- * initiate connection if needed when the import has been
- * referenced by the new request to avoid races with disconnect
- */
- if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) {
- int rc;
-
- CDEBUG_LIMIT(imp->imp_idle_debug,
- "%s: reconnect after %llds idle\n",
- imp->imp_obd->obd_name, ktime_get_real_seconds() -
- imp->imp_last_reply_time);
- spin_lock(&imp->imp_lock);
- if (imp->imp_state == LUSTRE_IMP_IDLE) {
- imp->imp_generation++;
- imp->imp_initiated_at = imp->imp_generation;
- imp->imp_state = LUSTRE_IMP_NEW;
-
- /* connect_import_locked releases imp_lock */
- rc = ptlrpc_connect_import_locked(imp);
- if (rc < 0) {
- ptlrpc_request_free(request);
- return NULL;
- }
- ptlrpc_pinger_add_import(imp);
- } else {
- spin_unlock(&imp->imp_lock);
+ /* don't make expensive check for idling connection
+ * if it's already connected */
+ if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) {
+ if (ptlrpc_reconnect_if_idle(imp) < 0) {
+ atomic_dec(&imp->imp_reqs);
+ ptlrpc_request_free(request);
+ return NULL;
}
}
*/
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
- struct list_head *next;
+ struct ptlrpc_request *req;
int expected_phase;
int n = 0;
/* Requests on the set should either all be completed, or all be new */
expected_phase = (atomic_read(&set->set_remaining) == 0) ?
RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
LASSERT(req->rq_phase == expected_phase);
n++;
}
atomic_read(&set->set_remaining) == n, "%d / %d\n",
atomic_read(&set->set_remaining), n);
- list_for_each_safe(tmp, next, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ while ((req = list_first_entry_or_null(&set->set_requests,
+ struct ptlrpc_request,
+ rq_set_chain))) {
list_del_init(&req->rq_set_chain);
LASSERT(req->rq_phase == expected_phase);
} else if (req->rq_no_delay &&
imp->imp_generation != imp->imp_initiated_at) {
/* ignore nodelay for requests initiating connections */
- *status = -EWOULDBLOCK;
+ *status = -EAGAIN;
} else if (req->rq_allow_replay &&
(imp->imp_state == LUSTRE_IMP_REPLAY ||
imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
lustre_msg_set_status(req->rq_reqmsg, current->pid);
- rc = sptlrpc_req_refresh_ctx(req, 0);
+ /* If the request to be sent is an LDLM callback, do not try to
+ * refresh context.
+ * An LDLM callback is sent by a server to a client in order to make
+ * it release a lock, on a communication channel that uses a reverse
+ * context. It cannot be refreshed on its own, as it is the 'reverse'
+ * (server-side) representation of a client context.
+ * We do not care if the reverse context is expired, and want to send
+ * the LDLM callback anyway. Once the client receives the AST, it is
+ * its job to refresh its own context if it has expired, hence
+ * refreshing the associated reverse context on server side, before
+ * being able to send the LDLM_CANCEL requested by the server.
+ */
+ if (lustre_msg_get_opc(req->rq_reqmsg) != LDLM_BL_CALLBACK &&
+ lustre_msg_get_opc(req->rq_reqmsg) != LDLM_CP_CALLBACK &&
+ lustre_msg_get_opc(req->rq_reqmsg) != LDLM_GL_CALLBACK)
+ rc = sptlrpc_req_refresh_ctx(req, 0);
if (rc) {
if (req->rq_err) {
req->rq_status = rc;
*/
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
{
- struct list_head *tmp, *next;
+ struct ptlrpc_request *req, *next;
LIST_HEAD(comp_reqs);
int force_timer_recalc = 0;
if (atomic_read(&set->set_remaining) == 0)
RETURN(1);
- list_for_each_safe(tmp, next, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry_safe(req, next, &set->set_requests,
+ rq_set_chain) {
struct obd_import *imp = req->rq_import;
int unregistered = 0;
int async = 1;
* not corrupt any data.
*/
if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
- ptlrpc_client_recv_or_unlink(req))
+ ptlrpc_cli_wait_unlink(req))
continue;
if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
ptlrpc_client_bulk_active(req))
/*
* Check if we still need to wait for unlink.
*/
- if (ptlrpc_client_recv_or_unlink(req) ||
+ if (ptlrpc_cli_wait_unlink(req) ||
ptlrpc_client_bulk_active(req))
continue;
/* If there is no need to resend, fail it now. */
GOTO(interpret, req->rq_status);
}
+ /* don't resend too fast in case of network
+ * errors.
+ */
+ if (ktime_get_real_seconds() < (req->rq_sent + 1)
+ && req->rq_net_err && req->rq_timedout) {
+
+ DEBUG_REQ(D_INFO, req,
+ "throttle request");
+ /* Don't try to resend RPC right away
+ * as it is likely it will fail again
+ * and ptlrpc_check_set() will be
+ * called again, keeping this thread
+ * busy. Instead, wait for the next
+ * timeout. Flag it as resend to
+ * ensure we don't wait to long.
+ */
+ req->rq_resend = 1;
+ spin_unlock(&imp->imp_lock);
+ continue;
+ }
+
list_move_tail(&req->rq_list,
&imp->imp_sending_list);
req->rq_real_sent < req->rq_sent ||
req->rq_real_sent >= req->rq_deadline) ?
"timed out for sent delay" : "timed out for slow reply"),
- (s64)req->rq_sent, (s64)req->rq_real_sent);
+ req->rq_sent, req->rq_real_sent);
if (imp && obd_debug_peer_on_timeout)
LNetDebugPeer(imp->imp_connection->c_peer);
*/
void ptlrpc_expired_set(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
+ struct ptlrpc_request *req;
time64_t now = ktime_get_real_seconds();
ENTRY;
/*
* A timeout expired. See which reqs it applies to...
*/
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
/* don't expire request waiting for context */
if (req->rq_wait_ctx)
continue;
* ptlrpcd thread.
*/
ptlrpc_expire_one_request(req, 1);
+ /*
+ * Loops require that we resched once in a while to avoid
+ * RCU stalls and a few other problems.
+ */
+ cond_resched();
+
}
}
*/
static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
+ struct ptlrpc_request *req;
LASSERT(set != NULL);
CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
if (req->rq_intr)
continue;
*/
time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
time64_t now = ktime_get_real_seconds();
int timeout = 0;
struct ptlrpc_request *req;
time64_t deadline;
ENTRY;
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
/* Request in-flight? */
if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
(req->rq_phase == RQ_PHASE_BULK) ||
*/
int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
struct ptlrpc_request *req;
time64_t timeout;
int rc;
if (set->set_producer)
(void)ptlrpc_set_producer(set);
else
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
if (req->rq_phase == RQ_PHASE_NEW)
(void)ptlrpc_send_new_req(req);
}
*/
if (rc == -ETIMEDOUT &&
signal_pending(current)) {
- sigset_t blocked_sigs;
+ sigset_t old, new;
- cfs_block_sigsinv(LUSTRE_FATAL_SIGS,
- &blocked_sigs);
+ siginitset(&new, LUSTRE_FATAL_SIGS);
+ sigprocmask(SIG_BLOCK, &new, &old);
/*
* In fact we only interrupt for the
* "fatal" signals like SIGINT or
*/
if (signal_pending(current))
ptlrpc_interrupted_set(set);
- cfs_restore_sigs(&blocked_sigs);
+ sigprocmask(SIG_SETMASK, &old, NULL);
}
}
* the error cases -eeb.
*/
if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry(req, &set->set_requests,
+ rq_set_chain) {
spin_lock(&req->rq_lock);
req->rq_invalid_rqset = 1;
spin_unlock(&req->rq_lock);
LASSERT(atomic_read(&set->set_remaining) == 0);
rc = set->set_rc; /* rq_status of already freed requests if any */
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
if (req->rq_status != 0)
rc = req->rq_status;
sptlrpc_cli_free_repbuf(request);
if (request->rq_import) {
+ if (!ptlrpcd_check_work(request)) {
+ LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0);
+ atomic_dec(&request->rq_import->imp_reqs);
+ }
class_import_put(request->rq_import);
request->rq_import = NULL;
}
*/
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
{
+ bool discard = false;
/*
* Might sleep.
*/
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
request->rq_reply_deadline = ktime_get_real_seconds() +
- LONG_UNLINK;
+ PTLRPC_REQ_LONG_UNLINK;
/*
* Nothing left to do.
*/
- if (!ptlrpc_client_recv_or_unlink(request))
+ if (!__ptlrpc_cli_wait_unlink(request, &discard))
RETURN(1);
LNetMDUnlink(request->rq_reply_md_h);
+ if (discard) /* Discard the request-out callback */
+ __LNetMDUnlink(request->rq_req_md_h, discard);
+
/*
* Let's check it once again.
*/
- if (!ptlrpc_client_recv_or_unlink(request))
+ if (!ptlrpc_cli_wait_unlink(request))
RETURN(1);
/* Move to "Unregistering" phase as reply was not unlinked yet. */
wait_queue_head_t *wq = (request->rq_set) ?
&request->rq_set->set_waitq :
&request->rq_reply_waitq;
- int seconds = LONG_UNLINK;
+ int seconds = PTLRPC_REQ_LONG_UNLINK;
/*
* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs
while (seconds > 0 &&
wait_event_idle_timeout(
*wq,
- !ptlrpc_client_recv_or_unlink(request),
+ !ptlrpc_cli_wait_unlink(request),
cfs_time_seconds(1)) == 0)
seconds -= 1;
if (seconds > 0) {
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
struct obd_import *imp)
{
- struct list_head *tmp;
+ struct ptlrpc_request *iter;
assert_spin_locked(&imp->imp_lock);
LASSERT(imp->imp_replayable);
/* Balanced in ptlrpc_free_committed, usually. */
ptlrpc_request_addref(req);
- list_for_each_prev(tmp, &imp->imp_replay_list) {
- struct ptlrpc_request *iter = list_entry(tmp,
- struct ptlrpc_request,
- rq_replay_list);
-
+ list_for_each_entry_reverse(iter, &imp->imp_replay_list,
+ rq_replay_list) {
/*
* We may have duplicate transnos if we create and then
* open a file, or for closes retained if to match creating
*/
void ptlrpc_abort_inflight(struct obd_import *imp)
{
- struct list_head *tmp, *n;
+ struct ptlrpc_request *req;
ENTRY;
/*
* locked? Also, how do we know if the requests on the list are
* being freed at this time?
*/
- list_for_each_safe(tmp, n, &imp->imp_sending_list) {
- struct ptlrpc_request *req = list_entry(tmp,
- struct ptlrpc_request,
- rq_list);
-
+ list_for_each_entry(req, &imp->imp_sending_list, rq_list) {
DEBUG_REQ(D_RPCTRACE, req, "inflight");
spin_lock(&req->rq_lock);
spin_unlock(&req->rq_lock);
}
- list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_list);
-
+ list_for_each_entry(req, &imp->imp_delayed_list, rq_list) {
DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
spin_lock(&req->rq_lock);
*/
void ptlrpc_abort_set(struct ptlrpc_request_set *set)
{
- struct list_head *tmp, *pos;
+ struct ptlrpc_request *req;
LASSERT(set != NULL);
- list_for_each_safe(pos, tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(pos, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
spin_lock(&req->rq_lock);
if (req->rq_phase != RQ_PHASE_RPC) {
spin_unlock(&req->rq_lock);
|| req->rq_mbits == 0) {
req->rq_mbits = req->rq_xid;
} else {
- int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
- LNET_MAX_IOV;
- req->rq_mbits -= total_md - 1;
+ req->rq_mbits -= bd->bd_md_count - 1;
}
} else {
/*
* that server can infer the number of bulks that were prepared,
* see LU-1431
*/
- req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
- LNET_MAX_IOV) - 1;
+ req->rq_mbits += bd->bd_md_count - 1;
/*
* Set rq_xid as rq_mbits to indicate the final bulk for the old