Whamcloud - gitweb
LU-8928 osd: convert osd-zfs to reference dnode, not db
[fs/lustre-release.git] / lustre / osd-zfs / osd_object.c
index 1cdb0e1..473211a 100644 (file)
@@ -86,9 +86,9 @@ osd_object_sa_init(struct osd_object *obj, struct osd_device *o)
        int rc;
 
        LASSERT(obj->oo_sa_hdl == NULL);
-       LASSERT(obj->oo_db != NULL);
+       LASSERT(obj->oo_dn != NULL);
 
-       rc = -sa_handle_get(o->od_os, obj->oo_db->db_object, obj,
+       rc = -sa_handle_get(o->od_os, obj->oo_dn->dn_object, obj,
                            SA_HDL_PRIVATE, &obj->oo_sa_hdl);
        if (rc)
                return rc;
@@ -189,9 +189,9 @@ int __osd_object_attr_get(const struct lu_env *env, struct osd_device *o,
        int              rc;
        ENTRY;
 
-       LASSERT(obj->oo_db != NULL);
+       LASSERT(obj->oo_dn != NULL);
 
-       rc = -sa_handle_get(o->od_os, obj->oo_db->db_object, NULL,
+       rc = -sa_handle_get(o->od_os, obj->oo_dn->dn_object, NULL,
                            SA_HDL_PRIVATE, &sa_hdl);
        if (rc)
                RETURN(rc);
@@ -259,29 +259,31 @@ out_sa:
        RETURN(rc);
 }
 
-int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
-                  uint64_t oid, dmu_buf_t **dbp)
+int __osd_obj2dnode(const struct lu_env *env, objset_t *os,
+                  uint64_t oid, dnode_t **dnp)
 {
        dmu_object_info_t *doi = &osd_oti_get(env)->oti_doi;
+       dmu_buf_t *db;
+       dmu_buf_impl_t *dbi;
        int rc;
 
-       rc = -sa_buf_hold(os, oid, osd_obj_tag, dbp);
+       rc = dmu_bonus_hold(os, oid, osd_obj_tag, &db);
        if (rc)
                return rc;
 
-       dmu_object_info_from_db(*dbp, doi);
+       dbi = (dmu_buf_impl_t *)db;
+       DB_DNODE_ENTER(dbi);
+       *dnp = DB_DNODE(dbi);
+
+       LASSERT(*dnp != NULL);
+       dmu_object_info_from_dnode(*dnp, doi);
        if (unlikely (oid != DMU_USERUSED_OBJECT &&
            oid != DMU_GROUPUSED_OBJECT && doi->doi_bonus_type != DMU_OT_SA)) {
-               sa_buf_rele(*dbp, osd_obj_tag);
-               *dbp = NULL;
+               osd_dnode_rele(*dnp);
+               *dnp = NULL;
                return -EINVAL;
        }
 
-       LASSERT(*dbp);
-       LASSERT((*dbp)->db_object == oid);
-       LASSERT((*dbp)->db_offset == -1);
-       LASSERT((*dbp)->db_data != NULL);
-
        return 0;
 }
 
@@ -325,7 +327,7 @@ int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
        int                      rc = 0;
        ENTRY;
 
-       if (obj->oo_db == NULL)
+       if (obj->oo_dn == NULL)
                RETURN(0);
 
        /* object exist */
@@ -413,8 +415,8 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
 
        rc = osd_fid_lookup(env, osd, lu_object_fid(l), &oid);
        if (rc == 0) {
-               LASSERT(obj->oo_db == NULL);
-               rc = __osd_obj2dbuf(env, osd->od_os, oid, &obj->oo_db);
+               LASSERT(obj->oo_dn == NULL);
+               rc = __osd_obj2dnode(env, osd->od_os, oid, &obj->oo_dn);
                /* EEXIST will be returned if object is being deleted in ZFS */
                if (rc == -EEXIST) {
                        rc = 0;
@@ -425,7 +427,7 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
                               osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
                        GOTO(out, rc);
                }
-               LASSERT(obj->oo_db);
+               LASSERT(obj->oo_dn);
                rc = osd_object_init0(env, obj);
                if (rc != 0)
                        GOTO(out, rc);
@@ -538,7 +540,7 @@ static int osd_declare_object_destroy(const struct lu_env *env,
 
        osd_object_set_destroy_type(obj);
        if (obj->oo_destroy == OSD_DESTROY_SYNC)
-               dmu_tx_hold_free(oh->ot_tx, obj->oo_db->db_object,
+               dmu_tx_hold_free(oh->ot_tx, obj->oo_dn->dn_object,
                                 0, DMU_OBJECT_END);
        else
                dmu_tx_hold_zap(oh->ot_tx, osd->od_unlinkedid, TRUE, NULL);
@@ -568,7 +570,7 @@ static int osd_object_destroy(const struct lu_env *env,
        if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
                GOTO(out, rc = -ENOENT);
 
-       LASSERT(obj->oo_db != NULL);
+       LASSERT(obj->oo_dn != NULL);
 
        oh = container_of0(th, struct osd_thandle, ot_super);
        LASSERT(oh != NULL);
@@ -606,7 +608,7 @@ static int osd_object_destroy(const struct lu_env *env,
                       " %d: rc = %d\n", osd->od_svname, PFID(fid),
                       obj->oo_attr.la_gid, rc);
 
-       oid = obj->oo_db->db_object;
+       oid = obj->oo_dn->dn_object;
        if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
                /* this may happen if the destroy wasn't declared
                 * e.g. when the object is created and then destroyed
@@ -647,15 +649,15 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 {
        struct osd_object *obj = osd_obj(l);
 
-       if (obj->oo_db != NULL) {
+       if (obj->oo_dn != NULL) {
                osd_object_sa_fini(obj);
                if (obj->oo_sa_xattr) {
                        nvlist_free(obj->oo_sa_xattr);
                        obj->oo_sa_xattr = NULL;
                }
-               sa_buf_rele(obj->oo_db, osd_obj_tag);
+               osd_dnode_rele(obj->oo_dn);
                list_del(&obj->oo_sa_linkage);
-               obj->oo_db = NULL;
+               obj->oo_dn = NULL;
        }
 }
 
@@ -746,7 +748,7 @@ static int osd_attr_get(const struct lu_env *env,
                GOTO(out, rc = -ENOENT);
 
        LASSERT(osd_invariant(obj));
-       LASSERT(obj->oo_db);
+       LASSERT(obj->oo_dn);
 
        read_lock(&obj->oo_attr_lock);
        *attr = obj->oo_attr;
@@ -873,7 +875,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
             txh = list_next(&oh->ot_tx->tx_holds, txh)) {
                if (txh->txh_dnode == NULL)
                        continue;
-               if (txh->txh_dnode->dn_object != obj->oo_db->db_object)
+               if (txh->txh_dnode->dn_object != obj->oo_dn->dn_object)
                        continue;
                /* this object is part of the transaction already
                 * we don't need to declare bonus again */
@@ -881,7 +883,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
                break;
        }
        if (!found)
-               dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
+               dmu_tx_hold_bonus(oh->ot_tx, obj->oo_dn->dn_object);
        if (oh->ot_tx->tx_err != 0)
                GOTO(out, rc = -oh->ot_tx->tx_err);
 
@@ -1273,19 +1275,59 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
        return rc;
 }
 
+static int osd_find_new_dnode(const struct lu_env *env, dmu_tx_t *tx,
+                             uint64_t oid, dnode_t **dnp)
+{
+       dmu_tx_hold_t *txh;
+       int rc = 0;
+
+       /* take dnode_t from tx to save on dnode#->dnode_t lookup */
+       for (txh = list_tail(&tx->tx_holds); txh;
+            txh = list_prev(&tx->tx_holds, txh)) {
+               dnode_t *dn = txh->txh_dnode;
+               dmu_buf_impl_t *db;
+
+               if (dn == NULL)
+                       continue;
+               if (dn->dn_object != oid)
+                       continue;
+               db = dn->dn_bonus;
+               if (db == NULL) {
+                       rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
+                       if (dn->dn_bonus == NULL)
+                               dbuf_create_bonus(dn);
+                       rw_exit(&dn->dn_struct_rwlock);
+               }
+               db = dn->dn_bonus;
+               LASSERT(db);
+               LASSERT(dn->dn_handle);
+               DB_DNODE_ENTER(db);
+               if (refcount_add(&db->db_holds, osd_obj_tag) == 1) {
+                       refcount_add(&dn->dn_holds, tag);
+                       atomic_inc_32(&dn->dn_dbufs_count);
+               }
+               *dnp = dn;
+               break;
+       }
+
+       if (unlikely(*dnp == NULL))
+               rc = __osd_obj2dnode(env, tx->tx_objset, oid, dnp);
+
+       return rc;
+}
+
 /*
  * The transaction passed to this routine must have
  * dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
  * to a transaction group.
  */
 int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
-                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la)
+                       dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la)
 {
-       uint64_t             oid;
-       int                  rc;
        struct osd_device   *osd = osd_obj2dev(obj);
        const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
        dmu_object_type_t    type = DMU_OT_PLAIN_FILE_CONTENTS;
+       uint64_t oid;
 
        /* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks
         * would get an additional ditto copy */
@@ -1295,14 +1337,12 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
 
        /* Create a new DMU object using the default dnode size. */
        oid = osd_dmu_object_alloc(osd->od_os, type, 0, 0, tx);
-       rc = -sa_buf_hold(osd->od_os, oid, osd_obj_tag, dbp);
-       LASSERTF(rc == 0, "sa_buf_hold %llu failed: %d\n", oid, rc);
 
        LASSERT(la->la_valid & LA_MODE);
        la->la_size = 0;
        la->la_nlink = 1;
 
-       return 0;
+       return osd_find_new_dnode(env, tx, oid, dnp);
 }
 
 /*
@@ -1316,15 +1356,15 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
  * then we might need to re-evaluate the use of this flag and instead do
  * a conversion from the different internal ZAP hash formats being used. */
 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
-                    dmu_buf_t **zap_dbp, dmu_tx_t *tx,
-                    struct lu_attr *la, zap_flags_t flags)
+                    dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la,
+                    zap_flags_t flags)
 {
        uint64_t oid;
-       int      rc;
 
        /* Assert that the transaction has been assigned to a
           transaction group. */
        LASSERT(tx->tx_txg != 0);
+       *dnp = NULL;
 
        oid = osd_zap_create_flags(osd->od_os, 0, flags | ZAP_FLAG_HASH64,
                                   DMU_OT_DIRECTORY_CONTENTS,
@@ -1332,57 +1372,53 @@ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
                                   DN_MAX_INDBLKSHIFT, /* indirect blockshift */
                                   0, tx);
 
-       rc = -sa_buf_hold(osd->od_os, oid, osd_obj_tag, zap_dbp);
-       if (rc)
-               return rc;
-
        la->la_size = 2;
        la->la_nlink = 1;
 
-       return 0;
+       return osd_find_new_dnode(env, tx, oid, dnp);
 }
 
-static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
+                         struct lu_attr *la, struct osd_thandle *oh)
 {
-       dmu_buf_t *db;
-       int        rc;
+       dnode_t *dn;
+       int rc;
 
        /* Index file should be created as regular file in order not to confuse
         * ZPL which could interpret them as directory.
         * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
         * binary keys */
        LASSERT(S_ISREG(la->la_mode));
-       rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la,
+       rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
                              ZAP_FLAG_UINT64_KEY);
        if (rc)
                return ERR_PTR(rc);
-       return db;
+       return dn;
 }
 
-static dmu_buf_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
+                         struct lu_attr *la, struct osd_thandle *oh)
 {
-       dmu_buf_t *db;
-       int        rc;
+       dnode_t *dn;
+       int rc;
 
        LASSERT(S_ISDIR(la->la_mode));
-       rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, 0);
+       rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la, 0);
        if (rc)
                return ERR_PTR(rc);
-       return db;
+       return dn;
 }
 
-static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
+                         struct lu_attr *la, struct osd_thandle *oh)
 {
        const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
-       dmu_buf_t           *db;
-       int                  rc;
        struct osd_device *osd = osd_obj2dev(obj);
+       dnode_t *dn;
+       int rc;
 
        LASSERT(S_ISREG(la->la_mode));
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+       rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
 
@@ -1392,7 +1428,7 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
                 * it will break the assumption in tgt_thread_big_cache where
                 * the array size is PTLRPC_MAX_BRW_PAGES. It will also affect
                 * RDMA due to subpage transfer size */
-               rc = -dmu_object_set_blocksize(osd->od_os, db->db_object,
+               rc = -dmu_object_set_blocksize(osd->od_os, dn->dn_object,
                                               PAGE_SIZE, 0, oh->ot_tx);
                if (unlikely(rc)) {
                        CERROR("%s: can't change blocksize: %d\n",
@@ -1401,41 +1437,41 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
                }
        }
 
-       return db;
+       return dn;
 }
 
-static dmu_buf_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
+                         struct lu_attr *la, struct osd_thandle *oh)
 {
-       dmu_buf_t *db;
-       int        rc;
+       dnode_t *dn;
+       int rc;
 
        LASSERT(S_ISLNK(la->la_mode));
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+       rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
-       return db;
+       return dn;
 }
 
-static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+static dnode_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
+                         struct lu_attr *la, struct osd_thandle *oh)
 {
-       dmu_buf_t *db;
-       int        rc;
+       dnode_t *dn;
+       int rc;
 
        if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
                la->la_valid |= LA_RDEV;
 
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+       rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
-       return db;
+       return dn;
 }
 
-typedef dmu_buf_t *(*osd_obj_type_f)(const struct lu_env *env,
-                                    struct osd_object *obj,
-                                    struct lu_attr *la,
-                                    struct osd_thandle *oh);
+typedef dnode_t *(*osd_obj_type_f)(const struct lu_env *env,
+                                  struct osd_object *obj,
+                                  struct lu_attr *la,
+                                  struct osd_thandle *oh);
 
 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
 {
@@ -1481,7 +1517,7 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
        struct osd_device       *osd = osd_obj2dev(obj);
        char                    *buf = info->oti_str;
        struct osd_thandle      *oh;
-       dmu_buf_t               *db = NULL;
+       dnode_t *dn = NULL;
        uint64_t                 zapid, parent = 0;
        int                      rc;
 
@@ -1505,27 +1541,27 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         * XXX missing: Quote handling.
         */
 
-       LASSERT(obj->oo_db == NULL);
+       LASSERT(obj->oo_dn == NULL);
 
        /* to follow ZFS on-disk format we need
         * to initialize parent dnode properly */
        if (hint != NULL && hint->dah_parent != NULL &&
            !dt_object_remote(hint->dah_parent))
-               parent = osd_dt_obj(hint->dah_parent)->oo_db->db_object;
+               parent = osd_dt_obj(hint->dah_parent)->oo_dn->dn_object;
 
        /* we may fix some attributes, better do not change the source */
        obj->oo_attr = *attr;
        obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | LA_TYPE;
 
-       db = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
-       if (IS_ERR(db)) {
-               rc = PTR_ERR(db);
-               db = NULL;
+       dn = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
+       if (IS_ERR(dn)) {
+               rc = PTR_ERR(dn);
+               dn = NULL;
                GOTO(out, rc);
        }
 
        zde->zde_pad = 0;
-       zde->zde_dnode = db->db_object;
+       zde->zde_dnode = dn->dn_object;
        zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
 
        zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str));
@@ -1535,13 +1571,13 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc);
 
        /* Now add in all of the "SA" attributes */
-       rc = -sa_handle_get(osd->od_os, db->db_object, NULL,
+       rc = -sa_handle_get(osd->od_os, dn->dn_object, NULL,
                            SA_HDL_PRIVATE, &obj->oo_sa_hdl);
        if (rc)
                GOTO(out, rc);
 
        /* configure new osd object */
-       obj->oo_db = db;
+       obj->oo_dn = dn;
        parent = parent != 0 ? parent : zapid;
        rc = __osd_attr_init(env, osd, obj->oo_sa_hdl, oh->ot_tx,
                             &obj->oo_attr, parent);
@@ -1588,10 +1624,10 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                        "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc);
 
 out:
-       if (unlikely(rc && db)) {
-               dmu_object_free(osd->od_os, db->db_object, oh->ot_tx);
-               sa_buf_rele(db, osd_obj_tag);
-               obj->oo_db = NULL;
+       if (unlikely(rc && dn)) {
+               dmu_object_free(osd->od_os, dn->dn_object, oh->ot_tx);
+               osd_dnode_rele(dn);
+               obj->oo_dn = NULL;
        }
        up_write(&obj->oo_guard);
        RETURN(rc);