CMM_SPLIT_SIZE = 64 * 1024
};
-#define CMM_SPLIT_PAGE_COUNT 1
-
/*
* This function checks if passed @name come to correct server (local MDT). If
* not - return -ERESTART and let client know that dir was split and client
struct cml_object *clo = md2cml_obj(mp);
int rc;
ENTRY;
-
+
/* not split yet */
if (clo->clo_split == CMM_SPLIT_NONE ||
clo->clo_split == CMM_SPLIT_DENIED)
clo->clo_split = CMM_SPLIT_NONE;
RETURN(0);
}
-
+
LASSERT(ma->ma_lmv_size > 0);
OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
if (ma->ma_lmv == NULL)
if (ma->ma_lmv->mea_count != 0) {
int idx;
- /*
+ /*
* Get stripe by name to check the name belongs to master dir,
* otherwise return the -ERESTART
*/
idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
-
- /*
+
+ /*
* Check if name came to correct MDT server. We suppose that if
* client does not know about split, it sends create operation
* to master MDT. And this is master job to say it that dir got
*/
if (idx != 0)
rc = -ERESTART;
-
+
/* update split state to DONE if unknown */
if (clo->clo_split == CMM_SPLIT_UNKNOWN)
clo->clo_split = CMM_SPLIT_DONE;
ENTRY;
memset(ma, 0, sizeof(*ma));
-
+
/*
* Check only if we need protection from split. If not - mdt handles
* other cases.
if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
RETURN(MDL_EX);
- /*
+ /*
* Have no idea about lock mode, let it be what higher layer wants.
*/
RETURN(MDL_MINMODE);
* Assumption: ma_valid = 0 here, we only need get inode and lmv_size
* for this get_attr.
*/
- LASSERT(ma->ma_valid == 0);
+ LASSERT(ma->ma_valid == 0);
ma->ma_need = MA_INODE | MA_LMV;
rc = mo_attr_get(env, mo, ma);
if (rc)
*split = clo->clo_split = CMM_SPLIT_NONE;
RETURN(0);
}
-
+
*split = clo->clo_split = CMM_SPLIT_NEEDED;
RETURN(0);
}
* Allocate new on passed @mc for slave object which is going to create there
* soon.
*/
-static int cmm_split_fid_alloc(const struct lu_env *env,
+static int cmm_split_fid_alloc(const struct lu_env *env,
struct cmm_device *cmm,
- struct mdc_device *mc,
+ struct mdc_device *mc,
struct lu_fid *fid)
{
int rc;
LASSERT(cmm != NULL && mc != NULL && fid != NULL);
down(&mc->mc_fid_sem);
-
+
/* Alloc new fid on @mc. */
rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
if (rc > 0) {
CERROR("Can't create fld entry, rc %d\n", rc);
}
up(&mc->mc_fid_sem);
-
+
RETURN(rc);
}
/* Allocate new slave object on passed @mc */
-static int cmm_split_slave_create(const struct lu_env *env,
+static int cmm_split_slave_create(const struct lu_env *env,
struct cmm_device *cmm,
struct mdc_device *mc,
- struct lu_fid *fid,
+ struct lu_fid *fid,
struct md_attr *ma,
struct lmv_stripe_md *lmv,
int lmv_size)
{
- struct md_create_spec *spec;
+ struct md_create_spec *spec = &cmm_env_info(env)->cmi_spec;
struct cmm_object *obj;
int rc;
ENTRY;
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
- OBD_ALLOC_PTR(spec);
- if (spec == NULL)
- RETURN(-ENOMEM);
-
+ memset(spec, 0, sizeof *spec);
spec->u.sp_ea.fid = fid;
spec->u.sp_ea.eadata = lmv;
spec->u.sp_ea.eadatalen = lmv_size;
spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
spec, ma);
- OBD_FREE_PTR(spec);
-
cmm_object_put(env, obj);
RETURN(rc);
}
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
- struct lmv_stripe_md *slave_lmv = NULL;
+ struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
struct mdc_device *mc, *tmp;
struct lmv_stripe_md *lmv;
int i = 1, rc = 0;
lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
lmv->mea_count = cmm->cmm_tgt_count + 1;
- /*
+ /*
* Store master FID to local node idx number. Local node is always
* master and its stripe number if 0.
*/
lmv->mea_ids[0] = *lf;
- OBD_ALLOC_PTR(slave_lmv);
- if (slave_lmv == NULL)
- RETURN(-ENOMEM);
-
+ memset(slave_lmv, 0, sizeof *slave_lmv);
slave_lmv->mea_master = cmm->cmm_local_num;
slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
slave_lmv->mea_count = 0;
ma->ma_valid |= MA_LMV;
EXIT;
cleanup:
- OBD_FREE_PTR(slave_lmv);
return rc;
}
obj = cmm_object_find(env, cmm, ef);
if (IS_ERR(obj)) {
CERROR("Error while find object by "DFID
- ", rc %d\n", ef, (int)PTR_ERR(obj));
+ ", rc %d\n", PFID(ef), (int)PTR_ERR(obj));
return;
}
local = (lu_object_exists(&obj->cmo_obj.mo_lu) > 0);
-
+
printk("%4.4x "DFID" %*.*s/%8.8x [%4.4x] (%s)\n",
le16_to_cpu(ent->lde_reclen), PFID(ef),
le16_to_cpu(ent->lde_namelen),
ent->lde_name, le32_to_cpu(ent->lde_hash),
le16_to_cpu(ent->lde_namelen),
(local ? "lc" : "cr"));
-
+
cmm_object_put(env, obj);
}
if (le16_to_cpu(dp->ldp_flags) & LDF_EMPTY)
return;
-
+
printk("Dump: page: [%8.8x-%8.8x]/[%8.8x], flags: "
"%4.4x\n", le32_to_cpu(dp->ldp_hash_start),
le32_to_cpu(dp->ldp_hash_end), hash_end,
le16_to_cpu(dp->ldp_flags));
-
+
for (ent = lu_dirent_start(dp);
ent != NULL && le32_to_cpu(ent->lde_hash) < hash_end;
ent = lu_dirent_next(ent)) {
if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
else
- /*
+ /*
* XXX: These days only cross-ref dirs are possible, so for the
* sake of simplicity, in split, we suppose that all cross-ref
* names pint to directory and do not do additional getattr to
}
/* Read one page of entries from local MDT. */
-static int cmm_split_read_page(const struct lu_env *env,
+static int cmm_split_read_page(const struct lu_env *env,
struct md_object *mo,
struct lu_rdpg *rdpg)
{
RETURN(rc);
}
-/*
+/*
* This function performs migration of all pages with entries which fit into one
* stripe and one hash segment.
*/
static int cmm_split_process_stripe(const struct lu_env *env,
struct md_object *mo,
- struct lu_rdpg *rdpg,
+ struct lu_rdpg *rdpg,
struct lu_fid *lf,
__u32 end)
{
struct md_attr *ma)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct lu_rdpg *rdpg = NULL;
+ struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
__u32 hash_segement;
int rc = 0, i;
ENTRY;
- OBD_ALLOC_PTR(rdpg);
- if (!rdpg)
- RETURN(-ENOMEM);
-
+ memset(rdpg, 0, sizeof *rdpg);
rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
-
- OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
- if (rdpg->rp_pages == NULL)
- GOTO(free_rdpg, rc = -ENOMEM);
+ rdpg->rp_pages = cmm_env_info(env)->cmi_pages;
for (i = 0; i < rdpg->rp_npages; i++) {
rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
if (rc) {
CERROR("Error (rc = %d) while splitting for %d: fid="
- DFID", %08x:%08x\n", rc, i, PFID(lf),
+ DFID", %08x:%08x\n", rc, i, PFID(lf),
rdpg->rp_hash, hash_end);
GOTO(cleanup, rc);
}
for (i = 0; i < rdpg->rp_npages; i++)
if (rdpg->rp_pages[i] != NULL)
__free_pages(rdpg->rp_pages[i], 0);
- if (rdpg->rp_pages)
- OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
- sizeof rdpg->rp_pages[0]);
-free_rdpg:
- if (rdpg)
- OBD_FREE_PTR(rdpg);
-
return rc;
}
}
la_size = ma->ma_attr.la_size;
-
+
/* Split should be done now, let's do it. */
CWARN("Dir "DFID" is going to split (dir size: "LPU64")\n",
PFID(lu_object_fid(&mo->mo_lu)), la_size);
*/
CWARN("Dir "DFID" has been split (dir size: "LPU64")\n",
PFID(lu_object_fid(&mo->mo_lu)), la_size);
-
+
rc = -ERESTART;
EXIT;
cleanup: