Whamcloud - gitweb
LU-56 ptlrpc: Reduce at_lock dance
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index c0960e3..84f3474 100644 (file)
@@ -471,7 +471,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
        }
 
        /*
-        * NB: we will add some common at here for estimating, for example:
+        * NB: we will add some common code here for estimating, for example:
         * add a new member ptlrpc_service_thr_conf::tc_factor, and estimate
         * threads number based on:
         *     (online_cpus * conf::tc_factor) + conf::tc_nthrs_base.
@@ -482,7 +482,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
         * availability of service.
         *
         * Also, we will need to validate threads number at here for
-        * CPT affinity service (CPU ParTiion) in the future.
+        * CPT affinity service (CPU ParTition) in the future.
         * A service can have percpt thread-pool instead of a global thread
         * pool for each service, which means user might not always get the
         * threads number they want even they set it in conf::tc_nthrs_user,
@@ -715,11 +715,12 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
         if (!cfs_atomic_dec_and_test(&req->rq_refcount))
                 return;
 
-       cfs_spin_lock(&svcpt->scp_at_lock);
        if (req->rq_at_linked) {
                struct ptlrpc_at_array *array = &svcpt->scp_at_array;
                 __u32 index = req->rq_at_index;
 
+               cfs_spin_lock(&svcpt->scp_at_lock);
+
                 LASSERT(!cfs_list_empty(&req->rq_timed_list));
                 cfs_list_del_init(&req->rq_timed_list);
                 cfs_spin_lock(&req->rq_lock);
@@ -727,10 +728,11 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                 cfs_spin_unlock(&req->rq_lock);
                 array->paa_reqs_count[index]--;
                 array->paa_count--;
-        } else
-                LASSERT(cfs_list_empty(&req->rq_timed_list));
 
-       cfs_spin_unlock(&svcpt->scp_at_lock);
+               cfs_spin_unlock(&svcpt->scp_at_lock);
+       } else {
+               LASSERT(cfs_list_empty(&req->rq_timed_list));
+       }
 
         /* finalize request */
         if (req->rq_export) {
@@ -962,10 +964,8 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service_part *svcpt)
        struct ptlrpc_at_array *array = &svcpt->scp_at_array;
        __s32 next;
 
-       cfs_spin_lock(&svcpt->scp_at_lock);
        if (array->paa_count == 0) {
                cfs_timer_disarm(&svcpt->scp_at_timer);
-               cfs_spin_unlock(&svcpt->scp_at_lock);
                return;
        }
 
@@ -979,7 +979,6 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service_part *svcpt)
                CDEBUG(D_INFO, "armed %s at %+ds\n",
                       svcpt->scp_service->srv_name, next);
        }
-       cfs_spin_unlock(&svcpt->scp_at_lock);
 }
 
 /* Add rpc to early reply check list */
@@ -989,7 +988,6 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
        struct ptlrpc_at_array *array = &svcpt->scp_at_array;
         struct ptlrpc_request *rq = NULL;
         __u32 index;
-        int found = 0;
 
         if (AT_OFF)
                 return(0);
@@ -1031,12 +1029,9 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         array->paa_count++;
         if (array->paa_count == 1 || array->paa_deadline > req->rq_deadline) {
                 array->paa_deadline = req->rq_deadline;
-                found = 1;
-        }
-       cfs_spin_unlock(&svcpt->scp_at_lock);
-
-       if (found)
                ptlrpc_at_set_timer(svcpt);
+       }
+       cfs_spin_unlock(&svcpt->scp_at_lock);
 
        return 0;
 }
@@ -1208,10 +1203,10 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
        first = array->paa_deadline - now;
        if (first > at_early_margin) {
                /* We've still got plenty of time.  Reset the timer. */
-               cfs_spin_unlock(&svcpt->scp_at_lock);
                ptlrpc_at_set_timer(svcpt);
-                RETURN(0);
-        }
+               cfs_spin_unlock(&svcpt->scp_at_lock);
+               RETURN(0);
+       }
 
         /* We're close to a timeout, and we don't know how much longer the
            server will take. Send early replies to everyone expiring soon. */
@@ -1253,11 +1248,11 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
                         index = 0;
         }
         array->paa_deadline = deadline;
-       cfs_spin_unlock(&svcpt->scp_at_lock);
-
        /* we have a new earliest deadline, restart the timer */
        ptlrpc_at_set_timer(svcpt);
 
+       cfs_spin_unlock(&svcpt->scp_at_lock);
+
         CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
                "replies\n", first, at_extra, counter);
         if (first < 0) {
@@ -1746,8 +1741,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                                    at_get(&svcpt->scp_at_estimate));
         }
 
-        rc = lu_context_init(&request->rq_session,
-                             LCT_SESSION|LCT_REMEMBER|LCT_NOREF);
+       rc = lu_context_init(&request->rq_session, LCT_SESSION | LCT_NOREF);
         if (rc) {
                 CERROR("Failure to initialize session: %d\n", rc);
                 goto out_req;
@@ -2172,8 +2166,7 @@ ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
  */
 static int ptlrpc_main(void *arg)
 {
-       struct ptlrpc_svc_data          *data = (struct ptlrpc_svc_data *)arg;
-       struct ptlrpc_thread            *thread = data->thread;
+       struct ptlrpc_thread            *thread = (struct ptlrpc_thread *)arg;
        struct ptlrpc_service_part      *svcpt = thread->t_svcpt;
        struct ptlrpc_service           *svc = svcpt->scp_service;
        struct ptlrpc_reply_state       *rs;
@@ -2185,7 +2178,7 @@ static int ptlrpc_main(void *arg)
         ENTRY;
 
         thread->t_pid = cfs_curproc_pid();
-        cfs_daemonize_ctxt(data->name);
+        cfs_daemonize_ctxt(thread->t_name);
 
 #if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
         /* we need to do this before any per-thread allocation is done so that
@@ -2250,6 +2243,7 @@ static int ptlrpc_main(void *arg)
        LASSERT(thread_is_starting(thread));
        thread_clear_flags(thread, SVC_STARTING);
 
+       LASSERT(svcpt->scp_nthrs_starting == 1);
        svcpt->scp_nthrs_starting--;
 
        /* SVC_STOPPING may already be set here if someone else is trying
@@ -2260,10 +2254,7 @@ static int ptlrpc_main(void *arg)
        svcpt->scp_nthrs_running++;
        cfs_spin_unlock(&svcpt->scp_lock);
 
-       /*
-        * wake up our creator. Note: @data is invalid after this point,
-        * because it's allocated on ptlrpc_start_thread() stack.
-        */
+       /* wake up our creator in case he's still waiting. */
        cfs_waitq_signal(&thread->t_ctl_waitq);
 
        thread->t_watchdog = lc_watchdog_add(ptlrpc_server_get_timeout(svcpt),
@@ -2286,7 +2277,7 @@ static int ptlrpc_main(void *arg)
 
                if (ptlrpc_threads_need_create(svcpt)) {
                        /* Ignore return code - we tried... */
-                       ptlrpc_start_thread(svcpt);
+                       ptlrpc_start_thread(svcpt, 0);
                 }
 
                /* Process all incoming reqs before handling any */
@@ -2495,29 +2486,53 @@ static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
         RETURN(0);
 }
 
-static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
-                              struct ptlrpc_thread *thread)
+static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 {
-       struct l_wait_info lwi = { 0 };
+       struct l_wait_info      lwi = { 0 };
+       struct ptlrpc_thread    *thread;
+       CFS_LIST_HEAD           (zombie);
+
        ENTRY;
 
-       CDEBUG(D_RPCTRACE, "Stopping thread [ %p : %u ]\n",
-              thread, thread->t_pid);
+       CDEBUG(D_INFO, "Stopping threads for service %s\n",
+              svcpt->scp_service->srv_name);
 
        cfs_spin_lock(&svcpt->scp_lock);
        /* let the thread know that we would like it to stop asap */
-       thread_add_flags(thread, SVC_STOPPING);
-       cfs_spin_unlock(&svcpt->scp_lock);
+       list_for_each_entry(thread, &svcpt->scp_threads, t_link) {
+               CDEBUG(D_INFO, "Stopping thread %s #%u\n",
+                      svcpt->scp_service->srv_thread_name, thread->t_id);
+               thread_add_flags(thread, SVC_STOPPING);
+       }
 
        cfs_waitq_broadcast(&svcpt->scp_waitq);
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_stopped(thread), &lwi);
 
-       cfs_spin_lock(&svcpt->scp_lock);
-       cfs_list_del(&thread->t_link);
+       while (!cfs_list_empty(&svcpt->scp_threads)) {
+               thread = cfs_list_entry(svcpt->scp_threads.next,
+                                       struct ptlrpc_thread, t_link);
+               if (thread_is_stopped(thread)) {
+                       cfs_list_del(&thread->t_link);
+                       cfs_list_add(&thread->t_link, &zombie);
+                       continue;
+               }
+               cfs_spin_unlock(&svcpt->scp_lock);
+
+               CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
+                      svcpt->scp_service->srv_thread_name, thread->t_id);
+               l_wait_event(thread->t_ctl_waitq,
+                            thread_is_stopped(thread), &lwi);
+
+               cfs_spin_lock(&svcpt->scp_lock);
+       }
+
        cfs_spin_unlock(&svcpt->scp_lock);
 
-       OBD_FREE_PTR(thread);
+       while (!cfs_list_empty(&zombie)) {
+               thread = cfs_list_entry(zombie.next,
+                                       struct ptlrpc_thread, t_link);
+               cfs_list_del(&thread->t_link);
+               OBD_FREE_PTR(thread);
+       }
        EXIT;
 }
 
@@ -2526,23 +2541,10 @@ static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
  */
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
-       struct ptlrpc_service_part      *svcpt = svc->srv_part;
-       struct ptlrpc_thread            *thread;
        ENTRY;
 
-       LASSERT(svcpt != NULL);
-
-       cfs_spin_lock(&svcpt->scp_lock);
-       while (!cfs_list_empty(&svcpt->scp_threads)) {
-               thread = cfs_list_entry(svcpt->scp_threads.next,
-                                       struct ptlrpc_thread, t_link);
-
-               cfs_spin_unlock(&svcpt->scp_lock);
-               ptlrpc_stop_thread(svcpt, thread);
-               cfs_spin_lock(&svcpt->scp_lock);
-       }
-
-       cfs_spin_unlock(&svcpt->scp_lock);
+       if (svc != NULL && svc->srv_part != NULL)
+               ptlrpc_svcpt_stop_threads(svc->srv_part);
        EXIT;
 }
 
@@ -2555,7 +2557,7 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
-               rc = ptlrpc_start_thread(svc->srv_part);
+               rc = ptlrpc_start_thread(svc->srv_part, 1);
                 /* We have enough threads, don't start more.  b=15759 */
                 if (rc == -EMFILE) {
                         rc = 0;
@@ -2571,13 +2573,11 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
         RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
+int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
        struct l_wait_info      lwi = { 0 };
-       struct ptlrpc_svc_data  d;
        struct ptlrpc_thread    *thread;
        struct ptlrpc_service   *svc = svcpt->scp_service;
-       char                    name[32];
        int                     rc;
        ENTRY;
 
@@ -2587,6 +2587,7 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
               svc->srv_name, svcpt->scp_nthrs_running,
               svc->srv_threads_min, svc->srv_threads_max);
 
+ again:
        if (unlikely(svc->srv_is_stopping))
                RETURN(-ESRCH);
 
@@ -2607,6 +2608,24 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                RETURN(-EMFILE);
        }
 
+       if (svcpt->scp_nthrs_starting != 0) {
+               /* serialize starting because some modules (obdfilter)
+                * might require unique and contiguous t_id */
+               LASSERT(svcpt->scp_nthrs_starting == 1);
+               cfs_spin_unlock(&svcpt->scp_lock);
+               OBD_FREE_PTR(thread);
+               if (wait) {
+                       CDEBUG(D_INFO, "Waiting for creating thread %s #%d\n",
+                              svc->srv_thread_name, svcpt->scp_thr_nextid);
+                       cfs_schedule();
+                       goto again;
+               }
+
+               CDEBUG(D_INFO, "Creating thread %s #%d race, retry later\n",
+                      svc->srv_thread_name, svcpt->scp_thr_nextid);
+               RETURN(-EAGAIN);
+       }
+
        svcpt->scp_nthrs_starting++;
        thread->t_id = svcpt->scp_thr_nextid++;
        thread_add_flags(thread, SVC_STARTING);
@@ -2615,20 +2634,18 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
        cfs_list_add(&thread->t_link, &svcpt->scp_threads);
        cfs_spin_unlock(&svcpt->scp_lock);
 
-        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
-        d.svc = svc;
-        d.name = name;
-        d.thread = thread;
-
-        CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
-        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
-         * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
-         */
-        rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
-        if (rc < 0) {
-                CERROR("cannot start thread '%s': rc %d\n", name, rc);
+       snprintf(thread->t_name, PTLRPC_THR_NAME_LEN,
+                "%s_%02d", svc->srv_thread_name, thread->t_id);
 
+       CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
+       /*
+        * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+        * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
+        */
+       rc = cfs_create_thread(ptlrpc_main, thread, CFS_DAEMON_FLAGS);
+       if (rc < 0) {
+               CERROR("cannot start thread '%s': rc %d\n",
+                      thread->t_name, rc);
                cfs_spin_lock(&svcpt->scp_lock);
                cfs_list_del(&thread->t_link);
                --svcpt->scp_nthrs_starting;
@@ -2637,6 +2654,10 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                 OBD_FREE(thread, sizeof(*thread));
                 RETURN(rc);
         }
+
+       if (!wait)
+               RETURN(0);
+
         l_wait_event(thread->t_ctl_waitq,
                      thread_is_running(thread) || thread_is_stopped(thread),
                      &lwi);
@@ -2710,151 +2731,211 @@ static void ptlrpc_wait_replies(struct ptlrpc_service_part *svcpt)
        }
 }
 
-int ptlrpc_unregister_service(struct ptlrpc_service *service)
+static void
+ptlrpc_service_del_atimer(struct ptlrpc_service *svc)
 {
-       struct l_wait_info              lwi;
-       struct ptlrpc_service_part      *svcpt;
-       struct ptlrpc_reply_state       *rs;
-       struct ptlrpc_reply_state       *t;
-       struct ptlrpc_at_array          *array;
-       cfs_list_t                      *tmp;
-       int                             rc;
-       ENTRY;
+       struct ptlrpc_service_part        *svcpt;
 
-       service->srv_is_stopping = 1;
-       svcpt = service->srv_part;
-
-       if (svcpt == NULL || /* no instance of ptlrpc_service_part */
-           svcpt->scp_service == NULL) /* it's not fully initailzed */
-               GOTO(out, rc = 0);
-
-       cfs_timer_disarm(&svcpt->scp_at_timer);
-
-       ptlrpc_stop_all_threads(service);
+       /* early disarm AT timer... */
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-        cfs_spin_lock (&ptlrpc_all_services_lock);
-        cfs_list_del_init (&service->srv_list);
-        cfs_spin_unlock (&ptlrpc_all_services_lock);
+               cfs_timer_disarm(&svcpt->scp_at_timer);
+       } while (0);
+}
 
-        ptlrpc_lprocfs_unregister_service(service);
+static void
+ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
+{
+       struct ptlrpc_service_part        *svcpt;
+       struct ptlrpc_request_buffer_desc *rqbd;
+       struct l_wait_info                lwi;
+       int                               rc;
 
         /* All history will be culled when the next request buffer is
-         * freed */
-        service->srv_max_history_rqbds = 0;
+        * freed in ptlrpc_service_purge_all() */
+        svc->srv_max_history_rqbds = 0;
 
-        CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);
+       rc = LNetClearLazyPortal(svc->srv_req_portal);
+       LASSERT(rc == 0);
 
-        rc = LNetClearLazyPortal(service->srv_req_portal);
-        LASSERT (rc == 0);
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-       /* Unlink all the request buffers.  This forces a 'final' event with
-        * its 'unlink' flag set for each posted rqbd */
-       cfs_list_for_each(tmp, &svcpt->scp_rqbd_posted) {
-                struct ptlrpc_request_buffer_desc *rqbd =
-                        cfs_list_entry(tmp, struct ptlrpc_request_buffer_desc,
-                                       rqbd_list);
+               /* Unlink all the request buffers.  This forces a 'final'
+                * event with its 'unlink' flag set for each posted rqbd */
+               cfs_list_for_each_entry(rqbd, &svcpt->scp_rqbd_posted,
+                                       rqbd_list) {
+                       rc = LNetMDUnlink(rqbd->rqbd_md_h);
+                       LASSERT(rc == 0 || rc == -ENOENT);
+               }
+       } while (0);
 
-                rc = LNetMDUnlink(rqbd->rqbd_md_h);
-                LASSERT (rc == 0 || rc == -ENOENT);
-        }
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-        /* Wait for the network to release any buffers it's currently
-         * filling */
-        for (;;) {
+               /* Wait for the network to release any buffers
+                * it's currently filling */
                cfs_spin_lock(&svcpt->scp_lock);
-               rc = svcpt->scp_nrqbds_posted;
+               while (svcpt->scp_nrqbds_posted != 0) {
+                       cfs_spin_unlock(&svcpt->scp_lock);
+                       /* Network access will complete in finite time but
+                        * the HUGE timeout lets us CWARN for visibility
+                        * of sluggish NALs */
+                       lwi = LWI_TIMEOUT_INTERVAL(
+                                       cfs_time_seconds(LONG_UNLINK),
+                                       cfs_time_seconds(1), NULL, NULL);
+                       rc = l_wait_event(svcpt->scp_waitq,
+                                         svcpt->scp_nrqbds_posted == 0, &lwi);
+                       if (rc == -ETIMEDOUT) {
+                               CWARN("Service %s waiting for "
+                                     "request buffers\n",
+                                     svcpt->scp_service->srv_name);
+                       }
+                       cfs_spin_lock(&svcpt->scp_lock);
+               }
                cfs_spin_unlock(&svcpt->scp_lock);
+       } while (0);
+}
 
-                if (rc == 0)
-                        break;
+static void
+ptlrpc_service_purge_all(struct ptlrpc_service *svc)
+{
+       struct ptlrpc_service_part              *svcpt;
+       struct ptlrpc_request_buffer_desc       *rqbd;
+       struct ptlrpc_request                   *req;
+       struct ptlrpc_reply_state               *rs;
 
-                /* Network access will complete in finite time but the HUGE
-                 * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
-                                           cfs_time_seconds(1), NULL, NULL);
-               rc = l_wait_event(svcpt->scp_waitq,
-                                 svcpt->scp_nrqbds_posted == 0, &lwi);
-               if (rc == -ETIMEDOUT)
-                       CWARN("Service %s waiting for request buffers\n",
-                             service->srv_name);
-       }
+       do { /* iterate over multiple partitions in the future */
+               /* schedule all outstanding replies to terminate them */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-       /* schedule all outstanding replies to terminate them */
-       cfs_spin_lock(&svcpt->scp_rep_lock);
-       while (!cfs_list_empty(&svcpt->scp_rep_active)) {
-               struct ptlrpc_reply_state *rs =
-                       cfs_list_entry(svcpt->scp_rep_active.next,
-                                      struct ptlrpc_reply_state, rs_list);
-               cfs_spin_lock(&rs->rs_lock);
-               ptlrpc_schedule_difficult_reply(rs);
-               cfs_spin_unlock(&rs->rs_lock);
-       }
-       cfs_spin_unlock(&svcpt->scp_rep_lock);
+               cfs_spin_lock(&svcpt->scp_rep_lock);
+               while (!cfs_list_empty(&svcpt->scp_rep_active)) {
+                       rs = cfs_list_entry(svcpt->scp_rep_active.next,
+                                           struct ptlrpc_reply_state, rs_list);
+                       cfs_spin_lock(&rs->rs_lock);
+                       ptlrpc_schedule_difficult_reply(rs);
+                       cfs_spin_unlock(&rs->rs_lock);
+               }
+               cfs_spin_unlock(&svcpt->scp_rep_lock);
+
+               /* purge the request queue.  NB No new replies (rqbds
+                * all unlinked) and no service threads, so I'm the only
+                * thread noodling the request queue now */
+               while (!cfs_list_empty(&svcpt->scp_req_incoming)) {
+                       req = cfs_list_entry(svcpt->scp_req_incoming.next,
+                                            struct ptlrpc_request, rq_list);
+
+                       cfs_list_del(&req->rq_list);
+                       svcpt->scp_nreqs_incoming--;
+                       svcpt->scp_nreqs_active++;
+                       ptlrpc_server_finish_request(svcpt, req);
+               }
 
-       /* purge the request queue.  NB No new replies (rqbds all unlinked)
-        * and no service threads, so I'm the only thread noodling the
-        * request queue now */
-       while (!cfs_list_empty(&svcpt->scp_req_incoming)) {
-               struct ptlrpc_request *req =
-                       cfs_list_entry(svcpt->scp_req_incoming.next,
-                                      struct ptlrpc_request,
-                                      rq_list);
+               while (ptlrpc_server_request_pending(svcpt, 1)) {
+                       req = ptlrpc_server_request_get(svcpt, 1);
+                       cfs_list_del(&req->rq_list);
+                       svcpt->scp_nreqs_active++;
+                       ptlrpc_hpreq_fini(req);
+                       ptlrpc_server_finish_request(svcpt, req);
+               }
 
-               cfs_list_del(&req->rq_list);
-               svcpt->scp_nreqs_incoming--;
-               svcpt->scp_nreqs_active++;
-               ptlrpc_server_finish_request(svcpt, req);
-       }
-       while (ptlrpc_server_request_pending(svcpt, 1)) {
-               struct ptlrpc_request *req;
+               LASSERT(cfs_list_empty(&svcpt->scp_rqbd_posted));
+               LASSERT(svcpt->scp_nreqs_incoming == 0);
+               LASSERT(svcpt->scp_nreqs_active == 0);
+               /* history should have been culled by
+                * ptlrpc_server_finish_request */
+               LASSERT(svcpt->scp_hist_nrqbds == 0);
 
-               req = ptlrpc_server_request_get(svcpt, 1);
-               cfs_list_del(&req->rq_list);
-               svcpt->scp_nreqs_active++;
-               ptlrpc_server_finish_request(svcpt, req);
-       }
-       LASSERT(svcpt->scp_nreqs_incoming == 0);
-       LASSERT(svcpt->scp_nreqs_active == 0);
-       LASSERT(svcpt->scp_hist_nrqbds == 0);
-       LASSERT(cfs_list_empty(&svcpt->scp_rqbd_posted));
-
-       /* Now free all the request buffers since nothing references them
-        * any more... */
-       while (!cfs_list_empty(&svcpt->scp_rqbd_idle)) {
-               struct ptlrpc_request_buffer_desc *rqbd =
-                       cfs_list_entry(svcpt->scp_rqbd_idle.next,
-                                      struct ptlrpc_request_buffer_desc,
-                                      rqbd_list);
-
-               ptlrpc_free_rqbd(rqbd);
-       }
+               /* Now free all the request buffers since nothing
+                * references them any more... */
 
-       ptlrpc_wait_replies(svcpt);
+               while (!cfs_list_empty(&svcpt->scp_rqbd_idle)) {
+                       rqbd = cfs_list_entry(svcpt->scp_rqbd_idle.next,
+                                             struct ptlrpc_request_buffer_desc,
+                                             rqbd_list);
+                       ptlrpc_free_rqbd(rqbd);
+               }
+               ptlrpc_wait_replies(svcpt);
+
+               while (!cfs_list_empty(&svcpt->scp_rep_idle)) {
+                       rs = cfs_list_entry(svcpt->scp_rep_idle.next,
+                                           struct ptlrpc_reply_state,
+                                           rs_list);
+                       cfs_list_del(&rs->rs_list);
+                       OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
+               }
+       } while (0);
+}
 
-       cfs_list_for_each_entry_safe(rs, t, &svcpt->scp_rep_idle, rs_list) {
-               cfs_list_del(&rs->rs_list);
-               OBD_FREE_LARGE(rs, service->srv_max_reply_size);
-       }
+static void
+ptlrpc_service_free(struct ptlrpc_service *svc)
+{
+       struct ptlrpc_service_part      *svcpt;
+       struct ptlrpc_at_array          *array;
 
-       /* In case somebody rearmed this in the meantime */
-       cfs_timer_disarm(&svcpt->scp_at_timer);
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-       array = &svcpt->scp_at_array;
-        if (array->paa_reqs_array != NULL) {
-                OBD_FREE(array->paa_reqs_array,
-                         sizeof(cfs_list_t) * array->paa_size);
-                array->paa_reqs_array = NULL;
-        }
+               /* In case somebody rearmed this in the meantime */
+               cfs_timer_disarm(&svcpt->scp_at_timer);
+               array = &svcpt->scp_at_array;
 
-        if (array->paa_reqs_count != NULL) {
-                OBD_FREE(array->paa_reqs_count,
-                         sizeof(__u32) * array->paa_size);
-                array->paa_reqs_count= NULL;
-        }
+               if (array->paa_reqs_array != NULL) {
+                       OBD_FREE(array->paa_reqs_array,
+                                sizeof(cfs_list_t) * array->paa_size);
+                       array->paa_reqs_array = NULL;
+               }
+
+               if (array->paa_reqs_count != NULL) {
+                       OBD_FREE(array->paa_reqs_count,
+                                sizeof(__u32) * array->paa_size);
+                       array->paa_reqs_count = NULL;
+               }
+               svcpt->scp_service = NULL;
+       } while (0);
+
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt != NULL)
+                       OBD_FREE_PTR(svcpt);
+       } while (0);
+
+       OBD_FREE_PTR(svc);
+}
+
+int ptlrpc_unregister_service(struct ptlrpc_service *service)
+{
+       ENTRY;
+
+       CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);
+
+       service->srv_is_stopping = 1;
+
+       cfs_spin_lock(&ptlrpc_all_services_lock);
+       cfs_list_del_init(&service->srv_list);
+       cfs_spin_unlock(&ptlrpc_all_services_lock);
+
+       ptlrpc_lprocfs_unregister_service(service);
+
+       ptlrpc_service_del_atimer(service);
+       ptlrpc_stop_all_threads(service);
+
+       ptlrpc_service_unlink_rqbd(service);
+       ptlrpc_service_purge_all(service);
+       ptlrpc_service_free(service);
 
-       OBD_FREE_PTR(svcpt);
- out:
-       OBD_FREE_PTR(service);
        RETURN(0);
 }