From: wang di Date: Thu, 3 Apr 2014 13:09:24 +0000 (-0700) Subject: LU-4690 lod: separate master object with master stripe X-Git-Tag: 2.5.59~34 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=60e07b972114df24105a3a1bfa7365892f72a4a7 LU-4690 lod: separate master object with master stripe Separate the master stripe from the master object, so 1. stripeEA only exists on master object. 2. sub-stripe object will be inserted into master object as sub-directory, and it can get the master object by "..". By this, it will remove those special cases for stripe0 in LMV and LOD. And also simplify LFSCK, i.e. consistency check would be easier. And also after this separation, LOD will easily know whether it is iterating the whole stripe or a single stripe, i.e. for master_object, it will iterate the whole stripes, for sub_stripe, it will only iterate the single stripe. This patch also fixes a few things in osp orphan iteration, to make it work with remote dir entry iteration. When the master object becomes an orphan, we should mark all of its sub-stripes as dead objects as well, otherwise client might still be able to create files under these stripes. A few fixes for striped directory layout lock: 1. stripe 0 should be locked as EX, same as other stripes. 2. Acquire the layout for directory, when it is being unlinked. Signed-off-by: wang di Change-Id: I6212fb97a2360664b48e0a75424a89c857da2043 Reviewed-on: http://review.whamcloud.com/9511 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: John L. 
Hammond Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 4aa058b..0f02ba2 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -630,6 +630,9 @@ struct dt_index_operations { const struct dt_it *di, struct dt_rec *rec, __u32 attr); + int (*rec_size)(const struct lu_env *env, + const struct dt_it *di, + __u32 attr); __u64 (*store)(const struct lu_env *env, const struct dt_it *di); int (*load)(const struct lu_env *env, diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index b27d33f..cbf72c9 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -2741,18 +2741,47 @@ struct lmv_desc { extern void lustre_swab_lmv_desc (struct lmv_desc *ld); -/* lmv structures */ -#define LMV_MAGIC_V1 0x0CD10CD0 /* normal stripe lmv magic */ -#define LMV_USER_MAGIC 0x0CD20CD0 /* default lmv magic*/ -#define LMV_MAGIC_MIGRATE 0x0CD30CD0 /* migrate stripe lmv magic */ +/* LMV layout EA, and it will be stored both in master and slave object */ +struct lmv_mds_md_v1 { + __u32 lmv_magic; + __u32 lmv_stripe_count; + __u32 lmv_master_mdt_index; /* On master object, it is master + * MDT index, on slave object, it + * is stripe index of the slave obj */ + __u32 lmv_hash_type; /* dir stripe policy, i.e. indicate + * which hash function to be used, + * Note: only lower 16 bits is being + * used for now. Higher 16 bits will + * be used to mark the object status, + * for example migrating or dead. 
*/ + __u32 lmv_layout_version; /* Used for directory restriping */ + __u32 lmv_padding; + struct lu_fid lmv_master_fid; /* The FID of the master object, which + * is the namespace-visible dir FID */ + char lmv_pool_name[LOV_MAXPOOLNAME]; /* pool name */ + struct lu_fid lmv_stripe_fids[0]; /* FIDs for each stripe */ +}; + +#define LMV_MAGIC_V1 0x0CD20CD0 /* normal stripe lmv magic */ #define LMV_MAGIC LMV_MAGIC_V1 +/* #define LMV_USER_MAGIC 0x0CD30CD0 */ +#define LMV_MAGIC_STRIPE 0x0CD40CD0 /* magic for dir sub_stripe */ + +/* Right now only the lower part(0-16bits) of lmv_hash_type is being used, + * and the higher part will be the flag to indicate the status of object, + * for example the object is being migrated. And the hash function + * might be interpreted differently with different flags. */ enum lmv_hash_type { LMV_HASH_TYPE_ALL_CHARS = 1, LMV_HASH_TYPE_FNV_1A_64 = 2, - LMV_HASH_TYPE_MIGRATION = 3, }; +#define LMV_HASH_TYPE_MASK 0x0000ffff + +#define LMV_HASH_FLAG_MIGRATION 0x80000000 +#define LMV_HASH_FLAG_DEAD 0x40000000 + #define LMV_HASH_NAME_ALL_CHARS "all_char" #define LMV_HASH_NAME_FNV_1A_64 "fnv_1a_64" @@ -2784,18 +2813,6 @@ static inline __u64 lustre_hash_fnv_1a_64(const void *buf, size_t size) return hash; } -struct lmv_mds_md_v1 { - __u32 lmv_magic; - __u32 lmv_stripe_count; /* stripe count */ - __u32 lmv_master_mdt_index; /* master MDT index */ - __u32 lmv_hash_type; /* dir stripe policy, i.e. 
indicate - * which hash function to be used */ - __u32 lmv_layout_version; /* Used for directory restriping */ - __u32 lmv_padding; - char lmv_pool_name[LOV_MAXPOOLNAME]; /* pool name */ - struct lu_fid lmv_stripe_fids[0]; /* FIDs for each stripe */ -}; - union lmv_mds_md { __u32 lmv_magic; struct lmv_mds_md_v1 lmv_md_v1; @@ -2807,8 +2824,7 @@ extern void lustre_swab_lmv_mds_md(union lmv_mds_md *lmm); static inline int lmv_mds_md_size(int stripe_count, unsigned int lmm_magic) { switch (lmm_magic) { - case LMV_MAGIC_V1: - case LMV_MAGIC_MIGRATE: { + case LMV_MAGIC_V1:{ struct lmv_mds_md_v1 *lmm1; return sizeof(*lmm1) + stripe_count * @@ -2823,7 +2839,6 @@ static inline int lmv_mds_md_stripe_count_get(const union lmv_mds_md *lmm) { switch (le32_to_cpu(lmm->lmv_magic)) { case LMV_MAGIC_V1: - case LMV_MAGIC_MIGRATE: return le32_to_cpu(lmm->lmv_md_v1.lmv_stripe_count); case LMV_USER_MAGIC: return le32_to_cpu(lmm->lmv_user_md.lum_stripe_count); @@ -2837,7 +2852,6 @@ static inline int lmv_mds_md_stripe_count_set(union lmv_mds_md *lmm, { switch (le32_to_cpu(lmm->lmv_magic)) { case LMV_MAGIC_V1: - case LMV_MAGIC_MIGRATE: lmm->lmv_md_v1.lmv_stripe_count = cpu_to_le32(stripe_count); break; case LMV_USER_MAGIC: @@ -3719,6 +3733,7 @@ enum idx_info_flags { II_FL_VARKEY = 1 << 1, /* keys can be of variable size */ II_FL_VARREC = 1 << 2, /* records can be of variable size */ II_FL_NONUNQ = 1 << 3, /* index supports non-unique keys */ + II_FL_NOKEY = 1 << 4, /* client doesn't care about key */ }; #define LIP_MAGIC 0x8A6D6B6C @@ -3966,6 +3981,7 @@ enum update_type { OUT_INDEX_INSERT = 10, OUT_INDEX_DELETE = 11, OUT_WRITE = 12, + OUT_XATTR_DEL = 13, OUT_LAST }; diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h index 59532f7..eadec9d 100644 --- a/lustre/include/lustre/lustre_user.h +++ b/lustre/include/lustre/lustre_user.h @@ -300,8 +300,7 @@ struct ost_id { #define LOV_USER_MAGIC_JOIN_V1 0x0BD20BD0 #define LOV_USER_MAGIC_V3 0x0BD30BD0 -#define 
LMV_MAGIC_V1 0x0CD10CD0 /*normal stripe lmv magic */ -#define LMV_USER_MAGIC 0x0CD20CD0 /*default lmv magic*/ +#define LMV_USER_MAGIC 0x0CD30CD0 /*default lmv magic*/ #define LOV_PATTERN_RAID0 0x001 #define LOV_PATTERN_RAID1 0x002 diff --git a/lustre/include/lustre_lmv.h b/lustre/include/lustre_lmv.h index fc15717..d65b6bd 100644 --- a/lustre/include/lustre_lmv.h +++ b/lustre/include/lustre_lmv.h @@ -48,10 +48,35 @@ struct lmv_stripe_md { __u32 lsm_md_layout_version; __u32 lsm_md_default_count; __u32 lsm_md_default_index; + struct lu_fid lsm_md_master_fid; char lsm_md_pool_name[LOV_MAXPOOLNAME]; struct lmv_oinfo lsm_md_oinfo[0]; }; +static inline bool +lsm_md_eq(const struct lmv_stripe_md *lsm1, const struct lmv_stripe_md *lsm2) +{ + int idx; + + if (lsm1->lsm_md_magic != lsm2->lsm_md_magic || + lsm1->lsm_md_stripe_count != lsm2->lsm_md_stripe_count || + lsm1->lsm_md_master_mdt_index != + lsm2->lsm_md_master_mdt_index || + lsm1->lsm_md_hash_type != lsm2->lsm_md_hash_type || + lsm1->lsm_md_layout_version != + lsm2->lsm_md_layout_version || + strcmp(lsm1->lsm_md_pool_name, + lsm2->lsm_md_pool_name) != 0) + return false; + + for (idx = 0; idx < lsm1->lsm_md_stripe_count; idx++) { + if (!lu_fid_eq(&lsm1->lsm_md_oinfo[idx].lmo_fid, + &lsm2->lsm_md_oinfo[idx].lmo_fid)) + return false; + } + + return true; +} union lmv_mds_md; int lmv_pack_md(union lmv_mds_md **lmmp, const struct lmv_stripe_md *lsm, @@ -98,10 +123,8 @@ static inline void lmv_cpu_to_le(union lmv_mds_md *lmv_dst, { switch (lmv_src->lmv_magic) { case LMV_MAGIC_V1: - case LMV_MAGIC_MIGRATE: { lmv1_cpu_to_le(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1); break; - } default: break; } @@ -112,10 +135,8 @@ static inline void lmv_le_to_cpu(union lmv_mds_md *lmv_dst, { switch (le32_to_cpu(lmv_src->lmv_magic)) { case LMV_MAGIC_V1: - case LMV_MAGIC_MIGRATE: { lmv1_le_to_cpu(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1); break; - } default: break; } diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 
ba88aed..0b44b64 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -423,6 +423,14 @@ struct md_object { const struct md_dir_operations *mo_dir_ops; }; +/* Mark the object to be dead, and can not be accessed anymore. + * XXX, right now, it will only be used for striped directory to + * mark the slave stripes dead, when deleting master object. It will be + * stored in slave LMV EA (see lod_mark_dead_object), which is only + * temporary, and will be removed later when we have proper way to mark + * the dead object. */ +#define LUSTRE_SLAVE_DEAD_FL 0x80000000 + /** * seq-server site. */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index eee94a3..6edb1d3 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1058,9 +1058,9 @@ struct obd_ops { struct obd_export *exp, enum lu_cli_type type); int (*o_fid_fini)(struct obd_device *obd); - /* Allocate new fid according to passed @hint. */ - int (*o_fid_alloc)(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data); + /* Allocate new fid according to passed @hint. 
*/ + int (*o_fid_alloc)(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data); /* * Object with @fid is getting deleted, we may want to do something diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 7464435..8db8904 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1043,18 +1043,19 @@ static inline int obd_fid_fini(struct obd_device *obd) RETURN(rc); } -static inline int obd_fid_alloc(struct obd_export *exp, +static inline int obd_fid_alloc(const struct lu_env *env, + struct obd_export *exp, struct lu_fid *fid, struct md_op_data *op_data) { - int rc; - ENTRY; + int rc; + ENTRY; - EXP_CHECK_DT_OP(exp, fid_alloc); - EXP_COUNTER_INCREMENT(exp, fid_alloc); + EXP_CHECK_DT_OP(exp, fid_alloc); + EXP_COUNTER_INCREMENT(exp, fid_alloc); - rc = OBP(exp->exp_obd, fid_alloc)(exp, fid, op_data); - RETURN(rc); + rc = OBP(exp->exp_obd, fid_alloc)(env, exp, fid, op_data); + RETURN(rc); } static inline int obd_ping(const struct lu_env *env, struct obd_export *exp) diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 0f811aa..54acb3d 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -590,9 +590,8 @@ int ll_dir_getstripe(struct inode *inode, void **plmm, int *plmm_size, if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm); break; - case LMV_MAGIC: - case LMV_MAGIC_MIGRATE: - if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) + case LMV_MAGIC_V1: + if (LMV_MAGIC != cpu_to_le32(LMV_MAGIC)) lustre_swab_lmv_mds_md((union lmv_mds_md *)lmm); break; case LMV_USER_MAGIC: @@ -1184,7 +1183,7 @@ lmv_out_free: rc = ll_dir_getstripe(inode, (void **)&lmm, &lmmsize, &request, valid); - if (rc != 0 && rc != -ENODATA) + if (rc != 0) GOTO(finish_req, rc); /* Get default LMV EA */ @@ -1201,38 +1200,29 @@ lmv_out_free: GOTO(finish_req, rc); } - /* Get normal LMV EA */ - if (rc == -ENODATA) { - stripe_count = 1; - } else { - LASSERT(lmm != NULL); - 
stripe_count = lmv_mds_md_stripe_count_get(lmm); - } - + stripe_count = lmv_mds_md_stripe_count_get(lmm); lum_size = lmv_user_md_size(stripe_count, LMV_MAGIC_V1); OBD_ALLOC(tmp, lum_size); if (tmp == NULL) GOTO(finish_req, rc = -ENOMEM); - tmp->lum_magic = LMV_MAGIC_V1; - tmp->lum_stripe_count = 1; mdt_index = ll_get_mdt_idx(inode); if (mdt_index < 0) GOTO(out_tmp, rc = -ENOMEM); + + tmp->lum_magic = LMV_MAGIC_V1; + tmp->lum_stripe_count = 0; tmp->lum_stripe_offset = mdt_index; - tmp->lum_objects[0].lum_mds = mdt_index; - tmp->lum_objects[0].lum_fid = *ll_inode2fid(inode); - for (i = 1; i < stripe_count; i++) { - struct lmv_mds_md_v1 *lmm1; - - lmm1 = &lmm->lmv_md_v1; - mdt_index = ll_get_mdt_idx_by_fid(sbi, - &lmm1->lmv_stripe_fids[i]); + for (i = 0; i < stripe_count; i++) { + struct lu_fid *fid; + + fid = &lmm->lmv_md_v1.lmv_stripe_fids[i]; + mdt_index = ll_get_mdt_idx_by_fid(sbi, fid); if (mdt_index < 0) GOTO(out_tmp, rc = mdt_index); tmp->lum_objects[i].lum_mds = mdt_index; - tmp->lum_objects[i].lum_fid = lmm1->lmv_stripe_fids[i]; + tmp->lum_objects[i].lum_fid = *fid; tmp->lum_stripe_count++; } diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index ee8829d..c058a6d 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -1250,9 +1250,9 @@ static struct inode *ll_iget_anon_dir(struct super_block *sb, ll_lli_init(lli); LASSERT(lsm != NULL); - /* master stripe FID */ - lli->lli_pfid = lsm->lsm_md_oinfo[0].lmo_fid; - CDEBUG(D_INODE, "lli %p master "DFID" slave "DFID"\n", + /* master object FID */ + lli->lli_pfid = body->fid1; + CDEBUG(D_INODE, "lli %p slave "DFID" master "DFID"\n", lli, PFID(fid), PFID(&lli->lli_pfid)); unlock_new_inode(inode); } @@ -1273,21 +1273,23 @@ static int ll_init_lsm_md(struct inode *inode, struct lustre_md *md) for (i = 0; i < lsm->lsm_md_stripe_count; i++) { fid = &lsm->lsm_md_oinfo[i].lmo_fid; LASSERT(lsm->lsm_md_oinfo[i].lmo_root == NULL); - if (i == 0) { + /* Unfortunately ll_iget will call 
ll_update_inode, + * where the initialization of slave inode is slightly + * different, so it reset lsm_md to NULL to avoid + * initializing lsm for slave inode. */ + /* For migrating inode, master stripe and master object will + * be same, so we only need assign this inode */ + if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION && i == 0) lsm->lsm_md_oinfo[i].lmo_root = inode; - } else { - /* Unfortunately ll_iget will call ll_update_inode, - * where the initialization of slave inode is slightly - * different, so it reset lsm_md to NULL to avoid - * initializing lsm for slave inode. */ + else lsm->lsm_md_oinfo[i].lmo_root = - ll_iget_anon_dir(inode->i_sb, fid, md); - if (IS_ERR(lsm->lsm_md_oinfo[i].lmo_root)) { - int rc = PTR_ERR(lsm->lsm_md_oinfo[i].lmo_root); + ll_iget_anon_dir(inode->i_sb, fid, md); - lsm->lsm_md_oinfo[i].lmo_root = NULL; - return rc; - } + if (IS_ERR(lsm->lsm_md_oinfo[i].lmo_root)) { + int rc = PTR_ERR(lsm->lsm_md_oinfo[i].lmo_root); + + lsm->lsm_md_oinfo[i].lmo_root = NULL; + return rc; } } @@ -1315,7 +1317,6 @@ static int ll_update_lsm_md(struct inode *inode, struct lustre_md *md) { struct ll_inode_info *lli = ll_i2info(inode); struct lmv_stripe_md *lsm = md->lmv; - int idx; int rc; ENTRY; @@ -1327,7 +1328,8 @@ static int ll_update_lsm_md(struct inode *inode, struct lustre_md *md) if (lsm == NULL) { if (lli->lli_lsm_md == NULL) { RETURN(0); - } else if (lli->lli_lsm_md->lsm_md_magic == LMV_MAGIC_MIGRATE) { + } else if (lli->lli_lsm_md->lsm_md_hash_type & + LMV_HASH_FLAG_MIGRATION) { /* migration is done, the temporay MIGRATE layout has * been removed */ CDEBUG(D_INODE, DFID" finish migration.\n", @@ -1359,39 +1361,40 @@ static int ll_update_lsm_md(struct inode *inode, struct lustre_md *md) } /* Compare the old and new stripe information */ - if (!lli_lsm_md_eq(lli->lli_lsm_md, lsm)) { - CERROR("inode %p %lu mismatch\n" - " new(%p) vs lli_lsm_md(%p):\n" - " magic: %x %x\n" - " count: %x %x\n" - " master: %x %x\n" - " hash_type: %x %x\n" - " 
layout: %x %x\n" - " pool: %s %s\n", - inode, inode->i_ino, lsm, lli->lli_lsm_md, - lsm->lsm_md_magic, lli->lli_lsm_md->lsm_md_magic, + if (!lsm_md_eq(lli->lli_lsm_md, lsm)) { + struct lmv_stripe_md *old_lsm = lli->lli_lsm_md; + int idx; + + CERROR("%s: lmv layout mismatch "DFID"(%p)/"DFID"(%p)" + "magic:0x%x/0x%x stripe count: %d/%d master_mdt: %d/%d" + "hash_type:0x%x/0x%x layout: 0x%x/0x%x pool:%s/%s\n", + ll_get_fsname(inode->i_sb, NULL, 0), + PFID(&lsm->lsm_md_master_fid), lsm, + PFID(&old_lsm->lsm_md_master_fid), old_lsm, + lsm->lsm_md_magic, old_lsm->lsm_md_magic, lsm->lsm_md_stripe_count, - lli->lli_lsm_md->lsm_md_stripe_count, + old_lsm->lsm_md_stripe_count, lsm->lsm_md_master_mdt_index, - lli->lli_lsm_md->lsm_md_master_mdt_index, - lsm->lsm_md_hash_type, lli->lli_lsm_md->lsm_md_hash_type, + old_lsm->lsm_md_master_mdt_index, + lsm->lsm_md_hash_type, old_lsm->lsm_md_hash_type, lsm->lsm_md_layout_version, - lli->lli_lsm_md->lsm_md_layout_version, + old_lsm->lsm_md_layout_version, lsm->lsm_md_pool_name, - lli->lli_lsm_md->lsm_md_pool_name); - RETURN(-EIO); - } + old_lsm->lsm_md_pool_name); + + for (idx = 0; idx < old_lsm->lsm_md_stripe_count; idx++) { + CERROR("%s: sub FIDs in old lsm idx %d, old: "DFID"\n", + ll_get_fsname(inode->i_sb, NULL, 0), idx, + PFID(&old_lsm->lsm_md_oinfo[idx].lmo_fid)); + } - for (idx = 0; idx < lli->lli_lsm_md->lsm_md_stripe_count; idx++) { - if (!lu_fid_eq(&lli->lli_lsm_md->lsm_md_oinfo[idx].lmo_fid, - &lsm->lsm_md_oinfo[idx].lmo_fid)) { - CERROR("%s: FID in lsm mismatch idx %d, old: "DFID - "new:"DFID"\n", + for (idx = 0; idx < lsm->lsm_md_stripe_count; idx++) { + CERROR("%s: sub FIDs in new lsm idx %d, new: "DFID"\n", ll_get_fsname(inode->i_sb, NULL, 0), idx, - PFID(&lli->lli_lsm_md->lsm_md_oinfo[idx].lmo_fid), PFID(&lsm->lsm_md_oinfo[idx].lmo_fid)); - RETURN(-EIO); } + + RETURN(-EIO); } rc = md_update_lsm_md(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md, diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 
09ee3b2..08e0297 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -184,9 +184,6 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody, * revalidate slaves has some problems, temporarily return, * we may not need that */ - if (lsm->lsm_md_stripe_count <= 1) - RETURN(0); - OBD_ALLOC_PTR(op_data); if (op_data == NULL) RETURN(-ENOMEM); @@ -205,14 +202,6 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody, fid = lsm->lsm_md_oinfo[i].lmo_fid; inode = lsm->lsm_md_oinfo[i].lmo_root; - if (i == 0) { - if (mbody != NULL) { - body = mbody; - goto update; - } else { - goto release_lock; - } - } /* * Prepare op_data for revalidating. Note that @fid2 shluld be @@ -246,7 +235,6 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody, body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); LASSERT(body != NULL); -update: if (unlikely(body->nlink < 2)) { CERROR("%s: nlink %d < 2 corrupt stripe %d "DFID ":" DFID"\n", obd->obd_name, body->nlink, @@ -265,9 +253,6 @@ update: GOTO(cleanup, rc = -EIO); } - if (i != 0) - md_set_lock_data(tgt->ltd_exp, &lockh->cookie, - inode, NULL); i_size_write(inode, body->size); set_nlink(inode, body->nlink); @@ -278,7 +263,9 @@ update: if (req != NULL) ptlrpc_req_finished(req); } -release_lock: + + md_set_lock_data(tgt->ltd_exp, &lockh->cookie, inode, NULL); + size += i_size_read(inode); if (i != 0) @@ -385,7 +372,7 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data, * fid and setup FLD for it. 
*/ op_data->op_fid3 = op_data->op_fid2; - rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); + rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc != 0) RETURN(rc); } @@ -475,8 +462,8 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } RETURN(rc); - } else if (it_disposition(it, DISP_LOOKUP_NEG) && - lsm != NULL && lsm->lsm_md_magic == LMV_MAGIC_MIGRATE) { + } else if (it_disposition(it, DISP_LOOKUP_NEG) && lsm != NULL && + lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION) { /* For migrating directory, if it can not find the child in * the source directory(master stripe), try the targeting * directory(stripe 1) */ diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index 622b2e1..5cadf54 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -75,8 +75,8 @@ int lmv_fld_lookup(struct lmv_obd *lmv, const struct lu_fid *fid, mdsno_t *mds); int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, mdsno_t mds); -int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data); +int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data); int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, const union lmv_mds_md *lmm, int stripe_count); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index ad76048..b89a40a 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -90,39 +90,35 @@ lmv_hash_fnv1a(unsigned int count, const char *name, int namelen) return hash; } -int lmv_name_to_stripe_index(enum lmv_hash_type hashtype, - unsigned int max_mdt_index, +int lmv_name_to_stripe_index(__u32 lmv_hash_type, unsigned int stripe_count, const char *name, int namelen) { int idx; + __u32 hash_type = lmv_hash_type & LMV_HASH_TYPE_MASK; LASSERT(namelen > 0); - if (max_mdt_index <= 1) + if (stripe_count <= 1) return 0; - switch (hashtype) { + /* for migrating object, always start 
from 0 stripe */ + if (lmv_hash_type & LMV_HASH_FLAG_MIGRATION) + return 0; + + switch (hash_type) { case LMV_HASH_TYPE_ALL_CHARS: - idx = lmv_hash_all_chars(max_mdt_index, name, namelen); + idx = lmv_hash_all_chars(stripe_count, name, namelen); break; case LMV_HASH_TYPE_FNV_1A_64: - idx = lmv_hash_fnv1a(max_mdt_index, name, namelen); + idx = lmv_hash_fnv1a(stripe_count, name, namelen); break; - /* LMV_HASH_TYPE_MIGRATION means the file is being migrated, - * and the file should be accessed by client, except for - * lookup(see lmv_intent_lookup), return -EACCES here */ - case LMV_HASH_TYPE_MIGRATION: - CERROR("%.*s is being migrated: rc = %d\n", namelen, - name, -EACCES); - return -EACCES; default: - CERROR("Unknown hash type 0x%x\n", hashtype); + CERROR("Unknown hash type 0x%x\n", hash_type); return -EINVAL; } CDEBUG(D_INFO, "name %.*s hash_type %d idx %d\n", namelen, name, - hashtype, idx); + hash_type, idx); - LASSERT(idx < max_mdt_index); return idx; } @@ -1371,14 +1367,14 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL) GOTO(out, rc = -ENODEV); - /* - * Asking underlaying tgt layer to allocate new fid. - */ - rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL); - if (rc > 0) { - LASSERT(fid_is_sane(fid)); - rc = 0; - } + /* + * Asking underlaying tgt layer to allocate new fid. 
+ */ + rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL); + if (rc > 0) { + LASSERT(fid_is_sane(fid)); + rc = 0; + } EXIT; out: @@ -1386,8 +1382,8 @@ out: return rc; } -int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data) +int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) { struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; @@ -1788,9 +1784,7 @@ struct lmv_tgt_desc struct lmv_stripe_md *lsm = op_data->op_mea1; struct lmv_tgt_desc *tgt; - if (lsm == NULL || lsm->lsm_md_stripe_count <= 1 || - op_data->op_namelen == 0 || - lsm->lsm_md_magic == LMV_MAGIC_MIGRATE) { + if (lsm == NULL || op_data->op_namelen == 0) { tgt = lmv_find_target(lmv, fid); if (IS_ERR(tgt)) return tgt; @@ -1830,7 +1824,7 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), op_data->op_mds); - rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); + rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc) RETURN(rc); @@ -2155,7 +2149,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, if (op_data->op_cli_flags & CLI_MIGRATE) { LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n", PFID(&op_data->op_fid3)); - rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); + rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc) RETURN(rc); src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid3); @@ -2511,8 +2505,7 @@ retry: RETURN(PTR_ERR(tgt)); /* For striped dir, we need to locate the parent as well */ - if (op_data->op_mea1 != NULL && - op_data->op_mea1->lsm_md_stripe_count > 1) { + if (op_data->op_mea1 != NULL) { struct lmv_tgt_desc *tmp; LASSERT(op_data->op_name != NULL && @@ -2831,9 +2824,13 @@ static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm, lsm->lsm_md_master_mdt_index = 
le32_to_cpu(lmm1->lmv_master_mdt_index); lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type); lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version); + fid_le_to_cpu(&lsm->lsm_md_master_fid, &lmm1->lmv_master_fid); cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name, sizeof(lsm->lsm_md_pool_name)); + if (!fid_is_sane(&lsm->lsm_md_master_fid)) + RETURN(-EPROTO); + if (cplen >= sizeof(lsm->lsm_md_pool_name)) RETURN(-E2BIG); @@ -2873,8 +2870,12 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, if (lsm != NULL && lmm == NULL) { #ifdef __KERNEL__ int i; - for (i = 1; i < lsm->lsm_md_stripe_count; i++) { - if (lsm->lsm_md_oinfo[i].lmo_root != NULL) + for (i = 0; i < lsm->lsm_md_stripe_count; i++) { + /* For migrating inode, the master stripe and master + * object will be the same, so do not need iput, see + * ll_update_lsm_md */ + if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION && + i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL) iput(lsm->lsm_md_oinfo[i].lmo_root); } #endif @@ -2897,7 +2898,6 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, /* Unpack memmd */ if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 && - le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_MIGRATE && le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) { CERROR("%s: invalid lmv magic %x: rc = %d\n", exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic), @@ -2905,8 +2905,7 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, RETURN(-EIO); } - if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1 || - le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_MIGRATE) + if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1) lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm)); else /** @@ -2926,7 +2925,6 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, switch (le32_to_cpu(lmm->lmv_magic)) { case LMV_MAGIC_V1: - case LMV_MAGIC_MIGRATE: rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1); break; default: @@ -3299,9 
+3297,6 @@ int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp, int lmv_update_lsm_md(struct obd_export *exp, struct lmv_stripe_md *lsm, struct mdt_body *body, ldlm_blocking_callback cb_blocking) { - if (lsm->lsm_md_stripe_count <= 1) - return 0; - return lmv_revalidate_slaves(exp, body, lsm, cb_blocking, 0); } diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index c3b1bc3..4eab8cd 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -260,6 +260,9 @@ struct lod_object { struct lod_it { struct dt_object *lit_obj; /* object from the layer below */ + /* stripe offset of iteration */ + __u32 lit_stripe_index; + __u32 lit_attr; struct dt_it *lit_it; /* iterator from the layer below */ }; @@ -275,6 +278,11 @@ struct lod_thread_info { struct lu_attr lti_attr; struct lod_it lti_it; struct ldlm_res_id lti_res_id; + + /* used to hold lu_dirent, NAME_MAX + sizeof(struct lu_dirent) */ + char lti_key[NAME_MAX + sizeof(struct lu_dirent)]; + + struct dt_object_format lti_format; }; extern const struct lu_device_operations lod_lu_ops; @@ -368,7 +376,8 @@ int lod_del_device(const struct lu_env *env, struct lod_device *lod, unsigned gen, bool for_ost); int lod_fini_tgt(const struct lu_env *env, struct lod_device *lod, struct lod_tgt_descs *ltd, bool for_ost); -int lod_load_striping(const struct lu_env *env, struct lod_object *mo); +int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo); +int lod_load_striping(const struct lu_env *env, struct lod_object *lo); int lod_get_ea(const struct lu_env *env, struct lod_object *lo, const char *name); diff --git a/lustre/lod/lod_lov.c b/lustre/lod/lod_lov.c index 937e7ab..e3cd87a 100644 --- a/lustre/lod/lod_lov.c +++ b/lustre/lod/lod_lov.c @@ -836,28 +836,22 @@ out: RETURN(rc); } -/* - * Load and parse striping information, create in-core representation for the - * stripes - */ -int lod_load_striping(const struct lu_env *env, struct lod_object *lo) +int 
lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo) { struct lod_thread_info *info = lod_env_info(env); struct dt_object *next = dt_object_child(&lo->ldo_obj); int rc = 0; ENTRY; - /* - * currently this code is supposed to be called from declaration - * phase only, thus the object is not expected to be locked by caller - */ - dt_write_lock(env, next, 0); /* already initialized? */ if (lo->ldo_stripe != NULL) GOTO(out, rc = 0); + if (!dt_object_exists(next)) + GOTO(out, rc = 0); + /* Do not load stripe for slaves of striped dir */ - if (!dt_object_exists(next) || lo->ldo_dir_slave_stripe) + if (lo->ldo_dir_slave_stripe) GOTO(out, rc = 0); /* only regular files can be striped */ @@ -885,10 +879,26 @@ int lod_load_striping(const struct lu_env *env, struct lod_object *lo) rc = lod_parse_dir_striping(env, lo, &info->lti_buf); } out: - dt_write_unlock(env, next); RETURN(rc); } +/** + * Load and parse striping information, create in-core representation for the + * stripes + **/ +int lod_load_striping(const struct lu_env *env, struct lod_object *lo) +{ + struct dt_object *next = dt_object_child(&lo->ldo_obj); + int rc = 0; + + /* currently this code is supposed to be called from declaration + * phase only, thus the object is not expected to be locked by caller */ + dt_write_lock(env, next, 0); + rc = lod_load_striping_locked(env, lo); + dt_write_unlock(env, next); + return rc; +} + int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, int specific) { diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 410b368..8b51dd8 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -124,10 +124,10 @@ static struct dt_it *lod_it_init(const struct lu_env *env, } #define LOD_CHECK_IT(env, it) \ -{ \ +do { \ LASSERT((it)->lit_obj != NULL); \ LASSERT((it)->lit_it != NULL); \ -} while(0) +} while (0) void lod_it_fini(const struct lu_env *env, struct dt_it *di) { @@ -188,7 +188,18 @@ int lod_it_rec(const struct lu_env 
*env, const struct dt_it *di, const struct lod_it *it = (const struct lod_it *)di; LOD_CHECK_IT(env, it); - return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr); + return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, + attr); +} + +int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di, + __u32 attr) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it, + attr); } __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) @@ -208,12 +219,13 @@ int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) } int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, - void* key_rec) + void *key_rec) { const struct lod_it *it = (const struct lod_it *)di; LOD_CHECK_IT(env, it); - return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec); + return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, + key_rec); } static struct dt_index_operations lod_index_ops = { @@ -231,12 +243,484 @@ static struct dt_index_operations lod_index_ops = { .key = lod_it_key, .key_size = lod_it_key_size, .rec = lod_it_rec, + .rec_size = lod_it_rec_size, .store = lod_it_store, .load = lod_it_load, .key_rec = lod_it_key_rec, } }; +/** + * Implementation of dt_index_operations:: dio_it.init + * + * This function is to initialize the iterator for striped directory, + * basically these lod_striped_it_xxx will just locate the stripe + * and call the correspondent api of its next lower layer. + * + * \param[in] env execution environment. + * \param[in] dt the striped directory object to be iterated. + * \param[in] attr the attribute of iterator, mostly used to indicate + * the entry attribute in the object to be iterated. + * \param[in] capa capability(useless in current implementation) + * + * \retval initialized iterator(dt_it) if successful initialize the + * iteration. 
lit_stripe_index will be used to indicate the + * current iterate position among stripes. + * \retval ERR pointer if initialization is failed. + */ +static struct dt_it *lod_striped_it_init(const struct lu_env *env, + struct dt_object *dt, __u32 attr, + struct lustre_capa *capa) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next; + struct lod_it *it = &lod_env_info(env)->lti_it; + struct dt_it *it_next; + ENTRY; + + LASSERT(lo->ldo_stripenr > 0); + next = lo->ldo_stripe[0]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + if (IS_ERR(it_next)) + return it_next; + + /* currently we do not use more than one iterator per thread + * so we store it in thread info. if at some point we need + * more active iterators in a single thread, we can allocate + * additional ones */ + LASSERT(it->lit_obj == NULL); + + it->lit_stripe_index = 0; + it->lit_attr = attr; + it->lit_it = it_next; + it->lit_obj = dt; + + return (struct dt_it *)it; +} + +#define LOD_CHECK_STRIPED_IT(env, it, lo) \ +do { \ + LASSERT((it)->lit_obj != NULL); \ + LASSERT((it)->lit_it != NULL); \ + LASSERT((lo)->ldo_stripenr > 0); \ + LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr); \ +} while (0) + +/** + * Implementation of dt_index_operations:: dio_it.fini + * + * This function is to finish the iterator for striped directory. + * + * \param[in] env execution environment. 
+ * \param[in] di the iterator for the striped directory + * + */ +static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + next->do_index_ops->dio_it.fini(env, it->lit_it); + + /* the iterator not in use any more */ + it->lit_obj = NULL; + it->lit_it = NULL; + it->lit_stripe_index = 0; +} + +/** + * Implementation of dt_index_operations:: dio_it.get + * + * This function is to position the iterator with given key + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * \param[in] key the key the iterator will be positioned. + * + * \retval 0 if successfully position iterator by the key. + * \retval negative error if position is failed. + */ +static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + ENTRY; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.get(env, it->lit_it, key); +} + +/** + * Implementation of dt_index_operations:: dio_it.put + * + * This function is supposed to be the pair of it_get, but currently do + * nothing. 
see (osd_it_ea_put or osd_index_it_put) + */ +static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.put(env, it->lit_it); +} + +/** + * Implementation of dt_index_operations:: dio_it.next + * + * This function is to position the iterator to the next entry, if current + * stripe is finished by checking the return value of next() in current + * stripe. it will go to next stripe. In the mean time, the sub-iterator + * for next stripe needs to be initialized. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * + * \retval 0 if successfully position iterator to the next entry. + * \retval negative error if position is failed. + */ +static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + struct dt_it *it_next; + int rc; + ENTRY; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); +again: + rc = next->do_index_ops->dio_it.next(env, it->lit_it); + if (rc < 0) + RETURN(rc); + + if (rc == 0 && it->lit_stripe_index == 0) + RETURN(rc); + + if (rc == 0 && it->lit_stripe_index > 0) { + struct lu_dirent *ent; + + ent = (struct lu_dirent *)lod_env_info(env)->lti_key; + + rc = next->do_index_ops->dio_it.rec(env, it->lit_it, + (struct dt_rec *)ent, + it->lit_attr); + if (rc != 0) + RETURN(rc); + + /* skip . and .. 
for slave stripe */ + if ((strncmp(ent->lde_name, ".", + le16_to_cpu(ent->lde_namelen)) == 0 && + le16_to_cpu(ent->lde_namelen) == 1) || + (strncmp(ent->lde_name, "..", + le16_to_cpu(ent->lde_namelen)) == 0 && + le16_to_cpu(ent->lde_namelen) == 2)) + goto again; + + RETURN(rc); + } + + /* go to next stripe */ + if (it->lit_stripe_index + 1 >= lo->ldo_stripenr) + RETURN(1); + + it->lit_stripe_index++; + + next->do_index_ops->dio_it.put(env, it->lit_it); + next->do_index_ops->dio_it.fini(env, it->lit_it); + + rc = next->do_ops->do_index_try(env, next, &dt_directory_features); + if (rc != 0) + RETURN(rc); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr, + BYPASS_CAPA); + if (!IS_ERR(it_next)) { + it->lit_it = it_next; + goto again; + } else { + rc = PTR_ERR(it_next); + } + + RETURN(rc); +} + +/** + * Implementation of dt_index_operations:: dio_it.key + * + * This function is to get the key of the iterator at current position. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * + * \retval key(dt_key) if successfully get the key. + * \retval negative error if can not get the key. + */ +static struct dt_key *lod_striped_it_key(const struct lu_env *env, + const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.key(env, it->lit_it); +} + +/** + * Implementation of dt_index_operations:: dio_it.key_size + * + * This function is to get the key_size of current key. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. 
+ * + * \retval key_size if successfully get the key_size. + * \retval negative error if can not get the key_size. + */ +static int lod_striped_it_key_size(const struct lu_env *env, + const struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.key_size(env, it->lit_it); +} + +/** + * Implementation of dt_index_operations:: dio_it.rec + * + * This function is to get the record at current position. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * \param[in] attr the attribute of iterator, mostly used to indicate + * the entry attribute in the object to be iterated. + * \param[out] rec hold the return record. + * + * \retval 0 if successfully get the entry. + * \retval negative error if can not get entry. + */ +static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr); +} + +/** + * Implementation of dt_index_operations:: dio_it.rec_size + * + * This function is to get the record_size at current record. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * \param[in] attr the attribute of iterator, mostly used to indicate + * the entry attribute in the object to be iterated. + * + * \retval rec_size if successfully get the entry size. + * \retval negative error if can not get entry size. 
+ */ +static int lod_striped_it_rec_size(const struct lu_env *env, + const struct dt_it *di, __u32 attr) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr); +} + +/** + * Implementation of dt_index_operations:: dio_it.store + * + * This function will a cookie for current position of the iterator head, + * so that user can use this cookie to load/start the iterator next time. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * + * \retval the cookie. + */ +static __u64 lod_striped_it_store(const struct lu_env *env, + const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.store(env, it->lit_it); +} + +/** + * Implementation of dt_index_operations:: dio_it.load + * + * This function will position the iterator with the given hash(usually + * get from store), + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * \param[in] hash the given hash. + * + * \retval >0 if successfuly load the iterator to the given position. + * \retval <0 if load is failed. 
+ */ +static int lod_striped_it_load(const struct lu_env *env, + const struct dt_it *di, __u64 hash) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.load(env, it->lit_it, hash); +} + +static struct dt_index_operations lod_striped_index_ops = { + .dio_lookup = lod_index_lookup, + .dio_declare_insert = lod_declare_index_insert, + .dio_insert = lod_index_insert, + .dio_declare_delete = lod_declare_index_delete, + .dio_delete = lod_index_delete, + .dio_it = { + .init = lod_striped_it_init, + .fini = lod_striped_it_fini, + .get = lod_striped_it_get, + .put = lod_striped_it_put, + .next = lod_striped_it_next, + .key = lod_striped_it_key, + .key_size = lod_striped_it_key_size, + .rec = lod_striped_it_rec, + .rec_size = lod_striped_it_rec_size, + .store = lod_striped_it_store, + .load = lod_striped_it_load, + } +}; + +/** + * Implementation of dt_object_operations:: do_index_try + * + * This function will try to initialize the index api pointer for the + * given object, usually it the entry point of the index api. i.e. + * the index object should be initialized in index_try, then start + * using index api. For striped directory, it will try to initialize + * all of its sub_stripes. + * + * \param[in] env execution environment. + * \param[in] dt the index object to be initialized. + * \param[in] feat the features of this object, for example fixed or + * variable key size etc. + * + * \retval >0 if the initialization is successful. + * \retval <0 if the initialization is failed. 
+ */ +static int lod_index_try(const struct lu_env *env, struct dt_object *dt, + const struct dt_index_features *feat) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + int rc; + ENTRY; + + LASSERT(next->do_ops); + LASSERT(next->do_ops->do_index_try); + + rc = lod_load_striping_locked(env, lo); + if (rc != 0) + RETURN(rc); + + rc = next->do_ops->do_index_try(env, next, feat); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_stripenr > 0) { + int i; + + for (i = 0; i < lo->ldo_stripenr; i++) { + if (dt_object_exists(lo->ldo_stripe[i]) == 0) + continue; + rc = lo->ldo_stripe[i]->do_ops->do_index_try(env, + lo->ldo_stripe[i], feat); + if (rc != 0) + RETURN(rc); + } + dt->do_index_ops = &lod_striped_index_ops; + } else { + dt->do_index_ops = &lod_index_ops; + } + + RETURN(rc); +} + static void lod_object_read_lock(const struct lu_env *env, struct dt_object *dt, unsigned role) { @@ -272,7 +756,118 @@ static int lod_attr_get(const struct lu_env *env, struct lu_attr *attr, struct lustre_capa *capa) { - return dt_attr_get(env, dt_object_child(dt), attr, capa); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; + + rc = dt_attr_get(env, dt_object_child(dt), attr, capa); + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr) || rc != 0) + RETURN(rc); + + rc = lod_load_striping_locked(env, lo); + if (rc) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(rc); + + attr->la_nlink = 2; + attr->la_size = 0; + for (i = 0; i < lo->ldo_stripenr; i++) { + struct lu_attr *sub_attr = &lod_env_info(env)->lti_attr; + + LASSERT(lo->ldo_stripe[i]); + if (dt_object_exists(lo->ldo_stripe[i])) + continue; + + rc = dt_attr_get(env, lo->ldo_stripe[i], sub_attr, capa); + if (rc != 0) + break; + + /* -2 for . and .. 
on each stripe */ + if (sub_attr->la_valid & LA_NLINK && attr->la_valid & LA_NLINK) + attr->la_nlink += sub_attr->la_nlink - 2; + if (sub_attr->la_valid & LA_SIZE && attr->la_valid & LA_SIZE) + attr->la_size += sub_attr->la_size; + + if (sub_attr->la_valid & LA_ATIME && + attr->la_valid & LA_ATIME && + attr->la_atime < sub_attr->la_atime) + attr->la_atime = sub_attr->la_atime; + + if (sub_attr->la_valid & LA_CTIME && + attr->la_valid & LA_CTIME && + attr->la_ctime < sub_attr->la_ctime) + attr->la_ctime = sub_attr->la_ctime; + + if (sub_attr->la_valid & LA_MTIME && + attr->la_valid & LA_MTIME && + attr->la_mtime < sub_attr->la_mtime) + attr->la_mtime = sub_attr->la_mtime; + } + + CDEBUG(D_INFO, DFID" stripe_count %d nlink %u size "LPU64"\n", + PFID(lu_object_fid(&dt->do_lu)), lo->ldo_stripenr, + attr->la_nlink, attr->la_size); + + RETURN(rc); +} + +/** + * Mark all of sub-stripes dead of the striped directory. + **/ +static int lod_mark_dead_object(const struct lu_env *env, + struct dt_object *dt, + struct thandle *handle, + bool declare) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct lmv_mds_md_v1 *lmv; + __u32 dead_hash_type; + int rc; + int i; + + ENTRY; + + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(0); + + rc = lod_load_striping_locked(env, lo); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(0); + + rc = lod_get_lmv_ea(env, lo); + if (rc <= 0) + RETURN(rc); + + lmv = lod_env_info(env)->lti_ea_store; + lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE); + dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD; + lmv->lmv_hash_type = cpu_to_le32(dead_hash_type); + for (i = 0; i < lo->ldo_stripenr; i++) { + struct lu_buf buf; + + lmv->lmv_master_mdt_index = i; + buf.lb_buf = lmv; + buf.lb_len = sizeof(*lmv); + if (declare) { + rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf, + XATTR_NAME_LMV, + LU_XATTR_REPLACE, handle); + } else { + rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf, + XATTR_NAME_LMV, 
LU_XATTR_REPLACE, + handle, BYPASS_CAPA); + } + if (rc != 0) + break; + } + + RETURN(rc); } static int lod_declare_attr_set(const struct lu_env *env, @@ -285,6 +880,13 @@ static int lod_declare_attr_set(const struct lu_env *env, int rc, i; ENTRY; + /* Set dead object on all other stripes */ + if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && + attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { + rc = lod_mark_dead_object(env, dt, handle, true); + RETURN(rc); + } + /* * declare setattr on the local object */ @@ -322,28 +924,6 @@ static int lod_declare_attr_set(const struct lu_env *env, if (lo->ldo_stripenr == 0) RETURN(0); - if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) { - struct lu_attr *la = &lod_env_info(env)->lti_attr; - bool setattr_time = false; - - rc = dt_attr_get(env, dt_object_child(dt), la, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - - /* If it will only setattr time, it will only set - * time < current_time */ - if ((attr->la_valid & LA_ATIME && - attr->la_atime < la->la_atime) || - (attr->la_valid & LA_CTIME && - attr->la_ctime < la->la_ctime) || - (attr->la_valid & LA_MTIME && - attr->la_mtime < la->la_mtime)) - setattr_time = true; - - if (!setattr_time) - RETURN(0); - } /* * if object is striped declare changes on the stripes */ @@ -384,11 +964,18 @@ static int lod_attr_set(const struct lu_env *env, struct thandle *handle, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc, i; + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc, i; ENTRY; + /* Set dead object on all other stripes */ + if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && + attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { + rc = lod_mark_dead_object(env, dt, handle, false); + RETURN(rc); + } + /* * apply changes to the local object */ @@ -411,35 +998,14 @@ static int lod_attr_set(const struct lu_env *env, if (lo->ldo_stripenr == 0) 
RETURN(0); - if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) { - struct lu_attr *la = &lod_env_info(env)->lti_attr; - bool setattr_time = false; - - rc = dt_attr_get(env, dt_object_child(dt), la, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - - /* If it will only setattr time, it will only set - * time < current_time */ - if ((attr->la_valid & LA_ATIME && - attr->la_atime < la->la_atime) || - (attr->la_valid & LA_CTIME && - attr->la_ctime < la->la_ctime) || - (attr->la_valid & LA_MTIME && - attr->la_mtime < la->la_mtime)) - setattr_time = true; - - if (!setattr_time) - RETURN(0); - } - /* * if object is striped, apply changes to all the stripes */ LASSERT(lo->ldo_stripe); for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); + if (dt_object_exists(lo->ldo_stripe[i]) == 0) + continue; rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa); if (rc) { CERROR("failed declaration: %d\n", rc); @@ -548,10 +1114,6 @@ static int lod_verify_md_striping(struct lod_device *lod, if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0)) GOTO(out, rc = -EINVAL); - - if (unlikely(le32_to_cpu(lum->lum_stripe_count) > - lod->lod_remote_mdt_count + 1)) - GOTO(out, rc = -EINVAL); out: if (rc != 0) CERROR("%s: invalid lmv_user_md: magic = %x, " @@ -562,6 +1124,19 @@ out: return rc; } +/** + * Master LMVEA will be same as slave LMVEA, except + * 1. different magic + * 2. No lmv_stripe_fids on slave + * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index. 
+ */ +static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv, + const struct lmv_mds_md_v1 *master_lmv) +{ + *slave_lmv = *master_lmv; + slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE); +} + int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, struct lu_buf *lmv_buf) { @@ -579,7 +1154,7 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, LASSERT(lo->ldo_dir_striped != 0); LASSERT(lo->ldo_stripenr > 0); - stripe_count = lo->ldo_stripenr + 1; + stripe_count = lo->ldo_stripenr; lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC); if (info->lti_ea_store_size < lmm_size) { rc = lod_ea_store_resize(info, lmm_size); @@ -597,13 +1172,13 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, RETURN(rc); lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx); - fid_cpu_to_le(&lmm1->lmv_stripe_fids[0], lu_object_fid(&dt->do_lu)); + fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu)); for (i = 0; i < lo->ldo_stripenr; i++) { struct dt_object *dto; dto = lo->ldo_stripe[i]; LASSERT(dto != NULL); - fid_cpu_to_le(&lmm1->lmv_stripe_fids[i + 1], + fid_cpu_to_le(&lmm1->lmv_stripe_fids[i], lu_object_fid(&dto->do_lu)); } @@ -628,56 +1203,63 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, int rc = 0; ENTRY; - if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_MIGRATE) + if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) RETURN(0); + if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) { + lo->ldo_dir_slave_stripe = 1; + RETURN(0); + } + if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) RETURN(-EINVAL); if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1) RETURN(0); - fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[0]); - /* Do not load striping information for slave inode */ - if (!lu_fid_eq(fid, lu_object_fid(&lo->ldo_obj.do_lu))) { - lo->ldo_dir_slave_stripe = 1; - RETURN(0); - } - LASSERT(lo->ldo_stripe == NULL); OBD_ALLOC(stripe, sizeof(stripe[0]) * - 
(le32_to_cpu(lmv1->lmv_stripe_count) - 1)); + (le32_to_cpu(lmv1->lmv_stripe_count))); if (stripe == NULL) RETURN(-ENOMEM); - /* skip master stripe */ - for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) { - struct lod_tgt_desc *tgt; - int idx; - int type = LU_SEQ_RANGE_ANY; + for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) { + struct dt_device *tgt_dt; struct dt_object *dto; + int type = LU_SEQ_RANGE_ANY; + __u32 idx; fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]); + if (!fid_is_sane(fid)) + GOTO(out, rc = -ESTALE); + rc = lod_fld_lookup(env, lod, fid, &idx, &type); if (rc != 0) GOTO(out, rc); - tgt = LTD_TGT(ltd, idx); - if (tgt == NULL) - GOTO(out, rc = -ESTALE); + if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { + tgt_dt = lod->lod_child; + } else { + struct lod_tgt_desc *tgt; - dto = dt_locate_at(env, tgt->ltd_tgt, fid, + tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + GOTO(out, rc = -ESTALE); + tgt_dt = tgt->ltd_tgt; + } + + dto = dt_locate_at(env, tgt_dt, fid, lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, NULL); if (IS_ERR(dto)) GOTO(out, rc = PTR_ERR(dto)); - stripe[i - 1] = dto; + stripe[i] = dto; } out: lo->ldo_stripe = stripe; - lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count) - 1; - lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count) - 1; + lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count); + lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); if (rc != 0) lod_object_free_striping(env, lo); @@ -688,13 +1270,18 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, const struct lmv_user_md_v1 *lum, + struct dt_object_format *dof, struct thandle *th) { struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); struct dt_object **stripe; struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct 
lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; int stripe_count; int *idx_array; int rc = 0; @@ -706,9 +1293,13 @@ static int lod_prep_md_striped_create(const struct lu_env *env, LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0); - /* Do not need allocated master stripe */ stripe_count = le32_to_cpu(lum->lum_stripe_count); - OBD_ALLOC(stripe, sizeof(stripe[0]) * (stripe_count - 1)); + + /* shrink the stripe_count to the avaible MDT count */ + if (stripe_count > lod->lod_remote_mdt_count + 1) + stripe_count = lod->lod_remote_mdt_count + 1; + + OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count); if (stripe == NULL) RETURN(-ENOMEM); @@ -716,13 +1307,25 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (idx_array == NULL) GOTO(out_free, rc = -ENOMEM); - idx_array[0] = le32_to_cpu(lum->lum_stripe_offset); - for (i = 1; i < stripe_count; i++) { - struct lod_tgt_desc *tgt; + for (i = 0; i < stripe_count; i++) { + struct lod_tgt_desc *tgt = NULL; struct dt_object *dto; - struct lu_fid fid; + struct lu_fid fid = { 0 }; int idx; struct lu_object_conf conf = { 0 }; + struct dt_device *tgt_dt = NULL; + + if (i == 0) { + /* Right now, master stripe and master object are + * on the same MDT */ + idx = le32_to_cpu(lum->lum_stripe_offset); + rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, + NULL); + if (rc < 0) + GOTO(out_put, rc); + tgt_dt = lod->lod_child; + goto next; + } idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1); @@ -735,7 +1338,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, " allocated %d, last allocated %d\n", idx, lod->lod_remote_mdt_count, i, idx_array[i - 1]); - /* Find next avaible target */ + /* Find next available target */ if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) continue; @@ -751,6 +1354,25 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (already_allocated) continue; + /* check the status of the OSP */ + 
tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + continue; + + tgt_dt = tgt->ltd_tgt; + rc = dt_statfs(env, tgt_dt, NULL); + if (rc) { + /* this OSP doesn't feel well */ + rc = 0; + continue; + } + + rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL); + if (rc < 0) { + rc = 0; + continue; + } + break; } @@ -765,27 +1387,25 @@ static int lod_prep_md_striped_create(const struct lu_env *env, " allocated %d, last allocated %d\n", idx, lod->lod_remote_mdt_count, i, idx_array[i - 1]); - tgt = LTD_TGT(ltd, idx); - LASSERT(tgt != NULL); - - rc = obd_fid_alloc(tgt->ltd_exp, &fid, NULL); - if (rc < 0) - GOTO(out_put, rc); - rc = 0; - +next: + /* tgt_dt and fid must be ready after search avaible OSP + * in the above loop */ + LASSERT(tgt_dt != NULL); + LASSERT(fid_is_sane(&fid)); conf.loc_flags = LOC_F_NEW; - dto = dt_locate_at(env, tgt->ltd_tgt, &fid, - dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf); + dto = dt_locate_at(env, tgt_dt, &fid, + dt->do_lu.lo_dev->ld_site->ls_top_dev, + &conf); if (IS_ERR(dto)) GOTO(out_put, rc = PTR_ERR(dto)); - stripe[i - 1] = dto; + stripe[i] = dto; idx_array[i] = idx; } lo->ldo_dir_striped = 1; lo->ldo_stripe = stripe; - lo->ldo_stripenr = i - 1; - lo->ldo_stripes_allocated = stripe_count - 1; + lo->ldo_stripenr = i; + lo->ldo_stripes_allocated = stripe_count; if (lo->ldo_stripenr == 0) GOTO(out_put, rc = -ENOSPC); @@ -793,13 +1413,24 @@ static int lod_prep_md_striped_create(const struct lu_env *env, rc = lod_prep_lmv_md(env, dt, &lmv_buf); if (rc != 0) GOTO(out_put, rc); + lmm = lmv_buf.lb_buf; + + OBD_ALLOC_PTR(slave_lmm); + if (slave_lmm == NULL) + GOTO(out_put, rc = -ENOMEM); + + lod_prep_slave_lmv_md(slave_lmm, lmm); + slave_lmv_buf.lb_buf = slave_lmm; + slave_lmv_buf.lb_len = sizeof(*slave_lmm); + + if (!dt_try_as_dir(env, dt_object_child(dt))) + GOTO(out_put, rc = -EINVAL); for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto; + struct dt_object *dto = stripe[i]; + char *stripe_name = info->lti_key; - dto = stripe[i]; - /* only 
create slave striped object */ - rc = dt_declare_create(env, dto, attr, NULL, NULL, th); + rc = dt_declare_create(env, dto, attr, NULL, dof, th); if (rc != 0) GOTO(out_put, rc); @@ -824,7 +1455,6 @@ static int lod_prep_md_striped_create(const struct lu_env *env, !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, lo->ldo_def_stripenr, lo->ldo_def_stripe_offset)) { - struct lod_thread_info *info; struct lov_user_md_v3 *v3; /* sigh, lti_ea_store has been used for lmv_buf, @@ -846,7 +1476,6 @@ static int lod_prep_md_striped_create(const struct lu_env *env, strncpy(v3->lmm_pool_name, lo->ldo_pool, LOV_MAXPOOLNAME); - info = lod_env_info(env); info->lti_buf.lb_buf = v3; info->lti_buf.lb_len = sizeof(*v3); rc = dt_declare_xattr_set(env, dto, @@ -857,8 +1486,22 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (rc != 0) GOTO(out_put, rc); } - rc = dt_declare_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, 0, - th); + + slave_lmm->lmv_master_mdt_index = cpu_to_le32(i); + rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf, + XATTR_NAME_LMV, 0, th); + if (rc != 0) + GOTO(out_put, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + rc = dt_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)lu_object_fid(&dto->do_lu), + (const struct dt_key *)stripe_name, th); + if (rc != 0) + GOTO(out_put, rc); + + rc = dt_declare_ref_add(env, dt_object_child(dt), th); if (rc != 0) GOTO(out_put, rc); } @@ -870,15 +1513,20 @@ static int lod_prep_md_striped_create(const struct lu_env *env, out_put: if (rc < 0) { - for (i = 0; i < stripe_count - 1; i++) + for (i = 0; i < stripe_count; i++) if (stripe[i] != NULL) lu_object_put(env, &stripe[i]->do_lu); - OBD_FREE(stripe, sizeof(stripe[0]) * (stripe_count - 1)); + OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count); + lo->ldo_stripenr = 0; + lo->ldo_stripes_allocated = 0; + lo->ldo_stripe = NULL; } out_free: if (idx_array != NULL) OBD_FREE(idx_array, 
sizeof(idx_array[0]) * stripe_count); + if (slave_lmm != NULL) + OBD_FREE_PTR(slave_lmm); RETURN(rc); } @@ -890,6 +1538,7 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, const struct lu_buf *lum_buf, + struct dt_object_format *dof, struct thandle *th) { struct lod_object *lo = lod_dt_obj(dt); @@ -905,7 +1554,7 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), (int)le32_to_cpu(lum->lum_stripe_offset)); - if (le32_to_cpu(lum->lum_stripe_count) <= 1) + if (le32_to_cpu(lum->lum_stripe_count) == 0) GOTO(out, rc = 0); rc = lod_verify_md_striping(lod, lum); @@ -913,7 +1562,7 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, GOTO(out, rc); /* prepare dir striped objects */ - rc = lod_prep_md_striped_create(env, dt, attr, lum, th); + rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); if (rc != 0) { /* failed to create striping, let's reset * config so that others don't get confused */ @@ -924,6 +1573,52 @@ out: RETURN(rc); } +static int lod_dir_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; + + if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + struct lmv_user_md_v1 *lum; + + LASSERT(buf != NULL && buf->lb_buf != NULL); + lum = buf->lb_buf; + rc = lod_verify_md_striping(d, lum); + if (rc != 0) + RETURN(rc); + } + + rc = dt_declare_xattr_set(env, next, buf, name, fl, th); + if (rc != 0) + RETURN(rc); + + /* set xattr to each stripes, if needed */ + rc = lod_load_striping(env, lo); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(rc); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = 
dt_declare_xattr_set(env, lo->ldo_stripe[i], buf, + name, fl, th); + if (rc != 0) + break; + } + + RETURN(rc); +} + /* * LOV xattr is a storage for striping, and LOD owns this xattr. * but LOD allows others to control striping to some extent @@ -969,39 +1664,7 @@ static int lod_declare_xattr_set(const struct lu_env *env, } rc = lod_declare_striped_object(env, dt, attr, buf, th); } else if (S_ISDIR(mode)) { - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *lo = lod_dt_obj(dt); - int i; - - if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - struct lmv_user_md_v1 *lum; - - LASSERT(buf != NULL && buf->lb_buf != NULL); - lum = buf->lb_buf; - rc = lod_verify_md_striping(d, lum); - if (rc != 0) - RETURN(rc); - } - - rc = dt_declare_xattr_set(env, next, buf, name, fl, th); - if (rc != 0) - RETURN(rc); - - /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); - if (rc != 0) - RETURN(rc); - - if (lo->ldo_stripenr == 0) - RETURN(rc); - - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf, - name, fl, th); - if (rc != 0) - break; - } + rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else { rc = dt_declare_xattr_set(env, next, buf, name, fl, th); } @@ -1016,6 +1679,67 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo) lod_object_set_pool(lo, NULL); lo->ldo_def_stripe_size = 0; lo->ldo_def_stripenr = 0; + if (lo->ldo_dir_stripe != NULL) + lo->ldo_dir_striping_cached = 0; +} + +static int lod_xattr_set_internal(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, struct thandle *th, + struct lustre_capa *capa) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + if 
(lo->ldo_stripenr == 0) + RETURN(rc); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th, + capa); + if (rc != 0) + break; + } + + RETURN(rc); +} + +static int lod_xattr_del_internal(const struct lu_env *env, + struct dt_object *dt, + const char *name, struct thandle *th, + struct lustre_capa *capa) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = dt_xattr_del(env, next, name, th, capa); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(rc); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th, + capa); + if (rc != 0) + break; + } + + RETURN(rc); } static int lod_xattr_set_lov_on_dir(const struct lu_env *env, @@ -1026,7 +1750,6 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, struct lustre_capa *capa) { struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct dt_object *next = dt_object_child(dt); struct lod_object *l = lod_dt_obj(dt); struct lov_user_md_v1 *lum; struct lov_user_md_v3 *v3 = NULL; @@ -1062,11 +1785,11 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, (lum->lmm_stripe_count), (lum->lmm_stripe_offset)) && lum->lmm_magic == LOV_USER_MAGIC_V1) { - rc = dt_xattr_del(env, next, name, th, capa); + rc = lod_xattr_del_internal(env, dt, name, th, capa); if (rc == -ENODATA) rc = 0; } else { - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); } RETURN(rc); @@ -1079,7 +1802,6 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, struct thandle *th, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); struct lod_object *l = lod_dt_obj(dt); struct lmv_user_md_v1 *lum; int rc; @@ -1095,27 +1817,26 @@ static int 
lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), le32_to_cpu(lum->lum_stripe_offset)) && le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { - rc = dt_xattr_del(env, next, name, th, capa); + rc = lod_xattr_del_internal(env, dt, name, th, capa); if (rc == -ENODATA) rc = 0; } else { - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); if (rc != 0) RETURN(rc); + } - /* Update default stripe cache */ - if (l->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(l->ldo_dir_stripe); - if (l->ldo_dir_stripe == NULL) - RETURN(-ENOMEM); - } - - l->ldo_dir_striping_cached = 0; - l->ldo_dir_def_striping_set = 1; - l->ldo_dir_def_stripenr = - le32_to_cpu(lum->lum_stripe_count) - 1; + /* Update default stripe cache */ + if (l->ldo_dir_stripe == NULL) { + OBD_ALLOC_PTR(l->ldo_dir_stripe); + if (l->ldo_dir_stripe == NULL) + RETURN(-ENOMEM); } + l->ldo_dir_striping_cached = 0; + l->ldo_dir_def_striping_set = 1; + l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count); + RETURN(rc); } @@ -1125,7 +1846,13 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, struct lustre_capa *capa) { struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *attr = &info->lti_attr; + struct dt_object_format *dof = &info->lti_format; struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; int i; int rc; ENTRY; @@ -1138,19 +1865,34 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (lo->ldo_stripenr == 0) RETURN(0); + rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + + attr->la_valid = LA_TYPE | LA_MODE; + dof->dof_type = DFT_DIR; + rc = lod_prep_lmv_md(env, dt, &lmv_buf); if (rc != 0) RETURN(rc); + lmm = lmv_buf.lb_buf; + + OBD_ALLOC_PTR(slave_lmm); 
+ if (slave_lmm == NULL) + RETURN(-ENOMEM); + + lod_prep_slave_lmv_md(slave_lmm, lmm); + slave_lmv_buf.lb_buf = slave_lmm; + slave_lmv_buf.lb_len = sizeof(*slave_lmm); for (i = 0; i < lo->ldo_stripenr; i++) { struct dt_object *dto; - struct lu_attr *attr = &lod_env_info(env)->lti_attr; + char *stripe_name = info->lti_key; dto = lo->ldo_stripe[i]; - memset(attr, 0, sizeof(*attr)); - attr->la_valid = LA_TYPE | LA_MODE; - attr->la_mode = S_IFDIR; - rc = dt_create(env, dto, attr, NULL, NULL, th); + dt_write_lock(env, dto, MOR_TGT_CHILD); + rc = dt_create(env, dto, attr, NULL, dof, th); + dt_write_unlock(env, dto); if (rc != 0) - RETURN(rc); + GOTO(out, rc); /* slave_lmm is released at the out label */ @@ -1170,7 +1912,6 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, lo->ldo_def_stripenr, lo->ldo_def_stripe_offset)) { - struct lod_thread_info *info; struct lov_user_md_v3 *v3; /* sigh, lti_ea_store has been used for lmv_buf, @@ -1178,7 +1919,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, * stripe EA */ OBD_ALLOC_PTR(v3); if (v3 == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); /* report the allocation failure, not the previous rc */ memset(v3, 0, sizeof(*v3)); v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); @@ -1192,24 +1933,182 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, strncpy(v3->lmm_pool_name, lo->ldo_pool, LOV_MAXPOOLNAME); - info = lod_env_info(env); info->lti_buf.lb_buf = v3; info->lti_buf.lb_len = sizeof(*v3); rc = dt_xattr_set(env, dto, &info->lti_buf, XATTR_NAME_LOV, 0, th, capa); OBD_FREE_PTR(v3); if (rc != 0) + GOTO(out, rc); + } + + slave_lmm->lmv_master_mdt_index = cpu_to_le32(i); + rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV, + fl, th, capa); + if (rc != 0) + GOTO(out, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + rc = dt_insert(env, dt_object_child(dt), + (const struct dt_rec *)lu_object_fid(&dto->do_lu), + (const struct dt_key *)stripe_name, th, capa, 0);
+ if (rc != 0) + GOTO(out, rc); + + rc = dt_ref_add(env, dt_object_child(dt), th); + if (rc != 0) + GOTO(out, rc); + } + + rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV, + fl, th, capa); + +out: + if (slave_lmm != NULL) + OBD_FREE_PTR(slave_lmm); + + RETURN(rc); +} + +int lod_dir_striping_create_internal(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th, + bool declare) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; + + if (lo->ldo_dir_def_striping_set && + !LMVEA_DELETE_VALUES(lo->ldo_stripenr, + lo->ldo_dir_stripe_offset)) { + struct lmv_user_md_v1 *v1 = info->lti_ea_store; + int stripe_count = lo->ldo_stripenr; + + if (info->lti_ea_store_size < sizeof(*v1)) { + rc = lod_ea_store_resize(info, sizeof(*v1)); + if (rc != 0) RETURN(rc); + v1 = info->lti_ea_store; } - rc = dt_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, fl, th, - capa); + memset(v1, 0, sizeof(*v1)); + v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); + v1->lum_stripe_count = cpu_to_le32(stripe_count); + v1->lum_stripe_offset = + cpu_to_le32(lo->ldo_dir_stripe_offset); + + info->lti_buf.lb_buf = v1; + info->lti_buf.lb_len = sizeof(*v1); + + if (declare) + rc = lod_declare_xattr_set_lmv(env, dt, attr, + &info->lti_buf, dof, th); + else + rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, + XATTR_NAME_LMV, 0, th, + BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + } + + /* Transfer default LMV striping from the parent */ + if (lo->ldo_dir_striping_cached && + !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr, + lo->ldo_dir_def_stripe_offset)) { + struct lmv_user_md_v1 *v1 = info->lti_ea_store; + int def_stripe_count = lo->ldo_dir_def_stripenr; + + if (info->lti_ea_store_size < sizeof(*v1)) { + rc = lod_ea_store_resize(info, sizeof(*v1)); + if (rc != 0) + RETURN(rc); + v1 = info->lti_ea_store; + } + + memset(v1, 0, sizeof(*v1)); + v1->lum_magic = 
cpu_to_le32(LMV_USER_MAGIC); + v1->lum_stripe_count = cpu_to_le32(def_stripe_count); + v1->lum_stripe_offset = + cpu_to_le32(lo->ldo_dir_def_stripe_offset); + v1->lum_hash_type = + cpu_to_le32(lo->ldo_dir_def_hash_type); + + info->lti_buf.lb_buf = v1; + info->lti_buf.lb_len = sizeof(*v1); + if (declare) + rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, + XATTR_NAME_DEFAULT_LMV, + 0, th); + else + rc = lod_xattr_set_default_lmv_on_dir(env, dt, + &info->lti_buf, + XATTR_NAME_DEFAULT_LMV, 0, + th, BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + } + + /* Transfer default LOV striping from the parent */ + if (lo->ldo_striping_cached && + !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, + lo->ldo_def_stripenr, + lo->ldo_def_stripe_offset)) { + struct lov_user_md_v3 *v3 = info->lti_ea_store; + + if (info->lti_ea_store_size < sizeof(*v3)) { + rc = lod_ea_store_resize(info, sizeof(*v3)); + if (rc != 0) + RETURN(rc); + v3 = info->lti_ea_store; + } + + memset(v3, 0, sizeof(*v3)); + v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr); + v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); + if (lo->ldo_pool) + strncpy(v3->lmm_pool_name, lo->ldo_pool, + LOV_MAXPOOLNAME); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + + if (declare) + rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + else + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + XATTR_NAME_LOV, 0, th, + BYPASS_CAPA); + if (rc != 0) + RETURN(rc); } - rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV, - fl, th, capa); + RETURN(0); +} + +static int lod_declare_dir_striping_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + return lod_dir_striping_create_internal(env, dt, attr, dof, th, true); +} - RETURN(rc); +static int 
lod_dir_striping_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); } static int lod_xattr_set(const struct lu_env *env, @@ -1217,17 +2116,36 @@ static int lod_xattr_set(const struct lu_env *env, const char *name, int fl, struct thandle *th, struct lustre_capa *capa) { - struct lod_object *lo = lod_dt_obj(dt); struct dt_object *next = dt_object_child(dt); - __u32 attr; int rc; - int i; ENTRY; - attr = dt->do_lu.lo_header->loh_attr & S_IFMT; - if (S_ISDIR(attr) && strcmp(name, XATTR_NAME_LOV) == 0) { + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_LMV) == 0) { + struct lmv_mds_md_v1 *lmm = buf->lb_buf; + + if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & + LMV_HASH_FLAG_MIGRATION) + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + else + rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + + RETURN(rc); + } + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_LOV) == 0) { + /* default LOVEA */ rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa); - } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) { + RETURN(rc); + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + /* default LMVEA */ + rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl, + th, capa); + RETURN(rc); + } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && + !strcmp(name, XATTR_NAME_LOV)) { /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() @@ -1240,32 +2158,12 @@ static int lod_xattr_set(const struct lu_env *env, } else { rc = lod_striping_create(env, dt, NULL, NULL, th); } - } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - if (!S_ISDIR(attr)) - RETURN(-ENOTDIR); - rc = 
lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl, - th, capa); - } else { - /* - * behave transparantly for all other EAs - */ - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); - } - - if (rc != 0 || !S_ISDIR(attr)) - RETURN(rc); - - if (lo->ldo_stripenr == 0) RETURN(rc); - - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th, - capa); - if (rc != 0) - break; } + /* then all other xattr */ + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); + RETURN(rc); } @@ -1408,7 +2306,7 @@ static int lod_cache_parent_lmv_striping(const struct lu_env *env, rc = 0; v1 = info->lti_ea_store; - lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count) - 1; + lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count); lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); lp->ldo_dir_def_striping_set = 1; @@ -1460,6 +2358,7 @@ static void lod_ah_init(const struct lu_env *env, struct lod_object *lp = NULL; struct lod_object *lc; struct lov_desc *desc; + int rc; ENTRY; LASSERT(child); @@ -1467,6 +2366,9 @@ static void lod_ah_init(const struct lu_env *env, if (likely(parent)) { nextp = dt_object_child(parent); lp = lod_dt_obj(parent); + rc = lod_load_striping(env, lp); + if (rc != 0) + return; } nextc = dt_object_child(child); @@ -1485,8 +2387,6 @@ static void lod_ah_init(const struct lu_env *env, NULL : nextp, nextc, child_mode); if (S_ISDIR(child_mode)) { - int rc; - if (lc->ldo_dir_stripe == NULL) { OBD_ALLOC_PTR(lc->ldo_dir_stripe); if (lc->ldo_dir_stripe == NULL) @@ -1536,7 +2436,6 @@ static void lod_ah_init(const struct lu_env *env, /* If the directory is specified with certain stripes */ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) { const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; - int rc; rc = lod_verify_md_striping(d, lum1); if (rc == 0 && @@ -1544,7 +2443,7 @@ static void 
lod_ah_init(const struct lu_env *env, /* Directory will be striped only if * stripe_count > 1 */ lc->ldo_stripenr = - le32_to_cpu(lum1->lum_stripe_count) - 1; + le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = le32_to_cpu(lum1->lum_stripe_offset); lc->ldo_dir_hash_type = @@ -1732,146 +2631,6 @@ out: RETURN(rc); } -int lod_dir_striping_create_internal(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - const struct dt_object_format *dof, - struct thandle *th, - bool declare) -{ - struct lod_thread_info *info = lod_env_info(env); - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - ENTRY; - - if (lo->ldo_dir_def_striping_set && - !LMVEA_DELETE_VALUES(lo->ldo_stripenr, - lo->ldo_dir_stripe_offset)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int stripe_count = lo->ldo_stripenr + 1; - - if (info->lti_ea_store_size < sizeof(*v1)) { - rc = lod_ea_store_resize(info, sizeof(*v1)); - if (rc != 0) - RETURN(rc); - v1 = info->lti_ea_store; - } - - memset(v1, 0, sizeof(*v1)); - v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(stripe_count); - v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_stripe_offset); - - info->lti_buf.lb_buf = v1; - info->lti_buf.lb_len = sizeof(*v1); - - if (declare) - rc = lod_declare_xattr_set_lmv(env, dt, attr, - &info->lti_buf, th); - else - rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, - XATTR_NAME_LMV, 0, th, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - } - - /* Transfer default LMV striping from the parent */ - if (lo->ldo_dir_striping_cached && - !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr, - lo->ldo_dir_def_stripe_offset)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int def_stripe_count = lo->ldo_dir_def_stripenr + 1; - - if (info->lti_ea_store_size < sizeof(*v1)) { - rc = lod_ea_store_resize(info, sizeof(*v1)); - if (rc != 0) - RETURN(rc); - v1 = info->lti_ea_store; - } - - memset(v1, 0, 
sizeof(*v1)); - v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(def_stripe_count); - v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_def_stripe_offset); - v1->lum_hash_type = - cpu_to_le32(lo->ldo_dir_def_hash_type); - - info->lti_buf.lb_buf = v1; - info->lti_buf.lb_len = sizeof(*v1); - if (declare) - rc = dt_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_DEFAULT_LMV, 0, - th); - else - rc = dt_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_DEFAULT_LMV, 0, th, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - } - - /* Transfer default LOV striping from the parent */ - if (lo->ldo_striping_cached && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset)) { - struct lov_user_md_v3 *v3 = info->lti_ea_store; - - if (info->lti_ea_store_size < sizeof(*v3)) { - rc = lod_ea_store_resize(info, sizeof(*v3)); - if (rc != 0) - RETURN(rc); - v3 = info->lti_ea_store; - } - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool) - strncpy(v3->lmm_pool_name, lo->ldo_pool, - LOV_MAXPOOLNAME); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - - if (declare) - rc = dt_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th); - else - rc = dt_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - } - - RETURN(0); -} - -static int lod_declare_dir_striping_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_object_format *dof, - struct thandle *th) -{ - return lod_dir_striping_create_internal(env, dt, attr, dof, th, true); -} - -static int lod_dir_striping_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - 
struct dt_object_format *dof, - struct thandle *th) -{ - return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); -} - static int lod_declare_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -1958,14 +2717,12 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt, /* create local object */ rc = dt_create(env, next, attr, hint, dof, th); + if (rc != 0) + RETURN(rc); - if (rc == 0) { - if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - lo->ldo_dir_stripe != NULL) - rc = lod_dir_striping_create(env, dt, attr, dof, th); - else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0) - rc = lod_striping_create(env, dt, attr, dof, th); - } + if (S_ISREG(dt->do_lu.lo_header->loh_attr) && + lo->ldo_stripe && dof->u.dof_reg.striped != 0) + rc = lod_striping_create(env, dt, attr, dof, th); RETURN(rc); } @@ -1976,34 +2733,55 @@ static int lod_declare_object_destroy(const struct lu_env *env, { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + char *stripe_name = info->lti_key; int rc, i; ENTRY; /* - * we declare destroy for the local object + * load striping information, notice we don't do this when object + * is being initialized as we don't need this information till + * few specific cases like destroy, chown */ - rc = dt_declare_destroy(env, next, th); + rc = lod_load_striping(env, lo); if (rc) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) - RETURN(0); + /* declare destroy for all underlying objects */ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + rc = next->do_ops->do_index_try(env, next, + &dt_directory_features); + if (rc != 0) + RETURN(rc); + for (i = 0; i < lo->ldo_stripenr; i++) { + rc = dt_declare_ref_del(env, next, th); + if (rc != 0) + RETURN(rc); + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), + i); + rc = dt_declare_delete(env, 
next, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + RETURN(rc); + } + } /* - * load striping information, notice we don't do this when object - * is being initialized as we don't need this information till - * few specific cases like destroy, chown + * we declare destroy for the local object */ - rc = lod_load_striping(env, lo); + rc = dt_declare_destroy(env, next, th); if (rc) RETURN(rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) + RETURN(0); + /* declare destroy for all underlying objects */ for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); rc = dt_declare_destroy(env, lo->ldo_stripe[i], th); - - if (rc) + if (rc != 0) break; } @@ -2015,48 +2793,56 @@ static int lod_object_destroy(const struct lu_env *env, { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + char *stripe_name = info->lti_key; int rc, i; ENTRY; - /* destroy local object */ + /* destroy sub-stripe of master object */ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + rc = next->do_ops->do_index_try(env, next, + &dt_directory_features); + if (rc != 0) + RETURN(rc); + + for (i = 0; i < lo->ldo_stripenr; i++) { + rc = dt_ref_del(env, next, th); + if (rc != 0) + RETURN(rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), + i); + + CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n", + PFID(lu_object_fid(&dt->do_lu)), stripe_name, + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); + + rc = dt_delete(env, next, + (const struct dt_key *)stripe_name, + th, BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + } + } rc = dt_destroy(env, next, th); - if (rc) + if (rc != 0) RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) RETURN(0); - /* destroy all underlying objects */ + /* destroy all striped objects */ for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - /* for striped directory, next == ldo_stripe[0] 
*/ - if (next != lo->ldo_stripe[i]) { - rc = dt_destroy(env, lo->ldo_stripe[i], th); - if (rc) - break; - } + rc = dt_destroy(env, lo->ldo_stripe[i], th); + if (rc != 0) + break; } RETURN(rc); } -static int lod_index_try(const struct lu_env *env, struct dt_object *dt, - const struct dt_index_features *feat) -{ - struct dt_object *next = dt_object_child(dt); - int rc; - ENTRY; - - LASSERT(next->do_ops); - LASSERT(next->do_ops->do_index_try); - - rc = next->do_ops->do_index_try(env, next, feat); - if (next->do_index_ops && dt->do_index_ops == NULL) - dt->do_index_ops = &lod_index_ops; - - RETURN(rc); -} - static int lod_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { @@ -2113,7 +2899,7 @@ static int lod_object_unlock_internal(const struct lu_env *env, if (slave_locks == NULL) RETURN(0); - for (i = 0; i < slave_locks->lsl_lock_count; i++) { + for (i = 1; i < slave_locks->lsl_lock_count; i++) { if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) { int rc1; @@ -2141,18 +2927,18 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, if (slave_locks == NULL) RETURN(0); + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(-ENOTDIR); + rc = lod_load_striping(env, lo); if (rc != 0) RETURN(rc); /* Note: for remote lock for single stripe dir, MDT will cancel * the lock by lockh directly */ - if (lo->ldo_stripenr == 0 && dt_object_remote(dt_object_child(dt))) + if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt))) RETURN(0); - if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(-ENOTDIR); - /* Only cancel slave lock for striped dir */ rc = lod_object_unlock_internal(env, dt, einfo, policy); @@ -2192,7 +2978,7 @@ static int lod_object_lock(const struct lu_env *env, RETURN(rc); /* No stripes */ - if (lo->ldo_stripenr == 0) + if (lo->ldo_stripenr <= 1) RETURN(0); slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr * @@ -2204,7 +2990,7 @@ static int lod_object_lock(const struct lu_env *env, 
slave_locks->lsl_lock_count = lo->ldo_stripenr; /* striped directory lock */ - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 1; i < lo->ldo_stripenr; i++) { struct lustre_handle lockh; struct ldlm_res_id *res_id; @@ -2218,7 +3004,6 @@ static int lod_object_lock(const struct lu_env *env, policy); if (rc != 0) GOTO(out, rc); - slave_locks->lsl_handle[i] = lockh; } diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index ee1089a..b6ed204 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -98,8 +98,8 @@ int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid, struct list_head *cancels, ldlm_mode_t mode, __u64 bits); /* mdc/mdc_request.c */ -int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data); +int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data); int mdc_open(struct obd_export *exp, obd_id ino, int type, int flags, struct lov_mds_md *lmm, int lmm_size, struct lustre_handle *fh, diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 2b5babb..bf68ed9 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -1192,7 +1192,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, /* For case if upper layer did not alloc fid, do it now. */ if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) { - rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data); + rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc < 0) { CERROR("Can't alloc new fid, rc %d\n", rc); RETURN(rc); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index e9ad092..112b53b 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -225,18 +225,16 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data, struct list_head cancels = LIST_HEAD_INIT(cancels); ENTRY; - /* For case if upper layer did not alloc fid, do it now. 
*/ - if (!fid_is_sane(&op_data->op_fid2)) { - /* - * mdc_fid_alloc() may return errno 1 in case of switch to new - * sequence, handle this. - */ - rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data); - if (rc < 0) { - CERROR("Can't alloc new fid, rc %d\n", rc); - RETURN(rc); - } - } + /* For case if upper layer did not alloc fid, do it now. */ + if (!fid_is_sane(&op_data->op_fid2)) { + /* + * mdc_fid_alloc() may return errno 1 in case of switch to new + * sequence, handle this. + */ + rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); + if (rc < 0) + RETURN(rc); + } rebuild: count = 0; diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 7610e59..00ceb06 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -879,7 +879,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, req_fmt = &RQF_MDS_RELEASE_CLOSE; /* allocate a FID for volatile file */ - rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data); + rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc < 0) { CERROR("%s: "DFID" failed to allocate FID: %d\n", obd->obd_name, PFID(&op_data->op_fid1), rc); @@ -3067,13 +3067,13 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, RETURN(rc); } -int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data) +int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) { - struct client_obd *cli = &exp->exp_obd->u.cli; - struct lu_client_seq *seq = cli->cl_seq; - ENTRY; - RETURN(seq_client_alloc_fid(NULL, seq, fid)); + struct client_obd *cli = &exp->exp_obd->u.cli; + struct lu_client_seq *seq = cli->cl_seq; + ENTRY; + RETURN(seq_client_alloc_fid(env, seq, fid)); } struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 0b382f3..f593270 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -260,39 +260,39 @@ int 
mdd_is_subdir(const struct lu_env *env, struct md_object *mo, static int mdd_dir_is_empty(const struct lu_env *env, struct mdd_object *dir) { - struct dt_it *it; - struct dt_object *obj; - const struct dt_it_ops *iops; - int result; - ENTRY; + struct dt_it *it; + struct dt_object *obj; + const struct dt_it_ops *iops; + int result; + ENTRY; - obj = mdd_object_child(dir); - if (!dt_try_as_dir(env, obj)) - RETURN(-ENOTDIR); - - iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); - if (!IS_ERR(it)) { - result = iops->get(env, it, (const void *)""); - if (result > 0) { - int i; - for (result = 0, i = 0; result == 0 && i < 3; ++i) - result = iops->next(env, it); - if (result == 0) - result = -ENOTEMPTY; - else if (result == +1) - result = 0; - } else if (result == 0) - /* - * Huh? Index contains no zero key? - */ - result = -EIO; - - iops->put(env, it); - iops->fini(env, it); - } else - result = PTR_ERR(it); - RETURN(result); + obj = mdd_object_child(dir); + if (!dt_try_as_dir(env, obj)) + RETURN(-ENOTDIR); + + iops = &obj->do_index_ops->dio_it; + it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); + if (!IS_ERR(it)) { + result = iops->get(env, it, (const struct dt_key *)""); + if (result > 0) { + int i; + for (result = 0, i = 0; result == 0 && i < 3; ++i) + result = iops->next(env, it); + if (result == 0) + result = -ENOTEMPTY; + else if (result == 1) + result = 0; + } else if (result == 0) + /* + * Huh? Index contains no zero key? 
+ */ + result = -EIO; + + iops->put(env, it); + iops->fini(env, it); + } else + result = PTR_ERR(it); + RETURN(result); } static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj, @@ -321,8 +321,10 @@ static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj, */ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj, const struct lu_attr *pattr, - struct mdd_object *cobj, int check_perm, int check_nlink) + struct mdd_object *cobj, bool check_perm, bool check_nlink) { + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_buf *xbuf; int rc = 0; ENTRY; @@ -332,6 +334,19 @@ int mdd_may_create(const struct lu_env *env, if (mdd_is_dead_obj(pobj)) RETURN(-ENOENT); + /* If the parent is a sub-stripe, check whether it is dead */ + xbuf = mdd_buf_get(env, info->mti_key, sizeof(info->mti_key)); + rc = mdo_xattr_get(env, pobj, xbuf, XATTR_NAME_LMV, + mdd_object_capa(env, pobj)); + if (unlikely(rc > 0)) { + struct lmv_mds_md_v1 *lmv1 = xbuf->lb_buf; + + if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE && + le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_DEAD) + RETURN(-ESTALE); + } + rc = 0; + if (check_perm) rc = mdd_permission_internal_locked(env, pobj, pattr, MAY_WRITE | MAY_EXEC, @@ -517,7 +532,7 @@ static int mdd_link_sanity_check(const struct lu_env *env, LASSERT(src_obj != tgt_obj); if (tgt_obj) { - rc = mdd_may_create(env, tgt_obj, tattr, NULL, 1, 0); + rc = mdd_may_create(env, tgt_obj, tattr, NULL, true, false); if (rc) RETURN(rc); } @@ -1323,12 +1338,41 @@ out_pending: return rc; } +static int mdd_mark_dead_object(const struct lu_env *env, + struct mdd_object *obj, struct thandle *handle, + bool declare) +{ + struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start); + int rc; + + if (!declare) + obj->mod_flags |= DEAD_OBJ; + + if (!S_ISDIR(mdd_object_type(obj))) + return 0; + + attr->la_valid = LA_FLAGS; + attr->la_flags = LUSTRE_SLAVE_DEAD_FL; + + if (declare) + rc = mdo_declare_attr_set(env, obj, attr, 
handle); + else + rc = mdo_attr_set(env, obj, attr, handle, + mdd_object_capa(env, obj)); + + return rc; +} + static int mdd_declare_finish_unlink(const struct lu_env *env, struct mdd_object *obj, struct thandle *handle) { int rc; + rc = mdd_mark_dead_object(env, obj, handle, true); + if (rc != 0) + return rc; + rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle); if (rc != 0) return rc; @@ -1354,7 +1398,9 @@ int mdd_finish_unlink(const struct lu_env *env, LASSERT(mdd_write_locked(env, obj) != 0); if (ma->ma_attr.la_nlink == 0 || is_dir) { - obj->mod_flags |= DEAD_OBJ; + rc = mdd_mark_dead_object(env, obj, th, false); + if (rc != 0) + RETURN(rc); /* add new orphan and the object * will be deleted during mdd_close() */ @@ -1802,12 +1848,12 @@ static int mdd_create_sanity_check(const struct lu_env *env, struct lu_attr *cattr, struct md_op_spec *spec) { - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_fid *fid = &info->mti_fid; - struct mdd_object *obj = md2mdd_obj(pobj); - struct mdd_device *m = mdo2mdd(pobj); - int rc; - ENTRY; + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_fid *fid = &info->mti_fid; + struct mdd_object *obj = md2mdd_obj(pobj); + struct mdd_device *m = mdo2mdd(pobj); + int rc; + ENTRY; /* EEXIST check */ if (mdd_is_dead_obj(obj)) @@ -1828,15 +1874,9 @@ static int mdd_create_sanity_check(const struct lu_env *env, MAY_WRITE | MAY_EXEC); if (rc != -ENOENT) RETURN(rc ? : -EEXIST); - } else { - /* - * Check WRITE permission for the parent. - * EXEC permission have been checked - * when lookup before create already. 
- */ - rc = mdd_permission_internal_locked(env, obj, pattr, MAY_WRITE, - MOR_TGT_PARENT); - if (rc) + } else { + rc = mdd_may_create(env, obj, pattr, NULL, true, false); + if (rc != 0) RETURN(rc); } @@ -2067,6 +2107,43 @@ static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj, if (rc) GOTO(unlock, rc); + /* Note: In DNE phase I, for striped dir, though sub-stripes will be + * created in declare phase, they also needs to be added to master + * object as sub-directory entry. So it has to initialize the master + * object, then set dir striped EA.(in mdo_xattr_set) */ + rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle, + spec); + if (rc != 0) + GOTO(err_destroy, rc); + + /* + * in case of replay we just set LOVEA provided by the client + * XXX: I think it would be interesting to try "old" way where + * MDT calls this xattr_set(LOV) in a different transaction. + * probably this way we code can be made better. + */ + + /* During creation, there are only a few cases we need do xattr_set to + * create stripes. + * 1. regular file: see comments above. + * 2. create striped directory with provided stripeEA. + * 3. create striped directory because inherit default layout from the + * parent. */ + if (spec->no_create || + (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) || + S_ISDIR(attr->la_mode)) { + const struct lu_buf *buf; + + buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, + spec->u.sp_ea.eadatalen); + rc = mdo_xattr_set(env, son, buf, + S_ISDIR(attr->la_mode) ? 
XATTR_NAME_LMV : + XATTR_NAME_LOV, 0, + handle, BYPASS_CAPA); + if (rc != 0) + GOTO(err_destroy, rc); + } + #ifdef CONFIG_FS_POSIX_ACL if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) { @@ -2087,29 +2164,6 @@ static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj, } #endif - rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle, - spec); - if (rc != 0) - GOTO(err_destroy, rc); - - /* - * in case of replay we just set LOVEA provided by the client - * XXX: I think it would be interesting to try "old" way where - * MDT calls this xattr_set(LOV) in a different transaction. - * probably this way we code can be made better. - */ - if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA && - S_ISREG(attr->la_mode))) { - const struct lu_buf *buf; - - buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, - spec->u.sp_ea.eadatalen); - rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle, - BYPASS_CAPA); - if (rc != 0) - GOTO(err_destroy, rc); - } - if (S_ISLNK(attr->la_mode)) { struct lu_ucred *uc = lu_ucred_assert(env); struct dt_object *dt = mdd_object_child(son); @@ -2411,7 +2465,7 @@ static int mdd_rename_sanity_check(const struct lu_env *env, * So check may_create, but not check may_unlink. 
*/ if (!tobj) rc = mdd_may_create(env, tgt_pobj, tpattr, NULL, - (src_pobj != tgt_pobj), 0); + (src_pobj != tgt_pobj), false); else rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr, (src_pobj != tgt_pobj), 1); @@ -3206,7 +3260,7 @@ static int mdd_declare_migrate_create(const struct lu_env *env, return rc; } - mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE); + mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1); buf = mdd_buf_get_const(env, mgr_ea, mgr_easize); rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle); @@ -3236,6 +3290,7 @@ static int mdd_migrate_create(const struct lu_env *env, struct thandle *handle; struct lmv_mds_md_v1 *mgr_ea; struct lu_attr *la_flag = MDD_ENV_VAR(env, la_for_fix); + struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint; int mgr_easize; int rc; ENTRY; @@ -3259,7 +3314,7 @@ static int mdd_migrate_create(const struct lu_env *env, RETURN(rc); } spec->u.sp_symname = link_buf.lb_buf; - } else{ + } else if (S_ISREG(la->la_mode)) { /* retrieve lov of the old object */ rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf); if (rc != 0 && rc != -ENODATA) @@ -3272,13 +3327,16 @@ static int mdd_migrate_create(const struct lu_env *env, } mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf; - mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_MIGRATE); + mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1); mgr_ea->lmv_stripe_count = cpu_to_le32(2); mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id; - mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_MIGRATION); + mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION); + fid_cpu_to_le(&mgr_ea->lmv_master_fid, mdd_object_fid(mdd_sobj)); fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj)); fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj)); + mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); @@ -3300,22 +3358,14 @@ static
int mdd_migrate_create(const struct lu_env *env, /* create the target object */ rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL, - NULL, handle); + hint, handle); if (rc != 0) GOTO(stop_trans, rc); - if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) { - buf = mdd_buf_get_const(env, lmm_buf.lb_buf, lmm_buf.lb_len); - rc = mdo_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV, - 0, handle, mdd_object_capa(env, mdd_sobj)); - if (rc != 0) - GOTO(stop_trans, rc); - } - /* Set MIGRATE EA on the source inode, so once the migration needs * to be re-done during failover, the re-do process can locate the * target object which is already being created. */ - mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE); + mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1); buf = mdd_buf_get_const(env, mgr_ea, mgr_easize); rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle, mdd_object_capa(env, mdd_sobj)); @@ -3793,7 +3843,7 @@ static int mdd_migrate_sanity_check(const struct lu_env *env, ENTRY; - mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE); + mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1); mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize); if (mgr_buf->lb_buf == NULL) RETURN(-ENOMEM); @@ -3807,8 +3857,8 @@ static int mdd_migrate_sanity_check(const struct lu_env *env, * is being set by previous migration process, so it * needs to override the IMMUTE flag, otherwise the * following sanity check will fail */ - if (le32_to_cpu(lmm->lmv_md_v1.lmv_magic) == - LMV_MAGIC_MIGRATE) { + if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) & + LMV_HASH_FLAG_MIGRATION) { struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj); sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL; diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index f6657be..4411892 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -190,7 +190,7 @@ int mdd_is_subdir(const struct lu_env *env, struct md_object *mo, const struct lu_fid *fid, struct lu_fid *sfid); int 
mdd_may_create(const struct lu_env *env, struct mdd_object *pobj, const struct lu_attr *pattr, struct mdd_object *cobj, - int check_perm, int check_nlink); + bool check_perm, bool check_nlink); int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj, const struct lu_attr *pattr, const struct lu_attr *attr); int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj, diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index d0b9e225..9f0f8c2 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -1969,6 +1969,7 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj, * No pages were processed, mark this for first page * and send back. */ + dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF); dp->ldp_flags = cpu_to_le32(LDF_EMPTY); rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count); } diff --git a/lustre/mdd/mdd_permission.c b/lustre/mdd/mdd_permission.c index da184e8..42b80b5 100644 --- a/lustre/mdd/mdd_permission.c +++ b/lustre/mdd/mdd_permission.c @@ -308,7 +308,8 @@ int mdd_permission(const struct lu_env *env, struct lu_ucred *uc = NULL; struct lu_attr *pattr = NULL; struct lu_attr *cattr = MDD_ENV_VAR(env, cattr); - int check_create, check_link; + bool check_create; + bool check_link; int check_unlink; int check_rename_src, check_rename_tar; int check_vtx_part, check_vtx_full; @@ -336,8 +337,8 @@ int mdd_permission(const struct lu_env *env, if (unlikely(mask & MDS_OPEN_CROSS)) mask = accmode(env, cattr, mask & ~MDS_OPEN_CROSS); - check_create = mask & MAY_CREATE; - check_link = mask & MAY_LINK; + check_create = mask & MAY_CREATE; + check_link = mask & MAY_LINK; check_unlink = mask & MAY_UNLINK; check_rename_src = mask & MAY_RENAME_SRC; check_rename_tar = mask & MAY_RENAME_TAR; @@ -355,8 +356,8 @@ int mdd_permission(const struct lu_env *env, MOR_TGT_CHILD); if (!rc && (check_create || check_link)) - rc = mdd_may_create(env, mdd_pobj, pattr, mdd_cobj, 1, - check_link); + rc = mdd_may_create(env, 
mdd_pobj, pattr, mdd_cobj, true, + check_link); if (!rc && check_unlink) rc = mdd_may_unlink(env, mdd_pobj, pattr, cattr); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 9f84d08..34d5b97 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -508,8 +508,8 @@ out: return rc; } -static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, - const char *name) +int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, + const char *name) { const struct lu_env *env = info->mti_env; int rc; @@ -543,6 +543,8 @@ static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, info->mti_buf.lb_buf = info->mti_big_lmm; info->mti_buf.lb_len = info->mti_big_lmmsize; rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name); + if (rc > 0) + info->mti_big_lmm_used = 1; RETURN(rc); } @@ -594,7 +596,6 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, return rc; rc = mdt_big_xattr_get(info, o, name); if (rc > 0) { - info->mti_big_lmm_used = 1; if (!strcmp(name, XATTR_NAME_LOV)) { ma->ma_valid |= MA_LOV; ma->ma_lmm = info->mti_big_lmm; diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 5b8cea1..bce5d12 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -717,6 +717,8 @@ enum { int mdt_get_info(struct tgt_session_info *tsi); int mdt_attr_get_complex(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma); +int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, + const char *name); int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma, const char *name); int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o, diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index ae30238..1c60854 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -576,6 +576,10 @@ void mdt_dump_lmv(unsigned int level, const union lmv_mds_md 
*lmv) le32_to_cpu(lmm1->lmv_magic), le32_to_cpu(lmm1->lmv_master_mdt_index), le32_to_cpu(lmm1->lmv_stripe_count)); + + if (le32_to_cpu(lmm1->lmv_magic) == LMV_MAGIC_STRIPE) + return; + for (i = 0; i < le32_to_cpu(lmm1->lmv_stripe_count); i++) { struct lu_fid fid; diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 6c657da..948a7b3 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -399,6 +399,8 @@ put_parent: static int mdt_unlock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, __u64 ibits, + struct mdt_lock_handle *s0_lh, + struct mdt_object *s0_obj, struct ldlm_enqueue_info *einfo) { ldlm_policy_data_t *policy = &mti->mti_policy; @@ -408,6 +410,12 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti, if (!S_ISDIR(obj->mot_header.loh_attr)) RETURN(0); + /* Unlock stripe 0 */ + if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) { + LASSERT(s0_obj != NULL); + mdt_object_unlock_put(mti, s0_obj, s0_lh, 1); + } + memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; @@ -422,15 +430,51 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti, **/ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, ldlm_mode_t mode, __u64 ibits, + struct mdt_lock_handle *s0_lh, + struct mdt_object **s0_objp, struct ldlm_enqueue_info *einfo) { ldlm_policy_data_t *policy = &mti->mti_policy; - int rc; + struct lu_buf *buf = &mti->mti_buf; + struct lmv_mds_md_v1 *lmv; + struct lu_fid *fid = &mti->mti_tmp_fid1; + int rc; ENTRY; if (!S_ISDIR(obj->mot_header.loh_attr)) RETURN(0); + buf->lb_buf = mti->mti_xattr_buf; + buf->lb_len = sizeof(mti->mti_xattr_buf); + rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf, + XATTR_NAME_LMV); + if (rc == -ERANGE) { + rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV); + if (rc > 0) { + buf->lb_buf = mti->mti_big_lmm; + buf->lb_len = mti->mti_big_lmmsize; + } + } + + if (rc == -ENODATA || rc == -ENOENT) + RETURN(0); + + if (rc <= 0) + 
RETURN(rc); + + lmv = buf->lb_buf; + if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) + RETURN(-EINVAL); + + /* Sigh, 0_stripe and master object are different + * object, though they are in the same MDT, to avoid + * adding osd_object_lock here, so we will enqueue the + * stripe0 lock in MDT0 for now */ + fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]); + *s0_objp = mdt_object_find_lock(mti, fid, s0_lh, ibits); + if (IS_ERR(*s0_objp)) + RETURN(PTR_ERR(*s0_objp)); + memset(einfo, 0, sizeof(*einfo)); einfo->ei_type = LDLM_IBITS; einfo->ei_mode = mode; @@ -448,12 +492,14 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, struct md_attr *ma, int flags) { - struct mdt_lock_handle *lh; - int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS); - __u64 lockpart = MDS_INODELOCK_UPDATE; + struct mdt_lock_handle *lh; + int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS); + __u64 lockpart = MDS_INODELOCK_UPDATE; struct ldlm_enqueue_info *einfo = &info->mti_einfo; - int rc; - ENTRY; + struct mdt_lock_handle *s0_lh; + struct mdt_object *s0_obj = NULL; + int rc; + ENTRY; /* attr shouldn't be set on remote object */ LASSERT(!mdt_object_remote(mo)); @@ -472,7 +518,9 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, if (rc != 0) RETURN(rc); - rc = mdt_lock_slaves(info, mo, LCK_EX, lockpart, einfo); + s0_lh = &info->mti_lh[MDT_LH_LOCAL]; + mdt_lock_reg_init(s0_lh, LCK_EX); + rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo); if (rc != 0) GOTO(out_unlock, rc); @@ -511,7 +559,7 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, EXIT; out_unlock: - mdt_unlock_slaves(info, mo, lockpart, einfo); + mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo); mdt_object_unlock(info, mo, lh, rc); return rc; } @@ -769,6 +817,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, struct 
mdt_lock_handle *parent_lh; struct mdt_lock_handle *child_lh; struct ldlm_enqueue_info *einfo = &info->mti_einfo; + struct mdt_lock_handle *s0_lh = NULL; + struct mdt_object *s0_obj = NULL; int rc; int no_name = 0; ENTRY; @@ -916,7 +966,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, ma->ma_need = MA_INODE; ma->ma_valid = 0; - rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, einfo); + s0_lh = &info->mti_lh[MDT_LH_LOCAL]; + mdt_lock_reg_init(s0_lh, LCK_EX); + rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_lh, + &s0_obj, einfo); if (rc != 0) GOTO(unlock_child, rc); @@ -961,7 +1014,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, EXIT; unlock_child: - mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, einfo); + mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo); mdt_object_unlock(info, mc, child_lh, rc); /* Since we do not need reply md striped dir info to client, so @@ -1440,7 +1493,7 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv); lmm1 = &ma->ma_lmv->lmv_md_v1; - if (lmm1->lmv_magic != LMV_MAGIC_MIGRATE) { + if (!(lmm1->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) { CERROR("%s: can not migrate striped dir "DFID ": rc = %d\n", mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(mold)), -EPERM); diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 16312eb..a941781 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -677,10 +677,6 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, int rc, size; ENTRY; - /* no support for variable key & record size for now */ - LASSERT((ii->ii_flags & II_FL_VARKEY) == 0); - LASSERT((ii->ii_flags & II_FL_VARREC) == 0); - /* initialize the header of the new container */ memset(lip, 0, LIP_HDR_SIZE); lip->lip_magic = LIP_MAGIC; @@ -696,7 +692,9 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, do { 
char *tmp_entry = entry; struct dt_key *key; - __u64 hash; + __u64 hash; + __u16 keysize; + __u16 recsize; /* fetch 64-bit hash value */ hash = iops->store(env, it); @@ -713,18 +711,24 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, GOTO(out, rc = 0); } - if ((ii->ii_flags & II_FL_NOHASH) == 0) { + if (!(ii->ii_flags & II_FL_NOHASH)) { /* client wants to the 64-bit hash value associated with * each record */ memcpy(tmp_entry, &hash, sizeof(hash)); tmp_entry += sizeof(hash); } - /* then the key value */ - LASSERT(iops->key_size(env, it) == ii->ii_keysize); - key = iops->key(env, it); - memcpy(tmp_entry, key, ii->ii_keysize); - tmp_entry += ii->ii_keysize; + if (ii->ii_flags & II_FL_VARKEY) + keysize = iops->key_size(env, it); + else + keysize = ii->ii_keysize; + + if (!(ii->ii_flags & II_FL_NOKEY)) { + /* then the key value */ + key = iops->key(env, it); + memcpy(tmp_entry, key, keysize); + tmp_entry += keysize; + } /* and finally the record */ rc = iops->rec(env, it, (struct dt_rec *)tmp_entry, attr); @@ -736,7 +740,13 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, lip->lip_nr++; if (unlikely(lip->lip_nr == 1 && ii->ii_count == 0)) ii->ii_hash_start = hash; - entry = tmp_entry + ii->ii_recsize; + + if (ii->ii_flags & II_FL_VARREC) + recsize = iops->rec_size(env, it, attr); + else + recsize = ii->ii_recsize; + + entry = tmp_entry + recsize; nob -= size; } @@ -758,6 +768,7 @@ out: return rc; } + /* * Walk index and fill lu_page containers with key/record pairs * @@ -810,6 +821,10 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj, rc = iops->next(env, it); } else if (rc > 0) { rc = 0; + } else { + if (rc == -ENODATA) + rc = 0; + GOTO(out, rc); } /* Fill containers one after the other. 
There might be multiple @@ -841,6 +856,7 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj, kunmap(rdpg->rp_pages[i]); } +out: iops->put(env, it); iops->fini(env, it); @@ -878,16 +894,10 @@ int dt_index_read(const struct lu_env *env, struct dt_device *dev, if (rdpg->rp_count <= 0 && (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0) RETURN(-EFAULT); - if (fid_seq(&ii->ii_fid) >= FID_SEQ_NORMAL) - /* we don't support directory transfer via OBD_IDX_READ for the - * time being */ + if (!fid_is_quota(&ii->ii_fid) && !fid_is_layout_rbtree(&ii->ii_fid) && + !fid_is_norm(&ii->ii_fid)) RETURN(-EOPNOTSUPP); - if (!fid_is_quota(&ii->ii_fid) && !fid_is_layout_rbtree(&ii->ii_fid)) - /* Block access to all local files except quota files and - * layout rbtree. */ - RETURN(-EPERM); - /* lookup index object subject to the transfer */ obj = dt_locate(env, dev, &ii->ii_fid); if (IS_ERR(obj)) @@ -909,25 +919,16 @@ int dt_index_read(const struct lu_env *env, struct dt_device *dev, } /* fill ii_flags with supported index features */ - ii->ii_flags &= II_FL_NOHASH; - - ii->ii_keysize = feat->dif_keysize_max; - if ((feat->dif_flags & DT_IND_VARKEY) != 0) { - /* key size is variable */ - ii->ii_flags |= II_FL_VARKEY; - /* we don't support variable key size for the time being */ - GOTO(out, rc = -EOPNOTSUPP); - } + ii->ii_flags &= (II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY | + II_FL_VARREC); - ii->ii_recsize = feat->dif_recsize_max; - if ((feat->dif_flags & DT_IND_VARREC) != 0) { - /* record size is variable */ - ii->ii_flags |= II_FL_VARREC; - /* we don't support variable record size for the time being */ - GOTO(out, rc = -EOPNOTSUPP); - } + if (!(feat->dif_flags & DT_IND_VARKEY)) + ii->ii_keysize = feat->dif_keysize_max; + + if (!(feat->dif_flags & DT_IND_VARREC)) + ii->ii_recsize = feat->dif_recsize_max; - if ((feat->dif_flags & DT_IND_NONUNQ) != 0) + if (feat->dif_flags & DT_IND_NONUNQ) /* key isn't necessarily unique */ ii->ii_flags |= II_FL_NONUNQ; @@ -938,7 +939,7 @@
int dt_index_read(const struct lu_env *env, struct dt_device *dev, } /* walk the index and fill lu_idxpages with key/record pairs */ - rc = dt_index_walk(env, obj, rdpg, dt_index_page_build ,ii); + rc = dt_index_walk(env, obj, rdpg, dt_index_page_build, ii); if (!fid_is_layout_rbtree(&ii->ii_fid)) dt_read_unlock(env, obj); diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index f352d41..5ee7ff1 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -2786,6 +2786,8 @@ static int osd_object_ref_add(const struct lu_env *env, osd_trans_exec_op(env, th, OSD_OT_REF_ADD); + CDEBUG(D_INODE, DFID" increase nlink %d\n", + PFID(lu_object_fid(&dt->do_lu)), inode->i_nlink); /* * The DIR_NLINK feature allows directories to exceed LDISKFS_LINK_MAX * (65000) subdirectories by storing "1" in i_nlink if the link count @@ -2860,6 +2862,9 @@ static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt, return 0; } + CDEBUG(D_INODE, DFID" decrease nlink %d\n", + PFID(lu_object_fid(&dt->do_lu)), inode->i_nlink); + ldiskfs_dec_count(oh->ot_handle, inode); spin_unlock(&obj->oo_guard); @@ -4265,15 +4270,7 @@ static int osd_index_declare_ea_insert(const struct lu_env *env, osd_trans_declare_op(env, oh, OSD_OT_INSERT, osd_dto_credits_noquota[DTO_INDEX_INSERT]); - if (osd_dt_obj(dt)->oo_inode == NULL) { - const char *name = (const char *)key; - /* Object is not being created yet. Only happens when - * 1. declare directory create - * 2. declare insert . - * 3. declare insert .. 
- */ - LASSERT(strcmp(name, dotdot) == 0 || strcmp(name, dot) == 0); - } else { + if (osd_dt_obj(dt)->oo_inode != NULL) { struct inode *inode = osd_dt_obj(dt)->oo_inode; /* We ignore block quota on meta pool (MDTs), so needn't @@ -4387,6 +4384,9 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th); + CDEBUG(D_INODE, "parent %lu insert %s:%lu rc = %d\n", + obj->oo_inode->i_ino, name, child_inode->i_ino, rc); + iput(child_inode); if (child != NULL) osd_object_put(env, child); @@ -4843,6 +4843,7 @@ static int osd_ldiskfs_filldir(void *buf, const char *name, int namelen, * * \retval 0 on success * \retval -ve on error + * \retval +1 reach the end of entry */ static int osd_ldiskfs_it_fill(const struct lu_env *env, const struct dt_it *di) @@ -4889,6 +4890,8 @@ static int osd_ldiskfs_it_fill(const struct lu_env *env, /*If it does not get any dirent, it means it has been reached *to the end of the dir */ it->oie_file.f_pos = ldiskfs_get_htree_eof(&it->oie_file); + if (rc == 0) + rc = 1; } else { it->oie_dirent = it->oie_buf; it->oie_it_dirent = 1; @@ -5402,6 +5405,27 @@ static inline int osd_it_ea_rec(const struct lu_env *env, } /** + * Returns the record size at current position. + * + * This function will return record(lu_dirent) size in bytes. + * + * \param[in] env execution environment + * \param[in] di iterator's in memory structure + * \param[in] attr attribute of the entry, only requires LUDA_TYPE to + * calculate the lu_dirent size. + * + * \retval record size(in bytes & in memory) of the current lu_dirent + * entry.
+ */ +static int osd_it_ea_rec_size(const struct lu_env *env, const struct dt_it *di, + __u32 attr) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + + return lu_dirent_calc_size(it->oie_dirent->oied_namelen, attr); +} + +/** * Returns a cookie for current position of the iterator head, so that * user can use this cookie to load/start the iterator next time. * @@ -5436,6 +5460,9 @@ static int osd_it_ea_load(const struct lu_env *env, it->oie_file.f_pos = hash; rc = osd_ldiskfs_it_fill(env, di); + if (rc > 0) + rc = -ENODATA; + if (rc == 0) rc = +1; @@ -5476,23 +5503,24 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826) */ static const struct dt_index_operations osd_index_ea_ops = { - .dio_lookup = osd_index_ea_lookup, - .dio_declare_insert = osd_index_declare_ea_insert, - .dio_insert = osd_index_ea_insert, - .dio_declare_delete = osd_index_declare_ea_delete, - .dio_delete = osd_index_ea_delete, - .dio_it = { - .init = osd_it_ea_init, - .fini = osd_it_ea_fini, - .get = osd_it_ea_get, - .put = osd_it_ea_put, - .next = osd_it_ea_next, - .key = osd_it_ea_key, - .key_size = osd_it_ea_key_size, - .rec = osd_it_ea_rec, - .store = osd_it_ea_store, - .load = osd_it_ea_load - } + .dio_lookup = osd_index_ea_lookup, + .dio_declare_insert = osd_index_declare_ea_insert, + .dio_insert = osd_index_ea_insert, + .dio_declare_delete = osd_index_declare_ea_delete, + .dio_delete = osd_index_ea_delete, + .dio_it = { + .init = osd_it_ea_init, + .fini = osd_it_ea_fini, + .get = osd_it_ea_get, + .put = osd_it_ea_put, + .next = osd_it_ea_next, + .key = osd_it_ea_key, + .key_size = osd_it_ea_key_size, + .rec = osd_it_ea_rec, + .rec_size = osd_it_ea_rec_size, + .store = osd_it_ea_store, + .load = osd_it_ea_load + } }; static void *osd_key_init(const struct lu_context *ctx, @@ -5568,6 +5596,43 @@ static int osd_device_init(const struct lu_env *env, struct lu_device *d, return osd_procfs_init(osd, name); } 
+static int osd_fid_init(const struct lu_env *env, struct osd_device *osd) +{ + struct seq_server_site *ss = osd_seq_site(osd); + int rc; + ENTRY; + + if (osd->od_is_ost || osd->od_cl_seq != NULL) + RETURN(0); + + if (unlikely(ss == NULL)) + RETURN(-ENODEV); + + OBD_ALLOC_PTR(osd->od_cl_seq); + if (osd->od_cl_seq == NULL) + RETURN(-ENOMEM); + + rc = seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA, + osd->od_svname, ss->ss_server_seq); + + if (rc != 0) { + OBD_FREE_PTR(osd->od_cl_seq); + osd->od_cl_seq = NULL; + } + + RETURN(rc); +} + +static void osd_fid_fini(const struct lu_env *env, struct osd_device *osd) +{ + if (osd->od_cl_seq == NULL) + return; + + seq_client_fini(osd->od_cl_seq); + OBD_FREE_PTR(osd->od_cl_seq); + osd->od_cl_seq = NULL; +} + static int osd_shutdown(const struct lu_env *env, struct osd_device *o) { ENTRY; @@ -5578,6 +5643,8 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o) o->od_quota_slave = NULL; } + osd_fid_fini(env, o); + RETURN(0); } @@ -5973,13 +6040,26 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev, int result = 0; ENTRY; - if (osd->od_quota_slave != NULL) + if (osd->od_quota_slave != NULL) { /* set up quota slave objects */ result = qsd_prepare(env, osd->od_quota_slave); + if (result != 0) + RETURN(result); + } + + result = osd_fid_init(env, osd); RETURN(result); } +int osd_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) +{ + struct osd_device *osd = osd_dev(exp->exp_obd->obd_lu_dev); + + return seq_client_alloc_fid(env, osd->od_cl_seq, fid); +} + static const struct lu_object_operations osd_lu_obj_ops = { .loo_object_init = osd_object_init, .loo_object_delete = osd_object_delete, @@ -6023,7 +6103,8 @@ struct lu_device_type osd_device_type = { static struct obd_ops osd_obd_device_ops = { .o_owner = THIS_MODULE, .o_connect = osd_obd_connect, - .o_disconnect = osd_obd_disconnect + .o_disconnect = 
osd_obd_disconnect, + .o_fid_alloc = osd_fid_alloc, }; static int __init osd_mod_init(void) diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index e1bde19..a5f092e 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -275,6 +275,9 @@ struct osd_device { /* quota slave instance */ struct qsd_instance *od_quota_slave; + + /* osd seq instance */ + struct lu_client_seq *od_cl_seq; }; /* There are at most 10 uid/gids are affected in a transaction, and diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c index 0dc8c2e..df174d1 100644 --- a/lustre/osd-zfs/osd_handler.c +++ b/lustre/osd-zfs/osd_handler.c @@ -878,6 +878,14 @@ static void osd_type_stop(struct lu_device_type *t) { } +int osd_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) +{ + struct osd_device *osd = osd_dev(exp->exp_obd->obd_lu_dev); + + return seq_client_alloc_fid(env, osd->od_cl_seq, fid); +} + static struct lu_device_type_operations osd_device_type_ops = { .ldto_init = osd_type_init, .ldto_fini = osd_type_fini, @@ -903,7 +911,8 @@ static struct lu_device_type osd_device_type = { static struct obd_ops osd_obd_device_ops = { .o_owner = THIS_MODULE, .o_connect = osd_obd_connect, - .o_disconnect = osd_obd_disconnect + .o_disconnect = osd_obd_disconnect, + .o_fid_alloc = osd_fid_alloc }; int __init osd_init(void) diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c index ee95d01..dd1acf3 100644 --- a/lustre/osd-zfs/osd_index.c +++ b/lustre/osd-zfs/osd_index.c @@ -969,6 +969,44 @@ out: RETURN(rc); } +static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di, + __u32 attr) +{ + struct osd_zap_it *it = (struct osd_zap_it *)di; + zap_attribute_t *za = &osd_oti_get(env)->oti_za; + int rc, namelen = 0; + ENTRY; + + if (it->ozi_pos <= 1) + namelen = 1; + else if (it->ozi_pos == 2) + namelen = 2; + + if
(namelen > 0) { + rc = lu_dirent_calc_size(namelen, attr); + RETURN(rc); + } + + rc = -zap_cursor_retrieve(it->ozi_zc, za); + if (unlikely(rc != 0)) + RETURN(rc); + + if (za->za_integer_length != 8 || za->za_num_integers < 3) { + CERROR("%s: unsupported direntry format: %d %d\n", + osd_obj2dev(it->ozi_obj)->od_svname, + za->za_integer_length, (int)za->za_num_integers); + RETURN(-EIO); + } + + namelen = strlen(za->za_name); + if (namelen > NAME_MAX) + RETURN(-EOVERFLOW); + + rc = lu_dirent_calc_size(namelen, attr); + + RETURN(rc); +} + static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di) { struct osd_zap_it *it = (struct osd_zap_it *)di; @@ -1035,6 +1073,7 @@ static struct dt_index_operations osd_dir_ops = { .key = osd_dir_it_key, .key_size = osd_dir_it_key_size, .rec = osd_dir_it_rec, + .rec_size = osd_dir_it_rec_size, .store = osd_dir_it_store, .load = osd_dir_it_load } diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h index 63f680c..a621c32 100644 --- a/lustre/osd-zfs/osd_internal.h +++ b/lustre/osd-zfs/osd_internal.h @@ -298,6 +298,9 @@ struct osd_device { struct osd_zfs_acct_txg *od_acct_delta; cfs_hash_t *od_acct_usr; cfs_hash_t *od_acct_grp; + + /* osd seq instance */ + struct lu_client_seq *od_cl_seq; }; struct osd_object { diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 38d2de1..0a1cafa 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -1158,14 +1158,20 @@ static int osp_obd_get_info(const struct lu_env *env, struct obd_export *exp, RETURN(rc); } -int osp_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data) +int osp_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) { - struct client_obd *cli = &exp->exp_obd->u.cli; - struct lu_client_seq *seq = cli->cl_seq; - + struct client_obd *cli = &exp->exp_obd->u.cli; + struct osp_device *osp = lu2osp_dev(exp->exp_obd->obd_lu_dev); + struct 
lu_client_seq *seq = cli->cl_seq; ENTRY; - RETURN(seq_client_alloc_fid(NULL, seq, fid)); + + LASSERT(osp->opd_obd->u.cli.cl_seq != NULL); + /* Sigh, fid client is not ready yet */ + if (osp->opd_obd->u.cli.cl_seq->lcs_exp == NULL) + RETURN(-ENOTCONN); + + RETURN(seq_client_alloc_fid(env, seq, fid)); } /* context key constructor/destructor: mdt_key_init, mdt_key_fini */ diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index c90e029..e220914 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -236,7 +236,6 @@ struct osp_object { struct dt_object opo_obj; unsigned int opo_reserved:1, opo_new:1, - opo_empty:1, opo_non_exist:1; /* read/write lock for md osp object */ @@ -274,6 +273,22 @@ struct osp_thread_info { struct obdo osi_obdo; }; +/* Iterator for OSP */ +struct osp_it { + __u32 ooi_pos_page; + __u32 ooi_pos_lu_page; + int ooi_pos_ent; + int ooi_total_npages; + int ooi_valid_npages; + unsigned int ooi_swab:1; + __u64 ooi_next; + struct dt_object *ooi_obj; + void *ooi_ent; + struct page *ooi_cur_page; + struct lu_idxpage *ooi_cur_idxpage; + struct page **ooi_pages; +}; + /* The transaction only include the updates on the remote node, and * no local updates at all */ static inline bool is_only_remote_trans(struct thandle *th) @@ -514,6 +529,12 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th, struct lustre_capa *capa); +int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th); +int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th, + struct lustre_capa *capa); + int osp_declare_object_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th); int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, @@ -522,6 +543,16 @@ int 
osp_object_destroy(const struct lu_env *env, struct dt_object *dt, int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th); +struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt, + __u32 attr, struct lustre_capa *capa); +void osp_it_fini(const struct lu_env *env, struct dt_it *di); +int osp_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key); +void osp_it_put(const struct lu_env *env, struct dt_it *di); +__u64 osp_it_store(const struct lu_env *env, const struct dt_it *di); +int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, + void *key_rec); +int osp_it_next_page(const struct lu_env *env, struct dt_it *di); /* osp_md_object.c */ int osp_md_declare_object_create(const struct lu_env *env, struct dt_object *dt, @@ -537,6 +568,7 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th, struct lustre_capa *capa); +extern const struct dt_index_operations osp_md_index_ops; /* osp_precreate.c */ int osp_init_precreate(struct osp_device *d); diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index e331bfa..26f8e32 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -132,8 +132,6 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - struct osp_object *obj = dt2osp_obj(dt); - CDEBUG(D_INFO, "create object "DFID"\n", PFID(&dt->do_lu.lo_header->loh_fid)); @@ -141,7 +139,6 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, * if creation reaches here, it means the object has been created * successfully */ dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); - obj->opo_empty = 1; return 0; } @@ -478,51 +475,41 @@ static int 
osp_md_index_delete(const struct lu_env *env, return 0; } -/** - * Creates or initializes iterator context. - * - * Note: for OSP, these index iterate api is only used to check - * whether the directory is empty now (see mdd_dir_is_empty). - * Since dir_empty will be return by OUT_ATTR_GET(see osp_attr_get/ - * out_attr_get). So the implementation of these iterator is simplied - * to make mdd_dir_is_empty happy. The real iterator should be - * implemented, if we need it one day. - */ -static struct dt_it *osp_it_init(const struct lu_env *env, - struct dt_object *dt, - __u32 attr, - struct lustre_capa *capa) -{ - lu_object_get(&dt->do_lu); - return (struct dt_it *)dt; -} - -static void osp_it_fini(const struct lu_env *env, struct dt_it *di) +int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di) { - struct dt_object *dt = (struct dt_object *)di; - lu_object_put(env, &dt->do_lu); -} - -static int osp_it_get(const struct lu_env *env, - struct dt_it *di, const struct dt_key *key) -{ - return 1; -} - -static void osp_it_put(const struct lu_env *env, struct dt_it *di) -{ - return; -} + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + int rc; + ENTRY; -static int osp_it_next(const struct lu_env *env, struct dt_it *di) -{ - struct dt_object *dt = (struct dt_object *)di; - struct osp_object *o = dt2osp_obj(dt); +again: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos_ent++; + if (ent == NULL) { + it->ooi_ent = + (struct lu_dirent *)idxpage->lip_entries; + RETURN(0); + } else if (le16_to_cpu(ent->lde_reclen) != 0 && + it->ooi_pos_ent < idxpage->lip_nr) { + ent = (struct lu_dirent *)(((char *)ent) + + le16_to_cpu(ent->lde_reclen)); + it->ooi_ent = ent; + RETURN(0); + } else { + it->ooi_ent = NULL; + } + } - if (o->opo_empty) - return 1; + rc = osp_it_next_page(env, di); + if (rc == 0) + goto again; - return 0; + 
RETURN(rc); } static struct dt_key *osp_it_key(const struct lu_env *env, @@ -538,16 +525,15 @@ static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di) return 0; } -static int osp_it_rec(const struct lu_env *env, const struct dt_it *di, - struct dt_rec *lde, __u32 attr) +static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) { - LBUG(); - return 0; -} + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + int reclen; -static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di) -{ - LBUG(); + reclen = lu_dirent_calc_size(ent->lde_namelen, attr); + memcpy(rec, ent, reclen); return 0; } @@ -558,14 +544,7 @@ static int osp_it_load(const struct lu_env *env, const struct dt_it *di, return 0; } -static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, - void *key_rec) -{ - LBUG(); - return 0; -} - -static const struct dt_index_operations osp_md_index_ops = { +const struct dt_index_operations osp_md_index_ops = { .dio_lookup = osp_md_index_lookup, .dio_declare_insert = osp_md_declare_insert, .dio_insert = osp_md_index_insert, @@ -576,10 +555,10 @@ static const struct dt_index_operations osp_md_index_ops = { .fini = osp_it_fini, .get = osp_it_get, .put = osp_it_put, - .next = osp_it_next, + .next = osp_md_index_it_next, .key = osp_it_key, .key_size = osp_it_key_size, - .rec = osp_it_rec, + .rec = osp_md_index_it_rec, .store = osp_it_store, .load = osp_it_load, .key_rec = osp_it_key_rec, @@ -665,6 +644,8 @@ struct dt_object_operations osp_md_obj_ops = { .do_xattr_get = osp_xattr_get, .do_declare_xattr_set = osp_declare_xattr_set, .do_xattr_set = osp_xattr_set, + .do_declare_xattr_del = osp_declare_xattr_del, + .do_xattr_del = osp_xattr_del, .do_index_try = osp_md_index_try, .do_object_lock = osp_md_object_lock, .do_object_unlock = osp_md_object_unlock, diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c 
index f30c9d3..efd2a3d 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -370,13 +370,6 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, if (rc != 0) GOTO(out, rc); - if (!is_ost_obj(&dt->do_lu)) { - if (attr->la_flags == 1) - obj->opo_empty = 0; - else - obj->opo_empty = 1; - } - GOTO(out, rc = 0); out: @@ -871,6 +864,36 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, return 0; } +int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th) +{ + struct dt_update_request *update; + const struct lu_fid *fid; + int size = strlen(name); + int rc; + + update = out_find_create_update_loc(th, dt); + if (IS_ERR(update)) + return PTR_ERR(update); + + fid = lu_object_fid(&dt->do_lu); + + rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size, + (const char **)&name); + + return rc; +} + +int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th, + struct lustre_capa *capa) +{ + CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name, + PFID(&dt->do_lu.lo_header->loh_fid)); + + return 0; +} + static int osp_declare_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -1084,21 +1107,6 @@ int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -struct osp_orphan_it { - int ooi_pos0; - int ooi_pos1; - int ooi_pos2; - int ooi_total_npages; - int ooi_valid_npages; - unsigned int ooi_swab:1; - __u64 ooi_next; - struct dt_object *ooi_obj; - struct lu_orphan_ent *ooi_ent; - struct page *ooi_cur_page; - struct lu_idxpage *ooi_cur_idxpage; - struct page **ooi_pages; -}; - static int osp_orphan_index_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_rec *rec, @@ -1145,30 +1153,27 @@ static int osp_orphan_index_delete(const struct lu_env *env, return -EOPNOTSUPP; } -static struct dt_it *osp_orphan_it_init(const struct lu_env *env, - struct 
dt_object *dt, - __u32 attr, - struct lustre_capa *capa) +struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt, + __u32 attr, struct lustre_capa *capa) { - struct osp_orphan_it *it; + struct osp_it *it; OBD_ALLOC_PTR(it); if (it == NULL) return ERR_PTR(-ENOMEM); - it->ooi_pos2 = -1; + it->ooi_pos_ent = -1; it->ooi_obj = dt; return (struct dt_it *)it; } -static void osp_orphan_it_fini(const struct lu_env *env, - struct dt_it *di) +void osp_it_fini(const struct lu_env *env, struct dt_it *di) { - struct osp_orphan_it *it = (struct osp_orphan_it *)di; - struct page **pages = it->ooi_pages; - int npages = it->ooi_total_npages; - int i; + struct osp_it *it = (struct osp_it *)di; + struct page **pages = it->ooi_pages; + int npages = it->ooi_total_npages; + int i; if (pages != NULL) { for (i = 0; i < npages; i++) { @@ -1185,8 +1190,7 @@ static void osp_orphan_it_fini(const struct lu_env *env, OBD_FREE_PTR(it); } -static int osp_orphan_it_fetch(const struct lu_env *env, - struct osp_orphan_it *it) +static int osp_it_fetch(const struct lu_env *env, struct osp_it *it) { struct lu_device *dev = it->ooi_obj->do_lu.lo_dev; struct osp_device *osp = lu2osp_dev(dev); @@ -1227,6 +1231,27 @@ static int osp_orphan_it_fetch(const struct lu_env *env, } req->rq_request_portal = OUT_PORTAL; + ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO); + memset(ii, 0, sizeof(*ii)); + if (fid_is_last_id(lu_object_fid(&it->ooi_obj->do_lu))) { + /* LFSCK will iterate orphan object[FID_SEQ_LAYOUT_BTREE, + * ost_index, 0] with LAST_ID FID, so it needs to replace + * the FID with orphan FID here */ + ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE; + ii->ii_fid.f_oid = osp->opd_index; + ii->ii_fid.f_ver = 0; + ii->ii_flags = II_FL_NOHASH; + } else { + ii->ii_fid = *lu_object_fid(&it->ooi_obj->do_lu); + ii->ii_flags = II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY | + II_FL_VARREC; + } + ii->ii_magic = IDX_INFO_MAGIC; + ii->ii_count = npages * LU_PAGE_COUNT; + ii->ii_hash_start = 
it->ooi_next; + ii->ii_attrs = + osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id; + ptlrpc_at_set_req_timeout(req); desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, @@ -1239,18 +1264,6 @@ static int osp_orphan_it_fetch(const struct lu_env *env, for (i = 0; i < npages; i++) ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); - ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO); - memset(ii, 0, sizeof(*ii)); - ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE; - ii->ii_fid.f_oid = osp->opd_index; - ii->ii_fid.f_ver = 0; - ii->ii_magic = IDX_INFO_MAGIC; - ii->ii_flags = II_FL_NOHASH; - ii->ii_count = npages * LU_PAGE_COUNT; - ii->ii_hash_start = it->ooi_next; - ii->ii_attrs = - osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id; - ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); if (rc != 0) @@ -1260,6 +1273,7 @@ static int osp_orphan_it_fetch(const struct lu_env *env, req->rq_bulk->bd_nob_transferred); if (rc < 0) GOTO(out, rc); + rc = 0; ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO); if (ii->ii_magic != IDX_INFO_MAGIC) @@ -1279,22 +1293,19 @@ static int osp_orphan_it_fetch(const struct lu_env *env, it->ooi_next = ii->ii_hash_end; - GOTO(out, rc = 0); - out: ptlrpc_req_finished(req); return rc; } -static int osp_orphan_it_next(const struct lu_env *env, - struct dt_it *di) +int osp_it_next_page(const struct lu_env *env, struct dt_it *di) { - struct osp_orphan_it *it = (struct osp_orphan_it *)di; - struct lu_idxpage *idxpage; + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; struct page **pages; - int rc; - int i; + int rc; + int i; ENTRY; again2: @@ -1303,23 +1314,17 @@ again2: if (idxpage->lip_nr == 0) RETURN(1); - it->ooi_pos2++; - if (it->ooi_pos2 < idxpage->lip_nr) { - it->ooi_ent = - (struct lu_orphan_ent *)idxpage->lip_entries + - it->ooi_pos2; - if (it->ooi_swab) - lustre_swab_orphan_ent(it->ooi_ent); + if (it->ooi_pos_ent < idxpage->lip_nr) { + CDEBUG(D_INFO, "ooi_pos %d 
nr %d\n", + (int)it->ooi_pos_ent, (int)idxpage->lip_nr); RETURN(0); } - it->ooi_cur_idxpage = NULL; - it->ooi_pos1++; - + it->ooi_pos_lu_page++; again1: - if (it->ooi_pos1 < LU_PAGE_COUNT) { + if (it->ooi_pos_lu_page < LU_PAGE_COUNT) { it->ooi_cur_idxpage = (void *)it->ooi_cur_page + - LU_PAGE_SIZE * it->ooi_pos1; + LU_PAGE_SIZE * it->ooi_pos_lu_page; if (it->ooi_swab) lustre_swab_lip_header(it->ooi_cur_idxpage); if (it->ooi_cur_idxpage->lip_magic != LIP_MAGIC) { @@ -1330,24 +1335,25 @@ again1: "%d/%d while read layout orphan index\n", osp->opd_obd->obd_name, it->ooi_cur_idxpage->lip_magic, - LIP_MAGIC, it->ooi_pos0, it->ooi_pos1); + LIP_MAGIC, it->ooi_pos_page, + it->ooi_pos_lu_page); /* Skip this lu_page next time. */ - it->ooi_pos2 = idxpage->lip_nr - 1; + it->ooi_pos_ent = idxpage->lip_nr - 1; RETURN(-EINVAL); } - it->ooi_pos2 = -1; + it->ooi_pos_ent = -1; goto again2; } kunmap(it->ooi_cur_page); it->ooi_cur_page = NULL; - it->ooi_pos0++; + it->ooi_pos_page++; again0: pages = it->ooi_pages; - if (it->ooi_pos0 < it->ooi_valid_npages) { - it->ooi_cur_page = kmap(pages[it->ooi_pos0]); - it->ooi_pos1 = 0; + if (it->ooi_pos_page < it->ooi_valid_npages) { + it->ooi_cur_page = kmap(pages[it->ooi_pos_page]); + it->ooi_pos_lu_page = 0; goto again1; } @@ -1357,7 +1363,7 @@ again0: } OBD_FREE(pages, it->ooi_total_npages * sizeof(*pages)); - it->ooi_pos0 = 0; + it->ooi_pos_page = 0; it->ooi_total_npages = 0; it->ooi_valid_npages = 0; it->ooi_swab = 0; @@ -1370,30 +1376,59 @@ again0: if (it->ooi_next == II_END_OFF) RETURN(1); - rc = osp_orphan_it_fetch(env, it); + rc = osp_it_fetch(env, it); if (rc == 0) goto again0; RETURN(rc); } -static int osp_orphan_it_get(const struct lu_env *env, - struct dt_it *di, - const struct dt_key *key) +int osp_orphan_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + int rc; + ENTRY; + +again: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if 
(idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos_ent++; + if (it->ooi_pos_ent < idxpage->lip_nr) { + it->ooi_ent = + (struct lu_orphan_ent *)idxpage->lip_entries + + it->ooi_pos_ent; + if (it->ooi_swab) + lustre_swab_orphan_ent(it->ooi_ent); + RETURN(0); + } + } + + rc = osp_it_next_page(env, di); + if (rc == 0) + goto again; + + RETURN(rc); +} + +int osp_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) { - return -ENOSYS; + return 1; } -static void osp_orphan_it_put(const struct lu_env *env, - struct dt_it *di) +void osp_it_put(const struct lu_env *env, struct dt_it *di) { } -static struct dt_key *osp_orphan_it_key(const struct lu_env *env, - const struct dt_it *di) +struct dt_key *osp_orphan_it_key(const struct lu_env *env, + const struct dt_it *di) { - struct osp_orphan_it *it = (struct osp_orphan_it *)di; - struct lu_orphan_ent *ent = it->ooi_ent; + struct osp_it *it = (struct osp_it *)di; + struct lu_orphan_ent *ent = (struct lu_orphan_ent *)it->ooi_ent; if (likely(ent != NULL)) return (struct dt_key *)(&ent->loe_key); @@ -1401,19 +1436,16 @@ static struct dt_key *osp_orphan_it_key(const struct lu_env *env, return NULL; } -static int osp_orphan_it_key_size(const struct lu_env *env, - const struct dt_it *di) +int osp_orphan_it_key_size(const struct lu_env *env, const struct dt_it *di) { return sizeof(struct lu_fid); } -static int osp_orphan_it_rec(const struct lu_env *env, - const struct dt_it *di, - struct dt_rec *rec, - __u32 attr) +int osp_orphan_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) { - struct osp_orphan_it *it = (struct osp_orphan_it *)di; - struct lu_orphan_ent *ent = it->ooi_ent; + struct osp_it *it = (struct osp_it *)di; + struct lu_orphan_ent *ent = (struct lu_orphan_ent *)it->ooi_ent; if (likely(ent != NULL)) { *(struct lu_orphan_rec *)rec = ent->loe_rec; @@ -1423,10 +1455,9 @@ static int osp_orphan_it_rec(const struct lu_env *env, return -EINVAL; } -static __u64 
osp_orphan_it_store(const struct lu_env *env, - const struct dt_it *di) +__u64 osp_it_store(const struct lu_env *env, const struct dt_it *di) { - struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct osp_it *it = (struct osp_it *)di; return it->ooi_next; } @@ -1437,11 +1468,10 @@ static __u64 osp_orphan_it_store(const struct lu_env *env, * call next() to move to a valid position. * \retval -ve: on error */ -static int osp_orphan_it_load(const struct lu_env *env, - const struct dt_it *di, - __u64 hash) +int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di, + __u64 hash) { - struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct osp_it *it = (struct osp_it *)di; int rc; it->ooi_next = hash; @@ -1455,9 +1485,8 @@ static int osp_orphan_it_load(const struct lu_env *env, return rc; } -static int osp_orphan_it_key_rec(const struct lu_env *env, - const struct dt_it *di, - void *key_rec) +int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, + void *key_rec) { return 0; } @@ -1469,17 +1498,17 @@ static const struct dt_index_operations osp_orphan_index_ops = { .dio_declare_delete = osp_orphan_index_declare_delete, .dio_delete = osp_orphan_index_delete, .dio_it = { - .init = osp_orphan_it_init, - .fini = osp_orphan_it_fini, + .init = osp_it_init, + .fini = osp_it_fini, .next = osp_orphan_it_next, - .get = osp_orphan_it_get, - .put = osp_orphan_it_put, + .get = osp_it_get, + .put = osp_it_put, .key = osp_orphan_it_key, .key_size = osp_orphan_it_key_size, .rec = osp_orphan_it_rec, - .store = osp_orphan_it_store, + .store = osp_it_store, .load = osp_orphan_it_load, - .key_rec = osp_orphan_it_key_rec, + .key_rec = osp_it_key_rec, } }; @@ -1489,13 +1518,11 @@ static int osp_index_try(const struct lu_env *env, { const struct lu_fid *fid = lu_object_fid(&dt->do_lu); - if (fid_is_last_id(fid) && fid_is_idif(fid)) { + if (fid_is_last_id(fid) && fid_is_idif(fid)) dt->do_index_ops = &osp_orphan_index_ops; - - return 0; - } - - return 
-EINVAL; + else + dt->do_index_ops = &osp_md_index_ops; + return 0; } struct dt_object_operations osp_obj_ops = { diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c index 40e5ce3..b9c9322 100644 --- a/lustre/target/out_handler.c +++ b/lustre/target/out_handler.c @@ -81,7 +81,7 @@ out: #define TX_ALLOC_STEP 8 static struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func, tx_exec_func_t undo, - char *file, int line) + const char *file, int line) { int rc; int i; @@ -210,7 +210,7 @@ static int __out_tx_create(const struct lu_env *env, struct dt_object *obj, struct dt_object_format *dof, struct thandle_exec_args *ta, struct object_update_reply *reply, - int index, char *file, int line) + int index, const char *file, int line) { struct tx_arg *arg; int rc; @@ -333,7 +333,7 @@ static int __out_tx_attr_set(const struct lu_env *env, const struct lu_attr *attr, struct thandle_exec_args *th, struct object_update_reply *reply, - int index, char *file, int line) + int index, const char *file, int line) { struct tx_arg *arg; int rc; @@ -417,42 +417,6 @@ static int out_attr_get(struct tgt_session_info *tsi) rc = dt_attr_get(env, obj, la, NULL); if (rc) GOTO(out_unlock, rc); - /* - * If it is a directory, we will also check whether the - * directory is empty. - * la_flags = 0 : Empty. - * = 1 : Not empty. - */ - la->la_flags = 0; - if (S_ISDIR(la->la_mode)) { - struct dt_it *it; - const struct dt_it_ops *iops; - - if (!dt_try_as_dir(env, obj)) - GOTO(out_unlock, rc = -ENOTDIR); - - iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); - if (!IS_ERR(it)) { - int result; - result = iops->get(env, it, (const void *)""); - if (result > 0) { - int i; - for (result = 0, i = 0; result == 0 && i < 3; - ++i) - result = iops->next(env, it); - if (result == 0) - la->la_flags = 1; - } else if (result == 0) - /* - * Huh? Index contains no zero key? 
- */ - rc = -EIO; - - iops->put(env, it); - iops->fini(env, it); - } - } obdo->o_valid = 0; obdo_from_la(obdo, la, la->la_valid); @@ -624,7 +588,7 @@ static int __out_tx_xattr_set(const struct lu_env *env, const char *name, int flags, struct thandle_exec_args *ta, struct object_update_reply *reply, - int index, char *file, int line) + int index, const char *file, int line) { struct tx_arg *arg; int rc; @@ -699,6 +663,79 @@ static int out_xattr_set(struct tgt_session_info *tsi) RETURN(rc); } +static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n", + dt_obd_name(th->th_dev), arg->u.xattr_set.name, + PFID(lu_object_fid(&dt_obj->do_lu))); + + if (!lu_object_exists(&dt_obj->do_lu)) + GOTO(out, rc = -ENOENT); + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name, + th, NULL); + dt_write_unlock(env, dt_obj); +out: + CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); + + return rc; +} + +static int __out_tx_xattr_del(const struct lu_env *env, + struct dt_object *dt_obj, const char *name, + struct thandle_exec_args *ta, + struct object_update_reply *reply, + int index, const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_xattr_del(env, dt_obj, name, ta->ta_handle); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->u.xattr_set.name = name; + arg->reply = reply; + arg->index = index; + return 0; +} + +static int out_xattr_del(struct tgt_session_info *tsi) +{ + struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); + struct object_update *update = 
tti->tti_u.update.tti_update; + struct dt_object *obj = tti->tti_u.update.tti_dt_object; + char *name; + int rc; + ENTRY; + + name = object_update_param_get(update, 0, NULL); + if (name == NULL) { + CERROR("%s: empty name for xattr set: rc = %d\n", + tgt_name(tsi->tsi_tgt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + rc = out_tx_xattr_del(tsi->tsi_env, obj, name, &tti->tti_tea, + tti->tti_u.update.tti_update_reply, + tti->tti_u.update.tti_update_reply_index); + RETURN(rc); +} + static int out_obj_ref_add(const struct lu_env *env, struct dt_object *dt_obj, struct thandle *th) @@ -750,7 +787,7 @@ static int __out_tx_ref_add(const struct lu_env *env, struct dt_object *dt_obj, struct thandle_exec_args *ta, struct object_update_reply *reply, - int index, char *file, int line) + int index, const char *file, int line) { struct tx_arg *arg; int rc; @@ -815,7 +852,7 @@ static int __out_tx_ref_del(const struct lu_env *env, struct dt_object *dt_obj, struct thandle_exec_args *ta, struct object_update_reply *reply, - int index, char *file, int line) + int index, const char *file, int line) { struct tx_arg *arg; int rc; @@ -925,7 +962,7 @@ static int __out_tx_index_insert(const struct lu_env *env, char *name, struct lu_fid *fid, struct thandle_exec_args *ta, struct object_update_reply *reply, - int index, char *file, int line) + int index, const char *file, int line) { struct tx_arg *arg; int rc; @@ -1027,7 +1064,7 @@ static int __out_tx_index_delete(const struct lu_env *env, struct dt_object *dt_obj, char *name, struct thandle_exec_args *ta, struct object_update_reply *reply, - int index, char *file, int line) + int index, const char *file, int line) { struct tx_arg *arg; int rc; @@ -1107,7 +1144,7 @@ static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th, static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj, struct thandle_exec_args *ta, struct object_update_reply *reply, - int index, char *file, int line) + int index, const 
char *file, int line) { struct tx_arg *arg; int rc; @@ -1179,7 +1216,7 @@ static int __out_tx_write(const struct lu_env *env, const struct lu_buf *buf, loff_t pos, struct thandle_exec_args *ta, struct object_update_reply *reply, - int index, char *file, int line) + int index, const char *file, int line) { struct tx_arg *arg; int rc; @@ -1269,6 +1306,8 @@ static struct tgt_handler out_update_ops[] = { out_attr_get), DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO, out_xattr_set), + DEF_OUT_HNDL(OUT_XATTR_DEL, "out_xattr_del", MUTABOR | HABEO_REFERO, + out_xattr_del), DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO, out_xattr_get), DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO, diff --git a/lustre/target/tgt_internal.h b/lustre/target/tgt_internal.h index f0ed071..86436c2 100644 --- a/lustre/target/tgt_internal.h +++ b/lustre/target/tgt_internal.h @@ -55,7 +55,7 @@ struct tx_arg { tx_exec_func_t exec_fn; tx_exec_func_t undo_fn; struct dt_object *object; - char *file; + const char *file; struct object_update_reply *reply; int line; int index; @@ -183,6 +183,10 @@ int out_handle(struct tgt_session_info *tsi); __out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx, \ __FILE__, __LINE__) +#define out_tx_xattr_del(info, obj, name, th, reply, idx) \ + __out_tx_xattr_del(info, obj, name, th, reply, idx, \ + __FILE__, __LINE__) + #define out_tx_ref_add(info, obj, th, reply, idx) \ __out_tx_ref_add(info, obj, th, reply, idx, __FILE__, __LINE__) diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 0229fb0..95f5fce 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -214,7 +214,6 @@ test_4() { local MDTIDX=1 local remote_dir=remote_dir - [ $MDSCOUNT -ge 2 ] && skip "skip now for LU-4690" && return #LU-4690 test_mkdir $DIR/$remote_dir || error "Create remote directory failed" @@ -225,6 +224,8 @@ test_4() { error "Expect error removing in-use dir $DIR/$remote_dir" test -d $DIR/$remote_dir || error 
"Remote directory disappeared" + + rm -rf $DIR/$remote_dir || error "remove remote dir error" } run_test 4 "mkdir; touch dir/file; rmdir; checkdir (expect error)" @@ -515,7 +516,7 @@ run_test 17h "create objects: lov_free_memmd() doesn't lbug" test_17i() { #bug 20018 remote_mds_nodsh && skip "remote MDS with nodsh" && return [ $PARALLEL == "yes" ] && skip "skip parallel run" && return - test_mkdir -p $DIR/$tdir + test_mkdir -c1 $DIR/$tdir local foo=$DIR/$tdir/$tfile local mdt_idx if [[ $MDSCOUNT -gt 1 ]]; then @@ -664,9 +665,9 @@ test_17n() { [ $PARALLEL == "yes" ] && skip "skip parallel run" && return - mkdir -p $DIR/$tdir + mkdir $DIR/$tdir for ((i=0; i<10; i++)); do - $LFS mkdir -i 1 $DIR/$tdir/remote_dir_${i} || + $LFS mkdir -i1 -c2 $DIR/$tdir/remote_dir_${i} || error "create remote dir error $i" createmany -o $DIR/$tdir/remote_dir_${i}/f 10 || error "create files under remote dir failed $i" @@ -2225,6 +2226,25 @@ test_31o() { # LU-2901 } run_test 31o "duplicate hard links with same filename" +test_31p() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + + mkdir $DIR/$tdir + $LFS setdirstripe -i0 -c2 $DIR/$tdir/striped_dir + $LFS setdirstripe -D -c2 -t all_char $DIR/$tdir/striped_dir + + opendirunlink $DIR/$tdir/striped_dir/test1 || + error "open unlink test1 failed" + opendirunlink $DIR/$tdir/striped_dir/test2 || + error "open unlink test2 failed" + + $CHECKSTAT -a $DIR/$tdir/striped_dir/test1 || + error "test1 still exists" + $CHECKSTAT -a $DIR/$tdir/striped_dir/test2 || + error "test2 still exists" +} +run_test 31p "remove of open striped directory" + cleanup_test32_mount() { trap 0 $UMOUNT -d $DIR/$tdir/ext2-mountpoint @@ -2804,6 +2824,25 @@ test_36h() { } run_test 36h "utime on file racing with OST BRW write ==========" +test_36i() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + + mkdir $DIR/$tdir + $LFS setdirstripe -i0 -c$MDSCOUNT $DIR/$tdir/striped_dir + + local mtime=$(stat -c%Y $DIR/$tdir/striped_dir) + local 
new_mtime=$((mtime + 200)) + + #change Modify time of striped dir + touch -m -d @$new_mtime $DIR/$tdir/striped_dir || + error "change mtime failed" + + local got=$(stat -c%Y $DIR/$tdir/striped_dir) + + [ "$new_mtime" = "$got" ] || error "expect $new_mtime got $got" +} +run_test 36i "change mtime on striped directory" + # test_37 - duplicate with tests 32q 32r test_38() { @@ -3803,7 +3842,7 @@ run_test 50 "special situations: /proc symlinks ===============" test_51a() { # was test_51 # bug 1516 - create an empty entry right after ".." then split dir - test_mkdir -p $DIR/$tdir + test_mkdir -c1 $DIR/$tdir touch $DIR/$tdir/foo $MCREATE $DIR/$tdir/bar rm $DIR/$tdir/foo @@ -3827,7 +3866,7 @@ test_51b() { # cleanup the directory rm -fr $BASE - test_mkdir -p $BASE + test_mkdir -p -c1 $BASE local mdtidx=$(printf "%04x" $($LFS getstripe -M $BASE)) local numfree=$(lctl get_param -n mdc.$FSNAME-MDT$mdtidx*.filesfree) @@ -12679,6 +12718,50 @@ test_300f() { } run_test 300f "check rename cross striped directory" +test_300g() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + local stripe_count + local dir + + mkdir $DIR/$tdir + $LFS setdirstripe -i 0 -c $MDSCOUNT -t all_char \ + $DIR/$tdir/striped_dir || + error "set striped dir error" + + $LFS setdirstripe -D -c $MDSCOUNT -t all_char $DIR/$tdir/striped_dir || + error "set default stripe on striped dir error" + + mkdir -p $DIR/$tdir/striped_dir/{test1,test2,test3,test4} + + for dir in $(find $DIR/$tdir/striped_dir/*); do + stripe_count=$($LFS getdirstripe -c $dir) + [ $stripe_count -eq $MDSCOUNT ] || + error "expect $MDSCOUNT get $stripe_count for $dir" + done + + rmdir $DIR/$tdir/striped_dir/* || error "rmdir1 failed" + #change default stripe count to 2 + $LFS setdirstripe -D -c 2 -t all_char $DIR/$tdir/striped_dir || + error "set default stripe on striped dir error" + + mkdir -p $DIR/$tdir/striped_dir/{test1,test2,test3,test4} + + rmdir $DIR/$tdir/striped_dir/* || error "rmdir2 failed" + + #change default stripe 
count to 1 + $LFS setdirstripe -D -c 1 -t all_char $DIR/$tdir/striped_dir || + error "set default stripe on striped dir error" + + mkdir -p $DIR/$tdir/striped_dir/{test1,test2,test3,test4} + for dir in $(find $DIR/$tdir/striped_dir/*); do + stripe_count=$($LFS getdirstripe -c $dir) + [ $stripe_count -eq 1 ] || + error "expect 1 get $stripe_count for $dir" + done + rmdir $DIR/$tdir/striped_dir/* || error "rmdir3 failed" +} +run_test 300g "check default striped directory for striped directory" + # # tests that do cleanup/setup should be run at the end # diff --git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index 203dff3..5e16e22 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -2387,6 +2387,7 @@ void lmv_dump_user_lmm(struct lmv_user_md *lum, char *pool_name, struct lmv_user_mds_data *objects = lum->lum_objects; char *prefix = lum->lum_magic == LMV_USER_MAGIC ? "(Default)" : ""; int i, obdstripe = 0; + char *seperator = ""; if (obdindex != OBD_NOT_FOUND) { for (i = 0; i < lum->lum_stripe_count; i++) { @@ -2413,21 +2414,26 @@ void lmv_dump_user_lmm(struct lmv_user_md *lum, char *pool_name, llapi_printf(LLAPI_MSG_NORMAL, "%s%s\n", prefix, path); if (verbose & VERBOSE_COUNT) { + llapi_printf(LLAPI_MSG_NORMAL, "%s", seperator); if (verbose & ~VERBOSE_COUNT) llapi_printf(LLAPI_MSG_NORMAL, "lmv_stripe_count: "); - llapi_printf(LLAPI_MSG_NORMAL, "%u\n", + llapi_printf(LLAPI_MSG_NORMAL, "%u", (int)lum->lum_stripe_count); + seperator = "\n"; } if (verbose & VERBOSE_OFFSET) { + llapi_printf(LLAPI_MSG_NORMAL, "%s", seperator); if (verbose & ~VERBOSE_OFFSET) llapi_printf(LLAPI_MSG_NORMAL, "lmv_stripe_offset: "); - llapi_printf(LLAPI_MSG_NORMAL, "%d\n", + llapi_printf(LLAPI_MSG_NORMAL, "%d", (int)lum->lum_stripe_offset); + seperator = "\n"; } if (verbose & VERBOSE_OBJID && lum->lum_magic != LMV_USER_MAGIC) { - if ((obdstripe == 1)) + llapi_printf(LLAPI_MSG_NORMAL, "%s", seperator); + if (obdstripe == 1 && lum->lum_stripe_count > 0) 
llapi_printf(LLAPI_MSG_NORMAL, "mdtidx\t\t FID[seq:oid:ver]\n"); for (i = 0; i < lum->lum_stripe_count; i++) { @@ -2443,12 +2449,16 @@ void lmv_dump_user_lmm(struct lmv_user_md *lum, char *pool_name, } if ((verbose & VERBOSE_POOL) && (pool_name[0] != '\0')) { + llapi_printf(LLAPI_MSG_NORMAL, "%s", seperator); if (verbose & ~VERBOSE_POOL) llapi_printf(LLAPI_MSG_NORMAL, "%slmv_pool: ", prefix); llapi_printf(LLAPI_MSG_NORMAL, "%s%c ", pool_name, ' '); + seperator = "\n"; } - llapi_printf(LLAPI_MSG_NORMAL, "\n"); + + if (!(verbose & VERBOSE_OBJID)) + llapi_printf(LLAPI_MSG_NORMAL, "\n"); } void llapi_lov_dump_user_lmm(struct find_param *param, char *path, int is_dir) @@ -3305,6 +3315,17 @@ static int cb_getstripe(char *path, DIR *parent, DIR **dirp, void *data, lum->lum_stripe_count = 0; lum->lum_stripe_offset = -1; goto dump; + } else if (param->get_lmv) { + struct lmv_user_md *lum = param->fp_lmv_md; + int mdtidx; + + ret = llapi_file_fget_mdtidx(dirfd(d), &mdtidx); + if (ret != 0) + goto err_out; + lum->lum_magic = LMV_MAGIC_V1; + lum->lum_stripe_count = 0; + lum->lum_stripe_offset = mdtidx; + goto dump; } else { struct lov_user_md *lmm = ¶m->lmd->lmd_lmm; lmm->lmm_magic = LOV_USER_MAGIC_V1; @@ -3333,11 +3354,12 @@ static int cb_getstripe(char *path, DIR *parent, DIR **dirp, void *data, __func__, path); } else { ret = -errno; - llapi_error(LLAPI_MSG_ERROR, ret, - "error: %s: %s failed for %s", - __func__, d ? "LL_IOC_LOV_GETSTRIPE" : - "IOC_MDC_GETFILESTRIPE", path); - } +err_out: + llapi_error(LLAPI_MSG_ERROR, ret, + "error: %s: %s failed for %s", + __func__, d ? "LL_IOC_LOV_GETSTRIPE" : + "IOC_MDC_GETFILESTRIPE", path); + } return ret; }