b=16102
diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c
index baf8935..4d8d653 100644
--- a/lnet/selftest/rpc.c
+++ b/lnet/selftest/rpc.c
@@ -11,6 +11,8 @@
 #include "selftest.h"
 
 
+#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
+
 typedef enum {
         SRPC_STATE_NONE,
         SRPC_STATE_NI_INIT,
@@ -20,9 +22,6 @@ typedef enum {
         SRPC_STATE_STOPPING,
 } srpc_state_t;
 
-#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
-#define SRPC_PEER_CREDITS         16   /* >= most LND's default peer credit */
-
 struct smoketest_rpc {
         spinlock_t        rpc_glock;     /* global lock */
         srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
@@ -33,6 +32,10 @@ struct smoketest_rpc {
         __u64             rpc_matchbits; /* matchbits counter */
 } srpc_data;
 
+static int srpc_peer_credits = 16;
+CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444,
+                "# in-flight RPCs per peer (16 by default)");
+
 /* forward ref's */
 int srpc_handle_rpc (swi_workitem_t *wi);
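
The hunks above replace the compile-time SRPC_PEER_CREDITS constant with a
load-time tunable. In kernel builds CFS_MODULE_PARM is libcfs's portability
wrapper around the standard Linux module-parameter macros; a minimal sketch of
what the new declaration is assumed to expand to:

    #include <linux/module.h>

    /* Sketch only: assumes CFS_MODULE_PARM wraps the usual macros. */
    static int srpc_peer_credits = 16;
    module_param(srpc_peer_credits, int, 0444);  /* read-only after load */
    MODULE_PARM_DESC(srpc_peer_credits,
                     "# in-flight RPCs per peer (16 by default)");

With 0444 permissions the value can only be set at module load time, e.g.
"modprobe lnet_selftest srpc_peer_credits=32" (module name assumed).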
 
@@ -171,7 +174,7 @@ srpc_create_peer (lnet_nid_t nid)
 
         memset(peer, 0, sizeof(srpc_peer_t));
         peer->stp_nid     = nid;
-        peer->stp_credits = SRPC_PEER_CREDITS;
+        peer->stp_credits = srpc_peer_credits;
 
         spin_lock_init(&peer->stp_lock);
         CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
@@ -198,8 +201,8 @@ srpc_find_peer_locked (lnet_nid_t nid)
 static srpc_peer_t *
 srpc_nid2peer (lnet_nid_t nid)
 {
-       srpc_peer_t *peer;
-       srpc_peer_t *new_peer;
+        srpc_peer_t *peer;
+        srpc_peer_t *new_peer;
 
         spin_lock(&srpc_data.rpc_glock);
         peer = srpc_find_peer_locked(nid);
@@ -207,7 +210,7 @@ srpc_nid2peer (lnet_nid_t nid)
 
         if (peer != NULL)
                 return peer;
-        
+
         new_peer = srpc_create_peer(nid);
 
         spin_lock(&srpc_data.rpc_glock);
@@ -225,7 +228,7 @@ srpc_nid2peer (lnet_nid_t nid)
                 spin_unlock(&srpc_data.rpc_glock);
                 return NULL;
         }
-                
+
         list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
         spin_unlock(&srpc_data.rpc_glock);
         return new_peer;
@@ -375,7 +378,7 @@ srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
 }
 
 int
-srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, 
+srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                       int options, lnet_process_id_t peer, lnet_nid_t self,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
 {
@@ -399,11 +402,11 @@ srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
         /* this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options.
          * they're only meaningful for MDs attached to an ME (i.e. passive
          * buffers... */
-       if ((options & LNET_MD_OP_PUT) != 0) {
+        if ((options & LNET_MD_OP_PUT) != 0) {
                 rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                              portal, matchbits, 0, 0);
         } else {
-               LASSERT ((options & LNET_MD_OP_GET) != 0);
+                LASSERT ((options & LNET_MD_OP_GET) != 0);
 
                 rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
         }
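
The comment above is worth unpacking: LNET_MD_OP_PUT and LNET_MD_OP_GET
normally tell LNet which operations a passive (ME-attached) MD will accept.
Here they are repurposed as a direction argument: srpc_post_active_rdma()
inspects them only to decide whether to LNetPut() the buffer to the peer or
LNetGet() it from the peer, and the active MD itself never checks them.
Hypothetical calls showing the two directions:

    /* push buf to the peer */
    rc = srpc_post_active_rdma(portal, matchbits, buf, len, LNET_MD_OP_PUT,
                               peer, LNET_NID_ANY, &mdh, ev);
    /* fetch into buf from the peer */
    rc = srpc_post_active_rdma(portal, matchbits, buf, len, LNET_MD_OP_GET,
                               peer, LNET_NID_ANY, &mdh, ev);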
@@ -438,7 +441,7 @@ srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
         else
                 portal = SRPC_FRAMEWORK_REQUEST_PORTAL;
 
-        rc = srpc_post_active_rdma(portal, service, buf, len, 
+        rc = srpc_post_active_rdma(portal, service, buf, len,
                                    LNET_MD_OP_PUT, peer,
                                    LNET_NID_ANY, mdh, ev);
         return rc;
@@ -506,7 +509,7 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
         spin_unlock(&sv->sv_lock);
         LIBCFS_FREE(buf, sizeof(*buf));
         spin_lock(&sv->sv_lock);
-        return rc; 
+        return rc;
 }
 
 int
@@ -918,7 +921,8 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 }
         }
         case SWI_STATE_BULK_STARTED:
-                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
+                /* we cannot LASSERT ev_fired right here because it
+                 * may be set only upon an event with unlinked==1 */
 
                 if (rpc->srpc_bulk != NULL) {
                         rc = ev->ev_status;
@@ -927,11 +931,20 @@ srpc_handle_rpc (swi_workitem_t *wi)
                                 rc = (*sv->sv_bulk_ready) (rpc, rc);
 
                         if (rc != 0) {
-                                srpc_server_rpc_done(rpc, rc);
-                                return 1;
+                                if (ev->ev_fired) {
+                                        srpc_server_rpc_done(rpc, rc);
+                                        return 1;
+                                }
+
+                                rpc->srpc_status = rc;
+                                wi->wi_state     = SWI_STATE_BULK_ERRORED;
+                                LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
+                                return 0; /* wait for UNLINK event  */
                         }
                 }
 
+                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
+
                 wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                 rc = srpc_send_reply(rpc);
                 if (rc == 0)
@@ -945,6 +958,13 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 wi->wi_state = SWI_STATE_DONE;
                 srpc_server_rpc_done(rpc, ev->ev_status);
                 return 1;
+
+        case SWI_STATE_BULK_ERRORED:
+                LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired);
+                LASSERT (rpc->srpc_status != 0);
+
+                srpc_server_rpc_done(rpc, rpc->srpc_status);
+                return 1;
         }
 
         return 0;
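
Taken together, these srpc_handle_rpc() hunks turn bulk-error completion into
a two-step handshake: if the bulk callback fails before the final LNet event
has fired, the RPC parks in the new SWI_STATE_BULK_ERRORED state until
LNetMDUnlink() produces that event, and only then completes with the saved
srpc_status. A standalone sketch of the control flow, using hypothetical
types in place of the Lustre structs:

    enum state { BULK_STARTED, BULK_ERRORED, DONE };

    struct rpc {
            enum state state;
            int        status;     /* error saved for deferred completion */
            int        ev_fired;   /* has the final bulk event arrived?   */
    };

    /* Work item body reduced to the error path added by this patch.
     * Returns 1 when the RPC is finished, 0 to wait for another event. */
    static int handle_bulk(struct rpc *rpc, int rc)
    {
            switch (rpc->state) {
            case BULK_STARTED:
                    if (rc == 0)
                            return 0;            /* normal reply path */
                    if (rpc->ev_fired) {         /* event already in: done */
                            rpc->state  = DONE;
                            rpc->status = rc;
                            return 1;
                    }
                    rpc->status = rc;            /* remember the failure,  */
                    rpc->state  = BULK_ERRORED;  /* unlink the MD (real
                                                  * code: LNetMDUnlink())  */
                    return 0;                    /* ... and await UNLINK   */
            case BULK_ERRORED:                   /* UNLINK event re-ran us */
                    rpc->state = DONE;
                    return 1;                    /* finish, saved status   */
            case DONE:
                    break;
            }
            return 1;
    }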
@@ -982,20 +1002,20 @@ srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
         CFS_INIT_LIST_HEAD(&timer->stt_list);
         timer->stt_data    = rpc;
         timer->stt_func    = srpc_client_rpc_expired;
-        timer->stt_expires = cfs_time_add(rpc->crpc_timeout, 
+        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                           cfs_time_current_sec());
         stt_add_timer(timer);
         return;
 }
 
-/* 
+/*
  * Called with rpc->crpc_lock held.
  *
  * Upon exit the RPC expiry timer is not queued and the handler is not
  * running on any CPU. */
 void
 srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
-{     
+{
         /* timer not planted or already exploded */
         if (rpc->crpc_timeout == 0) return;
 
@@ -1007,7 +1027,7 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
         while (rpc->crpc_timeout != 0) {
                 spin_unlock(&rpc->crpc_lock);
 
-                cfs_schedule(); 
+                cfs_schedule();
 
                 spin_lock(&rpc->crpc_lock);
         }
@@ -1075,7 +1095,7 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
          * No one can schedule me now since:
          * - RPC timer has been defused.
          * - all LNet events have been fired.
-         * - crpc_closed has been set, preventing srpc_abort_rpc from 
+         * - crpc_closed has been set, preventing srpc_abort_rpc from
          *   scheduling me.
          * Cancel pending schedules and prevent future schedule attempts:
          */
@@ -1133,7 +1153,7 @@ srpc_send_rpc (swi_workitem_t *wi)
 
         case SWI_STATE_REQUEST_SUBMITTED:
                 /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
-                 * order; however, they're processed in a strict order: 
+                 * order; however, they're processed in a strict order:
                  * rqt, rpy, and bulk. */
                 if (!rpc->crpc_reqstev.ev_fired) break;
 
@@ -1150,7 +1170,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                 rc = rpc->crpc_replyev.ev_status;
                 if (rc != 0) break;
 
-                if ((reply->msg_type != type && 
+                if ((reply->msg_type != type &&
                      reply->msg_type != __swab32(type)) ||
                     (reply->msg_magic != SRPC_MSG_MAGIC &&
                      reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
@@ -1219,7 +1239,7 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service,
 {
         srpc_client_rpc_t *rpc;
 
-       LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
+        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                    crpc_bulk.bk_iovs[nbulkiov]));
         if (rpc == NULL)
                 return NULL;
@@ -1368,7 +1388,7 @@ srpc_send_reply (srpc_server_rpc_t *rpc)
 }
 
 /* when in kernel always called with LNET_LOCK() held, and in thread context */
-void 
+void
 srpc_lnet_ev_handler (lnet_event_t *ev)
 {
         srpc_event_t      *rpcev = ev->md.user_ptr;
@@ -1378,6 +1398,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
         srpc_service_t    *sv;
         srpc_msg_t        *msg;
         srpc_msg_type_t    type;
+        int                fired_flag = 1;
 
         LASSERT (!in_interrupt());
 
@@ -1410,7 +1431,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
 
                 LASSERT (rpcev->ev_fired == 0);
                 rpcev->ev_fired  = 1;
-                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? 
+                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                 -EINTR : ev->status;
                 swi_schedule_workitem(&crpc->crpc_wi);
 
@@ -1438,7 +1459,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 LASSERT (sv->sv_nposted_msg >= 0);
 
                 if (sv->sv_shuttingdown) {
-                        /* Leave buffer on sv->sv_posted_msgq since 
+                        /* Leave buffer on sv->sv_posted_msgq since
                          * srpc_finish_service needs to traverse it. */
                         spin_unlock(&sv->sv_lock);
                         break;
@@ -1449,7 +1470,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 type = srpc_service2request(sv->sv_id);
 
                 if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
-                    (msg->msg_type != type && 
+                    (msg->msg_type != type &&
                      msg->msg_type != __swab32(type)) ||
                     (msg->msg_magic != SRPC_MSG_MAGIC &&
                      msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
@@ -1499,10 +1520,13 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                          ev->type == LNET_EVENT_REPLY ||
                          ev->type == LNET_EVENT_UNLINK);
 
-                if (ev->type == LNET_EVENT_SEND && 
-                    ev->status == 0 && !ev->unlinked)
-                        break; /* wait for the final LNET_EVENT_REPLY */
-
+                if (ev->type == LNET_EVENT_SEND && !ev->unlinked) {
+                        if (ev->status == 0)
+                                break; /* wait for the final LNET_EVENT_REPLY */
+                        else
+                                fired_flag = 0; /* LNET_EVENT_REPLY may arrive
+                                                   (optimized GET case) */
+                }
         case SRPC_BULK_PUT_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                         spin_lock(&srpc_data.rpc_glock);
@@ -1521,9 +1545,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 LASSERT (rpcev == &srpc->srpc_ev);
 
                 spin_lock(&sv->sv_lock);
-                rpcev->ev_fired  = 1;
-                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? 
+                if (fired_flag)
+                        rpcev->ev_fired  = 1;
+
+                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                 -EINTR : ev->status;
+
                 srpc_schedule_server_rpc(srpc);
                 spin_unlock(&sv->sv_lock);
                 break;
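
The fired_flag machinery covers LNet's "optimized GET" behaviour on the
server's bulk MD: a failed LNET_EVENT_SEND that leaves the MD linked may
still be followed by an LNET_EVENT_REPLY, so ev_fired must stay clear until
the truly final event arrives (this is also why srpc_handle_rpc() above can
no longer assert ev_fired on entry to SWI_STATE_BULK_STARTED). A condensed
sketch of the finality rule, with stand-in constants for LNET_EVENT_*:

    enum ev_type { EV_SEND, EV_REPLY, EV_UNLINK };  /* stand-ins */

    /* Sketch: is this the last event the bulk-GET MD will deliver?
     * A SEND that leaves the MD linked is never final -- even on error
     * a REPLY may still arrive -- so ev_fired must not be set yet. */
    static int bulk_get_event_is_final(enum ev_type type, int unlinked)
    {
            if (type == EV_SEND && !unlinked)
                    return 0;            /* REPLY or UNLINK still to come */
            return 1;                    /* safe to set ev_fired now */
    }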
@@ -1544,15 +1571,15 @@ srpc_check_event (int timeout)
         rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
                         timeout * 1000, &ev, &i);
         if (rc == 0) return 0;
-        
+
         LASSERT (rc == -EOVERFLOW || rc == 1);
-        
+
         /* We can't afford to miss any events... */
         if (rc == -EOVERFLOW) {
                 CERROR ("Dropped an event!!!\n");
                 abort();
         }
-                
+
         srpc_lnet_ev_handler(&ev);
         return 1;
 }
@@ -1565,6 +1592,18 @@ srpc_startup (void)
         int i;
         int rc;
 
+#ifndef __KERNEL__
+        char *s;
+
+        s = getenv("SRPC_PEER_CREDITS");
+        srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits;
+#endif
+
+        if (srpc_peer_credits <= 0) {
+                CERROR("Peer credits must be positive: %d\n", srpc_peer_credits);
+                return -EINVAL;
+        }
+
         memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
         spin_lock_init(&srpc_data.rpc_glock);
 
@@ -1587,7 +1626,10 @@ srpc_startup (void)
 #ifdef __KERNEL__
         rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
 #else
-        rc = LNetNIInit(getpid());
+        if (the_lnet.ln_server_mode_flag)
+                rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
+        else
+                rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
 #endif
         if (rc < 0) {
                 CERROR ("LNetNIInit() has failed: %d\n", rc);
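
In userspace the selftest now distinguishes server-mode processes, which must
claim the well-known LUSTRE_SRV_LNET_PID so peers can address them, from
client processes, which use their own pid tagged with LNET_PID_USERFLAG so
LNet can tell userspace PIDs apart from kernel ones. Condensed, with
server_mode standing in for the_lnet.ln_server_mode_flag:

    /* sketch: choose the LNet PID for a userspace selftest process */
    lnet_pid_t pid = server_mode ? LUSTRE_SRV_LNET_PID
                                 : (getpid() | LNET_PID_USERFLAG);
    rc = LNetNIInit(pid);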
@@ -1684,7 +1726,7 @@ srpc_shutdown (void)
 
                         LASSERT (list_empty(&peer->stp_rpcq));
                         LASSERT (list_empty(&peer->stp_ctl_rpcq));
-                        LASSERT (peer->stp_credits == SRPC_PEER_CREDITS);
+                        LASSERT (peer->stp_credits == srpc_peer_credits);
 
                         LIBCFS_FREE(peer, sizeof(srpc_peer_t));
                 }