X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=3b6918b137d5f0e6489f42210515ff60a321e468;hp=923e42a5c08ec994053da658238547d4883be5ed;hb=5e11d744becb81e79930b23ea0f998c7e7f07c08;hpb=fca35f74f9ec5c5ed77e774f3e3209d9df057a01 diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 923e42a..3b6918b 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -555,7 +555,7 @@ int mdt_pack_size2body(struct mdt_thread_info *info, RETURN(0); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL /* * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap. * @@ -801,10 +801,15 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; } else if (info->mti_som_valid) { /* som is valid */ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */ + b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS; + b->mbo_size = ma->ma_som.ms_size; + b->mbo_blocks = ma->ma_som.ms_blocks; } } - if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE)) + if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE || + b->mbo_valid & OBD_MD_FLLAZYSIZE)) CDEBUG(D_VFSTRACE, DFID": returning size %llu\n", PFID(fid), (unsigned long long)b->mbo_size); @@ -933,8 +938,8 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, buf->lb_len = ma->ma_lmv_size; LASSERT(!(ma->ma_valid & MA_LMV)); } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - buf->lb_buf = ma->ma_lmv; - buf->lb_len = ma->ma_lmv_size; + buf->lb_buf = ma->ma_default_lmv; + buf->lb_len = ma->ma_default_lmv_size; LASSERT(!(ma->ma_valid & MA_LMV_DEF)); } else { return -EINVAL; @@ -967,7 +972,7 @@ got: ma->ma_lmv_size = rc; ma->ma_valid |= MA_LMV; } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - ma->ma_lmv_size = rc; + ma->ma_default_lmv_size = rc; ma->ma_valid |= MA_LMV_DEF; } @@ -1107,8 +1112,8 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, if (need & MA_HSM && S_ISREG(mode)) { buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(info->mti_xattr_buf); - CLASSERT(sizeof(struct hsm_attrs) <= - sizeof(info->mti_xattr_buf)); + BUILD_BUG_ON(sizeof(struct hsm_attrs) > + sizeof(info->mti_xattr_buf)); rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM); rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm); if (rc2 == 0) @@ -1117,7 +1122,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc = rc2); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL if (need & MA_ACL_DEF && S_ISDIR(mode)) { buf->lb_buf = ma->ma_acl; buf->lb_len = ma->ma_acl_size; @@ -1188,25 +1193,55 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, req->rq_export->exp_client_uuid.uuid); } - /* If it is dir object and client require MEA, then we got MEA */ + /* from 2.12.58 intent_getattr pack default LMV in reply */ if (S_ISDIR(lu_object_attr(&next->mo_lu)) && - (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) { + ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) == + (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) && + req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD, + RCL_SERVER)) { + ma->ma_lmv = buffer->lb_buf; + ma->ma_lmv_size = buffer->lb_len; + ma->ma_default_lmv = req_capsule_server_get(pill, + &RMF_DEFAULT_MDT_MD); + ma->ma_default_lmv_size = req_capsule_get_size(pill, + &RMF_DEFAULT_MDT_MD, + RCL_SERVER); + ma->ma_need = MA_INODE; + if (ma->ma_lmv_size > 0) + ma->ma_need |= MA_LMV; + if (ma->ma_default_lmv_size > 0) + ma->ma_need |= MA_LMV_DEF; + } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) && + (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) { + /* If it is dir and client require MEA, then we got MEA */ /* Assumption: MDT_MD size is enough for lmv size. */ ma->ma_lmv = buffer->lb_buf; ma->ma_lmv_size = buffer->lb_len; ma->ma_need = MA_INODE; if (ma->ma_lmv_size > 0) { - if (reqbody->mbo_valid & OBD_MD_MEA) + if (reqbody->mbo_valid & OBD_MD_MEA) { ma->ma_need |= MA_LMV; - else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) + } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) { ma->ma_need |= MA_LMV_DEF; + ma->ma_default_lmv = buffer->lb_buf; + ma->ma_lmv = NULL; + ma->ma_default_lmv_size = buffer->lb_len; + ma->ma_lmv_size = 0; + } } } else { ma->ma_lmm = buffer->lb_buf; ma->ma_lmm_size = buffer->lb_len; ma->ma_need = MA_INODE | MA_HSM; - if (ma->ma_lmm_size > 0) + if (ma->ma_lmm_size > 0) { ma->ma_need |= MA_LOV; + /* Older clients may crash if they getattr overstriped + * files + */ + if (!exp_connect_overstriping(exp) && + mdt_lmm_is_overstriping(ma->ma_lmm)) + RETURN(-EOPNOTSUPP); + } } if (S_ISDIR(lu_object_attr(&next->mo_lu)) && @@ -1264,8 +1299,13 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, if (!mdt_is_striped_client(req->rq_export)) RETURN(-ENOTSUPP); LASSERT(S_ISDIR(la->la_mode)); - mdt_dump_lmv(D_INFO, ma->ma_lmv); - repbody->mbo_eadatasize = ma->ma_lmv_size; + /* + * when ll_dir_getstripe() gets default LMV, it + * checks mbo_eadatasize. + */ + if (!(ma->ma_valid & MA_LMV)) + repbody->mbo_eadatasize = + ma->ma_default_lmv_size; repbody->mbo_valid |= (OBD_MD_FLDIREA | OBD_MD_DEFAULT_MEA); } @@ -1316,7 +1356,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->mbo_max_mdsize); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) && (reqbody->mbo_valid & OBD_MD_FLACL)) { struct lu_nodemap *nodemap = nodemap_get_from_exp(exp); @@ -1345,9 +1385,11 @@ static int mdt_getattr(struct tgt_session_info *tsi) int rc, rc2; ENTRY; - reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); - LASSERT(reqbody); - LASSERT(obj != NULL); + if (unlikely(info->mti_object == NULL)) + RETURN(-EPROTO); + + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); + LASSERT(reqbody); LASSERT(lu_object_assert_exists(&obj->mot_obj)); /* Special case for Data-on-MDT files to get data version */ @@ -1422,42 +1464,54 @@ out: /** * Handler of layout intent RPC requiring the layout modification * - * \param[in] info thread environment - * \param[in] obj object - * \param[in] layout layout change descriptor + * \param[in] info thread environment + * \param[in] obj object + * \param[out] lhc object ldlm lock handle + * \param[in] layout layout change descriptor * * \retval 0 on success * \retval < 0 error code */ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lhc, struct md_layout_change *layout) { - struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL]; int rc; + ENTRY; if (!mdt_object_exists(obj)) - GOTO(out, rc = -ENOENT); + RETURN(-ENOENT); if (!S_ISREG(lu_object_attr(&obj->mot_obj))) - GOTO(out, rc = -EINVAL); + RETURN(-EINVAL); rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL, MAY_WRITE); if (rc) - GOTO(out, rc); + RETURN(rc); - /* take layout lock to prepare layout change */ - mdt_lock_reg_init(lh, LCK_EX); - rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT); - if (rc) - GOTO(out, rc); + rc = mdt_check_resent_lock(info, obj, lhc); + if (rc < 0) + RETURN(rc); + + if (rc > 0) { + /* not resent */ + mdt_lock_handle_init(lhc); + mdt_lock_reg_init(lhc, LCK_EX); + rc = mdt_reint_object_lock(info, obj, lhc, MDS_INODELOCK_LAYOUT, + false); + if (rc) + RETURN(rc); + } mutex_lock(&obj->mot_som_mutex); rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout); mutex_unlock(&obj->mot_som_mutex); - mdt_object_unlock(info, obj, lh, 1); -out: + + if (rc) + mdt_object_unlock(info, obj, lhc, 1); + RETURN(rc); } @@ -1505,6 +1559,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) RETURN(-EOPNOTSUPP); info = tsi2mdt_info(tsi); + if (unlikely(info->mti_object == NULL)) + RETURN(-EPROTO); if (info->mti_dlm_req != NULL) ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); @@ -1626,7 +1682,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, __u64 try_bits = 0; bool is_resent; int ma_need = 0; - bool deal_with_dom = false; int rc; ENTRY; @@ -1680,11 +1735,14 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, } rc = mdt_getattr_internal(info, child, 0); - if (unlikely(rc != 0)) + if (unlikely(rc != 0)) { mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); + } - mdt_pack_secctx_in_reply(info, child); - + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) + mdt_object_unlock(info, child, lhc, 1); RETURN(rc); } @@ -1799,7 +1857,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_lock_reg_init(lhc, LCK_PR); if (!(child_bits & MDS_INODELOCK_UPDATE) && - mdt_object_exists(child) && !mdt_object_remote(child)) { + !mdt_object_remote(child)) { struct md_attr *ma = &info->mti_attr; ma->ma_valid = 0; @@ -1813,12 +1871,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, * lock and this might save us RPC on later STAT. For * directories, it also let negative dentry cache start * working for this dir. */ - if (ma->ma_valid & MA_INODE && - ma->ma_attr.la_valid & LA_CTIME && - info->mti_mdt->mdt_namespace->ns_ctime_age_limit + - ma->ma_attr.la_ctime < ktime_get_real_seconds()) - child_bits |= MDS_INODELOCK_UPDATE; - } + if (ma->ma_valid & MA_INODE && + ma->ma_attr.la_valid & LA_CTIME && + info->mti_mdt->mdt_namespace->ns_ctime_age_limit + + ma->ma_attr.la_ctime < ktime_get_real_seconds()) + child_bits |= MDS_INODELOCK_UPDATE; + } /* layout lock must be granted in a best-effort way * for IT operations */ @@ -1853,14 +1911,21 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, GOTO(out_child, rc); } - lock = ldlm_handle2lock(&lhc->mlh_reg_lh); - /* finally, we can get attr for child. */ rc = mdt_getattr_internal(info, child, ma_need); if (unlikely(rc != 0)) { mdt_object_unlock(info, child, lhc, 1); - GOTO(out_lock, rc); - } else if (lock) { + GOTO(out_child, rc); + } + + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) { + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc); + } + + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + if (lock) { /* Debugging code. */ LDLM_DEBUG(lock, "Returning lock to client"); LASSERTF(fid_res_name_eq(mdt_object_fid(child), @@ -1870,33 +1935,27 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, PFID(mdt_object_fid(child))); if (S_ISREG(lu_object_attr(&child->mot_obj)) && - mdt_object_exists(child) && !mdt_object_remote(child) && - child != parent) - deal_with_dom = true; - } - - mdt_pack_secctx_in_reply(info, child); - -out_lock: - if (lock) + !mdt_object_remote(child) && child != parent) { + mdt_object_put(info->mti_env, child); + rc = mdt_pack_size2body(info, child_fid, + &lhc->mlh_reg_lh); + if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { + /* DOM lock was taken in advance but this is + * not DoM file. Drop the lock. + */ + lock_res_and_lock(lock); + ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM); + unlock_res_and_lock(lock); + } + LDLM_LOCK_PUT(lock); + GOTO(out_parent, rc = 0); + } LDLM_LOCK_PUT(lock); + } EXIT; out_child: mdt_object_put(info->mti_env, child); - if (deal_with_dom) { - rc = mdt_pack_size2body(info, child_fid, - &lhc->mlh_reg_lh); - if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { - /* DOM lock was taken in advance but this is - * not DoM file. Drop the lock. - */ - lock_res_and_lock(lock); - ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM); - unlock_res_and_lock(lock); - } - rc = 0; - } out_parent: if (lhp) mdt_object_unlock(info, parent, lhp, 1); @@ -1942,6 +2001,219 @@ out_shrink: return rc; } +static int mdt_rmfid_unlink(struct mdt_thread_info *info, + const struct lu_fid *pfid, + const struct lu_name *name, + struct mdt_object *obj, s64 ctime) +{ + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct ldlm_enqueue_info *einfo = &info->mti_einfo[0]; + struct mdt_device *mdt = info->mti_mdt; + struct md_attr *ma = &info->mti_attr; + struct mdt_lock_handle *parent_lh; + struct mdt_lock_handle *child_lh; + struct mdt_object *pobj; + bool cos_incompat = false; + int rc; + ENTRY; + + pobj = mdt_object_find(info->mti_env, mdt, pfid); + if (IS_ERR(pobj)) + GOTO(out, rc = PTR_ERR(pobj)); + + parent_lh = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(parent_lh, LCK_PW, name); + rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE); + if (rc != 0) + GOTO(put_parent, rc); + + if (mdt_object_remote(pobj)) + cos_incompat = true; + + rc = mdo_lookup(info->mti_env, mdt_object_child(pobj), + name, child_fid, &info->mti_spec); + if (rc != 0) + GOTO(unlock_parent, rc); + + if (!lu_fid_eq(child_fid, mdt_object_fid(obj))) + GOTO(unlock_parent, rc = -EREMCHG); + + child_lh = &info->mti_lh[MDT_LH_CHILD]; + mdt_lock_reg_init(child_lh, LCK_EX); + rc = mdt_reint_striped_lock(info, obj, child_lh, + MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE, + einfo, cos_incompat); + if (rc != 0) + GOTO(unlock_parent, rc); + + if (atomic_read(&obj->mot_open_count)) { + CDEBUG(D_OTHER, "object "DFID" open, skip\n", + PFID(mdt_object_fid(obj))); + GOTO(unlock_child, rc = -EBUSY); + } + + ma->ma_need = 0; + ma->ma_valid = MA_INODE; + ma->ma_attr.la_valid = LA_CTIME; + ma->ma_attr.la_ctime = ctime; + + mutex_lock(&obj->mot_lov_mutex); + + rc = mdo_unlink(info->mti_env, mdt_object_child(pobj), + mdt_object_child(obj), name, ma, 0); + + mutex_unlock(&obj->mot_lov_mutex); + +unlock_child: + mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1); +unlock_parent: + mdt_object_unlock(info, pobj, parent_lh, 1); +put_parent: + mdt_object_put(info->mti_env, pobj); +out: + RETURN(rc); +} + +static int mdt_rmfid_check_permission(struct mdt_thread_info *info, + struct mdt_object *obj) +{ + struct lu_ucred *uc = lu_ucred(info->mti_env); + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + int rc = 0; + ENTRY; + + ma->ma_need = MA_INODE; + rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma); + if (rc) + GOTO(out, rc); + + if (la->la_flags & LUSTRE_IMMUTABLE_FL) + rc = -EACCES; + + if (md_capable(uc, CFS_CAP_DAC_OVERRIDE)) + RETURN(0); + if (uc->uc_fsuid == la->la_uid) { + if ((la->la_mode & S_IWUSR) == 0) + rc = -EACCES; + } else if (uc->uc_fsgid == la->la_gid) { + if ((la->la_mode & S_IWGRP) == 0) + rc = -EACCES; + } else if ((la->la_mode & S_IWOTH) == 0) { + rc = -EACCES; + } + +out: + RETURN(rc); +} + +static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid, + s64 ctime) +{ + struct mdt_device *mdt = info->mti_mdt; + struct mdt_object *obj = NULL; + struct linkea_data ldata = { NULL }; + struct lu_buf *buf = &info->mti_big_buf; + struct lu_name *name = &info->mti_name; + struct lu_fid *pfid = &info->mti_tmp_fid1; + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen, count, rc = 0; + ENTRY; + + if (!fid_is_sane(fid)) + GOTO(out, rc = -EINVAL); + + if (!fid_is_namespace_visible(fid)) + GOTO(out, rc = -EINVAL); + + obj = mdt_object_find(info->mti_env, mdt, fid); + if (IS_ERR(obj)) + GOTO(out, rc = PTR_ERR(obj)); + + if (mdt_object_remote(obj)) + GOTO(out, rc = -EREMOTE); + if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header)) + GOTO(out, rc = -ENOENT); + + rc = mdt_rmfid_check_permission(info, obj); + if (rc) + GOTO(out, rc); + + /* take LinkEA */ + buf = lu_buf_check_and_alloc(buf, PATH_MAX); + if (!buf->lb_buf) + GOTO(out, rc = -ENOMEM); + + ldata.ld_buf = buf; + rc = mdt_links_read(info, obj, &ldata); + if (rc) + GOTO(out, rc); + + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + for (count = 0; count < leh->leh_reccount; count++) { + /* remove every hardlink */ + linkea_entry_unpack(lee, &reclen, name, pfid); + lee = (struct link_ea_entry *) ((char *)lee + reclen); + rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime); + if (rc) + break; + } + +out: + if (obj && !IS_ERR(obj)) + mdt_object_put(info->mti_env, obj); + if (info->mti_big_buf.lb_buf) + lu_buf_free(&info->mti_big_buf); + + RETURN(rc); +} + +static int mdt_rmfid(struct tgt_session_info *tsi) +{ + struct mdt_thread_info *mti = tsi2mdt_info(tsi); + struct mdt_body *reqbody; + struct lu_fid *fids, *rfids; + int bufsize, rc; + __u32 *rcs; + int i, nr; + ENTRY; + + reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY); + if (reqbody == NULL) + RETURN(-EPROTO); + bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY, + RCL_CLIENT); + nr = bufsize / sizeof(struct lu_fid); + if (nr * sizeof(struct lu_fid) != bufsize) + RETURN(-EINVAL); + req_capsule_set_size(tsi->tsi_pill, &RMF_RCS, + RCL_SERVER, nr * sizeof(__u32)); + req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY, + RCL_SERVER, nr * sizeof(struct lu_fid)); + rc = req_capsule_server_pack(tsi->tsi_pill); + if (rc) + GOTO(out, rc = err_serious(rc)); + fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY); + if (fids == NULL) + RETURN(-EPROTO); + rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS); + LASSERT(rcs); + rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY); + LASSERT(rfids); + + mdt_init_ucred(mti, reqbody); + for (i = 0; i < nr; i++) { + rfids[i] = fids[i]; + rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime); + } + mdt_exit_ucred(mti); + +out: + RETURN(rc); +} + static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg); @@ -2196,7 +2468,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info, if (rc < 0) { GOTO(out_ucred, rc); } else if (rc == 1) { - DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); + DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt"); rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); GOTO(out_ucred, rc); } @@ -2302,7 +2574,7 @@ int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp, struct mdt_object *mo) { - int rc; + int rc = 0; ENTRY; @@ -2313,7 +2585,16 @@ static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp, RETURN(-ESTALE); } - rc = mo_object_sync(env, mdt_object_child(mo)); + if (S_ISREG(lu_object_attr(&mo->mot_obj))) { + struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt; + dt_obj_version_t version; + + version = dt_version_get(env, mdt_obj2dt(mo)); + if (version > tgt->lut_obd->obd_last_committed) + rc = mo_object_sync(env, mdt_object_child(mo)); + } else { + rc = mo_object_sync(env, mdt_object_child(mo)); + } RETURN(rc); } @@ -2335,6 +2616,9 @@ static int mdt_sync(struct tgt_session_info *tsi) } else { struct mdt_thread_info *info = tsi2mdt_info(tsi); + if (unlikely(info->mti_object == NULL)) + RETURN(-EPROTO); + /* sync an object */ rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, info->mti_object); @@ -2443,12 +2727,14 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case LUSTRE_Q_SETDEFAULT: if (!nodemap_can_setquota(nodemap)) GOTO(out_nodemap, rc = -EPERM); + /* fallthrough */ case Q_GETINFO: case Q_GETQUOTA: case LUSTRE_Q_GETDEFAULT: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ + /* fallthrough */ case Q_GETOINFO: case Q_GETOQUOTA: break; @@ -3092,8 +3378,9 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, * object anyway XXX*/ if (lh->mlh_type == MDT_PDO_LOCK && lh->mlh_pdo_hash != 0) { - CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to" - "EX lock.\n", mdt_obd_name(info->mti_mdt), + CDEBUG(D_INFO, + "%s: "DFID" convert PDO lock to EX lock.\n", + mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(o))); lh->mlh_pdo_hash = 0; lh->mlh_rreg_mode = LCK_EX; @@ -3510,7 +3797,7 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi) cfs_fail_val == tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) { set_current_state(TASK_UNINTERRUPTIBLE); - schedule_timeout(msecs_to_jiffies(3 * MSEC_PER_SEC)); + schedule_timeout(cfs_time_seconds(3)); } return tgt_connect(tsi); @@ -3656,7 +3943,7 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info, * If the xid matches, then we know this is a resent request, and allow * it. (It's probably an OPEN, for which we don't send a lock. */ - if (req_can_reconstruct(req, NULL)) + if (req_can_reconstruct(req, NULL) != 0) return; /* @@ -3702,7 +3989,10 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, if (ldlm_rep == NULL || OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) { mdt_object_unlock(info, info->mti_object, lhc, 1); - RETURN(err_serious(-EFAULT)); + if (is_serious(rc)) + RETURN(rc); + else + RETURN(err_serious(-EFAULT)); } ldlm_rep->lock_policy_res2 = clear_serious(rc); @@ -3795,13 +4085,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, struct ldlm_lock **lockp, __u64 flags) { - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT]; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP }; struct layout_intent *intent; + struct ldlm_reply *ldlm_rep; struct lu_fid *fid = &info->mti_tmp_fid2; struct mdt_object *obj = NULL; int layout_size = 0; + struct lu_buf *buf = &layout.mlc_buf; int rc = 0; + ENTRY; fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); @@ -3829,24 +4122,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, case LAYOUT_INTENT_RESTORE: CERROR("%s: Unsupported layout intent opc %d\n", mdt_obd_name(info->mti_mdt), intent->li_opc); - rc = -ENOTSUPP; - break; + RETURN(-ENOTSUPP); default: CERROR("%s: Unknown layout intent opc %d\n", mdt_obd_name(info->mti_mdt), intent->li_opc); - rc = -EINVAL; - break; + RETURN(-EINVAL); } - if (rc < 0) - RETURN(rc); - - /* Get lock from request for possible resent case. */ - mdt_intent_fixup_resent(info, *lockp, lhc, flags); obj = mdt_object_find(info->mti_env, info->mti_mdt, fid); if (IS_ERR(obj)) - GOTO(out, rc = PTR_ERR(obj)); - + RETURN(PTR_ERR(obj)); if (mdt_object_exists(obj) && !mdt_object_remote(obj)) { /* if layout is going to be changed don't use the current EA @@ -3858,7 +4143,7 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, } else { layout_size = mdt_attr_get_eabuf_size(info, obj); if (layout_size < 0) - GOTO(out_obj, rc = layout_size); + GOTO(out, rc = layout_size); if (layout_size > info->mti_mdt->mdt_max_mdsize) info->mti_mdt->mdt_max_mdsize = layout_size; @@ -3871,72 +4156,68 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, * set reply buffer size, so that ldlm_handle_enqueue0()-> * ldlm_lvbo_fill() will fill the reply buffer with lovea. */ - (*lockp)->l_lvb_type = LVB_T_LAYOUT; req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER, layout_size); rc = req_capsule_server_pack(info->mti_pill); if (rc) - GOTO(out_obj, rc); + GOTO(out, rc); + ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + if (!ldlm_rep) + GOTO(out, rc = -EPROTO); - if (layout.mlc_opc != MD_LAYOUT_NOP) { - struct lu_buf *buf = &layout.mlc_buf; + mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); - /** - * mdt_layout_change is a reint operation, when the request - * is resent, layout write shouldn't reprocess it again. - */ - rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc); - if (rc) - GOTO(out_obj, rc = rc < 0 ? rc : 0); + /* take lock in ldlm_lock_enqueue() for LAYOUT_INTENT_ACCESS */ + if (layout.mlc_opc == MD_LAYOUT_NOP) + GOTO(out, rc = 0); - /** - * There is another resent case: the client's job has been - * done by another client, referring lod_declare_layout_change - * -EALREADY case, and it became a operation w/o transaction, - * so we should not do the layout change, otherwise - * mdt_layout_change() will try to cancel the granted server - * CR lock whose remote counterpart is still in hold on the - * client, and a deadlock ensues. - */ - rc = mdt_check_resent_lock(info, obj, lhc); - if (rc <= 0) - GOTO(out_obj, rc); - - buf->lb_buf = NULL; - buf->lb_len = 0; - if (unlikely(req_is_replay(mdt_info_req(info)))) { - buf->lb_buf = req_capsule_client_get(info->mti_pill, - &RMF_EADATA); - buf->lb_len = req_capsule_get_size(info->mti_pill, - &RMF_EADATA, RCL_CLIENT); - /* - * If it's a replay of layout write intent RPC, the - * client has saved the extended lovea when - * it get reply then. - */ - if (buf->lb_len > 0) - mdt_fix_lov_magic(info, buf->lb_buf); - } + rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc); + if (rc < 0) + GOTO(out, rc); + if (rc == 1) { + DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); + rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); + GOTO(out, rc); + } + + buf->lb_buf = NULL; + buf->lb_len = 0; + if (unlikely(req_is_replay(mdt_info_req(info)))) { + buf->lb_buf = req_capsule_client_get(info->mti_pill, + &RMF_EADATA); + buf->lb_len = req_capsule_get_size(info->mti_pill, + &RMF_EADATA, RCL_CLIENT); /* - * Instantiate some layout components, if @buf contains - * lovea, then it's a replay of the layout intent write - * RPC. + * If it's a replay of layout write intent RPC, the client has + * saved the extended lovea when it get reply then. */ - rc = mdt_layout_change(info, obj, &layout); - if (rc) - GOTO(out_obj, rc); + if (buf->lb_len > 0) + mdt_fix_lov_magic(info, buf->lb_buf); } -out_obj: - mdt_object_put(info->mti_env, obj); - if (rc == 0 && lustre_handle_is_used(&lhc->mlh_reg_lh)) + /* Get lock from request for possible resent case. */ + mdt_intent_fixup_resent(info, *lockp, lhc, flags); + (*lockp)->l_lvb_type = LVB_T_LAYOUT; + + /* + * Instantiate some layout components, if @buf contains lovea, then it's + * a replay of the layout intent write RPC. + */ + rc = mdt_layout_change(info, obj, lhc, &layout); + ldlm_rep->lock_policy_res2 = clear_serious(rc); + + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc); + if (rc == ELDLM_LOCK_REPLACED && + (*lockp)->l_granted_mode == LCK_EX) + ldlm_lock_mode_downgrade(*lockp, LCK_CR); + } + EXIT; out: - lhc->mlh_reg_lh.cookie = 0; - - RETURN(rc); + mdt_object_put(info->mti_env, obj); + return rc; } static int mdt_intent_open(enum ldlm_intent_flags it_opc, @@ -3965,11 +4246,15 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, rc = mdt_reint_internal(info, lhc, opc); - /* Check whether the reply has been packed successfully. */ - if (mdt_info_req(info)->rq_repmsg != NULL) - rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); - if (rep == NULL) - RETURN(err_serious(-EFAULT)); + /* Check whether the reply has been packed successfully. */ + if (mdt_info_req(info)->rq_repmsg != NULL) + rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + if (rep == NULL) { + if (is_serious(rc)) + RETURN(rc); + else + RETURN(err_serious(-EFAULT)); + } /* MDC expects this in any case */ if (rc != 0) @@ -4023,6 +4308,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, u64); enum tgt_handler_flags it_handler_flags = 0; struct ldlm_reply *rep; + bool check_mdt_object = false; int rc; ENTRY; @@ -4039,12 +4325,15 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, it_handler = &mdt_intent_open; break; case IT_GETATTR: + check_mdt_object = true; + /* fallthrough */ case IT_LOOKUP: it_format = &RQF_LDLM_INTENT_GETATTR; it_handler = &mdt_intent_getattr; it_handler_flags = HAS_REPLY; break; case IT_GETXATTR: + check_mdt_object = true; it_format = &RQF_LDLM_INTENT_GETXATTR; it_handler = &mdt_intent_getxattr; it_handler_flags = HAS_BODY; @@ -4090,6 +4379,9 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, if (rc < 0) RETURN(rc); + if (unlikely(info->mti_object == NULL && check_mdt_object)) + RETURN(-EPROTO); + if (it_handler_flags & IS_MUTABLE && mdt_rdonly(req->rq_export)) RETURN(-EROFS); @@ -4783,9 +5075,12 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt, mdt->mdt_qmt_dev = obd->obd_lu_dev; /* configure local quota objects */ - rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env, - &mdt->mdt_lu_dev, - mdt->mdt_qmt_dev); + if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_INIT)) + rc = -EBADF; + else + rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env, + &mdt->mdt_lu_dev, + mdt->mdt_qmt_dev); if (rc) GOTO(class_cleanup, rc); @@ -4805,6 +5100,7 @@ class_cleanup: if (rc) { class_manual_cleanup(obd); mdt->mdt_qmt_dev = NULL; + GOTO(lcfg_cleanup, rc); } class_detach: if (rc) @@ -4849,6 +5145,9 @@ static int mdt_tgt_getxattr(struct tgt_session_info *tsi) struct mdt_thread_info *info = tsi2mdt_info(tsi); int rc; + if (unlikely(info->mti_object == NULL)) + return -EPROTO; + rc = mdt_getxattr(info); mdt_thread_info_fini(info); @@ -4898,6 +5197,7 @@ TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_REQUEST, TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_SWAP_LAYOUTS, mdt_swap_layouts), +TGT_MDT_HDL(IS_MUTABLE, MDS_RMFID, mdt_rmfid), }; static struct tgt_handler mdt_io_ops[] = { @@ -4998,8 +5298,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) ping_evictor_stop(); /* Remove the HSM /proc entry so the coordinator cannot be - * restarted by a user while it's shutting down. */ - hsm_cdt_procfs_fini(m); + * restarted by a user while it's shutting down. + */ mdt_hsm_cdt_stop(m); mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT); @@ -5019,7 +5319,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) /* Calling the cleanup functions in the same order as in the mdt_init0 * error path */ - mdt_procfs_fini(m); + mdt_tunables_fini(m); target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); @@ -5131,12 +5431,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_squash.rsi_uid = 0; m->mdt_squash.rsi_gid = 0; INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); - init_rwsem(&m->mdt_squash.rsi_sem); + spin_lock_init(&m->mdt_squash.rsi_lock); spin_lock_init(&m->mdt_lock); m->mdt_enable_remote_dir = 1; m->mdt_enable_striped_dir = 1; m->mdt_enable_dir_migration = 1; m->mdt_enable_remote_dir_gid = 0; + m->mdt_enable_chprojid_gid = 0; + m->mdt_enable_remote_rename = 1; atomic_set(&m->mdt_mds_mds_conns, 0); atomic_set(&m->mdt_async_commit_count, 0); @@ -5268,7 +5570,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_free_hsm, rc); } - rc = mdt_procfs_init(m, dev); + rc = mdt_tunables_init(m, dev); if (rc) { CERROR("Can't init MDT lprocfs, rc %d\n", rc); GOTO(err_recovery, rc); @@ -5296,9 +5598,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, RETURN(0); err_procfs: - mdt_procfs_fini(m); + mdt_tunables_fini(m); err_recovery: - target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; err_free_hsm: @@ -5309,6 +5610,11 @@ err_los_fini: err_fs_cleanup: mdt_fs_cleanup(env, m); err_tgt: + /* keep recoverable clients */ + obd->obd_fail = 1; + target_recovery_fini(obd); + obd_exports_barrier(obd); + obd_zombie_barrier(); tgt_fini(env, &m->mdt_lut); err_free_ns: ldlm_namespace_free(m->mdt_namespace, NULL, 0); @@ -5350,12 +5656,12 @@ static int mdt_process_config(const struct lu_env *env, switch (cfg->lcfg_command) { case LCFG_PARAM: { - struct obd_device *obd = d->ld_obd; - + struct obd_device *obd = d->ld_obd; /* For interoperability */ - struct cfg_interop_param *ptr = NULL; - struct lustre_cfg *old_cfg = NULL; - char *param = NULL; + struct cfg_interop_param *ptr = NULL; + struct lustre_cfg *old_cfg = NULL; + char *param = NULL; + ssize_t count; param = lustre_cfg_string(cfg, 1); if (param == NULL) { @@ -5384,17 +5690,22 @@ static int mdt_process_config(const struct lu_env *env, } } - rc = class_process_proc_param(PARAM_MDT, obd->obd_vars, - cfg, obd); - if (rc > 0 || rc == -ENOSYS) { + count = class_modify_config(cfg, PARAM_MDT, + &obd->obd_kset.kobj); + if (count < 0) { + struct coordinator *cdt = &m->mdt_coordinator; + /* is it an HSM var ? */ - rc = class_process_proc_param(PARAM_HSM, - hsm_cdt_get_proc_vars(), - cfg, obd); - if (rc > 0 || rc == -ENOSYS) + count = class_modify_config(cfg, PARAM_HSM, + &cdt->cdt_hsm_kobj); + if (count < 0) /* we don't understand; pass it on */ rc = next->ld_ops->ldo_process_config(env, next, cfg); + else + rc = count > 0 ? 0 : count; + } else { + rc = count > 0 ? 0 : count; } if (old_cfg) @@ -5766,7 +6077,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env, static int mdt_export_cleanup(struct obd_export *exp) { - struct list_head closing_list; + LIST_HEAD(closing_list); struct mdt_export_data *med = &exp->exp_mdt_data; struct obd_device *obd = exp->exp_obd; struct mdt_device *mdt; @@ -5776,7 +6087,6 @@ static int mdt_export_cleanup(struct obd_export *exp) int rc = 0; ENTRY; - INIT_LIST_HEAD(&closing_list); spin_lock(&med->med_open_lock); while (!list_empty(&med->med_open_head)) { struct list_head *tmp = med->med_open_head.next; @@ -5833,13 +6143,14 @@ static int mdt_export_cleanup(struct obd_export *exp) ma->ma_valid = MA_FLAGS; ma->ma_attr_flags |= MDS_KEEP_ORPHAN; } - mdt_mfd_close(info, mfd); - } - } - info->mti_mdt = NULL; - /* cleanup client slot early */ - /* Do not erase record for recoverable client. */ - if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) + ma->ma_valid |= MA_FORCE_LOG; + mdt_mfd_close(info, mfd); + } + } + info->mti_mdt = NULL; + /* cleanup client slot early */ + /* Do not erase record for recoverable client. */ + if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) tgt_client_del(&env, exp); lu_env_fini(&env); @@ -5848,14 +6159,14 @@ static int mdt_export_cleanup(struct obd_export *exp) static inline void mdt_enable_slc(struct mdt_device *mdt) { - if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL) - mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL; + if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER) + mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING; } static inline void mdt_disable_slc(struct mdt_device *mdt) { - if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL) - mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL; + if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING) + mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER; } static int mdt_obd_disconnect(struct obd_export *exp) @@ -6014,6 +6325,11 @@ static int mdt_init_export(struct obd_export *exp) exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); + OBD_ALLOC(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + if (exp->exp_used_slots == NULL) + RETURN(-ENOMEM); + /* self-export doesn't need client data and ldlm initialization */ if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, &exp->exp_client_uuid))) @@ -6032,6 +6348,10 @@ static int mdt_init_export(struct obd_export *exp) err_free: tgt_client_free(exp); err: + OBD_FREE(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + exp->exp_used_slots = NULL; + CERROR("%s: Failed to initialize export: rc = %d\n", exp->exp_obd->obd_name, rc); return rc; @@ -6042,6 +6362,10 @@ static int mdt_destroy_export(struct obd_export *exp) ENTRY; target_destroy_export(exp); + if (exp->exp_used_slots) + OBD_FREE(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + /* destroy can be called from failed obd_setup, so * checking uuid is safer than obd_self_export */ if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, @@ -6620,7 +6944,7 @@ static int mdt_obd_postrecov(struct obd_device *obd) return rc; } -static struct obd_ops mdt_obd_device_ops = { +static const struct obd_ops mdt_obd_device_ops = { .o_owner = THIS_MODULE, .o_set_info_async = mdt_obd_set_info_async, .o_connect = mdt_obd_connect, @@ -6769,10 +7093,10 @@ static int __init mdt_init(void) { int rc; - CLASSERT(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") == - FID_NOBRACE_LEN + 1); - CLASSERT(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") == - FID_LEN + 1); + BUILD_BUG_ON(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") != + FID_NOBRACE_LEN + 1); + BUILD_BUG_ON(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") != + FID_LEN + 1); rc = lu_kmem_init(mdt_caches); if (rc) return rc;