Whamcloud - gitweb
LU-5814 lov: remove LSM from struct lustre_md
[fs/lustre-release.git] / lustre / lov / lov_object.c
index afe3b9f..d32ecd1 100644 (file)
 
 #include "lov_cl_internal.h"
 
+static inline struct lov_device *lov_object_dev(struct lov_object *obj)
+{
+       return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
+}
+
 /** \addtogroup lov
  *  @{
  */
  */
 
 struct lov_layout_operations {
-        int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
-                        struct lov_object *lov,
-                        const struct cl_object_conf *conf,
-                        union lov_layout_state *state);
+       int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
+                       struct lov_object *lov, struct lov_stripe_md *lsm,
+                       const struct cl_object_conf *conf,
+                       union lov_layout_state *state);
        int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state);
         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
@@ -81,28 +86,11 @@ struct lov_layout_operations {
 
 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
 
-struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
-{
-       struct lu_object *luobj;
-       struct lov_stripe_md *lsm = NULL;
-
-       if (clobj == NULL)
-               return NULL;
-
-       luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
-                                &lov_device_type);
-       if (luobj != NULL)
-               lsm = lov_lsm_addref(lu2lov(luobj));
-       return lsm;
-}
-EXPORT_SYMBOL(lov_lsm_get);
-
-void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
+static void lov_lsm_put(struct lov_stripe_md *lsm)
 {
        if (lsm != NULL)
                lov_free_memmd(&lsm);
 }
-EXPORT_SYMBOL(lov_lsm_put);
 
 /*****************************************************************************
  *
@@ -119,12 +107,12 @@ static void lov_install_empty(const struct lu_env *env,
          */
 }
 
-static int lov_init_empty(const struct lu_env *env,
-                          struct lov_device *dev, struct lov_object *lov,
-                          const struct cl_object_conf *conf,
-                          union  lov_layout_state *state)
+static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
+                         struct lov_object *lov, struct lov_stripe_md *lsm,
+                         const struct cl_object_conf *conf,
+                         union lov_layout_state *state)
 {
-        return 0;
+       return 0;
 }
 
 static void lov_install_raid0(const struct lu_env *env,
@@ -233,10 +221,10 @@ static int lov_page_slice_fixup(struct lov_object *lov,
        return cl_object_header(stripe)->coh_page_bufsize;
 }
 
-static int lov_init_raid0(const struct lu_env *env,
-                          struct lov_device *dev, struct lov_object *lov,
-                          const struct cl_object_conf *conf,
-                          union  lov_layout_state *state)
+static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
+                         struct lov_object *lov, struct lov_stripe_md *lsm,
+                         const struct cl_object_conf *conf,
+                         union lov_layout_state *state)
 {
         int result;
         int i;
@@ -244,7 +232,6 @@ static int lov_init_raid0(const struct lu_env *env,
         struct cl_object        *stripe;
         struct lov_thread_info  *lti     = lov_env_info(env);
         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
-        struct lov_stripe_md    *lsm     = conf->u.coc_md->lsm;
         struct lu_fid           *ofid    = &lti->lti_fid;
         struct lov_layout_raid0 *r0      = &state->raid0;
 
@@ -258,7 +245,7 @@ static int lov_init_raid0(const struct lu_env *env,
 
        LASSERT(lov->lo_lsm == NULL);
        lov->lo_lsm = lsm_addref(lsm);
-       r0->lo_nr  = lsm->lsm_stripe_count;
+       r0->lo_nr = lsm->lsm_stripe_count;
        LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
        lov->lo_layout_invalid = true;
@@ -319,12 +306,11 @@ out:
 }
 
 static int lov_init_released(const struct lu_env *env,
-                       struct lov_device *dev, struct lov_object *lov,
-                       const struct cl_object_conf *conf,
-                       union  lov_layout_state *state)
+                            struct lov_device *dev, struct lov_object *lov,
+                            struct lov_stripe_md *lsm,
+                            const struct cl_object_conf *conf,
+                            union lov_layout_state *state)
 {
-       struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
-
        LASSERT(lsm != NULL);
        LASSERT(lsm_is_released(lsm));
        LASSERT(lov->lo_lsm == NULL);
@@ -397,7 +383,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
          * ->lo_sub[] slot in lovsub_object_fini() */
        if (r0->lo_sub[idx] == los) {
                waiter = &lov_env_info(env)->lti_waiter;
-               init_waitqueue_entry_current(waiter);
+               init_waitqueue_entry(waiter, current);
                add_wait_queue(&bkt->lsb_marche_funebre, waiter);
                set_current_state(TASK_UNINTERRUPTIBLE);
                while (1) {
@@ -407,7 +393,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
                        spin_lock(&r0->lo_sub_lock);
                        if (r0->lo_sub[idx] == los) {
                                spin_unlock(&r0->lo_sub_lock);
-                               waitq_wait(waiter, TASK_UNINTERRUPTIBLE);
+                               schedule();
                        } else {
                                spin_unlock(&r0->lo_sub_lock);
                                set_current_state(TASK_RUNNING);
@@ -777,26 +763,21 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
 }
 
 static int lov_layout_change(const struct lu_env *unused,
-                            struct lov_object *lov,
+                            struct lov_object *lov, struct lov_stripe_md *lsm,
                             const struct cl_object_conf *conf)
 {
-       int result;
-       enum lov_layout_type llt = LLT_EMPTY;
+       enum lov_layout_type llt = lov_type(lsm);
        union lov_layout_state *state = &lov->u;
        const struct lov_layout_operations *old_ops;
        const struct lov_layout_operations *new_ops;
-
        void *cookie;
        struct lu_env *env;
        int refcheck;
+       int rc;
        ENTRY;
 
        LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
 
-       if (conf->u.coc_md != NULL)
-               llt = lov_type(conf->u.coc_md->lsm);
-       LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
-
        cookie = cl_env_reenter();
        env = cl_env_get(&refcheck);
        if (IS_ERR(env)) {
@@ -804,6 +785,8 @@ static int lov_layout_change(const struct lu_env *unused,
                RETURN(PTR_ERR(env));
        }
 
+       LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
+
        CDEBUG(D_INODE, DFID" from %s to %s\n",
               PFID(lu_object_fid(lov2lu(lov))),
               llt2str(lov->lo_type), llt2str(llt));
@@ -811,38 +794,40 @@ static int lov_layout_change(const struct lu_env *unused,
        old_ops = &lov_dispatch[lov->lo_type];
        new_ops = &lov_dispatch[llt];
 
-       result = cl_object_prune(env, &lov->lo_cl);
-       if (result != 0)
-               GOTO(out, result);
+       rc = cl_object_prune(env, &lov->lo_cl);
+       if (rc != 0)
+               GOTO(out, rc);
 
-       result = old_ops->llo_delete(env, lov, &lov->u);
-       if (result == 0) {
-               old_ops->llo_fini(env, lov, &lov->u);
+       rc = old_ops->llo_delete(env, lov, &lov->u);
+       if (rc != 0)
+               GOTO(out, rc);
 
-               LASSERT(atomic_read(&lov->lo_active_ios) == 0);
+       old_ops->llo_fini(env, lov, &lov->u);
 
-               lov->lo_type = LLT_EMPTY;
-               /* page bufsize fixup */
-               cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
-                       lov_page_slice_fixup(lov, NULL);
+       LASSERT(atomic_read(&lov->lo_active_ios) == 0);
 
-               result = new_ops->llo_init(env,
-                                       lu2lov_dev(lov->lo_cl.co_lu.lo_dev),
-                                       lov, conf, state);
-               if (result == 0) {
-                       new_ops->llo_install(env, lov, state);
-                       lov->lo_type = llt;
-               } else {
-                       new_ops->llo_delete(env, lov, state);
-                       new_ops->llo_fini(env, lov, state);
-                       /* this file becomes an EMPTY file. */
-               }
+       lov->lo_type = LLT_EMPTY;
+
+       /* page bufsize fixup */
+       cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
+               lov_page_slice_fixup(lov, NULL);
+
+       rc = new_ops->llo_init(env, lov_object_dev(lov), lov, lsm, conf, state);
+       if (rc != 0) {
+               new_ops->llo_delete(env, lov, state);
+               new_ops->llo_fini(env, lov, state);
+               /* this file becomes an EMPTY file. */
+               GOTO(out, rc);
        }
 
+       new_ops->llo_install(env, lov, state);
+       lov->lo_type = llt;
+
 out:
        cl_env_put(env, &refcheck);
        cl_env_reexit(cookie);
-       RETURN(result);
+
+       RETURN(rc);
 }
 
 /*****************************************************************************
@@ -851,29 +836,43 @@ out:
  *
  */
 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
-                    const struct lu_object_conf *conf)
+                   const struct lu_object_conf *conf)
 {
-        struct lov_device            *dev   = lu2lov_dev(obj->lo_dev);
-        struct lov_object            *lov   = lu2lov(obj);
-        const struct cl_object_conf  *cconf = lu2cl_conf(conf);
-        union  lov_layout_state      *set   = &lov->u;
-        const struct lov_layout_operations *ops;
-        int result;
+       struct lov_object            *lov   = lu2lov(obj);
+       struct lov_device            *dev   = lov_object_dev(lov);
+       const struct cl_object_conf  *cconf = lu2cl_conf(conf);
+       union lov_layout_state       *set   = &lov->u;
+       const struct lov_layout_operations *ops;
+       struct lov_stripe_md *lsm = NULL;
+       int rc;
+       ENTRY;
 
-        ENTRY;
        init_rwsem(&lov->lo_type_guard);
        atomic_set(&lov->lo_active_ios, 0);
        init_waitqueue_head(&lov->lo_waitq);
-
        cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
 
-        /* no locking is necessary, as object is being created */
-       lov->lo_type = lov_type(cconf->u.coc_md->lsm);
-        ops = &lov_dispatch[lov->lo_type];
-        result = ops->llo_init(env, dev, lov, cconf, set);
-        if (result == 0)
-                ops->llo_install(env, lov, set);
-        RETURN(result);
+       if (cconf->u.coc_layout.lb_buf != NULL) {
+               lsm = lov_unpackmd(dev->ld_lov,
+                                  cconf->u.coc_layout.lb_buf,
+                                  cconf->u.coc_layout.lb_len);
+               if (IS_ERR(lsm))
+                       RETURN(PTR_ERR(lsm));
+       }
+
+       /* no locking is necessary, as object is being created */
+       lov->lo_type = lov_type(lsm);
+       ops = &lov_dispatch[lov->lo_type];
+       rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
+       if (rc != 0)
+               GOTO(out_lsm, rc);
+
+       ops->llo_install(env, lov, set);
+
+out_lsm:
+       lov_lsm_put(lsm);
+
+       RETURN(rc);
 }
 
 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
@@ -884,6 +883,15 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
        int                      result = 0;
        ENTRY;
 
+       if (conf->coc_opc == OBJECT_CONF_SET &&
+           conf->u.coc_layout.lb_buf != NULL) {
+               lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
+                                  conf->u.coc_layout.lb_buf,
+                                  conf->u.coc_layout.lb_len);
+               if (IS_ERR(lsm))
+                       RETURN(PTR_ERR(lsm));
+       }
+
        lov_conf_lock(lov);
        if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
                lov->lo_layout_invalid = true;
@@ -902,8 +910,6 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
 
        LASSERT(conf->coc_opc == OBJECT_CONF_SET);
 
-       if (conf->u.coc_md != NULL)
-               lsm = conf->u.coc_md->lsm;
        if ((lsm == NULL && lov->lo_lsm == NULL) ||
            ((lsm != NULL && lov->lo_lsm != NULL) &&
             (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
@@ -919,12 +925,13 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
                GOTO(out, result = -EBUSY);
        }
 
-       result = lov_layout_change(env, lov, conf);
+       result = lov_layout_change(env, lov, lsm, conf);
        lov->lo_layout_invalid = result != 0;
        EXIT;
 
 out:
        lov_conf_unlock(lov);
+       lov_lsm_put(lsm);
        CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
               PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
        RETURN(result);
@@ -1060,7 +1067,7 @@ static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
  * \param current_extent [in]  where to start copying in the extent array
  */
 static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
-                                        struct ll_fiemap_extent *lcl_fm_ext,
+                                        struct fiemap_extent *lcl_fm_ext,
                                         int ost_index, unsigned int ext_count,
                                         int current_extent)
 {
@@ -1074,7 +1081,7 @@ static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
 
        /* Copy fm_extent's from fm_local to return buffer */
        to = (char *)fiemap + fiemap_count_to_size(current_extent);
-       memcpy(to, lcl_fm_ext, ext_count * sizeof(struct ll_fiemap_extent));
+       memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
 }
 
 #define FIEMAP_BUFFER_SIZE 4096
@@ -1167,7 +1174,7 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        struct cl_object        *subobj = NULL;
        struct lov_obd          *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
        struct fiemap           *fm_local = NULL;
-       struct ll_fiemap_extent *lcl_fm_ext;
+       struct fiemap_extent    *lcl_fm_ext;
        loff_t                  fm_start;
        loff_t                  fm_end;
        loff_t                  fm_length;
@@ -1203,7 +1210,7 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
                GOTO(out, rc = -ENOTSUPP);
 
        if (lsm_is_released(lsm)) {
-               if (fiemap->fm_start < fmkey->oa.o_size) {
+               if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
                        /**
                         * released file, return a minimal FIEMAP if
                         * request fits in file-size.
@@ -1211,12 +1218,13 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
                        fiemap->fm_mapped_extents = 1;
                        fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
                        if (fiemap->fm_start + fiemap->fm_length <
-                           fmkey->oa.o_size)
+                           fmkey->lfik_oa.o_size)
                                fiemap->fm_extents[0].fe_length =
                                        fiemap->fm_length;
                        else
                                fiemap->fm_extents[0].fe_length =
-                                       fmkey->oa.o_size - fiemap->fm_start;
+                                       fmkey->lfik_oa.o_size -
+                                       fiemap->fm_start;
                        fiemap->fm_extents[0].fe_flags |=
                                FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
                }
@@ -1236,11 +1244,11 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        fm_length = fiemap->fm_length;
        /* Calculate start stripe, last stripe and length of mapping */
        start_stripe = lov_stripe_number(lsm, fm_start);
-       fm_end = (fm_length == ~0ULL) ? fmkey->oa.o_size :
+       fm_end = (fm_length == ~0ULL) ? fmkey->lfik_oa.o_size :
                                        fm_start + fm_length - 1;
        /* If fm_length != ~0ULL but fm_start_fm_length-1 exceeds file size */
-       if (fm_end > fmkey->oa.o_size)
-               fm_end = fmkey->oa.o_size;
+       if (fm_end > fmkey->lfik_oa.o_size)
+               fm_end = fmkey->lfik_oa.o_size;
 
        last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
                                              start_stripe, &stripe_count);
@@ -1349,7 +1357,8 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
 
                        fm_local->fm_start = lun_start;
                        fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
-                       memcpy(&fmkey->fiemap, fm_local, sizeof(*fm_local));
+                       memcpy(&fmkey->lfik_fiemap, fm_local,
+                              sizeof(*fm_local));
                        *buflen = fiemap_count_to_size(
                                                fm_local->fm_extent_count);
 
@@ -1400,7 +1409,7 @@ inactive_tgt:
                        if (lov_stripe_size(lsm,
                                        lcl_fm_ext[ext_count - 1].fe_logical +
                                        lcl_fm_ext[ext_count - 1].fe_length,
-                                       cur_stripe) >= fmkey->oa.o_size)
+                                       cur_stripe) >= fmkey->lfik_oa.o_size)
                                ost_eof = true;
 
                        fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
@@ -1441,149 +1450,71 @@ obj_put:
 out:
        if (fm_local != NULL)
                OBD_FREE_LARGE(fm_local, buffer_size);
-       lov_lsm_put(obj, lsm);
-       RETURN(rc);
+
+       lov_lsm_put(lsm);
+
+       return rc;
 }
 
-static int lov_dispatch_obd_info_get(const struct lu_env *env,
-                                    struct cl_object *obj,
-                                    struct obd_info *oinfo,
-                                    struct ptlrpc_request_set *set)
+static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
+                               struct lov_user_md __user *lum)
 {
-       struct cl_object        *subobj = NULL;
-       struct lov_obd          *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
-       struct lov_request_set  *lovset;
-       struct list_head        *pos;
-       struct lov_request      *req;
-       int                     rc;
-       int                     rc2;
+       struct lov_object       *lov = cl2lov(obj);
+       struct lov_stripe_md    *lsm;
+       int                     rc = 0;
        ENTRY;
 
-       rc = lov_prep_getattr_set(lov2obd(lov)->obd_self_export, oinfo,
-                                 &lovset);
-       if (rc != 0)
-               RETURN(rc);
-
-       CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes.\n",
-              POSTID(&oinfo->oi_md->lsm_oi),
-              oinfo->oi_md->lsm_stripe_count,
-              oinfo->oi_md->lsm_stripe_size);
-
-       list_for_each(pos, &lovset->set_list) {
-               req = list_entry(pos, struct lov_request, rq_link);
-
-               CDEBUG(D_INFO, "objid "DOSTID"[%d] has subobj "DOSTID" at idx"
-                      "%u\n", POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
-                      POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
-               subobj = lov_find_subobj(env, cl2lov(obj), oinfo->oi_md,
-                                        req->rq_stripe);
-               if (IS_ERR(subobj))
-                       GOTO(errout, rc = PTR_ERR(subobj));
-
-               rc = cl_object_obd_info_get(env, subobj, &req->rq_oi, set);
-               cl_object_put(env, subobj);
-               if (rc != 0) {
-                       CERROR("%s: getattr objid "DOSTID" subobj"
-                              DOSTID" on OST idx %d: rc = %d\n",
-                              lov2obd(lov)->obd_name,
-                              POSTID(&oinfo->oi_oa->o_oi),
-                              POSTID(&req->rq_oi.oi_oa->o_oi),
-                              req->rq_idx, rc);
-                       GOTO(errout, rc);
-               }
-       }
+       lsm = lov_lsm_addref(lov);
+       if (lsm == NULL)
+               RETURN(-ENODATA);
 
-       if (!list_empty(&set->set_requests)) {
-               LASSERT(rc == 0);
-               LASSERT(set->set_interpret == NULL);
-               set->set_interpret = lov_getattr_interpret;
-               set->set_arg = lovset;
-               GOTO(out, rc);
-       }
-errout:
-       if (rc)
-               atomic_set(&lovset->set_completes, 0);
-       rc2 = lov_fini_getattr_set(lovset);
-       rc = rc != 0 ? rc : rc2;
-out:
+       rc = lov_getstripe(cl2lov(obj), lsm, lum);
+       lov_lsm_put(lsm);
        RETURN(rc);
 }
 
-static int lov_object_data_version(const struct lu_env *env,
-                                  struct cl_object *obj, __u64 *data_version,
-                                  int flags)
+static int lov_object_layout_get(const struct lu_env *env,
+                                struct cl_object *obj,
+                                struct cl_layout *cl)
 {
-       struct ptlrpc_request_set       *set;
-       struct obd_info                 oinfo =  { { { 0 } } };
-       struct obdo                     *obdo = NULL;
-       struct lov_stripe_md            *lsm;
-       int                             rc;
+       struct lov_object *lov = cl2lov(obj);
+       struct lov_stripe_md *lsm = lov_lsm_addref(lov);
+       struct lu_buf *buf = &cl->cl_buf;
+       ssize_t rc;
        ENTRY;
 
-       lsm = lov_lsm_addref(cl2lov(obj));
-       if (!lsm_has_objects(lsm)) {
-               /* If no stripe, we consider version is 0. */
-               *data_version = 0;
-               GOTO(out, rc = 0);
+       if (lsm == NULL) {
+               cl->cl_size = 0;
+               cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
+               cl->cl_is_released = false;
+
+               RETURN(0);
        }
 
-       OBD_ALLOC_PTR(obdo);
-       if (obdo == NULL)
-               GOTO(out, rc = -ENOMEM);
+       cl->cl_size = lov_mds_md_size(lsm->lsm_stripe_count, lsm->lsm_magic);
+       cl->cl_layout_gen = lsm->lsm_layout_gen;
+       cl->cl_is_released = lsm_is_released(lsm);
 
-       oinfo.oi_md = lsm;
-       oinfo.oi_oa = obdo;
-       obdo->o_oi = lsm->lsm_oi;
-       obdo->o_mode = S_IFREG;
-       obdo->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLTYPE;
-       if (flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
-               obdo->o_valid |= OBD_MD_FLFLAGS;
-               obdo->o_flags |= OBD_FL_SRVLOCK;
-               if (flags & LL_DV_WR_FLUSH)
-                       obdo->o_flags |= OBD_FL_FLUSH;
-       }
+       rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
+       lov_lsm_put(lsm);
 
-       set = ptlrpc_prep_set();
-       if (set == NULL)
-               GOTO(out_obdo, rc = -ENOMEM);
-
-       rc = lov_dispatch_obd_info_get(env, obj, &oinfo, set);
-       if (rc == 0)
-               rc = ptlrpc_set_wait(set);
-       ptlrpc_set_destroy(set);
-       if (rc == 0) {
-               oinfo.oi_oa->o_valid &= OBD_MD_FLDATAVERSION | OBD_MD_FLFLAGS;
-               if (flags & LL_DV_WR_FLUSH &&
-                   !(oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
-                     oinfo.oi_oa->o_flags & OBD_FL_FLUSH))
-                       rc = -EOPNOTSUPP;
-               else if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
-                       rc = -EOPNOTSUPP;
-               else
-                       *data_version = obdo->o_data_version;
-       }
-out_obdo:
-       OBD_FREE_PTR(obdo);
-out:
-       lov_lsm_put(obj, lsm);
-       RETURN(rc);
+       RETURN(rc < 0 ? rc : 0);
 }
 
-static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
-                               struct lov_user_md __user *lum)
+static loff_t lov_object_maxbytes(struct cl_object *obj)
 {
-       struct lov_object       *lov = cl2lov(obj);
-       struct lov_stripe_md    *lsm;
-       int                     rc = 0;
-       ENTRY;
+       struct lov_object *lov = cl2lov(obj);
+       struct lov_stripe_md *lsm = lov_lsm_addref(lov);
+       loff_t maxbytes;
 
-       lsm = lov_lsm_addref(lov);
        if (lsm == NULL)
-               RETURN(-ENODATA);
+               return LLONG_MAX;
 
-       rc = lov_getstripe(cl2lov(obj), lsm, lum);
-       lov_lsm_put(obj, lsm);
-       RETURN(rc);
+       maxbytes = lsm->lsm_maxbytes;
+
+       lov_lsm_put(lsm);
+
+       return maxbytes;
 }
 
 static int lov_object_find_cbdata(const struct lu_env *env,
@@ -1607,9 +1538,10 @@ static const struct cl_object_operations lov_ops = {
        .coo_attr_update  = lov_attr_update,
        .coo_conf_set     = lov_conf_set,
        .coo_getstripe    = lov_object_getstripe,
+       .coo_layout_get   = lov_object_layout_get,
+       .coo_maxbytes     = lov_object_maxbytes,
        .coo_find_cbdata  = lov_object_find_cbdata,
        .coo_fiemap       = lov_object_fiemap,
-       .coo_data_version = lov_object_data_version,
 };
 
 static const struct lu_object_operations lov_lu_obj_ops = {