c->psc_watchdog_factor,
h, name, proc_entry,
prntfn, c->psc_min_threads, c->psc_max_threads,
- threadname, c->psc_ctx_tags);
+ threadname, c->psc_ctx_tags, NULL);
}
EXPORT_SYMBOL(ptlrpc_init_svc_conf);
cfs_proc_dir_entry_t *proc_entry,
svcreq_printfn_t svcreq_printfn,
int min_threads, int max_threads,
- char *threadname, __u32 ctx_tags)
+ char *threadname, __u32 ctx_tags,
+ svc_hpreq_handler_t hp_handler)
{
int rc;
struct ptlrpc_service *service;
service->srv_threads_max = max_threads;
service->srv_thread_name = threadname;
service->srv_ctx_tags = ctx_tags;
+ service->srv_hpreq_handler = hp_handler;
+ service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
+ service->srv_hpreq_count = 0;
+ service->srv_n_hpreq = 0;
rc = LNetSetLazyPortal(service->srv_req_portal);
LASSERT (rc == 0);
CFS_INIT_LIST_HEAD(&service->srv_request_queue);
+ CFS_INIT_LIST_HEAD(&service->srv_request_hpq);
CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
{
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+ if (req->rq_export) {
+ class_export_put(req->rq_export);
+ req->rq_export = NULL;
+ }
+
if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
DEBUG_REQ(D_INFO, req, "free req");
static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
{
struct obd_export *oldest_exp;
- time_t oldest_time;
+ time_t oldest_time, new_time;
ENTRY;
of the list, we can be really lazy here - we don't have to evict
at the exact right moment. Eventually, all silent exports
will make it to the top of the list. */
- exp->exp_last_request_time = max(exp->exp_last_request_time,
- cfs_time_current_sec() + extra_delay);
+ /* Do not pay attention on 1sec or smaller renewals. */
+ new_time = cfs_time_current_sec() + extra_delay;
+ if (exp->exp_last_request_time + 1 /*second */ >= new_time)
+ RETURN_EXIT;
+
+ exp->exp_last_request_time = new_time;
CDEBUG(D_HA, "updating export %s at "CFS_TIME_T" exp %p\n",
exp->exp_client_uuid.uuid,
exp->exp_last_request_time, exp);
if (list_empty(&exp->exp_obd_chain_timed)) {
/* this one is not timed */
spin_unlock(&exp->exp_obd->obd_dev_lock);
- EXIT;
- return;
+ RETURN_EXIT;
}
list_move_tail(&exp->exp_obd_chain_timed,
RETURN(0);
}
+/**
+ * Prepare a freshly received request for possible high-priority handling.
+ *
+ * If the service registered a high-priority handler, run it first; a
+ * non-zero result aborts request intake and is propagated to the caller.
+ * If the request carries an export and request-specific HP ops
+ * (req->rq_ops), link it into the export's exp_queued_rpc list (under
+ * exp_lock) so ptlrpc_hpreq_reorder() can later find and promote it.
+ * Undone by ptlrpc_hpreq_fini() once the request is taken for processing.
+ *
+ * \param svc  service the request arrived on
+ * \param req  the incoming request
+ *
+ * \retval 0 on success; a non-zero error from srv_hpreq_handler otherwise
+ */
+static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ int rc;
+ ENTRY;
+
+ if (svc->srv_hpreq_handler) {
+ rc = svc->srv_hpreq_handler(req);
+ if (rc)
+ RETURN(rc);
+ }
+ /* Only requests with both an export and HP ops are trackable for
+ * later promotion to the high-priority queue. */
+ if (req->rq_export && req->rq_ops) {
+ spin_lock(&req->rq_export->exp_lock);
+ list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc);
+ spin_unlock(&req->rq_export->exp_lock);
+ }
+
+ RETURN(0);
+}
+
+/**
+ * Remove the request from its export's exp_queued_rpc list.
+ *
+ * Counterpart of ptlrpc_hpreq_init(); called once the request has been
+ * dequeued for processing, so it can no longer be promoted to the
+ * high-priority queue.  The same rq_export/rq_ops guard as in
+ * ptlrpc_hpreq_init() ensures we only unlink requests that were linked.
+ */
+static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
+{
+ ENTRY;
+ if (req->rq_export && req->rq_ops) {
+ spin_lock(&req->rq_export->exp_lock);
+ list_del_init(&req->rq_exp_list);
+ spin_unlock(&req->rq_export->exp_lock);
+ }
+ EXIT;
+}
+
+/**
+ * Make the request a high priority one.
+ *
+ * All the high priority requests are queued in a separate FIFO
+ * ptlrpc_service::srv_request_hpq list which is parallel to
+ * ptlrpc_service::srv_request_queue list but has a higher priority
+ * for handling.
+ *
+ * The caller must hold svc->srv_lock (both callers, ptlrpc_hpreq_reorder()
+ * and ptlrpc_server_request_add(), take it first).  req->rq_lock is taken
+ * here to serialize the rq_hp flag update.
+ *
+ * \see ptlrpc_server_handle_request().
+ */
+static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ ENTRY;
+ LASSERT(svc != NULL);
+ spin_lock(&req->rq_lock);
+ /* Promote only once; a request already on the HP queue stays put. */
+ if (req->rq_hp == 0) {
+ int opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+ /* Add to the high priority queue. */
+ list_move_tail(&req->rq_list, &svc->srv_request_hpq);
+ req->rq_hp = 1;
+ /* Pings are promoted routinely; don't spam the log for them. */
+ if (opc != OBD_PING)
+ DEBUG_REQ(D_NET, req, "high priority req");
+ }
+ spin_unlock(&req->rq_lock);
+ EXIT;
+}
+
+/**
+ * Promote \a req to the high-priority queue of its service.
+ *
+ * Public (locking) wrapper around ptlrpc_hpreq_reorder_nolock(): takes
+ * svc->srv_lock and promotes the request only while it is still queued
+ * (RQ_PHASE_NEW); once a service thread has picked it up the move would
+ * be meaningless and is skipped.
+ */
+void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+ ENTRY;
+
+ spin_lock(&svc->srv_lock);
+ /* It may happen that the request is already taken for the processing
+ * but still in the export list, do not re-add it into the HP list. */
+ if (req->rq_phase == RQ_PHASE_NEW)
+ ptlrpc_hpreq_reorder_nolock(svc, req);
+ spin_unlock(&svc->srv_lock);
+ EXIT;
+}
+
+/**
+ * Check if the request is a high priority one.
+ *
+ * \retval 1 for OBD_PING (always high priority so pings are not starved
+ *           behind a backlog of normal requests)
+ * \retval   otherwise the per-request hpreq_check() verdict, or 0 when
+ *           the request defines no HP ops
+ */
+static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
+{
+ int opc, rc = 0;
+ ENTRY;
+
+ /* Check by request opc. */
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ if (opc == OBD_PING)
+ RETURN(1);
+
+ /* Perform request specific check. */
+ if (req->rq_ops && req->rq_ops->hpreq_check)
+ rc = req->rq_ops->hpreq_check(req);
+ RETURN(rc);
+}
+
+/**
+ * Classify an incoming request and enqueue it on the proper queue.
+ *
+ * High-priority requests (per ptlrpc_server_hpreq_check()) go to
+ * svc->srv_request_hpq, all others to svc->srv_request_queue.
+ *
+ * \retval 0 on success (including the benign already-queued race below)
+ * \retval a negative error from ptlrpc_server_hpreq_check() otherwise
+ */
+static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ int rc;
+ ENTRY;
+
+ rc = ptlrpc_server_hpreq_check(req);
+ if (rc < 0)
+ RETURN(rc);
+
+ spin_lock(&svc->srv_lock);
+ /* Before inserting the request into the queue, check if it is not
+ * inserted yet, or even already handled -- it may happen due to
+ * a racing ldlm_server_blocking_ast(). */
+ if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) {
+ if (rc)
+ ptlrpc_hpreq_reorder_nolock(svc, req);
+ else
+ list_add_tail(&req->rq_list, &svc->srv_request_queue);
+ }
+ spin_unlock(&svc->srv_lock);
+
+ RETURN(0);
+}
+
+/* Only allow normal priority requests on a service that has a high-priority
+ * queue if forced (i.e. cleanup), if there are other high priority requests
+ * already being processed (i.e. those threads can service more high-priority
+ * requests), or if there are enough idle threads that a later thread can do
+ * a high priority request.
+ *
+ * \param force  non-zero during service shutdown/cleanup to drain the
+ *               normal queue unconditionally
+ * \retval non-zero when a normal-priority request may be dequeued now
+ */
+static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
+{
+ return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
+ svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+}
+
+/**
+ * Pick the next request to process, balancing the two queues.
+ *
+ * Prefers the normal queue when normal requests are allowed
+ * (ptlrpc_server_allow_normal()) and either the HP queue is empty or
+ * srv_hpreq_ratio consecutive HP requests have already been served;
+ * otherwise serves the HP queue, counting consecutive HP picks in
+ * srv_hpreq_count.  The request is NOT removed from its list here --
+ * the caller does list_del_init() itself.
+ *
+ * Caller must hold svc->srv_lock (all call sites take it first).
+ *
+ * \retval the chosen request, or NULL when nothing is eligible
+ */
+static struct ptlrpc_request *
+ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+{
+ struct ptlrpc_request *req = NULL;
+ ENTRY;
+
+ if (ptlrpc_server_allow_normal(svc, force) &&
+ !list_empty(&svc->srv_request_queue) &&
+ (list_empty(&svc->srv_request_hpq) ||
+ svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
+ req = list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
+ /* A normal pick resets the consecutive-HP counter. */
+ svc->srv_hpreq_count = 0;
+ } else if (!list_empty(&svc->srv_request_hpq)) {
+ req = list_entry(svc->srv_request_hpq.next,
+ struct ptlrpc_request, rq_list);
+ svc->srv_hpreq_count++;
+ }
+ RETURN(req);
+}
+
+/**
+ * Predicate: is there any request ptlrpc_server_request_get() could
+ * return right now?  True when the HP queue is non-empty, or the normal
+ * queue is non-empty and normal requests are currently allowed.
+ */
+static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+ return ((ptlrpc_server_allow_normal(svc, force) &&
+ !list_empty(&svc->srv_request_queue)) ||
+ !list_empty(&svc->srv_request_hpq));
+}
+
/* Handle freshly incoming reqs, add to timed early reply list,
pass on to regular request queue */
static int
"illegal security flavor,");
}
- class_export_put(req->rq_export);
- req->rq_export = NULL;
if (rc)
goto err_req;
+ ptlrpc_update_export_timer(req->rq_export, 0);
}
/* req_in handling should/must be fast */
}
ptlrpc_at_add_timed(req);
+ rc = ptlrpc_hpreq_init(svc, req);
+ if (rc)
+ GOTO(err_req, rc);
/* Move it over to the request processing queue */
- spin_lock(&svc->srv_lock);
- list_add_tail(&req->rq_list, &svc->srv_request_queue);
+ rc = ptlrpc_server_request_add(svc, req);
+ if (rc)
+ GOTO(err_req, rc);
cfs_waitq_signal(&svc->srv_waitq);
- spin_unlock(&svc->srv_lock);
RETURN(1);
err_req:
struct timeval work_start;
struct timeval work_end;
long timediff;
- int rc;
+ int opc, rc;
+ int fail_opc = 0;
ENTRY;
LASSERT(svc);
spin_lock(&svc->srv_lock);
- if (unlikely(list_empty (&svc->srv_request_queue) ||
+ if (unlikely(!ptlrpc_server_request_pending(svc, 0) ||
(
#ifndef __KERNEL__
/* !@%$# liblustre only has 1 thread */
* That means we always need at least 2 service threads. */
spin_unlock(&svc->srv_lock);
RETURN(0);
+ }
+
+ request = ptlrpc_server_request_get(svc, 0);
+ if (request == NULL) {
+ spin_unlock(&svc->srv_lock);
+ RETURN(0);
+ }
+
+ opc = lustre_msg_get_opc(request->rq_reqmsg);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
+ fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
+ else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+ fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
+
+ if (unlikely(fail_opc)) {
+ if (request->rq_export && request->rq_ops) {
+ spin_unlock(&svc->srv_lock);
+ OBD_FAIL_TIMEOUT(fail_opc, 4);
+ spin_lock(&svc->srv_lock);
+ request = ptlrpc_server_request_get(svc, 0);
+ if (request == NULL) {
+ spin_unlock(&svc->srv_lock);
+ RETURN(0);
+ }
+ LASSERT(ptlrpc_server_request_pending(svc, 0));
+ }
}
- request = list_entry (svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
- list_del_init (&request->rq_list);
+ list_del_init(&request->rq_list);
svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
+ if (request->rq_hp)
+ svc->srv_n_hpreq++;
+ /* The phase is changed under the lock here because we need to know
+ * the request is under processing (see ptlrpc_hpreq_reorder()). */
+ ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
spin_unlock(&svc->srv_lock);
+ ptlrpc_hpreq_fini(request);
+
if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
libcfs_debug_dumplog();
if (thread)
request->rq_svc_thread->t_env->le_ses = &request->rq_session;
- request->rq_export = class_conn2export(
- lustre_msg_get_handle(request->rq_reqmsg));
-
if (likely(request->rq_export)) {
if (unlikely(ptlrpc_check_req(request)))
goto put_conn;
goto put_rpc_export;
}
- ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-
CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
"%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
(request->rq_export ?
if (export != NULL)
class_export_rpc_put(export);
put_conn:
- if (likely(request->rq_export != NULL))
- class_export_put(request->rq_export);
-
lu_context_exit(&request->rq_session);
lu_context_fini(&request->rq_session);
}
out_req:
+ spin_lock(&svc->srv_lock);
+ if (request->rq_hp)
+ svc->srv_n_hpreq--;
+ spin_unlock(&svc->srv_lock);
ptlrpc_server_finish_request(request);
RETURN(1);
svc->srv_rqbd_timeout == 0) ||
!list_empty(&svc->srv_req_in_queue) ||
!list_empty(&svc->srv_reply_queue) ||
- (!list_empty(&svc->srv_request_queue) &&
+ (ptlrpc_server_request_pending(svc, 0) &&
(svc->srv_n_active_reqs <
(svc->srv_threads_running - 1))) ||
svc->srv_at_check,
ptlrpc_at_check_timed(svc);
/* don't handle requests in the last thread */
- if (!list_empty (&svc->srv_request_queue) &&
+ if (ptlrpc_server_request_pending(svc, 0) &&
(svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
lu_context_enter(&env.le_ctx);
ptlrpc_server_handle_request(svc, thread);
service->srv_n_active_reqs++;
ptlrpc_server_finish_request(req);
}
- while (!list_empty(&service->srv_request_queue)) {
- struct ptlrpc_request *req =
- list_entry(service->srv_request_queue.next,
- struct ptlrpc_request,
- rq_list);
+ while (ptlrpc_server_request_pending(service, 1)) {
+ struct ptlrpc_request *req;
+ req = ptlrpc_server_request_get(service, 1);
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
-
+ ptlrpc_hpreq_fini(req);
ptlrpc_server_finish_request(req);
}
LASSERT(service->srv_n_queued_reqs == 0);
do_gettimeofday(&right_now);
spin_lock(&svc->srv_lock);
- if (list_empty(&svc->srv_request_queue)) {
+ if (!ptlrpc_server_request_pending(svc, 1)) {
spin_unlock(&svc->srv_lock);
return 0;
}
/* How long has the next entry been waiting? */
- request = list_entry(svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
+ if (list_empty(&svc->srv_request_queue))
+ request = list_entry(svc->srv_request_hpq.next,
+ struct ptlrpc_request, rq_list);
+ else
+ request = list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
spin_unlock(&svc->srv_lock);