LU-4499 nrs: adjust the order of REQ NRS initialization
[fs/lustre-release.git] lustre/ptlrpc/service.c
index 706b042..ba6272a 100644
@@ -27,7 +27,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -35,6 +35,7 @@
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
+#include <linux/kthread.h>
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_net.h>
@@ -69,7 +70,7 @@ struct list_head ptlrpc_all_services;
 /** Used to protect the \e ptlrpc_all_services list */
 struct mutex ptlrpc_all_services_mutex;
 
-struct ptlrpc_request_buffer_desc *
+static struct ptlrpc_request_buffer_desc *
 ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
 {
        struct ptlrpc_service             *svc = svcpt->scp_service;
@@ -99,7 +100,7 @@ ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
        return rqbd;
 }
 
-void
+static void
 ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
 {
        struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
@@ -116,7 +117,7 @@ ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
        OBD_FREE_PTR(rqbd);
 }
 
-int
+static int
 ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
 {
        struct ptlrpc_service             *svc = svcpt->scp_service;
@@ -431,7 +432,6 @@ void ptlrpc_commit_replies(struct obd_export *exp)
        rs_batch_fini(&batch);
        EXIT;
 }
-EXPORT_SYMBOL(ptlrpc_commit_replies);
 
 static int
 ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
@@ -554,7 +554,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
 
                /*
                 * User wants to increase number of threads with for
-                * each CPU core/HT, most likely the factor is larger then
+                * each CPU core/HT, most likely the factor is larger than
                 * one thread/core because service threads are supposed to
                 * be blocked by lock or wait for IO.
                 */
@@ -1008,7 +1008,7 @@ static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
 }
 
 /**
- * to finish a active request: stop sending more early replies, and release
+ * to finish an active request: stop sending more early replies, and release
  * the request. should be called after we finished handling the request.
  */
 static void ptlrpc_server_finish_active_request(
@@ -1110,7 +1110,6 @@ void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 
         EXIT;
 }
-EXPORT_SYMBOL(ptlrpc_update_export_timer);
 
 /**
  * Sanity check request \a req.
@@ -1261,11 +1260,18 @@ ptlrpc_at_remove_timed(struct ptlrpc_request *req)
 static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
 {
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
-        struct ptlrpc_request *reqcopy;
-        struct lustre_msg *reqmsg;
-        cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
-        int rc;
-        ENTRY;
+       struct ptlrpc_request *reqcopy;
+       struct lustre_msg *reqmsg;
+       cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
+       time_t  newdl;
+       int rc;
+
+       ENTRY;
+
+       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT)) {
+               /* don't send early reply */
+               RETURN(1);
+       }
 
         /* deadline is when the client expects us to reply, margin is the
            difference between clients' and servers' expectations */
@@ -1302,10 +1308,11 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
                 * during the recovery period send at least 4 early replies,
                 * spacing them every at_extra if we can. at_estimate should
                 * always equal this fixed value during recovery. */
-               at_measured(&svcpt->scp_at_estimate,
-                           cfs_time_current_sec() -
-                           req->rq_arrival_time.tv_sec + min(at_extra,
-                           req->rq_export->exp_obd->obd_recovery_timeout / 4));
+               /* Don't account request processing time into AT history
+                * during recovery, it is not service time we need but
+                * includes also waiting time for recovering clients */
+               newdl = cfs_time_current_sec() + min(at_extra,
+                       req->rq_export->exp_obd->obd_recovery_timeout / 4);
        } else {
                /* We want to extend the request deadline by at_extra seconds,
                 * so we set our service estimate to reflect how much time has
@@ -1317,17 +1324,16 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
                at_measured(&svcpt->scp_at_estimate, at_extra +
                            cfs_time_current_sec() -
                            req->rq_arrival_time.tv_sec);
-
+               newdl = req->rq_arrival_time.tv_sec +
+                       at_get(&svcpt->scp_at_estimate);
        }
+
        /* Check to see if we've actually increased the deadline -
         * we may be past adaptive_max */
-       if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
-           at_get(&svcpt->scp_at_estimate)) {
+       if (req->rq_deadline >= newdl) {
                DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
                          "(%ld/%ld), not sending early reply\n",
-                         olddl, req->rq_arrival_time.tv_sec +
-                         at_get(&svcpt->scp_at_estimate) -
-                         cfs_time_current_sec());
+                         olddl, newdl - cfs_time_current_sec());
                RETURN(-ETIMEDOUT);
        }
 
@@ -1349,6 +1355,14 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         reqcopy->rq_reqmsg = reqmsg;
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
 
+       /*
+        * tgt_brw_read() and tgt_brw_write() may have decided not to reply.
+        * Without this check, we would fail the rq_no_reply assertion in
+        * ptlrpc_send_reply().
+        */
+       if (reqcopy->rq_no_reply)
+               GOTO(out, rc = -ETIMEDOUT);
+
        LASSERT(atomic_read(&req->rq_refcount));
        /** if it is last refcount then early reply isn't needed */
        if (atomic_read(&req->rq_refcount) == 1) {
@@ -1377,8 +1391,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
 
        if (!rc) {
                /* Adjust our own deadline to what we told the client */
-               req->rq_deadline = req->rq_arrival_time.tv_sec +
-                                  at_get(&svcpt->scp_at_estimate);
+               req->rq_deadline = newdl;
                req->rq_early_count++; /* number sent, server side */
        } else {
                DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
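Editor's note: for readers tracing the arithmetic in the early-reply hunks above, the following is a minimal, self-contained sketch of the new deadline selection. The struct and function names are hypothetical stand-ins for the Lustre fields and helpers involved (rq_arrival_time, rq_deadline, scp_at_estimate, at_extra, obd_recovery_timeout), not the real API.

#include <time.h>

/* Hypothetical stand-ins for the relevant per-request state. */
struct early_reply_state {
	time_t arrival;          /* req->rq_arrival_time.tv_sec */
	time_t deadline;         /* current req->rq_deadline */
	time_t at_estimate;      /* at_get(&svcpt->scp_at_estimate) */
	time_t at_extra;         /* extra slack granted per early reply */
	time_t recovery_timeout; /* exp_obd->obd_recovery_timeout */
	int    in_recovery;      /* export still being recovered? */
};

/* Returns the proposed new deadline, or 0 when no time could be added,
 * mirroring the patch's "not sending early reply" / -ETIMEDOUT path. */
static time_t pick_new_deadline(const struct early_reply_state *s, time_t now)
{
	time_t newdl;

	if (s->in_recovery) {
		/* During recovery, extend from "now" and keep the clients'
		 * recovery wait time out of the AT history; capping the step
		 * at recovery_timeout / 4 leaves room for at least four
		 * early replies over the recovery window. */
		time_t step = s->recovery_timeout / 4;

		if (s->at_extra < step)
			step = s->at_extra;
		newdl = now + step;
	} else {
		/* Normal path: the AT estimate was just bumped by at_extra,
		 * so the deadline simply tracks arrival + estimate. */
		newdl = s->arrival + s->at_estimate;
	}

	return newdl > s->deadline ? newdl : 0;
}

On a non-zero result the caller would install the value as the request deadline and count the early reply, which is what the hunk above does with req->rq_deadline = newdl.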
@@ -1545,67 +1558,37 @@ found:
 }
 
 /**
- * Put the request to the export list if the request may become
- * a high priority one.
+ * Check if a request should be assigned with a high priority.
+ *
+ * \retval     < 0: error occurred
+ *               0: normal RPC request
+ *              +1: high priority request
  */
 static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
                                    struct ptlrpc_request *req)
 {
-       struct list_head        *list;
-       int              rc, hp = 0;
-
+       int rc;
        ENTRY;
 
-       if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
+       if (svcpt->scp_service->srv_ops.so_hpreq_handler != NULL) {
                rc = svcpt->scp_service->srv_ops.so_hpreq_handler(req);
                if (rc < 0)
                        RETURN(rc);
+
                LASSERT(rc == 0);
        }
-       if (req->rq_export) {
-               if (req->rq_ops) {
-                       /* Perform request specific check. We should do this
-                        * check before the request is added into exp_hp_rpcs
-                        * list otherwise it may hit swab race at LU-1044. */
-                       if (req->rq_ops->hpreq_check) {
-                               rc = req->rq_ops->hpreq_check(req);
-                               /**
-                                * XXX: Out of all current
-                                * ptlrpc_hpreq_ops::hpreq_check(), only
-                                * ldlm_cancel_hpreq_check() can return an
-                                * error code; other functions assert in
-                                * similar places, which seems odd.
-                                * What also does not seem right is that
-                                * handlers for those RPCs do not assert
-                                * on the same checks, but rather handle the
-                                * error cases. e.g. see ost_rw_hpreq_check(),
-                                * and ost_brw_read(), ost_brw_write().
-                                */
-                               if (rc < 0)
-                                       RETURN(rc);
-                               LASSERT(rc == 0 || rc == 1);
-                               hp = rc;
-                       }
-                       list = &req->rq_export->exp_hp_rpcs;
-               } else {
-                       list = &req->rq_export->exp_reg_rpcs;
-               }
 
-               /* do search for duplicated xid and the adding to the list
-                * atomically */
-               spin_lock_bh(&req->rq_export->exp_rpc_lock);
-               rc = ptlrpc_server_check_resend_in_progress(req);
-               if (rc < 0) {
-                       spin_unlock_bh(&req->rq_export->exp_rpc_lock);
-                       RETURN(rc);
+       if (req->rq_export != NULL && req->rq_ops != NULL) {
+               /* Perform request specific check. We should do this
+                * check before the request is added into exp_hp_rpcs
+                * list otherwise it may hit swab race at LU-1044. */
+               if (req->rq_ops->hpreq_check != NULL) {
+                       rc = req->rq_ops->hpreq_check(req);
+                       LASSERT(rc <= 1); /* can only return error, 0, or 1 */
                }
-               list_add(&req->rq_exp_list, list);
-               spin_unlock_bh(&req->rq_export->exp_rpc_lock);
        }
 
-       ptlrpc_nrs_req_initialize(svcpt, req, !!hp);
-
-       RETURN(hp);
+       RETURN(rc);
 }
 
 /** Remove the request from the export list. */
@@ -1652,13 +1635,38 @@ EXPORT_SYMBOL(ptlrpc_hpreq_handler);
 static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                                     struct ptlrpc_request *req)
 {
-       int     rc;
+       int rc;
+       bool hp;
        ENTRY;
 
        rc = ptlrpc_server_hpreq_init(svcpt, req);
        if (rc < 0)
                RETURN(rc);
 
+       hp = rc > 0;
+       ptlrpc_nrs_req_initialize(svcpt, req, hp);
+
+       if (req->rq_export != NULL) {
+               struct obd_export *exp = req->rq_export;
+
+               /* do search for duplicated xid and the adding to the list
+                * atomically */
+               spin_lock_bh(&exp->exp_rpc_lock);
+               rc = ptlrpc_server_check_resend_in_progress(req);
+               if (rc < 0) {
+                       spin_unlock_bh(&exp->exp_rpc_lock);
+
+                       ptlrpc_nrs_req_finalize(req);
+                       RETURN(rc);
+               }
+
+               if (hp || req->rq_ops != NULL)
+                       list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
+               else
+                       list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+               spin_unlock_bh(&exp->exp_rpc_lock);
+       }
+
        /* the current thread is not the processing thread for this request
         * since that, but request is in exp_hp_list and can be find there.
         * Remove all relations between request and old thread. */
@@ -1666,7 +1674,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
        req->rq_svc_thread = NULL;
        req->rq_session.lc_thread = NULL;
 
-       ptlrpc_nrs_req_add(svcpt, req, !!rc);
+       ptlrpc_nrs_req_add(svcpt, req, hp);
 
        RETURN(0);
 }
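Editor's note: since this reordering is the point of LU-4499, here is a condensed outline of ptlrpc_server_request_add() as it reads after the patch. It is a sketch for orientation, not the literal function (the thread-relation resets and ENTRY/RETURN macros are omitted), and it uses only helpers that appear in the hunks above.

static int request_add_outline(struct ptlrpc_service_part *svcpt,
			       struct ptlrpc_request *req)
{
	bool hp;
	int rc;

	/* 1. Classify only: < 0 error, 0 normal, +1 high priority. */
	rc = ptlrpc_server_hpreq_init(svcpt, req);
	if (rc < 0)
		return rc;
	hp = rc > 0;

	/* 2. Set up NRS resources before the request becomes visible on
	 *    the export's RPC lists. */
	ptlrpc_nrs_req_initialize(svcpt, req, hp);

	/* 3. Resend check and list insertion atomically under
	 *    exp_rpc_lock; on a duplicate xid the NRS state from step 2
	 *    is torn down again. */
	if (req->rq_export != NULL) {
		struct obd_export *exp = req->rq_export;

		spin_lock_bh(&exp->exp_rpc_lock);
		rc = ptlrpc_server_check_resend_in_progress(req);
		if (rc < 0) {
			spin_unlock_bh(&exp->exp_rpc_lock);
			ptlrpc_nrs_req_finalize(req);
			return rc;
		}
		if (hp || req->rq_ops != NULL)
			list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
		else
			list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
		spin_unlock_bh(&exp->exp_rpc_lock);
	}

	/* 4. Finally hand the request to the NRS policy queues. */
	ptlrpc_nrs_req_add(svcpt, req, hp);
	return 0;
}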
@@ -1899,17 +1907,18 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                 goto err_req;
         }
 
-        switch(lustre_msg_get_opc(req->rq_reqmsg)) {
-        case MDS_WRITEPAGE:
-        case OST_WRITE:
-                req->rq_bulk_write = 1;
-                break;
-        case MDS_READPAGE:
-        case OST_READ:
-        case MGS_CONFIG_READ:
-                req->rq_bulk_read = 1;
-                break;
-        }
+       switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+       case MDS_WRITEPAGE:
+       case OST_WRITE:
+       case OUT_UPDATE:
+               req->rq_bulk_write = 1;
+               break;
+       case MDS_READPAGE:
+       case OST_READ:
+       case MGS_CONFIG_READ:
+               req->rq_bulk_read = 1;
+               break;
+       }
 
         CDEBUG(D_RPCTRACE, "got req x"LPU64"\n", req->rq_xid);
 
@@ -1940,6 +1949,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                     MSGHDR_AT_SUPPORT) ?
                    /* The max time the client expects us to take */
                    lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
+
         req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
         if (unlikely(deadline == 0)) {
                 DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
@@ -2738,7 +2748,6 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 
        EXIT;
 }
-EXPORT_SYMBOL(ptlrpc_stop_all_threads);
 
 int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
@@ -2770,7 +2779,6 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
        ptlrpc_stop_all_threads(svc);
        RETURN(rc);
 }
-EXPORT_SYMBOL(ptlrpc_start_threads);
 
 int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
@@ -2885,6 +2893,7 @@ int ptlrpc_hr_init(void)
        int                             rc;
        int                             i;
        int                             j;
+       int                             weight;
        ENTRY;
 
        memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
@@ -2897,6 +2906,8 @@ int ptlrpc_hr_init(void)
 
        init_waitqueue_head(&ptlrpc_hr.hr_waitq);
 
+       weight = cfs_cpu_ht_nsiblings(0);
+
        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
                hrp->hrp_cpt = i;
 
@@ -2904,7 +2915,7 @@ int ptlrpc_hr_init(void)
                atomic_set(&hrp->hrp_nstopped, 0);
 
                hrp->hrp_nthrs = cfs_cpt_weight(ptlrpc_hr.hr_cpt_table, i);
-               hrp->hrp_nthrs /= cfs_cpu_ht_nsiblings(0);
+               hrp->hrp_nthrs /= weight;
 
                LASSERT(hrp->hrp_nthrs > 0);
                OBD_CPT_ALLOC(hrp->hrp_thrs, ptlrpc_hr.hr_cpt_table, i,
@@ -3180,7 +3191,7 @@ EXPORT_SYMBOL(ptlrpc_unregister_service);
  * Right now, it just checks to make sure that requests aren't languishing
  * in the queue.  We'll use this health check to govern whether a node needs
  * to be shot, so it's intentionally non-aggressive. */
-int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
+static int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
 {
        struct ptlrpc_request           *request = NULL;
        struct timeval                  right_now;