* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
static int ptlrpc_send_new_req(struct ptlrpc_request *req);
static int ptlrpcd_check_work(struct ptlrpc_request *req);
+static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
/**
* Initialize passed in client structure \a cl.
return c;
}
-EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
/**
* Allocate and initialize new bulk descriptor on the sender.
time_t now = cfs_time_current_sec();
LASSERT(req->rq_import);
- at = &req->rq_import->imp_at;
+
+ if (service_time > now - req->rq_sent + 3) {
+ /* bz16408, however, this can also happen if early reply
+ * is lost and client RPC is expired and resent, early reply
+ * or reply of original RPC can still be fit in reply buffer
+ * of resent RPC, now client is measuring time from the
+ * resent time, but server sent back service time of original
+ * RPC.
+ */
+ CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
+ D_ADAPTTO : D_WARNING,
+ "Reported service time %u > total measured time "
+ CFS_DURATION_T"\n", service_time,
+ cfs_time_sub(now, req->rq_sent));
+ return;
+ }
/* Network latency is total time less server processing time */
- nl = max_t(int, now - req->rq_sent - service_time, 0) +1/*st rounding*/;
- if (service_time > now - req->rq_sent + 3 /* bz16408 */)
- CWARN("Reported service time %u > total measured time "
- CFS_DURATION_T"\n", service_time,
- cfs_time_sub(now, req->rq_sent));
+ nl = max_t(int, now - req->rq_sent -
+ service_time, 0) + 1; /* st rounding */
+ at = &req->rq_import->imp_at;
oldnl = at_measured(&at->iat_net_latency, nl);
if (oldnl != 0)
rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
if (rc) {
spin_lock(&req->rq_lock);
- RETURN(rc);
- }
-
- rc = unpack_reply(early_req);
- if (rc == 0) {
- /* Expecting to increase the service time estimate here */
- ptlrpc_at_adj_service(req,
- lustre_msg_get_timeout(early_req->rq_repmsg));
- ptlrpc_at_adj_net_latency(req,
- lustre_msg_get_service_time(early_req->rq_repmsg));
- }
-
- sptlrpc_cli_finish_early_reply(early_req);
+ RETURN(rc);
+ }
+ rc = unpack_reply(early_req);
if (rc != 0) {
+ sptlrpc_cli_finish_early_reply(early_req);
spin_lock(&req->rq_lock);
RETURN(rc);
}
- /* Adjust the local timeout for this req */
- ptlrpc_at_set_req_timeout(req);
+ /* Use new timeout value just to adjust the local value for this
+ * request, don't include it into at_history. It is unclear yet why
+ * service time increased and should it be counted or skipped, e.g.
+ * that can be recovery case or some error or server, the real reply
+ * will add all new data if it is worth to add. */
+ req->rq_timeout = lustre_msg_get_timeout(early_req->rq_repmsg);
+ lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
+
+ /* Network latency can be adjusted, it is pure network delays */
+ ptlrpc_at_adj_net_latency(req,
+ lustre_msg_get_service_time(early_req->rq_repmsg));
+
+ sptlrpc_cli_finish_early_reply(early_req);
spin_lock(&req->rq_lock);
olddl = req->rq_deadline;
RETURN(rc);
}
-struct kmem_cache *request_cache;
+static struct kmem_cache *request_cache;
int ptlrpc_request_cache_init(void)
{
lustre_msg_add_version(request->rq_reqmsg, version);
request->rq_send_state = LUSTRE_IMP_FULL;
request->rq_type = PTL_RPC_MSG_REQUEST;
- request->rq_export = NULL;
request->rq_req_cbid.cbid_fn = request_out_callback;
request->rq_req_cbid.cbid_arg = request;
ptlrpc_at_set_req_timeout(request);
- spin_lock_init(&request->rq_lock);
- INIT_LIST_HEAD(&request->rq_list);
- INIT_LIST_HEAD(&request->rq_timed_list);
- INIT_LIST_HEAD(&request->rq_replay_list);
- INIT_LIST_HEAD(&request->rq_ctx_chain);
- INIT_LIST_HEAD(&request->rq_set_chain);
- INIT_LIST_HEAD(&request->rq_history_list);
- INIT_LIST_HEAD(&request->rq_exp_list);
- init_waitqueue_head(&request->rq_reply_waitq);
- init_waitqueue_head(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
- atomic_set(&request->rq_refcount, 1);
-
lustre_msg_set_opc(request->rq_reqmsg, opcode);
RETURN(0);
request = ptlrpc_request_cache_alloc(GFP_NOFS);
if (request) {
- LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
+ ptlrpc_cli_req_init(request);
+
+ LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
LASSERT(imp != LP_POISON);
LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
imp->imp_client);
}
return request;
}
-EXPORT_SYMBOL(ptlrpc_prep_req_pool);
/**
* Same as ptlrpc_prep_req_pool, but without pool
return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths, bufs,
NULL);
}
-EXPORT_SYMBOL(ptlrpc_prep_req);
/**
* Allocate and initialize new request set structure.
RETURN(set);
}
-EXPORT_SYMBOL(ptlrpc_prep_fcset);
/**
* Wind down and free request set structure previously allocated with
RETURN(0);
}
-EXPORT_SYMBOL(ptlrpc_set_add_cb);
/**
* Add a new request to the general purpose request set.
wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
}
}
-EXPORT_SYMBOL(ptlrpc_set_add_new_req);
/**
* Based on the current state of the import, determine if the request
}
/**
- * Decide if the eror message regarding provided request \a req
- * should be printed to the console or not.
- * Makes it's decision on request status and other properties.
- * Returns 1 to print error on the system console or 0 if not.
+ * Decide if the error message should be printed to the console or not.
+ * Makes its decision based on request type, status, and failure frequency.
+ *
+ * \param[in] req request that failed and may need a console message
+ *
+ * \retval false if no message should be printed
+ * \retval true if console message should be printed
*/
-static int ptlrpc_console_allow(struct ptlrpc_request *req)
+static bool ptlrpc_console_allow(struct ptlrpc_request *req)
{
- __u32 opc;
- int err;
+ __u32 opc;
- LASSERT(req->rq_reqmsg != NULL);
- opc = lustre_msg_get_opc(req->rq_reqmsg);
+ LASSERT(req->rq_reqmsg != NULL);
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
- /* Suppress particular reconnect errors which are to be expected. No
- * errors are suppressed for the initial connection on an import */
- if ((lustre_handle_is_used(&req->rq_import->imp_remote_handle)) &&
- (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT)) {
+ /* Suppress particular reconnect errors which are to be expected. */
+ if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT) {
+ int err;
- /* Suppress timed out reconnect requests */
- if (req->rq_timedout)
- return 0;
+ /* Suppress timed out reconnect requests */
+ if (lustre_handle_is_used(&req->rq_import->imp_remote_handle) ||
+ req->rq_timedout)
+ return false;
- /* Suppress unavailable/again reconnect requests */
- err = lustre_msg_get_status(req->rq_repmsg);
- if (err == -ENODEV || err == -EAGAIN)
- return 0;
- }
+ /* Suppress most unavailable/again reconnect requests, but
+ * print occasionally so it is clear client is trying to
+ * connect to a server where no target is running. */
+ err = lustre_msg_get_status(req->rq_repmsg);
+ if ((err == -ENODEV || err == -EAGAIN) &&
+ req->rq_import->imp_conn_cnt % 30 != 20)
+ return false;
+ }
- return 1;
+ return true;
}
/**
err = lustre_msg_get_status(req->rq_repmsg);
if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
struct obd_import *imp = req->rq_import;
+ lnet_nid_t nid = imp->imp_connection->c_peer.nid;
__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
if (ptlrpc_console_allow(req))
- LCONSOLE_ERROR_MSG(0x011, "%s: Communicating with %s,"
- " operation %s failed with %d.\n",
- imp->imp_obd->obd_name,
- libcfs_nid2str(
- imp->imp_connection->c_peer.nid),
- ll_opcode2str(opc), err);
+ LCONSOLE_ERROR_MSG(0x11, "%s: operation %s to node %s "
+ "failed: rc = %d\n",
+ imp->imp_obd->obd_name,
+ ll_opcode2str(opc),
+ libcfs_nid2str(nid), err);
RETURN(err < 0 ? err : -EINVAL);
}
LASSERT(obd != NULL);
/* repbuf must be unlinked */
- LASSERT(!req->rq_receiving_reply && !req->rq_reply_unlink);
+ LASSERT(!req->rq_receiving_reply && req->rq_reply_unlinked);
- if (req->rq_reply_truncate) {
+ if (req->rq_reply_truncated) {
if (ptlrpc_no_resend(req)) {
DEBUG_REQ(D_ERROR, req, "reply buffer overflow,"
" expected: %d, actual size: %d",
RETURN(0);
}
+ do_gettimeofday(&work_start);
+ timediff = cfs_timeval_sub(&work_start, &req->rq_sent_tv, NULL);
+
/*
* NB Until this point, the whole of the incoming message,
* including buflens, status etc is in the sender's byte order.
RETURN(0);
}
- do_gettimeofday(&work_start);
- timediff = cfs_timeval_sub(&work_start, &req->rq_arrival_time, NULL);
if (obd->obd_svc_stats != NULL) {
lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
timediff);
rc = ptlrpc_check_status(req);
imp->imp_connect_error = rc;
- if (rc) {
- /*
- * Either we've been evicted, or the server has failed for
- * some reason. Try to reconnect, and if that fails, punt to
- * the upcall.
- */
- if (ll_rpc_recoverable_error(rc)) {
- if (req->rq_send_state != LUSTRE_IMP_FULL ||
- imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
- RETURN(rc);
- }
- ptlrpc_request_handle_notconn(req);
- RETURN(rc);
- }
+ if (rc) {
+ /*
+ * Either we've been evicted, or the server has failed for
+ * some reason. Try to reconnect, and if that fails, punt to
+ * the upcall.
+ */
+ if (ptlrpc_recoverable_error(rc)) {
+ if (req->rq_send_state != LUSTRE_IMP_FULL ||
+ imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
+ RETURN(rc);
+ }
+ ptlrpc_request_handle_notconn(req);
+ RETURN(rc);
+ }
} else {
/*
* Let's look if server sent slv. Do it only for RPC with
/* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
* so it sets rq_intr regardless of individual rpc
- * timeouts. The synchronous IO waiting path sets
+ * timeouts. The synchronous IO waiting path sets
* rq_intr irrespective of whether ptlrpcd
* has seen a timeout. Our policy is to only interpret
* interrupted rpcs after they have timed out, so we
*/
RETURN(1);
}
-EXPORT_SYMBOL(ptlrpc_expired_set);
/**
* Sets rq_intr flag in \a req under spinlock.
* Interrupts (sets interrupted flag) all uncompleted requests in
* a set \a data. Callback for l_wait_event for interruptible waits.
*/
-void ptlrpc_interrupted_set(void *data)
+static void ptlrpc_interrupted_set(void *data)
{
struct ptlrpc_request_set *set = data;
struct list_head *tmp;
ptlrpc_mark_interrupted(req);
}
}
-EXPORT_SYMBOL(ptlrpc_interrupted_set);
/**
* Get the smallest timeout in the set; this does NOT set a timeout.
}
RETURN(timeout);
}
-EXPORT_SYMBOL(ptlrpc_set_next_timeout);
/**
* Send all unset request from the set and then wait untill all
CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
set, timeout);
- if (timeout == 0 && !cfs_signal_pending())
+ if (timeout == 0 && !signal_pending(current))
/*
* No requests are in-flight (ether timed out
* or delayed), so we can allow interrupts.
* We still want to block for a limited time,
* so we allow interrupts during the timeout.
*/
- lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
+ lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
ptlrpc_expired_set,
ptlrpc_interrupted_set, set);
else
/*
* At least one request is in flight, so no
* interrupts are allowed. Wait until all
- * complete, or an in-flight req times out.
+ * complete, or an in-flight req times out.
*/
lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1),
ptlrpc_expired_set, set);
* pending when we started, we need to handle it now or we risk
* it being ignored forever */
if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
- cfs_signal_pending()) {
+ signal_pending(current)) {
sigset_t blocked_sigs =
cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
* like SIGINT or SIGKILL. We still ignore less
* important signals since ptlrpc set is not easily
* reentrant from userspace again */
- if (cfs_signal_pending())
+ if (signal_pending(current))
ptlrpc_interrupted_set(set);
cfs_restore_sigs(blocked_sigs);
}
*/
static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
- ENTRY;
- if (request == NULL) {
- EXIT;
- return;
- }
+ ENTRY;
+
+ if (request == NULL)
+ RETURN_EXIT;
- LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
- LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
+ LASSERT(!request->rq_srv_req);
+ LASSERT(request->rq_export == NULL);
+ LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
- LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request);
- LASSERTF(!request->rq_replay, "req %p\n", request);
+ LASSERTF(!request->rq_replay, "req %p\n", request);
- req_capsule_fini(&request->rq_pill);
+ req_capsule_fini(&request->rq_pill);
- /* We must take it off the imp_replay_list first. Otherwise, we'll set
- * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
- if (request->rq_import != NULL) {
+ /* We must take it off the imp_replay_list first. Otherwise, we'll set
+ * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
+ if (request->rq_import != NULL) {
if (!locked)
spin_lock(&request->rq_import->imp_lock);
list_del_init(&request->rq_replay_list);
if (request->rq_repbuf != NULL)
sptlrpc_cli_free_repbuf(request);
- if (request->rq_export != NULL) {
- class_export_put(request->rq_export);
- request->rq_export = NULL;
- }
+
if (request->rq_import != NULL) {
class_import_put(request->rq_import);
request->rq_import = NULL;
static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
/**
* Drop one request reference. Must be called with import imp_lock held.
- * When reference count drops to zero, reuqest is freed.
+ * When reference count drops to zero, request is freed.
*/
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
{
assert_spin_locked(&request->rq_import->imp_lock);
(void)__ptlrpc_req_finished(request, 1);
}
-EXPORT_SYMBOL(ptlrpc_req_finished_with_imp_lock);
/**
* Helper function
* The request owner (i.e. the thread doing the I/O) must call...
* Returns 0 on success or 1 if unregistering cannot be made.
*/
-int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
+static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
{
int rc;
struct l_wait_info lwi;
}
LASSERT(rc == -ETIMEDOUT);
- DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
- "rvcng=%d unlnk=%d/%d", request->rq_receiving_reply,
- request->rq_req_unlink, request->rq_reply_unlink);
+ DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
+ "receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
+ request->rq_receiving_reply,
+ request->rq_req_unlinked,
+ request->rq_reply_unlinked);
}
RETURN(0);
}
-EXPORT_SYMBOL(ptlrpc_unregister_reply);
static void ptlrpc_free_request(struct ptlrpc_request *req)
{
imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
imp->imp_generation);
- if (imp->imp_generation != imp->imp_last_generation_checked)
+ if (imp->imp_generation != imp->imp_last_generation_checked ||
+ imp->imp_last_transno_checked == 0)
skip_committed_list = false;
imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
if (req->rq_import_generation < imp->imp_generation) {
DEBUG_REQ(D_RPCTRACE, req, "free stale open request");
ptlrpc_free_request(req);
+ } else if (!req->rq_replay) {
+ DEBUG_REQ(D_RPCTRACE, req, "free closed open request");
+ ptlrpc_free_request(req);
}
}
out:
ENTRY;
EXIT;
}
-EXPORT_SYMBOL(ptlrpc_cleanup_client);
/**
* Schedule previously sent request for resend.
ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
-EXPORT_SYMBOL(ptlrpc_resend_req);
/* XXX: this function and rq_status are currently unused */
void ptlrpc_restart_req(struct ptlrpc_request *req)
ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
-EXPORT_SYMBOL(ptlrpc_restart_req);
/**
* Grab additional reference on a request \a req
list_add(&req->rq_replay_list, &imp->imp_replay_list);
}
-EXPORT_SYMBOL(ptlrpc_retain_replayable_request);
/**
* Send request and wait until it completes.
}
EXPORT_SYMBOL(ptlrpc_queue_wait);
-struct ptlrpc_replay_async_args {
- int praa_old_state;
- int praa_old_status;
-};
-
/**
* Callback used for replayed requests reply processing.
- * In case of succesful reply calls registeresd request replay callback.
+ * In case of successful reply calls registered request replay callback.
* In case of error restart replay process.
*/
static int ptlrpc_replay_interpret(const struct lu_env *env,
ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
RETURN(0);
}
-EXPORT_SYMBOL(ptlrpc_replay_req);
/**
* Aborts all in-flight request on import \a imp sending and delayed lists
EXIT;
}
-EXPORT_SYMBOL(ptlrpc_abort_inflight);
/**
* Abort all uncompleted requests in request set \a set
return next;
}
-EXPORT_SYMBOL(ptlrpc_next_xid);
/**
* Get a glimpse at what next xid value might have been.
void *ptlrpcd_alloc_work(struct obd_import *imp,
int (*cb)(const struct lu_env *, void *), void *cbdata)
{
- struct ptlrpc_request *req = NULL;
+ struct ptlrpc_request *req = NULL;
struct ptlrpc_work_async_args *args;
ENTRY;
RETURN(ERR_PTR(-ENOMEM));
}
+ ptlrpc_cli_req_init(req);
+
req->rq_send_state = LUSTRE_IMP_FULL;
req->rq_type = PTL_RPC_MSG_REQUEST;
req->rq_import = class_import_get(imp);
- req->rq_export = NULL;
req->rq_interpret_reply = work_interpreter;
/* don't want reply */
- req->rq_receiving_reply = 0;
- req->rq_req_unlink = req->rq_reply_unlink = 0;
req->rq_no_delay = req->rq_no_resend = 1;
req->rq_pill.rc_fmt = (void *)&worker_format;
- spin_lock_init(&req->rq_lock);
- INIT_LIST_HEAD(&req->rq_list);
- INIT_LIST_HEAD(&req->rq_replay_list);
- INIT_LIST_HEAD(&req->rq_set_chain);
- INIT_LIST_HEAD(&req->rq_history_list);
- INIT_LIST_HEAD(&req->rq_exp_list);
- init_waitqueue_head(&req->rq_reply_waitq);
- init_waitqueue_head(&req->rq_set_waitq);
- atomic_set(&req->rq_refcount, 1);
-
CLASSERT (sizeof(*args) <= sizeof(req->rq_async_args));
args = ptlrpc_req_async_args(req);
args->cb = cb;