Whamcloud - gitweb
Branch b1_8_gate
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index d8d737d..a0bf907 100644 (file)
@@ -300,7 +300,7 @@ struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                c->psc_watchdog_factor,
                                h, name, proc_entry,
                                prntfn, c->psc_min_threads, c->psc_max_threads,
-                               threadname, c->psc_ctx_tags);
+                               threadname, c->psc_ctx_tags, NULL);
 }
 EXPORT_SYMBOL(ptlrpc_init_svc_conf);
 
@@ -320,7 +320,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 cfs_proc_dir_entry_t *proc_entry,
                 svcreq_printfn_t svcreq_printfn,
                 int min_threads, int max_threads,
-                char *threadname, __u32 ctx_tags)
+                char *threadname, __u32 ctx_tags,
+                svc_hpreq_handler_t hp_handler)
 {
         int                    rc;
         struct ptlrpc_service *service;
@@ -355,11 +356,16 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_threads_max = max_threads;
         service->srv_thread_name = threadname;
         service->srv_ctx_tags = ctx_tags;
+        service->srv_hpreq_handler = hp_handler;
+        service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
+        service->srv_hpreq_count = 0;
+        service->srv_n_hpreq = 0;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
 
         CFS_INIT_LIST_HEAD(&service->srv_request_queue);
+        CFS_INIT_LIST_HEAD(&service->srv_request_hpq);
         CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
         CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
         CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
@@ -520,6 +526,11 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
 {
         struct ptlrpc_service  *svc = req->rq_rqbd->rqbd_service;
 
+        if (req->rq_export) {
+                class_export_put(req->rq_export);
+                req->rq_export = NULL;
+        }
+
         if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
                 DEBUG_REQ(D_INFO, req, "free req");
 
@@ -537,7 +548,7 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
 static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 {
         struct obd_export *oldest_exp;
-        time_t oldest_time;
+        time_t oldest_time, new_time;
 
         ENTRY;
 
@@ -548,9 +559,13 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
            of the list, we can be really lazy here - we don't have to evict
            at the exact right moment.  Eventually, all silent exports
            will make it to the top of the list. */
-        exp->exp_last_request_time = max(exp->exp_last_request_time,
-                                         cfs_time_current_sec() + extra_delay);
 
+        /* Ignore renewals of one second or less. */
+        new_time = cfs_time_current_sec() + extra_delay;
+        if (exp->exp_last_request_time + 1 /*second */ >= new_time)
+                RETURN_EXIT;
+
+        exp->exp_last_request_time = new_time;
         CDEBUG(D_HA, "updating export %s at "CFS_TIME_T" exp %p\n",
                exp->exp_client_uuid.uuid,
                exp->exp_last_request_time, exp);
@@ -563,8 +578,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         if (list_empty(&exp->exp_obd_chain_timed)) {
                 /* this one is not timed */
                 spin_unlock(&exp->exp_obd->obd_dev_lock);
-                EXIT;
-                return;
+                RETURN_EXIT;
         }
 
         list_move_tail(&exp->exp_obd_chain_timed,
@@ -924,6 +938,167 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
         RETURN(0);
 }
 
+/**
+ * Add the request to its export's queued-RPC list if the request
+ * may later become a high priority one.
+ */
+static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
+                             struct ptlrpc_request *req)
+{
+        int rc;
+        ENTRY;
+
+        if (svc->srv_hpreq_handler) {
+                rc = svc->srv_hpreq_handler(req);
+                if (rc)
+                        RETURN(rc);
+        }
+        if (req->rq_export && req->rq_ops) {
+                spin_lock(&req->rq_export->exp_lock);
+                list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc);
+                spin_unlock(&req->rq_export->exp_lock);
+        }
+
+        RETURN(0);
+}
+
+/** Remove the request from the export list. */
+static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
+{
+        ENTRY;
+        if (req->rq_export && req->rq_ops) {
+                spin_lock(&req->rq_export->exp_lock);
+                list_del_init(&req->rq_exp_list);
+                spin_unlock(&req->rq_export->exp_lock);
+        }
+        EXIT;
+}
+
+/**
+ * Make the request a high priority one.
+ *
+ * All the high priority requests are queued in a separate FIFO
+ * ptlrpc_service::srv_request_hpq list which is parallel to
+ * ptlrpc_service::srv_request_queue list but has a higher priority
+ * for handling.
+ *
+ * \see ptlrpc_server_handle_request().
+ */
+static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
+                                        struct ptlrpc_request *req)
+{
+        ENTRY;
+        LASSERT(svc != NULL);
+        spin_lock(&req->rq_lock);
+        if (req->rq_hp == 0) {
+                int opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+                /* Add to the high priority queue. */
+                list_move_tail(&req->rq_list, &svc->srv_request_hpq);
+                req->rq_hp = 1;
+                if (opc != OBD_PING)
+                        DEBUG_REQ(D_NET, req, "high priority req");
+        }
+        spin_unlock(&req->rq_lock);
+        EXIT;
+}
+
+void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
+{
+        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+        ENTRY;
+
+        spin_lock(&svc->srv_lock);
+        /* The request may already have been taken for processing while
+         * still on the export list; do not re-add it to the HP list. */
+        if (req->rq_phase == RQ_PHASE_NEW)
+                ptlrpc_hpreq_reorder_nolock(svc, req);
+        spin_unlock(&svc->srv_lock);
+        EXIT;
+}
+
+/** Check if the request is a high priority one. */
+static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
+{
+        int opc, rc = 0;
+        ENTRY;
+
+        /* Check by request opc. */
+        opc = lustre_msg_get_opc(req->rq_reqmsg);
+        if (opc == OBD_PING)
+                RETURN(1);
+
+        /* Perform request specific check. */
+        if (req->rq_ops && req->rq_ops->hpreq_check)
+                rc = req->rq_ops->hpreq_check(req);
+        RETURN(rc);
+}
+
+/** Queue a request, routing it to the high priority queue if eligible. */
+static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
+                                     struct ptlrpc_request *req)
+{
+        int rc;
+        ENTRY;
+
+        rc = ptlrpc_server_hpreq_check(req);
+        if (rc < 0)
+                RETURN(rc);
+
+        spin_lock(&svc->srv_lock);
+        /* Before inserting the request into the queue, check that it has
+         * not been inserted already, or even handled -- this can happen
+         * due to a racing ldlm_server_blocking_ast(). */
+        if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) {
+                if (rc)
+                        ptlrpc_hpreq_reorder_nolock(svc, req);
+                else
+                        list_add_tail(&req->rq_list, &svc->srv_request_queue);
+        }
+        spin_unlock(&svc->srv_lock);
+
+        RETURN(0);
+}
+
+/* Only allow normal priority requests on a service that has a high-priority
+ * queue if forced (i.e. cleanup), if there are other high priority requests
+ * already being processed (i.e. those threads can service more high-priority
+ * requests), or if there are enough idle threads that a later thread can do
+ * a high priority request. */
+static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
+{
+        return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
+               svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+}
+
+static struct ptlrpc_request *
+ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+{
+        struct ptlrpc_request *req = NULL;
+        ENTRY;
+
+        if (ptlrpc_server_allow_normal(svc, force) &&
+            !list_empty(&svc->srv_request_queue) &&
+            (list_empty(&svc->srv_request_hpq) ||
+             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
+                req = list_entry(svc->srv_request_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count = 0;
+        } else if (!list_empty(&svc->srv_request_hpq)) {
+                req = list_entry(svc->srv_request_hpq.next,
+                                 struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count++;
+        }
+        RETURN(req);
+}
+
+static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+        return ((ptlrpc_server_allow_normal(svc, force) &&
+                 !list_empty(&svc->srv_request_queue)) ||
+                !list_empty(&svc->srv_request_hpq));
+}
+
 /* Handle freshly incoming reqs, add to timed early reply list,
    pass on to regular request queue */
 static int
@@ -1003,10 +1178,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                                           "illegal security flavor,");
                 }
 
-                class_export_put(req->rq_export);
-                req->rq_export = NULL;
                 if (rc)
                         goto err_req;
+                ptlrpc_update_export_timer(req->rq_export, 0);
         }
 
         /* req_in handling should/must be fast */
@@ -1027,12 +1201,15 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         }
 
         ptlrpc_at_add_timed(req);
+        rc = ptlrpc_hpreq_init(svc, req);
+        if (rc)
+                GOTO(err_req, rc);
 
         /* Move it over to the request processing queue */
-        spin_lock(&svc->srv_lock);
-        list_add_tail(&req->rq_list, &svc->srv_request_queue);
+        rc = ptlrpc_server_request_add(svc, req);
+        if (rc)
+                GOTO(err_req, rc);
         cfs_waitq_signal(&svc->srv_waitq);
-        spin_unlock(&svc->srv_lock);
         RETURN(1);
 
 err_req:
@@ -1054,13 +1231,14 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         struct timeval         work_start;
         struct timeval         work_end;
         long                   timediff;
-        int                    rc;
+        int                    opc, rc;
+        int                    fail_opc = 0;
         ENTRY;
 
         LASSERT(svc);
 
         spin_lock(&svc->srv_lock);
-        if (unlikely(list_empty (&svc->srv_request_queue) ||
+        if (unlikely(!ptlrpc_server_request_pending(svc, 0) ||
             (
 #ifndef __KERNEL__
              /* !@%$# liblustre only has 1 thread */
@@ -1073,16 +1251,47 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                   * That means we always need at least 2 service threads. */
                 spin_unlock(&svc->srv_lock);
                 RETURN(0);
+             }
+
+        request = ptlrpc_server_request_get(svc, 0);
+        if  (request == NULL) {
+                spin_unlock(&svc->srv_lock);
+                RETURN(0);
+        }
+
+        opc = lustre_msg_get_opc(request->rq_reqmsg);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
+                fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
+        else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+                fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
+
+        if (unlikely(fail_opc)) {
+                if (request->rq_export && request->rq_ops) {
+                        spin_unlock(&svc->srv_lock);
+                        OBD_FAIL_TIMEOUT(fail_opc, 4);
+                        spin_lock(&svc->srv_lock);
+                        request = ptlrpc_server_request_get(svc, 0);
+                        if  (request == NULL) {
+                                spin_unlock(&svc->srv_lock);
+                                RETURN(0);
+                        }
+                        LASSERT(ptlrpc_server_request_pending(svc, 0));
+                }
         }
 
-        request = list_entry (svc->srv_request_queue.next,
-                              struct ptlrpc_request, rq_list);
-        list_del_init (&request->rq_list);
+        list_del_init(&request->rq_list);
         svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
+        if (request->rq_hp)
+                svc->srv_n_hpreq++;
 
+        /* The phase is changed under the lock here because we need to know
+         * that the request is being processed (see ptlrpc_hpreq_reorder()). */
+        ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
         spin_unlock(&svc->srv_lock);
 
+        ptlrpc_hpreq_fini(request);
+
         if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
                 libcfs_debug_dumplog();
 
@@ -1115,9 +1324,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         if (thread)
                 request->rq_svc_thread->t_env->le_ses = &request->rq_session;
 
-        request->rq_export = class_conn2export(
-                                     lustre_msg_get_handle(request->rq_reqmsg));
-
         if (likely(request->rq_export)) {
                 if (unlikely(ptlrpc_check_req(request)))
                         goto put_conn;
@@ -1138,8 +1344,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 goto put_rpc_export;
         }
 
-        ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-
         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
                (request->rq_export ?
@@ -1170,9 +1374,6 @@ put_rpc_export:
         if (export != NULL)
                 class_export_rpc_put(export);
 put_conn:
-        if (likely(request->rq_export != NULL))
-                class_export_put(request->rq_export);
-
         lu_context_exit(&request->rq_session);
         lu_context_fini(&request->rq_session);
 
@@ -1217,6 +1418,10 @@ put_conn:
         }
 
 out_req:
+        spin_lock(&svc->srv_lock);
+        if (request->rq_hp)
+                svc->srv_n_hpreq--;
+        spin_unlock(&svc->srv_lock);
         ptlrpc_server_finish_request(request);
 
         RETURN(1);
@@ -1515,7 +1720,7 @@ static int ptlrpc_main(void *arg)
                                svc->srv_rqbd_timeout == 0) ||
                               !list_empty(&svc->srv_req_in_queue) ||
                               !list_empty(&svc->srv_reply_queue) ||
-                              (!list_empty(&svc->srv_request_queue) &&
+                              (ptlrpc_server_request_pending(svc, 0) &&
                                (svc->srv_n_active_reqs <
                                 (svc->srv_threads_running - 1))) ||
                               svc->srv_at_check,
@@ -1550,7 +1755,7 @@ static int ptlrpc_main(void *arg)
                         ptlrpc_at_check_timed(svc);
 
                 /* don't handle requests in the last thread */
-                if (!list_empty (&svc->srv_request_queue) &&
+                if (ptlrpc_server_request_pending(svc, 0) &&
                     (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
@@ -1806,16 +2011,14 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 service->srv_n_active_reqs++;
                 ptlrpc_server_finish_request(req);
         }
-        while (!list_empty(&service->srv_request_queue)) {
-                struct ptlrpc_request *req =
-                        list_entry(service->srv_request_queue.next,
-                                   struct ptlrpc_request,
-                                   rq_list);
+        while (ptlrpc_server_request_pending(service, 1)) {
+                struct ptlrpc_request *req;
 
+                req = ptlrpc_server_request_get(service, 1);
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-
+                ptlrpc_hpreq_fini(req);
                 ptlrpc_server_finish_request(req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
@@ -1879,14 +2082,18 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
         do_gettimeofday(&right_now);
 
         spin_lock(&svc->srv_lock);
-        if (list_empty(&svc->srv_request_queue)) {
+        if (!ptlrpc_server_request_pending(svc, 1)) {
                 spin_unlock(&svc->srv_lock);
                 return 0;
         }
 
         /* How long has the next entry been waiting? */
-        request = list_entry(svc->srv_request_queue.next,
-                             struct ptlrpc_request, rq_list);
+        if (list_empty(&svc->srv_request_queue))
+                request = list_entry(svc->srv_request_hpq.next,
+                                     struct ptlrpc_request, rq_list);
+        else
+                request = list_entry(svc->srv_request_queue.next,
+                                     struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
         spin_unlock(&svc->srv_lock);