Whamcloud - gitweb
Revert "LU-7898 osd: remove unnecessary declarations" 93/22293/2
authorOleg Drokin <oleg.drokin@intel.com>
Fri, 2 Sep 2016 16:38:18 +0000 (16:38 +0000)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 2 Sep 2016 16:43:08 +0000 (16:43 +0000)
This patch causes build failures in master due to
reverted LU-7899 6cd79ab5860c5 patch that I failed
to catch in time due to deficiency in my build process.

This cannot be easily fixed since apparently a big
chunk of functionality was yanked from under this patch,
so I can only revert it for now.

This reverts commit ead6df2feee9c143b617cb60e50e403c955bd401.

Change-Id: I5ee89bf0c9260312f157c251b83dd417fa2cf260
Reviewed-on: http://review.whamcloud.com/22293
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_io.c
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_oi.c
lustre/osd-zfs/osd_xattr.c

index 5f86160..8c1ffbf 100644 (file)
@@ -165,6 +165,7 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
 
        LASSERT(lu_object_exists(lo));
        LASSERT(obj->oo_db);
+       LASSERT(osd_object_is_zap(obj->oo_db));
        LASSERT(info);
 
        OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
@@ -424,6 +425,8 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
        int                 rc;
        ENTRY;
 
+       LASSERT(osd_object_is_zap(obj->oo_db));
+
        if (name[0] == '.') {
                if (name[1] == 0) {
                        const struct lu_fid *f = lu_object_fid(&dt->do_lu);
@@ -473,10 +476,8 @@ static int osd_declare_dir_insert(const struct lu_env *env,
        else
                object = obj->oo_db->db_object;
 
-       /* do not specify the key as then DMU is trying to look it up
-        * which is very expensive. usually the layers above lookup
-        * before insertion */
-       dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL);
+       dmu_tx_hold_bonus(oh->ot_tx, object);
+       dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
 
        RETURN(0);
 }
@@ -623,6 +624,7 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        LASSERT(parent->oo_db);
+       LASSERT(osd_object_is_zap(parent->oo_db));
 
        LASSERT(dt_object_exists(dt));
        LASSERT(osd_invariant(parent));
@@ -724,15 +726,12 @@ static int osd_declare_dir_delete(const struct lu_env *env,
 
        if (dt_object_exists(dt)) {
                LASSERT(obj->oo_db);
+               LASSERT(osd_object_is_zap(obj->oo_db));
                dnode = obj->oo_db->db_object;
        } else {
                dnode = DMU_NEW_OBJECT;
        }
-
-       /* do not specify the key as then DMU is trying to look it up
-        * which is very expensive. usually the layers above lookup
-        * before deletion */
-       dmu_tx_hold_zap(oh->ot_tx, dnode, FALSE, NULL);
+       dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key);
 
        RETURN(0);
 }
@@ -749,6 +748,7 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        LASSERT(zap_db);
+       LASSERT(osd_object_is_zap(zap_db));
 
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
@@ -1212,9 +1212,9 @@ static int osd_declare_index_insert(const struct lu_env *env,
 
        dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
 
-       /* do not specify the key as then DMU is trying to look it up
-        * which is very expensive. usually the layers above lookup
-        * before insertion */
+       /* It is not clear what API should be used for binary keys, so we pass
+        * a null name which has the side effect of over-reserving space,
+        * accounting for the worst case. See zap_count_write() */
        dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
 
        RETURN(0);
@@ -1262,11 +1262,7 @@ static int osd_declare_index_delete(const struct lu_env *env,
        LASSERT(obj->oo_db);
 
        oh = container_of0(th, struct osd_thandle, ot_super);
-
-       /* do not specify the key as then DMU is trying to look it up
-        * which is very expensive. usually the layers above lookup
-        * before deletion */
-       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, FALSE, NULL);
+       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
 
        RETURN(0);
 }
index 6fb7a6e..0706ea1 100644 (file)
@@ -347,17 +347,10 @@ struct osd_object {
 
        /* the i_flags in LMA */
        __u32                    oo_lma_flags;
-       union {
-               int             oo_ea_in_bonus; /* EA bytes we expect */
-               struct {
-                       /* record size for index file */
-                       unsigned char            oo_keysize;
-                       unsigned char            oo_recsize;
-                       unsigned char            oo_recusize;   /* unit size */
-               };
-       };
-
-
+       /* record size for index file */
+       unsigned char            oo_keysize;
+       unsigned char            oo_recsize;
+       unsigned char            oo_recusize;   /* unit size */
 };
 
 int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *);
@@ -472,12 +465,10 @@ int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type,
                         void *buf, uint32_t buflen, struct osd_thandle *oh);
 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
                     dmu_buf_t **zap_dbp, dmu_tx_t *tx, struct lu_attr *la,
-                    zap_flags_t flags);
+                    uint64_t parent, zap_flags_t flags);
 int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
-                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la);
-int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
-                   sa_handle_t *sa_hdl, dmu_tx_t *tx,
-                   struct lu_attr *la, uint64_t parent);
+                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la,
+                       uint64_t parent);
 
 /* osd_oi.c */
 int osd_oi_init(const struct lu_env *env, struct osd_device *o);
@@ -485,7 +476,7 @@ void osd_oi_fini(const struct lu_env *env, struct osd_device *o);
 int osd_fid_lookup(const struct lu_env *env,
                   struct osd_device *, const struct lu_fid *, uint64_t *);
 uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
-                           const struct lu_fid *fid, char *buf, int bufsize);
+                           const struct lu_fid *fid, char *buf);
 int osd_options_init(void);
 int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd,
                       __u64 seq);
@@ -534,8 +525,6 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
 int __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
                    const struct lu_buf *buf, const char *name, int fl,
                    struct osd_thandle *oh);
-int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
-                         struct osd_thandle *oh);
 static inline int
 osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj,
                       const struct lu_buf *buf, const char *name, int fl,
index d94bc97..eed35f5 100644 (file)
@@ -168,11 +168,18 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
         * LOHA_EXISTs is supposed to be the last step in the
         * initialization */
 
-       /* size change (in dnode) will be declared by dmu_tx_hold_write() */
-       if (dt_object_exists(dt))
+       /* declare possible size change. notice we can't check
+        * current size here as another thread can change it */
+
+       if (dt_object_exists(dt)) {
+               LASSERT(obj->oo_db);
                oid = obj->oo_db->db_object;
-       else
+
+               dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
+       } else {
                oid = DMU_NEW_OBJECT;
+               dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
+       }
 
        /* XXX: we still miss for append declaration support in ZFS
         *      -1 means append which is used by llog mostly, llog
index ef912ad..bd1fac2 100644 (file)
@@ -504,6 +504,7 @@ static int osd_declare_object_destroy(const struct lu_env *env,
                                      struct dt_object *dt,
                                      struct thandle *th)
 {
+       char                    *buf = osd_oti_get(env)->oti_str;
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
@@ -519,14 +520,17 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        LASSERT(oh->ot_tx != NULL);
 
        /* declare that we'll remove object from fid-dnode mapping */
-       zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0);
-       dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, NULL);
+       zapid = osd_get_name_n_idx(env, osd, fid, buf);
+       dmu_tx_hold_bonus(oh->ot_tx, zapid);
+       dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, buf);
 
        osd_declare_xattrs_destroy(env, obj, oh);
 
        /* declare that we'll remove object from inode accounting ZAPs */
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
+       dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, buf);
+       dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, buf);
 
        /* one less inode */
        rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
@@ -553,8 +557,7 @@ static int osd_declare_object_destroy(const struct lu_env *env,
 static int osd_object_destroy(const struct lu_env *env,
                              struct dt_object *dt, struct thandle *th)
 {
-       struct osd_thread_info  *info = osd_oti_get(env);
-       char                    *buf = info->oti_str;
+       char                    *buf = osd_oti_get(env)->oti_str;
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
@@ -575,7 +578,7 @@ static int osd_object_destroy(const struct lu_env *env,
        LASSERT(oh->ot_tx != NULL);
 
        /* remove obj ref from index dir (it depends) */
-       zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str));
+       zapid = osd_get_name_n_idx(env, osd, fid, buf);
        rc = -zap_remove(osd->od_os, zapid, buf, oh->ot_tx);
        if (rc) {
                CERROR("%s: zap_remove(%s) failed: rc = %d\n",
@@ -843,14 +846,13 @@ static int osd_declare_attr_set(const struct lu_env *env,
                                struct thandle *handle)
 {
        struct osd_thread_info  *info = osd_oti_get(env);
+       char                    *buf = osd_oti_get(env)->oti_str;
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
-       dmu_tx_hold_t           *txh;
        struct osd_thandle      *oh;
        uint64_t                 bspace;
        uint32_t                 blksize;
        int                      rc = 0;
-       bool                     found;
        ENTRY;
 
 
@@ -865,39 +867,20 @@ static int osd_declare_attr_set(const struct lu_env *env,
 
        LASSERT(obj->oo_sa_hdl != NULL);
        LASSERT(oh->ot_tx != NULL);
-       /* regular attributes are part of the bonus buffer */
-       /* let's check whether this object is already part of
-        * transaction.. */
-       found = false;
-       for (txh = list_head(&oh->ot_tx->tx_holds); txh;
-            txh = list_next(&oh->ot_tx->tx_holds, txh)) {
-               if (txh->txh_dnode == NULL)
-                       continue;
-               if (txh->txh_dnode->dn_object != obj->oo_db->db_object)
-                       continue;
-               /* this object is part of the transaction already
-                * we don't need to declare bonus again */
-               found = true;
-               break;
-       }
-       if (!found)
-               dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
+       dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
        if (oh->ot_tx->tx_err != 0)
                GOTO(out, rc = -oh->ot_tx->tx_err);
 
-       if (attr && attr->la_valid & LA_FLAGS) {
-               /* LMA is usually a part of bonus, no need to declare
-                * anything else */
-       }
+       sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
+       bspace = toqb(bspace * blksize);
 
-       if (attr && (attr->la_valid & (LA_UID | LA_GID))) {
-               sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
-               bspace = toqb(bspace * blksize);
-       }
+       __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs),
+                               XATTR_NAME_LMA, oh);
 
        if (attr && attr->la_valid & LA_UID) {
                /* account for user inode tracking ZAP update */
-               dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
+               dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
+               dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
 
                /* quota enforcement for user */
                if (attr->la_uid != obj->oo_attr.la_uid) {
@@ -911,7 +894,8 @@ static int osd_declare_attr_set(const struct lu_env *env,
        }
        if (attr && attr->la_valid & LA_GID) {
                /* account for user inode tracking ZAP update */
-               dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
+               dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
+               dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
 
                /* quota enforcement for group */
                if (attr->la_gid != obj->oo_attr.la_gid) {
@@ -1137,12 +1121,13 @@ static int osd_declare_object_create(const struct lu_env *env,
                                     struct dt_object_format *dof,
                                     struct thandle *handle)
 {
+       char                    *buf = osd_oti_get(env)->oti_str;
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
        struct osd_thandle      *oh;
        uint64_t                 zapid;
-       int                      rc, dnode_size;
+       int                      rc;
        ENTRY;
 
        LASSERT(dof);
@@ -1162,26 +1147,18 @@ static int osd_declare_object_create(const struct lu_env *env,
        oh = container_of0(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_tx != NULL);
 
-       /* this is the minimum set of EAs on every Lustre object */
-       obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE +
-                               sizeof(__u64) + /* VBR VERSION */
-                               sizeof(struct lustre_mdt_attrs); /* LMA */
-       /* reserve 32 bytes for extra stuff like ACLs */
-       dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32);
-
        switch (dof->dof_type) {
                case DFT_DIR:
                        dt->do_index_ops = &osd_dir_ops;
                case DFT_INDEX:
                        /* for zap create */
-                       dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, FALSE, NULL);
-                       dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
+                       dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL);
                        break;
                case DFT_REGULAR:
                case DFT_SYM:
                case DFT_NODE:
                        /* first, we'll create new object */
-                       dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
+                       dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
                        break;
 
                default:
@@ -1190,12 +1167,20 @@ static int osd_declare_object_create(const struct lu_env *env,
        }
 
        /* and we'll add it to some mapping */
-       zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0);
-       dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, NULL);
+       zapid = osd_get_name_n_idx(env, osd, fid, buf);
+       dmu_tx_hold_bonus(oh->ot_tx, zapid);
+       dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, buf);
 
        /* we will also update inode accounting ZAPs */
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
+       dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
+       dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
+
+       dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
+
+       __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs),
+                               XATTR_NAME_LMA, oh);
 
        rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh,
                               false, NULL, false);
@@ -1203,9 +1188,10 @@ static int osd_declare_object_create(const struct lu_env *env,
 }
 
 int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
-                   sa_handle_t *sa_hdl, dmu_tx_t *tx,
-                   struct lu_attr *la, uint64_t parent)
+                   uint64_t oid, dmu_tx_t *tx, struct lu_attr *la,
+                   uint64_t parent)
 {
+       sa_handle_t     *sa_hdl;
        sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
        struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
        uint64_t         gen;
@@ -1214,10 +1200,9 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
        int              cnt;
        int              rc;
 
-       LASSERT(sa_hdl);
-
-       gen = dmu_tx_get_txg(tx);
        gethrestime(&now);
+       gen = dmu_tx_get_txg(tx);
+
        ZFS_TIME_ENCODE(&now, crtime);
 
        osa->atime[0] = la->la_atime;
@@ -1231,6 +1216,11 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
        osa->flags = attrs_fs2zfs(la->la_flags);
        osa->size  = la->la_size;
 
+       /* Now add in all of the "SA" attributes */
+       rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc)
+               return rc;
+
        /*
         * we need to create all SA below upon object create.
         *
@@ -1259,6 +1249,7 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
 
        rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx);
 
+       sa_handle_destroy(sa_hdl);
        return rc;
 }
 
@@ -1268,7 +1259,8 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
  * to a transaction group.
  */
 int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
-                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la)
+                       dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la,
+                       uint64_t parent)
 {
        uint64_t             oid;
        int                  rc;
@@ -1276,6 +1268,10 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
        const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
        dmu_object_type_t    type = DMU_OT_PLAIN_FILE_CONTENTS;
 
+       /* Assert that the transaction has been assigned to a
+          transaction group. */
+       LASSERT(tx->tx_txg != 0);
+
        /* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks
         * would get an additional ditto copy */
        if (unlikely(S_ISREG(la->la_mode) &&
@@ -1291,6 +1287,14 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
        la->la_size = 0;
        la->la_nlink = 1;
 
+       rc = __osd_attr_init(env, osd, oid, tx, la, parent);
+       if (rc != 0) {
+               sa_buf_rele(*dbp, osd_obj_tag);
+               *dbp = NULL;
+               dmu_object_free(osd->od_os, oid, tx);
+               return rc;
+       }
+
        return 0;
 }
 
@@ -1306,7 +1310,7 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
  * a conversion from the different internal ZAP hash formats being used. */
 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
                     dmu_buf_t **zap_dbp, dmu_tx_t *tx,
-                    struct lu_attr *la, zap_flags_t flags)
+                    struct lu_attr *la, uint64_t parent, zap_flags_t flags)
 {
        uint64_t oid;
        int      rc;
@@ -1325,14 +1329,16 @@ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
        if (rc)
                return rc;
 
+       LASSERT(la->la_valid & LA_MODE);
        la->la_size = 2;
        la->la_nlink = 1;
 
-       return 0;
+       return __osd_attr_init(env, osd, oid, tx, la, parent);
 }
 
 static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+                           struct lu_attr *la, uint64_t parent,
+                           struct osd_thandle *oh)
 {
        dmu_buf_t *db;
        int        rc;
@@ -1342,7 +1348,7 @@ static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
         * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
         * binary keys */
        LASSERT(S_ISREG(la->la_mode));
-       rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la,
+       rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, parent,
                              ZAP_FLAG_UINT64_KEY);
        if (rc)
                return ERR_PTR(rc);
@@ -1350,20 +1356,23 @@ static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
 }
 
 static dmu_buf_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+                           struct lu_attr *la, uint64_t parent,
+                           struct osd_thandle *oh)
 {
        dmu_buf_t *db;
        int        rc;
 
        LASSERT(S_ISDIR(la->la_mode));
-       rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, 0);
+       rc = __osd_zap_create(env, osd_obj2dev(obj), &db,
+                             oh->ot_tx, la, parent, 0);
        if (rc)
                return ERR_PTR(rc);
        return db;
 }
 
 static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+                           struct lu_attr *la, uint64_t parent,
+                           struct osd_thandle *oh)
 {
        const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
        dmu_buf_t           *db;
@@ -1371,7 +1380,7 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
        struct osd_device *osd = osd_obj2dev(obj);
 
        LASSERT(S_ISREG(la->la_mode));
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent);
        if (rc)
                return ERR_PTR(rc);
 
@@ -1393,28 +1402,31 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
 }
 
 static dmu_buf_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+                           struct lu_attr *la, uint64_t parent,
+                           struct osd_thandle *oh)
 {
        dmu_buf_t *db;
        int        rc;
 
        LASSERT(S_ISLNK(la->la_mode));
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent);
        if (rc)
                return ERR_PTR(rc);
        return db;
 }
 
 static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
-                           struct lu_attr *la, struct osd_thandle *oh)
+                           struct lu_attr *la, uint64_t parent,
+                           struct osd_thandle *oh)
 {
        dmu_buf_t *db;
        int        rc;
 
+       la->la_valid = LA_MODE;
        if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
                la->la_valid |= LA_RDEV;
 
-       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la);
+       rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent);
        if (rc)
                return ERR_PTR(rc);
        return db;
@@ -1423,6 +1435,7 @@ static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
 typedef dmu_buf_t *(*osd_obj_type_f)(const struct lu_env *env,
                                     struct osd_object *obj,
                                     struct lu_attr *la,
+                                    uint64_t parent,
                                     struct osd_thandle *oh);
 
 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
@@ -1453,6 +1466,28 @@ static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
 }
 
 /*
+ * Primitives for directory (i.e. ZAP) handling
+ */
+static inline int osd_init_lma(const struct lu_env *env, struct osd_object *obj,
+                              const struct lu_fid *fid, struct osd_thandle *oh)
+{
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
+       struct lu_buf            buf;
+       int rc;
+
+       lustre_lma_init(lma, fid, 0, 0);
+       lustre_lma_swab(lma);
+       buf.lb_buf = lma;
+       buf.lb_len = sizeof(*lma);
+
+       rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
+                                   LU_XATTR_CREATE, oh);
+
+       return rc;
+}
+
+/*
  * Concurrency: @dt is write locked.
  */
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
@@ -1461,16 +1496,14 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                             struct dt_object_format *dof,
                             struct thandle *th)
 {
-       struct osd_thread_info  *info = osd_oti_get(env);
-       struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
-       struct zpl_direntry     *zde = &info->oti_zde.lzd_reg;
+       struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
-       char                    *buf = info->oti_str;
+       char                    *buf = osd_oti_get(env)->oti_str;
        struct osd_thandle      *oh;
-       dmu_buf_t               *db = NULL;
-       uint64_t                 zapid, parent = 0;
+       dmu_buf_t               *db;
+       uint64_t                 zapid;
        int                      rc;
 
        ENTRY;
@@ -1497,68 +1530,25 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 
        /* to follow ZFS on-disk format we need
         * to initialize parent dnode properly */
+       zapid = 0;
        if (hint != NULL && hint->dah_parent != NULL &&
            !dt_object_remote(hint->dah_parent))
-               parent = osd_dt_obj(hint->dah_parent)->oo_db->db_object;
+               zapid = osd_dt_obj(hint->dah_parent)->oo_db->db_object;
 
-       /* we may fix some attributes, better do not change the source */
-       obj->oo_attr = *attr;
-       obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | LA_TYPE;
-
-       db = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
-       if (IS_ERR(db)) {
-               rc = PTR_ERR(db);
-               db = NULL;
-               GOTO(out, rc);
-       }
+       db = osd_create_type_f(dof->dof_type)(env, obj, attr, zapid, oh);
+       if (IS_ERR(db))
+               GOTO(out, rc = PTR_ERR(db));
 
        zde->zde_pad = 0;
        zde->zde_dnode = db->db_object;
        zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
 
-       zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str));
+       zapid = osd_get_name_n_idx(env, osd, fid, buf);
 
        rc = -zap_add(osd->od_os, zapid, buf, 8, 1, zde, oh->ot_tx);
        if (rc)
                GOTO(out, rc);
 
-       /* Now add in all of the "SA" attributes */
-       rc = -sa_handle_get(osd->od_os, db->db_object, NULL,
-                           SA_HDL_PRIVATE, &obj->oo_sa_hdl);
-       if (rc)
-               GOTO(out, rc);
-
-       /* configure new osd object */
-       obj->oo_db = db;
-       parent = parent != 0 ? parent : zapid;
-       rc = __osd_attr_init(env, osd, obj->oo_sa_hdl, oh->ot_tx,
-                            &obj->oo_attr, parent);
-       if (rc)
-               GOTO(out, rc);
-
-       /* XXX: oo_lma_flags */
-       obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
-       smp_mb();
-       obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
-       if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu))))
-               /* no body operations for accounting objects */
-               obj->oo_dt.do_body_ops = &osd_body_ops;
-
-       rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
-       if (rc)
-               GOTO(out, rc);
-
-       /* initialize LMA */
-       lustre_lma_init(lma, lu_object_fid(&obj->oo_dt.do_lu), 0, 0);
-       lustre_lma_swab(lma);
-       rc = -nvlist_add_byte_array(obj->oo_sa_xattr, XATTR_NAME_LMA,
-                                   (uchar_t *)lma, sizeof(*lma));
-       if (rc)
-               GOTO(out, rc);
-       rc = __osd_sa_xattr_update(env, obj, oh);
-       if (rc)
-               GOTO(out, rc);
-
        /* Add new object to inode accounting.
         * Errors are not considered as fatal */
        rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid,
@@ -1574,12 +1564,18 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                CERROR("%s: failed to add "DFID" to accounting ZAP for grp %d "
                        "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc);
 
+       /* configure new osd object */
+       obj->oo_db = db;
+       rc = osd_object_init0(env, obj);
+       LASSERT(ergo(rc == 0, dt_object_exists(dt)));
+       LASSERT(osd_invariant(obj));
+
+       rc = osd_init_lma(env, obj, fid, oh);
+       if (rc != 0)
+               CERROR("%s: can not set LMA on "DFID": rc = %d\n",
+                      osd->od_svname, PFID(fid), rc);
+
 out:
-       if (unlikely(rc && db)) {
-               dmu_object_free(osd->od_os, db->db_object, oh->ot_tx);
-               sa_buf_rele(db, osd_obj_tag);
-               obj->oo_db = NULL;
-       }
        up_write(&obj->oo_guard);
        RETURN(rc);
 }
index 6d3df99..560c90f 100644 (file)
@@ -142,9 +142,8 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o,
 {
        struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
        struct lu_attr          *la = &osd_oti_get(env)->oti_la;
-       sa_handle_t             *sa_hdl = NULL;
+       dmu_buf_t               *db;
        dmu_tx_t                *tx;
-       uint64_t                 oid;
        int                      rc;
 
        /* verify it doesn't already exist */
@@ -169,36 +168,21 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o,
                return rc;
        }
 
-       oid = zap_create_flags(o->od_os, 0, ZAP_FLAG_HASH64,
-                              DMU_OT_DIRECTORY_CONTENTS,
-                              14, /* == ZFS fzap_default_block_shift */
-                              DN_MAX_INDBLKSHIFT, /* indirect block shift */
-                              DMU_OT_SA, DN_MAX_BONUSLEN, tx);
-
-       rc = -sa_handle_get(o->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
-       if (rc)
-               goto commit;
        la->la_valid = LA_MODE | LA_UID | LA_GID;
        la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
        la->la_uid = la->la_gid = 0;
-       rc = __osd_attr_init(env, o, sa_hdl, tx, la, parent);
-       sa_handle_destroy(sa_hdl);
-       if (rc)
-               goto commit;
+       __osd_zap_create(env, o, &db, tx, la, parent, 0);
 
-       zde->zde_dnode = oid;
+       zde->zde_dnode = db->db_object;
        zde->zde_pad = 0;
        zde->zde_type = IFTODT(S_IFDIR);
 
        rc = -zap_add(o->od_os, parent, name, 8, 1, (void *)zde, tx);
 
-commit:
-       if (rc)
-               dmu_object_free(o->od_os, oid, tx);
        dmu_tx_commit(tx);
 
-       if (rc == 0)
-               *child = oid;
+       *child = db->db_object;
+       sa_buf_rele(db, osd_obj_tag);
 
        return rc;
 }
@@ -382,7 +366,7 @@ out:
  */
 static uint64_t
 osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
-                       const struct lu_fid *fid, char *buf, int bufsize)
+                       const struct lu_fid *fid, char *buf)
 {
        struct osd_seq  *osd_seq;
        unsigned long   b;
@@ -407,8 +391,7 @@ osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
        b = id % OSD_OST_MAP_SIZE;
        LASSERT(osd_seq->os_compat_dirs[b]);
 
-       if (buf)
-               snprintf(buf, bufsize, LPU64, id);
+       sprintf(buf, LPU64, id);
 
        return osd_seq->os_compat_dirs[b];
 }
@@ -433,29 +416,28 @@ osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid,
 
        LASSERT(osd->od_oi_table != NULL);
        oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)];
-       if (buf)
-               osd_fid2str(buf, fid);
+       osd_fid2str(buf, fid);
 
        return oi->oi_zapid;
 }
 
 uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
-                           const struct lu_fid *fid, char *buf, int bufsize)
+                           const struct lu_fid *fid, char *buf)
 {
        uint64_t zapid;
 
        LASSERT(fid);
+       LASSERT(buf);
 
        if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) {
-               zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf, bufsize);
+               zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf);
        } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
                /* special objects with fixed known fids get their name */
                char *name = oid2name(fid_oid(fid));
 
                if (name) {
                        zapid = osd->od_root;
-                       if (buf)
-                               strncpy(buf, name, bufsize);
+                       strcpy(buf, name);
                        if (fid_is_acct(fid))
                                zapid = MASTER_NODE_OBJ;
                } else {
@@ -495,8 +477,8 @@ int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev,
        } else if (unlikely(fid_is_fs_root(fid))) {
                *oid = dev->od_root;
        } else {
-               zapid = osd_get_name_n_idx(env, dev, fid, buf,
-                                          sizeof(info->oti_buf));
+               zapid = osd_get_name_n_idx(env, dev, fid, buf);
+
                rc = -zap_lookup(dev->od_os, zapid, buf,
                                8, 1, &info->oti_zde);
                if (rc)
index 0ea9977..218464f 100644 (file)
@@ -267,18 +267,39 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
        RETURN(rc);
 }
 
-/* the function is used to declare EAs when SA is not supported */
-void __osd_xattr_declare_legacy(const struct lu_env *env,
-                               struct osd_object *obj,
-                               int vallen, const char *name,
-                               struct osd_thandle *oh)
+void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
+                            int vallen, const char *name,
+                            struct osd_thandle *oh)
 {
        struct osd_device *osd = osd_obj2dev(obj);
-       dmu_tx_t *tx = oh->ot_tx;
-       uint64_t xa_data_obj;
-       int rc;
+       dmu_buf_t         *db = obj->oo_db;
+       dmu_tx_t          *tx = oh->ot_tx;
+       uint64_t           xa_data_obj;
+       int                rc = 0;
+       int                here;
 
-       if (obj->oo_xattr == ZFS_NO_OBJECT) {
+       if (unlikely(obj->oo_destroyed))
+               return;
+
+       here = dt_object_exists(&obj->oo_dt);
+
+       /* object may be not yet created */
+       if (here) {
+               LASSERT(db);
+               LASSERT(obj->oo_sa_hdl);
+               /* we might just update SA_ZPL_DXATTR */
+               dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
+
+               if (obj->oo_xattr == ZFS_NO_OBJECT)
+                       rc = -ENOENT;
+       }
+
+       if (!here || rc == -ENOENT) {
+               /* we'll be updating SA_ZPL_XATTR */
+               if (here) {
+                       LASSERT(obj->oo_sa_hdl);
+                       dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
+               }
                /* xattr zap + entry */
                dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, TRUE, (char *) name);
                /* xattr value obj */
@@ -297,6 +318,7 @@ void __osd_xattr_declare_legacy(const struct lu_env *env,
                dmu_tx_hold_bonus(tx, xa_data_obj);
                dmu_tx_hold_free(tx, xa_data_obj, vallen, DMU_OBJECT_END);
                dmu_tx_hold_write(tx, xa_data_obj, 0, vallen);
+               return;
        } else if (rc == -ENOENT) {
                /*
                 * Entry doesn't exist, we need to create a new one and a new
@@ -306,43 +328,11 @@ void __osd_xattr_declare_legacy(const struct lu_env *env,
                dmu_tx_hold_zap(tx, obj->oo_xattr, TRUE, (char *) name);
                dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
                dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
-       }
-}
-
-void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
-                            int vallen, const char *name,
-                            struct osd_thandle *oh)
-{
-       dmu_buf_t         *db = obj->oo_db;
-       dmu_tx_t          *tx = oh->ot_tx;
-
-       if (unlikely(obj->oo_destroyed))
-               return;
-
-       if (unlikely(!osd_obj2dev(obj)->od_xattr_in_sa)) {
-               __osd_xattr_declare_legacy(env, obj, vallen, name, oh);
                return;
        }
 
-       /* declare EA in SA */
-       if (dt_object_exists(&obj->oo_dt)) {
-               LASSERT(obj->oo_sa_hdl);
-               /* XXX: it should be possible to skip spill
-                * declaration if specific EA is part of
-                * bonus and doesn't grow */
-               dmu_tx_hold_spill(tx, db->db_object);
-               return;
-       }
-
-       /* the object doesn't exist, but we've declared bonus
-        * in osd_declare_object_create() yet */
-       if (obj->oo_ea_in_bonus > DN_MAX_BONUSLEN) {
-               /* spill has been declared already */
-       } else if (obj->oo_ea_in_bonus + vallen > DN_MAX_BONUSLEN) {
-               /* we're about to exceed bonus, let's declare spill */
-               dmu_tx_hold_spill(tx, DMU_NEW_OBJECT);
-       }
-       obj->oo_ea_in_bonus += vallen;
+       /* An error happened */
+       tx->tx_err = -rc;
 }
 
 int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
@@ -504,7 +494,8 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
 
                la->la_valid = LA_MODE;
                la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
-               rc = __osd_zap_create(env, osd, &xa_zap_db, tx, la, 0);
+               rc = __osd_zap_create(env, osd, &xa_zap_db, tx, la,
+                                     obj->oo_db->db_object, 0);
                if (rc)
                        return rc;
 
@@ -559,7 +550,8 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
 
                la->la_valid = LA_MODE;
                la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
-               rc = __osd_object_create(env, obj, &xa_data_db, tx, la);
+               rc = __osd_object_create(env, obj, &xa_data_db, tx, la,
+                                        obj->oo_xattr);
                if (rc)
                        goto out;
                xa_data_obj = xa_data_db->db_object;