Whamcloud - gitweb
LU-7898 osd: remove unnecessary declarations 01/19101/12
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Wed, 23 Mar 2016 18:42:54 +0000 (21:42 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 2 Sep 2016 02:24:14 +0000 (02:24 +0000)
Refactor the code a bit to remove unnecessary declarations
(which are very expensive in ZFS). The patch also introduces
initial preparations to support large dnodes - it tracks
all declared EAs at object creation and tracked number can
be used to request dnode of appropriate size.

With this patch + LU-7918 disk/memory space reserved for a
single-stripe creation goes down from ~33MB to 4.6MB.

Performance improvements from this patch are also significant.
Running mdtest create performance on a test node (ramdisk):

    Threads    0.6.5   0.6.5+patch
        1       9933       14279
        2      12870       20469
        4      16405       26407
        8      19320       28254
       16      15648       26620
       32      14107       26483

Change-Id: I0778ad8d13ba1f7a5fa5ad5d874fbb1bd7203958
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: http://review.whamcloud.com/19101
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_io.c
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_oi.c
lustre/osd-zfs/osd_xattr.c

index 8c1ffbf..5f86160 100644 (file)
@@ -165,7 +165,6 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
 
        LASSERT(lu_object_exists(lo));
        LASSERT(obj->oo_db);
-       LASSERT(osd_object_is_zap(obj->oo_db));
        LASSERT(info);
 
        OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
@@ -425,8 +424,6 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
        int                 rc;
        ENTRY;
 
-       LASSERT(osd_object_is_zap(obj->oo_db));
-
        if (name[0] == '.') {
                if (name[1] == 0) {
                        const struct lu_fid *f = lu_object_fid(&dt->do_lu);
@@ -476,8 +473,10 @@ static int osd_declare_dir_insert(const struct lu_env *env,
        else
                object = obj->oo_db->db_object;
 
-       dmu_tx_hold_bonus(oh->ot_tx, object);
-       dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
+       /* do not specify the key as then DMU is trying to look it up
+        * which is very expensive. usually the layers above lookup
+        * before insertion */
+       dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL);
 
        RETURN(0);
 }
@@ -624,7 +623,6 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        LASSERT(parent->oo_db);
-       LASSERT(osd_object_is_zap(parent->oo_db));
 
        LASSERT(dt_object_exists(dt));
        LASSERT(osd_invariant(parent));
@@ -726,12 +724,15 @@ static int osd_declare_dir_delete(const struct lu_env *env,
 
        if (dt_object_exists(dt)) {
                LASSERT(obj->oo_db);
-               LASSERT(osd_object_is_zap(obj->oo_db));
                dnode = obj->oo_db->db_object;
        } else {
                dnode = DMU_NEW_OBJECT;
        }
-       dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key);
+
+       /* do not specify the key as then DMU is trying to look it up
+        * which is very expensive. usually the layers above lookup
+        * before deletion */
+       dmu_tx_hold_zap(oh->ot_tx, dnode, FALSE, NULL);
 
        RETURN(0);
 }
@@ -748,7 +749,6 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        LASSERT(zap_db);
-       LASSERT(osd_object_is_zap(zap_db));
 
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
@@ -1212,9 +1212,9 @@ static int osd_declare_index_insert(const struct lu_env *env,
 
        dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
 
-       /* It is not clear what API should be used for binary keys, so we pass
-        * a null name which has the side effect of over-reserving space,
-        * accounting for the worst case. See zap_count_write() */
+       /* do not specify the key as then DMU is trying to look it up
+        * which is very expensive. usually the layers above lookup
+        * before insertion */
        dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
 
        RETURN(0);
@@ -1262,7 +1262,11 @@ static int osd_declare_index_delete(const struct lu_env *env,
        LASSERT(obj->oo_db);
 
        oh = container_of0(th, struct osd_thandle, ot_super);
-       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
+
+       /* do not specify the key as then DMU is trying to look it up
+        * which is very expensive. usually the layers above lookup
+        * before deletion */
+       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, FALSE, NULL);
 
        RETURN(0);
 }
index 0706ea1..6fb7a6e 100644 (file)
@@ -347,10 +347,17 @@ struct osd_object {
 
        /* the i_flags in LMA */
        __u32                    oo_lma_flags;
-       /* record size for index file */
-       unsigned char            oo_keysize;
-       unsigned char            oo_recsize;
-       unsigned char            oo_recusize;   /* unit size */
+       union {
+               int             oo_ea_in_bonus; /* EA bytes we expect */
+               struct {
+                       /* record size for index file */
+                       unsigned char            oo_keysize;
+                       unsigned char            oo_recsize;
+                       unsigned char            oo_recusize;   /* unit size */
+               };
+       };
+
+
 };
 
 int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *);
@@ -465,10 +472,12 @@ int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type,
                         void *buf, uint32_t buflen, struct osd_thandle *oh);
 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
                     dmu_buf_t **zap_dbp, dmu_tx_t *tx, struct lu_attr *la,
-                    uint64_t parent, zap_flags_t flags);
+                    zap_flags_t flags);
 int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
-                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la,
-                       uint64_t parent);
+                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la);
+int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
+                   sa_handle_t *sa_hdl, dmu_tx_t *tx,
+                   struct lu_attr *la, uint64_t parent);
 
 /* osd_oi.c */
 int osd_oi_init(const struct lu_env *env, struct osd_device *o);
@@ -476,7 +485,7 @@ void osd_oi_fini(const struct lu_env *env, struct osd_device *o);
 int osd_fid_lookup(const struct lu_env *env,
                   struct osd_device *, const struct lu_fid *, uint64_t *);
 uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
-                           const struct lu_fid *fid, char *buf);
+                           const struct lu_fid *fid, char *buf, int bufsize);
 int osd_options_init(void);
 int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd,
                       __u64 seq);
@@ -525,6 +534,8 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
 int __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
                    const struct lu_buf *buf, const char *name, int fl,
                    struct osd_thandle *oh);
+int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
+                         struct osd_thandle *oh);
 static inline int
 osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj,
                       const struct lu_buf *buf, const char *name, int fl,
index eed35f5..d94bc97 100644 (file)
@@ -168,18 +168,11 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
         * LOHA_EXISTs is supposed to be the last step in the
         * initialization */
 
-       /* declare possible size change. notice we can't check
-        * current size here as another thread can change it */
-
-       if (dt_object_exists(dt)) {
-               LASSERT(obj->oo_db);
+       /* size change (in dnode) will be declared by dmu_tx_hold_write() */
+       if (dt_object_exists(dt))
                oid = obj->oo_db->db_object;
-
-               dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
-       } else {
+       else
                oid = DMU_NEW_OBJECT;
-               dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
-       }
 
        /* XXX: we still miss for append declaration support in ZFS
         *      -1 means append which is used by llog mostly, llog
index bd1fac2..ef912ad 100644 (file)
@@ -504,7 +504,6 @@ static int osd_declare_object_destroy(const struct lu_env *env,
                                      struct dt_object *dt,
                                      struct thandle *th)
 {
-       char                    *buf = osd_oti_get(env)->oti_str;
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
@@ -520,17 +519,14 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        LASSERT(oh->ot_tx != NULL);
 
        /* declare that we'll remove object from fid-dnode mapping */
-       zapid = osd_get_name_n_idx(env, osd, fid, buf);
-       dmu_tx_hold_bonus(oh->ot_tx, zapid);
-       dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, buf);
+       zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0);
+       dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, NULL);
 
        osd_declare_xattrs_destroy(env, obj, oh);
 
        /* declare that we'll remove object from inode accounting ZAPs */
-       dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, buf);
-       dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, buf);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
 
        /* one less inode */
        rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
@@ -557,7 +553,8 @@ static int osd_declare_object_destroy(const struct lu_env *env,
 static int osd_object_destroy(const struct lu_env *env,
                              struct dt_object *dt, struct thandle *th)
 {
-       char                    *buf = osd_oti_get(env)->oti_str;
+       struct osd_thread_info  *info = osd_oti_get(env);
+       char                    *buf = info->oti_str;
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
@@ -578,7 +575,7 @@ static int osd_object_destroy(const struct lu_env *env,
        LASSERT(oh->ot_tx != NULL);
 
        /* remove obj ref from index dir (it depends) */
-       zapid = osd_get_name_n_idx(env, osd, fid, buf);
+       zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str));
        rc = -zap_remove(osd->od_os, zapid, buf, oh->ot_tx);
        if (rc) {
                CERROR("%s: zap_remove(%s) failed: rc = %d\n",
@@ -846,13 +843,14 @@ static int osd_declare_attr_set(const struct lu_env *env,
                                struct thandle *handle)
 {
        struct osd_thread_info  *info = osd_oti_get(env);
-       char                    *buf = osd_oti_get(env)->oti_str;
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
+       dmu_tx_hold_t           *txh;
        struct osd_thandle      *oh;
        uint64_t                 bspace;
        uint32_t                 blksize;
        int                      rc = 0;
+       bool                     found;
        ENTRY;
 
 
@@ -867,20 +865,39 @@ static int osd_declare_attr_set(const struct lu_env *env,
 
        LASSERT(obj->oo_sa_hdl != NULL);
        LASSERT(oh->ot_tx != NULL);
-       dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
+       /* regular attributes are part of the bonus buffer */
+       /* let's check whether this object is already part of
+        * transaction.. */
+       found = false;
+       for (txh = list_head(&oh->ot_tx->tx_holds); txh;
+            txh = list_next(&oh->ot_tx->tx_holds, txh)) {
+               if (txh->txh_dnode == NULL)
+                       continue;
+               if (txh->txh_dnode->dn_object != obj->oo_db->db_object)
+                       continue;
+               /* this object is part of the transaction already
+                * we don't need to declare bonus again */
+               found = true;
+               break;
+       }
+       if (!found)
+               dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
        if (oh->ot_tx->tx_err != 0)
                GOTO(out, rc = -oh->ot_tx->tx_err);
 
-       sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
-       bspace = toqb(bspace * blksize);
+       if (attr && attr->la_valid & LA_FLAGS) {
+               /* LMA is usually a part of bonus, no need to declare
+                * anything else */
+       }
 
-       __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs),
-                               XATTR_NAME_LMA, oh);
+       if (attr && (attr->la_valid & (LA_UID | LA_GID))) {
+               sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
+               bspace = toqb(bspace * blksize);
+       }
 
        if (attr && attr->la_valid & LA_UID) {
                /* account for user inode tracking ZAP update */
-               dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
-               dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
+               dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
 
                /* quota enforcement for user */
                if (attr->la_uid != obj->oo_attr.la_uid) {
@@ -894,8 +911,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
        }
        if (attr && attr->la_valid & LA_GID) {
                /* account for user inode tracking ZAP update */
-               dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
-               dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
+               dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
 
                /* quota enforcement for group */
                if (attr->la_gid != obj->oo_attr.la_gid) {
@@ -1121,13 +1137,12 @@ static int osd_declare_object_create(const struct lu_env *env,
                                     struct dt_object_format *dof,
                                     struct thandle *handle)
 {
-       char                    *buf = osd_oti_get(env)->oti_str;
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
        struct osd_thandle      *oh;
        uint64_t                 zapid;
-       int                      rc;
+       int                      rc, dnode_size;
        ENTRY;
 
        LASSERT(dof);
@@ -1147,18 +1162,26 @@ static int osd_declare_object_create(const struct lu_env *env,
        oh = container_of0(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_tx != NULL);
 
+       /* this is the minimum set of EAs on every Lustre object */
+       obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE +
+                               sizeof(__u64) + /* VBR VERSION */
+                               sizeof(struct lustre_mdt_attrs); /* LMA */
+       /* reserve 32 bytes for extra stuff like ACLs */
+       dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32);
+
        switch (dof->dof_type) {
                case DFT_DIR:
                        dt->do_index_ops = &osd_dir_ops;
                case DFT_INDEX:
                        /* for zap create */
-                       dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL);
+                       dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, FALSE, NULL);
+                       dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
                        break;
                case DFT_REGULAR:
                case DFT_SYM:
                case DFT_NODE:
                        /* first, we'll create new object */
-                       dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
+                       dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
                        break;
 
                default:
@@ -1167,20 +1190,12 @@ static int osd_declare_object_create(const struct lu_env *env,
        }
 
        /* and we'll add it to some mapping */
-       zapid = osd_get_name_n_idx(env, osd, fid, buf);
-       dmu_tx_hold_bonus(oh->ot_tx, zapid);
-       dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, buf);
+       zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0);
+       dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, NULL);
 
        /* we will also update inode accounting ZAPs */
-       dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
-       dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
-
-       dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
-
-       __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs),
-                               XATTR_NAME_LMA, oh);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
 
        rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh,
                               false, NULL, false);
@@ -1188,10 +1203,9 @@ static int osd_declare_object_create(const struct lu_env *env,
 }
 
 int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
-                   uint64_t oid, dmu_tx_t *tx, struct lu_attr *la,
-                   uint64_t parent)
+                   sa_handle_t *sa_hdl, dmu_tx_t *tx,
+                   struct lu_attr *la, uint64_t parent)
 {
-       sa_handle_t     *sa_hdl;
        sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
        struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
        uint64_t         gen;
@@ -1200,9 +1214,10 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
        int              cnt;
        int              rc;
 
-       gethrestime(&now);
-       gen = dmu_tx_get_txg(tx);
+       LASSERT(sa_hdl);
 
+       gen = dmu_tx_get_txg(tx);
+       gethrestime(&now);
        ZFS_TIME_ENCODE(&now, crtime);
 
        osa->atime[0] = la->la_atime;
@@ -1216,11 +1231,6 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
        osa->flags = attrs_fs2zfs(la->la_flags);
        osa->size  = la->la_size;
 
-       /* Now add in all of the "SA" attributes */
-       rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
-       if (rc)
-               return rc;
-
        /*
         * we need to create all SA below upon object create.
         *
@@ -1249,7 +1259,6 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
 
        rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx);
 
-       sa_handle_destroy(sa_hdl);
        return rc;
 }
 
@@ -1259,8 +1268,7 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
  * to a transaction group.
  */
 int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
-                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la,
-                       uint64_t parent)
+                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la)
 {
        uint64_t             oid;
        int                  rc;
@@ -1268,10 +1276,6 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
        const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
        dmu_object_type_t    type = DMU_OT_PLAIN_FILE_CONTENTS;
 
-       /* Assert that the transaction has been assigned to a
-          transaction group. */
-       LASSERT(tx->tx_txg != 0);
-
        /* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks
         * would get an additional ditto copy */
        if (unlikely(S_ISREG(la->la_mode) &&
@@ -1287,14 +1291,6 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
        la->la_size = 0;
        la->la_nlink = 1;
 
-       rc = __osd_attr_init(env, osd, oid, tx, la, parent);
-       if (rc != 0) {
-               sa_buf_rele(*dbp, osd_obj_tag);
-               *dbp = NULL;
-               dmu_object_free(osd->od_os, oid, tx);
-               return rc;
-       }
-
        return 0;
 }
 
@@ -1310,7 +1306,7 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
  * a conversion from the different internal ZAP hash formats being used. */
 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
                     dmu_buf_t **zap_dbp, dmu_tx_t *tx,
-                    struct lu_attr *la, uint64_t parent, zap_flags_t flags)
+                    struct lu_attr *la, zap_flags_t flags)
 {
        uint64_t oid;
        int      rc;
@@ -1329,16 +1325,14 @@ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
        if (rc)
                return rc;
 
-       LASSERT(la->la_valid & LA_MODE);
        la->la_size = 2;
        la->la_nlink = 1;
 
-       return __osd_attr_init(env, osd, oid, tx, la, parent);
+       return 0;
 }
 
 static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, uint64_t parent,
-                           struct osd_thandle *oh)
+                           struct lu_attr *la, struct osd_thandle *oh)
 {
        dmu_buf_t *db;
        int        rc;
@@ -1348,7 +1342,7 @@ static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
         * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
         * binary keys */
        LASSERT(S_ISREG(la->la_mode));
-       rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, parent,
+       rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la,
                              ZAP_FLAG_UINT64_KEY);
        if (rc)
                return ERR_PTR(rc);
@@ -1356,23 +1350,20 @@ static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
 }
 
 static dmu_buf_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, uint64_t parent,
-                           struct osd_thandle *oh)
+                           struct lu_attr *la, struct osd_thandle *oh)
 {
        dmu_buf_t *db;
        int        rc;
 
        LASSERT(S_ISDIR(la->la_mode));
-       rc = __osd_zap_create(env, osd_obj2dev(obj), &db,
-                             oh->ot_tx, la, parent, 0);
+       rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, 0);
        if (rc)
                return ERR_PTR(rc);
        return db;
 }
 
 static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, uint64_t parent,
-                           struct osd_thandle *oh)
+                           struct lu_attr *la, struct osd_thandle *oh)
 {
        const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
        dmu_buf_t           *db;
@@ -1380,7 +1371,7 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
        struct osd_device *osd = osd_obj2dev(obj);
 
        LASSERT(S_ISREG(la->la_mode));
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent);
+       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
 
@@ -1402,31 +1393,28 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
 }
 
 static dmu_buf_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, uint64_t parent,
-                           struct osd_thandle *oh)
+                           struct lu_attr *la, struct osd_thandle *oh)
 {
        dmu_buf_t *db;
        int        rc;
 
        LASSERT(S_ISLNK(la->la_mode));
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent);
+       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
        return db;
 }
 
 static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, uint64_t parent,
-                           struct osd_thandle *oh)
+                           struct lu_attr *la, struct osd_thandle *oh)
 {
        dmu_buf_t *db;
        int        rc;
 
-       la->la_valid = LA_MODE;
        if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
                la->la_valid |= LA_RDEV;
 
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent);
+       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
        return db;
@@ -1435,7 +1423,6 @@ static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
 typedef dmu_buf_t *(*osd_obj_type_f)(const struct lu_env *env,
                                     struct osd_object *obj,
                                     struct lu_attr *la,
-                                    uint64_t parent,
                                     struct osd_thandle *oh);
 
 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
@@ -1466,28 +1453,6 @@ static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
 }
 
 /*
- * Primitives for directory (i.e. ZAP) handling
- */
-static inline int osd_init_lma(const struct lu_env *env, struct osd_object *obj,
-                              const struct lu_fid *fid, struct osd_thandle *oh)
-{
-       struct osd_thread_info  *info = osd_oti_get(env);
-       struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
-       struct lu_buf            buf;
-       int rc;
-
-       lustre_lma_init(lma, fid, 0, 0);
-       lustre_lma_swab(lma);
-       buf.lb_buf = lma;
-       buf.lb_len = sizeof(*lma);
-
-       rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
-                                   LU_XATTR_CREATE, oh);
-
-       return rc;
-}
-
-/*
  * Concurrency: @dt is write locked.
  */
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
@@ -1496,14 +1461,16 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                             struct dt_object_format *dof,
                             struct thandle *th)
 {
-       struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
+       struct zpl_direntry     *zde = &info->oti_zde.lzd_reg;
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
-       char                    *buf = osd_oti_get(env)->oti_str;
+       char                    *buf = info->oti_str;
        struct osd_thandle      *oh;
-       dmu_buf_t               *db;
-       uint64_t                 zapid;
+       dmu_buf_t               *db = NULL;
+       uint64_t                 zapid, parent = 0;
        int                      rc;
 
        ENTRY;
@@ -1530,25 +1497,68 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 
        /* to follow ZFS on-disk format we need
         * to initialize parent dnode properly */
-       zapid = 0;
        if (hint != NULL && hint->dah_parent != NULL &&
            !dt_object_remote(hint->dah_parent))
-               zapid = osd_dt_obj(hint->dah_parent)->oo_db->db_object;
+               parent = osd_dt_obj(hint->dah_parent)->oo_db->db_object;
 
-       db = osd_create_type_f(dof->dof_type)(env, obj, attr, zapid, oh);
-       if (IS_ERR(db))
-               GOTO(out, rc = PTR_ERR(db));
+       /* we may fix some attributes, better do not change the source */
+       obj->oo_attr = *attr;
+       obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | LA_TYPE;
+
+       db = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
+       if (IS_ERR(db)) {
+               rc = PTR_ERR(db);
+               db = NULL;
+               GOTO(out, rc);
+       }
 
        zde->zde_pad = 0;
        zde->zde_dnode = db->db_object;
        zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
 
-       zapid = osd_get_name_n_idx(env, osd, fid, buf);
+       zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str));
 
        rc = -zap_add(osd->od_os, zapid, buf, 8, 1, zde, oh->ot_tx);
        if (rc)
                GOTO(out, rc);
 
+       /* Now add in all of the "SA" attributes */
+       rc = -sa_handle_get(osd->od_os, db->db_object, NULL,
+                           SA_HDL_PRIVATE, &obj->oo_sa_hdl);
+       if (rc)
+               GOTO(out, rc);
+
+       /* configure new osd object */
+       obj->oo_db = db;
+       parent = parent != 0 ? parent : zapid;
+       rc = __osd_attr_init(env, osd, obj->oo_sa_hdl, oh->ot_tx,
+                            &obj->oo_attr, parent);
+       if (rc)
+               GOTO(out, rc);
+
+       /* XXX: oo_lma_flags */
+       obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
+       smp_mb();
+       obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
+       if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu))))
+               /* no body operations for accounting objects */
+               obj->oo_dt.do_body_ops = &osd_body_ops;
+
+       rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
+       if (rc)
+               GOTO(out, rc);
+
+       /* initialize LMA */
+       lustre_lma_init(lma, lu_object_fid(&obj->oo_dt.do_lu), 0, 0);
+       lustre_lma_swab(lma);
+       rc = -nvlist_add_byte_array(obj->oo_sa_xattr, XATTR_NAME_LMA,
+                                   (uchar_t *)lma, sizeof(*lma));
+       if (rc)
+               GOTO(out, rc);
+       rc = __osd_sa_xattr_update(env, obj, oh);
+       if (rc)
+               GOTO(out, rc);
+
        /* Add new object to inode accounting.
         * Errors are not considered as fatal */
        rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid,
@@ -1564,18 +1574,12 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                CERROR("%s: failed to add "DFID" to accounting ZAP for grp %d "
                        "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc);
 
-       /* configure new osd object */
-       obj->oo_db = db;
-       rc = osd_object_init0(env, obj);
-       LASSERT(ergo(rc == 0, dt_object_exists(dt)));
-       LASSERT(osd_invariant(obj));
-
-       rc = osd_init_lma(env, obj, fid, oh);
-       if (rc != 0)
-               CERROR("%s: can not set LMA on "DFID": rc = %d\n",
-                      osd->od_svname, PFID(fid), rc);
-
 out:
+       if (unlikely(rc && db)) {
+               dmu_object_free(osd->od_os, db->db_object, oh->ot_tx);
+               sa_buf_rele(db, osd_obj_tag);
+               obj->oo_db = NULL;
+       }
        up_write(&obj->oo_guard);
        RETURN(rc);
 }
index 560c90f..6d3df99 100644 (file)
@@ -142,8 +142,9 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o,
 {
        struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
        struct lu_attr          *la = &osd_oti_get(env)->oti_la;
-       dmu_buf_t               *db;
+       sa_handle_t             *sa_hdl = NULL;
        dmu_tx_t                *tx;
+       uint64_t                 oid;
        int                      rc;
 
        /* verify it doesn't already exist */
@@ -168,21 +169,36 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o,
                return rc;
        }
 
+       oid = zap_create_flags(o->od_os, 0, ZAP_FLAG_HASH64,
+                              DMU_OT_DIRECTORY_CONTENTS,
+                              14, /* == ZFS fzap_default_block_shift */
+                              DN_MAX_INDBLKSHIFT, /* indirect block shift */
+                              DMU_OT_SA, DN_MAX_BONUSLEN, tx);
+
+       rc = -sa_handle_get(o->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc)
+               goto commit;
        la->la_valid = LA_MODE | LA_UID | LA_GID;
        la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
        la->la_uid = la->la_gid = 0;
-       __osd_zap_create(env, o, &db, tx, la, parent, 0);
+       rc = __osd_attr_init(env, o, sa_hdl, tx, la, parent);
+       sa_handle_destroy(sa_hdl);
+       if (rc)
+               goto commit;
 
-       zde->zde_dnode = db->db_object;
+       zde->zde_dnode = oid;
        zde->zde_pad = 0;
        zde->zde_type = IFTODT(S_IFDIR);
 
        rc = -zap_add(o->od_os, parent, name, 8, 1, (void *)zde, tx);
 
+commit:
+       if (rc)
+               dmu_object_free(o->od_os, oid, tx);
        dmu_tx_commit(tx);
 
-       *child = db->db_object;
-       sa_buf_rele(db, osd_obj_tag);
+       if (rc == 0)
+               *child = oid;
 
        return rc;
 }
@@ -366,7 +382,7 @@ out:
  */
 static uint64_t
 osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
-                       const struct lu_fid *fid, char *buf)
+                       const struct lu_fid *fid, char *buf, int bufsize)
 {
        struct osd_seq  *osd_seq;
        unsigned long   b;
@@ -391,7 +407,8 @@ osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
        b = id % OSD_OST_MAP_SIZE;
        LASSERT(osd_seq->os_compat_dirs[b]);
 
-       sprintf(buf, LPU64, id);
+       if (buf)
+               snprintf(buf, bufsize, LPU64, id);
 
        return osd_seq->os_compat_dirs[b];
 }
@@ -416,28 +433,29 @@ osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid,
 
        LASSERT(osd->od_oi_table != NULL);
        oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)];
-       osd_fid2str(buf, fid);
+       if (buf)
+               osd_fid2str(buf, fid);
 
        return oi->oi_zapid;
 }
 
 uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
-                           const struct lu_fid *fid, char *buf)
+                           const struct lu_fid *fid, char *buf, int bufsize)
 {
        uint64_t zapid;
 
        LASSERT(fid);
-       LASSERT(buf);
 
        if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) {
-               zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf);
+               zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf, bufsize);
        } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
                /* special objects with fixed known fids get their name */
                char *name = oid2name(fid_oid(fid));
 
                if (name) {
                        zapid = osd->od_root;
-                       strcpy(buf, name);
+                       if (buf)
+                               strncpy(buf, name, bufsize);
                        if (fid_is_acct(fid))
                                zapid = MASTER_NODE_OBJ;
                } else {
@@ -477,8 +495,8 @@ int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev,
        } else if (unlikely(fid_is_fs_root(fid))) {
                *oid = dev->od_root;
        } else {
-               zapid = osd_get_name_n_idx(env, dev, fid, buf);
-
+               zapid = osd_get_name_n_idx(env, dev, fid, buf,
+                                          sizeof(info->oti_buf));
                rc = -zap_lookup(dev->od_os, zapid, buf,
                                8, 1, &info->oti_zde);
                if (rc)
index 218464f..0ea9977 100644 (file)
@@ -267,39 +267,18 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
        RETURN(rc);
 }
 
-void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
-                            int vallen, const char *name,
-                            struct osd_thandle *oh)
+/* the function is used to declare EAs when SA is not supported */
+void __osd_xattr_declare_legacy(const struct lu_env *env,
+                               struct osd_object *obj,
+                               int vallen, const char *name,
+                               struct osd_thandle *oh)
 {
        struct osd_device *osd = osd_obj2dev(obj);
-       dmu_buf_t         *db = obj->oo_db;
-       dmu_tx_t          *tx = oh->ot_tx;
-       uint64_t           xa_data_obj;
-       int                rc = 0;
-       int                here;
-
-       if (unlikely(obj->oo_destroyed))
-               return;
-
-       here = dt_object_exists(&obj->oo_dt);
-
-       /* object may be not yet created */
-       if (here) {
-               LASSERT(db);
-               LASSERT(obj->oo_sa_hdl);
-               /* we might just update SA_ZPL_DXATTR */
-               dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
-
-               if (obj->oo_xattr == ZFS_NO_OBJECT)
-                       rc = -ENOENT;
-       }
+       dmu_tx_t *tx = oh->ot_tx;
+       uint64_t xa_data_obj;
+       int rc;
 
-       if (!here || rc == -ENOENT) {
-               /* we'll be updating SA_ZPL_XATTR */
-               if (here) {
-                       LASSERT(obj->oo_sa_hdl);
-                       dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
-               }
+       if (obj->oo_xattr == ZFS_NO_OBJECT) {
                /* xattr zap + entry */
                dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, TRUE, (char *) name);
                /* xattr value obj */
@@ -318,7 +297,6 @@ void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
                dmu_tx_hold_bonus(tx, xa_data_obj);
                dmu_tx_hold_free(tx, xa_data_obj, vallen, DMU_OBJECT_END);
                dmu_tx_hold_write(tx, xa_data_obj, 0, vallen);
-               return;
        } else if (rc == -ENOENT) {
                /*
                 * Entry doesn't exist, we need to create a new one and a new
@@ -328,11 +306,43 @@ void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
                dmu_tx_hold_zap(tx, obj->oo_xattr, TRUE, (char *) name);
                dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
                dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
+       }
+}
+
+void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
+                            int vallen, const char *name,
+                            struct osd_thandle *oh)
+{
+       dmu_buf_t         *db = obj->oo_db;
+       dmu_tx_t          *tx = oh->ot_tx;
+
+       if (unlikely(obj->oo_destroyed))
+               return;
+
+       if (unlikely(!osd_obj2dev(obj)->od_xattr_in_sa)) {
+               __osd_xattr_declare_legacy(env, obj, vallen, name, oh);
                return;
        }
 
-       /* An error happened */
-       tx->tx_err = -rc;
+       /* declare EA in SA */
+       if (dt_object_exists(&obj->oo_dt)) {
+               LASSERT(obj->oo_sa_hdl);
+               /* XXX: it should be possible to skip spill
+                * declaration if specific EA is part of
+                * bonus and doesn't grow */
+               dmu_tx_hold_spill(tx, db->db_object);
+               return;
+       }
+
+       /* the object doesn't exist, but we've declared bonus
+        * in osd_declare_object_create() yet */
+       if (obj->oo_ea_in_bonus > DN_MAX_BONUSLEN) {
+               /* spill has been declared already */
+       } else if (obj->oo_ea_in_bonus + vallen > DN_MAX_BONUSLEN) {
+               /* we're about to exceed bonus, let's declare spill */
+               dmu_tx_hold_spill(tx, DMU_NEW_OBJECT);
+       }
+       obj->oo_ea_in_bonus += vallen;
 }
 
 int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
@@ -494,8 +504,7 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
 
                la->la_valid = LA_MODE;
                la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
-               rc = __osd_zap_create(env, osd, &xa_zap_db, tx, la,
-                                     obj->oo_db->db_object, 0);
+               rc = __osd_zap_create(env, osd, &xa_zap_db, tx, la, 0);
                if (rc)
                        return rc;
 
@@ -550,8 +559,7 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
 
                la->la_valid = LA_MODE;
                la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
-               rc = __osd_object_create(env, obj, &xa_data_db, tx, la,
-                                        obj->oo_xattr);
+               rc = __osd_object_create(env, obj, &xa_data_db, tx, la);
                if (rc)
                        goto out;
                xa_data_obj = xa_data_db->db_object;