X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fofd%2Fofd_io.c;h=2c51f01a01ff63d98a06144ac5ab7390823287b3;hb=10da8afb278634a40be72f48dae42ce9755c62a0;hp=924bffab6eed77f6d88b63ff8f1fff0c3358825d;hpb=b601f8997dd5c666d49fc2e2ea15196dfc917a2d;p=fs%2Flustre-release.git diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c index 924bffa..2c51f01 100644 --- a/lustre/ofd/ofd_io.c +++ b/lustre/ofd/ofd_io.c @@ -427,6 +427,49 @@ int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo, } /** + * FLR: verify the layout version of object. + * + * \param[in] env execution environment + * \param[in] fo OFD object + * \param[in] oa OBDO structure with layout version + * + * \retval 0 on successful verification + * \retval -EINPROGRESS layout version is in transfer + * \retval -ESTALE the layout version on client is stale + */ +int ofd_verify_layout_version(const struct lu_env *env, + struct ofd_object *fo, const struct obdo *oa) +{ + int rc; + ENTRY; + + rc = ofd_object_ff_load(env, fo); + if (rc < 0) { + if (rc == -ENODATA) + rc = -EINPROGRESS; + GOTO(out, rc); + } + + /* this update is not legitimate */ + if (oa->o_layout_version < fo->ofo_ff.ff_layout_version) + GOTO(out, rc = -ESTALE); + + /* layout version is not transmitted yet */ + if (oa->o_layout_version > + fo->ofo_ff.ff_layout_version + fo->ofo_ff.ff_range) + GOTO(out, rc = -EINPROGRESS); + + EXIT; + +out: + CDEBUG(D_INODE, DFID " verify layout version: %u vs. %u, rc: %d\n", + PFID(lu_object_fid(&fo->ofo_obj.do_lu)), + fo->ofo_ff.ff_layout_version, oa->o_layout_version, rc); + return rc; + +} + +/** * Prepare buffers for read request processing. * * This function converts remote buffers from client to local buffers @@ -628,6 +671,18 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp, } } + /* need to verify layout version */ + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + rc = ofd_verify_layout_version(env, fo, oa); + if (rc) { + ofd_read_unlock(env, fo); + ofd_object_put(env, fo); + GOTO(out, rc); + } + + oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + } + /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some * space back if possible */ tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt); @@ -817,7 +872,7 @@ ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd, * \param[in] ofd OFD device * \param[in] ofd_obj OFD object * \param[in] la object attributes - * \param[in] ff parent FID + * \param[in] oa obdo * * \retval 0 on successful attributes update * \retval negative value on error @@ -825,14 +880,15 @@ ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd, static int ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, struct ofd_object *ofd_obj, struct lu_attr *la, - struct filter_fid *ff) + struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); + struct filter_fid *ff = &info->fti_mds_fid; __u64 valid = la->la_valid; - int rc; struct thandle *th; struct dt_object *dt_obj; - int ff_needed = 0; + int fl = 0; + int rc; ENTRY; @@ -847,15 +903,11 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, if (rc != 0) GOTO(out, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, ofd_obj); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(out, rc); - } + fl = ofd_object_ff_update(env, ofd_obj, oa, ff); + if (fl < 0) + GOTO(out, rc = fl); - if (!la->la_valid && !ff_needed) + if (!la->la_valid && !fl) /* no attributes to set */ GOTO(out, rc = 0); @@ -869,14 +921,12 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, GOTO(out_tx, rc); } - if (ff_needed) { + if (fl) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) le32_add_cpu(&ff->ff_parent.f_oid, -1); - info->fti_buf.lb_buf = ff; - info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, 0, th); if (rc) @@ -896,14 +946,21 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, GOTO(out_tx, rc); } - /* set filter fid EA */ - if (ff_needed) { + /* set filter fid EA. + * FIXME: it holds read lock of ofd object to modify the XATTR_NAME_FID + * while the write lock should be held. However, it should work because + * write RPCs only modify ff_{parent,layout} and those information will + * be the same from all the write RPCs. The reason that fl is not used + * in dt_xattr_set() is to allow this race. */ + if (fl) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) GOTO(out_tx, rc); + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, 0, th); - if (!rc) + if (rc == 0) filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff)); } @@ -1012,7 +1069,7 @@ static int ofd_soft_sync_cb_add(struct thandle *th, struct obd_export *exp) static int ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp, struct ofd_device *ofd, const struct lu_fid *fid, - struct lu_attr *la, struct filter_fid *ff, int objcount, + struct lu_attr *la, struct obdo *oa, int objcount, int niocount, struct niobuf_local *lnb, unsigned long granted, int old_rc) { @@ -1048,7 +1105,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp, * dt_declare_write_commit() since quota enforcement is now handled in * declare phases. */ - rc = ofd_write_attr_set(env, ofd, fo, la, ff); + rc = ofd_write_attr_set(env, ofd, fo, la, oa); if (rc) GOTO(out, rc); @@ -1203,7 +1260,6 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, struct ofd_mod_data *fmd; __u64 valid; struct ofd_device *ofd = ofd_exp(exp); - struct filter_fid *ff = NULL; const struct lu_fid *fid = &oa->o_oi.oi_fid; int rc = 0; @@ -1227,13 +1283,8 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, ofd_fmd_put(exp, fmd); la_from_obdo(&info->fti_attr, oa, valid); - if (oa->o_valid & OBD_MD_FLFID) { - ff = &info->fti_mds_fid; - ofd_prepare_fidea(ff, oa); - } - rc = ofd_commitrw_write(env, exp, ofd, fid, &info->fti_attr, - ff, objcount, npages, lnb, + oa, objcount, npages, lnb, oa->o_grant_used, old_rc); if (rc == 0) obdo_from_la(oa, &info->fti_attr,