#include <linux/delay.h>
#include <linux/random.h>
+#include <lnet/lib-lnet.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
put_page(BD_GET_KIOV(desc, i).kiov_page);
}
+static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
+ void *frag, int len)
+{
+ unsigned int offset = (uintptr_t)frag & ~PAGE_MASK;
+
+ ENTRY;
+ while (len > 0) {
+ int page_len = min_t(unsigned int, PAGE_SIZE - offset,
+ len);
+ uintptr_t vaddr = (uintptr_t) frag;
+
+ ptlrpc_prep_bulk_page_nopin(desc,
+ lnet_kvaddr_to_page(vaddr),
+ offset, page_len);
+ offset = 0;
+ len -= page_len;
+ frag += page_len;
+ }
+
+ RETURN(desc->bd_nob);
+}
+
const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
.add_kiov_frag = ptlrpc_prep_bulk_page_pin,
.release_frags = ptlrpc_release_bulk_page_pin,
const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
.add_kiov_frag = ptlrpc_prep_bulk_page_nopin,
.release_frags = ptlrpc_release_bulk_noop,
+ .add_iov_frag = ptlrpc_prep_bulk_frag_pages,
};
EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
if (!desc)
RETURN(NULL);
- desc->bd_import_generation = req->rq_import_generation;
desc->bd_import = class_import_get(imp);
desc->bd_req = req;
if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
+ DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d",
+ rc);
return -EPROTO;
}
}
rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
+ DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d",
+ rc);
return -EPROTO;
}
return 0;
req->rq_deadline = req->rq_sent + req->rq_timeout +
ptlrpc_at_get_net_latency(req);
+ /* The below message is checked in replay-single.sh test_65{a,b} */
+ /* The below message is checked in sanity-{gss,krb5} test_8 */
DEBUG_REQ(D_ADAPTTO, req,
"Early reply #%d, new deadline in %llds (%llds)",
req->rq_early_count,
{
struct ptlrpc_request_pool *pool;
- OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
+ OBD_ALLOC_PTR(pool);
if (!pool)
return NULL;
static atomic64_t ptlrpc_last_xid;
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+ spin_lock(&req->rq_import->imp_lock);
+ list_del_init(&req->rq_unreplied_list);
+ ptlrpc_assign_next_xid_nolock(req);
+ spin_unlock(&req->rq_import->imp_lock);
+ DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ __u32 opc;
+ __u16 tag;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ tag = obd_get_mod_rpc_slot(cli, opc);
+ lustre_msg_set_tag(req->rq_reqmsg, tag);
+ ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ if (tag != 0) {
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+ obd_put_mod_rpc_slot(cli, opc, tag);
+ }
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
+
int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode, char **bufs,
struct ptlrpc_cli_ctx *ctx)
const struct req_format *format)
{
struct ptlrpc_request *request;
- int connect = 0;
request = __ptlrpc_request_alloc(imp, pool);
if (!request)
if (imp->imp_state == LUSTRE_IMP_IDLE) {
imp->imp_generation++;
imp->imp_initiated_at = imp->imp_generation;
- imp->imp_state = LUSTRE_IMP_NEW;
- connect = 1;
- }
- spin_unlock(&imp->imp_lock);
- if (connect) {
- rc = ptlrpc_connect_import(imp);
+ imp->imp_state = LUSTRE_IMP_NEW;
+
+ /* connect_import_locked releases imp_lock */
+ rc = ptlrpc_connect_import_locked(imp);
if (rc < 0) {
ptlrpc_request_free(request);
return NULL;
}
ptlrpc_pinger_add_import(imp);
+ } else {
+ spin_unlock(&imp->imp_lock);
}
}
if (req->rq_ctx_init || req->rq_ctx_fini) {
/* always allow ctx init/fini rpc go through */
} else if (imp->imp_state == LUSTRE_IMP_NEW) {
- DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
+ DEBUG_REQ(D_ERROR, req, "Uninitialized import");
*status = -EIO;
} else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg);
* race with umount
*/
DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ?
- D_HA : D_ERROR, req, "IMP_CLOSED ");
+ D_HA : D_ERROR, req, "IMP_CLOSED");
*status = -EIO;
} else if (ptlrpc_send_limit_expired(req)) {
/* probably doesn't need to be a D_ERROR afterinitial testing */
- DEBUG_REQ(D_HA, req, "send limit expired ");
+ DEBUG_REQ(D_HA, req, "send limit expired");
*status = -ETIMEDOUT;
} else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
imp->imp_state == LUSTRE_IMP_CONNECTING) {
imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
imp->imp_state == LUSTRE_IMP_RECOVER)) {
- DEBUG_REQ(D_HA, req, "allow during recovery.\n");
+ DEBUG_REQ(D_HA, req, "allow during recovery");
} else {
delay = 1;
}
*/
static int ptlrpc_check_status(struct ptlrpc_request *req)
{
- int err;
+ int rc;
ENTRY;
- err = lustre_msg_get_status(req->rq_repmsg);
+ rc = lustre_msg_get_status(req->rq_repmsg);
if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
struct obd_import *imp = req->rq_import;
lnet_nid_t nid = imp->imp_connection->c_peer.nid;
__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
- if (ptlrpc_console_allow(req, opc, err))
+ if (ptlrpc_console_allow(req, opc, rc))
LCONSOLE_ERROR_MSG(0x11,
"%s: operation %s to node %s failed: rc = %d\n",
imp->imp_obd->obd_name,
ll_opcode2str(opc),
- libcfs_nid2str(nid), err);
- RETURN(err < 0 ? err : -EINVAL);
+ libcfs_nid2str(nid), rc);
+ RETURN(rc < 0 ? rc : -EINVAL);
}
- if (err < 0) {
- DEBUG_REQ(D_INFO, req, "status is %d", err);
- } else if (err > 0) {
- /* XXX: translate this error from net to host */
- DEBUG_REQ(D_INFO, req, "status is %d", err);
- }
+ if (rc)
+ DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc);
- RETURN(err);
+ RETURN(rc);
}
/**
if (req->rq_reply_truncated) {
if (ptlrpc_no_resend(req)) {
DEBUG_REQ(D_ERROR, req,
- "reply buffer overflow, expected: %d, actual size: %d",
+ "reply buffer overflow, expected=%d, actual size=%d",
req->rq_nob_received, req->rq_repbuf_len);
RETURN(-EOVERFLOW);
}
*/
rc = sptlrpc_cli_unwrap_reply(req);
if (rc) {
- DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
+ DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc);
RETURN(rc);
}
ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
time64_t now = ktime_get_real_seconds();
- DEBUG_REQ(req->rq_nr_resend > 0 ? D_ERROR : D_RPCTRACE, req,
- "Resending request on EINPROGRESS");
+ DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) |
+ D_RPCTRACE, req, "resending request on EINPROGRESS");
spin_lock(&req->rq_lock);
req->rq_resend = 1;
spin_unlock(&req->rq_lock);
RETURN(rc);
}
if (rc) {
- DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
+ DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d",
+ rc);
spin_lock(&req->rq_lock);
req->rq_net_err = 1;
spin_unlock(&req->rq_lock);
if (!ptlrpc_client_replied(req) ||
(req->rq_bulk &&
lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) {
- DEBUG_REQ(D_ERROR, req, "request replay timed out.\n");
+ DEBUG_REQ(D_ERROR, req, "request replay timed out");
GOTO(out, rc = -ETIMEDOUT);
}
/** VBR: check version failure */
if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
/** replay was failed due to version mismatch */
- DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
+ DEBUG_REQ(D_WARNING, req, "Version mismatch during replay");
spin_lock(&imp->imp_lock);
imp->imp_vbr_failed = 1;
spin_unlock(&imp->imp_lock);
/* transaction number shouldn't be bigger than the latest replayed */
if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
DEBUG_REQ(D_ERROR, req,
- "Reported transno %llu is bigger than the replayed one: %llu",
+ "Reported transno=%llu is bigger than replayed=%llu",
req->rq_transno,
lustre_msg_get_transno(req->rq_reqmsg));
GOTO(out, rc = -EINVAL);
}
- DEBUG_REQ(D_HA, req, "got rep");
+ DEBUG_REQ(D_HA, req, "got reply");
/* let the callback do fixups, possibly including in the request */
if (req->rq_replay_cb)
}
/* Need to always be aligned to a power-of-two for mutli-bulk BRW */
- CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
+ BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) !=
+ 0);
xid &= PTLRPC_BULK_OPS_MASK;
atomic64_set(&ptlrpc_last_xid, xid);
}