}
/**
+ * FLR: verify the layout version of object.
+ *
+ * \param[in] env execution environment
+ * \param[in] fo OFD object
+ * \param[in] oa OBDO structure with layout version
+ *
+ * \retval 0 on successful verification
+ * \retval -EINPROGRESS layout version is in transfer
+ * \retval -ESTALE the layout version on client is stale
+ */
+int ofd_verify_layout_version(const struct lu_env *env,
+ struct ofd_object *fo, const struct obdo *oa)
+{
+ int rc;
+ ENTRY;
+
+ rc = ofd_object_ff_load(env, fo);
+ if (rc < 0) {
+ if (rc == -ENODATA)
+ rc = -EINPROGRESS;
+ GOTO(out, rc);
+ }
+
+ /* this update is not legitimate */
+ if (oa->o_layout_version < fo->ofo_ff.ff_layout_version)
+ GOTO(out, rc = -ESTALE);
+
+ /* layout version is not transmitted yet */
+ if (oa->o_layout_version >
+ fo->ofo_ff.ff_layout_version + fo->ofo_ff.ff_range)
+ GOTO(out, rc = -EINPROGRESS);
+
+ EXIT;
+
+out:
+ CDEBUG(D_INODE, DFID " verify layout version: %u vs. %u, rc: %d\n",
+ PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
+ fo->ofo_ff.ff_layout_version, oa->o_layout_version, rc);
+ return rc;
+
+}
+
+/**
* Prepare buffers for read request processing.
*
* This function converts remote buffers from client to local buffers
}
}
+ /* need to verify layout version */
+ if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+ rc = ofd_verify_layout_version(env, fo, oa);
+ if (rc) {
+ ofd_read_unlock(env, fo);
+ ofd_object_put(env, fo);
+ GOTO(out, rc);
+ }
+
+ oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+ }
+
/* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
* space back if possible */
tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
* \param[in] ofd OFD device
* \param[in] ofd_obj OFD object
* \param[in] la object attributes
- * \param[in] ff parent FID
+ * \param[in] oa obdo
*
* \retval 0 on successful attributes update
* \retval negative value on error
static int
ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
struct ofd_object *ofd_obj, struct lu_attr *la,
- struct filter_fid *ff)
+ struct obdo *oa)
{
struct ofd_thread_info *info = ofd_info(env);
+ struct filter_fid *ff = &info->fti_mds_fid;
__u64 valid = la->la_valid;
- int rc;
struct thandle *th;
struct dt_object *dt_obj;
- int ff_needed = 0;
+ int fl = 0;
+ int rc;
ENTRY;
if (rc != 0)
GOTO(out, rc);
- if (ff != NULL) {
- rc = ofd_object_ff_load(env, ofd_obj);
- if (rc == -ENODATA)
- ff_needed = 1;
- else if (rc < 0)
- GOTO(out, rc);
- }
+ fl = ofd_object_ff_update(env, ofd_obj, oa, ff);
+ if (fl < 0)
+ GOTO(out, rc = fl);
- if (!la->la_valid && !ff_needed)
+ if (!la->la_valid && !fl)
/* no attributes to set */
GOTO(out, rc = 0);
GOTO(out_tx, rc);
}
- if (ff_needed) {
+ if (fl) {
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
le32_add_cpu(&ff->ff_parent.f_oid, -1);
- info->fti_buf.lb_buf = ff;
- info->fti_buf.lb_len = sizeof(*ff);
rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf,
XATTR_NAME_FID, 0, th);
if (rc)
GOTO(out_tx, rc);
}
- /* set filter fid EA */
- if (ff_needed) {
+ /* set filter fid EA.
+ * FIXME: it holds read lock of ofd object to modify the XATTR_NAME_FID
+ * while the write lock should be held. However, it should work because
+ * write RPCs only modify ff_{parent,layout} and those information will
+ * be the same from all the write RPCs. The reason that fl is not used
+ * in dt_xattr_set() is to allow this race. */
+ if (fl) {
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
GOTO(out_tx, rc);
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID,
0, th);
- if (!rc)
+ if (rc == 0)
filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff));
}
static int
ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
struct ofd_device *ofd, const struct lu_fid *fid,
- struct lu_attr *la, struct filter_fid *ff, int objcount,
+ struct lu_attr *la, struct obdo *oa, int objcount,
int niocount, struct niobuf_local *lnb,
unsigned long granted, int old_rc)
{
* dt_declare_write_commit() since quota enforcement is now handled in
* declare phases.
*/
- rc = ofd_write_attr_set(env, ofd, fo, la, ff);
+ rc = ofd_write_attr_set(env, ofd, fo, la, oa);
if (rc)
GOTO(out, rc);
struct ofd_mod_data *fmd;
__u64 valid;
struct ofd_device *ofd = ofd_exp(exp);
- struct filter_fid *ff = NULL;
const struct lu_fid *fid = &oa->o_oi.oi_fid;
int rc = 0;
ofd_fmd_put(exp, fmd);
la_from_obdo(&info->fti_attr, oa, valid);
- if (oa->o_valid & OBD_MD_FLFID) {
- ff = &info->fti_mds_fid;
- ofd_prepare_fidea(ff, oa);
- }
-
rc = ofd_commitrw_write(env, exp, ofd, fid, &info->fti_attr,
- ff, objcount, npages, lnb,
+ oa, objcount, npages, lnb,
oa->o_grant_used, old_rc);
if (rc == 0)
obdo_from_la(oa, &info->fti_attr,