X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fofd%2Fofd_objects.c;h=85791583ed35fa31a8b23c1b0ead60f1230cbe3a;hb=95f579520f793f2a2f357a51add59b62e783775e;hp=ec5fb4f78cfd8f9ae78cf680c6c91d056c27b19b;hpb=10da8afb278634a40be72f48dae42ce9755c62a0;p=fs%2Flustre-release.git diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index ec5fb4f..8579158 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -23,7 +23,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2014, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -160,6 +160,68 @@ int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo) return 0; } +struct ofd_precreate_cb { + struct dt_txn_commit_cb opc_cb; + struct ofd_seq *opc_oseq; + int opc_objects; +}; + +static void ofd_cb_precreate(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err) +{ + struct ofd_precreate_cb *opc; + struct ofd_seq *oseq; + + opc = container_of(cb, struct ofd_precreate_cb, opc_cb); + oseq = opc->opc_oseq; + + CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n", + opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress), + PFID(&oseq->os_oi.oi_fid), th->th_sync); + atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress); + ofd_seq_put(env, opc->opc_oseq); + OBD_FREE_PTR(opc); +} + +static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th, + struct ofd_seq *oseq, int objects) +{ + struct ofd_precreate_cb *opc; + struct dt_txn_commit_cb *dcb; + int precreate, rc; + + OBD_ALLOC_PTR(opc); + if (!opc) + return -ENOMEM; + + precreate = atomic_read(&oseq->os_precreate_in_progress); + atomic_inc(&oseq->os_refc); + opc->opc_oseq = oseq; + opc->opc_objects = objects; + CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n", + opc->opc_objects, precreate, + PFID(&oseq->os_oi.oi_fid), th->th_sync); + + if ((precreate + objects) >= (5 * OST_MAX_PRECREATE)) + th->th_sync = 1; + + dcb = &opc->opc_cb; + dcb->dcb_func = ofd_cb_precreate; + INIT_LIST_HEAD(&dcb->dcb_linkage); + strlcpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name)); + + rc = dt_trans_cb_add(th, dcb); + if (rc) { + ofd_seq_put(env, oseq); + OBD_FREE_PTR(opc); + return rc; + } + + atomic_add(objects, &oseq->os_precreate_in_progress); + + return 0; +} + /** * Precreate the given number \a nr of objects in the given sequence \a oseq. * @@ -207,18 +269,18 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, /* Don't create objects beyond the valid range for this SEQ */ if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && - (id + nr) >= IDIF_MAX_OID)) { + (id + nr) > IDIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n", ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && - (id + nr) >= OBIF_MAX_OID)) { + (id + nr) > OBIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n", ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); } - OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *)); + OBD_ALLOC(batch, nr_saved * sizeof(*batch)); if (batch == NULL) RETURN(-ENOMEM); @@ -254,7 +316,6 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, break; } - ofd_write_lock(env, fo); batch[i] = fo; } info->fti_buf.lb_buf = &tmp; @@ -320,7 +381,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, * may cannot get latest last_id although new OST-object created. */ if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) { tmp = cpu_to_le64(id + nr - 1); - dt_write_lock(env, oseq->os_lastid_obj, 0); + dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID); rc = dt_record_write(env, oseq->os_lastid_obj, &info->fti_buf, &info->fti_off, th); dt_write_unlock(env, oseq->os_lastid_obj); @@ -332,6 +393,8 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, fo = batch[i]; LASSERT(fo); + ofd_write_lock(env, fo); + /* Only the new created objects need to be recorded. */ if (ofd->ofd_osd->dd_record_fid_accessed) { struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl; @@ -348,6 +411,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, rc = dt_create(env, next, &info->fti_attr, NULL, &info->fti_dof, th); + ofd_write_unlock(env, fo); if (rc < 0) { if (i == 0) GOTO(trans_stop, rc); @@ -356,7 +420,10 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, break; } LASSERT(ofd_object_exists(fo)); + } else { + ofd_write_unlock(env, fo); } + ofd_seq_last_oid_set(oseq, id + i); } @@ -368,7 +435,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, info->fti_off = 0; tmp = cpu_to_le64(ofd_seq_last_oid(oseq)); - dt_write_lock(env, oseq->os_lastid_obj, 0); + dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID); rc1 = dt_record_write(env, oseq->os_lastid_obj, &info->fti_buf, &info->fti_off, th); dt_write_unlock(env, oseq->os_lastid_obj); @@ -379,6 +446,8 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, ofd_seq_last_oid(oseq)); } + if (objects) + ofd_precreate_cb_add(env, th, oseq, objects); trans_stop: rc2 = ofd_trans_stop(env, ofd, th, rc); if (rc2) @@ -389,12 +458,11 @@ trans_stop: out: for (i = 0; i < nr_saved; i++) { fo = batch[i]; - if (fo) { - ofd_write_unlock(env, fo); - ofd_object_put(env, fo); - } + if (!fo) + continue; + ofd_object_put(env, fo); } - OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *)); + OBD_FREE(batch, nr_saved * sizeof(*batch)); CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER, "created %d/%d objects: %d\n", objects, nr_saved, rc); @@ -536,11 +604,20 @@ int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo, RETURN(-EPERM); } + if (ff->ff_layout_version & LU_LAYOUT_RESYNC) { + /* this opens a new era of writing */ + ff->ff_layout_version = 0; + ff->ff_range = 0; + } + /* it's not allowed to change it to a smaller value */ if (oa->o_layout_version < ff->ff_layout_version) RETURN(-EINVAL); - if (ff->ff_layout_version == 0) { + if (ff->ff_layout_version == 0 || + oa->o_layout_version & LU_LAYOUT_RESYNC) { + /* if LU_LAYOUT_RESYNC is set, it closes the era of + * writing. Only mirror I/O can write this object. */ ff->ff_layout_version = oa->o_layout_version; ff->ff_range = 0; } else if (oa->o_layout_version > ff->ff_layout_version) { @@ -576,74 +653,69 @@ int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo, int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, struct lu_attr *la, struct obdo *oa) { - struct ofd_thread_info *info = ofd_info(env); - struct ofd_device *ofd = ofd_obj2dev(fo); - struct filter_fid *ff = &info->fti_mds_fid; - struct thandle *th; - struct ofd_mod_data *fmd; - int fl; - int rc; - int rc2; + struct ofd_thread_info *info = ofd_info(env); + struct ofd_device *ofd = ofd_obj2dev(fo); + struct filter_fid *ff = &info->fti_mds_fid; + struct thandle *th; + int fl, rc, rc2; + ENTRY; - ofd_write_lock(env, fo); if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); - - if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) { - fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid); - if (fmd && fmd->fmd_mactime_xid < info->fti_xid) - fmd->fmd_mactime_xid = info->fti_xid; - ofd_fmd_put(info->fti_exp, fmd); - } + GOTO(out, rc = -ENOENT); /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) - GOTO(unlock, rc); + GOTO(out, rc); rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */); if (rc != 0) - GOTO(unlock, rc); - - fl = ofd_object_ff_update(env, fo, oa, ff); - if (fl < 0) - GOTO(unlock, rc = fl); + GOTO(out, rc); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) - GOTO(unlock, rc = PTR_ERR(th)); + GOTO(out, rc = PTR_ERR(th)); rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th); if (rc) GOTO(stop, rc); - if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) - ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); - else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) - le32_add_cpu(&ff->ff_parent.f_oid, -1); - - info->fti_buf.lb_buf = ff; - info->fti_buf.lb_len = sizeof(*ff); - rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, fl, - th); - if (rc) - GOTO(stop, rc); - } + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); + rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf, + XATTR_NAME_FID, 0, th); + if (rc) + GOTO(stop, rc); rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th); if (rc) GOTO(stop, rc); + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + /* serialize vs ofd_commitrw_write() */ + if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) + tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid, + info->fti_xid); + rc = dt_attr_set(env, ofd_object_child(fo), la, th); if (rc) - GOTO(stop, rc); + GOTO(unlock, rc); + + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) - GOTO(stop, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(unlock, rc); info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); @@ -653,8 +725,10 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } - GOTO(stop, rc); + GOTO(unlock, rc); +unlock: + ofd_write_unlock(env, fo); stop: rc2 = ofd_trans_stop(env, ofd, th, rc); if (rc2) @@ -662,10 +736,7 @@ stop: ofd_name(ofd), rc2); if (!rc) rc = rc2; - -unlock: - ofd_write_unlock(env, fo); - +out: return rc; } @@ -691,61 +762,39 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, __u64 start, __u64 end, struct lu_attr *la, struct obdo *oa) { - struct ofd_thread_info *info = ofd_info(env); - struct ofd_device *ofd = ofd_obj2dev(fo); - struct ofd_mod_data *fmd; - struct dt_object *dob = ofd_object_child(fo); - struct filter_fid *ff = &info->fti_mds_fid; - struct thandle *th; - int fl; - int rc; - int rc2; + struct ofd_thread_info *info = ofd_info(env); + struct ofd_device *ofd = ofd_obj2dev(fo); + struct dt_object *dob = ofd_object_child(fo); + struct filter_fid *ff = &info->fti_mds_fid; + struct thandle *th; + int fl, rc, rc2; ENTRY; /* we support truncate, not punch yet */ LASSERT(end == OBD_OBJECT_EOF); - ofd_write_lock(env, fo); - fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid); - if (fmd && fmd->fmd_mactime_xid < info->fti_xid) - fmd->fmd_mactime_xid = info->fti_xid; - ofd_fmd_put(info->fti_exp, fmd); - if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); + GOTO(out, rc = -ENOENT); if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) { rc = ofd_verify_ff(env, fo, oa); if (rc != 0) - GOTO(unlock, rc); - } - - /* need to verify layout version */ - if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { - rc = ofd_verify_layout_version(env, fo, oa); - if (rc) - GOTO(unlock, rc); - - oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + GOTO(out, rc); } /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) - GOTO(unlock, rc); + GOTO(out, rc); rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */); if (rc != 0) - GOTO(unlock, rc); - - fl = ofd_object_ff_update(env, fo, oa, ff); - if (fl < 0) - GOTO(unlock, rc = fl); + GOTO(out, rc); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) - GOTO(unlock, rc = PTR_ERR(th)); + GOTO(out, rc = PTR_ERR(th)); rc = dt_declare_attr_set(env, dob, la, th); if (rc) @@ -755,45 +804,67 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) - ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); - else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) - le32_add_cpu(&ff->ff_parent.f_oid, -1); - - info->fti_buf.lb_buf = ff; - info->fti_buf.lb_len = sizeof(*ff); - rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, fl, - th); - if (rc) - GOTO(stop, rc); - } + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); + rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf, + XATTR_NAME_FID, 0, th); + if (rc) + GOTO(stop, rc); rc = ofd_trans_start(env, ofd, fo, th); if (rc) GOTO(stop, rc); + ofd_write_lock(env, fo); + + if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) + tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid, + info->fti_xid); + + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + /* need to verify layout version */ + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + rc = ofd_verify_layout_version(env, fo, oa); + if (rc) + GOTO(unlock, rc); + + oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + } + rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th); if (rc) - GOTO(stop, rc); + GOTO(unlock, rc); + + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); rc = dt_attr_set(env, dob, la, th); if (rc) - GOTO(stop, rc); + GOTO(unlock, rc); if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) - GOTO(stop, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(unlock, rc); + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, XATTR_NAME_FID, fl, th); if (!rc) filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } - GOTO(stop, rc); + GOTO(unlock, rc); +unlock: + ofd_write_unlock(env, fo); stop: rc2 = ofd_trans_stop(env, ofd, th, rc); if (rc2 != 0) @@ -801,9 +872,7 @@ stop: ofd_name(ofd), rc2); if (!rc) rc = rc2; -unlock: - ofd_write_unlock(env, fo); - +out: return rc; } @@ -831,13 +900,12 @@ int ofd_destroy(const struct lu_env *env, struct ofd_object *fo, ENTRY; - ofd_write_lock(env, fo); if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); + GOTO(out, rc = -ENOENT); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) - GOTO(unlock, rc = PTR_ERR(th)); + GOTO(out, rc = PTR_ERR(th)); rc = dt_declare_ref_del(env, ofd_object_child(fo), th); if (rc < 0) @@ -854,10 +922,16 @@ int ofd_destroy(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - ofd_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid); + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(stop, rc = -ENOENT); + + tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid); dt_ref_del(env, ofd_object_child(fo), th); dt_destroy(env, ofd_object_child(fo), th); + ofd_write_unlock(env, fo); + stop: rc2 = ofd_trans_stop(env, ofd, th, rc); if (rc2) @@ -865,8 +939,7 @@ stop: ofd_name(ofd), rc2); if (!rc) rc = rc2; -unlock: - ofd_write_unlock(env, fo); +out: RETURN(rc); }