#include <linux/delay.h>
#include <linux/random.h>
+#include <lnet/lib-lnet.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
int i;
for (i = 0; i < desc->bd_iov_count; i++)
- put_page(BD_GET_KIOV(desc, i).kiov_page);
+ put_page(desc->bd_vec[i].bv_page);
+}
+
+static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
+ void *frag, int len)
+{
+ unsigned int offset = (unsigned long)frag & ~PAGE_MASK;
+
+ ENTRY;
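+ /*
+ * Walk the kernel-virtual fragment in page-sized chunks: map each
+ * chunk back to its struct page and queue it as a kiov entry. Only
+ * the first chunk can start at a non-zero page offset.
+ */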
+ while (len > 0) {
+ int page_len = min_t(unsigned int, PAGE_SIZE - offset,
+ len);
+ unsigned long vaddr = (unsigned long)frag;
+
+ ptlrpc_prep_bulk_page_nopin(desc,
+ lnet_kvaddr_to_page(vaddr),
+ offset, page_len);
+ offset = 0;
+ len -= page_len;
+ frag += page_len;
+ }
+
+ RETURN(desc->bd_nob);
}
const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
.add_kiov_frag = ptlrpc_prep_bulk_page_nopin,
.release_frags = ptlrpc_release_bulk_noop,
+ .add_iov_frag = ptlrpc_prep_bulk_frag_pages,
};
EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
-const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops = {
- .add_iov_frag = ptlrpc_prep_bulk_frag,
-};
-EXPORT_SYMBOL(ptlrpc_bulk_kvec_ops);
-
static int ptlrpc_send_new_req(struct ptlrpc_request *req);
static int ptlrpcd_check_work(struct ptlrpc_request *req);
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
struct ptlrpc_bulk_desc *desc;
int i;
- /* ensure that only one of KIOV or IOVEC is set but not both */
- LASSERT((ptlrpc_is_bulk_desc_kiov(type) &&
- ops->add_kiov_frag != NULL) ||
- (ptlrpc_is_bulk_desc_kvec(type) &&
- ops->add_iov_frag != NULL));
+ LASSERT(ops->add_kiov_frag != NULL);
+
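+ /*
+ * An RPC carries at most PTLRPC_BULK_OPS_COUNT bulk MDs, each of
+ * at most LNET_MAX_IOV fragments, so reject anything larger.
+ */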
+ if (max_brw > PTLRPC_BULK_OPS_COUNT)
+ RETURN(NULL);
+
+ if (nfrags > LNET_MAX_IOV * max_brw)
+ RETURN(NULL);
OBD_ALLOC_PTR(desc);
if (!desc)
return NULL;
- if (type & PTLRPC_BULK_BUF_KIOV) {
- OBD_ALLOC_LARGE(GET_KIOV(desc),
- nfrags * sizeof(*GET_KIOV(desc)));
- if (!GET_KIOV(desc))
- goto out;
- } else {
- OBD_ALLOC_LARGE(GET_KVEC(desc),
- nfrags * sizeof(*GET_KVEC(desc)));
- if (!GET_KVEC(desc))
- goto out;
- }
+
+ OBD_ALLOC_LARGE(desc->bd_vec,
+ nfrags * sizeof(*desc->bd_vec));
+ if (!desc->bd_vec)
+ goto out;
spin_lock_init(&desc->bd_lock);
init_waitqueue_head(&desc->bd_waitq);
desc->bd_portal = portal;
desc->bd_type = type;
desc->bd_md_count = 0;
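+ /* start "full" so the first page added by __ptlrpc_prep_bulk_page()
+ * always opens a fresh MD accounting window */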
+ desc->bd_nob_last = LNET_MTU;
desc->bd_frag_ops = ops;
LASSERT(max_brw > 0);
desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
if (!desc)
RETURN(NULL);
- desc->bd_import_generation = req->rq_import_generation;
desc->bd_import = class_import_get(imp);
desc->bd_req = req;
struct page *page, int pageoffset, int len,
int pin)
{
- lnet_kiov_t *kiov;
+ struct bio_vec *kiov;
LASSERT(desc->bd_iov_count < desc->bd_max_iov);
LASSERT(page != NULL);
LASSERT(pageoffset >= 0);
LASSERT(len > 0);
LASSERT(pageoffset + len <= PAGE_SIZE);
- LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
- kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
+ kiov = &desc->bd_vec[desc->bd_iov_count];
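+ /* roll over to a new bulk MD once the current one holds
+ * LNET_MAX_IOV fragments or would exceed LNET_MTU bytes */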
+ if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+ ((desc->bd_nob_last + len) > LNET_MTU)) {
+ desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+ desc->bd_md_count++;
+ desc->bd_nob_last = 0;
+ LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+ }
+
+ desc->bd_nob_last += len;
desc->bd_nob += len;
if (pin)
get_page(page);
- kiov->kiov_page = page;
- kiov->kiov_offset = pageoffset;
- kiov->kiov_len = len;
+ kiov->bv_page = page;
+ kiov->bv_offset = pageoffset;
+ kiov->bv_len = len;
desc->bd_iov_count++;
}
EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
-int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
- void *frag, int len)
-{
- struct kvec *iovec;
-
- ENTRY;
-
- LASSERT(desc->bd_iov_count < desc->bd_max_iov);
- LASSERT(frag != NULL);
- LASSERT(len > 0);
- LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
-
- iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
-
- desc->bd_nob += len;
-
- iovec->iov_base = frag;
- iovec->iov_len = len;
-
- desc->bd_iov_count++;
-
- RETURN(desc->bd_nob);
-}
-EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
-
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
ENTRY;
LASSERT(desc != NULL);
LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
- LASSERT(desc->bd_md_count == 0); /* network hands off */
+ LASSERT(desc->bd_refs == 0); /* network hands off */
LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
LASSERT(desc->bd_frag_ops != NULL);
- if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
- sptlrpc_enc_pool_put_pages(desc);
+ sptlrpc_enc_pool_put_pages(desc);
if (desc->bd_export)
class_export_put(desc->bd_export);
if (desc->bd_frag_ops->release_frags != NULL)
desc->bd_frag_ops->release_frags(desc);
- if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
- OBD_FREE_LARGE(GET_KIOV(desc),
- desc->bd_max_iov * sizeof(*GET_KIOV(desc)));
- else
- OBD_FREE_LARGE(GET_KVEC(desc),
- desc->bd_max_iov * sizeof(*GET_KVEC(desc)));
+ OBD_FREE_LARGE(desc->bd_vec,
+ desc->bd_max_iov * sizeof(*desc->bd_vec));
OBD_FREE_PTR(desc);
EXIT;
}
*/
void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
{
- __u32 serv_est;
- int idx;
- struct imp_at *at;
-
LASSERT(req->rq_import);
if (AT_OFF) {
req->rq_timeout = req->rq_import->imp_server_timeout ?
obd_timeout / 2 : obd_timeout;
} else {
- at = &req->rq_import->imp_at;
+ struct imp_at *at = &req->rq_import->imp_at;
+ timeout_t serv_est;
+ int idx;
+
idx = import_at_get_index(req->rq_import,
req->rq_request_portal);
serv_est = at_get(&at->iat_service_estimate[idx]);
+ /*
+ * Currently a 32 bit value is sent over the
+ * wire for rq_timeout so please don't change this
+ * to time64_t. The work for LU-1158 will in time
+ * replace rq_timeout with a 64 bit nanosecond value
+ */
req->rq_timeout = at_est2timeout(serv_est);
}
/*
* We could get even fancier here, using history to predict increased
* loading...
- */
-
- /*
+ *
* Let the server know what this RPC timeout is by putting it in the
* reqmsg
*/
/* Adjust max service estimate based on server value */
static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
- unsigned int serv_est)
+ timeout_t serv_est)
{
int idx;
- unsigned int oldse;
+ timeout_t oldse;
struct imp_at *at;
LASSERT(req->rq_import);
/* Adjust expected network latency */
void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
- unsigned int service_time)
+ timeout_t service_timeout)
{
- unsigned int nl, oldnl;
- struct imp_at *at;
time64_t now = ktime_get_real_seconds();
+ struct imp_at *at;
+ timeout_t oldnl;
+ timeout_t nl;
LASSERT(req->rq_import);
- if (service_time > now - req->rq_sent + 3) {
+ if (service_timeout > now - req->rq_sent + 3) {
/*
* b=16408, however, this can also happen if early reply
* is lost and client RPC is expired and resent, early reply
CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
D_ADAPTTO : D_WARNING,
"Reported service time %u > total measured time %lld\n",
- service_time, now - req->rq_sent);
+ service_timeout, now - req->rq_sent);
return;
}
- /* Network latency is total time less server processing time */
- nl = max_t(int, now - req->rq_sent -
- service_time, 0) + 1; /* st rounding */
+ /* Network latency is total time less server processing time,
+ * st rounding
+ */
+ nl = max_t(timeout_t, now - req->rq_sent - service_timeout, 0) + 1;
at = &req->rq_import->imp_at;
oldnl = at_measured(&at->iat_net_latency, nl);
if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
+ DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d",
+ rc);
return -EPROTO;
}
}
rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
+ DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d",
+ rc);
return -EPROTO;
}
return 0;
__must_hold(&req->rq_lock)
{
struct ptlrpc_request *early_req;
+ timeout_t service_timeout;
time64_t olddl;
int rc;
lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
/* Network latency can be adjusted, it is pure network delays */
- ptlrpc_at_adj_net_latency(req,
- lustre_msg_get_service_time(early_req->rq_repmsg));
+ service_timeout = lustre_msg_get_service_timeout(early_req->rq_repmsg);
+ ptlrpc_at_adj_net_latency(req, service_timeout);
sptlrpc_cli_finish_early_reply(early_req);
req->rq_deadline = req->rq_sent + req->rq_timeout +
ptlrpc_at_get_net_latency(req);
+ /* The below message is checked in replay-single.sh test_65{a,b} */
+ /* The below message is checked in sanity-{gss,krb5} test_8 */
DEBUG_REQ(D_ADAPTTO, req,
"Early reply #%d, new deadline in %llds (%llds)",
req->rq_early_count,
*/
void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
{
- struct list_head *l, *tmp;
struct ptlrpc_request *req;
LASSERT(pool != NULL);
spin_lock(&pool->prp_lock);
- list_for_each_safe(l, tmp, &pool->prp_req_list) {
- req = list_entry(l, struct ptlrpc_request, rq_list);
+ while ((req = list_first_entry_or_null(&pool->prp_req_list,
+ struct ptlrpc_request,
+ rq_list))) {
list_del(&req->rq_list);
LASSERT(req->rq_reqbuf);
LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
"Trying to change pool size with nonempty pool from %d to %d bytes\n",
pool->prp_rq_size, size);
- spin_lock(&pool->prp_lock);
pool->prp_rq_size = size;
for (i = 0; i < num_rq; i++) {
struct ptlrpc_request *req;
struct lustre_msg *msg;
- spin_unlock(&pool->prp_lock);
req = ptlrpc_request_cache_alloc(GFP_NOFS);
if (!req)
return i;
req->rq_pool = pool;
spin_lock(&pool->prp_lock);
list_add_tail(&req->rq_list, &pool->prp_req_list);
+ spin_unlock(&pool->prp_lock);
}
- spin_unlock(&pool->prp_lock);
return num_rq;
}
EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
{
struct ptlrpc_request_pool *pool;
- OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
+ OBD_ALLOC_PTR(pool);
if (!pool)
return NULL;
void ptlrpc_add_unreplied(struct ptlrpc_request *req)
{
struct obd_import *imp = req->rq_import;
- struct list_head *tmp;
struct ptlrpc_request *iter;
assert_spin_locked(&imp->imp_lock);
LASSERT(list_empty(&req->rq_unreplied_list));
/* unreplied list is sorted by xid in ascending order */
- list_for_each_prev(tmp, &imp->imp_unreplied_list) {
- iter = list_entry(tmp, struct ptlrpc_request,
- rq_unreplied_list);
-
+ list_for_each_entry_reverse(iter, &imp->imp_unreplied_list,
+ rq_unreplied_list) {
LASSERT(req->rq_xid != iter->rq_xid);
if (req->rq_xid < iter->rq_xid)
continue;
spin_unlock(&req->rq_import->imp_lock);
}
-static __u64 ptlrpc_last_xid;
-static spinlock_t ptlrpc_last_xid_lock;
+static atomic64_t ptlrpc_last_xid;
+
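+/* Drop the request from the xid-sorted unreplied list and give it a
+ * fresh xid; ptlrpc_assign_next_xid_nolock() re-inserts it in order. */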
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+ spin_lock(&req->rq_import->imp_lock);
+ list_del_init(&req->rq_unreplied_list);
+ ptlrpc_assign_next_xid_nolock(req);
+ spin_unlock(&req->rq_import->imp_lock);
+ DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+
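+/* Reserve a modifying-RPC slot (tag) for this request. Obtaining the
+ * slot may block, so the xid is re-assigned afterwards to keep xids
+ * consistent with the actual send order. */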
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ __u32 opc;
+ __u16 tag;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ tag = obd_get_mod_rpc_slot(cli, opc);
+ lustre_msg_set_tag(req->rq_reqmsg, tag);
+ ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
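+/* Release the slot taken by ptlrpc_get_mod_rpc_slot(); a zero tag
+ * means this request never held one. */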
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ if (tag != 0) {
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+ obd_put_mod_rpc_slot(cli, opc, tag);
+ }
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode, char **bufs,
fail2_t = &request->rq_bulk_deadline;
} else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) {
time64_t now = ktime_get_real_seconds();
- spin_lock(&ptlrpc_last_xid_lock);
- ptlrpc_last_xid = ((__u64)now >> 4) << 24;
- spin_unlock(&ptlrpc_last_xid_lock);
+ u64 xid = ((u64)now >> 4) << 24;
+
+ atomic64_set(&ptlrpc_last_xid, xid);
}
if (fail_t) {
- *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
+ *fail_t = ktime_get_real_seconds() +
+ PTLRPC_REQ_LONG_UNLINK;
if (fail2_t)
*fail2_t = ktime_get_real_seconds() +
- LONG_UNLINK;
+ PTLRPC_REQ_LONG_UNLINK;
/*
* The RPC is infected, let the test change the
LASSERT(!request->rq_pool);
sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
out_free:
+ atomic_dec(&imp->imp_reqs);
class_import_put(imp);
return rc;
int ptlrpc_request_pack(struct ptlrpc_request *request,
__u32 version, int opcode)
{
- int rc;
-
- rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
- if (rc)
- return rc;
-
- /*
- * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
- * ptlrpc_body sent from server equal to local ptlrpc_body size, so we
- * have to send old ptlrpc_body to keep interoprability with these
- * clients.
- *
- * Only three kinds of server->client RPCs so far:
- * - LDLM_BL_CALLBACK
- * - LDLM_CP_CALLBACK
- * - LDLM_GL_CALLBACK
- *
- * XXX This should be removed whenever we drop the interoprability with
- * the these old clients.
- */
- if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
- opcode == LDLM_GL_CALLBACK)
- req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
- sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
-
- return rc;
+ return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
}
EXPORT_SYMBOL(ptlrpc_request_pack);
if (request) {
ptlrpc_cli_req_init(request);
- LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
+ LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
LASSERT(imp != LP_POISON);
LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
imp->imp_client);
LASSERT(imp->imp_client != LP_POISON);
request->rq_import = class_import_get(imp);
+ atomic_inc(&imp->imp_reqs);
} else {
CERROR("request allocation out of memory\n");
}
return request;
}
+static int ptlrpc_reconnect_if_idle(struct obd_import *imp)
+{
+ int rc;
+
+ /*
+ * initiate connection if needed when the import has been
+ * referenced by the new request to avoid races with disconnect.
+ * serialize this check against conditional state=IDLE
+ * in ptlrpc_disconnect_idle_interpret()
+ */
+ spin_lock(&imp->imp_lock);
+ if (imp->imp_state == LUSTRE_IMP_IDLE) {
+ imp->imp_generation++;
+ imp->imp_initiated_at = imp->imp_generation;
+ imp->imp_state = LUSTRE_IMP_NEW;
+
+ /* connect_import_locked releases imp_lock */
+ rc = ptlrpc_connect_import_locked(imp);
+ if (rc)
+ return rc;
+ ptlrpc_pinger_add_import(imp);
+ } else {
+ spin_unlock(&imp->imp_lock);
+ }
+ return 0;
+}
+
/**
* Helper function for creating a request.
* Calls __ptlrpc_request_alloc to allocate a new request structure and inits
const struct req_format *format)
{
struct ptlrpc_request *request;
- int connect = 0;
request = __ptlrpc_request_alloc(imp, pool);
if (!request)
return NULL;
- /*
- * initiate connection if needed when the import has been
- * referenced by the new request to avoid races with disconnect
- */
- if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) {
- int rc;
-
- CDEBUG_LIMIT(imp->imp_idle_debug,
- "%s: reconnect after %llds idle\n",
- imp->imp_obd->obd_name, ktime_get_real_seconds() -
- imp->imp_last_reply_time);
- spin_lock(&imp->imp_lock);
- if (imp->imp_state == LUSTRE_IMP_IDLE) {
- imp->imp_generation++;
- imp->imp_initiated_at = imp->imp_generation;
- imp->imp_state = LUSTRE_IMP_NEW;
- connect = 1;
- }
- spin_unlock(&imp->imp_lock);
- if (connect) {
- rc = ptlrpc_connect_import(imp);
- if (rc < 0) {
- ptlrpc_request_free(request);
- return NULL;
- }
- ptlrpc_pinger_add_import(imp);
+ /* don't make an expensive check for an idling connection
+ * if it's already connected */
+ if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) {
+ if (ptlrpc_reconnect_if_idle(imp) < 0) {
+ atomic_dec(&imp->imp_reqs);
+ ptlrpc_request_free(request);
+ return NULL;
}
}
int cpt;
ENTRY;
- cpt = cfs_cpt_current(cfs_cpt_table, 0);
- OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof(*set));
+ cpt = cfs_cpt_current(cfs_cpt_tab, 0);
+ OBD_CPT_ALLOC(set, cfs_cpt_tab, cpt, sizeof(*set));
if (!set)
RETURN(NULL);
atomic_set(&set->set_refcount, 1);
*/
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
- struct list_head *next;
+ struct ptlrpc_request *req;
int expected_phase;
int n = 0;
/* Requests on the set should either all be completed, or all be new */
expected_phase = (atomic_read(&set->set_remaining) == 0) ?
RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
LASSERT(req->rq_phase == expected_phase);
n++;
}
atomic_read(&set->set_remaining) == n, "%d / %d\n",
atomic_read(&set->set_remaining), n);
- list_for_each_safe(tmp, next, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ while ((req = list_first_entry_or_null(&set->set_requests,
+ struct ptlrpc_request,
+ rq_set_chain))) {
list_del_init(&req->rq_set_chain);
LASSERT(req->rq_phase == expected_phase);
void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
struct ptlrpc_request *req)
{
+ if (set == PTLRPCD_SET) {
+ ptlrpcd_add_req(req);
+ return;
+ }
+
LASSERT(req->rq_import->imp_state != LUSTRE_IMP_IDLE);
LASSERT(list_empty(&req->rq_set_chain));
if (req->rq_ctx_init || req->rq_ctx_fini) {
/* always allow ctx init/fini rpc go through */
} else if (imp->imp_state == LUSTRE_IMP_NEW) {
- DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
+ DEBUG_REQ(D_ERROR, req, "Uninitialized import");
*status = -EIO;
} else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg);
* race with umount
*/
DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ?
- D_HA : D_ERROR, req, "IMP_CLOSED ");
+ D_HA : D_ERROR, req, "IMP_CLOSED");
*status = -EIO;
} else if (ptlrpc_send_limit_expired(req)) {
/* probably doesn't need to be a D_ERROR after initial testing */
- DEBUG_REQ(D_HA, req, "send limit expired ");
+ DEBUG_REQ(D_HA, req, "send limit expired");
*status = -ETIMEDOUT;
} else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
imp->imp_state == LUSTRE_IMP_CONNECTING) {
} else if (req->rq_no_delay &&
imp->imp_generation != imp->imp_initiated_at) {
/* ignore nodelay for requests initiating connections */
- *status = -EWOULDBLOCK;
+ *status = -EAGAIN;
} else if (req->rq_allow_replay &&
(imp->imp_state == LUSTRE_IMP_REPLAY ||
imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
imp->imp_state == LUSTRE_IMP_RECOVER)) {
- DEBUG_REQ(D_HA, req, "allow during recovery.\n");
+ DEBUG_REQ(D_HA, req, "allow during recovery");
} else {
delay = 1;
}
*/
static int ptlrpc_check_status(struct ptlrpc_request *req)
{
- int err;
+ int rc;
ENTRY;
- err = lustre_msg_get_status(req->rq_repmsg);
+ rc = lustre_msg_get_status(req->rq_repmsg);
if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
struct obd_import *imp = req->rq_import;
lnet_nid_t nid = imp->imp_connection->c_peer.nid;
__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
- if (ptlrpc_console_allow(req, opc, err))
+ if (ptlrpc_console_allow(req, opc, rc))
LCONSOLE_ERROR_MSG(0x11,
"%s: operation %s to node %s failed: rc = %d\n",
imp->imp_obd->obd_name,
ll_opcode2str(opc),
- libcfs_nid2str(nid), err);
- RETURN(err < 0 ? err : -EINVAL);
+ libcfs_nid2str(nid), rc);
+ RETURN(rc < 0 ? rc : -EINVAL);
}
- if (err < 0) {
- DEBUG_REQ(D_INFO, req, "status is %d", err);
- } else if (err > 0) {
- /* XXX: translate this error from net to host */
- DEBUG_REQ(D_INFO, req, "status is %d", err);
- }
+ if (rc)
+ DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc);
- RETURN(err);
+ RETURN(rc);
}
/**
if (req->rq_reply_truncated) {
if (ptlrpc_no_resend(req)) {
DEBUG_REQ(D_ERROR, req,
- "reply buffer overflow, expected: %d, actual size: %d",
+ "reply buffer overflow, expected=%d, actual size=%d",
req->rq_nob_received, req->rq_repbuf_len);
RETURN(-EOVERFLOW);
}
*/
rc = sptlrpc_cli_unwrap_reply(req);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
+ DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc);
RETURN(rc);
}
ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
time64_t now = ktime_get_real_seconds();
- DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
+ DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) |
+ D_RPCTRACE, req, "resending request on EINPROGRESS");
spin_lock(&req->rq_lock);
req->rq_resend = 1;
spin_unlock(&req->rq_lock);
CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
ptlrpc_at_adj_net_latency(req,
- lustre_msg_get_service_time(req->rq_repmsg));
+ lustre_msg_get_service_timeout(req->rq_repmsg));
rc = ptlrpc_check_status(req);
lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
- lustre_msg_set_status(req->rq_reqmsg, current_pid());
+ lustre_msg_set_status(req->rq_reqmsg, current->pid);
- rc = sptlrpc_req_refresh_ctx(req, -1);
+ rc = sptlrpc_req_refresh_ctx(req, 0);
if (rc) {
if (req->rq_err) {
req->rq_status = rc;
}
CDEBUG(D_RPCTRACE,
- "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
- current_comm(),
+ "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+ req, current->comm,
imp->imp_obd->obd_uuid.uuid,
lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
- obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg));
+ obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg),
+ lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
rc = ptl_send_rpc(req, 0);
if (rc == -ENOMEM) {
spin_lock(&imp->imp_lock);
if (!list_empty(&req->rq_list)) {
list_del_init(&req->rq_list);
- atomic_dec(&req->rq_import->imp_inflight);
+ if (atomic_dec_and_test(&req->rq_import->imp_inflight))
+ wake_up(&req->rq_import->imp_recovery_waitq);
}
spin_unlock(&imp->imp_lock);
ptlrpc_rqphase_move(req, RQ_PHASE_NEW);
RETURN(rc);
}
if (rc) {
- DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
+ DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d",
+ rc);
spin_lock(&req->rq_lock);
req->rq_net_err = 1;
spin_unlock(&req->rq_lock);
*/
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
{
- struct list_head *tmp, *next;
- struct list_head comp_reqs;
+ struct ptlrpc_request *req, *next;
+ LIST_HEAD(comp_reqs);
int force_timer_recalc = 0;
ENTRY;
if (atomic_read(&set->set_remaining) == 0)
RETURN(1);
- INIT_LIST_HEAD(&comp_reqs);
- list_for_each_safe(tmp, next, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry_safe(req, next, &set->set_requests,
+ rq_set_chain) {
struct obd_import *imp = req->rq_import;
int unregistered = 0;
int async = 1;
* not corrupt any data.
*/
if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
- ptlrpc_client_recv_or_unlink(req))
+ ptlrpc_cli_wait_unlink(req))
continue;
if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
ptlrpc_client_bulk_active(req))
/*
* Check if we still need to wait for unlink.
*/
- if (ptlrpc_client_recv_or_unlink(req) ||
+ if (ptlrpc_cli_wait_unlink(req) ||
ptlrpc_client_bulk_active(req))
continue;
/* If there is no need to resend, fail it now. */
}
/*
- * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
+ * ptlrpc_set_wait uses l_wait_event_abortable_timeout()
* so it sets rq_intr regardless of individual rpc
* timeouts. The synchronous IO waiting path sets
* rq_intr irrespective of whether ptlrpcd
* put on delay list - only if we wait
* recovery finished - before send
*/
- list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list,
- &imp->imp_delayed_list);
+ list_move_tail(&req->rq_list,
+ &imp->imp_delayed_list);
spin_unlock(&imp->imp_lock);
continue;
}
GOTO(interpret, req->rq_status);
}
- list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list,
- &imp->imp_sending_list);
+ /* don't resend too fast in case of network
+ * errors.
+ */
+ if (ktime_get_real_seconds() < (req->rq_sent + 1) &&
+     req->rq_net_err && req->rq_timedout) {
+ DEBUG_REQ(D_INFO, req,
+ "throttle request");
+ /* Don't try to resend RPC right away
+ * as it is likely it will fail again
+ * and ptlrpc_check_set() will be
+ * called again, keeping this thread
+ * busy. Instead, wait for the next
+ * timeout. Flag it as resend to
+ * ensure we don't wait too long.
+ */
+ req->rq_resend = 1;
+ spin_unlock(&imp->imp_lock);
+ continue;
+ }
+
+ list_move_tail(&req->rq_list,
+ &imp->imp_sending_list);
spin_unlock(&imp->imp_lock);
* rq_wait_ctx is only touched by ptlrpcd,
* so no lock is needed here.
*/
- status = sptlrpc_req_refresh_ctx(req, -1);
+ status = sptlrpc_req_refresh_ctx(req, 0);
if (status) {
if (req->rq_err) {
req->rq_status = status;
if (req->rq_reqmsg)
CDEBUG(D_RPCTRACE,
- "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
- current_comm(),
+ "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+ req, current->comm,
imp->imp_obd->obd_uuid.uuid,
lustre_msg_get_status(req->rq_reqmsg),
req->rq_xid,
obd_import_nid2str(imp),
- lustre_msg_get_opc(req->rq_reqmsg));
+ lustre_msg_get_opc(req->rq_reqmsg),
+ lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
spin_lock(&imp->imp_lock);
/*
*/
if (!list_empty(&req->rq_list)) {
list_del_init(&req->rq_list);
- atomic_dec(&imp->imp_inflight);
+ if (atomic_dec_and_test(&imp->imp_inflight))
+ wake_up(&imp->imp_recovery_waitq);
}
list_del_init(&req->rq_unreplied_list);
spin_unlock(&imp->imp_lock);
atomic_dec(&set->set_remaining);
- wake_up_all(&imp->imp_recovery_waitq);
+ wake_up(&imp->imp_recovery_waitq);
if (set->set_producer) {
/* produce a new request if possible */
req->rq_real_sent < req->rq_sent ||
req->rq_real_sent >= req->rq_deadline) ?
"timed out for sent delay" : "timed out for slow reply"),
- (s64)req->rq_sent, (s64)req->rq_real_sent);
+ req->rq_sent, req->rq_real_sent);
if (imp && obd_debug_peer_on_timeout)
LNetDebugPeer(imp->imp_connection->c_peer);
/**
* Time out all uncompleted requests in request set pointed by \a data
- * Callback used when waiting on sets with l_wait_event.
- * Always returns 1.
+ * This is called when a wait times out.
*/
-int ptlrpc_expired_set(void *data)
+void ptlrpc_expired_set(struct ptlrpc_request_set *set)
{
- struct ptlrpc_request_set *set = data;
- struct list_head *tmp;
+ struct ptlrpc_request *req;
time64_t now = ktime_get_real_seconds();
ENTRY;
/*
* A timeout expired. See which reqs it applies to...
*/
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
/* don't expire request waiting for context */
if (req->rq_wait_ctx)
continue;
* ptlrpcd thread.
*/
ptlrpc_expire_one_request(req, 1);
- }
-
- /*
- * When waiting for a whole set, we always break out of the
- * sleep so we can recalculate the timeout, or enable interrupts
- * if everyone's timed out.
- */
- RETURN(1);
-}
+ /*
+ * Loops require that we resched once in a while to avoid
+ * RCU stalls and a few other problems.
+ */
+ cond_resched();
-/**
- * Sets rq_intr flag in \a req under spinlock.
- */
-void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
-{
- spin_lock(&req->rq_lock);
- req->rq_intr = 1;
- spin_unlock(&req->rq_lock);
+ }
}
-EXPORT_SYMBOL(ptlrpc_mark_interrupted);
/**
* Interrupts (sets interrupted flag) all uncompleted requests in
- * a set \a data. Callback for l_wait_event for interruptible waits.
+ * a set \a data. This is called when a wait_event is interrupted
+ * by a signal.
*/
-static void ptlrpc_interrupted_set(void *data)
+static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
{
- struct ptlrpc_request_set *set = data;
- struct list_head *tmp;
+ struct ptlrpc_request *req;
LASSERT(set != NULL);
CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
- list_for_each(tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
if (req->rq_intr)
continue;
!req->rq_allow_intr)
continue;
- ptlrpc_mark_interrupted(req);
+ spin_lock(&req->rq_lock);
+ req->rq_intr = 1;
+ spin_unlock(&req->rq_lock);
}
}
*/
time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
time64_t now = ktime_get_real_seconds();
int timeout = 0;
struct ptlrpc_request *req;
time64_t deadline;
ENTRY;
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
/* Request in-flight? */
if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
(req->rq_phase == RQ_PHASE_BULK) ||
*/
int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
struct ptlrpc_request *req;
- struct l_wait_info lwi;
time64_t timeout;
int rc;
if (set->set_producer)
(void)ptlrpc_set_producer(set);
else
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
if (req->rq_phase == RQ_PHASE_NEW)
(void)ptlrpc_send_new_req(req);
}
set, timeout);
if ((timeout == 0 && !signal_pending(current)) ||
- set->set_allow_intr)
+ set->set_allow_intr) {
/*
* No requests are in-flight (ether timed out
* or delayed), so we can allow interrupts.
* We still want to block for a limited time,
* so we allow interrupts during the timeout.
*/
- lwi = LWI_TIMEOUT_INTR_ALL(
- cfs_time_seconds(timeout ? timeout : 1),
- ptlrpc_expired_set,
- ptlrpc_interrupted_set, set);
- else
+ rc = l_wait_event_abortable_timeout(
+ set->set_waitq,
+ ptlrpc_check_set(NULL, set),
+ cfs_time_seconds(timeout ? timeout : 1));
+ if (rc == 0) {
+ rc = -ETIMEDOUT;
+ ptlrpc_expired_set(set);
+ } else if (rc < 0) {
+ rc = -EINTR;
+ ptlrpc_interrupted_set(set);
+ } else {
+ rc = 0;
+ }
+ } else {
/*
* At least one request is in flight, so no
* interrupts are allowed. Wait until all
* complete, or an in-flight req times out.
*/
- lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
- ptlrpc_expired_set, set);
-
- rc = l_wait_event(set->set_waitq,
- ptlrpc_check_set(NULL, set), &lwi);
-
- /*
- * LU-769 - if we ignored the signal because it was already
- * pending when we started, we need to handle it now or we risk
- * it being ignored forever
- */
- if (rc == -ETIMEDOUT &&
- (!lwi.lwi_allow_intr || set->set_allow_intr) &&
- signal_pending(current)) {
- sigset_t blocked_sigs =
- cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+ rc = wait_event_idle_timeout(
+ set->set_waitq,
+ ptlrpc_check_set(NULL, set),
+ cfs_time_seconds(timeout ? timeout : 1));
+ if (rc == 0) {
+ ptlrpc_expired_set(set);
+ rc = -ETIMEDOUT;
+ } else {
+ rc = 0;
+ }
/*
- * In fact we only interrupt for the "fatal" signals
- * like SIGINT or SIGKILL. We still ignore less
- * important signals since ptlrpc set is not easily
- * reentrant from userspace again
+ * LU-769 - if we ignored the signal because
+ * it was already pending when we started, we
+ * need to handle it now or we risk it being
+ * ignored forever
*/
- if (signal_pending(current))
- ptlrpc_interrupted_set(set);
- cfs_restore_sigs(blocked_sigs);
+ if (rc == -ETIMEDOUT &&
+ signal_pending(current)) {
+ sigset_t old, new;
+
+ siginitset(&new, LUSTRE_FATAL_SIGS);
+ sigprocmask(SIG_BLOCK, &new, &old);
+ /*
+ * In fact we only interrupt for the
+ * "fatal" signals like SIGINT or
+ * SIGKILL. We still ignore less
+ * important signals since ptlrpc set
+ * is not easily reentrant from
+ * userspace again
+ */
+ if (signal_pending(current))
+ ptlrpc_interrupted_set(set);
+ sigprocmask(SIG_SETMASK, &old, NULL);
+ }
}
LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
* the error cases -eeb.
*/
if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_set_chain);
+ list_for_each_entry(req, &set->set_requests,
+ rq_set_chain) {
spin_lock(&req->rq_lock);
req->rq_invalid_rqset = 1;
spin_unlock(&req->rq_lock);
LASSERT(atomic_read(&set->set_remaining) == 0);
rc = set->set_rc; /* rq_status of already freed requests if any */
- list_for_each(tmp, &set->set_requests) {
- req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
if (req->rq_status != 0)
rc = req->rq_status;
sptlrpc_cli_free_repbuf(request);
if (request->rq_import) {
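+ /* work requests (ptlrpcd_check_work()) were never counted in
+ * imp_reqs, so only account for regular RPCs here */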
+ if (!ptlrpcd_check_work(request)) {
+ LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0);
+ atomic_dec(&request->rq_import->imp_reqs);
+ }
class_import_put(request->rq_import);
request->rq_import = NULL;
}
*/
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
{
- int rc;
- struct l_wait_info lwi;
-
+ bool discard = false;
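+ /* set by __ptlrpc_cli_wait_unlink() when the request-out MD must
+ * be force-unlinked too, discarding its callback (see below) */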
/*
* Might sleep.
*/
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
request->rq_reply_deadline = ktime_get_real_seconds() +
- LONG_UNLINK;
+ PTLRPC_REQ_LONG_UNLINK;
/*
* Nothing left to do.
*/
- if (!ptlrpc_client_recv_or_unlink(request))
+ if (!__ptlrpc_cli_wait_unlink(request, &discard))
RETURN(1);
LNetMDUnlink(request->rq_reply_md_h);
+ if (discard) /* Discard the request-out callback */
+ __LNetMDUnlink(request->rq_req_md_h, discard);
+
/*
* Let's check it once again.
*/
- if (!ptlrpc_client_recv_or_unlink(request))
+ if (!ptlrpc_cli_wait_unlink(request))
RETURN(1);
/* Move to "Unregistering" phase as reply was not unlinked yet. */
RETURN(0);
/*
- * We have to l_wait_event() whatever the result, to give liblustre
+ * We have to wait_event_idle_timeout() whatever the result, to get
* a chance to run reply_in_callback(), and to make sure we've
* unlinked before returning a req to the pool.
*/
for (;;) {
- /* The wq argument is ignored by user-space wait_event macros */
wait_queue_head_t *wq = (request->rq_set) ?
&request->rq_set->set_waitq :
&request->rq_reply_waitq;
+ int seconds = PTLRPC_REQ_LONG_UNLINK;
/*
* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs
*/
- lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
- cfs_time_seconds(1), NULL, NULL);
- rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
- &lwi);
- if (rc == 0) {
+ while (seconds > 0 &&
+ wait_event_idle_timeout(
+ *wq,
+ !ptlrpc_cli_wait_unlink(request),
+ cfs_time_seconds(1)) == 0)
+ seconds -= 1;
+ if (seconds > 0) {
ptlrpc_rqphase_move(request, request->rq_next_phase);
RETURN(1);
}
- LASSERT(rc == -ETIMEDOUT);
DEBUG_REQ(D_WARNING, request,
"Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
request->rq_receiving_reply,
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
struct obd_import *imp)
{
- struct list_head *tmp;
+ struct ptlrpc_request *iter;
assert_spin_locked(&imp->imp_lock);
LASSERT(imp->imp_replayable);
/* Balanced in ptlrpc_free_committed, usually. */
ptlrpc_request_addref(req);
- list_for_each_prev(tmp, &imp->imp_replay_list) {
- struct ptlrpc_request *iter = list_entry(tmp,
- struct ptlrpc_request,
- rq_replay_list);
-
+ list_for_each_entry_reverse(iter, &imp->imp_replay_list,
+ rq_replay_list) {
/*
* We may have duplicate transnos if we create and then
* open a file, or for closes retained if to match creating
}
/* for distributed debugging */
- lustre_msg_set_status(req->rq_reqmsg, current_pid());
+ lustre_msg_set_status(req->rq_reqmsg, current->pid);
/* add a ref for the set (see comment in ptlrpc_set_add_req) */
ptlrpc_request_addref(req);
if (!ptlrpc_client_replied(req) ||
(req->rq_bulk &&
lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) {
- DEBUG_REQ(D_ERROR, req, "request replay timed out.\n");
+ DEBUG_REQ(D_ERROR, req, "request replay timed out");
GOTO(out, rc = -ETIMEDOUT);
}
/** VBR: check version failure */
if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
/** replay was failed due to version mismatch */
- DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
+ DEBUG_REQ(D_WARNING, req, "Version mismatch during replay");
spin_lock(&imp->imp_lock);
imp->imp_vbr_failed = 1;
spin_unlock(&imp->imp_lock);
/* transaction number shouldn't be bigger than the latest replayed */
if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
DEBUG_REQ(D_ERROR, req,
- "Reported transno %llu is bigger than the replayed one: %llu",
+ "Reported transno=%llu is bigger than replayed=%llu",
req->rq_transno,
lustre_msg_get_transno(req->rq_reqmsg));
GOTO(out, rc = -EINVAL);
}
- DEBUG_REQ(D_HA, req, "got rep");
+ DEBUG_REQ(D_HA, req, "got reply");
/* let the callback do fixups, possibly including in the request */
if (req->rq_replay_cb)
LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
+ aa = ptlrpc_req_async_args(aa, req);
memset(aa, 0, sizeof(*aa));
/* Prepare request to be resent with ptlrpcd */
ptlrpc_at_set_req_timeout(req);
/* Tell server net_latency to calculate how long to wait for reply. */
- lustre_msg_set_service_time(req->rq_reqmsg,
- ptlrpc_at_get_net_latency(req));
+ lustre_msg_set_service_timeout(req->rq_reqmsg,
+ ptlrpc_at_get_net_latency(req));
DEBUG_REQ(D_HA, req, "REPLAY");
atomic_inc(&req->rq_import->imp_replay_inflight);
*/
void ptlrpc_abort_inflight(struct obd_import *imp)
{
- struct list_head *tmp, *n;
-
+ struct ptlrpc_request *req;
ENTRY;
+
/*
* Make sure that no new requests get processed for this import.
* ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
* this flag and then putting requests on sending_list or delayed_list.
*/
- spin_lock(&imp->imp_lock);
+ assert_spin_locked(&imp->imp_lock);
/*
* XXX locking? Maybe we should remove each request with the list
* locked? Also, how do we know if the requests on the list are
* being freed at this time?
*/
- list_for_each_safe(tmp, n, &imp->imp_sending_list) {
- struct ptlrpc_request *req = list_entry(tmp,
- struct ptlrpc_request,
- rq_list);
-
+ list_for_each_entry(req, &imp->imp_sending_list, rq_list) {
DEBUG_REQ(D_RPCTRACE, req, "inflight");
spin_lock(&req->rq_lock);
spin_unlock(&req->rq_lock);
}
- list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_list);
-
+ list_for_each_entry(req, &imp->imp_delayed_list, rq_list) {
DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
spin_lock(&req->rq_lock);
if (imp->imp_replayable)
ptlrpc_free_committed(imp);
- spin_unlock(&imp->imp_lock);
-
EXIT;
}
*/
void ptlrpc_abort_set(struct ptlrpc_request_set *set)
{
- struct list_head *tmp, *pos;
+ struct ptlrpc_request *req;
LASSERT(set != NULL);
- list_for_each_safe(pos, tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(pos, struct ptlrpc_request,
- rq_set_chain);
-
+ list_for_each_entry(req, &set->set_requests, rq_set_chain) {
spin_lock(&req->rq_lock);
if (req->rq_phase != RQ_PHASE_RPC) {
spin_unlock(&req->rq_lock);
void ptlrpc_init_xid(void)
{
time64_t now = ktime_get_real_seconds();
+ u64 xid;
- spin_lock_init(&ptlrpc_last_xid_lock);
if (now < YEAR_2004) {
- get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
- ptlrpc_last_xid >>= 2;
- ptlrpc_last_xid |= (1ULL << 61);
+ get_random_bytes(&xid, sizeof(xid));
+ xid >>= 2;
+ xid |= (1ULL << 61);
} else {
- ptlrpc_last_xid = (__u64)now << 20;
+ xid = (u64)now << 20;
}
/* Need to always be aligned to a power-of-two for multi-bulk BRW */
- CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
- ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
+ BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) !=
+ 0);
+ xid &= PTLRPC_BULK_OPS_MASK;
+ atomic64_set(&ptlrpc_last_xid, xid);
}
/**
*/
__u64 ptlrpc_next_xid(void)
{
- __u64 next;
-
- spin_lock(&ptlrpc_last_xid_lock);
- next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
- ptlrpc_last_xid = next;
- spin_unlock(&ptlrpc_last_xid_lock);
-
- return next;
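+ /* stepping by PTLRPC_BULK_OPS_COUNT leaves a power-of-two aligned
+ * block of matchbits per RPC for its multi-bulk BRW transfers */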
+ return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid);
}
/**
|| req->rq_mbits == 0) {
req->rq_mbits = req->rq_xid;
} else {
- int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
- LNET_MAX_IOV;
- req->rq_mbits -= total_md - 1;
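+ /* bd_md_count already records how many MDs this bulk was split
+ * into, so use it directly instead of recomputing from the
+ * fragment count */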
+ req->rq_mbits -= bd->bd_md_count - 1;
}
} else {
/*
* that server can infer the number of bulks that were prepared,
* see LU-1431
*/
- req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
- LNET_MAX_IOV) - 1;
+ req->rq_mbits += bd->bd_md_count - 1;
/*
* Set rq_xid as rq_mbits to indicate the final bulk for the old
*/
__u64 ptlrpc_sample_next_xid(void)
{
-#if BITS_PER_LONG == 32
- /* need to avoid possible word tearing on 32-bit systems */
- __u64 next;
-
- spin_lock(&ptlrpc_last_xid_lock);
- next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
- spin_unlock(&ptlrpc_last_xid_lock);
-
- return next;
-#else
- /* No need to lock, since returned value is racy anyways */
- return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-#endif
+ return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT;
}
EXPORT_SYMBOL(ptlrpc_sample_next_xid);
req->rq_no_delay = req->rq_no_resend = 1;
req->rq_pill.rc_fmt = (void *)&worker_format;
- CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
- args = ptlrpc_req_async_args(req);
+ args = ptlrpc_req_async_args(args, req);
args->cb = cb;
args->cbdata = cbdata;