From: Lai Siyao Date: Mon, 21 Mar 2016 14:05:10 +0000 (+0800) Subject: LU-7660 dne: support fs default stripe X-Git-Tag: 2.8.56~71 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=226fd401f9d8bfcd1a71bf264d9baef1e0842441 LU-7660 dne: support fs default stripe On DNE system slave MDTs should honor fs default stripe, to achieve this, slave MDT should lock fs root, and cache XATTR_NAME_LOV, the following functionality is added for this: * take and cache remote root XATTR lock on slave MDTs. * add dt_invalidate operation to invalidate OSP attributes cache, normally it's called in mdt_object_unlock(), but if we cache this lock, it's called upon cross-MDT lock cancel. * enable OSP attribute cache by default. * migrate should invalidate remote target explicitly if it's newly created, because it doesn't take its lock beforehand. * OSP should cleanup ldlm namespace in device cleanup because there may be cached locks. * add sanity.sh 405 test for this. Signed-off-by: Lai Siyao Change-Id: I90e5f35f3b6a294a2a559c28df267f92ce188b9c Reviewed-on: http://review.whamcloud.com/19041 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index c69674f..ebabaa6 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -1015,6 +1015,20 @@ struct dt_object_operations { struct dt_object *dt, struct ldlm_enqueue_info *einfo, union ldlm_policy_data *policy); + + /** + * Invalidate attribute cache. + * + * This method invalidate attribute cache of the object, which is on OSP + * only. 
+ * + * \param[in] env execution envionment for this thread + * \param[in] dt object + * + * \retval 0 on success + * \retval negative negated errno on error + */ + int (*do_invalidate)(const struct lu_env *env, struct dt_object *dt); }; /** @@ -2612,6 +2626,15 @@ static inline int dt_xattr_list(const struct lu_env *env, struct dt_object *dt, return dt->do_ops->do_xattr_list(env, dt, buf); } +static inline int dt_invalidate(const struct lu_env *env, struct dt_object *dt) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_invalidate); + + return dt->do_ops->do_invalidate(env, dt); +} + static inline int dt_declare_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 49dd2e6..c34c6e2 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -214,6 +214,8 @@ struct md_object_operations { struct md_object *obj, struct ldlm_enqueue_info *einfo, union ldlm_policy_data *policy); + + int (*moo_invalidate)(const struct lu_env *env, struct md_object *obj); }; /** @@ -402,6 +404,12 @@ static inline int mo_xattr_list(const struct lu_env *env, return m->mo_ops->moo_xattr_list(env, m, buf); } +static inline int mo_invalidate(const struct lu_env *env, struct md_object *m) +{ + LASSERT(m->mo_ops->moo_invalidate); + return m->mo_ops->moo_invalidate(env, m); +} + static inline int mo_swap_layouts(const struct lu_env *env, struct md_object *o1, struct md_object *o2, __u64 flags) diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 7d23aae..c3d67e4 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -975,6 +975,11 @@ static int lod_process_config(const struct lu_env *env, GOTO(out, rc); } case LCFG_PRE_CLEANUP: { + if (lod->lod_md_root != NULL) { + lu_object_put(env, &lod->lod_md_root->ldo_obj.do_lu); + lod->lod_md_root = NULL; + } + lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg); lod_sub_process_config(env, lod, 
&lod->lod_ost_descs, lcfg); next = &lod->lod_child->dd_lu_dev; @@ -1562,7 +1567,7 @@ static int lod_init0(const struct lu_env *env, struct lod_device *lod, if (rc) GOTO(out_pools, rc); - spin_lock_init(&lod->lod_desc_lock); + spin_lock_init(&lod->lod_lock); spin_lock_init(&lod->lod_connects_lock); lod_tgt_desc_init(&lod->lod_mdt_descs); lod_tgt_desc_init(&lod->lod_ost_descs); diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index 42b8ef4..d831ea9 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -180,8 +180,8 @@ struct lod_device { /* lov settings descriptor storing static information */ struct lov_desc lod_desc; - /* use to protect ld_active_tgt_count and all ltd_active */ - spinlock_t lod_desc_lock; + /* protect ld_active_tgt_count, ltd_active and lod_md_root */ + spinlock_t lod_lock; /* Description of OST */ struct lod_tgt_descs lod_ost_descs; @@ -210,6 +210,9 @@ struct lod_device { enum lustre_sec_part lod_sp_me; struct proc_dir_entry *lod_symlink; + + /* ROOT object, used to fetch FS default striping */ + struct lod_object *lod_md_root; }; #define lod_osts lod_ost_descs.ltd_tgts @@ -226,58 +229,77 @@ struct lod_device { #define ltd_mdt ltd_tgt #define lod_mdt_desc lod_tgt_desc -struct lod_dir_stripe_info { - __u32 ldsi_stripe_offset; - __u32 ldsi_def_stripenr; - __u32 ldsi_def_stripe_offset; - __u32 ldsi_def_hash_type; - __u32 ldsi_hash_type; - - unsigned int ldsi_def_striping_set:1, - ldsi_def_striping_cached:1, - ldsi_striped:1; +struct lod_default_striping { + /* default LOV */ + __u32 lds_def_stripe_size; + __u16 lds_def_stripenr; + __u16 lds_def_stripe_offset; + char lds_def_pool[LOV_MAXPOOLNAME + 1]; + /* default LMV */ + __u32 lds_dir_def_stripenr; + __u32 lds_dir_def_stripe_offset; + __u32 lds_dir_def_hash_type; + /* flags whether default striping is set */ + __u32 lds_def_striping_set:1, + lds_dir_def_striping_set:1; }; -/* - * XXX: shrink this structure, currently it's 72bytes on 32bit arch, - * so, slab will 
be allocating 128bytes - */ struct lod_object { - struct dt_object ldo_obj; - - /* if object is striped, then the next fields describe stripes */ - /* For striped directory, ldo_stripenr == slave stripe count */ - __u16 ldo_stripenr; - __u16 ldo_layout_gen; - __u32 ldo_stripe_size; - __u32 ldo_pattern; - __u16 ldo_released_stripenr; - char *ldo_pool; - struct dt_object **ldo_stripe; - /* to know how much memory to free, ldo_stripenr can be less */ - /* default striping for directory represented by this object - * is cached in stripenr/stripe_size */ - unsigned int ldo_stripes_allocated:16, - ldo_striping_cached:1, - ldo_def_striping_set:1, - ldo_def_striping_cached:1, - /* ldo_dir_slave_stripe indicate this is a slave stripe of - * a striped dir */ - ldo_dir_slave_stripe:1; - __u32 ldo_def_stripe_size; - __u16 ldo_def_stripenr; - __u16 ldo_def_stripe_offset; - struct lod_dir_stripe_info *ldo_dir_stripe; + struct dt_object ldo_obj; + union { + /* file stripe */ + struct { + /* + * don't change field order, because both file and + * directory use ldo_stripenr/ldo_stripes_allocated + * to access stripe number. + */ + __u16 ldo_stripenr; + __u16 ldo_stripes_allocated; + __u16 ldo_layout_gen; + __u16 ldo_released_stripenr; + __u32 ldo_pattern; + __u32 ldo_stripe_size; + __u16 ldo_stripe_offset; + char *ldo_pool; + }; + /* directory stripe */ + struct { + __u16 ldo_dir_stripenr; + __u16 ldo_dir_stripes_allocated; + __u32 ldo_dir_stripe_offset; + __u32 ldo_dir_hash_type; + __u32 ldo_dir_slave_stripe:1, + ldo_dir_striped:1; + /* + * default striping is not cached, so this field is + * invalid after create, make sure it's used by + * lod_dir_striping_create_internal() only. 
+ */ + struct lod_default_striping *ldo_def_striping; + }; + }; + struct dt_object **ldo_stripe; }; -#define ldo_dir_stripe_offset ldo_dir_stripe->ldsi_stripe_offset -#define ldo_dir_def_stripenr ldo_dir_stripe->ldsi_def_stripenr -#define ldo_dir_hash_type ldo_dir_stripe->ldsi_hash_type -#define ldo_dir_def_hash_type ldo_dir_stripe->ldsi_def_hash_type -#define ldo_dir_striped ldo_dir_stripe->ldsi_striped -#define ldo_dir_def_striping_set ldo_dir_stripe->ldsi_def_striping_set -#define ldo_dir_def_striping_cached ldo_dir_stripe->ldsi_def_striping_cached -#define ldo_dir_def_stripe_offset ldo_dir_stripe->ldsi_def_stripe_offset +static inline int lod_object_set_pool(struct lod_object *lo, const char *pool) +{ + int len; + + if (lo->ldo_pool != NULL) { + len = strlen(lo->ldo_pool) + 1; + OBD_FREE(lo->ldo_pool, len); + lo->ldo_pool = NULL; + } + if (pool != NULL) { + len = strlen(pool) + 1; + OBD_ALLOC(lo->ldo_pool, len); + if (lo->ldo_pool == NULL) + return -ENOMEM; + strlcpy(lo->ldo_pool, pool, len); + } + return 0; +} struct lod_it { struct dt_object *lit_obj; /* object from the layer below */ @@ -289,25 +311,28 @@ struct lod_it { struct lod_thread_info { /* per-thread buffer for LOV EA, may be vmalloc'd */ - void *lti_ea_store; - __u32 lti_ea_store_size; + void *lti_ea_store; + __u32 lti_ea_store_size; /* per-thread buffer for LMV EA */ - struct lu_buf lti_buf; - struct ost_id lti_ostid; - struct lu_fid lti_fid; - struct obd_statfs lti_osfs; - struct lu_attr lti_attr; - struct lod_it lti_it; - struct ldlm_res_id lti_res_id; + struct lu_buf lti_buf; + struct ost_id lti_ostid; + struct lu_fid lti_fid; + struct obd_statfs lti_osfs; + struct lu_attr lti_attr; + struct lod_it lti_it; + struct ldlm_res_id lti_res_id; /* used to hold lu_dirent, sizeof(struct lu_dirent) + NAME_MAX */ - char lti_key[sizeof(struct lu_dirent) + NAME_MAX]; - struct dt_object_format lti_format; - struct lu_name lti_name; - struct lu_buf lti_linkea_buf; - struct dt_insert_rec lti_dt_rec; - struct 
llog_catid lti_cid; - struct llog_cookie lti_cookie; - struct lustre_cfg lti_lustre_cfg; + char lti_key[sizeof(struct lu_dirent) + + NAME_MAX]; + struct dt_object_format lti_format; + struct lu_name lti_name; + struct lu_buf lti_linkea_buf; + struct dt_insert_rec lti_dt_rec; + struct llog_catid lti_cid; + struct llog_cookie lti_cookie; + struct lustre_cfg lti_lustre_cfg; + /* used to store parent default striping in create */ + struct lod_default_striping lti_def_striping; }; extern const struct lu_device_operations lod_lu_ops; @@ -489,7 +514,6 @@ extern struct dt_object_operations lod_obj_ops; extern struct lu_object_operations lod_lu_obj_ops; int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, struct lu_buf *buf, bool resize); -int lod_object_set_pool(struct lod_object *o, char *pool); int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, const struct lu_buf *lovea, struct thandle *th); diff --git a/lustre/lod/lod_lov.c b/lustre/lod/lod_lov.c index 0ed2fe8..d362f48 100644 --- a/lustre/lod/lod_lov.c +++ b/lustre/lod/lod_lov.c @@ -960,7 +960,7 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo, if (magic == LOV_MAGIC_V3) { struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *) lmm; objs = &v3->lmm_objects[0]; - lod_object_set_pool(lo, v3->lmm_pool_name); + /* no need to set pool, which is used in create only */ } else { objs = &lmm->lmm_objects[0]; } @@ -1041,9 +1041,6 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo) */ rc = lod_parse_dir_striping(env, lo, buf); } - - if (rc == 0) - lo->ldo_striping_cached = 1; out: RETURN(rc); } diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 165d4bd..87dd8b6 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -2105,22 +2105,6 @@ static int lod_declare_xattr_set(const struct lu_env *env, } /** - * Resets cached default striping in the object. 
- * - * \param[in] lo object - */ -static void lod_lov_stripe_cache_clear(struct lod_object *lo) -{ - lo->ldo_def_striping_set = 0; - lo->ldo_def_striping_cached = 0; - lod_object_set_pool(lo, NULL); - lo->ldo_def_stripe_size = 0; - lo->ldo_def_stripenr = 0; - if (lo->ldo_dir_stripe != NULL) - lo->ldo_dir_def_striping_cached = 0; -} - -/** * Apply xattr changes to the object. * * Applies xattr changes to the object and the stripes if the latter exist. @@ -2237,18 +2221,12 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, struct thandle *th) { struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *l = lod_dt_obj(dt); struct lov_user_md_v1 *lum; struct lov_user_md_v3 *v3 = NULL; const char *pool_name = NULL; int rc; ENTRY; - /* If it is striped dir, we should clear the stripe cache for - * slave stripe as well, but there are no effective way to - * notify the LOD on the slave MDT, so we do not cache stripe - * information for slave stripe for now. XXX*/ - lod_lov_stripe_cache_clear(l); LASSERT(buf != NULL && buf->lb_buf != NULL); lum = buf->lb_buf; @@ -2308,7 +2286,6 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, const char *name, int fl, struct thandle *th) { - struct lod_object *l = lod_dt_obj(dt); struct lmv_user_md_v1 *lum; int rc; ENTRY; @@ -2332,14 +2309,6 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, RETURN(rc); } - /* Update default stripe cache */ - if (l->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(l->ldo_dir_stripe); - if (l->ldo_dir_stripe == NULL) - RETURN(-ENOMEM); - } - - l->ldo_dir_def_striping_cached = 0; RETURN(rc); } @@ -2535,11 +2504,17 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, struct thandle *th, bool declare) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_object *lo = lod_dt_obj(dt); - int rc; + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + const struct 
lod_default_striping *lds = lo->ldo_def_striping; + const char *poolname = NULL; + int rc; ENTRY; + LASSERT(ergo(lds != NULL, + lds->lds_def_striping_set || + lds->lds_dir_def_striping_set)); + if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr, lo->ldo_dir_stripe_offset)) { struct lmv_user_md_v1 *v1 = info->lti_ea_store; @@ -2572,11 +2547,10 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, } /* Transfer default LMV striping from the parent */ - if (lo->ldo_dir_def_striping_set && - !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr, - lo->ldo_dir_def_stripe_offset)) { + if (lds != NULL && lds->lds_dir_def_striping_set && + !LMVEA_DELETE_VALUES(lds->lds_dir_def_stripenr, + lds->lds_dir_def_stripe_offset)) { struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int def_stripe_count = lo->ldo_dir_def_stripenr; if (info->lti_ea_store_size < sizeof(*v1)) { rc = lod_ea_store_resize(info, sizeof(*v1)); @@ -2587,11 +2561,11 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, memset(v1, 0, sizeof(*v1)); v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(def_stripe_count); + v1->lum_stripe_count = cpu_to_le32(lds->lds_dir_def_stripenr); v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_def_stripe_offset); + cpu_to_le32(lds->lds_dir_def_stripe_offset); v1->lum_hash_type = - cpu_to_le32(lo->ldo_dir_def_hash_type); + cpu_to_le32(lds->lds_dir_def_hash_type); info->lti_buf.lb_buf = v1; info->lti_buf.lb_len = sizeof(*v1); @@ -2608,12 +2582,15 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, RETURN(rc); } + if (lds != NULL && lds->lds_def_pool[0] != '\0') + poolname = lds->lds_def_pool; + /* Transfer default LOV striping from the parent */ - if (lo->ldo_def_striping_set && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { + if (lds != NULL && lds->lds_def_striping_set && + !LOVEA_DELETE_VALUES(lds->lds_def_stripe_size, + 
lds->lds_def_stripenr, + lds->lds_def_stripe_offset, + poolname)) { struct lov_user_md_v3 *v3 = info->lti_ea_store; if (info->lti_ea_store_size < sizeof(*v3)) { @@ -2625,11 +2602,11 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, memset(v3, 0, sizeof(*v3)); v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, + v3->lmm_stripe_count = cpu_to_le16(lds->lds_def_stripenr); + v3->lmm_stripe_offset = cpu_to_le16(lds->lds_def_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(lds->lds_def_stripe_size); + if (poolname != NULL) + strlcpy(v3->lmm_pool_name, poolname, sizeof(v3->lmm_pool_name)); info->lti_buf.lb_buf = v3; @@ -2663,14 +2640,7 @@ static int lod_dir_striping_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - int rc; - - rc = lod_dir_striping_create_internal(env, dt, attr, dof, th, false); - if (rc == 0) - lo->ldo_striping_cached = 1; - - return rc; + return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); } /** @@ -2852,38 +2822,6 @@ static int lod_xattr_list(const struct lu_env *env, return dt_xattr_list(env, dt_object_child(dt), buf); } -/** - * Initialize a pool the object belongs to. - * - * When a striped object is being created, striping configuration - * may demand the stripes are allocated on a limited set of the - * targets. These limited sets are known as "pools". So we copy - * a pool name into the object and later actual creation methods - * (like lod_object_create()) will use this information to allocate - * the stripes properly. 
- * - * \param[in] o object - * \param[in] pool pool name - */ -int lod_object_set_pool(struct lod_object *o, char *pool) -{ - int len; - - if (o->ldo_pool) { - len = strlen(o->ldo_pool); - OBD_FREE(o->ldo_pool, len + 1); - o->ldo_pool = NULL; - } - if (pool) { - len = strlen(pool); - OBD_ALLOC(o->ldo_pool, len + 1); - if (o->ldo_pool == NULL) - return -ENOMEM; - strcpy(o->ldo_pool, pool); - } - return 0; -} - static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid) { return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE); @@ -2891,44 +2829,31 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi /** - * Cache default regular striping in the object. - * - * To improve performance of striped regular object creation we cache - * default LOV striping (if it exists) in the parent directory object. + * Get default striping. * * \param[in] env execution environment - * \param[in] lp object + * \param[in] lo object + * \param[out] lds default striping * * \retval 0 on success * \retval negative if failed */ -static int lod_cache_parent_lov_striping(const struct lu_env *env, - struct lod_object *lp) +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) { - struct lod_thread_info *info = lod_env_info(env); - struct lov_user_md_v1 *v1 = NULL; - struct lov_user_md_v3 *v3 = NULL; - int rc; - ENTRY; + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v1 *v1 = NULL; + struct lov_user_md_v3 *v3 = NULL; + int rc; - /* called from MDD without parent being write locked, - * lock it here */ - dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0); - rc = lod_get_lov_ea(env, lp); + rc = lod_get_lov_ea(env, lo); if (rc < 0) - GOTO(unlock, rc); - - if (rc < (typeof(rc))sizeof(struct lov_user_md)) { - /* don't lookup for non-existing or invalid striping */ - lp->ldo_def_striping_set = 0; - lp->ldo_def_striping_cached = 1; - 
lp->ldo_def_stripe_size = 0; - lp->ldo_def_stripenr = 0; - lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1); - GOTO(unlock, rc = 0); - } + return rc; + + if (rc < (typeof(rc))sizeof(struct lov_user_md)) + return 0; - rc = 0; v1 = info->lti_ea_store; if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { lustre_swab_lov_user_md_v1(v1); @@ -2938,127 +2863,126 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env, } if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1) - GOTO(unlock, rc = 0); + return 0; if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0) - GOTO(unlock, rc = 0); - - CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n", - PFID(lu_object_fid(&lp->ldo_obj.do_lu)), - (int)v1->lmm_stripe_count, - (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset); - - lp->ldo_def_stripenr = v1->lmm_stripe_count; - lp->ldo_def_stripe_size = v1->lmm_stripe_size; - lp->ldo_def_stripe_offset = v1->lmm_stripe_offset; - lp->ldo_def_striping_cached = 1; - lp->ldo_def_striping_set = 1; + return 0; + + lds->lds_def_stripenr = v1->lmm_stripe_count; + lds->lds_def_stripe_size = v1->lmm_stripe_size; + lds->lds_def_stripe_offset = v1->lmm_stripe_offset; + lds->lds_def_striping_set = 1; if (v1->lmm_magic == LOV_USER_MAGIC_V3) { - /* XXX: sanity check here */ - v3 = (struct lov_user_md_v3 *) v1; - if (v3->lmm_pool_name[0]) - lod_object_set_pool(lp, v3->lmm_pool_name); + v3 = (struct lov_user_md_v3 *)v1; + if (v3->lmm_pool_name[0] != '\0') + strlcpy(lds->lds_def_pool, v3->lmm_pool_name, + sizeof(lds->lds_def_pool)); } - EXIT; -unlock: - dt_write_unlock(env, dt_object_child(&lp->ldo_obj)); - return rc; -} + return 0; +} /** - * Cache default directory striping in the object. - * - * To improve performance of striped directory creation we cache default - * directory striping (if it exists) in the parent directory object. + * Get default directory striping. 
* * \param[in] env execution environment - * \param[in] lp object + * \param[in] lo object + * \param[out] lds default striping * * \retval 0 on success * \retval negative if failed */ -static int lod_cache_parent_lmv_striping(const struct lu_env *env, - struct lod_object *lp) +static int lod_get_default_lmv_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) { struct lod_thread_info *info = lod_env_info(env); struct lmv_user_md_v1 *v1 = NULL; int rc; - ENTRY; - /* called from MDD without parent being write locked, - * lock it here */ - dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0); - rc = lod_get_default_lmv_ea(env, lp); + rc = lod_get_default_lmv_ea(env, lo); if (rc < 0) - GOTO(unlock, rc); - - if (rc < (typeof(rc))sizeof(struct lmv_user_md)) { - /* don't lookup for non-existing or invalid striping */ - lp->ldo_dir_def_striping_set = 0; - lp->ldo_dir_def_striping_cached = 1; - lp->ldo_dir_def_stripenr = 0; - lp->ldo_dir_def_stripe_offset = - (typeof(v1->lum_stripe_offset))(-1); - lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64; - GOTO(unlock, rc = 0); - } + return rc; + + if (rc < (typeof(rc))sizeof(struct lmv_user_md)) + return 0; - rc = 0; v1 = info->lti_ea_store; - lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count); - lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); - lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); - lp->ldo_dir_def_striping_set = 1; - lp->ldo_dir_def_striping_cached = 1; + lds->lds_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count); + lds->lds_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); + lds->lds_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); + lds->lds_dir_def_striping_set = 1; - EXIT; -unlock: - dt_write_unlock(env, dt_object_child(&lp->ldo_obj)); - return rc; + return 0; } /** - * Cache default striping in the object. + * Get default striping in the object. 
* - * To improve performance of striped object creation we cache default striping - * (if it exists) in the parent directory object. We always cache default - * striping for the regular files (stored in LOV EA) and we cache default - * striping for the directories if requested by \a child_mode (when a new - * directory is being created). + * Get object default striping and default directory striping. * * \param[in] env execution environment - * \param[in] lp object - * \param[in] child_mode new object's mode + * \param[in] lo object + * \param[out] lds default striping * * \retval 0 on success * \retval negative if failed */ -static int lod_cache_parent_striping(const struct lu_env *env, - struct lod_object *lp, - umode_t child_mode) +static int lod_get_default_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) { - int rc = 0; - ENTRY; - - if (!lp->ldo_def_striping_cached) { - /* we haven't tried to get default striping for - * the directory yet, let's cache it in the object */ - rc = lod_cache_parent_lov_striping(env, lp); - if (rc != 0) - RETURN(rc); - } + int rc; - /* If the parent is on the remote MDT, we should always - * try to refresh the default stripeEA cache, because we - * do not cache default striping information for remote - * object. */ - if (S_ISDIR(child_mode) && (!lp->ldo_dir_def_striping_cached || - dt_object_remote(&lp->ldo_obj))) - rc = lod_cache_parent_lmv_striping(env, lp); + rc = lod_get_default_lov_striping(env, lo, lds); + if (rc == 0) + rc = lod_get_default_lmv_striping(env, lo, lds); + return rc; +} - RETURN(rc); +/** + * Apply default striping on object. + * + * If object striping pattern is not set, set to the one in default striping. + * The default striping is from parent or fs. 
+ * + * \param[in] lo new object + * \param[in] lds default striping + * \param[in] mode new object's mode + */ +static void lod_striping_from_default(struct lod_object *lo, + const struct lod_default_striping *lds, + umode_t mode) +{ + if (lds->lds_def_striping_set && S_ISREG(mode)) { + if (lo->ldo_stripenr == 0) + lo->ldo_stripenr = lds->lds_def_stripenr; + if (lo->ldo_stripe_size == 0) + lo->ldo_stripe_size = lds->lds_def_stripe_size; + if (lo->ldo_stripe_offset == LOV_OFFSET_DEFAULT) + lo->ldo_stripe_offset = lds->lds_def_stripe_offset; + if (lo->ldo_pool == NULL && lds->lds_def_pool[0] != '\0') + lod_object_set_pool(lo, lds->lds_def_pool); + + CDEBUG(D_INFO, "striping from default: count %hu, size %u, " + "offset %d, pool %s\n", + lo->ldo_stripenr, lo->ldo_stripe_size, + (int)lo->ldo_stripe_offset, lo->ldo_pool ?: ""); + } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) { + if (lo->ldo_stripenr == 0) + lo->ldo_stripenr = lds->lds_dir_def_stripenr; + if (lo->ldo_dir_stripe_offset == -1) + lo->ldo_dir_stripe_offset = + lds->lds_dir_def_stripe_offset; + if (lo->ldo_dir_hash_type == 0) + lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type; + + CDEBUG(D_INFO, "striping from default: count %hu, offset %d, " + "hash_type %u\n", + lo->ldo_stripenr, (int)lo->ldo_dir_stripe_offset, + lo->ldo_dir_hash_type); + } } /** @@ -3079,12 +3003,13 @@ static void lod_ah_init(const struct lu_env *env, umode_t child_mode) { struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev); - struct dt_object *nextp = NULL; - struct dt_object *nextc; + struct lod_thread_info *info = lod_env_info(env); + struct lod_default_striping *lds = &info->lti_def_striping; + struct dt_object *nextp = NULL; + struct dt_object *nextc; struct lod_object *lp = NULL; struct lod_object *lc; - struct lov_desc *desc; - int rc; + struct lov_desc *desc; ENTRY; LASSERT(child); @@ -3092,9 +3017,6 @@ static void lod_ah_init(const struct lu_env *env, if (likely(parent)) { nextp = dt_object_child(parent); lp = 
lod_dt_obj(parent); - rc = lod_load_striping(env, lp); - if (rc != 0) - return; } nextc = dt_object_child(child); @@ -3107,85 +3029,33 @@ static void lod_ah_init(const struct lu_env *env, nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); if (S_ISDIR(child_mode)) { - if (lc->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(lc->ldo_dir_stripe); - if (lc->ldo_dir_stripe == NULL) - return; - } + /* other default values are 0 */ + lc->ldo_dir_stripe_offset = -1; - LASSERT(lp != NULL); - if (lp->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(lp->ldo_dir_stripe); - if (lp->ldo_dir_stripe == NULL) - return; - } + memset(lds, 0, sizeof(*lds)); + lod_get_default_striping(env, lp, lds); - rc = lod_cache_parent_striping(env, lp, child_mode); - if (rc != 0) - return; - - /* transfer defaults to new directory */ - if (lp->ldo_def_striping_set) { - if (lp->ldo_pool) - lod_object_set_pool(lc, lp->ldo_pool); - lc->ldo_def_stripenr = lp->ldo_def_stripenr; - lc->ldo_def_stripe_size = lp->ldo_def_stripe_size; - lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; - lc->ldo_def_striping_set = 1; - lc->ldo_def_striping_cached = 1; - CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n", - (int)lc->ldo_def_stripe_size, - (int)lc->ldo_def_stripe_offset, - (int)lc->ldo_def_stripenr); - } - - /* transfer dir defaults to new directory */ - if (lp->ldo_dir_def_striping_set) { - lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr; - lc->ldo_dir_def_stripe_offset = - lp->ldo_dir_def_stripe_offset; - lc->ldo_dir_def_hash_type = - lp->ldo_dir_def_hash_type; - lc->ldo_dir_def_striping_set = 1; - lc->ldo_dir_def_striping_cached = 1; - CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n", - (int)lc->ldo_dir_def_stripenr, - (int)lc->ldo_dir_def_stripe_offset, - lc->ldo_dir_def_hash_type); - } + /* inherit parent default striping */ + if (lds->lds_def_striping_set || lds->lds_dir_def_striping_set) + lc->ldo_def_striping = lds; /* It should always honour the specified stripes */ - if (ah->dah_eadata 
!= NULL && ah->dah_eadata_len != 0) { + if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && + lod_verify_md_striping(d, ah->dah_eadata) == 0) { const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; - rc = lod_verify_md_striping(d, lum1); - if (rc == 0 && - le32_to_cpu(lum1->lum_stripe_count) > 1) { - lc->ldo_stripenr = - le32_to_cpu(lum1->lum_stripe_count); - lc->ldo_dir_stripe_offset = - le32_to_cpu(lum1->lum_stripe_offset); - lc->ldo_dir_hash_type = - le32_to_cpu(lum1->lum_hash_type); - CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n", - lc->ldo_stripenr, - (int)lc->ldo_dir_stripe_offset); - } - /* then check whether there is default stripes from parent */ - } else if (lp->ldo_dir_def_striping_set) { - /* If there are default dir stripe from parent */ - lc->ldo_stripenr = lp->ldo_dir_def_stripenr; + lc->ldo_stripenr = le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = - lp->ldo_dir_def_stripe_offset; + le32_to_cpu(lum1->lum_stripe_offset); lc->ldo_dir_hash_type = - lp->ldo_dir_def_hash_type; - CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n", - lc->ldo_stripenr, - (int)lc->ldo_dir_stripe_offset); + le32_to_cpu(lum1->lum_hash_type); + CDEBUG(D_INFO, "set dir stripe: count %hu, offset %d, " + "hash_type %u\n", + lc->ldo_stripenr, + (int)lc->ldo_dir_stripe_offset, + lc->ldo_dir_hash_type); } else { - /* set default stripe for this directory */ - lc->ldo_stripenr = 0; - lc->ldo_dir_stripe_offset = -1; + lod_striping_from_default(lc, lds, child_mode); } /* shrink the stripe_count to the avaible MDT count */ @@ -3200,10 +3070,11 @@ static void lod_ah_init(const struct lu_env *env, if (lc->ldo_stripenr == 1) lc->ldo_stripenr = 0; - CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n", - lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset); + CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", + lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset, + lc->ldo_dir_hash_type); - goto out; + RETURN_EXIT; } /* @@ -3213,45 +3084,60 @@ static void lod_ah_init(const struct 
lu_env *env, */ if (!lod_object_will_be_striped(S_ISREG(child_mode), lu_object_fid(&child->do_lu))) - goto out; - /* - * try from the parent - */ + RETURN_EXIT; + + /* other default values are 0 */ + lc->ldo_stripe_offset = LOV_OFFSET_DEFAULT; + + /* striping from parent default */ if (likely(parent)) { - lod_cache_parent_striping(env, lp, child_mode); - - lc->ldo_def_stripe_offset = LOV_OFFSET_DEFAULT; - - if (lp->ldo_def_striping_set) { - if (lp->ldo_pool) - lod_object_set_pool(lc, lp->ldo_pool); - lc->ldo_stripenr = lp->ldo_def_stripenr; - lc->ldo_stripe_size = lp->ldo_def_stripe_size; - lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; - CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n", - lc->ldo_stripenr, lc->ldo_stripe_size, - lp->ldo_pool ? lp->ldo_pool : ""); + memset(lds, 0, sizeof(*lds)); + lod_get_default_lov_striping(env, lp, lds); + lod_striping_from_default(lc, lds, child_mode); + } + + if (d->lod_md_root == NULL) { + struct dt_object *root; + struct lod_object *lroot; + + lu_root_fid(&info->lti_fid); + root = dt_locate(env, &d->lod_dt_dev, &info->lti_fid); + if (!IS_ERR(root)) { + lroot = lod_dt_obj(root); + + spin_lock(&d->lod_lock); + if (d->lod_md_root != NULL) + lu_object_put(env, + &d->lod_md_root->ldo_obj.do_lu); + d->lod_md_root = lroot; + spin_unlock(&d->lod_lock); } } + /* if parent doesn't provide all defaults, striping from fs default */ + if (d->lod_md_root != NULL && + (lc->ldo_stripenr == 0 || + lc->ldo_stripe_size == 0 || + lc->ldo_stripe_offset == LOV_OFFSET_DEFAULT || + lc->ldo_pool == NULL)) { + memset(lds, 0, sizeof(*lds)); + lod_get_default_lov_striping(env, d->lod_md_root, lds); + lod_striping_from_default(lc, lds, child_mode); + } + /* - * if the parent doesn't provide with specific pattern, grab fs-wide one + * fs default striping may not be explicitly set, or historically set + * in config log, check striping sanity here and fix to sane values. 
*/ desc = &d->lod_desc; if (lc->ldo_stripenr == 0) lc->ldo_stripenr = desc->ld_default_stripe_count; if (lc->ldo_stripe_size == 0) lc->ldo_stripe_size = desc->ld_default_stripe_size; - CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n", - lc->ldo_stripenr, lc->ldo_stripe_size, - lc->ldo_pool ? lc->ldo_pool : ""); - -out: - /* we do not cache stripe information for slave stripe, see - * lod_xattr_set_lov_on_dir */ - if (lp != NULL && lp->ldo_dir_slave_stripe) - lod_lov_stripe_cache_clear(lp); + CDEBUG(D_INFO, "final striping [%hu %u %d %s]\n", + lc->ldo_stripenr, lc->ldo_stripe_size, + (int)lc->ldo_stripe_offset, lc->ldo_pool ?: ""); EXIT; } @@ -3495,11 +3381,7 @@ static int lod_declare_object_create(const struct lu_env *env, } } - /* Orphan object (like migrating object) does not have - * lod_dir_stripe, see lod_ah_init */ - if (lo->ldo_dir_stripe != NULL) - rc = lod_declare_dir_striping_create(env, dt, attr, - dof, th); + rc = lod_declare_dir_striping_create(env, dt, attr, dof, th); } out: RETURN(rc); @@ -3532,8 +3414,6 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt, int rc = 0, i; ENTRY; - LASSERT(lo->ldo_striping_cached == 0); - /* create all underlying objects */ for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); @@ -3543,11 +3423,8 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt, break; } - if (rc == 0) { + if (rc == 0) rc = lod_generate_and_set_lovea(env, lo, th); - if (rc == 0) - lo->ldo_striping_cached = 1; - } RETURN(rc); } @@ -3804,7 +3681,7 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, * Release LDLM locks on the stripes of a striped directory. * * Iterates over all the locks taken on the stripe objects and - * release them using ->do_object_unlock() method. + * cancel them. 
* * \param[in] env execution environment * \param[in] dt striped object @@ -3829,8 +3706,8 @@ static int lod_object_unlock_internal(const struct lu_env *env, for (i = 1; i < slave_locks->count; i++) { if (lustre_handle_is_used(&slave_locks->handles[i])) - ldlm_lock_decref(&slave_locks->handles[i], - einfo->ei_mode); + ldlm_lock_decref_and_cancel(&slave_locks->handles[i], + einfo->ei_mode); } RETURN(rc); @@ -3864,8 +3741,10 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, LASSERT(!dt_object_remote(dt_object_child(dt))); /* locks were unlocked in MDT layer */ - for (i = 1; i < slave_locks->count; i++) + for (i = 1; i < slave_locks->count; i++) { LASSERT(!lustre_handle_is_used(&slave_locks->handles[i])); + dt_invalidate(env, lo->ldo_stripe[i]); + } slave_locks_size = sizeof(*slave_locks) + slave_locks->count * sizeof(slave_locks->handles[0]); @@ -3904,22 +3783,29 @@ static int lod_object_lock(const struct lu_env *env, } if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(-ENOTDIR); + GOTO(out, rc = -ENOTDIR); rc = lod_load_striping(env, lo); if (rc != 0) - RETURN(rc); + GOTO(out, rc); /* No stripes */ - if (lo->ldo_stripenr <= 1) - RETURN(0); + if (lo->ldo_stripenr <= 1) { + /* + * NB, ei_cbdata stores pointer to slave locks, if no locks + * taken, make sure it's set to NULL, otherwise MDT will try to + * unlock them. 
+ */ + einfo->ei_cbdata = NULL; + GOTO(out, rc = 0); + } slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr * sizeof(slave_locks->handles[0]); /* Freed in lod_object_unlock */ OBD_ALLOC(slave_locks, slave_locks_size); if (slave_locks == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); slave_locks->count = lo->ldo_stripenr; /* striped directory lock */ @@ -3959,23 +3845,32 @@ static int lod_object_lock(const struct lu_env *env, NULL, &lockh); } if (rc != 0) - GOTO(out, rc); + break; slave_locks->handles[i] = lockh; } - einfo->ei_cbdata = slave_locks; -out: if (rc != 0 && slave_locks != NULL) { - einfo->ei_cbdata = slave_locks; lod_object_unlock_internal(env, dt, einfo, policy); OBD_FREE(slave_locks, slave_locks_size); - einfo->ei_cbdata = NULL; } - + EXIT; +out: + if (rc != 0) + einfo->ei_cbdata = NULL; RETURN(rc); } +/** + * Implementation of dt_object_operations::do_invalidate. + * + * \see dt_object_operations::do_invalidate() in the API description for details + */ +static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) +{ + return dt_invalidate(env, dt_object_child(dt)); +} + struct dt_object_operations lod_obj_ops = { .do_read_lock = lod_object_read_lock, .do_write_lock = lod_object_write_lock, @@ -4004,6 +3899,7 @@ struct dt_object_operations lod_obj_ops = { .do_object_sync = lod_object_sync, .do_object_lock = lod_object_lock, .do_object_unlock = lod_object_unlock, + .do_invalidate = lod_invalidate, }; /** @@ -4164,14 +4060,11 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, */ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) { - int i; + int len; - if (lo->ldo_dir_stripe != NULL) { - OBD_FREE_PTR(lo->ldo_dir_stripe); - lo->ldo_dir_stripe = NULL; - } + if (lo->ldo_stripe != NULL) { + int i; - if (lo->ldo_stripe) { LASSERT(lo->ldo_stripes_allocated > 0); for (i = 0; i < lo->ldo_stripenr; i++) { @@ -4179,14 +4072,12 @@ void lod_object_free_striping(const struct lu_env *env, 
struct lod_object *lo) lu_object_put(env, &lo->ldo_stripe[i]->do_lu); } - i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated; - OBD_FREE(lo->ldo_stripe, i); + len = sizeof(struct dt_object *) * lo->ldo_stripes_allocated; + OBD_FREE(lo->ldo_stripe, len); lo->ldo_stripe = NULL; lo->ldo_stripes_allocated = 0; } - lo->ldo_striping_cached = 0; lo->ldo_stripenr = 0; - lo->ldo_pattern = 0; } /** @@ -4218,18 +4109,13 @@ static int lod_object_start(const struct lu_env *env, struct lu_object *o) */ static void lod_object_free(const struct lu_env *env, struct lu_object *o) { - struct lod_object *mo = lu2lod_obj(o); - - /* - * release all underlying object pinned - */ - - lod_object_free_striping(env, mo); - - lod_object_set_pool(mo, NULL); + struct lod_object *lo = lu2lod_obj(o); + lod_object_set_pool(lo, NULL); + /* release all underlying object pinned */ + lod_object_free_striping(env, lo); lu_object_fini(o); - OBD_SLAB_FREE_PTR(mo, lod_object_kmem); + OBD_SLAB_FREE_PTR(lo, lod_object_kmem); } /** diff --git a/lustre/lod/lod_qos.c b/lustre/lod/lod_qos.c index 8022335..d1985b8 100644 --- a/lustre/lod/lod_qos.c +++ b/lustre/lod/lod_qos.c @@ -209,7 +209,7 @@ static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d, /* check whether device has changed state (active, inactive) */ if (rc != 0 && ost->ltd_active) { /* turned inactive? */ - spin_lock(&d->lod_desc_lock); + spin_lock(&d->lod_lock); if (ost->ltd_active) { ost->ltd_active = 0; if (rc == -ENOTCONN) @@ -222,13 +222,13 @@ static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d, CDEBUG(D_CONFIG, "%s: turns inactive\n", ost->ltd_exp->exp_obd->obd_name); } - spin_unlock(&d->lod_desc_lock); + spin_unlock(&d->lod_lock); } else if (rc == 0 && ost->ltd_active == 0) { /* turned active? 
*/ LASSERTF(d->lod_desc.ld_active_tgt_count < d->lod_ostnr, "active tgt count %d, ost nr %d\n", d->lod_desc.ld_active_tgt_count, d->lod_ostnr); - spin_lock(&d->lod_desc_lock); + spin_lock(&d->lod_lock); if (ost->ltd_active == 0) { ost->ltd_active = 1; ost->ltd_connecting = 0; @@ -238,7 +238,7 @@ static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d, CDEBUG(D_CONFIG, "%s: turns active\n", ost->ltd_exp->exp_obd->obd_name); } - spin_unlock(&d->lod_desc_lock); + spin_unlock(&d->lod_lock); } RETURN(rc); @@ -1105,7 +1105,7 @@ static int lod_alloc_ost_list(const struct lu_env *env, v3 = (struct lov_user_md_v3 *)lum; for (i = 0; i < lo->ldo_stripenr; i++) { - if (v3->lmm_objects[i].l_ost_idx == lo->ldo_def_stripe_offset) { + if (v3->lmm_objects[i].l_ost_idx == lo->ldo_stripe_offset) { array_idx = i; break; } @@ -1113,7 +1113,7 @@ static int lod_alloc_ost_list(const struct lu_env *env, if (i == lo->ldo_stripenr) { CDEBUG(D_OTHER, "%s: start index %d not in the specified list of OSTs\n", - lod2obd(m)->obd_name, lo->ldo_def_stripe_offset); + lod2obd(m)->obd_name, lo->ldo_stripe_offset); RETURN(-EINVAL); } @@ -1161,7 +1161,7 @@ static int lod_alloc_ost_list(const struct lu_env *env, /** * Allocate a striping on a predefined set of OSTs. * - * Allocates new layout starting from OST index in lo->ldo_def_stripe_offset. + * Allocates new layout starting from OST index in lo->ldo_stripe_offset. * Full OSTs are not considered. The exact order of OSTs is not important and * varies depending on OST status. The allocation procedure prefers the targets * with precreated objects ready. 
The number of stripes needed and stripe @@ -1217,15 +1217,14 @@ repeat_find: /* search loi_ost_idx in ost array */ array_idx = 0; for (i = 0; i < ost_count; i++) { - if (osts->op_array[i] == lo->ldo_def_stripe_offset) { + if (osts->op_array[i] == lo->ldo_stripe_offset) { array_idx = i; break; } } if (i == ost_count) { CERROR("Start index %d not found in pool '%s'\n", - lo->ldo_def_stripe_offset, - lo->ldo_pool ? lo->ldo_pool : ""); + lo->ldo_stripe_offset, lo->ldo_pool ?: ""); GOTO(out, rc = -EINVAL); } @@ -1786,7 +1785,7 @@ static int lod_qos_parse_config(const struct lu_env *env, if (v1->lmm_stripe_count > 0) lo->ldo_stripenr = v1->lmm_stripe_count; - lo->ldo_def_stripe_offset = v1->lmm_stripe_offset; + lo->ldo_stripe_offset = v1->lmm_stripe_offset; lod_object_set_pool(lo, NULL); if (pool_name != NULL) { @@ -1797,14 +1796,14 @@ static int lod_qos_parse_config(const struct lu_env *env, /* coverity[overrun-buffer-val] */ pool = lod_find_pool(d, pool_name); if (pool != NULL) { - if (lo->ldo_def_stripe_offset != LOV_OFFSET_DEFAULT) { + if (lo->ldo_stripe_offset != LOV_OFFSET_DEFAULT) { rc = lod_check_index_in_pool( - lo->ldo_def_stripe_offset, pool); + lo->ldo_stripe_offset, pool); if (rc < 0) { lod_pool_putref(pool); CERROR("%s: invalid offset, %u\n", lod2obd(d)->obd_name, - lo->ldo_def_stripe_offset); + lo->ldo_stripe_offset); RETURN(-EINVAL); } } @@ -1911,7 +1910,7 @@ int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo, if (lum != NULL && lum->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { rc = lod_alloc_ost_list(env, lo, stripe, lum, th); - } else if (lo->ldo_def_stripe_offset == LOV_OFFSET_DEFAULT) { + } else if (lo->ldo_stripe_offset == LOV_OFFSET_DEFAULT) { rc = lod_alloc_qos(env, lo, stripe, flag, th); if (rc == -EAGAIN) rc = lod_alloc_rr(env, lo, stripe, flag, th); diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 0e9e941..c722509 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -4211,6 +4211,7 @@ static int 
mdd_migrate(const struct lu_env *env, struct md_object *pobj, struct mdd_object *mdd_tobj = md2mdd_obj(tobj); struct lu_attr *so_attr = MDD_ENV_VAR(env, cattr); struct lu_attr *pattr = MDD_ENV_VAR(env, pattr); + bool created = false; int rc; ENTRY; @@ -4269,6 +4270,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *pobj, lname, so_attr); if (rc != 0) GOTO(put, rc); + created = true; } LASSERT(mdd_object_exists(mdd_tobj)); @@ -4295,6 +4297,10 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *pobj, ma); if (rc != 0) GOTO(put, rc); + + /* newly created target was not locked, don't cache its attributes */ + if (created) + mdd_invalidate(env, tobj); put: RETURN(rc); } diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 4f99f02..60094ca 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -302,6 +302,7 @@ int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd, const struct lu_name *tname, const struct lu_name *sname, struct thandle *handle); +int mdd_invalidate(const struct lu_env *env, struct md_object *obj); int mdd_declare_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct mdd_object *c, @@ -571,6 +572,12 @@ mdo_xattr_list(const struct lu_env *env, struct mdd_object *obj, return dt_xattr_list(env, next, buf); } +static inline int +mdo_invalidate(const struct lu_env *env, struct mdd_object *obj) +{ + return dt_invalidate(env, mdd_object_child(obj)); +} + static inline int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj, const struct lu_fid *fid, __u32 type, diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 4ad7174..692d445 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -339,6 +339,11 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj, RETURN(rc); } +int mdd_invalidate(const struct lu_env *env, struct md_object *obj) +{ + return mdo_invalidate(env, 
md2mdd_obj(obj)); +} + int mdd_declare_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct mdd_object *c, @@ -2066,6 +2071,7 @@ const struct md_object_operations mdd_obj_ops = { .moo_xattr_get = mdd_xattr_get, .moo_xattr_set = mdd_xattr_set, .moo_xattr_list = mdd_xattr_list, + .moo_invalidate = mdd_invalidate, .moo_xattr_del = mdd_xattr_del, .moo_swap_layouts = mdd_swap_layouts, .moo_open = mdd_open, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 3a9419b..acc008f 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -448,15 +448,15 @@ static int mdt_statfs(struct tgt_session_info *tsi) rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs); if (rc) GOTO(out, rc); - spin_lock(&info->mti_mdt->mdt_osfs_lock); + spin_lock(&info->mti_mdt->mdt_lock); info->mti_mdt->mdt_osfs = *osfs; info->mti_mdt->mdt_osfs_age = cfs_time_current_64(); - spin_unlock(&info->mti_mdt->mdt_osfs_lock); + spin_unlock(&info->mti_mdt->mdt_lock); } else { /** use cached statfs data */ - spin_lock(&info->mti_mdt->mdt_osfs_lock); + spin_lock(&info->mti_mdt->mdt_lock); *osfs = info->mti_mdt->mdt_osfs; - spin_unlock(&info->mti_mdt->mdt_osfs_lock); + spin_unlock(&info->mti_mdt->mdt_lock); } if (rc == 0) @@ -1237,17 +1237,13 @@ out: */ static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2) { - __u64 o1_flags; + unsigned int o1_lov_created = o1->mot_lov_created; mutex_lock(&o1->mot_lov_mutex); mutex_lock(&o2->mot_lov_mutex); - o1_flags = o1->mot_flags; - o1->mot_flags = (o1->mot_flags & ~MOF_LOV_CREATED) | - (o2->mot_flags & MOF_LOV_CREATED); - - o2->mot_flags = (o2->mot_flags & ~MOF_LOV_CREATED) | - (o1_flags & MOF_LOV_CREATED); + o1->mot_lov_created = o2->mot_lov_created; + o2->mot_lov_created = o1_lov_created; mutex_unlock(&o2->mot_lov_mutex); mutex_unlock(&o1->mot_lov_mutex); @@ -2375,12 +2371,13 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, int mdt_remote_blocking_ast(struct 
ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { - struct lustre_handle lockh; - int rc; + int rc = 0; ENTRY; switch (flag) { - case LDLM_CB_BLOCKING: + case LDLM_CB_BLOCKING: { + struct lustre_handle lockh; + ldlm_lock2handle(lock, &lockh); rc = ldlm_cli_cancel(&lockh, ldlm_is_atomic_cb(lock) ? 0 : LCF_ASYNC); @@ -2389,17 +2386,46 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(rc); } break; - case LDLM_CB_CANCELING: - LDLM_DEBUG(lock, "Revoke remote lock"); + } + case LDLM_CB_CANCELING: { + LDLM_DEBUG(lock, "Revoke remote lock\n"); + /* discard slc lock here so that it can be cleaned anytime, * especially for cleanup_resource() */ tgt_discard_slc_lock(lock); + + /* once we cache lock, l_ast_data is set to mdt_object */ + if (lock->l_ast_data != NULL) { + struct mdt_object *mo = lock->l_ast_data; + struct lu_env env; + + rc = lu_env_init(&env, LCT_MD_THREAD); + if (unlikely(rc != 0)) { + struct obd_device *obd; + + obd = ldlm_lock_to_ns(lock)->ns_obd; + CWARN("%s: lu_env initialization failed, object" + "%p "DFID" is leaked!\n", + obd->obd_name, mo, + PFID(mdt_object_fid(mo))); + RETURN(rc); + } + + if (lock->l_policy_data.l_inodebits.bits & + (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)) { + rc = mo_invalidate(&env, mdt_object_child(mo)); + mo->mot_cache_attr = 0; + } + mdt_object_put(&env, mo); + lu_env_fini(&env); + } break; + } default: LBUG(); } - RETURN(0); + RETURN(rc); } int mdt_check_resent_lock(struct mdt_thread_info *info, @@ -2439,7 +2465,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o, const struct lu_fid *fid, struct lustre_handle *lh, - enum ldlm_mode mode, __u64 ibits, bool nonblock) + enum ldlm_mode mode, __u64 ibits, bool nonblock, + bool cache) { struct ldlm_enqueue_info *einfo = &mti->mti_einfo; union ldlm_policy_data *policy = &mti->mti_policy; @@ -2460,12 +2487,24 @@ int 
mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o, einfo->ei_res_id = res_id; if (nonblock) einfo->ei_nonblock = 1; + if (cache) { + /* + * if we cache lock, couple lock with mdt_object, so that object + * can be easily found in lock ASTs. + */ + mdt_object_get(mti->mti_env, o); + einfo->ei_cbdata = o; + } memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, policy); + if (rc < 0 && cache) { + mdt_object_put(mti->mti_env, o); + einfo->ei_cbdata = NULL; + } RETURN(rc); } @@ -2614,7 +2653,8 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, rc = mdt_remote_object_lock(info, o, mdt_object_fid(o), &lh->mlh_rreg_lh, lh->mlh_rreg_mode, - MDS_INODELOCK_UPDATE, nonblock); + MDS_INODELOCK_UPDATE, nonblock, + false); if (rc != ELDLM_OK) { if (local_lh != NULL) mdt_object_unlock(info, o, local_lh, rc); @@ -2742,17 +2782,24 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, * \param decref force immediate lock releasing */ static void mdt_save_remote_lock(struct mdt_thread_info *info, - struct lustre_handle *h, enum ldlm_mode mode, - int decref) + struct mdt_object *o, struct lustre_handle *h, + enum ldlm_mode mode, int decref) { ENTRY; if (lustre_handle_is_used(h)) { + struct ldlm_lock *lock = ldlm_handle2lock(h); + + if (o != NULL && + (lock->l_policy_data.l_inodebits.bits & + (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE))) + mo_invalidate(info->mti_env, mdt_object_child(o)); + if (decref || !info->mti_has_trans || !(mode & (LCK_PW | LCK_EX))) { ldlm_lock_decref_and_cancel(h, mode); + LDLM_LOCK_PUT(lock); } else { - struct ldlm_lock *lock = ldlm_handle2lock(h); struct ptlrpc_request *req = mdt_info_req(info); LASSERT(req != NULL); @@ -2784,7 +2831,8 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); mdt_save_lock(info, 
&lh->mlh_reg_lh, lh->mlh_reg_mode, decref); - mdt_save_remote_lock(info, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, decref); + mdt_save_remote_lock(info, o, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, + decref); EXIT; } @@ -4446,6 +4494,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) struct lfsck_stop stop; ENTRY; + if (m->mdt_md_root != NULL) { + mdt_object_put(env, m->mdt_md_root); + m->mdt_md_root = NULL; + } + stop.ls_status = LS_PAUSED; stop.ls_flags = 0; next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop); @@ -4566,7 +4619,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_squash.rsi_gid = 0; INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); init_rwsem(&m->mdt_squash.rsi_sem); - spin_lock_init(&m->mdt_osfs_lock); + spin_lock_init(&m->mdt_lock); m->mdt_osfs_age = cfs_time_shift_64(-1000); m->mdt_enable_remote_dir = 0; m->mdt_enable_remote_dir_gid = 0; @@ -4902,8 +4955,10 @@ static int mdt_object_print(const struct lu_env *env, void *cookie, struct mdt_object *mdto = mdt_obj((struct lu_object *)o); return (*p)(env, cookie, - LUSTRE_MDT_NAME"-object@%p(flags=%d, writecount=%d)", - mdto, mdto->mot_flags, mdto->mot_write_count); + LUSTRE_MDT_NAME"-object@%p(%s %s, writecount=%d)", + mdto, mdto->mot_lov_created ? "lov_created" : "", + mdto->mot_cache_attr ? 
"cache_attr" : "", + mdto->mot_write_count); } static int mdt_prepare(const struct lu_env *env, diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 9015cc6..68dcde9 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -199,10 +199,13 @@ struct mdt_device { mdt_skip_lfsck:1; gid_t mdt_enable_remote_dir_gid; + + /* lock for osfs and md_root */ + spinlock_t mdt_lock; + /* statfs optimization: we cache a bit */ struct obd_statfs mdt_osfs; __u64 mdt_osfs_age; - spinlock_t mdt_osfs_lock; /* root squash */ struct root_squash_info mdt_squash; @@ -222,20 +225,19 @@ struct mdt_device { /* MDT device async commit count, used for debug and sanity test */ atomic_t mdt_async_commit_count; + + struct mdt_object *mdt_md_root; }; #define MDT_SERVICE_WATCHDOG_FACTOR (2) #define MDT_COS_DEFAULT (0) -enum mdt_object_flags { - /** lov object has been created. */ - MOF_LOV_CREATED = 1 << 0, -}; - struct mdt_object { struct lu_object_header mot_header; struct lu_object mot_obj; - enum mdt_object_flags mot_flags; + unsigned int mot_lov_created:1, /* lov object created */ + mot_cache_attr:1; /* enable remote object + * attribute cache */ int mot_write_count; spinlock_t mot_write_lock; /* Lock to protect create_data */ @@ -626,7 +628,8 @@ void mdt_client_compatibility(struct mdt_thread_info *info); int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o, const struct lu_fid *fid, struct lustre_handle *lh, - enum ldlm_mode mode, __u64 ibits, bool nonblock); + enum ldlm_mode mode, __u64 ibits, bool nonblock, + bool cache); enum mdt_name_flags { MNF_FIX_ANON = 1, diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index b305b57..7872e9c 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -123,7 +123,7 @@ static int mdt_create_data(struct mdt_thread_info *info, ma->ma_need = MA_INODE | MA_LOV; ma->ma_valid = 0; mutex_lock(&o->mot_lov_mutex); - if (!(o->mot_flags & MOF_LOV_CREATED)) { + if (!o->mot_lov_created) { 
rc = mdo_create_data(info->mti_env, p ? mdt_object_child(p) : NULL, mdt_object_child(o), spec, ma); @@ -131,7 +131,7 @@ static int mdt_create_data(struct mdt_thread_info *info, rc = mdt_attr_get_complex(info, o, ma); if (rc == 0 && ma->ma_valid & MA_LOV) - o->mot_flags |= MOF_LOV_CREATED; + o->mot_lov_created = 1; } mutex_unlock(&o->mot_lov_mutex); @@ -1172,6 +1172,48 @@ out: RETURN(rc); } +/* + * find root object and take its xattr lock if it's on remote MDT, later create + * may use fs default striping (which is stored in root xattr). + */ +static int mdt_lock_root_xattr(struct mdt_thread_info *info, + struct mdt_device *mdt) +{ + struct mdt_object *md_root = mdt->mdt_md_root; + struct lustre_handle lhroot; + int rc; + + if (md_root == NULL) { + lu_root_fid(&info->mti_tmp_fid1); + md_root = mdt_object_find(info->mti_env, mdt, + &info->mti_tmp_fid1); + if (IS_ERR(md_root)) + return PTR_ERR(md_root); + + spin_lock(&mdt->mdt_lock); + if (mdt->mdt_md_root != NULL) + mdt_object_put(info->mti_env, mdt->mdt_md_root); + mdt->mdt_md_root = md_root; + spin_unlock(&mdt->mdt_lock); + } + + if (md_root->mot_cache_attr || !mdt_object_remote(md_root)) + return 0; + + rc = mdt_remote_object_lock(info, md_root, mdt_object_fid(md_root), + &lhroot, LCK_PR, MDS_INODELOCK_XATTR, + false, true); + if (rc < 0) + return rc; + + md_root->mot_cache_attr = 1; + + /* don't cancel this lock, so that we know the cached xattr is valid. 
*/ + ldlm_lock_decref(&lhroot, LCK_PR); + + return 0; +} + int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) { struct mdt_device *mdt = info->mti_mdt; @@ -1263,17 +1305,21 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) GOTO(out, result); } - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) - GOTO(out, result = err_serious(-ENOMEM)); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) + GOTO(out, result = err_serious(-ENOMEM)); - mdt_set_disposition(info, ldlm_rep, - (DISP_IT_EXECD | DISP_LOOKUP_EXECD)); + mdt_set_disposition(info, ldlm_rep, + (DISP_IT_EXECD | DISP_LOOKUP_EXECD)); if (!lu_name_is_valid(&rr->rr_name)) GOTO(out, result = -EPROTO); + result = mdt_lock_root_xattr(info, mdt); + if (result < 0) + GOTO(out, result); + again: - lh = &info->mti_lh[MDT_LH_PARENT]; + lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR, &rr->rr_name); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index cfe0dd1..461675c 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -323,7 +323,7 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti, /* borrow s0_lh temporarily to do mdt unlock */ mdt_lock_reg_init(s0_lh, einfo->ei_mode); s0_lh->mlh_rreg_lh = slave_locks->handles[i]; - mdt_object_unlock(mti, obj, s0_lh, decref); + mdt_object_unlock(mti, NULL, s0_lh, decref); slave_locks->handles[i].cookie = 0ull; } } @@ -386,6 +386,8 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, int rc; ENTRY; + memset(einfo, 0, sizeof(*einfo)); + rc = mdt_init_slaves(mti, obj, s0_fid); if (rc <= 0) RETURN(rc); @@ -409,7 +411,6 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, } } - memset(einfo, 0, sizeof(*einfo)); einfo->ei_type = LDLM_IBITS; einfo->ei_mode = mode; einfo->ei_cb_bl = mdt_remote_blocking_ast; @@ -752,21 +753,10 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, 
GOTO(out_put, rc = -EPROTO); rc = mdt_attr_set(info, mo, ma); - if (rc) - GOTO(out_put, rc); - } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) { - struct lu_buf *buf = &info->mti_buf; - - if (ma->ma_attr.la_valid != 0) - GOTO(out_put, rc = -EPROTO); - - buf->lb_buf = ma->ma_lmm; - buf->lb_len = ma->ma_lmm_size; - rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), - buf, XATTR_NAME_LOV, 0); if (rc) GOTO(out_put, rc); - } else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) { + } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) && + (ma->ma_valid & MA_INODE)) { struct lu_buf *buf = &info->mti_buf; struct mdt_lock_handle *lh; @@ -780,15 +770,21 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_reg_init(lh, LCK_PW); - rc = mdt_object_lock(info, mo, lh, - MDS_INODELOCK_XATTR); + rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR); if (rc != 0) GOTO(out_put, rc); - buf->lb_buf = ma->ma_lmv; - buf->lb_len = ma->ma_lmv_size; - rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), - buf, XATTR_NAME_DEFAULT_LMV, 0); + if (ma->ma_valid & MA_LOV) { + buf->lb_buf = ma->ma_lmm; + buf->lb_len = ma->ma_lmm_size; + } else { + buf->lb_buf = ma->ma_lmv; + buf->lb_len = ma->ma_lmv_size; + } + rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf, + (ma->ma_valid & MA_LOV) ? 
+ XATTR_NAME_LOV : XATTR_NAME_DEFAULT_LMV, + 0); mdt_object_unlock(info, mo, lh, rc); if (rc) @@ -1021,7 +1017,7 @@ relock: rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc), &child_lh->mlh_rreg_lh, child_lh->mlh_rreg_mode, - MDS_INODELOCK_LOOKUP, false); + MDS_INODELOCK_LOOKUP, false, false); if (rc != ELDLM_OK) GOTO(put_child, rc); @@ -1275,7 +1271,7 @@ static int mdt_rename_lock(struct mdt_thread_info *info, rc = mdt_remote_object_lock(info, obj, &LUSTRE_BFL_FID, lh, LCK_EX, - MDS_INODELOCK_UPDATE, false); + MDS_INODELOCK_UPDATE, false, false); mdt_object_put(info->mti_env, obj); } else { struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; @@ -1654,7 +1650,7 @@ out_lease: rc = mdt_remote_object_lock(info, msrcdir, mdt_object_fid(mold), &lh_childp->mlh_rreg_lh, lh_childp->mlh_rreg_mode, - MDS_INODELOCK_LOOKUP, false); + MDS_INODELOCK_LOOKUP, false, false); if (rc != ELDLM_OK) GOTO(out_unlock_list, rc); @@ -1720,7 +1716,7 @@ out_lease: mdt_object_fid(mnew), &lh_tgtp->mlh_rreg_lh, lh_tgtp->mlh_rreg_mode, - MDS_INODELOCK_UPDATE, false); + MDS_INODELOCK_UPDATE, false, false); if (rc != 0) { lh_tgtp = NULL; GOTO(out_put_new, rc); @@ -2032,7 +2028,7 @@ relock: &lh_oldp->mlh_rreg_lh, lh_oldp->mlh_rreg_mode, MDS_INODELOCK_LOOKUP, - false); + false, false); if (rc != ELDLM_OK) GOTO(out_put_new, rc); @@ -2081,7 +2077,7 @@ relock: &lh_oldp->mlh_rreg_lh, lh_oldp->mlh_rreg_mode, MDS_INODELOCK_LOOKUP, - false); + false, false); if (rc != ELDLM_OK) GOTO(out_put_old, rc); diff --git a/lustre/obdclass/linkea.c b/lustre/obdclass/linkea.c index 910efb9..c119fcd 100644 --- a/lustre/obdclass/linkea.c +++ b/lustre/obdclass/linkea.c @@ -132,8 +132,8 @@ int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname, ldata->ld_reclen = linkea_entry_pack(ldata->ld_lee, lname, pfid); ldata->ld_leh->leh_len += ldata->ld_reclen; ldata->ld_leh->leh_reccount++; - CDEBUG(D_INODE, "New link_ea name '%.*s' is added\n", - lname->ln_namelen, lname->ln_name); + 
CDEBUG(D_INODE, "New link_ea name '"DFID":%.*s' is added\n", + PFID(pfid), lname->ln_namelen, lname->ln_name); return 0; } EXPORT_SYMBOL(linkea_add_buf); diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index bf1a02d..984f052 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -4003,6 +4003,11 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +static int osd_invalidate(const struct lu_env *env, struct dt_object *dt) +{ + return 0; +} + /* * Index operations. */ @@ -4170,6 +4175,7 @@ static const struct dt_object_operations osd_obj_ops = { .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, .do_object_sync = osd_object_sync, + .do_invalidate = osd_invalidate, }; /** diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index 7034d1d..a1d3ab7 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -1684,6 +1684,11 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt, RETURN(0); } +static int osd_invalidate(const struct lu_env *env, struct dt_object *dt) +{ + return 0; +} + static struct dt_object_operations osd_obj_ops = { .do_read_lock = osd_object_read_lock, .do_write_lock = osd_object_write_lock, @@ -1710,6 +1715,7 @@ static struct dt_object_operations osd_obj_ops = { .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, .do_object_sync = osd_object_sync, + .do_invalidate = osd_invalidate, }; static struct lu_object_operations osd_lu_obj_ops = { diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 9dab59c..087d1c9 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -455,10 +455,11 @@ static void osp_last_used_fini(const struct lu_env *env, struct osp_device *osp) */ static int osp_disconnect(struct osp_device *d) { + struct obd_device *obd = d->opd_obd; struct obd_import *imp; int rc = 0; - imp = d->opd_obd->u.cli.cl_import; + imp = 
obd->u.cli.cl_import; /* Mark import deactivated now, so we don't try to reconnect if any * of the cleanup RPCs fails (e.g. ldlm cancel, etc). We don't @@ -477,8 +478,7 @@ static int osp_disconnect(struct osp_device *d) rc = ptlrpc_disconnect_import(imp, 0); if (rc != 0) - CERROR("%s: can't disconnect: rc = %d\n", - d->opd_obd->obd_name, rc); + CERROR("%s: can't disconnect: rc = %d\n", obd->obd_name, rc); ptlrpc_invalidate_import(imp); @@ -636,6 +636,8 @@ static int osp_process_config(const struct lu_env *env, case LCFG_PRE_CLEANUP: rc = osp_disconnect(d); osp_update_fini(env, d); + if (obd->obd_namespace != NULL) + ldlm_namespace_free_prior(obd->obd_namespace, NULL, 1); break; case LCFG_CLEANUP: lu_dev_del_linkage(dev->ld_site, dev); diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 64c727d..66f2e13 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -280,11 +280,6 @@ struct osp_xattr_entry { char oxe_buf[0]; }; -struct osp_object_attr { - struct lu_attr ooa_attr; - struct list_head ooa_xattr_list; -}; - /* this is a top object */ struct osp_object { struct lu_object_header opo_header; @@ -296,7 +291,8 @@ struct osp_object { /* read/write lock for md osp object */ struct rw_semaphore opo_sem; const struct lu_env *opo_owner; - struct osp_object_attr *opo_ooa; + struct lu_attr opo_attr; + struct list_head opo_xattr_list; /* Protect opo_ooa. 
*/ spinlock_t opo_lock; }; @@ -733,6 +729,7 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th); int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th); +int osp_invalidate(const struct lu_env *env, struct dt_object *dt); int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th); @@ -748,7 +745,6 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di); int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, void *key_rec); int osp_it_next_page(const struct lu_env *env, struct dt_it *di); -int osp_oac_init(struct osp_object *obj); /* osp_md_object.c */ int osp_md_declare_object_create(const struct lu_env *env, struct dt_object *dt, diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 21ccacf..461df06 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -88,16 +88,11 @@ static int osp_object_create_interpreter(const struct lu_env *env, obj->opo_non_exist = 1; } - /* Invalid the opo cache for the object after the object - * is being created, so attr_get will try to get attr - * from the remote object. XXX this can be improved when - * we have object lock/cache invalidate mechanism in OSP - * layer */ - if (obj->opo_ooa != NULL) { - spin_lock(&obj->opo_lock); - obj->opo_ooa->ooa_attr.la_valid = 0; - spin_unlock(&obj->opo_lock); - } + /* + * invalidate opo cache for the object after the object is created, so + * attr_get will try to get attr from remote object. 
+ */ + osp_invalidate(env, &obj->opo_obj); return 0; } @@ -125,15 +120,6 @@ int osp_md_declare_object_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - struct osp_object *obj = dt2osp_obj(dt); - int rc; - - if (obj->opo_ooa == NULL) { - rc = osp_oac_init(obj); - if (rc != 0) - return rc; - } - return osp_trans_update_request_create(th); } @@ -196,8 +182,7 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); dt2osp_obj(dt)->opo_non_exist = 0; - LASSERT(obj->opo_ooa != NULL); - obj->opo_ooa->ooa_attr = *attr; + obj->opo_attr = *attr; out: return rc; } @@ -1033,6 +1018,7 @@ struct dt_object_operations osp_md_obj_ops = { .do_index_try = osp_md_index_try, .do_object_lock = osp_md_object_lock, .do_object_unlock = osp_md_object_unlock, + .do_invalidate = osp_invalidate, }; /** @@ -1115,10 +1101,8 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, /* XXX: how about the write error happened later? */ *pos += buf->lb_len; - if (obj->opo_ooa != NULL && - obj->opo_ooa->ooa_attr.la_valid & LA_SIZE && - obj->opo_ooa->ooa_attr.la_size < *pos) - obj->opo_ooa->ooa_attr.la_size = *pos; + if (obj->opo_attr.la_valid & LA_SIZE && obj->opo_attr.la_size < *pos) + obj->opo_attr.la_size = *pos; RETURN(buf->lb_len); } diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 96e3135..1f46f77 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -59,23 +59,19 @@ * char oxe_buf[0]; * }; * - * struct osp_object_attr { - * struct lu_attr ooa_attr; - * struct list_head ooa_xattr_list; - * }; - * * struct osp_object { * ... - * struct osp_object_attr *opo_ooa; + * struct lu_attr opo_attr; + * struct list_head opo_xattr_list; * spinlock_t opo_lock; * ... * }; * * The basic attributes, such as owner/mode/flags, are stored in the - * osp_object_attr::ooa_attr. 
The extended attributes will be stored + * osp_object::opo_attr. The extended attributes will be stored * as osp_xattr_entry. Every extended attribute has an independent * osp_xattr_entry, and all the osp_xattr_entry are linked into the - * osp_object_attr::ooa_xattr_list. The OSP object attributes cache + * osp_object::opo_xattr_list. The OSP object attributes cache * is protected by the osp_object::opo_lock. * * Not all OSP objects have an attributes cache because maintaining @@ -164,41 +160,12 @@ static void osp_object_assign_fid(const struct lu_env *env, } /** - * Initialize the OSP object attributes cache. - * - * \param[in] obj pointer to the OSP object - * - * \retval 0 for success - * \retval negative error number on failure - */ -int osp_oac_init(struct osp_object *obj) -{ - struct osp_object_attr *ooa; - - OBD_ALLOC_PTR(ooa); - if (ooa == NULL) - return -ENOMEM; - - INIT_LIST_HEAD(&ooa->ooa_xattr_list); - spin_lock(&obj->opo_lock); - if (likely(obj->opo_ooa == NULL)) { - obj->opo_ooa = ooa; - spin_unlock(&obj->opo_lock); - } else { - spin_unlock(&obj->opo_lock); - OBD_FREE_PTR(ooa); - } - - return 0; -} - -/** * Find the named extended attribute in the OSP object attributes cache. * * The caller should take the osp_object::opo_lock before calling * this function. 
* - * \param[in] ooa pointer to the OSP object attributes cache + * \param[in] obj pointer to the OSP object * \param[in] name the name of the extended attribute * \param[in] namelen the name length of the extended attribute * @@ -207,12 +174,12 @@ int osp_oac_init(struct osp_object *obj) * in the cache */ static struct osp_xattr_entry * -osp_oac_xattr_find_locked(struct osp_object_attr *ooa, - const char *name, size_t namelen) +osp_oac_xattr_find_locked(struct osp_object *obj, const char *name, + size_t namelen) { struct osp_xattr_entry *oxe; - list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) { + list_for_each_entry(oxe, &obj->opo_xattr_list, oxe_list) { if (namelen == oxe->oxe_namelen && strncmp(name, oxe->oxe_buf, namelen) == 0) return oxe; @@ -241,15 +208,12 @@ static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj, struct osp_xattr_entry *oxe = NULL; spin_lock(&obj->opo_lock); - if (obj->opo_ooa != NULL) { - oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name, - strlen(name)); - if (oxe != NULL) { - if (unlink) - list_del_init(&oxe->oxe_list); - else - atomic_inc(&oxe->oxe_ref); - } + oxe = osp_oac_xattr_find_locked(obj, name, strlen(name)); + if (oxe != NULL) { + if (unlink) + list_del_init(&oxe->oxe_list); + else + atomic_inc(&oxe->oxe_ref); } spin_unlock(&obj->opo_lock); @@ -274,14 +238,11 @@ static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj, static struct osp_xattr_entry * osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len) { - struct osp_object_attr *ooa = obj->opo_ooa; struct osp_xattr_entry *oxe; struct osp_xattr_entry *tmp = NULL; size_t namelen = strlen(name); size_t size = sizeof(*oxe) + namelen + 1 + len; - LASSERT(ooa != NULL); - oxe = osp_oac_xattr_find(obj, name, false); if (oxe != NULL) return oxe; @@ -299,9 +260,9 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len) atomic_set(&oxe->oxe_ref, 2); spin_lock(&obj->opo_lock); - tmp = 
osp_oac_xattr_find_locked(ooa, name, namelen); + tmp = osp_oac_xattr_find_locked(obj, name, namelen); if (tmp == NULL) - list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list); + list_add_tail(&oxe->oxe_list, &obj->opo_xattr_list); else atomic_inc(&tmp->oxe_ref); spin_unlock(&obj->opo_lock); @@ -335,13 +296,10 @@ static struct osp_xattr_entry * osp_oac_xattr_replace(struct osp_object *obj, struct osp_xattr_entry **poxe, size_t len) { - struct osp_object_attr *ooa = obj->opo_ooa; struct osp_xattr_entry *oxe; size_t namelen = (*poxe)->oxe_namelen; size_t size = sizeof(*oxe) + namelen + 1 + len; - LASSERT(ooa != NULL); - OBD_ALLOC(oxe, size); if (unlikely(oxe == NULL)) return NULL; @@ -355,11 +313,11 @@ osp_oac_xattr_replace(struct osp_object *obj, atomic_set(&oxe->oxe_ref, 2); spin_lock(&obj->opo_lock); - *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen); + *poxe = osp_oac_xattr_find_locked(obj, oxe->oxe_buf, namelen); LASSERT(*poxe != NULL); list_del_init(&(*poxe)->oxe_list); - list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list); + list_add_tail(&oxe->oxe_list, &obj->opo_xattr_list); spin_unlock(&obj->opo_lock); return oxe; @@ -423,15 +381,9 @@ static int osp_get_attr_from_reply(const struct lu_env *env, lustre_get_wire_obdo(NULL, lobdo, wobdo); spin_lock(&obj->opo_lock); - if (obj->opo_ooa != NULL) { - la_from_obdo(&obj->opo_ooa->ooa_attr, lobdo, lobdo->o_valid); - if (attr != NULL) - *attr = obj->opo_ooa->ooa_attr; - } else { - LASSERT(attr != NULL); - - la_from_obdo(attr, lobdo, lobdo->o_valid); - } + la_from_obdo(&obj->opo_attr, lobdo, lobdo->o_valid); + if (attr != NULL) + *attr = obj->opo_attr; spin_unlock(&obj->opo_lock); return 0; @@ -462,8 +414,6 @@ static int osp_attr_get_interpterer(const struct lu_env *env, { struct lu_attr *attr = data; - LASSERT(obj->opo_ooa != NULL); - if (rc == 0) { osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS; obj->opo_non_exist = 0; @@ -506,16 +456,9 @@ static int osp_declare_attr_get(const struct lu_env *env, 
struct dt_object *dt) struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); int rc = 0; - if (obj->opo_ooa == NULL) { - rc = osp_oac_init(obj); - if (rc != 0) - return rc; - } - mutex_lock(&osp->opd_async_requests_mutex); rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL, - &obj->opo_ooa->ooa_attr, - sizeof(struct obdo), + &obj->opo_attr, sizeof(struct obdo), osp_attr_get_interpterer); mutex_unlock(&osp->opd_async_requests_mutex); @@ -554,16 +497,14 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist) RETURN(-ENOENT); - if (obj->opo_ooa != NULL) { - spin_lock(&obj->opo_lock); - if (obj->opo_ooa->ooa_attr.la_valid != 0 && !obj->opo_stale) { - *attr = obj->opo_ooa->ooa_attr; - spin_unlock(&obj->opo_lock); - - RETURN(0); - } + spin_lock(&obj->opo_lock); + if (obj->opo_attr.la_valid != 0 && !obj->opo_stale) { + *attr = obj->opo_attr; spin_unlock(&obj->opo_lock); + + RETURN(0); } + spin_unlock(&obj->opo_lock); update = osp_update_request_create(dev); if (IS_ERR(update)) @@ -606,8 +547,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); spin_lock(&obj->opo_lock); - if (obj->opo_stale) - obj->opo_stale = 0; + obj->opo_stale = 0; spin_unlock(&obj->opo_lock); GOTO(out, rc); @@ -729,11 +669,11 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, CDEBUG(D_INFO, "(1) set attr "DFID": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid), rc); - if (rc != 0 || o->opo_ooa == NULL) + if (rc != 0) RETURN(rc); /* Update the OSP object attributes cache. 
*/ - la = &o->opo_ooa->ooa_attr; + la = &o->opo_attr; spin_lock(&o->opo_lock); if (attr->la_valid & LA_UID) { la->la_uid = attr->la_uid; @@ -773,12 +713,9 @@ static int osp_xattr_get_interpterer(const struct lu_env *env, struct osp_object *obj, void *data, int index, int rc) { - struct osp_object_attr *ooa = obj->opo_ooa; struct osp_xattr_entry *oxe = data; struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; - LASSERT(ooa != NULL); - if (rc == 0) { size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1; @@ -848,12 +785,6 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, if (unlikely(buf->lb_len == 0)) return 0; - if (obj->opo_ooa == NULL) { - rc = osp_oac_init(obj); - if (rc != 0) - return rc; - } - oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len); if (oxe == NULL) return -ENOMEM; @@ -945,34 +876,31 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, if (unlikely(obj->opo_non_exist)) RETURN(-ENOENT); - /* Only cache xattr for OST object */ - if (!osp->opd_connect_mdt) { - oxe = osp_oac_xattr_find(obj, name, false); - if (oxe != NULL) { - spin_lock(&obj->opo_lock); - if (oxe->oxe_ready) { - if (!oxe->oxe_exist) - GOTO(unlock, rc = -ENODATA); + oxe = osp_oac_xattr_find(obj, name, false); + if (oxe != NULL) { + spin_lock(&obj->opo_lock); + if (oxe->oxe_ready) { + if (!oxe->oxe_exist) + GOTO(unlock, rc = -ENODATA); - if (buf->lb_buf == NULL) - GOTO(unlock, rc = oxe->oxe_vallen); + if (buf->lb_buf == NULL) + GOTO(unlock, rc = oxe->oxe_vallen); - if (buf->lb_len < oxe->oxe_vallen) - GOTO(unlock, rc = -ERANGE); + if (buf->lb_len < oxe->oxe_vallen) + GOTO(unlock, rc = -ERANGE); - memcpy(buf->lb_buf, oxe->oxe_value, - oxe->oxe_vallen); + memcpy(buf->lb_buf, oxe->oxe_value, + oxe->oxe_vallen); - GOTO(unlock, rc = oxe->oxe_vallen); + GOTO(unlock, rc = oxe->oxe_vallen); unlock: - spin_unlock(&obj->opo_lock); - osp_oac_xattr_put(oxe); - - return rc; - } spin_unlock(&obj->opo_lock); + osp_oac_xattr_put(oxe); + + return rc; } + 
spin_unlock(&obj->opo_lock); } update = osp_update_request_create(dev); if (IS_ERR(update)) @@ -993,9 +921,6 @@ unlock: obj->opo_non_exist = 1; } - if (obj->opo_ooa == NULL) - GOTO(out, rc); - if (oxe == NULL) oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len); @@ -1041,8 +966,6 @@ unlock: GOTO(out, rc = -ERANGE); memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len); - if (obj->opo_ooa == NULL || osp->opd_connect_mdt) - GOTO(out, rc); if (oxe == NULL) { oxe = osp_oac_xattr_find_or_add(obj, name, rbuf->lb_len); @@ -1155,7 +1078,6 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct osp_object *o = dt2osp_obj(dt); - struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); struct osp_update_request *update; struct osp_xattr_entry *oxe; int rc; @@ -1169,7 +1091,7 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, rc = osp_update_rpc_pack(env, xattr_set, update, OUT_XATTR_SET, lu_object_fid(&dt->do_lu), buf, name, fl); - if (rc != 0 || o->opo_ooa == NULL || osp->opd_connect_mdt) + if (rc != 0) RETURN(rc); oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len); @@ -1271,7 +1193,7 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL, fid, name); - if (rc != 0 || o->opo_ooa == NULL) + if (rc != 0) return rc; oxe = osp_oac_xattr_find(o, name, true); @@ -1283,6 +1205,37 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, } /** + * Implement OSP layer dt_object_operations::do_invalidate() interface. + * + * Invalidate attributes cached on the specified MDT/OST object. 
+ * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object + * + * \retval 0 for success + * \retval negative error number on failure + */ +int osp_invalidate(const struct lu_env *env, struct dt_object *dt) +{ + struct osp_object *obj = dt2osp_obj(dt); + struct osp_xattr_entry *oxe; + struct osp_xattr_entry *tmp; + ENTRY; + + spin_lock(&obj->opo_lock); + list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) { + oxe->oxe_ready = 0; + list_del_init(&oxe->oxe_list); + osp_oac_xattr_put(oxe); + } + obj->opo_attr.la_valid = 0; + obj->opo_stale = 1; + spin_unlock(&obj->opo_lock); + + RETURN(0); +} + +/** * Implement OSP layer dt_object_operations::do_declare_create() interface. * * Declare that the caller will create the OST object. @@ -2133,6 +2086,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o, spin_lock_init(&po->opo_lock); o->lo_header->loh_attr |= LOHA_REMOTE; + INIT_LIST_HEAD(&po->opo_xattr_list); if (is_ost_obj(o)) { po->opo_obj.do_ops = &osp_obj_ops; @@ -2141,6 +2095,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o, po->opo_obj.do_ops = &osp_md_obj_ops; po->opo_obj.do_body_ops = &osp_md_body_ops; + if (conf != NULL && conf->loc_flags & LOC_F_NEW) { po->opo_non_exist = 1; } else { @@ -2174,26 +2129,20 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o) { struct osp_object *obj = lu2osp_obj(o); struct lu_object_header *h = o->lo_header; + struct osp_xattr_entry *oxe; + struct osp_xattr_entry *tmp; + int count; dt_object_fini(&obj->opo_obj); lu_object_header_fini(h); - if (obj->opo_ooa != NULL) { - struct osp_xattr_entry *oxe; - struct osp_xattr_entry *tmp; - int count; - - list_for_each_entry_safe(oxe, tmp, - &obj->opo_ooa->ooa_xattr_list, - oxe_list) { - list_del(&oxe->oxe_list); - count = atomic_read(&oxe->oxe_ref); - LASSERTF(count == 1, - "Still has %d users on the xattr entry %.*s\n", - count-1, 
(int)oxe->oxe_namelen, oxe->oxe_buf); - - OBD_FREE(oxe, oxe->oxe_buflen); - } - OBD_FREE_PTR(obj->opo_ooa); + list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) { + list_del(&oxe->oxe_list); + count = atomic_read(&oxe->oxe_ref); + LASSERTF(count == 1, + "Still has %d users on the xattr entry %.*s\n", + count-1, (int)oxe->oxe_namelen, oxe->oxe_buf); + + OBD_FREE(oxe, oxe->oxe_buflen); } OBD_SLAB_FREE_PTR(obj, osp_object_kmem); } diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index 7bb3bea..4d2adc6 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -491,7 +491,6 @@ static void osp_thandle_invalidate_object(const struct lu_env *env, struct object_update_request *our_req = ours->ours_req; unsigned int i; struct lu_object *obj; - struct osp_object *osp_obj; for (i = 0; i < our_req->ourq_count; i++) { struct object_update *update; @@ -512,13 +511,7 @@ static void osp_thandle_invalidate_object(const struct lu_env *env, if (IS_ERR(obj)) break; - osp_obj = lu2osp_obj(obj); - if (osp_obj->opo_ooa != NULL) { - spin_lock(&osp_obj->opo_lock); - osp_obj->opo_ooa->ooa_attr.la_valid = 0; - osp_obj->opo_stale = 1; - spin_unlock(&osp_obj->opo_lock); - } + osp_invalidate(env, lu2dt_obj(obj)); lu_object_put(env, obj); } } diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index dfc8ba7..6618b42 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -13162,7 +13162,7 @@ test_230f() { # a should be migrated to MDT1, since no other links on MDT0 $LFS migrate -m 1 $DIR/$tdir/migrate_dir || - error "migrate dir fails" + error "#1 migrate dir fails" mdt_index=$($LFS getstripe -M $DIR/$tdir/migrate_dir) [ $mdt_index == 1 ] || error "migrate_dir is not on MDT1" mdt_index=$($LFS getstripe -M $DIR/$tdir/migrate_dir/a) @@ -13170,12 +13170,12 @@ test_230f() { - # a should stay on MDT1, because it is a mulitple link file + # a should stay on MDT1, because it is a multiple link file $LFS migrate -m 0 $DIR/$tdir/migrate_dir || - error "migrate dir fails" + error "#2 migrate dir fails"
mdt_index=$($LFS getstripe -M $DIR/$tdir/migrate_dir/a) [ $mdt_index == 1 ] || error "a is not on MDT1" $LFS migrate -m 1 $DIR/$tdir/migrate_dir || - error "migrate dir fails" + error "#3 migrate dir fails" a_fid=$($LFS path2fid $DIR/$tdir/migrate_dir/a) ln_fid=$($LFS path2fid $DIR/$tdir/other_dir/ln1) @@ -13186,7 +13186,7 @@ test_230f() { # a should be migrated to MDT0, since no other links on MDT1 $LFS migrate -m 0 $DIR/$tdir/migrate_dir || - error "migrate dir fails" + error "#4 migrate dir fails" mdt_index=$($LFS getstripe -M $DIR/$tdir/migrate_dir/a) [ $mdt_index == 0 ] || error "a is not on MDT0" @@ -14901,6 +14901,84 @@ test_405() { } run_test 405 "Various layout swap lock tests" +test_406() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + [ $OSTCOUNT -lt 2 ] && skip "needs >= 2 OSTs" && return + [ -n "$FILESET" ] && skip "SKIP due to FILESET set" && return + [ $PARALLEL == "yes" ] && skip "skip parallel run" && return + [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.8.50) ] && + skip "Need MDS version at least 2.8.50" && return + + local def_stripenr=$($GETSTRIPE -c $MOUNT) + local def_stripe_size=$($GETSTRIPE -S $MOUNT) + local def_stripe_offset=$($GETSTRIPE -i $MOUNT) + local def_pool=$($GETSTRIPE -p $MOUNT) + + local test_pool=$TESTNAME + pool_add $test_pool || error "pool_add failed" + pool_add_targets $test_pool 0 $(($OSTCOUNT - 1)) 1 || + error "pool_add_targets failed" + + # parent set default stripe count only, child will stripe from both + # parent and fs default + $SETSTRIPE -c 1 -i 1 -S $((def_stripe_size * 2)) -p $test_pool $MOUNT || + error "setstripe $MOUNT failed" + $LFS mkdir -c $MDSCOUNT $DIR/$tdir || error "mkdir $tdir failed" + $SETSTRIPE -c $OSTCOUNT $DIR/$tdir || error "setstripe $tdir failed" + for i in $(seq 10); do + local f=$DIR/$tdir/$tfile.$i + touch $f || error "touch failed" + local count=$($GETSTRIPE -c $f) + [ $count -eq $OSTCOUNT ] || + error "$f stripe count $count != $OSTCOUNT" + local 
offset=$($GETSTRIPE -i $f) + [ $offset -eq 1 ] || error "$f stripe offset $offset != 1" + local size=$($GETSTRIPE -S $f) + [ $size -eq $((def_stripe_size * 2)) ] || + error "$f stripe size $size != $((def_stripe_size * 2))" + local pool=$($GETSTRIPE -p $f) + [ $pool == $test_pool ] || error "$f pool $pool != $test_pool" + done + + # change fs default striping, delete parent default striping, now child + # will stripe from new fs default striping only + $SETSTRIPE -c 1 -S $def_stripe_size -i 0 $MOUNT || + error "change $MOUNT default stripe failed" + $SETSTRIPE -c 0 $DIR/$tdir || error "delete $tdir default stripe failed" + for i in $(seq 11 20); do + local f=$DIR/$tdir/$tfile.$i + touch $f || error "touch $f failed" + local count=$($GETSTRIPE -c $f) + [ $count -eq 1 ] || error "$f stripe count $count != 1" + local offset=$($GETSTRIPE -i $f) + [ $offset -eq 0 ] || error "$f stripe offset $offset != 0" + local size=$($GETSTRIPE -S $f) + [ $size -eq $def_stripe_size ] || + error "$f stripe size $size != $def_stripe_size" + local pool=$($GETSTRIPE -p $f) + [ "#$pool" == "#" ] || error "$f pool $pool is set" + + done + + unlinkmany $DIR/$tdir/$tfile. 1 20 + + # restore FS default striping + if [ -z $def_pool ]; then + $SETSTRIPE -c $def_stripenr -S $def_stripe_size \ + -i $def_stripe_offset $MOUNT || + error "restore default striping failed" + else + $SETSTRIPE -c $def_stripenr -S $def_stripe_size -p $def_pool \ + -i $def_stripe_offset $MOUNT || + error "restore default striping with $def_pool failed" + fi + + local f=$DIR/$tdir/$tfile + pool_remove_all_targets $test_pool $f + pool_remove $test_pool $f +} +run_test 406 "DNE support fs default striping" + # # tests that do cleanup/setup should be run at the end #