b=16102
author maxim <maxim>
Thu, 17 Jul 2008 21:23:18 +0000 (21:23 +0000)
committer maxim <maxim>
Thu, 17 Jul 2008 21:23:18 +0000 (21:23 +0000)
i=isaac
i=liang
Landing the patch (attachment #17747) on LNET HEAD.

lnet/ChangeLog
lnet/selftest/rpc.c
lnet/selftest/selftest.h

diff --git a/lnet/ChangeLog b/lnet/ChangeLog
index d1580aa..7241c42 100644
--- a/lnet/ChangeLog
+++ b/lnet/ChangeLog
@@ -17,6 +17,12 @@ Bugzilla   :
 Description: 
 Details    : 
 
+Severity   : normal
+Bugzilla   : 16102
+Description: LNET Selftest results in soft lockup on OSS CPU
+Details    : only hits when 8 or more o2ib clients are involved and a session
+             is torn down with 'lst end_session' without a preceding 'lst stop'.
+
 Severity   : minor
 Bugzilla   : 16321
 Description: concurrent_sends in IB LNDs should not be changeable at run time
diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c
index ac5a8fe..4d8d653 100644
--- a/lnet/selftest/rpc.c
+++ b/lnet/selftest/rpc.c
@@ -201,8 +201,8 @@ srpc_find_peer_locked (lnet_nid_t nid)
 static srpc_peer_t *
 srpc_nid2peer (lnet_nid_t nid)
 {
-       srpc_peer_t *peer;
-       srpc_peer_t *new_peer;
+        srpc_peer_t *peer;
+        srpc_peer_t *new_peer;
 
         spin_lock(&srpc_data.rpc_glock);
         peer = srpc_find_peer_locked(nid);
@@ -210,7 +210,7 @@ srpc_nid2peer (lnet_nid_t nid)
 
         if (peer != NULL)
                 return peer;
-        
+
         new_peer = srpc_create_peer(nid);
 
         spin_lock(&srpc_data.rpc_glock);
@@ -228,7 +228,7 @@ srpc_nid2peer (lnet_nid_t nid)
                 spin_unlock(&srpc_data.rpc_glock);
                 return NULL;
         }
-                
+
         list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
         spin_unlock(&srpc_data.rpc_glock);
         return new_peer;
@@ -378,7 +378,7 @@ srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
 }
 
 int
-srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, 
+srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                       int options, lnet_process_id_t peer, lnet_nid_t self,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
 {
@@ -441,7 +441,7 @@ srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
         else
                 portal = SRPC_FRAMEWORK_REQUEST_PORTAL;
 
-        rc = srpc_post_active_rdma(portal, service, buf, len, 
+        rc = srpc_post_active_rdma(portal, service, buf, len,
                                    LNET_MD_OP_PUT, peer,
                                    LNET_NID_ANY, mdh, ev);
         return rc;
@@ -509,7 +509,7 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
         spin_unlock(&sv->sv_lock);
         LIBCFS_FREE(buf, sizeof(*buf));
         spin_lock(&sv->sv_lock);
-        return rc; 
+        return rc;
 }
 
 int
@@ -921,7 +921,8 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 }
         }
         case SWI_STATE_BULK_STARTED:
-                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
+                /* we cannot LASSERT ev_fired right here because it
+                 * may be set only upon an event with unlinked==1 */
 
                 if (rpc->srpc_bulk != NULL) {
                         rc = ev->ev_status;
@@ -930,11 +931,20 @@ srpc_handle_rpc (swi_workitem_t *wi)
                                 rc = (*sv->sv_bulk_ready) (rpc, rc);
 
                         if (rc != 0) {
-                                srpc_server_rpc_done(rpc, rc);
-                                return 1;
+                                if (ev->ev_fired) {
+                                        srpc_server_rpc_done(rpc, rc);
+                                        return 1;
+                                }
+
+                                rpc->srpc_status = rc;
+                                wi->wi_state     = SWI_STATE_BULK_ERRORED;
+                                LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
+                                return 0; /* wait for UNLINK event  */
                         }
                 }
 
+                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
+
                 wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                 rc = srpc_send_reply(rpc);
                 if (rc == 0)
@@ -948,6 +958,13 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 wi->wi_state = SWI_STATE_DONE;
                 srpc_server_rpc_done(rpc, ev->ev_status);
                 return 1;
+
+        case SWI_STATE_BULK_ERRORED:
+                LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired);
+                LASSERT (rpc->srpc_status != 0);
+
+                srpc_server_rpc_done(rpc, rpc->srpc_status);
+                return 1;
         }
 
         return 0;
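
The hunks above are the core of the fix: when the bulk transfer fails before the final LNet event has fired, srpc_handle_rpc() can no longer complete the RPC on the spot; it records the status, unlinks the bulk MD, and parks in the new SWI_STATE_BULK_ERRORED state until the UNLINK event arrives. A minimal userland sketch of this defer-until-unlink pattern follows; the names are simplified stand-ins, not the selftest API:

/* Minimal userland sketch of the defer-until-unlink pattern; the
 * names here are simplified stand-ins, not the selftest API. */
#include <errno.h>
#include <stdio.h>

enum sm_state { BULK_STARTED, BULK_ERRORED, RPC_DONE };

struct rpc_sm {
        enum sm_state state;
        int           status;        /* first error seen */
};

/* Returns 1 when the RPC is finished, 0 to wait for another event,
 * mirroring the workitem return convention in the diff. */
static int handle_bulk_event(struct rpc_sm *rpc, int ev_status, int ev_fired)
{
        switch (rpc->state) {
        case BULK_STARTED:
                if (ev_status != 0 && !ev_fired) {
                        /* the MD may still be referenced by a later
                         * event: record the error and wait for the
                         * UNLINK event (LNetMDUnlink() is issued at
                         * this point in the real code) */
                        rpc->status = ev_status;
                        rpc->state  = BULK_ERRORED;
                        return 0;
                }
                rpc->state = RPC_DONE;
                return 1;
        case BULK_ERRORED:
                /* the UNLINK event finally fired; now finish */
                rpc->state = RPC_DONE;
                return 1;
        default:
                return 1;
        }
}

int main(void)
{
        struct rpc_sm rpc = { BULK_STARTED, 0 };

        /* bulk error but the final event has not fired: defer */
        printf("done=%d\n", handle_bulk_event(&rpc, -EINTR, 0));   /* 0 */
        /* the UNLINK event arrives: complete with the saved status */
        printf("done=%d\n", handle_bulk_event(&rpc, 0, 1));        /* 1 */
        return 0;
}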
@@ -985,20 +1002,20 @@ srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
         CFS_INIT_LIST_HEAD(&timer->stt_list);
         timer->stt_data    = rpc;
         timer->stt_func    = srpc_client_rpc_expired;
-        timer->stt_expires = cfs_time_add(rpc->crpc_timeout, 
+        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                           cfs_time_current_sec());
         stt_add_timer(timer);
         return;
 }
 
-/* 
+/*
  * Called with rpc->crpc_lock held.
  *
  * Upon exit the RPC expiry timer is not queued and the handler is not
  * running on any CPU. */
 void
 srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
-{     
+{
         /* timer not planted or already exploded */
         if (rpc->crpc_timeout == 0) return;
 
@@ -1010,7 +1027,7 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
         while (rpc->crpc_timeout != 0) {
                 spin_unlock(&rpc->crpc_lock);
 
-                cfs_schedule(); 
+                cfs_schedule();
 
                 spin_lock(&rpc->crpc_lock);
         }
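
The wait loop above is a drop-the-lock, yield, re-check idiom: the expiry handler itself clears crpc_timeout, so once the loop falls through, the handler can no longer be running, which is exactly the guarantee the comment promises. A userland sketch of the same idiom, assuming pthreads stand in for crpc_lock and cfs_schedule():

/* Userland sketch of the wait idiom; pthreads stand in for crpc_lock
 * and cfs_schedule(), and a plain thread plays the timer handler. */
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int timer_armed = 1;          /* ~ rpc->crpc_timeout != 0 */

static void *timer_handler(void *arg)
{
        (void)arg;
        usleep(1000);                /* handler runs concurrently */
        pthread_mutex_lock(&lock);
        timer_armed = 0;             /* the handler clears the flag */
        pthread_mutex_unlock(&lock);
        return NULL;
}

int main(void)
{
        pthread_t t;

        pthread_create(&t, NULL, timer_handler, NULL);

        pthread_mutex_lock(&lock);
        while (timer_armed) {        /* same shape as the loop above */
                pthread_mutex_unlock(&lock);
                sched_yield();       /* ~ cfs_schedule() */
                pthread_mutex_lock(&lock);
        }
        /* from here on, the handler cannot be running anywhere */
        pthread_mutex_unlock(&lock);

        pthread_join(t, NULL);
        printf("timer handler finished before we proceeded\n");
        return 0;
}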
@@ -1078,7 +1095,7 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
          * No one can schedule me now since:
          * - RPC timer has been defused.
          * - all LNet events have been fired.
-         * - crpc_closed has been set, preventing srpc_abort_rpc from 
+         * - crpc_closed has been set, preventing srpc_abort_rpc from
          *   scheduling me.
          * Cancel pending schedules and prevent future schedule attempts:
          */
@@ -1136,7 +1153,7 @@ srpc_send_rpc (swi_workitem_t *wi)
 
         case SWI_STATE_REQUEST_SUBMITTED:
                 /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
-                 * order; however, they're processed in a strict order: 
+                 * order; however, they're processed in a strict order:
                  * rqt, rpy, and bulk. */
                 if (!rpc->crpc_reqstev.ev_fired) break;
 
@@ -1153,7 +1170,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                 rc = rpc->crpc_replyev.ev_status;
                 if (rc != 0) break;
 
-                if ((reply->msg_type != type && 
+                if ((reply->msg_type != type &&
                      reply->msg_type != __swab32(type)) ||
                     (reply->msg_magic != SRPC_MSG_MAGIC &&
                      reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
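
This check (and its twin in srpc_lnet_ev_handler below) accepts a message whose type and magic match either natively or byte-swapped, the usual way of tolerating a peer of opposite endianness. A tiny illustration of the idiom with a made-up magic value and a hand-rolled swab32():

/* Illustration of the native-or-swabbed acceptance idiom; the magic
 * value and swab32() here are made up, not SRPC_MSG_MAGIC/__swab32. */
#include <stdint.h>
#include <stdio.h>

#define MSG_MAGIC 0x0bd00bd0U        /* illustrative constant */

static uint32_t swab32(uint32_t v)   /* stand-in for __swab32() */
{
        return ((v & 0x000000ffU) << 24) | ((v & 0x0000ff00U) << 8) |
               ((v & 0x00ff0000U) >> 8)  | ((v & 0xff000000U) >> 24);
}

static int magic_ok(uint32_t magic)
{
        /* accept the native value or its byte-swapped form, i.e. a
         * peer of either endianness */
        return magic == MSG_MAGIC || magic == swab32(MSG_MAGIC);
}

int main(void)
{
        printf("%d %d %d\n",
               magic_ok(MSG_MAGIC),          /* 1: same endianness    */
               magic_ok(swab32(MSG_MAGIC)),  /* 1: swapped peer       */
               magic_ok(0x12345678U));       /* 0: not a valid magic  */
        return 0;
}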
@@ -1222,7 +1239,7 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service,
 {
         srpc_client_rpc_t *rpc;
 
-       LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
+        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                    crpc_bulk.bk_iovs[nbulkiov]));
         if (rpc == NULL)
                 return NULL;
@@ -1371,7 +1388,7 @@ srpc_send_reply (srpc_server_rpc_t *rpc)
 }
 
 /* when in kernel always called with LNET_LOCK() held, and in thread context */
-void 
+void
 srpc_lnet_ev_handler (lnet_event_t *ev)
 {
         srpc_event_t      *rpcev = ev->md.user_ptr;
@@ -1381,6 +1398,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
         srpc_service_t    *sv;
         srpc_msg_t        *msg;
         srpc_msg_type_t    type;
+        int                fired_flag = 1;
 
         LASSERT (!in_interrupt());
 
@@ -1413,7 +1431,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
 
                 LASSERT (rpcev->ev_fired == 0);
                 rpcev->ev_fired  = 1;
-                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? 
+                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                 -EINTR : ev->status;
                 swi_schedule_workitem(&crpc->crpc_wi);
 
@@ -1441,7 +1459,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 LASSERT (sv->sv_nposted_msg >= 0);
 
                 if (sv->sv_shuttingdown) {
-                        /* Leave buffer on sv->sv_posted_msgq since 
+                        /* Leave buffer on sv->sv_posted_msgq since
                          * srpc_finish_service needs to traverse it. */
                         spin_unlock(&sv->sv_lock);
                         break;
@@ -1452,7 +1470,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 type = srpc_service2request(sv->sv_id);
 
                 if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
-                    (msg->msg_type != type && 
+                    (msg->msg_type != type &&
                      msg->msg_type != __swab32(type)) ||
                     (msg->msg_magic != SRPC_MSG_MAGIC &&
                      msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
@@ -1502,10 +1520,13 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                          ev->type == LNET_EVENT_REPLY ||
                          ev->type == LNET_EVENT_UNLINK);
 
-                if (ev->type == LNET_EVENT_SEND && 
-                    ev->status == 0 && !ev->unlinked)
-                        break; /* wait for the final LNET_EVENT_REPLY */
-
+                if (ev->type == LNET_EVENT_SEND && !ev->unlinked) {
+                        if (ev->status == 0)
+                                break; /* wait for the final LNET_EVENT_REPLY */
+                        else
+                                fired_flag = 0; /* LNET_EVENT_REPLY may arrive
+                                                   (optimized GET case) */
+                }
         case SRPC_BULK_PUT_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                         spin_lock(&srpc_data.rpc_glock);
@@ -1524,9 +1545,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 LASSERT (rpcev == &srpc->srpc_ev);
 
                 spin_lock(&sv->sv_lock);
-                rpcev->ev_fired  = 1;
-                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? 
+                if (fired_flag)
+                        rpcev->ev_fired  = 1;
+
+                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                 -EINTR : ev->status;
+
                 srpc_schedule_server_rpc(srpc);
                 spin_unlock(&sv->sv_lock);
                 break;
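
The handler hunks above encode the rule behind the new fired_flag: ev_fired may only be set by the event that is guaranteed to be the last one referencing the MD. A successful SEND that leaves the MD linked waits for the REPLY, as before; a failed SEND that leaves the MD linked now also defers, since a REPLY may still arrive in the optimized GET case. A compressed sketch of that decision with illustrative names:

/* Sketch of the ev_fired rule for a bulk GET source MD; names are
 * illustrative, not the LNet types. */
#include <stdio.h>

enum ev_type { EV_SEND, EV_REPLY, EV_UNLINK };

/* May this event set ev_fired (i.e. is it certainly the last event
 * touching the MD)? */
static int sets_ev_fired(enum ev_type type, int unlinked)
{
        /* A SEND that leaves the MD linked is never final: on success
         * the REPLY is still awaited (as before the patch); on failure
         * a REPLY may still arrive in the optimized GET case, which is
         * what fired_flag = 0 expresses above. */
        if (type == EV_SEND && !unlinked)
                return 0;
        return 1;       /* REPLY, UNLINK, or a SEND that unlinked */
}

int main(void)
{
        printf("failed SEND, MD linked: %d\n", sets_ev_fired(EV_SEND, 0));
        printf("REPLY                 : %d\n", sets_ev_fired(EV_REPLY, 1));
        return 0;
}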
diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h
index 37228e4..9aab54d 100644
--- a/lnet/selftest/selftest.h
+++ b/lnet/selftest/selftest.h
@@ -56,6 +56,7 @@ typedef struct { volatile int counter; } atomic_t;
 #define SWI_STATE_REQUEST_SENT             4
 #define SWI_STATE_REPLY_RECEIVED           5
 #define SWI_STATE_BULK_STARTED             6
+#define SWI_STATE_BULK_ERRORED             7
 #define SWI_STATE_DONE                     10
 
 /* forward refs */
@@ -71,11 +72,11 @@ struct sfw_test_instance;
  *   serialized with respect to itself.
  * - no CPU affinity, a workitem does not necessarily run on the same CPU
  *   that schedules it. However, this might change in the future.
- * - if a workitem is scheduled again before it has a chance to run, it 
+ * - if a workitem is scheduled again before it has a chance to run, it
  *   runs only once.
- * - if a workitem is scheduled while it runs, it runs again after it 
- *   completes; this ensures that events occurring while other events are 
- *   being processed receive due attention. This behavior also allows a 
+ * - if a workitem is scheduled while it runs, it runs again after it
+ *   completes; this ensures that events occurring while other events are
+ *   being processed receive due attention. This behavior also allows a
  *   workitem to reschedule itself.
  *
  * Usage notes:
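
The scheduling rules in the comment above (schedules arriving before a run coalesce into one run; a schedule arriving mid-run triggers exactly one re-run) can be modeled in a few lines. A toy single-threaded model, not the real swi_* scheduler:

/* Toy single-threaded model of the workitem semantics above;
 * illustrative only, not the real swi_* scheduler. */
#include <stdio.h>

struct workitem {
        int scheduled;   /* pending schedules, coalesced to one bit */
        int runs;
};

static void wi_schedule(struct workitem *wi)
{
        wi->scheduled = 1;   /* scheduling twice before a run = one run */
}

static void wi_run_pending(struct workitem *wi)
{
        while (wi->scheduled) {
                wi->scheduled = 0;
                wi->runs++;                /* the workitem body runs   */
                if (wi->runs == 1)
                        wi_schedule(wi);   /* scheduled while running:
                                            * it runs again afterwards */
        }
}

int main(void)
{
        struct workitem wi = { 0, 0 };

        wi_schedule(&wi);
        wi_schedule(&wi);               /* coalesced with the first */
        wi_run_pending(&wi);
        printf("runs=%d\n", wi.runs);   /* prints runs=2 */
        return 0;
}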
@@ -355,7 +356,7 @@ typedef struct {
 typedef struct {
        int  (*tso_init)(struct sfw_test_instance *tsi); /* initialize test client */
         void (*tso_fini)(struct sfw_test_instance *tsi); /* finalize test client */
-        int  (*tso_prep_rpc)(struct sfw_test_unit *tsu,     
+        int  (*tso_prep_rpc)(struct sfw_test_unit *tsu,
                              lnet_process_id_t dest,
                             srpc_client_rpc_t **rpc);   /* prep a test rpc */
         void (*tso_done_rpc)(struct sfw_test_unit *tsu,
@@ -388,7 +389,7 @@ typedef struct sfw_test_instance {
         } tsi_u;
 } sfw_test_instance_t;
 
-/* XXX: trailing (CFS_PAGE_SIZE % sizeof(lnet_process_id_t)) bytes at 
+/* XXX: trailing (CFS_PAGE_SIZE % sizeof(lnet_process_id_t)) bytes at
  * the end of pages are not used */
 #define SFW_MAX_CONCUR     LST_MAX_CONCUR
 #define SFW_ID_PER_PAGE    (CFS_PAGE_SIZE / sizeof(lnet_process_id_t))
@@ -425,7 +426,7 @@ void sfw_add_bulk_page(srpc_bulk_t *bk, cfs_page_t *pg, int i);
 int sfw_alloc_pages(srpc_server_rpc_t *rpc, int npages, int sink);
 
 srpc_client_rpc_t *
-srpc_create_client_rpc(lnet_process_id_t peer, int service, 
+srpc_create_client_rpc(lnet_process_id_t peer, int service,
                        int nbulkiov, int bulklen,
                        void (*rpc_done)(srpc_client_rpc_t *),
                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv);
@@ -513,12 +514,12 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer,
         return;
 }
 
-static inline const char * 
+static inline const char *
 swi_state2str (int state)
 {
 #define STATE2STR(x) case x: return #x
         switch(state) {
-                default: 
+                default:
                         LBUG();
                 STATE2STR(SWI_STATE_NEWBORN);
                 STATE2STR(SWI_STATE_REPLY_SUBMITTED);
@@ -527,6 +528,7 @@ swi_state2str (int state)
                 STATE2STR(SWI_STATE_REQUEST_SENT);
                 STATE2STR(SWI_STATE_REPLY_RECEIVED);
                 STATE2STR(SWI_STATE_BULK_STARTED);
+                STATE2STR(SWI_STATE_BULK_ERRORED);
                 STATE2STR(SWI_STATE_DONE);
         }
 #undef STATE2STR