Whamcloud - gitweb
LU-5814 lov: add cl_object_layout_get() 80/13680/11
authorJohn L. Hammond <john.hammond@intel.com>
Thu, 26 Mar 2015 03:56:13 +0000 (20:56 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 9 Apr 2015 03:26:41 +0000 (03:26 +0000)
Add cl_object_layout_get() to return the layout and generation of an
object. Replace some direct accesses to object LSM with calls to this
function.

In ll_getxattr() factor out the LOV xattr specific handling into a new
function ll_getxattr_lov() which calls cl_object_layout_get(). In
ll_listxattr() call ll_getxattr_lov() to determine if a lustre.lov
xattr should be emitted.  Add lov_lsm_pack() to generate LOV xattrs
from a LSM.

Remove the unused functions ccc_inode_lsm_{get,put}() and
lov_lsm_get().

Signed-off-by: John L. Hammond <john.hammond@intel.com>
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: If0d7696473848c8eace3c5b03c016514bc28509f
Reviewed-on: http://review.whamcloud.com/13680
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
12 files changed:
lustre/include/cl_object.h
lustre/llite/file.c
lustre/llite/lcommon_cl.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/vvp_internal.h
lustre/llite/vvp_object.c
lustre/llite/xattr.c
lustre/lov/lov_internal.h
lustre/lov/lov_object.c
lustre/lov/lov_pack.c
lustre/obdclass/cl_object.c

index 63be0fc..c32bb0c 100644 (file)
@@ -305,6 +305,24 @@ enum {
        OBJECT_CONF_WAIT = 2
 };
 
        OBJECT_CONF_WAIT = 2
 };
 
+enum {
+       CL_LAYOUT_GEN_NONE      = (u32)-2,      /* layout lock was cancelled */
+       CL_LAYOUT_GEN_EMPTY     = (u32)-1,      /* for empty layout */
+};
+
+struct cl_layout {
+       /** the buffer to return the layout in lov_mds_md format. */
+       struct lu_buf   cl_buf;
+       /** size of layout in lov_mds_md format. */
+       size_t          cl_size;
+       /** Layout generation. */
+       u32             cl_layout_gen;
+       /** True if this is a released file.
+        * Temporarily added for released file truncate in ll_setattr_raw().
+        * It will be removed later. -Jinshan */
+       bool            cl_is_released;
+};
+
 /**
  * Operations implemented for each cl object layer.
  *
 /**
  * Operations implemented for each cl object layer.
  *
@@ -427,6 +445,11 @@ struct cl_object_operations {
         */
        int (*coo_data_version)(const struct lu_env *env, struct cl_object *obj,
                                __u64 *version, int flags);
         */
        int (*coo_data_version)(const struct lu_env *env, struct cl_object *obj,
                                __u64 *version, int flags);
+       /**
+        * Get layout and generation of the object.
+        */
+       int (*coo_layout_get)(const struct lu_env *env, struct cl_object *obj,
+                             struct cl_layout *layout);
 };
 
 /**
 };
 
 /**
@@ -2207,6 +2230,8 @@ int cl_object_obd_info_get(const struct lu_env *env, struct cl_object *obj,
                           struct ptlrpc_request_set *set);
 int cl_object_data_version(const struct lu_env *env, struct cl_object *obj,
                           __u64 *version, int flags);
                           struct ptlrpc_request_set *set);
 int cl_object_data_version(const struct lu_env *env, struct cl_object *obj,
                           __u64 *version, int flags);
+int cl_object_layout_get(const struct lu_env *env, struct cl_object *obj,
+                        struct cl_layout *cl);
 
 /**
  * Returns true, iff \a o0 and \a o1 are slices of the same object.
 
 /**
  * Returns true, iff \a o0 and \a o1 are slices of the same object.
index 0315ebd..b7d5689 100644 (file)
@@ -3548,48 +3548,54 @@ ll_iocontrol_call(struct inode *inode, struct file *file,
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
+       struct cl_object *obj = lli->lli_clob;
        struct cl_env_nest nest;
        struct lu_env *env;
        struct cl_env_nest nest;
        struct lu_env *env;
-       int result;
+       int rc;
        ENTRY;
 
        ENTRY;
 
-       if (lli->lli_clob == NULL)
+       if (obj == NULL)
                RETURN(0);
 
        env = cl_env_nested_get(&nest);
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));
 
                RETURN(0);
 
        env = cl_env_nested_get(&nest);
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));
 
-       result = cl_conf_set(env, lli->lli_clob, conf);
-       cl_env_nested_put(&nest, env);
+       rc = cl_conf_set(env, lli->lli_clob, conf);
+       if (rc < 0)
+               GOTO(out, rc);
 
        if (conf->coc_opc == OBJECT_CONF_SET) {
                struct ldlm_lock *lock = conf->coc_lock;
 
        if (conf->coc_opc == OBJECT_CONF_SET) {
                struct ldlm_lock *lock = conf->coc_lock;
+               struct cl_layout cl = {
+                       .cl_layout_gen = 0,
+               };
 
                LASSERT(lock != NULL);
                LASSERT(ldlm_has_layout(lock));
 
                LASSERT(lock != NULL);
                LASSERT(ldlm_has_layout(lock));
-               if (result == 0) {
-                       struct lustre_md *md = conf->u.coc_md;
-                       __u32 gen = LL_LAYOUT_GEN_EMPTY;
-
-                       /* it can only be allowed to match after layout is
-                        * applied to inode otherwise false layout would be
-                        * seen. Applying layout shoud happen before dropping
-                        * the intent lock. */
-                       ldlm_lock_allow_match(lock);
-
-                       lli->lli_has_smd = lsm_has_objects(md->lsm);
-                       if (md->lsm != NULL)
-                               gen = md->lsm->lsm_layout_gen;
-
-                       CDEBUG(D_VFSTRACE,
-                              DFID ": layout version change: %u -> %u\n",
-                              PFID(&lli->lli_fid), ll_layout_version_get(lli),
-                              gen);
-                       ll_layout_version_set(lli, gen);
-               }
+
+               /* it can only be allowed to match after layout is
+                * applied to inode otherwise false layout would be
+                * seen. Applying layout shoud happen before dropping
+                * the intent lock. */
+               ldlm_lock_allow_match(lock);
+
+               rc = cl_object_layout_get(env, obj, &cl);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               CDEBUG(D_VFSTRACE,
+                      DFID": layout version change: %u -> %u\n",
+                      PFID(&lli->lli_fid), ll_layout_version_get(lli),
+                      cl.cl_layout_gen);
+               ll_layout_version_set(lli, cl.cl_layout_gen);
+               lli->lli_has_smd = lsm_has_objects(conf->u.coc_md->lsm);
        }
        }
-       RETURN(result);
+
+out:
+       cl_env_nested_put(&nest, env);
+
+       RETURN(rc);
 }
 
 /* Fetch layout from MDT with getxattr request, if it's not ready yet */
 }
 
 /* Fetch layout from MDT with getxattr request, if it's not ready yet */
@@ -3665,7 +3671,7 @@ out:
  * in this function.
  */
 static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
  * in this function.
  */
 static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
-                               struct inode *inode, __u32 *gen, bool reconf)
+                             struct inode *inode)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_sb_info    *sbi = ll_i2sbi(inode);
 {
        struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_sb_info    *sbi = ll_i2sbi(inode);
@@ -3683,8 +3689,8 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
        LASSERT(lock != NULL);
        LASSERT(ldlm_has_layout(lock));
 
        LASSERT(lock != NULL);
        LASSERT(ldlm_has_layout(lock));
 
-       LDLM_DEBUG(lock, "file "DFID"(%p) being reconfigured: %d",
-                  PFID(&lli->lli_fid), inode, reconf);
+       LDLM_DEBUG(lock, "file "DFID"(%p) being reconfigured",
+                  PFID(&lli->lli_fid), inode);
 
        /* in case this is a caching lock and reinstate with new inode */
        md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
 
        /* in case this is a caching lock and reinstate with new inode */
        md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
@@ -3695,16 +3701,8 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
        /* checking lvb_ready is racy but this is okay. The worst case is
         * that multi processes may configure the file on the same time. */
 
        /* checking lvb_ready is racy but this is okay. The worst case is
         * that multi processes may configure the file on the same time. */
 
-       if (lvb_ready || !reconf) {
-               rc = -ENODATA;
-               if (lvb_ready) {
-                       /* layout_gen must be valid if layout lock is not
-                        * cancelled and stripe has already set */
-                       *gen = ll_layout_version_get(lli);
-                       rc = 0;
-               }
-               GOTO(out, rc);
-       }
+       if (lvb_ready)
+               GOTO(out, rc = 0);
 
        rc = ll_layout_fetch(inode, lock);
        if (rc < 0)
 
        rc = ll_layout_fetch(inode, lock);
        if (rc < 0)
@@ -3717,19 +3715,18 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
        if (lock->l_lvb_data != NULL) {
                rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
                                  lock->l_lvb_data, lock->l_lvb_len);
        if (lock->l_lvb_data != NULL) {
                rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
                                  lock->l_lvb_data, lock->l_lvb_len);
-               if (rc >= 0) {
-                       *gen = LL_LAYOUT_GEN_EMPTY;
-                       if (md.lsm != NULL)
-                               *gen = md.lsm->lsm_layout_gen;
-                       rc = 0;
-               } else {
+               if (rc < 0) {
                        CERROR("%s: file "DFID" unpackmd error: %d\n",
                                ll_get_fsname(inode->i_sb, NULL, 0),
                                PFID(&lli->lli_fid), rc);
                        CERROR("%s: file "DFID" unpackmd error: %d\n",
                                ll_get_fsname(inode->i_sb, NULL, 0),
                                PFID(&lli->lli_fid), rc);
+                       GOTO(out, rc);
                }
                }
+
+               LASSERTF(md.lsm != NULL, "lvb_data = %p, lvb_len = %u\n",
+                        lock->l_lvb_data, lock->l_lvb_len);
+
+               rc = 0;
        }
        }
-       if (rc < 0)
-               GOTO(out, rc);
 
        /* set layout to file. Unlikely this will fail as old layout was
         * surely eliminated */
 
        /* set layout to file. Unlikely this will fail as old layout was
         * surely eliminated */
@@ -3771,20 +3768,7 @@ out:
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-/**
- * This function checks if there exists a LAYOUT lock on the client side,
- * or enqueues it if it doesn't have one in cache.
- *
- * This function will not hold layout lock so it may be revoked any time after
- * this function returns. Any operations depend on layout should be redone
- * in that case.
- *
- * This function should be called before lov_io_init() to get an uptodate
- * layout version, the caller should save the version number and after IO
- * is finished, this function should be called again to verify that layout
- * is not changed during IO time.
- */
-int ll_layout_refresh(struct inode *inode, __u32 *gen)
+static int ll_layout_refresh_locked(struct inode *inode)
 {
        struct ll_inode_info  *lli = ll_i2info(inode);
        struct ll_sb_info     *sbi = ll_i2sbi(inode);
 {
        struct ll_inode_info  *lli = ll_i2info(inode);
        struct ll_sb_info     *sbi = ll_i2sbi(inode);
@@ -3801,37 +3785,23 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        int rc;
        ENTRY;
 
        int rc;
        ENTRY;
 
-       *gen = ll_layout_version_get(lli);
-       if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK) || *gen != LL_LAYOUT_GEN_NONE)
-               RETURN(0);
-
-       /* sanity checks */
-       LASSERT(fid_is_sane(ll_inode2fid(inode)));
-       LASSERT(S_ISREG(inode->i_mode));
-
-       /* take layout lock mutex to enqueue layout lock exclusively. */
-       mutex_lock(&lli->lli_layout_mutex);
-
 again:
        /* mostly layout lock is caching on the local side, so try to match
         * it before grabbing layout lock mutex. */
        mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
                               LCK_CR | LCK_CW | LCK_PR | LCK_PW);
        if (mode != 0) { /* hit cached lock */
 again:
        /* mostly layout lock is caching on the local side, so try to match
         * it before grabbing layout lock mutex. */
        mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
                               LCK_CR | LCK_CW | LCK_PR | LCK_PW);
        if (mode != 0) { /* hit cached lock */
-               rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+               rc = ll_layout_lock_set(&lockh, mode, inode);
                if (rc == -EAGAIN)
                        goto again;
 
                if (rc == -EAGAIN)
                        goto again;
 
-               mutex_unlock(&lli->lli_layout_mutex);
                RETURN(rc);
        }
 
        op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
                                     0, 0, LUSTRE_OPC_ANY, NULL);
                RETURN(rc);
        }
 
        op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
                                     0, 0, LUSTRE_OPC_ANY, NULL);
-       if (IS_ERR(op_data)) {
-               mutex_unlock(&lli->lli_layout_mutex);
+       if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));
                RETURN(PTR_ERR(op_data));
-       }
 
        /* have to enqueue one */
        memset(&it, 0, sizeof(it));
 
        /* have to enqueue one */
        memset(&it, 0, sizeof(it));
@@ -3856,10 +3826,51 @@ again:
        if (rc == 0) {
                /* set lock data in case this is a new lock */
                ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
        if (rc == 0) {
                /* set lock data in case this is a new lock */
                ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
-               rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+               rc = ll_layout_lock_set(&lockh, mode, inode);
                if (rc == -EAGAIN)
                        goto again;
        }
                if (rc == -EAGAIN)
                        goto again;
        }
+
+       RETURN(rc);
+}
+
+/**
+ * This function checks if there exists a LAYOUT lock on the client side,
+ * or enqueues it if it doesn't have one in cache.
+ *
+ * This function will not hold layout lock so it may be revoked any time after
+ * this function returns. Any operations depend on layout should be redone
+ * in that case.
+ *
+ * This function should be called before lov_io_init() to get an uptodate
+ * layout version, the caller should save the version number and after IO
+ * is finished, this function should be called again to verify that layout
+ * is not changed during IO time.
+ */
+int ll_layout_refresh(struct inode *inode, __u32 *gen)
+{
+       struct ll_inode_info    *lli = ll_i2info(inode);
+       struct ll_sb_info       *sbi = ll_i2sbi(inode);
+       int rc;
+       ENTRY;
+
+       *gen = ll_layout_version_get(lli);
+       if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK) || *gen != CL_LAYOUT_GEN_NONE)
+               RETURN(0);
+
+       /* sanity checks */
+       LASSERT(fid_is_sane(ll_inode2fid(inode)));
+       LASSERT(S_ISREG(inode->i_mode));
+
+       /* take layout lock mutex to enqueue layout lock exclusively. */
+       mutex_lock(&lli->lli_layout_mutex);
+
+       rc = ll_layout_refresh_locked(inode);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       *gen = ll_layout_version_get(lli);
+out:
        mutex_unlock(&lli->lli_layout_mutex);
 
        RETURN(rc);
        mutex_unlock(&lli->lli_layout_mutex);
 
        RETURN(rc);
index e693fa0..53b5d4e 100644 (file)
@@ -296,21 +296,3 @@ __u32 cl_fid_build_gen(const struct lu_fid *fid)
         gen = (fid_flatten(fid) >> 32);
         RETURN(gen);
 }
         gen = (fid_flatten(fid) >> 32);
         RETURN(gen);
 }
-
-/* lsm is unreliable after hsm implementation as layout can be changed at
- * any time. This is only to support old, non-clio-ized interfaces. It will
- * cause deadlock if clio operations are called with this extra layout refcount
- * because in case the layout changed during the IO, ll_layout_refresh() will
- * have to wait for the refcount to become zero to destroy the older layout.
- *
- * Notice that the lsm returned by this function may not be valid unless called
- * inside layout lock - MDS_INODELOCK_LAYOUT. */
-struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode)
-{
-       return lov_lsm_get(ll_i2info(inode)->lli_clob);
-}
-
-void inline ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm)
-{
-       lov_lsm_put(ll_i2info(inode)->lli_clob, lsm);
-}
index ecc17ad..36be655 100644 (file)
@@ -1493,11 +1493,6 @@ struct if_quotactl_18 {
 /* End compatibility for old (1.8) compiled userspace quota code */
 #endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0) */
 
 /* End compatibility for old (1.8) compiled userspace quota code */
 #endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0) */
 
-enum {
-       LL_LAYOUT_GEN_NONE  = ((__u32)-2),      /* layout lock was cancelled */
-       LL_LAYOUT_GEN_EMPTY = ((__u32)-1)       /* for empty layout */
-};
-
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
 int ll_layout_refresh(struct inode *inode, __u32 *gen);
 int ll_layout_restore(struct inode *inode, loff_t start, __u64 length);
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
 int ll_layout_refresh(struct inode *inode, __u32 *gen);
 int ll_layout_restore(struct inode *inode, loff_t start, __u64 length);
index 29d8633..4e7cf0c 100644 (file)
@@ -966,7 +966,7 @@ void ll_lli_init(struct ll_inode_info *lli)
        spin_lock_init(&lli->lli_agl_lock);
        lli->lli_has_smd = false;
        spin_lock_init(&lli->lli_layout_lock);
        spin_lock_init(&lli->lli_agl_lock);
        lli->lli_has_smd = false;
        spin_lock_init(&lli->lli_layout_lock);
-       ll_layout_version_set(lli, LL_LAYOUT_GEN_NONE);
+       ll_layout_version_set(lli, CL_LAYOUT_GEN_NONE);
        lli->lli_clob = NULL;
 
        init_rwsem(&lli->lli_xattrs_list_rwsem);
        lli->lli_clob = NULL;
 
        init_rwsem(&lli->lli_xattrs_list_rwsem);
@@ -1651,14 +1651,29 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr, bool hsm_import)
         * but other attributes must be set
         */
        if (S_ISREG(inode->i_mode)) {
         * but other attributes must be set
         */
        if (S_ISREG(inode->i_mode)) {
-               struct lov_stripe_md *lsm;
+               struct cl_layout cl = {
+                       .cl_is_released = false,
+               };
+               struct lu_env *env;
+               int refcheck;
                __u32 gen;
 
                __u32 gen;
 
-               ll_layout_refresh(inode, &gen);
-               lsm = ccc_inode_lsm_get(inode);
-               if (lsm && lsm->lsm_pattern & LOV_PATTERN_F_RELEASED)
-                       file_is_released = true;
-               ccc_inode_lsm_put(inode, lsm);
+               rc = ll_layout_refresh(inode, &gen);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               /* XXX: the only place we need to know the layout type,
+                * this will be removed by a later patch. -Jinshan */
+               env = cl_env_get(&refcheck);
+               if (IS_ERR(env))
+                       GOTO(out, rc = PTR_ERR(env));
+
+               rc = cl_object_layout_get(env, lli->lli_clob, &cl);
+               cl_env_put(env, &refcheck);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               file_is_released = cl.cl_is_released;
 
                if (!hsm_import && attr->ia_valid & ATTR_SIZE) {
                        if (file_is_released) {
 
                if (!hsm_import && attr->ia_valid & ATTR_SIZE) {
                        if (file_is_released) {
index 7d98e49..4bd6554 100644 (file)
@@ -339,20 +339,8 @@ static inline struct vvp_lock *cl2vvp_lock(const struct cl_lock_slice *slice)
        ((void)sizeof(env), (void)sizeof(clob), (void)sizeof !!(expr))
 #endif /* CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
 
        ((void)sizeof(env), (void)sizeof(clob), (void)sizeof !!(expr))
 #endif /* CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
 
-/**
- * New interfaces to get and put lov_stripe_md from lov layer. This violates
- * layering because lov_stripe_md is supposed to be a private data in lov.
- *
- * NB: If you find you have to use these interfaces for your new code, please
- * think about it again. These interfaces may be removed in the future for
- * better layering. */
-struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj);
-void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm);
 int lov_read_and_clear_async_rc(struct cl_object *clob);
 
 int lov_read_and_clear_async_rc(struct cl_object *clob);
 
-struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode);
-void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm);
-
 int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
                struct cl_io *io);
 int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io);
 int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
                struct cl_io *io);
 int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io);
index 9e723e2..29bba00 100644 (file)
@@ -137,7 +137,7 @@ static int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
                CDEBUG(D_VFSTRACE, DFID ": losing layout lock\n",
                       PFID(&lli->lli_fid));
 
                CDEBUG(D_VFSTRACE, DFID ": losing layout lock\n",
                       PFID(&lli->lli_fid));
 
-               ll_layout_version_set(lli, LL_LAYOUT_GEN_NONE);
+               ll_layout_version_set(lli, CL_LAYOUT_GEN_NONE);
 
                /* Clean up page mmap for this inode.
                 * The reason for us to do this is that if the page has
 
                /* Clean up page mmap for this inode.
                 * The reason for us to do this is that if the page has
index 18429ab..9fea17e 100644 (file)
@@ -451,160 +451,150 @@ out:
         return rc;
 }
 
         return rc;
 }
 
-ssize_t ll_getxattr(struct dentry *dentry, const char *name,
-                    void *buffer, size_t size)
+static ssize_t ll_getxattr_lov(struct inode *inode, void *buf, size_t buf_size)
 {
 {
-        struct inode *inode = dentry->d_inode;
+       ssize_t rc;
 
 
-        LASSERT(inode);
-        LASSERT(name);
+       if (S_ISREG(inode->i_mode)) {
+               struct cl_object *obj = ll_i2info(inode)->lli_clob;
+               struct lu_env *env;
+               struct cl_layout cl = {
+                       .cl_buf.lb_buf = buf,
+                       .cl_buf.lb_len = buf_size,
+               };
+               int refcheck;
+
+               if (obj == NULL)
+                       RETURN(-ENODATA);
+
+               env = cl_env_get(&refcheck);
+               if (IS_ERR(env))
+                       RETURN(PTR_ERR(env));
+
+               rc = cl_object_layout_get(env, obj, &cl);
+               if (rc < 0)
+                       GOTO(out_env, rc);
 
 
-       CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), xattr %s\n",
-              PFID(ll_inode2fid(inode)), inode, name);
+               if (cl.cl_size == 0)
+                       GOTO(out_env, rc = -ENODATA);
 
 
-        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETXATTR, 1);
+               rc = cl.cl_size;
 
 
-        if ((strncmp(name, XATTR_TRUSTED_PREFIX,
-                     sizeof(XATTR_TRUSTED_PREFIX) - 1) == 0 &&
-             strcmp(name + sizeof(XATTR_TRUSTED_PREFIX) - 1, "lov") == 0) ||
-            (strncmp(name, XATTR_LUSTRE_PREFIX,
-                     sizeof(XATTR_LUSTRE_PREFIX) - 1) == 0 &&
-             strcmp(name + sizeof(XATTR_LUSTRE_PREFIX) - 1, "lov") == 0)) {
-               struct lov_stripe_md *lsm;
-                struct lov_user_md *lump;
-                struct lov_mds_md *lmm = NULL;
-                struct ptlrpc_request *request = NULL;
-                int rc = 0, lmmsize = 0;
-
-                if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
-                        return -ENODATA;
-
-               lsm = ccc_inode_lsm_get(inode);
-               if (lsm == NULL) {
-                       if (S_ISDIR(inode->i_mode)) {
-                               rc = ll_dir_getstripe(inode, (void **)&lmm,
-                                                     &lmmsize, &request, 0);
-                       } else {
-                               rc = -ENODATA;
-                       }
-               } else {
-                       /* LSM is present already after lookup/getattr call.
-                        * we need to grab layout lock once it is implemented */
-                       rc = obd_packmd(ll_i2dtexp(inode), &lmm, lsm);
-                       lmmsize = rc;
-               }
-               ccc_inode_lsm_put(inode, lsm);
-
-                if (rc < 0)
-                       GOTO(out, rc);
-
-                if (size == 0) {
-                        /* used to call ll_get_max_mdsize() forward to get
-                         * the maximum buffer size, while some apps (such as
-                         * rsync 3.0.x) care much about the exact xattr value
-                         * size */
-                        rc = lmmsize;
-                        GOTO(out, rc);
-                }
+               if (buf_size == 0)
+                       GOTO(out_env, rc);
 
 
-                if (size < lmmsize) {
-                        CERROR("server bug: replied size %d > %d for %s (%s)\n",
-                               lmmsize, (int)size, dentry->d_name.name, name);
-                        GOTO(out, rc = -ERANGE);
-                }
+               LASSERT(buf != NULL && rc <= buf_size);
 
 
-                lump = (struct lov_user_md *)buffer;
-                memcpy(lump, lmm, lmmsize);
-               /* do not return layout gen for getxattr otherwise it would
-                * confuse tar --xattr by recognizing layout gen as stripe
-                * offset when the file is restored. See LU-2809. */
-               lump->lmm_layout_gen = 0;
+               /* Do not return layout gen for getxattr() since
+                * otherwise it would confuse tar --xattr by
+                * recognizing layout gen as stripe offset when the
+                * file is restored. See LU-2809. */
+               ((struct lov_mds_md *)buf)->lmm_layout_gen = 0;
+out_env:
+               cl_env_put(env, &refcheck);
 
 
-                rc = lmmsize;
-out:
-                if (request)
-                        ptlrpc_req_finished(request);
-                else if (lmm)
-                        obd_free_diskmd(ll_i2dtexp(inode), &lmm);
-                return(rc);
-        }
+               RETURN(rc);
+       } else if (S_ISDIR(inode->i_mode)) {
+               struct lov_mds_md *lmm = NULL;
+               int lmm_size = 0;
+               struct ptlrpc_request *req = NULL;
 
 
-        return ll_getxattr_common(inode, name, buffer, size, OBD_MD_FLXATTR);
+               rc = ll_dir_getstripe(inode, (void **)&lmm, &lmm_size,
+                                     &req, 0);
+               if (rc < 0)
+                       GOTO(out_req, rc);
+
+               if (buf_size == 0)
+                       GOTO(out_req, rc = lmm_size);
+
+               if (buf_size < lmm_size)
+                       GOTO(out_req, rc = -ERANGE);
+
+               memcpy(buf, lmm, lmm_size);
+               GOTO(out_req, rc = lmm_size);
+out_req:
+               if (req != NULL)
+                       ptlrpc_req_finished(req);
+
+               return rc;
+       } else {
+               RETURN(-ENODATA);
+       }
 }
 
 }
 
-ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size)
+ssize_t ll_getxattr(struct dentry *dentry, const char *name, void *buf,
+                   size_t buf_size)
 {
 {
-        struct inode *inode = dentry->d_inode;
-        int rc = 0, rc2 = 0;
-        struct lov_mds_md *lmm = NULL;
-        struct ptlrpc_request *request = NULL;
-        int lmmsize;
+       struct inode *inode = dentry->d_inode;
 
 
-        LASSERT(inode);
+       LASSERT(inode);
+       LASSERT(name);
+
+       CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), xattr %s\n",
+              PFID(ll_inode2fid(inode)), inode, name);
+
+       ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETXATTR, 1);
+
+       if (strcmp(name, XATTR_LUSTRE_LOV) == 0 ||
+           strcmp(name, XATTR_NAME_LOV) == 0)
+               return ll_getxattr_lov(inode, buf, buf_size);
+       else
+               return ll_getxattr_common(inode, name, buf, buf_size,
+                                         OBD_MD_FLXATTR);
+}
+
+ssize_t ll_listxattr(struct dentry *dentry, char *buf, size_t buf_size)
+{
+       struct inode *inode = dentry->d_inode;
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       char *xattr_name;
+       ssize_t rc, rc2;
+       size_t len, rem;
 
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
               PFID(ll_inode2fid(inode)), inode);
 
 
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
               PFID(ll_inode2fid(inode)), inode);
 
-        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LISTXATTR, 1);
-
-        rc = ll_getxattr_common(inode, NULL, buffer, size, OBD_MD_FLXATTRLS);
-        if (rc < 0)
-                GOTO(out, rc);
-
-       if (buffer != NULL) {
-               struct ll_sb_info *sbi = ll_i2sbi(inode);
-               char *xattr_name = buffer;
-               int xlen, rem = rc;
-
-               while (rem > 0) {
-                       xlen = strnlen(xattr_name, rem - 1) + 1;
-                       rem -= xlen;
-                       if (xattr_type_filter(sbi,
-                                       get_xattr_type(xattr_name)) == 0) {
-                               /* skip OK xattr type
-                                * leave it in buffer
-                                */
-                               xattr_name += xlen;
-                               continue;
-                       }
-                       /* move up remaining xattrs in buffer
-                        * removing the xattr that is not OK
-                        */
-                       memmove(xattr_name, xattr_name + xlen, rem);
-                       rc -= xlen;
-               }
-       }
-       if (S_ISREG(inode->i_mode)) {
-               if (!ll_i2info(inode)->lli_has_smd)
-                       rc2 = -1;
-       } else if (S_ISDIR(inode->i_mode)) {
-               rc2 = ll_dir_getstripe(inode, (void **)&lmm, &lmmsize, &request,
-                                      0);
-       }
+       ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LISTXATTR, 1);
 
 
-        if (rc2 < 0) {
-                GOTO(out, rc2 = 0);
-        } else if (S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) {
-                const int prefix_len = sizeof(XATTR_LUSTRE_PREFIX) - 1;
-                const size_t name_len   = sizeof("lov") - 1;
-                const size_t total_len  = prefix_len + name_len + 1;
+       rc = ll_getxattr_common(inode, NULL, buf, buf_size, OBD_MD_FLXATTRLS);
+       if (rc < 0)
+               RETURN(rc);
 
 
-               if (((rc + total_len) > size) && (buffer != NULL)) {
-                       ptlrpc_req_finished(request);
-                       return -ERANGE;
-               }
+       /* If we're being called to get the size of the xattr list
+        * (buf_size == 0) then just assume that a lustre.lov xattr
+        * exists. */
+       if (buf_size == 0)
+               RETURN(rc + sizeof(XATTR_LUSTRE_LOV));
+
+       xattr_name = buf;
+       rem = rc;
 
 
-               if (buffer != NULL) {
-                       buffer += rc;
-                       memcpy(buffer, XATTR_LUSTRE_PREFIX, prefix_len);
-                       memcpy(buffer + prefix_len, "lov", name_len);
-                       buffer[prefix_len + name_len] = '\0';
+       while (rem > 0) {
+               len = strnlen(xattr_name, rem - 1) + 1;
+               rem -= len;
+               if (xattr_type_filter(sbi, get_xattr_type(xattr_name)) == 0) {
+                       /* Skip OK xattr type, leave it in buffer. */
+                       xattr_name += len;
+                       continue;
                }
                }
-               rc2 = total_len;
+
+               /* Move up remaining xattrs in buffer removing the
+                * xattr that is not OK. */
+               memmove(xattr_name, xattr_name + len, rem);
+               rc -= len;
        }
        }
-out:
-        ptlrpc_req_finished(request);
-        rc = rc + rc2;
 
 
-        return rc;
+       rc2 = ll_getxattr_lov(inode, NULL, 0);
+       if (rc2 == -ENODATA)
+               RETURN(rc);
+
+       if (rc2 < 0)
+               RETURN(rc2);
+
+       if (buf_size < rc + sizeof(XATTR_LUSTRE_LOV))
+               RETURN(-ERANGE);
+
+       memcpy(buf + rc, XATTR_LUSTRE_LOV, sizeof(XATTR_LUSTRE_LOV));
+
+       RETURN(rc + sizeof(XATTR_LUSTRE_LOV));
 }
 }
index 525a00c..e5c4618 100644 (file)
@@ -185,6 +185,8 @@ int lov_del_target(struct obd_device *obd, __u32 index,
 int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc);
 
 /* lov_pack.c */
 int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc);
 
 /* lov_pack.c */
+ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
+                    size_t buf_size);
 int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmm,
                struct lov_stripe_md *lsm);
 int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
 int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmm,
                struct lov_stripe_md *lsm);
 int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
index afe3b9f..0c8bf7e 100644 (file)
@@ -81,28 +81,11 @@ struct lov_layout_operations {
 
 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
 
 
 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
 
-struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
-{
-       struct lu_object *luobj;
-       struct lov_stripe_md *lsm = NULL;
-
-       if (clobj == NULL)
-               return NULL;
-
-       luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
-                                &lov_device_type);
-       if (luobj != NULL)
-               lsm = lov_lsm_addref(lu2lov(luobj));
-       return lsm;
-}
-EXPORT_SYMBOL(lov_lsm_get);
-
-void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
+static void lov_lsm_put(struct lov_stripe_md *lsm)
 {
        if (lsm != NULL)
                lov_free_memmd(&lsm);
 }
 {
        if (lsm != NULL)
                lov_free_memmd(&lsm);
 }
-EXPORT_SYMBOL(lov_lsm_put);
 
 /*****************************************************************************
  *
 
 /*****************************************************************************
  *
@@ -1441,8 +1424,10 @@ obj_put:
 out:
        if (fm_local != NULL)
                OBD_FREE_LARGE(fm_local, buffer_size);
 out:
        if (fm_local != NULL)
                OBD_FREE_LARGE(fm_local, buffer_size);
-       lov_lsm_put(obj, lsm);
-       RETURN(rc);
+
+       lov_lsm_put(lsm);
+
+       return rc;
 }
 
 static int lov_dispatch_obd_info_get(const struct lu_env *env,
 }
 
 static int lov_dispatch_obd_info_get(const struct lu_env *env,
@@ -1565,7 +1550,7 @@ static int lov_object_data_version(const struct lu_env *env,
 out_obdo:
        OBD_FREE_PTR(obdo);
 out:
 out_obdo:
        OBD_FREE_PTR(obdo);
 out:
-       lov_lsm_put(obj, lsm);
+       lov_lsm_put(lsm);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -1582,10 +1567,38 @@ static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
                RETURN(-ENODATA);
 
        rc = lov_getstripe(cl2lov(obj), lsm, lum);
                RETURN(-ENODATA);
 
        rc = lov_getstripe(cl2lov(obj), lsm, lum);
-       lov_lsm_put(obj, lsm);
+       lov_lsm_put(lsm);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+static int lov_object_layout_get(const struct lu_env *env,
+                                struct cl_object *obj,
+                                struct cl_layout *cl)
+{
+       struct lov_object *lov = cl2lov(obj);
+       struct lov_stripe_md *lsm = lov_lsm_addref(lov);
+       struct lu_buf *buf = &cl->cl_buf;
+       ssize_t rc;
+       ENTRY;
+
+       if (lsm == NULL) {
+               cl->cl_size = 0;
+               cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
+               cl->cl_is_released = false;
+
+               RETURN(0);
+       }
+
+       cl->cl_size = lov_mds_md_size(lsm->lsm_stripe_count, lsm->lsm_magic);
+       cl->cl_layout_gen = lsm->lsm_layout_gen;
+       cl->cl_is_released = lsm_is_released(lsm);
+
+       rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
+       lov_lsm_put(lsm);
+
+       RETURN(rc < 0 ? rc : 0);
+}
+
 static int lov_object_find_cbdata(const struct lu_env *env,
                                  struct cl_object *obj, ldlm_iterator_t iter,
                                  void *data)
 static int lov_object_find_cbdata(const struct lu_env *env,
                                  struct cl_object *obj, ldlm_iterator_t iter,
                                  void *data)
@@ -1607,6 +1620,7 @@ static const struct cl_object_operations lov_ops = {
        .coo_attr_update  = lov_attr_update,
        .coo_conf_set     = lov_conf_set,
        .coo_getstripe    = lov_object_getstripe,
        .coo_attr_update  = lov_attr_update,
        .coo_conf_set     = lov_conf_set,
        .coo_getstripe    = lov_object_getstripe,
+       .coo_layout_get   = lov_object_layout_get,
        .coo_find_cbdata  = lov_object_find_cbdata,
        .coo_fiemap       = lov_object_fiemap,
        .coo_data_version = lov_object_data_version,
        .coo_find_cbdata  = lov_object_find_cbdata,
        .coo_fiemap       = lov_object_fiemap,
        .coo_data_version = lov_object_data_version,
index 2a14e1f..34f872e 100644 (file)
@@ -121,6 +121,62 @@ void lov_dump_lmm(int level, void *lmm)
        }
 }
 
        }
 }
 
+/**
+ * Pack LOV striping metadata for disk storage format (in little
+ * endian byte order).
+ *
+ * This follows the getxattr() conventions. If \a buf_size is zero
+ * then return the size needed. If \a buf_size is too small then
+ * return -ERANGE. Otherwise return the size of the result.
+ */
+ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
+                    size_t buf_size)
+{
+       struct lov_mds_md_v1 *lmmv1 = buf;
+       struct lov_mds_md_v3 *lmmv3 = buf;
+       struct lov_ost_data_v1 *lmm_objects;
+       size_t lmm_size;
+       unsigned int i;
+       ENTRY;
+
+       lmm_size = lov_mds_md_size(lsm->lsm_stripe_count, lsm->lsm_magic);
+       if (buf_size == 0)
+               RETURN(lmm_size);
+
+       if (buf_size < lmm_size)
+               RETURN(-ERANGE);
+
+       /* lmmv1 and lmmv3 point to the same struct and have the
+        * same first fields
+        */
+       lmmv1->lmm_magic = cpu_to_le32(lsm->lsm_magic);
+       lmm_oi_cpu_to_le(&lmmv1->lmm_oi, &lsm->lsm_oi);
+       lmmv1->lmm_stripe_size = cpu_to_le32(lsm->lsm_stripe_size);
+       lmmv1->lmm_stripe_count = cpu_to_le16(lsm->lsm_stripe_count);
+       lmmv1->lmm_pattern = cpu_to_le32(lsm->lsm_pattern);
+       lmmv1->lmm_layout_gen = cpu_to_le16(lsm->lsm_layout_gen);
+
+       if (lsm->lsm_magic == LOV_MAGIC_V3) {
+               CLASSERT(sizeof(lsm->lsm_pool_name) ==
+                        sizeof(lmmv3->lmm_pool_name));
+               strlcpy(lmmv3->lmm_pool_name, lsm->lsm_pool_name,
+                       sizeof(lmmv3->lmm_pool_name));
+               lmm_objects = lmmv3->lmm_objects;
+       } else {
+               lmm_objects = lmmv1->lmm_objects;
+       }
+
+       for (i = 0; i < lsm->lsm_stripe_count; i++) {
+               struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+
+               ostid_cpu_to_le(&loi->loi_oi, &lmm_objects[i].l_ost_oi);
+               lmm_objects[i].l_ost_gen = cpu_to_le32(loi->loi_ost_gen);
+               lmm_objects[i].l_ost_idx = cpu_to_le32(loi->loi_ost_idx);
+       }
+
+       RETURN(lmm_size);
+}
+
 /* Pack LOV object metadata for disk storage.  It is packed in LE byte
  * order and is opaque to the networking layer.
  *
 /* Pack LOV object metadata for disk storage.  It is packed in LE byte
  * order and is opaque to the networking layer.
  *
@@ -132,13 +188,8 @@ void lov_dump_lmm(int level, void *lmm)
 static int lov_obd_packmd(struct lov_obd *lov, struct lov_mds_md **lmmp,
                          struct lov_stripe_md *lsm)
 {
 static int lov_obd_packmd(struct lov_obd *lov, struct lov_mds_md **lmmp,
                          struct lov_stripe_md *lsm)
 {
-        struct lov_mds_md_v1 *lmmv1;
-        struct lov_mds_md_v3 *lmmv3;
-        __u16 stripe_count;
-        struct lov_ost_data_v1 *lmm_objects;
-        int lmm_size, lmm_magic;
-        int i;
-       int cplen = 0;
+       __u16 stripe_count;
+       int lmm_size, lmm_magic;
         ENTRY;
 
         if (lsm) {
         ENTRY;
 
         if (lsm) {
@@ -197,49 +248,13 @@ static int lov_obd_packmd(struct lov_obd *lov, struct lov_mds_md **lmmp,
                         RETURN(-ENOMEM);
         }
 
                         RETURN(-ENOMEM);
         }
 
-        CDEBUG(D_INFO, "lov_packmd: LOV_MAGIC 0x%08X, lmm_size = %d \n",
-               lmm_magic, lmm_size);
-
-        lmmv1 = *lmmp;
-        lmmv3 = (struct lov_mds_md_v3 *)*lmmp;
-        if (lmm_magic == LOV_MAGIC_V3)
-                lmmv3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
-        else
-                lmmv1->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
-
-        if (!lsm)
-                RETURN(lmm_size);
-
-        /* lmmv1 and lmmv3 point to the same struct and have the
-         * same first fields
-         */
-       lmm_oi_cpu_to_le(&lmmv1->lmm_oi, &lsm->lsm_oi);
-        lmmv1->lmm_stripe_size = cpu_to_le32(lsm->lsm_stripe_size);
-        lmmv1->lmm_stripe_count = cpu_to_le16(stripe_count);
-        lmmv1->lmm_pattern = cpu_to_le32(lsm->lsm_pattern);
-        lmmv1->lmm_layout_gen = cpu_to_le16(lsm->lsm_layout_gen);
-        if (lsm->lsm_magic == LOV_MAGIC_V3) {
-               cplen = strlcpy(lmmv3->lmm_pool_name, lsm->lsm_pool_name,
-                               sizeof(lmmv3->lmm_pool_name));
-               if (cplen >= sizeof(lmmv3->lmm_pool_name))
-                       RETURN(-E2BIG);
-                lmm_objects = lmmv3->lmm_objects;
-        } else {
-                lmm_objects = lmmv1->lmm_objects;
-        }
+       CDEBUG(D_INFO, "lov_packmd: LOV_MAGIC 0x%08X, lmm_size = %d\n",
+              lmm_magic, lmm_size);
 
 
-       for (i = 0; i < stripe_count; i++) {
-               struct lov_oinfo *loi = lsm->lsm_oinfo[i];
-               /* XXX LOV STACKING call down to osc_packmd() to do packing */
-               LASSERTF(ostid_id(&loi->loi_oi) != 0, "lmm_oi "DOSTID
-                        " stripe %u/%u idx %u\n", POSTID(&lmmv1->lmm_oi),
-                        i, stripe_count, loi->loi_ost_idx);
-               ostid_cpu_to_le(&loi->loi_oi, &lmm_objects[i].l_ost_oi);
-               lmm_objects[i].l_ost_gen = cpu_to_le32(loi->loi_ost_gen);
-               lmm_objects[i].l_ost_idx = cpu_to_le32(loi->loi_ost_idx);
-       }
+       if (!lsm)
+               RETURN(lmm_size);
 
 
-       RETURN(lmm_size);
+       RETURN(lov_lsm_pack(lsm, *lmmp, lmm_size));
 }
 
 int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
 }
 
 int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
index b003952..a63b0cf 100644 (file)
@@ -459,6 +459,21 @@ int cl_object_data_version(const struct lu_env *env, struct cl_object *obj,
 }
 EXPORT_SYMBOL(cl_object_data_version);
 
 }
 EXPORT_SYMBOL(cl_object_data_version);
 
+int cl_object_layout_get(const struct lu_env *env, struct cl_object *obj,
+                        struct cl_layout *cl)
+{
+       struct lu_object_header *top = obj->co_lu.lo_header;
+       ENTRY;
+
+       list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
+               if (obj->co_ops->coo_layout_get != NULL)
+                       return obj->co_ops->coo_layout_get(env, obj, cl);
+       }
+
+       RETURN(-EOPNOTSUPP);
+}
+EXPORT_SYMBOL(cl_object_layout_get);
+
 /**
  * Helper function removing all object locks, and marking object for
  * deletion. All object pages must have been deleted at this point.
 /**
  * Helper function removing all object locks, and marking object for
  * deletion. All object pages must have been deleted at this point.