#define DEBUG_SUBSYSTEM S_MDS
#include <linux/module.h>
+#ifdef HAVE_EXT4_LDISKFS
+#include <ldiskfs/ldiskfs_jbd2.h>
+#else
#include <linux/jbd.h>
+#endif
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_fid.h>
#include <lustre_param.h>
+#ifdef HAVE_EXT4_LDISKFS
+#include <ldiskfs/ldiskfs.h>
+#else
#include <linux/ldiskfs_fs.h>
+#endif
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>
else
OBD_FREE(buf->lb_buf, buf->lb_len);
buf->lb_buf = NULL;
+ buf->lb_len = 0;
}
const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
}
static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
- const struct lu_object_conf *_)
+ const struct lu_object_conf *unused)
{
struct mdd_device *d = lu2mdd_dev(o->lo_dev);
struct mdd_object *mdd_obj = lu2mdd_obj(o);
{
struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
- "valid=%x, cltime=%llu, flags=%lx",
+ "valid=%x, cltime="LPU64", flags=%lx)",
mdd, mdd->mod_count, mdd->mod_valid,
mdd->mod_cltime, mdd->mod_flags);
}
/* Verify that our path hasn't changed since we started the lookup.
Record the current index, and verify the path resolves to the
same fid. If it does, then the path is correct as of this index. */
- spin_lock(&mdd->mdd_cl.mc_lock);
+ cfs_spin_lock(&mdd->mdd_cl.mc_lock);
pli->pli_currec = mdd->mdd_cl.mc_index;
- spin_unlock(&mdd->mdd_cl.mc_lock);
+ cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
if (rc) {
CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
LASSERT(ldesc != NULL);
+ LASSERT(size != NULL);
- if (!lmm)
+ if (!lmm) {
+ *size = 0;
RETURN(0);
+ }
lmm->lmm_magic = LOV_MAGIC_V1;
- lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
+ lmm->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
lmm->lmm_pattern = ldesc->ld_pattern;
lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
XATTR_NAME_LOV);
-
- if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
+ if (rc == 0 && ma->ma_need & MA_LOV_DEF)
rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
- &ma->ma_lmm_size);
- }
-
+ &ma->ma_lmm_size);
if (rc > 0) {
ma->ma_valid |= MA_LOV;
rc = 0;
RETURN(rc);
}
+static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
+ struct md_attr *ma)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lustre_mdt_attrs *lma =
+ (struct lustre_mdt_attrs *)info->mti_xattr_buf;
+ int lma_size;
+ int rc;
+ ENTRY;
+
+ /* If all needed data are already valid, nothing to do */
+ if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
+ (ma->ma_need & (MA_HSM | MA_SOM)))
+ RETURN(0);
+
+ /* Read LMA from disk EA */
+ lma_size = sizeof(info->mti_xattr_buf);
+ rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
+ if (rc <= 0)
+ RETURN(rc);
+
+ /* Useless to check LMA incompatibility because this is already done in
+ * osd_ea_fid_get(), and this will fail long before this code is
+ * called.
+ * So, if we are here, LMA is compatible.
+ */
+
+ lustre_lma_swab(lma);
+
+ /* Swab and copy LMA */
+ if (ma->ma_need & MA_HSM) {
+ if (lma->lma_compat & LMAC_HSM)
+ ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
+ else
+ ma->ma_hsm_flags = 0;
+ ma->ma_valid |= MA_HSM;
+ }
+
+ /* Copy SOM */
+ if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
+ LASSERT(ma->ma_som != NULL);
+ ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
+ ma->ma_som->msd_size = lma->lma_som_size;
+ ma->ma_som->msd_blocks = lma->lma_som_blocks;
+ ma->ma_som->msd_mountid = lma->lma_som_mountid;
+ ma->ma_valid |= MA_SOM;
+ }
+
+ RETURN(0);
+}
+
static int mdd_attr_get_internal(const struct lu_env *env,
struct mdd_object *mdd_obj,
struct md_attr *ma)
if (S_ISDIR(mdd_object_type(mdd_obj)))
rc = __mdd_lmv_get(env, mdd_obj, ma);
}
+ if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
+ if (S_ISREG(mdd_object_type(mdd_obj)))
+ rc = __mdd_lma_get(env, mdd_obj, ma);
+ }
#ifdef CONFIG_FS_POSIX_ACL
if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
if (S_ISDIR(mdd_object_type(mdd_obj)))
struct mdd_object *mdd_obj, struct md_attr *ma)
{
int rc;
- int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
+ int needlock = ma->ma_need &
+ (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
if (needlock)
mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
struct lu_attr *la, const struct md_attr *ma)
{
struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
- struct md_ucred *uc = md_ucred(env);
+ struct md_ucred *uc;
int rc;
ENTRY;
if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
RETURN(-EPERM);
+ /* export destroy does not have ->le_ses, but we may want
+ * to drop LUSTRE_SOM_FL. */
+ if (!env->le_ses)
+ RETURN(0);
+
+ uc = md_ucred(env);
+
rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
if (rc)
RETURN(rc);
!mdd_capable(uc, CFS_CAP_FOWNER))
RETURN(-EPERM);
- if (la->la_mode == (umode_t) -1)
+ if (la->la_mode == (cfs_umode_t) -1)
la->la_mode = tmp_la->la_mode;
else
la->la_mode = (la->la_mode & S_IALLUGO) |
LASSERT(handle != NULL);
LASSERT(mdd_obj != NULL);
- if ((type == CL_SETATTR) &&
+ if ((type == CL_TIME) &&
cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
/* Don't need multiple updates in this log */
/* Don't check under lock - no big deal if we get an extra
RETURN(-ENOMEM);
rec = (struct llog_changelog_rec *)buf->lb_buf;
- rec->cr_flags = CLF_VERSION;
- rec->cr_type = (__u32)type;
- rec->cr_tfid = *tfid;
- rec->cr_namelen = 0;
+ rec->cr.cr_flags = CLF_VERSION;
+ rec->cr.cr_type = (__u32)type;
+ rec->cr.cr_tfid = *tfid;
+ rec->cr.cr_namelen = 0;
mdd_obj->mod_cltime = cfs_time_current_64();
rc = mdd_changelog_llog_write(mdd, rec, handle);
return 0;
}
+/**
+ * Should be called with write lock held.
+ *
+ * \see mdd_lma_set_locked().
+ */
+static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
+ const struct md_attr *ma, struct thandle *handle)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lu_buf *buf;
+ struct lustre_mdt_attrs *lma =
+ (struct lustre_mdt_attrs *) info->mti_xattr_buf;
+ int lmasize = sizeof(struct lustre_mdt_attrs);
+ int rc = 0;
+
+ ENTRY;
+
+ /* Either HSM or SOM part is not valid, we need to read it before */
+ if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
+ rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
+ if (rc <= 0)
+ RETURN(rc);
+
+ lustre_lma_swab(lma);
+ } else {
+ memset(lma, 0, lmasize);
+ }
+
+ /* Copy HSM data */
+ if (ma->ma_valid & MA_HSM) {
+ lma->lma_flags |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
+ lma->lma_compat |= LMAC_HSM;
+ }
+
+ /* Copy SOM data */
+ if (ma->ma_valid & MA_SOM) {
+ LASSERT(ma->ma_som != NULL);
+ if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
+ lma->lma_compat &= ~LMAC_SOM;
+ } else {
+ lma->lma_compat |= LMAC_SOM;
+ lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
+ lma->lma_som_size = ma->ma_som->msd_size;
+ lma->lma_som_blocks = ma->ma_som->msd_blocks;
+ lma->lma_som_mountid = ma->ma_som->msd_mountid;
+ }
+ }
+
+ /* Copy FID */
+ memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
+
+ lustre_lma_swab(lma);
+ buf = mdd_buf_get(env, lma, lmasize);
+ rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
+
+ RETURN(rc);
+}
+
+/**
+ * Save LMA extended attributes with data from \a ma.
+ *
+ * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
+ * not, LMA EA will be first read from disk, modified and write back.
+ *
+ */
+static int mdd_lma_set_locked(const struct lu_env *env,
+ struct mdd_object *mdd_obj,
+ const struct md_attr *ma, struct thandle *handle)
+{
+ int rc;
+
+ mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+ rc = __mdd_lma_set(env, mdd_obj, ma, handle);
+ mdd_write_unlock(env, mdd_obj);
+ return rc;
+}
+
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
const struct md_attr *ma)
unsigned int qnids[MAXQUOTAS] = { 0, 0 };
unsigned int qoids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, block_count = 0;
- int inode_pending = 0, block_pending = 0;
+ int inode_pending[MAXQUOTAS] = { 0, 0 };
+ int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
ENTRY;
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
+ struct obd_export *exp = md_quota(env)->mq_exp;
struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
mdd_quota_wrapper(la_copy, qnids);
mdd_quota_wrapper(la_tmp, qoids);
/* get file quota for new owner */
- lquota_chkquota(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA], 1,
- &inode_pending, NULL, 0, NULL, 0);
+ lquota_chkquota(mds_quota_interface_ref, obd, exp,
+ qnids, inode_pending, 1, NULL, 0,
+ NULL, 0);
block_count = (la_tmp->la_blocks + 7) >> 3;
if (block_count) {
void *data = NULL;
mdd_data_get(env, mdd_obj, &data);
/* get block quota for new owner */
lquota_chkquota(mds_quota_interface_ref, obd,
- qnids[USRQUOTA],
- qnids[GRPQUOTA],
- block_count, &block_pending,
- NULL, LQUOTA_FLAGS_BLK,
- data, 1);
+ exp, qnids, block_pending,
+ block_count, NULL,
+ LQUOTA_FLAGS_BLK, data, 1);
}
}
}
}
if (rc == 0 && ma->ma_valid & MA_LOV) {
- umode_t mode;
+ cfs_umode_t mode;
mode = mdd_object_type(mdd_obj);
if (S_ISREG(mode) || S_ISDIR(mode)) {
}
}
+ if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
+ cfs_umode_t mode;
+
+ mode = mdd_object_type(mdd_obj);
+ if (S_ISREG(mode))
+ rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
+
+ }
cleanup:
- if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
- rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
- handle);
+ if (rc == 0)
+ rc = mdd_changelog_data_store(env, mdd,
+ (ma->ma_attr.la_valid &
+ ~(LA_MTIME|LA_CTIME|LA_ATIME)) ?
+ CL_SETATTR : CL_TIME,
+ mdd_obj, handle);
mdd_trans_stop(env, mdd, rc, handle);
if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
/*set obd attr, if needed*/
}
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc) {
- if (inode_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA],
- inode_pending, 0);
- if (block_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA],
- block_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
+ inode_pending, 0);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
+ block_pending, 1);
/* Trigger dqrel/dqacq for original owner and new owner.
* If failed, the next call for lquota_chkquota will
* process it. */
RETURN(rc);
mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+ /* security-replated changes may require sync */
+ if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
+ mdd->mdd_sync_permission == 1)
+ txn_param_sync(&mdd_env_info(env)->mti_param);
+
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
struct obd_device *obd = mdd->mdd_obd_dev;
+ struct obd_export *exp = md_quota(env)->mq_exp;
struct mds_obd *mds = &obd->u.mds;
unsigned int qids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, block_count = 0;
- int inode_pending = 0, block_pending = 0;
+ int inode_pending[MAXQUOTAS] = { 0, 0 };
+ int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
int rc = 0;
ENTRY;
quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
mdd_quota_wrapper(&ma->ma_attr, qids);
/* get file quota for child */
- lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
- qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
+ lquota_chkquota(mds_quota_interface_ref, obd, exp,
+ qids, inode_pending, 1, NULL, 0,
NULL, 0);
switch (ma->ma_attr.la_mode & S_IFMT) {
case S_IFLNK:
}
/* get block quota for child */
if (block_count)
- lquota_chkquota(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- block_count, &block_pending, NULL,
- LQUOTA_FLAGS_BLK, NULL, 0);
+ lquota_chkquota(mds_quota_interface_ref, obd, exp,
+ qids, block_pending, block_count,
+ NULL, LQUOTA_FLAGS_BLK, NULL, 0);
}
#endif
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc) {
- if (inode_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- inode_pending, 0);
- if (block_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- block_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qids,
+ inode_pending, 0);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qids,
+ block_pending, 1);
/* Trigger dqacq on the owner of child. If failed,
* the next call for lquota_chkquota will process it. */
lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
- FSFILT_OP_CREATE_PARTIAL_CHILD);
+ quota_opc);
}
#endif
return rc;
struct md_attr *ma)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
+ struct mdd_device *mdd = mdo2mdd(obj);
struct thandle *handle;
int rc;
int reset = 1;
/* release open count */
mdd_obj->mod_count --;
- if (mdd_obj->mod_count == 0) {
+ if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
/* remove link to object from orphan index */
- if (mdd_obj->mod_flags & ORPHAN_OBJ)
- __mdd_orphan_del(env, mdd_obj, handle);
+ rc = __mdd_orphan_del(env, mdd_obj, handle);
+ if (rc == 0) {
+ CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+ "list, OSS objects to be destroyed.\n",
+ PFID(mdd_object_fid(mdd_obj)));
+ } else {
+ CERROR("Object "DFID" can not be deleted from orphan "
+ "list, maybe cause OST objects can not be "
+ "destroyed (err: %d).\n",
+ PFID(mdd_object_fid(mdd_obj)), rc);
+ /* If object was not deleted from orphan list, do not
+ * destroy OSS objects, which will be done when next
+ * recovery. */
+ GOTO(out, rc);
+ }
}
rc = mdd_iattr_get(env, mdd_obj, ma);
- if (rc == 0) {
- if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
- rc = mdd_object_kill(env, mdd_obj, ma);
+ /* Object maybe not in orphan list originally, it is rare case for
+ * mdd_finish_unlink() failure. */
+ if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
- if (mds->mds_quota) {
- quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
- mdd_quota_wrapper(&ma->ma_attr, qids);
- }
+ if (mds->mds_quota) {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ mdd_quota_wrapper(&ma->ma_attr, qids);
+ }
#endif
- if (rc == 0)
- reset = 0;
+ /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
+ if (ma->ma_valid & MA_FLAGS &&
+ ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
+ rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
+ } else {
+ rc = mdd_object_kill(env, mdd_obj, ma);
+ if (rc == 0)
+ reset = 0;
}
+
+ if (rc != 0)
+ CERROR("Error when prepare to delete Object "DFID" , "
+ "which will cause OST objects can not be "
+ "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
}
+ EXIT;
+out:
if (reset)
ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
quota_opc);
#endif
- RETURN(rc);
+ return rc;
}
/*
RETURN(rc);
}
-static int mdd_append_attrs(const struct lu_env *env,
- struct mdd_device *mdd,
- __u32 attr,
- const struct dt_it_ops *iops,
- struct dt_it *it,
- struct lu_dirent*ent)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_fid *fid = &info->mti_fid2;
- int len = cpu_to_le16(ent->lde_namelen);
- const unsigned align = sizeof(struct luda_type) - 1;
- struct lu_fid_pack *pack;
- struct mdd_object *obj;
- struct luda_type *lt;
- int rc = 0;
-
- if (attr & LUDA_FID) {
- pack = (struct lu_fid_pack *)iops->rec(env, it);
- if (IS_ERR(pack)) {
- rc = PTR_ERR(pack);
- ent->lde_attrs = 0;
- goto out;
- }
- rc = fid_unpack(pack, fid);
- if (rc != 0) {
- ent->lde_attrs = 0;
- goto out;
- }
-
- fid_cpu_to_le(&ent->lde_fid, fid);
- ent->lde_attrs = LUDA_FID;
- }
-
- /* check if file type is required */
- if (attr & LUDA_TYPE) {
- if (!(attr & LUDA_FID)) {
- CERROR("wrong attr : [%x]\n",attr);
- rc = -EINVAL;
- goto out;
- }
-
- obj = mdd_object_find(env, mdd, fid);
- if (obj == NULL) /* remote object */
- goto out;
-
- if (IS_ERR(obj)) {
- rc = PTR_ERR(obj);
- goto out;
- }
-
- if (mdd_object_exists(obj) == +1) {
- len = (len + align) & ~align;
-
- lt = (void *) ent->lde_name + len;
- lt->lt_type = cpu_to_le16(mdd_object_type(obj));
-
- ent->lde_attrs |= LUDA_TYPE;
- }
- mdd_object_put(env, obj);
- }
-out:
- ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
- return rc;
-}
-
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
int first, void *area, int nob,
const struct dt_it_ops *iops, struct dt_it *it,
struct lu_dirent **last, __u32 attr)
{
int result;
+ __u64 hash = 0;
struct lu_dirent *ent;
- __u64 hash = 0;
if (first) {
memset(area, 0, sizeof (struct lu_dirpage));
ent = area;
do {
- char *name;
int len;
int recsize;
if (len == 0)
goto next;
- name = (char *)iops->key(env, it);
hash = iops->store(env, it);
-
if (unlikely(first)) {
first = 0;
*start = hash;
}
+ /* calculate max space required for lu_dirent */
recsize = lu_dirent_calc_size(len, attr);
- CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n",
- name, ent, nob, hash, len, len, len, name);
-
if (nob >= recsize) {
- ent->lde_hash = cpu_to_le64(hash);
- ent->lde_namelen = cpu_to_le16(len);
- ent->lde_reclen = cpu_to_le16(recsize);
- memcpy(ent->lde_name, name, len);
-
- result = mdd_append_attrs(env, mdd, attr, iops, it, ent);
+ result = iops->rec(env, it, ent, attr);
+ if (result == -ESTALE)
+ goto next;
if (result != 0)
goto out;
+
+ /* osd might not able to pack all attributes,
+ * so recheck rec length */
+ recsize = le16_to_cpu(ent->lde_reclen);
} else {
/*
* record doesn't fit into page, enlarge previous one.
next:
result = iops->next(env, it);
+ if (result == -ESTALE)
+ goto next;
} while (result == 0);
out:
rc = iops->load(env, it, rdpg->rp_hash);
- if (rc == 0)
+ if (rc == 0){
/*
* Iterator didn't find record with exactly the key requested.
*
* state)---position it on the next item.
*/
rc = iops->next(env, it);
- else if (rc > 0)
+ } else if (rc > 0)
rc = 0;
/*
return next->do_ops->do_object_sync(env, next);
}
+static dt_obj_version_t mdd_version_get(const struct lu_env *env,
+ struct md_object *obj)
+{
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+ LASSERT(mdd_object_exists(mdd_obj));
+ return do_version_get(env, mdd_object_child(mdd_obj));
+}
+
+static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
+ dt_obj_version_t version)
+{
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+ LASSERT(mdd_object_exists(mdd_obj));
+ return do_version_set(env, mdd_object_child(mdd_obj), version);
+}
+
const struct md_object_operations mdd_obj_ops = {
.moo_permission = mdd_permission,
.moo_attr_get = mdd_attr_get,
.moo_readlink = mdd_readlink,
.moo_capa_get = mdd_capa_get,
.moo_object_sync = mdd_object_sync,
+ .moo_version_get = mdd_version_get,
+ .moo_version_set = mdd_version_set,
.moo_path = mdd_path,
};