X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_lvb.c;h=729cb8ea6394047bc67fc003ffd7ae2bda2f9b0e;hb=4affa48f676b9ecb47b9827e82f76a09587ceea1;hp=d3fdb5181ed66ee3db079c08bbcdbe20a4d4ea54;hpb=ccabce23bd9e366c345c852f565766a799f61238;p=fs%2Flustre-release.git diff --git a/lustre/mdt/mdt_lvb.c b/lustre/mdt/mdt_lvb.c index d3fdb51..729cb8e 100644 --- a/lustre/mdt/mdt_lvb.c +++ b/lustre/mdt/mdt_lvb.c @@ -96,7 +96,7 @@ int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo, if (!mdt_object_exists(mo) || mdt_object_remote(mo)) RETURN(-ENOENT); - ma = &info->mti_attr; + ma = &info->mti_attr2; ma->ma_valid = 0; ma->ma_need = MA_INODE; rc = mo_attr_get(env, mdt_object_child(mo), ma); @@ -144,24 +144,21 @@ int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, struct ptlrpc_request *req, bool increase_only) { struct obd_export *exp = lock ? lock->l_export : NULL; + const struct lu_env *env = lu_env_find(); struct mdt_device *mdt; struct mdt_object *mo; struct mdt_thread_info *info; struct ost_lvb *lvb; - struct lu_env env; struct lu_fid *fid; int rc = 0; ENTRY; /* Before going further let's check that OBD and export are healthy. + * The condition matches one in ptlrpc_send_reply() */ - if (exp != NULL && - (exp->exp_disconnected || exp->exp_failed || - exp->exp_obd->obd_stopping)) { - CDEBUG(D_INFO, "Skip LVB update, export is %s, obd is %s\n", - exp->exp_failed ? "failed" : "disconnected", - exp->exp_obd->obd_stopping ? "stopping" : "OK"); + if (exp && exp->exp_obd && exp->exp_obd->obd_fail) { + CDEBUG(D_INFO, "Skip LVB update, obd is failing over\n"); RETURN(0); } @@ -173,18 +170,17 @@ int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, if (mdt == NULL) RETURN(-ENOENT); - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) - RETURN(rc); - - info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); - if (info == NULL) - GOTO(out_env, rc = -ENOMEM); - - memset(info, 0, sizeof *info); - info->mti_env = &env; - info->mti_exp = req ? req->rq_export : NULL; - info->mti_mdt = mdt; + LASSERT(env); + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + if (!info) { + rc = lu_env_refill_by_tags((struct lu_env *)env, + LCT_MD_THREAD, 0); + if (rc) + GOTO(out_env, rc); + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + if (!info) + GOTO(out_env, rc = -ENOMEM); + } fid = &info->mti_tmp_fid2; fid_extract_from_res_name(fid, &res->lr_name); @@ -238,14 +234,13 @@ int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, disk_update: /* Update the LVB from the disk inode */ - mo = mdt_object_find(&env, mdt, fid); + mo = mdt_object_find(env, mdt, fid); if (IS_ERR(mo)) GOTO(out_env, rc = PTR_ERR(mo)); - rc = mdt_dom_disk_lvbo_update(&env, mo, res, !!increase_only); - mdt_object_put(&env, mo); + rc = mdt_dom_disk_lvbo_update(env, mo, res, !!increase_only); + mdt_object_put(env, mo); out_env: - lu_env_fini(&env); return rc; } @@ -283,7 +278,8 @@ static int mdt_lvbo_size(struct ldlm_lock *lock) /* resource on server side never changes. */ mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp; - LASSERT(mdt != NULL); + if (!mdt) + return 0; if (IS_LQUOTA_RES(lock->l_resource)) { if (mdt->mdt_qmt_dev == NULL) @@ -293,6 +289,11 @@ static int mdt_lvbo_size(struct ldlm_lock *lock) return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock); } + /* Always prefer DoM LVB data because layout is never returned in + * LVB when lock bits are combined with DoM, this is either GETATTR + * or OPEN enqueue. Meanwhile GL AST can be issued on such combined + * lock bits and it uses LVB for DoM data. + */ if (ldlm_has_dom(lock)) return sizeof(struct ost_lvb); @@ -302,117 +303,152 @@ static int mdt_lvbo_size(struct ldlm_lock *lock) return 0; } -static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen) +/** + * Implementation of ldlm_valblock_ops::lvbo_fill for MDT. + * + * This function is called to fill the given RPC buffer \a buf with LVB data + * + * \param[in] env execution environment + * \param[in] lock LDLM lock + * \param[in] buf RPC buffer to fill + * \param[in,out] lvblen lvb buffer length + * + * \retval size of LVB data written into \a buf buffer + * or -ERANGE when the provided @lvblen is not big enough, + * and the needed lvb buffer size will be returned in + * @lvblen + */ +static int mdt_lvbo_fill(struct ldlm_lock *lock, + void *lvb, int *lvblen) { - struct lu_env env; struct mdt_thread_info *info; struct mdt_device *mdt; + struct lu_env *env; struct lu_fid *fid; struct mdt_object *obj = NULL; struct md_object *child = NULL; int rc; ENTRY; + env = lu_env_find(); + LASSERT(env); + mdt = ldlm_lock_to_ns(lock)->ns_lvbp; + if (!mdt) + RETURN(0); + if (IS_LQUOTA_RES(lock->l_resource)) { if (mdt->mdt_qmt_dev == NULL) - RETURN(0); + GOTO(out, rc = 0); /* call lvbo fill function of quota master */ rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb, - lvblen); - RETURN(rc); + *lvblen); + GOTO(out, rc); + } + + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + if (!info) { + rc = lu_env_refill_by_tags(env, LCT_MD_THREAD, 0); + if (rc) + GOTO(out, rc); + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + if (!info) + GOTO(out, rc = -ENOMEM); } - /* LVB for DoM lock is needed only for glimpse, - * don't fill DoM data if there is layout lock */ + /* DOM LVB is used by glimpse and IO completion when + * DoM bits is always alone. + * If DoM bit is combined with any other bit then it is + * intent OPEN or GETATTR lock which is not filling + * LVB buffer in reply neither for DoM nor for LAYOUT. + */ if (ldlm_has_dom(lock)) { struct ldlm_resource *res = lock->l_resource; int lvb_len = sizeof(struct ost_lvb); if (!mdt_dom_lvb_is_valid(res)) - mdt_dom_lvbo_update(lock->l_resource, lock, NULL, 0); - - if (lvb_len > lvblen) - lvb_len = lvblen; + mdt_dom_lvbo_update(res, lock, NULL, 0); + LASSERT(*lvblen >= lvb_len); lock_res(res); memcpy(lvb, res->lr_lvb_data, lvb_len); unlock_res(res); - - RETURN(lvb_len); + GOTO(out, rc = lvb_len); } /* Only fill layout if layout lock is granted */ - if (!ldlm_has_layout(lock) || lock->l_granted_mode != lock->l_req_mode) - RETURN(0); - - /* XXX create an env to talk to mdt stack. We should get this env from - * ptlrpc_thread->t_env. */ - rc = lu_env_init(&env, LCT_MD_THREAD); - /* Likely ENOMEM */ - if (rc) - RETURN(rc); - - info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); - /* Likely ENOMEM */ - if (info == NULL) - GOTO(out, rc = -ENOMEM); - - memset(info, 0, sizeof *info); - info->mti_env = &env; - info->mti_exp = lock->l_export; - info->mti_mdt = mdt; + if (!ldlm_has_layout(lock) || !ldlm_is_granted(lock)) + GOTO(out, rc = 0); /* XXX get fid by resource id. why don't include fid in ldlm_resource */ fid = &info->mti_tmp_fid2; fid_extract_from_res_name(fid, &lock->l_resource->lr_name); - obj = mdt_object_find(&env, info->mti_mdt, fid); + obj = mdt_object_find(env, mdt, fid); if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); if (!mdt_object_exists(obj) || mdt_object_remote(obj)) - GOTO(out, rc = -ENOENT); + GOTO(out_put, rc = -ENOENT); child = mdt_object_child(obj); /* get the length of lsm */ - rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV); + rc = mo_xattr_get(env, child, &LU_BUF_NULL, XATTR_NAME_LOV); if (rc < 0) - GOTO(out, rc); + GOTO(out_put, rc); if (rc > 0) { struct lu_buf *lmm = NULL; - if (lvblen < rc) { - CERROR("%s: expected %d actual %d.\n", - mdt_obd_name(mdt), rc, lvblen); - /* if layout size is bigger then update max_mdsize */ - if (rc > info->mti_mdt->mdt_max_mdsize) - info->mti_mdt->mdt_max_mdsize = rc; - GOTO(out, rc = -ERANGE); + if (*lvblen < rc) { + int level; + + /* The layout EA may be larger than mdt_max_mdsize + * and in that case mdt_max_mdsize is just updated + * but if EA size is less than mdt_max_mdsize then + * it is an error in lvblen value provided. */ + if (rc > mdt->mdt_max_mdsize) { + mdt->mdt_max_mdsize = rc; + level = D_INFO; + } else { + /* The PFL layout EA could be enlarged when + * the corresponding layout of some IO range + * is started to be written, which can cause + * other thread to get incorrect layout size + * at mdt_intent_layout, see LU-13261. */ + level = D_LAYOUT; + } + CDEBUG_LIMIT(level, "%s: small buffer size %d for EA " + "%d (max_mdsize %d): rc = %d\n", + mdt_obd_name(mdt), *lvblen, rc, + mdt->mdt_max_mdsize, -ERANGE); + *lvblen = rc; + GOTO(out_put, rc = -ERANGE); } lmm = &info->mti_buf; lmm->lb_buf = lvb; lmm->lb_len = rc; - rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV); + rc = mo_xattr_get(env, child, lmm, XATTR_NAME_LOV); if (rc < 0) - GOTO(out, rc); + GOTO(out_put, rc); } -out: +out_put: if (obj != NULL && !IS_ERR(obj)) - mdt_object_put(&env, obj); - lu_env_fini(&env); - RETURN(rc < 0 ? 0 : rc); + mdt_object_put(env, obj); +out: + if (rc < 0 && rc != -ERANGE) + rc = 0; + RETURN(rc); } static int mdt_lvbo_free(struct ldlm_resource *res) { if (IS_LQUOTA_RES(res)) { - struct mdt_device *mdt; + struct mdt_device *mdt; mdt = ldlm_res_to_ns(res)->ns_lvbp; - if (mdt->mdt_qmt_dev == NULL) + if (!mdt || !mdt->mdt_qmt_dev) return 0; /* call lvbo free function of quota master */