-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
CFS_MODULE_PARM(local_nid_dist_zero, "i", int, 0444,
"Reserved");
-/* forward ref */
-static void lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg);
-
-#define LNET_MATCHMD_NONE 0 /* Didn't match */
-#define LNET_MATCHMD_OK 1 /* Matched OK */
-#define LNET_MATCHMD_DROP 2 /* Must be discarded */
-
-static int
-lnet_try_match_md (int index, int op_mask, lnet_process_id_t src,
- unsigned int rlength, unsigned int roffset,
- __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg,
- unsigned int *mlength_out, unsigned int *offset_out)
-{
- /* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
- * lnet_match_blocked_msg() relies on this to avoid races */
- unsigned int offset;
- unsigned int mlength;
- lnet_me_t *me = md->md_me;
-
- /* mismatched MD op */
- if ((md->md_options & op_mask) == 0)
- return LNET_MATCHMD_NONE;
-
- /* MD exhausted */
- if (lnet_md_exhausted(md))
- return LNET_MATCHMD_NONE;
-
- /* mismatched ME nid/pid? */
- if (me->me_match_id.nid != LNET_NID_ANY &&
- me->me_match_id.nid != src.nid)
- return LNET_MATCHMD_NONE;
-
- if (me->me_match_id.pid != LNET_PID_ANY &&
- me->me_match_id.pid != src.pid)
- return LNET_MATCHMD_NONE;
-
- /* mismatched ME matchbits? */
- if (((me->me_match_bits ^ match_bits) & ~me->me_ignore_bits) != 0)
- return LNET_MATCHMD_NONE;
-
- /* Hurrah! This _is_ a match; check it out... */
-
- if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
- offset = md->md_offset;
- else
- offset = roffset;
-
- if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
- mlength = md->md_max_size;
- LASSERT (md->md_offset + mlength <= md->md_length);
- } else {
- mlength = md->md_length - offset;
- }
-
- if (rlength <= mlength) { /* fits in allowed space */
- mlength = rlength;
- } else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
- /* this packet _really_ is too big */
- CERROR("Matching packet from %s, match "LPU64
- " length %d too big: %d left, %d allowed\n",
- libcfs_id2str(src), match_bits, rlength,
- md->md_length - offset, mlength);
-
- return LNET_MATCHMD_DROP;
- }
-
- /* Commit to this ME/MD */
- CDEBUG(D_NET, "Incoming %s index %x from %s of "
- "length %d/%d into md "LPX64" [%d] + %d\n",
- (op_mask == LNET_MD_OP_PUT) ? "put" : "get",
- index, libcfs_id2str(src), mlength, rlength,
- md->md_lh.lh_cookie, md->md_niov, offset);
-
- lnet_commit_md(md, msg);
- md->md_offset = offset + mlength;
-
- /* NB Caller will set ev.type and ev.hdr_data */
- msg->msg_ev.initiator = src;
- msg->msg_ev.pt_index = index;
- msg->msg_ev.match_bits = match_bits;
- msg->msg_ev.rlength = rlength;
- msg->msg_ev.mlength = mlength;
- msg->msg_ev.offset = offset;
-
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
- *offset_out = offset;
- *mlength_out = mlength;
-
- /* Auto-unlink NOW, so the ME gets unlinked if required.
- * We bumped md->md_refcount above so the MD just gets flagged
- * for unlink when it is finalized. */
- if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
- lnet_md_exhausted(md)) {
- lnet_md_unlink(md);
- }
-
- return LNET_MATCHMD_OK;
-}
-
-static int
-lnet_match_md(int index, int op_mask, lnet_process_id_t src,
- unsigned int rlength, unsigned int roffset,
- __u64 match_bits, lnet_msg_t *msg,
- unsigned int *mlength_out, unsigned int *offset_out,
- lnet_libmd_t **md_out)
-{
- lnet_portal_t *ptl = &the_lnet.ln_portals[index];
- cfs_list_t *head;
- lnet_me_t *me;
- lnet_me_t *tmp;
- lnet_libmd_t *md;
- int rc;
-
- CDEBUG (D_NET, "Request from %s of length %d into portal %d "
- "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
-
- if (index < 0 || index >= the_lnet.ln_nportals) {
- CERROR("Invalid portal %d not in [0-%d]\n",
- index, the_lnet.ln_nportals);
- return LNET_MATCHMD_DROP;
- }
-
- head = lnet_portal_me_head(index, src, match_bits);
- if (head == NULL) /* nobody posted anything on this portal */
- goto out;
-
- cfs_list_for_each_entry_safe_typed (me, tmp, head,
- lnet_me_t, me_list) {
- md = me->me_md;
-
- /* ME attached but MD not attached yet */
- if (md == NULL)
- continue;
-
- LASSERT (me == md->md_me);
-
- rc = lnet_try_match_md(index, op_mask, src, rlength,
- roffset, match_bits, md, msg,
- mlength_out, offset_out);
- switch (rc) {
- default:
- LBUG();
-
- case LNET_MATCHMD_NONE:
- continue;
-
- case LNET_MATCHMD_OK:
- *md_out = md;
- return LNET_MATCHMD_OK;
-
- case LNET_MATCHMD_DROP:
- return LNET_MATCHMD_DROP;
- }
- /* not reached */
- }
-
- out:
- if (op_mask == LNET_MD_OP_GET ||
- !lnet_portal_is_lazy(ptl))
- return LNET_MATCHMD_DROP;
-
- return LNET_MATCHMD_NONE;
-}
-
int
lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
{
LASSERT(!msg->msg_sending);
LASSERT(rlen == msg->msg_len);
LASSERT(mlen <= msg->msg_len);
+ LASSERT(msg->msg_offset == offset);
+ LASSERT(msg->msg_wanted == mlen);
- msg->msg_wanted = mlen;
- msg->msg_offset = offset;
msg->msg_receiving = 0;
if (mlen != 0) {
LASSERT (lnet_peer_aliveness_enabled(lp));
LASSERT (ni->ni_lnd->lnd_query != NULL);
+ LASSERT (the_lnet.ln_routing == 1);
LNET_UNLOCK();
(ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
if (last_alive != 0) /* NI has updated timestamp */
lp->lp_last_alive = last_alive;
- return;
}
/* NB: always called with LNET_LOCK held */
cfs_time_t deadline;
LASSERT (lnet_peer_aliveness_enabled(lp));
+ LASSERT (the_lnet.ln_routing == 1);
/* Trust lnet_notify() if it has more recent aliveness news, but
* ignore the initial assumed death (see lnet_peers_start_down()).
{
cfs_time_t now = cfs_time_current();
+ /* LU-630: only router checks peer health. */
+ if (the_lnet.ln_routing == 0)
+ return 1;
+
if (!lnet_peer_aliveness_enabled(lp))
return -ENODEV;
/* NB 'lp' is always the next hop */
if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
lnet_peer_alive_locked(lp) == 0) {
+ the_lnet.ln_counters.drop_count++;
+ the_lnet.ln_counters.drop_length += msg->msg_len;
LNET_UNLOCK();
CNETERR("Dropping message for %s: peer not alive\n",
LASSERT (!msg->msg_onactivelist);
msg->msg_onactivelist = 1;
- cfs_list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
+ cfs_list_add(&msg->msg_activelist,
+ &the_lnet.ln_msg_container.msc_active);
}
lnet_rtrbufpool_t *
}
}
+static lnet_peer_t *
+lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target)
+{
+ lnet_remotenet_t *rnet;
+ lnet_route_t *rtr;
+ lnet_route_t *rtr_best;
+ lnet_route_t *rtr_last;
+ struct lnet_peer *lp_best;
+ struct lnet_peer *lp;
+ int rc;
+
+ rnet = lnet_find_net_locked(LNET_NIDNET(target));
+ if (rnet == NULL)
+ return NULL;
+
+ lp_best = NULL;
+ rtr_best = rtr_last = NULL;
+ cfs_list_for_each_entry(rtr, &rnet->lrn_routes, lr_list) {
+ lp = rtr->lr_gateway;
+
+ if (!lp->lp_alive || /* gateway is down */
+ (lp->lp_ping_version == LNET_PROTO_PING_VERSION &&
+ rtr->lr_downis != 0)) /* NI to target is down */
+ continue;
+
+ if (ni != NULL && lp->lp_ni != ni)
+ continue;
+
+ if (lp_best == NULL) {
+ rtr_best = rtr_last = rtr;
+ lp_best = lp;
+ continue;
+ }
+
+ rc = lnet_compare_routes(rtr, rtr_best);
+ if (rc < 0)
+ continue;
+
+ rtr_best = rtr;
+ lp_best = lp;
+ }
+
+ if (rtr_best != NULL) {
+ /* Place selected route at the end of the route list to ensure
+ * fairness; everything else being equal... */
+ cfs_list_del(&rtr_best->lr_list);
+ cfs_list_add_tail(&rtr_best->lr_list, &rnet->lrn_routes);
+ }
+
+ return lp_best;
+}
+
int
lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
{
lnet_nid_t dst_nid = msg->msg_target.nid;
lnet_ni_t *src_ni;
lnet_ni_t *local_ni;
- lnet_remotenet_t *rnet;
- lnet_route_t *route;
- lnet_route_t *best_route;
- cfs_list_t *tmp;
lnet_peer_t *lp;
- lnet_peer_t *lp2;
int rc;
LASSERT (msg->msg_txpeer == NULL);
LNET_LOCK();
#endif
/* sending to a remote network */
- rnet = lnet_find_net_locked(LNET_NIDNET(dst_nid));
- if (rnet == NULL) {
- if (src_ni != NULL)
- lnet_ni_decref_locked(src_ni);
- LNET_UNLOCK();
- LCONSOLE_WARN("No route to %s\n",
- libcfs_id2str(msg->msg_target));
- return -EHOSTUNREACH;
- }
-
- /* Find the best gateway I can use */
- lp = NULL;
- best_route = NULL;
- cfs_list_for_each(tmp, &rnet->lrn_routes) {
- route = cfs_list_entry(tmp, lnet_route_t, lr_list);
- lp2 = route->lr_gateway;
-
- if (lp2->lp_alive &&
- lnet_router_down_ni(lp2, rnet->lrn_net) <= 0 &&
- (src_ni == NULL || lp2->lp_ni == src_ni) &&
- (lp == NULL ||
- lnet_compare_routes(route, best_route) > 0)) {
- best_route = route;
- lp = lp2;
- }
- }
-
+ lp = lnet_find_route_locked(src_ni, dst_nid);
if (lp == NULL) {
if (src_ni != NULL)
lnet_ni_decref_locked(src_ni);
return -EHOSTUNREACH;
}
- /* Place selected route at the end of the route list to ensure
- * fairness; everything else being equal... */
- cfs_list_del(&best_route->lr_list);
- cfs_list_add_tail(&best_route->lr_list, &rnet->lrn_routes);
+ CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
+ libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
+ lnet_msgtyp2str(msg->msg_type), msg->msg_len);
if (src_ni == NULL) {
src_ni = lp->lp_ni;
return 0;
}
-static void
-lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
+void
+lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg,
+ unsigned int offset, unsigned int mlen)
{
/* ALWAYS called holding the LNET_LOCK */
/* Here, we commit the MD to a network OP by marking it busy and
LASSERT (!msg->msg_routing);
msg->msg_md = md;
+ lnet_md_deconstruct(md, &msg->msg_ev.md);
+ lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+ if (msg->msg_receiving) {
+ msg->msg_offset = offset;
+ msg->msg_wanted = mlen;
+ }
md->md_refcount++;
if (md->md_threshold != LNET_MD_THRESH_INF) {
LASSERT (!msg->msg_onactivelist);
msg->msg_onactivelist = 1;
- cfs_list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
+ cfs_list_add(&msg->msg_activelist,
+ &the_lnet.ln_msg_container.msc_active);
}
static void
}
static void
-lnet_drop_delayed_put(lnet_msg_t *msg, char *reason)
+lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
{
- lnet_process_id_t id = {0};
-
- id.nid = msg->msg_hdr.src_nid;
- id.pid = msg->msg_hdr.src_pid;
+ lnet_hdr_t *hdr = &msg->msg_hdr;
- LASSERT (msg->msg_md == NULL);
- LASSERT (msg->msg_delayed);
- LASSERT (msg->msg_rxpeer != NULL);
- LASSERT (msg->msg_hdr.type == LNET_MSG_PUT);
-
- CWARN("Dropping delayed PUT from %s portal %d match "LPU64
- " offset %d length %d: %s\n",
- libcfs_id2str(id),
- msg->msg_hdr.msg.put.ptl_index,
- msg->msg_hdr.msg.put.match_bits,
- msg->msg_hdr.msg.put.offset,
- msg->msg_hdr.payload_length,
- reason);
-
- /* NB I can't drop msg's ref on msg_rxpeer until after I've
- * called lnet_drop_message(), so I just hang onto msg as well
- * until that's done */
-
- lnet_drop_message(msg->msg_rxpeer->lp_ni,
- msg->msg_private, msg->msg_len);
+ LNET_LOCK();
- LNET_LOCK();
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += msg->msg_wanted;
- lnet_peer_decref_locked(msg->msg_rxpeer);
- msg->msg_rxpeer = NULL;
+ LNET_UNLOCK();
- lnet_msg_free(msg);
+ if (msg->msg_wanted != 0)
+ lnet_setpayloadbuffer(msg);
- LNET_UNLOCK();
-}
-
-/**
- * Turn on the lazy portal attribute. Use with caution!
- *
- * This portal attribute only affects incoming PUT requests to the portal,
- * and is off by default. By default, if there's no matching MD for an
- * incoming PUT request, it is simply dropped. With the lazy attribute on,
- * such requests are queued indefinitely until either a matching MD is
- * posted to the portal or the lazy attribute is turned off.
- *
- * It would prevent dropped requests, however it should be regarded as the
- * last line of defense - i.e. users must keep a close watch on active
- * buffers on a lazy portal and once it becomes too low post more buffers as
- * soon as possible. This is because delayed requests usually have detrimental
- * effects on underlying network connections. A few delayed requests often
- * suffice to bring an underlying connection to a complete halt, due to flow
- * control mechanisms.
- *
- * There's also a DOS attack risk. If users don't post match-all MDs on a
- * lazy portal, a malicious peer can easily stop a service by sending some
- * PUT requests with match bits that won't match any MD. A routed server is
- * especially vulnerable since the connections to its neighbor routers are
- * shared among all clients.
- *
- * \param portal Index of the portal to enable the lazy attribute on.
- *
- * \retval 0 On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetSetLazyPortal(int portal)
-{
- lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
-
- if (portal < 0 || portal >= the_lnet.ln_nportals)
- return -EINVAL;
-
- CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
-
- LNET_LOCK();
- lnet_portal_setopt(ptl, LNET_PTL_LAZY);
- LNET_UNLOCK();
-
- return 0;
-}
-
-/**
- * Turn off the lazy portal attribute. Delayed requests on the portal,
- * if any, will be all dropped when this function returns.
- *
- * \param portal Index of the portal to disable the lazy attribute on.
- *
- * \retval 0 On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetClearLazyPortal(int portal)
-{
- cfs_list_t zombies;
- lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
- lnet_msg_t *msg;
-
- if (portal < 0 || portal >= the_lnet.ln_nportals)
- return -EINVAL;
-
- LNET_LOCK();
-
- if (!lnet_portal_is_lazy(ptl)) {
- LNET_UNLOCK();
- return 0;
- }
-
- if (the_lnet.ln_shutdown)
- CWARN ("Active lazy portal %d on exit\n", portal);
- else
- CDEBUG (D_NET, "clearing portal %d lazy\n", portal);
-
- /* grab all the blocked messages atomically */
- cfs_list_add(&zombies, &ptl->ptl_msgq);
- cfs_list_del_init(&ptl->ptl_msgq);
-
- ptl->ptl_msgq_version++;
- lnet_portal_unsetopt(ptl, LNET_PTL_LAZY);
-
- LNET_UNLOCK();
-
- while (!cfs_list_empty(&zombies)) {
- msg = cfs_list_entry(zombies.next, lnet_msg_t, msg_list);
- cfs_list_del(&msg->msg_list);
-
- lnet_drop_delayed_put(msg, "Clearing lazy portal attr");
- }
-
- return 0;
-}
-
-static void
-lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
- unsigned int offset, unsigned int mlength)
-{
- lnet_hdr_t *hdr = &msg->msg_hdr;
-
- LNET_LOCK();
-
- the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_length += mlength;
-
- LNET_UNLOCK();
-
- if (mlength != 0)
- lnet_setpayloadbuffer(msg);
-
- msg->msg_ev.type = LNET_EVENT_PUT;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.hdr_data = hdr->msg.put.hdr_data;
+ lnet_build_msg_event(msg, LNET_EVENT_PUT);
/* Must I ACK? If so I'll grab the ack_wmd out of the header and put
* it back into the ACK during lnet_finalize() */
- msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
- (md->md_options & LNET_MD_ACK_DISABLE) == 0);
+ msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
+ (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
- lnet_ni_recv(msg->msg_rxpeer->lp_ni,
- msg->msg_private,
- msg, delayed, offset, mlength,
- hdr->payload_length);
-}
-
-/* called with LNET_LOCK held */
-void
-lnet_match_blocked_msg(lnet_libmd_t *md)
-{
- CFS_LIST_HEAD (drops);
- CFS_LIST_HEAD (matches);
- cfs_list_t *tmp;
- cfs_list_t *entry;
- lnet_msg_t *msg;
- lnet_portal_t *ptl;
- lnet_me_t *me = md->md_me;
-
- LASSERT (me->me_portal < (unsigned int)the_lnet.ln_nportals);
-
- ptl = &the_lnet.ln_portals[me->me_portal];
- if (!lnet_portal_is_lazy(ptl)) {
- LASSERT (cfs_list_empty(&ptl->ptl_msgq));
- return;
- }
-
- LASSERT (md->md_refcount == 0); /* a brand new MD */
-
- cfs_list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
- int rc;
- int index;
- unsigned int mlength;
- unsigned int offset;
- lnet_hdr_t *hdr;
- lnet_process_id_t src;
-
- msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
- LASSERT (msg->msg_delayed);
-
- hdr = &msg->msg_hdr;
- index = hdr->msg.put.ptl_index;
-
- src.nid = hdr->src_nid;
- src.pid = hdr->src_pid;
-
- rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
- hdr->payload_length,
- hdr->msg.put.offset,
- hdr->msg.put.match_bits,
- md, msg, &mlength, &offset);
-
- if (rc == LNET_MATCHMD_NONE)
- continue;
-
- /* Hurrah! This _is_ a match */
- cfs_list_del(&msg->msg_list);
- ptl->ptl_msgq_version++;
-
- if (rc == LNET_MATCHMD_OK) {
- cfs_list_add_tail(&msg->msg_list, &matches);
-
- CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
- "match "LPU64" offset %d length %d.\n",
- libcfs_id2str(src),
- hdr->msg.put.ptl_index,
- hdr->msg.put.match_bits,
- hdr->msg.put.offset,
- hdr->payload_length);
- } else {
- LASSERT (rc == LNET_MATCHMD_DROP);
-
- cfs_list_add_tail(&msg->msg_list, &drops);
- }
-
- if (lnet_md_exhausted(md))
- break;
- }
-
- LNET_UNLOCK();
-
- cfs_list_for_each_safe (entry, tmp, &drops) {
- msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
- cfs_list_del(&msg->msg_list);
-
- lnet_drop_delayed_put(msg, "Bad match");
- }
-
- cfs_list_for_each_safe (entry, tmp, &matches) {
- msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
- cfs_list_del(&msg->msg_list);
-
- /* md won't disappear under me, since each msg
- * holds a ref on it */
- lnet_recv_put(md, msg, 1,
- msg->msg_ev.offset,
- msg->msg_ev.mlength);
- }
-
- LNET_LOCK();
+ lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_delayed,
+ msg->msg_offset, msg->msg_wanted, hdr->payload_length);
}
static int
__u64 version;
lnet_hdr_t *hdr = &msg->msg_hdr;
unsigned int rlength = hdr->payload_length;
- unsigned int mlength = 0;
- unsigned int offset = 0;
lnet_process_id_t src= {0};
- lnet_libmd_t *md;
lnet_portal_t *ptl;
src.nid = hdr->src_nid;
again:
rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
rlength, hdr->msg.put.offset,
- hdr->msg.put.match_bits, msg,
- &mlength, &offset, &md);
+ hdr->msg.put.match_bits, msg);
switch (rc) {
default:
LBUG();
case LNET_MATCHMD_OK:
LNET_UNLOCK();
- lnet_recv_put(md, msg, msg->msg_delayed, offset, mlength);
+ lnet_recv_put(ni, msg);
return 0;
case LNET_MATCHMD_NONE:
- ptl = &the_lnet.ln_portals[index];
+ ptl = the_lnet.ln_portals[index];
version = ptl->ptl_ml_version;
rc = 0;
if (rc == 0 &&
!the_lnet.ln_shutdown &&
- lnet_portal_is_lazy(ptl)) {
+ lnet_ptl_is_lazy(ptl)) {
if (version != ptl->ptl_ml_version)
goto again;
lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
{
lnet_hdr_t *hdr = &msg->msg_hdr;
- unsigned int mlength = 0;
- unsigned int offset = 0;
lnet_process_id_t src = {0};
lnet_handle_wire_t reply_wmd;
- lnet_libmd_t *md;
int rc;
src.nid = hdr->src_nid;
rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
hdr->msg.get.sink_length, hdr->msg.get.src_offset,
- hdr->msg.get.match_bits, msg,
- &mlength, &offset, &md);
+ hdr->msg.get.match_bits, msg);
if (rc == LNET_MATCHMD_DROP) {
CNETERR("Dropping GET from %s portal %d match "LPU64
" offset %d length %d\n",
LASSERT (rc == LNET_MATCHMD_OK);
the_lnet.ln_counters.send_count++;
- the_lnet.ln_counters.send_length += mlength;
+ the_lnet.ln_counters.send_length += msg->msg_wanted;
- LNET_UNLOCK();
+ LNET_UNLOCK();
- msg->msg_ev.type = LNET_EVENT_GET;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.hdr_data = 0;
+ lnet_build_msg_event(msg, LNET_EVENT_GET);
- reply_wmd = hdr->msg.get.return_wmd;
+ reply_wmd = hdr->msg.get.return_wmd;
- lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
+ lnet_prep_send(msg, LNET_MSG_REPLY, src,
+ msg->msg_offset, msg->msg_wanted);
msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
- lnet_commit_md(md, msg);
+ lnet_commit_md(md, msg, 0, mlength);
if (mlength != 0)
lnet_setpayloadbuffer(msg);
- msg->msg_ev.type = LNET_EVENT_REPLY;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.initiator = src;
- msg->msg_ev.rlength = rlength;
- msg->msg_ev.mlength = mlength;
- msg->msg_ev.offset = 0;
-
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
the_lnet.ln_counters.recv_count++;
the_lnet.ln_counters.recv_length += mlength;
LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_REPLY);
+
lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
return 0;
}
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
hdr->msg.ack.dst_wmd.wh_object_cookie);
- lnet_commit_md(md, msg);
-
- msg->msg_ev.type = LNET_EVENT_ACK;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.initiator = src;
- msg->msg_ev.mlength = hdr->msg.ack.mlength;
- msg->msg_ev.match_bits = hdr->msg.ack.match_bits;
+ lnet_commit_md(md, msg, 0, 0);
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
+ the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_count++;
+ LNET_UNLOCK();
- LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_ACK);
lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
return 0;
return -EPROTO;
}
- if (the_lnet.ln_routing) {
- cfs_time_t now = cfs_time_current();
-
- LNET_LOCK();
-
- ni->ni_last_alive = now;
- if (ni->ni_status != NULL &&
- ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
- ni->ni_status->ns_status = LNET_NI_STATUS_UP;
+ if (the_lnet.ln_routing &&
+ ni->ni_last_alive != cfs_time_current_sec()) {
+ LNET_LOCK();
- LNET_UNLOCK();
+ /* NB: so far here is the only place to set NI status to "up */
+ ni->ni_last_alive = cfs_time_current_sec();
+ if (ni->ni_status != NULL &&
+ ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
+ ni->ni_status->ns_status = LNET_NI_STATUS_UP;
+ LNET_UNLOCK();
}
/* Regard a bad destination NID as a protocol error. Senders should
msg->msg_len = msg->msg_wanted = payload_length;
msg->msg_offset = 0;
msg->msg_hdr = *hdr;
+ /* for building message event */
+ msg->msg_from = from_nid;
LNET_LOCK();
rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid);
msg->msg_hdr.dest_pid = dest_pid;
msg->msg_hdr.payload_length = payload_length;
- msg->msg_ev.sender = from_nid;
-
switch (type) {
case LNET_MSG_ACK:
rc = lnet_parse_ack(ni, msg);
lnet_peer_decref_locked(msg->msg_rxpeer);
msg->msg_rxpeer = NULL;
}
- lnet_msg_free(msg); /* expects LNET_LOCK held */
+ lnet_msg_free_locked(msg); /* expects LNET_LOCK held */
LNET_UNLOCK();
drop:
return 0;
}
+void
+lnet_drop_delayed_msg_list(cfs_list_t *head, char *reason)
+{
+ while (!cfs_list_empty(head)) {
+ lnet_process_id_t id = {0};
+ lnet_msg_t *msg;
+
+ msg = cfs_list_entry(head->next, lnet_msg_t, msg_list);
+ cfs_list_del(&msg->msg_list);
+
+ id.nid = msg->msg_hdr.src_nid;
+ id.pid = msg->msg_hdr.src_pid;
+
+ LASSERT(msg->msg_md == NULL);
+ LASSERT(msg->msg_delayed);
+ LASSERT(msg->msg_rxpeer != NULL);
+ LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
+
+ CWARN("Dropping delayed PUT from %s portal %d match "LPU64
+ " offset %d length %d: %s\n",
+ libcfs_id2str(id),
+ msg->msg_hdr.msg.put.ptl_index,
+ msg->msg_hdr.msg.put.match_bits,
+ msg->msg_hdr.msg.put.offset,
+ msg->msg_hdr.payload_length, reason);
+
+ /* NB I can't drop msg's ref on msg_rxpeer until after I've
+ * called lnet_drop_message(), so I just hang onto msg as well
+ * until that's done */
+
+ lnet_drop_message(msg->msg_rxpeer->lp_ni,
+ msg->msg_private, msg->msg_len);
+
+ LNET_LOCK();
+ lnet_peer_decref_locked(msg->msg_rxpeer);
+ LNET_UNLOCK();
+
+ lnet_msg_free(msg);
+ }
+}
+
+void
+lnet_recv_delayed_msg_list(cfs_list_t *head)
+{
+ while (!cfs_list_empty(head)) {
+ lnet_msg_t *msg;
+ lnet_process_id_t id;
+
+ msg = cfs_list_entry(head->next, lnet_msg_t, msg_list);
+ cfs_list_del(&msg->msg_list);
+
+ /* md won't disappear under me, since each msg
+ * holds a ref on it */
+
+ id.nid = msg->msg_hdr.src_nid;
+ id.pid = msg->msg_hdr.src_pid;
+
+ LASSERT(msg->msg_delayed);
+ LASSERT(msg->msg_md != NULL);
+ LASSERT(msg->msg_rxpeer != NULL);
+ LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
+
+ CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
+ "match "LPU64" offset %d length %d.\n",
+ libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
+ msg->msg_hdr.msg.put.match_bits,
+ msg->msg_hdr.msg.put.offset,
+ msg->msg_hdr.payload_length);
+
+ lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
+ }
+}
+
/**
* Initiate an asynchronous PUT operation.
*
md = lnet_handle2md(&mdh);
if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
- lnet_msg_free(msg);
+ lnet_msg_free_locked(msg);
CERROR("Dropping PUT ("LPU64":%d:%s): MD (%d) invalid\n",
match_bits, portal, libcfs_id2str(target),
CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
- lnet_commit_md(md, msg);
+ lnet_commit_md(md, msg, 0, 0);
lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
LNET_WIRE_HANDLE_COOKIE_NONE;
}
- msg->msg_ev.type = LNET_EVENT_SEND;
- msg->msg_ev.initiator.nid = LNET_NID_ANY;
- msg->msg_ev.initiator.pid = the_lnet.ln_pid;
- msg->msg_ev.target = target;
- msg->msg_ev.sender = LNET_NID_ANY;
- msg->msg_ev.pt_index = portal;
- msg->msg_ev.match_bits = match_bits;
- msg->msg_ev.rlength = md->md_length;
- msg->msg_ev.mlength = md->md_length;
- msg->msg_ev.offset = offset;
- msg->msg_ev.hdr_data = hdr_data;
-
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
the_lnet.ln_counters.send_count++;
the_lnet.ln_counters.send_length += md->md_length;
LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_SEND);
+
rc = lnet_send(self, msg);
if (rc != 0) {
CNETERR( "Error sending PUT to %s: %d\n",
LASSERT (getmd->md_offset == 0);
- CDEBUG(D_NET, "%s: Reply from %s md %p\n",
- libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
-
- lnet_commit_md (getmd, msg);
+ CDEBUG(D_NET, "%s: Reply from %s md %p\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
+ /* setup information for lnet_build_msg_event */
+ msg->msg_from = peer_id.nid;
msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
+ msg->msg_hdr.src_nid = peer_id.nid;
+ msg->msg_hdr.payload_length = getmd->md_length;
- msg->msg_ev.type = LNET_EVENT_REPLY;
- msg->msg_ev.initiator = peer_id;
- msg->msg_ev.sender = peer_id.nid; /* optimized GETs can't be routed */
- msg->msg_ev.rlength = msg->msg_ev.mlength = getmd->md_length;
- msg->msg_ev.offset = 0;
+ lnet_commit_md(getmd, msg, getmd->md_offset, getmd->md_length);
- lnet_md_deconstruct(getmd, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, getmd);
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += getmd->md_length;
- the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_length += getmd->md_length;
+ LNET_UNLOCK();
- LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_REPLY);
- return msg;
+ return msg;
drop_msg:
- lnet_msg_free(msg);
+ lnet_msg_free_locked(msg);
drop:
- the_lnet.ln_counters.drop_count++;
- the_lnet.ln_counters.drop_length += getmd->md_length;
+ the_lnet.ln_counters.drop_count++;
+ the_lnet.ln_counters.drop_length += getmd->md_length;
- LNET_UNLOCK ();
+ LNET_UNLOCK ();
- return NULL;
+ return NULL;
}
void
md = lnet_handle2md(&mdh);
if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
- lnet_msg_free(msg);
+ lnet_msg_free_locked(msg);
CERROR("Dropping GET ("LPU64":%d:%s): MD (%d) invalid\n",
match_bits, portal, libcfs_id2str(target),
CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
- lnet_commit_md(md, msg);
+ lnet_commit_md(md, msg, 0, 0);
lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
md->md_lh.lh_cookie;
- msg->msg_ev.type = LNET_EVENT_SEND;
- msg->msg_ev.initiator.nid = LNET_NID_ANY;
- msg->msg_ev.initiator.pid = the_lnet.ln_pid;
- msg->msg_ev.target = target;
- msg->msg_ev.sender = LNET_NID_ANY;
- msg->msg_ev.pt_index = portal;
- msg->msg_ev.match_bits = match_bits;
- msg->msg_ev.rlength = md->md_length;
- msg->msg_ev.mlength = md->md_length;
- msg->msg_ev.offset = offset;
- msg->msg_ev.hdr_data = 0;
-
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
the_lnet.ln_counters.send_count++;
LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_SEND);
+
rc = lnet_send(self, msg);
if (rc < 0) {
CNETERR( "Error sending GET to %s: %d\n",