#include "cmm_internal.h"
#include "mdc_internal.h"
-#define CMM_NO_SPLIT_EXPECTED 0
-#define CMM_EXPECT_SPLIT 1
-#define CMM_NO_SPLITTABLE 2
+static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
+ ssize_t len)
+{
+ struct lu_buf *buf;
-enum {
- SPLIT_SIZE = 64*1024
-};
+ buf = &cmm_env_info(env)->cmi_buf;
+ buf->lb_buf = area;
+ buf->lb_len = len;
+ return buf;
+}
-static int cmm_expect_splitting(const struct lu_env *env,
- struct md_object *mo,
- struct md_attr *ma)
+int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
+ const char *name)
+{
+ struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+ int rc;
+ ENTRY;
+
+ /* Try to get the LMV EA size */
+ memset(ma, 0, sizeof(*ma));
+ ma->ma_need = MA_INODE | MA_LMV;
+ rc = mo_attr_get(env, mp, ma);
+ if (rc)
+ RETURN(rc);
+
+ if (ma->ma_valid & MA_LMV) {
+ int stripe;
+
+ OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+ if (ma->ma_lmv == NULL)
+ RETURN(-ENOMEM);
+
+ /* Get LMV EA */
+ ma->ma_need = MA_INODE | MA_LMV;
+ rc = mo_attr_get(env, mp, ma);
+ if (rc)
+ RETURN(rc);
+
+ /* Skip checking the slave dirs (mea_count == 0) */
+ if (ma->ma_lmv->mea_count == 0)
+ RETURN(0);
+ /*
+ * Get stripe by name to check the name belongs to master dir,
+ * otherwise return the -ERESTART
+ */
+ stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
+
+ /* Master stripe is always 0 */
+ if (stripe != 0)
+ rc = -ERESTART;
+
+ OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+ }
+ RETURN(rc);
+}
+
+int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
+ struct md_attr *ma)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct lu_fid *fid = NULL;
int rc = CMM_EXPECT_SPLIT;
ENTRY;
+ ma->ma_need = MA_INODE | MA_LMV;
+ rc = mo_attr_get(env, mo, ma);
+ if (rc)
+ GOTO(cleanup, rc = CMM_NOT_SPLITTABLE);
+
if (cmm->cmm_tgt_count == 0)
GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
- if (ma->ma_attr.la_size < SPLIT_SIZE)
+ if (ma->ma_attr.la_size < CMM_SPLIT_SIZE)
GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
if (ma->ma_lmv_size)
GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+
OBD_ALLOC_PTR(fid);
rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
if (rc)
struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
struct mdc_device *mc, *tmp;
- int lmv_size, i = 1, rc;
+ int lmv_size, i = 1, rc = 0;
ENTRY;
lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
return rc;
}
-static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
- ssize_t len)
-{
- struct lu_buf *buf;
-
- buf = &cmm_env_info(env)->cmi_buf;
- buf->lb_buf = area;
- buf->lb_len = len;
- return buf;
-}
-
-int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
+int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
ENTRY;
LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
-
memset(ma, 0, sizeof(*ma));
- ma->ma_need = MA_INODE | MA_LMV;
- rc = mo_attr_get(env, mo, ma);
- if (rc)
- GOTO(cleanup, ma);
- /* step1: checking whether the dir need to be splitted */
+ /* Step1: Checking whether the dir needs to be split. */
rc = cmm_expect_splitting(env, mo, ma);
if (rc != CMM_EXPECT_SPLIT)
GOTO(cleanup, rc = 0);
if (rc)
GOTO(cleanup, rc = 0);
- /* step2: create slave objects */
+ /* Step2: Create slave objects (on slave MDTs) */
rc = cmm_slaves_create(env, mo, ma);
if (rc)
GOTO(cleanup, ma);
- /* step3: scan and split the object */
+ /* Step3: Scan and split the object. */
rc = cmm_scan_and_split(env, mo, ma);
if (rc)
GOTO(cleanup, ma);
buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
- /* step4: set mea to the master object */
- rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
- if (rc == -ERESTART)
+ /* Step4: Set mea to the master object. */
+ rc = mo_xattr_set(env, md_object_next(mo), buf,
+ MDS_LMV_MD_NAME, 0);
+ if (rc == -ERESTART) {
CWARN("Dir "DFID" has been split\n",
PFID(lu_object_fid(&mo->mo_lu)));
+ }
EXIT;
cleanup:
if (ma->ma_lmv_size && ma->ma_lmv)