* GPL HEADER END
*/
/*
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
* Use is subject to license terms.
*
* lustre/mdt/mdt_lvb.c
*/
#define DEBUG_SUBSYSTEM S_MDS
-
+#include <lustre_swab.h>
#include "mdt_internal.h"
/* Called with res->lr_lvb_sem held */
-static int mdt_lvbo_init(struct ldlm_resource *res)
+static int mdt_lvbo_init(const struct lu_env *env, struct ldlm_resource *res)
{
if (IS_LQUOTA_RES(res)) {
struct mdt_device *mdt;
/* call lvbo init function of quota master */
return qmt_hdls.qmth_lvbo_init(mdt->mdt_qmt_dev, res);
}
+ return 0;
+}
+
+int mdt_dom_lvb_alloc(struct ldlm_resource *res)
+{
+ struct ost_lvb *lvb;
+
+ mutex_lock(&res->lr_lvb_mutex);
+ if (res->lr_lvb_data == NULL) {
+ OBD_ALLOC_PTR(lvb);
+ if (lvb == NULL) {
+ mutex_unlock(&res->lr_lvb_mutex);
+ return -ENOMEM;
+ }
+ res->lr_lvb_data = lvb;
+ res->lr_lvb_len = sizeof(*lvb);
+
+ /* Store error in LVB to inidicate it has no data yet.
+ */
+ OST_LVB_SET_ERR(lvb->lvb_blocks, -ENODATA);
+ }
+ mutex_unlock(&res->lr_lvb_mutex);
return 0;
}
-static int mdt_lvbo_update(struct ldlm_resource *res,
- struct ldlm_lock *lock,
- struct ptlrpc_request *req,
+int mdt_dom_lvb_is_valid(struct ldlm_resource *res)
+{
+ struct ost_lvb *res_lvb = res->lr_lvb_data;
+
+ return !(res_lvb == NULL || OST_LVB_IS_ERR(res_lvb->lvb_blocks));
+}
+
+int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
+ struct ldlm_resource *res, bool increase_only)
+{
+ struct mdt_thread_info *info = mdt_th_info(env);
+ const struct lu_fid *fid = mdt_object_fid(mo);
+ struct ost_lvb *lvb;
+ struct md_attr *ma;
+ int rc = 0;
+
+ ENTRY;
+
+ lvb = res->lr_lvb_data;
+ LASSERT(lvb);
+
+ if (!mdt_object_exists(mo) || mdt_object_remote(mo))
+ RETURN(-ENOENT);
+
+ ma = &info->mti_attr2;
+ ma->ma_valid = 0;
+ ma->ma_need = MA_INODE;
+ rc = mo_attr_get(env, mdt_object_child(mo), ma);
+ if (rc)
+ RETURN(rc);
+
+ lock_res(res);
+ if (ma->ma_attr.la_size > lvb->lvb_size || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_size, ma->ma_attr.la_size);
+ lvb->lvb_size = ma->ma_attr.la_size;
+ }
+
+ if (ma->ma_attr.la_mtime > lvb->lvb_mtime || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_mtime, ma->ma_attr.la_mtime);
+ lvb->lvb_mtime = ma->ma_attr.la_mtime;
+ }
+ if (ma->ma_attr.la_atime > lvb->lvb_atime || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_atime, ma->ma_attr.la_atime);
+ lvb->lvb_atime = ma->ma_attr.la_atime;
+ }
+ if (ma->ma_attr.la_ctime > lvb->lvb_ctime || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_ctime, ma->ma_attr.la_ctime);
+ lvb->lvb_ctime = ma->ma_attr.la_ctime;
+ }
+ if (ma->ma_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
+ "%llu -> %llu\n", PFID(fid), lvb->lvb_blocks,
+ (unsigned long long)ma->ma_attr.la_blocks);
+ lvb->lvb_blocks = ma->ma_attr.la_blocks;
+ }
+ unlock_res(res);
+
+ RETURN(rc);
+}
+
+int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+ struct ldlm_lock *lock, struct ptlrpc_request *req,
+ bool increase_only)
+{
+ struct obd_export *exp = lock ? lock->l_export : NULL;
+ struct mdt_device *mdt;
+ struct mdt_object *mo;
+ struct mdt_thread_info *info;
+ struct ost_lvb *lvb;
+ struct lu_fid *fid;
+ int rc = 0;
+
+ ENTRY;
+
+ /* Before going further let's check that OBD and export are healthy.
+ */
+ if (exp != NULL &&
+ (exp->exp_disconnected || exp->exp_failed ||
+ exp->exp_obd->obd_stopping)) {
+ CDEBUG(D_INFO, "Skip LVB update, export is %s, obd is %s\n",
+ exp->exp_failed ? "failed" : "disconnected",
+ exp->exp_obd->obd_stopping ? "stopping" : "OK");
+ RETURN(0);
+ }
+
+ rc = mdt_dom_lvb_alloc(res);
+ if (rc < 0)
+ RETURN(rc);
+
+ mdt = ldlm_res_to_ns(res)->ns_lvbp;
+ if (mdt == NULL)
+ RETURN(-ENOENT);
+
+ LASSERT(env);
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ if (!info) {
+ rc = lu_env_refill_by_tags((struct lu_env *)env,
+ LCT_MD_THREAD, 0);
+ if (rc)
+ GOTO(out_env, rc);
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ if (!info)
+ GOTO(out_env, rc = -ENOMEM);
+ }
+ if (!info->mti_exp)
+ info->mti_exp = req ? req->rq_export : NULL;
+ if (!info->mti_mdt)
+ info->mti_mdt = mdt;
+
+ fid = &info->mti_tmp_fid2;
+ fid_extract_from_res_name(fid, &res->lr_name);
+
+ lvb = res->lr_lvb_data;
+ LASSERT(lvb);
+
+ /* Update the LVB from the network message */
+ if (req != NULL) {
+ struct ost_lvb *rpc_lvb;
+
+ rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
+ &RMF_DLM_LVB,
+ lustre_swab_ost_lvb);
+ if (rpc_lvb == NULL)
+ goto disk_update;
+
+ lock_res(res);
+ if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_size, rpc_lvb->lvb_size);
+ lvb->lvb_size = rpc_lvb->lvb_size;
+ }
+ if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_mtime, rpc_lvb->lvb_mtime);
+ lvb->lvb_mtime = rpc_lvb->lvb_mtime;
+ }
+ if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_atime, rpc_lvb->lvb_atime);
+ lvb->lvb_atime = rpc_lvb->lvb_atime;
+ }
+ if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_ctime, rpc_lvb->lvb_ctime);
+ lvb->lvb_ctime = rpc_lvb->lvb_ctime;
+ }
+ if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
+ CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
+ "%llu -> %llu\n", PFID(fid),
+ lvb->lvb_blocks, rpc_lvb->lvb_blocks);
+ lvb->lvb_blocks = rpc_lvb->lvb_blocks;
+ }
+ unlock_res(res);
+ }
+
+disk_update:
+ /* Update the LVB from the disk inode */
+ mo = mdt_object_find(env, mdt, fid);
+ if (IS_ERR(mo))
+ GOTO(out_env, rc = PTR_ERR(mo));
+
+ rc = mdt_dom_disk_lvbo_update(env, mo, res, !!increase_only);
+ mdt_object_put(env, mo);
+out_env:
+ return rc;
+}
+
+static int mdt_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+ struct ldlm_lock *lock, struct ptlrpc_request *req,
int increase_only)
{
+ ENTRY;
+
if (IS_LQUOTA_RES(res)) {
- struct mdt_device *mdt;
+ struct mdt_device *mdt;
mdt = ldlm_res_to_ns(res)->ns_lvbp;
if (mdt->mdt_qmt_dev == NULL)
increase_only);
}
+ /* Data-on-MDT lvbo update.
+ * Like a ldlm_lock_init() the lock can be skipped and that means
+ * it is DOM resource because lvbo_update() without lock is called
+ * by MDT for DOM objects only.
+ */
+ if (lock == NULL || ldlm_has_dom(lock))
+ return mdt_dom_lvbo_update(env, res, lock, req,
+ !!increase_only);
return 0;
}
return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock);
}
+ if (ldlm_has_dom(lock))
+ return sizeof(struct ost_lvb);
+
if (ldlm_has_layout(lock))
return mdt->mdt_max_mdsize;
return 0;
}
-static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
+/**
+ * Implementation of ldlm_valblock_ops::lvbo_fill for MDT.
+ *
+ * This function is called to fill the given RPC buffer \a buf with LVB data
+ *
+ * \param[in] env execution environment
+ * \param[in] lock LDLM lock
+ * \param[in] buf RPC buffer to fill
+ * \param[in,out] lvblen lvb buffer length
+ *
+ * \retval size of LVB data written into \a buf buffer
+ * or -ERANGE when the provided @lvblen is not big enough,
+ * and the needed lvb buffer size will be returned in
+ * @lvblen
+ */
+static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
+ void *lvb, int *lvblen)
{
- struct lu_env env;
struct mdt_thread_info *info;
struct mdt_device *mdt;
struct lu_fid *fid;
struct mdt_object *obj = NULL;
struct md_object *child = NULL;
+ struct lu_env _env;
int rc;
ENTRY;
+ if (!env) {
+ rc = lu_env_init(&_env, LCT_DT_THREAD);
+ if (rc)
+ RETURN(rc);
+ env = &_env;
+ }
+
mdt = ldlm_lock_to_ns(lock)->ns_lvbp;
if (IS_LQUOTA_RES(lock->l_resource)) {
if (mdt->mdt_qmt_dev == NULL)
- RETURN(0);
+ GOTO(out_env, rc = 0);
/* call lvbo fill function of quota master */
rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
- lvblen);
- RETURN(rc);
+ *lvblen);
+ GOTO(out_env, rc);
+ }
+
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ if (!info) {
+ rc = lu_env_refill_by_tags((struct lu_env *)env,
+ LCT_MD_THREAD, 0);
+ if (rc)
+ GOTO(out, rc);
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ if (!info)
+ GOTO(out, rc = -ENOMEM);
+ }
+ if (!info->mti_env)
+ info->mti_env = env;
+ if (!info->mti_exp)
+ info->mti_exp = lock->l_export;
+ if (!info->mti_mdt)
+ info->mti_mdt = mdt;
+
+ /* LVB for DoM lock is needed only for glimpse,
+ * don't fill DoM data if there is layout lock */
+ if (ldlm_has_dom(lock)) {
+ struct ldlm_resource *res = lock->l_resource;
+ int lvb_len = sizeof(struct ost_lvb);
+
+ if (!mdt_dom_lvb_is_valid(res))
+ mdt_dom_lvbo_update(env, lock->l_resource,
+ lock, NULL, 0);
+
+ if (lvb_len > *lvblen)
+ lvb_len = *lvblen;
+
+ lock_res(res);
+ memcpy(lvb, res->lr_lvb_data, lvb_len);
+ unlock_res(res);
+
+ GOTO(out, rc = lvb_len);
}
/* Only fill layout if layout lock is granted */
if (!ldlm_has_layout(lock) || lock->l_granted_mode != lock->l_req_mode)
- RETURN(0);
-
- /* layout lock will be granted to client, fill in lvb with layout */
-
- /* XXX create an env to talk to mdt stack. We should get this env from
- * ptlrpc_thread->t_env. */
- rc = lu_env_init(&env, LCT_MD_THREAD);
- /* Likely ENOMEM */
- if (rc)
- RETURN(rc);
-
- info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
- /* Likely ENOMEM */
- if (info == NULL)
- GOTO(out, rc = -ENOMEM);
-
- memset(info, 0, sizeof *info);
- info->mti_env = &env;
- info->mti_exp = lock->l_export;
- info->mti_mdt = mdt;
+ GOTO(out, rc = 0);
/* XXX get fid by resource id. why don't include fid in ldlm_resource */
fid = &info->mti_tmp_fid2;
fid_extract_from_res_name(fid, &lock->l_resource->lr_name);
- obj = mdt_object_find(&env, info->mti_mdt, fid);
+ obj = mdt_object_find(env, info->mti_mdt, fid);
if (IS_ERR(obj))
GOTO(out, rc = PTR_ERR(obj));
if (!mdt_object_exists(obj) || mdt_object_remote(obj))
- GOTO(out, rc = -ENOENT);
+ GOTO(out_put, rc = -ENOENT);
child = mdt_object_child(obj);
/* get the length of lsm */
- rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
+ rc = mo_xattr_get(env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
if (rc < 0)
- GOTO(out, rc);
-
+ GOTO(out_put, rc);
if (rc > 0) {
struct lu_buf *lmm = NULL;
-
- if (lvblen < rc) {
- CERROR("%s: expected %d actual %d.\n",
- mdt_obd_name(mdt), rc, lvblen);
- GOTO(out, rc = -ERANGE);
+ if (*lvblen < rc) {
+ int level;
+
+ /* The layout EA may be larger than mdt_max_mdsize
+ * and in that case mdt_max_mdsize is just updated
+ * but if EA size is less than mdt_max_mdsize then
+ * it is an error in lvblen value provided. */
+ if (rc > info->mti_mdt->mdt_max_mdsize) {
+ info->mti_mdt->mdt_max_mdsize = rc;
+ level = D_INFO;
+ } else {
+ level = D_ERROR;
+ }
+ CDEBUG_LIMIT(level, "%s: small buffer size %d for EA "
+ "%d (max_mdsize %d): rc = %d\n",
+ mdt_obd_name(mdt), *lvblen, rc,
+ info->mti_mdt->mdt_max_mdsize, -ERANGE);
+ *lvblen = rc;
+ GOTO(out_put, rc = -ERANGE);
}
-
lmm = &info->mti_buf;
lmm->lb_buf = lvb;
lmm->lb_len = rc;
-
- rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV);
+ rc = mo_xattr_get(env, child, lmm, XATTR_NAME_LOV);
if (rc < 0)
- GOTO(out, rc);
+ GOTO(out_put, rc);
}
-out:
+out_put:
if (obj != NULL && !IS_ERR(obj))
- mdt_object_put(&env, obj);
- lu_env_fini(&env);
- RETURN(rc < 0 ? 0 : rc);
+ mdt_object_put(env, obj);
+out:
+ if (rc < 0 && rc != -ERANGE)
+ rc = 0;
+out_env:
+ if (env == &_env)
+ lu_env_fini(&_env);
+ RETURN(rc);
}
static int mdt_lvbo_free(struct ldlm_resource *res)
return qmt_hdls.qmth_lvbo_free(mdt->mdt_qmt_dev, res);
}
+ /* Data-on-MDT lvbo free */
+ if (res->lr_lvb_data != NULL)
+ OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
return 0;
}