Whamcloud - gitweb
LU-5242 osd-zfs: umount hang in sanity 133g 30/13630/17
authorIsaac Huang <he.huang@intel.com>
Mon, 20 Apr 2015 17:27:21 +0000 (11:27 -0600)
committerOleg Drokin <oleg.drokin@intel.com>
Sun, 17 May 2015 22:46:50 +0000 (22:46 +0000)
It's not optimal to free large/sparse object directly by
dmu_object_free(). The dmu_tx_count_free() may over estimate the
memory overhead and cause the TX to fail in arc_tempreserve_space()
and eventually hang.

The correct way is to truncate the object with dmu_free_long_range()
before freeing it. This patch moves large object to ZPL unlinked set
and then later truncates and frees it, with as many TXs as needed.

Change-Id: I077dbcafff5abaea4a42dc9bc3f9ae980bdcdc54
Signed-off-by: Isaac Huang <he.huang@intel.com>
Test-Parameters: envdefinitions=SLOW=yes mdtfilesystemtype=zfs mdsfilesystemtype=zfs ostfilesystemtype=zfs
Reviewed-on: http://review.whamcloud.com/13630
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_oi.c
lustre/osd-zfs/osd_xattr.c
lustre/tests/sanity.sh

index e13909c..c87cc00 100644 (file)
@@ -227,6 +227,26 @@ static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
        RETURN(rc);
 }
 
+static int osd_unlinked_object_free(struct osd_device *osd, uint64_t oid);
+
+static void osd_unlinked_list_emptify(struct osd_device *osd,
+                                     struct list_head *list, bool free)
+{
+       struct osd_object *obj;
+       uint64_t           oid;
+
+       while (!list_empty(list)) {
+               obj = list_entry(list->next,
+                                struct osd_object, oo_unlinked_linkage);
+               LASSERT(obj->oo_db != NULL);
+               oid = obj->oo_db->db_object;
+
+               list_del_init(&obj->oo_unlinked_linkage);
+               if (free)
+                       (void)osd_unlinked_object_free(osd, oid);
+       }
+}
+
 /*
  * Concurrency: shouldn't matter.
  */
@@ -234,17 +254,22 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
                          struct thandle *th)
 {
        struct osd_device       *osd = osd_dt_dev(th->th_dev);
+       bool                     sync = (th->th_sync != 0);
        struct osd_thandle      *oh;
+       struct list_head         unlinked;
        uint64_t                 txg;
        int                      rc;
        ENTRY;
 
        oh = container_of0(th, struct osd_thandle, ot_super);
+       INIT_LIST_HEAD(&unlinked);
+       list_splice_init(&oh->ot_unlinked_list, &unlinked);
 
        if (oh->ot_assigned == 0) {
                LASSERT(oh->ot_tx);
                dmu_tx_abort(oh->ot_tx);
                osd_object_sa_dirty_rele(oh);
+               osd_unlinked_list_emptify(osd, &unlinked, false);
                /* there won't be any commit, release reserved quota space now,
                 * if any */
                qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
@@ -271,9 +296,13 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        txg = oh->ot_tx->tx_txg;
 
        osd_object_sa_dirty_rele(oh);
+       /* XXX: Once dmu_tx_commit() called, oh/th could have been freed
+        * by osd_trans_commit_cb already. */
        dmu_tx_commit(oh->ot_tx);
 
-       if (th->th_sync)
+       osd_unlinked_list_emptify(osd, &unlinked, true);
+
+       if (sync)
                txg_wait_synced(dmu_objset_pool(osd->od_os), txg);
 
        RETURN(rc);
@@ -301,6 +330,7 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
 
        oh->ot_tx = tx;
        INIT_LIST_HEAD(&oh->ot_dcb_list);
+       INIT_LIST_HEAD(&oh->ot_unlinked_list);
        INIT_LIST_HEAD(&oh->ot_sa_list);
        sema_init(&oh->ot_sa_lock, 1);
        memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
@@ -783,6 +813,14 @@ static int osd_objset_open(struct osd_device *o)
                GOTO(out, rc);
        }
 
+       rc = -zap_lookup(o->od_os, MASTER_NODE_OBJ, ZFS_UNLINKED_SET,
+                        8, 1, &o->od_unlinkedid);
+       if (rc) {
+               CERROR("%s: lookup for %s failed: rc = %d\n",
+                      o->od_svname, ZFS_UNLINKED_SET, rc);
+               GOTO(out, rc);
+       }
+
        /* Check that user/group usage tracking is supported */
        if (!dmu_objset_userused_enabled(o->od_os) ||
            DMU_USERUSED_DNODE(o->od_os)->dn_type != DMU_OT_USERGROUP_USED ||
@@ -793,12 +831,81 @@ static int osd_objset_open(struct osd_device *o)
        }
 
 out:
-       if (rc != 0 && o->od_os != NULL)
+       if (rc != 0 && o->od_os != NULL) {
                dmu_objset_disown(o->od_os, o);
+               o->od_os = NULL;
+       }
 
        RETURN(rc);
 }
 
+static int
+osd_unlinked_object_free(struct osd_device *osd, uint64_t oid)
+{
+       int       rc;
+       dmu_tx_t *tx;
+
+       rc = -dmu_free_long_range(osd->od_os, oid, 0, DMU_OBJECT_END);
+       if (rc != 0) {
+               CWARN("%s: Cannot truncate "LPU64": rc = %d\n",
+                     osd->od_svname, oid, rc);
+               return rc;
+       }
+
+       tx = dmu_tx_create(osd->od_os);
+       dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
+       dmu_tx_hold_zap(tx, osd->od_unlinkedid, FALSE, NULL);
+       rc = -dmu_tx_assign(tx, TXG_WAIT);
+       if (rc != 0) {
+               CWARN("%s: Cannot assign tx for "LPU64": rc = %d\n",
+                     osd->od_svname, oid, rc);
+               goto failed;
+       }
+
+       rc = -zap_remove_int(osd->od_os, osd->od_unlinkedid, oid, tx);
+       if (rc != 0) {
+               CWARN("%s: Cannot remove "LPU64" from unlinked set: rc = %d\n",
+                     osd->od_svname, oid, rc);
+               goto failed;
+       }
+
+       rc = -dmu_object_free(osd->od_os, oid, tx);
+       if (rc != 0) {
+               CWARN("%s: Cannot free "LPU64": rc = %d\n",
+                     osd->od_svname, oid, rc);
+               goto failed;
+       }
+       dmu_tx_commit(tx);
+
+       return 0;
+
+failed:
+       LASSERT(rc != 0);
+       dmu_tx_abort(tx);
+
+       return rc;
+}
+
+static void
+osd_unlinked_drain(const struct lu_env *env, struct osd_device *osd)
+{
+       zap_cursor_t     zc;
+       zap_attribute_t *za = &osd_oti_get(env)->oti_za;
+
+       zap_cursor_init(&zc, osd->od_os, osd->od_unlinkedid);
+
+       while (zap_cursor_retrieve(&zc, za) == 0) {
+               /* If cannot free the object, leave it in the unlinked set,
+                * until the OSD is mounted again when obd_unlinked_drain()
+                * will be called. */
+               if (osd_unlinked_object_free(osd, za->za_first_integer) != 0)
+                       break;
+               zap_cursor_advance(&zc);
+       }
+
+       zap_cursor_fini(&zc);
+}
+
 static int osd_mount(const struct lu_env *env,
                     struct osd_device *o, struct lustre_cfg *cfg)
 {
@@ -884,6 +991,7 @@ static int osd_mount(const struct lu_env *env,
        if (opts == NULL || strstr(opts, "noacl") == NULL)
                o->od_posix_acl = 1;
 
+       osd_unlinked_drain(env, o);
 err:
        if (rc) {
                dmu_objset_disown(o->od_os, o);
index 367b7f7..261ac90 100644 (file)
@@ -698,8 +698,8 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
        int rc;
        ENTRY;
 
-       LASSERT(obj->oo_db);
-       LASSERT(osd_object_is_zap(obj->oo_db));
+       LASSERT(zap_db);
+       LASSERT(osd_object_is_zap(zap_db));
 
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
index c698db1..12bf5a5 100644 (file)
@@ -203,6 +203,7 @@ static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
 struct osd_thandle {
        struct thandle           ot_super;
        struct list_head         ot_dcb_list;
+       struct list_head         ot_unlinked_list;
        struct list_head         ot_sa_list;
        struct semaphore         ot_sa_lock;
        dmu_tx_t                *ot_tx;
@@ -245,6 +246,7 @@ struct osd_device {
        /* information about underlying file system */
        struct objset           *od_os;
        uint64_t                 od_rootid;  /* id of root znode */
+       uint64_t                 od_unlinkedid; /* id of unlinked zapobj */
        /* SA attr mapping->id,
         * name is the same as in ZFS to use defines SA_ZPL_...*/
        sa_attr_type_t           *z_attr_table;
@@ -295,6 +297,12 @@ struct osd_device {
        struct lu_client_seq    *od_cl_seq;
 };
 
+enum osd_destroy_type {
+       OSD_DESTROY_NONE = 0,
+       OSD_DESTROY_SYNC = 1,
+       OSD_DESTROY_ASYNC = 2,
+};
+
 struct osd_object {
        struct dt_object         oo_dt;
        /*
@@ -308,6 +316,7 @@ struct osd_object {
        sa_handle_t             *oo_sa_hdl;
        nvlist_t                *oo_sa_xattr;
        struct list_head         oo_sa_linkage;
+       struct list_head         oo_unlinked_linkage;
 
        struct rw_semaphore      oo_sem;
 
@@ -315,9 +324,10 @@ struct osd_object {
        rwlock_t                 oo_attr_lock;
        struct lu_attr           oo_attr;
 
-       /* protects extended attributes */
+       /* protects extended attributes and oo_unlinked_linkage */
        struct semaphore         oo_guard;
        uint64_t                 oo_xattr;
+       enum osd_destroy_type    oo_destroy;
 
        /* record size for index file */
        unsigned char            oo_keysize;
@@ -484,6 +494,11 @@ int osd_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
                          const char *name, struct thandle *handle);
 int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
                  const char *name, struct thandle *handle);
+void osd_declare_xattrs_destroy(const struct lu_env *env,
+                               struct osd_object *obj,
+                               struct osd_thandle *oh);
+int osd_xattrs_destroy(const struct lu_env *env,
+                      struct osd_object *obj, struct osd_thandle *oh);
 int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
                   const struct lu_buf *lb);
 void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
index ab0a288..2279df0 100644 (file)
@@ -136,7 +136,7 @@ void osd_object_sa_dirty_rele(struct osd_thandle *oh)
        down(&oh->ot_sa_lock);
        while (!list_empty(&oh->ot_sa_list)) {
                obj = list_entry(oh->ot_sa_list.next,
-                                    struct osd_object, oo_sa_linkage);
+                                struct osd_object, oo_sa_linkage);
                sa_spill_rele(obj->oo_sa_hdl);
                write_lock(&obj->oo_attr_lock);
                list_del_init(&obj->oo_sa_linkage);
@@ -291,9 +291,11 @@ struct lu_object *osd_object_alloc(const struct lu_env *env,
                mo->oo_dt.do_ops = &osd_obj_ops;
                l->lo_ops = &osd_lu_obj_ops;
                INIT_LIST_HEAD(&mo->oo_sa_linkage);
+               INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
                init_rwsem(&mo->oo_sem);
                sema_init(&mo->oo_guard, 1);
                rwlock_init(&mo->oo_attr_lock);
+               mo->oo_destroy = OSD_DESTROY_NONE;
                return l;
        } else {
                return NULL;
@@ -306,7 +308,7 @@ struct lu_object *osd_object_alloc(const struct lu_env *env,
 int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
 {
        struct osd_device       *osd = osd_obj2dev(obj);
-       const struct lu_fid     *fid  = lu_object_fid(&obj->oo_dt.do_lu);
+       const struct lu_fid     *fid = lu_object_fid(&obj->oo_dt.do_lu);
        int                      rc = 0;
        ENTRY;
 
@@ -432,53 +434,46 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
        OBD_SLAB_FREE_PTR(obj, osd_object_kmem);
 }
 
-static void __osd_declare_object_destroy(const struct lu_env *env,
-                                        struct osd_object *obj,
-                                        struct osd_thandle *oh)
+static int
+osd_object_unlinked_add(struct osd_object *obj, struct osd_thandle *oh)
 {
-       struct osd_device       *osd = osd_obj2dev(obj);
-       dmu_buf_t               *db = obj->oo_db;
-       zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
-       uint64_t                 oid = db->db_object, xid;
-       dmu_tx_t                *tx = oh->ot_tx;
-       zap_cursor_t            *zc;
-       int                      rc = 0;
+       int rc = -EBUSY;
 
-       dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
+       down(&obj->oo_guard);
 
-       /* zap holding xattrs */
-       if (obj->oo_xattr != ZFS_NO_OBJECT) {
-               oid = obj->oo_xattr;
+       LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
 
-               dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
+       if (likely(list_empty(&obj->oo_unlinked_linkage))) {
+               list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
+               rc = 0;
+       }
 
-               rc = osd_zap_cursor_init(&zc, osd->od_os, oid, 0);
-               if (rc)
-                       goto out;
-
-               while ((rc = -zap_cursor_retrieve(zc, za)) == 0) {
-                       BUG_ON(za->za_integer_length != sizeof(uint64_t));
-                       BUG_ON(za->za_num_integers != 1);
-
-                       rc = -zap_lookup(osd->od_os, obj->oo_xattr, za->za_name,
-                                        sizeof(uint64_t), 1, &xid);
-                       if (rc) {
-                               CERROR("%s: xattr lookup failed: rc = %d\n",
-                                      osd->od_svname, rc);
-                               goto out_err;
-                       }
-                       dmu_tx_hold_free(tx, xid, 0, DMU_OBJECT_END);
-
-                       zap_cursor_advance(zc);
-               }
-               if (rc == -ENOENT)
-                       rc = 0;
-out_err:
-               osd_zap_cursor_fini(zc);
+       up(&obj->oo_guard);
+
+       return rc;
+}
+
+/* Default to max data size covered by a level-1 indirect block */
+static unsigned long osd_sync_destroy_max_size =
+       1UL << (DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT + SPA_MAXBLOCKSHIFT);
+CFS_MODULE_PARM(osd_sync_destroy_max_size, "ul", ulong, 0444,
+               "Maximum object size to use synchronous destroy.");
+
+static inline void
+osd_object_set_destroy_type(struct osd_object *obj)
+{
+       /*
+        * Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
+        * only once and use it consistently thereafter.
+        */
+       down(&obj->oo_guard);
+       if (obj->oo_destroy == OSD_DESTROY_NONE) {
+               if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
+                       obj->oo_destroy = OSD_DESTROY_SYNC;
+               else /* Larger objects are destroyed asynchronously */
+                       obj->oo_destroy = OSD_DESTROY_ASYNC;
        }
-out:
-       if (rc && tx->tx_err == 0)
-               tx->tx_err = -rc;
+       up(&obj->oo_guard);
 }
 
 static int osd_declare_object_destroy(const struct lu_env *env,
@@ -490,8 +485,8 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
        struct osd_thandle      *oh;
-       uint64_t                 zapid;
        int                      rc;
+       uint64_t                 zapid;
        ENTRY;
 
        LASSERT(th != NULL);
@@ -500,19 +495,18 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        oh = container_of0(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_tx != NULL);
 
-       /* declare that we'll destroy the object */
-       __osd_declare_object_destroy(env, obj, oh);
-
        /* declare that we'll remove object from fid-dnode mapping */
        zapid = osd_get_name_n_idx(env, osd, fid, buf);
        dmu_tx_hold_bonus(oh->ot_tx, zapid);
-       dmu_tx_hold_zap(oh->ot_tx, zapid, 0, buf);
+       dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, buf);
+
+       osd_declare_xattrs_destroy(env, obj, oh);
 
        /* declare that we'll remove object from inode accounting ZAPs */
        dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, 0, buf);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, buf);
        dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
-       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, 0, buf);
+       dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, buf);
 
        /* one less inode */
        rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
@@ -523,68 +517,21 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        /* data to be truncated */
        rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
                               obj->oo_attr.la_gid, 0, oh, true, NULL, false);
-       RETURN(rc);
-}
-
-/*
- * Delete a DMU object
- *
- * The transaction passed to this routine must have
- * dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END) called
- * and then assigned to a transaction group.
- *
- * This will release db and set it to NULL to prevent further dbuf releases.
- */
-static int __osd_object_destroy(const struct lu_env *env,
-                               struct osd_object *obj,
-                               dmu_tx_t *tx, void *tag)
-{
-       struct osd_device       *osd = osd_obj2dev(obj);
-       uint64_t                 xid;
-       zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
-       zap_cursor_t            *zc;
-       int                      rc;
-
-       /* Assert that the transaction has been assigned to a
-          transaction group. */
-       LASSERT(tx->tx_txg != 0);
-
-       /* zap holding xattrs */
-       if (obj->oo_xattr != ZFS_NO_OBJECT) {
-               rc = osd_zap_cursor_init(&zc, osd->od_os, obj->oo_xattr, 0);
-               if (rc)
-                       return rc;
-               while ((rc = -zap_cursor_retrieve(zc, za)) == 0) {
-                       BUG_ON(za->za_integer_length != sizeof(uint64_t));
-                       BUG_ON(za->za_num_integers != 1);
-
-                       rc = -zap_lookup(osd->od_os, obj->oo_xattr, za->za_name,
-                                        sizeof(uint64_t), 1, &xid);
-                       if (rc) {
-                               CERROR("%s: lookup xattr %s failed: rc = %d\n",
-                                      osd->od_svname, za->za_name, rc);
-                               continue;
-                       }
-                       rc = -dmu_object_free(osd->od_os, xid, tx);
-                       if (rc)
-                               CERROR("%s: fetch xattr %s failed: rc = %d\n",
-                                      osd->od_svname, za->za_name, rc);
-                       zap_cursor_advance(zc);
-               }
-               osd_zap_cursor_fini(zc);
+       if (rc)
+               RETURN(rc);
 
-               rc = -dmu_object_free(osd->od_os, obj->oo_xattr, tx);
-               if (rc)
-                       CERROR("%s: freeing xattr failed: rc = %d\n",
-                              osd->od_svname, rc);
-       }
+       osd_object_set_destroy_type(obj);
+       if (obj->oo_destroy == OSD_DESTROY_SYNC)
+               dmu_tx_hold_free(oh->ot_tx, obj->oo_db->db_object,
+                                0, DMU_OBJECT_END);
+       else
+               dmu_tx_hold_zap(oh->ot_tx, osd->od_unlinkedid, TRUE, NULL);
 
-       return -dmu_object_free(osd->od_os, obj->oo_db->db_object, tx);
+       RETURN(0);
 }
 
 static int osd_object_destroy(const struct lu_env *env,
-                             struct dt_object *dt,
-                             struct thandle *th)
+                             struct dt_object *dt, struct thandle *th)
 {
        char                    *buf = osd_oti_get(env)->oti_str;
        struct osd_object       *obj = osd_dt_obj(dt);
@@ -592,7 +539,7 @@ static int osd_object_destroy(const struct lu_env *env,
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        struct osd_thandle      *oh;
        int                      rc;
-       uint64_t                 zapid;
+       uint64_t                 oid, zapid;
        ENTRY;
 
        LASSERT(obj->oo_db != NULL);
@@ -603,13 +550,19 @@ static int osd_object_destroy(const struct lu_env *env,
        LASSERT(oh != NULL);
        LASSERT(oh->ot_tx != NULL);
 
-       zapid = osd_get_name_n_idx(env, osd, fid, buf);
-
        /* remove obj ref from index dir (it depends) */
+       zapid = osd_get_name_n_idx(env, osd, fid, buf);
        rc = -zap_remove(osd->od_os, zapid, buf, oh->ot_tx);
        if (rc) {
-               CERROR("%s: zap_remove() failed: rc = %d\n",
-                      osd->od_svname, rc);
+               CERROR("%s: zap_remove(%s) failed: rc = %d\n",
+                      osd->od_svname, buf, rc);
+               GOTO(out, rc);
+       }
+
+       rc = osd_xattrs_destroy(env, obj, oh);
+       if (rc) {
+               CERROR("%s: cannot destroy xattrs for %s: rc = %d\n",
+                      osd->od_svname, buf, rc);
                GOTO(out, rc);
        }
 
@@ -617,24 +570,34 @@ static int osd_object_destroy(const struct lu_env *env,
         * operation if something goes wrong while updating accounting, but we
         * still log an error message to notify the administrator */
        rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid,
-                       obj->oo_attr.la_uid, -1, oh->ot_tx);
+                               obj->oo_attr.la_uid, -1, oh->ot_tx);
        if (rc)
                CERROR("%s: failed to remove "DFID" from accounting ZAP for usr"
-                       " %d: rc = %d\n", osd->od_svname, PFID(fid),
-                       obj->oo_attr.la_uid, rc);
+                      " %d: rc = %d\n", osd->od_svname, PFID(fid),
+                      obj->oo_attr.la_uid, rc);
        rc = -zap_increment_int(osd->od_os, osd->od_igrp_oid,
                                obj->oo_attr.la_gid, -1, oh->ot_tx);
        if (rc)
                CERROR("%s: failed to remove "DFID" from accounting ZAP for grp"
-                       " %d: rc = %d\n", osd->od_svname, PFID(fid),
-                       obj->oo_attr.la_gid, rc);
+                      " %d: rc = %d\n", osd->od_svname, PFID(fid),
+                      obj->oo_attr.la_gid, rc);
 
-       /* kill object */
-       rc = __osd_object_destroy(env, obj, oh->ot_tx, osd_obj_tag);
-       if (rc) {
-               CERROR("%s: __osd_object_destroy() failed: rc = %d\n",
-                      osd->od_svname, rc);
-               GOTO(out, rc);
+       oid = obj->oo_db->db_object;
+       if (obj->oo_destroy == OSD_DESTROY_SYNC) {
+               rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
+               if (rc)
+                       CERROR("%s: failed to free %s "LPU64": rc = %d\n",
+                              osd->od_svname, buf, oid, rc);
+       } else { /* asynchronous destroy */
+               rc = osd_object_unlinked_add(obj, oh);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = -zap_add_int(osd->od_os, osd->od_unlinkedid,
+                                 oid, oh->ot_tx);
+               if (rc)
+                       CERROR("%s: zap_add_int() failed %s "LPU64": rc = %d\n",
+                              osd->od_svname, buf, oid, rc);
        }
 
 out:
index 0a663fc..435e654 100644 (file)
@@ -689,8 +689,7 @@ static char *root2convert = "ROOT";
  * This is only needed for pre-production 2.4 ZFS filesystems, and
  * can be removed in the future.
  */
-int osd_convert_root_to_new_seq(const struct lu_env *env,
-                                       struct osd_device *o)
+int osd_convert_root_to_new_seq(const struct lu_env *env, struct osd_device *o)
 {
        struct luz_direntry *lze = &osd_oti_get(env)->oti_zde;
        char                *buf = osd_oti_get(env)->oti_str;
index 4ec0912..777fdb0 100644 (file)
@@ -752,6 +752,96 @@ int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
        RETURN(rc);
 }
 
+void osd_declare_xattrs_destroy(const struct lu_env *env,
+                               struct osd_object *obj, struct osd_thandle *oh)
+{
+       struct osd_device *osd = osd_obj2dev(obj);
+       zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
+       uint64_t           oid = obj->oo_xattr, xid;
+       dmu_tx_t          *tx = oh->ot_tx;
+       zap_cursor_t      *zc;
+       int                rc;
+
+       if (oid == ZFS_NO_OBJECT)
+               return; /* Nothing to do for SA xattrs */
+
+       /* Declare to free the ZAP holding xattrs */
+       dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
+
+       rc = osd_zap_cursor_init(&zc, osd->od_os, oid, 0);
+       if (rc)
+               goto out;
+
+       while (zap_cursor_retrieve(zc, za) == 0) {
+               LASSERT(za->za_num_integers == 1);
+               LASSERT(za->za_integer_length == sizeof(uint64_t));
+
+               rc = -zap_lookup(osd->od_os, oid, za->za_name,
+                                sizeof(uint64_t), 1, &xid);
+               if (rc) {
+                       CERROR("%s: xattr %s lookup failed: rc = %d\n",
+                              osd->od_svname, za->za_name, rc);
+                       break;
+               }
+               dmu_tx_hold_free(tx, xid, 0, DMU_OBJECT_END);
+
+               zap_cursor_advance(zc);
+       }
+
+       osd_zap_cursor_fini(zc);
+out:
+       if (rc && tx->tx_err == 0)
+               tx->tx_err = -rc;
+}
+
+int osd_xattrs_destroy(const struct lu_env *env,
+                      struct osd_object *obj, struct osd_thandle *oh)
+{
+       struct osd_device *osd = osd_obj2dev(obj);
+       dmu_tx_t          *tx = oh->ot_tx;
+       zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
+       zap_cursor_t      *zc;
+       uint64_t           xid;
+       int                rc;
+
+       /* The transaction must have been assigned to a transaction group. */
+       LASSERT(tx->tx_txg != 0);
+
+       if (obj->oo_xattr == ZFS_NO_OBJECT)
+               return 0; /* Nothing to do for SA xattrs */
+
+       /* Free the ZAP holding the xattrs */
+       rc = osd_zap_cursor_init(&zc, osd->od_os, obj->oo_xattr, 0);
+       if (rc)
+               return rc;
+
+       while (zap_cursor_retrieve(zc, za) == 0) {
+               LASSERT(za->za_num_integers == 1);
+               LASSERT(za->za_integer_length == sizeof(uint64_t));
+
+               rc = -zap_lookup(osd->od_os, obj->oo_xattr, za->za_name,
+                                sizeof(uint64_t), 1, &xid);
+               if (rc) {
+                       CERROR("%s: lookup xattr %s failed: rc = %d\n",
+                              osd->od_svname, za->za_name, rc);
+               } else {
+                       rc = -dmu_object_free(osd->od_os, xid, tx);
+                       if (rc)
+                               CERROR("%s: free xattr %s failed: rc = %d\n",
+                                      osd->od_svname, za->za_name, rc);
+               }
+               zap_cursor_advance(zc);
+       }
+       osd_zap_cursor_fini(zc);
+
+       rc = -dmu_object_free(osd->od_os, obj->oo_xattr, tx);
+       if (rc)
+               CERROR("%s: free xattr "LPU64" failed: rc = %d\n",
+                      osd->od_svname, obj->oo_xattr, rc);
+
+       return rc;
+}
+
 static int
 osd_sa_xattr_list(const struct lu_env *env, struct osd_object *obj,
                  const struct lu_buf *lb)
index 7733ffc..08f125d 100644 (file)
@@ -61,8 +61,8 @@ init_logging
 [ "$SLOW" = "no" ] && EXCEPT_SLOW="24o 24D 27m 64b 68 71 77f 78 115 124b 230d"
 
 if [ $(facet_fstype $SINGLEMDS) = "zfs" ]; then
-       # bug number for skipped test: LU-4536 LU-5242   LU-1957 LU-2805
-       ALWAYS_EXCEPT="$ALWAYS_EXCEPT  65ic     78 79 80 180     184c"
+       # bug number for skipped test: LU-4536 LU-1957 LU-2805
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT  65ic    180     184c"
        [ "$SLOW" = "no" ] && EXCEPT_SLOW="$EXCEPT_SLOW 51b 51ba"
 fi