+/*
+ * Record a lock handle in the request's reply state so it can be dealt
+ * with when the reply is processed.  This hunk adds a 'no_ack' flag
+ * (normalized to 0/1 with !!) stored in rs->rs_no_ack.
+ * NOTE(review): unified-diff fragment; hunk context is elided below.
+ */
void
ptlrpc_save_lock (struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode)
+ struct lustre_handle *lock, int mode, int no_ack)
{
struct ptlrpc_reply_state *rs = req->rq_reply_state;
int idx;
+ /* NOTE(review): the assignment of 'idx' (presumably idx = rs->rs_nlocks++
+ * with a bounds LASSERT) is elided from this hunk — confirm against the
+ * full source before relying on this excerpt. */
rs->rs_locks[idx] = *lock;
rs->rs_modes[idx] = mode;
rs->rs_difficult = 1;
+ rs->rs_no_ack = !!no_ack;
}
+/*
+ * Queue a "difficult" reply state onto its service's reply queue and wake
+ * a service thread, unless it is already scheduled.  This hunk adds
+ * ENTRY/EXIT trace markers, including on the early-return path.
+ * Caller must hold svc->srv_lock (asserted below under CONFIG_SMP).
+ */
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
struct ptlrpc_service *svc = rs->rs_service;
+ ENTRY;
#ifdef CONFIG_SMP
LASSERT (spin_is_locked (&svc->srv_lock));
+ /* NOTE(review): the matching #endif for CONFIG_SMP is elided from this
+ * hunk; in the full source it presumably closes right after the
+ * spin_is_locked assertion. */
LASSERT (rs->rs_difficult);
rs->rs_scheduled_ever = 1; /* flag any notification attempt */
- if (rs->rs_scheduled) /* being set up or already notified */
+ if (rs->rs_scheduled) { /* being set up or already notified */
+ EXIT;
return;
+ }
rs->rs_scheduled = 1;
+ /* move rs onto the service reply queue and wake a handler thread */
list_del (&rs->rs_list);
list_add (&rs->rs_list, &svc->srv_reply_queue);
cfs_waitq_signal (&svc->srv_waitq);
+ EXIT;
}
void
{
struct list_head *tmp;
struct list_head *nxt;
+ ENTRY;
/* Find any replies that have been committed and get their service
* to attend to complete them. */
}
spin_unlock(&obd->obd_uncommitted_replies_lock);
+ EXIT;
}
static int
return NULL;
}
-static void ptlrpc_server_req_decref(struct ptlrpc_request *req)
+/**
+ * to actually free the request, must be called without holding svc_lock.
+ * note it's caller's responsibility to unlink req->rq_list.
+ */
+static void ptlrpc_server_free_request(struct ptlrpc_request *req)
{
- struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+ LASSERT(atomic_read(&req->rq_refcount) == 0);
+ LASSERT(list_empty(&req->rq_timed_list));
- if (!atomic_dec_and_test(&req->rq_refcount))
- return;
+ /* DEBUG_REQ() assumes the reply state of a request with a valid
+ * ref will not be destroyed until that reference is dropped. */
+ ptlrpc_req_drop_rs(req);
sptlrpc_svc_ctx_decref(req);
+ /* NOTE(review): this rewrite splits the old ptlrpc_server_req_decref
+ * into free (here, refcount already 0) and drop (new
+ * ptlrpc_server_drop_request); the rqbd re-queue that used to live in
+ * the else-branch below moved to the drop path. */
- LASSERT(list_empty(&req->rq_timed_list));
- if (req != &rqbd->rqbd_req) {
+ if (req != &req->rq_rqbd->rqbd_req) {
/* NB request buffers use an embedded
* req if the incoming req unlinked the
* MD; this isn't one of them! */
OBD_FREE(req, sizeof(*req));
- } else {
- struct ptlrpc_service *svc = rqbd->rqbd_service;
- /* schedule request buffer for re-use.
- * NB I can only do this after I've disposed of their
- * reqs; particularly the embedded req */
- spin_lock(&svc->srv_lock);
- list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
- spin_unlock(&svc->srv_lock);
}
}
-static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
-{
- list_del(&req->rq_list);
- ptlrpc_req_drop_rs(req);
- ptlrpc_server_req_decref(req);
-}
-
-static void
-ptlrpc_server_free_request(struct ptlrpc_request *req)
+/**
+ * drop a reference count of the request. if it reaches 0, we either
+ * put it into history list, or free it immediately.
+ */
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
struct list_head *tmp;
struct list_head *nxt;
- if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
- DEBUG_REQ(D_INFO, req, "free req");
- spin_lock(&svc->srv_at_lock);
- req->rq_sent_final = 1;
- list_del_init(&req->rq_timed_list);
- spin_unlock(&svc->srv_at_lock);
+ if (!atomic_dec_and_test(&req->rq_refcount))
+ return;
+ /* NOTE(review): this hunk is heavily elided — the history-culling
+ * logic and the loop header walking rqbd->rqbd_reqs are missing from
+ * this excerpt; the fragments below are the loop body and tail. */
spin_lock(&svc->srv_lock);
req = list_entry(rqbd->rqbd_reqs.next,
struct ptlrpc_request,
rq_list);
- __ptlrpc_server_free_request(req);
+ list_del(&req->rq_list);
+ ptlrpc_server_free_request(req);
}
spin_lock(&svc->srv_lock);
+ /*
+ * now all reqs including the embedded req has been
+ * disposed, schedule request buffer for re-use.
+ */
+ LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
+ list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
}
+
+ spin_unlock(&svc->srv_lock);
} else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
- /* If we are low on memory, we are not interested in
- history */
- list_del(&req->rq_history_list);
- __ptlrpc_server_free_request(req);
+ /* If we are low on memory, we are not interested in history */
+ list_del(&req->rq_list);
+ list_del_init(&req->rq_history_list);
+ spin_unlock(&svc->srv_lock);
+
+ ptlrpc_server_free_request(req);
+ } else {
+ spin_unlock(&svc->srv_lock);
}
+}
- spin_unlock(&svc->srv_lock);
+/**
+ * to finish a request: stop sending more early replies, and release
+ * the request. should be called after we finished handling the request.
+ */
+static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+
+ if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
+ DEBUG_REQ(D_INFO, req, "free req");
+ /* mark the final reply sent and unhook from the adaptive-timeout
+ * (early reply) timed list under srv_at_lock, so no further early
+ * replies are generated for this request */
+ spin_lock(&svc->srv_at_lock);
+ req->rq_sent_final = 1;
+ list_del_init(&req->rq_timed_list);
+ spin_unlock(&svc->srv_at_lock);
+
+ /* release our reference; frees or history-queues at refcount 0 */
+ ptlrpc_server_drop_request(req);
+}
/* This function makes sure dead exports are evicted in a timely manner.
EXIT;
}
-#ifndef __KERNEL__
-int lu_context_init(struct lu_context *ctx, __u32 tags)
-{
- return 0;
-}
-
-void lu_context_fini(struct lu_context *ctx)
-{
-}
-
-void lu_context_enter(struct lu_context *ctx)
-{
-}
-
-void lu_context_exit(struct lu_context *ctx)
-{
-}
-
-#endif
-
static int ptlrpc_check_req(struct ptlrpc_request *req)
{
if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
/* Set timer for closest deadline */
rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
rq_timed_list);
- next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
+ next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
at_early_margin);
if (next <= 0)
ptlrpc_at_timer((unsigned long)svc);
if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
return(-ENOSYS);
- DEBUG_REQ(D_ADAPTTO, req, "add timed %lds",
- req->rq_deadline - cfs_time_current_sec());
-
spin_lock(&svc->srv_at_lock);
if (unlikely(req->rq_sent_final)) {
/* Add to sorted list. Presumably latest rpcs will have the latest
deadlines, so search backward. */
list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
- if (req->rq_deadline > rq->rq_deadline) {
+ if (req->rq_deadline >= rq->rq_deadline) {
list_add(&req->rq_timed_list, &rq->rq_timed_list);
found++;
break;
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
struct ptlrpc_request *reqcopy;
struct lustre_msg *reqmsg;
- long olddl = req->rq_deadline - cfs_time_current_sec();
+ cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
time_t newdl;
int rc;
ENTRY;
RETURN(0);
if (olddl < 0) {
- CDEBUG(D_WARNING, "x"LPU64": Already past deadline (%+lds), not"
- " sending early reply. Increase at_early_margin (%d)?\n",
- req->rq_xid, olddl, at_early_margin);
+ DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
+ "not sending early reply. Consider increasing "
+ "at_early_margin (%d)?", olddl, at_early_margin);
+
/* Return an error so we're not re-added to the timed list. */
RETURN(-ETIMEDOUT);
}
if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){
- CDEBUG(D_INFO, "Wanted to ask client for more time, but no AT "
- "support\n");
+ DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, "
+ "but no AT support");
RETURN(-ENOSYS);
}
- if (extra_time) {
- /* Fake our processing time into the future to ask the
- clients for some extra amount of time */
- extra_time += cfs_time_current_sec() -
- req->rq_arrival_time.tv_sec;
- at_add(&svc->srv_at_estimate, extra_time);
+ if (req->rq_export && req->rq_export->exp_in_recovery) {
+ /* don't increase server estimates during recovery, and give
+ clients the full recovery time. */
+ newdl = cfs_time_current_sec() +
+ req->rq_export->exp_obd->obd_recovery_timeout;
+ } else {
+ if (extra_time) {
+ /* Fake our processing time into the future to ask the
+ clients for some extra amount of time */
+ extra_time += cfs_time_current_sec() -
+ req->rq_arrival_time.tv_sec;
+ at_add(&svc->srv_at_estimate, extra_time);
+ }
+ newdl = req->rq_arrival_time.tv_sec +
+ at_get(&svc->srv_at_estimate);
}
-
- newdl = req->rq_arrival_time.tv_sec + at_get(&svc->srv_at_estimate);
if (req->rq_deadline >= newdl) {
/* We're not adding any time, no need to send an early reply
(e.g. maybe at adaptive_max) */
- CDEBUG(D_ADAPTTO, "x"LPU64": Couldn't add any time (%ld/%ld), "
- "not sending early reply\n", req->rq_xid, olddl,
- newdl - cfs_time_current_sec());
+ DEBUG_REQ(D_WARNING, req, "Couldn't add any time ("
+ CFS_DURATION_T"/"CFS_DURATION_T"), "
+ "not sending early reply\n", olddl,
+ cfs_time_sub(newdl, cfs_time_current_sec()));
RETURN(-ETIMEDOUT);
}
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
if (req->rq_sent_final) {
- CDEBUG(D_ADAPTTO, "x"LPU64": normal reply already sent out, "
- "abort sending early reply\n", req->rq_xid);
+ DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
+ "abort sending early reply\n");
GOTO(out, rc = 0);
}
at_get(&svc->srv_at_estimate), delay);
}
- /* ptlrpc_server_free_request may delete an entry out of the work
- list */
+ /* ptlrpc_server_finish_request may delete an entry out of
+ * the work list */
spin_lock(&svc->srv_at_lock);
while (!list_empty(&work_list)) {
rq = list_entry(work_list.next, struct ptlrpc_request,
if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
ptlrpc_at_add_timed(rq);
- ptlrpc_server_req_decref(rq);
+ ptlrpc_server_drop_request(rq);
spin_lock(&svc->srv_at_lock);
}
spin_unlock(&svc->srv_at_lock);
/* req_in handling should/must be fast */
if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
- DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
- cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
+ DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s",
+ cfs_time_sub(cfs_time_current_sec(),
+ req->rq_arrival_time.tv_sec));
/* Set rpc server deadline and add it to the timed list */
deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
spin_unlock(&svc->srv_lock);
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
RETURN(1);
}
spin_unlock(&svc->srv_lock);
+ if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
+ libcfs_debug_dumplog();
+
do_gettimeofday(&work_start);
timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
if (likely(svc->srv_stats != NULL)) {
request->rq_session.lc_thread = thread;
lu_context_enter(&request->rq_session);
- CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
+ CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
request->rq_svc_thread = thread;
if (thread)
The deadline is increased if we send an early reply. */
if (cfs_time_current_sec() > request->rq_deadline) {
DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
- ": deadline %ld%+lds ago\n",
+ ": deadline "CFS_DURATION_T":"CFS_DURATION_T"s ago\n",
libcfs_id2str(request->rq_peer),
- request->rq_deadline -
- request->rq_arrival_time.tv_sec,
- cfs_time_current_sec() - request->rq_deadline);
+ cfs_time_sub(request->rq_deadline,
+ request->rq_arrival_time.tv_sec),
+ cfs_time_sub(cfs_time_current_sec(),
+ request->rq_deadline));
goto put_rpc_export;
}
if (unlikely(cfs_time_current_sec() > request->rq_deadline)) {
DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer "
- "than estimated (%ld%+lds); client may timeout.",
- request->rq_xid, request->rq_deadline -
- request->rq_arrival_time.tv_sec,
- cfs_time_current_sec() - request->rq_deadline);
+ "than estimated ("CFS_DURATION_T":"CFS_DURATION_T"s);"
+ " client may timeout.",
+ request->rq_xid, cfs_time_sub(request->rq_deadline,
+ request->rq_arrival_time.tv_sec),
+ cfs_time_sub(cfs_time_current_sec(),
+ request->rq_deadline));
}
do_gettimeofday(&work_end);
}
if (unlikely(request->rq_early_count)) {
DEBUG_REQ(D_ADAPTTO, request,
- "sent %d early replies before finishing in %lds",
+ "sent %d early replies before finishing in "
+ CFS_DURATION_T"s",
request->rq_early_count,
- work_end.tv_sec - request->rq_arrival_time.tv_sec);
+ cfs_time_sub(work_end.tv_sec,
+ request->rq_arrival_time.tv_sec));
}
out_req:
- ptlrpc_server_free_request(request);
+ ptlrpc_server_finish_request(request);
RETURN(1);
}
if (!rs->rs_on_net) {
/* Off the net */
svc->srv_n_difficult_replies--;
+ if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+ /* wake up threads that are being stopped by
+ ptlrpc_unregister_service/ptlrpc_stop_threads
+ and sleep waiting svr_n_difficult_replies == 0 */
+ cfs_waitq_broadcast(&svc->srv_waitq);
spin_unlock(&svc->srv_lock);
class_export_put (exp);
struct ptlrpc_thread *thread)
{
struct l_wait_info lwi = { 0 };
+ ENTRY;
+ CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
spin_lock(&svc->srv_lock);
thread->t_flags = SVC_STOPPING;
spin_unlock(&svc->srv_lock);
spin_unlock(&svc->srv_lock);
OBD_FREE_PTR(thread);
+ EXIT;
}
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
struct ptlrpc_thread *thread;
+ ENTRY;
spin_lock(&svc->srv_lock);
while (!list_empty(&svc->srv_threads)) {
}
spin_unlock(&svc->srv_lock);
+ EXIT;
}
int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
if (rc == -EMFILE)
break;
if (rc) {
- CERROR("cannot start %s thread #%d: rc %d\n",
+ CERROR("cannot start %s thread #%d: rc %d\n",
svc->srv_thread_name, i, rc);
ptlrpc_stop_all_threads(svc);
}
d.thread = thread;
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
+
/* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
* just drop the VM and FILES in ptlrpc_daemonize() right away.
*/
struct l_wait_info lwi;
struct list_head *tmp;
struct ptlrpc_reply_state *rs, *t;
+ ENTRY;
+ service->srv_is_stopping = 1;
cfs_timer_disarm(&service->srv_at_timer);
ptlrpc_stop_all_threads(service);
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
}
while (!list_empty(&service->srv_request_queue)) {
struct ptlrpc_request *req =
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
}
LASSERT(service->srv_n_queued_reqs == 0);
LASSERT(service->srv_n_active_reqs == 0);
cfs_timer_disarm(&service->srv_at_timer);
OBD_FREE_PTR(service);
- return 0;
+ RETURN(0);
}
/* Returns 0 if the service is healthy.