Whamcloud - gitweb
LU-12678 lnet: remove LNetMEUnlink and clean up related code 46/38646/13
authorMr NeilBrown <neilb@suse.de>
Mon, 18 May 2020 00:45:05 +0000 (10:45 +1000)
committerOleg Drokin <green@whamcloud.com>
Fri, 10 Jul 2020 16:52:34 +0000 (16:52 +0000)
LNetMEUnlink is not particularly useful, and exposing it as an LNet
interface only provides the opportunity for it to be misused.

Every successful call to LNetMEAttach() is followed by a call to
LNetMDAttach().  If that call succeeds, the ME is owned by
the MD and the caller mustn't touch it again.
If the call fails, the caller is currently required to call
LNetMEUnlink(), which all callers do, and these are the only places
that LNetMEUnlink() are called.

As LNetMDAttach() knows when it will fail, it can unlink the ME itself
and save the caller the effort.
This allows LNetMEUnlink() to be removed which simplifies
the LNet interface.

LNetMEUnlink() is also used in in ptl_send_rpc() in a situation where
ptl_send_buf() fails.  In this case both the ME and the MD need to be
unlinked, as as they are interconnected, LNetMEUnlink() or
LNetMDUnlink() can equally do the job.  So change it to use
LNetMDUnlink().

LNetMEUnlink() is primarily a call the lnet_me_unlink(). It also
 - has some handling if ->me_md is not NULL, but that is never the
   case
 - takes the lnet_res_lock().  However LNetMDAttach() already
   takes that lock.
So none of this functionality is useful to LNetMDAttach().
On failure, it can call lnet_me_unlink() directly while ensuring
it still has the lock.

This patch:
 - moves the calls to lnet_md_validate() into lnet_md_build()
 - changes LNetMDAttach() to always take the lnet_res_lock(),
   and to call lnet_me_unlink() on failure.
 - removes all calls to LNetMEUnlink() and sometimes simplifies
   surrounding code.
 - changes lnet_md_link() to 'void' as it only ever returns
   '0', and thus simplify error handling in LNetMDAttach()
   and LNetMDBind()

Signed-off-by: Mr NeilBrown <neilb@suse.de>
Change-Id: Ied4e8fb544dbe1b32df7dc70439161dc74366a1d
Reviewed-on: https://review.whamcloud.com/38646
Reviewed-by: Yang Sheng <ys@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/api.h
lnet/lnet/api-ni.c
lnet/lnet/lib-md.c
lnet/lnet/lib-me.c
lnet/selftest/rpc.c
lustre/ptlrpc/niobuf.c

index f552349..b3901f5 100644 (file)
@@ -92,8 +92,8 @@ bool LNetIsPeerLocal(lnet_nid_t nid);
  * list is a chain of MEs. Each ME includes a pointer to a memory descriptor
  * and a set of match criteria. The match criteria can be used to reject
  * incoming requests based on process ID or the match bits provided in the
  * list is a chain of MEs. Each ME includes a pointer to a memory descriptor
  * and a set of match criteria. The match criteria can be used to reject
  * incoming requests based on process ID or the match bits provided in the
- * request. MEs can be dynamically inserted into a match list by LNetMEAttach()
- * and removed from its list by LNetMEUnlink().
+ * request. MEs can be dynamically inserted into a match list by LNetMEAttach(),
+ * and must then be attached to an MD with LNetMDAttach().
  * @{ */
 struct lnet_me *
 LNetMEAttach(unsigned int portal,
  * @{ */
 struct lnet_me *
 LNetMEAttach(unsigned int portal,
@@ -102,8 +102,6 @@ LNetMEAttach(unsigned int portal,
             __u64 ignore_bits_in,
             enum lnet_unlink unlink_in,
             enum lnet_ins_pos pos_in);
             __u64 ignore_bits_in,
             enum lnet_unlink unlink_in,
             enum lnet_ins_pos pos_in);
-
-void LNetMEUnlink(struct lnet_me *current_in);
 /** @} lnet_me */
 
 /** \defgroup lnet_md Memory descriptors
 /** @} lnet_me */
 
 /** \defgroup lnet_md Memory descriptors
index 8c97fba..1f933a1 100644 (file)
@@ -1724,14 +1724,12 @@ lnet_ping_target_setup(struct lnet_ping_buffer **ppbuf,
        rc = LNetMDAttach(me, &md, LNET_RETAIN, ping_mdh);
        if (rc != 0) {
                CERROR("Can't attach ping target MD: %d\n", rc);
        rc = LNetMDAttach(me, &md, LNET_RETAIN, ping_mdh);
        if (rc != 0) {
                CERROR("Can't attach ping target MD: %d\n", rc);
-               goto fail_unlink_ping_me;
+               goto fail_decref_ping_buffer;
        }
        lnet_ping_buffer_addref(*ppbuf);
 
        return 0;
 
        }
        lnet_ping_buffer_addref(*ppbuf);
 
        return 0;
 
-fail_unlink_ping_me:
-       LNetMEUnlink(me);
 fail_decref_ping_buffer:
        LASSERT(atomic_read(&(*ppbuf)->pb_refcnt) == 1);
        lnet_ping_buffer_decref(*ppbuf);
 fail_decref_ping_buffer:
        LASSERT(atomic_read(&(*ppbuf)->pb_refcnt) == 1);
        lnet_ping_buffer_decref(*ppbuf);
@@ -1933,7 +1931,6 @@ int lnet_push_target_post(struct lnet_ping_buffer *pbuf,
        rc = LNetMDAttach(me, &md, LNET_UNLINK, mdhp);
        if (rc) {
                CERROR("Can't attach push MD: %d\n", rc);
        rc = LNetMDAttach(me, &md, LNET_UNLINK, mdhp);
        if (rc) {
                CERROR("Can't attach push MD: %d\n", rc);
-               LNetMEUnlink(me);
                lnet_ping_buffer_decref(pbuf);
                pbuf->pb_needs_post = true;
                return rc;
                lnet_ping_buffer_decref(pbuf);
                pbuf->pb_needs_post = true;
                return rc;
index 268e856..61db468 100644 (file)
@@ -141,6 +141,8 @@ out:
        return cpt;
 }
 
        return cpt;
 }
 
+static int lnet_md_validate(const struct lnet_md *umd);
+
 static struct lnet_libmd *
 lnet_md_build(const struct lnet_md *umd, int unlink)
 {
 static struct lnet_libmd *
 lnet_md_build(const struct lnet_md *umd, int unlink)
 {
@@ -150,6 +152,9 @@ lnet_md_build(const struct lnet_md *umd, int unlink)
        struct lnet_libmd *lmd;
        unsigned int size;
 
        struct lnet_libmd *lmd;
        unsigned int size;
 
+       if (lnet_md_validate(umd) != 0)
+               return ERR_PTR(-EINVAL);
+
        if (umd->options & LNET_MD_KIOV)
                niov = umd->length;
        else
        if (umd->options & LNET_MD_KIOV)
                niov = umd->length;
        else
@@ -244,14 +249,13 @@ lnet_md_build(const struct lnet_md *umd, int unlink)
 }
 
 /* must be called with resource lock held */
 }
 
 /* must be called with resource lock held */
-static int
+static void
 lnet_md_link(struct lnet_libmd *md, lnet_handler_t handler, int cpt)
 {
        struct lnet_res_container *container = the_lnet.ln_md_containers[cpt];
 
        /* NB we are passed an allocated, but inactive md.
 lnet_md_link(struct lnet_libmd *md, lnet_handler_t handler, int cpt)
 {
        struct lnet_res_container *container = the_lnet.ln_md_containers[cpt];
 
        /* NB we are passed an allocated, but inactive md.
-        * if we return success, caller may lnet_md_unlink() it.
-        * otherwise caller may only lnet_md_free() it.
+        * Caller may lnet_md_unlink() it, or may lnet_md_free() it.
         */
        /* This implementation doesn't know how to create START events or
         * disable END events.  Best to LASSERT our caller is compliant so
         */
        /* This implementation doesn't know how to create START events or
         * disable END events.  Best to LASSERT our caller is compliant so
@@ -267,8 +271,6 @@ lnet_md_link(struct lnet_libmd *md, lnet_handler_t handler, int cpt)
 
        LASSERT(list_empty(&md->md_list));
        list_add(&md->md_list, &container->rec_active);
 
        LASSERT(list_empty(&md->md_list));
        list_add(&md->md_list, &container->rec_active);
-
-       return 0;
 }
 
 void lnet_assert_handler_unused(lnet_handler_t handler)
 }
 
 void lnet_assert_handler_unused(lnet_handler_t handler)
@@ -333,14 +335,11 @@ lnet_md_validate(const struct lnet_md *umd)
  * \param handle On successful returns, a handle to the newly created MD is
  * saved here. This handle can be used later in LNetMDUnlink().
  *
  * \param handle On successful returns, a handle to the newly created MD is
  * saved here. This handle can be used later in LNetMDUnlink().
  *
+ * The ME will either be linked to the new MD, or it will be freed.
+ *
  * \retval 0      On success.
  * \retval -EINVAL If \a umd is not valid.
  * \retval -ENOMEM If new MD cannot be allocated.
  * \retval 0      On success.
  * \retval -EINVAL If \a umd is not valid.
  * \retval -ENOMEM If new MD cannot be allocated.
- * \retval -ENOENT Either \a meh or \a umd.eq_handle does not point to a
- * valid object. Note that it's OK to supply a NULL \a umd.eq_handle by
- * calling LNetInvalidateHandle() on it.
- * \retval -EBUSY  If the ME pointed to by \a meh is already associated with
- * a MD.
  */
 int
 LNetMDAttach(struct lnet_me *me, const struct lnet_md *umd,
  */
 int
 LNetMDAttach(struct lnet_me *me, const struct lnet_md *umd,
@@ -350,33 +349,26 @@ LNetMDAttach(struct lnet_me *me, const struct lnet_md *umd,
        LIST_HEAD(drops);
        struct lnet_libmd       *md;
        int                     cpt;
        LIST_HEAD(drops);
        struct lnet_libmd       *md;
        int                     cpt;
-       int                     rc;
 
        LASSERT(the_lnet.ln_refcount > 0);
 
        LASSERT(the_lnet.ln_refcount > 0);
-
-       if (lnet_md_validate(umd) != 0)
-               return -EINVAL;
+       LASSERT(!me->me_md);
 
        if ((umd->options & (LNET_MD_OP_GET | LNET_MD_OP_PUT)) == 0) {
                CERROR("Invalid option: no MD_OP set\n");
 
        if ((umd->options & (LNET_MD_OP_GET | LNET_MD_OP_PUT)) == 0) {
                CERROR("Invalid option: no MD_OP set\n");
-               return -EINVAL;
-       }
-
-       md = lnet_md_build(umd, unlink);
-       if (IS_ERR(md))
-               return PTR_ERR(md);
+               md = ERR_PTR(-EINVAL);
+       } else
+               md = lnet_md_build(umd, unlink);
 
        cpt = me->me_cpt;
 
        cpt = me->me_cpt;
-
        lnet_res_lock(cpt);
 
        lnet_res_lock(cpt);
 
-       if (me->me_md)
-               rc = -EBUSY;
-       else
-               rc = lnet_md_link(md, umd->handler, cpt);
+       if (IS_ERR(md)) {
+               lnet_me_unlink(me);
+               lnet_res_unlock(cpt);
+               return PTR_ERR(md);
+       }
 
 
-       if (rc != 0)
-               goto out_unlock;
+       lnet_md_link(md, umd->handler, cpt);
 
        /* attach this MD to portal of ME and check if it matches any
         * blocked msgs on this portal */
 
        /* attach this MD to portal of ME and check if it matches any
         * blocked msgs on this portal */
@@ -390,11 +382,6 @@ LNetMDAttach(struct lnet_me *me, const struct lnet_md *umd,
        lnet_recv_delayed_msg_list(&matches);
 
        return 0;
        lnet_recv_delayed_msg_list(&matches);
 
        return 0;
-
-out_unlock:
-       lnet_res_unlock(cpt);
-       lnet_md_free(md);
-       return rc;
 }
 EXPORT_SYMBOL(LNetMDAttach);
 
 }
 EXPORT_SYMBOL(LNetMDAttach);
 
@@ -410,9 +397,6 @@ EXPORT_SYMBOL(LNetMDAttach);
  * \retval 0      On success.
  * \retval -EINVAL If \a umd is not valid.
  * \retval -ENOMEM If new MD cannot be allocated.
  * \retval 0      On success.
  * \retval -EINVAL If \a umd is not valid.
  * \retval -ENOMEM If new MD cannot be allocated.
- * \retval -ENOENT \a umd.eq_handle does not point to a valid EQ. Note that
- * it's OK to supply a NULL \a umd.eq_handle by calling
- * LNetInvalidateHandle() on it.
  */
 int
 LNetMDBind(const struct lnet_md *umd, enum lnet_unlink unlink,
  */
 int
 LNetMDBind(const struct lnet_md *umd, enum lnet_unlink unlink,
@@ -424,9 +408,6 @@ LNetMDBind(const struct lnet_md *umd, enum lnet_unlink unlink,
 
        LASSERT(the_lnet.ln_refcount > 0);
 
 
        LASSERT(the_lnet.ln_refcount > 0);
 
-       if (lnet_md_validate(umd) != 0)
-               return -EINVAL;
-
        if ((umd->options & (LNET_MD_OP_GET | LNET_MD_OP_PUT)) != 0) {
                CERROR("Invalid option: GET|PUT illegal on active MDs\n");
                return -EINVAL;
        if ((umd->options & (LNET_MD_OP_GET | LNET_MD_OP_PUT)) != 0) {
                CERROR("Invalid option: GET|PUT illegal on active MDs\n");
                return -EINVAL;
@@ -445,18 +426,13 @@ LNetMDBind(const struct lnet_md *umd, enum lnet_unlink unlink,
 
        cpt = lnet_res_lock_current();
 
 
        cpt = lnet_res_lock_current();
 
-       rc = lnet_md_link(md, umd->handler, cpt);
-       if (rc != 0)
-               goto out_unlock;
+       lnet_md_link(md, umd->handler, cpt);
 
        lnet_md2handle(handle, md);
 
        lnet_res_unlock(cpt);
        return 0;
 
 
        lnet_md2handle(handle, md);
 
        lnet_res_unlock(cpt);
        return 0;
 
- out_unlock:
-       lnet_res_unlock(cpt);
-
  out_free:
        lnet_md_free(md);
        return rc;
  out_free:
        lnet_md_free(md);
        return rc;
index 8dc8785..3b49dc0 100644 (file)
@@ -119,47 +119,6 @@ LNetMEAttach(unsigned int portal,
 }
 EXPORT_SYMBOL(LNetMEAttach);
 
 }
 EXPORT_SYMBOL(LNetMEAttach);
 
-/**
- * Unlink a match entry from its match list.
- *
- * This operation also releases any resources associated with the ME. If a
- * memory descriptor is attached to the ME, then it will be unlinked as well
- * and an unlink event will be generated. It is an error to use the ME handle
- * after calling LNetMEUnlink().
- *
- * \param meh A handle for the ME to be unlinked.
- *
- * \retval 0      On success.
- * \retval -ENOENT If \a meh does not point to a valid ME.
- * \see LNetMDUnlink() for the discussion on delivering unlink event.
- */
-void
-LNetMEUnlink(struct lnet_me *me)
-{
-       struct lnet_libmd *md;
-       struct lnet_event ev;
-       int cpt;
-
-       LASSERT(the_lnet.ln_refcount > 0);
-
-       cpt = me->me_cpt;
-       lnet_res_lock(cpt);
-
-       md = me->me_md;
-       if (md != NULL) {
-               md->md_flags |= LNET_MD_FLAG_ABORTED;
-               if (md->md_handler && md->md_refcount == 0) {
-                       lnet_build_unlink_event(md, &ev);
-                       md->md_handler(&ev);
-               }
-       }
-
-       lnet_me_unlink(me);
-
-       lnet_res_unlock(cpt);
-}
-EXPORT_SYMBOL(LNetMEUnlink);
-
 /* call with lnet_res_lock please */
 void
 lnet_me_unlink(struct lnet_me *me)
 /* call with lnet_res_lock please */
 void
 lnet_me_unlink(struct lnet_me *me)
index db6c704..6221b3d 100644 (file)
@@ -380,7 +380,6 @@ srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
                CERROR("LNetMDAttach failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
 
                CERROR("LNetMDAttach failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
 
-               LNetMEUnlink(me);
                return -ENOMEM;
        }
 
                return -ENOMEM;
        }
 
index 9fde3ac..9193550 100644 (file)
@@ -397,7 +397,6 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
                        CERROR("%s: LNetMDAttach failed x%llu/%d: rc = %d\n",
                               desc->bd_import->imp_obd->obd_name, mbits,
                               posted_md, rc);
                        CERROR("%s: LNetMDAttach failed x%llu/%d: rc = %d\n",
                               desc->bd_import->imp_obd->obd_name, mbits,
                               posted_md, rc);
-                       LNetMEUnlink(me);
                        break;
                }
        }
                        break;
                }
        }
@@ -711,28 +710,28 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
 
        LNetInvalidateMDHandle(&bulk_cookie);
 
 
        LNetInvalidateMDHandle(&bulk_cookie);
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_RPC))
-                RETURN(0);
+       if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_RPC))
+               RETURN(0);
 
 
-        LASSERT(request->rq_type == PTL_RPC_MSG_REQUEST);
-        LASSERT(request->rq_wait_ctx == 0);
+       LASSERT(request->rq_type == PTL_RPC_MSG_REQUEST);
+       LASSERT(request->rq_wait_ctx == 0);
 
 
-        /* If this is a re-transmit, we're required to have disengaged
-         * cleanly from the previous attempt */
-        LASSERT(!request->rq_receiving_reply);
+       /* If this is a re-transmit, we're required to have disengaged
+        * cleanly from the previous attempt */
+       LASSERT(!request->rq_receiving_reply);
        LASSERT(!((lustre_msg_get_flags(request->rq_reqmsg) & MSG_REPLAY) &&
        LASSERT(!((lustre_msg_get_flags(request->rq_reqmsg) & MSG_REPLAY) &&
-               (imp->imp_state == LUSTRE_IMP_FULL)));
+                 (imp->imp_state == LUSTRE_IMP_FULL)));
 
        if (unlikely(obd != NULL && obd->obd_fail)) {
                CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
 
        if (unlikely(obd != NULL && obd->obd_fail)) {
                CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
-                       obd->obd_name);
+                      obd->obd_name);
                /* this prevents us from waiting in ptlrpc_queue_wait */
                spin_lock(&request->rq_lock);
                request->rq_err = 1;
                spin_unlock(&request->rq_lock);
                /* this prevents us from waiting in ptlrpc_queue_wait */
                spin_lock(&request->rq_lock);
                request->rq_err = 1;
                spin_unlock(&request->rq_lock);
-                request->rq_status = -ENODEV;
-                RETURN(-ENODEV);
-        }
+               request->rq_status = -ENODEV;
+               RETURN(-ENODEV);
+       }
 
        connection = imp->imp_connection;
 
 
        connection = imp->imp_connection;
 
@@ -781,7 +780,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
        LASSERT(AT_OFF || imp->imp_state != LUSTRE_IMP_FULL ||
                (imp->imp_msghdr_flags & MSGHDR_AT_SUPPORT) ||
                !(imp->imp_connect_data.ocd_connect_flags &
        LASSERT(AT_OFF || imp->imp_state != LUSTRE_IMP_FULL ||
                (imp->imp_msghdr_flags & MSGHDR_AT_SUPPORT) ||
                !(imp->imp_connect_data.ocd_connect_flags &
-               OBD_CONNECT_AT));
+                 OBD_CONNECT_AT));
 
        if (request->rq_resend) {
                lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT);
 
        if (request->rq_resend) {
                lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT);
@@ -808,16 +807,16 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                bulk_cookie = request->rq_bulk->bd_mds[0];
        }
 
                bulk_cookie = request->rq_bulk->bd_mds[0];
        }
 
-        if (!noreply) {
-                LASSERT (request->rq_replen != 0);
-                if (request->rq_repbuf == NULL) {
-                        LASSERT(request->rq_repdata == NULL);
-                        LASSERT(request->rq_repmsg == NULL);
-                        rc = sptlrpc_cli_alloc_repbuf(request,
-                                                      request->rq_replen);
-                        if (rc) {
-                                /* this prevents us from looping in
-                                 * ptlrpc_queue_wait */
+       if (!noreply) {
+               LASSERT (request->rq_replen != 0);
+               if (request->rq_repbuf == NULL) {
+                       LASSERT(request->rq_repdata == NULL);
+                       LASSERT(request->rq_repmsg == NULL);
+                       rc = sptlrpc_cli_alloc_repbuf(request,
+                                                     request->rq_replen);
+                       if (rc) {
+                               /* this prevents us from looping in
+                                * ptlrpc_queue_wait */
                                spin_lock(&request->rq_lock);
                                request->rq_err = 1;
                                spin_unlock(&request->rq_lock);
                                spin_lock(&request->rq_lock);
                                request->rq_err = 1;
                                spin_unlock(&request->rq_lock);
@@ -853,24 +852,24 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
        request->rq_receiving_reply = !noreply;
        /* Clear any flags that may be present from previous sends. */
        request->rq_req_unlinked = 0;
        request->rq_receiving_reply = !noreply;
        /* Clear any flags that may be present from previous sends. */
        request->rq_req_unlinked = 0;
-        request->rq_replied = 0;
-        request->rq_err = 0;
-        request->rq_timedout = 0;
-        request->rq_net_err = 0;
-        request->rq_resend = 0;
-        request->rq_restart = 0;
+       request->rq_replied = 0;
+       request->rq_err = 0;
+       request->rq_timedout = 0;
+       request->rq_net_err = 0;
+       request->rq_resend = 0;
+       request->rq_restart = 0;
        request->rq_reply_truncated = 0;
        spin_unlock(&request->rq_lock);
 
        request->rq_reply_truncated = 0;
        spin_unlock(&request->rq_lock);
 
-        if (!noreply) {
-                reply_md.start     = request->rq_repbuf;
-                reply_md.length    = request->rq_repbuf_len;
-                /* Allow multiple early replies */
-                reply_md.threshold = LNET_MD_THRESH_INF;
-                /* Manage remote for early replies */
-                reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
-                        LNET_MD_MANAGE_REMOTE |
-                        LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */;
+       if (!noreply) {
+               reply_md.start     = request->rq_repbuf;
+               reply_md.length    = request->rq_repbuf_len;
+               /* Allow multiple early replies */
+               reply_md.threshold = LNET_MD_THRESH_INF;
+               /* Manage remote for early replies */
+               reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
+                       LNET_MD_MANAGE_REMOTE |
+                       LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */;
                reply_md.user_ptr  = &request->rq_reply_cbid;
                reply_md.handler = ptlrpc_handler;
 
                reply_md.user_ptr  = &request->rq_reply_cbid;
                reply_md.handler = ptlrpc_handler;
 
@@ -885,7 +884,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                        /* ...but the MD attach didn't succeed... */
                        request->rq_receiving_reply = 0;
                        spin_unlock(&request->rq_lock);
                        /* ...but the MD attach didn't succeed... */
                        request->rq_receiving_reply = 0;
                        spin_unlock(&request->rq_lock);
-                       GOTO(cleanup_me, rc = -ENOMEM);
+                       GOTO(cleanup_bulk, rc = -ENOMEM);
                }
                percpu_ref_get(&ptlrpc_pending);
 
                }
                percpu_ref_get(&ptlrpc_pending);
 
@@ -895,20 +894,21 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                       request->rq_reply_portal);
        }
 
                       request->rq_reply_portal);
        }
 
-        /* add references on request for request_out_callback */
-        ptlrpc_request_addref(request);
+       /* add references on request for request_out_callback */
+       ptlrpc_request_addref(request);
        if (obd != NULL && obd->obd_svc_stats != NULL)
                lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQACTIVE_CNTR,
        if (obd != NULL && obd->obd_svc_stats != NULL)
                lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQACTIVE_CNTR,
-                       atomic_read(&imp->imp_inflight));
+                                   atomic_read(&imp->imp_inflight));
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5);
 
        request->rq_sent_ns = ktime_get_real();
        request->rq_sent = ktime_get_real_seconds();
        /* We give the server rq_timeout secs to process the req, and
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5);
 
        request->rq_sent_ns = ktime_get_real();
        request->rq_sent = ktime_get_real_seconds();
        /* We give the server rq_timeout secs to process the req, and
-          add the network latency for our local timeout. */
-        request->rq_deadline = request->rq_sent + request->rq_timeout +
-                ptlrpc_at_get_net_latency(request);
+        * add the network latency for our local timeout.
+        */
+       request->rq_deadline = request->rq_sent + request->rq_timeout +
+               ptlrpc_at_get_net_latency(request);
 
        ptlrpc_pinger_sending_on_import(imp);
 
 
        ptlrpc_pinger_sending_on_import(imp);
 
@@ -924,16 +924,12 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                GOTO(out, rc);
 
        request->rq_req_unlinked = 1;
                GOTO(out, rc);
 
        request->rq_req_unlinked = 1;
-        ptlrpc_req_finished(request);
-        if (noreply)
-                GOTO(out, rc);
-
- cleanup_me:
-       /* MEUnlink is safe; the PUT didn't even get off the ground, and
-        * nobody apart from the PUT's target has the right nid+XID to
-        * access the reply buffer.
-        */
-       LNetMEUnlink(reply_me);
+       ptlrpc_req_finished(request);
+       if (noreply)
+               GOTO(out, rc);
+
+       LNetMDUnlink(request->rq_reply_md_h);
+
        /* UNLINKED callback called synchronously */
        LASSERT(!request->rq_receiving_reply);
 
        /* UNLINKED callback called synchronously */
        LASSERT(!request->rq_receiving_reply);
 
@@ -1006,7 +1002,6 @@ int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
 
        CERROR("ptlrpc: LNetMDAttach failed: rc = %d\n", rc);
        LASSERT(rc == -ENOMEM);
 
        CERROR("ptlrpc: LNetMDAttach failed: rc = %d\n", rc);
        LASSERT(rc == -ENOMEM);
-       LNetMEUnlink(me);
        LASSERT(rc == 0);
        rqbd->rqbd_refcount = 0;
 
        LASSERT(rc == 0);
        rqbd->rqbd_refcount = 0;