X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=85bb30d1326475897ea8dd9b2853f458cad01925;hp=038ff31a480fdaa30af168fec498dbda449f6b8f;hb=917655fc2938b90a9c246dd2d58408c42aa1658d;hpb=61f9847a812f95d5140edee1fb25bfbe6cb70bb7 diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 038ff31..85bb30d 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -35,6 +35,9 @@ #define DEBUG_SUBSYSTEM S_RPC #include +#include + +#include #include #include #include @@ -44,6 +47,50 @@ #include "ptlrpc_internal.h" +static void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc, + struct page *page, int pageoffset, + int len) +{ + __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1); +} + +static void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc, + struct page *page, int pageoffset, + int len) +{ + __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0); +} + +static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc) +{ + int i; + + for (i = 0; i < desc->bd_iov_count ; i++) + put_page(BD_GET_KIOV(desc, i).kiov_page); +} + +static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc, + void *frag, int len) +{ + unsigned int offset = (uintptr_t)frag & ~PAGE_MASK; + + ENTRY; + while (len > 0) { + int page_len = min_t(unsigned int, PAGE_SIZE - offset, + len); + uintptr_t vaddr = (uintptr_t) frag; + + ptlrpc_prep_bulk_page_nopin(desc, + lnet_kvaddr_to_page(vaddr), + offset, page_len); + offset = 0; + len -= page_len; + frag += page_len; + } + + RETURN(desc->bd_nob); +} + const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = { .add_kiov_frag = ptlrpc_prep_bulk_page_pin, .release_frags = ptlrpc_release_bulk_page_pin, @@ -53,6 +100,7 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops); const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = { .add_kiov_frag = ptlrpc_prep_bulk_page_nopin, .release_frags = ptlrpc_release_bulk_noop, + .add_iov_frag = ptlrpc_prep_bulk_frag_pages, }; EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops); @@ -68,12 +116,12 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async); /** * Initialize passed in client structure \a cl. */ -void ptlrpc_init_client(int req_portal, int rep_portal, char *name, - struct ptlrpc_client *cl) +void ptlrpc_init_client(int req_portal, int rep_portal, const char *name, + struct ptlrpc_client *cl) { - cl->cli_request_portal = req_portal; - cl->cli_reply_portal = rep_portal; - cl->cli_name = name; + cl->cli_request_portal = req_portal; + cl->cli_reply_portal = rep_portal; + cl->cli_name = name; } EXPORT_SYMBOL(ptlrpc_init_client); @@ -84,12 +132,14 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid, lnet_nid_t nid4refnet) { struct ptlrpc_connection *c; - lnet_nid_t self; + lnet_nid_t self; struct lnet_process_id peer; - int err; + int err; - /* ptlrpc_uuid_to_peer() initializes its 2nd parameter - * before accessing its values. */ + /* + * ptlrpc_uuid_to_peer() initializes its 2nd parameter + * before accessing its values. + */ /* coverity[uninit_use_in_call] */ peer.nid = nid4refnet; err = ptlrpc_uuid_to_peer(uuid, &peer, &self); @@ -113,9 +163,10 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid, * Allocate and initialize new bulk descriptor on the sender. * Returns pointer to the descriptor or NULL on error. */ -struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw, +struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags, + unsigned int max_brw, enum ptlrpc_bulk_op_type type, - unsigned portal, + unsigned int portal, const struct ptlrpc_bulk_frag_ops *ops) { struct ptlrpc_bulk_desc *desc; @@ -128,17 +179,17 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw, ops->add_iov_frag != NULL)); OBD_ALLOC_PTR(desc); - if (desc == NULL) + if (!desc) return NULL; if (type & PTLRPC_BULK_BUF_KIOV) { OBD_ALLOC_LARGE(GET_KIOV(desc), nfrags * sizeof(*GET_KIOV(desc))); - if (GET_KIOV(desc) == NULL) + if (!GET_KIOV(desc)) goto out; } else { OBD_ALLOC_LARGE(GET_KVEC(desc), nfrags * sizeof(*GET_KVEC(desc))); - if (GET_KVEC(desc) == NULL) + if (!GET_KVEC(desc)) goto out; } @@ -149,11 +200,13 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw, desc->bd_portal = portal; desc->bd_type = type; desc->bd_md_count = 0; - desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *) ops; + desc->bd_frag_ops = ops; LASSERT(max_brw > 0); desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT); - /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this - * node. Negotiated ocd_brw_size will always be <= this number. */ + /* + * PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this + * node. Negotiated ocd_brw_size will always be <= this number. + */ for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++) LNetInvalidateMDHandle(&desc->bd_mds[i]); @@ -171,9 +224,10 @@ out: * error. */ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, - unsigned nfrags, unsigned max_brw, + unsigned int nfrags, + unsigned int max_brw, unsigned int type, - unsigned portal, + unsigned int portal, const struct ptlrpc_bulk_frag_ops *ops) { @@ -184,20 +238,19 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, LASSERT(ptlrpc_is_bulk_op_passive(type)); desc = ptlrpc_new_bulk(nfrags, max_brw, type, portal, ops); - if (desc == NULL) + if (!desc) RETURN(NULL); - desc->bd_import_generation = req->rq_import_generation; - desc->bd_import = class_import_get(imp); - desc->bd_req = req; + desc->bd_import = class_import_get(imp); + desc->bd_req = req; - desc->bd_cbid.cbid_fn = client_bulk_callback; - desc->bd_cbid.cbid_arg = desc; + desc->bd_cbid.cbid_fn = client_bulk_callback; + desc->bd_cbid.cbid_arg = desc; - /* This makes req own desc, and free it when she frees herself */ - req->rq_bulk = desc; + /* This makes req own desc, and free it when she frees herself */ + req->rq_bulk = desc; - return desc; + return desc; } EXPORT_SYMBOL(ptlrpc_prep_bulk_imp); @@ -233,6 +286,7 @@ int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc, void *frag, int len) { struct kvec *iovec; + ENTRY; LASSERT(desc->bd_iov_count < desc->bd_max_iov); @@ -276,10 +330,10 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) OBD_FREE_LARGE(GET_KIOV(desc), - desc->bd_max_iov * sizeof(*GET_KIOV(desc))); + desc->bd_max_iov * sizeof(*GET_KIOV(desc))); else OBD_FREE_LARGE(GET_KVEC(desc), - desc->bd_max_iov * sizeof(*GET_KVEC(desc))); + desc->bd_max_iov * sizeof(*GET_KVEC(desc))); OBD_FREE_PTR(desc); EXIT; } @@ -291,79 +345,87 @@ EXPORT_SYMBOL(ptlrpc_free_bulk); */ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req) { - __u32 serv_est; - int idx; - struct imp_at *at; - - LASSERT(req->rq_import); - - if (AT_OFF) { - /* non-AT settings */ - /** - * \a imp_server_timeout means this is reverse import and - * we send (currently only) ASTs to the client and cannot afford - * to wait too long for the reply, otherwise the other client - * (because of which we are sending this request) would - * timeout waiting for us - */ - req->rq_timeout = req->rq_import->imp_server_timeout ? - obd_timeout / 2 : obd_timeout; - } else { - at = &req->rq_import->imp_at; - idx = import_at_get_index(req->rq_import, - req->rq_request_portal); - serv_est = at_get(&at->iat_service_estimate[idx]); - req->rq_timeout = at_est2timeout(serv_est); - } - /* We could get even fancier here, using history to predict increased - loading... */ - - /* Let the server know what this RPC timeout is by putting it in the - reqmsg*/ - lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); + __u32 serv_est; + int idx; + struct imp_at *at; + + LASSERT(req->rq_import); + + if (AT_OFF) { + /* non-AT settings */ + /** + * \a imp_server_timeout means this is reverse import and + * we send (currently only) ASTs to the client and cannot afford + * to wait too long for the reply, otherwise the other client + * (because of which we are sending this request) would + * timeout waiting for us + */ + req->rq_timeout = req->rq_import->imp_server_timeout ? + obd_timeout / 2 : obd_timeout; + } else { + at = &req->rq_import->imp_at; + idx = import_at_get_index(req->rq_import, + req->rq_request_portal); + serv_est = at_get(&at->iat_service_estimate[idx]); + req->rq_timeout = at_est2timeout(serv_est); + } + /* + * We could get even fancier here, using history to predict increased + * loading... + */ + + /* + * Let the server know what this RPC timeout is by putting it in the + * reqmsg + */ + lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); } EXPORT_SYMBOL(ptlrpc_at_set_req_timeout); /* Adjust max service estimate based on server value */ static void ptlrpc_at_adj_service(struct ptlrpc_request *req, - unsigned int serv_est) + unsigned int serv_est) { - int idx; - unsigned int oldse; - struct imp_at *at; + int idx; + unsigned int oldse; + struct imp_at *at; - LASSERT(req->rq_import); - at = &req->rq_import->imp_at; + LASSERT(req->rq_import); + at = &req->rq_import->imp_at; - idx = import_at_get_index(req->rq_import, req->rq_request_portal); - /* max service estimates are tracked on the server side, - so just keep minimal history here */ - oldse = at_measured(&at->iat_service_estimate[idx], serv_est); - if (oldse != 0) - CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d " - "has changed from %d to %d\n", - req->rq_import->imp_obd->obd_name,req->rq_request_portal, - oldse, at_get(&at->iat_service_estimate[idx])); + idx = import_at_get_index(req->rq_import, req->rq_request_portal); + /* + * max service estimates are tracked on the server side, + * so just keep minimal history here + */ + oldse = at_measured(&at->iat_service_estimate[idx], serv_est); + if (oldse != 0) + CDEBUG(D_ADAPTTO, + "The RPC service estimate for %s ptl %d has changed from %d to %d\n", + req->rq_import->imp_obd->obd_name, + req->rq_request_portal, + oldse, at_get(&at->iat_service_estimate[idx])); } /* Expected network latency per remote node (secs) */ int ptlrpc_at_get_net_latency(struct ptlrpc_request *req) { - return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency); + return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency); } /* Adjust expected network latency */ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, unsigned int service_time) { - unsigned int nl, oldnl; - struct imp_at *at; + unsigned int nl, oldnl; + struct imp_at *at; time64_t now = ktime_get_real_seconds(); - LASSERT(req->rq_import); + LASSERT(req->rq_import); if (service_time > now - req->rq_sent + 3) { - /* bz16408, however, this can also happen if early reply + /* + * b=16408, however, this can also happen if early reply * is lost and client RPC is expired and resent, early reply * or reply of original RPC can still be fit in reply buffer * of resent RPC, now client is measuring time from the @@ -377,39 +439,40 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, return; } - /* Network latency is total time less server processing time */ + /* Network latency is total time less server processing time */ nl = max_t(int, now - req->rq_sent - service_time, 0) + 1; /* st rounding */ at = &req->rq_import->imp_at; - oldnl = at_measured(&at->iat_net_latency, nl); - if (oldnl != 0) - CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) " - "has changed from %d to %d\n", - req->rq_import->imp_obd->obd_name, - obd_uuid2str( - &req->rq_import->imp_connection->c_remote_uuid), - oldnl, at_get(&at->iat_net_latency)); + oldnl = at_measured(&at->iat_net_latency, nl); + if (oldnl != 0) + CDEBUG(D_ADAPTTO, + "The network latency for %s (nid %s) has changed from %d to %d\n", + req->rq_import->imp_obd->obd_name, + obd_uuid2str(&req->rq_import->imp_connection->c_remote_uuid), + oldnl, at_get(&at->iat_net_latency)); } static int unpack_reply(struct ptlrpc_request *req) { - int rc; + int rc; - if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) { - rc = ptlrpc_unpack_rep_msg(req, req->rq_replen); - if (rc) { - DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc); - return(-EPROTO); - } - } + if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) { + rc = ptlrpc_unpack_rep_msg(req, req->rq_replen); + if (rc) { + DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d", + rc); + return -EPROTO; + } + } - rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF); - if (rc) { - DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc); - return(-EPROTO); - } - return 0; + rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF); + if (rc) { + DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d", + rc); + return -EPROTO; + } + return 0; } /** @@ -424,7 +487,7 @@ __must_hold(&req->rq_lock) int rc; ENTRY; - req->rq_early = 0; + req->rq_early = 0; spin_unlock(&req->rq_lock); rc = sptlrpc_cli_unwrap_early_reply(req, &early_req); @@ -440,29 +503,35 @@ __must_hold(&req->rq_lock) RETURN(rc); } - /* Use new timeout value just to adjust the local value for this + /* + * Use new timeout value just to adjust the local value for this * request, don't include it into at_history. It is unclear yet why * service time increased and should it be counted or skipped, e.g. * that can be recovery case or some error or server, the real reply - * will add all new data if it is worth to add. */ + * will add all new data if it is worth to add. + */ req->rq_timeout = lustre_msg_get_timeout(early_req->rq_repmsg); lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); /* Network latency can be adjusted, it is pure network delays */ ptlrpc_at_adj_net_latency(req, - lustre_msg_get_service_time(early_req->rq_repmsg)); + lustre_msg_get_service_time(early_req->rq_repmsg)); sptlrpc_cli_finish_early_reply(early_req); spin_lock(&req->rq_lock); olddl = req->rq_deadline; - /* server assumes it now has rq_timeout from when the request + /* + * server assumes it now has rq_timeout from when the request * arrived, so the client should give it at least that long. * since we don't know the arrival time we'll use the original - * sent time */ + * sent time + */ req->rq_deadline = req->rq_sent + req->rq_timeout + ptlrpc_at_get_net_latency(req); + /* The below message is checked in replay-single.sh test_65{a,b} */ + /* The below message is checked in sanity-{gss,krb5} test_8 */ DEBUG_REQ(D_ADAPTTO, req, "Early reply #%d, new deadline in %llds (%llds)", req->rq_early_count, @@ -479,7 +548,7 @@ int ptlrpc_request_cache_init(void) request_cache = kmem_cache_create("ptlrpc_cache", sizeof(struct ptlrpc_request), 0, SLAB_HWCACHE_ALIGN, NULL); - return request_cache == NULL ? -ENOMEM : 0; + return request_cache ? 0 : -ENOMEM; } void ptlrpc_request_cache_fini(void) @@ -530,24 +599,22 @@ EXPORT_SYMBOL(ptlrpc_free_rq_pool); */ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) { - int i; - int size = 1; + int i; + int size = 1; - while (size < pool->prp_rq_size) - size <<= 1; + while (size < pool->prp_rq_size) + size <<= 1; LASSERTF(list_empty(&pool->prp_req_list) || - size == pool->prp_rq_size, - "Trying to change pool size with nonempty pool " - "from %d to %d bytes\n", pool->prp_rq_size, size); + size == pool->prp_rq_size, + "Trying to change pool size with nonempty pool from %d to %d bytes\n", + pool->prp_rq_size, size); - spin_lock(&pool->prp_lock); pool->prp_rq_size = size; for (i = 0; i < num_rq; i++) { struct ptlrpc_request *req; struct lustre_msg *msg; - spin_unlock(&pool->prp_lock); req = ptlrpc_request_cache_alloc(GFP_NOFS); if (!req) return i; @@ -561,8 +628,8 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) req->rq_pool = pool; spin_lock(&pool->prp_lock); list_add_tail(&req->rq_list, &pool->prp_req_list); + spin_unlock(&pool->prp_lock); } - spin_unlock(&pool->prp_lock); return num_rq; } EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool); @@ -581,13 +648,14 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize, { struct ptlrpc_request_pool *pool; - OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool)); + OBD_ALLOC_PTR(pool); if (!pool) return NULL; - /* Request next power of two for the allocation, because internally - kernel would do exactly this */ - + /* + * Request next power of two for the allocation, because internally + * kernel would do exactly this + */ spin_lock_init(&pool->prp_lock); INIT_LIST_HEAD(&pool->prp_req_list); pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD; @@ -605,18 +673,20 @@ EXPORT_SYMBOL(ptlrpc_init_rq_pool); static struct ptlrpc_request * ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool) { - struct ptlrpc_request *request; - struct lustre_msg *reqbuf; + struct ptlrpc_request *request; + struct lustre_msg *reqbuf; - if (!pool) - return NULL; + if (!pool) + return NULL; spin_lock(&pool->prp_lock); - /* See if we have anything in a pool, and bail out if nothing, + /* + * See if we have anything in a pool, and bail out if nothing, * in writeout path, where this matters, this is safe to do, because * nothing is lost in this case, and when some in-flight requests - * complete, this code will be called again. */ + * complete, this code will be called again. + */ if (unlikely(list_empty(&pool->prp_req_list))) { spin_unlock(&pool->prp_lock); return NULL; @@ -627,16 +697,16 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool) list_del_init(&request->rq_list); spin_unlock(&pool->prp_lock); - LASSERT(request->rq_reqbuf); - LASSERT(request->rq_pool); + LASSERT(request->rq_reqbuf); + LASSERT(request->rq_pool); - reqbuf = request->rq_reqbuf; - memset(request, 0, sizeof(*request)); - request->rq_reqbuf = reqbuf; - request->rq_reqbuf_len = pool->prp_rq_size; - request->rq_pool = pool; + reqbuf = request->rq_reqbuf; + memset(request, 0, sizeof(*request)); + request->rq_reqbuf = reqbuf; + request->rq_reqbuf_len = pool->prp_rq_size; + request->rq_pool = pool; - return request; + return request; } /** @@ -655,9 +725,9 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request) void ptlrpc_add_unreplied(struct ptlrpc_request *req) { - struct obd_import *imp = req->rq_import; - struct list_head *tmp; - struct ptlrpc_request *iter; + struct obd_import *imp = req->rq_import; + struct list_head *tmp; + struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); LASSERT(list_empty(&req->rq_unreplied_list)); @@ -689,6 +759,43 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req) spin_unlock(&req->rq_import->imp_lock); } +static atomic64_t ptlrpc_last_xid; + +static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) +{ + spin_lock(&req->rq_import->imp_lock); + list_del_init(&req->rq_unreplied_list); + ptlrpc_assign_next_xid_nolock(req); + spin_unlock(&req->rq_import->imp_lock); + DEBUG_REQ(D_RPCTRACE, req, "reassign xid"); +} + +void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc; + __u16 tag; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + tag = obd_get_mod_rpc_slot(cli, opc); + lustre_msg_set_tag(req->rq_reqmsg, tag); + ptlrpc_reassign_next_xid(req); +} +EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot); + +void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req) +{ + __u16 tag = lustre_msg_get_tag(req->rq_reqmsg); + + if (tag != 0) { + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + + obd_put_mod_rpc_slot(cli, opc, tag); + } +} +EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot); + int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, __u32 version, int opcode, char **bufs, struct ptlrpc_cli_ctx *ctx) @@ -704,7 +811,7 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, imp = request->rq_import; lengths = request->rq_pill.rc_area[RCL_CLIENT]; - if (ctx != NULL) { + if (ctx) { request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx); } else { rc = sptlrpc_req_get_ctx(request); @@ -740,21 +847,25 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, ptlrpc_at_set_req_timeout(request); lustre_msg_set_opc(request->rq_reqmsg, opcode); - ptlrpc_assign_next_xid(request); /* Let's setup deadline for req/reply/bulk unlink for opcode. */ if (cfs_fail_val == opcode) { time64_t *fail_t = NULL, *fail2_t = NULL; - if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) + if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) { fail_t = &request->rq_bulk_deadline; - else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) + } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) { fail_t = &request->rq_reply_deadline; - else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK)) + } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK)) { fail_t = &request->rq_req_deadline; - else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) { + } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) { fail_t = &request->rq_reply_deadline; fail2_t = &request->rq_bulk_deadline; + } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) { + time64_t now = ktime_get_real_seconds(); + u64 xid = ((u64)now >> 4) << 24; + + atomic64_set(&ptlrpc_last_xid, xid); } if (fail_t) { @@ -771,6 +882,7 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, msleep(4 * MSEC_PER_SEC); } } + ptlrpc_assign_next_xid(request); RETURN(0); @@ -781,7 +893,6 @@ out_free: class_import_put(imp); return rc; - } EXPORT_SYMBOL(ptlrpc_request_bufs_pack); @@ -790,32 +901,9 @@ EXPORT_SYMBOL(ptlrpc_request_bufs_pack); * steps if necessary. */ int ptlrpc_request_pack(struct ptlrpc_request *request, - __u32 version, int opcode) + __u32 version, int opcode) { - int rc; - rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL); - if (rc) - return rc; - - /* For some old 1.8 clients (< 1.8.7), they will LASSERT the size of - * ptlrpc_body sent from server equal to local ptlrpc_body size, so we - * have to send old ptlrpc_body to keep interoprability with these - * clients. - * - * Only three kinds of server->client RPCs so far: - * - LDLM_BL_CALLBACK - * - LDLM_CP_CALLBACK - * - LDLM_GL_CALLBACK - * - * XXX This should be removed whenever we drop the interoprability with - * the these old clients. - */ - if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK || - opcode == LDLM_GL_CALLBACK) - req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY, - sizeof(struct ptlrpc_body_v2), RCL_CLIENT); - - return rc; + return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL); } EXPORT_SYMBOL(ptlrpc_request_pack); @@ -842,7 +930,7 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, LASSERTF((unsigned long)imp > 0x1000, "%p", imp); LASSERT(imp != LP_POISON); LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n", - imp->imp_client); + imp->imp_client); LASSERT(imp->imp_client != LP_POISON); request->rq_import = class_import_get(imp); @@ -861,18 +949,47 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, */ static struct ptlrpc_request * ptlrpc_request_alloc_internal(struct obd_import *imp, - struct ptlrpc_request_pool * pool, - const struct req_format *format) + struct ptlrpc_request_pool *pool, + const struct req_format *format) { - struct ptlrpc_request *request; + struct ptlrpc_request *request; - request = __ptlrpc_request_alloc(imp, pool); - if (request == NULL) - return NULL; + request = __ptlrpc_request_alloc(imp, pool); + if (!request) + return NULL; - req_capsule_init(&request->rq_pill, request, RCL_CLIENT); - req_capsule_set(&request->rq_pill, format); - return request; + /* + * initiate connection if needed when the import has been + * referenced by the new request to avoid races with disconnect + */ + if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) { + int rc; + + CDEBUG_LIMIT(imp->imp_idle_debug, + "%s: reconnect after %llds idle\n", + imp->imp_obd->obd_name, ktime_get_real_seconds() - + imp->imp_last_reply_time); + spin_lock(&imp->imp_lock); + if (imp->imp_state == LUSTRE_IMP_IDLE) { + imp->imp_generation++; + imp->imp_initiated_at = imp->imp_generation; + imp->imp_state = LUSTRE_IMP_NEW; + + /* connect_import_locked releases imp_lock */ + rc = ptlrpc_connect_import_locked(imp); + if (rc < 0) { + ptlrpc_request_free(request); + return NULL; + } + ptlrpc_pinger_add_import(imp); + } else { + spin_unlock(&imp->imp_lock); + } + } + + req_capsule_init(&request->rq_pill, request, RCL_CLIENT); + req_capsule_set(&request->rq_pill, format); + return request; } /** @@ -880,9 +997,9 @@ ptlrpc_request_alloc_internal(struct obd_import *imp, * buffer structure according to capsule template \a format. */ struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp, - const struct req_format *format) + const struct req_format *format) { - return ptlrpc_request_alloc_internal(imp, NULL, format); + return ptlrpc_request_alloc_internal(imp, NULL, format); } EXPORT_SYMBOL(ptlrpc_request_alloc); @@ -890,11 +1007,12 @@ EXPORT_SYMBOL(ptlrpc_request_alloc); * Allocate new request structure for import \a imp from pool \a pool and * initialize its buffer structure according to capsule template \a format. */ -struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp, - struct ptlrpc_request_pool * pool, - const struct req_format *format) +struct ptlrpc_request * +ptlrpc_request_alloc_pool(struct obd_import *imp, + struct ptlrpc_request_pool *pool, + const struct req_format *format) { - return ptlrpc_request_alloc_internal(imp, pool, format); + return ptlrpc_request_alloc_internal(imp, pool, format); } EXPORT_SYMBOL(ptlrpc_request_alloc_pool); @@ -919,20 +1037,20 @@ EXPORT_SYMBOL(ptlrpc_request_free); * Returns allocated request or NULL on error. */ struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp, - const struct req_format *format, - __u32 version, int opcode) + const struct req_format *format, + __u32 version, int opcode) { - struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format); - int rc; + struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format); + int rc; - if (req) { - rc = ptlrpc_request_pack(req, version, opcode); - if (rc) { - ptlrpc_request_free(req); - req = NULL; - } - } - return req; + if (req) { + rc = ptlrpc_request_pack(req, version, opcode); + if (rc) { + ptlrpc_request_free(req); + req = NULL; + } + } + return req; } EXPORT_SYMBOL(ptlrpc_request_alloc_pack); @@ -942,12 +1060,12 @@ EXPORT_SYMBOL(ptlrpc_request_alloc_pack); */ struct ptlrpc_request_set *ptlrpc_prep_set(void) { - struct ptlrpc_request_set *set; - int cpt; + struct ptlrpc_request_set *set; + int cpt; ENTRY; cpt = cfs_cpt_current(cfs_cpt_table, 0); - OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof *set); + OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof(*set)); if (!set) RETURN(NULL); atomic_set(&set->set_refcount, 1); @@ -1001,10 +1119,11 @@ struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func, */ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) { - struct list_head *tmp; - struct list_head *next; - int expected_phase; - int n = 0; + struct list_head *tmp; + struct list_head *next; + int expected_phase; + int n = 0; + ENTRY; /* Requests on the set should either all be completed, or all be new */ @@ -1041,8 +1160,8 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) req->rq_invalid_rqset = 0; spin_unlock(&req->rq_lock); - ptlrpc_req_finished (req); - } + ptlrpc_req_finished(req); + } LASSERT(atomic_read(&set->set_remaining) == 0); @@ -1056,8 +1175,9 @@ EXPORT_SYMBOL(ptlrpc_set_destroy); * Assumes request reference from the caller. */ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, - struct ptlrpc_request *req) + struct ptlrpc_request *req) { + LASSERT(req->rq_import->imp_state != LUSTRE_IMP_IDLE); LASSERT(list_empty(&req->rq_set_chain)); if (req->rq_allow_intr) @@ -1069,12 +1189,14 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, atomic_inc(&set->set_remaining); req->rq_queued_time = ktime_get_seconds(); - if (req->rq_reqmsg != NULL) + if (req->rq_reqmsg) lustre_msg_set_jobid(req->rq_reqmsg, NULL); - if (set->set_producer != NULL) - /* If the request set has a producer callback, the RPC must be - * sent straight away */ + if (set->set_producer) + /* + * If the request set has a producer callback, the RPC must be + * sent straight away + */ ptlrpc_send_new_req(req); } EXPORT_SYMBOL(ptlrpc_set_add_req); @@ -1085,12 +1207,12 @@ EXPORT_SYMBOL(ptlrpc_set_add_req); * Currently only used for ptlrpcd. */ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, - struct ptlrpc_request *req) + struct ptlrpc_request *req) { - struct ptlrpc_request_set *set = pc->pc_set; - int count, i; + struct ptlrpc_request_set *set = pc->pc_set; + int count, i; - LASSERT(req->rq_set == NULL); + LASSERT(req->rq_set == NULL); LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0); spin_lock(&set->set_new_req_lock); @@ -1107,9 +1229,11 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, if (count == 1) { wake_up(&set->set_waitq); - /* XXX: It maybe unnecessary to wakeup all the partners. But to + /* + * XXX: It maybe unnecessary to wakeup all the partners. But to * guarantee the async RPC can be processed ASAP, we have - * no other better choice. It maybe fixed in future. */ + * no other better choice. It maybe fixed in future. + */ for (i = 0; i < pc->pc_npartners; i++) wake_up(&pc->pc_partners[i]->pc_set->set_waitq); } @@ -1126,33 +1250,36 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, * The imp->imp_lock must be held. */ static int ptlrpc_import_delay_req(struct obd_import *imp, - struct ptlrpc_request *req, int *status) + struct ptlrpc_request *req, int *status) { - int delay = 0; - ENTRY; + int delay = 0; - LASSERT (status != NULL); - *status = 0; + ENTRY; + LASSERT(status); + *status = 0; if (req->rq_ctx_init || req->rq_ctx_fini) { /* always allow ctx init/fini rpc go through */ } else if (imp->imp_state == LUSTRE_IMP_NEW) { - DEBUG_REQ(D_ERROR, req, "Uninitialized import."); + DEBUG_REQ(D_ERROR, req, "Uninitialized import"); *status = -EIO; } else if (imp->imp_state == LUSTRE_IMP_CLOSED) { unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg); - /* pings or MDS-equivalent STATFS may safely race with umount */ + /* + * pings or MDS-equivalent STATFS may safely + * race with umount + */ DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ? - D_HA : D_ERROR, req, "IMP_CLOSED "); + D_HA : D_ERROR, req, "IMP_CLOSED"); *status = -EIO; } else if (ptlrpc_send_limit_expired(req)) { - /* probably doesn't need to be a D_ERROR after initial testing*/ - DEBUG_REQ(D_HA, req, "send limit expired "); + /* probably doesn't need to be a D_ERROR afterinitial testing */ + DEBUG_REQ(D_HA, req, "send limit expired"); *status = -ETIMEDOUT; } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING && imp->imp_state == LUSTRE_IMP_CONNECTING) { - /* allow CONNECT even if import is invalid */ ; + ;/* allow CONNECT even if import is invalid */ if (atomic_read(&imp->imp_inval_count) != 0) { DEBUG_REQ(D_ERROR, req, "invalidate in flight"); *status = -EIO; @@ -1160,23 +1287,25 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, } else if (imp->imp_invalid || imp->imp_obd->obd_no_recov) { if (!imp->imp_deactive) DEBUG_REQ(D_NET, req, "IMP_INVALID"); - *status = -ESHUTDOWN; /* bz 12940 */ + *status = -ESHUTDOWN; /* b=12940 */ } else if (req->rq_import_generation != imp->imp_generation) { - DEBUG_REQ(D_ERROR, req, "req wrong generation:"); - *status = -EIO; - } else if (req->rq_send_state != imp->imp_state) { - /* invalidate in progress - any requests should be drop */ + DEBUG_REQ(D_ERROR, req, "req wrong generation:"); + *status = -EIO; + } else if (req->rq_send_state != imp->imp_state) { + /* invalidate in progress - any requests should be drop */ if (atomic_read(&imp->imp_inval_count) != 0) { - DEBUG_REQ(D_ERROR, req, "invalidate in flight"); - *status = -EIO; - } else if (req->rq_no_delay) { - *status = -EWOULDBLOCK; + DEBUG_REQ(D_ERROR, req, "invalidate in flight"); + *status = -EIO; + } else if (req->rq_no_delay && + imp->imp_generation != imp->imp_initiated_at) { + /* ignore nodelay for requests initiating connections */ + *status = -EWOULDBLOCK; } else if (req->rq_allow_replay && - (imp->imp_state == LUSTRE_IMP_REPLAY || - imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS || - imp->imp_state == LUSTRE_IMP_REPLAY_WAIT || - imp->imp_state == LUSTRE_IMP_RECOVER)) { - DEBUG_REQ(D_HA, req, "allow during recovery.\n"); + (imp->imp_state == LUSTRE_IMP_REPLAY || + imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS || + imp->imp_state == LUSTRE_IMP_REPLAY_WAIT || + imp->imp_state == LUSTRE_IMP_RECOVER)) { + DEBUG_REQ(D_HA, req, "allow during recovery"); } else { delay = 1; } @@ -1200,15 +1329,16 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err) /* Suppress particular reconnect errors which are to be expected. */ if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT) { - /* Suppress timed out reconnect requests */ if (lustre_handle_is_used(&req->rq_import->imp_remote_handle) || req->rq_timedout) return false; - /* Suppress most unavailable/again reconnect requests, but + /* + * Suppress most unavailable/again reconnect requests, but * print occasionally so it is clear client is trying to - * connect to a server where no target is running. */ + * connect to a server where no target is running. + */ if ((err == -ENODEV || err == -EAGAIN) && req->rq_import->imp_conn_cnt % 30 != 20) return false; @@ -1232,32 +1362,28 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err) */ static int ptlrpc_check_status(struct ptlrpc_request *req) { - int err; - ENTRY; + int rc; - err = lustre_msg_get_status(req->rq_repmsg); + ENTRY; + rc = lustre_msg_get_status(req->rq_repmsg); if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { struct obd_import *imp = req->rq_import; lnet_nid_t nid = imp->imp_connection->c_peer.nid; __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); - if (ptlrpc_console_allow(req, opc, err)) - LCONSOLE_ERROR_MSG(0x11, "%s: operation %s to node %s " - "failed: rc = %d\n", + if (ptlrpc_console_allow(req, opc, rc)) + LCONSOLE_ERROR_MSG(0x11, + "%s: operation %s to node %s failed: rc = %d\n", imp->imp_obd->obd_name, ll_opcode2str(opc), - libcfs_nid2str(nid), err); - RETURN(err < 0 ? err : -EINVAL); + libcfs_nid2str(nid), rc); + RETURN(rc < 0 ? rc : -EINVAL); } - if (err < 0) { - DEBUG_REQ(D_INFO, req, "status is %d", err); - } else if (err > 0) { - /* XXX: translate this error from net to host */ - DEBUG_REQ(D_INFO, req, "status is %d", err); - } + if (rc) + DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc); - RETURN(err); + RETURN(rc); } /** @@ -1267,20 +1393,20 @@ static int ptlrpc_check_status(struct ptlrpc_request *req) */ static void ptlrpc_save_versions(struct ptlrpc_request *req) { - struct lustre_msg *repmsg = req->rq_repmsg; - struct lustre_msg *reqmsg = req->rq_reqmsg; - __u64 *versions = lustre_msg_get_versions(repmsg); - ENTRY; + struct lustre_msg *repmsg = req->rq_repmsg; + struct lustre_msg *reqmsg = req->rq_reqmsg; + __u64 *versions = lustre_msg_get_versions(repmsg); - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) - return; + ENTRY; + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + return; - LASSERT(versions); - lustre_msg_set_versions(reqmsg, versions); + LASSERT(versions); + lustre_msg_set_versions(reqmsg, versions); CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n", - versions[0], versions[1]); + versions[0], versions[1]); - EXIT; + EXIT; } __u64 ptlrpc_known_replied_xid(struct obd_import *imp) @@ -1318,43 +1444,45 @@ static int after_reply(struct ptlrpc_request *req) int rc; ENTRY; - LASSERT(obd != NULL); - /* repbuf must be unlinked */ + LASSERT(obd != NULL); + /* repbuf must be unlinked */ LASSERT(!req->rq_receiving_reply && req->rq_reply_unlinked); if (req->rq_reply_truncated) { - if (ptlrpc_no_resend(req)) { - DEBUG_REQ(D_ERROR, req, "reply buffer overflow," - " expected: %d, actual size: %d", - req->rq_nob_received, req->rq_repbuf_len); - RETURN(-EOVERFLOW); - } - - sptlrpc_cli_free_repbuf(req); - /* Pass the required reply buffer size (include - * space for early reply). - * NB: no need to roundup because alloc_repbuf - * will roundup it */ - req->rq_replen = req->rq_nob_received; - req->rq_nob_received = 0; + if (ptlrpc_no_resend(req)) { + DEBUG_REQ(D_ERROR, req, + "reply buffer overflow, expected=%d, actual size=%d", + req->rq_nob_received, req->rq_repbuf_len); + RETURN(-EOVERFLOW); + } + + sptlrpc_cli_free_repbuf(req); + /* + * Pass the required reply buffer size (include + * space for early reply). + * NB: no need to roundup because alloc_repbuf + * will roundup it + */ + req->rq_replen = req->rq_nob_received; + req->rq_nob_received = 0; spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); - RETURN(0); - } + RETURN(0); + } work_start = ktime_get_real(); timediff = ktime_us_delta(work_start, req->rq_sent_ns); - /* - * NB Until this point, the whole of the incoming message, - * including buflens, status etc is in the sender's byte order. - */ - rc = sptlrpc_cli_unwrap_reply(req); - if (rc) { - DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc); - RETURN(rc); - } + /* + * NB Until this point, the whole of the incoming message, + * including buflens, status etc is in the sender's byte order. + */ + rc = sptlrpc_cli_unwrap_reply(req); + if (rc) { + DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc); + RETURN(rc); + } /* * Security layer unwrap might ask resend this request. @@ -1371,7 +1499,8 @@ static int after_reply(struct ptlrpc_request *req) ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) { time64_t now = ktime_get_real_seconds(); - DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS"); + DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) | + D_RPCTRACE, req, "resending request on EINPROGRESS"); spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); @@ -1379,10 +1508,12 @@ static int after_reply(struct ptlrpc_request *req) /* Readjust the timeout for current conditions */ ptlrpc_at_set_req_timeout(req); - /* delay resend to give a chance to the server to get ready. + /* + * delay resend to give a chance to the server to get ready. * The delay is increased by 1s on every resend and is capped to * the current request timeout (i.e. obd_timeout if AT is off, - * or AT service time x 125% + 5s, see at_est2timeout) */ + * or AT service time x 125% + 5s, see at_est2timeout) + */ if (req->rq_nr_resend > req->rq_timeout) req->rq_sent = now + req->rq_timeout; else @@ -1396,27 +1527,26 @@ static int after_reply(struct ptlrpc_request *req) RETURN(0); } - if (obd->obd_svc_stats != NULL) { + if (obd->obd_svc_stats) { lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR, timediff); ptlrpc_lprocfs_rpc_sent(req, timediff); } - if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY && - lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) { - DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)", - lustre_msg_get_type(req->rq_repmsg)); - RETURN(-EPROTO); - } + if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY && + lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) { + DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)", + lustre_msg_get_type(req->rq_repmsg)); + RETURN(-EPROTO); + } - if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING) - CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val); - ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg)); - ptlrpc_at_adj_net_latency(req, - lustre_msg_get_service_time(req->rq_repmsg)); + if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING) + CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val); + ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg)); + ptlrpc_at_adj_net_latency(req, + lustre_msg_get_service_time(req->rq_repmsg)); - rc = ptlrpc_check_status(req); - imp->imp_connect_error = rc; + rc = ptlrpc_check_status(req); if (rc) { /* @@ -1432,48 +1562,50 @@ static int after_reply(struct ptlrpc_request *req) ptlrpc_request_handle_notconn(req); RETURN(rc); } - } else { - /* - * Let's look if server sent slv. Do it only for RPC with - * rc == 0. - */ - ldlm_cli_update_pool(req); - } - - /* - * Store transno in reqmsg for replay. - */ - if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { - req->rq_transno = lustre_msg_get_transno(req->rq_repmsg); - lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno); - } - - if (imp->imp_replayable) { + } else { + /* + * Let's look if server sent slv. Do it only for RPC with + * rc == 0. + */ + ldlm_cli_update_pool(req); + } + + /* + * Store transno in reqmsg for replay. + */ + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { + req->rq_transno = lustre_msg_get_transno(req->rq_repmsg); + lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno); + } + + if (imp->imp_replayable) { spin_lock(&imp->imp_lock); - /* - * No point in adding already-committed requests to the replay - * list, we will just remove them immediately. b=9829 - */ - if (req->rq_transno != 0 && - (req->rq_transno > - lustre_msg_get_last_committed(req->rq_repmsg) || - req->rq_replay)) { - /** version recovery */ - ptlrpc_save_versions(req); - ptlrpc_retain_replayable_request(req, imp); - } else if (req->rq_commit_cb != NULL && + /* + * No point in adding already-committed requests to the replay + * list, we will just remove them immediately. b=9829 + */ + if (req->rq_transno != 0 && + (req->rq_transno > + lustre_msg_get_last_committed(req->rq_repmsg) || + req->rq_replay)) { + /** version recovery */ + ptlrpc_save_versions(req); + ptlrpc_retain_replayable_request(req, imp); + } else if (req->rq_commit_cb && list_empty(&req->rq_replay_list)) { - /* NB: don't call rq_commit_cb if it's already on + /* + * NB: don't call rq_commit_cb if it's already on * rq_replay_list, ptlrpc_free_committed() will call - * it later, see LU-3618 for details */ + * it later, see LU-3618 for details + */ spin_unlock(&imp->imp_lock); req->rq_commit_cb(req); spin_lock(&imp->imp_lock); - } + } - /* - * Replay-enabled imports return commit-status information. - */ + /* + * Replay-enabled imports return commit-status information. + */ committed = lustre_msg_get_last_committed(req->rq_repmsg); if (likely(committed > imp->imp_peer_committed_transno)) imp->imp_peer_committed_transno = committed; @@ -1507,25 +1639,25 @@ static int after_reply(struct ptlrpc_request *req) */ static int ptlrpc_send_new_req(struct ptlrpc_request *req) { - struct obd_import *imp = req->rq_import; - __u64 min_xid = 0; - int rc; - ENTRY; + struct obd_import *imp = req->rq_import; + __u64 min_xid = 0; + int rc; - LASSERT(req->rq_phase == RQ_PHASE_NEW); + ENTRY; + LASSERT(req->rq_phase == RQ_PHASE_NEW); /* do not try to go further if there is not enough memory in enc_pool */ - if (req->rq_sent && req->rq_bulk != NULL) + if (req->rq_sent && req->rq_bulk) if (req->rq_bulk->bd_iov_count > get_free_pages_in_pool() && pool_is_at_full_capacity()) RETURN(-ENOMEM); if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) && - (!req->rq_generation_set || - req->rq_import_generation == imp->imp_generation)) - RETURN (0); + (!req->rq_generation_set || + req->rq_import_generation == imp->imp_generation)) + RETURN(0); - ptlrpc_rqphase_move(req, RQ_PHASE_RPC); + ptlrpc_rqphase_move(req, RQ_PHASE_RPC); spin_lock(&imp->imp_lock); @@ -1561,7 +1693,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) list_add_tail(&req->rq_list, &imp->imp_sending_list); atomic_inc(&req->rq_import->imp_inflight); - /* find the known replied XID from the unreplied list, CONNECT + /* + * find the known replied XID from the unreplied list, CONNECT * and DISCONNECT requests are skipped to make the sanity check * on server side happy. see process_req_last_xid(). * @@ -1581,26 +1714,28 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) lustre_msg_set_status(req->rq_reqmsg, current_pid()); - rc = sptlrpc_req_refresh_ctx(req, -1); - if (rc) { - if (req->rq_err) { - req->rq_status = rc; - RETURN(1); - } else { + rc = sptlrpc_req_refresh_ctx(req, -1); + if (rc) { + if (req->rq_err) { + req->rq_status = rc; + RETURN(1); + } else { spin_lock(&req->rq_lock); req->rq_wait_ctx = 1; spin_unlock(&req->rq_lock); - RETURN(0); - } - } + RETURN(0); + } + } - CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc" - " %s:%s:%d:%llu:%s:%d\n", current_comm(), + CDEBUG(D_RPCTRACE, + "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n", + req, current_comm(), imp->imp_obd->obd_uuid.uuid, lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, - obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg)); + obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg), + lustre_msg_get_jobid(req->rq_reqmsg) ?: ""); - rc = ptl_send_rpc(req, 0); + rc = ptl_send_rpc(req, 0); if (rc == -ENOMEM) { spin_lock(&imp->imp_lock); if (!list_empty(&req->rq_list)) { @@ -1611,27 +1746,30 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) ptlrpc_rqphase_move(req, RQ_PHASE_NEW); RETURN(rc); } - if (rc) { - DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc); + if (rc) { + DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d", + rc); spin_lock(&req->rq_lock); req->rq_net_err = 1; spin_unlock(&req->rq_lock); - RETURN(rc); - } - RETURN(0); + RETURN(rc); + } + RETURN(0); } static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set) { int remaining, rc; - ENTRY; + ENTRY; LASSERT(set->set_producer != NULL); remaining = atomic_read(&set->set_remaining); - /* populate the ->set_requests list with requests until we - * reach the maximum number of RPCs in flight for this set */ + /* + * populate the ->set_requests list with requests until we + * reach the maximum number of RPCs in flight for this set + */ while (atomic_read(&set->set_remaining) < set->set_max_inflight) { rc = set->set_producer(set, set->set_producer_arg); if (rc == -ENOENT) { @@ -1658,8 +1796,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) struct list_head *tmp, *next; struct list_head comp_reqs; int force_timer_recalc = 0; - ENTRY; + ENTRY; if (atomic_read(&set->set_remaining) == 0) RETURN(1); @@ -1678,7 +1816,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) continue; } - /* This schedule point is mainly for the ptlrpcd caller of this + /* + * This schedule point is mainly for the ptlrpcd caller of this * function. Most ptlrpc sets are not long-lived and unbounded * in length, but at the least the set used by the ptlrpcd is. * Since the processing time is unbounded, we need to insert an @@ -1686,16 +1825,20 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) */ cond_resched(); - /* If the caller requires to allow to be interpreted by force + /* + * If the caller requires to allow to be interpreted by force * and it has really been interpreted, then move the request * to RQ_PHASE_INTERPRET phase in spite of what the current - * phase is. */ + * phase is. + */ if (unlikely(req->rq_allow_intr && req->rq_intr)) { req->rq_status = -EINTR; ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - /* Since it is interpreted and we have to wait for - * the reply to be unlinked, then use sync mode. */ + /* + * Since it is interpreted and we have to wait for + * the reply to be unlinked, then use sync mode. + */ async = 0; GOTO(interpret, req->rq_status); @@ -1751,82 +1894,83 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) ptlrpc_client_bulk_active(req)) continue; - /* - * Turn fail_loc off to prevent it from looping - * forever. - */ - if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) { - OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK, - OBD_FAIL_ONCE); - } - if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) { - OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK, - OBD_FAIL_ONCE); - } - - /* - * Move to next phase if reply was successfully - * unlinked. - */ - ptlrpc_rqphase_move(req, req->rq_next_phase); - } - - if (req->rq_phase == RQ_PHASE_INTERPRET) - GOTO(interpret, req->rq_status); - - /* - * Note that this also will start async reply unlink. - */ - if (req->rq_net_err && !req->rq_timedout) { - ptlrpc_expire_one_request(req, 1); - - /* - * Check if we still need to wait for unlink. - */ - if (ptlrpc_client_recv_or_unlink(req) || - ptlrpc_client_bulk_active(req)) - continue; - /* If there is no need to resend, fail it now. */ - if (req->rq_no_resend) { - if (req->rq_status == 0) - req->rq_status = -EIO; - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - GOTO(interpret, req->rq_status); - } else { - continue; - } - } - - if (req->rq_err) { + /* + * Turn fail_loc off to prevent it from looping + * forever. + */ + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) { + OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK, + OBD_FAIL_ONCE); + } + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) { + OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK, + OBD_FAIL_ONCE); + } + + /* + * Move to next phase if reply was successfully + * unlinked. + */ + ptlrpc_rqphase_move(req, req->rq_next_phase); + } + + if (req->rq_phase == RQ_PHASE_INTERPRET) + GOTO(interpret, req->rq_status); + + /* + * Note that this also will start async reply unlink. + */ + if (req->rq_net_err && !req->rq_timedout) { + ptlrpc_expire_one_request(req, 1); + + /* + * Check if we still need to wait for unlink. + */ + if (ptlrpc_client_recv_or_unlink(req) || + ptlrpc_client_bulk_active(req)) + continue; + /* If there is no need to resend, fail it now. */ + if (req->rq_no_resend) { + if (req->rq_status == 0) + req->rq_status = -EIO; + ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); + GOTO(interpret, req->rq_status); + } else { + continue; + } + } + + if (req->rq_err) { spin_lock(&req->rq_lock); req->rq_replied = 0; spin_unlock(&req->rq_lock); - if (req->rq_status == 0) - req->rq_status = -EIO; - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - GOTO(interpret, req->rq_status); - } - - /* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr - * so it sets rq_intr regardless of individual rpc + if (req->rq_status == 0) + req->rq_status = -EIO; + ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); + GOTO(interpret, req->rq_status); + } + + /* + * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr + * so it sets rq_intr regardless of individual rpc * timeouts. The synchronous IO waiting path sets - * rq_intr irrespective of whether ptlrpcd - * has seen a timeout. Our policy is to only interpret - * interrupted rpcs after they have timed out, so we - * need to enforce that here. - */ - - if (req->rq_intr && (req->rq_timedout || req->rq_waiting || - req->rq_wait_ctx)) { - req->rq_status = -EINTR; - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - GOTO(interpret, req->rq_status); - } - - if (req->rq_phase == RQ_PHASE_RPC) { - if (req->rq_timedout || req->rq_resend || - req->rq_waiting || req->rq_wait_ctx) { - int status; + * rq_intr irrespective of whether ptlrpcd + * has seen a timeout. Our policy is to only interpret + * interrupted rpcs after they have timed out, so we + * need to enforce that here. + */ + + if (req->rq_intr && (req->rq_timedout || req->rq_waiting || + req->rq_wait_ctx)) { + req->rq_status = -EINTR; + ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); + GOTO(interpret, req->rq_status); + } + + if (req->rq_phase == RQ_PHASE_RPC) { + if (req->rq_timedout || req->rq_resend || + req->rq_waiting || req->rq_wait_ctx) { + int status; if (!ptlrpc_unregister_reply(req, 1)) { ptlrpc_unregister_bulk(req, 1); @@ -1834,26 +1978,30 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) } spin_lock(&imp->imp_lock); - if (ptlrpc_import_delay_req(imp, req, &status)){ - /* put on delay list - only if we wait - * recovery finished - before send */ - list_del_init(&req->rq_list); - list_add_tail(&req->rq_list, - &imp-> - imp_delayed_list); + if (ptlrpc_import_delay_req(imp, req, + &status)) { + /* + * put on delay list - only if we wait + * recovery finished - before send + */ + list_move_tail(&req->rq_list, + &imp->imp_delayed_list); spin_unlock(&imp->imp_lock); - continue; - } + continue; + } - if (status != 0) { - req->rq_status = status; - ptlrpc_rqphase_move(req, - RQ_PHASE_INTERPRET); + if (status != 0) { + req->rq_status = status; + ptlrpc_rqphase_move(req, + RQ_PHASE_INTERPRET); spin_unlock(&imp->imp_lock); GOTO(interpret, req->rq_status); } + /* ignore on just initiated connections */ if (ptlrpc_no_resend(req) && - !req->rq_wait_ctx) { + !req->rq_wait_ctx && + imp->imp_generation != + imp->imp_initiated_at) { req->rq_status = -ENOTCONN; ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); @@ -1861,9 +2009,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) GOTO(interpret, req->rq_status); } - list_del_init(&req->rq_list); - list_add_tail(&req->rq_list, - &imp->imp_sending_list); + list_move_tail(&req->rq_list, + &imp->imp_sending_list); spin_unlock(&imp->imp_lock); @@ -1872,24 +2019,22 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_unlock(&req->rq_lock); if (req->rq_timedout || req->rq_resend) { - /* This is re-sending anyways, - * let's mark req as resend. */ + /* + * This is re-sending anyways, + * let's mark req as resend. + */ spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); - - if (req->rq_bulk != NULL && - !ptlrpc_unregister_bulk(req, 1)) - continue; - } - /* - * rq_wait_ctx is only touched by ptlrpcd, - * so no lock is needed here. - */ - status = sptlrpc_req_refresh_ctx(req, -1); - if (status) { - if (req->rq_err) { - req->rq_status = status; + } + /* + * rq_wait_ctx is only touched by ptlrpcd, + * so no lock is needed here. + */ + status = sptlrpc_req_refresh_ctx(req, -1); + if (status) { + if (req->rq_err) { + req->rq_status = status; spin_lock(&req->rq_lock); req->rq_wait_ctx = 0; spin_unlock(&req->rq_lock); @@ -1907,6 +2052,14 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_unlock(&req->rq_lock); } + /* + * In any case, the previous bulk should be + * cleaned up to prepare for the new sending + */ + if (req->rq_bulk && + !ptlrpc_unregister_bulk(req, 1)) + continue; + rc = ptl_send_rpc(req, 0); if (rc == -ENOMEM) { spin_lock(&imp->imp_lock); @@ -1951,49 +2104,56 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_unlock(&req->rq_lock); - /* unlink from net because we are going to - * swab in-place of reply buffer */ - unregistered = ptlrpc_unregister_reply(req, 1); - if (!unregistered) - continue; - - req->rq_status = after_reply(req); - if (req->rq_resend) - continue; - - /* If there is no bulk associated with this request, - * then we're done and should let the interpreter - * process the reply. Similarly if the RPC returned - * an error, and therefore the bulk will never arrive. - */ - if (req->rq_bulk == NULL || req->rq_status < 0) { - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - GOTO(interpret, req->rq_status); - } - - ptlrpc_rqphase_move(req, RQ_PHASE_BULK); - } - - LASSERT(req->rq_phase == RQ_PHASE_BULK); - if (ptlrpc_client_bulk_active(req)) - continue; + /* + * unlink from net because we are going to + * swab in-place of reply buffer + */ + unregistered = ptlrpc_unregister_reply(req, 1); + if (!unregistered) + continue; + + req->rq_status = after_reply(req); + if (req->rq_resend) + continue; + + /* + * If there is no bulk associated with this request, + * then we're done and should let the interpreter + * process the reply. Similarly if the RPC returned + * an error, and therefore the bulk will never arrive. + */ + if (!req->rq_bulk || req->rq_status < 0) { + ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); + GOTO(interpret, req->rq_status); + } + + ptlrpc_rqphase_move(req, RQ_PHASE_BULK); + } + + LASSERT(req->rq_phase == RQ_PHASE_BULK); + if (ptlrpc_client_bulk_active(req)) + continue; if (req->rq_bulk->bd_failure) { - /* The RPC reply arrived OK, but the bulk screwed + /* + * The RPC reply arrived OK, but the bulk screwed * up! Dead weird since the server told us the RPC * was good after getting the REPLY for her GET or - * the ACK for her PUT. */ + * the ACK for her PUT. + */ DEBUG_REQ(D_ERROR, req, "bulk transfer failed"); req->rq_status = -EIO; } ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - interpret: +interpret: LASSERT(req->rq_phase == RQ_PHASE_INTERPRET); - /* This moves to "unregistering" phase we need to wait for - * reply unlink. */ + /* + * This moves to "unregistering" phase we need to wait for + * reply unlink. + */ if (!unregistered && !ptlrpc_unregister_reply(req, async)) { /* start async bulk unlink too */ ptlrpc_unregister_bulk(req, 1); @@ -2003,8 +2163,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) if (!ptlrpc_unregister_bulk(req, async)) continue; - /* When calling interpret receiving already should be - * finished. */ + /* + * When calling interpret receiving already should be + * finished. + */ LASSERT(!req->rq_receiving_reply); ptlrpc_req_interpret(env, req, req->rq_status); @@ -2015,21 +2177,24 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) } ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE); - if (req->rq_reqmsg != NULL) + if (req->rq_reqmsg) CDEBUG(D_RPCTRACE, - "Completed RPC pname:cluuid:pid:xid:nid:" - "opc %s:%s:%d:%llu:%s:%d\n", current_comm(), + "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n", + req, current_comm(), imp->imp_obd->obd_uuid.uuid, lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, obd_import_nid2str(imp), - lustre_msg_get_opc(req->rq_reqmsg)); + lustre_msg_get_opc(req->rq_reqmsg), + lustre_msg_get_jobid(req->rq_reqmsg) ?: ""); spin_lock(&imp->imp_lock); - /* Request already may be not on sending or delaying list. This + /* + * Request already may be not on sending or delaying list. This * may happen in the case of marking it erroneous for the case * ptlrpc_import_delay_req(req, status) find it impossible to - * allow sending this rpc and returns *status != 0. */ + * allow sending this rpc and returns *status != 0. + */ if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); atomic_dec(&imp->imp_inflight); @@ -2045,8 +2210,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) if (ptlrpc_set_producer(set) > 0) force_timer_recalc = 1; - /* free the request that has just been completed - * in order not to pollute set->set_requests */ + /* + * free the request that has just been completed + * in order not to pollute set->set_requests + */ list_del_init(&req->rq_set_chain); spin_lock(&req->rq_lock); req->rq_set = NULL; @@ -2062,8 +2229,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) } } - /* move completed request at the head of list so it's easier for - * caller to find them */ + /* + * move completed request at the head of list so it's easier for + * caller to find them + */ list_splice(&comp_reqs, &set->set_requests); /* If we hit an error, we want to recover promptly. */ @@ -2081,14 +2250,14 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) struct obd_import *imp = req->rq_import; unsigned int debug_mask = D_RPCTRACE; int rc = 0; - ENTRY; + ENTRY; spin_lock(&req->rq_lock); req->rq_timedout = 1; spin_unlock(&req->rq_lock); if (ptlrpc_console_allow(req, lustre_msg_get_opc(req->rq_reqmsg), - lustre_msg_get_status(req->rq_reqmsg))) + lustre_msg_get_status(req->rq_reqmsg))) debug_mask = D_WARNING; DEBUG_REQ(debug_mask, req, "Request sent has %s: [sent %lld/real %lld]", req->rq_net_err ? "failed due to network error" : @@ -2098,51 +2267,55 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) "timed out for sent delay" : "timed out for slow reply"), (s64)req->rq_sent, (s64)req->rq_real_sent); - if (imp != NULL && obd_debug_peer_on_timeout) + if (imp && obd_debug_peer_on_timeout) LNetDebugPeer(imp->imp_connection->c_peer); - ptlrpc_unregister_reply(req, async_unlink); - ptlrpc_unregister_bulk(req, async_unlink); + ptlrpc_unregister_reply(req, async_unlink); + ptlrpc_unregister_bulk(req, async_unlink); - if (obd_dump_on_timeout) - libcfs_debug_dumplog(); + if (obd_dump_on_timeout) + libcfs_debug_dumplog(); - if (imp == NULL) { - DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?"); - RETURN(1); - } + if (!imp) { + DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?"); + RETURN(1); + } atomic_inc(&imp->imp_timeouts); - /* The DLM server doesn't want recovery run on its imports. */ - if (imp->imp_dlm_fake) - RETURN(1); - - /* If this request is for recovery or other primordial tasks, - * then error it out here. */ - if (req->rq_ctx_init || req->rq_ctx_fini || - req->rq_send_state != LUSTRE_IMP_FULL || - imp->imp_obd->obd_no_recov) { - DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)", - ptlrpc_import_state_name(req->rq_send_state), - ptlrpc_import_state_name(imp->imp_state)); + /* The DLM server doesn't want recovery run on its imports. */ + if (imp->imp_dlm_fake) + RETURN(1); + + /* + * If this request is for recovery or other primordial tasks, + * then error it out here. + */ + if (req->rq_ctx_init || req->rq_ctx_fini || + req->rq_send_state != LUSTRE_IMP_FULL || + imp->imp_obd->obd_no_recov) { + DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)", + ptlrpc_import_state_name(req->rq_send_state), + ptlrpc_import_state_name(imp->imp_state)); spin_lock(&req->rq_lock); req->rq_status = -ETIMEDOUT; req->rq_err = 1; spin_unlock(&req->rq_lock); RETURN(1); - } + } - /* if a request can't be resent we can't wait for an answer after - the timeout */ - if (ptlrpc_no_resend(req)) { - DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:"); - rc = 1; - } + /* + * if a request can't be resent we can't wait for an answer after + * the timeout + */ + if (ptlrpc_no_resend(req)) { + DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:"); + rc = 1; + } - ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg)); + ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg)); - RETURN(rc); + RETURN(rc); } /** @@ -2167,31 +2340,33 @@ int ptlrpc_expired_set(void *data) list_entry(tmp, struct ptlrpc_request, rq_set_chain); - /* don't expire request waiting for context */ - if (req->rq_wait_ctx) - continue; + /* don't expire request waiting for context */ + if (req->rq_wait_ctx) + continue; - /* Request in-flight? */ - if (!((req->rq_phase == RQ_PHASE_RPC && - !req->rq_waiting && !req->rq_resend) || - (req->rq_phase == RQ_PHASE_BULK))) - continue; + /* Request in-flight? */ + if (!((req->rq_phase == RQ_PHASE_RPC && + !req->rq_waiting && !req->rq_resend) || + (req->rq_phase == RQ_PHASE_BULK))) + continue; - if (req->rq_timedout || /* already dealt with */ - req->rq_deadline > now) /* not expired */ - continue; + if (req->rq_timedout || /* already dealt with */ + req->rq_deadline > now) /* not expired */ + continue; - /* Deal with this guy. Do it asynchronously to not block - * ptlrpcd thread. */ - ptlrpc_expire_one_request(req, 1); - } + /* + * Deal with this guy. Do it asynchronously to not block + * ptlrpcd thread. + */ + ptlrpc_expire_one_request(req, 1); + } - /* - * When waiting for a whole set, we always break out of the - * sleep so we can recalculate the timeout, or enable interrupts - * if everyone's timed out. - */ - RETURN(1); + /* + * When waiting for a whole set, we always break out of the + * sleep so we can recalculate the timeout, or enable interrupts + * if everyone's timed out. + */ + RETURN(1); } /** @@ -2248,39 +2423,33 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) list_for_each(tmp, &set->set_requests) { req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - /* - * Request in-flight? - */ - if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) || - (req->rq_phase == RQ_PHASE_BULK) || - (req->rq_phase == RQ_PHASE_NEW))) - continue; - - /* - * Already timed out. - */ - if (req->rq_timedout) - continue; - - /* - * Waiting for ctx. - */ - if (req->rq_wait_ctx) - continue; - - if (req->rq_phase == RQ_PHASE_NEW) - deadline = req->rq_sent; + /* Request in-flight? */ + if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) || + (req->rq_phase == RQ_PHASE_BULK) || + (req->rq_phase == RQ_PHASE_NEW))) + continue; + + /* Already timed out. */ + if (req->rq_timedout) + continue; + + /* Waiting for ctx. */ + if (req->rq_wait_ctx) + continue; + + if (req->rq_phase == RQ_PHASE_NEW) + deadline = req->rq_sent; else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend) deadline = req->rq_sent; - else - deadline = req->rq_sent + req->rq_timeout; + else + deadline = req->rq_sent + req->rq_timeout; - if (deadline <= now) /* actually expired already */ - timeout = 1; /* ASAP */ - else if (timeout == 0 || timeout > deadline - now) - timeout = deadline - now; - } - RETURN(timeout); + if (deadline <= now) /* actually expired already */ + timeout = 1; /* ASAP */ + else if (timeout == 0 || timeout > deadline - now) + timeout = deadline - now; + } + RETURN(timeout); } /** @@ -2289,15 +2458,15 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) * error or otherwise be interrupted). * Returns 0 on success or error code otherwise. */ -int ptlrpc_set_wait(struct ptlrpc_request_set *set) +int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp; - struct ptlrpc_request *req; - struct l_wait_info lwi; + struct list_head *tmp; + struct ptlrpc_request *req; + struct l_wait_info lwi; time64_t timeout; int rc; - ENTRY; + ENTRY; if (set->set_producer) (void)ptlrpc_set_producer(set); else @@ -2309,64 +2478,75 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) } if (list_empty(&set->set_requests)) - RETURN(0); + RETURN(0); - do { - timeout = ptlrpc_set_next_timeout(set); + do { + timeout = ptlrpc_set_next_timeout(set); - /* wait until all complete, interrupted, or an in-flight - * req times out */ + /* + * wait until all complete, interrupted, or an in-flight + * req times out + */ CDEBUG(D_RPCTRACE, "set %p going to sleep for %lld seconds\n", - set, timeout); + set, timeout); if ((timeout == 0 && !signal_pending(current)) || set->set_allow_intr) - /* No requests are in-flight (ether timed out + /* + * No requests are in-flight (ether timed out * or delayed), so we can allow interrupts. * We still want to block for a limited time, - * so we allow interrupts during the timeout. */ + * so we allow interrupts during the timeout. + */ lwi = LWI_TIMEOUT_INTR_ALL( cfs_time_seconds(timeout ? timeout : 1), ptlrpc_expired_set, ptlrpc_interrupted_set, set); - else - /* - * At least one request is in flight, so no - * interrupts are allowed. Wait until all + else + /* + * At least one request is in flight, so no + * interrupts are allowed. Wait until all * complete, or an in-flight req times out. - */ - lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1), - ptlrpc_expired_set, set); + */ + lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), + ptlrpc_expired_set, set); - rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi); + rc = l_wait_event(set->set_waitq, + ptlrpc_check_set(NULL, set), &lwi); - /* LU-769 - if we ignored the signal because it was already - * pending when we started, we need to handle it now or we risk - * it being ignored forever */ + /* + * LU-769 - if we ignored the signal because it was already + * pending when we started, we need to handle it now or we risk + * it being ignored forever + */ if (rc == -ETIMEDOUT && (!lwi.lwi_allow_intr || set->set_allow_intr) && signal_pending(current)) { sigset_t blocked_sigs = cfs_block_sigsinv(LUSTRE_FATAL_SIGS); - /* In fact we only interrupt for the "fatal" signals + /* + * In fact we only interrupt for the "fatal" signals * like SIGINT or SIGKILL. We still ignore less * important signals since ptlrpc set is not easily - * reentrant from userspace again */ + * reentrant from userspace again + */ if (signal_pending(current)) ptlrpc_interrupted_set(set); cfs_restore_sigs(blocked_sigs); } - LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT); + LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT); - /* -EINTR => all requests have been flagged rq_intr so next - * check completes. - * -ETIMEDOUT => someone timed out. When all reqs have - * timed out, signals are enabled allowing completion with - * EINTR. - * I don't really care if we go once more round the loop in - * the error cases -eeb. */ + /* + * -EINTR => all requests have been flagged rq_intr so next + * check completes. + * -ETIMEDOUT => someone timed out. When all reqs have + * timed out, signals are enabled allowing completion with + * EINTR. + * I don't really care if we go once more round the loop in + * the error cases -eeb. + */ if (rc == 0 && atomic_read(&set->set_remaining) == 0) { list_for_each(tmp, &set->set_requests) { req = list_entry(tmp, struct ptlrpc_request, @@ -2380,14 +2560,14 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) LASSERT(atomic_read(&set->set_remaining) == 0); - rc = set->set_rc; /* rq_status of already freed requests if any */ + rc = set->set_rc; /* rq_status of already freed requests if any */ list_for_each(tmp, &set->set_requests) { req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - LASSERT(req->rq_phase == RQ_PHASE_COMPLETE); - if (req->rq_status != 0) - rc = req->rq_status; - } + LASSERT(req->rq_phase == RQ_PHASE_COMPLETE); + if (req->rq_status != 0) + rc = req->rq_status; + } RETURN(rc); } @@ -2405,7 +2585,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) { ENTRY; - if (request == NULL) + if (!request) RETURN_EXIT; LASSERT(!request->rq_srv_req); @@ -2417,16 +2597,18 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) req_capsule_fini(&request->rq_pill); - /* We must take it off the imp_replay_list first. Otherwise, we'll set - * request->rq_reqmsg to NULL while osc_close is dereferencing it. */ - if (request->rq_import != NULL) { + /* + * We must take it off the imp_replay_list first. Otherwise, we'll set + * request->rq_reqmsg to NULL while osc_close is dereferencing it. + */ + if (request->rq_import) { if (!locked) spin_lock(&request->rq_import->imp_lock); list_del_init(&request->rq_replay_list); list_del_init(&request->rq_unreplied_list); if (!locked) spin_unlock(&request->rq_import->imp_lock); - } + } LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request); if (atomic_read(&request->rq_refcount) != 0) { @@ -2435,25 +2617,25 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) LBUG(); } - if (request->rq_repbuf != NULL) - sptlrpc_cli_free_repbuf(request); + if (request->rq_repbuf) + sptlrpc_cli_free_repbuf(request); - if (request->rq_import != NULL) { - class_import_put(request->rq_import); - request->rq_import = NULL; - } - if (request->rq_bulk != NULL) + if (request->rq_import) { + class_import_put(request->rq_import); + request->rq_import = NULL; + } + if (request->rq_bulk) ptlrpc_free_bulk(request->rq_bulk); - if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL) - sptlrpc_cli_free_reqbuf(request); + if (request->rq_reqbuf || request->rq_clrbuf) + sptlrpc_cli_free_reqbuf(request); - if (request->rq_cli_ctx) - sptlrpc_req_put_ctx(request, !locked); + if (request->rq_cli_ctx) + sptlrpc_req_put_ctx(request, !locked); - if (request->rq_pool) - __ptlrpc_free_req_to_pool(request); - else + if (request->rq_pool) + __ptlrpc_free_req_to_pool(request); + else ptlrpc_request_cache_free(request); EXIT; } @@ -2481,8 +2663,8 @@ void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request) static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) { int count; - ENTRY; + ENTRY; if (!request) RETURN(1); @@ -2496,7 +2678,8 @@ static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) count = atomic_dec_return(&request->rq_refcount); LASSERTF(count >= 0, "Invalid ref count %d\n", count); - /* For open RPC, the client does not know the EA size (LOV, ACL, and + /* + * For open RPC, the client does not know the EA size (LOV, ACL, and * so on) before replied, then the client has to reserve very large * reply buffer. Such buffer will not be released until the RPC freed. * Since The open RPC is replayable, we need to keep it in the replay @@ -2506,7 +2689,8 @@ static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) * If fact, it is unnecessary to keep reply buffer for open replay, * related EAs have already been saved via mdc_save_lovea() before * coming here. So it is safe to free the reply buffer some earlier - * before releasing the RPC to avoid client OOM. LU-9514 */ + * before releasing the RPC to avoid client OOM. LU-9514 + */ if (count == 1 && request->rq_early_free_repbuf && request->rq_repbuf) { spin_lock(&request->rq_early_free_lock); sptlrpc_cli_free_repbuf(request); @@ -2529,7 +2713,7 @@ static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) */ void ptlrpc_req_finished(struct ptlrpc_request *request) { - __ptlrpc_req_finished(request, 0); + __ptlrpc_req_finished(request, 0); } EXPORT_SYMBOL(ptlrpc_req_finished); @@ -2538,7 +2722,7 @@ EXPORT_SYMBOL(ptlrpc_req_finished); */ __u64 ptlrpc_req_xid(struct ptlrpc_request *request) { - return request->rq_xid; + return request->rq_xid; } EXPORT_SYMBOL(ptlrpc_req_xid); @@ -2551,7 +2735,7 @@ EXPORT_SYMBOL(ptlrpc_req_xid); */ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) { - int rc; + int rc; struct l_wait_info lwi; /* @@ -2565,58 +2749,60 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) request->rq_reply_deadline = ktime_get_real_seconds() + LONG_UNLINK; - /* - * Nothing left to do. - */ - if (!ptlrpc_client_recv_or_unlink(request)) - RETURN(1); + /* + * Nothing left to do. + */ + if (!ptlrpc_client_recv_or_unlink(request)) + RETURN(1); - LNetMDUnlink(request->rq_reply_md_h); + LNetMDUnlink(request->rq_reply_md_h); - /* - * Let's check it once again. - */ - if (!ptlrpc_client_recv_or_unlink(request)) - RETURN(1); + /* + * Let's check it once again. + */ + if (!ptlrpc_client_recv_or_unlink(request)) + RETURN(1); /* Move to "Unregistering" phase as reply was not unlinked yet. */ ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC); - /* - * Do not wait for unlink to finish. - */ - if (async) - RETURN(0); - - /* - * We have to l_wait_event() whatever the result, to give liblustre - * a chance to run reply_in_callback(), and to make sure we've - * unlinked before returning a req to the pool. - */ - for (;;) { + /* + * Do not wait for unlink to finish. + */ + if (async) + RETURN(0); + + /* + * We have to l_wait_event() whatever the result, to give liblustre + * a chance to run reply_in_callback(), and to make sure we've + * unlinked before returning a req to the pool. + */ + for (;;) { /* The wq argument is ignored by user-space wait_event macros */ - wait_queue_head_t *wq = (request->rq_set != NULL) ? + wait_queue_head_t *wq = (request->rq_set) ? &request->rq_set->set_waitq : &request->rq_reply_waitq; - /* Network access will complete in finite time but the HUGE - * timeout lets us CWARN for visibility of sluggish NALs */ - lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK), - cfs_time_seconds(1), NULL, NULL); - rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request), - &lwi); - if (rc == 0) { - ptlrpc_rqphase_move(request, request->rq_next_phase); - RETURN(1); - } - - LASSERT(rc == -ETIMEDOUT); - DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout " - "receiving_reply=%d req_ulinked=%d reply_unlinked=%d", + /* + * Network access will complete in finite time but the HUGE + * timeout lets us CWARN for visibility of sluggish NALs + */ + lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK), + cfs_time_seconds(1), NULL, NULL); + rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request), + &lwi); + if (rc == 0) { + ptlrpc_rqphase_move(request, request->rq_next_phase); + RETURN(1); + } + + LASSERT(rc == -ETIMEDOUT); + DEBUG_REQ(D_WARNING, request, + "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d", request->rq_receiving_reply, request->rq_req_unlinked, request->rq_reply_unlinked); - } - RETURN(0); + } + RETURN(0); } static void ptlrpc_free_request(struct ptlrpc_request *req) @@ -2625,7 +2811,7 @@ static void ptlrpc_free_request(struct ptlrpc_request *req) req->rq_replay = 0; spin_unlock(&req->rq_lock); - if (req->rq_commit_cb != NULL) + if (req->rq_commit_cb) req->rq_commit_cb(req); list_del_init(&req->rq_replay_list); @@ -2637,7 +2823,7 @@ static void ptlrpc_free_request(struct ptlrpc_request *req) */ void ptlrpc_request_committed(struct ptlrpc_request *req, int force) { - struct obd_import *imp = req->rq_import; + struct obd_import *imp = req->rq_import; spin_lock(&imp->imp_lock); if (list_empty(&req->rq_replay_list)) { @@ -2645,8 +2831,11 @@ void ptlrpc_request_committed(struct ptlrpc_request *req, int force) return; } - if (force || req->rq_transno <= imp->imp_peer_committed_transno) + if (force || req->rq_transno <= imp->imp_peer_committed_transno) { + if (imp->imp_replay_cursor == &req->rq_replay_list) + imp->imp_replay_cursor = req->rq_replay_list.next; ptlrpc_free_request(req); + } spin_unlock(&imp->imp_lock); } @@ -2662,64 +2851,64 @@ EXPORT_SYMBOL(ptlrpc_request_committed); */ void ptlrpc_free_committed(struct obd_import *imp) { - struct ptlrpc_request *req, *saved; - struct ptlrpc_request *last_req = NULL; /* temporary fire escape */ - bool skip_committed_list = true; - ENTRY; + struct ptlrpc_request *req, *saved; + struct ptlrpc_request *last_req = NULL; /* temporary fire escape */ + bool skip_committed_list = true; + ENTRY; LASSERT(imp != NULL); assert_spin_locked(&imp->imp_lock); - if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked && - imp->imp_generation == imp->imp_last_generation_checked) { + if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked && + imp->imp_generation == imp->imp_last_generation_checked) { CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n", - imp->imp_obd->obd_name, imp->imp_peer_committed_transno); + imp->imp_obd->obd_name, imp->imp_peer_committed_transno); RETURN_EXIT; - } + } CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n", - imp->imp_obd->obd_name, imp->imp_peer_committed_transno, - imp->imp_generation); + imp->imp_obd->obd_name, imp->imp_peer_committed_transno, + imp->imp_generation); if (imp->imp_generation != imp->imp_last_generation_checked || imp->imp_last_transno_checked == 0) skip_committed_list = false; - imp->imp_last_transno_checked = imp->imp_peer_committed_transno; - imp->imp_last_generation_checked = imp->imp_generation; + imp->imp_last_transno_checked = imp->imp_peer_committed_transno; + imp->imp_last_generation_checked = imp->imp_generation; list_for_each_entry_safe(req, saved, &imp->imp_replay_list, - rq_replay_list) { - /* XXX ok to remove when 1357 resolved - rread 05/29/03 */ - LASSERT(req != last_req); - last_req = req; - - if (req->rq_transno == 0) { - DEBUG_REQ(D_EMERG, req, "zero transno during replay"); - LBUG(); - } - if (req->rq_import_generation < imp->imp_generation) { - DEBUG_REQ(D_RPCTRACE, req, "free request with old gen"); - GOTO(free_req, 0); - } - - /* not yet committed */ - if (req->rq_transno > imp->imp_peer_committed_transno) { - DEBUG_REQ(D_RPCTRACE, req, "stopping search"); - break; - } + rq_replay_list) { + /* XXX ok to remove when 1357 resolved - rread 05/29/03 */ + LASSERT(req != last_req); + last_req = req; + + if (req->rq_transno == 0) { + DEBUG_REQ(D_EMERG, req, "zero transno during replay"); + LBUG(); + } + if (req->rq_import_generation < imp->imp_generation) { + DEBUG_REQ(D_RPCTRACE, req, "free request with old gen"); + GOTO(free_req, 0); + } + + /* not yet committed */ + if (req->rq_transno > imp->imp_peer_committed_transno) { + DEBUG_REQ(D_RPCTRACE, req, "stopping search"); + break; + } if (req->rq_replay) { DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)"); list_move_tail(&req->rq_replay_list, - &imp->imp_committed_list); + &imp->imp_committed_list); continue; } DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)", - imp->imp_peer_committed_transno); + imp->imp_peer_committed_transno); free_req: ptlrpc_free_request(req); - } + } if (skip_committed_list) GOTO(out, 0); @@ -2741,13 +2930,13 @@ free_req: } } out: - EXIT; + EXIT; } void ptlrpc_cleanup_client(struct obd_import *imp) { - ENTRY; - EXIT; + ENTRY; + EXIT; } /** @@ -2758,25 +2947,26 @@ void ptlrpc_cleanup_client(struct obd_import *imp) */ void ptlrpc_resend_req(struct ptlrpc_request *req) { - DEBUG_REQ(D_HA, req, "going to resend"); + DEBUG_REQ(D_HA, req, "going to resend"); spin_lock(&req->rq_lock); - /* Request got reply but linked to the import list still. - Let ptlrpc_check_set() to process it. */ + /* + * Request got reply but linked to the import list still. + * Let ptlrpc_check_set() process it. + */ if (ptlrpc_client_replied(req)) { spin_unlock(&req->rq_lock); DEBUG_REQ(D_HA, req, "it has reply, so skip it"); return; } - lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 }); - req->rq_status = -EAGAIN; + req->rq_status = -EAGAIN; - req->rq_resend = 1; - req->rq_net_err = 0; - req->rq_timedout = 0; + req->rq_resend = 1; + req->rq_net_err = 0; + req->rq_timedout = 0; - ptlrpc_client_wake_req(req); + ptlrpc_client_wake_req(req); spin_unlock(&req->rq_lock); } @@ -2809,20 +2999,22 @@ EXPORT_SYMBOL(ptlrpc_request_addref); * Must be called under imp_lock */ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, - struct obd_import *imp) + struct obd_import *imp) { struct list_head *tmp; assert_spin_locked(&imp->imp_lock); - if (req->rq_transno == 0) { - DEBUG_REQ(D_EMERG, req, "saving request with zero transno"); - LBUG(); - } + if (req->rq_transno == 0) { + DEBUG_REQ(D_EMERG, req, "saving request with zero transno"); + LBUG(); + } - /* clear this for new requests that were resent as well - as resent replayed requests. */ - lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); + /* + * clear this for new requests that were resent as well + * as resent replayed requests. + */ + lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); /* don't re-add requests that have been replayed */ if (!list_empty(&req->rq_replay_list)) @@ -2842,20 +3034,21 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, struct ptlrpc_request, rq_replay_list); - /* We may have duplicate transnos if we create and then - * open a file, or for closes retained if to match creating - * opens, so use req->rq_xid as a secondary key. - * (See bugs 684, 685, and 428.) - * XXX no longer needed, but all opens need transnos! - */ - if (iter->rq_transno > req->rq_transno) - continue; - - if (iter->rq_transno == req->rq_transno) { - LASSERT(iter->rq_xid != req->rq_xid); - if (iter->rq_xid > req->rq_xid) - continue; - } + /* + * We may have duplicate transnos if we create and then + * open a file, or for closes retained if to match creating + * opens, so use req->rq_xid as a secondary key. + * (See bugs 684, 685, and 428.) + * XXX no longer needed, but all opens need transnos! + */ + if (iter->rq_transno > req->rq_transno) + continue; + + if (iter->rq_transno == req->rq_transno) { + LASSERT(iter->rq_xid != req->rq_xid); + if (iter->rq_xid > req->rq_xid) + continue; + } list_add(&req->rq_replay_list, &iter->rq_replay_list); return; @@ -2870,15 +3063,15 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, */ int ptlrpc_queue_wait(struct ptlrpc_request *req) { - struct ptlrpc_request_set *set; - int rc; - ENTRY; + struct ptlrpc_request_set *set; + int rc; - LASSERT(req->rq_set == NULL); - LASSERT(!req->rq_receiving_reply); + ENTRY; + LASSERT(req->rq_set == NULL); + LASSERT(!req->rq_receiving_reply); set = ptlrpc_prep_set(); - if (set == NULL) { + if (!set) { CERROR("cannot allocate ptlrpc set: rc = %d\n", -ENOMEM); RETURN(-ENOMEM); } @@ -2886,13 +3079,13 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) /* for distributed debugging */ lustre_msg_set_status(req->rq_reqmsg, current_pid()); - /* add a ref for the set (see comment in ptlrpc_set_add_req) */ - ptlrpc_request_addref(req); - ptlrpc_set_add_req(set, req); - rc = ptlrpc_set_wait(set); - ptlrpc_set_destroy(set); + /* add a ref for the set (see comment in ptlrpc_set_add_req) */ + ptlrpc_request_addref(req); + ptlrpc_set_add_req(set, req); + rc = ptlrpc_set_wait(NULL, set); + ptlrpc_set_destroy(set); - RETURN(rc); + RETURN(rc); } EXPORT_SYMBOL(ptlrpc_queue_wait); @@ -2903,77 +3096,81 @@ EXPORT_SYMBOL(ptlrpc_queue_wait); */ static int ptlrpc_replay_interpret(const struct lu_env *env, struct ptlrpc_request *req, - void * data, int rc) + void *args, int rc) { - struct ptlrpc_replay_async_args *aa = data; + struct ptlrpc_replay_async_args *aa = args; struct obd_import *imp = req->rq_import; ENTRY; atomic_dec(&imp->imp_replay_inflight); - /* Note: if it is bulk replay (MDS-MDS replay), then even if + /* + * Note: if it is bulk replay (MDS-MDS replay), then even if * server got the request, but bulk transfer timeout, let's - * replay the bulk req again */ + * replay the bulk req again + */ if (!ptlrpc_client_replied(req) || - (req->rq_bulk != NULL && + (req->rq_bulk && lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) { - DEBUG_REQ(D_ERROR, req, "request replay timed out.\n"); + DEBUG_REQ(D_ERROR, req, "request replay timed out"); GOTO(out, rc = -ETIMEDOUT); } - if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR && - (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN || - lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) - GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg)); + if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR && + (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN || + lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) + GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg)); - /** VBR: check version failure */ - if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) { + /** VBR: check version failure */ + if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) { /** replay was failed due to version mismatch */ - DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n"); + DEBUG_REQ(D_WARNING, req, "Version mismatch during replay"); spin_lock(&imp->imp_lock); imp->imp_vbr_failed = 1; spin_unlock(&imp->imp_lock); lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status); - } else { - /** The transno had better not change over replay. */ - LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) == - lustre_msg_get_transno(req->rq_repmsg) || - lustre_msg_get_transno(req->rq_repmsg) == 0, + } else { + /** The transno had better not change over replay. */ + LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) == + lustre_msg_get_transno(req->rq_repmsg) || + lustre_msg_get_transno(req->rq_repmsg) == 0, "%#llx/%#llx\n", - lustre_msg_get_transno(req->rq_reqmsg), - lustre_msg_get_transno(req->rq_repmsg)); - } + lustre_msg_get_transno(req->rq_reqmsg), + lustre_msg_get_transno(req->rq_repmsg)); + } spin_lock(&imp->imp_lock); imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg); spin_unlock(&imp->imp_lock); - LASSERT(imp->imp_last_replay_transno); - - /* transaction number shouldn't be bigger than the latest replayed */ - if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) { - DEBUG_REQ(D_ERROR, req, - "Reported transno %llu is bigger than the " - "replayed one: %llu", req->rq_transno, - lustre_msg_get_transno(req->rq_reqmsg)); - GOTO(out, rc = -EINVAL); - } - - DEBUG_REQ(D_HA, req, "got rep"); - - /* let the callback do fixups, possibly including in the request */ - if (req->rq_replay_cb) - req->rq_replay_cb(req); - - if (ptlrpc_client_replied(req) && - lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) { - DEBUG_REQ(D_ERROR, req, "status %d, old was %d", - lustre_msg_get_status(req->rq_repmsg), - aa->praa_old_status); - - /* Note: If the replay fails for MDT-MDT recovery, let's + LASSERT(imp->imp_last_replay_transno); + + /* transaction number shouldn't be bigger than the latest replayed */ + if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) { + DEBUG_REQ(D_ERROR, req, + "Reported transno=%llu is bigger than replayed=%llu", + req->rq_transno, + lustre_msg_get_transno(req->rq_reqmsg)); + GOTO(out, rc = -EINVAL); + } + + DEBUG_REQ(D_HA, req, "got reply"); + + /* let the callback do fixups, possibly including in the request */ + if (req->rq_replay_cb) + req->rq_replay_cb(req); + + if (ptlrpc_client_replied(req) && + lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) { + DEBUG_REQ(D_ERROR, req, "status %d, old was %d", + lustre_msg_get_status(req->rq_repmsg), + aa->praa_old_status); + + /* + * Note: If the replay fails for MDT-MDT recovery, let's * abort all of the following requests in the replay * and sending list, because MDT-MDT update requests - * are dependent on each other, see LU-7039 */ + * are dependent on each other, see LU-7039 + */ if (imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS) { struct ptlrpc_request *free_req; struct ptlrpc_request *tmp; @@ -2992,8 +3189,8 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, } list_for_each_entry_safe(free_req, tmp, - &imp->imp_delayed_list, - rq_list) { + &imp->imp_delayed_list, + rq_list) { spin_lock(&free_req->rq_lock); free_req->rq_err = 1; free_req->rq_status = -EIO; @@ -3002,8 +3199,8 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, } list_for_each_entry_safe(free_req, tmp, - &imp->imp_sending_list, - rq_list) { + &imp->imp_sending_list, + rq_list) { spin_lock(&free_req->rq_lock); free_req->rq_err = 1; free_req->rq_status = -EIO; @@ -3012,28 +3209,28 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, } spin_unlock(&imp->imp_lock); } - } else { - /* Put it back for re-replay. */ - lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status); - } - - /* - * Errors while replay can set transno to 0, but - * imp_last_replay_transno shouldn't be set to 0 anyway - */ - if (req->rq_transno == 0) - CERROR("Transno is 0 during replay!\n"); - - /* continue with recovery */ - rc = ptlrpc_import_recovery_state_machine(imp); + } else { + /* Put it back for re-replay. */ + lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status); + } + + /* + * Errors while replay can set transno to 0, but + * imp_last_replay_transno shouldn't be set to 0 anyway + */ + if (req->rq_transno == 0) + CERROR("Transno is 0 during replay!\n"); + + /* continue with recovery */ + rc = ptlrpc_import_recovery_state_machine(imp); out: - req->rq_send_state = aa->praa_old_state; + req->rq_send_state = aa->praa_old_state; - if (rc != 0) - /* this replay failed, so restart recovery */ - ptlrpc_connect_import(imp); + if (rc != 0) + /* this replay failed, so restart recovery */ + ptlrpc_connect_import(imp); - RETURN(rc); + RETURN(rc); } /** @@ -3049,33 +3246,31 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); memset(aa, 0, sizeof(*aa)); - /* Prepare request to be resent with ptlrpcd */ - aa->praa_old_state = req->rq_send_state; - req->rq_send_state = LUSTRE_IMP_REPLAY; - req->rq_phase = RQ_PHASE_NEW; - req->rq_next_phase = RQ_PHASE_UNDEFINED; - if (req->rq_repmsg) - aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg); - req->rq_status = 0; - req->rq_interpret_reply = ptlrpc_replay_interpret; - /* Readjust the timeout for current conditions */ - ptlrpc_at_set_req_timeout(req); - - /* Tell server the net_latency, so the server can calculate how long - * it should wait for next replay */ - lustre_msg_set_service_time(req->rq_reqmsg, - ptlrpc_at_get_net_latency(req)); - DEBUG_REQ(D_HA, req, "REPLAY"); + /* Prepare request to be resent with ptlrpcd */ + aa->praa_old_state = req->rq_send_state; + req->rq_send_state = LUSTRE_IMP_REPLAY; + req->rq_phase = RQ_PHASE_NEW; + req->rq_next_phase = RQ_PHASE_UNDEFINED; + if (req->rq_repmsg) + aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg); + req->rq_status = 0; + req->rq_interpret_reply = ptlrpc_replay_interpret; + /* Readjust the timeout for current conditions */ + ptlrpc_at_set_req_timeout(req); + + /* Tell server net_latency to calculate how long to wait for reply. */ + lustre_msg_set_service_time(req->rq_reqmsg, + ptlrpc_at_get_net_latency(req)); + DEBUG_REQ(D_HA, req, "REPLAY"); atomic_inc(&req->rq_import->imp_replay_inflight); spin_lock(&req->rq_lock); req->rq_early_free_repbuf = 0; spin_unlock(&req->rq_lock); - ptlrpc_request_addref(req); /* ptlrpcd needs a ref */ + ptlrpc_request_addref(req); /* ptlrpcd needs a ref */ ptlrpcd_add_req(req); RETURN(0); @@ -3089,13 +3284,15 @@ void ptlrpc_abort_inflight(struct obd_import *imp) struct list_head *tmp, *n; ENTRY; - /* Make sure that no new requests get processed for this import. + /* + * Make sure that no new requests get processed for this import. * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing * this flag and then putting requests on sending_list or delayed_list. */ - spin_lock(&imp->imp_lock); + assert_spin_locked(&imp->imp_lock); - /* XXX locking? Maybe we should remove each request with the list + /* + * XXX locking? Maybe we should remove each request with the list * locked? Also, how do we know if the requests on the list are * being freed at this time? */ @@ -3104,7 +3301,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) struct ptlrpc_request, rq_list); - DEBUG_REQ(D_RPCTRACE, req, "inflight"); + DEBUG_REQ(D_RPCTRACE, req, "inflight"); spin_lock(&req->rq_lock); if (req->rq_import_generation < imp->imp_generation) { @@ -3130,13 +3327,13 @@ void ptlrpc_abort_inflight(struct obd_import *imp) spin_unlock(&req->rq_lock); } - /* Last chance to free reqs left on the replay list, but we - * will still leak reqs that haven't committed. */ + /* + * Last chance to free reqs left on the replay list, but we + * will still leak reqs that haven't committed. + */ if (imp->imp_replayable) ptlrpc_free_committed(imp); - spin_unlock(&imp->imp_lock); - EXIT; } @@ -3167,9 +3364,6 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set) } } -static __u64 ptlrpc_last_xid; -static spinlock_t ptlrpc_last_xid_lock; - /** * Initialize the XID for the node. This is common among all requests on * this node, and only requires the property that it is monotonically @@ -3189,19 +3383,21 @@ static spinlock_t ptlrpc_last_xid_lock; void ptlrpc_init_xid(void) { time64_t now = ktime_get_real_seconds(); + u64 xid; - spin_lock_init(&ptlrpc_last_xid_lock); if (now < YEAR_2004) { - cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid)); - ptlrpc_last_xid >>= 2; - ptlrpc_last_xid |= (1ULL << 61); + get_random_bytes(&xid, sizeof(xid)); + xid >>= 2; + xid |= (1ULL << 61); } else { - ptlrpc_last_xid = (__u64)now << 20; + xid = (u64)now << 20; } /* Need to always be aligned to a power-of-two for mutli-bulk BRW */ - CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0); - ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK; + BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) != + 0); + xid &= PTLRPC_BULK_OPS_MASK; + atomic64_set(&ptlrpc_last_xid, xid); } /** @@ -3218,14 +3414,7 @@ void ptlrpc_init_xid(void) */ __u64 ptlrpc_next_xid(void) { - __u64 next; - - spin_lock(&ptlrpc_last_xid_lock); - next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; - ptlrpc_last_xid = next; - spin_unlock(&ptlrpc_last_xid_lock); - - return next; + return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid); } /** @@ -3241,21 +3430,28 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) LASSERT(bd != NULL); - /* Generate new matchbits for all resend requests, including - * resend replay. */ + /* + * Generate new matchbits for all resend requests, including + * resend replay. + */ if (req->rq_resend) { __u64 old_mbits = req->rq_mbits; - /* First time resend on -EINPROGRESS will generate new xid, + /* + * First time resend on -EINPROGRESS will generate new xid, * so we can actually use the rq_xid as rq_mbits in such case, * however, it's bit hard to distinguish such resend with a * 'resend for the -EINPROGRESS resend'. To make it simple, - * we opt to generate mbits for all resend cases. */ - if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)){ + * we opt to generate mbits for all resend cases. + */ + if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, + BULK_MBITS)) { req->rq_mbits = ptlrpc_next_xid(); } else { - /* Old version transfers rq_xid to peer as - * matchbits. */ + /* + * Old version transfers rq_xid to peer as + * matchbits. + */ spin_lock(&req->rq_import->imp_lock); list_del_init(&req->rq_unreplied_list); ptlrpc_assign_next_xid_nolock(req); @@ -3266,24 +3462,37 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) old_mbits, req->rq_mbits); } else if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { /* Request being sent first time, use xid as matchbits. */ - req->rq_mbits = req->rq_xid; + if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS) + || req->rq_mbits == 0) { + req->rq_mbits = req->rq_xid; + } else { + int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) / + LNET_MAX_IOV; + req->rq_mbits -= total_md - 1; + } } else { - /* Replay request, xid and matchbits have already been - * correctly assigned. */ + /* + * Replay request, xid and matchbits have already been + * correctly assigned. + */ return; } - /* For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so + /* + * For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so * that server can infer the number of bulks that were prepared, - * see LU-1431 */ + * see LU-1431 + */ req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV) - 1; - /* Set rq_xid as rq_mbits to indicate the final bulk for the old + /* + * Set rq_xid as rq_mbits to indicate the final bulk for the old * server which does not support OBD_CONNECT_BULK_MBITS. LU-6808. * * It's ok to directly set the rq_xid here, since this xid bump - * won't affect the request position in unreplied list. */ + * won't affect the request position in unreplied list. + */ if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)) req->rq_xid = req->rq_mbits; } @@ -3294,19 +3503,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) */ __u64 ptlrpc_sample_next_xid(void) { -#if BITS_PER_LONG == 32 - /* need to avoid possible word tearing on 32-bit systems */ - __u64 next; - - spin_lock(&ptlrpc_last_xid_lock); - next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; - spin_unlock(&ptlrpc_last_xid_lock); - - return next; -#else - /* No need to lock, since returned value is racy anyways */ - return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; -#endif + return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT; } EXPORT_SYMBOL(ptlrpc_sample_next_xid); @@ -3328,8 +3525,8 @@ EXPORT_SYMBOL(ptlrpc_sample_next_xid); * have delay before it really runs by ptlrpcd thread. */ struct ptlrpc_work_async_args { - int (*cb)(const struct lu_env *, void *); - void *cbdata; + int (*cb)(const struct lu_env *, void *); + void *cbdata; }; static void ptlrpcd_add_work_req(struct ptlrpc_request *req) @@ -3347,9 +3544,9 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req) } static int work_interpreter(const struct lu_env *env, - struct ptlrpc_request *req, void *data, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct ptlrpc_work_async_args *arg = data; + struct ptlrpc_work_async_args *arg = args; LASSERT(ptlrpcd_check_work(req)); LASSERT(arg->cb != NULL); @@ -3379,18 +3576,18 @@ static int ptlrpcd_check_work(struct ptlrpc_request *req) void *ptlrpcd_alloc_work(struct obd_import *imp, int (*cb)(const struct lu_env *, void *), void *cbdata) { - struct ptlrpc_request *req = NULL; + struct ptlrpc_request *req = NULL; struct ptlrpc_work_async_args *args; - ENTRY; + ENTRY; might_sleep(); - if (cb == NULL) + if (!cb) RETURN(ERR_PTR(-EINVAL)); /* copy some code from deprecated fakereq. */ req = ptlrpc_request_cache_alloc(GFP_NOFS); - if (req == NULL) { + if (!req) { CERROR("ptlrpc: run out of memory!\n"); RETURN(ERR_PTR(-ENOMEM)); } @@ -3405,8 +3602,7 @@ void *ptlrpcd_alloc_work(struct obd_import *imp, req->rq_no_delay = req->rq_no_resend = 1; req->rq_pill.rc_fmt = (void *)&worker_format; - CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args)); - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->cb = cb; args->cbdata = cbdata; @@ -3416,10 +3612,10 @@ EXPORT_SYMBOL(ptlrpcd_alloc_work); void ptlrpcd_destroy_work(void *handler) { - struct ptlrpc_request *req = handler; + struct ptlrpc_request *req = handler; - if (req) - ptlrpc_req_finished(req); + if (req) + ptlrpc_req_finished(req); } EXPORT_SYMBOL(ptlrpcd_destroy_work); @@ -3427,14 +3623,14 @@ int ptlrpcd_queue_work(void *handler) { struct ptlrpc_request *req = handler; - /* - * Check if the req is already being queued. - * - * Here comes a trick: it lacks a way of checking if a req is being - * processed reliably in ptlrpc. Here I have to use refcount of req - * for this purpose. This is okay because the caller should use this - * req as opaque data. - Jinshan - */ + /* + * Check if the req is already being queued. + * + * Here comes a trick: it lacks a way of checking if a req is being + * processed reliably in ptlrpc. Here I have to use refcount of req + * for this purpose. This is okay because the caller should use this + * req as opaque data. - Jinshan + */ LASSERT(atomic_read(&req->rq_refcount) > 0); if (atomic_inc_return(&req->rq_refcount) == 2) ptlrpcd_add_work_req(req);