if (oh->ot_assigned == 0) {
LASSERT(oh->ot_tx);
dmu_tx_abort(oh->ot_tx);
- osd_object_sa_dirty_rele(env, oh);
+ osd_object_sa_dirty_rele(oh);
osd_unlinked_list_emptify(osd, &unlinked, false);
/* there won't be any commit, release reserved quota space now,
* if any */
LASSERT(oh->ot_tx);
txg = oh->ot_tx->tx_txg;
- osd_object_sa_dirty_rele(env, oh);
+ osd_object_sa_dirty_rele(oh);
/* XXX: Once dmu_tx_commit() called, oh/th could have been freed
* by osd_trans_commit_cb already. */
dmu_tx_commit(oh->ot_tx);
INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
INIT_LIST_HEAD(&oh->ot_unlinked_list);
INIT_LIST_HEAD(&oh->ot_sa_list);
+ sema_init(&oh->ot_sa_lock, 1);
memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
th = &oh->ot_super;
th->th_dev = dt;
{
struct osd_thread_info *info = data;
- lu_buf_free(&info->oti_xattr_lbuf);
OBD_FREE_PTR(info);
}
static void osd_key_exit(const struct lu_context *ctx,
struct lu_context_key *key, void *data)
{
+ struct osd_thread_info *info = data;
+
+ memset(info, 0, sizeof(*info));
}
struct lu_context_key osd_key = {
struct lquota_id_info oti_qi;
struct lu_seq_range oti_seq_range;
- struct lu_buf oti_xattr_lbuf;
};
extern struct lu_context_key osd_key;
struct list_head ot_stop_dcb_list;
struct list_head ot_unlinked_list;
struct list_head ot_sa_list;
+ struct semaphore ot_sa_lock;
dmu_tx_t *ot_tx;
struct lquota_trans ot_quota_trans;
__u32 ot_write_commit:1,
uint64_t oo_xattr;
enum osd_destroy_type oo_destroy;
- __u32 oo_destroyed:1,
- oo_late_xattr:1;
+ __u32 oo_destroyed:1;
/* the i_flags in LMA */
__u32 oo_lma_flags;
/* osd_object.c */
extern char *osd_obj_tag;
-void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh);
-void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh);
+void osd_object_sa_dirty_rele(struct osd_thandle *oh);
int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
uint64_t oid, dmu_buf_t **dbp);
struct lu_object *osd_object_alloc(const struct lu_env *env,
uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
/* osd_xattr.c */
-int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
- struct osd_thandle *oh);
int __osd_xattr_load(struct osd_device *osd, uint64_t dnode,
nvlist_t **sa_xattr);
int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
/*
* Add object to list of dirty objects in tx handle.
*/
-void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
+static void
+osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
{
if (!list_empty(&obj->oo_sa_linkage))
return;
+ down(&oh->ot_sa_lock);
write_lock(&obj->oo_attr_lock);
if (likely(list_empty(&obj->oo_sa_linkage)))
list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
write_unlock(&obj->oo_attr_lock);
+ up(&oh->ot_sa_lock);
}
/*
* Release spill block dbuf hold for all dirty SAs.
*/
-void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh)
+void osd_object_sa_dirty_rele(struct osd_thandle *oh)
{
struct osd_object *obj;
+ down(&oh->ot_sa_lock);
while (!list_empty(&oh->ot_sa_list)) {
obj = list_entry(oh->ot_sa_list.next,
struct osd_object, oo_sa_linkage);
+ sa_spill_rele(obj->oo_sa_hdl);
write_lock(&obj->oo_attr_lock);
list_del_init(&obj->oo_sa_linkage);
- if (obj->oo_late_xattr) {
- LASSERT(oh->ot_assigned != 0);
- /* we need oo_guard to protect oo_sa_xattr
- * consistency, but if we just take it, then
- * we break lock ordering. in majority of
- * cases the callers don't try to set EAs
- * very often in concurrent manner, so hopefully
- * we don't need to retake locks too often */
- if (down_read_trylock(&obj->oo_guard)) {
- __osd_sa_xattr_update(env, obj, oh);
- up_read(&obj->oo_guard);
- } else {
- down_read(&obj->oo_guard);
- __osd_sa_xattr_update(env, obj, oh);
- up_read(&obj->oo_guard);
- }
- }
- sa_spill_rele(obj->oo_sa_hdl);
write_unlock(&obj->oo_attr_lock);
}
+ up(&oh->ot_sa_lock);
}
/*
RETURN(0);
}
-int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
- struct osd_thandle *oh)
-{
- struct lu_buf *lb = &osd_oti_get(env)->oti_xattr_lbuf;
- struct osd_device *osd = osd_obj2dev(obj);
- char *dxattr;
- size_t size;
- int rc;
-
- obj->oo_late_xattr = 0;
-
- /* Update the SA for additions, modifications, and removals. */
- rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
- if (rc)
- return rc;
-
- lu_buf_check_and_alloc(lb, size);
- if (lb->lb_buf == NULL) {
- CERROR("%s: can't allocate buffer for xattr update\n",
- osd->od_svname);
- return -ENOMEM;
- }
-
- dxattr = lb->lb_buf;
- rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &size,
- NV_ENCODE_XDR, KM_SLEEP);
- if (rc)
- return rc;
- LASSERT(dxattr == lb->lb_buf);
-
- sa_update(obj->oo_sa_hdl, SA_ZPL_DXATTR(osd), dxattr, size, oh->ot_tx);
-
- return 0;
-}
-
/*
* Set an extended attribute.
* This transaction must have called udmu_xattr_declare_set() first.
*
* No locking is done here.
*/
-int __osd_sa_xattr_schedule_update(const struct lu_env *env,
- struct osd_object *obj,
- struct osd_thandle *oh)
+static int
+__osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
+ struct osd_thandle *oh)
{
+ struct osd_device *osd = osd_obj2dev(obj);
+ char *dxattr;
+ size_t sa_size;
+ int rc;
+
ENTRY;
LASSERT(obj->oo_sa_hdl);
LASSERT(obj->oo_sa_xattr);
- /* schedule batched SA update in osd_object_sa_dirty_rele() */
- obj->oo_late_xattr = 1;
- osd_object_sa_dirty_add(obj, oh);
+ /* Update the SA for additions, modifications, and removals. */
+ rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR);
+ if (rc)
+ return rc;
- RETURN(0);
+ dxattr = osd_zio_buf_alloc(sa_size);
+ if (dxattr == NULL)
+ RETURN(-ENOMEM);
+
+ rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size,
+ NV_ENCODE_XDR, KM_SLEEP);
+ if (rc)
+ GOTO(out_free, rc);
+ rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(osd), dxattr, sa_size, oh);
+out_free:
+ osd_zio_buf_free(dxattr, sa_size);
+ RETURN(rc);
}
int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
const struct lu_buf *buf, const char *name, int fl,
struct osd_thandle *oh)
{
- dmu_buf_impl_t *db;
uchar_t *nv_value;
size_t size;
int nv_size;
DATA_TYPE_BYTE_ARRAY);
if (rc < 0)
return rc;
- rc = __osd_sa_xattr_schedule_update(env, obj, oh);
+ rc = __osd_sa_xattr_update(env, obj, oh);
return rc == 0 ? -EFBIG : rc;
}
} else if (rc == -ENOENT) {
if (rc)
return rc;
- /* batch updates only for just created dnodes where we
- * used to set number of EAs in a single transaction */
- db = (dmu_buf_impl_t *)obj->oo_db;
- if (DB_DNODE(db)->dn_allocated_txg == oh->ot_tx->tx_txg)
- rc = __osd_sa_xattr_schedule_update(env, obj, oh);
- else
- rc = __osd_sa_xattr_update(env, obj, oh);
-
+ rc = __osd_sa_xattr_update(env, obj, oh);
return rc;
}
rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
if (rc == 0)
- rc = __osd_sa_xattr_schedule_update(env, obj, oh);
+ rc = __osd_sa_xattr_update(env, obj, oh);
return rc;
}