X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_open.c;h=b839eaa7966873ce62712f071afade72503fc7d1;hb=075bea805efe8a7ef1a3aabd8dd2c166bb52115b;hp=43d891cb15f8c8b0e356a4ccf278035368d54531;hpb=591a9b4cebc510ff51f0fdb944e5a81f08fdaf62;p=fs%2Flustre-release.git diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 43d891c..b839eaa 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -44,15 +44,7 @@ #include "mdt_internal.h" #include -/* we do nothing because we do not have refcount now */ -static void mdt_mfd_get(void *mfdp) -{ -} - -static struct portals_handle_ops mfd_open_handle_ops = { - .hop_addref = mdt_mfd_get, - .hop_free = NULL, -}; +static const char mfd_open_handle_owner[] = "mdt"; /* Create a new mdt_file_data struct, initialize it, * and insert it to global hash table */ @@ -63,10 +55,11 @@ struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med) OBD_ALLOC_PTR(mfd); if (mfd != NULL) { - INIT_LIST_HEAD_RCU(&mfd->mfd_open_handle.h_link); - mfd->mfd_open_handle.h_owner = med; + refcount_set(&mfd->mfd_open_handle.h_ref, 1); + INIT_HLIST_NODE(&mfd->mfd_open_handle.h_link); + mfd->mfd_owner = med; INIT_LIST_HEAD(&mfd->mfd_list); - class_handle_hash(&mfd->mfd_open_handle, &mfd_open_handle_ops); + class_handle_hash(&mfd->mfd_open_handle, mfd_open_handle_owner); } RETURN(mfd); @@ -86,9 +79,12 @@ struct mdt_file_data *mdt_open_handle2mfd(struct mdt_export_data *med, ENTRY; LASSERT(open_handle != NULL); - mfd = class_handle2object(open_handle->cookie, med); + mfd = class_handle2object(open_handle->cookie, mfd_open_handle_owner); + if (mfd) + refcount_dec(&mfd->mfd_open_handle.h_ref); + /* during dw/setattr replay the mfd can be found by old handle */ - if (mfd == NULL && is_replay_or_resent) { + if ((!mfd || mfd->mfd_owner != med) && is_replay_or_resent) { list_for_each_entry(mfd, &med->med_open_head, mfd_list) { if (mfd->mfd_open_handle_old.cookie == open_handle->cookie) @@ -103,8 +99,10 @@ struct mdt_file_data *mdt_open_handle2mfd(struct mdt_export_data *med, /* free mfd */ void mdt_mfd_free(struct mdt_file_data *mfd) { + LASSERT(refcount_read(&mfd->mfd_open_handle.h_ref) == 1); LASSERT(list_empty(&mfd->mfd_list)); - OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_open_handle); + OBD_FREE_PRE(mfd, sizeof(*mfd), "rcu"); + kfree_rcu(mfd, mfd_open_handle.h_rcu); } static int mdt_create_data(struct mdt_thread_info *info, @@ -309,7 +307,15 @@ static void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info, struct mdt_object *obj, struct md_attr *ma) { - LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL); + if (ma->ma_lmv || ma->ma_lmm) { + CDEBUG(D_INFO, DFID " %s already set.\n", + PFID(mdt_object_fid(obj)), + ma->ma_lmv ? (ma->ma_lmm ? "ma_lmv and ma_lmm" + : "ma_lmv") + : "ma_lmm"); + return; + } + if (S_ISDIR(obj->mot_header.loh_attr)) { ma->ma_lmv = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD); @@ -394,7 +400,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p, RETURN(rc); rc = mo_open(info->mti_env, mdt_object_child(o), - created ? open_flags | MDS_OPEN_CREATED : open_flags); + created ? open_flags | MDS_OPEN_CREATED : open_flags, + &info->mti_spec); if (rc != 0) { /* If we allow the client to chgrp (CFS_SETGRP_PERM), but the * client does not know which suppgid should be sent to the MDS, @@ -562,7 +569,7 @@ static int mdt_finish_open(struct mdt_thread_info *info, RETURN(-ENOENT); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL if (exp_connect_flags(exp) & OBD_CONNECT_ACL) { struct lu_nodemap *nodemap = nodemap_get_from_exp(exp); if (IS_ERR(nodemap)) @@ -802,8 +809,9 @@ static int mdt_object_open_lock(struct mdt_thread_info *info, bool try_layout = false; bool create_layout = false; int rc = 0; - int dom_stripes = LMM_NO_DOM; - bool dom_lock = false; + __u32 dom_stripe = 0; + unsigned int dom_only = 0; + unsigned int dom_lock = 0; ENTRY; @@ -821,23 +829,23 @@ static int mdt_object_open_lock(struct mdt_thread_info *info, ma->ma_need & MA_LOV) try_layout = true; - /* DoM files can have just MDT stripe or combined MDT + OST - * stripes. - * - In the first case the open for read/write will do IO to - * the MDT stripe and it makes sense to take IO lock in - * advance along with OPEN even if it is blocking lock. - * - In the second case it is just size of MDT stripe and it - * is quite unlikely that client will write into it, though - * it may read it. So IO lock will be taken optionally if it - * is non-blocking one. + /* DoM files can take IO lock at OPEN when it makes sense, + * check if file has DoM stripe and ask for lock if client + * no lock on that resource yet. */ if (ma->ma_valid & MA_LOV && ma->ma_lmm != NULL) - dom_stripes = mdt_lmm_dom_entry(ma->ma_lmm); - - if (dom_stripes == LMM_DOM_ONLY && - info->mti_mdt->mdt_opts.mo_dom_lock > 0 && + dom_stripe = mdt_lmm_dom_entry_check(ma->ma_lmm, + &dom_only); + /* If only DOM stripe is being used then we can expect IO + * to it after OPEN and will return corresponding DOM ibit + * using default strategy from mdt_opts.mo_dom_lock. + * Otherwise trylock mode is used always and DOM ibit will + * be returned optionally. + */ + if (dom_stripe && !mdt_dom_client_has_lock(info, mdt_object_fid(obj))) - dom_lock = true; + dom_lock = !dom_only ? TRYLOCK_DOM_ON_OPEN : + info->mti_mdt->mdt_opts.mo_dom_lock; } if (acq_lease) { @@ -892,17 +900,7 @@ static int mdt_object_open_lock(struct mdt_thread_info *info, lhc = &info->mti_lh[MDT_LH_LOCAL]; } else if (dom_lock) { lm = (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR; - if (info->mti_mdt->mdt_opts.mo_dom_lock == - TRYLOCK_DOM_ON_OPEN) { - trybits |= MDS_INODELOCK_DOM | - MDS_INODELOCK_LAYOUT; - } else { - /* mo_dom_lock == ALWAYS_DOM_LOCK_ON_OPEN */ - *ibits = MDS_INODELOCK_DOM; - if (info->mti_mdt->mdt_opts.mo_dom_read_open) { - trybits |= MDS_INODELOCK_LAYOUT; - } - } + trybits |= MDS_INODELOCK_DOM | MDS_INODELOCK_LAYOUT; } CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n", @@ -1219,7 +1217,13 @@ static int mdt_cross_open(struct mdt_thread_info *info, if (rc != 0) GOTO(out, rc); - mdt_pack_secctx_in_reply(info, o); + rc = mdt_pack_secctx_in_reply(info, o); + if (unlikely(rc)) + GOTO(out, rc); + + rc = mdt_pack_encctx_in_reply(info, o); + if (unlikely(rc)) + GOTO(out, rc); rc = mdt_finish_open(info, NULL, o, open_flags, 0, rep); } else { @@ -1309,13 +1313,14 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) int result, rc; int created = 0; int object_locked = 0; + enum ldlm_mode lock_mode = LCK_PR; u32 msg_flags; + ktime_t kstart = ktime_get(); ENTRY; OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE, (obd_timeout + 1) / 4); - mdt_counter_incr(req, LPROC_MDT_OPEN); repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); ma->ma_need = MA_INODE; @@ -1361,15 +1366,11 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) * via a regular replay. */ if (!(open_flags & MDS_OPEN_CREAT)) { DEBUG_REQ(D_ERROR, req, - "OPEN & CREAT not in open replay/by_fid."); + "OPEN & CREAT not in open replay/by_fid"); GOTO(out, result = -EFAULT); } CDEBUG(D_INFO, "No object(1), continue as regular open.\n"); - } else if (open_flags & (MDS_OPEN_BY_FID | MDS_OPEN_LOCK)) { - /* - * MDS_OPEN_LOCK is checked for backward compatibility with 2.1 - * client. - */ + } else if (open_flags & MDS_OPEN_BY_FID) { result = mdt_open_by_fid_lock(info, ldlm_rep, lhc); if (result < 0) CDEBUG(D_INFO, "no object for "DFID": %d\n", @@ -1390,82 +1391,60 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) if (result < 0) GOTO(out, result); -again: - lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh, (open_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR, - &rr->rr_name); - parent = mdt_object_find(info->mti_env, mdt, rr->rr_fid1); if (IS_ERR(parent)) GOTO(out, result = PTR_ERR(parent)); - result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE); - if (result != 0) { + /* get and check version of parent */ + result = mdt_version_get_check(info, parent, 0); + if (result) { mdt_object_put(info->mti_env, parent); GOTO(out, result); } - /* get and check version of parent */ - result = mdt_version_get_check(info, parent, 0); - if (result) - GOTO(out_parent, result); + OBD_RACE(OBD_FAIL_MDS_REINT_OPEN); +again_pw: + lh = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(lh, lock_mode, &rr->rr_name); + result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE); + if (result != 0) { + mdt_object_put(info->mti_env, parent); + GOTO(out, result); + } fid_zero(child_fid); - result = mdo_lookup(info->mti_env, mdt_object_child(parent), - &rr->rr_name, child_fid, &info->mti_spec); + result = -ENOENT; + if ((open_flags & MDS_OPEN_VOLATILE) == 0) + result = mdo_lookup(info->mti_env, mdt_object_child(parent), + &rr->rr_name, child_fid, &info->mti_spec); LASSERTF(ergo(result == 0, fid_is_sane(child_fid)), "looking for "DFID"/"DNAME", found FID = "DFID"\n", PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name), PFID(child_fid)); - if (result != 0 && result != -ENOENT && result != -ESTALE) + if (result != 0 && result != -ENOENT) GOTO(out_parent, result); - if (result == -ENOENT || result == -ESTALE) { - /* If the object is dead, let's check if the object - * is being migrated to a new object */ - if (result == -ESTALE) { - struct lu_buf lmv_buf; - - lmv_buf.lb_buf = info->mti_xattr_buf; - lmv_buf.lb_len = sizeof(info->mti_xattr_buf); - rc = mo_xattr_get(info->mti_env, - mdt_object_child(parent), - &lmv_buf, XATTR_NAME_LMV); - if (rc > 0) { - struct lmv_mds_md_v1 *lmv; - - lmv = lmv_buf.lb_buf; - if (le32_to_cpu(lmv->lmv_hash_type) & - LMV_HASH_FLAG_MIGRATION) { - /* Get the new parent FID and retry */ - mdt_object_unlock_put(info, parent, - lh, 1); - mdt_lock_handle_init(lh); - fid_le_to_cpu( - (struct lu_fid *)rr->rr_fid1, - &lmv->lmv_stripe_fids[1]); - goto again; - } - } - } + OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2); + if (result == -ENOENT) { mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); - if (result == -ESTALE) { - /* - * -ESTALE means the parent is a dead(unlinked) dir, so - * it should return -ENOENT to in accordance with the - * original mds implementaion. - */ - GOTO(out_parent, result = -ENOENT); - } - if (!(open_flags & MDS_OPEN_CREAT)) GOTO(out_parent, result); if (mdt_rdonly(req->rq_export)) GOTO(out_parent, result = -EROFS); + + if (lock_mode == LCK_PR) { + /* first pass: get write lock and restart */ + mdt_object_unlock(info, parent, lh, 1); + mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); + mdt_lock_handle_init(lh); + lock_mode = LCK_PW; + goto again_pw; + } + *child_fid = *info->mti_rr.rr_fid2; LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n", PFID(child_fid)); @@ -1491,7 +1470,8 @@ again: if (result == -ENOENT) { /* Create under OBF and .lustre is not permitted */ - if (!fid_is_md_operative(rr->rr_fid1)) + if (!fid_is_md_operative(rr->rr_fid1) && + (open_flags & MDS_OPEN_VOLATILE) == 0) GOTO(out_child, result = -EPERM); /* save versions in reply */ @@ -1528,7 +1508,8 @@ again: GOTO(out_child, result); } created = 1; - mdt_counter_incr(req, LPROC_MDT_MKNOD); + mdt_counter_incr(req, LPROC_MDT_MKNOD, + ktime_us_delta(ktime_get(), kstart)); } else { /* * The object is on remote node, return its FID for remote open. @@ -1559,6 +1540,14 @@ again: result = -MDT_EREMOTE_OPEN; GOTO(out_child, result); } else if (mdt_object_exists(child)) { + /* Check early for MDS_OPEN_DIRECTORY/O_DIRECTORY to + * avoid opening regular files from lfs getstripe + * since doing so breaks the leases used by lfs + * mirror. See LU-13693. */ + if (open_flags & MDS_OPEN_DIRECTORY && + S_ISREG(lu_object_attr(&child->mot_obj))) + GOTO(out_child, result = -ENOTDIR); + /* We have to get attr & LOV EA & HSM for this * object. */ mdt_prep_ma_buf_from_rep(info, child, ma); @@ -1575,7 +1564,16 @@ again: } } - mdt_pack_secctx_in_reply(info, child); + repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize; + repbody->mbo_valid |= OBD_MD_FLMODEASIZE; + + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) + GOTO(out_child, result = rc); + + rc = mdt_pack_encctx_in_reply(info, child); + if (unlikely(rc)) + GOTO(out_child, result = rc); rc = mdt_check_resent_lock(info, child, lhc); if (rc < 0) { @@ -1629,6 +1627,10 @@ again: mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE); } } + + mdt_counter_incr(req, LPROC_MDT_OPEN, + ktime_us_delta(ktime_get(), kstart)); + EXIT; out_child_unlock: if (object_locked) @@ -1693,7 +1695,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info, uc = lu_ucred(env); uc_cap_save = uc->uc_cap; - uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE; + uc->uc_cap |= BIT(CAP_DAC_OVERRIDE); rc = mdo_create(env, mdt_object_child(local_root), &lname, mdt_object_child(obj), spec, attr); uc->uc_cap = uc_cap_save; @@ -1703,7 +1705,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info, GOTO(out, rc); } - rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED); + rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED, spec); if (rc < 0) CERROR("%s: cannot open volatile file "DFID", orphan " "file will be left in PENDING directory until " @@ -1746,6 +1748,22 @@ static inline int mdt_hsm_set_released(struct lov_mds_md *lmm) return 0; } +static inline int mdt_get_lmm_gen(struct lov_mds_md *lmm, __u32 *gen) +{ + struct lov_comp_md_v1 *comp_v1; + + if (le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_COMP_V1)) { + comp_v1 = (struct lov_comp_md_v1 *)lmm; + *gen = le32_to_cpu(comp_v1->lcm_layout_gen); + } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 || + le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) { + *gen = le16_to_cpu(lmm->lmm_layout_gen); + } else { + return -EINVAL; + } + return 0; +} + static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma) { @@ -1805,19 +1823,66 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o, if (rc != 0) GOTO(out_unlock, rc); - if (!mdt_hsm_release_allow(ma)) - GOTO(out_unlock, rc = -EPERM); - - /* already released? */ - if (ma->ma_hsm.mh_flags & HS_RELEASED) - GOTO(out_unlock, rc = 0); + if (ma->ma_attr_flags & MDS_PCC_ATTACH) { + if (ma->ma_valid & MA_HSM) { + if (ma->ma_hsm.mh_flags & HS_RELEASED) + GOTO(out_unlock, rc = -EALREADY); + + if (ma->ma_hsm.mh_arch_id != data->cd_archive_id) + CDEBUG(D_CACHE, + DFID" archive id diff: %llu:%u\n", + PFID(mdt_object_fid(o)), + ma->ma_hsm.mh_arch_id, + data->cd_archive_id); + + if (!(ma->ma_hsm.mh_flags & HS_DIRTY) && + ma->ma_hsm.mh_arch_ver == data->cd_data_version) { + CDEBUG(D_CACHE, + DFID" data version matches: packed=%llu " + "and on-disk=%llu\n", + PFID(mdt_object_fid(o)), + data->cd_data_version, + ma->ma_hsm.mh_arch_ver); + ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS; + } - /* Compare on-disk and packed data_version */ - if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) { - CDEBUG(D_HSM, DFID" data_version mismatches: packed=%llu" - " and on-disk=%llu\n", PFID(mdt_object_fid(o)), - data->cd_data_version, ma->ma_hsm.mh_arch_ver); - GOTO(out_unlock, rc = -EPERM); + if (ma->ma_hsm.mh_flags & HS_DIRTY) + ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS; + } else { + /* Set up HSM attribte for PCC archived object */ + BUILD_BUG_ON(sizeof(struct hsm_attrs) > + sizeof(info->mti_xattr_buf)); + buf = &info->mti_buf; + buf->lb_buf = info->mti_xattr_buf; + buf->lb_len = sizeof(struct hsm_attrs); + memset(&ma->ma_hsm, 0, sizeof(ma->ma_hsm)); + ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS; + ma->ma_hsm.mh_arch_id = data->cd_archive_id; + ma->ma_hsm.mh_arch_ver = data->cd_data_version; + lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm); + + rc = mo_xattr_set(info->mti_env, mdt_object_child(o), + buf, XATTR_NAME_HSM, 0); + if (rc) + GOTO(out_unlock, rc); + } + } else { + if (!mdt_hsm_release_allow(ma)) + GOTO(out_unlock, rc = -EPERM); + + /* already released? */ + if (ma->ma_hsm.mh_flags & HS_RELEASED) + GOTO(out_unlock, rc = 0); + + /* Compare on-disk and packed data_version */ + if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) { + CDEBUG(D_HSM, DFID" data_version mismatches: " + "packed=%llu and on-disk=%llu\n", + PFID(mdt_object_fid(o)), + data->cd_data_version, + ma->ma_hsm.mh_arch_ver); + GOTO(out_unlock, rc = -EPERM); + } } ma->ma_valid = MA_INODE; @@ -1885,7 +1950,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o, } /* Set up HSM attribute for orphan object */ - CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf)); + BUILD_BUG_ON(sizeof(struct hsm_attrs) > sizeof(info->mti_xattr_buf)); buf = &info->mti_buf; buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(struct hsm_attrs); @@ -1902,7 +1967,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o, /* The orphan has root ownership so we need to raise * CAP_FOWNER to set the HSM attributes. */ cap = uc->uc_cap; - uc->uc_cap |= MD_CAP_TO_MASK(CFS_CAP_FOWNER); + uc->uc_cap |= MD_CAP_TO_MASK(CAP_FOWNER); rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf, XATTR_NAME_HSM, 0); uc->uc_cap = cap; @@ -1913,6 +1978,12 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o, rc = mo_swap_layouts(info->mti_env, mdt_object_child(o), mdt_object_child(orphan), SWAP_LAYOUTS_MDS_HSM); + + if (!rc && ma->ma_attr_flags & MDS_PCC_ATTACH) { + ma->ma_need = MA_LOV; + rc = mdt_attr_get_complex(info, o, ma); + } + EXIT; out_layout_lock: @@ -1939,10 +2010,17 @@ out_unlock: repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); LASSERT(repbody != NULL); repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED; + if (ma->ma_attr_flags & MDS_PCC_ATTACH) { + LASSERT(ma->ma_valid & MA_LOV); + rc = mdt_get_lmm_gen(ma->ma_lmm, + &repbody->mbo_layout_gen); + if (!rc) + repbody->mbo_valid |= OBD_MD_LAYOUT_VERSION; + } } out_reprocess: - ldlm_reprocess_all(lease->l_resource); + ldlm_reprocess_all(lease->l_resource, lease); LDLM_LOCK_PUT(lease); ma->ma_valid = 0; @@ -1958,7 +2036,7 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info, struct mdt_lock_handle *lh2 = &info->mti_lh[MDT_LH_OLD]; struct close_data *data; struct ldlm_lock *lease; - struct mdt_object *o1 = o, *o2; + struct mdt_object *o1 = o, *o2 = NULL; bool lease_broken; bool swap_objects; int rc; @@ -1992,10 +2070,11 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info, if (IS_ERR(o2)) GOTO(out_lease, rc = PTR_ERR(o2)); - if (!S_ISREG(lu_object_attr(&o2->mot_obj))) { - swap_objects = false; /* not swapped yet */ + if (!mdt_object_exists(o2)) + GOTO(out_obj, rc = -ENOENT); + + if (!S_ISREG(lu_object_attr(&o2->mot_obj))) GOTO(out_obj, rc = -EINVAL); - } if (swap_objects) swap(o1, o2); @@ -2103,9 +2182,11 @@ out_unlock_sem: } out_obj: - mdt_object_put(info->mti_env, swap_objects ? o1 : o2); + /* Callee takes care of o, we must put the other one. We know + * that o1 != o2 from check of lu_fid_cmp() above. */ + mdt_object_put(info->mti_env, o1 != o ? o1 : o2); - ldlm_reprocess_all(lease->l_resource); + ldlm_reprocess_all(lease->l_resource, lease); out_lease: LDLM_LOCK_PUT(lease); @@ -2121,13 +2202,15 @@ out_lease: static int mdt_close_resync_done(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma) { - struct close_data *data; - struct ldlm_lock *lease; - struct md_layout_change layout = { 0 }; - __u32 *resync_ids = NULL; - size_t resync_count = 0; - bool lease_broken; - int rc; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LOCAL]; + struct close_data *data; + struct ldlm_lock *lease; + struct md_layout_change layout = { 0 }; + __u32 *resync_ids = NULL; + size_t resync_count = 0; + bool lease_broken; + int rc; + ENTRY; if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY) @@ -2181,7 +2264,7 @@ static int mdt_close_resync_done(struct mdt_thread_info *info, RCL_CLIENT)) GOTO(out_unlock, rc = -EPROTO); - OBD_ALLOC(resync_ids, resync_count * sizeof(__u32)); + OBD_ALLOC_PTR_ARRAY(resync_ids, resync_count); if (!resync_ids) GOTO(out_unlock, rc = -ENOMEM); @@ -2200,10 +2283,12 @@ static int mdt_close_resync_done(struct mdt_thread_info *info, layout.mlc_som.lsa_size = ma->ma_attr.la_size; layout.mlc_som.lsa_blocks = ma->ma_attr.la_blocks; } - rc = mdt_layout_change(info, o, &layout); + rc = mdt_layout_change(info, o, lhc, &layout); if (rc) GOTO(out_unlock, rc); + mdt_object_unlock(info, o, lhc, 0); + EXIT; out_unlock: @@ -2219,10 +2304,10 @@ out_unlock: } if (resync_ids) - OBD_FREE(resync_ids, resync_count * sizeof(__u32)); + OBD_FREE_PTR_ARRAY(resync_ids, resync_count); out_reprocess: - ldlm_reprocess_all(lease->l_resource); + ldlm_reprocess_all(lease->l_resource, lease); LDLM_LOCK_PUT(lease); ma->ma_valid = 0; @@ -2254,8 +2339,10 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd) intent = ma->ma_attr_flags & MDS_CLOSE_INTENT; *ofid = *mdt_object_fid(o); - CDEBUG(D_INODE, "%s: close file "DFID" with intent: %llx\n", - mdt_obd_name(info->mti_mdt), PFID(ofid), intent); + /* the below message is checked in replay-single.sh test_46 */ + CDEBUG(D_INODE, "%s: %sclosing file handle "DFID" with intent: %llx\n", + mdt_obd_name(info->mti_mdt), + ma->ma_valid & MA_FORCE_LOG ? "force " : "", PFID(ofid), intent); switch (intent) { case MDS_HSM_RELEASE: { @@ -2307,15 +2394,24 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd) else if (open_flags & MDS_FMODE_EXEC) mdt_write_allow(o); - /* Update atime on close only. */ + /* Update atime|mtime|ctime on close. */ if ((open_flags & MDS_FMODE_EXEC || open_flags & MDS_FMODE_READ || open_flags & MDS_FMODE_WRITE) && (ma->ma_valid & MA_INODE) && - (ma->ma_attr.la_valid & LA_ATIME)) { - /* Set the atime only. */ - ma->ma_valid = MA_INODE; - ma->ma_attr.la_valid = LA_ATIME; - rc = mo_attr_set(info->mti_env, next, ma); - } + (ma->ma_attr.la_valid & LA_ATIME || + ma->ma_attr.la_valid & LA_MTIME || + ma->ma_attr.la_valid & LA_CTIME)) { + ma->ma_valid = MA_INODE; + ma->ma_attr_flags |= MDS_CLOSE_UPDATE_TIMES; + ma->ma_attr.la_valid &= (LA_ATIME | LA_MTIME | LA_CTIME); + + if (ma->ma_attr.la_valid & LA_MTIME) { + rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid); + if (!rc) + ma->ma_valid |= MA_PFID; + } + + rc = mo_attr_set(info->mti_env, next, ma); + } /* If file data is modified, add the dirty flag. */ if (ma->ma_attr_flags & MDS_DATA_MODIFIED) @@ -2382,10 +2478,10 @@ int mdt_close(struct tgt_session_info *tsi) struct ptlrpc_request *req = tgt_ses_req(tsi); struct md_attr *ma = &info->mti_attr; struct mdt_body *repbody = NULL; + ktime_t kstart = ktime_get(); int rc, ret = 0; ENTRY; - mdt_counter_incr(req, LPROC_MDT_CLOSE); /* Close may come with the Size-on-MDS update. Unpack it. */ rc = mdt_close_unpack(info); if (rc) @@ -2440,5 +2536,8 @@ int mdt_close(struct tgt_session_info *tsi) tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP; out: mdt_thread_info_fini(info); + if (rc == 0) + mdt_counter_incr(req, LPROC_MDT_CLOSE, + ktime_us_delta(ktime_get(), kstart)); RETURN(rc ? rc : ret); }