X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=aacfb39379f800ebddf25320439a25844100cb68;hb=7462e8cad730897f459da31886c57585654f26b8;hp=378916256a54f98c88a653702ab8a9dd5e159f97;hpb=fb18c05c0f5ee50bbd782334744028c912658c70;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 3789162..aacfb39 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -37,6 +37,7 @@ #include #include +#include #include #include #include @@ -65,7 +66,29 @@ static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc) int i; for (i = 0; i < desc->bd_iov_count ; i++) - put_page(BD_GET_KIOV(desc, i).kiov_page); + put_page(desc->bd_vec[i].bv_page); +} + +static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc, + void *frag, int len) +{ + unsigned int offset = (unsigned long)frag & ~PAGE_MASK; + + ENTRY; + while (len > 0) { + int page_len = min_t(unsigned int, PAGE_SIZE - offset, + len); + unsigned long vaddr = (unsigned long)frag; + + ptlrpc_prep_bulk_page_nopin(desc, + lnet_kvaddr_to_page(vaddr), + offset, page_len); + offset = 0; + len -= page_len; + frag += page_len; + } + + RETURN(desc->bd_nob); } const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = { @@ -77,14 +100,10 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops); const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = { .add_kiov_frag = ptlrpc_prep_bulk_page_nopin, .release_frags = ptlrpc_release_bulk_noop, + .add_iov_frag = ptlrpc_prep_bulk_frag_pages, }; EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops); -const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops = { - .add_iov_frag = ptlrpc_prep_bulk_frag, -}; -EXPORT_SYMBOL(ptlrpc_bulk_kvec_ops); - static int ptlrpc_send_new_req(struct ptlrpc_request *req); static int ptlrpcd_check_work(struct ptlrpc_request *req); static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async); @@ -148,26 +167,22 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags, struct ptlrpc_bulk_desc *desc; int i; - /* ensure that only one of KIOV or IOVEC is set but not both */ - LASSERT((ptlrpc_is_bulk_desc_kiov(type) && - ops->add_kiov_frag != NULL) || - (ptlrpc_is_bulk_desc_kvec(type) && - ops->add_iov_frag != NULL)); + LASSERT(ops->add_kiov_frag != NULL); + + if (max_brw > PTLRPC_BULK_OPS_COUNT) + RETURN(NULL); + + if (nfrags > LNET_MAX_IOV * max_brw) + RETURN(NULL); OBD_ALLOC_PTR(desc); if (!desc) return NULL; - if (type & PTLRPC_BULK_BUF_KIOV) { - OBD_ALLOC_LARGE(GET_KIOV(desc), - nfrags * sizeof(*GET_KIOV(desc))); - if (!GET_KIOV(desc)) - goto out; - } else { - OBD_ALLOC_LARGE(GET_KVEC(desc), - nfrags * sizeof(*GET_KVEC(desc))); - if (!GET_KVEC(desc)) - goto out; - } + + OBD_ALLOC_LARGE(desc->bd_vec, + nfrags * sizeof(*desc->bd_vec)); + if (!desc->bd_vec) + goto out; spin_lock_init(&desc->bd_lock); init_waitqueue_head(&desc->bd_waitq); @@ -176,6 +191,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags, desc->bd_portal = portal; desc->bd_type = type; desc->bd_md_count = 0; + desc->bd_nob_last = LNET_MTU; desc->bd_frag_ops = ops; LASSERT(max_brw > 0); desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT); @@ -217,7 +233,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, if (!desc) RETURN(NULL); - desc->bd_import_generation = req->rq_import_generation; desc->bd_import = class_import_get(imp); desc->bd_req = req; @@ -235,67 +250,49 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page, int pageoffset, int len, int pin) { - lnet_kiov_t *kiov; + struct bio_vec *kiov; LASSERT(desc->bd_iov_count < desc->bd_max_iov); LASSERT(page != NULL); LASSERT(pageoffset >= 0); LASSERT(len > 0); LASSERT(pageoffset + len <= PAGE_SIZE); - LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type)); - kiov = &BD_GET_KIOV(desc, desc->bd_iov_count); + kiov = &desc->bd_vec[desc->bd_iov_count]; + + if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) || + ((desc->bd_nob_last + len) > LNET_MTU)) { + desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count; + desc->bd_md_count++; + desc->bd_nob_last = 0; + LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT); + } + desc->bd_nob_last += len; desc->bd_nob += len; if (pin) get_page(page); - kiov->kiov_page = page; - kiov->kiov_offset = pageoffset; - kiov->kiov_len = len; + kiov->bv_page = page; + kiov->bv_offset = pageoffset; + kiov->bv_len = len; desc->bd_iov_count++; } EXPORT_SYMBOL(__ptlrpc_prep_bulk_page); -int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc, - void *frag, int len) -{ - struct kvec *iovec; - - ENTRY; - - LASSERT(desc->bd_iov_count < desc->bd_max_iov); - LASSERT(frag != NULL); - LASSERT(len > 0); - LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type)); - - iovec = &BD_GET_KVEC(desc, desc->bd_iov_count); - - desc->bd_nob += len; - - iovec->iov_base = frag; - iovec->iov_len = len; - - desc->bd_iov_count++; - - RETURN(desc->bd_nob); -} -EXPORT_SYMBOL(ptlrpc_prep_bulk_frag); - void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) { ENTRY; LASSERT(desc != NULL); LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */ - LASSERT(desc->bd_md_count == 0); /* network hands off */ + LASSERT(desc->bd_refs == 0); /* network hands off */ LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL)); LASSERT(desc->bd_frag_ops != NULL); - if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) - sptlrpc_enc_pool_put_pages(desc); + sptlrpc_enc_pool_put_pages(desc); if (desc->bd_export) class_export_put(desc->bd_export); @@ -305,12 +302,8 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) if (desc->bd_frag_ops->release_frags != NULL) desc->bd_frag_ops->release_frags(desc); - if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) - OBD_FREE_LARGE(GET_KIOV(desc), - desc->bd_max_iov * sizeof(*GET_KIOV(desc))); - else - OBD_FREE_LARGE(GET_KVEC(desc), - desc->bd_max_iov * sizeof(*GET_KVEC(desc))); + OBD_FREE_LARGE(desc->bd_vec, + desc->bd_max_iov * sizeof(*desc->bd_vec)); OBD_FREE_PTR(desc); EXIT; } @@ -322,10 +315,6 @@ EXPORT_SYMBOL(ptlrpc_free_bulk); */ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req) { - __u32 serv_est; - int idx; - struct imp_at *at; - LASSERT(req->rq_import); if (AT_OFF) { @@ -340,18 +329,25 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req) req->rq_timeout = req->rq_import->imp_server_timeout ? obd_timeout / 2 : obd_timeout; } else { - at = &req->rq_import->imp_at; + struct imp_at *at = &req->rq_import->imp_at; + timeout_t serv_est; + int idx; + idx = import_at_get_index(req->rq_import, req->rq_request_portal); serv_est = at_get(&at->iat_service_estimate[idx]); + /* + * Currently a 32 bit value is sent over the + * wire for rq_timeout so please don't change this + * to time64_t. The work for LU-1158 will in time + * replace rq_timeout with a 64 bit nanosecond value + */ req->rq_timeout = at_est2timeout(serv_est); } /* * We could get even fancier here, using history to predict increased * loading... - */ - - /* + * * Let the server know what this RPC timeout is by putting it in the * reqmsg */ @@ -361,10 +357,10 @@ EXPORT_SYMBOL(ptlrpc_at_set_req_timeout); /* Adjust max service estimate based on server value */ static void ptlrpc_at_adj_service(struct ptlrpc_request *req, - unsigned int serv_est) + timeout_t serv_est) { int idx; - unsigned int oldse; + timeout_t oldse; struct imp_at *at; LASSERT(req->rq_import); @@ -392,15 +388,16 @@ int ptlrpc_at_get_net_latency(struct ptlrpc_request *req) /* Adjust expected network latency */ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, - unsigned int service_time) + timeout_t service_timeout) { - unsigned int nl, oldnl; - struct imp_at *at; time64_t now = ktime_get_real_seconds(); + struct imp_at *at; + timeout_t oldnl; + timeout_t nl; LASSERT(req->rq_import); - if (service_time > now - req->rq_sent + 3) { + if (service_timeout > now - req->rq_sent + 3) { /* * b=16408, however, this can also happen if early reply * is lost and client RPC is expired and resent, early reply @@ -409,16 +406,17 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, * resent time, but server sent back service time of original * RPC. */ - CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ? - D_ADAPTTO : D_WARNING, - "Reported service time %u > total measured time %lld\n", - service_time, now - req->rq_sent); + CDEBUG_LIMIT((lustre_msg_get_flags(req->rq_reqmsg) & + MSG_RESENT) ? D_ADAPTTO : D_WARNING, + "Reported service time %u > total measured time %lld\n", + service_timeout, now - req->rq_sent); return; } - /* Network latency is total time less server processing time */ - nl = max_t(int, now - req->rq_sent - - service_time, 0) + 1; /* st rounding */ + /* Network latency is total time less server processing time, + * st rounding + */ + nl = max_t(timeout_t, now - req->rq_sent - service_timeout, 0) + 1; at = &req->rq_import->imp_at; oldnl = at_measured(&at->iat_net_latency, nl); @@ -437,14 +435,16 @@ static int unpack_reply(struct ptlrpc_request *req) if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) { rc = ptlrpc_unpack_rep_msg(req, req->rq_replen); if (rc) { - DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc); + DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d", + rc); return -EPROTO; } } rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF); if (rc) { - DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc); + DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d", + rc); return -EPROTO; } return 0; @@ -458,6 +458,7 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) __must_hold(&req->rq_lock) { struct ptlrpc_request *early_req; + timeout_t service_timeout; time64_t olddl; int rc; @@ -489,8 +490,8 @@ __must_hold(&req->rq_lock) lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); /* Network latency can be adjusted, it is pure network delays */ - ptlrpc_at_adj_net_latency(req, - lustre_msg_get_service_time(early_req->rq_repmsg)); + service_timeout = lustre_msg_get_service_timeout(early_req->rq_repmsg); + ptlrpc_at_adj_net_latency(req, service_timeout); sptlrpc_cli_finish_early_reply(early_req); @@ -505,6 +506,8 @@ __must_hold(&req->rq_lock) req->rq_deadline = req->rq_sent + req->rq_timeout + ptlrpc_at_get_net_latency(req); + /* The below message is checked in replay-single.sh test_65{a,b} */ + /* The below message is checked in sanity-{gss,krb5} test_8 */ DEBUG_REQ(D_ADAPTTO, req, "Early reply #%d, new deadline in %llds (%llds)", req->rq_early_count, @@ -548,14 +551,14 @@ void ptlrpc_request_cache_free(struct ptlrpc_request *req) */ void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool) { - struct list_head *l, *tmp; struct ptlrpc_request *req; LASSERT(pool != NULL); spin_lock(&pool->prp_lock); - list_for_each_safe(l, tmp, &pool->prp_req_list) { - req = list_entry(l, struct ptlrpc_request, rq_list); + while ((req = list_first_entry_or_null(&pool->prp_req_list, + struct ptlrpc_request, + rq_list))) { list_del(&req->rq_list); LASSERT(req->rq_reqbuf); LASSERT(req->rq_reqbuf_len == pool->prp_rq_size); @@ -621,7 +624,7 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize, { struct ptlrpc_request_pool *pool; - OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool)); + OBD_ALLOC_PTR(pool); if (!pool) return NULL; @@ -699,17 +702,14 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request) void ptlrpc_add_unreplied(struct ptlrpc_request *req) { struct obd_import *imp = req->rq_import; - struct list_head *tmp; struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); LASSERT(list_empty(&req->rq_unreplied_list)); /* unreplied list is sorted by xid in ascending order */ - list_for_each_prev(tmp, &imp->imp_unreplied_list) { - iter = list_entry(tmp, struct ptlrpc_request, - rq_unreplied_list); - + list_for_each_entry_reverse(iter, &imp->imp_unreplied_list, + rq_unreplied_list) { LASSERT(req->rq_xid != iter->rq_xid); if (req->rq_xid < iter->rq_xid) continue; @@ -734,6 +734,41 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req) static atomic64_t ptlrpc_last_xid; +static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) +{ + spin_lock(&req->rq_import->imp_lock); + list_del_init(&req->rq_unreplied_list); + ptlrpc_assign_next_xid_nolock(req); + spin_unlock(&req->rq_import->imp_lock); + DEBUG_REQ(D_RPCTRACE, req, "reassign xid"); +} + +void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc; + __u16 tag; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + tag = obd_get_mod_rpc_slot(cli, opc); + lustre_msg_set_tag(req->rq_reqmsg, tag); + ptlrpc_reassign_next_xid(req); +} +EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot); + +void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req) +{ + __u16 tag = lustre_msg_get_tag(req->rq_reqmsg); + + if (tag != 0) { + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + + obd_put_mod_rpc_slot(cli, opc, tag); + } +} +EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot); + int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, __u32 version, int opcode, char **bufs, struct ptlrpc_cli_ctx *ctx) @@ -807,11 +842,12 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, } if (fail_t) { - *fail_t = ktime_get_real_seconds() + LONG_UNLINK; + *fail_t = ktime_get_real_seconds() + + PTLRPC_REQ_LONG_UNLINK; if (fail2_t) *fail2_t = ktime_get_real_seconds() + - LONG_UNLINK; + PTLRPC_REQ_LONG_UNLINK; /* * The RPC is infected, let the test to change the @@ -828,6 +864,7 @@ out_ctx: LASSERT(!request->rq_pool); sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1); out_free: + atomic_dec(&imp->imp_reqs); class_import_put(imp); return rc; @@ -865,13 +902,14 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, if (request) { ptlrpc_cli_req_init(request); - LASSERTF((unsigned long)imp > 0x1000, "%p", imp); + LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp); LASSERT(imp != LP_POISON); LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n", imp->imp_client); LASSERT(imp->imp_client != LP_POISON); request->rq_import = class_import_get(imp); + atomic_inc(&imp->imp_reqs); } else { CERROR("request allocation out of memory\n"); } @@ -879,6 +917,33 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, return request; } +static int ptlrpc_reconnect_if_idle(struct obd_import *imp) +{ + int rc; + + /* + * initiate connection if needed when the import has been + * referenced by the new request to avoid races with disconnect. + * serialize this check against conditional state=IDLE + * in ptlrpc_disconnect_idle_interpret() + */ + spin_lock(&imp->imp_lock); + if (imp->imp_state == LUSTRE_IMP_IDLE) { + imp->imp_generation++; + imp->imp_initiated_at = imp->imp_generation; + imp->imp_state = LUSTRE_IMP_NEW; + + /* connect_import_locked releases imp_lock */ + rc = ptlrpc_connect_import_locked(imp); + if (rc) + return rc; + ptlrpc_pinger_add_import(imp); + } else { + spin_unlock(&imp->imp_lock); + } + return 0; +} + /** * Helper function for creating a request. * Calls __ptlrpc_request_alloc to allocate new request sturcture and inits @@ -891,38 +956,18 @@ ptlrpc_request_alloc_internal(struct obd_import *imp, const struct req_format *format) { struct ptlrpc_request *request; - int connect = 0; request = __ptlrpc_request_alloc(imp, pool); if (!request) return NULL; - /* - * initiate connection if needed when the import has been - * referenced by the new request to avoid races with disconnect - */ - if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) { - int rc; - - CDEBUG_LIMIT(imp->imp_idle_debug, - "%s: reconnect after %llds idle\n", - imp->imp_obd->obd_name, ktime_get_real_seconds() - - imp->imp_last_reply_time); - spin_lock(&imp->imp_lock); - if (imp->imp_state == LUSTRE_IMP_IDLE) { - imp->imp_generation++; - imp->imp_initiated_at = imp->imp_generation; - imp->imp_state = LUSTRE_IMP_NEW; - connect = 1; - } - spin_unlock(&imp->imp_lock); - if (connect) { - rc = ptlrpc_connect_import(imp); - if (rc < 0) { - ptlrpc_request_free(request); - return NULL; - } - ptlrpc_pinger_add_import(imp); + /* don't make expensive check for idling connection + * if it's already connected */ + if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) { + if (ptlrpc_reconnect_if_idle(imp) < 0) { + atomic_dec(&imp->imp_reqs); + ptlrpc_request_free(request); + return NULL; } } @@ -1003,8 +1048,8 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void) int cpt; ENTRY; - cpt = cfs_cpt_current(cfs_cpt_table, 0); - OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof(*set)); + cpt = cfs_cpt_current(cfs_cpt_tab, 0); + OBD_CPT_ALLOC(set, cfs_cpt_tab, cpt, sizeof(*set)); if (!set) RETURN(NULL); atomic_set(&set->set_refcount, 1); @@ -1058,8 +1103,7 @@ struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func, */ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) { - struct list_head *tmp; - struct list_head *next; + struct ptlrpc_request *req; int expected_phase; int n = 0; @@ -1068,11 +1112,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) /* Requests on the set should either all be completed, or all be new */ expected_phase = (atomic_read(&set->set_remaining) == 0) ? RQ_PHASE_COMPLETE : RQ_PHASE_NEW; - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { LASSERT(req->rq_phase == expected_phase); n++; } @@ -1081,10 +1121,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) atomic_read(&set->set_remaining) == n, "%d / %d\n", atomic_read(&set->set_remaining), n); - list_for_each_safe(tmp, next, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + while ((req = list_first_entry_or_null(&set->set_requests, + struct ptlrpc_request, + rq_set_chain))) { list_del_init(&req->rq_set_chain); LASSERT(req->rq_phase == expected_phase); @@ -1116,6 +1155,11 @@ EXPORT_SYMBOL(ptlrpc_set_destroy); void ptlrpc_set_add_req(struct ptlrpc_request_set *set, struct ptlrpc_request *req) { + if (set == PTLRPCD_SET) { + ptlrpcd_add_req(req); + return; + } + LASSERT(req->rq_import->imp_state != LUSTRE_IMP_IDLE); LASSERT(list_empty(&req->rq_set_chain)); @@ -1200,7 +1244,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, if (req->rq_ctx_init || req->rq_ctx_fini) { /* always allow ctx init/fini rpc go through */ } else if (imp->imp_state == LUSTRE_IMP_NEW) { - DEBUG_REQ(D_ERROR, req, "Uninitialized import."); + DEBUG_REQ(D_ERROR, req, "Uninitialized import"); *status = -EIO; } else if (imp->imp_state == LUSTRE_IMP_CLOSED) { unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg); @@ -1210,11 +1254,11 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, * race with umount */ DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ? - D_HA : D_ERROR, req, "IMP_CLOSED "); + D_HA : D_ERROR, req, "IMP_CLOSED"); *status = -EIO; } else if (ptlrpc_send_limit_expired(req)) { /* probably doesn't need to be a D_ERROR afterinitial testing */ - DEBUG_REQ(D_HA, req, "send limit expired "); + DEBUG_REQ(D_HA, req, "send limit expired"); *status = -ETIMEDOUT; } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING && imp->imp_state == LUSTRE_IMP_CONNECTING) { @@ -1238,13 +1282,13 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, } else if (req->rq_no_delay && imp->imp_generation != imp->imp_initiated_at) { /* ignore nodelay for requests initiating connections */ - *status = -EWOULDBLOCK; + *status = -EAGAIN; } else if (req->rq_allow_replay && (imp->imp_state == LUSTRE_IMP_REPLAY || imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS || imp->imp_state == LUSTRE_IMP_REPLAY_WAIT || imp->imp_state == LUSTRE_IMP_RECOVER)) { - DEBUG_REQ(D_HA, req, "allow during recovery.\n"); + DEBUG_REQ(D_HA, req, "allow during recovery"); } else { delay = 1; } @@ -1301,32 +1345,28 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err) */ static int ptlrpc_check_status(struct ptlrpc_request *req) { - int err; + int rc; ENTRY; - err = lustre_msg_get_status(req->rq_repmsg); + rc = lustre_msg_get_status(req->rq_repmsg); if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { struct obd_import *imp = req->rq_import; lnet_nid_t nid = imp->imp_connection->c_peer.nid; __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); - if (ptlrpc_console_allow(req, opc, err)) + if (ptlrpc_console_allow(req, opc, rc)) LCONSOLE_ERROR_MSG(0x11, "%s: operation %s to node %s failed: rc = %d\n", imp->imp_obd->obd_name, ll_opcode2str(opc), - libcfs_nid2str(nid), err); - RETURN(err < 0 ? err : -EINVAL); + libcfs_nid2str(nid), rc); + RETURN(rc < 0 ? rc : -EINVAL); } - if (err < 0) { - DEBUG_REQ(D_INFO, req, "status is %d", err); - } else if (err > 0) { - /* XXX: translate this error from net to host */ - DEBUG_REQ(D_INFO, req, "status is %d", err); - } + if (rc) + DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc); - RETURN(err); + RETURN(rc); } /** @@ -1394,7 +1434,7 @@ static int after_reply(struct ptlrpc_request *req) if (req->rq_reply_truncated) { if (ptlrpc_no_resend(req)) { DEBUG_REQ(D_ERROR, req, - "reply buffer overflow, expected: %d, actual size: %d", + "reply buffer overflow, expected=%d, actual size=%d", req->rq_nob_received, req->rq_repbuf_len); RETURN(-EOVERFLOW); } @@ -1423,7 +1463,7 @@ static int after_reply(struct ptlrpc_request *req) */ rc = sptlrpc_cli_unwrap_reply(req); if (rc) { - DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc); + DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc); RETURN(rc); } @@ -1442,8 +1482,8 @@ static int after_reply(struct ptlrpc_request *req) ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) { time64_t now = ktime_get_real_seconds(); - DEBUG_REQ(req->rq_nr_resend > 0 ? D_ERROR : D_RPCTRACE, req, - "Resending request on EINPROGRESS"); + DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) | + D_RPCTRACE, req, "resending request on EINPROGRESS"); spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); @@ -1487,7 +1527,7 @@ static int after_reply(struct ptlrpc_request *req) CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val); ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg)); ptlrpc_at_adj_net_latency(req, - lustre_msg_get_service_time(req->rq_repmsg)); + lustre_msg_get_service_timeout(req->rq_repmsg)); rc = ptlrpc_check_status(req); @@ -1655,9 +1695,24 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) lustre_msg_set_last_xid(req->rq_reqmsg, min_xid); - lustre_msg_set_status(req->rq_reqmsg, current_pid()); - - rc = sptlrpc_req_refresh_ctx(req, -1); + lustre_msg_set_status(req->rq_reqmsg, current->pid); + + /* If the request to be sent is an LDLM callback, do not try to + * refresh context. + * An LDLM callback is sent by a server to a client in order to make + * it release a lock, on a communication channel that uses a reverse + * context. It cannot be refreshed on its own, as it is the 'reverse' + * (server-side) representation of a client context. + * We do not care if the reverse context is expired, and want to send + * the LDLM callback anyway. Once the client receives the AST, it is + * its job to refresh its own context if it has expired, hence + * refreshing the associated reverse context on server side, before + * being able to send the LDLM_CANCEL requested by the server. + */ + if (lustre_msg_get_opc(req->rq_reqmsg) != LDLM_BL_CALLBACK && + lustre_msg_get_opc(req->rq_reqmsg) != LDLM_CP_CALLBACK && + lustre_msg_get_opc(req->rq_reqmsg) != LDLM_GL_CALLBACK) + rc = sptlrpc_req_refresh_ctx(req, 0); if (rc) { if (req->rq_err) { req->rq_status = rc; @@ -1672,7 +1727,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) CDEBUG(D_RPCTRACE, "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n", - req, current_comm(), + req, current->comm, imp->imp_obd->obd_uuid.uuid, lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg), @@ -1683,14 +1738,16 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) spin_lock(&imp->imp_lock); if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); - atomic_dec(&req->rq_import->imp_inflight); + if (atomic_dec_and_test(&req->rq_import->imp_inflight)) + wake_up(&req->rq_import->imp_recovery_waitq); } spin_unlock(&imp->imp_lock); ptlrpc_rqphase_move(req, RQ_PHASE_NEW); RETURN(rc); } if (rc) { - DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc); + DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d", + rc); spin_lock(&req->rq_lock); req->rq_net_err = 1; spin_unlock(&req->rq_lock); @@ -1735,19 +1792,16 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set) */ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp, *next; - struct list_head comp_reqs; + struct ptlrpc_request *req, *next; + LIST_HEAD(comp_reqs); int force_timer_recalc = 0; ENTRY; if (atomic_read(&set->set_remaining) == 0) RETURN(1); - INIT_LIST_HEAD(&comp_reqs); - list_for_each_safe(tmp, next, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry_safe(req, next, &set->set_requests, + rq_set_chain) { struct obd_import *imp = req->rq_import; int unregistered = 0; int async = 1; @@ -1830,7 +1884,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) * not corrupt any data. */ if (req->rq_phase == RQ_PHASE_UNREG_RPC && - ptlrpc_client_recv_or_unlink(req)) + ptlrpc_cli_wait_unlink(req)) continue; if (req->rq_phase == RQ_PHASE_UNREG_BULK && ptlrpc_client_bulk_active(req)) @@ -1868,7 +1922,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) /* * Check if we still need to wait for unlink. */ - if (ptlrpc_client_recv_or_unlink(req) || + if (ptlrpc_cli_wait_unlink(req) || ptlrpc_client_bulk_active(req)) continue; /* If there is no need to resend, fail it now. */ @@ -1893,7 +1947,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) } /* - * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr + * ptlrpc_set_wait uses l_wait_event_abortable_timeout() * so it sets rq_intr regardless of individual rpc * timeouts. The synchronous IO waiting path sets * rq_intr irrespective of whether ptlrpcd @@ -1951,6 +2005,27 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) GOTO(interpret, req->rq_status); } + /* don't resend too fast in case of network + * errors. + */ + if (ktime_get_real_seconds() < (req->rq_sent + 1) + && req->rq_net_err && req->rq_timedout) { + + DEBUG_REQ(D_INFO, req, + "throttle request"); + /* Don't try to resend RPC right away + * as it is likely it will fail again + * and ptlrpc_check_set() will be + * called again, keeping this thread + * busy. Instead, wait for the next + * timeout. Flag it as resend to + * ensure we don't wait to long. + */ + req->rq_resend = 1; + spin_unlock(&imp->imp_lock); + continue; + } + list_move_tail(&req->rq_list, &imp->imp_sending_list); @@ -1973,7 +2048,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) * rq_wait_ctx is only touched by ptlrpcd, * so no lock is needed here. */ - status = sptlrpc_req_refresh_ctx(req, -1); + status = sptlrpc_req_refresh_ctx(req, 0); if (status) { if (req->rq_err) { req->rq_status = status; @@ -2122,7 +2197,7 @@ interpret: if (req->rq_reqmsg) CDEBUG(D_RPCTRACE, "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n", - req, current_comm(), + req, current->comm, imp->imp_obd->obd_uuid.uuid, lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, @@ -2139,13 +2214,14 @@ interpret: */ if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); - atomic_dec(&imp->imp_inflight); + if (atomic_dec_and_test(&imp->imp_inflight)) + wake_up(&imp->imp_recovery_waitq); } list_del_init(&req->rq_unreplied_list); spin_unlock(&imp->imp_lock); atomic_dec(&set->set_remaining); - wake_up_all(&imp->imp_recovery_waitq); + wake_up(&imp->imp_recovery_waitq); if (set->set_producer) { /* produce a new request if possible */ @@ -2207,7 +2283,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) req->rq_real_sent < req->rq_sent || req->rq_real_sent >= req->rq_deadline) ? "timed out for sent delay" : "timed out for slow reply"), - (s64)req->rq_sent, (s64)req->rq_real_sent); + req->rq_sent, req->rq_real_sent); if (imp && obd_debug_peer_on_timeout) LNetDebugPeer(imp->imp_connection->c_peer); @@ -2262,13 +2338,11 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) /** * Time out all uncompleted requests in request set pointed by \a data - * Callback used when waiting on sets with l_wait_event. - * Always returns 1. + * This is called when a wait times out. */ -int ptlrpc_expired_set(void *data) +void ptlrpc_expired_set(struct ptlrpc_request_set *set) { - struct ptlrpc_request_set *set = data; - struct list_head *tmp; + struct ptlrpc_request *req; time64_t now = ktime_get_real_seconds(); ENTRY; @@ -2277,11 +2351,7 @@ int ptlrpc_expired_set(void *data) /* * A timeout expired. See which reqs it applies to... */ - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { /* don't expire request waiting for context */ if (req->rq_wait_ctx) continue; @@ -2301,43 +2371,28 @@ int ptlrpc_expired_set(void *data) * ptlrpcd thread. */ ptlrpc_expire_one_request(req, 1); - } - - /* - * When waiting for a whole set, we always break out of the - * sleep so we can recalculate the timeout, or enable interrupts - * if everyone's timed out. - */ - RETURN(1); -} + /* + * Loops require that we resched once in a while to avoid + * RCU stalls and a few other problems. + */ + cond_resched(); -/** - * Sets rq_intr flag in \a req under spinlock. - */ -void ptlrpc_mark_interrupted(struct ptlrpc_request *req) -{ - spin_lock(&req->rq_lock); - req->rq_intr = 1; - spin_unlock(&req->rq_lock); + } } -EXPORT_SYMBOL(ptlrpc_mark_interrupted); /** * Interrupts (sets interrupted flag) all uncompleted requests in - * a set \a data. Callback for l_wait_event for interruptible waits. + * a set \a data. This is called when a wait_event is interrupted + * by a signal. */ -static void ptlrpc_interrupted_set(void *data) +static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set) { - struct ptlrpc_request_set *set = data; - struct list_head *tmp; + struct ptlrpc_request *req; LASSERT(set != NULL); CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set); - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { if (req->rq_intr) continue; @@ -2346,7 +2401,9 @@ static void ptlrpc_interrupted_set(void *data) !req->rq_allow_intr) continue; - ptlrpc_mark_interrupted(req); + spin_lock(&req->rq_lock); + req->rq_intr = 1; + spin_unlock(&req->rq_lock); } } @@ -2355,16 +2412,13 @@ static void ptlrpc_interrupted_set(void *data) */ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) { - struct list_head *tmp; time64_t now = ktime_get_real_seconds(); int timeout = 0; struct ptlrpc_request *req; time64_t deadline; ENTRY; - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { /* Request in-flight? */ if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) || (req->rq_phase == RQ_PHASE_BULK) || @@ -2402,9 +2456,7 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) */ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp; struct ptlrpc_request *req; - struct l_wait_info lwi; time64_t timeout; int rc; @@ -2412,9 +2464,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) if (set->set_producer) (void)ptlrpc_set_producer(set); else - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &set->set_requests, rq_set_chain) { if (req->rq_phase == RQ_PHASE_NEW) (void)ptlrpc_send_new_req(req); } @@ -2433,49 +2483,67 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) set, timeout); if ((timeout == 0 && !signal_pending(current)) || - set->set_allow_intr) + set->set_allow_intr) { /* * No requests are in-flight (ether timed out * or delayed), so we can allow interrupts. * We still want to block for a limited time, * so we allow interrupts during the timeout. */ - lwi = LWI_TIMEOUT_INTR_ALL( - cfs_time_seconds(timeout ? timeout : 1), - ptlrpc_expired_set, - ptlrpc_interrupted_set, set); - else + rc = l_wait_event_abortable_timeout( + set->set_waitq, + ptlrpc_check_set(NULL, set), + cfs_time_seconds(timeout ? timeout : 1)); + if (rc == 0) { + rc = -ETIMEDOUT; + ptlrpc_expired_set(set); + } else if (rc < 0) { + rc = -EINTR; + ptlrpc_interrupted_set(set); + } else { + rc = 0; + } + } else { /* * At least one request is in flight, so no * interrupts are allowed. Wait until all * complete, or an in-flight req times out. */ - lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), - ptlrpc_expired_set, set); - - rc = l_wait_event(set->set_waitq, - ptlrpc_check_set(NULL, set), &lwi); - - /* - * LU-769 - if we ignored the signal because it was already - * pending when we started, we need to handle it now or we risk - * it being ignored forever - */ - if (rc == -ETIMEDOUT && - (!lwi.lwi_allow_intr || set->set_allow_intr) && - signal_pending(current)) { - sigset_t blocked_sigs = - cfs_block_sigsinv(LUSTRE_FATAL_SIGS); + rc = wait_event_idle_timeout( + set->set_waitq, + ptlrpc_check_set(NULL, set), + cfs_time_seconds(timeout ? timeout : 1)); + if (rc == 0) { + ptlrpc_expired_set(set); + rc = -ETIMEDOUT; + } else { + rc = 0; + } /* - * In fact we only interrupt for the "fatal" signals - * like SIGINT or SIGKILL. We still ignore less - * important signals since ptlrpc set is not easily - * reentrant from userspace again + * LU-769 - if we ignored the signal because + * it was already pending when we started, we + * need to handle it now or we risk it being + * ignored forever */ - if (signal_pending(current)) - ptlrpc_interrupted_set(set); - cfs_restore_sigs(blocked_sigs); + if (rc == -ETIMEDOUT && + signal_pending(current)) { + sigset_t old, new; + + siginitset(&new, LUSTRE_FATAL_SIGS); + sigprocmask(SIG_BLOCK, &new, &old); + /* + * In fact we only interrupt for the + * "fatal" signals like SIGINT or + * SIGKILL. We still ignore less + * important signals since ptlrpc set + * is not easily reentrant from + * userspace again + */ + if (signal_pending(current)) + ptlrpc_interrupted_set(set); + sigprocmask(SIG_SETMASK, &old, NULL); + } } LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT); @@ -2490,9 +2558,8 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) * the error cases -eeb. */ if (rc == 0 && atomic_read(&set->set_remaining) == 0) { - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &set->set_requests, + rq_set_chain) { spin_lock(&req->rq_lock); req->rq_invalid_rqset = 1; spin_unlock(&req->rq_lock); @@ -2503,9 +2570,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) LASSERT(atomic_read(&set->set_remaining) == 0); rc = set->set_rc; /* rq_status of already freed requests if any */ - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { LASSERT(req->rq_phase == RQ_PHASE_COMPLETE); if (req->rq_status != 0) rc = req->rq_status; @@ -2563,6 +2628,10 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) sptlrpc_cli_free_repbuf(request); if (request->rq_import) { + if (!ptlrpcd_check_work(request)) { + LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0); + atomic_dec(&request->rq_import->imp_reqs); + } class_import_put(request->rq_import); request->rq_import = NULL; } @@ -2677,9 +2746,7 @@ EXPORT_SYMBOL(ptlrpc_req_xid); */ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) { - int rc; - struct l_wait_info lwi; - + bool discard = false; /* * Might sleep. */ @@ -2689,20 +2756,23 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) && async && request->rq_reply_deadline == 0 && cfs_fail_val == 0) request->rq_reply_deadline = ktime_get_real_seconds() + - LONG_UNLINK; + PTLRPC_REQ_LONG_UNLINK; /* * Nothing left to do. */ - if (!ptlrpc_client_recv_or_unlink(request)) + if (!__ptlrpc_cli_wait_unlink(request, &discard)) RETURN(1); LNetMDUnlink(request->rq_reply_md_h); + if (discard) /* Discard the request-out callback */ + __LNetMDUnlink(request->rq_req_md_h, discard); + /* * Let's check it once again. */ - if (!ptlrpc_client_recv_or_unlink(request)) + if (!ptlrpc_cli_wait_unlink(request)) RETURN(1); /* Move to "Unregistering" phase as reply was not unlinked yet. */ @@ -2715,29 +2785,30 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) RETURN(0); /* - * We have to l_wait_event() whatever the result, to give liblustre + * We have to wait_event_idle_timeout() whatever the result, to get * a chance to run reply_in_callback(), and to make sure we've * unlinked before returning a req to the pool. */ for (;;) { - /* The wq argument is ignored by user-space wait_event macros */ wait_queue_head_t *wq = (request->rq_set) ? &request->rq_set->set_waitq : &request->rq_reply_waitq; + int seconds = PTLRPC_REQ_LONG_UNLINK; /* * Network access will complete in finite time but the HUGE * timeout lets us CWARN for visibility of sluggish NALs */ - lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK), - cfs_time_seconds(1), NULL, NULL); - rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request), - &lwi); - if (rc == 0) { + while (seconds > 0 && + wait_event_idle_timeout( + *wq, + !ptlrpc_cli_wait_unlink(request), + cfs_time_seconds(1)) == 0) + seconds -= 1; + if (seconds > 0) { ptlrpc_rqphase_move(request, request->rq_next_phase); RETURN(1); } - LASSERT(rc == -ETIMEDOUT); DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d", request->rq_receiving_reply, @@ -2943,7 +3014,7 @@ EXPORT_SYMBOL(ptlrpc_request_addref); void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, struct obd_import *imp) { - struct list_head *tmp; + struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); @@ -2971,11 +3042,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, LASSERT(imp->imp_replayable); /* Balanced in ptlrpc_free_committed, usually. */ ptlrpc_request_addref(req); - list_for_each_prev(tmp, &imp->imp_replay_list) { - struct ptlrpc_request *iter = list_entry(tmp, - struct ptlrpc_request, - rq_replay_list); - + list_for_each_entry_reverse(iter, &imp->imp_replay_list, + rq_replay_list) { /* * We may have duplicate transnos if we create and then * open a file, or for closes retained if to match creating @@ -3019,7 +3087,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) } /* for distributed debugging */ - lustre_msg_set_status(req->rq_reqmsg, current_pid()); + lustre_msg_set_status(req->rq_reqmsg, current->pid); /* add a ref for the set (see comment in ptlrpc_set_add_req) */ ptlrpc_request_addref(req); @@ -3054,7 +3122,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, if (!ptlrpc_client_replied(req) || (req->rq_bulk && lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) { - DEBUG_REQ(D_ERROR, req, "request replay timed out.\n"); + DEBUG_REQ(D_ERROR, req, "request replay timed out"); GOTO(out, rc = -ETIMEDOUT); } @@ -3066,7 +3134,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, /** VBR: check version failure */ if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) { /** replay was failed due to version mismatch */ - DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n"); + DEBUG_REQ(D_WARNING, req, "Version mismatch during replay"); spin_lock(&imp->imp_lock); imp->imp_vbr_failed = 1; spin_unlock(&imp->imp_lock); @@ -3089,13 +3157,13 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, /* transaction number shouldn't be bigger than the latest replayed */ if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) { DEBUG_REQ(D_ERROR, req, - "Reported transno %llu is bigger than the replayed one: %llu", + "Reported transno=%llu is bigger than replayed=%llu", req->rq_transno, lustre_msg_get_transno(req->rq_reqmsg)); GOTO(out, rc = -EINVAL); } - DEBUG_REQ(D_HA, req, "got rep"); + DEBUG_REQ(D_HA, req, "got reply"); /* let the callback do fixups, possibly including in the request */ if (req->rq_replay_cb) @@ -3204,8 +3272,8 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) ptlrpc_at_set_req_timeout(req); /* Tell server net_latency to calculate how long to wait for reply. */ - lustre_msg_set_service_time(req->rq_reqmsg, - ptlrpc_at_get_net_latency(req)); + lustre_msg_set_service_timeout(req->rq_reqmsg, + ptlrpc_at_get_net_latency(req)); DEBUG_REQ(D_HA, req, "REPLAY"); atomic_inc(&req->rq_import->imp_replay_inflight); @@ -3223,7 +3291,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) */ void ptlrpc_abort_inflight(struct obd_import *imp) { - struct list_head *tmp, *n; + struct ptlrpc_request *req; ENTRY; /* @@ -3238,11 +3306,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) * locked? Also, how do we know if the requests on the list are * being freed at this time? */ - list_for_each_safe(tmp, n, &imp->imp_sending_list) { - struct ptlrpc_request *req = list_entry(tmp, - struct ptlrpc_request, - rq_list); - + list_for_each_entry(req, &imp->imp_sending_list, rq_list) { DEBUG_REQ(D_RPCTRACE, req, "inflight"); spin_lock(&req->rq_lock); @@ -3254,10 +3318,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) spin_unlock(&req->rq_lock); } - list_for_each_safe(tmp, n, &imp->imp_delayed_list) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_list); - + list_for_each_entry(req, &imp->imp_delayed_list, rq_list) { DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req"); spin_lock(&req->rq_lock); @@ -3284,15 +3345,11 @@ void ptlrpc_abort_inflight(struct obd_import *imp) */ void ptlrpc_abort_set(struct ptlrpc_request_set *set) { - struct list_head *tmp, *pos; + struct ptlrpc_request *req; LASSERT(set != NULL); - list_for_each_safe(pos, tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(pos, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { spin_lock(&req->rq_lock); if (req->rq_phase != RQ_PHASE_RPC) { spin_unlock(&req->rq_lock); @@ -3336,7 +3393,8 @@ void ptlrpc_init_xid(void) } /* Need to always be aligned to a power-of-two for mutli-bulk BRW */ - CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0); + BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) != + 0); xid &= PTLRPC_BULK_OPS_MASK; atomic64_set(&ptlrpc_last_xid, xid); } @@ -3407,9 +3465,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) || req->rq_mbits == 0) { req->rq_mbits = req->rq_xid; } else { - int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) / - LNET_MAX_IOV; - req->rq_mbits -= total_md - 1; + req->rq_mbits -= bd->bd_md_count - 1; } } else { /* @@ -3424,8 +3480,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) * that server can infer the number of bulks that were prepared, * see LU-1431 */ - req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) / - LNET_MAX_IOV) - 1; + req->rq_mbits += bd->bd_md_count - 1; /* * Set rq_xid as rq_mbits to indicate the final bulk for the old