/*
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
*/
/*
* This file is part of Lustre, http://www.lustre.org/
err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
if (err != 0) {
- CDEBUG(D_NETERROR, "cannot find peer %s!\n", uuid->uuid);
+ CNETERR("cannot find peer %s!\n", uuid->uuid);
return NULL;
}
cfs_list_del(&req->rq_list);
LASSERT(req->rq_reqbuf);
LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
- OBD_FREE(req->rq_reqbuf, pool->prp_rq_size);
+ OBD_FREE_LARGE(req->rq_reqbuf, pool->prp_rq_size);
OBD_FREE(req, sizeof(*req));
}
cfs_spin_unlock(&pool->prp_lock);
OBD_ALLOC(req, sizeof(struct ptlrpc_request));
if (!req)
return;
- OBD_ALLOC_GFP(msg, size, CFS_ALLOC_STD);
+ OBD_ALLOC_LARGE(msg, size);
if (!msg) {
OBD_FREE(req, sizeof(struct ptlrpc_request));
return;
*/
void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
{
- /* if we kill request before timeout - need adjust counter */
- if (req->rq_phase == RQ_PHASE_RPC) {
- struct ptlrpc_request_set *set = req->rq_set;
+ struct ptlrpc_request_set *set = req->rq_set;
+ int wakeup = 0;
- if (set)
- cfs_atomic_dec(&set->set_remaining);
- }
+ /* hold a ref on the request to prevent others (ptlrpcd) from freeing it */
+ ptlrpc_request_addref(req);
+ cfs_list_del_init(&req->rq_list);
+
+ /* if we kill the request before the timeout, we need to adjust the counter */
+ if (req->rq_phase == RQ_PHASE_RPC && set != NULL &&
+ cfs_atomic_dec_and_test(&set->set_remaining))
+ wakeup = 1;
ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
- cfs_list_del_init(&req->rq_list);
+
+ /* Only need to call wakeup once when the set becomes empty. */
+ if (wakeup)
+ cfs_waitq_signal(&set->set_waitq);
+ ptlrpc_req_finished(req);
}
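The rewritten ptlrpc_fakereq_finished() above combines three ideas: hold a private reference while unlinking so a concurrent ptlrpcd cannot free the request underneath us, drop the set counter with an atomic dec-and-test, and signal the waitqueue only on the transition to empty. Below is a minimal sketch of that pattern, with hypothetical names (fake_req, fake_set) and plain Linux primitives standing in for the cfs_ wrappers; it is not the Lustre code itself.

#include <linux/atomic.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/wait.h>

struct fake_set {
	atomic_t		remaining;	/* requests still in flight */
	wait_queue_head_t	waitq;		/* waiters for the set to drain */
};

struct fake_req {
	atomic_t		refcount;
	struct list_head	link;
	struct fake_set		*set;
};

static void fake_req_finished(struct fake_req *req)
{
	struct fake_set *set = req->set;

	/* keep the request alive while we unlink it */
	atomic_inc(&req->refcount);
	list_del_init(&req->link);

	/* only the thread that empties the set issues the wakeup */
	if (set != NULL && atomic_dec_and_test(&set->remaining))
		wake_up(&set->waitq);

	/* drop our reference; the last holder frees the request */
	if (atomic_dec_and_test(&req->refcount))
		kfree(req);
}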
/**
OBD_ALLOC(set, sizeof *set);
if (!set)
RETURN(NULL);
+ cfs_atomic_set(&set->set_refcount, 1);
CFS_INIT_LIST_HEAD(&set->set_requests);
cfs_waitq_init(&set->set_waitq);
+ cfs_atomic_set(&set->set_new_count, 0);
cfs_atomic_set(&set->set_remaining, 0);
cfs_spin_lock_init(&set->set_new_req_lock);
CFS_INIT_LIST_HEAD(&set->set_new_requests);
LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
- OBD_FREE(set, sizeof(*set));
+ ptlrpc_reqset_put(set);
EXIT;
}
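The OBD_FREE(set) to ptlrpc_reqset_put() switch pairs with the new set_refcount initialized to 1 at allocation: once other threads (ptlrpcd) may hold references on a request set, "destroying" it is just dropping the creator's initial reference. A sketch of that put, assuming the last put frees (names are illustrative, not the actual Lustre implementation):

#include <linux/atomic.h>
#include <linux/slab.h>

struct req_set {
	atomic_t refcount;		/* set to 1 when allocated */
	/* ... request list, waitq, counters ... */
};

static void reqset_put(struct req_set *set)
{
	/* free only when the last holder (creator or ptlrpcd) lets go */
	if (atomic_dec_and_test(&set->refcount))
		kfree(set);
}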
void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
struct ptlrpc_request *req)
{
+ LASSERT(cfs_list_empty(&req->rq_set_chain));
+
/* The set takes over the caller's request reference */
cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
req->rq_set = set;
cfs_atomic_inc(&set->set_remaining);
- req->rq_queued_time = cfs_time_current(); /* Where is the best place to set this? */
+ req->rq_queued_time = cfs_time_current();
}
/**
* Add a request to a request set with a dedicated server thread
* and wake the thread to make any necessary processing.
* Currently only used for ptlrpcd.
- * Returns 0 if succesful or non zero error code on error.
- * (the only possible error for now is if the dedicated server thread
- * is shutting down)
*/
-int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
struct ptlrpc_request *req)
{
struct ptlrpc_request_set *set = pc->pc_set;
+ int count, i;
- /*
- * Let caller know that we stopped and will not handle this request.
- * It needs to take care itself of request.
- */
- if (cfs_test_bit(LIOD_STOP, &pc->pc_flags))
- return -EALREADY;
+ LASSERT(req->rq_set == NULL);
+ LASSERT(cfs_test_bit(LIOD_STOP, &pc->pc_flags) == 0);
cfs_spin_lock(&set->set_new_req_lock);
/*
* The set takes over the caller's request reference.
*/
- cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
req->rq_set = set;
+ req->rq_queued_time = cfs_time_current();
+ cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
+ count = cfs_atomic_inc_return(&set->set_new_count);
cfs_spin_unlock(&set->set_new_req_lock);
- cfs_waitq_signal(&set->set_waitq);
- return 0;
+ /* Only need to call wakeup once for the first entry. */
+ if (count == 1) {
+ cfs_waitq_signal(&set->set_waitq);
+
+ /* XXX: It may be unnecessary to wake up all the partners, but to
+ * guarantee that async RPCs are processed ASAP we have no
+ * better choice. This may be fixed in the future. */
+ for (i = 0; i < pc->pc_npartners; i++)
+ cfs_waitq_signal(&pc->pc_partners[i]->pc_set->set_waitq);
+ }
}
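ptlrpc_set_add_new_req() now wakes the handler only when set_new_count goes from 0 to 1; later producers rely on the consumer draining the whole list per wakeup. A self-contained sketch of this wake-on-first-entry producer, again with hypothetical names and plain kernel primitives rather than the cfs_ wrappers:

#include <linux/atomic.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

struct new_req_queue {
	spinlock_t		lock;		/* protects list */
	struct list_head	list;		/* newly queued requests */
	atomic_t		count;		/* queued-item counter */
	wait_queue_head_t	waitq;		/* consumer sleeps here */
};

static void queue_add_new_req(struct new_req_queue *q, struct list_head *item)
{
	int count;

	spin_lock(&q->lock);
	list_add_tail(item, &q->list);
	count = atomic_inc_return(&q->count);
	spin_unlock(&q->lock);

	/* only the 0 -> 1 transition needs to wake the consumer; the
	 * consumer drains the whole list per wakeup and resets the
	 * counter under the same lock */
	if (count == 1)
		wake_up(&q->waitq);
}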
/**
}
if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
- OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, obd_fail_val);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
ptlrpc_at_adj_net_latency(req,
lustre_msg_get_service_time(req->rq_repmsg));
LASSERT(req->rq_next_phase != req->rq_phase);
LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
- /* Abort the bulk, if the request itself has been
- * aborted, for instance, on a client eviction. */
- if (req->rq_err && req->rq_status == -EINTR &&
- req->rq_bulk != NULL)
- ptlrpc_abort_bulk(req->rq_bulk);
-
/*
* Skip processing until reply is unlinked. We
* can't return to pool before that and we can't
if (ptlrpc_client_recv_or_unlink(req) ||
ptlrpc_client_bulk_active(req))
continue;
+ /* If there is no need to resend, fail it now. */
+ if (req->rq_no_resend) {
+ if (req->rq_status == 0)
+ req->rq_status = -EIO;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
+ GOTO(interpret, req->rq_status);
+ } else {
+ continue;
+ }
}
if (req->rq_err) {
/* This moves us to the "unregistering" phase; we need to wait
* for the reply unlink. */
- if (!unregistered && !ptlrpc_unregister_reply(req, 1))
+ if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
+ /* start async bulk unlink too */
+ ptlrpc_unregister_bulk(req, 1);
continue;
+ }
if (!ptlrpc_unregister_bulk(req, 1))
continue;
req->rq_timedout = 1;
cfs_spin_unlock(&req->rq_lock);
- DEBUG_REQ(req->rq_fake ? D_INFO : D_WARNING, req, "Request x"LPU64
- " sent from %s to NID %s has %s: [sent "CFS_DURATION_T"] "
- "[real_sent "CFS_DURATION_T"] [current "CFS_DURATION_T"] "
- "[deadline "CFS_DURATION_T"s] [delay "CFS_DURATION_T"s]",
- req->rq_xid, imp ? imp->imp_obd->obd_name : "<?>",
- imp ? libcfs_nid2str(imp->imp_connection->c_peer.nid) : "<?>",
+ DEBUG_REQ(req->rq_fake ? D_INFO : D_WARNING, req,
+ "Request sent has %s: [sent "CFS_DURATION_T"/"
+ "real "CFS_DURATION_T"]",
req->rq_net_err ? "failed due to network error" :
((req->rq_real_sent == 0 ||
cfs_time_before(req->rq_real_sent, req->rq_sent) ||
cfs_time_aftereq(req->rq_real_sent, req->rq_deadline)) ?
"timed out for sent delay" : "timed out for slow reply"),
- req->rq_sent, req->rq_real_sent, cfs_time_current_sec(),
- cfs_time_sub(req->rq_deadline, req->rq_sent),
- cfs_time_sub(cfs_time_current_sec(), req->rq_deadline));
+ req->rq_sent, req->rq_real_sent);
if (imp != NULL && obd_debug_peer_on_timeout)
LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
cfs_list_t *tmp;
LASSERT(set != NULL);
- CERROR("INTERRUPTED SET %p\n", set);
+ CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
cfs_list_for_each(tmp, &set->set_requests) {
struct ptlrpc_request *req =
rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
+ /* LU-769 - if we ignored the signal because it was already
+ * pending when we started, we need to handle it now or we risk
+ * it being ignored forever */
+ if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
+ cfs_signal_pending()) {
+ cfs_sigset_t blocked_sigs =
+ cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+
+ /* In fact we only interrupt for the "fatal" signals
+ * like SIGINT or SIGKILL. We still ignore less
+ * important signals since the ptlrpc set is not easily
+ * reentrant from userspace. */
+ if (cfs_signal_pending())
+ ptlrpc_interrupted_set(set);
+ cfs_block_sigs(blocked_sigs);
+ }
+
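The LU-769 fix above depends on cfs_block_sigsinv() blocking every signal except the fatal ones, so that the second cfs_signal_pending() check can only see signals we actually want to act on. A sketch of that block / re-check / restore sequence using the plain kernel signal API (FATAL_SIGS and the callback are stand-ins, not Lustre definitions):

#include <linux/sched.h>
#include <linux/signal.h>

/* signals treated as fatal; a stand-in for LUSTRE_FATAL_SIGS */
#define FATAL_SIGS (sigmask(SIGKILL) | sigmask(SIGINT) | \
		    sigmask(SIGTERM) | sigmask(SIGQUIT))

static void handle_stale_fatal_signal(void (*interrupt_cb)(void *), void *arg)
{
	sigset_t blocked, old;

	/* block everything *except* the fatal signals (the "sigsinv"
	 * part of cfs_block_sigsinv) */
	siginitsetinv(&blocked, FATAL_SIGS);
	sigprocmask(SIG_BLOCK, &blocked, &old);

	/* anything still pending now must be fatal, so act on it */
	if (signal_pending(current))
		interrupt_cb(arg);

	/* restore the caller's original mask */
	sigprocmask(SIG_SETMASK, &old, NULL);
}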
LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
/* -EINTR => all requests have been flagged rq_intr so next
if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
imp->imp_generation == imp->imp_last_generation_checked) {
- CDEBUG(D_RPCTRACE, "%s: skip recheck: last_committed "LPU64"\n",
+ CDEBUG(D_INFO, "%s: skip recheck: last_committed "LPU64"\n",
imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
EXIT;
return;
break;
}
- DEBUG_REQ(D_RPCTRACE, req, "commit (last_committed "LPU64")",
+ DEBUG_REQ(D_INFO, req, "commit (last_committed "LPU64")",
imp->imp_peer_committed_transno);
free_req:
cfs_spin_lock(&req->rq_lock);
imp->imp_vbr_failed = 1;
imp->imp_no_lock_replay = 1;
cfs_spin_unlock(&imp->imp_lock);
+ lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
} else {
/** The transno had better not change over replay. */
LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
cfs_spin_unlock(&imp->imp_lock);
LASSERT(imp->imp_last_replay_transno);
+ /* the reported transno shouldn't be bigger than the one we replayed */
+ if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
+ DEBUG_REQ(D_ERROR, req,
+ "Reported transno "LPU64" is bigger than the "
+ "replayed one: "LPU64, req->rq_transno,
+ lustre_msg_get_transno(req->rq_reqmsg));
+ GOTO(out, rc = -EINVAL);
+ }
+
DEBUG_REQ(D_HA, req, "got rep");
/* let the callback do fixups, possibly including changes to the request */
* Errors while replay can set transno to 0, but
* imp_last_replay_transno shouldn't be set to 0 anyway
*/
- if (req->rq_transno > 0) {
- cfs_spin_lock(&imp->imp_lock);
- LASSERT(req->rq_transno <= imp->imp_last_replay_transno);
- imp->imp_last_replay_transno = req->rq_transno;
- cfs_spin_unlock(&imp->imp_lock);
- } else
+ if (req->rq_transno == 0)
CERROR("Transno is 0 during replay!\n");
+
/* continue with recovery */
rc = ptlrpc_import_recovery_state_machine(imp);
out:
if (rc != 0)
/* this replay failed, so restart recovery */
- ptlrpc_connect_import(imp, NULL);
+ ptlrpc_connect_import(imp);
RETURN(rc);
}
cfs_atomic_inc(&req->rq_import->imp_replay_inflight);
ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
- ptlrpcd_add_req(req, PSCOPE_OTHER);
+ ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
RETURN(0);
}
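The extra arguments to ptlrpcd_add_req() come from the reworked ptlrpcd, which takes a placement policy plus an optional thread index instead of the old PSCOPE_* scopes. The real definitions live in the companion ptlrpcd patch; the sketch below only illustrates the shape of that hint, and the enum names are an assumption:

/* illustrative only; not the authoritative Lustre definitions */
typedef enum {
	PDL_POLICY_ROUND = 1,	/* round-robin over the ptlrpcd threads */
	PDL_POLICY_SAME,	/* stay on the submitting thread's partner */
	PDL_POLICY_LOCAL,	/* prefer a ptlrpcd on the local CPU */
	PDL_POLICY_PREFERRED,	/* use the explicit index argument */
} pdl_policy_t;

/* the idx argument is only meaningful for PDL_POLICY_PREFERRED;
 * -1, as used above, means "no preferred thread" */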