*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*/
#define DEBUG_SUBSYSTEM S_RPC
static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);
+static int ptlrpc_start_threads(struct ptlrpc_service *svc);
+static int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
/** Holds a list of all PTLRPC services */
LIST_HEAD(ptlrpc_all_services);
return posted;
}
- rqbd = list_entry(svcpt->scp_rqbd_idle.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ rqbd = list_first_entry(&svcpt->scp_rqbd_idle,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
/* assume we will post successfully */
svcpt->scp_nrqbds_posted++;
failed:
if (array->paa_reqs_count != NULL) {
- OBD_FREE(array->paa_reqs_count, sizeof(__u32) * size);
+ OBD_FREE_PTR_ARRAY(array->paa_reqs_count, size);
array->paa_reqs_count = NULL;
}
if (array->paa_reqs_array != NULL) {
- OBD_FREE(array->paa_reqs_array,
- sizeof(struct list_head) * array->paa_size);
+ OBD_FREE_PTR_ARRAY(array->paa_reqs_array, array->paa_size);
array->paa_reqs_array = NULL;
}
strlen(cconf->cc_pattern),
0, ncpts - 1, &el);
if (rc != 0) {
- CERROR("%s: invalid CPT pattern string: %s",
+ CERROR("%s: invalid CPT pattern string: %s\n",
conf->psc_name, cconf->cc_pattern);
RETURN(ERR_PTR(-EINVAL));
}
CERROR("%s: failed to parse CPT array %s: %d\n",
conf->psc_name, cconf->cc_pattern, rc);
if (cpts != NULL)
- OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+ OBD_FREE_PTR_ARRAY(cpts, ncpts);
RETURN(ERR_PTR(rc < 0 ? rc : -EINVAL));
}
ncpts = rc;
OBD_ALLOC(service, offsetof(struct ptlrpc_service, srv_parts[ncpts]));
if (service == NULL) {
if (cpts != NULL)
- OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+ OBD_FREE_PTR_ARRAY(cpts, ncpts);
RETURN(ERR_PTR(-ENOMEM));
}
struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
int refcount;
- struct list_head *tmp;
- struct list_head *nxt;
if (!atomic_dec_and_test(&req->rq_refcount))
return;
* I expect only about 1 or 2 rqbds need to be recycled here
*/
while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
- rqbd = list_entry(svcpt->scp_hist_rqbds.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ rqbd = list_first_entry(&svcpt->scp_hist_rqbds,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
list_del(&rqbd->rqbd_list);
svcpt->scp_hist_nrqbds--;
* remove rqbd's reqs from svc's req history while
* I've got the service lock
*/
- list_for_each(tmp, &rqbd->rqbd_reqs) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_list);
+ list_for_each_entry(req, &rqbd->rqbd_reqs, rq_list) {
/* Track the highest culled req seq */
if (req->rq_history_seq >
svcpt->scp_hist_seq_culled) {
spin_unlock(&svcpt->scp_lock);
- list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
- req = list_entry(rqbd->rqbd_reqs.next,
- struct ptlrpc_request,
- rq_list);
+ while ((req = list_first_entry_or_null(
+ &rqbd->rqbd_reqs,
+ struct ptlrpc_request, rq_list))) {
list_del(&req->rq_list);
ptlrpc_server_free_request(req);
}
ENTRY;
- if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT)) {
+ if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT) ||
+ CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
/* don't send early reply */
RETURN(1);
}
* we took additional refcount so entries can't be deleted from list, no
* locking is needed
*/
- while (!list_empty(&work_list)) {
- rq = list_entry(work_list.next, struct ptlrpc_request,
- rq_timed_list);
+ while ((rq = list_first_entry_or_null(&work_list,
+ struct ptlrpc_request,
+ rq_timed_list)) != NULL) {
list_del_init(&rq->rq_timed_list);
if (ptlrpc_at_send_early_reply(rq) == 0)
return NULL;
/*
- * bulk request are aborted upon reconnect, don't try to
- * find a match
- */
- if (req->rq_bulk_write || req->rq_bulk_read)
- return NULL;
-
- /*
* This list should not be longer than max_requests in
* flights on the client, so it is not all that long.
* Also we only hit this codepath in case of a resent
ptlrpc_at_remove_timed(orig);
spin_unlock(&orig->rq_rqbd->rqbd_svcpt->scp_at_lock);
orig->rq_deadline = req->rq_deadline;
+ orig->rq_rep_mbits = req->rq_rep_mbits;
if (likely(linked))
ptlrpc_at_add_timed(orig);
ptlrpc_server_drop_request(orig);
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_request *req;
__u32 deadline;
+ __u32 opc;
int rc;
ENTRY;
RETURN(0);
}
- req = list_entry(svcpt->scp_req_incoming.next,
- struct ptlrpc_request, rq_list);
+ req = list_first_entry(&svcpt->scp_req_incoming,
+ struct ptlrpc_request, rq_list);
list_del_init(&req->rq_list);
svcpt->scp_nreqs_incoming--;
/*
goto err_req;
}
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
- lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
+ opc == cfs_fail_val) {
CERROR("drop incoming rpc opc %u, x%llu\n",
cfs_fail_val, req->rq_xid);
goto err_req;
goto err_req;
}
- switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+ switch (opc) {
case MDS_WRITEPAGE:
case OST_WRITE:
case OUT_UPDATE:
thread->t_env->le_ses = &req->rq_session;
}
+
+ if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND) &&
+ (opc == LDLM_ENQUEUE) &&
+ (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
+ OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND, 6);
+
ptlrpc_at_add_timed(req);
+ if (opc != OST_CONNECT && opc != MDS_CONNECT &&
+ opc != MGS_CONNECT && req->rq_export != NULL) {
+ if (exp_connect_flags2(req->rq_export) & OBD_CONNECT2_REP_MBITS)
+ req->rq_rep_mbits = lustre_msg_get_mbits(req->rq_reqmsg);
+ }
+
/* Move it over to the request processing queue */
rc = ptlrpc_server_request_add(svcpt, req);
if (rc)
}
}
-static void ptlrpc_watchdog_init(struct delayed_work *work, timeout_t timeout)
+void ptlrpc_watchdog_init(struct delayed_work *work, timeout_t timeout)
{
INIT_DELAYED_WORK(work, ptlrpc_watchdog_fire);
schedule_delayed_work(work, cfs_time_seconds(timeout));
}
-static void ptlrpc_watchdog_disable(struct delayed_work *work)
+void ptlrpc_watchdog_disable(struct delayed_work *work)
{
cancel_delayed_work_sync(work);
}
-static void ptlrpc_watchdog_touch(struct delayed_work *work, timeout_t timeout)
+void ptlrpc_watchdog_touch(struct delayed_work *work, timeout_t timeout)
{
struct ptlrpc_thread *thread = container_of(&work->work,
struct ptlrpc_thread,
thread->t_task = current;
thread->t_pid = current->pid;
- unshare_fs_struct();
if (svc->srv_cpt_bind) {
rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
if (env == NULL)
RETURN(-ENOMEM);
- unshare_fs_struct();
-
rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
if (rc != 0) {
char threadname[20];
if (hrp->hrp_thrs == NULL)
continue; /* uninitialized */
for (j = 0; j < hrp->hrp_nthrs; j++)
- wake_up_all(&hrp->hrp_thrs[j].hrt_waitq);
+ wake_up(&hrp->hrp_thrs[j].hrt_waitq);
}
cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
wake_up_all(&svcpt->scp_waitq);
- while (!list_empty(&svcpt->scp_threads)) {
- thread = list_entry(svcpt->scp_threads.next,
- struct ptlrpc_thread, t_link);
+ while ((thread = list_first_entry_or_null(&svcpt->scp_threads,
+ struct ptlrpc_thread,
+ t_link)) != NULL) {
if (thread_is_stopped(thread)) {
list_move(&thread->t_link, &zombie);
continue;
spin_unlock(&svcpt->scp_lock);
- while (!list_empty(&zombie)) {
- thread = list_entry(zombie.next,
- struct ptlrpc_thread, t_link);
+ while ((thread = list_first_entry_or_null(&zombie,
+ struct ptlrpc_thread,
+ t_link)) != NULL) {
list_del(&thread->t_link);
OBD_FREE_PTR(thread);
}
/**
* Stops all threads of a particular service \a svc
*/
-void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
+static void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
struct ptlrpc_service_part *svcpt;
int i;
EXIT;
}
-int ptlrpc_start_threads(struct ptlrpc_service *svc)
+static int ptlrpc_start_threads(struct ptlrpc_service *svc)
{
int rc = 0;
int i;
RETURN(rc);
}
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
+static int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
{
struct ptlrpc_thread *thread;
struct ptlrpc_service *svc;
ptlrpc_stop_hr_threads();
cfs_percpt_for_each(hrp, cpt, ptlrpc_hr.hr_partitions) {
- if (hrp->hrp_thrs != NULL) {
- OBD_FREE(hrp->hrp_thrs,
- hrp->hrp_nthrs * sizeof(hrp->hrp_thrs[0]));
- }
+ if (hrp->hrp_thrs)
+ OBD_FREE_PTR_ARRAY(hrp->hrp_thrs, hrp->hrp_nthrs);
}
cfs_percpt_free(ptlrpc_hr.hr_partitions);
*/
spin_lock(&svcpt->scp_lock);
while (svcpt->scp_nrqbds_posted != 0) {
- int seconds = LONG_UNLINK;
+ int seconds = PTLRPC_REQ_LONG_UNLINK;
spin_unlock(&svcpt->scp_lock);
/*
break;
spin_lock(&svcpt->scp_rep_lock);
- while (!list_empty(&svcpt->scp_rep_active)) {
- rs = list_entry(svcpt->scp_rep_active.next,
- struct ptlrpc_reply_state, rs_list);
+ while ((rs = list_first_entry_or_null(&svcpt->scp_rep_active,
+ struct ptlrpc_reply_state,
+ rs_list)) != NULL) {
spin_lock(&rs->rs_lock);
ptlrpc_schedule_difficult_reply(rs);
spin_unlock(&rs->rs_lock);
* all unlinked) and no service threads, so I'm the only
* thread noodling the request queue now
*/
- while (!list_empty(&svcpt->scp_req_incoming)) {
- req = list_entry(svcpt->scp_req_incoming.next,
- struct ptlrpc_request, rq_list);
-
+ while ((req = list_first_entry_or_null(&svcpt->scp_req_incoming,
+ struct ptlrpc_request,
+ rq_list)) != NULL) {
list_del(&req->rq_list);
svcpt->scp_nreqs_incoming--;
ptlrpc_server_finish_request(svcpt, req);
ptlrpc_server_finish_active_request(svcpt, req);
}
- LASSERT(list_empty(&svcpt->scp_rqbd_posted));
+ /*
+ * The portal may be shared by several services (eg:OUT_PORTAL).
+ * So the request could be referenced by other target. So we
+ * have to wait the ptlrpc_server_drop_request invoked.
+ *
+ * TODO: move the req_buffer as global rather than per service.
+ */
+ spin_lock(&svcpt->scp_lock);
+ while (!list_empty(&svcpt->scp_rqbd_posted)) {
+ spin_unlock(&svcpt->scp_lock);
+ wait_event_idle_timeout(svcpt->scp_waitq,
+ list_empty(&svcpt->scp_rqbd_posted),
+ cfs_time_seconds(1));
+ spin_lock(&svcpt->scp_lock);
+ }
+ spin_unlock(&svcpt->scp_lock);
+
LASSERT(svcpt->scp_nreqs_incoming == 0);
LASSERT(svcpt->scp_nreqs_active == 0);
/*
* Now free all the request buffers since nothing
* references them any more...
*/
-
- while (!list_empty(&svcpt->scp_rqbd_idle)) {
- rqbd = list_entry(svcpt->scp_rqbd_idle.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ while ((rqbd = list_first_entry_or_null(&svcpt->scp_rqbd_idle,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list)) != NULL)
ptlrpc_free_rqbd(rqbd);
- }
+
ptlrpc_wait_replies(svcpt);
- while (!list_empty(&svcpt->scp_rep_idle)) {
- rs = list_entry(svcpt->scp_rep_idle.next,
- struct ptlrpc_reply_state,
- rs_list);
+ while ((rs = list_first_entry_or_null(&svcpt->scp_rep_idle,
+ struct ptlrpc_reply_state,
+ rs_list)) != NULL) {
list_del(&rs->rs_list);
OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
}
array = &svcpt->scp_at_array;
if (array->paa_reqs_array != NULL) {
- OBD_FREE(array->paa_reqs_array,
- sizeof(struct list_head) * array->paa_size);
+ OBD_FREE_PTR_ARRAY(array->paa_reqs_array,
+ array->paa_size);
array->paa_reqs_array = NULL;
}
if (array->paa_reqs_count != NULL) {
- OBD_FREE(array->paa_reqs_count,
- sizeof(__u32) * array->paa_size);
+ OBD_FREE_PTR_ARRAY(array->paa_reqs_count,
+ array->paa_size);
array->paa_reqs_count = NULL;
}
}