* get_attr()/get_xattr() called. For a large system, the LFSCK synchronous
* mode scanning is prohibitively inefficient.
*
- * So the OSP maintains the OSP object attributes cache to cache some
- * attributes on the local MDT. The cache is organized against the OSP
- * object as follows:
- *
- * struct osp_xattr_entry {
- * struct list_head oxe_list;
- * atomic_t oxe_ref;
- * void *oxe_value;
- * int oxe_buflen;
- * int oxe_namelen;
- * int oxe_vallen;
- * unsigned int oxe_exist:1,
- * oxe_ready:1;
- * char oxe_buf[0];
- * };
- *
- * struct osp_object {
- * ...
- * struct lu_attr opo_attr;
- * struct list_head opo_xattr_list;
- * spinlock_t opo_lock;
- * ...
- * };
+ * The OSP maintains the OSP object attributes cache to cache some
+ * attributes on the local MDT.
*
* The basic attributes, such as owner/mode/flags, are stored in the
* osp_object::opo_attr. The extended attributes will be stored
#define OXE_DEFAULT_LEN 16
/**
+ * Allocate osp_xattr_entry.
+ *
+ * If total size exceeds PAGE_SIZE, name and value will allocated in a
+ * separate buf, otherwise it's allocated inline.
+ *
+ * \param[in] name pointer to XATTR name
+ * \param[in] namelen XATTR name len
+ * \param[in] vallen XATTR value len
+ * \retval oxe pointer on success
+ * \retval NULL on failure
+ */
+static struct osp_xattr_entry *osp_oac_xattr_alloc(const char *name,
+ size_t namelen,
+ size_t vallen)
+{
+ struct osp_xattr_entry *oxe;
+ size_t size;
+
+ if (!vallen)
+ vallen = OXE_DEFAULT_LEN;
+ size = sizeof(*oxe) + namelen + 1 + vallen;
+ if (likely(size <= PAGE_SIZE)) {
+ OBD_ALLOC(oxe, size);
+ if (unlikely(!oxe))
+ return NULL;
+ oxe->oxe_buflen = size;
+ oxe->oxe_value = oxe->oxe_name + namelen + 1;
+ } else {
+ char *buf;
+
+ OBD_ALLOC_LARGE(buf, vallen);
+ if (unlikely(!buf))
+ return NULL;
+
+ size -= vallen;
+ OBD_ALLOC(oxe, size);
+ if (unlikely(!oxe)) {
+ OBD_FREE(buf, size);
+ return NULL;
+ }
+ oxe->oxe_buflen = vallen;
+ oxe->oxe_value = buf;
+ oxe->oxe_largebuf = 1;
+ }
+
+ INIT_LIST_HEAD(&oxe->oxe_list);
+
+ oxe->oxe_namelen = namelen;
+ memcpy(oxe->oxe_name, name, namelen);
+ /* One ref is for the caller, the other is for the entry on the list. */
+ atomic_set(&oxe->oxe_ref, 2);
+
+ return oxe;
+}
+
+static void osp_oac_xattr_free(struct osp_xattr_entry *oxe)
+{
+ LASSERT(list_empty(&oxe->oxe_list));
+ if (unlikely(oxe->oxe_largebuf)) {
+ OBD_FREE_LARGE(oxe->oxe_value, oxe->oxe_buflen);
+ OBD_FREE(oxe, sizeof(*oxe) + oxe->oxe_namelen + 1);
+ } else {
+ OBD_FREE(oxe, oxe->oxe_buflen);
+ }
+}
+
+/**
* Release reference from the OSP object extended attribute entry.
*
* If it is the last reference, then free the entry.
*/
static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
{
- if (atomic_dec_and_test(&oxe->oxe_ref)) {
- LASSERT(list_empty(&oxe->oxe_list));
-
- OBD_FREE_LARGE(oxe, oxe->oxe_buflen);
- }
+ if (atomic_dec_and_test(&oxe->oxe_ref))
+ osp_oac_xattr_free(oxe);
}
/**
list_for_each_entry(oxe, &obj->opo_xattr_list, oxe_list) {
if (namelen == oxe->oxe_namelen &&
- strncmp(name, oxe->oxe_buf, namelen) == 0)
+ strncmp(name, oxe->oxe_name, namelen) == 0)
return oxe;
}
struct osp_xattr_entry *oxe;
struct osp_xattr_entry *tmp = NULL;
size_t namelen = strlen(name);
- size_t size = sizeof(*oxe) + namelen + 1 +
- (len ? len : OXE_DEFAULT_LEN);
oxe = osp_oac_xattr_find(obj, name, false);
if (oxe)
return oxe;
- OBD_ALLOC_LARGE(oxe, size);
- if (unlikely(!oxe))
+ oxe = osp_oac_xattr_alloc(name, namelen, len);
+ if (!oxe)
return NULL;
- INIT_LIST_HEAD(&oxe->oxe_list);
- oxe->oxe_buflen = size;
- oxe->oxe_namelen = namelen;
- memcpy(oxe->oxe_buf, name, namelen);
- oxe->oxe_value = oxe->oxe_buf + namelen + 1;
- /* One ref is for the caller, the other is for the entry on the list. */
- atomic_set(&oxe->oxe_ref, 2);
-
spin_lock(&obj->opo_lock);
tmp = osp_oac_xattr_find_locked(obj, name, namelen);
if (!tmp)
spin_unlock(&obj->opo_lock);
if (tmp) {
- OBD_FREE_LARGE(oxe, size);
+ osp_oac_xattr_free(oxe);
oxe = tmp;
}
return oxe;
}
+/* whether \a oxe is large enough to hold XATTR value */
+static inline bool oxe_can_hold(struct osp_xattr_entry *oxe, size_t len)
+{
+ if (unlikely(oxe->oxe_largebuf))
+ return oxe->oxe_buflen > len;
+
+ return oxe->oxe_buflen - oxe->oxe_namelen - 1 - sizeof(*oxe) > len;
+}
+
/**
* Assign the cached OST-object's EA with the given value.
*
struct osp_xattr_entry *new = NULL;
struct osp_xattr_entry *old = NULL;
int namelen = oxe->oxe_namelen;
- size_t size = sizeof(*oxe) + namelen + 1 + buf->lb_len;
bool unlink_only = false;
- if (oxe->oxe_buflen < size) {
- OBD_ALLOC_LARGE(new, size);
+ if (!oxe_can_hold(oxe, buf->lb_len)) {
+ new = osp_oac_xattr_alloc(oxe->oxe_name, namelen, buf->lb_len);
if (likely(new)) {
- INIT_LIST_HEAD(&new->oxe_list);
- new->oxe_buflen = size;
- new->oxe_namelen = namelen;
- memcpy(new->oxe_buf, oxe->oxe_buf, namelen);
- new->oxe_value = new->oxe_buf + namelen + 1;
- /* One ref is for the caller,
- * the other is for the entry on the list. */
- atomic_set(&new->oxe_ref, 2);
__osp_oac_xattr_assignment(obj, new, buf);
} else {
unlink_only = true;
CWARN("%s: cannot update cached xattr %.*s of "DFID"\n",
- osp_dto2name(obj), namelen, oxe->oxe_buf,
+ osp_dto2name(obj), namelen, oxe->oxe_name,
PFID(lu_object_fid(&obj->opo_obj.do_lu)));
}
}
spin_lock(&obj->opo_lock);
- old = osp_oac_xattr_find_locked(obj, oxe->oxe_buf, namelen);
+ old = osp_oac_xattr_find_locked(obj, oxe->oxe_name, namelen);
if (likely(old)) {
if (new) {
/* Unlink the 'old'. */
spin_lock(&obj->opo_lock);
if (rc >= 0) {
struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2;
- size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1;
rc = object_update_result_data_get(reply, rbuf, index);
if (rc == -ENOENT || rc == -ENODATA || rc == 0) {
goto unlock;
}
- if (unlikely(rc < 0) ||
- rbuf->lb_len > (oxe->oxe_buflen - len)) {
+ if (unlikely(rc < 0) || !oxe_can_hold(oxe, rbuf->lb_len)) {
oxe->oxe_ready = 0;
goto unlock;
}
dt_object_fini(&obj->opo_obj);
lu_object_header_fini(h);
list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
- list_del(&oxe->oxe_list);
+ list_del_init(&oxe->oxe_list);
count = atomic_read(&oxe->oxe_ref);
LASSERTF(count == 1,
"Still has %d users on the xattr entry %.*s\n",
- count-1, (int)oxe->oxe_namelen, oxe->oxe_buf);
+ count-1, (int)oxe->oxe_namelen, oxe->oxe_name);
- OBD_FREE_LARGE(oxe, oxe->oxe_buflen);
+ osp_oac_xattr_free(oxe);
}
OBD_FREE_PRE(obj, sizeof(*obj), "slab-freed");
call_rcu(&obj->opo_header.loh_rcu, osp_object_free_rcu);