static int cmm_add_mdc(const struct lu_context *ctx,
struct cmm_device *cm, struct lustre_cfg *cfg)
{
- struct lu_device_type *ldt = &mdc_device_type;
- struct lu_device *ld;
- struct lu_site *ls;
- struct mdc_device *mc, *tmp;
+ struct lu_device_type *ldt = &mdc_device_type;
char *p, *num = lustre_cfg_string(cfg, 2);
+ struct mdc_device *mc, *tmp;
+ struct lu_fld_target target;
+ struct lu_device *ld;
+ struct lu_site *ls;
mdsno_t mdc_num;
int rc;
ENTRY;
lu_device_get(cmm2lu_dev(cm));
ls = cm->cmm_md_dev.md_lu_dev.ld_site;
- fld_client_add_target(ls->ls_client_fld, mc->mc_desc.cl_exp);
+
+ target.ft_srv = NULL;
+ target.ft_idx = mc->mc_num;
+ target.ft_exp = mc->mc_desc.cl_exp;
+
+ fld_client_add_target(ls->ls_client_fld, &target);
}
RETURN(rc);
}
struct cmm_device *cm)
{
struct mdc_device *mc, *tmp;
- struct lu_site *ls;
ENTRY;
- ls = cm->cmm_md_dev.md_lu_dev.ld_site;
- if (ls->ls_client_fld != NULL) {
- fld_client_fini(ls->ls_client_fld);
- OBD_FREE_PTR(ls->ls_client_fld);
- ls->ls_client_fld = NULL;
- }
-
/* finish all mdc devices */
spin_lock(&cm->cmm_tgt_guard);
list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
struct cmm_device *m, struct lustre_cfg *cfg)
{
const char *index = lustre_cfg_string(cfg, 2);
- struct lu_site *ls;
- char *p, *name_str;
- int rc;
+ char *p;
- LASSERT(index);
+ LASSERT(index != NULL);
m->cmm_local_num = simple_strtol(index, &p, 10);
if (*p) {
RETURN(-EINVAL);
}
- ls = m->cmm_md_dev.md_lu_dev.ld_site;
- OBD_ALLOC_PTR(ls->ls_client_fld);
- if (!ls->ls_client_fld)
- RETURN(-ENOMEM);
-
- name_str = lustre_cfg_string(cfg, 0);
- rc = fld_client_init(ls->ls_client_fld, name_str,
- LUSTRE_CLI_FLD_HASH_DHT);
- if (rc) {
- CERROR("can't init FLD, err %d\n", rc);
- }
- RETURN(rc);
+ RETURN(0);
}
static int cmm_process_config(const struct lu_context *ctx,
if (cli == NULL) {
CDEBUG(D_INFO|D_WARNING, "%s: detached "
- "sequence mgr client %s\n", seq->lss_name,
- cli->lcs_exp->exp_client_uuid.uuid);
+ "sequence mgr client %s\n",
+ seq->lss_name, cli->lcs_name);
seq->lss_cli = cli;
RETURN(0);
}
CDEBUG(D_INFO|D_WARNING, "%s: attached "
"sequence client %s\n", seq->lss_name,
- cli->lcs_exp->exp_client_uuid.uuid);
+ cli->lcs_name);
/* asking client for new range, assign that range to ->seq_super and
* write seq state to backing store should be atomic. */
if (range_is_zero(&seq->lss_super)) {
rc = seq_client_alloc_super(cli);
if (rc) {
+ up(&seq->lss_sem);
CERROR("can't allocate super-sequence, "
"rc %d\n", rc);
- GOTO(out_up, rc);
+ RETURN(rc);
}
/* take super-seq from client seq mgr */
}
}
- EXIT;
-out_up:
up(&seq->lss_sem);
- return rc;
+ RETURN(rc);
}
EXPORT_SYMBOL(seq_server_set_cli);
RETURN(rc);
}
-static int seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *range,
- const struct lu_context *ctx)
+int seq_server_alloc_super(struct lu_server_seq *seq,
+ struct lu_range *range,
+ const struct lu_context *ctx)
{
int rc;
ENTRY;
RETURN(rc);
}
-static int seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *range,
- const struct lu_context *ctx)
+int seq_server_alloc_meta(struct lu_server_seq *seq,
+ struct lu_range *range,
+ const struct lu_context *ctx)
{
int rc;
ENTRY;
int seq_server_init(struct lu_server_seq *seq,
struct dt_device *dev,
- const char *uuid,
+ const char *prefix,
enum lu_mgr_type type,
const struct lu_context *ctx)
{
int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
- static struct ptlrpc_service_conf seq_conf;
+ static struct ptlrpc_service_conf seq_conf;
seq_conf = (typeof(seq_conf)) {
.psc_nbufs = MDS_NBUFS,
.psc_bufsize = MDS_BUFSIZE,
ENTRY;
LASSERT(dev != NULL);
- LASSERT(uuid != NULL);
+ LASSERT(prefix != NULL);
seq->lss_cli = NULL;
seq->lss_type = type;
snprintf(seq->lss_name, sizeof(seq->lss_name), "%s-%s-%s",
LUSTRE_SEQ_NAME, (is_srv ? "srv" : "ctl"),
- uuid);
+ prefix);
seq->lss_space = LUSTRE_SEQ_SPACE_RANGE;
seq->lss_super = LUSTRE_SEQ_ZERO_RANGE;
#ifdef __KERNEL__
struct seq_store_record {
- struct lu_range srv_space;
- struct lu_range srv_super;
- struct lu_range ctl_space;
- struct lu_range ctl_super;
+ struct lu_range ssr_space;
+ struct lu_range ssr_super;
};
struct seq_thread_info {
/* request sequence-controller node to allocate new super-sequence. */
static int __seq_client_alloc_super(struct lu_client_seq *seq)
{
- return seq_client_rpc(seq, &seq->lcs_range,
- SEQ_ALLOC_SUPER, "super");
+ int rc;
+
+#ifdef __KERNEL__
+ if (seq->lcs_srv) {
+ rc = seq_server_alloc_super(seq->lcs_srv,
+ &seq->lcs_range,
+ seq->lcs_ctx);
+ } else {
+#endif
+ rc = seq_client_rpc(seq, &seq->lcs_range,
+ SEQ_ALLOC_SUPER, "super");
+#ifdef __KERNEL__
+ }
+#endif
+ return rc;
}
int seq_client_alloc_super(struct lu_client_seq *seq)
/* request sequence-controller node to allocate new meta-sequence. */
static int __seq_client_alloc_meta(struct lu_client_seq *seq)
{
- return seq_client_rpc(seq, &seq->lcs_range,
- SEQ_ALLOC_META, "meta");
+ int rc;
+
+#ifdef __KERNEL__
+ if (seq->lcs_srv) {
+ rc = seq_server_alloc_meta(seq->lcs_srv,
+ &seq->lcs_range,
+ seq->lcs_ctx);
+ } else {
+#endif
+ rc = seq_client_rpc(seq, &seq->lcs_range,
+ SEQ_ALLOC_META, "meta");
+#ifdef __KERNEL__
+ }
+#endif
+ return rc;
}
int seq_client_alloc_meta(struct lu_client_seq *seq)
#endif
int seq_client_init(struct lu_client_seq *seq,
- const char *uuid,
struct obd_export *exp,
- enum lu_cli_type type)
+ enum lu_cli_type type,
+ const char *prefix,
+ struct lu_server_seq *srv,
+ const struct lu_context *ctx)
{
- int rc = 0;
+ int rc;
ENTRY;
- LASSERT(exp != NULL);
+ LASSERT(seq != NULL);
+ LASSERT(prefix != NULL);
+ seq->lcs_ctx = ctx;
+ seq->lcs_exp = exp;
+ seq->lcs_srv = srv;
seq->lcs_type = type;
fid_zero(&seq->lcs_fid);
range_zero(&seq->lcs_range);
sema_init(&seq->lcs_sem, 1);
- seq->lcs_exp = class_export_get(exp);
seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
+ if (exp == NULL) {
+ LASSERT(seq->lcs_ctx != NULL);
+ LASSERT(seq->lcs_srv != NULL);
+ } else {
+ LASSERT(seq->lcs_exp != NULL);
+ seq->lcs_exp = class_export_get(seq->lcs_exp);
+ }
+
snprintf(seq->lcs_name, sizeof(seq->lcs_name),
- "%s-cli-%s", LUSTRE_SEQ_NAME, uuid);
+ "%s-cli-%s", LUSTRE_SEQ_NAME, prefix);
rc = seq_client_proc_init(seq);
if (rc)
seq->lcs_exp = NULL;
}
- CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager\n");
+ seq->lcs_srv = NULL;
+
+ CDEBUG(D_INFO|D_WARNING,
+ "Client Sequence Manager\n");
EXIT;
}
info = lu_context_key_get(ctx, &seq_thread_key);
LASSERT(info != NULL);
- rc = dt_obj->do_body_ops->dbo_read(ctx, dt_obj,
- (char *)&info->sti_record,
- sizeof(info->sti_record), &pos);
- if (rc < 0) {
- CERROR("can't read seq data from store, rc %d\n", rc);
- RETURN(rc);
- }
-
- if (rc > 0 && rc != sizeof(info->sti_record)) {
- CWARN("read only %d bytes of %d, old data - zeroing out\n",
- rc, sizeof(info->sti_record));
- memset(&info->sti_record, 0, sizeof(info->sti_record));
- }
-
/* stub here, will fix it later */
info->sti_txn.tp_credits = SEQ_TXN_STORE_CREDITS;
th = dt_dev->dd_ops->dt_trans_start(ctx, dt_dev, &info->sti_txn);
if (!IS_ERR(th)) {
/* store ranges in le format */
- if (seq->lss_type == LUSTRE_SEQ_SERVER) {
- range_to_le(&info->sti_record.srv_space, &seq->lss_space);
- range_to_le(&info->sti_record.srv_super, &seq->lss_super);
- } else {
- range_to_le(&info->sti_record.ctl_space, &seq->lss_space);
- range_to_le(&info->sti_record.ctl_super, &seq->lss_super);
-
- }
+ range_to_le(&info->sti_record.ssr_space, &seq->lss_space);
+ range_to_le(&info->sti_record.ssr_super, &seq->lss_super);
rc = dt_obj->do_body_ops->dbo_write(ctx, dt_obj,
(char *)&info->sti_record,
sizeof(info->sti_record), &pos);
if (rc == sizeof(info->sti_record)) {
- if (seq->lss_type == LUSTRE_SEQ_SERVER) {
- seq->lss_space = info->sti_record.srv_space;
- seq->lss_super = info->sti_record.srv_super;
- } else {
- seq->lss_space = info->sti_record.ctl_space;
- seq->lss_super = info->sti_record.ctl_super;
- }
+ seq->lss_space = info->sti_record.ssr_space;
+ seq->lss_super = info->sti_record.ssr_super;
+
lustre_swab_lu_range(&seq->lss_space);
lustre_swab_lu_range(&seq->lss_super);
{
struct dt_object *dt_obj;
struct lu_fid fid;
+ const char *name;
int rc;
ENTRY;
LASSERT(seq->lss_md_service == NULL);
LASSERT(seq->lss_dt_service == NULL);
- dt_obj = dt_store_open(ctx, dt, "seq", &fid);
+ name = seq->lss_type == LUSTRE_SEQ_SERVER ?
+ "seq_srv" : "seq_ctl";
+
+ dt_obj = dt_store_open(ctx, dt, name, &fid);
if (!IS_ERR(dt_obj)) {
seq->lss_obj = dt_obj;
rc = 0;
}
/* insert index entry and update cache */
-static int fld_server_create(struct lu_server_fld *fld,
- const struct lu_context *ctx,
- seqno_t seq, mdsno_t mds)
+int fld_server_create(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ seqno_t seq, mdsno_t mds)
{
return fld_index_create(fld, ctx, seq, mds);
}
+EXPORT_SYMBOL(fld_server_create);
/* delete index entry */
-static int fld_server_delete(struct lu_server_fld *fld,
- const struct lu_context *ctx,
- seqno_t seq)
+int fld_server_delete(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ seqno_t seq)
{
return fld_index_delete(fld, ctx, seq);
}
+EXPORT_SYMBOL(fld_server_delete);
/* issue on-disk index lookup */
-static int fld_server_lookup(struct lu_server_fld *fld,
- const struct lu_context *ctx,
- seqno_t seq, mdsno_t *mds)
+int fld_server_lookup(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ seqno_t seq, mdsno_t *mds)
{
return fld_index_lookup(fld, ctx, seq, mds);
}
+EXPORT_SYMBOL(fld_server_lookup);
static int fld_server_handle(struct lu_server_fld *fld,
const struct lu_context *ctx,
mdsno_t mds;
int rc;
- rc = fld_cache_lookup(site->ls_client_fld->fld_cache,
+ rc = fld_cache_lookup(site->ls_client_fld->lcf_cache,
fid_seq(fid), &mds);
if (rc == 0)
result = (mds == site->ls_node_id);
int rc = 0;
ENTRY;
- fld->fld_proc_dir = lprocfs_register(fld->fld_name,
+ fld->lsf_proc_dir = lprocfs_register(fld->lsf_name,
proc_lustre_root,
fld_server_proc_list, fld);
- if (IS_ERR(fld->fld_proc_dir)) {
- rc = PTR_ERR(fld->fld_proc_dir);
+ if (IS_ERR(fld->lsf_proc_dir)) {
+ rc = PTR_ERR(fld->lsf_proc_dir);
RETURN(rc);
}
- fld->fld_proc_entry = lprocfs_register("services",
- fld->fld_proc_dir,
+ fld->lsf_proc_entry = lprocfs_register("services",
+ fld->lsf_proc_dir,
NULL, NULL);
- if (IS_ERR(fld->fld_proc_entry)) {
- rc = PTR_ERR(fld->fld_proc_entry);
+ if (IS_ERR(fld->lsf_proc_entry)) {
+ rc = PTR_ERR(fld->lsf_proc_entry);
GOTO(out_cleanup, rc);
}
RETURN(rc);
static void fld_server_proc_fini(struct lu_server_fld *fld)
{
ENTRY;
- if (fld->fld_proc_entry != NULL) {
- if (!IS_ERR(fld->fld_proc_entry))
- lprocfs_remove(fld->fld_proc_entry);
- fld->fld_proc_entry = NULL;
+ if (fld->lsf_proc_entry != NULL) {
+ if (!IS_ERR(fld->lsf_proc_entry))
+ lprocfs_remove(fld->lsf_proc_entry);
+ fld->lsf_proc_entry = NULL;
}
- if (fld->fld_proc_dir != NULL) {
- if (!IS_ERR(fld->fld_proc_dir))
- lprocfs_remove(fld->fld_proc_dir);
- fld->fld_proc_dir = NULL;
+ if (fld->lsf_proc_dir != NULL) {
+ if (!IS_ERR(fld->lsf_proc_dir))
+ lprocfs_remove(fld->lsf_proc_dir);
+ fld->lsf_proc_dir = NULL;
}
EXIT;
}
const char *uuid)
{
int rc;
- struct ptlrpc_service_conf fld_conf = {
+ static struct ptlrpc_service_conf fld_conf;
+ fld_conf = (typeof(fld_conf)) {
.psc_nbufs = MDS_NBUFS,
.psc_bufsize = MDS_BUFSIZE,
.psc_max_req_size = FLD_MAXREQSIZE,
};
ENTRY;
- snprintf(fld->fld_name, sizeof(fld->fld_name),
+ snprintf(fld->lsf_name, sizeof(fld->lsf_name),
"%s-srv-%s", LUSTRE_FLD_NAME, uuid);
rc = fld_index_init(fld, ctx, dt);
if (rc)
GOTO(out, rc);
- fld->fld_service =
+ fld->lsf_service =
ptlrpc_init_svc_conf(&fld_conf, fld_handle,
LUSTRE_FLD_NAME,
- fld->fld_proc_entry, NULL);
- if (fld->fld_service != NULL)
- rc = ptlrpc_start_threads(NULL, fld->fld_service,
+ fld->lsf_proc_entry, NULL);
+ if (fld->lsf_service != NULL)
+ rc = ptlrpc_start_threads(NULL, fld->lsf_service,
LUSTRE_FLD_NAME);
else
rc = -ENOMEM;
{
ENTRY;
- if (fld->fld_service != NULL) {
- ptlrpc_unregister_service(fld->fld_service);
- fld->fld_service = NULL;
+ if (fld->lsf_service != NULL) {
+ ptlrpc_unregister_service(fld->lsf_service);
+ fld->lsf_service = NULL;
}
fld_server_proc_fini(fld);
-
fld_index_fini(fld, ctx);
EXIT;
const struct lu_context *ctx,
seqno_t seq, mdsno_t mds)
{
- struct dt_object *dt_obj = fld->fld_obj;
+ struct dt_object *dt_obj = fld->lsf_obj;
struct dt_device *dt_dev;
struct txn_param txn;
struct thandle *th;
int rc;
ENTRY;
- dt_dev = lu2dt_dev(fld->fld_obj->do_lu.lo_dev);
+ dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
/* stub here, will fix it later */
txn.tp_credits = FLD_TXN_INDEX_INSERT_CREDITS;
const struct lu_context *ctx,
seqno_t seq)
{
- struct dt_object *dt_obj = fld->fld_obj;
+ struct dt_object *dt_obj = fld->lsf_obj;
struct dt_device *dt_dev;
struct txn_param txn;
struct thandle *th;
int rc;
ENTRY;
- dt_dev = lu2dt_dev(fld->fld_obj->do_lu.lo_dev);
+ dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
txn.tp_credits = FLD_TXN_INDEX_DELETE_CREDITS;
th = dt_dev->dd_ops->dt_trans_start(ctx, dt_dev, &txn);
if (!IS_ERR(th)) {
const struct lu_context *ctx,
seqno_t seq, mdsno_t *mds)
{
- struct dt_object *dt_obj = fld->fld_obj;
+ struct dt_object *dt_obj = fld->lsf_obj;
struct dt_rec *rec = fld_rec(ctx, 0);
int rc;
ENTRY;
* lu_context_key has to be registered before threads are started,
* check this.
*/
- LASSERT(fld->fld_service == NULL);
+ LASSERT(fld->lsf_service == NULL);
dt_obj = dt_store_open(ctx, dt, fld_index_name, &fid);
if (!IS_ERR(dt_obj)) {
- fld->fld_obj = dt_obj;
+ fld->lsf_obj = dt_obj;
rc = dt_obj->do_ops->do_index_try(ctx, dt_obj,
&fld_index_features);
if (rc == 0)
LASSERT(dt_obj->do_index_ops != NULL);
else
- CERROR("\"%s\" is not an index!\n", fld_index_name);
+ CERROR("\"%s\" is not an index!\n",
+ fld_index_name);
} else {
CERROR("cannot find \"%s\" obj %d\n",
fld_index_name, (int)PTR_ERR(dt_obj));
const struct lu_context *ctx)
{
ENTRY;
- if (fld->fld_obj != NULL) {
- if (!IS_ERR(fld->fld_obj))
- lu_object_put(ctx, &fld->fld_obj->do_lu);
- fld->fld_obj = NULL;
+ if (fld->lsf_obj != NULL) {
+ if (!IS_ERR(fld->lsf_obj))
+ lu_object_put(ctx, &fld->lsf_obj->do_lu);
+ fld->lsf_obj = NULL;
}
EXIT;
}
#endif
+static inline const char *
+fld_target_name(struct lu_fld_target *tar)
+{
+ if (tar->ft_srv != NULL)
+ return tar->ft_srv->lsf_name;
+
+ return tar->ft_exp->exp_client_uuid.uuid;
+}
+
#endif
static int fld_rrb_hash(struct lu_client_fld *fld,
seqno_t seq)
{
- LASSERT(fld->fld_count > 0);
- return do_div(seq, fld->fld_count);
+ LASSERT(fld->lcf_count > 0);
+ return do_div(seq, fld->lcf_count);
}
-static struct fld_target *
+static struct lu_fld_target *
fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
{
- struct fld_target *target;
+ struct lu_fld_target *target;
int hash;
ENTRY;
hash = fld_rrb_hash(fld, seq);
- list_for_each_entry(target, &fld->fld_targets, fldt_chain) {
- if (target->fldt_idx == hash)
+ list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
+ if (target->ft_idx == hash)
RETURN(target);
}
return fld_rrb_hash(fld, seq);
}
-static struct fld_target *
+static struct lu_fld_target *
fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
{
/* XXX: here should be DHT scan code */
}
};
-static struct fld_target *
+static struct lu_fld_target *
fld_client_get_target(struct lu_client_fld *fld,
seqno_t seq)
{
- struct fld_target *target;
+ struct lu_fld_target *target;
ENTRY;
- LASSERT(fld->fld_hash != NULL);
+ LASSERT(fld->lcf_hash != NULL);
- spin_lock(&fld->fld_lock);
- target = fld->fld_hash->fh_scan_func(fld, seq);
- spin_unlock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
+ target = fld->lcf_hash->fh_scan_func(fld, seq);
+ spin_unlock(&fld->lcf_lock);
RETURN(target);
}
* of FLD module.
*/
int fld_client_add_target(struct lu_client_fld *fld,
- struct obd_export *exp)
+ struct lu_fld_target *tar)
{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- struct fld_target *target, *tmp;
+ const char *tar_name = fld_target_name(tar);
+ struct lu_fld_target *target, *tmp;
ENTRY;
- LASSERT(exp != NULL);
+ LASSERT(tar != NULL);
+ LASSERT(tar_name != NULL);
+ LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
- CDEBUG(D_INFO|D_WARNING, "%s: adding export %s\n",
- fld->fld_name, cli->cl_target_uuid.uuid);
+ CDEBUG(D_INFO|D_WARNING, "%s: adding target %s\n",
+ fld->lcf_name, tar_name);
OBD_ALLOC_PTR(target);
if (target == NULL)
RETURN(-ENOMEM);
- spin_lock(&fld->fld_lock);
- list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
- if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
- &exp->exp_client_uuid))
+ spin_lock(&fld->lcf_lock);
+ list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
+ const char *tmp_name = fld_target_name(tmp);
+
+ if (strlen(tar_name) == strlen(tmp_name) &&
+ strcmp(tmp_name, tar_name) == 0)
{
- spin_unlock(&fld->fld_lock);
+ spin_unlock(&fld->lcf_lock);
OBD_FREE_PTR(target);
RETURN(-EEXIST);
}
}
- target->fldt_exp = class_export_get(exp);
- target->fldt_idx = fld->fld_count;
+ target->ft_exp = tar->ft_exp;
+ target->ft_srv = tar->ft_srv;
+ target->ft_idx = tar->ft_idx;
- list_add_tail(&target->fldt_chain,
- &fld->fld_targets);
- fld->fld_count++;
- spin_unlock(&fld->fld_lock);
+ list_add_tail(&target->ft_chain,
+ &fld->lcf_targets);
+
+ fld->lcf_count++;
+ spin_unlock(&fld->lcf_lock);
RETURN(0);
}
/* remove export from FLD */
int fld_client_del_target(struct lu_client_fld *fld,
- struct obd_export *exp)
+ __u64 idx)
{
- struct fld_target *target, *tmp;
+ struct lu_fld_target *target, *tmp;
ENTRY;
- spin_lock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
list_for_each_entry_safe(target, tmp,
- &fld->fld_targets, fldt_chain) {
- if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
- &exp->exp_client_uuid))
- {
- fld->fld_count--;
- list_del(&target->fldt_chain);
- spin_unlock(&fld->fld_lock);
- class_export_put(target->fldt_exp);
+ &fld->lcf_targets, ft_chain) {
+ if (target->ft_idx == idx) {
+ fld->lcf_count--;
+ list_del(&target->ft_chain);
+ spin_unlock(&fld->lcf_lock);
+ class_export_put(target->ft_exp);
OBD_FREE_PTR(target);
RETURN(0);
}
}
- spin_unlock(&fld->fld_lock);
+ spin_unlock(&fld->lcf_lock);
RETURN(-ENOENT);
}
EXPORT_SYMBOL(fld_client_del_target);
int rc;
ENTRY;
- fld->fld_proc_dir = lprocfs_register(fld->fld_name,
+ fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
proc_lustre_root,
NULL, NULL);
- if (IS_ERR(fld->fld_proc_dir)) {
+ if (IS_ERR(fld->lcf_proc_dir)) {
CERROR("LProcFS failed in fld-init\n");
- rc = PTR_ERR(fld->fld_proc_dir);
+ rc = PTR_ERR(fld->lcf_proc_dir);
RETURN(rc);
}
- rc = lprocfs_add_vars(fld->fld_proc_dir,
+ rc = lprocfs_add_vars(fld->lcf_proc_dir,
fld_client_proc_list, fld);
if (rc) {
CERROR("can't init FLD "
static void fld_client_proc_fini(struct lu_client_fld *fld)
{
ENTRY;
- if (fld->fld_proc_dir) {
- if (!IS_ERR(fld->fld_proc_dir))
- lprocfs_remove(fld->fld_proc_dir);
- fld->fld_proc_dir = NULL;
+ if (fld->lcf_proc_dir) {
+ if (!IS_ERR(fld->lcf_proc_dir))
+ lprocfs_remove(fld->lcf_proc_dir);
+ fld->lcf_proc_dir = NULL;
}
EXIT;
}
#define FLD_CACHE_THRESHOLD 10
int fld_client_init(struct lu_client_fld *fld,
- const char *uuid, int hash)
+ const char *prefix, int hash,
+ const struct lu_context *ctx)
{
#ifdef __KERNEL__
int cache_size, cache_threshold;
LASSERT(fld != NULL);
if (!hash_is_sane(hash)) {
- CERROR("wrong hash function %#x\n", hash);
+ CERROR("Wrong hash function %#x\n", hash);
RETURN(-EINVAL);
}
- INIT_LIST_HEAD(&fld->fld_targets);
- spin_lock_init(&fld->fld_lock);
- fld->fld_hash = &fld_hash[hash];
- fld->fld_count = 0;
+ fld->lcf_count = 0;
+ fld->lcf_ctx = ctx;
+ spin_lock_init(&fld->lcf_lock);
+ fld->lcf_hash = &fld_hash[hash];
+ INIT_LIST_HEAD(&fld->lcf_targets);
- snprintf(fld->fld_name, sizeof(fld->fld_name),
- "%s-cli-%s", LUSTRE_FLD_NAME, uuid);
+ snprintf(fld->lcf_name, sizeof(fld->lcf_name),
+ "%s-cli-%s", LUSTRE_FLD_NAME, prefix);
#ifdef __KERNEL__
cache_size = FLD_CACHE_SIZE /
cache_threshold = cache_size *
FLD_CACHE_THRESHOLD / 100;
- fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE,
+ fld->lcf_cache = fld_cache_init(FLD_HTABLE_SIZE,
cache_size,
cache_threshold);
- if (IS_ERR(fld->fld_cache)) {
- rc = PTR_ERR(fld->fld_cache);
- fld->fld_cache = NULL;
+ if (IS_ERR(fld->lcf_cache)) {
+ rc = PTR_ERR(fld->lcf_cache);
+ fld->lcf_cache = NULL;
GOTO(out, rc);
}
#endif
else
CDEBUG(D_INFO|D_WARNING,
"Client FLD, using \"%s\" hash\n",
- fld->fld_hash->fh_name);
+ fld->lcf_hash->fh_name);
return rc;
}
EXPORT_SYMBOL(fld_client_init);
void fld_client_fini(struct lu_client_fld *fld)
{
- struct fld_target *target, *tmp;
+ struct lu_fld_target *target, *tmp;
ENTRY;
fld_client_proc_fini(fld);
- spin_lock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
list_for_each_entry_safe(target, tmp,
- &fld->fld_targets, fldt_chain) {
- fld->fld_count--;
- list_del(&target->fldt_chain);
- class_export_put(target->fldt_exp);
+ &fld->lcf_targets, ft_chain) {
+ fld->lcf_count--;
+ list_del(&target->ft_chain);
+ class_export_put(target->ft_exp);
OBD_FREE_PTR(target);
}
- spin_unlock(&fld->fld_lock);
+ spin_unlock(&fld->lcf_lock);
#ifdef __KERNEL__
- if (fld->fld_cache != NULL) {
- if (!IS_ERR(fld->fld_cache))
- fld_cache_fini(fld->fld_cache);
- fld->fld_cache = NULL;
+ if (fld->lcf_cache != NULL) {
+ if (!IS_ERR(fld->lcf_cache))
+ fld_cache_fini(fld->lcf_cache);
+ fld->lcf_cache = NULL;
}
#endif
seqno_t seq, mdsno_t mds)
{
struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
- struct fld_target *target;
+ struct lu_fld_target *target;
int rc;
ENTRY;
target = fld_client_get_target(fld, seq);
LASSERT(target != NULL);
- rc = fld_client_rpc(target->fldt_exp, &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+ if (target->ft_srv != NULL) {
+ LASSERT(fld->lcf_ctx != NULL);
+ rc = fld_server_create(target->ft_srv,
+ fld->lcf_ctx,
+ seq, mds);
+ } else {
+#endif
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+ }
+#endif
if (rc == 0) {
/*
* reason is that, we do not want to stop proceeding because of
* cache errors. --umka
*/
- fld_cache_insert(fld->fld_cache, seq, mds);
+ fld_cache_insert(fld->lcf_cache, seq, mds);
}
RETURN(rc);
}
seqno_t seq)
{
struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
- struct fld_target *target;
+ struct lu_fld_target *target;
int rc;
ENTRY;
- fld_cache_delete(fld->fld_cache, seq);
+ fld_cache_delete(fld->lcf_cache, seq);
target = fld_client_get_target(fld, seq);
LASSERT(target != NULL);
- rc = fld_client_rpc(target->fldt_exp,
- &md_fld, FLD_DELETE);
+#ifdef __KERNEL__
+ if (target->ft_srv != NULL) {
+ LASSERT(fld->lcf_ctx != NULL);
+ rc = fld_server_delete(target->ft_srv,
+ fld->lcf_ctx,
+ seq);
+ } else {
+#endif
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_DELETE);
+#ifdef __KERNEL__
+ }
+#endif
RETURN(rc);
}
seqno_t seq, mdsno_t *mds)
{
struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
- struct fld_target *target;
+ struct lu_fld_target *target;
int rc;
ENTRY;
/* lookup it in the cache */
- rc = fld_cache_lookup(fld->fld_cache, seq, mds);
+ rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
if (rc == 0)
RETURN(0);
target = fld_client_get_target(fld, seq);
LASSERT(target != NULL);
- rc = fld_client_rpc(target->fldt_exp,
- &md_fld, FLD_LOOKUP);
+#ifdef __KERNEL__
+ if (target->ft_srv != NULL) {
+ LASSERT(fld->lcf_ctx != NULL);
+ rc = fld_server_lookup(target->ft_srv,
+ fld->lcf_ctx,
+ seq, mds);
+ } else {
+#endif
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_LOOKUP);
+#ifdef __KERNEL__
+ }
+#endif
if (rc == 0) {
*mds = md_fld.mf_mds;
* Do not return error here as well. See previous comment in
* same situation in function fld_client_create(). --umka
*/
- fld_cache_insert(fld->fld_cache, seq, *mds);
+ fld_cache_insert(fld->lcf_cache, seq, *mds);
}
RETURN(rc);
}
int count, int *eof, void *data)
{
struct lu_client_fld *fld = (struct lu_client_fld *)data;
- struct fld_target *target;
+ struct lu_fld_target *target;
int total = 0, rc;
ENTRY;
LASSERT(fld != NULL);
- spin_lock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
list_for_each_entry(target,
- &fld->fld_targets, fldt_chain)
+ &fld->lcf_targets, ft_chain)
{
- struct client_obd *cli = &target->fldt_exp->exp_obd->u.cli;
-
rc = snprintf(page, count, "%s\n",
- cli->cl_target_uuid.uuid);
+ fld_target_name(target));
page += rc;
count -= rc;
total += rc;
if (count == 0)
break;
}
- spin_unlock(&fld->fld_lock);
+ spin_unlock(&fld->lcf_lock);
RETURN(total);
}
LASSERT(fld != NULL);
- spin_lock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
rc = snprintf(page, count, "%s\n",
- fld->fld_hash->fh_name);
- spin_unlock(&fld->fld_lock);
+ fld->lcf_hash->fh_name);
+ spin_unlock(&fld->lcf_lock);
RETURN(rc);
}
}
if (hash != NULL) {
- spin_lock(&fld->fld_lock);
- fld->fld_hash = hash;
- spin_unlock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
+ fld->lcf_hash = hash;
+ spin_unlock(&fld->lcf_lock);
- CDEBUG(D_WARNING, "FLD(cli): changed hash to \"%s\"\n",
- hash->fh_name);
+ CDEBUG(D_WARNING, "%s: changed hash to \"%s\"\n",
+ fld->lcf_name, hash->fh_name);
}
RETURN(count);
*/
int ls_hash_mask;
-
/*
* LRU list, updated on each access to object. Protected by
* ->ls_guard.
* Client Seq Manager
*/
struct lu_client_seq *ls_client_seq;
-
- /* Sequence controller node */
struct obd_export *ls_client_exp;
/* statistical counters. Protected by nothing, races are accepted. */
LUSTRE_SEQ_DATA
};
+struct lu_server_seq;
+
/* client sequence manager interface */
struct lu_client_seq {
/* sequence-controller export. */
/* sequence width, that is how many objects may be allocated in one
* sequence. Default value for it is LUSTRE_SEQ_MAX_WIDTH. */
__u64 lcs_width;
+
+ /* seq-server for direct talking */
+ struct lu_server_seq *lcs_srv;
+ const struct lu_context *lcs_ctx;
};
-#ifdef __KERNEL__
/* server sequence manager interface */
struct lu_server_seq {
/* available sequence space */
__u64 lss_super_width;
__u64 lss_meta_width;
};
-#endif
#ifdef __KERNEL__
int seq_server_init(struct lu_server_seq *seq,
struct dt_device *dev,
- const char *uuid,
+ const char *prefix,
enum lu_mgr_type type,
const struct lu_context *ctx);
void seq_server_fini(struct lu_server_seq *seq,
const struct lu_context *ctx);
+int seq_server_alloc_super(struct lu_server_seq *seq,
+ struct lu_range *range,
+ const struct lu_context *ctx);
+
+int seq_server_alloc_meta(struct lu_server_seq *seq,
+ struct lu_range *range,
+ const struct lu_context *ctx);
+
int seq_server_set_cli(struct lu_server_seq *seq,
struct lu_client_seq *cli,
const struct lu_context *ctx);
#endif
int seq_client_init(struct lu_client_seq *seq,
- const char *uuid,
struct obd_export *exp,
- enum lu_cli_type type);
+ enum lu_cli_type type,
+ const char *prefix,
+ struct lu_server_seq *srv,
+ const struct lu_context *ctx);
void seq_client_fini(struct lu_client_seq *seq);
LUSTRE_CLI_FLD_HASH_RRB
};
-struct fld_target {
- struct list_head fldt_chain;
- struct obd_export *fldt_exp;
- __u64 fldt_idx;
+struct lu_server_fld;
+
+struct lu_fld_target {
+ struct list_head ft_chain;
+ struct obd_export *ft_exp;
+ struct lu_server_fld *ft_srv;
+ __u64 ft_idx;
};
-typedef int (*fld_hash_func_t) (struct lu_client_fld *, __u64);
-typedef struct fld_target * (*fld_scan_func_t) (struct lu_client_fld *, __u64);
+typedef int
+(*fld_hash_func_t) (struct lu_client_fld *, __u64);
+
+typedef struct lu_fld_target *
+(*fld_scan_func_t) (struct lu_client_fld *, __u64);
struct lu_fld_hash {
const char *fh_name;
struct lu_server_fld {
/* service proc entry */
- cfs_proc_dir_entry_t *fld_proc_entry;
+ cfs_proc_dir_entry_t *lsf_proc_entry;
/* fld dir proc entry */
- cfs_proc_dir_entry_t *fld_proc_dir;
+ cfs_proc_dir_entry_t *lsf_proc_dir;
/* pointer to started server service */
- struct ptlrpc_service *fld_service;
+ struct ptlrpc_service *lsf_service;
/* /fld file object device */
- struct dt_object *fld_obj;
+ struct dt_object *lsf_obj;
/* fld service name in form "fld-MDTXXX" */
- char fld_name[80];
+ char lsf_name[80];
};
struct fld_cache_entry {
struct lu_client_fld {
/* client side proc entry */
- cfs_proc_dir_entry_t *fld_proc_dir;
+ cfs_proc_dir_entry_t *lcf_proc_dir;
/* list of exports client FLD knows about */
- struct list_head fld_targets;
+ struct list_head lcf_targets;
/* current hash to be used to chose an export */
- struct lu_fld_hash *fld_hash;
+ struct lu_fld_hash *lcf_hash;
/* exports count */
- int fld_count;
+ int lcf_count;
/* lock protecting exports list and fld_hash */
- spinlock_t fld_lock;
+ spinlock_t lcf_lock;
/* client FLD cache */
- struct fld_cache_info *fld_cache;
+ struct fld_cache_info *lcf_cache;
/* client fld proc entry name */
- char fld_name[80];
+ char lcf_name[80];
+
+ const struct lu_context *lcf_ctx;
};
/* server methods */
void fld_server_fini(struct lu_server_fld *fld,
const struct lu_context *ctx);
+int fld_server_create(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ seqno_t seq, mdsno_t mds);
+
+int fld_server_delete(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ seqno_t seq);
+
+int fld_server_lookup(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ seqno_t seq, mdsno_t *mds);
+
/* client methods */
int fld_client_init(struct lu_client_fld *fld,
- const char *uuid,
- int hash);
+ const char *prefix, int hash,
+ const struct lu_context *ctx);
void fld_client_fini(struct lu_client_fld *fld);
seqno_t seq);
int fld_client_add_target(struct lu_client_fld *fld,
- struct obd_export *exp);
+ struct lu_fld_target *tar);
int fld_client_del_target(struct lu_client_fld *fld,
- struct obd_export *exp);
+ __u64 idx);
/* cache methods */
struct fld_cache_info *fld_cache_init(int hash_size,
struct lustre_handle conn = {0, };
struct obd_device *mdc_obd;
struct obd_export *mdc_exp;
+ struct lu_fld_target target;
int rc;
#ifdef __KERNEL__
struct proc_dir_entry *lmv_proc_dir;
}
mdc_exp = class_conn2export(&conn);
- fld_client_add_target(&lmv->lmv_fld, mdc_exp);
+
+ target.ft_srv = NULL;
+ target.ft_exp = mdc_exp;
+ target.ft_idx = tgt->idx;
+
+ fld_client_add_target(&lmv->lmv_fld, &target);
mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
}
#endif
rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
- LUSTRE_CLI_FLD_HASH_DHT);
+ LUSTRE_CLI_FLD_HASH_DHT, NULL);
if (rc) {
CERROR("can't init FLD, err %d\n",
rc);
static int mdc_fid_init(struct obd_export *exp)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
- char *uuid;
+ char *prefix;
int rc;
ENTRY;
if (cli->cl_seq == NULL)
RETURN(-ENOMEM);
- OBD_ALLOC(uuid, MAX_OBD_NAME + 5);
- if (uuid == NULL)
+ OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+ if (prefix == NULL)
GOTO(out_free_seq, rc = -ENOMEM);
- snprintf(uuid, MAX_OBD_NAME + 5, "srv-%s",
+ snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s",
exp->exp_obd->obd_name);
/* init client side sequence-manager */
- rc = seq_client_init(cli->cl_seq, uuid,
- exp, LUSTRE_SEQ_METADATA);
- OBD_FREE(uuid, MAX_OBD_NAME + 5);
+ rc = seq_client_init(cli->cl_seq, exp,
+ LUSTRE_SEQ_METADATA,
+ prefix, NULL, NULL);
+ OBD_FREE(prefix, MAX_OBD_NAME + 5);
if (rc)
GOTO(out_free_seq, rc);
RETURN(rc);
}
-
void mdt_lock_handle_init(struct mdt_lock_handle *lh)
{
lh->mlh_lh.cookie = 0ull;
OBD_FREE_PTR(ls->ls_server_seq);
ls->ls_server_seq = NULL;
}
+
if (ls && ls->ls_control_seq) {
seq_server_fini(ls->ls_control_seq, ctx);
OBD_FREE_PTR(ls->ls_control_seq);
ls->ls_control_seq = NULL;
}
+
+ if (ls && ls->ls_client_seq) {
+ seq_client_fini(ls->ls_client_seq);
+ OBD_FREE_PTR(ls->ls_client_seq);
+ ls->ls_client_seq = NULL;
+ }
+
RETURN(0);
}
struct mdt_device *m)
{
struct lu_site *ls;
+ char *prefix;
int rc;
ENTRY;
ls = m->mdt_md_dev.md_lu_dev.ld_site;
- /* sequence-controller node */
+ /*
+ * This is sequence-controller node. Init seq-controller server on local
+ * MDT.
+ */
if (ls->ls_node_id == 0) {
LASSERT(ls->ls_control_seq == NULL);
+
OBD_ALLOC_PTR(ls->ls_control_seq);
+ if (ls->ls_control_seq == NULL)
+ RETURN(-ENOMEM);
- if (ls->ls_control_seq != NULL) {
- rc = seq_server_init(ls->ls_control_seq,
- m->mdt_bottom, uuid,
- LUSTRE_SEQ_CONTROLLER,
- ctx);
- } else
- rc = -ENOMEM;
+ rc = seq_server_init(ls->ls_control_seq,
+ m->mdt_bottom, uuid,
+ LUSTRE_SEQ_CONTROLLER,
+ ctx);
+
+ if (rc)
+ GOTO(out_seq_fini, rc);
+
+ OBD_ALLOC_PTR(ls->ls_client_seq);
+ if (ls->ls_client_seq == NULL)
+ GOTO(out_seq_fini, rc = -ENOMEM);
+
+ OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+ if (prefix == NULL) {
+ OBD_FREE_PTR(ls->ls_client_seq);
+ GOTO(out_seq_fini, rc = -ENOMEM);
+ }
+
+ snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
+ uuid);
+
+ /*
+ * Init seq-controller client after seq-controller server is
+ * ready. Pass ls->ls_control_seq to it for direct talking.
+ */
+ rc = seq_client_init(ls->ls_client_seq, NULL,
+ LUSTRE_SEQ_METADATA, prefix,
+ ls->ls_control_seq, ctx);
+ OBD_FREE(prefix, MAX_OBD_NAME + 5);
+
+ if (rc)
+ GOTO(out_seq_fini, rc);
}
+ /* Init seq-server on local MDT */
LASSERT(ls->ls_server_seq == NULL);
+
OBD_ALLOC_PTR(ls->ls_server_seq);
+ if (ls->ls_server_seq == NULL)
+ GOTO(out_seq_fini, rc = -ENOMEM);
- if (ls->ls_server_seq != NULL) {
- rc = seq_server_init(ls->ls_server_seq,
- m->mdt_bottom, uuid,
- LUSTRE_SEQ_SERVER,
- ctx);
- } else
- rc = -ENOMEM;
+ rc = seq_server_init(ls->ls_server_seq,
+ m->mdt_bottom, uuid,
+ LUSTRE_SEQ_SERVER,
+ ctx);
+ if (rc)
+ GOTO(out_seq_fini, rc = -ENOMEM);
+ /* Assign seq-controller client to local seq-server. */
+ if (ls->ls_node_id == 0) {
+ LASSERT(ls->ls_client_seq != NULL);
+
+ rc = seq_server_set_cli(ls->ls_server_seq,
+ ls->ls_client_seq,
+ ctx);
+ }
+
+ EXIT;
+out_seq_fini:
if (rc)
mdt_seq_fini(ctx, m);
- RETURN(rc);
+ return rc;
}
/*
RETURN(-EINVAL);
}
- /* check if this is first MDC add and controller is not yet
+ /* check if this is adding the first MDC and controller is not yet
* initialized. */
- if (index != 0 || ls->ls_client_exp)
+ if (index != 0 || ls->ls_client_seq)
RETURN(0);
uuid_str = lustre_cfg_string(cfg, 1);
OBD_ALLOC_PTR(ls->ls_client_seq);
if (ls->ls_client_seq != NULL) {
- char *uuid;
+ char *prefix;
- OBD_ALLOC(uuid, MAX_OBD_NAME + 5);
- if (!uuid)
+ OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+ if (!prefix)
RETURN(-ENOMEM);
- snprintf(uuid, MAX_OBD_NAME + 5, "ctl-%s",
- mdc->obd_name);
+ snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
+ mdc->obd_name);
rc = seq_client_init(ls->ls_client_seq,
- uuid, ls->ls_client_exp,
- LUSTRE_SEQ_METADATA);
- OBD_FREE(uuid, MAX_OBD_NAME + 5);
+ ls->ls_client_exp,
+ LUSTRE_SEQ_METADATA,
+ prefix, NULL, NULL);
+ OBD_FREE(prefix, MAX_OBD_NAME + 5);
} else
rc = -ENOMEM;
static void mdt_seq_fini_cli(struct mdt_device *m)
{
struct lu_site *ls;
+ int rc;
ENTRY;
seq_server_set_cli(ls->ls_server_seq,
NULL, NULL);
- if (ls && ls->ls_client_seq) {
- seq_client_fini(ls->ls_client_seq);
- OBD_FREE_PTR(ls->ls_client_seq);
- ls->ls_client_seq = NULL;
- }
-
if (ls && ls->ls_client_exp) {
- int rc = obd_disconnect(ls->ls_client_exp);
- ls->ls_client_exp = NULL;
-
+ rc = obd_disconnect(ls->ls_client_exp);
if (rc) {
CERROR("failure to disconnect "
"obd: %d\n", rc);
}
+ ls->ls_client_exp = NULL;
}
EXIT;
}
/*
* FLD wrappers
*/
-static int mdt_fld_init(const struct lu_context *ctx,
- const char *uuid,
+static int mdt_fld_fini(const struct lu_context *ctx,
struct mdt_device *m)
{
- struct lu_site *ls;
- int rc;
+ struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
ENTRY;
- ls = m->mdt_md_dev.md_lu_dev.ld_site;
-
- OBD_ALLOC_PTR(ls->ls_server_fld);
+ if (ls && ls->ls_server_fld) {
+ fld_server_fini(ls->ls_server_fld, ctx);
+ OBD_FREE_PTR(ls->ls_server_fld);
+ ls->ls_server_fld = NULL;
+ }
- if (ls->ls_server_fld != NULL) {
- rc = fld_server_init(ls->ls_server_fld, ctx,
- m->mdt_bottom, uuid);
- if (rc) {
- OBD_FREE_PTR(ls->ls_server_fld);
- ls->ls_server_fld = NULL;
- }
- } else
- rc = -ENOMEM;
+ if (ls && ls->ls_client_fld != NULL) {
+ fld_client_fini(ls->ls_client_fld);
+ OBD_FREE_PTR(ls->ls_client_fld);
+ ls->ls_client_fld = NULL;
+ }
- RETURN(rc);
+ RETURN(0);
}
-static int mdt_fld_fini(const struct lu_context *ctx,
+static int mdt_fld_init(const struct lu_context *ctx,
+ const char *uuid,
struct mdt_device *m)
{
- struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+ struct lu_fld_target target;
+ struct lu_site *ls;
+ int rc;
ENTRY;
- if (ls && ls->ls_server_fld) {
- fld_server_fini(ls->ls_server_fld, ctx);
+ ls = m->mdt_md_dev.md_lu_dev.ld_site;
+
+ OBD_ALLOC_PTR(ls->ls_server_fld);
+ if (ls->ls_server_fld == NULL)
+ RETURN(rc = -ENOMEM);
+
+ rc = fld_server_init(ls->ls_server_fld, ctx,
+ m->mdt_bottom, uuid);
+ if (rc) {
OBD_FREE_PTR(ls->ls_server_fld);
ls->ls_server_fld = NULL;
}
- RETURN(0);
+
+ OBD_ALLOC_PTR(ls->ls_client_fld);
+ if (!ls->ls_client_fld)
+ GOTO(out_fld_fini, rc = -ENOMEM);
+
+ rc = fld_client_init(ls->ls_client_fld, uuid,
+ LUSTRE_CLI_FLD_HASH_DHT,
+ ctx);
+ if (rc) {
+ CERROR("can't init FLD, err %d\n", rc);
+ OBD_FREE_PTR(ls->ls_client_fld);
+ GOTO(out_fld_fini, rc);
+ }
+
+ target.ft_srv = ls->ls_server_fld;
+ target.ft_idx = ls->ls_node_id;
+ target.ft_exp = NULL;
+
+ fld_client_add_target(ls->ls_client_fld, &target);
+ EXIT;
+out_fld_fini:
+ if (rc)
+ mdt_fld_fini(ctx, m);
+ return rc;
}
/* device init/fini methods */
// for_all_existing_mdt except current one
for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
char *mdtname;
- if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
+ if (i != mti->mti_stripe_index &&
+ test_bit(i, fsdb->fsdb_mdt_index_map)) {
sprintf(mdt_index,"-MDT%04x",i);
name_create(&mdtname, mti->mti_fsname, mdt_index);
goto out_rmdir;
}
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq");
+ snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq_ctl");
+ ret = touch_file(filepnm);
+ if (ret) {
+ goto out_umount;
+ }
+
+ snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq_srv");
ret = touch_file(filepnm);
if (ret) {
goto out_umount;