X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_object.c;h=c78e5f2499ef27bfbe6f1296cfa6b99713f57956;hp=56f595dd4c449ef0e4e5484d244648da695db2e7;hb=6e3ec5812ebd1b5ecf7cae584f429b013ffe7431;hpb=e9dc8feb37943024452dcc5acb3f96b95c6a518d diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 56f595d..c78e5f2 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -46,7 +46,11 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include #include @@ -55,7 +59,11 @@ #include #include +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include @@ -67,6 +75,15 @@ static int mdd_xattr_get(const struct lu_env *env, struct md_object *obj, struct lu_buf *buf, const char *name); +int mdd_data_get(const struct lu_env *env, struct mdd_object *obj, + void **data) +{ + LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n", + PFID(mdd_object_fid(obj))); + mdo_data_get(env, obj, data); + return 0; +} + int mdd_la_get(const struct lu_env *env, struct mdd_object *obj, struct lu_attr *la, struct lustre_capa *capa) { @@ -155,7 +172,11 @@ struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len) return buf; } -/* preserve old data */ +/** Increase the size of the \a mti_big_buf. + * preserves old data in buffer + * old buffer remains unchanged on error + * \retval 0 or -ENOMEM + */ int mdd_buf_grow(const struct lu_env *env, ssize_t len) { struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf; @@ -252,7 +273,7 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env, } static int mdd_object_init(const struct lu_env *env, struct lu_object *o, - const struct lu_object_conf *_) + const struct lu_object_conf *unused) { struct mdd_device *d = lu2mdd_dev(o->lo_dev); struct mdd_object *mdd_obj = lu2mdd_obj(o); @@ -291,7 +312,11 @@ static void mdd_object_free(const struct lu_env *env, struct lu_object *o) static int mdd_object_print(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o) { - return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o); + struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o); + return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, " + "valid=%x, cltime=%llu, flags=%lx)", + mdd, mdd->mod_count, mdd->mod_valid, + mdd->mod_cltime, mdd->mod_flags); } static const struct lu_object_operations mdd_lu_obj_ops = { @@ -372,6 +397,7 @@ out: /** mdd_path() lookup structure. */ struct path_lookup_info { __u64 pli_recno; /**< history point */ + __u64 pli_currec; /**< current record */ struct lu_fid pli_fid; struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */ struct mdd_object *pli_mdd_obj; @@ -423,12 +449,10 @@ static int mdd_path_current(const struct lu_env *env, /* Get parent fid and object name */ mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD); buf = mdd_links_get(env, mdd_obj); - if (IS_ERR(buf)) - GOTO(out, rc = PTR_ERR(buf)); mdd_read_unlock(env, mdd_obj); mdd_object_put(env, mdd_obj); - if (rc < 0) - GOTO(out, rc); + if (IS_ERR(buf)) + GOTO(out, rc = PTR_ERR(buf)); leh = buf->lb_buf; lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ @@ -462,7 +486,12 @@ static int mdd_path_current(const struct lu_env *env, pli->pli_fids[pli->pli_fidcount] = *tmpfid; } - /* Verify that our path hasn't changed since we started the lookup */ + /* Verify that our path hasn't changed since we started the lookup. + Record the current index, and verify the path resolves to the + same fid. If it does, then the path is correct as of this index. */ + cfs_spin_lock(&mdd->mdd_cl.mc_lock); + pli->pli_currec = mdd->mdd_cl.mc_index; + cfs_spin_unlock(&mdd->mdd_cl.mc_lock); rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid); if (rc) { CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc); @@ -486,9 +515,15 @@ out: return rc; } +static int mdd_path_historic(const struct lu_env *env, + struct path_lookup_info *pli) +{ + return 0; +} + /* Returns the full path to this fid, as of changelog record recno. */ static int mdd_path(const struct lu_env *env, struct md_object *obj, - char *path, int pathlen, __u64 recno, int *linkno) + char *path, int pathlen, __u64 *recno, int *linkno) { struct path_lookup_info *pli; int tries = 3; @@ -509,7 +544,7 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, RETURN(-ENOMEM); pli->pli_mdd_obj = md2mdd_obj(obj); - pli->pli_recno = recno; + pli->pli_recno = *recno; pli->pli_path = path; pli->pli_pathlen = pathlen; pli->pli_linkno = *linkno; @@ -518,7 +553,6 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, while (tries-- && rc == -EAGAIN) rc = mdd_path_current(env, pli); -#if 0 /* We need old path names only for replication */ /* For historical path lookup, the current links may not have existed * at "recno" time. We must switch over to earlier links/parents * by using the changelog records. If the earlier parent doesn't @@ -527,12 +561,13 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, * We may ignore this problem for the initial implementation and * state that an "original" hardlink must still exist for us to find * historic path name. */ - if (pli->pli_recno != -1) + if (pli->pli_recno != -1) { rc = mdd_path_historic(env, pli); -#endif - - /* return next link index to caller */ - *linkno = pli->pli_linkno; + } else { + *recno = pli->pli_currec; + /* Return next link index to caller */ + *linkno = pli->pli_linkno; + } OBD_FREE_PTR(pli); @@ -571,8 +606,8 @@ int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, RETURN(rc); } -static int mdd_get_default_md(struct mdd_object *mdd_obj, - struct lov_mds_md *lmm, int *size) +int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm, + int *size) { struct lov_desc *ldesc; struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj); @@ -650,6 +685,57 @@ static int __mdd_lmv_get(const struct lu_env *env, RETURN(rc); } +static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj, + struct md_attr *ma) +{ + struct mdd_thread_info *info = mdd_env_info(env); + struct lustre_mdt_attrs *lma = + (struct lustre_mdt_attrs *)info->mti_xattr_buf; + int lma_size; + int rc; + ENTRY; + + /* If all needed data are already valid, nothing to do */ + if ((ma->ma_valid & (MA_HSM | MA_SOM)) == + (ma->ma_need & (MA_HSM | MA_SOM))) + RETURN(0); + + /* Read LMA from disk EA */ + lma_size = sizeof(info->mti_xattr_buf); + rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA); + if (rc <= 0) + RETURN(rc); + + /* Useless to check LMA incompatibility because this is already done in + * osd_ea_fid_get(), and this will fail long before this code is + * called. + * So, if we are here, LMA is compatible. + */ + + lustre_lma_swab(lma); + + /* Swab and copy LMA */ + if (ma->ma_need & MA_HSM) { + if (lma->lma_compat & LMAC_HSM) + ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK; + else + ma->ma_hsm_flags = 0; + ma->ma_valid |= MA_HSM; + } + + /* Copy SOM */ + if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) { + LASSERT(ma->ma_som != NULL); + ma->ma_som->msd_ioepoch = lma->lma_ioepoch; + ma->ma_som->msd_size = lma->lma_som_size; + ma->ma_som->msd_blocks = lma->lma_som_blocks; + ma->ma_som->msd_mountid = lma->lma_som_mountid; + ma->ma_valid |= MA_SOM; + } + + RETURN(0); +} + static int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma) @@ -669,6 +755,10 @@ static int mdd_attr_get_internal(const struct lu_env *env, if (S_ISDIR(mdd_object_type(mdd_obj))) rc = __mdd_lmv_get(env, mdd_obj, ma); } + if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) { + if (S_ISREG(mdd_object_type(mdd_obj))) + rc = __mdd_lma_get(env, mdd_obj, ma); + } #ifdef CONFIG_FS_POSIX_ACL if (rc == 0 && ma->ma_need & MA_ACL_DEF) { if (S_ISDIR(mdd_object_type(mdd_obj))) @@ -684,7 +774,8 @@ int mdd_attr_get_internal_locked(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma) { int rc; - int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF); + int needlock = ma->ma_need & + (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM); if (needlock) mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD); @@ -930,7 +1021,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, struct lu_attr *la, const struct md_attr *ma) { struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - struct md_ucred *uc = md_ucred(env); + struct md_ucred *uc; int rc; ENTRY; @@ -945,6 +1036,13 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE)) RETURN(-EPERM); + /* export destroy does not have ->le_ses, but we may want + * to drop LUSTRE_SOM_FL. */ + if (!env->le_ses) + RETURN(0); + + uc = md_ucred(env); + rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA); if (rc) RETURN(rc); @@ -1020,7 +1118,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, !mdd_capable(uc, CFS_CAP_FOWNER)) RETURN(-EPERM); - if (la->la_mode == (umode_t) -1) + if (la->la_mode == (cfs_umode_t) -1) la->la_mode = tmp_la->la_mode; else la->la_mode = (la->la_mode & S_IALLUGO) | @@ -1165,10 +1263,10 @@ static int mdd_changelog_data_store(const struct lu_env *env, RETURN(-ENOMEM); rec = (struct llog_changelog_rec *)buf->lb_buf; - rec->cr_flags = CLF_VERSION; - rec->cr_type = (__u32)type; - rec->cr_tfid = *tfid; - rec->cr_namelen = 0; + rec->cr.cr_flags = CLF_VERSION; + rec->cr.cr_type = (__u32)type; + rec->cr.cr_tfid = *tfid; + rec->cr.cr_namelen = 0; mdd_obj->mod_cltime = cfs_time_current_64(); rc = mdd_changelog_llog_write(mdd, rec, handle); @@ -1181,6 +1279,83 @@ static int mdd_changelog_data_store(const struct lu_env *env, return 0; } +/** + * Should be called with write lock held. + * + * \see mdd_lma_set_locked(). + */ +static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj, + const struct md_attr *ma, struct thandle *handle) +{ + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_buf *buf; + struct lustre_mdt_attrs *lma = + (struct lustre_mdt_attrs *) info->mti_xattr_buf; + int lmasize = sizeof(struct lustre_mdt_attrs); + int rc = 0; + + ENTRY; + + /* Either HSM or SOM part is not valid, we need to read it before */ + if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) { + rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA); + if (rc <= 0) + RETURN(rc); + + lustre_lma_swab(lma); + } else { + memset(lma, 0, lmasize); + } + + /* Copy HSM data */ + if (ma->ma_valid & MA_HSM) { + lma->lma_flags |= ma->ma_hsm_flags & HSM_FLAGS_MASK; + lma->lma_compat |= LMAC_HSM; + } + + /* Copy SOM data */ + if (ma->ma_valid & MA_SOM) { + LASSERT(ma->ma_som != NULL); + if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) { + lma->lma_compat &= ~LMAC_SOM; + } else { + lma->lma_compat |= LMAC_SOM; + lma->lma_ioepoch = ma->ma_som->msd_ioepoch; + lma->lma_som_size = ma->ma_som->msd_size; + lma->lma_som_blocks = ma->ma_som->msd_blocks; + lma->lma_som_mountid = ma->ma_som->msd_mountid; + } + } + + /* Copy FID */ + memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid)); + + lustre_lma_swab(lma); + buf = mdd_buf_get(env, lma, lmasize); + rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle); + + RETURN(rc); +} + +/** + * Save LMA extended attributes with data from \a ma. + * + * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if + * not, LMA EA will be first read from disk, modified and write back. + * + */ +static int mdd_lma_set_locked(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct md_attr *ma, struct thandle *handle) +{ + int rc; + + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); + rc = __mdd_lma_set(env, mdd_obj, ma, handle); + mdd_write_unlock(env, mdd_obj); + return rc; +} + /* set attr and LOV EA at once, return updated attr */ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, const struct md_attr *ma) @@ -1194,11 +1369,13 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qnids[MAXQUOTAS] = { 0, 0 }; unsigned int qoids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1242,18 +1419,19 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, mdd_quota_wrapper(la_copy, qnids); mdd_quota_wrapper(la_tmp, qoids); /* get file quota for new owner */ - lquota_chkquota(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], 1, - &inode_pending, NULL, 0, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qnids, inode_pending, 1, NULL, 0, + NULL, 0); block_count = (la_tmp->la_blocks + 7) >> 3; - if (block_count) + if (block_count) { + void *data = NULL; + mdd_data_get(env, mdd_obj, &data); /* get block quota for new owner */ lquota_chkquota(mds_quota_interface_ref, obd, - qnids[USRQUOTA], - qnids[GRPQUOTA], - block_count, &block_pending, - NULL, LQUOTA_FLAGS_BLK, - NULL, 0); + exp, qnids, block_pending, + block_count, NULL, + LQUOTA_FLAGS_BLK, data, 1); + } } } #endif @@ -1280,7 +1458,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, } if (rc == 0 && ma->ma_valid & MA_LOV) { - umode_t mode; + cfs_umode_t mode; mode = mdd_object_type(mdd_obj); if (S_ISREG(mode) || S_ISDIR(mode)) { @@ -1293,6 +1471,14 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, } } + if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) { + cfs_umode_t mode; + + mode = mdd_object_type(mdd_obj); + if (S_ISREG(mode)) + rc = mdd_lma_set_locked(env, mdd_obj, ma, handle); + + } cleanup: if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))) rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj, @@ -1305,14 +1491,10 @@ cleanup: } #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], - inode_pending, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], - block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qnids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qnids, + block_pending, 1); /* Trigger dqrel/dqacq for original owner and new owner. * If failed, the next call for lquota_chkquota will * process it. */ @@ -1378,6 +1560,11 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj, RETURN(rc); mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP); + /* security-replated changes may require sync */ + if (!strcmp(name, XATTR_NAME_ACL_ACCESS) && + mdd->mdd_sync_permission == 1) + txn_param_sync(&mdd_env_info(env)->mti_param); + handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); @@ -1548,10 +1735,12 @@ static int mdd_object_create(const struct lu_env *env, struct thandle *handle; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc = 0; ENTRY; @@ -1561,8 +1750,8 @@ static int mdd_object_create(const struct lu_env *env, quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qids); /* get file quota for child */ - lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA], - qids[GRPQUOTA], 1, &inode_pending, NULL, 0, + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qids, inode_pending, 1, NULL, 0, NULL, 0); switch (ma->ma_attr.la_mode & S_IFMT) { case S_IFLNK: @@ -1575,10 +1764,9 @@ static int mdd_object_create(const struct lu_env *env, } /* get block quota for child */ if (block_count) - lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - block_count, &block_pending, NULL, - LQUOTA_FLAGS_BLK, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qids, block_pending, block_count, + NULL, LQUOTA_FLAGS_BLK, NULL, 0); } #endif @@ -1645,18 +1833,14 @@ unlock: out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - inode_pending, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qids, + block_pending, 1); /* Trigger dqacq on the owner of child. If failed, * the next call for lquota_chkquota will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, - FSFILT_OP_CREATE_PARTIAL_CHILD); + quota_opc); } #endif return rc; @@ -1827,6 +2011,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, struct md_attr *ma) { struct mdd_object *mdd_obj = md2mdd_obj(obj); + struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle; int rc; int reset = 1; @@ -1850,27 +2035,53 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, /* release open count */ mdd_obj->mod_count --; - if (mdd_obj->mod_count == 0) { + if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) { /* remove link to object from orphan index */ - if (mdd_obj->mod_flags & ORPHAN_OBJ) - __mdd_orphan_del(env, mdd_obj, handle); + rc = __mdd_orphan_del(env, mdd_obj, handle); + if (rc == 0) { + CDEBUG(D_HA, "Object "DFID" is deleted from orphan " + "list, OSS objects to be destroyed.\n", + PFID(mdd_object_fid(mdd_obj))); + } else { + CERROR("Object "DFID" can not be deleted from orphan " + "list, maybe cause OST objects can not be " + "destroyed (err: %d).\n", + PFID(mdd_object_fid(mdd_obj)), rc); + /* If object was not deleted from orphan list, do not + * destroy OSS objects, which will be done when next + * recovery. */ + GOTO(out, rc); + } } rc = mdd_iattr_get(env, mdd_obj, ma); - if (rc == 0) { - if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) { - rc = mdd_object_kill(env, mdd_obj, ma); + /* Object maybe not in orphan list originally, it is rare case for + * mdd_finish_unlink() failure. */ + if (rc == 0 && ma->ma_attr.la_nlink == 0) { #ifdef HAVE_QUOTA_SUPPORT - if (mds->mds_quota) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; - mdd_quota_wrapper(&ma->ma_attr, qids); - } + if (mds->mds_quota) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qids); + } #endif - if (rc == 0) - reset = 0; + /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */ + if (ma->ma_valid & MA_FLAGS && + ma->ma_attr_flags & MDS_CLOSE_CLEANUP) { + rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr); + } else { + rc = mdd_object_kill(env, mdd_obj, ma); + if (rc == 0) + reset = 0; } + + if (rc != 0) + CERROR("Error when prepare to delete Object "DFID" , " + "which will cause OST objects can not be " + "destroyed.\n", PFID(mdd_object_fid(mdd_obj))); } + EXIT; +out: if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); @@ -1883,7 +2094,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, quota_opc); #endif - RETURN(rc); + return rc; } /* @@ -1905,15 +2116,14 @@ static int mdd_readpage_sanity_check(const struct lu_env *env, RETURN(rc); } -static int mdd_dir_page_build(const struct lu_env *env, int first, - void *area, int nob, const struct dt_it_ops *iops, - struct dt_it *it, __u64 *start, __u64 *end, - struct lu_dirent **last) +static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, + int first, void *area, int nob, + const struct dt_it_ops *iops, struct dt_it *it, + __u64 *start, __u64 *end, + struct lu_dirent **last, __u32 attr) { - struct lu_fid *fid = &mdd_env_info(env)->mti_fid2; - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_fid_pack *pack = &info->mti_pack; int result; + __u64 hash = 0; struct lu_dirent *ent; if (first) { @@ -1922,56 +2132,62 @@ static int mdd_dir_page_build(const struct lu_env *env, int first, nob -= sizeof (struct lu_dirpage); } - LASSERT(nob > sizeof *ent); - ent = area; - result = 0; do { - char *name; int len; int recsize; - __u64 hash; - name = (char *)iops->key(env, it); len = iops->key_size(env, it); - pack = (struct lu_fid_pack *)iops->rec(env, it); - result = fid_unpack(pack, fid); - if (result != 0) - break; + /* IAM iterator can return record with zero len. */ + if (len == 0) + goto next; - recsize = (sizeof(*ent) + len + 7) & ~7; hash = iops->store(env, it); - *end = hash; + if (unlikely(first)) { + first = 0; + *start = hash; + } - CDEBUG(D_INFO, "%p %p %d "DFID": "LPU64" (%d) \"%*.*s\"\n", - name, ent, nob, PFID(fid), hash, len, len, len, name); + /* calculate max space required for lu_dirent */ + recsize = lu_dirent_calc_size(len, attr); if (nob >= recsize) { - ent->lde_fid = *fid; - fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid); - ent->lde_hash = hash; - ent->lde_namelen = cpu_to_le16(len); - ent->lde_reclen = cpu_to_le16(recsize); - memcpy(ent->lde_name, name, len); - if (first && ent == area) - *start = hash; - *last = ent; - ent = (void *)ent + recsize; - nob -= recsize; - result = iops->next(env, it); + result = iops->rec(env, it, ent, attr); + if (result == -ESTALE) + goto next; + if (result != 0) + goto out; + + /* osd might not able to pack all attributes, + * so recheck rec length */ + recsize = le16_to_cpu(ent->lde_reclen); } else { /* * record doesn't fit into page, enlarge previous one. */ - LASSERT(*last != NULL); - (*last)->lde_reclen = - cpu_to_le16(le16_to_cpu((*last)->lde_reclen) + - nob); - break; + if (*last) { + (*last)->lde_reclen = + cpu_to_le16(le16_to_cpu((*last)->lde_reclen) + + nob); + result = 0; + } else + result = -EINVAL; + + goto out; } + *last = ent; + ent = (void *)ent + recsize; + nob -= recsize; + +next: + result = iops->next(env, it); + if (result == -ESTALE) + goto next; } while (result == 0); +out: + *end = hash; return result; } @@ -1983,6 +2199,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, const struct dt_it_ops *iops; struct page *pg; struct lu_dirent *last = NULL; + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); int i; int rc; int nob; @@ -2005,7 +2222,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, rc = iops->load(env, it, rdpg->rp_hash); - if (rc == 0) + if (rc == 0){ /* * Iterator didn't find record with exactly the key requested. * @@ -2018,7 +2235,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, * state)---position it on the next item. */ rc = iops->next(env, it); - else if (rc > 0) + } else if (rc > 0) rc = 0; /* @@ -2032,11 +2249,14 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, i++, nob -= CFS_PAGE_SIZE) { LASSERT(i < rdpg->rp_npages); pg = rdpg->rp_pages[i]; - rc = mdd_dir_page_build(env, !i, cfs_kmap(pg), + rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg), min_t(int, nob, CFS_PAGE_SIZE), iops, - it, &hash_start, &hash_end, &last); - if (rc != 0 || i == rdpg->rp_npages - 1) - last->lde_reclen = 0; + it, &hash_start, &hash_end, &last, + rdpg->rp_attrs); + if (rc != 0 || i == rdpg->rp_npages - 1) { + if (last) + last->lde_reclen = 0; + } cfs_kunmap(pg); } if (rc > 0) { @@ -2050,13 +2270,14 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, struct lu_dirpage *dp; dp = cfs_kmap(rdpg->rp_pages[0]); - dp->ldp_hash_start = rdpg->rp_hash; - dp->ldp_hash_end = hash_end; + dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash); + dp->ldp_hash_end = cpu_to_le64(hash_end); if (i == 0) /* * No pages were processed, mark this. */ dp->ldp_flags |= LDF_EMPTY; + dp->ldp_flags = cpu_to_le32(dp->ldp_flags); cfs_kunmap(rdpg->rp_pages[0]); } @@ -2066,8 +2287,8 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, return rc; } -static int mdd_readpage(const struct lu_env *env, struct md_object *obj, - const struct lu_rdpg *rdpg) +int mdd_readpage(const struct lu_env *env, struct md_object *obj, + const struct lu_rdpg *rdpg) { struct mdd_object *mdd_obj = md2mdd_obj(obj); int rc; @@ -2098,8 +2319,8 @@ static int mdd_readpage(const struct lu_env *env, struct md_object *obj, pg = rdpg->rp_pages[0]; dp = (struct lu_dirpage*)cfs_kmap(pg); memset(dp, 0 , sizeof(struct lu_dirpage)); - dp->ldp_hash_start = rdpg->rp_hash; - dp->ldp_hash_end = DIR_END_OFF; + dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash); + dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF); dp->ldp_flags |= LDF_EMPTY; dp->ldp_flags = cpu_to_le32(dp->ldp_flags); cfs_kunmap(pg); @@ -2124,6 +2345,24 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj) return next->do_ops->do_object_sync(env, next); } +static dt_obj_version_t mdd_version_get(const struct lu_env *env, + struct md_object *obj) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + LASSERT(mdd_object_exists(mdd_obj)); + return do_version_get(env, mdd_object_child(mdd_obj)); +} + +static void mdd_version_set(const struct lu_env *env, struct md_object *obj, + dt_obj_version_t version) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + LASSERT(mdd_object_exists(mdd_obj)); + return do_version_set(env, mdd_object_child(mdd_obj), version); +} + const struct md_object_operations mdd_obj_ops = { .moo_permission = mdd_permission, .moo_attr_get = mdd_attr_get, @@ -2141,5 +2380,7 @@ const struct md_object_operations mdd_obj_ops = { .moo_readlink = mdd_readlink, .moo_capa_get = mdd_capa_get, .moo_object_sync = mdd_object_sync, + .moo_version_get = mdd_version_get, + .moo_version_set = mdd_version_set, .moo_path = mdd_path, };