lnet_md_exhausted(md));
}
-static inline unsigned int
-lnet_match_to_hash(lnet_process_id_t id, __u64 mbits)
-{
- mbits += id.nid + id.pid;
- return cfs_hash_long((unsigned long)mbits, LNET_PORTAL_HASH_BITS);
-}
-
#ifdef __KERNEL__
#define LNET_LOCK() cfs_spin_lock(&the_lnet.ln_lock)
#define LNET_UNLOCK() cfs_spin_unlock(&the_lnet.ln_lock)
void lnet_return_rx_credits_locked(lnet_msg_t *msg);
/* portals functions */
+/* portals attributes */
static inline int
lnet_ptl_is_lazy(lnet_portal_t *ptl)
{
ptl->ptl_options &= ~opt;
}
-static inline cfs_list_t *
-lnet_ptl_me_head(int index, lnet_process_id_t id, __u64 mbits)
-{
- lnet_portal_t *ptl = the_lnet.ln_portals[index];
-
- if (lnet_ptl_is_wildcard(ptl)) {
- return &ptl->ptl_mlist;
- } else if (lnet_ptl_is_unique(ptl)) {
- LASSERT(ptl->ptl_mhash != NULL);
- return &ptl->ptl_mhash[lnet_match_to_hash(id, mbits)];
- }
- return NULL;
-}
-
+/* match-table functions */
+cfs_list_t *lnet_mt_match_head(struct lnet_match_table *mtable,
+ lnet_process_id_t id, __u64 mbits);
+struct lnet_match_table *lnet_mt_of_attach(unsigned int index,
+ lnet_process_id_t id, __u64 mbits,
+ __u64 ignore_bits,
+ lnet_ins_pos_t pos);
+struct lnet_match_table *lnet_mt_of_match(unsigned int index,
+ lnet_process_id_t id, __u64 mbits);
+int lnet_mt_match_md(struct lnet_match_table *mtable,
+ int op_mask, lnet_process_id_t src,
+ unsigned int rlength, unsigned int roffset,
+ __u64 match_bits, lnet_msg_t *msg);
+
+/* portals match/attach functions */
+void lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
+ cfs_list_t *matches, cfs_list_t *drops);
+void lnet_ptl_detach_md(lnet_me_t *me, lnet_libmd_t *md);
+int lnet_ptl_match_md(unsigned int index, int op_mask, lnet_process_id_t src,
+ unsigned int rlength, unsigned int roffset,
+ __u64 match_bits, lnet_msg_t *msg);
+
+/* initialized and finalize portals */
int lnet_portals_create(void);
void lnet_portals_destroy(void);
-int lnet_ptl_type_match(struct lnet_portal *ptl, lnet_process_id_t id,
- __u64 mbits, __u64 ignore_bits);
-void lnet_match_blocked_msg(lnet_libmd_t *md);
-int lnet_match_md(int index, int op_mask, lnet_process_id_t src,
- unsigned int rlength, unsigned int roffset,
- __u64 match_bits, lnet_msg_t *msg);
-
/* message functions */
int lnet_parse (lnet_ni_t *ni, lnet_hdr_t *hdr,
lnet_nid_t fromnid, void *private, int rdma_req);
lnet_nid_t msg_from;
__u32 msg_type;
- unsigned int msg_rx_committed:1;
+ /* commited for sending */
unsigned int msg_tx_committed:1;
+ /* queued for tx credit */
+ unsigned int msg_tx_delayed:1;
+ /* commited for receiving */
+ unsigned int msg_rx_committed:1;
+ /* queued for RX buffer */
+ unsigned int msg_rx_delayed:1;
+ /* ready for pending on RX delay list */
+ unsigned int msg_rx_ready_delay:1;
unsigned int msg_vmflush:1; /* VM trying to free memory */
unsigned int msg_target_is_router:1; /* sending to a router */
unsigned int msg_ack:1; /* ack on finalize (PUT) */
unsigned int msg_sending:1; /* outgoing message */
unsigned int msg_receiving:1; /* being received */
- unsigned int msg_delayed:1; /* had to Q for buffer or tx credit */
unsigned int msg_txcredit:1; /* taken an NI send credit */
unsigned int msg_peertxcredit:1; /* taken a peer send credit */
unsigned int msg_rtrcredit:1; /* taken a globel router credit */
#define LNET_PTL_MATCH_WILDCARD (1 << 2) /* wildcard match, request portal */
/* ME hash of RDMA portal */
-#define LNET_PORTAL_HASH_BITS 8
-#define LNET_PORTAL_HASH_SIZE (1 << LNET_PORTAL_HASH_BITS)
+#define LNET_MT_HASH_BITS 8
+#define LNET_MT_HASH_SIZE (1 << LNET_MT_HASH_BITS)
+
+/* portal match table */
+struct lnet_match_table {
+ /* reserved for upcoming patches, CPU partition ID */
+ unsigned int mt_cpt;
+ unsigned int mt_portal; /* portal index */
+ cfs_list_t mt_mlist; /* matching list */
+ cfs_list_t *mt_mhash; /* matching hash */
+};
typedef struct lnet_portal {
unsigned int ptl_index; /* portal ID, reserved */
- cfs_list_t *ptl_mhash; /* match hash */
- cfs_list_t ptl_mlist; /* match list */
- cfs_list_t ptl_msgq; /* messages blocking for MD */
- __u64 ptl_ml_version; /* validity stamp, only changed for new attached MD */
- __u64 ptl_msgq_version; /* validity stamp */
- unsigned int ptl_options;
+ /* flags on this portal: lazy, unique... */
+ unsigned int ptl_options;
+ /* Now we only have single instance for each portal,
+ * will have instance per CPT in upcoming patches */
+ struct lnet_match_table *ptl_mtable;
+ /* messages blocking for MD */
+ cfs_list_t ptl_msgq;
} lnet_portal_t;
#define LNET_LH_HASH_BITS 12
/* Disassociate from ME (if any), and unlink it if it was created
* with LNET_UNLINK */
if (me != NULL) {
- md->md_me = NULL;
- me->me_md = NULL;
+ /* detach MD from portal */
+ lnet_ptl_detach_md(me, md);
if (me->me_unlink == LNET_UNLINK)
lnet_me_unlink(me);
}
LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
lnet_unlink_t unlink, lnet_handle_md_t *handle)
{
+ CFS_LIST_HEAD (matches);
+ CFS_LIST_HEAD (drops);
lnet_me_t *me;
lnet_libmd_t *md;
int rc;
if (rc != 0)
goto failed;
- the_lnet.ln_portals[me->me_portal]->ptl_ml_version++;
-
- me->me_md = md;
- md->md_me = me;
+ /* attach this MD to portal of ME and check if it matches any
+ * blocked msgs on this portal */
+ lnet_ptl_attach_md(me, md, &matches, &drops);
lnet_md2handle(handle, md);
- /* check if this MD matches any blocked msgs */
- lnet_match_blocked_msg(md); /* expects LNET_LOCK held */
-
LNET_UNLOCK();
+
+ lnet_drop_delayed_msg_list(&drops, "Bad match");
+ lnet_recv_delayed_msg_list(&matches);
+
return 0;
failed:
lnet_unlink_t unlink, lnet_ins_pos_t pos,
lnet_handle_me_t *handle)
{
+ struct lnet_match_table *mtable;
lnet_me_t *me;
- lnet_portal_t *ptl;
cfs_list_t *head;
- int rc;
LASSERT (the_lnet.ln_init);
LASSERT (the_lnet.ln_refcount > 0);
if ((int)portal >= the_lnet.ln_nportals)
return -EINVAL;
- ptl = the_lnet.ln_portals[portal];
- rc = lnet_ptl_type_match(ptl, match_id, match_bits, ignore_bits);
- if (!rc)
+ mtable = lnet_mt_of_attach(portal, match_id,
+ match_bits, ignore_bits, pos);
+ if (mtable == NULL) /* can't match portal type */
return -EPERM;
me = lnet_me_alloc();
me->me_md = NULL;
lnet_res_lh_initialize(&the_lnet.ln_me_container, &me->me_lh);
- head = lnet_ptl_me_head(portal, match_id, match_bits);
+ head = lnet_mt_match_head(mtable, match_id, match_bits);
LASSERT (head != NULL);
if (pos == LNET_INS_AFTER)
void
lnet_me_unlink(lnet_me_t *me)
{
- cfs_list_del (&me->me_list);
+ cfs_list_del(&me->me_list);
- if (me->me_md != NULL) {
- me->me_md->md_me = NULL;
- lnet_md_unlink(me->me_md);
- }
+ if (me->me_md != NULL) {
+ lnet_libmd_t *md = me->me_md;
+
+ /* detach MD from portal of this ME */
+ lnet_ptl_detach_md(me, md);
+ lnet_md_unlink(md);
+ }
lnet_res_lh_invalidate(&me->me_lh);
lnet_me_free_locked(me);
}
int
-lnet_eager_recv_locked(lnet_msg_t *msg)
+lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
{
- lnet_peer_t *peer;
- lnet_ni_t *ni;
- int rc = 0;
-
- LASSERT (!msg->msg_delayed);
- msg->msg_delayed = 1;
-
- LASSERT (msg->msg_receiving);
- LASSERT (!msg->msg_sending);
+ int rc;
+
+ LASSERT(!msg->msg_sending);
+ LASSERT(msg->msg_receiving);
+ LASSERT(ni->ni_lnd->lnd_eager_recv != NULL);
+
+ msg->msg_rx_ready_delay = 1;
+ rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
+ &msg->msg_private);
+ if (rc != 0) {
+ CERROR("recv from %s / send to %s aborted: "
+ "eager_recv failed %d\n",
+ libcfs_nid2str(msg->msg_rxpeer->lp_nid),
+ libcfs_id2str(msg->msg_target), rc);
+ LASSERT(rc < 0); /* required by my callers */
+ }
- peer = msg->msg_rxpeer;
- ni = peer->lp_ni;
+ return rc;
+}
- if (ni->ni_lnd->lnd_eager_recv != NULL) {
- LNET_UNLOCK();
+int
+lnet_ni_eager_recv_locked(lnet_ni_t *ni, lnet_msg_t *msg)
+{
+ int rc;
- rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
- &msg->msg_private);
- if (rc != 0) {
- CERROR("recv from %s / send to %s aborted: "
- "eager_recv failed %d\n",
- libcfs_nid2str(peer->lp_nid),
- libcfs_id2str(msg->msg_target), rc);
- LASSERT (rc < 0); /* required by my callers */
- }
+ if (ni->ni_lnd->lnd_eager_recv == NULL) {
+ msg->msg_rx_ready_delay = 1;
+ return 0;
+ }
- LNET_LOCK();
- }
+ LNET_UNLOCK();
+ rc = lnet_ni_eager_recv(ni, msg);
+ LNET_LOCK();
- return rc;
+ return rc;
}
/* NB: caller shall hold a ref on 'lp' as I'd drop LNET_LOCK */
lnet_ni_t *ni = lp->lp_ni;
/* non-lnet_send() callers have checked before */
- LASSERT (!do_send || msg->msg_delayed);
- LASSERT (!msg->msg_receiving);
+ LASSERT(!do_send || msg->msg_tx_delayed);
+ LASSERT(!msg->msg_receiving);
/* NB 'lp' is always the next hop */
if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
lp->lp_mintxcredits = lp->lp_txcredits;
if (lp->lp_txcredits < 0) {
- msg->msg_delayed = 1;
+ msg->msg_tx_delayed = 1;
cfs_list_add_tail(&msg->msg_list, &lp->lp_txq);
return EAGAIN;
}
ni->ni_mintxcredits = ni->ni_txcredits;
if (ni->ni_txcredits < 0) {
- msg->msg_delayed = 1;
+ msg->msg_tx_delayed = 1;
cfs_list_add_tail(&msg->msg_list, &ni->ni_txq);
return EAGAIN;
}
LASSERT (msg->msg_receiving);
LASSERT (!msg->msg_sending);
- /* non-lnet_parse callers only send delayed messages */
- LASSERT (!do_recv || msg->msg_delayed);
+ /* non-lnet_parse callers only receive delayed messages */
+ LASSERT(!do_recv || msg->msg_rx_delayed);
if (!msg->msg_peerrtrcredit) {
LASSERT ((lp->lp_rtrcredits < 0) ==
if (lp->lp_rtrcredits < 0) {
/* must have checked eager_recv before here */
- LASSERT (msg->msg_delayed);
+ LASSERT(msg->msg_rx_ready_delay);
+ msg->msg_rx_delayed = 1;
cfs_list_add_tail(&msg->msg_list, &lp->lp_rtrq);
return EAGAIN;
}
if (rbp->rbp_credits < 0) {
/* must have checked eager_recv before here */
- LASSERT (msg->msg_delayed);
+ LASSERT(msg->msg_rx_ready_delay);
+ msg->msg_rx_delayed = 1;
cfs_list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
return EAGAIN;
}
cfs_list_del(&msg2->msg_list);
LASSERT(msg2->msg_txpeer->lp_ni == ni);
- LASSERT(msg2->msg_delayed);
+ LASSERT(msg2->msg_tx_delayed);
(void) lnet_post_send_locked(msg2, 1);
}
lnet_msg_t, msg_list);
cfs_list_del(&msg2->msg_list);
- LASSERT (msg2->msg_txpeer == txpeer);
- LASSERT (msg2->msg_delayed);
+ LASSERT(msg2->msg_txpeer == txpeer);
+ LASSERT(msg2->msg_tx_delayed);
(void) lnet_post_send_locked(msg2, 1);
}
msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
(msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
- lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_delayed,
+ lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
msg->msg_offset, msg->msg_wanted, hdr->payload_length);
}
{
int rc;
int index;
- __u64 version;
lnet_hdr_t *hdr = &msg->msg_hdr;
unsigned int rlength = hdr->payload_length;
lnet_process_id_t src= {0};
- lnet_portal_t *ptl;
src.nid = hdr->src_nid;
src.pid = hdr->src_pid;
index = hdr->msg.put.ptl_index;
- LNET_LOCK();
+ msg->msg_rx_ready_delay = ni->ni_lnd->lnd_eager_recv == NULL;
again:
- rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
- rlength, hdr->msg.put.offset,
- hdr->msg.put.match_bits, msg);
+ rc = lnet_ptl_match_md(index, LNET_MD_OP_PUT, src,
+ rlength, hdr->msg.put.offset,
+ hdr->msg.put.match_bits, msg);
switch (rc) {
default:
LBUG();
case LNET_MATCHMD_OK:
- LNET_UNLOCK();
lnet_recv_put(ni, msg);
return 0;
case LNET_MATCHMD_NONE:
- ptl = the_lnet.ln_portals[index];
- version = ptl->ptl_ml_version;
-
- rc = 0;
- if (!msg->msg_delayed)
- rc = lnet_eager_recv_locked(msg);
-
- if (rc == 0 &&
- !the_lnet.ln_shutdown &&
- lnet_ptl_is_lazy(ptl)) {
- if (version != ptl->ptl_ml_version)
- goto again;
-
- cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msgq);
- ptl->ptl_msgq_version++;
- LNET_UNLOCK();
+ if (msg->msg_rx_delayed) /* attached on delayed list */
+ return 0;
- CDEBUG(D_NET, "Delaying PUT from %s portal %d match "
- LPU64" offset %d length %d: no match \n",
- libcfs_id2str(src), index,
- hdr->msg.put.match_bits,
- hdr->msg.put.offset, rlength);
- return 0;
- }
+ rc = lnet_ni_eager_recv(ni, msg);
+ if (rc == 0)
+ goto again;
/* fall through */
case LNET_MATCHMD_DROP:
libcfs_id2str(src), index,
hdr->msg.put.match_bits,
hdr->msg.put.offset, rlength, rc);
- LNET_UNLOCK();
return ENOENT; /* +ve: OK but no match */
}
hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
- LNET_LOCK();
-
- rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
- hdr->msg.get.sink_length, hdr->msg.get.src_offset,
- hdr->msg.get.match_bits, msg);
+ rc = lnet_ptl_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
+ hdr->msg.get.sink_length,
+ hdr->msg.get.src_offset,
+ hdr->msg.get.match_bits, msg);
if (rc == LNET_MATCHMD_DROP) {
CNETERR("Dropping GET from %s portal %d match "LPU64
" offset %d length %d\n",
hdr->msg.get.match_bits,
hdr->msg.get.src_offset,
hdr->msg.get.sink_length);
- LNET_UNLOCK();
return ENOENT; /* +ve: OK but no match */
}
LASSERT (rc == LNET_MATCHMD_OK);
- LNET_UNLOCK();
-
lnet_build_msg_event(msg, LNET_EVENT_GET);
reply_wmd = hdr->msg.get.return_wmd;
LNET_LOCK();
if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
lnet_msg2bufpool(msg)->rbp_credits <= 0) {
- rc = lnet_eager_recv_locked(msg);
+ rc = lnet_ni_eager_recv_locked(ni, msg);
if (rc != 0) {
LNET_UNLOCK();
goto free_drop;
id.pid = msg->msg_hdr.src_pid;
LASSERT(msg->msg_md == NULL);
- LASSERT(msg->msg_delayed);
+ LASSERT(msg->msg_rx_delayed);
LASSERT(msg->msg_rxpeer != NULL);
LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
id.nid = msg->msg_hdr.src_nid;
id.pid = msg->msg_hdr.src_pid;
- LASSERT(msg->msg_delayed);
+ LASSERT(msg->msg_rx_delayed);
LASSERT(msg->msg_md != NULL);
LASSERT(msg->msg_rxpeer != NULL);
LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
#include <lnet/lib-lnet.h>
-int
-lnet_ptl_type_match(struct lnet_portal *ptl, lnet_process_id_t match_id,
+static int
+lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
__u64 mbits, __u64 ignore_bits)
{
- int unique;
+ struct lnet_portal *ptl = the_lnet.ln_portals[index];
+ int unique;
unique = ignore_bits == 0 &&
match_id.nid != LNET_NID_ANY &&
return LNET_MATCHMD_OK;
}
+struct lnet_match_table *
+lnet_mt_of_attach(unsigned int index, lnet_process_id_t id,
+ __u64 mbits, __u64 ignore_bits, lnet_ins_pos_t pos)
+{
+ struct lnet_portal *ptl;
+
+ LASSERT(index < the_lnet.ln_nportals);
+
+ if (!lnet_ptl_match_type(index, id, mbits, ignore_bits))
+ return NULL;
+
+ ptl = the_lnet.ln_portals[index];
+ /* NB: Now we only have one match-table for each portal,
+ * and will have match-table per CPT in upcoming changes,
+ * ME will be scattered to different match-tables based
+ * on attaching information */
+ return ptl->ptl_mtable;
+}
+
+struct lnet_match_table *
+lnet_mt_of_match(unsigned int index, lnet_process_id_t id, __u64 mbits)
+{
+ struct lnet_portal *ptl;
+
+ LASSERT(index < the_lnet.ln_nportals);
+
+ ptl = the_lnet.ln_portals[index];
+ if (!lnet_ptl_is_unique(ptl) &&
+ !lnet_ptl_is_wildcard(ptl) && !lnet_ptl_is_lazy(ptl))
+ return NULL;
+
+ /* NB: Now we only have one match-table for each portal,
+ * and will have match-table per CPT in upcoming changes,
+ * request will be scattered to different match-tables based
+ * on matching information */
+ return ptl->ptl_mtable;
+}
+
+cfs_list_t *
+lnet_mt_match_head(struct lnet_match_table *mtable,
+ lnet_process_id_t id, __u64 mbits)
+{
+ struct lnet_portal *ptl = the_lnet.ln_portals[mtable->mt_portal];
+
+ if (lnet_ptl_is_wildcard(ptl)) {
+ return &mtable->mt_mlist;
+
+ } else if (lnet_ptl_is_unique(ptl)) {
+ unsigned long hash = mbits + id.nid + id.pid;
+
+ hash = cfs_hash_long(hash, LNET_MT_HASH_BITS);
+ return &mtable->mt_mhash[hash];
+ }
+
+ return NULL;
+}
+
int
-lnet_match_md(int index, int op_mask, lnet_process_id_t src,
- unsigned int rlength, unsigned int roffset,
- __u64 match_bits, lnet_msg_t *msg)
+lnet_mt_match_md(struct lnet_match_table *mtable,
+ int op_mask, lnet_process_id_t src,
+ unsigned int rlength, unsigned int roffset,
+ __u64 match_bits, lnet_msg_t *msg)
{
- struct lnet_portal *ptl = the_lnet.ln_portals[index];
cfs_list_t *head;
lnet_me_t *me;
lnet_me_t *tmp;
- lnet_libmd_t *md;
int rc;
- CDEBUG(D_NET, "Request from %s of length %d into portal %d "
- "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
-
- if (index < 0 || index >= the_lnet.ln_nportals) {
- CERROR("Invalid portal %d not in [0-%d]\n",
- index, the_lnet.ln_nportals);
- return LNET_MATCHMD_DROP;
- }
-
- head = lnet_ptl_me_head(index, src, match_bits);
+ head = lnet_mt_match_head(mtable, src, match_bits);
if (head == NULL) /* nobody posted anything on this portal */
goto out;
cfs_list_for_each_entry_safe(me, tmp, head, me_list) {
- md = me->me_md;
-
/* ME attached but MD not attached yet */
- if (md == NULL)
+ if (me->me_md == NULL)
continue;
- LASSERT(me == md->md_me);
+ LASSERT(me == me->me_md->md_me);
- rc = lnet_try_match_md(index, op_mask, src, rlength,
- roffset, match_bits, md, msg);
+ rc = lnet_try_match_md(mtable->mt_portal,
+ op_mask, src, rlength, roffset,
+ match_bits, me->me_md, msg);
switch (rc) {
default:
LBUG();
out:
if (op_mask == LNET_MD_OP_GET ||
- !lnet_ptl_is_lazy(ptl))
+ !lnet_ptl_is_lazy(the_lnet.ln_portals[mtable->mt_portal]))
return LNET_MATCHMD_DROP;
return LNET_MATCHMD_NONE;
}
-/* called with LNET_LOCK held */
-void
-lnet_match_blocked_msg(lnet_libmd_t *md)
+int
+lnet_ptl_match_md(unsigned int index, int op_mask, lnet_process_id_t src,
+ unsigned int rlength, unsigned int roffset,
+ __u64 match_bits, lnet_msg_t *msg)
{
- CFS_LIST_HEAD (drops);
- CFS_LIST_HEAD (matches);
- cfs_list_t *tmp;
- cfs_list_t *entry;
- lnet_msg_t *msg;
+ struct lnet_match_table *mtable;
struct lnet_portal *ptl;
- lnet_me_t *me = md->md_me;
+ int rc;
- LASSERT(me->me_portal < (unsigned int)the_lnet.ln_nportals);
+ CDEBUG(D_NET, "Request from %s of length %d into portal %d "
+ "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
- ptl = the_lnet.ln_portals[me->me_portal];
- if (!lnet_ptl_is_lazy(ptl)) {
- LASSERT(cfs_list_empty(&ptl->ptl_msgq));
- return;
+ if (index >= the_lnet.ln_nportals) {
+ CERROR("Invalid portal %d not in [0-%d]\n",
+ index, the_lnet.ln_nportals);
+ return LNET_MATCHMD_DROP;
}
+ mtable = lnet_mt_of_match(index, src, match_bits);
+ if (mtable == NULL) {
+ CDEBUG(D_NET, "Drop early message from %s of length %d into "
+ "portal %d MB="LPX64"\n",
+ libcfs_id2str(src), rlength, index, match_bits);
+ return LNET_MATCHMD_DROP;
+ }
+
+ ptl = the_lnet.ln_portals[index];
+ LNET_LOCK();
+
+ if (the_lnet.ln_shutdown) {
+ rc = LNET_MATCHMD_DROP;
+ goto out;
+ }
+
+ rc = lnet_mt_match_md(mtable, op_mask, src, rlength,
+ roffset, match_bits, msg);
+ if (rc != LNET_MATCHMD_NONE) /* matched or dropping */
+ goto out;
+
+ if (!msg->msg_rx_ready_delay)
+ goto out;
+
+ LASSERT(!msg->msg_rx_delayed);
+ msg->msg_rx_delayed = 1;
+ cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msgq);
+
+ CDEBUG(D_NET,
+ "Delaying %s from %s portal %d MB "LPX64" offset %d len %d\n",
+ op_mask == LNET_MD_OP_PUT ? "PUT" : "GET",
+ libcfs_id2str(src), index, match_bits, roffset, rlength);
+ out:
+ LNET_UNLOCK();
+ return rc;
+}
+
+void
+lnet_ptl_detach_md(lnet_me_t *me, lnet_libmd_t *md)
+{
+ LASSERT(me->me_md == md && md->md_me == me);
+
+ me->me_md = NULL;
+ md->md_me = NULL;
+}
+
+/* called with LNET_LOCK held */
+void
+lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
+ cfs_list_t *matches, cfs_list_t *drops)
+{
+ struct lnet_portal *ptl = the_lnet.ln_portals[me->me_portal];
+ lnet_msg_t *tmp;
+ lnet_msg_t *msg;
+
LASSERT(md->md_refcount == 0); /* a brand new MD */
- cfs_list_for_each_safe(entry, tmp, &ptl->ptl_msgq) {
+ me->me_md = md;
+ md->md_me = me;
+
+ cfs_list_for_each_entry_safe(msg, tmp, &ptl->ptl_msgq, msg_list) {
int rc;
int index;
lnet_hdr_t *hdr;
lnet_process_id_t src;
- msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
- LASSERT(msg->msg_delayed);
+ LASSERT(msg->msg_rx_delayed);
hdr = &msg->msg_hdr;
index = hdr->msg.put.ptl_index;
/* Hurrah! This _is_ a match */
cfs_list_del(&msg->msg_list);
- ptl->ptl_msgq_version++;
if (rc == LNET_MATCHMD_OK) {
- cfs_list_add_tail(&msg->msg_list, &matches);
+ cfs_list_add_tail(&msg->msg_list, matches);
CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
"match "LPU64" offset %d length %d.\n",
} else {
LASSERT(rc == LNET_MATCHMD_DROP);
- cfs_list_add_tail(&msg->msg_list, &drops);
+ cfs_list_add_tail(&msg->msg_list, drops);
}
if (lnet_md_exhausted(md))
break;
}
-
- LNET_UNLOCK();
-
- lnet_drop_delayed_msg_list(&drops, "Bad match");
- lnet_recv_delayed_msg_list(&matches);
-
- LNET_LOCK();
}
void
lnet_ptl_cleanup(struct lnet_portal *ptl)
{
- lnet_me_t *me;
- int j;
+ struct lnet_match_table *mtable;
LASSERT(cfs_list_empty(&ptl->ptl_msgq));
- LASSERT(cfs_list_empty(&ptl->ptl_mlist));
- if (ptl->ptl_mhash == NULL) /* uninitialized portal */
+ if (ptl->ptl_mtable == NULL) /* uninitialized portal */
return;
- /* cleanup ME */
- while (!cfs_list_empty(&ptl->ptl_mlist)) {
- me = cfs_list_entry(ptl->ptl_mlist.next,
- lnet_me_t, me_list);
- CERROR("Active wildcard ME %p on exit\n", me);
- cfs_list_del(&me->me_list);
- lnet_me_free(me);
- }
+ do { /* iterate over match-tables when we have percpt match-table */
+ cfs_list_t *mhash;
+ lnet_me_t *me;
+ int j;
+
+ mtable = ptl->ptl_mtable;
+
+ if (mtable->mt_mhash == NULL) /* uninitialized match-table */
+ continue;
- for (j = 0; j < LNET_PORTAL_HASH_SIZE; j++) {
- while (!cfs_list_empty(&ptl->ptl_mhash[j])) {
- me = cfs_list_entry(ptl->ptl_mhash[j].next,
- lnet_me_t, me_list);
- CERROR("Active unique ME %p on exit\n", me);
+ mhash = mtable->mt_mhash;
+ /* cleanup ME */
+ while (!cfs_list_empty(&mtable->mt_mlist)) {
+ me = cfs_list_entry(mtable->mt_mlist.next,
+ lnet_me_t, me_list);
+ CERROR("Active wildcard ME %p on exit\n", me);
cfs_list_del(&me->me_list);
lnet_me_free(me);
}
- }
- LIBCFS_FREE(ptl->ptl_mhash,
- LNET_PORTAL_HASH_SIZE * sizeof(ptl->ptl_mhash[0]));
- ptl->ptl_mhash = NULL; /* mark it as finalized */
+ for (j = 0; j < LNET_MT_HASH_SIZE; j++) {
+ while (!cfs_list_empty(&mhash[j])) {
+ me = cfs_list_entry(mhash[j].next,
+ lnet_me_t, me_list);
+ CERROR("Active unique ME %p on exit\n", me);
+ cfs_list_del(&me->me_list);
+ lnet_me_free(me);
+ }
+ }
+
+ LIBCFS_FREE(mhash, sizeof(*mhash) * LNET_MT_HASH_SIZE);
+ } while (0);
+
+ LIBCFS_FREE(ptl->ptl_mtable, sizeof(*mtable));
+ ptl->ptl_mtable = NULL;
}
int
lnet_ptl_setup(struct lnet_portal *ptl, int index)
{
+ struct lnet_match_table *mtable;
cfs_list_t *mhash;
- int i;
+ int j;
ptl->ptl_index = index;
CFS_INIT_LIST_HEAD(&ptl->ptl_msgq);
- CFS_INIT_LIST_HEAD(&ptl->ptl_mlist);
- LIBCFS_ALLOC(mhash, sizeof(*mhash) * LNET_PORTAL_HASH_SIZE);
- if (mhash == NULL) {
+ LIBCFS_ALLOC(mtable, sizeof(*mtable));
+ if (mtable == NULL) {
CERROR("Failed to create match table for portal %d\n", index);
return -ENOMEM;
}
- for (i = 0; i < LNET_PORTAL_HASH_SIZE; i++)
- CFS_INIT_LIST_HEAD(&mhash[i]);
+ ptl->ptl_mtable = mtable;
+ do { /* iterate over match-tables when we have percpt match-table */
+ LIBCFS_ALLOC(mhash, sizeof(*mhash) * LNET_MT_HASH_SIZE);
+ if (mhash == NULL) {
+ CERROR("Failed to create match hash for portal %d\n",
+ index);
+ goto failed;
+ }
+
+ mtable->mt_mhash = mhash;
+ for (j = 0; j < LNET_MT_HASH_SIZE; j++)
+ CFS_INIT_LIST_HEAD(&mhash[j]);
- ptl->ptl_mhash = mhash; /* initialized */
+ CFS_INIT_LIST_HEAD(&mtable->mt_mlist);
+ mtable->mt_portal = index;
+ } while (0);
return 0;
+ failed:
+ lnet_ptl_cleanup(ptl);
+ return -ENOMEM;
}
void
/* grab all the blocked messages atomically */
cfs_list_splice_init(&ptl->ptl_msgq, &zombies);
- ptl->ptl_msgq_version++;
lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
LNET_UNLOCK();