Whamcloud - gitweb
LU-56 lnet: move "match" functions to lib-ptl.c
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index 9661ec3..683d555 100644 (file)
@@ -44,172 +44,6 @@ static int local_nid_dist_zero = 1;
 CFS_MODULE_PARM(local_nid_dist_zero, "i", int, 0444,
                 "Reserved");
 
-/* forward ref */
-static void lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg);
-
-#define LNET_MATCHMD_NONE     0   /* Didn't match */
-#define LNET_MATCHMD_OK       1   /* Matched OK */
-#define LNET_MATCHMD_DROP     2   /* Must be discarded */
-
-static int
-lnet_try_match_md (int index, int op_mask, lnet_process_id_t src,
-                   unsigned int rlength, unsigned int roffset,
-                   __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg,
-                   unsigned int *mlength_out, unsigned int *offset_out)
-{
-        /* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
-         * lnet_match_blocked_msg() relies on this to avoid races */
-        unsigned int  offset;
-        unsigned int  mlength;
-        lnet_me_t    *me = md->md_me;
-
-        /* mismatched MD op */
-        if ((md->md_options & op_mask) == 0)
-                return LNET_MATCHMD_NONE;
-
-        /* MD exhausted */
-        if (lnet_md_exhausted(md))
-                return LNET_MATCHMD_NONE;
-
-        /* mismatched ME nid/pid? */
-        if (me->me_match_id.nid != LNET_NID_ANY &&
-            me->me_match_id.nid != src.nid)
-                return LNET_MATCHMD_NONE;
-
-        if (me->me_match_id.pid != LNET_PID_ANY &&
-            me->me_match_id.pid != src.pid)
-                return LNET_MATCHMD_NONE;
-
-        /* mismatched ME matchbits? */
-        if (((me->me_match_bits ^ match_bits) & ~me->me_ignore_bits) != 0)
-                return LNET_MATCHMD_NONE;
-
-        /* Hurrah! This _is_ a match; check it out... */
-
-        if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
-                offset = md->md_offset;
-        else
-                offset = roffset;
-
-        if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
-                mlength = md->md_max_size;
-                LASSERT (md->md_offset + mlength <= md->md_length);
-        } else {
-                mlength = md->md_length - offset;
-        }
-
-        if (rlength <= mlength) {        /* fits in allowed space */
-                mlength = rlength;
-        } else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
-                /* this packet _really_ is too big */
-                CERROR("Matching packet from %s, match "LPU64
-                       " length %d too big: %d left, %d allowed\n",
-                       libcfs_id2str(src), match_bits, rlength,
-                       md->md_length - offset, mlength);
-
-                return LNET_MATCHMD_DROP;
-        }
-
-        /* Commit to this ME/MD */
-        CDEBUG(D_NET, "Incoming %s index %x from %s of "
-               "length %d/%d into md "LPX64" [%d] + %d\n",
-               (op_mask == LNET_MD_OP_PUT) ? "put" : "get",
-               index, libcfs_id2str(src), mlength, rlength,
-               md->md_lh.lh_cookie, md->md_niov, offset);
-
-        lnet_commit_md(md, msg);
-        md->md_offset = offset + mlength;
-
-        /* NB Caller will set ev.type and ev.hdr_data */
-        msg->msg_ev.initiator = src;
-        msg->msg_ev.pt_index = index;
-        msg->msg_ev.match_bits = match_bits;
-        msg->msg_ev.rlength = rlength;
-        msg->msg_ev.mlength = mlength;
-        msg->msg_ev.offset = offset;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
-        *offset_out = offset;
-        *mlength_out = mlength;
-
-        /* Auto-unlink NOW, so the ME gets unlinked if required.
-         * We bumped md->md_refcount above so the MD just gets flagged
-         * for unlink when it is finalized. */
-        if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
-            lnet_md_exhausted(md)) {
-                lnet_md_unlink(md);
-        }
-
-        return LNET_MATCHMD_OK;
-}
-
-static int
-lnet_match_md(int index, int op_mask, lnet_process_id_t src,
-              unsigned int rlength, unsigned int roffset,
-              __u64 match_bits, lnet_msg_t *msg,
-              unsigned int *mlength_out, unsigned int *offset_out,
-              lnet_libmd_t **md_out)
-{
-        lnet_portal_t    *ptl = &the_lnet.ln_portals[index];
-        cfs_list_t       *head;
-        lnet_me_t        *me;
-        lnet_me_t        *tmp;
-        lnet_libmd_t     *md;
-        int               rc;
-
-        CDEBUG (D_NET, "Request from %s of length %d into portal %d "
-                "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
-
-        if (index < 0 || index >= the_lnet.ln_nportals) {
-                CERROR("Invalid portal %d not in [0-%d]\n",
-                       index, the_lnet.ln_nportals);
-                return LNET_MATCHMD_DROP;
-        }
-
-        head = lnet_portal_me_head(index, src, match_bits);
-        if (head == NULL) /* nobody posted anything on this portal */
-                goto out;
-
-        cfs_list_for_each_entry_safe_typed (me, tmp, head,
-                                            lnet_me_t, me_list) {
-                md = me->me_md;
-
-                /* ME attached but MD not attached yet */
-                if (md == NULL)
-                        continue;
-
-                LASSERT (me == md->md_me);
-
-                rc = lnet_try_match_md(index, op_mask, src, rlength,
-                                       roffset, match_bits, md, msg,
-                                       mlength_out, offset_out);
-                switch (rc) {
-                default:
-                        LBUG();
-
-                case LNET_MATCHMD_NONE:
-                        continue;
-
-                case LNET_MATCHMD_OK:
-                        *md_out = md;
-                        return LNET_MATCHMD_OK;
-
-                case LNET_MATCHMD_DROP:
-                        return LNET_MATCHMD_DROP;
-                }
-                /* not reached */
-        }
-
- out:
-        if (op_mask == LNET_MD_OP_GET ||
-            !lnet_portal_is_lazy(ptl))
-                return LNET_MATCHMD_DROP;
-
-        return LNET_MATCHMD_NONE;
-}
-
 int
 lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
 {
@@ -1502,7 +1336,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
         return 0;
 }
 
-static void
+void
 lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
 {
         /* ALWAYS called holding the LNET_LOCK */
@@ -1543,140 +1377,6 @@ lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
 }
 
 static void
-lnet_drop_delayed_put(lnet_msg_t *msg, char *reason)
-{
-        lnet_process_id_t id = {0};
-
-        id.nid = msg->msg_hdr.src_nid;
-        id.pid = msg->msg_hdr.src_pid;
-
-        LASSERT (msg->msg_md == NULL);
-        LASSERT (msg->msg_delayed);
-        LASSERT (msg->msg_rxpeer != NULL);
-        LASSERT (msg->msg_hdr.type == LNET_MSG_PUT);
-
-        CWARN("Dropping delayed PUT from %s portal %d match "LPU64
-              " offset %d length %d: %s\n", 
-              libcfs_id2str(id),
-              msg->msg_hdr.msg.put.ptl_index,
-              msg->msg_hdr.msg.put.match_bits,
-              msg->msg_hdr.msg.put.offset,
-              msg->msg_hdr.payload_length,
-              reason);
-
-        /* NB I can't drop msg's ref on msg_rxpeer until after I've
-         * called lnet_drop_message(), so I just hang onto msg as well
-         * until that's done */
-
-        lnet_drop_message(msg->msg_rxpeer->lp_ni,
-                          msg->msg_private, msg->msg_len);
-
-        LNET_LOCK();
-
-        lnet_peer_decref_locked(msg->msg_rxpeer);
-        msg->msg_rxpeer = NULL;
-
-       lnet_msg_free_locked(msg);
-
-        LNET_UNLOCK();
-}
-
-/**
- * Turn on the lazy portal attribute. Use with caution!
- *
- * This portal attribute only affects incoming PUT requests to the portal,
- * and is off by default. By default, if there's no matching MD for an
- * incoming PUT request, it is simply dropped. With the lazy attribute on,
- * such requests are queued indefinitely until either a matching MD is
- * posted to the portal or the lazy attribute is turned off.
- *
- * It would prevent dropped requests, however it should be regarded as the
- * last line of defense - i.e. users must keep a close watch on active
- * buffers on a lazy portal and once it becomes too low post more buffers as
- * soon as possible. This is because delayed requests usually have detrimental
- * effects on underlying network connections. A few delayed requests often
- * suffice to bring an underlying connection to a complete halt, due to flow
- * control mechanisms.
- *
- * There's also a DOS attack risk. If users don't post match-all MDs on a
- * lazy portal, a malicious peer can easily stop a service by sending some
- * PUT requests with match bits that won't match any MD. A routed server is
- * especially vulnerable since the connections to its neighbor routers are
- * shared among all clients.
- *
- * \param portal Index of the portal to enable the lazy attribute on.
- *
- * \retval 0       On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetSetLazyPortal(int portal)
-{
-        lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
-
-        if (portal < 0 || portal >= the_lnet.ln_nportals)
-                return -EINVAL;
-
-        CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
-
-        LNET_LOCK();
-        lnet_portal_setopt(ptl, LNET_PTL_LAZY);
-        LNET_UNLOCK();
-
-        return 0;
-}
-
-/**
- * Turn off the lazy portal attribute. Delayed requests on the portal,
- * if any, will be all dropped when this function returns.
- *
- * \param portal Index of the portal to disable the lazy attribute on.
- *
- * \retval 0       On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetClearLazyPortal(int portal)
-{
-        cfs_list_t        zombies;
-        lnet_portal_t    *ptl = &the_lnet.ln_portals[portal];
-        lnet_msg_t       *msg;
-
-        if (portal < 0 || portal >= the_lnet.ln_nportals)
-                return -EINVAL;
-
-        LNET_LOCK();
-
-        if (!lnet_portal_is_lazy(ptl)) {
-                LNET_UNLOCK();
-                return 0;
-        }
-
-        if (the_lnet.ln_shutdown)
-                CWARN ("Active lazy portal %d on exit\n", portal);
-        else
-                CDEBUG (D_NET, "clearing portal %d lazy\n", portal);
-
-        /* grab all the blocked messages atomically */
-        cfs_list_add(&zombies, &ptl->ptl_msgq);
-        cfs_list_del_init(&ptl->ptl_msgq);
-
-        ptl->ptl_msgq_version++;
-        lnet_portal_unsetopt(ptl, LNET_PTL_LAZY);
-
-        LNET_UNLOCK();
-
-        while (!cfs_list_empty(&zombies)) {
-                msg = cfs_list_entry(zombies.next, lnet_msg_t, msg_list);
-                cfs_list_del(&msg->msg_list);
-
-                lnet_drop_delayed_put(msg, "Clearing lazy portal attr");
-        }
-
-        return 0;
-}
-
-static void
 lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
               unsigned int offset, unsigned int mlength)
 {
@@ -1708,104 +1408,6 @@ lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
                      hdr->payload_length);
 }
 
-/* called with LNET_LOCK held */
-void
-lnet_match_blocked_msg(lnet_libmd_t *md)
-{
-        CFS_LIST_HEAD    (drops);
-        CFS_LIST_HEAD    (matches);
-        cfs_list_t       *tmp;
-        cfs_list_t       *entry;
-        lnet_msg_t       *msg;
-        lnet_portal_t    *ptl;
-        lnet_me_t        *me  = md->md_me;
-
-        LASSERT (me->me_portal < (unsigned int)the_lnet.ln_nportals);
-
-        ptl = &the_lnet.ln_portals[me->me_portal];
-        if (!lnet_portal_is_lazy(ptl)) {
-                LASSERT (cfs_list_empty(&ptl->ptl_msgq));
-                return;
-        }
-
-        LASSERT (md->md_refcount == 0); /* a brand new MD */
-
-        cfs_list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
-                int               rc;
-                int               index;
-                unsigned int      mlength;
-                unsigned int      offset;
-                lnet_hdr_t       *hdr;
-                lnet_process_id_t src;
-
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                LASSERT (msg->msg_delayed);
-
-                hdr   = &msg->msg_hdr;
-                index = hdr->msg.put.ptl_index;
-
-                src.nid = hdr->src_nid;
-                src.pid = hdr->src_pid;
-
-                rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
-                                       hdr->payload_length,
-                                       hdr->msg.put.offset,
-                                       hdr->msg.put.match_bits,
-                                       md, msg, &mlength, &offset);
-
-                if (rc == LNET_MATCHMD_NONE)
-                        continue;
-
-                /* Hurrah! This _is_ a match */
-                cfs_list_del(&msg->msg_list);
-                ptl->ptl_msgq_version++;
-
-                if (rc == LNET_MATCHMD_OK) {
-                        cfs_list_add_tail(&msg->msg_list, &matches);
-
-                        CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
-                               "match "LPU64" offset %d length %d.\n",
-                               libcfs_id2str(src),
-                               hdr->msg.put.ptl_index,
-                               hdr->msg.put.match_bits,
-                               hdr->msg.put.offset,
-                               hdr->payload_length);
-                } else {
-                        LASSERT (rc == LNET_MATCHMD_DROP);
-
-                        cfs_list_add_tail(&msg->msg_list, &drops);
-                }
-
-                if (lnet_md_exhausted(md))
-                        break;
-        }
-
-        LNET_UNLOCK();
-
-        cfs_list_for_each_safe (entry, tmp, &drops) {
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                cfs_list_del(&msg->msg_list);
-
-                lnet_drop_delayed_put(msg, "Bad match");
-        }
-
-        cfs_list_for_each_safe (entry, tmp, &matches) {
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                cfs_list_del(&msg->msg_list);
-
-                /* md won't disappear under me, since each msg
-                 * holds a ref on it */
-                lnet_recv_put(md, msg, 1,
-                              msg->msg_ev.offset,
-                              msg->msg_ev.mlength);
-        }
-
-        LNET_LOCK();
-}
-
 static int
 lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
 {
@@ -1847,7 +1449,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
                 return 0;
 
         case LNET_MATCHMD_NONE:
-                ptl = &the_lnet.ln_portals[index];
+               ptl = the_lnet.ln_portals[index];
                 version = ptl->ptl_ml_version;
 
                 rc = 0;
@@ -1856,7 +1458,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
 
                 if (rc == 0 &&
                     !the_lnet.ln_shutdown &&
-                    lnet_portal_is_lazy(ptl)) {
+                   lnet_ptl_is_lazy(ptl)) {
                         if (version != ptl->ptl_ml_version)
                                 goto again;
 
@@ -2410,6 +2012,80 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         return 0;
 }
 
+void
+lnet_drop_delayed_msg_list(cfs_list_t *head, char *reason)
+{
+       while (!cfs_list_empty(head)) {
+               lnet_process_id_t       id = {0};
+               lnet_msg_t              *msg;
+
+               msg = cfs_list_entry(head->next, lnet_msg_t, msg_list);
+               cfs_list_del(&msg->msg_list);
+
+               id.nid = msg->msg_hdr.src_nid;
+               id.pid = msg->msg_hdr.src_pid;
+
+               LASSERT(msg->msg_md == NULL);
+               LASSERT(msg->msg_delayed);
+               LASSERT(msg->msg_rxpeer != NULL);
+               LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
+
+               CWARN("Dropping delayed PUT from %s portal %d match "LPU64
+                     " offset %d length %d: %s\n",
+                     libcfs_id2str(id),
+                     msg->msg_hdr.msg.put.ptl_index,
+                     msg->msg_hdr.msg.put.match_bits,
+                     msg->msg_hdr.msg.put.offset,
+                     msg->msg_hdr.payload_length, reason);
+
+               /* NB I can't drop msg's ref on msg_rxpeer until after I've
+                * called lnet_drop_message(), so I just hang onto msg as well
+                * until that's done */
+
+               lnet_drop_message(msg->msg_rxpeer->lp_ni,
+                                 msg->msg_private, msg->msg_len);
+
+               LNET_LOCK();
+               lnet_peer_decref_locked(msg->msg_rxpeer);
+               LNET_UNLOCK();
+
+               lnet_msg_free(msg);
+       }
+}
+
+void
+lnet_recv_delayed_msg_list(cfs_list_t *head)
+{
+       while (!cfs_list_empty(head)) {
+               lnet_msg_t        *msg;
+               lnet_process_id_t  id;
+
+               msg = cfs_list_entry(head->next, lnet_msg_t, msg_list);
+               cfs_list_del(&msg->msg_list);
+
+               /* md won't disappear under me, since each msg
+                * holds a ref on it */
+
+               id.nid = msg->msg_hdr.src_nid;
+               id.pid = msg->msg_hdr.src_pid;
+
+               LASSERT(msg->msg_delayed);
+               LASSERT(msg->msg_md != NULL);
+               LASSERT(msg->msg_rxpeer != NULL);
+               LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
+
+               CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
+                      "match "LPU64" offset %d length %d.\n",
+                       libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
+                       msg->msg_hdr.msg.put.match_bits,
+                       msg->msg_hdr.msg.put.offset,
+                       msg->msg_hdr.payload_length);
+
+               lnet_recv_put(msg->msg_md, msg, 1,
+                             msg->msg_ev.offset, msg->msg_ev.mlength);
+       }
+}
+
 /**
  * Initiate an asynchronous PUT operation.
  *