*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*/
#define DEBUG_SUBSYSTEM S_RPC
#include <lustre_sec.h>
#include "ptlrpc_internal.h"
-struct lnet_eq *ptlrpc_eq;
+lnet_handler_t ptlrpc_handler;
+struct percpu_ref ptlrpc_pending;
/*
* Client's outgoing request callback
*/
void request_out_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
struct ptlrpc_request *req = cbid->cbid_arg;
bool wakeup = false;
ENTRY;
LASSERT(ev->type == LNET_EVENT_SEND || ev->type == LNET_EVENT_UNLINK);
LASSERT(ev->unlinked);
+ if (unlikely(lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val &&
+ CFS_FAIL_CHECK_RESET(OBD_FAIL_NET_ERROR_RPC,
+ OBD_FAIL_OSP_PRECREATE_PAUSE |
+ CFS_FAIL_ONCE)))
+ ev->status = -ECONNABORTED;
+
DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);
+ /* Do not update imp_next_ping for connection request */
+ if (lustre_msg_get_opc(req->rq_reqmsg) !=
+ req->rq_import->imp_connect_op)
+ ptlrpc_pinger_sending_on_import(req->rq_import);
+
sptlrpc_request_out_callback(req);
spin_lock(&req->rq_lock);
*/
void reply_in_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
- struct ptlrpc_request *req = cbid->cbid_arg;
- ENTRY;
+ struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
+ struct ptlrpc_request *req = cbid->cbid_arg;
+ ENTRY;
- DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);
+ DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);
- LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
- LASSERT (ev->md.start == req->rq_repbuf);
- LASSERT (ev->offset + ev->mlength <= req->rq_repbuf_len);
- /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
- for adaptive timeouts' early reply. */
- LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);
+ LASSERT(ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
+ LASSERT(ev->md_start == req->rq_repbuf);
+ LASSERT(ev->offset + ev->mlength <= req->rq_repbuf_len);
+ /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
+ * for adaptive timeouts' early reply.
+ */
+ LASSERT((ev->md_options & LNET_MD_MANAGE_REMOTE) != 0);
spin_lock(&req->rq_lock);
*/
void client_bulk_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
- struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
- struct ptlrpc_request *req;
- ENTRY;
+ struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
+ struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
+ struct ptlrpc_request *req;
+ ENTRY;
LASSERT((ptlrpc_is_bulk_put_sink(desc->bd_type) &&
ev->type == LNET_EVENT_PUT) ||
ev->type == LNET_EVENT_UNLINK);
LASSERT(ev->unlinked);
- if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
- ev->status = -EIO;
+ if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
+ ev->status = -EIO;
- if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,CFS_FAIL_ONCE))
- ev->status = -EIO;
+ if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,CFS_FAIL_ONCE))
+ ev->status = -EIO;
- CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
- "event type %d, status %d, desc %p\n",
- ev->type, ev->status, desc);
+ CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+ "event type %d, status %d, desc %p\n",
+ ev->type, ev->status, desc);
spin_lock(&desc->bd_lock);
req = desc->bd_req;
- LASSERT(desc->bd_md_count > 0);
- desc->bd_md_count--;
+ LASSERT(desc->bd_refs > 0);
+ desc->bd_refs--;
if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
desc->bd_nob_transferred += ev->mlength;
spin_lock(&req->rq_lock);
req->rq_net_err = 1;
spin_unlock(&req->rq_lock);
+ desc->bd_failure = 1;
}
- if (ev->status != 0)
- desc->bd_failure = 1;
/* NB don't unlock till after wakeup; desc can disappear under us
* otherwise */
- if (desc->bd_md_count == 0)
+ if (desc->bd_refs == 0)
ptlrpc_client_wake_req(desc->bd_req);
spin_unlock(&desc->bd_lock);
*/
void request_in_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
- struct ptlrpc_service *service = svcpt->scp_service;
- struct ptlrpc_request *req;
- ENTRY;
+ struct ptlrpc_service *service = svcpt->scp_service;
+ struct ptlrpc_request *req;
+ ENTRY;
- LASSERT (ev->type == LNET_EVENT_PUT ||
- ev->type == LNET_EVENT_UNLINK);
- LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
- LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
- rqbd->rqbd_buffer + service->srv_buf_size);
+ LASSERT(ev->type == LNET_EVENT_PUT ||
+ ev->type == LNET_EVENT_UNLINK);
+ LASSERT((char *)ev->md_start >= rqbd->rqbd_buffer);
+ LASSERT((char *)ev->md_start + ev->offset + ev->mlength <=
+ rqbd->rqbd_buffer + service->srv_buf_size);
- CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
- "event type %d, status %d, service %s\n",
- ev->type, ev->status, service->srv_name);
+ CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+ "event type %d, status %d, service %s\n",
+ ev->type, ev->status, service->srv_name);
- if (ev->unlinked) {
- /* If this is the last request message to fit in the
- * request buffer we can use the request object embedded in
- * rqbd. Note that if we failed to allocate a request,
- * we'd have to re-post the rqbd, which we can't do in this
- * context. */
- req = &rqbd->rqbd_req;
- memset(req, 0, sizeof (*req));
- } else {
- LASSERT (ev->type == LNET_EVENT_PUT);
- if (ev->status != 0) {
- /* We moaned above already... */
- return;
- }
+ if (ev->unlinked) {
+ /* If this is the last request message to fit in the
+ * request buffer we can use the request object embedded in
+ * rqbd. Note that if we failed to allocate a request,
+ * we'd have to re-post the rqbd, which we can't do in this
+ * context.
+ */
+ req = &rqbd->rqbd_req;
+ memset(req, 0, sizeof(*req));
+ } else {
+ LASSERT(ev->type == LNET_EVENT_PUT);
+ if (ev->status != 0) /* We moaned above already... */
+ return;
req = ptlrpc_request_cache_alloc(GFP_ATOMIC);
if (req == NULL) {
CERROR("Can't allocate incoming request descriptor: "
* flags are reset and scalars are zero. We only set the message
* size to non-zero if this was a successful receive. */
req->rq_xid = ev->match_bits;
- req->rq_reqbuf = ev->md.start + ev->offset;
+ req->rq_reqbuf = ev->md_start + ev->offset;
if (ev->type == LNET_EVENT_PUT && ev->status == 0)
req->rq_reqdata_len = ev->mlength;
ktime_get_real_ts64(&req->rq_arrival_time);
*/
void reply_out_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
struct ptlrpc_reply_state *rs = cbid->cbid_arg;
struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
ENTRY;
*/
void server_bulk_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
ENTRY;
(ptlrpc_is_bulk_get_sink(desc->bd_type) &&
ev->type == LNET_EVENT_REPLY));
- CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
- "event type %d, status %d, desc %p\n",
- ev->type, ev->status, desc);
+ CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+ "event type %d, status %d, desc %p\n",
+ ev->type, ev->status, desc);
spin_lock(&desc->bd_lock);
- LASSERT(desc->bd_md_count > 0);
+ LASSERT(desc->bd_refs > 0);
if ((ev->type == LNET_EVENT_ACK ||
ev->type == LNET_EVENT_REPLY) &&
desc->bd_failure = 1;
if (ev->unlinked) {
- desc->bd_md_count--;
+ desc->bd_refs--;
/* This is the last callback no matter what... */
- if (desc->bd_md_count == 0)
+ if (desc->bd_refs == 0)
wake_up(&desc->bd_waitq);
}
static void ptlrpc_master_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
void (*callback)(struct lnet_event *ev) = cbid->cbid_fn;
- /* Honestly, it's best to find out early. */
- LASSERT (cbid->cbid_arg != LP_POISON);
- LASSERT (callback == request_out_callback ||
- callback == reply_in_callback ||
- callback == client_bulk_callback ||
- callback == request_in_callback ||
- callback == reply_out_callback
+ /* Honestly, it's best to find out early. */
+ LASSERT(cbid->cbid_arg != LP_POISON);
+ LASSERT(callback == request_out_callback ||
+ callback == reply_in_callback ||
+ callback == client_bulk_callback ||
+ callback == request_in_callback ||
+ callback == reply_out_callback
#ifdef HAVE_SERVER_SUPPORT
- || callback == server_bulk_callback
+ || callback == server_bulk_callback
#endif
- );
+ );
- callback (ev);
+ callback(ev);
+ if (ev->unlinked)
+ percpu_ref_put(&ptlrpc_pending);
}
int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
struct lnet_process_id *peer, lnet_nid_t *self)
{
- int best_dist = 0;
- __u32 best_order = 0;
- int count = 0;
- int rc = -ENOENT;
- int dist;
- __u32 order;
- lnet_nid_t dst_nid;
- lnet_nid_t src_nid;
+ int best_dist = 0;
+ __u32 best_order = 0;
+ int count = 0;
+ int rc = -ENOENT;
+ int dist;
+ __u32 order;
+ lnet_nid_t dst_nid;
+ lnet_nid_t src_nid;
peer->pid = LNET_PID_LUSTRE;
continue;
if (dist == 0) { /* local! use loopback LND */
- peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
+ peer->nid = *self = LNET_NID_LO_0;
rc = 0;
break;
}
return rc;
}
-void ptlrpc_ni_fini(void)
+static struct completion ptlrpc_done;
+
+static void ptlrpc_release(struct percpu_ref *ref)
{
- int rc;
- int retries;
+ complete(&ptlrpc_done);
+}
+static void ptlrpc_ni_fini(void)
+{
/* Wait for the event queue to become idle since there may still be
* messages in flight with pending events (i.e. the fire-and-forget
* messages == client requests and "non-difficult" server
* replies */
- for (retries = 0;; retries++) {
- rc = LNetEQFree(ptlrpc_eq);
- switch (rc) {
- default:
- LBUG();
+ init_completion(&ptlrpc_done);
+ percpu_ref_kill(&ptlrpc_pending);
+ wait_for_completion(&ptlrpc_done);
- case 0:
- LNetNIFini();
- return;
-
- case -EBUSY:
- if (retries != 0)
- CWARN("Event queue still busy\n");
-
- /* Wait for a bit */
- ssleep(2);
- break;
- }
- }
- /* notreached */
+ lnet_assert_handler_unused(ptlrpc_handler);
+ LNetNIFini();
}
lnet_pid_t ptl_get_pid(void)
int ptlrpc_ni_init(void)
{
- int rc;
- lnet_pid_t pid;
+ int rc;
+ lnet_pid_t pid;
- pid = ptl_get_pid();
- CDEBUG(D_NET, "My pid is: %x\n", pid);
+ pid = ptl_get_pid();
+ CDEBUG(D_NET, "My pid is: %x\n", pid);
- /* We're not passing any limits yet... */
- rc = LNetNIInit(pid);
- if (rc < 0) {
- CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
+ /* We're not passing any limits yet... */
+ rc = LNetNIInit(pid);
+ if (rc < 0) {
+ CDEBUG(D_NET, "ptlrpc: Can't init network interface: rc = %d\n",
+ rc);
return rc;
- }
+ }
- /* CAVEAT EMPTOR: how we process portals events is _radically_
- * different depending on... */
+ rc = percpu_ref_init(&ptlrpc_pending, ptlrpc_release, 0, GFP_KERNEL);
+ if (rc) {
+ CERROR("ptlrpc: Can't init percpu refcount: rc = %d\n", rc);
+ return rc;
+ }
+ /* CAVEAT EMPTOR: how we process portals events is _radically_
+ * different depending on...
+ */
/* kernel LNet calls our master callback when there are new event,
* because we are guaranteed to get every event via callback,
 * so we just set EQ size to 0 to avoid overhead of serializing
* enqueue/dequeue operations in LNet. */
- ptlrpc_eq = LNetEQAlloc(0, ptlrpc_master_callback);
- if (!IS_ERR(ptlrpc_eq))
- return 0;
-
- rc = PTR_ERR(ptlrpc_eq);
- CERROR("Failed to allocate event queue: %d\n", rc);
- LNetNIFini();
-
- return rc;
+ ptlrpc_handler = ptlrpc_master_callback;
+ return 0;
}
int ptlrpc_init_portals(void)