Whamcloud - gitweb
LU-5099 api: transfer object type via dt_insert API 47/10447/22
authorFan Yong <fan.yong@intel.com>
Fri, 30 May 2014 04:06:29 +0000 (12:06 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 9 Jul 2014 14:41:32 +0000 (14:41 +0000)
When the LFSCK finds a dangling name entry, it may be required to
re-create the lost remote MDT-object. The LFSCK needs to know the
file type for the re-creating.

Generally, the LFSCK can get the file type from the name entry.
Unfortunately, when we insert the name entry into the directory
via dt_insert(), the sponsor does not transfer the object type.
It is not a problem for local case, because the OSD can get the
file type internally via checking the MDT-object directly. But
for remote case, when the OSD handles insert locally for remote
object, it cannot get the remote object type. On the other hand,
in DNE I, only directory can be remote, the OSD can assume that
the file type is S_IFDIR. But such assumption will be wrong when
enable remote object for normal file and will misguide the LFSCK
to re-create worng MDT-object.

This patch enhances the dt_insert() and dt_declare_insert() API
parameter to allow the insert operation sponsor to transfer the
file type directly.

The patch also changes the wire protocol for OUT_INDEX_INSERT
to transfer file type from one MDT to another.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I26a3b183f8c47a5e834a9708ae49c96d282b0067
Reviewed-on: http://review.whamcloud.com/10447
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
17 files changed:
lustre/include/dt_object.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lod/lod_internal.h
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_orphans.c
lustre/obdclass/llog_internal.h
lustre/obdclass/llog_osd.c
lustre/obdclass/local_storage.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_index.c
lustre/osp/osp_md_object.c
lustre/target/out_handler.c
lustre/target/tgt_internal.h

index 0f02ba2..a39c461 100644 (file)
@@ -1547,6 +1547,20 @@ struct dt_find_hint {
        struct dt_object     *dfh_o;
 };
 
        struct dt_object     *dfh_o;
 };
 
+struct dt_insert_rec {
+       union {
+               const struct lu_fid     *rec_fid;
+               void                    *rec_data;
+       };
+       union {
+               struct {
+                       __u32            rec_type;
+                       __u32            rec_padding;
+               };
+               __u64                    rec_misc;
+       };
+};
+
 struct dt_thread_info {
        char                     dti_buf[DT_MAX_PATH];
        struct dt_find_hint      dti_dfh;
 struct dt_thread_info {
        char                     dti_buf[DT_MAX_PATH];
        struct dt_find_hint      dti_dfh;
@@ -1557,6 +1571,7 @@ struct dt_thread_info {
        struct lu_buf            dti_lb;
        struct lu_object_conf    dti_conf;
        loff_t                   dti_off;
        struct lu_buf            dti_lb;
        struct lu_object_conf    dti_conf;
        loff_t                   dti_off;
+       struct dt_insert_rec     dti_dt_rec;
 };
 
 extern struct lu_context_key dt_key;
 };
 
 extern struct lu_context_key dt_key;
index e3b6ea3..36259fa 100644 (file)
@@ -574,6 +574,7 @@ struct lfsck_thread_info {
        struct dt_allocation_hint lti_hint;
        struct lu_orphan_rec    lti_rec;
        struct lov_user_md      lti_lum;
        struct dt_allocation_hint lti_hint;
        struct lu_orphan_rec    lti_rec;
        struct lov_user_md      lti_lum;
+       struct dt_insert_rec    lti_dt_rec;
 };
 
 /* lfsck_lib.c */
 };
 
 /* lfsck_lib.c */
index f59dca7..cceebbd 100644 (file)
@@ -1979,6 +1979,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                                        __u32 ea_off)
 {
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
                                        __u32 ea_off)
 {
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
        char                            *name   = info->lti_key;
        struct lu_attr                  *la     = &info->lti_la;
        struct dt_object_format         *dof    = &info->lti_dof;
        char                            *name   = info->lti_key;
        struct lu_attr                  *la     = &info->lti_la;
        struct dt_object_format         *dof    = &info->lti_dof;
@@ -2108,8 +2109,10 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
                GOTO(stop, rc);
 
        /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
+       dtrec->rec_fid = pfid;
+       dtrec->rec_type = S_IFREG;
        rc = dt_declare_insert(env, lfsck->li_lpf_obj,
        rc = dt_declare_insert(env, lfsck->li_lpf_obj,
-                              (const struct dt_rec *)pfid,
+                              (const struct dt_rec *)dtrec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
@@ -2147,8 +2150,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
                GOTO(stop, rc);
 
        /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
-       rc = dt_insert(env, lfsck->li_lpf_obj,
-                      (const struct dt_rec *)pfid,
+       rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);
index 247a608..b669e42 100644 (file)
@@ -364,6 +364,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
                                  struct dt_object_format *dof,
                                  const char *name)
 {
                                  struct dt_object_format *dof,
                                  const char *name)
 {
+       struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
        struct dt_device        *dev    = lfsck->li_bottom;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        struct dt_device        *dev    = lfsck->li_bottom;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
@@ -410,7 +411,9 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 4a. insert name into parent dir */
                GOTO(stop, rc);
 
        /* 4a. insert name into parent dir */
-       rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
+       rec->rec_type = S_IFDIR;
+       rec->rec_fid = cfid;
+       rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
@@ -440,13 +443,15 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
                GOTO(unlock, rc = -ENOTDIR);
 
        /* 1b.2. insert dot into child dir */
                GOTO(unlock, rc = -ENOTDIR);
 
        /* 1b.2. insert dot into child dir */
-       rc = dt_insert(env, child, (const struct dt_rec *)cfid,
+       rec->rec_fid = cfid;
+       rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
        /* 1b.3. insert dotdot into child dir */
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
        /* 1b.3. insert dotdot into child dir */
-       rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
+       rec->rec_fid = &LU_LPF_FID;
+       rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
@@ -464,7 +469,8 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 4b. insert name into parent dir */
                GOTO(stop, rc);
 
        /* 4b. insert name into parent dir */
-       rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
+       rec->rec_fid = cfid;
+       rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);
@@ -502,6 +508,7 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
                                   struct dt_object_format *dof,
                                   const char *name)
 {
                                   struct dt_object_format *dof,
                                   const char *name)
 {
+       struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
@@ -590,13 +597,16 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
                GOTO(unlock, rc = -ENOTDIR);
 
        /* 1b.2. insert dot into child dir */
                GOTO(unlock, rc = -ENOTDIR);
 
        /* 1b.2. insert dot into child dir */
-       rc = dt_insert(env, child, (const struct dt_rec *)cfid,
+       rec->rec_type = S_IFDIR;
+       rec->rec_fid = cfid;
+       rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
        /* 1b.3. insert dotdot into child dir */
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
        /* 1b.3. insert dotdot into child dir */
-       rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
+       rec->rec_fid = &LU_LPF_FID;
+       rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
@@ -632,7 +642,8 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
                RETURN(PTR_ERR(th));
 
        /* 5a. insert name into parent dir */
                RETURN(PTR_ERR(th));
 
        /* 5a. insert name into parent dir */
-       rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
+       rec->rec_fid = cfid;
+       rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
@@ -647,7 +658,7 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 5b. insert name into parent dir */
                GOTO(stop, rc);
 
        /* 5b. insert name into parent dir */
-       rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
+       rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);
index 7d9b95f..22d248c 100644 (file)
@@ -284,6 +284,7 @@ struct lod_thread_info {
        struct dt_object_format lti_format;
        struct lu_name    lti_name;
        struct lu_buf     lti_linkea_buf;
        struct dt_object_format lti_format;
        struct lu_name    lti_name;
        struct lu_buf     lti_linkea_buf;
+       struct dt_insert_rec lti_dt_rec;
 };
 
 extern const struct lu_device_operations lod_lu_ops;
 };
 
 extern const struct lu_device_operations lod_lu_ops;
index fb43a8d..1892e3b 100644 (file)
@@ -1456,6 +1456,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
        struct lu_buf           slave_lmv_buf;
        struct lmv_mds_md_v1    *lmm;
        struct lmv_mds_md_v1    *slave_lmm = NULL;
        struct lu_buf           slave_lmv_buf;
        struct lmv_mds_md_v1    *lmm;
        struct lmv_mds_md_v1    *slave_lmm = NULL;
+       struct dt_insert_rec    *rec = &info->lti_dt_rec;
        int                     stripe_count;
        int                     *idx_array;
        int                     rc = 0;
        int                     stripe_count;
        int                     *idx_array;
        int                     rc = 0;
@@ -1600,6 +1601,7 @@ next:
        if (!dt_try_as_dir(env, dt_object_child(dt)))
                GOTO(out_put, rc = -EINVAL);
 
        if (!dt_try_as_dir(env, dt_object_child(dt)))
                GOTO(out_put, rc = -EINVAL);
 
+       rec->rec_type = S_IFDIR;
        for (i = 0; i < lo->ldo_stripenr; i++) {
                struct dt_object        *dto            = stripe[i];
                char                    *stripe_name    = info->lti_key;
        for (i = 0; i < lo->ldo_stripenr; i++) {
                struct dt_object        *dto            = stripe[i];
                char                    *stripe_name    = info->lti_key;
@@ -1614,16 +1616,16 @@ next:
                if (!dt_try_as_dir(env, dto))
                        GOTO(out_put, rc = -EINVAL);
 
                if (!dt_try_as_dir(env, dto))
                        GOTO(out_put, rc = -EINVAL);
 
-               rc = dt_declare_insert(env, dto,
-                    (const struct dt_rec *)lu_object_fid(&dto->do_lu),
-                    (const struct dt_key *)dot, th);
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
+               rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)dot, th);
                if (rc != 0)
                        GOTO(out_put, rc);
 
                /* master stripe FID will be put to .. */
                if (rc != 0)
                        GOTO(out_put, rc);
 
                /* master stripe FID will be put to .. */
-               rc = dt_declare_insert(env, dto,
-                    (const struct dt_rec *)lu_object_fid(&dt->do_lu),
-                    (const struct dt_key *)dotdot, th);
+               rec->rec_fid = lu_object_fid(&dt->do_lu);
+               rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)dotdot, th);
                if (rc != 0)
                        GOTO(out_put, rc);
 
                if (rc != 0)
                        GOTO(out_put, rc);
 
@@ -1689,9 +1691,10 @@ next:
                if (rc != 0)
                        GOTO(out_put, rc);
 
                if (rc != 0)
                        GOTO(out_put, rc);
 
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
                rc = dt_declare_insert(env, dt_object_child(dt),
                rc = dt_declare_insert(env, dt_object_child(dt),
-                    (const struct dt_rec *)lu_object_fid(&dto->do_lu),
-                    (const struct dt_key *)stripe_name, th);
+                                      (const struct dt_rec *)rec,
+                                      (const struct dt_key *)stripe_name, th);
                if (rc != 0)
                        GOTO(out_put, rc);
 
                if (rc != 0)
                        GOTO(out_put, rc);
 
@@ -2055,6 +2058,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
        struct lu_buf           slave_lmv_buf;
        struct lmv_mds_md_v1    *lmm;
        struct lmv_mds_md_v1    *slave_lmm = NULL;
        struct lu_buf           slave_lmv_buf;
        struct lmv_mds_md_v1    *lmm;
        struct lmv_mds_md_v1    *slave_lmm = NULL;
+       struct dt_insert_rec    *rec = &info->lti_dt_rec;
        int                     i;
        int                     rc;
        ENTRY;
        int                     i;
        int                     rc;
        ENTRY;
@@ -2087,6 +2091,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
        slave_lmv_buf.lb_buf = slave_lmm;
        slave_lmv_buf.lb_len = sizeof(*slave_lmm);
 
        slave_lmv_buf.lb_buf = slave_lmm;
        slave_lmv_buf.lb_len = sizeof(*slave_lmm);
 
+       rec->rec_type = S_IFDIR;
        for (i = 0; i < lo->ldo_stripenr; i++) {
                struct dt_object        *dto;
                char                    *stripe_name    = info->lti_key;
        for (i = 0; i < lo->ldo_stripenr; i++) {
                struct dt_object        *dto;
                char                    *stripe_name    = info->lti_key;
@@ -2101,15 +2106,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                if (rc != 0)
                        RETURN(rc);
 
                if (rc != 0)
                        RETURN(rc);
 
-               rc = dt_insert(env, dto,
-                             (const struct dt_rec *)lu_object_fid(&dto->do_lu),
-                             (const struct dt_key *)dot, th, capa, 0);
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
+               rc = dt_insert(env, dto, (const struct dt_rec *)rec,
+                              (const struct dt_key *)dot, th, capa, 0);
                if (rc != 0)
                        RETURN(rc);
 
                if (rc != 0)
                        RETURN(rc);
 
-               rc = dt_insert(env, dto,
-                             (struct dt_rec *)lu_object_fid(&dt->do_lu),
-                             (const struct dt_key *)dotdot, th, capa, 0);
+               rec->rec_fid = lu_object_fid(&dt->do_lu);
+               rc = dt_insert(env, dto, (struct dt_rec *)rec,
+                              (const struct dt_key *)dotdot, th, capa, 0);
                if (rc != 0)
                        RETURN(rc);
 
                if (rc != 0)
                        RETURN(rc);
 
@@ -2172,9 +2177,10 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                if (rc != 0)
                        GOTO(out, rc);
 
                if (rc != 0)
                        GOTO(out, rc);
 
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
                rc = dt_insert(env, dt_object_child(dt),
                rc = dt_insert(env, dt_object_child(dt),
-                    (const struct dt_rec *)lu_object_fid(&dto->do_lu),
-                    (const struct dt_key *)stripe_name, th, capa, 0);
+                              (const struct dt_rec *)rec,
+                              (const struct dt_key *)stripe_name, th, capa, 0);
                if (rc != 0)
                        GOTO(out, rc);
 
                if (rc != 0)
                        GOTO(out, rc);
 
index 3b22a66..0262646 100644 (file)
@@ -588,7 +588,8 @@ static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *
 
 static int __mdd_index_insert_only(const struct lu_env *env,
                                   struct mdd_object *pobj,
 
 static int __mdd_index_insert_only(const struct lu_env *env,
                                   struct mdd_object *pobj,
-                                  const struct lu_fid *lf, const char *name,
+                                  const struct lu_fid *lf, __u32 type,
+                                  const char *name,
                                   struct thandle *handle,
                                   struct lustre_capa *capa)
 {
                                   struct thandle *handle,
                                   struct lustre_capa *capa)
 {
@@ -597,12 +598,15 @@ static int __mdd_index_insert_only(const struct lu_env *env,
        ENTRY;
 
        if (dt_try_as_dir(env, next)) {
        ENTRY;
 
        if (dt_try_as_dir(env, next)) {
-               struct lu_ucred  *uc = lu_ucred_check(env);
-               int ignore_quota;
+               struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
+               struct lu_ucred         *uc  = lu_ucred_check(env);
+               int                      ignore_quota;
 
 
+               rec->rec_fid = lf;
+               rec->rec_type = type;
                ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
                rc = next->do_index_ops->dio_insert(env, next,
                ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
                rc = next->do_index_ops->dio_insert(env, next,
-                                                   (struct dt_rec*)lf,
+                                                   (const struct dt_rec *)rec,
                                                    (const struct dt_key *)name,
                                                    handle, capa, ignore_quota);
        } else {
                                                    (const struct dt_key *)name,
                                                    handle, capa, ignore_quota);
        } else {
@@ -613,19 +617,21 @@ static int __mdd_index_insert_only(const struct lu_env *env,
 
 /* insert named index, add reference if isdir */
 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
 
 /* insert named index, add reference if isdir */
 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
-                              const struct lu_fid *lf, const char *name, int is_dir,
-                              struct thandle *handle, struct lustre_capa *capa)
+                             const struct lu_fid *lf, __u32 type,
+                             const char *name, struct thandle *handle,
+                             struct lustre_capa *capa)
 {
 {
-        int               rc;
-        ENTRY;
+       int rc;
+       ENTRY;
 
 
-        rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
-        if (rc == 0 && is_dir) {
-                mdd_write_lock(env, pobj, MOR_TGT_PARENT);
-                mdo_ref_add(env, pobj, handle);
-                mdd_write_unlock(env, pobj);
-        }
-        RETURN(rc);
+       rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle, capa);
+       if (rc == 0 && S_ISDIR(type)) {
+               mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+               mdo_ref_add(env, pobj, handle);
+               mdd_write_unlock(env, pobj);
+       }
+
+       RETURN(rc);
 }
 
 /* delete named index, drop reference if isdir */
 }
 
 /* delete named index, drop reference if isdir */
@@ -1229,18 +1235,19 @@ static inline int mdd_declare_links_del(const struct lu_env *env,
 }
 
 static int mdd_declare_link(const struct lu_env *env,
 }
 
 static int mdd_declare_link(const struct lu_env *env,
-                            struct mdd_device *mdd,
-                            struct mdd_object *p,
-                            struct mdd_object *c,
-                            const struct lu_name *name,
+                           struct mdd_device *mdd,
+                           struct mdd_object *p,
+                           struct mdd_object *c,
+                           const struct lu_name *name,
                            struct thandle *handle,
                            struct lu_attr *la,
                            struct linkea_data *data)
 {
                            struct thandle *handle,
                            struct lu_attr *la,
                            struct linkea_data *data)
 {
-        int rc;
+       int rc;
 
 
-        rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
-        if (rc)
+       rc = mdo_declare_index_insert(env, p, mdo2fid(c), mdd_object_type(c),
+                                     name->ln_name, handle);
+       if (rc != 0)
                 return rc;
 
         rc = mdo_declare_ref_add(env, c, handle);
                 return rc;
 
         rc = mdo_declare_ref_add(env, c, handle);
@@ -1320,7 +1327,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
 
 
        rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
 
 
        rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
-                                    name, handle,
+                                    mdd_object_type(mdd_sobj), name, handle,
                                     mdd_object_capa(env, mdd_tobj));
        if (rc != 0) {
                mdo_ref_del(env, mdd_sobj, handle);
                                     mdd_object_capa(env, mdd_tobj));
        if (rc != 0) {
                mdo_ref_del(env, mdd_sobj, handle);
@@ -1627,6 +1634,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                if (rc != 0) {
                        __mdd_index_insert_only(env, mdd_pobj,
                                                mdo2fid(mdd_cobj),
                if (rc != 0) {
                        __mdd_index_insert_only(env, mdd_pobj,
                                                mdo2fid(mdd_cobj),
+                                               mdd_object_type(mdd_cobj),
                                                name, handle,
                                                mdd_object_capa(env, mdd_pobj));
                        GOTO(cleanup, rc);
                                                name, handle,
                                                mdd_object_capa(env, mdd_pobj));
                        GOTO(cleanup, rc);
@@ -1802,7 +1810,7 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
                                         struct lu_attr *attr,
                                         struct thandle *handle)
 {
                                         struct lu_attr *attr,
                                         struct thandle *handle)
 {
-        int rc;
+       int rc;
        ENTRY;
 
        /*
        ENTRY;
 
        /*
@@ -1817,13 +1825,13 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
        attr->la_valid |= LA_MODE | LA_TYPE;
        if (rc == 0 && S_ISDIR(attr->la_mode)) {
                rc = mdo_declare_index_insert(env, child, mdo2fid(child),
        attr->la_valid |= LA_MODE | LA_TYPE;
        if (rc == 0 && S_ISDIR(attr->la_mode)) {
                rc = mdo_declare_index_insert(env, child, mdo2fid(child),
-                                             dot, handle);
-                if (rc == 0)
-                        rc = mdo_declare_ref_add(env, child, handle);
+                                             S_IFDIR, dot, handle);
+               if (rc == 0)
+                       rc = mdo_declare_ref_add(env, child, handle);
 
                rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
 
                rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
-                                             dotdot, handle);
-        }
+                                             S_IFDIR, dotdot, handle);
+       }
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -1851,14 +1859,14 @@ static int mdd_object_initialize(const struct lu_env *env,
                 /* Add "." and ".." for newly created dir */
                 mdo_ref_add(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                 /* Add "." and ".." for newly created dir */
                 mdo_ref_add(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
-                                             dot, handle, BYPASS_CAPA);
-                if (rc == 0)
-                        rc = __mdd_index_insert_only(env, child, pfid,
-                                                     dotdot, handle,
-                                                     BYPASS_CAPA);
-                if (rc != 0)
-                        mdo_ref_del(env, child, handle);
-        }
+                                            S_IFDIR, dot, handle, BYPASS_CAPA);
+               if (rc == 0)
+                       rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
+                                                    dotdot, handle,
+                                                    BYPASS_CAPA);
+               if (rc != 0)
+                       mdo_ref_del(env, child, handle);
+       }
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -2061,10 +2069,11 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
        } else {
                struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
 
        } else {
                struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
 
-               rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,
-                                             handle);
-               if (rc)
+               rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
+                                             name->ln_name, handle);
+               if (rc != 0)
                        return rc;
                        return rc;
+
                rc = mdd_declare_links_add(env, c, handle, ldata);
                if (rc)
                        return rc;
                rc = mdd_declare_links_add(env, c, handle, ldata);
                if (rc)
                        return rc;
@@ -2360,7 +2369,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                GOTO(out_volatile, rc);
        } else {
                rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
                GOTO(out_volatile, rc);
        } else {
                rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
-                                       name, S_ISDIR(attr->la_mode), handle,
+                                       attr->la_mode, name, handle,
                                        mdd_object_capa(env, mdd_pobj));
                if (rc != 0)
                        GOTO(err_created, rc);
                                        mdd_object_capa(env, mdd_pobj));
                if (rc != 0)
                        GOTO(err_created, rc);
@@ -2562,21 +2571,22 @@ static int mdd_declare_rename(const struct lu_env *env,
                if (mdd_spobj != mdd_tpobj) {
                        rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
                                                      handle);
                if (mdd_spobj != mdd_tpobj) {
                        rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
                                                      handle);
-                       if (rc)
+                       if (rc != 0)
                                return rc;
 
                        rc = mdo_declare_index_insert(env, mdd_sobj,
                                                      mdo2fid(mdd_tpobj),
                                return rc;
 
                        rc = mdo_declare_index_insert(env, mdd_sobj,
                                                      mdo2fid(mdd_tpobj),
-                                                     dotdot, handle);
-                       if (rc)
+                                                     S_IFDIR, dotdot, handle);
+                       if (rc != 0)
                                return rc;
                }
                                return rc;
                }
-                /* new target child can be directory,
-                 * counted by target dir's nlink */
-                rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
-                if (rc)
-                        return rc;
-        }
+
+               /* new target child can be directory,
+                * counted by target dir's nlink */
+               rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
+               if (rc != 0)
+                       return rc;
+       }
 
        la->la_valid = LA_CTIME | LA_MTIME;
        rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
 
        la->la_valid = LA_CTIME | LA_MTIME;
        rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
@@ -2596,11 +2606,12 @@ static int mdd_declare_rename(const struct lu_env *env,
        if (rc)
                return rc;
 
        if (rc)
                return rc;
 
-        /* new name */
-        rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
-                        tname->ln_name, handle);
-        if (rc)
-                return rc;
+       /* new name */
+       rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
+                                     mdd_object_type(mdd_sobj),
+                                     tname->ln_name, handle);
+       if (rc != 0)
+               return rc;
 
         /* name from target dir (old name), we declare it unconditionally
          * as mdd_rename() calls delete unconditionally as well. so just
 
         /* name from target dir (old name), we declare it unconditionally
          * as mdd_rename() calls delete unconditionally as well. so just
@@ -2740,12 +2751,13 @@ static int mdd_rename(const struct lu_env *env,
         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
                                         mdd_object_capa(env, mdd_sobj));
         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
                                         mdd_object_capa(env, mdd_sobj));
-                if (rc)
-                        GOTO(fixup_spobj2, rc);
+               if (rc != 0)
+                       GOTO(fixup_spobj2, rc);
 
 
-                rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
-                                      handle, mdd_object_capa(env, mdd_sobj));
-                if (rc)
+               rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
+                                            dotdot, handle,
+                                            mdd_object_capa(env, mdd_sobj));
+               if (rc != 0)
                         GOTO(fixup_spobj, rc);
         }
 
                         GOTO(fixup_spobj, rc);
         }
 
@@ -2765,9 +2777,9 @@ static int mdd_rename(const struct lu_env *env,
         }
 
         /* Insert new fid with target name into target dir */
         }
 
         /* Insert new fid with target name into target dir */
-        rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
-                                mdd_object_capa(env, mdd_tpobj));
-        if (rc)
+       rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
+                               tname, handle, mdd_object_capa(env, mdd_tpobj));
+       if (rc != 0)
                 GOTO(fixup_tpobj, rc);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
                 GOTO(fixup_tpobj, rc);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
@@ -2897,15 +2909,14 @@ fixup_tpobj:
                                        mdo_ref_add(env, mdd_tobj, handle);
                        }
 
                                        mdo_ref_add(env, mdd_tobj, handle);
                        }
 
-                        rc2 = __mdd_index_insert(env, mdd_tpobj,
-                                         mdo2fid(mdd_tobj), tname,
-                                         is_dir, handle,
-                                         BYPASS_CAPA);
-
-                        if (rc2)
-                                CWARN("tp obj fix error %d\n",rc2);
-                }
-        }
+                       rc2 = __mdd_index_insert(env, mdd_tpobj,
+                                                 mdo2fid(mdd_tobj),
+                                                 mdd_object_type(mdd_tobj),
+                                                 tname, handle, BYPASS_CAPA);
+                       if (rc2 != 0)
+                               CWARN("tp obj fix error: rc = %d\n", rc2);
+               }
+       }
 
 fixup_spobj:
        if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
 
 fixup_spobj:
        if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
@@ -2917,32 +2928,36 @@ fixup_spobj:
                               mdd2obd_dev(mdd)->obd_name, rc2);
 
 
                               mdd2obd_dev(mdd)->obd_name, rc2);
 
 
-               rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
+               rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
                                              dotdot, handle, BYPASS_CAPA);
                                              dotdot, handle, BYPASS_CAPA);
-               if (rc2)
+               if (rc2 != 0)
                        CWARN("%s: sp obj dotdot insert error: rc = %d\n",
                              mdd2obd_dev(mdd)->obd_name, rc2);
        }
 
 fixup_spobj2:
                        CWARN("%s: sp obj dotdot insert error: rc = %d\n",
                              mdd2obd_dev(mdd)->obd_name, rc2);
        }
 
 fixup_spobj2:
-        if (rc) {
-                rc2 = __mdd_index_insert(env, mdd_spobj,
-                                         lf, sname, is_dir, handle, BYPASS_CAPA);
-                if (rc2)
-                        CWARN("sp obj fix error %d\n",rc2);
-        }
+       if (rc != 0) {
+               rc2 = __mdd_index_insert(env, mdd_spobj, lf,
+                                        mdd_object_type(mdd_sobj), sname,
+                                        handle, BYPASS_CAPA);
+               if (rc2 != 0)
+                       CWARN("sp obj fix error: rc = %d\n", rc2);
+       }
+
 cleanup:
        if (tobj_locked)
                mdd_write_unlock(env, mdd_tobj);
 cleanup:
        if (tobj_locked)
                mdd_write_unlock(env, mdd_tobj);
+
 cleanup_unlocked:
 cleanup_unlocked:
-        if (rc == 0)
+       if (rc == 0)
                rc = mdd_changelog_ext_ns_store(env, mdd, CL_RENAME, cl_flags,
                                                mdd_tobj, tpobj_fid, lf,
                                                spobj_fid, ltname, lsname,
                                                handle);
 
 stop:
                rc = mdd_changelog_ext_ns_store(env, mdd, CL_RENAME, cl_flags,
                                                mdd_tobj, tpobj_fid, lf,
                                                spobj_fid, ltname, lsname,
                                                handle);
 
 stop:
-        mdd_trans_stop(env, mdd, rc, handle);
+       mdd_trans_stop(env, mdd, rc, handle);
+
 out_pending:
        mdd_object_put(env, mdd_sobj);
        return rc;
 out_pending:
        mdd_object_put(env, mdd_sobj);
        return rc;
@@ -3109,13 +3124,14 @@ static int mdd_update_linkea_internal(const struct lu_env *env,
                        /* Insert new fid with target name into target dir */
                        rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
                                                      handle);
                        /* Insert new fid with target name into target dir */
                        rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
                                                      handle);
-                       if (rc)
+                       if (rc != 0)
                                GOTO(next_put, rc);
 
                        rc = mdo_declare_index_insert(env, pobj,
                                GOTO(next_put, rc);
 
                        rc = mdo_declare_index_insert(env, pobj,
-                                                     mdd_object_fid(mdd_tobj),
-                                                     lname.ln_name, handle);
-                       if (rc)
+                                       mdd_object_fid(mdd_tobj),
+                                       mdd_object_type(mdd_tobj),
+                                       lname.ln_name, handle);
+                       if (rc != 0)
                                GOTO(next_put, rc);
 
                        rc = mdo_declare_ref_add(env, mdd_tobj, handle);
                                GOTO(next_put, rc);
 
                        rc = mdo_declare_ref_add(env, mdd_tobj, handle);
@@ -3133,10 +3149,11 @@ static int mdd_update_linkea_internal(const struct lu_env *env,
                                GOTO(next_put, rc);
 
                        rc = __mdd_index_insert(env, pobj,
                                GOTO(next_put, rc);
 
                        rc = __mdd_index_insert(env, pobj,
-                                               mdd_object_fid(mdd_tobj),
-                                               lname.ln_name, 0, handle,
-                                               mdd_object_capa(env, pobj));
-                       if (rc)
+                                       mdd_object_fid(mdd_tobj),
+                                       mdd_object_type(mdd_tobj),
+                                       lname.ln_name, handle,
+                                       mdd_object_capa(env, pobj));
+                       if (rc != 0)
                                GOTO(next_put, rc);
 
                        mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
                                GOTO(next_put, rc);
 
                        mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
@@ -3516,7 +3533,7 @@ static int mdd_migrate_entries(const struct lu_env *env,
                if (IS_ERR(child))
                        GOTO(out, rc = PTR_ERR(child));
 
                if (IS_ERR(child))
                        GOTO(out, rc = PTR_ERR(child));
 
-               is_dir = S_ISDIR(lu_object_attr(&child->mod_obj.mo_lu));
+               is_dir = S_ISDIR(mdd_object_type(child));
 
                snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
 
 
                snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
 
@@ -3543,6 +3560,7 @@ static int mdd_migrate_entries(const struct lu_env *env,
                if (likely(!target_exist)) {
                        rc = mdo_declare_index_insert(env, mdd_tobj,
                                                      &ent->lde_fid,
                if (likely(!target_exist)) {
                        rc = mdo_declare_index_insert(env, mdd_tobj,
                                                      &ent->lde_fid,
+                                                     mdd_object_type(child),
                                                      name, handle);
                        if (rc != 0)
                                GOTO(out_put, rc);
                                                      name, handle);
                        if (rc != 0)
                                GOTO(out_put, rc);
@@ -3571,7 +3589,7 @@ static int mdd_migrate_entries(const struct lu_env *env,
 
                        rc = mdo_declare_index_insert(env, child,
                                                      mdd_object_fid(mdd_tobj),
 
                        rc = mdo_declare_index_insert(env, child,
                                                      mdd_object_fid(mdd_tobj),
-                                                             dotdot, handle);
+                                                     S_IFDIR, dotdot, handle);
                        if (rc != 0)
                                GOTO(out_put, rc);
                }
                        if (rc != 0)
                                GOTO(out_put, rc);
                }
@@ -3592,7 +3610,8 @@ static int mdd_migrate_entries(const struct lu_env *env,
 
                if (likely(!target_exist)) {
                        rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
 
                if (likely(!target_exist)) {
                        rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
-                                               name, is_dir, handle,
+                                               mdd_object_type(child),
+                                               name, handle,
                                                mdd_object_capa(env, mdd_tobj));
                        if (rc != 0)
                                GOTO(out_put, rc);
                                                mdd_object_capa(env, mdd_tobj));
                        if (rc != 0)
                                GOTO(out_put, rc);
@@ -3617,7 +3636,7 @@ static int mdd_migrate_entries(const struct lu_env *env,
                                GOTO(out_put, rc);
 
                        rc = __mdd_index_insert_only(env, child,
                                GOTO(out_put, rc);
 
                        rc = __mdd_index_insert_only(env, child,
-                                        mdd_object_fid(mdd_tobj),
+                                        mdd_object_fid(mdd_tobj), S_IFDIR,
                                         dotdot, handle,
                                         mdd_object_capa(env, child));
                        if (rc != 0)
                                         dotdot, handle,
                                         mdd_object_capa(env, child));
                        if (rc != 0)
@@ -3715,6 +3734,7 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env,
 
        /* new name */
        rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
 
        /* new name */
        rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
+                                     mdd_object_type(mdd_tobj),
                                      lname->ln_name, handle);
        if (rc != 0)
                return rc;
                                      lname->ln_name, handle);
        if (rc != 0)
                return rc;
@@ -3831,8 +3851,9 @@ static int mdd_migrate_update_name(const struct lu_env *env,
        }
 
        /* Insert new fid with target name into target dir */
        }
 
        /* Insert new fid with target name into target dir */
-       rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj), name,
-                               is_dir, handle, mdd_object_capa(env, mdd_pobj));
+       rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
+                               mdd_object_type(mdd_tobj), name,
+                               handle, mdd_object_capa(env, mdd_pobj));
        if (rc != 0)
                GOTO(stop_trans, rc);
 
        if (rc != 0)
                GOTO(stop_trans, rc);
 
index f98ba09..9183bba 100644 (file)
@@ -160,6 +160,7 @@ struct mdd_thread_info {
        struct dt_object_format   mti_dof;
        struct linkea_data        mti_link_data;
        struct md_op_spec         mti_spec;
        struct dt_object_format   mti_dof;
        struct linkea_data        mti_link_data;
        struct md_op_spec         mti_spec;
+       struct dt_insert_rec      mti_dt_rec;
 };
 
 extern const char orph_index_name[];
 };
 
 extern const char orph_index_name[];
@@ -614,32 +615,39 @@ int mdo_xattr_list(const struct lu_env *env, struct mdd_object *obj,
 
 static inline
 int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
 
 static inline
 int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
-                             const struct lu_fid *fid, const char *name,
-                             struct thandle *handle)
+                            const struct lu_fid *fid, __u32 type,
+                            const char *name, struct thandle *handle)
 {
 {
-        struct dt_object *next = mdd_object_child(obj);
-        int              rc = 0;
+       struct dt_object *next  = mdd_object_child(obj);
+       int               rc    = 0;
+
+       /*
+        * if the object doesn't exist yet, then it's supposed to be created
+        * and declaration of the creation should be enough to insert ./..
+        */
 
 
-        /*
-         * if the object doesn't exist yet, then it's supposed to be created
-         * and declaration of the creation should be enough to insert ./..
-         */
         /* FIXME: remote object should not be awared by MDD layer, but local
          * creation does not declare insert ./.. (comments above), which
          * is required by remote directory creation.
          * This remote check should be removed when mdd_object_exists check is
          * removed.
          */
         /* FIXME: remote object should not be awared by MDD layer, but local
          * creation does not declare insert ./.. (comments above), which
          * is required by remote directory creation.
          * This remote check should be removed when mdd_object_exists check is
          * removed.
          */
-        if (mdd_object_exists(obj) || mdd_object_remote(obj)) {
-                rc = -ENOTDIR;
-                if (dt_try_as_dir(env, next))
-                        rc = dt_declare_insert(env, next,
-                                               (struct dt_rec *)fid,
-                                               (const struct dt_key *)name,
-                                               handle);
-        }
-
-        return rc;
+       if (mdd_object_exists(obj) || mdd_object_remote(obj)) {
+               rc = -ENOTDIR;
+               if (dt_try_as_dir(env, next)) {
+                       struct dt_insert_rec *rec =
+                                       &mdd_env_info(env)->mti_dt_rec;
+
+                       rec->rec_fid = fid;
+                       rec->rec_type = type;
+                       rc = dt_declare_insert(env, next,
+                                              (const struct dt_rec *)rec,
+                                              (const struct dt_key *)name,
+                                              handle);
+               }
+        }
+
+        return rc;
 }
 
 static inline
 }
 
 static inline
index 8ab42ca..442452c 100644 (file)
@@ -109,20 +109,21 @@ static inline void mdd_orphan_write_unlock(const struct lu_env *env,
 }
 
 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
 }
 
 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
-                                        struct mdd_device *mdd,
-                                        struct mdd_object *obj,
-                                        __u32 op,
-                                        struct thandle *th)
+                                       struct mdd_device *mdd,
+                                       struct mdd_object *obj,
+                                       __u32 op, struct thandle *th)
 {
 {
-        struct dt_object        *dor    = mdd->mdd_orphans;
-        const struct lu_fid     *lf     = mdo2fid(obj);
-        struct dt_key           *key    = orph_key_fill(env, lf, op);
-        ENTRY;
+       struct dt_insert_rec    *rec    = &mdd_env_info(env)->mti_dt_rec;
+       struct dt_object        *dor    = mdd->mdd_orphans;
+       const struct lu_fid     *lf     = mdo2fid(obj);
+       struct dt_key           *key    = orph_key_fill(env, lf, op);
 
 
-        return  dor->do_index_ops->dio_insert(env, dor,
-                                              (struct dt_rec *)lf,
-                                              key, th,
-                                              BYPASS_CAPA, 1);
+       rec->rec_fid = lf;
+       rec->rec_type = mdd_object_type(obj);
+
+       return  dor->do_index_ops->dio_insert(env, dor,
+                                             (const struct dt_rec *)rec,
+                                             key, th, BYPASS_CAPA, 1);
 }
 
 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
 }
 
 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
@@ -158,14 +159,18 @@ int orph_declare_index_insert(const struct lu_env *env,
                              struct mdd_object *obj,
                              umode_t mode, struct thandle *th)
 {
                              struct mdd_object *obj,
                              umode_t mode, struct thandle *th)
 {
+       struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
        struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
        struct dt_key           *key;
        int                     rc;
 
        key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK);
 
        struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
        struct dt_key           *key;
        int                     rc;
 
        key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK);
 
-       rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, key, th);
-       if (rc)
+       rec->rec_fid = mdo2fid(obj);
+       rec->rec_type = mode;
+       rc = dt_declare_insert(env, mdd->mdd_orphans,
+                              (const struct dt_rec *)rec, key, th);
+       if (rc != 0)
                return rc;
 
        rc = mdo_declare_ref_add(env, obj, th);
                return rc;
 
        rc = mdo_declare_ref_add(env, obj, th);
@@ -187,21 +192,23 @@ int orph_declare_index_insert(const struct lu_env *env,
        if (rc)
                return rc;
 
        if (rc)
                return rc;
 
-       rc = mdo_declare_index_insert(env, obj, NULL, dotdot, th);
+       rc = mdo_declare_index_insert(env, obj,
+                                     lu_object_fid(&mdd->mdd_orphans->do_lu),
+                                     S_IFDIR, dotdot, th);
 
        return rc;
 }
 
 static int orph_index_insert(const struct lu_env *env,
 
        return rc;
 }
 
 static int orph_index_insert(const struct lu_env *env,
-                             struct mdd_object *obj,
-                             __u32 op,
-                             struct thandle *th)
+                            struct mdd_object *obj,
+                            __u32 op, struct thandle *th)
 {
 {
-        struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
-        struct dt_object        *dor    = mdd->mdd_orphans;
-        const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
-        struct dt_object        *next   = mdd_object_child(obj);
-        int rc;
+       struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
+       struct dt_object        *dor    = mdd->mdd_orphans;
+       const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
+       struct dt_object        *next   = mdd_object_child(obj);
+       struct dt_insert_rec    *rec    = &mdd_env_info(env)->mti_dt_rec;
+       int                      rc;
         ENTRY;
 
         LASSERT(mdd_write_locked(env, obj) != 0);
         ENTRY;
 
         LASSERT(mdd_write_locked(env, obj) != 0);
@@ -228,8 +235,9 @@ static int orph_index_insert(const struct lu_env *env,
                                        (const struct dt_key *)dotdot,
                                        th, BYPASS_CAPA);
 
                                        (const struct dt_key *)dotdot,
                                        th, BYPASS_CAPA);
 
-        next->do_index_ops->dio_insert(env, next,
-                                       (struct dt_rec *)lf_dor,
+       rec->rec_fid = lf_dor;
+       rec->rec_type = S_IFDIR;
+       next->do_index_ops->dio_insert(env, next, (const struct dt_rec *)rec,
                                        (const struct dt_key *)dotdot,
                                        th, BYPASS_CAPA, 1);
 
                                        (const struct dt_key *)dotdot,
                                        th, BYPASS_CAPA, 1);
 
index 5bf0493..6eec20b 100644 (file)
@@ -57,6 +57,7 @@ struct llog_thread_info {
        struct lu_buf                    lgi_buf;
        loff_t                           lgi_off;
        struct llog_logid_rec            lgi_logid;
        struct lu_buf                    lgi_buf;
        loff_t                           lgi_off;
        struct llog_logid_rec            lgi_logid;
+       struct dt_insert_rec             lgi_dt_rec;
 };
 
 extern struct lu_context_key llog_thread_key;
 };
 
 extern struct lu_context_key llog_thread_key;
index 6c80c73..8295d39 100644 (file)
@@ -1020,6 +1020,7 @@ static int llog_osd_declare_create(const struct lu_env *env,
                                   struct llog_handle *res, struct thandle *th)
 {
        struct llog_thread_info         *lgi = llog_info(env);
                                   struct llog_handle *res, struct thandle *th)
 {
        struct llog_thread_info         *lgi = llog_info(env);
+       struct dt_insert_rec            *rec = &lgi->lgi_dt_rec;
        struct local_oid_storage        *los;
        struct dt_object                *o;
        int                              rc;
        struct local_oid_storage        *los;
        struct dt_object                *o;
        int                              rc;
@@ -1051,8 +1052,10 @@ static int llog_osd_declare_create(const struct lu_env *env,
                if (IS_ERR(llog_dir))
                        RETURN(PTR_ERR(llog_dir));
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
                if (IS_ERR(llog_dir))
                        RETURN(PTR_ERR(llog_dir));
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
+               rec->rec_fid = &lgi->lgi_fid;
+               rec->rec_type = S_IFREG;
                rc = dt_declare_insert(env, llog_dir,
                rc = dt_declare_insert(env, llog_dir,
-                                      (struct dt_rec *)&lgi->lgi_fid,
+                                      (struct dt_rec *)rec,
                                       (struct dt_key *)res->lgh_name, th);
                lu_object_put(env, &llog_dir->do_lu);
                if (rc)
                                       (struct dt_key *)res->lgh_name, th);
                lu_object_put(env, &llog_dir->do_lu);
                if (rc)
@@ -1080,6 +1083,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                           struct thandle *th)
 {
        struct llog_thread_info *lgi = llog_info(env);
                           struct thandle *th)
 {
        struct llog_thread_info *lgi = llog_info(env);
+       struct dt_insert_rec    *rec = &lgi->lgi_dt_rec;
        struct local_oid_storage *los;
        struct dt_object        *o;
        int                      rc = 0;
        struct local_oid_storage *los;
        struct dt_object        *o;
        int                      rc = 0;
@@ -1115,9 +1119,10 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                        RETURN(PTR_ERR(llog_dir));
 
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
                        RETURN(PTR_ERR(llog_dir));
 
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
+               rec->rec_fid = &lgi->lgi_fid;
+               rec->rec_type = S_IFREG;
                dt_read_lock(env, llog_dir, 0);
                dt_read_lock(env, llog_dir, 0);
-               rc = dt_insert(env, llog_dir,
-                              (struct dt_rec *)&lgi->lgi_fid,
+               rc = dt_insert(env, llog_dir, (struct dt_rec *)rec,
                               (struct dt_key *)res->lgh_name,
                               th, BYPASS_CAPA, 1);
                dt_read_unlock(env, llog_dir);
                               (struct dt_key *)res->lgh_name,
                               th, BYPASS_CAPA, 1);
                dt_read_unlock(env, llog_dir);
index fd9f352..89fc5df 100644 (file)
@@ -301,6 +301,7 @@ struct dt_object *__local_file_create(const struct lu_env *env,
 {
        struct dt_thread_info   *dti    = dt_info(env);
        struct lu_object_conf   *conf   = &dti->dti_conf;
 {
        struct dt_thread_info   *dti    = dt_info(env);
        struct lu_object_conf   *conf   = &dti->dti_conf;
+       struct dt_insert_rec    *rec    = &dti->dti_dt_rec;
        struct dt_object        *dto;
        struct thandle          *th;
        int                      rc;
        struct dt_object        *dto;
        struct thandle          *th;
        int                      rc;
@@ -331,7 +332,10 @@ struct dt_object *__local_file_create(const struct lu_env *env,
                dt_declare_ref_add(env, parent, th);
        }
 
                dt_declare_ref_add(env, parent, th);
        }
 
-       rc = dt_declare_insert(env, parent, (void *)fid, (void *)name, th);
+       rec->rec_fid = fid;
+       rec->rec_type = dto->do_lu.lo_header->loh_attr;
+       rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+                              (const struct dt_key *)name, th);
        if (rc)
                GOTO(trans_stop, rc);
 
        if (rc)
                GOTO(trans_stop, rc);
 
@@ -353,20 +357,27 @@ struct dt_object *__local_file_create(const struct lu_env *env,
        if (dti->dti_dof.dof_type == DFT_DIR) {
                if (!dt_try_as_dir(env, dto))
                        GOTO(destroy, rc = -ENOTDIR);
        if (dti->dti_dof.dof_type == DFT_DIR) {
                if (!dt_try_as_dir(env, dto))
                        GOTO(destroy, rc = -ENOTDIR);
+
+               rec->rec_type = S_IFDIR;
+               rec->rec_fid = fid;
                /* Add "." and ".." for newly created dir */
                /* Add "." and ".." for newly created dir */
-               rc = dt_insert(env, dto, (void *)fid, (void *)".", th,
-                              BYPASS_CAPA, 1);
-               if (rc)
+               rc = dt_insert(env, dto, (const struct dt_rec *)rec,
+                              (const struct dt_key *)".", th, BYPASS_CAPA, 1);
+               if (rc != 0)
                        GOTO(destroy, rc);
                        GOTO(destroy, rc);
+
                dt_ref_add(env, dto, th);
                dt_ref_add(env, dto, th);
-               rc = dt_insert(env, dto, (void *)lu_object_fid(&parent->do_lu),
-                              (void *)"..", th, BYPASS_CAPA, 1);
-               if (rc)
+               rec->rec_fid = lu_object_fid(&parent->do_lu);
+               rc = dt_insert(env, dto, (const struct dt_rec *)rec,
+                              (const struct dt_key *)"..", th, BYPASS_CAPA, 1);
+               if (rc != 0)
                        GOTO(destroy, rc);
        }
 
                        GOTO(destroy, rc);
        }
 
+       rec->rec_fid = fid;
+       rec->rec_type = dto->do_lu.lo_header->loh_attr;
        dt_write_lock(env, parent, 0);
        dt_write_lock(env, parent, 0);
-       rc = dt_insert(env, parent, (const struct dt_rec *)fid,
+       rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (dti->dti_dof.dof_type == DFT_DIR)
                dt_ref_add(env, parent, th);
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (dti->dti_dof.dof_type == DFT_DIR)
                dt_ref_add(env, parent, th);
@@ -629,8 +640,11 @@ int local_object_unlink(const struct lu_env *env, struct dt_device *dt,
 
        rc = dt_ref_del(env, dto, th);
        if (rc < 0) {
 
        rc = dt_ref_del(env, dto, th);
        if (rc < 0) {
-               rc = dt_insert(env, parent,
-                              (const struct dt_rec *)&dti->dti_fid,
+               struct dt_insert_rec *rec = &dti->dti_dt_rec;
+
+               rec->rec_fid = &dti->dti_fid;
+               rec->rec_type = dto->do_lu.lo_header->loh_attr;
+               rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th, BYPASS_CAPA, 1);
                GOTO(unlock, rc);
        }
                               (const struct dt_key *)name, th, BYPASS_CAPA, 1);
                GOTO(unlock, rc);
        }
index cb0728d..e4412fb 100644 (file)
@@ -2571,9 +2571,9 @@ int osd_ea_fid_set(struct osd_thread_info *info, struct inode *inode,
  * its inmemory API.
  */
 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
  * its inmemory API.
  */
 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
-                                 const struct dt_rec *fid)
+                                 const struct lu_fid *fid)
 {
 {
-       if (!fid_is_namespace_visible((const struct lu_fid *)fid) ||
+       if (!fid_is_namespace_visible(fid) ||
            OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF)) {
                param->edp_magic = 0;
                return;
            OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF)) {
                param->edp_magic = 0;
                return;
@@ -2610,9 +2610,9 @@ static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
 
 static int osd_add_dot_dotdot_internal(struct osd_thread_info *info,
                                        struct inode *dir,
 
 static int osd_add_dot_dotdot_internal(struct osd_thread_info *info,
                                        struct inode *dir,
-                                       struct inode  *parent_dir,
-                                       const struct dt_rec *dot_fid,
-                                       const struct dt_rec *dot_dot_fid,
+                                       struct inode *parent_dir,
+                                       const struct lu_fid *dot_fid,
+                                       const struct lu_fid *dot_dot_fid,
                                        struct osd_thandle *oth)
 {
        struct ldiskfs_dentry_param *dot_ldp;
                                        struct osd_thandle *oth)
 {
        struct ldiskfs_dentry_param *dot_ldp;
@@ -2634,6 +2634,7 @@ static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
                                                  struct osd_device *osd,
                                                  struct osd_object *pobj,
                                                  const struct lu_fid *fid,
                                                  struct osd_device *osd,
                                                  struct osd_object *pobj,
                                                  const struct lu_fid *fid,
+                                                 __u32 type,
                                                  struct thandle *th)
 {
        struct osd_thread_info  *info = osd_oti_get(env);
                                                  struct thandle *th)
 {
        struct osd_thread_info  *info = osd_oti_get(env);
@@ -2646,9 +2647,7 @@ static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
        oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle->h_transaction != NULL);
 
        oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle->h_transaction != NULL);
 
-       /* FIXME: Insert index api needs to know the mode of
-        * the remote object. Just use S_IFDIR for now */
-       local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR);
+       local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, type);
        if (IS_ERR(local)) {
                CERROR("%s: create local error %d\n", osd_name(osd),
                       (int)PTR_ERR(local));
        if (IS_ERR(local)) {
                CERROR("%s: create local error %d\n", osd_name(osd),
                       (int)PTR_ERR(local));
@@ -2663,9 +2662,12 @@ static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
                RETURN(ERR_PTR(rc));
        }
 
                RETURN(ERR_PTR(rc));
        }
 
+       if (!S_ISDIR(type))
+               RETURN(local);
+
        rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode,
        rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode,
-               (const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
-               (const struct dt_rec *)fid, oh);
+                                        lu_object_fid(&pobj->oo_dt.do_lu),
+                                        fid, oh);
        if (rc != 0) {
                CERROR("%s: "DFID" add dot dotdot error: rc = %d\n",
                        osd_name(osd), PFID(fid), rc);
        if (rc != 0) {
                CERROR("%s: "DFID" add dot dotdot error: rc = %d\n",
                        osd_name(osd), PFID(fid), rc);
@@ -3569,7 +3571,7 @@ static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
 }
 
 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
 }
 
 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
-                         struct lu_fid *fid)
+                         const struct lu_fid *fid)
 {
        struct seq_server_site  *ss = osd_seq_site(osd);
        ENTRY;
 {
        struct seq_server_site  *ss = osd_seq_site(osd);
        ENTRY;
@@ -3909,9 +3911,9 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
  * \retval -ve, on error
  */
 static int __osd_ea_add_rec(struct osd_thread_info *info,
  * \retval -ve, on error
  */
 static int __osd_ea_add_rec(struct osd_thread_info *info,
-                            struct osd_object *pobj, struct inode  *cinode,
-                            const char *name, const struct dt_rec *fid,
-                            struct htree_lock *hlock, struct thandle *th)
+                           struct osd_object *pobj, struct inode  *cinode,
+                           const char *name, const struct lu_fid *fid,
+                           struct htree_lock *hlock, struct thandle *th)
 {
         struct ldiskfs_dentry_param *ldp;
         struct dentry               *child;
 {
         struct ldiskfs_dentry_param *ldp;
         struct dentry               *child;
@@ -3950,11 +3952,11 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
  * \retval -ve, on error
  */
 static int osd_add_dot_dotdot(struct osd_thread_info *info,
  * \retval -ve, on error
  */
 static int osd_add_dot_dotdot(struct osd_thread_info *info,
-                              struct osd_object *dir,
-                              struct inode  *parent_dir, const char *name,
-                              const struct dt_rec *dot_fid,
-                              const struct dt_rec *dot_dot_fid,
-                              struct thandle *th)
+                             struct osd_object *dir,
+                             struct inode *parent_dir, const char *name,
+                             const struct lu_fid *dot_fid,
+                             const struct lu_fid *dot_dot_fid,
+                             struct thandle *th)
 {
         struct inode                *inode = dir->oo_inode;
         struct osd_thandle          *oth;
 {
         struct inode                *inode = dir->oo_inode;
         struct osd_thandle          *oth;
@@ -3982,8 +3984,8 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                }
 
                result = osd_add_dot_dotdot_internal(info, dir->oo_inode,
                }
 
                result = osd_add_dot_dotdot_internal(info, dir->oo_inode,
-                                               parent_dir, dot_fid,
-                                               dot_dot_fid, oth);
+                                                    parent_dir, dot_fid,
+                                                    dot_dot_fid, oth);
                if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
        }
                if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
        }
@@ -3997,8 +3999,8 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
  * value, return by respective functions.
  */
 static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
  * value, return by respective functions.
  */
 static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
-                          struct inode *cinode, const char *name,
-                          const struct dt_rec *fid, struct thandle *th)
+                         struct inode *cinode, const char *name,
+                         const struct lu_fid *fid, struct thandle *th)
 {
         struct osd_thread_info *info   = osd_oti_get(env);
         struct htree_lock      *hlock;
 {
         struct osd_thread_info *info   = osd_oti_get(env);
         struct htree_lock      *hlock;
@@ -4013,9 +4015,10 @@ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
                                            pobj->oo_inode, 0);
                 } else {
                        down_write(&pobj->oo_ext_idx_sem);
                                            pobj->oo_inode, 0);
                 } else {
                        down_write(&pobj->oo_ext_idx_sem);
-                }
-                rc = osd_add_dot_dotdot(info, pobj, cinode, name,
-                     (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
+               }
+
+               rc = osd_add_dot_dotdot(info, pobj, cinode, name,
+                                       lu_object_fid(&pobj->oo_dt.do_lu),
                                         fid, th);
         } else {
                 if (hlock != NULL) {
                                         fid, th);
         } else {
                 if (hlock != NULL) {
@@ -4028,11 +4031,10 @@ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
                if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR)) {
                        struct lu_fid *tfid = &info->oti_fid;
 
                if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR)) {
                        struct lu_fid *tfid = &info->oti_fid;
 
-                       *tfid = *(const struct lu_fid *)fid;
+                       *tfid = *fid;
                        tfid->f_ver = ~0;
                        rc = __osd_ea_add_rec(info, pobj, cinode, name,
                        tfid->f_ver = ~0;
                        rc = __osd_ea_add_rec(info, pobj, cinode, name,
-                                             (const struct dt_rec *)tfid,
-                                             hlock, th);
+                                             tfid, hlock, th);
                } else {
                        rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
                                              hlock, th);
                } else {
                        rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
                                              hlock, th);
@@ -4359,7 +4361,8 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
 {
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_dev(dt->do_lu.lo_dev);
 {
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_dev(dt->do_lu.lo_dev);
-       struct lu_fid           *fid = (struct lu_fid *) rec;
+       struct dt_insert_rec    *rec1   = (struct dt_insert_rec *)rec;
+       const struct lu_fid     *fid    = rec1->rec_fid;
        const char              *name = (const char *)key;
        struct osd_thread_info  *oti   = osd_oti_get(env);
        struct osd_inode_id     *id    = &oti->oti_id;
        const char              *name = (const char *)key;
        struct osd_thread_info  *oti   = osd_oti_get(env);
        struct osd_inode_id     *id    = &oti->oti_id;
@@ -4368,14 +4371,14 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
        int                     rc;
        ENTRY;
 
        int                     rc;
        ENTRY;
 
-        LASSERT(osd_invariant(obj));
+       LASSERT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
        LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(th != NULL);
+       LASSERT(th != NULL);
 
        osd_trans_exec_op(env, th, OSD_OT_INSERT);
 
 
        osd_trans_exec_op(env, th, OSD_OT_INSERT);
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
-                RETURN(-EACCES);
+       if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+               RETURN(-EACCES);
 
        LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid));
 
 
        LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid));
 
@@ -4406,8 +4409,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                        child_inode = igrab(omm->omm_remote_parent->d_inode);
                } else {
                        child_inode = osd_create_local_agent_inode(env, osd,
                        child_inode = igrab(omm->omm_remote_parent->d_inode);
                } else {
                        child_inode = osd_create_local_agent_inode(env, osd,
-                                                                  obj, fid,
-                                                                  th);
+                                       obj, fid, rec1->rec_type & S_IFMT, th);
                        if (IS_ERR(child_inode))
                                RETURN(PTR_ERR(child_inode));
                }
                        if (IS_ERR(child_inode))
                                RETURN(PTR_ERR(child_inode));
                }
@@ -4424,7 +4426,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                child_inode = igrab(child->oo_inode);
        }
 
                child_inode = igrab(child->oo_inode);
        }
 
-       rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th);
+       rc = osd_ea_add_rec(env, obj, child_inode, name, fid, th);
 
        CDEBUG(D_INODE, "parent %lu insert %s:%lu rc = %d\n",
               obj->oo_inode->i_ino, name, child_inode->i_ino, rc);
 
        CDEBUG(D_INODE, "parent %lu insert %s:%lu rc = %d\n",
               obj->oo_inode->i_ino, name, child_inode->i_ino, rc);
@@ -5094,7 +5096,7 @@ osd_dirent_reinsert(const struct lu_env *env, handle_t *jh,
        dentry = osd_child_dentry_by_inode(env, dir, ent->oied_name,
                                           ent->oied_namelen);
        ldp = (struct ldiskfs_dentry_param *)osd_oti_get(env)->oti_ldp;
        dentry = osd_child_dentry_by_inode(env, dir, ent->oied_name,
                                           ent->oied_namelen);
        ldp = (struct ldiskfs_dentry_param *)osd_oti_get(env)->oti_ldp;
-       osd_get_ldiskfs_dirent_param(ldp, (const struct dt_rec *)fid);
+       osd_get_ldiskfs_dirent_param(ldp, fid);
        dentry->d_fsdata = (void *)ldp;
        ll_vfs_dq_init(dir);
        rc = osd_ldiskfs_add_entry(jh, dentry, inode, hlock);
        dentry->d_fsdata = (void *)ldp;
        ll_vfs_dq_init(dir);
        rc = osd_ldiskfs_add_entry(jh, dentry, inode, hlock);
index bd5a08a..43779a9 100644 (file)
@@ -494,7 +494,7 @@ static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
 }
 
 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
 }
 
 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
-                         struct lu_fid *fid)
+                         const struct lu_fid *fid)
 {
        struct seq_server_site  *ss = osd_seq_site(osd);
        ENTRY;
 {
        struct seq_server_site  *ss = osd_seq_site(osd);
        ENTRY;
@@ -537,7 +537,8 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object   *parent = osd_dt_obj(dt);
        struct osd_device   *osd = osd_obj2dev(parent);
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object   *parent = osd_dt_obj(dt);
        struct osd_device   *osd = osd_obj2dev(parent);
-       struct lu_fid       *fid = (struct lu_fid *)rec;
+       struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
+       const struct lu_fid *fid = rec1->rec_fid;
        struct osd_thandle  *oh;
        struct osd_object   *child = NULL;
        __u32                attr;
        struct osd_thandle  *oh;
        struct osd_object   *child = NULL;
        __u32                attr;
@@ -564,7 +565,7 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        if (unlikely(rc == 1)) {
                /* Insert remote entry */
                memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
        if (unlikely(rc == 1)) {
                /* Insert remote entry */
                memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
-               oti->oti_zde.lzd_reg.zde_type = IFTODT(S_IFDIR & S_IFMT);
+               oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
        } else {
                /*
                 * To simulate old Orion setups with ./..  stored in the
        } else {
                /*
                 * To simulate old Orion setups with ./..  stored in the
index d84a259..6dec919 100644 (file)
@@ -397,13 +397,20 @@ static int osp_md_declare_insert(const struct lu_env *env,
                                 const struct dt_key *key,
                                 struct thandle *th)
 {
                                 const struct dt_key *key,
                                 struct thandle *th)
 {
-       struct dt_update_request *update;
-       struct lu_fid            *fid;
-       struct lu_fid            *rec_fid = (struct lu_fid *)rec;
-       int                      size[2] = {strlen((char *)key) + 1,
-                                                 sizeof(*rec_fid)};
-       const char               *bufs[2] = {(char *)key, (char *)rec_fid};
-       int                      rc;
+       struct osp_thread_info     *info = osp_env_info(env);
+       struct dt_update_request   *update;
+       struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
+       struct lu_fid              *fid =
+                               (struct lu_fid *)lu_object_fid(&dt->do_lu);
+       struct lu_fid              *rec_fid = &info->osi_fid;
+       __u32                       type = cpu_to_le32(rec1->rec_type);
+       int                         size[3] = { strlen((char *)key) + 1,
+                                               sizeof(*rec_fid),
+                                               sizeof(type) };
+       const char                 *bufs[3] = { (char *)key,
+                                               (char *)rec_fid,
+                                               (char *)&type };
+       int                         rc;
 
        update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
 
        update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
@@ -413,14 +420,11 @@ static int osp_md_declare_insert(const struct lu_env *env,
                return PTR_ERR(update);
        }
 
                return PTR_ERR(update);
        }
 
-       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
-
-       CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
+       CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID", %u\n",
               dt->do_lu.lo_dev->ld_obd->obd_name,
               dt->do_lu.lo_dev->ld_obd->obd_name,
-              PFID(fid), (char *)key, PFID(rec_fid));
-
-       fid_cpu_to_le(rec_fid, rec_fid);
+              PFID(fid), (char *)key, PFID(rec1->rec_fid), rec1->rec_type);
 
 
+       fid_cpu_to_le(rec_fid, rec1->rec_fid);
        rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
                               ARRAY_SIZE(size), size, bufs);
        return rc;
        rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
                               ARRAY_SIZE(size), size, bufs);
        return rc;
index b9c9322..78e55f4 100644 (file)
@@ -899,9 +899,10 @@ static int out_obj_index_insert(const struct lu_env *env,
 {
        int rc;
 
 {
        int rc;
 
-       CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
+       CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID", type %u\n",
               dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
               dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
-              (char *)key, PFID((struct lu_fid *)rec));
+              (char *)key, PFID(((struct dt_insert_rec *)rec)->rec_fid),
+              ((struct dt_insert_rec *)rec)->rec_type);
 
        if (dt_try_as_dir(env, dt_obj) == 0)
                return -ENOTDIR;
 
        if (dt_try_as_dir(env, dt_obj) == 0)
                return -ENOTDIR;
@@ -940,7 +941,8 @@ static int out_tx_index_insert_exec(const struct lu_env *env,
        struct dt_object *dt_obj = arg->object;
        int rc;
 
        struct dt_object *dt_obj = arg->object;
        int rc;
 
-       rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
+       rc = out_obj_index_insert(env, dt_obj,
+                                 (const struct dt_rec *)&arg->u.insert.rec,
                                  arg->u.insert.key, th);
 
        CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
                                  arg->u.insert.key, th);
 
        CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
@@ -959,7 +961,8 @@ static int out_tx_index_insert_undo(const struct lu_env *env,
 
 static int __out_tx_index_insert(const struct lu_env *env,
                                 struct dt_object *dt_obj,
 
 static int __out_tx_index_insert(const struct lu_env *env,
                                 struct dt_object *dt_obj,
-                                char *name, struct lu_fid *fid,
+                                const struct dt_rec *rec,
+                                const struct dt_key *key,
                                 struct thandle_exec_args *ta,
                                 struct object_update_reply *reply,
                                 int index, const char *file, int line)
                                 struct thandle_exec_args *ta,
                                 struct object_update_reply *reply,
                                 int index, const char *file, int line)
@@ -973,8 +976,7 @@ static int __out_tx_index_insert(const struct lu_env *env,
                return rc;
        }
 
                return rc;
        }
 
-       rc = dt_declare_insert(env, dt_obj, (struct dt_rec *)fid,
-                              (struct dt_key *)name, ta->ta_handle);
+       rc = dt_declare_insert(env, dt_obj, rec, key, ta->ta_handle);
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
@@ -987,22 +989,23 @@ static int __out_tx_index_insert(const struct lu_env *env,
        arg->object = dt_obj;
        arg->reply = reply;
        arg->index = index;
        arg->object = dt_obj;
        arg->reply = reply;
        arg->index = index;
-       arg->u.insert.rec = (struct dt_rec *)fid;
-       arg->u.insert.key = (struct dt_key *)name;
+       arg->u.insert.rec = *(const struct dt_insert_rec *)rec;
+       arg->u.insert.key = key;
 
        return 0;
 }
 
 static int out_index_insert(struct tgt_session_info *tsi)
 {
 
        return 0;
 }
 
 static int out_index_insert(struct tgt_session_info *tsi)
 {
-       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
-       struct object_update    *update = tti->tti_u.update.tti_update;
-       struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
-       struct lu_fid     *fid;
-       char              *name;
-       int                rc = 0;
-       int                size;
-
+       struct tgt_thread_info  *tti    = tgt_th_info(tsi->tsi_env);
+       struct object_update    *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj    = tti->tti_u.update.tti_dt_object;
+       struct dt_insert_rec    *rec    = &tti->tti_rec;
+       struct lu_fid           *fid;
+       char                    *name;
+       __u32                   *ptype;
+       int                      rc     = 0;
+       int                      size;
        ENTRY;
 
        name = object_update_param_get(update, 0, NULL);
        ENTRY;
 
        name = object_update_param_get(update, 0, NULL);
@@ -1028,8 +1031,21 @@ static int out_index_insert(struct tgt_session_info *tsi)
                RETURN(err_serious(-EPROTO));
        }
 
                RETURN(err_serious(-EPROTO));
        }
 
-       rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
-                                &tti->tti_tea,
+       ptype = object_update_param_get(update, 2, &size);
+       if (ptype == NULL || size != sizeof(*ptype)) {
+               CERROR("%s: invalid type for index insert: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+               __swab32s(ptype);
+
+       rec->rec_fid = fid;
+       rec->rec_type = *ptype;
+
+       rc = out_tx_index_insert(tsi->tsi_env, obj, (const struct dt_rec *)rec,
+                                (const struct dt_key *)name, &tti->tti_tea,
                                 tti->tti_u.update.tti_update_reply,
                                 tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
                                 tti->tti_u.update.tti_update_reply,
                                 tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
@@ -1043,7 +1059,7 @@ static int out_tx_index_delete_exec(const struct lu_env *env,
 
        rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
 
 
        rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
 
-       CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
+       CDEBUG(D_INFO, "%s: delete idx insert reply %p index %d: rc = %d\n",
               dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
        object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
               dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
        object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
@@ -1061,7 +1077,8 @@ static int out_tx_index_delete_undo(const struct lu_env *env,
 }
 
 static int __out_tx_index_delete(const struct lu_env *env,
 }
 
 static int __out_tx_index_delete(const struct lu_env *env,
-                                struct dt_object *dt_obj, char *name,
+                                struct dt_object *dt_obj,
+                                const struct dt_key *key,
                                 struct thandle_exec_args *ta,
                                 struct object_update_reply *reply,
                                 int index, const char *file, int line)
                                 struct thandle_exec_args *ta,
                                 struct object_update_reply *reply,
                                 int index, const char *file, int line)
@@ -1075,8 +1092,7 @@ static int __out_tx_index_delete(const struct lu_env *env,
        }
 
        LASSERT(ta->ta_handle != NULL);
        }
 
        LASSERT(ta->ta_handle != NULL);
-       rc = dt_declare_delete(env, dt_obj, (struct dt_key *)name,
-                              ta->ta_handle);
+       rc = dt_declare_delete(env, dt_obj, key, ta->ta_handle);
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
@@ -1089,7 +1105,7 @@ static int __out_tx_index_delete(const struct lu_env *env,
        arg->object = dt_obj;
        arg->reply = reply;
        arg->index = index;
        arg->object = dt_obj;
        arg->reply = reply;
        arg->index = index;
-       arg->u.insert.key = (struct dt_key *)name;
+       arg->u.insert.key = key;
        return 0;
 }
 
        return 0;
 }
 
@@ -1111,7 +1127,8 @@ static int out_index_delete(struct tgt_session_info *tsi)
                RETURN(err_serious(-EPROTO));
        }
 
                RETURN(err_serious(-EPROTO));
        }
 
-       rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
+       rc = out_tx_index_delete(tsi->tsi_env, obj, (const struct dt_key *)name,
+                                &tti->tti_tea,
                                 tti->tti_u.update.tti_update_reply,
                                 tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
                                 tti->tti_u.update.tti_update_reply,
                                 tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
index 86436c2..ebe3a1a 100644 (file)
@@ -61,7 +61,7 @@ struct tx_arg {
        int                      index;
        union {
                struct {
        int                      index;
        union {
                struct {
-                       const struct dt_rec     *rec;
+                       struct dt_insert_rec     rec;
                        const struct dt_key     *key;
                } insert;
                struct {
                        const struct dt_key     *key;
                } insert;
                struct {
@@ -136,6 +136,7 @@ struct tgt_thread_info {
                } update;
        } tti_u;
        struct lfsck_request tti_lr;
                } update;
        } tti_u;
        struct lfsck_request tti_lr;
+       struct dt_insert_rec tti_rec;
 };
 
 extern struct lu_context_key tgt_thread_key;
 };
 
 extern struct lu_context_key tgt_thread_key;
@@ -193,12 +194,12 @@ int out_handle(struct tgt_session_info *tsi);
 #define out_tx_ref_del(info, obj, th, reply, idx) \
        __out_tx_ref_del(info, obj, th, reply, idx, __FILE__, __LINE__)
 
 #define out_tx_ref_del(info, obj, th, reply, idx) \
        __out_tx_ref_del(info, obj, th, reply, idx, __FILE__, __LINE__)
 
-#define out_tx_index_insert(info, obj, th, name, fid, reply, idx) \
-       __out_tx_index_insert(info, obj, th, name, fid, reply, idx, \
+#define out_tx_index_insert(info, obj, rec, key, th, reply, idx) \
+       __out_tx_index_insert(info, obj, rec, key, th, reply, idx, \
                              __FILE__, __LINE__)
 
                              __FILE__, __LINE__)
 
-#define out_tx_index_delete(info, obj, th, name, reply, idx) \
-       __out_tx_index_delete(info, obj, th, name, reply, idx, \
+#define out_tx_index_delete(info, obj, key, th, reply, idx) \
+       __out_tx_index_delete(info, obj, key, th, reply, idx, \
                              __FILE__, __LINE__)
 
 #define out_tx_destroy(info, obj, th, reply, idx) \
                              __FILE__, __LINE__)
 
 #define out_tx_destroy(info, obj, th, reply, idx) \