Whamcloud - gitweb
LU-909 osd: changes to osd api
authorAlex Zhuravlev <bzzz@whamcloud.com>
Fri, 6 Jan 2012 05:01:40 +0000 (13:01 +0800)
committerOleg Drokin <green@whamcloud.com>
Fri, 6 Jan 2012 21:36:55 +0000 (16:36 -0500)
the main purpose of the patch is to get declare methods in the API
and to teach osd-based devices to use that

- new declaration methods for each changing method
- explicit destroy method:
  ->do_ref_del() never destroy object
- methods to access data in 0-copy manner:
  no actual implementation in this patch
- mdd/fld use new methods to create/declare/start transactions
- specific method to change/access version are removed:
  use xattr methods
- ldiskfs osd tracks all declarations and asserts if caller
  is trying to call changing method w/o proper declaration

Change-Id: I473c0c2950c1920abb2fef1dac465c08f35522ea
Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Signed-off-by: Hongchao Zhang <hongchao.zhang@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1669
Tested-by: Hudson
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
28 files changed:
lustre/autoconf/lustre-core.m4
lustre/cmm/cmm_object.c
lustre/fid/fid_internal.h
lustre/fid/fid_store.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/include/dt_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fld.h
lustre/include/md_object.h
lustre/mdd/mdd_device.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_object.c
lustre/mdd/mdd_orphans.c
lustre/mdd/mdd_trans.c
lustre/mdt/mdt_capa.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/obdclass/dt_object.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/ptlrpc/target.c
lustre/tests/replay-single.sh

index 8bd7f6b..4fea8cc 100644 (file)
@@ -510,7 +510,10 @@ AC_TRY_RUN([
        AC_MSG_RESULT([ACL size $acl_size])
         AC_DEFINE_UNQUOTED(XATTR_ACL_SIZE, AS_TR_SH([$acl_size]), [size of xattr acl])
 ],[
-        AC_ERROR([ACL size can't computed])
+        AC_ERROR([ACL size can't be computed])
+],[
+       AC_MSG_RESULT([can't check ACL size, make it 260])
+        AC_DEFINE_UNQUOTED(XATTR_ACL_SIZE,260)
 ])
 CFLAGS="$tmp_flags"
 ])
index df839c9..01e9244 100644 (file)
@@ -411,18 +411,6 @@ static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
         RETURN(rc);
 }
 
-static dt_obj_version_t cml_version_get(const struct lu_env *env,
-                                        struct md_object *mo)
-{
-        return mo_version_get(env, md_object_next(mo));
-}
-
-static void cml_version_set(const struct lu_env *env, struct md_object *mo,
-                            dt_obj_version_t version)
-{
-        return mo_version_set(env, md_object_next(mo), version);
-}
-
 static const struct md_object_operations cml_mo_ops = {
         .moo_permission    = cml_permission,
         .moo_attr_get      = cml_attr_get,
@@ -441,8 +429,6 @@ static const struct md_object_operations cml_mo_ops = {
         .moo_changelog     = cml_changelog,
         .moo_capa_get      = cml_capa_get,
         .moo_object_sync   = cml_object_sync,
-        .moo_version_get   = cml_version_get,
-        .moo_version_set   = cml_version_set,
         .moo_path          = cml_path,
         .moo_file_lock     = cml_file_lock,
         .moo_file_unlock   = cml_file_unlock,
@@ -1140,28 +1126,6 @@ static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo,
         return -EREMOTE;
 }
 
-/**
- * cmr moo_version_get().
- */
-static dt_obj_version_t cmr_version_get(const struct lu_env *env,
-                                        struct md_object *mo)
-{
-        /** Don't check remote object version */
-        return 0;
-}
-
-
-/**
- * cmr moo_version_set().
- * No need to update remote object version here, it is done as a part
- * of reintegration of partial operation on the remote server.
- */
-static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
-                            dt_obj_version_t version)
-{
-        return;
-}
-
 /** Set of md_object_operations for cmr. */
 static const struct md_object_operations cmr_mo_ops = {
         .moo_permission    = cmr_permission,
@@ -1181,8 +1145,6 @@ static const struct md_object_operations cmr_mo_ops = {
         .moo_changelog     = cmr_changelog,
         .moo_capa_get      = cmr_capa_get,
         .moo_object_sync   = cmr_object_sync,
-        .moo_version_get   = cmr_version_get,
-        .moo_version_set   = cmr_version_set,
         .moo_path          = cmr_path,
         .moo_file_lock     = cmr_file_lock,
         .moo_file_unlock   = cmr_file_unlock,
index 96fb114..7908db8 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -48,7 +49,6 @@
 #ifdef __KERNEL__
 struct seq_thread_info {
         struct req_capsule     *sti_pill;
-        struct txn_param        sti_txn;
         struct lu_seq_range     sti_space;
         struct lu_buf           sti_buf;
 };
index c09cf0c..71e4f4d 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -88,6 +89,26 @@ void seq_update_cb(struct lu_env *env, struct thandle *th,
         OBD_FREE_PTR(ccb);
 }
 
+struct thandle *seq_store_trans_create(struct lu_server_seq *seq,
+                                       const struct lu_env *env)
+{
+        struct dt_device *dt_dev;
+
+        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+        return dt_trans_create(env, dt_dev);
+}
+
+int seq_store_trans_start(struct lu_server_seq *seq, const struct lu_env *env,
+                          struct thandle *th)
+{
+        struct dt_device *dt_dev;
+        ENTRY;
+
+        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+
+        return dt_trans_start(env, dt_dev, th);
+}
+
 int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq)
 {
         struct seq_update_callback *ccb;
@@ -106,6 +127,20 @@ int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq)
         return rc;
 }
 
+int seq_declare_store_write(struct lu_server_seq *seq,
+                            const struct lu_env *env,
+                            struct thandle *th)
+{
+        struct dt_object *dt_obj = seq->lss_obj;
+        int rc;
+        ENTRY;
+
+        rc = dt_obj->do_body_ops->dbo_declare_write(env, dt_obj,
+                                                    sizeof(struct lu_seq_range),
+                                                    0, th);
+        return rc;
+}
+
 /* This function implies that caller takes care about locking. */
 int seq_store_write(struct lu_server_seq *seq,
                     const struct lu_env *env,
@@ -141,36 +176,44 @@ int seq_store_write(struct lu_server_seq *seq,
 int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
                      struct lu_seq_range *out, int sync)
 {
-        struct seq_thread_info *info;
         struct dt_device *dt_dev;
         struct thandle *th;
         int rc;
-        int credits = SEQ_TXN_STORE_CREDITS;
         ENTRY;
 
         dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
-        info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
 
-        if (out != NULL)
-                credits += FLD_TXN_INDEX_INSERT_CREDITS;
-
-        txn_param_init(&info->sti_txn, credits);
-        th = dt_trans_start(env, dt_dev, &info->sti_txn);
+        th = seq_store_trans_create(seq, env);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
+        rc = seq_declare_store_write(seq, env, th);
+        if (rc)
+                GOTO(exit, rc);
+
+        if (out != NULL) {
+                rc = fld_declare_server_create(seq->lss_site->ms_server_fld,
+                                               env, th);
+                if (rc)
+                        GOTO(exit, rc);
+        }
+
+        rc = seq_store_trans_start(seq, env, th);
+        if (rc)
+                GOTO(exit, rc);
+
         rc = seq_store_write(seq, env, th);
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                        seq->lss_name, rc);
-                GOTO(out,rc);
+                GOTO(exit,rc);
         } else if (out != NULL) {
                 rc = fld_server_create(seq->lss_site->ms_server_fld,
                                        env, out, th);
                 if (rc) {
                         CERROR("%s: Can't Update fld database, rc %d\n",
                                seq->lss_name, rc);
-                        GOTO(out,rc);
+                        GOTO(exit,rc);
                 }
         }
 
@@ -181,7 +224,7 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
                 sync = !!seq_update_cb_add(th, seq);
 
         th->th_sync |= sync;
-out:
+exit:
         dt_trans_stop(env, dt_dev, th);
         return rc;
 }
index ae9520a..872358c 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -111,6 +112,30 @@ static void __exit fld_mod_exit(void)
         }
 }
 
+int fld_declare_server_create(struct lu_server_fld *fld,
+                              const struct lu_env *env,
+                              struct thandle *th)
+{
+        struct dt_object *dt_obj = fld->lsf_obj;
+        int rc;
+
+        ENTRY;
+
+        /* for ldiskfs OSD it's enough to declare operation with any ops
+         * with DMU we'll probably need to specify exact key/value */
+        rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
+        if (rc)
+                GOTO(out, rc);
+        rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
+        if (rc)
+                GOTO(out, rc);
+        rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
+                                                      NULL, NULL, th);
+out:
+        RETURN(rc);
+}
+EXPORT_SYMBOL(fld_declare_server_create);
+
 /**
  * Insert FLD index entry and update FLD cache.
  *
@@ -245,7 +270,6 @@ out:
 
         RETURN(rc);
 }
-
 EXPORT_SYMBOL(fld_server_create);
 
 /**
index 997e781..a3277db 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -112,19 +113,24 @@ static struct dt_rec *fld_rec(const struct lu_env *env,
         RETURN((void *)rec);
 }
 
-struct thandle* fld_trans_start(struct lu_server_fld *fld,
-                                const struct lu_env *env, int credit)
+struct thandle *fld_trans_create(struct lu_server_fld *fld,
+                                const struct lu_env *env)
 {
-        struct fld_thread_info *info;
         struct dt_device *dt_dev;
-        struct txn_param *p;
 
         dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
-        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
-        p = &info->fti_txn_param;
-        txn_param_init(p, credit);
 
-        return dt_dev->dd_ops->dt_trans_start(env, dt_dev, p);
+        return dt_dev->dd_ops->dt_trans_create(env, dt_dev);
+}
+
+int fld_trans_start(struct lu_server_fld *fld,
+                                const struct lu_env *env, struct thandle *th)
+{
+        struct dt_device *dt_dev;
+
+        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+
+        return dt_dev->dd_ops->dt_trans_start(env, dt_dev, th);
 }
 
 void fld_trans_stop(struct lu_server_fld *fld,
@@ -136,6 +142,26 @@ void fld_trans_stop(struct lu_server_fld *fld,
         dt_dev->dd_ops->dt_trans_stop(env, th);
 }
 
+int fld_declare_index_create(struct lu_server_fld *fld,
+                             const struct lu_env *env,
+                             const struct lu_seq_range *range,
+                             struct thandle *th)
+{
+        struct dt_object *dt_obj = fld->lsf_obj;
+        seqno_t start;
+        int rc;
+
+        ENTRY;
+
+        start = range->lsr_start;
+        LASSERT(range_is_sane(range));
+
+        rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
+                                                      fld_rec(env, range),
+                                                      fld_key(env, start), th);
+        RETURN(rc);
+}
+
 /**
  * insert range in fld store.
  *
@@ -192,9 +218,8 @@ int fld_index_delete(struct lu_server_fld *fld,
 
         ENTRY;
 
-        rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
-                                              fld_key(env, seq), th,
-                                              BYPASS_CAPA);
+        rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, fld_key(env, seq),
+                                              th, BYPASS_CAPA);
 
         CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n",
                fld->lsf_name, PRANGE(range), rc);
@@ -254,11 +279,22 @@ static int fld_insert_igif_fld(struct lu_server_fld *fld,
 {
         struct thandle *th;
         int rc;
-
         ENTRY;
-        th = fld_trans_start(fld, env, FLD_TXN_INDEX_INSERT_CREDITS);
+
+        /* FLD_TXN_INDEX_INSERT_CREDITS */
+        th = fld_trans_create(fld, env);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
+        rc = fld_declare_index_create(fld, env, &IGIF_FLD_RANGE, th);
+        if (rc) {
+                fld_trans_stop(fld, env, th);
+                RETURN(rc);
+        }
+        rc = fld_trans_start(fld, env, th);
+        if (rc) {
+                fld_trans_stop(fld, env, th);
+                RETURN(rc);
+        }
 
         rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
         fld_trans_stop(fld, env, th);
index 034d987..6179b3e 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -146,12 +147,13 @@ struct fld_thread_info {
         struct lu_seq_range fti_rec;
         struct lu_seq_range fti_lrange;
         struct lu_seq_range fti_irange;
-        struct txn_param    fti_txn_param;
 };
 
 
-struct thandle* fld_trans_start(struct lu_server_fld *fld,
-                                const struct lu_env *env, int credit);
+struct thandle *fld_trans_create(struct lu_server_fld *fld,
+                                const struct lu_env *env);
+int fld_trans_start(struct lu_server_fld *fld,
+                    const struct lu_env *env, struct thandle *th);
 
 void fld_trans_stop(struct lu_server_fld *fld,
                     const struct lu_env *env, struct thandle* th);
index db232da..d7db320 100644 (file)
@@ -65,11 +65,12 @@ struct proc_dir_entry;
 struct lustre_cfg;
 
 struct thandle;
-struct txn_param;
 struct dt_device;
 struct dt_object;
 struct dt_index_features;
 struct dt_quota_ctxt;
+struct niobuf_local;
+struct niobuf_remote;
 
 typedef enum {
         MNTOPT_USERXATTR        = 0x00000001,
@@ -82,6 +83,16 @@ struct dt_device_param {
         unsigned           ddp_block_shift;
         mntopt_t           ddp_mntopts;
         unsigned           ddp_max_ea_size;
+        void              *ddp_mnt; /* XXX: old code can retrieve mnt -bzzz */
+        int                ddp_mount_type;
+        unsigned long long ddp_maxbytes;
+        /* percentage of available space to reserve for grant error margin */
+        int                ddp_grant_reserved;
+        /* per-inode space consumption */
+        short              ddp_inodespace;
+        /* per-fragment grant overhead to be used by client for grant
+         * calculation */
+        int                ddp_grant_frag;
 };
 
 /**
@@ -100,25 +111,6 @@ struct dt_txn_commit_cb {
 };
 
 /**
- * Basic transaction credit op
- */
-enum dt_txn_op {
-        DTO_INDEX_INSERT,
-        DTO_INDEX_DELETE,
-        DTO_IDNEX_UPDATE,
-        DTO_OBJECT_CREATE,
-        DTO_OBJECT_DELETE,
-        DTO_ATTR_SET_BASE,
-        DTO_XATTR_SET,
-        DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */
-        DTO_WRITE_BASE,
-        DTO_WRITE_BLOCK,
-        DTO_ATTR_SET_CHOWN,
-
-        DTO_NR
-};
-
-/**
  * Operations on dt device.
  */
 struct dt_device_operations {
@@ -126,17 +118,21 @@ struct dt_device_operations {
          * Return device-wide statistics.
          */
         int   (*dt_statfs)(const struct lu_env *env,
-                           struct dt_device *dev, cfs_kstatfs_t *sfs);
+                           struct dt_device *dev, cfs_kstatfs_t *osfs);
+        /**
+         * Create transaction, described by \a param.
+         */
+        struct thandle *(*dt_trans_create)(const struct lu_env *env,
+                                           struct dt_device *dev);
         /**
          * Start transaction, described by \a param.
          */
-        struct thandle *(*dt_trans_start)(const struct lu_env *env,
-                                          struct dt_device *dev,
-                                          struct txn_param *param);
+        int   (*dt_trans_start)(const struct lu_env *env,
+                                struct dt_device *dev, struct thandle *th);
         /**
          * Finish previously started transaction.
          */
-        void  (*dt_trans_stop)(const struct lu_env *env,
+        int   (*dt_trans_stop)(const struct lu_env *env,
                                struct thandle *th);
         /**
          * Add commit callback to the transaction.
@@ -182,12 +178,6 @@ struct dt_device_operations {
         void (*dt_init_quota_ctxt)(const struct lu_env *env,
                                    struct dt_device *dev,
                                    struct dt_quota_ctxt *ctxt, void *data);
-
-        /**
-         *  get transaction credits for given \a op.
-         */
-        int (*dt_credit_get)(const struct lu_env *env, struct dt_device *dev,
-                             enum dt_txn_op);
 };
 
 struct dt_index_features {
@@ -272,7 +262,6 @@ struct dt_object_format {
 
 enum dt_format_type dt_mode_to_dft(__u32 mode);
 
-/** Version type. May differ in DMU and ldiskfs */
 typedef __u64 dt_obj_version_t;
 
 /**
@@ -312,6 +301,10 @@ struct dt_object_operations {
          *
          * precondition: dt_object_exists(dt);
          */
+        int   (*do_declare_attr_set)(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     const struct lu_attr *attr,
+                                     struct thandle *handle);
         int   (*do_attr_set)(const struct lu_env *env,
                              struct dt_object *dt,
                              const struct lu_attr *attr,
@@ -332,6 +325,11 @@ struct dt_object_operations {
          *
          * precondition: dt_object_exists(dt);
          */
+        int   (*do_declare_xattr_set)(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      const struct lu_buf *buf,
+                                      const char *name, int fl,
+                                      struct thandle *handle);
         int   (*do_xattr_set)(const struct lu_env *env,
                               struct dt_object *dt, const struct lu_buf *buf,
                               const char *name, int fl, struct thandle *handle,
@@ -341,6 +339,9 @@ struct dt_object_operations {
          *
          * precondition: dt_object_exists(dt);
          */
+        int   (*do_declare_xattr_del)(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      const char *name, struct thandle *handle);
         int   (*do_xattr_del)(const struct lu_env *env,
                               struct dt_object *dt,
                               const char *name, struct thandle *handle,
@@ -371,6 +372,12 @@ struct dt_object_operations {
          * precondition: !dt_object_exists(dt);
          * postcondition: ergo(result == 0, dt_object_exists(dt));
          */
+        int   (*do_declare_create)(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   struct lu_attr *attr,
+                                   struct dt_allocation_hint *hint,
+                                   struct dt_object_format *dof,
+                                   struct thandle *th);
         int   (*do_create)(const struct lu_env *env, struct dt_object *dt,
                            struct lu_attr *attr,
                            struct dt_allocation_hint *hint,
@@ -378,6 +385,17 @@ struct dt_object_operations {
                            struct thandle *th);
 
         /**
+          Destroy object on this device
+         * precondition: !dt_object_exists(dt);
+         * postcondition: ergo(result == 0, dt_object_exists(dt));
+         */
+        int   (*do_declare_destroy)(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct thandle *th);
+        int   (*do_destroy)(const struct lu_env *env, struct dt_object *dt,
+                            struct thandle *th);
+
+        /**
          * Announce that this object is going to be used as an index. This
          * operation check that object supports indexing operations and
          * installs appropriate dt_index_operations vector on success.
@@ -392,13 +410,17 @@ struct dt_object_operations {
          * Add nlink of the object
          * precondition: dt_object_exists(dt);
          */
-        void  (*do_ref_add)(const struct lu_env *env,
+        int   (*do_declare_ref_add)(const struct lu_env *env,
+                                    struct dt_object *dt, struct thandle *th);
+        int   (*do_ref_add)(const struct lu_env *env,
                             struct dt_object *dt, struct thandle *th);
         /**
          * Del nlink of the object
          * precondition: dt_object_exists(dt);
          */
-        void  (*do_ref_del)(const struct lu_env *env,
+        int   (*do_declare_ref_del)(const struct lu_env *env,
+                                    struct dt_object *dt, struct thandle *th);
+        int   (*do_ref_del)(const struct lu_env *env,
                             struct dt_object *dt, struct thandle *th);
 
         struct obd_capa *(*do_capa_get)(const struct lu_env *env,
@@ -406,10 +428,6 @@ struct dt_object_operations {
                                         struct lustre_capa *old,
                                         __u64 opc);
         int (*do_object_sync)(const struct lu_env *, struct dt_object *);
-        dt_obj_version_t (*do_version_get)(const struct lu_env *env,
-                                           struct dt_object *dt);
-        void (*do_version_set)(const struct lu_env *env, struct dt_object *dt,
-                               dt_obj_version_t new_version);
         /**
          * Get object info of next level. Currently, only get inode from osd.
          * This is only used by quota b=16542
@@ -432,10 +450,66 @@ struct dt_body_operations {
         /**
          * precondition: dt_object_exists(dt);
          */
+        ssize_t (*dbo_declare_write)(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     const loff_t size, loff_t pos,
+                                     struct thandle *handle);
         ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
                              const struct lu_buf *buf, loff_t *pos,
                              struct thandle *handle, struct lustre_capa *capa,
                              int ignore_quota);
+        /*
+         * methods for zero-copy IO
+         */
+
+        /*
+         * precondition: dt_object_exists(dt);
+         * returns:
+         * < 0 - error code
+         * = 0 - illegal
+         * > 0 - number of local buffers prepared
+         */
+        int (*dbo_bufs_get)(const struct lu_env *env, struct dt_object *dt,
+                            loff_t pos, ssize_t len, struct niobuf_local *lb,
+                            int rw, struct lustre_capa *capa);
+        /*
+         * precondition: dt_object_exists(dt);
+         */
+        int (*dbo_bufs_put)(const struct lu_env *env, struct dt_object *dt,
+                            struct niobuf_local *lb, int nr);
+        /*
+         * precondition: dt_object_exists(dt);
+         */
+        int (*dbo_write_prep)(const struct lu_env *env, struct dt_object *dt,
+                              struct niobuf_local *lb, int nr);
+        /*
+         * precondition: dt_object_exists(dt);
+         */
+        int (*dbo_declare_write_commit)(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        struct niobuf_local *,
+                                        int, struct thandle *);
+        /*
+         * precondition: dt_object_exists(dt);
+         */
+        int (*dbo_write_commit)(const struct lu_env *env, struct dt_object *dt,
+                                struct niobuf_local *, int, struct thandle *);
+        /*
+         * precondition: dt_object_exists(dt);
+         */
+        int (*dbo_read_prep)(const struct lu_env *env, struct dt_object *dt,
+                             struct niobuf_local *lnb, int nr);
+        int (*dbo_fiemap_get)(const struct lu_env *env, struct dt_object *dt,
+                              struct ll_user_fiemap *fm);
+        /**
+         * Punch object's content
+         * precondition: regular object, not index
+         */
+        int   (*do_declare_punch)(const struct lu_env *, struct dt_object *,
+                                  __u64, __u64, struct thandle *th);
+        int   (*do_punch)(const struct lu_env *env, struct dt_object *dt,
+                          __u64 start, __u64 end, struct thandle *th,
+                          struct lustre_capa *capa);
 };
 
 /**
@@ -466,6 +540,11 @@ struct dt_index_operations {
         /**
          * precondition: dt_object_exists(dt);
          */
+        int (*dio_declare_insert)(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  const struct dt_rec *rec,
+                                  const struct dt_key *key,
+                                  struct thandle *handle);
         int (*dio_insert)(const struct lu_env *env, struct dt_object *dt,
                           const struct dt_rec *rec, const struct dt_key *key,
                           struct thandle *handle, struct lustre_capa *capa,
@@ -473,6 +552,10 @@ struct dt_index_operations {
         /**
          * precondition: dt_object_exists(dt);
          */
+        int (*dio_declare_delete)(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  const struct dt_key *key,
+                                  struct thandle *handle);
         int (*dio_delete)(const struct lu_env *env, struct dt_object *dt,
                           const struct dt_key *key, struct thandle *handle,
                           struct lustre_capa *capa);
@@ -504,12 +587,14 @@ struct dt_index_operations {
                                       const struct dt_it *di);
                 int            (*rec)(const struct lu_env *env,
                                       const struct dt_it *di,
-                                      struct lu_dirent *lde,
+                                      struct dt_rec *rec,
                                       __u32 attr);
                 __u64        (*store)(const struct lu_env *env,
                                       const struct dt_it *di);
                 int           (*load)(const struct lu_env *env,
                                       const struct dt_it *di, __u64 hash);
+                int        (*key_rec)(const struct lu_env *env,
+                                      const struct dt_it *di, void* key_rec);
         } dio_it;
 };
 
@@ -546,6 +631,12 @@ struct dt_object {
         const struct dt_index_operations  *do_index_ops;
 };
 
+static inline struct dt_object *lu2dt(struct lu_object *l)
+{
+        LASSERT(l == NULL || IS_ERR(l) || lu_device_is_dt(l->lo_dev));
+        return container_of0(l, struct dt_object, do_lu);
+}
+
 int  dt_object_init(struct dt_object *obj,
                     struct lu_object_header *h, struct lu_device *d);
 
@@ -556,23 +647,6 @@ static inline int dt_object_exists(const struct dt_object *dt)
         return lu_object_exists(&dt->do_lu);
 }
 
-struct txn_param {
-        /** number of blocks this transaction will modify */
-        unsigned int tp_credits;
-};
-
-static inline void txn_param_init(struct txn_param *p, unsigned int credits)
-{
-        memset(p, 0, sizeof(*p));
-        p->tp_credits = credits;
-}
-
-static inline void txn_param_credit_add(struct txn_param *p,
-                                        unsigned int credits)
-{
-        p->tp_credits += credits;
-}
-
 /**
  * This is the general purpose transaction handle.
  * 1. Transaction Life Cycle
@@ -591,14 +665,21 @@ struct thandle {
         /** the dt device on which the transactions are executed */
         struct dt_device *th_dev;
 
+        /** additional tags (layers can add in declare) */
+        __u32             th_tags;
+
         /** context for this transaction, tag is LCT_TX_HANDLE */
         struct lu_context th_ctx;
 
         /** the last operation result in this transaction.
          * this value is used in recovery */
         __s32             th_result;
+
         /** whether we need sync commit */
-        int               th_sync;
+        int               th_sync:1;
+
+        /* local transation, no need to inform other layers */
+        int               th_local:1;
 };
 
 /**
@@ -614,7 +695,7 @@ struct thandle {
  */
 struct dt_txn_callback {
         int (*dtc_txn_start)(const struct lu_env *env,
-                             struct txn_param *param, void *cookie);
+                             struct thandle *txn, void *cookie);
         int (*dtc_txn_stop)(const struct lu_env *env,
                             struct thandle *txn, void *cookie);
         void (*dtc_txn_commit)(struct thandle *txn, void *cookie);
@@ -627,7 +708,7 @@ void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb);
 void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb);
 
 int dt_txn_hook_start(const struct lu_env *env,
-                      struct dt_device *dev, struct txn_param *param);
+                      struct dt_device *dev, struct thandle *txn);
 int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn);
 void dt_txn_hook_commit(struct thandle *txn);
 
@@ -657,19 +738,12 @@ struct dt_object *dt_locate(const struct lu_env *env,
                             struct dt_device *dev,
                             const struct lu_fid *fid);
 
-static inline dt_obj_version_t do_version_get(const struct lu_env *env,
-                                              struct dt_object *o)
-{
-        LASSERT(o->do_ops->do_version_get);
-        return o->do_ops->do_version_get(env, o);
-}
+int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
+                           struct thandle *th);
+void dt_version_set(const struct lu_env *env, struct dt_object *o,
+                    dt_obj_version_t version, struct thandle *th);
+dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o);
 
-static inline void do_version_set(const struct lu_env *env,
-                                  struct dt_object *o, dt_obj_version_t v)
-{
-        LASSERT(o->do_ops->do_version_set);
-        return o->do_ops->do_version_set(env, o, v);
-}
 
 int dt_record_read(const struct lu_env *env, struct dt_object *dt,
                    struct lu_buf *buf, loff_t *pos);
@@ -677,17 +751,32 @@ int dt_record_write(const struct lu_env *env, struct dt_object *dt,
                     const struct lu_buf *buf, loff_t *pos, struct thandle *th);
 
 
-static inline struct thandle *dt_trans_start(const struct lu_env *env,
-                                             struct dt_device *d,
-                                             struct txn_param *p)
+static inline struct thandle *dt_trans_create(const struct lu_env *env,
+                                              struct dt_device *d)
 {
-        LASSERT(d->dd_ops->dt_trans_start);
-        return d->dd_ops->dt_trans_start(env, d, p);
+        LASSERT(d->dd_ops->dt_trans_create);
+        return d->dd_ops->dt_trans_create(env, d);
 }
 
-static inline void dt_trans_stop(const struct lu_env *env,
+static inline int dt_trans_start(const struct lu_env *env,
                                  struct dt_device *d, struct thandle *th)
 {
+        LASSERT(d->dd_ops->dt_trans_start);
+        return d->dd_ops->dt_trans_start(env, d, th);
+}
+
+/* for this transaction hooks shouldn't be called */
+static inline int dt_trans_start_local(const struct lu_env *env,
+                                       struct dt_device *d, struct thandle *th)
+{
+        LASSERT(d->dd_ops->dt_trans_start);
+        th->th_local = 1;
+        return d->dd_ops->dt_trans_start(env, d, th);
+}
+
+static inline int dt_trans_stop(const struct lu_env *env,
+                                struct dt_device *d, struct thandle *th)
+{
         LASSERT(d->dd_ops->dt_trans_stop);
         return d->dd_ops->dt_trans_stop(env, th);
 }
@@ -699,4 +788,465 @@ static inline int dt_trans_cb_add(struct thandle *th,
         return th->th_dev->dd_ops->dt_trans_cb_add(th, dcb);
 }
 /** @} dt */
+
+
+static inline int dt_declare_record_write(const struct lu_env *env,
+                                          struct dt_object *dt,
+                                          int size, loff_t pos,
+                                          struct thandle *th)
+{
+        int rc;
+
+        LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
+        LASSERT(th != NULL);
+        rc = dt->do_body_ops->dbo_declare_write(env, dt, size, pos, th);
+        return rc;
+}
+
+static inline int dt_declare_create(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct lu_attr *attr,
+                                    struct dt_allocation_hint *hint,
+                                    struct dt_object_format *dof,
+                                    struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_declare_create);
+        return dt->do_ops->do_declare_create(env, dt, attr, hint, dof, th);
+}
+
+static inline int dt_create(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct lu_attr *attr,
+                                    struct dt_allocation_hint *hint,
+                                    struct dt_object_format *dof,
+                                    struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_create);
+        return dt->do_ops->do_create(env, dt, attr, hint, dof, th);
+}
+
+static inline int dt_declare_destroy(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_declare_destroy);
+        return dt->do_ops->do_declare_destroy(env, dt, th);
+}
+
+static inline int dt_destroy(const struct lu_env *env,
+                             struct dt_object *dt,
+                             struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_destroy);
+        return dt->do_ops->do_destroy(env, dt, th);
+}
+
+static inline void dt_read_lock(const struct lu_env *env,
+                                struct dt_object *dt,
+                                unsigned role)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_read_lock);
+        dt->do_ops->do_read_lock(env, dt, role);
+}
+
+static inline void dt_write_lock(const struct lu_env *env,
+                                struct dt_object *dt,
+                                unsigned role)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_write_lock);
+        dt->do_ops->do_write_lock(env, dt, role);
+}
+
+static inline void dt_read_unlock(const struct lu_env *env,
+                                struct dt_object *dt)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_read_unlock);
+        dt->do_ops->do_read_unlock(env, dt);
+}
+
+static inline void dt_write_unlock(const struct lu_env *env,
+                                struct dt_object *dt)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_write_unlock);
+        dt->do_ops->do_write_unlock(env, dt);
+}
+
+static inline int dt_write_locked(const struct lu_env *env,
+                                  struct dt_object *dt)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_write_locked);
+        return dt->do_ops->do_write_locked(env, dt);
+}
+
+static inline int dt_attr_get(const struct lu_env *env, struct dt_object *dt,
+                              struct lu_attr *la, void *arg)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_attr_get);
+        return dt->do_ops->do_attr_get(env, dt, la, arg);
+}
+
+static inline int dt_declare_attr_set(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      const struct lu_attr *la,
+                                      struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_declare_attr_set);
+        return dt->do_ops->do_declare_attr_set(env, dt, la, th);
+}
+
+static inline int dt_attr_set(const struct lu_env *env, struct dt_object *dt,
+                              const struct lu_attr *la, struct thandle *th,
+                              struct lustre_capa *capa)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_attr_set);
+        return dt->do_ops->do_attr_set(env, dt, la, th, capa);
+}
+
+static inline int dt_declare_ref_add(const struct lu_env *env,
+                                     struct dt_object *dt, struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_declare_ref_add);
+        return dt->do_ops->do_declare_ref_add(env, dt, th);
+}
+
+static inline int dt_ref_add(const struct lu_env *env,
+                             struct dt_object *dt, struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_ref_add);
+        return dt->do_ops->do_ref_add(env, dt, th);
+}
+
+static inline int dt_declare_ref_del(const struct lu_env *env,
+                                     struct dt_object *dt, struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_declare_ref_del);
+        return dt->do_ops->do_declare_ref_del(env, dt, th);
+}
+
+static inline int dt_ref_del(const struct lu_env *env,
+                             struct dt_object *dt, struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_ref_del);
+        return dt->do_ops->do_ref_del(env, dt, th);
+}
+
+static inline struct obd_capa *dt_capa_get(const struct lu_env *env,
+                                           struct dt_object *dt,
+                                           struct lustre_capa *old, __u64 opc)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_ref_del);
+        return dt->do_ops->do_capa_get(env, dt, old, opc);
+}
+
+static inline int dt_bufs_get(const struct lu_env *env, struct dt_object *d,
+                              struct niobuf_remote *rnb,
+                              struct niobuf_local *lnb, int rw,
+                              struct lustre_capa *capa)
+{
+        LASSERT(d);
+        LASSERT(d->do_body_ops);
+        LASSERT(d->do_body_ops->dbo_bufs_get);
+        return d->do_body_ops->dbo_bufs_get(env, d, rnb->offset,
+                                            rnb->len, lnb, rw, capa);
+}
+
+static inline int dt_bufs_put(const struct lu_env *env, struct dt_object *d,
+                              struct niobuf_local *lnb, int n)
+{
+        LASSERT(d);
+        LASSERT(d->do_body_ops);
+        LASSERT(d->do_body_ops->dbo_bufs_put);
+        return d->do_body_ops->dbo_bufs_put(env, d, lnb, n);
+}
+
+static inline int dt_write_prep(const struct lu_env *env, struct dt_object *d,
+                                struct niobuf_local *lnb, int n)
+{
+        LASSERT(d);
+        LASSERT(d->do_body_ops);
+        LASSERT(d->do_body_ops->dbo_write_prep);
+        return d->do_body_ops->dbo_write_prep(env, d, lnb, n);
+}
+
+static inline int dt_declare_write_commit(const struct lu_env *env,
+                                          struct dt_object *d,
+                                          struct niobuf_local *lnb,
+                                          int n, struct thandle *th)
+{
+        LASSERTF(d != NULL, "dt is NULL when we want to declare write\n");
+        LASSERT(th != NULL);
+        return d->do_body_ops->dbo_declare_write_commit(env, d, lnb, n, th);
+}
+
+
+static inline int dt_write_commit(const struct lu_env *env,
+                                  struct dt_object *d, struct niobuf_local *lnb,
+                                  int n, struct thandle *th)
+{
+        LASSERT(d);
+        LASSERT(d->do_body_ops);
+        LASSERT(d->do_body_ops->dbo_write_commit);
+        return d->do_body_ops->dbo_write_commit(env, d, lnb, n, th);
+}
+
+static inline int dt_read_prep(const struct lu_env *env, struct dt_object *d,
+                               struct niobuf_local *lnb, int n)
+{
+        LASSERT(d);
+        LASSERT(d->do_body_ops);
+        LASSERT(d->do_body_ops->dbo_read_prep);
+        return d->do_body_ops->dbo_read_prep(env, d, lnb, n);
+}
+
+static inline int dt_declare_punch(const struct lu_env *env,
+                                   struct dt_object *dt, __u64 start,
+                                   __u64 end, struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_body_ops);
+        LASSERT(dt->do_body_ops->do_declare_punch);
+        return dt->do_body_ops->do_declare_punch(env, dt, start, end, th);
+}
+
+static inline int dt_punch(const struct lu_env *env, struct dt_object *dt,
+                           __u64 start, __u64 end, struct thandle *th,
+                           struct lustre_capa *capa)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_body_ops);
+        LASSERT(dt->do_body_ops->do_punch);
+        return dt->do_body_ops->do_punch(env, dt, start, end, th, capa);
+}
+
+static inline int dt_fiemap_get(const struct lu_env *env, struct dt_object *d,
+                                struct ll_user_fiemap *fm)
+{
+        LASSERT(d);
+        if (d->do_body_ops == NULL)
+                return -EPROTO;
+        LASSERT(d->do_body_ops->dbo_fiemap_get);
+        return d->do_body_ops->dbo_fiemap_get(env, d, fm);
+}
+
+static inline int dt_statfs(const struct lu_env *env, struct dt_device *dev,
+                            cfs_kstatfs_t *osfs)
+{
+        LASSERT(dev);
+        LASSERT(dev->dd_ops);
+        LASSERT(dev->dd_ops->dt_statfs);
+        return dev->dd_ops->dt_statfs(env, dev, osfs);
+}
+
+static inline int dt_root_get(const struct lu_env *env, struct dt_device *dev,
+                              struct lu_fid *f)
+{
+        LASSERT(dev);
+        LASSERT(dev->dd_ops);
+        LASSERT(dev->dd_ops->dt_root_get);
+        return dev->dd_ops->dt_root_get(env, dev, f);
+}
+
+static inline void dt_conf_get(const struct lu_env *env,
+                               const struct dt_device *dev,
+                               struct dt_device_param *param)
+{
+        LASSERT(dev);
+        LASSERT(dev->dd_ops);
+        LASSERT(dev->dd_ops->dt_conf_get);
+        return dev->dd_ops->dt_conf_get(env, dev, param);
+}
+
+static inline int dt_sync(const struct lu_env *env, struct dt_device *dev)
+{
+        LASSERT(dev);
+        LASSERT(dev->dd_ops);
+        LASSERT(dev->dd_ops->dt_sync);
+        return dev->dd_ops->dt_sync(env, dev);
+}
+
+static inline void dt_ro(const struct lu_env *env, struct dt_device *dev)
+{
+        LASSERT(dev);
+        LASSERT(dev->dd_ops);
+        LASSERT(dev->dd_ops->dt_ro);
+        return dev->dd_ops->dt_ro(env, dev);
+}
+
+static inline int dt_declare_insert(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const struct dt_rec *rec,
+                                    const struct dt_key *key,
+                                    struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_index_ops);
+        LASSERT(dt->do_index_ops->dio_declare_insert);
+        return dt->do_index_ops->dio_declare_insert(env, dt, rec, key, th);
+}
+
+static inline int dt_insert(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const struct dt_rec *rec,
+                                    const struct dt_key *key,
+                                    struct thandle *th,
+                                    struct lustre_capa *capa,
+                                    int noquota)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_index_ops);
+        LASSERT(dt->do_index_ops->dio_insert);
+        return dt->do_index_ops->dio_insert(env, dt, rec, key, th,
+                                            capa, noquota);
+}
+
+static inline int dt_declare_xattr_del(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const char *name,
+                                       struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_declare_xattr_del);
+        return dt->do_ops->do_declare_xattr_del(env, dt, name, th);
+}
+
+static inline int dt_xattr_del(const struct lu_env *env,
+                               struct dt_object *dt, const char *name,
+                               struct thandle *th,
+                               struct lustre_capa *capa)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_xattr_del);
+        return dt->do_ops->do_xattr_del(env, dt, name, th, capa);
+}
+
+static inline int dt_declare_xattr_set(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      const struct lu_buf *buf,
+                                      const char *name, int fl,
+                                      struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_declare_xattr_set);
+        return dt->do_ops->do_declare_xattr_set(env, dt, buf, name, fl, th);
+}
+
+static inline int dt_xattr_set(const struct lu_env *env,
+                              struct dt_object *dt, const struct lu_buf *buf,
+                              const char *name, int fl, struct thandle *th,
+                              struct lustre_capa *capa)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_xattr_set);
+        return dt->do_ops->do_xattr_set(env, dt, buf, name, fl, th, capa);
+}
+
+static inline int dt_xattr_get(const struct lu_env *env,
+                              struct dt_object *dt, struct lu_buf *buf,
+                              const char *name, struct lustre_capa *capa)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_xattr_get);
+        return dt->do_ops->do_xattr_get(env, dt, buf, name, capa);
+}
+
+static inline int dt_xattr_list(const struct lu_env *env,
+                               struct dt_object *dt, struct lu_buf *buf,
+                               struct lustre_capa *capa)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_ops);
+        LASSERT(dt->do_ops->do_xattr_list);
+        return dt->do_ops->do_xattr_list(env, dt, buf, capa);
+}
+
+static inline int dt_declare_delete(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const struct dt_key *key,
+                                    struct thandle *th)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_index_ops);
+        LASSERT(dt->do_index_ops->dio_declare_delete);
+        return dt->do_index_ops->dio_declare_delete(env, dt, key, th);
+}
+
+static inline int dt_delete(const struct lu_env *env,
+                            struct dt_object *dt,
+                            const struct dt_key *key,
+                            struct thandle *th,
+                            struct lustre_capa *capa)
+{
+        LASSERT(dt);
+        LASSERT(dt->do_index_ops);
+        LASSERT(dt->do_index_ops->dio_delete);
+        return dt->do_index_ops->dio_delete(env, dt, key, th, capa);
+}
+
+static inline int dt_commit_async(const struct lu_env *env,
+                                  struct dt_device *dev)
+{
+        LASSERT(dev);
+        LASSERT(dev->dd_ops);
+        LASSERT(dev->dd_ops->dt_commit_async);
+        return dev->dd_ops->dt_commit_async(env, dev);
+}
+
+static inline int dt_lookup(const struct lu_env *env,
+                            struct dt_object *dt,
+                            struct dt_rec *rec,
+                            const struct dt_key *key,
+                            struct lustre_capa *capa)
+{
+        int ret;
+
+        LASSERT(dt);
+        LASSERT(dt->do_index_ops);
+        LASSERT(dt->do_index_ops->dio_lookup);
+
+        ret = dt->do_index_ops->dio_lookup(env, dt, rec, key, capa);
+        if (ret > 0)
+                ret = 0;
+        else if (ret == 0)
+                ret = -ENOENT;
+        return ret;
+}
 #endif /* __LUSTRE_DT_OBJECT_H */
index f030e2b..c7b341e 100644 (file)
@@ -1337,6 +1337,7 @@ struct lov_mds_md_v1 {            /* LOV EA mds/wire data (little-endian) */
 #define XATTR_NAME_LMA          "trusted.lma"
 #define XATTR_NAME_LMV          "trusted.lmv"
 #define XATTR_NAME_LINK         "trusted.link"
+#define XATTR_NAME_VERSION      "trusted.version"
 
 
 struct lov_mds_md_v3 {            /* LOV EA mds/wire data (little-endian) */
index a0328d1..256879c 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -155,6 +156,10 @@ int fld_server_init(struct lu_server_fld *fld,
 void fld_server_fini(struct lu_server_fld *fld,
                      const struct lu_env *env);
 
+int fld_declare_server_create(struct lu_server_fld *fld,
+                              const struct lu_env *env,
+                              struct thandle *th);
+
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
                       struct lu_seq_range *add_range,
index 0346116..f55856c 100644 (file)
@@ -278,10 +278,6 @@ struct md_object_operations {
                             struct lustre_capa *, int renewal);
 
         int (*moo_object_sync)(const struct lu_env *, struct md_object *);
-        dt_obj_version_t (*moo_version_get)(const struct lu_env *,
-                                            struct md_object *);
-        void (*moo_version_set)(const struct lu_env *, struct md_object *,
-                                dt_obj_version_t);
         int (*moo_path)(const struct lu_env *env, struct md_object *obj,
                         char *path, int pathlen, __u64 *recno, int *linkno);
         int (*moo_file_lock)(const struct lu_env *env, struct md_object *obj,
@@ -756,20 +752,6 @@ static inline int mo_object_sync(const struct lu_env *env, struct md_object *m)
         return m->mo_ops->moo_object_sync(env, m);
 }
 
-static inline dt_obj_version_t mo_version_get(const struct lu_env *env,
-                                              struct md_object *m)
-{
-        LASSERT(m->mo_ops->moo_version_get);
-        return m->mo_ops->moo_version_get(env, m);
-}
-
-static inline void mo_version_set(const struct lu_env *env,
-                                  struct md_object *m, dt_obj_version_t ver)
-{
-        LASSERT(m->mo_ops->moo_version_set);
-        return m->mo_ops->moo_version_set(env, m, ver);
-}
-
 static inline int mo_file_lock(const struct lu_env *env, struct md_object *m,
                                struct lov_mds_md *lmm,
                                struct ldlm_extent *extent,
index 2747214..0c891fa 100644 (file)
@@ -91,7 +91,7 @@ static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
         mdd->mdd_child = lu2dt_dev(next);
 
         /* Prepare transactions callbacks. */
-        mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
+        mdd->mdd_txn_cb.dtc_txn_start = NULL;
         mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
         mdd->mdd_txn_cb.dtc_txn_commit = NULL;
         mdd->mdd_txn_cb.dtc_cookie = mdd;
@@ -136,6 +136,10 @@ static void mdd_device_shutdown(const struct lu_env *env,
         if (m->mdd_obd_dev)
                 mdd_fini_obd(env, m, cfg);
         orph_index_fini(env, m);
+        if (m->mdd_capa != NULL) {
+                lu_object_put(env, &m->mdd_capa->do_lu);
+                m->mdd_capa = NULL;
+        }
         /* remove upcall device*/
         md_upcall_fini(&m->mdd_md_dev);
         EXIT;
@@ -367,6 +371,7 @@ int mdd_changelog_llog_cancel(struct mdd_device *mdd, long long endrec)
            time.  In case of crash, we just restart with old log so we're
            allright. */
         if (endrec == cur) {
+                /* XXX: transaction is started by llog itself */
                 rc = mdd_changelog_write_header(mdd, CLM_PURGE);
                 if (rc)
                       goto out;
@@ -377,6 +382,7 @@ int mdd_changelog_llog_cancel(struct mdd_device *mdd, long long endrec)
            changed since the last purge) */
         mdd->mdd_cl.mc_starttime = cfs_time_current_64();
 
+        /* XXX: transaction is started by llog itself */
         rc = llog_cancel(ctxt, NULL, 1, (struct llog_cookie *)&endrec, 0);
 out:
         llog_ctxt_put(ctxt);
@@ -409,6 +415,7 @@ int mdd_changelog_write_header(struct mdd_device *mdd, int markerflags)
         /* Status and action flags */
         rec->cr.cr_markerflags = mdd->mdd_cl.mc_flags | markerflags;
 
+        /* XXX: transaction is started by llog itself */
         rc = (mdd->mdd_cl.mc_mask & (1 << CL_MARK)) ?
                 mdd_changelog_llog_write(mdd, rec, NULL) : 0;
 
@@ -559,19 +566,6 @@ static int dot_lustre_mdd_object_sync(const struct lu_env *env,
         return -ENOSYS;
 }
 
-static dt_obj_version_t dot_lustre_mdd_version_get(const struct lu_env *env,
-                                                   struct md_object *obj)
-{
-        return 0;
-}
-
-static void dot_lustre_mdd_version_set(const struct lu_env *env,
-                                       struct md_object *obj,
-                                       dt_obj_version_t version)
-{
-        return;
-}
-
 static int dot_lustre_mdd_path(const struct lu_env *env, struct md_object *obj,
                            char *path, int pathlen, __u64 *recno, int *linkno)
 {
@@ -608,8 +602,6 @@ static struct md_object_operations mdd_dot_lustre_obj_ops = {
         .moo_close         = dot_lustre_mdd_close,
         .moo_capa_get      = mdd_capa_get,
         .moo_object_sync   = dot_lustre_mdd_object_sync,
-        .moo_version_get   = dot_lustre_mdd_version_get,
-        .moo_version_set   = dot_lustre_mdd_version_set,
         .moo_path          = dot_lustre_mdd_path,
         .moo_file_lock     = dot_file_lock,
         .moo_file_unlock   = dot_file_unlock,
@@ -994,12 +986,9 @@ static int mdd_process_config(const struct lu_env *env,
 
                 rc = mdd_init_obd(env, m, cfg);
                 if (rc) {
-                        CERROR("lov init error %d \n", rc);
+                        CERROR("lov init error %d\n", rc);
                         GOTO(out, rc);
                 }
-                rc = mdd_txn_init_credits(env, m);
-                if (rc)
-                        break;
 
                 mdd_changelog_init(env, m);
                 break;
@@ -1089,6 +1078,7 @@ static int mdd_prepare(const struct lu_env *env,
         struct mdd_device *mdd = lu2mdd_dev(cdev);
         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
         struct dt_object *root;
+        struct lu_fid     fid;
         int rc;
 
         ENTRY;
@@ -1115,6 +1105,14 @@ static int mdd_prepare(const struct lu_env *env,
                 GOTO(out, rc);
         }
 
+        /* we use capa file to declare llog changes,
+         * will be fixed with new llog in 2.3 */
+        root = dt_store_open(env, mdd->mdd_child, "", CAPA_KEYS, &fid);
+        if (!IS_ERR(root))
+                mdd->mdd_capa = root;
+        else
+                rc = PTR_ERR(root);
+
 out:
         RETURN(rc);
 }
@@ -1350,6 +1348,35 @@ out:
         RETURN(rc);
 }
 
+int mdd_declare_llog_cancel(const struct lu_env *env, struct mdd_device *mdd,
+                            struct thandle *handle)
+{
+        int rc;
+
+
+        /* XXX: this is a temporary solution to declare llog changes
+         *      will be fixed in 2.3 with new llog implementation */
+
+        LASSERT(mdd->mdd_capa);
+
+        /* the llog record could be canceled either by modifying
+         * the plain llog's header or by destroying the llog itself
+         * when this record is the last one in it, it can't be known
+         * here, but the catlog's header will also be modified for
+         * the second case, then the first case can be covered and
+         * is no need to declare it */
+
+        /* destroy empty plain log */
+        rc = dt_declare_destroy(env, mdd->mdd_capa, handle);
+        if (rc)
+                return rc;
+
+        /* record the catlog's header if an empty plain log was destroyed */
+        rc = dt_declare_record_write(env, mdd->mdd_capa,
+                                     sizeof(struct llog_logid_rec), 0, handle);
+        return rc;
+}
+
 struct mdd_changelog_user_data {
         __u64 mcud_endrec; /**< purge record for this user */
         __u64 mcud_minrec; /**< lowest changelog recno still referenced */
@@ -1401,7 +1428,7 @@ static int mdd_changelog_user_purge_cb(struct llog_handle *llh,
         /* Special case: unregister this user */
         if (mcud->mcud_endrec == MCUD_UNREGISTER) {
                 struct llog_cookie cookie;
-                void *trans_h;
+                void *th;
                 struct mdd_device *mdd = mcud->mcud_mdd;
 
                 cookie.lgc_lgl = llh->lgh_id;
@@ -1409,20 +1436,27 @@ static int mdd_changelog_user_purge_cb(struct llog_handle *llh,
 
                 /* XXX This is a workaround for the deadlock of changelog
                  * adding vs. changelog cancelling. LU-81. */
-                mdd_txn_param_build(mcud->mcud_env, mdd, MDD_TXN_UNLINK_OP, 0);
-                trans_h = mdd_trans_start(mcud->mcud_env, mdd);
-                if (IS_ERR(trans_h)) {
-                        CERROR("fsfilt_start_log failed: %ld\n",
-                               PTR_ERR(trans_h));
-                        RETURN(PTR_ERR(trans_h));
+                th = mdd_trans_create(mcud->mcud_env, mdd);
+                if (IS_ERR(th)) {
+                        CERROR("Cannot get thandle\n");
+                        RETURN(-ENOMEM);
                 }
 
+                rc = mdd_declare_llog_cancel(mcud->mcud_env, mdd, th); 
+                if (rc)
+                        GOTO(stop, rc);
+
+                rc = mdd_trans_start(mcud->mcud_env, mdd, th);
+                if (rc)
+                        GOTO(stop, rc);
+
                 rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle,
                                              1, &cookie);
                 if (rc == 0)
                         mcud->mcud_usercount--;
 
-                mdd_trans_stop(mcud->mcud_env, mdd, rc, trans_h);
+stop:
+                mdd_trans_stop(mcud->mcud_env, mdd, 0, th);
                 RETURN(rc);
         }
 
index 14c0ed8..38ec54a 100644 (file)
@@ -81,6 +81,9 @@ static struct lu_name lname_dotdot = {
 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                         const struct lu_name *lname, struct lu_fid* fid,
                         int mask);
+static int mdd_declare_links_add(const struct lu_env *env,
+                                 struct mdd_object *mdd_obj,
+                                 struct thandle *handle);
 static int mdd_links_add(const struct lu_env *env,
                          struct mdd_object *mdd_obj,
                          const struct lu_fid *pfid,
@@ -609,6 +612,63 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
         RETURN(rc);
 }
 
+int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
+                            int reclen, struct thandle *handle)
+{
+        int rc;
+
+        /* XXX: this is a temporary solution to declare llog changes
+         *      will be fixed in 2.3 with new llog implementation */
+
+        LASSERT(mdd->mdd_capa);
+
+        /* record itself */
+        rc = dt_declare_record_write(env, mdd->mdd_capa, reclen, 0, handle);
+        if (rc)
+                return rc;
+
+        /* header will be updated as well */
+        rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE,
+                                     0, handle);
+        if (rc)
+                return rc;
+
+        /* also we should be able to create new plain log */
+        rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
+        if (rc)
+                return rc;
+
+        /* new record referencing new plain llog */
+        rc = dt_declare_record_write(env, mdd->mdd_capa,
+                                     sizeof(struct llog_logid_rec), 0, handle);
+        if (rc)
+                return rc;
+
+        /* catalog's header will be updated as well */
+        rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE,
+                                     0, handle);
+
+        return rc;
+}
+
+int mdd_declare_changelog_store(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                const struct lu_name *fname,
+                                struct thandle *handle)
+{
+        int reclen;
+
+        /* Not recording */
+        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
+                return 0;
+
+        /* we'll be writing payload + llog header */
+        reclen = sizeof(struct llog_changelog_rec);
+        if (fname)
+                reclen += llog_data_len(fname->ln_namelen);
+
+        return mdd_declare_llog_record(env, mdd, reclen, handle);
+}
 
 /** Store a namespace change changelog record
  * If this fails, we must fail the whole transaction; we don't
@@ -674,6 +734,40 @@ static int mdd_changelog_ns_store(const struct lu_env  *env,
         return 0;
 }
 
+static int mdd_declare_link(const struct lu_env *env,
+                            struct mdd_device *mdd,
+                            struct mdd_object *p,
+                            struct mdd_object *c,
+                            const struct lu_name *name,
+                            struct thandle *handle)
+{
+        int rc;
+
+        rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_ref_add(env, c, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_attr_set(env, p, NULL, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_attr_set(env, c, NULL, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_links_add(env, c, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_changelog_store(env, mdd, name, handle);
+
+        return rc;
+}
+
 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                     struct md_object *src_obj, const struct lu_name *lname,
                     struct md_attr *ma)
@@ -713,11 +807,18 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         }
 #endif
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP, 1);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
+        rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
@@ -758,6 +859,7 @@ out_trans:
         if (rc == 0)
                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
                                             mdd_tobj, NULL, lname, handle);
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 out_pending:
 #ifdef HAVE_QUOTA_SUPPORT
@@ -773,6 +875,20 @@ out_pending:
         return rc;
 }
 
+int mdd_declare_finish_unlink(const struct lu_env *env,
+                              struct mdd_object *obj,
+                              struct md_attr *ma,
+                              struct thandle *handle)
+{
+        int rc;
+
+        rc = orph_declare_index_insert(env, obj, handle);
+        if (rc)
+                return rc;
+
+        return mdd_declare_object_kill(env, obj, ma, handle);
+}
+
 /* caller should take a lock before calling */
 int mdd_finish_unlink(const struct lu_env *env,
                       struct mdd_object *obj, struct md_attr *ma,
@@ -805,7 +921,7 @@ int mdd_finish_unlink(const struct lu_env *env,
                                         PFID(mdd_object_fid(obj)),
                                         obj->mod_count);
                 } else {
-                        rc = mdd_object_kill(env, obj, ma);
+                        rc = mdd_object_kill(env, obj, ma, th);
                         if (rc == 0)
                                 reset = 0;
                 }
@@ -832,6 +948,50 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
         RETURN(rc);
 }
 
+static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
+                              struct mdd_object *p, struct mdd_object *c,
+                              const struct lu_name *name, struct md_attr *ma,
+                              struct thandle *handle)
+{
+        int rc;
+
+        rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_ref_del(env, p, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_ref_del(env, c, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_ref_del(env, c, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_attr_set(env, p, NULL, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_attr_set(env, c, NULL, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_finish_unlink(env, c, ma, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_links_add(env, c, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_changelog_store(env, mdd, name, handle);
+
+        return rc;
+}
+
 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                       struct md_object *cobj, const struct lu_name *lname,
                       struct md_attr *ma)
@@ -857,14 +1017,19 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
                  PFID(mdd_object_fid(mdd_cobj)));
 
-        rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP, 1);
-        if (rc)
-                RETURN(rc);
-
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
+                                lname, ma, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
@@ -942,6 +1107,7 @@ out_trans:
                          mdd_cobj, mdd_pobj, NULL, lname, handle);
         }
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc)
@@ -998,6 +1164,10 @@ static int mdd_name_insert(const struct lu_env *env,
         int rc;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
 #ifdef HAVE_QUOTA_SUPPORT
         if (mds->mds_quota) {
                 if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
@@ -1019,11 +1189,12 @@ static int mdd_name_insert(const struct lu_env *env,
                 }
         }
 #endif
-        mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP, 0);
-        handle = mdd_trans_start(env, mdo2mdd(pobj));
+        handle = mdd_trans_create(env, mdo2mdd(pobj));
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
+        rc = mdd_trans_start(env, mdo2mdd(pobj), handle);
+
         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
@@ -1113,6 +1284,10 @@ static int mdd_name_remove(const struct lu_env *env,
         int rc;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
 #ifdef HAVE_QUOTA_SUPPORT
         if (mds->mds_quota) {
                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
@@ -1124,11 +1299,12 @@ static int mdd_name_remove(const struct lu_env *env,
                 }
         }
 #endif
-        mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP, 0);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
@@ -1223,6 +1399,10 @@ static int mdd_rename_tgt(const struct lu_env *env,
         int rc;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
 #ifdef HAVE_QUOTA_SUPPORT
         if (mds->mds_quota && !tobj) {
                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
@@ -1240,14 +1420,12 @@ static int mdd_rename_tgt(const struct lu_env *env,
                 }
         }
 #endif
-        if (tobj && mdd_object_exists(mdd_tobj))
-                mdd_log_txn_param_build(env, tobj, ma, MDD_TXN_RENAME_TGT_OP,1);
-        else
-                mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP, 1);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
@@ -1355,6 +1533,27 @@ static int mdd_cd_sanity_check(const struct lu_env *env,
 
 }
 
+static int mdd_declare_create_data(const struct lu_env *env,
+                                   struct mdd_device *mdd,
+                                   struct mdd_object *obj,
+                                   int lmm_size,
+                                   struct thandle *handle)
+{
+        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
+        int            rc;
+
+        buf->lb_buf = NULL;
+        buf->lb_len = lmm_size;
+        rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+                                   0, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_lov_objid_update(env, mdd, handle);
+
+        return rc;
+}
+
 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
                            struct md_object *cobj, const struct md_op_spec *spec,
                            struct md_attr *ma)
@@ -1381,11 +1580,18 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 RETURN(rc);
 
-        mdd_create_txn_param_build(env, mdd, lmm, MDD_TXN_CREATE_DATA_OP, 0);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_free, rc = PTR_ERR(handle));
 
+        rc = mdd_declare_create_data(env, mdd, son, lmm_size, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         /*
          * XXX: Setting the lov ea is not locked but setting the attr is locked?
          * Should this be fixed?
@@ -1411,6 +1617,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
         if (rc == 0)
                 mdd_lov_objid_update(mdd, lmm);
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 out_free:
         /* Finish mdd_lov_create() stuff. */
@@ -1468,6 +1675,26 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         RETURN(rc);
 }
 
+int mdd_declare_object_initialize(const struct lu_env *env,
+                                  struct mdd_object *child,
+                                  struct md_attr *ma,
+                                  struct thandle *handle)
+{
+        int rc;
+
+        rc = mdo_declare_attr_set(env, child, &ma->ma_attr, handle);
+        if (rc == 0 && S_ISDIR(ma->ma_attr.la_mode)) {
+                rc = mdo_declare_index_insert(env, child, mdo2fid(child),
+                                dot, handle);
+                if (rc == 0)
+                        rc = mdo_declare_ref_add(env, child, handle);
+        }
+        if (rc == 0)
+                mdd_declare_links_add(env, child, handle);
+
+        return rc;
+}
+
 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                           const struct lu_name *lname, struct mdd_object *child,
                           struct md_attr *ma, struct thandle *handle,
@@ -1590,6 +1817,76 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int mdd_declare_create(const struct lu_env *env,
+                              struct mdd_device *mdd,
+                              struct mdd_object *p,
+                              struct mdd_object *c,
+                              const struct lu_name *name,
+                              struct md_attr *ma,
+                              int lmm_size,
+                              struct thandle *handle,
+                              const struct md_op_spec *spec)
+{
+        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
+        int            rc = 0;
+
+        rc = mdd_declare_object_create_internal(env, p, c, ma, handle, spec);
+        if (rc)
+                GOTO(out, rc);
+
+        /* if dir, then can inherit default ACl */
+        buf->lb_buf = NULL;
+        buf->lb_len = lmm_size;
+        if (S_ISDIR(ma->ma_attr.la_mode)) {
+                rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_DEFAULT,
+                                           0, handle);
+                if (rc == 0)
+                        rc = mdo_declare_ref_add(env, p, handle);
+        }
+        if (rc)
+                GOTO(out, rc);
+
+        rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_ACCESS,
+                                   0, handle);
+        if (rc)
+                GOTO(out, rc);
+
+        rc = mdd_declare_object_initialize(env, c, ma, handle);
+        if (rc)
+                GOTO(out, rc);
+
+        rc = mdo_declare_index_insert(env, p, mdo2fid(c),
+                                      name->ln_name, handle);
+        if (rc)
+                GOTO(out, rc);
+
+        rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
+                                   0, handle);
+        if (rc)
+                GOTO(out, rc);
+
+        if (S_ISLNK(ma->ma_attr.la_mode)) {
+                rc = dt_declare_record_write(env, mdd_object_child(c),
+                                             strlen(spec->u.sp_symname), 0,
+                                             handle);
+                if (rc)
+                        GOTO(out, rc);
+        }
+        rc = mdo_declare_attr_set(env, p, &ma->ma_attr, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_changelog_store(env, mdd, name, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_lov_objid_update(env, mdd, handle);
+
+out:
+        return rc;
+}
+
+
 /*
  * Create object and insert it into namespace.
  */
@@ -1738,11 +2035,19 @@ static int mdd_create(const struct lu_env *env,
                         got_def_acl = 1;
         }
 
-        mdd_create_txn_param_build(env, mdd, lmm, MDD_TXN_MKDIR_OP, 1);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_free, rc = PTR_ERR(handle));
 
+        rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, ma,
+                                lmm_size, handle, spec);
+        if (rc)
+                GOTO(out_stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(out_stop, rc);
+
         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
@@ -1869,6 +2174,7 @@ out_trans:
                             S_ISREG(attr->la_mode) ? CL_CREATE :
                             S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
                             0, son, mdd_pobj, NULL, lname, handle);
+out_stop:
         mdd_trans_stop(env, mdd, rc, handle);
 out_free:
         /* finish lov_create stuff, free all temporary data */
@@ -1974,6 +2280,124 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int mdd_declare_rename(const struct lu_env *env,
+                              struct mdd_device *mdd,
+                              struct mdd_object *mdd_spobj,
+                              struct mdd_object *mdd_tpobj,
+                              struct mdd_object *mdd_sobj,
+                              struct mdd_object *mdd_tobj,
+                              const struct lu_name *sname,
+                              const struct lu_name *tname,
+                              struct md_attr *ma,
+                              struct thandle *handle)
+{
+        int rc;
+
+        LASSERT(mdd_spobj);
+        LASSERT(mdd_tpobj);
+        LASSERT(mdd_sobj);
+
+        /* name from source dir */
+        rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
+        if (rc)
+                return rc;
+
+        /* .. from source child */
+        if (S_ISDIR(mdd_object_type(mdd_sobj))) {
+                /* source child can be directory,
+                 * counted by source dir's nlink */
+                rc = mdo_declare_ref_del(env, mdd_spobj, handle);
+                if (rc)
+                        return rc;
+
+                rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle);
+                if (rc)
+                        return rc;
+
+                rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj),
+                                              dotdot, handle);
+                if (rc)
+                        return rc;
+
+                /* new target child can be directory,
+                 * counted by target dir's nlink */
+                rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
+                if (rc)
+                        return rc;
+
+        }
+
+        rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle);
+        if (rc)
+                return rc;
+        mdd_declare_links_add(env, mdd_sobj, handle);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle);
+        if (rc)
+                return rc;
+
+        /* new name */
+        rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
+                        tname->ln_name, handle);
+        if (rc)
+                return rc;
+
+        /* name from target dir (old name), we declare it unconditionally
+         * as mdd_rename() calls delete unconditionally as well. so just
+         * to balance declarations vs calls to change ... */
+        rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
+        if (rc)
+                return rc;
+
+        if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
+                /* delete target child in target parent directory */
+                rc = mdo_declare_ref_del(env, mdd_tobj, handle);
+                if (rc)
+                        return rc;
+
+                if (S_ISDIR(mdd_object_type(mdd_tobj))) {
+                        /* target child can be directory,
+                         * delete "." reference in target child directory */
+                        rc = mdo_declare_ref_del(env, mdd_tobj, handle);
+                        if (rc)
+                                return rc;
+
+                        /* delete ".." reference in target parent directory */
+                        rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
+                        if (rc)
+                                return rc;
+                }
+
+                rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle);
+                if (rc)
+                        return rc;
+
+                mdd_declare_links_add(env, mdd_tobj, handle);
+                if (rc)
+                        return rc;
+
+                rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
+                if (rc)
+                        return rc;
+        }
+
+        rc = mdd_declare_changelog_store(env, mdd, tname, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_changelog_store(env, mdd, sname, handle);
+        if (rc)
+                return rc;
+
+        return rc;
+}
+
 /* src object can be remote that is why we use only fid and type of object */
 static int mdd_rename(const struct lu_env *env,
                       struct md_object *src_pobj, struct md_object *tgt_pobj,
@@ -2040,14 +2464,21 @@ static int mdd_rename(const struct lu_env *env,
                 }
         }
 #endif
-        if (tobj && mdd_object_exists(mdd_tobj))
-                mdd_log_txn_param_build(env, tobj, ma, MDD_TXN_RENAME_OP, 2);
-        else
-                mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP, 2);
-        handle = mdd_trans_start(env, mdd);
+        mdd_sobj = mdd_object_find(env, mdd, lf);
+
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
+        rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
+                                mdd_tobj, lsname, ltname, ma, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         /* FIXME: Should consider tobj and sobj too in rename_lock. */
         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
         if (rc < 0)
@@ -2073,7 +2504,6 @@ static int mdd_rename(const struct lu_env *env,
         if (sdlh == NULL || tdlh == NULL)
                 GOTO(cleanup, rc = -ENOMEM);
 
-        mdd_sobj = mdd_object_find(env, mdd, lf);
         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
                                      mdd_sobj, mdd_tobj, ma);
         if (rc)
@@ -2258,6 +2688,7 @@ cleanup_unlocked:
                                             ltname, handle);
         }
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
         if (mdd_sobj)
                 mdd_object_put(env, mdd_sobj);
@@ -2399,6 +2830,20 @@ static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf,
         return 0;
 }
 
+static int mdd_declare_links_add(const struct lu_env *env,
+                                 struct mdd_object *mdd_obj,
+                                 struct thandle *handle)
+{
+        int rc;
+
+        /* XXX: max size? */
+        rc = mdo_declare_xattr_set(env, mdd_obj,
+                             mdd_buf_get_const(env, NULL, 4096),
+                             XATTR_NAME_LINK, 0, handle);
+
+        return rc;
+}
+
 /* For pathologic linkers, we don't want to spend lots of time scanning the
  * link ea.  Limit ourseleves to something reasonable; links not in the EA
  * can be looked up via (slower) parent lookup.
index 1a525a3..4080998 100644 (file)
@@ -71,28 +71,6 @@ static inline void mdd_quota_wrapper(struct lu_attr *la, unsigned int *qids)
  * are already protected by ldlm lock */
 #define MDD_DISABLE_PDO_LOCK    1
 
-enum mdd_txn_op {
-        MDD_TXN_OBJECT_DESTROY_OP = 0,
-        MDD_TXN_OBJECT_CREATE_OP,
-        MDD_TXN_ATTR_SET_OP,
-        MDD_TXN_XATTR_SET_OP,
-        MDD_TXN_INDEX_INSERT_OP,
-        MDD_TXN_INDEX_DELETE_OP,
-        MDD_TXN_LINK_OP,
-        MDD_TXN_UNLINK_OP,
-        MDD_TXN_RENAME_OP,
-        MDD_TXN_RENAME_TGT_OP,
-        MDD_TXN_CREATE_DATA_OP,
-        MDD_TXN_MKDIR_OP,
-        MDD_TXN_CLOSE_OP,
-        MDD_TXN_LAST_OP
-};
-
-struct mdd_txn_op_descr {
-        enum mdd_txn_op mod_op;
-        unsigned int    mod_credits;
-};
-
 /* Changelog flags */
 /** changelog is recording */
 #define CLM_ON    0x00001
@@ -128,10 +106,10 @@ struct mdd_device {
         struct lu_fid                    mdd_root_fid;
         struct dt_device_param           mdd_dt_conf;
         struct dt_object                *mdd_orphans;
+        struct dt_object                *mdd_capa;
         struct dt_txn_callback           mdd_txn_cb;
         cfs_proc_dir_entry_t            *mdd_proc_entry;
         struct lprocfs_stats            *mdd_stats;
-        struct mdd_txn_op_descr          mdd_tod[MDD_TXN_LAST_OP];
         struct mdd_changelog             mdd_cl;
         unsigned long                    mdd_atime_diff;
         struct mdd_object               *mdd_dot_lustre;
@@ -173,7 +151,6 @@ struct mdd_object {
 };
 
 struct mdd_thread_info {
-        struct txn_param          mti_param;
         struct lu_fid             mti_fid;
         struct lu_fid             mti_fid2; /* used for be & cpu converting */
         struct lu_attr            mti_la;
@@ -225,6 +202,8 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
                    struct lov_mds_md **lmm, int *lmm_size,
                    const struct md_op_spec *spec, struct lu_attr *la);
 int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm);
+int mdd_declare_lov_objid_update(const struct lu_env *, struct mdd_device *,
+                                 struct thandle *);
 void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm);
 void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
                            struct lov_mds_md *lmm, int lmm_size,
@@ -251,8 +230,10 @@ int mdd_attr_check_set_internal(const struct lu_env *env,
                                 struct lu_attr *attr,
                                 struct thandle *handle,
                                 int needacl);
+int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
+                            struct md_attr *ma, struct thandle *handle);
 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma);
+                    struct md_attr *ma, struct thandle *handle);
 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                   struct md_attr *ma);
 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
@@ -325,6 +306,8 @@ void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
                     struct lu_name *lname, struct lu_fid *pfid);
 
 /* mdd_lov.c */
+int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj,
+                           struct md_attr *ma, struct thandle *handle);
 int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
                    struct mdd_object *mdd_cobj, struct md_attr *ma);
 
@@ -353,7 +336,10 @@ int __mdd_orphan_del(const struct lu_env *, struct mdd_object *,
                      struct thandle *);
 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd);
 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd);
-int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd);
+int orph_declare_index_insert(const struct lu_env *, struct mdd_object *,
+                              struct thandle *);
+int orph_declare_index_delete(const struct lu_env *, struct mdd_object *,
+                              struct thandle *);
 
 /* mdd_lproc.c */
 void lprocfs_mdd_init_vars(struct lprocfs_static_vars *lvars);
@@ -382,8 +368,20 @@ struct mdd_object *mdd_object_find(const struct lu_env *env,
 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm);
 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                  const struct lu_rdpg *rdpg);
+int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
+                            int reclen, struct thandle *handle);
+int mdd_declare_changelog_store(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                const struct lu_name *fname,
+                                struct thandle *handle);
 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
                   int flags, struct md_object *obj);
+int mdd_declare_object_create_internal(const struct lu_env *env,
+                                       struct mdd_object *p,
+                                       struct mdd_object *c,
+                                       struct md_attr *ma,
+                                       struct thandle *handle,
+                                       const struct md_op_spec *spec);
 /* mdd_quota.c*/
 #ifdef HAVE_QUOTA_SUPPORT
 int mdd_quota_notify(const struct lu_env *env, struct md_device *m);
@@ -416,19 +414,6 @@ int mdd_quota_finvalidate(const struct lu_env *env, struct md_device *m,
 #endif
 
 /* mdd_trans.c */
-void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
-                         enum mdd_txn_op, int changelog_cnt);
-int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
-                               struct lov_mds_md *lmm, enum mdd_txn_op op,
-                               int changelog_cnt);
-int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
-                            struct md_attr *ma, enum mdd_txn_op,
-                            int changelog_cnt);
-void mdd_setattr_txn_param_build(const struct lu_env *env,
-                                 struct md_object *obj,
-                                 struct md_attr *ma, enum mdd_txn_op,
-                                 int changelog_cnt);
-
 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
                     struct mdd_object *obj, struct lu_attr *la);
 
@@ -438,17 +423,16 @@ static inline void mdd_object_put(const struct lu_env *env,
         lu_object_put(env, &o->mod_obj.mo_lu);
 }
 
-struct thandle* mdd_trans_start(const struct lu_env *env,
-                                       struct mdd_device *);
-
+struct thandle *mdd_trans_create(const struct lu_env *env,
+                                 struct mdd_device *mdd);
+int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd,
+                    struct thandle *th);
 void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
                     int rc, struct thandle *handle);
-
-int mdd_txn_start_cb(const struct lu_env *env, struct txn_param *param,
-                     void *cookie);
-
 int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
                     void *cookie);
+int mdd_txn_start_cb(const struct lu_env *env, struct thandle *,
+                     void *cookie);
 
 /* mdd_device.c */
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
@@ -481,6 +465,8 @@ int mdd_def_acl_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                     struct md_attr *ma);
 int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
                   struct thandle *handle);
+int __mdd_declare_acl_init(const struct lu_env *env, struct mdd_object *obj,
+                           int is_dir, struct thandle *handle);
 int __mdd_acl_init(const struct lu_env *env, struct mdd_object *obj,
                    struct lu_buf *buf, __u32 *mode, struct thandle *handle);
 int __mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj,
@@ -673,8 +659,19 @@ static inline int mdo_attr_get(const struct lu_env *env, struct mdd_object *obj,
         return next->do_ops->do_attr_get(env, next, la, capa);
 }
 
-static inline int mdo_attr_set(const struct lu_env *env, struct mdd_object *obj,
-                               const struct lu_attr *la, struct thandle *handle,
+static inline int mdo_declare_attr_set(const struct lu_env *env,
+                                       struct mdd_object *obj,
+                                       const struct lu_attr *la,
+                                       struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(obj);
+        return dt_declare_attr_set(env, next, la, handle);
+}
+
+static inline int mdo_attr_set(const struct lu_env *env,
+                               struct mdd_object *obj,
+                               const struct lu_attr *la,
+                               struct thandle *handle,
                                struct lustre_capa *capa)
 {
         struct dt_object *next = mdd_object_child(obj);
@@ -690,6 +687,16 @@ static inline int mdo_xattr_get(const struct lu_env *env,struct mdd_object *obj,
         return next->do_ops->do_xattr_get(env, next, buf, name, capa);
 }
 
+static inline int mdo_declare_xattr_set(const struct lu_env *env,
+                                        struct mdd_object *obj,
+                                        const struct lu_buf *buf,
+                                        const char *name,
+                                        int fl, struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(obj);
+        return dt_declare_xattr_set(env, next, buf, name, fl, handle);
+}
+
 static inline int mdo_xattr_set(const struct lu_env *env,struct mdd_object *obj,
                                 const struct lu_buf *buf, const char *name,
                                 int fl, struct thandle *handle,
@@ -701,6 +708,15 @@ static inline int mdo_xattr_set(const struct lu_env *env,struct mdd_object *obj,
                                           capa);
 }
 
+static inline int mdo_declare_xattr_del(const struct lu_env *env,
+                                        struct mdd_object *obj,
+                                        const char *name,
+                                        struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(obj);
+        return dt_declare_xattr_del(env, next, name, handle);
+}
+
 static inline int mdo_xattr_del(const struct lu_env *env,struct mdd_object *obj,
                                 const char *name, struct thandle *handle,
                                 struct lustre_capa *capa)
@@ -727,16 +743,69 @@ int mdo_index_try(const struct lu_env *env, struct mdd_object *obj,
         return next->do_ops->do_index_try(env, next, feat);
 }
 
-static inline void mdo_ref_add(const struct lu_env *env, struct mdd_object *obj,
-                               struct thandle *handle)
+static inline
+int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
+                             const struct lu_fid *fid, const char *name,
+                             struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(obj);
+        int              rc = 0;
+
+        /*
+         * if the object doesn't exist yet, then it's supposed to be created
+         * and declaration of the creation should be enough to insert ./..
+         */
+        if (mdd_object_exists(obj)) {
+                rc = -ENOTDIR;
+                if (dt_try_as_dir(env, next))
+                        rc = dt_declare_insert(env, next,
+                                               (struct dt_rec *)fid,
+                                               (const struct dt_key *)name,
+                                               handle);
+        }
+
+        return rc;
+}
+
+static inline
+int mdo_declare_index_delete(const struct lu_env *env, struct mdd_object *obj,
+                             const char *name, struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(obj);
+
+        if (!dt_try_as_dir(env, next))
+                return -ENOTDIR;
+
+        return dt_declare_delete(env, next, (const struct dt_key *)name,
+                                 handle);
+}
+
+static inline int mdo_declare_ref_add(const struct lu_env *env,
+                                      struct mdd_object *obj,
+                                      struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(obj);
+        return dt_declare_ref_add(env, next, handle);
+}
+
+static inline int mdo_ref_add(const struct lu_env *env, struct mdd_object *obj,
+                              struct thandle *handle)
 {
         struct dt_object *next = mdd_object_child(obj);
         LASSERT(mdd_object_exists(obj));
         return next->do_ops->do_ref_add(env, next, handle);
 }
 
-static inline void mdo_ref_del(const struct lu_env *env, struct mdd_object *obj,
-                               struct thandle *handle)
+static inline int mdo_declare_ref_del(const struct lu_env *env,
+                                      struct mdd_object *obj,
+                                      struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(obj);
+        return dt_declare_ref_del(env, next, handle);
+}
+
+static inline int mdo_ref_del(const struct lu_env *env, struct mdd_object *obj,
+                              struct thandle *handle)
 {
         struct dt_object *next = mdd_object_child(obj);
         LASSERT(mdd_object_exists(obj));
@@ -744,6 +813,18 @@ static inline void mdo_ref_del(const struct lu_env *env, struct mdd_object *obj,
 }
 
 static inline
+int mdo_declare_create_obj(const struct lu_env *env, struct mdd_object *o,
+                           struct lu_attr *attr,
+                           struct dt_allocation_hint *hint,
+                           struct dt_object_format *dof,
+                           struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(o);
+        return next->do_ops->do_declare_create(env, next, attr, hint,
+                                               dof, handle);
+}
+
+static inline
 int mdo_create_obj(const struct lu_env *env, struct mdd_object *o,
                    struct lu_attr *attr,
                    struct dt_allocation_hint *hint,
@@ -754,6 +835,22 @@ int mdo_create_obj(const struct lu_env *env, struct mdd_object *o,
         return next->do_ops->do_create(env, next, attr, hint, dof, handle);
 }
 
+static inline
+int mdo_declare_destroy(const struct lu_env *env, struct mdd_object *o,
+                        struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(o);
+        return dt_declare_destroy(env, next, handle);
+}
+
+static inline
+int mdo_destroy(const struct lu_env *env, struct mdd_object *o,
+                struct thandle *handle)
+{
+        struct dt_object *next = mdd_object_child(o);
+        return dt_destroy(env, next, handle);
+}
+
 static inline struct obd_capa *mdo_capa_get(const struct lu_env *env,
                                             struct mdd_object *obj,
                                             struct lustre_capa *old,
index 69df68e..e9a1417 100644 (file)
@@ -369,6 +369,24 @@ int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm)
         return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm);
 }
 
+int mdd_declare_lov_objid_update(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct thandle *handle)
+{
+        struct obd_device *obd = mdd2obd_dev(mdd);
+        int size;
+
+        /* in prepare we create local files */
+        if (unlikely(mdd->mdd_capa == NULL))
+                return 0;
+
+        /* XXX: this is a temporary solution to declare llog changes
+         *      will be fixed in 2.3 with new llog implementation */
+
+        size = obd->u.mds.mds_lov_desc.ld_tgt_count * sizeof(obd_id);
+        return dt_declare_record_write(env, mdd->mdd_capa, size, 0, handle);
+}
+
 void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
 {
         /* copy mds_lov code is using wrong layer */
@@ -661,6 +679,48 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
         RETURN(rc);
 }
 
+int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj,
+                           struct md_attr *ma, struct thandle *handle)
+{
+        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+        int rc, stripe, i;
+
+        LASSERT(obj);
+        LASSERT(ma);
+
+        if (!S_ISREG(lu_object_attr(&obj->mod_obj.mo_lu)))
+                return 0;
+
+        rc = mdd_lmm_get_locked(env, obj, ma);
+        if (rc || !(ma->ma_valid & MA_LOV))
+                return rc;
+
+        LASSERT(ma->ma_lmm);
+        if (le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V1 &&
+                        le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V3) {
+                CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
+                                mdd->mdd_obd_dev->obd_name,
+                                le32_to_cpu(ma->ma_lmm->lmm_magic),
+                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
+                return -EINVAL;
+        }
+
+        if ((int)le32_to_cpu(ma->ma_lmm->lmm_stripe_count) < 0)
+                stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
+        else
+                stripe = le32_to_cpu(ma->ma_lmm->lmm_stripe_count);
+
+        for (i = 0; i < stripe; i++) {
+                rc = mdd_declare_llog_record(env, mdd,
+                                             sizeof(struct llog_unlink_rec),
+                                             handle);
+                if (rc)
+                        return rc;
+        }
+
+        return rc;
+}
+
 int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
                    struct mdd_object *mdd_cobj, struct md_attr *ma)
 {
index fa25ba2..9c85d46 100644 (file)
@@ -55,6 +55,7 @@
 #include <lprocfs_status.h>
 /* fid_be_cpu(), fid_cpu_to_be(). */
 #include <lustre_fid.h>
+#include <obd_lov.h>
 
 #include <lustre_param.h>
 #include <lustre_mds.h>
@@ -870,6 +871,30 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
         RETURN(rc);
 }
 
+int mdd_declare_object_create_internal(const struct lu_env *env,
+                                       struct mdd_object *p,
+                                       struct mdd_object *c,
+                                       struct md_attr *ma,
+                                       struct thandle *handle,
+                                       const struct md_op_spec *spec)
+{
+        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+        const struct dt_index_features *feat = spec->sp_feat;
+        int rc;
+        ENTRY;
+
+        if (feat != &dt_directory_features && feat != NULL)
+                dof->dof_type = DFT_INDEX;
+        else
+                dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
+
+        dof->u.dof_idx.di_feat = feat;
+
+        rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
+
+        RETURN(rc);
+}
+
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                                struct mdd_object *c, struct md_attr *ma,
                                struct thandle *handle,
@@ -1279,6 +1304,7 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
                 RETURN(0);
 
         LASSERT(mdd_obj != NULL);
+        LASSERT(handle != NULL);
 
         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
@@ -1300,20 +1326,11 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         rec->cr.cr_namelen = 0;
         mdd_obj->mod_cltime = cfs_time_current_64();
 
-        if (handle == NULL) {
-                /* Used for the close event only for now. */
-                LASSERT(type == CL_CLOSE);
-                LASSERT(mdd_env_info(env)->mti_param.tp_credits != 0);
-                th = mdd_trans_start(env, mdd);
-                if (IS_ERR(th))
-                        GOTO(err, rc = PTR_ERR(th));
-        }
-
         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
 
         if (th)
                 mdd_trans_stop(env, mdd, rc, th);
-err:
+
         if (rc < 0) {
                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                        rc, type, PFID(tfid));
@@ -1332,14 +1349,22 @@ int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
         int rc;
         ENTRY;
 
-        handle = mdd_trans_start(env, mdd);
-
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 return(PTR_ERR(handle));
 
+        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
                                       handle);
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1452,6 +1477,69 @@ static int mdd_attr_set_changelog(const struct lu_env *env,
                                         md2mdd_obj(obj), handle);
 }
 
+static int mdd_declare_attr_set(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                struct mdd_object *obj,
+                                const struct md_attr *ma,
+                                struct lov_mds_md *lmm,
+                                struct thandle *handle)
+{
+        struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
+        int             rc, stripe, i;
+
+        rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+        if (rc)
+                return rc;
+
+        if (ma->ma_valid & MA_LOV) {
+                buf->lb_buf = NULL;
+                buf->lb_len = ma->ma_lmm_size;
+                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+                                           0, handle);
+                if (rc)
+                        return rc;
+        }
+
+        if (ma->ma_valid & (MA_HSM | MA_SOM)) {
+                buf->lb_buf = NULL;
+                buf->lb_len = sizeof(struct lustre_mdt_attrs);
+                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
+                                           0, handle);
+                if (rc)
+                        return rc;
+        }
+
+        /* basically the log is the same as in unlink case */
+        if (lmm) {
+                if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
+                                le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
+                        CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
+                               mdd->mdd_obd_dev->obd_name,
+                               le32_to_cpu(lmm->lmm_magic),
+                               PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
+                        return -EINVAL;
+                }
+
+                stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
+                if ((int)le32_to_cpu(lmm->lmm_stripe_count) >= 0)
+                        stripe = le32_to_cpu(lmm->lmm_stripe_count);
+
+                for (i = 0; i < stripe; i++) {
+                        rc = mdd_declare_llog_record(env, mdd,
+                                        sizeof(struct llog_unlink_rec),
+                                        handle);
+                        if (rc)
+                                return rc;
+                }
+        }
+
+        return rc;
+}
+
 /* set attr and LOV EA at once, return updated attr */
 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         const struct md_attr *ma)
@@ -1461,7 +1549,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         struct thandle *handle;
         struct lov_mds_md *lmm = NULL;
         struct llog_cookie *logcookies = NULL;
-        int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
+        int  rc, lmm_size = 0, cookie_size = 0;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
         struct obd_device *obd = mdd->mdd_obd_dev;
         struct mds_obd *mds = &obd->u.mds;
@@ -1484,33 +1572,36 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
                 RETURN(0);
 
-        /*TODO: add lock here*/
-        /* start a log jounal handle if needed */
         if (S_ISREG(mdd_object_type(mdd_obj)) &&
             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                 lmm_size = mdd_lov_mdsize(env, mdd);
                 lmm = mdd_max_lmm_get(env, mdd);
                 if (lmm == NULL)
-                        GOTO(no_trans, rc = -ENOMEM);
+                        RETURN(-ENOMEM);
 
                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                 XATTR_NAME_LOV);
 
                 if (rc < 0)
-                        GOTO(no_trans, rc);
-        }
-
-        chlog_cnt = 1;
-        if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
-                chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
-                         lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
+                        RETURN(rc);
         }
 
-        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
-                                    MDD_TXN_ATTR_SET_OP, chlog_cnt);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
-                GOTO(no_trans, rc = PTR_ERR(handle));
+                RETURN(PTR_ERR(handle));
+
+        rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
+                                  lmm_size > 0 ? lmm : NULL, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        /* permission changes may require sync operation */
+        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+                handle->th_sync = !!mdd->mdd_sync_permission;
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
 
         /* permission changes may require sync operation */
         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
@@ -1595,8 +1686,8 @@ cleanup:
         if (rc == 0)
                 rc = mdd_attr_set_changelog(env, obj, handle,
                                             ma->ma_attr.la_valid);
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
-no_trans:
         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
@@ -1654,6 +1745,27 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int mdd_declare_xattr_set(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const struct lu_buf *buf,
+                                 const char *name,
+                                 struct thandle *handle)
+
+{
+        int rc;
+
+        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
+        if (rc)
+                return rc;
+
+        /* Only record user xattr changes */
+        if ((strncmp("user.", name, 5) == 0))
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+        return rc;
+}
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1672,12 +1784,24 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
         /* security-replated changes may require sync */
+        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
+            mdd->mdd_sync_permission == 1)
+                handle->th_sync = 1;
+
+        rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        /* security-replated changes may require sync */
         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
                 handle->th_sync |= mdd->mdd_sync_permission;
 
@@ -1692,11 +1816,32 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
+
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
 }
 
+static int mdd_declare_xattr_del(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const char *name,
+                                 struct thandle *handle)
+{
+        int rc;
+
+        rc = mdo_declare_xattr_del(env, obj, name, handle);
+        if (rc)
+                return rc;
+
+        /* Only record user xattr changes */
+        if ((strncmp("user.", name, 5) == 0))
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+        return rc;
+}
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1714,11 +1859,18 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdo_xattr_del(env, mdd_obj, name, handle,
                            mdd_object_capa(env, mdd_obj));
@@ -1734,6 +1886,7 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1756,6 +1909,10 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
         /*
          * Check -ENOENT early here because we need to get object type
          * to calculate credits before transaction start
@@ -1765,14 +1922,12 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
 
         LASSERT(mdd_object_exists(mdd_obj) > 0);
 
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
-        if (rc)
-                RETURN(rc);
-
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(-ENOMEM);
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
 
         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
@@ -1865,6 +2020,10 @@ static int mdd_object_create(const struct lu_env *env,
         int rc = 0;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
 #ifdef HAVE_QUOTA_SUPPORT
         if (mds->mds_quota) {
                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
@@ -1890,11 +2049,12 @@ static int mdd_object_create(const struct lu_env *env,
         }
 #endif
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
         if (rc)
@@ -1977,11 +2137,16 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
-        handle = mdd_trans_start(env, mdd);
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(-ENOMEM);
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
         if (rc == 0)
@@ -2104,10 +2269,22 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
         return rc;
 }
 
+int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
+                            struct md_attr *ma, struct thandle *handle)
+{
+        int rc;
+
+        rc = mdd_declare_unlink_log(env, obj, ma, handle);
+        if (rc)
+                return rc;
+
+        return mdo_declare_destroy(env, obj, handle);
+}
+
 /* return md_attr back,
  * if it is last unlink then return lov ea + llog cookie*/
 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma)
+                    struct md_attr *ma, struct thandle *handle)
 {
         int rc = 0;
         ENTRY;
@@ -2121,9 +2298,27 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
                                             obj, ma);
         }
+
+        if (rc == 0)
+                rc = mdo_destroy(env, obj, handle);
+
         RETURN(rc);
 }
 
+static int mdd_declare_close(const struct lu_env *env,
+                             struct mdd_object *obj,
+                             struct md_attr *ma,
+                             struct thandle *handle)
+{
+        int rc;
+
+        rc = orph_declare_index_delete(env, obj, handle);
+        if (rc)
+                return rc;
+
+        return mdd_declare_object_kill(env, obj, ma, handle);
+}
+
 /*
  * No permission check is needed.
  */
@@ -2157,13 +2352,21 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         if (mdd_obj->mod_count == 1 &&
             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
  again:
-                rc = mdd_log_txn_param_build(env, obj, ma,
-                                             MDD_TXN_UNLINK_OP, 1);
-                if (rc)
-                        RETURN(rc);
-                handle = mdd_trans_start(env, mdo2mdd(obj));
+                handle = mdd_trans_create(env, mdo2mdd(obj));
                 if (IS_ERR(handle))
                         RETURN(PTR_ERR(handle));
+
+                rc = mdd_declare_close(env, mdd_obj, ma, handle);
+                if (rc)
+                        GOTO(stop, rc);
+
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+                if (rc)
+                        GOTO(stop, rc);
+
+                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                if (rc)
+                        GOTO(stop, rc);
         }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
@@ -2178,6 +2381,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
 
         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                 /* remove link to object from orphan index */
+                LASSERT(handle != NULL);
                 rc = __mdd_orphan_del(env, mdd_obj, handle);
                 if (rc == 0) {
                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
@@ -2210,7 +2414,27 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                 } else {
-                        rc = mdd_object_kill(env, mdd_obj, ma);
+                        if (handle == NULL) {
+                                handle = mdd_trans_create(env, mdo2mdd(obj));
+                                if (IS_ERR(handle))
+                                        GOTO(out, rc = PTR_ERR(handle));
+
+                                rc = mdd_declare_object_kill(env, mdd_obj, ma,
+                                                             handle);
+                                if (rc)
+                                        GOTO(out, rc);
+
+                                rc = mdd_declare_changelog_store(env, mdd,
+                                                                 NULL, handle);
+                                if (rc)
+                                        GOTO(stop, rc);
+
+                                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                                if (rc)
+                                        GOTO(out, rc);
+                        }
+
+                        rc = mdd_object_kill(env, mdd_obj, ma, handle);
                         if (rc == 0)
                                 reset = 0;
                 }
@@ -2231,12 +2455,26 @@ out:
         if (rc == 0 &&
             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
-                if (handle == 0)
-                        mdd_txn_param_build(env, mdd, MDD_TXN_CLOSE_OP, 1);
+                if (handle == NULL) {
+                        handle = mdd_trans_create(env, mdo2mdd(obj));
+                        if (IS_ERR(handle))
+                                GOTO(stop, rc = IS_ERR(handle));
+
+                        rc = mdd_declare_changelog_store(env, mdd, NULL,
+                                                         handle);
+                        if (rc)
+                                GOTO(stop, rc);
+
+                        rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                        if (rc)
+                                GOTO(stop, rc);
+                }
+
                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
                                          mdd_obj, handle);
         }
 
+stop:
         if (handle != NULL)
                 mdd_trans_stop(env, mdd, rc, handle);
 #ifdef HAVE_QUOTA_SUPPORT
@@ -2289,7 +2527,7 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 int    len;
                 int    recsize;
 
-                len  = iops->key_size(env, it);
+                len = iops->key_size(env, it);
 
                 /* IAM iterator can return record with zero len. */
                 if (len == 0)
@@ -2305,7 +2543,7 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 recsize = lu_dirent_calc_size(len, attr);
 
                 if (nob >= recsize) {
-                        result = iops->rec(env, it, ent, attr);
+                        result = iops->rec(env, it, (struct dt_rec *)ent, attr);
                         if (result == -ESTALE)
                                 goto next;
                         if (result != 0)
@@ -2501,24 +2739,6 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
         return next->do_ops->do_object_sync(env, next);
 }
 
-static dt_obj_version_t mdd_version_get(const struct lu_env *env,
-                                        struct md_object *obj)
-{
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        return do_version_get(env, mdd_object_child(mdd_obj));
-}
-
-static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
-                            dt_obj_version_t version)
-{
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        do_version_set(env, mdd_object_child(mdd_obj), version);
-}
-
 const struct md_object_operations mdd_obj_ops = {
         .moo_permission    = mdd_permission,
         .moo_attr_get      = mdd_attr_get,
@@ -2537,8 +2757,6 @@ const struct md_object_operations mdd_obj_ops = {
         .moo_changelog     = mdd_changelog,
         .moo_capa_get      = mdd_capa_get,
         .moo_object_sync   = mdd_object_sync,
-        .moo_version_get   = mdd_version_get,
-        .moo_version_set   = mdd_version_set,
         .moo_path          = mdd_path,
         .moo_file_lock     = mdd_file_lock,
         .moo_file_unlock   = mdd_file_unlock,
index 2902df5..108988a 100644 (file)
@@ -57,6 +57,7 @@
 #include "mdd_internal.h"
 
 const char orph_index_name[] = "PENDING";
+const char *dotdot = "..";
 
 enum {
         ORPH_OP_UNLINK,
@@ -182,6 +183,41 @@ static inline void mdd_orphan_ref_del(const struct lu_env *env,
 }
 
 
+int orph_declare_index_insert(const struct lu_env *env,
+                              struct mdd_object *obj,
+                              struct thandle *th)
+{
+        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+        int                rc;
+
+        rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, NULL, th);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_ref_add(env, obj, th);
+        if (rc)
+                return rc;
+
+        if (!S_ISDIR(mdd_object_type(obj)))
+                return 0;
+
+        rc = mdo_declare_ref_add(env, obj, th);
+        if (rc)
+                return rc;
+
+        rc = dt_declare_ref_add(env, mdd->mdd_orphans, th);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_index_delete(env, obj, dotdot, th);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_index_insert(env, obj, NULL, dotdot, th);
+
+        return rc;
+}
+
 static int orph_index_insert(const struct lu_env *env,
                              struct mdd_object *obj,
                              __u32 op,
@@ -191,7 +227,6 @@ static int orph_index_insert(const struct lu_env *env,
         struct dt_object        *dor    = mdd->mdd_orphans;
         const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
         struct dt_object        *next   = mdd_object_child(obj);
-        const struct dt_key     *dotdot = (const struct dt_key *) "..";
         int rc;
         ENTRY;
 
@@ -217,11 +252,13 @@ static int orph_index_insert(const struct lu_env *env,
         if (!dt_try_as_dir(env, next))
                 goto out;
         next->do_index_ops->dio_delete(env, next,
-                                       dotdot, th, BYPASS_CAPA);
+                                       (const struct dt_key *)dotdot,
+                                       th, BYPASS_CAPA);
 
         next->do_index_ops->dio_insert(env, next,
-                                       (struct dt_rec *) lf_dor,
-                                       dotdot, th, BYPASS_CAPA, 1);
+                                       (struct dt_rec *)lf_dor,
+                                       (const struct dt_key *)dotdot,
+                                       th, BYPASS_CAPA, 1);
 
 out:
         if (rc == 0)
@@ -264,9 +301,36 @@ static int orphan_object_kill(const struct lu_env *env,
                 if (rc == 0)
                         rc = mdd_lov_destroy(env, mdd, obj, la);
         }
+        mdo_destroy(env, obj, th);
         RETURN(rc);
 }
 
+int orph_declare_index_delete(const struct lu_env *env,
+                              struct mdd_object *obj,
+                              struct thandle *th)
+{
+        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+        int                rc;
+
+        rc = dt_declare_delete(env, mdd->mdd_orphans, NULL, th);
+        if (rc)
+                return rc;
+
+        rc = mdo_declare_ref_del(env, obj, th);
+        if (rc)
+                return rc;
+
+        if (S_ISDIR(mdd_object_type(obj))) {
+                rc = mdo_declare_ref_del(env, obj, th);
+                if (rc)
+                        return rc;
+
+                rc = dt_declare_ref_del(env, mdd->mdd_orphans, th);
+        }
+
+        return rc;
+}
+
 static int orph_index_delete(const struct lu_env *env,
                              struct mdd_object *obj,
                              __u32 op,
@@ -330,12 +394,22 @@ static int orphan_object_destroy(const struct lu_env *env,
         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
         ma->ma_valid = 0;
 
-        mdd_log_txn_param_build(env, &obj->mod_obj, ma, MDD_TXN_UNLINK_OP, 0);
-        th = mdd_trans_start(env, mdd);
+        th = mdd_trans_create(env, mdd);
         if (IS_ERR(th)) {
                 CERROR("Cannot get thandle\n");
                 RETURN(-ENOMEM);
         }
+        rc = orph_declare_index_delete(env, obj, th);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_declare_object_kill(env, obj, ma, th);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, th);
+        if (rc)
+                GOTO(stop, rc);
 
         mdd_write_lock(env, obj, MOR_TGT_CHILD);
         if (likely(obj->mod_count == 0)) {
@@ -348,6 +422,8 @@ static int orphan_object_destroy(const struct lu_env *env,
                 mdd_orphan_write_unlock(env, mdd);
         }
         mdd_write_unlock(env, obj);
+
+stop:
         mdd_trans_stop(env, mdd, 0, th);
 
         RETURN(rc);
index c31ede1..393cd99 100644 (file)
 
 #include "mdd_internal.h"
 
-static int dto_txn_credits[DTO_NR];
-
-int mdd_txn_start_cb(const struct lu_env *env, struct txn_param *param,
-                     void *cookie)
-{
-        struct mdd_device *mdd = cookie;
-        struct obd_device *obd = mdd2obd_dev(mdd);
-        /* Each transaction updates lov objids, the credits should be added for
-         * this */
-        int blk, shift = mdd->mdd_dt_conf.ddp_block_shift;
-        blk = ((obd->u.mds.mds_lov_desc.ld_tgt_count * sizeof(obd_id) +
-               (1 << shift) - 1) >> shift) + 1;
-
-        /* add lov objids credits */
-        param->tp_credits += blk * dto_txn_credits[DTO_WRITE_BLOCK] +
-                             dto_txn_credits[DTO_WRITE_BASE];
-
-        return 0;
-}
-
 int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
                     void *cookie)
 {
@@ -100,202 +80,16 @@ int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
         return mds_lov_write_objids(obd);
 }
 
-void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
-                         enum mdd_txn_op op, int changelog_cnt)
+struct thandle *mdd_trans_create(const struct lu_env *env,
+                                 struct mdd_device *mdd)
 {
-        LASSERT(0 <= op && op < MDD_TXN_LAST_OP);
-
-        txn_param_init(&mdd_env_info(env)->mti_param,
-                       mdd->mdd_tod[op].mod_credits);
-        if (changelog_cnt > 0) {
-                txn_param_credit_add(&mdd_env_info(env)->mti_param,
-                                  changelog_cnt * dto_txn_credits[DTO_LOG_REC]);
-        }
+        return mdd_child_ops(mdd)->dt_trans_create(env, mdd->mdd_child);
 }
 
-int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
-                               struct lov_mds_md *lmm, enum mdd_txn_op op,
-                               int changelog_cnt)
+int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd,
+                    struct thandle *th)
 {
-        int stripes = 0;
-        ENTRY;
-
-        LASSERT(op == MDD_TXN_CREATE_DATA_OP || op == MDD_TXN_MKDIR_OP);
-
-        if (lmm == NULL)
-                GOTO(out, 0);
-        /* only replay create request will cause lov_objid update */
-        if (!mdd->mdd_obd_dev->obd_recovering)
-                GOTO(out, 0);
-
-        /* add possible orphan unlink rec credits used in lov_objid update */
-        if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1) {
-                stripes = le32_to_cpu(((struct lov_mds_md_v1*)lmm)
-                                      ->lmm_stripe_count);
-        } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3){
-                stripes = le32_to_cpu(((struct lov_mds_md_v3*)lmm)
-                                      ->lmm_stripe_count);
-        } else {
-                CERROR("Unknown lmm type %X\n", le32_to_cpu(lmm->lmm_magic));
-                LBUG();
-        }
-out:
-        mdd_txn_param_build(env, mdd, op, stripes + changelog_cnt);
-        RETURN(0);
-}
-
-int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
-                            struct md_attr *ma, enum mdd_txn_op op,
-                            int changelog_cnt)
-{
-        struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj);
-        int rc, stripe = 0;
-        ENTRY;
-
-        if (S_ISDIR(lu_object_attr(&obj->mo_lu)))
-                GOTO(out, rc = 0);
-
-        LASSERT(op == MDD_TXN_UNLINK_OP || op == MDD_TXN_RENAME_OP ||
-                op == MDD_TXN_RENAME_TGT_OP);
-        rc = mdd_lmm_get_locked(env, md2mdd_obj(obj), ma);
-        if (rc || !(ma->ma_valid & MA_LOV))
-                GOTO(out, rc);
-
-        LASSERTF(le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1 ||
-                 le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3,
-                 "%08x", le32_to_cpu(ma->ma_lmm->lmm_magic));
-
-        if ((int)le32_to_cpu(ma->ma_lmm->lmm_stripe_count) < 0)
-                stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
-        else
-                stripe = le32_to_cpu(ma->ma_lmm->lmm_stripe_count);
-
-out:
-        mdd_txn_param_build(env, mdd, op, stripe + changelog_cnt);
-
-        RETURN(rc);
-}
-
-void mdd_setattr_txn_param_build(const struct lu_env *env,
-                                 struct md_object *obj,
-                                 struct md_attr *ma, enum mdd_txn_op op,
-                                 int changelog_cnt)
-{
-        struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj);
-
-        mdd_txn_param_build(env, mdd, op, changelog_cnt);
-        if (ma->ma_attr.la_valid & (LA_UID | LA_GID))
-                txn_param_credit_add(&mdd_env_info(env)->mti_param,
-                                     dto_txn_credits[DTO_ATTR_SET_CHOWN]);
-}
-
-static void mdd_txn_init_dto_credits(const struct lu_env *env,
-                                     struct mdd_device *mdd, int *dto_credits)
-{
-        int op, credits;
-        for (op = 0; op < DTO_NR; op++) {
-                credits = mdd_child_ops(mdd)->dt_credit_get(env, mdd->mdd_child,
-                                                            op);
-                LASSERT(credits >= 0);
-                dto_txn_credits[op] = credits;
-        }
-}
-
-int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd)
-{
-        int op;
-
-        /* Init credits for each ops. */
-        mdd_txn_init_dto_credits(env, mdd, dto_txn_credits);
-
-        /* Calculate the mdd credits. */
-        for (op = MDD_TXN_OBJECT_DESTROY_OP; op < MDD_TXN_LAST_OP; op++) {
-                int *c = &mdd->mdd_tod[op].mod_credits;
-                int *dt = dto_txn_credits;
-                mdd->mdd_tod[op].mod_op = op;
-                switch(op) {
-                        case MDD_TXN_OBJECT_DESTROY_OP:
-                                /* Unused now */
-                                *c = dt[DTO_OBJECT_DELETE];
-                                break;
-                        case MDD_TXN_OBJECT_CREATE_OP:
-                                /* OI INSERT + CREATE OBJECT */
-                                *c = dt[DTO_INDEX_INSERT] +
-                                     dt[DTO_OBJECT_CREATE];
-                                break;
-                        case MDD_TXN_ATTR_SET_OP:
-                                /* ATTR set + XATTR(lsm, lmv) set */
-                                *c = dt[DTO_ATTR_SET_BASE] +
-                                     dt[DTO_XATTR_SET];
-                                break;
-                        case MDD_TXN_XATTR_SET_OP:
-                                *c = dt[DTO_XATTR_SET];
-                                break;
-                        case MDD_TXN_INDEX_INSERT_OP:
-                                *c = dt[DTO_INDEX_INSERT];
-                                break;
-                        case MDD_TXN_INDEX_DELETE_OP:
-                                *c = dt[DTO_INDEX_DELETE];
-                                break;
-                        case MDD_TXN_LINK_OP:
-                                *c = dt[DTO_INDEX_INSERT];
-                                break;
-                        case MDD_TXN_UNLINK_OP:
-                                /* delete index + Unlink log +
-                                 * mdd orphan handling */
-                                *c = dt[DTO_INDEX_DELETE] +
-                                        dt[DTO_INDEX_DELETE] +
-                                        dt[DTO_INDEX_INSERT] * 2 +
-                                        dt[DTO_XATTR_SET] * 3;
-                                break;
-                        case MDD_TXN_RENAME_OP:
-                                /* 2 delete index + 1 insert + Unlink log */
-                                *c = 2 * dt[DTO_INDEX_DELETE] +
-                                        dt[DTO_INDEX_INSERT] +
-                                        dt[DTO_INDEX_DELETE] +
-                                        dt[DTO_INDEX_INSERT] * 2 +
-                                        dt[DTO_XATTR_SET] * 3;
-                                break;
-                        case MDD_TXN_RENAME_TGT_OP:
-                                /* index insert + index delete */
-                                *c = dt[DTO_INDEX_DELETE] +
-                                        dt[DTO_INDEX_INSERT] +
-                                        dt[DTO_INDEX_DELETE] +
-                                        dt[DTO_INDEX_INSERT] * 2 +
-                                        dt[DTO_XATTR_SET] * 3;
-                                break;
-                        case MDD_TXN_CREATE_DATA_OP:
-                                /* same as set xattr(lsm) */
-                                *c = dt[DTO_XATTR_SET];
-                                break;
-                        case MDD_TXN_MKDIR_OP:
-                                /* INDEX INSERT + OI INSERT +
-                                 * CREATE_OBJECT_CREDITS
-                                 * SET_MD CREDITS is already counted in
-                                 * CREATE_OBJECT CREDITS
-                                 */
-                                 *c = 2 * dt[DTO_INDEX_INSERT] +
-                                          dt[DTO_OBJECT_CREATE];
-                                break;
-                        case MDD_TXN_CLOSE_OP:
-                                *c = 0;
-                                break;
-                        default:
-                                CERROR("Invalid op %d init its credit\n", op);
-                                LBUG();
-                }
-        }
-        RETURN(0);
-}
-
-struct thandle* mdd_trans_start(const struct lu_env *env,
-                                struct mdd_device *mdd)
-{
-        struct txn_param *p = &mdd_env_info(env)->mti_param;
-        struct thandle *th;
-
-        th = mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, p);
-        return th;
+        return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, th);
 }
 
 void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
index 578c3e8..968121d 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * Copyright (c) 2011 Whamcloud, Inc.
@@ -92,11 +93,19 @@ static int write_capa_keys(const struct lu_env *env,
         int i, rc;
 
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-        mdt_trans_credit_init(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_OP);
-        th = mdt_trans_start(env, mdt);
+        th = mdt_trans_create(env, mdt);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
+        rc = dt_declare_record_write(env, mdt->mdt_ck_obj,
+                                     sizeof(*tmp) * 3, 0, th);
+        if (rc)
+                goto stop;
+
+        rc = mdt_trans_start(env, mdt, th);
+        if (rc)
+                goto stop;
+
         tmp = &mti->mti_capa_key;
 
         for (i = 0; i < 2; i++) {
@@ -109,6 +118,7 @@ static int write_capa_keys(const struct lu_env *env,
                         break;
         }
 
+stop:
         mdt_trans_stop(env, mdt, th);
 
         CDEBUG(D_INFO, "write capability keys rc = %d:\n", rc);
index 0a98f73..f6444c3 100644 (file)
@@ -5459,7 +5459,7 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
                  *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
                 rc = -ENOENT;
         } else {
-                version = mo_version_get(mti->mti_env, mdt_object_child(obj));
+                version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
                *(__u64 *)data->ioc_inlbuf2 = version;
                 rc = 0;
         }
index aee5a99..f4a1a31 100644 (file)
@@ -391,7 +391,6 @@ struct mdt_thread_info {
         struct lr_server_data      mti_lsd;
         struct lsd_client_data     mti_lcd;
         loff_t                     mti_off;
-        struct txn_param           mti_txn_param;
         struct lu_buf              mti_buf;
         struct lustre_capa_key     mti_capa_key;
 
@@ -603,11 +602,10 @@ int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *,
                            const struct md_attr *);
 void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *);
 
-void mdt_trans_credit_init(const struct lu_env *env,
-                           struct mdt_device *mdt,
-                           enum mdt_txn_op op);
-struct thandle* mdt_trans_start(const struct lu_env *env,
-                                struct mdt_device *mdt);
+struct thandle *mdt_trans_create(const struct lu_env *env,
+                                 struct mdt_device *mdt);
+int mdt_trans_start(const struct lu_env *env, struct mdt_device *mdt,
+                    struct thandle *th);
 void mdt_trans_stop(const struct lu_env *env,
                     struct mdt_device *mdt, struct thandle *th);
 int mdt_record_write(const struct lu_env *env,
@@ -650,6 +648,16 @@ static inline struct mdt_device *mdt_dev(struct lu_device *d)
         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
 }
 
+static inline struct dt_object *mdt_obj2dt(struct mdt_object *mo)
+{
+        struct lu_object *lo;
+        struct mdt_device *mdt = mdt_dev(mo->mot_obj.mo_lu.lo_dev);
+
+        lo = lu_object_locate(mo->mot_obj.mo_lu.lo_header,
+                              mdt->mdt_bottom->dd_lu_dev.ld_type);
+        return lu2dt(lo);
+}
+
 /* mdt/mdt_identity.c */
 #define MDT_IDENTITY_UPCALL_PATH        "/usr/sbin/l_getidentity"
 
index 8ae3631..abc604c 100644 (file)
@@ -80,50 +80,16 @@ const struct lu_buf *mdt_buf_const(const struct lu_env *env,
         return buf;
 }
 
-static inline int mdt_trans_credit_get(const struct lu_env *env,
-                                       struct mdt_device *mdt,
-                                       enum mdt_txn_op op)
-{
-        struct dt_device *dev = mdt->mdt_bottom;
-        int cr;
-        switch (op) {
-                case MDT_TXN_CAPA_KEYS_WRITE_OP:
-                case MDT_TXN_LAST_RCVD_WRITE_OP:
-                        cr = dev->dd_ops->dt_credit_get(env,
-                                                        dev,
-                                                        DTO_WRITE_BLOCK);
-                break;
-                default:
-                        LBUG();
-        }
-        return cr;
-}
-
-void mdt_trans_credit_init(const struct lu_env *env,
-                           struct mdt_device *mdt,
-                           enum mdt_txn_op op)
+struct thandle *mdt_trans_create(const struct lu_env *env,
+                                 struct mdt_device *mdt)
 {
-        struct mdt_thread_info *mti;
-        struct txn_param *p;
-        int cr;
-
-        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-        p = &mti->mti_txn_param;
-
-        cr = mdt_trans_credit_get(env, mdt, op);
-        txn_param_init(p, cr);
+        return mdt->mdt_bottom->dd_ops->dt_trans_create(env, mdt->mdt_bottom);
 }
 
-struct thandle* mdt_trans_start(const struct lu_env *env,
-                                struct mdt_device *mdt)
+int mdt_trans_start(const struct lu_env *env, struct mdt_device *mdt,
+                    struct thandle *th)
 {
-        struct mdt_thread_info *mti;
-        struct txn_param *p;
-
-        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-        p = &mti->mti_txn_param;
-
-        return dt_trans_start(env, mdt->mdt_bottom, p);
+        return dt_trans_start(env, mdt->mdt_bottom, th);
 }
 
 void mdt_trans_stop(const struct lu_env *env,
@@ -159,6 +125,18 @@ static inline int mdt_last_rcvd_header_read(const struct lu_env *env,
         return rc;
 }
 
+static int mdt_declare_last_rcvd_header_write(const struct lu_env *env,
+                                              struct mdt_device *mdt,
+                                              struct thandle *th)
+{
+        struct mdt_thread_info *mti;
+
+        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+
+        return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd,
+                                       sizeof(mti->mti_lsd), 0, th);
+}
+
 static int mdt_last_rcvd_header_write(const struct lu_env *env,
                                       struct mdt_device *mdt,
                                       struct thandle *th)
@@ -212,6 +190,14 @@ static int mdt_last_rcvd_read(const struct lu_env *env, struct mdt_device *mdt,
         return rc;
 }
 
+static int mdt_declare_last_rcvd_write(const struct lu_env *env,
+                                       struct mdt_device *mdt,
+                                       loff_t off, struct thandle *th)
+{
+        return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd,
+                                       sizeof(struct lsd_client_data), off, th);
+}
+
 static int mdt_last_rcvd_write(const struct lu_env *env,
                                struct mdt_device *mdt,
                                struct lsd_client_data *lcd,
@@ -503,11 +489,18 @@ static int mdt_server_data_update(const struct lu_env *env,
 
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
 
-        mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
-        th = mdt_trans_start(env, mdt);
+        th = mdt_trans_create(env, mdt);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
+        rc = mdt_declare_last_rcvd_header_write(env, mdt, th);
+        if (rc)
+                goto out;
+
+        rc = mdt_trans_start(env, mdt, th);
+        if (rc)
+                goto out;
+
         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
                mdt->mdt_lut.lut_obd->u.obt.obt_mount_count,
                mdt->mdt_lut.lut_last_transno);
@@ -517,6 +510,8 @@ static int mdt_server_data_update(const struct lu_env *env,
         cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
 
         rc = mdt_last_rcvd_header_write(env, mdt, th);
+
+out:
         mdt_trans_stop(env, mdt, th);
         return rc;
 }
@@ -575,12 +570,18 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
                 RETURN(-ENOSPC);
 
-        mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
-
-        th = mdt_trans_start(env, mdt);
+        th = mdt_trans_create(env, mdt);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
+        rc = mdt_declare_last_rcvd_write(env, mdt, off, th);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdt_trans_start(env, mdt, th);
+        if (rc)
+                GOTO(stop, rc);
+
         /*
          * Until this operations will be committed the sync is needed
          * for this export. This should be done _after_ starting the
@@ -600,6 +601,8 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
         rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th);
         CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
                cl_idx, ted->ted_lr_off, (int)sizeof(*(ted->ted_lcd)));
+
+stop:
         mdt_trans_stop(env, mdt, th);
 
         RETURN(rc);
@@ -709,15 +712,24 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt)
          * be in server data or in client data in case of failure */
         mdt_server_data_update(env, mdt);
 
-        mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
-        th = mdt_trans_start(env, mdt);
+        th = mdt_trans_create(env, mdt);
         if (IS_ERR(th))
                 GOTO(free, rc = PTR_ERR(th));
 
+        rc = mdt_declare_last_rcvd_write(env, mdt, off, th);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdt_trans_start(env, mdt, th);
+        if (rc)
+                GOTO(stop, rc);
+
         cfs_mutex_down(&ted->ted_lcd_lock);
         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
         rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th);
         cfs_mutex_up(&ted->ted_lcd_lock);
+
+stop:
         mdt_trans_stop(env, mdt, th);
 
         CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
@@ -831,23 +843,13 @@ extern struct lu_context_key mdt_thread_key;
 
 /* add credits for last_rcvd update */
 static int mdt_txn_start_cb(const struct lu_env *env,
-                            struct txn_param *param, void *cookie)
+                            struct thandle *th, void *cookie)
 {
         struct mdt_device *mdt = cookie;
 
-        param->tp_credits += mdt_trans_credit_get(env, mdt,
-                                                  MDT_TXN_LAST_RCVD_WRITE_OP);
-        return 0;
-}
-
-/* Set new object versions */
-static void mdt_version_set(struct mdt_thread_info *info)
-{
-        if (info->mti_mos != NULL) {
-                mo_version_set(info->mti_env, mdt_object_child(info->mti_mos),
-                               info->mti_transno);
-                info->mti_mos = NULL;
-        }
+        /* XXX: later we'll be declaring this at specific offset */
+        return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd,
+                                       sizeof(struct lsd_client_data), 0, th);
 }
 
 /* Update last_rcvd records with latests transaction data */
@@ -893,8 +895,11 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
         LASSERT(req != NULL && req->rq_repmsg != NULL);
 
         /** VBR: set new versions */
-        if (txn->th_result == 0)
-                mdt_version_set(mti);
+        if (txn->th_result == 0 && mti->mti_mos != NULL) {
+                dt_version_set(env, mdt_obj2dt(mti->mti_mos),
+                               mti->mti_transno, txn);
+                mti->mti_mos = NULL;
+        }
 
         /* filling reply data */
         CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
index 3c4c011..ec2e5d8 100644 (file)
@@ -110,7 +110,7 @@ static void mdt_obj_version_get(struct mdt_thread_info *info,
         LASSERT(o);
         LASSERT(mdt_object_exists(o) >= 0);
         if (mdt_object_exists(o) > 0)
-                *version = mo_version_get(info->mti_env, mdt_object_child(o));
+                *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
         else
                 *version = ENOENT_VERSION;
         CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
index 3d08d2f..4ecf61e 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -89,21 +90,23 @@ void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb)
 EXPORT_SYMBOL(dt_txn_callback_del);
 
 int dt_txn_hook_start(const struct lu_env *env,
-                      struct dt_device *dev, struct txn_param *param)
+                      struct dt_device *dev, struct thandle *th)
 {
-        int result;
+        int rc = 0;
         struct dt_txn_callback *cb;
 
-        result = 0;
+        if (th->th_local)
+                return 0;
+
         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
                 if (cb->dtc_txn_start == NULL ||
                     !(cb->dtc_tag & env->le_ctx.lc_tags))
                         continue;
-                result = cb->dtc_txn_start(env, param, cb->dtc_cookie);
-                if (result < 0)
+                rc = cb->dtc_txn_start(env, th, cb->dtc_cookie);
+                if (rc < 0)
                         break;
         }
-        return result;
+        return rc;
 }
 EXPORT_SYMBOL(dt_txn_hook_start);
 
@@ -111,18 +114,20 @@ int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
 {
         struct dt_device       *dev = txn->th_dev;
         struct dt_txn_callback *cb;
-        int                     result;
+        int                     rc = 0;
+
+        if (txn->th_local)
+                return 0;
 
-        result = 0;
         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
                 if (cb->dtc_txn_stop == NULL ||
                     !(cb->dtc_tag & env->le_ctx.lc_tags))
                         continue;
-                result = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
-                if (result < 0)
+                rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
+                if (rc < 0)
                         break;
         }
-        return result;
+        return rc;
 }
 EXPORT_SYMBOL(dt_txn_hook_stop);
 
@@ -130,6 +135,9 @@ void dt_txn_hook_commit(struct thandle *txn)
 {
         struct dt_txn_callback *cb;
 
+        if (txn->th_local)
+                return;
+
         cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
                                 dtc_linkage) {
                 if (cb->dtc_txn_commit)
@@ -200,31 +208,21 @@ enum dt_format_type dt_mode_to_dft(__u32 mode)
         }
         return result;
 }
-
 EXPORT_SYMBOL(dt_mode_to_dft);
+
 /**
  * lookup fid for object named \a name in directory \a dir.
  */
 
-static int dt_lookup(const struct lu_env *env, struct dt_object *dir,
-                     const char *name, struct lu_fid *fid)
+int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
+                  const char *name, struct lu_fid *fid)
 {
-        struct dt_rec       *rec = (struct dt_rec *)fid;
-        const struct dt_key *key = (const struct dt_key *)name;
-        int result;
-
-        if (dt_try_as_dir(env, dir)) {
-                result = dir->do_index_ops->dio_lookup(env, dir, rec, key,
-                                                       BYPASS_CAPA);
-                if (result > 0)
-                        result = 0;
-                else if (result == 0)
-                        result = -ENOENT;
-        } else
-                result = -ENOTDIR;
-        return result;
+        if (dt_try_as_dir(env, dir))
+                return dt_lookup(env, dir, (struct dt_rec *)fid,
+                                 (const struct dt_key *)name, BYPASS_CAPA);
+        return -ENOTDIR;
 }
-
+EXPORT_SYMBOL(dt_lookup_dir);
 /**
  * get object for given \a fid.
  */
@@ -257,7 +255,7 @@ static int dt_find_entry(const struct lu_env *env, const char *entry, void *data
         struct dt_object     *obj = dfh->dfh_o;
         int                   result;
 
-        result = dt_lookup(env, obj, entry, fid);
+        result = dt_lookup_dir(env, obj, entry, fid);
         lu_object_put(env, &obj->do_lu);
         if (result == 0) {
                 obj = dt_locate(env, dt, fid);
@@ -341,7 +339,7 @@ static struct dt_object *dt_reg_open(const struct lu_env *env,
         struct dt_object *o;
         int result;
 
-        result = dt_lookup(env, p, name, fid);
+        result = dt_lookup_dir(env, p, name, fid);
         if (result == 0){
                 o = dt_locate(env, dt, fid);
         }
@@ -416,6 +414,8 @@ int dt_record_write(const struct lu_env *env, struct dt_object *dt,
 
         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
         LASSERT(th != NULL);
+        LASSERT(dt->do_body_ops);
+        LASSERT(dt->do_body_ops->dbo_write);
         rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
         if (rc == buf->lb_len)
                 rc = 0;
@@ -425,5 +425,56 @@ int dt_record_write(const struct lu_env *env, struct dt_object *dt,
 }
 EXPORT_SYMBOL(dt_record_write);
 
+int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
+                           struct thandle *th)
+{
+        struct lu_buf vbuf;
+        char *xname = XATTR_NAME_VERSION;
+
+        LASSERT(o);
+        vbuf.lb_buf = NULL;
+        vbuf.lb_len = sizeof(dt_obj_version_t);
+        return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th);
+
+}
+EXPORT_SYMBOL(dt_declare_version_set);
+
+void dt_version_set(const struct lu_env *env, struct dt_object *o,
+                    dt_obj_version_t version, struct thandle *th)
+{
+        struct lu_buf vbuf;
+        char *xname = XATTR_NAME_VERSION;
+        int rc;
+
+        LASSERT(o);
+        vbuf.lb_buf = &version;
+        vbuf.lb_len = sizeof(version);
+
+        rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA);
+        if (rc != 0)
+                CDEBUG(D_INODE, "Can't set version, rc %d\n", rc);
+        return;
+}
+EXPORT_SYMBOL(dt_version_set);
+
+dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o)
+{
+        struct lu_buf vbuf;
+        char *xname = XATTR_NAME_VERSION;
+        dt_obj_version_t version;
+        int rc;
+
+        LASSERT(o);
+        vbuf.lb_buf = &version;
+        vbuf.lb_len = sizeof(version);
+        rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA);
+        if (rc != sizeof(version)) {
+                CDEBUG(D_INODE, "Can't get version, rc %d\n", rc);
+                version = 0;
+        }
+        return version;
+}
+EXPORT_SYMBOL(dt_version_get);
+
 const struct dt_index_features dt_directory_features;
 EXPORT_SYMBOL(dt_directory_features);
index bffa028..45f92aa 100644 (file)
@@ -138,9 +138,25 @@ static       struct lu_context_key            osd_key;
 static const struct dt_object_operations      osd_obj_ops;
 static const struct dt_object_operations      osd_obj_ea_ops;
 static const struct dt_body_operations        osd_body_ops;
+static const struct dt_body_operations        osd_body_ops_new;
 static const struct dt_index_operations       osd_index_iam_ops;
 static const struct dt_index_operations       osd_index_ea_ops;
 
+#define OSD_TRACK_DECLARES
+#ifdef OSD_TRACK_DECLARES
+#define OSD_DECLARE_OP(oh, op)   {                               \
+        LASSERT(oh->ot_handle == NULL);                          \
+        ((oh)->ot_declare_ ##op)++; }
+#define OSD_EXEC_OP(handle, op)     {                            \
+        struct osd_thandle *oh;                                  \
+        oh = container_of0(handle, struct osd_thandle, ot_super);\
+        LASSERT((oh)->ot_declare_ ##op > 0);                     \
+        ((oh)->ot_declare_ ##op)--; }
+#else
+#define OSD_DECLARE_OP(oh, op)
+#define OSD_EXEC_OP(oh, op)
+#endif
+
 struct osd_thandle {
         struct thandle          ot_super;
         handle_t               *ot_handle;
@@ -148,6 +164,20 @@ struct osd_thandle {
         cfs_list_t              ot_dcb_list;
         /* Link to the device, for debugging. */
         struct lu_ref_link     *ot_dev_link;
+        int                     ot_credits;
+
+#ifdef OSD_TRACK_DECLARES
+        unsigned char           ot_declare_attr_set;
+        unsigned char           ot_declare_punch;
+        unsigned char           ot_declare_xattr_set;
+        unsigned char           ot_declare_create;
+        unsigned char           ot_declare_destroy;
+        unsigned char           ot_declare_ref_add;
+        unsigned char           ot_declare_ref_del;
+        unsigned char           ot_declare_write;
+        unsigned char           ot_declare_insert;
+        unsigned char           ot_declare_delete;
+#endif
 
 #if OSD_THANDLE_STATS
         /** time when this handle was allocated */
@@ -158,6 +188,25 @@ struct osd_thandle {
 #endif
 };
 
+/**
+ * Basic transaction credit op
+ */
+enum dt_txn_op {
+        DTO_INDEX_INSERT,
+        DTO_INDEX_DELETE,
+        DTO_INDEX_UPDATE,
+        DTO_OBJECT_CREATE,
+        DTO_OBJECT_DELETE,
+        DTO_ATTR_SET_BASE,
+        DTO_XATTR_SET,
+        DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */
+        DTO_WRITE_BASE,
+        DTO_WRITE_BLOCK,
+        DTO_ATTR_SET_CHOWN,
+
+        DTO_NR
+};
+
 /*
  * Helpers.
  */
@@ -474,6 +523,7 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
         LINVRNT(osd_invariant(obj));
 
         result = osd_fid_lookup(env, obj, lu_object_fid(l));
+        obj->oo_dt.do_body_ops = &osd_body_ops_new;
         if (result == 0) {
                 if (obj->oo_inode != NULL)
                         osd_object_init0(obj);
@@ -641,9 +691,11 @@ static void __osd_th_check_slow(void *oth, struct osd_device *dev,
  * Concurrency: doesn't access mutable data.
  */
 static int osd_param_is_sane(const struct osd_device *dev,
-                             const struct txn_param *param)
+                             const struct thandle *th)
 {
-        return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
+        struct osd_thandle *oh;
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers;
 }
 
 /*
@@ -681,86 +733,129 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
         OBD_FREE_PTR(oh);
 }
 
+static struct thandle *osd_trans_create(const struct lu_env *env,
+                                        struct dt_device *d)
+{
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_thandle     *oh;
+        struct thandle         *th;
+        ENTRY;
+
+        th = ERR_PTR(-ENOMEM);
+        OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
+        if (oh != NULL) {
+                th = &oh->ot_super;
+                th->th_dev = d;
+                th->th_result = 0;
+                th->th_tags = LCT_TX_HANDLE;
+                oh->ot_credits = 0;
+                oti->oti_dev = osd_dt_dev(d);
+                CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
+                osd_th_alloced(oh);
+        }
+        RETURN(th);
+}
+
 /*
  * Concurrency: shouldn't matter.
  */
-static struct thandle *osd_trans_start(const struct lu_env *env,
-                                       struct dt_device *d,
-                                       struct txn_param *p)
+int osd_trans_start(const struct lu_env *env, struct dt_device *d,
+                    struct thandle *th)
 {
+        struct osd_thread_info *oti = osd_oti_get(env);
         struct osd_device  *dev = osd_dt_dev(d);
         handle_t           *jh;
         struct osd_thandle *oh;
-        struct thandle     *th;
-        int hook_res;
+        int rc;
 
         ENTRY;
 
-        hook_res = dt_txn_hook_start(env, d, p);
-        if (hook_res != 0)
-                RETURN(ERR_PTR(hook_res));
+        LASSERT(current->journal_info == NULL);
 
-        if (osd_param_is_sane(dev, p)) {
-                OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
-                if (oh != NULL) {
-                        struct osd_thread_info *oti = osd_oti_get(env);
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh != NULL);
+        LASSERT(oh->ot_handle == NULL);
 
-                        /*
-                         * XXX temporary stuff. Some abstraction layer should
-                         * be used.
-                         */
-                        oti->oti_dev = dev;
-                        CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
-                        osd_th_alloced(oh);
-                        jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
-                        osd_th_started(oh);
-                        if (!IS_ERR(jh)) {
-                                oh->ot_handle = jh;
-                                th = &oh->ot_super;
-                                th->th_dev = d;
-                                th->th_result = 0;
-                                th->th_sync = 0;
-                                lu_device_get(&d->dd_lu_dev);
-                                oh->ot_dev_link = lu_ref_add
-                                        (&d->dd_lu_dev.ld_reference,
-                                         "osd-tx", th);
-                                /* add commit callback */
-                                lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
-                                lu_context_enter(&th->th_ctx);
-                                LASSERT(oti->oti_txns == 0);
-                                LASSERT(oti->oti_r_locks == 0);
-                                LASSERT(oti->oti_w_locks == 0);
-                                oti->oti_txns++;
-                        } else {
-                                OBD_FREE_PTR(oh);
-                                th = (void *)jh;
-                        }
-                } else
-                        th = ERR_PTR(-ENOMEM);
-        } else {
-                CERROR("Invalid transaction parameters\n");
-                th = ERR_PTR(-EINVAL);
+        rc = dt_txn_hook_start(env, d, th);
+        if (rc != 0)
+                GOTO(out, rc);
+
+        oh->ot_credits += LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev));
+
+        if (!osd_param_is_sane(dev, th)) {
+                CWARN("%s: too many transaction credits (%d > %d)\n",
+                      d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits,
+                      osd_journal(dev)->j_max_transaction_buffers);
+#ifdef OSD_TRACK_DECLARES
+                CERROR("  attr_set: %d, punch: %d, xattr_set: %d,\n",
+                       oh->ot_declare_attr_set, oh->ot_declare_punch,
+                       oh->ot_declare_xattr_set);
+                CERROR("  create: %d, ref_add: %d, ref_del: %d, write: %d\n",
+                       oh->ot_declare_create, oh->ot_declare_ref_add,
+                       oh->ot_declare_ref_del, oh->ot_declare_write);
+                CERROR("  insert: %d, delete: %d\n",
+                       oh->ot_declare_insert, oh->ot_declare_delete);
+#endif
         }
 
-        RETURN(th);
+        /*
+         * XXX temporary stuff. Some abstraction layer should
+         * be used.
+         */
+        jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
+        osd_th_started(oh);
+        if (!IS_ERR(jh)) {
+                oh->ot_handle = jh;
+                LASSERT(oti->oti_txns == 0);
+                lu_context_init(&th->th_ctx, th->th_tags);
+                lu_context_enter(&th->th_ctx);
+
+                lu_device_get(&d->dd_lu_dev);
+                oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
+                                             "osd-tx", th);
+
+                /*
+                 * XXX: current rule is that we first start tx,
+                 *      then lock object(s), but we can't use
+                 *      this rule for data (due to locking specifics
+                 *      in ldiskfs). also in long-term we'd like to
+                 *      use usually-used (locks;tx) ordering. so,
+                 *      UGLY thing is that we'll use one ordering for
+                 *      data (ofd) and reverse ordering for metadata
+                 *      (mdd). then at some point we'll fix the latter
+                 */
+                if (lu_device_is_md(&d->dd_lu_dev)) {
+                        LASSERT(oti->oti_r_locks == 0);
+                        LASSERT(oti->oti_w_locks == 0);
+                }
+
+                oti->oti_txns++;
+                rc = 0;
+        } else {
+                rc = PTR_ERR(jh);
+        }
+out:
+        RETURN(rc);
 }
 
 /*
  * Concurrency: shouldn't matter.
  */
-static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
 {
-        int result;
-        struct osd_thandle *oh;
+        int                     rc = 0;
+        struct osd_thandle     *oh;
         struct osd_thread_info *oti = osd_oti_get(env);
 
         ENTRY;
 
         oh = container_of0(th, struct osd_thandle, ot_super);
+
         if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
 
                 hdl->h_sync = th->th_sync;
+
                 /*
                  * add commit callback
                  * notice we don't do this in osd_trans_start()
@@ -771,20 +866,33 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
 
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
-                LASSERT(oti->oti_r_locks == 0);
-                LASSERT(oti->oti_w_locks == 0);
-                result = dt_txn_hook_stop(env, th);
-                if (result != 0)
-                        CERROR("Failure in transaction hook: %d\n", result);
+                /*
+                 * XXX: current rule is that we first start tx,
+                 *      then lock object(s), but we can't use
+                 *      this rule for data (due to locking specifics
+                 *      in ldiskfs). also in long-term we'd like to
+                 *      use usually-used (locks;tx) ordering. so,
+                 *      UGLY thing is that we'll use one ordering for
+                 *      data (ofd) and reverse ordering for metadata
+                 *      (mdd). then at some point we'll fix the latter
+                 */
+                if (lu_device_is_md(&th->th_dev->dd_lu_dev)) {
+                        LASSERT(oti->oti_r_locks == 0);
+                        LASSERT(oti->oti_w_locks == 0);
+                }
+                rc = dt_txn_hook_stop(env, th);
+                if (rc != 0)
+                        CERROR("Failure in transaction hook: %d\n", rc);
                 oh->ot_handle = NULL;
                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
-                                  result = ldiskfs_journal_stop(hdl));
-                if (result != 0)
-                        CERROR("Failure to stop transaction: %d\n", result);
+                                  rc = ldiskfs_journal_stop(hdl));
+                if (rc != 0)
+                        CERROR("Failure to stop transaction: %d\n", rc);
         } else {
                 OBD_FREE_PTR(oh);
         }
-        EXIT;
+
+        RETURN(rc);
 }
 
 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
@@ -798,35 +906,6 @@ static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
 }
 
 /*
- * Concurrency: no concurrent access is possible that late in object
- * life-cycle.
- */
-static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
-{
-        const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
-        struct osd_device      *osd = osd_obj2dev(obj);
-        struct osd_thread_info *oti = osd_oti_get(env);
-        struct txn_param       *prm = &oti->oti_txn;
-        struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
-        struct thandle         *th;
-        int result;
-
-        lu_env_init(env_del_obj, LCT_DT_THREAD);
-        txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
-                            OSD_TXN_INODE_DELETE_CREDITS);
-        th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
-        if (!IS_ERR(th)) {
-                result = osd_oi_delete(osd_oti_get(env_del_obj),
-                                       &osd->od_oi, fid, th);
-                osd_trans_stop(env_del_obj, th);
-        } else
-                result = PTR_ERR(th);
-
-        lu_env_fini(env_del_obj);
-        return result;
-}
-
-/*
  * Called just before object is freed. Releases all resources except for
  * object itself (that is released by osd_object_free()).
  *
@@ -846,16 +925,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 
         osd_index_fini(obj);
         if (inode != NULL) {
-                int result;
-
-                if (osd_inode_unlinked(inode)) {
-                        result = osd_inode_remove(env, obj);
-                        if (result != 0)
-                                LU_OBJECT_DEBUG(D_ERROR, env, l,
-                                                "Failed to cleanup: %d\n",
-                                                result);
-                }
-
                 iput(inode);
                 obj->oo_inode = NULL;
         }
@@ -867,11 +936,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 static void osd_object_release(const struct lu_env *env,
                                struct lu_object *l)
 {
-        struct osd_object *o = osd_obj(l);
-
-        LASSERT(!lu_object_is_dying(l->lo_header));
-        if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
-                cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
 }
 
 /*
@@ -996,20 +1060,18 @@ static int osd_commit_async(const struct lu_env *env,
 /*
  * Concurrency: shouldn't matter.
  */
-lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
 
 static void osd_ro(const struct lu_env *env, struct dt_device *d)
 {
+        struct super_block *sb = osd_sb(osd_dt_dev(d));
         ENTRY;
 
         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
 
-        __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
-                          fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
+        __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
         EXIT;
 }
 
-
 /*
  * Concurrency: serialization provided by callers.
  */
@@ -1063,7 +1125,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
         /**
          * Unused now
          */
-        [DTO_IDNEX_UPDATE]  = 16,
+        [DTO_INDEX_UPDATE]  = 16,
         /**
          * Create a object. The same as create object in EXT3.
          * DATA_TRANS_BLOCKS(14) +
@@ -1072,14 +1134,13 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
          */
         [DTO_OBJECT_CREATE] = 25,
         /**
-         * Unused now
+         * XXX: real credits to be fixed
          */
         [DTO_OBJECT_DELETE] = 25,
         /**
-         * Attr set credits.
-         * 3(inode bits, group, GDT)
+         * Attr set credits (inode)
          */
-        [DTO_ATTR_SET_BASE] = 3,
+        [DTO_ATTR_SET_BASE] = 1,
         /**
          * Xattr set. The same as xattr of EXT3.
          * DATA_TRANS_BLOCKS(14)
@@ -1089,7 +1150,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
         [DTO_XATTR_SET]     = 14,
         [DTO_LOG_REC]       = 14,
         /**
-         * creadits for inode change during write.
+         * credits for inode change during write.
          */
         [DTO_WRITE_BASE]    = 3,
         /**
@@ -1123,7 +1184,7 @@ static const int osd_dto_credits_quota[DTO_NR] = {
         /**
          * Unused now.
          */
-        [DTO_IDNEX_UPDATE]  = 16,
+        [DTO_INDEX_UPDATE]  = 16,
         /*
          * Create a object. Same as create object in EXT3 filesystem.
          * DATA_TRANS_BLOCKS(16) +
@@ -1170,23 +1231,10 @@ static const int osd_dto_credits_quota[DTO_NR] = {
         [DTO_ATTR_SET_CHOWN]= 68,
 };
 
-static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
-                          enum dt_txn_op op)
-{
-        LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
-                ARRAY_SIZE(osd_dto_credits_quota));
-        LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
-#ifdef HAVE_QUOTA_SUPPORT
-        if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
-                return osd_dto_credits_quota[op];
-        else
-#endif
-                return osd_dto_credits_noquota[op];
-}
-
 static const struct dt_device_operations osd_dt_ops = {
         .dt_root_get       = osd_root_get,
         .dt_statfs         = osd_statfs,
+        .dt_trans_create   = osd_trans_create,
         .dt_trans_start    = osd_trans_start,
         .dt_trans_stop     = osd_trans_stop,
         .dt_trans_cb_add   = osd_trans_cb_add,
@@ -1194,7 +1242,6 @@ static const struct dt_device_operations osd_dt_ops = {
         .dt_sync           = osd_sync,
         .dt_ro             = osd_ro,
         .dt_commit_async   = osd_commit_async,
-        .dt_credit_get     = osd_credit_get,
         .dt_init_capa_ctxt = osd_init_capa_ctxt,
         .dt_init_quota_ctxt= osd_init_quota_ctxt,
 };
@@ -1422,6 +1469,25 @@ static int osd_attr_get(const struct lu_env *env,
         return 0;
 }
 
+static int osd_declare_attr_set(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct lu_attr *attr,
+                                struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+        LASSERT(osd_invariant(obj));
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, attr_set);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        return 0;
+}
+
 static int osd_inode_setattr(const struct lu_env *env,
                              struct inode *inode, const struct lu_attr *attr)
 {
@@ -1506,6 +1572,8 @@ static int osd_attr_set(const struct lu_env *env,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, attr_set);
+
         cfs_spin_lock(&obj->oo_guard);
         rc = osd_inode_setattr(env, obj->oo_inode, attr);
         cfs_spin_unlock(&obj->oo_guard);
@@ -1824,6 +1892,33 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
 }
 
+static int osd_declare_object_create(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct lu_attr *attr,
+                                     struct dt_allocation_hint *hint,
+                                     struct dt_object_format *dof,
+                                     struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, insert);
+        OSD_DECLARE_OP(oh, create);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+        /* if this is directory, then we expect . and ..
+         * to be inserted as well */
+        OSD_DECLARE_OP(oh, insert);
+        OSD_DECLARE_OP(oh, insert);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+        return 0;
+}
+
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
@@ -1842,6 +1937,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, create);
+
         result = __osd_object_create(info, obj, attr, hint, dof, th);
         if (result == 0)
                 result = __osd_oi_insert(env, obj, fid, th);
@@ -1852,6 +1949,63 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 }
 
 /**
+ * Called to destroy on-disk representation of the object
+ *
+ * Concurrency: must be locked
+ */
+static int osd_declare_object_destroy(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      struct thandle *th)
+{
+        struct osd_object      *obj = osd_dt_obj(dt);
+        struct inode           *inode = obj->oo_inode;
+        struct osd_thandle     *oh;
+        ENTRY;
+
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+        LASSERT(inode);
+        LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
+
+        OSD_DECLARE_OP(oh, destroy);
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE];
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+        RETURN(0);
+}
+
+static int osd_object_destroy(const struct lu_env *env,
+                              struct dt_object *dt,
+                              struct thandle *th)
+{
+        const struct lu_fid    *fid = lu_object_fid(&dt->do_lu);
+        struct osd_object      *obj = osd_dt_obj(dt);
+        struct inode           *inode = obj->oo_inode;
+        struct osd_device      *osd = osd_obj2dev(obj);
+        struct osd_thandle     *oh;
+        int                     result;
+        ENTRY;
+
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle);
+        LASSERT(inode);
+        LASSERT(osd_inode_unlinked(inode));
+
+        OSD_EXEC_OP(th, destroy);
+
+        result = osd_oi_delete(osd_oti_get(env), &osd->od_oi, fid, th);
+
+        /* XXX: add to ext3 orphan list */
+        /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
+
+        /* not needed in the cache anymore */
+        set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
+
+        RETURN(0);
+}
+
+/**
  * Helper function for osd_xattr_set()
  */
 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
@@ -2044,6 +2198,8 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, create);
+
         result = __osd_object_create(info, obj, attr, hint, dof, th);
 
         /* objects under osd root shld have igif fid, so dont add fid EA */
@@ -2058,12 +2214,30 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         RETURN(result);
 }
 
+static int osd_declare_object_ref_add(const struct lu_env *env,
+                               struct dt_object *dt,
+                               struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        /* it's possible that object doesn't exist yet */
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, ref_add);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        return 0;
+}
+
 /*
  * Concurrency: @dt is write locked.
  */
-static void osd_object_ref_add(const struct lu_env *env,
-                               struct dt_object *dt,
-                               struct thandle *th)
+static int osd_object_ref_add(const struct lu_env *env,
+                              struct dt_object *dt,
+                              struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct inode *inode = obj->oo_inode;
@@ -2073,20 +2247,42 @@ static void osd_object_ref_add(const struct lu_env *env,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, ref_add);
+
         cfs_spin_lock(&obj->oo_guard);
         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
         inode->i_nlink++;
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
+
+        return 0;
+}
+
+static int osd_declare_object_ref_del(const struct lu_env *env,
+                               struct dt_object *dt,
+                               struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, ref_del);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        return 0;
 }
 
 /*
  * Concurrency: @dt is write locked.
  */
-static void osd_object_ref_del(const struct lu_env *env,
-                               struct dt_object *dt,
-                               struct thandle *th)
+static int osd_object_ref_del(const struct lu_env *env,
+                              struct dt_object *dt,
+                              struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct inode *inode = obj->oo_inode;
@@ -2096,12 +2292,30 @@ static void osd_object_ref_del(const struct lu_env *env,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, ref_del);
+
         cfs_spin_lock(&obj->oo_guard);
         LASSERT(inode->i_nlink > 0);
         inode->i_nlink--;
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
+
+        return 0;
+}
+
+/*
+ * Get the 64-bit version for an inode.
+ */
+static int osd_object_version_get(const struct lu_env *env,
+                                  struct dt_object *dt, dt_obj_version_t *ver)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
+               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        *ver = LDISKFS_I(inode)->i_fs_version;
+        return 0;
 }
 
 /*
@@ -2118,6 +2332,15 @@ static int osd_xattr_get(const struct lu_env *env,
         struct osd_thread_info *info   = osd_oti_get(env);
         struct dentry          *dentry = &info->oti_obj_dentry;
 
+        /* version get is not real XATTR but uses xattr API */
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* for version we are just using xattr API but change inode
+                 * field instead */
+                LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+                osd_object_version_get(env, dt, buf->lb_buf);
+                return sizeof(dt_obj_version_t);
+        }
+
         LASSERT(dt_object_exists(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
@@ -2129,6 +2352,47 @@ static int osd_xattr_get(const struct lu_env *env,
         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
 }
 
+
+static int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                                 const struct lu_buf *buf, const char *name,
+                                 int fl, struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* no credits for version */
+                return 0;
+        }
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, xattr_set);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+        return 0;
+}
+
+/*
+ * Set the 64-bit version for object
+ */
+static void osd_object_version_set(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   dt_obj_version_t *new_version)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+               *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+
+        LDISKFS_I(inode)->i_fs_version = *new_version;
+        /** Version is set after all inode operations are finished,
+         *  so we should mark it dirty here */
+        inode->i_sb->s_op->dirty_inode(inode);
+}
+
 /*
  * Concurrency: @dt is write locked.
  */
@@ -2138,9 +2402,19 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
 {
         LASSERT(handle != NULL);
 
+        /* version set is not real XATTR */
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* for version we are just using xattr API but change inode
+                 * field instead */
+                LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+                osd_object_version_set(env, dt, buf->lb_buf);
+                return sizeof(dt_obj_version_t);
+        }
+
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, xattr_set);
         return __osd_xattr_set(env, dt, buf, name, fl);
 }
 
@@ -2168,6 +2442,25 @@ static int osd_xattr_list(const struct lu_env *env,
         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
 }
 
+static int osd_declare_xattr_del(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                const char *name,
+                                struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, xattr_set);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+        return 0;
+}
+
 /*
  * Concurrency: @dt is write locked.
  */
@@ -2191,6 +2484,8 @@ static int osd_xattr_del(const struct lu_env *env,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, xattr_set);
+
         dentry->d_inode = inode;
         rc = inode->i_op->removexattr(dentry, name);
         return rc;
@@ -2303,35 +2598,6 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
         RETURN(rc);
 }
 
-/*
- * Get the 64-bit version for an inode.
- */
-static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
-                                               struct dt_object *dt)
-{
-        struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
-        CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
-               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
-        return LDISKFS_I(inode)->i_fs_version;
-}
-
-/*
- * Set the 64-bit version and return the old version.
- */
-static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
-                                   dt_obj_version_t new_version)
-{
-        struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
-        CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
-               new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
-        LDISKFS_I(inode)->i_fs_version = new_version;
-        /** Version is set after all inode operations are finished,
-         *  so we should mark it dirty here */
-        inode->i_sb->s_op->dirty_inode(inode);
-}
-
 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
                         void **data)
 {
@@ -2477,27 +2743,33 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 }
 
 static const struct dt_object_operations osd_obj_ops = {
-        .do_read_lock    = osd_object_read_lock,
-        .do_write_lock   = osd_object_write_lock,
-        .do_read_unlock  = osd_object_read_unlock,
-        .do_write_unlock = osd_object_write_unlock,
-        .do_write_locked = osd_object_write_locked,
-        .do_attr_get     = osd_attr_get,
-        .do_attr_set     = osd_attr_set,
-        .do_ah_init      = osd_ah_init,
-        .do_create       = osd_object_create,
-        .do_index_try    = osd_index_try,
-        .do_ref_add      = osd_object_ref_add,
-        .do_ref_del      = osd_object_ref_del,
-        .do_xattr_get    = osd_xattr_get,
-        .do_xattr_set    = osd_xattr_set,
-        .do_xattr_del    = osd_xattr_del,
-        .do_xattr_list   = osd_xattr_list,
-        .do_capa_get     = osd_capa_get,
-        .do_object_sync  = osd_object_sync,
-        .do_version_get  = osd_object_version_get,
-        .do_version_set  = osd_object_version_set,
-        .do_data_get     = osd_data_get,
+        .do_read_lock         = osd_object_read_lock,
+        .do_write_lock        = osd_object_write_lock,
+        .do_read_unlock       = osd_object_read_unlock,
+        .do_write_unlock      = osd_object_write_unlock,
+        .do_write_locked      = osd_object_write_locked,
+        .do_attr_get          = osd_attr_get,
+        .do_declare_attr_set  = osd_declare_attr_set,
+        .do_attr_set          = osd_attr_set,
+        .do_ah_init           = osd_ah_init,
+        .do_declare_create    = osd_declare_object_create,
+        .do_create            = osd_object_create,
+        .do_declare_destroy   = osd_declare_object_destroy,
+        .do_destroy           = osd_object_destroy,
+        .do_index_try         = osd_index_try,
+        .do_declare_ref_add   = osd_declare_object_ref_add,
+        .do_ref_add           = osd_object_ref_add,
+        .do_declare_ref_del   = osd_declare_object_ref_del,
+        .do_ref_del           = osd_object_ref_del,
+        .do_xattr_get         = osd_xattr_get,
+        .do_declare_xattr_set = osd_declare_xattr_set,
+        .do_xattr_set         = osd_xattr_set,
+        .do_declare_xattr_del = osd_declare_xattr_del,
+        .do_xattr_del         = osd_xattr_del,
+        .do_xattr_list        = osd_xattr_list,
+        .do_capa_get          = osd_capa_get,
+        .do_object_sync       = osd_object_sync,
+        .do_data_get          = osd_data_get,
 };
 
 /**
@@ -2505,27 +2777,33 @@ static const struct dt_object_operations osd_obj_ops = {
  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
  */
 static const struct dt_object_operations osd_obj_ea_ops = {
-        .do_read_lock    = osd_object_read_lock,
-        .do_write_lock   = osd_object_write_lock,
-        .do_read_unlock  = osd_object_read_unlock,
-        .do_write_unlock = osd_object_write_unlock,
-        .do_write_locked = osd_object_write_locked,
-        .do_attr_get     = osd_attr_get,
-        .do_attr_set     = osd_attr_set,
-        .do_ah_init      = osd_ah_init,
-        .do_create       = osd_object_ea_create,
-        .do_index_try    = osd_index_try,
-        .do_ref_add      = osd_object_ref_add,
-        .do_ref_del      = osd_object_ref_del,
-        .do_xattr_get    = osd_xattr_get,
-        .do_xattr_set    = osd_xattr_set,
-        .do_xattr_del    = osd_xattr_del,
-        .do_xattr_list   = osd_xattr_list,
-        .do_capa_get     = osd_capa_get,
-        .do_object_sync  = osd_object_sync,
-        .do_version_get  = osd_object_version_get,
-        .do_version_set  = osd_object_version_set,
-        .do_data_get     = osd_data_get,
+        .do_read_lock         = osd_object_read_lock,
+        .do_write_lock        = osd_object_write_lock,
+        .do_read_unlock       = osd_object_read_unlock,
+        .do_write_unlock      = osd_object_write_unlock,
+        .do_write_locked      = osd_object_write_locked,
+        .do_attr_get          = osd_attr_get,
+        .do_declare_attr_set  = osd_declare_attr_set,
+        .do_attr_set          = osd_attr_set,
+        .do_ah_init           = osd_ah_init,
+        .do_declare_create    = osd_declare_object_create,
+        .do_create            = osd_object_ea_create,
+        .do_declare_destroy   = osd_declare_object_destroy,
+        .do_destroy           = osd_object_destroy,
+        .do_index_try         = osd_index_try,
+        .do_declare_ref_add   = osd_declare_object_ref_add,
+        .do_ref_add           = osd_object_ref_add,
+        .do_declare_ref_del   = osd_declare_object_ref_del,
+        .do_ref_del           = osd_object_ref_del,
+        .do_xattr_get         = osd_xattr_get,
+        .do_declare_xattr_set = osd_declare_xattr_set,
+        .do_xattr_set         = osd_xattr_set,
+        .do_declare_xattr_del = osd_declare_xattr_del,
+        .do_xattr_del         = osd_xattr_del,
+        .do_xattr_list        = osd_xattr_list,
+        .do_capa_get          = osd_capa_get,
+        .do_object_sync       = osd_object_sync,
+        .do_data_get          = osd_data_get,
 };
 
 /*
@@ -2704,6 +2982,23 @@ static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
         return err;
 }
 
+static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
+                                 const loff_t size, loff_t pos,
+                                 struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, write);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BLOCK];
+
+        return 0;
+}
+
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                          const struct lu_buf *buf, loff_t *pos,
                          struct thandle *handle, struct lustre_capa *capa,
@@ -2748,11 +3043,35 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
         return result;
 }
 
+/*
+ * in some cases we may need declare methods for objects being created
+ * e.g., when we create symlink
+ */
+static const struct dt_body_operations osd_body_ops_new = {
+        .dbo_declare_write = osd_declare_write,
+};
+
 static const struct dt_body_operations osd_body_ops = {
-        .dbo_read  = osd_read,
-        .dbo_write = osd_write
+        .dbo_read          = osd_read,
+        .dbo_declare_write = osd_declare_write,
+        .dbo_write         = osd_write
 };
 
+static int osd_index_declare_iam_delete(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct dt_key *key,
+                                        struct thandle *handle)
+{
+        struct osd_thandle    *oh;
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+        return 0;
+}
 
 /**
  *      delete a (key, value) pair from index \a dt specified by \a key
@@ -2786,6 +3105,8 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
                 RETURN(-EACCES);
 
+        OSD_EXEC_OP(handle, delete);
+
         ipd = osd_idx_ipd_get(env, bag);
         if (unlikely(ipd == NULL))
                 RETURN(-ENOMEM);
@@ -2800,6 +3121,25 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
+static int osd_index_declare_ea_delete(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct dt_key *key,
+                                       struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+        return 0;
+}
+
 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
                                           struct dt_rec *fid)
 {
@@ -2843,6 +3183,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         LASSERT(dt_object_exists(dt));
         LASSERT(handle != NULL);
 
+        OSD_EXEC_OP(handle, delete);
+
         oh = container_of(handle, struct osd_thandle, ot_super);
         LASSERT(oh->ot_handle != NULL);
         LASSERT(oh->ot_handle->h_transaction != NULL);
@@ -2937,6 +3279,26 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
+static int osd_index_declare_iam_insert(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct dt_rec *rec,
+                                        const struct dt_key *key,
+                                        struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, insert);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+        return 0;
+}
+
 /**
  *      Inserts (key, value) pair in \a dt index object.
  *
@@ -2974,6 +3336,8 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                 return -EACCES;
 
+        OSD_EXEC_OP(th, insert);
+
         ipd = osd_idx_ipd_get(env, bag);
         if (unlikely(ipd == NULL))
                 RETURN(-ENOMEM);
@@ -3262,6 +3626,26 @@ static inline void osd_object_put(const struct lu_env *env,
         lu_object_put(env, &obj->oo_dt.do_lu);
 }
 
+static int osd_index_declare_ea_insert(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct dt_rec *rec,
+                                       const struct dt_key *key,
+                                       struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, insert);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+        return 0;
+}
+
 /**
  * Index add function for interoperability mode (b11826).
  * It will add the directory entry.This entry is needed to
@@ -3488,13 +3872,14 @@ static inline void osd_it_pack_dirent(struct lu_dirent *ent,
  */
 static int osd_it_iam_rec(const struct lu_env *env,
                           const struct dt_it *di,
-                          struct lu_dirent *lde,
+                          struct dt_rec *dtrec,
                           __u32 attr)
 {
         struct osd_it_iam *it        = (struct osd_it_iam *)di;
         struct osd_thread_info *info = osd_oti_get(env);
         struct lu_fid     *fid       = &info->oti_fid;
         const struct osd_fid_pack *rec;
+        struct lu_dirent *lde = (struct lu_dirent *)dtrec;
         char *name;
         int namelen;
         __u64 hash;
@@ -3553,9 +3938,11 @@ static int osd_it_iam_load(const struct lu_env *env,
 }
 
 static const struct dt_index_operations osd_index_iam_ops = {
-        .dio_lookup = osd_index_iam_lookup,
-        .dio_insert = osd_index_iam_insert,
-        .dio_delete = osd_index_iam_delete,
+        .dio_lookup         = osd_index_iam_lookup,
+        .dio_declare_insert = osd_index_declare_iam_insert,
+        .dio_insert         = osd_index_iam_insert,
+        .dio_declare_delete = osd_index_declare_iam_delete,
+        .dio_delete         = osd_index_iam_delete,
         .dio_it     = {
                 .init     = osd_it_iam_init,
                 .fini     = osd_it_iam_fini,
@@ -3840,12 +4227,13 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
  */
 static inline int osd_it_ea_rec(const struct lu_env *env,
                                 const struct dt_it *di,
-                                struct lu_dirent *lde,
+                                struct dt_rec *dtrec,
                                 __u32 attr)
 {
         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
         struct osd_object       *obj    = it->oie_obj;
         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
+        struct lu_dirent        *lde    = (struct lu_dirent *)dtrec;
         int    rc = 0;
 
         ENTRY;
@@ -3938,9 +4326,11 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
  */
 static const struct dt_index_operations osd_index_ea_ops = {
-        .dio_lookup = osd_index_ea_lookup,
-        .dio_insert = osd_index_ea_insert,
-        .dio_delete = osd_index_ea_delete,
+        .dio_lookup         = osd_index_ea_lookup,
+        .dio_declare_insert = osd_index_declare_ea_insert,
+        .dio_insert         = osd_index_ea_insert,
+        .dio_declare_delete = osd_index_declare_ea_delete,
+        .dio_delete         = osd_index_ea_delete,
         .dio_it     = {
                 .init     = osd_it_ea_init,
                 .fini     = osd_it_ea_fini,
index 662f7a1..37fae5a 100644 (file)
@@ -275,7 +275,6 @@ struct osd_thread_info {
         /*
          * XXX temporary: for ->i_op calls.
          */
-        struct txn_param       oti_txn;
         struct timespec        oti_time;
         /*
          * XXX temporary: fake struct file for osd_object_sync
index 0de17bf..0b4f439 100644 (file)
@@ -28,6 +28,7 @@
 /*
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -149,19 +150,25 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
                                const struct lu_buf *buf, loff_t *off, int sync)
 {
         struct thandle *th;
-        struct txn_param p;
-        int rc, credits;
+        int rc;
         ENTRY;
 
-        credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
-                                                         DTO_WRITE_BLOCK);
-        txn_param_init(&p, credits);
-
-        th = dt_trans_start(env, lut->lut_bottom, &p);
+        th = dt_trans_create(env, lut->lut_bottom);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
+        rc = dt_declare_record_write(env, lut->lut_last_rcvd,
+                                     buf->lb_len, *off, th);
+        if (rc)
+                goto stop;
+
+        rc = dt_trans_start(env, lut->lut_bottom, th);
+        if (rc)
+                goto stop;
+
         rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
+
+stop:
         dt_trans_stop(env, lut->lut_bottom, th);
 
         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
@@ -239,8 +246,8 @@ int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
 /**
  * Update server data in last_rcvd
  */
-static int lut_server_data_update(const struct lu_env *env,
-                                  struct lu_target *lut, int sync)
+int lut_server_data_update(const struct lu_env *env,
+                           struct lu_target *lut, int sync)
 {
         struct lr_server_data tmp_lsd;
         loff_t tmp_off = 0;
index 596e7a4..872d1d0 100755 (executable)
@@ -2077,7 +2077,10 @@ run_test 85a "check the cancellation of unused locks during recovery(IBITS)"
 test_85b() { #bug 16774
     lctl set_param -n ldlm.cancel_unused_locks_before_replay "1"
 
-    lfs setstripe -o 0 -c 1 $DIR
+    do_facet mgs $LCTL pool_new $FSNAME.$TESTNAME || return 1
+    do_facet mgs $LCTL pool_add $FSNAME.$TESTNAME $FSNAME-OST0000 || return 2
+
+    lfs setstripe -c 1 -p $FSNAME.$TESTNAME $DIR
 
     for i in `seq 100`; do
         dd if=/dev/urandom of=$DIR/$tfile-$i bs=4096 count=32 >/dev/null 2>&1
@@ -2093,12 +2096,16 @@ test_85b() { #bug 16774
     addr=`echo $lov_id | awk '{print $4}' | awk -F '-' '{print $3}'`
     count=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
     echo "before recovery: unused locks count = $count"
+    [ $count != 0 ] || return 3
 
     fail ost1
 
     count2=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
     echo "after recovery: unused locks count = $count2"
 
+    do_facet mgs $LCTL pool_remove $FSNAME.$TESTNAME $FSNAME-OST0000 || return 4
+    do_facet mgs $LCTL pool_destroy $FSNAME.$TESTNAME || return 5
+
     if [ $count2 -ge $count ]; then
         error "unused locks are not canceled"
     fi