Whamcloud - gitweb
- fix wrong checking
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
index 847b428..293a5bb 100644 (file)
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
-#define CMM_NO_SPLIT_EXPECTED   0
-#define CMM_EXPECT_SPLIT        1
-#define CMM_NO_SPLITTABLE       2
+/*
+ * Point the lu_buf kept in this environment's cmm info (cmi_buf) at the
+ * caller's memory and return it.
+ *
+ * @area is stored as lb_buf (referenced, not copied) and @len as lb_len.
+ * Nothing is allocated here: the returned pointer is
+ * &cmm_env_info(env)->cmi_buf, so a later call overwrites the descriptor
+ * (assuming cmm_env_info() hands back the same storage for the same env).
+ */
+static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
+                                  ssize_t len)
+{
+        struct lu_buf *buf;
 
-enum {
-        SPLIT_SIZE =  64*1024
-};
+        buf = &cmm_env_info(env)->cmi_buf;
+        buf->lb_buf = area;
+        buf->lb_len = len;
+        return buf;
+}
 
-static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
+/*
+ * Check that @name belongs to the master stripe of directory @mp.
+ *
+ * The attributes are read into the per-env scratch md_attr (cmi_ma), which
+ * is zeroed first. If the object carries no LMV EA there is nothing to
+ * check. Otherwise the EA is fetched into a temporary buffer and, unless the
+ * directory is a slave (mea_count == 0), @name is mapped to a stripe index.
+ *
+ * Returns 0 if the name is acceptable (no LMV EA, slave dir, or the name
+ * maps to stripe 0), -ERESTART if the name maps to another stripe, -ENOMEM
+ * if the EA buffer cannot be allocated, or the error from mo_attr_get().
+ * The temporary EA buffer is released on every path.
+ */
+int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
+                     const char *name)
 {
-       return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
+        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+        int rc;
+        ENTRY;
+
+        /* Try to get the LMV EA size */
+        memset(ma, 0, sizeof(*ma));
+        ma->ma_need = MA_INODE | MA_LMV;
+        rc = mo_attr_get(env, mp, ma);
+        if (rc)
+                RETURN(rc);
+
+        if (ma->ma_valid & MA_LMV) {
+                /*
+                 * Remember the allocated size: the buffer must be freed with
+                 * the size it was allocated with, even if the second
+                 * mo_attr_get() were to update ma_lmv_size.
+                 */
+                int lmv_size = ma->ma_lmv_size;
+
+                OBD_ALLOC(ma->ma_lmv, lmv_size);
+                if (ma->ma_lmv == NULL)
+                        RETURN(-ENOMEM);
+
+                /* Get LMV EA */
+                ma->ma_need = MA_INODE | MA_LMV;
+                rc = mo_attr_get(env, mp, ma);
+
+                /*
+                 * Slave dirs (mea_count == 0) are not checked. For a master
+                 * dir the name must map to stripe 0 (the master stripe is
+                 * always 0), otherwise return -ERESTART.
+                 */
+                if (rc == 0 && ma->ma_lmv->mea_count != 0 &&
+                    mea_name2idx(ma->ma_lmv, name, strlen(name)) != 0)
+                        rc = -ERESTART;
+
+                /*
+                 * Single release point: previously the buffer leaked when
+                 * the second mo_attr_get() failed and for slave dirs.
+                 */
+                OBD_FREE(ma->ma_lmv, lmv_size);
+        }
+        RETURN(rc);
 }
 
-static int cmm_expect_splitting(const struct lu_env *env,
-                                struct md_object *mo,
-                                struct md_attr *ma)
+int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
+                         struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lu_fid *fid = NULL;
         int rc = CMM_EXPECT_SPLIT;
         ENTRY;
 
+        ma->ma_need = MA_INODE | MA_LMV;
+        rc = mo_attr_get(env, mo, ma);
+        if (rc)
+                GOTO(cleanup, rc = CMM_NOT_SPLITTABLE);
+
         if (cmm->cmm_tgt_count == 0)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
-        if (ma->ma_attr.la_size < SPLIT_SIZE)
+        if (ma->ma_attr.la_size < CMM_SPLIT_SIZE)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
         if (ma->ma_lmv_size)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+        
         OBD_ALLOC_PTR(fid);
         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
         if (rc)
@@ -77,65 +125,28 @@ static int cmm_expect_splitting(const struct lu_env *env,
 
         rc = CMM_EXPECT_SPLIT;
 
-        if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
+        if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo))))
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
+        EXIT;
 cleanup:
         if (fid)
                 OBD_FREE_PTR(fid);
-        RETURN(rc);
+        return rc;
 }
 
-#define cmm_md_size(stripes)                            \
+/* Bytes needed for a struct lmv_stripe_md followed by @stripes lu_fid slots. */
+#define cmm_md_size(stripes) \
        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
 
-static int cmm_alloc_fid(const struct lu_env *env, struct cmm_device *cmm,
-                         struct lu_fid *fid, int count)
-{
-        struct  mdc_device *mc, *tmp;
-        int rc = 0, i = 0;
-
-        LASSERT(count == cmm->cmm_tgt_count);
-        /* FIXME: this spin_lock maybe not proper,
-         * because fid_alloc may need RPC */
-        spin_lock(&cmm->cmm_tgt_guard);
-        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
-                                 mc_linkage) {
-                LASSERT(cmm->cmm_local_num != mc->mc_num);
-
-                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
-                if (rc > 0) {
-                        struct lu_site *ls;
-
-                        ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
-                        rc = fld_client_create(ls->ls_client_fld,
-                                               fid_seq(&fid[i]),
-                                               mc->mc_num, env);
-                }
-                if (rc < 0) {
-                        spin_unlock(&cmm->cmm_tgt_guard);
-                        RETURN(rc);
-                }
-                i++;
-        }
-        spin_unlock(&cmm->cmm_tgt_guard);
-        LASSERT(i == count);
-        if (rc == 1)
-                rc = 0;
-        RETURN(rc);
-}
-
 struct cmm_object *cmm_object_find(const struct lu_env *env,
                                    struct cmm_device *d,
-                                   const struct lu_fid *f,
-                                   struct lustre_capa *capa)
+                                   const struct lu_fid *f)
 {
         struct lu_object *o;
         struct cmm_object *m;
         ENTRY;
 
-        o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f,
-                           capa);
+        o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
         if (IS_ERR(o))
                 m = (struct cmm_object *)o;
         else
@@ -150,20 +161,19 @@ static inline void cmm_object_put(const struct lu_env *env,
         lu_object_put(env, &o->cmo_obj.mo_lu);
 }
 
-static int cmm_creat_remote_obj(const struct lu_env *env,
-                                struct cmm_device *cmm,
-                                struct lu_fid *fid, struct md_attr *ma,
-                                const struct lmv_stripe_md *lmv,
-                                int lmv_size)
+static int cmm_object_create(const struct lu_env *env,
+                             struct cmm_device *cmm,
+                             struct lu_fid *fid,
+                             struct md_attr *ma,
+                             struct lmv_stripe_md *lmv,
+                             int lmv_size)
 {
-        struct cmm_object *obj;
         struct md_create_spec *spec;
+        struct cmm_object *obj;
         int rc;
         ENTRY;
 
-        /* XXX Since capablity will not work with split. so we
-         * pass NULL capablity here */
-        obj = cmm_object_find(env, cmm, fid, NULL);
+        obj = cmm_object_find(env, cmm, fid);
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
 
@@ -181,18 +191,48 @@ static int cmm_creat_remote_obj(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int cmm_create_slave_objects(const struct lu_env *env,
-                                    struct md_object *mo, struct md_attr *ma)
+/*
+ * Allocate a new fid on remote target @mc and store it in @fid.
+ *
+ * The allocation and the FLD update below both run under mc->mc_fid_sem.
+ * When obd_fid_alloc() returns a positive value, an FLD entry mapping the
+ * fid's sequence to mc->mc_num is created (presumably a positive return
+ * means a new sequence was started -- confirm against obd_fid_alloc()).
+ *
+ * Returns the result of fld_client_create() when it was called, otherwise
+ * the result of obd_fid_alloc(). All three pointer arguments are asserted
+ * to be non-NULL.
+ */
+static int cmm_fid_alloc(const struct lu_env *env,
+                         struct cmm_device *cmm,
+                         struct mdc_device *mc,
+                         struct lu_fid *fid)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(cmm != NULL);
+        LASSERT(mc != NULL);
+        LASSERT(fid != NULL);
+
+        down(&mc->mc_fid_sem);
+
+        /* Alloc new fid on @mc. */
+        rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
+        if (rc > 0) {
+                /* Setup FLD for new sequence if needed. */
+                rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
+                                       mc->mc_num, env);
+                if (rc)
+                        CERROR("Can't create fld entry, rc %d\n", rc);
+        }
+        up(&mc->mc_fid_sem);
+        
+        RETURN(rc);
+}
+
+static int cmm_slaves_create(const struct lu_env *env,
+                             struct md_object *mo,
+                             struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
-        int lmv_size, i, rc;
-        struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
+        struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
+        struct mdc_device *mc, *tmp;
+        int lmv_size, i = 1, rc = 0;
         ENTRY;
 
         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
 
-        /* This lmv will be free after finish splitting. */
+        /* This lmv will be freed after splitting finishes. */
         OBD_ALLOC(lmv, lmv_size);
         if (!lmv)
                 RETURN(-ENOMEM);
@@ -201,13 +241,9 @@ static int cmm_create_slave_objects(const struct lu_env *env,
         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         lmv->mea_count = cmm->cmm_tgt_count + 1;
 
+        /* Store master FID to local node idx number. */
         lmv->mea_ids[0] = *lf;
 
-        rc = cmm_alloc_fid(env, cmm, &lmv->mea_ids[1],
-                           cmm->cmm_tgt_count);
-        if (rc)
-                GOTO(cleanup, rc);
-
         OBD_ALLOC_PTR(slave_lmv);
         if (!slave_lmv)
                 GOTO(cleanup, rc = -ENOMEM);
@@ -215,23 +251,41 @@ static int cmm_create_slave_objects(const struct lu_env *env,
         slave_lmv->mea_master = cmm->cmm_local_num;
         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         slave_lmv->mea_count = 0;
-        for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
-                rc = cmm_creat_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
-                                          slave_lmv, sizeof(slave_lmv));
+
+        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
+                /* Alloc fid for slave object. */
+                rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
+                if (rc) {
+                        CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
+                               mc->mc_num, rc);
+                        GOTO(cleanup, rc);
+                }
+
+                /* Create slave on remote MDT. */
+                rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
+                                       slave_lmv, sizeof(*slave_lmv));
                 if (rc)
                         GOTO(cleanup, rc);
+                i++;
         }
 
         ma->ma_lmv_size = lmv_size;
         ma->ma_lmv = lmv;
+        EXIT;
 cleanup:
         if (slave_lmv)
                 OBD_FREE_PTR(slave_lmv);
-        RETURN(rc);
+        if (rc && lmv) {
+                OBD_FREE(lmv, lmv_size);
+                ma->ma_lmv = NULL;
+                ma->ma_lmv_size = 0;
+        }
+        return rc;
 }
 
 static int cmm_send_split_pages(const struct lu_env *env,
-                                struct md_object *mo, struct lu_rdpg *rdpg,
+                                struct md_object *mo,
+                                struct lu_rdpg *rdpg,
                                 struct lu_fid *fid, int len)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
@@ -239,7 +293,7 @@ static int cmm_send_split_pages(const struct lu_env *env,
         int rc = 0;
         ENTRY;
 
-        obj = cmm_object_find(env, cmm, fid, NULL);
+        obj = cmm_object_find(env, cmm, fid);
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
 
@@ -249,6 +303,55 @@ static int cmm_send_split_pages(const struct lu_env *env,
         RETURN(rc);
 }
 
+/*
+ * Remove the name described by @ent from directory @mo and tag the entry
+ * so that the receiving side can tell whether it names a directory.
+ *
+ * "." and ".." are left alone (returns 0). For any other entry the object
+ * is looked up by ent->lde_fid to decide whether it is a directory; an
+ * object that does not exist is treated as a directory. The name is then
+ * removed from the next layer via mdo_name_remove(). On success, directory
+ * entries get MAX_HASH_HIGHEST_BIT set in ent->lde_hash.
+ *
+ * Returns 0 on success, the error from cmm_object_find(), -ENOMEM if the
+ * temporary name cannot be allocated, or the error from mdo_name_remove().
+ */
+static int cmm_remove_dir_ent(const struct lu_env *env,
+                              struct md_object *mo,
+                              struct lu_dirent *ent)
+{
+        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct cmm_object *obj;
+        char *name;
+        int is_dir, rc;
+        ENTRY;
+
+        /*
+         * lde_name is compared with an explicit length (lde_namelen).
+         * NOTE(review): a zero-length name also compares equal here and is
+         * skipped -- confirm that is intended.
+         */
+        if (!strncmp(ent->lde_name, ".", ent->lde_namelen) ||
+            !strncmp(ent->lde_name, "..", ent->lde_namelen))
+                RETURN(0);
+
+        obj = cmm_object_find(env, cmm, &ent->lde_fid);
+        if (IS_ERR(obj))
+                RETURN(PTR_ERR(obj));
+
+        if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
+                is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
+        else
+                /* XXX: is this correct? */
+                is_dir = 1;
+
+        /*
+         * Build a separate copy of the name with room for a terminator.
+         * NOTE(review): only lde_namelen bytes are copied, so termination
+         * relies on OBD_ALLOC() returning zeroed memory -- confirm.
+         */
+        OBD_ALLOC(name, ent->lde_namelen + 1);
+        if (!name)
+                GOTO(cleanup, rc = -ENOMEM);
+
+        memcpy(name, ent->lde_name, ent->lde_namelen);
+        rc = mdo_name_remove(env, md_object_next(mo),
+                             name, is_dir);
+        OBD_FREE(name, ent->lde_namelen + 1);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        /*
+         * This ent will be transferred to slave MDS and insert it there, so in
+         * the slave MDS, we should know whether this object is dir or not, so
+         * use the highest bit of the hash to indicate that (because we do not
+         * use highest bit of hash).
+         */
+        if (is_dir)
+                ent->lde_hash |= MAX_HASH_HIGHEST_BIT;
+cleanup:
+        /* Drop the reference taken by cmm_object_find() on every path. */
+        cmm_object_put(env, obj);
+
+        RETURN(rc);
+}
+
 static int cmm_remove_entries(const struct lu_env *env,
                               struct md_object *mo, struct lu_rdpg *rdpg,
                               __u32 hash_end, __u32 *len)
@@ -261,26 +364,13 @@ static int cmm_remove_entries(const struct lu_env *env,
         kmap(rdpg->rp_pages[0]);
         dp = page_address(rdpg->rp_pages[0]);
         for (ent = lu_dirent_start(dp); ent != NULL;
-                          ent = lu_dirent_next(ent)) {
+             ent = lu_dirent_next(ent)) {
                 if (ent->lde_hash < hash_end) {
-                        if (strncmp(ent->lde_name, ".", ent->lde_namelen) &&
-                            strncmp(ent->lde_name, "..", ent->lde_namelen)) {
-                                char *name;
-                                /* FIXME: Here we allocate name for each name,
-                                 * maybe stupid, but can not find better way.
-                                 * will find better way */
-                                OBD_ALLOC(name, ent->lde_namelen + 1);
-                                memcpy(name, ent->lde_name, ent->lde_namelen);
-                                rc = mdo_name_remove(env, md_object_next(mo),
-                                                     name, 0);
-                                OBD_FREE(name, ent->lde_namelen + 1);
-                        }
+                        rc = cmm_remove_dir_ent(env, mo, ent);
                         if (rc) {
-                                /* FIXME: Do not know why it return -ENOENT
-                                 * in some case
-                                 * */
-                                if (rc != -ENOENT)
-                                        GOTO(unmap, rc);
+                                CERROR("Can not del %s rc %d\n", ent->lde_name,
+                                                                 rc);
+                                GOTO(unmap, rc);
                         }
                 } else {
                         if (ent != lu_dirent_start(dp))
@@ -291,9 +381,10 @@ static int cmm_remove_entries(const struct lu_env *env,
                 }
         }
         *len = CFS_PAGE_SIZE;
+        EXIT;
 unmap:
         kunmap(rdpg->rp_pages[0]);
-        RETURN(rc);
+        return rc;
 }
 
 static int cmm_split_entries(const struct lu_env *env,
@@ -304,8 +395,9 @@ static int cmm_split_entries(const struct lu_env *env,
         ENTRY;
 
         LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time"
-                        "npages %d \n", rdpg->rp_npages);
-        /* Read splitted page and send them to the slave master */
+                 "npages %d\n", rdpg->rp_npages);
+
+        /* Read split page and send them to the slave master. */
         do {
                 struct lu_dirpage *ldp;
                 __u32  len = 0;
@@ -315,14 +407,8 @@ static int cmm_split_entries(const struct lu_env *env,
                 kunmap(rdpg->rp_pages[0]);
 
                 rc = mo_readpage(env, md_object_next(mo), rdpg);
-                /* -E2BIG means it already reach the end of the dir */
-                if (rc) {
-                        if (rc != -ERANGE) {
-                                if (rc == -E2BIG)
-                                        rc = 0;
-                                RETURN(rc);
-                        }
-                }
+                if (rc)
+                        RETURN(rc);
 
                 /* Remove the old entries */
                 rc = cmm_remove_entries(env, mo, rdpg, end, &len);
@@ -347,13 +433,16 @@ static int cmm_split_entries(const struct lu_env *env,
 
         RETURN(rc);
 }
+
 #define SPLIT_PAGE_COUNT 1
+
 static int cmm_scan_and_split(const struct lu_env *env,
-                              struct md_object *mo, struct md_attr *ma)
+                              struct md_object *mo,
+                              struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct lu_rdpg *rdpg = NULL;
         __u32 hash_segement;
-        struct lu_rdpg   *rdpg = NULL;
         int rc = 0, i;
 
         OBD_ALLOC_PTR(rdpg);
@@ -363,7 +452,7 @@ static int cmm_scan_and_split(const struct lu_env *env,
         rdpg->rp_npages = SPLIT_PAGE_COUNT;
         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
 
-        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
         if (rdpg->rp_pages == NULL)
                 GOTO(free_rdpg, rc = -ENOMEM);
 
@@ -375,91 +464,80 @@ static int cmm_scan_and_split(const struct lu_env *env,
 
         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
-                struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
+                struct lu_fid *lf;
                 __u32 hash_end;
 
+                lf = &ma->ma_lmv->mea_ids[i];
+
                 rdpg->rp_hash = i * hash_segement;
                 hash_end = rdpg->rp_hash + hash_segement;
                 rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
                 if (rc)
                         GOTO(cleanup, rc);
         }
+        EXIT;
 cleanup:
         for (i = 0; i < rdpg->rp_npages; i++)
                 if (rdpg->rp_pages[i] != NULL)
                         __free_pages(rdpg->rp_pages[i], 0);
         if (rdpg->rp_pages)
                 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
-                                         sizeof rdpg->rp_pages[0]);
+                         sizeof rdpg->rp_pages[0]);
 free_rdpg:
         if (rdpg)
                 OBD_FREE_PTR(rdpg);
 
-        RETURN(rc);
-}
-
-static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
-                                  ssize_t len)
-{
-        struct lu_buf *buf;
-        
-        buf = &cmm_env_info(env)->cmi_buf;
-        buf->lb_buf = area;
-        buf->lb_len = len;
-        return buf;
+        return rc;
 }
 
-int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
+/*
+ * Split directory @mo across the other targets if it qualifies.
+ *
+ * Works on the per-env scratch md_attr (cmi_ma), which is zeroed first.
+ * The steps are: decide whether a split is expected, disable transactions
+ * through cmm_upcall(MD_NO_TRANS), create the slave objects, move the
+ * entries, and finally store the LMV EA (MDS_LMV_MD_NAME) on the next-layer
+ * object.
+ *
+ * Returns 0 when no split is expected or when the upcall fails (both are
+ * deliberately not reported as errors), otherwise the result of the first
+ * failing step or of mo_xattr_set(). Any ma_lmv buffer still attached to
+ * the md_attr is freed before returning. @mo is asserted to be a directory.
+ */
+int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct md_attr *ma;
+        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
         struct lu_buf *buf;
         int rc = 0;
         ENTRY;
 
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
+        memset(ma, 0, sizeof(*ma));
 
-        OBD_ALLOC_PTR(ma);
-        if (ma == NULL)
-                RETURN(-ENOMEM);
-
-        ma->ma_need = MA_INODE|MA_LMV;
-        rc = mo_attr_get(env, mo, ma);
-        if (rc)
-                GOTO(cleanup, ma);
-
-        /* step1: checking whether the dir need to be splitted */
+        /* Step1: Checking whether the dir needs to be split. */
         rc = cmm_expect_splitting(env, mo, ma);
         if (rc != CMM_EXPECT_SPLIT)
                 GOTO(cleanup, rc = 0);
 
-        /* Disable trans for splitting, since there will be
-         * so many trans in this one ops, confilct with current
-         * recovery design */
+        /*
+         * Disable transactions for splitting: one split issues so many of
+         * them that it conflicts with the current recovery design.
+         */
         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
         if (rc)
                 GOTO(cleanup, rc = 0);
 
-        /* step2: create slave objects */
-        rc = cmm_create_slave_objects(env, mo, ma);
+        /* Step2: Create slave objects (on slave MDTs) */
+        rc = cmm_slaves_create(env, mo, ma);
+        /*
+         * NOTE(review): this GOTO and the one after step 3 pass ma rather
+         * than rc as the second argument, unlike the other call sites in
+         * this function -- confirm whether rc was intended.
+         */
         if (rc)
                 GOTO(cleanup, ma);
 
-        /* step3: scan and split the object */
+        /* Step3: Scan and split the object. */
         rc = cmm_scan_and_split(env, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
 
         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
-        /* step4: set mea to the master object */
-        rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
-        if (rc == -ERESTART)
-                CWARN("Dir"DFID" has been split \n",
-                                PFID(lu_object_fid(&mo->mo_lu)));
+        
+        /* Step4: Set mea to the master object. */
+        rc = mo_xattr_set(env, md_object_next(mo), buf,
+                          MDS_LMV_MD_NAME, 0);
+        if (rc == -ERESTART) {
+                CWARN("Dir "DFID" has been split\n",
+                      PFID(lu_object_fid(&mo->mo_lu)));
+        }
+        EXIT;
 cleanup:
         if (ma->ma_lmv_size && ma->ma_lmv)
                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
-
-        OBD_FREE_PTR(ma);
-        RETURN(rc);
+        
+        return rc;
 }
+