* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
return 0;
}
+struct ofd_precreate_cb {
+ struct dt_txn_commit_cb opc_cb;
+ struct ofd_seq *opc_oseq;
+ int opc_objects;
+};
+
+static void ofd_cb_precreate(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *cb, int err)
+{
+ struct ofd_precreate_cb *opc;
+ struct ofd_seq *oseq;
+
+ opc = container_of(cb, struct ofd_precreate_cb, opc_cb);
+ oseq = opc->opc_oseq;
+
+ CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n",
+ opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress),
+ PFID(&oseq->os_oi.oi_fid), th->th_sync);
+ atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress);
+ ofd_seq_put(env, opc->opc_oseq);
+ OBD_FREE_PTR(opc);
+}
+
+static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th,
+ struct ofd_seq *oseq, int objects)
+{
+ struct ofd_precreate_cb *opc;
+ struct dt_txn_commit_cb *dcb;
+ int precreate, rc;
+
+ OBD_ALLOC_PTR(opc);
+ if (!opc)
+ return -ENOMEM;
+
+ precreate = atomic_read(&oseq->os_precreate_in_progress);
+ atomic_inc(&oseq->os_refc);
+ opc->opc_oseq = oseq;
+ opc->opc_objects = objects;
+ CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n",
+ opc->opc_objects, precreate,
+ PFID(&oseq->os_oi.oi_fid), th->th_sync);
+
+ if ((precreate + objects) >= (5 * OST_MAX_PRECREATE))
+ th->th_sync = 1;
+
+ dcb = &opc->opc_cb;
+ dcb->dcb_func = ofd_cb_precreate;
+ INIT_LIST_HEAD(&dcb->dcb_linkage);
+ strlcpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name));
+
+ rc = dt_trans_cb_add(th, dcb);
+ if (rc) {
+ ofd_seq_put(env, oseq);
+ OBD_FREE_PTR(opc);
+ return rc;
+ }
+
+ atomic_add(objects, &oseq->os_precreate_in_progress);
+
+ return 0;
+}
+
/**
* Precreate the given number \a nr of objects in the given sequence \a oseq.
*
/* Don't create objects beyond the valid range for this SEQ */
if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
- (id + nr) >= IDIF_MAX_OID)) {
+ (id + nr) > IDIF_MAX_OID)) {
CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
RETURN(rc = -ENOSPC);
} else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
- (id + nr) >= OBIF_MAX_OID)) {
+ (id + nr) > OBIF_MAX_OID)) {
CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
RETURN(rc = -ENOSPC);
}
- OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *));
+ OBD_ALLOC(batch, nr_saved * sizeof(*batch));
if (batch == NULL)
RETURN(-ENOMEM);
break;
}
- ofd_write_lock(env, fo);
batch[i] = fo;
}
info->fti_buf.lb_buf = &tmp;
* may cannot get latest last_id although new OST-object created. */
if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
tmp = cpu_to_le64(id + nr - 1);
- dt_write_lock(env, oseq->os_lastid_obj, 0);
+ dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
rc = dt_record_write(env, oseq->os_lastid_obj,
&info->fti_buf, &info->fti_off, th);
dt_write_unlock(env, oseq->os_lastid_obj);
fo = batch[i];
LASSERT(fo);
+ ofd_write_lock(env, fo);
+
/* Only the new created objects need to be recorded. */
if (ofd->ofd_osd->dd_record_fid_accessed) {
struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
rc = dt_create(env, next, &info->fti_attr, NULL,
&info->fti_dof, th);
+ ofd_write_unlock(env, fo);
if (rc < 0) {
if (i == 0)
GOTO(trans_stop, rc);
break;
}
LASSERT(ofd_object_exists(fo));
+ } else {
+ ofd_write_unlock(env, fo);
}
+
ofd_seq_last_oid_set(oseq, id + i);
}
info->fti_off = 0;
tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
- dt_write_lock(env, oseq->os_lastid_obj, 0);
+ dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
rc1 = dt_record_write(env, oseq->os_lastid_obj,
&info->fti_buf, &info->fti_off, th);
dt_write_unlock(env, oseq->os_lastid_obj);
ofd_seq_last_oid(oseq));
}
+ if (objects)
+ ofd_precreate_cb_add(env, th, oseq, objects);
trans_stop:
rc2 = ofd_trans_stop(env, ofd, th, rc);
if (rc2)
out:
for (i = 0; i < nr_saved; i++) {
fo = batch[i];
- if (fo) {
- ofd_write_unlock(env, fo);
- ofd_object_put(env, fo);
- }
+ if (!fo)
+ continue;
+ ofd_object_put(env, fo);
}
- OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *));
+ OBD_FREE(batch, nr_saved * sizeof(*batch));
CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
"created %d/%d objects: %d\n", objects, nr_saved, rc);
RETURN(-EPERM);
}
+ if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
+ /* this opens a new era of writing */
+ ff->ff_layout_version = 0;
+ ff->ff_range = 0;
+ }
+
/* it's not allowed to change it to a smaller value */
if (oa->o_layout_version < ff->ff_layout_version)
RETURN(-EINVAL);
- if (ff->ff_layout_version == 0) {
+ if (ff->ff_layout_version == 0 ||
+ oa->o_layout_version & LU_LAYOUT_RESYNC) {
+ /* if LU_LAYOUT_RESYNC is set, it closes the era of
+ * writing. Only mirror I/O can write this object. */
ff->ff_layout_version = oa->o_layout_version;
ff->ff_range = 0;
} else if (oa->o_layout_version > ff->ff_layout_version) {
- ff->ff_range = MAX(ff->ff_range,
- oa->o_layout_version - ff->ff_layout_version);
+ ff->ff_range = max_t(__u32, ff->ff_range,
+ oa->o_layout_version -
+ ff->ff_layout_version);
}
}
int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
struct lu_attr *la, struct obdo *oa)
{
- struct ofd_thread_info *info = ofd_info(env);
- struct ofd_device *ofd = ofd_obj2dev(fo);
- struct filter_fid *ff = &info->fti_mds_fid;
- struct thandle *th;
- struct ofd_mod_data *fmd;
- int fl;
- int rc;
- int rc2;
+ struct ofd_thread_info *info = ofd_info(env);
+ struct ofd_device *ofd = ofd_obj2dev(fo);
+ struct filter_fid *ff = &info->fti_mds_fid;
+ struct thandle *th;
+ int fl, rc, rc2;
+
ENTRY;
- ofd_write_lock(env, fo);
if (!ofd_object_exists(fo))
- GOTO(unlock, rc = -ENOENT);
-
- if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
- fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
- if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
- fmd->fmd_mactime_xid = info->fti_xid;
- ofd_fmd_put(info->fti_exp, fmd);
- }
+ GOTO(out, rc = -ENOENT);
/* VBR: version recovery check */
rc = ofd_version_get_check(info, fo);
if (rc)
- GOTO(unlock, rc);
+ GOTO(out, rc);
rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
if (rc != 0)
- GOTO(unlock, rc);
-
- fl = ofd_object_ff_update(env, fo, oa, ff);
- if (fl < 0)
- GOTO(unlock, rc = fl);
+ GOTO(out, rc);
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
- GOTO(unlock, rc = PTR_ERR(th));
+ GOTO(out, rc = PTR_ERR(th));
rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
if (rc)
GOTO(stop, rc);
- if (fl) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
- ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
- else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
- le32_add_cpu(&ff->ff_parent.f_oid, -1);
-
- info->fti_buf.lb_buf = ff;
- info->fti_buf.lb_len = sizeof(*ff);
- rc = dt_declare_xattr_set(env, ofd_object_child(fo),
- &info->fti_buf, XATTR_NAME_FID, fl,
- th);
- if (rc)
- GOTO(stop, rc);
- }
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
+ rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
+ XATTR_NAME_FID, 0, th);
+ if (rc)
+ GOTO(stop, rc);
rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
if (rc)
GOTO(stop, rc);
+ ofd_write_lock(env, fo);
+ if (!ofd_object_exists(fo))
+ GOTO(unlock, rc = -ENOENT);
+
+ /* serialize vs ofd_commitrw_write() */
+ if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
+ tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
+ info->fti_xid);
+
rc = dt_attr_set(env, ofd_object_child(fo), la, th);
if (rc)
- GOTO(stop, rc);
+ GOTO(unlock, rc);
+
+ fl = ofd_object_ff_update(env, fo, oa, ff);
+ if (fl < 0)
+ GOTO(unlock, rc = fl);
if (fl) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
- GOTO(stop, rc);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
+ ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
+ else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
+ le32_add_cpu(&ff->ff_parent.f_oid, -1);
+ else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
+ GOTO(unlock, rc);
info->fti_buf.lb_buf = ff;
info->fti_buf.lb_len = sizeof(*ff);
filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
}
- GOTO(stop, rc);
+ GOTO(unlock, rc);
+unlock:
+ ofd_write_unlock(env, fo);
stop:
rc2 = ofd_trans_stop(env, ofd, th, rc);
if (rc2)
ofd_name(ofd), rc2);
if (!rc)
rc = rc2;
-
-unlock:
- ofd_write_unlock(env, fo);
-
+out:
return rc;
}
__u64 start, __u64 end, struct lu_attr *la,
struct obdo *oa)
{
- struct ofd_thread_info *info = ofd_info(env);
- struct ofd_device *ofd = ofd_obj2dev(fo);
- struct ofd_mod_data *fmd;
- struct dt_object *dob = ofd_object_child(fo);
- struct filter_fid *ff = &info->fti_mds_fid;
- struct thandle *th;
- int fl;
- int rc;
- int rc2;
+ struct ofd_thread_info *info = ofd_info(env);
+ struct ofd_device *ofd = ofd_obj2dev(fo);
+ struct dt_object *dob = ofd_object_child(fo);
+ struct filter_fid *ff = &info->fti_mds_fid;
+ struct thandle *th;
+ int fl, rc, rc2;
ENTRY;
/* we support truncate, not punch yet */
LASSERT(end == OBD_OBJECT_EOF);
- ofd_write_lock(env, fo);
- fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
- if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
- fmd->fmd_mactime_xid = info->fti_xid;
- ofd_fmd_put(info->fti_exp, fmd);
-
if (!ofd_object_exists(fo))
- GOTO(unlock, rc = -ENOENT);
+ GOTO(out, rc = -ENOENT);
if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
rc = ofd_verify_ff(env, fo, oa);
if (rc != 0)
- GOTO(unlock, rc);
- }
-
- /* need to verify layout version */
- if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
- rc = ofd_verify_layout_version(env, fo, oa);
- if (rc)
- GOTO(unlock, rc);
-
- oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+ GOTO(out, rc);
}
/* VBR: version recovery check */
rc = ofd_version_get_check(info, fo);
if (rc)
- GOTO(unlock, rc);
+ GOTO(out, rc);
rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
if (rc != 0)
- GOTO(unlock, rc);
-
- fl = ofd_object_ff_update(env, fo, oa, ff);
- if (fl < 0)
- GOTO(unlock, rc = fl);
+ GOTO(out, rc);
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
- GOTO(unlock, rc = PTR_ERR(th));
+ GOTO(out, rc = PTR_ERR(th));
rc = dt_declare_attr_set(env, dob, la, th);
if (rc)
if (rc)
GOTO(stop, rc);
- if (fl) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
- ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
- else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
- le32_add_cpu(&ff->ff_parent.f_oid, -1);
-
- info->fti_buf.lb_buf = ff;
- info->fti_buf.lb_len = sizeof(*ff);
- rc = dt_declare_xattr_set(env, ofd_object_child(fo),
- &info->fti_buf, XATTR_NAME_FID, fl,
- th);
- if (rc)
- GOTO(stop, rc);
- }
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
+ rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
+ XATTR_NAME_FID, 0, th);
+ if (rc)
+ GOTO(stop, rc);
rc = ofd_trans_start(env, ofd, fo, th);
if (rc)
GOTO(stop, rc);
+ ofd_write_lock(env, fo);
+
+ if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
+ tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
+ info->fti_xid);
+
+ if (!ofd_object_exists(fo))
+ GOTO(unlock, rc = -ENOENT);
+
+ /* need to verify layout version */
+ if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+ rc = ofd_verify_layout_version(env, fo, oa);
+ if (rc)
+ GOTO(unlock, rc);
+
+ oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+ }
+
rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
if (rc)
- GOTO(stop, rc);
+ GOTO(unlock, rc);
+
+ fl = ofd_object_ff_update(env, fo, oa, ff);
+ if (fl < 0)
+ GOTO(unlock, rc = fl);
rc = dt_attr_set(env, dob, la, th);
if (rc)
- GOTO(stop, rc);
+ GOTO(unlock, rc);
if (fl) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
- GOTO(stop, rc);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
+ ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
+ else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
+ le32_add_cpu(&ff->ff_parent.f_oid, -1);
+ else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
+ GOTO(unlock, rc);
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
XATTR_NAME_FID, fl, th);
if (!rc)
filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
}
- GOTO(stop, rc);
+ GOTO(unlock, rc);
+unlock:
+ ofd_write_unlock(env, fo);
stop:
rc2 = ofd_trans_stop(env, ofd, th, rc);
if (rc2 != 0)
ofd_name(ofd), rc2);
if (!rc)
rc = rc2;
-unlock:
- ofd_write_unlock(env, fo);
-
+out:
return rc;
}
ENTRY;
- ofd_write_lock(env, fo);
if (!ofd_object_exists(fo))
- GOTO(unlock, rc = -ENOENT);
+ GOTO(out, rc = -ENOENT);
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
- GOTO(unlock, rc = PTR_ERR(th));
+ GOTO(out, rc = PTR_ERR(th));
rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
if (rc < 0)
if (rc)
GOTO(stop, rc);
- ofd_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
+ ofd_write_lock(env, fo);
+ if (!ofd_object_exists(fo))
+ GOTO(stop, rc = -ENOENT);
+
+ tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
dt_ref_del(env, ofd_object_child(fo), th);
dt_destroy(env, ofd_object_child(fo), th);
+ ofd_write_unlock(env, fo);
+
stop:
rc2 = ofd_trans_stop(env, ofd, th, rc);
if (rc2)
ofd_name(ofd), rc2);
if (!rc)
rc = rc2;
-unlock:
- ofd_write_unlock(env, fo);
+out:
RETURN(rc);
}