Whamcloud - gitweb
LU-14607 osp: separate buffer for large XATTR
[fs/lustre-release.git] / lustre / osp / osp_object.c
index 7c52b7e..8e8b1ad 100644 (file)
  * get_attr()/get_xattr() called. For a large system, the LFSCK synchronous
  * mode scanning is prohibitively inefficient.
  *
- * So the OSP maintains the OSP object attributes cache to cache some
- * attributes on the local MDT. The cache is organized against the OSP
- * object as follows:
- *
- * struct osp_xattr_entry {
- *     struct list_head         oxe_list;
- *     atomic_t                 oxe_ref;
- *     void                    *oxe_value;
- *     int                      oxe_buflen;
- *     int                      oxe_namelen;
- *     int                      oxe_vallen;
- *     unsigned int             oxe_exist:1,
- *                              oxe_ready:1;
- *     char                     oxe_buf[0];
- * };
- *
- * struct osp_object {
- *     ...
- *     struct lu_attr          opo_attr;
- *     struct list_head        opo_xattr_list;
- *     spinlock_t              opo_lock;
- *     ...
- * };
+ * The OSP maintains the OSP object attributes cache to cache some
+ * attributes on the local MDT.
  *
  * The basic attributes, such as owner/mode/flags, are stored in the
  * osp_object::opo_attr. The extended attributes will be stored
@@ -175,6 +154,73 @@ static void osp_object_assign_fid(const struct lu_env *env,
 #define OXE_DEFAULT_LEN        16
 
 /**
+ * Allocate osp_xattr_entry.
+ *
+ * If total size exceeds PAGE_SIZE, name and value will allocated in a
+ * separate buf, otherwise it's allocated inline.
+ *
+ * \param[in] name     pointer to XATTR name
+ * \param[in] namelen  XATTR name len
+ * \param[in] vallen   XATTR value len
+ * \retval             oxe pointer on success
+ * \retval             NULL on failure
+ */
+static struct osp_xattr_entry *osp_oac_xattr_alloc(const char *name,
+                                                  size_t namelen,
+                                                  size_t vallen)
+{
+       struct osp_xattr_entry *oxe;
+       size_t size;
+
+       if (!vallen)
+               vallen = OXE_DEFAULT_LEN;
+       size = sizeof(*oxe) + namelen + 1 + vallen;
+       if (likely(size <= PAGE_SIZE)) {
+               OBD_ALLOC(oxe, size);
+               if (unlikely(!oxe))
+                       return NULL;
+               oxe->oxe_buflen = size;
+               oxe->oxe_value = oxe->oxe_name + namelen + 1;
+       } else {
+               char *buf;
+
+               OBD_ALLOC_LARGE(buf, vallen);
+               if (unlikely(!buf))
+                       return NULL;
+
+               size -= vallen;
+               OBD_ALLOC(oxe, size);
+               if (unlikely(!oxe)) {
+                       OBD_FREE(buf, size);
+                       return NULL;
+               }
+               oxe->oxe_buflen = vallen;
+               oxe->oxe_value = buf;
+               oxe->oxe_largebuf = 1;
+       }
+
+       INIT_LIST_HEAD(&oxe->oxe_list);
+
+       oxe->oxe_namelen = namelen;
+       memcpy(oxe->oxe_name, name, namelen);
+       /* One ref is for the caller, the other is for the entry on the list. */
+       atomic_set(&oxe->oxe_ref, 2);
+
+       return oxe;
+}
+
+static void osp_oac_xattr_free(struct osp_xattr_entry *oxe)
+{
+       LASSERT(list_empty(&oxe->oxe_list));
+       if (unlikely(oxe->oxe_largebuf)) {
+               OBD_FREE_LARGE(oxe->oxe_value, oxe->oxe_buflen);
+               OBD_FREE(oxe, sizeof(*oxe) + oxe->oxe_namelen + 1);
+       } else {
+               OBD_FREE(oxe, oxe->oxe_buflen);
+       }
+}
+
+/**
  * Release reference from the OSP object extended attribute entry.
  *
  * If it is the last reference, then free the entry.
@@ -183,11 +229,8 @@ static void osp_object_assign_fid(const struct lu_env *env,
  */
 static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
 {
-       if (atomic_dec_and_test(&oxe->oxe_ref)) {
-               LASSERT(list_empty(&oxe->oxe_list));
-
-               OBD_FREE_LARGE(oxe, oxe->oxe_buflen);
-       }
+       if (atomic_dec_and_test(&oxe->oxe_ref))
+               osp_oac_xattr_free(oxe);
 }
 
 /**
@@ -212,7 +255,7 @@ osp_oac_xattr_find_locked(struct osp_object *obj, const char *name,
 
        list_for_each_entry(oxe, &obj->opo_xattr_list, oxe_list) {
                if (namelen == oxe->oxe_namelen &&
-                   strncmp(name, oxe->oxe_buf, namelen) == 0)
+                   strncmp(name, oxe->oxe_name, namelen) == 0)
                        return oxe;
        }
 
@@ -272,25 +315,15 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
        struct osp_xattr_entry *oxe;
        struct osp_xattr_entry *tmp = NULL;
        size_t namelen = strlen(name);
-       size_t size = sizeof(*oxe) + namelen + 1 +
-                     (len ? len : OXE_DEFAULT_LEN);
 
        oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe)
                return oxe;
 
-       OBD_ALLOC_LARGE(oxe, size);
-       if (unlikely(!oxe))
+       oxe = osp_oac_xattr_alloc(name, namelen, len);
+       if (!oxe)
                return NULL;
 
-       INIT_LIST_HEAD(&oxe->oxe_list);
-       oxe->oxe_buflen = size;
-       oxe->oxe_namelen = namelen;
-       memcpy(oxe->oxe_buf, name, namelen);
-       oxe->oxe_value = oxe->oxe_buf + namelen + 1;
-       /* One ref is for the caller, the other is for the entry on the list. */
-       atomic_set(&oxe->oxe_ref, 2);
-
        spin_lock(&obj->opo_lock);
        tmp = osp_oac_xattr_find_locked(obj, name, namelen);
        if (!tmp)
@@ -300,13 +333,22 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
        spin_unlock(&obj->opo_lock);
 
        if (tmp) {
-               OBD_FREE_LARGE(oxe, size);
+               osp_oac_xattr_free(oxe);
                oxe = tmp;
        }
 
        return oxe;
 }
 
+/* whether \a oxe is large enough to hold XATTR value */
+static inline bool oxe_can_hold(struct osp_xattr_entry *oxe, size_t len)
+{
+       if (unlikely(oxe->oxe_largebuf))
+               return oxe->oxe_buflen > len;
+
+       return oxe->oxe_buflen - oxe->oxe_namelen - 1 - sizeof(*oxe) > len;
+}
+
 /**
  * Assign the cached OST-object's EA with the given value.
  *
@@ -328,31 +370,22 @@ osp_oac_xattr_assignment(struct osp_object *obj, struct osp_xattr_entry *oxe,
        struct osp_xattr_entry *new = NULL;
        struct osp_xattr_entry *old = NULL;
        int namelen = oxe->oxe_namelen;
-       size_t size = sizeof(*oxe) + namelen + 1 + buf->lb_len;
        bool unlink_only = false;
 
-       if (oxe->oxe_buflen < size) {
-               OBD_ALLOC_LARGE(new, size);
+       if (!oxe_can_hold(oxe, buf->lb_len)) {
+               new = osp_oac_xattr_alloc(oxe->oxe_name, namelen, buf->lb_len);
                if (likely(new)) {
-                       INIT_LIST_HEAD(&new->oxe_list);
-                       new->oxe_buflen = size;
-                       new->oxe_namelen = namelen;
-                       memcpy(new->oxe_buf, oxe->oxe_buf, namelen);
-                       new->oxe_value = new->oxe_buf + namelen + 1;
-                       /* One ref is for the caller,
-                        * the other is for the entry on the list. */
-                       atomic_set(&new->oxe_ref, 2);
                        __osp_oac_xattr_assignment(obj, new, buf);
                } else {
                        unlink_only = true;
                        CWARN("%s: cannot update cached xattr %.*s of "DFID"\n",
-                             osp_dto2name(obj), namelen, oxe->oxe_buf,
+                             osp_dto2name(obj), namelen, oxe->oxe_name,
                              PFID(lu_object_fid(&obj->opo_obj.do_lu)));
                }
        }
 
        spin_lock(&obj->opo_lock);
-       old = osp_oac_xattr_find_locked(obj, oxe->oxe_buf, namelen);
+       old = osp_oac_xattr_find_locked(obj, oxe->oxe_name, namelen);
        if (likely(old)) {
                if (new) {
                        /* Unlink the 'old'. */
@@ -823,7 +856,6 @@ static int osp_xattr_get_interpterer(const struct lu_env *env,
        spin_lock(&obj->opo_lock);
        if (rc >= 0) {
                struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2;
-               size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1;
 
                rc = object_update_result_data_get(reply, rbuf, index);
                if (rc == -ENOENT || rc == -ENODATA || rc == 0) {
@@ -832,8 +864,7 @@ static int osp_xattr_get_interpterer(const struct lu_env *env,
                        goto unlock;
                }
 
-               if (unlikely(rc < 0) ||
-                   rbuf->lb_len > (oxe->oxe_buflen - len)) {
+               if (unlikely(rc < 0) || !oxe_can_hold(oxe, rbuf->lb_len)) {
                        oxe->oxe_ready = 0;
                        goto unlock;
                }
@@ -2318,13 +2349,13 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o)
        dt_object_fini(&obj->opo_obj);
        lu_object_header_fini(h);
        list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
-               list_del(&oxe->oxe_list);
+               list_del_init(&oxe->oxe_list);
                count = atomic_read(&oxe->oxe_ref);
                LASSERTF(count == 1,
                         "Still has %d users on the xattr entry %.*s\n",
-                        count-1, (int)oxe->oxe_namelen, oxe->oxe_buf);
+                        count-1, (int)oxe->oxe_namelen, oxe->oxe_name);
 
-               OBD_FREE_LARGE(oxe, oxe->oxe_buflen);
+               osp_oac_xattr_free(oxe);
        }
        OBD_FREE_PRE(obj, sizeof(*obj), "slab-freed");
        call_rcu(&obj->opo_header.loh_rcu, osp_object_free_rcu);