Whamcloud - gitweb
LU-2624 ptlrpc: improve stop of ptlrpcd threads
[fs/lustre-release.git] / lustre / ptlrpc / ptlrpcd.c
index 861e3cd..669b0d7 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -92,7 +92,7 @@ CFS_MODULE_PARM(ptlrpcd_bind_policy, "i", int, 0644,
 #endif
 static struct ptlrpcd *ptlrpcds;
 
-cfs_mutex_t ptlrpcd_mutex;
+struct mutex ptlrpcd_mutex;
 static int ptlrpcd_users = 0;
 
 void ptlrpcd_wake(struct ptlrpc_request *req)
@@ -183,12 +183,12 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
         }
 
 #ifdef __KERNEL__
-        cfs_spin_lock(&new->set_new_req_lock);
-        cfs_list_splice_init(&set->set_requests, &new->set_new_requests);
-        i = cfs_atomic_read(&set->set_remaining);
-        count = cfs_atomic_add_return(i, &new->set_new_count);
-        cfs_atomic_set(&set->set_remaining, 0);
-        cfs_spin_unlock(&new->set_new_req_lock);
+       spin_lock(&new->set_new_req_lock);
+       cfs_list_splice_init(&set->set_requests, &new->set_new_requests);
+       i = cfs_atomic_read(&set->set_remaining);
+       count = cfs_atomic_add_return(i, &new->set_new_count);
+       cfs_atomic_set(&set->set_remaining, 0);
+       spin_unlock(&new->set_new_req_lock);
         if (count == i) {
                 cfs_waitq_signal(&new->set_waitq);
 
@@ -213,7 +213,7 @@ static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
         struct ptlrpc_request *req;
         int rc = 0;
 
-        cfs_spin_lock(&src->set_new_req_lock);
+       spin_lock(&src->set_new_req_lock);
         if (likely(!cfs_list_empty(&src->set_new_requests))) {
                 cfs_list_for_each_safe(pos, tmp, &src->set_new_requests) {
                         req = cfs_list_entry(pos, struct ptlrpc_request,
@@ -226,8 +226,8 @@ static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
                 cfs_atomic_add(rc, &des->set_remaining);
                 cfs_atomic_set(&src->set_new_count, 0);
         }
-        cfs_spin_unlock(&src->set_new_req_lock);
-        return rc;
+       spin_unlock(&src->set_new_req_lock);
+       return rc;
 }
 #endif
 
@@ -242,13 +242,13 @@ void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
        if (req->rq_reqmsg)
                lustre_msg_set_jobid(req->rq_reqmsg, NULL);
 
-        cfs_spin_lock(&req->rq_lock);
+       spin_lock(&req->rq_lock);
         if (req->rq_invalid_rqset) {
                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
                                                      back_to_sleep, NULL);
 
                 req->rq_invalid_rqset = 0;
-                cfs_spin_unlock(&req->rq_lock);
+               spin_unlock(&req->rq_lock);
                 l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
         } else if (req->rq_set) {
                 /* If we have a valid "rq_set", just reuse it to avoid double
@@ -258,11 +258,11 @@ void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
 
                 /* ptlrpc_check_set will decrease the count */
                 cfs_atomic_inc(&req->rq_set->set_remaining);
-                cfs_spin_unlock(&req->rq_lock);
-                cfs_waitq_signal(&req->rq_set->set_waitq);
-                return;
-        } else {
-                cfs_spin_unlock(&req->rq_lock);
+               spin_unlock(&req->rq_lock);
+               cfs_waitq_signal(&req->rq_set->set_waitq);
+               return;
+       } else {
+               spin_unlock(&req->rq_lock);
         }
 
         pc = ptlrpcd_select_pc(req, policy, idx);
@@ -293,7 +293,7 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
         ENTRY;
 
         if (cfs_atomic_read(&set->set_new_count)) {
-                cfs_spin_lock(&set->set_new_req_lock);
+               spin_lock(&set->set_new_req_lock);
                 if (likely(!cfs_list_empty(&set->set_new_requests))) {
                         cfs_list_splice_init(&set->set_new_requests,
                                              &set->set_requests);
@@ -305,7 +305,7 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
                          */
                         rc = 1;
                 }
-                cfs_spin_unlock(&set->set_new_req_lock);
+               spin_unlock(&set->set_new_req_lock);
         }
 
         /* We should call lu_env_refill() before handling new requests to make
@@ -368,15 +368,15 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
                                 if (partner == NULL)
                                         continue;
 
-                                cfs_spin_lock(&partner->pc_lock);
-                                ps = partner->pc_set;
-                                if (ps == NULL) {
-                                        cfs_spin_unlock(&partner->pc_lock);
-                                        continue;
-                                }
+                               spin_lock(&partner->pc_lock);
+                               ps = partner->pc_set;
+                               if (ps == NULL) {
+                                       spin_unlock(&partner->pc_lock);
+                                       continue;
+                               }
 
-                                ptlrpc_reqset_get(ps);
-                                cfs_spin_unlock(&partner->pc_lock);
+                               ptlrpc_reqset_get(ps);
+                               spin_unlock(&partner->pc_lock);
 
                                 if (cfs_atomic_read(&ps->set_new_count)) {
                                         rc = ptlrpcd_steal_rqset(set, ps);
@@ -410,20 +410,21 @@ static int ptlrpcd(void *arg)
         int rc, exit = 0;
         ENTRY;
 
-        cfs_daemonize_ctxt(pc->pc_name);
-#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
-        if (cfs_test_bit(LIOD_BIND, &pc->pc_flags)) {
-                int index = pc->pc_index;
+       cfs_daemonize_ctxt(pc->pc_name);
+#if defined(CONFIG_SMP) && \
+(defined(HAVE_CPUMASK_OF_NODE) || defined(HAVE_NODE_TO_CPUMASK))
+       if (test_bit(LIOD_BIND, &pc->pc_flags)) {
+               int index = pc->pc_index;
 
                 if (index >= 0 && index < cfs_num_possible_cpus()) {
-                        while (!cpu_online(index)) {
-                                if (++index >= cfs_num_possible_cpus())
-                                        index = 0;
-                        }
-                        cfs_set_cpus_allowed(cfs_current(),
-                                     node_to_cpumask(cpu_to_node(index)));
-                }
-        }
+                       while (!cpu_online(index)) {
+                               if (++index >= cfs_num_possible_cpus())
+                                       index = 0;
+                       }
+                       cfs_set_cpus_allowed(cfs_current(),
+                                    *cpumask_of_node(cpu_to_node(index)));
+               }
+       }
 #endif
         /*
          * XXX So far only "client" ptlrpcd uses an environment. In
@@ -432,7 +433,7 @@ static int ptlrpcd(void *arg)
          */
         rc = lu_context_init(&env.le_ctx,
                              LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
-        cfs_complete(&pc->pc_starting);
+       complete(&pc->pc_starting);
 
         if (rc != 0)
                 RETURN(rc);
@@ -459,8 +460,8 @@ static int ptlrpcd(void *arg)
                 /*
                  * Abort inflight rpcs for forced stop case.
                  */
-                if (cfs_test_bit(LIOD_STOP, &pc->pc_flags)) {
-                        if (cfs_test_bit(LIOD_FORCE, &pc->pc_flags))
+               if (test_bit(LIOD_STOP, &pc->pc_flags)) {
+                       if (test_bit(LIOD_FORCE, &pc->pc_flags))
                                 ptlrpc_abort_set(set);
                         exit++;
                 }
@@ -478,12 +479,7 @@ static int ptlrpcd(void *arg)
                 ptlrpc_set_wait(set);
         lu_context_fini(&env.le_ctx);
 
-        cfs_clear_bit(LIOD_START, &pc->pc_flags);
-        cfs_clear_bit(LIOD_STOP, &pc->pc_flags);
-        cfs_clear_bit(LIOD_FORCE, &pc->pc_flags);
-        cfs_clear_bit(LIOD_BIND, &pc->pc_flags);
-
-        cfs_complete(&pc->pc_finishing);
+       complete(&pc->pc_finishing);
 
         return 0;
 }
@@ -526,14 +522,13 @@ static int ptlrpcd(void *arg)
 # endif
 static int ptlrpcd_bind(int index, int max)
 {
-        struct ptlrpcd_ctl *pc;
-        int rc = 0;
-#if defined(CONFIG_NUMA) && defined(HAVE_NODE_TO_CPUMASK)
-        struct ptlrpcd_ctl *ppc;
-        int node, i, pidx;
-        cpumask_t mask;
+       struct ptlrpcd_ctl *pc;
+       int rc = 0;
+#if defined(CONFIG_NUMA) && \
+(defined(HAVE_CPUMASK_OF_NODE) || defined(HAVE_NODE_TO_CPUMASK))
+       cpumask_t mask;
 #endif
-        ENTRY;
+       ENTRY;
 
         LASSERT(index <= max - 1);
         pc = &ptlrpcds->pd_threads[index];
@@ -543,20 +538,23 @@ static int ptlrpcd_bind(int index, int max)
                 break;
         case PDB_POLICY_FULL:
                 pc->pc_npartners = 0;
-                cfs_set_bit(LIOD_BIND, &pc->pc_flags);
+               set_bit(LIOD_BIND, &pc->pc_flags);
                 break;
         case PDB_POLICY_PAIR:
                 LASSERT(max % 2 == 0);
                 pc->pc_npartners = 1;
                 break;
-        case PDB_POLICY_NEIGHBOR:
-#if defined(CONFIG_NUMA) && defined(HAVE_NODE_TO_CPUMASK)
-                node = cpu_to_node(index);
-                mask = node_to_cpumask(node);
-                for (i = max; i < cfs_num_online_cpus(); i++)
-                        cpu_clear(i, mask);
-                pc->pc_npartners = cpus_weight(mask) - 1;
-                cfs_set_bit(LIOD_BIND, &pc->pc_flags);
+       case PDB_POLICY_NEIGHBOR:
+#if defined(CONFIG_NUMA) && \
+(defined(HAVE_CPUMASK_OF_NODE) || defined(HAVE_NODE_TO_CPUMASK))
+       {
+               int i;
+               mask = *cpumask_of_node(cpu_to_node(index));
+               for (i = max; i < cfs_num_online_cpus(); i++)
+                       cpu_clear(i, mask);
+               pc->pc_npartners = cpus_weight(mask) - 1;
+               set_bit(LIOD_BIND, &pc->pc_flags);
+       }
 #else
                 LASSERT(max >= 3);
                 pc->pc_npartners = 2;
@@ -577,33 +575,38 @@ static int ptlrpcd_bind(int index, int max)
                         switch (ptlrpcd_bind_policy) {
                         case PDB_POLICY_PAIR:
                                 if (index & 0x1) {
-                                        cfs_set_bit(LIOD_BIND, &pc->pc_flags);
+                                       set_bit(LIOD_BIND, &pc->pc_flags);
                                         pc->pc_partners[0] = &ptlrpcds->
                                                 pd_threads[index - 1];
                                         ptlrpcds->pd_threads[index - 1].
                                                 pc_partners[0] = pc;
                                 }
                                 break;
-                        case PDB_POLICY_NEIGHBOR:
-#if defined(CONFIG_NUMA) && defined(HAVE_NODE_TO_CPUMASK)
-                                /* partners are cores in the same NUMA node.
-                                 * setup partnership only with ptlrpcd threads
-                                 * that are already initialized
-                                 */
-                                for (pidx = 0, i = 0; i < index; i++) {
-                                        if (cpu_isset(i, mask)) {
-                                                ppc = &ptlrpcds->pd_threads[i];
-                                                pc->pc_partners[pidx++] = ppc;
-                                                ppc->pc_partners[ppc->
-                                                          pc_npartners++] = pc;
-                                        }
-                                }
+                       case PDB_POLICY_NEIGHBOR:
+#if defined(CONFIG_NUMA) && \
+(defined(HAVE_CPUMASK_OF_NODE) || defined(HAVE_NODE_TO_CPUMASK))
+                       {
+                               struct ptlrpcd_ctl *ppc;
+                               int i, pidx;
+                               /* partners are cores in the same NUMA node.
+                                * setup partnership only with ptlrpcd threads
+                                * that are already initialized
+                                */
+                               for (pidx = 0, i = 0; i < index; i++) {
+                                       if (cpu_isset(i, mask)) {
+                                               ppc = &ptlrpcds->pd_threads[i];
+                                               pc->pc_partners[pidx++] = ppc;
+                                               ppc->pc_partners[ppc->
+                                                         pc_npartners++] = pc;
+                                       }
+                               }
                                 /* adjust number of partners to the number
                                  * of partnership really setup */
                                 pc->pc_npartners = pidx;
+                       }
 #else
                                 if (index & 0x1)
-                                        cfs_set_bit(LIOD_BIND, &pc->pc_flags);
+                                       set_bit(LIOD_BIND, &pc->pc_flags);
                                 if (index > 0) {
                                         pc->pc_partners[0] = &ptlrpcds->
                                                 pd_threads[index - 1];
@@ -652,7 +655,7 @@ int ptlrpcd_check_async_rpcs(void *arg)
                         /*
                          * XXX: send replay requests.
                          */
-                        if (cfs_test_bit(LIOD_RECOVERY, &pc->pc_flags))
+                       if (test_bit(LIOD_RECOVERY, &pc->pc_flags))
                                 rc = ptlrpcd_check(&pc->pc_env, pc);
                         lu_context_exit(&pc->pc_env.le_ctx);
                 }
@@ -681,16 +684,16 @@ int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
         /*
          * Do not allow start second thread for one pc.
          */
-        if (cfs_test_and_set_bit(LIOD_START, &pc->pc_flags)) {
-                CWARN("Starting second thread (%s) for same pc %p\n",
-                       name, pc);
-                RETURN(0);
-        }
-
-        pc->pc_index = index;
-        cfs_init_completion(&pc->pc_starting);
-        cfs_init_completion(&pc->pc_finishing);
-        cfs_spin_lock_init(&pc->pc_lock);
+       if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
+               CWARN("Starting second thread (%s) for same pc %p\n",
+                     name, pc);
+               RETURN(0);
+       }
+
+       pc->pc_index = index;
+       init_completion(&pc->pc_starting);
+       init_completion(&pc->pc_finishing);
+       spin_lock_init(&pc->pc_lock);
         strncpy(pc->pc_name, name, sizeof(pc->pc_name) - 1);
         pc->pc_set = ptlrpc_prep_set();
         if (pc->pc_set == NULL)
@@ -717,7 +720,7 @@ int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
                 GOTO(out, rc);
 
         rc = 0;
-        cfs_wait_for_completion(&pc->pc_starting);
+       wait_for_completion(&pc->pc_starting);
 #else
         pc->pc_wait_callback =
                 liblustre_register_wait_callback("ptlrpcd_check_async_rpcs",
@@ -732,48 +735,67 @@ out:
                 if (pc->pc_set != NULL) {
                         struct ptlrpc_request_set *set = pc->pc_set;
 
-                        cfs_spin_lock(&pc->pc_lock);
-                        pc->pc_set = NULL;
-                        cfs_spin_unlock(&pc->pc_lock);
-                        ptlrpc_set_destroy(set);
-                }
-                if (env != 0)
-                        lu_context_fini(&pc->pc_env.le_ctx);
-                cfs_clear_bit(LIOD_BIND, &pc->pc_flags);
+                       spin_lock(&pc->pc_lock);
+                       pc->pc_set = NULL;
+                       spin_unlock(&pc->pc_lock);
+                       ptlrpc_set_destroy(set);
+               }
+               if (env != 0)
+                       lu_context_fini(&pc->pc_env.le_ctx);
+               clear_bit(LIOD_BIND, &pc->pc_flags);
 #else
-                SET_BUT_UNUSED(env);
+               SET_BUT_UNUSED(env);
 #endif
-                cfs_clear_bit(LIOD_START, &pc->pc_flags);
-        }
-        RETURN(rc);
+               clear_bit(LIOD_START, &pc->pc_flags);
+       }
+       RETURN(rc);
 }
 
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
 {
-       struct ptlrpc_request_set *set = pc->pc_set;
-        ENTRY;
+       ENTRY;
 
-        if (!cfs_test_bit(LIOD_START, &pc->pc_flags)) {
-                CWARN("Thread for pc %p was not started\n", pc);
-                goto out;
-        }
+       if (!test_bit(LIOD_START, &pc->pc_flags)) {
+               CWARN("Thread for pc %p was not started\n", pc);
+               goto out;
+       }
+
+       set_bit(LIOD_STOP, &pc->pc_flags);
+       if (force)
+               set_bit(LIOD_FORCE, &pc->pc_flags);
+       cfs_waitq_signal(&pc->pc_set->set_waitq);
+
+out:
+       EXIT;
+}
+
+void ptlrpcd_free(struct ptlrpcd_ctl *pc)
+{
+       struct ptlrpc_request_set *set = pc->pc_set;
+       ENTRY;
+
+       if (!test_bit(LIOD_START, &pc->pc_flags)) {
+               CWARN("Thread for pc %p was not started\n", pc);
+               goto out;
+       }
 
-        cfs_set_bit(LIOD_STOP, &pc->pc_flags);
-        if (force)
-                cfs_set_bit(LIOD_FORCE, &pc->pc_flags);
-        cfs_waitq_signal(&pc->pc_set->set_waitq);
 #ifdef __KERNEL__
-        cfs_wait_for_completion(&pc->pc_finishing);
+       wait_for_completion(&pc->pc_finishing);
 #else
-        liblustre_deregister_wait_callback(pc->pc_wait_callback);
-        liblustre_deregister_idle_callback(pc->pc_idle_callback);
+       liblustre_deregister_wait_callback(pc->pc_wait_callback);
+       liblustre_deregister_idle_callback(pc->pc_idle_callback);
 #endif
-        lu_context_fini(&pc->pc_env.le_ctx);
+       lu_context_fini(&pc->pc_env.le_ctx);
+
+       spin_lock(&pc->pc_lock);
+       pc->pc_set = NULL;
+       spin_unlock(&pc->pc_lock);
+       ptlrpc_set_destroy(set);
 
-        cfs_spin_lock(&pc->pc_lock);
-        pc->pc_set = NULL;
-        cfs_spin_unlock(&pc->pc_lock);
-        ptlrpc_set_destroy(set);
+       clear_bit(LIOD_START, &pc->pc_flags);
+       clear_bit(LIOD_STOP, &pc->pc_flags);
+       clear_bit(LIOD_FORCE, &pc->pc_flags);
+       clear_bit(LIOD_BIND, &pc->pc_flags);
 
 out:
 #ifdef __KERNEL__
@@ -791,18 +813,21 @@ out:
 
 static void ptlrpcd_fini(void)
 {
-        int i;
-        ENTRY;
-
-        if (ptlrpcds != NULL) {
-                for (i = 0; i < ptlrpcds->pd_nthreads; i++)
-                        ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
-                ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
-                OBD_FREE(ptlrpcds, ptlrpcds->pd_size);
-                ptlrpcds = NULL;
-        }
-
-        EXIT;
+       int i;
+       ENTRY;
+
+       if (ptlrpcds != NULL) {
+               for (i = 0; i < ptlrpcds->pd_nthreads; i++)
+                       ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
+               for (i = 0; i < ptlrpcds->pd_nthreads; i++)
+                       ptlrpcd_free(&ptlrpcds->pd_threads[i]);
+               ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
+               ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
+               OBD_FREE(ptlrpcds, ptlrpcds->pd_size);
+               ptlrpcds = NULL;
+       }
+
+       EXIT;
 }
 
 static int ptlrpcd_init(void)
@@ -831,7 +856,7 @@ static int ptlrpcd_init(void)
                 GOTO(out, rc = -ENOMEM);
 
         snprintf(name, 15, "ptlrpcd_rcv");
-        cfs_set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
+       set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
         rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv);
         if (rc < 0)
                 GOTO(out, rc);
@@ -863,7 +888,10 @@ out:
         if (rc != 0 && ptlrpcds != NULL) {
                 for (j = 0; j <= i; j++)
                         ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0);
-                ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
+               for (j = 0; j <= i; j++)
+                       ptlrpcd_free(&ptlrpcds->pd_threads[j]);
+               ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
+               ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
                 OBD_FREE(ptlrpcds, size);
                 ptlrpcds = NULL;
         }
@@ -876,20 +904,20 @@ int ptlrpcd_addref(void)
         int rc = 0;
         ENTRY;
 
-        cfs_mutex_lock(&ptlrpcd_mutex);
+       mutex_lock(&ptlrpcd_mutex);
         if (++ptlrpcd_users == 1)
                 rc = ptlrpcd_init();
-        cfs_mutex_unlock(&ptlrpcd_mutex);
+       mutex_unlock(&ptlrpcd_mutex);
         RETURN(rc);
 }
 EXPORT_SYMBOL(ptlrpcd_addref);
 
 void ptlrpcd_decref(void)
 {
-        cfs_mutex_lock(&ptlrpcd_mutex);
+       mutex_lock(&ptlrpcd_mutex);
         if (--ptlrpcd_users == 0)
                 ptlrpcd_fini();
-        cfs_mutex_unlock(&ptlrpcd_mutex);
+       mutex_unlock(&ptlrpcd_mutex);
 }
 EXPORT_SYMBOL(ptlrpcd_decref);
 /** @} ptlrpcd */