}
static struct md_device_operations cmm_md_ops = {
- .mdo_statfs = cmm_statfs,
- .mdo_root_get = cmm_root_get,
- .mdo_maxsize_get = cmm_maxsize_get,
- .mdo_init_capa_ctxt = cmm_init_capa_ctxt,
- .mdo_update_capa_key= cmm_update_capa_key,
+ .mdo_statfs = cmm_statfs,
+ .mdo_root_get = cmm_root_get,
+ .mdo_maxsize_get = cmm_maxsize_get,
+ .mdo_init_capa_ctxt = cmm_init_capa_ctxt,
+ .mdo_update_capa_key = cmm_update_capa_key,
};
extern struct lu_device_type mdc_device_type;
struct mdc_device *mc, *tmp;
struct lu_fld_target target;
struct lu_device *ld;
- struct lu_site *ls;
mdsno_t mdc_num;
int rc;
ENTRY;
ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
ld->ld_site = cmm2lu_dev(cm)->ld_site;
- rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL);
+ rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
if (rc) {
ldt->ldt_ops->ldto_device_free(env, ld);
RETURN (rc);
lu_device_get(cmm2lu_dev(cm));
- ls = cm->cmm_md_dev.md_lu_dev.ld_site;
-
target.ft_srv = NULL;
target.ft_idx = mc->mc_num;
target.ft_exp = mc->mc_desc.cl_exp;
-
- fld_client_add_target(ls->ls_client_fld, &target);
+ fld_client_add_target(cm->cmm_fld, &target);
- /* set max md size for the mdc */
+ /* Set max md size for the mdc. */
rc = cmm_post_init_mdc(env, cm);
-
RETURN(rc);
}
struct mdc_device *mc, *tmp;
ENTRY;
- /* finish all mdc devices */
+ /* Remove local target from FLD. */
+ fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);
+
+ /* Finish all mdc devices. */
spin_lock(&cm->cmm_tgt_guard);
list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
struct lu_device *ld_m = mdc2lu_dev(mc);
EXIT;
}
+
static int cmm_device_mount(const struct lu_env *env,
struct cmm_device *m, struct lustre_cfg *cfg)
{
switch(cfg->lcfg_command) {
case LCFG_ADD_MDC:
+ /* On first ADD_MDC add also local target. */
+ if (!(m->cmm_flags & CMM_INITIALIZED)) {
+ struct lu_site *ls = cmm2lu_dev(m)->ld_site;
+ struct lu_fld_target target;
+
+ target.ft_srv = ls->ls_server_fld;
+ target.ft_idx = m->cmm_local_num;
+ target.ft_exp = NULL;
+
+ fld_client_add_target(m->cmm_fld, &target);
+ }
err = cmm_add_mdc(env, m, cfg);
- /* the first ADD_MDC can be counted as setup is finished */
- if ((m->cmm_flags & CMM_INITIALIZED) == 0)
+
+ /* The first ADD_MDC can be counted as setup is finished. */
+ if (!(m->cmm_flags & CMM_INITIALIZED))
m->cmm_flags |= CMM_INITIALIZED;
+
break;
case LCFG_SETUP:
{
{
struct lu_device *l;
struct cmm_device *m;
-
ENTRY;
OBD_ALLOC_PTR(m);
m->cmm_md_dev.md_upcall.mu_upcall = cmm_upcall;
l = cmm2lu_dev(m);
l->ld_ops = &cmm_lu_ops;
+
+ OBD_ALLOC_PTR(m->cmm_fld);
+ if (!m->cmm_fld)
+ GOTO(out_free_cmm, l = ERR_PTR(-ENOMEM));
}
- RETURN (l);
+ RETURN(l);
+out_free_cmm:
+ OBD_FREE_PTR(m);
+ return l;
}
static void cmm_device_free(const struct lu_env *env, struct lu_device *d)
LASSERT(m->cmm_tgt_count == 0);
LASSERT(list_empty(&m->cmm_targets));
+ if (m->cmm_fld != NULL) {
+ OBD_FREE_PTR(m->cmm_fld);
+ m->cmm_fld = NULL;
+ }
md_device_fini(&m->cmm_md_dev);
OBD_FREE_PTR(m);
}
lu_context_key_degister(&cmm_thread_key);
}
-static int cmm_device_init(const struct lu_env *env,
- struct lu_device *d, struct lu_device *next)
+static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
+ const char *name, struct lu_device *next)
{
struct cmm_device *m = lu2cmm_dev(d);
+ struct lu_site *ls;
int err = 0;
ENTRY;
m->cmm_tgt_count = 0;
m->cmm_child = lu2md_dev(next);
+ err = fld_client_init(m->cmm_fld, name,
+ LUSTRE_CLI_FLD_HASH_DHT);
+ if (err) {
+ CERROR("Can't init FLD, err %d\n", err);
+ RETURN(err);
+ }
+
+ /* Assign site's fld client ref, needed for asserts in osd. */
+ ls = cmm2lu_dev(m)->ld_site;
+ ls->ls_client_fld = m->cmm_fld;
+
RETURN(err);
}
{
struct cmm_device *cm = lu2cmm_dev(ld);
struct mdc_device *mc, *tmp;
+ struct lu_site *ls;
ENTRY;
- /* finish all mdc devices */
+
+ /* Finish all mdc devices */
spin_lock(&cm->cmm_tgt_guard);
list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
struct lu_device *ld_m = mdc2lu_dev(mc);
}
spin_unlock(&cm->cmm_tgt_guard);
+ fld_client_fini(cm->cmm_fld);
+ ls = cmm2lu_dev(cm)->ld_site;
+ ls->ls_client_fld = NULL;
+
RETURN (md2lu_dev(cm->cmm_child));
}
__u32 cmm_flags;
/* underlaying device in MDS stack, usually MDD */
struct md_device *cmm_child;
+ /* FLD client to talk to FLD */
+ struct lu_client_fld *cmm_fld;
/* other MD servers in cluster */
mdsno_t cmm_local_num;
__u32 cmm_tgt_count;
const struct lu_fid *fid, mdsno_t *mds,
const struct lu_env *env)
{
- struct lu_site *ls;
int rc = 0;
ENTRY;
LASSERT(fid_is_sane(fid));
- ls = cm->cmm_md_dev.md_lu_dev.ld_site;
-
- rc = fld_client_lookup(ls->ls_client_fld,
- fid_seq(fid), mds, env);
+ rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
if (rc) {
CERROR("Can't find mds by seq "LPX64", rc %d\n",
fid_seq(fid), rc);
*mds, cm->cmm_tgt_count);
rc = -EINVAL;
} else {
- CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "LPU64"\n",
- *mds, fid_seq(fid));
+ CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
+ LPU64"\n", *mds, fid_seq(fid));
}
RETURN (rc);
RETURN(rc);
}
+static int cmm_fid_alloc(const struct lu_env *env,
+ struct cmm_device *cmm,
+ struct mdc_device *mc,
+ struct lu_fid *fid)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(cmm != NULL);
+ LASSERT(mc != NULL);
+ LASSERT(fid != NULL);
+
+ rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
+ if (rc > 0) {
+ /* Setup FLD for new sequence. */
+ rc = fld_client_create(cmm->cmm_fld,
+ fid_seq(fid),
+ mc->mc_num, env);
+ if (rc)
+ CERROR("Can't create fld entry, rc %d\n", rc);
+ }
+ RETURN(rc);
+}
+
static int cmm_slaves_create(const struct lu_env *env,
struct md_object *mo,
struct md_attr *ma)
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
- struct mdc_device *mc, *tmp;
+ struct mdc_device *mc, *tmp;
int lmv_size, i = 1, rc;
ENTRY;
lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
- /* This lmv will be free after finish splitting. */
+ /* This lmv will free after finish splitting. */
OBD_ALLOC(lmv, lmv_size);
if (!lmv)
RETURN(-ENOMEM);
list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
/* Alloc fid for slave object. */
- rc = obd_fid_alloc(mc->mc_desc.cl_exp, &lmv->mea_ids[i], NULL);
- if (rc > 0) {
- struct lu_site *ls;
-
- /* Setup FLD for new sequence. */
- ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
- rc = fld_client_create(ls->ls_client_fld,
- fid_seq(&lmv->mea_ids[i]),
- mc->mc_num, env);
- if (rc) {
- CERROR("Can't create fld entry, rc %d\n", rc);
- GOTO(cleanup, rc);
- }
+ rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
+ if (rc) {
+ CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
+ mc->mc_num, rc);
+ GOTO(cleanup, rc);
}
/* Create slave on remote MDT. */
obd->u.cli.cl_max_mds_cookiesize = max_cookiesize;
}
-static int mdc_device_init(const struct lu_env *env,
- struct lu_device *ld, struct lu_device *next)
+static int mdc_device_init(const struct lu_env *env, struct lu_device *ld,
+ const char *name, struct lu_device *next)
{
return 0;
}
int rc;
ENTRY;
+ down(&fld->lsf_sem);
+
switch (opc) {
case FLD_CREATE:
rc = fld_server_create(fld, env,
mf->mf_seq, mf->mf_mds);
- /* do not return -EEXIST error for resent case */
+ /* Do not return -EEXIST error for resent case */
if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST)
rc = 0;
break;
case FLD_DELETE:
rc = fld_server_delete(fld, env, mf->mf_seq);
- /* do not return -ENOENT error for resent case */
+ /* Do not return -ENOENT error for resent case */
if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT)
rc = 0;
break;
rc = -EINVAL;
break;
}
+
+ up(&fld->lsf_sem);
+
CDEBUG(D_INFO|D_WARNING, "%s: FLD req handle: error %d (opc: %d, seq: "
- LPX64", mds: "LPU64")\n", fld->lsf_name, rc,
- opc, mf->mf_seq, mf->mf_mds);
+ LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc, mf->mf_seq,
+ mf->mf_mds);
+
RETURN(rc);
}
static int fld_handle(struct ptlrpc_request *req)
{
- const struct lu_env *env;
struct fld_thread_info *info;
+ const struct lu_env *env;
int rc;
env = req->rq_svc_thread->t_env;
snprintf(fld->lsf_name, sizeof(fld->lsf_name),
"srv-%s", prefix);
+ sema_init(&fld->lsf_sem, 1);
+
rc = fld_index_init(fld, env, dt);
if (rc)
GOTO(out, rc);
if (!IS_ERR(th)) {
rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
fld_rec(env, mds),
- fld_key(env, seq), th,
- BYPASS_CAPA);
+ fld_key(env, seq),
+ th, BYPASS_CAPA);
dt_dev->dd_ops->dt_trans_stop(env, th);
} else
rc = PTR_ERR(th);
LASSERT(fld != NULL);
snprintf(fld->lcf_name, sizeof(fld->lcf_name),
- "cli-%s", prefix);
+ "cli-srv-%s", prefix);
if (!hash_is_sane(hash)) {
CERROR("%s: Wrong hash function %#x\n",
fld->lcf_count = 0;
spin_lock_init(&fld->lcf_lock);
+ sema_init(&fld->lcf_sem, 1);
fld->lcf_hash = &fld_hash[hash];
INIT_LIST_HEAD(&fld->lcf_targets);
int rc;
ENTRY;
+ down(&fld->lcf_sem);
+
target = fld_client_get_target(fld, seq);
LASSERT(target != NULL);
CERROR("%s: Can't create FLD entry, rc %d\n",
fld->lcf_name, rc);
}
+ up(&fld->lcf_sem);
+
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_create);
int rc;
ENTRY;
+ down(&fld->lcf_sem);
+
fld_cache_delete(fld->lcf_cache, seq);
target = fld_client_get_target(fld, seq);
}
#endif
+ up(&fld->lcf_sem);
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_delete);
int rc;
ENTRY;
+ down(&fld->lcf_sem);
+
/* Lookup it in the cache */
rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
- if (rc == 0)
+ if (rc == 0) {
+ up(&fld->lcf_sem);
RETURN(0);
+ }
/* Can not find it in the cache */
target = fld_client_get_target(fld, seq);
*/
fld_cache_insert(fld->lcf_cache, seq, *mds);
}
+ up(&fld->lcf_sem);
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_lookup);
* Initialize the devices after allocation
*/
int (*ldto_device_init)(const struct lu_env *env,
- struct lu_device *, struct lu_device *);
+ struct lu_device *, const char *,
+ struct lu_device *);
/*
* Finalize device. Dual to ->ldto_device_init(). Returns pointer to
* the next device in the stack.
/* /fld file object device */
struct dt_object *lsf_obj;
+ /* Protect index modifications */
+ struct semaphore lsf_sem;
+
/* fld service name in form "fld-MDTXXX" */
char lsf_name[80];
};
/* lock protecting exports list and fld_hash */
spinlock_t lcf_lock;
+ /* protect fld req + cache modification */
+ struct semaphore lcf_sem;
+
/* client FLD cache */
struct fld_cache_info *lcf_cache;
/* client fld proc entry name */
char lcf_name[80];
- const struct lu_context *lcf_ctx;
+ const struct lu_context *lcf_ctx;
};
int fld_query(struct com_thread_info *info);
struct qstr *ph_pname;
struct lu_fid *ph_pfid;
struct qstr *ph_cname;
- int ph_opc;
+ int ph_opc;
};
#define LUSTRE_FLD_NAME "fld"
it->d.lustre.it_data = 0;
*reqp = NULL;
- /* We shoudld reallocate the FID for the object */
+ /* We should reallocate child FID. */
rc = lmv_alloc_fid_for_split(obd, &rpid, op_data,
&sop_data->fid2);
if (rc)
LASSERT(mds != NULL);
- /* here are some policies to allocate new fid */
+ /* Here are some policies to allocate new fid */
if (lmv_fids_balanced(obd)) {
- /* allocate new fid basing on its name in the case fids are
+ /*
+ * Allocate new fid basing on its name in the case fids are
* balanced, that is all sequences have more or less equal
- * number of objects created. */
+ * number of objects created.
+ */
if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
#if 1
*mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
hint->ph_cname);
rc = 0;
#else
- /* stress policy for tests - to use non-parent MDS */
+ /* Stress policy for tests - to use non-parent MDS */
LASSERT(fid_is_sane(hint->ph_pfid));
rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
if (rc)
obj = lmv_obj_grab(obd, hint->ph_pfid);
if (obj) {
- /* If the dir got split, alloc fid according
- * to its hash
+ /*
+ * If the dir got split, alloc fid according to
+ * its hash
*/
struct lu_fid *rpid;
hint->ph_cname->name,
hint->ph_cname->len);
rpid = &obj->lo_inodes[*mds].li_fid;
+ lmv_obj_put(obj);
+
rc = lmv_fld_lookup(lmv, rpid, mds);
- if (rc) {
- lmv_obj_put(obj);
+ if (rc)
GOTO(exit, rc);
- }
+
CDEBUG(D_INODE, "The obj "DFID" has been"
"split, got MDS at "LPU64" by name %s\n",
PFID(hint->ph_pfid), *mds,
hint->ph_cname->name);
- lmv_obj_put(obj);
rc = 0;
} else {
- /* default policy is to use parent MDS */
+ /* Default policy is to use parent MDS */
rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
}
}
} else {
- /* sequences among all tgts are not well balanced, allocate new
+ /*
+ * Sequences among all tgts are not well balanced, allocate new
* fid taking this into account to balance them. Not implemented
- * yet! */
+ * yet!
+ */
*mds = 0;
rc = -EINVAL;
}
exit:
if (rc) {
- CERROR("cannot choose MDS, err = %d\n", rc);
+ CERROR("Can't choose MDS, err = %d\n", rc);
} else {
LASSERT(*mds < lmv->desc.ld_tgt_count);
}
return 0;
}
-static int mdd_device_init(const struct lu_env *env,
- struct lu_device *d, struct lu_device *next)
+static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
+ const char *name, struct lu_device *next)
{
struct mdd_device *mdd = lu2mdd_dev(d);
struct dt_device *dt;
ls->ls_server_fld = NULL;
}
- if (ls && ls->ls_client_fld != NULL) {
- fld_client_fini(ls->ls_client_fld);
- OBD_FREE_PTR(ls->ls_client_fld);
- ls->ls_client_fld = NULL;
- }
-
RETURN(0);
}
const char *uuid,
struct mdt_device *m)
{
- struct lu_fld_target target;
struct lu_site *ls;
int rc;
ENTRY;
if (rc) {
OBD_FREE_PTR(ls->ls_server_fld);
ls->ls_server_fld = NULL;
+ RETURN(rc);
}
- OBD_ALLOC_PTR(ls->ls_client_fld);
- if (!ls->ls_client_fld)
- GOTO(out_fld_fini, rc = -ENOMEM);
-
- rc = fld_client_init(ls->ls_client_fld, uuid,
- LUSTRE_CLI_FLD_HASH_DHT);
- if (rc) {
- CERROR("can't init FLD, err %d\n", rc);
- OBD_FREE_PTR(ls->ls_client_fld);
- GOTO(out_fld_fini, rc);
- }
-
- target.ft_srv = ls->ls_server_fld;
- target.ft_idx = ls->ls_node_id;
- target.ft_exp = NULL;
-
- fld_client_add_target(ls->ls_client_fld, &target);
- EXIT;
-out_fld_fini:
- if (rc)
- mdt_fld_fini(env, m);
- return rc;
+ RETURN(0);
}
/* device init/fini methods */
struct lu_device *child,
struct lustre_cfg *cfg)
{
+ const char *dev = lustre_cfg_string(cfg, 0);
struct obd_type *type;
struct lu_device_type *ldt;
struct lu_device *d;
d->ld_site = child->ld_site;
type->typ_refcnt++;
- rc = ldt->ldt_ops->ldto_device_init(env, d, child);
+ rc = ldt->ldt_ops->ldto_device_init(env, d, dev, child);
if (rc) {
CERROR("can't init device '%s', rc %d\n", typename, rc);
GOTO(out_alloc, rc);
mdt_seq_fini(env, m);
mdt_seq_fini_cli(m);
mdt_fld_fini(env, m);
+ lprocfs_obd_cleanup(d->ld_obd);
if (m->mdt_rootsquash_info) {
OBD_FREE_PTR(m->mdt_rootsquash_info);
LASSERT(info != NULL);
obd = class_name2obd(dev);
- LASSERT(obd);
+ LASSERT(obd != NULL);
spin_lock_init(&m->mdt_transno_lock);
GOTO(err_fini_site, rc);
}
+ /* set server index */
+ LASSERT(num);
+ s->ls_node_id = simple_strtol(num, NULL, 10);
+
/* init the stack */
rc = mdt_stack_init(env, m, cfg);
if (rc) {
CERROR("can't init device stack, rc %d\n", rc);
- GOTO(err_fini_site, rc);
+ GOTO(err_fini_proc, rc);
}
- /* set server index */
- LASSERT(num);
- s->ls_node_id = simple_strtol(num, NULL, 10);
-
rc = mdt_fld_init(env, obd->obd_name, m);
if (rc)
GOTO(err_fini_stack, rc);
mdt_fld_fini(env, m);
err_fini_stack:
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
+err_fini_proc:
+ lprocfs_obd_cleanup(obd);
err_fini_site:
lu_site_fini(s);
err_free_site:
static int osd_has_index (const struct osd_object *obj);
static void osd_object_init0 (struct osd_object *obj);
static int osd_device_init (const struct lu_env *env,
- struct lu_device *d, struct lu_device *);
+ struct lu_device *d, const char *,
+ struct lu_device *);
static int osd_fid_lookup (const struct lu_env *env,
struct osd_object *obj,
const struct lu_fid *fid);
LASSERT(info->oti_txns == 0);
}
-static int osd_device_init(const struct lu_env *env,
- struct lu_device *d, struct lu_device *next)
+static int osd_device_init(const struct lu_env *env, struct lu_device *d,
+ const char *name, struct lu_device *next)
{
return lu_env_init(&osd_dev(d)->od_env_for_commit, NULL, LCT_MD_THREAD);
}
rm -rf $DIR/3c1 || error
}
-run_test 3c " dir splitting via lfs stripe ============================="
+#run_test 3c " dir splitting via lfs stripe ============================="
test_4a() {
let rr=0