void
ptlrpc_save_lock (struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode)
+ struct lustre_handle *lock, int mode, int no_ack)
{
struct ptlrpc_reply_state *rs = req->rq_reply_state;
int idx;
rs->rs_locks[idx] = *lock;
rs->rs_modes[idx] = mode;
rs->rs_difficult = 1;
+ rs->rs_no_ack = !!no_ack;
}
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
struct ptlrpc_service *svc = rs->rs_service;
+ ENTRY;
#ifdef CONFIG_SMP
LASSERT (spin_is_locked (&svc->srv_lock));
LASSERT (rs->rs_difficult);
rs->rs_scheduled_ever = 1; /* flag any notification attempt */
- if (rs->rs_scheduled) /* being set up or already notified */
+ if (rs->rs_scheduled) { /* being set up or already notified */
+ EXIT;
return;
+ }
rs->rs_scheduled = 1;
list_del (&rs->rs_list);
list_add (&rs->rs_list, &svc->srv_reply_queue);
cfs_waitq_signal (&svc->srv_waitq);
+ EXIT;
}
void
{
struct list_head *tmp;
struct list_head *nxt;
+ ENTRY;
/* Find any replies that have been committed and get their service
* to attend to complete them. */
}
spin_unlock(&obd->obd_uncommitted_replies_lock);
+ EXIT;
}
static int
/* Add to sorted list. Presumably latest rpcs will have the latest
deadlines, so search backward. */
list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
- if (req->rq_deadline > rq->rq_deadline) {
+ if (req->rq_deadline >= rq->rq_deadline) {
list_add(&req->rq_timed_list, &rq->rq_timed_list);
found++;
break;
RETURN(-ENOSYS);
}
- if (extra_time) {
- /* Fake our processing time into the future to ask the
- clients for some extra amount of time */
- extra_time += cfs_time_current_sec() -
- req->rq_arrival_time.tv_sec;
- at_add(&svc->srv_at_estimate, extra_time);
+ if (req->rq_export && req->rq_export->exp_in_recovery) {
+ /* don't increase server estimates during recovery, and give
+ clients the full recovery time. */
+ newdl = cfs_time_current_sec() +
+ req->rq_export->exp_obd->obd_recovery_timeout;
+ } else {
+ if (extra_time) {
+ /* Fake our processing time into the future to ask the
+ clients for some extra amount of time */
+ extra_time += cfs_time_current_sec() -
+ req->rq_arrival_time.tv_sec;
+ at_add(&svc->srv_at_estimate, extra_time);
+ }
+ newdl = req->rq_arrival_time.tv_sec +
+ at_get(&svc->srv_at_estimate);
}
-
- newdl = req->rq_arrival_time.tv_sec + at_get(&svc->srv_at_estimate);
if (req->rq_deadline >= newdl) {
/* We're not adding any time, no need to send an early reply
(e.g. maybe at adaptive_max) */
if (!rs->rs_on_net) {
/* Off the net */
svc->srv_n_difficult_replies--;
+ if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+ /* wake up threads that are being stopped by
+ ptlrpc_unregister_service/ptlrpc_stop_threads
+ and sleep waiting svr_n_difficult_replies == 0 */
+ cfs_waitq_broadcast(&svc->srv_waitq);
spin_unlock(&svc->srv_lock);
class_export_put (exp);
struct ptlrpc_thread *thread)
{
struct l_wait_info lwi = { 0 };
+ ENTRY;
+ CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
spin_lock(&svc->srv_lock);
thread->t_flags = SVC_STOPPING;
spin_unlock(&svc->srv_lock);
spin_unlock(&svc->srv_lock);
OBD_FREE_PTR(thread);
+ EXIT;
}
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
struct ptlrpc_thread *thread;
+ ENTRY;
spin_lock(&svc->srv_lock);
while (!list_empty(&svc->srv_threads)) {
}
spin_unlock(&svc->srv_lock);
+ EXIT;
}
int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
struct l_wait_info lwi;
struct list_head *tmp;
struct ptlrpc_reply_state *rs, *t;
+ ENTRY;
+ service->srv_is_stopping = 1;
cfs_timer_disarm(&service->srv_at_timer);
ptlrpc_stop_all_threads(service);
cfs_timer_disarm(&service->srv_at_timer);
OBD_FREE_PTR(service);
- return 0;
+ RETURN(0);
}
/* Returns 0 if the service is healthy.