From 3a5db1d89aec0cc3969966f87b94315870f49d2d Mon Sep 17 00:00:00 2001 From: Lai Siyao Date: Tue, 24 May 2016 09:56:08 +0800 Subject: [PATCH] LU-8159 osd-ldiskfs: cache xattr in osd-ldiskfs In LU-7660 LOD doesn't cache default striping, so each create will get it from bottom. LU-7660 has enabled xattr cache in OSP, and osd-zfs caches xattr already, this patch introduces a similar mechanism in osd-ldiskfs. Signed-off-by: Lai Siyao Change-Id: I3382119acaf9a819ee2be9737d1eae833e9ea2a4 Reviewed-on: http://review.whamcloud.com/20496 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: wangdi Reviewed-by: Oleg Drokin --- lustre/osd-ldiskfs/osd_handler.c | 202 ++++++++++++++++++++++++++++++++++---- lustre/osd-ldiskfs/osd_internal.h | 2 + lustre/target/update_trans.c | 2 +- 3 files changed, 185 insertions(+), 21 deletions(-) diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 7b549b3..21bbdfc 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -347,6 +347,7 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, init_rwsem(&mo->oo_sem); init_rwsem(&mo->oo_ext_idx_sem); spin_lock_init(&mo->oo_guard); + INIT_LIST_HEAD(&mo->oo_xattr_list); return l; } else { return NULL; @@ -1061,6 +1062,136 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l, return result; } +/* The first part of oxe_buf is xattr name, and is '\0' terminated. + * The left part is for value, binary mode. */ +struct osd_xattr_entry { + struct list_head oxe_list; + size_t oxe_len; + size_t oxe_namelen; + bool oxe_exist; + struct rcu_head oxe_rcu; + char oxe_buf[0]; +}; + +static struct osd_xattr_entry *osd_oxc_lookup(struct osd_object *obj, + const char *name, + size_t namelen) +{ + struct osd_xattr_entry *oxe; + + list_for_each_entry(oxe, &obj->oo_xattr_list, oxe_list) { + if (namelen == oxe->oxe_namelen && + strncmp(name, oxe->oxe_buf, namelen) == 0) + return oxe; + } + + return NULL; +} + +static int osd_oxc_get(struct osd_object *obj, const char *name, + struct lu_buf *buf) +{ + struct osd_xattr_entry *oxe; + size_t vallen; + ENTRY; + + rcu_read_lock(); + oxe = osd_oxc_lookup(obj, name, strlen(name)); + if (oxe == NULL) { + rcu_read_unlock(); + RETURN(-ENOENT); + } + + if (!oxe->oxe_exist) { + rcu_read_unlock(); + RETURN(-ENODATA); + } + + vallen = oxe->oxe_len - sizeof(*oxe) - oxe->oxe_namelen - 1; + LASSERT(vallen > 0); + + if (buf->lb_buf == NULL) { + rcu_read_unlock(); + RETURN(vallen); + } + + if (buf->lb_len < vallen) { + rcu_read_unlock(); + RETURN(-ERANGE); + } + + memcpy(buf->lb_buf, oxe->oxe_buf + oxe->oxe_namelen + 1, vallen); + rcu_read_unlock(); + + RETURN(vallen); +} + +static void osd_oxc_free(struct rcu_head *head) +{ + struct osd_xattr_entry *oxe; + + oxe = container_of(head, struct osd_xattr_entry, oxe_rcu); + OBD_FREE(oxe, oxe->oxe_len); +} + +static inline void __osd_oxc_del(struct osd_object *obj, const char *name) +{ + struct osd_xattr_entry *oxe; + + oxe = osd_oxc_lookup(obj, name, strlen(name)); + if (oxe != NULL) { + list_del(&oxe->oxe_list); + call_rcu(&oxe->oxe_rcu, osd_oxc_free); + } +} + +static void osd_oxc_add(struct osd_object *obj, const char *name, + const char *buf, int buflen) +{ + struct osd_xattr_entry *oxe; + size_t namelen = strlen(name); + size_t len = sizeof(*oxe) + namelen + 1 + buflen; + + OBD_ALLOC(oxe, len); + if (oxe == NULL) + return; + + INIT_LIST_HEAD(&oxe->oxe_list); + oxe->oxe_len = len; + oxe->oxe_namelen = namelen; + memcpy(oxe->oxe_buf, name, namelen); + if (buflen > 0) { + LASSERT(buf != NULL); + memcpy(oxe->oxe_buf + namelen + 1, buf, buflen); + oxe->oxe_exist = true; + } else { + oxe->oxe_exist = false; + } + + /* this should be rarely called, just remove old and add new */ + spin_lock(&obj->oo_guard); + __osd_oxc_del(obj, name); + list_add_tail(&oxe->oxe_list, &obj->oo_xattr_list); + spin_unlock(&obj->oo_guard); +} + +static void osd_oxc_del(struct osd_object *obj, const char *name) +{ + spin_lock(&obj->oo_guard); + __osd_oxc_del(obj, name); + spin_unlock(&obj->oo_guard); +} + +static void osd_oxc_fini(struct osd_object *obj) +{ + struct osd_xattr_entry *oxe, *next; + + list_for_each_entry_safe(oxe, next, &obj->oo_xattr_list, oxe_list) { + list_del(&oxe->oxe_list); + OBD_FREE(oxe, oxe->oxe_len); + } +} + /* * Concurrency: no concurrent access is possible that late in object * life-cycle. @@ -1071,6 +1202,7 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l) LINVRNT(osd_invariant(obj)); + osd_oxc_fini(obj); dt_object_fini(&obj->oo_dt); if (obj->oo_hl_head != NULL) ldiskfs_htree_lock_head_free(obj->oo_hl_head); @@ -3388,15 +3520,17 @@ static int osd_object_version_get(const struct lu_env *env, static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, const char *name) { - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_obj_dentry; + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *dentry = &info->oti_obj_dentry; + bool cache_xattr = false; + int rc; - /* version get is not real XATTR but uses xattr API */ - if (strcmp(name, XATTR_NAME_VERSION) == 0) { - /* for version we are just using xattr API but change inode - * field instead */ + /* version get is not real XATTR but uses xattr API */ + if (strcmp(name, XATTR_NAME_VERSION) == 0) { + /* for version we are just using xattr API but change inode + * field instead */ if (buf->lb_len == 0) return sizeof(dt_obj_version_t); @@ -3406,7 +3540,7 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, osd_object_version_get(env, dt, buf->lb_buf); return sizeof(dt_obj_version_t); - } + } if (!dt_object_exists(dt)) return -ENOENT; @@ -3415,9 +3549,26 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, LASSERT(inode->i_op != NULL); LASSERT(inode->i_op->getxattr != NULL); - return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len); -} + if (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) + cache_xattr = true; + + if (cache_xattr) { + rc = osd_oxc_get(obj, name, buf); + if (rc != -ENOENT) + return rc; + } + + rc = __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len); + if (cache_xattr) { + if (rc == -ENOENT || rc == -ENODATA) + osd_oxc_add(obj, name, NULL, 0); + else if (rc > 0 && buf->lb_buf != NULL) + osd_oxc_add(obj, name, buf->lb_buf, rc); + } + return rc; +} static int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, @@ -3516,16 +3667,16 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; - LASSERT(handle != NULL); + LASSERT(handle != NULL); - /* version set is not real XATTR */ - if (strcmp(name, XATTR_NAME_VERSION) == 0) { - /* for version we are just using xattr API but change inode - * field instead */ - LASSERT(buf->lb_len == sizeof(dt_obj_version_t)); - osd_object_version_set(env, dt, buf->lb_buf); - return sizeof(dt_obj_version_t); - } + /* version set is not real XATTR */ + if (strcmp(name, XATTR_NAME_VERSION) == 0) { + /* for version we are just using xattr API but change inode + * field instead */ + LASSERT(buf->lb_len == sizeof(dt_obj_version_t)); + osd_object_version_set(env, dt, buf->lb_buf); + return sizeof(dt_obj_version_t); + } CDEBUG(D_INODE, DFID" set xattr '%s' with size %zu\n", PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len); @@ -3561,6 +3712,11 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, fs_flags); osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET); + if (rc == 0 && + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)) + osd_oxc_add(obj, name, buf->lb_buf, buf->lb_len); + return rc; } @@ -3638,6 +3794,12 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, dentry->d_sb = inode->i_sb; rc = inode->i_op->removexattr(dentry, name); osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET); + + if (rc == 0 && + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)) + osd_oxc_del(obj, name); + return rc; } diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index 654884d..d7baa6a 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -148,6 +148,8 @@ struct osd_object { #ifdef CONFIG_LOCKDEP struct lockdep_map oo_dep_map; #endif + + struct list_head oo_xattr_list; }; struct osd_obj_seq { diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index 2d7c2cf..7928bae 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -72,7 +72,7 @@ static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt, struct sub_thandle *st; LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC); - CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d" + CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d " "batchid "LPU64"\n", tmt->tmt_master_sub_dt ? tmt->tmt_master_sub_dt->dd_lu_dev.ld_obd->obd_name : -- 1.8.3.1