Whamcloud - gitweb
- fix wrong checking
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
index e7e4bc2..293a5bb 100644 (file)
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
-#define CMM_NO_SPLIT_EXPECTED   0
-#define CMM_EXPECT_SPLIT        1
-#define CMM_NO_SPLITTABLE       2
+static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
+                                  ssize_t len)
+{
+        struct lu_buf *buf;
 
-enum {
-        SPLIT_SIZE =  64*1024
-};
+        buf = &cmm_env_info(env)->cmi_buf;
+        buf->lb_buf = area;
+        buf->lb_len = len;
+        return buf;
+}
 
-static int cmm_expect_splitting(const struct lu_env *env,
-                                struct md_object *mo,
-                                struct md_attr *ma)
+int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
+                     const char *name)
+{
+        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+        int rc;
+        ENTRY;
+        
+        /* Try to get the LMV EA size */
+        memset(ma, 0, sizeof(*ma));
+        ma->ma_need = MA_INODE | MA_LMV;
+        rc = mo_attr_get(env, mp, ma);
+        if (rc)
+                RETURN(rc);
+
+        if (ma->ma_valid & MA_LMV) {
+                int stripe;
+
+                OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+                if (ma->ma_lmv == NULL)
+                        RETURN(-ENOMEM);
+
+                /* Get LMV EA */
+                ma->ma_need = MA_INODE | MA_LMV;
+                rc = mo_attr_get(env, mp, ma);
+                if (rc)
+                        RETURN(rc);
+                
+                /* Skip checking the slave dirs (mea_count == 0) */
+                if (ma->ma_lmv->mea_count == 0)
+                        RETURN(0);
+                /* 
+                 * Get stripe by name to check the name belongs to master dir,
+                 * otherwise return the -ERESTART
+                 */
+                stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
+                
+                /* Master stripe is always 0 */
+                if (stripe != 0)
+                        rc = -ERESTART;
+                
+                OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+        }
+        RETURN(rc);
+}
+
+int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
+                         struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lu_fid *fid = NULL;
         int rc = CMM_EXPECT_SPLIT;
         ENTRY;
 
+        ma->ma_need = MA_INODE | MA_LMV;
+        rc = mo_attr_get(env, mo, ma);
+        if (rc)
+                GOTO(cleanup, rc = CMM_NOT_SPLITTABLE);
+
         if (cmm->cmm_tgt_count == 0)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
-        if (ma->ma_attr.la_size < SPLIT_SIZE)
+        if (ma->ma_attr.la_size < CMM_SPLIT_SIZE)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
         if (ma->ma_lmv_size)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+        
         OBD_ALLOC_PTR(fid);
         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
         if (rc)
@@ -174,7 +227,7 @@ static int cmm_slaves_create(const struct lu_env *env,
         struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
         struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
         struct mdc_device *mc, *tmp;
-        int lmv_size, i = 1, rc;
+        int lmv_size, i = 1, rc = 0;
         ENTRY;
 
         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
@@ -437,18 +490,7 @@ free_rdpg:
         return rc;
 }
 
-static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
-                                  ssize_t len)
-{
-        struct lu_buf *buf;
-
-        buf = &cmm_env_info(env)->cmi_buf;
-        buf->lb_buf = area;
-        buf->lb_len = len;
-        return buf;
-}
-
-int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
+int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
@@ -457,14 +499,9 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
         ENTRY;
 
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
-        
         memset(ma, 0, sizeof(*ma));
-        ma->ma_need = MA_INODE | MA_LMV;
-        rc = mo_attr_get(env, mo, ma);
-        if (rc)
-                GOTO(cleanup, ma);
 
-        /* step1: checking whether the dir need to be splitted */
+        /* Step1: Checking whether the dir needs to be split. */
         rc = cmm_expect_splitting(env, mo, ma);
         if (rc != CMM_EXPECT_SPLIT)
                 GOTO(cleanup, rc = 0);
@@ -477,23 +514,25 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
         if (rc)
                 GOTO(cleanup, rc = 0);
 
-        /* step2: create slave objects */
+        /* Step2: Create slave objects (on slave MDTs) */
         rc = cmm_slaves_create(env, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
 
-        /* step3: scan and split the object */
+        /* Step3: Scan and split the object. */
         rc = cmm_scan_and_split(env, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
 
         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
         
-        /* step4: set mea to the master object */
-        rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
-        if (rc == -ERESTART)
+        /* Step4: Set mea to the master object. */
+        rc = mo_xattr_set(env, md_object_next(mo), buf,
+                          MDS_LMV_MD_NAME, 0);
+        if (rc == -ERESTART) {
                 CWARN("Dir "DFID" has been split\n",
                       PFID(lu_object_fid(&mo->mo_lu)));
+        }
         EXIT;
 cleanup:
         if (ma->ma_lmv_size && ma->ma_lmv)