X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_object.c;h=19670319e7858ed0ace870d5f49fecdd339b6346;hb=42c04e8ee918adb6ce658334c12610e925466752;hp=79166fcc7418ff5017ae1331bf0e5d01a677dfad;hpb=6dc53dbb70e05edeb40ca021b7d3df1203560147;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 79166fc..1967031 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -46,7 +46,11 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include #include @@ -55,7 +59,11 @@ #include #include +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include @@ -265,7 +273,7 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env, } static int mdd_object_init(const struct lu_env *env, struct lu_object *o, - const struct lu_object_conf *_) + const struct lu_object_conf *unused) { struct mdd_device *d = lu2mdd_dev(o->lo_dev); struct mdd_object *mdd_obj = lu2mdd_obj(o); @@ -677,6 +685,57 @@ static int __mdd_lmv_get(const struct lu_env *env, RETURN(rc); } +static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj, + struct md_attr *ma) +{ + struct mdd_thread_info *info = mdd_env_info(env); + struct lustre_mdt_attrs *lma = + (struct lustre_mdt_attrs *)info->mti_xattr_buf; + int lma_size; + int rc; + ENTRY; + + /* If all needed data are already valid, nothing to do */ + if ((ma->ma_valid & (MA_HSM | MA_SOM)) == + (ma->ma_need & (MA_HSM | MA_SOM))) + RETURN(0); + + /* Read LMA from disk EA */ + lma_size = sizeof(info->mti_xattr_buf); + rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA); + if (rc <= 0) + RETURN(rc); + + /* Useless to check LMA incompatibility because this is already done in + * osd_ea_fid_get(), and this will fail long before this code is + * called. + * So, if we are here, LMA is compatible. 
+ */ + + lustre_lma_swab(lma); + + /* Swab and copy LMA */ + if (ma->ma_need & MA_HSM) { + if (lma->lma_compat & LMAC_HSM) + ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK; + else + ma->ma_hsm_flags = 0; + ma->ma_valid |= MA_HSM; + } + + /* Copy SOM */ + if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) { + LASSERT(ma->ma_som != NULL); + ma->ma_som->msd_ioepoch = lma->lma_ioepoch; + ma->ma_som->msd_size = lma->lma_som_size; + ma->ma_som->msd_blocks = lma->lma_som_blocks; + ma->ma_som->msd_mountid = lma->lma_som_mountid; + ma->ma_valid |= MA_SOM; + } + + RETURN(0); +} + static int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma) @@ -696,6 +755,10 @@ static int mdd_attr_get_internal(const struct lu_env *env, if (S_ISDIR(mdd_object_type(mdd_obj))) rc = __mdd_lmv_get(env, mdd_obj, ma); } + if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) { + if (S_ISREG(mdd_object_type(mdd_obj))) + rc = __mdd_lma_get(env, mdd_obj, ma); + } #ifdef CONFIG_FS_POSIX_ACL if (rc == 0 && ma->ma_need & MA_ACL_DEF) { if (S_ISDIR(mdd_object_type(mdd_obj))) @@ -711,7 +774,8 @@ int mdd_attr_get_internal_locked(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma) { int rc; - int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF); + int needlock = ma->ma_need & + (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM); if (needlock) mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD); @@ -957,7 +1021,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, struct lu_attr *la, const struct md_attr *ma) { struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - struct md_ucred *uc = md_ucred(env); + struct md_ucred *uc; int rc; ENTRY; @@ -972,6 +1036,13 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE)) RETURN(-EPERM); + /* export destroy does not have ->le_ses, but we may want + * to drop LUSTRE_SOM_FL. 
*/ + if (!env->le_ses) + RETURN(0); + + uc = md_ucred(env); + rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA); if (rc) RETURN(rc); @@ -1192,10 +1263,10 @@ static int mdd_changelog_data_store(const struct lu_env *env, RETURN(-ENOMEM); rec = (struct llog_changelog_rec *)buf->lb_buf; - rec->cr_flags = CLF_VERSION; - rec->cr_type = (__u32)type; - rec->cr_tfid = *tfid; - rec->cr_namelen = 0; + rec->cr.cr_flags = CLF_VERSION; + rec->cr.cr_type = (__u32)type; + rec->cr.cr_tfid = *tfid; + rec->cr.cr_namelen = 0; mdd_obj->mod_cltime = cfs_time_current_64(); rc = mdd_changelog_llog_write(mdd, rec, handle); @@ -1208,6 +1279,83 @@ static int mdd_changelog_data_store(const struct lu_env *env, return 0; } +/** + * Store the LMA EA built from \a ma; must be called with write lock held. + * Parts not valid in \a ma (HSM or SOM) are first read from the on-disk EA. + * \see mdd_lma_set_locked(). + */ +static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj, + const struct md_attr *ma, struct thandle *handle) +{ + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_buf *buf; + struct lustre_mdt_attrs *lma = + (struct lustre_mdt_attrs *) info->mti_xattr_buf; + int lmasize = sizeof(struct lustre_mdt_attrs); + int rc = 0; + + ENTRY; + + /* Either HSM or SOM part is not valid, we need to read it before */ + if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) { + rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA); + if (rc <= 0) + RETURN(rc); + + lustre_lma_swab(lma); + } else { + memset(lma, 0, lmasize); + } + + /* Copy HSM data */ + if (ma->ma_valid & MA_HSM) { + lma->lma_flags |= ma->ma_hsm_flags & HSM_FLAGS_MASK; + lma->lma_compat |= LMAC_HSM; + } + + /* Copy SOM data */ + if (ma->ma_valid & MA_SOM) { + LASSERT(ma->ma_som != NULL); + if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) { + lma->lma_compat &= ~LMAC_SOM; + } else { + lma->lma_compat |= LMAC_SOM; + lma->lma_ioepoch = ma->ma_som->msd_ioepoch; + lma->lma_som_size = ma->ma_som->msd_size; + lma->lma_som_blocks = ma->ma_som->msd_blocks; + lma->lma_som_mountid = ma->ma_som->msd_mountid; + } + } + + /* Copy
FID */ + memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid)); + + lustre_lma_swab(lma); + buf = mdd_buf_get(env, lma, lmasize); + rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle); + + RETURN(rc); +} + +/** + * Save LMA extended attributes with data from \a ma. + * + * HSM and Size-On-MDS data will be extracted from \a ma if they are valid; if + * not, LMA EA will be first read from disk, modified and written back. + * + */ +static int mdd_lma_set_locked(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct md_attr *ma, struct thandle *handle) +{ + int rc; + + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); + rc = __mdd_lma_set(env, mdd_obj, ma, handle); + mdd_write_unlock(env, mdd_obj); + return rc; +} + /* set attr and LOV EA at once, return updated attr */ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, const struct md_attr *ma) @@ -1221,6 +1369,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qnids[MAXQUOTAS] = { 0, 0 }; unsigned int qoids[MAXQUOTAS] = { 0, 0 }; @@ -1270,15 +1419,16 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, mdd_quota_wrapper(la_copy, qnids); mdd_quota_wrapper(la_tmp, qoids); /* get file quota for new owner */ - lquota_chkquota(mds_quota_interface_ref, obd, qnids, - inode_pending, 1, NULL, 0, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qnids, inode_pending, 1, NULL, 0, + NULL, 0); block_count = (la_tmp->la_blocks + 7) >> 3; if (block_count) { void *data = NULL; mdd_data_get(env, mdd_obj, &data); /* get block quota for new owner */ lquota_chkquota(mds_quota_interface_ref, obd, - qnids, block_pending, + exp, qnids, block_pending, block_count, NULL, LQUOTA_FLAGS_BLK, 
+ data, 1); } @@ -1321,6 +1471,14 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, } } + if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) { + umode_t mode; + + mode = mdd_object_type(mdd_obj); + if (S_ISREG(mode)) + rc = mdd_lma_set_locked(env, mdd_obj, ma, handle); + + } cleanup: if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))) rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj, @@ -1402,6 +1560,11 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj, RETURN(rc); mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP); + /* security-related changes may require sync */ + if (!strcmp(name, XATTR_NAME_ACL_ACCESS) && + mdd->mdd_sync_permission == 1) + txn_param_sync(&mdd_env_info(env)->mti_param); + handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); @@ -1572,6 +1735,7 @@ static int mdd_object_create(const struct lu_env *env, struct thandle *handle; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; @@ -1586,8 +1750,9 @@ static int mdd_object_create(const struct lu_env *env, quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qids); /* get file quota for child */ - lquota_chkquota(mds_quota_interface_ref, obd, qids, - inode_pending, 1, NULL, 0, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qids, inode_pending, 1, NULL, 0, + NULL, 0); switch (ma->ma_attr.la_mode & S_IFMT) { case S_IFLNK: case S_IFDIR: @@ -1599,9 +1764,9 @@ static int mdd_object_create(const struct lu_env *env, } /* get block quota for child */ if (block_count) - lquota_chkquota(mds_quota_interface_ref, obd, qids, - block_pending, block_count, NULL, - LQUOTA_FLAGS_BLK, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qids, block_pending, block_count, + NULL, 
+ LQUOTA_FLAGS_BLK, NULL, 0); } #endif @@ -1846,6 +2011,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, struct md_attr *ma) { struct mdd_object *mdd_obj = md2mdd_obj(obj); + struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle; int rc; int reset = 1; @@ -1869,27 +2035,53 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, /* release open count */ mdd_obj->mod_count --; - if (mdd_obj->mod_count == 0) { + if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) { /* remove link to object from orphan index */ - if (mdd_obj->mod_flags & ORPHAN_OBJ) - __mdd_orphan_del(env, mdd_obj, handle); + rc = __mdd_orphan_del(env, mdd_obj, handle); + if (rc == 0) { + CDEBUG(D_HA, "Object "DFID" is deleted from orphan " + "list, OSS objects to be destroyed.\n", + PFID(mdd_object_fid(mdd_obj))); + } else { + CERROR("Object "DFID" can not be deleted from orphan " + "list, maybe cause OST objects can not be " + "destroyed (err: %d).\n", + PFID(mdd_object_fid(mdd_obj)), rc); + /* If object was not deleted from orphan list, do not + * destroy OSS objects; that is done during the next + * recovery. */ + GOTO(out, rc); + } } rc = mdd_iattr_get(env, mdd_obj, ma); - if (rc == 0) { - if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) { - rc = mdd_object_kill(env, mdd_obj, ma); + /* Object may not be in orphan list originally; this is a rare case + * caused by mdd_finish_unlink() failure. */ + if (rc == 0 && ma->ma_attr.la_nlink == 0) { #ifdef HAVE_QUOTA_SUPPORT - if (mds->mds_quota) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; - mdd_quota_wrapper(&ma->ma_attr, qids); - } + if (mds->mds_quota) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qids); + } #endif - if (rc == 0) - reset = 0; + /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. 
*/ + if (ma->ma_valid & MA_FLAGS && + ma->ma_attr_flags & MDS_CLOSE_CLEANUP) { + rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr); + } else { + rc = mdd_object_kill(env, mdd_obj, ma); + if (rc == 0) + reset = 0; } + + if (rc != 0) + CERROR("Error when prepare to delete Object "DFID" , " + "which will cause OST objects can not be " + "destroyed.\n", PFID(mdd_object_fid(mdd_obj))); } + EXIT; +out: if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); @@ -1902,7 +2094,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, quota_opc); #endif - RETURN(rc); + return rc; } /* @@ -1924,71 +2116,6 @@ static int mdd_readpage_sanity_check(const struct lu_env *env, RETURN(rc); } -static int mdd_append_attrs(const struct lu_env *env, - struct mdd_device *mdd, - __u32 attr, - const struct dt_it_ops *iops, - struct dt_it *it, - struct lu_dirent*ent) -{ - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_fid *fid = &info->mti_fid2; - int len = cpu_to_le16(ent->lde_namelen); - const unsigned align = sizeof(struct luda_type) - 1; - struct lu_fid_pack *pack; - struct mdd_object *obj; - struct luda_type *lt; - int rc = 0; - - if (attr & LUDA_FID) { - pack = (struct lu_fid_pack *)iops->rec(env, it); - if (IS_ERR(pack)) { - rc = PTR_ERR(pack); - ent->lde_attrs = 0; - goto out; - } - rc = fid_unpack(pack, fid); - if (rc != 0) { - ent->lde_attrs = 0; - goto out; - } - - fid_cpu_to_le(&ent->lde_fid, fid); - ent->lde_attrs = LUDA_FID; - } - - /* check if file type is required */ - if (attr & LUDA_TYPE) { - if (!(attr & LUDA_FID)) { - CERROR("wrong attr : [%x]\n",attr); - rc = -EINVAL; - goto out; - } - - obj = mdd_object_find(env, mdd, fid); - if (obj == NULL) /* remote object */ - goto out; - - if (IS_ERR(obj)) { - rc = PTR_ERR(obj); - goto out; - } - - if (mdd_object_exists(obj) == +1) { - len = (len + align) & ~align; - - lt = (void *) ent->lde_name + len; - lt->lt_type = 
cpu_to_le16(mdd_object_type(obj)); - - ent->lde_attrs |= LUDA_TYPE; - } - mdd_object_put(env, obj); - } -out: - ent->lde_attrs = cpu_to_le32(ent->lde_attrs); - return rc; -} - static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, int first, void *area, int nob, const struct dt_it_ops *iops, struct dt_it *it, @@ -1996,8 +2123,8 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, struct lu_dirent **last, __u32 attr) { int result; + __u64 hash = 0; struct lu_dirent *ent; - __u64 hash = 0; if (first) { memset(area, 0, sizeof (struct lu_dirpage)); @@ -2007,7 +2134,6 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, ent = area; do { - char *name; int len; int recsize; @@ -2017,30 +2143,25 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, if (len == 0) goto next; - name = (char *)iops->key(env, it); hash = iops->store(env, it); - if (unlikely(first)) { first = 0; *start = hash; } + /* calculate max space required for lu_dirent */ recsize = lu_dirent_calc_size(len, attr); - CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n", - name, ent, nob, hash, len, len, len, name); - if (nob >= recsize) { - ent->lde_hash = cpu_to_le64(hash); - ent->lde_namelen = cpu_to_le16(len); - ent->lde_reclen = cpu_to_le16(recsize); - memcpy(ent->lde_name, name, len); - - result = mdd_append_attrs(env, mdd, attr, iops, it, ent); + result = iops->rec(env, it, ent, attr); if (result == -ESTALE) goto next; if (result != 0) goto out; + + /* osd might not be able to pack all attributes, + * so recheck rec length */ + recsize = le16_to_cpu(ent->lde_reclen); } else { /* * record doesn't fit into page, enlarge previous one. @@ -2101,7 +2222,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, rc = iops->load(env, it, rdpg->rp_hash); - if (rc == 0) + if (rc == 0){ /* * Iterator didn't find record with exactly the key requested. 
* @@ -2114,7 +2235,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, * state)---position it on the next item. */ rc = iops->next(env, it); - else if (rc > 0) + } else if (rc > 0) rc = 0; /*