int i;
for (i = 0; i < desc->bd_iov_count ; i++)
- put_page(BD_GET_KIOV(desc, i).kiov_page);
+ put_page(desc->bd_vec[i].bv_page);
}
static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
void *frag, int len)
{
- unsigned int offset = (uintptr_t)frag & ~PAGE_MASK;
+ unsigned int offset = (unsigned long)frag & ~PAGE_MASK;
ENTRY;
while (len > 0) {
int page_len = min_t(unsigned int, PAGE_SIZE - offset,
len);
- uintptr_t vaddr = (uintptr_t) frag;
+ unsigned long vaddr = (unsigned long)frag;
ptlrpc_prep_bulk_page_nopin(desc,
lnet_kvaddr_to_page(vaddr),
};
EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
-const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops = {
- .add_iov_frag = ptlrpc_prep_bulk_frag,
-};
-EXPORT_SYMBOL(ptlrpc_bulk_kvec_ops);
-
static int ptlrpc_send_new_req(struct ptlrpc_request *req);
static int ptlrpcd_check_work(struct ptlrpc_request *req);
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
struct ptlrpc_bulk_desc *desc;
int i;
- /* ensure that only one of KIOV or IOVEC is set but not both */
- LASSERT((ptlrpc_is_bulk_desc_kiov(type) &&
- ops->add_kiov_frag != NULL) ||
- (ptlrpc_is_bulk_desc_kvec(type) &&
- ops->add_iov_frag != NULL));
+ LASSERT(ops->add_kiov_frag != NULL);
+
+ if (max_brw > PTLRPC_BULK_OPS_COUNT)
+ RETURN(NULL);
+
+ if (nfrags > LNET_MAX_IOV * max_brw)
+ RETURN(NULL);
OBD_ALLOC_PTR(desc);
if (!desc)
return NULL;
- if (type & PTLRPC_BULK_BUF_KIOV) {
- OBD_ALLOC_LARGE(GET_KIOV(desc),
- nfrags * sizeof(*GET_KIOV(desc)));
- if (!GET_KIOV(desc))
- goto out;
- } else {
- OBD_ALLOC_LARGE(GET_KVEC(desc),
- nfrags * sizeof(*GET_KVEC(desc)));
- if (!GET_KVEC(desc))
- goto out;
- }
+
+ OBD_ALLOC_LARGE(desc->bd_vec,
+ nfrags * sizeof(*desc->bd_vec));
+ if (!desc->bd_vec)
+ goto out;
spin_lock_init(&desc->bd_lock);
init_waitqueue_head(&desc->bd_waitq);
desc->bd_portal = portal;
desc->bd_type = type;
desc->bd_md_count = 0;
+ desc->bd_nob_last = LNET_MTU;
desc->bd_frag_ops = ops;
LASSERT(max_brw > 0);
desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
struct page *page, int pageoffset, int len,
int pin)
{
- lnet_kiov_t *kiov;
+ struct bio_vec *kiov;
LASSERT(desc->bd_iov_count < desc->bd_max_iov);
LASSERT(page != NULL);
LASSERT(pageoffset >= 0);
LASSERT(len > 0);
LASSERT(pageoffset + len <= PAGE_SIZE);
- LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
- kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
+ kiov = &desc->bd_vec[desc->bd_iov_count];
+
+ if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+ ((desc->bd_nob_last + len) > LNET_MTU)) {
+ desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+ desc->bd_md_count++;
+ desc->bd_nob_last = 0;
+ LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+ }
+ desc->bd_nob_last += len;
desc->bd_nob += len;
if (pin)
get_page(page);
- kiov->kiov_page = page;
- kiov->kiov_offset = pageoffset;
- kiov->kiov_len = len;
+ kiov->bv_page = page;
+ kiov->bv_offset = pageoffset;
+ kiov->bv_len = len;
desc->bd_iov_count++;
}
EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
-int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
- void *frag, int len)
-{
- struct kvec *iovec;
-
- ENTRY;
-
- LASSERT(desc->bd_iov_count < desc->bd_max_iov);
- LASSERT(frag != NULL);
- LASSERT(len > 0);
- LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
-
- iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
-
- desc->bd_nob += len;
-
- iovec->iov_base = frag;
- iovec->iov_len = len;
-
- desc->bd_iov_count++;
-
- RETURN(desc->bd_nob);
-}
-EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
-
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
ENTRY;
LASSERT(desc != NULL);
LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
- LASSERT(desc->bd_md_count == 0); /* network hands off */
+ LASSERT(desc->bd_refs == 0); /* network hands off */
LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
LASSERT(desc->bd_frag_ops != NULL);
- if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
- sptlrpc_enc_pool_put_pages(desc);
+ sptlrpc_enc_pool_put_pages(desc);
if (desc->bd_export)
class_export_put(desc->bd_export);
if (desc->bd_frag_ops->release_frags != NULL)
desc->bd_frag_ops->release_frags(desc);
- if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
- OBD_FREE_LARGE(GET_KIOV(desc),
- desc->bd_max_iov * sizeof(*GET_KIOV(desc)));
- else
- OBD_FREE_LARGE(GET_KVEC(desc),
- desc->bd_max_iov * sizeof(*GET_KVEC(desc)));
+ OBD_FREE_LARGE(desc->bd_vec,
+ desc->bd_max_iov * sizeof(*desc->bd_vec));
OBD_FREE_PTR(desc);
EXIT;
}
*/
void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
{
- __u32 serv_est;
- int idx;
- struct imp_at *at;
-
LASSERT(req->rq_import);
if (AT_OFF) {
req->rq_timeout = req->rq_import->imp_server_timeout ?
obd_timeout / 2 : obd_timeout;
} else {
- at = &req->rq_import->imp_at;
+ struct imp_at *at = &req->rq_import->imp_at;
+ timeout_t serv_est;
+ int idx;
+
idx = import_at_get_index(req->rq_import,
req->rq_request_portal);
serv_est = at_get(&at->iat_service_estimate[idx]);
+ /*
+ * Currently a 32 bit value is sent over the
+ * wire for rq_timeout so please don't change this
+ * to time64_t. The work for LU-1158 will in time
+ * replace rq_timeout with a 64 bit nanosecond value
+ */
req->rq_timeout = at_est2timeout(serv_est);
}
/*
* We could get even fancier here, using history to predict increased
* loading...
- */
-
- /*
+ *
* Let the server know what this RPC timeout is by putting it in the
* reqmsg
*/
/* Adjust max service estimate based on server value */
static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
- unsigned int serv_est)
+ timeout_t serv_est)
{
int idx;
- unsigned int oldse;
+ timeout_t oldse;
struct imp_at *at;
LASSERT(req->rq_import);
/* Adjust expected network latency */
void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
- unsigned int service_time)
+ timeout_t service_timeout)
{
- unsigned int nl, oldnl;
- struct imp_at *at;
time64_t now = ktime_get_real_seconds();
+ struct imp_at *at;
+ timeout_t oldnl;
+ timeout_t nl;
LASSERT(req->rq_import);
- if (service_time > now - req->rq_sent + 3) {
+ if (service_timeout > now - req->rq_sent + 3) {
/*
* b=16408, however, this can also happen if early reply
* is lost and client RPC is expired and resent, early reply
CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
D_ADAPTTO : D_WARNING,
"Reported service time %u > total measured time %lld\n",
- service_time, now - req->rq_sent);
+ service_timeout, now - req->rq_sent);
return;
}
- /* Network latency is total time less server processing time */
- nl = max_t(int, now - req->rq_sent -
- service_time, 0) + 1; /* st rounding */
+ /* Network latency is total time less server processing time,
+ * st rounding
+ */
+ nl = max_t(timeout_t, now - req->rq_sent - service_timeout, 0) + 1;
at = &req->rq_import->imp_at;
oldnl = at_measured(&at->iat_net_latency, nl);
__must_hold(&req->rq_lock)
{
struct ptlrpc_request *early_req;
+ timeout_t service_timeout;
time64_t olddl;
int rc;
lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
/* Network latency can be adjusted, it is pure network delays */
- ptlrpc_at_adj_net_latency(req,
- lustre_msg_get_service_time(early_req->rq_repmsg));
+ service_timeout = lustre_msg_get_service_timeout(early_req->rq_repmsg);
+ ptlrpc_at_adj_net_latency(req, service_timeout);
sptlrpc_cli_finish_early_reply(early_req);
*/
void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
{
- struct list_head *l, *tmp;
struct ptlrpc_request *req;
LASSERT(pool != NULL);
spin_lock(&pool->prp_lock);
- list_for_each_safe(l, tmp, &pool->prp_req_list) {
- req = list_entry(l, struct ptlrpc_request, rq_list);
+ while ((req = list_first_entry_or_null(&pool->prp_req_list,
+ struct ptlrpc_request,
+ rq_list))) {
list_del(&req->rq_list);
LASSERT(req->rq_reqbuf);
LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
void ptlrpc_add_unreplied(struct ptlrpc_request *req)
{
struct obd_import *imp = req->rq_import;
- struct list_head *tmp;
struct ptlrpc_request *iter;
assert_spin_locked(&imp->imp_lock);
LASSERT(list_empty(&req->rq_unreplied_list));
/* unreplied list is sorted by xid in ascending order */
- list_for_each_prev(tmp, &imp->imp_unreplied_list) {
- iter = list_entry(tmp, struct ptlrpc_request,
- rq_unreplied_list);
-
+ list_for_each_entry_reverse(iter, &imp->imp_unreplied_list,
+ rq_unreplied_list) {
LASSERT(req->rq_xid != iter->rq_xid);
if (req->rq_xid < iter->rq_xid)
continue;
}
if (fail_t) {
- *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
+ *fail_t = ktime_get_real_seconds() +
+ PTLRPC_REQ_LONG_UNLINK;
if (fail2_t)
*fail2_t = ktime_get_real_seconds() +
- LONG_UNLINK;
+ PTLRPC_REQ_LONG_UNLINK;
/*
* The RPC is infected, let the test to change the
LASSERT(!request->rq_pool);
sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
out_free:
+ atomic_dec(&imp->imp_reqs);
class_import_put(imp);
return rc;
if (request) {
ptlrpc_cli_req_init(request);
- LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
+ LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
LASSERT(imp != LP_POISON);
LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
imp->imp_client);
LASSERT(imp->imp_client != LP_POISON);
request->rq_import = class_import_get(imp);
+ atomic_inc(&imp->imp_reqs);
} else {
CERROR("request allocation out of memory\n");
}
return request;
}
+static int ptlrpc_reconnect_if_idle(struct obd_import *imp)
+{
+ int rc;
+
+ /*
+ * initiate connection if needed when the import has been
+ * referenced by the new request to avoid races with disconnect.
+ * serialize this check against conditional state=IDLE
+ * in ptlrpc_disconnect_idle_interpret()
+ */
+ spin_lock(&imp->imp_lock);
+ if (imp->imp_state == LUSTRE_IMP_IDLE) {
+ imp->imp_generation++;
+ imp->imp_initiated_at = imp->imp_generation;
+ imp->imp_state = LUSTRE_IMP_NEW;
+
+ /* connect_import_locked releases imp_lock */
+ rc = ptlrpc_connect_import_locked(imp);
+ if (rc)
+ return rc;
+ ptlrpc_pinger_add_import(imp);
+ } else {
+ spin_unlock(&imp->imp_lock);
+ }
+ return 0;
+}
+
/**
* Helper function for creating a request.
* Calls __ptlrpc_request_alloc to allocate new request sturcture and inits
if (!request)
return NULL;
- /*
- * initiate connection if needed when the import has been
- * referenced by the new request to avoid races with disconnect
- */
- if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) {
- int rc;
-
- CDEBUG_LIMIT(imp->imp_idle_debug,
- "%s: reconnect after %llds idle\n",
- imp->imp_obd->obd_name, ktime_get_real_seconds() -
- imp->imp_last_reply_time);
- spin_lock(&imp->imp_lock);
- if (imp->imp_state == LUSTRE_IMP_IDLE) {
- imp->imp_generation++;
- imp->imp_initiated_at = imp->imp_generation;
- imp->imp_state = LUSTRE_IMP_NEW;
-
- /* connect_import_locked releases imp_lock */
- rc = ptlrpc_connect_import_locked(imp);
- if (rc < 0) {
- ptlrpc_request_free(request);
- return NULL;
- }
- ptlrpc_pinger_add_import(imp);
- } else {
- spin_unlock(&imp->imp_lock);
+ /* don't make expensive check for idling connection
+ * if it's already connected */
+ if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) {
+ if (ptlrpc_reconnect_if_idle(imp) < 0) {
+ atomic_dec(&imp->imp_reqs);
+ ptlrpc_request_free(request);
+ return NULL;
}
}
*/
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
- struct list_head *next;
+ struct ptlrpc_request *req;
int expected_phase;
int n = 0;
/* Requests on the set should either all be completed, or all be new */
expected_phase = (atomic_read(&set->set_remaining) == 0) ?
RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
LASSERT(req->rq_phase == expected_phase);
n++;
}
atomic_read(&set->set_remaining) == n, "%d / %d\n",
atomic_read(&set->set_remaining), n);
- list_for_each_safe(tmp, next, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ while ((req = list_first_entry_or_null(&set->set_requests,
+ struct ptlrpc_request,
+ rq_set_chain))) {
list_del_init(&req->rq_set_chain);
LASSERT(req->rq_phase == expected_phase);
void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
struct ptlrpc_request *req)
{
+ if (set == PTLRPCD_SET) {
+ ptlrpcd_add_req(req);
+ return;
+ }
+
LASSERT(req->rq_import->imp_state != LUSTRE_IMP_IDLE);
LASSERT(list_empty(&req->rq_set_chain));
} else if (req->rq_no_delay &&
imp->imp_generation != imp->imp_initiated_at) {
/* ignore nodelay for requests initiating connections */
- *status = -EWOULDBLOCK;
+ *status = -EAGAIN;
} else if (req->rq_allow_replay &&
(imp->imp_state == LUSTRE_IMP_REPLAY ||
imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
ptlrpc_at_adj_net_latency(req,
- lustre_msg_get_service_time(req->rq_repmsg));
+ lustre_msg_get_service_timeout(req->rq_repmsg));
rc = ptlrpc_check_status(req);
lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
- lustre_msg_set_status(req->rq_reqmsg, current_pid());
+ lustre_msg_set_status(req->rq_reqmsg, current->pid);
- rc = sptlrpc_req_refresh_ctx(req, -1);
+ rc = sptlrpc_req_refresh_ctx(req, 0);
if (rc) {
if (req->rq_err) {
req->rq_status = rc;
CDEBUG(D_RPCTRACE,
"Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
- req, current_comm(),
+ req, current->comm,
imp->imp_obd->obd_uuid.uuid,
lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg),
*/
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
{
- struct list_head *tmp, *next;
+ struct ptlrpc_request *req, *next;
LIST_HEAD(comp_reqs);
int force_timer_recalc = 0;
if (atomic_read(&set->set_remaining) == 0)
RETURN(1);
- list_for_each_safe(tmp, next, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry_safe(req, next, &set->set_requests,
+ rq_set_chain) {
struct obd_import *imp = req->rq_import;
int unregistered = 0;
int async = 1;
* not corrupt any data.
*/
if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
- ptlrpc_client_recv_or_unlink(req))
+ ptlrpc_cli_wait_unlink(req))
continue;
if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
ptlrpc_client_bulk_active(req))
/*
* Check if we still need to wait for unlink.
*/
- if (ptlrpc_client_recv_or_unlink(req) ||
+ if (ptlrpc_cli_wait_unlink(req) ||
ptlrpc_client_bulk_active(req))
continue;
/* If there is no need to resend, fail it now. */
}
/*
- * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
+ * ptlrpc_set_wait uses l_wait_event_abortable_timeout()
* so it sets rq_intr regardless of individual rpc
* timeouts. The synchronous IO waiting path sets
* rq_intr irrespective of whether ptlrpcd
GOTO(interpret, req->rq_status);
}
+ /* don't resend too fast in case of network
+ * errors.
+ */
+ if (ktime_get_real_seconds() < (req->rq_sent + 1)
+ && req->rq_net_err && req->rq_timedout) {
+
+ DEBUG_REQ(D_INFO, req,
+ "throttle request");
+ /* Don't try to resend RPC right away
+ * as it is likely it will fail again
+ * and ptlrpc_check_set() will be
+ * called again, keeping this thread
+ * busy. Instead, wait for the next
+ * timeout. Flag it as resend to
+ * ensure we don't wait to long.
+ */
+ req->rq_resend = 1;
+ spin_unlock(&imp->imp_lock);
+ continue;
+ }
+
list_move_tail(&req->rq_list,
&imp->imp_sending_list);
* rq_wait_ctx is only touched by ptlrpcd,
* so no lock is needed here.
*/
- status = sptlrpc_req_refresh_ctx(req, -1);
+ status = sptlrpc_req_refresh_ctx(req, 0);
if (status) {
if (req->rq_err) {
req->rq_status = status;
if (req->rq_reqmsg)
CDEBUG(D_RPCTRACE,
"Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
- req, current_comm(),
+ req, current->comm,
imp->imp_obd->obd_uuid.uuid,
lustre_msg_get_status(req->rq_reqmsg),
req->rq_xid,
req->rq_real_sent < req->rq_sent ||
req->rq_real_sent >= req->rq_deadline) ?
"timed out for sent delay" : "timed out for slow reply"),
- (s64)req->rq_sent, (s64)req->rq_real_sent);
+ req->rq_sent, req->rq_real_sent);
if (imp && obd_debug_peer_on_timeout)
LNetDebugPeer(imp->imp_connection->c_peer);
/**
* Time out all uncompleted requests in request set pointed by \a data
- * Callback used when waiting on sets with l_wait_event.
- * Always returns 1.
+ * This is called when a wait times out.
*/
void ptlrpc_expired_set(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
+ struct ptlrpc_request *req;
time64_t now = ktime_get_real_seconds();
ENTRY;
/*
* A timeout expired. See which reqs it applies to...
*/
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
/* don't expire request waiting for context */
if (req->rq_wait_ctx)
continue;
* ptlrpcd thread.
*/
ptlrpc_expire_one_request(req, 1);
- }
-}
+ /*
+ * Loops require that we resched once in a while to avoid
+ * RCU stalls and a few other problems.
+ */
+ cond_resched();
-/**
- * Sets rq_intr flag in \a req under spinlock.
- */
-void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
-{
- spin_lock(&req->rq_lock);
- req->rq_intr = 1;
- spin_unlock(&req->rq_lock);
+ }
}
-EXPORT_SYMBOL(ptlrpc_mark_interrupted);
/**
* Interrupts (sets interrupted flag) all uncompleted requests in
- * a set \a data. Callback for l_wait_event for interruptible waits.
+ * a set \a data. This is called when a wait_event is interrupted
+ * by a signal.
*/
static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
+ struct ptlrpc_request *req;
LASSERT(set != NULL);
CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
if (req->rq_intr)
continue;
!req->rq_allow_intr)
continue;
- ptlrpc_mark_interrupted(req);
+ spin_lock(&req->rq_lock);
+ req->rq_intr = 1;
+ spin_unlock(&req->rq_lock);
}
}
*/
time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
time64_t now = ktime_get_real_seconds();
int timeout = 0;
struct ptlrpc_request *req;
time64_t deadline;
ENTRY;
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
/* Request in-flight? */
if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
(req->rq_phase == RQ_PHASE_BULK) ||
*/
int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
struct ptlrpc_request *req;
time64_t timeout;
int rc;
if (set->set_producer)
(void)ptlrpc_set_producer(set);
else
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
if (req->rq_phase == RQ_PHASE_NEW)
(void)ptlrpc_send_new_req(req);
}
*/
if (rc == -ETIMEDOUT &&
signal_pending(current)) {
- sigset_t blocked_sigs =
- cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+ sigset_t old, new;
+ siginitset(&new, LUSTRE_FATAL_SIGS);
+ sigprocmask(SIG_BLOCK, &new, &old);
/*
* In fact we only interrupt for the
* "fatal" signals like SIGINT or
*/
if (signal_pending(current))
ptlrpc_interrupted_set(set);
- cfs_restore_sigs(blocked_sigs);
+ sigprocmask(SIG_SETMASK, &old, NULL);
}
}
* the error cases -eeb.
*/
if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry(req, &set->set_requests,
+ rq_set_chain) {
spin_lock(&req->rq_lock);
req->rq_invalid_rqset = 1;
spin_unlock(&req->rq_lock);
LASSERT(atomic_read(&set->set_remaining) == 0);
rc = set->set_rc; /* rq_status of already freed requests if any */
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
if (req->rq_status != 0)
rc = req->rq_status;
sptlrpc_cli_free_repbuf(request);
if (request->rq_import) {
+ if (!ptlrpcd_check_work(request)) {
+ LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0);
+ atomic_dec(&request->rq_import->imp_reqs);
+ }
class_import_put(request->rq_import);
request->rq_import = NULL;
}
*/
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
{
+ bool discard = false;
/*
* Might sleep.
*/
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
request->rq_reply_deadline = ktime_get_real_seconds() +
- LONG_UNLINK;
+ PTLRPC_REQ_LONG_UNLINK;
/*
* Nothing left to do.
*/
- if (!ptlrpc_client_recv_or_unlink(request))
+ if (!__ptlrpc_cli_wait_unlink(request, &discard))
RETURN(1);
LNetMDUnlink(request->rq_reply_md_h);
+ if (discard) /* Discard the request-out callback */
+ __LNetMDUnlink(request->rq_req_md_h, discard);
+
/*
* Let's check it once again.
*/
- if (!ptlrpc_client_recv_or_unlink(request))
+ if (!ptlrpc_cli_wait_unlink(request))
RETURN(1);
/* Move to "Unregistering" phase as reply was not unlinked yet. */
RETURN(0);
/*
- * We have to l_wait_event() whatever the result, to give liblustre
+ * We have to wait_event_idle_timeout() whatever the result, to get
* a chance to run reply_in_callback(), and to make sure we've
* unlinked before returning a req to the pool.
*/
wait_queue_head_t *wq = (request->rq_set) ?
&request->rq_set->set_waitq :
&request->rq_reply_waitq;
- int seconds = LONG_UNLINK;
+ int seconds = PTLRPC_REQ_LONG_UNLINK;
/*
* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs
while (seconds > 0 &&
wait_event_idle_timeout(
*wq,
- !ptlrpc_client_recv_or_unlink(request),
+ !ptlrpc_cli_wait_unlink(request),
cfs_time_seconds(1)) == 0)
seconds -= 1;
if (seconds > 0) {
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
struct obd_import *imp)
{
- struct list_head *tmp;
+ struct ptlrpc_request *iter;
assert_spin_locked(&imp->imp_lock);
LASSERT(imp->imp_replayable);
/* Balanced in ptlrpc_free_committed, usually. */
ptlrpc_request_addref(req);
- list_for_each_prev(tmp, &imp->imp_replay_list) {
- struct ptlrpc_request *iter = list_entry(tmp,
- struct ptlrpc_request,
- rq_replay_list);
-
+ list_for_each_entry_reverse(iter, &imp->imp_replay_list,
+ rq_replay_list) {
/*
* We may have duplicate transnos if we create and then
* open a file, or for closes retained if to match creating
}
/* for distributed debugging */
- lustre_msg_set_status(req->rq_reqmsg, current_pid());
+ lustre_msg_set_status(req->rq_reqmsg, current->pid);
/* add a ref for the set (see comment in ptlrpc_set_add_req) */
ptlrpc_request_addref(req);
ptlrpc_at_set_req_timeout(req);
/* Tell server net_latency to calculate how long to wait for reply. */
- lustre_msg_set_service_time(req->rq_reqmsg,
- ptlrpc_at_get_net_latency(req));
+ lustre_msg_set_service_timeout(req->rq_reqmsg,
+ ptlrpc_at_get_net_latency(req));
DEBUG_REQ(D_HA, req, "REPLAY");
atomic_inc(&req->rq_import->imp_replay_inflight);
*/
void ptlrpc_abort_inflight(struct obd_import *imp)
{
- struct list_head *tmp, *n;
+ struct ptlrpc_request *req;
ENTRY;
/*
* locked? Also, how do we know if the requests on the list are
* being freed at this time?
*/
- list_for_each_safe(tmp, n, &imp->imp_sending_list) {
- struct ptlrpc_request *req = list_entry(tmp,
- struct ptlrpc_request,
- rq_list);
-
+ list_for_each_entry(req, &imp->imp_sending_list, rq_list) {
DEBUG_REQ(D_RPCTRACE, req, "inflight");
spin_lock(&req->rq_lock);
spin_unlock(&req->rq_lock);
}
- list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_list);
-
+ list_for_each_entry(req, &imp->imp_delayed_list, rq_list) {
DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
spin_lock(&req->rq_lock);
*/
void ptlrpc_abort_set(struct ptlrpc_request_set *set)
{
- struct list_head *tmp, *pos;
+ struct ptlrpc_request *req;
LASSERT(set != NULL);
- list_for_each_safe(pos, tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(pos, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
spin_lock(&req->rq_lock);
if (req->rq_phase != RQ_PHASE_RPC) {
spin_unlock(&req->rq_lock);
|| req->rq_mbits == 0) {
req->rq_mbits = req->rq_xid;
} else {
- int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
- LNET_MAX_IOV;
- req->rq_mbits -= total_md - 1;
+ req->rq_mbits -= bd->bd_md_count - 1;
}
} else {
/*
* that server can infer the number of bulks that were prepared,
* see LU-1431
*/
- req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
- LNET_MAX_IOV) - 1;
+ req->rq_mbits += bd->bd_md_count - 1;
/*
* Set rq_xid as rq_mbits to indicate the final bulk for the old