Whamcloud - gitweb
LU-56 ptlrpc: Reduce at_lock dance
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index e6fe4c7..84f3474 100644 (file)
@@ -715,11 +715,12 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
         if (!cfs_atomic_dec_and_test(&req->rq_refcount))
                 return;
 
-       cfs_spin_lock(&svcpt->scp_at_lock);
        if (req->rq_at_linked) {
                struct ptlrpc_at_array *array = &svcpt->scp_at_array;
                 __u32 index = req->rq_at_index;
 
+               cfs_spin_lock(&svcpt->scp_at_lock);
+
                 LASSERT(!cfs_list_empty(&req->rq_timed_list));
                 cfs_list_del_init(&req->rq_timed_list);
                 cfs_spin_lock(&req->rq_lock);
@@ -727,10 +728,11 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                 cfs_spin_unlock(&req->rq_lock);
                 array->paa_reqs_count[index]--;
                 array->paa_count--;
-        } else
-                LASSERT(cfs_list_empty(&req->rq_timed_list));
 
-       cfs_spin_unlock(&svcpt->scp_at_lock);
+               cfs_spin_unlock(&svcpt->scp_at_lock);
+       } else {
+               LASSERT(cfs_list_empty(&req->rq_timed_list));
+       }
 
         /* finalize request */
         if (req->rq_export) {
@@ -962,10 +964,8 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service_part *svcpt)
        struct ptlrpc_at_array *array = &svcpt->scp_at_array;
        __s32 next;
 
-       cfs_spin_lock(&svcpt->scp_at_lock);
        if (array->paa_count == 0) {
                cfs_timer_disarm(&svcpt->scp_at_timer);
-               cfs_spin_unlock(&svcpt->scp_at_lock);
                return;
        }
 
@@ -979,7 +979,6 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service_part *svcpt)
                CDEBUG(D_INFO, "armed %s at %+ds\n",
                       svcpt->scp_service->srv_name, next);
        }
-       cfs_spin_unlock(&svcpt->scp_at_lock);
 }
 
 /* Add rpc to early reply check list */
@@ -989,7 +988,6 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
        struct ptlrpc_at_array *array = &svcpt->scp_at_array;
         struct ptlrpc_request *rq = NULL;
         __u32 index;
-        int found = 0;
 
         if (AT_OFF)
                 return(0);
@@ -1031,12 +1029,9 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         array->paa_count++;
         if (array->paa_count == 1 || array->paa_deadline > req->rq_deadline) {
                 array->paa_deadline = req->rq_deadline;
-                found = 1;
-        }
-       cfs_spin_unlock(&svcpt->scp_at_lock);
-
-       if (found)
                ptlrpc_at_set_timer(svcpt);
+       }
+       cfs_spin_unlock(&svcpt->scp_at_lock);
 
        return 0;
 }
@@ -1208,10 +1203,10 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
        first = array->paa_deadline - now;
        if (first > at_early_margin) {
                /* We've still got plenty of time.  Reset the timer. */
-               cfs_spin_unlock(&svcpt->scp_at_lock);
                ptlrpc_at_set_timer(svcpt);
-                RETURN(0);
-        }
+               cfs_spin_unlock(&svcpt->scp_at_lock);
+               RETURN(0);
+       }
 
         /* We're close to a timeout, and we don't know how much longer the
            server will take. Send early replies to everyone expiring soon. */
@@ -1253,11 +1248,11 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
                         index = 0;
         }
         array->paa_deadline = deadline;
-       cfs_spin_unlock(&svcpt->scp_at_lock);
-
        /* we have a new earliest deadline, restart the timer */
        ptlrpc_at_set_timer(svcpt);
 
+       cfs_spin_unlock(&svcpt->scp_at_lock);
+
         CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
                "replies\n", first, at_extra, counter);
         if (first < 0) {
@@ -1746,8 +1741,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                                    at_get(&svcpt->scp_at_estimate));
         }
 
-        rc = lu_context_init(&request->rq_session,
-                             LCT_SESSION|LCT_REMEMBER|LCT_NOREF);
+       rc = lu_context_init(&request->rq_session, LCT_SESSION | LCT_NOREF);
         if (rc) {
                 CERROR("Failure to initialize session: %d\n", rc);
                 goto out_req;
@@ -2737,151 +2731,211 @@ static void ptlrpc_wait_replies(struct ptlrpc_service_part *svcpt)
        }
 }
 
-int ptlrpc_unregister_service(struct ptlrpc_service *service)
+static void
+ptlrpc_service_del_atimer(struct ptlrpc_service *svc)
 {
-       struct l_wait_info              lwi;
-       struct ptlrpc_service_part      *svcpt;
-       struct ptlrpc_reply_state       *rs;
-       struct ptlrpc_reply_state       *t;
-       struct ptlrpc_at_array          *array;
-       cfs_list_t                      *tmp;
-       int                             rc;
-       ENTRY;
-
-       service->srv_is_stopping = 1;
-       svcpt = service->srv_part;
+       struct ptlrpc_service_part        *svcpt;
 
-       if (svcpt == NULL || /* no instance of ptlrpc_service_part */
-           svcpt->scp_service == NULL) /* it's not fully initailzed */
-               GOTO(out, rc = 0);
-
-       cfs_timer_disarm(&svcpt->scp_at_timer);
-
-       ptlrpc_stop_all_threads(service);
+       /* early disarm AT timer... */
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-        cfs_spin_lock (&ptlrpc_all_services_lock);
-        cfs_list_del_init (&service->srv_list);
-        cfs_spin_unlock (&ptlrpc_all_services_lock);
+               cfs_timer_disarm(&svcpt->scp_at_timer);
+       } while (0);
+}
 
-        ptlrpc_lprocfs_unregister_service(service);
+static void
+ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
+{
+       struct ptlrpc_service_part        *svcpt;
+       struct ptlrpc_request_buffer_desc *rqbd;
+       struct l_wait_info                lwi;
+       int                               rc;
 
         /* All history will be culled when the next request buffer is
-         * freed */
-        service->srv_max_history_rqbds = 0;
+        * freed in ptlrpc_service_purge_all() */
+        svc->srv_max_history_rqbds = 0;
 
-        CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);
+       rc = LNetClearLazyPortal(svc->srv_req_portal);
+       LASSERT(rc == 0);
 
-        rc = LNetClearLazyPortal(service->srv_req_portal);
-        LASSERT (rc == 0);
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-       /* Unlink all the request buffers.  This forces a 'final' event with
-        * its 'unlink' flag set for each posted rqbd */
-       cfs_list_for_each(tmp, &svcpt->scp_rqbd_posted) {
-                struct ptlrpc_request_buffer_desc *rqbd =
-                        cfs_list_entry(tmp, struct ptlrpc_request_buffer_desc,
-                                       rqbd_list);
+               /* Unlink all the request buffers.  This forces a 'final'
+                * event with its 'unlink' flag set for each posted rqbd */
+               cfs_list_for_each_entry(rqbd, &svcpt->scp_rqbd_posted,
+                                       rqbd_list) {
+                       rc = LNetMDUnlink(rqbd->rqbd_md_h);
+                       LASSERT(rc == 0 || rc == -ENOENT);
+               }
+       } while (0);
 
-                rc = LNetMDUnlink(rqbd->rqbd_md_h);
-                LASSERT (rc == 0 || rc == -ENOENT);
-        }
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-        /* Wait for the network to release any buffers it's currently
-         * filling */
-        for (;;) {
+               /* Wait for the network to release any buffers
+                * it's currently filling */
                cfs_spin_lock(&svcpt->scp_lock);
-               rc = svcpt->scp_nrqbds_posted;
+               while (svcpt->scp_nrqbds_posted != 0) {
+                       cfs_spin_unlock(&svcpt->scp_lock);
+                       /* Network access will complete in finite time but
+                        * the HUGE timeout lets us CWARN for visibility
+                        * of sluggish NALs */
+                       lwi = LWI_TIMEOUT_INTERVAL(
+                                       cfs_time_seconds(LONG_UNLINK),
+                                       cfs_time_seconds(1), NULL, NULL);
+                       rc = l_wait_event(svcpt->scp_waitq,
+                                         svcpt->scp_nrqbds_posted == 0, &lwi);
+                       if (rc == -ETIMEDOUT) {
+                               CWARN("Service %s waiting for "
+                                     "request buffers\n",
+                                     svcpt->scp_service->srv_name);
+                       }
+                       cfs_spin_lock(&svcpt->scp_lock);
+               }
                cfs_spin_unlock(&svcpt->scp_lock);
+       } while (0);
+}
 
-                if (rc == 0)
-                        break;
+static void
+ptlrpc_service_purge_all(struct ptlrpc_service *svc)
+{
+       struct ptlrpc_service_part              *svcpt;
+       struct ptlrpc_request_buffer_desc       *rqbd;
+       struct ptlrpc_request                   *req;
+       struct ptlrpc_reply_state               *rs;
 
-                /* Network access will complete in finite time but the HUGE
-                 * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
-                                           cfs_time_seconds(1), NULL, NULL);
-               rc = l_wait_event(svcpt->scp_waitq,
-                                 svcpt->scp_nrqbds_posted == 0, &lwi);
-               if (rc == -ETIMEDOUT)
-                       CWARN("Service %s waiting for request buffers\n",
-                             service->srv_name);
-       }
+       do { /* iterate over multiple partitions in the future */
+               /* schedule all outstanding replies to terminate them */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-       /* schedule all outstanding replies to terminate them */
-       cfs_spin_lock(&svcpt->scp_rep_lock);
-       while (!cfs_list_empty(&svcpt->scp_rep_active)) {
-               struct ptlrpc_reply_state *rs =
-                       cfs_list_entry(svcpt->scp_rep_active.next,
-                                      struct ptlrpc_reply_state, rs_list);
-               cfs_spin_lock(&rs->rs_lock);
-               ptlrpc_schedule_difficult_reply(rs);
-               cfs_spin_unlock(&rs->rs_lock);
-       }
-       cfs_spin_unlock(&svcpt->scp_rep_lock);
+               cfs_spin_lock(&svcpt->scp_rep_lock);
+               while (!cfs_list_empty(&svcpt->scp_rep_active)) {
+                       rs = cfs_list_entry(svcpt->scp_rep_active.next,
+                                           struct ptlrpc_reply_state, rs_list);
+                       cfs_spin_lock(&rs->rs_lock);
+                       ptlrpc_schedule_difficult_reply(rs);
+                       cfs_spin_unlock(&rs->rs_lock);
+               }
+               cfs_spin_unlock(&svcpt->scp_rep_lock);
+
+               /* purge the request queue.  NB No new replies (rqbds
+                * all unlinked) and no service threads, so I'm the only
+                * thread noodling the request queue now */
+               while (!cfs_list_empty(&svcpt->scp_req_incoming)) {
+                       req = cfs_list_entry(svcpt->scp_req_incoming.next,
+                                            struct ptlrpc_request, rq_list);
+
+                       cfs_list_del(&req->rq_list);
+                       svcpt->scp_nreqs_incoming--;
+                       svcpt->scp_nreqs_active++;
+                       ptlrpc_server_finish_request(svcpt, req);
+               }
 
-       /* purge the request queue.  NB No new replies (rqbds all unlinked)
-        * and no service threads, so I'm the only thread noodling the
-        * request queue now */
-       while (!cfs_list_empty(&svcpt->scp_req_incoming)) {
-               struct ptlrpc_request *req =
-                       cfs_list_entry(svcpt->scp_req_incoming.next,
-                                      struct ptlrpc_request,
-                                      rq_list);
+               while (ptlrpc_server_request_pending(svcpt, 1)) {
+                       req = ptlrpc_server_request_get(svcpt, 1);
+                       cfs_list_del(&req->rq_list);
+                       svcpt->scp_nreqs_active++;
+                       ptlrpc_hpreq_fini(req);
+                       ptlrpc_server_finish_request(svcpt, req);
+               }
 
-               cfs_list_del(&req->rq_list);
-               svcpt->scp_nreqs_incoming--;
-               svcpt->scp_nreqs_active++;
-               ptlrpc_server_finish_request(svcpt, req);
-       }
-       while (ptlrpc_server_request_pending(svcpt, 1)) {
-               struct ptlrpc_request *req;
+               LASSERT(cfs_list_empty(&svcpt->scp_rqbd_posted));
+               LASSERT(svcpt->scp_nreqs_incoming == 0);
+               LASSERT(svcpt->scp_nreqs_active == 0);
+               /* history should have been culled by
+                * ptlrpc_server_finish_request */
+               LASSERT(svcpt->scp_hist_nrqbds == 0);
 
-               req = ptlrpc_server_request_get(svcpt, 1);
-               cfs_list_del(&req->rq_list);
-               svcpt->scp_nreqs_active++;
-               ptlrpc_server_finish_request(svcpt, req);
-       }
-       LASSERT(svcpt->scp_nreqs_incoming == 0);
-       LASSERT(svcpt->scp_nreqs_active == 0);
-       LASSERT(svcpt->scp_hist_nrqbds == 0);
-       LASSERT(cfs_list_empty(&svcpt->scp_rqbd_posted));
-
-       /* Now free all the request buffers since nothing references them
-        * any more... */
-       while (!cfs_list_empty(&svcpt->scp_rqbd_idle)) {
-               struct ptlrpc_request_buffer_desc *rqbd =
-                       cfs_list_entry(svcpt->scp_rqbd_idle.next,
-                                      struct ptlrpc_request_buffer_desc,
-                                      rqbd_list);
-
-               ptlrpc_free_rqbd(rqbd);
-       }
+               /* Now free all the request buffers since nothing
+                * references them any more... */
 
-       ptlrpc_wait_replies(svcpt);
+               while (!cfs_list_empty(&svcpt->scp_rqbd_idle)) {
+                       rqbd = cfs_list_entry(svcpt->scp_rqbd_idle.next,
+                                             struct ptlrpc_request_buffer_desc,
+                                             rqbd_list);
+                       ptlrpc_free_rqbd(rqbd);
+               }
+               ptlrpc_wait_replies(svcpt);
+
+               while (!cfs_list_empty(&svcpt->scp_rep_idle)) {
+                       rs = cfs_list_entry(svcpt->scp_rep_idle.next,
+                                           struct ptlrpc_reply_state,
+                                           rs_list);
+                       cfs_list_del(&rs->rs_list);
+                       OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
+               }
+       } while (0);
+}
 
-       cfs_list_for_each_entry_safe(rs, t, &svcpt->scp_rep_idle, rs_list) {
-               cfs_list_del(&rs->rs_list);
-               OBD_FREE_LARGE(rs, service->srv_max_reply_size);
-       }
+static void
+ptlrpc_service_free(struct ptlrpc_service *svc)
+{
+       struct ptlrpc_service_part      *svcpt;
+       struct ptlrpc_at_array          *array;
 
-       /* In case somebody rearmed this in the meantime */
-       cfs_timer_disarm(&svcpt->scp_at_timer);
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt == NULL || svcpt->scp_service == NULL)
+                       break;
 
-       array = &svcpt->scp_at_array;
-        if (array->paa_reqs_array != NULL) {
-                OBD_FREE(array->paa_reqs_array,
-                         sizeof(cfs_list_t) * array->paa_size);
-                array->paa_reqs_array = NULL;
-        }
+               /* In case somebody rearmed this in the meantime */
+               cfs_timer_disarm(&svcpt->scp_at_timer);
+               array = &svcpt->scp_at_array;
 
-        if (array->paa_reqs_count != NULL) {
-                OBD_FREE(array->paa_reqs_count,
-                         sizeof(__u32) * array->paa_size);
-                array->paa_reqs_count= NULL;
-        }
+               if (array->paa_reqs_array != NULL) {
+                       OBD_FREE(array->paa_reqs_array,
+                                sizeof(cfs_list_t) * array->paa_size);
+                       array->paa_reqs_array = NULL;
+               }
+
+               if (array->paa_reqs_count != NULL) {
+                       OBD_FREE(array->paa_reqs_count,
+                                sizeof(__u32) * array->paa_size);
+                       array->paa_reqs_count = NULL;
+               }
+               svcpt->scp_service = NULL;
+       } while (0);
+
+       do { /* iterate over multiple partitions in the future */
+               svcpt = svc->srv_part;
+               if (svcpt != NULL)
+                       OBD_FREE_PTR(svcpt);
+       } while (0);
+
+       OBD_FREE_PTR(svc);
+}
+
+int ptlrpc_unregister_service(struct ptlrpc_service *service)
+{
+       ENTRY;
+
+       CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);
+
+       service->srv_is_stopping = 1;
+
+       cfs_spin_lock(&ptlrpc_all_services_lock);
+       cfs_list_del_init(&service->srv_list);
+       cfs_spin_unlock(&ptlrpc_all_services_lock);
+
+       ptlrpc_lprocfs_unregister_service(service);
+
+       ptlrpc_service_del_atimer(service);
+       ptlrpc_stop_all_threads(service);
+
+       ptlrpc_service_unlink_rqbd(service);
+       ptlrpc_service_purge_all(service);
+       ptlrpc_service_free(service);
 
-       OBD_FREE_PTR(svcpt);
- out:
-       OBD_FREE_PTR(service);
        RETURN(0);
 }