*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
#define DEBUG_SUBSYSTEM S_RPC
-#ifndef __KERNEL__
-#include <liblustre.h>
-#endif
+#include <linux/kthread.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_net.h>
/* The following are visible and mutable through /sys/module/ptlrpc */
int test_req_buffer_pressure = 0;
-CFS_MODULE_PARM(test_req_buffer_pressure, "i", int, 0444,
- "set non-zero to put pressure on request buffer pools");
-CFS_MODULE_PARM(at_min, "i", int, 0644,
- "Adaptive timeout minimum (sec)");
-CFS_MODULE_PARM(at_max, "i", int, 0644,
- "Adaptive timeout maximum (sec)");
-CFS_MODULE_PARM(at_history, "i", int, 0644,
- "Adaptive timeouts remember the slowest event that took place "
- "within this period (sec)");
-CFS_MODULE_PARM(at_early_margin, "i", int, 0644,
- "How soon before an RPC deadline to send an early reply");
-CFS_MODULE_PARM(at_extra, "i", int, 0644,
- "How much extra time to give with each early reply");
-
+module_param(test_req_buffer_pressure, int, 0444);
+MODULE_PARM_DESC(test_req_buffer_pressure, "set non-zero to put pressure on request buffer pools");
+module_param(at_min, int, 0644);
+MODULE_PARM_DESC(at_min, "Adaptive timeout minimum (sec)");
+module_param(at_max, int, 0644);
+MODULE_PARM_DESC(at_max, "Adaptive timeout maximum (sec)");
+module_param(at_history, int, 0644);
+MODULE_PARM_DESC(at_history,
+ "Adaptive timeouts remember the slowest event that took place within this period (sec)");
+module_param(at_early_margin, int, 0644);
+MODULE_PARM_DESC(at_early_margin, "How soon before an RPC deadline to send an early reply");
+module_param(at_extra, int, 0644);
+MODULE_PARM_DESC(at_extra, "How much extra time to give with each early reply");
/* forward ref */
static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
/** Used to protect the \e ptlrpc_all_services list */
struct mutex ptlrpc_all_services_mutex;
-struct ptlrpc_request_buffer_desc *
+static struct ptlrpc_request_buffer_desc *
ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
{
struct ptlrpc_service *svc = svcpt->scp_service;
return rqbd;
}
-void
+static void
ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
{
struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
OBD_FREE_PTR(rqbd);
}
-int
+static int
ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
{
struct ptlrpc_service *svc = svcpt->scp_service;
}
EXPORT_SYMBOL(ptlrpc_save_lock);
-#ifdef __KERNEL__
struct ptlrpc_hr_partition;
#define DECLARE_RS_BATCH(b) struct rs_batch b
-#else /* __KERNEL__ */
-
-#define rs_batch_init(b) do{}while(0)
-#define rs_batch_fini(b) do{}while(0)
-#define rs_batch_add(b, r) ptlrpc_schedule_difficult_reply(r)
-#define DECLARE_RS_BATCH(b)
-
-#endif /* __KERNEL__ */
/**
* Put reply state into a queue for processing because we received
*/
void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
{
-#ifdef __KERNEL__
struct ptlrpc_hr_thread *hrt;
ENTRY;
wake_up(&hrt->hrt_waitq);
EXIT;
-#else
- list_add_tail(&rs->rs_list, &rs->rs_svcpt->scp_rep_queue);
-#endif
}
void
rs_batch_fini(&batch);
EXIT;
}
-EXPORT_SYMBOL(ptlrpc_commit_replies);
static int
ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
struct ptlrpc_service_conf *conf)
{
-#ifdef __KERNEL__
struct ptlrpc_service_thr_conf *tc = &conf->psc_thr;
unsigned init;
unsigned total;
/*
* User wants to increase number of threads with for
- * each CPU core/HT, most likely the factor is larger then
+ * each CPU core/HT, most likely the factor is larger than
* one thread/core because service threads are supposed to
* be blocked by lock or wait for IO.
*/
* have too many threads no matter how many cores/HTs
* there are.
*/
- if (cfs_cpu_ht_nsiblings(0) > 1) { /* weight is # of HTs */
+ if (cfs_cpu_ht_nsiblings(smp_processor_id()) > 1) {
+ /* weight is # of HTs */
/* depress thread factor for hyper-thread */
factor = factor - (factor >> 1) + (factor >> 3);
}
svc->srv_name, nthrs * svc->srv_ncpts,
tc->tc_nthrs_max);
}
-#endif
}
/**
/* reply states */
spin_lock_init(&svcpt->scp_rep_lock);
INIT_LIST_HEAD(&svcpt->scp_rep_active);
-#ifndef __KERNEL__
- INIT_LIST_HEAD(&svcpt->scp_rep_queue);
-#endif
INIT_LIST_HEAD(&svcpt->scp_rep_idle);
init_waitqueue_head(&svcpt->scp_rep_waitq);
atomic_set(&svcpt->scp_nreps_difficult, 0);
if (array->paa_reqs_count == NULL)
goto failed;
- cfs_timer_init(&svcpt->scp_at_timer, ptlrpc_at_timer, svcpt);
+ setup_timer(&svcpt->scp_at_timer, ptlrpc_at_timer,
+ (unsigned long)svcpt);
+
/* At SOW, service time should be quick; 10s seems generous. If client
* timeout is less than this, we'll be sending an early reply. */
at_init(&svcpt->scp_at_estimate, 10, 0);
CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
service->srv_name, service->srv_req_portal);
-#ifdef __KERNEL__
rc = ptlrpc_start_threads(service);
if (rc != 0) {
CERROR("Failed to start threads for service %s: %d\n",
service->srv_name, rc);
GOTO(failed, rc);
}
-#endif
RETURN(service);
failed:
}
/**
- * to finish a active request: stop sending more early replies, and release
+ * to finish an active request: stop sending more early replies, and release
* the request. should be called after we finished handling the request.
*/
static void ptlrpc_server_finish_active_request(
* This function is only called when some export receives a message (i.e.,
* the network is up.)
*/
-static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
+void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
{
struct obd_export *oldest_exp;
time_t oldest_time, new_time;
} else if (lustre_msg_get_transno(req->rq_reqmsg) != 0 &&
!obd->obd_recovering) {
DEBUG_REQ(D_ERROR, req, "Invalid req with transno "
- LPU64" without recovery",
+ "%llu without recovery",
lustre_msg_get_transno(req->rq_reqmsg));
class_fail_export(req->rq_export);
rc = -ENODEV;
__s32 next;
if (array->paa_count == 0) {
- cfs_timer_disarm(&svcpt->scp_at_timer);
+ del_timer(&svcpt->scp_at_timer);
return;
}
if (next <= 0) {
ptlrpc_at_timer((unsigned long)svcpt);
} else {
- cfs_timer_arm(&svcpt->scp_at_timer, cfs_time_shift(next));
+ mod_timer(&svcpt->scp_at_timer, cfs_time_shift(next));
CDEBUG(D_INFO, "armed %s at %+ds\n",
svcpt->scp_service->srv_name, next);
}
static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
{
struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
- struct ptlrpc_request *reqcopy;
- struct lustre_msg *reqmsg;
- cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
- int rc;
- ENTRY;
+ struct ptlrpc_request *reqcopy;
+ struct lustre_msg *reqmsg;
+ cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
+ time_t newdl;
+ int rc;
+
+ ENTRY;
+
+ if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT)) {
+ /* don't send early reply */
+ RETURN(1);
+ }
/* deadline is when the client expects us to reply, margin is the
difference between clients' and servers' expectations */
RETURN(-ENOSYS);
}
- if (req->rq_export &&
- lustre_msg_get_flags(req->rq_reqmsg) &
- (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
- /* During recovery, we don't want to send too many early
- * replies, but on the other hand we want to make sure the
- * client has enough time to resend if the rpc is lost. So
- * during the recovery period send at least 4 early replies,
- * spacing them every at_extra if we can. at_estimate should
- * always equal this fixed value during recovery. */
- at_measured(&svcpt->scp_at_estimate, min(at_extra,
- req->rq_export->exp_obd->obd_recovery_timeout / 4));
+ if (req->rq_export &&
+ lustre_msg_get_flags(req->rq_reqmsg) &
+ (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
+ /* During recovery, we don't want to send too many early
+ * replies, but on the other hand we want to make sure the
+ * client has enough time to resend if the rpc is lost. So
+ * during the recovery period send at least 4 early replies,
+ * spacing them every at_extra if we can. at_estimate should
+ * always equal this fixed value during recovery. */
+ /* Don't account request processing time into AT history
+ * during recovery; it is not the service time we need, but
+ * also includes waiting time for recovering clients */
+ newdl = cfs_time_current_sec() + min(at_extra,
+ req->rq_export->exp_obd->obd_recovery_timeout / 4);
} else {
/* We want to extend the request deadline by at_extra seconds,
* so we set our service estimate to reflect how much time has
at_measured(&svcpt->scp_at_estimate, at_extra +
cfs_time_current_sec() -
req->rq_arrival_time.tv_sec);
+ newdl = req->rq_arrival_time.tv_sec +
+ at_get(&svcpt->scp_at_estimate);
+ }
- /* Check to see if we've actually increased the deadline -
- * we may be past adaptive_max */
- if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
- at_get(&svcpt->scp_at_estimate)) {
- DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
- "(%ld/%ld), not sending early reply\n",
- olddl, req->rq_arrival_time.tv_sec +
- at_get(&svcpt->scp_at_estimate) -
- cfs_time_current_sec());
- RETURN(-ETIMEDOUT);
- }
+ /* Check to see if we've actually increased the deadline -
+ * we may be past adaptive_max */
+ if (req->rq_deadline >= newdl) {
+ DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
+ "(%ld/%ld), not sending early reply\n",
+ olddl, newdl - cfs_time_current_sec());
+ RETURN(-ETIMEDOUT);
}
reqcopy = ptlrpc_request_cache_alloc(GFP_NOFS);
reqcopy->rq_reqmsg = reqmsg;
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
+ /*
+ * tgt_brw_read() and tgt_brw_write() may have decided not to reply.
+ * Without this check, we would fail the rq_no_reply assertion in
+ * ptlrpc_send_reply().
+ */
+ if (reqcopy->rq_no_reply)
+ GOTO(out, rc = -ETIMEDOUT);
+
LASSERT(atomic_read(&req->rq_refcount));
/** if it is last refcount then early reply isn't needed */
if (atomic_read(&req->rq_refcount) == 1) {
if (!rc) {
/* Adjust our own deadline to what we told the client */
- req->rq_deadline = req->rq_arrival_time.tv_sec +
- at_get(&svcpt->scp_at_estimate);
+ req->rq_deadline = newdl;
req->rq_early_count++; /* number sent, server side */
} else {
DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
}
/**
- * Put the request to the export list if the request may become
- * a high priority one.
+ * Check if a request should be assigned with a high priority.
+ *
+ * \retval < 0: error occurred
+ * 0: normal RPC request
+ * +1: high priority request
*/
static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
- struct list_head *list;
- int rc, hp = 0;
-
+ int rc = 0;
ENTRY;
- if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
+ if (svcpt->scp_service->srv_ops.so_hpreq_handler != NULL) {
rc = svcpt->scp_service->srv_ops.so_hpreq_handler(req);
if (rc < 0)
RETURN(rc);
+
LASSERT(rc == 0);
}
- if (req->rq_export) {
- if (req->rq_ops) {
- /* Perform request specific check. We should do this
- * check before the request is added into exp_hp_rpcs
- * list otherwise it may hit swab race at LU-1044. */
- if (req->rq_ops->hpreq_check) {
- rc = req->rq_ops->hpreq_check(req);
- /**
- * XXX: Out of all current
- * ptlrpc_hpreq_ops::hpreq_check(), only
- * ldlm_cancel_hpreq_check() can return an
- * error code; other functions assert in
- * similar places, which seems odd.
- * What also does not seem right is that
- * handlers for those RPCs do not assert
- * on the same checks, but rather handle the
- * error cases. e.g. see ost_rw_hpreq_check(),
- * and ost_brw_read(), ost_brw_write().
- */
- if (rc < 0)
- RETURN(rc);
- LASSERT(rc == 0 || rc == 1);
- hp = rc;
- }
- list = &req->rq_export->exp_hp_rpcs;
- } else {
- list = &req->rq_export->exp_reg_rpcs;
- }
- /* do search for duplicated xid and the adding to the list
- * atomically */
- spin_lock_bh(&req->rq_export->exp_rpc_lock);
- rc = ptlrpc_server_check_resend_in_progress(req);
- if (rc < 0) {
- spin_unlock_bh(&req->rq_export->exp_rpc_lock);
- RETURN(rc);
+ if (req->rq_export != NULL && req->rq_ops != NULL) {
+ /* Perform request specific check. We should do this
+ * check before the request is added into exp_hp_rpcs
+ * list otherwise it may hit swab race at LU-1044. */
+ if (req->rq_ops->hpreq_check != NULL) {
+ rc = req->rq_ops->hpreq_check(req);
+ if (rc == -ESTALE) {
+ req->rq_status = rc;
+ ptlrpc_error(req);
+ }
+ /** hpreq_check can only return an error,
+ * 0 for a normal request,
+ * or 1 for a high priority request */
+ LASSERT(rc <= 1);
}
- list_add(&req->rq_exp_list, list);
- spin_unlock_bh(&req->rq_export->exp_rpc_lock);
}
- ptlrpc_nrs_req_initialize(svcpt, req, !!hp);
-
- RETURN(hp);
+ RETURN(rc);
}
/** Remove the request from the export list. */
static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
- int rc;
+ int rc;
+ bool hp;
ENTRY;
rc = ptlrpc_server_hpreq_init(svcpt, req);
if (rc < 0)
RETURN(rc);
+ hp = rc > 0;
+ ptlrpc_nrs_req_initialize(svcpt, req, hp);
+
+ if (req->rq_export != NULL) {
+ struct obd_export *exp = req->rq_export;
+
+ /* search for a duplicate xid and add to the list
+ * atomically */
+ spin_lock_bh(&exp->exp_rpc_lock);
+ rc = ptlrpc_server_check_resend_in_progress(req);
+ if (rc < 0) {
+ spin_unlock_bh(&exp->exp_rpc_lock);
+
+ ptlrpc_nrs_req_finalize(req);
+ RETURN(rc);
+ }
+
+ if (hp || req->rq_ops != NULL)
+ list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
+ else
+ list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+ spin_unlock_bh(&exp->exp_rpc_lock);
+ }
+
/* the current thread is not the processing thread for this request
* since that, but request is in exp_hp_list and can be find there.
* Remove all relations between request and old thread. */
req->rq_svc_thread = NULL;
req->rq_session.lc_thread = NULL;
- ptlrpc_nrs_req_add(svcpt, req, !!rc);
+ ptlrpc_nrs_req_add(svcpt, req, hp);
RETURN(0);
}
bool force)
{
int running = svcpt->scp_nthrs_running;
-#ifndef __KERNEL__
- if (1) /* always allow to handle normal request for liblustre */
- return true;
-#endif
if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
/* leave just 1 thread for normal RPCs */
ENTRY;
spin_lock(&svcpt->scp_req_lock);
-#ifndef __KERNEL__
- /* !@%$# liblustre only has 1 thread */
- if (atomic_read(&svcpt->scp_nreps_difficult) != 0) {
- spin_unlock(&svcpt->scp_req_lock);
- RETURN(NULL);
- }
-#endif
if (ptlrpc_server_high_pending(svcpt, force)) {
req = ptlrpc_nrs_req_get_nolock(svcpt, true, force);
rc = ptlrpc_unpack_req_msg(req, req->rq_reqlen);
if (rc != 0) {
CERROR("error unpacking request: ptl %d from %s "
- "x"LPU64"\n", svc->srv_req_portal,
+ "x%llu\n", svc->srv_req_portal,
libcfs_id2str(req->rq_peer), req->rq_xid);
goto err_req;
}
rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
if (rc) {
CERROR ("error unpacking ptlrpc body: ptl %d from %s x"
- LPU64"\n", svc->srv_req_portal,
+ "%llu\n", svc->srv_req_portal,
libcfs_id2str(req->rq_peer), req->rq_xid);
goto err_req;
}
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
- CERROR("drop incoming rpc opc %u, x"LPU64"\n",
+ CERROR("drop incoming rpc opc %u, x%llu\n",
cfs_fail_val, req->rq_xid);
goto err_req;
}
goto err_req;
}
- switch(lustre_msg_get_opc(req->rq_reqmsg)) {
- case MDS_WRITEPAGE:
- case OST_WRITE:
- req->rq_bulk_write = 1;
- break;
- case MDS_READPAGE:
- case OST_READ:
- case MGS_CONFIG_READ:
- req->rq_bulk_read = 1;
- break;
- }
+ switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+ case MDS_WRITEPAGE:
+ case OST_WRITE:
+ case OUT_UPDATE:
+ req->rq_bulk_write = 1;
+ break;
+ case MDS_READPAGE:
+ case OST_READ:
+ case MGS_CONFIG_READ:
+ req->rq_bulk_read = 1;
+ break;
+ }
- CDEBUG(D_RPCTRACE, "got req x"LPU64"\n", req->rq_xid);
+ CDEBUG(D_RPCTRACE, "got req x%llu\n", req->rq_xid);
req->rq_export = class_conn2export(
lustre_msg_get_handle(req->rq_reqmsg));
MSGHDR_AT_SUPPORT) ?
/* The max time the client expects us to take */
lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
+
req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
if (unlikely(deadline == 0)) {
DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
}
CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
- "%s:%s+%d:%d:x"LPU64":%s:%d\n", current_comm(),
+ "%s:%s+%d:%d:x%llu:%s:%d\n", current_comm(),
(request->rq_export ?
(char *)request->rq_export->exp_client_uuid.uuid : "0"),
(request->rq_export ?
if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
- CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
+ CDEBUG(D_NET, "got req %llu\n", request->rq_xid);
/* re-assign request and sesson thread to the current one */
request->rq_svc_thread = thread;
do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
- "%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in "
- "%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
+ "%s:%s+%d:%d:x%llu:%s:%d Request procesed in "
+ "%ldus (%ldus total) trans %llu rc %d/%d\n",
current_comm(),
(request->rq_export ?
(char *)request->rq_export->exp_client_uuid.uuid : "0"),
if (nlocks == 0 && !been_handled) {
/* If we see this, we should already have seen the warning
* in mds_steal_ack_locks() */
- CDEBUG(D_HA, "All locks stolen from rs %p x"LPD64".t"LPD64
+ CDEBUG(D_HA, "All locks stolen from rs %p x%lld.t%lld"
" o%d NID %s\n",
rs,
rs->rs_xid, rs->rs_transno, rs->rs_opc,
RETURN(1);
}
-#ifndef __KERNEL__
-
-/**
- * Check whether given service has a reply available for processing
- * and process it.
- *
- * \param svc a ptlrpc service
- * \retval 0 no replies processed
- * \retval 1 one reply processed
- */
-static int
-ptlrpc_server_handle_reply(struct ptlrpc_service_part *svcpt)
-{
- struct ptlrpc_reply_state *rs = NULL;
- ENTRY;
-
- spin_lock(&svcpt->scp_rep_lock);
- if (!list_empty(&svcpt->scp_rep_queue)) {
- rs = list_entry(svcpt->scp_rep_queue.prev,
- struct ptlrpc_reply_state,
- rs_list);
- list_del_init(&rs->rs_list);
- }
- spin_unlock(&svcpt->scp_rep_lock);
- if (rs != NULL)
- ptlrpc_handle_rs(rs);
- RETURN(rs != NULL);
-}
-
-/* FIXME make use of timeout later */
-int
-liblustre_check_services (void *arg)
-{
- int did_something = 0;
- int rc;
- struct list_head *tmp, *nxt;
- ENTRY;
-
- /* I'm relying on being single threaded, not to have to lock
- * ptlrpc_all_services etc */
- list_for_each_safe(tmp, nxt, &ptlrpc_all_services) {
- struct ptlrpc_service *svc =
- list_entry(tmp, struct ptlrpc_service, srv_list);
- struct ptlrpc_service_part *svcpt;
-
- LASSERT(svc->srv_ncpts == 1);
- svcpt = svc->srv_parts[0];
-
- if (svcpt->scp_nthrs_running != 0) /* I've recursed */
- continue;
-
- /* service threads can block for bulk, so this limits us
- * (arbitrarily) to recursing 1 stack frame per service.
- * Note that the problem with recursion is that we have to
- * unwind completely before our caller can resume. */
-
- svcpt->scp_nthrs_running++;
-
- do {
- rc = ptlrpc_server_handle_req_in(svcpt, NULL);
- rc |= ptlrpc_server_handle_reply(svcpt);
- rc |= ptlrpc_at_check_timed(svcpt);
- rc |= ptlrpc_server_handle_request(svcpt, NULL);
- rc |= (ptlrpc_server_post_idle_rqbds(svcpt) > 0);
- did_something |= rc;
- } while (rc);
-
- svcpt->scp_nthrs_running--;
- }
-
- RETURN(did_something);
-}
-#define ptlrpc_stop_all_threads(s) do {} while (0)
-
-#else /* __KERNEL__ */
static void
ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt)
struct ptlrpc_service_part *svcpt = thread->t_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_reply_state *rs;
-#ifdef WITH_GROUP_INFO
struct group_info *ginfo = NULL;
-#endif
struct lu_env *env;
int counter = 0, rc = 0;
ENTRY;
svc->srv_name, thread->t_name, svcpt->scp_cpt);
}
-#ifdef WITH_GROUP_INFO
ginfo = groups_alloc(0);
if (!ginfo) {
rc = -ENOMEM;
set_current_groups(ginfo);
put_group_info(ginfo);
-#endif
if (svc->srv_ops.so_thr_init != NULL) {
rc = svc->srv_ops.so_thr_init(thread);
goto out_srv_fini;
}
- /* Alloc reply state structure for this one */
- OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
- if (!rs) {
- rc = -ENOMEM;
- goto out_srv_fini;
- }
+ /* Alloc reply state structure for this one */
+ OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
+ if (!rs) {
+ rc = -ENOMEM;
+ goto out_srv_fini;
+ }
spin_lock(&svcpt->scp_lock);
struct ptlrpc_hr_thread *hrt = (struct ptlrpc_hr_thread *)arg;
struct ptlrpc_hr_partition *hrp = hrt->hrt_partition;
struct list_head replies;
- char threadname[20];
int rc;
INIT_LIST_HEAD(&replies);
- snprintf(threadname, sizeof(threadname), "ptlrpc_hr%02d_%03d",
- hrp->hrp_cpt, hrt->hrt_id);
unshare_fs_struct();
rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
if (rc != 0) {
+ char threadname[20];
+
+ snprintf(threadname, sizeof(threadname), "ptlrpc_hr%02d_%03d",
+ hrp->hrp_cpt, hrt->hrt_id);
CWARN("Failed to bind %s on CPT %d of CPT table %p: rc = %d\n",
threadname, hrp->hrp_cpt, ptlrpc_hr.hr_cpt_table, rc);
}
EXIT;
}
-EXPORT_SYMBOL(ptlrpc_stop_all_threads);
int ptlrpc_start_threads(struct ptlrpc_service *svc)
{
ptlrpc_stop_all_threads(svc);
RETURN(rc);
}
-EXPORT_SYMBOL(ptlrpc_start_threads);
int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
{
int rc;
int i;
int j;
+ int weight;
ENTRY;
memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
init_waitqueue_head(&ptlrpc_hr.hr_waitq);
+ weight = cfs_cpu_ht_nsiblings(smp_processor_id());
+
cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
hrp->hrp_cpt = i;
atomic_set(&hrp->hrp_nstopped, 0);
hrp->hrp_nthrs = cfs_cpt_weight(ptlrpc_hr.hr_cpt_table, i);
- hrp->hrp_nthrs /= cfs_cpu_ht_nsiblings(0);
- LASSERT(hrp->hrp_nthrs > 0);
+ hrp->hrp_nthrs /= weight;
+ if (hrp->hrp_nthrs == 0)
+ hrp->hrp_nthrs = 1;
+
OBD_CPT_ALLOC(hrp->hrp_thrs, ptlrpc_hr.hr_cpt_table, i,
hrp->hrp_nthrs * sizeof(*hrt));
if (hrp->hrp_thrs == NULL)
ptlrpc_hr.hr_partitions = NULL;
}
-#endif /* __KERNEL__ */
/**
* Wait until all already scheduled replies are processed.
/* early disarm AT timer... */
ptlrpc_service_for_each_part(svcpt, i, svc) {
if (svcpt->scp_service != NULL)
- cfs_timer_disarm(&svcpt->scp_at_timer);
+ del_timer(&svcpt->scp_at_timer);
}
}
break;
/* In case somebody rearmed this in the meantime */
- cfs_timer_disarm(&svcpt->scp_at_timer);
+ del_timer(&svcpt->scp_at_timer);
array = &svcpt->scp_at_array;
if (array->paa_reqs_array != NULL) {
* Right now, it just checks to make sure that requests aren't languishing
* in the queue. We'll use this health check to govern whether a node needs
* to be shot, so it's intentionally non-aggressive. */
-int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
+static int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
{
struct ptlrpc_request *request = NULL;
struct timeval right_now;