static int obf_attr_get(const struct lu_env *env, struct md_object *obj,
struct md_attr *ma)
{
- int rc = 0;
-
- if (ma->ma_need & MA_INODE) {
- struct mdd_device *mdd = mdo2mdd(obj);
-
- /* "fid" is a virtual object and hence does not have any "real"
- * attributes. So we reuse attributes of .lustre for "fid" dir */
- ma->ma_need |= MA_INODE;
- rc = mdd_attr_get(env, &mdd->mdd_dot_lustre->mod_obj, ma);
- if (rc)
- return rc;
- ma->ma_valid |= MA_INODE;
- }
-
- /* "fid" directory does not have any striping information. */
- if (ma->ma_need & MA_LOV) {
- struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
- if (ma->ma_valid & MA_LOV)
- return 0;
-
- if (!(S_ISREG(mdd_object_type(mdd_obj)) ||
- S_ISDIR(mdd_object_type(mdd_obj))))
- return 0;
+ struct mdd_device *mdd = mdo2mdd(obj);
- if (ma->ma_need & MA_LOV_DEF) {
- rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
- if (rc > 0) {
- ma->ma_lmm_size = rc;
- ma->ma_valid |= MA_LOV;
- rc = 0;
- }
- }
- }
-
- return rc;
+ /* "fid" is a virtual object and hence does not have any "real"
+ * attributes. So we reuse attributes of .lustre for "fid" dir */
+ return mdd_attr_get(env, &mdd->mdd_dot_lustre->mod_obj, ma);
}
static int obf_attr_set(const struct lu_env *env, struct md_object *obj,
struct md_object *obj, struct lu_buf *buf,
const char *name)
{
- return 0;
+ int rc = 0;
+
+ /* XXX: a temp. solution till LOD/OSP is landed */
+ if (strcmp(name, XATTR_NAME_LOV) == 0) {
+ if (buf->lb_buf == NULL) {
+ rc = sizeof(struct lov_user_md);
+ } else if (buf->lb_len >= sizeof(struct lov_user_md)) {
+ rc = mdd_get_default_md(md2mdd_obj(obj), buf->lb_buf);
+ } else {
+ rc = -ERANGE;
+ }
+ }
+
+ return rc;
}
static int obf_xattr_set(const struct lu_env *env,
rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
lmm_size, handle, 0);
- if (rc == 0)
- rc = mdd_attr_get_internal_locked(env, son, ma);
-
/* update lov_objid data, must be before transaction stop! */
if (rc == 0)
mdd_lov_objid_update(mdd, lmm);
CERROR("error on stripe info copy %d \n", rc);
GOTO(cleanup, rc);
}
- if (lmm && lmm_size > 0) {
- /* Set Lov here, do not get lmm again later */
- if (lmm_size > ma->ma_lmm_size) {
- /* Reply buffer is smaller, need bigger one */
- mdd_max_lmm_buffer(env, lmm_size);
- if (unlikely(info->mti_max_lmm == NULL))
- GOTO(cleanup, rc = -ENOMEM);
- ma->ma_lmm = info->mti_max_lmm;
- ma->ma_big_lmm_used = 1;
- }
- memcpy(ma->ma_lmm, lmm, lmm_size);
- ma->ma_lmm_size = lmm_size;
- ma->ma_valid |= MA_LOV;
- }
if (S_ISLNK(attr->la_mode)) {
struct md_ucred *uc = md_ucred(env);
if (rc)
GOTO(cleanup, rc);
- /* Return attr back. */
- rc = mdd_attr_get_internal_locked(env, son, ma);
EXIT;
cleanup:
if (rc != 0 && created != 0) {
struct md_attr *ma, struct thandle *handle);
int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
struct md_attr *ma);
-int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct md_attr *ma);
-int mdd_attr_get_internal_locked(const struct lu_env *env,
- struct mdd_object *mdd_obj,
- struct md_attr *ma);
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
struct mdd_object *c, struct lu_attr *attr,
struct thandle *handle,
return 0;
}
-int mdd_def_acl_get(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct md_attr *ma);
int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
struct thandle *handle);
int __mdd_declare_acl_init(const struct lu_env *env, struct mdd_object *obj,
RETURN(rc);
}
-/* get the first parent fid from link EA */
-static int mdd_pfid_get(const struct lu_env *env,
- struct mdd_object *mdd_obj, struct md_attr *ma)
-{
- struct lu_buf *buf;
- struct link_ea_header *leh;
- struct link_ea_entry *lee;
- struct lu_fid *pfid = &ma->ma_pfid;
- ENTRY;
-
- if (ma->ma_valid & MA_PFID)
- RETURN(0);
-
- buf = mdd_links_get(env, mdd_obj);
- if (IS_ERR(buf))
- RETURN(PTR_ERR(buf));
-
- leh = buf->lb_buf;
- lee = (struct link_ea_entry *)(leh + 1);
- memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
- fid_be_to_cpu(pfid, pfid);
- ma->ma_valid |= MA_PFID;
- if (buf->lb_len > OBD_ALLOC_BIG)
- /* if we vmalloced a large buffer drop it */
- mdd_buf_put(buf);
- RETURN(0);
-}
-
int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
struct md_attr *ma)
{
RETURN(rc);
}
-/* get lmv EA only*/
-static int __mdd_lmv_get(const struct lu_env *env,
- struct mdd_object *mdd_obj, struct md_attr *ma)
-{
- int rc;
- ENTRY;
-
- if (ma->ma_valid & MA_LMV)
- RETURN(0);
-
- rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
- XATTR_NAME_LMV);
- if (rc > 0) {
- ma->ma_valid |= MA_LMV;
- rc = 0;
- }
- RETURN(rc);
-}
-
-static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct md_attr *ma)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lustre_mdt_attrs *lma =
- (struct lustre_mdt_attrs *)info->mti_xattr_buf;
- int lma_size;
- int rc;
- ENTRY;
-
- /* If all needed data are already valid, nothing to do */
- if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
- (ma->ma_need & (MA_HSM | MA_SOM)))
- RETURN(0);
-
- /* Read LMA from disk EA */
- lma_size = sizeof(info->mti_xattr_buf);
- rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
- if (rc <= 0)
- RETURN(rc);
-
- /* Useless to check LMA incompatibility because this is already done in
- * osd_ea_fid_get(), and this will fail long before this code is
- * called.
- * So, if we are here, LMA is compatible.
- */
-
- lustre_lma_swab(lma);
-
- /* Swab and copy LMA */
- if (ma->ma_need & MA_HSM) {
- if (lma->lma_compat & LMAC_HSM)
- ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
- else
- ma->ma_hsm.mh_flags = 0;
- ma->ma_valid |= MA_HSM;
- }
-
- /* Copy SOM */
- if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
- LASSERT(ma->ma_som != NULL);
- ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
- ma->ma_som->msd_size = lma->lma_som_size;
- ma->ma_som->msd_blocks = lma->lma_som_blocks;
- ma->ma_som->msd_mountid = lma->lma_som_mountid;
- ma->ma_valid |= MA_SOM;
- }
-
- RETURN(0);
-}
-
-int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct md_attr *ma)
-{
- int rc = 0;
- ENTRY;
-
- if (ma->ma_need & MA_INODE)
- rc = mdd_iattr_get(env, mdd_obj, ma);
-
- if (rc == 0 && ma->ma_need & MA_LOV) {
- if (S_ISREG(mdd_object_type(mdd_obj)) ||
- S_ISDIR(mdd_object_type(mdd_obj)))
- rc = __mdd_lmm_get(env, mdd_obj, ma);
- }
- if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
- if (S_ISREG(mdd_object_type(mdd_obj)))
- rc = mdd_pfid_get(env, mdd_obj, ma);
- }
- if (rc == 0 && ma->ma_need & MA_LMV) {
- if (S_ISDIR(mdd_object_type(mdd_obj)))
- rc = __mdd_lmv_get(env, mdd_obj, ma);
- }
- if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
- if (S_ISREG(mdd_object_type(mdd_obj)))
- rc = __mdd_lma_get(env, mdd_obj, ma);
- }
-#ifdef CONFIG_FS_POSIX_ACL
- if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
- if (S_ISDIR(mdd_object_type(mdd_obj)))
- rc = mdd_def_acl_get(env, mdd_obj, ma);
- }
-#endif
- CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
- rc, ma->ma_valid, ma->ma_lmm);
- RETURN(rc);
-}
-
-int mdd_attr_get_internal_locked(const struct lu_env *env,
- struct mdd_object *mdd_obj, struct md_attr *ma)
-{
- int rc;
- int needlock = ma->ma_need &
- (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
-
- if (needlock)
- mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
- rc = mdd_attr_get_internal(env, mdd_obj, ma);
- if (needlock)
- mdd_read_unlock(env, mdd_obj);
- return rc;
-}
-
/*
* No permission check is needed.
*/
int rc;
ENTRY;
- rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
+ rc = mdd_iattr_get(env, mdd_obj, ma);
RETURN(rc);
}
const char *name)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
+ struct mdd_device *mdd = mdo2mdd(obj);
+ struct lu_fid rootfid;
+ int is_root;
int rc;
ENTRY;
mdd_object_capa(env, mdd_obj));
mdd_read_unlock(env, mdd_obj);
+ dt_root_get(env, mdd->mdd_child, &rootfid);
+ is_root = lu_fid_eq(mdd_object_fid(mdd_obj), &rootfid);
+
+ /* XXX: a temp. solution till LOD/OSP is landed */
+ if (rc == -ENODATA && strcmp(name, XATTR_NAME_LOV) == 0 && is_root) {
+ if (buf->lb_buf == NULL) {
+ rc = sizeof(struct lov_user_md);
+ } else if (buf->lb_len >= sizeof(struct lov_user_md)) {
+ rc = mdd_get_default_md(mdd_obj, buf->lb_buf);
+ } else {
+ rc = -ERANGE;
+ }
+ }
+
RETURN(rc);
}
#ifdef CONFIG_FS_POSIX_ACL
/*
- * Get default acl EA only.
- * Hold read_lock for mdd_obj.
- */
-int mdd_def_acl_get(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct md_attr *ma)
-{
- struct lu_buf *buf;
- int rc;
- ENTRY;
-
- if (ma->ma_valid & MA_ACL_DEF)
- RETURN(0);
-
- buf = mdd_buf_get(env, ma->ma_acl, ma->ma_acl_size);
- rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_ACL_DEFAULT,
- BYPASS_CAPA);
- if (rc > 0) {
- ma->ma_acl_size = rc;
- ma->ma_valid |= MA_ACL_DEF;
- rc = 0;
- } else if ((rc == -EOPNOTSUPP) || (rc == -ENODATA)) {
- rc = 0;
- }
- RETURN(rc);
-}
-
-/*
* Hold write_lock for o.
*/
int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
if (!S_ISREG(attr->la_mode)) {
b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
- } else if (ma->ma_need & MA_LOV && ma->ma_lmm_size == 0) {
+ } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
/* means no objects are allocated on osts. */
LASSERT(!(ma->ma_valid & MA_LOV));
/* just ignore blocks occupied by extend attributes on MDS */
EXIT;
}
+static int mdt_big_lmm_get(const struct lu_env *env, struct mdt_object *o,
+ struct md_attr *ma)
+{
+ struct mdt_thread_info *info;
+ int rc;
+ ENTRY;
+
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ LASSERT(info != NULL);
+ LASSERT(ma->ma_lmm_size > 0);
+ LASSERT(info->mti_big_lmm_used == 0);
+ rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
+ XATTR_NAME_LOV);
+ if (rc < 0)
+ RETURN(rc);
+
+ /* big_lmm may need to be grown */
+ if (info->mti_big_lmmsize < rc) {
+ int size = size_roundup_power2(rc);
+
+ if (info->mti_big_lmmsize > 0) {
+ /* free old buffer */
+ LASSERT(info->mti_big_lmm);
+ OBD_FREE_LARGE(info->mti_big_lmm,
+ info->mti_big_lmmsize);
+ info->mti_big_lmm = NULL;
+ info->mti_big_lmmsize = 0;
+ }
+
+ OBD_ALLOC_LARGE(info->mti_big_lmm, size);
+ if (info->mti_big_lmm == NULL)
+ RETURN(-ENOMEM);
+ info->mti_big_lmmsize = size;
+ }
+ LASSERT(info->mti_big_lmmsize >= rc);
+
+ info->mti_buf.lb_buf = info->mti_big_lmm;
+ info->mti_buf.lb_len = info->mti_big_lmmsize;
+ rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf,
+ XATTR_NAME_LOV);
+ if (rc < 0)
+ RETURN(rc);
+
+ info->mti_big_lmm_used = 1;
+ ma->ma_valid |= MA_LOV;
+ ma->ma_lmm = info->mti_big_lmm;
+ ma->ma_lmm_size = rc;
+
+ /* update mdt_max_mdsize so all clients will be aware about that */
+ if (info->mti_mdt->mdt_max_mdsize < rc)
+ info->mti_mdt->mdt_max_mdsize = rc;
+
+ RETURN(0);
+}
+
+int mdt_attr_get_lov(struct mdt_thread_info *info,
+ struct mdt_object *o, struct md_attr *ma)
+{
+ struct md_object *next = mdt_object_child(o);
+ struct lu_buf *buf = &info->mti_buf;
+ int rc;
+
+ buf->lb_buf = ma->ma_lmm;
+ buf->lb_len = ma->ma_lmm_size;
+ rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LOV);
+ if (rc > 0) {
+ ma->ma_lmm_size = rc;
+ ma->ma_valid |= MA_LOV;
+ rc = 0;
+ } else if (rc == -ENODATA) {
+ /* no LOV EA */
+ rc = 0;
+ } else if (rc == -ERANGE) {
+ rc = mdt_big_lmm_get(info->mti_env, o, ma);
+ }
+
+ return rc;
+}
+
+int mdt_attr_get_complex(struct mdt_thread_info *info,
+ struct mdt_object *o, struct md_attr *ma)
+{
+ const struct lu_env *env = info->mti_env;
+ struct md_object *next = mdt_object_child(o);
+ struct lu_buf *buf = &info->mti_buf;
+ u32 mode = lu_object_attr(&next->mo_lu);
+ int need = ma->ma_need;
+ int rc = 0, rc2;
+ ENTRY;
+
+ /* do we really need PFID */
+ LASSERT((ma->ma_need & MA_PFID) == 0);
+
+ ma->ma_valid = 0;
+
+ if (need & MA_INODE) {
+ ma->ma_need = MA_INODE;
+ rc = mo_attr_get(env, next, ma);
+ if (rc)
+ GOTO(out, rc);
+ ma->ma_valid |= MA_INODE;
+ }
+
+ if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
+ rc = mdt_attr_get_lov(info, o, ma);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ if (need & MA_LMV && S_ISDIR(mode)) {
+ buf->lb_buf = ma->ma_lmv;
+ buf->lb_len = ma->ma_lmv_size;
+ rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
+ if (rc2 > 0) {
+ ma->ma_lmv_size = rc2;
+ ma->ma_valid |= MA_LMV;
+ } else if (rc2 == -ENODATA) {
+ /* no LMV EA */
+ ma->ma_lmv_size = 0;
+ } else
+ GOTO(out, rc = rc2);
+ }
+
+
+ if (rc == 0 && S_ISREG(mode) && (need & (MA_HSM | MA_SOM))) {
+ struct lustre_mdt_attrs *lma;
+
+ lma = (struct lustre_mdt_attrs *)info->mti_xattr_buf;
+ CLASSERT(sizeof(*lma) <= sizeof(info->mti_xattr_buf));
+
+ buf->lb_buf = lma;
+ buf->lb_len = sizeof(info->mti_xattr_buf);
+ rc = mo_xattr_get(env, next, buf, XATTR_NAME_LMA);
+ if (rc > 0) {
+ lustre_lma_swab(lma);
+ /* Swab and copy LMA */
+ if (need & MA_HSM) {
+ if (lma->lma_compat & LMAC_HSM)
+ ma->ma_hsm.mh_flags =
+ lma->lma_flags & HSM_FLAGS_MASK;
+ else
+ ma->ma_hsm.mh_flags = 0;
+ ma->ma_valid |= MA_HSM;
+ }
+ /* Copy SOM */
+ if (need & MA_SOM && lma->lma_compat & LMAC_SOM) {
+ LASSERT(ma->ma_som != NULL);
+ ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
+ ma->ma_som->msd_size = lma->lma_som_size;
+ ma->ma_som->msd_blocks = lma->lma_som_blocks;
+ ma->ma_som->msd_mountid = lma->lma_som_mountid;
+ ma->ma_valid |= MA_SOM;
+ }
+ rc = 0;
+ } else if (rc == -ENODATA) {
+ rc = 0;
+ }
+ }
+
+#ifdef CONFIG_FS_POSIX_ACL
+ if (need & MA_ACL_DEF && S_ISDIR(mode)) {
+ buf->lb_buf = ma->ma_acl;
+ buf->lb_len = ma->ma_acl_size;
+ rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
+ if (rc2 > 0) {
+ ma->ma_acl_size = rc2;
+ ma->ma_valid |= MA_ACL_DEF;
+ } else if (rc2 == -ENODATA) {
+ /* no ACLs */
+ ma->ma_acl_size = 0;
+ } else
+ GOTO(out, rc = rc2);
+ }
+#endif
+out:
+ ma->ma_need = need;
+ CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+ rc, ma->ma_valid, ma->ma_lmm);
+ RETURN(rc);
+}
static int mdt_getattr_internal(struct mdt_thread_info *info,
struct mdt_object *o, int ma_need)
struct mdt_body *repbody;
struct lu_buf *buffer = &info->mti_buf;
int rc;
+ int is_root;
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
GOTO(out, rc = 0);
}
- buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
- buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
+ buffer->lb_len = reqbody->eadatasize;
+ if (buffer->lb_len > 0)
+ buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
+ else
+ buffer->lb_buf = NULL;
/* If it is dir object and client require MEA, then we got MEA */
if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
if (ma->ma_need & MA_SOM)
ma->ma_som = &info->mti_u.som.data;
- rc = mo_attr_get(env, next, ma);
+ rc = mdt_attr_get_complex(info, o, ma);
if (unlikely(rc)) {
CERROR("getattr error for "DFID": %d\n",
PFID(mdt_object_fid(o)), rc);
RETURN(rc);
}
+ is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid);
+
+ /* the Lustre protocol supposes to return default striping
+ * on the user-visible root if explicitly requested */
+ if ((ma->ma_valid & MA_LOV) == 0 && S_ISDIR(la->la_mode) &&
+ (ma->ma_need & MA_LOV_DEF && is_root) && (ma->ma_need & MA_LOV)) {
+ struct lu_fid rootfid;
+ struct mdt_object *root;
+ struct mdt_device *mdt = info->mti_mdt;
+
+ rc = dt_root_get(env, mdt->mdt_bottom, &rootfid);
+ if (rc)
+ RETURN(rc);
+ root = mdt_object_find(env, mdt, &rootfid);
+ if (IS_ERR(root))
+ RETURN(PTR_ERR(root));
+ rc = mdt_attr_get_lov(info, root, ma);
+ mdt_object_put(info->mti_env, root);
+ if (unlikely(rc)) {
+ CERROR("getattr error for "DFID": %d\n",
+ PFID(mdt_object_fid(o)), rc);
+ RETURN(rc);
+ }
+ }
+
if (likely(ma->ma_valid & MA_INODE))
mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
else
ma->ma_valid = 0;
ma->ma_need = MA_INODE;
- rc = mo_attr_get(info->mti_env,
- mdt_object_child(child), ma);
+ rc = mdt_attr_get_complex(info, child, ma);
if (unlikely(rc != 0))
GOTO(out_child, rc);
if (rc == 0) {
rc = mdt_object_sync(info);
if (rc == 0) {
- struct md_object *next;
const struct lu_fid *fid;
struct lu_attr *la = &info->mti_attr.ma_attr;
- next = mdt_object_child(info->mti_object);
info->mti_attr.ma_need = MA_INODE;
info->mti_attr.ma_valid = 0;
- rc = mo_attr_get(info->mti_env, next,
- &info->mti_attr);
+ rc = mdt_attr_get_complex(info, info->mti_object,
+ &info->mti_attr);
if (rc == 0) {
body = req_capsule_server_get(pill,
&RMF_MDT_BODY);
info->mti_no_need_trans = 0;
info->mti_cross_ref = 0;
info->mti_opdata = 0;
+ info->mti_big_lmm_used = 0;
/* To not check for split by default. */
info->mti_spec.sp_ck_split = 0;
rc = child_lu_dev->ld_ops->ldo_prepare(env,
&m->mdt_md_dev.md_lu_dev,
child_lu_dev);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = m->mdt_child->md_ops->mdo_root_get(env, m->mdt_child,
+ &m->mdt_md_root_fid);
+
out:
/* fini from last known good lu_device */
if (rc)
}
/* context key constructor/destructor: mdt_key_init, mdt_key_fini */
-LU_KEY_INIT_FINI(mdt, struct mdt_thread_info);
+LU_KEY_INIT(mdt, struct mdt_thread_info);
+
+static void mdt_key_fini(const struct lu_context *ctx,
+ struct lu_context_key *key, void* data)
+{
+ struct mdt_thread_info *info = data;
+
+ if (info->mti_big_lmm) {
+ OBD_FREE_LARGE(info->mti_big_lmm, info->mti_big_lmmsize);
+ info->mti_big_lmm = NULL;
+ info->mti_big_lmmsize = 0;
+ }
+ OBD_FREE_PTR(info);
+}
/* context key: mdt_thread_key */
LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD);
struct lprocfs_stats *mdt_stats;
int mdt_sec_level;
struct rename_stats mdt_rename_stats;
+ struct lu_fid mdt_md_root_fid;
};
#define MDT_SERVICE_WATCHDOG_FACTOR (2)
/* Ops object filename */
struct lu_name mti_name;
+ /* per-thread values, can be re-used */
+ void *mti_big_lmm;
+ int mti_big_lmmsize;
+ /* big_lmm buffer was used and must be used in reply */
+ int mti_big_lmm_used;
+ /* should be enough to fit lustre_mdt_attrs */
+ char mti_xattr_buf[128];
};
static inline const struct md_device_operations *
MDT_SOM_ENABLE = 1,
};
+int mdt_attr_get_complex(struct mdt_thread_info *info,
+ struct mdt_object *o, struct md_attr *ma);
int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
int created);
int mdt_object_is_som_enabled(struct mdt_object *mo);
/* MDT_MD buffer may be bigger than packed value, let's shrink all
* buffers before growing it */
- if (info->mti_attr.ma_big_lmm_used) {
+ if (info->mti_big_lmm_used) {
LASSERT(req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER));
md_packed = req_capsule_get_size(pill, &RMF_MDT_MD,
RCL_SERVER);
req_capsule_shrink(pill, &RMF_MDT_MD, 0, RCL_SERVER);
/* free big lmm if md_size is not needed */
if (md_size == 0)
- info->mti_attr.ma_big_lmm_used = 0;
+ info->mti_big_lmm_used = 0;
} else if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
req_capsule_shrink(pill, &RMF_MDT_MD, md_size, RCL_SERVER);
}
*/
/* Grow MD buffer if needed finally */
- if (info->mti_attr.ma_big_lmm_used) {
+ if (info->mti_big_lmm_used) {
void *lmm;
LASSERT(md_size > md_packed);
if (info->mti_mdt->mdt_max_mdsize < info->mti_attr.ma_lmm_size)
info->mti_mdt->mdt_max_mdsize =
info->mti_attr.ma_lmm_size;
- info->mti_attr.ma_big_lmm_used = 0;
+ info->mti_big_lmm_used = 0;
}
RETURN(rc);
}
*/
if (rr->rr_eadatalen == 0 &&
!(info->mti_spec.sp_cr_flags & MDS_OPEN_DELAY_CREATE))
- rr->rr_eadatalen = MIN_MD_SIZE; }
+ rr->rr_eadatalen = MIN_MD_SIZE;
+ }
RETURN(0);
}
rc = mdo_create_data(info->mti_env,
p ? mdt_object_child(p) : NULL,
mdt_object_child(o), spec, ma);
+ if (rc == 0)
+ rc = mdt_attr_get_complex(info, o, ma);
+
if (rc == 0 && ma->ma_valid & MA_LOV)
o->mot_flags |= MOF_LOV_CREATED;
}
tmp_ma->ma_som = &info->mti_u.som.data;
tmp_ma->ma_need = MA_INODE | MA_LOV | MA_SOM;
tmp_ma->ma_valid = 0;
- rc = mo_attr_get(info->mti_env, mdt_object_child(o), tmp_ma);
+ rc = mdt_attr_get_complex(info, o, tmp_ma);
if (rc)
GOTO(error_up, rc);
}
rc = mdt_object_exists(child);
if (rc > 0) {
- struct md_object *next;
mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
- next = mdt_object_child(child);
- rc = mo_attr_get(env, next, ma);
+ rc = mdt_attr_get_complex(info, child, ma);
if (rc == 0)
rc = mdt_finish_open(info, parent, child,
flags, 1, ldlm_rep);
int mdt_open_by_fid(struct mdt_thread_info* info,
struct ldlm_reply *rep)
{
- const struct lu_env *env = info->mti_env;
__u32 flags = info->mti_spec.sp_cr_flags;
struct mdt_reint_record *rr = &info->mti_rr;
struct md_attr *ma = &info->mti_attr;
DISP_LOOKUP_EXECD |
DISP_LOOKUP_POS));
- rc = mo_attr_get(env, mdt_object_child(o), ma);
+ rc = mdt_attr_get_complex(info, o, ma);
if (rc == 0)
rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
} else if (rc == 0) {
if (rc)
GOTO(out, rc);
- rc = mo_attr_get(env, mdt_object_child(o), ma);
+ rc = mdt_attr_get_complex(info, o, ma);
if (rc)
GOTO(out, rc);
goto out;
mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
- rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
+ rc = mdt_attr_get_complex(info, o, ma);
if (rc == 0)
rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
} else if (rc == 0) {
mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
GOTO(out_child, result);
} else {
+
+ /* XXX: we should call this once, see few lines below */
+ if (result == 0)
+ result = mdt_attr_get_complex(info, child, ma);
+
if (result != 0)
GOTO(out_child, result);
}
created = 1;
} else {
/* We have to get attr & lov ea for this object */
- result = mo_attr_get(info->mti_env, mdt_object_child(child),
- ma);
+ result = mdt_attr_get_complex(info, child, ma);
/*
* The object is on remote node, return its FID for remote open.
*/
body = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
mti->mti_attr.ma_need = MA_INODE;
mti->mti_attr.ma_valid = 0;
- rc = mo_attr_get(mti->mti_env, mdt_object_child(child), &mti->mti_attr);
+ rc = mdt_attr_get_complex(mti, child, &mti->mti_attr);
if (rc == -EREMOTE) {
/* object was created on remote server */
req->rq_status = rc;
}
mti->mti_attr.ma_need = MA_INODE;
mti->mti_attr.ma_valid = 0;
- mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr);
+ mdt_attr_get_complex(mti, obj, &mti->mti_attr);
mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr,
mdt_object_fid(obj));
if (mti->mti_ioepoch && (mti->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
rc = mdo_create(info->mti_env, next, lname,
mdt_object_child(child),
&info->mti_spec, ma);
+ if (rc == 0)
+ rc = mdt_attr_get_complex(info, child, ma);
+
if (rc == 0) {
/* Return fid & attr to client. */
if (ma->ma_valid & MA_INODE)
* recovery, just get attr in that case.
*/
if (mdt_object_exists(o) == 1) {
- rc = mo_attr_get(info->mti_env, next, ma);
+ rc = mdt_attr_get_complex(info, o, ma);
} else {
/*
* Here, NO permission check for object_create,
struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
struct mdt_file_data *mfd;
struct mdt_object *mo;
- struct md_object *next;
struct mdt_body *repbody;
int som_au, rc, rc2;
ENTRY;
ma->ma_need = MA_INODE;
ma->ma_valid = 0;
- next = mdt_object_child(mo);
- rc = mo_attr_get(info->mti_env, next, ma);
+ rc = mdt_attr_get_complex(info, mo, ma);
if (rc != 0)
GOTO(out_put, rc);
mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
mdt_object_child(mc), lname, ma);
+ if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
+ rc = mdt_attr_get_complex(info, mc, ma);
if (rc == 0)
mdt_handle_last_unlink(info, mc, ma);