return (&d->cmm_md_dev.md_lu_dev);
}
+#ifdef HAVE_SPLIT_SUPPORT
+enum {
+ CMM_SPLIT_UNKNOWN,
+ CMM_SPLIT_NONE,
+ CMM_SPLIT_NEEDED,
+ CMM_SPLIT_DONE,
+ CMM_SPLIT_DENIED
+};
+#endif
struct cmm_object {
struct md_object cmo_obj;
};
/* local CMM object */
struct cml_object {
struct cmm_object cmm_obj;
+#ifdef HAVE_SPLIT_SUPPORT
+ /* split state of object (for dirs only)*/
+ __u32 clo_split;
+#endif
};
/* remote CMM object */
const struct lu_object_header *hdr,
struct lu_device *);
+/*
+ * local CMM object operations. cml_...
+ */
+static inline struct cml_object *lu2cml_obj(struct lu_object *o)
+{
+ return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cml_object *md2cml_obj(struct md_object *mo)
+{
+ return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
+}
+static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
+{
+ return container_of0(co, struct cml_object, cmm_obj);
+}
int cmm_upcall(const struct lu_env *env, struct md_device *md,
enum md_upcall_event ev);
* of operations so we are avoiding multiple checks in code.
*/
-/*
- * local CMM object operations. cml_...
- */
-static inline struct cml_object *lu2cml_obj(struct lu_object *o)
-{
- return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
-}
-static inline struct cml_object *md2cml_obj(struct md_object *mo)
-{
- return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
-}
-static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
-{
- return container_of0(co, struct cml_object, cmm_obj);
-}
/* get local child device */
static struct lu_device *cml_child_dev(struct cmm_device *d)
{
ENTRY;
+#ifdef HAVE_SPLIT_SUPPORT
+ if(cd->cmm_tgt_count == 0)
+ lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
+ else
+ lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
+#endif
c_dev = cml_child_dev(cd);
if (c_dev == NULL) {
rc = -ENOENT;
* choosen.
*/
rc = cmm_split_try(env, mo_p);
- if (rc == -EALREADY) {
- /*
- * Dir is already split and we would like to check if
- * name came to correct MDT. If not -ERESTART is
- * returned by cmm_split_check()
- */
- rc = cmm_split_check(env, mo_p, child_name);
- if (rc)
- RETURN(rc);
- } else if (rc) {
+ if (rc) {
/*
* -ERESTART or some split error is returned, we can't
* proceed with create.
CMM_SPLIT_SIZE = 64 * 1024
};
-enum {
- CMM_NO_SPLIT_EXPECTED = 0,
- CMM_EXPECT_SPLIT = 1,
- CMM_NOT_SPLITTABLE = 2
-};
-
#define CMM_SPLIT_PAGE_COUNT 1
/*
int cmm_split_check(const struct lu_env *env, struct md_object *mp,
const char *name)
{
+ struct cml_object *clo = md2cml_obj(mp);
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
int rc;
ENTRY;
- if (cmm->cmm_tgt_count == 0)
+ /* not split yet */
+ if (clo->clo_split == CMM_SPLIT_NONE ||
+ clo->clo_split == CMM_SPLIT_DENIED)
RETURN(0);
/* Try to get the LMV EA size */
rc = mo_attr_get(env, mp, ma);
if (rc)
RETURN(rc);
-
+
/* No LMV just return */
- if (!(ma->ma_valid & MA_LMV))
+ if (!(ma->ma_valid & MA_LMV)) {
+ /* update split state if unknown */
+ if (clo->clo_split == CMM_SPLIT_UNKNOWN)
+ clo->clo_split = CMM_SPLIT_NONE;
RETURN(0);
-
+ }
+
LASSERT(ma->ma_lmv_size > 0);
OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
if (ma->ma_lmv == NULL)
*/
if (idx != 0)
rc = -ERESTART;
+
+ /* update split state to DONE if unknown */
+ if (clo->clo_split == CMM_SPLIT_UNKNOWN)
+ clo->clo_split = CMM_SPLIT_DONE;
+ } else {
+ /* split is denied for slave dir */
+ clo->clo_split = CMM_SPLIT_DENIED;
}
EXIT;
cleanup:
* Do not take PDO lock on non-splittable objects if this is not PW,
* this should speed things up a bit.
*/
- if (split == CMM_NOT_SPLITTABLE && lm != MDL_PW)
+ if (split == CMM_SPLIT_DONE && lm != MDL_PW)
RETURN(MDL_NL);
/* Protect splitting by exclusive lock. */
- if (split == CMM_EXPECT_SPLIT && lm == MDL_PW)
+ if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
RETURN(MDL_EX);
/*
RETURN(MDL_MINMODE);
}
-/* Check if split is expected for current thead. */
+/* Check if split is expected for current thread. */
int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
struct md_attr *ma, int *split)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+ struct cml_object *clo = md2cml_obj(mo);
struct lu_fid root_fid;
int rc;
ENTRY;
- /* No need split for single MDS */
- if (cmm->cmm_tgt_count == 0) {
- *split = CMM_NO_SPLIT_EXPECTED;
+ if (clo->clo_split == CMM_SPLIT_DONE ||
+ clo->clo_split == CMM_SPLIT_DENIED) {
+ *split = clo->clo_split;
RETURN(0);
}
+ /* CMM_SPLIT_UNKNOWN case below */
/* No need split for Root object */
rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, &root_fid);
RETURN(rc);
if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
- *split = CMM_NO_SPLIT_EXPECTED;
+ /* update split state */
+ *split = clo->clo_split == CMM_SPLIT_DENIED;
RETURN(0);
}
/* No need split for already split object */
if (ma->ma_valid & MA_LMV) {
LASSERT(ma->ma_lmv_size > 0);
- *split = CMM_NOT_SPLITTABLE;
+ *split = clo->clo_split = CMM_SPLIT_DONE;
RETURN(0);
}
/* No need split for object whose size < CMM_SPLIT_SIZE */
if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
- *split = CMM_NO_SPLIT_EXPECTED;
+ *split = clo->clo_split = CMM_SPLIT_NONE;
RETURN(0);
}
-
- *split = CMM_EXPECT_SPLIT;
+
+ *split = clo->clo_split = CMM_SPLIT_NEEDED;
RETURN(0);
}
if (rc)
RETURN(rc);
- if (split == CMM_NOT_SPLITTABLE) {
- /* Let caller know that dir is already split. */
- RETURN(-EALREADY);
- } else if (split == CMM_NO_SPLIT_EXPECTED) {
- /* No split is expected, caller may proceed with create. */
+ if (split != CMM_SPLIT_NEEDED) {
+ /* No split is needed, caller may proceed with create. */
RETURN(0);
- } else {
- /* Split should be done now, let's do it. */
- CWARN("Dir "DFID" is going to split\n",
- PFID(lu_object_fid(&mo->mo_lu)));
- la_size = ma->ma_attr.la_size;
}
+
+ /* Split should be done now, let's do it. */
+ CWARN("Dir "DFID" is going to split\n",
+ PFID(lu_object_fid(&mo->mo_lu)));
/*
* Disable transacrions for split, since there will be so many trans in
GOTO(cleanup, rc);
}
+ /* set flag in cmm_object */
+ md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
+
/*
* Finally, split succeed, tell client to repeat opetartion on correct
* MDT.