Whamcloud - gitweb
LU-8159 osd-ldiskfs: cache xattr in osd-ldiskfs 96/20496/3
authorLai Siyao <lai.siyao@intel.com>
Tue, 24 May 2016 01:56:08 +0000 (09:56 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 27 Jun 2016 18:55:45 +0000 (18:55 +0000)
In LU-7660 LOD doesn't cache default striping, so each create will
get it from bottom. LU-7660 has enabled xattr cache in OSP, and
osd-zfs caches xattr already, this patch introduces a similar
mechanism in osd-ldiskfs.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: I3382119acaf9a819ee2be9737d1eae833e9ea2a4
Reviewed-on: http://review.whamcloud.com/20496
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/target/update_trans.c

index 7b549b3..21bbdfc 100644 (file)
@@ -347,6 +347,7 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
                init_rwsem(&mo->oo_sem);
                init_rwsem(&mo->oo_ext_idx_sem);
                spin_lock_init(&mo->oo_guard);
+               INIT_LIST_HEAD(&mo->oo_xattr_list);
                 return l;
         } else {
                 return NULL;
@@ -1061,6 +1062,136 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
        return result;
 }
 
+/* The first part of oxe_buf is xattr name, and is '\0' terminated.
+ * The left part is for value, binary mode. */
+struct osd_xattr_entry {
+       struct list_head        oxe_list;
+       size_t                  oxe_len;
+       size_t                  oxe_namelen;
+       bool                    oxe_exist;
+       struct rcu_head         oxe_rcu;
+       char                    oxe_buf[0];
+};
+
+static struct osd_xattr_entry *osd_oxc_lookup(struct osd_object *obj,
+                                             const char *name,
+                                             size_t namelen)
+{
+       struct osd_xattr_entry *oxe;
+
+       list_for_each_entry(oxe, &obj->oo_xattr_list, oxe_list) {
+               if (namelen == oxe->oxe_namelen &&
+                   strncmp(name, oxe->oxe_buf, namelen) == 0)
+                       return oxe;
+       }
+
+       return NULL;
+}
+
+static int osd_oxc_get(struct osd_object *obj, const char *name,
+                      struct lu_buf *buf)
+{
+       struct osd_xattr_entry *oxe;
+       size_t vallen;
+       ENTRY;
+
+       rcu_read_lock();
+       oxe = osd_oxc_lookup(obj, name, strlen(name));
+       if (oxe == NULL) {
+               rcu_read_unlock();
+               RETURN(-ENOENT);
+       }
+
+       if (!oxe->oxe_exist) {
+               rcu_read_unlock();
+               RETURN(-ENODATA);
+       }
+
+       vallen = oxe->oxe_len - sizeof(*oxe) - oxe->oxe_namelen - 1;
+       LASSERT(vallen > 0);
+
+       if (buf->lb_buf == NULL) {
+               rcu_read_unlock();
+               RETURN(vallen);
+       }
+
+       if (buf->lb_len < vallen) {
+               rcu_read_unlock();
+               RETURN(-ERANGE);
+       }
+
+       memcpy(buf->lb_buf, oxe->oxe_buf + oxe->oxe_namelen + 1, vallen);
+       rcu_read_unlock();
+
+       RETURN(vallen);
+}
+
+static void osd_oxc_free(struct rcu_head *head)
+{
+       struct osd_xattr_entry *oxe;
+
+       oxe = container_of(head, struct osd_xattr_entry, oxe_rcu);
+       OBD_FREE(oxe, oxe->oxe_len);
+}
+
+static inline void __osd_oxc_del(struct osd_object *obj, const char *name)
+{
+       struct osd_xattr_entry *oxe;
+
+       oxe = osd_oxc_lookup(obj, name, strlen(name));
+       if (oxe != NULL) {
+               list_del(&oxe->oxe_list);
+               call_rcu(&oxe->oxe_rcu, osd_oxc_free);
+       }
+}
+
+static void osd_oxc_add(struct osd_object *obj, const char *name,
+                       const char *buf, int buflen)
+{
+       struct osd_xattr_entry *oxe;
+       size_t namelen = strlen(name);
+       size_t len = sizeof(*oxe) + namelen + 1 + buflen;
+
+       OBD_ALLOC(oxe, len);
+       if (oxe == NULL)
+               return;
+
+       INIT_LIST_HEAD(&oxe->oxe_list);
+       oxe->oxe_len = len;
+       oxe->oxe_namelen = namelen;
+       memcpy(oxe->oxe_buf, name, namelen);
+       if (buflen > 0) {
+               LASSERT(buf != NULL);
+               memcpy(oxe->oxe_buf + namelen + 1, buf, buflen);
+               oxe->oxe_exist = true;
+       } else {
+               oxe->oxe_exist = false;
+       }
+
+       /* this should be rarely called, just remove old and add new */
+       spin_lock(&obj->oo_guard);
+       __osd_oxc_del(obj, name);
+       list_add_tail(&oxe->oxe_list, &obj->oo_xattr_list);
+       spin_unlock(&obj->oo_guard);
+}
+
+static void osd_oxc_del(struct osd_object *obj, const char *name)
+{
+       spin_lock(&obj->oo_guard);
+       __osd_oxc_del(obj, name);
+       spin_unlock(&obj->oo_guard);
+}
+
+static void osd_oxc_fini(struct osd_object *obj)
+{
+       struct osd_xattr_entry *oxe, *next;
+
+       list_for_each_entry_safe(oxe, next, &obj->oo_xattr_list, oxe_list) {
+               list_del(&oxe->oxe_list);
+               OBD_FREE(oxe, oxe->oxe_len);
+       }
+}
+
 /*
  * Concurrency: no concurrent access is possible that late in object
  * life-cycle.
@@ -1071,6 +1202,7 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
 
         LINVRNT(osd_invariant(obj));
 
+       osd_oxc_fini(obj);
         dt_object_fini(&obj->oo_dt);
         if (obj->oo_hl_head != NULL)
                 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
@@ -3388,15 +3520,17 @@ static int osd_object_version_get(const struct lu_env *env,
 static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
                         struct lu_buf *buf, const char *name)
 {
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct inode           *inode  = obj->oo_inode;
-        struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_obj_dentry;
+       struct osd_object      *obj    = osd_dt_obj(dt);
+       struct inode           *inode  = obj->oo_inode;
+       struct osd_thread_info *info   = osd_oti_get(env);
+       struct dentry          *dentry = &info->oti_obj_dentry;
+       bool                    cache_xattr = false;
+       int                     rc;
 
-        /* version get is not real XATTR but uses xattr API */
-        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
-                /* for version we are just using xattr API but change inode
-                 * field instead */
+       /* version get is not real XATTR but uses xattr API */
+       if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+               /* for version we are just using xattr API but change inode
+                * field instead */
                if (buf->lb_len == 0)
                        return sizeof(dt_obj_version_t);
 
@@ -3406,7 +3540,7 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
                osd_object_version_get(env, dt, buf->lb_buf);
 
                return sizeof(dt_obj_version_t);
-        }
+       }
 
        if (!dt_object_exists(dt))
                return -ENOENT;
@@ -3415,9 +3549,26 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
        LASSERT(inode->i_op != NULL);
        LASSERT(inode->i_op->getxattr != NULL);
 
-       return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len);
-}
+       if (strcmp(name, XATTR_NAME_LOV) == 0 ||
+           strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
+               cache_xattr = true;
+
+       if (cache_xattr) {
+               rc = osd_oxc_get(obj, name, buf);
+               if (rc != -ENOENT)
+                       return rc;
+       }
+
+       rc = __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len);
+       if (cache_xattr) {
+               if (rc == -ENOENT || rc == -ENODATA)
+                       osd_oxc_add(obj, name, NULL, 0);
+               else if (rc > 0 && buf->lb_buf != NULL)
+                       osd_oxc_add(obj, name, buf->lb_buf, rc);
+       }
 
+       return rc;
+}
 
 static int osd_declare_xattr_set(const struct lu_env *env,
                                  struct dt_object *dt,
@@ -3516,16 +3667,16 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
        int                     rc;
        ENTRY;
 
-        LASSERT(handle != NULL);
+       LASSERT(handle != NULL);
 
-        /* version set is not real XATTR */
-        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
-                /* for version we are just using xattr API but change inode
-                 * field instead */
-                LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
-                osd_object_version_set(env, dt, buf->lb_buf);
-                return sizeof(dt_obj_version_t);
-        }
+       /* version set is not real XATTR */
+       if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+               /* for version we are just using xattr API but change inode
+                * field instead */
+               LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+               osd_object_version_set(env, dt, buf->lb_buf);
+               return sizeof(dt_obj_version_t);
+       }
 
        CDEBUG(D_INODE, DFID" set xattr '%s' with size %zu\n",
               PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
@@ -3561,6 +3712,11 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                               fs_flags);
        osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET);
 
+       if (rc == 0 &&
+           (strcmp(name, XATTR_NAME_LOV) == 0 ||
+            strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0))
+               osd_oxc_add(obj, name, buf->lb_buf, buf->lb_len);
+
        return rc;
 }
 
@@ -3638,6 +3794,12 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
        dentry->d_sb = inode->i_sb;
        rc = inode->i_op->removexattr(dentry, name);
        osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET);
+
+       if (rc == 0 &&
+           (strcmp(name, XATTR_NAME_LOV) == 0 ||
+            strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0))
+               osd_oxc_del(obj, name);
+
        return rc;
 }
 
index 654884d..d7baa6a 100644 (file)
@@ -148,6 +148,8 @@ struct osd_object {
 #ifdef CONFIG_LOCKDEP
         struct lockdep_map      oo_dep_map;
 #endif
+
+       struct list_head        oo_xattr_list;
 };
 
 struct osd_obj_seq {
index 2d7c2cf..7928bae 100644 (file)
@@ -72,7 +72,7 @@ static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt,
        struct sub_thandle      *st;
 
        LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
-       CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d"
+       CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d "
               "batchid "LPU64"\n",
               tmt->tmt_master_sub_dt ?
               tmt->tmt_master_sub_dt->dd_lu_dev.ld_obd->obd_name :