static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
{
return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
- svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+ svc->srv_threads_running < svc->srv_threads_started - 2;
}
static struct ptlrpc_request *
-ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+ptlrpc_server_request_get(struct ptlrpc_service *svc)
{
struct ptlrpc_request *req = NULL;
ENTRY;
- if (ptlrpc_server_allow_normal(svc, force) &&
- !cfs_list_empty(&svc->srv_request_queue) &&
+ if (!cfs_list_empty(&svc->srv_request_queue) &&
(cfs_list_empty(&svc->srv_request_hpq) ||
svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
req = cfs_list_entry(svc->srv_request_queue.next,
LASSERT(svc);
cfs_spin_lock(&svc->srv_lock);
- if (unlikely(!ptlrpc_server_request_pending(svc, 0) ||
- (
#ifndef __KERNEL__
- /* !@%$# liblustre only has 1 thread */
- cfs_atomic_read(&svc->srv_n_difficult_replies) != 0 &&
-#endif
- svc->srv_n_active_reqs >= (svc->srv_threads_running - 1)))) {
- /* Don't handle regular requests in the last thread, in order * re
- * to handle difficult replies (which might block other threads)
- * as well as handle any incoming reqs, early replies, etc.
- * That means we always need at least 2 service threads. */
+ /* !@%$# liblustre only has 1 thread */
+ if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
cfs_spin_unlock(&svc->srv_lock);
RETURN(0);
- }
-
- request = ptlrpc_server_request_get(svc, 0);
+ }
+#endif
+ request = ptlrpc_server_request_get(svc);
if (request == NULL) {
cfs_spin_unlock(&svc->srv_lock);
RETURN(0);
cfs_spin_unlock(&svc->srv_lock);
OBD_FAIL_TIMEOUT(fail_opc, 4);
cfs_spin_lock(&svc->srv_lock);
- request = ptlrpc_server_request_get(svc, 0);
+ request = ptlrpc_server_request_get(svc);
if (request == NULL) {
cfs_spin_unlock(&svc->srv_lock);
RETURN(0);
}
- LASSERT(ptlrpc_server_request_pending(svc, 0));
}
}
return (-ETIMEDOUT);
}
+/**
+ * Status bits to pass todo info from
+ * ptlrpc_main_check_event to ptlrpc_main.
+ */
+#define PTLRPC_MAIN_STOPPING 0x01
+#define PTLRPC_MAIN_IN_REQ 0x02
+#define PTLRPC_MAIN_ACTIVE_REQ 0x04
+#define PTLRPC_MAIN_CHECK_TIMED 0x08
+#define PTLRPC_MAIN_REPOST 0x10
+
+/**
+ * A container to share per-thread status variables between
+ * ptlrpc_main_check_event and ptlrpc_main functions.
+ */
+struct ptlrpc_main_check_s {
+ /** todo info for the ptrlrpc_main */
+ int todo;
+ /** is this thread counted as running or not? */
+ int running;
+};
+
+/**
+ * Check whether current service thread has work to do.
+ */
+static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
+ struct ptlrpc_main_check_s *status)
+{
+ struct ptlrpc_service *svc = t->t_svc;
+ ENTRY;
+
+ status->todo = 0;
+
+ /* check the stop flags w/o any locking to make all
+ * concurrently running threads stop faster. */
+ if (unlikely((t->t_flags & SVC_STOPPING) ||
+ svc->srv_is_stopping)) {
+ status->todo |= PTLRPC_MAIN_STOPPING;
+ goto out;
+ }
+
+ cfs_spin_lock(&svc->srv_lock);
+ /* count this thread as not running before possible sleep in
+ * the outer wait event if it is not done yet. */
+ if (status->running) {
+ LASSERT(svc->srv_threads_running > 0);
+ svc->srv_threads_running--;
+ status->running = 0;
+ }
+ /* Process all incoming reqs before handling any */
+ if (!cfs_list_empty(&svc->srv_req_in_queue)) {
+ status->todo |= PTLRPC_MAIN_IN_REQ;
+ }
+ /* Don't handle regular requests in the last thread, in order
+ * to handle any incoming reqs, early replies, etc. */
+ if (ptlrpc_server_request_pending(svc, 0) &&
+ (svc->srv_threads_running < (svc->srv_threads_started - 1))) {
+ status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
+ }
+ if (svc->srv_at_check) {
+ status->todo |= PTLRPC_MAIN_CHECK_TIMED;
+ }
+ if ((!cfs_list_empty(&svc->srv_idle_rqbds) &&
+ svc->srv_rqbd_timeout == 0)) {
+ status->todo |= PTLRPC_MAIN_REPOST;
+ }
+ /* count this thread as active if it goes out the outer
+ * wait event */
+ if (status->todo) {
+ svc->srv_threads_running++;
+ status->running = 1;
+ }
+ cfs_spin_unlock(&svc->srv_lock);
+ out:
+ RETURN(status->todo);
+}
+
+/**
+ * Main prlrpc service thread routine.
+ */
static int ptlrpc_main(void *arg)
{
struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
struct ptlrpc_thread *thread = data->thread;
struct obd_device *dev = data->dev;
struct ptlrpc_reply_state *rs;
+ struct ptlrpc_main_check_s st;
#ifdef WITH_GROUP_INFO
cfs_group_info_t *ginfo = NULL;
#endif
thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
cfs_spin_lock(&svc->srv_lock);
- svc->srv_threads_running++;
cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
cfs_spin_unlock(&svc->srv_lock);
cfs_waitq_signal(&svc->srv_free_rs_waitq);
/* XXX maintain a list of all managed devices: insert here */
- while (!(thread->t_flags & SVC_STOPPING) && !svc->srv_is_stopping) {
+ st.running = 0;
+ st.todo = 0;
+
+ while (!(st.todo & PTLRPC_MAIN_STOPPING)) {
/* Don't exit while there are replies to be handled */
struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
ptlrpc_retry_rqbds, svc);
cfs_cond_resched();
l_wait_event_exclusive (svc->srv_waitq,
- thread->t_flags & SVC_STOPPING ||
- svc->srv_is_stopping ||
- (!cfs_list_empty(&svc->srv_idle_rqbds) &&
- svc->srv_rqbd_timeout == 0) ||
- !cfs_list_empty(&svc->srv_req_in_queue) ||
- (ptlrpc_server_request_pending(svc, 0) &&
- (svc->srv_n_active_reqs <
- (svc->srv_threads_running - 1))) ||
- svc->srv_at_check,
- &lwi);
-
- if (thread->t_flags & SVC_STOPPING || svc->srv_is_stopping)
- break;
+ ptlrpc_main_check_event(thread, &st),
+ &lwi);
lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
/* Ignore return code - we tried... */
ptlrpc_start_thread(dev, svc);
- if (!cfs_list_empty(&svc->srv_req_in_queue)) {
- /* Process all incoming reqs before handling any */
+ if (st.todo & PTLRPC_MAIN_IN_REQ) {
ptlrpc_server_handle_req_in(svc);
/* but limit ourselves in case of flood */
if (counter++ < 1000)
continue;
counter = 0;
}
-
- if (svc->srv_at_check)
+ if (st.todo & PTLRPC_MAIN_CHECK_TIMED) {
ptlrpc_at_check_timed(svc);
-
- /* don't handle requests in the last thread */
- if (ptlrpc_server_request_pending(svc, 0) &&
- (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
+ }
+ if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) {
lu_context_enter(&env.le_ctx);
ptlrpc_server_handle_request(svc, thread);
lu_context_exit(&env.le_ctx);
}
-
- if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
+ if ((st.todo & PTLRPC_MAIN_REPOST) &&
ptlrpc_server_post_idle_rqbds(svc) < 0) {
- /* I just failed to repost request buffers. Wait
- * for a timeout (unless something else happens)
- * before I try again */
+ /* I just failed to repost request buffers.
+ * Wait for a timeout (unless something else
+ * happens) before I try again */
svc->srv_rqbd_timeout = cfs_time_seconds(1)/10;
CDEBUG(D_RPCTRACE,"Posted buffers: %d\n",
svc->srv_nrqbd_receiving);
CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
thread, thread->t_pid, thread->t_id, rc);
- cfs_spin_lock(&svc->srv_lock);
- svc->srv_threads_running--; /* must know immediately */
+ if (st.running) {
+ cfs_spin_lock(&svc->srv_lock);
+ svc->srv_threads_running--;
+ cfs_spin_unlock(&svc->srv_lock);
+ }
+
thread->t_id = rc;
thread->t_flags = SVC_STOPPED;
cfs_waitq_signal(&thread->t_ctl_waitq);
- cfs_spin_unlock(&svc->srv_lock);
-
return rc;
}
while (ptlrpc_server_request_pending(service, 1)) {
struct ptlrpc_request *req;
- req = ptlrpc_server_request_get(service, 1);
+ req = ptlrpc_server_request_get(service);
cfs_list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;