- LASSERT (nbufs > 0);
- LASSERT (bufsize >= max_req_size + SPTLRPC_MAX_PAYLOAD);
- LASSERT (ctx_tags != 0);
-
- OBD_ALLOC_PTR(service);
- if (service == NULL)
- RETURN(NULL);
-
- /* First initialise enough for early teardown */
-
- service->srv_name = name;
- spin_lock_init(&service->srv_lock);
- CFS_INIT_LIST_HEAD(&service->srv_threads);
- cfs_waitq_init(&service->srv_waitq);
-
- service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 : nbufs;
- service->srv_max_req_size = max_req_size + SPTLRPC_MAX_PAYLOAD;
- service->srv_buf_size = bufsize;
- service->srv_rep_portal = rep_portal;
- service->srv_req_portal = req_portal;
- service->srv_watchdog_factor = watchdog_factor;
- service->srv_handler = handler;
- service->srv_request_history_print_fn = svcreq_printfn;
- service->srv_request_seq = 1; /* valid seq #s start at 1 */
- service->srv_request_max_cull_seq = 0;
- service->srv_threads_min = min_threads;
- service->srv_threads_max = max_threads;
- service->srv_thread_name = threadname;
- service->srv_ctx_tags = ctx_tags;
-
- rc = LNetSetLazyPortal(service->srv_req_portal);
- LASSERT (rc == 0);
-
- CFS_INIT_LIST_HEAD(&service->srv_request_queue);
- CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
- CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
- CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
- CFS_INIT_LIST_HEAD(&service->srv_request_history);
- CFS_INIT_LIST_HEAD(&service->srv_active_replies);
- CFS_INIT_LIST_HEAD(&service->srv_reply_queue);
- CFS_INIT_LIST_HEAD(&service->srv_free_rs_list);
- cfs_waitq_init(&service->srv_free_rs_waitq);
-
- spin_lock_init(&service->srv_at_lock);
- CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
- CFS_INIT_LIST_HEAD(&service->srv_at_list);
- cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
- /* At SOW, service time should be quick; 10s seems generous. If client
- timeout is less than this, we'll be sending an early reply. */
- at_init(&service->srv_at_estimate, 10, 0);
-
- spin_lock (&ptlrpc_all_services_lock);
- list_add (&service->srv_list, &ptlrpc_all_services);
- spin_unlock (&ptlrpc_all_services_lock);
-
- /* Now allocate the request buffers */
- rc = ptlrpc_grow_req_bufs(service);
- /* We shouldn't be under memory pressure at startup, so
- * fail if we can't post all our buffers at this time. */
- if (rc != 0)
- GOTO(failed, NULL);
-
- /* Now allocate pool of reply buffers */
- /* Increase max reply size to next power of two */
- service->srv_max_reply_size = 1;
- while (service->srv_max_reply_size <
- max_reply_size + SPTLRPC_MAX_PAYLOAD)
- service->srv_max_reply_size <<= 1;
+/**
+ * Initialize service on a given portal.
+ * This includes starting serving threads, allocating and posting rqbds and
+ * so on.
+ */
+struct ptlrpc_service *
+ptlrpc_register_service(struct ptlrpc_service_conf *conf,
+ cfs_proc_dir_entry_t *proc_entry)
+{
+ struct ptlrpc_service_cpt_conf *cconf = &conf->psc_cpt;
+ struct ptlrpc_service *service;
+ struct ptlrpc_service_part *svcpt;
+ struct cfs_cpt_table *cptable;
+ __u32 *cpts = NULL;
+ int ncpts;
+ int cpt;
+ int rc;
+ int i;
+ ENTRY;
+
+ LASSERT(conf->psc_buf.bc_nbufs > 0);
+ LASSERT(conf->psc_buf.bc_buf_size >=
+ conf->psc_buf.bc_req_max_size + SPTLRPC_MAX_PAYLOAD);
+ LASSERT(conf->psc_thr.tc_ctx_tags != 0);
+
+ cptable = cconf->cc_cptable;
+ if (cptable == NULL)
+ cptable = cfs_cpt_table;
+
+ if (!conf->psc_thr.tc_cpu_affinity) {
+ ncpts = 1;
+ } else {
+ ncpts = cfs_cpt_number(cptable);
+ if (cconf->cc_pattern != NULL) {
+ struct cfs_expr_list *el;
+
+ rc = cfs_expr_list_parse(cconf->cc_pattern,
+ strlen(cconf->cc_pattern),
+ 0, ncpts - 1, &el);
+ if (rc != 0) {
+ CERROR("%s: invalid CPT pattern string: %s",
+ conf->psc_name, cconf->cc_pattern);
+ RETURN(ERR_PTR(-EINVAL));
+ }
+
+ rc = cfs_expr_list_values(el, ncpts, &cpts);
+ cfs_expr_list_free(el);
+ if (rc <= 0) {
+ CERROR("%s: failed to parse CPT array %s: %d\n",
+ conf->psc_name, cconf->cc_pattern, rc);
+ RETURN(ERR_PTR(rc < 0 ? rc : -EINVAL));
+ }
+ ncpts = rc;
+ }
+ }
+
+ OBD_ALLOC(service, offsetof(struct ptlrpc_service, srv_parts[ncpts]));
+ if (service == NULL) {
+ if (cpts != NULL)
+ OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+ RETURN(ERR_PTR(-ENOMEM));
+ }
+
+ service->srv_cptable = cptable;
+ service->srv_cpts = cpts;
+ service->srv_ncpts = ncpts;
+
+ service->srv_cpt_bits = 0; /* it's zero already, easy to read... */
+ while ((1 << service->srv_cpt_bits) < cfs_cpt_number(cptable))
+ service->srv_cpt_bits++;
+
+ /* public members */
+ cfs_spin_lock_init(&service->srv_lock);
+ service->srv_name = conf->psc_name;
+ service->srv_watchdog_factor = conf->psc_watchdog_factor;
+ CFS_INIT_LIST_HEAD(&service->srv_list); /* for safety of cleanup */
+
+ /* buffer configuration */
+ service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 :
+ max(conf->psc_buf.bc_nbufs /
+ service->srv_ncpts, 1U);
+ service->srv_max_req_size = conf->psc_buf.bc_req_max_size +
+ SPTLRPC_MAX_PAYLOAD;
+ service->srv_buf_size = conf->psc_buf.bc_buf_size;
+ service->srv_rep_portal = conf->psc_buf.bc_rep_portal;
+ service->srv_req_portal = conf->psc_buf.bc_req_portal;
+
+ /* Increase max reply size to next power of two */
+ service->srv_max_reply_size = 1;
+ while (service->srv_max_reply_size <
+ conf->psc_buf.bc_rep_max_size + SPTLRPC_MAX_PAYLOAD)
+ service->srv_max_reply_size <<= 1;
+
+ service->srv_thread_name = conf->psc_thr.tc_thr_name;
+ service->srv_ctx_tags = conf->psc_thr.tc_ctx_tags;
+ service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
+ service->srv_ops = conf->psc_ops;
+
+ for (i = 0; i < ncpts; i++) {
+ if (!conf->psc_thr.tc_cpu_affinity)
+ cpt = CFS_CPT_ANY;
+ else
+ cpt = cpts != NULL ? cpts[i] : i;
+
+ OBD_CPT_ALLOC(svcpt, cptable, cpt, sizeof(*svcpt));
+ if (svcpt == NULL)
+ GOTO(failed, rc = -ENOMEM);
+
+ service->srv_parts[i] = svcpt;
+ rc = ptlrpc_service_part_init(service, svcpt, cpt);
+ if (rc != 0)
+ GOTO(failed, rc);
+ }
+
+ ptlrpc_server_nthreads_check(service, conf);
+
+ rc = LNetSetLazyPortal(service->srv_req_portal);
+ LASSERT(rc == 0);
+
+ cfs_spin_lock (&ptlrpc_all_services_lock);
+ cfs_list_add (&service->srv_list, &ptlrpc_all_services);
+ cfs_spin_unlock (&ptlrpc_all_services_lock);