/*
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
{
- /* if we kill request before timeout - need adjust counter */
- if (req->rq_phase == RQ_PHASE_RPC) {
- struct ptlrpc_request_set *set = req->rq_set;
+ struct ptlrpc_request_set *set = req->rq_set;
+ int wakeup = 0;
- if (set)
- cfs_atomic_dec(&set->set_remaining);
- }
+ /* hold ref on the request to prevent others (ptlrpcd) to free it */
+ ptlrpc_request_addref(req);
+ cfs_list_del_init(&req->rq_list);
+
+ /* if we kill request before timeout - need adjust counter */
+ if (req->rq_phase == RQ_PHASE_RPC && set != NULL &&
+ cfs_atomic_dec_and_test(&set->set_remaining))
+ wakeup = 1;
ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
- cfs_list_del_init(&req->rq_list);
+
+ /* Only need to call wakeup once when to be empty. */
+ if (wakeup)
+ cfs_waitq_signal(&set->set_waitq);
+ ptlrpc_req_finished(req);
}
/**
OBD_ALLOC(set, sizeof *set);
if (!set)
RETURN(NULL);
+ cfs_atomic_set(&set->set_refcount, 1);
CFS_INIT_LIST_HEAD(&set->set_requests);
cfs_waitq_init(&set->set_waitq);
+ cfs_atomic_set(&set->set_new_count, 0);
cfs_atomic_set(&set->set_remaining, 0);
cfs_spin_lock_init(&set->set_new_req_lock);
CFS_INIT_LIST_HEAD(&set->set_new_requests);
LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
- OBD_FREE(set, sizeof(*set));
+ ptlrpc_reqset_put(set);
EXIT;
}
void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
struct ptlrpc_request *req)
{
+ LASSERT(cfs_list_empty(&req->rq_set_chain));
+
/* The set takes over the caller's request reference */
cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
req->rq_set = set;
cfs_atomic_inc(&set->set_remaining);
- req->rq_queued_time = cfs_time_current(); /* Where is the best place to set this? */
+ req->rq_queued_time = cfs_time_current();
}
/**
* Add a request to a request with dedicated server thread
* and wake the thread to make any necessary processing.
* Currently only used for ptlrpcd.
- * Returns 0 if succesful or non zero error code on error.
- * (the only possible error for now is if the dedicated server thread
- * is shutting down)
*/
-int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
struct ptlrpc_request *req)
{
struct ptlrpc_request_set *set = pc->pc_set;
+ int count, i;
- /*
- * Let caller know that we stopped and will not handle this request.
- * It needs to take care itself of request.
- */
- if (cfs_test_bit(LIOD_STOP, &pc->pc_flags))
- return -EALREADY;
+ LASSERT(req->rq_set == NULL);
+ LASSERT(cfs_test_bit(LIOD_STOP, &pc->pc_flags) == 0);
cfs_spin_lock(&set->set_new_req_lock);
/*
* The set takes over the caller's request reference.
*/
- cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
req->rq_set = set;
+ req->rq_queued_time = cfs_time_current();
+ cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
+ count = cfs_atomic_inc_return(&set->set_new_count);
cfs_spin_unlock(&set->set_new_req_lock);
- cfs_waitq_signal(&set->set_waitq);
- return 0;
+ /* Only need to call wakeup once for the first entry. */
+ if (count == 1) {
+ cfs_waitq_signal(&set->set_waitq);
+
+ /* XXX: It maybe unnecessary to wakeup all the partners. But to
+ * guarantee the async RPC can be processed ASAP, we have
+ * no other better choice. It maybe fixed in future. */
+ for (i = 0; i < pc->pc_npartners; i++)
+ cfs_waitq_signal(&pc->pc_partners[i]->pc_set->set_waitq);
+ }
}
/**
req->rq_timedout = 1;
cfs_spin_unlock(&req->rq_lock);
- DEBUG_REQ(req->rq_fake ? D_INFO : D_WARNING, req, "Request x"LPU64
- " sent from %s to NID %s has %s: [sent "CFS_DURATION_T"] "
- "[real_sent "CFS_DURATION_T"] [current "CFS_DURATION_T"] "
- "[deadline "CFS_DURATION_T"s] [delay "CFS_DURATION_T"s]",
- req->rq_xid, imp ? imp->imp_obd->obd_name : "<?>",
- imp ? libcfs_nid2str(imp->imp_connection->c_peer.nid) : "<?>",
+ DEBUG_REQ(req->rq_fake ? D_INFO : D_WARNING, req, "Request "
+ " sent has %s: [sent "CFS_DURATION_T"/"
+ "real "CFS_DURATION_T"]",
req->rq_net_err ? "failed due to network error" :
((req->rq_real_sent == 0 ||
cfs_time_before(req->rq_real_sent, req->rq_sent) ||
cfs_time_aftereq(req->rq_real_sent, req->rq_deadline)) ?
"timed out for sent delay" : "timed out for slow reply"),
- req->rq_sent, req->rq_real_sent, cfs_time_current_sec(),
- cfs_time_sub(req->rq_deadline, req->rq_sent),
- cfs_time_sub(cfs_time_current_sec(), req->rq_deadline));
+ req->rq_sent, req->rq_real_sent);
if (imp != NULL && obd_debug_peer_on_timeout)
LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
cfs_list_t *tmp;
LASSERT(set != NULL);
- CERROR("INTERRUPTED SET %p\n", set);
+ CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
cfs_list_for_each(tmp, &set->set_requests) {
struct ptlrpc_request *req =
rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
+ /* LU-769 - if we ignored the signal because it was already
+ * pending when we started, we need to handle it now or we risk
+ * it being ignored forever */
+ if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
+ cfs_signal_pending()) {
+ cfs_sigset_t blocked_sigs =
+ cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+
+ /* In fact we only interrupt for the "fatal" signals
+ * like SIGINT or SIGKILL. We still ignore less
+ * important signals since ptlrpc set is not easily
+ * reentrant from userspace again */
+ if (cfs_signal_pending())
+ ptlrpc_interrupted_set(set);
+ cfs_block_sigs(blocked_sigs);
+ }
+
LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
/* -EINTR => all requests have been flagged rq_intr so next
if (rc != 0)
/* this replay failed, so restart recovery */
- ptlrpc_connect_import(imp, NULL);
+ ptlrpc_connect_import(imp);
RETURN(rc);
}
cfs_atomic_inc(&req->rq_import->imp_replay_inflight);
ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
- ptlrpcd_add_req(req, PSCOPE_OTHER);
+ ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
RETURN(0);
}