X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=efdbb0704a2c8ddec1351439a70563546b6360ec;hp=b0ad2574a5e94a87b5e92a9944fa6361bc7b5d73;hb=4918fe40db262b19093436caca688c75eb632496;hpb=470bdeec6ca5b4c68f456a10d68511653e67b378 diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index b0ad257..efdbb07 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -58,6 +58,7 @@ #include #include #include +#include #include #include #include @@ -66,7 +67,6 @@ #include "mdt_internal.h" - static unsigned int max_mod_rpcs_per_client = 8; module_param(max_mod_rpcs_per_client, uint, 0644); MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client"); @@ -269,18 +269,13 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, { struct mdt_device *mdt = info->mti_mdt; struct lu_name *lname = &info->mti_name; - char *name = NULL; + char *filename = info->mti_filename; struct mdt_object *parent; u32 mode; int rc = 0; LASSERT(!info->mti_cross_ref); - OBD_ALLOC(name, NAME_MAX + 1); - if (name == NULL) - return -ENOMEM; - lname->ln_name = name; - /* * We may want to allow this to mount a completely separate * fileset from the MDT in the future, but keeping it to @@ -316,8 +311,9 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, break; } - strncpy(name, s1, lname->ln_namelen); - name[lname->ln_namelen] = '\0'; + strncpy(filename, s1, lname->ln_namelen); + filename[lname->ln_namelen] = '\0'; + lname->ln_name = filename; parent = mdt_object_find(info->mti_env, mdt, fid); if (IS_ERR(parent)) { @@ -342,8 +338,6 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, } } - OBD_FREE(name, NAME_MAX + 1); - return rc; } @@ -412,13 +406,17 @@ out: static int mdt_statfs(struct tgt_session_info *tsi) { - struct ptlrpc_request *req = tgt_ses_req(tsi); - struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct mdt_device *mdt = info->mti_mdt; - struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; - struct ptlrpc_service_part *svcpt; - struct obd_statfs *osfs; - int rc; + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct mdt_thread_info *info = tsi2mdt_info(tsi); + struct mdt_device *mdt = info->mti_mdt; + struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; + struct md_device *next = mdt->mdt_child; + struct ptlrpc_service_part *svcpt; + struct obd_statfs *osfs; + struct mdt_body *reqbody = NULL; + struct mdt_statfs_cache *msf; + ktime_t kstart = ktime_get(); + int rc; ENTRY; @@ -440,11 +438,39 @@ static int mdt_statfs(struct tgt_session_info *tsi) if (!osfs) GOTO(out, rc = -EPROTO); - rc = tgt_statfs_internal(tsi->tsi_env, &mdt->mdt_lut, osfs, - ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS, - NULL); - if (unlikely(rc)) - GOTO(out, rc); + if (mdt_is_sum_statfs_client(req->rq_export) && + lustre_packed_msg_size(req->rq_reqmsg) == + req_capsule_fmt_size(req->rq_reqmsg->lm_magic, + &RQF_MDS_STATFS_NEW, RCL_CLIENT)) { + req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW); + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + } + + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + msf = &mdt->mdt_sum_osfs; + else + msf = &mdt->mdt_osfs; + + if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) { + /** statfs data is too old, get up-to-date one */ + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + rc = next->md_ops->mdo_statfs(info->mti_env, + next, osfs); + else + rc = dt_statfs(info->mti_env, mdt->mdt_bottom, + osfs); + if (rc) + GOTO(out, rc); + spin_lock(&mdt->mdt_lock); + msf->msf_osfs = *osfs; + msf->msf_age = ktime_get_seconds(); + spin_unlock(&mdt->mdt_lock); + } else { + /** use cached statfs data */ + spin_lock(&mdt->mdt_lock); + *osfs = msf->msf_osfs; + spin_unlock(&mdt->mdt_lock); + } /* at least try to account for cached pages. its still racy and * might be under-reporting if clients haven't announced their @@ -479,12 +505,65 @@ static int mdt_statfs(struct tgt_session_info *tsi) osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT; } if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_STATFS); + mdt_counter_incr(req, LPROC_MDT_STATFS, + ktime_us_delta(ktime_get(), kstart)); out: mdt_thread_info_fini(info); RETURN(rc); } +__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only) +{ + struct lov_comp_md_v1 *comp_v1; + struct lov_mds_md *v1; + __u32 off; + __u32 dom_stripesize = 0; + int i; + bool has_ost_stripes = false; + + ENTRY; + + if (is_dom_only) + *is_dom_only = 0; + + if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1) + RETURN(0); + + comp_v1 = (struct lov_comp_md_v1 *)lmm; + off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + off); + + /* Fast check for DoM entry with no mirroring, should be the first */ + if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 && + lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT) + RETURN(0); + + /* check all entries otherwise */ + for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) { + struct lov_comp_md_entry_v1 *lcme; + + lcme = &comp_v1->lcm_entries[i]; + if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT)) + continue; + + off = le32_to_cpu(lcme->lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + off); + + if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) == + LOV_PATTERN_MDT) + dom_stripesize = le32_to_cpu(v1->lmm_stripe_size); + else + has_ost_stripes = true; + + if (dom_stripesize && has_ost_stripes) + RETURN(dom_stripesize); + } + /* DoM-only case exits here */ + if (is_dom_only && dom_stripesize) + *is_dom_only = 1; + RETURN(dom_stripesize); +} + /** * Pack size attributes into the reply. */ @@ -493,7 +572,7 @@ int mdt_pack_size2body(struct mdt_thread_info *info, { struct mdt_body *b; struct md_attr *ma = &info->mti_attr; - int dom_stripe; + __u32 dom_stripe; bool dom_lock = false; ENTRY; @@ -504,9 +583,9 @@ int mdt_pack_size2body(struct mdt_thread_info *info, !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)) RETURN(-ENODATA); - dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm); + dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm); /* no DoM stripe, no size in reply */ - if (dom_stripe == LMM_NO_DOM) + if (!dom_stripe) RETURN(-ENOENT); if (lustre_handle_is_used(lh)) { @@ -531,7 +610,7 @@ int mdt_pack_size2body(struct mdt_thread_info *info, RETURN(0); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL /* * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap. * @@ -573,25 +652,27 @@ again: exp_connect_large_acl(info->mti_exp) && buf->lb_buf != info->mti_big_acl) { if (info->mti_big_acl == NULL) { + info->mti_big_aclsize = + min_t(unsigned int, + mdt->mdt_max_ea_size, + XATTR_SIZE_MAX); OBD_ALLOC_LARGE(info->mti_big_acl, - mdt->mdt_max_ea_size); + info->mti_big_aclsize); if (info->mti_big_acl == NULL) { + info->mti_big_aclsize = 0; CERROR("%s: unable to grow " DFID" ACL buffer\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o))); RETURN(-ENOMEM); } - - info->mti_big_aclsize = - mdt->mdt_max_ea_size; } CDEBUG(D_INODE, "%s: grow the "DFID " ACL buffer to size %d\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o)), - mdt->mdt_max_ea_size); + info->mti_big_aclsize); buf->lb_buf = info->mti_big_acl; buf->lb_len = info->mti_big_aclsize; @@ -701,6 +782,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, b->mbo_ctime = attr->la_ctime; b->mbo_valid |= OBD_MD_FLCTIME; } + if (attr->la_valid & LA_BTIME) { + b->mbo_btime = attr->la_btime; + b->mbo_valid |= OBD_MD_FLBTIME; + } if (attr->la_valid & LA_FLAGS) { b->mbo_flags = attr->la_flags; b->mbo_valid |= OBD_MD_FLFLAGS; @@ -776,10 +861,15 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; } else if (info->mti_som_valid) { /* som is valid */ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */ + b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS; + b->mbo_size = ma->ma_som.ms_size; + b->mbo_blocks = ma->ma_som.ms_blocks; } } - if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE)) + if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE || + b->mbo_valid & OBD_MD_FLLAZYSIZE)) CDEBUG(D_VFSTRACE, DFID": returning size %llu\n", PFID(fid), (unsigned long long)b->mbo_size); @@ -892,8 +982,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, RETURN(rc); } -int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, - struct md_attr *ma, const char *name) +int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, + struct md_attr *ma, const char *name) { struct md_object *next = mdt_object_child(o); struct lu_buf *buf = &info->mti_buf; @@ -908,13 +998,15 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, buf->lb_len = ma->ma_lmv_size; LASSERT(!(ma->ma_valid & MA_LMV)); } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - buf->lb_buf = ma->ma_lmv; - buf->lb_len = ma->ma_lmv_size; + buf->lb_buf = ma->ma_default_lmv; + buf->lb_len = ma->ma_default_lmv_size; LASSERT(!(ma->ma_valid & MA_LMV_DEF)); } else { return -EINVAL; } + LASSERT(buf->lb_buf); + rc = mo_xattr_get(info->mti_env, next, buf, name); if (rc > 0) { @@ -940,7 +1032,7 @@ got: ma->ma_lmv_size = rc; ma->ma_valid |= MA_LMV; } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - ma->ma_lmv_size = rc; + ma->ma_default_lmv_size = rc; ma->ma_valid |= MA_LMV_DEF; } @@ -967,6 +1059,40 @@ got: return rc; } +int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, + struct md_attr *ma, const char *name) +{ + int rc; + + if (!info->mti_big_lmm) { + OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE); + if (!info->mti_big_lmm) + return -ENOMEM; + info->mti_big_lmmsize = PAGE_SIZE; + } + + if (strcmp(name, XATTR_NAME_LOV) == 0) { + ma->ma_lmm = info->mti_big_lmm; + ma->ma_lmm_size = info->mti_big_lmmsize; + ma->ma_valid &= ~MA_LOV; + } else if (strcmp(name, XATTR_NAME_LMV) == 0) { + ma->ma_lmv = info->mti_big_lmm; + ma->ma_lmv_size = info->mti_big_lmmsize; + ma->ma_valid &= ~MA_LMV; + } else { + LBUG(); + } + + LASSERT(!info->mti_big_lmm_used); + rc = __mdt_stripe_get(info, o, ma, name); + /* since big_lmm is always used here, clear 'used' flag to avoid + * assertion in mdt_big_xattr_get(). + */ + info->mti_big_lmm_used = 0; + + return rc; +} + int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, struct lu_fid *pfid) { @@ -1014,6 +1140,51 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, RETURN(0); } +int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o, + struct lu_fid *pfid, struct lu_name *lname) +{ + struct lu_buf *buf = &info->mti_buf; + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen; + int rc; + + buf->lb_buf = info->mti_xattr_buf; + buf->lb_len = sizeof(info->mti_xattr_buf); + rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf, + XATTR_NAME_LINK); + if (rc == -ERANGE) { + rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK); + buf->lb_buf = info->mti_big_lmm; + buf->lb_len = info->mti_big_lmmsize; + } + if (rc < 0) + return rc; + + if (rc < sizeof(*leh)) { + CERROR("short LinkEA on "DFID": rc = %d\n", + PFID(mdt_object_fid(o)), rc); + return -ENODATA; + } + + leh = (struct link_ea_header *)buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + } + if (leh->leh_magic != LINK_EA_MAGIC) + return -EINVAL; + + if (leh->leh_reccount == 0) + return -ENODATA; + + linkea_entry_unpack(lee, &reclen, lname, pfid); + + return 0; +} + int mdt_attr_get_complex(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma) { @@ -1051,19 +1222,19 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, } if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV); if (rc) GOTO(out, rc); } if (need & MA_LMV && S_ISDIR(mode)) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); if (rc != 0) GOTO(out, rc); } if (need & MA_LMV_DEF && S_ISDIR(mode)) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV); if (rc != 0) GOTO(out, rc); } @@ -1080,8 +1251,8 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, if (need & MA_HSM && S_ISREG(mode)) { buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(info->mti_xattr_buf); - CLASSERT(sizeof(struct hsm_attrs) <= - sizeof(info->mti_xattr_buf)); + BUILD_BUG_ON(sizeof(struct hsm_attrs) > + sizeof(info->mti_xattr_buf)); rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM); rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm); if (rc2 == 0) @@ -1090,7 +1261,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc = rc2); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL if (need & MA_ACL_DEF && S_ISDIR(mode)) { buf->lb_buf = ma->ma_acl; buf->lb_len = ma->ma_acl_size; @@ -1113,19 +1284,22 @@ out: } static int mdt_getattr_internal(struct mdt_thread_info *info, - struct mdt_object *o, int ma_need) + struct mdt_object *o, int ma_need) { - struct md_object *next = mdt_object_child(o); - const struct mdt_body *reqbody = info->mti_body; - struct ptlrpc_request *req = mdt_info_req(info); - struct md_attr *ma = &info->mti_attr; - struct lu_attr *la = &ma->ma_attr; - struct req_capsule *pill = info->mti_pill; - const struct lu_env *env = info->mti_env; - struct mdt_body *repbody; - struct lu_buf *buffer = &info->mti_buf; - struct obd_export *exp = info->mti_exp; - int rc; + struct mdt_device *mdt = info->mti_mdt; + struct md_object *next = mdt_object_child(o); + const struct mdt_body *reqbody = info->mti_body; + struct ptlrpc_request *req = mdt_info_req(info); + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + struct req_capsule *pill = info->mti_pill; + const struct lu_env *env = info->mti_env; + struct mdt_body *repbody; + struct lu_buf *buffer = &info->mti_buf; + struct obd_export *exp = info->mti_exp; + ktime_t kstart = ktime_get(); + int rc; + ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) @@ -1161,34 +1335,64 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, req->rq_export->exp_client_uuid.uuid); } - /* If it is dir object and client require MEA, then we got MEA */ + /* from 2.12.58 intent_getattr pack default LMV in reply */ if (S_ISDIR(lu_object_attr(&next->mo_lu)) && - (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) { + ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) == + (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) && + req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD, + RCL_SERVER)) { + ma->ma_lmv = buffer->lb_buf; + ma->ma_lmv_size = buffer->lb_len; + ma->ma_default_lmv = req_capsule_server_get(pill, + &RMF_DEFAULT_MDT_MD); + ma->ma_default_lmv_size = req_capsule_get_size(pill, + &RMF_DEFAULT_MDT_MD, + RCL_SERVER); + ma->ma_need = MA_INODE; + if (ma->ma_lmv_size > 0) + ma->ma_need |= MA_LMV; + if (ma->ma_default_lmv_size > 0) + ma->ma_need |= MA_LMV_DEF; + } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) && + (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) { + /* If it is dir and client require MEA, then we got MEA */ /* Assumption: MDT_MD size is enough for lmv size. */ ma->ma_lmv = buffer->lb_buf; ma->ma_lmv_size = buffer->lb_len; ma->ma_need = MA_INODE; if (ma->ma_lmv_size > 0) { - if (reqbody->mbo_valid & OBD_MD_MEA) + if (reqbody->mbo_valid & OBD_MD_MEA) { ma->ma_need |= MA_LMV; - else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) + } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) { ma->ma_need |= MA_LMV_DEF; + ma->ma_default_lmv = buffer->lb_buf; + ma->ma_lmv = NULL; + ma->ma_default_lmv_size = buffer->lb_len; + ma->ma_lmv_size = 0; + } } } else { ma->ma_lmm = buffer->lb_buf; ma->ma_lmm_size = buffer->lb_len; ma->ma_need = MA_INODE | MA_HSM; - if (ma->ma_lmm_size > 0) + if (ma->ma_lmm_size > 0) { ma->ma_need |= MA_LOV; + /* Older clients may crash if they getattr overstriped + * files + */ + if (!exp_connect_overstriping(exp) && + mdt_lmm_is_overstriping(ma->ma_lmm)) + RETURN(-EOPNOTSUPP); + } } - if (S_ISDIR(lu_object_attr(&next->mo_lu)) && + if (S_ISDIR(lu_object_attr(&next->mo_lu)) && reqbody->mbo_valid & OBD_MD_FLDIREA && - lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) { - /* get default stripe info for this dir. */ - ma->ma_need |= MA_LOV_DEF; - } - ma->ma_need |= ma_need; + lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) { + /* get default stripe info for this dir. */ + ma->ma_need |= MA_LOV_DEF; + } + ma->ma_need |= ma_need; rc = mdt_attr_get_complex(info, o, ma); if (unlikely(rc)) { @@ -1207,22 +1411,27 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->mbo_t_state = MS_RESTORE; } - if (likely(ma->ma_valid & MA_INODE)) - mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o)); - else - RETURN(-EFAULT); + if (unlikely(!(ma->ma_valid & MA_INODE))) + RETURN(-EFAULT); + + mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o)); - if (mdt_body_has_lov(la, reqbody)) { - if (ma->ma_valid & MA_LOV) { - LASSERT(ma->ma_lmm_size); + if (mdt_body_has_lov(la, reqbody)) { + u32 stripe_count = 1; + + if (ma->ma_valid & MA_LOV) { + LASSERT(ma->ma_lmm_size); repbody->mbo_eadatasize = ma->ma_lmm_size; if (S_ISDIR(la->la_mode)) repbody->mbo_valid |= OBD_MD_FLDIREA; else repbody->mbo_valid |= OBD_MD_FLEASIZE; mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid); - } + } if (ma->ma_valid & MA_LMV) { + struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1; + u32 magic = le32_to_cpu(lmv->lmv_magic); + /* Return -ENOTSUPP for old client */ if (!mdt_is_striped_client(req->rq_export)) RETURN(-ENOTSUPP); @@ -1231,17 +1440,41 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, mdt_dump_lmv(D_INFO, ma->ma_lmv); repbody->mbo_eadatasize = ma->ma_lmv_size; repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA); + + stripe_count = le32_to_cpu(lmv->lmv_stripe_count); + if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv)) + mdt_restripe_migrate_add(info, o); + else if (magic == LMV_MAGIC_V1 && + lmv_is_restriping(lmv)) + mdt_restripe_update_add(info, o); } if (ma->ma_valid & MA_LMV_DEF) { /* Return -ENOTSUPP for old client */ if (!mdt_is_striped_client(req->rq_export)) RETURN(-ENOTSUPP); LASSERT(S_ISDIR(la->la_mode)); - mdt_dump_lmv(D_INFO, ma->ma_lmv); - repbody->mbo_eadatasize = ma->ma_lmv_size; + /* + * when ll_dir_getstripe() gets default LMV, it + * checks mbo_eadatasize. + */ + if (!(ma->ma_valid & MA_LMV)) + repbody->mbo_eadatasize = + ma->ma_default_lmv_size; repbody->mbo_valid |= (OBD_MD_FLDIREA | OBD_MD_DEFAULT_MEA); } + CDEBUG(D_VFSTRACE, + "dirent count %llu stripe count %u MDT count %d\n", + ma->ma_attr.la_dirent_count, stripe_count, + atomic_read(&mdt->mdt_mds_mds_conns) + 1); + if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET && + ma->ma_attr.la_dirent_count > + mdt->mdt_restriper.mdr_dir_split_count && + !fid_is_root(mdt_object_fid(o)) && + mdt->mdt_enable_dir_auto_split && + !o->mot_restriping && + stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1) + mdt_auto_split_add(info, o); } else if (S_ISLNK(la->la_mode) && reqbody->mbo_valid & OBD_MD_LINKNAME) { buffer->lb_buf = ma->ma_lmm; @@ -1279,8 +1512,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, print_limit < rc ? "..." : "", print_limit, (char *)ma->ma_lmm + rc - print_limit, rc); rc = 0; - } - } + } + } if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) { repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize; @@ -1289,7 +1522,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->mbo_max_mdsize); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) && (reqbody->mbo_valid & OBD_MD_FLACL)) { struct lu_nodemap *nodemap = nodemap_get_from_exp(exp); @@ -1302,10 +1535,11 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, #endif out: - if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_GETATTR); + if (rc == 0) + mdt_counter_incr(req, LPROC_MDT_GETATTR, + ktime_us_delta(ktime_get(), kstart)); - RETURN(rc); + RETURN(rc); } static int mdt_getattr(struct tgt_session_info *tsi) @@ -1318,9 +1552,11 @@ static int mdt_getattr(struct tgt_session_info *tsi) int rc, rc2; ENTRY; - reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); - LASSERT(reqbody); - LASSERT(obj != NULL); + if (unlikely(info->mti_object == NULL)) + RETURN(-EPROTO); + + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); + LASSERT(reqbody); LASSERT(lu_object_assert_exists(&obj->mot_obj)); /* Special case for Data-on-MDT files to get data version */ @@ -1395,42 +1631,59 @@ out: /** * Handler of layout intent RPC requiring the layout modification * - * \param[in] info thread environment - * \param[in] obj object - * \param[in] layout layout change descriptor + * \param[in] info thread environment + * \param[in] obj object + * \param[out] lhc object ldlm lock handle + * \param[in] layout layout change descriptor * * \retval 0 on success * \retval < 0 error code */ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lhc, struct md_layout_change *layout) { - struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL]; int rc; + ENTRY; if (!mdt_object_exists(obj)) - GOTO(out, rc = -ENOENT); + RETURN(-ENOENT); if (!S_ISREG(lu_object_attr(&obj->mot_obj))) - GOTO(out, rc = -EINVAL); + RETURN(-EINVAL); rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL, MAY_WRITE); if (rc) - GOTO(out, rc); + RETURN(rc); - /* take layout lock to prepare layout change */ - mdt_lock_reg_init(lh, LCK_EX); - rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT); - if (rc) - GOTO(out, rc); + rc = mdt_check_resent_lock(info, obj, lhc); + if (rc < 0) + RETURN(rc); + + if (rc > 0) { + /* not resent */ + __u64 lockpart = MDS_INODELOCK_LAYOUT; + + /* take layout lock to prepare layout change */ + if (layout->mlc_opc == MD_LAYOUT_WRITE) + lockpart |= MDS_INODELOCK_UPDATE; + + mdt_lock_handle_init(lhc); + mdt_lock_reg_init(lhc, LCK_EX); + rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false); + if (rc) + RETURN(rc); + } mutex_lock(&obj->mot_som_mutex); rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout); mutex_unlock(&obj->mot_som_mutex); - mdt_object_unlock(info, obj, lh, 1); -out: + + if (rc) + mdt_object_unlock(info, obj, lhc, 1); + RETURN(rc); } @@ -1478,6 +1731,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) RETURN(-EOPNOTSUPP); info = tsi2mdt_info(tsi); + if (unlikely(info->mti_object == NULL)) + RETURN(-EPROTO); if (info->mti_dlm_req != NULL) ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); @@ -1501,12 +1756,12 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) /* permission check. Make sure the calling process having permission * to write both files. */ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL, - MAY_WRITE); + MAY_WRITE); if (rc < 0) GOTO(put, rc); rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL, - MAY_WRITE); + MAY_WRITE); if (rc < 0) GOTO(put, rc); @@ -1587,14 +1842,15 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, __u64 child_bits, struct ldlm_reply *ldlm_rep) { - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_body *reqbody = NULL; - struct mdt_object *parent = info->mti_object; - struct mdt_object *child; - struct lu_fid *child_fid = &info->mti_tmp_fid1; - struct lu_name *lname = NULL; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_body *reqbody = NULL; + struct mdt_object *parent = info->mti_object; + struct mdt_object *child = NULL; + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct lu_name *lname = NULL; struct mdt_lock_handle *lhp = NULL; - struct ldlm_lock *lock; + struct ldlm_lock *lock; + struct req_capsule *pill = info->mti_pill; __u64 try_bits = 0; bool is_resent; int ma_need = 0; @@ -1626,12 +1882,15 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_lock_reg_init(lhc, LCK_PR); /* - * Object's name is on another MDS, no lookup or layout - * lock is needed here but update lock is. + * Object's name entry is on another MDS, it will + * request PERM lock only because LOOKUP lock is owned + * by the MDS where name entry resides. + * + * TODO: it should try layout lock too. - Jinshan */ child_bits &= ~(MDS_INODELOCK_LOOKUP | MDS_INODELOCK_LAYOUT); - child_bits |= MDS_INODELOCK_PERM | MDS_INODELOCK_UPDATE; + child_bits |= MDS_INODELOCK_PERM; rc = mdt_object_lock(info, child, lhc, child_bits); if (rc < 0) @@ -1648,29 +1907,64 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, } rc = mdt_getattr_internal(info, child, 0); - if (unlikely(rc != 0)) + if (unlikely(rc != 0)) { mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); + } - RETURN(rc); - } + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) + mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); + } lname = &info->mti_name; - mdt_name_unpack(info->mti_pill, &RMF_NAME, lname, MNF_FIX_ANON); + mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON); if (lu_name_is_valid(lname)) { + if (mdt_object_remote(parent)) { + CERROR("%s: parent "DFID" is on remote target\n", + mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(parent))); + RETURN(-EPROTO); + } + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", " "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), PNAME(lname), ldlm_rep); } else { - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); if (unlikely(reqbody == NULL)) RETURN(err_serious(-EPROTO)); *child_fid = reqbody->mbo_fid2; - if (unlikely(!fid_is_sane(child_fid))) RETURN(err_serious(-EINVAL)); + if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { + mdt_object_get(info->mti_env, parent); + child = parent; + } else { + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } + + if (mdt_object_remote(child)) { + CERROR("%s: child "DFID" is on remote target\n", + mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(child))); + GOTO(out_child, rc = -EPROTO); + } + + /* don't fetch LOOKUP lock if it's remote object */ + rc = mdt_is_remote_object(info, parent, child); + if (rc < 0) + GOTO(out_child, rc); + if (rc) + child_bits &= ~MDS_INODELOCK_LOOKUP; + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), @@ -1683,14 +1977,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, LU_OBJECT_DEBUG(D_INODE, info->mti_env, &parent->mot_obj, "Parent doesn't exist!"); - RETURN(-ESTALE); - } - - if (mdt_object_remote(parent)) { - CERROR("%s: parent "DFID" is on remote target\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(parent))); - RETURN(-EIO); + GOTO(out_child, rc = -ESTALE); } if (lu_name_is_valid(lname)) { @@ -1724,30 +2011,18 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); if (rc != 0) - GOTO(out_parent, rc); - } - - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + GOTO(unlock_parent, rc); - /* - *step 3: find the child object by fid & lock it. - * regardless if it is local or remote. - * - *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e) - * set parent dir fid the same as child fid in getattr by fid case - * we should not lu_object_find() the object again, could lead - * to hung if there is a concurrent unlink destroyed the object. - */ - if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { - mdt_object_get(info->mti_env, parent); - child = parent; - } else { child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); + if (unlikely(IS_ERR(child))) + GOTO(unlock_parent, rc = PTR_ERR(child)); } - if (unlikely(IS_ERR(child))) - GOTO(out_parent, rc = PTR_ERR(child)); + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + + /* step 3: lock child regardless if it is local or remote. */ + LASSERT(child); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); if (!mdt_object_exists(child)) { @@ -1765,7 +2040,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_lock_reg_init(lhc, LCK_PR); if (!(child_bits & MDS_INODELOCK_UPDATE) && - mdt_object_exists(child) && !mdt_object_remote(child)) { + !mdt_object_remote(child)) { struct md_attr *ma = &info->mti_attr; ma->ma_valid = 0; @@ -1779,12 +2054,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, * lock and this might save us RPC on later STAT. For * directories, it also let negative dentry cache start * working for this dir. */ - if (ma->ma_valid & MA_INODE && - ma->ma_attr.la_valid & LA_CTIME && - info->mti_mdt->mdt_namespace->ns_ctime_age_limit + - ma->ma_attr.la_ctime < ktime_get_real_seconds()) - child_bits |= MDS_INODELOCK_UPDATE; - } + if (ma->ma_valid & MA_INODE && + ma->ma_attr.la_valid & LA_CTIME && + info->mti_mdt->mdt_namespace->ns_ctime_age_limit + + ma->ma_attr.la_ctime < ktime_get_real_seconds()) + child_bits |= MDS_INODELOCK_UPDATE; + } /* layout lock must be granted in a best-effort way * for IT operations */ @@ -1815,17 +2090,25 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, child_bits &= ~MDS_INODELOCK_UPDATE; rc = mdt_object_lock(info, child, lhc, child_bits); } - if (unlikely(rc != 0)) - GOTO(out_child, rc); - } + if (unlikely(rc != 0)) + GOTO(out_child, rc); + } - lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + /* finally, we can get attr for child. */ + rc = mdt_getattr_internal(info, child, ma_need); + if (unlikely(rc != 0)) { + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc); + } - /* finally, we can get attr for child. */ - rc = mdt_getattr_internal(info, child, ma_need); - if (unlikely(rc != 0)) { - mdt_object_unlock(info, child, lhc, 1); - } else if (lock) { + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) { + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc); + } + + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + if (lock) { /* Debugging code. */ LDLM_DEBUG(lock, "Returning lock to client"); LASSERTF(fid_res_name_eq(mdt_object_fid(child), @@ -1835,36 +2118,32 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, PFID(mdt_object_fid(child))); if (S_ISREG(lu_object_attr(&child->mot_obj)) && - mdt_object_exists(child) && !mdt_object_remote(child) && - child != parent) { - LDLM_LOCK_PUT(lock); + !mdt_object_remote(child) && child != parent) { mdt_object_put(info->mti_env, child); - /* NB: call the mdt_pack_size2body always after - * mdt_object_put(), that is why this speacial - * exit path is used. */ rc = mdt_pack_size2body(info, child_fid, &lhc->mlh_reg_lh); if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { /* DOM lock was taken in advance but this is - * not DoM file. Drop the lock. */ + * not DoM file. Drop the lock. + */ lock_res_and_lock(lock); ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM); unlock_res_and_lock(lock); } - - GOTO(out_parent, rc = 0); + LDLM_LOCK_PUT(lock); + GOTO(unlock_parent, rc = 0); } - } - if (lock) - LDLM_LOCK_PUT(lock); + LDLM_LOCK_PUT(lock); + } - EXIT; + EXIT; out_child: - mdt_object_put(info->mti_env, child); -out_parent: - if (lhp) - mdt_object_unlock(info, parent, lhp, 1); - return rc; + if (child) + mdt_object_put(info->mti_env, child); +unlock_parent: + if (lhp) + mdt_object_unlock(info, parent, lhp, 1); + return rc; } /* normal handler: should release the child lock */ @@ -1886,24 +2165,237 @@ static int mdt_getattr_name(struct tgt_session_info *tsi) repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - rc = mdt_init_ucred_intent_getattr(info, reqbody); - if (unlikely(rc)) - GOTO(out_shrink, rc); + rc = mdt_init_ucred_intent_getattr(info, reqbody); + if (unlikely(rc)) + GOTO(out_shrink, rc); + + rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL); + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { + ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode); + lhc->mlh_reg_lh.cookie = 0; + } + mdt_exit_ucred(info); + EXIT; +out_shrink: + mdt_client_compatibility(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; + mdt_thread_info_fini(info); + return rc; +} + +static int mdt_rmfid_unlink(struct mdt_thread_info *info, + const struct lu_fid *pfid, + const struct lu_name *name, + struct mdt_object *obj, s64 ctime) +{ + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct ldlm_enqueue_info *einfo = &info->mti_einfo[0]; + struct mdt_device *mdt = info->mti_mdt; + struct md_attr *ma = &info->mti_attr; + struct mdt_lock_handle *parent_lh; + struct mdt_lock_handle *child_lh; + struct mdt_object *pobj; + bool cos_incompat = false; + int rc; + ENTRY; + + pobj = mdt_object_find(info->mti_env, mdt, pfid); + if (IS_ERR(pobj)) + GOTO(out, rc = PTR_ERR(pobj)); + + parent_lh = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(parent_lh, LCK_PW, name); + rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE); + if (rc != 0) + GOTO(put_parent, rc); + + if (mdt_object_remote(pobj)) + cos_incompat = true; + + rc = mdo_lookup(info->mti_env, mdt_object_child(pobj), + name, child_fid, &info->mti_spec); + if (rc != 0) + GOTO(unlock_parent, rc); + + if (!lu_fid_eq(child_fid, mdt_object_fid(obj))) + GOTO(unlock_parent, rc = -EREMCHG); + + child_lh = &info->mti_lh[MDT_LH_CHILD]; + mdt_lock_reg_init(child_lh, LCK_EX); + rc = mdt_reint_striped_lock(info, obj, child_lh, + MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE, + einfo, cos_incompat); + if (rc != 0) + GOTO(unlock_parent, rc); + + if (atomic_read(&obj->mot_open_count)) { + CDEBUG(D_OTHER, "object "DFID" open, skip\n", + PFID(mdt_object_fid(obj))); + GOTO(unlock_child, rc = -EBUSY); + } + + ma->ma_need = 0; + ma->ma_valid = MA_INODE; + ma->ma_attr.la_valid = LA_CTIME; + ma->ma_attr.la_ctime = ctime; + + mutex_lock(&obj->mot_lov_mutex); + + rc = mdo_unlink(info->mti_env, mdt_object_child(pobj), + mdt_object_child(obj), name, ma, 0); + + mutex_unlock(&obj->mot_lov_mutex); + +unlock_child: + mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1); +unlock_parent: + mdt_object_unlock(info, pobj, parent_lh, 1); +put_parent: + mdt_object_put(info->mti_env, pobj); +out: + RETURN(rc); +} + +static int mdt_rmfid_check_permission(struct mdt_thread_info *info, + struct mdt_object *obj) +{ + struct lu_ucred *uc = lu_ucred(info->mti_env); + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + int rc = 0; + ENTRY; + + ma->ma_need = MA_INODE; + rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma); + if (rc) + GOTO(out, rc); + + if (la->la_flags & LUSTRE_IMMUTABLE_FL) + rc = -EACCES; + + if (md_capable(uc, CFS_CAP_DAC_OVERRIDE)) + RETURN(0); + if (uc->uc_fsuid == la->la_uid) { + if ((la->la_mode & S_IWUSR) == 0) + rc = -EACCES; + } else if (uc->uc_fsgid == la->la_gid) { + if ((la->la_mode & S_IWGRP) == 0) + rc = -EACCES; + } else if ((la->la_mode & S_IWOTH) == 0) { + rc = -EACCES; + } + +out: + RETURN(rc); +} + +static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid, + s64 ctime) +{ + struct mdt_device *mdt = info->mti_mdt; + struct mdt_object *obj = NULL; + struct linkea_data ldata = { NULL }; + struct lu_buf *buf = &info->mti_big_buf; + struct lu_name *name = &info->mti_name; + struct lu_fid *pfid = &info->mti_tmp_fid1; + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen, count, rc = 0; + ENTRY; + + if (!fid_is_sane(fid)) + GOTO(out, rc = -EINVAL); + + if (!fid_is_namespace_visible(fid)) + GOTO(out, rc = -EINVAL); + + obj = mdt_object_find(info->mti_env, mdt, fid); + if (IS_ERR(obj)) + GOTO(out, rc = PTR_ERR(obj)); + + if (mdt_object_remote(obj)) + GOTO(out, rc = -EREMOTE); + if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header)) + GOTO(out, rc = -ENOENT); + + rc = mdt_rmfid_check_permission(info, obj); + if (rc) + GOTO(out, rc); + + /* take LinkEA */ + buf = lu_buf_check_and_alloc(buf, PATH_MAX); + if (!buf->lb_buf) + GOTO(out, rc = -ENOMEM); + + ldata.ld_buf = buf; + rc = mdt_links_read(info, obj, &ldata); + if (rc) + GOTO(out, rc); + + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + for (count = 0; count < leh->leh_reccount; count++) { + /* remove every hardlink */ + linkea_entry_unpack(lee, &reclen, name, pfid); + lee = (struct link_ea_entry *) ((char *)lee + reclen); + rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime); + if (rc) + break; + } + +out: + if (obj && !IS_ERR(obj)) + mdt_object_put(info->mti_env, obj); + if (info->mti_big_buf.lb_buf) + lu_buf_free(&info->mti_big_buf); + + RETURN(rc); +} + +static int mdt_rmfid(struct tgt_session_info *tsi) +{ + struct mdt_thread_info *mti = tsi2mdt_info(tsi); + struct mdt_body *reqbody; + struct lu_fid *fids, *rfids; + int bufsize, rc; + __u32 *rcs; + int i, nr; + ENTRY; + + reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY); + if (reqbody == NULL) + RETURN(-EPROTO); + bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY, + RCL_CLIENT); + nr = bufsize / sizeof(struct lu_fid); + if (nr * sizeof(struct lu_fid) != bufsize) + RETURN(-EINVAL); + req_capsule_set_size(tsi->tsi_pill, &RMF_RCS, + RCL_SERVER, nr * sizeof(__u32)); + req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY, + RCL_SERVER, nr * sizeof(struct lu_fid)); + rc = req_capsule_server_pack(tsi->tsi_pill); + if (rc) + GOTO(out, rc = err_serious(rc)); + fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY); + if (fids == NULL) + RETURN(-EPROTO); + rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS); + LASSERT(rcs); + rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY); + LASSERT(rfids); - rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL); - if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { - ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode); - lhc->mlh_reg_lh.cookie = 0; - } - mdt_exit_ucred(info); - EXIT; -out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; - mdt_thread_info_fini(info); - return rc; + mdt_init_ucred(mti, reqbody); + for (i = 0; i < nr; i++) { + rfids[i] = fids[i]; + rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime); + } + mdt_exit_ucred(mti); + +out: + RETURN(rc); } static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, @@ -1959,6 +2451,8 @@ static int mdt_set_info(struct tgt_session_info *tsi) __swab32s(&cs->cs_id); } + if (!mdt_is_rootadmin(tsi2mdt_info(tsi))) + RETURN(-EACCES); rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export, vallen, val, NULL); } else if (KEY_IS(KEY_EVICT_BY_NID)) { @@ -2008,17 +2502,17 @@ static int mdt_readpage(struct tgt_session_info *tsi) exp_max_brw_size(tsi->tsi_exp)); rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT; - OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); - if (rdpg->rp_pages == NULL) - RETURN(-ENOMEM); + OBD_ALLOC_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages); + if (rdpg->rp_pages == NULL) + RETURN(-ENOMEM); - for (i = 0; i < rdpg->rp_npages; ++i) { + for (i = 0; i < rdpg->rp_npages; ++i) { rdpg->rp_pages[i] = alloc_page(GFP_NOFS); - if (rdpg->rp_pages[i] == NULL) - GOTO(free_rdpg, rc = -ENOMEM); - } + if (rdpg->rp_pages[i] == NULL) + GOTO(free_rdpg, rc = -ENOMEM); + } - /* call lower layers to fill allocated pages with directory data */ + /* call lower layers to fill allocated pages with directory data */ rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg); if (rc < 0) GOTO(free_rdpg, rc); @@ -2032,7 +2526,7 @@ free_rdpg: for (i = 0; i < rdpg->rp_npages; i++) if (rdpg->rp_pages[i] != NULL) __free_page(rdpg->rp_pages[i]); - OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); + OBD_FREE_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) RETURN(0); @@ -2060,6 +2554,34 @@ static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op) return 0; } +static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op) +{ + return op == REINT_OPEN && + !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT)); +} + +static void mdt_preset_secctx_size(struct mdt_thread_info *info) +{ + struct req_capsule *pill = info->mti_pill; + + if (req_capsule_has_field(pill, &RMF_FILE_SECCTX, + RCL_SERVER) && + req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT)) { + if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT) != 0) { + /* pre-set size in server part with max size */ + req_capsule_set_size(pill, &RMF_FILE_SECCTX, + RCL_SERVER, + info->mti_mdt->mdt_max_ea_size); + } else { + req_capsule_set_size(pill, &RMF_FILE_SECCTX, + RCL_SERVER, 0); + } + } + +} + static int mdt_reint_internal(struct mdt_thread_info *info, struct mdt_lock_handle *lhc, __u32 op) @@ -2070,19 +2592,25 @@ static int mdt_reint_internal(struct mdt_thread_info *info, ENTRY; - rc = mdt_reint_unpack(info, op); - if (rc != 0) { - CERROR("Can't unpack reint, rc %d\n", rc); - RETURN(err_serious(rc)); - } + rc = mdt_reint_unpack(info, op); + if (rc != 0) { + CERROR("Can't unpack reint, rc %d\n", rc); + RETURN(err_serious(rc)); + } - /* for replay (no_create) lmm is not needed, client has it already */ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, + + /* check if the file system is set to readonly. O_RDONLY open + * is still allowed even the file system is set to readonly mode */ + if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op)) + RETURN(err_serious(-EROFS)); + + /* for replay (no_create) lmm is not needed, client has it already */ + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, DEF_REP_MD_SIZE); /* llog cookies are always 0, the field is kept for compatibility */ - if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) + if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0); /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD @@ -2092,46 +2620,48 @@ static int mdt_reint_internal(struct mdt_thread_info *info, req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); - rc = req_capsule_server_pack(pill); - if (rc != 0) { - CERROR("Can't pack response, rc %d\n", rc); - RETURN(err_serious(rc)); - } + mdt_preset_secctx_size(info); + + rc = req_capsule_server_pack(pill); + if (rc != 0) { + CERROR("Can't pack response, rc %d\n", rc); + RETURN(err_serious(rc)); + } - if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) { - repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); - LASSERT(repbody); + if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) { + repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); + LASSERT(repbody); repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - } + } - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); - /* for replay no cookkie / lmm need, because client have this already */ - if (info->mti_spec.no_create) - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0); + /* for replay no cookkie / lmm need, because client have this already */ + if (info->mti_spec.no_create) + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0); - rc = mdt_init_ucred_reint(info); - if (rc) - GOTO(out_shrink, rc); + rc = mdt_init_ucred_reint(info); + if (rc) + GOTO(out_shrink, rc); - rc = mdt_fix_attr_ucred(info, op); - if (rc != 0) - GOTO(out_ucred, rc = err_serious(rc)); + rc = mdt_fix_attr_ucred(info, op); + if (rc != 0) + GOTO(out_ucred, rc = err_serious(rc)); rc = mdt_check_resent(info, mdt_reconstruct, lhc); if (rc < 0) { GOTO(out_ucred, rc); } else if (rc == 1) { - DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); + DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt"); rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); - GOTO(out_ucred, rc); - } - rc = mdt_reint_rec(info, lhc); - EXIT; + GOTO(out_ucred, rc); + } + rc = mdt_reint_rec(info, lhc); + EXIT; out_ucred: - mdt_exit_ucred(info); + mdt_exit_ucred(info); out_shrink: mdt_client_compatibility(info); @@ -2141,14 +2671,11 @@ out_shrink: /* * Data-on-MDT optimization - read data along with OPEN and return it - * in reply. Do that only if we have both DOM and LAYOUT locks. + * in reply when possible. */ - if (rc == 0 && op == REINT_OPEN && - info->mti_attr.ma_lmm != NULL && - mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) { + if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req)) rc = mdt_dom_read_on_open(info, info->mti_mdt, &lhc->mlh_reg_lh); - } return rc; } @@ -2216,7 +2743,7 @@ static int mdt_reint(struct tgt_session_info *tsi) } /* this should sync the whole device */ -static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) +int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) { struct dt_device *dt = mdt->mdt_bottom; int rc; @@ -2230,7 +2757,7 @@ static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp, struct mdt_object *mo) { - int rc; + int rc = 0; ENTRY; @@ -2241,7 +2768,16 @@ static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp, RETURN(-ESTALE); } - rc = mo_object_sync(env, mdt_object_child(mo)); + if (S_ISREG(lu_object_attr(&mo->mot_obj))) { + struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt; + dt_obj_version_t version; + + version = dt_version_get(env, mdt_obj2dt(mo)); + if (version > tgt->lut_obd->obd_last_committed) + rc = mo_object_sync(env, mdt_object_child(mo)); + } else { + rc = mo_object_sync(env, mdt_object_child(mo)); + } RETURN(rc); } @@ -2251,6 +2787,7 @@ static int mdt_sync(struct tgt_session_info *tsi) struct ptlrpc_request *req = tgt_ses_req(tsi); struct req_capsule *pill = tsi->tsi_pill; struct mdt_body *body; + ktime_t kstart = ktime_get(); int rc; ENTRY; @@ -2263,6 +2800,9 @@ static int mdt_sync(struct tgt_session_info *tsi) } else { struct mdt_thread_info *info = tsi2mdt_info(tsi); + if (unlikely(info->mti_object == NULL)) + RETURN(-EPROTO); + /* sync an object */ rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, info->mti_object); @@ -2284,7 +2824,8 @@ static int mdt_sync(struct tgt_session_info *tsi) mdt_thread_info_fini(info); } if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_SYNC); + mdt_counter_incr(req, LPROC_MDT_SYNC, + ktime_us_delta(ktime_get(), kstart)); RETURN(rc); } @@ -2343,17 +2884,17 @@ put: */ static int mdt_quotactl(struct tgt_session_info *tsi) { - struct obd_export *exp = tsi->tsi_exp; - struct req_capsule *pill = tsi->tsi_pill; - struct obd_quotactl *oqctl, *repoqc; - int id, rc; - struct mdt_device *mdt = mdt_exp2dev(exp); - struct lu_device *qmt = mdt->mdt_qmt_dev; - struct lu_nodemap *nodemap; + struct obd_export *exp = tsi->tsi_exp; + struct req_capsule *pill = tsi->tsi_pill; + struct obd_quotactl *oqctl, *repoqc; + int id, rc; + struct mdt_device *mdt = mdt_exp2dev(exp); + struct lu_device *qmt = mdt->mdt_qmt_dev; + struct lu_nodemap *nodemap; ENTRY; oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL); - if (oqctl == NULL) + if (!oqctl) RETURN(err_serious(-EPROTO)); rc = req_capsule_server_pack(pill); @@ -2369,20 +2910,28 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_SETINFO: case Q_SETQUOTA: case LUSTRE_Q_SETDEFAULT: + case LUSTRE_Q_SETQUOTAPOOL: + case LUSTRE_Q_SETINFOPOOL: if (!nodemap_can_setquota(nodemap)) GOTO(out_nodemap, rc = -EPERM); + /* fallthrough */ case Q_GETINFO: case Q_GETQUOTA: case LUSTRE_Q_GETDEFAULT: + case LUSTRE_Q_GETQUOTAPOOL: + case LUSTRE_Q_GETINFOPOOL: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ + /* fallthrough */ case Q_GETOINFO: case Q_GETOQUOTA: break; default: - CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd); - GOTO(out_nodemap, rc = -EFAULT); + rc = -EFAULT; + CERROR("%s: unsupported quotactl command %d: rc = %d\n", + mdt_obd_name(mdt), oqctl->qc_cmd, rc); + GOTO(out_nodemap, rc); } id = oqctl->qc_id; @@ -2425,6 +2974,10 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_GETQUOTA: case LUSTRE_Q_SETDEFAULT: case LUSTRE_Q_GETDEFAULT: + case LUSTRE_Q_SETQUOTAPOOL: + case LUSTRE_Q_GETQUOTAPOOL: + case LUSTRE_Q_SETINFOPOOL: + case LUSTRE_Q_GETINFOPOOL: /* forward quotactl request to QMT */ rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl); break; @@ -2444,8 +2997,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi) if (oqctl->qc_id != id) swap(oqctl->qc_id, id); - *repoqc = *oqctl; - + QCTL_COPY(repoqc, oqctl); EXIT; out_nodemap: @@ -2632,6 +3184,7 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, { struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd; struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + struct ldlm_cb_set_arg *arg = data; bool commit_async = false; int rc; ENTRY; @@ -2644,17 +3197,22 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, unlock_res_and_lock(lock); RETURN(0); } - /* There is no lock conflict if l_blocking_lock == NULL, - * it indicates a blocking ast sent from ldlm_lock_decref_internal - * when the last reference to a local lock was released */ - if (lock->l_req_mode & (LCK_PW | LCK_EX) && - lock->l_blocking_lock != NULL) { + + /* A blocking ast may be sent from ldlm_lock_decref_internal + * when the last reference to a local lock was released and + * during blocking event from ldlm_work_bl_ast_lock(). + * The 'data' parameter is l_ast_data in the first case and + * callback arguments in the second one. Distinguish them by that. + */ + if (!data || data == lock->l_ast_data || !arg->bl_desc) + goto skip_cos_checks; + + if (lock->l_req_mode & (LCK_PW | LCK_EX)) { if (mdt_cos_is_enabled(mdt)) { - if (lock->l_client_cookie != - lock->l_blocking_lock->l_client_cookie) + if (!arg->bl_desc->bl_same_client) mdt_set_lock_sync(lock); } else if (mdt_slc_is_enabled(mdt) && - ldlm_is_cos_incompat(lock->l_blocking_lock)) { + arg->bl_desc->bl_cos_incompat) { mdt_set_lock_sync(lock); /* * we may do extra commit here, but there is a small @@ -2668,11 +3226,11 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, */ commit_async = true; } - } else if (lock->l_req_mode == LCK_COS && - lock->l_blocking_lock != NULL) { + } else if (lock->l_req_mode == LCK_COS) { commit_async = true; } +skip_cos_checks: rc = ldlm_blocking_ast_nocheck(lock); if (commit_async) { @@ -2739,10 +3297,9 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, rc = lu_env_init(&env, LCT_MD_THREAD); if (unlikely(rc != 0)) { - CWARN("%s: lu_env initialization failed, object" - "%p "DFID" is leaked!\n", + CWARN("%s: lu_env initialization failed, object %p "DFID" is leaked!: rc = %d\n", obd->obd_name, mo, - PFID(mdt_object_fid(mo))); + PFID(mdt_object_fid(mo)), rc); RETURN(rc); } @@ -2798,6 +3355,11 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, return 1; } +static void mdt_remote_object_lock_created_cb(struct ldlm_lock *lock) +{ + mdt_object_get(NULL, lock->l_ast_data); +} + int mdt_remote_object_lock_try(struct mdt_thread_info *mti, struct mdt_object *o, const struct lu_fid *fid, struct lustre_handle *lh, enum ldlm_mode mode, @@ -2826,8 +3388,8 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, * if we cache lock, couple lock with mdt_object, so that object * can be easily found in lock ASTs. */ - mdt_object_get(mti->mti_env, o); einfo->ei_cbdata = o; + einfo->ei_cb_created = mdt_remote_object_lock_created_cb; } memset(policy, 0, sizeof(*policy)); @@ -2836,8 +3398,6 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, policy); - if (rc < 0 && cache) - mdt_object_put(mti->mti_env, o); /* Return successfully acquired bits to a caller */ if (rc == 0) { @@ -2858,15 +3418,14 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o, cache); } -static int mdt_object_local_lock(struct mdt_thread_info *info, - struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 *ibits, - __u64 trybits, bool cos_incompat) +int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 *ibits, + __u64 trybits, bool cos_incompat) { struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags = 0; + __u64 dlmflags = 0, *cookie = NULL; int rc; ENTRY; @@ -2886,6 +3445,11 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, /* Only enqueue LOOKUP lock for remote object */ LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP)); + /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */ + if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX && + *ibits == MDS_INODELOCK_OPEN) + dlmflags |= LDLM_FL_CANCEL_ON_BLOCK; + if (lh->mlh_type == MDT_PDO_LOCK) { /* check for exists after object is locked */ if (mdt_object_exists(o) == 0) { @@ -2898,10 +3462,12 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, } } - fid_build_reg_res_name(mdt_object_fid(o), res_id); dlmflags |= LDLM_FL_ATOMIC_CB; + if (info->mti_exp) + cookie = &info->mti_exp->exp_handle.h_cookie; + /* * Take PDO lock on whole directory and build correct @res_id for lock * on part of directory. @@ -2922,10 +3488,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, /* at least one of them should be set */ LASSERT(policy->l_inodebits.bits | policy->l_inodebits.try_bits); - rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh, + lh->mlh_pdo_mode, policy, res_id, + dlmflags, cookie); if (unlikely(rc != 0)) GOTO(out_unlock, rc); } @@ -2945,10 +3510,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, + policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags, + cookie); out_unlock: if (rc != 0) mdt_object_unlock(info, o, lh, 1); @@ -3006,8 +3570,9 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, * object anyway XXX*/ if (lh->mlh_type == MDT_PDO_LOCK && lh->mlh_pdo_hash != 0) { - CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to" - "EX lock.\n", mdt_obd_name(info->mti_mdt), + CDEBUG(D_INFO, + "%s: "DFID" convert PDO lock to EX lock.\n", + mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(o))); lh->mlh_pdo_hash = 0; lh->mlh_rreg_mode = LCK_EX; @@ -3025,6 +3590,10 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, } } + /* other components like LFSCK can use lockless access + * and populate cache, so we better invalidate it */ + mo_invalidate(info->mti_env, mdt_object_child(o)); + RETURN(0); } @@ -3143,20 +3712,18 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info, if (lustre_handle_is_used(h)) { struct ldlm_lock *lock = ldlm_handle2lock(h); + struct ptlrpc_request *req = mdt_info_req(info); if (o != NULL && (lock->l_policy_data.l_inodebits.bits & (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE))) mo_invalidate(info->mti_env, mdt_object_child(o)); - if (decref || !info->mti_has_trans || + if (decref || !info->mti_has_trans || !req || !(mode & (LCK_PW | LCK_EX))) { ldlm_lock_decref_and_cancel(h, mode); LDLM_LOCK_PUT(lock); } else { - struct ptlrpc_request *req = mdt_info_req(info); - - LASSERT(req != NULL); tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock, req->rq_transno); ldlm_lock_decref(h, mode); @@ -3231,7 +3798,7 @@ void mdt_object_unlock_put(struct mdt_thread_info * info, * - create lu_object, corresponding to the fid in mdt_body, and save it in * @info; * - * - if HABEO_CORPUS flag is set for this request type check whether object + * - if HAS_BODY flag is set for this request type check whether object * actually exists on storage (lu_object_exists()). * */ @@ -3262,7 +3829,7 @@ static int mdt_body_unpack(struct mdt_thread_info *info, obj = mdt_object_find(env, info->mti_mdt, &body->mbo_fid1); if (!IS_ERR(obj)) { - if ((flags & HABEO_CORPUS) && !mdt_object_exists(obj)) { + if ((flags & HAS_BODY) && !mdt_object_exists(obj)) { mdt_object_put(env, obj); rc = -ENOENT; } else { @@ -3278,21 +3845,22 @@ static int mdt_body_unpack(struct mdt_thread_info *info, static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, enum tgt_handler_flags flags) { - struct req_capsule *pill = info->mti_pill; - int rc; - ENTRY; + struct req_capsule *pill = info->mti_pill; + int rc; + + ENTRY; - if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) - rc = mdt_body_unpack(info, flags); - else - rc = 0; + if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) + rc = mdt_body_unpack(info, flags); + else + rc = 0; - if (rc == 0 && (flags & HABEO_REFERO)) { - /* Pack reply. */ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, + if (rc == 0 && (flags & HAS_REPLY)) { + /* Pack reply. */ + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, DEF_REP_MD_SIZE); - if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) + if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0); @@ -3303,9 +3871,14 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); - rc = req_capsule_server_pack(pill); - } - RETURN(rc); + mdt_preset_secctx_size(info); + + rc = req_capsule_server_pack(pill); + if (rc) + CWARN("%s: cannot pack response: rc = %d\n", + mdt_obd_name(info->mti_mdt), rc); + } + RETURN(rc); } void mdt_lock_handle_init(struct mdt_lock_handle *lh) @@ -3365,7 +3938,6 @@ void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_spec.no_create = 0; info->mti_spec.sp_rm_entry = 0; info->mti_spec.sp_permitted = 0; - info->mti_spec.sp_migrate_close = 0; info->mti_spec.u.sp_ea.eadata = NULL; info->mti_spec.u.sp_ea.eadatalen = 0; @@ -3385,6 +3957,7 @@ void mdt_thread_info_fini(struct mdt_thread_info *info) info->mti_env = NULL; info->mti_pill = NULL; info->mti_exp = NULL; + info->mti_mdt = NULL; if (unlikely(info->mti_big_buf.lb_buf != NULL)) lu_buf_free(&info->mti_big_buf); @@ -3412,10 +3985,8 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi) { if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) && cfs_fail_val == - tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) { - set_current_state(TASK_UNINTERRUPTIBLE); - schedule_timeout(msecs_to_jiffies(3 * MSEC_PER_SEC)); - } + tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) + schedule_timeout_uninterruptible(cfs_time_seconds(3)); return tgt_connect(tsi); } @@ -3560,7 +4131,7 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info, * If the xid matches, then we know this is a resent request, and allow * it. (It's probably an OPEN, for which we don't send a lock. */ - if (req_can_reconstruct(req, NULL)) + if (req_can_reconstruct(req, NULL) != 0) return; /* @@ -3606,7 +4177,10 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, if (ldlm_rep == NULL || OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) { mdt_object_unlock(info, info->mti_object, lhc, 1); - RETURN(err_serious(-EFAULT)); + if (is_serious(rc)) + RETURN(rc); + else + RETURN(err_serious(-EFAULT)); } ldlm_rep->lock_policy_res2 = clear_serious(rc); @@ -3699,13 +4273,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, struct ldlm_lock **lockp, __u64 flags) { - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT]; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP }; struct layout_intent *intent; + struct ldlm_reply *ldlm_rep; struct lu_fid *fid = &info->mti_tmp_fid2; struct mdt_object *obj = NULL; int layout_size = 0; + struct lu_buf *buf = &layout.mlc_buf; int rc = 0; + ENTRY; fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); @@ -3733,24 +4310,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, case LAYOUT_INTENT_RESTORE: CERROR("%s: Unsupported layout intent opc %d\n", mdt_obd_name(info->mti_mdt), intent->li_opc); - rc = -ENOTSUPP; - break; + RETURN(-ENOTSUPP); default: CERROR("%s: Unknown layout intent opc %d\n", mdt_obd_name(info->mti_mdt), intent->li_opc); - rc = -EINVAL; - break; + RETURN(-EINVAL); } - if (rc < 0) - RETURN(rc); - - /* Get lock from request for possible resent case. */ - mdt_intent_fixup_resent(info, *lockp, lhc, flags); obj = mdt_object_find(info->mti_env, info->mti_mdt, fid); if (IS_ERR(obj)) - GOTO(out, rc = PTR_ERR(obj)); - + RETURN(PTR_ERR(obj)); if (mdt_object_exists(obj) && !mdt_object_remote(obj)) { /* if layout is going to be changed don't use the current EA @@ -3762,82 +4331,80 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, } else { layout_size = mdt_attr_get_eabuf_size(info, obj); if (layout_size < 0) - GOTO(out_obj, rc = layout_size); + GOTO(out, rc = layout_size); if (layout_size > info->mti_mdt->mdt_max_mdsize) info->mti_mdt->mdt_max_mdsize = layout_size; } + CDEBUG(D_INFO, "%s: layout_size %d\n", + mdt_obd_name(info->mti_mdt), layout_size); } /* * set reply buffer size, so that ldlm_handle_enqueue0()-> * ldlm_lvbo_fill() will fill the reply buffer with lovea. */ - (*lockp)->l_lvb_type = LVB_T_LAYOUT; req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER, layout_size); rc = req_capsule_server_pack(info->mti_pill); if (rc) - GOTO(out_obj, rc); + GOTO(out, rc); + ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + if (!ldlm_rep) + GOTO(out, rc = -EPROTO); - if (layout.mlc_opc != MD_LAYOUT_NOP) { - struct lu_buf *buf = &layout.mlc_buf; + mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); - /** - * mdt_layout_change is a reint operation, when the request - * is resent, layout write shouldn't reprocess it again. - */ - rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc); - if (rc) - GOTO(out_obj, rc = rc < 0 ? rc : 0); + /* take lock in ldlm_lock_enqueue() for LAYOUT_INTENT_ACCESS */ + if (layout.mlc_opc == MD_LAYOUT_NOP) + GOTO(out, rc = 0); - /** - * There is another resent case: the client's job has been - * done by another client, referring lod_declare_layout_change - * -EALREADY case, and it became a operation w/o transaction, - * so we should not do the layout change, otherwise - * mdt_layout_change() will try to cancel the granted server - * CR lock whose remote counterpart is still in hold on the - * client, and a deadlock ensues. - */ - rc = mdt_check_resent_lock(info, obj, lhc); - if (rc <= 0) - GOTO(out_obj, rc); - - buf->lb_buf = NULL; - buf->lb_len = 0; - if (unlikely(req_is_replay(mdt_info_req(info)))) { - buf->lb_buf = req_capsule_client_get(info->mti_pill, - &RMF_EADATA); - buf->lb_len = req_capsule_get_size(info->mti_pill, - &RMF_EADATA, RCL_CLIENT); - /* - * If it's a replay of layout write intent RPC, the - * client has saved the extended lovea when - * it get reply then. - */ - if (buf->lb_len > 0) - mdt_fix_lov_magic(info, buf->lb_buf); - } + rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc); + if (rc < 0) + GOTO(out, rc); + if (rc == 1) { + DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); + rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); + GOTO(out, rc); + } + + buf->lb_buf = NULL; + buf->lb_len = 0; + if (unlikely(req_is_replay(mdt_info_req(info)))) { + buf->lb_buf = req_capsule_client_get(info->mti_pill, + &RMF_EADATA); + buf->lb_len = req_capsule_get_size(info->mti_pill, + &RMF_EADATA, RCL_CLIENT); /* - * Instantiate some layout components, if @buf contains - * lovea, then it's a replay of the layout intent write - * RPC. + * If it's a replay of layout write intent RPC, the client has + * saved the extended lovea when it get reply then. */ - rc = mdt_layout_change(info, obj, &layout); - if (rc) - GOTO(out_obj, rc); + if (buf->lb_len > 0) + mdt_fix_lov_magic(info, buf->lb_buf); } -out_obj: - mdt_object_put(info->mti_env, obj); - if (rc == 0 && lustre_handle_is_used(&lhc->mlh_reg_lh)) + /* Get lock from request for possible resent case. */ + mdt_intent_fixup_resent(info, *lockp, lhc, flags); + (*lockp)->l_lvb_type = LVB_T_LAYOUT; + + /* + * Instantiate some layout components, if @buf contains lovea, then it's + * a replay of the layout intent write RPC. + */ + rc = mdt_layout_change(info, obj, lhc, &layout); + ldlm_rep->lock_policy_res2 = clear_serious(rc); + + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc); + if (rc == ELDLM_LOCK_REPLACED && + (*lockp)->l_granted_mode == LCK_EX) + ldlm_lock_mode_downgrade(*lockp, LCK_CR); + } + EXIT; out: - lhc->mlh_reg_lh.cookie = 0; - + mdt_object_put(info->mti_env, obj); return rc; } @@ -3867,11 +4434,15 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, rc = mdt_reint_internal(info, lhc, opc); - /* Check whether the reply has been packed successfully. */ - if (mdt_info_req(info)->rq_repmsg != NULL) - rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); - if (rep == NULL) - RETURN(err_serious(-EFAULT)); + /* Check whether the reply has been packed successfully. */ + if (mdt_info_req(info)->rq_repmsg != NULL) + rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + if (rep == NULL) { + if (is_serious(rc)) + RETURN(rc); + else + RETURN(err_serious(-EFAULT)); + } /* MDC expects this in any case */ if (rc != 0) @@ -3925,6 +4496,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, u64); enum tgt_handler_flags it_handler_flags = 0; struct ldlm_reply *rep; + bool check_mdt_object = false; int rc; ENTRY; @@ -3932,7 +4504,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, case IT_OPEN: case IT_OPEN|IT_CREAT: /* - * OCREAT is not a MUTABOR request since the file may + * OCREAT is not a IS_MUTABLE request since the file may * already exist. We do the extra check of * OBD_CONNECT_RDONLY in mdt_reint_open() when we * really need to create the object. @@ -3941,15 +4513,18 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, it_handler = &mdt_intent_open; break; case IT_GETATTR: + check_mdt_object = true; + /* fallthrough */ case IT_LOOKUP: it_format = &RQF_LDLM_INTENT_GETATTR; it_handler = &mdt_intent_getattr; - it_handler_flags = HABEO_REFERO; + it_handler_flags = HAS_REPLY; break; case IT_GETXATTR: + check_mdt_object = true; it_format = &RQF_LDLM_INTENT_GETXATTR; it_handler = &mdt_intent_getxattr; - it_handler_flags = HABEO_CORPUS; + it_handler_flags = HAS_BODY; break; case IT_LAYOUT: it_format = &RQF_LDLM_INTENT_LAYOUT; @@ -3992,7 +4567,10 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, if (rc < 0) RETURN(rc); - if (it_handler_flags & MUTABOR && mdt_rdonly(req->rq_export)) + if (unlikely(info->mti_object == NULL && check_mdt_object)) + RETURN(-EPROTO); + + if (it_handler_flags & IS_MUTABLE && mdt_rdonly(req->rq_export)) RETURN(-EROFS); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); @@ -4022,9 +4600,12 @@ static void mdt_ptlrpc_stats_update(struct ptlrpc_request *req, LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE)); } -static int mdt_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - enum ldlm_mode mode, __u64 flags, void *data) +static int mdt_intent_policy(const struct lu_env *env, + struct ldlm_namespace *ns, + struct ldlm_lock **lockp, + void *req_cookie, + enum ldlm_mode mode, + __u64 flags, void *data) { struct tgt_session_info *tsi; struct mdt_thread_info *info; @@ -4038,7 +4619,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, LASSERT(req != NULL); - tsi = tgt_ses_info(req->rq_svc_thread->t_env); + tsi = tgt_ses_info(env); info = tsi2mdt_info(tsi); LASSERT(info != NULL); @@ -4197,9 +4778,8 @@ out_free: */ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt) { - struct seq_server_site *ss = mdt_seq_site(mdt); - int rc; - char *prefix; + struct seq_server_site *ss = mdt_seq_site(mdt); + char *prefix; ENTRY; /* check if this is adding the first MDC and controller is not yet @@ -4217,19 +4797,12 @@ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt) /* Note: seq_client_fini will be called in seq_site_fini */ snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt)); - rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA, - prefix, ss->ss_node_id == 0 ? ss->ss_control_seq : + seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA, + prefix, ss->ss_node_id == 0 ? ss->ss_control_seq : NULL); OBD_FREE(prefix, MAX_OBD_NAME + 5); - if (rc != 0) { - OBD_FREE_PTR(ss->ss_client_seq); - ss->ss_client_seq = NULL; - RETURN(rc); - } - rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq); - - RETURN(rc); + RETURN(seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq)); } static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt) @@ -4682,9 +5255,12 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt, mdt->mdt_qmt_dev = obd->obd_lu_dev; /* configure local quota objects */ - rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env, - &mdt->mdt_lu_dev, - mdt->mdt_qmt_dev); + if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_INIT)) + rc = -EBADF; + else + rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env, + &mdt->mdt_lu_dev, + mdt->mdt_qmt_dev); if (rc) GOTO(class_cleanup, rc); @@ -4704,6 +5280,7 @@ class_cleanup: if (rc) { class_manual_cleanup(obd); mdt->mdt_qmt_dev = NULL; + GOTO(lcfg_cleanup, rc); } class_detach: if (rc) @@ -4748,12 +5325,25 @@ static int mdt_tgt_getxattr(struct tgt_session_info *tsi) struct mdt_thread_info *info = tsi2mdt_info(tsi); int rc; + if (unlikely(info->mti_object == NULL)) + return -EPROTO; + rc = mdt_getxattr(info); mdt_thread_info_fini(info); return rc; } +static int mdt_llog_open(struct tgt_session_info *tsi) +{ + ENTRY; + + if (!mdt_is_rootadmin(tsi2mdt_info(tsi))) + RETURN(err_serious(-EACCES)); + + RETURN(tgt_llog_open(tsi)); +} + #define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET #define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET #define OST_BRW_READ OST_READ @@ -4767,47 +5357,48 @@ TGT_RPC_HANDLER(MDS_FIRST_OPC, 0, MDS_DISCONNECT, tgt_disconnect, &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION), TGT_RPC_HANDLER(MDS_FIRST_OPC, - HABEO_REFERO, MDS_SET_INFO, mdt_set_info, - &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION), + HAS_REPLY, MDS_SET_INFO, mdt_set_info, + &RQF_MDT_SET_INFO, LUSTRE_MDS_VERSION), TGT_MDT_HDL(0, MDS_GET_INFO, mdt_get_info), -TGT_MDT_HDL(0 | HABEO_REFERO, MDS_GET_ROOT, mdt_get_root), -TGT_MDT_HDL(HABEO_CORPUS, MDS_GETATTR, mdt_getattr), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, +TGT_MDT_HDL(HAS_REPLY, MDS_GET_ROOT, mdt_get_root), +TGT_MDT_HDL(HAS_BODY, MDS_GETATTR, mdt_getattr), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_GETATTR_NAME, mdt_getattr_name), -TGT_MDT_HDL(HABEO_CORPUS, MDS_GETXATTR, mdt_tgt_getxattr), -TGT_MDT_HDL(0 | HABEO_REFERO, MDS_STATFS, mdt_statfs), -TGT_MDT_HDL(0 | MUTABOR, MDS_REINT, mdt_reint), -TGT_MDT_HDL(HABEO_CORPUS, MDS_CLOSE, mdt_close), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_READPAGE, mdt_readpage), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_SYNC, mdt_sync), +TGT_MDT_HDL(HAS_BODY, MDS_GETXATTR, mdt_tgt_getxattr), +TGT_MDT_HDL(HAS_REPLY, MDS_STATFS, mdt_statfs), +TGT_MDT_HDL(IS_MUTABLE, MDS_REINT, mdt_reint), +TGT_MDT_HDL(HAS_BODY, MDS_CLOSE, mdt_close), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_READPAGE, mdt_readpage), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_SYNC, mdt_sync), TGT_MDT_HDL(0, MDS_QUOTACTL, mdt_quotactl), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_PROGRESS, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_PROGRESS, mdt_hsm_progress), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_REGISTER, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_CT_REGISTER, mdt_hsm_ct_register), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_UNREGISTER, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_CT_UNREGISTER, mdt_hsm_ct_unregister), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_STATE_GET, mdt_hsm_state_get), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_STATE_SET, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_STATE_SET, mdt_hsm_state_set), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_ACTION, mdt_hsm_action), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_REQUEST, mdt_hsm_request), -TGT_MDT_HDL(HABEO_CLAVIS | HABEO_CORPUS | HABEO_REFERO | MUTABOR, +TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_SWAP_LAYOUTS, mdt_swap_layouts), +TGT_MDT_HDL(IS_MUTABLE, MDS_RMFID, mdt_rmfid), }; static struct tgt_handler mdt_io_ops[] = { -TGT_OST_HDL_HP(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read, +TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY, OST_BRW_READ, tgt_brw_read, mdt_hp_brw), -TGT_OST_HDL_HP(HABEO_CORPUS | MUTABOR, OST_BRW_WRITE, tgt_brw_write, +TGT_OST_HDL_HP(HAS_BODY | IS_MUTABLE, OST_BRW_WRITE, tgt_brw_write, mdt_hp_brw), -TGT_OST_HDL_HP(HABEO_CORPUS | HABEO_REFERO | MUTABOR, +TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_PUNCH, mdt_punch_hdl, mdt_hp_punch), -TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_SYNC, mdt_data_sync), +TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC, mdt_data_sync), }; static struct tgt_handler mdt_sec_ctx_ops[] = { @@ -4817,7 +5408,14 @@ TGT_SEC_HDL_VAR(0, SEC_CTX_FINI, mdt_sec_ctx_handle) }; static struct tgt_handler mdt_quota_ops[] = { -TGT_QUOTA_HDL(HABEO_REFERO, QUOTA_DQACQ, mdt_quota_dqacq), +TGT_QUOTA_HDL(HAS_REPLY, QUOTA_DQACQ, mdt_quota_dqacq), +}; + +static struct tgt_handler mdt_llog_handlers[] = { + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE, mdt_llog_open), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, tgt_llog_next_block), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK, tgt_llog_prev_block), }; static struct tgt_opc_slice mdt_common_slice[] = { @@ -4864,7 +5462,7 @@ static struct tgt_opc_slice mdt_common_slice[] = { { .tos_opc_start = LLOG_FIRST_OPC, .tos_opc_end = LLOG_LAST_OPC, - .tos_hs = tgt_llog_handlers + .tos_hs = mdt_llog_handlers }, { .tos_opc_start = LFSCK_FIRST_OPC, @@ -4894,11 +5492,13 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop); mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child)); + + mdt_restriper_stop(m); ping_evictor_stop(); /* Remove the HSM /proc entry so the coordinator cannot be - * restarted by a user while it's shutting down. */ - hsm_cdt_procfs_fini(m); + * restarted by a user while it's shutting down. + */ mdt_hsm_cdt_stop(m); mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT); @@ -4918,7 +5518,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) /* Calling the cleanup functions in the same order as in the mdt_init0 * error path */ - mdt_procfs_fini(m); + mdt_tunables_fini(m); target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); @@ -5003,7 +5603,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd = class_name2obd(dev); LASSERT(obd != NULL); - m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */ + m->mdt_max_mdsize = MAX_MD_SIZE_OLD; m->mdt_opts.mo_evict_tgt_nids = 1; m->mdt_opts.mo_cos = MDT_COS_DEFAULT; @@ -5013,16 +5613,19 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, RETURN(-EFAULT); } else { lsi = s2lsi(lmi->lmi_sb); + LASSERT(lsi->lsi_lmd); /* CMD is supported only in IAM mode */ LASSERT(num); - node_id = simple_strtol(num, NULL, 10); + rc = kstrtol(num, 10, &node_id); + if (rc) + RETURN(rc); + obd->u.obt.obt_magic = OBT_MAGIC; - if (lsi->lsi_lmd != NULL && - lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK) + if (lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK) m->mdt_skip_lfsck = 1; } - /* DoM files get IO lock at open by default */ + /* DoM files get IO lock at open optionally by default */ m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN; /* DoM files are read at open and data is packed in the reply */ m->mdt_opts.mo_dom_read_open = 1; @@ -5030,10 +5633,17 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_squash.rsi_uid = 0; m->mdt_squash.rsi_gid = 0; INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); - init_rwsem(&m->mdt_squash.rsi_sem); + spin_lock_init(&m->mdt_squash.rsi_lock); spin_lock_init(&m->mdt_lock); - m->mdt_enable_remote_dir = 0; + m->mdt_enable_remote_dir = 1; + m->mdt_enable_striped_dir = 1; + m->mdt_enable_dir_migration = 1; + m->mdt_enable_dir_restripe = 0; + m->mdt_enable_dir_auto_split = 0; m->mdt_enable_remote_dir_gid = 0; + m->mdt_enable_chprojid_gid = 0; + m->mdt_enable_remote_rename = 1; + m->mdt_dir_restripe_nsonly = 1; atomic_set(&m->mdt_mds_mds_conns, 0); atomic_set(&m->mdt_async_commit_count, 0); @@ -5106,8 +5716,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_free_ns, rc); /* Amount of available space excluded from granting and reserved - * for metadata. It is in percentage and 50% is default value. */ - tgd->tgd_reserved_pcnt = 50; + * for metadata. It is a percentage of the total MDT size. */ + tgd->tgd_reserved_pcnt = 10; if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits)) m->mdt_brw_size = 1U << tgd->tgd_blockbits; @@ -5165,7 +5775,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_free_hsm, rc); } - rc = mdt_procfs_init(m, dev); + rc = mdt_tunables_init(m, dev); if (rc) { CERROR("Can't init MDT lprocfs, rc %d\n", rc); GOTO(err_recovery, rc); @@ -5191,11 +5801,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT) ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT; + if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_LOCAL_RECOV)) + m->mdt_lut.lut_local_recovery = 1; + + rc = mdt_restriper_start(m); + if (rc) + GOTO(err_ping_evictor, rc); + RETURN(0); + +err_ping_evictor: + ping_evictor_stop(); err_procfs: - mdt_procfs_fini(m); + mdt_tunables_fini(m); err_recovery: - target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; err_free_hsm: @@ -5206,6 +5825,11 @@ err_los_fini: err_fs_cleanup: mdt_fs_cleanup(env, m); err_tgt: + /* keep recoverable clients */ + obd->obd_fail = 1; + target_recovery_fini(obd); + obd_exports_barrier(obd); + obd_zombie_barrier(); tgt_fini(env, &m->mdt_lut); err_free_ns: ldlm_namespace_free(m->mdt_namespace, NULL, 0); @@ -5247,12 +5871,12 @@ static int mdt_process_config(const struct lu_env *env, switch (cfg->lcfg_command) { case LCFG_PARAM: { - struct obd_device *obd = d->ld_obd; - + struct obd_device *obd = d->ld_obd; /* For interoperability */ - struct cfg_interop_param *ptr = NULL; - struct lustre_cfg *old_cfg = NULL; - char *param = NULL; + struct cfg_interop_param *ptr = NULL; + struct lustre_cfg *old_cfg = NULL; + char *param = NULL; + ssize_t count; param = lustre_cfg_string(cfg, 1); if (param == NULL) { @@ -5281,17 +5905,22 @@ static int mdt_process_config(const struct lu_env *env, } } - rc = class_process_proc_param(PARAM_MDT, obd->obd_vars, - cfg, obd); - if (rc > 0 || rc == -ENOSYS) { + count = class_modify_config(cfg, PARAM_MDT, + &obd->obd_kset.kobj); + if (count < 0) { + struct coordinator *cdt = &m->mdt_coordinator; + /* is it an HSM var ? */ - rc = class_process_proc_param(PARAM_HSM, - hsm_cdt_get_proc_vars(), - cfg, obd); - if (rc > 0 || rc == -ENOSYS) + count = class_modify_config(cfg, PARAM_HSM, + &cdt->cdt_hsm_kobj); + if (count < 0) /* we don't understand; pass it on */ rc = next->ld_ops->ldo_process_config(env, next, cfg); + else + rc = count > 0 ? 0 : count; + } else { + rc = count > 0 ? 0 : count; } if (old_cfg) @@ -5332,6 +5961,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, init_rwsem(&mo->mot_dom_sem); init_rwsem(&mo->mot_open_sem); atomic_set(&mo->mot_open_count, 0); + mo->mot_restripe_offset = 0; + INIT_LIST_HEAD(&mo->mot_restripe_linkage); RETURN(o); } RETURN(NULL); @@ -5359,22 +5990,31 @@ static int mdt_object_init(const struct lu_env *env, struct lu_object *o, RETURN(rc); } +static void mdt_object_free_rcu(struct rcu_head *head) +{ + struct mdt_object *mo = container_of(head, struct mdt_object, + mot_header.loh_rcu); + + kmem_cache_free(mdt_object_kmem, mo); +} + static void mdt_object_free(const struct lu_env *env, struct lu_object *o) { - struct mdt_object *mo = mdt_obj(o); - struct lu_object_header *h; - ENTRY; + struct mdt_object *mo = mdt_obj(o); + struct lu_object_header *h; + ENTRY; - h = o->lo_header; - CDEBUG(D_INFO, "object free, fid = "DFID"\n", - PFID(lu_object_fid(o))); + h = o->lo_header; + CDEBUG(D_INFO, "object free, fid = "DFID"\n", + PFID(lu_object_fid(o))); LASSERT(atomic_read(&mo->mot_open_count) == 0); LASSERT(atomic_read(&mo->mot_lease_count) == 0); lu_object_fini(o); lu_object_header_fini(h); - OBD_SLAB_FREE_PTR(mo, mdt_object_kmem); + OBD_FREE_PRE(mo, sizeof(*mo), "slab-freed"); + call_rcu(&mo->mot_header.loh_rcu, mdt_object_free_rcu); EXIT; } @@ -5473,6 +6113,18 @@ static int mdt_obd_set_info_async(const struct lu_env *env, RETURN(0); } +static inline void mdt_enable_slc(struct mdt_device *mdt) +{ + if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER) + mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING; +} + +static inline void mdt_disable_slc(struct mdt_device *mdt) +{ + if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING) + mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER; +} + /** * Match client and server connection feature flags. * @@ -5633,6 +6285,12 @@ static int mdt_connect_internal(const struct lu_env *env, exp->exp_obd->obd_name, obd_export_nid2str(exp)); } + if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) && + !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { + atomic_inc(&mdt->mdt_mds_mds_conns); + mdt_enable_slc(mdt); + } + return 0; } @@ -5663,7 +6321,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env, static int mdt_export_cleanup(struct obd_export *exp) { - struct list_head closing_list; + LIST_HEAD(closing_list); struct mdt_export_data *med = &exp->exp_mdt_data; struct obd_device *obd = exp->exp_obd; struct mdt_device *mdt; @@ -5673,7 +6331,6 @@ static int mdt_export_cleanup(struct obd_export *exp) int rc = 0; ENTRY; - INIT_LIST_HEAD(&closing_list); spin_lock(&med->med_open_lock); while (!list_empty(&med->med_open_head)) { struct list_head *tmp = med->med_open_head.next; @@ -5681,7 +6338,7 @@ static int mdt_export_cleanup(struct obd_export *exp) /* Remove mfd handle so it can't be found again. * We are consuming the mfd_list reference here. */ - class_handle_unhash(&mfd->mfd_handle); + class_handle_unhash(&mfd->mfd_open_handle); list_move_tail(&mfd->mfd_list, &closing_list); } spin_unlock(&med->med_open_lock); @@ -5722,7 +6379,7 @@ static int mdt_export_cleanup(struct obd_export *exp) * archive request into a noop if it's not actually * dirty. */ - if (mfd->mfd_mode & MDS_FMODE_WRITE) + if (mfd->mfd_open_flags & MDS_FMODE_WRITE) rc = mdt_ctxt_add_dirty_flag(&env, info, mfd); /* Don't unlink orphan on failover umount, LU-184 */ @@ -5730,31 +6387,20 @@ static int mdt_export_cleanup(struct obd_export *exp) ma->ma_valid = MA_FLAGS; ma->ma_attr_flags |= MDS_KEEP_ORPHAN; } - mdt_mfd_close(info, mfd); - } - } - info->mti_mdt = NULL; - /* cleanup client slot early */ - /* Do not erase record for recoverable client. */ - if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) + ma->ma_valid |= MA_FORCE_LOG; + mdt_mfd_close(info, mfd); + } + } + info->mti_mdt = NULL; + /* cleanup client slot early */ + /* Do not erase record for recoverable client. */ + if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) tgt_client_del(&env, exp); lu_env_fini(&env); RETURN(rc); } -static inline void mdt_enable_slc(struct mdt_device *mdt) -{ - if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL) - mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL; -} - -static inline void mdt_disable_slc(struct mdt_device *mdt) -{ - if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL) - mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL; -} - static int mdt_obd_disconnect(struct obd_export *exp) { int rc; @@ -5809,12 +6455,6 @@ static int mdt_obd_connect(const struct lu_env *env, mdt = mdt_dev(obd->obd_lu_dev); - if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) && - !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { - atomic_inc(&mdt->mdt_mds_mds_conns); - mdt_enable_slc(mdt); - } - /* * first, check whether the stack is ready to handle requests * XXX: probably not very appropriate method is used now @@ -5911,6 +6551,11 @@ static int mdt_init_export(struct obd_export *exp) exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); + OBD_ALLOC(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + if (exp->exp_used_slots == NULL) + RETURN(-ENOMEM); + /* self-export doesn't need client data and ldlm initialization */ if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, &exp->exp_client_uuid))) @@ -5929,6 +6574,10 @@ static int mdt_init_export(struct obd_export *exp) err_free: tgt_client_free(exp); err: + OBD_FREE(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + exp->exp_used_slots = NULL; + CERROR("%s: Failed to initialize export: rc = %d\n", exp->exp_obd->obd_name, rc); return rc; @@ -5939,6 +6588,10 @@ static int mdt_destroy_export(struct obd_export *exp) ENTRY; target_destroy_export(exp); + if (exp->exp_used_slots) + OBD_FREE(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + /* destroy can be called from failed obd_setup, so * checking uuid is safer than obd_self_export */ if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, @@ -6517,7 +7170,7 @@ static int mdt_obd_postrecov(struct obd_device *obd) return rc; } -static struct obd_ops mdt_obd_device_ops = { +static const struct obd_ops mdt_obd_device_ops = { .o_owner = THIS_MODULE, .o_set_info_async = mdt_obd_set_info_async, .o_connect = mdt_obd_connect, @@ -6666,10 +7319,10 @@ static int __init mdt_init(void) { int rc; - CLASSERT(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") == - FID_NOBRACE_LEN + 1); - CLASSERT(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") == - FID_LEN + 1); + BUILD_BUG_ON(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") != + FID_NOBRACE_LEN + 1); + BUILD_BUG_ON(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") != + FID_LEN + 1); rc = lu_kmem_init(mdt_caches); if (rc) return rc;