#define OBD_MD_FLRMTRGETFACL (0x0008000000000000ULL) /* lfs rgetfacl case */
#define OBD_MD_FLDATAVERSION (0x0010000000000000ULL) /* iversion sum */
+#define OBD_MD_FLRELEASED (0x0020000000000000ULL) /* file released */
#define OBD_MD_FLGETATTR (OBD_MD_FLID | OBD_MD_FLATIME | OBD_MD_FLMTIME | \
OBD_MD_FLCTIME | OBD_MD_FLSIZE | OBD_MD_FLBLKSZ | \
* delegation, succeed if it's not
* being opened with conflict mode.
*/
+#define MDS_OPEN_RELEASE 02000000000000ULL /* Open the file for HSM release */
/* permission for create non-directory file */
#define MAY_CREATE (1 << 7)
/* lfs rgetfacl permission check */
#define MAY_RGETFACL (1 << 14)
-enum {
+enum mds_op_bias {
MDS_CHECK_SPLIT = 1 << 0,
MDS_CROSS_REF = 1 << 1,
MDS_VTX_BYPASS = 1 << 2,
MDS_DATA_MODIFIED = 1 << 9,
MDS_CREATE_VOLATILE = 1 << 10,
MDS_OWNEROVERRIDE = 1 << 11,
+ MDS_HSM_RELEASE = 1 << 12,
};
/* instance of mdt_reint_rec */
void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl);
+struct close_data { /* extra close payload the client packs for an HSM release close */
+ struct lustre_handle cd_handle; /* copied from the lease lock's l_remote_handle */
+ struct lu_fid cd_fid; /* copied from op_fid2, the FID the client allocates for the volatile file */
+ __u64 cd_data_version; /* file data version sampled by the client before the release */
+ __u64 cd_reserved[8]; /* NOTE(review): presumably room for future fields - confirm */
+};
+
+void lustre_swab_close_data(struct close_data *data);
+
#endif
/** @} lustreidl */
};
/* swap layout flags */
-#define SWAP_LAYOUTS_CHECK_DV1 (1 << 0)
-#define SWAP_LAYOUTS_CHECK_DV2 (1 << 1)
-#define SWAP_LAYOUTS_KEEP_MTIME (1 << 2)
-#define SWAP_LAYOUTS_KEEP_ATIME (1 << 3)
+#define SWAP_LAYOUTS_CHECK_DV1 (1 << 0)
+#define SWAP_LAYOUTS_CHECK_DV2 (1 << 1)
+#define SWAP_LAYOUTS_KEEP_MTIME (1 << 2)
+#define SWAP_LAYOUTS_KEEP_ATIME (1 << 3)
+
+/* Swap XATTR_NAME_HSM as well, only on the MDT so far */
+#define SWAP_LAYOUTS_MDS_HSM (1 << 31)
struct lustre_swap_layouts {
__u64 sl_flags;
__u32 sl_fd;
*/
extern struct req_format RQF_MDS_GETATTR_NAME;
extern struct req_format RQF_MDS_CLOSE;
+extern struct req_format RQF_MDS_RELEASE_CLOSE;
extern struct req_format RQF_MDS_PIN;
extern struct req_format RQF_MDS_UNPIN;
extern struct req_format RQF_MDS_CONNECT;
extern struct req_msg_field RMF_GETINFO_VALLEN;
extern struct req_msg_field RMF_GETINFO_KEY;
extern struct req_msg_field RMF_IDX_INFO;
+extern struct req_msg_field RMF_CLOSE_DATA;
/*
* connection handle received in MDS_CONNECT request.
#ifdef __KERNEL__
unsigned int op_attr_flags;
#endif
- __u64 op_valid;
- loff_t op_attr_blocks;
+ __u64 op_valid;
+ loff_t op_attr_blocks;
- /* Size-on-MDS epoch and flags. */
- __u64 op_ioepoch;
+ /* Size-on-MDS epoch and flags. */
+ __u64 op_ioepoch;
__u32 op_flags;
- /* Capa fields */
- struct obd_capa *op_capa1;
- struct obd_capa *op_capa2;
+ /* Capa fields */
+ struct obd_capa *op_capa1;
+ struct obd_capa *op_capa2;
- /* Various operation flags. */
- __u32 op_bias;
+ /* Various operation flags. */
+ enum mds_op_bias op_bias;
- /* Operation type */
+ /* Operation type */
__u32 op_opc;
- /* Used by readdir */
+ /* Used by readdir */
__u64 op_offset;
/* Used by readdir */
/* used to transfer info between the stacks of MD client
* see enum op_cli_flags */
__u32 op_cli_flags;
+
+ /* File object data version for HSM release, on client */
+ __u64 op_data_version;
+ struct lustre_handle op_lease_handle;
};
enum op_cli_flags {
io->ci_obj = obj;
io->ci_ignore_layout = 1;
- rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
- if (rc) {
- LASSERT(rc < 0);
- cl_env_put(env, &refcheck);
- return rc;
+ rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+ if (rc) {
+ /* Does not make sense to take GL for released layout */
+ if (rc > 0)
+ rc = -ENOTSUPP;
+ cl_env_put(env, &refcheck);
+ return rc;
}
descr = &ccc_env_info(env)->cti_descr;
RETURN(-EFAULT);
}
- rc = obd_iocontrol(cmd, ll_i2mdexp(inode), totalsize,
- hur, NULL);
+ if (hur->hur_request.hr_action == HUA_RELEASE) {
+ const struct lu_fid *fid;
+ struct inode *f;
+ int i;
+
+ for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
+ fid = &hur->hur_user_item[i].hui_fid;
+ f = search_inode_for_lustre(inode->i_sb, fid);
+ if (IS_ERR(f)) {
+ rc = PTR_ERR(f);
+ break;
+ }
+
+ rc = ll_hsm_release(f);
+ iput(f);
+ if (rc != 0)
+ break;
+ }
+ } else {
+ rc = obd_iocontrol(cmd, ll_i2mdexp(inode), totalsize,
+ hur, NULL);
+ }
OBD_FREE_LARGE(hur, totalsize);
}
static int ll_close_inode_openhandle(struct obd_export *md_exp,
- struct inode *inode,
- struct obd_client_handle *och)
+ struct inode *inode,
+ struct obd_client_handle *och,
+ const __u64 *data_version)
{
struct obd_export *exp = ll_i2mdexp(inode);
struct md_op_data *op_data;
if (op_data == NULL)
GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
- ll_prepare_close(inode, op_data, och);
+ ll_prepare_close(inode, op_data, och);
+ if (data_version != NULL) {
+ /* Passing in a data_version implies an HSM release close. */
+ op_data->op_bias |= MDS_HSM_RELEASE;
+ op_data->op_data_version = *data_version;
+ op_data->op_lease_handle = och->och_lease_handle;
+ op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
+ }
epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
rc = md_close(md_exp, op_data, och->och_mod, &req);
if (rc == -EAGAIN) {
spin_unlock(&lli->lli_lock);
}
- ll_finish_md_op_data(op_data);
-
if (rc == 0) {
rc = ll_objects_destroy(req, inode);
if (rc)
inode->i_ino, rc);
}
+ if (rc == 0 && op_data->op_bias & MDS_HSM_RELEASE) {
+ struct mdt_body *body;
+ body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+ if (!(body->valid & OBD_MD_FLRELEASED))
+ rc = -EBUSY;
+ }
+
+ ll_finish_md_op_data(op_data);
EXIT;
out:
if (och) { /* There might be a race and somebody have freed this och
already */
rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
- inode, och);
+ inode, och, NULL);
}
RETURN(rc);
}
if (fd->fd_och != NULL) {
- rc = ll_close_inode_openhandle(md_exp, inode, fd->fd_och);
+ rc = ll_close_inode_openhandle(md_exp, inode, fd->fd_och, NULL);
fd->fd_och = NULL;
GOTO(out, rc);
}
* Acquire a lease and open the file.
*/
struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file,
- fmode_t fmode)
+ fmode_t fmode, __u64 open_flags)
{
struct lookup_intent it = { .it_op = IT_OPEN };
struct ll_sb_info *sbi = ll_i2sbi(inode);
/* To tell the MDT this openhandle is from the same owner */
op_data->op_handle = old_handle;
- it.it_flags = fmode | MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
+ it.it_flags = fmode | open_flags;
+ it.it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, &req,
ll_md_blocking_lease_ast,
/* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise
RETURN(och);
out_close:
- rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, inode, och);
+ rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, inode, och, NULL);
if (rc2)
CERROR("Close openhandle returned %d\n", rc2);
if (lease_broken != NULL)
*lease_broken = cancelled;
- rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och);
+ rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och,
+ NULL);
RETURN(rc);
}
EXPORT_SYMBOL(ll_lease_close);
ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
- inode, och);
- out:
- /* this one is in place of ll_file_open */
- if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
- ptlrpc_req_finished(it->d.lustre.it_data);
- it_clear_disposition(it, DISP_ENQ_OPEN_REF);
- }
- RETURN(rc);
+ inode, och, NULL);
+out:
+ /* this one is in place of ll_file_open */
+ if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
+ ptlrpc_req_finished(it->d.lustre.it_data);
+ it_clear_disposition(it, DISP_ENQ_OPEN_REF);
+ }
+ RETURN(rc);
}
/**
RETURN(rc);
}
+/**
+ * Trigger a HSM release request for the provided inode.
+ *
+ * A write lease is opened with MDS_OPEN_RELEASE, the data version is fetched
+ * and ll_merge_lvb() is run, then the open handle is closed with the data
+ * version attached, which marks the close as a release.
+ *
+ * \param[in] inode	inode of the file to release
+ *
+ * \retval 0		the release close succeeded
+ * \retval negative	error from the lease open, the data version fetch,
+ *			the env allocation or the close
+ */
+int ll_hsm_release(struct inode *inode)
+{
+	struct obd_client_handle *lease = NULL;
+	struct cl_env_nest env_nest;
+	struct lu_env *cl_env;
+	__u64 dv = 0;
+	int rc;
+	ENTRY;
+
+	CDEBUG(D_INODE, "%s: Releasing file "DFID".\n",
+	       ll_get_fsname(inode->i_sb, NULL, 0),
+	       PFID(&ll_i2info(inode)->lli_fid));
+
+	/* Take a write lease flagged as a release open. */
+	lease = ll_lease_open(inode, NULL, FMODE_WRITE, MDS_OPEN_RELEASE);
+	if (IS_ERR(lease)) {
+		rc = PTR_ERR(lease);
+		GOTO(out, rc);
+	}
+
+	/* Grab latest data_version and [am]time values */
+	rc = ll_data_version(inode, &dv, 1);
+	if (rc != 0)
+		GOTO(out, rc);
+
+	cl_env = cl_env_nested_get(&env_nest);
+	if (IS_ERR(cl_env)) {
+		rc = PTR_ERR(cl_env);
+		GOTO(out, rc);
+	}
+
+	ll_merge_lvb(cl_env, inode);
+	cl_env_nested_put(&env_nest, cl_env);
+
+	/* Release the file.
+	 * NB: lease lock handle is released in mdc_hsm_release_pack() because
+	 * we still need it to pack l_remote_handle to MDT. */
+	rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode,
+				       lease, &dv);
+	/* the handle went to the close above; do not close it again below */
+	lease = NULL;
+
+	EXIT;
+out:
+	/* an earlier step failed while the lease was still held: close it */
+	if (!(lease == NULL || IS_ERR(lease)))
+		ll_lease_close(lease, inode, NULL);
+
+	return rc;
+}
+
struct ll_swap_stack {
struct iattr ia1, ia2;
__u64 dv1, dv2;
CDEBUG(D_INODE, "Set lease with mode %d\n", mode);
/* apply for lease */
- och = ll_lease_open(inode, file, mode);
+ och = ll_lease_open(inode, file, mode, 0);
if (IS_ERR(och))
RETURN(PTR_ERR(och));
int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
int ll_fid2path(struct inode *inode, void *arg);
int ll_data_version(struct inode *inode, __u64 *data_version, int extent_lock);
+int ll_hsm_release(struct inode *inode);
struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file,
- fmode_t mode);
+ fmode_t mode, __u64 flags);
int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
bool *lease_broken);
lli->lli_layout_gen,
conf->u.coc_md->lsm->lsm_layout_gen);
- lli->lli_has_smd = true;
+ lli->lli_has_smd = lsm_has_objects(conf->u.coc_md->lsm);
lli->lli_layout_gen = conf->u.coc_md->lsm->lsm_layout_gen;
} else {
CDEBUG(D_VFSTRACE, "layout lock destroyed: %u.\n",
struct lod_object *mo,
const struct lu_buf *buf)
{
- struct lod_device *d = lu2lod_dev(lod2lu_obj(mo)->lo_dev);
struct lov_mds_md_v1 *v1 = buf->lb_buf;
struct lov_mds_md_v3 *v3 = buf->lb_buf;
struct lov_ost_data_v1 *objs;
__u32 magic;
- int rc;
+ int rc = 0;
ENTRY;
- rc = lod_verify_striping(d, buf, 1);
- if (rc)
- RETURN(rc);
-
magic = le32_to_cpu(v1->lmm_magic);
if (magic == LOV_MAGIC_V1_DEF) {
objs = &v1->lmm_objects[0];
mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
LASSERT(buf->lb_len >= lov_mds_md_size(mo->ldo_stripenr, magic));
- rc = lod_initialize_objects(env, mo, objs);
+ /* fixup for released file before object initialization */
+ if (mo->ldo_pattern & LOV_PATTERN_F_RELEASED) {
+ mo->ldo_released_stripenr = mo->ldo_stripenr;
+ mo->ldo_stripenr = 0;
+ }
+
+ if (mo->ldo_stripenr > 0)
+ rc = lod_initialize_objects(env, mo, objs);
out:
RETURN(rc);
LLT_NR
};
+/**
+ * Map a lov layout type to a printable name (used in debug messages).
+ * LLT_NR, or any value that matches no case, trips LBUG().
+ */
+static inline char *llt2str(enum lov_layout_type llt)
+{
+	char *name = NULL;
+
+	switch (llt) {
+	case LLT_EMPTY:
+		name = "EMPTY";
+		break;
+	case LLT_RAID0:
+		name = "RAID0";
+		break;
+	case LLT_RELEASED:
+		name = "RELEASED";
+		break;
+	case LLT_NR:
+		/* a count, not a layout type */
+		LBUG();
+		break;
+	}
+
+	if (name == NULL) {
+		/* no case produced a name */
+		LBUG();
+		name = "";
+	}
+	return name;
+}
+
/**
* lov-specific file state.
*
}
static int lov_print_raid0(const struct lu_env *env, void *cookie,
- lu_printer_t p, const struct lu_object *o)
+ lu_printer_t p, const struct lu_object *o)
{
- struct lov_object *lov = lu2lov(o);
- struct lov_layout_raid0 *r0 = lov_r0(lov);
- struct lov_stripe_md *lsm = lov->lo_lsm;
- int i;
+ struct lov_object *lov = lu2lov(o);
+ struct lov_layout_raid0 *r0 = lov_r0(lov);
+ struct lov_stripe_md *lsm = lov->lo_lsm;
+ int i;
- (*p)(env, cookie, "stripes: %d, %svalid, lsm{%p 0x%08X %d %u %u}: \n",
- r0->lo_nr, lov->lo_layout_invalid ? "in" : "", lsm,
+ (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
+ r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
lsm->lsm_stripe_count, lsm->lsm_layout_gen);
- for (i = 0; i < r0->lo_nr; ++i) {
- struct lu_object *sub;
-
- if (r0->lo_sub[i] != NULL) {
- sub = lovsub2lu(r0->lo_sub[i]);
- lu_object_print(env, cookie, p, sub);
- } else
- (*p)(env, cookie, "sub %d absent\n", i);
- }
- return 0;
+ for (i = 0; i < r0->lo_nr; ++i) {
+ struct lu_object *sub;
+
+ if (r0->lo_sub[i] != NULL) {
+ sub = lovsub2lu(r0->lo_sub[i]);
+ lu_object_print(env, cookie, p, sub);
+ } else {
+ (*p)(env, cookie, "sub %d absent\n", i);
+ }
+ }
+ return 0;
}
static int lov_print_released(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o)
{
- (*p)(env, cookie, "released\n");
+ struct lov_object *lov = lu2lov(o);
+ struct lov_stripe_md *lsm = lov->lo_lsm;
+
+ (*p)(env, cookie,
+ "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
+ lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+ lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
+ lsm->lsm_stripe_count, lsm->lsm_layout_gen);
return 0;
}
}
static int lov_layout_change(const struct lu_env *unused,
- struct lov_object *lov,
- const struct cl_object_conf *conf)
+ struct lov_object *lov,
+ const struct cl_object_conf *conf)
{
int result;
enum lov_layout_type llt = LLT_EMPTY;
RETURN(PTR_ERR(env));
}
+ CDEBUG(D_INODE, DFID" from %s to %s\n",
+ PFID(lu_object_fid(lov2lu(lov))),
+ llt2str(lov->lo_type), llt2str(llt));
+
old_ops = &lov_dispatch[lov->lo_type];
new_ops = &lov_dispatch[llt];
if (conf->u.coc_md != NULL)
lsm = conf->u.coc_md->lsm;
if ((lsm == NULL && lov->lo_lsm == NULL) ||
- (lsm != NULL && lov->lo_lsm != NULL &&
- lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
+ ((lsm != NULL && lov->lo_lsm != NULL) &&
+ (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
+ (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
/* same version of layout */
lov->lo_layout_invalid = false;
GOTO(out, result = 0);
out:
lov_conf_unlock(lov);
+ CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
+ PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
RETURN(result);
}
__u64 cr_flags = (flags & (FMODE_READ | FMODE_WRITE |
MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS |
MDS_OPEN_OWNEROVERRIDE | MDS_OPEN_LOCK |
- MDS_OPEN_BY_FID | MDS_OPEN_LEASE));
+ MDS_OPEN_BY_FID | MDS_OPEN_LEASE |
+ MDS_OPEN_RELEASE));
if (flags & O_CREAT)
cr_flags |= MDS_OPEN_CREAT;
if (flags & O_EXCL)
}
}
+/**
+ * Fill the RMF_CLOSE_DATA field of a close request when the close carries an
+ * HSM release (MDS_HSM_RELEASE set in op_bias); do nothing otherwise.
+ *
+ * \param[in] req	close request being packed
+ * \param[in] op_data	operation data holding the lease handle, the data
+ *			version and op_fid2
+ */
+static void mdc_hsm_release_pack(struct ptlrpc_request *req,
+				 struct md_op_data *op_data)
+{
+	struct close_data *cdata;
+	struct ldlm_lock *lease_lock;
+
+	if (!(op_data->op_bias & MDS_HSM_RELEASE))
+		return;
+
+	cdata = req_capsule_client_get(&req->rq_pill, &RMF_CLOSE_DATA);
+	LASSERT(cdata != NULL);
+
+	/* pack the lease lock's remote handle if the lock can still be
+	 * looked up, then cancel the lease handle with LCF_LOCAL */
+	lease_lock = ldlm_handle2lock(&op_data->op_lease_handle);
+	if (lease_lock != NULL) {
+		cdata->cd_handle = lease_lock->l_remote_handle;
+		ldlm_lock_put(lease_lock);
+	}
+	ldlm_cli_cancel(&op_data->op_lease_handle, LCF_LOCAL);
+
+	cdata->cd_data_version = op_data->op_data_version;
+	cdata->cd_fid = op_data->op_fid2;
+}
+
void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
{
struct mdt_ioepoch *epoch;
mdc_setattr_pack_rec(rec, op_data);
mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
mdc_ioepoch_pack(epoch, op_data);
+ mdc_hsm_release_pack(req, op_data);
}
static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
struct md_open_data *mod, struct ptlrpc_request **request)
{
- struct obd_device *obd = class_exp2obd(exp);
- struct ptlrpc_request *req;
- int rc;
- ENTRY;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct ptlrpc_request *req;
+ struct req_format *req_fmt;
+ int rc;
+ int saved_rc = 0;
+ ENTRY;
- *request = NULL;
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_CLOSE);
+ req_fmt = &RQF_MDS_CLOSE;
+ if (op_data->op_bias & MDS_HSM_RELEASE) {
+ req_fmt = &RQF_MDS_RELEASE_CLOSE;
+
+ /* allocate a FID for volatile file */
+ rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
+ if (rc < 0) {
+ CERROR("%s: "DFID" failed to allocate FID: %d\n",
+ obd->obd_name, PFID(&op_data->op_fid1), rc);
+ /* save the errcode and proceed to close */
+ saved_rc = rc;
+ }
+ }
+
+ *request = NULL;
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt);
if (req == NULL)
RETURN(-ENOMEM);
}
*request = req;
mdc_close_handle_reply(req, op_data, rc);
- RETURN(rc);
+ RETURN(rc < 0 ? rc : saved_rc);
}
int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
struct lu_dirent mti_ent;
char mti_key[NAME_MAX + 16];
struct obd_trans_info mti_oti;
- struct lu_buf mti_buf;
+ struct lu_buf mti_buf[4];
struct lu_buf mti_big_buf; /* biggish persistent buf */
struct lu_buf mti_link_buf; /* buf for link ea */
struct lu_name mti_name;
struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
{
- struct lu_buf *buf;
+ struct lu_buf *buf;
- buf = &mdd_env_info(env)->mti_buf;
- buf->lb_buf = area;
- buf->lb_len = len;
- return buf;
+ buf = &mdd_env_info(env)->mti_buf[0]; /* always slot 0 of the env's mti_buf[]; mdd_swap_layouts() uses the same slot directly */
+ buf->lb_buf = area; /* wraps the caller's memory, nothing is copied */
+ buf->lb_len = len;
+ return buf; /* points into the env info, so the next mdd_buf_get*() call on this env overwrites it */
}
const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
const void *area, ssize_t len)
{
- struct lu_buf *buf;
+ struct lu_buf *buf;
- buf = &mdd_env_info(env)->mti_buf;
- buf->lb_buf = (void *)area;
- buf->lb_len = len;
- return buf;
+ buf = &mdd_env_info(env)->mti_buf[0]; /* same slot 0 that mdd_buf_get() hands out */
+ buf->lb_buf = (void *)area; /* const is cast away to store the pointer; presumably lb_buf is a plain void * - confirm */
+ buf->lb_len = len;
+ return buf; /* returned as const struct lu_buf * */
}
struct lu_object *mdd_object_alloc(const struct lu_env *env,
struct mdd_thread_info *info = mdd_env_info(env);
struct mdd_device *mdd = mdo2mdd(obj);
struct mdd_object *mdd_obj = md2mdd_obj(obj);
- struct lu_buf *current_buf = &info->mti_buf;
+ struct lu_buf *current_buf;
struct md_hsm *current_mh;
struct md_hsm *new_mh;
int rc;
RETURN(-ENOMEM);
/* Read HSM attrs from disk */
- current_buf->lb_buf = info->mti_xattr_buf;
- current_buf->lb_len = sizeof(info->mti_xattr_buf);
CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+ current_buf = mdd_buf_get(env, info->mti_xattr_buf,
+ sizeof(info->mti_xattr_buf));
rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
mdd_object_capa(env, mdd_obj));
- rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
+ rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
if (rc < 0 && rc != -ENODATA)
GOTO(free, rc);
else if (rc == -ENODATA)
return(rc);
}
-
/**
* The caller should guarantee to update the object ctime
* after xattr_set if needed.
* read lov EA of an object
* return the lov EA in an allocated lu_buf
*/
-static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
- struct mdd_object *obj)
+static int mdd_get_lov_ea(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct lu_buf *lmm_buf)
{
struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
- struct lu_buf *lmm_buf = NULL;
int rc, sz;
ENTRY;
goto repeat;
}
- OBD_ALLOC_PTR(lmm_buf);
- if (!lmm_buf)
+ lu_buf_alloc(lmm_buf, sz);
+ if (lmm_buf->lb_buf == NULL)
GOTO(out, rc = -ENOMEM);
- OBD_ALLOC(lmm_buf->lb_buf, sz);
- if (!lmm_buf->lb_buf)
- GOTO(free, rc = -ENOMEM);
-
memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
- lmm_buf->lb_len = sz;
-
- GOTO(out, rc = 0);
+ rc = 0;
+ EXIT;
-free:
- if (lmm_buf)
- OBD_FREE_PTR(lmm_buf);
out:
- if (rc)
- return ERR_PTR(rc);
- return lmm_buf;
+ if (rc < 0)
+ lu_buf_free(lmm_buf);
+ return rc;
}
+/**
+ * Replace the HSM xattr of \a o with the content of \a buf. If the new
+ * attributes carry HS_RELEASED and the object is not dead, also store a
+ * CL_HSM changelog record with the HE_RELEASE event.
+ *
+ * \param[in] env	execution environment
+ * \param[in] o		object whose XATTR_NAME_HSM is replaced
+ * \param[in] buf	buffer holding the new struct hsm_attrs
+ * \param[in] handle	transaction handle
+ *
+ * \retval 0		success
+ * \retval non-zero	error from the xattr set or the changelog store
+ */
+static int mdd_xattr_hsm_replace(const struct lu_env *env,
+				 struct mdd_object *o, struct lu_buf *buf,
+				 struct thandle *handle)
+{
+	struct hsm_attrs *new_attrs;
+	int cl_flags = 0;
+	int rc;
+	ENTRY;
+
+	rc = mdo_xattr_set(env, o, buf, XATTR_NAME_HSM, LU_XATTR_REPLACE,
+			   handle, mdd_object_capa(env, o));
+	if (rc != 0)
+		RETURN(rc);
+
+	/* no record unless the new flags include HS_RELEASED ... */
+	new_attrs = buf->lb_buf;
+	if (!(le32_to_cpu(new_attrs->hsm_flags) & HS_RELEASED))
+		RETURN(0);
+
+	/* ... and the object is still alive */
+	if (mdd_is_dead_obj(o))
+		RETURN(0);
+
+	/* Add a changelog record for release. */
+	hsm_set_cl_event(&cl_flags, HE_RELEASE);
+	rc = mdd_changelog_data_store(env, mdo2mdd(&o->mod_obj), CL_HSM,
+				      cl_flags, o, handle);
+	RETURN(rc);
+}
/*
* check if layout swapping between 2 objects is allowed
static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
struct md_object *obj2, __u64 flags)
{
- struct mdd_object *o1, *o2, *fst_o, *snd_o;
- struct lu_buf *lmm1_buf = NULL, *lmm2_buf = NULL;
- struct lu_buf *fst_buf, *snd_buf;
- struct lov_mds_md *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
- struct thandle *handle;
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct mdd_object *fst_o = md2mdd_obj(obj1);
+ struct mdd_object *snd_o = md2mdd_obj(obj2);
struct mdd_device *mdd = mdo2mdd(obj1);
- int rc;
+ struct lov_mds_md *fst_lmm, *snd_lmm;
+ struct lu_buf *fst_buf = &info->mti_buf[0];
+ struct lu_buf *snd_buf = &info->mti_buf[1];
+ struct lu_buf *fst_hsm_buf = &info->mti_buf[2];
+ struct lu_buf *snd_hsm_buf = &info->mti_buf[3];
+ struct ost_id *saved_oi = NULL;
+ struct thandle *handle;
__u16 fst_gen, snd_gen;
int fst_fl;
+ int rc;
+ int rc2;
ENTRY;
+ CLASSERT(ARRAY_SIZE(info->mti_buf) >= 4);
+ memset(info->mti_buf, 0, sizeof(info->mti_buf));
+
/* we have to sort the 2 obj, so locking will always
* be in the same order, even in case of 2 concurrent swaps */
- rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
- mdo2fid(md2mdd_obj(obj2)));
- /* same fid ? */
- if (rc == 0)
+ rc = lu_fid_cmp(mdo2fid(fst_o), mdo2fid(snd_o));
+ if (rc == 0) /* same fid ? */
RETURN(-EPERM);
- if (rc > 0) {
- o1 = md2mdd_obj(obj1);
- o2 = md2mdd_obj(obj2);
- } else {
- o1 = md2mdd_obj(obj2);
- o2 = md2mdd_obj(obj1);
- }
+ if (rc < 0)
+ swap(fst_o, snd_o);
/* check if layout swapping is allowed */
- rc = mdd_layout_swap_allowed(env, o1, o2);
- if (rc)
+ rc = mdd_layout_swap_allowed(env, fst_o, snd_o);
+ if (rc != 0)
RETURN(rc);
handle = mdd_trans_create(env, mdd);
RETURN(PTR_ERR(handle));
/* objects are already sorted */
- mdd_write_lock(env, o1, MOR_TGT_CHILD);
- mdd_write_lock(env, o2, MOR_TGT_CHILD);
-
- lmm1_buf = mdd_get_lov_ea(env, o1);
- if (IS_ERR(lmm1_buf)) {
- rc = PTR_ERR(lmm1_buf);
- lmm1_buf = NULL;
- if (rc != -ENODATA)
- GOTO(stop, rc);
- }
+ mdd_write_lock(env, fst_o, MOR_TGT_CHILD);
+ mdd_write_lock(env, snd_o, MOR_TGT_CHILD);
- lmm2_buf = mdd_get_lov_ea(env, o2);
- if (IS_ERR(lmm2_buf)) {
- rc = PTR_ERR(lmm2_buf);
- lmm2_buf = NULL;
- if (rc != -ENODATA)
- GOTO(stop, rc);
- }
+ rc = mdd_get_lov_ea(env, fst_o, fst_buf);
+ if (rc < 0 && rc != -ENODATA)
+ GOTO(stop, rc);
+
+ rc = mdd_get_lov_ea(env, snd_o, snd_buf);
+ if (rc < 0 && rc != -ENODATA)
+ GOTO(stop, rc);
/* swapping 2 non existant layouts is a success */
- if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
+ if (fst_buf->lb_buf == NULL && snd_buf->lb_buf == NULL)
GOTO(stop, rc = 0);
/* to help inode migration between MDT, it is better to
* start by the no layout file (if one), so we order the swap */
- if (lmm1_buf == NULL) {
- fst_o = o1;
- fst_buf = lmm1_buf;
- snd_o = o2;
- snd_buf = lmm2_buf;
- } else {
- fst_o = o2;
- fst_buf = lmm2_buf;
- snd_o = o1;
- snd_buf = lmm1_buf;
+ if (snd_buf->lb_buf == NULL) {
+ swap(fst_o, snd_o);
+ swap(fst_buf, snd_buf);
}
/* lmm and generation layout initialization */
- if (fst_buf) {
+ if (fst_buf->lb_buf != NULL) {
fst_lmm = fst_buf->lb_buf;
fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
fst_fl = LU_XATTR_REPLACE;
fst_fl = LU_XATTR_CREATE;
}
- LASSERT(snd_buf != NULL);
+ LASSERT(snd_buf->lb_buf != NULL);
snd_lmm = snd_buf->lb_buf;
snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
fst_gen++;
/* set the file specific informations in lmm */
- if (fst_lmm) {
- /* save the orignal lmm common header of first file
- * to be able to roll back */
- OBD_ALLOC_PTR(old_fst_lmm);
- if (old_fst_lmm == NULL)
- GOTO(stop, rc = -ENOMEM);
- *old_fst_lmm = *fst_lmm;
+ if (fst_lmm != NULL) {
+ saved_oi = &info->mti_oa.o_oi;
+ *saved_oi = fst_lmm->lmm_oi;
fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
fst_lmm->lmm_oi = snd_lmm->lmm_oi;
-
- snd_lmm->lmm_oi = old_fst_lmm->lmm_oi;
+ snd_lmm->lmm_oi = *saved_oi;
} else {
if (snd_lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1))
snd_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
else
GOTO(stop, rc = -EPROTO);
}
-
snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
+ /* Prepare HSM attribute if it's required */
+ if (flags & SWAP_LAYOUTS_MDS_HSM) {
+ const int buflen = sizeof(struct hsm_attrs);
+
+ lu_buf_alloc(fst_hsm_buf, buflen);
+ lu_buf_alloc(snd_hsm_buf, buflen);
+ if (fst_hsm_buf->lb_buf == NULL || snd_hsm_buf->lb_buf == NULL)
+ GOTO(stop, rc = -ENOMEM);
+
+ /* Read HSM attribute */
+ rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM,
+ BYPASS_CAPA);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM,
+ BYPASS_CAPA);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_hsm_buf,
+ XATTR_NAME_HSM, LU_XATTR_REPLACE,
+ handle);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_hsm_buf,
+ XATTR_NAME_HSM, LU_XATTR_REPLACE,
+ handle);
+ if (rc < 0)
+ GOTO(stop, rc);
+ }
+
/* prepare transaction */
rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
fst_fl, handle);
- if (rc)
+ if (rc != 0)
GOTO(stop, rc);
- if (fst_buf)
+ if (fst_buf->lb_buf != NULL)
rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf,
XATTR_NAME_LOV, LU_XATTR_REPLACE,
handle);
else
rc = mdd_declare_xattr_del(env, mdd, snd_o, XATTR_NAME_LOV,
handle);
- if (rc)
+ if (rc != 0)
GOTO(stop, rc);
rc = mdd_trans_start(env, mdd, handle);
- if (rc)
+ if (rc != 0)
GOTO(stop, rc);
+ if (flags & SWAP_LAYOUTS_MDS_HSM) {
+ rc = mdd_xattr_hsm_replace(env, fst_o, snd_hsm_buf, handle);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ rc = mdd_xattr_hsm_replace(env, snd_o, fst_hsm_buf, handle);
+ if (rc < 0) {
+ rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf,
+ handle);
+ if (rc2 < 0)
+ CERROR("%s: restore "DFID" HSM error: %d/%d\n",
+ mdd_obj_dev_name(fst_o),
+ PFID(mdo2fid(fst_o)), rc, rc2);
+ GOTO(stop, rc);
+ }
+ }
+
rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle,
mdd_object_capa(env, fst_o));
- if (rc)
+ if (rc != 0)
GOTO(stop, rc);
- if (fst_buf)
+ if (fst_buf->lb_buf != NULL)
rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
LU_XATTR_REPLACE, handle,
mdd_object_capa(env, snd_o));
else
rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle,
mdd_object_capa(env, snd_o));
- if (rc) {
- int rc2;
+ if (rc != 0) {
+ int steps = 0;
/* failure on second file, but first was done, so we have
- * to roll back first */
- /* restore object_id, object_seq and generation number
- * on first file */
- if (fst_lmm) {
- LASSERT(old_fst_lmm != NULL);
- fst_lmm->lmm_oi = old_fst_lmm->lmm_oi;
- fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
+ * to roll back first. */
+ if (fst_buf->lb_buf != NULL) {
+ fst_lmm->lmm_oi = *saved_oi;
+ fst_lmm->lmm_layout_gen = cpu_to_le16(fst_gen - 1);
rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
LU_XATTR_REPLACE, handle,
mdd_object_capa(env, fst_o));
rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle,
mdd_object_capa(env, fst_o));
}
+		if (rc2 < 0)
+			goto do_lbug;
+
+		/* The HSM xattrs are only read and swapped when
+		 * SWAP_LAYOUTS_MDS_HSM is set. Without it the HSM buffers
+		 * are still the zeroed slots from the top of the function
+		 * and must not be written back (no xattr set was declared
+		 * for them and their lb_buf is NULL). */
+		if (flags & SWAP_LAYOUTS_MDS_HSM) {
+			++steps;
+			rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf,
+						    handle);
+			if (rc2 < 0)
+				goto do_lbug;
- if (rc2) {
+			++steps;
+			rc2 = mdd_xattr_hsm_replace(env, snd_o, snd_hsm_buf,
+						    handle);
+		}
+
+	do_lbug:
+		if (rc2 < 0) {
/* very bad day */
- CERROR("%s: unable to roll back after swap layouts"
- " failure between "DFID" and "DFID
- " rc2 = %d rc = %d)\n",
- mdd2obd_dev(mdd)->obd_name,
+			/* rc is the original failure, rc2 the rollback
+			 * failure, steps how many rollback stages passed */
+			CERROR("%s: unable to roll back layout swap. FIDs: "
+			       DFID" and "DFID", error: %d/%d, steps: %d\n",
+			       mdd_obj_dev_name(fst_o),
PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
- rc2, rc);
+			       rc, rc2, steps);
/* a solution to avoid journal commit is to panic,
* but it has strong consequences so we use LBUG to
* allow sysdamin to choose to panic or not
stop:
mdd_trans_stop(env, mdd, rc, handle);
- mdd_write_unlock(env, o2);
- mdd_write_unlock(env, o1);
-
- if (lmm1_buf && lmm1_buf->lb_buf)
- OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
- if (lmm1_buf)
- OBD_FREE_PTR(lmm1_buf);
-
- if (lmm2_buf && lmm2_buf->lb_buf)
- OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
- if (lmm2_buf)
- OBD_FREE_PTR(lmm2_buf);
-
- if (old_fst_lmm)
- OBD_FREE_PTR(old_fst_lmm);
+ mdd_write_unlock(env, snd_o);
+ mdd_write_unlock(env, fst_o);
+ lu_buf_free(fst_buf);
+ lu_buf_free(snd_buf);
+ lu_buf_free(fst_hsm_buf);
+ lu_buf_free(snd_hsm_buf);
return rc;
}
sema_init(&cdt->cdt_llog_lock, 1);
init_rwsem(&cdt->cdt_agent_lock);
init_rwsem(&cdt->cdt_request_lock);
+ sema_init(&cdt->cdt_restore_lock, 1);
CFS_INIT_LIST_HEAD(&cdt->cdt_requests);
CFS_INIT_LIST_HEAD(&cdt->cdt_agents);
else
ma->ma_attr_flags &= ~MDS_DATA_MODIFIED;
- if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
- mdt_set_capainfo(info, 0, rr->rr_fid1,
- req_capsule_client_get(pill, &RMF_CAPA1));
+ if (rec->sa_bias & MDS_HSM_RELEASE)
+ ma->ma_attr_flags |= MDS_HSM_RELEASE;
+ else
+ ma->ma_attr_flags &= ~MDS_HSM_RELEASE;
- RETURN(0);
+ if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
+ mdt_set_capainfo(info, 0, rr->rr_fid1,
+ req_capsule_client_get(pill, &RMF_CAPA1));
+
+ RETURN(0);
}
static int mdt_ioepoch_unpack(struct mdt_thread_info *info)
RETURN(rc);
}
+/**
+ * For a close flagged MDS_HSM_RELEASE, extend the request capsule to the
+ * RQF_MDS_RELEASE_CLOSE format and check that the client sent close data.
+ *
+ * \param[in] info	thread info holding the unpacked attributes and pill
+ *
+ * \retval 0		not a release close, or RMF_CLOSE_DATA is present
+ * \retval -EFAULT	release close without an RMF_CLOSE_DATA field
+ */
+static int mdt_hsm_release_unpack(struct mdt_thread_info *info)
+{
+	struct req_capsule *capsule = info->mti_pill;
+	ENTRY;
+
+	if ((info->mti_attr.ma_attr_flags & MDS_HSM_RELEASE) == 0)
+		RETURN(0);
+
+	req_capsule_extend(capsule, &RQF_MDS_RELEASE_CLOSE);
+
+	/* the format must define the field and the client must have sent it */
+	if (!req_capsule_has_field(capsule, &RMF_CLOSE_DATA, RCL_CLIENT) ||
+	    !req_capsule_field_present(capsule, &RMF_CLOSE_DATA, RCL_CLIENT))
+		RETURN(-EFAULT);
+
+	RETURN(0);
+}
+
int mdt_close_unpack(struct mdt_thread_info *info)
{
int rc;
rc = mdt_setattr_unpack_rec(info);
if (rc)
RETURN(rc);
+
+ rc = mdt_hsm_release_unpack(info);
+ if (rc)
+ RETURN(rc);
+
RETURN(mdt_init_ucred_reint(info));
}
{
LASSERT(mfd != NULL);
- CDEBUG(D_HA, DFID "Change mfd mode 0x%Lx->0x%Lx\n",
+ CDEBUG(D_HA, DFID " Change mfd mode "LPO64" -> "LPO64".\n",
PFID(mdt_object_fid(mfd->mfd_object)), mfd->mfd_mode, mode);
mfd->mfd_mode = mode;
isreg = S_ISREG(la->la_mode);
isdir = S_ISDIR(la->la_mode);
- if (isreg && !(ma->ma_valid & MA_LOV)) {
+ if (isreg && !(ma->ma_valid & MA_LOV) && !(flags & MDS_OPEN_RELEASE)) {
/*
* No EA, check whether it is will set regEA and dirEA since in
* above attr get, these size might be zero, so reset it, to
RETURN_EXIT;
}
+/**
+ * Check release is permitted for the current HSM flags.
+ *
+ * The HSM attributes must be valid (MA_HSM), HS_ARCHIVED must be set, and
+ * none of HS_DIRTY, HS_NORELEASE or HS_LOST may be set.
+ *
+ * \param[in] ma	attributes fetched with MA_HSM requested
+ *
+ * \retval true		the file may be released
+ * \retval false	otherwise
+ */
+static bool mdt_hsm_release_allow(struct md_attr *ma)
+{
+	return (ma->ma_valid & MA_HSM) &&
+	       !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_NORELEASE|HS_LOST)) &&
+	       (ma->ma_hsm.mh_flags & HS_ARCHIVED);
+}
+
int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
struct mdt_lock_handle *lhc)
{
mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
+ if (flags & MDS_OPEN_RELEASE)
+ ma->ma_need |= MA_HSM;
rc = mdt_attr_get_complex(info, o, ma);
- if (rc)
- GOTO(out, rc);
+ if (rc)
+ GOTO(out, rc);
+
+ /* If a release request, check file flags are fine and ask for an
+ * exclusive open access. */
+ if (flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
+ GOTO(out, rc = -EPERM);
rc = mdt_object_open_lock(info, o, lhc, &ibits);
if (rc)
result = rc;
/* openlock will be released if mdt_finish_open failed */
mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+
+ if (created && create_flags & MDS_OPEN_VOLATILE) {
+ CERROR("%s: cannot open volatile file "DFID", orphan "
+ "file will be left in PENDING directory until "
+ "next reboot, rc = %d\n", mdt_obd_name(mdt),
+ PFID(mdt_object_fid(child)), rc);
+ GOTO(out_child_unlock, result);
+ }
+
if (created) {
ma->ma_need = 0;
ma->ma_valid = 0;
return result;
}
+/**
+ * Create and open an orphan (volatile) object under the local root.
+ *
+ * The local root FID is obtained with dt_root_get() on mdt->mdt_bottom. A
+ * new object is set up for \a fid and created with mdo_create() under that
+ * root, using the fixed name "i_am_nobody" and MDS_OPEN_VOLATILE | \a fmode
+ * as create flags. When \a attr has MA_LOV set, attr->ma_lmm is handed over
+ * as the layout EA (MDS_OPEN_HAS_EA); otherwise MDS_OPEN_DELAY_CREATE is
+ * requested. After a successful create the object is opened with mo_open().
+ *
+ * The reference taken on the local root is dropped before returning.
+ *
+ * \retval the new object on success; its reference is not dropped here
+ * \retval ERR_PTR(rc) on failure; when the object had been obtained its
+ * reference is dropped first
+ */
+static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
+ struct mdt_device *mdt,
+ const struct lu_fid *fid,
+ struct md_attr *attr, fmode_t fmode)
+{
+ const struct lu_env *env = info->mti_env;
+ struct md_op_spec *spec = &info->mti_spec;
+ struct lu_fid *rootfid = &info->mti_tmp_fid1;
+ struct mdt_object *obj = NULL;
+ struct mdt_object *local_root;
+ static const char name[] = "i_am_nobody";
+ struct lu_name *lname;
+ int rc;
+ ENTRY;
+
+ rc = dt_root_get(env, mdt->mdt_bottom, rootfid);
+ if (rc != 0)
+ RETURN(ERR_PTR(rc));
+
+ local_root = mdt_object_find(env, mdt, rootfid);
+ if (IS_ERR(local_root))
+ RETURN(local_root);
+
+ obj = mdt_object_new(env, mdt, fid);
+ if (IS_ERR(obj))
+ GOTO(out, rc = PTR_ERR(obj));
+
+ spec->sp_cr_lookup = 0;
+ spec->sp_feat = &dt_directory_features;
+ spec->sp_cr_mode = MDL_MINMODE; /* no lock */
+ spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
+ if (attr->ma_valid & MA_LOV) {
+ spec->u.sp_ea.eadata = attr->ma_lmm;
+ spec->u.sp_ea.eadatalen = attr->ma_lmm_size;
+ spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+ } else {
+ spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+ }
+
+ lname = mdt_name(env, (char *)name, sizeof(name) - 1);
+ rc = mdo_create(env, mdt_object_child(local_root), lname,
+ mdt_object_child(obj), spec, attr);
+ if (rc == 0) {
+ rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
+ if (rc < 0)
+ CERROR("%s: cannot open volatile file "DFID", orphan "
+ "file will be left in PENDING directory until "
+ "next reboot, rc = %d\n", mdt_obd_name(mdt),
+ PFID(fid), rc);
+ }
+ EXIT;
+
+out:
+ if (rc < 0) {
+ if (!IS_ERR(obj))
+ mdt_object_put(env, obj);
+ obj = ERR_PTR(rc);
+ }
+ mdt_object_put(env, local_root);
+ return obj;
+}
+
+/**
+ * Release the data of an archived file as part of a close (HSM release).
+ *
+ * The client sends a struct close_data (RMF_CLOSE_DATA) holding the handle
+ * of its open lease, the FID to use for a volatile orphan object and the
+ * data version it observed. With mot_open_sem held for write the lease is
+ * cancelled. If the lease was still intact and the HSM flags permit it, an
+ * orphan is created with a layout flagged LOV_PATTERN_F_RELEASED and an HSM
+ * xattr carrying HS_RELEASED, and its layout is swapped with that of \a o
+ * under an exclusive layout lock. The orphan is closed and put afterwards.
+ *
+ * \param[in] info	thread info; supplies the request capsule and buffers
+ * \param[in] o	object being closed and released
+ * \param[in] ma	attributes unpacked from the close RPC; once
+ *			mot_open_sem has been taken, ma_need and ma_valid
+ *			are both reset to 0 before returning
+ *
+ * \retval 0		released now or already released; OBD_MD_FLRELEASED
+ *			is set in the reply body
+ * \retval -EPROTO	the request carries no close data
+ * \retval -ESTALE	unknown lease handle, or the lease was already broken
+ * \retval -EBUSY	mot_open_sem could not be taken without blocking
+ * \retval -EINVAL	bad orphan FID or unsupported layout magic
+ * \retval -EPERM	the HSM flags do not permit a release
+ * \retval negative	any other error reported by the lower layers
+ */
+static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
+			   struct md_attr *ma)
+{
+	struct mdt_lock_handle	*lh = &info->mti_lh[MDT_LH_LAYOUT];
+	struct close_data	*data;
+	struct ldlm_lock	*lease;
+	struct mdt_object	*orphan;
+	struct md_attr		*orp_ma;
+	struct lu_buf		*buf;
+	bool			 lease_broken;
+	bool			 has_lov;
+	int			 rc;
+	int			 rc2;
+	ENTRY;
+
+	data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
+	if (data == NULL)
+		RETURN(-EPROTO);
+
+	lease = ldlm_handle2lock(&data->cd_handle);
+	if (lease == NULL)
+		RETURN(-ESTALE);
+
+	/* try to hold open_sem so that nobody else can open the file */
+	if (!down_write_trylock(&o->mot_open_sem)) {
+		ldlm_lock_cancel(lease);
+		ldlm_lock_put(lease);
+		RETURN(-EBUSY);
+	}
+
+	/* Check whether the open lease has already been cancelled */
+	lock_res_and_lock(lease);
+	lease_broken = ldlm_is_cancel(lease);
+	unlock_res_and_lock(lease);
+
+	LDLM_DEBUG(lease, DFID " lease broken? %d\n",
+		   PFID(mdt_object_fid(o)), lease_broken);
+
+	/* Cancel server side lease. Client side counterpart should
+	 * have been cancelled. It's okay to cancel it now as we've
+	 * held mot_open_sem. */
+	ldlm_lock_cancel(lease);
+	ldlm_lock_put(lease);
+
+	if (lease_broken) /* don't perform release task */
+		GOTO(out_unlock, rc = -ESTALE);
+
+	if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
+		GOTO(out_unlock, rc = -EINVAL);
+
+	/* ma_need was set before but it seems fine to change it in order to
+	 * avoid modifying the one from RPC */
+	ma->ma_need = MA_HSM | MA_LOV;
+	rc = mdt_attr_get_complex(info, o, ma);
+	if (rc != 0)
+		GOTO(out_unlock, rc);
+
+	if (!mdt_hsm_release_allow(ma))
+		GOTO(out_unlock, rc = -EPERM);
+
+	/* already released? */
+	if (ma->ma_hsm.mh_flags & HS_RELEASED)
+		GOTO(out_unlock, rc = 0);
+
+	/* Compare on-disk and packed data_version */
+	if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
+		CDEBUG(D_HSM, DFID" data_version mismatches: packed="LPU64
+		       " and on-disk="LPU64"\n", PFID(mdt_object_fid(o)),
+		       data->cd_data_version, ma->ma_hsm.mh_arch_ver);
+		/* XXX: Enable this line when hsm_archive is operational!
+		GOTO(out_unlock, rc = -EPERM);
+		*/
+	}
+
+	/* ma_valid is narrowed to MA_INODE for the mo_attr_set() call below,
+	 * which wipes MA_LOV. Remember first whether a layout was fetched,
+	 * otherwise the layout test further down would always take the
+	 * "no layout" branch and overwrite an existing layout header. */
+	has_lov = (ma->ma_valid & MA_LOV) != 0;
+
+	ma->ma_valid = MA_INODE;
+	ma->ma_attr.la_valid &= LA_SIZE | LA_MTIME | LA_ATIME;
+	rc = mo_attr_set(info->mti_env, mdt_object_child(o), ma);
+	if (rc < 0)
+		GOTO(out_unlock, rc);
+
+	if (!has_lov) {
+		/* Even empty file are released */
+		memset(ma->ma_lmm, 0, sizeof(*ma->ma_lmm));
+		ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+		ma->ma_lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+		ma->ma_lmm->lmm_stripe_size = cpu_to_le32(LOV_MIN_STRIPE_SIZE);
+		ma->ma_valid |= MA_LOV;
+	} else {
+		/* Magic must be LOV_MAGIC_Vx_DEF otherwise LOD will interpret
+		 * ma_lmm as lov_user_md, then it will be confused by union of
+		 * layout_gen and stripe_offset. */
+		if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1)
+			ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+		else if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3)
+			ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V3_DEF);
+		else
+			GOTO(out_unlock, rc = -EINVAL);
+	}
+
+	/* Set file as released */
+	ma->ma_lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED);
+
+	/* Hopefully it's not used in this call path */
+	orp_ma = &info->mti_u.som.attr;
+	orp_ma->ma_valid = MA_INODE | MA_LOV;
+	orp_ma->ma_attr.la_mode = S_IFREG;
+	orp_ma->ma_attr.la_valid = LA_MODE;
+	orp_ma->ma_lmm = ma->ma_lmm;
+	orp_ma->ma_lmm_size = ma->ma_lmm_size;
+	orphan = mdt_orphan_open(info, info->mti_mdt, &data->cd_fid, orp_ma,
+				 FMODE_WRITE);
+	if (IS_ERR(orphan)) {
+		CERROR("%s: cannot open orphan file "DFID": rc = %ld\n",
+		       mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid),
+		       PTR_ERR(orphan));
+		GOTO(out_unlock, rc = PTR_ERR(orphan));
+	}
+
+	/* Set up HSM attribute for orphan object */
+	CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+	buf = &info->mti_buf;
+	buf->lb_buf = info->mti_xattr_buf;
+	buf->lb_len = sizeof(struct hsm_attrs);
+	/* HS_RELEASED is only wanted in the packed copy written to the
+	 * orphan, so it is cleared again from the in-memory flags. */
+	ma->ma_hsm.mh_flags |= HS_RELEASED;
+	lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
+	ma->ma_hsm.mh_flags &= ~HS_RELEASED;
+	rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
+			  XATTR_NAME_HSM, 0);
+	if (rc < 0)
+		GOTO(out_close, rc);
+
+	mdt_lock_reg_init(lh, LCK_EX);
+	rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT, MDT_LOCAL_LOCK);
+	if (rc == 0) {
+		/* Swap layout with orphan object */
+		rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
+				     mdt_object_child(orphan),
+				     SWAP_LAYOUTS_MDS_HSM);
+
+		/* Release exclusive LL */
+		mdt_object_unlock(info, o, lh, 1);
+	}
+	EXIT;
+
+out_close:
+	/* Close orphan object anyway */
+	rc2 = mo_close(info->mti_env, mdt_object_child(orphan), orp_ma,
+		       FMODE_WRITE);
+	if (rc2 < 0)
+		CERROR("%s: error closing volatile file "DFID": rc = %d\n",
+		       mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid), rc2);
+	LU_OBJECT_DEBUG(D_HSM, info->mti_env, &orphan->mot_obj,
+			"object closed\n");
+	mdt_object_put(info->mti_env, orphan);
+
+out_unlock:
+	up_write(&o->mot_open_sem);
+
+	if (rc == 0) { /* released just now, or already released before */
+		struct mdt_body *repbody;
+		repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+		LASSERT(repbody != NULL);
+		repbody->valid |= OBD_MD_FLRELEASED;
+	}
+
+	ma->ma_valid = 0;
+	ma->ma_need = 0;
+	return rc;
+}
+
#define MFD_CLOSED(mode) (((mode) & ~(MDS_FMODE_EPOCH | MDS_FMODE_SOM | \
MDS_FMODE_TRUNC)) == MDS_FMODE_CLOSED)
mode = mfd->mfd_mode;
+ if (ma->ma_attr_flags & MDS_HSM_RELEASE) {
+ rc = mdt_hsm_release(info, o, ma);
+ if (rc < 0) {
+ CDEBUG(D_HSM, "%s: File " DFID " release failed: %d\n",
+ mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(o)), rc);
+ /* continue to close even error occurred. */
+ }
+ }
+
if ((mode & FMODE_WRITE) || (mode & MDS_FMODE_TRUNC)) {
mdt_write_put(o);
ret = mdt_ioepoch_close(info, o);
&RMF_CAPA1
};
+/* Client-side fields of the release-close request (RQF_MDS_RELEASE_CLOSE);
+ * the last buffer carries the struct close_data (RMF_CLOSE_DATA). */
+static const struct req_msg_field *mdt_release_close_client[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_MDT_EPOCH,
+ &RMF_REC_REINT,
+ &RMF_CAPA1,
+ &RMF_CLOSE_DATA
+};
+
static const struct req_msg_field *obd_statfs_server[] = {
&RMF_PTLRPC_BODY,
&RMF_OBD_STATFS
&RQF_MDS_GETXATTR,
&RQF_MDS_SYNC,
&RQF_MDS_CLOSE,
+ &RQF_MDS_RELEASE_CLOSE,
&RQF_MDS_PIN,
&RQF_MDS_UNPIN,
&RQF_MDS_READPAGE,
sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body, NULL);
EXPORT_SYMBOL(RMF_PTLRPC_BODY);
+/* Message field holding one struct close_data, byte-swapped with
+ * lustre_swab_close_data(). The field is registered under the name
+ * "data_version". */
+struct req_msg_field RMF_CLOSE_DATA =
+ DEFINE_MSGF("data_version", 0,
+ sizeof(struct close_data), lustre_swab_close_data, NULL);
+EXPORT_SYMBOL(RMF_CLOSE_DATA);
+
struct req_msg_field RMF_OBD_STATFS =
DEFINE_MSGF("obd_statfs", 0,
sizeof(struct obd_statfs), lustre_swab_obd_statfs, NULL);
mdt_close_client, mds_last_unlink_server);
EXPORT_SYMBOL(RQF_MDS_CLOSE);
+/* Request format of a close that also asks for an HSM release: client
+ * fields come from mdt_release_close_client, server fields from
+ * mds_last_unlink_server.
+ * NOTE(review): the format name string is "MDS_CLOSE" -- presumably shared
+ * with the plain close format on purpose; confirm. */
+struct req_format RQF_MDS_RELEASE_CLOSE =
+ DEFINE_REQ_FMT0("MDS_CLOSE",
+ mdt_release_close_client, mds_last_unlink_server);
+EXPORT_SYMBOL(RQF_MDS_RELEASE_CLOSE);
+
struct req_format RQF_MDS_PIN =
DEFINE_REQ_FMT0("MDS_PIN",
mdt_body_capa, mdt_body_only);
__swab64s(&msl->msl_flags);
}
EXPORT_SYMBOL(lustre_swab_swap_layouts);
+
+/* Byte-swap a struct close_data in place: the FID and the data version are
+ * converted.
+ * NOTE(review): cd_handle and cd_reserved are left untouched -- presumably
+ * the handle is an opaque cookie and the reserved words are unused; confirm. */
+void lustre_swab_close_data(struct close_data *cd)
+{
+ lustre_swab_lu_fid(&cd->cd_fid);
+ __swab64s(&cd->cd_data_version);
+}
+EXPORT_SYMBOL(lustre_swab_close_data);
TMP=${TMP:-/tmp}
ORIG_PWD=${PWD}
+MCREATE=${MCREATE:-mcreate}
LUSTRE=${LUSTRE:-$(cd $(dirname $0)/..; echo $PWD)}
. $LUSTRE/tests/test-framework.sh
build_test_filter
+# $RUNAS_ID may get set incorrectly somewhere else
+[ $UID -eq 0 -a $RUNAS_ID -eq 0 ] &&
+ error "\$RUNAS_ID set to 0, but \$UID is also 0!"
+
+check_runas_id $RUNAS_ID $RUNAS_GID $RUNAS
+
+# Placeholder for stopping the copytool; currently does nothing.
+copytool_cleanup() {
+ # TODO: add copytool cleanup code here!
+ return
+}
+
+# Placeholder for starting the copytool; currently does nothing.
+copytool_setup() {
+ # TODO: add copytool setup code here!
+ return
+}
+
+# Run the copytool cleanup, then report a test failure through error().
+# Every argument is forwarded to error() exactly as given, so messages are
+# neither word-split nor glob-expanded on the way.
+fail() {
+	copytool_cleanup
+	error "$@"
+}
+
+# Print the FID of the file $1 with the surrounding brackets stripped.
+# The path is quoted so that names containing blanks are passed as one word.
+path2fid() {
+	$LFS path2fid "$1" | tr -d '[]'
+}
+
+# Write 2 MiB of random data to the file $1 and print its FID on stdout.
+# Returns the dd status if the write fails, otherwise the path2fid status.
+make_small() {
+	# NOTE(review): this replaces $DIR by itself, so file2 equals $1;
+	# confirm whether a different mount point was meant here.
+	local file2=${1/$DIR/$DIR}
+	dd if=/dev/urandom of="$file2" count=2 bs=1M || return $?
+	path2fid "$1"
+}
+
test_1() {
mkdir -p $DIR/$tdir
chmod 777 $DIR/$tdir
}
run_test 3 "Check file dirtyness when opening for write"
+# Check that hsm_release is refused for a file that is not archived, and for
+# an archived file flagged norelease, lost or dirty.
+test_20() {
+ mkdir -p $DIR/$tdir
+
+ local f=$DIR/$tdir/sample
+ touch $f
+
+ # A non-archived file must not be releasable
+ $LFS hsm_release $f && error "release should not succeed"
+
+ # The following checks need HS_ARCHIVED to be set
+ $LFS hsm_set --exists --archived $f || error "could not add flag"
+
+ # Release must fail while no-release is set
+ $LFS hsm_set --norelease $f || error "could not add flag"
+ $LFS hsm_release $f && error "release should not succeed"
+ $LFS hsm_clear --norelease $f || error "could not remove flag"
+
+ # Release must fail while the file is marked lost
+ $LFS hsm_set --lost $f || error "could not add flag"
+ $LFS hsm_release $f && error "release should not succeed"
+ $LFS hsm_clear --lost $f || error "could not remove flag"
+
+ # Release must fail while the file is marked dirty
+ $LFS hsm_set --dirty $f || error "could not add flag"
+ $LFS hsm_release $f && error "release should not succeed"
+ $LFS hsm_clear --dirty $f || error "could not remove flag"
+
+}
+run_test 20 "Release is not permitted"
+
+# Release a file with data and a file created with $MCREATE, checking the
+# HSM state, block count and size, then release the second file again.
+test_21() {
+ # test needs a running copytool
+ copytool_setup
+
+ mkdir -p $DIR/$tdir
+ local f=$DIR/$tdir/test_release
+
+ # Create a file and check its states
+ local fid=$(make_small $f)
+ $LFS hsm_state $f | grep -q " (0x00000000)" ||
+ fail "wrong clean hsm state"
+
+# $LFS hsm_archive $f || fail "could not archive file"
+# wait_request_state $fid ARCHIVE SUCCEED
+ $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+ [ $(stat -c "%b" $f) -ne "0" ] || fail "wrong block number"
+ local sz=$(stat -c "%s" $f)
+ [ $sz -ne "0" ] || fail "file size should not be zero"
+
+ # Release and check states: no blocks left, size unchanged
+ $LFS hsm_release $f || fail "could not release file"
+ $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+ fail "wrong released hsm state"
+ [ $(stat -c "%b" $f) -eq "0" ] || fail "wrong block number"
+ [ $(stat -c "%s" $f) -eq $sz ] || fail "wrong file size"
+
+ # Check we can release a file without stripe info
+ f=$f.nolov
+ $MCREATE $f
+ fid=$(path2fid $f)
+ $LFS hsm_state $f | grep -q " (0x00000000)" ||
+ fail "wrong clean hsm state"
+
+# $LFS hsm_archive $f || fail "could not archive file"
+# wait_request_state $fid ARCHIVE SUCCEED
+ $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+ # Release and check states
+ $LFS hsm_release $f || fail "could not release file"
+ $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+ fail "wrong released hsm state"
+
+ # Releasing a file that is already released must succeed
+ $LFS hsm_release $f || fail "second release should succeed"
+ $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+ fail "wrong released hsm state"
+
+ copytool_cleanup
+}
+run_test 21 "Simple release tests"
+
+# Check that swap_layouts is refused when one of the files is released.
+test_22() {
+ # test needs a running copytool
+ copytool_setup
+
+ mkdir -p $DIR/$tdir
+
+ local f=$DIR/$tdir/test_release
+ local swap=$DIR/$tdir/test_swap
+
+ # Create a file and check its states
+ local fid=$(make_small $f)
+ $LFS hsm_state $f | grep -q " (0x00000000)" ||
+ fail "wrong clean hsm state"
+
+# $LFS hsm_archive $f || fail "could not archive file"
+# wait_request_state $fid ARCHIVE SUCCEED
+ $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+ # Release and check states
+ $LFS hsm_release $f || fail "could not release file"
+ $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+ fail "wrong released hsm state"
+
+ make_small $swap || fail "could not create $swap"
+ $LFS swap_layouts $swap $f && fail "swap_layouts should failed"
+
+ # the expected swap_layouts failure leaves a non-zero status behind
+ true
+ copytool_cleanup
+}
+run_test 22 "Could not swap a release file"
+
+
+# Check that a release keeps the atime and mtime that were set explicitly
+# with touch beforehand.
+test_23() {
+ # test needs a running copytool
+ copytool_setup
+
+ mkdir -p $DIR/$tdir
+
+ local f=$DIR/$tdir/test_mtime
+
+ # Create a file and check its states
+ local fid=$(make_small $f)
+ $LFS hsm_state $f | grep -q " (0x00000000)" ||
+ fail "wrong clean hsm state"
+
+# $LFS hsm_archive $f || fail "could not archive file"
+# wait_request_state $fid ARCHIVE SUCCEED
+ $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+ # Set access and modification times in the past
+ touch -m -a -d @978261179 $f
+
+ # Release and check states
+ $LFS hsm_release $f || fail "could not release file"
+ $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+ fail "wrong released hsm state"
+ local MTIME=$(stat -c "%Y" $f)
+ local ATIME=$(stat -c "%X" $f)
+ [ $MTIME -eq "978261179" ] || fail "bad mtime: $MTIME"
+ [ $ATIME -eq "978261179" ] || fail "bad atime: $ATIME"
+
+ copytool_cleanup
+}
+run_test 23 "Release does not change a/mtime (utime)"
+
+# Check that a release keeps the mtime produced by regular I/O on the file.
+# The matching atime check is currently commented out.
+test_24() {
+ # test needs a running copytool
+ copytool_setup
+
+ mkdir -p $DIR/$tdir
+
+ local f=$DIR/$tdir/test_mtime
+
+ # Create a file and check its states
+ local fid=$(make_small $f)
+ $LFS hsm_state $f | grep -q " (0x00000000)" ||
+ fail "wrong clean hsm state"
+
+ # ensure mtime is different
+ sleep 1
+ echo "append" >> $f
+ local MTIME=$(stat -c "%Y" $f)
+ local ATIME=$(stat -c "%X" $f)
+
+# $LFS hsm_archive $f || fail "could not archive file"
+# wait_request_state $fid ARCHIVE SUCCEED
+ $LFS hsm_set --archived --exist $f || fail "could not archive file"
+
+ # Release and check states
+ $LFS hsm_release $f || fail "could not release file"
+ $LFS hsm_state $f | grep -q " (0x0000000d)" ||
+ fail "wrong released hsm state"
+
+ [ "$(stat -c "%Y" $f)" -eq "$MTIME" ] ||
+ fail "mtime should be $MTIME"
+
+# [ "$(stat -c "%X" $f)" -eq "$ATIME" ] ||
+# fail "atime should be $ATIME"
+
+ copytool_cleanup
+}
+run_test 24 "Release does not change a/mtime (i/o)"
+
log "cleanup: ======================================================"
cd $ORIG_PWD
check_and_cleanup_lustre
#define lustre_swab_hsm_request NULL
#define lustre_swab_update_buf NULL
#define lustre_swab_update_reply_buf NULL
+#define lustre_swab_close_data NULL
#define dump_rniobuf NULL
#define dump_ioo NULL