return NULL;
}
-static void ptlrpc_server_req_decref(struct ptlrpc_request *req)
+/**
+ * to actually free the request, must be called without holding svc_lock.
+ * note it's caller's responsibility to unlink req->rq_list.
+ */
+static void ptlrpc_server_free_request(struct ptlrpc_request *req)
{
- struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+ LASSERT(atomic_read(&req->rq_refcount) == 0);
+ LASSERT(list_empty(&req->rq_timed_list));
- if (!atomic_dec_and_test(&req->rq_refcount))
- return;
+ /* DEBUG_REQ() assumes the reply state of a request with a valid
+ * ref will not be destroyed until that reference is dropped. */
+ ptlrpc_req_drop_rs(req);
sptlrpc_svc_ctx_decref(req);
- LASSERT(list_empty(&req->rq_timed_list));
- if (req != &rqbd->rqbd_req) {
+ if (req != &req->rq_rqbd->rqbd_req) {
/* NB request buffers use an embedded
* req if the incoming req unlinked the
* MD; this isn't one of them! */
OBD_FREE(req, sizeof(*req));
- } else {
- struct ptlrpc_service *svc = rqbd->rqbd_service;
- /* schedule request buffer for re-use.
- * NB I can only do this after I've disposed of their
- * reqs; particularly the embedded req */
- spin_lock(&svc->srv_lock);
- list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
- spin_unlock(&svc->srv_lock);
}
}
-static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
-{
- list_del(&req->rq_list);
- ptlrpc_req_drop_rs(req);
- ptlrpc_server_req_decref(req);
-}
-
-static void
-ptlrpc_server_free_request(struct ptlrpc_request *req)
+/**
+ * drop a reference count of the request. if it reaches 0, we either
+ * put it into history list, or free it immediately.
+ */
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
struct list_head *tmp;
struct list_head *nxt;
- if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
- DEBUG_REQ(D_INFO, req, "free req");
- spin_lock(&svc->srv_at_lock);
- req->rq_sent_final = 1;
- list_del_init(&req->rq_timed_list);
- spin_unlock(&svc->srv_at_lock);
+ if (!atomic_dec_and_test(&req->rq_refcount))
+ return;
spin_lock(&svc->srv_lock);
req = list_entry(rqbd->rqbd_reqs.next,
struct ptlrpc_request,
rq_list);
- __ptlrpc_server_free_request(req);
+ list_del(&req->rq_list);
+ ptlrpc_server_free_request(req);
}
spin_lock(&svc->srv_lock);
+ /*
+ * now all reqs including the embedded req has been
+ * disposed, schedule request buffer for re-use.
+ */
+ LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
+ list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
}
+
+ spin_unlock(&svc->srv_lock);
} else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
- /* If we are low on memory, we are not interested in
- history */
+ /* If we are low on memory, we are not interested in history */
+ list_del(&req->rq_list);
list_del_init(&req->rq_history_list);
- __ptlrpc_server_free_request(req);
+ spin_unlock(&svc->srv_lock);
+
+ ptlrpc_server_free_request(req);
+ } else {
+ spin_unlock(&svc->srv_lock);
}
+}
- spin_unlock(&svc->srv_lock);
+/**
+ * to finish a request: stop sending more early replies, and release
+ * the request. should be called after we finished handling the request.
+ */
+static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+
+ if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
+ DEBUG_REQ(D_INFO, req, "free req");
+ spin_lock(&svc->srv_at_lock);
+ req->rq_sent_final = 1;
+ list_del_init(&req->rq_timed_list);
+ spin_unlock(&svc->srv_at_lock);
+
+ ptlrpc_server_drop_request(req);
}
/* This function makes sure dead exports are evicted in a timely manner.
at_get(&svc->srv_at_estimate), delay);
}
- /* ptlrpc_server_free_request may delete an entry out of the work
- list */
+ /* ptlrpc_server_finish_request may delete an entry out of
+ * the work list */
spin_lock(&svc->srv_at_lock);
while (!list_empty(&work_list)) {
rq = list_entry(work_list.next, struct ptlrpc_request,
if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
ptlrpc_at_add_timed(rq);
- ptlrpc_server_req_decref(rq);
+ ptlrpc_server_drop_request(rq);
spin_lock(&svc->srv_at_lock);
}
spin_unlock(&svc->srv_at_lock);
svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
spin_unlock(&svc->srv_lock);
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
RETURN(1);
}
}
out_req:
- ptlrpc_server_free_request(request);
+ ptlrpc_server_finish_request(request);
RETURN(1);
}
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
}
while (!list_empty(&service->srv_request_queue)) {
struct ptlrpc_request *req =
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
}
LASSERT(service->srv_n_queued_reqs == 0);
LASSERT(service->srv_n_active_reqs == 0);