Whamcloud - gitweb
LU-5683 clio: add CIT_DATA_VERSION 49/14649/5
authorJohn L. Hammond <john.hammond@intel.com>
Thu, 30 Apr 2015 20:57:59 +0000 (15:57 -0500)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 20 May 2015 13:14:44 +0000 (13:14 +0000)
Add a new cl_io type CIT_DATA_VERSION to get file data version. Remove
the old data version code (cl_object_obd_info_get() and
cl_object_data_version()). Add test_37() to sanity-hsm.sh to ensure
that data version is computed under the correct layout.

Signed-off-by: John L. Hammond <john.hammond@intel.com>
Change-Id: If371c8e8c18e91d476f94de2e0170199252c088d
Reviewed-on: http://review.whamcloud.com/14649
Tested-by: Jenkins
Reviewed-by: Henri Doreau <henri.doreau@cea.fr>
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
16 files changed:
lustre/include/cl_object.h
lustre/include/obd.h
lustre/llite/file.c
lustre/lov/lov_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_merge.c
lustre/lov/lov_obd.c
lustre/lov/lov_object.c
lustre/lov/lov_request.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_object.c
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_object.c
lustre/osc/osc_request.c
lustre/tests/sanity-hsm.sh

index 52860d1..2654ec4 100644 (file)
@@ -435,17 +435,6 @@ struct cl_object_operations {
                          struct ll_fiemap_info_key *fmkey,
                          struct fiemap *fiemap, size_t *buflen);
        /**
-        * Get attributes of the object from server. (top->bottom)
-        */
-       int (*coo_obd_info_get)(const struct lu_env *env, struct cl_object *obj,
-                               struct obd_info *oinfo,
-                               struct ptlrpc_request_set *set);
-       /**
-        * Get data version of the object. (top->bottom)
-        */
-       int (*coo_data_version)(const struct lu_env *env, struct cl_object *obj,
-                               __u64 *version, int flags);
-       /**
         * Get layout and generation of the object.
         */
        int (*coo_layout_get)(const struct lu_env *env, struct cl_object *obj,
@@ -1418,6 +1407,8 @@ enum cl_io_type {
         CIT_WRITE,
         /** truncate, utime system calls */
         CIT_SETATTR,
+       /** get data version */
+       CIT_DATA_VERSION,
         /**
          * page fault handling
          */
@@ -1820,6 +1811,10 @@ struct cl_io {
                        const struct lu_fid     *sa_parent_fid;
                        struct obd_capa         *sa_capa;
                } ci_setattr;
+               struct cl_data_version_io {
+                       u64 dv_data_version;
+                       int dv_flags;
+               } ci_data_version;
                 struct cl_fault_io {
                         /** page index within file. */
                         pgoff_t         ft_index;
@@ -2229,11 +2224,6 @@ int cl_object_find_cbdata(const struct lu_env *env, struct cl_object *obj,
 int cl_object_fiemap(const struct lu_env *env, struct cl_object *obj,
                     struct ll_fiemap_info_key *fmkey, struct fiemap *fiemap,
                     size_t *buflen);
-int cl_object_obd_info_get(const struct lu_env *env, struct cl_object *obj,
-                          struct obd_info *oinfo,
-                          struct ptlrpc_request_set *set);
-int cl_object_data_version(const struct lu_env *env, struct cl_object *obj,
-                          __u64 *version, int flags);
 int cl_object_layout_get(const struct lu_env *env, struct cl_object *obj,
                         struct cl_layout *cl);
 loff_t cl_object_maxbytes(struct cl_object *obj);
index f82f3d1..5613fc4 100644 (file)
@@ -145,8 +145,6 @@ struct obd_info {
            - while setattr, the flags used for distinguish punch operation
          */
        __u64                   oi_flags;
-        /* lsm data specific for every OSC. */
-        struct lov_stripe_md   *oi_md;
         /* obdo data specific for every OSC, if needed at all. */
         struct obdo            *oi_oa;
         /* statfs data specific for every OSC, if needed at all. */
index 3b76ada..5d7f902 100644 (file)
@@ -1807,13 +1807,16 @@ error:
  */
 int ll_data_version(struct inode *inode, __u64 *data_version, int flags)
 {
-       struct lu_env   *env;
-       int             refcheck;
-       int             rc;
+       struct cl_object *obj = ll_i2info(inode)->lli_clob;
+       struct lu_env *env;
+       struct cl_io *io;
+       int refcheck;
+       int result;
+
        ENTRY;
 
        /* If no file object initialized, we consider its version is 0. */
-       if (ll_i2info(inode)->lli_clob == NULL) {
+       if (obj == NULL) {
                *data_version = 0;
                RETURN(0);
        }
@@ -1822,10 +1825,27 @@ int ll_data_version(struct inode *inode, __u64 *data_version, int flags)
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));
 
-       rc = cl_object_data_version(env, ll_i2info(inode)->lli_clob,
-                                   data_version, flags);
+       io = vvp_env_thread_io(env);
+       io->ci_obj = obj;
+       io->u.ci_data_version.dv_data_version = 0;
+       io->u.ci_data_version.dv_flags = flags;
+
+restart:
+       if (cl_io_init(env, io, CIT_DATA_VERSION, io->ci_obj) == 0)
+               result = cl_io_loop(env, io);
+       else
+               result = io->ci_result;
+
+       *data_version = io->u.ci_data_version.dv_data_version;
+
+       cl_io_fini(env, io);
+
+       if (unlikely(io->ci_need_restart))
+               goto restart;
+
        cl_env_put(env, &refcheck);
-       RETURN(rc);
+
+       RETURN(result);
 }
 
 /*
index 525ff2f..ddb71fa 100644 (file)
@@ -127,8 +127,6 @@ static inline void lov_put_reqset(struct lov_request_set *set)
         (char *)((lv)->lov_tgts[index]->ltd_uuid.uuid)
 
 /* lov_merge.c */
-void lov_merge_attrs(struct obdo *tgt, struct obdo *src, u64 valid,
-                    struct lov_stripe_md *lsm, int stripeno, int *set);
 int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
                       struct ost_lvb *lvb, __u64 *kms_place);
 
@@ -150,12 +148,7 @@ void lov_set_add_req(struct lov_request *req, struct lov_request_set *set);
 int lov_set_finished(struct lov_request_set *set, int idempotent);
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc);
-int lov_update_common_set(struct lov_request_set *set,
-                          struct lov_request *req, int rc);
 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx);
-int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
-                         struct lov_request_set **reqset);
-int lov_fini_getattr_set(struct lov_request_set *set);
 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
                         struct lov_request_set **reqset);
 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
@@ -181,7 +174,6 @@ int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg,
                             __u32 *indexp, int *genp);
 int lov_del_target(struct obd_device *obd, __u32 index,
                    struct obd_uuid *uuidp, int gen);
-int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc);
 
 /* lov_pack.c */
 ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
index 4582529..f227c6e 100644 (file)
@@ -106,6 +106,12 @@ static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
                 }
                 break;
         }
+       case CIT_DATA_VERSION: {
+               io->u.ci_data_version.dv_data_version = 0;
+               io->u.ci_data_version.dv_flags =
+                       parent->u.ci_data_version.dv_flags;
+               break;
+       }
         case CIT_FAULT: {
                 struct cl_object *obj = parent->ci_obj;
                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
@@ -349,6 +355,11 @@ static int lov_io_slice_init(struct lov_io *lio,
                 lio->lis_endpos = OBD_OBJECT_EOF;
                 break;
 
+       case CIT_DATA_VERSION:
+               lio->lis_pos = 0;
+               lio->lis_endpos = OBD_OBJECT_EOF;
+               break;
+
         case CIT_FAULT: {
                 pgoff_t index = io->u.ci_fault.ft_index;
                 lio->lis_pos = cl_offset(io->ci_obj, index);
@@ -555,6 +566,27 @@ static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
         LASSERT(rc == 0);
 }
 
+static void
+lov_io_data_version_end(const struct lu_env *env, const struct cl_io_slice *ios)
+{
+       struct lov_io *lio = cl2lov_io(env, ios);
+       struct cl_io *parent = lio->lis_cl.cis_io;
+       struct lov_io_sub *sub;
+
+       ENTRY;
+       list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
+               lov_io_end_wrapper(env, sub->sub_io);
+
+               parent->u.ci_data_version.dv_data_version +=
+                       sub->sub_io->u.ci_data_version.dv_data_version;
+
+               if (parent->ci_result == 0)
+                       parent->ci_result = sub->sub_io->ci_result;
+       }
+
+       EXIT;
+}
+
 static void lov_io_iter_fini(const struct lu_env *env,
                              const struct cl_io_slice *ios)
 {
@@ -865,6 +897,15 @@ static const struct cl_io_operations lov_io_ops = {
                         .cio_start     = lov_io_start,
                         .cio_end       = lov_io_end
                 },
+               [CIT_DATA_VERSION] = {
+                       .cio_fini       = lov_io_fini,
+                       .cio_iter_init  = lov_io_iter_init,
+                       .cio_iter_fini  = lov_io_iter_fini,
+                       .cio_lock       = lov_io_lock,
+                       .cio_unlock     = lov_io_unlock,
+                       .cio_start      = lov_io_start,
+                       .cio_end        = lov_io_data_version_end,
+               },
                 [CIT_FAULT] = {
                         .cio_fini      = lov_io_fini,
                         .cio_iter_init = lov_io_iter_init,
@@ -1010,6 +1051,7 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
                break;
        case CIT_FSYNC:
        case CIT_SETATTR:
+       case CIT_DATA_VERSION:
                result = +1;
                break;
        case CIT_WRITE:
@@ -1046,6 +1088,7 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
                LASSERTF(0, "invalid type %d\n", io->ci_type);
        case CIT_MISC:
        case CIT_FSYNC:
+       case CIT_DATA_VERSION:
                result = 1;
                break;
        case CIT_SETATTR:
index 7b1601c..d91cc2f 100644 (file)
@@ -110,53 +110,3 @@ int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
         lvb->lvb_ctime = current_ctime;
         RETURN(rc);
 }
-
-void lov_merge_attrs(struct obdo *tgt, struct obdo *src, u64 valid,
-                    struct lov_stripe_md *lsm, int stripeno, int *set)
-{
-       valid &= src->o_valid;
-
-       if (*set != 0) {
-               tgt->o_valid &= valid;
-               if (valid & OBD_MD_FLSIZE) {
-                       /* this handles sparse files properly */
-                       u64 lov_size;
-
-                       lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
-                       if (lov_size > tgt->o_size)
-                               tgt->o_size = lov_size;
-               }
-               if (valid & OBD_MD_FLBLOCKS)
-                       tgt->o_blocks += src->o_blocks;
-               if (valid & OBD_MD_FLBLKSZ)
-                       tgt->o_blksize += src->o_blksize;
-               if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
-                       tgt->o_ctime = src->o_ctime;
-               if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
-                       tgt->o_mtime = src->o_mtime;
-               if (valid & OBD_MD_FLDATAVERSION)
-                       tgt->o_data_version += src->o_data_version;
-
-               /* handle flags */
-               if (valid & OBD_MD_FLFLAGS)
-                       tgt->o_flags &= src->o_flags;
-               else
-                       tgt->o_flags = 0;
-       } else {
-               memcpy(tgt, src, sizeof(*tgt));
-               tgt->o_oi = lsm->lsm_oi;
-               tgt->o_valid = valid;
-               if (valid & OBD_MD_FLSIZE)
-                       tgt->o_size = lov_stripe_size(lsm, src->o_size,
-                                                     stripeno);
-               tgt->o_flags = 0;
-               if (valid & OBD_MD_FLFLAGS)
-                       tgt->o_flags = src->o_flags;
-       }
-
-       /* data_version needs to be valid on all stripes to be correct! */
-       if (!(valid & OBD_MD_FLDATAVERSION))
-               tgt->o_valid &= ~OBD_MD_FLDATAVERSION;
-
-       *set += 1;
-}
index 1d93581..354f9ff 100644 (file)
@@ -1029,19 +1029,6 @@ do {
                  "%p->lsm_magic=%x\n", (lsmp), (lsmp)->lsm_magic);              \
 } while (0)
 
-int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
-{
-       struct lov_request_set *lovset = (struct lov_request_set *)data;
-       int err;
-       ENTRY;
-
-       /* don't do attribute merge if this aysnc op failed */
-       if (rc)
-               atomic_set(&lovset->set_completes, 0);
-       err = lov_fini_getattr_set(lovset);
-       RETURN(rc ? rc : err);
-}
-
 int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
 {
        struct lov_request_set *lovset = (struct lov_request_set *)data;
index 21c24c2..6836529 100644 (file)
@@ -1430,130 +1430,6 @@ out:
        return rc;
 }
 
-static int lov_dispatch_obd_info_get(const struct lu_env *env,
-                                    struct cl_object *obj,
-                                    struct obd_info *oinfo,
-                                    struct ptlrpc_request_set *set)
-{
-       struct cl_object        *subobj = NULL;
-       struct lov_obd          *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
-       struct lov_request_set  *lovset;
-       struct list_head        *pos;
-       struct lov_request      *req;
-       int                     rc;
-       int                     rc2;
-       ENTRY;
-
-       rc = lov_prep_getattr_set(lov2obd(lov)->obd_self_export, oinfo,
-                                 &lovset);
-       if (rc != 0)
-               RETURN(rc);
-
-       CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes.\n",
-              POSTID(&oinfo->oi_md->lsm_oi),
-              oinfo->oi_md->lsm_stripe_count,
-              oinfo->oi_md->lsm_stripe_size);
-
-       list_for_each(pos, &lovset->set_list) {
-               req = list_entry(pos, struct lov_request, rq_link);
-
-               CDEBUG(D_INFO, "objid "DOSTID"[%d] has subobj "DOSTID" at idx"
-                      "%u\n", POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
-                      POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
-               subobj = lov_find_subobj(env, cl2lov(obj), oinfo->oi_md,
-                                        req->rq_stripe);
-               if (IS_ERR(subobj))
-                       GOTO(errout, rc = PTR_ERR(subobj));
-
-               rc = cl_object_obd_info_get(env, subobj, &req->rq_oi, set);
-               cl_object_put(env, subobj);
-               if (rc != 0) {
-                       CERROR("%s: getattr objid "DOSTID" subobj"
-                              DOSTID" on OST idx %d: rc = %d\n",
-                              lov2obd(lov)->obd_name,
-                              POSTID(&oinfo->oi_oa->o_oi),
-                              POSTID(&req->rq_oi.oi_oa->o_oi),
-                              req->rq_idx, rc);
-                       GOTO(errout, rc);
-               }
-       }
-
-       if (!list_empty(&set->set_requests)) {
-               LASSERT(rc == 0);
-               LASSERT(set->set_interpret == NULL);
-               set->set_interpret = lov_getattr_interpret;
-               set->set_arg = lovset;
-               GOTO(out, rc);
-       }
-errout:
-       if (rc)
-               atomic_set(&lovset->set_completes, 0);
-       rc2 = lov_fini_getattr_set(lovset);
-       rc = rc != 0 ? rc : rc2;
-out:
-       RETURN(rc);
-}
-
-static int lov_object_data_version(const struct lu_env *env,
-                                  struct cl_object *obj, __u64 *data_version,
-                                  int flags)
-{
-       struct ptlrpc_request_set       *set;
-       struct obd_info                 oinfo =  { { { 0 } } };
-       struct obdo                     *obdo = NULL;
-       struct lov_stripe_md            *lsm;
-       int                             rc;
-       ENTRY;
-
-       lsm = lov_lsm_addref(cl2lov(obj));
-       if (!lsm_has_objects(lsm)) {
-               /* If no stripe, we consider version is 0. */
-               *data_version = 0;
-               GOTO(out, rc = 0);
-       }
-
-       OBD_ALLOC_PTR(obdo);
-       if (obdo == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-       oinfo.oi_md = lsm;
-       oinfo.oi_oa = obdo;
-       obdo->o_oi = lsm->lsm_oi;
-       obdo->o_mode = S_IFREG;
-       obdo->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLTYPE;
-       if (flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
-               obdo->o_valid |= OBD_MD_FLFLAGS;
-               obdo->o_flags |= OBD_FL_SRVLOCK;
-               if (flags & LL_DV_WR_FLUSH)
-                       obdo->o_flags |= OBD_FL_FLUSH;
-       }
-
-       set = ptlrpc_prep_set();
-       if (set == NULL)
-               GOTO(out_obdo, rc = -ENOMEM);
-
-       rc = lov_dispatch_obd_info_get(env, obj, &oinfo, set);
-       if (rc == 0)
-               rc = ptlrpc_set_wait(set);
-       ptlrpc_set_destroy(set);
-       if (rc == 0) {
-               oinfo.oi_oa->o_valid &= OBD_MD_FLDATAVERSION | OBD_MD_FLFLAGS;
-               if (flags & LL_DV_WR_FLUSH &&
-                   !(oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
-                     oinfo.oi_oa->o_flags & OBD_FL_FLUSH))
-                       rc = -EOPNOTSUPP;
-               else if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
-                       rc = -EOPNOTSUPP;
-               else
-                       *data_version = obdo->o_data_version;
-       }
-out_obdo:
-       OBD_FREE_PTR(obdo);
-out:
-       lov_lsm_put(lsm);
-       RETURN(rc);
-}
-
 static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
                                struct lov_user_md __user *lum)
 {
@@ -1640,7 +1516,6 @@ static const struct cl_object_operations lov_ops = {
        .coo_maxbytes     = lov_object_maxbytes,
        .coo_find_cbdata  = lov_object_find_cbdata,
        .coo_fiemap       = lov_object_fiemap,
-       .coo_data_version = lov_object_data_version,
 };
 
 static const struct lu_object_operations lov_lu_obj_ops = {
index 8774fc8..d95e486 100644 (file)
@@ -106,23 +106,6 @@ void lov_update_set(struct lov_request_set *set,
        wake_up(&set->set_waitq);
 }
 
-int lov_update_common_set(struct lov_request_set *set,
-                          struct lov_request *req, int rc)
-{
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        ENTRY;
-
-        lov_update_set(set, req, rc);
-
-        /* grace error on inactive ost */
-        if (rc && !(lov->lov_tgts[req->rq_idx] &&
-                    lov->lov_tgts[req->rq_idx]->ltd_active))
-                rc = 0;
-
-        /* FIXME in raid1 regime, should return 0 */
-        RETURN(rc);
-}
-
 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 {
        list_add_tail(&req->rq_link, &set->set_list);
@@ -186,135 +169,6 @@ out:
        return rc;
 }
 
-static int common_attr_done(struct lov_request_set *set)
-{
-       struct list_head *pos;
-        struct lov_request *req;
-        struct obdo *tmp_oa;
-        int rc = 0, attrset = 0;
-        ENTRY;
-
-       LASSERT(set->set_oi != NULL);
-
-       if (set->set_oi->oi_oa == NULL)
-               RETURN(0);
-
-       if (!atomic_read(&set->set_success))
-               RETURN(-EIO);
-
-       OBDO_ALLOC(tmp_oa);
-       if (tmp_oa == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-       list_for_each(pos, &set->set_list) {
-               req = list_entry(pos, struct lov_request, rq_link);
-
-                if (!req->rq_complete || req->rq_rc)
-                        continue;
-                if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
-                        continue;
-                lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
-                                req->rq_oi.oi_oa->o_valid,
-                                set->set_oi->oi_md, req->rq_stripe, &attrset);
-        }
-        if (!attrset) {
-                CERROR("No stripes had valid attrs\n");
-                rc = -EIO;
-        }
-
-        tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
-        memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
-out:
-        if (tmp_oa)
-                OBDO_FREE(tmp_oa);
-        RETURN(rc);
-
-}
-
-int lov_fini_getattr_set(struct lov_request_set *set)
-{
-       int rc = 0;
-       ENTRY;
-
-       if (set == NULL)
-               RETURN(0);
-       LASSERT(set->set_exp);
-       if (atomic_read(&set->set_completes))
-               rc = common_attr_done(set);
-
-       lov_put_reqset(set);
-
-       RETURN(rc);
-}
-
-/* The callback for osc_getattr_async that finilizes a request info when a
- * response is received. */
-static int cb_getattr_update(void *cookie, int rc)
-{
-        struct obd_info *oinfo = cookie;
-        struct lov_request *lovreq;
-        lovreq = container_of(oinfo, struct lov_request, rq_oi);
-        return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
-}
-
-int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
-                         struct lov_request_set **reqset)
-{
-        struct lov_request_set *set;
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int rc = 0, i;
-        ENTRY;
-
-        OBD_ALLOC(set, sizeof(*set));
-        if (set == NULL)
-                RETURN(-ENOMEM);
-        lov_init_set(set);
-
-        set->set_exp = exp;
-        set->set_oi = oinfo;
-
-        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
-               struct lov_oinfo *loi;
-               struct lov_request *req;
-
-               loi = oinfo->oi_md->lsm_oinfo[i];
-               if (lov_oinfo_is_dummy(loi))
-                       continue;
-
-               if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
-                       CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
-                       continue;
-               }
-
-                OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
-                        GOTO(out_set, rc = -ENOMEM);
-
-                req->rq_stripe = i;
-                req->rq_idx = loi->loi_ost_idx;
-
-                OBDO_ALLOC(req->rq_oi.oi_oa);
-                if (req->rq_oi.oi_oa == NULL) {
-                        OBD_FREE(req, sizeof(*req));
-                        GOTO(out_set, rc = -ENOMEM);
-                }
-                memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
-                       sizeof(*req->rq_oi.oi_oa));
-                req->rq_oi.oi_oa->o_oi = loi->loi_oi;
-                req->rq_oi.oi_cb_up = cb_getattr_update;
-                req->rq_oi.oi_capa = oinfo->oi_capa;
-
-                lov_set_add_req(req, set);
-        }
-        if (!set->set_count)
-                GOTO(out_set, rc = -EIO);
-        *reqset = set;
-        RETURN(rc);
-out_set:
-        lov_fini_getattr_set(set);
-        RETURN(rc);
-}
-
 #define LOV_U64_MAX ((__u64)~0ULL)
 #define LOV_SUM_MAX(tot, add)                                           \
         do {                                                            \
index 2d8a11b..0a3e2b4 100644 (file)
@@ -131,6 +131,7 @@ void cl_io_fini(const struct lu_env *env, struct cl_io *io)
        switch(io->ci_type) {
        case CIT_READ:
        case CIT_WRITE:
+       case CIT_DATA_VERSION:
                break;
        case CIT_FAULT:
        case CIT_FSYNC:
index 9b3e71b..0874045 100644 (file)
@@ -418,47 +418,6 @@ int cl_object_fiemap(const struct lu_env *env, struct cl_object *obj,
 }
 EXPORT_SYMBOL(cl_object_fiemap);
 
-int cl_object_obd_info_get(const struct lu_env *env, struct cl_object *obj,
-                          struct obd_info *oinfo,
-                          struct ptlrpc_request_set *set)
-{
-       struct lu_object_header *top;
-       int                     result = 0;
-       ENTRY;
-
-       top = obj->co_lu.lo_header;
-       list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
-               if (obj->co_ops->coo_obd_info_get != NULL) {
-                       result = obj->co_ops->coo_obd_info_get(env, obj, oinfo,
-                                                              set);
-                       if (result != 0)
-                               break;
-               }
-       }
-       RETURN(result);
-}
-EXPORT_SYMBOL(cl_object_obd_info_get);
-
-int cl_object_data_version(const struct lu_env *env, struct cl_object *obj,
-                          __u64 *data_version, int flags)
-{
-       struct lu_object_header *top;
-       int                     result = 0;
-       ENTRY;
-
-       top = obj->co_lu.lo_header;
-       list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
-               if (obj->co_ops->coo_data_version != NULL) {
-                       result = obj->co_ops->coo_data_version(env, obj,
-                                                       data_version, flags);
-                       if (result != 0)
-                               break;
-               }
-       }
-       RETURN(result);
-}
-EXPORT_SYMBOL(cl_object_data_version);
-
 int cl_object_layout_get(const struct lu_env *env, struct cl_object *obj,
                         struct cl_layout *cl)
 {
index 7663bff..e13eadf 100644 (file)
@@ -229,7 +229,4 @@ struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo);
 void osc_set_capa_size(struct ptlrpc_request *req,
                       const struct req_msg_field *field, struct obd_capa *oc);
-int osc_getattr_interpret(const struct lu_env *env,
-                         struct ptlrpc_request *req,
-                         struct osc_async_args *aa, int rc);
 #endif /* OSC_INTERNAL_H */
index 686e1e8..34679c9 100644 (file)
@@ -611,6 +611,111 @@ static void osc_io_setattr_end(const struct lu_env *env,
        }
 }
 
+struct osc_data_version_args {
+       struct osc_io *dva_oio;
+};
+
+static int
+osc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+                          void *arg, int rc)
+{
+       struct osc_data_version_args *dva = arg;
+       struct osc_io *oio = dva->dva_oio;
+       const struct ost_body *body;
+
+       ENTRY;
+       if (rc < 0)
+               GOTO(out, rc);
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, &oio->oi_oa,
+                            &body->oa);
+       EXIT;
+out:
+       oio->oi_cbarg.opc_rc = rc;
+       complete(&oio->oi_cbarg.opc_sync);
+
+       return 0;
+}
+
+static int osc_io_data_version_start(const struct lu_env *env,
+                                    const struct cl_io_slice *slice)
+{
+       struct cl_data_version_io *dv   = &slice->cis_io->u.ci_data_version;
+       struct osc_io           *oio    = cl2osc_io(env, slice);
+       struct obdo             *oa     = &oio->oi_oa;
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+       struct osc_object       *obj    = cl2osc(slice->cis_obj);
+       struct lov_oinfo        *loi    = obj->oo_oinfo;
+       struct obd_export       *exp    = osc_export(obj);
+       struct ptlrpc_request   *req;
+       struct ost_body         *body;
+       struct osc_data_version_args *dva;
+       int rc;
+
+       ENTRY;
+       memset(oa, 0, sizeof(*oa));
+       oa->o_oi = loi->loi_oi;
+       oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+       if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
+               oa->o_valid |= OBD_MD_FLFLAGS;
+               oa->o_flags |= OBD_FL_SRVLOCK;
+               if (dv->dv_flags & LL_DV_WR_FLUSH)
+                       oa->o_flags |= OBD_FL_FLUSH;
+       }
+
+       init_completion(&cbargs->opc_sync);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
+       if (rc < 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+
+       ptlrpc_request_set_replen(req);
+       req->rq_interpret_reply = osc_data_version_interpret;
+       CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
+       dva = ptlrpc_req_async_args(req);
+       dva->dva_oio = oio;
+
+       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+
+       RETURN(0);
+}
+
+static void osc_io_data_version_end(const struct lu_env *env,
+                                   const struct cl_io_slice *slice)
+{
+       struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
+       struct osc_io           *oio    = cl2osc_io(env, slice);
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+
+       ENTRY;
+       wait_for_completion(&cbargs->opc_sync);
+
+       if (cbargs->opc_rc != 0) {
+               slice->cis_io->ci_result = cbargs->opc_rc;
+       } else if (!(oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)) {
+               slice->cis_io->ci_result = -EOPNOTSUPP;
+       } else {
+               dv->dv_data_version = oio->oi_oa.o_data_version;
+               slice->cis_io->ci_result = 0;
+       }
+
+       EXIT;
+}
+
 static int osc_io_read_start(const struct lu_env *env,
                              const struct cl_io_slice *slice)
 {
@@ -768,6 +873,10 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_start  = osc_io_setattr_start,
                        .cio_end    = osc_io_setattr_end
                },
+               [CIT_DATA_VERSION] = {
+                       .cio_start  = osc_io_data_version_start,
+                       .cio_end    = osc_io_data_version_end,
+               },
                [CIT_FAULT] = {
                        .cio_start  = osc_io_fault_start,
                        .cio_end    = osc_io_end,
index 3a2d09b..0477dcd 100644 (file)
@@ -341,41 +341,6 @@ drop_lock:
        RETURN(rc);
 }
 
-static int osc_object_obd_info_get(const struct lu_env *env,
-                                  struct cl_object *obj,
-                                  struct obd_info *oinfo,
-                                  struct ptlrpc_request_set *set)
-{
-       struct ptlrpc_request   *req;
-       struct osc_async_args   *aa;
-       int                     rc;
-       ENTRY;
-
-       req = ptlrpc_request_alloc(class_exp2cliimp(osc_export(cl2osc(obj))),
-                                  &RQF_OST_GETATTR);
-       if (req == NULL)
-               RETURN(-ENOMEM);
-
-       osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
-       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
-       if (rc != 0) {
-               ptlrpc_request_free(req);
-               RETURN(rc);
-       }
-
-       osc_pack_req_body(req, oinfo);
-
-       ptlrpc_request_set_replen(req);
-       req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
-
-       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-       aa = ptlrpc_req_async_args(req);
-       aa->aa_oi = oinfo;
-
-       ptlrpc_set_add_req(set, req);
-       RETURN(0);
-}
-
 void osc_object_set_contended(struct osc_object *obj)
 {
         obj->oo_contention_time = cfs_time_current();
@@ -424,7 +389,6 @@ static const struct cl_object_operations osc_ops = {
        .coo_prune        = osc_object_prune,
        .coo_find_cbdata  = osc_object_find_cbdata,
        .coo_fiemap       = osc_object_fiemap,
-       .coo_obd_info_get = osc_object_obd_info_get,
 };
 
 static const struct lu_object_operations osc_lu_obj_ops = {
index 58647e7..28a7106 100644 (file)
@@ -135,35 +135,6 @@ void osc_set_capa_size(struct ptlrpc_request *req,
                 ;
 }
 
-int osc_getattr_interpret(const struct lu_env *env,
-                         struct ptlrpc_request *req,
-                         struct osc_async_args *aa, int rc)
-{
-        struct ost_body *body;
-        ENTRY;
-
-        if (rc != 0)
-                GOTO(out, rc);
-
-        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body) {
-               CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-               lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
-                                    aa->aa_oi->oi_oa, &body->oa);
-
-               /* This should really be sent by the OST */
-               aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
-               aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
-        } else {
-                CDEBUG(D_INFO, "can't unpack ost_body\n");
-                rc = -EPROTO;
-                aa->aa_oi->oi_oa->o_valid = 0;
-        }
-out:
-        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
-        RETURN(rc);
-}
-
 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                        struct obd_info *oinfo)
 {
index c777e32..5d1f474 100755 (executable)
@@ -2525,6 +2525,31 @@ test_36() {
 }
 run_test 36 "Move file during restore"
 
+test_37() {
+       # LU-5683: check that an archived dirty file can be rearchived.
+       copytool_cleanup
+       copytool_setup $SINGLEAGT $MOUNT2
+
+       mkdir -p $DIR/$tdir
+       local f=$DIR/$tdir/$tfile
+       local fid
+
+       fid=$(make_small $f) || error "cannot create small file"
+
+       $LFS hsm_archive --archive $HSM_ARCHIVE_NUMBER $f
+       wait_request_state $fid ARCHIVE SUCCEED
+       $LFS hsm_release $f || error "cannot release $f"
+
+       # Dirty file.
+       dd if=/dev/urandom of=$f bs=1M count=1 || error "cannot dirty file"
+
+       $LFS hsm_archive --archive $HSM_ARCHIVE_NUMBER $f
+       wait_request_state $fid ARCHIVE SUCCEED
+
+       copytool_cleanup
+}
+run_test 37 "re-archive a dirty file"
+
 multi_archive() {
        local prefix=$1
        local count=$2