Whamcloud - gitweb
* Added loopback optimisation to lib-move.c
authoreeb <eeb>
Mon, 14 Feb 2005 15:30:19 +0000 (15:30 +0000)
committereeb <eeb>
Mon, 14 Feb 2005 15:30:19 +0000 (15:30 +0000)
     lctl --net ??? loopback             # show current state
     lctl --net ??? loopback on          # enable loopback optimisation
     lctl --net ??? loopback off         # disable loopback optimisation
     By default loopback optimisation is turned OFF

15 files changed:
lnet/include/libcfs/kp30.h
lnet/include/lnet/api.h
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-p30.h
lnet/include/lnet/lib-types.h
lnet/include/lnet/lnetctl.h
lnet/include/lnet/nal.h
lnet/include/lnet/ptlctl.h
lnet/lnet/api-wrap.c
lnet/lnet/lib-init.c
lnet/lnet/lib-move.c
lnet/lnet/lib-ni.c
lnet/lnet/module.c
lnet/utils/portals.c
lnet/utils/ptlctl.c

index 6582c3d..43853c1 100644 (file)
@@ -400,7 +400,7 @@ extern int portal_ioctl_getdata(char *buf, char *end, void *arg);
 #define IOC_PORTAL_NAL_CMD                 _IOWR('e', 35, IOCTL_PORTAL_TYPE)
 #define IOC_PORTAL_GET_NID                 _IOWR('e', 36, IOCTL_PORTAL_TYPE)
 #define IOC_PORTAL_FAIL_NID                _IOWR('e', 37, IOCTL_PORTAL_TYPE)
-
+#define IOC_PORTAL_LOOPBACK                _IOWR('e', 38, IOCTL_PORTAL_TYPE)
 #define IOC_PORTAL_LWT_CONTROL             _IOWR('e', 39, IOCTL_PORTAL_TYPE)
 #define IOC_PORTAL_LWT_SNAPSHOT            _IOWR('e', 40, IOCTL_PORTAL_TYPE)
 #define IOC_PORTAL_LWT_LOOKUP_STRING       _IOWR('e', 41, IOCTL_PORTAL_TYPE)
index 56b7b99..2d3a8f6 100644 (file)
@@ -35,7 +35,7 @@ int PtlNIHandle(ptl_handle_any_t handle_in, ptl_handle_ni_t * interface_out);
 
 
 /* 
- * PtlNIFailNid
+ * PtlFailNid
  *
  * Not an official Portals 3 API call.  It provides a way of simulating
  * communications failures to all (nid == PTL_NID_ANY), or specific peers
@@ -45,6 +45,14 @@ int PtlNIHandle(ptl_handle_any_t handle_in, ptl_handle_ni_t * interface_out);
  */
 int PtlFailNid (ptl_handle_ni_t ni, ptl_nid_t nid, unsigned int threshold);
 
+/* 
+ * PtlLoopback
+ *
+ * Not an official Portals 3 API call.  It provides a way of enabling or
+ * disabling loopback optimisation, or getting its current state.
+ */
+int PtlLoopback (ptl_handle_ni_t ni, int set, int *enabled);
+
 /*
  * PtlSnprintHandle: 
  *
index a407379..f56206b 100644 (file)
@@ -461,5 +461,6 @@ extern int lib_api_put(nal_t *apinal, ptl_handle_md_t *mdh,
                        ptl_match_bits_t match_bits, 
                        ptl_size_t offset, ptl_hdr_data_t hdr_data);
 extern int lib_api_fail_nid(nal_t *apinal, ptl_nid_t nid, unsigned int threshold);
+extern int lib_api_loopback(nal_t *apinal, int set, int *enabled);
 
 #endif
index a407379..f56206b 100644 (file)
@@ -461,5 +461,6 @@ extern int lib_api_put(nal_t *apinal, ptl_handle_md_t *mdh,
                        ptl_match_bits_t match_bits, 
                        ptl_size_t offset, ptl_hdr_data_t hdr_data);
 extern int lib_api_fail_nid(nal_t *apinal, ptl_nid_t nid, unsigned int threshold);
+extern int lib_api_loopback(nal_t *apinal, int set, int *enabled);
 
 #endif
index 1ac2c56..f6302ab 100644 (file)
@@ -259,6 +259,7 @@ typedef struct lib_ni
         __u64             ni_interface_cookie;  /* uniquely identifies this ni in this epoch */
         
         struct list_head  ni_test_peers;
+        int               ni_loopback;          /* loopback shortcircuits NAL */
         
 #ifdef PTL_USE_LIB_FREELIST
         lib_freelist_t    ni_free_mes;
@@ -358,4 +359,19 @@ typedef struct lib_nal
        int (*libnal_dist) (struct lib_nal *nal, ptl_nid_t nid, unsigned long *dist);
 } lib_nal_t;
 
+typedef struct                                  /* loopback descriptor */
+{
+        unsigned int     lod_type;
+        unsigned int     lod_niov;
+        size_t           lod_offset;
+        size_t           lod_nob;
+        union {
+                struct iovec  *iov;
+                ptl_kiov_t    *kiov;
+        }                lod_iov;
+} lo_desc_t;
+
+#define LOD_IOV     0xeb105
+#define LOD_KIOV    0xeb106
+
 #endif
index 99da747..cce160e 100644 (file)
@@ -63,6 +63,7 @@ int jt_ptl_del_route (int argc, char **argv);
 int jt_ptl_notify_router (int argc, char **argv);
 int jt_ptl_print_routes (int argc, char **argv);
 int jt_ptl_fail_nid (int argc, char **argv);
+int jt_ptl_loopback (int argc, char **argv);
 int jt_ptl_lwt(int argc, char **argv);
 int jt_ptl_memhog(int argc, char **argv);
 
index bf86569..aad611d 100644 (file)
@@ -32,6 +32,7 @@ struct nal_t {
        int (*nal_ni_status) (nal_t *nal, ptl_sr_index_t register, ptl_sr_value_t *status);
        int (*nal_ni_dist) (nal_t *nal, ptl_process_id_t *id, unsigned long *distance);
        int (*nal_fail_nid) (nal_t *nal, ptl_nid_t nid, unsigned int threshold);
+       int (*nal_loopback) (nal_t *nal, int set, int *enabled);
 
        int (*nal_me_attach) (nal_t *nal, ptl_pt_index_t portal,
                              ptl_process_id_t match_id, 
index 99da747..cce160e 100644 (file)
@@ -63,6 +63,7 @@ int jt_ptl_del_route (int argc, char **argv);
 int jt_ptl_notify_router (int argc, char **argv);
 int jt_ptl_print_routes (int argc, char **argv);
 int jt_ptl_fail_nid (int argc, char **argv);
+int jt_ptl_loopback (int argc, char **argv);
 int jt_ptl_lwt(int argc, char **argv);
 int jt_ptl_memhog(int argc, char **argv);
 
index c42e1e8..92f495e 100644 (file)
@@ -86,6 +86,20 @@ int PtlFailNid (ptl_handle_ni_t interface, ptl_nid_t nid, unsigned int threshold
         return nal->nal_fail_nid(nal, nid, threshold);
 }
 
+int PtlLoopback (ptl_handle_ni_t interface, int set, int *enabled)
+{
+        nal_t     *nal;
+
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&interface);
+        if (nal == NULL)
+                return PTL_NI_INVALID;
+        
+        return nal->nal_loopback(nal, set, enabled);
+}
+
 int PtlNIStatus(ptl_handle_ni_t interface_in, ptl_sr_index_t register_in,
                 ptl_sr_value_t *status_out)
 {
index b0c55f9..6d0099c 100644 (file)
@@ -281,6 +281,7 @@ lib_init(lib_nal_t *libnal, nal_t *apinal,
         apinal->nal_ni_status = lib_api_ni_status;
         apinal->nal_ni_dist   = lib_api_ni_dist;
         apinal->nal_fail_nid  = lib_api_fail_nid;
+        apinal->nal_loopback  = lib_api_loopback;
         apinal->nal_me_attach = lib_api_me_attach;
         apinal->nal_me_insert = lib_api_me_insert;
         apinal->nal_me_unlink = lib_api_me_unlink;
@@ -356,6 +357,9 @@ lib_init(lib_nal_t *libnal, nal_t *apinal,
         if (actual_limits != NULL)
                 *actual_limits = ni->ni_actual_limits;
 
+        /* disable loopback optimisation by default */
+        ni->ni_loopback = 0;
+
  out:
         if (rc != PTL_OK) {
                 lib_cleanup_handle_hash (libnal);
index e9ef015..ed1c23e 100644 (file)
@@ -34,6 +34,8 @@
 
 /* forward ref */
 static void lib_commit_md (lib_nal_t *nal, lib_md_t *md, lib_msg_t *msg);
+static ptl_err_t do_lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, 
+                              void *private, int loopback);
 
 static lib_md_t *
 lib_match_md(lib_nal_t *nal, int index, int op_mask,
@@ -213,6 +215,19 @@ int lib_api_fail_nid (nal_t *apinal, ptl_nid_t nid, unsigned int threshold)
         return PTL_OK;
 }
 
+int 
+lib_api_loopback (nal_t *apinal, int set, int *enabled)
+{
+        lib_nal_t *nal = apinal->nal_data;
+
+        if (set)
+                nal->libnal_ni.ni_loopback = *enabled;
+        else
+                *enabled = nal->libnal_ni.ni_loopback;
+        
+        return PTL_OK;
+}
+
 static int
 fail_peer (lib_nal_t *nal, ptl_nid_t nid, int outgoing)
 {
@@ -556,6 +571,243 @@ lib_extract_kiov (int dst_niov, ptl_kiov_t *dst,
 #endif
 
 ptl_err_t
+lib_lo_rxiov(lib_nal_t    *nal,
+             void         *private,
+             lib_msg_t    *libmsg,
+             unsigned int  niov,
+             struct iovec *iov,
+             size_t        offset,
+             size_t        mlen,
+             size_t        rlen)
+{
+        lo_desc_t *lod = (lo_desc_t *)private;
+
+        /* I only handle mapped->mapped matches */
+        LASSERT(lod->lod_type == LOD_IOV);
+        LASSERT(mlen > 0);
+
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT(niov > 0);
+        }
+        
+        while (lod->lod_offset >= lod->lod_iov.iov->iov_len) {
+                lod->lod_offset -= lod->lod_iov.iov->iov_len;
+                lod->lod_iov.iov++;
+                lod->lod_niov--;
+                LASSERT(lod->lod_niov > 0);
+        }
+        
+        do {
+                int fraglen = MIN(iov->iov_len - offset,
+                                  lod->lod_iov.iov->iov_len - lod->lod_offset);
+
+                LASSERT(niov > 0);
+                LASSERT(lod->lod_niov > 0);
+
+                if (fraglen > mlen)
+                        fraglen = mlen;
+                
+                memcpy((void *)((unsigned long)iov->iov_base + offset),
+                       (void *)((unsigned long)lod->lod_iov.iov->iov_base +
+                                lod->lod_offset),
+                       fraglen);
+
+                if (offset + fraglen < iov->iov_len) {
+                        offset += fraglen;
+                } else {
+                        offset = 0;
+                        iov++;
+                        niov--;
+                }
+
+                if (lod->lod_offset + fraglen < lod->lod_iov.iov->iov_len ) {
+                        lod->lod_offset += fraglen;
+                } else {
+                        lod->lod_offset = 0;
+                        lod->lod_iov.iov++;
+                        lod->lod_niov--;
+                }
+
+                mlen -= fraglen;
+        } while (mlen > 0);
+        
+        lib_finalize(nal, private, libmsg, PTL_OK);
+        return PTL_OK;
+}
+
+ptl_err_t
+lib_lo_rxkiov(lib_nal_t    *nal,
+              void         *private,
+              lib_msg_t    *libmsg,
+              unsigned int  niov,
+              ptl_kiov_t   *kiov,
+              size_t        offset,
+              size_t        mlen,
+              size_t        rlen)
+{
+        void          *srcaddr = NULL;
+        void          *dstaddr = NULL;
+        unsigned long  srcfrag = 0;
+        unsigned long  dstfrag = 0;
+        unsigned long  fraglen;
+        lo_desc_t     *lod = (lo_desc_t *)private;
+
+        /* I only handle unmapped->unmapped matches */
+        LASSERT(lod->lod_type == LOD_KIOV);
+
+        if (mlen == 0)
+                return PTL_OK;
+
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT(niov > 0);
+        }
+
+        while (lod->lod_offset >= lod->lod_iov.kiov->kiov_len) {
+                lod->lod_offset -= lod->lod_iov.kiov->kiov_len;
+                lod->lod_iov.kiov++;
+                lod->lod_niov--;
+                LASSERT(lod->lod_niov > 0);
+        }
+
+        do {
+                /* CAVEAT EMPTOR: I kmap 2 pages at once == slight risk of deadlock */
+                LASSERT(niov > 0);
+                if (dstaddr == NULL) {
+                        dstaddr = (void *)((unsigned long)kmap(kiov->kiov_page) +
+                                           kiov->kiov_offset + offset);
+                        dstfrag = kiov->kiov_len -  offset;
+                }
+
+                LASSERT(lod->lod_niov > 0);
+                if (srcaddr == NULL) {
+                        srcaddr = (void *)((unsigned long)kmap(lod->lod_iov.kiov->kiov_page) +
+                                           lod->lod_iov.kiov->kiov_offset + lod->lod_offset);
+                        srcfrag = lod->lod_iov.kiov->kiov_len - lod->lod_offset;
+                }
+                
+                fraglen = MIN(srcfrag, dstfrag);
+                if (fraglen > mlen)
+                        fraglen = mlen;
+                
+                memcpy(dstaddr, srcaddr, fraglen);
+                
+                if (fraglen < dstfrag) {
+                        dstfrag -= fraglen;
+                        dstaddr = (void *)((unsigned long)dstaddr + fraglen);
+                } else {
+                        kunmap(kiov->kiov_page);
+                        dstaddr = NULL;
+                        offset = 0;
+                        kiov++;
+                        niov--;
+                }
+
+                if (fraglen < srcfrag) {
+                        srcfrag -= fraglen;
+                        srcaddr = (void *)((unsigned long)srcaddr + fraglen);
+                } else {
+                        kunmap(lod->lod_iov.kiov->kiov_page);
+                        srcaddr = NULL;
+                        lod->lod_offset = 0;
+                        lod->lod_iov.kiov++;
+                        lod->lod_niov--;
+                }
+
+                mlen -= fraglen;
+        } while (mlen > 0);
+
+        if (dstaddr != NULL)
+                kunmap(kiov->kiov_page);
+
+        if (srcaddr != NULL)
+                kunmap(lod->lod_iov.kiov->kiov_page);
+
+        lib_finalize(nal, private, libmsg, PTL_OK);
+        return PTL_OK;
+}
+
+ptl_err_t
+lib_lo_txiov (lib_nal_t    *nal,
+              void         *private,
+              lib_msg_t    *libmsg,
+              ptl_hdr_t    *hdr,
+              int           type,
+              ptl_nid_t     nid,
+              ptl_pid_t     pid,
+              unsigned int  payload_niov,
+              struct iovec *payload_iov,
+              size_t        payload_offset,
+              size_t        payload_nob)
+{
+        lo_desc_t lod = {
+                .lod_type    = LOD_IOV,
+                .lod_niov    = payload_niov,
+                .lod_offset  = payload_offset,
+                .lod_nob     = payload_nob,
+                .lod_iov     = { .iov = payload_iov } };
+        ptl_err_t rc;
+
+        rc = do_lib_parse(nal, hdr, &lod, 1);
+        if (rc == PTL_OK)
+                lib_finalize(nal, private, libmsg, PTL_OK);
+        
+        return rc;
+}
+
+ptl_err_t
+lib_lo_txkiov (lib_nal_t    *nal,
+               void         *private,
+               lib_msg_t    *libmsg,
+               ptl_hdr_t    *hdr,
+               int           type,
+               ptl_nid_t     nid,
+               ptl_pid_t     pid,
+               unsigned int  payload_niov,
+               ptl_kiov_t   *payload_kiov,
+               size_t        payload_offset,
+               size_t        payload_nob)
+{
+        lo_desc_t lod = {
+                .lod_type     = LOD_KIOV,
+                .lod_niov     = payload_niov,
+                .lod_offset   = payload_offset,
+                .lod_nob      = payload_nob,
+                .lod_iov      = { .kiov = payload_kiov } };
+        ptl_err_t   rc;
+
+        rc = do_lib_parse(nal, hdr, &lod, 1);
+        if (rc == PTL_OK)
+                lib_finalize(nal, private, libmsg, PTL_OK);
+        
+        return rc;
+}
+
+ptl_err_t
+lib_lo_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
+             ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
+{
+        if (mlen == 0) {
+                lib_finalize(nal, private, msg, PTL_OK);
+                return PTL_OK;
+        }
+        
+        if ((md->options & PTL_MD_KIOV) == 0)
+                return lib_lo_rxiov(nal, private, msg,
+                                    md->md_niov, md->md_iov.iov,
+                                    offset, mlen, rlen);
+        
+        return lib_lo_rxkiov(nal, private, msg,
+                             md->md_niov, md->md_iov.kiov,
+                             offset, mlen, rlen);
+}
+
+ptl_err_t
 lib_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
 {
@@ -579,22 +831,45 @@ lib_send (lib_nal_t *nal, void *private, lib_msg_t *msg,
           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
           lib_md_t *md, ptl_size_t offset, ptl_size_t len)
 {
-        if (len == 0)
-                return (nal->libnal_send(nal, private, msg,
-                                         hdr, type, nid, pid,
-                                         0, NULL,
-                                         offset, len));
-
-        if ((md->options & PTL_MD_KIOV) == 0)
-                return (nal->libnal_send(nal, private, msg,
-                                         hdr, type, nid, pid,
-                                         md->md_niov, md->md_iov.iov,
-                                         offset, len));
-
-        return (nal->libnal_send_pages(nal, private, msg,
-                                       hdr, type, nid, pid,
-                                       md->md_niov, md->md_iov.kiov,
-                                       offset, len));
+        int loopback = (nal->libnal_ni.ni_loopback &&
+                        (nid == nal->libnal_ni.ni_pid.nid));
+
+        if (len == 0) {
+                if (loopback)
+                        return lib_lo_txiov(nal, private, msg,
+                                            hdr, type, nid, pid,
+                                            0, NULL,
+                                            offset, len);
+                else
+                        return nal->libnal_send(nal, private, msg,
+                                                hdr, type, nid, pid,
+                                                0, NULL,
+                                                offset, len);
+        }
+        
+        if ((md->options & PTL_MD_KIOV) == 0) {
+                if (loopback)
+                        return lib_lo_txiov(nal, private, msg,
+                                            hdr, type, nid, pid,
+                                            md->md_niov, md->md_iov.iov,
+                                            offset, len);
+                else
+                        return nal->libnal_send(nal, private, msg,
+                                                hdr, type, nid, pid,
+                                                md->md_niov, md->md_iov.iov,
+                                                offset, len);
+        }
+        
+        if (loopback)
+                return lib_lo_txkiov(nal, private, msg,
+                                     hdr, type, nid, pid,
+                                     md->md_niov, md->md_iov.kiov,
+                                     offset, len);
+        else
+                return nal->libnal_send_pages(nal, private, msg,
+                                              hdr, type, nid, pid,
+                                              md->md_niov, md->md_iov.kiov,
+                                              offset, len);
 }
 
 static void
@@ -622,7 +897,7 @@ lib_commit_md (lib_nal_t *nal, lib_md_t *md, lib_msg_t *msg)
 }
 
 static void
-lib_drop_message (lib_nal_t *nal, void *private, ptl_hdr_t *hdr)
+lib_drop_message (lib_nal_t *nal, void *private, ptl_hdr_t *hdr, int loopback)
 {
         unsigned long flags;
 
@@ -636,7 +911,9 @@ lib_drop_message (lib_nal_t *nal, void *private, ptl_hdr_t *hdr)
         LIB_UNLOCK(nal, flags);
 
         /* NULL msg => if NAL calls lib_finalize it will be a noop */
-        (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
+        if (!loopback)
+                (void) lib_recv(nal, private, NULL, NULL, 0, 0,
+                                hdr->payload_length);
 }
 
 /*
@@ -647,7 +924,8 @@ lib_drop_message (lib_nal_t *nal, void *private, ptl_hdr_t *hdr)
  *
  */
 static ptl_err_t
-parse_put(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
+parse_put(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, 
+          lib_msg_t *msg, int loopback)
 {
         lib_ni_t        *ni = &nal->libnal_ni;
         ptl_size_t       mlength = 0;
@@ -686,8 +964,13 @@ parse_put(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 
         LIB_UNLOCK(nal, flags);
 
-        rc = lib_recv(nal, private, msg, md, offset, mlength,
-                      hdr->payload_length);
+        if (loopback)
+                rc = lib_lo_recv(nal, private, msg, md, offset, mlength,
+                                 hdr->payload_length);
+        else
+                rc = lib_recv(nal, private, msg, md, offset, mlength,
+                              hdr->payload_length);
+
         if (rc != PTL_OK)
                 CERROR(LPU64": error on receiving PUT from "LPU64": %d\n",
                        ni->ni_pid.nid, hdr->src_nid, rc);
@@ -696,7 +979,8 @@ parse_put(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 }
 
 static ptl_err_t
-parse_get(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
+parse_get(lib_nal_t *nal, ptl_hdr_t *hdr, void *private,
+          lib_msg_t *msg, int loopback)
 {
         lib_ni_t        *ni = &nal->libnal_ni;
         ptl_size_t       mlength = 0;
@@ -752,13 +1036,16 @@ parse_get(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
                        ni->ni_pid.nid, hdr->src_nid, rc);
 
         /* Discard any junk after the hdr */
-        (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
+        if (!loopback)
+                (void) lib_recv(nal, private, NULL, NULL, 0, 0,
+                                hdr->payload_length);
 
         return (rc);
 }
 
 static ptl_err_t
-parse_reply(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
+parse_reply(lib_nal_t *nal, ptl_hdr_t *hdr, void *private,
+            lib_msg_t *msg, int loopback)
 {
         lib_ni_t        *ni = &nal->libnal_ni;
         lib_md_t        *md;
@@ -820,7 +1107,11 @@ parse_reply(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 
         LIB_UNLOCK(nal, flags);
 
-        rc = lib_recv(nal, private, msg, md, 0, length, rlength);
+        if (loopback)
+                rc = lib_lo_recv(nal, private, msg, md, 0, length, rlength);
+        else
+                rc = lib_recv(nal, private, msg, md, 0, length, rlength);
+
         if (rc != PTL_OK)
                 CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n",
                        ni->ni_pid.nid, hdr->src_nid, rc);
@@ -829,7 +1120,8 @@ parse_reply(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 }
 
 static ptl_err_t
-parse_ack(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
+parse_ack(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, 
+          lib_msg_t *msg, int loopback)
 {
         lib_ni_t      *ni = &nal->libnal_ni;
         lib_md_t      *md;
@@ -878,7 +1170,9 @@ parse_ack(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
         lib_finalize(nal, private, msg, PTL_OK);
 
         /* ...and now discard any junk after the hdr */
-        (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
+        if (!loopback)
+                (void) lib_recv(nal, private, NULL, NULL, 0, 0,
+                                hdr->payload_length);
 
        return (PTL_OK);
 }
@@ -959,6 +1253,12 @@ void print_hdr(lib_nal_t *nal, ptl_hdr_t * hdr)
 ptl_err_t
 lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private)
 {
+        return do_lib_parse(nal, hdr, private, 0);
+}
+
+ptl_err_t
+do_lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, int loopback)
+{
         unsigned long  flags;
         ptl_err_t      rc;
         lib_msg_t     *msg;
@@ -993,7 +1293,7 @@ lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private)
                                hdr->src_nid);
 
                         /* it's good but we don't want it */
-                        lib_drop_message(nal, private, hdr);
+                        lib_drop_message(nal, private, hdr, loopback);
                         return PTL_OK;
                 }
 
@@ -1036,7 +1336,7 @@ lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private)
                        ": simulated failure\n",
                        nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr),
                        hdr->src_nid);
-                lib_drop_message(nal, private, hdr);
+                lib_drop_message(nal, private, hdr, loopback);
                 return PTL_OK;
         }
 
@@ -1046,22 +1346,22 @@ lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private)
                        ": can't allocate a lib_msg_t\n",
                        nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr),
                        hdr->src_nid);
-                lib_drop_message(nal, private, hdr);
+                lib_drop_message(nal, private, hdr, loopback);
                 return PTL_OK;
         }
 
         switch (hdr->type) {
         case PTL_MSG_ACK:
-                rc = parse_ack(nal, hdr, private, msg);
+                rc = parse_ack(nal, hdr, private, msg, loopback);
                 break;
         case PTL_MSG_PUT:
-                rc = parse_put(nal, hdr, private, msg);
+                rc = parse_put(nal, hdr, private, msg, loopback);
                 break;
         case PTL_MSG_GET:
-                rc = parse_get(nal, hdr, private, msg);
+                rc = parse_get(nal, hdr, private, msg, loopback);
                 break;
         case PTL_MSG_REPLY:
-                rc = parse_reply(nal, hdr, private, msg);
+                rc = parse_reply(nal, hdr, private, msg, loopback);
                 break;
         default:
                 LASSERT(0);
@@ -1078,7 +1378,7 @@ lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private)
                         lib_msg_free(nal, msg); /* expects LIB_LOCK held */
                         LIB_UNLOCK(nal, flags);
 
-                        lib_drop_message(nal, private, hdr);
+                        lib_drop_message(nal, private, hdr, loopback);
                 }
         }
 
index cf273f8..e45859a 100644 (file)
@@ -19,5 +19,11 @@ int lib_api_ni_dist (nal_t *apinal, ptl_process_id_t *pid, unsigned long *dist)
 {
         lib_nal_t *nal = apinal->nal_data;
 
+        if (nal->libnal_ni.ni_loopback &&
+            pid->nid == nal->libnal_ni.ni_pid.nid) {
+                *dist = 0;
+                return PTL_OK;
+        }
+        
         return (nal->libnal_dist(nal, pid->nid, dist));
 }
index 21b91a5..472175b 100644 (file)
@@ -102,6 +102,41 @@ static int kportal_ioctl(struct portal_ioctl_data *data,
                 PtlNIFini(nih);
                 RETURN (err);
         }
+
+        case IOC_PORTAL_LOOPBACK: {
+                ptl_handle_ni_t  nih;
+                int              enabled = data->ioc_flags;
+                int              set = data->ioc_misc;
+
+                CDEBUG (D_IOCTL, "loopback: [%d] %d %d\n",
+                        data->ioc_nal, enabled, set);
+
+                err = PtlNIInit(data->ioc_nal, LUSTRE_SRV_PTL_PID, NULL,
+                                NULL, &nih);
+                if (!(err == PTL_OK || err == PTL_IFACE_DUP))
+                        return (-EINVAL);
+
+                if (err == PTL_OK) {
+                        /* There's no point in failing an interface that
+                         * came into existance just for this */
+                        err = -EINVAL;
+                } else {
+                        err = PtlLoopback (nih, set, &enabled);
+                        if (err != PTL_OK) {
+                                err = -EINVAL;
+                        } else {
+                                data->ioc_flags = enabled;
+                                if (copy_to_user ((char *)arg, data, 
+                                                  sizeof (*data)))
+                                        err = -EFAULT;
+                                else
+                                        err = 0;
+                        }
+                }
+
+                PtlNIFini(nih);
+                RETURN (err);
+        }
         default:
                 RETURN(-EINVAL);
         }
index 847fb00..09fdf5f 100644 (file)
@@ -1319,6 +1319,46 @@ jt_ptl_fail_nid (int argc, char **argv)
 }
 
 int
+jt_ptl_loopback (int argc, char **argv)
+{
+        int                      rc;
+        ptl_nid_t                nid;
+        unsigned int             threshold;
+        int                      set;
+        int                      enable;
+        struct portal_ioctl_data data;
+
+        if (argc > 2)
+        {
+                fprintf (stderr, "usage: %s [on|off]\n", argv[0]);
+                return (0);
+        }
+        
+        if (!g_nal_is_set())
+                return (-1);
+
+        set = argc > 1;
+        if (set && ptl_parse_bool (&enable, argv[1]) != 0) {
+                fprintf (stderr, "Can't parse boolean %s\n", argv[1]);
+                return (-1);
+        }
+
+        PORTAL_IOC_INIT (data);
+        data.ioc_nal = g_nal;
+        data.ioc_flags = enable;
+        data.ioc_misc = set;
+        
+        rc = l_ioctl (PORTALS_DEV_ID, IOC_PORTAL_LOOPBACK, &data);
+        if (rc < 0)
+                fprintf (stderr, "IOC_PORTAL_FAIL_NID failed: %s\n",
+                         strerror (errno));
+        else
+                printf ("loopback %s\n", data.ioc_flags ? "enabled" : "disabled");
+        
+        return (0);
+}
+
+int
 jt_ptl_add_route (int argc, char **argv)
 {
         struct portals_cfg       pcfg;
index 03cfe77..3089211 100644 (file)
@@ -53,6 +53,7 @@ command_t list[] = {
         {"print_routes", jt_ptl_print_routes, 0, "print the routing table (args: none)"},
         {"dump", jt_ioc_dump, 0, "usage: dump file, save ioctl buffer to file"},
         {"fail", jt_ptl_fail_nid, 0, "usage: fail nid|_all_ [count]"},
+        {"loopback", jt_ptl_loopback, 0, "usage: loopback [on|off]"},
         {"help", Parser_help, 0, "help"},
         {"exit", Parser_quit, 0, "quit"},
         {"quit", Parser_quit, 0, "quit"},