Whamcloud - gitweb
LU-9679 ptlrpc: use OBD_ALLOC_PTR_ARRAY() and FREE
[fs/lustre-release.git] / lustre / ptlrpc / ptlrpcd.c
index 8a05a46..b3a06a5 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -82,7 +82,8 @@ struct ptlrpcd {
  */
 static int max_ptlrpcds;
 module_param(max_ptlrpcds, int, 0644);
-MODULE_PARM_DESC(max_ptlrpcds, "Max ptlrpcd thread count to be started.");
+MODULE_PARM_DESC(max_ptlrpcds,
+                "Max ptlrpcd thread count to be started (obsolete).");
 
 /*
  * ptlrpcd_bind_policy is obsolete, but retained to ensure that
@@ -100,8 +101,9 @@ MODULE_PARM_DESC(ptlrpcd_bind_policy,
  * in a CPT.
  */
 static int ptlrpcd_per_cpt_max;
+module_param(ptlrpcd_per_cpt_max, int, 0644);
 MODULE_PARM_DESC(ptlrpcd_per_cpt_max,
-                "Max ptlrpcd thread count to be started per cpt.");
+                "Max ptlrpcd thread count to be started per CPT.");
 
 /*
  * ptlrpcd_partner_group_size: The desired number of threads in each
@@ -173,7 +175,7 @@ ptlrpcd_select_pc(struct ptlrpc_request *req)
        if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL)
                return &ptlrpcd_rcv;
 
-       cpt = cfs_cpt_current(cfs_cpt_table, 1);
+       cpt = cfs_cpt_current(cfs_cpt_tab, 1);
        if (ptlrpcds_cpt_idx == NULL)
                idx = cpt;
        else
@@ -210,7 +212,7 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
 
                LASSERT(req->rq_phase == RQ_PHASE_NEW);
                req->rq_set = new;
-               req->rq_queued_time = cfs_time_current();
+               req->rq_queued_time = ktime_get_seconds();
        }
 
        spin_lock(&new->set_new_req_lock);
@@ -222,9 +224,11 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
        if (count == i) {
                wake_up(&new->set_waitq);
 
-               /* XXX: It maybe unnecessary to wakeup all the partners. But to
+               /*
+                * XXX: It maybe unnecessary to wakeup all the partners. But to
                 *      guarantee the async RPC can be processed ASAP, we have
-                *      no other better choice. It maybe fixed in future. */
+                *      no other better choice. It maybe fixed in future.
+                */
                for (i = 0; i < pc->pc_npartners; i++)
                        wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
        }
@@ -234,7 +238,7 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
  * Return transferred RPCs count.
  */
 static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
-                               struct ptlrpc_request_set *src)
+                              struct ptlrpc_request_set *src)
 {
        struct list_head *tmp, *pos;
        struct ptlrpc_request *req;
@@ -270,15 +274,18 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)
 
        spin_lock(&req->rq_lock);
        if (req->rq_invalid_rqset) {
-               struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
-                                                    back_to_sleep, NULL);
-
                req->rq_invalid_rqset = 0;
                spin_unlock(&req->rq_lock);
-               l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
+               if (wait_event_idle_timeout(req->rq_set_waitq,
+                                           req->rq_set == NULL,
+                                           cfs_time_seconds(5)) == 0)
+                       l_wait_event_abortable(req->rq_set_waitq,
+                                              req->rq_set == NULL);
        } else if (req->rq_set) {
-               /* If we have a vaid "rq_set", just reuse it to avoid double
-                * linked. */
+               /*
+                * If we have a vaid "rq_set", just reuse it to avoid double
+                * linked.
+                */
                LASSERT(req->rq_phase == RQ_PHASE_NEW);
                LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
 
@@ -293,7 +300,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)
 
        pc = ptlrpcd_select_pc(req);
 
-       DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
+       DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s+%d]",
                  req, pc->pc_name, pc->pc_index);
 
        ptlrpc_set_add_new_req(pc, req);
@@ -312,11 +319,12 @@ static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
 static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
 {
        struct list_head *tmp, *pos;
-        struct ptlrpc_request *req;
-        struct ptlrpc_request_set *set = pc->pc_set;
-        int rc = 0;
-        int rc2;
-        ENTRY;
+       struct ptlrpc_request *req;
+       struct ptlrpc_request_set *set = pc->pc_set;
+       int rc = 0;
+       int rc2;
+
+       ENTRY;
 
        if (atomic_read(&set->set_new_count)) {
                spin_lock(&set->set_new_req_lock);
@@ -334,7 +342,8 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
                spin_unlock(&set->set_new_req_lock);
        }
 
-       /* We should call lu_env_refill() before handling new requests to make
+       /*
+        * We should call lu_env_refill() before handling new requests to make
         * sure that env key the requests depending on really exists.
         */
        rc2 = lu_env_refill(env);
@@ -356,8 +365,10 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
        if (atomic_read(&set->set_remaining))
                rc |= ptlrpc_check_set(env, set);
 
-       /* NB: ptlrpc_check_set has already moved complted request at the
-        * head of seq::set_requests */
+       /*
+        * NB: ptlrpc_check_set has already moved complted request at the
+        * head of seq::set_requests
+        */
        list_for_each_safe(pos, tmp, &set->set_requests) {
                req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
                if (req->rq_phase != RQ_PHASE_COMPLETE)
@@ -374,19 +385,21 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
                 */
                rc = atomic_read(&set->set_new_count);
 
-                /* If we have nothing to do, check whether we can take some
-                 * work from our partner threads. */
-                if (rc == 0 && pc->pc_npartners > 0) {
-                        struct ptlrpcd_ctl *partner;
-                        struct ptlrpc_request_set *ps;
-                        int first = pc->pc_cursor;
-
-                        do {
-                                partner = pc->pc_partners[pc->pc_cursor++];
-                                if (pc->pc_cursor >= pc->pc_npartners)
-                                        pc->pc_cursor = 0;
-                                if (partner == NULL)
-                                        continue;
+               /*
+                * If we have nothing to do, check whether we can take some
+                * work from our partner threads.
+                */
+               if (rc == 0 && pc->pc_npartners > 0) {
+                       struct ptlrpcd_ctl *partner;
+                       struct ptlrpc_request_set *ps;
+                       int first = pc->pc_cursor;
+
+                       do {
+                               partner = pc->pc_partners[pc->pc_cursor++];
+                               if (pc->pc_cursor >= pc->pc_npartners)
+                                       pc->pc_cursor = 0;
+                               if (partner == NULL)
+                                       continue;
 
                                spin_lock(&partner->pc_lock);
                                ps = partner->pc_set;
@@ -401,8 +414,8 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
                                if (atomic_read(&ps->set_new_count)) {
                                        rc = ptlrpcd_steal_rqset(set, ps);
                                        if (rc > 0)
-                                               CDEBUG(D_RPCTRACE, "transfer %d"
-                                                      " async RPCs [%d->%d]\n",
+                                               CDEBUG(D_RPCTRACE,
+                                                      "transfer %d async RPCs [%d->%d]\n",
                                                       rc, partner->pc_index,
                                                       pc->pc_index);
                                }
@@ -411,14 +424,13 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
                }
        }
 
-       RETURN(rc);
+       RETURN(rc || test_bit(LIOD_STOP, &pc->pc_flags));
 }
 
 /**
  * Main ptlrpcd thread.
  * ptlrpc's code paths like to execute in process context, so we have this
  * thread which spins on a set which contains the rpcs and sends them.
- *
  */
 static int ptlrpcd(void *arg)
 {
@@ -428,11 +440,12 @@ static int ptlrpcd(void *arg)
        struct lu_env                   env = { .le_ses = &ses };
        int                             rc = 0;
        int                             exit = 0;
+
        ENTRY;
 
        unshare_fs_struct();
 
-       if (cfs_cpt_bind(cfs_cpt_table, pc->pc_cpt) != 0)
+       if (cfs_cpt_bind(cfs_cpt_tab, pc->pc_cpt) != 0)
                CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt);
 
        /*
@@ -466,23 +479,27 @@ static int ptlrpcd(void *arg)
 
        complete(&pc->pc_starting);
 
-        /*
-         * This mainloop strongly resembles ptlrpc_set_wait() except that our
-         * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
-         * there are requests in the set. New requests come in on the set's
-         * new_req_list and ptlrpcd_check() moves them into the set.
-         */
-        do {
-                struct l_wait_info lwi;
-                int timeout;
+       /*
+        * This mainloop strongly resembles ptlrpc_set_wait() except that our
+        * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
+        * there are requests in the set. New requests come in on the set's
+        * new_req_list and ptlrpcd_check() moves them into the set.
+        */
+       do {
+               time64_t timeout;
 
-                timeout = ptlrpc_set_next_timeout(set);
-                lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
-                                  ptlrpc_expired_set, set);
+               timeout = ptlrpc_set_next_timeout(set);
 
                lu_context_enter(&env.le_ctx);
                lu_context_enter(env.le_ses);
-               l_wait_event(set->set_waitq, ptlrpcd_check(&env, pc), &lwi);
+               if (timeout == 0)
+                       wait_event_idle(set->set_waitq,
+                                       ptlrpcd_check(&env, pc));
+               else if (wait_event_idle_timeout(set->set_waitq,
+                                                ptlrpcd_check(&env, pc),
+                                                cfs_time_seconds(timeout))
+                        == 0)
+                       ptlrpc_expired_set(set);
                lu_context_exit(&env.le_ctx);
                lu_context_exit(env.le_ses);
 
@@ -491,21 +508,21 @@ static int ptlrpcd(void *arg)
                 */
                if (test_bit(LIOD_STOP, &pc->pc_flags)) {
                        if (test_bit(LIOD_FORCE, &pc->pc_flags))
-                                ptlrpc_abort_set(set);
-                        exit++;
-                }
-
-                /*
-                 * Let's make one more loop to make sure that ptlrpcd_check()
-                 * copied all raced new rpcs into the set so we can kill them.
-                 */
-        } while (exit < 2);
-
-        /*
-         * Wait for inflight requests to drain.
-         */
+                               ptlrpc_abort_set(set);
+                       exit++;
+               }
+
+               /*
+                * Let's make one more loop to make sure that ptlrpcd_check()
+                * copied all raced new rpcs into the set so we can kill them.
+                */
+       } while (exit < 2);
+
+       /*
+        * Wait for inflight requests to drain.
+        */
        if (!list_empty(&set->set_requests))
-                ptlrpc_set_wait(set);
+               ptlrpc_set_wait(&env, set);
        lu_context_fini(&env.le_ctx);
        lu_context_fini(env.le_ses);
 
@@ -565,6 +582,7 @@ static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
        int                     first;
        int                     i;
        int                     rc = 0;
+
        ENTRY;
 
        LASSERT(index >= 0 && index < pd->pd_nthreads);
@@ -574,7 +592,7 @@ static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
        if (pc->pc_npartners <= 0)
                GOTO(out, rc);
 
-       OBD_CPT_ALLOC(pc->pc_partners, cfs_cpt_table, pc->pc_cpt,
+       OBD_CPT_ALLOC(pc->pc_partners, cfs_cpt_tab, pc->pc_cpt,
                      sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
        if (pc->pc_partners == NULL) {
                pc->pc_npartners = 0;
@@ -595,6 +613,7 @@ int ptlrpcd_start(struct ptlrpcd_ctl *pc)
 {
        struct task_struct      *task;
        int                     rc = 0;
+
        ENTRY;
 
        /*
@@ -606,15 +625,6 @@ int ptlrpcd_start(struct ptlrpcd_ctl *pc)
                RETURN(0);
        }
 
-       /*
-        * So far only "client" ptlrpcd uses an environment. In the future,
-        * ptlrpcd thread (or a thread-set) has to be given an argument,
-        * describing its "scope".
-        */
-       rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
-       if (rc != 0)
-               GOTO(out, rc);
-
        task = kthread_run(ptlrpcd, pc, pc->pc_name);
        if (IS_ERR(task))
                GOTO(out_set, rc = PTR_ERR(task));
@@ -635,8 +645,6 @@ out_set:
                spin_unlock(&pc->pc_lock);
                ptlrpc_set_destroy(set);
        }
-       lu_context_fini(&pc->pc_env.le_ctx);
-out:
        clear_bit(LIOD_START, &pc->pc_flags);
        RETURN(rc);
 }
@@ -662,6 +670,7 @@ out:
 void ptlrpcd_free(struct ptlrpcd_ctl *pc)
 {
        struct ptlrpc_request_set *set = pc->pc_set;
+
        ENTRY;
 
        if (!test_bit(LIOD_START, &pc->pc_flags)) {
@@ -670,7 +679,6 @@ void ptlrpcd_free(struct ptlrpcd_ctl *pc)
        }
 
        wait_for_completion(&pc->pc_finishing);
-       lu_context_fini(&pc->pc_env.le_ctx);
 
        spin_lock(&pc->pc_lock);
        pc->pc_set = NULL;
@@ -682,16 +690,15 @@ void ptlrpcd_free(struct ptlrpcd_ctl *pc)
        clear_bit(LIOD_FORCE, &pc->pc_flags);
 
 out:
-        if (pc->pc_npartners > 0) {
-                LASSERT(pc->pc_partners != NULL);
-
-                OBD_FREE(pc->pc_partners,
-                         sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
-                pc->pc_partners = NULL;
-        }
-        pc->pc_npartners = 0;
+       if (pc->pc_npartners > 0) {
+               LASSERT(pc->pc_partners != NULL);
+
+               OBD_FREE_PTR_ARRAY(pc->pc_partners, pc->pc_npartners);
+               pc->pc_partners = NULL;
+       }
+       pc->pc_npartners = 0;
        pc->pc_error = 0;
-        EXIT;
+       EXIT;
 }
 
 static void ptlrpcd_fini(void)
@@ -699,6 +706,7 @@ static void ptlrpcd_fini(void)
        int     i;
        int     j;
        int     ncpts;
+
        ENTRY;
 
        if (ptlrpcds != NULL) {
@@ -712,7 +720,7 @@ static void ptlrpcd_fini(void)
                        OBD_FREE(ptlrpcds[i], ptlrpcds[i]->pd_size);
                        ptlrpcds[i] = NULL;
                }
-               OBD_FREE(ptlrpcds, sizeof(ptlrpcds[0]) * ptlrpcds_num);
+               OBD_FREE_PTR_ARRAY(ptlrpcds, ptlrpcds_num);
        }
        ptlrpcds_num = 0;
 
@@ -720,8 +728,8 @@ static void ptlrpcd_fini(void)
        ptlrpcd_free(&ptlrpcd_rcv);
 
        if (ptlrpcds_cpt_idx != NULL) {
-               ncpts = cfs_cpt_number(cfs_cpt_table);
-               OBD_FREE(ptlrpcds_cpt_idx, ncpts * sizeof(ptlrpcds_cpt_idx[0]));
+               ncpts = cfs_cpt_number(cfs_cpt_tab);
+               OBD_FREE_PTR_ARRAY(ptlrpcds_cpt_idx, ncpts);
                ptlrpcds_cpt_idx = NULL;
        }
 
@@ -741,15 +749,16 @@ static int ptlrpcd_init(void)
        int                     ncpts;
        int                     cpt;
        struct ptlrpcd          *pd;
+
        ENTRY;
 
        /*
         * Determine the CPTs that ptlrpcd threads will run on.
         */
-       cptable = cfs_cpt_table;
+       cptable = cfs_cpt_tab;
        ncpts = cfs_cpt_number(cptable);
        if (ptlrpcd_cpts != NULL) {
-               struct cfs_expr_list    *el;
+               struct cfs_expr_list *el;
 
                size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
                OBD_ALLOC(ptlrpcds_cpt_idx, size);
@@ -894,6 +903,7 @@ static int ptlrpcd_init(void)
 
                size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
                OBD_CPT_ALLOC(pd, cptable, cpt, size);
+
                if (!pd)
                        GOTO(out, rc = -ENOMEM);
                pd->pd_size      = size;
@@ -950,25 +960,26 @@ out:
 
 int ptlrpcd_addref(void)
 {
-        int rc = 0;
-        ENTRY;
+       int rc = 0;
+
+       ENTRY;
 
        mutex_lock(&ptlrpcd_mutex);
-        if (++ptlrpcd_users == 1) {
+       if (++ptlrpcd_users == 1) {
                rc = ptlrpcd_init();
                if (rc < 0)
                        ptlrpcd_users--;
        }
        mutex_unlock(&ptlrpcd_mutex);
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(ptlrpcd_addref);
 
 void ptlrpcd_decref(void)
 {
        mutex_lock(&ptlrpcd_mutex);
-        if (--ptlrpcd_users == 0)
-                ptlrpcd_fini();
+       if (--ptlrpcd_users == 0)
+               ptlrpcd_fini();
        mutex_unlock(&ptlrpcd_mutex);
 }
 EXPORT_SYMBOL(ptlrpcd_decref);