LU-5710 all: second batch of corrected typos and grammar errors
[fs/lustre-release.git] lustre/ptlrpc/service.c
index 069e05b..8609e3f 100644
@@ -27,7 +27,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -35,9 +35,6 @@
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
-#ifndef __KERNEL__
-#include <liblustre.h>
-#endif
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_net.h>
@@ -72,7 +69,7 @@ struct list_head ptlrpc_all_services;
 /** Used to protect the \e ptlrpc_all_services list */
 struct mutex ptlrpc_all_services_mutex;
 
-struct ptlrpc_request_buffer_desc *
+static struct ptlrpc_request_buffer_desc *
 ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
 {
        struct ptlrpc_service             *svc = svcpt->scp_service;
@@ -102,7 +99,7 @@ ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
        return rqbd;
 }
 
-void
+static void
 ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
 {
        struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
@@ -119,7 +116,7 @@ ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
        OBD_FREE_PTR(rqbd);
 }
 
-int
+static int
 ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
 {
        struct ptlrpc_service             *svc = svcpt->scp_service;
@@ -204,7 +201,6 @@ ptlrpc_save_lock(struct ptlrpc_request *req,
 }
 EXPORT_SYMBOL(ptlrpc_save_lock);
 
-#ifdef __KERNEL__
 
 struct ptlrpc_hr_partition;
 
@@ -365,14 +361,6 @@ static void rs_batch_fini(struct rs_batch *b)
 
 #define DECLARE_RS_BATCH(b)     struct rs_batch b
 
-#else /* __KERNEL__ */
-
-#define rs_batch_init(b)        do{}while(0)
-#define rs_batch_fini(b)        do{}while(0)
-#define rs_batch_add(b, r)      ptlrpc_schedule_difficult_reply(r)
-#define DECLARE_RS_BATCH(b)
-
-#endif /* __KERNEL__ */
 
 /**
  * Put reply state into a queue for processing because we received
@@ -380,7 +368,6 @@ static void rs_batch_fini(struct rs_batch *b)
  */
 void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
 {
-#ifdef __KERNEL__
        struct ptlrpc_hr_thread *hrt;
        ENTRY;
 
@@ -394,9 +381,6 @@ void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
 
        wake_up(&hrt->hrt_waitq);
        EXIT;
-#else
-       list_add_tail(&rs->rs_list, &rs->rs_svcpt->scp_rep_queue);
-#endif
 }
 
 void
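A minimal, generic sketch of the hand-off pattern the hunk above relies on: the producer queues work under a spinlock and wakes the handler thread's waitqueue. It is not part of the patch, and all names (handler_ctx, work_item, ...) are hypothetical stand-ins, not Lustre symbols.

/*
 * Sketch only: producer/consumer hand-off via a spinlock-protected list
 * and a waitqueue, assuming a single consumer thread.
 */
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

struct handler_ctx {
        spinlock_t              lock;
        struct list_head        queue;
        wait_queue_head_t       waitq;
};

struct work_item {
        struct list_head        link;
};

static void handler_ctx_init(struct handler_ctx *ctx)
{
        spin_lock_init(&ctx->lock);
        INIT_LIST_HEAD(&ctx->queue);
        init_waitqueue_head(&ctx->waitq);
}

/* Producer side: queue the item, then kick the sleeping handler thread,
 * roughly what ptlrpc_dispatch_difficult_reply() does with an hrt. */
static void dispatch_work(struct handler_ctx *ctx, struct work_item *item)
{
        spin_lock(&ctx->lock);
        list_add_tail(&item->link, &ctx->queue);
        spin_unlock(&ctx->lock);

        wake_up(&ctx->waitq);
}

/* Consumer side: block until work arrives, then dequeue one item. */
static struct work_item *wait_for_work(struct handler_ctx *ctx)
{
        struct work_item *item;

        wait_event(ctx->waitq, !list_empty(&ctx->queue));

        spin_lock(&ctx->lock);
        item = list_first_entry(&ctx->queue, struct work_item, link);
        list_del_init(&item->link);
        spin_unlock(&ctx->lock);

        return item;
}
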
@@ -447,7 +431,6 @@ void ptlrpc_commit_replies(struct obd_export *exp)
        rs_batch_fini(&batch);
        EXIT;
 }
-EXPORT_SYMBOL(ptlrpc_commit_replies);
 
 static int
 ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
@@ -511,7 +494,6 @@ static void
 ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
                             struct ptlrpc_service_conf *conf)
 {
-#ifdef __KERNEL__
        struct ptlrpc_service_thr_conf  *tc = &conf->psc_thr;
        unsigned                        init;
        unsigned                        total;
@@ -608,7 +590,6 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
                       svc->srv_name, nthrs * svc->srv_ncpts,
                       tc->tc_nthrs_max);
        }
-#endif
 }
 
 /**
@@ -642,9 +623,6 @@ ptlrpc_service_part_init(struct ptlrpc_service *svc,
        /* reply states */
        spin_lock_init(&svcpt->scp_rep_lock);
        INIT_LIST_HEAD(&svcpt->scp_rep_active);
-#ifndef __KERNEL__
-       INIT_LIST_HEAD(&svcpt->scp_rep_queue);
-#endif
        INIT_LIST_HEAD(&svcpt->scp_rep_idle);
        init_waitqueue_head(&svcpt->scp_rep_waitq);
        atomic_set(&svcpt->scp_nreps_difficult, 0);
@@ -837,14 +815,12 @@ ptlrpc_register_service(struct ptlrpc_service_conf *conf,
        CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
               service->srv_name, service->srv_req_portal);
 
-#ifdef __KERNEL__
        rc = ptlrpc_start_threads(service);
        if (rc != 0) {
                CERROR("Failed to start threads for service %s: %d\n",
                       service->srv_name, rc);
                GOTO(failed, rc);
        }
-#endif
 
        RETURN(service);
 failed:
@@ -1031,7 +1007,7 @@ static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
 }
 
 /**
- * to finish a active request: stop sending more early replies, and release
+ * to finish an active request: stop sending more early replies, and release
  * the request. should be called after we finished handling the request.
  */
 static void ptlrpc_server_finish_active_request(
@@ -1058,7 +1034,7 @@ static void ptlrpc_server_finish_active_request(
  * This function is only called when some export receives a message (i.e.,
  * the network is up.)
  */
-static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
+void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 {
         struct obd_export *oldest_exp;
         time_t oldest_time, new_time;
@@ -1283,11 +1259,18 @@ ptlrpc_at_remove_timed(struct ptlrpc_request *req)
 static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
 {
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
-        struct ptlrpc_request *reqcopy;
-        struct lustre_msg *reqmsg;
-        cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
-        int rc;
-        ENTRY;
+       struct ptlrpc_request *reqcopy;
+       struct lustre_msg *reqmsg;
+       cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
+       time_t  newdl;
+       int rc;
+
+       ENTRY;
+
+       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT)) {
+               /* don't send early reply */
+               RETURN(1);
+       }
 
         /* deadline is when the client expects us to reply, margin is the
            difference between clients' and servers' expectations */
@@ -1315,17 +1298,20 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
                 RETURN(-ENOSYS);
         }
 
-        if (req->rq_export &&
-            lustre_msg_get_flags(req->rq_reqmsg) &
-            (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
-                /* During recovery, we don't want to send too many early
-                 * replies, but on the other hand we want to make sure the
-                 * client has enough time to resend if the rpc is lost. So
-                 * during the recovery period send at least 4 early replies,
-                 * spacing them every at_extra if we can. at_estimate should
-                 * always equal this fixed value during recovery. */
-               at_measured(&svcpt->scp_at_estimate, min(at_extra,
-                           req->rq_export->exp_obd->obd_recovery_timeout / 4));
+       if (req->rq_export &&
+           lustre_msg_get_flags(req->rq_reqmsg) &
+           (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
+               /* During recovery, we don't want to send too many early
+                * replies, but on the other hand we want to make sure the
+                * client has enough time to resend if the rpc is lost. So
+                * during the recovery period send at least 4 early replies,
+                * spacing them every at_extra if we can. at_estimate should
+                * always equal this fixed value during recovery. */
+               /* Don't account request processing time into AT history
+                * during recovery, it is not service time we need but
+                * includes also waiting time for recovering clients */
+               newdl = cfs_time_current_sec() + min(at_extra,
+                       req->rq_export->exp_obd->obd_recovery_timeout / 4);
        } else {
                /* We want to extend the request deadline by at_extra seconds,
                 * so we set our service estimate to reflect how much time has
@@ -1337,18 +1323,17 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
                at_measured(&svcpt->scp_at_estimate, at_extra +
                            cfs_time_current_sec() -
                            req->rq_arrival_time.tv_sec);
+               newdl = req->rq_arrival_time.tv_sec +
+                       at_get(&svcpt->scp_at_estimate);
+       }
 
-               /* Check to see if we've actually increased the deadline -
-                * we may be past adaptive_max */
-               if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
-                   at_get(&svcpt->scp_at_estimate)) {
-                       DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
-                                 "(%ld/%ld), not sending early reply\n",
-                                 olddl, req->rq_arrival_time.tv_sec +
-                                 at_get(&svcpt->scp_at_estimate) -
-                                 cfs_time_current_sec());
-                       RETURN(-ETIMEDOUT);
-               }
+       /* Check to see if we've actually increased the deadline -
+        * we may be past adaptive_max */
+       if (req->rq_deadline >= newdl) {
+               DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
+                         "(%ld/%ld), not sending early reply\n",
+                         olddl, newdl - cfs_time_current_sec());
+               RETURN(-ETIMEDOUT);
        }
 
        reqcopy = ptlrpc_request_cache_alloc(GFP_NOFS);
@@ -1369,6 +1354,14 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         reqcopy->rq_reqmsg = reqmsg;
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
 
+       /*
+        * tgt_brw_read() and tgt_brw_write() may have decided not to reply.
+        * Without this check, we would fail the rq_no_reply assertion in
+        * ptlrpc_send_reply().
+        */
+       if (reqcopy->rq_no_reply)
+               GOTO(out, rc = -ETIMEDOUT);
+
        LASSERT(atomic_read(&req->rq_refcount));
        /** if it is last refcount then early reply isn't needed */
        if (atomic_read(&req->rq_refcount) == 1) {
@@ -1397,8 +1390,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
 
        if (!rc) {
                /* Adjust our own deadline to what we told the client */
-               req->rq_deadline = req->rq_arrival_time.tv_sec +
-                                  at_get(&svcpt->scp_at_estimate);
+               req->rq_deadline = newdl;
                req->rq_early_count++; /* number sent, server side */
        } else {
                DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
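A standalone sketch of the early-reply deadline selection the hunks above introduce: during recovery the deadline is pushed out by a small fixed step without touching the AT history, otherwise the service estimate is grown and the deadline re-derived from the arrival time; if that would not extend the promised deadline, no early reply is sent. This is not the patch's code; the struct, field, and function names are simplified stand-ins, and times are assumed to be plain seconds.

/*
 * Sketch only: simplified model of the newdl computation in
 * ptlrpc_at_send_early_reply() after this change.
 */
#include <time.h>

struct early_reply_in {
        time_t  now;               /* current time (seconds) */
        time_t  arrival;           /* when the request arrived */
        time_t  deadline;          /* deadline previously promised to the client */
        time_t  at_estimate;       /* adaptive-timeout service estimate */
        time_t  at_extra;          /* extra slack granted per early reply */
        time_t  recovery_timeout;
        int     in_recovery;       /* request carries a replay/recovery flag */
};

/* Returns the new deadline, or 0 if no early reply should be sent. */
static time_t pick_early_reply_deadline(struct early_reply_in *in)
{
        time_t newdl;

        if (in->in_recovery) {
                /* Recovery: don't feed waiting time into the AT history;
                 * just push the deadline out by a small fixed step so the
                 * client still gets a few early replies. */
                time_t step = in->recovery_timeout / 4;

                if (in->at_extra < step)
                        step = in->at_extra;
                newdl = in->now + step;
        } else {
                /* Normal case: grow the service estimate to cover the time
                 * already spent plus at_extra, then re-derive the deadline
                 * from the arrival time. */
                time_t spent = in->now - in->arrival;

                if (in->at_estimate < spent + in->at_extra)
                        in->at_estimate = spent + in->at_extra;
                newdl = in->arrival + in->at_estimate;
        }

        /* If this would not actually extend the promised deadline (e.g. we
         * are capped by adaptive_max), sending an early reply is pointless. */
        if (in->deadline >= newdl)
                return 0;

        return newdl;
}
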
@@ -1748,10 +1740,6 @@ static bool ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
                                       bool force)
 {
        int running = svcpt->scp_nthrs_running;
-#ifndef __KERNEL__
-       if (1) /* always allow to handle normal request for liblustre */
-               return true;
-#endif
        if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
                     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
                /* leave just 1 thread for normal RPCs */
@@ -1809,13 +1797,6 @@ ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, bool force)
        ENTRY;
 
        spin_lock(&svcpt->scp_req_lock);
-#ifndef __KERNEL__
-       /* !@%$# liblustre only has 1 thread */
-       if (atomic_read(&svcpt->scp_nreps_difficult) != 0) {
-               spin_unlock(&svcpt->scp_req_lock);
-               RETURN(NULL);
-       }
-#endif
 
        if (ptlrpc_server_high_pending(svcpt, force)) {
                req = ptlrpc_nrs_req_get_nolock(svcpt, true, force);
@@ -1971,6 +1952,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                     MSGHDR_AT_SUPPORT) ?
                    /* The max time the client expects us to take */
                    lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
+
         req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
         if (unlikely(deadline == 0)) {
                 DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
@@ -2269,81 +2251,6 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
        RETURN(1);
 }
 
-#ifndef __KERNEL__
-
-/**
- * Check whether given service has a reply available for processing
- * and process it.
- *
- * \param svc a ptlrpc service
- * \retval 0 no replies processed
- * \retval 1 one reply processed
- */
-static int
-ptlrpc_server_handle_reply(struct ptlrpc_service_part *svcpt)
-{
-       struct ptlrpc_reply_state *rs = NULL;
-       ENTRY;
-
-       spin_lock(&svcpt->scp_rep_lock);
-       if (!list_empty(&svcpt->scp_rep_queue)) {
-               rs = list_entry(svcpt->scp_rep_queue.prev,
-                                   struct ptlrpc_reply_state,
-                                   rs_list);
-               list_del_init(&rs->rs_list);
-       }
-       spin_unlock(&svcpt->scp_rep_lock);
-       if (rs != NULL)
-               ptlrpc_handle_rs(rs);
-       RETURN(rs != NULL);
-}
-
-/* FIXME make use of timeout later */
-int
-liblustre_check_services (void *arg)
-{
-       int  did_something = 0;
-       int  rc;
-       struct list_head *tmp, *nxt;
-       ENTRY;
-
-       /* I'm relying on being single threaded, not to have to lock
-        * ptlrpc_all_services etc */
-       list_for_each_safe(tmp, nxt, &ptlrpc_all_services) {
-               struct ptlrpc_service *svc =
-                       list_entry(tmp, struct ptlrpc_service, srv_list);
-               struct ptlrpc_service_part *svcpt;
-
-               LASSERT(svc->srv_ncpts == 1);
-               svcpt = svc->srv_parts[0];
-
-               if (svcpt->scp_nthrs_running != 0)     /* I've recursed */
-                       continue;
-
-               /* service threads can block for bulk, so this limits us
-                * (arbitrarily) to recursing 1 stack frame per service.
-                * Note that the problem with recursion is that we have to
-                * unwind completely before our caller can resume. */
-
-               svcpt->scp_nthrs_running++;
-
-               do {
-                       rc = ptlrpc_server_handle_req_in(svcpt, NULL);
-                       rc |= ptlrpc_server_handle_reply(svcpt);
-                       rc |= ptlrpc_at_check_timed(svcpt);
-                       rc |= ptlrpc_server_handle_request(svcpt, NULL);
-                       rc |= (ptlrpc_server_post_idle_rqbds(svcpt) > 0);
-                       did_something |= rc;
-               } while (rc);
-
-               svcpt->scp_nthrs_running--;
-       }
-
-       RETURN(did_something);
-}
-#define ptlrpc_stop_all_threads(s) do {} while (0)
-
-#else /* __KERNEL__ */
 
 static void
 ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt)
@@ -2478,9 +2385,7 @@ static int ptlrpc_main(void *arg)
        struct ptlrpc_service_part      *svcpt = thread->t_svcpt;
        struct ptlrpc_service           *svc = svcpt->scp_service;
        struct ptlrpc_reply_state       *rs;
-#ifdef WITH_GROUP_INFO
        struct group_info *ginfo = NULL;
-#endif
        struct lu_env *env;
        int counter = 0, rc = 0;
        ENTRY;
@@ -2497,7 +2402,6 @@ static int ptlrpc_main(void *arg)
                      svc->srv_name, thread->t_name, svcpt->scp_cpt);
        }
 
-#ifdef WITH_GROUP_INFO
        ginfo = groups_alloc(0);
        if (!ginfo) {
                rc = -ENOMEM;
@@ -2506,7 +2410,6 @@ static int ptlrpc_main(void *arg)
 
        set_current_groups(ginfo);
        put_group_info(ginfo);
-#endif
 
        if (svc->srv_ops.so_thr_init != NULL) {
                rc = svc->srv_ops.so_thr_init(thread);
@@ -2848,7 +2751,6 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 
        EXIT;
 }
-EXPORT_SYMBOL(ptlrpc_stop_all_threads);
 
 int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
@@ -2880,7 +2782,6 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
        ptlrpc_stop_all_threads(svc);
        RETURN(rc);
 }
-EXPORT_SYMBOL(ptlrpc_start_threads);
 
 int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
@@ -2995,6 +2896,7 @@ int ptlrpc_hr_init(void)
        int                             rc;
        int                             i;
        int                             j;
+       int                             weight;
        ENTRY;
 
        memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
@@ -3007,6 +2909,8 @@ int ptlrpc_hr_init(void)
 
        init_waitqueue_head(&ptlrpc_hr.hr_waitq);
 
+       weight = cfs_cpu_ht_nsiblings(0);
+
        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
                hrp->hrp_cpt = i;
 
@@ -3014,7 +2918,7 @@ int ptlrpc_hr_init(void)
                atomic_set(&hrp->hrp_nstopped, 0);
 
                hrp->hrp_nthrs = cfs_cpt_weight(ptlrpc_hr.hr_cpt_table, i);
-               hrp->hrp_nthrs /= cfs_cpu_ht_nsiblings(0);
+               hrp->hrp_nthrs /= weight;
 
                LASSERT(hrp->hrp_nthrs > 0);
                OBD_CPT_ALLOC(hrp->hrp_thrs, ptlrpc_hr.hr_cpt_table, i,
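A small sketch of the thread-count computation the two hunks above adjust: each CPT partition gets one reply-handling thread per core (CPU weight divided by hyper-thread siblings), and the sibling count is now read once before the loop rather than once per partition. Not part of the patch; cpt_weight() and ht_siblings() are passed in as hypothetical stand-ins for the libcfs helpers.

/*
 * Sketch only: per-partition thread counts with the divisor hoisted
 * out of the loop.
 */
static void set_partition_nthreads(int *nthrs, int nparts,
                                   int (*cpt_weight)(int),
                                   int (*ht_siblings)(void))
{
        int weight = ht_siblings();     /* hoisted: same for every partition */
        int i;

        for (i = 0; i < nparts; i++) {
                nthrs[i] = cpt_weight(i) / weight;
                if (nthrs[i] < 1)       /* the real code asserts > 0 instead */
                        nthrs[i] = 1;
        }
}
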
@@ -3061,7 +2965,6 @@ void ptlrpc_hr_fini(void)
        ptlrpc_hr.hr_partitions = NULL;
 }
 
-#endif /* __KERNEL__ */
 
 /**
  * Wait until all already scheduled replies are processed.
@@ -3291,7 +3194,7 @@ EXPORT_SYMBOL(ptlrpc_unregister_service);
  * Right now, it just checks to make sure that requests aren't languishing
  * in the queue.  We'll use this health check to govern whether a node needs
  * to be shot, so it's intentionally non-aggressive. */
-int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
+static int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
 {
        struct ptlrpc_request           *request = NULL;
        struct timeval                  right_now;