From 9f79d4488fbb466647d1d09c2e6a1d3555d062fc Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Tue, 13 Feb 2018 16:10:37 +0300 Subject: [PATCH] LU-10048 ofd: take local locks within transaction The patch (with companion patch LU-10048 osd: async truncate) addresses long outstanding technical debt resulting in different locking order on MDT and OST. With OUT introduction this mismatch lead to deadlocks as OUT is used by the both sides and can't easily support two different locking models simultanously. The unified locking rules are: - open transaction (dt_trans_start()), - then take a lock (dt_{read|write}_lock()). Change-Id: I1eaeb58ae3d61869914293a9fea2d0a1faefe76b Signed-off-by: Alex Zhuravlev Reviewed-on: https://review.whamcloud.com/31293 Reviewed-by: Mike Pershin Reviewed-by: Andreas Dilger Reviewed-by: Lai Siyao Tested-by: jenkins Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/ofd/ofd_io.c | 86 +++++++++++--------- lustre/ofd/ofd_objects.c | 180 +++++++++++++++++++++-------------------- lustre/osd-ldiskfs/osd_quota.c | 21 ++--- lustre/ptlrpc/client.c | 3 +- 4 files changed, 148 insertions(+), 142 deletions(-) diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c index 936faf8..092e592 100644 --- a/lustre/ofd/ofd_io.c +++ b/lustre/ofd/ofd_io.c @@ -660,6 +660,12 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp, ofd_seq_put(env, oseq); } + /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some + * space back if possible, we have to do this outside of the lock as + * grant preparation may need to sync whole fs thus wait for all the + * transactions to complete. */ + tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt); + fo = ofd_object_find(env, ofd, fid); if (IS_ERR(fo)) GOTO(out, rc = PTR_ERR(fo)); @@ -695,10 +701,6 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp, oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; } - /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some - * space back if possible */ - tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt); - if (ptlrpc_connection_is_local(exp->exp_connection)) dbt |= DT_BUFS_TYPE_LOCAL; @@ -727,6 +729,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp, if (unlikely(rc != 0)) GOTO(err, rc); + ofd_read_unlock(env, fo); ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid, tot_bytes); RETURN(0); err: @@ -915,11 +918,8 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, if (rc != 0) GOTO(out, rc); - fl = ofd_object_ff_update(env, ofd_obj, oa, ff); - if (fl < 0) - GOTO(out, rc = fl); - - if (!la->la_valid && !fl) + if (!la->la_valid && !(oa->o_valid & + (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION))) /* no attributes to set */ GOTO(out, rc = 0); @@ -933,17 +933,10 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, GOTO(out_tx, rc); } - if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) - ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); - else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) - le32_add_cpu(&ff->ff_parent.f_oid, -1); - - rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf, - XATTR_NAME_FID, 0, th); - if (rc) - GOTO(out_tx, rc); - } + rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf, + XATTR_NAME_FID, 0, th); + if (rc) + GOTO(out_tx, rc); /* We don't need a transno for this operation which will be re-executed * anyway when the OST_WRITE (with a transno assigned) is replayed */ @@ -951,33 +944,42 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, if (rc) GOTO(out_tx, rc); + ofd_read_lock(env, ofd_obj); + /* set uid/gid/projid */ if (la->la_valid) { rc = dt_attr_set(env, dt_obj, la, th); if (rc) - GOTO(out_tx, rc); + GOTO(out_unlock, rc); } + fl = ofd_object_ff_update(env, ofd_obj, oa, ff); + if (fl <= 0) + GOTO(out_unlock, rc = fl); + /* set filter fid EA. * FIXME: it holds read lock of ofd object to modify the XATTR_NAME_FID * while the write lock should be held. However, it should work because * write RPCs only modify ff_{parent,layout} and those information will * be the same from all the write RPCs. The reason that fl is not used * in dt_xattr_set() is to allow this race. */ - if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) - GOTO(out_tx, rc); - - info->fti_buf.lb_buf = ff; - info->fti_buf.lb_len = sizeof(*ff); - rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, - 0, th); - if (rc == 0) - filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff)); - } - - GOTO(out_tx, rc); - + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(out_unlock, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); + rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, 0, th); + if (rc == 0) + filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff)); + + GOTO(out_unlock, rc); + +out_unlock: + ofd_read_unlock(env, ofd_obj); out_tx: dt_trans_stop(env, ofd->ofd_osd, th); out: @@ -1103,13 +1105,14 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp, fo = ofd_object_find(env, ofd, fid); LASSERT(fo != NULL); - LASSERT(ofd_object_exists(fo)); o = ofd_object_child(fo); LASSERT(o != NULL); if (old_rc) GOTO(out, rc = old_rc); + if (!ofd_object_exists(fo)) + GOTO(out, rc = -ENOENT); /* * The first write to each object must set some attributes. It is @@ -1182,21 +1185,27 @@ retry: if (rc) GOTO(out_stop, rc); + ofd_read_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(out_unlock, rc = -ENOENT); + if (likely(!fake_write)) { rc = dt_write_commit(env, o, lnb, niocount, th); if (rc) - GOTO(out_stop, rc); + GOTO(out_unlock, rc); } if (la->la_valid) { rc = dt_attr_set(env, o, la, th); if (rc) - GOTO(out_stop, rc); + GOTO(out_unlock, rc); } /* get attr to return */ rc = dt_attr_get(env, o, la); +out_unlock: + ofd_read_unlock(env, fo); out_stop: /* Force commit to make the just-deleted blocks * reusable. LU-456 */ @@ -1232,7 +1241,6 @@ out_stop: out: dt_bufs_put(env, o, lnb, niocount); - ofd_read_unlock(env, fo); ofd_object_put(env, fo); /* second put is pair to object_get in ofd_preprw_write */ ofd_object_put(env, fo); diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index 53e5b64..e98e72b 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -316,7 +316,6 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, break; } - ofd_write_lock(env, fo); batch[i] = fo; } info->fti_buf.lb_buf = &tmp; @@ -394,6 +393,8 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, fo = batch[i]; LASSERT(fo); + ofd_write_lock(env, fo); + /* Only the new created objects need to be recorded. */ if (ofd->ofd_osd->dd_record_fid_accessed) { struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl; @@ -410,6 +411,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, rc = dt_create(env, next, &info->fti_attr, NULL, &info->fti_dof, th); + ofd_write_unlock(env, fo); if (rc < 0) { if (i == 0) GOTO(trans_stop, rc); @@ -418,7 +420,10 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, break; } LASSERT(ofd_object_exists(fo)); + } else { + ofd_write_unlock(env, fo); } + ofd_seq_last_oid_set(oseq, id + i); } @@ -453,10 +458,9 @@ trans_stop: out: for (i = 0; i < nr_saved; i++) { fo = batch[i]; - if (fo) { - ofd_write_unlock(env, fo); - ofd_object_put(env, fo); - } + if (!fo) + continue; + ofd_object_put(env, fo); } OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *)); @@ -657,9 +661,8 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, ENTRY; - ofd_write_lock(env, fo); if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); + GOTO(out, rc = -ENOENT); if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid, @@ -668,50 +671,50 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) - GOTO(unlock, rc); + GOTO(out, rc); rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */); if (rc != 0) - GOTO(unlock, rc); - - fl = ofd_object_ff_update(env, fo, oa, ff); - if (fl < 0) - GOTO(unlock, rc = fl); + GOTO(out, rc); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) - GOTO(unlock, rc = PTR_ERR(th)); + GOTO(out, rc = PTR_ERR(th)); rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th); if (rc) GOTO(stop, rc); - if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) - ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); - else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) - le32_add_cpu(&ff->ff_parent.f_oid, -1); - - info->fti_buf.lb_buf = ff; - info->fti_buf.lb_len = sizeof(*ff); - rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, fl, - th); - if (rc) - GOTO(stop, rc); - } + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); + rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf, + XATTR_NAME_FID, 0, th); + if (rc) + GOTO(stop, rc); rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th); if (rc) GOTO(stop, rc); + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + rc = dt_attr_set(env, ofd_object_child(fo), la, th); if (rc) - GOTO(stop, rc); + GOTO(unlock, rc); + + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) - GOTO(stop, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(unlock, rc); info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); @@ -721,8 +724,10 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } - GOTO(stop, rc); + GOTO(unlock, rc); +unlock: + ofd_write_unlock(env, fo); stop: rc2 = ofd_trans_stop(env, ofd, th, rc); if (rc2) @@ -730,10 +735,7 @@ stop: ofd_name(ofd), rc2); if (!rc) rc = rc2; - -unlock: - ofd_write_unlock(env, fo); - +out: return rc; } @@ -771,45 +773,27 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, /* we support truncate, not punch yet */ LASSERT(end == OBD_OBJECT_EOF); - ofd_write_lock(env, fo); - if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) - tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid, - info->fti_xid); - if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); + GOTO(out, rc = -ENOENT); if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) { rc = ofd_verify_ff(env, fo, oa); if (rc != 0) - GOTO(unlock, rc); - } - - /* need to verify layout version */ - if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { - rc = ofd_verify_layout_version(env, fo, oa); - if (rc) - GOTO(unlock, rc); - - oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + GOTO(out, rc); } /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) - GOTO(unlock, rc); + GOTO(out, rc); rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */); if (rc != 0) - GOTO(unlock, rc); - - fl = ofd_object_ff_update(env, fo, oa, ff); - if (fl < 0) - GOTO(unlock, rc = fl); + GOTO(out, rc); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) - GOTO(unlock, rc = PTR_ERR(th)); + GOTO(out, rc = PTR_ERR(th)); rc = dt_declare_attr_set(env, dob, la, th); if (rc) @@ -819,45 +803,67 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) - ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); - else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) - le32_add_cpu(&ff->ff_parent.f_oid, -1); - - info->fti_buf.lb_buf = ff; - info->fti_buf.lb_len = sizeof(*ff); - rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, fl, - th); - if (rc) - GOTO(stop, rc); - } + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); + rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf, + XATTR_NAME_FID, 0, th); + if (rc) + GOTO(stop, rc); rc = ofd_trans_start(env, ofd, fo, th); if (rc) GOTO(stop, rc); + ofd_write_lock(env, fo); + + if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) + tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid, + info->fti_xid); + + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + /* need to verify layout version */ + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + rc = ofd_verify_layout_version(env, fo, oa); + if (rc) + GOTO(unlock, rc); + + oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + } + rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th); if (rc) - GOTO(stop, rc); + GOTO(unlock, rc); + + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); rc = dt_attr_set(env, dob, la, th); if (rc) - GOTO(stop, rc); + GOTO(unlock, rc); if (fl) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) - GOTO(stop, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) + ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) + le32_add_cpu(&ff->ff_parent.f_oid, -1); + else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) + GOTO(unlock, rc); + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, XATTR_NAME_FID, fl, th); if (!rc) filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } - GOTO(stop, rc); + GOTO(unlock, rc); +unlock: + ofd_write_unlock(env, fo); stop: rc2 = ofd_trans_stop(env, ofd, th, rc); if (rc2 != 0) @@ -865,9 +871,7 @@ stop: ofd_name(ofd), rc2); if (!rc) rc = rc2; -unlock: - ofd_write_unlock(env, fo); - +out: return rc; } @@ -895,13 +899,12 @@ int ofd_destroy(const struct lu_env *env, struct ofd_object *fo, ENTRY; - ofd_write_lock(env, fo); if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); + GOTO(out, rc = -ENOENT); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) - GOTO(unlock, rc = PTR_ERR(th)); + GOTO(out, rc = PTR_ERR(th)); rc = dt_declare_ref_del(env, ofd_object_child(fo), th); if (rc < 0) @@ -918,10 +921,16 @@ int ofd_destroy(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(stop, rc = -ENOENT); + tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid); dt_ref_del(env, ofd_object_child(fo), th); dt_destroy(env, ofd_object_child(fo), th); + ofd_write_unlock(env, fo); + stop: rc2 = ofd_trans_stop(env, ofd, th, rc); if (rc2) @@ -929,8 +938,7 @@ stop: ofd_name(ofd), rc2); if (!rc) rc = rc2; -unlock: - ofd_write_unlock(env, fo); +out: RETURN(rc); } diff --git a/lustre/osd-ldiskfs/osd_quota.c b/lustre/osd-ldiskfs/osd_quota.c index 6a0dd48..03552be 100644 --- a/lustre/osd-ldiskfs/osd_quota.c +++ b/lustre/osd-ldiskfs/osd_quota.c @@ -580,31 +580,20 @@ int osd_declare_qid(const struct lu_env *env, struct osd_thandle *oh, if (obj != NULL) inode = obj->oo_inode; - /* root ID entry should be always present in the quota file */ if (qi->lqi_id.qid_uid == 0) { + /* root ID should be always present in the quota file */ crd = 1; } else { - /* - * used space for this ID could be dropped to zero, + /* can't rely on the current state as it can change + * by the execution. + * if used space for this ID could be dropped to zero, * reserve extra credits for removing ID entry from * the quota file */ if (qi->lqi_space < 0) crd = LDISKFS_QUOTA_DEL_BLOCKS(osd_sb(dev)); - /* - * reserve credits for adding ID entry to the quota - * file if the i_dquot isn't initialized yet. - */ - else if (inode == NULL || -#ifdef HAVE_EXT4_INFO_DQUOT - LDISKFS_I(inode)->i_dquot[qi->lqi_type] == - NULL) -#else - inode->i_dquot[qi->lqi_type] == NULL) -#endif - crd = LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev)); else - crd = 1; + crd = LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev)); } osd_trans_declare_op(env, oh, OSD_OT_QUOTA, crd); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 529a6fd..e482e56 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -1467,7 +1467,8 @@ static int after_reply(struct ptlrpc_request *req) ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) { time64_t now = ktime_get_real_seconds(); - DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS"); + DEBUG_REQ(req->rq_nr_resend > 0 ? D_ERROR : D_RPCTRACE, req, + "Resending request on EINPROGRESS"); spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); -- 1.8.3.1