int req_portal, int rep_portal, int watchdog_factor,
svc_handler_t handler, char *name,
cfs_proc_dir_entry_t *proc_entry,
- svcreq_printfn_t svcreq_printfn,
+ svcreq_printfn_t svcreq_printfn,
int min_threads, int max_threads, char *threadname)
{
int rc;
LASSERT (nbufs > 0);
LASSERT (bufsize >= max_req_size);
-
+
OBD_ALLOC(service, sizeof(*service));
if (service == NULL)
RETURN(NULL);
CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
CFS_INIT_LIST_HEAD(&service->srv_at_list);
cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
- /* At SOW, service time should be quick; 10s seems generous. If client
+ /* At SOW, service time should be quick; 10s seems generous. If client
timeout is less than this, we'll be sending an early reply. */
at_init(&service->srv_at_estimate, 10, 0);
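        /* Note (editor): at_init() appears to seed the adaptive service-time
         * estimate with an initial guess of 10 seconds; the estimate is later
         * refined from observed service times, so this value only matters
         * until real measurements arrive. */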
spin_lock (&ptlrpc_all_services_lock);
list_add (&service->srv_list, &ptlrpc_all_services);
spin_unlock (&ptlrpc_all_services_lock);
-
+
/* Now allocate the request buffers */
rc = ptlrpc_grow_req_bufs(service);
/* We shouldn't be under memory pressure at startup, so
oldest_time);
}
} else {
- if (cfs_time_current_sec() >
+ if (cfs_time_current_sec() >
(exp->exp_obd->obd_eviction_timer + extra_delay)) {
/* The evictor won't evict anyone who we've heard from
* recently, so we don't have to check before we start
static int ptlrpc_check_req(struct ptlrpc_request *req)
{
- if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
+ if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
req->rq_export->exp_conn_cnt) {
DEBUG_REQ(D_ERROR, req,
"DROPPING req from old connection %d < %d",
}
/* Set timer for closest deadline */
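        /* 'next' below is the number of seconds until the earliest queued
         * deadline, less the early-reply margin; if that window has already
         * closed, the timer callback is run directly instead of being armed. */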
- rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
+ rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
rq_timed_list);
next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
at_early_margin);
- if (next <= 0)
+ if (next <= 0)
ptlrpc_at_timer((unsigned long)svc);
else
cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
struct ptlrpc_request *rq;
int found = 0;
- if (AT_OFF)
+ if (AT_OFF)
return(0);
if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
return(-ENOSYS);
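        /* A client that did not advertise MSGHDR_AT_SUPPORT cannot interpret
         * early replies, so such requests are not tracked on the AT list. */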
-
+
spin_lock(&svc->srv_at_lock);
if (unlikely(req->rq_sent_final)) {
ptlrpc_at_set_timer(svc);
return 0;
-}
+}
-static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
int extra_time)
{
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
time_t newdl;
int rc;
ENTRY;
-
- /* deadline is when the client expects us to reply, margin is the
+
+ /* deadline is when the client expects us to reply, margin is the
difference between clients' and servers' expectations */
- DEBUG_REQ(D_ADAPTTO, req,
+ DEBUG_REQ(D_ADAPTTO, req,
"%ssending early reply (deadline %+lds, margin %+lds) for "
"%d+%d", AT_OFF ? "AT off - not " : "",
olddl, olddl - at_get(&svc->srv_at_estimate),
at_get(&svc->srv_at_estimate), extra_time);
- if (AT_OFF)
+ if (AT_OFF)
RETURN(0);
-
+
if (olddl < 0) {
DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
"not sending early reply. Consider increasing "
OBD_FREE(reqcopy, sizeof *reqcopy);
RETURN(-ENOMEM);
}
-
+
*reqcopy = *req;
reqcopy->rq_reply_state = NULL;
reqcopy->rq_rep_swab_mask = 0;
/* RPC ref */
class_export_rpc_get(reqcopy->rq_export);
- if (reqcopy->rq_export->exp_obd &&
+ if (reqcopy->rq_export->exp_obd &&
reqcopy->rq_export->exp_obd->obd_fail)
GOTO(out_put, rc = -ENODEV);
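        /* obd_fail is presumably set while the target is being stopped or
         * failed over; in that case skip building an early reply entirely. */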
rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
- if (rc)
+ if (rc)
GOTO(out_put, rc);
rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);
DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
}
- /* Free the (early) reply state from lustre_pack_reply.
+ /* Free the (early) reply state from lustre_pack_reply.
           (ptlrpc_send_reply takes its own rs ref, so this is safe here) */
ptlrpc_req_drop_rs(reqcopy);
}
delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
svc->srv_at_check = 0;
-
+
if (list_empty(&svc->srv_at_list)) {
spin_unlock(&svc->srv_at_lock);
- RETURN(0);
+ RETURN(0);
}
/* The timer went off, but maybe the nearest rpc already completed. */
/* We've still got plenty of time. Reset the timer. */
spin_unlock(&svc->srv_at_lock);
ptlrpc_at_set_timer(svc);
- RETURN(0);
+ RETURN(0);
}
- /* We're close to a timeout, and we don't know how much longer the
+ /* We're close to a timeout, and we don't know how much longer the
server will take. Send early replies to everyone expiring soon. */
CFS_INIT_LIST_HEAD(&work_list);
list_for_each_entry_safe(rq, n, &svc->srv_at_list, rq_timed_list) {
CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
"replies\n", first, at_extra, counter);
-
+
if (first < 0) {
- /* We're already past request deadlines before we even get a
+ /* We're already past request deadlines before we even get a
chance to send early replies */
LCONSOLE_WARN("%s: This server is not able to keep up with "
"request traffic (cpu-bound).\n", svc->srv_name);
rc = ptlrpc_check_req(req);
class_export_put(req->rq_export);
req->rq_export = NULL;
- if (rc)
+ if (rc)
goto err_req;
}
/* req_in handling should/must be fast */
- if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
+ if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
/* Set rpc server deadline and add it to the timed list */
deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
- MSGHDR_AT_SUPPORT) ?
+ MSGHDR_AT_SUPPORT) ?
/* The max time the client expects us to take */
lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
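        /* rq_deadline is thus the absolute time by which the client expects a
         * reply: arrival time plus the client-advertised timeout when AT is
         * supported, or the static obd_timeout otherwise. */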
DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
goto err_req;
}
-
+
ptlrpc_at_add_timed(req);
/* Move it over to the request processing queue */
svc->srv_n_active_reqs >= (svc->srv_threads_running - 1))) {
                /* Don't handle regular requests in the last thread, in order
* to handle difficult replies (which might block other threads)
- * as well as handle any incoming reqs, early replies, etc.
+ * as well as handle any incoming reqs, early replies, etc.
* That means we always need at least 2 service threads. */
spin_unlock(&svc->srv_lock);
RETURN(0);
spin_unlock(&svc->srv_lock);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
+ libcfs_debug_dumplog();
+
do_gettimeofday(&work_start);
timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
if (svc->srv_stats != NULL) {
lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
at_get(&svc->srv_at_estimate));
}
-
+
CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
-
+
request->rq_svc_thread = thread;
request->rq_export = class_conn2export(
lustre_msg_get_handle(request->rq_reqmsg));
export = class_export_rpc_get(request->rq_export);
}
- /* Discard requests queued for longer than the deadline.
+ /* Discard requests queued for longer than the deadline.
The deadline is increased if we send an early reply. */
if (cfs_time_current_sec() > request->rq_deadline) {
DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
rc = svc->srv_handler(request);
-
+
request->rq_phase = RQ_PHASE_COMPLETE;
CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
request->rq_repmsg ? lustre_msg_get_transno(request->rq_repmsg) :
request->rq_transno, request->rq_status,
- request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
+ request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
-999);
if (svc->srv_stats != NULL) {
__u32 op = lustre_msg_get_opc(request->rq_reqmsg);
}
}
if (request->rq_early_count) {
- DEBUG_REQ(D_ADAPTTO, request,
+ DEBUG_REQ(D_ADAPTTO, request,
"sent %d early replies before finishing in %lds",
request->rq_early_count,
work_end.tv_sec - request->rq_arrival_time.tv_sec);
*/
cfs_waitq_signal(&thread->t_ctl_waitq);
- watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
- at_get(&svc->srv_at_estimate)) *
+ watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
+ at_get(&svc->srv_at_estimate)) *
svc->srv_watchdog_factor, NULL, NULL);
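        /* Watchdog period: the larger of obd_timeout and the current adaptive
         * service-time estimate (0 when AT is off), scaled by the per-service
         * srv_watchdog_factor passed at service setup. */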
spin_lock(&svc->srv_lock);
&lwi);
lc_watchdog_touch_ms(watchdog, max_t(int, obd_timeout,
- AT_OFF ? 0 :
- at_get(&svc->srv_at_estimate)) *
+ AT_OFF ? 0 :
+ at_get(&svc->srv_at_estimate)) *
svc->srv_watchdog_factor);
ptlrpc_check_rqbd_pool(svc);
if (!list_empty(&svc->srv_req_in_queue)) {
/* Process all incoming reqs before handling any */
ptlrpc_server_handle_req_in(svc);
- /* but limit ourselves in case of flood */
+ /* but limit ourselves in case of flood */
if (counter++ < 1000)
continue;
counter = 0;
}
- if (svc->srv_at_check)
+ if (svc->srv_at_check)
ptlrpc_at_check_timed(svc);
/* don't handle requests in the last thread */
d.thread = thread;
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
+
/* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
* just drop the VM and FILES in ptlrpc_daemonize() right away.
*/
* its 'unlink' flag set for each posted rqbd */
list_for_each(tmp, &service->srv_active_rqbds) {
struct ptlrpc_request_buffer_desc *rqbd =
- list_entry(tmp, struct ptlrpc_request_buffer_desc,
+ list_entry(tmp, struct ptlrpc_request_buffer_desc,
rqbd_list);
rc = LNetMDUnlink(rqbd->rqbd_md_h);
spin_unlock(&svc->srv_lock);
return 0;
}
-
+
/* How long has the next entry been waiting? */
request = list_entry(svc->srv_request_queue.next,
struct ptlrpc_request, rq_list);
timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
spin_unlock(&svc->srv_lock);
- if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
+ if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
at_max)) {
CERROR("%s: unhealthy - request has been waiting %lds\n",
svc->srv_name, timediff / ONE_MILLION);
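        /* timediff is in microseconds (hence the ONE_MILLION divisor); the
         * service is reported unhealthy when the oldest queued request has
         * waited longer than 1.5 * obd_timeout (AT off) or at_max (AT on). */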