if (cli == NULL) {
CDEBUG(D_INFO|D_WARNING, "%s: detached "
- "sequence mgr client %s\n", seq->seq_name,
- cli->seq_exp->exp_client_uuid.uuid);
- seq->seq_cli = cli;
+ "sequence mgr client %s\n", seq->lss_name,
+ cli->lcs_exp->exp_client_uuid.uuid);
+ seq->lss_cli = cli;
RETURN(0);
}
- if (seq->seq_cli) {
+ if (seq->lss_cli) {
CERROR("%s: sequence-controller is already "
- "assigned\n", seq->seq_name);
+ "assigned\n", seq->lss_name);
RETURN(-EINVAL);
}
CDEBUG(D_INFO|D_WARNING, "%s: attached "
- "sequence client %s\n", seq->seq_name,
- cli->seq_exp->exp_client_uuid.uuid);
+ "sequence client %s\n", seq->lss_name,
+ cli->lcs_exp->exp_client_uuid.uuid);
/* asking client for new range, assign that range to ->seq_super and
* write seq state to backing store should be atomic. */
- down(&seq->seq_sem);
+ down(&seq->lss_sem);
/* assign controller */
- seq->seq_cli = cli;
+ seq->lss_cli = cli;
/* get new range from controller only if super-sequence is not yet
* initialized from backing store or something else. */
- if (range_is_zero(&seq->seq_super)) {
+ if (range_is_zero(&seq->lss_super)) {
rc = seq_client_alloc_super(cli);
if (rc) {
CERROR("can't allocate super-sequence, "
}
/* take super-seq from client seq mgr */
- LASSERT(range_is_sane(&cli->seq_range));
+ LASSERT(range_is_sane(&cli->lcs_range));
- seq->seq_super = cli->seq_range;
+ seq->lss_super = cli->lcs_range;
/* save init seq to backing store. */
rc = seq_store_write(seq, ctx);
EXIT;
out_up:
- up(&seq->seq_sem);
+ up(&seq->lss_sem);
return rc;
}
EXPORT_SYMBOL(seq_server_set_cli);
struct lu_range *range,
const struct lu_context *ctx)
{
- struct lu_range *space = &seq->seq_space;
+ struct lu_range *space = &seq->lss_space;
int rc;
ENTRY;
LASSERT(range_is_sane(space));
- if (range_space(space) < seq->seq_super_width) {
+ if (range_space(space) < seq->lss_super_width) {
CWARN("sequences space is going to exhaust soon. "
- "Only can allocate "LPU64" sequences\n",
- space->lr_end - space->lr_start);
+ "Can allocate only "LPU64" sequences\n",
+ range_space(space));
*range = *space;
space->lr_start = space->lr_end;
rc = 0;
CERROR("sequences space is exhausted\n");
rc = -ENOSPC;
} else {
- range_alloc(range, space, seq->seq_super_width);
+ range_alloc(range, space, seq->lss_super_width);
rc = 0;
}
if (rc == 0) {
CDEBUG(D_INFO, "%s: allocated super-sequence "
- "["LPX64"-"LPX64"]\n", seq->seq_name,
- range->lr_start, range->lr_end);
+ DRANGE"\n", seq->lss_name, PRANGE(range));
}
RETURN(rc);
int rc;
ENTRY;
- down(&seq->seq_sem);
+ down(&seq->lss_sem);
rc = __seq_server_alloc_super(seq, range, ctx);
- up(&seq->seq_sem);
+ up(&seq->lss_sem);
RETURN(rc);
}
struct lu_range *range,
const struct lu_context *ctx)
{
- struct lu_range *super = &seq->seq_super;
+ struct lu_range *super = &seq->lss_super;
int rc = 0;
ENTRY;
/* XXX: here we should avoid cascading RPCs using kind of async
* preallocation when meta-sequence is close to exhausting. */
if (range_is_exhausted(super)) {
- if (!seq->seq_cli) {
+ if (!seq->lss_cli) {
CERROR("no seq-controller client is setup\n");
RETURN(-EOPNOTSUPP);
}
- rc = seq_client_alloc_super(seq->seq_cli);
+ rc = seq_client_alloc_super(seq->lss_cli);
if (rc) {
CERROR("can't allocate new super-sequence, "
"rc %d\n", rc);
}
/* saving new range into allocation space. */
- *super = seq->seq_cli->seq_range;
+ *super = seq->lss_cli->lcs_range;
LASSERT(range_is_sane(super));
}
- range_alloc(range, super, seq->seq_meta_width);
+ range_alloc(range, super, seq->lss_meta_width);
rc = seq_store_write(seq, ctx);
if (rc) {
if (rc == 0) {
CDEBUG(D_INFO, "%s: allocated meta-sequence "
- "["LPX64"-"LPX64"]\n", seq->seq_name,
- range->lr_start, range->lr_end);
+ DRANGE"\n", seq->lss_name, PRANGE(range));
}
RETURN(rc);
int rc;
ENTRY;
- down(&seq->seq_sem);
+ down(&seq->lss_sem);
rc = __seq_server_alloc_meta(seq, range, ctx);
- up(&seq->seq_sem);
+ up(&seq->lss_sem);
RETURN(rc);
}
static int seq_req_handle0(const struct lu_context *ctx,
- struct ptlrpc_request *req)
+ struct ptlrpc_request *req,
+ struct seq_thread_info *info)
{
- int rep_buf_size[2] = { -1, -1 };
- struct req_capsule pill;
struct lu_site *site;
struct lu_range *out;
int rc = -EPROTO;
site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
LASSERT(site != NULL);
- req_capsule_init(&pill, req, RCL_SERVER, rep_buf_size);
-
- req_capsule_set(&pill, &RQF_SEQ_QUERY);
- req_capsule_pack(&pill);
+ rc = req_capsule_pack(&info->sti_pill);
+ if (rc)
+ RETURN(rc);
- opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
+ opc = req_capsule_client_get(&info->sti_pill,
+ &RMF_SEQ_OPC);
if (opc != NULL) {
- out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
- if (out == NULL) {
- CERROR("can't get range buffer\n");
- GOTO(out_pill, rc= -EPROTO);
- }
+ out = req_capsule_server_get(&info->sti_pill,
+ &RMF_SEQ_RANGE);
+ if (out == NULL)
+ RETURN(-EPROTO);
switch (*opc) {
case SEQ_ALLOC_META:
if (!site->ls_server_seq) {
- CERROR("sequence-server is not initialized\n");
- GOTO(out_pill, rc == -EINVAL);
+ CERROR("sequence-server is not "
+ "initialized\n");
+ RETURN(-EINVAL);
}
- rc = seq_server_alloc_meta(site->ls_server_seq, out, ctx);
+ rc = seq_server_alloc_meta(site->ls_server_seq,
+ out, ctx);
break;
case SEQ_ALLOC_SUPER:
if (!site->ls_control_seq) {
- CERROR("sequence-controller is not initialized\n");
- GOTO(out_pill, rc == -EINVAL);
+ CERROR("sequence-controller is not "
+ "initialized\n");
+ RETURN(-EINVAL);
}
- rc = seq_server_alloc_super(site->ls_control_seq, out, ctx);
+ rc = seq_server_alloc_super(site->ls_control_seq,
+ out, ctx);
break;
default:
- CERROR("wrong opc 0x%x\n", *opc);
+ CERROR("wrong opc %#x\n", *opc);
break;
}
- } else {
- CERROR("cannot unpack client request\n");
}
-out_pill:
- EXIT;
- req_capsule_fini(&pill);
- return rc;
+ RETURN(rc);
+}
+
+static void *seq_thread_init(const struct lu_context *ctx,
+ struct lu_context_key *key)
+{
+ struct seq_thread_info *info;
+
+ /*
+ * check that no high order allocations are incurred.
+ */
+ CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
+ OBD_ALLOC_PTR(info);
+ if (info == NULL)
+ info = ERR_PTR(-ENOMEM);
+ return info;
+}
+
+static void seq_thread_fini(const struct lu_context *ctx,
+ struct lu_context_key *key, void *data)
+{
+ struct seq_thread_info *info = data;
+ OBD_FREE_PTR(info);
+}
+
+struct lu_context_key seq_thread_key = {
+ .lct_tags = LCT_MD_THREAD,
+ .lct_init = seq_thread_init,
+ .lct_fini = seq_thread_fini
+};
+
+static void seq_thread_info_init(struct ptlrpc_request *req,
+ struct seq_thread_info *info)
+{
+ int i;
+
+ /* mark rep buffer as req-layout stuff expects */
+ for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
+ info->sti_rep_buf_size[i] = -1;
+
+ /* init request capsule */
+ req_capsule_init(&info->sti_pill, req, RCL_SERVER,
+ info->sti_rep_buf_size);
+
+ req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
+}
+
+static void seq_thread_info_fini(struct seq_thread_info *info)
+{
+ req_capsule_fini(&info->sti_pill);
}
static int seq_req_handle(struct ptlrpc_request *req)
{
- int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
const struct lu_context *ctx;
- int rc = -EPROTO;
+ struct seq_thread_info *info;
+ int rc = 0;
ENTRY;
OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
ctx = req->rq_svc_thread->t_ctx;
LASSERT(ctx != NULL);
LASSERT(ctx->lc_thread == req->rq_svc_thread);
- if (req->rq_reqmsg->opc == SEQ_QUERY) {
+
+ info = lu_context_key_get(ctx, &seq_thread_key);
+ LASSERT(info != NULL);
+
+ seq_thread_info_init(req, info);
+
+ if (lustre_msg_get_opc(req->rq_reqmsg) == SEQ_QUERY) {
if (req->rq_export != NULL) {
- rc = seq_req_handle0(ctx, req);
+ /*
+ * no need to return error here and overwrite @rc, this
+ * function should return 0 even if seq_req_handle0()
+ * returns some error code.
+ */
+ seq_req_handle0(ctx, req, info);
} else {
CERROR("Unconnected request\n");
req->rq_status = -ENOTCONN;
}
} else {
- CERROR("Wrong opcode: %d\n",
- req->rq_reqmsg->opc);
+ CERROR("Wrong opcode: %d\n",
+ lustre_msg_get_opc(req->rq_reqmsg));
req->rq_status = -ENOTSUPP;
rc = ptlrpc_error(req);
- RETURN(rc);
+ GOTO(out_info, rc);
}
- target_send_reply(req, rc, fail);
- RETURN(0);
+ target_send_reply(req, rc, OBD_FAIL_SEQ_ALL_REPLY_NET);
+ EXIT;
+out_info:
+ seq_thread_info_fini(info);
+ return rc;
}
+static void seq_server_proc_fini(struct lu_server_seq *seq);
+
#ifdef LPROCFS
static int seq_server_proc_init(struct lu_server_seq *seq)
{
int rc;
ENTRY;
- seq->seq_proc_dir = lprocfs_register(seq->seq_name,
+ seq->lss_proc_dir = lprocfs_register(seq->lss_name,
proc_lustre_root,
NULL, NULL);
- if (IS_ERR(seq->seq_proc_dir)) {
- CERROR("LProcFS failed in seq-init\n");
- rc = PTR_ERR(seq->seq_proc_dir);
- GOTO(err, rc);
+ if (IS_ERR(seq->lss_proc_dir)) {
+ rc = PTR_ERR(seq->lss_proc_dir);
+ RETURN(rc);
}
- seq->seq_proc_entry = lprocfs_register("services",
- seq->seq_proc_dir,
+ seq->lss_proc_entry = lprocfs_register("services",
+ seq->lss_proc_dir,
NULL, NULL);
- if (IS_ERR(seq->seq_proc_entry)) {
- CERROR("LProcFS failed in seq-init\n");
- rc = PTR_ERR(seq->seq_proc_entry);
- GOTO(err_type, rc);
+ if (IS_ERR(seq->lss_proc_entry)) {
+ rc = PTR_ERR(seq->lss_proc_entry);
+ GOTO(out_cleanup, rc);
}
- rc = lprocfs_add_vars(seq->seq_proc_dir,
+ rc = lprocfs_add_vars(seq->lss_proc_dir,
seq_server_proc_list, seq);
if (rc) {
CERROR("can't init sequence manager "
"proc, rc %d\n", rc);
- GOTO(err_entry, rc);
+ GOTO(out_cleanup, rc);
}
RETURN(0);
-err_entry:
- lprocfs_remove(seq->seq_proc_entry);
-err_type:
- lprocfs_remove(seq->seq_proc_dir);
-err:
- seq->seq_proc_dir = NULL;
- seq->seq_proc_entry = NULL;
+out_cleanup:
+ seq_server_proc_fini(seq);
return rc;
}
static void seq_server_proc_fini(struct lu_server_seq *seq)
{
ENTRY;
- if (seq->seq_proc_entry) {
- lprocfs_remove(seq->seq_proc_entry);
- seq->seq_proc_entry = NULL;
+ if (seq->lss_proc_entry != NULL) {
+ if (!IS_ERR(seq->lss_proc_entry))
+ lprocfs_remove(seq->lss_proc_entry);
+ seq->lss_proc_entry = NULL;
}
- if (seq->seq_proc_dir) {
- lprocfs_remove(seq->seq_proc_dir);
- seq->seq_proc_dir = NULL;
+ if (seq->lss_proc_dir != NULL) {
+ if (!IS_ERR(seq->lss_proc_dir))
+ lprocfs_remove(seq->lss_proc_dir);
+ seq->lss_proc_dir = NULL;
}
EXIT;
}
+#else
+static int seq_server_proc_init(struct lu_server_seq *seq)
+{
+ return 0;
+}
+
+static void seq_server_proc_fini(struct lu_server_seq *seq)
+{
+ return;
+}
#endif
+#define LUSTRE_MD_SEQ_NAME "seq-md"
+#define LUSTRE_CT_SEQ_NAME "seq-ct"
+#define LUSTRE_DT_SEQ_NAME "seq-dt"
+
int seq_server_init(struct lu_server_seq *seq,
struct dt_device *dev,
const char *uuid,
- lu_server_type_t type,
+ enum lu_mgr_type type,
const struct lu_context *ctx)
{
- int rc, portal = (type == LUSTRE_SEQ_SRV) ?
- SEQ_SRV_PORTAL : SEQ_CTLR_PORTAL;
-
- struct ptlrpc_service_conf seq_conf = {
+ int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
+
+ struct ptlrpc_service_conf seq_md_conf = {
.psc_nbufs = MDS_NBUFS,
.psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = MDS_MAXREQSIZE,
- .psc_max_reply_size = MDS_MAXREPSIZE,
- .psc_req_portal = portal,
+ .psc_max_req_size = SEQ_MAXREQSIZE,
+ .psc_max_reply_size = SEQ_MAXREPSIZE,
+ .psc_req_portal = (is_srv ?
+ SEQ_METADATA_PORTAL :
+ SEQ_CONTROLLER_PORTAL),
.psc_rep_portal = MDC_REPLY_PORTAL,
.psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
.psc_num_threads = SEQ_NUM_THREADS,
.psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
};
+ struct ptlrpc_service_conf seq_dt_conf = {
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = SEQ_MAXREQSIZE,
+ .psc_max_reply_size = SEQ_MAXREPSIZE,
+ .psc_req_portal = SEQ_DATA_PORTAL,
+ .psc_rep_portal = OSC_REPLY_PORTAL,
+ .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
+ .psc_num_threads = SEQ_NUM_THREADS,
+ .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+ };
ENTRY;
LASSERT(dev != NULL);
LASSERT(uuid != NULL);
- seq->seq_dev = dev;
- seq->seq_cli = NULL;
- seq->seq_type = type;
- sema_init(&seq->seq_sem, 1);
+ seq->lss_cli = NULL;
+ seq->lss_type = type;
+ sema_init(&seq->lss_sem, 1);
- seq->seq_super_width = LUSTRE_SEQ_SUPER_WIDTH;
- seq->seq_meta_width = LUSTRE_SEQ_META_WIDTH;
+ seq->lss_super_width = LUSTRE_SEQ_SUPER_WIDTH;
+ seq->lss_meta_width = LUSTRE_SEQ_META_WIDTH;
- snprintf(seq->seq_name, sizeof(seq->seq_name), "%s-%s-%s",
- LUSTRE_SEQ_NAME, (type == LUSTRE_SEQ_SRV ? "srv" : "ctl"),
+ snprintf(seq->lss_name, sizeof(seq->lss_name), "%s-%s-%s",
+ LUSTRE_SEQ_NAME, (is_srv ? "srv" : "ctl"),
uuid);
- seq->seq_space = LUSTRE_SEQ_SPACE_RANGE;
- seq->seq_super = LUSTRE_SEQ_ZERO_RANGE;
-
- lu_device_get(&seq->seq_dev->dd_lu_dev);
+ seq->lss_space = LUSTRE_SEQ_SPACE_RANGE;
+ seq->lss_super = LUSTRE_SEQ_ZERO_RANGE;
- rc = seq_store_init(seq, ctx);
+ rc = seq_store_init(seq, ctx, dev);
if (rc)
GOTO(out, rc);
/* request backing store for saved sequence info */
rc = seq_store_read(seq, ctx);
if (rc == -ENODATA) {
- if (type == LUSTRE_SEQ_SRV) {
- CDEBUG(D_INFO|D_WARNING, "%s: no data on "
- "disk found, wait for controller "
- "attach\n", seq->seq_name);
- } else {
- CDEBUG(D_INFO|D_WARNING, "%s: no data on "
- "disk found, this is first controller "
- "run\n", seq->seq_name);
- }
+ CDEBUG(D_INFO|D_WARNING, "%s: no data on "
+ "storage was found, %s\n", seq->lss_name,
+ is_srv ? "wait for controller attach" :
+ "this is first controller run");
} else if (rc) {
CERROR("can't read sequence state, rc = %d\n",
rc);
GOTO(out, rc);
}
-#ifdef LPROCFS
rc = seq_server_proc_init(seq);
if (rc)
GOTO(out, rc);
-#endif
- seq->seq_service = ptlrpc_init_svc_conf(&seq_conf,
- seq_req_handle,
- LUSTRE_SEQ_NAME,
- seq->seq_proc_entry,
- NULL);
- if (seq->seq_service != NULL)
- rc = ptlrpc_start_threads(NULL, seq->seq_service,
- LUSTRE_SEQ_NAME);
+ seq->lss_md_service = ptlrpc_init_svc_conf(&seq_md_conf,
+ seq_req_handle,
+ LUSTRE_SEQ_NAME"_md",
+ seq->lss_proc_entry,
+ NULL);
+ if (seq->lss_md_service != NULL)
+ rc = ptlrpc_start_threads(NULL, seq->lss_md_service,
+ is_srv ? LUSTRE_MD_SEQ_NAME :
+ LUSTRE_CT_SEQ_NAME);
else
- rc = -ENOMEM;
-
+ GOTO(out, rc = -ENOMEM);
+
+ /*
+ * we want to have really cluster-wide sequences space. This is why we
+ * start only one sequence controller which manages space.
+ */
+ if (is_srv) {
+ seq->lss_dt_service = ptlrpc_init_svc_conf(&seq_dt_conf,
+ seq_req_handle,
+ LUSTRE_SEQ_NAME"_dt",
+ seq->lss_proc_entry,
+ NULL);
+ if (seq->lss_dt_service != NULL)
+ rc = ptlrpc_start_threads(NULL, seq->lss_dt_service,
+ LUSTRE_DT_SEQ_NAME);
+ else
+ GOTO(out, rc = -ENOMEM);
+ }
+
EXIT;
-
out:
if (rc) {
seq_server_fini(seq, ctx);
} else {
CDEBUG(D_INFO|D_WARNING, "%s Sequence Manager\n",
- (type == LUSTRE_SEQ_SRV ? "Server" : "Controller"));
+ (is_srv ? "Server" : "Controller"));
}
return rc;
}
{
ENTRY;
- if (seq->seq_service != NULL) {
- ptlrpc_unregister_service(seq->seq_service);
- seq->seq_service = NULL;
+ if (seq->lss_md_service != NULL) {
+ ptlrpc_unregister_service(seq->lss_md_service);
+ seq->lss_md_service = NULL;
}
-#ifdef LPROCFS
- seq_server_proc_fini(seq);
-#endif
+ if (seq->lss_dt_service != NULL) {
+ ptlrpc_unregister_service(seq->lss_dt_service);
+ seq->lss_dt_service = NULL;
+ }
+ seq_server_proc_fini(seq);
seq_store_fini(seq, ctx);
- if (seq->seq_dev != NULL) {
- lu_device_put(&seq->seq_dev->dd_lu_dev);
- seq->seq_dev = NULL;
- }
-
EXIT;
}
EXPORT_SYMBOL(seq_server_fini);