#include <lu_object.h>
#include <uapi/linux/lnet/lnet-types.h>
#include "ptlrpc_internal.h"
+#include <linux/delay.h>
/* The following are visible and mutable through /sys/module/ptlrpc */
int test_req_buffer_pressure = 0;
rqbd = list_entry(svcpt->scp_rqbd_idle.next,
struct ptlrpc_request_buffer_desc,
rqbd_list);
- list_del(&rqbd->rqbd_list);
/* assume we will post successfully */
svcpt->scp_nrqbds_posted++;
- list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_posted);
+ list_move(&rqbd->rqbd_list, &svcpt->scp_rqbd_posted);
spin_unlock(&svcpt->scp_lock);
spin_lock(&svcpt->scp_lock);
svcpt->scp_nrqbds_posted--;
- list_del(&rqbd->rqbd_list);
- list_add_tail(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
+ list_move_tail(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
/*
* Don't complain if no request buffers are posted right now; LNET
* have too many threads no matter how many cores/HTs
* there are.
*/
+ preempt_disable();
if (cpumask_weight
(topology_sibling_cpumask(smp_processor_id())) > 1) {
/* weight is # of HTs */
/* depress thread factor for hyper-thread */
factor = factor - (factor >> 1) + (factor >> 3);
}
+ preempt_enable();
weight = cfs_cpt_weight(svc->srv_cptable, 0);
refcount = --(rqbd->rqbd_refcount);
if (refcount == 0) {
/* request buffer is now idle: add to history */
- list_del(&rqbd->rqbd_list);
-
- list_add_tail(&rqbd->rqbd_list, &svcpt->scp_hist_rqbds);
+ list_move_tail(&rqbd->rqbd_list, &svcpt->scp_hist_rqbds);
svcpt->scp_hist_nrqbds++;
/*
}
}
+/* Add @req to @export's RPC list (exp_hp_rpcs when @hp, else exp_reg_rpcs)
+ * and, if the request carries a non-zero multi-slot tag and the export
+ * tracks slots, mark that tag's slot as in use.
+ * Caller is expected to hold export->exp_rpc_lock (hence "_nolock"). */
+static void ptlrpc_add_exp_list_nolock(struct ptlrpc_request *req,
+ struct obd_export *export, bool hp)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ if (hp)
+ list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+ else
+ list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+ /* tag == 0 means the client does not use tagged slots */
+ if (tag && export->exp_used_slots)
+ set_bit(tag - 1, export->exp_used_slots);
+}
+
+/* Remove @req from its export's RPC list and release its tag slot.
+ * An obsolete request keeps the slot reserved: a newer resend with the
+ * same tag owns it now, so clearing the bit here would be wrong.
+ * Takes and releases exp_rpc_lock internally. */
+static void ptlrpc_del_exp_list(struct ptlrpc_request *req)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ spin_lock(&req->rq_export->exp_rpc_lock);
+ list_del_init(&req->rq_exp_list);
+ if (tag && !req->rq_obsolete && req->rq_export->exp_used_slots)
+ clear_bit(tag - 1, req->rq_export->exp_used_slots);
+ spin_unlock(&req->rq_export->exp_rpc_lock);
+}
+
/** Change request export and move hp request from old export to new */
void ptlrpc_request_change_export(struct ptlrpc_request *req,
struct obd_export *export)
if (req->rq_export != NULL) {
LASSERT(!list_empty(&req->rq_exp_list));
/* remove rq_exp_list from last export */
- spin_lock(&req->rq_export->exp_rpc_lock);
- list_del_init(&req->rq_exp_list);
- spin_unlock(&req->rq_export->exp_rpc_lock);
- /*
- * export has one reference already, so it`s safe to
+ ptlrpc_del_exp_list(req);
+ /* export has one reference already, so it's safe to
* add req to export queue here and get another
* reference for request later
*/
spin_lock(&export->exp_rpc_lock);
- if (req->rq_ops != NULL) /* hp request */
- list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
- else
- list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+ ptlrpc_add_exp_list_nolock(req, export, req->rq_ops != NULL);
spin_unlock(&export->exp_rpc_lock);
class_export_rpc_dec(req->rq_export);
/* request takes one export refcount */
req->rq_export = class_export_get(export);
class_export_rpc_inc(export);
-
- return;
}
/**
RETURN(0);
if (olddl < 0) {
+ /* below message is checked in replay-ost-single.sh test_9 */
DEBUG_REQ(D_WARNING, req,
"Already past deadline (%+llds), not sending early reply. Consider increasing at_early_margin (%d)?",
(s64)olddl, at_early_margin);
* we may be past adaptive_max
*/
if (req->rq_deadline >= newdl) {
- DEBUG_REQ(D_WARNING, req, "Couldn't add any time (%lld/%lld), not sending early reply\n",
+ DEBUG_REQ(D_WARNING, req,
+ "Could not add any time (%lld/%lld), not sending early reply",
(s64)olddl, (s64)(newdl - ktime_get_real_seconds()));
RETURN(-ETIMEDOUT);
}
GOTO(out, rc = -ETIMEDOUT);
LASSERT(atomic_read(&req->rq_refcount));
- /** if it is last refcount then early reply isn't needed */
+ /* if it is last refcount then early reply isn't needed */
if (atomic_read(&req->rq_refcount) == 1) {
DEBUG_REQ(D_ADAPTTO, reqcopy,
- "Normal reply already sent out, abort sending early reply\n");
+ "Normal reply already sent, abort early reply");
GOTO(out, rc = -EINVAL);
}
req->rq_deadline = newdl;
req->rq_early_count++; /* number sent, server side */
} else {
- DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
+ DEBUG_REQ(D_ERROR, req, "Early reply send failed: rc = %d", rc);
}
/*
return tmp;
}
+#ifdef HAVE_SERVER_SUPPORT
+/* Flag @req so ptlrpc_del_exp_list() will not release its tag slot. */
+static void ptlrpc_server_mark_obsolete(struct ptlrpc_request *req)
+{
+	req->rq_obsolete = 1;
+}
+
+/* For a client that guarantees increasing XIDs, mark every in-progress
+ * request occupying the same tag slot with a smaller XID as obsolete:
+ * the slot now belongs to @req, so older requests must not clear the
+ * slot bit when they are removed from the export lists. */
+static void
+ptlrpc_server_mark_in_progress_obsolete(struct ptlrpc_request *req)
+{
+	struct ptlrpc_request *tmp = NULL;
+	__u16 tag;
+
+	if (!tgt_is_increasing_xid_client(req->rq_export) ||
+	    req->rq_export->exp_used_slots == NULL)
+		return;
+
+	tag = lustre_msg_get_tag(req->rq_reqmsg);
+	if (tag == 0)
+		return;
+
+	/* nothing to do if the slot is not currently in use */
+	if (!test_bit(tag - 1, req->rq_export->exp_used_slots))
+		return;
+
+	/* This list should not be longer than max_requests in
+	 * flight on the client, so it is not all that long.
+	 * Also we only hit this codepath in case of a resent
+	 * request which makes it even more rarely hit */
+	list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs, rq_exp_list) {
+		if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+		    req->rq_xid > tmp->rq_xid)
+			ptlrpc_server_mark_obsolete(tmp);
+
+	}
+	list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs, rq_exp_list) {
+		if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+		    req->rq_xid > tmp->rq_xid)
+			ptlrpc_server_mark_obsolete(tmp);
+	}
+}
+#endif
+
/**
* Check if a request should be assigned with a high priority.
*
if (req->rq_ops && req->rq_ops->hpreq_fini)
req->rq_ops->hpreq_fini(req);
- spin_lock(&req->rq_export->exp_rpc_lock);
- list_del_init(&req->rq_exp_list);
- spin_unlock(&req->rq_export->exp_rpc_lock);
+ ptlrpc_del_exp_list(req);
}
EXIT;
}
hp = rc > 0;
ptlrpc_nrs_req_initialize(svcpt, req, hp);
- if (req->rq_export != NULL) {
+ while (req->rq_export != NULL) {
struct obd_export *exp = req->rq_export;
/*
* atomically
*/
spin_lock_bh(&exp->exp_rpc_lock);
+#ifdef HAVE_SERVER_SUPPORT
+ ptlrpc_server_mark_in_progress_obsolete(req);
+#endif
orig = ptlrpc_server_check_resend_in_progress(req);
+ if (orig && OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
+ spin_unlock_bh(&exp->exp_rpc_lock);
+
+ OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+ msleep(4 * MSEC_PER_SEC);
+ continue;
+ }
+
if (orig && likely(atomic_inc_not_zero(&orig->rq_refcount))) {
bool linked;
ptlrpc_at_add_timed(orig);
ptlrpc_server_drop_request(orig);
ptlrpc_nrs_req_finalize(req);
+
+ /* don't mark slot unused for resend in progress */
+ req->rq_obsolete = 1;
+
RETURN(-EBUSY);
}
- if (hp || req->rq_ops != NULL)
- list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
- else
- list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+ ptlrpc_add_exp_list_nolock(req, exp, hp || req->rq_ops != NULL);
+
spin_unlock_bh(&exp->exp_rpc_lock);
+ break;
}
/*
rc = sptlrpc_target_export_check(req->rq_export, req);
if (rc)
DEBUG_REQ(D_ERROR, req,
- "DROPPING req with illegal security flavor,");
+ "DROPPING req with illegal security flavor");
}
if (rc)
* The deadline is increased if we send an early reply.
*/
if (ktime_get_real_seconds() > request->rq_deadline) {
- DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s: deadline %lld:%llds ago\n",
+ DEBUG_REQ(D_ERROR, request,
+ "Dropping timed-out request from %s: deadline %lld/%llds ago",
libcfs_id2str(request->rq_peer),
request->rq_deadline -
request->rq_arrival_time.tv_sec,
}
CDEBUG(D_RPCTRACE,
- "Handling RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d\n",
- current_comm(),
+ "Handling RPC req@%p pname:cluuid+ref:pid:xid:nid:opc:job %s:%s+%d:%d:x%llu:%s:%d:%s\n",
+ request, current_comm(),
(request->rq_export ?
(char *)request->rq_export->exp_client_uuid.uuid : "0"),
(request->rq_export ?
- atomic_read(&request->rq_export->exp_refcount) : -99),
+ refcount_read(&request->rq_export->exp_handle.h_ref) : -99),
lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
libcfs_id2str(request->rq_peer),
- lustre_msg_get_opc(request->rq_reqmsg));
+ lustre_msg_get_opc(request->rq_reqmsg),
+ lustre_msg_get_jobid(request->rq_reqmsg) ?: "");
if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
put_conn:
if (unlikely(ktime_get_real_seconds() > request->rq_deadline)) {
DEBUG_REQ(D_WARNING, request,
- "Request took longer than estimated (%lld:%llds); "
- "client may timeout.",
+ "Request took longer than estimated (%lld/%llds); client may timeout",
request->rq_deadline -
request->rq_arrival_time.tv_sec,
ktime_get_real_seconds() - request->rq_deadline);
timediff_usecs = ktime_us_delta(work_end, work_start);
arrived_usecs = ktime_us_delta(work_end, arrived);
CDEBUG(D_RPCTRACE,
- "Handled RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d Request processed in %lldus (%lldus total) trans %llu rc %d/%d\n",
- current_comm(),
+ "Handled RPC req@%p pname:cluuid+ref:pid:xid:nid:opc:job %s:%s+%d:%d:x%llu:%s:%d:%s Request processed in %lldus (%lldus total) trans %llu rc %d/%d\n",
+ request, current_comm(),
(request->rq_export ?
(char *)request->rq_export->exp_client_uuid.uuid : "0"),
(request->rq_export ?
- atomic_read(&request->rq_export->exp_refcount) : -99),
+ refcount_read(&request->rq_export->exp_handle.h_ref) : -99),
lustre_msg_get_status(request->rq_reqmsg),
request->rq_xid,
libcfs_id2str(request->rq_peer),
lustre_msg_get_opc(request->rq_reqmsg),
+ lustre_msg_get_jobid(request->rq_reqmsg) ?: "",
timediff_usecs,
arrived_usecs,
(request->rq_repmsg ?
u64 ms_lapse = ktime_ms_delta(ktime_get(), thread->t_touched);
u32 ms_frac = do_div(ms_lapse, MSEC_PER_SEC);
- if (!__ratelimit(&watchdog_limit)) {
+ /* __ratelimit() returns true if the action is NOT ratelimited */
+ if (__ratelimit(&watchdog_limit)) {
+ /* below message is checked in sanity-quota.sh test_6,18 */
LCONSOLE_WARN("%s: service thread pid %u was inactive for %llu.%03u seconds. The thread might be hung, or it might only be slow and will resume later. Dumping the stack trace for debugging purposes:\n",
thread->t_task->comm, thread->t_task->pid,
ms_lapse, ms_frac);
libcfs_debug_dumpstack(thread->t_task);
} else {
+ /* below message is checked in sanity-quota.sh test_6,18 */
LCONSOLE_WARN("%s: service thread pid %u was inactive for %llu.%03u seconds. Watchdog stack traces are limited to 3 per %u seconds, skipping this one.\n",
thread->t_task->comm, thread->t_task->pid,
ms_lapse, ms_frac, libcfs_watchdog_ratelimit);
/* reset le_ses to initial state */
env->le_ses = NULL;
+ /* Refill the context before execution to make sure
+ * all thread keys are allocated */
+ lu_env_refill(env);
/* Process all incoming reqs before handling any */
if (ptlrpc_server_request_incoming(svcpt)) {
lu_context_enter(&env->le_ctx);
struct ptlrpc_hr_thread *hrt = (struct ptlrpc_hr_thread *)arg;
struct ptlrpc_hr_partition *hrp = hrt->hrt_partition;
struct list_head replies;
+ struct lu_env *env;
int rc;
+ OBD_ALLOC_PTR(env);
+ if (env == NULL)
+ RETURN(-ENOMEM);
+
INIT_LIST_HEAD(&replies);
unshare_fs_struct();
threadname, hrp->hrp_cpt, ptlrpc_hr.hr_cpt_table, rc);
}
+ rc = lu_context_init(&env->le_ctx, LCT_MD_THREAD | LCT_DT_THREAD |
+ LCT_REMEMBER | LCT_NOREF);
+ if (rc)
+ GOTO(out_env, rc);
+
+ rc = lu_env_add(env);
+ if (rc)
+ GOTO(out_ctx_fini, rc);
+
atomic_inc(&hrp->hrp_nstarted);
wake_up(&ptlrpc_hr.hr_waitq);
struct ptlrpc_reply_state,
rs_list);
list_del_init(&rs->rs_list);
+ /* refill keys if needed */
+ lu_env_refill(env);
+ lu_context_enter(&env->le_ctx);
ptlrpc_handle_rs(rs);
+ lu_context_exit(&env->le_ctx);
}
}
atomic_inc(&hrp->hrp_nstopped);
wake_up(&ptlrpc_hr.hr_waitq);
+ lu_env_remove(env);
+out_ctx_fini:
+ lu_context_fini(&env->le_ctx);
+out_env:
+ OBD_FREE_PTR(env);
return 0;
}
static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
{
- struct l_wait_info lwi = { 0 };
struct ptlrpc_thread *thread;
struct list_head zombie;
thread = list_entry(svcpt->scp_threads.next,
struct ptlrpc_thread, t_link);
if (thread_is_stopped(thread)) {
- list_del(&thread->t_link);
- list_add(&thread->t_link, &zombie);
+ list_move(&thread->t_link, &zombie);
continue;
}
spin_unlock(&svcpt->scp_lock);
CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
svcpt->scp_service->srv_thread_name, thread->t_id);
- l_wait_event(thread->t_ctl_waitq,
- thread_is_stopped(thread), &lwi);
+ wait_event_idle(thread->t_ctl_waitq,
+ thread_is_stopped(thread));
spin_lock(&svcpt->scp_lock);
}
int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
{
- struct l_wait_info lwi = { 0 };
struct ptlrpc_thread *thread;
struct ptlrpc_service *svc;
struct task_struct *task;
if (!wait)
RETURN(0);
- l_wait_event(thread->t_ctl_waitq,
- thread_is_running(thread) || thread_is_stopped(thread),
- &lwi);
+ wait_event_idle(thread->t_ctl_waitq,
+ thread_is_running(thread) || thread_is_stopped(thread));
rc = thread_is_stopped(thread) ? thread->t_id : 0;
RETURN(rc);
init_waitqueue_head(&ptlrpc_hr.hr_waitq);
+ preempt_disable();
weight = cpumask_weight(topology_sibling_cpumask(smp_processor_id()));
+ preempt_enable();
cfs_percpt_for_each(hrp, cpt, ptlrpc_hr.hr_partitions) {
hrp->hrp_cpt = cpt;
static void ptlrpc_wait_replies(struct ptlrpc_service_part *svcpt)
{
while (1) {
- int rc;
- struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10),
- NULL, NULL);
-
- rc = l_wait_event(svcpt->scp_waitq,
- atomic_read(&svcpt->scp_nreps_difficult) == 0, &lwi);
- if (rc == 0)
+ if (wait_event_idle_timeout(
+ svcpt->scp_waitq,
+ atomic_read(&svcpt->scp_nreps_difficult) == 0,
+ cfs_time_seconds(10)) > 0)
break;
CWARN("Unexpectedly long timeout %s %p\n",
svcpt->scp_service->srv_name, svcpt->scp_service);
{
struct ptlrpc_service_part *svcpt;
struct ptlrpc_request_buffer_desc *rqbd;
- struct l_wait_info lwi;
int rc;
int i;
*/
spin_lock(&svcpt->scp_lock);
while (svcpt->scp_nrqbds_posted != 0) {
+ int seconds = LONG_UNLINK;
+
spin_unlock(&svcpt->scp_lock);
/*
* Network access will complete in finite time but
* the HUGE timeout lets us CWARN for visibility
* of sluggish NALs
*/
- lwi = LWI_TIMEOUT_INTERVAL(
- cfs_time_seconds(LONG_UNLINK),
- cfs_time_seconds(1), NULL, NULL);
- rc = l_wait_event(svcpt->scp_waitq,
- svcpt->scp_nrqbds_posted == 0, &lwi);
- if (rc == -ETIMEDOUT) {
+ while (seconds > 0 &&
+ wait_event_idle_timeout(
+ svcpt->scp_waitq,
+ svcpt->scp_nrqbds_posted == 0,
+ cfs_time_seconds(1)) == 0)
+ seconds -= 1;
+ if (seconds == 0) {
CWARN("Service %s waiting for request buffers\n",
svcpt->scp_service->srv_name);
}