la->la_flags = attrs_zfs2fs(osa->flags);
la->la_size = osa->size;
+ /* Try to get extra flags from LMA. Right now, only the LMAI_ORPHAN
+ * flag is stored in LMA, and it is set only for orphan directories */
+ if (S_ISDIR(la->la_mode) && dt_object_exists(&obj->oo_dt)) {
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma;
+ struct lu_buf buf;
+
+ lma = (struct lustre_mdt_attrs *)info->oti_buf;
+ buf.lb_buf = lma;
+ buf.lb_len = sizeof(info->oti_buf);
+ rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
+ if (rc > 0) {
+ rc = 0;
+ lma->lma_incompat = le32_to_cpu(lma->lma_incompat);
+ obj->oo_lma_flags =
+ lma_to_lustre_flags(lma->lma_incompat);
+
+ } else if (rc == -ENODATA) {
+ rc = 0;
+ }
+ }
+
if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) {
rc = -sa_lookup(sa_hdl, SA_ZPL_RDEV(o), &osa->rdev, 8);
if (rc)
INIT_LIST_HEAD(&mo->oo_sa_linkage);
INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
init_rwsem(&mo->oo_sem);
- sema_init(&mo->oo_guard, 1);
+ init_rwsem(&mo->oo_guard);
rwlock_init(&mo->oo_attr_lock);
mo->oo_destroy = OSD_DESTROY_NONE;
return l;
{
int rc = -EBUSY;
- down(&obj->oo_guard);
-
LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
+ /* the object is supposed to be exclusively locked by
+ * the caller (osd_object_destroy()), while the transaction
+ * (oh) is per-thread and not shared */
if (likely(list_empty(&obj->oo_unlinked_linkage))) {
list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
rc = 0;
}
- up(&obj->oo_guard);
-
return rc;
}
* Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
* only once and use it consistently thereafter.
*/
- down(&obj->oo_guard);
+ down_write(&obj->oo_guard);
if (obj->oo_destroy == OSD_DESTROY_NONE) {
if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
obj->oo_destroy = OSD_DESTROY_SYNC;
else /* Larger objects are destroyed asynchronously */
obj->oo_destroy = OSD_DESTROY_ASYNC;
}
- up(&obj->oo_guard);
+ up_write(&obj->oo_guard);
}
static int osd_declare_object_destroy(const struct lu_env *env,
uint64_t oid, zapid;
ENTRY;
+ down_write(&obj->oo_guard);
+
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
+
LASSERT(obj->oo_db != NULL);
- LASSERT(dt_object_exists(dt));
- LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
oh = container_of0(th, struct osd_thandle, ot_super);
LASSERT(oh != NULL);
set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
if (rc == 0)
obj->oo_destroyed = 1;
+ up_write(&obj->oo_guard);
RETURN (0);
}
struct osd_object *obj = osd_dt_obj(dt);
uint64_t blocks;
uint32_t blksize;
+ int rc = 0;
+
+ down_read(&obj->oo_guard);
+
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
- LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
LASSERT(obj->oo_db);
read_lock(&obj->oo_attr_lock);
*attr = obj->oo_attr;
+ if (obj->oo_lma_flags & LUSTRE_ORPHAN_FL)
+ attr->la_flags |= LUSTRE_ORPHAN_FL;
read_unlock(&obj->oo_attr_lock);
/* with ZFS_DEBUG zrl_add_debug() called by DB_DNODE_ENTER()
attr->la_blocks = blocks;
attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
- return 0;
+out:
+ up_read(&obj->oo_guard);
+ return rc;
}
/* Simple wrapper on top of qsd API which implement quota transfer for osd
struct osd_thandle *oh;
uint64_t bspace;
uint32_t blksize;
- int rc;
+ int rc = 0;
ENTRY;
- if (!dt_object_exists(dt)) {
- /* XXX: sanity check that object creation is declared */
- RETURN(0);
- }
LASSERT(handle != NULL);
LASSERT(osd_invariant(obj));
oh = container_of0(handle, struct osd_thandle, ot_super);
+ down_read(&obj->oo_guard);
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = 0);
+
LASSERT(obj->oo_sa_hdl != NULL);
LASSERT(oh->ot_tx != NULL);
dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
if (oh->ot_tx->tx_err != 0)
- RETURN(-oh->ot_tx->tx_err);
+ GOTO(out, rc = -oh->ot_tx->tx_err);
sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
bspace = toqb(bspace * blksize);
+ __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs),
+ XATTR_NAME_LMA, oh);
+
if (attr && attr->la_valid & LA_UID) {
/* account for user inode tracking ZAP update */
dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
obj->oo_attr.la_uid, attr->la_uid,
bspace, &info->oti_qi);
if (rc)
- RETURN(rc);
+ GOTO(out, rc);
}
}
if (attr && attr->la_valid & LA_GID) {
obj->oo_attr.la_gid, attr->la_gid,
bspace, &info->oti_qi);
if (rc)
- RETURN(rc);
+ GOTO(out, rc);
}
}
- RETURN(0);
+out:
+ up_read(&obj->oo_guard);
+ RETURN(rc);
}
/*
static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
const struct lu_attr *la, struct thandle *handle)
{
+ struct osd_thread_info *info = osd_oti_get(env);
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
- struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
+ struct osa_attr *osa = &info->oti_osa;
sa_bulk_attr_t *bulk;
__u64 valid = la->la_valid;
int cnt;
int rc = 0;
ENTRY;
+
+ down_read(&obj->oo_guard);
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
+
LASSERT(handle != NULL);
- LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
LASSERT(obj->oo_sa_hdl);
valid &= ~(LA_SIZE | LA_BLOCKS);
if (valid == 0)
- RETURN(0);
+ GOTO(out, rc = 0);
+
+ if (valid & LA_FLAGS) {
+ struct lustre_mdt_attrs *lma;
+ struct lu_buf buf;
+
+ if (la->la_flags & LUSTRE_LMA_FL_MASKS) {
+ CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
+ lma = (struct lustre_mdt_attrs *)&info->oti_buf;
+ buf.lb_buf = lma;
+ buf.lb_len = sizeof(info->oti_buf);
+ rc = osd_xattr_get(env, &obj->oo_dt, &buf,
+ XATTR_NAME_LMA);
+ if (rc > 0) {
+ lma->lma_incompat =
+ le32_to_cpu(lma->lma_incompat);
+ lma->lma_incompat |=
+ lustre_to_lma_flags(la->la_flags);
+ lma->lma_incompat =
+ cpu_to_le32(lma->lma_incompat);
+ buf.lb_buf = lma;
+ buf.lb_len = sizeof(*lma);
+ rc = osd_xattr_set_internal(env, obj, &buf,
+ XATTR_NAME_LMA,
+ LU_XATTR_REPLACE,
+ oh);
+ }
+ if (rc < 0) {
+ CWARN("%s: failed to set LMA flags: rc = %d\n",
+ osd->od_svname, rc);
+ GOTO(out, rc);
+ }
+ }
+ }
OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 10);
if (bulk == NULL)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
/* do both accounting updates outside oo_attr_lock below */
if ((valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 10);
+out:
+ up_read(&obj->oo_guard);
RETURN(rc);
}
/* concurrent create declarations should not see
* the object inconsistent (db, attr, etc).
* in regular cases acquisition should be cheap */
- down(&obj->oo_guard);
+ down_write(&obj->oo_guard);
+
+ if (unlikely(dt_object_exists(dt)))
+ GOTO(out, rc = -EEXIST);
LASSERT(osd_invariant(obj));
- LASSERT(!dt_object_exists(dt));
LASSERT(dof != NULL);
LASSERT(th != NULL);
}
out:
- up(&obj->oo_guard);
+ up_write(&obj->oo_guard);
RETURN(rc);
}
ENTRY;
- if (!dt_object_exists(dt))
- RETURN(-ENOENT);
+ down_read(&obj->oo_guard);
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
LASSERT(osd_invariant(obj));
LASSERT(obj->oo_sa_hdl != NULL);
write_unlock(&obj->oo_attr_lock);
rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
- return rc;
+
+out:
+ up_read(&obj->oo_guard);
+ RETURN(rc);
}
static int osd_declare_object_ref_del(const struct lu_env *env,
ENTRY;
+ down_read(&obj->oo_guard);
+
+ if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+ GOTO(out, rc = -ENOENT);
+
LASSERT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
LASSERT(obj->oo_sa_hdl != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
write_unlock(&obj->oo_attr_lock);
rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
+
+out:
+ up_read(&obj->oo_guard);
RETURN(rc);
}