struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_request_buffer_desc *rqbd;
- OBD_ALLOC_PTR(rqbd);
+ OBD_CPT_ALLOC_PTR(rqbd, svc->srv_cptable, svcpt->scp_cpt);
if (rqbd == NULL)
return NULL;
rqbd->rqbd_svcpt = svcpt;
- rqbd->rqbd_refcount = 0;
- rqbd->rqbd_cbid.cbid_fn = request_in_callback;
- rqbd->rqbd_cbid.cbid_arg = rqbd;
- CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
- OBD_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
-
- if (rqbd->rqbd_buffer == NULL) {
- OBD_FREE_PTR(rqbd);
- return (NULL);
- }
+ rqbd->rqbd_refcount = 0;
+ rqbd->rqbd_cbid.cbid_fn = request_in_callback;
+ rqbd->rqbd_cbid.cbid_arg = rqbd;
+ CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
+ OBD_CPT_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_cptable,
+ svcpt->scp_cpt, svc->srv_buf_size);
+ if (rqbd->rqbd_buffer == NULL) {
+ OBD_FREE_PTR(rqbd);
+ return NULL;
+ }
cfs_spin_lock(&svcpt->scp_lock);
cfs_list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
}
int
-ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt)
+ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
{
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_request_buffer_desc *rqbd;
rc = -ENOMEM;
break;
}
-
- if (ptlrpc_server_post_idle_rqbds(svcpt) < 0) {
- rc = -EAGAIN;
- break;
- }
}
CDEBUG(D_RPCTRACE,
"%s: allocate %d new %d-byte reqbufs (%d/%d left), rc = %d\n",
- svc->srv_name, i, svc->srv_buf_size,
- svcpt->scp_nrqbds_posted, svcpt->scp_nrqbds_total, rc);
+ svc->srv_name, i, svc->srv_buf_size, svcpt->scp_nrqbds_posted,
+ svcpt->scp_nrqbds_total, rc);
+
+ if (post && rc == 0)
+ rc = ptlrpc_server_post_idle_rqbds(svcpt);
return rc;
}
}
static void
-ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
- int *min_p, int *max_p)
+ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
+ struct ptlrpc_service_conf *conf)
{
#ifdef __KERNEL__
struct ptlrpc_service_thr_conf *tc = &conf->psc_thr;
- int nthrs_min;
- int nthrs;
+ unsigned init;
+ unsigned total;
+ unsigned nthrs;
+ int weight;
- nthrs_min = PTLRPC_NTHRS_MIN + (conf->psc_ops.so_hpreq_handler != NULL);
- nthrs_min = max_t(int, nthrs_min, tc->tc_nthrs_min);
+ /*
+ * Common code for estimating & validating the number of threads.
+ * A CPT-affinity service could have a per-CPT thread pool instead
+ * of a global thread pool, which means users might not always
+ * get the number of threads they asked for in conf::tc_nthrs_user
+ * even if they did set it. This is because we need to validate the
+ * thread count for each CPT to guarantee each pool will have enough
+ * threads to keep the service healthy.
+ */
+ init = PTLRPC_NTHRS_INIT + (svc->srv_ops.so_hpreq_handler != NULL);
+ init = max_t(int, init, tc->tc_nthrs_init);
+
+ /* NB: please see comments in lustre_lnet.h for definition
+ * details of these members */
+ LASSERT(tc->tc_nthrs_max != 0);
+
+ if (tc->tc_nthrs_user != 0) {
+ /* In case there is a reason to test a service with many
+ * threads, we apply a less strict check here; the total can
+ * be up to 8 * nthrs_max */
+ total = min(tc->tc_nthrs_max * 8, tc->tc_nthrs_user);
+ nthrs = total / svc->srv_ncpts;
+ init = max(init, nthrs);
+ goto out;
+ }
- nthrs = tc->tc_nthrs_user;
- if (nthrs != 0) { /* validate it */
- nthrs = min_t(int, nthrs, tc->tc_nthrs_max);
- nthrs = max_t(int, nthrs, nthrs_min);
- *min_p = *max_p = nthrs;
- return;
+ total = tc->tc_nthrs_max;
+ if (tc->tc_nthrs_base == 0) {
+ /* don't care about the base number of threads per partition,
+ * this is mostly for non-affinity services */
+ nthrs = total / svc->srv_ncpts;
+ goto out;
}
- /*
- * NB: we will add some common code here for estimating, for example:
- * add a new member ptlrpc_service_thr_conf::tc_factor, and estimate
- * threads number based on:
- * (online_cpus * conf::tc_factor) + conf::tc_nthrs_base.
- *
- * So we can remove code block like estimation in ost_setup, also,
- * we might estimate MDS threads number as well instead of using
- * absolute number, and have more threads on fat servers to improve
- * availability of service.
- *
- * Also, we will need to validate threads number at here for
- * CPT affinity service (CPU ParTion) in the future.
- * A service can have percpt thread-pool instead of a global thread
- * pool for each service, which means user might not always get the
- * threads number they want even they set it in conf::tc_nthrs_user,
- * because we need to adjust threads number for each CPT, instead of
- * just use (conf::tc_nthrs_user / NCPTS), to make sure each pool
- * will be healthy.
- */
- *max_p = tc->tc_nthrs_max;
- *min_p = nthrs_min;
-#else /* __KERNEL__ */
- *max_p = *min_p = 1; /* whatever */
+ nthrs = tc->tc_nthrs_base;
+ if (svc->srv_ncpts == 1) {
+ int i;
+
+ /* NB: Increase the base number if it's a single partition
+ * and the total number of cores/HTs is larger than or equal to 4.
+ * The result will always be < 2 * nthrs_base */
+ weight = cfs_cpt_weight(svc->srv_cptable, CFS_CPT_ANY);
+ for (i = 1; (weight >> (i + 1)) != 0 && /* >= 4 cores/HTs */
+ (tc->tc_nthrs_base >> i) != 0; i++)
+ nthrs += tc->tc_nthrs_base >> i;
+ }
+
+ if (tc->tc_thr_factor != 0) {
+ int factor = tc->tc_thr_factor;
+ const int fade = 4;
+
+ /*
+ * The user wants to increase the number of threads for each
+ * CPU core/HT; most likely the factor is larger than one
+ * thread per core because service threads are supposed to be
+ * blocked by locks or waiting for IO.
+ */
+ /*
+ * Amdahl's law says that adding processors wouldn't give
+ * a linear increasing of parallelism, so it's nonsense to
+ * have too many threads no matter how many cores/HTs
+ * there are.
+ */
+ if (cfs_cpu_ht_nsiblings(0) > 1) { /* weight is # of HTs */
+ /* depress thread factor for hyper-thread */
+ factor = factor - (factor >> 1) + (factor >> 3);
+ }
+
+ weight = cfs_cpt_weight(svc->srv_cptable, 0);
+ LASSERT(weight > 0);
+
+ for (; factor > 0 && weight > 0; factor--, weight -= fade)
+ nthrs += min(weight, fade) * factor;
+ }
+
+ if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
+ nthrs = max(tc->tc_nthrs_base,
+ tc->tc_nthrs_max / svc->srv_ncpts);
+ }
+ out:
+ nthrs = max(nthrs, tc->tc_nthrs_init);
+ svc->srv_nthrs_cpt_limit = nthrs;
+ svc->srv_nthrs_cpt_init = init;
+
+ if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
+ LCONSOLE_WARN("%s: This service may have more threads (%d) "
+ "than the given soft limit (%d)\n",
+ svc->srv_name, nthrs * svc->srv_ncpts,
+ tc->tc_nthrs_max);
+ }
#endif
}
*/
static int
ptlrpc_service_part_init(struct ptlrpc_service *svc,
- struct ptlrpc_service_part *svcpt)
+ struct ptlrpc_service_part *svcpt, int cpt)
{
struct ptlrpc_at_array *array;
int size;
int index;
int rc;
+ svcpt->scp_cpt = cpt;
CFS_INIT_LIST_HEAD(&svcpt->scp_threads);
/* rqbd and incoming request queue */
array->paa_deadline = -1;
/* allocate memory for scp_at_array (ptlrpc_at_array) */
- OBD_ALLOC(array->paa_reqs_array, sizeof(cfs_list_t) * size);
+ OBD_CPT_ALLOC(array->paa_reqs_array,
+ svc->srv_cptable, cpt, sizeof(cfs_list_t) * size);
if (array->paa_reqs_array == NULL)
return -ENOMEM;
for (index = 0; index < size; index++)
CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
- OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
+ OBD_CPT_ALLOC(array->paa_reqs_count,
+ svc->srv_cptable, cpt, sizeof(__u32) * size);
if (array->paa_reqs_count == NULL)
goto failed;
/* assign this before call ptlrpc_grow_req_bufs */
svcpt->scp_service = svc;
/* Now allocate the request buffers, but don't post them now */
- rc = ptlrpc_grow_req_bufs(svcpt);
+ rc = ptlrpc_grow_req_bufs(svcpt, 0);
/* We shouldn't be under memory pressure at startup, so
* fail if we can't allocate all our buffers at this time. */
if (rc != 0)
ptlrpc_register_service(struct ptlrpc_service_conf *conf,
cfs_proc_dir_entry_t *proc_entry)
{
+ struct ptlrpc_service_cpt_conf *cconf = &conf->psc_cpt;
struct ptlrpc_service *service;
+ struct ptlrpc_service_part *svcpt;
+ struct cfs_cpt_table *cptable;
+ __u32 *cpts = NULL;
+ int ncpts;
+ int cpt;
int rc;
+ int i;
ENTRY;
LASSERT(conf->psc_buf.bc_nbufs > 0);
conf->psc_buf.bc_req_max_size + SPTLRPC_MAX_PAYLOAD);
LASSERT(conf->psc_thr.tc_ctx_tags != 0);
- OBD_ALLOC_PTR(service);
- if (service == NULL)
+ cptable = cconf->cc_cptable;
+ if (cptable == NULL)
+ cptable = cfs_cpt_table;
+
+ if (!conf->psc_thr.tc_cpu_affinity) {
+ ncpts = 1;
+ } else {
+ ncpts = cfs_cpt_number(cptable);
+ if (cconf->cc_pattern != NULL) {
+ struct cfs_expr_list *el;
+
+ rc = cfs_expr_list_parse(cconf->cc_pattern,
+ strlen(cconf->cc_pattern),
+ 0, ncpts - 1, &el);
+ if (rc != 0) {
+ CERROR("%s: invalid CPT pattern string: %s\n",
+ conf->psc_name, cconf->cc_pattern);
+ RETURN(ERR_PTR(-EINVAL));
+ }
+
+ rc = cfs_expr_list_values(el, ncpts, &cpts);
+ cfs_expr_list_free(el);
+ if (rc <= 0) {
+ CERROR("%s: failed to parse CPT array %s: %d\n",
+ conf->psc_name, cconf->cc_pattern, rc);
+ RETURN(ERR_PTR(rc < 0 ? rc : -EINVAL));
+ }
+ ncpts = rc;
+ }
+ }
+
+ OBD_ALLOC(service, offsetof(struct ptlrpc_service, srv_parts[ncpts]));
+ if (service == NULL) {
+ if (cpts != NULL)
+ OBD_FREE(cpts, sizeof(*cpts) * ncpts);
RETURN(ERR_PTR(-ENOMEM));
+ }
+
+ service->srv_cptable = cptable;
+ service->srv_cpts = cpts;
+ service->srv_ncpts = ncpts;
+
+ service->srv_cpt_bits = 0; /* it's zero already, easy to read... */
+ while ((1 << service->srv_cpt_bits) < cfs_cpt_number(cptable))
+ service->srv_cpt_bits++;
/* public members */
cfs_spin_lock_init(&service->srv_lock);
CFS_INIT_LIST_HEAD(&service->srv_list); /* for safty of cleanup */
/* buffer configuration */
- service->srv_nbuf_per_group = test_req_buffer_pressure ?
- 1 : conf->psc_buf.bc_nbufs;
+ service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 :
+ max(conf->psc_buf.bc_nbufs /
+ service->srv_ncpts, 1U);
service->srv_max_req_size = conf->psc_buf.bc_req_max_size +
SPTLRPC_MAX_PAYLOAD;
service->srv_buf_size = conf->psc_buf.bc_buf_size;
conf->psc_buf.bc_rep_max_size + SPTLRPC_MAX_PAYLOAD)
service->srv_max_reply_size <<= 1;
- ptlrpc_server_nthreads_check(conf, &service->srv_threads_min,
- &service->srv_threads_max);
-
service->srv_thread_name = conf->psc_thr.tc_thr_name;
service->srv_ctx_tags = conf->psc_thr.tc_ctx_tags;
- service->srv_cpu_affinity = !!conf->psc_thr.tc_cpu_affinity;
service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
service->srv_ops = conf->psc_ops;
- OBD_ALLOC_PTR(service->srv_part);
- if (service->srv_part == NULL)
- GOTO(failed, rc = -ENOMEM);
+ for (i = 0; i < ncpts; i++) {
+ if (!conf->psc_thr.tc_cpu_affinity)
+ cpt = CFS_CPT_ANY;
+ else
+ cpt = cpts != NULL ? cpts[i] : i;
- rc = ptlrpc_service_part_init(service, service->srv_part);
- if (rc != 0)
- GOTO(failed, rc);
+ OBD_CPT_ALLOC(svcpt, cptable, cpt, sizeof(*svcpt));
+ if (svcpt == NULL)
+ GOTO(failed, rc = -ENOMEM);
+
+ service->srv_parts[i] = svcpt;
+ rc = ptlrpc_service_part_init(service, svcpt, cpt);
+ if (rc != 0)
+ GOTO(failed, rc);
+ }
+
+ ptlrpc_server_nthreads_check(service, conf);
rc = LNetSetLazyPortal(service->srv_req_portal);
LASSERT(rc == 0);
/* cull some history?
* I expect only about 1 or 2 rqbds need to be recycled here */
- while (svcpt->scp_hist_nrqbds > svc->srv_max_history_rqbds) {
+ while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
rqbd = cfs_list_entry(svcpt->scp_hist_rqbds.next,
struct ptlrpc_request_buffer_desc,
rqbd_list);
cfs_list_entry (tmp, struct ptlrpc_service, srv_list);
struct ptlrpc_service_part *svcpt;
- svcpt = svc->srv_part;
+ LASSERT(svc->srv_ncpts == 1);
+ svcpt = svc->srv_parts[0];
if (svcpt->scp_nthrs_running != 0) /* I've recursed */
continue;
* space. */
if (avail <= low_water)
- ptlrpc_grow_req_bufs(svcpt);
+ ptlrpc_grow_req_bufs(svcpt, 1);
if (svcpt->scp_service->srv_stats) {
lprocfs_counter_add(svcpt->scp_service->srv_stats,
ptlrpc_threads_increasable(struct ptlrpc_service_part *svcpt)
{
return svcpt->scp_nthrs_running +
- svcpt->scp_nthrs_starting < svcpt->scp_service->srv_threads_max;
+ svcpt->scp_nthrs_starting <
+ svcpt->scp_service->srv_nthrs_cpt_limit;
}
/**
thread->t_pid = cfs_curproc_pid();
cfs_daemonize_ctxt(thread->t_name);
-#if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
- /* we need to do this before any per-thread allocation is done so that
- * we get the per-thread allocations on local node. bug 7342 */
- if (svc->srv_cpu_affinity) {
- int cpu, num_cpu;
-
- for (cpu = 0, num_cpu = 0; cpu < cfs_num_possible_cpus();
- cpu++) {
- if (!cpu_online(cpu))
- continue;
- if (num_cpu == thread->t_id % cfs_num_online_cpus())
- break;
- num_cpu++;
- }
- cfs_set_cpus_allowed(cfs_current(),
- node_to_cpumask(cpu_to_node(cpu)));
- }
-#endif
+ /* NB: we will call cfs_cpt_bind() for all threads, because we
+ * might want to run lustre server only on a subset of system CPUs,
+ * in that case ->scp_cpt is CFS_CPT_ANY */
+ rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
+ if (rc != 0) {
+ CWARN("%s: failed to bind %s on CPT %d\n",
+ svc->srv_name, thread->t_name, svcpt->scp_cpt);
+ }
#ifdef WITH_GROUP_INFO
ginfo = cfs_groups_alloc(0);
env->le_ctx.lc_thread = thread;
env->le_ctx.lc_cookie = 0x6;
+ while (!cfs_list_empty(&svcpt->scp_rqbd_idle)) {
+ rc = ptlrpc_server_post_idle_rqbds(svcpt);
+ if (rc >= 0)
+ continue;
+
+ CERROR("Failed to post rqbd for %s on CPT %d: %d\n",
+ svc->srv_name, svcpt->scp_cpt, rc);
+ goto out_srv_fini;
+ }
+
/* Alloc reply state structure for this one */
OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
if (!rs) {
*/
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
+ struct ptlrpc_service_part *svcpt;
+ int i;
ENTRY;
- if (svc != NULL && svc->srv_part != NULL)
- ptlrpc_svcpt_stop_threads(svc->srv_part);
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service != NULL)
+ ptlrpc_svcpt_stop_threads(svcpt);
+ }
+
EXIT;
}
int ptlrpc_start_threads(struct ptlrpc_service *svc)
{
- int i, rc = 0;
- ENTRY;
+ int rc = 0;
+ int i;
+ int j;
+ ENTRY;
- /* We require 2 threads min - see note in
- ptlrpc_server_handle_request */
- LASSERT(svc->srv_threads_min >= 2);
- for (i = 0; i < svc->srv_threads_min; i++) {
- rc = ptlrpc_start_thread(svc->srv_part, 1);
- /* We have enough threads, don't start more. b=15759 */
- if (rc == -EMFILE) {
- rc = 0;
- break;
- }
- if (rc) {
- CERROR("cannot start %s thread #%d: rc %d\n",
- svc->srv_thread_name, i, rc);
- ptlrpc_stop_all_threads(svc);
- break;
- }
- }
- RETURN(rc);
+ /* We require 2 threads min, see note in ptlrpc_server_handle_request */
+ LASSERT(svc->srv_nthrs_cpt_init >= PTLRPC_NTHRS_INIT);
+
+ for (i = 0; i < svc->srv_ncpts; i++) {
+ for (j = 0; j < svc->srv_nthrs_cpt_init; j++) {
+ rc = ptlrpc_start_thread(svc->srv_parts[i], 1);
+ if (rc == 0)
+ continue;
+
+ if (rc != -EMFILE)
+ goto failed;
+ /* We have enough threads, don't start more. b=15759 */
+ break;
+ }
+ }
+
+ RETURN(0);
+ failed:
+ CERROR("cannot start %s thread #%d_%d: rc %d\n",
+ svc->srv_thread_name, i, j, rc);
+ ptlrpc_stop_all_threads(svc);
+ RETURN(rc);
}
int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
LASSERT(svcpt != NULL);
- CDEBUG(D_RPCTRACE, "%s started %d min %d max %d\n",
- svc->srv_name, svcpt->scp_nthrs_running,
- svc->srv_threads_min, svc->srv_threads_max);
+ CDEBUG(D_RPCTRACE, "%s[%d] started %d min %d max %d\n",
+ svc->srv_name, svcpt->scp_cpt, svcpt->scp_nthrs_running,
+ svc->srv_nthrs_cpt_init, svc->srv_nthrs_cpt_limit);
again:
if (unlikely(svc->srv_is_stopping))
if (!ptlrpc_threads_increasable(svcpt) ||
(OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
- svcpt->scp_nthrs_running == svc->srv_threads_min - 1))
+ svcpt->scp_nthrs_running == svc->srv_nthrs_cpt_init - 1))
RETURN(-EMFILE);
- OBD_ALLOC_PTR(thread);
+ OBD_CPT_ALLOC_PTR(thread, svc->srv_cptable, svcpt->scp_cpt);
if (thread == NULL)
RETURN(-ENOMEM);
cfs_waitq_init(&thread->t_ctl_waitq);
cfs_list_add(&thread->t_link, &svcpt->scp_threads);
cfs_spin_unlock(&svcpt->scp_lock);
- snprintf(thread->t_name, PTLRPC_THR_NAME_LEN,
- "%s_%02d", svc->srv_thread_name, thread->t_id);
+ if (svcpt->scp_cpt >= 0) {
+ snprintf(thread->t_name, PTLRPC_THR_NAME_LEN, "%s%02d_%03d",
+ svc->srv_thread_name, svcpt->scp_cpt, thread->t_id);
+ } else {
+ snprintf(thread->t_name, PTLRPC_THR_NAME_LEN, "%s_%04d",
+ svc->srv_thread_name, thread->t_id);
+ }
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
/*
static void
ptlrpc_service_del_atimer(struct ptlrpc_service *svc)
{
- struct ptlrpc_service_part *svcpt;
+ struct ptlrpc_service_part *svcpt;
+ int i;
/* early disarm AT timer... */
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
- break;
-
- cfs_timer_disarm(&svcpt->scp_at_timer);
- } while (0);
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service != NULL)
+ cfs_timer_disarm(&svcpt->scp_at_timer);
+ }
}
static void
struct ptlrpc_request_buffer_desc *rqbd;
struct l_wait_info lwi;
int rc;
+ int i;
- /* All history will be culled when the next request buffer is
+ /* All history will be culled when the next request buffer is
* freed in ptlrpc_service_purge_all() */
- svc->srv_max_history_rqbds = 0;
+ svc->srv_hist_nrqbds_cpt_max = 0;
rc = LNetClearLazyPortal(svc->srv_req_portal);
LASSERT(rc == 0);
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service == NULL)
break;
/* Unlink all the request buffers. This forces a 'final'
rc = LNetMDUnlink(rqbd->rqbd_md_h);
LASSERT(rc == 0 || rc == -ENOENT);
}
- } while (0);
+ }
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service == NULL)
break;
/* Wait for the network to release any buffers
cfs_spin_lock(&svcpt->scp_lock);
}
cfs_spin_unlock(&svcpt->scp_lock);
- } while (0);
+ }
}
static void
struct ptlrpc_request_buffer_desc *rqbd;
struct ptlrpc_request *req;
struct ptlrpc_reply_state *rs;
+ int i;
- do { /* iterrate over multiple partitions in the future */
- /* schedule all outstanding replies to terminate them */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service == NULL)
break;
cfs_spin_lock(&svcpt->scp_rep_lock);
cfs_list_del(&rs->rs_list);
OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
}
- } while (0);
+ }
}
static void
{
struct ptlrpc_service_part *svcpt;
struct ptlrpc_at_array *array;
+ int i;
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service == NULL)
break;
/* In case somebody rearmed this in the meantime */
sizeof(__u32) * array->paa_size);
array->paa_reqs_count = NULL;
}
- svcpt->scp_service = NULL;
- } while (0);
+ }
+
+ ptlrpc_service_for_each_part(svcpt, i, svc)
+ OBD_FREE_PTR(svcpt);
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt != NULL)
- OBD_FREE_PTR(svcpt);
- } while (0);
+ if (svc->srv_cpts != NULL)
+ cfs_expr_list_values_free(svc->srv_cpts, svc->srv_ncpts);
- OBD_FREE_PTR(svc);
+ OBD_FREE(svc, offsetof(struct ptlrpc_service,
+ srv_parts[svc->srv_ncpts]));
}
int ptlrpc_unregister_service(struct ptlrpc_service *service)
* Right now, it just checks to make sure that requests aren't languishing
* in the queue. We'll use this health check to govern whether a node needs
* to be shot, so it's intentionally non-aggressive. */
-int ptlrpc_service_health_check(struct ptlrpc_service *svc)
+int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
{
- struct ptlrpc_service_part *svcpt;
struct ptlrpc_request *request;
struct timeval right_now;
long timediff;
- if (svc == NULL || svc->srv_part == NULL)
- return 0;
-
cfs_gettimeofday(&right_now);
- svcpt = svc->srv_part;
cfs_spin_lock(&svcpt->scp_req_lock);
if (!ptlrpc_server_request_pending(svcpt, 1)) {
cfs_spin_unlock(&svcpt->scp_req_lock);
return 0;
}
+
+int
+ptlrpc_service_health_check(struct ptlrpc_service *svc)
+{
+ struct ptlrpc_service_part *svcpt;
+ int i;
+
+ if (svc == NULL || svc->srv_parts == NULL)
+ return 0;
+
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ int rc = ptlrpc_svcpt_health_check(svcpt);
+
+ if (rc != 0)
+ return rc;
+ }
+ return 0;
+}