* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
la->la_flags = attrs_zfs2fs(osa->flags);
la->la_size = osa->size;
+ /* Try to get extra flags from the LMA. Currently only the LMAI_ORPHAN
+ * flag is stored in the LMA, and only for an orphan directory */
+ if (S_ISDIR(la->la_mode) && dt_object_exists(&obj->oo_dt)) {
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma;
+ struct lu_buf buf;
+
+ lma = (struct lustre_mdt_attrs *)info->oti_buf;
+ buf.lb_buf = lma;
+ buf.lb_len = sizeof(info->oti_buf);
+ rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
+ if (rc > 0) {
+ rc = 0;
+ lma->lma_incompat = le32_to_cpu(lma->lma_incompat);
+ obj->oo_lma_flags =
+ lma_to_lustre_flags(lma->lma_incompat);
+
+ } else if (rc == -ENODATA) {
+ rc = 0;
+ }
+ }
+
if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) {
rc = -sa_lookup(sa_hdl, SA_ZPL_RDEV(o), &osa->rdev, 8);
if (rc)
INIT_LIST_HEAD(&mo->oo_sa_linkage);
INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
init_rwsem(&mo->oo_sem);
- sema_init(&mo->oo_guard, 1);
+ init_rwsem(&mo->oo_guard);
rwlock_init(&mo->oo_attr_lock);
mo->oo_destroy = OSD_DESTROY_NONE;
return l;
if (rc == 0) {
LASSERT(obj->oo_db == NULL);
rc = __osd_obj2dbuf(env, osd->od_os, oid, &obj->oo_db);
+ /* EEXIST will be returned if object is being deleted in ZFS */
+ if (rc == -EEXIST) {
+ rc = 0;
+ GOTO(out, rc);
+ }
if (rc != 0) {
CERROR("%s: lookup "DFID"/"LPX64" failed: rc = %d\n",
osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
{
int rc = -EBUSY;
- down(&obj->oo_guard);
-
LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
+ /* the object is supposed to be exclusively locked by
+ * the caller (osd_object_destroy()), while the transaction
+ * (oh) is per-thread and not shared */
if (likely(list_empty(&obj->oo_unlinked_linkage))) {
list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
rc = 0;
}
- up(&obj->oo_guard);
-
return rc;
}
/* Default to max data size covered by a level-1 indirect block */
static unsigned long osd_sync_destroy_max_size =
1UL << (DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT + SPA_MAXBLOCKSHIFT);
-CFS_MODULE_PARM(osd_sync_destroy_max_size, "ul", ulong, 0444,
- "Maximum object size to use synchronous destroy.");
+module_param(osd_sync_destroy_max_size, ulong, 0444);
+MODULE_PARM_DESC(osd_sync_destroy_max_size, "Maximum object size to use synchronous destroy.");
static inline void
osd_object_set_destroy_type(struct osd_object *obj)
* Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
* only once and use it consistently thereafter.
*/
- down(&obj->oo_guard);
+ down_write(&obj->oo_guard);
if (obj->oo_destroy == OSD_DESTROY_NONE) {
if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
obj->oo_destroy = OSD_DESTROY_SYNC;
else /* Larger objects are destroyed asynchronously */
obj->oo_destroy = OSD_DESTROY_ASYNC;
}
- up(&obj->oo_guard);
+ up_write(&obj->oo_guard);
}
static int osd_declare_object_destroy(const struct lu_env *env,
uint64_t oid, zapid;
ENTRY;
+ down_write(&obj->oo_guard);
+
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
+
LASSERT(obj->oo_db != NULL);
- LASSERT(dt_object_exists(dt));
- LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
oh = container_of0(th, struct osd_thandle, ot_super);
LASSERT(oh != NULL);
obj->oo_attr.la_gid, rc);
oid = obj->oo_db->db_object;
- if (obj->oo_destroy == OSD_DESTROY_SYNC) {
+ if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
+ /* this may happen if the destroy wasn't declared
+ * e.g. when the object is created and then destroyed
+ * in the same transaction - we don't need additional
+ * space for destroy specifically */
+ LASSERT(obj->oo_attr.la_size <= osd_sync_destroy_max_size);
+ rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
+ if (rc)
+ CERROR("%s: failed to free %s "LPU64": rc = %d\n",
+ osd->od_svname, buf, oid, rc);
+ } else if (obj->oo_destroy == OSD_DESTROY_SYNC) {
rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
if (rc)
CERROR("%s: failed to free %s "LPU64": rc = %d\n",
set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
if (rc == 0)
obj->oo_destroyed = 1;
+ up_write(&obj->oo_guard);
RETURN (0);
}
LASSERT(osd_invariant(obj));
- down_read(&obj->oo_sem);
+ down_read_nested(&obj->oo_sem, role);
}
static void osd_object_write_lock(const struct lu_env *env,
LASSERT(osd_invariant(obj));
- down_write(&obj->oo_sem);
+ down_write_nested(&obj->oo_sem, role);
}
static void osd_object_read_unlock(const struct lu_env *env,
struct osd_object *obj = osd_dt_obj(dt);
uint64_t blocks;
uint32_t blksize;
+ int rc = 0;
+
+ down_read(&obj->oo_guard);
+
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
- LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
LASSERT(obj->oo_db);
read_lock(&obj->oo_attr_lock);
*attr = obj->oo_attr;
+ if (obj->oo_lma_flags & LUSTRE_ORPHAN_FL)
+ attr->la_flags |= LUSTRE_ORPHAN_FL;
read_unlock(&obj->oo_attr_lock);
/* with ZFS_DEBUG zrl_add_debug() called by DB_DNODE_ENTER()
attr->la_blocks = blocks;
attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
- return 0;
+out:
+ up_read(&obj->oo_guard);
+ return rc;
}
/* Simple wrapper on top of qsd API which implement quota transfer for osd
struct osd_thandle *oh;
uint64_t bspace;
uint32_t blksize;
- int rc;
+ int rc = 0;
ENTRY;
- if (!dt_object_exists(dt)) {
- /* XXX: sanity check that object creation is declared */
- RETURN(0);
- }
LASSERT(handle != NULL);
LASSERT(osd_invariant(obj));
oh = container_of0(handle, struct osd_thandle, ot_super);
+ down_read(&obj->oo_guard);
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = 0);
+
LASSERT(obj->oo_sa_hdl != NULL);
LASSERT(oh->ot_tx != NULL);
dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
if (oh->ot_tx->tx_err != 0)
- RETURN(-oh->ot_tx->tx_err);
+ GOTO(out, rc = -oh->ot_tx->tx_err);
sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
bspace = toqb(bspace * blksize);
+ __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs),
+ XATTR_NAME_LMA, oh);
+
if (attr && attr->la_valid & LA_UID) {
/* account for user inode tracking ZAP update */
dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
obj->oo_attr.la_uid, attr->la_uid,
bspace, &info->oti_qi);
if (rc)
- RETURN(rc);
+ GOTO(out, rc);
}
}
if (attr && attr->la_valid & LA_GID) {
obj->oo_attr.la_gid, attr->la_gid,
bspace, &info->oti_qi);
if (rc)
- RETURN(rc);
+ GOTO(out, rc);
}
}
- RETURN(0);
+out:
+ up_read(&obj->oo_guard);
+ RETURN(rc);
}
/*
static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
const struct lu_attr *la, struct thandle *handle)
{
+ struct osd_thread_info *info = osd_oti_get(env);
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
- struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
+ struct osa_attr *osa = &info->oti_osa;
sa_bulk_attr_t *bulk;
__u64 valid = la->la_valid;
int cnt;
int rc = 0;
ENTRY;
+
+ down_read(&obj->oo_guard);
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
+
LASSERT(handle != NULL);
- LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
LASSERT(obj->oo_sa_hdl);
valid &= ~(LA_SIZE | LA_BLOCKS);
if (valid == 0)
- RETURN(0);
+ GOTO(out, rc = 0);
+
+ if (valid & LA_FLAGS) {
+ struct lustre_mdt_attrs *lma;
+ struct lu_buf buf;
+
+ if (la->la_flags & LUSTRE_LMA_FL_MASKS) {
+ CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
+ lma = (struct lustre_mdt_attrs *)&info->oti_buf;
+ buf.lb_buf = lma;
+ buf.lb_len = sizeof(info->oti_buf);
+ rc = osd_xattr_get(env, &obj->oo_dt, &buf,
+ XATTR_NAME_LMA);
+ if (rc > 0) {
+ lma->lma_incompat =
+ le32_to_cpu(lma->lma_incompat);
+ lma->lma_incompat |=
+ lustre_to_lma_flags(la->la_flags);
+ lma->lma_incompat =
+ cpu_to_le32(lma->lma_incompat);
+ buf.lb_buf = lma;
+ buf.lb_len = sizeof(*lma);
+ rc = osd_xattr_set_internal(env, obj, &buf,
+ XATTR_NAME_LMA,
+ LU_XATTR_REPLACE,
+ oh);
+ }
+ if (rc < 0) {
+ CWARN("%s: failed to set LMA flags: rc = %d\n",
+ osd->od_svname, rc);
+ RETURN(rc);
+ }
+ }
+ }
OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 10);
if (bulk == NULL)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
/* do both accounting updates outside oo_attr_lock below */
if ((valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 10);
+out:
+ up_read(&obj->oo_guard);
RETURN(rc);
}
/* concurrent create declarations should not see
* the object inconsistent (db, attr, etc).
* in regular cases acquisition should be cheap */
- down(&obj->oo_guard);
+ down_write(&obj->oo_guard);
+
+ if (unlikely(dt_object_exists(dt)))
+ GOTO(out, rc = -EEXIST);
LASSERT(osd_invariant(obj));
- LASSERT(!dt_object_exists(dt));
LASSERT(dof != NULL);
LASSERT(th != NULL);
LASSERT(osd_invariant(obj));
rc = osd_init_lma(env, obj, fid, oh);
- if (rc) {
+ if (rc != 0)
CERROR("%s: can not set LMA on "DFID": rc = %d\n",
osd->od_svname, PFID(fid), rc);
- /* ignore errors during LMA initialization */
- rc = 0;
- }
out:
- up(&obj->oo_guard);
+ up_write(&obj->oo_guard);
RETURN(rc);
}
ENTRY;
+ down_read(&obj->oo_guard);
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
+
LASSERT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
LASSERT(obj->oo_sa_hdl != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
write_unlock(&obj->oo_attr_lock);
rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
- return rc;
+
+out:
+ up_read(&obj->oo_guard);
+ RETURN(rc);
}
static int osd_declare_object_ref_del(const struct lu_env *env,
ENTRY;
+ down_read(&obj->oo_guard);
+
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
+
LASSERT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
LASSERT(obj->oo_sa_hdl != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
write_unlock(&obj->oo_attr_lock);
rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
- return rc;
+
+out:
+ up_read(&obj->oo_guard);
+ RETURN(rc);
}
static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,