* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/ofd/ofd_objects.c
*
#include <dt_object.h>
#include <lustre_lfsck.h>
+#include <lustre_export.h>
#include "ofd_internal.h"
{
dt_obj_version_t curr_version;
- LASSERT(ofd_object_exists(fo));
-
if (info->fti_exp == NULL)
RETURN(0);
buf->lb_buf = ff;
buf->lb_len = sizeof(*ff);
rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
+ if (rc == -ERANGE) {
+ struct filter_fid *ff_new;
+
+ OBD_ALLOC(ff_new, sizeof(*ff) + FILTER_FID_EXTRA_SIZE);
+ if (!ff_new)
+ return -ENOMEM;
+ buf->lb_buf = ff_new;
+ buf->lb_len = sizeof(*ff) + FILTER_FID_EXTRA_SIZE;
+ rc = dt_xattr_get(env, ofd_object_child(fo), buf,
+ XATTR_NAME_FID);
+ if (rc > 0)
+ memcpy(ff, ff_new, sizeof(*ff));
+ OBD_FREE(ff_new, sizeof(*ff) + FILTER_FID_EXTRA_SIZE);
+ }
if (rc < 0)
return rc;
return 0;
}
+struct ofd_precreate_cb {
+ struct dt_txn_commit_cb opc_cb;
+ struct ofd_seq *opc_oseq;
+ int opc_objects;
+};
+
+static void ofd_cb_precreate(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *cb, int err)
+{
+ struct ofd_precreate_cb *opc;
+ struct ofd_seq *oseq;
+
+ opc = container_of(cb, struct ofd_precreate_cb, opc_cb);
+ oseq = opc->opc_oseq;
+
+ CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n",
+ opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress),
+ PFID(&oseq->os_oi.oi_fid), th->th_sync);
+ atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress);
+ ofd_seq_put(env, opc->opc_oseq);
+ OBD_FREE_PTR(opc);
+}
+
+static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th,
+ struct ofd_seq *oseq, int objects)
+{
+ struct ofd_precreate_cb *opc;
+ struct dt_txn_commit_cb *dcb;
+ int precreate, rc;
+
+ OBD_ALLOC_PTR(opc);
+ if (!opc)
+ return -ENOMEM;
+
+ precreate = atomic_read(&oseq->os_precreate_in_progress);
+ refcount_inc(&oseq->os_refc);
+ opc->opc_oseq = oseq;
+ opc->opc_objects = objects;
+ CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n",
+ opc->opc_objects, precreate,
+ PFID(&oseq->os_oi.oi_fid), th->th_sync);
+
+ if ((precreate + objects) >= (5 * OST_MAX_PRECREATE))
+ th->th_sync = 1;
+
+ dcb = &opc->opc_cb;
+ dcb->dcb_func = ofd_cb_precreate;
+ INIT_LIST_HEAD(&dcb->dcb_linkage);
+ strscpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name));
+
+ rc = dt_trans_cb_add(th, dcb);
+ if (rc) {
+ ofd_seq_put(env, oseq);
+ OBD_FREE_PTR(opc);
+ return rc;
+ }
+
+ atomic_add(objects, &oseq->os_precreate_in_progress);
+
+ return 0;
+}
+
/**
* Precreate the given number \a nr of objects in the given sequence \a oseq.
*
* update the inode. The ctime = 0 case is also handled specially in
* osd_inode_setattr(). See LU-221, LU-1042 for details.
*
- * \param[in] env execution environment
- * \param[in] ofd OFD device
- * \param[in] id object ID to start precreation from
- * \param[in] oseq object sequence
- * \param[in] nr number of objects to precreate
- * \param[in] sync synchronous precreation flag
+ * \param[in] env execution environment
+ * \param[in] ofd OFD device
+ * \param[in] id object ID to start precreation from
+ * \param[in] oseq object sequence
+ * \param[in] nr number of objects to precreate
+ * \param[in] sync synchronous precreation flag
+ * \param[in] trans_local start local transaction
*
* \retval 0 if successful
* \retval negative value on error
*/
int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
- u64 id, struct ofd_seq *oseq, int nr, int sync)
+ u64 id, struct ofd_seq *oseq, int nr, int sync,
+ bool trans_local)
{
struct ofd_thread_info *info = ofd_info(env);
struct ofd_object *fo = NULL;
ENTRY;
- /* Don't create objects beyond the valid range for this SEQ */
+ /* Don't create objects beyond the valid range for this SEQ
+ * Last object to create is (id + nr - 1), but we move -1 on LHS
+ * to +1 on RHS to evaluate constant at compile time. LU-11186
+ */
if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
- (id + nr) >= IDIF_MAX_OID)) {
+ id + nr > IDIF_MAX_OID + 1)) {
CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
RETURN(rc = -ENOSPC);
} else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
- (id + nr) >= OBIF_MAX_OID)) {
+ id + nr > OBIF_MAX_OID + 1)) {
CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
RETURN(rc = -ENOSPC);
}
- OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *));
+ OBD_ALLOC_PTR_ARRAY(batch, nr_saved);
if (batch == NULL)
RETURN(-ENOMEM);
break;
}
- ofd_write_lock(env, fo);
batch[i] = fo;
}
info->fti_buf.lb_buf = &tmp;
}
}
- rc = dt_trans_start_local(env, ofd->ofd_osd, th);
+ /* Only needed for MDS+OSS rolling upgrade interop with 2.16+older. */
+ if (unlikely(trans_local))
+ rc = dt_trans_start_local(env, ofd->ofd_osd, th);
+ else
+ rc = dt_trans_start(env, ofd->ofd_osd, th);
if (rc)
GOTO(trans_stop, rc);
* To make above mechanism to work, before OFD pre-create OST-objects,
* it needs to update the LAST_ID file firstly, otherwise, the LFSCK
* may cannot get latest last_id although new OST-object created. */
- if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
+ if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
tmp = cpu_to_le64(id + nr - 1);
- dt_write_lock(env, oseq->os_lastid_obj, 0);
+ dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
rc = dt_record_write(env, oseq->os_lastid_obj,
&info->fti_buf, &info->fti_off, th);
dt_write_unlock(env, oseq->os_lastid_obj);
fo = batch[i];
LASSERT(fo);
+ ofd_write_lock(env, fo);
+
/* Only the new created objects need to be recorded. */
if (ofd->ofd_osd->dd_record_fid_accessed) {
struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
}
if (likely(!ofd_object_exists(fo) &&
- !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
+ !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
next = ofd_object_child(fo);
LASSERT(next != NULL);
rc = dt_create(env, next, &info->fti_attr, NULL,
&info->fti_dof, th);
+ ofd_write_unlock(env, fo);
if (rc < 0) {
if (i == 0)
GOTO(trans_stop, rc);
break;
}
LASSERT(ofd_object_exists(fo));
+ } else {
+ ofd_write_unlock(env, fo);
}
+
ofd_seq_last_oid_set(oseq, id + i);
}
info->fti_off = 0;
tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
- dt_write_lock(env, oseq->os_lastid_obj, 0);
+ dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
rc1 = dt_record_write(env, oseq->os_lastid_obj,
&info->fti_buf, &info->fti_off, th);
dt_write_unlock(env, oseq->os_lastid_obj);
ofd_seq_last_oid(oseq));
}
+ if (objects)
+ ofd_precreate_cb_add(env, th, oseq, objects);
trans_stop:
rc2 = ofd_trans_stop(env, ofd, th, rc);
if (rc2)
out:
for (i = 0; i < nr_saved; i++) {
fo = batch[i];
- if (fo) {
- ofd_write_unlock(env, fo);
- ofd_object_put(env, fo);
- }
+ if (!fo)
+ continue;
+ ofd_object_put(env, fo);
}
- OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *));
+ OBD_FREE_PTR_ARRAY(batch, nr_saved);
CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
"created %d/%d objects: %d\n", objects, nr_saved, rc);
PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
ff->ff_layout_version, oa->o_layout_version);
- /* only the MDS has the authority to update layout version */
- if (!(exp_connect_flags(ofd_info(env)->fti_exp) &
- OBD_CONNECT_MDS)) {
- CERROR(DFID": update layout version from client\n",
- PFID(&fo->ofo_ff.ff_parent));
-
- RETURN(-EPERM);
+ /**
+ * resync write from client on non-primary objects and
+ * resync start from MDS on primary objects will contain
+ * LU_LAYOUT_RESYNC flag in the @oa.
+ *
+ * The layout version checking for write/punch from client
+ * happens in ofd_verify_layout_version() before coming to
+ * here, so that resync with smaller layout version client
+ * will be rejected there, the biggest resync version will
+ * be recorded in the OFD objects.
+ */
+ if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
+ /* this opens a new era of writing */
+ ff->ff_layout_version = 0;
+ ff->ff_range = 0;
}
/* it's not allowed to change it to a smaller value */
- if (oa->o_layout_version < ff->ff_layout_version)
+ if (ofd_layout_version_less(oa->o_layout_version,
+ ff->ff_layout_version))
RETURN(-EINVAL);
- if (ff->ff_layout_version == 0) {
+ if (ff->ff_layout_version == 0 ||
+ oa->o_layout_version & LU_LAYOUT_RESYNC) {
+ /* if LU_LAYOUT_RESYNC is set, it closes the era of
+ * writing. Only mirror I/O can write this object. */
ff->ff_layout_version = oa->o_layout_version;
ff->ff_range = 0;
} else if (oa->o_layout_version > ff->ff_layout_version) {
- ff->ff_range = MAX(ff->ff_range,
- oa->o_layout_version - ff->ff_layout_version);
+ ff->ff_range = max_t(__u32, ff->ff_range,
+ oa->o_layout_version -
+ ff->ff_layout_version);
}
}
int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
struct lu_attr *la, struct obdo *oa)
{
- struct ofd_thread_info *info = ofd_info(env);
- struct ofd_device *ofd = ofd_obj2dev(fo);
- struct filter_fid *ff = &info->fti_mds_fid;
- struct thandle *th;
- struct ofd_mod_data *fmd;
- int fl;
- int rc;
- int rc2;
+ struct ofd_thread_info *info = ofd_info(env);
+ struct ofd_device *ofd = ofd_obj2dev(fo);
+ struct filter_fid *ff = &info->fti_mds_fid;
+ struct thandle *th;
+ int fl, rc, rc2;
+
ENTRY;
- ofd_write_lock(env, fo);
if (!ofd_object_exists(fo))
- GOTO(unlock, rc = -ENOENT);
-
- if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
- fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
- if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
- fmd->fmd_mactime_xid = info->fti_xid;
- ofd_fmd_put(info->fti_exp, fmd);
- }
+ GOTO(out, rc = -ENOENT);
/* VBR: version recovery check */
rc = ofd_version_get_check(info, fo);
if (rc)
- GOTO(unlock, rc);
+ GOTO(out, rc);
rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
if (rc != 0)
- GOTO(unlock, rc);
-
- fl = ofd_object_ff_update(env, fo, oa, ff);
- if (fl < 0)
- GOTO(unlock, rc = fl);
+ GOTO(out, rc);
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
- GOTO(unlock, rc = PTR_ERR(th));
+ GOTO(out, rc = PTR_ERR(th));
rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
if (rc)
GOTO(stop, rc);
- if (fl) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
- ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
- else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
- le32_add_cpu(&ff->ff_parent.f_oid, -1);
-
- info->fti_buf.lb_buf = ff;
- info->fti_buf.lb_len = sizeof(*ff);
- rc = dt_declare_xattr_set(env, ofd_object_child(fo),
- &info->fti_buf, XATTR_NAME_FID, fl,
- th);
- if (rc)
- GOTO(stop, rc);
- }
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
+ rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
+ XATTR_NAME_FID, 0, th);
+ if (rc)
+ GOTO(stop, rc);
rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
if (rc)
GOTO(stop, rc);
+ ofd_write_lock(env, fo);
+ if (!ofd_object_exists(fo))
+ GOTO(unlock, rc = -ENOENT);
+
+ /* serialize vs ofd_commitrw_write() */
+ if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
+ tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
+ info->fti_xid);
+
rc = dt_attr_set(env, ofd_object_child(fo), la, th);
if (rc)
- GOTO(stop, rc);
+ GOTO(unlock, rc);
+
+ fl = ofd_object_ff_update(env, fo, oa, ff);
+ if (fl < 0)
+ GOTO(unlock, rc = fl);
if (fl) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
- GOTO(stop, rc);
+ if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
+ ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
+ else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
+ le32_add_cpu(&ff->ff_parent.f_oid, -1);
+ else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
+ GOTO(unlock, rc);
info->fti_buf.lb_buf = ff;
info->fti_buf.lb_len = sizeof(*ff);
filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
}
- GOTO(stop, rc);
+ GOTO(unlock, rc);
+unlock:
+ ofd_write_unlock(env, fo);
stop:
rc2 = ofd_trans_stop(env, ofd, th, rc);
if (rc2)
ofd_name(ofd), rc2);
if (!rc)
rc = rc2;
+out:
+ return rc;
+}
-unlock:
- ofd_write_unlock(env, fo);
+/**
+ * Fallocate(Preallocate) space for OFD object.
+ *
+ * This function allocates space for the object from the \a start
+ * offset to the \a end offset.
+ *
+ * \param[in] env execution environment
+ * \param[in] fo OFD object
+ * \param[in] start start offset to allocate from
+ * \param[in] end end of allocate
+ * \param[in] mode fallocate mode
+ * \param[in] la object attributes
+ * \param[in] ff filter_fid structure
+ *
+ * \retval 0 if successful
+ * \retval negative value on error
+ */
+int ofd_object_fallocate(const struct lu_env *env, struct ofd_object *fo,
+ __u64 start, __u64 end, int mode, struct lu_attr *la,
+ struct obdo *oa)
+{
+ struct ofd_thread_info *info = ofd_info(env);
+ struct ofd_device *ofd = ofd_obj2dev(fo);
+ struct dt_object *dob = ofd_object_child(fo);
+ struct thandle *th;
+ struct filter_fid *ff = &info->fti_mds_fid;
+ bool ff_needed = false;
+ int rc;
- return rc;
+ ENTRY;
+
+ if (!ofd_object_exists(fo))
+ RETURN(-ENOENT);
+
+ /* VBR: version recovery check */
+ rc = ofd_version_get_check(info, fo);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (ff != NULL) {
+ rc = ofd_object_ff_load(env, fo);
+ if (rc == -ENODATA)
+ ff_needed = true;
+ else if (rc < 0)
+ RETURN(rc);
+
+ if (ff_needed) {
+ if (oa->o_valid & OBD_MD_FLFID) {
+ ff->ff_parent.f_seq = oa->o_parent_seq;
+ ff->ff_parent.f_oid = oa->o_parent_oid;
+ ff->ff_parent.f_ver = oa->o_stripe_idx;
+ }
+ if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
+ ff->ff_layout = oa->o_layout;
+ if (oa->o_valid & OBD_MD_LAYOUT_VERSION)
+ ff->ff_layout_version = oa->o_layout_version;
+ filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
+ }
+ }
+
+ th = ofd_trans_create(env, ofd);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = dt_declare_attr_set(env, dob, la, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = dt_declare_fallocate(env, dob, start, end, mode, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ if (ff_needed) {
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
+ rc = dt_declare_xattr_set(env, ofd_object_child(fo),
+ &info->fti_buf, XATTR_NAME_FID, 0,
+ th);
+ if (rc)
+ GOTO(stop, rc);
+ }
+
+ rc = ofd_trans_start(env, ofd, fo, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ ofd_read_lock(env, fo);
+ if (!ofd_object_exists(fo))
+ GOTO(unlock, rc = -ENOENT);
+
+ if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
+ tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
+ info->fti_xid);
+
+ rc = dt_falloc(env, dob, start, end, mode, th);
+ if (rc)
+ GOTO(unlock, rc);
+
+ rc = dt_attr_set(env, dob, la, th);
+ if (rc)
+ GOTO(unlock, rc);
+
+ if (ff_needed) {
+ rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
+ XATTR_NAME_FID, 0, th);
+ if (!rc)
+ filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
+ }
+unlock:
+ ofd_read_unlock(env, fo);
+stop:
+ ofd_trans_stop(env, ofd, th, rc);
+ RETURN(rc);
}
/**
__u64 start, __u64 end, struct lu_attr *la,
struct obdo *oa)
{
- struct ofd_thread_info *info = ofd_info(env);
- struct ofd_device *ofd = ofd_obj2dev(fo);
- struct ofd_mod_data *fmd;
- struct dt_object *dob = ofd_object_child(fo);
- struct filter_fid *ff = &info->fti_mds_fid;
- struct thandle *th;
- int fl;
- int rc;
- int rc2;
+ struct ofd_thread_info *info = ofd_info(env);
+ struct ofd_device *ofd = ofd_obj2dev(fo);
+ struct dt_object *dob = ofd_object_child(fo);
+ struct filter_fid *ff = &info->fti_mds_fid;
+ struct thandle *th;
+ int fl, rc, rc2;
ENTRY;
/* we support truncate, not punch yet */
LASSERT(end == OBD_OBJECT_EOF);
- ofd_write_lock(env, fo);
- fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
- if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
- fmd->fmd_mactime_xid = info->fti_xid;
- ofd_fmd_put(info->fti_exp, fmd);
-
if (!ofd_object_exists(fo))
- GOTO(unlock, rc = -ENOENT);
+ GOTO(out, rc = -ENOENT);
if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
rc = ofd_verify_ff(env, fo, oa);
if (rc != 0)
- GOTO(unlock, rc);
- }
-
- /* need to verify layout version */
- if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
- rc = ofd_verify_layout_version(env, fo, oa);
- if (rc)
- GOTO(unlock, rc);
-
- oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+ GOTO(out, rc);
}
/* VBR: version recovery check */
rc = ofd_version_get_check(info, fo);
if (rc)
- GOTO(unlock, rc);
+ GOTO(out, rc);
rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
if (rc != 0)
- GOTO(unlock, rc);
-
- fl = ofd_object_ff_update(env, fo, oa, ff);
- if (fl < 0)
- GOTO(unlock, rc = fl);
+ GOTO(out, rc);
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
- GOTO(unlock, rc = PTR_ERR(th));
+ GOTO(out, rc = PTR_ERR(th));
+ if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & LUSTRE_ENCRYPT_FL) {
+ /* punch must be aware we are dealing with an encrypted file */
+ la->la_valid |= LA_FLAGS;
+ la->la_flags |= LUSTRE_ENCRYPT_FL;
+ }
rc = dt_declare_attr_set(env, dob, la, th);
if (rc)
GOTO(stop, rc);
if (rc)
GOTO(stop, rc);
- if (fl) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
- ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
- else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
- le32_add_cpu(&ff->ff_parent.f_oid, -1);
-
- info->fti_buf.lb_buf = ff;
- info->fti_buf.lb_len = sizeof(*ff);
- rc = dt_declare_xattr_set(env, ofd_object_child(fo),
- &info->fti_buf, XATTR_NAME_FID, fl,
- th);
- if (rc)
- GOTO(stop, rc);
- }
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
+ rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
+ XATTR_NAME_FID, 0, th);
+ if (rc)
+ GOTO(stop, rc);
rc = ofd_trans_start(env, ofd, fo, th);
if (rc)
GOTO(stop, rc);
+ ofd_write_lock(env, fo);
+
+ if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
+ tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
+ info->fti_xid);
+
+ if (!ofd_object_exists(fo))
+ GOTO(unlock, rc = -ENOENT);
+
+ /* need to verify layout version */
+ if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+ rc = ofd_verify_layout_version(env, fo, oa);
+ if (rc)
+ GOTO(unlock, rc);
+ }
+
rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
if (rc)
- GOTO(stop, rc);
+ GOTO(unlock, rc);
+
+ fl = ofd_object_ff_update(env, fo, oa, ff);
+ if (fl < 0)
+ GOTO(unlock, rc = fl);
rc = dt_attr_set(env, dob, la, th);
if (rc)
- GOTO(stop, rc);
+ GOTO(unlock, rc);
if (fl) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
- GOTO(stop, rc);
+ if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
+ ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
+ else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
+ le32_add_cpu(&ff->ff_parent.f_oid, -1);
+ else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
+ GOTO(unlock, rc);
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
XATTR_NAME_FID, fl, th);
if (!rc)
filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
}
- GOTO(stop, rc);
+ GOTO(unlock, rc);
+unlock:
+ ofd_write_unlock(env, fo);
stop:
rc2 = ofd_trans_stop(env, ofd, th, rc);
if (rc2 != 0)
ofd_name(ofd), rc2);
if (!rc)
rc = rc2;
-unlock:
- ofd_write_unlock(env, fo);
-
+out:
return rc;
}
ENTRY;
- ofd_write_lock(env, fo);
if (!ofd_object_exists(fo))
- GOTO(unlock, rc = -ENOENT);
+ GOTO(out, rc = -ENOENT);
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
- GOTO(unlock, rc = PTR_ERR(th));
+ GOTO(out, rc = PTR_ERR(th));
rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
if (rc < 0)
if (rc)
GOTO(stop, rc);
- ofd_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
+ ofd_write_lock(env, fo);
+ if (!ofd_object_exists(fo))
+ GOTO(unlock, rc = -ENOENT);
+
+ tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
dt_ref_del(env, ofd_object_child(fo), th);
dt_destroy(env, ofd_object_child(fo), th);
+unlock:
+ ofd_write_unlock(env, fo);
stop:
rc2 = ofd_trans_stop(env, ofd, th, rc);
if (rc2)
ofd_name(ofd), rc2);
if (!rc)
rc = rc2;
-unlock:
- ofd_write_unlock(env, fo);
+out:
RETURN(rc);
}