* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
#define DEBUG_SUBSYSTEM S_RPC
+#include <linux/kthread.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_net.h>
/** Used to protect the \e ptlrpc_all_services list */
struct mutex ptlrpc_all_services_mutex;
-struct ptlrpc_request_buffer_desc *
+static struct ptlrpc_request_buffer_desc *
ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
{
struct ptlrpc_service *svc = svcpt->scp_service;
return rqbd;
}
-void
+static void
ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
{
struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
OBD_FREE_PTR(rqbd);
}
-int
+static int
ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
{
struct ptlrpc_service *svc = svcpt->scp_service;
rs_batch_fini(&batch);
EXIT;
}
-EXPORT_SYMBOL(ptlrpc_commit_replies);
static int
ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
/*
- * User wants to increase number of threads with for
- * each CPU core/HT, most likely the factor is larger then
+ * User wants to increase the number of threads for
+ * each CPU core/HT, most likely the factor is larger than
* one thread/core because service threads are supposed to
* be blocked by lock or wait for IO.
*/
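+ /*
+ * Illustrative arithmetic (hypothetical numbers, not from this
+ * change): with a factor of 4 on a 16-core node the service would
+ * aim for roughly 4 * 16 = 64 threads, on the assumption that most
+ * of them are blocked on locks or waiting for IO at any moment.
+ */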
}
/**
- * to finish a active request: stop sending more early replies, and release
- * the request. should be called after we finished handling the request.
+ * to finish an active request: stop sending more early replies, and release
+ * the request. Should be called after we have finished handling the request.
*/
static void ptlrpc_server_finish_active_request(
EXIT;
}
-EXPORT_SYMBOL(ptlrpc_update_export_timer);
/**
* Sanity check request \a req.
static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
{
struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
- struct ptlrpc_request *reqcopy;
- struct lustre_msg *reqmsg;
- cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
- int rc;
- ENTRY;
+ struct ptlrpc_request *reqcopy;
+ struct lustre_msg *reqmsg;
+ cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
+ time_t newdl;
+ int rc;
+
+ ENTRY;
if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT)) {
/* don't send early reply */
* during the recovery period send at least 4 early replies,
* spacing them every at_extra if we can. at_estimate should
* always equal this fixed value during recovery. */
- at_measured(&svcpt->scp_at_estimate,
- cfs_time_current_sec() -
- req->rq_arrival_time.tv_sec + min(at_extra,
- req->rq_export->exp_obd->obd_recovery_timeout / 4));
+ /* Don't account request processing time into AT history
+ * during recovery: it is not the service time we need, since
+ * it also includes the waiting time of recovering clients */
+ newdl = cfs_time_current_sec() + min(at_extra,
+ req->rq_export->exp_obd->obd_recovery_timeout / 4);
} else {
/* We want to extend the request deadline by at_extra seconds,
* so we set our service estimate to reflect how much time has
at_measured(&svcpt->scp_at_estimate, at_extra +
cfs_time_current_sec() -
req->rq_arrival_time.tv_sec);
-
+ newdl = req->rq_arrival_time.tv_sec +
+ at_get(&svcpt->scp_at_estimate);
}
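+ /*
+ * Worked example with illustrative values: during recovery with
+ * at_extra = 30 and obd_recovery_timeout = 60, newdl becomes
+ * now + min(30, 60 / 4) = now + 15; outside recovery it is
+ * rq_arrival_time + at_get(&svcpt->scp_at_estimate), taken after
+ * the estimate was bumped by the elapsed service time + at_extra.
+ */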
+
/* Check to see if we've actually increased the deadline -
* we may be past adaptive_max */
- if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
- at_get(&svcpt->scp_at_estimate)) {
+ if (req->rq_deadline >= newdl) {
DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
"(%ld/%ld), not sending early reply\n",
- olddl, req->rq_arrival_time.tv_sec +
- at_get(&svcpt->scp_at_estimate) -
- cfs_time_current_sec());
+ olddl, newdl - cfs_time_current_sec());
RETURN(-ETIMEDOUT);
}
reqcopy->rq_reqmsg = reqmsg;
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
+ /*
+ * tgt_brw_read() and tgt_brw_write() may have decided not to reply.
+ * Without this check, we would fail the rq_no_reply assertion in
+ * ptlrpc_send_reply().
+ */
+ if (reqcopy->rq_no_reply)
+ GOTO(out, rc = -ETIMEDOUT);
+
LASSERT(atomic_read(&req->rq_refcount));
- /** if it is last refcount then early reply isn't needed */
+ /** if this is the last refcount then an early reply isn't needed */
if (atomic_read(&req->rq_refcount) == 1) {
if (!rc) {
/* Adjust our own deadline to what we told the client */
- req->rq_deadline = req->rq_arrival_time.tv_sec +
- at_get(&svcpt->scp_at_estimate);
+ req->rq_deadline = newdl;
req->rq_early_count++; /* number sent, server side */
} else {
DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
}
/**
- * Put the request to the export list if the request may become
- * a high priority one.
+ * Check if a request should be assigned with a high priority.
+ *
+ * \retval < 0: error occurred
+ * 0: normal RPC request
+ * +1: high priority request
*/
static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
- struct list_head *list;
- int rc, hp = 0;
-
+ int rc = 0;
ENTRY;
- if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
+ if (svcpt->scp_service->srv_ops.so_hpreq_handler != NULL) {
rc = svcpt->scp_service->srv_ops.so_hpreq_handler(req);
if (rc < 0)
RETURN(rc);
+
LASSERT(rc == 0);
}
- if (req->rq_export) {
- if (req->rq_ops) {
- /* Perform request specific check. We should do this
- * check before the request is added into exp_hp_rpcs
- * list otherwise it may hit swab race at LU-1044. */
- if (req->rq_ops->hpreq_check) {
- rc = req->rq_ops->hpreq_check(req);
- /**
- * XXX: Out of all current
- * ptlrpc_hpreq_ops::hpreq_check(), only
- * ldlm_cancel_hpreq_check() can return an
- * error code; other functions assert in
- * similar places, which seems odd.
- * What also does not seem right is that
- * handlers for those RPCs do not assert
- * on the same checks, but rather handle the
- * error cases. e.g. see ost_rw_hpreq_check(),
- * and ost_brw_read(), ost_brw_write().
- */
- if (rc < 0)
- RETURN(rc);
- LASSERT(rc == 0 || rc == 1);
- hp = rc;
- }
- list = &req->rq_export->exp_hp_rpcs;
- } else {
- list = &req->rq_export->exp_reg_rpcs;
- }
- /* do search for duplicated xid and the adding to the list
- * atomically */
- spin_lock_bh(&req->rq_export->exp_rpc_lock);
- rc = ptlrpc_server_check_resend_in_progress(req);
- if (rc < 0) {
- spin_unlock_bh(&req->rq_export->exp_rpc_lock);
- RETURN(rc);
+ if (req->rq_export != NULL && req->rq_ops != NULL) {
+ /* Perform request specific check. We should do this
+ * check before the request is added into exp_hp_rpcs
+ * list otherwise it may hit swab race at LU-1044. */
+ if (req->rq_ops->hpreq_check != NULL) {
+ rc = req->rq_ops->hpreq_check(req);
+ LASSERT(rc <= 1); /* can only return error, 0, or 1 */
}
- list_add(&req->rq_exp_list, list);
- spin_unlock_bh(&req->rq_export->exp_rpc_lock);
}
- ptlrpc_nrs_req_initialize(svcpt, req, !!hp);
-
- RETURN(hp);
+ RETURN(rc);
}
/** Remove the request from the export list. */
static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
- int rc;
+ int rc;
+ bool hp;
ENTRY;
rc = ptlrpc_server_hpreq_init(svcpt, req);
if (rc < 0)
RETURN(rc);
+ hp = rc > 0;
+ ptlrpc_nrs_req_initialize(svcpt, req, hp);
+
+ if (req->rq_export != NULL) {
+ struct obd_export *exp = req->rq_export;
+
+ /* search for a duplicate xid and add the request to the
+ * list atomically */
+ spin_lock_bh(&exp->exp_rpc_lock);
+ rc = ptlrpc_server_check_resend_in_progress(req);
+ if (rc < 0) {
+ spin_unlock_bh(&exp->exp_rpc_lock);
+
+ ptlrpc_nrs_req_finalize(req);
+ RETURN(rc);
+ }
+
+ if (hp || req->rq_ops != NULL)
+ list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
+ else
+ list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+ spin_unlock_bh(&exp->exp_rpc_lock);
+ }
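+ /*
+ * Illustration of the race the lock prevents: if the duplicate-xid
+ * search and the list_add() were not atomic, two copies of a resent
+ * request could both pass ptlrpc_server_check_resend_in_progress()
+ * before either is linked, and both would then be processed.
+ */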
+
- /* the current thread is not the processing thread for this request
- * since that, but request is in exp_hp_list and can be find there.
- * Remove all relations between request and old thread. */
+ /* The current thread is no longer the processing thread for this
+ * request, but the request is on the export's RPC list and can be
+ * found there. Remove all relations between the request and the
+ * old thread. */
req->rq_svc_thread = NULL;
req->rq_session.lc_thread = NULL;
- ptlrpc_nrs_req_add(svcpt, req, !!rc);
+ ptlrpc_nrs_req_add(svcpt, req, hp);
RETURN(0);
}
goto err_req;
}
- switch(lustre_msg_get_opc(req->rq_reqmsg)) {
- case MDS_WRITEPAGE:
- case OST_WRITE:
- req->rq_bulk_write = 1;
- break;
- case MDS_READPAGE:
- case OST_READ:
- case MGS_CONFIG_READ:
- req->rq_bulk_read = 1;
- break;
- }
+ switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+ case MDS_WRITEPAGE:
+ case OST_WRITE:
+ case OUT_UPDATE:
+ req->rq_bulk_write = 1;
+ break;
+ case MDS_READPAGE:
+ case OST_READ:
+ case MGS_CONFIG_READ:
+ req->rq_bulk_read = 1;
+ break;
+ }
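+ /*
+ * Roughly speaking, flagging the request here tells the bulk code
+ * which way data will flow later: rq_bulk_write (e.g. OST_WRITE,
+ * OUT_UPDATE) means the server will sink data from the client,
+ * while rq_bulk_read (e.g. OST_READ) means it will be the source.
+ */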
CDEBUG(D_RPCTRACE, "got req x"LPU64"\n", req->rq_xid);
MSGHDR_AT_SUPPORT) ?
/* The max time the client expects us to take */
lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
+
req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
if (unlikely(deadline == 0)) {
DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
EXIT;
}
-EXPORT_SYMBOL(ptlrpc_stop_all_threads);
int ptlrpc_start_threads(struct ptlrpc_service *svc)
{
ptlrpc_stop_all_threads(svc);
RETURN(rc);
}
-EXPORT_SYMBOL(ptlrpc_start_threads);
int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
{
int rc;
int i;
int j;
+ int weight;
ENTRY;
memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
init_waitqueue_head(&ptlrpc_hr.hr_waitq);
+ weight = cfs_cpu_ht_nsiblings(0);
+
cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
hrp->hrp_cpt = i;
atomic_set(&hrp->hrp_nstopped, 0);
hrp->hrp_nthrs = cfs_cpt_weight(ptlrpc_hr.hr_cpt_table, i);
- hrp->hrp_nthrs /= cfs_cpu_ht_nsiblings(0);
+ hrp->hrp_nthrs /= weight;
LASSERT(hrp->hrp_nthrs > 0);
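+ /*
+ * Sizing example for a hypothetical topology: a CPT covering 16
+ * logical CPUs with 2 hyperthread siblings per core gets
+ * 16 / 2 = 8 reply handler threads, i.e. one per physical core.
+ */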
OBD_CPT_ALLOC(hrp->hrp_thrs, ptlrpc_hr.hr_cpt_table, i,
* Right now, it just checks to make sure that requests aren't languishing
* in the queue. We'll use this health check to govern whether a node needs
* to be shot, so it's intentionally non-aggressive. */
-int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
+static int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
{
struct ptlrpc_request *request = NULL;
struct timeval right_now;