#include "cmm_internal.h"
#include "mdc_internal.h"
-static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
- ssize_t len)
-{
- struct lu_buf *buf;
-
- buf = &cmm_env_info(env)->cmi_buf;
- buf->lb_buf = area;
- buf->lb_len = len;
- return buf;
-}
+/*
+ * Directory-size threshold (bytes): directories whose la_size reaches
+ * this value are candidates for splitting across MDTs.
+ */
+enum {
+ CMM_SPLIT_SIZE = 64 * 1024
+};
-int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
- const char *name)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
- struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
- int rc;
- ENTRY;
-
- if (cmm->cmm_tgt_count == 0)
- RETURN(0);
-
- /* Try to get the LMV EA size */
- memset(ma, 0, sizeof(*ma));
- ma->ma_need = MA_LMV;
- rc = mo_attr_get(env, mp, ma);
- if (rc)
- RETURN(rc);
-
- if (ma->ma_valid & MA_LMV) {
- int stripe;
-
- /*
- * Clean MA_LMV in ->ma_valid because mdd will do nothing
- * counting that EA is already taken.
- */
- ma->ma_valid &= ~MA_LMV;
-
- LASSERT(ma->ma_lmv_size > 0);
- OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
- if (ma->ma_lmv == NULL)
- RETURN(-ENOMEM);
-
- /* Get LMV EA */
- ma->ma_need = MA_LMV;
- rc = mo_attr_get(env, mp, ma);
-
- /* Skip checking the slave dirs (mea_count is 0) */
- if (rc == 0 && ma->ma_lmv->mea_count != 0) {
- /*
- * Get stripe by name to check the name belongs to
- * master dir, otherwise return the -ERESTART
- */
- stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
-
- /* Master stripe is always 0 */
- if (stripe != 0)
- rc = -ERESTART;
- }
- OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
- }
- RETURN(rc);
-}
+/*
+ * Verdicts returned via *split by cmm_expect_splitting():
+ *   CMM_NO_SPLIT_EXPECTED - no split needed (single MDS, root, or small dir)
+ *   CMM_EXPECT_SPLIT      - directory should be split now
+ *   CMM_NOT_SPLITTABLE    - directory already carries an LMV EA (already split)
+ */
+enum {
+ CMM_NO_SPLIT_EXPECTED = 0,
+ CMM_EXPECT_SPLIT = 1,
+ CMM_NOT_SPLITTABLE = 2
+};
int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
struct md_attr *ma, int *split)
int rc;
ENTRY;
- /*
- * Check first most light things like tgt count and root fid. For some
- * case this style should yeild better performance.
- */
+ /* No need split for single MDS */
if (cmm->cmm_tgt_count == 0) {
*split = CMM_NO_SPLIT_EXPECTED;
RETURN(0);
}
- rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
- &root_fid);
+ /* No need split for Root object */
+ rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, &root_fid);
if (rc)
RETURN(rc);
RETURN(0);
}
- /*
- * MA_INODE is needed to check inode size.
- * Memory is prepared by caller.
+ /*
+ * Assumption: ma_valid = 0 here, we only need get
+ * inode and lmv_size for this get_attr
*/
+ LASSERT(ma->ma_valid == 0);
ma->ma_need = MA_INODE | MA_LMV;
rc = mo_attr_get(env, mo, ma);
if (rc)
RETURN(rc);
-
+
+ /* No need split for already split object */
if (ma->ma_valid & MA_LMV) {
+ LASSERT(ma->ma_lmv_size > 0);
*split = CMM_NOT_SPLITTABLE;
RETURN(0);
}
-
+
+ /* No need split for object whose size < CMM_SPLIT_SIZE */
if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
*split = CMM_NO_SPLIT_EXPECTED;
RETURN(0);
}
-
+
*split = CMM_EXPECT_SPLIT;
RETURN(0);
}
lu_object_put(env, &o->cmo_obj.mo_lu);
}
-static int cmm_object_create(const struct lu_env *env,
- struct cmm_device *cmm,
- struct lu_fid *fid,
- struct md_attr *ma,
- struct lmv_stripe_md *lmv,
- int lmv_size)
+static int cmm_slave_object_create(const struct lu_env *env,
+ struct cmm_device *cmm,
+ struct lu_fid *fid,
+ struct md_attr *ma,
+ struct lmv_stripe_md *lmv,
+ int lmv_size)
{
struct md_create_spec *spec;
struct cmm_object *obj;
RETURN(rc);
}
-static int cmm_fid_alloc(const struct lu_env *env,
- struct cmm_device *cmm,
- struct mdc_device *mc,
- struct lu_fid *fid)
+static int cmm_slave_fid_alloc(const struct lu_env *env,
+ struct cmm_device *cmm,
+ struct mdc_device *mc,
+ struct lu_fid *fid)
{
int rc;
ENTRY;
- LASSERT(cmm != NULL);
- LASSERT(mc != NULL);
- LASSERT(fid != NULL);
+ LASSERT(cmm != NULL && mc != NULL && fid != NULL);
down(&mc->mc_fid_sem);
-
/* Alloc new fid on @mc. */
rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
if (rc > 0) {
ENTRY;
lmv = ma->ma_lmv;
+ /* Init the split MEA */
lmv->mea_master = cmm->cmm_local_num;
lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
lmv->mea_count = cmm->cmm_tgt_count + 1;
list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
/* Alloc fid for slave object. */
- rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
+ rc = cmm_slave_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
if (rc) {
CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
mc->mc_num, rc);
}
/* Create slave on remote MDT. */
- rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
- slave_lmv, sizeof(*slave_lmv));
+ rc = cmm_slave_object_create(env, cmm, &lmv->mea_ids[i], ma,
+ slave_lmv, sizeof(*slave_lmv));
if (rc)
GOTO(cleanup, rc);
i++;
}
+ ma->ma_valid |= MA_LMV;
EXIT;
cleanup:
OBD_FREE_PTR(slave_lmv);
RETURN(rc);
}
-static int cmm_remove_dir_ent(const struct lu_env *env,
- struct md_object *mo,
- struct lu_dirent *ent)
+static int cmm_remove_split_ent(const struct lu_env *env,
+ struct md_object *mo,
+ struct lu_dirent *ent)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct cmm_object *obj;
}
static int cmm_remove_entries(const struct lu_env *env,
- struct md_object *mo, struct lu_rdpg *rdpg,
+ struct md_object *mo,
+ struct lu_rdpg *rdpg,
__u32 hash_end, __u32 *len)
{
struct lu_dirpage *dp;
kmap(rdpg->rp_pages[0]);
dp = page_address(rdpg->rp_pages[0]);
- for (ent = lu_dirent_start(dp); ent != NULL;
+ *len = 0;
+
+ for (ent = lu_dirent_start(dp);
+ ent != NULL && ent->lde_hash < hash_end;
ent = lu_dirent_next(ent)) {
- if (ent->lde_hash < hash_end) {
- rc = cmm_remove_dir_ent(env, mo, ent);
- if (rc) {
- CERROR("Can not del %s rc %d\n", ent->lde_name,
- rc);
- GOTO(unmap, rc);
- }
- } else {
- if (ent != lu_dirent_start(dp))
- *len = (int)((__u32)ent - (__u32)dp);
- else
- *len = 0;
+ rc = cmm_remove_split_ent(env, mo, ent);
+ if (rc) {
+ /*
+ * XXX An error handler should re-insert the removed name;
+ * for now we assume the removal will always succeed, as it
+ * does in the verification test.
+ */
+ CERROR("Can not del %*.*s rc %d\n", ent->lde_namelen,
+ ent->lde_namelen, ent->lde_name, rc);
GOTO(unmap, rc);
}
+ *len += le16_to_cpu(ent->lde_reclen);
}
- *len = CFS_PAGE_SIZE;
+ if (ent != lu_dirent_start(dp))
+ *len += sizeof(struct lu_dirpage);
EXIT;
unmap:
kunmap(rdpg->rp_pages[0]);
return rc;
}
-static int cmm_split_entries(const struct lu_env *env,
- struct md_object *mo, struct lu_rdpg *rdpg,
+static int cmm_split_entries(const struct lu_env *env,
+ struct md_object *mo,
+ struct lu_rdpg *rdpg,
struct lu_fid *lf, __u32 end)
{
int rc, done = 0;
ENTRY;
- LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time"
- "npages %d\n", rdpg->rp_npages);
-
+ LASSERT(rdpg->rp_npages == 1);
/* Read split page and send them to the slave master. */
do {
struct lu_dirpage *ldp;
}
#define SPLIT_PAGE_COUNT 1
-
static int cmm_scan_and_split(const struct lu_env *env,
struct md_object *mo,
struct md_attr *ma)
GOTO(cleanup, rc = -ENOMEM);
}
+ LASSERT(ma->ma_valid & MA_LMV);
hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
struct lu_fid *lf;
return rc;
}
-#define cmm_md_size(stripes) \
- (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
+/*
+ * Wrap @area/@len in a lu_buf without allocating: reuses the per-env
+ * buffer cmm_env_info(env)->cmi_buf. NOTE(review): because the storage
+ * is shared per env, presumably only one such buffer may be live at a
+ * time — confirm against callers.
+ */
+static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
+ ssize_t len)
+{
+ struct lu_buf *buf;
+
+ buf = &cmm_env_info(env)->cmi_buf;
+ buf->lb_buf = area;
+ buf->lb_len = len;
+ return buf;
+}
+
+/* Byte size of an LMV EA (lmv_stripe_md header + @stripes stripe fids). */
+#define CMM_MD_SIZE(stripes) (sizeof(struct lmv_stripe_md) + \
+ (stripes) * sizeof(struct lu_fid))
int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
struct lu_buf *buf;
- int rc = 0, split, lmv_size;
+ int rc = 0, split;
ENTRY;
LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
-
memset(ma, 0, sizeof(*ma));
- lmv_size = ma->ma_lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
-
- /*
- * Preparing memory for LMV. This will be freed after finish splitting.
- */
- OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
- if (ma->ma_lmv == NULL)
- RETURN(-ENOMEM);
-
/* Step1: Checking whether the dir needs to be split. */
rc = cmm_expect_splitting(env, mo, ma, &split);
if (rc)
- GOTO(cleanup, rc);
-
+ RETURN(rc);
if (split != CMM_EXPECT_SPLIT)
- GOTO(cleanup, rc = 0);
+ RETURN(0);
LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if "
"dir is protected by MDL_EX lock. Lock mode 0x%x\n",
(int)mo->mo_pdo_mode);
-
+
/*
* Disable trans for splitting, since there will be so many trans in
* this one ops, confilct with current recovery design.
GOTO(cleanup, rc);
}
- /* Step2: Create slave objects (on slave MDTs) */
- ma->ma_valid = 0;
- ma->ma_lmv_size = lmv_size;
+ /* step2: Prepare the md memory */
+ ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
+ OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+ if (ma->ma_lmv == NULL)
+ RETURN(-ENOMEM);
+
+ /* Step3: Create slave objects and fill the ma->ma_lmv */
rc = cmm_slaves_create(env, mo, ma);
if (rc) {
CERROR("Can't create slaves for split, rc %d\n", rc);
GOTO(cleanup, rc);
}
- /* Step3: Scan and split the object. */
+ /* Step4: Scan and split the object. */
rc = cmm_scan_and_split(env, mo, ma);
if (rc) {
CERROR("Can't scan and split, rc %d\n", rc);
GOTO(cleanup, rc);
}
+ /* Step5: Set mea to the master object. */
+ LASSERT(ma->ma_valid & MA_LMV);
buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
-
- /* Step4: Set mea to the master object. */
rc = mo_xattr_set(env, md_object_next(mo), buf,
MDS_LMV_MD_NAME, 0);
- if (rc == 0) {
- CWARN("Dir "DFID" has been split\n",
- PFID(lu_object_fid(&mo->mo_lu)));
- rc = -ERESTART;
- } else {
- CERROR("Can't set MEA to master dir, "
- "rc %d\n", rc);
+ if (rc) {
+ CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
+ GOTO(cleanup, rc);
}
- EXIT;
+
+ /* Finally, split succeed, tell client to recreate the object */
+ CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
+ rc = -ERESTART;
cleanup:
- OBD_FREE(ma->ma_lmv, lmv_size);
- return rc;
+ OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+ RETURN(rc);
+}
+
+/*
+ * Check whether @name may be handled on this (master) MDS for object @mp.
+ *
+ * Returns 0 when no check applies (single MDS, or @mp has no LMV EA),
+ * or when @name hashes to the master stripe (stripe 0); -ERESTART when
+ * @name belongs to another stripe so the client must restart against
+ * the right MDS; otherwise a negative errno from attr-get/allocation.
+ */
+int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
+ const char *name)
+{
+ struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
+ struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+ int rc;
+ ENTRY;
+
+ if (cmm->cmm_tgt_count == 0)
+ RETURN(0);
+
+ /* Try to get the LMV EA size */
+ memset(ma, 0, sizeof(*ma));
+ ma->ma_need = MA_LMV;
+ rc = mo_attr_get(env, mp, ma);
+ if (rc)
+ RETURN(rc);
+ /* No LMV just return */
+ if (!(ma->ma_valid & MA_LMV))
+ RETURN(rc);
+
+ /* First pass only filled in the EA size; now allocate room for it. */
+ LASSERT(ma->ma_lmv_size > 0);
+ OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+ if (ma->ma_lmv == NULL)
+ RETURN(-ENOMEM);
+
+ /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
+ ma->ma_valid &= ~MA_LMV;
+ ma->ma_need = MA_LMV;
+ rc = mo_attr_get(env, mp, ma);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ /* Skip checking the slave dirs (mea_count is 0) */
+ if (ma->ma_lmv->mea_count != 0) {
+ int stripe;
+ /*
+ * Get stripe by name to check the name belongs to
+ * master dir, otherwise return the -ERESTART
+ */
+ stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
+
+ /* Master stripe is always 0 */
+ if (stripe != 0)
+ rc = -ERESTART;
+ }
+cleanup:
+ OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+ RETURN(rc);
+}
+
+/*
+ * Suggest a metadata lock mode for @mo given the requested mode @lm,
+ * based on the split verdict from cmm_expect_splitting():
+ *   MDL_EX      - an imminent split (with @lm == MDL_PW) must be protected
+ *                 by an exclusive lock;
+ *   MDL_NL      - non-splittable dir and @lm != MDL_PW: skip the PDO lock;
+ *   MDL_MINMODE - no opinion (also returned on error); caller decides.
+ */
+int cmm_split_lock_mode(const struct lu_env *env, struct md_object *mo,
+ mdl_mode_t lm)
+{
+ struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+ int rc, split;
+ ENTRY;
+
+ memset(ma, 0, sizeof(*ma));
+ /*
+ * Check only if we need protection from split.
+ * If not - mdt handles other cases.
+ */
+ rc = cmm_expect_splitting(env, mo, ma, &split);
+ if (rc) {
+ CERROR("Can't check for possible split, rc %d\n", rc);
+ RETURN(MDL_MINMODE);
+ }
+
+ /*
+ * Do not take PDO lock on non-splittable objects if
+ * this is not PW, this should speed things up a bit.
+ */
+ if (split == CMM_NOT_SPLITTABLE && lm != MDL_PW)
+ RETURN(MDL_NL);
+
+ /* Protect splitting by exclusive lock. */
+ if (split == CMM_EXPECT_SPLIT && lm == MDL_PW)
+ RETURN(MDL_EX);
+
+ /*
+ * XXX Have no idea about lock mode, let it be
+ * what higher layer wants.
+ */
+ RETURN(MDL_MINMODE);
+}