Whamcloud - gitweb
b=17682 limit performance impact of rpctrace, dlmtrace & quota
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index dff36d3..4818e7a 100644 (file)
@@ -280,6 +280,7 @@ static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
                 rs->rs_scheduled = 1;
                 b->rsb_n_replies++;
         }
+        rs->rs_committed = 1;
         spin_unlock(&rs->rs_lock);
 }
 
@@ -613,15 +614,17 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                 return;
 
         spin_lock(&svc->srv_at_lock);
-        list_del_init(&req->rq_timed_list);
         if (req->rq_at_linked) {
                 struct ptlrpc_at_array *array = &svc->srv_at_array;
                 __u32 index = req->rq_at_index;
 
+                LASSERT(!list_empty(&req->rq_timed_list));
+                list_del_init(&req->rq_timed_list);
                 req->rq_at_linked = 0;
                 array->paa_reqs_count[index]--;
                 array->paa_count--;
-        }
+        } else
+                LASSERT(list_empty(&req->rq_timed_list));
         spin_unlock(&svc->srv_at_lock);
 
         /* finalize request */
@@ -771,8 +774,8 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
                         exp->exp_obd->obd_eviction_timer =
                                 cfs_time_current_sec() + 3 * PING_INTERVAL;
                         CDEBUG(D_HA, "%s: Think about evicting %s from "CFS_TIME_T"\n",
-                               exp->exp_obd->obd_name, obd_export_nid2str(exp),
-                               oldest_time);
+                               exp->exp_obd->obd_name, 
+                               obd_export_nid2str(oldest_exp), oldest_time);
                 }
         } else {
                 if (cfs_time_current_sec() >
@@ -890,8 +893,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         return 0;
 }
 
-static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
-                                      int extra_time)
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
 {
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
         struct ptlrpc_request *reqcopy;
@@ -907,7 +909,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                   "%ssending early reply (deadline %+lds, margin %+lds) for "
                   "%d+%d", AT_OFF ? "AT off - not " : "",
                   olddl, olddl - at_get(&svc->srv_at_estimate),
-                  at_get(&svc->srv_at_estimate), extra_time);
+                  at_get(&svc->srv_at_estimate), at_extra);
 
         if (AT_OFF)
                 RETURN(0);
@@ -927,22 +929,23 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                 RETURN(-ENOSYS);
         }
 
-        if (req->rq_export && req->rq_export->exp_in_recovery) {
-                /* don't increase server estimates during recovery, and give
-                   clients the full recovery time. */
-                newdl = cfs_time_current_sec() +
-                        req->rq_export->exp_obd->obd_recovery_timeout;
+        if (req->rq_export &&
+            lustre_msg_get_flags(req->rq_reqmsg) &
+            (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
+                /**
+                 * Use at_extra as early reply period for recovery requests but
+                 * make sure it is not bigger than recovery time / 4
+                 */
+                at_add(&svc->srv_at_estimate,
+                       min(at_extra,
+                           req->rq_export->exp_obd->obd_recovery_timeout / 4));
         } else {
-                if (extra_time) {
-                        /* Fake our processing time into the future to ask the
-                           clients for some extra amount of time */
-                        extra_time += cfs_time_current_sec() -
-                                req->rq_arrival_time.tv_sec;
-                        at_add(&svc->srv_at_estimate, extra_time);
-                }
-                newdl = req->rq_arrival_time.tv_sec +
-                        at_get(&svc->srv_at_estimate);
+                /* Fake our processing time into the future to ask the clients
+                 * for some extra amount of time */
+                at_add(&svc->srv_at_estimate, at_extra);
         }
+
+        newdl = cfs_time_current_sec() + at_get(&svc->srv_at_estimate);
         if (req->rq_deadline >= newdl) {
                 /* We're not adding any time, no need to send an early reply
                    (e.g. maybe at adaptive_max) */
@@ -1068,7 +1071,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                 list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index],
                                          rq_timed_list) {
                         if (rq->rq_deadline <= now + at_early_margin) {
-                                list_del(&rq->rq_timed_list);
+                                list_del_init(&rq->rq_timed_list);
                                 /**
                                  * ptlrpc_server_drop_request() may drop
                                  * refcount to 0 already. Let's check this and
@@ -1119,7 +1122,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                                 rq_timed_list);
                 list_del_init(&rq->rq_timed_list);
 
-                if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
+                if (ptlrpc_at_send_early_reply(rq) == 0)
                         ptlrpc_at_add_timed(rq);
 
                 ptlrpc_server_drop_request(rq);
@@ -1207,7 +1210,7 @@ void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
         EXIT;
 }
 
-/** Check if the request if a high priority one. */
+/** Check if the request is a high priority one. */
 static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
 {
         int opc, rc = 0;
@@ -1572,16 +1575,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
 
-        CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
-               "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
-               (request->rq_export ?
-                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
-               (request->rq_export ?
-                atomic_read(&request->rq_export->exp_refcount) : -99),
-               lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
-               libcfs_id2str(request->rq_peer),
-               lustre_msg_get_opc(request->rq_reqmsg));
-
 put_rpc_export:
         if (export != NULL)
                 class_export_rpc_put(export);
@@ -1601,15 +1594,26 @@ put_conn:
 
         do_gettimeofday(&work_end);
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
-        CDEBUG(D_RPCTRACE, "request x"LPU64" opc %u from %s processed in "
-               "%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
-               request->rq_xid, lustre_msg_get_opc(request->rq_reqmsg),
-               libcfs_id2str(request->rq_peer), timediff,
-               cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
-               request->rq_repmsg ? lustre_msg_get_transno(request->rq_repmsg) :
-               request->rq_transno, request->rq_status,
-               request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
-               -999);
+        CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
+               "%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in "
+               "%lds (%lds total) trans "LPU64" rc %d/%d\n",
+                cfs_curproc_comm(),
+                (request->rq_export ?
+                 (char *)request->rq_export->exp_client_uuid.uuid : "0"),
+                (request->rq_export ?
+                 atomic_read(&request->rq_export->exp_refcount) : -99),
+                lustre_msg_get_status(request->rq_reqmsg),
+                request->rq_xid,
+                libcfs_id2str(request->rq_peer),
+                lustre_msg_get_opc(request->rq_reqmsg),
+                timediff,
+                cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
+                (request->rq_repmsg ?
+                 lustre_msg_get_transno(request->rq_repmsg) :
+                 request->rq_transno),
+                request->rq_status,
+                (request->rq_repmsg ?
+                 lustre_msg_get_status(request->rq_repmsg) : -999));
         if (likely(svc->srv_stats != NULL && request->rq_reqmsg != NULL)) {
                 __u32 op = lustre_msg_get_opc(request->rq_reqmsg);
                 int opc = opcode_offset(op);
@@ -1664,9 +1668,29 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
         list_del_init (&rs->rs_exp_list);
         spin_unlock (&exp->exp_lock);
 
-        /* Avoid exp_uncommitted_replies_lock contention if we 100% sure that
-         * rs has been removed from the list already */
-        if (!list_empty_careful(&rs->rs_obd_list)) {
+        /* The disk commit callback holds exp_uncommitted_replies_lock while it
+         * iterates over newly committed replies, removing them from
+         * exp_uncommitted_replies.  It then drops this lock and schedules the
+         * replies it found for handling here.
+         *
+         * We can avoid contention for exp_uncommitted_replies_lock between the
+         * HRT threads and further commit callbacks by checking rs_committed
+         * which is set in the commit callback while it holds both
+         * rs_lock and exp_uncommitted_reples.
+         * 
+         * If we see rs_committed clear, the commit callback _may_ not have
+         * handled this reply yet and we race with it to grab
+         * exp_uncommitted_replies_lock before removing the reply from
+         * exp_uncommitted_replies.  Note that if we lose the race and the
+         * reply has already been removed, list_del_init() is a noop.
+         *
+         * If we see rs_committed set, we know the commit callback is handling,
+         * or has handled this reply since store reordering might allow us to
+         * see rs_committed set out of sequence.  But since this is done
+         * holding rs_lock, we can be sure it has all completed once we hold
+         * rs_lock, which we do right next.
+         */
+        if (!rs->rs_committed) {
                 spin_lock(&exp->exp_uncommitted_replies_lock);
                 list_del_init(&rs->rs_obd_list);
                 spin_unlock(&exp->exp_uncommitted_replies_lock);
@@ -1844,6 +1868,7 @@ static int ptlrpc_main(void *arg)
         int counter = 0, rc = 0;
         ENTRY;
 
+        thread->t_pid = cfs_curproc_pid();
         cfs_daemonize_ctxt(data->name);
 
 #if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
@@ -1896,8 +1921,14 @@ static int ptlrpc_main(void *arg)
                 goto out_srv_fini;
         }
 
-        /* Record that the thread is running */
-        thread->t_flags = SVC_RUNNING;
+        spin_lock(&svc->srv_lock);
+        /* SVC_STOPPING may already be set here if someone else is trying
+         * to stop the service while this new thread has been dynamically
+         * forked. We still set SVC_RUNNING to let our creator know that
+         * we are now running, however we will exit as soon as possible */
+        thread->t_flags |= SVC_RUNNING;
+        spin_unlock(&svc->srv_lock);
+
         /*
          * wake up our creator. Note: @data is invalid after this point,
          * because it's allocated on ptlrpc_start_thread() stack.
@@ -1917,7 +1948,7 @@ static int ptlrpc_main(void *arg)
 
         /* XXX maintain a list of all managed devices: insert here */
 
-        while ((thread->t_flags & SVC_STOPPING) == 0) {
+        while (!(thread->t_flags & SVC_STOPPING) && !svc->srv_is_stopping) {
                 /* Don't exit while there are replies to be handled */
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
@@ -1927,7 +1958,8 @@ static int ptlrpc_main(void *arg)
                 cond_resched();
 
                 l_wait_event_exclusive (svc->srv_waitq,
-                              ((thread->t_flags & SVC_STOPPING) != 0) ||
+                              thread->t_flags & SVC_STOPPING ||
+                              svc->srv_is_stopping ||
                               (!list_empty(&svc->srv_idle_rqbds) &&
                                svc->srv_rqbd_timeout == 0) ||
                               !list_empty(&svc->srv_req_in_queue) ||
@@ -1937,15 +1969,17 @@ static int ptlrpc_main(void *arg)
                               svc->srv_at_check,
                               &lwi);
 
+                if (thread->t_flags & SVC_STOPPING || svc->srv_is_stopping)
+                        break;
+
                 lc_watchdog_touch(thread->t_watchdog, GET_TIMEOUT(svc));
 
                 ptlrpc_check_rqbd_pool(svc);
 
-                if ((svc->srv_threads_started < svc->srv_threads_max) &&
-                    (svc->srv_n_active_reqs >= (svc->srv_threads_started - 1))){
+                if (svc->srv_threads_started < svc->srv_threads_max &&
+                    svc->srv_n_active_reqs >= (svc->srv_threads_started - 1)) 
                         /* Ignore return code - we tried... */
                         ptlrpc_start_thread(dev, svc);
-                }
 
                 if (!list_empty(&svc->srv_req_in_queue)) {
                         /* Process all incoming reqs before handling any */
@@ -1990,7 +2024,8 @@ out_srv_fini:
 
         lu_context_fini(&env.le_ctx);
 out:
-        CDEBUG(D_NET, "service thread %d exiting: rc %d\n", thread->t_id, rc);
+        CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
+               thread, thread->t_pid, thread->t_id, rc);
 
         spin_lock(&svc->srv_lock);
         svc->srv_threads_running--; /* must know immediately */
@@ -2147,14 +2182,17 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         struct l_wait_info lwi = { 0 };
         ENTRY;
 
-        CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
+        CDEBUG(D_RPCTRACE, "Stopping thread [ %p : %u ]\n",
+               thread, thread->t_pid);
+
         spin_lock(&svc->srv_lock);
-        thread->t_flags = SVC_STOPPING;
+        /* let the thread know that we would like it to stop asap */
+        thread->t_flags |= SVC_STOPPING;
         spin_unlock(&svc->srv_lock);
 
         cfs_waitq_broadcast(&svc->srv_waitq);
-        l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
-                     &lwi);
+        l_wait_event(thread->t_ctl_waitq,
+                     (thread->t_flags & SVC_STOPPED), &lwi);
 
         spin_lock(&svc->srv_lock);
         list_del(&thread->t_link);
@@ -2200,6 +2238,7 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
                         CERROR("cannot start %s thread #%d: rc %d\n",
                                svc->srv_thread_name, i, rc);
                         ptlrpc_stop_all_threads(svc);
+                        break;
                 }
         }
         RETURN(rc);
@@ -2217,6 +2256,10 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
                svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
                svc->srv_threads_max, svc->srv_threads_running);
+
+        if (unlikely(svc->srv_is_stopping))
+                RETURN(-ESRCH);
+
         if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
             (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
              svc->srv_threads_started == svc->srv_threads_min - 1))