Whamcloud - gitweb
LU-3536 lod: Separate thandle to different layers. 40/10640/55
authorWang Di <di.wang@intel.com>
Thu, 19 Jun 2014 10:18:29 +0000 (03:18 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 8 Apr 2015 02:04:02 +0000 (02:04 +0000)
Separate thandle into different layers on MDT stack.

The current implementation use single thandle in all
layers, which might cause some issues for cross-MDT
transaction, for example during transaction stop,
it needs to stop local OSD transaction first,
then send remote RPC, because we do not want hold the
transaction, during RPC sending, but once we stop osd
transaction, which might cause this single thandle be
destroyed (see osd_trans_stop()), but all of remote
updates are still attached in this thandle.

This patch will separate the thandle to different layers:
1. MDD thandle will present itself in MDD transaction.
2. LOD thandle will distribute the thandle to all sub-thandles,
   and also attach the update blob buffer to store updates for
   cross-MDT operations.
3. OSP thandle will store the updates for the remote
   correspondent target, and also manage to send them to the
   remote target.
4. OSD thandle will still be the same as the original one,
   relying on the bottom FS to achieve the "atomic".

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I0c73cc80fa692c2e5d5a09e441c28e228d822ce0
Reviewed-on: http://review.whamcloud.com/10640
Tested-by: Jenkins
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
25 files changed:
lustre/include/dt_object.h
lustre/include/lustre_update.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_namespace.c
lustre/lod/Makefile.in
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/lod/lod_qos.c
lustre/lod/lod_sub_object.c [new file with mode: 0644]
lustre/mdd/mdd_dir.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_handler.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_sync.c
lustre/osp/osp_trans.c
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/layout.c
lustre/target/Makefile.am
lustre/target/out_lib.c
lustre/target/update_trans.c [new file with mode: 0644]
lustre/tests/test-framework.sh

index cb99e18..c192728 100644 (file)
@@ -1776,19 +1776,6 @@ static inline struct dt_object *lu2dt_obj(struct lu_object *o)
        return container_of0(o, struct dt_object, do_lu);
 }
 
        return container_of0(o, struct dt_object, do_lu);
 }
 
-struct thandle_update {
-       /* In DNE, one transaction can be disassembled into
-        * updates on several different MDTs, and these updates
-        * will be attached to tu_remote_update_list per target.
-        * Only single thread will access the list, no need lock
-        */
-       struct list_head        tu_remote_update_list;
-
-       /* sent after or before local transaction */
-       unsigned int            tu_sent_after_local_trans:1,
-                               tu_only_remote_trans:1;
-};
-
 /**
  * This is the general purpose transaction handle.
  * 1. Transaction Life Cycle
 /**
  * This is the general purpose transaction handle.
  * 1. Transaction Life Cycle
@@ -1807,9 +1794,8 @@ struct thandle {
        /** the dt device on which the transactions are executed */
        struct dt_device *th_dev;
 
        /** the dt device on which the transactions are executed */
        struct dt_device *th_dev;
 
-       atomic_t        th_refc;
-       /* the size of transaction */
-       int             th_alloc_size;
+       /* In some callback function, it needs to access the top_th directly */
+       struct thandle *th_top;
 
        /** context for this transaction, tag is LCT_TX_HANDLE */
        struct lu_context th_ctx;
 
        /** context for this transaction, tag is LCT_TX_HANDLE */
        struct lu_context th_ctx;
@@ -1822,27 +1808,11 @@ struct thandle {
        __s32             th_result;
 
        /** whether we need sync commit */
        __s32             th_result;
 
        /** whether we need sync commit */
-       unsigned int            th_sync:1;
-
+       unsigned int            th_sync:1,
        /* local transation, no need to inform other layers */
        /* local transation, no need to inform other layers */
-       unsigned int            th_local:1;
-
-       struct thandle_update   *th_update;
+                               th_local:1;
 };
 
 };
 
-static inline void thandle_get(struct thandle *thandle)
-{
-       atomic_inc(&thandle->th_refc);
-}
-
-static inline void thandle_put(struct thandle *thandle)
-{
-       if (atomic_dec_and_test(&thandle->th_refc)) {
-               if (thandle->th_update != NULL)
-                       OBD_FREE_PTR(thandle->th_update);
-               OBD_FREE(thandle, thandle->th_alloc_size);
-       }
-}
 /**
  * Transaction call-backs.
  *
 /**
  * Transaction call-backs.
  *
@@ -1919,6 +1889,18 @@ dt_locate(const struct lu_env *env, struct dt_device *dev,
                            dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
 }
 
                            dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
 }
 
+static inline struct dt_object *
+dt_object_locate(struct dt_object *dto, struct dt_device *dt_dev)
+{
+       struct lu_object *lo;
+
+       list_for_each_entry(lo, &dto->do_lu.lo_header->loh_layers, lo_linkage) {
+               if (lo->lo_dev == &dt_dev->dd_lu_dev)
+                       return container_of(lo, struct dt_object, do_lu);
+       }
+       return NULL;
+}
+
 int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
                           const struct lu_fid *first_fid,
                           struct local_oid_storage **los);
 int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
                           const struct lu_fid *first_fid,
                           struct local_oid_storage **los);
@@ -2351,6 +2333,27 @@ static inline int dt_read_prep(const struct lu_env *env, struct dt_object *d,
         return d->do_body_ops->dbo_read_prep(env, d, lnb, n);
 }
 
         return d->do_body_ops->dbo_read_prep(env, d, lnb, n);
 }
 
+static inline int dt_declare_write(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  const struct lu_buf *buf, loff_t pos,
+                                  struct thandle *th)
+{
+       LASSERT(dt);
+       LASSERT(dt->do_body_ops);
+       LASSERT(dt->do_body_ops->dbo_declare_write);
+       return dt->do_body_ops->dbo_declare_write(env, dt, buf, pos, th);
+}
+
+static inline ssize_t dt_write(const struct lu_env *env, struct dt_object *dt,
+                              const struct lu_buf *buf, loff_t *pos,
+                              struct thandle *th, int rq)
+{
+       LASSERT(dt);
+       LASSERT(dt->do_body_ops);
+       LASSERT(dt->do_body_ops->dbo_write);
+       return dt->do_body_ops->dbo_write(env, dt, buf, pos, th, rq);
+}
+
 static inline int dt_declare_punch(const struct lu_env *env,
                                    struct dt_object *dt, __u64 start,
                                    __u64 end, struct thandle *th)
 static inline int dt_declare_punch(const struct lu_env *env,
                                    struct dt_object *dt, __u64 start,
                                    __u64 end, struct thandle *th)
index c75e4fb..861d631 100644 (file)
 #ifndef _LUSTRE_UPDATE_H
 #define _LUSTRE_UPDATE_H
 #include <lustre_net.h>
 #ifndef _LUSTRE_UPDATE_H
 #define _LUSTRE_UPDATE_H
 #include <lustre_net.h>
+#include <dt_object.h>
 
 #define OUT_UPDATE_INIT_BUFFER_SIZE    4096
 #define OUT_UPDATE_REPLY_SIZE          8192
 
 
 #define OUT_UPDATE_INIT_BUFFER_SIZE    4096
 #define OUT_UPDATE_REPLY_SIZE          8192
 
-struct dt_object;
-struct dt_object_hint;
-struct dt_object_format;
-struct dt_allocation_hint;
 struct dt_key;
 struct dt_rec;
 struct dt_key;
 struct dt_rec;
-struct thandle;
 
 struct update_buffer {
        struct object_update_request    *ub_req;
        size_t                          ub_req_size;
 };
 
 
 struct update_buffer {
        struct object_update_request    *ub_req;
        size_t                          ub_req_size;
 };
 
+#define TOP_THANDLE_MAGIC      0x20140917
+/* {top,sub}_thandle are used to manage distributed transactions which
+ * include updates on several nodes. A top_handle represents the
+ * whole operation, and sub_thandle represents updates on each node. */
+struct top_thandle {
+       struct thandle          tt_super;
+       __u32                   tt_magic;
+       /* The master sub transaction. */
+       struct thandle          *tt_master_sub_thandle;
+
+       /* Other sub thandle will be listed here. */
+       struct list_head        tt_sub_thandle_list;
+};
+
+struct sub_thandle {
+       /* point to the osd/osp_thandle */
+       struct thandle          *st_sub_th;
+       struct list_head        st_sub_list;
+};
+
 /**
  * Tracking the updates being executed on this dt_device.
  */
 struct dt_update_request {
        struct dt_device                *dur_dt;
        /* attached itself to thandle */
 /**
  * Tracking the updates being executed on this dt_device.
  */
 struct dt_update_request {
        struct dt_device                *dur_dt;
        /* attached itself to thandle */
-       struct list_head                dur_list;
        int                             dur_flags;
        /* update request result */
        int                             dur_rc;
        int                             dur_flags;
        /* update request result */
        int                             dur_rc;
@@ -167,13 +182,9 @@ static inline void update_inc_batchid(struct dt_update_request *update)
 }
 
 /* target/out_lib.c */
 }
 
 /* target/out_lib.c */
-struct thandle_update;
-struct dt_update_request *out_find_update(struct thandle_update *tu,
-                                         struct dt_device *dt_dev);
 void dt_update_request_destroy(struct dt_update_request *update);
 struct dt_update_request *dt_update_request_create(struct dt_device *dt);
 void dt_update_request_destroy(struct dt_update_request *update);
 struct dt_update_request *dt_update_request_create(struct dt_device *dt);
-struct dt_update_request *dt_update_request_find_or_create(struct thandle *th,
-                                                         struct dt_object *dt);
+
 int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
                    enum update_type op, const struct lu_fid *fid,
                    int params_count, __u16 *param_sizes, const void **bufs,
 int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
                    enum update_type op, const struct lu_fid *fid,
                    int params_count, __u16 *param_sizes, const void **bufs,
@@ -214,4 +225,27 @@ int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf,
                          const struct dt_key *key);
 int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
                       const struct lu_fid *fid, const char *name);
                          const struct dt_key *key);
 int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
                       const struct lu_fid *fid, const char *name);
+
+/* target/update_trans.c */
+struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
+                                     struct thandle *th,
+                                     struct dt_device *sub_dt);
+
+static inline struct thandle *
+thandle_get_sub(const struct lu_env *env, struct thandle *th,
+                const struct dt_object *sub_obj)
+{
+       return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev));
+}
+
+struct thandle *
+top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
+
+int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
+                   struct thandle *th);
+
+int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
+                  struct thandle *th);
+
+void top_thandle_destroy(struct top_thandle *top_th);
 #endif
 #endif
index bb873c8..e156fd2 100644 (file)
@@ -2876,7 +2876,7 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
        struct dt_object_format         *dof    = &info->lti_dof;
        struct ost_id                   *oi     = &info->lti_oi;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct dt_object_format         *dof    = &info->lti_dof;
        struct ost_id                   *oi     = &info->lti_oi;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
-       struct dt_device                *dev    = lfsck->li_bottom;
+       struct dt_device                *dev;
        struct lu_device                *d      =
                                &lfsck_obj2dev(llr->llr_child)->dd_lu_dev;
        struct lu_object                *o;
        struct lu_device                *d      =
                                &lfsck_obj2dev(llr->llr_child)->dd_lu_dev;
        struct lu_object                *o;
@@ -2922,6 +2922,7 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
        la->la_valid = LA_UID | LA_GID;
        memset(dof, 0, sizeof(*dof));
 
        la->la_valid = LA_UID | LA_GID;
        memset(dof, 0, sizeof(*dof));
 
+       dev = lfsck_obj2dev(child);
        handle = dt_trans_create(env, dev);
        if (IS_ERR(handle))
                GOTO(log, rc = PTR_ERR(handle));
        handle = dt_trans_create(env, dev);
        if (IS_ERR(handle))
                GOTO(log, rc = PTR_ERR(handle));
@@ -2930,7 +2931,7 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
        if (rc != 0)
                GOTO(stop, rc);
 
-       rc = dt_trans_start_local(env, dev, handle);
+       rc = dt_trans_start(env, dev, handle);
        if (rc != 0)
                GOTO(stop, rc);
 
        if (rc != 0)
                GOTO(stop, rc);
 
index 70e36a1..1aa9e4a 100644 (file)
@@ -2038,6 +2038,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
        const struct lu_fid             *cfid   = lfsck_dto2fid(child);
        struct lu_fid                    tfid;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        const struct lu_fid             *cfid   = lfsck_dto2fid(child);
        struct lu_fid                    tfid;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_object                *dto;
        struct dt_device                *dev    = lfsck->li_next;
        struct thandle                  *th     = NULL;
        struct lfsck_lock_handle        *llh    = &info->lti_llh;
        struct dt_device                *dev    = lfsck->li_next;
        struct thandle                  *th     = NULL;
        struct lfsck_lock_handle        *llh    = &info->lti_llh;
@@ -2061,14 +2062,15 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
        if (IS_ERR(th))
                GOTO(unlock1, rc = PTR_ERR(th));
 
        if (IS_ERR(th))
                GOTO(unlock1, rc = PTR_ERR(th));
 
-       rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
+       dto = dt_object_locate(parent, th->th_dev);
+       rc = dt_declare_delete(env, dto, (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
 
        if (update) {
                rec->rec_type = lfsck_object_type(child) & S_IFMT;
                rec->rec_fid = cfid;
        if (rc != 0)
                GOTO(stop, rc);
 
        if (update) {
                rec->rec_type = lfsck_object_type(child) & S_IFMT;
                rec->rec_fid = cfid;
-               rc = dt_declare_insert(env, parent,
+               rc = dt_declare_insert(env, dto,
                                       (const struct dt_rec *)rec,
                                       (const struct dt_key *)name2, th);
                if (rc != 0)
                                       (const struct dt_rec *)rec,
                                       (const struct dt_key *)name2, th);
                if (rc != 0)
@@ -2076,7 +2078,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
        }
 
        if (dec) {
        }
 
        if (dec) {
-               rc = dt_declare_ref_del(env, parent, th);
+               rc = dt_declare_ref_del(env, dto, th);
                if (rc != 0)
                        GOTO(stop, rc);
        }
                if (rc != 0)
                        GOTO(stop, rc);
        }
@@ -2085,8 +2087,9 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
        if (rc != 0)
                GOTO(stop, rc);
 
-       dt_write_lock(env, parent, 0);
-       rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
+
+       dt_write_lock(env, dto, 0);
+       rc = dt_lookup(env, dto, (struct dt_rec *)&tfid,
                       (const struct dt_key *)name);
        /* Someone has removed the bad name entry by race. */
        if (rc == -ENOENT)
                       (const struct dt_key *)name);
        /* Someone has removed the bad name entry by race. */
        if (rc == -ENOENT)
@@ -2103,12 +2106,12 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
        if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                GOTO(unlock2, rc = 1);
 
        if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                GOTO(unlock2, rc = 1);
 
-       rc = dt_delete(env, parent, (const struct dt_key *)name, th);
+       rc = dt_delete(env, dto, (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(unlock2, rc);
 
        if (update) {
        if (rc != 0)
                GOTO(unlock2, rc);
 
        if (update) {
-               rc = dt_insert(env, parent,
+               rc = dt_insert(env, dto,
                               (const struct dt_rec *)rec,
                               (const struct dt_key *)name2, th, 1);
                if (rc != 0)
                               (const struct dt_rec *)rec,
                               (const struct dt_key *)name2, th, 1);
                if (rc != 0)
@@ -2116,7 +2119,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
        }
 
        if (dec) {
        }
 
        if (dec) {
-               rc = dt_ref_del(env, parent, th);
+               rc = dt_ref_del(env, dto, th);
                if (rc != 0)
                        GOTO(unlock2, rc);
        }
                if (rc != 0)
                        GOTO(unlock2, rc);
        }
index 31459b5..29acce8 100644 (file)
@@ -1,5 +1,6 @@
 MODULES := lod
 MODULES := lod
-lod-objs := lod_dev.o lod_lov.o lproc_lod.o lod_pool.o lod_object.o lod_qos.o
+lod-objs := lod_dev.o lod_lov.o lproc_lod.o lod_pool.o lod_object.o lod_qos.o  \
+           lod_sub_object.o
 
 EXTRA_DIST = $(lod-objs:.o=.c) lod_internal.h
 
 
 EXTRA_DIST = $(lod-objs:.o=.c) lod_internal.h
 
index 14e76df..1c1b6ce 100644 (file)
@@ -533,14 +533,16 @@ static int lod_statfs(const struct lu_env *env,
  * see include/dt_object.h for the details.
  */
 static struct thandle *lod_trans_create(const struct lu_env *env,
  * see include/dt_object.h for the details.
  */
 static struct thandle *lod_trans_create(const struct lu_env *env,
-                                       struct dt_device *dev)
+                                       struct dt_device *dt)
 {
        struct thandle *th;
 
 {
        struct thandle *th;
 
-       th = dt_trans_create(env, dt2lod_dev(dev)->lod_child);
+       th = top_trans_create(env, dt2lod_dev(dt)->lod_child);
        if (IS_ERR(th))
                return th;
 
        if (IS_ERR(th))
                return th;
 
+       th->th_dev = dt;
+
        return th;
 }
 
        return th;
 }
 
@@ -552,25 +554,10 @@ static struct thandle *lod_trans_create(const struct lu_env *env,
  *
  * see include/dt_object.h for the details.
  */
  *
  * see include/dt_object.h for the details.
  */
-static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
+static int lod_trans_start(const struct lu_env *env, struct dt_device *dt,
                           struct thandle *th)
 {
                           struct thandle *th)
 {
-       struct lod_device *lod = dt2lod_dev((struct dt_device *) dev);
-       int rc = 0;
-
-       if (unlikely(th->th_update != NULL)) {
-               struct thandle_update *tu = th->th_update;
-               struct dt_update_request *update;
-
-               list_for_each_entry(update, &tu->tu_remote_update_list,
-                                   dur_list) {
-                       LASSERT(update->dur_dt != NULL);
-                       rc = dt_trans_start(env, update->dur_dt, th);
-                       if (rc != 0)
-                               return rc;
-               }
-       }
-       return dt_trans_start(env, lod->lod_child, th);
+       return top_trans_start(env, dt2lod_dev(dt)->lod_child, th);
 }
 
 /**
 }
 
 /**
@@ -584,31 +571,7 @@ static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
 static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
                          struct thandle *th)
 {
 static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
                          struct thandle *th)
 {
-       struct thandle_update           *tu = th->th_update;
-       struct dt_update_request        *update;
-       struct dt_update_request        *tmp;
-       int                             rc2 = 0;
-       int                             rc;
-       ENTRY;
-
-       thandle_get(th);
-       rc = dt_trans_stop(env, th->th_dev, th);
-       if (likely(tu == NULL)) {
-               thandle_put(th);
-               RETURN(rc);
-       }
-
-       list_for_each_entry_safe(update, tmp,
-                                &tu->tu_remote_update_list,
-                                dur_list) {
-               /* update will be freed inside dt_trans_stop */
-               rc2 = dt_trans_stop(env, update->dur_dt, th);
-               if (unlikely(rc2 != 0 && rc == 0))
-                       rc = rc2;
-       }
-       thandle_put(th);
-
-       RETURN(rc);
+       return top_trans_stop(env, dt2lod_dev(dt)->lod_child, th);
 }
 
 /**
 }
 
 /**
index 3d6fa6d..2830b83 100644 (file)
@@ -482,5 +482,80 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
                        struct thandle *th);
 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo);
 
                        struct thandle *th);
 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo);
 
+/* lod_sub_object.c */
+struct thandle *lod_sub_get_thandle(const struct lu_env *env,
+                                   struct thandle *th,
+                                   const struct dt_object *sub_obj);
+int lod_sub_object_declare_create(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                 struct lu_attr *attr,
+                                 struct dt_allocation_hint *hint,
+                                 struct dt_object_format *dof,
+                                 struct thandle *th);
+int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt,
+                         struct lu_attr *attr,
+                         struct dt_allocation_hint *hint,
+                         struct dt_object_format *dof,
+                         struct thandle *th);
+int lod_sub_object_declare_ref_add(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  struct thandle *th);
+int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt,
+                          struct thandle *th);
+int lod_sub_object_declare_ref_del(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  struct thandle *th);
+int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt,
+                          struct thandle *th);
+int lod_sub_object_declare_destroy(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  struct thandle *th);
+int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt,
+                          struct thandle *th);
+int lod_sub_object_declare_insert(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                 const struct dt_rec *rec,
+                                 const struct dt_key *key,
+                                 struct thandle *th);
+int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt,
+                               const struct dt_rec *rec,
+                               const struct dt_key *key, struct thandle *th,
+                               int ign);
+int lod_sub_object_declare_delete(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                 const struct dt_key *key,
+                                 struct thandle *th);
+int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt,
+                         const struct dt_key *name, struct thandle *th);
+int lod_sub_object_declare_xattr_set(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const struct lu_buf *buf,
+                                    const char *name, int fl,
+                                    struct thandle *th);
+int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                            const struct lu_buf *buf, const char *name, int fl,
+                            struct thandle *th);
+int lod_sub_object_declare_attr_set(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   const struct lu_attr *attr,
+                                   struct thandle *th);
+int lod_sub_object_attr_set(const struct lu_env *env,
+                           struct dt_object *dt,
+                           const struct lu_attr *attr,
+                           struct thandle *th);
+int lod_sub_object_declare_xattr_del(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const char *name,
+                                    struct thandle *th);
+int lod_sub_object_xattr_del(const struct lu_env *env,
+                            struct dt_object *dt,
+                            const char *name,
+                            struct thandle *th);
+int lod_sub_object_declare_write(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct lu_buf *buf, loff_t pos,
+                                struct thandle *th);
+ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
+                            const struct lu_buf *buf, loff_t *pos,
+                            struct thandle *th, int rq);
 #endif
 #endif
-
index f9bc8ae..c4a6544 100644 (file)
@@ -667,10 +667,12 @@ int lod_generate_and_set_lovea(const struct lu_env *env,
 
        info->lti_buf.lb_buf = lmm;
        info->lti_buf.lb_len = lmm_size;
 
        info->lti_buf.lb_buf = lmm;
        info->lti_buf.lb_len = lmm_size;
-       rc = dt_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, 0,
-                         th);
-       if (rc < 0)
+       rc = lod_sub_object_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
+                                     0, th);
+       if (rc < 0) {
                lod_object_free_striping(env, lo);
                lod_object_free_striping(env, lo);
+               RETURN(rc);
+       }
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
index f8536d0..ad146e6 100644 (file)
@@ -86,9 +86,10 @@ static int lod_declare_index_insert(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_rec *rec,
                                    const struct dt_key *key,
                                    struct dt_object *dt,
                                    const struct dt_rec *rec,
                                    const struct dt_key *key,
-                                   struct thandle *handle)
+                                   struct thandle *th)
 {
 {
-       return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
+       return lod_sub_object_declare_insert(env, dt_object_child(dt),
+                                            rec, key, th);
 }
 
 /**
 }
 
 /**
@@ -105,7 +106,8 @@ static int lod_index_insert(const struct lu_env *env,
                            struct thandle *th,
                            int ign)
 {
                            struct thandle *th,
                            int ign)
 {
-       return dt_insert(env, dt_object_child(dt), rec, key, th, ign);
+       return lod_sub_object_index_insert(env, dt_object_child(dt), rec, key,
+                                          th, ign);
 }
 
 /**
 }
 
 /**
@@ -121,7 +123,8 @@ static int lod_declare_index_delete(const struct lu_env *env,
                                    const struct dt_key *key,
                                    struct thandle *th)
 {
                                    const struct dt_key *key,
                                    struct thandle *th)
 {
-       return dt_declare_delete(env, dt_object_child(dt), key, th);
+       return lod_sub_object_declare_delete(env, dt_object_child(dt), key,
+                                            th);
 }
 
 /**
 }
 
 /**
@@ -136,7 +139,7 @@ static int lod_index_delete(const struct lu_env *env,
                            const struct dt_key *key,
                            struct thandle *th)
 {
                            const struct dt_key *key,
                            struct thandle *th)
 {
-       return dt_delete(env, dt_object_child(dt), key, th);
+       return lod_sub_object_delete(env, dt_object_child(dt), key, th);
 }
 
 /**
 }
 
 /**
@@ -153,7 +156,6 @@ static struct dt_it *lod_it_init(const struct lu_env *env,
        struct lod_it           *it = &lod_env_info(env)->lti_it;
        struct dt_it            *it_next;
 
        struct lod_it           *it = &lod_env_info(env)->lti_it;
        struct dt_it            *it_next;
 
-
        it_next = next->do_index_ops->dio_it.init(env, next, attr);
        if (IS_ERR(it_next))
                return it_next;
        it_next = next->do_index_ops->dio_it.init(env, next, attr);
        if (IS_ERR(it_next))
                return it_next;
@@ -1075,7 +1077,7 @@ static int lod_attr_get(const struct lu_env *env,
  **/
 static int lod_mark_dead_object(const struct lu_env *env,
                                struct dt_object *dt,
  **/
 static int lod_mark_dead_object(const struct lu_env *env,
                                struct dt_object *dt,
-                               struct thandle *handle,
+                               struct thandle *th,
                                bool declare)
 {
        struct lod_object       *lo = lod_dt_obj(dt);
                                bool declare)
 {
        struct lod_object       *lo = lod_dt_obj(dt);
@@ -1111,13 +1113,14 @@ static int lod_mark_dead_object(const struct lu_env *env,
                buf.lb_buf = lmv;
                buf.lb_len = sizeof(*lmv);
                if (declare) {
                buf.lb_buf = lmv;
                buf.lb_len = sizeof(*lmv);
                if (declare) {
-                       rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
-                                                 XATTR_NAME_LMV,
-                                                 LU_XATTR_REPLACE, handle);
+                       rc = lod_sub_object_declare_xattr_set(env,
+                                               lo->ldo_stripe[i], &buf,
+                                               XATTR_NAME_LMV,
+                                               LU_XATTR_REPLACE, th);
                } else {
                } else {
-                       rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
-                                         XATTR_NAME_LMV, LU_XATTR_REPLACE,
-                                         handle);
+                       rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
+                                                     &buf, XATTR_NAME_LMV,
+                                                     LU_XATTR_REPLACE, th);
                }
                if (rc != 0)
                        break;
                }
                if (rc != 0)
                        break;
@@ -1137,7 +1140,7 @@ static int lod_mark_dead_object(const struct lu_env *env,
 static int lod_declare_attr_set(const struct lu_env *env,
                                struct dt_object *dt,
                                const struct lu_attr *attr,
 static int lod_declare_attr_set(const struct lu_env *env,
                                struct dt_object *dt,
                                const struct lu_attr *attr,
-                               struct thandle *handle)
+                               struct thandle *th)
 {
        struct dt_object  *next = dt_object_child(dt);
        struct lod_object *lo = lod_dt_obj(dt);
 {
        struct dt_object  *next = dt_object_child(dt);
        struct lod_object *lo = lod_dt_obj(dt);
@@ -1147,14 +1150,14 @@ static int lod_declare_attr_set(const struct lu_env *env,
        /* Set dead object on all other stripes */
        if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
            attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
        /* Set dead object on all other stripes */
        if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
            attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
-               rc = lod_mark_dead_object(env, dt, handle, true);
+               rc = lod_mark_dead_object(env, dt, th, true);
                RETURN(rc);
        }
 
        /*
         * declare setattr on the local object
         */
                RETURN(rc);
        }
 
        /*
         * declare setattr on the local object
         */
-       rc = dt_declare_attr_set(env, next, attr, handle);
+       rc = lod_sub_object_declare_attr_set(env, next, attr, th);
        if (rc)
                RETURN(rc);
 
        if (rc)
                RETURN(rc);
 
@@ -1193,20 +1196,20 @@ static int lod_declare_attr_set(const struct lu_env *env,
         */
        LASSERT(lo->ldo_stripe);
        for (i = 0; i < lo->ldo_stripenr; i++) {
         */
        LASSERT(lo->ldo_stripe);
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               if (likely(lo->ldo_stripe[i] != NULL)) {
-                       rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
-                                                handle);
-                       if (rc != 0) {
-                               CERROR("failed declaration: %d\n", rc);
-                               break;
-                       }
-               }
+               if (lo->ldo_stripe[i] == NULL)
+                       continue;
+               rc = lod_sub_object_declare_attr_set(env,
+                                       lo->ldo_stripe[i], attr,
+                                       th);
+               if (rc != 0)
+                       RETURN(rc);
        }
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
            dt_object_exists(next) != 0 &&
            dt_object_remote(next) == 0)
        }
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
            dt_object_exists(next) != 0 &&
            dt_object_remote(next) == 0)
-               dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
+               lod_sub_object_declare_xattr_del(env, next,
+                                               XATTR_NAME_LOV, th);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
            dt_object_exists(next) &&
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
            dt_object_exists(next) &&
@@ -1216,8 +1219,9 @@ static int lod_declare_attr_set(const struct lu_env *env,
 
                buf->lb_buf = info->lti_ea_store;
                buf->lb_len = info->lti_ea_store_size;
 
                buf->lb_buf = info->lti_ea_store;
                buf->lb_len = info->lti_ea_store_size;
-               dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
-                                    LU_XATTR_REPLACE, handle);
+               lod_sub_object_declare_xattr_set(env, next, buf,
+                                                XATTR_NAME_LOV,
+                                                LU_XATTR_REPLACE, th);
        }
 
        RETURN(rc);
        }
 
        RETURN(rc);
@@ -1234,7 +1238,7 @@ static int lod_declare_attr_set(const struct lu_env *env,
 static int lod_attr_set(const struct lu_env *env,
                        struct dt_object *dt,
                        const struct lu_attr *attr,
 static int lod_attr_set(const struct lu_env *env,
                        struct dt_object *dt,
                        const struct lu_attr *attr,
-                       struct thandle *handle)
+                       struct thandle *th)
 {
        struct dt_object        *next = dt_object_child(dt);
        struct lod_object       *lo = lod_dt_obj(dt);
 {
        struct dt_object        *next = dt_object_child(dt);
        struct lod_object       *lo = lod_dt_obj(dt);
@@ -1244,14 +1248,14 @@ static int lod_attr_set(const struct lu_env *env,
        /* Set dead object on all other stripes */
        if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
            attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
        /* Set dead object on all other stripes */
        if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
            attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
-               rc = lod_mark_dead_object(env, dt, handle, false);
+               rc = lod_mark_dead_object(env, dt, th, false);
                RETURN(rc);
        }
 
        /*
         * apply changes to the local object
         */
                RETURN(rc);
        }
 
        /*
         * apply changes to the local object
         */
-       rc = dt_attr_set(env, next, attr, handle);
+       rc = lod_sub_object_attr_set(env, next, attr, th);
        if (rc)
                RETURN(rc);
 
        if (rc)
                RETURN(rc);
 
@@ -1277,21 +1281,20 @@ static int lod_attr_set(const struct lu_env *env,
        for (i = 0; i < lo->ldo_stripenr; i++) {
                if (unlikely(lo->ldo_stripe[i] == NULL))
                        continue;
        for (i = 0; i < lo->ldo_stripenr; i++) {
                if (unlikely(lo->ldo_stripe[i] == NULL))
                        continue;
+
                if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
                    (dt_object_exists(lo->ldo_stripe[i]) == 0))
                        continue;
 
                if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
                    (dt_object_exists(lo->ldo_stripe[i]) == 0))
                        continue;
 
-               rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle);
-               if (rc != 0) {
-                       CERROR("failed declaration: %d\n", rc);
+               rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], attr, th);
+               if (rc != 0)
                        break;
                        break;
-               }
        }
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
            dt_object_exists(next) != 0 &&
            dt_object_remote(next) == 0)
        }
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
            dt_object_exists(next) != 0 &&
            dt_object_remote(next) == 0)
-               dt_xattr_del(env, next, XATTR_NAME_LOV, handle);
+               rc = lod_sub_object_xattr_del(env, next, XATTR_NAME_LOV, th);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
            dt_object_exists(next) &&
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
            dt_object_exists(next) &&
@@ -1322,8 +1325,9 @@ static int lod_attr_set(const struct lu_env *env,
                fid->f_oid--;
                fid_to_ostid(fid, oi);
                ostid_cpu_to_le(oi, &objs->l_ost_oi);
                fid->f_oid--;
                fid_to_ostid(fid, oi);
                ostid_cpu_to_le(oi, &objs->l_ost_oi);
-               dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
-                            LU_XATTR_REPLACE, handle);
+
+               rc = lod_sub_object_xattr_set(env, next, buf, XATTR_NAME_LOV,
+                                             LU_XATTR_REPLACE, th);
        }
 
        RETURN(rc);
        }
 
        RETURN(rc);
@@ -1629,6 +1633,139 @@ out:
  * \retval             0 on success
  * \retval             negative if failed
  */
  * \retval             0 on success
  * \retval             negative if failed
  */
+static int lod_dir_declare_create_stripes(const struct lu_env *env,
+                                         struct dt_object *dt,
+                                         struct lu_attr *attr,
+                                         struct dt_object_format *dof,
+                                         struct thandle *th)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lu_buf           lmv_buf;
+       struct lu_buf           slave_lmv_buf;
+       struct lmv_mds_md_v1    *lmm;
+       struct lmv_mds_md_v1    *slave_lmm = NULL;
+       struct dt_insert_rec    *rec = &info->lti_dt_rec;
+       struct lod_object       *lo = lod_dt_obj(dt);
+       int                     rc;
+       __u32                   i;
+       ENTRY;
+
+       rc = lod_prep_lmv_md(env, dt, &lmv_buf);
+       if (rc != 0)
+               GOTO(out, rc);
+       lmm = lmv_buf.lb_buf;
+
+       OBD_ALLOC_PTR(slave_lmm);
+       if (slave_lmm == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       lod_prep_slave_lmv_md(slave_lmm, lmm);
+       slave_lmv_buf.lb_buf = slave_lmm;
+       slave_lmv_buf.lb_len = sizeof(*slave_lmm);
+
+       if (!dt_try_as_dir(env, dt_object_child(dt)))
+               GOTO(out, rc = -EINVAL);
+
+       rec->rec_type = S_IFDIR;
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               struct dt_object        *dto = lo->ldo_stripe[i];
+               char                    *stripe_name = info->lti_key;
+               struct lu_name          *sname;
+               struct linkea_data       ldata          = { NULL };
+               struct lu_buf           linkea_buf;
+
+               rc = lod_sub_object_declare_create(env, dto, attr, NULL,
+                                                  dof, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               if (!dt_try_as_dir(env, dto))
+                       GOTO(out, rc = -EINVAL);
+
+               rc = lod_sub_object_declare_ref_add(env, dto, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
+               rc = lod_sub_object_declare_insert(env, dto,
+                                       (const struct dt_rec *)rec,
+                                       (const struct dt_key *)dot, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               /* master stripe FID will be put to .. */
+               rec->rec_fid = lu_object_fid(&dt->do_lu);
+               rc = lod_sub_object_declare_insert(env, dto,
+                                       (const struct dt_rec *)rec,
+                                       (const struct dt_key *)dotdot,
+                                       th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
+                   cfs_fail_val != i) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
+                           cfs_fail_val == i)
+                               slave_lmm->lmv_master_mdt_index =
+                                                       cpu_to_le32(i + 1);
+                       else
+                               slave_lmm->lmv_master_mdt_index =
+                                                       cpu_to_le32(i);
+                       rc = lod_sub_object_declare_xattr_set(env, dto,
+                                       &slave_lmv_buf, XATTR_NAME_LMV, 0, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
+                   cfs_fail_val == i)
+                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+                               PFID(lu_object_fid(&dto->do_lu)), i + 1);
+               else
+                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+                               PFID(lu_object_fid(&dto->do_lu)), i);
+
+               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+               rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+               linkea_buf.lb_len = ldata.ld_leh->leh_len;
+               rc = lod_sub_object_declare_xattr_set(env, dto, &linkea_buf,
+                                         XATTR_NAME_LINK, 0, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
+               rc = lod_sub_object_declare_insert(env, dt_object_child(dt),
+                                      (const struct dt_rec *)rec,
+                                      (const struct dt_key *)stripe_name,
+                                      th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = lod_sub_object_declare_ref_add(env, dt_object_child(dt),
+                                                   th);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
+
+       rc = lod_sub_object_declare_xattr_set(env, dt_object_child(dt),
+                               &lmv_buf, XATTR_NAME_LMV, 0, th);
+       if (rc != 0)
+               GOTO(out, rc);
+out:
+       if (slave_lmm != NULL)
+               OBD_FREE_PTR(slave_lmm);
+
+       RETURN(rc);
+}
+
 static int lod_prep_md_striped_create(const struct lu_env *env,
                                      struct dt_object *dt,
                                      struct lu_attr *attr,
 static int lod_prep_md_striped_create(const struct lu_env *env,
                                      struct dt_object *dt,
                                      struct lu_attr *attr,
@@ -1639,13 +1776,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
        struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
        struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
        struct lod_object       *lo = lod_dt_obj(dt);
        struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
        struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
        struct lod_object       *lo = lod_dt_obj(dt);
-       struct lod_thread_info  *info = lod_env_info(env);
        struct dt_object        **stripe;
        struct dt_object        **stripe;
-       struct lu_buf           lmv_buf;
-       struct lu_buf           slave_lmv_buf;
-       struct lmv_mds_md_v1    *lmm;
-       struct lmv_mds_md_v1    *slave_lmm = NULL;
-       struct dt_insert_rec    *rec = &info->lti_dt_rec;
        __u32                   stripe_count;
        int                     *idx_array;
        int                     rc = 0;
        __u32                   stripe_count;
        int                     *idx_array;
        int                     rc = 0;
@@ -1774,145 +1905,7 @@ next:
        if (lo->ldo_stripenr == 0)
                GOTO(out_put, rc = -ENOSPC);
 
        if (lo->ldo_stripenr == 0)
                GOTO(out_put, rc = -ENOSPC);
 
-       rc = lod_prep_lmv_md(env, dt, &lmv_buf);
-       if (rc != 0)
-               GOTO(out_put, rc);
-       lmm = lmv_buf.lb_buf;
-
-       OBD_ALLOC_PTR(slave_lmm);
-       if (slave_lmm == NULL)
-               GOTO(out_put, rc = -ENOMEM);
-
-       lod_prep_slave_lmv_md(slave_lmm, lmm);
-       slave_lmv_buf.lb_buf = slave_lmm;
-       slave_lmv_buf.lb_len = sizeof(*slave_lmm);
-
-       if (!dt_try_as_dir(env, dt_object_child(dt)))
-               GOTO(out_put, rc = -EINVAL);
-
-       rec->rec_type = S_IFDIR;
-       for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct dt_object        *dto            = stripe[i];
-               char                    *stripe_name    = info->lti_key;
-               struct lu_name          *sname;
-               struct linkea_data       ldata          = { NULL };
-               struct lu_buf            linkea_buf;
-
-               rc = dt_declare_create(env, dto, attr, NULL, dof, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               if (!dt_try_as_dir(env, dto))
-                       GOTO(out_put, rc = -EINVAL);
-
-               rc = dt_declare_ref_add(env, dto, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
-                                      (const struct dt_key *)dot, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               /* master stripe FID will be put to .. */
-               rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
-                                      (const struct dt_key *)dotdot, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               /* probably nothing to inherite */
-               if (lo->ldo_def_striping_set &&
-                   !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
-                                        lo->ldo_def_stripenr,
-                                        lo->ldo_def_stripe_offset,
-                                        lo->ldo_pool)) {
-                       struct lov_user_md_v3   *v3;
-
-                       /* sigh, lti_ea_store has been used for lmv_buf,
-                        * so we have to allocate buffer for default
-                        * stripe EA */
-                       OBD_ALLOC_PTR(v3);
-                       if (v3 == NULL)
-                               GOTO(out_put, rc = -ENOMEM);
-
-                       memset(v3, 0, sizeof(*v3));
-                       v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
-                       v3->lmm_stripe_count =
-                               cpu_to_le16(lo->ldo_def_stripenr);
-                       v3->lmm_stripe_offset =
-                               cpu_to_le16(lo->ldo_def_stripe_offset);
-                       v3->lmm_stripe_size =
-                               cpu_to_le32(lo->ldo_def_stripe_size);
-                       if (lo->ldo_pool != NULL)
-                               strlcpy(v3->lmm_pool_name, lo->ldo_pool,
-                                       sizeof(v3->lmm_pool_name));
-
-                       info->lti_buf.lb_buf = v3;
-                       info->lti_buf.lb_len = sizeof(*v3);
-                       rc = dt_declare_xattr_set(env, dto,
-                                                 &info->lti_buf,
-                                                 XATTR_NAME_LOV,
-                                                 0, th);
-                       OBD_FREE_PTR(v3);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
-
-               if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
-                   cfs_fail_val != i) {
-                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
-                           cfs_fail_val == i)
-                               slave_lmm->lmv_master_mdt_index =
-                                                       cpu_to_le32(i + 1);
-                       else
-                               slave_lmm->lmv_master_mdt_index =
-                                                       cpu_to_le32(i);
-                       rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
-                                                 XATTR_NAME_LMV, 0, th);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
-                   cfs_fail_val == i)
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i + 1);
-               else
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i);
-
-               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
-               rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-               linkea_buf.lb_len = ldata.ld_leh->leh_len;
-               rc = dt_declare_xattr_set(env, dto, &linkea_buf,
-                                         XATTR_NAME_LINK, 0, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = dt_declare_insert(env, dt_object_child(dt),
-                                      (const struct dt_rec *)rec,
-                                      (const struct dt_key *)stripe_name, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               rc = dt_declare_ref_add(env, dt_object_child(dt), th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-       }
-
-       rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
-                                 XATTR_NAME_LMV, 0, th);
+       rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th);
        if (rc != 0)
                GOTO(out_put, rc);
 
        if (rc != 0)
                GOTO(out_put, rc);
 
@@ -1930,8 +1923,6 @@ out_put:
 out_free:
        if (idx_array != NULL)
                OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
 out_free:
        if (idx_array != NULL)
                OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
-       if (slave_lmm != NULL)
-               OBD_FREE_PTR(slave_lmm);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -2028,7 +2019,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
                        RETURN(rc);
        }
 
                        RETURN(rc);
        }
 
-       rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
+       rc = lod_sub_object_declare_xattr_set(env, next, buf, name, fl, th);
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
@@ -2046,8 +2037,9 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
-                                         name, fl, th);
+
+               rc = lod_sub_object_declare_xattr_set(env, lo->ldo_stripe[i],
+                                               buf, name, fl, th);
                if (rc != 0)
                        break;
        }
                if (rc != 0)
                        break;
        }
@@ -2102,7 +2094,8 @@ static int lod_declare_xattr_set(const struct lu_env *env,
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
        } else {
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
        } else {
-               rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
+               rc = lod_sub_object_declare_xattr_set(env, next, buf, name,
+                                                     fl, th);
        }
 
        RETURN(rc);
        }
 
        RETURN(rc);
@@ -2150,7 +2143,7 @@ static int lod_xattr_set_internal(const struct lu_env *env,
        int                     i;
        ENTRY;
 
        int                     i;
        ENTRY;
 
-       rc = dt_xattr_set(env, next, buf, name, fl, th);
+       rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th);
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
@@ -2163,7 +2156,9 @@ static int lod_xattr_set_internal(const struct lu_env *env,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th);
+
+               rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], buf, name,
+                                             fl, th);
                if (rc != 0)
                        break;
        }
                if (rc != 0)
                        break;
        }
@@ -2194,7 +2189,7 @@ static int lod_xattr_del_internal(const struct lu_env *env,
        int                     i;
        ENTRY;
 
        int                     i;
        ENTRY;
 
-       rc = dt_xattr_del(env, next, name, th);
+       rc = lod_sub_object_xattr_del(env, next, name, th);
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
@@ -2203,7 +2198,9 @@ static int lod_xattr_del_internal(const struct lu_env *env,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th);
+
+               rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name,
+                                             th);
                if (rc != 0)
                        break;
        }
                if (rc != 0)
                        break;
        }
@@ -2409,71 +2406,39 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
 
        rec->rec_type = S_IFDIR;
        for (i = 0; i < lo->ldo_stripenr; i++) {
 
        rec->rec_type = S_IFDIR;
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct dt_object        *dto;
-               char                    *stripe_name    = info->lti_key;
+               struct dt_object *dto;
+               char             *stripe_name = info->lti_key;
                struct lu_name          *sname;
                struct linkea_data       ldata          = { NULL };
                struct lu_buf            linkea_buf;
 
                dto = lo->ldo_stripe[i];
                struct lu_name          *sname;
                struct linkea_data       ldata          = { NULL };
                struct lu_buf            linkea_buf;
 
                dto = lo->ldo_stripe[i];
+
                dt_write_lock(env, dto, MOR_TGT_CHILD);
                dt_write_lock(env, dto, MOR_TGT_CHILD);
-               rc = dt_create(env, dto, attr, NULL, dof, th);
+               rc = lod_sub_object_create(env, dto, attr, NULL, dof,
+                                          th);
                if (rc != 0) {
                        dt_write_unlock(env, dto);
                if (rc != 0) {
                        dt_write_unlock(env, dto);
-                       RETURN(rc);
+                       GOTO(out, rc);
                }
 
                }
 
-               rc = dt_ref_add(env, dto, th);
+               rc = lod_sub_object_ref_add(env, dto, th);
                dt_write_unlock(env, dto);
                if (rc != 0)
                dt_write_unlock(env, dto);
                if (rc != 0)
-                       RETURN(rc);
+                       GOTO(out, rc);
 
                rec->rec_fid = lu_object_fid(&dto->do_lu);
 
                rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = dt_insert(env, dto, (const struct dt_rec *)rec,
-                              (const struct dt_key *)dot, th, 0);
+               rc = lod_sub_object_index_insert(env, dto,
+                               (const struct dt_rec *)rec,
+                               (const struct dt_key *)dot, th, 0);
                if (rc != 0)
                if (rc != 0)
-                       RETURN(rc);
+                       GOTO(out, rc);
 
                rec->rec_fid = lu_object_fid(&dt->do_lu);
 
                rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = dt_insert(env, dto, (struct dt_rec *)rec,
+               rc = lod_sub_object_index_insert(env, dto, (struct dt_rec *)rec,
                               (const struct dt_key *)dotdot, th, 0);
                if (rc != 0)
                               (const struct dt_key *)dotdot, th, 0);
                if (rc != 0)
-                       RETURN(rc);
-
-               if (lo->ldo_def_striping_set &&
-                   !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
-                                        lo->ldo_def_stripenr,
-                                        lo->ldo_def_stripe_offset,
-                                        lo->ldo_pool)) {
-                       struct lov_user_md_v3   *v3;
-
-                       /* sigh, lti_ea_store has been used for lmv_buf,
-                        * so we have to allocate buffer for default
-                        * stripe EA */
-                       OBD_ALLOC_PTR(v3);
-                       if (v3 == NULL)
-                               GOTO(out, rc);
-
-                       memset(v3, 0, sizeof(*v3));
-                       v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
-                       v3->lmm_stripe_count =
-                               cpu_to_le16(lo->ldo_def_stripenr);
-                       v3->lmm_stripe_offset =
-                               cpu_to_le16(lo->ldo_def_stripe_offset);
-                       v3->lmm_stripe_size =
-                               cpu_to_le32(lo->ldo_def_stripe_size);
-                       if (lo->ldo_pool != NULL)
-                               strlcpy(v3->lmm_pool_name, lo->ldo_pool,
-                                       sizeof(v3->lmm_pool_name));
-
-                       info->lti_buf.lb_buf = v3;
-                       info->lti_buf.lb_len = sizeof(*v3);
-                       rc = dt_xattr_set(env, dto, &info->lti_buf,
-                                         XATTR_NAME_LOV, 0, th);
-                       OBD_FREE_PTR(v3);
-                       if (rc != 0)
-                               GOTO(out, rc);
-               }
+                       GOTO(out, rc);
 
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
 
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
@@ -2484,8 +2449,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                        else
                                slave_lmm->lmv_master_mdt_index =
                                                        cpu_to_le32(i);
                        else
                                slave_lmm->lmv_master_mdt_index =
                                                        cpu_to_le32(i);
-                       rc = dt_xattr_set(env, dto, &slave_lmv_buf,
-                                         XATTR_NAME_LMV, fl, th);
+
+                       rc = lod_sub_object_xattr_set(env, dto, &slave_lmv_buf,
+                                                     XATTR_NAME_LMV, fl, th);
                        if (rc != 0)
                                GOTO(out, rc);
                }
                        if (rc != 0)
                                GOTO(out, rc);
                }
@@ -2509,27 +2475,26 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
 
                linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
                linkea_buf.lb_len = ldata.ld_leh->leh_len;
 
                linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
                linkea_buf.lb_len = ldata.ld_leh->leh_len;
-               rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
-                                 0, th);
+               rc = lod_sub_object_xattr_set(env, dto, &linkea_buf,
+                                       XATTR_NAME_LINK, 0, th);
                if (rc != 0)
                        GOTO(out, rc);
 
                rec->rec_fid = lu_object_fid(&dto->do_lu);
                if (rc != 0)
                        GOTO(out, rc);
 
                rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = dt_insert(env, dt_object_child(dt),
+               rc = lod_sub_object_index_insert(env, dt_object_child(dt),
                               (const struct dt_rec *)rec,
                               (const struct dt_key *)stripe_name, th, 0);
                if (rc != 0)
                        GOTO(out, rc);
 
                               (const struct dt_rec *)rec,
                               (const struct dt_key *)stripe_name, th, 0);
                if (rc != 0)
                        GOTO(out, rc);
 
-               rc = dt_ref_add(env, dt_object_child(dt), th);
+               rc = lod_sub_object_ref_add(env, dt_object_child(dt), th);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV))
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV))
-               rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf,
-                                 XATTR_NAME_LMV, fl, th);
-
+               rc = lod_sub_object_xattr_set(env, dt_object_child(dt),
+                                             &lmv_buf, XATTR_NAME_LMV, fl, th);
 out:
        if (slave_lmm != NULL)
                OBD_FREE_PTR(slave_lmm);
 out:
        if (slave_lmm != NULL)
                OBD_FREE_PTR(slave_lmm);
@@ -2736,7 +2701,8 @@ static int lod_xattr_set(const struct lu_env *env,
 
                if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
                                                LMV_HASH_FLAG_MIGRATION)
 
                if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
                                                LMV_HASH_FLAG_MIGRATION)
-                       rc = dt_xattr_set(env, next, buf, name, fl, th);
+                       rc = lod_sub_object_xattr_set(env, next, buf, name, fl,
+                                                     th);
                else
                        rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
 
                else
                        rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
 
@@ -2759,12 +2725,13 @@ static int lod_xattr_set(const struct lu_env *env,
                /* in case of lov EA swap, just set it
                 * if not, it is a replay so check striping match what we
                 * already have during req replay, declare_xattr_set()
                /* in case of lov EA swap, just set it
                 * if not, it is a replay so check striping match what we
                 * already have during req replay, declare_xattr_set()
-                * defines striping, then create() does the work
-               */
+                * defines striping, then create() does the work */
                if (fl & LU_XATTR_REPLACE) {
                        /* free stripes, then update disk */
                        lod_object_free_striping(env, lod_dt_obj(dt));
                if (fl & LU_XATTR_REPLACE) {
                        /* free stripes, then update disk */
                        lod_object_free_striping(env, lod_dt_obj(dt));
-                       rc = dt_xattr_set(env, next, buf, name, fl, th);
+
+                       rc = lod_sub_object_xattr_set(env, next, buf, name,
+                                                     fl, th);
                } else {
                        rc = lod_striping_create(env, dt, NULL, NULL, th);
                }
                } else {
                        rc = lod_striping_create(env, dt, NULL, NULL, th);
                }
@@ -2792,7 +2759,8 @@ static int lod_declare_xattr_del(const struct lu_env *env,
        int                     i;
        ENTRY;
 
        int                     i;
        ENTRY;
 
-       rc = dt_declare_xattr_del(env, dt_object_child(dt), name, th);
+       rc = lod_sub_object_declare_xattr_del(env, dt_object_child(dt),
+                                             name, th);
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
@@ -2809,7 +2777,8 @@ static int lod_declare_xattr_del(const struct lu_env *env,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_declare_xattr_del(env, lo->ldo_stripe[i], name, th);
+               rc = lod_sub_object_declare_xattr_del(env, lo->ldo_stripe[i],
+                                                     name, th);
                if (rc != 0)
                        break;
        }
                if (rc != 0)
                        break;
        }
@@ -2837,7 +2806,7 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
        if (!strcmp(name, XATTR_NAME_LOV))
                lod_object_free_striping(env, lod_dt_obj(dt));
 
        if (!strcmp(name, XATTR_NAME_LOV))
                lod_object_free_striping(env, lod_dt_obj(dt));
 
-       rc = dt_xattr_del(env, next, name, th);
+       rc = lod_sub_object_xattr_del(env, next, name, th);
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
@@ -2846,7 +2815,8 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th);
+
+               rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, th);
                if (rc != 0)
                        break;
        }
                if (rc != 0)
                        break;
        }
@@ -3319,7 +3289,8 @@ static int lod_declare_init_size(const struct lu_env *env,
        attr->la_valid = LA_SIZE;
        attr->la_size = size;
 
        attr->la_valid = LA_SIZE;
        attr->la_size = size;
 
-       rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
+       rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[stripe], attr,
+                                            th);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -3385,8 +3356,8 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
                info->lti_buf = *lovea;
        }
 
                info->lti_buf = *lovea;
        }
 
-       rc = dt_declare_xattr_set(env, next, &info->lti_buf,
-                                 XATTR_NAME_LOV, 0, th);
+       rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+                                             XATTR_NAME_LOV, 0, th);
        if (rc)
                GOTO(out, rc);
 
        if (rc)
                GOTO(out, rc);
 
@@ -3432,8 +3403,8 @@ static int lod_declare_object_create(const struct lu_env *env,
        /*
         * first of all, we declare creation of local object
         */
        /*
         * first of all, we declare creation of local object
         */
-       rc = dt_declare_create(env, next, attr, hint, dof, th);
-       if (rc)
+       rc = lod_sub_object_declare_create(env, next, attr, hint, dof, th);
+       if (rc != 0)
                GOTO(out, rc);
 
        if (dof->dof_type == DFT_SYM)
                GOTO(out, rc);
 
        if (dof->dof_type == DFT_SYM)
@@ -3513,8 +3484,8 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
        /* create all underlying objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
        /* create all underlying objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
-
+               rc = lod_sub_object_create(env, lo->ldo_stripe[i], attr, NULL,
+                                          dof, th);
                if (rc)
                        break;
        }
                if (rc)
                        break;
        }
@@ -3542,13 +3513,13 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
                             struct dt_allocation_hint *hint,
                             struct dt_object_format *dof, struct thandle *th)
 {
                             struct dt_allocation_hint *hint,
                             struct dt_object_format *dof, struct thandle *th)
 {
-       struct dt_object   *next = dt_object_child(dt);
        struct lod_object  *lo = lod_dt_obj(dt);
        int                 rc;
        ENTRY;
 
        /* create local object */
        struct lod_object  *lo = lod_dt_obj(dt);
        int                 rc;
        ENTRY;
 
        /* create local object */
-       rc = dt_create(env, next, attr, hint, dof, th);
+       rc = lod_sub_object_create(env, dt_object_child(dt), attr, hint, dof,
+                                  th);
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
@@ -3598,22 +3569,24 @@ static int lod_declare_object_destroy(const struct lu_env *env,
                        RETURN(rc);
 
                for (i = 0; i < lo->ldo_stripenr; i++) {
                        RETURN(rc);
 
                for (i = 0; i < lo->ldo_stripenr; i++) {
-                       rc = dt_declare_ref_del(env, next, th);
+                       rc = lod_sub_object_declare_ref_del(env, next, th);
                        if (rc != 0)
                                RETURN(rc);
                        if (rc != 0)
                                RETURN(rc);
+
                        snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
                                i);
                        snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
                                i);
-                       rc = dt_declare_delete(env, next,
+                       rc = lod_sub_object_declare_delete(env, next,
                                        (const struct dt_key *)stripe_name, th);
                        if (rc != 0)
                                RETURN(rc);
                }
        }
                                        (const struct dt_key *)stripe_name, th);
                        if (rc != 0)
                                RETURN(rc);
                }
        }
+
        /*
         * we declare destroy for the local object
         */
        /*
         * we declare destroy for the local object
         */
-       rc = dt_declare_destroy(env, next, th);
+       rc = lod_sub_object_declare_destroy(env, next, th);
        if (rc)
                RETURN(rc);
 
        if (rc)
                RETURN(rc);
 
@@ -3622,18 +3595,17 @@ static int lod_declare_object_destroy(const struct lu_env *env,
 
        /* declare destroy all striped objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
 
        /* declare destroy all striped objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               if (likely(lo->ldo_stripe[i] != NULL)) {
-                       if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
-                               rc = dt_declare_ref_del(env, lo->ldo_stripe[i],
-                                                       th);
-                               if (rc != 0)
-                                       RETURN(rc);
-                       }
+               if (lo->ldo_stripe[i] == NULL)
+                       continue;
 
 
-                       rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
-                       if (rc != 0)
-                               break;
-               }
+               if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
+                       rc = lod_sub_object_declare_ref_del(env,
+                                       lo->ldo_stripe[i], th);
+
+               rc = lod_sub_object_declare_destroy(env, lo->ldo_stripe[i],
+                                       th);
+               if (rc != 0)
+                       break;
        }
 
        RETURN(rc);
        }
 
        RETURN(rc);
@@ -3667,7 +3639,7 @@ static int lod_object_destroy(const struct lu_env *env,
                        RETURN(rc);
 
                for (i = 0; i < lo->ldo_stripenr; i++) {
                        RETURN(rc);
 
                for (i = 0; i < lo->ldo_stripenr; i++) {
-                       rc = dt_ref_del(env, next, th);
+                       rc = lod_sub_object_ref_del(env, next, th);
                        if (rc != 0)
                                RETURN(rc);
 
                        if (rc != 0)
                                RETURN(rc);
 
@@ -3679,13 +3651,14 @@ static int lod_object_destroy(const struct lu_env *env,
                               PFID(lu_object_fid(&dt->do_lu)), stripe_name,
                               PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
 
                               PFID(lu_object_fid(&dt->do_lu)), stripe_name,
                               PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
 
-                       rc = dt_delete(env, next,
+                       rc = lod_sub_object_delete(env, next,
                                       (const struct dt_key *)stripe_name, th);
                        if (rc != 0)
                                RETURN(rc);
                }
        }
                                       (const struct dt_key *)stripe_name, th);
                        if (rc != 0)
                                RETURN(rc);
                }
        }
-       rc = dt_destroy(env, next, th);
+
+       rc = lod_sub_object_destroy(env, next, th);
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
@@ -3700,13 +3673,14 @@ static int lod_object_destroy(const struct lu_env *env,
                        if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
                                dt_write_lock(env, lo->ldo_stripe[i],
                                              MOR_TGT_CHILD);
                        if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
                                dt_write_lock(env, lo->ldo_stripe[i],
                                              MOR_TGT_CHILD);
-                               rc = dt_ref_del(env, lo->ldo_stripe[i], th);
+                               rc = lod_sub_object_ref_del(env,
+                                               lo->ldo_stripe[i], th);
                                dt_write_unlock(env, lo->ldo_stripe[i]);
                                if (rc != 0)
                                        break;
                        }
 
                                dt_write_unlock(env, lo->ldo_stripe[i]);
                                if (rc != 0)
                                        break;
                        }
 
-                       rc = dt_destroy(env, lo->ldo_stripe[i], th);
+                       rc = lod_sub_object_destroy(env, lo->ldo_stripe[i], th);
                        if (rc != 0)
                                break;
                }
                        if (rc != 0)
                                break;
                }
@@ -3724,7 +3698,7 @@ static int lod_object_destroy(const struct lu_env *env,
 static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
 {
 static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
 {
-       return dt_declare_ref_add(env, dt_object_child(dt), th);
+       return lod_sub_object_declare_ref_add(env, dt_object_child(dt), th);
 }
 
 /**
 }
 
 /**
@@ -3735,7 +3709,7 @@ static int lod_declare_ref_add(const struct lu_env *env,
 static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
 {
 static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
 {
-       return dt_ref_add(env, dt_object_child(dt), th);
+       return lod_sub_object_ref_add(env, dt_object_child(dt), th);
 }
 
 /**
 }
 
 /**
@@ -3747,7 +3721,7 @@ static int lod_ref_add(const struct lu_env *env,
 static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
 {
 static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
 {
-       return dt_declare_ref_del(env, dt_object_child(dt), th);
+       return lod_sub_object_declare_ref_del(env, dt_object_child(dt), th);
 }
 
 /**
 }
 
 /**
@@ -3758,7 +3732,7 @@ static int lod_declare_ref_del(const struct lu_env *env,
 static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
 {
 static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
 {
-       return dt_ref_del(env, dt_object_child(dt), th);
+       return lod_sub_object_ref_del(env, dt_object_child(dt), th);
 }
 
 /**
 }
 
 /**
@@ -3996,8 +3970,8 @@ static ssize_t lod_declare_write(const struct lu_env *env,
                                 const struct lu_buf *buf, loff_t pos,
                                 struct thandle *th)
 {
                                 const struct lu_buf *buf, loff_t pos,
                                 struct thandle *th)
 {
-       return dt_declare_record_write(env, dt_object_child(dt),
-                                      buf, pos, th);
+       return lod_sub_object_declare_write(env, dt_object_child(dt), buf, pos,
+                                           th);
 }
 
 /**
 }
 
 /**
@@ -4009,9 +3983,7 @@ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
                         const struct lu_buf *buf, loff_t *pos,
                         struct thandle *th, int iq)
 {
                         const struct lu_buf *buf, loff_t *pos,
                         struct thandle *th, int iq)
 {
-       struct dt_object *next = dt_object_child(dt);
-       LASSERT(next);
-       return next->do_body_ops->dbo_write(env, next, buf, pos, th, iq);
+       return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq);
 }
 
 static const struct dt_body_operations lod_body_lnk_ops = {
 }
 
 static const struct dt_body_operations lod_body_lnk_ops = {
index 33f8d0f..7ee3d33 100644 (file)
@@ -696,8 +696,8 @@ static struct dt_object *lod_qos_declare_object_on(const struct lu_env *env,
 
        dt = container_of(n, struct dt_object, do_lu);
 
 
        dt = container_of(n, struct dt_object, do_lu);
 
-       rc = dt_declare_create(env, dt, NULL, NULL, NULL, th);
-       if (rc) {
+       rc = lod_sub_object_declare_create(env, dt, NULL, NULL, NULL, th);
+       if (rc < 0) {
                CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n",
                       ost_idx, rc);
                lu_object_put(env, o);
                CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n",
                       ost_idx, rc);
                lu_object_put(env, o);
@@ -1900,8 +1900,9 @@ int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
                        o = lo->ldo_stripe[i];
                        LASSERT(o);
 
                        o = lo->ldo_stripe[i];
                        LASSERT(o);
 
-                       rc = dt_declare_create(env, o, attr, NULL, NULL, th);
-                       if (rc) {
+                       rc = lod_sub_object_declare_create(env, o, attr, NULL,
+                                                          NULL, th);
+                       if (rc < 0) {
                                CERROR("can't declare create: %d\n", rc);
                                break;
                        }
                                CERROR("can't declare create: %d\n", rc);
                                break;
                        }
diff --git a/lustre/lod/lod_sub_object.c b/lustre/lod/lod_sub_object.c
new file mode 100644 (file)
index 0000000..cce804f
--- /dev/null
@@ -0,0 +1,702 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2014, Intel Corporation.
+ */
+/*
+ * lustre/lod/lod_sub_object.c
+ *
+ * LOD sub object methods
+ *
+ * This file implements sub-object methods for LOD.
+ *
+ * LOD is Logic volume layer in the MDS stack, which will handle striping
+ * and distribute the update to different OSP/OSD. After directing the updates
+ * to one specific OSD/OSP, it also needs to do some thing before calling
+ * OSD/OSP API, for example recording updates for cross-MDT operation, get
+ * the next level transaction etc.
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <obd.h>
+#include <obd_class.h>
+#include <lustre_ver.h>
+#include <obd_support.h>
+#include <lprocfs_status.h>
+
+#include <lustre_fid.h>
+#include <lustre_param.h>
+#include <md_object.h>
+#include <lustre_linkea.h>
+
+#include "lod_internal.h"
+
+struct thandle *lod_sub_get_thandle(const struct lu_env *env,
+                                   struct thandle *th,
+                                   const struct dt_object *sub_obj)
+{
+       struct lod_device       *lod = dt2lod_dev(th->th_dev);
+       struct top_thandle      *tth;
+       struct thandle          *sub_th;
+       int                     type = LU_SEQ_RANGE_ANY;
+       __u32                   mdt_index;
+       int                     rc;
+       ENTRY;
+
+       if (th->th_top == NULL)
+               RETURN(th);
+
+       tth = container_of(th, struct top_thandle, tt_super);
+       LASSERT(tth->tt_magic == TOP_THANDLE_MAGIC);
+       /* local object must be mdt object, Note: during ost object
+        * creation, FID is not assigned until osp_object_create(),
+        * so if the FID of sub_obj is zero, it means OST object. */
+       if (!dt_object_remote(sub_obj) ||
+           fid_is_zero(lu_object_fid(&sub_obj->do_lu)))
+               RETURN(tth->tt_master_sub_thandle);
+
+       rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu),
+                           &mdt_index, &type);
+       if (rc < 0)
+               RETURN(ERR_PTR(rc));
+
+       if (type == LU_SEQ_RANGE_OST)
+               RETURN(tth->tt_master_sub_thandle);
+
+       sub_th = thandle_get_sub(env, th, sub_obj);
+
+       RETURN(sub_th);
+}
+
+/**
+ * Declare sub-object creation.
+ *
+ * Get transaction of next layer and declare the creation of the object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       the object being created
+ * \param[in] attr     the attributes of the object being created
+ * \param[in] hint     the hint of the creation
+ * \param[in] dof      the object format of the creation
+ * \param[th] th       the transaction handle
+ *
+ * \retval             0 if the declaration succeeds
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_create(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                 struct lu_attr *attr,
+                                 struct dt_allocation_hint *hint,
+                                 struct dt_object_format *dof,
+                                 struct thandle *th)
+{
+       struct thandle *sub_th;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               return PTR_ERR(sub_th);
+
+       return dt_declare_create(env, dt, attr, hint, dof, sub_th);
+}
+
+/**
+ * Create sub-object.
+ *
+ * Get transaction of next layer, record updates if it belongs to cross-MDT
+ * operation, and create the object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       the object being created
+ * \param[in] attr     the attributes of the object being created
+ * \param[in] hint     the hint of the creation
+ * \param[in] dof      the object format of the creation
+ * \param[th] th       the transaction handle
+ *
+ * \retval             0 if the creation succeeds
+ * \retval             negative errno if the creation fails.
+ */
+int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt,
+                         struct lu_attr *attr,
+                         struct dt_allocation_hint *hint,
+                         struct dt_object_format *dof,
+                         struct thandle *th)
+{
+       struct thandle     *sub_th;
+       int                 rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_create(env, dt, attr, hint, dof, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Declare adding reference for the sub-object
+ *
+ * Get transaction of next layer and declare the reference adding.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       dt object to add reference
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the declaration succeeds.
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_ref_add(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_declare_ref_add(env, dt, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Add reference for the sub-object
+ *
+ * Get transaction of next layer, record updates if it belongs to cross-MDT
+ * operation and add reference of the object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       dt object to add reference
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if it succeeds.
+ * \retval             negative errno if it fails.
+ */
+int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt,
+                          struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_ref_add(env, dt, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Declare deleting reference for the sub-object
+ *
+ * Get transaction of next layer and declare the reference deleting.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       dt object to delete reference
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the declaration succeeds.
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_ref_del(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_declare_ref_del(env, dt, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Delete reference for the sub-object
+ *
+ * Get transaction of next layer, record updates if it belongs to cross-MDT
+ * operation and delete reference of the object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       dt object to delete reference
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if it succeeds.
+ * \retval             negative errno if it fails.
+ */
+int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt,
+                          struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_ref_del(env, dt, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Declare destroying sub-object
+ *
+ * Get transaction of next layer and declare the sub-object destroy.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       dt object to be destroyed
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the declaration succeeds.
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_destroy(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_declare_destroy(env, dt, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Destroy sub-object
+ *
+ * Get transaction of next layer, record updates if it belongs to cross-MDT
+ * operation and destroy the object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       dt object to be destroyed
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the destroy succeeds.
+ * \retval             negative errno if the destroy fails.
+ */
+int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt,
+                          struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_destroy(env, dt, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Declare sub-object index insert
+ *
+ * Get transaction of next layer and declare index insert.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object for which to insert index
+ * \param[in] rec      record of the index which will be inserted
+ * \param[in] key      key of the index which will be inserted
+ * \param[in] th       the transaction handle
+ *
+ * \retval             0 if the declaration succeeds.
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_insert(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                 const struct dt_rec *rec,
+                                 const struct dt_key *key,
+                                 struct thandle *th)
+{
+       struct thandle *sub_th;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               return PTR_ERR(sub_th);
+
+       return dt_declare_insert(env, dt, rec, key, sub_th);
+}
+
+/**
+ * Insert index of sub object
+ *
+ * Get transaction of next layer, record updates if it belongs to cross-MDT
+ * operation, and insert the index.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object for which to insert index
+ * \param[in] rec      record of the index to be inserted
+ * \param[in] key      key of the index to be inserted
+ * \param[in] th       the transaction handle
+ * \param[in] ign      whether ignore quota
+ *
+ * \retval             0 if the insertion succeeds.
+ * \retval             negative errno if the insertion fails.
+ */
+int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt,
+                               const struct dt_rec *rec,
+                               const struct dt_key *key, struct thandle *th,
+                               int ign)
+{
+       struct thandle *sub_th;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               return PTR_ERR(sub_th);
+
+       return dt_insert(env, dt, rec, key, sub_th, ign);
+}
+
+/**
+ * Declare sub-object index delete
+ *
+ * Get transaction of next layer and declare index deletion.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object for which to delete index
+ * \param[in] key      key of the index which will be deleted
+ * \param[in] th       the transaction handle
+ *
+ * \retval             0 if the declaration succeeds.
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_delete(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                 const struct dt_key *key,
+                                 struct thandle *th)
+{
+       struct thandle *sub_th;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               return PTR_ERR(sub_th);
+
+       return dt_declare_delete(env, dt, key, sub_th);
+}
+
+/**
+ * Delete index of sub object
+ *
+ * Get transaction of next layer, record updates if it belongs to cross-MDT
+ * operation, and delete the index.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object for which to delete index
+ * \param[in] key      key of the index to be deleted
+ * \param[in] th       the transaction handle
+ *
+ * \retval             0 if the deletion succeeds.
+ * \retval             negative errno if the deletion fails.
+ */
+int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt,
+                         const struct dt_key *name, struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_delete(env, dt, name, sub_th);
+       RETURN(rc);
+}
+
+/**
+ * Declare xattr_set
+ *
+ * Get transaction of next layer, and declare xattr set.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object on which to set xattr
+ * \param[in] buf      xattr to be set
+ * \param[in] name     name of the xattr
+ * \param[in] fl       flag for setting xattr
+ *
+ * \retval             0 if the declaration succeeds.
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_xattr_set(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const struct lu_buf *buf,
+                                    const char *name, int fl,
+                                    struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Set xattr
+ *
+ * Get transaction of next layer, record updates if it belongs to cross-MDT
+ * operation, and set xattr to the object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object on which to set xattr
+ * \param[in] buf      xattr to be set
+ * \param[in] name     name of the xattr
+ * \param[in] fl       flag for setting xattr
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the xattr setting succeeds.
+ * \retval             negative errno if xattr setting fails.
+ */
+int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                            const struct lu_buf *buf, const char *name, int fl,
+                            struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_xattr_set(env, dt, buf, name, fl, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Declare attr_set
+ *
+ * Get transaction of next layer, and declare attr set.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object on which to set attr
+ * \param[in] attr     attributes to be set
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the declaration succeeds.
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_attr_set(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   const struct lu_attr *attr,
+                                   struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_declare_attr_set(env, dt, attr, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * attributes set
+ *
+ * Get transaction of next layer, record updates if it belongs to cross-MDT
+ * operation, and set attributes to the object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object on which to set attr
+ * \param[in] attr     attrbutes to be set
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if attributes setting succeeds.
+ * \retval             negative errno if the attributes setting fails.
+ */
+int lod_sub_object_attr_set(const struct lu_env *env,
+                           struct dt_object *dt,
+                           const struct lu_attr *attr,
+                           struct thandle *th)
+{
+       struct thandle     *sub_th;
+       int                 rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_attr_set(env, dt, attr, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Declare xattr_del
+ *
+ * Get transaction of next layer, and declare xattr deletion.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object on which to delete xattr
+ * \param[in] name     name of the xattr to be deleted
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the declaration succeeds.
+ * \retval             negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_xattr_del(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const char *name,
+                                    struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_declare_xattr_del(env, dt, name, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * xattribute deletion
+ *
+ * Get transaction of next layer, record update if it belongs to cross-MDT
+ * operation and delete xattr.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object on which to delete xattr
+ * \param[in] name     name of the xattr to be deleted
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the deletion succeeds.
+ * \retval             negative errno if the deletion fails.
+ */
+int lod_sub_object_xattr_del(const struct lu_env *env,
+                            struct dt_object *dt,
+                            const char *name,
+                            struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_xattr_del(env, dt, name, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Declare buffer write
+ *
+ * Get transaction of next layer and declare buffer write.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object to be written
+ * \param[in] buf      buffer to write which includes an embedded size field
+ * \param[in] pos      offet in the object to start writing at
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the insertion succeeds.
+ * \retval             negative errno if the insertion fails.
+ */
+int lod_sub_object_declare_write(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct lu_buf *buf, loff_t pos,
+                                struct thandle *th)
+{
+       struct thandle  *sub_th;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_declare_write(env, dt, buf, pos, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Write buffer to sub object
+ *
+ * Get transaction of next layer, records buffer write if it belongs to
+ * Cross-MDT operation, and write buffer.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object to be written
+ * \param[in] buf      buffer to write which includes an embedded size field
+ * \param[in] pos      offet in the object to start writing at
+ * \param[in] th       transaction handle
+ * \param[in] rq       enforcement for this write
+ *
+ * \retval             the buffer size in bytes if it succeeds.
+ * \retval             negative errno if it fails.
+ */
+ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
+                            const struct lu_buf *buf, loff_t *pos,
+                            struct thandle *th, int rq)
+{
+       struct thandle  *sub_th;
+       ssize_t         rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       rc = dt_write(env, dt, buf, pos, sub_th, rq);
+       RETURN(rc);
+}
index d581740..92b3e8f 100644 (file)
@@ -663,6 +663,7 @@ int mdd_declare_changelog_store(const struct lu_env *env,
        struct llog_ctxt                *ctxt;
        struct llog_changelog_rec       *rec;
        struct lu_buf                   *buf;
        struct llog_ctxt                *ctxt;
        struct llog_changelog_rec       *rec;
        struct lu_buf                   *buf;
+       struct thandle                  *llog_th;
        int                              reclen;
        int                              rc;
 
        int                              reclen;
        int                              rc;
 
@@ -683,7 +684,13 @@ int mdd_declare_changelog_store(const struct lu_env *env,
        if (ctxt == NULL)
                return -ENXIO;
 
        if (ctxt == NULL)
                return -ENXIO;
 
-       rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
+       llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
+       if (IS_ERR(llog_th))
+               GOTO(out_put, rc = PTR_ERR(llog_th));
+
+       rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
+
+out_put:
        llog_ctxt_put(ctxt);
 
        return rc;
        llog_ctxt_put(ctxt);
 
        return rc;
@@ -701,6 +708,7 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
 {
        struct obd_device       *obd = mdd2obd_dev(mdd);
        struct llog_ctxt        *ctxt;
 {
        struct obd_device       *obd = mdd2obd_dev(mdd);
        struct llog_ctxt        *ctxt;
+       struct thandle          *llog_th;
        int                      rc;
 
        rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
        int                      rc;
 
        rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
@@ -721,12 +729,17 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
        if (ctxt == NULL)
                return -ENXIO;
 
        if (ctxt == NULL)
                return -ENXIO;
 
+       llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
+       if (IS_ERR(llog_th))
+               GOTO(out_put, rc = PTR_ERR(llog_th));
+
        /* nested journal transaction */
        /* nested journal transaction */
-       rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, th);
+       rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
+
+out_put:
        llog_ctxt_put(ctxt);
        if (rc > 0)
                rc = 0;
        llog_ctxt_put(ctxt);
        if (rc > 0)
                rc = 0;
-
        return rc;
 }
 
        return rc;
 }
 
@@ -1111,6 +1124,7 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
        if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
            mdd_object_remote(mdd_obj) == 0) {
                struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
        if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
            mdd_object_remote(mdd_obj) == 0) {
                struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
+               struct thandle  *sub_th;
 
                /* XXX: If the linkEA is overflow, then we need to notify the
                 *      namespace LFSCK to skip "nlink" attribute verification
 
                /* XXX: If the linkEA is overflow, then we need to notify the
                 *      namespace LFSCK to skip "nlink" attribute verification
@@ -1120,8 +1134,11 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
                 *      mechanism in future. LU-5802. */
                lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
                               LFSCK_TYPE_NAMESPACE);
                 *      mechanism in future. LU-5802. */
                lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
                               LFSCK_TYPE_NAMESPACE);
+
+               sub_th = thandle_get_sub_by_dt(env, handle,
+                               mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
                lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
                lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
-                               lr, handle);
+                               lr, sub_th);
        }
 
        return rc;
        }
 
        return rc;
@@ -1152,6 +1169,7 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
 
        if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
                struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
 
        if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
                struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
+               struct thandle  *sub_th;
 
                /* XXX: If the linkEA is overflow, then we need to notify the
                 *      namespace LFSCK to skip "nlink" attribute verification
 
                /* XXX: If the linkEA is overflow, then we need to notify the
                 *      namespace LFSCK to skip "nlink" attribute verification
@@ -1161,9 +1179,12 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
                 *      mechanism in future. LU-5802. */
                lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
                               LFSCK_TYPE_NAMESPACE);
                 *      mechanism in future. LU-5802. */
                lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
                               LFSCK_TYPE_NAMESPACE);
+
+               sub_th = thandle_get_sub_by_dt(env, handle,
+                               mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
                rc = lfsck_in_notify(env,
                                     mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
                rc = lfsck_in_notify(env,
                                     mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
-                                    lr, handle);
+                                    lr, sub_th);
        }
 
        return rc;
        }
 
        return rc;
@@ -2064,13 +2085,6 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                if (rc)
                        return rc;
        }
                if (rc)
                        return rc;
        }
-
-       /* XXX: For remote create, it should indicate the remote RPC
-        * will be sent after local transaction is finished, which
-        * is not very nice, but it will be removed once we fully support
-        * async update */
-       if (mdd_object_remote(p) && handle->th_update != NULL)
-               handle->th_update->tu_sent_after_local_trans = 1;
 out:
        return rc;
 }
 out:
        return rc;
 }
index e2d7e68..6aa802e 100644 (file)
@@ -959,7 +959,7 @@ static void osd_trans_commit_cb(struct super_block *sb,
 
         lu_context_exit(&th->th_ctx);
         lu_context_fini(&th->th_ctx);
 
         lu_context_exit(&th->th_ctx);
         lu_context_fini(&th->th_ctx);
-       thandle_put(th);
+       OBD_FREE_PTR(oh);
 }
 
 static struct thandle *osd_trans_create(const struct lu_env *env,
 }
 
 static struct thandle *osd_trans_create(const struct lu_env *env,
@@ -984,8 +984,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
                th->th_result = 0;
                th->th_tags = LCT_TX_HANDLE;
                oh->ot_credits = 0;
                th->th_result = 0;
                th->th_tags = LCT_TX_HANDLE;
                oh->ot_credits = 0;
-               atomic_set(&th->th_refc, 1);
-               th->th_alloc_size = sizeof(*oh);
                INIT_LIST_HEAD(&oh->ot_dcb_list);
                osd_th_alloced(oh);
 
                INIT_LIST_HEAD(&oh->ot_dcb_list);
                osd_th_alloced(oh);
 
@@ -1169,7 +1167,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
                        CERROR("%s: failed to stop transaction: rc = %d\n",
                               osd_name(osd), rc);
        } else {
                        CERROR("%s: failed to stop transaction: rc = %d\n",
                               osd_name(osd), rc);
        } else {
-               thandle_put(&oh->ot_super);
+               OBD_FREE_PTR(oh);
        }
 
        /* inform the quota slave device that the transaction is stopping */
        }
 
        /* inform the quota slave device that the transaction is stopping */
index dfae067..1625b62 100644 (file)
@@ -166,7 +166,7 @@ static void osd_trans_commit_cb(void *cb_data, int error)
        th->th_dev = NULL;
        lu_context_exit(&th->th_ctx);
        lu_context_fini(&th->th_ctx);
        th->th_dev = NULL;
        lu_context_exit(&th->th_ctx);
        lu_context_fini(&th->th_ctx);
-       thandle_put(&oh->ot_super);
+       OBD_FREE_PTR(oh);
 
        EXIT;
 }
 
        EXIT;
 }
@@ -248,7 +248,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
                /* there won't be any commit, release reserved quota space now,
                 * if any */
                qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
                /* there won't be any commit, release reserved quota space now,
                 * if any */
                qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
-               thandle_put(&oh->ot_super);
+               OBD_FREE_PTR(oh);
                RETURN(0);
        }
 
                RETURN(0);
        }
 
@@ -308,8 +308,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        th->th_dev = dt;
        th->th_result = 0;
        th->th_tags = LCT_TX_HANDLE;
        th->th_dev = dt;
        th->th_result = 0;
        th->th_tags = LCT_TX_HANDLE;
-       atomic_set(&th->th_refc, 1);
-       th->th_alloc_size = sizeof(*oh);
        RETURN(th);
 }
 
        RETURN(th);
 }
 
index 243d3ca..f5fd23d 100644 (file)
@@ -296,11 +296,35 @@ struct osp_it {
        struct page              **ooi_pages;
 };
 
        struct page              **ooi_pages;
 };
 
+struct osp_thandle {
+       struct thandle           ot_super;
+       struct dt_update_request *ot_dur;
+       bool                     ot_send_updates_after_local_trans:1;
+
+       /* OSP will use this thandle to update last oid*/
+       struct thandle          *ot_storage_th;
+};
+
+static inline struct osp_thandle *
+thandle_to_osp_thandle(struct thandle *th)
+{
+       return container_of(th, struct osp_thandle, ot_super);
+}
+
+static inline struct dt_update_request *
+thandle_to_dt_update_request(struct thandle *th)
+{
+       struct osp_thandle *oth;
+
+       oth = thandle_to_osp_thandle(th);
+       return oth->ot_dur;
+}
+
 /* The transaction only include the updates on the remote node, and
  * no local updates at all */
 static inline bool is_only_remote_trans(struct thandle *th)
 {
 /* The transaction only include the updates on the remote node, and
  * no local updates at all */
 static inline bool is_only_remote_trans(struct thandle *th)
 {
-       return th->th_dev != NULL && th->th_dev->dd_ops == &osp_dt_ops;
+       return th->th_top == NULL;
 }
 
 static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off,
 }
 
 static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off,
@@ -534,12 +558,17 @@ struct thandle *osp_trans_create(const struct lu_env *env,
                                 struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
                                 struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
+
 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
                        const struct object_update_request *ureq,
                        struct ptlrpc_request **reqp);
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
                    struct dt_update_request *update,
                    struct ptlrpc_request **reqp, bool rpc_lock);
 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
                        const struct object_update_request *ureq,
                        struct ptlrpc_request **reqp);
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
                    struct dt_update_request *update,
                    struct ptlrpc_request **reqp, bool rpc_lock);
+
+struct thandle *osp_get_storage_thandle(const struct lu_env *env,
+                                       struct thandle *th,
+                                       struct osp_device *osp);
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_attr *attr);
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_attr *attr);
index c031457..a587b57 100644 (file)
@@ -85,13 +85,8 @@ static int __osp_md_declare_object_create(const struct lu_env *env,
        struct dt_update_request        *update;
        int                             rc;
 
        struct dt_update_request        *update;
        int                             rc;
 
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      (int)PTR_ERR(update));
-               return PTR_ERR(update);
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
        if (lu_object_exists(&dt->do_lu)) {
                /* If the object already exists, we needs to destroy
 
        if (lu_object_exists(&dt->do_lu)) {
                /* If the object already exists, we needs to destroy
@@ -245,13 +240,8 @@ static int __osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
        struct dt_update_request        *update;
        int                             rc;
 
        struct dt_update_request        *update;
        int                             rc;
 
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                     (int)PTR_ERR(update));
-               return PTR_ERR(update);
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
        rc = out_ref_del_pack(env, &update->dur_buf,
                              lu_object_fid(&dt->do_lu),
 
        rc = out_ref_del_pack(env, &update->dur_buf,
                              lu_object_fid(&dt->do_lu),
@@ -330,13 +320,8 @@ static int __osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
        struct dt_update_request        *update;
        int                             rc;
 
        struct dt_update_request        *update;
        int                             rc;
 
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      (int)PTR_ERR(update));
-               return PTR_ERR(update);
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
        rc = out_ref_add_pack(env, &update->dur_buf,
                              lu_object_fid(&dt->do_lu),
 
        rc = out_ref_add_pack(env, &update->dur_buf,
                              lu_object_fid(&dt->do_lu),
@@ -443,13 +428,8 @@ int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
        struct dt_update_request        *update;
        int                             rc;
 
        struct dt_update_request        *update;
        int                             rc;
 
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed: %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      (int)PTR_ERR(update));
-               return PTR_ERR(update);
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
        rc = out_attr_set_pack(env, &update->dur_buf,
                               lu_object_fid(&dt->do_lu), attr,
 
        rc = out_attr_set_pack(env, &update->dur_buf,
                               lu_object_fid(&dt->do_lu), attr,
@@ -722,20 +702,23 @@ static int __osp_md_index_insert(const struct lu_env *env,
                                 const struct dt_key *key,
                                 struct thandle *th)
 {
                                 const struct dt_key *key,
                                 struct thandle *th)
 {
-       struct dt_update_request *update;
+       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request *update = oth->ot_dur;
        int                      rc;
 
        int                      rc;
 
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      (int)PTR_ERR(update));
-               return PTR_ERR(update);
-       }
 
        rc = out_index_insert_pack(env, &update->dur_buf,
                                   lu_object_fid(&dt->do_lu), rec, key,
                                   update->dur_batchid);
 
        rc = out_index_insert_pack(env, &update->dur_buf,
                                   lu_object_fid(&dt->do_lu), rec, key,
                                   update->dur_batchid);
+       if (rc != 0)
+               return rc;
+
+       /* Before async update is allowed, if it will insert remote
+        * name entry, it should make sure the local object is created,
+        * i.e. the remote update RPC should be sent after local
+        * update(create object) */
+       oth->ot_send_updates_after_local_trans = true;
+
        return rc;
 }
 
        return rc;
 }
 
@@ -829,13 +812,8 @@ static int __osp_md_index_delete(const struct lu_env *env,
        struct dt_update_request *update;
        int                      rc;
 
        struct dt_update_request *update;
        int                      rc;
 
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      (int)PTR_ERR(update));
-               return PTR_ERR(update);
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
        rc = out_index_delete_pack(env, &update->dur_buf,
                                   lu_object_fid(&dt->do_lu), key,
 
        rc = out_index_delete_pack(env, &update->dur_buf,
                                   lu_object_fid(&dt->do_lu), key,
@@ -1226,13 +1204,8 @@ static ssize_t osp_md_declare_write(const struct lu_env *env,
        struct dt_update_request  *update;
        ssize_t                   rc;
 
        struct dt_update_request  *update;
        ssize_t                   rc;
 
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      (int)PTR_ERR(update));
-               return PTR_ERR(update);
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
        rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
                            buf, pos, update->dur_batchid);
 
        rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
                            buf, pos, update->dur_batchid);
index 8388f63..c984151 100644 (file)
@@ -1121,16 +1121,8 @@ static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
        ENTRY;
 
        LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
-
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed "DFID": rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      PFID(lu_object_fid(&dt->do_lu)),
-                      (int)PTR_ERR(update));
-
-               RETURN(PTR_ERR(update));
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
        rc = out_xattr_set_pack(env, &update->dur_buf,
                                lu_object_fid(&dt->do_lu),
 
        rc = out_xattr_set_pack(env, &update->dur_buf,
                                lu_object_fid(&dt->do_lu),
@@ -1266,9 +1258,8 @@ static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
        struct osp_xattr_entry   *oxe;
        int                       rc;
 
        struct osp_xattr_entry   *oxe;
        int                       rc;
 
-       update = dt_update_request_find_or_create(th, dt);
-       if (IS_ERR(update))
-               return PTR_ERR(update);
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
        fid = lu_object_fid(&dt->do_lu);
 
 
        fid = lu_object_fid(&dt->do_lu);
 
@@ -1388,6 +1379,7 @@ static int osp_declare_object_create(const struct lu_env *env,
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
        const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
+       struct thandle          *local_th;
        int                      rc = 0;
 
        ENTRY;
        int                      rc = 0;
 
        ENTRY;
@@ -1416,13 +1408,19 @@ static int osp_declare_object_create(const struct lu_env *env,
         */
        /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
 
         */
        /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
 
+       local_th = osp_get_storage_thandle(env, th, d);
+       if (IS_ERR(local_th))
+               RETURN(PTR_ERR(local_th));
+
        if (unlikely(!fid_is_zero(fid))) {
                /* replay case: caller knows fid */
                osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
                osi->osi_lb.lb_len = sizeof(osi->osi_id);
                osi->osi_lb.lb_buf = NULL;
        if (unlikely(!fid_is_zero(fid))) {
                /* replay case: caller knows fid */
                osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
                osi->osi_lb.lb_len = sizeof(osi->osi_id);
                osi->osi_lb.lb_buf = NULL;
+
                rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
                rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
-                                            &osi->osi_lb, osi->osi_off, th);
+                                            &osi->osi_lb, osi->osi_off,
+                                            local_th);
                RETURN(rc);
        }
 
                RETURN(rc);
        }
 
@@ -1447,7 +1445,8 @@ static int osp_declare_object_create(const struct lu_env *env,
                osi->osi_lb.lb_len = sizeof(osi->osi_id);
                osi->osi_lb.lb_buf = NULL;
                rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
                osi->osi_lb.lb_len = sizeof(osi->osi_id);
                osi->osi_lb.lb_buf = NULL;
                rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
-                                            &osi->osi_lb, osi->osi_off, th);
+                                            &osi->osi_lb, osi->osi_off,
+                                            local_th);
        } else {
                /* not needed in the cache anymore */
                set_bit(LU_OBJECT_HEARD_BANSHEE,
        } else {
                /* not needed in the cache anymore */
                set_bit(LU_OBJECT_HEARD_BANSHEE,
@@ -1487,6 +1486,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
        struct osp_object       *o = dt2osp_obj(dt);
        int                     rc = 0;
        struct lu_fid           *fid = &osi->osi_fid;
        struct osp_object       *o = dt2osp_obj(dt);
        int                     rc = 0;
        struct lu_fid           *fid = &osi->osi_fid;
+       struct thandle          *local_th;
        ENTRY;
 
        if (is_only_remote_trans(th) &&
        ENTRY;
 
        if (is_only_remote_trans(th) &&
@@ -1529,6 +1529,9 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
        if (osp_precreate_end_seq(env, d) && osp_is_fid_client(d))
                th->th_sync = 1;
 
        if (osp_precreate_end_seq(env, d) && osp_is_fid_client(d))
                th->th_sync = 1;
 
+       local_th = osp_get_storage_thandle(env, th, d);
+       if (IS_ERR(local_th))
+               RETURN(PTR_ERR(local_th));
        /*
         * it's OK if the import is inactive by this moment - id was created
         * by OST earlier, we just need to maintain it consistently on the disk
        /*
         * it's OK if the import is inactive by this moment - id was created
         * by OST earlier, we just need to maintain it consistently on the disk
@@ -1563,7 +1566,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
                           &d->opd_last_used_fid.f_oid, d->opd_index);
 
        rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
                           &d->opd_last_used_fid.f_oid, d->opd_index);
 
        rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
-                            &osi->osi_off, th);
+                            &osi->osi_off, local_th);
 
        CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n",
               d->opd_obd->obd_name, PFID(fid), d->opd_index, rc);
 
        CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n",
               d->opd_obd->obd_name, PFID(fid), d->opd_index, rc);
index a480daa..17d90ed 100644 (file)
@@ -243,13 +243,17 @@ int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
        struct osp_thread_info  *osi = osp_env_info(env);
        struct osp_device       *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
        struct llog_ctxt        *ctxt;
        struct osp_thread_info  *osi = osp_env_info(env);
        struct osp_device       *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
        struct llog_ctxt        *ctxt;
+       struct thandle          *storage_th;
        int                      rc;
 
        ENTRY;
 
        /* it's a layering violation, to access internals of th,
         * but we can do this as a sanity check, for a while */
        int                      rc;
 
        ENTRY;
 
        /* it's a layering violation, to access internals of th,
         * but we can do this as a sanity check, for a while */
-       LASSERT(th->th_dev == d->opd_storage);
+       LASSERT(th->th_top != NULL);
+       storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
+       if (IS_ERR(storage_th))
+               RETURN(PTR_ERR(storage_th));
 
        switch (type) {
        case MDS_UNLINK64_REC:
 
        switch (type) {
        case MDS_UNLINK64_REC:
@@ -263,12 +267,13 @@ int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
        }
 
        /* we want ->dt_trans_start() to allocate per-thandle structure */
        }
 
        /* we want ->dt_trans_start() to allocate per-thandle structure */
-       th->th_tags |= LCT_OSP_THREAD;
+       storage_th->th_tags |= LCT_OSP_THREAD;
 
        ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
        LASSERT(ctxt);
 
 
        ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
        LASSERT(ctxt);
 
-       rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr, th);
+       rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
+                             storage_th);
        llog_ctxt_put(ctxt);
 
        RETURN(rc);
        llog_ctxt_put(ctxt);
 
        RETURN(rc);
@@ -303,13 +308,17 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
        struct osp_thread_info  *osi = osp_env_info(env);
        struct llog_ctxt        *ctxt;
        struct osp_txn_info     *txn;
        struct osp_thread_info  *osi = osp_env_info(env);
        struct llog_ctxt        *ctxt;
        struct osp_txn_info     *txn;
+       struct thandle          *storage_th;
        int                      rc;
 
        ENTRY;
 
        /* it's a layering violation, to access internals of th,
         * but we can do this as a sanity check, for a while */
        int                      rc;
 
        ENTRY;
 
        /* it's a layering violation, to access internals of th,
         * but we can do this as a sanity check, for a while */
-       LASSERT(th->th_dev == d->opd_storage);
+       LASSERT(th->th_top != NULL);
+       storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
+       if (IS_ERR(storage_th))
+               RETURN(PTR_ERR(storage_th));
 
        switch (type) {
        case MDS_UNLINK64_REC:
 
        switch (type) {
        case MDS_UNLINK64_REC:
@@ -335,7 +344,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                LBUG();
        }
 
                LBUG();
        }
 
-       txn = osp_txn_info(&th->th_ctx);
+       txn = osp_txn_info(&storage_th->th_ctx);
        LASSERT(txn);
 
        txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
        LASSERT(txn);
 
        txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
@@ -346,7 +355,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                RETURN(-ENOMEM);
 
        rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
                RETURN(-ENOMEM);
 
        rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
-                     th);
+                     storage_th);
        llog_ctxt_put(ctxt);
 
        if (likely(rc >= 0)) {
        llog_ctxt_put(ctxt);
 
        if (likely(rc >= 0)) {
index d00dbe7..3daed73 100644 (file)
@@ -239,8 +239,6 @@ int osp_unplug_async_request(const struct lu_env *env,
                }
                dt_update_request_destroy(update);
        } else {
                }
                dt_update_request_destroy(update);
        } else {
-               LASSERT(list_empty(&update->dur_list));
-
                args = ptlrpc_req_async_args(req);
                args->oaua_update = update;
                args->oaua_count = NULL;
                args = ptlrpc_req_async_args(req);
                args->oaua_update = update;
                args->oaua_count = NULL;
@@ -385,37 +383,29 @@ out:
  */
 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 {
  */
 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 {
-       struct thandle          *th = NULL;
-       struct thandle_update   *tu = NULL;
-       int                      rc = 0;
+       struct osp_thandle              *oth;
+       struct thandle                  *th = NULL;
+       struct dt_update_request        *update;
+       ENTRY;
 
 
-       OBD_ALLOC_PTR(th);
-       if (unlikely(th == NULL))
-               GOTO(out, rc = -ENOMEM);
+       OBD_ALLOC_PTR(oth);
+       if (unlikely(oth == NULL))
+               RETURN(ERR_PTR(-ENOMEM));
 
 
+       th = &oth->ot_super;
        th->th_dev = d;
        th->th_tags = LCT_TX_HANDLE;
        th->th_dev = d;
        th->th_tags = LCT_TX_HANDLE;
-       atomic_set(&th->th_refc, 1);
-       th->th_alloc_size = sizeof(*th);
-
-       OBD_ALLOC_PTR(tu);
-       if (tu == NULL)
-               GOTO(out, rc = -ENOMEM);
 
 
-       INIT_LIST_HEAD(&tu->tu_remote_update_list);
-       tu->tu_only_remote_trans = 1;
-       th->th_update = tu;
-
-out:
-       if (rc != 0) {
-               if (tu != NULL)
-                       OBD_FREE_PTR(tu);
-               if (th != NULL)
-                       OBD_FREE_PTR(th);
-               th = ERR_PTR(rc);
+       update = dt_update_request_create(d);
+       if (IS_ERR(update)) {
+               OBD_FREE_PTR(oth);
+               RETURN(ERR_CAST(update));
        }
 
        }
 
-       return th;
+       oth->ot_dur = update;
+       oth->ot_send_updates_after_local_trans = false;
+
+       RETURN(th);
 }
 
 /**
 }
 
 /**
@@ -543,45 +533,31 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                             struct dt_update_request *dt_update,
                             struct thandle *th, bool flow_control)
 {
                             struct dt_update_request *dt_update,
                             struct thandle *th, bool flow_control)
 {
-       struct thandle_update   *tu = th->th_update;
-       int                      rc = 0;
-
-       LASSERT(tu != NULL);
+       int     rc = 0;
 
 
-       if (is_only_remote_trans(th)) {
+       if (is_only_remote_trans(th) && !th->th_sync) {
                struct osp_async_update_args    *args;
                struct ptlrpc_request           *req;
 
                struct osp_async_update_args    *args;
                struct ptlrpc_request           *req;
 
-               list_del_init(&dt_update->dur_list);
-               if (th->th_sync) {
-                       rc = osp_remote_sync(env, osp, dt_update, NULL, true);
-                       dt_update_request_destroy(dt_update);
-
-                       return rc;
-               }
-
                rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                         dt_update->dur_buf.ub_req, &req);
                rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                         dt_update->dur_buf.ub_req, &req);
-               if (rc == 0) {
-                       down_read(&osp->opd_async_updates_rwsem);
+               if (rc != 0)
+                       return rc;
+               down_read(&osp->opd_async_updates_rwsem);
 
 
-                       args = ptlrpc_req_async_args(req);
-                       args->oaua_update = dt_update;
-                       args->oaua_count = &osp->opd_async_updates_count;
-                       args->oaua_waitq = &osp->opd_syn_barrier_waitq;
-                       args->oaua_flow_control = flow_control;
-                       req->rq_interpret_reply =
-                               osp_async_update_interpret;
+               args = ptlrpc_req_async_args(req);
+               args->oaua_update = dt_update;
+               args->oaua_count = &osp->opd_async_updates_count;
+               args->oaua_waitq = &osp->opd_syn_barrier_waitq;
+               args->oaua_flow_control = flow_control;
+               req->rq_interpret_reply =
+                       osp_async_update_interpret;
 
 
-                       atomic_inc(args->oaua_count);
-                       up_read(&osp->opd_async_updates_rwsem);
+               atomic_inc(args->oaua_count);
+               up_read(&osp->opd_async_updates_rwsem);
 
 
-                       ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
-               } else {
-                       dt_update_request_destroy(dt_update);
-               }
+               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
        } else {
        } else {
-               th->th_sync = 1;
                rc = osp_remote_sync(env, osp, dt_update, NULL, true);
        }
 
                rc = osp_remote_sync(env, osp, dt_update, NULL, true);
        }
 
@@ -589,6 +565,55 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 }
 
 /**
 }
 
 /**
+ * Get local thandle for osp_thandle
+ *
+ * Get the local OSD thandle from the OSP thandle. Currently, there
+ * are a few OSP API (osp_object_create() and osp_sync_add()) needs
+ * to update the object on local OSD device.
+ *
+ * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
+ * we will get local thandle by thandle_get_sub_by_dt.
+ *
+ * If the osp_thandle is remote thandle (th_top == NULL, only used
+ * by LFSCK), then it will create a local thandle, and stop it in
+ * osp_trans_stop(). And this only happens on OSP for OST.
+ *
+ * These are temporary solution, once OSP accessing OSD object is
+ * being fixed properly, this function should be removed. XXX
+ *
+ * \param[in] env              pointer to the thread context
+ * \param[in] th               pointer to the transaction handler
+ * \param[in] dt               pointer to the OSP device
+ *
+ * \retval                     pointer to the local thandle
+ * \retval                     ERR_PTR(errno) if it fails.
+ **/
+struct thandle *osp_get_storage_thandle(const struct lu_env *env,
+                                       struct thandle *th,
+                                       struct osp_device *osp)
+{
+       struct osp_thandle      *oth;
+       struct thandle          *local_th;
+
+       if (th->th_top != NULL)
+               return thandle_get_sub_by_dt(env, th->th_top,
+                                            osp->opd_storage);
+
+       LASSERT(!osp->opd_connect_mdt);
+       oth = thandle_to_osp_thandle(th);
+       if (oth->ot_storage_th != NULL)
+               return oth->ot_storage_th;
+
+       local_th = dt_trans_create(env, osp->opd_storage);
+       if (IS_ERR(local_th))
+               return local_th;
+
+       oth->ot_storage_th = local_th;
+
+       return local_th;
+}
+
+/**
  * The OSP layer dt_device_operations::dt_trans_start() interface
  * to start the transaction.
  *
  * The OSP layer dt_device_operations::dt_trans_start() interface
  * to start the transaction.
  *
@@ -618,22 +643,42 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th)
 {
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th)
 {
-       struct thandle_update           *tu = th->th_update;
-       struct dt_update_request        *dt_update;
-       int                              rc = 0;
-
-       if (tu == NULL)
-               return rc;
+       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request *dt_update;
+       int                      rc = 0;
 
 
-       /* Check whether there are updates related with this OSP */
-       dt_update = out_find_update(tu, dt);
-       if (dt_update == NULL)
-               return rc;
+       dt_update = oth->ot_dur;
+       LASSERT(dt_update != NULL);
 
 
-       if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
+       /* return if there are no updates,  */
+       if (dt_update->dur_buf.ub_req == NULL ||
+           dt_update->dur_buf.ub_req->ourq_count == 0)
+               GOTO(out, rc = 0);
+
+       /* Note: some updates needs to send before local transaction,
+        * some needs to send after local transaction.
+        *
+        * If the transaction only includes remote updates, it will
+        * send updates to remote MDT in osp_trans_stop.
+        *
+        * If it is remote create, it will send the remote req after
+        * local transaction. i.e. create the object locally first,
+        * then insert the name entry.
+        *
+        * If it is remote unlink, it will send the remote req before
+        * the local transaction, i.e. delete the name entry remote
+        * first, then destroy the local object. */
+       if (!is_only_remote_trans(th) &&
+           !oth->ot_send_updates_after_local_trans)
                rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
                                       false);
 
                rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
                                       false);
 
+out:
+       /* For remote thandle, if there are local thandle, start it here*/
+       if (th->th_top == NULL && oth->ot_storage_th != NULL)
+               rc = dt_trans_start(env, oth->ot_storage_th->th_dev,
+                                   oth->ot_storage_th);
+
        return rc;
 }
 
        return rc;
 }
 
@@ -660,66 +705,65 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th)
 {
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th)
 {
-       struct thandle_update           *tu = th->th_update;
-       struct dt_update_request        *dt_update;
-       int                              rc = 0;
-       ENTRY;
 
 
-       LASSERT(tu != NULL);
-       LASSERT(tu != LP_POISON);
+       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request *dt_update;
+       int                      rc = 0;
+       bool                     keep_dt_update = false;
+       ENTRY;
 
 
-       /* Check whether there are updates related with this OSP */
-       dt_update = out_find_update(tu, dt);
-       if (dt_update == NULL) {
-               if (!is_only_remote_trans(th))
-                       RETURN(rc);
+       dt_update = oth->ot_dur;
+       LASSERT(dt_update != NULL);
+       LASSERT(dt_update != LP_POISON);
 
 
-               GOTO(put, rc);
+       /* For remote transaction, if there is local storage thandle,
+        * stop it first */
+       if (oth->ot_storage_th != NULL && th->th_top == NULL) {
+               dt_trans_stop(env, oth->ot_storage_th->th_dev,
+                             oth->ot_storage_th);
+               oth->ot_storage_th = NULL;
        }
        }
-
+       /* If there are no updates, destroy dt_update and thandle */
        if (dt_update->dur_buf.ub_req == NULL ||
        if (dt_update->dur_buf.ub_req == NULL ||
-           dt_update->dur_buf.ub_req->ourq_count == 0) {
-               dt_update_request_destroy(dt_update);
-               GOTO(put, rc);
-       }
-
-       if (is_only_remote_trans(th)) {
-               if (th->th_result == 0) {
-                       struct osp_device *osp = dt2osp_dev(th->th_dev);
-                       struct client_obd *cli = &osp->opd_obd->u.cli;
+           dt_update->dur_buf.ub_req->ourq_count == 0)
+               GOTO(out, rc);
 
 
-                       rc = obd_get_request_slot(cli);
-                       if (!osp->opd_imp_active || !osp->opd_imp_connected) {
-                               if (rc == 0)
-                                       obd_put_request_slot(cli);
+       if (is_only_remote_trans(th) && !th->th_sync) {
+               struct osp_device *osp = dt2osp_dev(th->th_dev);
+               struct client_obd *cli = &osp->opd_obd->u.cli;
 
 
-                               rc = -ENOTCONN;
-                       }
-
-                       if (rc != 0) {
-                               dt_update_request_destroy(dt_update);
-                               GOTO(put, rc);
-                       }
+               if (th->th_result != 0) {
+                       rc = th->th_result;
+                       GOTO(out, rc);
+               }
 
 
-                       rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                              dt_update, th, true);
-                       if (rc != 0)
+               rc = obd_get_request_slot(cli);
+               if (!osp->opd_imp_active || !osp->opd_imp_connected) {
+                       if (rc == 0)
                                obd_put_request_slot(cli);
                                obd_put_request_slot(cli);
-               } else {
-                       rc = th->th_result;
-                       dt_update_request_destroy(dt_update);
+                       rc = -ENOTCONN;
                }
                }
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = osp_trans_trigger(env, dt2osp_dev(dt),
+                                      dt_update, th, true);
+               if (rc != 0)
+                       obd_put_request_slot(cli);
+               else
+                       keep_dt_update = true;
        } else {
        } else {
-               if (tu->tu_sent_after_local_trans)
-                       rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                              dt_update, th, false);
+               if (oth->ot_send_updates_after_local_trans ||
+                  (is_only_remote_trans(th) && th->th_sync))
+                       rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
+                                              th, false);
                rc = dt_update->dur_rc;
                rc = dt_update->dur_rc;
-               dt_update_request_destroy(dt_update);
        }
 
        }
 
-       GOTO(put, rc);
+out:
+       if (!keep_dt_update)
+               dt_update_request_destroy(dt_update);
+       OBD_FREE_PTR(oth);
 
 
-put:
-       thandle_put(th);
-       return rc;
+       RETURN(rc);
 }
 }
index 2a54e95..7a32f73 100644 (file)
@@ -20,6 +20,7 @@ ptlrpc_objs += nrs_tbf.o errno.o
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
 target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
 target_objs += $(TARGET)out_lib.o
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
 target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
 target_objs += $(TARGET)out_lib.o
+target_objs += $(TARGET)update_trans.o
 
 nodemap_objs = nodemap_handler.o nodemap_lproc.o nodemap_range.o
 nodemap_objs += nodemap_idmap.o nodemap_rbtree.o nodemap_member.o
 
 nodemap_objs = nodemap_handler.o nodemap_lproc.o nodemap_range.o
 nodemap_objs += nodemap_idmap.o nodemap_rbtree.o nodemap_member.o
index eec818c..877235e 100644 (file)
@@ -61,7 +61,6 @@
 /* obd2cli_tgt() (required by DEBUG_REQ()) */
 #include <obd.h>
 #include <lustre_debug.h>
 /* obd2cli_tgt() (required by DEBUG_REQ()) */
 #include <obd.h>
 #include <lustre_debug.h>
-#include <lustre_update.h>
 #endif /* !__REQ_LAYOUT_USER__ */
 
 /* struct ptlrpc_request, lustre_msg* */
 #endif /* !__REQ_LAYOUT_USER__ */
 
 /* struct ptlrpc_request, lustre_msg* */
index b11c11d..eaf3957 100644 (file)
@@ -33,3 +33,4 @@
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
 EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \
             out_handler.c out_lib.c
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
 EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \
             out_handler.c out_lib.c
+EXTRA_DIST += update_trans.c
index dce9da9..192a2b8 100644 (file)
 
 #define OUT_UPDATE_BUFFER_SIZE_ADD     4096
 #define OUT_UPDATE_BUFFER_SIZE_MAX     (256 * 4096)  /* 1MB update size now */
 
 #define OUT_UPDATE_BUFFER_SIZE_ADD     4096
 #define OUT_UPDATE_BUFFER_SIZE_MAX     (256 * 4096)  /* 1MB update size now */
-
-struct dt_update_request*
-out_find_update(struct thandle_update *tu, struct dt_device *dt_dev)
-{
-       struct dt_update_request   *dt_update;
-
-       list_for_each_entry(dt_update, &tu->tu_remote_update_list,
-                           dur_list) {
-               if (dt_update->dur_dt == dt_dev)
-                       return dt_update;
-       }
-       return NULL;
-}
-EXPORT_SYMBOL(out_find_update);
-
-static struct object_update_request *object_update_request_alloc(size_t size)
+static inline struct object_update_request *
+object_update_request_alloc(size_t size)
 {
        struct object_update_request *ourq;
 
 {
        struct object_update_request *ourq;
 
@@ -67,31 +53,19 @@ static struct object_update_request *object_update_request_alloc(size_t size)
        RETURN(ourq);
 }
 
        RETURN(ourq);
 }
 
-static void object_update_request_free(struct object_update_request *ourq,
-                                      size_t ourq_size)
+static inline void
+object_update_request_free(struct object_update_request *ourq,
+                          size_t ourq_size)
 {
        if (ourq != NULL)
                OBD_FREE_LARGE(ourq, ourq_size);
 }
 
 {
        if (ourq != NULL)
                OBD_FREE_LARGE(ourq, ourq_size);
 }
 
-void dt_update_request_destroy(struct dt_update_request *dt_update)
-{
-       if (dt_update == NULL)
-               return;
-
-       list_del(&dt_update->dur_list);
-
-       object_update_request_free(dt_update->dur_buf.ub_req,
-                                  dt_update->dur_buf.ub_req_size);
-       OBD_FREE_PTR(dt_update);
-}
-EXPORT_SYMBOL(dt_update_request_destroy);
-
 /**
  * Allocate and initialize dt_update_request
  *
  * dt_update_request is being used to track updates being executed on
 /**
  * Allocate and initialize dt_update_request
  *
  * dt_update_request is being used to track updates being executed on
- * this dt_device(OSD or OSP). The update buffer will be 8k initially,
+ * this dt_device(OSD or OSP). The update buffer will be 4k initially,
  * and increased if needed.
  *
  * \param [in] dt      dt device
  * and increased if needed.
  *
  * \param [in] dt      dt device
@@ -117,7 +91,6 @@ struct dt_update_request *dt_update_request_create(struct dt_device *dt)
        dt_update->dur_buf.ub_req = ourq;
        dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
 
        dt_update->dur_buf.ub_req = ourq;
        dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
 
-       INIT_LIST_HEAD(&dt_update->dur_list);
        dt_update->dur_dt = dt;
        dt_update->dur_batchid = 0;
        INIT_LIST_HEAD(&dt_update->dur_cb_items);
        dt_update->dur_dt = dt;
        dt_update->dur_batchid = 0;
        INIT_LIST_HEAD(&dt_update->dur_cb_items);
@@ -127,53 +100,22 @@ struct dt_update_request *dt_update_request_create(struct dt_device *dt)
 EXPORT_SYMBOL(dt_update_request_create);
 
 /**
 EXPORT_SYMBOL(dt_update_request_create);
 
 /**
- * Find or create dt_update_request.
+ * Destroy dt_update_request
  *
  *
- * Find or create one loc in th_dev/dev_obj_update for the update,
- * Because only one thread can access this thandle, no need
- * lock now.
- *
- * \param[in] th       transaction handle
- * \param[in] dt       lookup update request by dt_object
- *
- * \retval             pointer of dt_update_request if it can be created
- *                      or found.
- * \retval             ERR_PTR(errno) if it can not be created or found.
+ * \param [in] dt_update       dt_update_request being destroyed
  */
  */
-struct dt_update_request *
-dt_update_request_find_or_create(struct thandle *th, struct dt_object *dt)
+void dt_update_request_destroy(struct dt_update_request *dt_update)
 {
 {
-       struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
-       struct thandle_update   *tu = th->th_update;
-       struct dt_update_request *update;
-       ENTRY;
-
-       if (tu == NULL) {
-               OBD_ALLOC_PTR(tu);
-               if (tu == NULL)
-                       RETURN(ERR_PTR(-ENOMEM));
-
-               INIT_LIST_HEAD(&tu->tu_remote_update_list);
-               tu->tu_sent_after_local_trans = 0;
-               th->th_update = tu;
-       }
-
-       update = out_find_update(tu, dt_dev);
-       if (update != NULL)
-               RETURN(update);
-
-       update = dt_update_request_create(dt_dev);
-       if (IS_ERR(update))
-               RETURN(update);
-
-       list_add_tail(&update->dur_list, &tu->tu_remote_update_list);
+       if (dt_update == NULL)
+               return;
 
 
-       if (!tu->tu_only_remote_trans)
-               thandle_get(th);
+       object_update_request_free(dt_update->dur_buf.ub_req,
+                                  dt_update->dur_buf.ub_req_size);
+       OBD_FREE_PTR(dt_update);
 
 
-       RETURN(update);
+       return;
 }
 }
-EXPORT_SYMBOL(dt_update_request_find_or_create);
+EXPORT_SYMBOL(dt_update_request_destroy);
 
 /**
  * resize update buffer
 
 /**
  * resize update buffer
@@ -223,9 +165,9 @@ static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size)
  * \param[in] batchid  batchid(transaction no) of this update
  *
  * \retval             0 pack update succeed.
  * \param[in] batchid  batchid(transaction no) of this update
  *
  * \retval             0 pack update succeed.
- *                      negative errno pack update failed.
+ * \retval              negative errno pack update failed.
  **/
  **/
-static struct object_update*
+static struct object_update *
 out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
                       enum update_type op, const struct lu_fid *fid,
                       int params_count, __u16 *param_sizes, __u64 batchid)
 out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
                       enum update_type op, const struct lu_fid *fid,
                       int params_count, __u16 *param_sizes, __u64 batchid)
diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c
new file mode 100644 (file)
index 0000000..b57fbe8
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2014, Intel Corporation.
+ */
+/*
+ * lustre/target/update_trans.c
+ *
+ * This file implements the update distribute transaction API.
+ *
+ * To manage the cross-MDT operation (distribute operation) transaction,
+ * the transaction will also be separated two layers on MD stack, top
+ * transaction and sub transaction.
+ *
+ * During the distribute operation, top transaction is created in the LOD
+ * layer, and represent the operation. Sub transaction is created by
+ * each OSD or OSP. Top transaction start/stop will trigger all of its sub
+ * transaction start/stop. Top transaction (the whole operation) is committed
+ * only all of its sub transaction are committed.
+ *
+ * there are three kinds of transactions
+ * 1. local transaction: All updates are in a single local OSD.
+ * 2. Remote transaction: All Updates are only in the remote OSD,
+ *    i.e. locally all updates are in OSP.
+ * 3. Mixed transaction: Updates are both in local OSD and remote
+ *    OSD.
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <lu_target.h>
+#include <lustre_update.h>
+#include <obd.h>
+#include <obd_class.h>
+
+/**
+ * Create the top transaction.
+ *
+ * Create the top transaction on the master device. It will create a top
+ * thandle and a sub thandle on the master device.
+ *
+ * \param[in] env              execution environment
+ * \param[in] master_dev       master_dev the top thandle will be created
+ *
+ * \retval                     pointer to the created thandle.
+ * \retval                     ERR_PTR(errno) if creation failed.
+ */
+struct thandle *
+top_trans_create(const struct lu_env *env, struct dt_device *master_dev)
+{
+       struct top_thandle      *top_th;
+       struct thandle          *child_th;
+
+       OBD_ALLOC_GFP(top_th, sizeof(*top_th), __GFP_IO);
+       if (top_th == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       child_th = dt_trans_create(env, master_dev);
+       if (IS_ERR(child_th)) {
+               OBD_FREE_PTR(top_th);
+               return child_th;
+       }
+
+       top_th->tt_magic = TOP_THANDLE_MAGIC;
+       top_th->tt_master_sub_thandle = child_th;
+       child_th->th_top = &top_th->tt_super;
+       INIT_LIST_HEAD(&top_th->tt_sub_thandle_list);
+       top_th->tt_super.th_top = &top_th->tt_super;
+
+       return &top_th->tt_super;
+}
+EXPORT_SYMBOL(top_trans_create);
+
+/**
+ * start the top transaction.
+ *
+ * Start all of its sub transactions, then start master sub transaction.
+ *
+ * \param[in] env              execution environment
+ * \param[in] master_dev       master_dev the top thandle will be start
+ * \param[in] th               top thandle
+ *
+ * \retval                     0 if transaction start succeeds.
+ * \retval                     negative errno if start fails.
+ */
+int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
+                   struct thandle *th)
+{
+       struct top_thandle      *top_th = container_of(th, struct top_thandle,
+                                                      tt_super);
+       struct sub_thandle      *lst;
+       int                     rc = 0;
+
+       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
+       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
+               lst->st_sub_th->th_sync = th->th_sync;
+               lst->st_sub_th->th_local = th->th_local;
+               rc = dt_trans_start(env, lst->st_sub_th->th_dev,
+                                   lst->st_sub_th);
+               if (rc != 0)
+                       return rc;
+       }
+
+       top_th->tt_master_sub_thandle->th_local = th->th_local;
+       top_th->tt_master_sub_thandle->th_sync = th->th_sync;
+
+       return dt_trans_start(env, master_dev, top_th->tt_master_sub_thandle);
+}
+EXPORT_SYMBOL(top_trans_start);
+
+/**
+ * Stop the top transaction.
+ *
+ * Stop the transaction on the master device first, then stop transactions
+ * on other sub devices.
+ *
+ * \param[in] env              execution environment
+ * \param[in] master_dev       master_dev the top thandle will be created
+ * \param[in] th               top thandle
+ *
+ * \retval                     0 if stop transaction succeeds.
+ * \retval                     negative errno if stop transaction fails.
+ */
+int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
+                  struct thandle *th)
+{
+       struct sub_thandle      *lst;
+       struct top_thandle      *top_th = container_of(th, struct top_thandle,
+                                                      tt_super);
+       int                     rc;
+       ENTRY;
+
+       /* Note: we always need walk through all of sub_transaction to do
+        * transaction stop to release the resource here */
+       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
+
+       top_th->tt_master_sub_thandle->th_local = th->th_local;
+       top_th->tt_master_sub_thandle->th_sync = th->th_sync;
+
+       /* To avoid sending RPC while holding thandle, it always stop local
+        * transaction first, then other sub thandle */
+       rc = dt_trans_stop(env, master_dev, top_th->tt_master_sub_thandle);
+
+       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
+               int     rc2;
+
+               if (rc != 0)
+                       lst->st_sub_th->th_result = rc;
+               lst->st_sub_th->th_sync = th->th_sync;
+               lst->st_sub_th->th_local = th->th_local;
+               rc2 = dt_trans_stop(env, lst->st_sub_th->th_dev,
+                                   lst->st_sub_th);
+               if (unlikely(rc2 < 0 && rc == 0))
+                       rc = rc2;
+       }
+
+       top_thandle_destroy(top_th);
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(top_trans_stop);
+
+/**
+ * Get sub thandle.
+ *
+ * Get sub thandle from the top thandle according to the sub dt_device.
+ *
+ * \param[in] env      execution environment
+ * \param[in] th       thandle on the top layer.
+ * \param[in] sub_dt   sub dt_device used to get sub transaction
+ *
+ * \retval             thandle of sub transaction if succeed
+ * \retval             PTR_ERR(errno) if failed
+ */
+struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
+                                     struct thandle *th,
+                                     struct dt_device *sub_dt)
+{
+       struct sub_thandle      *lst;
+       struct top_thandle      *top_th = container_of(th, struct top_thandle,
+                                                      tt_super);
+       struct thandle          *sub_th;
+       ENTRY;
+
+       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
+       LASSERT(top_th->tt_master_sub_thandle != NULL);
+       if (likely(sub_dt == top_th->tt_master_sub_thandle->th_dev))
+               RETURN(top_th->tt_master_sub_thandle);
+
+       /* Find or create the transaction in tt_trans_list, since there is
+        * always only one thread access the list, so no need lock here */
+       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
+               if (lst->st_sub_th->th_dev == sub_dt)
+                       RETURN(lst->st_sub_th);
+       }
+
+       sub_th = dt_trans_create(env, sub_dt);
+       if (IS_ERR(sub_th))
+               RETURN(sub_th);
+
+       /* XXX all of mixed transaction (see struct th_handle) will
+        * be synchronized until async update is done */
+       th->th_sync = 1;
+
+       sub_th->th_top = th;
+       OBD_ALLOC_PTR(lst);
+       if (lst == NULL) {
+               dt_trans_stop(env, sub_dt, sub_th);
+               RETURN(ERR_PTR(-ENOMEM));
+       }
+
+       INIT_LIST_HEAD(&lst->st_sub_list);
+       lst->st_sub_th = sub_th;
+       list_add(&lst->st_sub_list, &top_th->tt_sub_thandle_list);
+
+       RETURN(sub_th);
+}
+EXPORT_SYMBOL(thandle_get_sub_by_dt);
+
+/**
+ * Top thandle destroy
+ *
+ * Destroy the top thandle and all of its sub thandle.
+ *
+ * \param[in] top_th   top thandle to be destroyed.
+ */
+void top_thandle_destroy(struct top_thandle *top_th)
+{
+       struct sub_thandle *st;
+       struct sub_thandle *tmp;
+
+       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
+       list_for_each_entry_safe(st, tmp, &top_th->tt_sub_thandle_list,
+                                st_sub_list) {
+               list_del(&st->st_sub_list);
+               OBD_FREE_PTR(st);
+       }
+       OBD_FREE_PTR(top_th);
+}
+EXPORT_SYMBOL(top_thandle_destroy);
index ee32957..5fd1573 100755 (executable)
@@ -1193,6 +1193,12 @@ mount_facet() {
     else
         set_default_debug_facet $facet
 
     else
         set_default_debug_facet $facet
 
+       if [[ $facet == mds* ]]; then
+               do_facet $facet \
+                       lctl set_param -n mdt.${FSNAME}*.enable_remote_dir=1 \
+                               2>/dev/null
+       fi
+
                label=$(devicelabel ${facet} ${!dev})
         [ -z "$label" ] && echo no label for ${!dev} && exit 1
         eval export ${facet}_svc=${label}
                label=$(devicelabel ${facet} ${!dev})
         [ -z "$label" ] && echo no label for ${!dev} && exit 1
         eval export ${facet}_svc=${label}