X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_object.c;h=224aff7b93d52fe03812a39fb85f73b9357b8c6f;hb=f2bedfe6e389eccf931f339cd8efe93f72ed530c;hp=a2ce9914a3a5dd7543f9317bff46baf21bf62580;hpb=d6b08e30909b3b02ffaf6a7dffabd48642b6f37f;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index a2ce991..224aff7 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -265,7 +265,7 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env, } static int mdd_object_init(const struct lu_env *env, struct lu_object *o, - const struct lu_object_conf *_) + const struct lu_object_conf *unused) { struct mdd_device *d = lu2mdd_dev(o->lo_dev); struct mdd_object *mdd_obj = lu2mdd_obj(o); @@ -304,7 +304,11 @@ static void mdd_object_free(const struct lu_env *env, struct lu_object *o) static int mdd_object_print(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o) { - return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o); + struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o); + return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, " + "valid=%x, cltime=%llu, flags=%lx)", + mdd, mdd->mod_count, mdd->mod_valid, + mdd->mod_cltime, mdd->mod_flags); } static const struct lu_object_operations mdd_lu_obj_ops = { @@ -385,6 +389,7 @@ out: /** mdd_path() lookup structure. */ struct path_lookup_info { __u64 pli_recno; /**< history point */ + __u64 pli_currec; /**< current record */ struct lu_fid pli_fid; struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */ struct mdd_object *pli_mdd_obj; @@ -473,7 +478,12 @@ static int mdd_path_current(const struct lu_env *env, pli->pli_fids[pli->pli_fidcount] = *tmpfid; } - /* Verify that our path hasn't changed since we started the lookup */ + /* Verify that our path hasn't changed since we started the lookup. + Record the current index, and verify the path resolves to the + same fid. If it does, then the path is correct as of this index. */ + spin_lock(&mdd->mdd_cl.mc_lock); + pli->pli_currec = mdd->mdd_cl.mc_index; + spin_unlock(&mdd->mdd_cl.mc_lock); rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid); if (rc) { CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc); @@ -497,9 +507,15 @@ out: return rc; } +static int mdd_path_historic(const struct lu_env *env, + struct path_lookup_info *pli) +{ + return 0; +} + /* Returns the full path to this fid, as of changelog record recno. */ static int mdd_path(const struct lu_env *env, struct md_object *obj, - char *path, int pathlen, __u64 recno, int *linkno) + char *path, int pathlen, __u64 *recno, int *linkno) { struct path_lookup_info *pli; int tries = 3; @@ -520,7 +536,7 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, RETURN(-ENOMEM); pli->pli_mdd_obj = md2mdd_obj(obj); - pli->pli_recno = recno; + pli->pli_recno = *recno; pli->pli_path = path; pli->pli_pathlen = pathlen; pli->pli_linkno = *linkno; @@ -529,7 +545,6 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, while (tries-- && rc == -EAGAIN) rc = mdd_path_current(env, pli); -#if 0 /* We need old path names only for replication */ /* For historical path lookup, the current links may not have existed * at "recno" time. We must switch over to earlier links/parents * by using the changelog records. If the earlier parent doesn't @@ -538,12 +553,13 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj, * We may ignore this problem for the initial implementation and * state that an "original" hardlink must still exist for us to find * historic path name. */ - if (pli->pli_recno != -1) + if (pli->pli_recno != -1) { rc = mdd_path_historic(env, pli); -#endif - - /* return next link index to caller */ - *linkno = pli->pli_linkno; + } else { + *recno = pli->pli_currec; + /* Return next link index to caller */ + *linkno = pli->pli_linkno; + } OBD_FREE_PTR(pli); @@ -1209,7 +1225,8 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, unsigned int qnids[MAXQUOTAS] = { 0, 0 }; unsigned int qoids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1253,20 +1270,17 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, mdd_quota_wrapper(la_copy, qnids); mdd_quota_wrapper(la_tmp, qoids); /* get file quota for new owner */ - lquota_chkquota(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], 1, - &inode_pending, NULL, 0, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, qnids, + inode_pending, 1, NULL, 0, NULL, 0); block_count = (la_tmp->la_blocks + 7) >> 3; if (block_count) { void *data = NULL; mdd_data_get(env, mdd_obj, &data); /* get block quota for new owner */ lquota_chkquota(mds_quota_interface_ref, obd, - qnids[USRQUOTA], - qnids[GRPQUOTA], - block_count, &block_pending, - NULL, LQUOTA_FLAGS_BLK, - data, 1); + qnids, block_pending, + block_count, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } } @@ -1319,14 +1333,10 @@ cleanup: } #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], - inode_pending, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qnids[USRQUOTA], qnids[GRPQUOTA], - block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qnids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qnids, + block_pending, 1); /* Trigger dqrel/dqacq for original owner and new owner. * If failed, the next call for lquota_chkquota will * process it. */ @@ -1565,7 +1575,8 @@ static int mdd_object_create(const struct lu_env *env, struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc = 0; ENTRY; @@ -1575,9 +1586,8 @@ static int mdd_object_create(const struct lu_env *env, quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qids); /* get file quota for child */ - lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA], - qids[GRPQUOTA], 1, &inode_pending, NULL, 0, - NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, qids, + inode_pending, 1, NULL, 0, NULL, 0); switch (ma->ma_attr.la_mode & S_IFMT) { case S_IFLNK: case S_IFDIR: @@ -1589,9 +1599,8 @@ static int mdd_object_create(const struct lu_env *env, } /* get block quota for child */ if (block_count) - lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - block_count, &block_pending, NULL, + lquota_chkquota(mds_quota_interface_ref, obd, qids, + block_pending, block_count, NULL, LQUOTA_FLAGS_BLK, NULL, 0); } #endif @@ -1659,18 +1668,14 @@ unlock: out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - inode_pending, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qids, + block_pending, 1); /* Trigger dqacq on the owner of child. If failed, * the next call for lquota_chkquota will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, - FSFILT_OP_CREATE_PARTIAL_CHILD); + quota_opc); } #endif return rc; @@ -1841,6 +1846,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, struct md_attr *ma) { struct mdd_object *mdd_obj = md2mdd_obj(obj); + struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle; int rc; int reset = 1; @@ -1864,27 +1870,53 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, /* release open count */ mdd_obj->mod_count --; - if (mdd_obj->mod_count == 0) { + if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) { /* remove link to object from orphan index */ - if (mdd_obj->mod_flags & ORPHAN_OBJ) - __mdd_orphan_del(env, mdd_obj, handle); + rc = __mdd_orphan_del(env, mdd_obj, handle); + if (rc == 0) { + CDEBUG(D_HA, "Object "DFID" is deleted from orphan " + "list, OSS objects to be destroyed.\n", + PFID(mdd_object_fid(mdd_obj))); + } else { + CERROR("Object "DFID" can not be deleted from orphan " + "list, maybe cause OST objects can not be " + "destroyed (err: %d).\n", + PFID(mdd_object_fid(mdd_obj)), rc); + /* If object was not deleted from orphan list, do not + * destroy OSS objects, which will be done when next + * recovery. */ + GOTO(out, rc); + } } rc = mdd_iattr_get(env, mdd_obj, ma); - if (rc == 0) { - if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) { - rc = mdd_object_kill(env, mdd_obj, ma); + /* Object maybe not in orphan list originally, it is rare case for + * mdd_finish_unlink() failure. */ + if (rc == 0 && ma->ma_attr.la_nlink == 0) { #ifdef HAVE_QUOTA_SUPPORT - if (mds->mds_quota) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; - mdd_quota_wrapper(&ma->ma_attr, qids); - } + if (mds->mds_quota) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qids); + } #endif - if (rc == 0) - reset = 0; + /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */ + if (ma->ma_valid & MA_FLAGS && + ma->ma_attr_flags & MDS_CLOSE_CLEANUP) { + rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr); + } else { + rc = mdd_object_kill(env, mdd_obj, ma); + if (rc == 0) + reset = 0; } + + if (rc != 0) + CERROR("Error when prepare to delete Object "DFID" , " + "which will cause OST objects can not be " + "destroyed.\n", PFID(mdd_object_fid(mdd_obj))); } + EXIT; +out: if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); @@ -1897,7 +1929,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, quota_opc); #endif - RETURN(rc); + return rc; } /* @@ -1919,71 +1951,6 @@ static int mdd_readpage_sanity_check(const struct lu_env *env, RETURN(rc); } -static int mdd_append_attrs(const struct lu_env *env, - struct mdd_device *mdd, - __u32 attr, - const struct dt_it_ops *iops, - struct dt_it *it, - struct lu_dirent*ent) -{ - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_fid *fid = &info->mti_fid2; - int len = cpu_to_le16(ent->lde_namelen); - const unsigned align = sizeof(struct luda_type) - 1; - struct lu_fid_pack *pack; - struct mdd_object *obj; - struct luda_type *lt; - int rc = 0; - - if (attr & LUDA_FID) { - pack = (struct lu_fid_pack *)iops->rec(env, it); - if (IS_ERR(pack)) { - rc = PTR_ERR(pack); - ent->lde_attrs = 0; - goto out; - } - rc = fid_unpack(pack, fid); - if (rc != 0) { - ent->lde_attrs = 0; - goto out; - } - - fid_cpu_to_le(&ent->lde_fid, fid); - ent->lde_attrs = LUDA_FID; - } - - /* check if file type is required */ - if (attr & LUDA_TYPE) { - if (!(attr & LUDA_FID)) { - CERROR("wrong attr : [%x]\n",attr); - rc = -EINVAL; - goto out; - } - - obj = mdd_object_find(env, mdd, fid); - if (obj == NULL) /* remote object */ - goto out; - - if (IS_ERR(obj)) { - rc = PTR_ERR(obj); - goto out; - } - - if (mdd_object_exists(obj) == +1) { - len = (len + align) & ~align; - - lt = (void *) ent->lde_name + len; - lt->lt_type = cpu_to_le16(mdd_object_type(obj)); - - ent->lde_attrs |= LUDA_TYPE; - } - mdd_object_put(env, obj); - } -out: - ent->lde_attrs = cpu_to_le32(ent->lde_attrs); - return rc; -} - static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, int first, void *area, int nob, const struct dt_it_ops *iops, struct dt_it *it, @@ -1991,8 +1958,8 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, struct lu_dirent **last, __u32 attr) { int result; + __u64 hash = 0; struct lu_dirent *ent; - __u64 hash = 0; if (first) { memset(area, 0, sizeof (struct lu_dirpage)); @@ -2002,7 +1969,6 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, ent = area; do { - char *name; int len; int recsize; @@ -2012,28 +1978,25 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, if (len == 0) goto next; - name = (char *)iops->key(env, it); hash = iops->store(env, it); - if (unlikely(first)) { first = 0; *start = hash; } + /* calculate max space required for lu_dirent */ recsize = lu_dirent_calc_size(len, attr); - CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n", - name, ent, nob, hash, len, len, len, name); - if (nob >= recsize) { - ent->lde_hash = cpu_to_le64(hash); - ent->lde_namelen = cpu_to_le16(len); - ent->lde_reclen = cpu_to_le16(recsize); - memcpy(ent->lde_name, name, len); - - result = mdd_append_attrs(env, mdd, attr, iops, it, ent); + result = iops->rec(env, it, ent, attr); + if (result == -ESTALE) + goto next; if (result != 0) goto out; + + /* osd might not able to pack all attributes, + * so recheck rec length */ + recsize = le16_to_cpu(ent->lde_reclen); } else { /* * record doesn't fit into page, enlarge previous one. @@ -2054,6 +2017,8 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, next: result = iops->next(env, it); + if (result == -ESTALE) + goto next; } while (result == 0); out: @@ -2092,7 +2057,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, rc = iops->load(env, it, rdpg->rp_hash); - if (rc == 0) + if (rc == 0){ /* * Iterator didn't find record with exactly the key requested. * @@ -2105,7 +2070,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, * state)---position it on the next item. */ rc = iops->next(env, it); - else if (rc > 0) + } else if (rc > 0) rc = 0; /* @@ -2215,6 +2180,24 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj) return next->do_ops->do_object_sync(env, next); } +static dt_obj_version_t mdd_version_get(const struct lu_env *env, + struct md_object *obj) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + LASSERT(mdd_object_exists(mdd_obj)); + return do_version_get(env, mdd_object_child(mdd_obj)); +} + +static void mdd_version_set(const struct lu_env *env, struct md_object *obj, + dt_obj_version_t version) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + LASSERT(mdd_object_exists(mdd_obj)); + return do_version_set(env, mdd_object_child(mdd_obj), version); +} + const struct md_object_operations mdd_obj_ops = { .moo_permission = mdd_permission, .moo_attr_get = mdd_attr_get, @@ -2232,5 +2215,7 @@ const struct md_object_operations mdd_obj_ops = { .moo_readlink = mdd_readlink, .moo_capa_get = mdd_capa_get, .moo_object_sync = mdd_object_sync, + .moo_version_get = mdd_version_get, + .moo_version_set = mdd_version_set, .moo_path = mdd_path, };