Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Wed, 13 Sep 2006 15:39:57 +0000 (15:39 +0000)
committerwangdi <wangdi>
Wed, 13 Sep 2006 15:39:57 +0000 (15:39 +0000)
A lot of fixes for directory splitting

23 files changed:
lustre/cmm/cmm_split.c
lustre/cmm/mdc_device.c
lustre/cmm/mdc_internal.h
lustre/cmm/mdc_object.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_req_layout.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/liblustre/namei.c
lustre/liblustre/super.c
lustre/llite/llite_lib.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/lmv/lmv_object.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_handler.c
lustre/mds/handler.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_open.c
lustre/obdclass/mea.c
lustre/ptlrpc/layout.c

index af7f9c2..ce0bffc 100644 (file)
@@ -36,6 +36,7 @@
 #include <obd_class.h>
 #include <lustre_fid.h>
 #include <lustre_mds.h>
+#include <lustre_idl.h>
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
@@ -44,7 +45,7 @@
 #define CMM_NO_SPLITTABLE       2
 
 enum {
-        SPLIT_SIZE =  8*1024
+        SPLIT_SIZE =  12*1024
 };
 
 static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
@@ -74,6 +75,8 @@ static int cmm_expect_splitting(const struct lu_context *ctx,
         if (rc)
                 GOTO(cleanup, rc);
 
+        rc = CMM_EXPECT_SPLIT;
+
         if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
@@ -84,7 +87,7 @@ cleanup:
 }
 
 #define cmm_md_size(stripes)                            \
-       (sizeof(struct lmv_stripe_md) + stripes * sizeof(struct lu_fid))
+       (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
 
 static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
                          struct lu_fid *fid, int count)
@@ -100,7 +103,7 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
                                  mc_linkage) {
                 LASSERT(cmm->cmm_local_num != mc->mc_num);
 
-                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
+                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
                 if (rc > 0) {
                         struct lu_site *ls;
 
@@ -113,6 +116,7 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
                         spin_unlock(&cmm->cmm_tgt_guard);
                         RETURN(rc);
                 }
+                i++;
         }
         spin_unlock(&cmm->cmm_tgt_guard);
         LASSERT(i == count);
@@ -183,9 +187,9 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
         if (!lmv)
                 RETURN(-ENOMEM);
 
-        lmv->mea_master = -1;
-        lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
-        lmv->mea_count = cmm->cmm_tgt_count;
+        lmv->mea_master = cmm->cmm_local_num;
+        lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
+        lmv->mea_count = cmm->cmm_tgt_count + 1;
 
         lmv->mea_ids[0] = *lf;
 
@@ -193,15 +197,12 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
         if (rc)
                 GOTO(cleanup, rc);
 
-        for (i = 1; i < cmm->cmm_tgt_count; i ++) {
+        for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
                 rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
                 if (rc)
                         GOTO(cleanup, rc);
         }
 
-        rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size,
-                          MDS_LMV_MD_NAME, 0);
-
         ma->ma_lmv_size = lmv_size;
         ma->ma_lmv = lmv;
 cleanup:
@@ -290,7 +291,6 @@ static int cmm_remove_entries(const struct lu_context *ctx,
         RETURN(rc);
 }
 #endif
-#define MAX_HASH_SIZE 0x3fffffff
 #define SPLIT_PAGE_COUNT 1
 static int cmm_scan_and_split(const struct lu_context *ctx,
                               struct md_object *mo, struct md_attr *ma)
@@ -317,8 +317,8 @@ static int cmm_scan_and_split(const struct lu_context *ctx,
                         GOTO(cleanup, rc = -ENOMEM);
         }
 
-        hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
-        for (i = 1; i < cmm->cmm_tgt_count; i++) {
+        hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
+        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
                 struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
                 __u32 hash_end;
 
@@ -355,7 +355,7 @@ int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
         if (ma == NULL)
                 RETURN(-ENOMEM);
 
-        ma->ma_need = MA_INODE;
+        ma->ma_need = MA_INODE|MA_LMV;
         rc = mo_attr_get(ctx, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
@@ -372,7 +372,16 @@ int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
 
         /* step3: scan and split the object */
         rc = cmm_scan_and_split(ctx, mo, ma);
+        if (rc)
+                GOTO(cleanup, ma);
+
+        /* step4: set mea to the master object */
+        rc = mo_xattr_set(ctx, md_object_next(mo), ma->ma_lmv, ma->ma_lmv_size,
+                          MDS_LMV_MD_NAME, 0);
 
+        if (rc == -ERESTART) 
+                CWARN("Dir"DFID" has been split \n", 
+                                PFID(lu_object_fid(&mo->mo_lu)));
 cleanup:
         if (ma->ma_lmv_size && ma->ma_lmv)
                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
index a476cc2..1780677 100644 (file)
@@ -35,6 +35,7 @@
 #include <obd_class.h>
 #include <lprocfs_status.h>
 #include <lustre_ver.h>
+#include "cmm_internal.h"
 #include "mdc_internal.h"
 
 static struct lu_device_operations mdc_lu_ops;
index f5a5d30..a86f358 100644 (file)
@@ -96,8 +96,8 @@ struct lu_object *mdc_object_alloc(const struct lu_context *,
                                    const struct lu_object_header *,
                                    struct lu_device *);
 #ifdef HAVE_SPLIT_SUPPORT
-int mdc_send_page(const struct lu_context *ctx, struct md_object *mo,
-                  struct page *page, __u32 end);
+int mdc_send_page(struct cmm_device *cmm, const struct lu_context *ctx, 
+                  struct md_object *mo, struct page *page, __u32 end);
 #endif
 
 #endif /* __KERNEL__ */
index 5f595be..10bf0e9 100644 (file)
@@ -35,6 +35,7 @@
 #include <lustre_lib.h>
 #include <obd_class.h>
 #include <lustre_mdc.h>
+#include "cmm_internal.h"
 #include "mdc_internal.h"
 
 static struct md_object_operations mdc_mo_ops;
@@ -256,14 +257,19 @@ int mdc_send_page(struct cmm_device *cm, const struct lu_context *ctx,
 
         kmap(page);
         dp = page_address(page);
+
+        ent = lu_dirent_start(dp);
+        if (ent->lde_hash > end)
+                RETURN(-E2BIG);
+
         for (ent = lu_dirent_start(dp); ent != NULL;
              ent = lu_dirent_next(ent)) {
-                if (ent->lde_hash < end) {
+                if (ent->lde_hash > end) {
                         offset = (int)((__u32)ent - (__u32)dp);
                         rc1 = -E2BIG;
                         goto send_page;
                 }
-                        
+
                 /* allocate new fid for each obj */
                 rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL);
                 if (rc > 0) {
@@ -274,7 +280,7 @@ int mdc_send_page(struct cmm_device *cm, const struct lu_context *ctx,
                                                fid_seq(&ent->lde_fid),
                                                mc->mc_num, ctx);
                 }
-                
+
                 if (rc < 0) {
                         kunmap(page);
                         RETURN(rc);
index 17fd2d2..ead982e 100644 (file)
@@ -298,6 +298,7 @@ static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
 
 #define MEA_MAGIC_LAST_CHAR      0xb2221ca1
 #define MEA_MAGIC_ALL_CHARS      0xb222a11c
+#define MEA_MAGIC_HASH_SEGMENT   0xb222a11b
 
 struct lmv_stripe_md {
         __u32         mea_magic;
index 5f33a29..46348ad 100644 (file)
@@ -109,6 +109,7 @@ extern const struct req_format RQF_MDS_PIN;
 extern const struct req_format RQF_MDS_CONNECT;
 extern const struct req_format RQF_MDS_DISCONNECT;
 extern const struct req_format RQF_MDS_READPAGE;
+extern const struct req_format RQF_MDS_WRITEPAGE;
 extern const struct req_format RQF_MDS_DONE_WRITING;
 
 /*
index debc5cf..640a5f6 100644 (file)
@@ -1159,7 +1159,8 @@ struct md_ops {
         int (*m_init_ea_size)(struct obd_export *, int, int, int);
 
         int (*m_get_lustre_md)(struct obd_export *, struct ptlrpc_request *,
-                               int, struct obd_export *, struct lustre_md *);
+                               int, struct obd_export *, struct obd_export *, 
+                               struct lustre_md *);
 
         int (*m_free_lustre_md)(struct obd_export *, struct lustre_md *);
 
index 0d94bc0..995d2af 100644 (file)
@@ -1767,13 +1767,14 @@ static inline int md_unlink(struct obd_export *exp, struct md_op_data *op_data,
 static inline int md_get_lustre_md(struct obd_export *exp,
                                    struct ptlrpc_request *req,
                                    int offset, struct obd_export *dt_exp,
+                                   struct obd_export *md_exp,
                                    struct lustre_md *md)
 {
         ENTRY;
         EXP_CHECK_MD_OP(exp, get_lustre_md);
         MD_COUNTER_INCREMENT(exp->exp_obd, get_lustre_md);
         RETURN(MDP(exp->exp_obd, get_lustre_md)(exp, req, offset,
-                                                dt_exp, md));
+                                                dt_exp, md_exp, md));
 }
 
 static inline int md_free_lustre_md(struct obd_export *exp,
index aea7407..9da93a1 100644 (file)
@@ -214,7 +214,8 @@ static int pnode_revalidate_finish(struct ptlrpc_request *req,
                 RETURN(-ENOENT);
 
         rc = md_get_lustre_md(llu_i2sbi(inode)->ll_md_exp, req,
-                              offset, llu_i2sbi(inode)->ll_dt_exp, &md);
+                              offset, llu_i2sbi(inode)->ll_dt_exp, 
+                              llu_i2sbi(inode)->ll_md_exp, &md);
         if (rc)
                 RETURN(rc);
 
@@ -357,7 +358,7 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset,
                         ptlrpc_req_finished(request);
 
                 rc = md_get_lustre_md(sbi->ll_md_exp, request, offset,
-                                      sbi->ll_dt_exp, &md);
+                                      sbi->ll_dt_exp, sbi->ll_md_exp, &md);
                 if (rc)
                         RETURN(rc);
 
index 3676070..6dad0bc 100644 (file)
@@ -454,7 +454,7 @@ static int llu_inode_revalidate(struct inode *inode)
                         RETURN(-abs(rc));
                 }
                 rc = md_get_lustre_md(sbi->ll_md_exp, req, REPLY_REC_OFF,
-                                      sbi->ll_dt_exp, &md);
+                                      sbi->ll_dt_exp, sbi->ll_md_exp, &md);
 
                 /* XXX Too paranoid? */
                 if (((md.body->valid ^ valid) & OBD_MD_FLEASIZE) &&
@@ -701,7 +701,7 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr)
                 }
 
                 rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF,
-                                      sbi->ll_dt_exp, &md);
+                                      sbi->ll_dt_exp, sbi->ll_md_exp, &md);
                 if (rc) {
                         ptlrpc_req_finished(request);
                         RETURN(rc);
@@ -1730,7 +1730,7 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
         }
         
         rc = md_get_lustre_md(sbi->ll_md_exp, req,
-                              1, sbi->ll_dt_exp, &md);
+                              1, sbi->ll_dt_exp, sbi->ll_md_exp, &md);
         if (rc)
                 GOTO(out, rc);
         
@@ -2085,7 +2085,7 @@ llu_fsswop_mount(const char *source,
         }
 
         err = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF,
-                               sbi->ll_dt_exp, &md);
+                               sbi->ll_dt_exp, sbi->ll_md_exp, &md);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n",err);
                 GOTO(out_request, err);
index 58df98f..d36139f 100644 (file)
@@ -342,7 +342,8 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
         }
 
         err = md_get_lustre_md(sbi->ll_md_exp, request, 
-                               REPLY_REC_OFF, sbi->ll_dt_exp, &md);
+                               REPLY_REC_OFF, sbi->ll_dt_exp, sbi->ll_md_exp, 
+                               &md);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n", err);
                 ptlrpc_req_finished (request);
@@ -1279,7 +1280,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 }
 
                 rc = md_get_lustre_md(sbi->ll_md_exp, request, 
-                                      REPLY_REC_OFF, sbi->ll_dt_exp, &md);
+                                      REPLY_REC_OFF, sbi->ll_dt_exp, 
+                                      sbi->ll_md_exp, &md);
                 if (rc) {
                         ptlrpc_req_finished(request);
                         RETURN(rc);
@@ -1935,7 +1937,7 @@ int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
         prune_deathrow(sbi, 1);
 
         rc = md_get_lustre_md(sbi->ll_md_exp, req, offset,
-                              sbi->ll_dt_exp, &md);
+                              sbi->ll_dt_exp, sbi->ll_md_exp, &md);
         if (rc)
                 RETURN(rc);
 
index 3d5d58b..efe7337 100644 (file)
@@ -37,6 +37,7 @@
 #endif
 
 #include <lustre/lustre_idl.h>
+#include <lustre_idl.h>
 #include <obd_support.h>
 #include <lustre_lib.h>
 #include <lustre_net.h>
@@ -178,10 +179,14 @@ repeat:
                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                    (char *)op_data->name, op_data->namelen);
 
-                CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
-                       mds, PFID(&rpid));
                 rpid = obj->lo_inodes[mds].li_fid;
+                rc = lmv_fld_lookup(lmv, &rpid, &mds);
                 lmv_obj_put(obj);
+                if (rc) 
+                        GOTO(out_free_sop_data, rc);
+                
+                CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
+                       mds, PFID(&rpid));
         }
 
         sop_data->fid1 = rpid;
@@ -878,7 +883,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                 LASSERT(body);
 
 update:
-                obj->lo_inodes[i].li_size = body->size;
+                obj->lo_inodes[i].li_size = (MAX_HASH_SIZE/obj->lo_objcount) * 
+                                            (i + 1);
 
                 CDEBUG(D_OTHER, "fresh: %lu\n",
                        (unsigned long)obj->lo_inodes[i].li_size);
index d9342be..24644a0 100644 (file)
@@ -29,6 +29,7 @@
 #include <linux/init.h>
 #include <linux/slab.h>
 #include <linux/pagemap.h>
+#include <linux/mm.h>
 #include <asm/div64.h>
 #include <linux/seq_file.h>
 #include <linux/namei.h>
@@ -38,6 +39,7 @@
 #include <linux/ext2_fs.h>
 
 #include <lustre/lustre_idl.h>
+#include <lustre_idl.h>
 #include <lustre_log.h>
 #include <obd_support.h>
 #include <lustre_lib.h>
@@ -694,9 +696,37 @@ static int lmv_placement_policy(struct obd_device *obd,
 
 #endif
                 } else {
-                        /* default policy is to use parent MDS */
+                        struct lmv_obj *obj;
                         LASSERT(fid_is_sane(hint->ph_pfid));
-                        rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
+                        
+                        obj = lmv_obj_grab(obd, hint->ph_pfid);
+                        if (obj) {
+                                /* If the dir got split, alloc fid according 
+                                 * to its hash
+                                 */
+                                struct lu_fid *rpid;
+
+                                *mds = raw_name2idx(obj->lo_hashtype, 
+                                                    obj->lo_objcount,
+                                                    hint->ph_cname->name, 
+                                                    hint->ph_cname->len);
+                                rpid = &obj->lo_inodes[*mds].li_fid;
+                                rc = lmv_fld_lookup(lmv, rpid, mds);
+                                if (rc) {
+                                        lmv_obj_put(obj);
+                                        GOTO(exit, rc);
+                                }
+                                CDEBUG(D_INODE, "the obj "DFID" has been"
+                                       "splitted,got MDS at "LPU64" by name %s\n", 
+                                       PFID(hint->ph_pfid), *mds, 
+                                       hint->ph_cname->name);
+
+                                rc = 0;
+                        } else {
+                                /* default policy is to use parent MDS */
+                                rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
+                        }
+                        
                 }
         } else {
                 /* sequences among all tgts are not well balanced, allocate new
@@ -705,7 +735,7 @@ static int lmv_placement_policy(struct obd_device *obd,
                 *mds = 0;
                 rc = -EINVAL;
         }
-
+exit:
         if (rc) {
                 CERROR("cannot choose MDS, err = %d\n", rc);
         } else {
@@ -1180,7 +1210,7 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
                 GOTO(cleanup, rc);
         }
 
-        rc = md_get_lustre_md(tgt_exp, req, 0, NULL, &md);
+        rc = md_get_lustre_md(tgt_exp, req, 1, NULL, exp, &md);
         if (rc) {
                 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
                 GOTO(cleanup, rc);
@@ -1824,7 +1854,7 @@ static int lmv_readpage(struct obd_export *exp,
         struct obd_export *tgt_exp;
         struct lu_fid rid = *fid;
         struct lmv_obj *obj;
-        int i, rc;
+        int i = 0, rc;
         ENTRY;
 
         rc = lmv_check_connect(obd);
@@ -1847,22 +1877,47 @@ static int lmv_readpage(struct obd_export *exp,
                 rid = obj->lo_inodes[i].li_fid;
 
                 lmv_obj_unlock(obj);
-                lmv_obj_put(obj);
 
                 CDEBUG(D_OTHER, "forward to "DFID" with offset %lu\n",
                        PFID(&rid), (unsigned long)offset);
         }
-        
+
         tgt_exp = lmv_get_export(lmv, &rid);
         if (IS_ERR(tgt_exp))
-                RETURN(PTR_ERR(tgt_exp));
+                GOTO(cleanup, PTR_ERR(tgt_exp));
 
         rc = md_readpage(tgt_exp, &rid, offset, page, request);
-
+        if (rc) 
+                GOTO(cleanup, rc);
+#ifdef __KERNEL__
+        if (obj && i < obj->lo_objcount - 1) {
+                struct lu_dirpage *dp;
+                __u32 end;
+               /* This directory object has been split, so check
+                * whether we have reached the end of one hash segment
+                * and, if so, reset dp->ldp_hash_end.
+                */
+                kmap(page);
+                dp = page_address(page); 
+                end = le32_to_cpu(dp->ldp_hash_end);
+                if (end == ~0ul) {
+                        __u32 hash_segment_end = (i + 1) * 
+                                            MAX_HASH_SIZE/obj->lo_objcount;
+                        dp->ldp_hash_end = cpu_to_le32(hash_segment_end); 
+                        CDEBUG(D_INFO,"reset hash end %x for split obj "DFID"",
+                               le32_to_cpu(dp->ldp_hash_end), PFID(&rid)); 
+                }
+                kunmap(page);
+                
+        }
+#endif
         /*
          * Here we could remove "." and ".." from all pages which at not from
          * master. But MDS has only "." and ".." for master dir.
          */
+cleanup:
+        if (obj)
+                lmv_obj_put(obj);
         RETURN(rc);
 }
 
@@ -2178,7 +2233,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                 RETURN(mea_size);
 
         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
-            mea->mea_magic == MEA_MAGIC_ALL_CHARS)
+            mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
+            mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
         {
                 magic = le32_to_cpu(mea->mea_magic);
         } else {
@@ -2256,15 +2312,16 @@ int lmv_lock_match(struct obd_export *exp, int flags,
 }
 
 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
-                      int offset, struct obd_export *dt_exp,
-                      struct lustre_md *md)
+                      int offset, struct obd_export *dt_exp, 
+                      struct obd_export *md_exp, struct lustre_md *md)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         int rc;
 
         ENTRY;
-        rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md);
+        rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md_exp,
+                              md);
         RETURN(rc);
 }
 
index ab48f9f..ddd126c 100644 (file)
@@ -40,6 +40,7 @@
 #include <lustre_lib.h>
 #include <lustre_net.h>
 #include <lustre_dlm.h>
+#include <lustre_idl.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
 #include "lmv_internal.h"
@@ -64,7 +65,8 @@ lmv_obj_alloc(struct obd_device *obd,
         struct lmv_obd *lmv = &obd->u.lmv;
 
         LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR
-                || mea->mea_magic == MEA_MAGIC_ALL_CHARS);
+                || mea->mea_magic == MEA_MAGIC_ALL_CHARS 
+                || mea->mea_magic == MEA_MAGIC_HASH_SEGMENT);
 
         OBD_SLAB_ALLOC(obj, obj_cache, CFS_ALLOC_STD,
                        sizeof(*obj));
@@ -317,7 +319,7 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
                         GOTO(cleanup, obj = ERR_PTR(rc));
                 }
 
-                rc = md_get_lustre_md(exp, req, 0, NULL, &md);
+                rc = md_get_lustre_md(exp, req, 0, NULL, exp, &md);
                 if (rc) {
                         CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
                         GOTO(cleanup, obj = ERR_PTR(rc));
index e1c73a0..43dcc49 100644 (file)
@@ -157,7 +157,9 @@ int mdc_open(struct obd_export *exp, obd_id ino, int type, int flags,
 struct obd_client_handle;
 
 int mdc_get_lustre_md(struct obd_export *md_exp, struct ptlrpc_request *req,
-                      int offset, struct obd_export *dt_exp, struct lustre_md *md);
+                      int offset, struct obd_export *dt_exp, 
+                      struct obd_export *lmv_exp,
+                      struct lustre_md *md);
 
 int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md);
 
index a6a5894..43f9dbc 100644 (file)
@@ -509,7 +509,18 @@ int mdc_enqueue(struct obd_export *exp,
                     !it_open_error(DISP_OPEN_OPEN, it))
                         mdc_set_open_replay_data(NULL, NULL, req);
 
-                if ((body->valid & OBD_MD_FLEASIZE) != 0) {
+                if ((body->valid & OBD_MD_FLDIREA) != 0) {
+                        if (body->eadatasize) {
+                                eadata = lustre_swab_repbuf(req, 
+                                                DLM_REPLY_REC_OFF + 1,
+                                                body->eadatasize, NULL);
+                                if (eadata == NULL) {
+                                        CERROR ("Missing/short eadata\n");
+                                        RETURN (-EPROTO);
+                                }
+                        }
+                }
+                if ((body->valid & OBD_MD_FLEASIZE)) {
                         /* The eadata is opaque; just check that it is there.
                          * Eventually, obd_unpackmd() will check the contents */
                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
index 6a7edaa..dfd3599 100644 (file)
@@ -379,7 +379,9 @@ int mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req,
 #endif
 
 int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
-                      int offset, struct obd_export *dt_exp, struct lustre_md *md)
+                      int offset, struct obd_export *dt_exp, 
+                      struct obd_export *md_exp, 
+                      struct lustre_md *md)
 {
         int rc = 0;
         ENTRY;
@@ -420,9 +422,28 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
 
                 offset++;
         } else if (md->body->valid & OBD_MD_FLDIREA) {
-                /* TODO: umka, please handle this case */
+                int lmvsize;
+                struct lov_mds_md *lmv;
                 LASSERT(S_ISDIR(md->body->mode));
-                offset++;
+        
+                if (md->body->eadatasize == 0) {
+                        RETURN(0);
+                }
+                if (md->body->valid & OBD_MD_MEA) {
+                        lmvsize = md->body->eadatasize;
+                        lmv = lustre_msg_buf(req->rq_repmsg, offset, lmvsize);
+                        LASSERT (lmv != NULL);
+                        LASSERT_REPSWABBED(req, offset);
+
+                        rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv, 
+                                          lmvsize);
+                        if (rc < 0)
+                                RETURN(rc);
+
+                        LASSERT (rc >= sizeof (*md->mea));
+                }
+                rc = 0;
+                offset ++; 
         }
 
         /* for ACL, it's possible that FLACL is set but aclsize is zero.  only
index 5d9772a..01099fd 100644 (file)
@@ -281,7 +281,6 @@ static int __mdd_lmm_get(const struct lu_context *ctxt,
         RETURN(rc);
 }
 
-#ifdef HAVE_SPLIT_SUPPORT
 /* get lmv EA only*/
 static int __mdd_lmv_get(const struct lu_context *ctxt,
                          struct mdd_object *mdd_obj, struct md_attr *ma)
@@ -296,7 +295,6 @@ static int __mdd_lmv_get(const struct lu_context *ctxt,
         }
         RETURN(rc);
 }
-#endif
 
 static int mdd_attr_get_internal(const struct lu_context *ctxt,
                                  struct mdd_object *mdd_obj,
@@ -313,12 +311,10 @@ static int mdd_attr_get_internal(const struct lu_context *ctxt,
                     S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmm_get(ctxt, mdd_obj, ma);
         }
-#ifdef HAVE_SPLIT_SUPPORT
         if (rc == 0 && ma->ma_need & MA_LMV) {
                 if (S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmv_get(ctxt, mdd_obj, ma);
         }
-#endif
         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
                         rc, ma->ma_valid);
         RETURN(rc);
@@ -586,7 +582,7 @@ static int mdd_recovery_complete(const struct lu_context *ctxt,
         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
         int rc;
         ENTRY;
-/* TODO:
+        /* TODO:
         rc = mdd_lov_set_nextid(ctx, mdd);
         if (rc) {
                 CERROR("%s: mdd_lov_set_nextid failed %d\n",
@@ -730,6 +726,18 @@ static int __mdd_xattr_set(const struct lu_context *ctxt, struct mdd_object *o,
         if (buf && buf_len > 0) {
                 rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
                                                 0, handle);
+#ifdef HAVE_SPLIT_SUPPORT
+                if (rc == 0) {
+                        /* Very ugly hack: setting the LMV EA means the split
+                         * succeeded, and we should return -ERESTART to notify
+                         * the client. The transno for this split should be
+                         * zero according to the replay rules, so return
+                         * -ERESTART here to let the mdt trans stop callback know.
+                         */
+                        if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0) 
+                                rc = -ERESTART;
+                }
+#endif
         }else if (buf == NULL && buf_len == 0) {
                 rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
         }
index 4458f76..994e3ad 100644 (file)
@@ -1362,6 +1362,7 @@ int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_GETATTR_NAME:
         case MDS_STATFS:
         case MDS_READPAGE:
+        case MDS_WRITEPAGE:
         case MDS_REINT:
         case MDS_CLOSE:
         case MDS_DONE_WRITING:
index dc44085..265a610 100644 (file)
@@ -254,10 +254,18 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
-        ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
-
-        ma->ma_need = MA_INODE | MA_LOV;
+        if(reqbody->valid & OBD_MD_MEA) {
+                /* Assumption: MDT_MD size is enough for lmv size FIXME */
+                ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
+                ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD, 
+                                                             RCL_SERVER);
+                ma->ma_need = MA_INODE | MA_LMV;
+        } else {
+                ma->ma_need = MA_INODE | MA_LOV ;
+                ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
+                ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
+                                                             RCL_SERVER);
+        }
         rc = mo_attr_get(ctxt, next, ma);
         if (rc == -EREMOTE) {
                 /* This object is located on remote node.*/
@@ -285,6 +293,12 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                         else
                                 repbody->valid |= OBD_MD_FLEASIZE;
                 }
+                if (ma->ma_valid & MA_LMV) {
+                        LASSERT(S_ISDIR(la->la_mode));
+                        repbody->eadatasize = ma->ma_lmv_size;
+                        repbody->valid |= OBD_MD_FLDIREA;
+                        repbody->valid |= OBD_MD_MEA;
+                }
         } else if (S_ISLNK(la->la_mode) &&
                           reqbody->valid & OBD_MD_LINKNAME) {
                 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
@@ -589,7 +603,7 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
                 rc = mdo_name_insert(info->mti_ctxt,
                                      md_object_next(&object->mot_obj),
                                      ent->lde_name, lf, 0);
-                /* FIXME: add cross_flags */
+                CDEBUG(D_INFO, "insert name %s rc %d \n", ent->lde_name, rc);
                 if (rc) {
                         kunmap(page);
                         RETURN(rc);
@@ -618,7 +632,7 @@ static int mdt_writepage(struct mdt_thread_info *info)
         ENTRY;
 
         desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
-        if (desc)
+        if (!desc)
                 RETURN(-ENOMEM);
 
         /* allocate the page for the desc */
index 4278d8d..6ba8fed 100644 (file)
@@ -693,15 +693,19 @@ int mdt_open(struct mdt_thread_info *info)
 
         if (result == -ENOENT) {
                 /* not found and with MDS_OPEN_CREAT: let's create it */
-                mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                 result = mdo_create(info->mti_ctxt,
                                     mdt_object_child(parent),
                                     rr->rr_name,
                                     mdt_object_child(child),
                                     &info->mti_spec,
                                     &info->mti_attr);
-                if (result != 0)
+                if (result == -ERESTART)
                         GOTO(out_child, result);
+                else {        
+                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
+                        if (result != 0)
+                                GOTO(out_child, result);
+                }
                 created = 1;
         } else {
                 /* we have to get attr & lov ea for this object*/
index 2de1de6..317af8d 100644 (file)
@@ -33,6 +33,7 @@
 #include <obd.h>
 #endif
 #include <lprocfs_status.h>
+#include <lustre_idl.h>
 
 static int mea_last_char_hash(int count, char *name, int namelen)
 {
@@ -55,6 +56,19 @@ static int mea_all_chars_hash(int count, char *name, int namelen)
         return c;
 }
 
+/* This hash calculation must be the same as the lvar hash method */
+static int mea_hash_segment(int count, char *name, int namelen)
+{
+        __u32 result = 0;
+        __u32 hash_segment = MAX_HASH_SIZE / count;
+        
+        strncpy((void *)&result, name, min(namelen, (int)sizeof result));
+
+        result = (result << 1) & 0x7fffffff;
+
+        return result / hash_segment;
+}
+
 int raw_name2idx(int hashtype, int count, const char *name, int namelen)
 {
         unsigned int c = 0;
@@ -70,6 +84,9 @@ int raw_name2idx(int hashtype, int count, const char *name, int namelen)
                 case MEA_MAGIC_ALL_CHARS:
                         c = mea_all_chars_hash(count, (char *) name, namelen);
                         break;
+                case MEA_MAGIC_HASH_SEGMENT:
+                        c = mea_hash_segment(count, (char *) name, namelen);
+                        break;
                 default:
                         CERROR("unknown hash type 0x%x\n", hashtype);
         }
index be3d5c8..76c9ecc 100644 (file)
@@ -291,6 +291,7 @@ static const struct req_format *req_formats[] = {
         &RQF_MDS_CLOSE,
         &RQF_MDS_PIN,
         &RQF_MDS_READPAGE,
+        &RQF_MDS_WRITEPAGE,
         &RQF_MDS_DONE_WRITING
 };
 
@@ -612,6 +613,11 @@ const struct req_format RQF_MDS_READPAGE =
                         mdt_body_only, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_READPAGE);
 
+const struct req_format RQF_MDS_WRITEPAGE =
+        DEFINE_REQ_FMT0("MDS_WRITEPAGE",
+                        mdt_body_only, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_WRITEPAGE);
+
 #if !defined(__REQ_LAYOUT_USER__)
 
 int req_layout_init(void)