X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=bee8d3fbcf00773f7ba63f5f32ccad85ac29f7f4;hb=5e30a2c06176f50f5e17aba68fdae7e38d922d33;hp=b9b73ccc8a33b387a0be9640bda2dc32f9314512;hpb=490f0dd33b2244029f8bf17050f7a86bc76986d6;p=fs%2Flustre-release.git

diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index b9b73cc..bee8d3f 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -37,6 +37,7 @@
 #include <linux/delay.h>
 #include <linux/random.h>
 
+#include <lnet/lib-lnet.h>
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_lib.h>
@@ -46,6 +47,50 @@
 
 #include "ptlrpc_internal.h"
 
+static void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
+				      struct page *page, int pageoffset,
+				      int len)
+{
+	__ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1);
+}
+
+static void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
+					struct page *page, int pageoffset,
+					int len)
+{
+	__ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
+}
+
+static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
+{
+	int i;
+
+	for (i = 0; i < desc->bd_iov_count; i++)
+		put_page(BD_GET_KIOV(desc, i).kiov_page);
+}
+
+static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
+				       void *frag, int len)
+{
+	unsigned int offset = (uintptr_t)frag & ~PAGE_MASK;
+
+	ENTRY;
+	while (len > 0) {
+		int page_len = min_t(unsigned int, PAGE_SIZE - offset,
+				     len);
+		uintptr_t vaddr = (uintptr_t)frag;
+
+		ptlrpc_prep_bulk_page_nopin(desc,
+					    lnet_kvaddr_to_page(vaddr),
+					    offset, page_len);
+		offset = 0;
+		len -= page_len;
+		frag += page_len;
+	}
+
+	RETURN(desc->bd_nob);
+}
+
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
 	.add_kiov_frag	= ptlrpc_prep_bulk_page_pin,
 	.release_frags	= ptlrpc_release_bulk_page_pin,
@@ -55,6 +100,7 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
 	.add_kiov_frag	= ptlrpc_prep_bulk_page_nopin,
 	.release_frags	= ptlrpc_release_bulk_noop,
+	.add_iov_frag	= ptlrpc_prep_bulk_frag_pages,
 };
 EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
 
@@ -70,7 +116,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
 /**
  * Initialize passed in client structure \a cl.
  */
-void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
+void ptlrpc_init_client(int req_portal, int rep_portal, const char *name,
 			struct ptlrpc_client *cl)
 {
 	cl->cli_request_portal = req_portal;
@@ -195,7 +241,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
 	if (!desc)
 		RETURN(NULL);
 
-	desc->bd_import_generation = req->rq_import_generation;
 	desc->bd_import = class_import_get(imp);
 	desc->bd_req = req;
 
@@ -415,14 +460,16 @@ static int unpack_reply(struct ptlrpc_request *req)
 	if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
 		rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
 		if (rc) {
-			DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
+			DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d",
+				  rc);
 			return -EPROTO;
 		}
 	}
 
 	rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
 	if (rc) {
-		DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
+		DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d",
+			  rc);
 		return -EPROTO;
 	}
 	return 0;
@@ -483,6 +530,8 @@ __must_hold(&req->rq_lock)
 	req->rq_deadline = req->rq_sent + req->rq_timeout +
 			   ptlrpc_at_get_net_latency(req);
 
+	/* The below message is checked in replay-single.sh test_65{a,b} */
+	/* The below message is checked in sanity-{gss,krb5} test_8 */
 	DEBUG_REQ(D_ADAPTTO, req,
 		  "Early reply #%d, new deadline in %llds (%llds)",
 		  req->rq_early_count,
@@ -561,13 +610,11 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
 		 "Trying to change pool size with nonempty pool from %d to %d bytes\n",
 		 pool->prp_rq_size, size);
 
-	spin_lock(&pool->prp_lock);
 	pool->prp_rq_size = size;
 	for (i = 0; i < num_rq; i++) {
 		struct ptlrpc_request *req;
 		struct lustre_msg *msg;
 
-		spin_unlock(&pool->prp_lock);
 		req = ptlrpc_request_cache_alloc(GFP_NOFS);
 		if (!req)
 			return i;
@@ -581,8 +628,8 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
 		req->rq_pool = pool;
 		spin_lock(&pool->prp_lock);
 		list_add_tail(&req->rq_list, &pool->prp_req_list);
+		spin_unlock(&pool->prp_lock);
 	}
-	spin_unlock(&pool->prp_lock);
 	return num_rq;
 }
 EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
@@ -601,7 +648,7 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize,
 {
 	struct ptlrpc_request_pool *pool;
 
-	OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
+	OBD_ALLOC_PTR(pool);
 	if (!pool)
 		return NULL;
 
@@ -712,8 +759,42 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
 	spin_unlock(&req->rq_import->imp_lock);
 }
 
-static __u64 ptlrpc_last_xid;
-static spinlock_t ptlrpc_last_xid_lock;
+static atomic64_t ptlrpc_last_xid;
+
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+	spin_lock(&req->rq_import->imp_lock);
+	list_del_init(&req->rq_unreplied_list);
+	ptlrpc_assign_next_xid_nolock(req);
+	spin_unlock(&req->rq_import->imp_lock);
+	DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+	__u32 opc;
+	__u16 tag;
+
+	opc = lustre_msg_get_opc(req->rq_reqmsg);
+	tag = obd_get_mod_rpc_slot(cli, opc);
+	lustre_msg_set_tag(req->rq_reqmsg, tag);
+	ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+	__u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+	if (tag != 0) {
+		struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+		__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+		obd_put_mod_rpc_slot(cli, opc, tag);
+	}
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
 
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
			     __u32 version, int opcode, char **bufs,
@@ -782,9 +863,9 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
 		fail2_t = &request->rq_bulk_deadline;
 	} else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) {
 		time64_t now = ktime_get_real_seconds();
-		spin_lock(&ptlrpc_last_xid_lock);
-		ptlrpc_last_xid = ((__u64)now >> 4) << 24;
-		spin_unlock(&ptlrpc_last_xid_lock);
+		u64 xid = ((u64)now >> 4) << 24;
+
+		atomic64_set(&ptlrpc_last_xid, xid);
 	}
 
 	if (fail_t) {
@@ -822,32 +903,7 @@ EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
 int ptlrpc_request_pack(struct ptlrpc_request *request,
 			__u32 version, int opcode)
 {
-	int rc;
-
-	rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
-	if (rc)
-		return rc;
-
-	/*
-	 * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
-	 * ptlrpc_body sent from server equal to local ptlrpc_body size, so we
-	 * have to send old ptlrpc_body to keep interoprability with these
-	 * clients.
-	 *
-	 * Only three kinds of server->client RPCs so far:
-	 *  - LDLM_BL_CALLBACK
-	 *  - LDLM_CP_CALLBACK
-	 *  - LDLM_GL_CALLBACK
-	 *
-	 * XXX This should be removed whenever we drop the interoprability with
-	 *     the these old clients.
-	 */
-	if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
-	    opcode == LDLM_GL_CALLBACK)
-		req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
-				   sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
-
-	return rc;
+	return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
 }
 EXPORT_SYMBOL(ptlrpc_request_pack);
 
@@ -897,7 +953,6 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
 			      const struct req_format *format)
 {
 	struct ptlrpc_request *request;
-	int connect = 0;
 
 	request = __ptlrpc_request_alloc(imp, pool);
 	if (!request)
@@ -918,17 +973,17 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
 		if (imp->imp_state == LUSTRE_IMP_IDLE) {
 			imp->imp_generation++;
 			imp->imp_initiated_at = imp->imp_generation;
-			imp->imp_state = LUSTRE_IMP_NEW;
-			connect = 1;
-		}
-		spin_unlock(&imp->imp_lock);
-		if (connect) {
-			rc = ptlrpc_connect_import(imp);
+			imp->imp_state = LUSTRE_IMP_NEW;
+
+			/* connect_import_locked releases imp_lock */
+			rc = ptlrpc_connect_import_locked(imp);
 			if (rc < 0) {
 				ptlrpc_request_free(request);
 				return NULL;
 			}
 			ptlrpc_pinger_add_import(imp);
+		} else {
+			spin_unlock(&imp->imp_lock);
 		}
 	}
 
@@ -1206,7 +1261,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
 	if (req->rq_ctx_init || req->rq_ctx_fini) {
 		/* always allow ctx init/fini rpc go through */
 	} else if (imp->imp_state == LUSTRE_IMP_NEW) {
-		DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
+		DEBUG_REQ(D_ERROR, req, "Uninitialized import");
 		*status = -EIO;
 	} else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
 		unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg);
@@ -1216,11 +1271,11 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
 		 * race with umount
 		 */
 		DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ?
-			  D_HA : D_ERROR, req, "IMP_CLOSED ");
+			  D_HA : D_ERROR, req, "IMP_CLOSED");
 		*status = -EIO;
 	} else if (ptlrpc_send_limit_expired(req)) {
 		/* probably doesn't need to be a D_ERROR after initial testing */
-		DEBUG_REQ(D_HA, req, "send limit expired ");
+		DEBUG_REQ(D_HA, req, "send limit expired");
 		*status = -ETIMEDOUT;
 	} else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
 		   imp->imp_state == LUSTRE_IMP_CONNECTING) {
@@ -1250,7 +1305,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
 		     imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
 		     imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
 		     imp->imp_state == LUSTRE_IMP_RECOVER)) {
-			DEBUG_REQ(D_HA, req, "allow during recovery.\n");
+			DEBUG_REQ(D_HA, req, "allow during recovery");
 		} else {
 			delay = 1;
 		}
@@ -1307,32 +1362,28 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err)
  */
 static int ptlrpc_check_status(struct ptlrpc_request *req)
 {
-	int err;
+	int rc;
 
 	ENTRY;
-	err = lustre_msg_get_status(req->rq_repmsg);
+	rc = lustre_msg_get_status(req->rq_repmsg);
 	if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
 		struct obd_import *imp = req->rq_import;
 		lnet_nid_t nid = imp->imp_connection->c_peer.nid;
 		__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
 
-		if (ptlrpc_console_allow(req, opc, err))
+		if (ptlrpc_console_allow(req, opc, rc))
 			LCONSOLE_ERROR_MSG(0x11,
 					   "%s: operation %s to node %s failed: rc = %d\n",
 					   imp->imp_obd->obd_name,
 					   ll_opcode2str(opc),
-					   libcfs_nid2str(nid), err);
-		RETURN(err < 0 ? err : -EINVAL);
+					   libcfs_nid2str(nid), rc);
+		RETURN(rc < 0 ? rc : -EINVAL);
 	}
 
-	if (err < 0) {
-		DEBUG_REQ(D_INFO, req, "status is %d", err);
-	} else if (err > 0) {
-		/* XXX: translate this error from net to host */
-		DEBUG_REQ(D_INFO, req, "status is %d", err);
-	}
+	if (rc)
+		DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc);
 
-	RETURN(err);
+	RETURN(rc);
 }
 
 /**
@@ -1400,7 +1451,7 @@ static int after_reply(struct ptlrpc_request *req)
 	if (req->rq_reply_truncated) {
 		if (ptlrpc_no_resend(req)) {
 			DEBUG_REQ(D_ERROR, req,
-				  "reply buffer overflow, expected: %d, actual size: %d",
+				  "reply buffer overflow, expected=%d, actual size=%d",
 				  req->rq_nob_received, req->rq_repbuf_len);
 			RETURN(-EOVERFLOW);
 		}
@@ -1429,7 +1480,7 @@ static int after_reply(struct ptlrpc_request *req)
 	 */
 	rc = sptlrpc_cli_unwrap_reply(req);
 	if (rc) {
-		DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
+		DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc);
 		RETURN(rc);
 	}
 
@@ -1448,7 +1499,8 @@ static int after_reply(struct ptlrpc_request *req)
 	    ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
 		time64_t now = ktime_get_real_seconds();
 
-		DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
+		DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) |
+			  D_RPCTRACE, req, "resending request on EINPROGRESS");
 		spin_lock(&req->rq_lock);
 		req->rq_resend = 1;
 		spin_unlock(&req->rq_lock);
@@ -1676,11 +1728,12 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 	}
 
 	CDEBUG(D_RPCTRACE,
-	       "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
-	       current_comm(),
+	       "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+	       req, current_comm(),
 	       imp->imp_obd->obd_uuid.uuid,
 	       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
-	       obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg));
+	       obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg),
+	       lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
 
 	rc = ptl_send_rpc(req, 0);
 	if (rc == -ENOMEM) {
@@ -1694,7 +1747,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 		RETURN(rc);
 	}
 	if (rc) {
-		DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
+		DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d",
+			  rc);
 		spin_lock(&req->rq_lock);
 		req->rq_net_err = 1;
 		spin_unlock(&req->rq_lock);
@@ -1930,9 +1984,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 				 * put on delay list - only if we wait
 				 * recovery finished - before send
 				 */
-				list_del_init(&req->rq_list);
-				list_add_tail(&req->rq_list,
-					      &imp->imp_delayed_list);
+				list_move_tail(&req->rq_list,
+					       &imp->imp_delayed_list);
 				spin_unlock(&imp->imp_lock);
 				continue;
 			}
@@ -1956,9 +2009,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 				GOTO(interpret, req->rq_status);
 			}
 
-			list_del_init(&req->rq_list);
-			list_add_tail(&req->rq_list,
-				      &imp->imp_sending_list);
+			list_move_tail(&req->rq_list,
+				       &imp->imp_sending_list);
 
 			spin_unlock(&imp->imp_lock);
 
@@ -2127,13 +2179,14 @@ interpret:
 
 		if (req->rq_reqmsg)
 			CDEBUG(D_RPCTRACE,
-			       "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
-			       current_comm(),
+			       "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+			       req, current_comm(),
 			       imp->imp_obd->obd_uuid.uuid,
 			       lustre_msg_get_status(req->rq_reqmsg),
 			       req->rq_xid,
 			       obd_import_nid2str(imp),
-			       lustre_msg_get_opc(req->rq_reqmsg));
+			       lustre_msg_get_opc(req->rq_reqmsg),
+			       lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
 
 		spin_lock(&imp->imp_lock);
 		/*
@@ -2438,7 +2491,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 		       set, timeout);
 
 		if ((timeout == 0 && !signal_pending(current)) ||
-		    set->set_allow_intr)
+		    set->set_allow_intr) {
 			/*
 			 * No requests are in-flight (ether timed out
 			 * or delayed), so we can allow interrupts.
@@ -2449,7 +2502,10 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 					cfs_time_seconds(timeout ? timeout : 1),
 					ptlrpc_expired_set,
 					ptlrpc_interrupted_set, set);
-		else
+
+			rc = l_wait_event(set->set_waitq,
+					  ptlrpc_check_set(NULL, set), &lwi);
+		} else {
 			/*
 			 * At least one request is in flight, so no
 			 * interrupts are allowed. Wait until all
@@ -2458,29 +2514,32 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 			lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ?
							   timeout : 1),
					  ptlrpc_expired_set, set);
-		rc = l_wait_event(set->set_waitq,
-				  ptlrpc_check_set(NULL, set), &lwi);
-
-		/*
-		 * LU-769 - if we ignored the signal because it was already
-		 * pending when we started, we need to handle it now or we risk
-		 * it being ignored forever
-		 */
-		if (rc == -ETIMEDOUT &&
-		    (!lwi.lwi_allow_intr || set->set_allow_intr) &&
-		    signal_pending(current)) {
-			sigset_t blocked_sigs =
-				cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+			rc = l_wait_event(set->set_waitq,
+					  ptlrpc_check_set(NULL, set), &lwi);
 
 			/*
-			 * In fact we only interrupt for the "fatal" signals
-			 * like SIGINT or SIGKILL. We still ignore less
-			 * important signals since ptlrpc set is not easily
-			 * reentrant from userspace again
+			 * LU-769 - if we ignored the signal because
+			 * it was already pending when we started, we
+			 * need to handle it now or we risk it being
+			 * ignored forever
 			 */
-			if (signal_pending(current))
-				ptlrpc_interrupted_set(set);
-			cfs_restore_sigs(blocked_sigs);
+			if (rc == -ETIMEDOUT &&
+			    signal_pending(current)) {
+				sigset_t blocked_sigs =
+					cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+
+				/*
+				 * In fact we only interrupt for the
+				 * "fatal" signals like SIGINT or
+				 * SIGKILL. We still ignore less
+				 * important signals since ptlrpc set
+				 * is not easily reentrant from
+				 * userspace again
+				 */
+				if (signal_pending(current))
+					ptlrpc_interrupted_set(set);
+				cfs_restore_sigs(blocked_sigs);
+			}
 		}
 
 		LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
@@ -2682,9 +2741,6 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  */
 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
-	int rc;
-	struct l_wait_info lwi;
-
 	/*
 	 * Might sleep.
 	 */
@@ -2725,24 +2781,25 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 	 * unlinked before returning a req to the pool.
 	 */
 	for (;;) {
-		/* The wq argument is ignored by user-space wait_event macros */
 		wait_queue_head_t *wq = (request->rq_set) ?
					&request->rq_set->set_waitq :
					&request->rq_reply_waitq;
+		int seconds = LONG_UNLINK;
 
 		/*
 		 * Network access will complete in finite time but the HUGE
 		 * timeout lets us CWARN for visibility of sluggish NALs
 		 */
-		lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
-					   cfs_time_seconds(1), NULL, NULL);
-		rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
-				  &lwi);
-		if (rc == 0) {
+		while (seconds > 0 &&
+		       wait_event_idle_timeout(
+			       *wq,
+			       !ptlrpc_client_recv_or_unlink(request),
+			       cfs_time_seconds(1)) == 0)
+			seconds -= 1;
+		if (seconds > 0) {
 			ptlrpc_rqphase_move(request, request->rq_next_phase);
 			RETURN(1);
 		}
 
-		LASSERT(rc == -ETIMEDOUT);
 		DEBUG_REQ(D_WARNING, request,
			  "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
			  request->rq_receiving_reply,
@@ -3059,7 +3116,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
 	if (!ptlrpc_client_replied(req) ||
 	    (req->rq_bulk &&
 	     lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) {
-		DEBUG_REQ(D_ERROR, req, "request replay timed out.\n");
+		DEBUG_REQ(D_ERROR, req, "request replay timed out");
 		GOTO(out, rc = -ETIMEDOUT);
 	}
 
@@ -3071,7 +3128,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
 	/** VBR: check version failure */
 	if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
 		/** replay was failed due to version mismatch */
-		DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
+		DEBUG_REQ(D_WARNING, req, "Version mismatch during replay");
 		spin_lock(&imp->imp_lock);
 		imp->imp_vbr_failed = 1;
 		spin_unlock(&imp->imp_lock);
@@ -3094,13 +3151,13 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
 	/* transaction number shouldn't be bigger than the latest replayed */
 	if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
 		DEBUG_REQ(D_ERROR, req,
-			  "Reported transno %llu is bigger than the replayed one: %llu",
+			  "Reported transno=%llu is bigger than replayed=%llu",
 			  req->rq_transno,
 			  lustre_msg_get_transno(req->rq_reqmsg));
 		GOTO(out, rc = -EINVAL);
 	}
 
-	DEBUG_REQ(D_HA, req, "got rep");
+	DEBUG_REQ(D_HA, req, "got reply");
 
 	/* let the callback do fixups, possibly including in the request */
 	if (req->rq_replay_cb)
@@ -3193,8 +3250,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 
 	LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
 
-	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-	aa = ptlrpc_req_async_args(req);
+	aa = ptlrpc_req_async_args(aa, req);
 	memset(aa, 0, sizeof(*aa));
 
 	/* Prepare request to be resent with ptlrpcd */
@@ -3230,14 +3286,14 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 void ptlrpc_abort_inflight(struct obd_import *imp)
 {
 	struct list_head *tmp, *n;
-	ENTRY;
+
 	/*
 	 * Make sure that no new requests get processed for this import.
 	 * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
 	 * this flag and then putting requests on sending_list or delayed_list.
 	 */
-	spin_lock(&imp->imp_lock);
+	assert_spin_locked(&imp->imp_lock);
 
 	/*
 	 * XXX locking?  Maybe we should remove each request with the list
@@ -3282,8 +3338,6 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
 	if (imp->imp_replayable)
 		ptlrpc_free_committed(imp);
 
-	spin_unlock(&imp->imp_lock);
-
 	EXIT;
 }
 
@@ -3333,19 +3387,21 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 void ptlrpc_init_xid(void)
 {
 	time64_t now = ktime_get_real_seconds();
+	u64 xid;
 
-	spin_lock_init(&ptlrpc_last_xid_lock);
 	if (now < YEAR_2004) {
-		get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
-		ptlrpc_last_xid >>= 2;
-		ptlrpc_last_xid |= (1ULL << 61);
+		get_random_bytes(&xid, sizeof(xid));
+		xid >>= 2;
+		xid |= (1ULL << 61);
 	} else {
-		ptlrpc_last_xid = (__u64)now << 20;
+		xid = (u64)now << 20;
 	}
 
 	/* Need to always be aligned to a power-of-two for mutli-bulk BRW */
-	CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
-	ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
+	BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) !=
+		     0);
+	xid &= PTLRPC_BULK_OPS_MASK;
+	atomic64_set(&ptlrpc_last_xid, xid);
 }
 
 /**
@@ -3362,14 +3418,7 @@ void ptlrpc_init_xid(void)
  */
 __u64 ptlrpc_next_xid(void)
 {
-	__u64 next;
-
-	spin_lock(&ptlrpc_last_xid_lock);
-	next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-	ptlrpc_last_xid = next;
-	spin_unlock(&ptlrpc_last_xid_lock);
-
-	return next;
+	return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid);
 }
 
 /**
@@ -3458,19 +3507,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
  */
__u64 ptlrpc_sample_next_xid(void)
 {
-#if BITS_PER_LONG == 32
-	/* need to avoid possible word tearing on 32-bit systems */
-	__u64 next;
-
-	spin_lock(&ptlrpc_last_xid_lock);
-	next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-	spin_unlock(&ptlrpc_last_xid_lock);
-
-	return next;
-#else
-	/* No need to lock, since returned value is racy anyways */
-	return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-#endif
+	return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT;
 }
 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
 
@@ -3569,8 +3606,7 @@ void *ptlrpcd_alloc_work(struct obd_import *imp,
 	req->rq_no_delay = req->rq_no_resend = 1;
 	req->rq_pill.rc_fmt = (void *)&worker_format;
 
-	CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
-	args = ptlrpc_req_async_args(req);
+	args = ptlrpc_req_async_args(args, req);
 	args->cb = cb;
 	args->cbdata = cbdata;
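
Illustration (not part of the change above): the new ptlrpc_prep_bulk_frag_pages() turns one arbitrary kernel-virtual fragment into a series of per-page bulk entries; the first chunk starts at the fragment's offset within its page, every later chunk starts at offset zero. A minimal userspace sketch of the same splitting arithmetic, with a hypothetical add_page() callback standing in for lnet_kvaddr_to_page() plus ptlrpc_prep_bulk_page_nopin():

#include <stdint.h>
#include <stdio.h>

#define PAGE_SIZE 4096UL
#define PAGE_MASK (~(PAGE_SIZE - 1))

/* Hypothetical callback: receives one page-sized-or-smaller chunk per call. */
static void add_page(uintptr_t page_base, unsigned int offset,
		     unsigned int len)
{
	printf("page %#lx offset %4u len %4u\n",
	       (unsigned long)page_base, offset, len);
}

/* Same splitting logic as ptlrpc_prep_bulk_frag_pages(): clamp the first
 * chunk to the end of its page, then continue page by page. */
static void prep_frag_pages(void *frag, int len)
{
	unsigned int offset = (uintptr_t)frag & ~PAGE_MASK;

	while (len > 0) {
		unsigned int page_len = PAGE_SIZE - offset;

		if ((int)page_len > len)
			page_len = len;
		add_page((uintptr_t)frag & PAGE_MASK, offset, page_len);
		offset = 0;
		len -= page_len;
		frag = (char *)frag + page_len;
	}
}

int main(void)
{
	/* Synthetic address 100 bytes into a page; nothing is dereferenced
	 * in this sketch, so any value works. Spans roughly 2.5 pages. */
	prep_frag_pages((void *)(uintptr_t)0x10064, 10000);
	return 0;
}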
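
Illustration (not part of the change above): replacing ptlrpc_last_xid plus ptlrpc_last_xid_lock with an atomic64_t trades a spinlock for single atomic operations and makes the 32-bit word-tearing #ifdef in ptlrpc_sample_next_xid() unnecessary. A userspace sketch of the same scheme with C11 atomics; the constants and helper names are illustrative, not the kernel API. Note that C11 atomic_fetch_add() returns the old value, so the addend is added back, whereas the kernel's atomic64_add_return() returns the new value directly:

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Illustrative stand-ins for the Lustre constants. */
#define BULK_OPS_COUNT 16U
#define BULK_OPS_MASK  (~((uint64_t)BULK_OPS_COUNT - 1))

static _Atomic uint64_t last_xid;

/* Seed once, aligned to BULK_OPS_COUNT so every RPC owns a whole block
 * of matchbits for multi-bulk BRW, as in ptlrpc_init_xid(). */
static void init_xid(void)
{
	uint64_t xid = (uint64_t)time(NULL) << 20;

	atomic_store(&last_xid, xid & BULK_OPS_MASK);
}

/* Lock-free equivalent of ptlrpc_next_xid(): one atomic read-modify-write
 * replaces the old spin_lock/add/spin_unlock sequence. */
static uint64_t next_xid(void)
{
	return atomic_fetch_add(&last_xid, BULK_OPS_COUNT) + BULK_OPS_COUNT;
}

/* ptlrpc_sample_next_xid(): a plain atomic load is enough even on 32-bit
 * hosts, because a 64-bit atomic load cannot tear. */
static uint64_t sample_next_xid(void)
{
	return atomic_load(&last_xid) + BULK_OPS_COUNT;
}

int main(void)
{
	init_xid();
	printf("xid1=%#llx xid2=%#llx peek=%#llx\n",
	       (unsigned long long)next_xid(),
	       (unsigned long long)next_xid(),
	       (unsigned long long)sample_next_xid());
	return 0;
}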
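
Illustration (not part of the change above): the l_wait_event()/LWI_TIMEOUT_INTERVAL pair in ptlrpc_unregister_reply() is replaced by a loop that calls wait_event_idle_timeout() in one-second slices and counts down from LONG_UNLINK, so "seconds > 0" afterwards means the event fired rather than the huge timeout elapsing. A userspace sketch of the countdown idiom; the event is simulated and all names are hypothetical:

#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

#define LONG_UNLINK 300	/* illustrative upper bound, in seconds */

/* Simulated condition: becomes true on the third poll. */
static int polls;
static bool event_done(void)
{
	return ++polls >= 3;
}

/* Countdown idiom from the new ptlrpc_unregister_reply() loop: wait in
 * 1-second slices so the caller can distinguish "event fired"
 * (seconds > 0) from "gave up after LONG_UNLINK seconds" (seconds == 0). */
static bool wait_for_event(void)
{
	int seconds = LONG_UNLINK;

	while (seconds > 0 && !event_done()) {
		sleep(1);	/* stands in for wait_event_idle_timeout() */
		seconds -= 1;
	}
	return seconds > 0;
}

int main(void)
{
	puts(wait_for_event() ? "event fired" : "timed out");
	return 0;
}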