LU-2936 ptlrpc: Do not try to fetch hp request blindly
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index ab19b5c..6278f7b 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -27,7 +27,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2010, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -67,8 +67,10 @@ static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
 static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
 static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);
 
-static CFS_LIST_HEAD(ptlrpc_all_services);
-spinlock_t ptlrpc_all_services_lock;
+/** Holds a list of all PTLRPC services */
+CFS_LIST_HEAD(ptlrpc_all_services);
+/** Used to protect the \e ptlrpc_all_services list */
+struct mutex ptlrpc_all_services_mutex;
 
 struct ptlrpc_request_buffer_desc *
 ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
@@ -636,8 +638,6 @@ ptlrpc_service_part_init(struct ptlrpc_service *svc,
 
        /* active requests and hp requests */
        spin_lock_init(&svcpt->scp_req_lock);
-       CFS_INIT_LIST_HEAD(&svcpt->scp_req_pending);
-       CFS_INIT_LIST_HEAD(&svcpt->scp_hreq_pending);
 
        /* reply states */
        spin_lock_init(&svcpt->scp_rep_lock);
@@ -783,9 +783,8 @@ ptlrpc_register_service(struct ptlrpc_service_conf *conf,
        CFS_INIT_LIST_HEAD(&service->srv_list); /* for safety of cleanup */
 
        /* buffer configuration */
-       service->srv_nbuf_per_group     = test_req_buffer_pressure ?  1 :
-                                         max(conf->psc_buf.bc_nbufs /
-                                             service->srv_ncpts, 1U);
+       service->srv_nbuf_per_group     = test_req_buffer_pressure ?
+                                         1 : conf->psc_buf.bc_nbufs;
        service->srv_max_req_size       = conf->psc_buf.bc_req_max_size +
                                          SPTLRPC_MAX_PAYLOAD;
        service->srv_buf_size           = conf->psc_buf.bc_buf_size;
@@ -824,15 +823,19 @@ ptlrpc_register_service(struct ptlrpc_service_conf *conf,
        rc = LNetSetLazyPortal(service->srv_req_portal);
        LASSERT(rc == 0);
 
-       spin_lock(&ptlrpc_all_services_lock);
-        cfs_list_add (&service->srv_list, &ptlrpc_all_services);
-       spin_unlock(&ptlrpc_all_services_lock);
+       mutex_lock(&ptlrpc_all_services_mutex);
+       cfs_list_add (&service->srv_list, &ptlrpc_all_services);
+       mutex_unlock(&ptlrpc_all_services_mutex);
 
-        if (proc_entry != NULL)
-                ptlrpc_lprocfs_register_service(proc_entry, service);
+       if (proc_entry != NULL)
+               ptlrpc_lprocfs_register_service(proc_entry, service);
 
-        CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
-               service->srv_name, service->srv_req_portal);
+       rc = ptlrpc_service_nrs_setup(service);
+       if (rc != 0)
+               GOTO(failed, rc);
+
+       CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
+              service->srv_name, service->srv_req_portal);
 
 #ifdef __KERNEL__
        rc = ptlrpc_start_threads(service);
@@ -991,11 +994,14 @@ static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
        ptlrpc_server_hpreq_fini(req);
 
        spin_lock(&svcpt->scp_req_lock);
+       ptlrpc_nrs_req_stop_nolock(req);
        svcpt->scp_nreqs_active--;
        if (req->rq_hp)
                svcpt->scp_nhreqs_active--;
        spin_unlock(&svcpt->scp_req_lock);
 
+       ptlrpc_nrs_req_finalize(req);
+
        ptlrpc_server_drop_request(req);
 }
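
The split above is deliberate: ptlrpc_nrs_req_stop_nolock() and the active-request counters are cheap bookkeeping done under scp_req_lock, while ptlrpc_nrs_req_finalize() runs only after the lock is dropped, presumably because finalizing may release per-policy resources. A standalone sketch of that lock discipline, with pthread and simplified types standing in for the kernel primitives (all names below are illustrative, not the Lustre API):

#include <pthread.h>
#include <stdlib.h>

struct svcpt {
	pthread_mutex_t req_lock;	/* stands in for scp_req_lock */
	int		nreqs_active;
};

struct request {
	struct svcpt	*svcpt;
	void		*nrs_private;	/* per-policy resource to release */
};

/* Cheap bookkeeping only; must be called with req_lock held. */
static void nrs_req_stop_nolock(struct request *req)
{
	(void)req;	/* e.g. clear "started" state, update policy counters */
}

/* May free memory or take other locks, so it runs unlocked. */
static void nrs_req_finalize(struct request *req)
{
	free(req->nrs_private);
	req->nrs_private = NULL;
}

static void finish_request(struct request *req)
{
	struct svcpt *svcpt = req->svcpt;

	pthread_mutex_lock(&svcpt->req_lock);
	nrs_req_stop_nolock(req);
	svcpt->nreqs_active--;
	pthread_mutex_unlock(&svcpt->req_lock);

	nrs_req_finalize(req);	/* heavy work outside the lock */
}

int main(void)
{
	struct svcpt s = { PTHREAD_MUTEX_INITIALIZER, 1 };
	struct request r = { &s, malloc(16) };

	finish_request(&r);
	return s.nreqs_active;
}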
 
@@ -1091,14 +1097,14 @@ static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
         int rc = 0;
 
-        if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
-                     req->rq_export->exp_conn_cnt)) {
-                DEBUG_REQ(D_ERROR, req,
-                          "DROPPING req from old connection %d < %d",
-                          lustre_msg_get_conn_cnt(req->rq_reqmsg),
-                          req->rq_export->exp_conn_cnt);
-                return -EEXIST;
-        }
+       if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
+                    req->rq_export->exp_conn_cnt)) {
+               DEBUG_REQ(D_RPCTRACE, req,
+                         "DROPPING req from old connection %d < %d",
+                         lustre_msg_get_conn_cnt(req->rq_reqmsg),
+                         req->rq_export->exp_conn_cnt);
+               return -EEXIST;
+       }
         if (unlikely(req->rq_export->exp_obd &&
                      req->rq_export->exp_obd->obd_fail)) {
              /* Failing over, don't handle any more reqs, send
@@ -1471,23 +1477,39 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
  * Put the request to the export list if the request may become
  * a high priority one.
  */
-static int ptlrpc_server_hpreq_init(struct ptlrpc_service *svc,
+static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
                                    struct ptlrpc_request *req)
 {
-        int rc = 0;
-        ENTRY;
+       int rc = 0;
+       ENTRY;
 
-       if (svc->srv_ops.so_hpreq_handler) {
-               rc = svc->srv_ops.so_hpreq_handler(req);
-                if (rc)
-                        RETURN(rc);
-        }
-        if (req->rq_export && req->rq_ops) {
-                /* Perform request specific check. We should do this check
-                 * before the request is added into exp_hp_rpcs list otherwise
-                 * it may hit swab race at LU-1044. */
-                if (req->rq_ops->hpreq_check)
-                        rc = req->rq_ops->hpreq_check(req);
+       if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
+               rc = svcpt->scp_service->srv_ops.so_hpreq_handler(req);
+               if (rc < 0)
+                       RETURN(rc);
+               LASSERT(rc == 0);
+       }
+       if (req->rq_export && req->rq_ops) {
+               /* Perform request specific check. We should do this check
+                * before the request is added into exp_hp_rpcs list otherwise
+                * it may hit swab race at LU-1044. */
+               if (req->rq_ops->hpreq_check) {
+                       rc = req->rq_ops->hpreq_check(req);
+                       /**
+                        * XXX: Out of all current
+                        * ptlrpc_hpreq_ops::hpreq_check(), only
+                        * ldlm_cancel_hpreq_check() can return an error code;
+                        * other functions assert in similar places, which seems
+                        * odd. What also does not seem right is that handlers
+                        * for those RPCs do not assert on the same checks, but
+                        * rather handle the error cases. e.g. see
+                        * ost_rw_hpreq_check(), and ost_brw_read(),
+                        * ost_brw_write().
+                        */
+                       if (rc < 0)
+                               RETURN(rc);
+                       LASSERT(rc == 0 || rc == 1);
+               }
 
                spin_lock_bh(&req->rq_export->exp_rpc_lock);
                cfs_list_add(&req->rq_exp_list,
@@ -1495,6 +1517,8 @@ static int ptlrpc_server_hpreq_init(struct ptlrpc_service *svc,
                spin_unlock_bh(&req->rq_export->exp_rpc_lock);
        }
 
+       ptlrpc_nrs_req_initialize(svcpt, req, rc);
+
        RETURN(rc);
 }
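
Per the XXX comment above, the ptlrpc_hpreq_ops::hpreq_check() convention is: a negative errno rejects the request, 0 keeps it at normal priority, and 1 promotes it to high priority. A minimal sketch of a conforming check, using an invented stand-in request type (the fields here are hypothetical):

#include <errno.h>
#include <stdbool.h>

/* Invented stand-in for struct ptlrpc_request. */
struct fake_request {
	bool malformed;		/* hypothetical failure condition */
	bool conflicts_with_io;	/* hypothetical promotion condition */
};

/* Returns <0 on error, 0 for normal priority, 1 for high priority. */
static int example_hpreq_check(struct fake_request *req)
{
	if (req->malformed)
		return -EINVAL;	/* caller drops the request */
	return req->conflicts_with_io ? 1 : 0;
}

int main(void)
{
	struct fake_request req = { false, true };

	return example_hpreq_check(&req) == 1 ? 0 : 1;
}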
 
@@ -1539,77 +1563,17 @@ int ptlrpc_hpreq_handler(struct ptlrpc_request *req)
 }
 EXPORT_SYMBOL(ptlrpc_hpreq_handler);
 
-/**
- * Make the request a high priority one.
- *
- * All the high priority requests are queued in a separate FIFO
- * ptlrpc_service_part::scp_hpreq_pending list which is parallel to
- * ptlrpc_service_part::scp_req_pending list but has a higher priority
- * for handling.
- *
- * \see ptlrpc_server_handle_request().
- */
-static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service_part *svcpt,
-                                        struct ptlrpc_request *req)
-{
-       ENTRY;
-
-       spin_lock(&req->rq_lock);
-        if (req->rq_hp == 0) {
-                int opc = lustre_msg_get_opc(req->rq_reqmsg);
-
-                /* Add to the high priority queue. */
-               cfs_list_move_tail(&req->rq_list, &svcpt->scp_hreq_pending);
-                req->rq_hp = 1;
-                if (opc != OBD_PING)
-                        DEBUG_REQ(D_RPCTRACE, req, "high priority req");
-        }
-       spin_unlock(&req->rq_lock);
-       EXIT;
-}
-
-/**
- * \see ptlrpc_hpreq_reorder_nolock
- */
-void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
-{
-       struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
-       ENTRY;
-
-       spin_lock(&svcpt->scp_req_lock);
-       /* It may happen that the request is already taken for the processing
-        * but still in the export list, or the request is not in the request
-        * queue but in the export list already, do not add it into the
-        * HP list. */
-       if (!cfs_list_empty(&req->rq_list))
-               ptlrpc_hpreq_reorder_nolock(svcpt, req);
-       spin_unlock(&svcpt->scp_req_lock);
-       EXIT;
-}
-EXPORT_SYMBOL(ptlrpc_hpreq_reorder);
-
-/**
- * Add a request to the regular or HP queue; optionally perform HP request
- * initialization.
- */
 static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                                     struct ptlrpc_request *req)
 {
        int     rc;
        ENTRY;
 
-       rc = ptlrpc_server_hpreq_init(svcpt->scp_service, req);
+       rc = ptlrpc_server_hpreq_init(svcpt, req);
        if (rc < 0)
                RETURN(rc);
 
-       spin_lock(&svcpt->scp_req_lock);
-
-       if (rc)
-               ptlrpc_hpreq_reorder_nolock(svcpt, req);
-       else
-               cfs_list_add_tail(&req->rq_list, &svcpt->scp_req_pending);
-
-       spin_unlock(&svcpt->scp_req_lock);
+       ptlrpc_nrs_req_add(svcpt, req, !!rc);
 
        RETURN(0);
 }
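
After this change the enqueue path no longer touches scp_req_pending/scp_hreq_pending directly: ptlrpc_server_hpreq_init() classifies the request (<0 error, 0 normal, 1 high priority) and ptlrpc_nrs_req_add() is told only whether it is HP, with !!rc collapsing the "promote" return of 1 into a boolean. A standalone model of that control flow (the types and the nrs_enqueue() helper are stand-ins, not the real NRS API):

#include <stdbool.h>
#include <stdio.h>

/* Stand-in for ptlrpc_nrs_req_add(): hand the request to the HP or
 * regular NRS head. */
static void nrs_enqueue(int reqid, bool hp)
{
	printf("req %d -> %s queue\n", reqid, hp ? "HP" : "regular");
}

static int request_add(int reqid, int hpreq_init_rc)
{
	if (hpreq_init_rc < 0)
		return hpreq_init_rc;	/* classification failed */

	/* !!rc folds the "promote" return of 1 into a boolean */
	nrs_enqueue(reqid, !!hpreq_init_rc);
	return 0;
}

int main(void)
{
	request_add(1, 0);	/* normal priority */
	request_add(2, 1);	/* promoted to high priority */
	return 0;
}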
@@ -1624,6 +1588,9 @@ static int ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt,
 {
        int running = svcpt->scp_nthrs_running;
 
+       if (!nrs_svcpt_has_hp(svcpt))
+               return 0;
+
        if (force)
                return 1;
 
@@ -1641,7 +1608,7 @@ static int ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt,
        if (svcpt->scp_nhreqs_active == 0)
                return 1;
 
-       return cfs_list_empty(&svcpt->scp_req_pending) ||
+       return !ptlrpc_nrs_req_pending_nolock(svcpt, false) ||
               svcpt->scp_hreq_count < svcpt->scp_service->srv_hpreq_ratio;
 }
 
@@ -1649,7 +1616,7 @@ static int ptlrpc_server_high_pending(struct ptlrpc_service_part *svcpt,
                                      int force)
 {
        return ptlrpc_server_allow_high(svcpt, force) &&
-              !cfs_list_empty(&svcpt->scp_hreq_pending);
+              ptlrpc_nrs_req_pending_nolock(svcpt, true);
 }
 
 /**
@@ -1685,14 +1652,14 @@ static int ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
                return 0;
 
        return svcpt->scp_nhreqs_active > 0 ||
-              svcpt->scp_service->srv_ops.so_hpreq_handler == NULL;
+              !nrs_svcpt_has_hp(svcpt);
 }
 
 static int ptlrpc_server_normal_pending(struct ptlrpc_service_part *svcpt,
                                        int force)
 {
        return ptlrpc_server_allow_normal(svcpt, force) &&
-              !cfs_list_empty(&svcpt->scp_req_pending);
+              ptlrpc_nrs_req_pending_nolock(svcpt, false);
 }
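
Taken together, ptlrpc_server_allow_high() and ptlrpc_server_allow_normal() implement a fairness throttle: HP requests are preferred, but once srv_hpreq_ratio of them have been served consecutively while normal requests wait, a normal request is scheduled and the counter resets (scp_hreq_count is maintained in ptlrpc_server_request_get() below). A toy model of that policy, single-threaded and with invented names:

#include <stdbool.h>
#include <stdio.h>

#define HPREQ_RATIO 2	/* stands in for srv_hpreq_ratio */

static int hp_pending = 5, normal_pending = 5;
static int hreq_count;	/* consecutive HP requests served */

static bool take_high(void)
{
	/* prefer HP unless HPREQ_RATIO were already served in a row
	 * while normal requests are waiting */
	return hp_pending > 0 &&
	       (normal_pending == 0 || hreq_count < HPREQ_RATIO);
}

int main(void)
{
	while (hp_pending + normal_pending > 0) {
		if (take_high()) {
			hp_pending--;
			hreq_count++;
			puts("served HP");
		} else if (normal_pending > 0) {
			normal_pending--;
			hreq_count = 0;	/* reset after a normal request */
			puts("served normal");
		}
	}
	return 0;
}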
 
 /**
@@ -1722,15 +1689,13 @@ ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, int force)
        ENTRY;
 
        if (ptlrpc_server_high_pending(svcpt, force)) {
-               req = cfs_list_entry(svcpt->scp_hreq_pending.next,
-                                    struct ptlrpc_request, rq_list);
+               req = ptlrpc_nrs_req_poll_nolock(svcpt, true);
                svcpt->scp_hreq_count++;
                RETURN(req);
        }
 
        if (ptlrpc_server_normal_pending(svcpt, force)) {
-               req = cfs_list_entry(svcpt->scp_req_pending.next,
-                                    struct ptlrpc_request, rq_list);
+               req = ptlrpc_nrs_req_poll_nolock(svcpt, false);
                svcpt->scp_hreq_count = 0;
                RETURN(req);
        }
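
Note that ptlrpc_nrs_req_poll_nolock() only peeks at the head of the chosen NRS queue; the request is unlinked later by ptlrpc_nrs_req_del_nolock() in ptlrpc_server_handle_request(), and both steps happen under scp_req_lock, so the peeked head cannot change in between. A minimal peek-then-delete sketch (head removal only; a real policy can unlink arbitrary entries):

#include <stddef.h>

struct qreq {
	struct qreq *next;
};

static struct qreq *queue_head;

/* Non-destructive peek: caller can inspect and still decline the request. */
static struct qreq *queue_poll(void)
{
	return queue_head;
}

/* Destructive unlink, done once the server commits to handling it. */
static void queue_del(struct qreq *req)
{
	if (queue_head == req)
		queue_head = req->next;
	req->next = NULL;
}

int main(void)
{
	struct qreq a = { NULL }, b = { NULL };
	struct qreq *req;

	a.next = &b;
	queue_head = &a;

	req = queue_poll();	/* peek under the lock */
	queue_del(req);		/* unlink under the same lock */
	return queue_head == &b ? 0 : 1;
}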
@@ -1938,12 +1903,12 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                        }
                }
        }
-
-       cfs_list_del_init(&request->rq_list);
+       ptlrpc_nrs_req_del_nolock(request);
        svcpt->scp_nreqs_active++;
        if (request->rq_hp)
                svcpt->scp_nhreqs_active++;
 
+       ptlrpc_nrs_req_start_nolock(request);
        spin_unlock(&svcpt->scp_req_lock);
 
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
@@ -2020,19 +1985,19 @@ put_conn:
         lu_context_exit(&request->rq_session);
         lu_context_fini(&request->rq_session);
 
-        if (unlikely(cfs_time_current_sec() > request->rq_deadline)) {
-                DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer "
-                          "than estimated ("CFS_DURATION_T":"CFS_DURATION_T"s);"
-                          " client may timeout.",
-                          request->rq_xid, cfs_time_sub(request->rq_deadline,
-                          request->rq_arrival_time.tv_sec),
-                          cfs_time_sub(cfs_time_current_sec(),
-                          request->rq_deadline));
-        }
+       if (unlikely(cfs_time_current_sec() > request->rq_deadline)) {
+                    DEBUG_REQ(D_WARNING, request, "Request took longer "
+                              "than estimated ("CFS_DURATION_T":"CFS_DURATION_T"s);"
+                              " client may timeout.",
+                              cfs_time_sub(request->rq_deadline,
+                                           request->rq_arrival_time.tv_sec),
+                              cfs_time_sub(cfs_time_current_sec(),
+                                           request->rq_deadline));
+       }
 
         cfs_gettimeofday(&work_end);
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
-        CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
+       CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in "
                "%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
                 cfs_curproc_comm(),
@@ -2785,12 +2750,14 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
        struct l_wait_info      lwi = { 0 };
        struct ptlrpc_thread    *thread;
-       struct ptlrpc_service   *svc = svcpt->scp_service;
+       struct ptlrpc_service   *svc;
        int                     rc;
        ENTRY;
 
        LASSERT(svcpt != NULL);
 
+       svc = svcpt->scp_service;
+
        CDEBUG(D_RPCTRACE, "%s[%d] started %d min %d max %d\n",
               svc->srv_name, svcpt->scp_cpt, svcpt->scp_nthrs_running,
               svc->srv_nthrs_cpt_init, svc->srv_nthrs_cpt_limit);
@@ -3081,7 +3048,7 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
 
                while (ptlrpc_server_request_pending(svcpt, 1)) {
                        req = ptlrpc_server_request_get(svcpt, 1);
-                       cfs_list_del(&req->rq_list);
+                       ptlrpc_nrs_req_del_nolock(req);
                        svcpt->scp_nreqs_active++;
                        ptlrpc_server_hpreq_fini(req);
 
@@ -3164,17 +3131,19 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
 
        service->srv_is_stopping = 1;
 
-       spin_lock(&ptlrpc_all_services_lock);
+       mutex_lock(&ptlrpc_all_services_mutex);
        cfs_list_del_init(&service->srv_list);
-       spin_unlock(&ptlrpc_all_services_lock);
-
-       ptlrpc_lprocfs_unregister_service(service);
+       mutex_unlock(&ptlrpc_all_services_mutex);
 
        ptlrpc_service_del_atimer(service);
        ptlrpc_stop_all_threads(service);
 
        ptlrpc_service_unlink_rqbd(service);
        ptlrpc_service_purge_all(service);
+       ptlrpc_service_nrs_cleanup(service);
+
+       ptlrpc_lprocfs_unregister_service(service);
+
        ptlrpc_service_free(service);
 
        RETURN(0);
@@ -3189,27 +3158,24 @@ EXPORT_SYMBOL(ptlrpc_unregister_service);
  * to be shot, so it's intentionally non-aggressive. */
 int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
 {
-       struct ptlrpc_request           *request;
+       struct ptlrpc_request           *request = NULL;
        struct timeval                  right_now;
        long                            timediff;
 
        cfs_gettimeofday(&right_now);
 
        spin_lock(&svcpt->scp_req_lock);
-       if (!ptlrpc_server_request_pending(svcpt, 1)) {
+       /* How long has the next entry been waiting? */
+       if (ptlrpc_server_high_pending(svcpt, 1))
+               request = ptlrpc_nrs_req_poll_nolock(svcpt, true);
+       else if (ptlrpc_server_normal_pending(svcpt, 1))
+               request = ptlrpc_nrs_req_poll_nolock(svcpt, false);
+
+       if (request == NULL) {
                spin_unlock(&svcpt->scp_req_lock);
                return 0;
        }
 
-       /* How long has the next entry been waiting? */
-       if (cfs_list_empty(&svcpt->scp_req_pending)) {
-               request = cfs_list_entry(svcpt->scp_hreq_pending.next,
-                                        struct ptlrpc_request, rq_list);
-       } else {
-               request = cfs_list_entry(svcpt->scp_req_pending.next,
-                                        struct ptlrpc_request, rq_list);
-       }
-
        timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
        spin_unlock(&svcpt->scp_req_lock);
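
The health check measures how long the oldest pollable request has been queued; cfs_timeval_sub() with a NULL third argument returns the difference in microseconds (cf. the "%ldus" format used earlier in this file). A userspace sketch of the same age computation, with gettimeofday() standing in for cfs_gettimeofday() and an invented helper name:

#include <stdio.h>
#include <sys/time.h>

/* Microsecond difference a - b, like cfs_timeval_sub(a, b, NULL). */
static long timeval_sub_usec(const struct timeval *a, const struct timeval *b)
{
	return (a->tv_sec - b->tv_sec) * 1000000L +
	       (a->tv_usec - b->tv_usec);
}

int main(void)
{
	struct timeval arrival, now;

	gettimeofday(&arrival, NULL);	/* stands in for rq_arrival_time */
	/* ... the request waits in the queue ... */
	gettimeofday(&now, NULL);

	printf("queued for %ld us\n", timeval_sub_usec(&now, &arrival));
	return 0;
}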