void
ptlrpc_save_lock (struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode)
+ struct lustre_handle *lock, int mode, int no_ack)
{
struct ptlrpc_reply_state *rs = req->rq_reply_state;
int idx;
idx = rs->rs_nlocks++;
rs->rs_locks[idx] = *lock;
rs->rs_modes[idx] = mode;
rs->rs_difficult = 1;
+ rs->rs_no_ack = !!no_ack;
}
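/*
 * Usage sketch (hypothetical handler, not part of this patch): a server-side
 * handler that must keep a lock until the client has seen the reply saves it
 * on the reply state, e.g.
 *
 *     ptlrpc_save_lock(req, &lockh, LCK_EX, 0);
 *
 * Passing no_ack = 1 marks the reply so the saved locks can be released
 * without waiting for the client's ACK of this reply.
 */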
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
struct ptlrpc_service *svc = rs->rs_service;
+ ENTRY;
#ifdef CONFIG_SMP
LASSERT (spin_is_locked (&svc->srv_lock));
#endif
LASSERT (rs->rs_difficult);
rs->rs_scheduled_ever = 1; /* flag any notification attempt */
- if (rs->rs_scheduled) /* being set up or already notified */
+ if (rs->rs_scheduled) { /* being set up or already notified */
+ EXIT;
return;
+ }
rs->rs_scheduled = 1;
list_del (&rs->rs_list);
list_add (&rs->rs_list, &svc->srv_reply_queue);
cfs_waitq_signal (&svc->srv_waitq);
+ EXIT;
}
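/*
 * Locking sketch (implied by the spin_is_locked LASSERT above): callers are
 * expected to hold svc->srv_lock across the call, e.g.
 *
 *     spin_lock(&svc->srv_lock);
 *     ptlrpc_schedule_difficult_reply(rs);
 *     spin_unlock(&svc->srv_lock);
 */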
void
ptlrpc_commit_replies (struct obd_device *obd)
{
struct list_head *tmp;
struct list_head *nxt;
+ ENTRY;
/* Find any replies that have been committed and get their service
* to attend to complete them. */
}
spin_unlock(&obd->obd_uncommitted_replies_lock);
+ EXIT;
}
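/*
 * The elided loop body above walks obd->obd_uncommitted_replies; a plausible
 * sketch (assumed, not shown in this hunk) is that each reply state whose
 * rs_transno is now <= obd->obd_last_committed is pulled off the list and
 * handed to ptlrpc_schedule_difficult_reply() under its service's srv_lock.
 */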
static int
return NULL;
}
-static void ptlrpc_server_req_decref(struct ptlrpc_request *req)
+/**
+ * Actually free the request; must be called without holding svc_lock.
+ * Note it is the caller's responsibility to unlink req->rq_list.
+ */
+static void ptlrpc_server_free_request(struct ptlrpc_request *req)
{
- struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+ LASSERT(atomic_read(&req->rq_refcount) == 0);
+ LASSERT(list_empty(&req->rq_timed_list));
- if (!atomic_dec_and_test(&req->rq_refcount))
- return;
+ /* DEBUG_REQ() assumes the reply state of a request with a valid
+ * ref will not be destroyed until that reference is dropped. */
+ ptlrpc_req_drop_rs(req);
sptlrpc_svc_ctx_decref(req);
- LASSERT(list_empty(&req->rq_timed_list));
- if (req != &rqbd->rqbd_req) {
+ if (req != &req->rq_rqbd->rqbd_req) {
/* NB request buffers use an embedded
* req if the incoming req unlinked the
* MD; this isn't one of them! */
OBD_FREE(req, sizeof(*req));
- } else {
- struct ptlrpc_service *svc = rqbd->rqbd_service;
- /* schedule request buffer for re-use.
- * NB I can only do this after I've disposed of their
- * reqs; particularly the embedded req */
- spin_lock(&svc->srv_lock);
- list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
- spin_unlock(&svc->srv_lock);
}
}
-static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
-{
- list_del(&req->rq_list);
- ptlrpc_req_drop_rs(req);
- ptlrpc_server_req_decref(req);
-}
-
-static void
-ptlrpc_server_free_request(struct ptlrpc_request *req)
+/**
+ * Drop a reference count of the request. If it reaches zero, we either
+ * put it into the history list or free it immediately.
+ */
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
struct list_head *tmp;
struct list_head *nxt;
- if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
- DEBUG_REQ(D_INFO, req, "free req");
- spin_lock(&svc->srv_at_lock);
- req->rq_sent_final = 1;
- list_del_init(&req->rq_timed_list);
- spin_unlock(&svc->srv_at_lock);
+ if (!atomic_dec_and_test(&req->rq_refcount))
+ return;
spin_lock(&svc->srv_lock);
req = list_entry(rqbd->rqbd_reqs.next,
struct ptlrpc_request,
rq_list);
- __ptlrpc_server_free_request(req);
+ list_del(&req->rq_list);
+ ptlrpc_server_free_request(req);
}
spin_lock(&svc->srv_lock);
+ /*
+ * Now all reqs, including the embedded req, have been disposed of;
+ * schedule the request buffer for re-use.
+ */
+ LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
+ list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
}
+
+ spin_unlock(&svc->srv_lock);
} else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
- /* If we are low on memory, we are not interested in
- history */
- list_del(&req->rq_history_list);
- __ptlrpc_server_free_request(req);
+ /* If we are low on memory, we are not interested in history */
+ list_del(&req->rq_list);
+ list_del_init(&req->rq_history_list);
+ spin_unlock(&svc->srv_lock);
+
+ ptlrpc_server_free_request(req);
+ } else {
+ spin_unlock(&svc->srv_lock);
}
+}
- spin_unlock(&svc->srv_lock);
+/**
+ * Finish a request: stop sending more early replies and release the
+ * request. Should be called after we have finished handling the request.
+ */
+static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+
+ if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
+ DEBUG_REQ(D_INFO, req, "free req");
+
+ spin_lock(&svc->srv_at_lock);
+ req->rq_sent_final = 1;
+ list_del_init(&req->rq_timed_list);
+ spin_unlock(&svc->srv_at_lock);
+ ptlrpc_server_drop_request(req);
}
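/*
 * Lifecycle sketch (editorial summary; the dequeue step is paraphrased):
 *
 *     // dequeue req from svc->srv_request_queue, keeping its reference
 *     ...handle the request...
 *     ptlrpc_server_finish_request(req);   // stop early replies, drop ref
 *
 * ptlrpc_server_drop_request() frees or archives the req only once the last
 * reference is gone, which keeps DEBUG_REQ() on a referenced req safe.
 */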
/* This function makes sure dead exports are evicted in a timely manner.
EXIT;
}
-#ifndef __KERNEL__
-int lu_context_init(struct lu_context *ctx, __u32 tags)
-{
- return 0;
-}
-
-void lu_context_fini(struct lu_context *ctx)
-{
-}
-
-void lu_context_enter(struct lu_context *ctx)
-{
-}
-
-void lu_context_exit(struct lu_context *ctx)
-{
-}
-
-#endif
-
static int ptlrpc_check_req(struct ptlrpc_request *req)
{
if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
/* Set timer for closest deadline */
rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
rq_timed_list);
- next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
+ next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
at_early_margin);
if (next <= 0)
ptlrpc_at_timer((unsigned long)svc);
/* Add to sorted list. Presumably latest rpcs will have the latest
deadlines, so search backward. */
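/* With >= below, a new request whose deadline equals an existing one is
 * inserted after it, so equal-deadline requests keep arrival (FIFO) order
 * and the backward scan stops at the first match. */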
list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
- if (req->rq_deadline > rq->rq_deadline) {
+ if (req->rq_deadline >= rq->rq_deadline) {
list_add(&req->rq_timed_list, &rq->rq_timed_list);
found++;
break;
RETURN(-ENOSYS);
}
- if (extra_time) {
- /* Fake our processing time into the future to ask the
- clients for some extra amount of time */
- extra_time += cfs_time_current_sec() -
- req->rq_arrival_time.tv_sec;
- at_add(&svc->srv_at_estimate, extra_time);
+ if (req->rq_export && req->rq_export->exp_in_recovery) {
+ /* don't increase server estimates during recovery, and give
+ clients the full recovery time. */
+ newdl = cfs_time_current_sec() +
+ req->rq_export->exp_obd->obd_recovery_timeout;
+ } else {
+ if (extra_time) {
+ /* Fake our processing time into the future to ask the
+ clients for some extra amount of time */
+ extra_time += cfs_time_current_sec() -
+ req->rq_arrival_time.tv_sec;
+ at_add(&svc->srv_at_estimate, extra_time);
+ }
+ newdl = req->rq_arrival_time.tv_sec +
+ at_get(&svc->srv_at_estimate);
}
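/* Worked example (illustrative numbers only): with arrival time T and a
 * current service estimate of 30s, the normal branch gives newdl = T + 30;
 * during recovery the estimate is left alone and the client instead gets
 * newdl = now + obd_recovery_timeout, i.e. the full recovery window. */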
-
- newdl = req->rq_arrival_time.tv_sec + at_get(&svc->srv_at_estimate);
if (req->rq_deadline >= newdl) {
/* We're not adding any time, no need to send an early reply
(e.g. maybe at adaptive_max) */
at_get(&svc->srv_at_estimate), delay);
}
- /* ptlrpc_server_free_request may delete an entry out of the work
- list */
+ /* ptlrpc_server_finish_request may delete an entry out of
+ * the work list */
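/* Each rq parked on work_list holds a reference, taken when it was moved
 * off srv_at_list (assumed; consistent with the drop call below), which
 * ptlrpc_server_drop_request() releases whether or not the early reply
 * was sent. */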
spin_lock(&svc->srv_at_lock);
while (!list_empty(&work_list)) {
rq = list_entry(work_list.next, struct ptlrpc_request,
if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
ptlrpc_at_add_timed(rq);
- ptlrpc_server_req_decref(rq);
+ ptlrpc_server_drop_request(rq);
spin_lock(&svc->srv_at_lock);
}
spin_unlock(&svc->srv_at_lock);
svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
spin_unlock(&svc->srv_lock);
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
RETURN(1);
}
spin_unlock(&svc->srv_lock);
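/* OBD_FAIL_CHECK() is Lustre's fault-injection hook: when the fail_loc is
 * set to OBD_FAIL_PTLRPC_DUMP_LOG, dump the debug log at this point. */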
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
+ libcfs_debug_dumplog();
+
do_gettimeofday(&work_start);
timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
if (likely(svc->srv_stats != NULL)) {
request->rq_session.lc_thread = thread;
lu_context_enter(&request->rq_session);
- CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
+ CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
request->rq_svc_thread = thread;
if (thread)
}
out_req:
- ptlrpc_server_free_request(request);
+ ptlrpc_server_finish_request(request);
RETURN(1);
}
if (!rs->rs_on_net) {
/* Off the net */
svc->srv_n_difficult_replies--;
+ if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+ /* wake up threads that are being stopped by
+ * ptlrpc_unregister_service/ptlrpc_stop_threads and are sleeping
+ * waiting for srv_n_difficult_replies == 0 */
+ cfs_waitq_broadcast(&svc->srv_waitq);
spin_unlock(&svc->srv_lock);
class_export_put (exp);
struct ptlrpc_thread *thread)
{
struct l_wait_info lwi = { 0 };
+ ENTRY;
+ CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
spin_lock(&svc->srv_lock);
thread->t_flags = SVC_STOPPING;
spin_unlock(&svc->srv_lock);
spin_unlock(&svc->srv_lock);
OBD_FREE_PTR(thread);
+ EXIT;
}
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
struct ptlrpc_thread *thread;
+ ENTRY;
spin_lock(&svc->srv_lock);
while (!list_empty(&svc->srv_threads)) {
}
spin_unlock(&svc->srv_lock);
+ EXIT;
}
int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
if (rc == -EMFILE)
break;
if (rc) {
- CERROR("cannot start %s thread #%d: rc %d\n",
+ CERROR("cannot start %s thread #%d: rc %d\n",
svc->srv_thread_name, i, rc);
ptlrpc_stop_all_threads(svc);
}
d.thread = thread;
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
+
/* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
* just drop the VM and FILES in ptlrpc_daemonize() right away.
*/
struct l_wait_info lwi;
struct list_head *tmp;
struct ptlrpc_reply_state *rs, *t;
+ ENTRY;
+ service->srv_is_stopping = 1;
cfs_timer_disarm(&service->srv_at_timer);
ptlrpc_stop_all_threads(service);
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
}
while (!list_empty(&service->srv_request_queue)) {
struct ptlrpc_request *req =
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
}
LASSERT(service->srv_n_queued_reqs == 0);
LASSERT(service->srv_n_active_reqs == 0);
cfs_timer_disarm(&service->srv_at_timer);
OBD_FREE_PTR(service);
- return 0;
+ RETURN(0);
}
/* Returns 0 if the service is healthy.