*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*/
#define DEBUG_SUBSYSTEM S_RPC
static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);
+static int ptlrpc_start_threads(struct ptlrpc_service *svc);
+static int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
/** Holds a list of all PTLRPC services */
LIST_HEAD(ptlrpc_all_services);
return posted;
}
- rqbd = list_entry(svcpt->scp_rqbd_idle.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ rqbd = list_first_entry(&svcpt->scp_rqbd_idle,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
/* assume we will post successfully */
svcpt->scp_nrqbds_posted++;
failed:
if (array->paa_reqs_count != NULL) {
- OBD_FREE(array->paa_reqs_count, sizeof(__u32) * size);
+ OBD_FREE_PTR_ARRAY(array->paa_reqs_count, size);
array->paa_reqs_count = NULL;
}
if (array->paa_reqs_array != NULL) {
- OBD_FREE(array->paa_reqs_array,
- sizeof(struct list_head) * array->paa_size);
+ OBD_FREE_PTR_ARRAY(array->paa_reqs_array, array->paa_size);
array->paa_reqs_array = NULL;
}
strlen(cconf->cc_pattern),
0, ncpts - 1, &el);
if (rc != 0) {
- CERROR("%s: invalid CPT pattern string: %s",
+ CERROR("%s: invalid CPT pattern string: %s\n",
conf->psc_name, cconf->cc_pattern);
RETURN(ERR_PTR(-EINVAL));
}
CERROR("%s: failed to parse CPT array %s: %d\n",
conf->psc_name, cconf->cc_pattern, rc);
if (cpts != NULL)
- OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+ OBD_FREE_PTR_ARRAY(cpts, ncpts);
RETURN(ERR_PTR(rc < 0 ? rc : -EINVAL));
}
ncpts = rc;
OBD_ALLOC(service, offsetof(struct ptlrpc_service, srv_parts[ncpts]));
if (service == NULL) {
if (cpts != NULL)
- OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+ OBD_FREE_PTR_ARRAY(cpts, ncpts);
RETURN(ERR_PTR(-ENOMEM));
}
struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
int refcount;
- struct list_head *tmp;
- struct list_head *nxt;
if (!atomic_dec_and_test(&req->rq_refcount))
return;
* I expect only about 1 or 2 rqbds need to be recycled here
*/
while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
- rqbd = list_entry(svcpt->scp_hist_rqbds.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ rqbd = list_first_entry(&svcpt->scp_hist_rqbds,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
list_del(&rqbd->rqbd_list);
svcpt->scp_hist_nrqbds--;
* remove rqbd's reqs from svc's req history while
* I've got the service lock
*/
- list_for_each(tmp, &rqbd->rqbd_reqs) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_list);
+ list_for_each_entry(req, &rqbd->rqbd_reqs, rq_list) {
/* Track the highest culled req seq */
if (req->rq_history_seq >
svcpt->scp_hist_seq_culled) {
spin_unlock(&svcpt->scp_lock);
- list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
- req = list_entry(rqbd->rqbd_reqs.next,
- struct ptlrpc_request,
- rq_list);
+ while ((req = list_first_entry_or_null(
+ &rqbd->rqbd_reqs,
+ struct ptlrpc_request, rq_list))) {
list_del(&req->rq_list);
ptlrpc_server_free_request(req);
}
struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
struct ptlrpc_request *reqcopy;
struct lustre_msg *reqmsg;
- time64_t olddl = req->rq_deadline - ktime_get_real_seconds();
+ timeout_t olddl = req->rq_deadline - ktime_get_real_seconds();
time64_t newdl;
int rc;
ENTRY;
- if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT)) {
+ if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT) ||
+ CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
/* don't send early reply */
RETURN(1);
}
* difference between clients' and servers' expectations
*/
DEBUG_REQ(D_ADAPTTO, req,
- "%ssending early reply (deadline %+llds, margin %+llds) for %d+%d",
+ "%ssending early reply (deadline %+ds, margin %+ds) for %d+%d",
AT_OFF ? "AT off - not " : "",
- (s64)olddl, (s64)(olddl - at_get(&svcpt->scp_at_estimate)),
+ olddl, olddl - at_get(&svcpt->scp_at_estimate),
at_get(&svcpt->scp_at_estimate), at_extra);
if (AT_OFF)
if (olddl < 0) {
/* below message is checked in replay-ost-single.sh test_9 */
DEBUG_REQ(D_WARNING, req,
- "Already past deadline (%+llds), not sending early reply. Consider increasing at_early_margin (%d)?",
- (s64)olddl, at_early_margin);
+ "Already past deadline (%+ds), not sending early reply. Consider increasing at_early_margin (%d)?",
+ olddl, at_early_margin);
/* Return an error so we're not re-added to the timed list. */
RETURN(-ETIMEDOUT);
*/
if (req->rq_deadline >= newdl) {
DEBUG_REQ(D_WARNING, req,
- "Could not add any time (%lld/%lld), not sending early reply",
- (s64)olddl, (s64)(newdl - ktime_get_real_seconds()));
+ "Could not add any time (%d/%lld), not sending early reply",
+ olddl, newdl - ktime_get_real_seconds());
RETURN(-ETIMEDOUT);
}
__u32 index, count;
time64_t deadline;
time64_t now = ktime_get_real_seconds();
- s64 delay;
+ s64 delay_ms;
int first, counter = 0;
ENTRY;
spin_unlock(&svcpt->scp_at_lock);
RETURN(0);
}
- delay = ktime_ms_delta(ktime_get(), svcpt->scp_at_checktime);
+ delay_ms = ktime_ms_delta(ktime_get(), svcpt->scp_at_checktime);
svcpt->scp_at_check = 0;
if (array->paa_count == 0) {
*/
LCONSOLE_WARN("%s: This server is not able to keep up with request traffic (cpu-bound).\n",
svcpt->scp_service->srv_name);
- CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=%lld\n",
+ CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=%lldms\n",
counter, svcpt->scp_nreqs_incoming,
svcpt->scp_nreqs_active,
- at_get(&svcpt->scp_at_estimate), delay);
+ at_get(&svcpt->scp_at_estimate), delay_ms);
}
/*
* we took additional refcount so entries can't be deleted from list, no
* locking is needed
*/
- while (!list_empty(&work_list)) {
- rq = list_entry(work_list.next, struct ptlrpc_request,
- rq_timed_list);
+ while ((rq = list_first_entry_or_null(&work_list,
+ struct ptlrpc_request,
+ rq_timed_list)) != NULL) {
list_del_init(&rq->rq_timed_list);
if (ptlrpc_at_send_early_reply(rq) == 0)
return NULL;
/*
- * bulk request are aborted upon reconnect, don't try to
- * find a match
- */
- if (req->rq_bulk_write || req->rq_bulk_read)
- return NULL;
-
- /*
* This list should not be longer than max_requests in
* flights on the client, so it is not all that long.
* Also we only hit this codepath in case of a resent
ptlrpc_at_remove_timed(orig);
spin_unlock(&orig->rq_rqbd->rqbd_svcpt->scp_at_lock);
orig->rq_deadline = req->rq_deadline;
+ orig->rq_rep_mbits = req->rq_rep_mbits;
if (likely(linked))
ptlrpc_at_add_timed(orig);
ptlrpc_server_drop_request(orig);
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_request *req;
__u32 deadline;
+ __u32 opc;
int rc;
ENTRY;
RETURN(0);
}
- req = list_entry(svcpt->scp_req_incoming.next,
- struct ptlrpc_request, rq_list);
+ req = list_first_entry(&svcpt->scp_req_incoming,
+ struct ptlrpc_request, rq_list);
list_del_init(&req->rq_list);
svcpt->scp_nreqs_incoming--;
/*
goto err_req;
}
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
- lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
+ opc == cfs_fail_val) {
CERROR("drop incoming rpc opc %u, x%llu\n",
cfs_fail_val, req->rq_xid);
goto err_req;
goto err_req;
}
- switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+ switch (opc) {
case MDS_WRITEPAGE:
case OST_WRITE:
case OUT_UPDATE:
/* req_in handling should/must be fast */
if (ktime_get_real_seconds() - req->rq_arrival_time.tv_sec > 5)
DEBUG_REQ(D_WARNING, req, "Slow req_in handling %llds",
- (s64)(ktime_get_real_seconds() -
- req->rq_arrival_time.tv_sec));
+ ktime_get_real_seconds() -
+ req->rq_arrival_time.tv_sec);
/* Set rpc server deadline and add it to the timed list */
deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
thread->t_env->le_ses = &req->rq_session;
}
+
+ if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND) &&
+ (opc == LDLM_ENQUEUE) &&
+ (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
+ OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND, 6);
+
ptlrpc_at_add_timed(req);
+ if (opc != OST_CONNECT && opc != MDS_CONNECT &&
+ opc != MGS_CONNECT && req->rq_export != NULL) {
+ if (exp_connect_flags2(req->rq_export) & OBD_CONNECT2_REP_MBITS)
+ req->rq_rep_mbits = lustre_msg_get_mbits(req->rq_reqmsg);
+ }
+
/* Move it over to the request processing queue */
rc = ptlrpc_server_request_add(svcpt, req);
if (rc)
}
}
/**
 * Arm the watchdog backed by the delayed work item \a work.
 *
 * Initializes \a work with ptlrpc_watchdog_fire() as its handler and queues
 * it through schedule_delayed_work() with a delay of
 * cfs_time_seconds(timeout).
 *
 * This hunk drops the "static" qualifier and changes the second parameter
 * from "time_t time" to "timeout_t timeout"; the body is otherwise the same.
 *
 * \param[in] work	delayed work item to initialize and schedule
 * \param[in] timeout	delay before the handler fires; it is passed through
 *			cfs_time_seconds(), so presumably expressed in
 *			seconds (confirm against the timeout_t definition)
 */
-static void ptlrpc_watchdog_init(struct delayed_work *work, time_t time)
+void ptlrpc_watchdog_init(struct delayed_work *work, timeout_t timeout)
{
INIT_DELAYED_WORK(work, ptlrpc_watchdog_fire);
- schedule_delayed_work(work, cfs_time_seconds(time));
+ schedule_delayed_work(work, cfs_time_seconds(timeout));
}
/**
 * Disarm the watchdog backed by the delayed work item \a work.
 *
 * Simply calls cancel_delayed_work_sync() on \a work; per the kernel
 * workqueue API that call also waits for a handler that is already running.
 *
 * This hunk only removes the "static" qualifier from the definition.
 *
 * \param[in] work	delayed work item to cancel
 */
-static void ptlrpc_watchdog_disable(struct delayed_work *work)
+void ptlrpc_watchdog_disable(struct delayed_work *work)
{
cancel_delayed_work_sync(work);
}
/**
 * Re-arm the watchdog backed by \a work with a new timeout.
 *
 * \a work must be the t_watchdog member embedded in a struct ptlrpc_thread:
 * the owning thread is recovered with container_of() so that its t_touched
 * field can be stamped with the current ktime_get() value. The work item is
 * then re-queued on system_wq through mod_delayed_work() with a delay of
 * cfs_time_seconds(timeout).
 *
 * This hunk drops the "static" qualifier and changes the second parameter
 * from "time_t time" to "timeout_t timeout".
 *
 * \param[in] work	the t_watchdog delayed work of a ptlrpc_thread
 * \param[in] timeout	new delay before the handler fires; it is passed
 *			through cfs_time_seconds(), so presumably expressed
 *			in seconds (confirm)
 */
-static void ptlrpc_watchdog_touch(struct delayed_work *work, time_t time)
+void ptlrpc_watchdog_touch(struct delayed_work *work, timeout_t timeout)
{
struct ptlrpc_thread *thread = container_of(&work->work,
struct ptlrpc_thread,
t_watchdog.work);
thread->t_touched = ktime_get();
- mod_delayed_work(system_wq, work, cfs_time_seconds(time));
+ mod_delayed_work(system_wq, work, cfs_time_seconds(timeout));
}
/**
thread->t_task = current;
thread->t_pid = current->pid;
- unshare_fs_struct();
if (svc->srv_cpt_bind) {
rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
if (env == NULL)
RETURN(-ENOMEM);
- unshare_fs_struct();
-
rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
if (rc != 0) {
char threadname[20];
if (hrp->hrp_thrs == NULL)
continue; /* uninitialized */
for (j = 0; j < hrp->hrp_nthrs; j++)
- wake_up_all(&hrp->hrp_thrs[j].hrt_waitq);
+ wake_up(&hrp->hrp_thrs[j].hrt_waitq);
}
cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
wake_up_all(&svcpt->scp_waitq);
- while (!list_empty(&svcpt->scp_threads)) {
- thread = list_entry(svcpt->scp_threads.next,
- struct ptlrpc_thread, t_link);
+ while ((thread = list_first_entry_or_null(&svcpt->scp_threads,
+ struct ptlrpc_thread,
+ t_link)) != NULL) {
if (thread_is_stopped(thread)) {
list_move(&thread->t_link, &zombie);
continue;
spin_unlock(&svcpt->scp_lock);
- while (!list_empty(&zombie)) {
- thread = list_entry(zombie.next,
- struct ptlrpc_thread, t_link);
+ while ((thread = list_first_entry_or_null(&zombie,
+ struct ptlrpc_thread,
+ t_link)) != NULL) {
list_del(&thread->t_link);
OBD_FREE_PTR(thread);
}
/**
* Stops all threads of a particular service \a svc
*/
-void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
+static void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
struct ptlrpc_service_part *svcpt;
int i;
EXIT;
}
-int ptlrpc_start_threads(struct ptlrpc_service *svc)
+static int ptlrpc_start_threads(struct ptlrpc_service *svc)
{
int rc = 0;
int i;
RETURN(rc);
}
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
+static int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
{
struct ptlrpc_thread *thread;
struct ptlrpc_service *svc;
ptlrpc_stop_hr_threads();
cfs_percpt_for_each(hrp, cpt, ptlrpc_hr.hr_partitions) {
- if (hrp->hrp_thrs != NULL) {
- OBD_FREE(hrp->hrp_thrs,
- hrp->hrp_nthrs * sizeof(hrp->hrp_thrs[0]));
- }
+ if (hrp->hrp_thrs)
+ OBD_FREE_PTR_ARRAY(hrp->hrp_thrs, hrp->hrp_nthrs);
}
cfs_percpt_free(ptlrpc_hr.hr_partitions);
*/
spin_lock(&svcpt->scp_lock);
while (svcpt->scp_nrqbds_posted != 0) {
- int seconds = LONG_UNLINK;
+ int seconds = PTLRPC_REQ_LONG_UNLINK;
spin_unlock(&svcpt->scp_lock);
/*
break;
spin_lock(&svcpt->scp_rep_lock);
- while (!list_empty(&svcpt->scp_rep_active)) {
- rs = list_entry(svcpt->scp_rep_active.next,
- struct ptlrpc_reply_state, rs_list);
+ while ((rs = list_first_entry_or_null(&svcpt->scp_rep_active,
+ struct ptlrpc_reply_state,
+ rs_list)) != NULL) {
spin_lock(&rs->rs_lock);
ptlrpc_schedule_difficult_reply(rs);
spin_unlock(&rs->rs_lock);
* all unlinked) and no service threads, so I'm the only
* thread noodling the request queue now
*/
- while (!list_empty(&svcpt->scp_req_incoming)) {
- req = list_entry(svcpt->scp_req_incoming.next,
- struct ptlrpc_request, rq_list);
-
+ while ((req = list_first_entry_or_null(&svcpt->scp_req_incoming,
+ struct ptlrpc_request,
+ rq_list)) != NULL) {
list_del(&req->rq_list);
svcpt->scp_nreqs_incoming--;
ptlrpc_server_finish_request(svcpt, req);
ptlrpc_server_finish_active_request(svcpt, req);
}
- LASSERT(list_empty(&svcpt->scp_rqbd_posted));
+ /*
+ * The portal may be shared by several services (e.g. OUT_PORTAL),
+ * so a request may still be referenced by another target. We
+ * therefore have to wait until ptlrpc_server_drop_request() has
+ * been invoked.
+ *
+ * TODO: make the request buffers global rather than per-service.
+ */
+ spin_lock(&svcpt->scp_lock);
+ while (!list_empty(&svcpt->scp_rqbd_posted)) {
+ spin_unlock(&svcpt->scp_lock);
+ wait_event_idle_timeout(svcpt->scp_waitq,
+ list_empty(&svcpt->scp_rqbd_posted),
+ cfs_time_seconds(1));
+ spin_lock(&svcpt->scp_lock);
+ }
+ spin_unlock(&svcpt->scp_lock);
+
LASSERT(svcpt->scp_nreqs_incoming == 0);
LASSERT(svcpt->scp_nreqs_active == 0);
/*
* Now free all the request buffers since nothing
* references them any more...
*/
-
- while (!list_empty(&svcpt->scp_rqbd_idle)) {
- rqbd = list_entry(svcpt->scp_rqbd_idle.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ while ((rqbd = list_first_entry_or_null(&svcpt->scp_rqbd_idle,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list)) != NULL)
ptlrpc_free_rqbd(rqbd);
- }
+
ptlrpc_wait_replies(svcpt);
- while (!list_empty(&svcpt->scp_rep_idle)) {
- rs = list_entry(svcpt->scp_rep_idle.next,
- struct ptlrpc_reply_state,
- rs_list);
+ while ((rs = list_first_entry_or_null(&svcpt->scp_rep_idle,
+ struct ptlrpc_reply_state,
+ rs_list)) != NULL) {
list_del(&rs->rs_list);
OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
}
array = &svcpt->scp_at_array;
if (array->paa_reqs_array != NULL) {
- OBD_FREE(array->paa_reqs_array,
- sizeof(struct list_head) * array->paa_size);
+ OBD_FREE_PTR_ARRAY(array->paa_reqs_array,
+ array->paa_size);
array->paa_reqs_array = NULL;
}
if (array->paa_reqs_count != NULL) {
- OBD_FREE(array->paa_reqs_count,
- sizeof(__u32) * array->paa_size);
+ OBD_FREE_PTR_ARRAY(array->paa_reqs_count,
+ array->paa_size);
array->paa_reqs_count = NULL;
}
}