X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_open.c;h=f5a244a16673ff03b8a5f383c2ead14118e5b0a1;hp=0cb4d93f06afd964863edead36ec4283179ff852;hb=ff8198e01924600ae68b4a7f44378989b74c5882;hpb=99cf16e16b815702404d1109cd07ade5ff7d656e diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 0cb4d93..f5a244a 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -664,7 +664,8 @@ void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode) } static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p, - struct mdt_object *o, __u64 flags, int created) + struct mdt_object *o, __u64 flags, int created, + struct ldlm_reply *rep) { struct ptlrpc_request *req = mdt_info_req(info); struct mdt_export_data *med = &req->rq_export->exp_mdt_data; @@ -692,6 +693,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p, rc = mdt_create_data(info, p, o); if (rc) RETURN(rc); + + if (exp_connect_flags(req->rq_export) & OBD_CONNECT_DISP_STRIPE) + mdt_set_disposition(info, rep, DISP_OPEN_STRIPE); } CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n", @@ -713,13 +717,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p, repbody->ioepoch = o->mot_ioepoch; } } else if (flags & MDS_FMODE_EXEC) { - /* if file is released, we can't deny write because we must - * restore (write) it to access it.*/ - if ((ma->ma_valid & MA_HSM) && - (ma->ma_hsm.mh_flags & HS_RELEASED)) - rc = 0; - else - rc = mdt_write_deny(o); + rc = mdt_write_deny(o); } if (rc) RETURN(rc); @@ -813,13 +811,14 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p, RETURN(rc); err_out: - if (flags & FMODE_WRITE) - /* XXX We also need to close io epoch here. - * See LU-1220 - green */ - mdt_write_put(o); - else if (flags & FMODE_EXEC) - mdt_write_allow(o); - return rc; + if (flags & FMODE_WRITE) + /* XXX We also need to close io epoch here. + * See LU-1220 - green */ + mdt_write_put(o); + else if (flags & MDS_FMODE_EXEC) + mdt_write_allow(o); + + return rc; } int mdt_finish_open(struct mdt_thread_info *info, @@ -905,7 +904,7 @@ int mdt_finish_open(struct mdt_thread_info *info, } #endif - if (info->mti_mdt->mdt_opts.mo_mds_capa && + if (info->mti_mdt->mdt_lut.lut_mds_capa && exp_connect_flags(exp) & OBD_CONNECT_MDS_CAPA) { struct lustre_capa *capa; @@ -917,8 +916,7 @@ int mdt_finish_open(struct mdt_thread_info *info, RETURN(rc); repbody->valid |= OBD_MD_FLMDSCAPA; } - - if (info->mti_mdt->mdt_opts.mo_oss_capa && + if (info->mti_mdt->mdt_lut.lut_oss_capa && exp_connect_flags(exp) & OBD_CONNECT_OSS_CAPA && S_ISREG(lu_object_attr(&o->mot_obj))) { struct lustre_capa *capa; @@ -986,15 +984,15 @@ int mdt_finish_open(struct mdt_thread_info *info, repbody->valid |= OBD_MD_FLEASIZE; } mdt_set_disposition(info, rep, DISP_OPEN_OPEN); - RETURN(0); - } - } + RETURN(0); + } + } - rc = mdt_mfd_open(info, p, o, flags, created); + rc = mdt_mfd_open(info, p, o, flags, created, rep); if (!rc) mdt_set_disposition(info, rep, DISP_OPEN_OPEN); - RETURN(rc); + RETURN(rc); } extern void mdt_req_from_lcd(struct ptlrpc_request *req, @@ -1222,11 +1220,7 @@ static int mdt_object_open_lock(struct mdt_thread_info *info, if (open_flags & MDS_OPEN_LOCK) { if (open_flags & FMODE_WRITE) lm = LCK_CW; - /* if file is released, we can't deny write because we must - * restore (write) it to access it. */ - else if ((open_flags & MDS_FMODE_EXEC) && - !((ma->ma_valid & MA_HSM) && - (ma->ma_hsm.mh_flags & HS_RELEASED))) + else if (open_flags & MDS_FMODE_EXEC) lm = LCK_PR; else lm = LCK_CR; @@ -1283,6 +1277,15 @@ static int mdt_object_open_lock(struct mdt_thread_info *info, ", open_flags = "LPO64"\n", PFID(mdt_object_fid(obj)), open_flags); + /* We cannot enqueue another lock for the same resource we + * already have a lock for, due to mechanics of waiting list + * iterating in ldlm, see LU-3601. + * As such we'll drop the open lock we just got above here, + * it's ok not to have this open lock as it's main purpose is to + * flush unused cached client open handles. */ + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) + mdt_object_unlock(info, obj, lhc, 1); + LASSERT(!try_layout); mdt_lock_handle_init(ll); mdt_lock_reg_init(ll, LCK_EX); @@ -1372,12 +1375,13 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info, rc = 1; } - if (rc != 0) { + if (rc != 0 || !lustre_handle_is_used(&lhc->mlh_reg_lh)) { struct ldlm_reply *ldlm_rep; ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK); - mdt_object_unlock(info, obj, lhc, 1); + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) + mdt_object_unlock(info, obj, lhc, 1); } RETURN_EXIT; } @@ -1492,12 +1496,6 @@ out: return rc; } -int mdt_pin(struct mdt_thread_info* info) -{ - ENTRY; - RETURN(err_serious(-EOPNOTSUPP)); -} - /* Cross-ref request. Currently it can only be a pure open (w/o create) */ static int mdt_cross_open(struct mdt_thread_info *info, const struct lu_fid *parent_fid, @@ -1572,7 +1570,6 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) __u64 create_flags = info->mti_spec.sp_cr_flags; __u64 ibits = 0; struct mdt_reint_record *rr = &info->mti_rr; - struct lu_name *lname; int result, rc; int created = 0; __u32 msg_flags; @@ -1606,11 +1603,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) info->mti_spec.u.sp_ea.eadata == NULL) GOTO(out, result = err_serious(-EINVAL)); - CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") " - "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n", - PFID(rr->rr_fid1), rr->rr_name, - PFID(rr->rr_fid2), create_flags, - ma->ma_attr.la_mode, msg_flags); + CDEBUG(D_INODE, "I am going to open "DFID"/("DNAME"->"DFID") " + "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n", + PFID(rr->rr_fid1), PNAME(&rr->rr_name), + PFID(rr->rr_fid2), create_flags, + ma->ma_attr.la_mode, msg_flags); + if (info->mti_cross_ref) { /* This is cross-ref open */ mdt_set_disposition(info, ldlm_rep, @@ -1638,7 +1636,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) GOTO(out, result = -EFAULT); } CDEBUG(D_INFO, "No object(1), continue as regular open.\n"); - } else if ((rr->rr_namelen == 0 && create_flags & MDS_OPEN_LOCK) || + } else if ((!lu_name_is_valid(&rr->rr_name) && + (create_flags & MDS_OPEN_LOCK)) || (create_flags & MDS_OPEN_BY_FID)) { result = mdt_open_by_fid_lock(info, ldlm_rep, lhc); /* If result is 0 then open by FID has found the file @@ -1653,9 +1652,6 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) result != -EREMOTE) GOTO(out, result); - if (unlikely(rr->rr_namelen == 0)) - GOTO(out, result = -EINVAL); - CDEBUG(D_INFO, "No object(2), continue as regular open.\n"); } @@ -1665,9 +1661,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) mdt_set_disposition(info, ldlm_rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD)); + if (!lu_name_is_valid(&rr->rr_name)) + GOTO(out, result = -EPROTO); + lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ? - LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen); + mdt_lock_pdo_init(lh, + (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR, + &rr->rr_name); parent = mdt_object_find_lock(info, rr->rr_fid1, lh, MDS_INODELOCK_UPDATE); @@ -1681,12 +1681,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) fid_zero(child_fid); - lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); - result = mdo_lookup(info->mti_env, mdt_object_child(parent), - lname, child_fid, &info->mti_spec); - LASSERTF(ergo(result == 0, fid_is_sane(child_fid)), - "looking for "DFID"/%s, result fid="DFID"\n", - PFID(mdt_object_fid(parent)), rr->rr_name, PFID(child_fid)); + result = mdo_lookup(info->mti_env, mdt_object_child(parent), + &rr->rr_name, child_fid, &info->mti_spec); + + LASSERTF(ergo(result == 0, fid_is_sane(child_fid)), + "looking for "DFID"/"DNAME", found FID = "DFID"\n", + PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name), + PFID(child_fid)); if (result != 0 && result != -ENOENT && result != -ESTALE) GOTO(out_parent, result); @@ -1734,12 +1735,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1)) GOTO(out_child, result = -EPERM); - /* save versions in reply */ - mdt_version_get_save(info, parent, 0); - mdt_version_get_save(info, child, 1); + /* save versions in reply */ + mdt_version_get_save(info, parent, 0); + mdt_version_get_save(info, child, 1); - /* version of child will be changed */ - info->mti_mos = child; + /* version of child will be changed */ + tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child)); /* Not found and with MDS_OPEN_CREAT: let's create it. */ mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE); @@ -1755,12 +1756,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) info->mti_spec.sp_cr_lookup = 0; info->mti_spec.sp_feat = &dt_directory_features; - result = mdo_create(info->mti_env, - mdt_object_child(parent), - lname, - mdt_object_child(child), - &info->mti_spec, - &info->mti_attr); + result = mdo_create(info->mti_env, + mdt_object_child(parent), + &rr->rr_name, + mdt_object_child(child), + &info->mti_spec, + &info->mti_attr); if (result == -ERESTART) { mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE); GOTO(out_child, result); @@ -1816,30 +1817,38 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) else result = -EREMOTE; GOTO(out_child, result); + } else if (mdt_object_exists(child)) { + /* We have to get attr & LOV EA & HSM for this + * object. */ + ma->ma_need |= MA_HSM; + result = mdt_attr_get_complex(info, child, ma); } else { - if (mdt_object_exists(child)) { - /* We have to get attr & LOV EA & HSM for this - * object */ - ma->ma_need |= MA_HSM; - result = mdt_attr_get_complex(info, child, ma); - } else { - /*object non-exist!!!*/ - LBUG(); - } + /* Object does not exist. Likely FS corruption. */ + CERROR("%s: name '"DNAME"' present, but FID " + DFID" is invalid\n", + mdt_obd_name(info->mti_mdt), + PNAME(&rr->rr_name), PFID(child_fid)); + GOTO(out_child, result = -EIO); } } - LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh)); - - /* get openlock if this is not replay and if a client requested it */ - if (!req_is_replay(req)) { - rc = mdt_object_open_lock(info, child, lhc, &ibits); - if (rc != 0) - GOTO(out_child_unlock, result = rc); - else if (create_flags & MDS_OPEN_LOCK) + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { + /* the open lock might already be gotten in + * mdt_intent_fixup_resent */ + LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); + if (create_flags & MDS_OPEN_LOCK) mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK); + } else { + /* get openlock if this isn't replay and client requested it */ + if (!req_is_replay(req)) { + rc = mdt_object_open_lock(info, child, lhc, &ibits); + if (rc != 0) + GOTO(out_child_unlock, result = rc); + else if (create_flags & MDS_OPEN_LOCK) + mdt_set_disposition(info, ldlm_rep, + DISP_OPEN_LOCK); + } } - /* Try to open it now. */ rc = mdt_finish_open(info, parent, child, create_flags, created, ldlm_rep); @@ -1863,7 +1872,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) rc = mdo_unlink(info->mti_env, mdt_object_child(parent), mdt_object_child(child), - lname, + &rr->rr_name, &info->mti_attr, 0); if (rc != 0) CERROR("%s: "DFID" cleanup of open: rc = %d\n", @@ -1898,8 +1907,10 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info, struct lu_fid *local_root_fid = &info->mti_tmp_fid1; struct mdt_object *obj = NULL; struct mdt_object *local_root; - static const char name[] = "i_am_nobody"; - struct lu_name *lname; + static const struct lu_name lname = { + .ln_name = "i_am_nobody", + .ln_namelen = sizeof("i_am_nobody") - 1, + }; struct lu_ucred *uc; cfs_cap_t uc_cap_save; int rc; @@ -1929,12 +1940,10 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info, spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE; } - lname = mdt_name(env, (char *)name, sizeof(name) - 1); - uc = lu_ucred(env); uc_cap_save = uc->uc_cap; uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE; - rc = mdo_create(env, mdt_object_child(local_root), lname, + rc = mdo_create(env, mdt_object_child(local_root), &lname, mdt_object_child(obj), spec, attr); uc->uc_cap = uc_cap_save; if (rc < 0) { @@ -1975,6 +1984,9 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o, int rc2; ENTRY; + if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY) + RETURN(-EROFS); + data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA); if (data == NULL) RETURN(-EPROTO); @@ -2240,22 +2252,23 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd) RETURN(rc ? rc : ret); } -int mdt_close(struct mdt_thread_info *info) +int mdt_close(struct tgt_session_info *tsi) { + struct mdt_thread_info *info = tsi2mdt_info(tsi); + struct ptlrpc_request *req = tgt_ses_req(tsi); struct mdt_export_data *med; struct mdt_file_data *mfd; struct mdt_object *o; struct md_attr *ma = &info->mti_attr; struct mdt_body *repbody = NULL; - struct ptlrpc_request *req = mdt_info_req(info); int rc, ret = 0; ENTRY; mdt_counter_incr(req, LPROC_MDT_CLOSE); - /* Close may come with the Size-on-MDS update. Unpack it. */ - rc = mdt_close_unpack(info); - if (rc) - RETURN(err_serious(rc)); + /* Close may come with the Size-on-MDS update. Unpack it. */ + rc = mdt_close_unpack(info); + if (rc) + GOTO(out, rc = err_serious(rc)); LASSERT(info->mti_ioepoch); @@ -2269,7 +2282,7 @@ int mdt_close(struct mdt_thread_info *info) if (rc == 0) mdt_fix_reply(info); mdt_exit_ucred(info); - RETURN(lustre_msg_get_status(req->rq_repmsg)); + GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg)); } /* Continue to close handle even if we can not pack reply */ @@ -2324,13 +2337,15 @@ int mdt_close(struct mdt_thread_info *info) } mdt_exit_ucred(info); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) - RETURN(err_serious(-ENOMEM)); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) + GOTO(out, rc = err_serious(-ENOMEM)); - if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP, - OBD_FAIL_MDS_CLOSE_NET_REP)) - info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP; - RETURN(rc ? rc : ret); + if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP, + OBD_FAIL_MDS_CLOSE_NET_REP)) + tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP; +out: + mdt_thread_info_fini(info); + RETURN(rc ? rc : ret); } /** @@ -2341,35 +2356,35 @@ int mdt_close(struct mdt_thread_info *info) * and got a trasid. Waiting for such DONE_WRITING is not reliable, so just * skip attributes and reconstruct the reply here. */ -int mdt_done_writing(struct mdt_thread_info *info) +int mdt_done_writing(struct tgt_session_info *tsi) { - struct ptlrpc_request *req = mdt_info_req(info); + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct mdt_thread_info *info = tsi2mdt_info(tsi); struct mdt_body *repbody = NULL; struct mdt_export_data *med; struct mdt_file_data *mfd; int rc; ENTRY; - rc = req_capsule_server_pack(info->mti_pill); - if (rc) - RETURN(err_serious(rc)); + rc = req_capsule_server_pack(tsi->tsi_pill); + if (rc) + GOTO(out, rc = err_serious(rc)); - repbody = req_capsule_server_get(info->mti_pill, - &RMF_MDT_BODY); - repbody->eadatasize = 0; - repbody->aclsize = 0; + repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY); + repbody->eadatasize = 0; + repbody->aclsize = 0; - /* Done Writing may come with the Size-on-MDS update. Unpack it. */ - rc = mdt_close_unpack(info); - if (rc) - RETURN(err_serious(rc)); + /* Done Writing may come with the Size-on-MDS update. Unpack it. */ + rc = mdt_close_unpack(info); + if (rc) + GOTO(out, rc = err_serious(rc)); if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) { mdt_exit_ucred(info); - RETURN(lustre_msg_get_status(req->rq_repmsg)); + GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg)); } - med = &info->mti_exp->exp_mdt_data; + med = &info->mti_exp->exp_mdt_data; spin_lock(&med->med_open_lock); mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle, req_is_replay(req)); @@ -2411,5 +2426,7 @@ int mdt_done_writing(struct mdt_thread_info *info) mdt_empty_transno(info, rc); error_ucred: mdt_exit_ucred(info); +out: + mdt_thread_info_fini(info); RETURN(rc); }