X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_object.c;h=224aff7b93d52fe03812a39fb85f73b9357b8c6f;hb=f2bedfe6e389eccf931f339cd8efe93f72ed530c;hp=9d0dbc1737c331eee7e97fd082d2b3741b568802;hpb=a87b1b04494193c39e3e4289b0d41c228d9451b1;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 9d0dbc1..224aff7 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -265,7 +265,7 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env, } static int mdd_object_init(const struct lu_env *env, struct lu_object *o, - const struct lu_object_conf *_) + const struct lu_object_conf *unused) { struct mdd_device *d = lu2mdd_dev(o->lo_dev); struct mdd_object *mdd_obj = lu2mdd_obj(o); @@ -304,7 +304,11 @@ static void mdd_object_free(const struct lu_env *env, struct lu_object *o) static int mdd_object_print(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o) { - return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o); + struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o); + return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, " + "valid=%x, cltime=%llu, flags=%lx)", + mdd, mdd->mod_count, mdd->mod_valid, + mdd->mod_cltime, mdd->mod_flags); } static const struct lu_object_operations mdd_lu_obj_ops = { @@ -385,6 +389,7 @@ out: /** mdd_path() lookup structure. */ struct path_lookup_info { __u64 pli_recno; /**< history point */ + __u64 pli_currec; /**< current record */ struct lu_fid pli_fid; struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */ struct mdd_object *pli_mdd_obj; @@ -473,7 +478,12 @@ static int mdd_path_current(const struct lu_env *env, pli->pli_fids[pli->pli_fidcount] = *tmpfid; } - /* Verify that our path hasn't changed since we started the lookup */ + /* Verify that our path hasn't changed since we started the lookup. + Record the current index, and verify the path resolves to the + same fid. If it does, then the path is correct as of this index. */ + spin_lock(&mdd->mdd_cl.mc_lock); + pli->pli_currec = mdd->mdd_cl.mc_index; + spin_unlock(&mdd->mdd_cl.mc_lock); rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid); if (rc) { CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc); @@ -497,9 +507,15 @@ out: return rc; } +static int mdd_path_historic(const struct lu_env *env, + struct path_lookup_info *pli) +{ + return 0; +} + /* Returns the full path to this fid, as of changelog record recno. */ static int mdd_path(const struct lu_env *env, struct md_object *obj, - char *path, int pathlen, __u64 recno, int *linkno) + char *path, int pathlen, __u64 *recno, int *linkno) { struct path_lookup_info *pli; int tries = 3; @@ -520,7 +536,7 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, RETURN(-ENOMEM); pli->pli_mdd_obj = md2mdd_obj(obj); - pli->pli_recno = recno; + pli->pli_recno = *recno; pli->pli_path = path; pli->pli_pathlen = pathlen; pli->pli_linkno = *linkno; @@ -529,7 +545,6 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, while (tries-- && rc == -EAGAIN) rc = mdd_path_current(env, pli); -#if 0 /* We need old path names only for replication */ /* For historical path lookup, the current links may not have existed * at "recno" time. We must switch over to earlier links/parents * by using the changelog records. If the earlier parent doesn't @@ -538,12 +553,13 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, * We may ignore this problem for the initial implementation and * state that an "original" hardlink must still exist for us to find * historic path name. */ - if (pli->pli_recno != -1) + if (pli->pli_recno != -1) { rc = mdd_path_historic(env, pli); -#endif - - /* return next link index to caller */ - *linkno = pli->pli_linkno; + } else { + *recno = pli->pli_currec; + /* Return next link index to caller */ + *linkno = pli->pli_linkno; + } OBD_FREE_PTR(pli); @@ -1209,7 +1225,8 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, unsigned int qnids[MAXQUOTAS] = { 0, 0 }; unsigned int qoids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1253,20 +1270,17 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, mdd_quota_wrapper(la_copy, qnids); mdd_quota_wrapper(la_tmp, qoids); /* get file quota for new owner */ - lquota_chkquota(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], 1, - &inode_pending, NULL, 0, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, qnids, + inode_pending, 1, NULL, 0, NULL, 0); block_count = (la_tmp->la_blocks + 7) >> 3; if (block_count) { void *data = NULL; mdd_data_get(env, mdd_obj, &data); /* get block quota for new owner */ lquota_chkquota(mds_quota_interface_ref, obd, - qnids[USRQUOTA], - qnids[GRPQUOTA], - block_count, &block_pending, - NULL, LQUOTA_FLAGS_BLK, - data, 1); + qnids, block_pending, + block_count, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } } @@ -1319,14 +1333,10 @@ cleanup: } #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], - inode_pending, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], - block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qnids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qnids, + block_pending, 1); /* Trigger dqrel/dqacq for original owner and new owner. * If failed, the next call for lquota_chkquota will * process it. */ @@ -1565,7 +1575,8 @@ static int mdd_object_create(const struct lu_env *env, struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc = 0; ENTRY; @@ -1575,9 +1586,8 @@ static int mdd_object_create(const struct lu_env *env, quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qids); /* get file quota for child */ - lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA], - qids[GRPQUOTA], 1, &inode_pending, NULL, 0, - NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, qids, + inode_pending, 1, NULL, 0, NULL, 0); switch (ma->ma_attr.la_mode & S_IFMT) { case S_IFLNK: case S_IFDIR: @@ -1589,9 +1599,8 @@ static int mdd_object_create(const struct lu_env *env, } /* get block quota for child */ if (block_count) - lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - block_count, &block_pending, NULL, + lquota_chkquota(mds_quota_interface_ref, obd, qids, + block_pending, block_count, NULL, LQUOTA_FLAGS_BLK, NULL, 0); } #endif @@ -1659,18 +1668,14 @@ unlock: out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - inode_pending, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qids, + block_pending, 1); /* Trigger dqacq on the owner of child. If failed, * the next call for lquota_chkquota will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, - FSFILT_OP_CREATE_PARTIAL_CHILD); + quota_opc); } #endif return rc; @@ -1841,6 +1846,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, struct md_attr *ma) { struct mdd_object *mdd_obj = md2mdd_obj(obj); + struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle; int rc; int reset = 1; @@ -1864,27 +1870,53 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, /* release open count */ mdd_obj->mod_count --; - if (mdd_obj->mod_count == 0) { + if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) { /* remove link to object from orphan index */ - if (mdd_obj->mod_flags & ORPHAN_OBJ) - __mdd_orphan_del(env, mdd_obj, handle); + rc = __mdd_orphan_del(env, mdd_obj, handle); + if (rc == 0) { + CDEBUG(D_HA, "Object "DFID" is deleted from orphan " + "list, OSS objects to be destroyed.\n", + PFID(mdd_object_fid(mdd_obj))); + } else { + CERROR("Object "DFID" can not be deleted from orphan " + "list, maybe cause OST objects can not be " + "destroyed (err: %d).\n", + PFID(mdd_object_fid(mdd_obj)), rc); + /* If object was not deleted from orphan list, do not + * destroy OSS objects, which will be done when next + * recovery. */ + GOTO(out, rc); + } } rc = mdd_iattr_get(env, mdd_obj, ma); - if (rc == 0) { - if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) { - rc = mdd_object_kill(env, mdd_obj, ma); + /* Object maybe not in orphan list originally, it is rare case for + * mdd_finish_unlink() failure. */ + if (rc == 0 && ma->ma_attr.la_nlink == 0) { #ifdef HAVE_QUOTA_SUPPORT - if (mds->mds_quota) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; - mdd_quota_wrapper(&ma->ma_attr, qids); - } + if (mds->mds_quota) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qids); + } #endif - if (rc == 0) - reset = 0; + /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */ + if (ma->ma_valid & MA_FLAGS && + ma->ma_attr_flags & MDS_CLOSE_CLEANUP) { + rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr); + } else { + rc = mdd_object_kill(env, mdd_obj, ma); + if (rc == 0) + reset = 0; } + + if (rc != 0) + CERROR("Error when prepare to delete Object "DFID" , " + "which will cause OST objects can not be " + "destroyed.\n", PFID(mdd_object_fid(mdd_obj))); } + EXIT; +out: if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); @@ -1897,7 +1929,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, quota_opc); #endif - RETURN(rc); + return rc; } /* @@ -1919,15 +1951,14 @@ static int mdd_readpage_sanity_check(const struct lu_env *env, RETURN(rc); } -static int mdd_dir_page_build(const struct lu_env *env, int first, - void *area, int nob, const struct dt_it_ops *iops, - struct dt_it *it, __u64 *start, __u64 *end, - struct lu_dirent **last) +static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, + int first, void *area, int nob, + const struct dt_it_ops *iops, struct dt_it *it, + __u64 *start, __u64 *end, + struct lu_dirent **last, __u32 attr) { - struct lu_fid *fid = &mdd_env_info(env)->mti_fid2; - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_fid_pack *pack = &info->mti_pack; int result; + __u64 hash = 0; struct lu_dirent *ent; if (first) { @@ -1936,56 +1967,62 @@ static int mdd_dir_page_build(const struct lu_env *env, int first, nob -= sizeof (struct lu_dirpage); } - LASSERT(nob > sizeof *ent); - ent = area; - result = 0; do { - char *name; int len; int recsize; - __u64 hash; - name = (char *)iops->key(env, it); len = iops->key_size(env, it); - pack = (struct lu_fid_pack *)iops->rec(env, it); - result = fid_unpack(pack, fid); - if (result != 0) - break; + /* IAM iterator can return record with zero len. */ + if (len == 0) + goto next; - recsize = (sizeof(*ent) + len + 7) & ~7; hash = iops->store(env, it); - *end = hash; + if (unlikely(first)) { + first = 0; + *start = hash; + } - CDEBUG(D_INFO, "%p %p %d "DFID": "LPU64" (%d) \"%*.*s\"\n", - name, ent, nob, PFID(fid), hash, len, len, len, name); + /* calculate max space required for lu_dirent */ + recsize = lu_dirent_calc_size(len, attr); if (nob >= recsize) { - ent->lde_fid = *fid; - fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid); - ent->lde_hash = hash; - ent->lde_namelen = cpu_to_le16(len); - ent->lde_reclen = cpu_to_le16(recsize); - memcpy(ent->lde_name, name, len); - if (first && ent == area) - *start = hash; - *last = ent; - ent = (void *)ent + recsize; - nob -= recsize; - result = iops->next(env, it); + result = iops->rec(env, it, ent, attr); + if (result == -ESTALE) + goto next; + if (result != 0) + goto out; + + /* osd might not able to pack all attributes, + * so recheck rec length */ + recsize = le16_to_cpu(ent->lde_reclen); } else { /* * record doesn't fit into page, enlarge previous one. */ - LASSERT(*last != NULL); - (*last)->lde_reclen = - cpu_to_le16(le16_to_cpu((*last)->lde_reclen) + - nob); - break; + if (*last) { + (*last)->lde_reclen = + cpu_to_le16(le16_to_cpu((*last)->lde_reclen) + + nob); + result = 0; + } else + result = -EINVAL; + + goto out; } + *last = ent; + ent = (void *)ent + recsize; + nob -= recsize; + +next: + result = iops->next(env, it); + if (result == -ESTALE) + goto next; } while (result == 0); +out: + *end = hash; return result; } @@ -1997,6 +2034,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, const struct dt_it_ops *iops; struct page *pg; struct lu_dirent *last = NULL; + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); int i; int rc; int nob; @@ -2019,7 +2057,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, rc = iops->load(env, it, rdpg->rp_hash); - if (rc == 0) + if (rc == 0){ /* * Iterator didn't find record with exactly the key requested. * @@ -2032,7 +2070,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, * state)---position it on the next item. */ rc = iops->next(env, it); - else if (rc > 0) + } else if (rc > 0) rc = 0; /* @@ -2046,11 +2084,14 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, i++, nob -= CFS_PAGE_SIZE) { LASSERT(i < rdpg->rp_npages); pg = rdpg->rp_pages[i]; - rc = mdd_dir_page_build(env, !i, cfs_kmap(pg), + rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg), min_t(int, nob, CFS_PAGE_SIZE), iops, - it, &hash_start, &hash_end, &last); - if (rc != 0 || i == rdpg->rp_npages - 1) - last->lde_reclen = 0; + it, &hash_start, &hash_end, &last, + rdpg->rp_attrs); + if (rc != 0 || i == rdpg->rp_npages - 1) { + if (last) + last->lde_reclen = 0; + } cfs_kunmap(pg); } if (rc > 0) { @@ -2064,13 +2105,14 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, struct lu_dirpage *dp; dp = cfs_kmap(rdpg->rp_pages[0]); - dp->ldp_hash_start = rdpg->rp_hash; - dp->ldp_hash_end = hash_end; + dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash); + dp->ldp_hash_end = cpu_to_le64(hash_end); if (i == 0) /* * No pages were processed, mark this. */ dp->ldp_flags |= LDF_EMPTY; + dp->ldp_flags = cpu_to_le32(dp->ldp_flags); cfs_kunmap(rdpg->rp_pages[0]); } @@ -2080,8 +2122,8 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, return rc; } -static int mdd_readpage(const struct lu_env *env, struct md_object *obj, - const struct lu_rdpg *rdpg) +int mdd_readpage(const struct lu_env *env, struct md_object *obj, + const struct lu_rdpg *rdpg) { struct mdd_object *mdd_obj = md2mdd_obj(obj); int rc; @@ -2112,8 +2154,8 @@ static int mdd_readpage(const struct lu_env *env, struct md_object *obj, pg = rdpg->rp_pages[0]; dp = (struct lu_dirpage*)cfs_kmap(pg); memset(dp, 0 , sizeof(struct lu_dirpage)); - dp->ldp_hash_start = rdpg->rp_hash; - dp->ldp_hash_end = DIR_END_OFF; + dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash); + dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF); dp->ldp_flags |= LDF_EMPTY; dp->ldp_flags = cpu_to_le32(dp->ldp_flags); cfs_kunmap(pg); @@ -2138,6 +2180,24 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj) return next->do_ops->do_object_sync(env, next); } +static dt_obj_version_t mdd_version_get(const struct lu_env *env, + struct md_object *obj) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + LASSERT(mdd_object_exists(mdd_obj)); + return do_version_get(env, mdd_object_child(mdd_obj)); +} + +static void mdd_version_set(const struct lu_env *env, struct md_object *obj, + dt_obj_version_t version) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + LASSERT(mdd_object_exists(mdd_obj)); + return do_version_set(env, mdd_object_child(mdd_obj), version); +} + const struct md_object_operations mdd_obj_ops = { .moo_permission = mdd_permission, .moo_attr_get = mdd_attr_get, @@ -2155,5 +2215,7 @@ const struct md_object_operations mdd_obj_ops = { .moo_readlink = mdd_readlink, .moo_capa_get = mdd_capa_get, .moo_object_sync = mdd_object_sync, + .moo_version_get = mdd_version_get, + .moo_version_set = mdd_version_set, .moo_path = mdd_path, };