Whamcloud - gitweb
b=22827 restore locking for thread->t_flags
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index b72848a..e8e93cf 100644 (file)
@@ -658,7 +658,9 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
                 LASSERT(!cfs_list_empty(&req->rq_timed_list));
                 cfs_list_del_init(&req->rq_timed_list);
+                cfs_spin_lock(&req->rq_lock);
                 req->rq_at_linked = 0;
+                cfs_spin_unlock(&req->rq_lock);
                 array->paa_reqs_count[index]--;
                 array->paa_count--;
         } else
@@ -919,7 +921,9 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
                 cfs_list_add(&req->rq_timed_list,
                              &array->paa_reqs_array[index]);
 
+        cfs_spin_lock(&req->rq_lock);
         req->rq_at_linked = 1;
+        cfs_spin_unlock(&req->rq_lock);
         req->rq_at_index = index;
         array->paa_reqs_count[index]++;
         array->paa_count++;
@@ -974,29 +978,34 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         if (req->rq_export &&
             lustre_msg_get_flags(req->rq_reqmsg) &
             (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
-                /**
-                 * Use at_extra as early reply period for recovery requests but
-                 * make sure it is not bigger than recovery time / 4
-                 */
-                at_add(&svc->srv_at_estimate,
-                       min(at_extra,
-                           req->rq_export->exp_obd->obd_recovery_timeout / 4));
+                /* During recovery, we don't want to send too many early
+                 * replies, but on the other hand we want to make sure the
+                 * client has enough time to resend if the rpc is lost. So
+                 * during the recovery period send at least 4 early replies,
+                 * spacing them every at_extra if we can. at_estimate should
+                 * always equal this fixed value during recovery. */
+                at_measured(&svc->srv_at_estimate, min(at_extra,
+                            req->rq_export->exp_obd->obd_recovery_timeout / 4));
         } else {
                 /* Fake our processing time into the future to ask the clients
                  * for some extra amount of time */
-                at_add(&svc->srv_at_estimate, at_extra);
+                at_measured(&svc->srv_at_estimate, at_extra +
+                            cfs_time_current_sec() -
+                            req->rq_arrival_time.tv_sec);
+
+                /* Check to see if we've actually increased the deadline -
+                 * we may be past adaptive_max */
+                if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
+                    at_get(&svc->srv_at_estimate)) {
+                        DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
+                                  "(%ld/%ld), not sending early reply\n",
+                                  olddl, req->rq_arrival_time.tv_sec +
+                                  at_get(&svc->srv_at_estimate) -
+                                  cfs_time_current_sec());
+                        RETURN(-ETIMEDOUT);
+                }
         }
-
         newdl = cfs_time_current_sec() + at_get(&svc->srv_at_estimate);
-        if (req->rq_deadline >= newdl) {
-                /* We're not adding any time, no need to send an early reply
-                   (e.g. maybe at adaptive_max) */
-                DEBUG_REQ(D_WARNING, req, "Couldn't add any time ("
-                          CFS_DURATION_T"/"CFS_DURATION_T"), "
-                          "not sending early reply\n", olddl,
-                          cfs_time_sub(newdl, cfs_time_current_sec()));
-                RETURN(-ETIMEDOUT);
-        }
 
         OBD_ALLOC(reqcopy, sizeof *reqcopy);
         if (reqcopy == NULL)
@@ -1125,7 +1134,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                                 counter++;
                                 array->paa_reqs_count[index]--;
                                 array->paa_count--;
+                                cfs_spin_lock(&rq->rq_lock);
                                 rq->rq_at_linked = 0;
+                                cfs_spin_unlock(&rq->rq_lock);
                                 continue;
                         }
 
@@ -1306,17 +1317,16 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
         return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
-               svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+               svc->srv_threads_running < svc->srv_threads_started - 2;
 }
 
 static struct ptlrpc_request *
-ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+ptlrpc_server_request_get(struct ptlrpc_service *svc)
 {
         struct ptlrpc_request *req = NULL;
         ENTRY;
 
-        if (ptlrpc_server_allow_normal(svc, force) &&
-            !cfs_list_empty(&svc->srv_request_queue) &&
+        if (!cfs_list_empty(&svc->srv_request_queue) &&
             (cfs_list_empty(&svc->srv_request_hpq) ||
              svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
                 req = cfs_list_entry(svc->srv_request_queue.next,
@@ -1497,22 +1507,14 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         LASSERT(svc);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (unlikely(!ptlrpc_server_request_pending(svc, 0) ||
-            (
 #ifndef __KERNEL__
-             /* !@%$# liblustre only has 1 thread */
-             cfs_atomic_read(&svc->srv_n_difficult_replies) != 0 &&
-#endif
-             svc->srv_n_active_reqs >= (svc->srv_threads_running - 1)))) {
-                 /* Don't handle regular requests in the last thread, in order               * re
-                  * to handle difficult replies (which might block other threads)
-                  * as well as handle any incoming reqs, early replies, etc.
-                  * That means we always need at least 2 service threads. */
+        /* !@%$# liblustre only has 1 thread */
+        if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
                 cfs_spin_unlock(&svc->srv_lock);
                 RETURN(0);
-             }
-
-        request = ptlrpc_server_request_get(svc, 0);
+        }
+#endif
+        request = ptlrpc_server_request_get(svc);
         if  (request == NULL) {
                 cfs_spin_unlock(&svc->srv_lock);
                 RETURN(0);
@@ -1529,12 +1531,11 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                         cfs_spin_unlock(&svc->srv_lock);
                         OBD_FAIL_TIMEOUT(fail_opc, 4);
                         cfs_spin_lock(&svc->srv_lock);
-                        request = ptlrpc_server_request_get(svc, 0);
+                        request = ptlrpc_server_request_get(svc);
                         if  (request == NULL) {
                                 cfs_spin_unlock(&svc->srv_lock);
                                 RETURN(0);
                         }
-                        LASSERT(ptlrpc_server_request_pending(svc, 0));
                 }
         }
 
@@ -1899,6 +1900,85 @@ ptlrpc_retry_rqbds(void *arg)
         return (-ETIMEDOUT);
 }
 
+/**
+ *  Status bits to pass todo info from
+ *  ptlrpc_main_check_event to ptlrpc_main.
+ */
+#define PTLRPC_MAIN_STOPPING    0x01
+#define PTLRPC_MAIN_IN_REQ      0x02
+#define PTLRPC_MAIN_ACTIVE_REQ  0x04
+#define PTLRPC_MAIN_CHECK_TIMED 0x08
+#define PTLRPC_MAIN_REPOST      0x10
+
+/**
+ * A container to share per-thread status variables between
+ * ptlrpc_main_check_event and ptlrpc_main functions.
+ */
+struct ptlrpc_main_check_s {
+        /** todo info for the ptrlrpc_main */
+        int todo;
+        /** is this thread counted as running or not? */
+        int running;
+};
+
+/**
+ * Check whether current service thread has work to do.
+ */
+static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
+                                   struct ptlrpc_main_check_s *status)
+{
+        struct ptlrpc_service *svc = t->t_svc;
+        ENTRY;
+
+        status->todo = 0;
+
+        /* check the stop flags w/o any locking to make all
+         * concurrently running threads stop faster. */
+        if (unlikely((t->t_flags & SVC_STOPPING) ||
+                     svc->srv_is_stopping)) {
+                status->todo |= PTLRPC_MAIN_STOPPING;
+                goto out;
+        }
+
+        cfs_spin_lock(&svc->srv_lock);
+        /* count this thread as not running before possible sleep in
+         * the outer wait event if it is not done yet. */
+        if (status->running) {
+                LASSERT(svc->srv_threads_running > 0);
+                svc->srv_threads_running--;
+                status->running = 0;
+        }
+        /* Process all incoming reqs before handling any */
+        if (!cfs_list_empty(&svc->srv_req_in_queue)) {
+                        status->todo |= PTLRPC_MAIN_IN_REQ;
+        }
+        /* Don't handle regular requests in the last thread, in order
+         * to handle any incoming reqs, early replies, etc. */
+        if (ptlrpc_server_request_pending(svc, 0) &&
+            (svc->srv_threads_running < (svc->srv_threads_started - 1))) {
+                status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
+        }
+        if (svc->srv_at_check) {
+                status->todo |= PTLRPC_MAIN_CHECK_TIMED;
+        }
+        if ((!cfs_list_empty(&svc->srv_idle_rqbds) &&
+             svc->srv_rqbd_timeout == 0)) {
+                status->todo |= PTLRPC_MAIN_REPOST;
+        }
+        /* count this thread as active if it goes out the outer
+         * wait event */
+        if (status->todo) {
+                svc->srv_threads_running++;
+                status->running = 1;
+        }
+        cfs_spin_unlock(&svc->srv_lock);
+ out:
+        RETURN(status->todo);
+}
+
+/**
+ * Main prlrpc service thread routine.
+ */
 static int ptlrpc_main(void *arg)
 {
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
@@ -1906,6 +1986,7 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_thread   *thread = data->thread;
         struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
+        struct ptlrpc_main_check_s st;
 #ifdef WITH_GROUP_INFO
         cfs_group_info_t *ginfo = NULL;
 #endif
@@ -1985,7 +2066,6 @@ static int ptlrpc_main(void *arg)
         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
 
         cfs_spin_lock(&svc->srv_lock);
-        svc->srv_threads_running++;
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
         cfs_spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
@@ -1995,7 +2075,10 @@ static int ptlrpc_main(void *arg)
 
         /* XXX maintain a list of all managed devices: insert here */
 
-        while (!(thread->t_flags & SVC_STOPPING) && !svc->srv_is_stopping) {
+        st.running = 0;
+        st.todo = 0;
+
+        while (!(st.todo & PTLRPC_MAIN_STOPPING)) {
                 /* Don't exit while there are replies to be handled */
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
@@ -2005,19 +2088,8 @@ static int ptlrpc_main(void *arg)
                 cfs_cond_resched();
 
                 l_wait_event_exclusive (svc->srv_waitq,
-                              thread->t_flags & SVC_STOPPING ||
-                              svc->srv_is_stopping ||
-                              (!cfs_list_empty(&svc->srv_idle_rqbds) &&
-                               svc->srv_rqbd_timeout == 0) ||
-                              !cfs_list_empty(&svc->srv_req_in_queue) ||
-                              (ptlrpc_server_request_pending(svc, 0) &&
-                               (svc->srv_n_active_reqs <
-                                (svc->srv_threads_running - 1))) ||
-                              svc->srv_at_check,
-                              &lwi);
-
-                if (thread->t_flags & SVC_STOPPING || svc->srv_is_stopping)
-                        break;
+                                        ptlrpc_main_check_event(thread, &st),
+                                        &lwi);
 
                 lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
 
@@ -2028,31 +2100,26 @@ static int ptlrpc_main(void *arg)
                         /* Ignore return code - we tried... */
                         ptlrpc_start_thread(dev, svc);
 
-                if (!cfs_list_empty(&svc->srv_req_in_queue)) {
-                        /* Process all incoming reqs before handling any */
+                if (st.todo & PTLRPC_MAIN_IN_REQ) {
                         ptlrpc_server_handle_req_in(svc);
                         /* but limit ourselves in case of flood */
                         if (counter++ < 1000)
                                 continue;
                         counter = 0;
                 }
-
-                if (svc->srv_at_check)
+                if (st.todo & PTLRPC_MAIN_CHECK_TIMED) {
                         ptlrpc_at_check_timed(svc);
-
-                /* don't handle requests in the last thread */
-                if (ptlrpc_server_request_pending(svc, 0) &&
-                    (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
+                }
+                if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
                         lu_context_exit(&env.le_ctx);
                 }
-
-                if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
+                if ((st.todo & PTLRPC_MAIN_REPOST) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
-                        /* I just failed to repost request buffers.  Wait
-                         * for a timeout (unless something else happens)
-                         * before I try again */
+                        /* I just failed to repost request buffers.
+                         * Wait for a timeout (unless something else
+                         * happens) before I try again */
                         svc->srv_rqbd_timeout = cfs_time_seconds(1)/10;
                         CDEBUG(D_RPCTRACE,"Posted buffers: %d\n",
                                svc->srv_nrqbd_receiving);
@@ -2075,10 +2142,10 @@ out:
                thread, thread->t_pid, thread->t_id, rc);
 
         cfs_spin_lock(&svc->srv_lock);
-        svc->srv_threads_running--; /* must know immediately */
+        if (st.running)
+                svc->srv_threads_running--;
         thread->t_id = rc;
         thread->t_flags = SVC_STOPPED;
-
         cfs_waitq_signal(&thread->t_ctl_waitq);
         cfs_spin_unlock(&svc->srv_lock);
 
@@ -2516,7 +2583,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         while (ptlrpc_server_request_pending(service, 1)) {
                 struct ptlrpc_request *req;
 
-                req = ptlrpc_server_request_get(service, 1);
+                req = ptlrpc_server_request_get(service);
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;