Whamcloud - gitweb
b=21140 Fix srv_threads_running counting.
authorAlexander.Zarochentsev <Alexander.Zarochentsev@sun.com>
Wed, 12 May 2010 10:29:19 +0000 (14:29 +0400)
committerRobert Read <robert.read@oracle.com>
Wed, 12 May 2010 19:04:15 +0000 (12:04 -0700)
It was possible to overload n_active_request while processing incoming requests
and so break the thread reservation logic. This was likely responsible for the
long processing of requests.

The patch makes srv_threads_running count exactly the running (not sleeping)
threads. All thread accounting and the comparison/reservation of threads are
done under the service spinlock, so they produce a reliable result. The thread
reservation logic is based on the new srv_threads_running value and cannot be
confused by inactive sleeping threads. The thread reservation logic is now
concentrated in one place, where the wakeup condition is checked (now in
ptlrpc_main_check_event); once a thread is woken up, it is counted as running
and does further work w/o additional checks.

i=zhen.liang
i=robert.read

lustre/ptlrpc/service.c

index 07cd10d..a1d1060 100644 (file)
@@ -1317,17 +1317,16 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
         return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
         return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
-               svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+               svc->srv_threads_running < svc->srv_threads_started - 2;
 }
 
 static struct ptlrpc_request *
 }
 
 static struct ptlrpc_request *
-ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+ptlrpc_server_request_get(struct ptlrpc_service *svc)
 {
         struct ptlrpc_request *req = NULL;
         ENTRY;
 
 {
         struct ptlrpc_request *req = NULL;
         ENTRY;
 
-        if (ptlrpc_server_allow_normal(svc, force) &&
-            !cfs_list_empty(&svc->srv_request_queue) &&
+        if (!cfs_list_empty(&svc->srv_request_queue) &&
             (cfs_list_empty(&svc->srv_request_hpq) ||
              svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
                 req = cfs_list_entry(svc->srv_request_queue.next,
             (cfs_list_empty(&svc->srv_request_hpq) ||
              svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
                 req = cfs_list_entry(svc->srv_request_queue.next,
@@ -1508,22 +1507,14 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         LASSERT(svc);
 
         cfs_spin_lock(&svc->srv_lock);
         LASSERT(svc);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (unlikely(!ptlrpc_server_request_pending(svc, 0) ||
-            (
 #ifndef __KERNEL__
 #ifndef __KERNEL__
-             /* !@%$# liblustre only has 1 thread */
-             cfs_atomic_read(&svc->srv_n_difficult_replies) != 0 &&
-#endif
-             svc->srv_n_active_reqs >= (svc->srv_threads_running - 1)))) {
-                 /* Don't handle regular requests in the last thread, in order               * re
-                  * to handle difficult replies (which might block other threads)
-                  * as well as handle any incoming reqs, early replies, etc.
-                  * That means we always need at least 2 service threads. */
+        /* !@%$# liblustre only has 1 thread */
+        if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
                 cfs_spin_unlock(&svc->srv_lock);
                 RETURN(0);
                 cfs_spin_unlock(&svc->srv_lock);
                 RETURN(0);
-             }
-
-        request = ptlrpc_server_request_get(svc, 0);
+        }
+#endif
+        request = ptlrpc_server_request_get(svc);
         if  (request == NULL) {
                 cfs_spin_unlock(&svc->srv_lock);
                 RETURN(0);
         if  (request == NULL) {
                 cfs_spin_unlock(&svc->srv_lock);
                 RETURN(0);
@@ -1540,12 +1531,11 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                         cfs_spin_unlock(&svc->srv_lock);
                         OBD_FAIL_TIMEOUT(fail_opc, 4);
                         cfs_spin_lock(&svc->srv_lock);
                         cfs_spin_unlock(&svc->srv_lock);
                         OBD_FAIL_TIMEOUT(fail_opc, 4);
                         cfs_spin_lock(&svc->srv_lock);
-                        request = ptlrpc_server_request_get(svc, 0);
+                        request = ptlrpc_server_request_get(svc);
                         if  (request == NULL) {
                                 cfs_spin_unlock(&svc->srv_lock);
                                 RETURN(0);
                         }
                         if  (request == NULL) {
                                 cfs_spin_unlock(&svc->srv_lock);
                                 RETURN(0);
                         }
-                        LASSERT(ptlrpc_server_request_pending(svc, 0));
                 }
         }
 
                 }
         }
 
@@ -1910,6 +1900,85 @@ ptlrpc_retry_rqbds(void *arg)
         return (-ETIMEDOUT);
 }
 
         return (-ETIMEDOUT);
 }
 
+/**
+ *  Status bits to pass todo info from
+ *  ptlrpc_main_check_event to ptlrpc_main.
+ */
+#define PTLRPC_MAIN_STOPPING    0x01
+#define PTLRPC_MAIN_IN_REQ      0x02
+#define PTLRPC_MAIN_ACTIVE_REQ  0x04
+#define PTLRPC_MAIN_CHECK_TIMED 0x08
+#define PTLRPC_MAIN_REPOST      0x10
+
+/**
+ * A container to share per-thread status variables between
+ * the ptlrpc_main_check_event and ptlrpc_main functions.
+ */
+struct ptlrpc_main_check_s {
+        /** PTLRPC_MAIN_* todo bits for ptlrpc_main */
+        int todo;
+        /** non-zero while this thread is counted in srv_threads_running */
+        int running;
+};
+
+/**
+ * Check if the current service thread has work to do; returns status->todo.
+ */
+static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
+                                   struct ptlrpc_main_check_s *status)
+{
+        struct ptlrpc_service *svc = t->t_svc;
+        ENTRY;
+
+        status->todo = 0;
+
+        /* Check the stop flags w/o any locking so that all
+         * concurrently running threads stop faster. */
+        if (unlikely((t->t_flags & SVC_STOPPING) ||
+                     svc->srv_is_stopping)) {
+                status->todo |= PTLRPC_MAIN_STOPPING;
+                goto out;
+        }
+
+        cfs_spin_lock(&svc->srv_lock);
+        /* Count this thread as not running before a possible sleep in
+         * the outer wait event, if that has not been done yet. */
+        if (status->running) {
+                LASSERT(svc->srv_threads_running > 0);
+                svc->srv_threads_running--;
+                status->running = 0;
+        }
+        /* Process all incoming reqs before handling any */
+        if (!cfs_list_empty(&svc->srv_req_in_queue)) {
+                        status->todo |= PTLRPC_MAIN_IN_REQ;
+        }
+        /* Don't handle regular requests in the last thread, in order
+         * to handle any incoming reqs, early replies, etc. */
+        if (ptlrpc_server_request_pending(svc, 0) &&
+            (svc->srv_threads_running < (svc->srv_threads_started - 1))) {
+                status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
+        }
+        if (svc->srv_at_check) {
+                status->todo |= PTLRPC_MAIN_CHECK_TIMED;
+        }
+        if ((!cfs_list_empty(&svc->srv_idle_rqbds) &&
+             svc->srv_rqbd_timeout == 0)) {
+                status->todo |= PTLRPC_MAIN_REPOST;
+        }
+        /* Count this thread as running if it is about to leave the
+         * outer wait event (i.e. any todo bit is set). */
+        if (status->todo) {
+                svc->srv_threads_running++;
+                status->running = 1;
+        }
+        cfs_spin_unlock(&svc->srv_lock);
+ out:
+        RETURN(status->todo);
+}
+
+/**
+ * Main ptlrpc service thread routine.
+ */
 static int ptlrpc_main(void *arg)
 {
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
 static int ptlrpc_main(void *arg)
 {
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
@@ -1917,6 +1986,7 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_thread   *thread = data->thread;
         struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
         struct ptlrpc_thread   *thread = data->thread;
         struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
+        struct ptlrpc_main_check_s st;
 #ifdef WITH_GROUP_INFO
         cfs_group_info_t *ginfo = NULL;
 #endif
 #ifdef WITH_GROUP_INFO
         cfs_group_info_t *ginfo = NULL;
 #endif
@@ -1996,7 +2066,6 @@ static int ptlrpc_main(void *arg)
         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
 
         cfs_spin_lock(&svc->srv_lock);
         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
 
         cfs_spin_lock(&svc->srv_lock);
-        svc->srv_threads_running++;
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
         cfs_spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
         cfs_spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
@@ -2006,7 +2075,10 @@ static int ptlrpc_main(void *arg)
 
         /* XXX maintain a list of all managed devices: insert here */
 
 
         /* XXX maintain a list of all managed devices: insert here */
 
-        while (!(thread->t_flags & SVC_STOPPING) && !svc->srv_is_stopping) {
+        st.running = 0;
+        st.todo = 0;
+
+        while (!(st.todo & PTLRPC_MAIN_STOPPING)) {
                 /* Don't exit while there are replies to be handled */
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
                 /* Don't exit while there are replies to be handled */
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
@@ -2016,19 +2088,8 @@ static int ptlrpc_main(void *arg)
                 cfs_cond_resched();
 
                 l_wait_event_exclusive (svc->srv_waitq,
                 cfs_cond_resched();
 
                 l_wait_event_exclusive (svc->srv_waitq,
-                              thread->t_flags & SVC_STOPPING ||
-                              svc->srv_is_stopping ||
-                              (!cfs_list_empty(&svc->srv_idle_rqbds) &&
-                               svc->srv_rqbd_timeout == 0) ||
-                              !cfs_list_empty(&svc->srv_req_in_queue) ||
-                              (ptlrpc_server_request_pending(svc, 0) &&
-                               (svc->srv_n_active_reqs <
-                                (svc->srv_threads_running - 1))) ||
-                              svc->srv_at_check,
-                              &lwi);
-
-                if (thread->t_flags & SVC_STOPPING || svc->srv_is_stopping)
-                        break;
+                                        ptlrpc_main_check_event(thread, &st),
+                                        &lwi);
 
                 lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
 
 
                 lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
 
@@ -2039,31 +2100,26 @@ static int ptlrpc_main(void *arg)
                         /* Ignore return code - we tried... */
                         ptlrpc_start_thread(dev, svc);
 
                         /* Ignore return code - we tried... */
                         ptlrpc_start_thread(dev, svc);
 
-                if (!cfs_list_empty(&svc->srv_req_in_queue)) {
-                        /* Process all incoming reqs before handling any */
+                if (st.todo & PTLRPC_MAIN_IN_REQ) {
                         ptlrpc_server_handle_req_in(svc);
                         /* but limit ourselves in case of flood */
                         if (counter++ < 1000)
                                 continue;
                         counter = 0;
                 }
                         ptlrpc_server_handle_req_in(svc);
                         /* but limit ourselves in case of flood */
                         if (counter++ < 1000)
                                 continue;
                         counter = 0;
                 }
-
-                if (svc->srv_at_check)
+                if (st.todo & PTLRPC_MAIN_CHECK_TIMED) {
                         ptlrpc_at_check_timed(svc);
                         ptlrpc_at_check_timed(svc);
-
-                /* don't handle requests in the last thread */
-                if (ptlrpc_server_request_pending(svc, 0) &&
-                    (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
+                }
+                if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
                         lu_context_exit(&env.le_ctx);
                 }
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
                         lu_context_exit(&env.le_ctx);
                 }
-
-                if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
+                if ((st.todo & PTLRPC_MAIN_REPOST) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
-                        /* I just failed to repost request buffers.  Wait
-                         * for a timeout (unless something else happens)
-                         * before I try again */
+                        /* I just failed to repost request buffers.
+                         * Wait for a timeout (unless something else
+                         * happens) before I try again */
                         svc->srv_rqbd_timeout = cfs_time_seconds(1)/10;
                         CDEBUG(D_RPCTRACE,"Posted buffers: %d\n",
                                svc->srv_nrqbd_receiving);
                         svc->srv_rqbd_timeout = cfs_time_seconds(1)/10;
                         CDEBUG(D_RPCTRACE,"Posted buffers: %d\n",
                                svc->srv_nrqbd_receiving);
@@ -2085,14 +2141,16 @@ out:
         CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
                thread, thread->t_pid, thread->t_id, rc);
 
         CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
                thread, thread->t_pid, thread->t_id, rc);
 
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_threads_running--; /* must know immediately */
+        if (st.running) {
+                cfs_spin_lock(&svc->srv_lock);
+                svc->srv_threads_running--;
+                cfs_spin_unlock(&svc->srv_lock);
+        }
+
         thread->t_id = rc;
         thread->t_flags = SVC_STOPPED;
 
         cfs_waitq_signal(&thread->t_ctl_waitq);
         thread->t_id = rc;
         thread->t_flags = SVC_STOPPED;
 
         cfs_waitq_signal(&thread->t_ctl_waitq);
-        cfs_spin_unlock(&svc->srv_lock);
-
         return rc;
 }
 
         return rc;
 }
 
@@ -2527,7 +2585,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         while (ptlrpc_server_request_pending(service, 1)) {
                 struct ptlrpc_request *req;
 
         while (ptlrpc_server_request_pending(service, 1)) {
                 struct ptlrpc_request *req;
 
-                req = ptlrpc_server_request_get(service, 1);
+                req = ptlrpc_server_request_get(service);
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;