From: Mikhail Pershin Date: Wed, 23 May 2012 09:42:12 +0000 (+0400) Subject: LU-1406 ofd: add object methods X-Git-Tag: 2.2.55~14 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=a42560217562c7b00e5b680b347687f894939601 LU-1406 ofd: add object methods OFD objects methods except IO Signed-off-by: Mikhail Pershin Change-Id: I17d1e9fd462f8022b516f16df71e40be0879efea Reviewed-on: http://review.whamcloud.com/2887 Reviewed-by: Alex Zhuravlev Tested-by: Hudson Tested-by: Maloo Reviewed-by: Andreas Dilger --- diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index b2e9021..d05b002 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -1264,4 +1264,7 @@ static inline int dt_lookup(const struct lu_env *env, ret = -ENOENT; return ret; } + +#define LU221_BAD_TIME (0x80000000U + 24 * 3600) + #endif /* __LUSTRE_DT_OBJECT_H */ diff --git a/lustre/ofd/Makefile.in b/lustre/ofd/Makefile.in index 7d72020..aa73c81 100644 --- a/lustre/ofd/Makefile.in +++ b/lustre/ofd/Makefile.in @@ -1,6 +1,6 @@ MODULES := ofd -ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o ofd_trans.o +ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o ofd_trans.o ofd_objects.o ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o EXTRA_DIST = $(ofd-objs:%.o=%.c) ofd_internal.h diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h index 55aeb4f..c41f992 100644 --- a/lustre/ofd/ofd_internal.h +++ b/lustre/ofd/ofd_internal.h @@ -41,6 +41,7 @@ #include #include #include +#include #include #define OFD_INIT_OBJID 0 @@ -161,8 +162,9 @@ static inline char *ofd_name(struct ofd_device *ofd) } struct ofd_object { - struct lu_object_header ofo_header; + struct lu_object_header ofo_header; struct dt_object ofo_obj; + int ofo_ff_exists; }; static inline struct ofd_object *ofd_obj(struct lu_object *o) @@ -190,6 +192,50 @@ static inline struct dt_object *ofd_object_child(struct ofd_object *_obj) return container_of0(lu_object_next(lu), struct dt_object, do_lu); } +static inline struct ofd_device *ofd_obj2dev(const struct ofd_object *fo) +{ + return ofd_dev(fo->ofo_obj.do_lu.lo_dev); +} + +static inline struct lustre_capa *ofd_object_capa(const struct lu_env *env, + const struct ofd_object *obj) +{ + /* TODO: see mdd_object_capa() */ + return BYPASS_CAPA; +} + +static inline void ofd_read_lock(const struct lu_env *env, + struct ofd_object *fo) +{ + struct dt_object *next = ofd_object_child(fo); + + next->do_ops->do_read_lock(env, next, 0); +} + +static inline void ofd_read_unlock(const struct lu_env *env, + struct ofd_object *fo) +{ + struct dt_object *next = ofd_object_child(fo); + + next->do_ops->do_read_unlock(env, next); +} + +static inline void ofd_write_lock(const struct lu_env *env, + struct ofd_object *fo) +{ + struct dt_object *next = ofd_object_child(fo); + + next->do_ops->do_write_lock(env, next, 0); +} + +static inline void ofd_write_unlock(const struct lu_env *env, + struct ofd_object *fo) +{ + struct dt_object *next = ofd_object_child(fo); + + next->do_ops->do_write_unlock(env, next); +} + /* * Common data shared by obdofd-level handlers. This is allocated per-thread * to reduce stack consumption. @@ -206,12 +252,17 @@ struct ofd_thread_info { struct lu_fid fti_fid; struct lu_attr fti_attr; + struct lu_attr fti_attr2; + struct filter_fid fti_mds_fid2; + struct ost_id fti_ostid; struct ofd_object *fti_obj; union { char name[64]; /* for ofd_init0() */ struct obd_statfs osfs; /* for obdofd_statfs() */ } fti_u; + /* Ops object filename */ + struct lu_name fti_name; struct dt_object_format fti_dof; struct lu_buf fti_buf; loff_t fti_off; @@ -240,6 +291,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, /* ofd_fs.c */ obd_id ofd_last_id(struct ofd_device *ofd, obd_seq seq); +void ofd_last_id_set(struct ofd_device *ofd, obd_id id, obd_seq seq); int ofd_group_load(const struct lu_env *env, struct ofd_device *ofd, int); int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd, struct obd_device *obd); @@ -261,6 +313,30 @@ void lprocfs_ofd_init_vars(struct lprocfs_static_vars *lvars); int lproc_ofd_attach_seqstat(struct obd_device *dev); extern struct file_operations ofd_per_nid_stats_fops; +/* ofd_objects.c */ +struct ofd_object *ofd_object_find(const struct lu_env *env, + struct ofd_device *ofd, + const struct lu_fid *fid); +struct ofd_object *ofd_object_find_or_create(const struct lu_env *env, + struct ofd_device *ofd, + const struct lu_fid *fid, + struct lu_attr *attr); +int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo); +int ofd_precreate_object(const struct lu_env *env, struct ofd_device *ofd, + obd_id id, obd_seq seq); + +void ofd_object_put(const struct lu_env *env, struct ofd_object *fo); +int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, + struct lu_attr *la, struct filter_fid *ff); +int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, + __u64 start, __u64 end, struct lu_attr *la, + struct filter_fid *ff); +int ofd_object_destroy(const struct lu_env *, struct ofd_object *, int); +int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo, + struct lu_attr *la); +int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, + struct lu_attr *la, int is_setattr); + /* ofd_grants.c */ #define OFD_GRANT_RATIO_SHIFT 8 static inline __u64 ofd_grant_reserved(struct ofd_device *ofd, obd_size bavail) @@ -351,9 +427,13 @@ static inline struct ofd_thread_info * ofd_info_init(const struct lu_env *env, LASSERT(info); LASSERT(info->fti_exp == NULL); LASSERT(info->fti_env == NULL); + LASSERT(info->fti_attr.la_valid == 0); info->fti_env = env; info->fti_exp = exp; + info->fti_pre_version = 0; + info->fti_transno = 0; + info->fti_has_trans = 0; return info; } @@ -368,6 +448,22 @@ static inline void ofd_slc_set(struct ofd_device *ofd) ofd->ofd_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL; } +static inline void ofd_prepare_fidea(struct filter_fid *ff, struct obdo *oa) +{ + if (!(oa->o_valid & OBD_MD_FLGROUP)) + oa->o_seq = 0; + /* packing fid and converting it to LE for storing into EA. + * Here ->o_stripe_idx should be filled by LOV and rest of + * fields - by client. */ + ff->ff_parent.f_seq = cpu_to_le64(oa->o_parent_seq); + ff->ff_parent.f_oid = cpu_to_le32(oa->o_parent_oid); + /* XXX: we are ignoring o_parent_ver here, since this should + * be the same for all objects in this fileset. */ + ff->ff_parent.f_ver = cpu_to_le32(oa->o_stripe_idx); + ff->ff_objid = cpu_to_le64(oa->o_id); + ff->ff_seq = cpu_to_le64(oa->o_seq); +} + /* niobuf_local has no rnb_ prefix in master */ #define rnb_offset offset #define rnb_flags flags diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c new file mode 100644 index 0000000..cf349ca --- /dev/null +++ b/lustre/ofd/ofd_objects.c @@ -0,0 +1,533 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/ofd/ofd_objects.c + * + * Author: Alex Zhuravlev + * Author: Mikhail Pershin + */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include + +#include "ofd_internal.h" + +int ofd_version_get_check(struct ofd_thread_info *info, + struct ofd_object *fo) +{ + dt_obj_version_t curr_version; + + LASSERT(ofd_object_exists(fo)); + LASSERT(info->fti_exp); + + curr_version = dt_version_get(info->fti_env, ofd_object_child(fo)); + if ((__s64)curr_version == -EOPNOTSUPP) + RETURN(0); + /* VBR: version is checked always because costs nothing */ + if (info->fti_pre_version != 0 && + info->fti_pre_version != curr_version) { + CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n", + info->fti_pre_version, curr_version); + cfs_spin_lock(&info->fti_exp->exp_lock); + info->fti_exp->exp_vbr_failed = 1; + cfs_spin_unlock(&info->fti_exp->exp_lock); + RETURN (-EOVERFLOW); + } + info->fti_pre_version = curr_version; + RETURN(0); +} + +struct ofd_object *ofd_object_find(const struct lu_env *env, + struct ofd_device *ofd, + const struct lu_fid *fid) +{ + struct ofd_object *fo; + struct lu_object *o; + + ENTRY; + + o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL); + if (likely(!IS_ERR(o))) + fo = ofd_obj(o); + else + fo = (struct ofd_object *)o; /* return error */ + RETURN(fo); +} + +struct ofd_object *ofd_object_find_or_create(const struct lu_env *env, + struct ofd_device *ofd, + const struct lu_fid *fid, + struct lu_attr *attr) +{ + struct ofd_thread_info *info = ofd_info(env); + struct lu_object *fo_obj; + struct dt_object *dto; + + ENTRY; + + info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG); + + dto = dt_find_or_create(env, ofd->ofd_osd, fid, &info->fti_dof, attr); + if (IS_ERR(dto)) + RETURN((struct ofd_object *)dto); + + fo_obj = lu_object_locate(dto->do_lu.lo_header, + ofd->ofd_dt_dev.dd_lu_dev.ld_type); + RETURN(ofd_obj(fo_obj)); +} + +int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo) +{ + struct ofd_thread_info *info = ofd_info(env); + int rc = 0; + + ENTRY; + + if (!fo->ofo_ff_exists) { + /* + * This actually means that we don't know whether the object + * has the "fid" EA or not. + */ + info->fti_buf.lb_buf = &info->fti_mds_fid2; + info->fti_buf.lb_len = sizeof(info->fti_mds_fid2); + rc = dt_xattr_get(env, ofd_object_child(fo), &info->fti_buf, + XATTR_NAME_FID, BYPASS_CAPA); + if (rc >= 0 || rc == -ENODATA) { + /* + * Here we assume that, if the object doesn't have the + * "fid" EA, the caller will add one, unless a fatal + * error (e.g., a memory or disk failure) prevents it + * from doing so. + */ + fo->ofo_ff_exists = 1; + } + if (rc > 0) + rc = 0; + } + RETURN(rc); +} + +void ofd_object_put(const struct lu_env *env, struct ofd_object *fo) +{ + lu_object_put(env, &fo->ofo_obj.do_lu); +} + +int ofd_precreate_object(const struct lu_env *env, struct ofd_device *ofd, + obd_id id, obd_seq group) +{ + struct ofd_thread_info *info = ofd_info(env); + struct ofd_object *fo; + struct dt_object *next; + struct thandle *th; + obd_id tmp; + int rc; + + ENTRY; + + /* Don't create objects beyond the valid range for this SEQ */ + if (unlikely(fid_seq_is_mdt0(group) && id >= IDIF_MAX_OID)) { + CERROR("%s:"POSTID" hit the IDIF_MAX_OID (1<<48)!\n", + ofd_name(ofd), id, group); + RETURN(rc = -ENOSPC); + } else if (unlikely(!fid_seq_is_mdt0(group) && id >= OBIF_MAX_OID)) { + CERROR("%s:"POSTID" hit the OBIF_MAX_OID (1<<32)!\n", + ofd_name(ofd), id, group); + RETURN(rc = -ENOSPC); + } + info->fti_ostid.oi_id = id; + info->fti_ostid.oi_seq = group; + fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + + info->fti_attr.la_valid = LA_TYPE | LA_MODE; + /* + * We mark object SUID+SGID to flag it for accepting UID+GID from + * client on first write. Currently the permission bits on the OST are + * never used, so this is OK. + */ + info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | 0666; + info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG); + + /* Initialize a/c/m time so any client timestamp will always + * be newer and update the inode. ctime = 0 is also handled + * specially in osd_inode_setattr(). See LU-221, LU-1042 */ + info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME; + info->fti_attr.la_atime = 0; + info->fti_attr.la_mtime = 0; + info->fti_attr.la_ctime = 0; + + next = ofd_object_child(fo); + LASSERT(next != NULL); + + info->fti_buf.lb_buf = &tmp; + info->fti_buf.lb_len = sizeof(tmp); + info->fti_off = 0; + + ofd_write_lock(env, fo); + th = ofd_trans_create(env, ofd); + if (IS_ERR(th)) + GOTO(out_unlock, rc = PTR_ERR(th)); + + rc = dt_declare_record_write(env, ofd->ofd_lastid_obj[group], + sizeof(tmp), info->fti_off, th); + if (rc) + GOTO(trans_stop, rc); + + if (unlikely(ofd_object_exists(fo))) { + /* object may exist being re-created by write replay */ + CDEBUG(D_INODE, "object %u/"LPD64" exists: "DFID"\n", + (unsigned) group, id, PFID(&info->fti_fid)); + rc = dt_trans_start_local(env, ofd->ofd_osd, th); + if (rc) + GOTO(trans_stop, rc); + GOTO(last_id_write, rc); + } + rc = dt_declare_create(env, next, &info->fti_attr, NULL, + &info->fti_dof, th); + if (rc) + GOTO(trans_stop, rc); + + rc = dt_trans_start_local(env, ofd->ofd_osd, th); + if (rc) + GOTO(trans_stop, rc); + + CDEBUG(D_OTHER, "create new object %lu:%llu\n", + (unsigned long) info->fti_fid.f_oid, info->fti_fid.f_seq); + + rc = dt_create(env, next, &info->fti_attr, NULL, &info->fti_dof, th); + if (rc) + GOTO(trans_stop, rc); + LASSERT(ofd_object_exists(fo)); + +last_id_write: + ofd_last_id_set(ofd, id, group); + + tmp = cpu_to_le64(ofd_last_id(ofd, group)); + rc = dt_record_write(env, ofd->ofd_lastid_obj[group], &info->fti_buf, + &info->fti_off, th); +trans_stop: + ofd_trans_stop(env, ofd, th, rc); +out_unlock: + ofd_write_unlock(env, fo); + ofd_object_put(env, fo); + RETURN(rc); +} + +/* + * If the object still has SUID+SGID bits set (see ofd_precreate_object()) then + * we will accept the UID+GID if sent by the client for initializing the + * ownership of this object. We only allow this to happen once (so clear these + * bits) and later only allow setattr. + */ +int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, + struct lu_attr *la, int is_setattr) +{ + struct ofd_thread_info *info = ofd_info(env); + struct lu_attr *ln = &info->fti_attr2; + __u32 mask = 0; + int rc; + + ENTRY; + + if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID)) + RETURN(0); + + rc = dt_attr_get(env, ofd_object_child(fo), ln, BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + + LASSERT(ln->la_valid & LA_MODE); + + if (!is_setattr) { + if (!(ln->la_mode & S_ISUID)) + la->la_valid &= ~LA_UID; + if (!(ln->la_mode & S_ISGID)) + la->la_valid &= ~LA_GID; + } + + if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID)) + mask |= S_ISUID; + if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID)) + mask |= S_ISGID; + if (mask != 0) { + if (!(la->la_valid & LA_MODE) || !is_setattr) { + la->la_mode = ln->la_mode; + la->la_valid |= LA_MODE; + } + la->la_mode &= ~mask; + } + + RETURN(0); +} + +int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, + struct lu_attr *la, struct filter_fid *ff) +{ + struct ofd_thread_info *info = ofd_info(env); + struct ofd_device *ofd = ofd_obj2dev(fo); + struct thandle *th; + struct ofd_mod_data *fmd; + int ff_needed = 0; + int rc; + ENTRY; + + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) { + fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid); + if (fmd && fmd->fmd_mactime_xid < info->fti_xid) + fmd->fmd_mactime_xid = info->fti_xid; + ofd_fmd_put(info->fti_exp, fmd); + } + + /* VBR: version recovery check */ + rc = ofd_version_get_check(info, fo); + if (rc) + GOTO(unlock, rc); + + rc = ofd_attr_handle_ugid(env, fo, la, 1 /* is_setattr */); + if (rc != 0) + GOTO(unlock, rc); + + if (ff != NULL) { + rc = ofd_object_ff_check(env, fo); + if (rc == -ENODATA) + ff_needed = 1; + else if (rc < 0) + GOTO(unlock, rc); + } + + th = ofd_trans_create(env, ofd); + if (IS_ERR(th)) + GOTO(unlock, rc = PTR_ERR(th)); + + rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th); + if (rc) + GOTO(stop, rc); + + if (ff_needed) { + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); + rc = dt_declare_xattr_set(env, ofd_object_child(fo), + &info->fti_buf, XATTR_NAME_FID, 0, + th); + if (rc) + GOTO(stop, rc); + } + + rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th); + if (rc) + GOTO(stop, rc); + + rc = dt_attr_set(env, ofd_object_child(fo), la, th, + ofd_object_capa(env, fo)); + if (rc) + GOTO(stop, rc); + + if (ff_needed) + rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, + XATTR_NAME_FID, 0, th, BYPASS_CAPA); + +stop: + ofd_trans_stop(env, ofd, th, rc); +unlock: + ofd_write_unlock(env, fo); + RETURN(rc); +} + +int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, + __u64 start, __u64 end, struct lu_attr *la, + struct filter_fid *ff) +{ + struct ofd_thread_info *info = ofd_info(env); + struct ofd_device *ofd = ofd_obj2dev(fo); + struct ofd_mod_data *fmd; + struct dt_object *dob = ofd_object_child(fo); + struct thandle *th; + int ff_needed = 0; + int rc; + + ENTRY; + + /* we support truncate, not punch yet */ + LASSERT(end == OBD_OBJECT_EOF); + + fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid); + if (fmd && fmd->fmd_mactime_xid < info->fti_xid) + fmd->fmd_mactime_xid = info->fti_xid; + ofd_fmd_put(info->fti_exp, fmd); + + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + /* VBR: version recovery check */ + rc = ofd_version_get_check(info, fo); + if (rc) + GOTO(unlock, rc); + + rc = ofd_attr_handle_ugid(env, fo, la, 0 /* !is_setattr */); + if (rc != 0) + GOTO(unlock, rc); + + if (ff != NULL) { + rc = ofd_object_ff_check(env, fo); + if (rc == -ENODATA) + ff_needed = 1; + else if (rc < 0) + GOTO(unlock, rc); + } + + th = ofd_trans_create(env, ofd); + if (IS_ERR(th)) + GOTO(unlock, rc = PTR_ERR(th)); + + rc = dt_declare_attr_set(env, dob, la, th); + if (rc) + GOTO(stop, rc); + + rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th); + if (rc) + GOTO(stop, rc); + + if (ff_needed) { + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); + rc = dt_declare_xattr_set(env, ofd_object_child(fo), + &info->fti_buf, XATTR_NAME_FID, 0, + th); + if (rc) + GOTO(stop, rc); + } + + rc = ofd_trans_start(env, ofd, fo, th); + if (rc) + GOTO(stop, rc); + + rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th, + ofd_object_capa(env, fo)); + if (rc) + GOTO(stop, rc); + + rc = dt_attr_set(env, dob, la, th, ofd_object_capa(env, fo)); + if (rc) + GOTO(stop, rc); + + if (ff_needed) + rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, + XATTR_NAME_FID, 0, th, BYPASS_CAPA); + +stop: + ofd_trans_stop(env, ofd, th, rc); +unlock: + ofd_write_unlock(env, fo); + RETURN(rc); +} + +int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, + int orphan) +{ + struct ofd_device *ofd = ofd_obj2dev(fo); + struct thandle *th; + int rc = 0; + + ENTRY; + + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + th = ofd_trans_create(env, ofd); + if (IS_ERR(th)) + GOTO(unlock, rc = PTR_ERR(th)); + + dt_declare_ref_del(env, ofd_object_child(fo), th); + dt_declare_destroy(env, ofd_object_child(fo), th); + if (orphan) + rc = dt_trans_start_local(env, ofd->ofd_osd, th); + else + rc = ofd_trans_start(env, ofd, NULL, th); + if (rc) + GOTO(stop, rc); + + ofd_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid); + + dt_ref_del(env, ofd_object_child(fo), th); + dt_destroy(env, ofd_object_child(fo), th); +stop: + ofd_trans_stop(env, ofd, th, rc); +unlock: + ofd_write_unlock(env, fo); + RETURN(rc); +} + +int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo, + struct lu_attr *la) +{ + int rc = 0; + + ENTRY; + + if (ofd_object_exists(fo)) { + rc = dt_attr_get(env, ofd_object_child(fo), la, + ofd_object_capa(env, fo)); + +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0) + /* Try to correct for a bug in 2.1.0 (LU-221) that caused + * negative timestamps to appear to be in the far future, + * due old timestamp being stored on disk as an unsigned value. + * This fixes up any bad values stored on disk before + * returning them to the client, and ensures any timestamp + * updates are correct. LU-1042 */ + if (unlikely(la->la_atime == LU221_BAD_TIME)) + la->la_atime = 0; + if (unlikely(la->la_mtime == LU221_BAD_TIME)) + la->la_mtime = 0; + if (unlikely(la->la_ctime == LU221_BAD_TIME)) + la->la_ctime = 0; +#else +#warning "remove old LU-221/LU-1042 workaround code" +#endif + } else { + rc = -ENOENT; + } + RETURN(rc); +}