SPLIT_SIZE = 64*1024
};
-static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
-{
- return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
-}
-
static int cmm_expect_splitting(const struct lu_env *env,
struct md_object *mo,
struct md_attr *ma)
rc = CMM_EXPECT_SPLIT;
- if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
+ if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo))))
GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
EXIT;
#define cmm_md_size(stripes) \
(sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
-static int cmm_fid_alloc(const struct lu_env *env, struct cmm_device *cmm,
- struct lu_fid *fid, int count)
+static int cmm_alloc_slave_fids(const struct lu_env *env,
+ struct cmm_device *cmm,
+ struct lu_fid *fids)
{
- struct mdc_device *mc, *tmp;
- int rc = 0, i = 0;
-
- LASSERT(count == cmm->cmm_tgt_count);
+ struct lu_site *ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
+ struct mdc_device *mc, *tmp;
+ int rc = 0;
/*
- * XXX: in fact here would be nice to protect cmm->cmm_targets but we
+ * XXX: In fact here would be nice to protect cmm->cmm_targets but we
* can't use spinlock here and do something complex is no time for that,
* especially taking into account that split will be removed after
* acceptance. So we suppose no changes to targets should happen this
* time.
*/
list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
- LASSERT(cmm->cmm_local_num != mc->mc_num);
-
- rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
+ /* Allocate slave fid on mds @mc->mc_num. */
+ rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fids[mc->mc_num], NULL);
if (rc > 0) {
- struct lu_site *ls;
-
- ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
rc = fld_client_create(ls->ls_client_fld,
- fid_seq(&fid[i]),
+ fid_seq(&fids[mc->mc_num]),
mc->mc_num, env);
if (rc) {
CERROR("Can't create fld entry, "
"rc %d\n", rc);
}
- }
-
- if (rc < 0)
+ } else if (rc < 0) {
RETURN(rc);
- i++;
+ }
}
- LASSERT(i == count);
- if (rc == 1)
- rc = 0;
RETURN(rc);
}
lu_object_put(env, &o->cmo_obj.mo_lu);
}
-static int cmm_creat_remote_obj(const struct lu_env *env,
- struct cmm_device *cmm,
- struct lu_fid *fid, struct md_attr *ma,
- const struct lmv_stripe_md *lmv,
- int lmv_size)
+static int cmm_create_remote_obj(const struct lu_env *env,
+ struct cmm_device *cmm,
+ struct lu_fid *fid, struct md_attr *ma,
+ const struct lmv_stripe_md *lmv,
+ int lmv_size)
{
struct cmm_object *obj;
struct md_create_spec *spec;
int rc;
ENTRY;
- /* XXX Since capablity will not work with split. so we
- * pass NULL capablity here */
+ /*
+ * XXX: Since capablity will not work with split, we pass NULL capablity
+ * here. Should be fixed later.
+ */
obj = cmm_object_find(env, cmm, fid);
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
}
static int cmm_create_slave_objects(const struct lu_env *env,
- struct md_object *mo, struct md_attr *ma)
+ struct md_object *mo,
+ struct md_attr *ma)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
+ struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
int lmv_size, i, rc;
- struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
ENTRY;
lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
lmv->mea_count = cmm->cmm_tgt_count + 1;
- lmv->mea_ids[0] = *lf;
+ /* Store master FID to local node idx number. */
+ lmv->mea_ids[cmm->cmm_local_num] = *lf;
- rc = cmm_fid_alloc(env, cmm, &lmv->mea_ids[1],
- cmm->cmm_tgt_count);
+ /* Allocate slave fids and setup FLD for them. */
+ rc = cmm_alloc_slave_fids(env, cmm, lmv->mea_ids);
if (rc)
GOTO(cleanup, rc);
if (!slave_lmv)
GOTO(cleanup, rc = -ENOMEM);
- slave_lmv->mea_master = cmm->cmm_local_num;
slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
+ slave_lmv->mea_master = cmm->cmm_local_num;
slave_lmv->mea_count = 0;
- for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
- rc = cmm_creat_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
- slave_lmv, sizeof(slave_lmv));
+
+ for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
+ if (i == cmm->cmm_local_num)
+ continue;
+
+ rc = cmm_create_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
+ slave_lmv, sizeof(*slave_lmv));
if (rc)
GOTO(cleanup, rc);
}
ma->ma_lmv_size = lmv_size;
ma->ma_lmv = lmv;
+ EXIT;
cleanup:
if (slave_lmv)
OBD_FREE_PTR(slave_lmv);
- RETURN(rc);
+ if (rc && lmv)
+ OBD_FREE_PTR(lmv);
+ return rc;
}
static int cmm_send_split_pages(const struct lu_env *env,
- struct md_object *mo, struct lu_rdpg *rdpg,
+ struct md_object *mo,
+ struct lu_rdpg *rdpg,
struct lu_fid *fid, int len)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
RETURN(rc);
}
-static int cmm_remove_dir_ent(const struct lu_env *env, struct md_object *mo,
+static int cmm_remove_dir_ent(const struct lu_env *env,
+ struct md_object *mo,
struct lu_dirent *ent)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
}
#define SPLIT_PAGE_COUNT 1
+
static int cmm_scan_and_split(const struct lu_env *env,
- struct md_object *mo, struct md_attr *ma)
+ struct md_object *mo,
+ struct md_attr *ma)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+ struct lu_rdpg *rdpg = NULL;
__u32 hash_segement;
- struct lu_rdpg *rdpg = NULL;
int rc = 0, i;
OBD_ALLOC_PTR(rdpg);
rdpg->rp_npages = SPLIT_PAGE_COUNT;
rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
- OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+ OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
if (rdpg->rp_pages == NULL)
GOTO(free_rdpg, rc = -ENOMEM);
}
hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
- for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
- struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
+ for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
+ struct lu_fid *lf;
__u32 hash_end;
+ if (i == cmm->cmm_local_num)
+ continue;
+
+ lf = &ma->ma_lmv->mea_ids[i];
+
rdpg->rp_hash = i * hash_segement;
hash_end = rdpg->rp_hash + hash_segement;
rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
if (rc)
GOTO(cleanup, rc);
}
+ EXIT;
cleanup:
for (i = 0; i < rdpg->rp_npages; i++)
if (rdpg->rp_pages[i] != NULL)
if (rdpg)
OBD_FREE_PTR(rdpg);
- RETURN(rc);
+ return rc;
}
static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
if (rc != CMM_EXPECT_SPLIT)
GOTO(cleanup, rc = 0);
- /* Disable trans for splitting, since there will be
- * so many trans in this one ops, confilct with current
- * recovery design */
+ /*
+ * Disable trans for splitting, since there will be so many trans in
+ * this one ops, confilct with current recovery design.
+ */
rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
if (rc)
GOTO(cleanup, rc = 0);
GOTO(cleanup, ma);
buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
+
/* step4: set mea to the master object */
rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
if (rc == -ERESTART)
CWARN("Dir "DFID" has been split\n",
PFID(lu_object_fid(&mo->mo_lu)));
+ EXIT;
cleanup:
if (ma->ma_lmv_size && ma->ma_lmv)
OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
- RETURN(rc);
+ return rc;
}