rs->rs_no_ack = !!no_ack;
}
+#ifdef __KERNEL__
+
+#define HRT_RUNNING 0
+#define HRT_STOPPING 1
+
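+/**
+ * State of one reply handling thread: its reply queue, the lock and
+ * wait queue protecting it, and start/stop synchronization.
+ */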
+struct ptlrpc_hr_thread {
+ spinlock_t hrt_lock;
+ unsigned long hrt_flags;
+ cfs_waitq_t hrt_wait;
+ struct list_head hrt_queue;
+ struct completion hrt_completion;
+};
+
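+/**
+ * Per-node reply handling service: a flexible array of reply handling
+ * threads indexed round-robin. hr_index is advanced without
+ * serialization, since reply distribution only needs to be
+ * approximately even; hr_size records the allocation size for freeing.
+ */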
+struct ptlrpc_hr_service {
+ int hr_index;
+ int hr_n_threads;
+ int hr_size;
+ struct ptlrpc_hr_thread hr_threads[0];
+};
+
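+/**
+ * A batch of reply states collected under one service lock and handed
+ * to a reply handling thread in a single operation.
+ */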
+struct rs_batch {
+ struct list_head rsb_replies;
+ struct ptlrpc_service *rsb_svc;
+ unsigned int rsb_n_replies;
+};
+
+/**
+ * A pointer to the per-node reply handling service.
+ */
+static struct ptlrpc_hr_service *ptlrpc_hr = NULL;
+
+/**
+ * Maximum number of replies scheduled in one batch.
+ */
+#define MAX_SCHEDULED 256
+
+/**
+ * Initialize a reply batch.
+ *
+ * \param b batch
+ */
+static void rs_batch_init(struct rs_batch *b)
+{
+ memset(b, 0, sizeof *b);
+ CFS_INIT_LIST_HEAD(&b->rsb_replies);
+}
+
+/**
+ * Dispatch all replies accumulated in the batch to one of the
+ * dedicated reply handling threads.
+ *
+ * \param b batch
+ */
+static void rs_batch_dispatch(struct rs_batch *b)
+{
+ if (b->rsb_n_replies != 0) {
+ struct ptlrpc_hr_service *hr = ptlrpc_hr;
+ int idx;
+
+ idx = hr->hr_index++;
+ if (hr->hr_index >= hr->hr_n_threads)
+ hr->hr_index = 0;
+
+ spin_lock(&hr->hr_threads[idx].hrt_lock);
+ list_splice_init(&b->rsb_replies,
+ &hr->hr_threads[idx].hrt_queue);
+ spin_unlock(&hr->hr_threads[idx].hrt_lock);
+ cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
+ b->rsb_n_replies = 0;
+ }
+}
+
+/**
+ * Add a reply to a batch.
+ * Add one reply object to a batch; if the batch is full, or the reply
+ * belongs to a different service, dispatch the accumulated replies first.
+ *
+ * \param b batch
+ * \param rs reply
+ */
+static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
+{
+ struct ptlrpc_service *svc = rs->rs_service;
+
+ if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
+ if (b->rsb_svc != NULL) {
+ rs_batch_dispatch(b);
+ spin_unlock(&b->rsb_svc->srv_lock);
+ }
+ spin_lock(&svc->srv_lock);
+ b->rsb_svc = svc;
+ }
+ spin_lock(&rs->rs_lock);
+ rs->rs_scheduled_ever = 1;
+ if (rs->rs_scheduled == 0) {
+ list_move(&rs->rs_list, &b->rsb_replies);
+ rs->rs_scheduled = 1;
+ b->rsb_n_replies++;
+ }
+ spin_unlock(&rs->rs_lock);
+}
+
+/**
+ * Reply batch finalization.
+ * Dispatch any remaining replies from the batch and release the
+ * service spinlock if it is still held.
+ *
+ * \param b batch
+ */
+static void rs_batch_fini(struct rs_batch *b)
+{
+ if (b->rsb_svc != NULL) {
+ rs_batch_dispatch(b);
+ spin_unlock(&b->rsb_svc->srv_lock);
+ }
+}
+
+#define DECLARE_RS_BATCH(b) struct rs_batch b
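+
+/*
+ * A sketch of the intended calling pattern (the commit-time reply scan
+ * below is the real user):
+ *
+ *	DECLARE_RS_BATCH(batch);
+ *
+ *	rs_batch_init(&batch);
+ *	list_for_each_entry(...)
+ *		rs_batch_add(&batch, rs);
+ *	rs_batch_fini(&batch);
+ */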
+
+#else /* __KERNEL__ */
+
+#define rs_batch_init(b) do { } while (0)
+#define rs_batch_fini(b) do { } while (0)
+#define rs_batch_add(b, r) ptlrpc_schedule_difficult_reply(r)
+#define DECLARE_RS_BATCH(b)
+
+#endif /* __KERNEL__ */
+
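+/**
+ * Hand a single difficult reply over for processing: in the kernel,
+ * queue it on a reply handling thread chosen round-robin and wake that
+ * thread; in liblustre, put it on the service reply queue.
+ *
+ * \param rs reply state, not yet linked on any list
+ */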
+void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
+{
+#ifdef __KERNEL__
+ struct ptlrpc_hr_service *hr = ptlrpc_hr;
+ int idx;
+ ENTRY;
+
+ LASSERT(list_empty(&rs->rs_list));
+
+ idx = hr->hr_index++;
+ if (hr->hr_index >= hr->hr_n_threads)
+ hr->hr_index = 0;
+ spin_lock(&hr->hr_threads[idx].hrt_lock);
+ list_add_tail(&rs->rs_list, &hr->hr_threads[idx].hrt_queue);
+ spin_unlock(&hr->hr_threads[idx].hrt_lock);
+ cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
+ EXIT;
+#else
+ list_add_tail(&rs->rs_list, &rs->rs_service->srv_reply_queue);
+#endif
+}
+
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
- struct ptlrpc_service *svc = rs->rs_service;
ENTRY;
- LASSERT_SPIN_LOCKED(&svc->srv_lock);
+ LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock);
+ LASSERT_SPIN_LOCKED(&rs->rs_lock);
LASSERT (rs->rs_difficult);
rs->rs_scheduled_ever = 1; /* flag any notification attempt */
}
rs->rs_scheduled = 1;
- list_del (&rs->rs_list);
- list_add (&rs->rs_list, &svc->srv_reply_queue);
- cfs_waitq_signal (&svc->srv_waitq);
+ list_del_init(&rs->rs_list);
+ ptlrpc_dispatch_difficult_reply(rs);
EXIT;
}
{
struct list_head *tmp;
struct list_head *nxt;
+ DECLARE_RS_BATCH(batch);
ENTRY;
+ rs_batch_init(&batch);
/* Find any replies that have been committed and get their service
* to attend to complete them. */
/* CAVEAT EMPTOR: spinlock ordering!!! */
spin_lock(&obd->obd_uncommitted_replies_lock);
-
list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
struct ptlrpc_reply_state *rs =
list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
LASSERT (rs->rs_difficult);
if (rs->rs_transno <= obd->obd_last_committed) {
- struct ptlrpc_service *svc = rs->rs_service;
-
- spin_lock (&svc->srv_lock);
- list_del_init (&rs->rs_obd_list);
- ptlrpc_schedule_difficult_reply (rs);
- spin_unlock (&svc->srv_lock);
+ list_del_init(&rs->rs_obd_list);
+ rs_batch_add(&batch, rs);
}
}
-
spin_unlock(&obd->obd_uncommitted_replies_lock);
+ rs_batch_fini(&batch);
EXIT;
}
CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
CFS_INIT_LIST_HEAD(&service->srv_request_history);
CFS_INIT_LIST_HEAD(&service->srv_active_replies);
+#ifndef __KERNEL__
CFS_INIT_LIST_HEAD(&service->srv_reply_queue);
+#endif
CFS_INIT_LIST_HEAD(&service->srv_free_rs_list);
cfs_waitq_init(&service->srv_free_rs_waitq);
+ atomic_set(&service->srv_n_difficult_replies, 0);
spin_lock_init(&service->srv_at_lock);
CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
(
#ifndef __KERNEL__
/* !@%$# liblustre only has 1 thread */
- svc->srv_n_difficult_replies != 0 &&
+ atomic_read(&svc->srv_n_difficult_replies) != 0 &&
#endif
svc->srv_n_active_reqs >= (svc->srv_threads_running - 1)))) {
/* Don't handle regular requests in the last thread, in order to
 * reserve it for handling difficult replies */
RETURN(1);
}
+/**
+ * An internal function to process a single reply state object.
+ */
static int
-ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
+ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
{
- struct ptlrpc_reply_state *rs;
+ struct ptlrpc_service *svc = rs->rs_service;
struct obd_export *exp;
struct obd_device *obd;
int nlocks;
int been_handled;
ENTRY;
- spin_lock(&svc->srv_lock);
- if (list_empty (&svc->srv_reply_queue)) {
- spin_unlock(&svc->srv_lock);
- RETURN(0);
- }
-
- rs = list_entry (svc->srv_reply_queue.next,
- struct ptlrpc_reply_state, rs_list);
-
exp = rs->rs_export;
obd = exp->exp_obd;
LASSERT (rs->rs_difficult);
LASSERT (rs->rs_scheduled);
-
- list_del_init (&rs->rs_list);
-
- /* Disengage from notifiers carefully (lock order - irqrestore below!)*/
- spin_unlock(&svc->srv_lock);
-
- spin_lock (&obd->obd_uncommitted_replies_lock);
- /* Noop if removed already */
- list_del_init (&rs->rs_obd_list);
- spin_unlock (&obd->obd_uncommitted_replies_lock);
+ LASSERT (list_empty(&rs->rs_list));
spin_lock (&exp->exp_lock);
/* Noop if removed already */
list_del_init (&rs->rs_exp_list);
spin_unlock (&exp->exp_lock);
- spin_lock(&svc->srv_lock);
+ spin_lock(&rs->rs_lock);
been_handled = rs->rs_handled;
rs->rs_handled = 1;
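+ /* Removal from rs_obd_list needs obd_uncommitted_replies_lock,
+ * which must be taken before rs_lock, so drop rs_lock and
+ * re-take both in the right order */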
+ if (!list_empty(&rs->rs_obd_list)) {
+ spin_unlock(&rs->rs_lock);
+ spin_lock(&obd->obd_uncommitted_replies_lock);
+ spin_lock(&rs->rs_lock);
+ list_del_init(&rs->rs_obd_list);
+ spin_unlock(&obd->obd_uncommitted_replies_lock);
+ }
+
nlocks = rs->rs_nlocks; /* atomic "steal", but */
rs->rs_nlocks = 0; /* locks still on rs_locks! */
}
if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
- spin_unlock(&svc->srv_lock);
+ spin_unlock(&rs->rs_lock);
if (!been_handled && rs->rs_on_net) {
LNetMDUnlink(rs->rs_md_h);
ldlm_lock_decref(&rs->rs_locks[nlocks],
rs->rs_modes[nlocks]);
- spin_lock(&svc->srv_lock);
+ spin_lock(&rs->rs_lock);
}
rs->rs_scheduled = 0;
if (!rs->rs_on_net) {
/* Off the net */
- svc->srv_n_difficult_replies--;
- if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
- /* wake up threads that are being stopped by
- ptlrpc_unregister_service/ptlrpc_stop_threads
- and sleep waiting svr_n_difficult_replies == 0 */
- cfs_waitq_broadcast(&svc->srv_waitq);
- spin_unlock(&svc->srv_lock);
+ spin_unlock(&rs->rs_lock);
class_export_put (exp);
rs->rs_export = NULL;
ptlrpc_rs_decref (rs);
atomic_dec (&svc->srv_outstanding_replies);
+ if (atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
+ svc->srv_is_stopping)
+ cfs_waitq_broadcast(&svc->srv_waitq);
RETURN(1);
}
/* still on the net; callback will schedule */
- spin_unlock(&svc->srv_lock);
+ spin_unlock(&rs->rs_lock);
RETURN(1);
}
#ifndef __KERNEL__
+
+/**
+ * Check whether the given service has a reply available for
+ * processing, and process it if so.
+ *
+ * \param svc a ptlrpc service
+ * \retval 0 no replies processed
+ * \retval 1 one reply processed
+ */
+static int
+ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
+{
+ struct ptlrpc_reply_state *rs = NULL;
+ ENTRY;
+
+ spin_lock(&svc->srv_lock);
+ if (!list_empty(&svc->srv_reply_queue)) {
+ rs = list_entry(svc->srv_reply_queue.prev,
+ struct ptlrpc_reply_state,
+ rs_list);
+ list_del_init(&rs->rs_list);
+ }
+ spin_unlock(&svc->srv_lock);
+ if (rs != NULL)
+ ptlrpc_handle_rs(rs);
+ RETURN(rs != NULL);
+}
+
/* FIXME make use of timeout later */
int
liblustre_check_services (void *arg)
/* XXX maintain a list of all managed devices: insert here */
- while ((thread->t_flags & SVC_STOPPING) == 0 ||
- svc->srv_n_difficult_replies != 0) {
+ while ((thread->t_flags & SVC_STOPPING) == 0) {
/* Don't exit while there are replies to be handled */
struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
ptlrpc_retry_rqbds, svc);
cond_resched();
l_wait_event_exclusive (svc->srv_waitq,
- ((thread->t_flags & SVC_STOPPING) != 0 &&
- svc->srv_n_difficult_replies == 0) ||
+ ((thread->t_flags & SVC_STOPPING) != 0) ||
(!list_empty(&svc->srv_idle_rqbds) &&
svc->srv_rqbd_timeout == 0) ||
!list_empty(&svc->srv_req_in_queue) ||
- !list_empty(&svc->srv_reply_queue) ||
(ptlrpc_server_request_pending(svc, 0) &&
(svc->srv_n_active_reqs <
(svc->srv_threads_running - 1))) ||
ptlrpc_start_thread(dev, svc);
}
- if (!list_empty(&svc->srv_reply_queue))
- ptlrpc_server_handle_reply(svc);
-
if (!list_empty(&svc->srv_req_in_queue)) {
/* Process all incoming reqs before handling any */
ptlrpc_server_handle_req_in(svc);
return rc;
}
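+/**
+ * Arguments passed on the stack to a starting reply handling thread;
+ * the starter waits for HRT_RUNNING before they go out of scope.
+ */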
+struct ptlrpc_hr_args {
+ int thread_index;
+ int cpu_index;
+ struct ptlrpc_hr_service *hrs;
+};
+
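+/**
+ * Drain the thread's queue onto the caller's reply list under hrt_lock.
+ *
+ * \param t reply handling thread
+ * \param replies list to move queued replies onto
+ * \retval 1 the thread must not sleep: it is stopping or has replies
+ * \retval 0 nothing to do yet
+ */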
+static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
+ struct list_head *replies)
+{
+ int result;
+
+ spin_lock(&t->hrt_lock);
+ list_splice_init(&t->hrt_queue, replies);
+ result = test_bit(HRT_STOPPING, &t->hrt_flags) ||
+ !list_empty(replies);
+ spin_unlock(&t->hrt_lock);
+ return result;
+}
+
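+/**
+ * Main loop of a reply handling thread: bind near the assigned CPU's
+ * node when possible, then drain and handle queued reply states until
+ * HRT_STOPPING is set.
+ */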
+static int ptlrpc_hr_main(void *arg)
+{
+ struct ptlrpc_hr_args *hr_args = arg;
+ struct ptlrpc_hr_service *hr = hr_args->hrs;
+ struct ptlrpc_hr_thread *t = &hr->hr_threads[hr_args->thread_index];
+ char threadname[20];
+ CFS_LIST_HEAD(replies);
+
+ snprintf(threadname, sizeof(threadname),
+ "ptlrpc_hr_%d", hr_args->thread_index);
+
+ ptlrpc_daemonize(threadname);
+#if defined(HAVE_NODE_TO_CPUMASK)
+ set_cpus_allowed(cfs_current(),
+ node_to_cpumask(cpu_to_node(hr_args->cpu_index)));
+#endif
+ set_bit(HRT_RUNNING, &t->hrt_flags);
+ cfs_waitq_signal(&t->hrt_wait);
+
+ while (!test_bit(HRT_STOPPING, &t->hrt_flags)) {
+
+ cfs_wait_event(t->hrt_wait, hrt_dont_sleep(t, &replies));
+ while (!list_empty(&replies)) {
+ struct ptlrpc_reply_state *rs;
+
+ rs = list_entry(replies.prev,
+ struct ptlrpc_reply_state,
+ rs_list);
+ list_del_init(&rs->rs_list);
+ ptlrpc_handle_rs(rs);
+ }
+ }
+
+ clear_bit(HRT_RUNNING, &t->hrt_flags);
+ complete(&t->hrt_completion);
+
+ return 0;
+}
+
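+/**
+ * Start one reply handling thread and wait until it is running.
+ *
+ * \param hr reply handling service
+ * \param n thread index
+ * \param cpu CPU to bind the thread near
+ */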
+static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
+{
+ struct ptlrpc_hr_thread *t = &hr->hr_threads[n];
+ struct ptlrpc_hr_args args;
+ int rc;
+ ENTRY;
+
+ args.thread_index = n;
+ args.cpu_index = cpu;
+ args.hrs = hr;
+
+ rc = cfs_kernel_thread(ptlrpc_hr_main, (void *)&args,
+ CLONE_VM|CLONE_FILES);
+ if (rc < 0) {
+ complete(&t->hrt_completion);
+ GOTO(out, rc);
+ }
+ cfs_wait_event(t->hrt_wait, test_bit(HRT_RUNNING, &t->hrt_flags));
+ RETURN(0);
+ out:
+ return rc;
+}
+
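+/**
+ * Ask one reply handling thread to stop and wait for it to exit.
+ *
+ * \param t thread to stop
+ */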
+static void ptlrpc_stop_hr_thread(struct ptlrpc_hr_thread *t)
+{
+ ENTRY;
+
+ set_bit(HRT_STOPPING, &t->hrt_flags);
+ cfs_waitq_signal(&t->hrt_wait);
+ wait_for_completion(&t->hrt_completion);
+
+ EXIT;
+}
+
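+/**
+ * Stop all reply handling threads.
+ *
+ * \param hrs reply handling service
+ */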
+static void ptlrpc_stop_hr_threads(struct ptlrpc_hr_service *hrs)
+{
+ int n;
+ ENTRY;
+
+ for (n = 0; n < hrs->hr_n_threads; n++)
+ ptlrpc_stop_hr_thread(&hrs->hr_threads[n]);
+
+ EXIT;
+}
+
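+/**
+ * Start the configured number of reply handling threads, spreading
+ * them across online CPUs when CPU affinity support is available.
+ *
+ * \param hr reply handling service
+ */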
+static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
+{
+ int rc = -ENOMEM;
+ int n, cpu, threads_started = 0;
+ ENTRY;
+
+ LASSERT(hr != NULL);
+ LASSERT(hr->hr_n_threads > 0);
+
+ for (n = 0, cpu = 0; n < hr->hr_n_threads; n++) {
+#if defined(HAVE_NODE_TO_CPUMASK)
+ while (!cpu_online(cpu)) {
+ cpu++;
+ if (cpu >= num_possible_cpus())
+ cpu = 0;
+ }
+#endif
+ rc = ptlrpc_start_hr_thread(hr, n, cpu);
+ if (rc != 0)
+ break;
+ threads_started++;
+ cpu++;
+ }
+ if (threads_started == 0) {
+ CERROR("No reply handling threads started\n");
+ RETURN(-ESRCH);
+ }
+ if (threads_started < hr->hr_n_threads) {
+ CWARN("Started only %d reply handling threads from %d\n",
+ threads_started, hr->hr_n_threads);
+ hr->hr_n_threads = threads_started;
+ }
+ RETURN(0);
+}
+
static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
struct ptlrpc_thread *thread)
{
rc = (thread->t_flags & SVC_STOPPED) ? thread->t_id : 0;
RETURN(rc);
}
-#endif
+
+
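+/**
+ * Create the per-node reply handling service with one thread per
+ * online CPU and start the threads. Must only be called once.
+ */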
+int ptlrpc_hr_init(void)
+{
+ int i;
+ int n_cpus = num_online_cpus();
+ struct ptlrpc_hr_service *hr;
+ int size;
+ ENTRY;
+
+ LASSERT(ptlrpc_hr == NULL);
+
+ size = offsetof(struct ptlrpc_hr_service, hr_threads[n_cpus]);
+ OBD_ALLOC(hr, size);
+ if (hr == NULL)
+ RETURN(-ENOMEM);
+ for (i = 0; i < n_cpus; i++) {
+ struct ptlrpc_hr_thread *t = &hr->hr_threads[i];
+
+ spin_lock_init(&t->hrt_lock);
+ cfs_waitq_init(&t->hrt_wait);
+ CFS_INIT_LIST_HEAD(&t->hrt_queue);
+ init_completion(&t->hrt_completion);
+ }
+ hr->hr_n_threads = n_cpus;
+ hr->hr_size = size;
+ ptlrpc_hr = hr;
+
+ RETURN(ptlrpc_start_hr_threads(hr));
+}
+
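+/**
+ * Stop the reply handling threads and release the per-node reply
+ * handling service.
+ */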
+void ptlrpc_hr_fini(void)
+{
+ if (ptlrpc_hr != NULL) {
+ ptlrpc_stop_hr_threads(ptlrpc_hr);
+ OBD_FREE(ptlrpc_hr, ptlrpc_hr->hr_size);
+ ptlrpc_hr = NULL;
+ }
+}
+
+#endif /* __KERNEL__ */
+
+/**
+ * Wait until all already scheduled replies are processed.
+ */
+static void ptlrpc_wait_replies(struct ptlrpc_service *svc)
+{
+ while (1) {
+ int rc;
+ struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10),
+ NULL, NULL);
+ rc = l_wait_event(svc->srv_waitq,
+ atomic_read(&svc->srv_n_difficult_replies) == 0,
+ &lwi);
+ if (rc == 0)
+ break;
+ CWARN("Unexpectedly long timeout %p\n", svc);
+ }
+}
int ptlrpc_unregister_service(struct ptlrpc_service *service)
{
struct ptlrpc_reply_state *rs =
list_entry(service->srv_active_replies.next,
struct ptlrpc_reply_state, rs_list);
+ spin_lock(&rs->rs_lock);
ptlrpc_schedule_difficult_reply(rs);
+ spin_unlock(&rs->rs_lock);
}
spin_unlock(&service->srv_lock);
ptlrpc_free_rqbd(rqbd);
}
- /* wait for all outstanding replies to complete (they were
- * scheduled having been flagged to abort above) */
- while (atomic_read(&service->srv_outstanding_replies) != 0) {
- struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10), NULL, NULL);
-
- rc = l_wait_event(service->srv_waitq,
- !list_empty(&service->srv_reply_queue), &lwi);
- LASSERT(rc == 0 || rc == -ETIMEDOUT);
-
- if (rc == 0) {
- ptlrpc_server_handle_reply(service);
- continue;
- }
- CWARN("Unexpectedly long timeout %p\n", service);
- }
+ ptlrpc_wait_replies(service);
list_for_each_entry_safe(rs, t, &service->srv_free_rs_list, rs_list) {
list_del(&rs->rs_list);