X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=85bb30d1326475897ea8dd9b2853f458cad01925;hb=917655fc2938b90a9c246dd2d58408c42aa1658d;hp=03e61642146965a73feee17bea9a7e52364acda0;hpb=c3cc11a40ff6188c01b0eb9825ef4fa7617af448;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 03e6164..85bb30d 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -35,6 +35,9 @@ #define DEBUG_SUBSYSTEM S_RPC #include +#include + +#include #include #include #include @@ -44,6 +47,50 @@ #include "ptlrpc_internal.h" +static void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc, + struct page *page, int pageoffset, + int len) +{ + __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1); +} + +static void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc, + struct page *page, int pageoffset, + int len) +{ + __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0); +} + +static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc) +{ + int i; + + for (i = 0; i < desc->bd_iov_count ; i++) + put_page(BD_GET_KIOV(desc, i).kiov_page); +} + +static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc, + void *frag, int len) +{ + unsigned int offset = (uintptr_t)frag & ~PAGE_MASK; + + ENTRY; + while (len > 0) { + int page_len = min_t(unsigned int, PAGE_SIZE - offset, + len); + uintptr_t vaddr = (uintptr_t) frag; + + ptlrpc_prep_bulk_page_nopin(desc, + lnet_kvaddr_to_page(vaddr), + offset, page_len); + offset = 0; + len -= page_len; + frag += page_len; + } + + RETURN(desc->bd_nob); +} + const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = { .add_kiov_frag = ptlrpc_prep_bulk_page_pin, .release_frags = ptlrpc_release_bulk_page_pin, @@ -53,6 +100,7 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops); const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = { .add_kiov_frag = ptlrpc_prep_bulk_page_nopin, .release_frags = ptlrpc_release_bulk_noop, + .add_iov_frag = ptlrpc_prep_bulk_frag_pages, }; EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops); @@ -68,7 +116,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async); /** * Initialize passed in client structure \a cl. */ -void ptlrpc_init_client(int req_portal, int rep_portal, char *name, +void ptlrpc_init_client(int req_portal, int rep_portal, const char *name, struct ptlrpc_client *cl) { cl->cli_request_portal = req_portal; @@ -152,7 +200,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags, desc->bd_portal = portal; desc->bd_type = type; desc->bd_md_count = 0; - desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *)ops; + desc->bd_frag_ops = ops; LASSERT(max_brw > 0); desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT); /* @@ -193,7 +241,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, if (!desc) RETURN(NULL); - desc->bd_import_generation = req->rq_import_generation; desc->bd_import = class_import_get(imp); desc->bd_req = req; @@ -413,14 +460,16 @@ static int unpack_reply(struct ptlrpc_request *req) if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) { rc = ptlrpc_unpack_rep_msg(req, req->rq_replen); if (rc) { - DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc); + DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d", + rc); return -EPROTO; } } rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF); if (rc) { - DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc); + DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d", + rc); return -EPROTO; } return 0; @@ -481,6 +530,8 @@ __must_hold(&req->rq_lock) req->rq_deadline = req->rq_sent + req->rq_timeout + ptlrpc_at_get_net_latency(req); + /* The below message is checked in replay-single.sh test_65{a,b} */ + /* The below message is checked in sanity-{gss,krb5} test_8 */ DEBUG_REQ(D_ADAPTTO, req, "Early reply #%d, new deadline in %llds (%llds)", req->rq_early_count, @@ -559,13 +610,11 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) "Trying to change pool size with nonempty pool from %d to %d bytes\n", pool->prp_rq_size, size); - spin_lock(&pool->prp_lock); pool->prp_rq_size = size; for (i = 0; i < num_rq; i++) { struct ptlrpc_request *req; struct lustre_msg *msg; - spin_unlock(&pool->prp_lock); req = ptlrpc_request_cache_alloc(GFP_NOFS); if (!req) return i; @@ -579,8 +628,8 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) req->rq_pool = pool; spin_lock(&pool->prp_lock); list_add_tail(&req->rq_list, &pool->prp_req_list); + spin_unlock(&pool->prp_lock); } - spin_unlock(&pool->prp_lock); return num_rq; } EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool); @@ -599,7 +648,7 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize, { struct ptlrpc_request_pool *pool; - OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool)); + OBD_ALLOC_PTR(pool); if (!pool) return NULL; @@ -710,8 +759,42 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req) spin_unlock(&req->rq_import->imp_lock); } -static __u64 ptlrpc_last_xid; -static spinlock_t ptlrpc_last_xid_lock; +static atomic64_t ptlrpc_last_xid; + +static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) +{ + spin_lock(&req->rq_import->imp_lock); + list_del_init(&req->rq_unreplied_list); + ptlrpc_assign_next_xid_nolock(req); + spin_unlock(&req->rq_import->imp_lock); + DEBUG_REQ(D_RPCTRACE, req, "reassign xid"); +} + +void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc; + __u16 tag; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + tag = obd_get_mod_rpc_slot(cli, opc); + lustre_msg_set_tag(req->rq_reqmsg, tag); + ptlrpc_reassign_next_xid(req); +} +EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot); + +void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req) +{ + __u16 tag = lustre_msg_get_tag(req->rq_reqmsg); + + if (tag != 0) { + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + + obd_put_mod_rpc_slot(cli, opc, tag); + } +} +EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot); int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, __u32 version, int opcode, char **bufs, @@ -780,9 +863,9 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, fail2_t = &request->rq_bulk_deadline; } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) { time64_t now = ktime_get_real_seconds(); - spin_lock(&ptlrpc_last_xid_lock); - ptlrpc_last_xid = ((__u64)now >> 4) << 24; - spin_unlock(&ptlrpc_last_xid_lock); + u64 xid = ((u64)now >> 4) << 24; + + atomic64_set(&ptlrpc_last_xid, xid); } if (fail_t) { @@ -820,32 +903,7 @@ EXPORT_SYMBOL(ptlrpc_request_bufs_pack); int ptlrpc_request_pack(struct ptlrpc_request *request, __u32 version, int opcode) { - int rc; - - rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL); - if (rc) - return rc; - - /* - * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of - * ptlrpc_body sent from server equal to local ptlrpc_body size, so we - * have to send old ptlrpc_body to keep interoprability with these - * clients. - * - * Only three kinds of server->client RPCs so far: - * - LDLM_BL_CALLBACK - * - LDLM_CP_CALLBACK - * - LDLM_GL_CALLBACK - * - * XXX This should be removed whenever we drop the interoprability with - * the these old clients. - */ - if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK || - opcode == LDLM_GL_CALLBACK) - req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY, - sizeof(struct ptlrpc_body_v2), RCL_CLIENT); - - return rc; + return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL); } EXPORT_SYMBOL(ptlrpc_request_pack); @@ -895,7 +953,6 @@ ptlrpc_request_alloc_internal(struct obd_import *imp, const struct req_format *format) { struct ptlrpc_request *request; - int connect = 0; request = __ptlrpc_request_alloc(imp, pool); if (!request) @@ -916,17 +973,17 @@ ptlrpc_request_alloc_internal(struct obd_import *imp, if (imp->imp_state == LUSTRE_IMP_IDLE) { imp->imp_generation++; imp->imp_initiated_at = imp->imp_generation; - imp->imp_state = LUSTRE_IMP_NEW; - connect = 1; - } - spin_unlock(&imp->imp_lock); - if (connect) { - rc = ptlrpc_connect_import(imp); + imp->imp_state = LUSTRE_IMP_NEW; + + /* connect_import_locked releases imp_lock */ + rc = ptlrpc_connect_import_locked(imp); if (rc < 0) { ptlrpc_request_free(request); return NULL; } ptlrpc_pinger_add_import(imp); + } else { + spin_unlock(&imp->imp_lock); } } @@ -1204,7 +1261,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, if (req->rq_ctx_init || req->rq_ctx_fini) { /* always allow ctx init/fini rpc go through */ } else if (imp->imp_state == LUSTRE_IMP_NEW) { - DEBUG_REQ(D_ERROR, req, "Uninitialized import."); + DEBUG_REQ(D_ERROR, req, "Uninitialized import"); *status = -EIO; } else if (imp->imp_state == LUSTRE_IMP_CLOSED) { unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg); @@ -1214,11 +1271,11 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, * race with umount */ DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ? - D_HA : D_ERROR, req, "IMP_CLOSED "); + D_HA : D_ERROR, req, "IMP_CLOSED"); *status = -EIO; } else if (ptlrpc_send_limit_expired(req)) { /* probably doesn't need to be a D_ERROR afterinitial testing */ - DEBUG_REQ(D_HA, req, "send limit expired "); + DEBUG_REQ(D_HA, req, "send limit expired"); *status = -ETIMEDOUT; } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING && imp->imp_state == LUSTRE_IMP_CONNECTING) { @@ -1248,7 +1305,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS || imp->imp_state == LUSTRE_IMP_REPLAY_WAIT || imp->imp_state == LUSTRE_IMP_RECOVER)) { - DEBUG_REQ(D_HA, req, "allow during recovery.\n"); + DEBUG_REQ(D_HA, req, "allow during recovery"); } else { delay = 1; } @@ -1305,32 +1362,28 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err) */ static int ptlrpc_check_status(struct ptlrpc_request *req) { - int err; + int rc; ENTRY; - err = lustre_msg_get_status(req->rq_repmsg); + rc = lustre_msg_get_status(req->rq_repmsg); if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { struct obd_import *imp = req->rq_import; lnet_nid_t nid = imp->imp_connection->c_peer.nid; __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); - if (ptlrpc_console_allow(req, opc, err)) + if (ptlrpc_console_allow(req, opc, rc)) LCONSOLE_ERROR_MSG(0x11, "%s: operation %s to node %s failed: rc = %d\n", imp->imp_obd->obd_name, ll_opcode2str(opc), - libcfs_nid2str(nid), err); - RETURN(err < 0 ? err : -EINVAL); + libcfs_nid2str(nid), rc); + RETURN(rc < 0 ? rc : -EINVAL); } - if (err < 0) { - DEBUG_REQ(D_INFO, req, "status is %d", err); - } else if (err > 0) { - /* XXX: translate this error from net to host */ - DEBUG_REQ(D_INFO, req, "status is %d", err); - } + if (rc) + DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc); - RETURN(err); + RETURN(rc); } /** @@ -1398,7 +1451,7 @@ static int after_reply(struct ptlrpc_request *req) if (req->rq_reply_truncated) { if (ptlrpc_no_resend(req)) { DEBUG_REQ(D_ERROR, req, - "reply buffer overflow, expected: %d, actual size: %d", + "reply buffer overflow, expected=%d, actual size=%d", req->rq_nob_received, req->rq_repbuf_len); RETURN(-EOVERFLOW); } @@ -1427,7 +1480,7 @@ static int after_reply(struct ptlrpc_request *req) */ rc = sptlrpc_cli_unwrap_reply(req); if (rc) { - DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc); + DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc); RETURN(rc); } @@ -1446,7 +1499,8 @@ static int after_reply(struct ptlrpc_request *req) ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) { time64_t now = ktime_get_real_seconds(); - DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS"); + DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) | + D_RPCTRACE, req, "resending request on EINPROGRESS"); spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); @@ -1674,11 +1728,12 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) } CDEBUG(D_RPCTRACE, - "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n", - current_comm(), + "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n", + req, current_comm(), imp->imp_obd->obd_uuid.uuid, lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, - obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg)); + obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg), + lustre_msg_get_jobid(req->rq_reqmsg) ?: ""); rc = ptl_send_rpc(req, 0); if (rc == -ENOMEM) { @@ -1692,7 +1747,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) RETURN(rc); } if (rc) { - DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc); + DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d", + rc); spin_lock(&req->rq_lock); req->rq_net_err = 1; spin_unlock(&req->rq_lock); @@ -1928,9 +1984,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) * put on delay list - only if we wait * recovery finished - before send */ - list_del_init(&req->rq_list); - list_add_tail(&req->rq_list, - &imp->imp_delayed_list); + list_move_tail(&req->rq_list, + &imp->imp_delayed_list); spin_unlock(&imp->imp_lock); continue; } @@ -1954,9 +2009,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) GOTO(interpret, req->rq_status); } - list_del_init(&req->rq_list); - list_add_tail(&req->rq_list, - &imp->imp_sending_list); + list_move_tail(&req->rq_list, + &imp->imp_sending_list); spin_unlock(&imp->imp_lock); @@ -2125,13 +2179,14 @@ interpret: if (req->rq_reqmsg) CDEBUG(D_RPCTRACE, - "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n", - current_comm(), + "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n", + req, current_comm(), imp->imp_obd->obd_uuid.uuid, lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, obd_import_nid2str(imp), - lustre_msg_get_opc(req->rq_reqmsg)); + lustre_msg_get_opc(req->rq_reqmsg), + lustre_msg_get_jobid(req->rq_reqmsg) ?: ""); spin_lock(&imp->imp_lock); /* @@ -2408,7 +2463,6 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) struct list_head *tmp; struct ptlrpc_request *req; struct l_wait_info lwi; - struct lu_env _env; time64_t timeout; int rc; @@ -2426,20 +2480,6 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) if (list_empty(&set->set_requests)) RETURN(0); - /* - * ideally we want env provide by the caller all the time, - * but at the moment that would mean a massive change in - * LDLM while benefits would be close to zero, so just - * initialize env here for those rare cases - */ - if (!env) { - /* XXX: skip on the client side? */ - rc = lu_env_init(&_env, LCT_DT_THREAD); - if (rc) - RETURN(rc); - env = &_env; - } - do { timeout = ptlrpc_set_next_timeout(set); @@ -2472,7 +2512,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) ptlrpc_expired_set, set); rc = l_wait_event(set->set_waitq, - ptlrpc_check_set(env, set), &lwi); + ptlrpc_check_set(NULL, set), &lwi); /* * LU-769 - if we ignored the signal because it was already @@ -2529,9 +2569,6 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) rc = req->rq_status; } - if (env && env == &_env) - lu_env_fini(&_env); - RETURN(rc); } EXPORT_SYMBOL(ptlrpc_set_wait); @@ -3075,7 +3112,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, if (!ptlrpc_client_replied(req) || (req->rq_bulk && lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) { - DEBUG_REQ(D_ERROR, req, "request replay timed out.\n"); + DEBUG_REQ(D_ERROR, req, "request replay timed out"); GOTO(out, rc = -ETIMEDOUT); } @@ -3087,7 +3124,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, /** VBR: check version failure */ if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) { /** replay was failed due to version mismatch */ - DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n"); + DEBUG_REQ(D_WARNING, req, "Version mismatch during replay"); spin_lock(&imp->imp_lock); imp->imp_vbr_failed = 1; spin_unlock(&imp->imp_lock); @@ -3110,13 +3147,13 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, /* transaction number shouldn't be bigger than the latest replayed */ if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) { DEBUG_REQ(D_ERROR, req, - "Reported transno %llu is bigger than the replayed one: %llu", + "Reported transno=%llu is bigger than replayed=%llu", req->rq_transno, lustre_msg_get_transno(req->rq_reqmsg)); GOTO(out, rc = -EINVAL); } - DEBUG_REQ(D_HA, req, "got rep"); + DEBUG_REQ(D_HA, req, "got reply"); /* let the callback do fixups, possibly including in the request */ if (req->rq_replay_cb) @@ -3209,8 +3246,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); memset(aa, 0, sizeof(*aa)); /* Prepare request to be resent with ptlrpcd */ @@ -3246,14 +3282,14 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) void ptlrpc_abort_inflight(struct obd_import *imp) { struct list_head *tmp, *n; - ENTRY; + /* * Make sure that no new requests get processed for this import. * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing * this flag and then putting requests on sending_list or delayed_list. */ - spin_lock(&imp->imp_lock); + assert_spin_locked(&imp->imp_lock); /* * XXX locking? Maybe we should remove each request with the list @@ -3298,8 +3334,6 @@ void ptlrpc_abort_inflight(struct obd_import *imp) if (imp->imp_replayable) ptlrpc_free_committed(imp); - spin_unlock(&imp->imp_lock); - EXIT; } @@ -3349,19 +3383,21 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set) void ptlrpc_init_xid(void) { time64_t now = ktime_get_real_seconds(); + u64 xid; - spin_lock_init(&ptlrpc_last_xid_lock); if (now < YEAR_2004) { - cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid)); - ptlrpc_last_xid >>= 2; - ptlrpc_last_xid |= (1ULL << 61); + get_random_bytes(&xid, sizeof(xid)); + xid >>= 2; + xid |= (1ULL << 61); } else { - ptlrpc_last_xid = (__u64)now << 20; + xid = (u64)now << 20; } /* Need to always be aligned to a power-of-two for mutli-bulk BRW */ - CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0); - ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK; + BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) != + 0); + xid &= PTLRPC_BULK_OPS_MASK; + atomic64_set(&ptlrpc_last_xid, xid); } /** @@ -3378,14 +3414,7 @@ void ptlrpc_init_xid(void) */ __u64 ptlrpc_next_xid(void) { - __u64 next; - - spin_lock(&ptlrpc_last_xid_lock); - next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; - ptlrpc_last_xid = next; - spin_unlock(&ptlrpc_last_xid_lock); - - return next; + return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid); } /** @@ -3474,19 +3503,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) */ __u64 ptlrpc_sample_next_xid(void) { -#if BITS_PER_LONG == 32 - /* need to avoid possible word tearing on 32-bit systems */ - __u64 next; - - spin_lock(&ptlrpc_last_xid_lock); - next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; - spin_unlock(&ptlrpc_last_xid_lock); - - return next; -#else - /* No need to lock, since returned value is racy anyways */ - return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; -#endif + return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT; } EXPORT_SYMBOL(ptlrpc_sample_next_xid); @@ -3585,8 +3602,7 @@ void *ptlrpcd_alloc_work(struct obd_import *imp, req->rq_no_delay = req->rq_no_resend = 1; req->rq_pill.rc_fmt = (void *)&worker_format; - CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args)); - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->cb = cb; args->cbdata = cbdata;