X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=0577f6aa898425dbc9d4d2e1f4482a49d6948d53;hp=e145ea3057d8bb9580f05b45e6ba907f93d7b28f;hb=5c89fa57cb2dfff667ddd75eaeb13f942f5b155d;hpb=c3668865c6496bacec56c7779acc915a64671540 diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index e145ea3..0577f6a 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -23,7 +23,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2016, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -34,6 +34,10 @@ #define DEBUG_SUBSYSTEM S_RPC +#include +#include + +#include #include #include #include @@ -43,6 +47,50 @@ #include "ptlrpc_internal.h" +static void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc, + struct page *page, int pageoffset, + int len) +{ + __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1); +} + +static void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc, + struct page *page, int pageoffset, + int len) +{ + __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0); +} + +static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc) +{ + int i; + + for (i = 0; i < desc->bd_iov_count ; i++) + put_page(desc->bd_vec[i].bv_page); +} + +static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc, + void *frag, int len) +{ + unsigned int offset = (unsigned long)frag & ~PAGE_MASK; + + ENTRY; + while (len > 0) { + int page_len = min_t(unsigned int, PAGE_SIZE - offset, + len); + unsigned long vaddr = (unsigned long)frag; + + ptlrpc_prep_bulk_page_nopin(desc, + lnet_kvaddr_to_page(vaddr), + offset, page_len); + offset = 0; + len -= page_len; + frag += page_len; + } + + RETURN(desc->bd_nob); +} + const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = { .add_kiov_frag = ptlrpc_prep_bulk_page_pin, .release_frags = ptlrpc_release_bulk_page_pin, @@ -52,14 +100,10 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops); const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = { .add_kiov_frag = ptlrpc_prep_bulk_page_nopin, .release_frags = ptlrpc_release_bulk_noop, + .add_iov_frag = ptlrpc_prep_bulk_frag_pages, }; EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops); -const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops = { - .add_iov_frag = ptlrpc_prep_bulk_frag, -}; -EXPORT_SYMBOL(ptlrpc_bulk_kvec_ops); - static int ptlrpc_send_new_req(struct ptlrpc_request *req); static int ptlrpcd_check_work(struct ptlrpc_request *req); static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async); @@ -67,12 +111,12 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async); /** * Initialize passed in client structure \a cl. */ -void ptlrpc_init_client(int req_portal, int rep_portal, char *name, - struct ptlrpc_client *cl) +void ptlrpc_init_client(int req_portal, int rep_portal, const char *name, + struct ptlrpc_client *cl) { - cl->cli_request_portal = req_portal; - cl->cli_reply_portal = rep_portal; - cl->cli_name = name; + cl->cli_request_portal = req_portal; + cl->cli_reply_portal = rep_portal; + cl->cli_name = name; } EXPORT_SYMBOL(ptlrpc_init_client); @@ -83,12 +127,14 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid, lnet_nid_t nid4refnet) { struct ptlrpc_connection *c; - lnet_nid_t self; + lnet_nid_t self; struct lnet_process_id peer; - int err; + int err; - /* ptlrpc_uuid_to_peer() initializes its 2nd parameter - * before accessing its values. */ + /* + * ptlrpc_uuid_to_peer() initializes its 2nd parameter + * before accessing its values. + */ /* coverity[uninit_use_in_call] */ peer.nid = nid4refnet; err = ptlrpc_uuid_to_peer(uuid, &peer, &self); @@ -112,34 +158,31 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid, * Allocate and initialize new bulk descriptor on the sender. * Returns pointer to the descriptor or NULL on error. */ -struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw, +struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags, + unsigned int max_brw, enum ptlrpc_bulk_op_type type, - unsigned portal, + unsigned int portal, const struct ptlrpc_bulk_frag_ops *ops) { struct ptlrpc_bulk_desc *desc; int i; - /* ensure that only one of KIOV or IOVEC is set but not both */ - LASSERT((ptlrpc_is_bulk_desc_kiov(type) && - ops->add_kiov_frag != NULL) || - (ptlrpc_is_bulk_desc_kvec(type) && - ops->add_iov_frag != NULL)); + LASSERT(ops->add_kiov_frag != NULL); + + if (max_brw > PTLRPC_BULK_OPS_COUNT) + RETURN(NULL); + + if (nfrags > LNET_MAX_IOV * max_brw) + RETURN(NULL); OBD_ALLOC_PTR(desc); - if (desc == NULL) + if (!desc) return NULL; - if (type & PTLRPC_BULK_BUF_KIOV) { - OBD_ALLOC_LARGE(GET_KIOV(desc), - nfrags * sizeof(*GET_KIOV(desc))); - if (GET_KIOV(desc) == NULL) - goto out; - } else { - OBD_ALLOC_LARGE(GET_KVEC(desc), - nfrags * sizeof(*GET_KVEC(desc))); - if (GET_KVEC(desc) == NULL) - goto out; - } + + OBD_ALLOC_LARGE(desc->bd_vec, + nfrags * sizeof(*desc->bd_vec)); + if (!desc->bd_vec) + goto out; spin_lock_init(&desc->bd_lock); init_waitqueue_head(&desc->bd_waitq); @@ -148,11 +191,14 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw, desc->bd_portal = portal; desc->bd_type = type; desc->bd_md_count = 0; - desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *) ops; + desc->bd_nob_last = LNET_MTU; + desc->bd_frag_ops = ops; LASSERT(max_brw > 0); desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT); - /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this - * node. Negotiated ocd_brw_size will always be <= this number. */ + /* + * PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this + * node. Negotiated ocd_brw_size will always be <= this number. + */ for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++) LNetInvalidateMDHandle(&desc->bd_mds[i]); @@ -170,9 +216,10 @@ out: * error. */ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, - unsigned nfrags, unsigned max_brw, + unsigned int nfrags, + unsigned int max_brw, unsigned int type, - unsigned portal, + unsigned int portal, const struct ptlrpc_bulk_frag_ops *ops) { @@ -183,20 +230,19 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, LASSERT(ptlrpc_is_bulk_op_passive(type)); desc = ptlrpc_new_bulk(nfrags, max_brw, type, portal, ops); - if (desc == NULL) + if (!desc) RETURN(NULL); - desc->bd_import_generation = req->rq_import_generation; - desc->bd_import = class_import_get(imp); - desc->bd_req = req; + desc->bd_import = class_import_get(imp); + desc->bd_req = req; - desc->bd_cbid.cbid_fn = client_bulk_callback; - desc->bd_cbid.cbid_arg = desc; + desc->bd_cbid.cbid_fn = client_bulk_callback; + desc->bd_cbid.cbid_arg = desc; - /* This makes req own desc, and free it when she frees herself */ - req->rq_bulk = desc; + /* This makes req own desc, and free it when she frees herself */ + req->rq_bulk = desc; - return desc; + return desc; } EXPORT_SYMBOL(ptlrpc_prep_bulk_imp); @@ -204,66 +250,49 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page, int pageoffset, int len, int pin) { - lnet_kiov_t *kiov; + struct bio_vec *kiov; LASSERT(desc->bd_iov_count < desc->bd_max_iov); LASSERT(page != NULL); LASSERT(pageoffset >= 0); LASSERT(len > 0); LASSERT(pageoffset + len <= PAGE_SIZE); - LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type)); - kiov = &BD_GET_KIOV(desc, desc->bd_iov_count); + kiov = &desc->bd_vec[desc->bd_iov_count]; + + if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) || + ((desc->bd_nob_last + len) > LNET_MTU)) { + desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count; + desc->bd_md_count++; + desc->bd_nob_last = 0; + LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT); + } + desc->bd_nob_last += len; desc->bd_nob += len; if (pin) get_page(page); - kiov->kiov_page = page; - kiov->kiov_offset = pageoffset; - kiov->kiov_len = len; + kiov->bv_page = page; + kiov->bv_offset = pageoffset; + kiov->bv_len = len; desc->bd_iov_count++; } EXPORT_SYMBOL(__ptlrpc_prep_bulk_page); -int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc, - void *frag, int len) -{ - struct kvec *iovec; - ENTRY; - - LASSERT(desc->bd_iov_count < desc->bd_max_iov); - LASSERT(frag != NULL); - LASSERT(len > 0); - LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type)); - - iovec = &BD_GET_KVEC(desc, desc->bd_iov_count); - - desc->bd_nob += len; - - iovec->iov_base = frag; - iovec->iov_len = len; - - desc->bd_iov_count++; - - RETURN(desc->bd_nob); -} -EXPORT_SYMBOL(ptlrpc_prep_bulk_frag); - void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) { ENTRY; LASSERT(desc != NULL); LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */ - LASSERT(desc->bd_md_count == 0); /* network hands off */ + LASSERT(desc->bd_refs == 0); /* network hands off */ LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL)); LASSERT(desc->bd_frag_ops != NULL); - if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) - sptlrpc_enc_pool_put_pages(desc); + sptlrpc_enc_pool_put_pages(desc); if (desc->bd_export) class_export_put(desc->bd_export); @@ -273,12 +302,8 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) if (desc->bd_frag_ops->release_frags != NULL) desc->bd_frag_ops->release_frags(desc); - if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) - OBD_FREE_LARGE(GET_KIOV(desc), - desc->bd_max_iov * sizeof(*GET_KIOV(desc))); - else - OBD_FREE_LARGE(GET_KVEC(desc), - desc->bd_max_iov * sizeof(*GET_KVEC(desc))); + OBD_FREE_LARGE(desc->bd_vec, + desc->bd_max_iov * sizeof(*desc->bd_vec)); OBD_FREE_PTR(desc); EXIT; } @@ -290,79 +315,91 @@ EXPORT_SYMBOL(ptlrpc_free_bulk); */ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req) { - __u32 serv_est; - int idx; - struct imp_at *at; - - LASSERT(req->rq_import); - - if (AT_OFF) { - /* non-AT settings */ - /** - * \a imp_server_timeout means this is reverse import and - * we send (currently only) ASTs to the client and cannot afford - * to wait too long for the reply, otherwise the other client - * (because of which we are sending this request) would - * timeout waiting for us - */ - req->rq_timeout = req->rq_import->imp_server_timeout ? - obd_timeout / 2 : obd_timeout; - } else { - at = &req->rq_import->imp_at; - idx = import_at_get_index(req->rq_import, - req->rq_request_portal); - serv_est = at_get(&at->iat_service_estimate[idx]); - req->rq_timeout = at_est2timeout(serv_est); - } - /* We could get even fancier here, using history to predict increased - loading... */ - - /* Let the server know what this RPC timeout is by putting it in the - reqmsg*/ - lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); + LASSERT(req->rq_import); + + if (AT_OFF) { + /* non-AT settings */ + /** + * \a imp_server_timeout means this is reverse import and + * we send (currently only) ASTs to the client and cannot afford + * to wait too long for the reply, otherwise the other client + * (because of which we are sending this request) would + * timeout waiting for us + */ + req->rq_timeout = req->rq_import->imp_server_timeout ? + obd_timeout / 2 : obd_timeout; + } else { + struct imp_at *at = &req->rq_import->imp_at; + timeout_t serv_est; + int idx; + + idx = import_at_get_index(req->rq_import, + req->rq_request_portal); + serv_est = at_get(&at->iat_service_estimate[idx]); + /* + * Currently a 32 bit value is sent over the + * wire for rq_timeout so please don't change this + * to time64_t. The work for LU-1158 will in time + * replace rq_timeout with a 64 bit nanosecond value + */ + req->rq_timeout = at_est2timeout(serv_est); + } + /* + * We could get even fancier here, using history to predict increased + * loading... + * + * Let the server know what this RPC timeout is by putting it in the + * reqmsg + */ + lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); } EXPORT_SYMBOL(ptlrpc_at_set_req_timeout); /* Adjust max service estimate based on server value */ static void ptlrpc_at_adj_service(struct ptlrpc_request *req, - unsigned int serv_est) + timeout_t serv_est) { - int idx; - unsigned int oldse; - struct imp_at *at; + int idx; + timeout_t oldse; + struct imp_at *at; - LASSERT(req->rq_import); - at = &req->rq_import->imp_at; + LASSERT(req->rq_import); + at = &req->rq_import->imp_at; - idx = import_at_get_index(req->rq_import, req->rq_request_portal); - /* max service estimates are tracked on the server side, - so just keep minimal history here */ - oldse = at_measured(&at->iat_service_estimate[idx], serv_est); - if (oldse != 0) - CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d " - "has changed from %d to %d\n", - req->rq_import->imp_obd->obd_name,req->rq_request_portal, - oldse, at_get(&at->iat_service_estimate[idx])); + idx = import_at_get_index(req->rq_import, req->rq_request_portal); + /* + * max service estimates are tracked on the server side, + * so just keep minimal history here + */ + oldse = at_measured(&at->iat_service_estimate[idx], serv_est); + if (oldse != 0) + CDEBUG(D_ADAPTTO, + "The RPC service estimate for %s ptl %d has changed from %d to %d\n", + req->rq_import->imp_obd->obd_name, + req->rq_request_portal, + oldse, at_get(&at->iat_service_estimate[idx])); } /* Expected network latency per remote node (secs) */ int ptlrpc_at_get_net_latency(struct ptlrpc_request *req) { - return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency); + return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency); } /* Adjust expected network latency */ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, - unsigned int service_time) + timeout_t service_timeout) { - unsigned int nl, oldnl; - struct imp_at *at; time64_t now = ktime_get_real_seconds(); + struct imp_at *at; + timeout_t oldnl; + timeout_t nl; - LASSERT(req->rq_import); + LASSERT(req->rq_import); - if (service_time > now - req->rq_sent + 3) { - /* bz16408, however, this can also happen if early reply + if (service_timeout > now - req->rq_sent + 3) { + /* + * b=16408, however, this can also happen if early reply * is lost and client RPC is expired and resent, early reply * or reply of original RPC can still be fit in reply buffer * of resent RPC, now client is measuring time from the @@ -372,43 +409,45 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ? D_ADAPTTO : D_WARNING, "Reported service time %u > total measured time %lld\n", - service_time, now - req->rq_sent); + service_timeout, now - req->rq_sent); return; } - /* Network latency is total time less server processing time */ - nl = max_t(int, now - req->rq_sent - - service_time, 0) + 1; /* st rounding */ + /* Network latency is total time less server processing time, + * st rounding + */ + nl = max_t(timeout_t, now - req->rq_sent - service_timeout, 0) + 1; at = &req->rq_import->imp_at; - oldnl = at_measured(&at->iat_net_latency, nl); - if (oldnl != 0) - CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) " - "has changed from %d to %d\n", - req->rq_import->imp_obd->obd_name, - obd_uuid2str( - &req->rq_import->imp_connection->c_remote_uuid), - oldnl, at_get(&at->iat_net_latency)); + oldnl = at_measured(&at->iat_net_latency, nl); + if (oldnl != 0) + CDEBUG(D_ADAPTTO, + "The network latency for %s (nid %s) has changed from %d to %d\n", + req->rq_import->imp_obd->obd_name, + obd_uuid2str(&req->rq_import->imp_connection->c_remote_uuid), + oldnl, at_get(&at->iat_net_latency)); } static int unpack_reply(struct ptlrpc_request *req) { - int rc; + int rc; - if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) { - rc = ptlrpc_unpack_rep_msg(req, req->rq_replen); - if (rc) { - DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc); - return(-EPROTO); - } - } + if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) { + rc = ptlrpc_unpack_rep_msg(req, req->rq_replen); + if (rc) { + DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d", + rc); + return -EPROTO; + } + } - rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF); - if (rc) { - DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc); - return(-EPROTO); - } - return 0; + rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF); + if (rc) { + DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d", + rc); + return -EPROTO; + } + return 0; } /** @@ -419,11 +458,12 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) __must_hold(&req->rq_lock) { struct ptlrpc_request *early_req; + timeout_t service_timeout; time64_t olddl; int rc; ENTRY; - req->rq_early = 0; + req->rq_early = 0; spin_unlock(&req->rq_lock); rc = sptlrpc_cli_unwrap_early_reply(req, &early_req); @@ -439,29 +479,35 @@ __must_hold(&req->rq_lock) RETURN(rc); } - /* Use new timeout value just to adjust the local value for this + /* + * Use new timeout value just to adjust the local value for this * request, don't include it into at_history. It is unclear yet why * service time increased and should it be counted or skipped, e.g. * that can be recovery case or some error or server, the real reply - * will add all new data if it is worth to add. */ + * will add all new data if it is worth to add. + */ req->rq_timeout = lustre_msg_get_timeout(early_req->rq_repmsg); lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); /* Network latency can be adjusted, it is pure network delays */ - ptlrpc_at_adj_net_latency(req, - lustre_msg_get_service_time(early_req->rq_repmsg)); + service_timeout = lustre_msg_get_service_timeout(early_req->rq_repmsg); + ptlrpc_at_adj_net_latency(req, service_timeout); sptlrpc_cli_finish_early_reply(early_req); spin_lock(&req->rq_lock); olddl = req->rq_deadline; - /* server assumes it now has rq_timeout from when the request + /* + * server assumes it now has rq_timeout from when the request * arrived, so the client should give it at least that long. * since we don't know the arrival time we'll use the original - * sent time */ + * sent time + */ req->rq_deadline = req->rq_sent + req->rq_timeout + ptlrpc_at_get_net_latency(req); + /* The below message is checked in replay-single.sh test_65{a,b} */ + /* The below message is checked in sanity-{gss,krb5} test_8 */ DEBUG_REQ(D_ADAPTTO, req, "Early reply #%d, new deadline in %llds (%llds)", req->rq_early_count, @@ -478,7 +524,7 @@ int ptlrpc_request_cache_init(void) request_cache = kmem_cache_create("ptlrpc_cache", sizeof(struct ptlrpc_request), 0, SLAB_HWCACHE_ALIGN, NULL); - return request_cache == NULL ? -ENOMEM : 0; + return request_cache ? 0 : -ENOMEM; } void ptlrpc_request_cache_fini(void) @@ -505,14 +551,14 @@ void ptlrpc_request_cache_free(struct ptlrpc_request *req) */ void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool) { - struct list_head *l, *tmp; struct ptlrpc_request *req; LASSERT(pool != NULL); spin_lock(&pool->prp_lock); - list_for_each_safe(l, tmp, &pool->prp_req_list) { - req = list_entry(l, struct ptlrpc_request, rq_list); + while ((req = list_first_entry_or_null(&pool->prp_req_list, + struct ptlrpc_request, + rq_list))) { list_del(&req->rq_list); LASSERT(req->rq_reqbuf); LASSERT(req->rq_reqbuf_len == pool->prp_rq_size); @@ -529,24 +575,22 @@ EXPORT_SYMBOL(ptlrpc_free_rq_pool); */ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) { - int i; - int size = 1; + int i; + int size = 1; - while (size < pool->prp_rq_size) - size <<= 1; + while (size < pool->prp_rq_size) + size <<= 1; LASSERTF(list_empty(&pool->prp_req_list) || - size == pool->prp_rq_size, - "Trying to change pool size with nonempty pool " - "from %d to %d bytes\n", pool->prp_rq_size, size); + size == pool->prp_rq_size, + "Trying to change pool size with nonempty pool from %d to %d bytes\n", + pool->prp_rq_size, size); - spin_lock(&pool->prp_lock); pool->prp_rq_size = size; for (i = 0; i < num_rq; i++) { struct ptlrpc_request *req; struct lustre_msg *msg; - spin_unlock(&pool->prp_lock); req = ptlrpc_request_cache_alloc(GFP_NOFS); if (!req) return i; @@ -560,8 +604,8 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) req->rq_pool = pool; spin_lock(&pool->prp_lock); list_add_tail(&req->rq_list, &pool->prp_req_list); + spin_unlock(&pool->prp_lock); } - spin_unlock(&pool->prp_lock); return num_rq; } EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool); @@ -580,13 +624,14 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize, { struct ptlrpc_request_pool *pool; - OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool)); + OBD_ALLOC_PTR(pool); if (!pool) return NULL; - /* Request next power of two for the allocation, because internally - kernel would do exactly this */ - + /* + * Request next power of two for the allocation, because internally + * kernel would do exactly this + */ spin_lock_init(&pool->prp_lock); INIT_LIST_HEAD(&pool->prp_req_list); pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD; @@ -604,18 +649,20 @@ EXPORT_SYMBOL(ptlrpc_init_rq_pool); static struct ptlrpc_request * ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool) { - struct ptlrpc_request *request; - struct lustre_msg *reqbuf; + struct ptlrpc_request *request; + struct lustre_msg *reqbuf; - if (!pool) - return NULL; + if (!pool) + return NULL; spin_lock(&pool->prp_lock); - /* See if we have anything in a pool, and bail out if nothing, + /* + * See if we have anything in a pool, and bail out if nothing, * in writeout path, where this matters, this is safe to do, because * nothing is lost in this case, and when some in-flight requests - * complete, this code will be called again. */ + * complete, this code will be called again. + */ if (unlikely(list_empty(&pool->prp_req_list))) { spin_unlock(&pool->prp_lock); return NULL; @@ -626,16 +673,16 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool) list_del_init(&request->rq_list); spin_unlock(&pool->prp_lock); - LASSERT(request->rq_reqbuf); - LASSERT(request->rq_pool); + LASSERT(request->rq_reqbuf); + LASSERT(request->rq_pool); - reqbuf = request->rq_reqbuf; - memset(request, 0, sizeof(*request)); - request->rq_reqbuf = reqbuf; - request->rq_reqbuf_len = pool->prp_rq_size; - request->rq_pool = pool; + reqbuf = request->rq_reqbuf; + memset(request, 0, sizeof(*request)); + request->rq_reqbuf = reqbuf; + request->rq_reqbuf_len = pool->prp_rq_size; + request->rq_pool = pool; - return request; + return request; } /** @@ -654,18 +701,15 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request) void ptlrpc_add_unreplied(struct ptlrpc_request *req) { - struct obd_import *imp = req->rq_import; - struct list_head *tmp; - struct ptlrpc_request *iter; + struct obd_import *imp = req->rq_import; + struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); LASSERT(list_empty(&req->rq_unreplied_list)); /* unreplied list is sorted by xid in ascending order */ - list_for_each_prev(tmp, &imp->imp_unreplied_list) { - iter = list_entry(tmp, struct ptlrpc_request, - rq_unreplied_list); - + list_for_each_entry_reverse(iter, &imp->imp_unreplied_list, + rq_unreplied_list) { LASSERT(req->rq_xid != iter->rq_xid); if (req->rq_xid < iter->rq_xid) continue; @@ -688,6 +732,43 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req) spin_unlock(&req->rq_import->imp_lock); } +static atomic64_t ptlrpc_last_xid; + +static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) +{ + spin_lock(&req->rq_import->imp_lock); + list_del_init(&req->rq_unreplied_list); + ptlrpc_assign_next_xid_nolock(req); + spin_unlock(&req->rq_import->imp_lock); + DEBUG_REQ(D_RPCTRACE, req, "reassign xid"); +} + +void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc; + __u16 tag; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + tag = obd_get_mod_rpc_slot(cli, opc); + lustre_msg_set_tag(req->rq_reqmsg, tag); + ptlrpc_reassign_next_xid(req); +} +EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot); + +void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req) +{ + __u16 tag = lustre_msg_get_tag(req->rq_reqmsg); + + if (tag != 0) { + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + + obd_put_mod_rpc_slot(cli, opc, tag); + } +} +EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot); + int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, __u32 version, int opcode, char **bufs, struct ptlrpc_cli_ctx *ctx) @@ -703,7 +784,7 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, imp = request->rq_import; lengths = request->rq_pill.rc_area[RCL_CLIENT]; - if (ctx != NULL) { + if (ctx) { request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx); } else { rc = sptlrpc_req_get_ctx(request); @@ -739,29 +820,34 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, ptlrpc_at_set_req_timeout(request); lustre_msg_set_opc(request->rq_reqmsg, opcode); - ptlrpc_assign_next_xid(request); /* Let's setup deadline for req/reply/bulk unlink for opcode. */ if (cfs_fail_val == opcode) { time64_t *fail_t = NULL, *fail2_t = NULL; - if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) + if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) { fail_t = &request->rq_bulk_deadline; - else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) + } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) { fail_t = &request->rq_reply_deadline; - else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK)) + } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK)) { fail_t = &request->rq_req_deadline; - else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) { + } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) { fail_t = &request->rq_reply_deadline; fail2_t = &request->rq_bulk_deadline; + } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) { + time64_t now = ktime_get_real_seconds(); + u64 xid = ((u64)now >> 4) << 24; + + atomic64_set(&ptlrpc_last_xid, xid); } if (fail_t) { - *fail_t = ktime_get_real_seconds() + LONG_UNLINK; + *fail_t = ktime_get_real_seconds() + + PTLRPC_REQ_LONG_UNLINK; if (fail2_t) *fail2_t = ktime_get_real_seconds() + - LONG_UNLINK; + PTLRPC_REQ_LONG_UNLINK; /* * The RPC is infected, let the test to change the @@ -770,6 +856,7 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, msleep(4 * MSEC_PER_SEC); } } + ptlrpc_assign_next_xid(request); RETURN(0); @@ -777,10 +864,10 @@ out_ctx: LASSERT(!request->rq_pool); sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1); out_free: + atomic_dec(&imp->imp_reqs); class_import_put(imp); return rc; - } EXPORT_SYMBOL(ptlrpc_request_bufs_pack); @@ -789,32 +876,9 @@ EXPORT_SYMBOL(ptlrpc_request_bufs_pack); * steps if necessary. */ int ptlrpc_request_pack(struct ptlrpc_request *request, - __u32 version, int opcode) + __u32 version, int opcode) { - int rc; - rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL); - if (rc) - return rc; - - /* For some old 1.8 clients (< 1.8.7), they will LASSERT the size of - * ptlrpc_body sent from server equal to local ptlrpc_body size, so we - * have to send old ptlrpc_body to keep interoprability with these - * clients. - * - * Only three kinds of server->client RPCs so far: - * - LDLM_BL_CALLBACK - * - LDLM_CP_CALLBACK - * - LDLM_GL_CALLBACK - * - * XXX This should be removed whenever we drop the interoprability with - * the these old clients. - */ - if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK || - opcode == LDLM_GL_CALLBACK) - req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY, - sizeof(struct ptlrpc_body_v2), RCL_CLIENT); - - return rc; + return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL); } EXPORT_SYMBOL(ptlrpc_request_pack); @@ -838,13 +902,14 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, if (request) { ptlrpc_cli_req_init(request); - LASSERTF((unsigned long)imp > 0x1000, "%p", imp); + LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp); LASSERT(imp != LP_POISON); LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n", - imp->imp_client); + imp->imp_client); LASSERT(imp->imp_client != LP_POISON); request->rq_import = class_import_get(imp); + atomic_inc(&imp->imp_reqs); } else { CERROR("request allocation out of memory\n"); } @@ -852,6 +917,33 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, return request; } +static int ptlrpc_reconnect_if_idle(struct obd_import *imp) +{ + int rc; + + /* + * initiate connection if needed when the import has been + * referenced by the new request to avoid races with disconnect. + * serialize this check against conditional state=IDLE + * in ptlrpc_disconnect_idle_interpret() + */ + spin_lock(&imp->imp_lock); + if (imp->imp_state == LUSTRE_IMP_IDLE) { + imp->imp_generation++; + imp->imp_initiated_at = imp->imp_generation; + imp->imp_state = LUSTRE_IMP_NEW; + + /* connect_import_locked releases imp_lock */ + rc = ptlrpc_connect_import_locked(imp); + if (rc) + return rc; + ptlrpc_pinger_add_import(imp); + } else { + spin_unlock(&imp->imp_lock); + } + return 0; +} + /** * Helper function for creating a request. * Calls __ptlrpc_request_alloc to allocate new request sturcture and inits @@ -860,18 +952,28 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, */ static struct ptlrpc_request * ptlrpc_request_alloc_internal(struct obd_import *imp, - struct ptlrpc_request_pool * pool, - const struct req_format *format) + struct ptlrpc_request_pool *pool, + const struct req_format *format) { - struct ptlrpc_request *request; + struct ptlrpc_request *request; + + request = __ptlrpc_request_alloc(imp, pool); + if (!request) + return NULL; - request = __ptlrpc_request_alloc(imp, pool); - if (request == NULL) - return NULL; + /* don't make expensive check for idling connection + * if it's already connected */ + if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) { + if (ptlrpc_reconnect_if_idle(imp) < 0) { + atomic_dec(&imp->imp_reqs); + ptlrpc_request_free(request); + return NULL; + } + } - req_capsule_init(&request->rq_pill, request, RCL_CLIENT); - req_capsule_set(&request->rq_pill, format); - return request; + req_capsule_init(&request->rq_pill, request, RCL_CLIENT); + req_capsule_set(&request->rq_pill, format); + return request; } /** @@ -879,9 +981,9 @@ ptlrpc_request_alloc_internal(struct obd_import *imp, * buffer structure according to capsule template \a format. */ struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp, - const struct req_format *format) + const struct req_format *format) { - return ptlrpc_request_alloc_internal(imp, NULL, format); + return ptlrpc_request_alloc_internal(imp, NULL, format); } EXPORT_SYMBOL(ptlrpc_request_alloc); @@ -889,11 +991,12 @@ EXPORT_SYMBOL(ptlrpc_request_alloc); * Allocate new request structure for import \a imp from pool \a pool and * initialize its buffer structure according to capsule template \a format. */ -struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp, - struct ptlrpc_request_pool * pool, - const struct req_format *format) +struct ptlrpc_request * +ptlrpc_request_alloc_pool(struct obd_import *imp, + struct ptlrpc_request_pool *pool, + const struct req_format *format) { - return ptlrpc_request_alloc_internal(imp, pool, format); + return ptlrpc_request_alloc_internal(imp, pool, format); } EXPORT_SYMBOL(ptlrpc_request_alloc_pool); @@ -918,20 +1021,20 @@ EXPORT_SYMBOL(ptlrpc_request_free); * Returns allocated request or NULL on error. */ struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp, - const struct req_format *format, - __u32 version, int opcode) + const struct req_format *format, + __u32 version, int opcode) { - struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format); - int rc; + struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format); + int rc; - if (req) { - rc = ptlrpc_request_pack(req, version, opcode); - if (rc) { - ptlrpc_request_free(req); - req = NULL; - } - } - return req; + if (req) { + rc = ptlrpc_request_pack(req, version, opcode); + if (rc) { + ptlrpc_request_free(req); + req = NULL; + } + } + return req; } EXPORT_SYMBOL(ptlrpc_request_alloc_pack); @@ -941,12 +1044,12 @@ EXPORT_SYMBOL(ptlrpc_request_alloc_pack); */ struct ptlrpc_request_set *ptlrpc_prep_set(void) { - struct ptlrpc_request_set *set; - int cpt; + struct ptlrpc_request_set *set; + int cpt; ENTRY; - cpt = cfs_cpt_current(cfs_cpt_table, 0); - OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof *set); + cpt = cfs_cpt_current(cfs_cpt_tab, 0); + OBD_CPT_ALLOC(set, cfs_cpt_tab, cpt, sizeof(*set)); if (!set) RETURN(NULL); atomic_set(&set->set_refcount, 1); @@ -956,7 +1059,6 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void) atomic_set(&set->set_remaining, 0); spin_lock_init(&set->set_new_req_lock); INIT_LIST_HEAD(&set->set_new_requests); - INIT_LIST_HEAD(&set->set_cblist); set->set_max_inflight = UINT_MAX; set->set_producer = NULL; set->set_producer_arg = NULL; @@ -1001,20 +1103,16 @@ struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func, */ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) { - struct list_head *tmp; - struct list_head *next; - int expected_phase; - int n = 0; + struct ptlrpc_request *req; + int expected_phase; + int n = 0; + ENTRY; /* Requests on the set should either all be completed, or all be new */ expected_phase = (atomic_read(&set->set_remaining) == 0) ? RQ_PHASE_COMPLETE : RQ_PHASE_NEW; - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { LASSERT(req->rq_phase == expected_phase); n++; } @@ -1023,10 +1121,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) atomic_read(&set->set_remaining) == n, "%d / %d\n", atomic_read(&set->set_remaining), n); - list_for_each_safe(tmp, next, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + while ((req = list_first_entry_or_null(&set->set_requests, + struct ptlrpc_request, + rq_set_chain))) { list_del_init(&req->rq_set_chain); LASSERT(req->rq_phase == expected_phase); @@ -1041,8 +1138,8 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) req->rq_invalid_rqset = 0; spin_unlock(&req->rq_lock); - ptlrpc_req_finished (req); - } + ptlrpc_req_finished(req); + } LASSERT(atomic_read(&set->set_remaining) == 0); @@ -1052,33 +1149,18 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) EXPORT_SYMBOL(ptlrpc_set_destroy); /** - * Add a callback function \a fn to the set. - * This function would be called when all requests on this set are completed. - * The function will be passed \a data argument. - */ -int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, - set_interpreter_func fn, void *data) -{ - struct ptlrpc_set_cbdata *cbdata; - - OBD_ALLOC_PTR(cbdata); - if (cbdata == NULL) - RETURN(-ENOMEM); - - cbdata->psc_interpret = fn; - cbdata->psc_data = data; - list_add_tail(&cbdata->psc_item, &set->set_cblist); - - RETURN(0); -} - -/** * Add a new request to the general purpose request set. * Assumes request reference from the caller. */ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, - struct ptlrpc_request *req) + struct ptlrpc_request *req) { + if (set == PTLRPCD_SET) { + ptlrpcd_add_req(req); + return; + } + + LASSERT(req->rq_import->imp_state != LUSTRE_IMP_IDLE); LASSERT(list_empty(&req->rq_set_chain)); if (req->rq_allow_intr) @@ -1090,12 +1172,14 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, atomic_inc(&set->set_remaining); req->rq_queued_time = ktime_get_seconds(); - if (req->rq_reqmsg != NULL) + if (req->rq_reqmsg) lustre_msg_set_jobid(req->rq_reqmsg, NULL); - if (set->set_producer != NULL) - /* If the request set has a producer callback, the RPC must be - * sent straight away */ + if (set->set_producer) + /* + * If the request set has a producer callback, the RPC must be + * sent straight away + */ ptlrpc_send_new_req(req); } EXPORT_SYMBOL(ptlrpc_set_add_req); @@ -1106,12 +1190,12 @@ EXPORT_SYMBOL(ptlrpc_set_add_req); * Currently only used for ptlrpcd. */ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, - struct ptlrpc_request *req) + struct ptlrpc_request *req) { - struct ptlrpc_request_set *set = pc->pc_set; - int count, i; + struct ptlrpc_request_set *set = pc->pc_set; + int count, i; - LASSERT(req->rq_set == NULL); + LASSERT(req->rq_set == NULL); LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0); spin_lock(&set->set_new_req_lock); @@ -1128,9 +1212,11 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, if (count == 1) { wake_up(&set->set_waitq); - /* XXX: It maybe unnecessary to wakeup all the partners. But to + /* + * XXX: It maybe unnecessary to wakeup all the partners. But to * guarantee the async RPC can be processed ASAP, we have - * no other better choice. It maybe fixed in future. */ + * no other better choice. It maybe fixed in future. + */ for (i = 0; i < pc->pc_npartners; i++) wake_up(&pc->pc_partners[i]->pc_set->set_waitq); } @@ -1147,33 +1233,36 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, * The imp->imp_lock must be held. */ static int ptlrpc_import_delay_req(struct obd_import *imp, - struct ptlrpc_request *req, int *status) + struct ptlrpc_request *req, int *status) { - int delay = 0; - ENTRY; + int delay = 0; - LASSERT (status != NULL); - *status = 0; + ENTRY; + LASSERT(status); + *status = 0; if (req->rq_ctx_init || req->rq_ctx_fini) { /* always allow ctx init/fini rpc go through */ } else if (imp->imp_state == LUSTRE_IMP_NEW) { - DEBUG_REQ(D_ERROR, req, "Uninitialized import."); + DEBUG_REQ(D_ERROR, req, "Uninitialized import"); *status = -EIO; } else if (imp->imp_state == LUSTRE_IMP_CLOSED) { unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg); - /* pings or MDS-equivalent STATFS may safely race with umount */ + /* + * pings or MDS-equivalent STATFS may safely + * race with umount + */ DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ? - D_HA : D_ERROR, req, "IMP_CLOSED "); + D_HA : D_ERROR, req, "IMP_CLOSED"); *status = -EIO; } else if (ptlrpc_send_limit_expired(req)) { - /* probably doesn't need to be a D_ERROR after initial testing*/ - DEBUG_REQ(D_HA, req, "send limit expired "); + /* probably doesn't need to be a D_ERROR afterinitial testing */ + DEBUG_REQ(D_HA, req, "send limit expired"); *status = -ETIMEDOUT; } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING && imp->imp_state == LUSTRE_IMP_CONNECTING) { - /* allow CONNECT even if import is invalid */ ; + ;/* allow CONNECT even if import is invalid */ if (atomic_read(&imp->imp_inval_count) != 0) { DEBUG_REQ(D_ERROR, req, "invalidate in flight"); *status = -EIO; @@ -1181,23 +1270,25 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, } else if (imp->imp_invalid || imp->imp_obd->obd_no_recov) { if (!imp->imp_deactive) DEBUG_REQ(D_NET, req, "IMP_INVALID"); - *status = -ESHUTDOWN; /* bz 12940 */ + *status = -ESHUTDOWN; /* b=12940 */ } else if (req->rq_import_generation != imp->imp_generation) { - DEBUG_REQ(D_ERROR, req, "req wrong generation:"); - *status = -EIO; - } else if (req->rq_send_state != imp->imp_state) { - /* invalidate in progress - any requests should be drop */ + DEBUG_REQ(D_ERROR, req, "req wrong generation:"); + *status = -EIO; + } else if (req->rq_send_state != imp->imp_state) { + /* invalidate in progress - any requests should be drop */ if (atomic_read(&imp->imp_inval_count) != 0) { - DEBUG_REQ(D_ERROR, req, "invalidate in flight"); - *status = -EIO; - } else if (req->rq_no_delay) { - *status = -EWOULDBLOCK; + DEBUG_REQ(D_ERROR, req, "invalidate in flight"); + *status = -EIO; + } else if (req->rq_no_delay && + imp->imp_generation != imp->imp_initiated_at) { + /* ignore nodelay for requests initiating connections */ + *status = -EAGAIN; } else if (req->rq_allow_replay && - (imp->imp_state == LUSTRE_IMP_REPLAY || - imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS || - imp->imp_state == LUSTRE_IMP_REPLAY_WAIT || - imp->imp_state == LUSTRE_IMP_RECOVER)) { - DEBUG_REQ(D_HA, req, "allow during recovery.\n"); + (imp->imp_state == LUSTRE_IMP_REPLAY || + imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS || + imp->imp_state == LUSTRE_IMP_REPLAY_WAIT || + imp->imp_state == LUSTRE_IMP_RECOVER)) { + DEBUG_REQ(D_HA, req, "allow during recovery"); } else { delay = 1; } @@ -1221,15 +1312,16 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err) /* Suppress particular reconnect errors which are to be expected. */ if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT) { - /* Suppress timed out reconnect requests */ if (lustre_handle_is_used(&req->rq_import->imp_remote_handle) || req->rq_timedout) return false; - /* Suppress most unavailable/again reconnect requests, but + /* + * Suppress most unavailable/again reconnect requests, but * print occasionally so it is clear client is trying to - * connect to a server where no target is running. */ + * connect to a server where no target is running. + */ if ((err == -ENODEV || err == -EAGAIN) && req->rq_import->imp_conn_cnt % 30 != 20) return false; @@ -1253,32 +1345,28 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err) */ static int ptlrpc_check_status(struct ptlrpc_request *req) { - int err; - ENTRY; + int rc; - err = lustre_msg_get_status(req->rq_repmsg); + ENTRY; + rc = lustre_msg_get_status(req->rq_repmsg); if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { struct obd_import *imp = req->rq_import; lnet_nid_t nid = imp->imp_connection->c_peer.nid; __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); - if (ptlrpc_console_allow(req, opc, err)) - LCONSOLE_ERROR_MSG(0x11, "%s: operation %s to node %s " - "failed: rc = %d\n", + if (ptlrpc_console_allow(req, opc, rc)) + LCONSOLE_ERROR_MSG(0x11, + "%s: operation %s to node %s failed: rc = %d\n", imp->imp_obd->obd_name, ll_opcode2str(opc), - libcfs_nid2str(nid), err); - RETURN(err < 0 ? err : -EINVAL); + libcfs_nid2str(nid), rc); + RETURN(rc < 0 ? rc : -EINVAL); } - if (err < 0) { - DEBUG_REQ(D_INFO, req, "status is %d", err); - } else if (err > 0) { - /* XXX: translate this error from net to host */ - DEBUG_REQ(D_INFO, req, "status is %d", err); - } + if (rc) + DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc); - RETURN(err); + RETURN(rc); } /** @@ -1288,20 +1376,20 @@ static int ptlrpc_check_status(struct ptlrpc_request *req) */ static void ptlrpc_save_versions(struct ptlrpc_request *req) { - struct lustre_msg *repmsg = req->rq_repmsg; - struct lustre_msg *reqmsg = req->rq_reqmsg; - __u64 *versions = lustre_msg_get_versions(repmsg); - ENTRY; + struct lustre_msg *repmsg = req->rq_repmsg; + struct lustre_msg *reqmsg = req->rq_reqmsg; + __u64 *versions = lustre_msg_get_versions(repmsg); - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) - return; + ENTRY; + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + return; - LASSERT(versions); - lustre_msg_set_versions(reqmsg, versions); + LASSERT(versions); + lustre_msg_set_versions(reqmsg, versions); CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n", - versions[0], versions[1]); + versions[0], versions[1]); - EXIT; + EXIT; } __u64 ptlrpc_known_replied_xid(struct obd_import *imp) @@ -1339,43 +1427,45 @@ static int after_reply(struct ptlrpc_request *req) int rc; ENTRY; - LASSERT(obd != NULL); - /* repbuf must be unlinked */ + LASSERT(obd != NULL); + /* repbuf must be unlinked */ LASSERT(!req->rq_receiving_reply && req->rq_reply_unlinked); if (req->rq_reply_truncated) { - if (ptlrpc_no_resend(req)) { - DEBUG_REQ(D_ERROR, req, "reply buffer overflow," - " expected: %d, actual size: %d", - req->rq_nob_received, req->rq_repbuf_len); - RETURN(-EOVERFLOW); - } - - sptlrpc_cli_free_repbuf(req); - /* Pass the required reply buffer size (include - * space for early reply). - * NB: no need to roundup because alloc_repbuf - * will roundup it */ - req->rq_replen = req->rq_nob_received; - req->rq_nob_received = 0; + if (ptlrpc_no_resend(req)) { + DEBUG_REQ(D_ERROR, req, + "reply buffer overflow, expected=%d, actual size=%d", + req->rq_nob_received, req->rq_repbuf_len); + RETURN(-EOVERFLOW); + } + + sptlrpc_cli_free_repbuf(req); + /* + * Pass the required reply buffer size (include + * space for early reply). + * NB: no need to roundup because alloc_repbuf + * will roundup it + */ + req->rq_replen = req->rq_nob_received; + req->rq_nob_received = 0; spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); - RETURN(0); - } + RETURN(0); + } work_start = ktime_get_real(); timediff = ktime_us_delta(work_start, req->rq_sent_ns); - /* - * NB Until this point, the whole of the incoming message, - * including buflens, status etc is in the sender's byte order. - */ - rc = sptlrpc_cli_unwrap_reply(req); - if (rc) { - DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc); - RETURN(rc); - } + /* + * NB Until this point, the whole of the incoming message, + * including buflens, status etc is in the sender's byte order. + */ + rc = sptlrpc_cli_unwrap_reply(req); + if (rc) { + DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc); + RETURN(rc); + } /* * Security layer unwrap might ask resend this request. @@ -1392,7 +1482,8 @@ static int after_reply(struct ptlrpc_request *req) ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) { time64_t now = ktime_get_real_seconds(); - DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS"); + DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) | + D_RPCTRACE, req, "resending request on EINPROGRESS"); spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); @@ -1400,10 +1491,12 @@ static int after_reply(struct ptlrpc_request *req) /* Readjust the timeout for current conditions */ ptlrpc_at_set_req_timeout(req); - /* delay resend to give a chance to the server to get ready. + /* + * delay resend to give a chance to the server to get ready. * The delay is increased by 1s on every resend and is capped to * the current request timeout (i.e. obd_timeout if AT is off, - * or AT service time x 125% + 5s, see at_est2timeout) */ + * or AT service time x 125% + 5s, see at_est2timeout) + */ if (req->rq_nr_resend > req->rq_timeout) req->rq_sent = now + req->rq_timeout; else @@ -1417,27 +1510,26 @@ static int after_reply(struct ptlrpc_request *req) RETURN(0); } - if (obd->obd_svc_stats != NULL) { + if (obd->obd_svc_stats) { lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR, timediff); ptlrpc_lprocfs_rpc_sent(req, timediff); } - if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY && - lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) { - DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)", - lustre_msg_get_type(req->rq_repmsg)); - RETURN(-EPROTO); - } + if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY && + lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) { + DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)", + lustre_msg_get_type(req->rq_repmsg)); + RETURN(-EPROTO); + } - if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING) - CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val); - ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg)); - ptlrpc_at_adj_net_latency(req, - lustre_msg_get_service_time(req->rq_repmsg)); + if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING) + CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val); + ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg)); + ptlrpc_at_adj_net_latency(req, + lustre_msg_get_service_timeout(req->rq_repmsg)); - rc = ptlrpc_check_status(req); - imp->imp_connect_error = rc; + rc = ptlrpc_check_status(req); if (rc) { /* @@ -1453,48 +1545,50 @@ static int after_reply(struct ptlrpc_request *req) ptlrpc_request_handle_notconn(req); RETURN(rc); } - } else { - /* - * Let's look if server sent slv. Do it only for RPC with - * rc == 0. - */ - ldlm_cli_update_pool(req); - } - - /* - * Store transno in reqmsg for replay. - */ - if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { - req->rq_transno = lustre_msg_get_transno(req->rq_repmsg); - lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno); - } - - if (imp->imp_replayable) { + } else { + /* + * Let's look if server sent slv. Do it only for RPC with + * rc == 0. + */ + ldlm_cli_update_pool(req); + } + + /* + * Store transno in reqmsg for replay. + */ + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { + req->rq_transno = lustre_msg_get_transno(req->rq_repmsg); + lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno); + } + + if (imp->imp_replayable) { spin_lock(&imp->imp_lock); - /* - * No point in adding already-committed requests to the replay - * list, we will just remove them immediately. b=9829 - */ - if (req->rq_transno != 0 && - (req->rq_transno > - lustre_msg_get_last_committed(req->rq_repmsg) || - req->rq_replay)) { - /** version recovery */ - ptlrpc_save_versions(req); - ptlrpc_retain_replayable_request(req, imp); - } else if (req->rq_commit_cb != NULL && + /* + * No point in adding already-committed requests to the replay + * list, we will just remove them immediately. b=9829 + */ + if (req->rq_transno != 0 && + (req->rq_transno > + lustre_msg_get_last_committed(req->rq_repmsg) || + req->rq_replay)) { + /** version recovery */ + ptlrpc_save_versions(req); + ptlrpc_retain_replayable_request(req, imp); + } else if (req->rq_commit_cb && list_empty(&req->rq_replay_list)) { - /* NB: don't call rq_commit_cb if it's already on + /* + * NB: don't call rq_commit_cb if it's already on * rq_replay_list, ptlrpc_free_committed() will call - * it later, see LU-3618 for details */ + * it later, see LU-3618 for details + */ spin_unlock(&imp->imp_lock); req->rq_commit_cb(req); spin_lock(&imp->imp_lock); - } + } - /* - * Replay-enabled imports return commit-status information. - */ + /* + * Replay-enabled imports return commit-status information. + */ committed = lustre_msg_get_last_committed(req->rq_repmsg); if (likely(committed > imp->imp_peer_committed_transno)) imp->imp_peer_committed_transno = committed; @@ -1528,25 +1622,25 @@ static int after_reply(struct ptlrpc_request *req) */ static int ptlrpc_send_new_req(struct ptlrpc_request *req) { - struct obd_import *imp = req->rq_import; - __u64 min_xid = 0; - int rc; - ENTRY; + struct obd_import *imp = req->rq_import; + __u64 min_xid = 0; + int rc; - LASSERT(req->rq_phase == RQ_PHASE_NEW); + ENTRY; + LASSERT(req->rq_phase == RQ_PHASE_NEW); /* do not try to go further if there is not enough memory in enc_pool */ - if (req->rq_sent && req->rq_bulk != NULL) + if (req->rq_sent && req->rq_bulk) if (req->rq_bulk->bd_iov_count > get_free_pages_in_pool() && pool_is_at_full_capacity()) RETURN(-ENOMEM); if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) && - (!req->rq_generation_set || - req->rq_import_generation == imp->imp_generation)) - RETURN (0); + (!req->rq_generation_set || + req->rq_import_generation == imp->imp_generation)) + RETURN(0); - ptlrpc_rqphase_move(req, RQ_PHASE_RPC); + ptlrpc_rqphase_move(req, RQ_PHASE_RPC); spin_lock(&imp->imp_lock); @@ -1582,7 +1676,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) list_add_tail(&req->rq_list, &imp->imp_sending_list); atomic_inc(&req->rq_import->imp_inflight); - /* find the known replied XID from the unreplied list, CONNECT + /* + * find the known replied XID from the unreplied list, CONNECT * and DISCONNECT requests are skipped to make the sanity check * on server side happy. see process_req_last_xid(). * @@ -1600,60 +1695,65 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) lustre_msg_set_last_xid(req->rq_reqmsg, min_xid); - lustre_msg_set_status(req->rq_reqmsg, current_pid()); + lustre_msg_set_status(req->rq_reqmsg, current->pid); - rc = sptlrpc_req_refresh_ctx(req, -1); - if (rc) { - if (req->rq_err) { - req->rq_status = rc; - RETURN(1); - } else { + rc = sptlrpc_req_refresh_ctx(req, 0); + if (rc) { + if (req->rq_err) { + req->rq_status = rc; + RETURN(1); + } else { spin_lock(&req->rq_lock); req->rq_wait_ctx = 1; spin_unlock(&req->rq_lock); - RETURN(0); - } - } + RETURN(0); + } + } - CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc" - " %s:%s:%d:%llu:%s:%d\n", current_comm(), + CDEBUG(D_RPCTRACE, + "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n", + req, current->comm, imp->imp_obd->obd_uuid.uuid, lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, - libcfs_nid2str(imp->imp_connection->c_peer.nid), - lustre_msg_get_opc(req->rq_reqmsg)); + obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg), + lustre_msg_get_jobid(req->rq_reqmsg) ?: ""); - rc = ptl_send_rpc(req, 0); + rc = ptl_send_rpc(req, 0); if (rc == -ENOMEM) { spin_lock(&imp->imp_lock); if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); - atomic_dec(&req->rq_import->imp_inflight); + if (atomic_dec_and_test(&req->rq_import->imp_inflight)) + wake_up(&req->rq_import->imp_recovery_waitq); } spin_unlock(&imp->imp_lock); ptlrpc_rqphase_move(req, RQ_PHASE_NEW); RETURN(rc); } - if (rc) { - DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc); + if (rc) { + DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d", + rc); spin_lock(&req->rq_lock); req->rq_net_err = 1; spin_unlock(&req->rq_lock); - RETURN(rc); - } - RETURN(0); + RETURN(rc); + } + RETURN(0); } static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set) { int remaining, rc; - ENTRY; + ENTRY; LASSERT(set->set_producer != NULL); remaining = atomic_read(&set->set_remaining); - /* populate the ->set_requests list with requests until we - * reach the maximum number of RPCs in flight for this set */ + /* + * populate the ->set_requests list with requests until we + * reach the maximum number of RPCs in flight for this set + */ while (atomic_read(&set->set_remaining) < set->set_max_inflight) { rc = set->set_producer(set, set->set_producer_arg); if (rc == -ENOENT) { @@ -1677,19 +1777,16 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set) */ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp, *next; - struct list_head comp_reqs; + struct ptlrpc_request *req, *next; + LIST_HEAD(comp_reqs); int force_timer_recalc = 0; - ENTRY; + ENTRY; if (atomic_read(&set->set_remaining) == 0) RETURN(1); - INIT_LIST_HEAD(&comp_reqs); - list_for_each_safe(tmp, next, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry_safe(req, next, &set->set_requests, + rq_set_chain) { struct obd_import *imp = req->rq_import; int unregistered = 0; int async = 1; @@ -1700,7 +1797,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) continue; } - /* This schedule point is mainly for the ptlrpcd caller of this + /* + * This schedule point is mainly for the ptlrpcd caller of this * function. Most ptlrpc sets are not long-lived and unbounded * in length, but at the least the set used by the ptlrpcd is. * Since the processing time is unbounded, we need to insert an @@ -1708,16 +1806,20 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) */ cond_resched(); - /* If the caller requires to allow to be interpreted by force + /* + * If the caller requires to allow to be interpreted by force * and it has really been interpreted, then move the request * to RQ_PHASE_INTERPRET phase in spite of what the current - * phase is. */ + * phase is. + */ if (unlikely(req->rq_allow_intr && req->rq_intr)) { req->rq_status = -EINTR; ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - /* Since it is interpreted and we have to wait for - * the reply to be unlinked, then use sync mode. */ + /* + * Since it is interpreted and we have to wait for + * the reply to be unlinked, then use sync mode. + */ async = 0; GOTO(interpret, req->rq_status); @@ -1767,115 +1869,120 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) * not corrupt any data. */ if (req->rq_phase == RQ_PHASE_UNREG_RPC && - ptlrpc_client_recv_or_unlink(req)) + ptlrpc_cli_wait_unlink(req)) continue; if (req->rq_phase == RQ_PHASE_UNREG_BULK && ptlrpc_client_bulk_active(req)) continue; - /* - * Turn fail_loc off to prevent it from looping - * forever. - */ - if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) { - OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK, - OBD_FAIL_ONCE); - } - if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) { - OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK, - OBD_FAIL_ONCE); - } - - /* - * Move to next phase if reply was successfully - * unlinked. - */ - ptlrpc_rqphase_move(req, req->rq_next_phase); - } - - if (req->rq_phase == RQ_PHASE_INTERPRET) - GOTO(interpret, req->rq_status); - - /* - * Note that this also will start async reply unlink. - */ - if (req->rq_net_err && !req->rq_timedout) { - ptlrpc_expire_one_request(req, 1); - - /* - * Check if we still need to wait for unlink. - */ - if (ptlrpc_client_recv_or_unlink(req) || - ptlrpc_client_bulk_active(req)) - continue; - /* If there is no need to resend, fail it now. */ - if (req->rq_no_resend) { - if (req->rq_status == 0) - req->rq_status = -EIO; - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - GOTO(interpret, req->rq_status); - } else { - continue; - } - } - - if (req->rq_err) { - spin_lock(&req->rq_lock); - req->rq_replied = 0; - spin_unlock(&req->rq_lock); - if (req->rq_status == 0) - req->rq_status = -EIO; - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - GOTO(interpret, req->rq_status); - } - - /* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr - * so it sets rq_intr regardless of individual rpc - * timeouts. The synchronous IO waiting path sets - * rq_intr irrespective of whether ptlrpcd - * has seen a timeout. Our policy is to only interpret - * interrupted rpcs after they have timed out, so we - * need to enforce that here. - */ - - if (req->rq_intr && (req->rq_timedout || req->rq_waiting || - req->rq_wait_ctx)) { - req->rq_status = -EINTR; - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - GOTO(interpret, req->rq_status); - } - - if (req->rq_phase == RQ_PHASE_RPC) { - if (req->rq_timedout || req->rq_resend || - req->rq_waiting || req->rq_wait_ctx) { - int status; - - if (!ptlrpc_unregister_reply(req, 1)) { - ptlrpc_unregister_bulk(req, 1); - continue; - } + /* + * Turn fail_loc off to prevent it from looping + * forever. + */ + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) { + OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK, + OBD_FAIL_ONCE); + } + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) { + OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK, + OBD_FAIL_ONCE); + } + + /* + * Move to next phase if reply was successfully + * unlinked. + */ + ptlrpc_rqphase_move(req, req->rq_next_phase); + } + + if (req->rq_phase == RQ_PHASE_INTERPRET) + GOTO(interpret, req->rq_status); + + /* + * Note that this also will start async reply unlink. + */ + if (req->rq_net_err && !req->rq_timedout) { + ptlrpc_expire_one_request(req, 1); + + /* + * Check if we still need to wait for unlink. + */ + if (ptlrpc_cli_wait_unlink(req) || + ptlrpc_client_bulk_active(req)) + continue; + /* If there is no need to resend, fail it now. */ + if (req->rq_no_resend) { + if (req->rq_status == 0) + req->rq_status = -EIO; + ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); + GOTO(interpret, req->rq_status); + } else { + continue; + } + } + + if (req->rq_err) { + spin_lock(&req->rq_lock); + req->rq_replied = 0; + spin_unlock(&req->rq_lock); + if (req->rq_status == 0) + req->rq_status = -EIO; + ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); + GOTO(interpret, req->rq_status); + } + + /* + * ptlrpc_set_wait uses l_wait_event_abortable_timeout() + * so it sets rq_intr regardless of individual rpc + * timeouts. The synchronous IO waiting path sets + * rq_intr irrespective of whether ptlrpcd + * has seen a timeout. Our policy is to only interpret + * interrupted rpcs after they have timed out, so we + * need to enforce that here. + */ + + if (req->rq_intr && (req->rq_timedout || req->rq_waiting || + req->rq_wait_ctx)) { + req->rq_status = -EINTR; + ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); + GOTO(interpret, req->rq_status); + } + + if (req->rq_phase == RQ_PHASE_RPC) { + if (req->rq_timedout || req->rq_resend || + req->rq_waiting || req->rq_wait_ctx) { + int status; + + if (!ptlrpc_unregister_reply(req, 1)) { + ptlrpc_unregister_bulk(req, 1); + continue; + } spin_lock(&imp->imp_lock); - if (ptlrpc_import_delay_req(imp, req, &status)){ - /* put on delay list - only if we wait - * recovery finished - before send */ - list_del_init(&req->rq_list); - list_add_tail(&req->rq_list, - &imp-> - imp_delayed_list); + if (ptlrpc_import_delay_req(imp, req, + &status)) { + /* + * put on delay list - only if we wait + * recovery finished - before send + */ + list_move_tail(&req->rq_list, + &imp->imp_delayed_list); spin_unlock(&imp->imp_lock); - continue; - } + continue; + } - if (status != 0) { - req->rq_status = status; - ptlrpc_rqphase_move(req, - RQ_PHASE_INTERPRET); + if (status != 0) { + req->rq_status = status; + ptlrpc_rqphase_move(req, + RQ_PHASE_INTERPRET); spin_unlock(&imp->imp_lock); GOTO(interpret, req->rq_status); } + /* ignore on just initiated connections */ if (ptlrpc_no_resend(req) && - !req->rq_wait_ctx) { + !req->rq_wait_ctx && + imp->imp_generation != + imp->imp_initiated_at) { req->rq_status = -ENOTCONN; ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); @@ -1883,9 +1990,29 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) GOTO(interpret, req->rq_status); } - list_del_init(&req->rq_list); - list_add_tail(&req->rq_list, - &imp->imp_sending_list); + /* don't resend too fast in case of network + * errors. + */ + if (ktime_get_real_seconds() < (req->rq_sent + 1) + && req->rq_net_err && req->rq_timedout) { + + DEBUG_REQ(D_INFO, req, + "throttle request"); + /* Don't try to resend RPC right away + * as it is likely it will fail again + * and ptlrpc_check_set() will be + * called again, keeping this thread + * busy. Instead, wait for the next + * timeout. Flag it as resend to + * ensure we don't wait to long. + */ + req->rq_resend = 1; + spin_unlock(&imp->imp_lock); + continue; + } + + list_move_tail(&req->rq_list, + &imp->imp_sending_list); spin_unlock(&imp->imp_lock); @@ -1894,24 +2021,22 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_unlock(&req->rq_lock); if (req->rq_timedout || req->rq_resend) { - /* This is re-sending anyways, - * let's mark req as resend. */ + /* + * This is re-sending anyways, + * let's mark req as resend. + */ spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); - - if (req->rq_bulk != NULL && - !ptlrpc_unregister_bulk(req, 1)) - continue; - } - /* - * rq_wait_ctx is only touched by ptlrpcd, - * so no lock is needed here. - */ - status = sptlrpc_req_refresh_ctx(req, -1); - if (status) { - if (req->rq_err) { - req->rq_status = status; + } + /* + * rq_wait_ctx is only touched by ptlrpcd, + * so no lock is needed here. + */ + status = sptlrpc_req_refresh_ctx(req, 0); + if (status) { + if (req->rq_err) { + req->rq_status = status; spin_lock(&req->rq_lock); req->rq_wait_ctx = 0; spin_unlock(&req->rq_lock); @@ -1929,6 +2054,14 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_unlock(&req->rq_lock); } + /* + * In any case, the previous bulk should be + * cleaned up to prepare for the new sending + */ + if (req->rq_bulk && + !ptlrpc_unregister_bulk(req, 1)) + continue; + rc = ptl_send_rpc(req, 0); if (rc == -ENOMEM) { spin_lock(&imp->imp_lock); @@ -1973,49 +2106,56 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_unlock(&req->rq_lock); - /* unlink from net because we are going to - * swab in-place of reply buffer */ - unregistered = ptlrpc_unregister_reply(req, 1); - if (!unregistered) - continue; - - req->rq_status = after_reply(req); - if (req->rq_resend) - continue; - - /* If there is no bulk associated with this request, - * then we're done and should let the interpreter - * process the reply. Similarly if the RPC returned - * an error, and therefore the bulk will never arrive. - */ - if (req->rq_bulk == NULL || req->rq_status < 0) { - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - GOTO(interpret, req->rq_status); - } - - ptlrpc_rqphase_move(req, RQ_PHASE_BULK); - } - - LASSERT(req->rq_phase == RQ_PHASE_BULK); - if (ptlrpc_client_bulk_active(req)) - continue; + /* + * unlink from net because we are going to + * swab in-place of reply buffer + */ + unregistered = ptlrpc_unregister_reply(req, 1); + if (!unregistered) + continue; + + req->rq_status = after_reply(req); + if (req->rq_resend) + continue; + + /* + * If there is no bulk associated with this request, + * then we're done and should let the interpreter + * process the reply. Similarly if the RPC returned + * an error, and therefore the bulk will never arrive. + */ + if (!req->rq_bulk || req->rq_status < 0) { + ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); + GOTO(interpret, req->rq_status); + } + + ptlrpc_rqphase_move(req, RQ_PHASE_BULK); + } + + LASSERT(req->rq_phase == RQ_PHASE_BULK); + if (ptlrpc_client_bulk_active(req)) + continue; if (req->rq_bulk->bd_failure) { - /* The RPC reply arrived OK, but the bulk screwed + /* + * The RPC reply arrived OK, but the bulk screwed * up! Dead weird since the server told us the RPC * was good after getting the REPLY for her GET or - * the ACK for her PUT. */ + * the ACK for her PUT. + */ DEBUG_REQ(D_ERROR, req, "bulk transfer failed"); req->rq_status = -EIO; } ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - interpret: +interpret: LASSERT(req->rq_phase == RQ_PHASE_INTERPRET); - /* This moves to "unregistering" phase we need to wait for - * reply unlink. */ + /* + * This moves to "unregistering" phase we need to wait for + * reply unlink. + */ if (!unregistered && !ptlrpc_unregister_reply(req, async)) { /* start async bulk unlink too */ ptlrpc_unregister_bulk(req, 1); @@ -2025,8 +2165,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) if (!ptlrpc_unregister_bulk(req, async)) continue; - /* When calling interpret receiving already should be - * finished. */ + /* + * When calling interpret receiving already should be + * finished. + */ LASSERT(!req->rq_receiving_reply); ptlrpc_req_interpret(env, req, req->rq_status); @@ -2037,38 +2179,44 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) } ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE); - if (req->rq_reqmsg != NULL) + if (req->rq_reqmsg) CDEBUG(D_RPCTRACE, - "Completed RPC pname:cluuid:pid:xid:nid:" - "opc %s:%s:%d:%llu:%s:%d\n", current_comm(), + "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n", + req, current->comm, imp->imp_obd->obd_uuid.uuid, lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, - libcfs_nid2str(imp->imp_connection->c_peer.nid), - lustre_msg_get_opc(req->rq_reqmsg)); + obd_import_nid2str(imp), + lustre_msg_get_opc(req->rq_reqmsg), + lustre_msg_get_jobid(req->rq_reqmsg) ?: ""); spin_lock(&imp->imp_lock); - /* Request already may be not on sending or delaying list. This + /* + * Request already may be not on sending or delaying list. This * may happen in the case of marking it erroneous for the case * ptlrpc_import_delay_req(req, status) find it impossible to - * allow sending this rpc and returns *status != 0. */ + * allow sending this rpc and returns *status != 0. + */ if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); - atomic_dec(&imp->imp_inflight); + if (atomic_dec_and_test(&imp->imp_inflight)) + wake_up(&imp->imp_recovery_waitq); } list_del_init(&req->rq_unreplied_list); spin_unlock(&imp->imp_lock); atomic_dec(&set->set_remaining); - wake_up_all(&imp->imp_recovery_waitq); + wake_up(&imp->imp_recovery_waitq); if (set->set_producer) { /* produce a new request if possible */ if (ptlrpc_set_producer(set) > 0) force_timer_recalc = 1; - /* free the request that has just been completed - * in order not to pollute set->set_requests */ + /* + * free the request that has just been completed + * in order not to pollute set->set_requests + */ list_del_init(&req->rq_set_chain); spin_lock(&req->rq_lock); req->rq_set = NULL; @@ -2084,8 +2232,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) } } - /* move completed request at the head of list so it's easier for - * caller to find them */ + /* + * move completed request at the head of list so it's easier for + * caller to find them + */ list_splice(&comp_reqs, &set->set_requests); /* If we hit an error, we want to recover promptly. */ @@ -2103,14 +2253,14 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) struct obd_import *imp = req->rq_import; unsigned int debug_mask = D_RPCTRACE; int rc = 0; - ENTRY; + ENTRY; spin_lock(&req->rq_lock); req->rq_timedout = 1; spin_unlock(&req->rq_lock); if (ptlrpc_console_allow(req, lustre_msg_get_opc(req->rq_reqmsg), - lustre_msg_get_status(req->rq_reqmsg))) + lustre_msg_get_status(req->rq_reqmsg))) debug_mask = D_WARNING; DEBUG_REQ(debug_mask, req, "Request sent has %s: [sent %lld/real %lld]", req->rq_net_err ? "failed due to network error" : @@ -2118,64 +2268,66 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) req->rq_real_sent < req->rq_sent || req->rq_real_sent >= req->rq_deadline) ? "timed out for sent delay" : "timed out for slow reply"), - (s64)req->rq_sent, (s64)req->rq_real_sent); + req->rq_sent, req->rq_real_sent); - if (imp != NULL && obd_debug_peer_on_timeout) + if (imp && obd_debug_peer_on_timeout) LNetDebugPeer(imp->imp_connection->c_peer); - ptlrpc_unregister_reply(req, async_unlink); - ptlrpc_unregister_bulk(req, async_unlink); + ptlrpc_unregister_reply(req, async_unlink); + ptlrpc_unregister_bulk(req, async_unlink); - if (obd_dump_on_timeout) - libcfs_debug_dumplog(); + if (obd_dump_on_timeout) + libcfs_debug_dumplog(); - if (imp == NULL) { - DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?"); - RETURN(1); - } + if (!imp) { + DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?"); + RETURN(1); + } atomic_inc(&imp->imp_timeouts); - /* The DLM server doesn't want recovery run on its imports. */ - if (imp->imp_dlm_fake) - RETURN(1); - - /* If this request is for recovery or other primordial tasks, - * then error it out here. */ - if (req->rq_ctx_init || req->rq_ctx_fini || - req->rq_send_state != LUSTRE_IMP_FULL || - imp->imp_obd->obd_no_recov) { - DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)", - ptlrpc_import_state_name(req->rq_send_state), - ptlrpc_import_state_name(imp->imp_state)); + /* The DLM server doesn't want recovery run on its imports. */ + if (imp->imp_dlm_fake) + RETURN(1); + + /* + * If this request is for recovery or other primordial tasks, + * then error it out here. + */ + if (req->rq_ctx_init || req->rq_ctx_fini || + req->rq_send_state != LUSTRE_IMP_FULL || + imp->imp_obd->obd_no_recov) { + DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)", + ptlrpc_import_state_name(req->rq_send_state), + ptlrpc_import_state_name(imp->imp_state)); spin_lock(&req->rq_lock); req->rq_status = -ETIMEDOUT; req->rq_err = 1; spin_unlock(&req->rq_lock); RETURN(1); - } + } - /* if a request can't be resent we can't wait for an answer after - the timeout */ - if (ptlrpc_no_resend(req)) { - DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:"); - rc = 1; - } + /* + * if a request can't be resent we can't wait for an answer after + * the timeout + */ + if (ptlrpc_no_resend(req)) { + DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:"); + rc = 1; + } - ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg)); + ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg)); - RETURN(rc); + RETURN(rc); } /** * Time out all uncompleted requests in request set pointed by \a data - * Callback used when waiting on sets with l_wait_event. - * Always returns 1. + * This is called when a wait times out. */ -int ptlrpc_expired_set(void *data) +void ptlrpc_expired_set(struct ptlrpc_request_set *set) { - struct ptlrpc_request_set *set = data; - struct list_head *tmp; + struct ptlrpc_request *req; time64_t now = ktime_get_real_seconds(); ENTRY; @@ -2184,65 +2336,48 @@ int ptlrpc_expired_set(void *data) /* * A timeout expired. See which reqs it applies to... */ - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); - - /* don't expire request waiting for context */ - if (req->rq_wait_ctx) - continue; - - /* Request in-flight? */ - if (!((req->rq_phase == RQ_PHASE_RPC && - !req->rq_waiting && !req->rq_resend) || - (req->rq_phase == RQ_PHASE_BULK))) - continue; - - if (req->rq_timedout || /* already dealt with */ - req->rq_deadline > now) /* not expired */ - continue; - - /* Deal with this guy. Do it asynchronously to not block - * ptlrpcd thread. */ - ptlrpc_expire_one_request(req, 1); - } - - /* - * When waiting for a whole set, we always break out of the - * sleep so we can recalculate the timeout, or enable interrupts - * if everyone's timed out. - */ - RETURN(1); -} + list_for_each_entry(req, &set->set_requests, rq_set_chain) { + /* don't expire request waiting for context */ + if (req->rq_wait_ctx) + continue; -/** - * Sets rq_intr flag in \a req under spinlock. - */ -void ptlrpc_mark_interrupted(struct ptlrpc_request *req) -{ - spin_lock(&req->rq_lock); - req->rq_intr = 1; - spin_unlock(&req->rq_lock); + /* Request in-flight? */ + if (!((req->rq_phase == RQ_PHASE_RPC && + !req->rq_waiting && !req->rq_resend) || + (req->rq_phase == RQ_PHASE_BULK))) + continue; + + if (req->rq_timedout || /* already dealt with */ + req->rq_deadline > now) /* not expired */ + continue; + + /* + * Deal with this guy. Do it asynchronously to not block + * ptlrpcd thread. + */ + ptlrpc_expire_one_request(req, 1); + /* + * Loops require that we resched once in a while to avoid + * RCU stalls and a few other problems. + */ + cond_resched(); + + } } -EXPORT_SYMBOL(ptlrpc_mark_interrupted); /** * Interrupts (sets interrupted flag) all uncompleted requests in - * a set \a data. Callback for l_wait_event for interruptible waits. + * a set \a data. This is called when a wait_event is interrupted + * by a signal. */ -static void ptlrpc_interrupted_set(void *data) +static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set) { - struct ptlrpc_request_set *set = data; - struct list_head *tmp; + struct ptlrpc_request *req; LASSERT(set != NULL); CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set); - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { if (req->rq_intr) continue; @@ -2251,7 +2386,9 @@ static void ptlrpc_interrupted_set(void *data) !req->rq_allow_intr) continue; - ptlrpc_mark_interrupted(req); + spin_lock(&req->rq_lock); + req->rq_intr = 1; + spin_unlock(&req->rq_lock); } } @@ -2260,49 +2397,40 @@ static void ptlrpc_interrupted_set(void *data) */ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) { - struct list_head *tmp; time64_t now = ktime_get_real_seconds(); int timeout = 0; struct ptlrpc_request *req; time64_t deadline; ENTRY; - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - - /* - * Request in-flight? - */ - if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) || - (req->rq_phase == RQ_PHASE_BULK) || - (req->rq_phase == RQ_PHASE_NEW))) - continue; - - /* - * Already timed out. - */ - if (req->rq_timedout) - continue; - - /* - * Waiting for ctx. - */ - if (req->rq_wait_ctx) - continue; - - if (req->rq_phase == RQ_PHASE_NEW) - deadline = req->rq_sent; + list_for_each_entry(req, &set->set_requests, rq_set_chain) { + /* Request in-flight? */ + if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) || + (req->rq_phase == RQ_PHASE_BULK) || + (req->rq_phase == RQ_PHASE_NEW))) + continue; + + /* Already timed out. */ + if (req->rq_timedout) + continue; + + /* Waiting for ctx. */ + if (req->rq_wait_ctx) + continue; + + if (req->rq_phase == RQ_PHASE_NEW) + deadline = req->rq_sent; else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend) deadline = req->rq_sent; - else - deadline = req->rq_sent + req->rq_timeout; + else + deadline = req->rq_sent + req->rq_timeout; - if (deadline <= now) /* actually expired already */ - timeout = 1; /* ASAP */ - else if (timeout == 0 || timeout > deadline - now) - timeout = deadline - now; - } - RETURN(timeout); + if (deadline <= now) /* actually expired already */ + timeout = 1; /* ASAP */ + else if (timeout == 0 || timeout > deadline - now) + timeout = deadline - now; + } + RETURN(timeout); } /** @@ -2311,88 +2439,112 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) * error or otherwise be interrupted). * Returns 0 on success or error code otherwise. */ -int ptlrpc_set_wait(struct ptlrpc_request_set *set) +int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp; - struct ptlrpc_request *req; - struct l_wait_info lwi; + struct ptlrpc_request *req; time64_t timeout; int rc; - ENTRY; + ENTRY; if (set->set_producer) (void)ptlrpc_set_producer(set); else - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &set->set_requests, rq_set_chain) { if (req->rq_phase == RQ_PHASE_NEW) (void)ptlrpc_send_new_req(req); } if (list_empty(&set->set_requests)) - RETURN(0); + RETURN(0); - do { - timeout = ptlrpc_set_next_timeout(set); + do { + timeout = ptlrpc_set_next_timeout(set); - /* wait until all complete, interrupted, or an in-flight - * req times out */ + /* + * wait until all complete, interrupted, or an in-flight + * req times out + */ CDEBUG(D_RPCTRACE, "set %p going to sleep for %lld seconds\n", - set, timeout); + set, timeout); if ((timeout == 0 && !signal_pending(current)) || - set->set_allow_intr) - /* No requests are in-flight (ether timed out + set->set_allow_intr) { + /* + * No requests are in-flight (ether timed out * or delayed), so we can allow interrupts. * We still want to block for a limited time, - * so we allow interrupts during the timeout. */ - lwi = LWI_TIMEOUT_INTR_ALL( - cfs_time_seconds(timeout ? timeout : 1), - ptlrpc_expired_set, - ptlrpc_interrupted_set, set); - else - /* - * At least one request is in flight, so no - * interrupts are allowed. Wait until all - * complete, or an in-flight req times out. - */ - lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1), - ptlrpc_expired_set, set); - - rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi); - - /* LU-769 - if we ignored the signal because it was already - * pending when we started, we need to handle it now or we risk - * it being ignored forever */ - if (rc == -ETIMEDOUT && - (!lwi.lwi_allow_intr || set->set_allow_intr) && - signal_pending(current)) { - sigset_t blocked_sigs = - cfs_block_sigsinv(LUSTRE_FATAL_SIGS); - - /* In fact we only interrupt for the "fatal" signals - * like SIGINT or SIGKILL. We still ignore less - * important signals since ptlrpc set is not easily - * reentrant from userspace again */ - if (signal_pending(current)) + * so we allow interrupts during the timeout. + */ + rc = l_wait_event_abortable_timeout( + set->set_waitq, + ptlrpc_check_set(NULL, set), + cfs_time_seconds(timeout ? timeout : 1)); + if (rc == 0) { + rc = -ETIMEDOUT; + ptlrpc_expired_set(set); + } else if (rc < 0) { + rc = -EINTR; ptlrpc_interrupted_set(set); - cfs_restore_sigs(blocked_sigs); + } else { + rc = 0; + } + } else { + /* + * At least one request is in flight, so no + * interrupts are allowed. Wait until all + * complete, or an in-flight req times out. + */ + rc = wait_event_idle_timeout( + set->set_waitq, + ptlrpc_check_set(NULL, set), + cfs_time_seconds(timeout ? timeout : 1)); + if (rc == 0) { + ptlrpc_expired_set(set); + rc = -ETIMEDOUT; + } else { + rc = 0; + } + + /* + * LU-769 - if we ignored the signal because + * it was already pending when we started, we + * need to handle it now or we risk it being + * ignored forever + */ + if (rc == -ETIMEDOUT && + signal_pending(current)) { + sigset_t old, new; + + siginitset(&new, LUSTRE_FATAL_SIGS); + sigprocmask(SIG_BLOCK, &new, &old); + /* + * In fact we only interrupt for the + * "fatal" signals like SIGINT or + * SIGKILL. We still ignore less + * important signals since ptlrpc set + * is not easily reentrant from + * userspace again + */ + if (signal_pending(current)) + ptlrpc_interrupted_set(set); + sigprocmask(SIG_SETMASK, &old, NULL); + } } - LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT); + LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT); - /* -EINTR => all requests have been flagged rq_intr so next - * check completes. - * -ETIMEDOUT => someone timed out. When all reqs have - * timed out, signals are enabled allowing completion with - * EINTR. - * I don't really care if we go once more round the loop in - * the error cases -eeb. */ + /* + * -EINTR => all requests have been flagged rq_intr so next + * check completes. + * -ETIMEDOUT => someone timed out. When all reqs have + * timed out, signals are enabled allowing completion with + * EINTR. + * I don't really care if we go once more round the loop in + * the error cases -eeb. + */ if (rc == 0 && atomic_read(&set->set_remaining) == 0) { - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &set->set_requests, + rq_set_chain) { spin_lock(&req->rq_lock); req->rq_invalid_rqset = 1; spin_unlock(&req->rq_lock); @@ -2402,34 +2554,14 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) LASSERT(atomic_read(&set->set_remaining) == 0); - rc = set->set_rc; /* rq_status of already freed requests if any */ - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - - LASSERT(req->rq_phase == RQ_PHASE_COMPLETE); - if (req->rq_status != 0) - rc = req->rq_status; - } - - if (set->set_interpret != NULL) { - int (*interpreter)(struct ptlrpc_request_set *set,void *,int) = - set->set_interpret; - rc = interpreter (set, set->set_arg, rc); - } else { - struct ptlrpc_set_cbdata *cbdata, *n; - int err; - - list_for_each_entry_safe(cbdata, n, - &set->set_cblist, psc_item) { - list_del_init(&cbdata->psc_item); - err = cbdata->psc_interpret(set, cbdata->psc_data, rc); - if (err && !rc) - rc = err; - OBD_FREE_PTR(cbdata); - } - } - - RETURN(rc); + rc = set->set_rc; /* rq_status of already freed requests if any */ + list_for_each_entry(req, &set->set_requests, rq_set_chain) { + LASSERT(req->rq_phase == RQ_PHASE_COMPLETE); + if (req->rq_status != 0) + rc = req->rq_status; + } + + RETURN(rc); } EXPORT_SYMBOL(ptlrpc_set_wait); @@ -2445,7 +2577,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) { ENTRY; - if (request == NULL) + if (!request) RETURN_EXIT; LASSERT(!request->rq_srv_req); @@ -2457,16 +2589,18 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) req_capsule_fini(&request->rq_pill); - /* We must take it off the imp_replay_list first. Otherwise, we'll set - * request->rq_reqmsg to NULL while osc_close is dereferencing it. */ - if (request->rq_import != NULL) { + /* + * We must take it off the imp_replay_list first. Otherwise, we'll set + * request->rq_reqmsg to NULL while osc_close is dereferencing it. + */ + if (request->rq_import) { if (!locked) spin_lock(&request->rq_import->imp_lock); list_del_init(&request->rq_replay_list); list_del_init(&request->rq_unreplied_list); if (!locked) spin_unlock(&request->rq_import->imp_lock); - } + } LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request); if (atomic_read(&request->rq_refcount) != 0) { @@ -2475,25 +2609,29 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) LBUG(); } - if (request->rq_repbuf != NULL) - sptlrpc_cli_free_repbuf(request); + if (request->rq_repbuf) + sptlrpc_cli_free_repbuf(request); - if (request->rq_import != NULL) { - class_import_put(request->rq_import); - request->rq_import = NULL; - } - if (request->rq_bulk != NULL) + if (request->rq_import) { + if (!ptlrpcd_check_work(request)) { + LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0); + atomic_dec(&request->rq_import->imp_reqs); + } + class_import_put(request->rq_import); + request->rq_import = NULL; + } + if (request->rq_bulk) ptlrpc_free_bulk(request->rq_bulk); - if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL) - sptlrpc_cli_free_reqbuf(request); + if (request->rq_reqbuf || request->rq_clrbuf) + sptlrpc_cli_free_reqbuf(request); - if (request->rq_cli_ctx) - sptlrpc_req_put_ctx(request, !locked); + if (request->rq_cli_ctx) + sptlrpc_req_put_ctx(request, !locked); - if (request->rq_pool) - __ptlrpc_free_req_to_pool(request); - else + if (request->rq_pool) + __ptlrpc_free_req_to_pool(request); + else ptlrpc_request_cache_free(request); EXIT; } @@ -2521,8 +2659,8 @@ void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request) static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) { int count; - ENTRY; + ENTRY; if (!request) RETURN(1); @@ -2536,7 +2674,8 @@ static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) count = atomic_dec_return(&request->rq_refcount); LASSERTF(count >= 0, "Invalid ref count %d\n", count); - /* For open RPC, the client does not know the EA size (LOV, ACL, and + /* + * For open RPC, the client does not know the EA size (LOV, ACL, and * so on) before replied, then the client has to reserve very large * reply buffer. Such buffer will not be released until the RPC freed. * Since The open RPC is replayable, we need to keep it in the replay @@ -2546,7 +2685,8 @@ static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) * If fact, it is unnecessary to keep reply buffer for open replay, * related EAs have already been saved via mdc_save_lovea() before * coming here. So it is safe to free the reply buffer some earlier - * before releasing the RPC to avoid client OOM. LU-9514 */ + * before releasing the RPC to avoid client OOM. LU-9514 + */ if (count == 1 && request->rq_early_free_repbuf && request->rq_repbuf) { spin_lock(&request->rq_early_free_lock); sptlrpc_cli_free_repbuf(request); @@ -2569,7 +2709,7 @@ static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) */ void ptlrpc_req_finished(struct ptlrpc_request *request) { - __ptlrpc_req_finished(request, 0); + __ptlrpc_req_finished(request, 0); } EXPORT_SYMBOL(ptlrpc_req_finished); @@ -2578,7 +2718,7 @@ EXPORT_SYMBOL(ptlrpc_req_finished); */ __u64 ptlrpc_req_xid(struct ptlrpc_request *request) { - return request->rq_xid; + return request->rq_xid; } EXPORT_SYMBOL(ptlrpc_req_xid); @@ -2591,9 +2731,7 @@ EXPORT_SYMBOL(ptlrpc_req_xid); */ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) { - int rc; - struct l_wait_info lwi; - + bool discard = false; /* * Might sleep. */ @@ -2603,60 +2741,66 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) && async && request->rq_reply_deadline == 0 && cfs_fail_val == 0) request->rq_reply_deadline = ktime_get_real_seconds() + - LONG_UNLINK; + PTLRPC_REQ_LONG_UNLINK; - /* - * Nothing left to do. - */ - if (!ptlrpc_client_recv_or_unlink(request)) - RETURN(1); + /* + * Nothing left to do. + */ + if (!__ptlrpc_cli_wait_unlink(request, &discard)) + RETURN(1); + + LNetMDUnlink(request->rq_reply_md_h); - LNetMDUnlink(request->rq_reply_md_h); + if (discard) /* Discard the request-out callback */ + __LNetMDUnlink(request->rq_req_md_h, discard); - /* - * Let's check it once again. - */ - if (!ptlrpc_client_recv_or_unlink(request)) - RETURN(1); + /* + * Let's check it once again. + */ + if (!ptlrpc_cli_wait_unlink(request)) + RETURN(1); /* Move to "Unregistering" phase as reply was not unlinked yet. */ ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC); - /* - * Do not wait for unlink to finish. - */ - if (async) - RETURN(0); - - /* - * We have to l_wait_event() whatever the result, to give liblustre - * a chance to run reply_in_callback(), and to make sure we've - * unlinked before returning a req to the pool. - */ - for (;;) { - /* The wq argument is ignored by user-space wait_event macros */ - wait_queue_head_t *wq = (request->rq_set != NULL) ? + /* + * Do not wait for unlink to finish. + */ + if (async) + RETURN(0); + + /* + * We have to wait_event_idle_timeout() whatever the result, to get + * a chance to run reply_in_callback(), and to make sure we've + * unlinked before returning a req to the pool. + */ + for (;;) { + wait_queue_head_t *wq = (request->rq_set) ? &request->rq_set->set_waitq : &request->rq_reply_waitq; - /* Network access will complete in finite time but the HUGE - * timeout lets us CWARN for visibility of sluggish NALs */ - lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK), - cfs_time_seconds(1), NULL, NULL); - rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request), - &lwi); - if (rc == 0) { - ptlrpc_rqphase_move(request, request->rq_next_phase); - RETURN(1); - } - - LASSERT(rc == -ETIMEDOUT); - DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout " - "receiving_reply=%d req_ulinked=%d reply_unlinked=%d", + int seconds = PTLRPC_REQ_LONG_UNLINK; + /* + * Network access will complete in finite time but the HUGE + * timeout lets us CWARN for visibility of sluggish NALs + */ + while (seconds > 0 && + wait_event_idle_timeout( + *wq, + !ptlrpc_cli_wait_unlink(request), + cfs_time_seconds(1)) == 0) + seconds -= 1; + if (seconds > 0) { + ptlrpc_rqphase_move(request, request->rq_next_phase); + RETURN(1); + } + + DEBUG_REQ(D_WARNING, request, + "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d", request->rq_receiving_reply, request->rq_req_unlinked, request->rq_reply_unlinked); - } - RETURN(0); + } + RETURN(0); } static void ptlrpc_free_request(struct ptlrpc_request *req) @@ -2665,7 +2809,7 @@ static void ptlrpc_free_request(struct ptlrpc_request *req) req->rq_replay = 0; spin_unlock(&req->rq_lock); - if (req->rq_commit_cb != NULL) + if (req->rq_commit_cb) req->rq_commit_cb(req); list_del_init(&req->rq_replay_list); @@ -2677,7 +2821,7 @@ static void ptlrpc_free_request(struct ptlrpc_request *req) */ void ptlrpc_request_committed(struct ptlrpc_request *req, int force) { - struct obd_import *imp = req->rq_import; + struct obd_import *imp = req->rq_import; spin_lock(&imp->imp_lock); if (list_empty(&req->rq_replay_list)) { @@ -2685,8 +2829,11 @@ void ptlrpc_request_committed(struct ptlrpc_request *req, int force) return; } - if (force || req->rq_transno <= imp->imp_peer_committed_transno) + if (force || req->rq_transno <= imp->imp_peer_committed_transno) { + if (imp->imp_replay_cursor == &req->rq_replay_list) + imp->imp_replay_cursor = req->rq_replay_list.next; ptlrpc_free_request(req); + } spin_unlock(&imp->imp_lock); } @@ -2702,64 +2849,64 @@ EXPORT_SYMBOL(ptlrpc_request_committed); */ void ptlrpc_free_committed(struct obd_import *imp) { - struct ptlrpc_request *req, *saved; - struct ptlrpc_request *last_req = NULL; /* temporary fire escape */ - bool skip_committed_list = true; - ENTRY; + struct ptlrpc_request *req, *saved; + struct ptlrpc_request *last_req = NULL; /* temporary fire escape */ + bool skip_committed_list = true; + ENTRY; LASSERT(imp != NULL); assert_spin_locked(&imp->imp_lock); - if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked && - imp->imp_generation == imp->imp_last_generation_checked) { + if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked && + imp->imp_generation == imp->imp_last_generation_checked) { CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n", - imp->imp_obd->obd_name, imp->imp_peer_committed_transno); + imp->imp_obd->obd_name, imp->imp_peer_committed_transno); RETURN_EXIT; - } + } CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n", - imp->imp_obd->obd_name, imp->imp_peer_committed_transno, - imp->imp_generation); + imp->imp_obd->obd_name, imp->imp_peer_committed_transno, + imp->imp_generation); if (imp->imp_generation != imp->imp_last_generation_checked || imp->imp_last_transno_checked == 0) skip_committed_list = false; - imp->imp_last_transno_checked = imp->imp_peer_committed_transno; - imp->imp_last_generation_checked = imp->imp_generation; + imp->imp_last_transno_checked = imp->imp_peer_committed_transno; + imp->imp_last_generation_checked = imp->imp_generation; list_for_each_entry_safe(req, saved, &imp->imp_replay_list, - rq_replay_list) { - /* XXX ok to remove when 1357 resolved - rread 05/29/03 */ - LASSERT(req != last_req); - last_req = req; - - if (req->rq_transno == 0) { - DEBUG_REQ(D_EMERG, req, "zero transno during replay"); - LBUG(); - } - if (req->rq_import_generation < imp->imp_generation) { - DEBUG_REQ(D_RPCTRACE, req, "free request with old gen"); - GOTO(free_req, 0); - } - - /* not yet committed */ - if (req->rq_transno > imp->imp_peer_committed_transno) { - DEBUG_REQ(D_RPCTRACE, req, "stopping search"); - break; - } + rq_replay_list) { + /* XXX ok to remove when 1357 resolved - rread 05/29/03 */ + LASSERT(req != last_req); + last_req = req; + + if (req->rq_transno == 0) { + DEBUG_REQ(D_EMERG, req, "zero transno during replay"); + LBUG(); + } + if (req->rq_import_generation < imp->imp_generation) { + DEBUG_REQ(D_RPCTRACE, req, "free request with old gen"); + GOTO(free_req, 0); + } + + /* not yet committed */ + if (req->rq_transno > imp->imp_peer_committed_transno) { + DEBUG_REQ(D_RPCTRACE, req, "stopping search"); + break; + } if (req->rq_replay) { DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)"); list_move_tail(&req->rq_replay_list, - &imp->imp_committed_list); + &imp->imp_committed_list); continue; } DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)", - imp->imp_peer_committed_transno); + imp->imp_peer_committed_transno); free_req: ptlrpc_free_request(req); - } + } if (skip_committed_list) GOTO(out, 0); @@ -2781,13 +2928,13 @@ free_req: } } out: - EXIT; + EXIT; } void ptlrpc_cleanup_client(struct obd_import *imp) { - ENTRY; - EXIT; + ENTRY; + EXIT; } /** @@ -2798,25 +2945,26 @@ void ptlrpc_cleanup_client(struct obd_import *imp) */ void ptlrpc_resend_req(struct ptlrpc_request *req) { - DEBUG_REQ(D_HA, req, "going to resend"); + DEBUG_REQ(D_HA, req, "going to resend"); spin_lock(&req->rq_lock); - /* Request got reply but linked to the import list still. - Let ptlrpc_check_set() to process it. */ + /* + * Request got reply but linked to the import list still. + * Let ptlrpc_check_set() process it. + */ if (ptlrpc_client_replied(req)) { spin_unlock(&req->rq_lock); DEBUG_REQ(D_HA, req, "it has reply, so skip it"); return; } - lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 }); - req->rq_status = -EAGAIN; + req->rq_status = -EAGAIN; - req->rq_resend = 1; - req->rq_net_err = 0; - req->rq_timedout = 0; + req->rq_resend = 1; + req->rq_net_err = 0; + req->rq_timedout = 0; - ptlrpc_client_wake_req(req); + ptlrpc_client_wake_req(req); spin_unlock(&req->rq_lock); } @@ -2849,20 +2997,22 @@ EXPORT_SYMBOL(ptlrpc_request_addref); * Must be called under imp_lock */ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, - struct obd_import *imp) + struct obd_import *imp) { - struct list_head *tmp; + struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); - if (req->rq_transno == 0) { - DEBUG_REQ(D_EMERG, req, "saving request with zero transno"); - LBUG(); - } + if (req->rq_transno == 0) { + DEBUG_REQ(D_EMERG, req, "saving request with zero transno"); + LBUG(); + } - /* clear this for new requests that were resent as well - as resent replayed requests. */ - lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); + /* + * clear this for new requests that were resent as well + * as resent replayed requests. + */ + lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); /* don't re-add requests that have been replayed */ if (!list_empty(&req->rq_replay_list)) @@ -2877,25 +3027,23 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, LASSERT(imp->imp_replayable); /* Balanced in ptlrpc_free_committed, usually. */ ptlrpc_request_addref(req); - list_for_each_prev(tmp, &imp->imp_replay_list) { - struct ptlrpc_request *iter = list_entry(tmp, - struct ptlrpc_request, - rq_replay_list); - - /* We may have duplicate transnos if we create and then - * open a file, or for closes retained if to match creating - * opens, so use req->rq_xid as a secondary key. - * (See bugs 684, 685, and 428.) - * XXX no longer needed, but all opens need transnos! - */ - if (iter->rq_transno > req->rq_transno) - continue; - - if (iter->rq_transno == req->rq_transno) { - LASSERT(iter->rq_xid != req->rq_xid); - if (iter->rq_xid > req->rq_xid) - continue; - } + list_for_each_entry_reverse(iter, &imp->imp_replay_list, + rq_replay_list) { + /* + * We may have duplicate transnos if we create and then + * open a file, or for closes retained if to match creating + * opens, so use req->rq_xid as a secondary key. + * (See bugs 684, 685, and 428.) + * XXX no longer needed, but all opens need transnos! + */ + if (iter->rq_transno > req->rq_transno) + continue; + + if (iter->rq_transno == req->rq_transno) { + LASSERT(iter->rq_xid != req->rq_xid); + if (iter->rq_xid > req->rq_xid) + continue; + } list_add(&req->rq_replay_list, &iter->rq_replay_list); return; @@ -2910,29 +3058,29 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, */ int ptlrpc_queue_wait(struct ptlrpc_request *req) { - struct ptlrpc_request_set *set; - int rc; - ENTRY; + struct ptlrpc_request_set *set; + int rc; - LASSERT(req->rq_set == NULL); - LASSERT(!req->rq_receiving_reply); + ENTRY; + LASSERT(req->rq_set == NULL); + LASSERT(!req->rq_receiving_reply); set = ptlrpc_prep_set(); - if (set == NULL) { + if (!set) { CERROR("cannot allocate ptlrpc set: rc = %d\n", -ENOMEM); RETURN(-ENOMEM); } /* for distributed debugging */ - lustre_msg_set_status(req->rq_reqmsg, current_pid()); + lustre_msg_set_status(req->rq_reqmsg, current->pid); - /* add a ref for the set (see comment in ptlrpc_set_add_req) */ - ptlrpc_request_addref(req); - ptlrpc_set_add_req(set, req); - rc = ptlrpc_set_wait(set); - ptlrpc_set_destroy(set); + /* add a ref for the set (see comment in ptlrpc_set_add_req) */ + ptlrpc_request_addref(req); + ptlrpc_set_add_req(set, req); + rc = ptlrpc_set_wait(NULL, set); + ptlrpc_set_destroy(set); - RETURN(rc); + RETURN(rc); } EXPORT_SYMBOL(ptlrpc_queue_wait); @@ -2943,81 +3091,81 @@ EXPORT_SYMBOL(ptlrpc_queue_wait); */ static int ptlrpc_replay_interpret(const struct lu_env *env, struct ptlrpc_request *req, - void * data, int rc) + void *args, int rc) { - struct ptlrpc_replay_async_args *aa = data; + struct ptlrpc_replay_async_args *aa = args; struct obd_import *imp = req->rq_import; ENTRY; atomic_dec(&imp->imp_replay_inflight); - /* Note: if it is bulk replay (MDS-MDS replay), then even if + /* + * Note: if it is bulk replay (MDS-MDS replay), then even if * server got the request, but bulk transfer timeout, let's - * replay the bulk req again */ + * replay the bulk req again + */ if (!ptlrpc_client_replied(req) || - (req->rq_bulk != NULL && + (req->rq_bulk && lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) { - DEBUG_REQ(D_ERROR, req, "request replay timed out.\n"); + DEBUG_REQ(D_ERROR, req, "request replay timed out"); GOTO(out, rc = -ETIMEDOUT); } - if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR && - (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN || - lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) - GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg)); + if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR && + (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN || + lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) + GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg)); - /** VBR: check version failure */ - if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) { + /** VBR: check version failure */ + if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) { /** replay was failed due to version mismatch */ - DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n"); + DEBUG_REQ(D_WARNING, req, "Version mismatch during replay"); spin_lock(&imp->imp_lock); imp->imp_vbr_failed = 1; - imp->imp_no_lock_replay = 1; spin_unlock(&imp->imp_lock); lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status); - } else { - /** The transno had better not change over replay. */ - LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) == - lustre_msg_get_transno(req->rq_repmsg) || - lustre_msg_get_transno(req->rq_repmsg) == 0, + } else { + /** The transno had better not change over replay. */ + LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) == + lustre_msg_get_transno(req->rq_repmsg) || + lustre_msg_get_transno(req->rq_repmsg) == 0, "%#llx/%#llx\n", - lustre_msg_get_transno(req->rq_reqmsg), - lustre_msg_get_transno(req->rq_repmsg)); - } + lustre_msg_get_transno(req->rq_reqmsg), + lustre_msg_get_transno(req->rq_repmsg)); + } spin_lock(&imp->imp_lock); - /** if replays by version then gap occur on server, no trust to locks */ - if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY) - imp->imp_no_lock_replay = 1; imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg); spin_unlock(&imp->imp_lock); - LASSERT(imp->imp_last_replay_transno); - - /* transaction number shouldn't be bigger than the latest replayed */ - if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) { - DEBUG_REQ(D_ERROR, req, - "Reported transno %llu is bigger than the " - "replayed one: %llu", req->rq_transno, - lustre_msg_get_transno(req->rq_reqmsg)); - GOTO(out, rc = -EINVAL); - } - - DEBUG_REQ(D_HA, req, "got rep"); - - /* let the callback do fixups, possibly including in the request */ - if (req->rq_replay_cb) - req->rq_replay_cb(req); - - if (ptlrpc_client_replied(req) && - lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) { - DEBUG_REQ(D_ERROR, req, "status %d, old was %d", - lustre_msg_get_status(req->rq_repmsg), - aa->praa_old_status); - - /* Note: If the replay fails for MDT-MDT recovery, let's + LASSERT(imp->imp_last_replay_transno); + + /* transaction number shouldn't be bigger than the latest replayed */ + if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) { + DEBUG_REQ(D_ERROR, req, + "Reported transno=%llu is bigger than replayed=%llu", + req->rq_transno, + lustre_msg_get_transno(req->rq_reqmsg)); + GOTO(out, rc = -EINVAL); + } + + DEBUG_REQ(D_HA, req, "got reply"); + + /* let the callback do fixups, possibly including in the request */ + if (req->rq_replay_cb) + req->rq_replay_cb(req); + + if (ptlrpc_client_replied(req) && + lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) { + DEBUG_REQ(D_ERROR, req, "status %d, old was %d", + lustre_msg_get_status(req->rq_repmsg), + aa->praa_old_status); + + /* + * Note: If the replay fails for MDT-MDT recovery, let's * abort all of the following requests in the replay * and sending list, because MDT-MDT update requests - * are dependent on each other, see LU-7039 */ + * are dependent on each other, see LU-7039 + */ if (imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS) { struct ptlrpc_request *free_req; struct ptlrpc_request *tmp; @@ -3036,8 +3184,8 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, } list_for_each_entry_safe(free_req, tmp, - &imp->imp_delayed_list, - rq_list) { + &imp->imp_delayed_list, + rq_list) { spin_lock(&free_req->rq_lock); free_req->rq_err = 1; free_req->rq_status = -EIO; @@ -3046,8 +3194,8 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, } list_for_each_entry_safe(free_req, tmp, - &imp->imp_sending_list, - rq_list) { + &imp->imp_sending_list, + rq_list) { spin_lock(&free_req->rq_lock); free_req->rq_err = 1; free_req->rq_status = -EIO; @@ -3056,28 +3204,28 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, } spin_unlock(&imp->imp_lock); } - } else { - /* Put it back for re-replay. */ - lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status); - } - - /* - * Errors while replay can set transno to 0, but - * imp_last_replay_transno shouldn't be set to 0 anyway - */ - if (req->rq_transno == 0) - CERROR("Transno is 0 during replay!\n"); - - /* continue with recovery */ - rc = ptlrpc_import_recovery_state_machine(imp); + } else { + /* Put it back for re-replay. */ + lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status); + } + + /* + * Errors while replay can set transno to 0, but + * imp_last_replay_transno shouldn't be set to 0 anyway + */ + if (req->rq_transno == 0) + CERROR("Transno is 0 during replay!\n"); + + /* continue with recovery */ + rc = ptlrpc_import_recovery_state_machine(imp); out: - req->rq_send_state = aa->praa_old_state; + req->rq_send_state = aa->praa_old_state; - if (rc != 0) - /* this replay failed, so restart recovery */ - ptlrpc_connect_import(imp); + if (rc != 0) + /* this replay failed, so restart recovery */ + ptlrpc_connect_import(imp); - RETURN(rc); + RETURN(rc); } /** @@ -3093,33 +3241,31 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); memset(aa, 0, sizeof(*aa)); - /* Prepare request to be resent with ptlrpcd */ - aa->praa_old_state = req->rq_send_state; - req->rq_send_state = LUSTRE_IMP_REPLAY; - req->rq_phase = RQ_PHASE_NEW; - req->rq_next_phase = RQ_PHASE_UNDEFINED; - if (req->rq_repmsg) - aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg); - req->rq_status = 0; - req->rq_interpret_reply = ptlrpc_replay_interpret; - /* Readjust the timeout for current conditions */ - ptlrpc_at_set_req_timeout(req); - - /* Tell server the net_latency, so the server can calculate how long - * it should wait for next replay */ - lustre_msg_set_service_time(req->rq_reqmsg, - ptlrpc_at_get_net_latency(req)); - DEBUG_REQ(D_HA, req, "REPLAY"); + /* Prepare request to be resent with ptlrpcd */ + aa->praa_old_state = req->rq_send_state; + req->rq_send_state = LUSTRE_IMP_REPLAY; + req->rq_phase = RQ_PHASE_NEW; + req->rq_next_phase = RQ_PHASE_UNDEFINED; + if (req->rq_repmsg) + aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg); + req->rq_status = 0; + req->rq_interpret_reply = ptlrpc_replay_interpret; + /* Readjust the timeout for current conditions */ + ptlrpc_at_set_req_timeout(req); + + /* Tell server net_latency to calculate how long to wait for reply. */ + lustre_msg_set_service_timeout(req->rq_reqmsg, + ptlrpc_at_get_net_latency(req)); + DEBUG_REQ(D_HA, req, "REPLAY"); atomic_inc(&req->rq_import->imp_replay_inflight); spin_lock(&req->rq_lock); req->rq_early_free_repbuf = 0; spin_unlock(&req->rq_lock); - ptlrpc_request_addref(req); /* ptlrpcd needs a ref */ + ptlrpc_request_addref(req); /* ptlrpcd needs a ref */ ptlrpcd_add_req(req); RETURN(0); @@ -3130,25 +3276,23 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) */ void ptlrpc_abort_inflight(struct obd_import *imp) { - struct list_head *tmp, *n; + struct ptlrpc_request *req; ENTRY; - /* Make sure that no new requests get processed for this import. + /* + * Make sure that no new requests get processed for this import. * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing * this flag and then putting requests on sending_list or delayed_list. */ - spin_lock(&imp->imp_lock); + assert_spin_locked(&imp->imp_lock); - /* XXX locking? Maybe we should remove each request with the list + /* + * XXX locking? Maybe we should remove each request with the list * locked? Also, how do we know if the requests on the list are * being freed at this time? */ - list_for_each_safe(tmp, n, &imp->imp_sending_list) { - struct ptlrpc_request *req = list_entry(tmp, - struct ptlrpc_request, - rq_list); - - DEBUG_REQ(D_RPCTRACE, req, "inflight"); + list_for_each_entry(req, &imp->imp_sending_list, rq_list) { + DEBUG_REQ(D_RPCTRACE, req, "inflight"); spin_lock(&req->rq_lock); if (req->rq_import_generation < imp->imp_generation) { @@ -3159,10 +3303,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) spin_unlock(&req->rq_lock); } - list_for_each_safe(tmp, n, &imp->imp_delayed_list) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_list); - + list_for_each_entry(req, &imp->imp_delayed_list, rq_list) { DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req"); spin_lock(&req->rq_lock); @@ -3174,13 +3315,13 @@ void ptlrpc_abort_inflight(struct obd_import *imp) spin_unlock(&req->rq_lock); } - /* Last chance to free reqs left on the replay list, but we - * will still leak reqs that haven't committed. */ + /* + * Last chance to free reqs left on the replay list, but we + * will still leak reqs that haven't committed. + */ if (imp->imp_replayable) ptlrpc_free_committed(imp); - spin_unlock(&imp->imp_lock); - EXIT; } @@ -3189,15 +3330,11 @@ void ptlrpc_abort_inflight(struct obd_import *imp) */ void ptlrpc_abort_set(struct ptlrpc_request_set *set) { - struct list_head *tmp, *pos; + struct ptlrpc_request *req; LASSERT(set != NULL); - list_for_each_safe(pos, tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(pos, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { spin_lock(&req->rq_lock); if (req->rq_phase != RQ_PHASE_RPC) { spin_unlock(&req->rq_lock); @@ -3211,9 +3348,6 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set) } } -static __u64 ptlrpc_last_xid; -static spinlock_t ptlrpc_last_xid_lock; - /** * Initialize the XID for the node. This is common among all requests on * this node, and only requires the property that it is monotonically @@ -3233,19 +3367,21 @@ static spinlock_t ptlrpc_last_xid_lock; void ptlrpc_init_xid(void) { time64_t now = ktime_get_real_seconds(); + u64 xid; - spin_lock_init(&ptlrpc_last_xid_lock); if (now < YEAR_2004) { - cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid)); - ptlrpc_last_xid >>= 2; - ptlrpc_last_xid |= (1ULL << 61); + get_random_bytes(&xid, sizeof(xid)); + xid >>= 2; + xid |= (1ULL << 61); } else { - ptlrpc_last_xid = (__u64)now << 20; + xid = (u64)now << 20; } /* Need to always be aligned to a power-of-two for mutli-bulk BRW */ - CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0); - ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK; + BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) != + 0); + xid &= PTLRPC_BULK_OPS_MASK; + atomic64_set(&ptlrpc_last_xid, xid); } /** @@ -3262,14 +3398,7 @@ void ptlrpc_init_xid(void) */ __u64 ptlrpc_next_xid(void) { - __u64 next; - - spin_lock(&ptlrpc_last_xid_lock); - next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; - ptlrpc_last_xid = next; - spin_unlock(&ptlrpc_last_xid_lock); - - return next; + return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid); } /** @@ -3285,21 +3414,28 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) LASSERT(bd != NULL); - /* Generate new matchbits for all resend requests, including - * resend replay. */ + /* + * Generate new matchbits for all resend requests, including + * resend replay. + */ if (req->rq_resend) { __u64 old_mbits = req->rq_mbits; - /* First time resend on -EINPROGRESS will generate new xid, + /* + * First time resend on -EINPROGRESS will generate new xid, * so we can actually use the rq_xid as rq_mbits in such case, * however, it's bit hard to distinguish such resend with a * 'resend for the -EINPROGRESS resend'. To make it simple, - * we opt to generate mbits for all resend cases. */ - if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)){ + * we opt to generate mbits for all resend cases. + */ + if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, + BULK_MBITS)) { req->rq_mbits = ptlrpc_next_xid(); } else { - /* Old version transfers rq_xid to peer as - * matchbits. */ + /* + * Old version transfers rq_xid to peer as + * matchbits. + */ spin_lock(&req->rq_import->imp_lock); list_del_init(&req->rq_unreplied_list); ptlrpc_assign_next_xid_nolock(req); @@ -3310,24 +3446,34 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) old_mbits, req->rq_mbits); } else if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { /* Request being sent first time, use xid as matchbits. */ - req->rq_mbits = req->rq_xid; + if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS) + || req->rq_mbits == 0) { + req->rq_mbits = req->rq_xid; + } else { + req->rq_mbits -= bd->bd_md_count - 1; + } } else { - /* Replay request, xid and matchbits have already been - * correctly assigned. */ + /* + * Replay request, xid and matchbits have already been + * correctly assigned. + */ return; } - /* For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so + /* + * For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so * that server can infer the number of bulks that were prepared, - * see LU-1431 */ - req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) / - LNET_MAX_IOV) - 1; + * see LU-1431 + */ + req->rq_mbits += bd->bd_md_count - 1; - /* Set rq_xid as rq_mbits to indicate the final bulk for the old + /* + * Set rq_xid as rq_mbits to indicate the final bulk for the old * server which does not support OBD_CONNECT_BULK_MBITS. LU-6808. * * It's ok to directly set the rq_xid here, since this xid bump - * won't affect the request position in unreplied list. */ + * won't affect the request position in unreplied list. + */ if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)) req->rq_xid = req->rq_mbits; } @@ -3338,19 +3484,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) */ __u64 ptlrpc_sample_next_xid(void) { -#if BITS_PER_LONG == 32 - /* need to avoid possible word tearing on 32-bit systems */ - __u64 next; - - spin_lock(&ptlrpc_last_xid_lock); - next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; - spin_unlock(&ptlrpc_last_xid_lock); - - return next; -#else - /* No need to lock, since returned value is racy anyways */ - return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; -#endif + return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT; } EXPORT_SYMBOL(ptlrpc_sample_next_xid); @@ -3372,8 +3506,8 @@ EXPORT_SYMBOL(ptlrpc_sample_next_xid); * have delay before it really runs by ptlrpcd thread. */ struct ptlrpc_work_async_args { - int (*cb)(const struct lu_env *, void *); - void *cbdata; + int (*cb)(const struct lu_env *, void *); + void *cbdata; }; static void ptlrpcd_add_work_req(struct ptlrpc_request *req) @@ -3391,9 +3525,9 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req) } static int work_interpreter(const struct lu_env *env, - struct ptlrpc_request *req, void *data, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct ptlrpc_work_async_args *arg = data; + struct ptlrpc_work_async_args *arg = args; LASSERT(ptlrpcd_check_work(req)); LASSERT(arg->cb != NULL); @@ -3423,18 +3557,18 @@ static int ptlrpcd_check_work(struct ptlrpc_request *req) void *ptlrpcd_alloc_work(struct obd_import *imp, int (*cb)(const struct lu_env *, void *), void *cbdata) { - struct ptlrpc_request *req = NULL; + struct ptlrpc_request *req = NULL; struct ptlrpc_work_async_args *args; - ENTRY; + ENTRY; might_sleep(); - if (cb == NULL) + if (!cb) RETURN(ERR_PTR(-EINVAL)); /* copy some code from deprecated fakereq. */ req = ptlrpc_request_cache_alloc(GFP_NOFS); - if (req == NULL) { + if (!req) { CERROR("ptlrpc: run out of memory!\n"); RETURN(ERR_PTR(-ENOMEM)); } @@ -3449,8 +3583,7 @@ void *ptlrpcd_alloc_work(struct obd_import *imp, req->rq_no_delay = req->rq_no_resend = 1; req->rq_pill.rc_fmt = (void *)&worker_format; - CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args)); - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->cb = cb; args->cbdata = cbdata; @@ -3460,10 +3593,10 @@ EXPORT_SYMBOL(ptlrpcd_alloc_work); void ptlrpcd_destroy_work(void *handler) { - struct ptlrpc_request *req = handler; + struct ptlrpc_request *req = handler; - if (req) - ptlrpc_req_finished(req); + if (req) + ptlrpc_req_finished(req); } EXPORT_SYMBOL(ptlrpcd_destroy_work); @@ -3471,14 +3604,14 @@ int ptlrpcd_queue_work(void *handler) { struct ptlrpc_request *req = handler; - /* - * Check if the req is already being queued. - * - * Here comes a trick: it lacks a way of checking if a req is being - * processed reliably in ptlrpc. Here I have to use refcount of req - * for this purpose. This is okay because the caller should use this - * req as opaque data. - Jinshan - */ + /* + * Check if the req is already being queued. + * + * Here comes a trick: it lacks a way of checking if a req is being + * processed reliably in ptlrpc. Here I have to use refcount of req + * for this purpose. This is okay because the caller should use this + * req as opaque data. - Jinshan + */ LASSERT(atomic_read(&req->rq_refcount) > 0); if (atomic_inc_return(&req->rq_refcount) == 2) ptlrpcd_add_work_req(req);