X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=inline;f=lustre%2Fmdt%2Fmdt_handler.c;h=4caa22a4b90301da10f2e2694aea3ff95490d2a6;hb=4d7b022e373d265f4f3b9d90af44cddd0e65f9ae;hp=9f8590bbb0078eca5055d56a9ae8cdb830638a54;hpb=20ffe2191c828d3e4922429f2a98584d4e41eb53;p=fs%2Flustre-release.git diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 9f8590b..4caa22a 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -23,7 +23,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2010, 2016, Intel Corporation. + * Copyright (c) 2010, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -61,12 +61,11 @@ #include #include #include - +#include #include #include "mdt_internal.h" - static unsigned int max_mod_rpcs_per_client = 8; module_param(max_mod_rpcs_per_client, uint, 0644); MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client"); @@ -94,7 +93,6 @@ enum ldlm_mode mdt_dlm_lock_modes[] = { }; static struct mdt_device *mdt_dev(struct lu_device *d); -static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags); static const struct lu_object_operations mdt_obj_ops; @@ -270,18 +268,13 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, { struct mdt_device *mdt = info->mti_mdt; struct lu_name *lname = &info->mti_name; - char *name = NULL; + char *filename = info->mti_filename; struct mdt_object *parent; u32 mode; int rc = 0; LASSERT(!info->mti_cross_ref); - OBD_ALLOC(name, NAME_MAX + 1); - if (name == NULL) - return -ENOMEM; - lname->ln_name = name; - /* * We may want to allow this to mount a completely separate * fileset from the MDT in the future, but keeping it to @@ -317,8 +310,9 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, break; } - strncpy(name, s1, lname->ln_namelen); - name[lname->ln_namelen] = '\0'; + strncpy(filename, s1, lname->ln_namelen); + filename[lname->ln_namelen] = '\0'; + lname->ln_name = filename; parent = mdt_object_find(info->mti_env, mdt, fid); if (IS_ERR(parent)) { @@ -343,8 +337,6 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, } } - OBD_FREE(name, NAME_MAX + 1); - return rc; } @@ -413,12 +405,16 @@ out: static int mdt_statfs(struct tgt_session_info *tsi) { - struct ptlrpc_request *req = tgt_ses_req(tsi); - struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct md_device *next = info->mti_mdt->mdt_child; - struct ptlrpc_service_part *svcpt; - struct obd_statfs *osfs; - int rc; + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct mdt_thread_info *info = tsi2mdt_info(tsi); + struct mdt_device *mdt = info->mti_mdt; + struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; + struct md_device *next = mdt->mdt_child; + struct ptlrpc_service_part *svcpt; + struct obd_statfs *osfs; + struct mdt_body *reqbody = NULL; + struct mdt_statfs_cache *msf; + int rc; ENTRY; @@ -440,24 +436,72 @@ static int mdt_statfs(struct tgt_session_info *tsi) if (!osfs) GOTO(out, rc = -EPROTO); - /** statfs information are cached in the mdt_device */ - if (cfs_time_before_64(info->mti_mdt->mdt_osfs_age, - cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS))) { - /** statfs data is too old, get up-to-date one */ - rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs); - if (rc) - GOTO(out, rc); - spin_lock(&info->mti_mdt->mdt_lock); - info->mti_mdt->mdt_osfs = *osfs; - info->mti_mdt->mdt_osfs_age = cfs_time_current_64(); - spin_unlock(&info->mti_mdt->mdt_lock); - } else { - /** use cached statfs data */ - spin_lock(&info->mti_mdt->mdt_lock); - *osfs = info->mti_mdt->mdt_osfs; - spin_unlock(&info->mti_mdt->mdt_lock); + if (mdt_is_sum_statfs_client(req->rq_export) && + lustre_packed_msg_size(req->rq_reqmsg) == + req_capsule_fmt_size(req->rq_reqmsg->lm_magic, + &RQF_MDS_STATFS_NEW, RCL_CLIENT)) { + req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW); + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); } + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + msf = &mdt->mdt_sum_osfs; + else + msf = &mdt->mdt_osfs; + + if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) { + /** statfs data is too old, get up-to-date one */ + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + rc = next->md_ops->mdo_statfs(info->mti_env, + next, osfs); + else + rc = dt_statfs(info->mti_env, mdt->mdt_bottom, + osfs); + if (rc) + GOTO(out, rc); + spin_lock(&mdt->mdt_lock); + msf->msf_osfs = *osfs; + msf->msf_age = ktime_get_seconds(); + spin_unlock(&mdt->mdt_lock); + } else { + /** use cached statfs data */ + spin_lock(&mdt->mdt_lock); + *osfs = msf->msf_osfs; + spin_unlock(&mdt->mdt_lock); + } + + /* at least try to account for cached pages. its still racy and + * might be under-reporting if clients haven't announced their + * caches with brw recently */ + CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu" + " pending %llu free %llu avail %llu\n", + tgd->tgd_tot_dirty, tgd->tgd_tot_granted, + tgd->tgd_tot_pending, + osfs->os_bfree << tgd->tgd_blockbits, + osfs->os_bavail << tgd->tgd_blockbits); + + osfs->os_bavail -= min_t(u64, osfs->os_bavail, + ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending + + osfs->os_bsize - 1) >> tgd->tgd_blockbits)); + + tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__); + CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; " + "%llu objects: %llu free; state %x\n", + osfs->os_blocks, osfs->os_bfree, osfs->os_bavail, + osfs->os_files, osfs->os_ffree, osfs->os_state); + + if (!exp_grant_param_supp(tsi->tsi_exp) && + tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) { + /* clients which don't support OBD_CONNECT_GRANT_PARAM + * should not see a block size > page size, otherwise + * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12) + * block size which is the biggest block size known to work + * with all client's page size. */ + osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; + osfs->os_bfree <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; + osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; + osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT; + } if (rc == 0) mdt_counter_incr(req, LPROC_MDT_STATFS); out: @@ -465,6 +509,52 @@ out: RETURN(rc); } +/** + * Pack size attributes into the reply. + */ +int mdt_pack_size2body(struct mdt_thread_info *info, + const struct lu_fid *fid, struct lustre_handle *lh) +{ + struct mdt_body *b; + struct md_attr *ma = &info->mti_attr; + int dom_stripe; + bool dom_lock = false; + + ENTRY; + + LASSERT(ma->ma_attr.la_valid & LA_MODE); + + if (!S_ISREG(ma->ma_attr.la_mode) || + !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)) + RETURN(-ENODATA); + + dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm); + /* no DoM stripe, no size in reply */ + if (dom_stripe == LMM_NO_DOM) + RETURN(-ENOENT); + + if (lustre_handle_is_used(lh)) { + struct ldlm_lock *lock; + + lock = ldlm_handle2lock(lh); + if (lock != NULL) { + dom_lock = ldlm_has_dom(lock); + LDLM_LOCK_PUT(lock); + } + } + + /* no DoM lock, no size in reply */ + if (!dom_lock) + RETURN(0); + + /* Either DoM lock exists or LMM has only DoM stripe then + * return size on body. */ + b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + + mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock); + RETURN(0); +} + #ifdef CONFIG_FS_POSIX_ACL /* * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap. @@ -483,13 +573,13 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody, struct md_object *next = mdt_object_child(o); struct lu_buf *buf = &info->mti_buf; struct mdt_device *mdt = info->mti_mdt; + struct req_capsule *pill = info->mti_pill; int rc; ENTRY; - buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL); - buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL, - RCL_SERVER); + buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL); + buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER); if (buf->lb_len == 0) RETURN(0); @@ -537,6 +627,36 @@ again: mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc); } } else { + int client; + int server; + int acl_buflen; + int lmm_buflen = 0; + int lmmsize = 0; + + acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER); + if (acl_buflen >= rc) + goto map; + + /* If LOV/LMA EA is small, we can reuse part of their buffer */ + client = ptlrpc_req_get_repsize(pill->rc_req); + server = lustre_packed_msg_size(pill->rc_req->rq_repmsg); + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) { + lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD, + RCL_SERVER); + lmmsize = repbody->mbo_eadatasize; + } + + if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) { + CDEBUG(D_INODE, "%s: client prepared buffer size %d " + "is not big enough with the ACL size %d (%d)\n", + mdt_obd_name(mdt), client, rc, + server - acl_buflen - lmm_buflen + rc + lmmsize); + repbody->mbo_aclsize = 0; + repbody->mbo_valid &= ~OBD_MD_FLACL; + RETURN(-ERANGE); + } + +map: if (buf->lb_buf == info->mti_big_acl) info->mti_big_acl_used = 1; @@ -547,6 +667,8 @@ again: CERROR("%s: nodemap_map_acl unable to parse "DFID " ACL: rc = %d\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc); + repbody->mbo_aclsize = 0; + repbody->mbo_valid &= ~OBD_MD_FLACL; } else { repbody->mbo_aclsize = rc; repbody->mbo_valid |= OBD_MD_FLACL; @@ -665,17 +787,20 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, /* if no object is allocated on osts, the size on mds is valid. * b=22272 */ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL && - mdt_hsm_is_released(ma->ma_lmm)) { - /* A released file stores its size on MDS. */ - /* But return 1 block for released file, unless tools like tar - * will consider it fully sparse. (LU-3864) - */ - if (unlikely(b->mbo_size == 0)) - b->mbo_blocks = 0; - else - b->mbo_blocks = 1; - b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) { + if (mdt_hsm_is_released(ma->ma_lmm)) { + /* A released file stores its size on MDS. */ + /* But return 1 block for released file, unless tools + * like tar will consider it fully sparse. (LU-3864) + */ + if (unlikely(b->mbo_size == 0)) + b->mbo_blocks = 0; + else + b->mbo_blocks = 1; + b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } else if (info->mti_som_valid) { /* som is valid */ + b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } } if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE)) @@ -814,6 +939,8 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, return -EINVAL; } + LASSERT(buf->lb_buf); + rc = mo_xattr_get(info->mti_env, next, buf, name); if (rc > 0) { @@ -866,8 +993,8 @@ got: return rc; } -static int mdt_attr_get_pfid(struct mdt_thread_info *info, - struct mdt_object *o, struct lu_fid *pfid) +int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, + struct lu_fid *pfid) { struct lu_buf *buf = &info->mti_buf; struct link_ea_header *leh; @@ -935,6 +1062,9 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, rc = mo_attr_get(env, next, ma); if (rc) GOTO(out, rc); + + if (S_ISREG(mode)) + (void) mdt_get_som(info, o, ma); ma->ma_valid |= MA_INODE; } @@ -964,6 +1094,15 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc); } + /* + * In the handle of MA_INODE, we may already get the SOM attr. + */ + if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) { + rc = mdt_get_som(info, o, ma); + if (rc != 0) + GOTO(out, rc); + } + if (need & MA_HSM && S_ISREG(mode)) { buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(info->mti_xattr_buf); @@ -1210,6 +1349,12 @@ static int mdt_getattr(struct tgt_session_info *tsi) LASSERT(obj != NULL); LASSERT(lu_object_assert_exists(&obj->mot_obj)); + /* Special case for Data-on-MDT files to get data version */ + if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) { + rc = mdt_data_version_get(tsi); + GOTO(out, rc); + } + /* Unlike intent case where we need to pre-fill out buffers early on * in intent policy for ldlm reasons, here we can have a much better * guess at EA size by just reading it from disk. @@ -1219,7 +1364,6 @@ static int mdt_getattr(struct tgt_session_info *tsi) /* No easy way to know how long is the symlink, but it cannot * be more than PATH_MAX, so we allocate +1 */ rc = PATH_MAX + 1; - /* A special case for fs ROOT: getattr there might fetch * default EA for entire fs, not just for this dir! */ @@ -1263,12 +1407,12 @@ static int mdt_getattr(struct tgt_session_info *tsi) info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); rc = mdt_getattr_internal(info, obj, 0); - EXIT; + EXIT; out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; + mdt_client_compatibility(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; out: mdt_thread_info_fini(info); return rc; @@ -1279,31 +1423,20 @@ out: * * \param[in] info thread environment * \param[in] obj object - * \param[in] layout layout intent - * \param[in] buf buffer containing client's lovea, could be empty + * \param[in] layout layout change descriptor * * \retval 0 on success * \retval < 0 error code */ -static int mdt_layout_change(struct mdt_thread_info *info, - struct mdt_object *obj, - struct layout_intent *layout, - const struct lu_buf *buf) +int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, + struct md_layout_change *layout) { struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL]; int rc; ENTRY; - CDEBUG(D_INFO, "got layout change request from client: " - "opc:%u flags:%#x extent[%#llx,%#llx)\n", - layout->li_opc, layout->li_flags, - layout->li_start, layout->li_end); - if (layout->li_start >= layout->li_end) { - CERROR("Recieved an invalid layout change range [%llu, %llu) " - "for "DFID"\n", layout->li_start, layout->li_end, - PFID(mdt_object_fid(obj))); - RETURN(-EINVAL); - } + if (!mdt_object_exists(obj)) + GOTO(out, rc = -ENOENT); if (!S_ISREG(lu_object_attr(&obj->mot_obj))) GOTO(out, rc = -EINVAL); @@ -1315,14 +1448,13 @@ static int mdt_layout_change(struct mdt_thread_info *info, /* take layout lock to prepare layout change */ mdt_lock_reg_init(lh, LCK_EX); - rc = mdt_object_lock(info, obj, lh, - MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR); + rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT); if (rc) GOTO(out, rc); - rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout, - buf); - + mutex_lock(&obj->mot_som_mutex); + rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout); + mutex_unlock(&obj->mot_som_mutex); mdt_object_unlock(info, obj, lh, 1); out: RETURN(rc); @@ -1395,12 +1527,12 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) /* permission check. Make sure the calling process having permission * to write both files. */ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL, - MAY_WRITE); + MAY_WRITE); if (rc < 0) GOTO(put, rc); rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL, - MAY_WRITE); + MAY_WRITE); if (rc < 0) GOTO(put, rc); @@ -1676,19 +1808,23 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (ma->ma_valid & MA_INODE && ma->ma_attr.la_valid & LA_CTIME && info->mti_mdt->mdt_namespace->ns_ctime_age_limit + - ma->ma_attr.la_ctime < cfs_time_current_sec()) + ma->ma_attr.la_ctime < ktime_get_real_seconds()) child_bits |= MDS_INODELOCK_UPDATE; } /* layout lock must be granted in a best-effort way * for IT operations */ LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT)); - if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) && - exp_connect_layout(info->mti_exp) && - S_ISREG(lu_object_attr(&child->mot_obj)) && + if (S_ISREG(lu_object_attr(&child->mot_obj)) && !mdt_object_remote(child) && ldlm_rep != NULL) { - /* try to grant layout lock for regular file. */ - try_bits = MDS_INODELOCK_LAYOUT; + if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) && + exp_connect_layout(info->mti_exp)) { + /* try to grant layout lock for regular file. */ + try_bits = MDS_INODELOCK_LAYOUT; + } + /* Acquire DOM lock in advance for data-on-mdt file */ + if (child != parent) + try_bits |= MDS_INODELOCK_DOM; } if (try_bits != 0) { @@ -1723,17 +1859,38 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, "Lock res_id: "DLDLMRES", fid: "DFID"\n", PLDLMRES(lock->l_resource), PFID(mdt_object_fid(child))); - } - if (lock) - LDLM_LOCK_PUT(lock); - EXIT; + if (S_ISREG(lu_object_attr(&child->mot_obj)) && + mdt_object_exists(child) && !mdt_object_remote(child) && + child != parent) { + LDLM_LOCK_PUT(lock); + mdt_object_put(info->mti_env, child); + /* NB: call the mdt_pack_size2body always after + * mdt_object_put(), that is why this special + * exit path is used. */ + rc = mdt_pack_size2body(info, child_fid, + &lhc->mlh_reg_lh); + if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { + /* DOM lock was taken in advance but this is + * not DoM file. Drop the lock. */ + lock_res_and_lock(lock); + ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM); + unlock_res_and_lock(lock); + } + + GOTO(out_parent, rc = 0); + } + } + if (lock) + LDLM_LOCK_PUT(lock); + + EXIT; out_child: - mdt_object_put(info->mti_env, child); + mdt_object_put(info->mti_env, child); out_parent: - if (lhp) - mdt_object_unlock(info, parent, lhp, 1); - return rc; + if (lhp) + mdt_object_unlock(info, parent, lhp, 1); + return rc; } /* normal handler: should release the child lock */ @@ -2002,11 +2159,24 @@ static int mdt_reint_internal(struct mdt_thread_info *info, out_ucred: mdt_exit_ucred(info); out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; - return rc; + mdt_client_compatibility(info); + + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; + + /* + * Data-on-MDT optimization - read data along with OPEN and return it + * in reply. Do that only if we have both DOM and LAYOUT locks. + */ + if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) && + info->mti_attr.ma_lmm != NULL && + mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) { + rc = mdt_dom_read_on_open(info, info->mti_mdt, + &lhc->mlh_reg_lh); + } + + return rc; } static long mdt_reint_opcode(struct ptlrpc_request *req, @@ -2048,7 +2218,8 @@ static int mdt_reint(struct tgt_session_info *tsi) [REINT_OPEN] = &RQF_MDS_REINT_OPEN, [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR, [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK, - [REINT_MIGRATE] = &RQF_MDS_REINT_RENAME + [REINT_MIGRATE] = &RQF_MDS_REINT_MIGRATE, + [REINT_RESYNC] = &RQF_MDS_REINT_RESYNC, }; ENTRY; @@ -2071,7 +2242,7 @@ static int mdt_reint(struct tgt_session_info *tsi) } /* this should sync the whole device */ -static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) +int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) { struct dt_device *dt = mdt->mdt_bottom; int rc; @@ -2082,20 +2253,21 @@ static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) } /* this should sync this object */ -static int mdt_object_sync(struct mdt_thread_info *info) +static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp, + struct mdt_object *mo) { - struct md_object *next; int rc; + ENTRY; - if (!mdt_object_exists(info->mti_object)) { + if (!mdt_object_exists(mo)) { CWARN("%s: non existing object "DFID": rc = %d\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(info->mti_object)), -ESTALE); + exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)), + -ESTALE); RETURN(-ESTALE); } - next = mdt_object_child(info->mti_object); - rc = mo_object_sync(info->mti_env, next); + + rc = mo_object_sync(env, mdt_object_child(mo)); RETURN(rc); } @@ -2118,7 +2290,8 @@ static int mdt_sync(struct tgt_session_info *tsi) struct mdt_thread_info *info = tsi2mdt_info(tsi); /* sync an object */ - rc = mdt_object_sync(info); + rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, + info->mti_object); if (rc == 0) { const struct lu_fid *fid; struct lu_attr *la = &info->mti_attr.ma_attr; @@ -2142,6 +2315,54 @@ static int mdt_sync(struct tgt_session_info *tsi) RETURN(rc); } +static int mdt_data_sync(struct tgt_session_info *tsi) +{ + struct mdt_thread_info *info; + struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp); + struct ost_body *body = tsi->tsi_ost_body; + struct ost_body *repbody; + struct mdt_object *mo = NULL; + struct md_attr *ma; + int rc = 0; + + ENTRY; + + repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY); + + /* if no fid is specified then do nothing, + * device sync is done via MDS_SYNC */ + if (fid_is_zero(&tsi->tsi_fid)) + RETURN(0); + + mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid); + if (IS_ERR(mo)) + RETURN(PTR_ERR(mo)); + + rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo); + if (rc) + GOTO(put, rc); + + repbody->oa.o_oi = body->oa.o_oi; + repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + + info = tsi2mdt_info(tsi); + ma = &info->mti_attr; + ma->ma_need = MA_INODE; + ma->ma_valid = 0; + rc = mdt_attr_get_complex(info, mo, ma); + if (rc == 0) + obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS); + else + rc = 0; + mdt_thread_info_fini(info); + + EXIT; +put: + if (mo != NULL) + mdt_object_put(tsi->tsi_env, mo); + return rc; +} + /* * Handle quota control requests to consult current usage/limit, but also * to configure quota enforcement @@ -2173,10 +2394,12 @@ static int mdt_quotactl(struct tgt_session_info *tsi) /* master quotactl */ case Q_SETINFO: case Q_SETQUOTA: + case LUSTRE_Q_SETDEFAULT: if (!nodemap_can_setquota(nodemap)) GOTO(out_nodemap, rc = -EPERM); case Q_GETINFO: case Q_GETQUOTA: + case LUSTRE_Q_GETDEFAULT: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ @@ -2226,6 +2449,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_SETINFO: case Q_SETQUOTA: case Q_GETQUOTA: + case LUSTRE_Q_SETDEFAULT: + case LUSTRE_Q_GETDEFAULT: /* forward quotactl request to QMT */ rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl); break; @@ -2433,6 +2658,7 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, { struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd; struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + struct ldlm_cb_set_arg *arg = data; bool commit_async = false; int rc; ENTRY; @@ -2445,17 +2671,22 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, unlock_res_and_lock(lock); RETURN(0); } - /* There is no lock conflict if l_blocking_lock == NULL, - * it indicates a blocking ast sent from ldlm_lock_decref_internal - * when the last reference to a local lock was released */ - if (lock->l_req_mode & (LCK_PW | LCK_EX) && - lock->l_blocking_lock != NULL) { + + /* A blocking ast may be sent from ldlm_lock_decref_internal + * when the last reference to a local lock was released and + * during blocking event from ldlm_work_bl_ast_lock(). + * The 'data' parameter is l_ast_data in the first case and + * callback arguments in the second one. Distinguish them by that. + */ + if (!data || data == lock->l_ast_data || !arg->bl_desc) + goto skip_cos_checks; + + if (lock->l_req_mode & (LCK_PW | LCK_EX)) { if (mdt_cos_is_enabled(mdt)) { - if (lock->l_client_cookie != - lock->l_blocking_lock->l_client_cookie) + if (!arg->bl_desc->bl_same_client) mdt_set_lock_sync(lock); } else if (mdt_slc_is_enabled(mdt) && - ldlm_is_cos_incompat(lock->l_blocking_lock)) { + arg->bl_desc->bl_cos_incompat) { mdt_set_lock_sync(lock); /* * we may do extra commit here, but there is a small @@ -2469,11 +2700,11 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, */ commit_async = true; } - } else if (lock->l_req_mode == LCK_COS && - lock->l_blocking_lock != NULL) { + } else if (lock->l_req_mode == LCK_COS) { commit_async = true; } +skip_cos_checks: rc = ldlm_blocking_ast_nocheck(lock); if (commit_async) { @@ -2604,7 +2835,7 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, struct lustre_handle *lh, enum ldlm_mode mode, __u64 *ibits, __u64 trybits, bool cache) { - struct ldlm_enqueue_info *einfo = &mti->mti_einfo; + struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo; union ldlm_policy_data *policy = &mti->mti_policy; struct ldlm_res_id *res_id = &mti->mti_res_id; int rc = 0; @@ -2631,17 +2862,14 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, einfo->ei_cbdata = o; } - memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = *ibits; policy->l_inodebits.try_bits = trybits; rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, policy); - if (rc < 0 && cache) { + if (rc < 0 && cache) mdt_object_put(mti->mti_env, o); - einfo->ei_cbdata = NULL; - } /* Return successfully acquired bits to a caller */ if (rc == 0) { @@ -2670,7 +2898,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags = 0; + __u64 dlmflags = 0, *cookie = NULL; int rc; ENTRY; @@ -2688,9 +2916,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, } /* Only enqueue LOOKUP lock for remote object */ - if (mdt_object_remote(o)) { - LASSERT(*ibits == MDS_INODELOCK_LOOKUP); - } + LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP)); if (lh->mlh_type == MDT_PDO_LOCK) { /* check for exists after object is locked */ @@ -2704,10 +2930,12 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, } } - fid_build_reg_res_name(mdt_object_fid(o), res_id); dlmflags |= LDLM_FL_ATOMIC_CB; + if (info->mti_exp) + cookie = &info->mti_exp->exp_handle.h_cookie; + /* * Take PDO lock on whole directory and build correct @res_id for lock * on part of directory. @@ -2721,12 +2949,16 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * is never going to be sent to client and we do not * want it slowed down due to possible cancels. */ - policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; - policy->l_inodebits.try_bits = 0; - rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + policy->l_inodebits.bits = + *ibits & MDS_INODELOCK_UPDATE; + policy->l_inodebits.try_bits = + trybits & MDS_INODELOCK_UPDATE; + /* at least one of them should be set */ + LASSERT(policy->l_inodebits.bits | + policy->l_inodebits.try_bits); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh, + lh->mlh_pdo_mode, policy, res_id, + dlmflags, cookie); if (unlikely(rc != 0)) GOTO(out_unlock, rc); } @@ -2746,10 +2978,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, + policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags, + cookie); out_unlock: if (rc != 0) mdt_object_unlock(info, o, lh, 1); @@ -2826,6 +3057,10 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, } } + /* other components like LFSCK can use lockless access + * and populate cache, so we better invalidate it */ + mo_invalidate(info->mti_env, mdt_object_child(o)); + RETURN(0); } @@ -2848,8 +3083,17 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 *ibits, __u64 trybits, bool cos_incompat) { - return mdt_object_lock_internal(info, o, lh, ibits, trybits, - cos_incompat); + bool trylock_only = *ibits == 0; + int rc; + + LASSERT(!(*ibits & trybits)); + rc = mdt_object_lock_internal(info, o, lh, ibits, trybits, + cos_incompat); + if (rc && trylock_only) { /* clear error for try ibits lock only */ + LASSERT(*ibits == 0); + rc = 0; + } + return rc; } /** @@ -2865,8 +3109,8 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, * \param mode lock mode * \param decref force immediate lock releasing */ -static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, - enum ldlm_mode mode, int decref) +void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, + enum ldlm_mode mode, int decref) { ENTRY; @@ -2891,7 +3135,7 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, "state %p transno %lld\n", req, req->rq_reply_state, req->rq_transno); if (cos) { - ldlm_lock_downgrade(lock, LCK_COS); + ldlm_lock_mode_downgrade(lock, LCK_COS); mode = LCK_COS; } if (req->rq_export->exp_disconnected) @@ -3027,7 +3271,8 @@ void mdt_object_unlock_put(struct mdt_thread_info * info, * actually exists on storage (lu_object_exists()). * */ -static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) +static int mdt_body_unpack(struct mdt_thread_info *info, + enum tgt_handler_flags flags) { const struct mdt_body *body; struct mdt_object *obj; @@ -3066,7 +3311,8 @@ static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) RETURN(rc); } -static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags) +static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, + enum tgt_handler_flags flags) { struct req_capsule *pill = info->mti_pill; int rc; @@ -3150,6 +3396,7 @@ void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_opdata = 0; info->mti_big_lmm_used = 0; info->mti_big_acl_used = 0; + info->mti_som_valid = 0; info->mti_spec.no_create = 0; info->mti_spec.sp_rm_entry = 0; @@ -3209,118 +3456,25 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi) return tgt_connect(tsi); } -enum mdt_it_code { - MDT_IT_OPEN, - MDT_IT_OCREAT, - MDT_IT_CREATE, - MDT_IT_GETATTR, - MDT_IT_READDIR, - MDT_IT_LOOKUP, - MDT_IT_UNLINK, - MDT_IT_TRUNC, - MDT_IT_GETXATTR, - MDT_IT_LAYOUT, - MDT_IT_QUOTA, - MDT_IT_NR -}; - -static int mdt_intent_getattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, - __u64); - -static int mdt_intent_getxattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags); - -static int mdt_intent_layout(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, - __u64); -static int mdt_intent_reint(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, - __u64); - -static struct mdt_it_flavor { - const struct req_format *it_fmt; - __u32 it_flags; - int (*it_act)(enum mdt_it_code , - struct mdt_thread_info *, - struct ldlm_lock **, - __u64); - long it_reint; -} mdt_it_flavor[] = { - [MDT_IT_OPEN] = { - .it_fmt = &RQF_LDLM_INTENT, - /*.it_flags = HABEO_REFERO,*/ - .it_flags = 0, - .it_act = mdt_intent_reint, - .it_reint = REINT_OPEN - }, - [MDT_IT_OCREAT] = { - .it_fmt = &RQF_LDLM_INTENT, - /* - * OCREAT is not a MUTABOR request as if the file - * already exists. - * We do the extra check of OBD_CONNECT_RDONLY in - * mdt_reint_open() when we really need to create - * the object. - */ - .it_flags = 0, - .it_act = mdt_intent_reint, - .it_reint = REINT_OPEN - }, - [MDT_IT_CREATE] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_flags = MUTABOR, - .it_act = mdt_intent_reint, - .it_reint = REINT_CREATE - }, - [MDT_IT_GETATTR] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_flags = HABEO_REFERO, - .it_act = mdt_intent_getattr - }, - [MDT_IT_READDIR] = { - .it_fmt = NULL, - .it_flags = 0, - .it_act = NULL - }, - [MDT_IT_LOOKUP] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_flags = HABEO_REFERO, - .it_act = mdt_intent_getattr - }, - [MDT_IT_UNLINK] = { - .it_fmt = &RQF_LDLM_INTENT_UNLINK, - .it_flags = MUTABOR, - .it_act = NULL, - .it_reint = REINT_UNLINK - }, - [MDT_IT_TRUNC] = { - .it_fmt = NULL, - .it_flags = MUTABOR, - .it_act = NULL - }, - [MDT_IT_GETXATTR] = { - .it_fmt = &RQF_LDLM_INTENT_GETXATTR, - .it_flags = HABEO_CORPUS, - .it_act = mdt_intent_getxattr - }, - [MDT_IT_LAYOUT] = { - .it_fmt = &RQF_LDLM_INTENT_LAYOUT, - .it_flags = 0, - .it_act = mdt_intent_layout - } -}; +static int mdt_intent_glimpse(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, __u64 flags) +{ + return mdt_glimpse_enqueue(info, info->mti_mdt->mdt_namespace, + lockp, flags); +} +static int mdt_intent_brw(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, __u64 flags) +{ + return mdt_brw_enqueue(info, info->mti_mdt->mdt_namespace, + lockp, flags); +} -static int -mdt_intent_lock_replace(struct mdt_thread_info *info, - struct ldlm_lock **lockp, - struct mdt_lock_handle *lh, - __u64 flags, int result) +int mdt_intent_lock_replace(struct mdt_thread_info *info, + struct ldlm_lock **lockp, + struct mdt_lock_handle *lh, + __u64 flags, int result) { struct ptlrpc_request *req = mdt_info_req(info); struct ldlm_lock *lock = *lockp; @@ -3396,6 +3550,8 @@ mdt_intent_lock_replace(struct mdt_thread_info *info, new_lock->l_export = class_export_lock_get(req->rq_export, new_lock); new_lock->l_blocking_ast = lock->l_blocking_ast; new_lock->l_completion_ast = lock->l_completion_ast; + if (ldlm_has_dom(new_lock)) + new_lock->l_glimpse_ast = ldlm_server_glimpse_ast; new_lock->l_remote_handle = lock->l_remote_handle; new_lock->l_flags &= ~LDLM_FL_LOCAL; @@ -3411,10 +3567,9 @@ mdt_intent_lock_replace(struct mdt_thread_info *info, RETURN(ELDLM_LOCK_REPLACED); } -static void mdt_intent_fixup_resent(struct mdt_thread_info *info, - struct ldlm_lock *new_lock, - struct mdt_lock_handle *lh, - __u64 flags) +void mdt_intent_fixup_resent(struct mdt_thread_info *info, + struct ldlm_lock *new_lock, + struct mdt_lock_handle *lh, __u64 flags) { struct ptlrpc_request *req = mdt_info_req(info); struct ldlm_request *dlmreq; @@ -3455,10 +3610,10 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info, dlmreq->lock_handle[0].cookie); } -static int mdt_intent_getxattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags) +static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *ldlm_rep = NULL; @@ -3505,7 +3660,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode, RETURN(rc); } -static int mdt_intent_getattr(enum mdt_it_code opcode, +static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) @@ -3528,18 +3683,19 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - switch (opcode) { - case MDT_IT_LOOKUP: + switch (it_opc) { + case IT_LOOKUP: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; - break; - case MDT_IT_GETATTR: + break; + case IT_GETATTR: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM; - break; - default: - CERROR("Unsupported intent (%d)\n", opcode); - GOTO(out_shrink, rc = -EINVAL); - } + break; + default: + CERROR("%s: unsupported intent %#x\n", + mdt_obd_name(info->mti_mdt), (unsigned int)it_opc); + GOTO(out_shrink, rc = -EINVAL); + } rc = mdt_init_ucred_intent_getattr(info, reqbody); if (rc) @@ -3574,34 +3730,36 @@ out_shrink: return rc; } -static int mdt_intent_layout(enum mdt_it_code opcode, +static int mdt_intent_layout(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT]; - struct layout_intent *layout; - struct lu_fid *fid; + struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP }; + struct layout_intent *intent; + struct lu_fid *fid = &info->mti_tmp_fid2; struct mdt_object *obj = NULL; - bool layout_change = false; int layout_size = 0; int rc = 0; ENTRY; - if (opcode != MDT_IT_LAYOUT) { - CERROR("%s: Unknown intent (%d)\n", mdt_obd_name(info->mti_mdt), - opcode); - RETURN(-EINVAL); - } + fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); - layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT); - if (layout == NULL) + intent = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT); + if (intent == NULL) RETURN(-EPROTO); - switch (layout->li_opc) { + CDEBUG(D_INFO, DFID "got layout change request from client: " + "opc:%u flags:%#x extent "DEXT"\n", + PFID(fid), intent->li_opc, intent->li_flags, + PEXT(&intent->li_extent)); + + switch (intent->li_opc) { case LAYOUT_INTENT_TRUNC: case LAYOUT_INTENT_WRITE: - layout_change = true; + layout.mlc_opc = MD_LAYOUT_WRITE; + layout.mlc_intent = intent; break; case LAYOUT_INTENT_ACCESS: break; @@ -3610,21 +3768,18 @@ static int mdt_intent_layout(enum mdt_it_code opcode, case LAYOUT_INTENT_RELEASE: case LAYOUT_INTENT_RESTORE: CERROR("%s: Unsupported layout intent opc %d\n", - mdt_obd_name(info->mti_mdt), layout->li_opc); + mdt_obd_name(info->mti_mdt), intent->li_opc); rc = -ENOTSUPP; break; default: CERROR("%s: Unknown layout intent opc %d\n", - mdt_obd_name(info->mti_mdt), layout->li_opc); + mdt_obd_name(info->mti_mdt), intent->li_opc); rc = -EINVAL; break; } if (rc < 0) RETURN(rc); - fid = &info->mti_tmp_fid2; - fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); - /* Get lock from request for possible resent case. */ mdt_intent_fixup_resent(info, *lockp, lhc, flags); @@ -3632,13 +3787,24 @@ static int mdt_intent_layout(enum mdt_it_code opcode, if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); + if (mdt_object_exists(obj) && !mdt_object_remote(obj)) { - layout_size = mdt_attr_get_eabuf_size(info, obj); - if (layout_size < 0) - GOTO(out_obj, rc = layout_size); + /* if layout is going to be changed don't use the current EA + * size but the maximum one. That buffer will be shrinked + * to the actual size in req_capsule_shrink() before reply. + */ + if (layout.mlc_opc == MD_LAYOUT_WRITE) { + layout_size = info->mti_mdt->mdt_max_mdsize; + } else { + layout_size = mdt_attr_get_eabuf_size(info, obj); + if (layout_size < 0) + GOTO(out_obj, rc = layout_size); - if (layout_size > info->mti_mdt->mdt_max_mdsize) - info->mti_mdt->mdt_max_mdsize = layout_size; + if (layout_size > info->mti_mdt->mdt_max_mdsize) + info->mti_mdt->mdt_max_mdsize = layout_size; + } + CDEBUG(D_INFO, "%s: layout_size %d\n", + mdt_obd_name(info->mti_mdt), layout_size); } /* @@ -3653,8 +3819,8 @@ static int mdt_intent_layout(enum mdt_it_code opcode, GOTO(out_obj, rc); - if (layout_change) { - struct lu_buf *buf = &info->mti_buf; + if (layout.mlc_opc != MD_LAYOUT_NOP) { + struct lu_buf *buf = &layout.mlc_buf; /** * mdt_layout_change is a reint operation, when the request @@ -3692,13 +3858,12 @@ static int mdt_intent_layout(enum mdt_it_code opcode, if (buf->lb_len > 0) mdt_fix_lov_magic(info, buf->lb_buf); } - /* * Instantiate some layout components, if @buf contains * lovea, then it's a replay of the layout intent write * RPC. */ - rc = mdt_layout_change(info, obj, layout, buf); + rc = mdt_layout_change(info, obj, &layout); if (rc) GOTO(out_obj, rc); } @@ -3711,13 +3876,13 @@ out_obj: out: lhc->mlh_reg_lh.cookie = 0; - return rc; + RETURN(rc); } -static int mdt_intent_reint(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags) +static int mdt_intent_open(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *rep = NULL; @@ -3735,12 +3900,6 @@ static int mdt_intent_reint(enum mdt_it_code opcode, if (opc < 0) RETURN(opc); - if (mdt_it_flavor[opcode].it_reint != opc) { - CERROR("Reint code %ld doesn't match intent: %d\n", - opc, opcode); - RETURN(err_serious(-EPROTO)); - } - /* Get lock from request for possible resent case. */ mdt_intent_fixup_resent(info, *lockp, lhc, flags); @@ -3790,69 +3949,60 @@ static int mdt_intent_reint(enum mdt_it_code opcode, RETURN(ELDLM_LOCK_ABORTED); } -static int mdt_intent_code(enum ldlm_intent_flags itcode) -{ +static int mdt_intent_opc(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + u64 flags /* LDLM_FL_* */) +{ + struct req_capsule *pill = info->mti_pill; + struct ptlrpc_request *req = mdt_info_req(info); + const struct req_format *it_format; + int (*it_handler)(enum ldlm_intent_flags, + struct mdt_thread_info *, + struct ldlm_lock **, + u64); + enum tgt_handler_flags it_handler_flags = 0; + struct ldlm_reply *rep; int rc; + ENTRY; - switch (itcode) { + switch (it_opc) { case IT_OPEN: - rc = MDT_IT_OPEN; - break; case IT_OPEN|IT_CREAT: - rc = MDT_IT_OCREAT; - break; - case IT_CREAT: - rc = MDT_IT_CREATE; - break; - case IT_READDIR: - rc = MDT_IT_READDIR; + /* + * OCREAT is not a MUTABOR request since the file may + * already exist. We do the extra check of + * OBD_CONNECT_RDONLY in mdt_reint_open() when we + * really need to create the object. + */ + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_open; break; case IT_GETATTR: - rc = MDT_IT_GETATTR; - break; case IT_LOOKUP: - rc = MDT_IT_LOOKUP; - break; - case IT_UNLINK: - rc = MDT_IT_UNLINK; - break; - case IT_TRUNC: - rc = MDT_IT_TRUNC; + it_format = &RQF_LDLM_INTENT_GETATTR; + it_handler = &mdt_intent_getattr; + it_handler_flags = HABEO_REFERO; break; case IT_GETXATTR: - rc = MDT_IT_GETXATTR; + it_format = &RQF_LDLM_INTENT_GETXATTR; + it_handler = &mdt_intent_getxattr; + it_handler_flags = HABEO_CORPUS; break; case IT_LAYOUT: - rc = MDT_IT_LAYOUT; + it_format = &RQF_LDLM_INTENT_LAYOUT; + it_handler = &mdt_intent_layout; break; - case IT_QUOTA_DQACQ: - case IT_QUOTA_CONN: - rc = MDT_IT_QUOTA; + case IT_GLIMPSE: + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_glimpse; break; - default: - CERROR("Unknown intent opcode: 0x%08x\n", itcode); - rc = -EINVAL; + case IT_BRW: + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_brw; break; - } - return rc; -} - -static int mdt_intent_opc(enum ldlm_intent_flags itopc, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, __u64 flags) -{ - struct req_capsule *pill = info->mti_pill; - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_it_flavor *flv; - int opc; - int rc; - ENTRY; - - opc = mdt_intent_code(itopc); - if (opc < 0) - RETURN(-EINVAL); - - if (opc == MDT_IT_QUOTA) { + case IT_QUOTA_DQACQ: + case IT_QUOTA_CONN: { struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev; if (qmt == NULL) @@ -3868,87 +4018,101 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc, flags); RETURN(rc); } + default: + CERROR("%s: unknown intent code %#x\n", + mdt_obd_name(info->mti_mdt), it_opc); + RETURN(-EPROTO); + } - flv = &mdt_it_flavor[opc]; - if (flv->it_fmt != NULL) - req_capsule_extend(pill, flv->it_fmt); + req_capsule_extend(pill, it_format); - rc = mdt_unpack_req_pack_rep(info, flv->it_flags); + rc = mdt_unpack_req_pack_rep(info, it_handler_flags); if (rc < 0) RETURN(rc); - if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export)) + if (it_handler_flags & MUTABOR && mdt_rdonly(req->rq_export)) RETURN(-EROFS); - if (flv->it_act != NULL) { - struct ldlm_reply *rep; + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); + /* execute policy */ + rc = (*it_handler)(it_opc, info, lockp, flags); - /* execute policy */ - rc = flv->it_act(opc, info, lockp, flags); - - /* Check whether the reply has been packed successfully. */ - if (req->rq_repmsg != NULL) { - rep = req_capsule_server_get(info->mti_pill, - &RMF_DLM_REP); - rep->lock_policy_res2 = - ptlrpc_status_hton(rep->lock_policy_res2); - } + /* Check whether the reply has been packed successfully. */ + if (req->rq_repmsg != NULL) { + rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + rep->lock_policy_res2 = + ptlrpc_status_hton(rep->lock_policy_res2); } RETURN(rc); } -static int mdt_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - enum ldlm_mode mode, __u64 flags, void *data) +static void mdt_ptlrpc_stats_update(struct ptlrpc_request *req, + enum ldlm_intent_flags it_opc) +{ + struct lprocfs_stats *srv_stats = ptlrpc_req2svc(req)->srv_stats; + + /* update stats when IT code is known */ + if (srv_stats != NULL) + lprocfs_counter_incr(srv_stats, + PTLRPC_LAST_CNTR + (it_opc == IT_GLIMPSE ? + LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE)); +} + +static int mdt_intent_policy(const struct lu_env *env, + struct ldlm_namespace *ns, + struct ldlm_lock **lockp, + void *req_cookie, + enum ldlm_mode mode, + __u64 flags, void *data) { struct tgt_session_info *tsi; struct mdt_thread_info *info; struct ptlrpc_request *req = req_cookie; struct ldlm_intent *it; struct req_capsule *pill; + const struct ldlm_lock_desc *ldesc; int rc; ENTRY; LASSERT(req != NULL); - tsi = tgt_ses_info(req->rq_svc_thread->t_env); + tsi = tgt_ses_info(env); info = tsi2mdt_info(tsi); - LASSERT(info != NULL); - pill = info->mti_pill; - LASSERT(pill->rc_req == req); + LASSERT(info != NULL); + pill = info->mti_pill; + LASSERT(pill->rc_req == req); + ldesc = &info->mti_dlm_req->lock_desc; - if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) { + if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) { req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC); - it = req_capsule_client_get(pill, &RMF_LDLM_INTENT); - if (it != NULL) { - rc = mdt_intent_opc(it->opc, info, lockp, flags); - if (rc == 0) - rc = ELDLM_OK; - - /* Lock without inodebits makes no sense and will oops - * later in ldlm. Let's check it now to see if we have - * ibits corrupted somewhere in mdt_intent_opc(). - * The case for client miss to set ibits has been - * processed by others. */ - LASSERT(ergo(info->mti_dlm_req->lock_desc.l_resource.\ - lr_type == LDLM_IBITS, - info->mti_dlm_req->lock_desc.\ - l_policy_data.l_inodebits.bits != 0)); - } else - rc = err_serious(-EFAULT); - } else { - /* No intent was provided */ - LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE); + it = req_capsule_client_get(pill, &RMF_LDLM_INTENT); + if (it != NULL) { + mdt_ptlrpc_stats_update(req, it->opc); + rc = mdt_intent_opc(it->opc, info, lockp, flags); + if (rc == 0) + rc = ELDLM_OK; + + /* Lock without inodebits makes no sense and will oops + * later in ldlm. Let's check it now to see if we have + * ibits corrupted somewhere in mdt_intent_opc(). + * The case for client miss to set ibits has been + * processed by others. */ + LASSERT(ergo(ldesc->l_resource.lr_type == LDLM_IBITS, + ldesc->l_policy_data.l_inodebits.bits != 0)); + } else { + rc = err_serious(-EFAULT); + } + } else { + /* No intent was provided */ req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); - rc = req_capsule_server_pack(pill); - if (rc) - rc = err_serious(rc); - } + rc = req_capsule_server_pack(pill); + if (rc) + rc = err_serious(rc); + } mdt_thread_info_fini(info); RETURN(rc); } @@ -4631,6 +4795,11 @@ static int mdt_tgt_getxattr(struct tgt_session_info *tsi) return rc; } +#define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET +#define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET +#define OST_BRW_READ OST_READ +#define OST_BRW_WRITE OST_WRITE + static struct tgt_handler mdt_tgt_handlers[] = { TGT_RPC_HANDLER(MDS_FIRST_OPC, 0, MDS_CONNECT, mdt_tgt_connect, @@ -4671,6 +4840,17 @@ TGT_MDT_HDL(HABEO_CLAVIS | HABEO_CORPUS | HABEO_REFERO | MUTABOR, mdt_swap_layouts), }; +static struct tgt_handler mdt_io_ops[] = { +TGT_OST_HDL_HP(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read, + mdt_hp_brw), +TGT_OST_HDL_HP(HABEO_CORPUS | MUTABOR, OST_BRW_WRITE, tgt_brw_write, + mdt_hp_brw), +TGT_OST_HDL_HP(HABEO_CORPUS | HABEO_REFERO | MUTABOR, + OST_PUNCH, mdt_punch_hdl, + mdt_hp_punch), +TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_SYNC, mdt_data_sync), +}; + static struct tgt_handler mdt_sec_ctx_ops[] = { TGT_SEC_HDL_VAR(0, SEC_CTX_INIT, mdt_sec_ctx_handle), TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT,mdt_sec_ctx_handle), @@ -4732,7 +4912,11 @@ static struct tgt_opc_slice mdt_common_slice[] = { .tos_opc_end = LFSCK_LAST_OPC, .tos_hs = tgt_lfsck_handlers }, - + { + .tos_opc_start = OST_FIRST_OPC, + .tos_opc_end = OST_LAST_OPC, + .tos_hs = mdt_io_ops + }, { .tos_hs = NULL } @@ -4787,6 +4971,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) mdt_hsm_cdt_fini(m); + if (m->mdt_los != NULL) { + local_oid_storage_fini(env, m->mdt_los); + m->mdt_los = NULL; + } + if (m->mdt_namespace != NULL) { ldlm_namespace_free_post(m->mdt_namespace); d->ld_obd->obd_namespace = m->mdt_namespace = NULL; @@ -4823,12 +5012,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, struct obd_device *obd; const char *dev = lustre_cfg_string(cfg, 0); const char *num = lustre_cfg_string(cfg, 2); + struct tg_grants_data *tgd = &m->mdt_lut.lut_tgd; struct lustre_mount_info *lmi = NULL; struct lustre_sb_info *lsi; struct lu_site *s; struct seq_server_site *ss_site; const char *identity_upcall = "NONE"; struct md_device *next; + struct lu_fid fid; int rc; long node_id; mntopt_t mntopts; @@ -4853,7 +5044,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd = class_name2obd(dev); LASSERT(obd != NULL); - m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */ + m->mdt_max_mdsize = MAX_MD_SIZE_OLD; m->mdt_opts.mo_evict_tgt_nids = 1; m->mdt_opts.mo_cos = MDT_COS_DEFAULT; @@ -4872,13 +5063,19 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_skip_lfsck = 1; } + /* DoM files get IO lock at open by default */ + m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN; + /* DoM files are read at open and data is packed in the reply */ + m->mdt_opts.mo_dom_read_open = 1; + m->mdt_squash.rsi_uid = 0; m->mdt_squash.rsi_gid = 0; INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); init_rwsem(&m->mdt_squash.rsi_sem); spin_lock_init(&m->mdt_lock); - m->mdt_osfs_age = cfs_time_shift_64(-1000); - m->mdt_enable_remote_dir = 0; + m->mdt_enable_remote_dir = 1; + m->mdt_enable_striped_dir = 1; + m->mdt_enable_dir_migration = 1; m->mdt_enable_remote_dir_gid = 0; atomic_set(&m->mdt_mds_mds_conns, 0); @@ -4945,23 +5142,39 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, /* set obd_namespace for compatibility with old code */ obd->obd_namespace = m->mdt_namespace; - rc = mdt_hsm_cdt_init(m); - if (rc != 0) { - CERROR("%s: error initializing coordinator, rc %d\n", - mdt_obd_name(m), rc); - GOTO(err_free_ns, rc); - } - rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice, OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_MDS_ALL_REPLY_NET); if (rc) - GOTO(err_free_hsm, rc); + GOTO(err_free_ns, rc); + + /* Amount of available space excluded from granting and reserved + * for metadata. It is in percentage and 50% is default value. */ + tgd->tgd_reserved_pcnt = 50; + + if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits)) + m->mdt_brw_size = 1U << tgd->tgd_blockbits; + else + m->mdt_brw_size = ONE_MB_BRW_SIZE; rc = mdt_fs_setup(env, m, obd, lsi); if (rc) GOTO(err_tgt, rc); + fid.f_seq = FID_SEQ_LOCAL_NAME; + fid.f_oid = 1; + fid.f_ver = 0; + rc = local_oid_storage_init(env, m->mdt_bottom, &fid, &m->mdt_los); + if (rc != 0) + GOTO(err_fs_cleanup, rc); + + rc = mdt_hsm_cdt_init(m); + if (rc != 0) { + CERROR("%s: error initializing coordinator, rc %d\n", + mdt_obd_name(m), rc); + GOTO(err_los_fini, rc); + } + tgt_adapt_sptlrpc_conf(&m->mdt_lut); next = m->mdt_child; @@ -4992,7 +5205,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (IS_ERR(m->mdt_identity_cache)) { rc = PTR_ERR(m->mdt_identity_cache); m->mdt_identity_cache = NULL; - GOTO(err_fs_cleanup, rc); + GOTO(err_free_hsm, rc); } rc = mdt_procfs_init(m, dev); @@ -5028,12 +5241,15 @@ err_recovery: target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; +err_free_hsm: + mdt_hsm_cdt_fini(m); +err_los_fini: + local_oid_storage_fini(env, m->mdt_los); + m->mdt_los = NULL; err_fs_cleanup: mdt_fs_cleanup(env, m); err_tgt: tgt_fini(env, &m->mdt_lut); -err_free_hsm: - mdt_hsm_cdt_fini(m); err_free_ns: ldlm_namespace_free(m->mdt_namespace, NULL, 0); obd->obd_namespace = m->mdt_namespace = NULL; @@ -5154,7 +5370,9 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, lu_object_add_top(h, o); o->lo_ops = &mdt_obj_ops; spin_lock_init(&mo->mot_write_lock); + mutex_init(&mo->mot_som_mutex); mutex_init(&mo->mot_lov_mutex); + init_rwsem(&mo->mot_dom_sem); init_rwsem(&mo->mot_open_sem); atomic_set(&mo->mot_open_count, 0); RETURN(o); @@ -5323,10 +5541,12 @@ static int mdt_obd_set_info_async(const struct lu_env *env, * \retval -EPROTO \a data unexpectedly has zero obd_connect_data::ocd_brw_size * \retval -EBADE client and server feature requirements are incompatible */ -static int mdt_connect_internal(struct obd_export *exp, +static int mdt_connect_internal(const struct lu_env *env, + struct obd_export *exp, struct mdt_device *mdt, - struct obd_connect_data *data) + struct obd_connect_data *data, bool reconnect) { + const char *obd_name = mdt_obd_name(mdt); LASSERT(data != NULL); data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; @@ -5341,15 +5561,6 @@ static int mdt_connect_internal(struct obd_export *exp, data->ocd_ibits_known &= MDS_INODELOCK_FULL; - if (!(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) && - !(data->ocd_connect_flags & OBD_CONNECT_IBITS)) { - CWARN("%s: client %s does not support ibits lock, either " - "very old or an invalid client: flags %#llx\n", - mdt_obd_name(mdt), exp->exp_client_uuid.uuid, - data->ocd_connect_flags); - return -EBADE; - } - if (!mdt->mdt_opts.mo_acl) data->ocd_connect_flags &= ~OBD_CONNECT_ACL; @@ -5357,20 +5568,44 @@ static int mdt_connect_internal(struct obd_export *exp, data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; if (OCD_HAS_FLAG(data, BRW_SIZE)) { - data->ocd_brw_size = min(data->ocd_brw_size, MD_MAX_BRW_SIZE); + data->ocd_brw_size = min(data->ocd_brw_size, + mdt->mdt_brw_size); if (data->ocd_brw_size == 0) { CERROR("%s: cli %s/%p ocd_connect_flags: %#llx " "ocd_version: %x ocd_grant: %d ocd_index: %u " "ocd_brw_size unexpectedly zero, network data " "corruption? Refusing to connect this client\n", - mdt_obd_name(mdt), - exp->exp_client_uuid.uuid, + obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_connect_flags, data->ocd_version, data->ocd_grant, data->ocd_index); return -EPROTO; } } + if (OCD_HAS_FLAG(data, GRANT_PARAM)) { + struct dt_device_param *ddp = &mdt->mdt_lut.lut_dt_conf; + + /* client is reporting its page size, for future use */ + exp->exp_target_data.ted_pagebits = data->ocd_grant_blkbits; + data->ocd_grant_blkbits = mdt->mdt_lut.lut_tgd.tgd_blockbits; + /* ddp_inodespace may not be power-of-two value, eg. for ldiskfs + * it's LDISKFS_DIR_REC_LEN(20) = 28. */ + data->ocd_grant_inobits = fls(ddp->ddp_inodespace - 1); + /* ocd_grant_tax_kb is in 1K byte blocks */ + data->ocd_grant_tax_kb = ddp->ddp_extent_tax >> 10; + data->ocd_grant_max_blks = ddp->ddp_max_extent_blks; + } + + /* Save connect_data we have so far because tgt_grant_connect() + * uses it to calculate grant, and we want to save the client + * version before it is overwritten by LUSTRE_VERSION_CODE. */ + exp->exp_connect_data = *data; + if (OCD_HAS_FLAG(data, GRANT)) + tgt_grant_connect(env, exp, data, !reconnect); + + if (OCD_HAS_FLAG(data, MAXBYTES)) + data->ocd_maxbytes = mdt->mdt_lut.lut_dt_conf.ddp_maxbytes; + /* NB: Disregard the rule against updating * exp_connect_data.ocd_connect_flags in this case, since * tgt_client_new() needs to know if this is a lightweight @@ -5386,7 +5621,7 @@ static int mdt_connect_internal(struct obd_export *exp, if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) { CWARN("%s: MDS requires FID support, but client not\n", - mdt_obd_name(mdt)); + obd_name); return -EBADE; } @@ -5414,6 +5649,33 @@ static int mdt_connect_internal(struct obd_export *exp, spin_unlock(&exp->exp_lock); } + if (OCD_HAS_FLAG(data, CKSUM)) { + __u32 cksum_types = data->ocd_cksum_types; + + /* The client set in ocd_cksum_types the checksum types it + * supports. We have to mask off the algorithms that we don't + * support */ + data->ocd_cksum_types &= + obd_cksum_types_supported_server(obd_name); + + if (unlikely(data->ocd_cksum_types == 0)) { + CERROR("%s: Connect with checksum support but no " + "ocd_cksum_types is set\n", + exp->exp_obd->obd_name); + RETURN(-EPROTO); + } + + CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return " + "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp), + cksum_types, data->ocd_cksum_types); + } else { + /* This client does not support OBD_CONNECT_CKSUM + * fall back to CRC32 */ + CDEBUG(D_RPCTRACE, "%s: cli %s does not support " + "OBD_CONNECT_CKSUM, CRC32 will be used\n", + exp->exp_obd->obd_name, obd_export_nid2str(exp)); + } + return 0; } @@ -5462,7 +5724,7 @@ static int mdt_export_cleanup(struct obd_export *exp) /* Remove mfd handle so it can't be found again. * We are consuming the mfd_list reference here. */ - class_handle_unhash(&mfd->mfd_handle); + class_handle_unhash(&mfd->mfd_open_handle); list_move_tail(&mfd->mfd_list, &closing_list); } spin_unlock(&med->med_open_lock); @@ -5503,7 +5765,7 @@ static int mdt_export_cleanup(struct obd_export *exp) * archive request into a noop if it's not actually * dirty. */ - if (mfd->mfd_mode & FMODE_WRITE) + if (mfd->mfd_open_flags & MDS_FMODE_WRITE) rc = mdt_ctxt_add_dirty_flag(&env, info, mfd); /* Don't unlink orphan on failover umount, LU-184 */ @@ -5538,11 +5800,15 @@ static inline void mdt_disable_slc(struct mdt_device *mdt) static int mdt_obd_disconnect(struct obd_export *exp) { - int rc; - ENTRY; + int rc; + + ENTRY; - LASSERT(exp); - class_export_get(exp); + LASSERT(exp); + class_export_get(exp); + + if (!(exp->exp_flags & OBD_OPT_FORCE)) + tgt_grant_sanity_check(exp->exp_obd, __func__); if ((exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) && !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)) { @@ -5556,6 +5822,8 @@ static int mdt_obd_disconnect(struct obd_export *exp) if (rc != 0) CDEBUG(D_IOCTL, "server disconnect error: rc = %d\n", rc); + tgt_grant_discard(exp); + rc = mdt_export_cleanup(exp); nodemap_del_member(exp); class_export_put(exp); @@ -5617,7 +5885,7 @@ static int mdt_obd_connect(const struct lu_env *env, if (rc != 0 && rc != -EEXIST) GOTO(out, rc); - rc = mdt_connect_internal(lexp, mdt, data); + rc = mdt_connect_internal(env, lexp, mdt, data, false); if (rc == 0) { struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd; @@ -5663,7 +5931,8 @@ static int mdt_obd_reconnect(const struct lu_env *env, if (rc != 0 && rc != -EEXIST) RETURN(rc); - rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data); + rc = mdt_connect_internal(env, exp, mdt_dev(obd->obd_lu_dev), data, + true); if (rc == 0) mdt_export_stats_init(obd, exp, localdata); else @@ -5725,6 +5994,17 @@ static int mdt_destroy_export(struct obd_export *exp) LASSERT(list_empty(&exp->exp_outstanding_replies)); LASSERT(list_empty(&exp->exp_mdt_data.med_open_head)); + /* + * discard grants once we're sure no more + * interaction with the client is possible + */ + tgt_grant_discard(exp); + if (exp_connect_flags(exp) & OBD_CONNECT_GRANT) + exp->exp_obd->u.obt.obt_lut->lut_tgd.tgd_tot_granted_clients--; + + if (!(exp->exp_flags & OBD_OPT_FORCE)) + tgt_grant_sanity_check(exp->exp_obd, __func__); + RETURN(0); } @@ -6290,6 +6570,9 @@ static struct obd_ops mdt_obd_device_ops = { .o_destroy_export = mdt_destroy_export, .o_iocontrol = mdt_iocontrol, .o_postrecov = mdt_obd_postrecov, + /* Data-on-MDT IO methods */ + .o_preprw = mdt_obd_preprw, + .o_commitrw = mdt_obd_commitrw, }; static struct lu_device* mdt_device_fini(const struct lu_env *env, @@ -6381,12 +6664,12 @@ struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info) * \param mdt mdt device * \param val 0 disables COS, other values enable COS */ -void mdt_enable_cos(struct mdt_device *mdt, int val) +void mdt_enable_cos(struct mdt_device *mdt, bool val) { struct lu_env env; int rc; - mdt->mdt_opts.mo_cos = !!val; + mdt->mdt_opts.mo_cos = val; rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) { CWARN("%s: lu_env initialization failed, cannot "