#define DEBUG_SUBSYSTEM S_RPC
#include <linux/delay.h>
+#include <linux/random.h>
+
+#include <lnet/lib-lnet.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include "ptlrpc_internal.h"
+static void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
+ struct page *page, int pageoffset,
+ int len)
+{
+ __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1);
+}
+
+static void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
+ struct page *page, int pageoffset,
+ int len)
+{
+ __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
+}
+
+static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
+{
+ int i;
+
+ for (i = 0; i < desc->bd_iov_count ; i++)
+ put_page(BD_GET_KIOV(desc, i).kiov_page);
+}
+
+static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
+ void *frag, int len)
+{
+ unsigned int offset = (uintptr_t)frag & ~PAGE_MASK;
+
+ ENTRY;
+ while (len > 0) {
+ int page_len = min_t(unsigned int, PAGE_SIZE - offset,
+ len);
+ uintptr_t vaddr = (uintptr_t) frag;
+
+ ptlrpc_prep_bulk_page_nopin(desc,
+ lnet_kvaddr_to_page(vaddr),
+ offset, page_len);
+ offset = 0;
+ len -= page_len;
+ frag += page_len;
+ }
+
+ RETURN(desc->bd_nob);
+}
+
const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
.add_kiov_frag = ptlrpc_prep_bulk_page_pin,
.release_frags = ptlrpc_release_bulk_page_pin,
const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
.add_kiov_frag = ptlrpc_prep_bulk_page_nopin,
.release_frags = ptlrpc_release_bulk_noop,
+ .add_iov_frag = ptlrpc_prep_bulk_frag_pages,
};
EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
/**
* Initialize passed in client structure \a cl.
*/
-void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
+void ptlrpc_init_client(int req_portal, int rep_portal, const char *name,
struct ptlrpc_client *cl)
{
cl->cli_request_portal = req_portal;
desc->bd_portal = portal;
desc->bd_type = type;
desc->bd_md_count = 0;
- desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *)ops;
+ desc->bd_frag_ops = ops;
LASSERT(max_brw > 0);
desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
/*
if (!desc)
RETURN(NULL);
- desc->bd_import_generation = req->rq_import_generation;
desc->bd_import = class_import_get(imp);
desc->bd_req = req;
if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
+ DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d",
+ rc);
return -EPROTO;
}
}
rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
+ DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d",
+ rc);
return -EPROTO;
}
return 0;
req->rq_deadline = req->rq_sent + req->rq_timeout +
ptlrpc_at_get_net_latency(req);
+ /* The below message is checked in replay-single.sh test_65{a,b} */
+ /* The below message is checked in sanity-{gss,krb5} test_8 */
DEBUG_REQ(D_ADAPTTO, req,
"Early reply #%d, new deadline in %llds (%llds)",
req->rq_early_count,
"Trying to change pool size with nonempty pool from %d to %d bytes\n",
pool->prp_rq_size, size);
- spin_lock(&pool->prp_lock);
pool->prp_rq_size = size;
for (i = 0; i < num_rq; i++) {
struct ptlrpc_request *req;
struct lustre_msg *msg;
- spin_unlock(&pool->prp_lock);
req = ptlrpc_request_cache_alloc(GFP_NOFS);
if (!req)
return i;
req->rq_pool = pool;
spin_lock(&pool->prp_lock);
list_add_tail(&req->rq_list, &pool->prp_req_list);
+ spin_unlock(&pool->prp_lock);
}
- spin_unlock(&pool->prp_lock);
return num_rq;
}
EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
{
struct ptlrpc_request_pool *pool;
- OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
+ OBD_ALLOC_PTR(pool);
if (!pool)
return NULL;
spin_unlock(&req->rq_import->imp_lock);
}
-static __u64 ptlrpc_last_xid;
-static spinlock_t ptlrpc_last_xid_lock;
+static atomic64_t ptlrpc_last_xid;
+
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+ spin_lock(&req->rq_import->imp_lock);
+ list_del_init(&req->rq_unreplied_list);
+ ptlrpc_assign_next_xid_nolock(req);
+ spin_unlock(&req->rq_import->imp_lock);
+ DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ __u32 opc;
+ __u16 tag;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ tag = obd_get_mod_rpc_slot(cli, opc);
+ lustre_msg_set_tag(req->rq_reqmsg, tag);
+ ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ if (tag != 0) {
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+ obd_put_mod_rpc_slot(cli, opc, tag);
+ }
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode, char **bufs,
fail2_t = &request->rq_bulk_deadline;
} else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) {
time64_t now = ktime_get_real_seconds();
- spin_lock(&ptlrpc_last_xid_lock);
- ptlrpc_last_xid = ((__u64)now >> 4) << 24;
- spin_unlock(&ptlrpc_last_xid_lock);
+ u64 xid = ((u64)now >> 4) << 24;
+
+ atomic64_set(&ptlrpc_last_xid, xid);
}
if (fail_t) {
int ptlrpc_request_pack(struct ptlrpc_request *request,
__u32 version, int opcode)
{
- int rc;
-
- rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
- if (rc)
- return rc;
-
- /*
- * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
- * ptlrpc_body sent from server equal to local ptlrpc_body size, so we
- * have to send old ptlrpc_body to keep interoprability with these
- * clients.
- *
- * Only three kinds of server->client RPCs so far:
- * - LDLM_BL_CALLBACK
- * - LDLM_CP_CALLBACK
- * - LDLM_GL_CALLBACK
- *
- * XXX This should be removed whenever we drop the interoprability with
- * the these old clients.
- */
- if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
- opcode == LDLM_GL_CALLBACK)
- req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
- sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
-
- return rc;
+ return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
}
EXPORT_SYMBOL(ptlrpc_request_pack);
const struct req_format *format)
{
struct ptlrpc_request *request;
- int connect = 0;
request = __ptlrpc_request_alloc(imp, pool);
if (!request)
if (imp->imp_state == LUSTRE_IMP_IDLE) {
imp->imp_generation++;
imp->imp_initiated_at = imp->imp_generation;
- imp->imp_state = LUSTRE_IMP_NEW;
- connect = 1;
- }
- spin_unlock(&imp->imp_lock);
- if (connect) {
- rc = ptlrpc_connect_import(imp);
+ imp->imp_state = LUSTRE_IMP_NEW;
+
+ /* connect_import_locked releases imp_lock */
+ rc = ptlrpc_connect_import_locked(imp);
if (rc < 0) {
ptlrpc_request_free(request);
return NULL;
}
ptlrpc_pinger_add_import(imp);
+ } else {
+ spin_unlock(&imp->imp_lock);
}
}
if (req->rq_ctx_init || req->rq_ctx_fini) {
/* always allow ctx init/fini rpc go through */
} else if (imp->imp_state == LUSTRE_IMP_NEW) {
- DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
+ DEBUG_REQ(D_ERROR, req, "Uninitialized import");
*status = -EIO;
} else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg);
* race with umount
*/
DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ?
- D_HA : D_ERROR, req, "IMP_CLOSED ");
+ D_HA : D_ERROR, req, "IMP_CLOSED");
*status = -EIO;
} else if (ptlrpc_send_limit_expired(req)) {
/* probably doesn't need to be a D_ERROR afterinitial testing */
- DEBUG_REQ(D_HA, req, "send limit expired ");
+ DEBUG_REQ(D_HA, req, "send limit expired");
*status = -ETIMEDOUT;
} else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
imp->imp_state == LUSTRE_IMP_CONNECTING) {
imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
imp->imp_state == LUSTRE_IMP_RECOVER)) {
- DEBUG_REQ(D_HA, req, "allow during recovery.\n");
+ DEBUG_REQ(D_HA, req, "allow during recovery");
} else {
delay = 1;
}
*/
static int ptlrpc_check_status(struct ptlrpc_request *req)
{
- int err;
+ int rc;
ENTRY;
- err = lustre_msg_get_status(req->rq_repmsg);
+ rc = lustre_msg_get_status(req->rq_repmsg);
if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
struct obd_import *imp = req->rq_import;
lnet_nid_t nid = imp->imp_connection->c_peer.nid;
__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
- if (ptlrpc_console_allow(req, opc, err))
+ if (ptlrpc_console_allow(req, opc, rc))
LCONSOLE_ERROR_MSG(0x11,
"%s: operation %s to node %s failed: rc = %d\n",
imp->imp_obd->obd_name,
ll_opcode2str(opc),
- libcfs_nid2str(nid), err);
- RETURN(err < 0 ? err : -EINVAL);
+ libcfs_nid2str(nid), rc);
+ RETURN(rc < 0 ? rc : -EINVAL);
}
- if (err < 0) {
- DEBUG_REQ(D_INFO, req, "status is %d", err);
- } else if (err > 0) {
- /* XXX: translate this error from net to host */
- DEBUG_REQ(D_INFO, req, "status is %d", err);
- }
+ if (rc)
+ DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc);
- RETURN(err);
+ RETURN(rc);
}
/**
if (req->rq_reply_truncated) {
if (ptlrpc_no_resend(req)) {
DEBUG_REQ(D_ERROR, req,
- "reply buffer overflow, expected: %d, actual size: %d",
+ "reply buffer overflow, expected=%d, actual size=%d",
req->rq_nob_received, req->rq_repbuf_len);
RETURN(-EOVERFLOW);
}
*/
rc = sptlrpc_cli_unwrap_reply(req);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
+ DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc);
RETURN(rc);
}
ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
time64_t now = ktime_get_real_seconds();
- DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
+ DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) |
+ D_RPCTRACE, req, "resending request on EINPROGRESS");
spin_lock(&req->rq_lock);
req->rq_resend = 1;
spin_unlock(&req->rq_lock);
}
CDEBUG(D_RPCTRACE,
- "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
- current_comm(),
+ "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+ req, current_comm(),
imp->imp_obd->obd_uuid.uuid,
lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
- obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg));
+ obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg),
+ lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
rc = ptl_send_rpc(req, 0);
if (rc == -ENOMEM) {
RETURN(rc);
}
if (rc) {
- DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
+ DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d",
+ rc);
spin_lock(&req->rq_lock);
req->rq_net_err = 1;
spin_unlock(&req->rq_lock);
* put on delay list - only if we wait
* recovery finished - before send
*/
- list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list,
- &imp->imp_delayed_list);
+ list_move_tail(&req->rq_list,
+ &imp->imp_delayed_list);
spin_unlock(&imp->imp_lock);
continue;
}
GOTO(interpret, req->rq_status);
}
- list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list,
- &imp->imp_sending_list);
+ list_move_tail(&req->rq_list,
+ &imp->imp_sending_list);
spin_unlock(&imp->imp_lock);
if (req->rq_reqmsg)
CDEBUG(D_RPCTRACE,
- "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
- current_comm(),
+ "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+ req, current_comm(),
imp->imp_obd->obd_uuid.uuid,
lustre_msg_get_status(req->rq_reqmsg),
req->rq_xid,
obd_import_nid2str(imp),
- lustre_msg_get_opc(req->rq_reqmsg));
+ lustre_msg_get_opc(req->rq_reqmsg),
+ lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
spin_lock(&imp->imp_lock);
/*
set, timeout);
if ((timeout == 0 && !signal_pending(current)) ||
- set->set_allow_intr)
+ set->set_allow_intr) {
/*
* No requests are in-flight (ether timed out
* or delayed), so we can allow interrupts.
cfs_time_seconds(timeout ? timeout : 1),
ptlrpc_expired_set,
ptlrpc_interrupted_set, set);
- else
+
+ rc = l_wait_event(set->set_waitq,
+ ptlrpc_check_set(NULL, set), &lwi);
+ } else {
/*
* At least one request is in flight, so no
* interrupts are allowed. Wait until all
lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
ptlrpc_expired_set, set);
- rc = l_wait_event(set->set_waitq,
- ptlrpc_check_set(NULL, set), &lwi);
-
- /*
- * LU-769 - if we ignored the signal because it was already
- * pending when we started, we need to handle it now or we risk
- * it being ignored forever
- */
- if (rc == -ETIMEDOUT &&
- (!lwi.lwi_allow_intr || set->set_allow_intr) &&
- signal_pending(current)) {
- sigset_t blocked_sigs =
- cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+ rc = l_wait_event(set->set_waitq,
+ ptlrpc_check_set(NULL, set), &lwi);
/*
- * In fact we only interrupt for the "fatal" signals
- * like SIGINT or SIGKILL. We still ignore less
- * important signals since ptlrpc set is not easily
- * reentrant from userspace again
+ * LU-769 - if we ignored the signal because
+ * it was already pending when we started, we
+ * need to handle it now or we risk it being
+ * ignored forever
*/
- if (signal_pending(current))
- ptlrpc_interrupted_set(set);
- cfs_restore_sigs(blocked_sigs);
+ if (rc == -ETIMEDOUT &&
+ signal_pending(current)) {
+ sigset_t blocked_sigs =
+ cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+
+ /*
+ * In fact we only interrupt for the
+ * "fatal" signals like SIGINT or
+ * SIGKILL. We still ignore less
+ * important signals since ptlrpc set
+ * is not easily reentrant from
+ * userspace again
+ */
+ if (signal_pending(current))
+ ptlrpc_interrupted_set(set);
+ cfs_restore_sigs(blocked_sigs);
+ }
}
LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
*/
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
{
- int rc;
- struct l_wait_info lwi;
-
/*
* Might sleep.
*/
* unlinked before returning a req to the pool.
*/
for (;;) {
- /* The wq argument is ignored by user-space wait_event macros */
wait_queue_head_t *wq = (request->rq_set) ?
&request->rq_set->set_waitq :
&request->rq_reply_waitq;
+ int seconds = LONG_UNLINK;
/*
* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs
*/
- lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
- cfs_time_seconds(1), NULL, NULL);
- rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
- &lwi);
- if (rc == 0) {
+ while (seconds > 0 &&
+ wait_event_idle_timeout(
+ *wq,
+ !ptlrpc_client_recv_or_unlink(request),
+ cfs_time_seconds(1)) == 0)
+ seconds -= 1;
+ if (seconds > 0) {
ptlrpc_rqphase_move(request, request->rq_next_phase);
RETURN(1);
}
- LASSERT(rc == -ETIMEDOUT);
DEBUG_REQ(D_WARNING, request,
"Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
request->rq_receiving_reply,
if (!ptlrpc_client_replied(req) ||
(req->rq_bulk &&
lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) {
- DEBUG_REQ(D_ERROR, req, "request replay timed out.\n");
+ DEBUG_REQ(D_ERROR, req, "request replay timed out");
GOTO(out, rc = -ETIMEDOUT);
}
/** VBR: check version failure */
if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
/** replay was failed due to version mismatch */
- DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
+ DEBUG_REQ(D_WARNING, req, "Version mismatch during replay");
spin_lock(&imp->imp_lock);
imp->imp_vbr_failed = 1;
spin_unlock(&imp->imp_lock);
/* transaction number shouldn't be bigger than the latest replayed */
if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
DEBUG_REQ(D_ERROR, req,
- "Reported transno %llu is bigger than the replayed one: %llu",
+ "Reported transno=%llu is bigger than replayed=%llu",
req->rq_transno,
lustre_msg_get_transno(req->rq_reqmsg));
GOTO(out, rc = -EINVAL);
}
- DEBUG_REQ(D_HA, req, "got rep");
+ DEBUG_REQ(D_HA, req, "got reply");
/* let the callback do fixups, possibly including in the request */
if (req->rq_replay_cb)
LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
+ aa = ptlrpc_req_async_args(aa, req);
memset(aa, 0, sizeof(*aa));
/* Prepare request to be resent with ptlrpcd */
void ptlrpc_abort_inflight(struct obd_import *imp)
{
struct list_head *tmp, *n;
-
ENTRY;
+
/*
* Make sure that no new requests get processed for this import.
* ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
* this flag and then putting requests on sending_list or delayed_list.
*/
- spin_lock(&imp->imp_lock);
+ assert_spin_locked(&imp->imp_lock);
/*
* XXX locking? Maybe we should remove each request with the list
if (imp->imp_replayable)
ptlrpc_free_committed(imp);
- spin_unlock(&imp->imp_lock);
-
EXIT;
}
void ptlrpc_init_xid(void)
{
time64_t now = ktime_get_real_seconds();
+ u64 xid;
- spin_lock_init(&ptlrpc_last_xid_lock);
if (now < YEAR_2004) {
- cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
- ptlrpc_last_xid >>= 2;
- ptlrpc_last_xid |= (1ULL << 61);
+ get_random_bytes(&xid, sizeof(xid));
+ xid >>= 2;
+ xid |= (1ULL << 61);
} else {
- ptlrpc_last_xid = (__u64)now << 20;
+ xid = (u64)now << 20;
}
/* Need to always be aligned to a power-of-two for mutli-bulk BRW */
- CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
- ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
+ BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) !=
+ 0);
+ xid &= PTLRPC_BULK_OPS_MASK;
+ atomic64_set(&ptlrpc_last_xid, xid);
}
/**
*/
__u64 ptlrpc_next_xid(void)
{
- __u64 next;
-
- spin_lock(&ptlrpc_last_xid_lock);
- next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
- ptlrpc_last_xid = next;
- spin_unlock(&ptlrpc_last_xid_lock);
-
- return next;
+ return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid);
}
/**
*/
__u64 ptlrpc_sample_next_xid(void)
{
-#if BITS_PER_LONG == 32
- /* need to avoid possible word tearing on 32-bit systems */
- __u64 next;
-
- spin_lock(&ptlrpc_last_xid_lock);
- next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
- spin_unlock(&ptlrpc_last_xid_lock);
-
- return next;
-#else
- /* No need to lock, since returned value is racy anyways */
- return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-#endif
+ return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT;
}
EXPORT_SYMBOL(ptlrpc_sample_next_xid);
req->rq_no_delay = req->rq_no_resend = 1;
req->rq_pill.rc_fmt = (void *)&worker_format;
- CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
- args = ptlrpc_req_async_args(req);
+ args = ptlrpc_req_async_args(args, req);
args->cb = cb;
args->cbdata = cbdata;