X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fofd%2Fofd_objects.c;h=7605de455321653ac73492592859e0f6d5cf4084;hp=07f7dc86037235e1831f45e61471a93c561a3e25;hb=b724079edc5b66e1046b5760a6bad3045e9a9260;hpb=d10200a80770f0029d1d665af954187b9ad883df diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index 07f7dc8..7605de4 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -23,7 +23,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2014, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,7 +41,6 @@ #define DEBUG_SUBSYSTEM S_FILTER #include -#include #include #include "ofd_internal.h" @@ -121,7 +120,7 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, * Get FID of parent MDT object. * * This function reads extended attribute XATTR_NAME_FID of OFD object which - * contains the MDT parent object FID and saves it in ofd_object::ofo_pfid. + * contains the MDT parent object FID and saves it in ofd_object::ofo_ff. * * The filter_fid::ff_parent::f_ver field currently holds * the OST-object index in the parent MDT-object's layout EA, @@ -137,13 +136,12 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, */ int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo) { - struct ofd_thread_info *info = ofd_info(env); - struct filter_fid_old *ff = &info->fti_mds_fid_old; - struct lu_buf *buf = &info->fti_buf; - struct lu_fid *pfid = &fo->ofo_pfid; - int rc = 0; + struct ofd_thread_info *info = ofd_info(env); + struct filter_fid *ff = &fo->ofo_ff; + struct lu_buf *buf = &info->fti_buf; + int rc = 0; - if (fid_is_sane(pfid)) + if (fid_is_sane(&ff->ff_parent)) return 0; buf->lb_buf = ff; @@ -152,31 +150,17 @@ int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo) if (rc < 0) return rc; - if (rc < sizeof(struct lu_fid)) { - fid_zero(pfid); - - return -ENODATA; + if (unlikely(rc < sizeof(struct lu_fid))) { + fid_zero(&ff->ff_parent); + return -EINVAL; } - pfid->f_seq = le64_to_cpu(ff->ff_parent.f_seq); - pfid->f_oid = le32_to_cpu(ff->ff_parent.f_oid); - pfid->f_stripe_idx = le32_to_cpu(ff->ff_parent.f_stripe_idx); + filter_fid_le_to_cpu(ff, ff, rc); return 0; } /** - * Put OFD object reference. - * - * \param[in] env execution environment - * \param[in] fo OFD object - */ -void ofd_object_put(const struct lu_env *env, struct ofd_object *fo) -{ - lu_object_put(env, &fo->ofo_obj.do_lu); -} - -/** * Precreate the given number \a nr of objects in the given sequence \a oseq. * * This function precreates new OST objects in the given sequence. @@ -212,22 +196,23 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, struct thandle *th; struct ofd_object **batch; struct lu_fid *fid = &info->fti_fid; - u64 tmp; - int rc; - int i; - int objects = 0; - int nr_saved = nr; + u64 tmp; + int rc; + int rc2; + int i; + int objects = 0; + int nr_saved = nr; ENTRY; /* Don't create objects beyond the valid range for this SEQ */ if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && - (id + nr) >= IDIF_MAX_OID)) { + (id + nr) > IDIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n", ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && - (id + nr) >= OBIF_MAX_OID)) { + (id + nr) > OBIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n", ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); @@ -238,7 +223,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, RETURN(-ENOMEM); info->fti_attr.la_valid = LA_TYPE | LA_MODE; - info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | 0666; + info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666; info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG); info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME; @@ -349,12 +334,11 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, /* Only the new created objects need to be recorded. */ if (ofd->ofd_osd->dd_record_fid_accessed) { - struct lfsck_request *lr = &ofd_info(env)->fti_lr; + struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl; - lfsck_pack_rfa(lr, lu_object_fid(&fo->ofo_obj.do_lu), - LE_FID_ACCESSED, - LFSCK_TYPE_LAYOUT); - lfsck_in_notify(env, ofd->ofd_osd, lr, NULL); + lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu), + LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT); + lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL); } if (likely(!ofd_object_exists(fo) && @@ -396,7 +380,12 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, } trans_stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; out: for (i = 0; i < nr_saved; i++) { fo = batch[i]; @@ -419,9 +408,9 @@ out: * * If the object still has SUID+SGID bits set, meaning that it was precreated * by the MDT before it was assigned to any file, (see ofd_precreate_objects()) - * then we will accept the UID+GID if sent by the client for initializing the - * ownership of this object. We only allow this to happen once (so clear these - * bits) and later only allow setattr. + * then we will accept the UID/GID/PROJID if sent by the client for initializing + * the ownership of this object. We only allow this to happen once (so clear + * these bits) and later only allow setattr. * * \param[in] env execution environment * \param[in] fo OFD object @@ -431,7 +420,7 @@ out: * \retval 0 if successful * \retval negative value on error */ -int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, +int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo, struct lu_attr *la, int is_setattr) { struct ofd_thread_info *info = ofd_info(env); @@ -441,7 +430,8 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, ENTRY; - if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID)) + if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) && + !(la->la_valid & LA_PROJID)) RETURN(0); rc = dt_attr_get(env, ofd_object_child(fo), ln); @@ -450,17 +440,27 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, LASSERT(ln->la_valid & LA_MODE); + /* + * Only allow setattr to change UID/GID/PROJID, if + * SUID+SGID is not set which means this is not + * initialization of this objects. + */ if (!is_setattr) { if (!(ln->la_mode & S_ISUID)) la->la_valid &= ~LA_UID; if (!(ln->la_mode & S_ISGID)) la->la_valid &= ~LA_GID; + if (!(ln->la_mode & S_ISVTX)) + la->la_valid &= ~LA_PROJID; } + /* Initialize ownership of this object, clear SUID+SGID bits*/ if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID)) mask |= S_ISUID; if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID)) mask |= S_ISGID; + if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX)) + mask |= S_ISVTX; if (mask != 0) { if (!(la->la_valid & LA_MODE) || !is_setattr) { la->la_mode = ln->la_mode; @@ -473,6 +473,100 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, } /** + * Check if it needs to update filter_fid by the value of @oa. + * + * \param[in] env env + * \param[in] fo ofd object + * \param[in] oa obdo from client or MDT + * \param[out] ff if filter_fid needs updating, this field is used to + * return the new buffer + * + * \retval < 0 error occurred + * \retval 0 doesn't need to update filter_fid + * \retval FL_XATTR_{CREATE,REPLACE} flag for xattr update + */ +int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo, + const struct obdo *oa, struct filter_fid *ff) +{ + int rc = 0; + ENTRY; + + if (!(oa->o_valid & + (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION))) + RETURN(0); + + rc = ofd_object_ff_load(env, fo); + if (rc < 0 && rc != -ENODATA) + RETURN(rc); + + LASSERT(ff != &fo->ofo_ff); + if (rc == -ENODATA) { + rc = LU_XATTR_CREATE; + memset(ff, 0, sizeof(*ff)); + } else { + rc = LU_XATTR_REPLACE; + memcpy(ff, &fo->ofo_ff, sizeof(*ff)); + } + + if (oa->o_valid & OBD_MD_FLFID) { + /* packing fid and converting it to LE for storing into EA. + * Here ->o_stripe_idx should be filled by LOV and rest of + * fields - by client. */ + ff->ff_parent.f_seq = oa->o_parent_seq; + ff->ff_parent.f_oid = oa->o_parent_oid; + /* XXX: we are ignoring o_parent_ver here, since this should + * be the same for all objects in this fileset. */ + ff->ff_parent.f_ver = oa->o_stripe_idx; + } + if (oa->o_valid & OBD_MD_FLOSTLAYOUT) + ff->ff_layout = oa->o_layout; + + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n", + PFID(&fo->ofo_ff.ff_parent), + PFID(lu_object_fid(&fo->ofo_obj.do_lu)), + ff->ff_layout_version, oa->o_layout_version); + + /* only the MDS has the authority to update layout version */ + if (!(exp_connect_flags(ofd_info(env)->fti_exp) & + OBD_CONNECT_MDS)) { + CERROR(DFID": update layout version from client\n", + PFID(&fo->ofo_ff.ff_parent)); + + RETURN(-EPERM); + } + + if (ff->ff_layout_version & LU_LAYOUT_RESYNC) { + /* this opens a new era of writing */ + ff->ff_layout_version = 0; + ff->ff_range = 0; + } + + /* it's not allowed to change it to a smaller value */ + if (oa->o_layout_version < ff->ff_layout_version) + RETURN(-EINVAL); + + if (ff->ff_layout_version == 0 || + oa->o_layout_version & LU_LAYOUT_RESYNC) { + /* if LU_LAYOUT_RESYNC is set, it closes the era of + * writing. Only mirror I/O can write this object. */ + ff->ff_layout_version = oa->o_layout_version; + ff->ff_range = 0; + } else if (oa->o_layout_version > ff->ff_layout_version) { + ff->ff_range = MAX(ff->ff_range, + oa->o_layout_version - ff->ff_layout_version); + } + } + + if (memcmp(ff, &fo->ofo_ff, sizeof(*ff))) + filter_fid_cpu_to_le(ff, ff, sizeof(*ff)); + else /* no change */ + rc = 0; + + RETURN(rc); +} + +/** * Set OFD object attributes. * * This function sets OFD object attributes taken from incoming request. @@ -483,20 +577,22 @@ int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo, * \param[in] env execution environment * \param[in] fo OFD object * \param[in] la object attributes - * \param[in] ff filter_fid structure, contains additional attributes + * \param[in] oa obdo carries fid, ost_layout, layout version * * \retval 0 if successful * \retval negative value on error */ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, - struct lu_attr *la, struct filter_fid *ff) + struct lu_attr *la, struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); + struct filter_fid *ff = &info->fti_mds_fid; struct thandle *th; struct ofd_mod_data *fmd; - int ff_needed = 0; - int rc; + int fl; + int rc; + int rc2; ENTRY; ofd_write_lock(env, fo); @@ -515,17 +611,13 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(unlock, rc); - rc = ofd_attr_handle_ugid(env, fo, la, 1 /* is_setattr */); + rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */); if (rc != 0) GOTO(unlock, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, fo); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(unlock, rc); - } + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) @@ -535,11 +627,16 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, 0, + &info->fti_buf, XATTR_NAME_FID, fl, th); if (rc) GOTO(stop, rc); @@ -553,25 +650,28 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(stop, rc); + + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, 0, th); - if (rc == 0) { - fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq); - fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid); - /* Currently, the filter_fid::ff_parent::f_ver is not - * the real parent MDT-object's FID::f_ver, instead it - * is the OST-object index in its parent MDT-object's - * layout EA. */ - fo->ofo_pfid.f_stripe_idx = - le32_to_cpu(ff->ff_parent.f_stripe_idx); - } + XATTR_NAME_FID, fl, th); + if (!rc) + filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } GOTO(stop, rc); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; + unlock: ofd_write_unlock(env, fo); @@ -591,7 +691,6 @@ unlock: * \param[in] start start offset to punch from * \param[in] end end of punch * \param[in] la object attributes - * \param[in] ff filter_fid structure * \param[in] oa obdo struct from incoming request * * \retval 0 if successful @@ -599,15 +698,17 @@ unlock: */ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, __u64 start, __u64 end, struct lu_attr *la, - struct filter_fid *ff, struct obdo *oa) + struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); struct ofd_mod_data *fmd; struct dt_object *dob = ofd_object_child(fo); + struct filter_fid *ff = &info->fti_mds_fid; struct thandle *th; - int ff_needed = 0; - int rc; + int fl; + int rc; + int rc2; ENTRY; @@ -629,22 +730,27 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, GOTO(unlock, rc); } + /* need to verify layout version */ + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + rc = ofd_verify_layout_version(env, fo, oa); + if (rc) + GOTO(unlock, rc); + + oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + } + /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) GOTO(unlock, rc); - rc = ofd_attr_handle_ugid(env, fo, la, 0 /* !is_setattr */); + rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */); if (rc != 0) GOTO(unlock, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, fo); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(unlock, rc); - } + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) @@ -658,11 +764,16 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, 0, + &info->fti_buf, XATTR_NAME_FID, fl, th); if (rc) GOTO(stop, rc); @@ -680,25 +791,25 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(stop, rc); + rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, 0, th); - if (rc == 0) { - fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq); - fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid); - /* Currently, the filter_fid::ff_parent::f_ver is not - * the real parent MDT-object's FID::f_ver, instead it - * is the OST-object index in its parent MDT-object's - * layout EA. */ - fo->ofo_pfid.f_stripe_idx = - le32_to_cpu(ff->ff_parent.f_stripe_idx); - } + XATTR_NAME_FID, fl, th); + if (!rc) + filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } GOTO(stop, rc); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2 != 0) + CERROR("%s: failed to stop transaction: rc = %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; unlock: ofd_write_unlock(env, fo); @@ -719,12 +830,13 @@ unlock: * \retval 0 if successful * \retval negative value on error */ -int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, +int ofd_destroy(const struct lu_env *env, struct ofd_object *fo, int orphan) { struct ofd_device *ofd = ofd_obj2dev(fo); struct thandle *th; - int rc = 0; + int rc = 0; + int rc2; ENTRY; @@ -756,7 +868,12 @@ int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, dt_ref_del(env, ofd_object_child(fo), th); dt_destroy(env, ofd_object_child(fo), th); stop: - ofd_trans_stop(env, ofd, th, rc); + rc2 = ofd_trans_stop(env, ofd, th, rc); + if (rc2) + CERROR("%s failed to stop transaction: %d\n", + ofd_name(ofd), rc2); + if (!rc) + rc = rc2; unlock: ofd_write_unlock(env, fo); RETURN(rc);