X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fofd%2Fofd_objects.c;h=7605de455321653ac73492592859e0f6d5cf4084;hb=b724079edc5b66e1046b5760a6bad3045e9a9260;hp=b6fcd4a81c6cab08826bf484e98d246c9129b816;hpb=382afc18c20823445d2155a77808c502919faf6a;p=fs%2Flustre-release.git diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index b6fcd4a..7605de4 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -23,7 +23,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2014 Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,7 +41,6 @@ #define DEBUG_SUBSYSTEM S_FILTER #include -#include #include #include "ofd_internal.h" @@ -60,8 +59,8 @@ * \retval 0 if version matches * \retval -EOVERFLOW on version mismatch */ -int ofd_version_get_check(struct ofd_thread_info *info, - struct ofd_object *fo) +static int ofd_version_get_check(struct ofd_thread_info *info, + struct ofd_object *fo) { dt_obj_version_t curr_version; @@ -76,7 +75,7 @@ int ofd_version_get_check(struct ofd_thread_info *info, /* VBR: version is checked always because costs nothing */ if (info->fti_pre_version != 0 && info->fti_pre_version != curr_version) { - CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n", + CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n", info->fti_pre_version, curr_version); spin_lock(&info->fti_exp->exp_lock); info->fti_exp->exp_vbr_failed = 1; @@ -121,7 +120,7 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, * Get FID of parent MDT object. * * This function reads extended attribute XATTR_NAME_FID of OFD object which - * contains the MDT parent object FID and saves it in ofd_object::ofo_pfid. + * contains the MDT parent object FID and saves it in ofd_object::ofo_ff. * * The filter_fid::ff_parent::f_ver field currently holds * the OST-object index in the parent MDT-object's layout EA, @@ -137,47 +136,31 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, */ int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo) { - struct ofd_thread_info *info = ofd_info(env); - struct filter_fid_old *ff = &info->fti_mds_fid_old; - struct lu_buf *buf = &info->fti_buf; - struct lu_fid *pfid = &fo->ofo_pfid; - int rc = 0; + struct ofd_thread_info *info = ofd_info(env); + struct filter_fid *ff = &fo->ofo_ff; + struct lu_buf *buf = &info->fti_buf; + int rc = 0; - if (fid_is_sane(pfid)) + if (fid_is_sane(&ff->ff_parent)) return 0; buf->lb_buf = ff; buf->lb_len = sizeof(*ff); - rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID, - BYPASS_CAPA); + rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID); if (rc < 0) return rc; - if (rc < sizeof(struct lu_fid)) { - fid_zero(pfid); - - return -ENODATA; + if (unlikely(rc < sizeof(struct lu_fid))) { + fid_zero(&ff->ff_parent); + return -EINVAL; } - pfid->f_seq = le64_to_cpu(ff->ff_parent.f_seq); - pfid->f_oid = le32_to_cpu(ff->ff_parent.f_oid); - pfid->f_stripe_idx = le32_to_cpu(ff->ff_parent.f_stripe_idx); + filter_fid_le_to_cpu(ff, ff, rc); return 0; } /** - * Put OFD object reference. - * - * \param[in] env execution environment - * \param[in] fo OFD object - */ -void ofd_object_put(const struct lu_env *env, struct ofd_object *fo) -{ - lu_object_put(env, &fo->ofo_obj.do_lu); -} - -/** * Precreate the given number \a nr of objects in the given sequence \a oseq. * * This function precreates new OST objects in the given sequence. @@ -205,7 +188,7 @@ void ofd_object_put(const struct lu_env *env, struct ofd_object *fo) * \retval negative value on error */ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, - obd_id id, struct ofd_seq *oseq, int nr, int sync) + u64 id, struct ofd_seq *oseq, int nr, int sync) { struct ofd_thread_info *info = ofd_info(env); struct ofd_object *fo = NULL; @@ -213,22 +196,23 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, struct thandle *th; struct ofd_object **batch; struct lu_fid *fid = &info->fti_fid; - obd_id tmp; - int rc; - int i; - int objects = 0; - int nr_saved = nr; + u64 tmp; + int rc; + int rc2; + int i; + int objects = 0; + int nr_saved = nr; ENTRY; /* Don't create objects beyond the valid range for this SEQ */ if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && - (id + nr) >= IDIF_MAX_OID)) { + (id + nr) > IDIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n", ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && - (id + nr) >= OBIF_MAX_OID)) { + (id + nr) > OBIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n", ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); @@ -239,7 +223,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, RETURN(-ENOMEM); info->fti_attr.la_valid = LA_TYPE | LA_MODE; - info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | 0666; + info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666; info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG); info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME; @@ -294,7 +278,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, if (unlikely(ofd_object_exists(fo))) { /* object may exist being re-created by write replay */ - CDEBUG(D_INODE, "object "LPX64"/"LPX64" exists: " + CDEBUG(D_INODE, "object %#llx/%#llx exists: " DFID"\n", ostid_seq(&oseq->os_oi), id, PFID(lu_object_fid(&fo->ofo_obj.do_lu))); continue; @@ -305,7 +289,10 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, rc = dt_declare_create(env, next, &info->fti_attr, NULL, &info->fti_dof, th); - if (rc) { + if (rc < 0) { + if (i == 0) + GOTO(trans_stop, rc); + nr = i; break; } @@ -318,8 +305,6 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n", ofd_name(ofd), PFID(fid), nr); - LASSERT(nr > 0); - /* When the LFSCK scanning the whole device to verify the LAST_ID file * consistency, it will load the last_id into RAM firstly, and compare * the last_id with each OST-object's ID. If the later one is larger, @@ -349,10 +334,11 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, /* Only the new created objects need to be recorded. */ if (ofd->ofd_osd->dd_record_fid_accessed) { - lfsck_pack_rfa(&ofd_info(env)->fti_lr, - lu_object_fid(&fo->ofo_obj.do_lu)); - lfsck_in_notify(env, ofd->ofd_osd, - &ofd_info(env)->fti_lr); + struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl; + + lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu), + LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT); + lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL); } if (likely(!ofd_object_exists(fo) && @@ -362,8 +348,13 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, rc = dt_create(env, next, &info->fti_attr, NULL, &info->fti_dof, th); - if (rc) + if (rc < 0) { + if (i == 0) + GOTO(trans_stop, rc); + + rc = 0; break; + } LASSERT(ofd_object_exists(fo)); } ofd_seq_last_oid_set(oseq, id + i); @@ -382,14 +373,19 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, &info->fti_buf, &info->fti_off, th); dt_write_unlock(env, oseq->os_lastid_obj); if (rc1 != 0) - CERROR("%s: fail to reset the LAST_ID for seq ("LPX64 - ") from "LPU64" to "LPU64"\n", ofd_name(ofd), + CERROR("%s: fail to reset the LAST_ID for seq (%#llx" + ") from %llu to %llu\n", ofd_name(ofd), ostid_seq(&oseq->os_oi), id + nr - 1, ofd_seq_last_oid(oseq)); } trans_stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; out: for (i = 0; i < nr_saved; i++) { fo = batch[i]; @@ -412,9 +408,9 @@ out: * * If the object still has SUID+SGID bits set, meaning that it was precreated * by the MDT before it was assigned to any file, (see ofd_precreate_objects()) - * then we will accept the UID+GID if sent by the client for initializing the - * ownership of this object. We only allow this to happen once (so clear these - * bits) and later only allow setattr. + * then we will accept the UID/GID/PROJID if sent by the client for initializing + * the ownership of this object. We only allow this to happen once (so clear + * these bits) and later only allow setattr. * * \param[in] env execution environment * \param[in] fo OFD object @@ -424,7 +420,7 @@ out: * \retval 0 if successful * \retval negative value on error */ -int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, +int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo, struct lu_attr *la, int is_setattr) { struct ofd_thread_info *info = ofd_info(env); @@ -434,26 +430,37 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, ENTRY; - if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID)) + if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) && + !(la->la_valid & LA_PROJID)) RETURN(0); - rc = dt_attr_get(env, ofd_object_child(fo), ln, BYPASS_CAPA); + rc = dt_attr_get(env, ofd_object_child(fo), ln); if (rc != 0) RETURN(rc); LASSERT(ln->la_valid & LA_MODE); + /* + * Only allow setattr to change UID/GID/PROJID, if + * SUID+SGID is not set which means this is not + * initialization of this objects. + */ if (!is_setattr) { if (!(ln->la_mode & S_ISUID)) la->la_valid &= ~LA_UID; if (!(ln->la_mode & S_ISGID)) la->la_valid &= ~LA_GID; + if (!(ln->la_mode & S_ISVTX)) + la->la_valid &= ~LA_PROJID; } + /* Initialize ownership of this object, clear SUID+SGID bits*/ if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID)) mask |= S_ISUID; if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID)) mask |= S_ISGID; + if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX)) + mask |= S_ISVTX; if (mask != 0) { if (!(la->la_valid & LA_MODE) || !is_setattr) { la->la_mode = ln->la_mode; @@ -466,6 +473,100 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, } /** + * Check if it needs to update filter_fid by the value of @oa. + * + * \param[in] env env + * \param[in] fo ofd object + * \param[in] oa obdo from client or MDT + * \param[out] ff if filter_fid needs updating, this field is used to + * return the new buffer + * + * \retval < 0 error occurred + * \retval 0 doesn't need to update filter_fid + * \retval FL_XATTR_{CREATE,REPLACE} flag for xattr update + */ +int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo, + const struct obdo *oa, struct filter_fid *ff) +{ + int rc = 0; + ENTRY; + + if (!(oa->o_valid & + (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION))) + RETURN(0); + + rc = ofd_object_ff_load(env, fo); + if (rc < 0 && rc != -ENODATA) + RETURN(rc); + + LASSERT(ff != &fo->ofo_ff); + if (rc == -ENODATA) { + rc = LU_XATTR_CREATE; + memset(ff, 0, sizeof(*ff)); + } else { + rc = LU_XATTR_REPLACE; + memcpy(ff, &fo->ofo_ff, sizeof(*ff)); + } + + if (oa->o_valid & OBD_MD_FLFID) { + /* packing fid and converting it to LE for storing into EA. + * Here ->o_stripe_idx should be filled by LOV and rest of + * fields - by client. */ + ff->ff_parent.f_seq = oa->o_parent_seq; + ff->ff_parent.f_oid = oa->o_parent_oid; + /* XXX: we are ignoring o_parent_ver here, since this should + * be the same for all objects in this fileset. */ + ff->ff_parent.f_ver = oa->o_stripe_idx; + } + if (oa->o_valid & OBD_MD_FLOSTLAYOUT) + ff->ff_layout = oa->o_layout; + + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n", + PFID(&fo->ofo_ff.ff_parent), + PFID(lu_object_fid(&fo->ofo_obj.do_lu)), + ff->ff_layout_version, oa->o_layout_version); + + /* only the MDS has the authority to update layout version */ + if (!(exp_connect_flags(ofd_info(env)->fti_exp) & + OBD_CONNECT_MDS)) { + CERROR(DFID": update layout version from client\n", + PFID(&fo->ofo_ff.ff_parent)); + + RETURN(-EPERM); + } + + if (ff->ff_layout_version & LU_LAYOUT_RESYNC) { + /* this opens a new era of writing */ + ff->ff_layout_version = 0; + ff->ff_range = 0; + } + + /* it's not allowed to change it to a smaller value */ + if (oa->o_layout_version < ff->ff_layout_version) + RETURN(-EINVAL); + + if (ff->ff_layout_version == 0 || + oa->o_layout_version & LU_LAYOUT_RESYNC) { + /* if LU_LAYOUT_RESYNC is set, it closes the era of + * writing. Only mirror I/O can write this object. */ + ff->ff_layout_version = oa->o_layout_version; + ff->ff_range = 0; + } else if (oa->o_layout_version > ff->ff_layout_version) { + ff->ff_range = MAX(ff->ff_range, + oa->o_layout_version - ff->ff_layout_version); + } + } + + if (memcmp(ff, &fo->ofo_ff, sizeof(*ff))) + filter_fid_cpu_to_le(ff, ff, sizeof(*ff)); + else /* no change */ + rc = 0; + + RETURN(rc); +} + +/** * Set OFD object attributes. * * This function sets OFD object attributes taken from incoming request. @@ -476,20 +577,22 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, * \param[in] env execution environment * \param[in] fo OFD object * \param[in] la object attributes - * \param[in] ff filter_fid structure, contains additional attributes + * \param[in] oa obdo carries fid, ost_layout, layout version * * \retval 0 if successful * \retval negative value on error */ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, - struct lu_attr *la, struct filter_fid *ff) + struct lu_attr *la, struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); + struct filter_fid *ff = &info->fti_mds_fid; struct thandle *th; struct ofd_mod_data *fmd; - int ff_needed = 0; - int rc; + int fl; + int rc; + int rc2; ENTRY; ofd_write_lock(env, fo); @@ -508,17 +611,13 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(unlock, rc); - rc = ofd_attr_handle_ugid(env, fo, la, 1 /* is_setattr */); + rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */); if (rc != 0) GOTO(unlock, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, fo); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(unlock, rc); - } + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) @@ -528,11 +627,16 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, 0, + &info->fti_buf, XATTR_NAME_FID, fl, th); if (rc) GOTO(stop, rc); @@ -542,30 +646,32 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - rc = dt_attr_set(env, ofd_object_child(fo), la, th, - ofd_object_capa(env, fo)); + rc = dt_attr_set(env, ofd_object_child(fo), la, th); if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(stop, rc); + + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, 0, th, BYPASS_CAPA); - if (rc == 0) { - fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq); - fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid); - /* Currently, the filter_fid::ff_parent::f_ver is not - * the real parent MDT-object's FID::f_ver, instead it - * is the OST-object index in its parent MDT-object's - * layout EA. */ - fo->ofo_pfid.f_stripe_idx = - le32_to_cpu(ff->ff_parent.f_stripe_idx); - } + XATTR_NAME_FID, fl, th); + if (!rc) + filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } GOTO(stop, rc); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; + unlock: ofd_write_unlock(env, fo); @@ -585,7 +691,6 @@ unlock: * \param[in] start start offset to punch from * \param[in] end end of punch * \param[in] la object attributes - * \param[in] ff filter_fid structure * \param[in] oa obdo struct from incoming request * * \retval 0 if successful @@ -593,27 +698,29 @@ unlock: */ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, __u64 start, __u64 end, struct lu_attr *la, - struct filter_fid *ff, struct obdo *oa) + struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); struct ofd_mod_data *fmd; struct dt_object *dob = ofd_object_child(fo); + struct filter_fid *ff = &info->fti_mds_fid; struct thandle *th; - int ff_needed = 0; - int rc; + int fl; + int rc; + int rc2; ENTRY; /* we support truncate, not punch yet */ LASSERT(end == OBD_OBJECT_EOF); + ofd_write_lock(env, fo); fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid); if (fmd && fmd->fmd_mactime_xid < info->fti_xid) fmd->fmd_mactime_xid = info->fti_xid; ofd_fmd_put(info->fti_exp, fmd); - ofd_write_lock(env, fo); if (!ofd_object_exists(fo)) GOTO(unlock, rc = -ENOENT); @@ -623,22 +730,27 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, GOTO(unlock, rc); } + /* need to verify layout version */ + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + rc = ofd_verify_layout_version(env, fo, oa); + if (rc) + GOTO(unlock, rc); + + oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + } + /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) GOTO(unlock, rc); - rc = ofd_attr_handle_ugid(env, fo, la, 0 /* !is_setattr */); + rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */); if (rc != 0) GOTO(unlock, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, fo); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(unlock, rc); - } + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) @@ -652,11 +764,16 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, 0, + &info->fti_buf, XATTR_NAME_FID, fl, th); if (rc) GOTO(stop, rc); @@ -666,34 +783,33 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th, - ofd_object_capa(env, fo)); + rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th); if (rc) GOTO(stop, rc); - rc = dt_attr_set(env, dob, la, th, ofd_object_capa(env, fo)); + rc = dt_attr_set(env, dob, la, th); if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(stop, rc); + rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, 0, th, BYPASS_CAPA); - if (rc == 0) { - fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq); - fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid); - /* Currently, the filter_fid::ff_parent::f_ver is not - * the real parent MDT-object's FID::f_ver, instead it - * is the OST-object index in its parent MDT-object's - * layout EA. */ - fo->ofo_pfid.f_stripe_idx = - le32_to_cpu(ff->ff_parent.f_stripe_idx); - } + XATTR_NAME_FID, fl, th); + if (!rc) + filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } GOTO(stop, rc); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2 != 0) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; unlock: ofd_write_unlock(env, fo); @@ -714,12 +830,13 @@ unlock: * \retval 0 if successful * \retval negative value on error */ -int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, +int ofd_destroy(const struct lu_env *env, struct ofd_object *fo, int orphan) { struct ofd_device *ofd = ofd_obj2dev(fo); struct thandle *th; - int rc = 0; + int rc = 0; + int rc2; ENTRY; @@ -731,8 +848,14 @@ int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, if (IS_ERR(th)) GOTO(unlock, rc = PTR_ERR(th)); - dt_declare_ref_del(env, ofd_object_child(fo), th); - dt_declare_destroy(env, ofd_object_child(fo), th); + rc = dt_declare_ref_del(env, ofd_object_child(fo), th); + if (rc < 0) + GOTO(stop, rc); + + rc = dt_declare_destroy(env, ofd_object_child(fo), th); + if (rc < 0) + GOTO(stop, rc); + if (orphan) rc = dt_trans_start_local(env, ofd->ofd_osd, th); else @@ -745,7 +868,12 @@ int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, dt_ref_del(env, ofd_object_child(fo), th); dt_destroy(env, ofd_object_child(fo), th); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s failed to stop transaction: %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; unlock: ofd_write_unlock(env, fo); RETURN(rc); @@ -772,23 +900,7 @@ int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo, ENTRY; if (ofd_object_exists(fo)) { - rc = dt_attr_get(env, ofd_object_child(fo), la, - ofd_object_capa(env, fo)); - -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0) - /* Try to correct for a bug in 2.1.0 (LU-221) that caused - * negative timestamps to appear to be in the far future, - * due old timestamp being stored on disk as an unsigned value. - * This fixes up any bad values stored on disk before - * returning them to the client, and ensures any timestamp - * updates are correct. LU-1042 */ - if (unlikely(la->la_atime == LU221_BAD_TIME)) - la->la_atime = 0; - if (unlikely(la->la_mtime == LU221_BAD_TIME)) - la->la_mtime = 0; - if (unlikely(la->la_ctime == LU221_BAD_TIME)) - la->la_ctime = 0; -#endif + rc = dt_attr_get(env, ofd_object_child(fo), la); } else { rc = -ENOENT; }