X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fofd%2Fofd_objects.c;h=ec5fb4f78cfd8f9ae78cf680c6c91d056c27b19b;hp=3aa18f50377c0b09b1a43cf176588e2921b05210;hb=10da8afb278634a40be72f48dae42ce9755c62a0;hpb=ae052f9f2e2699389617cbd12cc815f77f7e499b diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index 3aa18f5..ec5fb4f 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -23,7 +23,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2014 Intel Corporation. + * Copyright (c) 2012, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,7 +41,6 @@ #define DEBUG_SUBSYSTEM S_FILTER #include -#include #include #include "ofd_internal.h" @@ -76,7 +75,7 @@ static int ofd_version_get_check(struct ofd_thread_info *info, /* VBR: version is checked always because costs nothing */ if (info->fti_pre_version != 0 && info->fti_pre_version != curr_version) { - CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n", + CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n", info->fti_pre_version, curr_version); spin_lock(&info->fti_exp->exp_lock); info->fti_exp->exp_vbr_failed = 1; @@ -121,7 +120,7 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, * Get FID of parent MDT object. * * This function reads extended attribute XATTR_NAME_FID of OFD object which - * contains the MDT parent object FID and saves it in ofd_object::ofo_pfid. + * contains the MDT parent object FID and saves it in ofd_object::ofo_ff. * * The filter_fid::ff_parent::f_ver field currently holds * the OST-object index in the parent MDT-object's layout EA, @@ -137,47 +136,31 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, */ int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo) { - struct ofd_thread_info *info = ofd_info(env); - struct filter_fid_old *ff = &info->fti_mds_fid_old; - struct lu_buf *buf = &info->fti_buf; - struct lu_fid *pfid = &fo->ofo_pfid; - int rc = 0; + struct ofd_thread_info *info = ofd_info(env); + struct filter_fid *ff = &fo->ofo_ff; + struct lu_buf *buf = &info->fti_buf; + int rc = 0; - if (fid_is_sane(pfid)) + if (fid_is_sane(&ff->ff_parent)) return 0; buf->lb_buf = ff; buf->lb_len = sizeof(*ff); - rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID, - BYPASS_CAPA); + rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID); if (rc < 0) return rc; - if (rc < sizeof(struct lu_fid)) { - fid_zero(pfid); - - return -ENODATA; + if (unlikely(rc < sizeof(struct lu_fid))) { + fid_zero(&ff->ff_parent); + return -EINVAL; } - pfid->f_seq = le64_to_cpu(ff->ff_parent.f_seq); - pfid->f_oid = le32_to_cpu(ff->ff_parent.f_oid); - pfid->f_stripe_idx = le32_to_cpu(ff->ff_parent.f_stripe_idx); + filter_fid_le_to_cpu(ff, ff, rc); return 0; } /** - * Put OFD object reference. - * - * \param[in] env execution environment - * \param[in] fo OFD object - */ -void ofd_object_put(const struct lu_env *env, struct ofd_object *fo) -{ - lu_object_put(env, &fo->ofo_obj.do_lu); -} - -/** * Precreate the given number \a nr of objects in the given sequence \a oseq. * * This function precreates new OST objects in the given sequence. @@ -213,11 +196,12 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, struct thandle *th; struct ofd_object **batch; struct lu_fid *fid = &info->fti_fid; - u64 tmp; - int rc; - int i; - int objects = 0; - int nr_saved = nr; + u64 tmp; + int rc; + int rc2; + int i; + int objects = 0; + int nr_saved = nr; ENTRY; @@ -239,7 +223,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, RETURN(-ENOMEM); info->fti_attr.la_valid = LA_TYPE | LA_MODE; - info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | 0666; + info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666; info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG); info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME; @@ -294,7 +278,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, if (unlikely(ofd_object_exists(fo))) { /* object may exist being re-created by write replay */ - CDEBUG(D_INODE, "object "LPX64"/"LPX64" exists: " + CDEBUG(D_INODE, "object %#llx/%#llx exists: " DFID"\n", ostid_seq(&oseq->os_oi), id, PFID(lu_object_fid(&fo->ofo_obj.do_lu))); continue; @@ -350,12 +334,11 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, /* Only the new created objects need to be recorded. */ if (ofd->ofd_osd->dd_record_fid_accessed) { - struct lfsck_request *lr = &ofd_info(env)->fti_lr; + struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl; - lfsck_pack_rfa(lr, lu_object_fid(&fo->ofo_obj.do_lu), - LE_FID_ACCESSED, - LFSCK_TYPE_LAYOUT); - lfsck_in_notify(env, ofd->ofd_osd, lr, NULL); + lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu), + LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT); + lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL); } if (likely(!ofd_object_exists(fo) && @@ -390,14 +373,19 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, &info->fti_buf, &info->fti_off, th); dt_write_unlock(env, oseq->os_lastid_obj); if (rc1 != 0) - CERROR("%s: fail to reset the LAST_ID for seq ("LPX64 - ") from "LPU64" to "LPU64"\n", ofd_name(ofd), + CERROR("%s: fail to reset the LAST_ID for seq (%#llx" + ") from %llu to %llu\n", ofd_name(ofd), ostid_seq(&oseq->os_oi), id + nr - 1, ofd_seq_last_oid(oseq)); } trans_stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; out: for (i = 0; i < nr_saved; i++) { fo = batch[i]; @@ -420,9 +408,9 @@ out: * * If the object still has SUID+SGID bits set, meaning that it was precreated * by the MDT before it was assigned to any file, (see ofd_precreate_objects()) - * then we will accept the UID+GID if sent by the client for initializing the - * ownership of this object. We only allow this to happen once (so clear these - * bits) and later only allow setattr. + * then we will accept the UID/GID/PROJID if sent by the client for initializing + * the ownership of this object. We only allow this to happen once (so clear + * these bits) and later only allow setattr. * * \param[in] env execution environment * \param[in] fo OFD object @@ -432,7 +420,7 @@ out: * \retval 0 if successful * \retval negative value on error */ -int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, +int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo, struct lu_attr *la, int is_setattr) { struct ofd_thread_info *info = ofd_info(env); @@ -442,26 +430,37 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, ENTRY; - if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID)) + if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) && + !(la->la_valid & LA_PROJID)) RETURN(0); - rc = dt_attr_get(env, ofd_object_child(fo), ln, BYPASS_CAPA); + rc = dt_attr_get(env, ofd_object_child(fo), ln); if (rc != 0) RETURN(rc); LASSERT(ln->la_valid & LA_MODE); + /* + * Only allow setattr to change UID/GID/PROJID, if + * SUID+SGID is not set which means this is not + * initialization of this objects. + */ if (!is_setattr) { if (!(ln->la_mode & S_ISUID)) la->la_valid &= ~LA_UID; if (!(ln->la_mode & S_ISGID)) la->la_valid &= ~LA_GID; + if (!(ln->la_mode & S_ISVTX)) + la->la_valid &= ~LA_PROJID; } + /* Initialize ownership of this object, clear SUID+SGID bits*/ if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID)) mask |= S_ISUID; if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID)) mask |= S_ISGID; + if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX)) + mask |= S_ISVTX; if (mask != 0) { if (!(la->la_valid & LA_MODE) || !is_setattr) { la->la_mode = ln->la_mode; @@ -474,6 +473,91 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, } /** + * Check if it needs to update filter_fid by the value of @oa. + * + * \param[in] env env + * \param[in] fo ofd object + * \param[in] oa obdo from client or MDT + * \param[out] ff if filter_fid needs updating, this field is used to + * return the new buffer + * + * \retval < 0 error occurred + * \retval 0 doesn't need to update filter_fid + * \retval FL_XATTR_{CREATE,REPLACE} flag for xattr update + */ +int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo, + const struct obdo *oa, struct filter_fid *ff) +{ + int rc = 0; + ENTRY; + + if (!(oa->o_valid & + (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION))) + RETURN(0); + + rc = ofd_object_ff_load(env, fo); + if (rc < 0 && rc != -ENODATA) + RETURN(rc); + + LASSERT(ff != &fo->ofo_ff); + if (rc == -ENODATA) { + rc = LU_XATTR_CREATE; + memset(ff, 0, sizeof(*ff)); + } else { + rc = LU_XATTR_REPLACE; + memcpy(ff, &fo->ofo_ff, sizeof(*ff)); + } + + if (oa->o_valid & OBD_MD_FLFID) { + /* packing fid and converting it to LE for storing into EA. + * Here ->o_stripe_idx should be filled by LOV and rest of + * fields - by client. */ + ff->ff_parent.f_seq = oa->o_parent_seq; + ff->ff_parent.f_oid = oa->o_parent_oid; + /* XXX: we are ignoring o_parent_ver here, since this should + * be the same for all objects in this fileset. */ + ff->ff_parent.f_ver = oa->o_stripe_idx; + } + if (oa->o_valid & OBD_MD_FLOSTLAYOUT) + ff->ff_layout = oa->o_layout; + + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n", + PFID(&fo->ofo_ff.ff_parent), + PFID(lu_object_fid(&fo->ofo_obj.do_lu)), + ff->ff_layout_version, oa->o_layout_version); + + /* only the MDS has the authority to update layout version */ + if (!(exp_connect_flags(ofd_info(env)->fti_exp) & + OBD_CONNECT_MDS)) { + CERROR(DFID": update layout version from client\n", + PFID(&fo->ofo_ff.ff_parent)); + + RETURN(-EPERM); + } + + /* it's not allowed to change it to a smaller value */ + if (oa->o_layout_version < ff->ff_layout_version) + RETURN(-EINVAL); + + if (ff->ff_layout_version == 0) { + ff->ff_layout_version = oa->o_layout_version; + ff->ff_range = 0; + } else if (oa->o_layout_version > ff->ff_layout_version) { + ff->ff_range = MAX(ff->ff_range, + oa->o_layout_version - ff->ff_layout_version); + } + } + + if (memcmp(ff, &fo->ofo_ff, sizeof(*ff))) + filter_fid_cpu_to_le(ff, ff, sizeof(*ff)); + else /* no change */ + rc = 0; + + RETURN(rc); +} + +/** * Set OFD object attributes. * * This function sets OFD object attributes taken from incoming request. @@ -484,20 +568,22 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, * \param[in] env execution environment * \param[in] fo OFD object * \param[in] la object attributes - * \param[in] ff filter_fid structure, contains additional attributes + * \param[in] oa obdo carries fid, ost_layout, layout version * * \retval 0 if successful * \retval negative value on error */ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, - struct lu_attr *la, struct filter_fid *ff) + struct lu_attr *la, struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); + struct filter_fid *ff = &info->fti_mds_fid; struct thandle *th; struct ofd_mod_data *fmd; - int ff_needed = 0; - int rc; + int fl; + int rc; + int rc2; ENTRY; ofd_write_lock(env, fo); @@ -516,17 +602,13 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(unlock, rc); - rc = ofd_attr_handle_ugid(env, fo, la, 1 /* is_setattr */); + rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */); if (rc != 0) GOTO(unlock, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, fo); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(unlock, rc); - } + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) @@ -536,11 +618,16 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, 0, + &info->fti_buf, XATTR_NAME_FID, fl, th); if (rc) GOTO(stop, rc); @@ -550,30 +637,32 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - rc = dt_attr_set(env, ofd_object_child(fo), la, th, - ofd_object_capa(env, fo)); + rc = dt_attr_set(env, ofd_object_child(fo), la, th); if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(stop, rc); + + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, 0, th, BYPASS_CAPA); - if (rc == 0) { - fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq); - fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid); - /* Currently, the filter_fid::ff_parent::f_ver is not - * the real parent MDT-object's FID::f_ver, instead it - * is the OST-object index in its parent MDT-object's - * layout EA. */ - fo->ofo_pfid.f_stripe_idx = - le32_to_cpu(ff->ff_parent.f_stripe_idx); - } + XATTR_NAME_FID, fl, th); + if (!rc) + filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } GOTO(stop, rc); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; + unlock: ofd_write_unlock(env, fo); @@ -593,7 +682,6 @@ unlock: * \param[in] start start offset to punch from * \param[in] end end of punch * \param[in] la object attributes - * \param[in] ff filter_fid structure * \param[in] oa obdo struct from incoming request * * \retval 0 if successful @@ -601,15 +689,17 @@ unlock: */ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, __u64 start, __u64 end, struct lu_attr *la, - struct filter_fid *ff, struct obdo *oa) + struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); struct ofd_mod_data *fmd; struct dt_object *dob = ofd_object_child(fo); + struct filter_fid *ff = &info->fti_mds_fid; struct thandle *th; - int ff_needed = 0; - int rc; + int fl; + int rc; + int rc2; ENTRY; @@ -631,22 +721,27 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, GOTO(unlock, rc); } + /* need to verify layout version */ + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + rc = ofd_verify_layout_version(env, fo, oa); + if (rc) + GOTO(unlock, rc); + + oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + } + /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) GOTO(unlock, rc); - rc = ofd_attr_handle_ugid(env, fo, la, 0 /* !is_setattr */); + rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */); if (rc != 0) GOTO(unlock, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, fo); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(unlock, rc); - } + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) @@ -660,11 +755,16 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, 0, + &info->fti_buf, XATTR_NAME_FID, fl, th); if (rc) GOTO(stop, rc); @@ -674,34 +774,33 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th, - ofd_object_capa(env, fo)); + rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th); if (rc) GOTO(stop, rc); - rc = dt_attr_set(env, dob, la, th, ofd_object_capa(env, fo)); + rc = dt_attr_set(env, dob, la, th); if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(stop, rc); + rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, 0, th, BYPASS_CAPA); - if (rc == 0) { - fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq); - fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid); - /* Currently, the filter_fid::ff_parent::f_ver is not - * the real parent MDT-object's FID::f_ver, instead it - * is the OST-object index in its parent MDT-object's - * layout EA. */ - fo->ofo_pfid.f_stripe_idx = - le32_to_cpu(ff->ff_parent.f_stripe_idx); - } + XATTR_NAME_FID, fl, th); + if (!rc) + filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } GOTO(stop, rc); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2 != 0) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; unlock: ofd_write_unlock(env, fo); @@ -722,12 +821,13 @@ unlock: * \retval 0 if successful * \retval negative value on error */ -int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, +int ofd_destroy(const struct lu_env *env, struct ofd_object *fo, int orphan) { struct ofd_device *ofd = ofd_obj2dev(fo); struct thandle *th; - int rc = 0; + int rc = 0; + int rc2; ENTRY; @@ -759,7 +859,12 @@ int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, dt_ref_del(env, ofd_object_child(fo), th); dt_destroy(env, ofd_object_child(fo), th); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s failed to stop transaction: %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; unlock: ofd_write_unlock(env, fo); RETURN(rc); @@ -786,23 +891,7 @@ int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo, ENTRY; if (ofd_object_exists(fo)) { - rc = dt_attr_get(env, ofd_object_child(fo), la, - ofd_object_capa(env, fo)); - -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0) - /* Try to correct for a bug in 2.1.0 (LU-221) that caused - * negative timestamps to appear to be in the far future, - * due old timestamp being stored on disk as an unsigned value. - * This fixes up any bad values stored on disk before - * returning them to the client, and ensures any timestamp - * updates are correct. LU-1042 */ - if (unlikely(la->la_atime == LU221_BAD_TIME)) - la->la_atime = 0; - if (unlikely(la->la_mtime == LU221_BAD_TIME)) - la->la_mtime = 0; - if (unlikely(la->la_ctime == LU221_BAD_TIME)) - la->la_ctime = 0; -#endif + rc = dt_attr_get(env, ofd_object_child(fo), la); } else { rc = -ENOENT; }