Whamcloud - gitweb
branch: HEAD
author: ericm <ericm>
Tue, 14 Oct 2008 18:59:51 +0000 (18:59 +0000)
committer: ericm <ericm>
Tue, 14 Oct 2008 18:59:51 +0000 (18:59 +0000)
do not repost buffer before all requests are finished.
b=17228
r=wangdi
r=nathan

lustre/ptlrpc/service.c

index 20fa81f..1f08a77 100644 (file)
@@ -400,41 +400,34 @@ failed:
         return NULL;
 }
 
-static void ptlrpc_server_req_decref(struct ptlrpc_request *req)
+/**
+ * to actually free the request, must be called without holding svc_lock.
+ * note it's caller's responsibility to unlink req->rq_list.
+ */
+static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 {
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+        LASSERT(atomic_read(&req->rq_refcount) == 0);
+        LASSERT(list_empty(&req->rq_timed_list));
 
-        if (!atomic_dec_and_test(&req->rq_refcount))
-                return;
+         /* DEBUG_REQ() assumes the reply state of a request with a valid
+          * ref will not be destroyed until that reference is dropped. */
+        ptlrpc_req_drop_rs(req);
 
         sptlrpc_svc_ctx_decref(req);
 
-        LASSERT(list_empty(&req->rq_timed_list));
-        if (req != &rqbd->rqbd_req) {
+        if (req != &req->rq_rqbd->rqbd_req) {
                 /* NB request buffers use an embedded
                  * req if the incoming req unlinked the
                  * MD; this isn't one of them! */
                 OBD_FREE(req, sizeof(*req));
-        } else {
-                struct ptlrpc_service *svc = rqbd->rqbd_service;
-                /* schedule request buffer for re-use.
-                 * NB I can only do this after I've disposed of their
-                 * reqs; particularly the embedded req */
-                spin_lock(&svc->srv_lock);
-                list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
-                spin_unlock(&svc->srv_lock);
         }
 }
 
-static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
-{
-        list_del(&req->rq_list);
-        ptlrpc_req_drop_rs(req);
-        ptlrpc_server_req_decref(req);
-}
-
-static void
-ptlrpc_server_free_request(struct ptlrpc_request *req)
+/**
+ * drop a reference count of the request. if it reaches 0, we either
+ * put it into history list, or free it immediately.
+ */
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 {
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service             *svc = rqbd->rqbd_service;
@@ -442,12 +435,8 @@ ptlrpc_server_free_request(struct ptlrpc_request *req)
         struct list_head                  *tmp;
         struct list_head                  *nxt;
 
-        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
-                DEBUG_REQ(D_INFO, req, "free req");
-        spin_lock(&svc->srv_at_lock);
-        req->rq_sent_final = 1;
-        list_del_init(&req->rq_timed_list);
-        spin_unlock(&svc->srv_at_lock);
+        if (!atomic_dec_and_test(&req->rq_refcount))
+                return;
 
         spin_lock(&svc->srv_lock);
 
@@ -490,20 +479,49 @@ ptlrpc_server_free_request(struct ptlrpc_request *req)
                                 req = list_entry(rqbd->rqbd_reqs.next,
                                                  struct ptlrpc_request,
                                                  rq_list);
-                                __ptlrpc_server_free_request(req);
+                                list_del(&req->rq_list);
+                                ptlrpc_server_free_request(req);
                         }
 
                         spin_lock(&svc->srv_lock);
+                        /*
+                         * now all reqs including the embedded req has been
+                         * disposed, schedule request buffer for re-use.
+                         */
+                        LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
+                        list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
                 }
+
+                spin_unlock(&svc->srv_lock);
         } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
-                 /* If we are low on memory, we are not interested in
-                    history */
+                /* If we are low on memory, we are not interested in history */
+                list_del(&req->rq_list);
                 list_del_init(&req->rq_history_list);
-                __ptlrpc_server_free_request(req);
+                spin_unlock(&svc->srv_lock);
+
+                ptlrpc_server_free_request(req);
+        } else {
+                spin_unlock(&svc->srv_lock);
         }
+}
 
-        spin_unlock(&svc->srv_lock);
+/**
+ * to finish a request: stop sending more early replies, and release
+ * the request. should be called after we finished handling the request.
+ */
+static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+{
+        struct ptlrpc_service  *svc = req->rq_rqbd->rqbd_service;
+
+        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
+                DEBUG_REQ(D_INFO, req, "free req");
 
+        spin_lock(&svc->srv_at_lock);
+        req->rq_sent_final = 1;
+        list_del_init(&req->rq_timed_list);
+        spin_unlock(&svc->srv_at_lock);
+
+        ptlrpc_server_drop_request(req);
 }
 
 /* This function makes sure dead exports are evicted in a timely manner.
@@ -889,8 +907,8 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                       at_get(&svc->srv_at_estimate), delay);
         }
 
-        /* ptlrpc_server_free_request may delete an entry out of the work
-           list */
+        /* ptlrpc_server_finish_request may delete an entry out of
+         * the work list */
         spin_lock(&svc->srv_at_lock);
         while (!list_empty(&work_list)) {
                 rq = list_entry(work_list.next, struct ptlrpc_request,
@@ -904,7 +922,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                 if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
                         ptlrpc_at_add_timed(rq);
 
-                ptlrpc_server_req_decref(rq);
+                ptlrpc_server_drop_request(rq);
                 spin_lock(&svc->srv_at_lock);
         }
         spin_unlock(&svc->srv_at_lock);
@@ -1028,7 +1046,7 @@ err_req:
         svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         spin_unlock(&svc->srv_lock);
-        ptlrpc_server_free_request(req);
+        ptlrpc_server_finish_request(req);
 
         RETURN(1);
 }
@@ -1203,7 +1221,7 @@ put_conn:
         }
 
 out_req:
-        ptlrpc_server_free_request(request);
+        ptlrpc_server_finish_request(request);
 
         RETURN(1);
 }
@@ -1775,7 +1793,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_free_request(req);
+                ptlrpc_server_finish_request(req);
         }
         while (!list_empty(&service->srv_request_queue)) {
                 struct ptlrpc_request *req =
@@ -1787,7 +1805,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
 
-                ptlrpc_server_free_request(req);
+                ptlrpc_server_finish_request(req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);