+ /*
+ * Steal buffer from other CPTs, and delay msg if nothing to
+ * steal. This function is more expensive than a regular
+ * match, but we don't expect it can happen a lot. The return
+ * code contains one of LNET_MATCHMD_OK, LNET_MATCHMD_DROP, or
+ * LNET_MATCHMD_NONE.
+ */
+ LASSERT(lnet_ptl_is_wildcard(ptl));
+
+ for (i = 0; i < LNET_CPT_NUMBER; i++) {
+ struct lnet_match_table *mtable;
+ int cpt;
+
+ cpt = (first + i) % LNET_CPT_NUMBER;
+ mtable = ptl->ptl_mtables[cpt];
+ if (i != 0 && i != LNET_CPT_NUMBER - 1 && !mtable->mt_enabled)
+ continue;
+
+ lnet_res_lock(cpt);
+ lnet_ptl_lock(ptl);
+
+ if (i == 0) {
+ /* The first try, add to stealing list. */
+ list_add_tail(&msg->msg_list,
+ &ptl->ptl_msg_stealing);
+ }
+
+ if (!list_empty(&msg->msg_list)) {
+ /* On stealing list. */
+ rc = lnet_mt_match_md(mtable, info, msg);
+
+ if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 &&
+ mtable->mt_enabled)
+ lnet_ptl_disable_mt(ptl, cpt);
+
+ if ((rc & LNET_MATCHMD_FINISH) != 0) {
+ /* Match found, remove from stealing list. */
+ list_del_init(&msg->msg_list);
+ } else if (i == LNET_CPT_NUMBER - 1 || /* (1) */
+ ptl->ptl_mt_nmaps == 0 || /* (2) */
+ (ptl->ptl_mt_nmaps == 1 && /* (3) */
+ ptl->ptl_mt_maps[0] == cpt)) {
+ /*
+ * No match found, and this is either
+ * (1) the last cpt to check, or
+ * (2) there is no active cpt, or
+ * (3) this is the only active cpt.
+ * There is nothing to steal: delay or
+ * drop the message.
+ */
+ list_del_init(&msg->msg_list);
+
+ if (lnet_ptl_is_lazy(ptl)) {
+ msg->msg_rx_delayed = 1;
+ list_add_tail(&msg->msg_list,
+ &ptl->ptl_msg_delayed);
+ rc = LNET_MATCHMD_NONE;
+ } else {
+ rc = LNET_MATCHMD_DROP;
+ }
+ } else {
+ /* Do another iteration. */
+ rc = 0;
+ }
+ } else {
+ /*
+ * No longer on stealing list: another thread
+ * matched the message in lnet_ptl_attach_md().
+ * We are now expected to handle the message.
+ */
+ rc = msg->msg_md == NULL ?
+ LNET_MATCHMD_DROP : LNET_MATCHMD_OK;
+ }
+
+ lnet_ptl_unlock(ptl);
+ lnet_res_unlock(cpt);
+
+ /*
+ * Note that test (1) above ensures that we always
+ * exit the loop through this break statement.
+ *
+ * LNET_MATCHMD_NONE means msg was added to the
+ * delayed queue, and we may no longer reference it
+ * after lnet_ptl_unlock() and lnet_res_unlock().
+ */
+ if (rc & (LNET_MATCHMD_FINISH | LNET_MATCHMD_NONE))
+ break;
+ }
+
+ return rc;