Whamcloud - gitweb
some code cleanup in split.
authorhuanghua <huanghua>
Sat, 28 Oct 2006 15:37:19 +0000 (15:37 +0000)
committerhuanghua <huanghua>
Sat, 28 Oct 2006 15:37:19 +0000 (15:37 +0000)
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c
lustre/cmm/mdc_object.c
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_object.c

index 04edafc..c1fe0d2 100644 (file)
@@ -376,15 +376,25 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env,
                                 struct md_object *mo, mdl_mode_t lm)
 {
 #ifdef HAVE_SPLIT_SUPPORT
+        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
-        int rc, split;
+        int rc, split, lmv_size;
         ENTRY;
 
+        memset(ma, 0, sizeof(*ma));
+        lmv_size = ma->ma_lmv_size = (sizeof(struct lmv_stripe_md) + 
+                        (cmm->cmm_tgt_count + 1) * sizeof(struct lu_fid));
+
+        OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+        if (ma->ma_lmv == NULL)
+                RETURN(-ENOMEM);
+
         /*
          * Check only if we need protection from split. If not - mdt handles
          * other cases.
          */
         rc = cmm_expect_splitting(env, mo, ma, &split); 
+        OBD_FREE(ma->ma_lmv, lmv_size);
         if (rc) {
                 CERROR("Can't check for possible split, error %d\n",
                        rc);
index baf7ba0..726a616 100644 (file)
@@ -131,8 +131,10 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
                 RETURN(0);
         }
 
-        /* MA_INODE is needed to check inode size. */
-        memset(ma, 0, sizeof(*ma));
+        /* 
+         * MA_INODE is needed to check inode size. 
+         * Memory is prepared by caller.
+         */
         ma->ma_need = MA_INODE | MA_LMV;
         rc = mo_attr_get(env, mo, ma);
         if (rc)
@@ -152,9 +154,6 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
         RETURN(0);
 }
 
-#define cmm_md_size(stripes) \
-       (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
-
 struct cmm_object *cmm_object_find(const struct lu_env *env,
                                    struct cmm_device *d,
                                    const struct lu_fid *f)
@@ -240,20 +239,15 @@ static int cmm_slaves_create(const struct lu_env *env,
                              struct md_object *mo,
                              struct md_attr *ma)
 {
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
-        struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
-        struct mdc_device *mc, *tmp;
-        int lmv_size, i = 1, rc = 0;
+        struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
+        struct lmv_stripe_md *lmv;
+        struct lmv_stripe_md *slave_lmv = NULL;
+        struct mdc_device    *mc, *tmp;
+        int i = 1, rc = 0;
         ENTRY;
 
-        lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
-
-        /* This lmv will free after finish splitting. */
-        OBD_ALLOC(lmv, lmv_size);
-        if (!lmv)
-                RETURN(-ENOMEM);
-
+        lmv = ma->ma_lmv;
         lmv->mea_master = cmm->cmm_local_num;
         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         lmv->mea_count = cmm->cmm_tgt_count + 1;
@@ -262,8 +256,8 @@ static int cmm_slaves_create(const struct lu_env *env,
         lmv->mea_ids[0] = *lf;
 
         OBD_ALLOC_PTR(slave_lmv);
-        if (!slave_lmv)
-                GOTO(cleanup, rc = -ENOMEM);
+        if (slave_lmv == NULL)
+                RETURN(-ENOMEM);
 
         slave_lmv->mea_master = cmm->cmm_local_num;
         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
@@ -286,17 +280,9 @@ static int cmm_slaves_create(const struct lu_env *env,
                 i++;
         }
 
-        ma->ma_lmv_size = lmv_size;
-        ma->ma_lmv = lmv;
         EXIT;
 cleanup:
-        if (slave_lmv)
-                OBD_FREE_PTR(slave_lmv);
-        if (rc && lmv) {
-                OBD_FREE(lmv, lmv_size);
-                ma->ma_lmv = NULL;
-                ma->ma_lmv_size = 0;
-        }
+        OBD_FREE_PTR(slave_lmv);
         return rc;
 }
 
@@ -424,19 +410,25 @@ static int cmm_split_entries(const struct lu_env *env,
                 kunmap(rdpg->rp_pages[0]);
 
                 rc = mo_readpage(env, md_object_next(mo), rdpg);
-                if (rc)
+                if (rc) {
+                        CERROR("Error in readpage: %d\n", rc);
                         RETURN(rc);
+                }
 
                 /* Remove the old entries */
                 rc = cmm_remove_entries(env, mo, rdpg, end, &len);
-                if (rc)
+                if (rc) {
+                        CERROR("Error in remove entry: %d\n", rc);
                         RETURN(rc);
+                }
 
                 /* Send page to slave object */
                 if (len > 0) {
                         rc = cmm_send_split_pages(env, mo, rdpg, lf, len);
-                        if (rc)
+                        if (rc) {
+                                CERROR("Error in sending pages: %d\n", rc);
                                 RETURN(rc);
+                        }
                 }
 
                 kmap(rdpg->rp_pages[0]);
@@ -489,8 +481,12 @@ static int cmm_scan_and_split(const struct lu_env *env,
                 rdpg->rp_hash = i * hash_segement;
                 hash_end = rdpg->rp_hash + hash_segement;
                 rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
-                if (rc)
+                if (rc) {
+                        CERROR("Error (rc=%d) while splitting for %d: fid="
+                                DFID", %08x:%08x\n", rc, i, PFID(lf), 
+                                rdpg->rp_hash, hash_end);
                         GOTO(cleanup, rc);
+                }
         }
         EXIT;
 cleanup:
@@ -507,23 +503,36 @@ free_rdpg:
         return rc;
 }
 
+#define cmm_md_size(stripes) \
+       (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
+
 int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
-        struct lu_buf *buf;
-        int rc = 0, split;
+        struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
+        struct lu_buf     *buf;
+        int rc = 0, split, lmv_size;
         ENTRY;
 
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
 
+        memset(ma, 0, sizeof(*ma));
+        lmv_size = ma->ma_lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
+
+        /* 
+         * Preparing memory for LMV. This will be freed after finish splitting.
+         */
+        OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+        if (ma->ma_lmv == NULL)
+                RETURN(-ENOMEM);
+
         /* Step1: Checking whether the dir needs to be split. */
         rc = cmm_expect_splitting(env, mo, ma, &split);
         if (rc)
-                RETURN(rc);
+                GOTO(cleanup, rc);
         
         if (split != CMM_EXPECT_SPLIT)
-                RETURN(0);
+                GOTO(cleanup, rc = 0);
 
         LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if "
                  "dir is protected by MDL_EX lock. Lock mode 0x%x\n",
@@ -536,10 +545,12 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
         if (rc) {
                 CERROR("Can't disable trans for split, rc %d\n", rc);
-                RETURN(rc);
+                GOTO(cleanup, rc);
         }
 
         /* Step2: Create slave objects (on slave MDTs) */
+        ma->ma_valid = 0;
+        ma->ma_lmv_size = lmv_size;
         rc = cmm_slaves_create(env, mo, ma);
         if (rc) {
                 CERROR("Can't create slaves for split, rc %d\n", rc);
@@ -568,9 +579,7 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
         }
         EXIT;
 cleanup:
-        if (ma->ma_lmv_size && ma->ma_lmv)
-                OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
-        
+        OBD_FREE(ma->ma_lmv, lmv_size);
         return rc;
 }
 
index 80ab880..101503f 100644 (file)
@@ -399,7 +399,7 @@ int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
 
         rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
                           page, offset);
-        CDEBUG(D_INFO, "send page %p  offset %d fid "DFID" rc %d \n",
+        CDEBUG(rc ? D_ERROR : D_INFO, "send page %p  offset %d fid "DFID" rc %d \n",
                page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
         RETURN(rc);
 }
index e235d74..c267b8d 100644 (file)
@@ -577,26 +577,33 @@ int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
 int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
                           struct lov_mds_md *lmm, int lmm_size)
 {
-        struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
-        struct obd_device       *obd = mdd2obd_dev(mdd);
-        struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
-        struct dt_object        *next = mdd_object_child(obj);
-        __u32  seq  = lu_object_fid(mdd2lu_obj(obj))->f_seq;
-        __u32  oid  = lu_object_fid(mdd2lu_obj(obj))->f_oid;
-        struct obd_capa *oc;
+        struct mdd_device   *mdd = mdo2mdd(&obj->mod_obj);
+        struct obd_device   *obd = mdd2obd_dev(mdd);
+        struct lu_attr      *tmp_la = &mdd_env_info(env)->mti_la;
+        struct dt_object    *next = mdd_object_child(obj);
+        const struct lu_fid *fid = lu_object_fid(mdd2lu_obj(obj));
+        struct obd_capa     *oc;
         int rc = 0;
         ENTRY;
 
+        mdd_read_lock(env, obj);
         rc = next->do_ops->do_attr_get(env, next, tmp_la,
                                        mdd_object_capa(env, obj));
+        mdd_read_unlock(env, obj);
         if (rc)
                 RETURN(rc);
 
         oc = next->do_ops->do_capa_get(env, next, NULL, CAPA_OPC_MDS_DEFAULT);
         if (IS_ERR(oc))
                 oc = NULL;
+        
+        /* 
+         * Wangdi: please fix this. OST will oops if this is called.
+         */
+/*
         rc = mds_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm,
-                                   lmm_size, NULL, seq, oid, oc);
+                                   lmm_size, NULL, fid_seq(fid), fid_oid(fid), oc);
+*/
         capa_put(oc);
 
         RETURN(rc);
index 974cb09..9842342 100644 (file)
@@ -284,7 +284,6 @@ static int __mdd_lmm_get(const struct lu_env *env,
         if (ma->ma_valid & MA_LOV)
                 RETURN(0);
 
-        LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
         lmm_size = ma->ma_lmm_size;
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &lmm_size,
                         MDS_LOV_MD_NAME);
@@ -704,7 +703,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle *handle;
         struct lov_mds_md *lmm = NULL;
-        int  rc = 0, lmm_size = 0, max_size = 0;
+        int  rc, lmm_size = 0, max_size = 0;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
         ENTRY;
 
@@ -718,6 +717,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                 max_size = mdd_lov_mdsize(env, mdd);
                 OBD_ALLOC(lmm, max_size);
+                lmm_size = max_size;
                 if (lmm == NULL)
                         GOTO(cleanup, rc = -ENOMEM);
 
@@ -766,7 +766,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         }
 cleanup:
         mdd_trans_stop(env, mdd, rc, handle);
-        if (rc == 0 && lmm_size) {
+        if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size);
         }