Whamcloud - gitweb
LU-7660 dne: support fs default stripe 41/19041/23
authorLai Siyao <lai.siyao@intel.com>
Mon, 21 Mar 2016 14:05:10 +0000 (22:05 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 11 Jul 2016 23:55:29 +0000 (23:55 +0000)
On DNE system slave MDTs should honor fs default stripe, to achieve
this, slave MDT should lock fs root, and cache XATTR_NAME_LOV, below
functionalities are added for this:
* take and cache remote root XATTR lock on slave MDTs.
* add dt_invalidate operation to invalidate OSP attributes cache,
  normally it's called in mdt_object_unlock(), but if we cache this
  lock, it's called upon cross-MDT lock cancel.
* enable OSP attribute cache by default.
* migrate should invalidate remote target explictely if it's newly
  created, because it doesn't take its lock beforehand.
* OSP should cleanup ldlm namespace in device cleanup because there
  may be cached locks.
* add sanity.sh 405 test for this.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: I90e5f35f3b6a294a2a559c28df267f92ce188b9c
Reviewed-on: http://review.whamcloud.com/19041
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
23 files changed:
lustre/include/dt_object.h
lustre/include/md_object.h
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/lod/lod_qos.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/obdclass/linkea.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_object.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_trans.c
lustre/tests/sanity.sh

index c69674f..ebabaa6 100644 (file)
@@ -1015,6 +1015,20 @@ struct dt_object_operations {
                                struct dt_object *dt,
                                struct ldlm_enqueue_info *einfo,
                                union ldlm_policy_data *policy);
+
+       /**
+        * Invalidate attribute cache.
+        *
+        * This method invalidate attribute cache of the object, which is on OSP
+        * only.
+        *
+        * \param[in] env       execution envionment for this thread
+        * \param[in] dt        object
+        *
+        * \retval 0            on success
+        * \retval negative     negated errno on error
+        */
+       int   (*do_invalidate)(const struct lu_env *env, struct dt_object *dt);
 };
 
 /**
@@ -2612,6 +2626,15 @@ static inline int dt_xattr_list(const struct lu_env *env, struct dt_object *dt,
        return dt->do_ops->do_xattr_list(env, dt, buf);
 }
 
+static inline int dt_invalidate(const struct lu_env *env, struct dt_object *dt)
+{
+       LASSERT(dt);
+       LASSERT(dt->do_ops);
+       LASSERT(dt->do_ops->do_invalidate);
+
+       return dt->do_ops->do_invalidate(env, dt);
+}
+
 static inline int dt_declare_delete(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_key *key,
index 49dd2e6..c34c6e2 100644 (file)
@@ -214,6 +214,8 @@ struct md_object_operations {
                                 struct md_object *obj,
                                 struct ldlm_enqueue_info *einfo,
                                 union ldlm_policy_data *policy);
+
+       int (*moo_invalidate)(const struct lu_env *env, struct md_object *obj);
 };
 
 /**
@@ -402,6 +404,12 @@ static inline int mo_xattr_list(const struct lu_env *env,
         return m->mo_ops->moo_xattr_list(env, m, buf);
 }
 
+static inline int mo_invalidate(const struct lu_env *env, struct md_object *m)
+{
+       LASSERT(m->mo_ops->moo_invalidate);
+       return m->mo_ops->moo_invalidate(env, m);
+}
+
 static inline int mo_swap_layouts(const struct lu_env *env,
                                  struct md_object *o1,
                                  struct md_object *o2, __u64 flags)
index 7d23aae..c3d67e4 100644 (file)
@@ -975,6 +975,11 @@ static int lod_process_config(const struct lu_env *env,
                GOTO(out, rc);
        }
        case LCFG_PRE_CLEANUP: {
+               if (lod->lod_md_root != NULL) {
+                       lu_object_put(env, &lod->lod_md_root->ldo_obj.do_lu);
+                       lod->lod_md_root = NULL;
+               }
+
                lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
                lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
                next = &lod->lod_child->dd_lu_dev;
@@ -1562,7 +1567,7 @@ static int lod_init0(const struct lu_env *env, struct lod_device *lod,
        if (rc)
                GOTO(out_pools, rc);
 
-       spin_lock_init(&lod->lod_desc_lock);
+       spin_lock_init(&lod->lod_lock);
        spin_lock_init(&lod->lod_connects_lock);
        lod_tgt_desc_init(&lod->lod_mdt_descs);
        lod_tgt_desc_init(&lod->lod_ost_descs);
index 42b8ef4..d831ea9 100644 (file)
@@ -180,8 +180,8 @@ struct lod_device {
        /* lov settings descriptor storing static information */
        struct lov_desc       lod_desc;
 
-       /* use to protect ld_active_tgt_count and all ltd_active */
-       spinlock_t           lod_desc_lock;
+       /* protect ld_active_tgt_count, ltd_active and lod_md_root */
+       spinlock_t           lod_lock;
 
        /* Description of OST */
        struct lod_tgt_descs  lod_ost_descs;
@@ -210,6 +210,9 @@ struct lod_device {
        enum lustre_sec_part   lod_sp_me;
 
        struct proc_dir_entry *lod_symlink;
+
+       /* ROOT object, used to fetch FS default striping */
+       struct lod_object      *lod_md_root;
 };
 
 #define lod_osts       lod_ost_descs.ltd_tgts
@@ -226,58 +229,77 @@ struct lod_device {
 #define ltd_mdt                        ltd_tgt
 #define lod_mdt_desc           lod_tgt_desc
 
-struct lod_dir_stripe_info {
-       __u32   ldsi_stripe_offset;
-       __u32   ldsi_def_stripenr;
-       __u32   ldsi_def_stripe_offset;
-       __u32   ldsi_def_hash_type;
-       __u32   ldsi_hash_type;
-
-       unsigned int ldsi_def_striping_set:1,
-                    ldsi_def_striping_cached:1,
-                    ldsi_striped:1;
+struct lod_default_striping {
+       /* default LOV */
+       __u32           lds_def_stripe_size;
+       __u16           lds_def_stripenr;
+       __u16           lds_def_stripe_offset;
+       char            lds_def_pool[LOV_MAXPOOLNAME + 1];
+       /* default LMV */
+       __u32           lds_dir_def_stripenr;
+       __u32           lds_dir_def_stripe_offset;
+       __u32           lds_dir_def_hash_type;
+       /* flags whether default striping is set */
+       __u32           lds_def_striping_set:1,
+                       lds_dir_def_striping_set:1;
 };
 
-/*
- * XXX: shrink this structure, currently it's 72bytes on 32bit arch,
- *      so, slab will be allocating 128bytes
- */
 struct lod_object {
-       struct dt_object   ldo_obj;
-
-       /* if object is striped, then the next fields describe stripes */
-       /* For striped directory, ldo_stripenr == slave stripe count */
-       __u16              ldo_stripenr;
-       __u16              ldo_layout_gen;
-       __u32              ldo_stripe_size;
-       __u32              ldo_pattern;
-       __u16              ldo_released_stripenr;
-       char              *ldo_pool;
-       struct dt_object **ldo_stripe;
-       /* to know how much memory to free, ldo_stripenr can be less */
-       /* default striping for directory represented by this object
-        * is cached in stripenr/stripe_size */
-       unsigned int       ldo_stripes_allocated:16,
-                          ldo_striping_cached:1,
-                          ldo_def_striping_set:1,
-                          ldo_def_striping_cached:1,
-       /* ldo_dir_slave_stripe indicate this is a slave stripe of
-        * a striped dir */
-                          ldo_dir_slave_stripe:1;
-       __u32              ldo_def_stripe_size;
-       __u16              ldo_def_stripenr;
-       __u16              ldo_def_stripe_offset;
-       struct lod_dir_stripe_info      *ldo_dir_stripe;
+       struct dt_object                             ldo_obj;
+       union {
+               /* file stripe */
+               struct {
+                       /*
+                        * don't change field order, because both file and
+                        * directory use ldo_stripenr/ldo_stripes_allocated
+                        * to access stripe number.
+                        */
+                       __u16                        ldo_stripenr;
+                       __u16                        ldo_stripes_allocated;
+                       __u16                        ldo_layout_gen;
+                       __u16                        ldo_released_stripenr;
+                       __u32                        ldo_pattern;
+                       __u32                        ldo_stripe_size;
+                       __u16                        ldo_stripe_offset;
+                       char                        *ldo_pool;
+               };
+               /* directory stripe */
+               struct {
+                       __u16                        ldo_dir_stripenr;
+                       __u16                        ldo_dir_stripes_allocated;
+                       __u32                        ldo_dir_stripe_offset;
+                       __u32                        ldo_dir_hash_type;
+                       __u32                        ldo_dir_slave_stripe:1,
+                                                    ldo_dir_striped:1;
+                       /*
+                        * default striping is not cached, so this field is
+                        * invalid after create, make sure it's used by
+                        * lod_dir_striping_create_internal() only.
+                        */
+                       struct lod_default_striping *ldo_def_striping;
+               };
+       };
+       struct dt_object                           **ldo_stripe;
 };
 
-#define ldo_dir_stripe_offset  ldo_dir_stripe->ldsi_stripe_offset
-#define ldo_dir_def_stripenr   ldo_dir_stripe->ldsi_def_stripenr
-#define ldo_dir_hash_type      ldo_dir_stripe->ldsi_hash_type
-#define ldo_dir_def_hash_type  ldo_dir_stripe->ldsi_def_hash_type
-#define ldo_dir_striped                ldo_dir_stripe->ldsi_striped
-#define ldo_dir_def_striping_set       ldo_dir_stripe->ldsi_def_striping_set
-#define ldo_dir_def_striping_cached    ldo_dir_stripe->ldsi_def_striping_cached
-#define ldo_dir_def_stripe_offset      ldo_dir_stripe->ldsi_def_stripe_offset
+static inline int lod_object_set_pool(struct lod_object *lo, const char *pool)
+{
+       int len;
+
+       if (lo->ldo_pool != NULL) {
+               len = strlen(lo->ldo_pool) + 1;
+               OBD_FREE(lo->ldo_pool, len);
+               lo->ldo_pool = NULL;
+       }
+       if (pool != NULL) {
+               len = strlen(pool) + 1;
+               OBD_ALLOC(lo->ldo_pool, len);
+               if (lo->ldo_pool == NULL)
+                       return -ENOMEM;
+               strlcpy(lo->ldo_pool, pool, len);
+       }
+       return 0;
+}
 
 struct lod_it {
        struct dt_object        *lit_obj; /* object from the layer below */
@@ -289,25 +311,28 @@ struct lod_it {
 
 struct lod_thread_info {
        /* per-thread buffer for LOV EA, may be vmalloc'd */
-       void             *lti_ea_store;
-       __u32             lti_ea_store_size;
+       void                           *lti_ea_store;
+       __u32                           lti_ea_store_size;
        /* per-thread buffer for LMV EA */
-       struct lu_buf     lti_buf;
-       struct ost_id     lti_ostid;
-       struct lu_fid     lti_fid;
-       struct obd_statfs lti_osfs;
-       struct lu_attr    lti_attr;
-       struct lod_it     lti_it;
-       struct ldlm_res_id lti_res_id;
+       struct lu_buf                   lti_buf;
+       struct ost_id                   lti_ostid;
+       struct lu_fid                   lti_fid;
+       struct obd_statfs               lti_osfs;
+       struct lu_attr                  lti_attr;
+       struct lod_it                   lti_it;
+       struct ldlm_res_id              lti_res_id;
        /* used to hold lu_dirent, sizeof(struct lu_dirent) + NAME_MAX */
-       char              lti_key[sizeof(struct lu_dirent) + NAME_MAX];
-       struct dt_object_format lti_format;
-       struct lu_name    lti_name;
-       struct lu_buf     lti_linkea_buf;
-       struct dt_insert_rec lti_dt_rec;
-       struct llog_catid lti_cid;
-       struct llog_cookie lti_cookie;
-       struct lustre_cfg lti_lustre_cfg;
+       char                            lti_key[sizeof(struct lu_dirent) +
+                                               NAME_MAX];
+       struct dt_object_format         lti_format;
+       struct lu_name                  lti_name;
+       struct lu_buf                   lti_linkea_buf;
+       struct dt_insert_rec            lti_dt_rec;
+       struct llog_catid               lti_cid;
+       struct llog_cookie              lti_cookie;
+       struct lustre_cfg               lti_lustre_cfg;
+       /* used to store parent default striping in create */
+       struct lod_default_striping     lti_def_striping;
 };
 
 extern const struct lu_device_operations lod_lu_ops;
@@ -489,7 +514,6 @@ extern struct dt_object_operations lod_obj_ops;
 extern struct lu_object_operations lod_lu_obj_ops;
 int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
                        struct lu_buf *buf, bool resize);
-int lod_object_set_pool(struct lod_object *o, char *pool);
 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
                               struct lu_attr *attr,
                               const struct lu_buf *lovea, struct thandle *th);
index 0ed2fe8..d362f48 100644 (file)
@@ -960,7 +960,7 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
        if (magic == LOV_MAGIC_V3) {
                struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *) lmm;
                objs = &v3->lmm_objects[0];
-               lod_object_set_pool(lo, v3->lmm_pool_name);
+               /* no need to set pool, which is used in create only */
        } else {
                objs = &lmm->lmm_objects[0];
        }
@@ -1041,9 +1041,6 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo)
                 */
                rc = lod_parse_dir_striping(env, lo, buf);
        }
-
-       if (rc == 0)
-               lo->ldo_striping_cached = 1;
 out:
        RETURN(rc);
 }
index 165d4bd..87dd8b6 100644 (file)
@@ -2105,22 +2105,6 @@ static int lod_declare_xattr_set(const struct lu_env *env,
 }
 
 /**
- * Resets cached default striping in the object.
- *
- * \param[in] lo       object
- */
-static void lod_lov_stripe_cache_clear(struct lod_object *lo)
-{
-       lo->ldo_def_striping_set = 0;
-       lo->ldo_def_striping_cached = 0;
-       lod_object_set_pool(lo, NULL);
-       lo->ldo_def_stripe_size = 0;
-       lo->ldo_def_stripenr = 0;
-       if (lo->ldo_dir_stripe != NULL)
-               lo->ldo_dir_def_striping_cached = 0;
-}
-
-/**
  * Apply xattr changes to the object.
  *
  * Applies xattr changes to the object and the stripes if the latter exist.
@@ -2237,18 +2221,12 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
                                    struct thandle *th)
 {
        struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
-       struct lod_object       *l = lod_dt_obj(dt);
        struct lov_user_md_v1   *lum;
        struct lov_user_md_v3   *v3 = NULL;
        const char              *pool_name = NULL;
        int                      rc;
        ENTRY;
 
-       /* If it is striped dir, we should clear the stripe cache for
-        * slave stripe as well, but there are no effective way to
-        * notify the LOD on the slave MDT, so we do not cache stripe
-        * information for slave stripe for now. XXX*/
-       lod_lov_stripe_cache_clear(l);
        LASSERT(buf != NULL && buf->lb_buf != NULL);
        lum = buf->lb_buf;
 
@@ -2308,7 +2286,6 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
                                            const char *name, int fl,
                                            struct thandle *th)
 {
-       struct lod_object       *l = lod_dt_obj(dt);
        struct lmv_user_md_v1   *lum;
        int                      rc;
        ENTRY;
@@ -2332,14 +2309,6 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
                        RETURN(rc);
        }
 
-       /* Update default stripe cache */
-       if (l->ldo_dir_stripe == NULL) {
-               OBD_ALLOC_PTR(l->ldo_dir_stripe);
-               if (l->ldo_dir_stripe == NULL)
-                       RETURN(-ENOMEM);
-       }
-
-       l->ldo_dir_def_striping_cached = 0;
        RETURN(rc);
 }
 
@@ -2535,11 +2504,17 @@ static int lod_dir_striping_create_internal(const struct lu_env *env,
                                            struct thandle *th,
                                            bool declare)
 {
-       struct lod_thread_info  *info = lod_env_info(env);
-       struct lod_object       *lo = lod_dt_obj(dt);
-       int                     rc;
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       const struct lod_default_striping *lds = lo->ldo_def_striping;
+       const char *poolname = NULL;
+       int rc;
        ENTRY;
 
+       LASSERT(ergo(lds != NULL,
+                    lds->lds_def_striping_set ||
+                    lds->lds_dir_def_striping_set));
+
        if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
                                 lo->ldo_dir_stripe_offset)) {
                struct lmv_user_md_v1 *v1 = info->lti_ea_store;
@@ -2572,11 +2547,10 @@ static int lod_dir_striping_create_internal(const struct lu_env *env,
        }
 
        /* Transfer default LMV striping from the parent */
-       if (lo->ldo_dir_def_striping_set &&
-           !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
-                                lo->ldo_dir_def_stripe_offset)) {
+       if (lds != NULL && lds->lds_dir_def_striping_set &&
+           !LMVEA_DELETE_VALUES(lds->lds_dir_def_stripenr,
+                                lds->lds_dir_def_stripe_offset)) {
                struct lmv_user_md_v1 *v1 = info->lti_ea_store;
-               int def_stripe_count = lo->ldo_dir_def_stripenr;
 
                if (info->lti_ea_store_size < sizeof(*v1)) {
                        rc = lod_ea_store_resize(info, sizeof(*v1));
@@ -2587,11 +2561,11 @@ static int lod_dir_striping_create_internal(const struct lu_env *env,
 
                memset(v1, 0, sizeof(*v1));
                v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
-               v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
+               v1->lum_stripe_count = cpu_to_le32(lds->lds_dir_def_stripenr);
                v1->lum_stripe_offset =
-                               cpu_to_le32(lo->ldo_dir_def_stripe_offset);
+                               cpu_to_le32(lds->lds_dir_def_stripe_offset);
                v1->lum_hash_type =
-                               cpu_to_le32(lo->ldo_dir_def_hash_type);
+                               cpu_to_le32(lds->lds_dir_def_hash_type);
 
                info->lti_buf.lb_buf = v1;
                info->lti_buf.lb_len = sizeof(*v1);
@@ -2608,12 +2582,15 @@ static int lod_dir_striping_create_internal(const struct lu_env *env,
                        RETURN(rc);
        }
 
+       if (lds != NULL && lds->lds_def_pool[0] != '\0')
+               poolname = lds->lds_def_pool;
+
        /* Transfer default LOV striping from the parent */
-       if (lo->ldo_def_striping_set &&
-           !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
-                                lo->ldo_def_stripenr,
-                                lo->ldo_def_stripe_offset,
-                                lo->ldo_pool)) {
+       if (lds != NULL && lds->lds_def_striping_set &&
+           !LOVEA_DELETE_VALUES(lds->lds_def_stripe_size,
+                                lds->lds_def_stripenr,
+                                lds->lds_def_stripe_offset,
+                                poolname)) {
                struct lov_user_md_v3 *v3 = info->lti_ea_store;
 
                if (info->lti_ea_store_size < sizeof(*v3)) {
@@ -2625,11 +2602,11 @@ static int lod_dir_striping_create_internal(const struct lu_env *env,
 
                memset(v3, 0, sizeof(*v3));
                v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
-               v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
-               v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
-               v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
-               if (lo->ldo_pool != NULL)
-                       strlcpy(v3->lmm_pool_name, lo->ldo_pool,
+               v3->lmm_stripe_count = cpu_to_le16(lds->lds_def_stripenr);
+               v3->lmm_stripe_offset = cpu_to_le16(lds->lds_def_stripe_offset);
+               v3->lmm_stripe_size = cpu_to_le32(lds->lds_def_stripe_size);
+               if (poolname != NULL)
+                       strlcpy(v3->lmm_pool_name, poolname,
                                sizeof(v3->lmm_pool_name));
 
                info->lti_buf.lb_buf = v3;
@@ -2663,14 +2640,7 @@ static int lod_dir_striping_create(const struct lu_env *env,
                                   struct dt_object_format *dof,
                                   struct thandle *th)
 {
-       struct lod_object *lo = lod_dt_obj(dt);
-       int rc;
-
-       rc = lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
-       if (rc == 0)
-               lo->ldo_striping_cached = 1;
-
-       return rc;
+       return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
 }
 
 /**
@@ -2852,38 +2822,6 @@ static int lod_xattr_list(const struct lu_env *env,
        return dt_xattr_list(env, dt_object_child(dt), buf);
 }
 
-/**
- * Initialize a pool the object belongs to.
- *
- * When a striped object is being created, striping configuration
- * may demand the stripes are allocated on a limited set of the
- * targets. These limited sets are known as "pools". So we copy
- * a pool name into the object and later actual creation methods
- * (like lod_object_create()) will use this information to allocate
- * the stripes properly.
- *
- * \param[in] o                object
- * \param[in] pool     pool name
- */
-int lod_object_set_pool(struct lod_object *o, char *pool)
-{
-       int len;
-
-       if (o->ldo_pool) {
-               len = strlen(o->ldo_pool);
-               OBD_FREE(o->ldo_pool, len + 1);
-               o->ldo_pool = NULL;
-       }
-       if (pool) {
-               len = strlen(pool);
-               OBD_ALLOC(o->ldo_pool, len + 1);
-               if (o->ldo_pool == NULL)
-                       return -ENOMEM;
-               strcpy(o->ldo_pool, pool);
-       }
-       return 0;
-}
-
 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
 {
        return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
@@ -2891,44 +2829,31 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi
 
 
 /**
- * Cache default regular striping in the object.
- *
- * To improve performance of striped regular object creation we cache
- * default LOV striping (if it exists) in the parent directory object.
+ * Get default striping.
  *
  * \param[in] env              execution environment
- * \param[in] lp               object
+ * \param[in] lo               object
+ * \param[out] lds             default striping
  *
  * \retval             0 on success
  * \retval             negative if failed
  */
-static int lod_cache_parent_lov_striping(const struct lu_env *env,
-                                        struct lod_object *lp)
+static int lod_get_default_lov_striping(const struct lu_env *env,
+                                       struct lod_object *lo,
+                                       struct lod_default_striping *lds)
 {
-       struct lod_thread_info  *info = lod_env_info(env);
-       struct lov_user_md_v1   *v1 = NULL;
-       struct lov_user_md_v3   *v3 = NULL;
-       int                      rc;
-       ENTRY;
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lov_user_md_v1 *v1 = NULL;
+       struct lov_user_md_v3 *v3 = NULL;
+       int rc;
 
-       /* called from MDD without parent being write locked,
-        * lock it here */
-       dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
-       rc = lod_get_lov_ea(env, lp);
+       rc = lod_get_lov_ea(env, lo);
        if (rc < 0)
-               GOTO(unlock, rc);
-
-       if (rc < (typeof(rc))sizeof(struct lov_user_md)) {
-               /* don't lookup for non-existing or invalid striping */
-               lp->ldo_def_striping_set = 0;
-               lp->ldo_def_striping_cached = 1;
-               lp->ldo_def_stripe_size = 0;
-               lp->ldo_def_stripenr = 0;
-               lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
-               GOTO(unlock, rc = 0);
-       }
+               return rc;
+
+       if (rc < (typeof(rc))sizeof(struct lov_user_md))
+               return 0;
 
-       rc = 0;
        v1 = info->lti_ea_store;
        if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
                lustre_swab_lov_user_md_v1(v1);
@@ -2938,127 +2863,126 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env,
        }
 
        if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
-               GOTO(unlock, rc = 0);
+               return 0;
 
        if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
-               GOTO(unlock, rc = 0);
-
-       CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
-              PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
-              (int)v1->lmm_stripe_count,
-              (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
-
-       lp->ldo_def_stripenr = v1->lmm_stripe_count;
-       lp->ldo_def_stripe_size = v1->lmm_stripe_size;
-       lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
-       lp->ldo_def_striping_cached = 1;
-       lp->ldo_def_striping_set = 1;
+               return 0;
+
+       lds->lds_def_stripenr = v1->lmm_stripe_count;
+       lds->lds_def_stripe_size = v1->lmm_stripe_size;
+       lds->lds_def_stripe_offset = v1->lmm_stripe_offset;
+       lds->lds_def_striping_set = 1;
        if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
-               /* XXX: sanity check here */
-               v3 = (struct lov_user_md_v3 *) v1;
-               if (v3->lmm_pool_name[0])
-                       lod_object_set_pool(lp, v3->lmm_pool_name);
+               v3 = (struct lov_user_md_v3 *)v1;
+               if (v3->lmm_pool_name[0] != '\0')
+                       strlcpy(lds->lds_def_pool, v3->lmm_pool_name,
+                               sizeof(lds->lds_def_pool));
        }
-       EXIT;
-unlock:
-       dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
-       return rc;
-}
 
+       return 0;
+}
 
 /**
- * Cache default directory striping in the object.
- *
- * To improve performance of striped directory creation we cache default
- * directory striping (if it exists) in the parent directory object.
+ * Get default directory striping.
  *
  * \param[in] env              execution environment
- * \param[in] lp               object
+ * \param[in] lo               object
+ * \param[out] lds             default striping
  *
  * \retval             0 on success
  * \retval             negative if failed
  */
-static int lod_cache_parent_lmv_striping(const struct lu_env *env,
-                                        struct lod_object *lp)
+static int lod_get_default_lmv_striping(const struct lu_env *env,
+                                       struct lod_object *lo,
+                                       struct lod_default_striping *lds)
 {
        struct lod_thread_info  *info = lod_env_info(env);
        struct lmv_user_md_v1   *v1 = NULL;
        int                      rc;
-       ENTRY;
 
-       /* called from MDD without parent being write locked,
-        * lock it here */
-       dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
-       rc = lod_get_default_lmv_ea(env, lp);
+       rc = lod_get_default_lmv_ea(env, lo);
        if (rc < 0)
-               GOTO(unlock, rc);
-
-       if (rc < (typeof(rc))sizeof(struct lmv_user_md)) {
-               /* don't lookup for non-existing or invalid striping */
-               lp->ldo_dir_def_striping_set = 0;
-               lp->ldo_dir_def_striping_cached = 1;
-               lp->ldo_dir_def_stripenr = 0;
-               lp->ldo_dir_def_stripe_offset =
-                                       (typeof(v1->lum_stripe_offset))(-1);
-               lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
-               GOTO(unlock, rc = 0);
-       }
+               return rc;
+
+       if (rc < (typeof(rc))sizeof(struct lmv_user_md))
+               return 0;
 
-       rc = 0;
        v1 = info->lti_ea_store;
 
-       lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
-       lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
-       lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
-       lp->ldo_dir_def_striping_set = 1;
-       lp->ldo_dir_def_striping_cached = 1;
+       lds->lds_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
+       lds->lds_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
+       lds->lds_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
+       lds->lds_dir_def_striping_set = 1;
 
-       EXIT;
-unlock:
-       dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
-       return rc;
+       return 0;
 }
 
 /**
- * Cache default striping in the object.
+ * Get default striping in the object.
  *
- * To improve performance of striped object creation we cache default striping
- * (if it exists) in the parent directory object. We always cache default
- * striping for the regular files (stored in LOV EA) and we cache default
- * striping for the directories if requested by \a child_mode (when a new
- * directory is being created).
+ * Get object default striping and default directory striping.
  *
  * \param[in] env              execution environment
- * \param[in] lp               object
- * \param[in] child_mode       new object's mode
+ * \param[in] lo               object
+ * \param[out] lds             default striping
  *
  * \retval             0 on success
  * \retval             negative if failed
  */
-static int lod_cache_parent_striping(const struct lu_env *env,
-                                    struct lod_object *lp,
-                                    umode_t child_mode)
+static int lod_get_default_striping(const struct lu_env *env,
+                                   struct lod_object *lo,
+                                   struct lod_default_striping *lds)
 {
-       int rc = 0;
-       ENTRY;
-
-       if (!lp->ldo_def_striping_cached) {
-               /* we haven't tried to get default striping for
-                * the directory yet, let's cache it in the object */
-               rc = lod_cache_parent_lov_striping(env, lp);
-               if (rc != 0)
-                       RETURN(rc);
-       }
+       int rc;
 
-       /* If the parent is on the remote MDT, we should always
-        * try to refresh the default stripeEA cache, because we
-        * do not cache default striping information for remote
-        * object. */
-       if (S_ISDIR(child_mode) && (!lp->ldo_dir_def_striping_cached ||
-                                   dt_object_remote(&lp->ldo_obj)))
-               rc = lod_cache_parent_lmv_striping(env, lp);
+       rc = lod_get_default_lov_striping(env, lo, lds);
+       if (rc == 0)
+               rc = lod_get_default_lmv_striping(env, lo, lds);
+       return rc;
+}
 
-       RETURN(rc);
+/**
+ * Apply default striping on object.
+ *
+ * If object striping pattern is not set, set to the one in default striping.
+ * The default striping is from parent or fs.
+ *
+ * \param[in] lo               new object
+ * \param[in] lds              default striping
+ * \param[in] mode             new object's mode
+ */
+static void lod_striping_from_default(struct lod_object *lo,
+                                     const struct lod_default_striping *lds,
+                                     umode_t mode)
+{
+       if (lds->lds_def_striping_set && S_ISREG(mode)) {
+               if (lo->ldo_stripenr == 0)
+                       lo->ldo_stripenr = lds->lds_def_stripenr;
+               if (lo->ldo_stripe_size == 0)
+                       lo->ldo_stripe_size = lds->lds_def_stripe_size;
+               if (lo->ldo_stripe_offset == LOV_OFFSET_DEFAULT)
+                       lo->ldo_stripe_offset = lds->lds_def_stripe_offset;
+               if (lo->ldo_pool == NULL && lds->lds_def_pool[0] != '\0')
+                       lod_object_set_pool(lo, lds->lds_def_pool);
+
+               CDEBUG(D_INFO, "striping from default: count %hu, size %u, "
+                       "offset %d, pool %s\n",
+                       lo->ldo_stripenr, lo->ldo_stripe_size,
+                       (int)lo->ldo_stripe_offset, lo->ldo_pool ?: "");
+       } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) {
+               if (lo->ldo_stripenr == 0)
+                       lo->ldo_stripenr = lds->lds_dir_def_stripenr;
+               if (lo->ldo_dir_stripe_offset == -1)
+                       lo->ldo_dir_stripe_offset =
+                               lds->lds_dir_def_stripe_offset;
+               if (lo->ldo_dir_hash_type == 0)
+                       lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type;
+
+               CDEBUG(D_INFO, "striping from default: count %hu, offset %d, "
+                       "hash_type %u\n",
+                       lo->ldo_stripenr, (int)lo->ldo_dir_stripe_offset,
+                       lo->ldo_dir_hash_type);
+       }
 }
 
 /**
@@ -3079,12 +3003,13 @@ static void lod_ah_init(const struct lu_env *env,
                        umode_t child_mode)
 {
        struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
-       struct dt_object  *nextp = NULL;
-       struct dt_object  *nextc;
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_default_striping *lds = &info->lti_def_striping;
+       struct dt_object *nextp = NULL;
+       struct dt_object *nextc;
        struct lod_object *lp = NULL;
        struct lod_object *lc;
-       struct lov_desc   *desc;
-       int               rc;
+       struct lov_desc *desc;
        ENTRY;
 
        LASSERT(child);
@@ -3092,9 +3017,6 @@ static void lod_ah_init(const struct lu_env *env,
        if (likely(parent)) {
                nextp = dt_object_child(parent);
                lp = lod_dt_obj(parent);
-               rc = lod_load_striping(env, lp);
-               if (rc != 0)
-                       return;
        }
 
        nextc = dt_object_child(child);
@@ -3107,85 +3029,33 @@ static void lod_ah_init(const struct lu_env *env,
                nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
 
        if (S_ISDIR(child_mode)) {
-               if (lc->ldo_dir_stripe == NULL) {
-                       OBD_ALLOC_PTR(lc->ldo_dir_stripe);
-                       if (lc->ldo_dir_stripe == NULL)
-                               return;
-               }
+               /* other default values are 0 */
+               lc->ldo_dir_stripe_offset = -1;
 
-               LASSERT(lp != NULL);
-               if (lp->ldo_dir_stripe == NULL) {
-                       OBD_ALLOC_PTR(lp->ldo_dir_stripe);
-                       if (lp->ldo_dir_stripe == NULL)
-                               return;
-               }
+               memset(lds, 0, sizeof(*lds));
+               lod_get_default_striping(env, lp, lds);
 
-               rc = lod_cache_parent_striping(env, lp, child_mode);
-               if (rc != 0)
-                       return;
-
-               /* transfer defaults to new directory */
-               if (lp->ldo_def_striping_set) {
-                       if (lp->ldo_pool)
-                               lod_object_set_pool(lc, lp->ldo_pool);
-                       lc->ldo_def_stripenr = lp->ldo_def_stripenr;
-                       lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
-                       lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
-                       lc->ldo_def_striping_set = 1;
-                       lc->ldo_def_striping_cached = 1;
-                       CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
-                              (int)lc->ldo_def_stripe_size,
-                              (int)lc->ldo_def_stripe_offset,
-                              (int)lc->ldo_def_stripenr);
-               }
-
-               /* transfer dir defaults to new directory */
-               if (lp->ldo_dir_def_striping_set) {
-                       lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
-                       lc->ldo_dir_def_stripe_offset =
-                                                 lp->ldo_dir_def_stripe_offset;
-                       lc->ldo_dir_def_hash_type =
-                                                 lp->ldo_dir_def_hash_type;
-                       lc->ldo_dir_def_striping_set = 1;
-                       lc->ldo_dir_def_striping_cached = 1;
-                       CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
-                              (int)lc->ldo_dir_def_stripenr,
-                              (int)lc->ldo_dir_def_stripe_offset,
-                              lc->ldo_dir_def_hash_type);
-               }
+               /* inherit parent default striping */
+               if (lds->lds_def_striping_set || lds->lds_dir_def_striping_set)
+                       lc->ldo_def_striping = lds;
 
                /* It should always honour the specified stripes */
-               if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
+               if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 &&
+                   lod_verify_md_striping(d, ah->dah_eadata) == 0) {
                        const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
 
-                       rc = lod_verify_md_striping(d, lum1);
-                       if (rc == 0 &&
-                               le32_to_cpu(lum1->lum_stripe_count) > 1) {
-                               lc->ldo_stripenr =
-                                       le32_to_cpu(lum1->lum_stripe_count);
-                               lc->ldo_dir_stripe_offset =
-                                       le32_to_cpu(lum1->lum_stripe_offset);
-                               lc->ldo_dir_hash_type =
-                                       le32_to_cpu(lum1->lum_hash_type);
-                               CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
-                                      lc->ldo_stripenr,
-                                      (int)lc->ldo_dir_stripe_offset);
-                       }
-               /* then check whether there is default stripes from parent */
-               } else if (lp->ldo_dir_def_striping_set) {
-                       /* If there are default dir stripe from parent */
-                       lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
+                       lc->ldo_stripenr = le32_to_cpu(lum1->lum_stripe_count);
                        lc->ldo_dir_stripe_offset =
-                                       lp->ldo_dir_def_stripe_offset;
+                                       le32_to_cpu(lum1->lum_stripe_offset);
                        lc->ldo_dir_hash_type =
-                                       lp->ldo_dir_def_hash_type;
-                       CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
-                              lc->ldo_stripenr,
-                              (int)lc->ldo_dir_stripe_offset);
+                                       le32_to_cpu(lum1->lum_hash_type);
+                       CDEBUG(D_INFO, "set dir stripe: count %hu, offset %d, "
+                               "hash_type %u\n",
+                               lc->ldo_stripenr,
+                               (int)lc->ldo_dir_stripe_offset,
+                               lc->ldo_dir_hash_type);
                } else {
-                       /* set default stripe for this directory */
-                       lc->ldo_stripenr = 0;
-                       lc->ldo_dir_stripe_offset = -1;
+                       lod_striping_from_default(lc, lds, child_mode);
                }
 
                /* shrink the stripe_count to the avaible MDT count */
@@ -3200,10 +3070,11 @@ static void lod_ah_init(const struct lu_env *env,
                if (lc->ldo_stripenr == 1)
                        lc->ldo_stripenr = 0;
 
-               CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
-                      lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
+               CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n",
+                      lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset,
+                      lc->ldo_dir_hash_type);
 
-               goto out;
+               RETURN_EXIT;
        }
 
        /*
@@ -3213,45 +3084,60 @@ static void lod_ah_init(const struct lu_env *env,
         */
        if (!lod_object_will_be_striped(S_ISREG(child_mode),
                                        lu_object_fid(&child->do_lu)))
-               goto out;
-       /*
-        * try from the parent
-        */
+               RETURN_EXIT;
+
+       /* other default values are 0 */
+       lc->ldo_stripe_offset = LOV_OFFSET_DEFAULT;
+
+       /* striping from parent default */
        if (likely(parent)) {
-               lod_cache_parent_striping(env, lp, child_mode);
-
-               lc->ldo_def_stripe_offset = LOV_OFFSET_DEFAULT;
-
-               if (lp->ldo_def_striping_set) {
-                       if (lp->ldo_pool)
-                               lod_object_set_pool(lc, lp->ldo_pool);
-                       lc->ldo_stripenr = lp->ldo_def_stripenr;
-                       lc->ldo_stripe_size = lp->ldo_def_stripe_size;
-                       lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
-                       CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
-                              lc->ldo_stripenr, lc->ldo_stripe_size,
-                              lp->ldo_pool ? lp->ldo_pool : "");
+               memset(lds, 0, sizeof(*lds));
+               lod_get_default_lov_striping(env, lp, lds);
+               lod_striping_from_default(lc, lds, child_mode);
+       }
+
+       if (d->lod_md_root == NULL) {
+               struct dt_object *root;
+               struct lod_object *lroot;
+
+               lu_root_fid(&info->lti_fid);
+               root = dt_locate(env, &d->lod_dt_dev, &info->lti_fid);
+               if (!IS_ERR(root)) {
+                       lroot = lod_dt_obj(root);
+
+                       spin_lock(&d->lod_lock);
+                       if (d->lod_md_root != NULL)
+                               lu_object_put(env,
+                                             &d->lod_md_root->ldo_obj.do_lu);
+                       d->lod_md_root = lroot;
+                       spin_unlock(&d->lod_lock);
                }
        }
 
+       /* if parent doesn't provide all defaults, striping from fs default */
+       if (d->lod_md_root != NULL &&
+           (lc->ldo_stripenr == 0 ||
+            lc->ldo_stripe_size == 0 ||
+            lc->ldo_stripe_offset == LOV_OFFSET_DEFAULT ||
+            lc->ldo_pool == NULL)) {
+               memset(lds, 0, sizeof(*lds));
+               lod_get_default_lov_striping(env, d->lod_md_root, lds);
+               lod_striping_from_default(lc, lds, child_mode);
+       }
+
        /*
-        * if the parent doesn't provide with specific pattern, grab fs-wide one
+        * fs default striping may not be explicitly set, or historically set
+        * in config log, check striping sanity here and fix to sane values.
         */
        desc = &d->lod_desc;
        if (lc->ldo_stripenr == 0)
                lc->ldo_stripenr = desc->ld_default_stripe_count;
        if (lc->ldo_stripe_size == 0)
                lc->ldo_stripe_size = desc->ld_default_stripe_size;
-       CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
-              lc->ldo_stripenr, lc->ldo_stripe_size,
-              lc->ldo_pool ? lc->ldo_pool : "");
-
-out:
-       /* we do not cache stripe information for slave stripe, see
-        * lod_xattr_set_lov_on_dir */
-       if (lp != NULL && lp->ldo_dir_slave_stripe)
-               lod_lov_stripe_cache_clear(lp);
 
+       CDEBUG(D_INFO, "final striping [%hu %u %d %s]\n",
+              lc->ldo_stripenr, lc->ldo_stripe_size,
+              (int)lc->ldo_stripe_offset, lc->ldo_pool ?: "");
        EXIT;
 }
 
@@ -3495,11 +3381,7 @@ static int lod_declare_object_create(const struct lu_env *env,
                        }
                }
 
-               /* Orphan object (like migrating object) does not have
-                * lod_dir_stripe, see lod_ah_init */
-               if (lo->ldo_dir_stripe != NULL)
-                       rc = lod_declare_dir_striping_create(env, dt, attr,
-                                                            dof, th);
+               rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
        }
 out:
        RETURN(rc);
@@ -3532,8 +3414,6 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
        int                rc = 0, i;
        ENTRY;
 
-       LASSERT(lo->ldo_striping_cached == 0);
-
        /* create all underlying objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
@@ -3543,11 +3423,8 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
                        break;
        }
 
-       if (rc == 0) {
+       if (rc == 0)
                rc = lod_generate_and_set_lovea(env, lo, th);
-               if (rc == 0)
-                       lo->ldo_striping_cached = 1;
-       }
 
        RETURN(rc);
 }
@@ -3804,7 +3681,7 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
  * Release LDLM locks on the stripes of a striped directory.
  *
  * Iterates over all the locks taken on the stripe objects and
- * release them using ->do_object_unlock() method.
+ * cancel them.
  *
  * \param[in] env      execution environment
  * \param[in] dt       striped object
@@ -3829,8 +3706,8 @@ static int lod_object_unlock_internal(const struct lu_env *env,
 
        for (i = 1; i < slave_locks->count; i++) {
                if (lustre_handle_is_used(&slave_locks->handles[i]))
-                       ldlm_lock_decref(&slave_locks->handles[i],
-                                        einfo->ei_mode);
+                       ldlm_lock_decref_and_cancel(&slave_locks->handles[i],
+                                                   einfo->ei_mode);
        }
 
        RETURN(rc);
@@ -3864,8 +3741,10 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
        LASSERT(!dt_object_remote(dt_object_child(dt)));
 
        /* locks were unlocked in MDT layer */
-       for (i = 1; i < slave_locks->count; i++)
+       for (i = 1; i < slave_locks->count; i++) {
                LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
+               dt_invalidate(env, lo->ldo_stripe[i]);
+       }
 
        slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
                           sizeof(slave_locks->handles[0]);
@@ -3904,22 +3783,29 @@ static int lod_object_lock(const struct lu_env *env,
        }
 
        if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
-               RETURN(-ENOTDIR);
+               GOTO(out, rc = -ENOTDIR);
 
        rc = lod_load_striping(env, lo);
        if (rc != 0)
-               RETURN(rc);
+               GOTO(out, rc);
 
        /* No stripes */
-       if (lo->ldo_stripenr <= 1)
-               RETURN(0);
+       if (lo->ldo_stripenr <= 1) {
+               /*
+                * NB, ei_cbdata stores pointer to slave locks, if no locks
+                * taken, make sure it's set to NULL, otherwise MDT will try to
+                * unlock them.
+                */
+               einfo->ei_cbdata = NULL;
+               GOTO(out, rc = 0);
+       }
 
        slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
                           sizeof(slave_locks->handles[0]);
        /* Freed in lod_object_unlock */
        OBD_ALLOC(slave_locks, slave_locks_size);
        if (slave_locks == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out, rc = -ENOMEM);
        slave_locks->count = lo->ldo_stripenr;
 
        /* striped directory lock */
@@ -3959,23 +3845,32 @@ static int lod_object_lock(const struct lu_env *env,
                                                    NULL, &lockh);
                }
                if (rc != 0)
-                       GOTO(out, rc);
+                       break;
                slave_locks->handles[i] = lockh;
        }
-
        einfo->ei_cbdata = slave_locks;
 
-out:
        if (rc != 0 && slave_locks != NULL) {
-               einfo->ei_cbdata = slave_locks;
                lod_object_unlock_internal(env, dt, einfo, policy);
                OBD_FREE(slave_locks, slave_locks_size);
-               einfo->ei_cbdata = NULL;
        }
-
+       EXIT;
+out:
+       if (rc != 0)
+               einfo->ei_cbdata = NULL;
        RETURN(rc);
 }
 
+/**
+ * Implementation of dt_object_operations::do_invalidate.
+ *
+ * \see dt_object_operations::do_invalidate() in the API description for details
+ */
+static int lod_invalidate(const struct lu_env *env, struct dt_object *dt)
+{
+       return dt_invalidate(env, dt_object_child(dt));
+}
+
 struct dt_object_operations lod_obj_ops = {
        .do_read_lock           = lod_object_read_lock,
        .do_write_lock          = lod_object_write_lock,
@@ -4004,6 +3899,7 @@ struct dt_object_operations lod_obj_ops = {
        .do_object_sync         = lod_object_sync,
        .do_object_lock         = lod_object_lock,
        .do_object_unlock       = lod_object_unlock,
+       .do_invalidate          = lod_invalidate,
 };
 
 /**
@@ -4164,14 +4060,11 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
  */
 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
 {
-       int i;
+       int len;
 
-       if (lo->ldo_dir_stripe != NULL) {
-               OBD_FREE_PTR(lo->ldo_dir_stripe);
-               lo->ldo_dir_stripe = NULL;
-       }
+       if (lo->ldo_stripe != NULL) {
+               int i;
 
-       if (lo->ldo_stripe) {
                LASSERT(lo->ldo_stripes_allocated > 0);
 
                for (i = 0; i < lo->ldo_stripenr; i++) {
@@ -4179,14 +4072,12 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
                                lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
                }
 
-               i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
-               OBD_FREE(lo->ldo_stripe, i);
+               len = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
+               OBD_FREE(lo->ldo_stripe, len);
                lo->ldo_stripe = NULL;
                lo->ldo_stripes_allocated = 0;
        }
-       lo->ldo_striping_cached = 0;
        lo->ldo_stripenr = 0;
-       lo->ldo_pattern = 0;
 }
 
 /**
@@ -4218,18 +4109,13 @@ static int lod_object_start(const struct lu_env *env, struct lu_object *o)
  */
 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
 {
-       struct lod_object *mo = lu2lod_obj(o);
-
-       /*
-        * release all underlying object pinned
-        */
-
-       lod_object_free_striping(env, mo);
-
-       lod_object_set_pool(mo, NULL);
+       struct lod_object *lo = lu2lod_obj(o);
 
+       lod_object_set_pool(lo, NULL);
+       /* release all underlying object pinned */
+       lod_object_free_striping(env, lo);
        lu_object_fini(o);
-       OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
+       OBD_SLAB_FREE_PTR(lo, lod_object_kmem);
 }
 
 /**
index 8022335..d1985b8 100644 (file)
@@ -209,7 +209,7 @@ static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
        /* check whether device has changed state (active, inactive) */
        if (rc != 0 && ost->ltd_active) {
                /* turned inactive? */
-               spin_lock(&d->lod_desc_lock);
+               spin_lock(&d->lod_lock);
                if (ost->ltd_active) {
                        ost->ltd_active = 0;
                        if (rc == -ENOTCONN)
@@ -222,13 +222,13 @@ static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
                        CDEBUG(D_CONFIG, "%s: turns inactive\n",
                               ost->ltd_exp->exp_obd->obd_name);
                }
-               spin_unlock(&d->lod_desc_lock);
+               spin_unlock(&d->lod_lock);
        } else if (rc == 0 && ost->ltd_active == 0) {
                /* turned active? */
                LASSERTF(d->lod_desc.ld_active_tgt_count < d->lod_ostnr,
                         "active tgt count %d, ost nr %d\n",
                         d->lod_desc.ld_active_tgt_count, d->lod_ostnr);
-               spin_lock(&d->lod_desc_lock);
+               spin_lock(&d->lod_lock);
                if (ost->ltd_active == 0) {
                        ost->ltd_active = 1;
                        ost->ltd_connecting = 0;
@@ -238,7 +238,7 @@ static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
                        CDEBUG(D_CONFIG, "%s: turns active\n",
                               ost->ltd_exp->exp_obd->obd_name);
                }
-               spin_unlock(&d->lod_desc_lock);
+               spin_unlock(&d->lod_lock);
        }
 
        RETURN(rc);
@@ -1105,7 +1105,7 @@ static int lod_alloc_ost_list(const struct lu_env *env,
 
        v3 = (struct lov_user_md_v3 *)lum;
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               if (v3->lmm_objects[i].l_ost_idx == lo->ldo_def_stripe_offset) {
+               if (v3->lmm_objects[i].l_ost_idx == lo->ldo_stripe_offset) {
                        array_idx = i;
                        break;
                }
@@ -1113,7 +1113,7 @@ static int lod_alloc_ost_list(const struct lu_env *env,
        if (i == lo->ldo_stripenr) {
                CDEBUG(D_OTHER,
                       "%s: start index %d not in the specified list of OSTs\n",
-                      lod2obd(m)->obd_name, lo->ldo_def_stripe_offset);
+                      lod2obd(m)->obd_name, lo->ldo_stripe_offset);
                RETURN(-EINVAL);
        }
 
@@ -1161,7 +1161,7 @@ static int lod_alloc_ost_list(const struct lu_env *env,
 /**
  * Allocate a striping on a predefined set of OSTs.
  *
- * Allocates new layout starting from OST index in lo->ldo_def_stripe_offset.
+ * Allocates new layout starting from OST index in lo->ldo_stripe_offset.
  * Full OSTs are not considered. The exact order of OSTs is not important and
  * varies depending on OST status. The allocation procedure prefers the targets
  * with precreated objects ready. The number of stripes needed and stripe
@@ -1217,15 +1217,14 @@ repeat_find:
        /* search loi_ost_idx in ost array */
        array_idx = 0;
        for (i = 0; i < ost_count; i++) {
-               if (osts->op_array[i] == lo->ldo_def_stripe_offset) {
+               if (osts->op_array[i] == lo->ldo_stripe_offset) {
                        array_idx = i;
                        break;
                }
        }
        if (i == ost_count) {
                CERROR("Start index %d not found in pool '%s'\n",
-                      lo->ldo_def_stripe_offset,
-                      lo->ldo_pool ? lo->ldo_pool : "");
+                      lo->ldo_stripe_offset, lo->ldo_pool ?: "");
                GOTO(out, rc = -EINVAL);
        }
 
@@ -1786,7 +1785,7 @@ static int lod_qos_parse_config(const struct lu_env *env,
        if (v1->lmm_stripe_count > 0)
                lo->ldo_stripenr = v1->lmm_stripe_count;
 
-       lo->ldo_def_stripe_offset = v1->lmm_stripe_offset;
+       lo->ldo_stripe_offset = v1->lmm_stripe_offset;
 
        lod_object_set_pool(lo, NULL);
        if (pool_name != NULL) {
@@ -1797,14 +1796,14 @@ static int lod_qos_parse_config(const struct lu_env *env,
                /* coverity[overrun-buffer-val] */
                pool = lod_find_pool(d, pool_name);
                if (pool != NULL) {
-                       if (lo->ldo_def_stripe_offset != LOV_OFFSET_DEFAULT) {
+                       if (lo->ldo_stripe_offset != LOV_OFFSET_DEFAULT) {
                                rc = lod_check_index_in_pool(
-                                              lo->ldo_def_stripe_offset, pool);
+                                               lo->ldo_stripe_offset, pool);
                                if (rc < 0) {
                                        lod_pool_putref(pool);
                                        CERROR("%s: invalid offset, %u\n",
                                               lod2obd(d)->obd_name,
-                                              lo->ldo_def_stripe_offset);
+                                              lo->ldo_stripe_offset);
                                        RETURN(-EINVAL);
                                }
                        }
@@ -1911,7 +1910,7 @@ int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
 
                if (lum != NULL && lum->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
                        rc = lod_alloc_ost_list(env, lo, stripe, lum, th);
-               } else if (lo->ldo_def_stripe_offset == LOV_OFFSET_DEFAULT) {
+               } else if (lo->ldo_stripe_offset == LOV_OFFSET_DEFAULT) {
                        rc = lod_alloc_qos(env, lo, stripe, flag, th);
                        if (rc == -EAGAIN)
                                rc = lod_alloc_rr(env, lo, stripe, flag, th);
index 0e9e941..c722509 100644 (file)
@@ -4211,6 +4211,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
        struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
        struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
        struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
+       bool                    created = false;
        int                     rc;
 
        ENTRY;
@@ -4269,6 +4270,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
                                        lname, so_attr);
                if (rc != 0)
                        GOTO(put, rc);
+               created = true;
        }
 
        LASSERT(mdd_object_exists(mdd_tobj));
@@ -4295,6 +4297,10 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
                                     ma);
        if (rc != 0)
                GOTO(put, rc);
+
+       /* newly created target was not locked, don't cache its attributes */
+       if (created)
+               mdd_invalidate(env, tobj);
 put:
        RETURN(rc);
 }
index 4f99f02..60094ca 100644 (file)
@@ -302,6 +302,7 @@ int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
                           const struct lu_name *tname,
                           const struct lu_name *sname,
                           struct thandle *handle);
+int mdd_invalidate(const struct lu_env *env, struct md_object *obj);
 int mdd_declare_object_create_internal(const struct lu_env *env,
                                       struct mdd_object *p,
                                       struct mdd_object *c,
@@ -571,6 +572,12 @@ mdo_xattr_list(const struct lu_env *env, struct mdd_object *obj,
        return dt_xattr_list(env, next, buf);
 }
 
+static inline int
+mdo_invalidate(const struct lu_env *env, struct mdd_object *obj)
+{
+       return dt_invalidate(env, mdd_object_child(obj));
+}
+
 static inline
 int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
                             const struct lu_fid *fid, __u32 type,
index 4ad7174..692d445 100644 (file)
@@ -339,6 +339,11 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
        RETURN(rc);
 }
 
+int mdd_invalidate(const struct lu_env *env, struct md_object *obj)
+{
+       return mdo_invalidate(env, md2mdd_obj(obj));
+}
+
 int mdd_declare_object_create_internal(const struct lu_env *env,
                                       struct mdd_object *p,
                                       struct mdd_object *c,
@@ -2066,6 +2071,7 @@ const struct md_object_operations mdd_obj_ops = {
        .moo_xattr_get          = mdd_xattr_get,
        .moo_xattr_set          = mdd_xattr_set,
        .moo_xattr_list         = mdd_xattr_list,
+       .moo_invalidate         = mdd_invalidate,
        .moo_xattr_del          = mdd_xattr_del,
        .moo_swap_layouts       = mdd_swap_layouts,
        .moo_open               = mdd_open,
index 3a9419b..acc008f 100644 (file)
@@ -448,15 +448,15 @@ static int mdt_statfs(struct tgt_session_info *tsi)
                rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs);
                if (rc)
                        GOTO(out, rc);
-               spin_lock(&info->mti_mdt->mdt_osfs_lock);
+               spin_lock(&info->mti_mdt->mdt_lock);
                info->mti_mdt->mdt_osfs = *osfs;
                info->mti_mdt->mdt_osfs_age = cfs_time_current_64();
-               spin_unlock(&info->mti_mdt->mdt_osfs_lock);
+               spin_unlock(&info->mti_mdt->mdt_lock);
        } else {
                /** use cached statfs data */
-               spin_lock(&info->mti_mdt->mdt_osfs_lock);
+               spin_lock(&info->mti_mdt->mdt_lock);
                *osfs = info->mti_mdt->mdt_osfs;
-               spin_unlock(&info->mti_mdt->mdt_osfs_lock);
+               spin_unlock(&info->mti_mdt->mdt_lock);
        }
 
        if (rc == 0)
@@ -1237,17 +1237,13 @@ out:
  */
 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
 {
-       __u64   o1_flags;
+       unsigned int o1_lov_created = o1->mot_lov_created;
 
        mutex_lock(&o1->mot_lov_mutex);
        mutex_lock(&o2->mot_lov_mutex);
 
-       o1_flags = o1->mot_flags;
-       o1->mot_flags = (o1->mot_flags & ~MOF_LOV_CREATED) |
-                       (o2->mot_flags & MOF_LOV_CREATED);
-
-       o2->mot_flags = (o2->mot_flags & ~MOF_LOV_CREATED) |
-                       (o1_flags & MOF_LOV_CREATED);
+       o1->mot_lov_created = o2->mot_lov_created;
+       o2->mot_lov_created = o1_lov_created;
 
        mutex_unlock(&o2->mot_lov_mutex);
        mutex_unlock(&o1->mot_lov_mutex);
@@ -2375,12 +2371,13 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
 {
-       struct lustre_handle lockh;
-       int               rc;
+       int rc = 0;
        ENTRY;
 
        switch (flag) {
-       case LDLM_CB_BLOCKING:
+       case LDLM_CB_BLOCKING: {
+               struct lustre_handle lockh;
+
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh,
                        ldlm_is_atomic_cb(lock) ? 0 : LCF_ASYNC);
@@ -2389,17 +2386,46 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                        RETURN(rc);
                }
                break;
-       case LDLM_CB_CANCELING:
-               LDLM_DEBUG(lock, "Revoke remote lock");
+       }
+       case LDLM_CB_CANCELING: {
+               LDLM_DEBUG(lock, "Revoke remote lock\n");
+
                /* discard slc lock here so that it can be cleaned anytime,
                 * especially for cleanup_resource() */
                tgt_discard_slc_lock(lock);
+
+               /* once we cache lock, l_ast_data is set to mdt_object */
+               if (lock->l_ast_data != NULL) {
+                       struct mdt_object *mo = lock->l_ast_data;
+                       struct lu_env env;
+
+                       rc = lu_env_init(&env, LCT_MD_THREAD);
+                       if (unlikely(rc != 0)) {
+                               struct obd_device *obd;
+
+                               obd = ldlm_lock_to_ns(lock)->ns_obd;
+                               CWARN("%s: lu_env initialization failed, object"
+                                     "%p "DFID" is leaked!\n",
+                                     obd->obd_name, mo,
+                                     PFID(mdt_object_fid(mo)));
+                               RETURN(rc);
+                       }
+
+                       if (lock->l_policy_data.l_inodebits.bits &
+                           (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)) {
+                               rc = mo_invalidate(&env, mdt_object_child(mo));
+                               mo->mot_cache_attr = 0;
+                       }
+                       mdt_object_put(&env, mo);
+                       lu_env_fini(&env);
+               }
                break;
+       }
        default:
                LBUG();
        }
 
-       RETURN(0);
+       RETURN(rc);
 }
 
 int mdt_check_resent_lock(struct mdt_thread_info *info,
@@ -2439,7 +2465,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
 
 int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
                           const struct lu_fid *fid, struct lustre_handle *lh,
-                          enum ldlm_mode mode, __u64 ibits, bool nonblock)
+                          enum ldlm_mode mode, __u64 ibits, bool nonblock,
+                          bool cache)
 {
        struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
        union ldlm_policy_data *policy = &mti->mti_policy;
@@ -2460,12 +2487,24 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
        einfo->ei_res_id = res_id;
        if (nonblock)
                einfo->ei_nonblock = 1;
+       if (cache) {
+               /*
+                * if we cache lock, couple lock with mdt_object, so that object
+                * can be easily found in lock ASTs.
+                */
+               mdt_object_get(mti->mti_env, o);
+               einfo->ei_cbdata = o;
+       }
 
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
 
        rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
                            policy);
+       if (rc < 0 && cache) {
+               mdt_object_put(mti->mti_env, o);
+               einfo->ei_cbdata = NULL;
+       }
        RETURN(rc);
 }
 
@@ -2614,7 +2653,8 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
                rc = mdt_remote_object_lock(info, o, mdt_object_fid(o),
                                            &lh->mlh_rreg_lh,
                                            lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE, nonblock);
+                                           MDS_INODELOCK_UPDATE, nonblock,
+                                           false);
                if (rc != ELDLM_OK) {
                        if (local_lh != NULL)
                                mdt_object_unlock(info, o, local_lh, rc);
@@ -2742,17 +2782,24 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
  * \param decref force immediate lock releasing
  */
 static void mdt_save_remote_lock(struct mdt_thread_info *info,
-                                struct lustre_handle *h, enum ldlm_mode mode,
-                                int decref)
+                                struct mdt_object *o, struct lustre_handle *h,
+                                enum ldlm_mode mode, int decref)
 {
        ENTRY;
 
        if (lustre_handle_is_used(h)) {
+               struct ldlm_lock *lock = ldlm_handle2lock(h);
+
+               if (o != NULL &&
+                   (lock->l_policy_data.l_inodebits.bits &
+                    (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)))
+                       mo_invalidate(info->mti_env, mdt_object_child(o));
+
                if (decref || !info->mti_has_trans ||
                    !(mode & (LCK_PW | LCK_EX))) {
                        ldlm_lock_decref_and_cancel(h, mode);
+                       LDLM_LOCK_PUT(lock);
                } else {
-                       struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
 
                        LASSERT(req != NULL);
@@ -2784,7 +2831,8 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
 
        mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
        mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
-       mdt_save_remote_lock(info, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, decref);
+       mdt_save_remote_lock(info, o, &lh->mlh_rreg_lh, lh->mlh_rreg_mode,
+                            decref);
 
        EXIT;
 }
@@ -4446,6 +4494,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        struct lfsck_stop        stop;
        ENTRY;
 
+       if (m->mdt_md_root != NULL) {
+               mdt_object_put(env, m->mdt_md_root);
+               m->mdt_md_root = NULL;
+       }
+
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
@@ -4566,7 +4619,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_squash.rsi_gid = 0;
        INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
        init_rwsem(&m->mdt_squash.rsi_sem);
-       spin_lock_init(&m->mdt_osfs_lock);
+       spin_lock_init(&m->mdt_lock);
        m->mdt_osfs_age = cfs_time_shift_64(-1000);
        m->mdt_enable_remote_dir = 0;
        m->mdt_enable_remote_dir_gid = 0;
@@ -4902,8 +4955,10 @@ static int mdt_object_print(const struct lu_env *env, void *cookie,
        struct mdt_object *mdto = mdt_obj((struct lu_object *)o);
 
        return (*p)(env, cookie,
-                   LUSTRE_MDT_NAME"-object@%p(flags=%d, writecount=%d)",
-                   mdto, mdto->mot_flags, mdto->mot_write_count);
+                   LUSTRE_MDT_NAME"-object@%p(%s %s, writecount=%d)",
+                   mdto, mdto->mot_lov_created ? "lov_created" : "",
+                   mdto->mot_cache_attr ? "cache_attr" : "",
+                   mdto->mot_write_count);
 }
 
 static int mdt_prepare(const struct lu_env *env,
index 9015cc6..68dcde9 100644 (file)
@@ -199,10 +199,13 @@ struct mdt_device {
                                   mdt_skip_lfsck:1;
 
        gid_t                      mdt_enable_remote_dir_gid;
+
+       /* lock for osfs and md_root */
+       spinlock_t                 mdt_lock;
+
        /* statfs optimization: we cache a bit  */
        struct obd_statfs          mdt_osfs;
        __u64                      mdt_osfs_age;
-       spinlock_t                 mdt_osfs_lock;
 
         /* root squash */
        struct root_squash_info    mdt_squash;
@@ -222,20 +225,19 @@ struct mdt_device {
 
        /* MDT device async commit count, used for debug and sanity test */
        atomic_t                   mdt_async_commit_count;
+
+       struct mdt_object         *mdt_md_root;
 };
 
 #define MDT_SERVICE_WATCHDOG_FACTOR    (2)
 #define MDT_COS_DEFAULT         (0)
 
-enum mdt_object_flags {
-       /** lov object has been created. */
-       MOF_LOV_CREATED         = 1 << 0,
-};
-
 struct mdt_object {
        struct lu_object_header mot_header;
        struct lu_object        mot_obj;
-       enum mdt_object_flags   mot_flags;
+       unsigned int            mot_lov_created:1,  /* lov object created */
+                               mot_cache_attr:1;   /* enable remote object
+                                                    * attribute cache */
        int                     mot_write_count;
        spinlock_t              mot_write_lock;
         /* Lock to protect create_data */
@@ -626,7 +628,8 @@ void mdt_client_compatibility(struct mdt_thread_info *info);
 int mdt_remote_object_lock(struct mdt_thread_info *mti,
                           struct mdt_object *o, const struct lu_fid *fid,
                           struct lustre_handle *lh,
-                          enum ldlm_mode mode, __u64 ibits, bool nonblock);
+                          enum ldlm_mode mode, __u64 ibits, bool nonblock,
+                          bool cache);
 
 enum mdt_name_flags {
        MNF_FIX_ANON = 1,
index b305b57..7872e9c 100644 (file)
@@ -123,7 +123,7 @@ static int mdt_create_data(struct mdt_thread_info *info,
        ma->ma_need = MA_INODE | MA_LOV;
        ma->ma_valid = 0;
        mutex_lock(&o->mot_lov_mutex);
-       if (!(o->mot_flags & MOF_LOV_CREATED)) {
+       if (!o->mot_lov_created) {
                rc = mdo_create_data(info->mti_env,
                                     p ? mdt_object_child(p) : NULL,
                                     mdt_object_child(o), spec, ma);
@@ -131,7 +131,7 @@ static int mdt_create_data(struct mdt_thread_info *info,
                        rc = mdt_attr_get_complex(info, o, ma);
 
                if (rc == 0 && ma->ma_valid & MA_LOV)
-                       o->mot_flags |= MOF_LOV_CREATED;
+                       o->mot_lov_created = 1;
        }
 
        mutex_unlock(&o->mot_lov_mutex);
@@ -1172,6 +1172,48 @@ out:
         RETURN(rc);
 }
 
+/*
+ * find root object and take its xattr lock if it's on remote MDT, later create
+ * may use fs default striping (which is stored in root xattr).
+ */
+static int mdt_lock_root_xattr(struct mdt_thread_info *info,
+                              struct mdt_device *mdt)
+{
+       struct mdt_object *md_root = mdt->mdt_md_root;
+       struct lustre_handle lhroot;
+       int rc;
+
+       if (md_root == NULL) {
+               lu_root_fid(&info->mti_tmp_fid1);
+               md_root = mdt_object_find(info->mti_env, mdt,
+                                         &info->mti_tmp_fid1);
+               if (IS_ERR(md_root))
+                       return PTR_ERR(md_root);
+
+               spin_lock(&mdt->mdt_lock);
+               if (mdt->mdt_md_root != NULL)
+                       mdt_object_put(info->mti_env, mdt->mdt_md_root);
+               mdt->mdt_md_root = md_root;
+               spin_unlock(&mdt->mdt_lock);
+       }
+
+       if (md_root->mot_cache_attr || !mdt_object_remote(md_root))
+               return 0;
+
+       rc = mdt_remote_object_lock(info, md_root, mdt_object_fid(md_root),
+                                   &lhroot, LCK_PR, MDS_INODELOCK_XATTR,
+                                   false, true);
+       if (rc < 0)
+               return rc;
+
+       md_root->mot_cache_attr = 1;
+
+       /* don't cancel this lock, so that we know the cached xattr is valid. */
+       ldlm_lock_decref(&lhroot, LCK_PR);
+
+       return 0;
+}
+
 int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 {
         struct mdt_device       *mdt = info->mti_mdt;
@@ -1263,17 +1305,21 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                GOTO(out, result);
        }
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
-                GOTO(out, result = err_serious(-ENOMEM));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
+               GOTO(out, result = err_serious(-ENOMEM));
 
-        mdt_set_disposition(info, ldlm_rep,
-                            (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
+       mdt_set_disposition(info, ldlm_rep,
+                           (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
        if (!lu_name_is_valid(&rr->rr_name))
                GOTO(out, result = -EPROTO);
 
+       result = mdt_lock_root_xattr(info, mdt);
+       if (result < 0)
+               GOTO(out, result);
+
 again:
-        lh = &info->mti_lh[MDT_LH_PARENT];
+       lh = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_pdo_init(lh,
                          (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
                          &rr->rr_name);
index cfe0dd1..461675c 100644 (file)
@@ -323,7 +323,7 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti,
                        /* borrow s0_lh temporarily to do mdt unlock */
                        mdt_lock_reg_init(s0_lh, einfo->ei_mode);
                        s0_lh->mlh_rreg_lh = slave_locks->handles[i];
-                       mdt_object_unlock(mti, obj, s0_lh, decref);
+                       mdt_object_unlock(mti, NULL, s0_lh, decref);
                        slave_locks->handles[i].cookie = 0ull;
                }
        }
@@ -386,6 +386,8 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
        int rc;
        ENTRY;
 
+       memset(einfo, 0, sizeof(*einfo));
+
        rc = mdt_init_slaves(mti, obj, s0_fid);
        if (rc <= 0)
                RETURN(rc);
@@ -409,7 +411,6 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
                }
        }
 
-       memset(einfo, 0, sizeof(*einfo));
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = mode;
        einfo->ei_cb_bl = mdt_remote_blocking_ast;
@@ -752,21 +753,10 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                        GOTO(out_put, rc = -EPROTO);
 
                rc = mdt_attr_set(info, mo, ma);
-                if (rc)
-                        GOTO(out_put, rc);
-       } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) {
-               struct lu_buf *buf  = &info->mti_buf;
-
-               if (ma->ma_attr.la_valid != 0)
-                       GOTO(out_put, rc = -EPROTO);
-
-               buf->lb_buf = ma->ma_lmm;
-               buf->lb_len = ma->ma_lmm_size;
-               rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
-                                 buf, XATTR_NAME_LOV, 0);
                if (rc)
                        GOTO(out_put, rc);
-       } else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) {
+       } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
+                  (ma->ma_valid & MA_INODE)) {
                struct lu_buf *buf  = &info->mti_buf;
                struct mdt_lock_handle  *lh;
 
@@ -780,15 +770,21 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                lh = &info->mti_lh[MDT_LH_PARENT];
                mdt_lock_reg_init(lh, LCK_PW);
 
-               rc = mdt_object_lock(info, mo, lh,
-                                    MDS_INODELOCK_XATTR);
+               rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR);
                if (rc != 0)
                        GOTO(out_put, rc);
 
-               buf->lb_buf = ma->ma_lmv;
-               buf->lb_len = ma->ma_lmv_size;
-               rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
-                                 buf, XATTR_NAME_DEFAULT_LMV, 0);
+               if (ma->ma_valid & MA_LOV) {
+                       buf->lb_buf = ma->ma_lmm;
+                       buf->lb_len = ma->ma_lmm_size;
+               } else {
+                       buf->lb_buf = ma->ma_lmv;
+                       buf->lb_len = ma->ma_lmv_size;
+               }
+               rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
+                                 (ma->ma_valid & MA_LOV) ?
+                                       XATTR_NAME_LOV : XATTR_NAME_DEFAULT_LMV,
+                                 0);
 
                mdt_object_unlock(info, mo, lh, rc);
                if (rc)
@@ -1021,7 +1017,7 @@ relock:
                rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
                                            &child_lh->mlh_rreg_lh,
                                            child_lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP, false);
+                                           MDS_INODELOCK_LOOKUP, false, false);
                if (rc != ELDLM_OK)
                        GOTO(put_child, rc);
 
@@ -1275,7 +1271,7 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
                rc = mdt_remote_object_lock(info, obj,
                                            &LUSTRE_BFL_FID, lh,
                                            LCK_EX,
-                                           MDS_INODELOCK_UPDATE, false);
+                                           MDS_INODELOCK_UPDATE, false, false);
                mdt_object_put(info->mti_env, obj);
        } else {
                struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
@@ -1654,7 +1650,7 @@ out_lease:
                rc = mdt_remote_object_lock(info, msrcdir, mdt_object_fid(mold),
                                            &lh_childp->mlh_rreg_lh,
                                            lh_childp->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP, false);
+                                           MDS_INODELOCK_LOOKUP, false, false);
                if (rc != ELDLM_OK)
                        GOTO(out_unlock_list, rc);
 
@@ -1720,7 +1716,7 @@ out_lease:
                                            mdt_object_fid(mnew),
                                            &lh_tgtp->mlh_rreg_lh,
                                            lh_tgtp->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE, false);
+                                           MDS_INODELOCK_UPDATE, false, false);
                if (rc != 0) {
                        lh_tgtp = NULL;
                        GOTO(out_put_new, rc);
@@ -2032,7 +2028,7 @@ relock:
                                                    &lh_oldp->mlh_rreg_lh,
                                                    lh_oldp->mlh_rreg_mode,
                                                    MDS_INODELOCK_LOOKUP,
-                                                   false);
+                                                   false, false);
                        if (rc != ELDLM_OK)
                                GOTO(out_put_new, rc);
 
@@ -2081,7 +2077,7 @@ relock:
                                                    &lh_oldp->mlh_rreg_lh,
                                                    lh_oldp->mlh_rreg_mode,
                                                    MDS_INODELOCK_LOOKUP,
-                                                   false);
+                                                   false, false);
                        if (rc != ELDLM_OK)
                                GOTO(out_put_old, rc);
 
index 910efb9..c119fcd 100644 (file)
@@ -132,8 +132,8 @@ int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
        ldata->ld_reclen = linkea_entry_pack(ldata->ld_lee, lname, pfid);
        ldata->ld_leh->leh_len += ldata->ld_reclen;
        ldata->ld_leh->leh_reccount++;
-       CDEBUG(D_INODE, "New link_ea name '%.*s' is added\n",
-              lname->ln_namelen, lname->ln_name);
+       CDEBUG(D_INODE, "New link_ea name '"DFID":%.*s' is added\n",
+              PFID(pfid), lname->ln_namelen, lname->ln_name);
        return 0;
 }
 EXPORT_SYMBOL(linkea_add_buf);
index bf1a02d..984f052 100644 (file)
@@ -4003,6 +4003,11 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,
        RETURN(rc);
 }
 
+static int osd_invalidate(const struct lu_env *env, struct dt_object *dt)
+{
+       return 0;
+}
+
 /*
  * Index operations.
  */
@@ -4170,6 +4175,7 @@ static const struct dt_object_operations osd_obj_ops = {
         .do_xattr_del         = osd_xattr_del,
         .do_xattr_list        = osd_xattr_list,
         .do_object_sync       = osd_object_sync,
+       .do_invalidate        = osd_invalidate,
 };
 
 /**
index 7034d1d..a1d3ab7 100644 (file)
@@ -1684,6 +1684,11 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,
        RETURN(0);
 }
 
+static int osd_invalidate(const struct lu_env *env, struct dt_object *dt)
+{
+       return 0;
+}
+
 static struct dt_object_operations osd_obj_ops = {
        .do_read_lock           = osd_object_read_lock,
        .do_write_lock          = osd_object_write_lock,
@@ -1710,6 +1715,7 @@ static struct dt_object_operations osd_obj_ops = {
        .do_xattr_del           = osd_xattr_del,
        .do_xattr_list          = osd_xattr_list,
        .do_object_sync         = osd_object_sync,
+       .do_invalidate          = osd_invalidate,
 };
 
 static struct lu_object_operations osd_lu_obj_ops = {
index 9dab59c..087d1c9 100644 (file)
@@ -455,10 +455,11 @@ static void osp_last_used_fini(const struct lu_env *env, struct osp_device *osp)
  */
 static int osp_disconnect(struct osp_device *d)
 {
+       struct obd_device *obd = d->opd_obd;
        struct obd_import *imp;
        int rc = 0;
 
-       imp = d->opd_obd->u.cli.cl_import;
+       imp = obd->u.cli.cl_import;
 
        /* Mark import deactivated now, so we don't try to reconnect if any
         * of the cleanup RPCs fails (e.g. ldlm cancel, etc).  We don't
@@ -477,8 +478,7 @@ static int osp_disconnect(struct osp_device *d)
 
        rc = ptlrpc_disconnect_import(imp, 0);
        if (rc != 0)
-               CERROR("%s: can't disconnect: rc = %d\n",
-                      d->opd_obd->obd_name, rc);
+               CERROR("%s: can't disconnect: rc = %d\n", obd->obd_name, rc);
 
        ptlrpc_invalidate_import(imp);
 
@@ -636,6 +636,8 @@ static int osp_process_config(const struct lu_env *env,
        case LCFG_PRE_CLEANUP:
                rc = osp_disconnect(d);
                osp_update_fini(env, d);
+               if (obd->obd_namespace != NULL)
+                       ldlm_namespace_free_prior(obd->obd_namespace, NULL, 1);
                break;
        case LCFG_CLEANUP:
                lu_dev_del_linkage(dev->ld_site, dev);
index 64c727d..66f2e13 100644 (file)
@@ -280,11 +280,6 @@ struct osp_xattr_entry {
        char                     oxe_buf[0];
 };
 
-struct osp_object_attr {
-       struct lu_attr          ooa_attr;
-       struct list_head        ooa_xattr_list;
-};
-
 /* this is a top object */
 struct osp_object {
        struct lu_object_header opo_header;
@@ -296,7 +291,8 @@ struct osp_object {
        /* read/write lock for md osp object */
        struct rw_semaphore     opo_sem;
        const struct lu_env     *opo_owner;
-       struct osp_object_attr *opo_ooa;
+       struct lu_attr          opo_attr;
+       struct list_head        opo_xattr_list;
        /* Protect opo_ooa. */
        spinlock_t              opo_lock;
 };
@@ -733,6 +729,7 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
                          const char *name, struct thandle *th);
 int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
                  const char *name, struct thandle *th);
+int osp_invalidate(const struct lu_env *env, struct dt_object *dt);
 
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th);
@@ -748,7 +745,6 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di);
 int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
                   void *key_rec);
 int osp_it_next_page(const struct lu_env *env, struct dt_it *di);
-int osp_oac_init(struct osp_object *obj);
 /* osp_md_object.c */
 int osp_md_declare_object_create(const struct lu_env *env,
                                 struct dt_object *dt,
index 21ccacf..461df06 100644 (file)
@@ -88,16 +88,11 @@ static int osp_object_create_interpreter(const struct lu_env *env,
                obj->opo_non_exist = 1;
        }
 
-       /* Invalid the opo cache for the object after the object
-        * is being created, so attr_get will try to get attr
-        * from the remote object. XXX this can be improved when
-        * we have object lock/cache invalidate mechanism in OSP
-        * layer */
-       if (obj->opo_ooa != NULL) {
-               spin_lock(&obj->opo_lock);
-               obj->opo_ooa->ooa_attr.la_valid = 0;
-               spin_unlock(&obj->opo_lock);
-       }
+       /*
+        * invalidate opo cache for the object after the object is created, so
+        * attr_get will try to get attr from remote object.
+        */
+       osp_invalidate(env, &obj->opo_obj);
 
        return 0;
 }
@@ -125,15 +120,6 @@ int osp_md_declare_object_create(const struct lu_env *env,
                                 struct dt_object_format *dof,
                                 struct thandle *th)
 {
-       struct osp_object *obj = dt2osp_obj(dt);
-       int               rc;
-
-       if (obj->opo_ooa == NULL) {
-               rc = osp_oac_init(obj);
-               if (rc != 0)
-                       return rc;
-       }
-
        return osp_trans_update_request_create(th);
 }
 
@@ -196,8 +182,7 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
        dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
        dt2osp_obj(dt)->opo_non_exist = 0;
 
-       LASSERT(obj->opo_ooa != NULL);
-       obj->opo_ooa->ooa_attr = *attr;
+       obj->opo_attr = *attr;
 out:
        return rc;
 }
@@ -1033,6 +1018,7 @@ struct dt_object_operations osp_md_obj_ops = {
        .do_index_try         = osp_md_index_try,
        .do_object_lock       = osp_md_object_lock,
        .do_object_unlock     = osp_md_object_unlock,
+       .do_invalidate        = osp_invalidate,
 };
 
 /**
@@ -1115,10 +1101,8 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
        /* XXX: how about the write error happened later? */
        *pos += buf->lb_len;
 
-       if (obj->opo_ooa != NULL &&
-           obj->opo_ooa->ooa_attr.la_valid & LA_SIZE &&
-           obj->opo_ooa->ooa_attr.la_size < *pos)
-               obj->opo_ooa->ooa_attr.la_size = *pos;
+       if (obj->opo_attr.la_valid & LA_SIZE && obj->opo_attr.la_size < *pos)
+               obj->opo_attr.la_size = *pos;
 
        RETURN(buf->lb_len);
 }
index 96e3135..1f46f77 100644 (file)
  *     char                     oxe_buf[0];
  * };
  *
- * struct osp_object_attr {
- *     struct lu_attr          ooa_attr;
- *     struct list_head        ooa_xattr_list;
- * };
- *
  * struct osp_object {
  *     ...
- *     struct osp_object_attr *opo_ooa;
+ *     struct lu_attr          opo_attr;
+ *     struct list_head        opo_xattr_list;
  *     spinlock_t              opo_lock;
  *     ...
  * };
  *
  * The basic attributes, such as owner/mode/flags, are stored in the
- * osp_object_attr::ooa_attr. The extended attributes will be stored
+ * osp_object::opo_attr. The extended attributes will be stored
  * as osp_xattr_entry. Every extended attribute has an independent
  * osp_xattr_entry, and all the osp_xattr_entry are linked into the
- * osp_object_attr::ooa_xattr_list. The OSP object attributes cache
+ * osp_object::opo_xattr_list. The OSP object attributes cache
  * is protected by the osp_object::opo_lock.
  *
  * Not all OSP objects have an attributes cache because maintaining
@@ -164,41 +160,12 @@ static void osp_object_assign_fid(const struct lu_env *env,
 }
 
 /**
- * Initialize the OSP object attributes cache.
- *
- * \param[in] obj      pointer to the OSP object
- *
- * \retval             0 for success
- * \retval             negative error number on failure
- */
-int osp_oac_init(struct osp_object *obj)
-{
-       struct osp_object_attr *ooa;
-
-       OBD_ALLOC_PTR(ooa);
-       if (ooa == NULL)
-               return -ENOMEM;
-
-       INIT_LIST_HEAD(&ooa->ooa_xattr_list);
-       spin_lock(&obj->opo_lock);
-       if (likely(obj->opo_ooa == NULL)) {
-               obj->opo_ooa = ooa;
-               spin_unlock(&obj->opo_lock);
-       } else {
-               spin_unlock(&obj->opo_lock);
-               OBD_FREE_PTR(ooa);
-       }
-
-       return 0;
-}
-
-/**
  * Find the named extended attribute in the OSP object attributes cache.
  *
  * The caller should take the osp_object::opo_lock before calling
  * this function.
  *
- * \param[in] ooa      pointer to the OSP object attributes cache
+ * \param[in] obj      pointer to the OSP object
  * \param[in] name     the name of the extended attribute
  * \param[in] namelen  the name length of the extended attribute
  *
@@ -207,12 +174,12 @@ int osp_oac_init(struct osp_object *obj)
  *                     in the cache
  */
 static struct osp_xattr_entry *
-osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
-                         const char *name, size_t namelen)
+osp_oac_xattr_find_locked(struct osp_object *obj, const char *name,
+                         size_t namelen)
 {
        struct osp_xattr_entry *oxe;
 
-       list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
+       list_for_each_entry(oxe, &obj->opo_xattr_list, oxe_list) {
                if (namelen == oxe->oxe_namelen &&
                    strncmp(name, oxe->oxe_buf, namelen) == 0)
                        return oxe;
@@ -241,15 +208,12 @@ static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
        struct osp_xattr_entry *oxe = NULL;
 
        spin_lock(&obj->opo_lock);
-       if (obj->opo_ooa != NULL) {
-               oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
-                                               strlen(name));
-               if (oxe != NULL) {
-                       if (unlink)
-                               list_del_init(&oxe->oxe_list);
-                       else
-                               atomic_inc(&oxe->oxe_ref);
-               }
+       oxe = osp_oac_xattr_find_locked(obj, name, strlen(name));
+       if (oxe != NULL) {
+               if (unlink)
+                       list_del_init(&oxe->oxe_list);
+               else
+                       atomic_inc(&oxe->oxe_ref);
        }
        spin_unlock(&obj->opo_lock);
 
@@ -274,14 +238,11 @@ static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
 static struct osp_xattr_entry *
 osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
 {
-       struct osp_object_attr *ooa     = obj->opo_ooa;
        struct osp_xattr_entry *oxe;
        struct osp_xattr_entry *tmp     = NULL;
        size_t                  namelen = strlen(name);
        size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
 
-       LASSERT(ooa != NULL);
-
        oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe != NULL)
                return oxe;
@@ -299,9 +260,9 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
-       tmp = osp_oac_xattr_find_locked(ooa, name, namelen);
+       tmp = osp_oac_xattr_find_locked(obj, name, namelen);
        if (tmp == NULL)
-               list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+               list_add_tail(&oxe->oxe_list, &obj->opo_xattr_list);
        else
                atomic_inc(&tmp->oxe_ref);
        spin_unlock(&obj->opo_lock);
@@ -335,13 +296,10 @@ static struct osp_xattr_entry *
 osp_oac_xattr_replace(struct osp_object *obj,
                      struct osp_xattr_entry **poxe, size_t len)
 {
-       struct osp_object_attr *ooa     = obj->opo_ooa;
        struct osp_xattr_entry *oxe;
        size_t                  namelen = (*poxe)->oxe_namelen;
        size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
 
-       LASSERT(ooa != NULL);
-
        OBD_ALLOC(oxe, size);
        if (unlikely(oxe == NULL))
                return NULL;
@@ -355,11 +313,11 @@ osp_oac_xattr_replace(struct osp_object *obj,
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
-       *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen);
+       *poxe = osp_oac_xattr_find_locked(obj, oxe->oxe_buf, namelen);
        LASSERT(*poxe != NULL);
 
        list_del_init(&(*poxe)->oxe_list);
-       list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+       list_add_tail(&oxe->oxe_list, &obj->opo_xattr_list);
        spin_unlock(&obj->opo_lock);
 
        return oxe;
@@ -423,15 +381,9 @@ static int osp_get_attr_from_reply(const struct lu_env *env,
 
        lustre_get_wire_obdo(NULL, lobdo, wobdo);
        spin_lock(&obj->opo_lock);
-       if (obj->opo_ooa != NULL) {
-               la_from_obdo(&obj->opo_ooa->ooa_attr, lobdo, lobdo->o_valid);
-               if (attr != NULL)
-                       *attr = obj->opo_ooa->ooa_attr;
-       } else {
-               LASSERT(attr != NULL);
-
-               la_from_obdo(attr, lobdo, lobdo->o_valid);
-       }
+       la_from_obdo(&obj->opo_attr, lobdo, lobdo->o_valid);
+       if (attr != NULL)
+               *attr = obj->opo_attr;
        spin_unlock(&obj->opo_lock);
 
        return 0;
@@ -462,8 +414,6 @@ static int osp_attr_get_interpterer(const struct lu_env *env,
 {
        struct lu_attr *attr = data;
 
-       LASSERT(obj->opo_ooa != NULL);
-
        if (rc == 0) {
                osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
                obj->opo_non_exist = 0;
@@ -506,16 +456,9 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt)
        struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
        int                      rc     = 0;
 
-       if (obj->opo_ooa == NULL) {
-               rc = osp_oac_init(obj);
-               if (rc != 0)
-                       return rc;
-       }
-
        mutex_lock(&osp->opd_async_requests_mutex);
        rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL,
-                                     &obj->opo_ooa->ooa_attr,
-                                     sizeof(struct obdo),
+                                     &obj->opo_attr, sizeof(struct obdo),
                                      osp_attr_get_interpterer);
        mutex_unlock(&osp->opd_async_requests_mutex);
 
@@ -554,16 +497,14 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
        if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
                RETURN(-ENOENT);
 
-       if (obj->opo_ooa != NULL) {
-               spin_lock(&obj->opo_lock);
-               if (obj->opo_ooa->ooa_attr.la_valid != 0 && !obj->opo_stale) {
-                       *attr = obj->opo_ooa->ooa_attr;
-                       spin_unlock(&obj->opo_lock);
-
-                       RETURN(0);
-               }
+       spin_lock(&obj->opo_lock);
+       if (obj->opo_attr.la_valid != 0 && !obj->opo_stale) {
+               *attr = obj->opo_attr;
                spin_unlock(&obj->opo_lock);
+
+               RETURN(0);
        }
+       spin_unlock(&obj->opo_lock);
 
        update = osp_update_request_create(dev);
        if (IS_ERR(update))
@@ -606,8 +547,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc);
 
        spin_lock(&obj->opo_lock);
-       if (obj->opo_stale)
-               obj->opo_stale = 0;
+       obj->opo_stale = 0;
        spin_unlock(&obj->opo_lock);
 
        GOTO(out, rc);
@@ -729,11 +669,11 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
                CDEBUG(D_INFO, "(1) set attr "DFID": rc = %d\n",
                       PFID(&dt->do_lu.lo_header->loh_fid), rc);
 
-               if (rc != 0 || o->opo_ooa == NULL)
+               if (rc != 0)
                        RETURN(rc);
 
                /* Update the OSP object attributes cache. */
-               la = &o->opo_ooa->ooa_attr;
+               la = &o->opo_attr;
                spin_lock(&o->opo_lock);
                if (attr->la_valid & LA_UID) {
                        la->la_uid = attr->la_uid;
@@ -773,12 +713,9 @@ static int osp_xattr_get_interpterer(const struct lu_env *env,
                                     struct osp_object *obj,
                                     void *data, int index, int rc)
 {
-       struct osp_object_attr  *ooa  = obj->opo_ooa;
        struct osp_xattr_entry  *oxe  = data;
        struct lu_buf           *rbuf = &osp_env_info(env)->osi_lb2;
 
-       LASSERT(ooa != NULL);
-
        if (rc == 0) {
                size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1;
 
@@ -848,12 +785,6 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
        if (unlikely(buf->lb_len == 0))
                return 0;
 
-       if (obj->opo_ooa == NULL) {
-               rc = osp_oac_init(obj);
-               if (rc != 0)
-                       return rc;
-       }
-
        oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
        if (oxe == NULL)
                return -ENOMEM;
@@ -945,34 +876,31 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
 
-       /* Only cache xattr for OST object */
-       if (!osp->opd_connect_mdt) {
-               oxe = osp_oac_xattr_find(obj, name, false);
-               if (oxe != NULL) {
-                       spin_lock(&obj->opo_lock);
-                       if (oxe->oxe_ready) {
-                               if (!oxe->oxe_exist)
-                                       GOTO(unlock, rc = -ENODATA);
+       oxe = osp_oac_xattr_find(obj, name, false);
+       if (oxe != NULL) {
+               spin_lock(&obj->opo_lock);
+               if (oxe->oxe_ready) {
+                       if (!oxe->oxe_exist)
+                               GOTO(unlock, rc = -ENODATA);
 
-                               if (buf->lb_buf == NULL)
-                                       GOTO(unlock, rc = oxe->oxe_vallen);
+                       if (buf->lb_buf == NULL)
+                               GOTO(unlock, rc = oxe->oxe_vallen);
 
-                               if (buf->lb_len < oxe->oxe_vallen)
-                                       GOTO(unlock, rc = -ERANGE);
+                       if (buf->lb_len < oxe->oxe_vallen)
+                               GOTO(unlock, rc = -ERANGE);
 
-                               memcpy(buf->lb_buf, oxe->oxe_value,
-                                      oxe->oxe_vallen);
+                       memcpy(buf->lb_buf, oxe->oxe_value,
+                              oxe->oxe_vallen);
 
-                               GOTO(unlock, rc = oxe->oxe_vallen);
+                       GOTO(unlock, rc = oxe->oxe_vallen);
 
 unlock:
-                               spin_unlock(&obj->opo_lock);
-                               osp_oac_xattr_put(oxe);
-
-                               return rc;
-                       }
                        spin_unlock(&obj->opo_lock);
+                       osp_oac_xattr_put(oxe);
+
+                       return rc;
                }
+               spin_unlock(&obj->opo_lock);
        }
        update = osp_update_request_create(dev);
        if (IS_ERR(update))
@@ -993,9 +921,6 @@ unlock:
                        obj->opo_non_exist = 1;
                }
 
-               if (obj->opo_ooa == NULL)
-                       GOTO(out, rc);
-
                if (oxe == NULL)
                        oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
 
@@ -1041,8 +966,6 @@ unlock:
                GOTO(out, rc = -ERANGE);
 
        memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len);
-       if (obj->opo_ooa == NULL || osp->opd_connect_mdt)
-               GOTO(out, rc);
 
        if (oxe == NULL) {
                oxe = osp_oac_xattr_find_or_add(obj, name, rbuf->lb_len);
@@ -1155,7 +1078,6 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  struct thandle *th)
 {
        struct osp_object       *o = dt2osp_obj(dt);
-       struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_update_request *update;
        struct osp_xattr_entry  *oxe;
        int                     rc;
@@ -1169,7 +1091,7 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
 
        rc = osp_update_rpc_pack(env, xattr_set, update, OUT_XATTR_SET,
                                 lu_object_fid(&dt->do_lu), buf, name, fl);
-       if (rc != 0 || o->opo_ooa == NULL || osp->opd_connect_mdt)
+       if (rc != 0)
                RETURN(rc);
 
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
@@ -1271,7 +1193,7 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
 
        rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL,
                                 fid, name);
-       if (rc != 0 || o->opo_ooa == NULL)
+       if (rc != 0)
                return rc;
 
        oxe = osp_oac_xattr_find(o, name, true);
@@ -1283,6 +1205,37 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
 }
 
 /**
+ * Implement OSP layer dt_object_operations::do_invalidate() interface.
+ *
+ * Invalidate attributes cached on the specified MDT/OST object.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] dt       pointer to the OSP layer dt_object
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+int osp_invalidate(const struct lu_env *env, struct dt_object *dt)
+{
+       struct osp_object *obj = dt2osp_obj(dt);
+       struct osp_xattr_entry *oxe;
+       struct osp_xattr_entry *tmp;
+       ENTRY;
+
+       spin_lock(&obj->opo_lock);
+       list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
+               oxe->oxe_ready = 0;
+               list_del_init(&oxe->oxe_list);
+               osp_oac_xattr_put(oxe);
+       }
+       obj->opo_attr.la_valid = 0;
+       obj->opo_stale = 1;
+       spin_unlock(&obj->opo_lock);
+
+       RETURN(0);
+}
+
+/**
  * Implement OSP layer dt_object_operations::do_declare_create() interface.
  *
  * Declare that the caller will create the OST object.
@@ -2133,6 +2086,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
 
        spin_lock_init(&po->opo_lock);
        o->lo_header->loh_attr |= LOHA_REMOTE;
+       INIT_LIST_HEAD(&po->opo_xattr_list);
 
        if (is_ost_obj(o)) {
                po->opo_obj.do_ops = &osp_obj_ops;
@@ -2141,6 +2095,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
 
                po->opo_obj.do_ops = &osp_md_obj_ops;
                po->opo_obj.do_body_ops = &osp_md_body_ops;
+
                if (conf != NULL && conf->loc_flags & LOC_F_NEW) {
                        po->opo_non_exist = 1;
                } else {
@@ -2174,26 +2129,20 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o)
 {
        struct osp_object       *obj = lu2osp_obj(o);
        struct lu_object_header *h = o->lo_header;
+       struct osp_xattr_entry *oxe;
+       struct osp_xattr_entry *tmp;
+       int                     count;
 
        dt_object_fini(&obj->opo_obj);
        lu_object_header_fini(h);
-       if (obj->opo_ooa != NULL) {
-               struct osp_xattr_entry *oxe;
-               struct osp_xattr_entry *tmp;
-               int                     count;
-
-               list_for_each_entry_safe(oxe, tmp,
-                                        &obj->opo_ooa->ooa_xattr_list,
-                                        oxe_list) {
-                       list_del(&oxe->oxe_list);
-                       count = atomic_read(&oxe->oxe_ref);
-                       LASSERTF(count == 1,
-                                "Still has %d users on the xattr entry %.*s\n",
-                                count-1, (int)oxe->oxe_namelen, oxe->oxe_buf);
-
-                       OBD_FREE(oxe, oxe->oxe_buflen);
-               }
-               OBD_FREE_PTR(obj->opo_ooa);
+       list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
+               list_del(&oxe->oxe_list);
+               count = atomic_read(&oxe->oxe_ref);
+               LASSERTF(count == 1,
+                        "Still has %d users on the xattr entry %.*s\n",
+                        count-1, (int)oxe->oxe_namelen, oxe->oxe_buf);
+
+               OBD_FREE(oxe, oxe->oxe_buflen);
        }
        OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
 }
index 7bb3bea..4d2adc6 100644 (file)
@@ -491,7 +491,6 @@ static void osp_thandle_invalidate_object(const struct lu_env *env,
                struct object_update_request *our_req = ours->ours_req;
                unsigned int i;
                struct lu_object *obj;
-               struct osp_object *osp_obj;
 
                for (i = 0; i < our_req->ourq_count; i++) {
                        struct object_update *update;
@@ -512,13 +511,7 @@ static void osp_thandle_invalidate_object(const struct lu_env *env,
                        if (IS_ERR(obj))
                                break;
 
-                       osp_obj = lu2osp_obj(obj);
-                       if (osp_obj->opo_ooa != NULL) {
-                               spin_lock(&osp_obj->opo_lock);
-                               osp_obj->opo_ooa->ooa_attr.la_valid = 0;
-                               osp_obj->opo_stale = 1;
-                               spin_unlock(&osp_obj->opo_lock);
-                       }
+                       osp_invalidate(env, lu2dt_obj(obj));
                        lu_object_put(env, obj);
                }
        }
index dfc8ba7..6618b42 100644 (file)
@@ -13162,7 +13162,7 @@ test_230f() {
 
        # a should be migrated to MDT1, since no other links on MDT0
        $LFS migrate -m 1 $DIR/$tdir/migrate_dir ||
-               error "migrate dir fails"
+               error "#1 migrate dir fails"
        mdt_index=$($LFS getstripe -M $DIR/$tdir/migrate_dir)
        [ $mdt_index == 1 ] || error "migrate_dir is not on MDT1"
        mdt_index=$($LFS getstripe -M $DIR/$tdir/migrate_dir/a)
@@ -13170,12 +13170,12 @@ test_230f() {
 
        # a should stay on MDT1, because it is a mulitple link file
        $LFS migrate -m 0 $DIR/$tdir/migrate_dir ||
-               error "migrate dir fails"
+               error "#2 migrate dir fails"
        mdt_index=$($LFS getstripe -M $DIR/$tdir/migrate_dir/a)
        [ $mdt_index == 1 ] || error "a is not on MDT1"
 
        $LFS migrate -m 1 $DIR/$tdir/migrate_dir ||
-               error "migrate dir fails"
+               error "#3 migrate dir fails"
 
        a_fid=$($LFS path2fid $DIR/$tdir/migrate_dir/a)
        ln_fid=$($LFS path2fid $DIR/$tdir/other_dir/ln1)
@@ -13186,7 +13186,7 @@ test_230f() {
 
        # a should be migrated to MDT0, since no other links on MDT1
        $LFS migrate -m 0 $DIR/$tdir/migrate_dir ||
-               error "migrate dir fails"
+               error "#4 migrate dir fails"
        mdt_index=$($LFS getstripe -M $DIR/$tdir/migrate_dir/a)
        [ $mdt_index == 0 ] || error "a is not on MDT0"
 
@@ -14901,6 +14901,84 @@ test_405() {
 }
 run_test 405 "Various layout swap lock tests"
 
+test_406() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       [ $OSTCOUNT -lt 2 ] && skip "needs >= 2 OSTs" && return
+       [ -n "$FILESET" ] && skip "SKIP due to FILESET set" && return
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.8.50) ] &&
+               skip "Need MDS version at least 2.8.50" && return
+
+       local def_stripenr=$($GETSTRIPE -c $MOUNT)
+       local def_stripe_size=$($GETSTRIPE -S $MOUNT)
+       local def_stripe_offset=$($GETSTRIPE -i $MOUNT)
+       local def_pool=$($GETSTRIPE -p $MOUNT)
+
+       local test_pool=$TESTNAME
+       pool_add $test_pool || error "pool_add failed"
+       pool_add_targets $test_pool 0 $(($OSTCOUNT - 1)) 1 ||
+               error "pool_add_targets failed"
+
+       # parent set default stripe count only, child will stripe from both
+       # parent and fs default
+       $SETSTRIPE -c 1 -i 1 -S $((def_stripe_size * 2)) -p $test_pool $MOUNT ||
+               error "setstripe $MOUNT failed"
+       $LFS mkdir -c $MDSCOUNT $DIR/$tdir || error "mkdir $tdir failed"
+       $SETSTRIPE -c $OSTCOUNT $DIR/$tdir || error "setstripe $tdir failed"
+       for i in $(seq 10); do
+               local f=$DIR/$tdir/$tfile.$i
+               touch $f || error "touch failed"
+               local count=$($GETSTRIPE -c $f)
+               [ $count -eq $OSTCOUNT ] ||
+                       error "$f stripe count $count != $OSTCOUNT"
+               local offset=$($GETSTRIPE -i $f)
+               [ $offset -eq 1 ] || error "$f stripe offset $offset != 1"
+               local size=$($GETSTRIPE -S $f)
+               [ $size -eq $((def_stripe_size * 2)) ] ||
+                       error "$f stripe size $size != $((def_stripe_size * 2))"
+               local pool=$($GETSTRIPE -p $f)
+               [ $pool == $test_pool ] || error "$f pool $pool != $test_pool"
+       done
+
+       # change fs default striping, delete parent default striping, now child
+       # will stripe from new fs default striping only
+       $SETSTRIPE -c 1 -S $def_stripe_size -i 0 $MOUNT ||
+               error "change $MOUNT default stripe failed"
+       $SETSTRIPE -c 0 $DIR/$tdir || error "delete $tdir default stripe failed"
+       for i in $(seq 11 20); do
+               local f=$DIR/$tdir/$tfile.$i
+               touch $f || error "touch $f failed"
+               local count=$($GETSTRIPE -c $f)
+               [ $count -eq 1 ] || error "$f stripe count $count != 1"
+               local offset=$($GETSTRIPE -i $f)
+               [ $offset -eq 0 ] || error "$f stripe offset $offset != 0"
+               local size=$($GETSTRIPE -S $f)
+               [ $size -eq $def_stripe_size ] ||
+                       error "$f stripe size $size != $def_stripe_size"
+               local pool=$($GETSTRIPE -p $f)
+               [ "#$pool" == "#" ] || error "$f pool $pool is set"
+
+       done
+
+       unlinkmany $DIR/$tdir/$tfile. 1 20
+
+       # restore FS default striping
+       if [ -z $def_pool ]; then
+               $SETSTRIPE -c $def_stripenr -S $def_stripe_size \
+                       -i $def_stripe_offset $MOUNT ||
+                       error "restore default striping failed"
+       else
+               $SETSTRIPE -c $def_stripenr -S $def_stripe_size -p $def_pool \
+                       -i $def_stripe_offset $MOUNT ||
+                       error "restore default striping with $def_pool failed"
+       fi
+
+       local f=$DIR/$tdir/$tfile
+       pool_remove_all_targets $test_pool $f
+       pool_remove $test_pool $f
+}
+run_test 406 "DNE support fs default striping"
+
 #
 # tests that do cleanup/setup should be run at the end
 #