Whamcloud - gitweb
b=2776
[fs/lustre-release.git] / lustre / portals / portals / lib-move.c
index ecd543c..477ddf8 100644 (file)
 #include <portals/lib-p30.h>
 #include <portals/arg-blocks.h>
 
-/*
- * Right now it does not check access control lists.
- *
- * We only support one MD per ME, which is how the Portals 3.1 spec is written.
- * All previous complication is removed.
- */
-
-static lib_me_t *
-lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
-            ptl_pid_t src_pid, ptl_size_t rlength, ptl_size_t roffset,
-            ptl_match_bits_t match_bits, ptl_size_t *mlength_out,
-            ptl_size_t *offset_out, int *unlink_out)
+/* forward ref */
+static void lib_commit_md (nal_cb_t *nal, lib_md_t *md, lib_msg_t *msg);
+
+static lib_md_t *
+lib_match_md(nal_cb_t *nal, int index, int op_mask, 
+             ptl_nid_t src_nid, ptl_pid_t src_pid, 
+             ptl_size_t rlength, ptl_size_t roffset,
+             ptl_match_bits_t match_bits, lib_msg_t *msg,
+             ptl_size_t *mlength_out, ptl_size_t *offset_out)
 {
         lib_ni_t         *ni = &nal->ni;
         struct list_head *match_list = &ni->tbl.tbl[index];
@@ -53,7 +50,6 @@ lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
         lib_md_t         *md;
         ptl_size_t        mlength;
         ptl_size_t        offset;
-
         ENTRY;
 
         CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
@@ -75,14 +71,14 @@ lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
 
                 LASSERT (me == md->me);
 
-                /* MD deactivated */
-                if (md->threshold == 0)
-                        continue;
-
                 /* mismatched MD op */
                 if ((md->options & op_mask) == 0)
                         continue;
 
+                /* MD exhausted */
+                if (lib_md_exhausted(md))
+                        continue;
+
                 /* mismatched ME nid/pid? */
                 if (me->match_id.nid != PTL_NID_ANY &&
                     me->match_id.nid != src_nid)
@@ -103,10 +99,12 @@ lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
                 else
                         offset = roffset;
 
-                mlength = md->length - offset;
-                if ((md->options & PTL_MD_MAX_SIZE) != 0 &&
-                    mlength > md->max_size)
+                if ((md->options & PTL_MD_MAX_SIZE) != 0) {
                         mlength = md->max_size;
+                        LASSERT (md->offset + mlength <= md->length);
+                } else {
+                        mlength = md->length - offset;
+                }
 
                 if (rlength <= mlength) {        /* fits in allowed space */
                         mlength = rlength;
@@ -118,13 +116,38 @@ lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
                         goto failed;
                 }
 
+                /* Commit to this ME/MD */
+                CDEBUG(D_NET, "Incoming %s index %x from "LPU64"/%u of "
+                       "length %d/%d into md "LPX64" [%d] + %d\n", 
+                       (op_mask == PTL_MD_OP_PUT) ? "put" : "get",
+                       index, src_nid, src_pid, mlength, rlength, 
+                       md->md_lh.lh_cookie, md->md_niov, offset);
+
+                lib_commit_md(nal, md, msg);
                 md->offset = offset + mlength;
 
+                /* NB Caller sets ev.type and ev.hdr_data */
+                msg->ev.initiator.nid = src_nid;
+                msg->ev.initiator.pid = src_pid;
+                msg->ev.portal = index;
+                msg->ev.match_bits = match_bits;
+                msg->ev.rlength = rlength;
+                msg->ev.mlength = mlength;
+                msg->ev.offset = offset;
+
+                lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
+
                 *offset_out = offset;
                 *mlength_out = mlength;
-                *unlink_out = ((md->options & PTL_MD_AUTO_UNLINK) != 0 &&
-                               md->offset >= (md->length - md->max_size));
-                RETURN (me);
+
+                /* Auto-unlink NOW, so the ME gets unlinked if required.
+                 * We bumped md->pending above so the MD just gets flagged
+                 * for unlink when it is finalized. */
+                if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) != 0 &&
+                    lib_md_exhausted(md))
+                        lib_md_unlink(nal, md);
+
+                RETURN (md);
         }
 
  failed:
@@ -627,9 +650,7 @@ parse_put(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
         lib_ni_t        *ni = &nal->ni;
         ptl_size_t       mlength = 0;
         ptl_size_t       offset = 0;
-        int              unlink = 0;
         ptl_err_t        rc;
-        lib_me_t        *me;
         lib_md_t        *md;
         unsigned long    flags;
                 
@@ -640,36 +661,19 @@ parse_put(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 
         state_lock(nal, &flags);
 
-        me = lib_find_me(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
-                         hdr->src_nid, hdr->src_pid,
-                         hdr->payload_length, hdr->msg.put.offset,
-                         hdr->msg.put.match_bits,
-                         &mlength, &offset, &unlink);
-        if (me == NULL) {
+        md = lib_match_md(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
+                          hdr->src_nid, hdr->src_pid,
+                          hdr->payload_length, hdr->msg.put.offset,
+                          hdr->msg.put.match_bits, msg,
+                          &mlength, &offset);
+        if (md == NULL) {
                 state_unlock(nal, &flags);
                 return (PTL_FAIL);
         }
 
-        md = me->md;
-        CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
-               "into md "LPX64" [%d] + %d\n", hdr->msg.put.ptl_index,
-               hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length, 
-               md->md_lh.lh_cookie, md->md_niov, offset);
-
-        lib_commit_md(nal, md, msg);
-
-        msg->ev.type = PTL_EVENT_PUT;
-        msg->ev.initiator.nid = hdr->src_nid;
-        msg->ev.initiator.pid = hdr->src_pid;
-        msg->ev.portal = hdr->msg.put.ptl_index;
-        msg->ev.match_bits = hdr->msg.put.match_bits;
-        msg->ev.rlength = hdr->payload_length;
-        msg->ev.mlength = mlength;
-        msg->ev.offset = offset;
+        msg->ev.type = PTL_EVENT_PUT_END;
         msg->ev.hdr_data = hdr->msg.put.hdr_data;
 
-        lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-
         if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
             !(md->options & PTL_MD_ACK_DISABLE)) {
                 msg->ack_wmd = hdr->msg.put.ack_wmd;
@@ -678,11 +682,6 @@ parse_put(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
         ni->counters.recv_count++;
         ni->counters.recv_length += mlength;
 
-        /* only unlink after MD's pending count has been bumped in
-         * lib_commit_md() otherwise lib_me_unlink() will nuke it */
-        if (unlink)
-                lib_me_unlink (nal, me);
-
         state_unlock(nal, &flags);
 
         rc = lib_recv(nal, private, msg, md, offset, mlength,
@@ -700,8 +699,6 @@ parse_get(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
         lib_ni_t        *ni = &nal->ni;
         ptl_size_t       mlength = 0;
         ptl_size_t       offset = 0;
-        int              unlink = 0;
-        lib_me_t        *me;
         lib_md_t        *md;
         ptl_hdr_t        reply;
         unsigned long    flags;
@@ -715,44 +712,22 @@ parse_get(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 
         state_lock(nal, &flags);
 
-        me = lib_find_me(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
-                         hdr->src_nid, hdr->src_pid,
-                         hdr->msg.get.sink_length, hdr->msg.get.src_offset,
-                         hdr->msg.get.match_bits,
-                         &mlength, &offset, &unlink);
-        if (me == NULL) {
+        md = lib_match_md(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
+                          hdr->src_nid, hdr->src_pid,
+                          hdr->msg.get.sink_length, hdr->msg.get.src_offset,
+                          hdr->msg.get.match_bits, msg,
+                          &mlength, &offset);
+        if (md == NULL) {
                 state_unlock(nal, &flags);
                 return (PTL_FAIL);
         }
 
-        md = me->md;
-        CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
-               "from md "LPX64" [%d] + %d\n", hdr->msg.get.ptl_index,
-               hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length, 
-               md->md_lh.lh_cookie, md->md_niov, offset);
-
-        lib_commit_md(nal, md, msg);
-
-        msg->ev.type = PTL_EVENT_GET;
-        msg->ev.initiator.nid = hdr->src_nid;
-        msg->ev.initiator.pid = hdr->src_pid;
-        msg->ev.portal = hdr->msg.get.ptl_index;
-        msg->ev.match_bits = hdr->msg.get.match_bits;
-        msg->ev.rlength = hdr->payload_length;
-        msg->ev.mlength = mlength;
-        msg->ev.offset = offset;
+        msg->ev.type = PTL_EVENT_GET_END;
         msg->ev.hdr_data = 0;
 
-        lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-
         ni->counters.send_count++;
         ni->counters.send_length += mlength;
 
-        /* only unlink after MD's refcount has been bumped in
-         * lib_commit_md() otherwise lib_me_unlink() will nuke it */
-        if (unlink)
-                lib_me_unlink (nal, me);
-
         state_unlock(nal, &flags);
 
         memset (&reply, 0, sizeof (reply));
@@ -828,7 +803,7 @@ parse_reply(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 
         lib_commit_md(nal, md, msg);
 
-        msg->ev.type = PTL_EVENT_REPLY;
+        msg->ev.type = PTL_EVENT_REPLY_END;
         msg->ev.initiator.nid = hdr->src_nid;
         msg->ev.initiator.pid = hdr->src_pid;
         msg->ev.rlength = rlength;
@@ -1044,8 +1019,6 @@ lib_parse(nal_cb_t *nal, ptl_hdr_t *hdr, void *private)
                 return;
         }
 
-        do_gettimeofday(&msg->ev.arrival_time);
-
         switch (hdr->type) {
         case PTL_MSG_ACK:
                 rc = parse_ack(nal, hdr, private, msg);
@@ -1112,14 +1085,14 @@ do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
         {
                 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
                        nal->ni.nid, id->nid);
-                return (ret->rc = PTL_INV_PROC);
+                return (ret->rc = PTL_PROCESS_INVALID);
         }
 
         msg = lib_msg_alloc(nal);
         if (msg == NULL) {
                 CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n",
                        ni->nid, id->nid);
-                return (ret->rc = PTL_NOSPACE);
+                return (ret->rc = PTL_NO_SPACE);
         }
 
         state_lock(nal, &flags);
@@ -1129,7 +1102,7 @@ do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
                 lib_msg_free(nal, msg);
                 state_unlock(nal, &flags);
         
-                return (ret->rc = PTL_INV_MD);
+                return (ret->rc = PTL_MD_INVALID);
         }
 
         CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
@@ -1158,7 +1131,7 @@ do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
 
         lib_commit_md(nal, md, msg);
         
-        msg->ev.type = PTL_EVENT_SENT;
+        msg->ev.type = PTL_EVENT_SEND_END;
         msg->ev.initiator.nid = ni->nid;
         msg->ev.initiator.pid = ni->pid;
         msg->ev.portal = args->portal_in;
@@ -1188,19 +1161,18 @@ do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
 }
 
 lib_msg_t * 
-lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd)
+lib_create_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_msg_t *getmsg)
 {
         /* The NAL can DMA direct to the GET md (i.e. no REPLY msg).  This
-         * returns a msg the NAL can pass to lib_finalize() so that a REPLY
-         * event still occurs. 
+         * returns a msg for the NAL to pass to lib_finalize() when the sink
+         * data has been received.
          *
-         * CAVEAT EMPTOR: 'getmd' is passed by pointer so it MUST be valid.
-         * This can only be guaranteed while a lib_msg_t holds a reference
-         * on it (ie. pending > 0), so best call this before the
-         * lib_finalize() of the original GET. */
+         * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
+         * lib_finalize() is called on it, so the NAL must call this first */
 
         lib_ni_t        *ni = &nal->ni;
         lib_msg_t       *msg = lib_msg_alloc(nal);
+        lib_md_t        *getmd = getmsg->md;
         unsigned long    flags;
 
         state_lock(nal, &flags);
@@ -1225,7 +1197,7 @@ lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd)
 
         lib_commit_md (nal, getmd, msg);
 
-        msg->ev.type = PTL_EVENT_REPLY;
+        msg->ev.type = PTL_EVENT_REPLY_END;
         msg->ev.initiator.nid = peer_nid;
         msg->ev.initiator.pid = 0;      /* XXX FIXME!!! */
         msg->ev.rlength = msg->ev.mlength = getmd->length;
@@ -1281,14 +1253,14 @@ do_PtlGet(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
         {
                 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
                        nal->ni.nid, id->nid);
-                return (ret->rc = PTL_INV_PROC);
+                return (ret->rc = PTL_PROCESS_INVALID);
         }
 
         msg = lib_msg_alloc(nal);
         if (msg == NULL) {
                 CERROR(LPU64": Dropping GET to "LPU64": ENOMEM on lib_msg_t\n",
                        ni->nid, id->nid);
-                return (ret->rc = PTL_NOSPACE);
+                return (ret->rc = PTL_NO_SPACE);
         }
 
         state_lock(nal, &flags);
@@ -1298,7 +1270,7 @@ do_PtlGet(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
                 lib_msg_free(nal, msg);
                 state_unlock(nal, &flags);
 
-                return ret->rc = PTL_INV_MD;
+                return ret->rc = PTL_MD_INVALID;
         }
 
         CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
@@ -1323,7 +1295,7 @@ do_PtlGet(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
 
         lib_commit_md(nal, md, msg);
 
-        msg->ev.type = PTL_EVENT_SENT;
+        msg->ev.type = PTL_EVENT_SEND_END;
         msg->ev.initiator.nid = ni->nid;
         msg->ev.initiator.pid = ni->pid;
         msg->ev.portal = args->portal_in;