LASSERT(ops->add_kiov_frag != NULL);
+ if (max_brw > PTLRPC_BULK_OPS_COUNT)
+ RETURN(NULL);
+
+ if (nfrags > LNET_MAX_IOV * max_brw)
+ RETURN(NULL);
+
OBD_ALLOC_PTR(desc);
if (!desc)
return NULL;
desc->bd_portal = portal;
desc->bd_type = type;
desc->bd_md_count = 0;
+ desc->bd_nob_last = LNET_MTU;
desc->bd_frag_ops = ops;
LASSERT(max_brw > 0);
desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
kiov = &desc->bd_vec[desc->bd_iov_count];
+ if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+ ((desc->bd_nob_last + len) > LNET_MTU)) {
+ desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+ desc->bd_md_count++;
+ desc->bd_nob_last = 0;
+ LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+ }
+
+ desc->bd_nob_last += len;
desc->bd_nob += len;
if (pin)
LASSERT(desc != NULL);
LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
- LASSERT(desc->bd_md_count == 0); /* network hands off */
+ LASSERT(desc->bd_refs == 0); /* network hands off */
LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
LASSERT(desc->bd_frag_ops != NULL);
*/
void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
{
- __u32 serv_est;
- int idx;
- struct imp_at *at;
-
LASSERT(req->rq_import);
if (AT_OFF) {
req->rq_timeout = req->rq_import->imp_server_timeout ?
obd_timeout / 2 : obd_timeout;
} else {
- at = &req->rq_import->imp_at;
+ struct imp_at *at = &req->rq_import->imp_at;
+ timeout_t serv_est;
+ int idx;
+
idx = import_at_get_index(req->rq_import,
req->rq_request_portal);
serv_est = at_get(&at->iat_service_estimate[idx]);
+ /*
+ * Currently a 32 bit value is sent over the
+ * wire for rq_timeout so please don't change this
+ * to time64_t. The work for LU-1158 will in time
+ * replace rq_timeout with a 64 bit nanosecond value
+ */
req->rq_timeout = at_est2timeout(serv_est);
}
/*
* We could get even fancier here, using history to predict increased
* loading...
- */
-
- /*
+ *
* Let the server know what this RPC timeout is by putting it in the
* reqmsg
*/
/* Adjust max service estimate based on server value */
static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
- unsigned int serv_est)
+ timeout_t serv_est)
{
int idx;
- unsigned int oldse;
+ timeout_t oldse;
struct imp_at *at;
LASSERT(req->rq_import);
void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
timeout_t service_timeout)
{
- unsigned int nl, oldnl;
- struct imp_at *at;
time64_t now = ktime_get_real_seconds();
+ struct imp_at *at;
+ timeout_t oldnl;
+ timeout_t nl;
LASSERT(req->rq_import);
return;
}
- /* Network latency is total time less server processing time */
- nl = max_t(int, now - req->rq_sent -
- service_timeout, 0) + 1; /* st rounding */
+ /* Network latency is total time less server processing time,
+ * st rounding
+ */
+ nl = max_t(timeout_t, now - req->rq_sent - service_timeout, 0) + 1;
at = &req->rq_import->imp_at;
oldnl = at_measured(&at->iat_net_latency, nl);
}
if (fail_t) {
- *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
+ *fail_t = ktime_get_real_seconds() +
+ PTLRPC_REQ_LONG_UNLINK;
if (fail2_t)
*fail2_t = ktime_get_real_seconds() +
- LONG_UNLINK;
+ PTLRPC_REQ_LONG_UNLINK;
/*
* The RPC is infected, let the test to change the
LASSERT(!request->rq_pool);
sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
out_free:
+ atomic_dec(&imp->imp_reqs);
class_import_put(imp);
return rc;
if (request) {
ptlrpc_cli_req_init(request);
- LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
+ LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
LASSERT(imp != LP_POISON);
LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
imp->imp_client);
LASSERT(imp->imp_client != LP_POISON);
request->rq_import = class_import_get(imp);
+ atomic_inc(&imp->imp_reqs);
} else {
CERROR("request allocation out of memory\n");
}
return request;
}
+static int ptlrpc_reconnect_if_idle(struct obd_import *imp)
+{
+ int rc;
+
+ /*
+ * initiate connection if needed when the import has been
+ * referenced by the new request to avoid races with disconnect.
+ * serialize this check against conditional state=IDLE
+ * in ptlrpc_disconnect_idle_interpret()
+ */
+ spin_lock(&imp->imp_lock);
+ if (imp->imp_state == LUSTRE_IMP_IDLE) {
+ imp->imp_generation++;
+ imp->imp_initiated_at = imp->imp_generation;
+ imp->imp_state = LUSTRE_IMP_NEW;
+
+ /* connect_import_locked releases imp_lock */
+ rc = ptlrpc_connect_import_locked(imp);
+ if (rc)
+ return rc;
+ ptlrpc_pinger_add_import(imp);
+ } else {
+ spin_unlock(&imp->imp_lock);
+ }
+ return 0;
+}
+
/**
* Helper function for creating a request.
* Calls __ptlrpc_request_alloc to allocate new request sturcture and inits
if (!request)
return NULL;
- /*
- * initiate connection if needed when the import has been
- * referenced by the new request to avoid races with disconnect
- */
- if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) {
- int rc;
-
- CDEBUG_LIMIT(imp->imp_idle_debug,
- "%s: reconnect after %llds idle\n",
- imp->imp_obd->obd_name, ktime_get_real_seconds() -
- imp->imp_last_reply_time);
- spin_lock(&imp->imp_lock);
- if (imp->imp_state == LUSTRE_IMP_IDLE) {
- imp->imp_generation++;
- imp->imp_initiated_at = imp->imp_generation;
- imp->imp_state = LUSTRE_IMP_NEW;
-
- /* connect_import_locked releases imp_lock */
- rc = ptlrpc_connect_import_locked(imp);
- if (rc < 0) {
- ptlrpc_request_free(request);
- return NULL;
- }
- ptlrpc_pinger_add_import(imp);
- } else {
- spin_unlock(&imp->imp_lock);
+ /* don't make expensive check for idling connection
+ * if it's already connected */
+ if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) {
+ if (ptlrpc_reconnect_if_idle(imp) < 0) {
+ atomic_dec(&imp->imp_reqs);
+ ptlrpc_request_free(request);
+ return NULL;
}
}
GOTO(interpret, req->rq_status);
}
+ /* don't resend too fast in case of network
+ * errors.
+ */
+ if (ktime_get_real_seconds() < (req->rq_sent + 1)
+ && req->rq_net_err && req->rq_timedout) {
+
+ DEBUG_REQ(D_INFO, req,
+ "throttle request");
+ /* Don't try to resend RPC right away
+ * as it is likely it will fail again
+ * and ptlrpc_check_set() will be
+ * called again, keeping this thread
+ * busy. Instead, wait for the next
+ * timeout. Flag it as resend to
+ * ensure we don't wait to long.
+ */
+ req->rq_resend = 1;
+ spin_unlock(&imp->imp_lock);
+ continue;
+ }
+
list_move_tail(&req->rq_list,
&imp->imp_sending_list);
req->rq_real_sent < req->rq_sent ||
req->rq_real_sent >= req->rq_deadline) ?
"timed out for sent delay" : "timed out for slow reply"),
- (s64)req->rq_sent, (s64)req->rq_real_sent);
+ req->rq_sent, req->rq_real_sent);
if (imp && obd_debug_peer_on_timeout)
LNetDebugPeer(imp->imp_connection->c_peer);
*/
if (rc == -ETIMEDOUT &&
signal_pending(current)) {
- sigset_t blocked_sigs;
+ sigset_t old, new;
- cfs_block_sigsinv(LUSTRE_FATAL_SIGS,
- &blocked_sigs);
+ siginitset(&new, LUSTRE_FATAL_SIGS);
+ sigprocmask(SIG_BLOCK, &new, &old);
/*
* In fact we only interrupt for the
* "fatal" signals like SIGINT or
*/
if (signal_pending(current))
ptlrpc_interrupted_set(set);
- cfs_restore_sigs(&blocked_sigs);
+ sigprocmask(SIG_SETMASK, &old, NULL);
}
}
sptlrpc_cli_free_repbuf(request);
if (request->rq_import) {
+ if (!ptlrpcd_check_work(request)) {
+ LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0);
+ atomic_dec(&request->rq_import->imp_reqs);
+ }
class_import_put(request->rq_import);
request->rq_import = NULL;
}
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
request->rq_reply_deadline = ktime_get_real_seconds() +
- LONG_UNLINK;
+ PTLRPC_REQ_LONG_UNLINK;
/*
* Nothing left to do.
wait_queue_head_t *wq = (request->rq_set) ?
&request->rq_set->set_waitq :
&request->rq_reply_waitq;
- int seconds = LONG_UNLINK;
+ int seconds = PTLRPC_REQ_LONG_UNLINK;
/*
* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs
|| req->rq_mbits == 0) {
req->rq_mbits = req->rq_xid;
} else {
- int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
- LNET_MAX_IOV;
- req->rq_mbits -= total_md - 1;
+ req->rq_mbits -= bd->bd_md_count - 1;
}
} else {
/*
* that server can infer the number of bulks that were prepared,
* see LU-1431
*/
- req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
- LNET_MAX_IOV) - 1;
+ req->rq_mbits += bd->bd_md_count - 1;
/*
* Set rq_xid as rq_mbits to indicate the final bulk for the old