Whamcloud - gitweb
- fix wrong checking
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
index c7ecfb9..293a5bb 100644 (file)
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
-#define CMM_NO_SPLIT_EXPECTED   0
-#define CMM_EXPECT_SPLIT        1
-#define CMM_NO_SPLITTABLE       2
+static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
+                                  ssize_t len)
+{
+        struct lu_buf *buf;
 
-enum {
-        SPLIT_SIZE =  64*1024
-};
+        buf = &cmm_env_info(env)->cmi_buf;
+        buf->lb_buf = area;
+        buf->lb_len = len;
+        return buf;
+}
 
-static int cmm_expect_splitting(const struct lu_env *env,
-                                struct md_object *mo,
-                                struct md_attr *ma)
+int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
+                     const char *name)
+{
+        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+        int rc;
+        ENTRY;
+        
+        /* Try to get the LMV EA size */
+        memset(ma, 0, sizeof(*ma));
+        ma->ma_need = MA_INODE | MA_LMV;
+        rc = mo_attr_get(env, mp, ma);
+        if (rc)
+                RETURN(rc);
+
+        if (ma->ma_valid & MA_LMV) {
+                int stripe;
+
+                OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+                if (ma->ma_lmv == NULL)
+                        RETURN(-ENOMEM);
+
+                /* Get LMV EA */
+                ma->ma_need = MA_INODE | MA_LMV;
+                rc = mo_attr_get(env, mp, ma);
+                if (rc)
+                        RETURN(rc);
+                
+                /* Skip checking the slave dirs (mea_count == 0) */
+                if (ma->ma_lmv->mea_count == 0)
+                        RETURN(0);
+                /* 
+                 * Get stripe by name to check the name belongs to master dir,
+                 * otherwise return the -ERESTART
+                 */
+                stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
+                
+                /* Master stripe is always 0 */
+                if (stripe != 0)
+                        rc = -ERESTART;
+                
+                OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+        }
+        RETURN(rc);
+}
+
+int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
+                         struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lu_fid *fid = NULL;
         int rc = CMM_EXPECT_SPLIT;
         ENTRY;
 
+        ma->ma_need = MA_INODE | MA_LMV;
+        rc = mo_attr_get(env, mo, ma);
+        if (rc)
+                GOTO(cleanup, rc = CMM_NOT_SPLITTABLE);
+
         if (cmm->cmm_tgt_count == 0)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
-        if (ma->ma_attr.la_size < SPLIT_SIZE)
+        if (ma->ma_attr.la_size < CMM_SPLIT_SIZE)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
         if (ma->ma_lmv_size)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+        
         OBD_ALLOC_PTR(fid);
         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
         if (rc)
@@ -84,77 +137,7 @@ cleanup:
 
 #define cmm_md_size(stripes) \
        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
-#if 0
-/* Under discussion, disabled for now */
-static int cmm_slave_fids_alloc(const struct lu_env *env,
-                                struct cmm_device *cmm,
-                                struct lu_fid *fids)
-{
-        struct lu_site *ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
-        struct mdc_device *mc, *tmp;
-        int rc = 0;
 
-        /* 
-         * XXX: In fact here would be nice to protect cmm->cmm_targets but we
-         * can't use spinlock here and do something complex is no time for that,
-         * especially taking into account that split will be removed after
-         * acceptance. So we suppose no changes to targets should happen this
-         * time.
-         */
-        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
-                /* Allocate slave fid on mds @mc->mc_num. */
-                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fids[mc->mc_num], NULL);
-                if (rc > 0) {
-                        rc = fld_client_create(ls->ls_client_fld,
-                                               fid_seq(&fids[mc->mc_num]),
-                                               mc->mc_num, env);
-                        if (rc) {
-                                CERROR("Can't create fld entry, "
-                                       "rc %d\n", rc);
-                        }
-                } else if (rc < 0) {
-                        RETURN(rc);
-                }
-        }
-        RETURN(rc);
-}
-#else
-static int cmm_slave_fids_alloc(const struct lu_env *env,
-                                struct cmm_device *cmm,
-                                struct lu_fid *fids)
-{
-        struct  mdc_device *mc, *tmp;
-        int rc = 0, i = 0;
-
-        /* 
-         * XXX: In fact here would be nice to protect cmm->cmm_targets but we
-         * can't use spinlock here and do something complex is no time for that,
-         * especially taking into account that split will be removed after
-         * acceptance. So we suppose no changes to targets should happen this
-         * time.
-         */
-        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
-                /* Allocate slave fid on mds @mc->mc_num. */
-                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fids[i], NULL);
-                if (rc > 0) {
-                        struct lu_site *ls;
-
-                        ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
-                        rc = fld_client_create(ls->ls_client_fld,
-                                               fid_seq(&fids[i]),
-                                               mc->mc_num, env);
-                        if (rc)
-                                CERROR("Can't create fld entry, rc %d\n", rc);
-                        
-                }
-                
-                if (rc < 0)
-                         break;
-                i++;
-         }
-         RETURN(rc);
-}
-#endif
 struct cmm_object *cmm_object_find(const struct lu_env *env,
                                    struct cmm_device *d,
                                    const struct lu_fid *f)
@@ -178,21 +161,18 @@ static inline void cmm_object_put(const struct lu_env *env,
         lu_object_put(env, &o->cmo_obj.mo_lu);
 }
 
-static int cmm_create_remote_obj(const struct lu_env *env,
-                                 struct cmm_device *cmm,
-                                 struct lu_fid *fid, struct md_attr *ma,
-                                 const struct lmv_stripe_md *lmv,
-                                 int lmv_size)
+static int cmm_object_create(const struct lu_env *env,
+                             struct cmm_device *cmm,
+                             struct lu_fid *fid,
+                             struct md_attr *ma,
+                             struct lmv_stripe_md *lmv,
+                             int lmv_size)
 {
-        struct cmm_object *obj;
         struct md_create_spec *spec;
+        struct cmm_object *obj;
         int rc;
         ENTRY;
 
-        /*
-         * XXX: Since capablity will not work with split, we pass NULL capablity
-         * here. Should be fixed later.
-         */
         obj = cmm_object_find(env, cmm, fid);
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
@@ -210,78 +190,49 @@ static int cmm_create_remote_obj(const struct lu_env *env,
         cmm_object_put(env, obj);
         RETURN(rc);
 }
-#if 0
-static int cmm_create_slave_objects(const struct lu_env *env,
-                                    struct md_object *mo,
-                                    struct md_attr *ma)
+
+static int cmm_fid_alloc(const struct lu_env *env,
+                         struct cmm_device *cmm,
+                         struct mdc_device *mc,
+                         struct lu_fid *fid)
 {
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
-        struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
-        int lmv_size, i, rc;
+        int rc;
         ENTRY;
 
-        lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
+        LASSERT(cmm != NULL);
+        LASSERT(mc != NULL);
+        LASSERT(fid != NULL);
 
-        /* This lmv will be free after finish splitting. */
-        OBD_ALLOC(lmv, lmv_size);
-        if (!lmv)
-                RETURN(-ENOMEM);
+        down(&mc->mc_fid_sem);
 
-        lmv->mea_master = cmm->cmm_local_num;
-        lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
-        lmv->mea_count = cmm->cmm_tgt_count + 1;
-
-        /* Store master FID to local node idx number. */
-        lmv->mea_ids[cmm->cmm_local_num] = *lf;
-
-        /* Allocate slave fids and setup FLD for them. */
-        rc = cmm_alloc_slave_fids(env, cmm, lmv->mea_ids);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        OBD_ALLOC_PTR(slave_lmv);
-        if (!slave_lmv)
-                GOTO(cleanup, rc = -ENOMEM);
-
-        slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
-        slave_lmv->mea_master = cmm->cmm_local_num;
-        slave_lmv->mea_count = 0;
-
-        for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
-                if (i == cmm->cmm_local_num)
-                        continue;
-                
-                rc = cmm_create_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
-                                           slave_lmv, sizeof(*slave_lmv));
+        /* Alloc new fid on @mc. */
+        rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
+        if (rc > 0) {
+                /* Setup FLD for new sequenceif needed. */
+                rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
+                                       mc->mc_num, env);
                 if (rc)
-                        GOTO(cleanup, rc);
+                        CERROR("Can't create fld entry, rc %d\n", rc);
         }
-
-        ma->ma_lmv_size = lmv_size;
-        ma->ma_lmv = lmv;
-        EXIT;
-cleanup:
-        if (slave_lmv)
-                OBD_FREE_PTR(slave_lmv);
-        if (rc && lmv)
-                OBD_FREE_PTR(lmv);
-        return rc;
+        up(&mc->mc_fid_sem);
+        
+        RETURN(rc);
 }
-#else
-static int cmm_create_slave_objects(const struct lu_env *env,
-                                    struct md_object *mo,
-                                    struct md_attr *ma)
+
+static int cmm_slaves_create(const struct lu_env *env,
+                             struct md_object *mo,
+                             struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
         struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
-        int lmv_size, i, rc;
+        struct mdc_device *mc, *tmp;
+        int lmv_size, i = 1, rc = 0;
         ENTRY;
 
         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
 
-        /* This lmv will be free after finish splitting. */
+        /* This lmv will free after finish splitting. */
         OBD_ALLOC(lmv, lmv_size);
         if (!lmv)
                 RETURN(-ENOMEM);
@@ -293,11 +244,6 @@ static int cmm_create_slave_objects(const struct lu_env *env,
         /* Store master FID to local node idx number. */
         lmv->mea_ids[0] = *lf;
 
-        /* Allocate slave fids and setup FLD for them. */
-        rc = cmm_slave_fids_alloc(env, cmm, lmv->mea_ids);
-        if (rc)
-                GOTO(cleanup, rc);
-
         OBD_ALLOC_PTR(slave_lmv);
         if (!slave_lmv)
                 GOTO(cleanup, rc = -ENOMEM);
@@ -306,11 +252,21 @@ static int cmm_create_slave_objects(const struct lu_env *env,
         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         slave_lmv->mea_count = 0;
 
-        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
-                rc = cmm_create_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
-                                           slave_lmv, sizeof(*slave_lmv));
+        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
+                /* Alloc fid for slave object. */
+                rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
+                if (rc) {
+                        CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
+                               mc->mc_num, rc);
+                        GOTO(cleanup, rc);
+                }
+
+                /* Create slave on remote MDT. */
+                rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
+                                       slave_lmv, sizeof(*slave_lmv));
                 if (rc)
                         GOTO(cleanup, rc);
+                i++;
         }
 
         ma->ma_lmv_size = lmv_size;
@@ -326,7 +282,7 @@ cleanup:
         }
         return rc;
 }
-#endif
+
 static int cmm_send_split_pages(const struct lu_env *env,
                                 struct md_object *mo,
                                 struct lu_rdpg *rdpg,
@@ -425,9 +381,10 @@ static int cmm_remove_entries(const struct lu_env *env,
                 }
         }
         *len = CFS_PAGE_SIZE;
+        EXIT;
 unmap:
         kunmap(rdpg->rp_pages[0]);
-        RETURN(rc);
+        return rc;
 }
 
 static int cmm_split_entries(const struct lu_env *env,
@@ -506,13 +463,10 @@ static int cmm_scan_and_split(const struct lu_env *env,
         }
 
         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
-        for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
+        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
                 struct lu_fid *lf;
                 __u32 hash_end;
 
-                if (i == cmm->cmm_local_num)
-                        continue;
-                
                 lf = &ma->ma_lmv->mea_ids[i];
 
                 rdpg->rp_hash = i * hash_segement;
@@ -536,18 +490,7 @@ free_rdpg:
         return rc;
 }
 
-static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
-                                  ssize_t len)
-{
-        struct lu_buf *buf;
-
-        buf = &cmm_env_info(env)->cmi_buf;
-        buf->lb_buf = area;
-        buf->lb_len = len;
-        return buf;
-}
-
-int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
+int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
@@ -556,14 +499,9 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
         ENTRY;
 
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
-        
         memset(ma, 0, sizeof(*ma));
-        ma->ma_need = MA_INODE|MA_LMV;
-        rc = mo_attr_get(env, mo, ma);
-        if (rc)
-                GOTO(cleanup, ma);
 
-        /* step1: checking whether the dir need to be splitted */
+        /* Step1: Checking whether the dir needs to be split. */
         rc = cmm_expect_splitting(env, mo, ma);
         if (rc != CMM_EXPECT_SPLIT)
                 GOTO(cleanup, rc = 0);
@@ -576,23 +514,25 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
         if (rc)
                 GOTO(cleanup, rc = 0);
 
-        /* step2: create slave objects */
-        rc = cmm_create_slave_objects(env, mo, ma);
+        /* Step2: Create slave objects (on slave MDTs) */
+        rc = cmm_slaves_create(env, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
 
-        /* step3: scan and split the object */
+        /* Step3: Scan and split the object. */
         rc = cmm_scan_and_split(env, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
 
         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
         
-        /* step4: set mea to the master object */
-        rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
-        if (rc == -ERESTART)
+        /* Step4: Set mea to the master object. */
+        rc = mo_xattr_set(env, md_object_next(mo), buf,
+                          MDS_LMV_MD_NAME, 0);
+        if (rc == -ERESTART) {
                 CWARN("Dir "DFID" has been split\n",
                       PFID(lu_object_fid(&mo->mo_lu)));
+        }
         EXIT;
 cleanup:
         if (ma->ma_lmv_size && ma->ma_lmv)