int rc;
LASSERT(obj->oo_sa_hdl == NULL);
- LASSERT(obj->oo_db != NULL);
+ LASSERT(obj->oo_dn != NULL);
- rc = -sa_handle_get(o->od_os, obj->oo_db->db_object, obj,
+ rc = -sa_handle_get(o->od_os, obj->oo_dn->dn_object, obj,
SA_HDL_PRIVATE, &obj->oo_sa_hdl);
if (rc)
return rc;
int rc;
ENTRY;
- LASSERT(obj->oo_db != NULL);
+ LASSERT(obj->oo_dn != NULL);
- rc = -sa_handle_get(o->od_os, obj->oo_db->db_object, NULL,
+ rc = -sa_handle_get(o->od_os, obj->oo_dn->dn_object, NULL,
SA_HDL_PRIVATE, &sa_hdl);
if (rc)
RETURN(rc);
RETURN(rc);
}
-int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
- uint64_t oid, dmu_buf_t **dbp)
+int __osd_obj2dnode(const struct lu_env *env, objset_t *os,
+ uint64_t oid, dnode_t **dnp)
{
dmu_object_info_t *doi = &osd_oti_get(env)->oti_doi;
+ dmu_buf_t *db;
+ dmu_buf_impl_t *dbi;
int rc;
- rc = -sa_buf_hold(os, oid, osd_obj_tag, dbp);
+ rc = dmu_bonus_hold(os, oid, osd_obj_tag, &db);
if (rc)
return rc;
- dmu_object_info_from_db(*dbp, doi);
+ dbi = (dmu_buf_impl_t *)db;
+ DB_DNODE_ENTER(dbi);
+ *dnp = DB_DNODE(dbi);
+
+ LASSERT(*dnp != NULL);
+ dmu_object_info_from_dnode(*dnp, doi);
if (unlikely (oid != DMU_USERUSED_OBJECT &&
oid != DMU_GROUPUSED_OBJECT && doi->doi_bonus_type != DMU_OT_SA)) {
- sa_buf_rele(*dbp, osd_obj_tag);
- *dbp = NULL;
+ osd_dnode_rele(*dnp);
+ *dnp = NULL;
return -EINVAL;
}
- LASSERT(*dbp);
- LASSERT((*dbp)->db_object == oid);
- LASSERT((*dbp)->db_offset == -1);
- LASSERT((*dbp)->db_data != NULL);
-
return 0;
}
int rc = 0;
ENTRY;
- if (obj->oo_db == NULL)
+ if (obj->oo_dn == NULL)
RETURN(0);
/* object exist */
rc = osd_fid_lookup(env, osd, lu_object_fid(l), &oid);
if (rc == 0) {
- LASSERT(obj->oo_db == NULL);
- rc = __osd_obj2dbuf(env, osd->od_os, oid, &obj->oo_db);
+ LASSERT(obj->oo_dn == NULL);
+ rc = __osd_obj2dnode(env, osd->od_os, oid, &obj->oo_dn);
/* EEXIST will be returned if object is being deleted in ZFS */
if (rc == -EEXIST) {
rc = 0;
osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
GOTO(out, rc);
}
- LASSERT(obj->oo_db);
+ LASSERT(obj->oo_dn);
rc = osd_object_init0(env, obj);
if (rc != 0)
GOTO(out, rc);
osd_object_set_destroy_type(obj);
if (obj->oo_destroy == OSD_DESTROY_SYNC)
- dmu_tx_hold_free(oh->ot_tx, obj->oo_db->db_object,
+ dmu_tx_hold_free(oh->ot_tx, obj->oo_dn->dn_object,
0, DMU_OBJECT_END);
else
dmu_tx_hold_zap(oh->ot_tx, osd->od_unlinkedid, TRUE, NULL);
if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
GOTO(out, rc = -ENOENT);
- LASSERT(obj->oo_db != NULL);
+ LASSERT(obj->oo_dn != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
LASSERT(oh != NULL);
" %d: rc = %d\n", osd->od_svname, PFID(fid),
obj->oo_attr.la_gid, rc);
- oid = obj->oo_db->db_object;
+ oid = obj->oo_dn->dn_object;
if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
/* this may happen if the destroy wasn't declared
* e.g. when the object is created and then destroyed
{
struct osd_object *obj = osd_obj(l);
- if (obj->oo_db != NULL) {
+ if (obj->oo_dn != NULL) {
osd_object_sa_fini(obj);
if (obj->oo_sa_xattr) {
nvlist_free(obj->oo_sa_xattr);
obj->oo_sa_xattr = NULL;
}
- sa_buf_rele(obj->oo_db, osd_obj_tag);
+ osd_dnode_rele(obj->oo_dn);
list_del(&obj->oo_sa_linkage);
- obj->oo_db = NULL;
+ obj->oo_dn = NULL;
}
}
GOTO(out, rc = -ENOENT);
LASSERT(osd_invariant(obj));
- LASSERT(obj->oo_db);
+ LASSERT(obj->oo_dn);
read_lock(&obj->oo_attr_lock);
*attr = obj->oo_attr;
txh = list_next(&oh->ot_tx->tx_holds, txh)) {
if (txh->txh_dnode == NULL)
continue;
- if (txh->txh_dnode->dn_object != obj->oo_db->db_object)
+ if (txh->txh_dnode->dn_object != obj->oo_dn->dn_object)
continue;
/* this object is part of the transaction already
* we don't need to declare bonus again */
break;
}
if (!found)
- dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
+ dmu_tx_hold_bonus(oh->ot_tx, obj->oo_dn->dn_object);
if (oh->ot_tx->tx_err != 0)
GOTO(out, rc = -oh->ot_tx->tx_err);
return rc;
}
+static int osd_find_new_dnode(const struct lu_env *env, dmu_tx_t *tx,
+ uint64_t oid, dnode_t **dnp)
+{
+ dmu_tx_hold_t *txh;
+ int rc = 0;
+
+ /* take dnode_t from tx to save on dnode#->dnode_t lookup */
+ for (txh = list_tail(&tx->tx_holds); txh;
+ txh = list_prev(&tx->tx_holds, txh)) {
+ dnode_t *dn = txh->txh_dnode;
+ dmu_buf_impl_t *db;
+
+ if (dn == NULL)
+ continue;
+ if (dn->dn_object != oid)
+ continue;
+ db = dn->dn_bonus;
+ if (db == NULL) {
+ rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
+ if (dn->dn_bonus == NULL)
+ dbuf_create_bonus(dn);
+ rw_exit(&dn->dn_struct_rwlock);
+ }
+ db = dn->dn_bonus;
+ LASSERT(db);
+ LASSERT(dn->dn_handle);
+ DB_DNODE_ENTER(db);
+ if (refcount_add(&db->db_holds, osd_obj_tag) == 1) {
+ refcount_add(&dn->dn_holds, tag);
+ atomic_inc_32(&dn->dn_dbufs_count);
+ }
+ *dnp = dn;
+ break;
+ }
+
+ if (unlikely(*dnp == NULL))
+ rc = __osd_obj2dnode(env, tx->tx_objset, oid, dnp);
+
+ return rc;
+}
+
/*
* The transaction passed to this routine must have
* dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
* to a transaction group.
*/
int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
- dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la)
+ dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la)
{
- uint64_t oid;
- int rc;
struct osd_device *osd = osd_obj2dev(obj);
const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
dmu_object_type_t type = DMU_OT_PLAIN_FILE_CONTENTS;
+ uint64_t oid;
/* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks
* would get an additional ditto copy */
/* Create a new DMU object using the default dnode size. */
oid = osd_dmu_object_alloc(osd->od_os, type, 0, 0, tx);
- rc = -sa_buf_hold(osd->od_os, oid, osd_obj_tag, dbp);
- LASSERTF(rc == 0, "sa_buf_hold %llu failed: %d\n", oid, rc);
LASSERT(la->la_valid & LA_MODE);
la->la_size = 0;
la->la_nlink = 1;
- return 0;
+ return osd_find_new_dnode(env, tx, oid, dnp);
}
/*
* then we might need to re-evaluate the use of this flag and instead do
* a conversion from the different internal ZAP hash formats being used. */
int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
- dmu_buf_t **zap_dbp, dmu_tx_t *tx,
- struct lu_attr *la, zap_flags_t flags)
+ dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la,
+ zap_flags_t flags)
{
uint64_t oid;
- int rc;
/* Assert that the transaction has been assigned to a
transaction group. */
LASSERT(tx->tx_txg != 0);
+ *dnp = NULL;
oid = osd_zap_create_flags(osd->od_os, 0, flags | ZAP_FLAG_HASH64,
DMU_OT_DIRECTORY_CONTENTS,
DN_MAX_INDBLKSHIFT, /* indirect blockshift */
0, tx);
- rc = -sa_buf_hold(osd->od_os, oid, osd_obj_tag, zap_dbp);
- if (rc)
- return rc;
-
la->la_size = 2;
la->la_nlink = 1;
- return 0;
+ return osd_find_new_dnode(env, tx, oid, dnp);
}
-static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
- struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
+ struct lu_attr *la, struct osd_thandle *oh)
{
- dmu_buf_t *db;
- int rc;
+ dnode_t *dn;
+ int rc;
/* Index file should be created as regular file in order not to confuse
* ZPL which could interpret them as directory.
* We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
* binary keys */
LASSERT(S_ISREG(la->la_mode));
- rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la,
+ rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
ZAP_FLAG_UINT64_KEY);
if (rc)
return ERR_PTR(rc);
- return db;
+ return dn;
}
-static dmu_buf_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
- struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
+ struct lu_attr *la, struct osd_thandle *oh)
{
- dmu_buf_t *db;
- int rc;
+ dnode_t *dn;
+ int rc;
LASSERT(S_ISDIR(la->la_mode));
- rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, 0);
+ rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la, 0);
if (rc)
return ERR_PTR(rc);
- return db;
+ return dn;
}
-static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
- struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
+ struct lu_attr *la, struct osd_thandle *oh)
{
const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
- dmu_buf_t *db;
- int rc;
struct osd_device *osd = osd_obj2dev(obj);
+ dnode_t *dn;
+ int rc;
LASSERT(S_ISREG(la->la_mode));
- rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+ rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
if (rc)
return ERR_PTR(rc);
* it will break the assumption in tgt_thread_big_cache where
* the array size is PTLRPC_MAX_BRW_PAGES. It will also affect
* RDMA due to subpage transfer size */
- rc = -dmu_object_set_blocksize(osd->od_os, db->db_object,
+ rc = -dmu_object_set_blocksize(osd->od_os, dn->dn_object,
PAGE_SIZE, 0, oh->ot_tx);
if (unlikely(rc)) {
CERROR("%s: can't change blocksize: %d\n",
}
}
- return db;
+ return dn;
}
-static dmu_buf_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
- struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
+ struct lu_attr *la, struct osd_thandle *oh)
{
- dmu_buf_t *db;
- int rc;
+ dnode_t *dn;
+ int rc;
LASSERT(S_ISLNK(la->la_mode));
- rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+ rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
if (rc)
return ERR_PTR(rc);
- return db;
+ return dn;
}
-static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
- struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
+ struct lu_attr *la, struct osd_thandle *oh)
{
- dmu_buf_t *db;
- int rc;
+ dnode_t *dn;
+ int rc;
if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
la->la_valid |= LA_RDEV;
- rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+ rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
if (rc)
return ERR_PTR(rc);
- return db;
+ return dn;
}
-typedef dmu_buf_t *(*osd_obj_type_f)(const struct lu_env *env,
- struct osd_object *obj,
- struct lu_attr *la,
- struct osd_thandle *oh);
+typedef dnode_t *(*osd_obj_type_f)(const struct lu_env *env,
+ struct osd_object *obj,
+ struct lu_attr *la,
+ struct osd_thandle *oh);
static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
{
struct osd_device *osd = osd_obj2dev(obj);
char *buf = info->oti_str;
struct osd_thandle *oh;
- dmu_buf_t *db = NULL;
+ dnode_t *dn = NULL;
uint64_t zapid, parent = 0;
int rc;
* XXX missing: Quote handling.
*/
- LASSERT(obj->oo_db == NULL);
+ LASSERT(obj->oo_dn == NULL);
/* to follow ZFS on-disk format we need
* to initialize parent dnode properly */
if (hint != NULL && hint->dah_parent != NULL &&
!dt_object_remote(hint->dah_parent))
- parent = osd_dt_obj(hint->dah_parent)->oo_db->db_object;
+ parent = osd_dt_obj(hint->dah_parent)->oo_dn->dn_object;
/* we may fix some attributes, better do not change the source */
obj->oo_attr = *attr;
obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | LA_TYPE;
- db = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
- if (IS_ERR(db)) {
- rc = PTR_ERR(db);
- db = NULL;
+ dn = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
+ if (IS_ERR(dn)) {
+ rc = PTR_ERR(dn);
+ dn = NULL;
GOTO(out, rc);
}
zde->zde_pad = 0;
- zde->zde_dnode = db->db_object;
+ zde->zde_dnode = dn->dn_object;
zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str));
GOTO(out, rc);
/* Now add in all of the "SA" attributes */
- rc = -sa_handle_get(osd->od_os, db->db_object, NULL,
+ rc = -sa_handle_get(osd->od_os, dn->dn_object, NULL,
SA_HDL_PRIVATE, &obj->oo_sa_hdl);
if (rc)
GOTO(out, rc);
/* configure new osd object */
- obj->oo_db = db;
+ obj->oo_dn = dn;
parent = parent != 0 ? parent : zapid;
rc = __osd_attr_init(env, osd, obj->oo_sa_hdl, oh->ot_tx,
&obj->oo_attr, parent);
"(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc);
out:
- if (unlikely(rc && db)) {
- dmu_object_free(osd->od_os, db->db_object, oh->ot_tx);
- sa_buf_rele(db, osd_obj_tag);
- obj->oo_db = NULL;
+ if (unlikely(rc && dn)) {
+ dmu_object_free(osd->od_os, dn->dn_object, oh->ot_tx);
+ osd_dnode_rele(dn);
+ obj->oo_dn = NULL;
}
up_write(&obj->oo_guard);
RETURN(rc);