LIBCFS_ALLOC_GFP(pbuf, LNET_PING_BUFFER_SIZE(nnis), gfp);
if (pbuf) {
pbuf->pb_nnis = nnis;
+ pbuf->pb_needs_post = false;
atomic_set(&pbuf->pb_refcnt, 1);
}
/* Resize the push target. */
int lnet_push_target_resize(void)
{
- struct lnet_process_id id = { LNET_NID_ANY, LNET_PID_ANY };
- struct lnet_md md = { NULL };
- struct lnet_me *me;
struct lnet_handle_md mdh;
struct lnet_handle_md old_mdh;
struct lnet_ping_buffer *pbuf;
again:
nnis = the_lnet.ln_push_target_nnis;
if (nnis <= 0) {
- rc = -EINVAL;
- goto fail_return;
+ CDEBUG(D_NET, "Invalid nnis %d\n", nnis);
+ return -EINVAL;
}
+ /* NB: lnet_ping_buffer_alloc() sets pbuf refcount to 1. That ref is
+ * dropped when we need to resize again (see "old_pbuf" below) or when
+ * LNet is shutdown (see lnet_push_target_fini())
+ */
pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
if (!pbuf) {
- rc = -ENOMEM;
- goto fail_return;
- }
-
- me = LNetMEAttach(LNET_RESERVED_PORTAL, id,
- LNET_PROTO_PING_MATCHBITS, 0,
- LNET_UNLINK, LNET_INS_AFTER);
-
- if (IS_ERR(me)) {
- rc = PTR_ERR(me);
- CERROR("Can't create push target ME: %d\n", rc);
- goto fail_decref_pbuf;
+ CDEBUG(D_NET, "Can't allocate pbuf for nnis %d\n", nnis);
+ return -ENOMEM;
}
- /* initialize md content */
- md.start = &pbuf->pb_info;
- md.length = LNET_PING_INFO_SIZE(nnis);
- md.threshold = LNET_MD_THRESH_INF;
- md.max_size = 0;
- md.options = LNET_MD_OP_PUT | LNET_MD_TRUNCATE |
- LNET_MD_MANAGE_REMOTE;
- md.user_ptr = pbuf;
- md.eq_handle = the_lnet.ln_push_target_eq;
-
- rc = LNetMDAttach(me, md, LNET_RETAIN, &mdh);
+ rc = lnet_push_target_post(pbuf, &mdh);
if (rc) {
- CERROR("Can't attach push MD: %d\n", rc);
- goto fail_unlink_me;
+ CDEBUG(D_NET, "Failed to post push target: %d\n", rc);
+ lnet_ping_buffer_decref(pbuf);
+ return rc;
}
- lnet_ping_buffer_addref(pbuf);
lnet_net_lock(LNET_LOCK_EX);
old_pbuf = the_lnet.ln_push_target;
if (old_pbuf) {
LNetMDUnlink(old_mdh);
+ /* Drop ref set by lnet_ping_buffer_alloc() */
lnet_ping_buffer_decref(old_pbuf);
}
+ /* Received another push or reply that requires a larger buffer */
if (nnis < the_lnet.ln_push_target_nnis)
goto again;
CDEBUG(D_NET, "nnis %d success\n", nnis);
-
return 0;
+}
-fail_unlink_me:
- LNetMEUnlink(me);
-fail_decref_pbuf:
- lnet_ping_buffer_decref(pbuf);
-fail_return:
- CDEBUG(D_NET, "nnis %d error %d\n", nnis, rc);
- return rc;
+int lnet_push_target_post(struct lnet_ping_buffer *pbuf,
+ struct lnet_handle_md *mdhp)
+{
+ struct lnet_process_id id = { LNET_NID_ANY, LNET_PID_ANY };
+ struct lnet_md md = { NULL };
+ struct lnet_me *me;
+ int rc;
+
+ me = LNetMEAttach(LNET_RESERVED_PORTAL, id,
+ LNET_PROTO_PING_MATCHBITS, 0,
+ LNET_UNLINK, LNET_INS_AFTER);
+ if (IS_ERR(me)) {
+ rc = PTR_ERR(me);
+ CERROR("Can't create push target ME: %d\n", rc);
+ return rc;
+ }
+
+ pbuf->pb_needs_post = false;
+
+ /* This reference is dropped by lnet_push_target_event_handler() */
+ lnet_ping_buffer_addref(pbuf);
+
+ /* initialize md content */
+ md.start = &pbuf->pb_info;
+ md.length = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
+ md.threshold = 1;
+ md.max_size = 0;
+ md.options = LNET_MD_OP_PUT | LNET_MD_TRUNCATE;
+ md.user_ptr = pbuf;
+ md.eq_handle = the_lnet.ln_push_target_eq;
+
+ rc = LNetMDAttach(me, md, LNET_UNLINK, mdhp);
+ if (rc) {
+ CERROR("Can't attach push MD: %d\n", rc);
+ LNetMEUnlink(me);
+ lnet_ping_buffer_decref(pbuf);
+ pbuf->pb_needs_post = true;
+ return rc;
+ }
+
+ CDEBUG(D_NET, "posted push target %p\n", pbuf);
+
+ return 0;
}
static void lnet_push_target_event_handler(struct lnet_event *ev)
{
struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
+ CDEBUG(D_NET, "type %d status %d unlinked %d\n", ev->type, ev->status,
+ ev->unlinked);
+
if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
lnet_swap_pinginfo(pbuf);
+ if (ev->type == LNET_EVENT_UNLINK) {
+ /* Drop ref added by lnet_push_target_post() */
+ lnet_ping_buffer_decref(pbuf);
+ return;
+ }
+
lnet_peer_push_event(ev);
if (ev->unlinked)
+ /* Drop ref added by lnet_push_target_post */
lnet_ping_buffer_decref(pbuf);
}
return rc;
}
+ rc = LNetSetLazyPortal(LNET_RESERVED_PORTAL);
+ LASSERT(rc == 0);
+
/* Start at the required minimum, we'll enlarge if required. */
the_lnet.ln_push_target_nnis = LNET_INTERFACES_MIN;
rc = lnet_push_target_resize();
if (rc) {
+ LNetClearLazyPortal(LNET_RESERVED_PORTAL);
LNetEQFree(the_lnet.ln_push_target_eq);
the_lnet.ln_push_target_eq = NULL;
}
schedule_timeout_uninterruptible(cfs_time_seconds(1));
}
+ /* Drop ref set by lnet_ping_buffer_alloc() */
lnet_ping_buffer_decref(the_lnet.ln_push_target);
the_lnet.ln_push_target = NULL;
the_lnet.ln_push_target_nnis = 0;
+ LNetClearLazyPortal(LNET_RESERVED_PORTAL);
LNetEQFree(the_lnet.ln_push_target_eq);
the_lnet.ln_push_target_eq = NULL;
}
*/
void lnet_peer_push_event(struct lnet_event *ev)
{
- struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
+ struct lnet_ping_buffer *pbuf;
struct lnet_peer *lp;
+ pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start + ev->offset);
+
/* lnet_find_peer() adds a refcount */
lp = lnet_find_peer(ev->source.nid);
if (!lp) {
CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
libcfs_nid2str(ev->initiator.nid),
libcfs_nid2str(ev->source.nid));
+ pbuf->pb_needs_post = true;
return;
}
* NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
* and tell discovery to allocate a bigger buffer.
*/
- if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
+ if (ev->mlength < ev->rlength) {
if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
LNET_PING_BUFFER_SEQNO(pbuf));
out:
+ /* We've processed this buffer. It can be reposted */
+ pbuf->pb_needs_post = true;
+
/*
* Queue the peer for discovery if not done, force it on the request
* queue and wake the discovery thread if the peer was already queued,
TASK_INTERRUPTIBLE);
if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
break;
- if (lnet_push_target_resize_needed())
+ if (lnet_push_target_resize_needed() ||
+ the_lnet.ln_push_target->pb_needs_post)
break;
if (!list_empty(&the_lnet.ln_dc_request))
break;
if (lnet_peer_discovery_wait_for_work())
break;
- lnet_resend_msgs();
-
if (lnet_push_target_resize_needed())
lnet_push_target_resize();
+ else if (the_lnet.ln_push_target->pb_needs_post)
+ lnet_push_target_post(the_lnet.ln_push_target,
+ &the_lnet.ln_push_target_md);
+
+ lnet_resend_msgs();
lnet_net_lock(LNET_LOCK_EX);
if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) {
lp->lp_last_queued = ktime_get_real_seconds();
lnet_net_unlock(LNET_LOCK_EX);
+ if (lnet_push_target_resize_needed())
+ lnet_push_target_resize();
+ else if (the_lnet.ln_push_target->pb_needs_post)
+ lnet_push_target_post(the_lnet.ln_push_target,
+ &the_lnet.ln_push_target_md);
+
/*
* Select an action depending on the state of
* the peer and whether discovery is disabled.