From a1c5adf7f466cce5b9abae46704c126b1f11d6da Mon Sep 17 00:00:00 2001 From: Lai Siyao Date: Wed, 19 May 2021 10:58:19 +0800 Subject: [PATCH] LU-14607 osp: separate buffer for large XATTR Once XATTR is too large to fit into PAGE_SIZE, allocate value in a separate buffer for osp_xattr_entry. Signed-off-by: Lai Siyao Change-Id: Ied090ff73e2e5cdeaf2d91a3670067210f2ab1d7 Reviewed-on: https://review.whamcloud.com/43736 Reviewed-by: Andreas Dilger Tested-by: jenkins Tested-by: Maloo Reviewed-by: John L. Hammond Reviewed-by: Oleg Drokin --- lustre/osp/osp_internal.h | 15 ++--- lustre/osp/osp_object.c | 153 ++++++++++++++++++++++++++++------------------ 2 files changed, 100 insertions(+), 68 deletions(-) diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index b074fa0..0c71b26 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -294,14 +294,15 @@ extern struct kmem_cache *osp_object_kmem; * The left part is for value, binary mode. */ struct osp_xattr_entry { struct list_head oxe_list; - atomic_t oxe_ref; void *oxe_value; - size_t oxe_buflen; - size_t oxe_namelen; - size_t oxe_vallen; - unsigned int oxe_exist:1, - oxe_ready:1; - char oxe_buf[0]; + atomic_t oxe_ref; + unsigned int oxe_buflen; + unsigned int oxe_vallen; + unsigned short oxe_namelen; + unsigned short oxe_exist:1, + oxe_ready:1, + oxe_largebuf:1; + char oxe_name[0]; }; /* this is a top object */ diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 7c52b7e..8e8b1ad 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -39,29 +39,8 @@ * get_attr()/get_xattr() called. For a large system, the LFSCK synchronous * mode scanning is prohibitively inefficient. * - * So the OSP maintains the OSP object attributes cache to cache some - * attributes on the local MDT. The cache is organized against the OSP - * object as follows: - * - * struct osp_xattr_entry { - * struct list_head oxe_list; - * atomic_t oxe_ref; - * void *oxe_value; - * int oxe_buflen; - * int oxe_namelen; - * int oxe_vallen; - * unsigned int oxe_exist:1, - * oxe_ready:1; - * char oxe_buf[0]; - * }; - * - * struct osp_object { - * ... - * struct lu_attr opo_attr; - * struct list_head opo_xattr_list; - * spinlock_t opo_lock; - * ... - * }; + * The OSP maintains the OSP object attributes cache to cache some + * attributes on the local MDT. * * The basic attributes, such as owner/mode/flags, are stored in the * osp_object::opo_attr. The extended attributes will be stored @@ -175,6 +154,73 @@ static void osp_object_assign_fid(const struct lu_env *env, #define OXE_DEFAULT_LEN 16 /** + * Allocate osp_xattr_entry. + * + * If total size exceeds PAGE_SIZE, name and value will allocated in a + * separate buf, otherwise it's allocated inline. + * + * \param[in] name pointer to XATTR name + * \param[in] namelen XATTR name len + * \param[in] vallen XATTR value len + * \retval oxe pointer on success + * \retval NULL on failure + */ +static struct osp_xattr_entry *osp_oac_xattr_alloc(const char *name, + size_t namelen, + size_t vallen) +{ + struct osp_xattr_entry *oxe; + size_t size; + + if (!vallen) + vallen = OXE_DEFAULT_LEN; + size = sizeof(*oxe) + namelen + 1 + vallen; + if (likely(size <= PAGE_SIZE)) { + OBD_ALLOC(oxe, size); + if (unlikely(!oxe)) + return NULL; + oxe->oxe_buflen = size; + oxe->oxe_value = oxe->oxe_name + namelen + 1; + } else { + char *buf; + + OBD_ALLOC_LARGE(buf, vallen); + if (unlikely(!buf)) + return NULL; + + size -= vallen; + OBD_ALLOC(oxe, size); + if (unlikely(!oxe)) { + OBD_FREE(buf, size); + return NULL; + } + oxe->oxe_buflen = vallen; + oxe->oxe_value = buf; + oxe->oxe_largebuf = 1; + } + + INIT_LIST_HEAD(&oxe->oxe_list); + + oxe->oxe_namelen = namelen; + memcpy(oxe->oxe_name, name, namelen); + /* One ref is for the caller, the other is for the entry on the list. */ + atomic_set(&oxe->oxe_ref, 2); + + return oxe; +} + +static void osp_oac_xattr_free(struct osp_xattr_entry *oxe) +{ + LASSERT(list_empty(&oxe->oxe_list)); + if (unlikely(oxe->oxe_largebuf)) { + OBD_FREE_LARGE(oxe->oxe_value, oxe->oxe_buflen); + OBD_FREE(oxe, sizeof(*oxe) + oxe->oxe_namelen + 1); + } else { + OBD_FREE(oxe, oxe->oxe_buflen); + } +} + +/** * Release reference from the OSP object extended attribute entry. * * If it is the last reference, then free the entry. @@ -183,11 +229,8 @@ static void osp_object_assign_fid(const struct lu_env *env, */ static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe) { - if (atomic_dec_and_test(&oxe->oxe_ref)) { - LASSERT(list_empty(&oxe->oxe_list)); - - OBD_FREE_LARGE(oxe, oxe->oxe_buflen); - } + if (atomic_dec_and_test(&oxe->oxe_ref)) + osp_oac_xattr_free(oxe); } /** @@ -212,7 +255,7 @@ osp_oac_xattr_find_locked(struct osp_object *obj, const char *name, list_for_each_entry(oxe, &obj->opo_xattr_list, oxe_list) { if (namelen == oxe->oxe_namelen && - strncmp(name, oxe->oxe_buf, namelen) == 0) + strncmp(name, oxe->oxe_name, namelen) == 0) return oxe; } @@ -272,25 +315,15 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len) struct osp_xattr_entry *oxe; struct osp_xattr_entry *tmp = NULL; size_t namelen = strlen(name); - size_t size = sizeof(*oxe) + namelen + 1 + - (len ? len : OXE_DEFAULT_LEN); oxe = osp_oac_xattr_find(obj, name, false); if (oxe) return oxe; - OBD_ALLOC_LARGE(oxe, size); - if (unlikely(!oxe)) + oxe = osp_oac_xattr_alloc(name, namelen, len); + if (!oxe) return NULL; - INIT_LIST_HEAD(&oxe->oxe_list); - oxe->oxe_buflen = size; - oxe->oxe_namelen = namelen; - memcpy(oxe->oxe_buf, name, namelen); - oxe->oxe_value = oxe->oxe_buf + namelen + 1; - /* One ref is for the caller, the other is for the entry on the list. */ - atomic_set(&oxe->oxe_ref, 2); - spin_lock(&obj->opo_lock); tmp = osp_oac_xattr_find_locked(obj, name, namelen); if (!tmp) @@ -300,13 +333,22 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len) spin_unlock(&obj->opo_lock); if (tmp) { - OBD_FREE_LARGE(oxe, size); + osp_oac_xattr_free(oxe); oxe = tmp; } return oxe; } +/* whether \a oxe is large enough to hold XATTR value */ +static inline bool oxe_can_hold(struct osp_xattr_entry *oxe, size_t len) +{ + if (unlikely(oxe->oxe_largebuf)) + return oxe->oxe_buflen > len; + + return oxe->oxe_buflen - oxe->oxe_namelen - 1 - sizeof(*oxe) > len; +} + /** * Assign the cached OST-object's EA with the given value. * @@ -328,31 +370,22 @@ osp_oac_xattr_assignment(struct osp_object *obj, struct osp_xattr_entry *oxe, struct osp_xattr_entry *new = NULL; struct osp_xattr_entry *old = NULL; int namelen = oxe->oxe_namelen; - size_t size = sizeof(*oxe) + namelen + 1 + buf->lb_len; bool unlink_only = false; - if (oxe->oxe_buflen < size) { - OBD_ALLOC_LARGE(new, size); + if (!oxe_can_hold(oxe, buf->lb_len)) { + new = osp_oac_xattr_alloc(oxe->oxe_name, namelen, buf->lb_len); if (likely(new)) { - INIT_LIST_HEAD(&new->oxe_list); - new->oxe_buflen = size; - new->oxe_namelen = namelen; - memcpy(new->oxe_buf, oxe->oxe_buf, namelen); - new->oxe_value = new->oxe_buf + namelen + 1; - /* One ref is for the caller, - * the other is for the entry on the list. */ - atomic_set(&new->oxe_ref, 2); __osp_oac_xattr_assignment(obj, new, buf); } else { unlink_only = true; CWARN("%s: cannot update cached xattr %.*s of "DFID"\n", - osp_dto2name(obj), namelen, oxe->oxe_buf, + osp_dto2name(obj), namelen, oxe->oxe_name, PFID(lu_object_fid(&obj->opo_obj.do_lu))); } } spin_lock(&obj->opo_lock); - old = osp_oac_xattr_find_locked(obj, oxe->oxe_buf, namelen); + old = osp_oac_xattr_find_locked(obj, oxe->oxe_name, namelen); if (likely(old)) { if (new) { /* Unlink the 'old'. */ @@ -823,7 +856,6 @@ static int osp_xattr_get_interpterer(const struct lu_env *env, spin_lock(&obj->opo_lock); if (rc >= 0) { struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; - size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1; rc = object_update_result_data_get(reply, rbuf, index); if (rc == -ENOENT || rc == -ENODATA || rc == 0) { @@ -832,8 +864,7 @@ static int osp_xattr_get_interpterer(const struct lu_env *env, goto unlock; } - if (unlikely(rc < 0) || - rbuf->lb_len > (oxe->oxe_buflen - len)) { + if (unlikely(rc < 0) || !oxe_can_hold(oxe, rbuf->lb_len)) { oxe->oxe_ready = 0; goto unlock; } @@ -2318,13 +2349,13 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o) dt_object_fini(&obj->opo_obj); lu_object_header_fini(h); list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) { - list_del(&oxe->oxe_list); + list_del_init(&oxe->oxe_list); count = atomic_read(&oxe->oxe_ref); LASSERTF(count == 1, "Still has %d users on the xattr entry %.*s\n", - count-1, (int)oxe->oxe_namelen, oxe->oxe_buf); + count-1, (int)oxe->oxe_namelen, oxe->oxe_name); - OBD_FREE_LARGE(oxe, oxe->oxe_buflen); + osp_oac_xattr_free(oxe); } OBD_FREE_PRE(obj, sizeof(*obj), "slab-freed"); call_rcu(&obj->opo_header.loh_rcu, osp_object_free_rcu); -- 1.8.3.1