Whamcloud - gitweb
LU-1333 hsm: Add hsm_release feature. 28/7028/8
authorJinshan Xiong <jinshan.xiong@intel.com>
Fri, 19 Jul 2013 22:27:30 +0000 (15:27 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 29 Jul 2013 17:41:06 +0000 (17:41 +0000)
HSM Release is one of the key feature of HSM. To perform HSM
release, clients need to acquire the file lease exclusivelt and
flush dirty cache from clients. A special close REQ will be sent
to the MDT to release the lease and get rid of OST objects.

Signed-off-by: Aurelien Degremont <aurelien.degremont@cea.fr>
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I77eeccf81a852955354a382d80b887df00d1c4fc
Reviewed-on: http://review.whamcloud.com/7028
Tested-by: Hudson
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
23 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre/lustre_user.h
lustre/include/lustre_req_layout.h
lustre/include/obd.h
lustre/lclient/lcommon_misc.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/vvp_object.c
lustre/lod/lod_qos.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_object.c
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/pack_generic.c
lustre/tests/sanity-hsm.sh
lustre/utils/req-layout.c

index c3352bb..860a3de 100644 (file)
@@ -1746,6 +1746,7 @@ static inline __u32 lov_mds_md_size(__u16 stripes, __u32 lmm_magic)
 #define OBD_MD_FLRMTRGETFACL (0x0008000000000000ULL) /* lfs rgetfacl case */
 
 #define OBD_MD_FLDATAVERSION (0x0010000000000000ULL) /* iversion sum */
+#define OBD_MD_FLRELEASED    (0x0020000000000000ULL) /* file released */
 
 #define OBD_MD_FLGETATTR (OBD_MD_FLID    | OBD_MD_FLATIME | OBD_MD_FLMTIME | \
                           OBD_MD_FLCTIME | OBD_MD_FLSIZE  | OBD_MD_FLBLKSZ | \
@@ -2384,6 +2385,7 @@ extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa);
                                              * delegation, succeed if it's not
                                              * being opened with conflict mode.
                                              */
+#define MDS_OPEN_RELEASE   02000000000000ULL /* Open the file for HSM release */
 
 /* permission for create non-directory file */
 #define MAY_CREATE      (1 << 7)
@@ -2402,7 +2404,7 @@ extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa);
 /* lfs rgetfacl permission check */
 #define MAY_RGETFACL    (1 << 14)
 
-enum {
+enum mds_op_bias {
        MDS_CHECK_SPLIT         = 1 << 0,
        MDS_CROSS_REF           = 1 << 1,
        MDS_VTX_BYPASS          = 1 << 2,
@@ -2415,6 +2417,7 @@ enum {
        MDS_DATA_MODIFIED       = 1 << 9,
        MDS_CREATE_VOLATILE     = 1 << 10,
        MDS_OWNEROVERRIDE       = 1 << 11,
+       MDS_HSM_RELEASE         = 1 << 12,
 };
 
 /* instance of mdt_reint_rec */
@@ -3738,5 +3741,14 @@ struct mdc_swap_layouts {
 
 void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl);
 
+struct close_data {
+       struct lustre_handle    cd_handle;
+       struct lu_fid           cd_fid;
+       __u64                   cd_data_version;
+       __u64                   cd_reserved[8];
+};
+
+void lustre_swab_close_data(struct close_data *data);
+
 #endif
 /** @} lustreidl */
index 039f9ec..952c264 100644 (file)
@@ -634,10 +634,13 @@ struct if_quotactl {
 };
 
 /* swap layout flags */
-#define        SWAP_LAYOUTS_CHECK_DV1          (1 << 0)
-#define        SWAP_LAYOUTS_CHECK_DV2          (1 << 1)
-#define        SWAP_LAYOUTS_KEEP_MTIME         (1 << 2)
-#define        SWAP_LAYOUTS_KEEP_ATIME         (1 << 3)
+#define SWAP_LAYOUTS_CHECK_DV1         (1 << 0)
+#define SWAP_LAYOUTS_CHECK_DV2         (1 << 1)
+#define SWAP_LAYOUTS_KEEP_MTIME                (1 << 2)
+#define SWAP_LAYOUTS_KEEP_ATIME                (1 << 3)
+
+/* Swap XATTR_NAME_HSM as well, only on the MDT so far */
+#define SWAP_LAYOUTS_MDS_HSM           (1 << 31)
 struct lustre_swap_layouts {
        __u64   sl_flags;
        __u32   sl_fd;
index d795b03..9e66328 100644 (file)
@@ -164,6 +164,7 @@ extern struct req_format RQF_UPDATE_OBJ;
  */
 extern struct req_format RQF_MDS_GETATTR_NAME;
 extern struct req_format RQF_MDS_CLOSE;
+extern struct req_format RQF_MDS_RELEASE_CLOSE;
 extern struct req_format RQF_MDS_PIN;
 extern struct req_format RQF_MDS_UNPIN;
 extern struct req_format RQF_MDS_CONNECT;
@@ -262,6 +263,7 @@ extern struct req_msg_field RMF_GETINFO_VAL;
 extern struct req_msg_field RMF_GETINFO_VALLEN;
 extern struct req_msg_field RMF_GETINFO_KEY;
 extern struct req_msg_field RMF_IDX_INFO;
+extern struct req_msg_field RMF_CLOSE_DATA;
 
 /*
  * connection handle received in MDS_CONNECT request.
index e9150ae..d387129 100644 (file)
@@ -1084,24 +1084,24 @@ struct md_op_data {
 #ifdef __KERNEL__
        unsigned int            op_attr_flags;
 #endif
-        __u64                   op_valid;
-        loff_t                  op_attr_blocks;
+       __u64                   op_valid;
+       loff_t                  op_attr_blocks;
 
-        /* Size-on-MDS epoch and flags. */
-        __u64                   op_ioepoch;
+       /* Size-on-MDS epoch and flags. */
+       __u64                   op_ioepoch;
        __u32                   op_flags;
 
-        /* Capa fields */
-        struct obd_capa        *op_capa1;
-        struct obd_capa        *op_capa2;
+       /* Capa fields */
+       struct obd_capa        *op_capa1;
+       struct obd_capa        *op_capa2;
 
-        /* Various operation flags. */
-       __u32                   op_bias;
+       /* Various operation flags. */
+       enum mds_op_bias        op_bias;
 
-        /* Operation type */
+       /* Operation type */
        __u32                   op_opc;
 
-        /* Used by readdir */
+       /* Used by readdir */
        __u64                   op_offset;
 
        /* Used by readdir */
@@ -1110,6 +1110,10 @@ struct md_op_data {
        /* used to transfer info between the stacks of MD client
         * see enum op_cli_flags */
        __u32                   op_cli_flags;
+
+       /* File object data version for HSM release, on client */
+       __u64                   op_data_version;
+       struct lustre_handle    op_lease_handle;
 };
 
 enum op_cli_flags {
index 26cfa04..9dd8918 100644 (file)
@@ -140,11 +140,13 @@ int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
         io->ci_obj = obj;
        io->ci_ignore_layout = 1;
 
-        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
-        if (rc) {
-                LASSERT(rc < 0);
-                cl_env_put(env, &refcheck);
-                return rc;
+       rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+       if (rc) {
+               /* Does not make sense to take GL for released layout */
+               if (rc > 0)
+                       rc = -ENOTSUPP;
+               cl_env_put(env, &refcheck);
+               return rc;
         }
 
         descr = &ccc_env_info(env)->cti_descr;
index e625200..23d2af8 100644 (file)
@@ -1833,8 +1833,28 @@ out_rmdir:
                        RETURN(-EFAULT);
                }
 
-               rc = obd_iocontrol(cmd, ll_i2mdexp(inode), totalsize,
-                                  hur, NULL);
+               if (hur->hur_request.hr_action == HUA_RELEASE) {
+                       const struct lu_fid *fid;
+                       struct inode *f;
+                       int i;
+
+                       for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
+                               fid = &hur->hur_user_item[i].hui_fid;
+                               f = search_inode_for_lustre(inode->i_sb, fid);
+                               if (IS_ERR(f)) {
+                                       rc = PTR_ERR(f);
+                                       break;
+                               }
+
+                               rc = ll_hsm_release(f);
+                               iput(f);
+                               if (rc != 0)
+                                       break;
+                       }
+               } else {
+                       rc = obd_iocontrol(cmd, ll_i2mdexp(inode), totalsize,
+                                          hur, NULL);
+               }
 
                OBD_FREE_LARGE(hur, totalsize);
 
index 648b40a..83476bd 100644 (file)
@@ -119,8 +119,9 @@ out:
 }
 
 static int ll_close_inode_openhandle(struct obd_export *md_exp,
-                                     struct inode *inode,
-                                     struct obd_client_handle *och)
+                                    struct inode *inode,
+                                    struct obd_client_handle *och,
+                                    const __u64 *data_version)
 {
         struct obd_export *exp = ll_i2mdexp(inode);
         struct md_op_data *op_data;
@@ -144,7 +145,14 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
         if (op_data == NULL)
                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
 
-        ll_prepare_close(inode, op_data, och);
+       ll_prepare_close(inode, op_data, och);
+       if (data_version != NULL) {
+               /* Pass in data_version implies release. */
+               op_data->op_bias |= MDS_HSM_RELEASE;
+               op_data->op_data_version = *data_version;
+               op_data->op_lease_handle = och->och_lease_handle;
+               op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
+       }
         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
         rc = md_close(md_exp, op_data, och->och_mod, &req);
         if (rc == -EAGAIN) {
@@ -173,8 +181,6 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
                spin_unlock(&lli->lli_lock);
        }
 
-        ll_finish_md_op_data(op_data);
-
         if (rc == 0) {
                 rc = ll_objects_destroy(req, inode);
                 if (rc)
@@ -182,6 +188,14 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                inode->i_ino, rc);
         }
 
+       if (rc == 0 && op_data->op_bias & MDS_HSM_RELEASE) {
+               struct mdt_body *body;
+               body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+               if (!(body->valid & OBD_MD_FLRELEASED))
+                       rc = -EBUSY;
+       }
+
+        ll_finish_md_op_data(op_data);
         EXIT;
 out:
 
@@ -233,7 +247,7 @@ int ll_md_real_close(struct inode *inode, int flags)
         if (och) { /* There might be a race and somebody have freed this och
                       already */
                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
-                                               inode, och);
+                                              inode, och, NULL);
         }
 
         RETURN(rc);
@@ -264,7 +278,7 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode,
        }
 
        if (fd->fd_och != NULL) {
-               rc = ll_close_inode_openhandle(md_exp, inode, fd->fd_och);
+               rc = ll_close_inode_openhandle(md_exp, inode, fd->fd_och, NULL);
                fd->fd_och = NULL;
                GOTO(out, rc);
        }
@@ -734,7 +748,7 @@ static int ll_md_blocking_lease_ast(struct ldlm_lock *lock,
  * Acquire a lease and open the file.
  */
 struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file,
-                                       fmode_t fmode)
+                                       fmode_t fmode, __u64 open_flags)
 {
        struct lookup_intent it = { .it_op = IT_OPEN };
        struct ll_sb_info *sbi = ll_i2sbi(inode);
@@ -803,7 +817,8 @@ struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file,
        /* To tell the MDT this openhandle is from the same owner */
        op_data->op_handle = old_handle;
 
-       it.it_flags = fmode | MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
+       it.it_flags = fmode | open_flags;
+       it.it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
        rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, &req,
                                ll_md_blocking_lease_ast,
        /* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise
@@ -849,7 +864,7 @@ struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file,
        RETURN(och);
 
 out_close:
-       rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, inode, och);
+       rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, inode, och, NULL);
        if (rc2)
                CERROR("Close openhandle returned %d\n", rc2);
 
@@ -895,7 +910,8 @@ int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
        if (lease_broken != NULL)
                *lease_broken = cancelled;
 
-       rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och);
+       rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och,
+                                      NULL);
        RETURN(rc);
 }
 EXPORT_SYMBOL(ll_lease_close);
@@ -1724,14 +1740,14 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
        ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
 
         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
-                                       inode, och);
- out:
-        /* this one is in place of ll_file_open */
-        if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
-                ptlrpc_req_finished(it->d.lustre.it_data);
-                it_clear_disposition(it, DISP_ENQ_OPEN_REF);
-        }
-        RETURN(rc);
+                                      inode, och, NULL);
+out:
+       /* this one is in place of ll_file_open */
+       if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
+               ptlrpc_req_finished(it->d.lustre.it_data);
+               it_clear_disposition(it, DISP_ENQ_OPEN_REF);
+       }
+       RETURN(rc);
 }
 
 /**
@@ -1934,6 +1950,53 @@ out:
        RETURN(rc);
 }
 
+/*
+ * Trigger a HSM release request for the provided inode.
+ */
+int ll_hsm_release(struct inode *inode)
+{
+       struct cl_env_nest nest;
+       struct lu_env *env;
+       struct obd_client_handle *och = NULL;
+       __u64 data_version = 0;
+       int rc;
+       ENTRY;
+
+       CDEBUG(D_INODE, "%s: Releasing file "DFID".\n",
+              ll_get_fsname(inode->i_sb, NULL, 0),
+              PFID(&ll_i2info(inode)->lli_fid));
+
+       och = ll_lease_open(inode, NULL, FMODE_WRITE, MDS_OPEN_RELEASE);
+       if (IS_ERR(och))
+               GOTO(out, rc = PTR_ERR(och));
+
+       /* Grab latest data_version and [am]time values */
+       rc = ll_data_version(inode, &data_version, 1);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       env = cl_env_nested_get(&nest);
+       if (IS_ERR(env))
+               GOTO(out, rc = PTR_ERR(env));
+
+       ll_merge_lvb(env, inode);
+       cl_env_nested_put(&nest, env);
+
+       /* Release the file.
+        * NB: lease lock handle is released in mdc_hsm_release_pack() because
+        * we still need it to pack l_remote_handle to MDT. */
+       rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och,
+                                      &data_version);
+       och = NULL;
+
+       EXIT;
+out:
+       if (och != NULL && !IS_ERR(och)) /* close the file */
+               ll_lease_close(och, inode, NULL);
+
+       return rc;
+}
+
 struct ll_swap_stack {
        struct iattr             ia1, ia2;
        __u64                    dv1, dv2;
@@ -2361,7 +2424,7 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
                CDEBUG(D_INODE, "Set lease with mode %d\n", mode);
 
                /* apply for lease */
-               och = ll_lease_open(inode, file, mode);
+               och = ll_lease_open(inode, file, mode, 0);
                if (IS_ERR(och))
                        RETURN(PTR_ERR(och));
 
index 912b04d..4fc1e4b 100644 (file)
@@ -817,9 +817,10 @@ int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg);
 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
 int ll_fid2path(struct inode *inode, void *arg);
 int ll_data_version(struct inode *inode, __u64 *data_version, int extent_lock);
+int ll_hsm_release(struct inode *inode);
 
 struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file,
-                                       fmode_t mode);
+                                       fmode_t mode, __u64 flags);
 int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
                   bool *lease_broken);
 
index c60b22a..65a6e45 100644 (file)
@@ -141,7 +141,7 @@ int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
                        lli->lli_layout_gen,
                        conf->u.coc_md->lsm->lsm_layout_gen);
 
-               lli->lli_has_smd = true;
+               lli->lli_has_smd = lsm_has_objects(conf->u.coc_md->lsm);
                lli->lli_layout_gen = conf->u.coc_md->lsm->lsm_layout_gen;
        } else {
                CDEBUG(D_VFSTRACE, "layout lock destroyed: %u.\n",
index 8a4d1bb..3cae7a3 100644 (file)
@@ -1211,18 +1211,13 @@ static int lod_use_defined_striping(const struct lu_env *env,
                                    struct lod_object *mo,
                                    const struct lu_buf *buf)
 {
-       struct lod_device      *d = lu2lod_dev(lod2lu_obj(mo)->lo_dev);
        struct lov_mds_md_v1   *v1 = buf->lb_buf;
        struct lov_mds_md_v3   *v3 = buf->lb_buf;
        struct lov_ost_data_v1 *objs;
        __u32                   magic;
-       int                     rc;
+       int                     rc = 0;
        ENTRY;
 
-       rc = lod_verify_striping(d, buf, 1);
-       if (rc)
-               RETURN(rc);
-
        magic = le32_to_cpu(v1->lmm_magic);
        if (magic == LOV_MAGIC_V1_DEF) {
                objs = &v1->lmm_objects[0];
@@ -1239,7 +1234,14 @@ static int lod_use_defined_striping(const struct lu_env *env,
        mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
        LASSERT(buf->lb_len >= lov_mds_md_size(mo->ldo_stripenr, magic));
 
-       rc = lod_initialize_objects(env, mo, objs);
+       /* fixup for released file before object initialization */
+       if (mo->ldo_pattern & LOV_PATTERN_F_RELEASED) {
+               mo->ldo_released_stripenr = mo->ldo_stripenr;
+               mo->ldo_stripenr = 0;
+       }
+
+       if (mo->ldo_stripenr > 0)
+               rc = lod_initialize_objects(env, mo, objs);
 
 out:
        RETURN(rc);
index c7056c8..2698495 100644 (file)
@@ -172,6 +172,22 @@ enum lov_layout_type {
        LLT_NR
 };
 
+static inline char *llt2str(enum lov_layout_type llt)
+{
+       switch (llt) {
+       case LLT_EMPTY:
+               return "EMPTY";
+       case LLT_RAID0:
+               return "RAID0";
+       case LLT_RELEASED:
+               return "RELEASED";
+       case LLT_NR:
+               LBUG();
+       }
+       LBUG();
+       return "";
+}
+
 /**
  * lov-specific file state.
  *
index 65dd11d..1d10ef5 100644 (file)
@@ -401,33 +401,41 @@ static int lov_print_empty(const struct lu_env *env, void *cookie,
 }
 
 static int lov_print_raid0(const struct lu_env *env, void *cookie,
-                           lu_printer_t p, const struct lu_object *o)
+                          lu_printer_t p, const struct lu_object *o)
 {
-        struct lov_object       *lov = lu2lov(o);
-        struct lov_layout_raid0 *r0  = lov_r0(lov);
-       struct lov_stripe_md    *lsm = lov->lo_lsm;
-        int i;
+       struct lov_object       *lov = lu2lov(o);
+       struct lov_layout_raid0 *r0  = lov_r0(lov);
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
+       int                      i;
 
-        (*p)(env, cookie, "stripes: %d, %svalid, lsm{%p 0x%08X %d %u %u}: \n",
-               r0->lo_nr, lov->lo_layout_invalid ? "in" : "", lsm,
+       (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
+               r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
                lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
                lsm->lsm_stripe_count, lsm->lsm_layout_gen);
-        for (i = 0; i < r0->lo_nr; ++i) {
-                struct lu_object *sub;
-
-                if (r0->lo_sub[i] != NULL) {
-                        sub = lovsub2lu(r0->lo_sub[i]);
-                        lu_object_print(env, cookie, p, sub);
-                } else
-                        (*p)(env, cookie, "sub %d absent\n", i);
-        }
-        return 0;
+       for (i = 0; i < r0->lo_nr; ++i) {
+               struct lu_object *sub;
+
+               if (r0->lo_sub[i] != NULL) {
+                       sub = lovsub2lu(r0->lo_sub[i]);
+                       lu_object_print(env, cookie, p, sub);
+               } else {
+                       (*p)(env, cookie, "sub %d absent\n", i);
+               }
+       }
+       return 0;
 }
 
 static int lov_print_released(const struct lu_env *env, void *cookie,
                                lu_printer_t p, const struct lu_object *o)
 {
-       (*p)(env, cookie, "released\n");
+       struct lov_object       *lov = lu2lov(o);
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
+
+       (*p)(env, cookie,
+               "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
+               lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+               lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
+               lsm->lsm_stripe_count, lsm->lsm_layout_gen);
        return 0;
 }
 
@@ -648,8 +656,8 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
 }
 
 static int lov_layout_change(const struct lu_env *unused,
-                             struct lov_object *lov,
-                             const struct cl_object_conf *conf)
+                            struct lov_object *lov,
+                            const struct cl_object_conf *conf)
 {
        int result;
        enum lov_layout_type llt = LLT_EMPTY;
@@ -676,6 +684,10 @@ static int lov_layout_change(const struct lu_env *unused,
                RETURN(PTR_ERR(env));
        }
 
+       CDEBUG(D_INODE, DFID" from %s to %s\n",
+              PFID(lu_object_fid(lov2lu(lov))),
+              llt2str(lov->lo_type), llt2str(llt));
+
        old_ops = &lov_dispatch[lov->lo_type];
        new_ops = &lov_dispatch[llt];
 
@@ -766,8 +778,9 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
        if (conf->u.coc_md != NULL)
                lsm = conf->u.coc_md->lsm;
        if ((lsm == NULL && lov->lo_lsm == NULL) ||
-           (lsm != NULL && lov->lo_lsm != NULL &&
-            lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
+           ((lsm != NULL && lov->lo_lsm != NULL) &&
+            (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
+            (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
                /* same version of layout */
                lov->lo_layout_invalid = false;
                GOTO(out, result = 0);
@@ -784,6 +797,8 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
 
 out:
        lov_conf_unlock(lov);
+       CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
+              PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
        RETURN(result);
 }
 
index 09b0ffa..3a4978b 100644 (file)
@@ -189,7 +189,8 @@ static __u64 mds_pack_open_flags(__u64 flags, __u32 mode)
        __u64 cr_flags = (flags & (FMODE_READ | FMODE_WRITE |
                                   MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS |
                                   MDS_OPEN_OWNEROVERRIDE | MDS_OPEN_LOCK |
-                                  MDS_OPEN_BY_FID | MDS_OPEN_LEASE));
+                                  MDS_OPEN_BY_FID | MDS_OPEN_LEASE |
+                                  MDS_OPEN_RELEASE));
         if (flags & O_CREAT)
                 cr_flags |= MDS_OPEN_CREAT;
         if (flags & O_EXCL)
@@ -503,6 +504,28 @@ void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
         }
 }
 
+static void mdc_hsm_release_pack(struct ptlrpc_request *req,
+                                struct md_op_data *op_data)
+{
+       if (op_data->op_bias & MDS_HSM_RELEASE) {
+               struct close_data *data;
+               struct ldlm_lock *lock;
+
+               data = req_capsule_client_get(&req->rq_pill, &RMF_CLOSE_DATA);
+               LASSERT(data != NULL);
+
+               lock = ldlm_handle2lock(&op_data->op_lease_handle);
+               if (lock != NULL) {
+                       data->cd_handle = lock->l_remote_handle;
+                       ldlm_lock_put(lock);
+               }
+               ldlm_cli_cancel(&op_data->op_lease_handle, LCF_LOCAL);
+
+               data->cd_data_version = op_data->op_data_version;
+               data->cd_fid = op_data->op_fid2;
+       }
+}
+
 void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
 {
         struct mdt_ioepoch *epoch;
@@ -514,6 +537,7 @@ void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
         mdc_setattr_pack_rec(rec, op_data);
         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
         mdc_ioepoch_pack(epoch, op_data);
+       mdc_hsm_release_pack(req, op_data);
 }
 
 static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
index ebdd54b..aea0265 100644 (file)
@@ -824,13 +824,29 @@ static void mdc_close_handle_reply(struct ptlrpc_request *req,
 int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
               struct md_open_data *mod, struct ptlrpc_request **request)
 {
-        struct obd_device     *obd = class_exp2obd(exp);
-        struct ptlrpc_request *req;
-        int                    rc;
-        ENTRY;
+       struct obd_device     *obd = class_exp2obd(exp);
+       struct ptlrpc_request *req;
+       struct req_format     *req_fmt;
+       int                    rc;
+       int                    saved_rc = 0;
+       ENTRY;
 
-        *request = NULL;
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_CLOSE);
+       req_fmt = &RQF_MDS_CLOSE;
+       if (op_data->op_bias & MDS_HSM_RELEASE) {
+               req_fmt = &RQF_MDS_RELEASE_CLOSE;
+
+               /* allocate a FID for volatile file */
+               rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
+               if (rc < 0) {
+                       CERROR("%s: "DFID" failed to allocate FID: %d\n",
+                              obd->obd_name, PFID(&op_data->op_fid1), rc);
+                       /* save the errcode and proceed to close */
+                       saved_rc = rc;
+               }
+       }
+
+       *request = NULL;
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -920,7 +936,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
         }
         *request = req;
         mdc_close_handle_reply(req, op_data, rc);
-        RETURN(rc);
+        RETURN(rc < 0 ? rc : saved_rc);
 }
 
 int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
index 1b37f46..58ca471 100644 (file)
@@ -148,7 +148,7 @@ struct mdd_thread_info {
        struct lu_dirent          mti_ent;
        char                      mti_key[NAME_MAX + 16];
         struct obd_trans_info     mti_oti;
-        struct lu_buf             mti_buf;
+        struct lu_buf             mti_buf[4];
         struct lu_buf             mti_big_buf; /* biggish persistent buf */
        struct lu_buf             mti_link_buf; /* buf for link ea */
         struct lu_name            mti_name;
index b4f16b3..99102ea 100644 (file)
@@ -120,23 +120,23 @@ const struct lu_name *mdd_name_get_const(const struct lu_env *env,
 
 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 {
-        struct lu_buf *buf;
+       struct lu_buf *buf;
 
-        buf = &mdd_env_info(env)->mti_buf;
-        buf->lb_buf = area;
-        buf->lb_len = len;
-        return buf;
+       buf = &mdd_env_info(env)->mti_buf[0];
+       buf->lb_buf = area;
+       buf->lb_len = len;
+       return buf;
 }
 
 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                        const void *area, ssize_t len)
 {
-        struct lu_buf *buf;
+       struct lu_buf *buf;
 
-        buf = &mdd_env_info(env)->mti_buf;
-        buf->lb_buf = (void *)area;
-        buf->lb_len = len;
-        return buf;
+       buf = &mdd_env_info(env)->mti_buf[0];
+       buf->lb_buf = (void *)area;
+       buf->lb_len = len;
+       return buf;
 }
 
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
@@ -964,7 +964,7 @@ static int mdd_hsm_update_locked(const struct lu_env *env,
        struct mdd_thread_info *info = mdd_env_info(env);
        struct mdd_device      *mdd = mdo2mdd(obj);
        struct mdd_object      *mdd_obj = md2mdd_obj(obj);
-       struct lu_buf          *current_buf = &info->mti_buf;
+       struct lu_buf          *current_buf;
        struct md_hsm          *current_mh;
        struct md_hsm          *new_mh;
        int                     rc;
@@ -975,12 +975,12 @@ static int mdd_hsm_update_locked(const struct lu_env *env,
                RETURN(-ENOMEM);
 
        /* Read HSM attrs from disk */
-       current_buf->lb_buf = info->mti_xattr_buf;
-       current_buf->lb_len = sizeof(info->mti_xattr_buf);
        CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       current_buf = mdd_buf_get(env, info->mti_xattr_buf,
+                                 sizeof(info->mti_xattr_buf));
        rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
                           mdd_object_capa(env, mdd_obj));
-       rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
+       rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
        if (rc < 0 && rc != -ENODATA)
                GOTO(free, rc);
        else if (rc == -ENODATA)
@@ -1010,7 +1010,6 @@ free:
        return(rc);
 }
 
-
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1159,11 +1158,11 @@ stop:
  * read lov EA of an object
  * return the lov EA in an allocated lu_buf
  */
-static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
-                                    struct mdd_object *obj)
+static int mdd_get_lov_ea(const struct lu_env *env,
+                         struct mdd_object *obj,
+                         struct lu_buf *lmm_buf)
 {
        struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
-       struct lu_buf   *lmm_buf = NULL;
        int              rc, sz;
        ENTRY;
 
@@ -1198,28 +1197,46 @@ repeat:
                goto repeat;
        }
 
-       OBD_ALLOC_PTR(lmm_buf);
-       if (!lmm_buf)
+       lu_buf_alloc(lmm_buf, sz);
+       if (lmm_buf->lb_buf == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       OBD_ALLOC(lmm_buf->lb_buf, sz);
-       if (!lmm_buf->lb_buf)
-               GOTO(free, rc = -ENOMEM);
-
        memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
-       lmm_buf->lb_len = sz;
-
-       GOTO(out, rc = 0);
+       rc = 0;
+       EXIT;
 
-free:
-       if (lmm_buf)
-               OBD_FREE_PTR(lmm_buf);
 out:
-       if (rc)
-               return ERR_PTR(rc);
-       return lmm_buf;
+       if (rc < 0)
+               lu_buf_free(lmm_buf);
+       return rc;
 }
 
+static int mdd_xattr_hsm_replace(const struct lu_env *env,
+                                struct mdd_object *o, struct lu_buf *buf,
+                                struct thandle *handle)
+{
+       struct hsm_attrs *attrs;
+       __u32 hsm_flags;
+       int flags = 0;
+       int rc;
+       ENTRY;
+
+       rc = mdo_xattr_set(env, o, buf, XATTR_NAME_HSM, LU_XATTR_REPLACE,
+                          handle, mdd_object_capa(env, o));
+       if (rc != 0)
+               RETURN(rc);
+
+       attrs = buf->lb_buf;
+       hsm_flags = le32_to_cpu(attrs->hsm_flags);
+       if (!(hsm_flags & HS_RELEASED) || mdd_is_dead_obj(o))
+               RETURN(0);
+
+       /* Add a changelog record for release. */
+       hsm_set_cl_event(&flags, HE_RELEASE);
+       rc = mdd_changelog_data_store(env, mdo2mdd(&o->mod_obj), CL_HSM,
+                                     flags, o, handle);
+       RETURN(rc);
+}
 
 /*
  *  check if layout swapping between 2 objects is allowed
@@ -1268,36 +1285,38 @@ static int mdd_layout_swap_allowed(const struct lu_env *env,
 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                            struct md_object *obj2, __u64 flags)
 {
-       struct mdd_object       *o1, *o2, *fst_o, *snd_o;
-       struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
-       struct lu_buf           *fst_buf, *snd_buf;
-       struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
-       struct thandle          *handle;
+       struct mdd_thread_info  *info = mdd_env_info(env);
+       struct mdd_object       *fst_o = md2mdd_obj(obj1);
+       struct mdd_object       *snd_o = md2mdd_obj(obj2);
        struct mdd_device       *mdd = mdo2mdd(obj1);
-       int                      rc;
+       struct lov_mds_md       *fst_lmm, *snd_lmm;
+       struct lu_buf           *fst_buf = &info->mti_buf[0];
+       struct lu_buf           *snd_buf = &info->mti_buf[1];
+       struct lu_buf           *fst_hsm_buf = &info->mti_buf[2];
+       struct lu_buf           *snd_hsm_buf = &info->mti_buf[3];
+       struct ost_id           *saved_oi = NULL;
+       struct thandle          *handle;
        __u16                    fst_gen, snd_gen;
        int                      fst_fl;
+       int                      rc;
+       int                      rc2;
        ENTRY;
 
+       CLASSERT(ARRAY_SIZE(info->mti_buf) >= 4);
+       memset(info->mti_buf, 0, sizeof(info->mti_buf));
+
        /* we have to sort the 2 obj, so locking will always
         * be in the same order, even in case of 2 concurrent swaps */
-       rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
-                       mdo2fid(md2mdd_obj(obj2)));
-       /* same fid ? */
-       if (rc == 0)
+       rc = lu_fid_cmp(mdo2fid(fst_o), mdo2fid(snd_o));
+       if (rc == 0) /* same fid ? */
                RETURN(-EPERM);
 
-       if (rc > 0) {
-               o1 = md2mdd_obj(obj1);
-               o2 = md2mdd_obj(obj2);
-       } else {
-               o1 = md2mdd_obj(obj2);
-               o2 = md2mdd_obj(obj1);
-       }
+       if (rc < 0)
+               swap(fst_o, snd_o);
 
        /* check if layout swapping is allowed */
-       rc = mdd_layout_swap_allowed(env, o1, o2);
-       if (rc)
+       rc = mdd_layout_swap_allowed(env, fst_o, snd_o);
+       if (rc != 0)
                RETURN(rc);
 
        handle = mdd_trans_create(env, mdd);
@@ -1305,45 +1324,30 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                RETURN(PTR_ERR(handle));
 
        /* objects are already sorted */
-       mdd_write_lock(env, o1, MOR_TGT_CHILD);
-       mdd_write_lock(env, o2, MOR_TGT_CHILD);
-
-       lmm1_buf = mdd_get_lov_ea(env, o1);
-       if (IS_ERR(lmm1_buf)) {
-               rc = PTR_ERR(lmm1_buf);
-               lmm1_buf = NULL;
-               if (rc != -ENODATA)
-                       GOTO(stop, rc);
-       }
+       mdd_write_lock(env, fst_o, MOR_TGT_CHILD);
+       mdd_write_lock(env, snd_o, MOR_TGT_CHILD);
 
-       lmm2_buf = mdd_get_lov_ea(env, o2);
-       if (IS_ERR(lmm2_buf)) {
-               rc = PTR_ERR(lmm2_buf);
-               lmm2_buf = NULL;
-               if (rc != -ENODATA)
-                       GOTO(stop, rc);
-       }
+       rc = mdd_get_lov_ea(env, fst_o, fst_buf);
+       if (rc < 0 && rc != -ENODATA)
+               GOTO(stop, rc);
+
+       rc = mdd_get_lov_ea(env, snd_o, snd_buf);
+       if (rc < 0 && rc != -ENODATA)
+               GOTO(stop, rc);
 
        /* swapping 2 non existant layouts is a success */
-       if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
+       if (fst_buf->lb_buf == NULL && snd_buf->lb_buf == NULL)
                GOTO(stop, rc = 0);
 
        /* to help inode migration between MDT, it is better to
         * start by the no layout file (if one), so we order the swap */
-       if (lmm1_buf == NULL) {
-               fst_o = o1;
-               fst_buf = lmm1_buf;
-               snd_o = o2;
-               snd_buf = lmm2_buf;
-       } else {
-               fst_o = o2;
-               fst_buf = lmm2_buf;
-               snd_o = o1;
-               snd_buf = lmm1_buf;
+       if (snd_buf->lb_buf == NULL) {
+               swap(fst_o, snd_o);
+               swap(fst_buf, snd_buf);
        }
 
        /* lmm and generation layout initialization */
-       if (fst_buf) {
+       if (fst_buf->lb_buf != NULL) {
                fst_lmm = fst_buf->lb_buf;
                fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
                fst_fl  = LU_XATTR_REPLACE;
@@ -1353,7 +1357,7 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                fst_fl  = LU_XATTR_CREATE;
        }
 
-       LASSERT(snd_buf != NULL);
+       LASSERT(snd_buf->lb_buf != NULL);
        snd_lmm = snd_buf->lb_buf;
        snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
 
@@ -1362,18 +1366,13 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
        fst_gen++;
 
        /* set the file specific informations in lmm */
-       if (fst_lmm) {
-               /* save the orignal lmm common header of first file
-                * to be able to roll back */
-               OBD_ALLOC_PTR(old_fst_lmm);
-               if (old_fst_lmm == NULL)
-                       GOTO(stop, rc = -ENOMEM);
-               *old_fst_lmm = *fst_lmm;
+       if (fst_lmm != NULL) {
+               saved_oi = &info->mti_oa.o_oi;
 
+               *saved_oi = fst_lmm->lmm_oi;
                fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
                fst_lmm->lmm_oi = snd_lmm->lmm_oi;
-
-               snd_lmm->lmm_oi = old_fst_lmm->lmm_oi;
+               snd_lmm->lmm_oi = *saved_oi;
        } else {
                if (snd_lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1))
                        snd_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
@@ -1382,52 +1381,98 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                else
                        GOTO(stop, rc = -EPROTO);
        }
-
        snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
 
+       /* Prepare HSM attribute if it's required */
+       if (flags & SWAP_LAYOUTS_MDS_HSM) {
+               const int buflen = sizeof(struct hsm_attrs);
+
+               lu_buf_alloc(fst_hsm_buf, buflen);
+               lu_buf_alloc(snd_hsm_buf, buflen);
+               if (fst_hsm_buf->lb_buf == NULL || snd_hsm_buf->lb_buf == NULL)
+                       GOTO(stop, rc = -ENOMEM);
+
+               /* Read HSM attribute */
+               rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM,
+                                  BYPASS_CAPA);
+               if (rc < 0)
+                       GOTO(stop, rc);
+
+               rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM,
+                                  BYPASS_CAPA);
+               if (rc < 0)
+                       GOTO(stop, rc);
+
+               rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_hsm_buf,
+                                          XATTR_NAME_HSM, LU_XATTR_REPLACE,
+                                          handle);
+               if (rc < 0)
+                       GOTO(stop, rc);
+
+               rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_hsm_buf,
+                                          XATTR_NAME_HSM, LU_XATTR_REPLACE,
+                                          handle);
+               if (rc < 0)
+                       GOTO(stop, rc);
+       }
+
        /* prepare transaction */
        rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
                                   fst_fl, handle);
-       if (rc)
+       if (rc != 0)
                GOTO(stop, rc);
 
-       if (fst_buf)
+       if (fst_buf->lb_buf != NULL)
                rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf,
                                           XATTR_NAME_LOV, LU_XATTR_REPLACE,
                                           handle);
        else
                rc = mdd_declare_xattr_del(env, mdd, snd_o, XATTR_NAME_LOV,
                                           handle);
-       if (rc)
+       if (rc != 0)
                GOTO(stop, rc);
 
        rc = mdd_trans_start(env, mdd, handle);
-       if (rc)
+       if (rc != 0)
                GOTO(stop, rc);
 
+       if (flags & SWAP_LAYOUTS_MDS_HSM) {
+               rc = mdd_xattr_hsm_replace(env, fst_o, snd_hsm_buf, handle);
+               if (rc < 0)
+                       GOTO(stop, rc);
+
+               rc = mdd_xattr_hsm_replace(env, snd_o, fst_hsm_buf, handle);
+               if (rc < 0) {
+                       rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf,
+                                                   handle);
+                       if (rc2 < 0)
+                               CERROR("%s: restore "DFID" HSM error: %d/%d\n",
+                                      mdd_obj_dev_name(fst_o),
+                                      PFID(mdo2fid(fst_o)), rc, rc2);
+                       GOTO(stop, rc);
+               }
+       }
+
        rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle,
                           mdd_object_capa(env, fst_o));
-       if (rc)
+       if (rc != 0)
                GOTO(stop, rc);
 
-       if (fst_buf)
+       if (fst_buf->lb_buf != NULL)
                rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
                                   LU_XATTR_REPLACE, handle,
                                   mdd_object_capa(env, snd_o));
        else
                rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle,
                                   mdd_object_capa(env, snd_o));
-       if (rc) {
-               int     rc2;
+       if (rc != 0) {
+               int steps = 0;
 
                /* failure on second file, but first was done, so we have
-                * to roll back first */
-               /* restore object_id, object_seq and generation number
-                * on first file */
-               if (fst_lmm) {
-                       LASSERT(old_fst_lmm != NULL);
-                       fst_lmm->lmm_oi = old_fst_lmm->lmm_oi;
-                       fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
+                * to roll back first. */
+               if (fst_buf->lb_buf != NULL) {
+                       fst_lmm->lmm_oi = *saved_oi;
+                       fst_lmm->lmm_layout_gen = cpu_to_le16(fst_gen - 1);
                        rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
                                            LU_XATTR_REPLACE, handle,
                                            mdd_object_capa(env, fst_o));
@@ -1435,15 +1480,25 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                        rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle,
                                            mdd_object_capa(env, fst_o));
                }
+               if (rc2 < 0)
+                       goto do_lbug;
+
+               ++steps;
+               rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf, handle);
+               if (rc2 < 0)
+                       goto do_lbug;
 
-               if (rc2) {
+               ++steps;
+               rc2 = mdd_xattr_hsm_replace(env, snd_o, snd_hsm_buf, handle);
+
+       do_lbug:
+               if (rc2 < 0) {
                        /* very bad day */
-                       CERROR("%s: unable to roll back after swap layouts"
-                              " failure between "DFID" and "DFID
-                              " rc2 = %d rc = %d)\n",
-                              mdd2obd_dev(mdd)->obd_name,
+                       CERROR("%s: unable to roll back layout swap. FIDs: "
+                              DFID" and "DFID "error: %d/%d, steps: %d\n",
+                              mdd_obj_dev_name(fst_o),
                               PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
-                              rc2, rc);
+                              rc, rc2, steps);
                        /* a solution to avoid journal commit is to panic,
                         * but it has strong consequences so we use LBUG to
                         * allow sysdamin to choose to panic or not
@@ -1465,22 +1520,13 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
 
 stop:
        mdd_trans_stop(env, mdd, rc, handle);
-       mdd_write_unlock(env, o2);
-       mdd_write_unlock(env, o1);
-
-       if (lmm1_buf && lmm1_buf->lb_buf)
-               OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
-       if (lmm1_buf)
-               OBD_FREE_PTR(lmm1_buf);
-
-       if (lmm2_buf && lmm2_buf->lb_buf)
-               OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
-       if (lmm2_buf)
-               OBD_FREE_PTR(lmm2_buf);
-
-       if (old_fst_lmm)
-               OBD_FREE_PTR(old_fst_lmm);
+       mdd_write_unlock(env, snd_o);
+       mdd_write_unlock(env, fst_o);
 
+       lu_buf_free(fst_buf);
+       lu_buf_free(snd_buf);
+       lu_buf_free(fst_hsm_buf);
+       lu_buf_free(snd_hsm_buf);
        return rc;
 }
 
index 474848f..becdb15 100644 (file)
@@ -1014,6 +1014,7 @@ static inline int mdt_hsm_cdt_init(struct mdt_device *mdt)
        sema_init(&cdt->cdt_llog_lock, 1);
        init_rwsem(&cdt->cdt_agent_lock);
        init_rwsem(&cdt->cdt_request_lock);
+       sema_init(&cdt->cdt_restore_lock, 1);
 
        CFS_INIT_LIST_HEAD(&cdt->cdt_requests);
        CFS_INIT_LIST_HEAD(&cdt->cdt_agents);
index 4eec5f2..92ab613 100644 (file)
@@ -813,11 +813,16 @@ static int mdt_setattr_unpack_rec(struct mdt_thread_info *info)
        else
                ma->ma_attr_flags &= ~MDS_DATA_MODIFIED;
 
-        if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
-                mdt_set_capainfo(info, 0, rr->rr_fid1,
-                                 req_capsule_client_get(pill, &RMF_CAPA1));
+       if (rec->sa_bias & MDS_HSM_RELEASE)
+               ma->ma_attr_flags |= MDS_HSM_RELEASE;
+       else
+               ma->ma_attr_flags &= ~MDS_HSM_RELEASE;
 
-        RETURN(0);
+       if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
+               mdt_set_capainfo(info, 0, rr->rr_fid1,
+                                req_capsule_client_get(pill, &RMF_CAPA1));
+
+       RETURN(0);
 }
 
 static int mdt_ioepoch_unpack(struct mdt_thread_info *info)
@@ -875,6 +880,24 @@ static int mdt_setattr_unpack(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
+static int mdt_hsm_release_unpack(struct mdt_thread_info *info)
+{
+       struct md_attr          *ma = &info->mti_attr;
+       struct req_capsule      *pill = info->mti_pill;
+       ENTRY;
+
+       if (!(ma->ma_attr_flags & MDS_HSM_RELEASE))
+               RETURN(0);
+
+       req_capsule_extend(pill, &RQF_MDS_RELEASE_CLOSE);
+
+       if (!(req_capsule_has_field(pill, &RMF_CLOSE_DATA, RCL_CLIENT) &&
+           req_capsule_field_present(pill, &RMF_CLOSE_DATA, RCL_CLIENT)))
+               RETURN(-EFAULT);
+
+       RETURN(0);
+}
+
 int mdt_close_unpack(struct mdt_thread_info *info)
 {
         int rc;
@@ -887,6 +910,11 @@ int mdt_close_unpack(struct mdt_thread_info *info)
        rc = mdt_setattr_unpack_rec(info);
        if (rc)
                RETURN(rc);
+
+       rc = mdt_hsm_release_unpack(info);
+       if (rc)
+               RETURN(rc);
+
        RETURN(mdt_init_ucred_reint(info));
 }
 
index 2e90841..6bc4f59 100644 (file)
@@ -655,7 +655,7 @@ void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
 {
        LASSERT(mfd != NULL);
 
-       CDEBUG(D_HA, DFID "Change mfd mode 0x%Lx->0x%Lx\n",
+       CDEBUG(D_HA, DFID " Change mfd mode "LPO64" -> "LPO64".\n",
               PFID(mdt_object_fid(mfd->mfd_object)), mfd->mfd_mode, mode);
 
        mfd->mfd_mode = mode;
@@ -677,7 +677,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
 
         isreg = S_ISREG(la->la_mode);
         isdir = S_ISDIR(la->la_mode);
-        if (isreg && !(ma->ma_valid & MA_LOV)) {
+       if (isreg && !(ma->ma_valid & MA_LOV) && !(flags & MDS_OPEN_RELEASE)) {
                 /*
                  * No EA, check whether it is will set regEA and dirEA since in
                  * above attr get, these size might be zero, so reset it, to
@@ -1375,6 +1375,23 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info,
        RETURN_EXIT;
 }
 
+/**
+ * Check release is permitted for the current HSM flags.
+ */
+static bool mdt_hsm_release_allow(struct md_attr *ma)
+{
+       if (!(ma->ma_valid & MA_HSM))
+               return false;
+
+       if (ma->ma_hsm.mh_flags & (HS_DIRTY|HS_NORELEASE|HS_LOST))
+               return false;
+
+       if (!(ma->ma_hsm.mh_flags & HS_ARCHIVED))
+               return false;
+
+       return true;
+}
+
 int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
                         struct mdt_lock_handle *lhc)
 {
@@ -1423,9 +1440,16 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
 
        mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
+       if (flags & MDS_OPEN_RELEASE)
+               ma->ma_need |= MA_HSM;
        rc = mdt_attr_get_complex(info, o, ma);
-        if (rc)
-                GOTO(out, rc);
+       if (rc)
+               GOTO(out, rc);
+
+       /* If a release request, check file flags are fine and ask for an
+        * exclusive open access. */
+       if (flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
+               GOTO(out, rc = -EPERM);
 
        rc = mdt_object_open_lock(info, o, lhc, &ibits);
         if (rc)
@@ -1814,6 +1838,15 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                result = rc;
                /* openlock will be released if mdt_finish_open failed */
                mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+
+               if (created && create_flags & MDS_OPEN_VOLATILE) {
+                       CERROR("%s: cannot open volatile file "DFID", orphan "
+                              "file will be left in PENDING directory until "
+                              "next reboot, rc = %d\n", mdt_obd_name(mdt),
+                              PFID(mdt_object_fid(child)), rc);
+                       GOTO(out_child_unlock, result);
+               }
+
                if (created) {
                        ma->ma_need = 0;
                        ma->ma_valid = 0;
@@ -1843,6 +1876,240 @@ out:
        return result;
 }
 
+/**
+ * Create an orphan object use local root.
+ */
+static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
+                                         struct mdt_device *mdt,
+                                         const struct lu_fid *fid,
+                                         struct md_attr *attr, fmode_t fmode)
+{
+       const struct lu_env *env = info->mti_env;
+       struct md_op_spec *spec = &info->mti_spec;
+       struct lu_fid *rootfid = &info->mti_tmp_fid1;
+       struct mdt_object *obj = NULL;
+       struct mdt_object *local_root;
+       static const char name[] = "i_am_nobody";
+       struct lu_name *lname;
+       int rc;
+       ENTRY;
+
+       rc = dt_root_get(env, mdt->mdt_bottom, rootfid);
+       if (rc != 0)
+               RETURN(ERR_PTR(rc));
+
+       local_root = mdt_object_find(env, mdt, rootfid);
+       if (IS_ERR(local_root))
+               RETURN(local_root);
+
+       obj = mdt_object_new(env, mdt, fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       spec->sp_cr_lookup = 0;
+       spec->sp_feat = &dt_directory_features;
+       spec->sp_cr_mode = MDL_MINMODE; /* no lock */
+       spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
+       if (attr->ma_valid & MA_LOV) {
+               spec->u.sp_ea.eadata = attr->ma_lmm;
+               spec->u.sp_ea.eadatalen = attr->ma_lmm_size;
+               spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+       } else {
+               spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+       }
+
+       lname = mdt_name(env, (char *)name, sizeof(name) - 1);
+       rc = mdo_create(env, mdt_object_child(local_root), lname,
+                       mdt_object_child(obj), spec, attr);
+       if (rc == 0) {
+               rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
+               if (rc < 0)
+                       CERROR("%s: cannot open volatile file "DFID", orphan "
+                              "file will be left in PENDING directory until "
+                              "next reboot, rc = %d\n", mdt_obd_name(mdt),
+                              PFID(fid), rc);
+       }
+       EXIT;
+
+out:
+       if (rc < 0) {
+               if (!IS_ERR(obj))
+                       mdt_object_put(env, obj);
+               obj = ERR_PTR(rc);
+       }
+       mdt_object_put(env, local_root);
+       return obj;
+}
+
+static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct md_attr *ma)
+{
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LAYOUT];
+       struct close_data      *data;
+       struct ldlm_lock       *lease;
+       struct mdt_object      *orphan;
+       struct md_attr         *orp_ma;
+       struct lu_buf          *buf;
+       bool                    lease_broken;
+       int                     rc;
+       int                     rc2;
+       ENTRY;
+
+       data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
+       if (data == NULL)
+               RETURN(-EPROTO);
+
+       lease = ldlm_handle2lock(&data->cd_handle);
+       if (lease == NULL)
+               RETURN(-ESTALE);
+
+       /* try to hold open_sem so that nobody else can open the file */
+       if (!down_write_trylock(&o->mot_open_sem)) {
+               ldlm_lock_cancel(lease);
+               ldlm_lock_put(lease);
+               RETURN(-EBUSY);
+       }
+
+       /* Check if the lease open lease has already canceled */
+       lock_res_and_lock(lease);
+       lease_broken = ldlm_is_cancel(lease);
+       unlock_res_and_lock(lease);
+
+       LDLM_DEBUG(lease, DFID " lease broken? %d\n",
+                  PFID(mdt_object_fid(o)), lease_broken);
+
+       /* Cancel server side lease. Client side counterpart should
+        * have been cancelled. It's okay to cancel it now as we've
+        * held mot_open_sem. */
+       ldlm_lock_cancel(lease);
+       ldlm_lock_put(lease);
+
+       if (lease_broken) /* don't perform release task */
+               GOTO(out_unlock, rc = -ESTALE);
+
+       if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
+               GOTO(out_unlock, rc = -EINVAL);
+
+       /* ma_need was set before but it seems fine to change it in order to
+        * avoid modifying the one from RPC */
+       ma->ma_need = MA_HSM | MA_LOV;
+       rc = mdt_attr_get_complex(info, o, ma);
+       if (rc != 0)
+               GOTO(out_unlock, rc);
+
+       if (!mdt_hsm_release_allow(ma))
+               GOTO(out_unlock, rc = -EPERM);
+
+       /* already released? */
+       if (ma->ma_hsm.mh_flags & HS_RELEASED)
+               GOTO(out_unlock, rc = 0);
+
+       /* Compare on-disk and packed data_version */
+       if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
+               CDEBUG(D_HSM, DFID" data_version mismatches: packed="LPU64
+                      " and on-disk="LPU64"\n", PFID(mdt_object_fid(o)),
+                      data->cd_data_version, ma->ma_hsm.mh_arch_ver);
+               /* XXX: Enable this line when hsm_archive is operational!
+               GOTO(out_unlock, rc = -EPERM);
+               */
+       }
+
+       ma->ma_valid = MA_INODE;
+       ma->ma_attr.la_valid &= LA_SIZE | LA_MTIME | LA_ATIME;
+       rc = mo_attr_set(info->mti_env, mdt_object_child(o), ma);
+       if (rc < 0)
+               GOTO(out_unlock, rc);
+
+       if (!(ma->ma_valid & MA_LOV)) {
+               /* Even empty file are released */
+               memset(ma->ma_lmm, 0, sizeof(*ma->ma_lmm));
+               ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+               ma->ma_lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+               ma->ma_lmm->lmm_stripe_size = cpu_to_le32(LOV_MIN_STRIPE_SIZE);
+               ma->ma_valid |= MA_LOV;
+       } else {
+               /* Magic must be LOV_MAGIC_Vx_DEF otherwise LOD will interpret
+                * ma_lmm as lov_user_md, then it will be confused by union of
+                * layout_gen and stripe_offset. */
+               if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1)
+                       ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+               else if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3)
+                       ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V3_DEF);
+               else
+                       GOTO(out_unlock, rc = -EINVAL);
+       }
+
+       /* Set file as released */
+       ma->ma_lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED);
+
+       /* Hopefully it's not used in this call path */
+       orp_ma = &info->mti_u.som.attr;
+       orp_ma->ma_valid = MA_INODE | MA_LOV;
+       orp_ma->ma_attr.la_mode = S_IFREG;
+       orp_ma->ma_attr.la_valid = LA_MODE;
+       orp_ma->ma_lmm = ma->ma_lmm;
+       orp_ma->ma_lmm_size = ma->ma_lmm_size;
+       orphan = mdt_orphan_open(info, info->mti_mdt, &data->cd_fid, orp_ma,
+                                FMODE_WRITE);
+       if (IS_ERR(orphan)) {
+               CERROR("%s: cannot open orphan file "DFID": rc = %ld\n",
+                      mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid),
+                      PTR_ERR(orphan));
+               GOTO(out_unlock, rc = PTR_ERR(orphan));
+       }
+
+       /* Set up HSM attribute for orphan object */
+       CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       buf = &info->mti_buf;
+       buf->lb_buf = info->mti_xattr_buf;
+       buf->lb_len = sizeof(struct hsm_attrs);
+       ma->ma_hsm.mh_flags |= HS_RELEASED;
+       lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
+       ma->ma_hsm.mh_flags &= ~HS_RELEASED;
+       rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
+                         XATTR_NAME_HSM, 0);
+       if (rc < 0)
+               GOTO(out_close, rc);
+
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT, MDT_LOCAL_LOCK);
+       if (rc == 0) {
+               /* Swap layout with orphan object */
+               rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
+                                    mdt_object_child(orphan),
+                                    SWAP_LAYOUTS_MDS_HSM);
+
+               /* Release exclusive LL */
+               mdt_object_unlock(info, o, lh, 1);
+       }
+       EXIT;
+
+out_close:
+       /* Close orphan object anyway */
+       rc2 = mo_close(info->mti_env, mdt_object_child(orphan), orp_ma,
+                      FMODE_WRITE);
+       if (rc2 < 0)
+               CERROR("%s: error closing volatile file "DFID": rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid), rc2);
+       LU_OBJECT_DEBUG(D_HSM, info->mti_env, &orphan->mot_obj,
+                       "object closed\n");
+       mdt_object_put(info->mti_env, orphan);
+
+out_unlock:
+       up_write(&o->mot_open_sem);
+
+       if (rc == 0) { /* already released */
+               struct mdt_body *repbody;
+               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+               LASSERT(repbody != NULL);
+               repbody->valid |= OBD_MD_FLRELEASED;
+       }
+
+       ma->ma_valid = 0;
+       ma->ma_need = 0;
+       return rc;
+}
+
 #define MFD_CLOSED(mode) (((mode) & ~(MDS_FMODE_EPOCH | MDS_FMODE_SOM | \
                                       MDS_FMODE_TRUNC)) == MDS_FMODE_CLOSED)
 
@@ -1863,6 +2130,16 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
 
         mode = mfd->mfd_mode;
 
+       if (ma->ma_attr_flags & MDS_HSM_RELEASE) {
+               rc = mdt_hsm_release(info, o, ma);
+               if (rc < 0) {
+                       CDEBUG(D_HSM, "%s: File " DFID " release failed: %d\n",
+                               mdt_obd_name(info->mti_mdt),
+                               PFID(mdt_object_fid(o)), rc);
+                       /* continue to close even error occurred. */
+               }
+       }
+
         if ((mode & FMODE_WRITE) || (mode & MDS_FMODE_TRUNC)) {
                 mdt_write_put(o);
                 ret = mdt_ioepoch_close(info, o);
index c16d05b..9358419 100644 (file)
@@ -149,6 +149,14 @@ static const struct req_msg_field *mdt_close_client[] = {
         &RMF_CAPA1
 };
 
+static const struct req_msg_field *mdt_release_close_client[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_MDT_EPOCH,
+       &RMF_REC_REINT,
+       &RMF_CAPA1,
+       &RMF_CLOSE_DATA
+};
+
 static const struct req_msg_field *obd_statfs_server[] = {
         &RMF_PTLRPC_BODY,
         &RMF_OBD_STATFS
@@ -670,6 +678,7 @@ static struct req_format *req_formats[] = {
         &RQF_MDS_GETXATTR,
         &RQF_MDS_SYNC,
         &RQF_MDS_CLOSE,
+       &RQF_MDS_RELEASE_CLOSE,
         &RQF_MDS_PIN,
         &RQF_MDS_UNPIN,
         &RQF_MDS_READPAGE,
@@ -889,6 +898,11 @@ struct req_msg_field RMF_PTLRPC_BODY =
                     sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body, NULL);
 EXPORT_SYMBOL(RMF_PTLRPC_BODY);
 
+struct req_msg_field RMF_CLOSE_DATA =
+       DEFINE_MSGF("data_version", 0,
+                   sizeof(struct close_data), lustre_swab_close_data, NULL);
+EXPORT_SYMBOL(RMF_CLOSE_DATA);
+
 struct req_msg_field RMF_OBD_STATFS =
         DEFINE_MSGF("obd_statfs", 0,
                     sizeof(struct obd_statfs), lustre_swab_obd_statfs, NULL);
@@ -1416,6 +1430,11 @@ struct req_format RQF_MDS_CLOSE =
                         mdt_close_client, mds_last_unlink_server);
 EXPORT_SYMBOL(RQF_MDS_CLOSE);
 
+struct req_format RQF_MDS_RELEASE_CLOSE =
+       DEFINE_REQ_FMT0("MDS_CLOSE",
+                       mdt_release_close_client, mds_last_unlink_server);
+EXPORT_SYMBOL(RQF_MDS_RELEASE_CLOSE);
+
 struct req_format RQF_MDS_PIN =
         DEFINE_REQ_FMT0("MDS_PIN",
                         mdt_body_capa, mdt_body_only);
index 09ecfd9..431fb90 100644 (file)
@@ -2580,3 +2580,10 @@ void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl)
        __swab64s(&msl->msl_flags);
 }
 EXPORT_SYMBOL(lustre_swab_swap_layouts);
+
+void lustre_swab_close_data(struct close_data *cd)
+{
+       lustre_swab_lu_fid(&cd->cd_fid);
+       __swab64s(&cd->cd_data_version);
+}
+EXPORT_SYMBOL(lustre_swab_close_data);
index 0cf3355..fa2845c 100644 (file)
@@ -24,6 +24,7 @@ ALWAYS_EXCEPT="$SANITY_HSM_EXCEPT"
 TMP=${TMP:-/tmp}
 
 ORIG_PWD=${PWD}
+MCREATE=${MCREATE:-mcreate}
 
 LUSTRE=${LUSTRE:-$(cd $(dirname $0)/..; echo $PWD)}
 . $LUSTRE/tests/test-framework.sh
@@ -50,6 +51,37 @@ assert_DIR
 
 build_test_filter
 
+# $RUNAS_ID may get set incorrectly somewhere else
+[ $UID -eq 0 -a $RUNAS_ID -eq 0 ] &&
+       error "\$RUNAS_ID set to 0, but \$UID is also 0!"
+
+check_runas_id $RUNAS_ID $RUNAS_GID $RUNAS
+
+copytool_cleanup() {
+       # TODO: add copytool cleanup code here!
+       return
+}
+
+copytool_setup() {
+       # TODO: add copytool setup code here!
+       return
+}
+
+fail() {
+       copytool_cleanup
+       error $*
+}
+
+path2fid() {
+       $LFS path2fid $1 | tr -d '[]'
+}
+
+make_small() {
+       local file2=${1/$DIR/$DIR}
+       dd if=/dev/urandom of=$file2 count=2 bs=1M
+               path2fid $1
+}
+
 test_1() {
        mkdir -p $DIR/$tdir
        chmod 777 $DIR/$tdir
@@ -208,6 +240,191 @@ test_3() {
 }
 run_test 3 "Check file dirtyness when opening for write"
 
+test_20() {
+       mkdir -p $DIR/$tdir
+
+       local f=$DIR/$tdir/sample
+       touch $f
+
+       # Could not release a non-archived file
+       $LFS hsm_release $f && error "release should not succeed"
+
+       # For following tests, we must test them with HS_ARCHIVED set
+       $LFS hsm_set --exists --archived $f || error "could not add flag"
+
+       # Could not release a file if no-release is set
+       $LFS hsm_set --norelease $f || error "could not add flag"
+       $LFS hsm_release $f && error "release should not succeed"
+       $LFS hsm_clear --norelease $f || error "could not remove flag"
+
+       # Could not release a file if lost
+       $LFS hsm_set --lost $f || error "could not add flag"
+       $LFS hsm_release $f && error "release should not succeed"
+       $LFS hsm_clear --lost $f || error "could not remove flag"
+
+       # Could not release a file if dirty
+       $LFS hsm_set --dirty $f || error "could not add flag"
+       $LFS hsm_release $f && error "release should not succeed"
+       $LFS hsm_clear --dirty $f || error "could not remove flag"
+
+}
+run_test 20 "Release is not permitted"
+
+test_21() {
+       # test needs a running copytool
+       copytool_setup
+
+       mkdir -p $DIR/$tdir
+       local f=$DIR/$tdir/test_release
+
+       # Create a file and check its states
+       local fid=$(make_small $f)
+       $LFS hsm_state $f | grep -q " (0x00000000)" ||
+               fail "wrong clean hsm state"
+
+#      $LFS hsm_archive $f || fail "could not archive file"
+#      wait_request_state $fid ARCHIVE SUCCEED
+       $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+       [ $(stat -c "%b" $f) -ne "0" ] || fail "wrong block number"
+       local sz=$(stat -c "%s" $f)
+       [ $sz -ne "0" ] || fail "file size should not be zero"
+
+       # Release and check states
+       $LFS hsm_release $f || fail "could not release file"
+       $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+               fail "wrong released hsm state"
+       [ $(stat -c "%b" $f) -eq "0" ] || fail "wrong block number"
+       [ $(stat -c "%s" $f) -eq $sz ] || fail "wrong file size"
+
+       # Check we can release an file without stripe info
+       f=$f.nolov
+       $MCREATE $f
+       fid=$(path2fid $f)
+       $LFS hsm_state $f | grep -q " (0x00000000)" ||
+               fail "wrong clean hsm state"
+
+#      $LFS hsm_archive $f || fail "could not archive file"
+#      wait_request_state $fid ARCHIVE SUCCEED
+       $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+       # Release and check states
+       $LFS hsm_release $f || fail "could not release file"
+       $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+               fail "wrong released hsm state"
+
+       # Release again a file that is already released is OK
+       $LFS hsm_release $f || fail "second release should succeed"
+       $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+               fail "wrong released hsm state"
+
+       copytool_cleanup
+}
+run_test 21 "Simple release tests"
+
+test_22() {
+       # test needs a running copytool
+       copytool_setup
+
+       mkdir -p $DIR/$tdir
+
+       local f=$DIR/$tdir/test_release
+       local swap=$DIR/$tdir/test_swap
+
+       # Create a file and check its states
+       local fid=$(make_small $f)
+       $LFS hsm_state $f | grep -q " (0x00000000)" ||
+               fail "wrong clean hsm state"
+
+#      $LFS hsm_archive $f || fail "could not archive file"
+#      wait_request_state $fid ARCHIVE SUCCEED
+       $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+       # Release and check states
+       $LFS hsm_release $f || fail "could not release file"
+       $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+               fail "wrong released hsm state"
+
+       make_small $swap || fail "could not create $swap"
+       $LFS swap_layouts $swap $f && fail "swap_layouts should failed"
+
+       true
+       copytool_cleanup
+}
+run_test 22 "Could not swap a release file"
+
+
+test_23() {
+       # test needs a running copytool
+       copytool_setup
+
+       mkdir -p $DIR/$tdir
+
+       local f=$DIR/$tdir/test_mtime
+
+       # Create a file and check its states
+       local fid=$(make_small $f)
+       $LFS hsm_state $f | grep -q " (0x00000000)"  ||
+               fail "wrong clean hsm state"
+
+#      $LFS hsm_archive $f || fail "could not archive file"
+#      wait_request_state $fid ARCHIVE SUCCEED
+       $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+       # Set modification time in the past
+       touch -m -a -d @978261179 $f
+
+       # Release and check states
+       $LFS hsm_release $f || fail "could not release file"
+       $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+               fail "wrong released hsm state"
+       local MTIME=$(stat -c "%Y" $f)
+       local ATIME=$(stat -c "%X" $f)
+       [ $MTIME -eq "978261179" ] || fail "bad mtime: $MTIME"
+       [ $ATIME -eq "978261179" ] || fail "bad atime: $ATIME"
+
+       copytool_cleanup
+}
+run_test 23 "Release does not change a/mtime (utime)"
+
+test_24() {
+       # test needs a running copytool
+       copytool_setup
+
+       mkdir -p $DIR/$tdir
+
+       local f=$DIR/$tdir/test_mtime
+
+       # Create a file and check its states
+       local fid=$(make_small $f)
+       $LFS hsm_state $f | grep -q " (0x00000000)" ||
+               fail "wrong clean hsm state"
+
+       # ensure mtime is different
+       sleep 1
+       echo "append" >> $f
+       local MTIME=$(stat -c "%Y" $f)
+       local ATIME=$(stat -c "%X" $f)
+
+#      $LFS hsm_archive $f || fail "could not archive file"
+#      wait_request_state $fid ARCHIVE SUCCEED
+       $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+       # Release and check states
+       $LFS hsm_release $f || fail "could not release file"
+       $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+               fail "wrong released hsm state"
+
+       [ "$(stat -c "%Y" $f)" -eq "$MTIME" ] ||
+               fail "mtime should be $MTIME"
+
+#      [ "$(stat -c "%X" $f)" -eq "$ATIME" ] ||
+#              fail "atime should be $ATIME"
+
+       copytool_cleanup
+}
+run_test 24 "Release does not change a/mtime (i/o)"
+
 log "cleanup: ======================================================"
 cd $ORIG_PWD
 check_and_cleanup_lustre
index c90aba5..ddf7d46 100644 (file)
@@ -93,6 +93,7 @@
 #define lustre_swab_hsm_request NULL
 #define lustre_swab_update_buf NULL
 #define lustre_swab_update_reply_buf NULL
+#define lustre_swab_close_data NULL
 
 #define dump_rniobuf NULL
 #define dump_ioo NULL