#endif
};
+enum {
+ SVC_STOPPED = 1 << 0,
+ SVC_STOPPING = 1 << 1,
+ SVC_STARTING = 1 << 2,
+ SVC_RUNNING = 1 << 3,
+ SVC_EVENT = 1 << 4,
+ SVC_SIGNAL = 1 << 5,
+};
+
/**
* Definition of server service thread structure
*/
struct ptlrpc_request rqbd_req;
};
-typedef int (*svc_handler_t)(struct ptlrpc_request *req);
-typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *);
-typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+typedef int (*svc_thr_init_t)(struct ptlrpc_thread *thread);
+typedef void (*svc_thr_done_t)(struct ptlrpc_thread *thread);
+typedef int (*svc_handler_t)(struct ptlrpc_request *req);
+typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+typedef void (*svc_req_printfn_t)(void *, struct ptlrpc_request *);
+
+#ifndef __cfs_cacheline_aligned
+/* NB: put it here for reducing patch dependence */
+# define __cfs_cacheline_aligned
+#endif
/**
* How many high priority requests to serve before serving one normal
* The service is listening on a particular portal (like tcp port)
* and perform actions for a specific server like IO service for OST
* or general metadata service for MDS.
+ *
+ * ptlrpc service has four locks:
+ * \a srv_lock
+ * serialize operations on rqbd and requests waiting for preprocess
+ * \a srv_rq_lock
+ * serialize operations on active requests sent to this portal
+ * \a srv_at_lock
+ * serialize adaptive timeout stuff
+ * \a srv_rs_lock
+ * serialize operations on RS list (reply states)
+ *
+ * We don't have any use-case to take two or more locks at the same time
+ * for now, so there is no lock order issue.
*/
struct ptlrpc_service {
- cfs_list_t srv_list; /* chain thru all services */
- int srv_max_req_size; /* biggest request to receive */
- int srv_max_reply_size; /* biggest reply to send */
- int srv_buf_size; /* size of individual buffers */
- int srv_nbuf_per_group; /* # buffers to allocate in 1 group */
- int srv_nbufs; /* total # req buffer descs allocated */
- int srv_threads_min; /* threads to start at SOW */
- int srv_threads_max; /* thread upper limit */
- int srv_threads_started; /* index of last started thread */
- int srv_threads_running; /* # running threads */
- cfs_atomic_t srv_n_difficult_replies; /* # 'difficult' replies */
- int srv_n_active_reqs; /* # reqs being served */
- int srv_n_hpreq; /* # HPreqs being served */
- cfs_duration_t srv_rqbd_timeout; /* timeout before re-posting reqs, in tick */
- int srv_watchdog_factor; /* soft watchdog timeout multiplier */
- unsigned srv_cpu_affinity:1; /* bind threads to CPUs */
- unsigned srv_at_check:1; /* check early replies */
- unsigned srv_is_stopping:1; /* under unregister_service */
- cfs_time_t srv_at_checktime; /* debug */
-
- /** Local portal on which to receive requests */
- __u32 srv_req_portal;
- /** Portal on the client to send replies to */
- __u32 srv_rep_portal;
-
- /** AT stuff */
+ /** most often accessed fields */
+ /** chain thru all services */
+ cfs_list_t srv_list;
+ /** only statically allocated strings here; we don't clean them */
+ char *srv_name;
+ /** only statically allocated strings here; we don't clean them */
+ char *srv_thread_name;
+ /** service thread list */
+ cfs_list_t srv_threads;
+ /** threads to start at beginning of service */
+ int srv_threads_min;
+ /** thread upper limit */
+ int srv_threads_max;
+ /** always increasing number */
+ unsigned srv_threads_next_id;
+ /** # of starting threads */
+ int srv_threads_starting;
+ /** # running threads */
+ int srv_threads_running;
+
+ /** service operations, move to ptlrpc_svc_ops_t in the future */
/** @{ */
- struct adaptive_timeout srv_at_estimate;/* estimated rpc service time */
- cfs_spinlock_t srv_at_lock;
- struct ptlrpc_at_array srv_at_array; /* reqs waiting for replies */
- cfs_timer_t srv_at_timer; /* early reply timer */
- /** @} */
-
- int srv_n_queued_reqs; /* # reqs in either of the queues below */
- int srv_hpreq_count; /* # hp requests handled */
- int srv_hpreq_ratio; /* # hp per lp reqs to handle */
- cfs_list_t srv_req_in_queue; /* incoming reqs */
- cfs_list_t srv_request_queue; /* reqs waiting for service */
- cfs_list_t srv_request_hpq; /* high priority queue */
-
- cfs_list_t srv_request_history; /* request history */
- __u64 srv_request_seq; /* next request sequence # */
- __u64 srv_request_max_cull_seq; /* highest seq culled from history */
- svcreq_printfn_t srv_request_history_print_fn; /* service-specific print fn */
-
- cfs_list_t srv_idle_rqbds; /* request buffers to be reposted */
- cfs_list_t srv_active_rqbds; /* req buffers receiving */
- cfs_list_t srv_history_rqbds; /* request buffer history */
- int srv_nrqbd_receiving; /* # posted request buffers */
- int srv_n_history_rqbds; /* # request buffers in history */
- int srv_max_history_rqbds;/* max # request buffers in history */
-
- cfs_atomic_t srv_outstanding_replies;
- cfs_list_t srv_active_replies; /* all the active replies */
-#ifndef __KERNEL__
- cfs_list_t srv_reply_queue; /* replies waiting for service */
-#endif
- cfs_waitq_t srv_waitq; /* all threads sleep on this. This
- * wait-queue is signalled when new
- * incoming request arrives and when
- * difficult reply has to be handled. */
-
- cfs_list_t srv_threads; /* service thread list */
+ /**
+ * if non-NULL called during thread creation (ptlrpc_start_thread())
+ * to initialize service specific per-thread state.
+ */
+ svc_thr_init_t srv_init;
+ /**
+ * if non-NULL called during thread shutdown (ptlrpc_main()) to
+ * destruct state created by ->srv_init().
+ */
+ svc_thr_done_t srv_done;
/** Handler function for incoming requests for this service */
- svc_handler_t srv_handler;
- svc_hpreq_handler_t srv_hpreq_handler; /* hp request handler */
-
- char *srv_name; /* only statically allocated strings here; we don't clean them */
- char *srv_thread_name; /* only statically allocated strings here; we don't clean them */
-
- cfs_spinlock_t srv_lock;
+ svc_handler_t srv_handler;
+ /** hp request handler */
+ svc_hpreq_handler_t srv_hpreq_handler;
+ /** service-specific print fn */
+ svc_req_printfn_t srv_req_printfn;
+ /** @} */
/** Root of /proc dir tree for this service */
- cfs_proc_dir_entry_t *srv_procroot;
+ cfs_proc_dir_entry_t *srv_procroot;
/** Pointer to statistic data for this service */
- struct lprocfs_stats *srv_stats;
-
- /** List of free reply_states */
- cfs_list_t srv_free_rs_list;
- /** waitq to run, when adding stuff to srv_free_rs_list */
- cfs_waitq_t srv_free_rs_waitq;
-
+ struct lprocfs_stats *srv_stats;
+ /** # hp per lp reqs to handle */
+ int srv_hpreq_ratio;
+ /** biggest request to receive */
+ int srv_max_req_size;
+ /** biggest reply to send */
+ int srv_max_reply_size;
+ /** size of individual buffers */
+ int srv_buf_size;
+ /** # buffers to allocate in 1 group */
+ int srv_nbuf_per_group;
+ /** Local portal on which to receive requests */
+ __u32 srv_req_portal;
+ /** Portal on the client to send replies to */
+ __u32 srv_rep_portal;
/**
* Tags for lu_context associated with this thread, see struct
* lu_context.
*/
- __u32 srv_ctx_tags;
+ __u32 srv_ctx_tags;
+ /** soft watchdog timeout multiplier */
+ int srv_watchdog_factor;
+ /** bind threads to CPUs */
+ unsigned srv_cpu_affinity:1;
+ /** under unregister_service */
+ unsigned srv_is_stopping:1;
+
/**
- * if non-NULL called during thread creation (ptlrpc_start_thread())
- * to initialize service specific per-thread state.
+ * serialize the following fields, used for protecting
+ * rqbd list and incoming requests waiting for preprocess
*/
- int (*srv_init)(struct ptlrpc_thread *thread);
+ cfs_spinlock_t srv_lock __cfs_cacheline_aligned;
+ /** incoming reqs */
+ cfs_list_t srv_req_in_queue;
+ /** total # req buffer descs allocated */
+ int srv_nbufs;
+ /** # posted request buffers */
+ int srv_nrqbd_receiving;
+ /** timeout before re-posting reqs, in tick */
+ cfs_duration_t srv_rqbd_timeout;
+ /** request buffers to be reposted */
+ cfs_list_t srv_idle_rqbds;
+ /** req buffers receiving */
+ cfs_list_t srv_active_rqbds;
+ /** request buffer history */
+ cfs_list_t srv_history_rqbds;
+ /** # request buffers in history */
+ int srv_n_history_rqbds;
+ /** max # request buffers in history */
+ int srv_max_history_rqbds;
+ /** request history */
+ cfs_list_t srv_request_history;
+ /** next request sequence # */
+ __u64 srv_request_seq;
+ /** highest seq culled from history */
+ __u64 srv_request_max_cull_seq;
/**
- * if non-NULL called during thread shutdown (ptlrpc_main()) to
- * destruct state created by ->srv_init().
+ * all threads sleep on this. This wait-queue is signalled when new
+ * incoming request arrives and when difficult reply has to be handled.
+ */
+ cfs_waitq_t srv_waitq;
+
+ /**
+ * serialize the following fields, used for processing requests
+ * sent to this portal
*/
- void (*srv_done)(struct ptlrpc_thread *thread);
+ cfs_spinlock_t srv_rq_lock __cfs_cacheline_aligned;
+ /** # reqs in either of the queues below */
+ /** reqs waiting for service */
+ cfs_list_t srv_request_queue;
+ /** high priority queue */
+ cfs_list_t srv_request_hpq;
+ /** # incoming reqs */
+ int srv_n_queued_reqs;
+ /** # reqs being served */
+ int srv_n_active_reqs;
+ /** # HPreqs being served */
+ int srv_n_active_hpreq;
+ /** # hp requests handled */
+ int srv_hpreq_count;
+
+ /** AT stuff */
+ /** @{ */
+ /**
+ * serialize the following fields, used for changes on
+ * adaptive timeout
+ */
+ cfs_spinlock_t srv_at_lock __cfs_cacheline_aligned;
+ /** estimated rpc service time */
+ struct adaptive_timeout srv_at_estimate;
+ /** reqs waiting for replies */
+ struct ptlrpc_at_array srv_at_array;
+ /** early reply timer */
+ cfs_timer_t srv_at_timer;
+ /** check early replies */
+ unsigned srv_at_check;
+ /** debug */
+ cfs_time_t srv_at_checktime;
+ /** @} */
+ /**
+ * serialize the following fields, used for processing
+ * replies for this portal
+ */
+ cfs_spinlock_t srv_rs_lock __cfs_cacheline_aligned;
+ /** all the active replies */
+ cfs_list_t srv_active_replies;
+#ifndef __KERNEL__
+ /** replies waiting for service */
+ cfs_list_t srv_reply_queue;
+#endif
+ /** List of free reply_states */
+ cfs_list_t srv_free_rs_list;
+ /** waitq to run, when adding stuff to srv_free_rs_list */
+ cfs_waitq_t srv_free_rs_waitq;
+ /** # 'difficult' replies */
+ cfs_atomic_t srv_n_difficult_replies;
//struct ptlrpc_srv_ni srv_interfaces[0];
};
struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
svc_handler_t h, char *name,
struct proc_dir_entry *proc_entry,
- svcreq_printfn_t prntfn,
+ svc_req_printfn_t prntfn,
char *threadname);
struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
int watchdog_factor,
svc_handler_t, char *name,
cfs_proc_dir_entry_t *proc_entry,
- svcreq_printfn_t,
+ svc_req_printfn_t,
int min_threads, int max_threads,
char *threadname, __u32 ctx_tags,
svc_hpreq_handler_t);
if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
if (b->rsb_svc != NULL) {
rs_batch_dispatch(b);
- cfs_spin_unlock(&b->rsb_svc->srv_lock);
+ cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
}
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rs_lock);
b->rsb_svc = svc;
}
cfs_spin_lock(&rs->rs_lock);
{
if (b->rsb_svc != 0) {
rs_batch_dispatch(b);
- cfs_spin_unlock(&b->rsb_svc->srv_lock);
+ cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
}
}
{
ENTRY;
- LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock);
+ LASSERT_SPIN_LOCKED(&rs->rs_service->srv_rs_lock);
LASSERT_SPIN_LOCKED(&rs->rs_lock);
LASSERT (rs->rs_difficult);
- rs->rs_scheduled_ever = 1; /* flag any notification attempt */
+ rs->rs_scheduled_ever = 1; /* flag any notification attempt */
- if (rs->rs_scheduled) { /* being set up or already notified */
+ if (rs->rs_scheduled) { /* being set up or already notified */
EXIT;
return;
}
struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
svc_handler_t h, char *name,
struct proc_dir_entry *proc_entry,
- svcreq_printfn_t prntfn,
+ svc_req_printfn_t prntfn,
char *threadname)
{
return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
int req_portal, int rep_portal, int watchdog_factor,
svc_handler_t handler, char *name,
cfs_proc_dir_entry_t *proc_entry,
- svcreq_printfn_t svcreq_printfn,
+ svc_req_printfn_t svcreq_printfn,
int min_threads, int max_threads,
char *threadname, __u32 ctx_tags,
svc_hpreq_handler_t hp_handler)
service->srv_name = name;
cfs_spin_lock_init(&service->srv_lock);
+ cfs_spin_lock_init(&service->srv_rq_lock);
+ cfs_spin_lock_init(&service->srv_rs_lock);
CFS_INIT_LIST_HEAD(&service->srv_threads);
cfs_waitq_init(&service->srv_waitq);
service->srv_req_portal = req_portal;
service->srv_watchdog_factor = watchdog_factor;
service->srv_handler = handler;
- service->srv_request_history_print_fn = svcreq_printfn;
+ service->srv_req_printfn = svcreq_printfn;
service->srv_request_seq = 1; /* valid seq #s start at 1 */
service->srv_request_max_cull_seq = 0;
service->srv_threads_min = min_threads;
service->srv_hpreq_handler = hp_handler;
service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
service->srv_hpreq_count = 0;
- service->srv_n_hpreq = 0;
+ service->srv_n_active_hpreq = 0;
rc = LNetSetLazyPortal(service->srv_req_portal);
LASSERT (rc == 0);
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
svc->srv_n_active_reqs++;
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
}
/**
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
svc->srv_n_active_reqs--;
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
}
/**
cfs_spin_lock(&svc->srv_lock);
- svc->srv_n_active_reqs--;
cfs_list_add(&req->rq_list, &rqbd->rqbd_reqs);
refcount = --(rqbd->rqbd_refcount);
* to finish a request: stop sending more early replies, and release
* the request. should be called after we finished handling the request.
*/
-static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
+ cfs_spin_lock(&svc->srv_rq_lock);
+ svc->srv_n_active_reqs--;
+ if (req->rq_hp)
+ svc->srv_n_active_hpreq--;
+ cfs_spin_unlock(&svc->srv_rq_lock);
+
ptlrpc_server_drop_request(req);
}
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
ENTRY;
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
/* It may happen that the request is already taken for the processing
* but still in the export list, do not re-add it into the HP list. */
if (req->rq_phase == RQ_PHASE_NEW)
ptlrpc_hpreq_reorder_nolock(svc, req);
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
EXIT;
}
if (rc < 0)
RETURN(rc);
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
/* Before inserting the request into the queue, check if it is not
* inserted yet, or even already handled -- it may happen due to
* a racing ldlm_server_blocking_ast(). */
cfs_list_add_tail(&req->rq_list,
&svc->srv_request_queue);
}
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
RETURN(0);
}
/**
+ * Allow handling of a high priority request
+ * User can call it w/o any lock but needs to hold ptlrpc_service::srv_rq_lock
+ * to get a reliable result
+ */
+static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force)
+{
+ if (force)
+ return 1;
+
+ if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+ return 0;
+
+ return cfs_list_empty(&svc->srv_request_queue) ||
+ svc->srv_hpreq_count < svc->srv_hpreq_ratio;
+}
+
+static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
+{
+ return ptlrpc_server_allow_high(svc, force) &&
+ !cfs_list_empty(&svc->srv_request_hpq);
+}
+
+/**
* Only allow normal priority requests on a service that has a high-priority
* queue if forced (i.e. cleanup), if there are other high priority requests
* already being processed (i.e. those threads can service more high-priority
* requests), or if there are enough idle threads that a later thread can do
* a high priority request.
+ * User can call it w/o any lock but needs to hold ptlrpc_service::srv_rq_lock
+ * to get a reliable result
*/
static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
{
- return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
- svc->srv_threads_running <= svc->srv_threads_started - 2;
+ if (force ||
+ svc->srv_n_active_reqs < svc->srv_threads_running - 2)
+ return 1;
+
+ if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+ return 0;
+
+ return svc->srv_n_active_hpreq > 0 || svc->srv_hpreq_handler == NULL;
+}
+
+static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force)
+{
+ return ptlrpc_server_allow_normal(svc, force) &&
+ !cfs_list_empty(&svc->srv_request_queue);
+}
+
+/**
+ * Returns true if there are requests available in incoming
+ * request queue for processing and it is allowed to fetch them.
+ * User can call it w/o any lock but needs to hold ptlrpc_service::srv_rq_lock
+ * to get a reliable result
+ * \see ptlrpc_server_allow_normal
+ * \see ptlrpc_server_allow_high
+ */
+static inline int
+ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+ return ptlrpc_server_high_pending(svc, force) ||
+ ptlrpc_server_normal_pending(svc, force);
}
/**
static struct ptlrpc_request *
ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
{
- struct ptlrpc_request *req = NULL;
+ struct ptlrpc_request *req;
ENTRY;
- if (ptlrpc_server_allow_normal(svc, force) &&
- !cfs_list_empty(&svc->srv_request_queue) &&
- (cfs_list_empty(&svc->srv_request_hpq) ||
- svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
- req = cfs_list_entry(svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
- svc->srv_hpreq_count = 0;
- } else if (!cfs_list_empty(&svc->srv_request_hpq)) {
+ if (ptlrpc_server_high_pending(svc, force)) {
req = cfs_list_entry(svc->srv_request_hpq.next,
struct ptlrpc_request, rq_list);
svc->srv_hpreq_count++;
+ RETURN(req);
+
}
- RETURN(req);
-}
-/**
- * Returns true if there are requests available in incoming
- * request queue for processing and it is allowed to fetch them
- * \see ptlrpc_server_allow_normal
- */
-static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
-{
- return ((ptlrpc_server_allow_normal(svc, force) &&
- !cfs_list_empty(&svc->srv_request_queue)) ||
- !cfs_list_empty(&svc->srv_request_hpq));
+ if (ptlrpc_server_normal_pending(svc, force)) {
+ req = cfs_list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
+ svc->srv_hpreq_count = 0;
+ RETURN(req);
+ }
+ RETURN(NULL);
}
/**
req = cfs_list_entry(svc->srv_req_in_queue.next,
struct ptlrpc_request, rq_list);
cfs_list_del_init (&req->rq_list);
+ svc->srv_n_queued_reqs--;
/* Consider this still a "queued" request as far as stats are
concerned */
cfs_spin_unlock(&svc->srv_lock);
RETURN(1);
err_req:
- cfs_spin_lock(&svc->srv_lock);
- svc->srv_n_queued_reqs--;
+ cfs_spin_lock(&svc->srv_rq_lock);
svc->srv_n_active_reqs++;
- cfs_spin_unlock(&svc->srv_lock);
- ptlrpc_server_finish_request(req);
+ cfs_spin_unlock(&svc->srv_rq_lock);
+ ptlrpc_server_finish_request(svc, req);
RETURN(1);
}
LASSERT(svc);
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
#ifndef __KERNEL__
/* !@%$# liblustre only has 1 thread */
if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
RETURN(0);
}
#endif
request = ptlrpc_server_request_get(svc, 0);
if (request == NULL) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
RETURN(0);
}
if (unlikely(fail_opc)) {
if (request->rq_export && request->rq_ops) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
OBD_FAIL_TIMEOUT(fail_opc, 4);
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
request = ptlrpc_server_request_get(svc, 0);
if (request == NULL) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
RETURN(0);
}
}
}
cfs_list_del_init(&request->rq_list);
- svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
if (request->rq_hp)
- svc->srv_n_hpreq++;
+ svc->srv_n_active_hpreq++;
/* The phase is changed under the lock here because we need to know
* the request is under processing (see ptlrpc_hpreq_reorder()). */
ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
ptlrpc_hpreq_fini(request);
}
out_req:
- cfs_spin_lock(&svc->srv_lock);
- if (request->rq_hp)
- svc->srv_n_hpreq--;
- cfs_spin_unlock(&svc->srv_lock);
- ptlrpc_server_finish_request(request);
+ ptlrpc_server_finish_request(svc, request);
RETURN(1);
}
class_export_put (exp);
rs->rs_export = NULL;
ptlrpc_rs_decref (rs);
- cfs_atomic_dec (&svc->srv_outstanding_replies);
if (cfs_atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
svc->srv_is_stopping)
cfs_waitq_broadcast(&svc->srv_waitq);
struct ptlrpc_reply_state *rs = NULL;
ENTRY;
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rs_lock);
if (!cfs_list_empty(&svc->srv_reply_queue)) {
rs = cfs_list_entry(svc->srv_reply_queue.prev,
struct ptlrpc_reply_state,
rs_list);
cfs_list_del_init(&rs->rs_list);
}
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rs_lock);
if (rs != NULL)
ptlrpc_handle_rs(rs);
RETURN(rs != NULL);
return (-ETIMEDOUT);
}
-/**
- * Status bits to pass todo info from
- * ptlrpc_main_check_event to ptlrpc_main.
- */
-#define PTLRPC_MAIN_STOPPING 0x01
-#define PTLRPC_MAIN_IN_REQ 0x02
-#define PTLRPC_MAIN_ACTIVE_REQ 0x04
-#define PTLRPC_MAIN_CHECK_TIMED 0x08
-#define PTLRPC_MAIN_REPOST 0x10
+static inline int
+ptlrpc_threads_enough(struct ptlrpc_service *svc)
+{
+ return svc->srv_n_active_reqs <
+ svc->srv_threads_running - 1 - (svc->srv_hpreq_handler != NULL);
+}
/**
- * A container to share per-thread status variables between
- * ptlrpc_main_check_event and ptlrpc_main functions.
+ * allowed to create more threads
+ * user can call it w/o any lock but needs to hold ptlrpc_service::srv_lock to
+ * get a reliable result
*/
-struct ptlrpc_main_check_s {
- /** todo info for the ptrlrpc_main */
- int todo;
- /** is this thread counted as running or not? */
- int running;
-};
+static inline int
+ptlrpc_threads_increasable(struct ptlrpc_service *svc)
+{
+ return svc->srv_threads_running +
+ svc->srv_threads_starting < svc->srv_threads_max;
+}
/**
- * Check whether current service thread has work to do.
+ * too many requests and allowed to create more threads
*/
-static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
- struct ptlrpc_main_check_s *status)
+static inline int
+ptlrpc_threads_need_create(struct ptlrpc_service *svc)
{
- struct ptlrpc_service *svc = t->t_svc;
- ENTRY;
+ return !ptlrpc_threads_enough(svc) && ptlrpc_threads_increasable(svc);
+}
- status->todo = 0;
+static inline int
+ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
+{
+ return (thread->t_flags & SVC_STOPPING) != 0 ||
+ thread->t_svc->srv_is_stopping;
+}
- /* check the stop flags w/o any locking to make all
- * concurrently running threads stop faster. */
- if (unlikely((t->t_flags & SVC_STOPPING) ||
- svc->srv_is_stopping)) {
- status->todo |= PTLRPC_MAIN_STOPPING;
- goto out;
- }
+static inline int
+ptlrpc_rqbd_pending(struct ptlrpc_service *svc)
+{
+ return !cfs_list_empty(&svc->srv_idle_rqbds) &&
+ svc->srv_rqbd_timeout == 0;
+}
- cfs_spin_lock(&svc->srv_lock);
- /* ptlrpc_server_request_pending() needs this thread to be
- * counted as running. */
- if (!status->running) {
- svc->srv_threads_running++;
- status->running = 1;
- }
- /* Process all incoming reqs before handling any */
- if (!cfs_list_empty(&svc->srv_req_in_queue)) {
- status->todo |= PTLRPC_MAIN_IN_REQ;
- }
- /* Don't handle regular requests in the last thread, in order
- * to handle any incoming reqs, early replies, etc. */
- if (ptlrpc_server_request_pending(svc, 0) &&
- svc->srv_threads_running <= svc->srv_threads_started - 1) {
- status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
- }
- if (svc->srv_at_check) {
- status->todo |= PTLRPC_MAIN_CHECK_TIMED;
- }
- if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
- svc->srv_rqbd_timeout == 0) {
- status->todo |= PTLRPC_MAIN_REPOST;
- }
- /* count this thread as not running if it is going to sleep in
- * the outer wait event */
- if (!status->todo) {
- svc->srv_threads_running--;
- status->running = 0;
- }
- cfs_spin_unlock(&svc->srv_lock);
- out:
- RETURN(status->todo);
+static inline int
+ptlrpc_at_check(struct ptlrpc_service *svc)
+{
+ return svc->srv_at_check;
+}
+
+/**
+ * requests waiting for preprocessing
+ * user can call it w/o any lock but needs to hold ptlrpc_service::srv_lock to
+ * get a reliable result
+ */
+static inline int
+ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
+{
+ return !cfs_list_empty(&svc->srv_req_in_queue);
}
/**
struct ptlrpc_service *svc = data->svc;
struct ptlrpc_thread *thread = data->thread;
struct ptlrpc_reply_state *rs;
- struct ptlrpc_main_check_s st;
#ifdef WITH_GROUP_INFO
cfs_group_info_t *ginfo = NULL;
#endif
}
cfs_spin_lock(&svc->srv_lock);
+
+ LASSERT((thread->t_flags & SVC_STARTING) != 0);
+ thread->t_flags &= ~SVC_STARTING;
+ svc->srv_threads_starting--;
+
/* SVC_STOPPING may already be set here if someone else is trying
* to stop the service while this new thread has been dynamically
* forked. We still set SVC_RUNNING to let our creator know that
* we are now running, however we will exit as soon as possible */
thread->t_flags |= SVC_RUNNING;
+ svc->srv_threads_running++;
cfs_spin_unlock(&svc->srv_lock);
/*
thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rs_lock);
cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
- cfs_spin_unlock(&svc->srv_lock);
cfs_waitq_signal(&svc->srv_free_rs_waitq);
+ cfs_spin_unlock(&svc->srv_rs_lock);
CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
svc->srv_threads_running);
/* XXX maintain a list of all managed devices: insert here */
-
- st.running = 0;
- st.todo = 0;
-
- while (!(st.todo & PTLRPC_MAIN_STOPPING)) {
+ while (!ptlrpc_thread_stopping(thread)) {
/* Don't exit while there are replies to be handled */
struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
ptlrpc_retry_rqbds, svc);
cfs_cond_resched();
- l_wait_event_exclusive (svc->srv_waitq,
- ptlrpc_main_check_event(thread, &st),
- &lwi);
+ l_wait_event_exclusive(svc->srv_waitq,
+ ptlrpc_thread_stopping(thread) ||
+ ptlrpc_server_request_waiting(svc) ||
+ ptlrpc_server_request_pending(svc, 0) ||
+ ptlrpc_rqbd_pending(svc) ||
+ ptlrpc_at_check(svc), &lwi);
+
+ if (ptlrpc_thread_stopping(thread))
+ break;
lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
ptlrpc_check_rqbd_pool(svc);
- if (svc->srv_threads_started < svc->srv_threads_max &&
- svc->srv_n_active_reqs >= (svc->srv_threads_started - 2))
+ if (ptlrpc_threads_need_create(svc)) {
/* Ignore return code - we tried... */
ptlrpc_start_thread(svc);
+ }
/* Process all incoming reqs before handling any */
- if (st.todo & PTLRPC_MAIN_IN_REQ) {
+ if (ptlrpc_server_request_waiting(svc)) {
ptlrpc_server_handle_req_in(svc);
/* but limit ourselves in case of flood */
- if (counter++ < 1000)
+ if (counter++ < 100)
continue;
counter = 0;
}
- if (st.todo & PTLRPC_MAIN_CHECK_TIMED) {
+
+ if (ptlrpc_at_check(svc))
ptlrpc_at_check_timed(svc);
- }
- if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) {
+
+ if (ptlrpc_server_request_pending(svc, 0)) {
lu_context_enter(&env.le_ctx);
ptlrpc_server_handle_request(svc, thread);
lu_context_exit(&env.le_ctx);
}
- if ((st.todo & PTLRPC_MAIN_REPOST) &&
+
+ if (ptlrpc_rqbd_pending(svc) &&
ptlrpc_server_post_idle_rqbds(svc) < 0) {
/* I just failed to repost request buffers.
* Wait for a timeout (unless something else
thread, thread->t_pid, thread->t_id, rc);
cfs_spin_lock(&svc->srv_lock);
- if (st.running)
+ if ((thread->t_flags & SVC_STARTING) != 0) {
+ svc->srv_threads_starting--;
+ thread->t_flags &= ~SVC_STARTING;
+ }
+
+ if ((thread->t_flags & SVC_RUNNING) != 0) {
+ /* must know immediately */
svc->srv_threads_running--;
- thread->t_id = rc;
- thread->t_flags = SVC_STOPPED;
+ thread->t_flags &= ~SVC_RUNNING;
+ }
+
+ thread->t_id = rc;
+ thread->t_flags |= SVC_STOPPED;
+
cfs_waitq_signal(&thread->t_ctl_waitq);
cfs_spin_unlock(&svc->srv_lock);
struct ptlrpc_svc_data d;
struct ptlrpc_thread *thread;
char name[32];
- int id, rc;
+ int rc;
ENTRY;
CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
- svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
+ svc->srv_name, svc->srv_threads_running, svc->srv_threads_min,
svc->srv_threads_max, svc->srv_threads_running);
if (unlikely(svc->srv_is_stopping))
RETURN(-ESRCH);
- if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
+ if (!ptlrpc_threads_increasable(svc) ||
(OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
- svc->srv_threads_started == svc->srv_threads_min - 1))
+ svc->srv_threads_running == svc->srv_threads_min - 1))
RETURN(-EMFILE);
OBD_ALLOC_PTR(thread);
cfs_waitq_init(&thread->t_ctl_waitq);
cfs_spin_lock(&svc->srv_lock);
- if (svc->srv_threads_started >= svc->srv_threads_max) {
+ if (!ptlrpc_threads_increasable(svc)) {
cfs_spin_unlock(&svc->srv_lock);
OBD_FREE_PTR(thread);
RETURN(-EMFILE);
}
+
+ svc->srv_threads_starting++;
+ thread->t_id = svc->srv_threads_next_id++;
+ thread->t_flags |= SVC_STARTING;
+ thread->t_svc = svc;
+
cfs_list_add(&thread->t_link, &svc->srv_threads);
- id = svc->srv_threads_started++;
cfs_spin_unlock(&svc->srv_lock);
- thread->t_svc = svc;
- thread->t_id = id;
- sprintf(name, "%s_%02d", svc->srv_thread_name, id);
+ sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
d.svc = svc;
d.name = name;
d.thread = thread;
cfs_spin_lock(&svc->srv_lock);
cfs_list_del(&thread->t_link);
- --svc->srv_threads_started;
+ --svc->srv_threads_starting;
cfs_spin_unlock(&svc->srv_lock);
OBD_FREE(thread, sizeof(*thread));
}
/* schedule all outstanding replies to terminate them */
- cfs_spin_lock(&service->srv_lock);
+ cfs_spin_lock(&service->srv_rs_lock);
while (!cfs_list_empty(&service->srv_active_replies)) {
struct ptlrpc_reply_state *rs =
cfs_list_entry(service->srv_active_replies.next,
ptlrpc_schedule_difficult_reply(rs);
cfs_spin_unlock(&rs->rs_lock);
}
- cfs_spin_unlock(&service->srv_lock);
+ cfs_spin_unlock(&service->srv_rs_lock);
/* purge the request queue. NB No new replies (rqbds all unlinked)
* and no service threads, so I'm the only thread noodling the
cfs_list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_finish_request(req);
+ ptlrpc_server_finish_request(service, req);
}
while (ptlrpc_server_request_pending(service, 1)) {
struct ptlrpc_request *req;
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
ptlrpc_hpreq_fini(req);
- ptlrpc_server_finish_request(req);
+ ptlrpc_server_finish_request(service, req);
}
LASSERT(service->srv_n_queued_reqs == 0);
LASSERT(service->srv_n_active_reqs == 0);
cfs_gettimeofday(&right_now);
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
if (!ptlrpc_server_request_pending(svc, 1)) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
return 0;
}
request = cfs_list_entry(svc->srv_request_queue.next,
struct ptlrpc_request, rq_list);
timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
at_max)) {