#define cmm_md_size(stripes) \
(sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
-#if 0
-/* Under discussion, disabled for now */
-static int cmm_fids_slave_alloc(const struct lu_env *env,
- struct cmm_device *cmm,
- struct lu_fid *fids)
-{
- struct lu_site *ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
- struct mdc_device *mc, *tmp;
- int rc = 0;
-
- /*
- * XXX: In fact here would be nice to protect cmm->cmm_targets but we
- * can't use spinlock here and do something complex is no time for that,
- * especially taking into account that split will be removed after
- * acceptance. So we suppose no changes to targets should happen this
- * time.
- */
- list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
- /* Allocate slave fid on mds @mc->mc_num. */
- rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fids[mc->mc_num], NULL);
- if (rc > 0) {
- rc = fld_client_create(ls->ls_client_fld,
- fid_seq(&fids[mc->mc_num]),
- mc->mc_num, env);
- if (rc) {
- CERROR("Can't create fld entry, "
- "rc %d\n", rc);
- }
- } else if (rc < 0) {
- RETURN(rc);
- }
- }
- RETURN(rc);
-}
-#else
-static int cmm_fids_slave_alloc(const struct lu_env *env,
- struct cmm_device *cmm,
- struct lu_fid *fids)
-{
- struct mdc_device *mc, *tmp;
- int rc = 0;
- int i = 1; /* skip master slot */
-
- /*
- * XXX: In fact here would be nice to protect cmm->cmm_targets but we
- * can't use spinlock here and do something complex is no time for that,
- * especially taking into account that split will be removed after
- * acceptance. So we suppose no changes to targets should happen this
- * time.
- */
- list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
- /* Allocate slave fid on mds @mc->mc_num. */
- rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fids[i], NULL);
- if (rc > 0) {
- struct lu_site *ls;
- ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
- rc = fld_client_create(ls->ls_client_fld,
- fid_seq(&fids[i]),
- mc->mc_num, env);
- if (rc)
- CERROR("Can't create fld entry, rc %d\n", rc);
-
- }
-
- if (rc < 0)
- break;
- i++;
- }
- RETURN(rc);
-}
-#endif
struct cmm_object *cmm_object_find(const struct lu_env *env,
struct cmm_device *d,
const struct lu_fid *f)
lu_object_put(env, &o->cmo_obj.mo_lu);
}
-static int cmm_create_remote_obj(const struct lu_env *env,
- struct cmm_device *cmm,
- struct lu_fid *fid, struct md_attr *ma,
- const struct lmv_stripe_md *lmv,
- int lmv_size)
+static int cmm_object_create(const struct lu_env *env,
+ struct cmm_device *cmm,
+ struct lu_fid *fid,
+ struct md_attr *ma,
+ struct lmv_stripe_md *lmv,
+ int lmv_size)
{
- struct cmm_object *obj;
struct md_create_spec *spec;
+ struct cmm_object *obj;
int rc;
ENTRY;
- /*
- * XXX: Since capablity will not work with split, we pass NULL capablity
- * here. Should be fixed later.
- */
obj = cmm_object_find(env, cmm, fid);
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
cmm_object_put(env, obj);
RETURN(rc);
}
-#if 0
-static int cmm_objs_slave_create(const struct lu_env *env,
- struct md_object *mo,
- struct md_attr *ma)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
- struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
- int lmv_size, i, rc;
- ENTRY;
-
- lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
-
- /* This lmv will be free after finish splitting. */
- OBD_ALLOC(lmv, lmv_size);
- if (!lmv)
- RETURN(-ENOMEM);
-
- lmv->mea_master = cmm->cmm_local_num;
- lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
- lmv->mea_count = cmm->cmm_tgt_count + 1;
-
- /* Store master FID to local node idx number. */
- lmv->mea_ids[cmm->cmm_local_num] = *lf;
-
- /* Allocate slave fids and setup FLD for them. */
- rc = cmm_fids_slave_alloc(env, cmm, lmv->mea_ids);
- if (rc)
- GOTO(cleanup, rc);
-
- OBD_ALLOC_PTR(slave_lmv);
- if (!slave_lmv)
- GOTO(cleanup, rc = -ENOMEM);
- slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
- slave_lmv->mea_master = cmm->cmm_local_num;
- slave_lmv->mea_count = 0;
-
- for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
- if (i == cmm->cmm_local_num)
- continue;
-
- rc = cmm_create_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
- slave_lmv, sizeof(*slave_lmv));
- if (rc)
- GOTO(cleanup, rc);
- }
-
- ma->ma_lmv_size = lmv_size;
- ma->ma_lmv = lmv;
- EXIT;
-cleanup:
- if (slave_lmv)
- OBD_FREE_PTR(slave_lmv);
- if (rc && lmv)
- OBD_FREE_PTR(lmv);
- return rc;
-}
-#else
-static int cmm_objs_slave_create(const struct lu_env *env,
- struct md_object *mo,
- struct md_attr *ma)
+static int cmm_slaves_create(const struct lu_env *env,
+ struct md_object *mo,
+ struct md_attr *ma)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
- int lmv_size, i, rc;
+ struct mdc_device *mc, *tmp;
+ int lmv_size, i = 1, rc;
ENTRY;
lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
/* Store master FID to local node idx number. */
lmv->mea_ids[0] = *lf;
- /* Allocate slave fids and setup FLD for them. */
- rc = cmm_fids_slave_alloc(env, cmm, lmv->mea_ids);
- if (rc)
- GOTO(cleanup, rc);
-
OBD_ALLOC_PTR(slave_lmv);
if (!slave_lmv)
GOTO(cleanup, rc = -ENOMEM);
slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
slave_lmv->mea_count = 0;
- for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
- rc = cmm_create_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
- slave_lmv, sizeof(*slave_lmv));
+ list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
+ /* Alloc fid for slave object. */
+ rc = obd_fid_alloc(mc->mc_desc.cl_exp, &lmv->mea_ids[i], NULL);
+ if (rc > 0) {
+ struct lu_site *ls;
+
+ /* Setup FLD for new sequence. */
+ ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
+ rc = fld_client_create(ls->ls_client_fld,
+ fid_seq(&lmv->mea_ids[i]),
+ mc->mc_num, env);
+ if (rc) {
+ CERROR("Can't create fld entry, rc %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+ }
+
+ /* Create slave on remote MDT. */
+ rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
+ slave_lmv, sizeof(*slave_lmv));
if (rc)
GOTO(cleanup, rc);
+ i++;
}
ma->ma_lmv_size = lmv_size;
}
return rc;
}
-#endif
+
static int cmm_send_split_pages(const struct lu_env *env,
struct md_object *mo,
struct lu_rdpg *rdpg,
GOTO(cleanup, rc = 0);
/* step2: create slave objects */
- rc = cmm_objs_slave_create(env, mo, ma);
+ rc = cmm_slaves_create(env, mo, ma);
if (rc)
GOTO(cleanup, ma);