Whamcloud - gitweb
LU-3963 obdclass: convert to linux list api
[fs/lustre-release.git] / lustre / include / dt_object.h
index ea09e46..acf22f5 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -29,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -51,7 +49,7 @@
  * @{
  */
 
-
+#include <obd_support.h>
 /*
  * super-class definitions.
  */
@@ -67,9 +65,9 @@ struct thandle;
 struct dt_device;
 struct dt_object;
 struct dt_index_features;
-struct dt_quota_ctxt;
 struct niobuf_local;
 struct niobuf_remote;
+struct ldlm_enqueue_info;
 
 typedef enum {
         MNTOPT_USERXATTR        = 0x00000001,
@@ -82,7 +80,6 @@ struct dt_device_param {
         unsigned           ddp_block_shift;
         mntopt_t           ddp_mntopts;
         unsigned           ddp_max_ea_size;
-        void              *ddp_mnt; /* XXX: old code can retrieve mnt -bzzz */
         int                ddp_mount_type;
         unsigned long long ddp_maxbytes;
         /* percentage of available space to reserve for grant error margin */
@@ -104,9 +101,14 @@ typedef void (*dt_cb_t)(struct lu_env *env, struct thandle *th,
  * Special per-transaction callback for cases when just commit callback
  * is needed and per-device callback are not convenient to use
  */
+#define TRANS_COMMIT_CB_MAGIC  0xa0a00a0a
+#define MAX_COMMIT_CB_STR_LEN  32
+
 struct dt_txn_commit_cb {
-        cfs_list_t dcb_linkage;
-        dt_cb_t    dcb_func;
+       struct list_head        dcb_linkage;
+       dt_cb_t                 dcb_func;
+       __u32                   dcb_magic;
+       char                    dcb_name[MAX_COMMIT_CB_STR_LEN];
 };
 
 /**
@@ -128,11 +130,11 @@ struct dt_device_operations {
          */
         int   (*dt_trans_start)(const struct lu_env *env,
                                 struct dt_device *dev, struct thandle *th);
-        /**
-         * Finish previously started transaction.
-         */
-        int   (*dt_trans_stop)(const struct lu_env *env,
-                               struct thandle *th);
+       /**
+        * Finish previously started transaction.
+        */
+       int   (*dt_trans_stop)(const struct lu_env *env, struct dt_device *dev,
+                              struct thandle *th);
         /**
          * Add commit callback to the transaction.
          */
@@ -171,12 +173,6 @@ struct dt_device_operations {
                                    struct dt_device *dev,
                                    int mode, unsigned long timeout,
                                    __u32 alg, struct lustre_capa_key *keys);
-        /**
-         * Initialize quota context.
-         */
-        void (*dt_init_quota_ctxt)(const struct lu_env *env,
-                                   struct dt_device *dev,
-                                   struct dt_quota_ctxt *ctxt, void *data);
 };
 
 struct dt_index_features {
@@ -202,7 +198,12 @@ enum dt_index_flags {
         /** index can be modified */
         DT_IND_UPDATE = 1 << 2,
         /** index supports records with non-unique (duplicate) keys */
-        DT_IND_NONUNQ = 1 << 3
+        DT_IND_NONUNQ = 1 << 3,
+        /**
+         * index support fixed-size keys sorted with natural numerical way
+         * and is able to return left-side value if no exact value found
+         */
+        DT_IND_RANGE = 1 << 4,
 };
 
 /**
@@ -210,6 +211,18 @@ enum dt_index_flags {
  * names to fids).
  */
 extern const struct dt_index_features dt_directory_features;
+extern const struct dt_index_features dt_otable_features;
+extern const struct dt_index_features dt_lfsck_orphan_features;
+extern const struct dt_index_features dt_lfsck_features;
+
+/* index features supported by the accounting objects */
+extern const struct dt_index_features dt_acct_features;
+
+/* index features supported by the quota global indexes */
+extern const struct dt_index_features dt_quota_glb_features;
+
+/* index features supported by the quota slave indexes */
+extern const struct dt_index_features dt_quota_slv_features;
 
 /**
  * This is a general purpose dt allocation hint.
@@ -217,8 +230,10 @@ extern const struct dt_index_features dt_directory_features;
  * It can contain any allocation hint in the future.
  */
 struct dt_allocation_hint {
-        struct dt_object           *dah_parent;
-        __u32                       dah_mode;
+       struct dt_object        *dah_parent;
+       const void              *dah_eadata;
+       int                     dah_eadata_len;
+       __u32                   dah_mode;
 };
 
 /**
@@ -244,6 +259,7 @@ struct dt_object_format {
         enum dt_format_type dof_type;
         union {
                 struct dof_regular {
+                       int striped;
                 } dof_reg;
                 struct dof_dir {
                 } dof_dir;
@@ -263,6 +279,8 @@ enum dt_format_type dt_mode_to_dft(__u32 mode);
 
 typedef __u64 dt_obj_version_t;
 
+union ldlm_policy_data;
+
 /**
  * Per-dt-object operations.
  */
@@ -287,6 +305,9 @@ struct dt_object_operations {
          * lu_object_operations, but that would break existing symmetry.
          */
 
+       int   (*do_declare_attr_get)(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct lustre_capa *capa);
         /**
          * Return standard attributes.
          *
@@ -309,6 +330,13 @@ struct dt_object_operations {
                              const struct lu_attr *attr,
                              struct thandle *handle,
                              struct lustre_capa *capa);
+
+       int   (*do_declare_xattr_get)(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct lu_buf *buf,
+                                     const char *name,
+                                     struct lustre_capa *capa);
+
         /**
          * Return a value of an extended attribute.
          *
@@ -361,10 +389,11 @@ struct dt_object_operations {
          * (2) The type of child is in \a child_mode.
          * (3) The result hint is stored in \a ah;
          */
-        void  (*do_ah_init)(const struct lu_env *env,
-                            struct dt_allocation_hint *ah,
-                            struct dt_object *parent,
-                            cfs_umode_t child_mode);
+       void  (*do_ah_init)(const struct lu_env *env,
+                           struct dt_allocation_hint *ah,
+                           struct dt_object *parent,
+                           struct dt_object *child,
+                           umode_t child_mode);
         /**
          * Create new object on this device.
          *
@@ -422,18 +451,31 @@ struct dt_object_operations {
         int   (*do_ref_del)(const struct lu_env *env,
                             struct dt_object *dt, struct thandle *th);
 
-        struct obd_capa *(*do_capa_get)(const struct lu_env *env,
-                                        struct dt_object *dt,
-                                        struct lustre_capa *old,
-                                        __u64 opc);
-        int (*do_object_sync)(const struct lu_env *, struct dt_object *);
-        /**
-         * Get object info of next level. Currently, only get inode from osd.
-         * This is only used by quota b=16542
-         * precondition: dt_object_exists(dt);
-         */
-        int (*do_data_get)(const struct lu_env *env, struct dt_object *dt,
-                           void **data);
+       struct obd_capa *(*do_capa_get)(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       struct lustre_capa *old,
+                                       __u64 opc);
+       int (*do_object_sync)(const struct lu_env *env, struct dt_object *obj,
+                             __u64 start, __u64 end);
+       /**
+        * Get object info of next level. Currently, only get inode from osd.
+        * This is only used by quota b=16542
+        * precondition: dt_object_exists(dt);
+        */
+       int (*do_data_get)(const struct lu_env *env, struct dt_object *dt,
+                          void **data);
+
+       /**
+        * Lock object.
+        */
+       int (*do_object_lock)(const struct lu_env *env, struct dt_object *dt,
+                             struct lustre_handle *lh,
+                             struct ldlm_enqueue_info *einfo,
+                             union ldlm_policy_data *policy);
+
+       int (*do_object_unlock)(const struct lu_env *env, struct dt_object *dt,
+                               struct ldlm_enqueue_info *einfo,
+                               union ldlm_policy_data *policy);
 };
 
 /**
@@ -446,17 +488,17 @@ struct dt_body_operations {
         ssize_t (*dbo_read)(const struct lu_env *env, struct dt_object *dt,
                             struct lu_buf *buf, loff_t *pos,
                             struct lustre_capa *capa);
-        /**
-         * precondition: dt_object_exists(dt);
-         */
-        ssize_t (*dbo_declare_write)(const struct lu_env *env,
-                                     struct dt_object *dt,
-                                     const loff_t size, loff_t pos,
-                                     struct thandle *handle);
-        ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
-                             const struct lu_buf *buf, loff_t *pos,
-                             struct thandle *handle, struct lustre_capa *capa,
-                             int ignore_quota);
+       /**
+        * precondition: dt_object_exists(dt);
+        */
+       ssize_t (*dbo_declare_write)(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const struct lu_buf *buf, loff_t pos,
+                                    struct thandle *handle);
+       ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
+                            const struct lu_buf *buf, loff_t *pos,
+                            struct thandle *handle, struct lustre_capa *capa,
+                            int ignore_quota);
         /*
          * methods for zero-copy IO
          */
@@ -504,9 +546,9 @@ struct dt_body_operations {
          * Punch object's content
          * precondition: regular object, not index
          */
-        int   (*do_declare_punch)(const struct lu_env *, struct dt_object *,
+        int   (*dbo_declare_punch)(const struct lu_env *, struct dt_object *,
                                   __u64, __u64, struct thandle *th);
-        int   (*do_punch)(const struct lu_env *env, struct dt_object *dt,
+        int   (*dbo_punch)(const struct lu_env *env, struct dt_object *dt,
                           __u64 start, __u64 end, struct thandle *th,
                           struct lustre_capa *capa);
 };
@@ -588,6 +630,9 @@ struct dt_index_operations {
                                       const struct dt_it *di,
                                       struct dt_rec *rec,
                                       __u32 attr);
+               int        (*rec_size)(const struct lu_env *env,
+                                      const struct dt_it *di,
+                                     __u32 attr);
                 __u64        (*store)(const struct lu_env *env,
                                       const struct dt_it *di);
                 int           (*load)(const struct lu_env *env,
@@ -597,6 +642,36 @@ struct dt_index_operations {
         } dio_it;
 };
 
+enum dt_otable_it_valid {
+       DOIV_ERROR_HANDLE       = 0x0001,
+       DOIV_DRYRUN             = 0x0002,
+};
+
+enum dt_otable_it_flags {
+       /* Exit when fail. */
+       DOIF_FAILOUT    = 0x0001,
+
+       /* Reset iteration position to the device beginning. */
+       DOIF_RESET      = 0x0002,
+
+       /* There is up layer component uses the iteration. */
+       DOIF_OUTUSED    = 0x0004,
+
+       /* Check only without repairing. */
+       DOIF_DRYRUN     = 0x0008,
+};
+
+/* otable based iteration needs to use the common DT interation APIs.
+ * To initialize the iteration, it needs call dio_it::init() firstly.
+ * Here is how the otable based iteration should prepare arguments to
+ * call dt_it_ops::init().
+ *
+ * For otable based iteration, the 32-bits 'attr' for dt_it_ops::init()
+ * is composed of two parts:
+ * low 16-bits is for valid bits, high 16-bits is for flags bits. */
+#define DT_OTABLE_IT_FLAGS_SHIFT       16
+#define DT_OTABLE_IT_FLAGS_MASK        0xffff0000
+
 struct dt_device {
         struct lu_device                   dd_lu_dev;
         const struct dt_device_operations *dd_ops;
@@ -606,7 +681,8 @@ struct dt_device {
          * way, because callbacks are supposed to be added/deleted only during
          * single-threaded start-up shut-down procedures.
          */
-        cfs_list_t                         dd_txn_callbacks;
+       struct list_head                   dd_txn_callbacks;
+       unsigned int                       dd_record_fid_accessed:1;
 };
 
 int  dt_device_init(struct dt_device *dev, struct lu_device_type *t);
@@ -630,6 +706,29 @@ struct dt_object {
         const struct dt_index_operations  *do_index_ops;
 };
 
+/*
+ * In-core representation of per-device local object OID storage
+ */
+struct local_oid_storage {
+       /* all initialized llog systems on this node linked by this */
+       struct list_head  los_list;
+
+       /* how many handle's reference this los has */
+       atomic_t          los_refcount;
+       struct dt_device *los_dev;
+       struct dt_object *los_obj;
+
+       /* data used to generate new fids */
+       struct mutex      los_id_lock;
+       __u64             los_seq;
+       __u32             los_last_oid;
+};
+
+static inline struct lu_device *dt2lu_dev(struct dt_device *d)
+{
+        return &d->dd_lu_dev;
+}
+
 static inline struct dt_object *lu2dt(struct lu_object *l)
 {
         LASSERT(l == NULL || IS_ERR(l) || lu_device_is_dt(l->lo_dev));
@@ -646,6 +745,30 @@ static inline int dt_object_exists(const struct dt_object *dt)
         return lu_object_exists(&dt->do_lu);
 }
 
+static inline int dt_object_remote(const struct dt_object *dt)
+{
+       return lu_object_remote(&dt->do_lu);
+}
+
+static inline struct dt_object *lu2dt_obj(struct lu_object *o)
+{
+       LASSERT(ergo(o != NULL, lu_device_is_dt(o->lo_dev)));
+       return container_of0(o, struct dt_object, do_lu);
+}
+
+struct thandle_update {
+       /* In DNE, one transaction can be disassembled into
+        * updates on several different MDTs, and these updates
+        * will be attached to tu_remote_update_list per target.
+        * Only single thread will access the list, no need lock
+        */
+       struct list_head        tu_remote_update_list;
+
+       /* sent after or before local transaction */
+       unsigned int            tu_sent_after_local_trans:1,
+                               tu_only_remote_trans:1;
+};
+
 /**
  * This is the general purpose transaction handle.
  * 1. Transaction Life Cycle
@@ -661,26 +784,45 @@ static inline int dt_object_exists(const struct dt_object *dt)
  *      No RPC request should be issued inside transaction.
  */
 struct thandle {
-        /** the dt device on which the transactions are executed */
-        struct dt_device *th_dev;
+       /** the dt device on which the transactions are executed */
+       struct dt_device *th_dev;
+
+       atomic_t        th_refc;
+       /* the size of transaction */
+       int             th_alloc_size;
+
+       /** context for this transaction, tag is LCT_TX_HANDLE */
+       struct lu_context th_ctx;
 
-        /** additional tags (layers can add in declare) */
-        __u32             th_tags;
+       /** additional tags (layers can add in declare) */
+       __u32             th_tags;
 
-        /** context for this transaction, tag is LCT_TX_HANDLE */
-        struct lu_context th_ctx;
+       /** the last operation result in this transaction.
+        * this value is used in recovery */
+       __s32             th_result;
 
-        /** the last operation result in this transaction.
-         * this value is used in recovery */
-        __s32             th_result;
+       /** whether we need sync commit */
+       unsigned int            th_sync:1;
 
-        /** whether we need sync commit */
-        int               th_sync:1;
+       /* local transation, no need to inform other layers */
+       unsigned int            th_local:1;
 
-        /* local transation, no need to inform other layers */
-        int               th_local:1;
+       struct thandle_update   *th_update;
 };
 
+static inline void thandle_get(struct thandle *thandle)
+{
+       atomic_inc(&thandle->th_refc);
+}
+
+static inline void thandle_put(struct thandle *thandle)
+{
+       if (atomic_dec_and_test(&thandle->th_refc)) {
+               if (thandle->th_update != NULL)
+                       OBD_FREE_PTR(thandle->th_update);
+               OBD_FREE(thandle, thandle->th_alloc_size);
+       }
+}
 /**
  * Transaction call-backs.
  *
@@ -698,9 +840,9 @@ struct dt_txn_callback {
         int (*dtc_txn_stop)(const struct lu_env *env,
                             struct thandle *txn, void *cookie);
         void (*dtc_txn_commit)(struct thandle *txn, void *cookie);
-        void                *dtc_cookie;
-        __u32                dtc_tag;
-        cfs_list_t           dtc_linkage;
+       void                    *dtc_cookie;
+       __u32                   dtc_tag;
+       struct list_head        dtc_linkage;
 };
 
 void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb);
@@ -727,15 +869,114 @@ int dt_path_parser(const struct lu_env *env,
                    char *local, dt_entry_func_t entry_func,
                    void *data);
 
+struct dt_object *
+dt_store_resolve(const struct lu_env *env, struct dt_device *dt,
+                const char *path, struct lu_fid *fid);
+
 struct dt_object *dt_store_open(const struct lu_env *env,
                                 struct dt_device *dt,
                                 const char *dirname,
                                 const char *filename,
                                 struct lu_fid *fid);
 
-struct dt_object *dt_locate(const struct lu_env *env,
-                            struct dt_device *dev,
-                            const struct lu_fid *fid);
+struct dt_object *dt_find_or_create(const struct lu_env *env,
+                                    struct dt_device *dt,
+                                    const struct lu_fid *fid,
+                                    struct dt_object_format *dof,
+                                    struct lu_attr *attr);
+
+struct dt_object *dt_locate_at(const struct lu_env *env,
+                              struct dt_device *dev,
+                              const struct lu_fid *fid,
+                              struct lu_device *top_dev,
+                              const struct lu_object_conf *conf);
+
+static inline struct dt_object *
+dt_locate(const struct lu_env *env, struct dt_device *dev,
+         const struct lu_fid *fid)
+{
+       return dt_locate_at(env, dev, fid,
+                           dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
+}
+
+int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
+                          const struct lu_fid *first_fid,
+                          struct local_oid_storage **los);
+void local_oid_storage_fini(const struct lu_env *env,
+                           struct local_oid_storage *los);
+int local_object_fid_generate(const struct lu_env *env,
+                             struct local_oid_storage *los,
+                             struct lu_fid *fid);
+int local_object_declare_create(const struct lu_env *env,
+                               struct local_oid_storage *los,
+                               struct dt_object *o,
+                               struct lu_attr *attr,
+                               struct dt_object_format *dof,
+                               struct thandle *th);
+int local_object_create(const struct lu_env *env,
+                       struct local_oid_storage *los,
+                       struct dt_object *o,
+                       struct lu_attr *attr, struct dt_object_format *dof,
+                       struct thandle *th);
+struct dt_object *local_file_find_or_create(const struct lu_env *env,
+                                           struct local_oid_storage *los,
+                                           struct dt_object *parent,
+                                           const char *name, __u32 mode);
+struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
+                                                    struct dt_device *dt,
+                                                    const struct lu_fid *fid,
+                                                    struct dt_object *parent,
+                                                    const char *name,
+                                                    __u32 mode);
+struct dt_object *
+local_index_find_or_create(const struct lu_env *env,
+                          struct local_oid_storage *los,
+                          struct dt_object *parent,
+                          const char *name, __u32 mode,
+                          const struct dt_index_features *ft);
+struct dt_object *
+local_index_find_or_create_with_fid(const struct lu_env *env,
+                                   struct dt_device *dt,
+                                   const struct lu_fid *fid,
+                                   struct dt_object *parent,
+                                   const char *name, __u32 mode,
+                                   const struct dt_index_features *ft);
+int local_object_unlink(const struct lu_env *env, struct dt_device *dt,
+                       struct dt_object *parent, const char *name);
+
+static inline int dt_object_lock(const struct lu_env *env,
+                                struct dt_object *o, struct lustre_handle *lh,
+                                struct ldlm_enqueue_info *einfo,
+                                union ldlm_policy_data *policy)
+{
+       LASSERT(o != NULL);
+       LASSERT(o->do_ops != NULL);
+       LASSERT(o->do_ops->do_object_lock != NULL);
+       return o->do_ops->do_object_lock(env, o, lh, einfo, policy);
+}
+
+static inline int dt_object_unlock(const struct lu_env *env,
+                                  struct dt_object *o,
+                                  struct ldlm_enqueue_info *einfo,
+                                  union ldlm_policy_data *policy)
+{
+       LASSERT(o != NULL);
+       LASSERT(o->do_ops != NULL);
+       LASSERT(o->do_ops->do_object_unlock != NULL);
+       return o->do_ops->do_object_unlock(env, o, einfo, policy);
+}
+
+int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
+                 const char *name, struct lu_fid *fid);
+
+static inline int dt_object_sync(const struct lu_env *env, struct dt_object *o,
+                                __u64 start, __u64 end)
+{
+       LASSERT(o);
+       LASSERT(o->do_ops);
+       LASSERT(o->do_ops->do_object_sync);
+       return o->do_ops->do_object_sync(env, o, start, end);
+}
 
 int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
                            struct thandle *th);
@@ -744,11 +985,21 @@ void dt_version_set(const struct lu_env *env, struct dt_object *o,
 dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o);
 
 
+int dt_read(const struct lu_env *env, struct dt_object *dt,
+            struct lu_buf *buf, loff_t *pos);
 int dt_record_read(const struct lu_env *env, struct dt_object *dt,
                    struct lu_buf *buf, loff_t *pos);
 int dt_record_write(const struct lu_env *env, struct dt_object *dt,
                     const struct lu_buf *buf, loff_t *pos, struct thandle *th);
-
+typedef int (*dt_index_page_build_t)(const struct lu_env *env,
+                                    union lu_page *lp, int nob,
+                                    const struct dt_it_ops *iops,
+                                    struct dt_it *it, __u32 attr, void *arg);
+int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
+                 const struct lu_rdpg *rdpg, dt_index_page_build_t filler,
+                 void *arg);
+int dt_index_read(const struct lu_env *env, struct dt_device *dev,
+                 struct idx_info *ii, const struct lu_rdpg *rdpg);
 
 static inline struct thandle *dt_trans_create(const struct lu_env *env,
                                               struct dt_device *d)
@@ -776,30 +1027,34 @@ static inline int dt_trans_start_local(const struct lu_env *env,
 static inline int dt_trans_stop(const struct lu_env *env,
                                 struct dt_device *d, struct thandle *th)
 {
-        LASSERT(d->dd_ops->dt_trans_stop);
-        return d->dd_ops->dt_trans_stop(env, th);
+       LASSERT(d->dd_ops->dt_trans_stop);
+       return d->dd_ops->dt_trans_stop(env, d, th);
 }
 
 static inline int dt_trans_cb_add(struct thandle *th,
-                                  struct dt_txn_commit_cb *dcb)
+                                 struct dt_txn_commit_cb *dcb)
 {
-        LASSERT(th->th_dev->dd_ops->dt_trans_cb_add);
-        return th->th_dev->dd_ops->dt_trans_cb_add(th, dcb);
+       LASSERT(th->th_dev->dd_ops->dt_trans_cb_add);
+       dcb->dcb_magic = TRANS_COMMIT_CB_MAGIC;
+       return th->th_dev->dd_ops->dt_trans_cb_add(th, dcb);
 }
 /** @} dt */
 
 
 static inline int dt_declare_record_write(const struct lu_env *env,
-                                          struct dt_object *dt,
-                                          int size, loff_t pos,
-                                          struct thandle *th)
+                                         struct dt_object *dt,
+                                         const struct lu_buf *buf,
+                                         loff_t pos,
+                                         struct thandle *th)
 {
-        int rc;
-
-        LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
-        LASSERT(th != NULL);
-        rc = dt->do_body_ops->dbo_declare_write(env, dt, size, pos, th);
-        return rc;
+       int rc;
+
+       LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
+       LASSERT(th != NULL);
+       LASSERT(dt->do_body_ops);
+       LASSERT(dt->do_body_ops->dbo_declare_write);
+       rc = dt->do_body_ops->dbo_declare_write(env, dt, buf, pos, th);
+       return rc;
 }
 
 static inline int dt_declare_create(const struct lu_env *env,
@@ -895,6 +1150,16 @@ static inline int dt_write_locked(const struct lu_env *env,
         return dt->do_ops->do_write_locked(env, dt);
 }
 
+static inline int dt_declare_attr_get(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct lustre_capa *capa)
+{
+       LASSERT(dt);
+       LASSERT(dt->do_ops);
+       LASSERT(dt->do_ops->do_declare_attr_get);
+       return dt->do_ops->do_declare_attr_get(env, dt, capa);
+}
+
 static inline int dt_attr_get(const struct lu_env *env, struct dt_object *dt,
                               struct lu_attr *la, void *arg)
 {
@@ -967,20 +1232,20 @@ static inline struct obd_capa *dt_capa_get(const struct lu_env *env,
 {
         LASSERT(dt);
         LASSERT(dt->do_ops);
-        LASSERT(dt->do_ops->do_ref_del);
+       LASSERT(dt->do_ops->do_capa_get);
         return dt->do_ops->do_capa_get(env, dt, old, opc);
 }
 
 static inline int dt_bufs_get(const struct lu_env *env, struct dt_object *d,
-                              struct niobuf_remote *rnb,
-                              struct niobuf_local *lnb, int rw,
-                              struct lustre_capa *capa)
+                             struct niobuf_remote *rnb,
+                             struct niobuf_local *lnb, int rw,
+                             struct lustre_capa *capa)
 {
-        LASSERT(d);
-        LASSERT(d->do_body_ops);
-        LASSERT(d->do_body_ops->dbo_bufs_get);
-        return d->do_body_ops->dbo_bufs_get(env, d, rnb->offset,
-                                            rnb->len, lnb, rw, capa);
+       LASSERT(d);
+       LASSERT(d->do_body_ops);
+       LASSERT(d->do_body_ops->dbo_bufs_get);
+       return d->do_body_ops->dbo_bufs_get(env, d, rnb->rnb_offset,
+                                           rnb->rnb_len, lnb, rw, capa);
 }
 
 static inline int dt_bufs_put(const struct lu_env *env, struct dt_object *d,
@@ -1037,8 +1302,8 @@ static inline int dt_declare_punch(const struct lu_env *env,
 {
         LASSERT(dt);
         LASSERT(dt->do_body_ops);
-        LASSERT(dt->do_body_ops->do_declare_punch);
-        return dt->do_body_ops->do_declare_punch(env, dt, start, end, th);
+        LASSERT(dt->do_body_ops->dbo_declare_punch);
+        return dt->do_body_ops->dbo_declare_punch(env, dt, start, end, th);
 }
 
 static inline int dt_punch(const struct lu_env *env, struct dt_object *dt,
@@ -1047,8 +1312,8 @@ static inline int dt_punch(const struct lu_env *env, struct dt_object *dt,
 {
         LASSERT(dt);
         LASSERT(dt->do_body_ops);
-        LASSERT(dt->do_body_ops->do_punch);
-        return dt->do_body_ops->do_punch(env, dt, start, end, th, capa);
+        LASSERT(dt->do_body_ops->dbo_punch);
+        return dt->do_body_ops->dbo_punch(env, dt, start, end, th, capa);
 }
 
 static inline int dt_fiemap_get(const struct lu_env *env, struct dt_object *d,
@@ -1057,7 +1322,8 @@ static inline int dt_fiemap_get(const struct lu_env *env, struct dt_object *d,
         LASSERT(d);
         if (d->do_body_ops == NULL)
                 return -EPROTO;
-        LASSERT(d->do_body_ops->dbo_fiemap_get);
+       if (d->do_body_ops->dbo_fiemap_get == NULL)
+               return -EOPNOTSUPP;
         return d->do_body_ops->dbo_fiemap_get(env, d, fm);
 }
 
@@ -1177,6 +1443,18 @@ static inline int dt_xattr_set(const struct lu_env *env,
         return dt->do_ops->do_xattr_set(env, dt, buf, name, fl, th, capa);
 }
 
+static inline int dt_declare_xattr_get(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      struct lu_buf *buf,
+                                      const char *name,
+                                      struct lustre_capa *capa)
+{
+       LASSERT(dt);
+       LASSERT(dt->do_ops);
+       LASSERT(dt->do_ops->do_declare_xattr_get);
+       return dt->do_ops->do_declare_xattr_get(env, dt, buf, name, capa);
+}
+
 static inline int dt_xattr_get(const struct lu_env *env,
                               struct dt_object *dt, struct lu_buf *buf,
                               const char *name, struct lustre_capa *capa)
@@ -1229,6 +1507,18 @@ static inline int dt_commit_async(const struct lu_env *env,
         return dev->dd_ops->dt_commit_async(env, dev);
 }
 
+static inline int dt_init_capa_ctxt(const struct lu_env *env,
+                                   struct dt_device *dev,
+                                   int mode, unsigned long timeout,
+                                   __u32 alg, struct lustre_capa_key *keys)
+{
+       LASSERT(dev);
+       LASSERT(dev->dd_ops);
+       LASSERT(dev->dd_ops->dt_init_capa_ctxt);
+       return dev->dd_ops->dt_init_capa_ctxt(env, dev, mode,
+                                             timeout, alg, keys);
+}
+
 static inline int dt_lookup(const struct lu_env *env,
                             struct dt_object *dt,
                             struct dt_rec *rec,
@@ -1248,4 +1538,77 @@ static inline int dt_lookup(const struct lu_env *env,
                 ret = -ENOENT;
         return ret;
 }
+
+#define LU221_BAD_TIME (0x80000000U + 24 * 3600)
+
+struct dt_find_hint {
+       struct lu_fid        *dfh_fid;
+       struct dt_device     *dfh_dt;
+       struct dt_object     *dfh_o;
+};
+
+struct dt_insert_rec {
+       union {
+               const struct lu_fid     *rec_fid;
+               void                    *rec_data;
+       };
+       union {
+               struct {
+                       __u32            rec_type;
+                       __u32            rec_padding;
+               };
+               __u64                    rec_misc;
+       };
+};
+
+struct dt_thread_info {
+       char                     dti_buf[DT_MAX_PATH];
+       struct dt_find_hint      dti_dfh;
+       struct lu_attr           dti_attr;
+       struct lu_fid            dti_fid;
+       struct dt_object_format  dti_dof;
+       struct lustre_mdt_attrs  dti_lma;
+       struct lu_buf            dti_lb;
+       struct lu_object_conf    dti_conf;
+       loff_t                   dti_off;
+       struct dt_insert_rec     dti_dt_rec;
+};
+
+extern struct lu_context_key dt_key;
+
+static inline struct dt_thread_info *dt_info(const struct lu_env *env)
+{
+       struct dt_thread_info *dti;
+
+       dti = lu_context_key_get(&env->le_ctx, &dt_key);
+       LASSERT(dti);
+       return dti;
+}
+
+int dt_global_init(void);
+void dt_global_fini(void);
+
+# ifdef LPROCFS
+#ifndef HAVE_ONLY_PROCFS_SEQ
+int lprocfs_dt_rd_blksize(char *page, char **start, off_t off,
+                         int count, int *eof, void *data);
+int lprocfs_dt_rd_kbytestotal(char *page, char **start, off_t off,
+                             int count, int *eof, void *data);
+int lprocfs_dt_rd_kbytesfree(char *page, char **start, off_t off,
+                            int count, int *eof, void *data);
+int lprocfs_dt_rd_kbytesavail(char *page, char **start, off_t off,
+                             int count, int *eof, void *data);
+int lprocfs_dt_rd_filestotal(char *page, char **start, off_t off,
+                            int count, int *eof, void *data);
+int lprocfs_dt_rd_filesfree(char *page, char **start, off_t off,
+                           int count, int *eof, void *data);
+#endif
+int lprocfs_dt_blksize_seq_show(struct seq_file *m, void *v);
+int lprocfs_dt_kbytestotal_seq_show(struct seq_file *m, void *v);
+int lprocfs_dt_kbytesfree_seq_show(struct seq_file *m, void *v);
+int lprocfs_dt_kbytesavail_seq_show(struct seq_file *m, void *v);
+int lprocfs_dt_filestotal_seq_show(struct seq_file *m, void *v);
+int lprocfs_dt_filesfree_seq_show(struct seq_file *m, void *v);
+# endif /* LPROCFS */
+
 #endif /* __LUSTRE_DT_OBJECT_H */