Whamcloud - gitweb
Land b_head_interop_disk on HEAD (20081119_1314)
author deshmukh <deshmukh>
Wed, 19 Nov 2008 08:55:54 +0000 (08:55 +0000)
committer deshmukh <deshmukh>
Wed, 19 Nov 2008 08:55:54 +0000 (08:55 +0000)
b=11826
i=nikita
i=adilger

60 files changed:
lustre/ChangeLog
lustre/cmm/cmm_device.c
lustre/fid/fid_handler.c
lustre/fid/fid_lib.c
lustre/fid/fid_store.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/include/dt_object.h
lustre/include/linux/lvfs.h
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_disk.h
lustre/include/lustre_fid.h
lustre/include/lustre_fld.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_lov.h
lustre/llite/llite_fid.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/xattr.c
lustre/lov/lov_obd.c
lustre/lov/lov_request.c
lustre/lvfs/lvfs_linux.c
lustre/mdd/mdd_device.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_object.c
lustre/mdd/mdd_orphans.c
lustre/mdd/mdd_trans.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_lov.c
lustre/mdt/mdt_capa.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/obdclass/Makefile.in
lustre/obdclass/dt_object.c
lustre/obdclass/lu_object.c
lustre/obdclass/md_local_object.c [new file with mode: 0644]
lustre/obdclass/obd_config.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/lproc_obdfilter.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/osd/osd_handler.c
lustre/osd/osd_internal.h
lustre/osd/osd_oi.c
lustre/osd/osd_oi.h
lustre/tests/cfg/lmv.sh
lustre/tests/conf-sanity.sh
lustre/tests/disk1_8.tgz [new file with mode: 0644]
lustre/tests/test-framework.sh
lustre/utils/mkfs_lustre.c

index 0026ea9..1a55ea8 100644 (file)
@@ -13,6 +13,11 @@ tbd  Sun Microsystems, Inc.
         removed cwd "./" (refer to Bugzilla 14399).
        * File join has been disabled in this release, refer to Bugzilla 16929.
  
+
+Severity   : enhancement
+Bugzilla   : 11826
+Description: Interoperability at server side (Disk interoperability)
+
 Severity   : enhancement
 Bugzilla   : 17458
 Description: Update to SLES10 SP2 kernel-2.6.16.60-0.31.
index 839322a..a74fdf4 100644 (file)
@@ -613,10 +613,24 @@ static int cmm_recovery_complete(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int cmm_prepare(const struct lu_env *env,
+                       struct lu_device *pdev,
+                       struct lu_device *dev)
+{
+        struct cmm_device *cmm = lu2cmm_dev(dev);
+        struct lu_device *next = md2lu_dev(cmm->cmm_child);
+        int rc;
+
+        ENTRY;
+        rc = next->ld_ops->ldo_prepare(env, dev, next);
+        RETURN(rc);
+}
+
 static const struct lu_device_operations cmm_lu_ops = {
        .ldo_object_alloc      = cmm_object_alloc,
         .ldo_process_config    = cmm_process_config,
-        .ldo_recovery_complete = cmm_recovery_complete
+        .ldo_recovery_complete = cmm_recovery_complete,
+        .ldo_prepare           = cmm_prepare,
 };
 
 /* --- lu_device_type operations --- */
index 774aacc..2b28571 100644 (file)
@@ -556,6 +556,18 @@ EXPORT_SYMBOL(seq_server_fini);
 
 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
 
+static struct lu_local_obj_desc llod_seq_srv = {
+        .llod_name      = LUSTRE_SEQ_SRV_NAME,
+        .llod_oid       = FID_SEQ_SRV_OID,
+        .llod_is_index  = 0,
+};
+
+static struct lu_local_obj_desc llod_seq_ctl = {
+        .llod_name      = LUSTRE_SEQ_CTL_NAME,
+        .llod_oid       = FID_SEQ_CTL_OID,
+        .llod_is_index  = 0,
+};
+
 static int __init fid_mod_init(void)
 {
         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
@@ -564,6 +576,9 @@ static int __init fid_mod_init(void)
         if (IS_ERR(seq_type_proc_dir))
                 return PTR_ERR(seq_type_proc_dir);
 
+        llo_local_obj_register(&llod_seq_srv);
+        llo_local_obj_register(&llod_seq_ctl);
+
         LU_CONTEXT_KEY_INIT(&seq_thread_key);
         lu_context_key_register(&seq_thread_key);
         return 0;
index 254a4e1..694ee78 100644 (file)
@@ -69,6 +69,7 @@
  * </pre>
  *
  * The first 0x400 sequences of normal FID are reserved for special purpose.
+ * FID_SEQ_START + 1 is for local file id generation.
  */
 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
         FID_SEQ_START + 0x400ULL,
index 7a827da..de4bec3 100644 (file)
@@ -167,7 +167,7 @@ int seq_store_init(struct lu_server_seq *seq,
         name = seq->lss_type == LUSTRE_SEQ_SERVER ?
                 LUSTRE_SEQ_SRV_NAME : LUSTRE_SEQ_CTL_NAME;
 
-        dt_obj = dt_store_open(env, dt, name, &fid);
+        dt_obj = dt_store_open(env, dt, "", name, &fid);
         if (!IS_ERR(dt_obj)) {
                 seq->lss_obj = dt_obj;
                rc = 0;
index 3138a54..a5809bc 100644 (file)
@@ -63,6 +63,7 @@
 #include <lprocfs_status.h>
 
 #include <md_object.h>
+#include <lustre_fid.h>
 #include <lustre_req_layout.h>
 #include "fld_internal.h"
 
@@ -76,6 +77,13 @@ LU_CONTEXT_KEY_DEFINE(fld, LCT_MD_THREAD|LCT_DT_THREAD);
 
 cfs_proc_dir_entry_t *fld_type_proc_dir = NULL;
 
+static struct lu_local_obj_desc llod_fld_index = {
+        .llod_name      = fld_index_name,
+        .llod_oid       = FLD_INDEX_OID,
+        .llod_is_index  = 1,
+        .llod_feat      = &fld_index_features,
+};
+
 static int __init fld_mod_init(void)
 {
         fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME,
@@ -84,6 +92,8 @@ static int __init fld_mod_init(void)
         if (IS_ERR(fld_type_proc_dir))
                 return PTR_ERR(fld_type_proc_dir);
 
+        llo_local_obj_register(&llod_fld_index);
+
         LU_CONTEXT_KEY_INIT(&fld_thread_key);
         lu_context_key_register(&fld_thread_key);
         return 0;
index aba0bb0..1b927ea 100644 (file)
 #include "fld_internal.h"
 
 const char fld_index_name[] = "fld";
+EXPORT_SYMBOL(fld_index_name);
 
-static const struct dt_index_features fld_index_features = {
+const struct dt_index_features fld_index_features = {
         .dif_flags       = DT_IND_UPDATE,
         .dif_keysize_min = sizeof(seqno_t),
         .dif_keysize_max = sizeof(seqno_t),
         .dif_recsize_min = sizeof(mdsno_t),
-        .dif_recsize_max = sizeof(mdsno_t)
+        .dif_recsize_max = sizeof(mdsno_t),
+        .dif_ptrsize     = 4
 };
 
+EXPORT_SYMBOL(fld_index_features);
+
 /*
  * number of blocks to reserve for particular operations. Should be function of
  * ... something. Stub for now.
@@ -173,8 +177,11 @@ int fld_index_lookup(struct lu_server_fld *fld,
 
         rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec,
                                               fld_key(env, seq), BYPASS_CAPA);
-        if (rc == 0)
+        if (rc > 0) {
                 *mds = be64_to_cpu(*(__u64 *)rec);
+                rc = 0;
+        } else
+                rc = -ENOENT;
         RETURN(rc);
 }
 
@@ -187,7 +194,7 @@ int fld_index_init(struct lu_server_fld *fld,
         int rc;
         ENTRY;
 
-        dt_obj = dt_store_open(env, dt, fld_index_name, &fid);
+        dt_obj = dt_store_open(env, dt, "", fld_index_name, &fid);
         if (!IS_ERR(dt_obj)) {
                 fld->lsf_obj = dt_obj;
                 rc = dt_obj->do_ops->do_index_try(env, dt_obj,
index 0cd80c4..536273d 100644 (file)
@@ -171,6 +171,8 @@ struct dt_index_features {
         size_t dif_recsize_min;
         /** maximal required record size, 0 if no limit */
         size_t dif_recsize_max;
+        /** pointer size for record */
+        size_t dif_ptrsize;
 };
 
 enum dt_index_flags {
@@ -196,11 +198,51 @@ extern const struct dt_index_features dt_directory_features;
  * It can contain any allocation hint in the future.
  */
 struct dt_allocation_hint {
-        struct dt_object *dah_parent;
-        __u32             dah_mode;
+        struct dt_object           *dah_parent;
+        __u32                       dah_mode;
 };
 
 /**
+ * object type specifier.
+ */
+
+enum dt_format_type {
+        DFT_REGULAR,
+        DFT_DIR,
+        /** for mknod */
+        DFT_NODE,
+        /** for special index */
+        DFT_INDEX,
+        /** for symbolic link */
+        DFT_SYM,
+};
+
+/**
+ * object format specifier.
+ */
+struct dt_object_format {
+        /** type for dt object */
+        enum dt_format_type dof_type;
+        union {
+                struct dof_regular {
+                } dof_reg;
+                struct dof_dir {
+                } dof_dir;
+                struct dof_node {
+                } dof_node;
+                /**
+                 * a special index needs its features passed as a parameter
+                 * in order to be created
+                 */
+                struct dof_index {
+                        const struct dt_index_features *di_feat;
+                } dof_idx;
+        } u;
+};
+
+enum dt_format_type dt_mode_to_dft(__u32 mode);
+
+/**
  * Per-dt-object operations.
  */
 struct dt_object_operations {
@@ -297,6 +339,7 @@ struct dt_object_operations {
         int   (*do_create)(const struct lu_env *env, struct dt_object *dt,
                            struct lu_attr *attr,
                            struct dt_allocation_hint *hint,
+                           struct dt_object_format *dof,
                            struct thandle *th);
 
         /**
@@ -397,7 +440,7 @@ struct dt_index_operations {
                  * precondition: dt_object_exists(dt);
                  */
                 struct dt_it *(*init)(const struct lu_env *env,
-                                      struct dt_object *dt, int writable,
+                                      struct dt_object *dt,
                                       struct lustre_capa *capa);
                 void          (*fini)(const struct lu_env *env,
                                       struct dt_it *di);
@@ -406,8 +449,6 @@ struct dt_index_operations {
                                       const struct dt_key *key);
                 void           (*put)(const struct lu_env *env,
                                       struct dt_it *di);
-                int            (*del)(const struct lu_env *env,
-                                      struct dt_it *di, struct thandle *th);
                 int           (*next)(const struct lu_env *env,
                                       struct dt_it *di);
                 struct dt_key *(*key)(const struct lu_env *env,
@@ -536,10 +577,30 @@ int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn);
 int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn);
 
 int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj);
+
+/**
+ * Callback function used for parsing path.
+ * \see llo_store_resolve
+ */
+typedef int (*dt_entry_func_t)(const struct lu_env *env,
+                            const char *name,
+                            void *pvt);
+
+#define DT_MAX_PATH 1024
+
+int dt_path_parser(const struct lu_env *env,
+                   char *local, dt_entry_func_t entry_func,
+                   void *data);
+
 struct dt_object *dt_store_open(const struct lu_env *env,
-                                struct dt_device *dt, const char *name,
+                                struct dt_device *dt,
+                                const char *dirname,
+                                const char *filename,
                                 struct lu_fid *fid);
 
-/** @} dt */
+struct dt_object *dt_locate(const struct lu_env *env,
+                            struct dt_device *dev,
+                            const struct lu_fid *fid);
 
+/** @} dt */
 #endif /* __LUSTRE_DT_OBJECT_H */
index 26959b5..17576c3 100644 (file)
@@ -96,7 +96,7 @@ struct lvfs_run_ctxt {
 #ifdef __KERNEL__
 
 struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt, 
-                            char *name, int mode, int fix);
+                            const char *name, int mode, int fix);
 struct dentry *simple_mknod(struct dentry *dir, char *name, int mode, int fix);
 int lustre_rename(struct dentry *dir, struct vfsmount *mnt, char *oldname,
                   char *newname);
index 9c2f283..1b00b02 100644 (file)
@@ -156,6 +156,16 @@ struct lu_device_operations {
         int (*ldo_recovery_complete)(const struct lu_env *,
                                      struct lu_device *);
 
+        /**
+         * Initialize local objects for the device. This method is called after
+         * the layer has been initialized (after the LCFG_SETUP stage) and before
+         * it starts serving user requests.
+         */
+
+        int (*ldo_prepare)(const struct lu_env *,
+                           struct lu_device *parent,
+                           struct lu_device *dev);
+
 };
 
 /**
@@ -1268,8 +1278,8 @@ int lu_site_stats_print(const struct lu_site *s, char *page, int count);
  * Common name structure to be passed around for various name related methods.
  */
 struct lu_name {
-        char    *ln_name;
-        int      ln_namelen;
+        const char    *ln_name;
+        int            ln_namelen;
 };
 
 /**
@@ -1320,5 +1330,4 @@ int  lu_kmem_init(struct lu_kmem_descr *caches);
 void lu_kmem_fini(struct lu_kmem_descr *caches);
 
 /** @} lu */
-
 #endif /* __LUSTRE_LU_OBJECT_H */
index 81fd3b7..59fcca8 100644 (file)
@@ -269,10 +269,24 @@ struct lu_fid {
 };
 
 /**
+ * The following struct holds MDT attributes that will be kept in the inode's EA.
+ * Introduced in the 2.0 release (please see b15993 for details).
+ */
+struct lustre_mdt_attrs {
+        /** FID of this inode */
+        struct lu_fid  lma_self_fid;
+        /** SOM state, mdt/ost type, others */
+        __u64   lma_flags;
+        /** total sectors in objects */
+        __u64   lma_som_sectors;
+};
+
+
+/**
  * fid constants
  */
 enum {
-        /* initial fid id value */
+        /** initial fid id value */
         LUSTRE_FID_INIT_OID  = 1UL
 };
 
index 0e253d0..6e0a0f6 100644 (file)
 
 #define MDT_LOGS_DIR      "LOGS"  /* COMPAT_146 */
 #define MOUNT_CONFIGS_DIR "CONFIGS"
-/* Persistent mount data are stored on the disk in this file. */
-#define MOUNT_DATA_FILE    MOUNT_CONFIGS_DIR"/mountdata"
-#define LAST_RCVD         "last_received"
+#define CONFIGS_FILE      "mountdata"
+/** Persistent mount data are stored on the disk in this file. */
+#define MOUNT_DATA_FILE    MOUNT_CONFIGS_DIR"/"CONFIGS_FILE
+#define LAST_RCVD         "last_rcvd"
 #define LOV_OBJID         "lov_objid"
 #define HEALTH_CHECK      "health_check"
 #define CAPA_KEYS         "capa_keys"
 #define LDD_F_SV_TYPE_MDT   0x0001
 #define LDD_F_SV_TYPE_OST   0x0002
 #define LDD_F_SV_TYPE_MGS   0x0004
-#define LDD_F_NEED_INDEX    0x0010 /* need an index assignment */
-#define LDD_F_VIRGIN        0x0020 /* never registered */
-#define LDD_F_UPDATE        0x0040 /* update the config logs for this server*/
-#define LDD_F_REWRITE_LDD   0x0080 /* rewrite the LDD */
-#define LDD_F_WRITECONF     0x0100 /* regenerate all logs for this fs */
-#define LDD_F_UPGRADE14     0x0200 /* COMPAT_14 */
-#define LDD_F_PARAM         0x0400 /* process as lctl conf_param */
+/** need an index assignment */
+#define LDD_F_NEED_INDEX    0x0010
+/** never registered */
+#define LDD_F_VIRGIN        0x0020
+/** update the config logs for this server*/
+#define LDD_F_UPDATE        0x0040
+/** rewrite the LDD */
+#define LDD_F_REWRITE_LDD   0x0080
+/** regenerate all logs for this fs */
+#define LDD_F_WRITECONF     0x0100
+/** COMPAT_14 */
+#define LDD_F_UPGRADE14     0x0200
+/** process as lctl conf_param */
+#define LDD_F_PARAM         0x0400
+/** backend fs makes use of the IAM directory format. */
+#define LDD_F_IAM_DIR       0x0800
 
 enum ldd_mount_type {
         LDD_MT_EXT3 = 0,
@@ -196,17 +206,28 @@ struct lustre_mount_data {
 #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8)
 #endif
 
-/* COMPAT_146 */
-#define OBD_COMPAT_OST          0x00000002 /* this is an OST (temporary) */
-#define OBD_COMPAT_MDT          0x00000004 /* this is an MDT (temporary) */
-/* end COMPAT_146 */
-
-#define OBD_ROCOMPAT_LOVOBJID   0x00000001 /* MDS handles LOV_OBJID file */
-
-#define OBD_INCOMPAT_GROUPS     0x00000001 /* OST handles group subdirs */
-#define OBD_INCOMPAT_OST        0x00000002 /* this is an OST */
-#define OBD_INCOMPAT_MDT        0x00000004 /* this is an MDT */
-#define OBD_INCOMPAT_COMMON_LR  0x00000008 /* common last_rvcd format */
+/** COMPAT_146: this is an OST (temporary) */
+#define OBD_COMPAT_OST          0x00000002
+/** COMPAT_146: this is an MDT (temporary) */
+#define OBD_COMPAT_MDT          0x00000004
+
+/** MDS handles LOV_OBJID file */
+#define OBD_ROCOMPAT_LOVOBJID   0x00000001
+
+/** OST handles group subdirs */
+#define OBD_INCOMPAT_GROUPS     0x00000001
+/** this is an OST */
+#define OBD_INCOMPAT_OST        0x00000002
+/** this is an MDT */
+#define OBD_INCOMPAT_MDT        0x00000004
+/** common last_rcvd format */
+#define OBD_INCOMPAT_COMMON_LR  0x00000008
+/** FID is enabled */
+#define OBD_INCOMPAT_FID        0x00000010
+/**
+ * lustre disk using iam format to store directory entries
+ */
+#define OBD_INCOMPAT_IAM_DIR    0x00000020
 
 
 /* Data stored per server at the head of the last_rcvd file.  In le32 order.
index 7133abd..470feae 100644 (file)
@@ -79,6 +79,38 @@ enum {
         LUSTRE_SEQ_SUPER_WIDTH = (LUSTRE_SEQ_META_WIDTH * LUSTRE_SEQ_META_WIDTH)
 };
 
+/** special fid seq: used for local object create. */
+#define FID_SEQ_LOCAL_FILE      (FID_SEQ_START + 1)
+
+/** special OID for local objects */
+enum {
+        /** \see osd_oi_index_create */
+        OSD_OI_FID_SMALL_OID    = 1UL,
+        OSD_OI_FID_OTHER_OID    = 2UL,
+        /** \see fld_mod_init */
+        FLD_INDEX_OID           = 3UL,
+        /** \see fid_mod_init */
+        FID_SEQ_CTL_OID         = 4UL,
+        FID_SEQ_SRV_OID         = 5UL,
+        /** \see mdd_mod_init */
+        MDD_ROOT_INDEX_OID      = 6UL,
+        MDD_ORPHAN_OID          = 7UL,
+        MDD_LOV_OBJ_OID         = 8UL,
+        MDD_CAPA_KEYS_OID       = 9UL,
+        MDD_OBJECTS_OID         = 10UL,
+        /** \see mdt_mod_init */
+        MDT_LAST_RECV_OID       = 11UL,
+        /** \see osd_mod_init */
+        OSD_REM_OBJ_DIR_OID     = 12UL,
+};
+
+static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
+{
+        fid->f_seq = FID_SEQ_LOCAL_FILE;
+        fid->f_oid = oid;
+        fid->f_ver = 0;
+}
+
 enum lu_mgr_type {
         LUSTRE_SEQ_SERVER,
         LUSTRE_SEQ_CONTROLLER
index a65408f..ec8be4f 100644 (file)
 struct lu_client_fld;
 struct lu_server_fld;
 
+extern const struct dt_index_features fld_index_features;
+extern const char fld_index_name[];
+
+
 struct fld_stats {
         __u64   fst_count;
         __u64   fst_cache;
index ecc92dc..5a20550 100644 (file)
@@ -183,6 +183,9 @@ struct md_op_spec {
 
         /** Check for split */
         int        sp_ck_split;
+
+        /** to create directory */
+        const struct dt_index_features *sp_feat;
 };
 
 /**
@@ -802,6 +805,51 @@ static inline int mdo_rename_tgt(const struct lu_env *env,
         }
 }
 
-/** @} md */
+struct dt_device;
+/**
+ * Structure to hold object information. This is used to create object
+ */
+struct lu_local_obj_desc {
+        const char                      *llod_name;
+        __u32                            llod_oid;
+        int                              llod_is_index;
+        const struct dt_index_features * llod_feat;
+        struct list_head                 llod_linkage;
+};
+
+struct md_object *llo_store_resolve(const struct lu_env *env,
+                                    struct md_device *md,
+                                    struct dt_device *dt,
+                                    const char *path,
+                                    struct lu_fid *fid);
+
+struct md_object *llo_store_open(const struct lu_env *env,
+                                 struct md_device *md,
+                                 struct dt_device *dt,
+                                 const char *dirname,
+                                 const char *objname,
+                                 struct lu_fid *fid);
+
+struct md_object *llo_store_create_index(const struct lu_env *env,
+                                         struct md_device *md,
+                                         struct dt_device *dt,
+                                         const char *dirname,
+                                         const char *objname,
+                                         const struct lu_fid *fid,
+                                         const struct dt_index_features *feat);
+
+struct md_object *llo_store_create(const struct lu_env *env,
+                                   struct md_device *md,
+                                   struct dt_device *dt,
+                                   const char *dirname,
+                                   const char *objname,
+                                   const struct lu_fid *fid);
+
+int llo_local_obj_register(struct lu_local_obj_desc *);
+
+int llo_local_objects_setup(const struct lu_env *env,
+                             struct md_device * md,
+                             struct dt_device * dt);
 
+/** @} md */
 #endif /* _LINUX_MD_OBJECT_H */
index 5dd346e..2d499b6 100644 (file)
@@ -891,11 +891,40 @@ struct target_recovery_data {
 };
 
 enum filter_groups {
+        FILTER_GROUP_MDS0 = 0,
         FILTER_GROUP_LLOG = 1,
-        FILTER_GROUP_ECHO,
-        FILTER_GROUP_MDS0
+        FILTER_GROUP_ECHO = 2 ,
+        FILTER_GROUP_MDS1_N_BASE = 3
 };
 
+static inline __u64 obdo_mdsno(struct obdo *oa)
+{
+        if (oa->o_gr)
+                return oa->o_gr - FILTER_GROUP_MDS1_N_BASE;
+        return 0;
+}
+
+static inline int mdt_to_obd_objgrp(int mdtid)
+{
+        if (mdtid)
+                return FILTER_GROUP_MDS1_N_BASE + mdtid;
+        return 0;
+}
+
+/**
+  * In HEAD for CMD, objects are created in groups numbered 3 and above,
+  * i.e. group indexing starts from 3, and assertions were added to enforce
+  * this by disallowing group 0. But to run a 2.0 MDS server on a 1.8.x disk
+  * format (i.e. interop mode), objects in group 0 need to be allowed.
+  * So for interop mode the following changes need to be done:
+  * 1. Do not assert on group 0, i.e. allow group 0
+  * 2. The group number indexing starts from 0 instead of 3
+  */
+
+#define CHECK_MDS_GROUP(group)          (group == FILTER_GROUP_MDS0 || \
+                                         group > FILTER_GROUP_MDS1_N_BASE)
+#define LASSERT_MDS_GROUP(group)        LASSERT(CHECK_MDS_GROUP(group))
+
 struct obd_llog_group {
         struct list_head   olg_list;
         int                olg_group;
@@ -1545,7 +1574,7 @@ static inline void init_obd_quota_ops(quota_interface_t *interface,
 
 static inline __u64 oinfo_mdsno(struct obd_info *oinfo)
 {
-        return oinfo->oi_oa->o_gr - FILTER_GROUP_MDS0;
+        return obdo_mdsno(oinfo->oi_oa);
 }
 
 static inline struct lustre_capa *oinfo_capa(struct obd_info *oinfo)
index 8fe3e6c..385fbd9 100644 (file)
@@ -433,10 +433,15 @@ static inline int obd_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 
         ldt = obd->obd_type->typ_lu;
         if (ldt != NULL) {
+                struct lu_context  session_ctx;
                 struct lu_env env;
+                lu_context_init(&session_ctx, LCT_SESSION);
+                session_ctx.lc_thread = NULL;
+                lu_context_enter(&session_ctx);
 
                 rc = lu_env_init(&env, ldt->ldt_ctx_tags);
                 if (rc == 0) {
+                        env.le_ses = &session_ctx;
                         d = ldt->ldt_ops->ldto_device_alloc(&env, ldt, cfg);
                         lu_env_fini(&env);
                         if (!IS_ERR(d)) {
@@ -446,6 +451,9 @@ static inline int obd_setup(struct obd_device *obd, struct lustre_cfg *cfg)
                         } else
                                 rc = PTR_ERR(d);
                 }
+                lu_context_exit(&session_ctx);
+                lu_context_fini(&session_ctx);
+
         } else {
                 OBD_CHECK_DT_OP(obd, setup, -EOPNOTSUPP);
                 OBD_COUNTER_INCREMENT(obd, setup);
index da3ca51..b4de8d2 100644 (file)
@@ -52,7 +52,6 @@ static inline int lov_mds_md_size(int stripes, int lmm_magic)
                         stripes * sizeof(struct lov_ost_data_v1);
 }
 
-
 #define IOC_LOV_TYPE                   'g'
 #define IOC_LOV_MIN_NR                 50
 #define IOC_LOV_SET_OSC_ACTIVE         _IOWR('g', 50, long)
index 15c4021..eab0e84 100644 (file)
@@ -77,3 +77,15 @@ ino_t ll_fid_build_ino(struct ll_sb_info *sbi,
         ino = ino | 0x80000000;
         RETURN(ino);
 }
+
+__u32 ll_fid_build_gen(struct ll_sb_info *sbi,
+                       struct lu_fid *fid)
+{
+        __u32 gen = 0;
+        ENTRY;
+
+        if (fid_is_igif(fid)) {
+                gen = lu_igif_gen(fid);
+        }
+        RETURN(gen);
+}
index 2c6153b..22b5981 100644 (file)
@@ -964,6 +964,7 @@ int lustre_check_remote_perm(struct inode *inode, int mask);
 
 /* llite/llite_fid.c */
 ino_t ll_fid_build_ino(struct ll_sb_info *sbi, struct lu_fid *fid);
+__u32 ll_fid_build_gen(struct ll_sb_info *sbi, struct lu_fid *fid);
 
 /* llite/llite_capa.c */
 extern cfs_timer_t ll_capa_timer;
index 4542588..a2a6c3e 100644 (file)
@@ -1619,6 +1619,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
         }
 #endif
         inode->i_ino = ll_fid_build_ino(sbi, &body->fid1);
+        inode->i_generation = ll_fid_build_gen(sbi, &body->fid1);
 
         if (body->valid & OBD_MD_FLATIME &&
             body->atime > LTIME_S(inode->i_atime))
index e2835d9..d66a025 100644 (file)
@@ -238,7 +238,9 @@ int ll_setxattr(struct dentry *dentry, const char *name,
                 }
 
                 return rc;
-        }
+
+        } else if (strcmp(name, "trusted.lma") == 0) /* b17288: ignore common_ea */
+                return 0;
 
         return ll_setxattr_common(inode, name, value, size, flags,
                                   OBD_MD_FLXATTR);
index ea90841..7e49a54 100644 (file)
@@ -1692,7 +1692,7 @@ static int lov_change_cbdata(struct obd_export *exp,
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
 
-        LASSERT(lsm->lsm_object_gr > 0);
+        LASSERT_MDS_GROUP(lsm->lsm_object_gr);
 
         lov = &exp->exp_obd->u.lov;
         for (i = 0; i < lsm->lsm_stripe_count; i++) {
@@ -1730,7 +1730,7 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
 
-        LASSERT(lsm->lsm_object_gr > 0);
+        LASSERT_MDS_GROUP(lsm->lsm_object_gr);
         LASSERT(lockh);
         lov = &exp->exp_obd->u.lov;
         rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
@@ -1786,7 +1786,7 @@ static int lov_cancel_unused(struct obd_export *exp,
 
         ASSERT_LSM_MAGIC(lsm);
 
-        LASSERT(lsm->lsm_object_gr > 0);
+        LASSERT_MDS_GROUP(lsm->lsm_object_gr);
         for (i = 0; i < lsm->lsm_stripe_count; i++) {
                 struct lov_stripe_md submd;
                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
index 176968f..2c8c0ad 100644 (file)
@@ -1198,8 +1198,11 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
-                LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
-                                || req->rq_oi.oi_oa->o_gr>0);
+                LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
+                         CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
+                         "req->rq_oi.oi_oa->o_valid="LPX64" "
+                         "req->rq_oi.oi_oa->o_gr="LPU64"\n",
+                         req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
index f855ca5..5d07875 100644 (file)
@@ -230,8 +230,8 @@ out_up:
 EXPORT_SYMBOL(simple_mknod);
 
 /* utility to make a directory */
-struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt,
-                            char *name, int mode, int fix)
+struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt, 
+                            const char *name, int mode, int fix)
 {
         struct dentry *dchild;
         int err = 0;
index 26a905f..3e7f17a 100644 (file)
@@ -53,6 +53,8 @@
 #include <obd_support.h>
 #include <lprocfs_status.h>
 
+#include <lustre_disk.h>
+#include <lustre_fid.h>
 #include <linux/ldiskfs_fs.h>
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
@@ -62,7 +64,8 @@
 
 const struct md_device_operations mdd_ops;
 
-static const char *mdd_root_dir_name = "root";
+static const char mdd_root_dir_name[] = "ROOT";
+
 static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
                            const char *name, struct lu_device *next)
 {
@@ -99,25 +102,6 @@ static struct lu_device *mdd_device_fini(const struct lu_env *env,
         return next;
 }
 
-static int mdd_mount(const struct lu_env *env, struct mdd_device *mdd)
-{
-        int rc;
-        struct dt_object *root;
-        ENTRY;
-
-        dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
-        root = dt_store_open(env, mdd->mdd_child, mdd_root_dir_name,
-                             &mdd->mdd_root_fid);
-        if (!IS_ERR(root)) {
-                LASSERT(root != NULL);
-                lu_object_put(env, &root->do_lu);
-                rc = orph_index_init(env, mdd);
-        } else
-                rc = PTR_ERR(root);
-
-        RETURN(rc);
-}
-
 static void mdd_device_shutdown(const struct lu_env *env,
                                 struct mdd_device *m, struct lustre_cfg *cfg)
 {
@@ -162,9 +146,6 @@ static int mdd_process_config(const struct lu_env *env,
                         CERROR("lov init error %d \n", rc);
                         GOTO(out, rc);
                 }
-                rc = mdd_mount(env, m);
-                if (rc)
-                        GOTO(out, rc);
                 rc = mdd_txn_init_credits(env, m);
                 break;
         case LCFG_CLEANUP:
@@ -243,10 +224,39 @@ static int mdd_recovery_complete(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int mdd_prepare(const struct lu_env *env,
+                       struct lu_device *pdev,
+                       struct lu_device *cdev)
+{
+        struct mdd_device *mdd = lu2mdd_dev(cdev);
+        struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
+        struct dt_object *root;
+        int rc;
+
+        ENTRY;
+        rc = next->ld_ops->ldo_prepare(env, cdev, next);
+        if (rc)
+                GOTO(out, rc);
+
+        dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
+        root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name,
+                             &mdd->mdd_root_fid);
+        if (!IS_ERR(root)) {
+                LASSERT(root != NULL);
+                lu_object_put(env, &root->do_lu);
+                rc = orph_index_init(env, mdd);
+        } else
+                rc = PTR_ERR(root);
+
+out:
+        RETURN(rc);
+}
+
 const struct lu_device_operations mdd_lu_ops = {
         .ldo_object_alloc      = mdd_object_alloc,
         .ldo_process_config    = mdd_process_config,
-        .ldo_recovery_complete = mdd_recovery_complete
+        .ldo_recovery_complete = mdd_recovery_complete,
+        .ldo_prepare           = mdd_prepare,
 };
 
 /*
@@ -465,10 +475,35 @@ static void mdd_key_fini(const struct lu_context *ctx,
 /* context key: mdd_thread_key */
 LU_CONTEXT_KEY_DEFINE(mdd, LCT_MD_THREAD);
 
+static struct lu_local_obj_desc llod_capa_key = {
+        .llod_name      = CAPA_KEYS,
+        .llod_oid       = MDD_CAPA_KEYS_OID,
+        .llod_is_index  = 0,    /* the only non-index descriptor here */
+};
+/* orphan index (orph_index_name), uses dt_directory_features */
+static struct lu_local_obj_desc llod_mdd_orphan = {
+        .llod_name      = orph_index_name,
+        .llod_oid       = MDD_ORPHAN_OID,
+        .llod_is_index  = 1,
+        .llod_feat      = &dt_directory_features,
+};
+/* root directory index (mdd_root_dir_name), same features */
+static struct lu_local_obj_desc llod_mdd_root = {
+        .llod_name      = mdd_root_dir_name,
+        .llod_oid       = MDD_ROOT_INDEX_OID,
+        .llod_is_index  = 1,
+        .llod_feat      = &dt_directory_features,
+};
+
 static int __init mdd_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
         lprocfs_mdd_init_vars(&lvars);
+        /* register MDD local object descriptors ahead of the obd type */
+        llo_local_obj_register(&llod_capa_key);
+        llo_local_obj_register(&llod_mdd_orphan);
+        llo_local_obj_register(&llod_mdd_root);
+
         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
                                    LUSTRE_MDD_NAME, &mdd_device_type);
 }
index 7450c1e..7fe28f3 100644 (file)
@@ -75,7 +75,7 @@ static int
 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
                     const struct lu_name *lname, struct lu_fid* fid, int mask)
 {
-        char *name = lname->ln_name;
+        const char *name = lname->ln_name;
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
         struct dynlock_handle *dlh;
         int rc;
@@ -232,7 +232,7 @@ static int mdd_dir_is_empty(const struct lu_env *env,
                 RETURN(-ENOTDIR);
 
         iops = &obj->do_index_ops->dio_it;
-        it = iops->init(env, obj, 0, BYPASS_CAPA);
+        it = iops->init(env, obj, BYPASS_CAPA);
         if (it != NULL) {
                 result = iops->get(env, it, (const void *)"");
                 if (result > 0) {
@@ -458,15 +458,6 @@ int mdd_link_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
-                                   const struct lu_fid *fid)
-{
-        struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
-
-        fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
-        return (const struct dt_rec *)pack;
-}
-
 /**
  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
@@ -590,7 +581,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                     struct md_object *src_obj, const struct lu_name *lname,
                     struct md_attr *ma)
 {
-        char *name = lname->ln_name;
+        const char *name = lname->ln_name;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
@@ -682,23 +673,28 @@ int mdd_finish_unlink(const struct lu_env *env,
                       struct thandle *th)
 {
         int rc;
+        int reset = 1;
         ENTRY;
 
         rc = mdd_iattr_get(env, obj, ma);
         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
                 /* add new orphan and the object
-                 * will be deleted during the object_put() */
-                if (__mdd_orphan_add(env, obj, th) == 0)
-                        obj->mod_flags |= ORPHAN_OBJ;
+                 * will be deleted during mdd_close() */
+                if (obj->mod_count) {
+                        rc = __mdd_orphan_add(env, obj, th);
+                        if (rc == 0)
+                                obj->mod_flags |= ORPHAN_OBJ;
+                }
 
                 obj->mod_flags |= DEAD_OBJ;
-                if (obj->mod_count == 0)
+                if (!(obj->mod_flags & ORPHAN_OBJ)) {
                         rc = mdd_object_kill(env, obj, ma);
-                else
-                        /* clear MA_LOV | MA_COOKIE, if we do not
-                         * unlink it in case we get it somewhere */
-                        ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
-        } else
+                        if (rc == 0)
+                                reset = 0;
+                }
+
+        }
+        if (reset)
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
 
         RETURN(rc);
@@ -723,7 +719,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                       struct md_object *cobj, const struct lu_name *lname,
                       struct md_attr *ma)
 {
-        char *name = lname->ln_name;
+        const char *name = lname->ln_name;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
@@ -850,7 +846,7 @@ static int mdd_name_insert(const struct lu_env *env,
                            const struct lu_fid *fid,
                            const struct md_attr *ma)
 {
-        char *name = lname->ln_name;
+        const char *name = lname->ln_name;
         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
         struct mdd_device *mdd = mdo2mdd(pobj);
@@ -968,7 +964,7 @@ static int mdd_name_remove(const struct lu_env *env,
                            const struct lu_name *lname,
                            const struct md_attr *ma)
 {
-        char *name = lname->ln_name;
+        const char *name = lname->ln_name;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
         struct mdd_device *mdd = mdo2mdd(pobj);
@@ -1074,7 +1070,7 @@ static int mdd_rename_tgt(const struct lu_env *env,
                           const struct lu_fid *lf, const struct lu_name *lname,
                           struct md_attr *ma)
 {
-        char *name = lname->ln_name;
+        const char *name = lname->ln_name;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
@@ -1280,7 +1276,7 @@ static int
 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
              const struct lu_name *lname, struct lu_fid* fid, int mask)
 {
-        char                *name = lname->ln_name;
+        const char          *name = lname->ln_name;
         const struct dt_key *key = (const struct dt_key *)name;
         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
         struct mdd_device   *m = mdo2mdd(pobj);
@@ -1315,8 +1311,10 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                 rc = dir->do_index_ops->dio_lookup(env, dir,
                                                  (struct dt_rec *)pack, key,
                                                  mdd_object_capa(env, mdd_obj));
-                if (rc == 0)
+                if (rc > 0)
                         rc = fid_unpack(pack, fid);
+                else if (rc == 0)
+                        rc = -ENOENT;
         } else
                 rc = -ENOTDIR;
 
@@ -1325,7 +1323,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
 
 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                           struct mdd_object *child, struct md_attr *ma,
-                          struct thandle *handle)
+                          struct thandle *handle, const struct md_op_spec *spec)
 {
         int rc;
         ENTRY;
@@ -1469,7 +1467,7 @@ static int mdd_create(const struct lu_env *env,
         struct lov_mds_md      *lmm = NULL;
         struct thandle         *handle;
         struct dynlock_handle  *dlh;
-        char                   *name = lname->ln_name;
+        const char             *name = lname->ln_name;
         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
         int got_def_acl = 0;
 #ifdef HAVE_QUOTA_SUPPORT
@@ -1604,7 +1602,7 @@ static int mdd_create(const struct lu_env *env,
                 GOTO(out_trans, rc = -ENOMEM);
 
         mdd_write_lock(env, son, MOR_TGT_CHILD);
-        rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
+        rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec);
         if (rc) {
                 mdd_write_unlock(env, son);
                 GOTO(cleanup, rc);
@@ -1629,7 +1627,7 @@ static int mdd_create(const struct lu_env *env,
 #endif
 
         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
-                                   son, ma, handle);
+                                   son, ma, handle, spec);
         mdd_write_unlock(env, son);
         if (rc)
                 /*
@@ -1836,8 +1834,8 @@ static int mdd_rename(const struct lu_env *env,
                       struct md_object *tobj, const struct lu_name *ltname,
                       struct md_attr *ma)
 {
-        char *sname = lsname->ln_name;
-        char *tname = ltname->ln_name;
+        const char *sname = lsname->ln_name;
+        const char *tname = ltname->ln_name;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
@@ -1846,6 +1844,10 @@ static int mdd_rename(const struct lu_env *env,
         struct mdd_object *mdd_tobj = NULL;
         struct dynlock_handle *sdlh, *tdlh;
         struct thandle *handle;
+        const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
+        int is_dir;
+        int rc;
+
 #ifdef HAVE_QUOTA_SUPPORT
         struct obd_device *obd = mdd->mdd_obd_dev;
         struct mds_obd *mds = &obd->u.mds;
@@ -1854,7 +1856,6 @@ static int mdd_rename(const struct lu_env *env,
         unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
         int quota_opc = 0, rec_pending = 0;
 #endif
-        int rc, is_dir;
         ENTRY;
 
         LASSERT(ma->ma_attr.la_mode & S_IFMT);
@@ -1928,6 +1929,20 @@ static int mdd_rename(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
+        /* "mv dir1 dir2" needs "dir1/.." link update */
+        if (is_dir) {
+                rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle,
+                                        mdd_object_capa(env, mdd_spobj));
+                if (rc)
+                       GOTO(cleanup, rc);
+
+                rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot,
+                                        is_dir, handle,
+                                        mdd_object_capa(env, mdd_tpobj));
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
+
         /*
          * Here tobj can be remote one, so we do index_delete unconditionally
          * and -ENOENT is allowed.
index 16de10a..fce9bda 100644 (file)
@@ -135,13 +135,6 @@ struct mdd_object {
 #endif
 };
 
-struct orph_key {
-        /* fid of the object*/
-        struct lu_fid ok_fid;
-        /* type of operation: unlink, truncate */
-        __u32         ok_op;
-} __attribute__((packed));
-
 struct mdd_thread_info {
         struct txn_param          mti_param;
         struct lu_fid             mti_fid;
@@ -149,7 +142,7 @@ struct mdd_thread_info {
         struct md_attr            mti_ma;
         struct lu_attr            mti_la_for_fix;
         struct obd_info           mti_oi;
-        struct orph_key           mti_orph_key;
+        char                      mti_orph_key[NAME_MAX + 1];
         struct obd_trans_info     mti_oti;
         struct lu_buf             mti_buf;
         struct obdo               mti_oa;
@@ -161,9 +154,14 @@ struct mdd_thread_info {
         int                       mti_max_lmm_size;
         struct llog_cookie       *mti_max_cookie;
         int                       mti_max_cookie_size;
+        struct dt_object_format   mti_dof;
         struct obd_quotactl       mti_oqctl;
 };
 
+extern const char orph_index_name[];
+
+extern const struct dt_index_features orph_index_features;
+
 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
                                    struct mdd_device *mdd);
 
@@ -214,7 +212,8 @@ int mdd_attr_get_internal_locked(const struct lu_env *env,
                                  struct md_attr *ma);
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                                struct mdd_object *c, struct md_attr *ma,
-                               struct thandle *handle);
+                               struct thandle *handle,
+                               const struct md_op_spec *spec);
 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
                                        struct mdd_object *obj,
                                        struct lu_attr *attr,
@@ -262,7 +261,7 @@ int mdd_finish_unlink(const struct lu_env *env, struct mdd_object *obj,
                       struct md_attr *ma, struct thandle *th);
 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                           struct mdd_object *child, struct md_attr *ma,
-                          struct thandle *handle);
+                          struct thandle *handle, const struct md_op_spec *spec);
 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
                           const struct lu_name *lname, struct mdd_object *src_obj);
 /* mdd_lov.c */
@@ -348,6 +347,9 @@ int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
 int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj,
                                 struct md_attr *ma, enum mdd_txn_op);
 
+int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
+                    struct mdd_object *obj, struct lu_attr *la);
+
 static inline void mdd_object_put(const struct lu_env *env,
                                   struct mdd_object *o)
 {
@@ -475,6 +477,15 @@ static inline const struct lu_fid *mdo2fid(const struct mdd_object *obj)
         return lu_object_fid(&obj->mod_obj.mo_lu);
 }
 
+static inline const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
+                                                 const struct lu_fid *fid)
+{
+        struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
+        /* result lives in env's mti_pack, so a later call overwrites it */
+        fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
+        return (const struct dt_rec *)pack;
+}
+
 static inline umode_t mdd_object_type(const struct mdd_object *obj)
 {
         return lu_object_attr(&obj->mod_obj.mo_lu);
@@ -658,10 +669,11 @@ static inline
 int mdo_create_obj(const struct lu_env *env, struct mdd_object *o,
                    struct lu_attr *attr,
                    struct dt_allocation_hint *hint,
+                   struct dt_object_format *dof,
                    struct thandle *handle)
 {
         struct dt_object *next = mdd_object_child(o);
-        return next->do_ops->do_create(env, next, attr, hint, handle);
+        return next->do_ops->do_create(env, next, attr, hint, dof, handle);
 }
 
 static inline struct obd_capa *mdo_capa_get(const struct lu_env *env,
index 5e5bd18..8b6ce98 100644 (file)
@@ -409,6 +409,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
 {
         struct obd_device     *obd = mdd2obd_dev(mdd);
         struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
+        struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
         struct obdo           *oa;
         struct lov_stripe_md  *lsm = NULL;
         const void            *eadata = spec->u.sp_ea.eadata;
@@ -437,8 +438,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
 
         oa->o_uid = 0; /* must have 0 uid / gid on OST */
         oa->o_gid = 0;
-        oa->o_gr = FILTER_GROUP_MDS0 +
-                lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id;
+        oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id);
         oa->o_mode = S_IFREG | 0600;
         oa->o_id = mdd_lov_create_id(mdd_object_fid(child));
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
@@ -484,7 +484,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
                         }
                         GOTO(out_oti, rc);
                 }
-                LASSERT(lsm->lsm_object_gr >= FILTER_GROUP_MDS0);
+                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
         } else {
                 LASSERT(eadata != NULL);
                 rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
@@ -558,6 +558,111 @@ out_ids:
         return rc;
 }
 
+/*
+ * Used when destroying orphans and from mds_reint_unlink() when MDS wants to
+ * destroy objects on OSS.  obj is unused; lmm_size == 0 returns 0 at once.
+ */
+static
+int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
+                      struct mdd_object *obj, struct lu_attr *la,
+                      struct lov_mds_md *lmm, int lmm_size,
+                      struct llog_cookie *logcookies,
+                      int log_unlink)
+{
+        struct obd_device     *obd = mdd2obd_dev(mdd);
+        struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
+        struct lov_stripe_md  *lsm = NULL;
+        struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
+        struct obdo           *oa = &mdd_env_info(env)->mti_oa;
+        struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
+        int rc;
+        ENTRY;
+
+        if (lmm_size == 0)
+                RETURN(0);
+        /* unpack lmm into lsm; a negative rc is an error */
+        rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size);
+        if (rc < 0) {
+                CERROR("Error unpack md %p\n", lmm);
+                RETURN(rc);
+        } else {
+                LASSERT(rc >= sizeof(*lsm));
+                rc = 0;
+        }
+        /* describe object to destroy: id from lsm, group from node id */
+        oa->o_id = lsm->lsm_object_id;
+        oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id);
+        oa->o_mode = la->la_mode & S_IFMT;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
+        /* pass llog cookies along only when unlink logging was requested */
+        oti_init(oti, NULL);
+        if (log_unlink && logcookies) {
+                oa->o_valid |= OBD_MD_FLCOOKIE;
+                oti->oti_logcookies = logcookies;
+        }
+        /* NOTE(review): (int) casts may truncate ids wider than int; confirm */
+        CDEBUG(D_INFO, "destroying OSS object %d/%d\n",
+                        (int)oa->o_id, (int)oa->o_gr);
+
+        rc = obd_destroy(lov_exp, oa, lsm, oti, NULL, BYPASS_CAPA);
+        /* obd_free_memmd() runs whether or not obd_destroy() succeeded */
+        obd_free_memmd(lov_exp, &lsm);
+        RETURN(rc);
+}
+
+/*
+ * Destroy obj's OSS objects once nlink is 0; called with obj not locked.
+ */
+
+int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
+                    struct mdd_object *obj, struct lu_attr *la)
+{
+        struct md_attr    *ma = &mdd_env_info(env)->mti_ma;
+        int                rc;
+        ENTRY;
+        /* refuse (warn, return 0) while the object still has links */
+        if (unlikely(la->la_nlink != 0)) {
+                CWARN("Attempt to destroy OSS object when nlink == %d\n",
+                      la->la_nlink);
+                RETURN(0);
+        }
+        /* size and fetch the LOV EA and cookie buffers; NULL is -ENOMEM */
+        ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
+        ma->ma_lmm = mdd_max_lmm_get(env, mdd);
+        ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
+        ma->ma_cookie = mdd_max_cookie_get(env, mdd);
+        if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
+                RETURN(rc = -ENOMEM);
+
+        /* get lov ea; rc <= 0 is a failure and rc == 0 becomes -ENOENT */
+
+        rc = mdd_get_md_locked(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
+                               MDS_LOV_MD_NAME);
+
+        if (rc <= 0) {
+                CWARN("Get lov ea failed for "DFID" rc = %d\n",
+                         PFID(mdo2fid(obj)), rc);
+                if (rc == 0)
+                        rc = -ENOENT;
+                RETURN(rc);
+        }
+
+        ma->ma_valid = MA_LOV;
+        /* NOTE(review): mdd_unlink_log() presumably sets MA_COOKIE */
+        rc = mdd_unlink_log(env, mdd, obj, ma);
+        if (rc) {
+                CWARN("mds unlink log for "DFID" failed: %d\n",
+                       PFID(mdo2fid(obj)), rc);
+                RETURN(rc);
+        }
+        /* OSS objects are destroyed only when MA_COOKIE is set */
+        if (ma->ma_valid & MA_COOKIE)
+                rc = mdd_lovobj_unlink(env, mdd, obj, la,
+                                       ma->ma_lmm, ma->ma_lmm_size,
+                                       ma->ma_cookie, 1);
+        RETURN(rc);
+}
+
 int mdd_log_op_unlink(struct obd_device *obd,
                       struct lov_mds_md *lmm, int lmm_size,
                       struct llog_cookie *logcookies, int cookies_size)
index c13cdd6..d7a9969 100644 (file)
@@ -211,38 +211,17 @@ static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
         OBD_FREE_PTR(mdd);
 }
 
-/* orphan handling is here */
-static void mdd_object_delete(const struct lu_env *env, struct lu_object *o)
+static int mdd_object_print(const struct lu_env *env, void *cookie,
+                            lu_printer_t p, const struct lu_object *o)
 {
-        struct mdd_object *mdd_obj = lu2mdd_obj(o);
-        struct thandle *handle = NULL;
-        ENTRY;
-
-        if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
-                return;
-
-        if (mdd_obj->mod_flags & ORPHAN_OBJ) {
-                mdd_txn_param_build(env, lu2mdd_dev(o->lo_dev),
-                                    MDD_TXN_INDEX_DELETE_OP);
-                handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
-                if (IS_ERR(handle))
-                        CERROR("Cannot get thandle\n");
-                else {
-                        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-                        /* let's remove obj from the orphan list */
-                        __mdd_orphan_del(env, mdd_obj, handle);
-                        mdd_write_unlock(env, mdd_obj);
-                        mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
-                                       0, handle);
-                }
-        }
+        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
 }
 
 static const struct lu_object_operations mdd_lu_obj_ops = {
-        .loo_object_init    = mdd_object_init,
-        .loo_object_start   = mdd_object_start,
-        .loo_object_free    = mdd_object_free,
-        .loo_object_delete  = mdd_object_delete
+       .loo_object_init    = mdd_object_init,
+       .loo_object_start   = mdd_object_start,
+       .loo_object_free    = mdd_object_free,
+       .loo_object_print   = mdd_object_print,
 };
 
 struct mdd_object *mdd_object_find(const struct lu_env *env,
@@ -486,10 +465,13 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
 
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                                struct mdd_object *c, struct md_attr *ma,
-                               struct thandle *handle)
+                               struct thandle *handle,
+                               const struct md_op_spec *spec)
 {
         struct lu_attr *attr = &ma->ma_attr;
         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
+        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+        const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
 
@@ -497,11 +479,19 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                 struct dt_object *next = mdd_object_child(c);
                 LASSERT(next);
 
+                if (feat != &dt_directory_features && feat != NULL)
+                        dof->dof_type = DFT_INDEX;
+                else
+                        dof->dof_type = dt_mode_to_dft(attr->la_mode);
+
+                dof->u.dof_idx.di_feat = feat;
+
                 /* @hint will be initialized by underlying device. */
                 next->do_ops->do_ah_init(env, hint,
                                          p ? mdd_object_child(p) : NULL,
                                          attr->la_mode & S_IFMT);
-                rc = mdo_create_obj(env, c, attr, hint, handle);
+
+                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
         } else
                 rc = -EEXIST;
@@ -1222,7 +1212,7 @@ static int mdd_object_create(const struct lu_env *env,
         if (rc)
                 GOTO(unlock, rc);
 
-        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle);
+        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
         if (rc)
                 GOTO(unlock, rc);
 
@@ -1262,7 +1252,7 @@ static int mdd_object_create(const struct lu_env *env,
                         pfid = spec->u.sp_ea.fid;
                 }
 #endif
-                rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
+                rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle, spec);
         }
         EXIT;
 unlock:
@@ -1440,6 +1430,7 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
         if (S_ISREG(mdd_object_type(obj))) {
                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
                  * Caller must be ready for that. */
+
                 rc = __mdd_lmm_get(env, obj, ma);
                 if ((ma->ma_valid & MA_LOV))
                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
@@ -1454,9 +1445,11 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
 static int mdd_close(const struct lu_env *env, struct md_object *obj,
                      struct md_attr *ma)
 {
-        int rc;
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct thandle    *handle;
+        int rc;
+        int reset = 1;
+
 #ifdef HAVE_QUOTA_SUPPORT
         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
         struct mds_obd *mds = &obd->u.mds;
@@ -1476,19 +1469,30 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         /* release open count */
         mdd_obj->mod_count --;
 
+        if (mdd_obj->mod_count == 0) {
+                /* remove link to object from orphan index */
+                if (mdd_obj->mod_flags & ORPHAN_OBJ)
+                        __mdd_orphan_del(env, mdd_obj, handle);
+        }
+
         rc = mdd_iattr_get(env, mdd_obj, ma);
-        if (rc == 0 && mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
-                rc = mdd_object_kill(env, mdd_obj, ma);
+        if (rc == 0) {
+                if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
+                        rc = mdd_object_kill(env, mdd_obj, ma);
 #ifdef HAVE_QUOTA_SUPPORT
-                if (mds->mds_quota) {
-                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
-                        mdd_quota_wrapper(&ma->ma_attr, qids);
-                }
+                        if (mds->mds_quota) {
+                                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+                                mdd_quota_wrapper(&ma->ma_attr, qids);
+                        }
 #endif
-        } else {
-                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+                        if (rc == 0)
+                                reset = 0;
+                }
         }
 
+        if (reset)
+                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+
         mdd_write_unlock(env, mdd_obj);
         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
 #ifdef HAVE_QUOTA_SUPPORT
@@ -1614,7 +1618,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
          * iterate through directory and fill pages from @rdpg
          */
         iops = &next->do_index_ops->dio_it;
-        it = iops->init(env, next, 0, mdd_object_capa(env, obj));
+        it = iops->init(env, next, mdd_object_capa(env, obj));
         if (IS_ERR(it))
                 return PTR_ERR(it);
 
index 940a4df..e587094 100644 (file)
@@ -38,6 +38,7 @@
  * Orphan handling code
  *
  * Author: Mike Pershin <tappro@clusterfs.com>
+ *         Pravin B Shelar <pravin.shelar@sun.com>
  */
 
 #ifndef EXPORT_SYMTAB
 #include <lustre_fid.h>
 #include "mdd_internal.h"
 
-const char orph_index_name[] = "orphans";
-
-static const struct dt_index_features orph_index_features = {
-        .dif_flags       = DT_IND_UPDATE,
-        .dif_keysize_min = sizeof(struct orph_key),
-        .dif_keysize_max = sizeof(struct orph_key),
-        .dif_recsize_min = sizeof(loff_t),
-        .dif_recsize_max = sizeof(loff_t)
-};
+const char orph_index_name[] = "PENDING";
 
 enum {
         ORPH_OP_UNLINK,
         ORPH_OP_TRUNCATE
 };
 
-static struct orph_key *orph_key_fill(const struct lu_env *env,
-                                      const struct lu_fid *lf, __u32 op)
+#define ORPHAN_FILE_NAME_FORMAT         "%016llx:%08x:%08x:%2x"
+#define ORPHAN_FILE_NAME_FORMAT_18      "%llx:%08x"
+
+/**
+ * Format the orphan index name of \a lf for operation \a op
+ * (seq:oid:ver:op, all in hex) into the per-thread mti_orph_key buffer.
+ *
+ * \retval the key buffer cast to dt_key when snprintf() produced output
+ * \retval ERR_PTR(rc) when snprintf() returned rc <= 0
+ */
+static struct dt_key *orph_key_fill(const struct lu_env *env,
+                                    const struct lu_fid *lf, __u32 op)
 {
-        struct orph_key *key = &mdd_env_info(env)->mti_orph_key;
+        char *key = mdd_env_info(env)->mti_orph_key;
+        int len;
+
         LASSERT(key);
-        fid_cpu_to_be(&key->ok_fid, lf);
-        key->ok_op = cpu_to_be32(op);
-        return key;
+        len = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT,
+                       fid_seq(lf), fid_oid(lf), fid_ver(lf), op);
+        if (len <= 0)
+                return ERR_PTR(len);
+
+        return (struct dt_key *)key;
+}
+
+/**
+ * Format the two-field (seq:oid) orphan name of \a lf, i.e. the
+ * ORPHAN_FILE_NAME_FORMAT_18 layout, into the per-thread mti_orph_key
+ * buffer.
+ *
+ * \retval the key buffer cast to dt_key when snprintf() produced output
+ * \retval ERR_PTR(rc) when snprintf() returned rc <= 0
+ */
+static struct dt_key *orph_key_fill_18(const struct lu_env *env,
+                                       const struct lu_fid *lf)
+{
+        char *name = mdd_env_info(env)->mti_orph_key;
+        int len;
+
+        LASSERT(name);
+        len = snprintf(name, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18,
+                       fid_seq(lf), fid_oid(lf));
+        if (len <= 0)
+                return ERR_PTR(len);
+
+        return (struct dt_key *)name;
+}
+
+/**
+ * Parse an orphan index entry name back into a fid.
+ *
+ * The four-field ORPHAN_FILE_NAME_FORMAT is tried first; if that does not
+ * yield all four fields the two-field ORPHAN_FILE_NAME_FORMAT_18 is tried
+ * and the version is set to zero.
+ *
+ * \param key NUL-terminated entry name
+ * \param lf  fid filled in from \a key
+ *
+ * \retval 0        \a lf was filled from one of the two formats
+ * \retval -EINVAL  \a key matches neither format
+ */
+static int orphan_key_to_fid(char *key, struct lu_fid *lf)
+{
+        unsigned int op;
+
+        if (sscanf(key, ORPHAN_FILE_NAME_FORMAT, &lf->f_seq, &lf->f_oid,
+                   &lf->f_ver, &op) == 4)
+                return 0;
+
+        /* build igif */
+        if (sscanf(key, ORPHAN_FILE_NAME_FORMAT_18,
+                   &lf->f_seq, &lf->f_oid) != 2) {
+                CERROR("can not parse orphan file name %s\n",key);
+                return -EINVAL;
+        }
+
+        lf->f_ver = 0;
+        return 0;
+}
+
+/* Write-lock the orphan index object of \a mdd (role MOR_TGT_CHILD). */
+static inline void mdd_orphan_write_lock(const struct lu_env *env,
+                                         struct mdd_device *mdd)
+{
+        struct dt_object *pending = mdd->mdd_orphans;
+
+        pending->do_ops->do_write_lock(env, pending, MOR_TGT_CHILD);
+}
+
+/* Release the write lock on the orphan index object of \a mdd. */
+static inline void mdd_orphan_write_unlock(const struct lu_env *env,
+                                           struct mdd_device *mdd)
+{
+        struct dt_object *pending = mdd->mdd_orphans;
+
+        pending->do_ops->do_write_unlock(env, pending);
+}
+
+/**
+ * Insert an entry for \a obj into the orphan index of \a mdd.
+ *
+ * The entry name is built by orph_key_fill() from the object's fid and
+ * \a op; the record is the object's fid.
+ *
+ * \retval 0    entry inserted
+ * \retval -ve  key could not be built, or error from the index insert
+ */
+static inline int mdd_orphan_insert_obj(const struct lu_env *env,
+                                        struct mdd_device *mdd,
+                                        struct mdd_object *obj,
+                                        __u32 op,
+                                        struct thandle *th)
+{
+        struct dt_object        *dor    = mdd->mdd_orphans;
+        const struct lu_fid     *lf     = mdo2fid(obj);
+        struct dt_key           *key    = orph_key_fill(env, lf, op);
+        int                      rc;
+        ENTRY;
+
+        /* orph_key_fill() may hand back an ERR_PTR; never pass that on
+         * to the index as if it were a name. */
+        if (IS_ERR(key))
+                RETURN(PTR_ERR(key));
+
+        rc = dor->do_index_ops->dio_insert(env, dor,
+                                           __mdd_fid_rec(env, lf),
+                                           key, th,
+                                           BYPASS_CAPA, 1);
+        /* pair the ENTRY above with RETURN so the debug trace balances */
+        RETURN(rc);
+}
+
+/**
+ * Delete the entry named \a key from the orphan index of \a mdd.
+ *
+ * \retval result of the index dio_delete() call, passed through unchanged
+ */
+static inline int mdd_orphan_delete_obj(const struct lu_env *env,
+                                        struct mdd_device *mdd,
+                                        struct dt_key *key,
+                                        struct thandle *th)
+{
+        struct dt_object *pending = mdd->mdd_orphans;
+        int rc;
+
+        rc = pending->do_index_ops->dio_delete(env, pending, key, th,
+                                               BYPASS_CAPA);
+        return rc;
 }
 
+/* Add one reference to the orphan index object of \a mdd within \a th. */
+static inline void mdd_orphan_ref_add(const struct lu_env *env,
+                                      struct mdd_device *mdd,
+                                      struct thandle *th)
+{
+        struct dt_object *pending = mdd->mdd_orphans;
+
+        pending->do_ops->do_ref_add(env, pending, th);
+}
+
+/* Drop one reference from the orphan index object of \a mdd within \a th. */
+static inline void mdd_orphan_ref_del(const struct lu_env *env,
+                                      struct mdd_device *mdd,
+                                      struct thandle *th)
+{
+        struct dt_object *pending = mdd->mdd_orphans;
+
+        pending->do_ops->do_ref_del(env, pending, th);
+}
+
+
+/**
+ * Insert \a obj into the orphan index while holding the orphan index
+ * write lock.
+ *
+ * After a successful insert one reference is added to \a obj. For a
+ * directory a second reference is added to \a obj, one reference is added
+ * to the orphan index object, and the directory's ".." entry is replaced
+ * by one pointing at the orphan index fid.
+ *
+ * \retval 0    entry inserted (results of the ".." fixup are not reported)
+ * \retval -ve  error from the orphan index insert
+ */
 static int orph_index_insert(const struct lu_env *env,
-                             struct mdd_object *obj, __u32 op,
-                             loff_t *offset, struct thandle *th)
+                             struct mdd_object *obj,
+                             __u32 op,
+                             struct thandle *th)
 {
-        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
-        struct dt_object *dor = mdd->mdd_orphans;
-        struct orph_key *key = orph_key_fill(env, mdo2fid(obj), op);
+        struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
+        struct dt_object        *dor    = mdd->mdd_orphans;
+        const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
+        struct dt_object        *next   = mdd_object_child(obj);
+        const struct dt_key     *dotdot = (const struct dt_key *) "..";
         int rc;
         ENTRY;
 
-        rc = dor->do_index_ops->dio_insert(env, dor, (struct dt_rec *)offset,
-                                           (struct dt_key *)key, th,
-                                           BYPASS_CAPA, 1);
+        mdd_orphan_write_lock(env, mdd);
+
+        rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
+        if (rc)
+                GOTO(out, rc);
+
+        mdo_ref_add(env, obj, th);
+        /* non-directories need nothing beyond the single reference */
+        if (!S_ISDIR(mdd_object_type(obj)))
+                goto out;
+
+        mdo_ref_add(env, obj, th);
+        mdd_orphan_ref_add(env, mdd, th);
+
+        /* Best effort: re-point ".." of the directory at the orphan index.
+         * Results of the delete/insert below are deliberately not
+         * propagated to the caller. */
+        if (!dt_try_as_dir(env, next))
+                goto out;
+        next->do_index_ops->dio_delete(env, next,
+                                       dotdot, th, BYPASS_CAPA);
+
+        next->do_index_ops->dio_insert(env, next,
+                                       __mdd_fid_rec(env, lf_dor),
+                                       dotdot, th, BYPASS_CAPA, 1);
+
+out:
+        mdd_orphan_write_unlock(env, mdd);
+
         RETURN(rc);
 }
 
+/**
+ * Drop the references an orphan holds and, for a non-directory, destroy
+ * the associated ost objects.
+ *
+ * One reference is dropped from \a obj. A directory additionally loses a
+ * second reference and releases one reference on the orphan index object.
+ * For any other object its attributes are read and handed to
+ * mdd_lov_destroy().
+ *
+ * \param obj orphan object
+ * \param mdd used for sending llog msg to osts
+ * \param th  transaction the reference updates are made in
+ *
+ * \retval  0   success
+ * \retval -ve  error from mdd_la_get()
+ */
+static int orphan_object_kill(const struct lu_env *env,
+                              struct mdd_object *obj,
+                              struct mdd_device *mdd,
+                              struct thandle *th)
+{
+        struct lu_attr *la = &mdd_env_info(env)->mti_la;
+        int rc;
+        ENTRY;
+
+        /* This runs in the recovery phase, when no other thread can
+         * access the object, so the lock is not needed for exclusion.
+         * It is taken because it is a precondition of the osd api
+         * used below. */
+        mdd_write_lock(env, obj, MOR_TGT_CHILD);
+        mdo_ref_del(env, obj, th);
+        if (S_ISDIR(mdd_object_type(obj))) {
+                mdo_ref_del(env, obj, th);
+                mdd_orphan_ref_del(env, mdd, th);
+                mdd_write_unlock(env, obj);
+                RETURN(0);
+        }
+
+        /* regular file , cleanup linked ost objects */
+        rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
+        mdd_write_unlock(env, obj);
+        if (rc)
+                RETURN(rc);
+
+        /* NOTE(review): the result of mdd_lov_destroy() is not checked,
+         * as in the original code -- confirm this is intentional. */
+        mdd_lov_destroy(env, mdd, obj, la);
+        RETURN(0);
+}
+
+/**
+ * Remove the orphan index entry of \a obj and drop the references that
+ * go with it, all under the orphan index write lock.
+ *
+ * The entry is first looked up under its orph_key_fill() name; on -ENOENT
+ * the orph_key_fill_18() name is tried. The ORPHAN_OBJ flag is cleared
+ * from \a obj whether or not the delete succeeded.
+ *
+ * \retval 0    entry deleted and references dropped
+ * \retval -ve  error from the index delete
+ */
 static int orph_index_delete(const struct lu_env *env,
-                             struct mdd_object *obj, __u32 op,
+                             struct mdd_object *obj,
+                             __u32 op,
                              struct thandle *th)
 {
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
         struct dt_object *dor = mdd->mdd_orphans;
-        struct orph_key *key = orph_key_fill(env, mdo2fid(obj), op);
+        struct dt_key *key;
         int rc;
+
         ENTRY;
+
         LASSERT(dor);
-        rc = dor->do_index_ops->dio_delete(env, dor,
-                                           (struct dt_key *)key, th,
-                                           BYPASS_CAPA);
-        RETURN(rc);
 
+        /* NOTE(review): key is not checked with IS_ERR() before use */
+        key = orph_key_fill(env, mdo2fid(obj), op);
+        mdd_orphan_write_lock(env, mdd);
+
+        rc = mdd_orphan_delete_obj(env, mdd, key, th);
+
+        /* not found under the current name: retry with the two-field
+         * ORPHAN_FILE_NAME_FORMAT_18 name */
+        if (rc == -ENOENT) {
+                key = orph_key_fill_18(env, mdo2fid(obj));
+                rc = mdd_orphan_delete_obj(env, mdd, key, th);
+        }
+
+        if (!rc) {
+                /* lov objects will be destroyed by caller */
+                mdo_ref_del(env, obj, th);
+                if (S_ISDIR(mdd_object_type(obj))) {
+                        mdo_ref_del(env, obj, th);
+                        mdd_orphan_ref_del(env, mdd, th);
+                }
+        } else
+                CERROR("could not delete object: rc = %d\n",rc);
+
+        /* flag is cleared on the error path as well */
+        obj->mod_flags &= ~ORPHAN_OBJ;
+        mdd_orphan_write_unlock(env, mdd);
+        RETURN(rc);
 }
 
-static inline struct orph_key *orph_key_empty(const struct lu_env *env,
-                                              __u32 op)
+
+/**
+ * Delete the orphan index entry \a key and kill the orphan \a obj, in a
+ * transaction of its own.
+ *
+ * A MDD_TXN_UNLINK_OP transaction is started, the entry is deleted under
+ * the orphan index write lock and, if that succeeded,
+ * orphan_object_kill() is called on \a obj.
+ *
+ * \retval 0    index entry deleted
+ * \retval -ve  transaction could not be started, or error from the
+ *              index delete
+ */
+static int orphan_object_destroy(const struct lu_env *env,
+                                 struct mdd_object *obj,
+                                 struct dt_key *key)
 {
-        struct orph_key *key = &mdd_env_info(env)->mti_orph_key;
-        LASSERT(key);
-        fid_zero(&key->ok_fid);
-        key->ok_op = cpu_to_be32(op);
-        return key;
+        struct thandle *th = NULL;
+        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+        int rc;
+        ENTRY;
+
+        mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
+        th = mdd_trans_start(env, mdd);
+        if (IS_ERR(th)) {
+                CERROR("Cannot get thandle\n");
+                /* report the real failure instead of a fixed -ENOMEM */
+                RETURN(PTR_ERR(th));
+        }
+
+        mdd_orphan_write_lock(env, mdd);
+        rc = mdd_orphan_delete_obj(env, mdd, key, th);
+        if (!rc)
+                /* NOTE(review): the result of orphan_object_kill() is
+                 * not propagated, as in the original code */
+                orphan_object_kill(env, obj, mdd, th);
+        else
+                CERROR("could not delete object: rc = %d\n",rc);
+
+        mdd_orphan_write_unlock(env, mdd);
+        mdd_trans_stop(env, mdd, 0, th);
+
+        RETURN(rc);
 }
 
-static void orph_key_test_and_del(const struct lu_env *env,
-                                  struct mdd_device *mdd,
-                                  const struct orph_key *key)
+/**
+ * Look up the object with fid \a lf and destroy it if it is not open.
+ *
+ * An object with mod_count == 0 is destroyed through
+ * orphan_object_destroy() using index entry \a key. An object that is
+ * still open is only marked with ORPHAN_OBJ.
+ *
+ * \retval 0       orphan destroyed
+ * \retval -EBUSY  object still open, only flagged
+ * \retval -ve     object lookup failed, or error from
+ *                 orphan_object_destroy()
+ */
+static int orph_key_test_and_del(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct lu_fid *lf,
+                                 struct dt_key *key)
 {
         struct mdd_object *mdo;
+        int rc;
+
+        mdo = mdd_object_find(env, mdd, lf);
 
-        mdo = mdd_object_find(env, mdd, &key->ok_fid);
         if (IS_ERR(mdo))
-                CERROR("Invalid orphan!\n");
-        else {
-                mdd_write_lock(env, mdo, MOR_TGT_CHILD);
-                if (mdo->mod_count == 0) {
-                        /* non-opened orphan, let's delete it */
-                        struct md_attr *ma = &mdd_env_info(env)->mti_ma;
-                        CWARN("Found orphan!\n");
-                        mdd_object_kill(env, mdo, ma);
-                        /* TODO: now handle OST objects */
-                        //mdd_ost_objects_destroy(env, ma);
-                        /* TODO: destroy index entry */
-                }
-                mdd_write_unlock(env, mdo);
-                mdd_object_put(env, mdo);
+                return PTR_ERR(mdo);
+
+        /* default result for an object that is still open */
+        rc = -EBUSY;
+        if (mdo->mod_count == 0) {
+                CWARN("Found orphan!\n");
+                rc = orphan_object_destroy(env, mdo, key);
+        } else {
+                /* still open: flag it and leave the index entry alone */
+                mdo->mod_flags |= ORPHAN_OBJ;
         }
+
+        mdd_object_put(env, mdo);
+        return rc;
 }
 
 static int orph_index_iterate(const struct lu_env *env,
                               struct mdd_device *mdd)
 {
-        struct dt_object *dt_obj = mdd->mdd_orphans;
-        struct dt_it     *it;
+        struct dt_object *dor = mdd->mdd_orphans;
+        char             *mti_key = mdd_env_info(env)->mti_orph_key;
         const struct dt_it_ops *iops;
-        struct orph_key  *key = orph_key_empty(env, 0);
-        int result;
+        struct dt_it     *it;
+        char             *key;
+        struct lu_fid     fid;
+        int               result = 0;
+        int               key_sz = 0;
+        int               rc;
+        __u64             cookie;
         ENTRY;
 
-        iops = &dt_obj->do_index_ops->dio_it;
-        it = iops->init(env, dt_obj, 1, BYPASS_CAPA);
+        /* In recovery phase, do not need for any lock here */
+
+        iops = &dor->do_index_ops->dio_it;
+        it = iops->init(env, dor, BYPASS_CAPA);
         if (it != NULL) {
-                result = iops->get(env, it, (const void *)key);
+                result = iops->get(env, it, (const void *)"");
                 if (result > 0) {
-                        int i;
                         /* main cycle */
-                        for (result = 0, i = 0; result == +1; ++i) {
+                        do {
+
                                 key = (void *)iops->key(env, it);
-                                fid_be_to_cpu(&key->ok_fid, &key->ok_fid);
-                                orph_key_test_and_del(env, mdd, key);
+                                if (IS_ERR(key))
+                                        goto next;
+                                key_sz = iops->key_size(env, it);
+
+                                /* filter out "." and ".." entries from
+                                 * PENDING dir. */
+                                if (key_sz < 8)
+                                        goto next;
+
+                                memcpy(mti_key, key, key_sz);
+                                mti_key[key_sz] = 0;
+
+                                if (orphan_key_to_fid(mti_key, &fid))
+                                        goto next;
+                                if (!fid_is_sane(&fid))
+                                        goto next;
+
+                                /* kill orphan object */
+                                cookie =  iops->store(env, it);
+                                iops->put(env, it);
+                                rc = orph_key_test_and_del(env, mdd, &fid,
+                                                (struct dt_key *)mti_key);
+
+                                /* after index delete reset iterator */
+                                if (!rc)
+                                        result = iops->get(env, it,
+                                                           (const void *)"");
+                                else
+                                        result = iops->load(env, it, cookie);
+next:
                                 result = iops->next(env, it);
-                        }
+                        } while (result == 0);
+                        result = 0;
                 } else if (result == 0)
                         /* Index contains no zero key? */
                         result = -EIO;
-
                 iops->put(env, it);
                 iops->fini(env, it);
         } else
@@ -184,17 +427,17 @@ int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
 {
         struct lu_fid fid;
         struct dt_object *d;
-        int rc;
+        int rc = 0;
         ENTRY;
 
-        d = dt_store_open(env, mdd->mdd_child, orph_index_name, &fid);
+        d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
         if (!IS_ERR(d)) {
                 mdd->mdd_orphans = d;
-                rc = d->do_ops->do_index_try(env, d, &orph_index_features);
-                if (rc == 0)
-                        LASSERT(d->do_index_ops != NULL);
-                else
-                        CERROR("\"%s\" is not an index!\n", orph_index_name);
+                if (!dt_try_as_dir(env, d)) {
+                        rc = -ENOTDIR;
+                        CERROR("\"%s\" is not an index! : rc = %d\n",
+                                        orph_index_name, rc);
+                }
         } else {
                 CERROR("cannot find \"%s\" obj %d\n",
                        orph_index_name, (int)PTR_ERR(d));
@@ -214,18 +457,45 @@ void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
         EXIT;
 }
 
+/**
+ *  Iterate orphan index to cleanup orphan objects in case of recovery.
+ *  \param d   mdd device in recovery.
+ *  \retval    result of orph_index_iterate(), passed through unchanged.
+ */
+
 int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
 {
         return orph_index_iterate(env, d);
 }
 
+/**
+ *  add an orphan \a obj to orphan index (operation ORPH_OP_UNLINK).
+ *  \param obj file or directory.
+ *  \param th  transaction for index insert.
+ *
+ *  \pre obj nlink == 0 && obj->mod_count != 0
+ *
+ *  \retval 0  success
+ *  \retval -ve index operation error.
+ */
+
 int __mdd_orphan_add(const struct lu_env *env,
                      struct mdd_object *obj, struct thandle *th)
 {
-        loff_t offset = 0;
-        return orph_index_insert(env, obj, ORPH_OP_UNLINK, &offset, th);
+        return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
 }
 
+/**
+ *  delete an orphan \a obj from orphan index.
+ *  \param obj file or directory.
+ *  \param th  transaction for index deletion and object destruction.
+ *
+ *  \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
+ *
+ *  \retval 0  success
+ *  \retval -ve index operation error.
+ */
+
 int __mdd_orphan_del(const struct lu_env *env,
                      struct mdd_object *obj, struct thandle *th)
 {
index 2c0a827..947ef75 100644 (file)
@@ -201,18 +201,28 @@ int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd)
                                 *c = dt[DTO_INDEX_INSERT];
                                 break;
                         case MDD_TXN_UNLINK_OP:
-                                /* delete index + Unlink log */
-                                *c = dt[DTO_INDEX_DELETE];
+                                /* delete index + Unlink log +
+                                 * mdd orphan handling */
+                                *c = dt[DTO_INDEX_DELETE] +
+                                        dt[DTO_INDEX_DELETE] +
+                                        dt[DTO_INDEX_INSERT] * 2 +
+                                        dt[DTO_XATTR_SET] * 3;
                                 break;
                         case MDD_TXN_RENAME_OP:
                                 /* 2 delete index + 1 insert + Unlink log */
                                 *c = 2 * dt[DTO_INDEX_DELETE] +
-                                         dt[DTO_INDEX_INSERT];
+                                        dt[DTO_INDEX_INSERT] +
+                                        dt[DTO_INDEX_DELETE] +
+                                        dt[DTO_INDEX_INSERT] * 2 +
+                                        dt[DTO_XATTR_SET] * 3;
                                 break;
                         case MDD_TXN_RENAME_TGT_OP:
                                 /* index insert + index delete */
                                 *c = dt[DTO_INDEX_DELETE] +
-                                     dt[DTO_INDEX_INSERT];
+                                        dt[DTO_INDEX_INSERT] +
+                                        dt[DTO_INDEX_DELETE] +
+                                        dt[DTO_INDEX_INSERT] * 2 +
+                                        dt[DTO_XATTR_SET] * 3;
                                 break;
                         case MDD_TXN_CREATE_DATA_OP:
                                 /* same as set xattr(lsm) */
index c888039..a3e34df 100644 (file)
@@ -432,6 +432,9 @@ static int mds_cmd_cleanup(struct obd_device *obd)
                 LCONSOLE_WARN("%s: shutting down for failover; client state "
                               "will be preserved.\n", obd->obd_name);
 
+        if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+                RETURN(0);
+
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         mds_lov_destroy_objids(obd);
index a39e495..ce287c6 100644 (file)
@@ -135,7 +135,7 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
                             handle, 0);
         if (!err) {
-                oa->o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
+                oa->o_gr = mdt_to_obd_objgrp(mds->mds_id);
                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP;
         } else if (!rc)
                 rc = err;
index 1968b9c..b623979 100644 (file)
@@ -307,7 +307,7 @@ int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
          * objects above this ID, they will be removed. */
         memset(&oa, 0, sizeof(oa));
         oa.o_flags = OBD_FL_DELORPHAN;
-        oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
+        oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
         if (ost_uuid != NULL)
                 oti.oti_ost_uuid = ost_uuid;
@@ -483,7 +483,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
 #endif
         data->ocd_version = LUSTRE_VERSION_CODE;
-        data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
+        data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
         OBD_FREE(data, sizeof(*data));
@@ -633,7 +633,7 @@ static int __mds_lov_synchronize(void *data)
                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
                 GOTO(out, rc);
         }
-        mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
+        mgi.group = mdt_to_obd_objgrp(mds->mds_id);
         mgi.uuid = uuid;
 
         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
index 3f911de..1f03d81 100644 (file)
@@ -60,10 +60,6 @@ static void make_capa_key(struct lustre_capa_key *key,
         ll_get_random_bytes(key->lk_key, sizeof(key->lk_key));
 }
 
-enum {
-        MDT_TXN_CAPA_KEYS_WRITE_CREDITS = 1
-};
-
 static inline void lck_cpu_to_le(struct lustre_capa_key *tgt,
                                  struct lustre_capa_key *src)
 {
@@ -93,8 +89,8 @@ static int write_capa_keys(const struct lu_env *env,
         int i, rc;
 
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-
-        th = mdt_trans_start(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_CREDITS);
+        mdt_trans_credit_init(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_OP);
+        th = mdt_trans_start(env, mdt);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
index 5b9a3d9..5547e62 100644 (file)
@@ -4028,11 +4028,14 @@ out:
 }
 
 static int mdt_stack_init(struct lu_env *env,
-                          struct mdt_device *m, struct lustre_cfg *cfg)
+                          struct mdt_device *m,
+                          struct lustre_cfg *cfg,
+                          struct lustre_mount_info  *lmi)
 {
         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
         struct lu_device  *tmp;
         struct md_device  *md;
+        struct lu_device  *child_lu_dev;
         int rc;
         ENTRY;
 
@@ -4067,7 +4070,15 @@ static int mdt_stack_init(struct lu_env *env,
         /* process setup config */
         tmp = &m->mdt_md_dev.md_lu_dev;
         rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
-        GOTO(out, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        /* initialize local objects */
+        child_lu_dev = &m->mdt_child->md_lu_dev;
+
+        rc = child_lu_dev->ld_ops->ldo_prepare(env,
+                                               &m->mdt_md_dev.md_lu_dev,
+                                               child_lu_dev);
 out:
         /* fini from last known good lu_device */
         if (rc)
@@ -4210,6 +4221,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         const char                *num = lustre_cfg_string(cfg, 2);
         struct lustre_mount_info  *lmi = NULL;
         struct lustre_sb_info     *lsi;
+        struct lustre_disk_data   *ldd;
         struct lu_site            *s;
         struct md_site            *mite;
         const char                *identity_upcall = "NONE";
@@ -4217,6 +4229,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         struct md_device          *next;
 #endif
         int                        rc;
+        int                        node_id;
         ENTRY;
 
         md_device_init(&m->mdt_md_dev, ldt);
@@ -4253,6 +4266,15 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         } else {
                 lsi = s2lsi(lmi->lmi_sb);
                 fsoptions_to_mdt_flags(m, lsi->lsi_lmd->lmd_opts);
+                server_put_mount_2(dev, lmi->lmi_mnt);
+                /* CMD is supported only in IAM mode */
+                ldd = lsi->lsi_ldd;
+                LASSERT(num);
+                node_id = simple_strtol(num, NULL, 10);
+                if (!(ldd->ldd_flags & LDD_F_IAM_DIR) && node_id) {
+                        CERROR("CMD Operation not allowed in IOP mode\n");
+                        RETURN(-EINVAL);
+                }
         }
 
         rwlock_init(&m->mdt_sptlrpc_lock);
@@ -4305,12 +4327,11 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                                    lprocfs_nid_stats_clear_write, obd, NULL);
 
         /* set server index */
-        LASSERT(num);
-        lu_site2md(s)->ms_node_id = simple_strtol(num, NULL, 10);
+        lu_site2md(s)->ms_node_id = node_id;
 
         /* failover is the default
          * FIXME: we do not failout mds0/mgs, which may cause some problems.
-         * assumed whose ls_node_id == 0 XXX
+         * assumed whose ms_node_id == 0 XXX
          * */
         obd->obd_replayable = 1;
         /* No connection accepted until configurations will finish */
@@ -4325,7 +4346,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         }
 
         /* init the stack */
-        rc = mdt_stack_init((struct lu_env *)env, m, cfg);
+        rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi);
         if (rc) {
                 CERROR("Can't init device stack, rc %d\n", rc);
                 GOTO(err_fini_proc, rc);
@@ -4370,7 +4391,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         if (rc)
                 GOTO(err_free_ns, rc);
 
-        rc = mdt_fs_setup(env, m, obd);
+        rc = mdt_fs_setup(env, m, obd, lsi);
         if (rc)
                 GOTO(err_capa, rc);
 
@@ -4492,6 +4513,19 @@ static int mdt_process_config(const struct lu_env *env,
                 struct lprocfs_static_vars lvars;
                 struct obd_device *obd = d->ld_obd;
 
+                /*
+                 * For interoperability between 1.8 and 2.0,
+                 * skip old "mdt.group_upcall" param.
+                 */
+                {
+                        char *param = lustre_cfg_string(cfg, 1);
+                        if (param && !strncmp("mdt.group_upcall", param, 16)) {
+                                CWARN("For 1.8 interoperability, skip this"
+                                       " mdt.group_upcall. It is obsolete\n");
+                                break;
+                        }
+                }
+
                 lprocfs_mdt_init_vars(&lvars);
                 rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars,
                                               cfg, obd);
@@ -4583,7 +4617,7 @@ static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
 
 static const struct lu_device_operations mdt_lu_ops = {
         .ldo_object_alloc   = mdt_object_alloc,
-        .ldo_process_config = mdt_process_config
+        .ldo_process_config = mdt_process_config,
 };
 
 static const struct lu_object_operations mdt_obj_ops = {
@@ -5200,11 +5234,19 @@ static struct lu_device_type mdt_device_type = {
         .ldt_ctx_tags = LCT_MD_THREAD
 };
 
+static struct lu_local_obj_desc mdt_last_recv = {
+        .llod_name      = LAST_RCVD,
+        .llod_oid       = MDT_LAST_RECV_OID,
+        .llod_is_index  = 0,
+};
+
 static int __init mdt_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
         int rc;
 
+        llo_local_obj_register(&mdt_last_recv);
+
         mdt_num_threads = MDT_NUM_THREADS;
         lprocfs_mdt_init_vars(&lvars);
         rc = class_register_type(&mdt_obd_device_ops, NULL,
index b5aaecb..ae204a9 100644 (file)
@@ -381,6 +381,12 @@ struct mdt_commit_cb {
         void     *mdt_cb_data;
 };
 
+enum mdt_txn_op {
+        MDT_TXN_CAPA_KEYS_WRITE_OP,
+        MDT_TXN_LAST_RCVD_WRITE_OP,
+};
+
+
 /*
  * Info allocated per-transaction.
  */
@@ -535,7 +541,7 @@ extern void target_recovery_fini(struct obd_device *obd);
 extern void target_recovery_init(struct obd_device *obd,
                                  svc_handler_t handler);
 int mdt_fs_setup(const struct lu_env *, struct mdt_device *,
-                 struct obd_device *);
+                 struct obd_device *, struct lustre_sb_info *lsi);
 void mdt_fs_cleanup(const struct lu_env *, struct mdt_device *);
 
 int mdt_client_del(const struct lu_env *env,
@@ -580,8 +586,12 @@ void mdt_shrink_reply(struct mdt_thread_info *info);
 int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *,
                            const struct md_attr *);
 void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *);
+
+void mdt_trans_credit_init(const struct lu_env *env,
+                           struct mdt_device *mdt,
+                           enum mdt_txn_op op);
 struct thandle* mdt_trans_start(const struct lu_env *env,
-                                struct mdt_device *mdt, int credits);
+                                struct mdt_device *mdt);
 void mdt_trans_stop(const struct lu_env *env,
                     struct mdt_device *mdt, struct thandle *th);
 int mdt_record_write(const struct lu_env *env,
index 916e3e0..48c6af1 100644 (file)
@@ -1003,6 +1003,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                  * not exist.
                  */
                 info->mti_spec.sp_cr_lookup = 0;
+                info->mti_spec.sp_feat = &dt_directory_features;
 
                 result = mdo_create(info->mti_env,
                                     mdt_object_child(parent),
index 1286919..81c9dfa 100644 (file)
@@ -108,21 +108,49 @@ int mdt_record_write(const struct lu_env *env,
                 rc = -EFAULT;
         return rc;
 }
-/* only one record write */
 
-enum {
-        MDT_TXN_LAST_RCVD_WRITE_CREDITS = 3
-};
+/**
+ * Return the number of transaction credits needed for MDT-local write \a op.
+ *
+ * Both known operations ask the bottom dt device for its DTO_WRITE_BLOCK
+ * credit count; any other \a op is a programming error and hits LBUG().
+ */
+static inline int mdt_trans_credit_get(const struct lu_env *env,
+                                       struct mdt_device *mdt,
+                                       enum mdt_txn_op op)
+{
+        struct dt_device *bottom = mdt->mdt_bottom;
+
+        if (op != MDT_TXN_CAPA_KEYS_WRITE_OP &&
+            op != MDT_TXN_LAST_RCVD_WRITE_OP) {
+                LBUG();
+        }
+
+        return bottom->dd_ops->dt_credit_get(env, bottom, DTO_WRITE_BLOCK);
+}
+
+/**
+ * Initialise the transaction parameters (mti_txn_param) of the
+ * mdt_thread_info found in \a env with the credits required by \a op.
+ */
+void mdt_trans_credit_init(const struct lu_env *env,
+                           struct mdt_device *mdt,
+                           enum mdt_txn_op op)
+{
+        struct mdt_thread_info *info = lu_context_key_get(&env->le_ctx,
+                                                          &mdt_thread_key);
+
+        txn_param_init(&info->mti_txn_param,
+                       mdt_trans_credit_get(env, mdt, op));
+}
 
 struct thandle* mdt_trans_start(const struct lu_env *env,
-                                struct mdt_device *mdt, int credits)
+                                struct mdt_device *mdt)
 {
         struct mdt_thread_info *mti;
         struct txn_param *p;
 
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         p = &mti->mti_txn_param;
-        txn_param_init(p, credits);
 
         /* export can require sync operations */
         if (mti->mti_exp != NULL)
@@ -225,7 +253,8 @@ static inline int mdt_last_rcvd_header_write(const struct lu_env *env,
 
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
 
-        th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
+        mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
+        th = mdt_trans_start(env, mdt);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
@@ -419,7 +448,8 @@ err_client:
 }
 
 static int mdt_server_data_init(const struct lu_env *env,
-                                struct mdt_device *mdt)
+                                struct mdt_device *mdt,
+                                struct lustre_sb_info *lsi)
 {
         struct lr_server_data  *lsd = &mdt->mdt_lsd;
         struct lsd_client_data *lcd = NULL;
@@ -427,6 +457,7 @@ static int mdt_server_data_init(const struct lu_env *env,
         struct mdt_thread_info *mti;
         struct dt_object       *obj;
         struct lu_attr         *la;
+        struct lustre_disk_data  *ldd;
         unsigned long last_rcvd_size;
         __u64 mount_count;
         int rc;
@@ -479,7 +510,13 @@ static int mdt_server_data_init(const struct lu_env *env,
         }
         mount_count = lsd->lsd_mount_count;
 
+        ldd = lsi->lsi_ldd;
+
+        if (ldd->ldd_flags & LDD_F_IAM_DIR)
+                lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR;
+
         lsd->lsd_feature_compat = OBD_COMPAT_MDT;
+        lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
 
         spin_lock(&mdt->mdt_transno_lock);
         mdt->mdt_last_transno = lsd->lsd_last_transno;
@@ -616,7 +653,8 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
         /* write new client data */
         off = med->med_lr_off;
-        th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
+        mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
+        th = mdt_trans_start(env, mdt);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
@@ -739,7 +777,8 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt)
          * mdt->mdt_last_rcvd may be NULL that time.
          */
         if (mdt->mdt_last_rcvd != NULL) {
-                th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
+                mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
+                th = mdt_trans_start(env, mdt);
                 if (IS_ERR(th))
                         GOTO(free, rc = PTR_ERR(th));
 
@@ -847,7 +886,10 @@ extern struct lu_context_key mdt_thread_key;
 static int mdt_txn_start_cb(const struct lu_env *env,
                             struct txn_param *param, void *cookie)
 {
-        param->tp_credits += MDT_TXN_LAST_RCVD_WRITE_CREDITS;
+        struct mdt_device *mdt = cookie;
+
+        param->tp_credits += mdt_trans_credit_get(env, mdt,
+                                                  MDT_TXN_LAST_RCVD_WRITE_OP);
         return 0;
 }
 
@@ -946,7 +988,8 @@ static int mdt_txn_commit_cb(const struct lu_env *env,
 }
 
 int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
-                 struct obd_device *obd)
+                 struct obd_device *obd,
+                 struct lustre_sb_info *lsi)
 {
         struct lu_fid fid;
         struct dt_object *o;
@@ -965,10 +1008,10 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
 
         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
 
-        o = dt_store_open(env, mdt->mdt_bottom, LAST_RCVD, &fid);
+        o = dt_store_open(env, mdt->mdt_bottom, "", LAST_RCVD, &fid);
         if (!IS_ERR(o)) {
                 mdt->mdt_last_rcvd = o;
-                rc = mdt_server_data_init(env, mdt);
+                rc = mdt_server_data_init(env, mdt, lsi);
                 if (rc)
                         GOTO(put_last_rcvd, rc);
         } else {
@@ -977,7 +1020,7 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
                 RETURN(rc);
         }
 
-        o = dt_store_open(env, mdt->mdt_bottom, CAPA_KEYS, &fid);
+        o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid);
         if (!IS_ERR(o)) {
                 mdt->mdt_ck_obj = o;
                 rc = mdt_capa_keys_init(env, mdt);
index 2fb2fde..4de1f39 100644 (file)
@@ -145,6 +145,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
                  * or not.
                  */
                 info->mti_spec.sp_cr_lookup = 1;
+                info->mti_spec.sp_feat = &dt_directory_features;
 
                 lname = mdt_name(info->mti_env, (char *)rr->rr_name,
                                  rr->rr_namelen);
index 1bbd3c3..2c7f0d2 100644 (file)
@@ -27,6 +27,7 @@ obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o mea.o
 obdclass-all-objs += lu_object.o dt_object.o hash.o capa.o lu_time.o
 obdclass-all-objs += cl_object.o cl_page.o cl_lock.o cl_io.o lu_ref.o
 obdclass-all-objs += acl.o idmap.o
+obdclass-all-objs += md_local_object.o
 
 obdclass-objs := $(obdclass-linux-objs) $(obdclass-all-objs)
 
index 2a99005..a4bbb9a 100644 (file)
 /* fid_be_to_cpu() */
 #include <lustre_fid.h>
 
+/** State carried through a path walk; see dt_find_entry(), dt_store_resolve(). */
+struct dt_find_hint {
+        /** receives the fid of each component looked up */
+        struct lu_fid        *dfh_fid;
+        /** device on which objects are located by fid */
+        struct dt_device     *dfh_dt;
+        /** directory the next component is looked up in */
+        struct dt_object     *dfh_o;
+};
+
+/** Scratch data fetched through dt_key from the context of an lu_env. */
+struct dt_thread_info {
+        /** writable copy of the path being resolved (dt_store_resolve) */
+        char                    dti_buf[DT_MAX_PATH];
+        /** packed fid record filled in by dt_lookup() */
+        struct lu_fid_pack      dti_pack;
+        /** path-walk state used by dt_store_resolve() */
+        struct dt_find_hint     dti_dfh;
+};
+
+/* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
+LU_KEY_INIT(dt_global, struct dt_thread_info);
+LU_KEY_FINI(dt_global, struct dt_thread_info);
+
+/* key under which a struct dt_thread_info is looked up in an lu_env
+ * context; tagged for both MD and DT threads */
+static struct lu_context_key dt_key = {
+        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD,
+        .lct_init = dt_global_key_init,
+        .lct_fini = dt_global_key_fini
+};
+
 /* no lock is necessary to protect the list, because call-backs
  * are added during system startup. Please refer to "struct dt_device".
  */
@@ -157,13 +179,44 @@ int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj)
 }
 EXPORT_SYMBOL(dt_try_as_dir);
 
-extern struct lu_context_key lu_global_key;
+/**
+ * Map the file-type bits (S_IFMT) of \a mode to a dt_format_type.
+ * A file type other than the ones handled below triggers LBUG().
+ */
+enum dt_format_type dt_mode_to_dft(__u32 mode)
+{
+        const __u32 type = mode & S_IFMT;
+        enum dt_format_type result;
+
+        if (type == S_IFDIR) {
+                result = DFT_DIR;
+        } else if (type == S_IFREG) {
+                result = DFT_REGULAR;
+        } else if (type == S_IFLNK) {
+                result = DFT_SYM;
+        } else if (type == S_IFCHR || type == S_IFBLK ||
+                   type == S_IFIFO || type == S_IFSOCK) {
+                /* all special files share one format type */
+                result = DFT_NODE;
+        } else {
+                LBUG();
+        }
+
+        return result;
+}
+
+EXPORT_SYMBOL(dt_mode_to_dft);
+/**
+ * lookup fid for object named \a name in directory \a dir.
+ */
 
 static int dt_lookup(const struct lu_env *env, struct dt_object *dir,
                      const char *name, struct lu_fid *fid)
 {
-        struct lu_fid_pack  *pack = lu_context_key_get(&env->le_ctx,
-                                                       &lu_global_key);
+        struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
+                                                         &dt_key);
+        struct lu_fid_pack *pack = &info->dti_pack;
         struct dt_rec       *rec = (struct dt_rec *)pack;
         const struct dt_key *key = (const struct dt_key *)name;
         int result;
@@ -171,16 +224,21 @@ static int dt_lookup(const struct lu_env *env, struct dt_object *dir,
         if (dt_try_as_dir(env, dir)) {
                 result = dir->do_index_ops->dio_lookup(env, dir, rec, key,
                                                        BYPASS_CAPA);
-                if (result == 0)
+                if (result > 0)
                         result = fid_unpack(pack, fid);
+                else if (result == 0)
+                        result = -ENOENT;
         } else
                 result = -ENOTDIR;
         return result;
 }
 
-static struct dt_object *dt_locate(const struct lu_env *env,
-                                   struct dt_device *dev,
-                                   const struct lu_fid *fid)
+/**
+ * get object for given \a fid.
+ */
+struct dt_object *dt_locate(const struct lu_env *env,
+                            struct dt_device *dev,
+                            const struct lu_fid *fid)
 {
         struct lu_object *obj;
         struct dt_object *dt;
@@ -191,38 +249,154 @@ static struct dt_object *dt_locate(const struct lu_env *env,
                 LASSERT(obj != NULL);
                 dt = container_of(obj, struct dt_object, do_lu);
         } else
-                dt = (void *)obj;
+                dt = (struct dt_object *)obj;
         return dt;
 }
+EXPORT_SYMBOL(dt_locate);
 
-struct dt_object *dt_store_open(const struct lu_env *env,
-                                struct dt_device *dt, const char *name,
-                                struct lu_fid *fid)
+/**
+ * Path-walk callback: look up \a entry in the directory currently held in
+ * the dt_find_hint passed as \a data, and step into it.
+ *
+ * The reference on the current directory is always dropped.  On success
+ * dfh_o is replaced by the object found.  If dt_locate() fails, dfh_o holds
+ * the error pointer and its error code is returned.  If the lookup itself
+ * fails, dfh_o still names the released directory and must not be used.
+ */
+static int dt_find_entry(const struct lu_env *env, const char *entry, void *data)
+{
+        struct dt_find_hint *hint = data;
+        struct dt_object    *cur = hint->dfh_o;
+        int                  rc;
+
+        rc = dt_lookup(env, cur, entry, hint->dfh_fid);
+        lu_object_put(env, &cur->do_lu);
+        if (rc == 0) {
+                cur = dt_locate(env, hint->dfh_dt, hint->dfh_fid);
+                if (IS_ERR(cur))
+                        rc = PTR_ERR(cur);
+        }
+        hint->dfh_o = cur;
+        return rc;
+}
+
+/**
+ * Split \a path on '/' and feed each non-empty component, in order, to
+ * \a entry_func together with \a data.
+ *
+ * \a path is modified in place by strsep().  Empty components (leading or
+ * repeated slashes) are skipped.  Parsing stops at the end of the string or
+ * at the first non-zero return from \a entry_func, which is then returned.
+ * Returns 0 when every component was accepted, or when there were none.
+ */
+int dt_path_parser(const struct lu_env *env,
+                   char *path, dt_entry_func_t entry_func,
+                   void *data)
+{
+        char *comp;
+        int   rc = 0;
+
+        for (comp = strsep(&path, "/"); comp != NULL;
+             comp = strsep(&path, "/")) {
+                if (comp[0] == '\0') {
+                        /* nothing left after this empty component: done */
+                        if (path == NULL || path[0] == '\0')
+                                break;
+                        continue;
+                }
+                rc = entry_func(env, comp, data);
+                if (rc != 0)
+                        break;
+        }
+
+        return rc;
+}
+
+/**
+ * Resolve '/'-separated \a path starting from the root of \a dt.
+ *
+ * On success returns the object the walk ended on (the root itself when
+ * \a path has no components); the caller drops it with lu_object_put(),
+ * and *\a fid holds its fid.  On failure returns ERR_PTR() and no object
+ * reference is left held.  \a path is copied into per-context scratch
+ * space first and is cut off at DT_MAX_PATH - 1 characters.
+ */
+static struct dt_object *dt_store_resolve(const struct lu_env *env,
+                                          struct dt_device *dt,
+                                          const char *path,
+                                          struct lu_fid *fid)
+{
+        struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
+                                                         &dt_key);
+        struct dt_find_hint *dfh = &info->dti_dfh;
+        struct dt_object     *obj;
+        char *local = info->dti_buf;
         int result;
 
-        struct dt_object *root;
-        struct dt_object *child;
+        dfh->dfh_dt = dt;
+        dfh->dfh_fid = fid;
+
+        strncpy(local, path, DT_MAX_PATH);
+        local[DT_MAX_PATH - 1] = '\0';
 
         result = dt->dd_ops->dt_root_get(env, dt, fid);
         if (result == 0) {
-                root = dt_locate(env, dt, fid);
-                if (!IS_ERR(root)) {
-                        result = dt_lookup(env, root, name, fid);
-                        if (result == 0)
-                                child = dt_locate(env, dt, fid);
-                        else
-                                child = ERR_PTR(result);
-                        lu_object_put(env, &root->do_lu);
-                } else {
-                        CERROR("No root\n");
-                        child = (void *)root;
+                obj = dt_locate(env, dt, fid);
+                if (!IS_ERR(obj)) {
+                        dfh->dfh_o = obj;
+                        result = dt_path_parser(env, local, dt_find_entry, dfh);
+                        if (result != 0)
+                                obj = ERR_PTR(result);
+                        else
+                                /*
+                                 * dt_find_entry() released every directory it
+                                 * stepped out of, the root included, so the
+                                 * object to hand back is the one the walk
+                                 * ended on, not the root located above.
+                                 */
+                                obj = dfh->dfh_o;
                 }
-        } else
-                child = ERR_PTR(result);
-        return child;
+        } else {
+                obj = ERR_PTR(result);
+        }
+        return obj;
+}
+
+/**
+ * Look up \a name in directory \a p and return the dt object for it.
+ * The fid found is stored in *\a fid.  Returns ERR_PTR() carrying the
+ * lookup error when the name cannot be resolved.
+ */
+static struct dt_object *dt_reg_open(const struct lu_env *env,
+                                     struct dt_device *dt,
+                                     struct dt_object *p,
+                                     const char *name,
+                                     struct lu_fid *fid)
+{
+        int rc = dt_lookup(env, p, name, fid);
+
+        if (rc != 0)
+                return ERR_PTR(rc);
+
+        return dt_locate(env, dt, fid);
+}
+
+/**
+ * Open the dt object called \a filename inside directory \a dirname.
+ *      \param  dt        dt device
+ *      \param  dirname   '/'-separated path of the directory, resolved
+ *                        from the device root
+ *      \param  filename  entry to look up in that directory
+ *      \param  fid       on success, object fid is stored in *fid
+ *      \return the object, or an ERR_PTR() from path resolution or lookup
+ */
+struct dt_object *dt_store_open(const struct lu_env *env,
+                                struct dt_device *dt,
+                                const char *dirname,
+                                const char *filename,
+                                struct lu_fid *fid)
+{
+        struct dt_object *dir = dt_store_resolve(env, dt, dirname, fid);
+        struct dt_object *file;
+
+        if (IS_ERR(dir))
+                return dir;
+
+        file = dt_reg_open(env, dt, dir, filename, fid);
+        /* the directory was only needed for the lookup */
+        lu_object_put(env, &dir->do_lu);
+        return file;
+}
 EXPORT_SYMBOL(dt_store_open);
 
+/* dt class init function: registers the dt thread-info context key. */
+int dt_global_init(void)
+{
+        LU_CONTEXT_KEY_INIT(&dt_key);
+        return lu_context_key_register(&dt_key);
+}
+
+/* dt class fini function: unregisters the dt thread-info context key. */
+void dt_global_fini(void)
+{
+        lu_context_key_degister(&dt_key);
+}
+
 const struct dt_index_features dt_directory_features;
 EXPORT_SYMBOL(dt_directory_features);
index 42798fb..01b2d3e 100644 (file)
@@ -1512,6 +1512,12 @@ void cl_global_fini(void);
 int  lu_ref_global_init(void);
 void lu_ref_global_fini(void);
 
+int dt_global_init(void);
+void dt_global_fini(void);
+
+int llo_global_init(void);
+void llo_global_fini(void);
+
 /**
  * Initialization of global lu_* data.
  */
@@ -1549,10 +1555,22 @@ int lu_global_init(void)
                 return -ENOMEM;
 
         result = lu_time_global_init();
-        if (result != 0)
-                return result;
+        if (result)
+                GOTO(out, result);
+
+#ifdef __KERNEL__
+        result = dt_global_init();
+        if (result)
+                GOTO(out, result);
 
-        return cl_global_init();
+        result = llo_global_init();
+        if (result)
+                GOTO(out, result);
+#endif
+        result = cl_global_init();
+out:
+
+        return result;
 }
 
 /**
@@ -1561,6 +1579,10 @@ int lu_global_init(void)
 void lu_global_fini(void)
 {
         cl_global_fini();
+#ifdef __KERNEL__
+        llo_global_fini();
+        dt_global_fini();
+#endif
         lu_time_global_fini();
         if (lu_site_shrinker != NULL) {
                 remove_shrinker(lu_site_shrinker);
@@ -1739,4 +1761,3 @@ void lu_kmem_fini(struct lu_kmem_descr *caches)
         }
 }
 EXPORT_SYMBOL(lu_kmem_fini);
-
diff --git a/lustre/obdclass/md_local_object.c b/lustre/obdclass/md_local_object.c
new file mode 100644 (file)
index 0000000..919c284
--- /dev/null
@@ -0,0 +1,447 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/obdclass/md_local_object.c
+ *
+ * Lustre Local Object create APIs
+ * 'create on first mount' facility. Files registered under llo module will
+ * be created on first mount.
+ *
+ * Author: Pravin Shelar  <pravin.shelar@sun.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#include <obd_support.h>
+#include <lustre_disk.h>
+#include <lustre_fid.h>
+#include <lu_object.h>
+#include <libcfs/list.h>
+#include <md_object.h>
+
+
+/** List head to hold list of objects to be created. */
+static struct list_head llo_lobj_list;
+
+/** Lock to protect list manipulations */
+static struct mutex     llo_lock;
+
+/**
+ * Structure used to maintain state of path parsing.
+ * \see llo_find_entry, llo_store_resolve
+ */
+struct llo_find_hint {
+        /** receives the fid of each component looked up */
+        struct lu_fid        *lfh_cfid;
+        /** md device on which objects are located by fid */
+        struct md_device     *lfh_md;
+        /** directory the next component is looked up in */
+        struct md_object     *lfh_pobj;
+};
+
+/**
+ * Thread Local storage for this module.
+ */
+struct llo_thread_info {
+        /** buffer to resolve path */
+        char                    lti_buf[DT_MAX_PATH];
+        /** used for path resolve */
+        struct lu_fid           lti_fid;
+        /** used to pass child object fid */
+        struct lu_fid           lti_cfid;
+        /** path-walk state used by llo_store_resolve() */
+        struct llo_find_hint    lti_lfh;
+        /** operation spec filled in by llo_lookup() and llo_create_obj() */
+        struct md_op_spec       lti_spc;
+        /** attributes passed to mdo_create() by llo_create_obj() */
+        struct md_attr          lti_ma;
+        /** name passed to mdo_lookup() / mdo_create() */
+        struct lu_name          lti_lname;
+};
+
+/* context key constructor/destructor: llod_global_key_init,
+ * llod_global_key_fini */
+LU_KEY_INIT(llod_global, struct llo_thread_info);
+LU_KEY_FINI(llod_global, struct llo_thread_info);
+
+/* key under which a struct llo_thread_info is looked up in an lu_env
+ * context; tagged for both MD and DT threads */
+static struct lu_context_key llod_key = {
+        .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
+        .lct_init = llod_global_key_init,
+        .lct_fini = llod_global_key_fini
+};
+
+/** Fetch this module's llo_thread_info from the context of \a env. */
+static inline struct llo_thread_info * llo_env_info(const struct lu_env *env)
+{
+        return lu_context_key_get(&env->le_ctx,  &llod_key);
+}
+
+/**
+ * Search md object for given fid.
+ * Returns the object, or the error pointer from lu_object_find().
+ */
+static struct md_object *llo_locate(const struct lu_env *env,
+                                    struct md_device *md,
+                                    const struct lu_fid *fid)
+{
+        struct lu_object *lo;
+
+        lo = lu_object_find(env, &md->md_lu_dev, fid, NULL);
+        if (IS_ERR(lo))
+                return (struct md_object *)lo;
+
+        /* pick the object matching md's device type under the same header */
+        lo = lu_object_locate(lo->lo_header, md->md_lu_dev.ld_type);
+        LASSERT(lo != NULL);
+        return (struct md_object *)lo;
+}
+
+/**
+ * Lookup FID for object named \a name in directory \a pobj.
+ * The name and operation spec handed to mdo_lookup() live in the
+ * llo_thread_info of \a env; the result of mdo_lookup() is returned.
+ */
+static int llo_lookup(const struct lu_env  *env,
+                      struct md_object *pobj,
+                      const char *name,
+                      struct lu_fid *fid)
+{
+        struct llo_thread_info *info = llo_env_info(env);
+        struct md_op_spec      *spec = &info->lti_spc;
+        struct lu_name         *lname = &info->lti_lname;
+
+        lname->ln_name = name;
+        lname->ln_namelen = strlen(name);
+
+        spec->sp_feat = NULL;
+        spec->sp_cr_flags = 0;
+        spec->sp_cr_lookup = 1;
+        spec->sp_cr_mode = 0;
+        spec->sp_ck_split = 0;
+
+        return mdo_lookup(env, pobj, lname, fid, spec);
+}
+
+/**
+ * Function to look up path component, this is passed to parsing
+ * function. \see llo_store_resolve
+ *
+ * The reference on the current directory (lfh_pobj) is always dropped.
+ * On success lfh_pobj is replaced by the object found.  If llo_locate()
+ * fails, lfh_pobj holds the error pointer and its code is returned.  If the
+ * lookup itself fails, lfh_pobj still names the released directory.
+ */
+static int llo_find_entry(const struct lu_env  *env,
+                          const char *name, void *data)
+{
+        struct llo_find_hint *hint = data;
+        struct md_object     *cur = hint->lfh_pobj;
+        int                   rc;
+
+        /* lookup fid for object, then let go of the directory */
+        rc = llo_lookup(env, cur, name, hint->lfh_cfid);
+        lu_object_put(env, &cur->mo_lu);
+
+        if (rc == 0) {
+                /* get md object for fid that we got in lookup */
+                cur = llo_locate(env, hint->lfh_md, hint->lfh_cfid);
+                if (IS_ERR(cur))
+                        rc = PTR_ERR(cur);
+        }
+
+        hint->lfh_pobj = cur;
+        return rc;
+}
+
+/**
+ * Look up \a name in directory \a p and return the md object for it.
+ * The fid found is stored in *\a fid.  Returns ERR_PTR() carrying the
+ * lookup error when the name cannot be resolved.
+ */
+static struct md_object *llo_reg_open(const struct lu_env *env,
+                                      struct md_device *md,
+                                      struct md_object *p,
+                                      const char *name,
+                                      struct lu_fid *fid)
+{
+        int rc = llo_lookup(env, p, name, fid);
+
+        if (rc != 0)
+                return ERR_PTR(rc);
+
+        return llo_locate(env, md, fid);
+}
+
+/**
+ * Resolve given \a path, starting from the backend fs root of \a dt.
+ *
+ * On success returns the md object of the last path component (the root
+ * itself when \a path has no components); the caller drops it with
+ * lu_object_put(), and \a fid points to its fid.  On failure returns
+ * ERR_PTR() and no object reference is left held.  \a path is copied into
+ * per-context scratch space first and is cut off at DT_MAX_PATH - 1
+ * characters.
+ */
+struct md_object *llo_store_resolve(const struct lu_env *env,
+                                    struct md_device *md,
+                                    struct dt_device *dt,
+                                    const char *path,
+                                    struct lu_fid *fid)
+{
+        struct llo_thread_info *info = llo_env_info(env);
+        struct llo_find_hint *lfh = &info->lti_lfh;
+        char *local = info->lti_buf;
+        struct md_object        *obj;
+        int result;
+
+        strncpy(local, path, DT_MAX_PATH);
+        local[DT_MAX_PATH - 1] = '\0';
+
+        lfh->lfh_md = md;
+        lfh->lfh_cfid = fid;
+        /* start path resolution from backend fs root. */
+        result = dt->dd_ops->dt_root_get(env, dt, fid);
+        if (result == 0) {
+                /* get md object for root */
+                obj = llo_locate(env, md, fid);
+                if (!IS_ERR(obj)) {
+                        /* start path parser from root md */
+                        lfh->lfh_pobj = obj;
+                        result = dt_path_parser(env, local, llo_find_entry, lfh);
+                        if (result != 0)
+                                obj = ERR_PTR(result);
+                        else
+                                /*
+                                 * llo_find_entry() released every directory
+                                 * it stepped out of, the root included, so
+                                 * the object to hand back is the one the walk
+                                 * ended on, not the root located above.
+                                 */
+                                obj = lfh->lfh_pobj;
+                }
+        } else {
+                obj = ERR_PTR(result);
+        }
+        return obj;
+}
+EXPORT_SYMBOL(llo_store_resolve);
+
+/**
+ * Returns md object for \a objname in given \a dirname.
+ * On success *\a fid holds the object's fid; on failure an ERR_PTR() from
+ * path resolution or lookup is returned.
+ */
+struct md_object *llo_store_open(const struct lu_env *env,
+                                 struct md_device *md,
+                                 struct dt_device *dt,
+                                 const char *dirname,
+                                 const char *objname,
+                                 struct lu_fid *fid)
+{
+        struct md_object *parent;
+        struct md_object *child;
+
+        /* search md object for parent dir */
+        parent = llo_store_resolve(env, md, dt, dirname, fid);
+        if (IS_ERR(parent))
+                return parent;
+
+        child = llo_reg_open(env, md, parent, objname, fid);
+        lu_object_put(env, &parent->mo_lu);
+        return child;
+}
+EXPORT_SYMBOL(llo_store_open);
+
+/**
+ * Create object \a objname with fid \a fid in directory \a dir.
+ *
+ * The object is created as a directory when \a feat is
+ * &dt_directory_features and as a regular file otherwise, with uid/gid 0
+ * and permission bits rw-r--r--.  Returns the new md object, or ERR_PTR()
+ * if it could not be located or created.
+ */
+static struct md_object *llo_create_obj(const struct lu_env *env,
+                                        struct md_device *md,
+                                        struct md_object *dir,
+                                        const char *objname,
+                                        const struct lu_fid *fid,
+                                        const struct dt_index_features *feat)
+{
+        struct llo_thread_info *info = llo_env_info(env);
+        struct md_op_spec      *spec = &info->lti_spc;
+        struct lu_name         *lname = &info->lti_lname;
+        struct md_attr         *ma = &info->lti_ma;
+        struct lu_attr         *la = &ma->ma_attr;
+        struct md_object       *child;
+        int                     rc;
+
+        child = llo_locate(env, md, fid);
+        if (IS_ERR(child))
+                return child;
+
+        lname->ln_name = objname;
+        lname->ln_namelen = strlen(objname);
+
+        spec->sp_feat = feat;
+        spec->sp_cr_flags = 0;
+        spec->sp_cr_lookup = 1;
+        spec->sp_cr_mode = 0;
+        spec->sp_ck_split = 0;
+
+        la->la_mode = (feat == &dt_directory_features ? S_IFDIR : S_IFREG) |
+                      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+        la->la_uid = 0;
+        la->la_gid = 0;
+        la->la_valid = LA_MODE | LA_UID | LA_GID;
+
+        ma->ma_valid = 0;
+        ma->ma_need = 0;
+
+        rc = mdo_create(env, dir, lname, child, spec, ma);
+        if (rc == 0)
+                return child;
+
+        /* creation failed: give back the reference taken by llo_locate() */
+        lu_object_put(env, &child->mo_lu);
+        return ERR_PTR(rc);
+}
+
+/**
+ * Create md object, object could be directory or
+ * special index defined by \a feat in \a dirname.
+ *
+ *       \param  md       md device
+ *       \param  dt       dt device whose root path resolution starts from
+ *       \param  dirname  path of the parent directory
+ *       \param  objname  file name
+ *       \param  fid      object fid
+ *       \param  feat     index features required for directory create
+ */
+
+struct md_object *llo_store_create_index(const struct lu_env *env,
+                                         struct md_device *md,
+                                         struct dt_device *dt,
+                                         const char *dirname,
+                                         const char *objname,
+                                         const struct lu_fid *fid,
+                                         const struct dt_index_features *feat)
+{
+        struct md_object *parent;
+        struct md_object *child;
+
+        /* the parent's fid is not needed here; park it in scratch space */
+        parent = llo_store_resolve(env, md, dt, dirname,
+                                   &llo_env_info(env)->lti_fid);
+        if (IS_ERR(parent))
+                return parent;
+
+        child = llo_create_obj(env, md, parent, objname, fid, feat);
+        lu_object_put(env, &parent->mo_lu);
+        return child;
+}
+
+EXPORT_SYMBOL(llo_store_create_index);
+
+/**
+ * Create md object for regular file in \a dirname.
+ *
+ *       \param  md       md device
+ *       \param  dt       dt device whose root path resolution starts from
+ *       \param  dirname  path of the parent directory
+ *       \param  objname  file name
+ *       \param  fid      object fid.
+ *
+ * Same as llo_store_create_index() with no index features.
+ */
+
+struct md_object *llo_store_create(const struct lu_env *env,
+                                   struct md_device *md,
+                                   struct dt_device *dt,
+                                   const char *dirname,
+                                   const char *objname,
+                                   const struct lu_fid *fid)
+{
+        return llo_store_create_index(env, md, dt, dirname,
+                                      objname, fid, NULL);
+}
+
+EXPORT_SYMBOL(llo_store_create);
+
+/**
+ * Register object for 'create on first mount' facility.
+ *
+ * Adds \a llod to the module-wide list under llo_lock; the descriptor is
+ * linked in, not copied.  Always returns 0.
+ */
+
+int llo_local_obj_register(struct lu_local_obj_desc *llod)
+{
+        mutex_lock(&llo_lock);
+        list_add(&llod->llod_linkage, &llo_lobj_list);
+        mutex_unlock(&llo_lock);
+
+        return 0;
+}
+
+EXPORT_SYMBOL(llo_local_obj_register);
+
+/**
+ * Create registered objects.
+ *
+ * Walks the registration list under llo_lock and creates every entry in
+ * the backend root directory.  An object that already exists (-EEXIST) is
+ * not an error; any other failure is logged and stops the walk.  Returns 0
+ * or the first such error.
+ */
+
+int llo_local_objects_setup(const struct lu_env *env,
+                             struct md_device * md,
+                             struct dt_device *dt)
+{
+        struct lu_fid *fid = &llo_env_info(env)->lti_cfid;
+        struct lu_local_obj_desc *desc;
+        struct md_object *mdo;
+        int rc = 0;
+
+        mutex_lock(&llo_lock);
+
+        list_for_each_entry(desc, &llo_lobj_list, llod_linkage) {
+                const struct dt_index_features *feat;
+
+                lu_local_obj_fid(fid, desc->llod_oid);
+
+                /* plain objects are created with no index features */
+                feat = desc->llod_is_index ? desc->llod_feat : NULL;
+                mdo = llo_store_create_index(env, md, dt, "",
+                                             desc->llod_name, fid, feat);
+                if (!IS_ERR(mdo)) {
+                        lu_object_put(env, &mdo->mo_lu);
+                        continue;
+                }
+
+                /* object is already there: nothing to do for this entry */
+                if (PTR_ERR(mdo) == -EEXIST)
+                        continue;
+
+                rc = PTR_ERR(mdo);
+                CERROR("creating obj [%s] fid = "DFID" rc = %d\n",
+                       desc->llod_name, PFID(fid), rc);
+                break;
+        }
+
+        mutex_unlock(&llo_lock);
+        return rc;
+}
+
+EXPORT_SYMBOL(llo_local_objects_setup);
+
+/**
+ * Module init: set up the registration list and its lock, then register
+ * the llo thread-info context key.  Returns the registration result.
+ */
+int llo_global_init(void)
+{
+        mutex_init(&llo_lock);
+        CFS_INIT_LIST_HEAD(&llo_lobj_list);
+
+        LU_CONTEXT_KEY_INIT(&llod_key);
+        return lu_context_key_register(&llod_key);
+}
+
+/** Module cleanup: unregister the llo thread-info context key. */
+void llo_global_fini(void)
+{
+        lu_context_key_degister(&llod_key);
+}
index 128343e..ef51c1e 100644 (file)
@@ -1047,6 +1047,29 @@ static int class_config_llog_handler(struct llog_handle * handle,
                         break;
                 }
 
+                /*
+                 * For interoperability between 1.8 and 2.0,
+                 * rename "mds" obd device type to "mdt".
+                 */
+                {
+                        char *typename = lustre_cfg_string(lcfg, 1);
+                        char *index = lustre_cfg_string(lcfg, 2);
+                        
+                        if ((lcfg->lcfg_command == LCFG_ATTACH && typename &&
+                             strcmp(typename, "mds") == 0)) {
+                                CWARN("For 1.8 interoperability, rename obd "
+                                       "type from mds to mdt\n");
+                                typename[2] = 't';
+                        }
+                        if ((lcfg->lcfg_command == LCFG_SETUP && index &&
+                             strcmp(index, "type") == 0)) {
+                                CWARN("For 1.8 interoperability, set this"
+                                       " index to '0'\n");
+                                index[0] = '0';
+                                index[1] = 0;
+                        }
+                }
+
                 if ((clli->cfg_flags & CFG_F_EXCLUDE) &&
                     (lcfg->lcfg_command == LCFG_LOV_ADD_OBD))
                         /* Add inactive instead */
index 639bfbb..2c3d5c1 100644 (file)
@@ -80,7 +80,7 @@
 #include "filter_internal.h"
 
 /* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0
+#define FILTER_MIN_GROUPS FILTER_GROUP_MDS1_N_BASE
 static struct lvfs_callback_ops filter_lvfs_ops;
 cfs_mem_cache_t *ll_fmd_cachep;
 
@@ -955,7 +955,9 @@ static int filter_update_last_group(struct obd_device *obd, int group)
                 CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc);
                 GOTO(cleanup, rc);
         }
-        LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS);
+        LASSERTF(off == 0 || CHECK_MDS_GROUP(last_group),
+                 "off = %llu and last_group = %d\n", off, last_group);
+
         CDEBUG(D_INODE, "%s: previous %d, new %d\n",
                obd->obd_name, last_group, group);
 
@@ -1145,8 +1147,6 @@ static int filter_read_groups(struct obd_device *obd, int last_group,
         down(&filter->fo_init_lock);
         old_count = filter->fo_group_count;
         for (group = old_count; group <= last_group; group++) {
-                if (group == 0)
-                        continue; /* no group zero */
 
                 rc = filter_read_group_internal(obd, group, create);
                 if (rc != 0)
@@ -1245,7 +1245,7 @@ static int filter_prep_groups(struct obd_device *obd)
         if (off == 0) {
                 last_group = FILTER_MIN_GROUPS;
         } else {
-                LASSERT(last_group >= FILTER_MIN_GROUPS);
+                LASSERT_MDS_GROUP(last_group);
         }
 
         CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name,
@@ -1369,7 +1369,7 @@ static void filter_post(struct obd_device *obd)
         if (rc)
                 CERROR("error writing server data: rc = %d\n", rc);
 
-        for (i = 1; i < filter->fo_group_count; i++) {
+        for (i = 0; i < filter->fo_group_count; i++) {
                 rc = filter_update_last_objid(obd, i,
                                 (i == filter->fo_group_count - 1));
                 if (rc)
@@ -1416,7 +1416,6 @@ obd_id filter_last_id(struct filter_obd *filter, obd_gr group)
         spin_lock(&filter->fo_objidlock);
         id = filter->fo_last_objids[group];
         spin_unlock(&filter->fo_objidlock);
-
         return id;
 }
 
@@ -1433,7 +1432,7 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
         struct filter_subdirs *subdirs;
         LASSERT(group < filter->fo_group_count); /* FIXME: object groups */
 
-        if ((group > 0 && group < FILTER_GROUP_MDS0) ||
+        if ((group > FILTER_GROUP_MDS0 && group < FILTER_GROUP_MDS1_N_BASE) ||
              filter->fo_subdir_count == 0)
                 return filter->fo_dentry_O_groups[group];
 
@@ -2770,8 +2769,6 @@ static int filter_connect(const struct lu_env *env,
         }
 
         group = data->ocd_group;
-        if (group == 0)
-                GOTO(cleanup, rc);
 
         CWARN("%s: Received MDS connection ("LPX64"); group %d\n",
               obd->obd_name, exp->exp_handle.h_cookie, group);
@@ -2948,7 +2945,7 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
 {
         struct obd_llog_group *olg_min, *olg;
         struct filter_obd *filter;
-        int worked = 0, group;
+        int worked = -1, group;
         struct llog_ctxt *ctxt;
         ENTRY;
 
@@ -3454,7 +3451,7 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
         ENTRY;
 
         LASSERT(oa);
-        LASSERT(oa->o_gr != 0);
+        LASSERT_MDS_GROUP(oa->o_gr);
         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
         LASSERT(down_trylock(&filter->fo_create_locks[oa->o_gr]) != 0);
 
@@ -3552,8 +3549,8 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa,
                                obd->obd_name);
                         GOTO(out, rc = 0);
                 }
-                /* only precreate if group == 0 and o_id is specified */
-                if (group < FILTER_GROUP_MDS0 || oa->o_id == 0)
+                /* only precreate if group == 0 and o_id is specified */
+                if (group == FILTER_GROUP_LLOG || oa->o_id == 0)
                         diff = 1;
                 else
                         diff = oa->o_id - filter_last_id(filter, group);
@@ -3832,7 +3829,7 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
         CDEBUG(D_INODE, "%s: filter_create(od->o_gr="LPU64",od->o_id="
                LPU64")\n", obd->obd_name, oa->o_gr, oa->o_id);
 
-        if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
+        if (!(oa->o_valid & OBD_MD_FLGROUP)) {
                 CERROR("!!! nid %s sent invalid object group %d\n",
                         obd_export_nid2str(exp), group);
                 RETURN(-EINVAL);
@@ -4230,13 +4227,32 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen,
         RETURN(-EINVAL);
 }
 
+static inline int filter_setup_llog_group(struct obd_export *exp,
+                                          struct obd_device *obd,
+                                           int group)
+{
+        struct obd_llog_group *olg;
+        struct llog_ctxt *ctxt;
+        int rc;
+
+        olg = filter_find_create_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERTF(ctxt != NULL, "ctxt is null\n");
+
+        rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        llog_ctxt_put(ctxt);
+        return rc;
+}
 static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                                  void *key, __u32 vallen, void *val,
                                  struct ptlrpc_request_set *set)
 {
         struct obd_device *obd;
-        struct obd_llog_group *olg;
-        struct llog_ctxt *ctxt;
         int rc = 0, group;
         ENTRY;
 
@@ -4268,23 +4284,20 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
 
         /* setup llog imports */
         LASSERT(val != NULL);
-        group = (int)(*(__u32 *)val);
-        LASSERT(group >= FILTER_GROUP_MDS0);
-
-        olg = filter_find_create_olg(obd, group);
-        if (IS_ERR(olg))
-                RETURN(PTR_ERR(olg));
-
-        llog_group_set_export(olg, exp);
 
-        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
-        LASSERTF(ctxt != NULL, "ctxt is null\n");
-
-        rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
-        llog_ctxt_put(ctxt);
+        group = (int)(*(__u32 *)val);
+        LASSERT_MDS_GROUP(group);
+        rc = filter_setup_llog_group(exp, obd, group);
+        if (rc)
+                goto out;
 
         lquota_setinfo(filter_quota_interface_ref, obd, exp);
 
+        if (group == FILTER_GROUP_MDS0) {
+                /* setup llog group 1 for interop */
+                filter_setup_llog_group(exp, obd, FILTER_GROUP_LLOG);
+        }
+out:
         RETURN(rc);
 }
 
index e5db720..28578d6 100644 (file)
@@ -235,12 +235,6 @@ static void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars)
 /* Quota stuff */
 extern quota_interface_t *filter_quota_interface_ref;
 
-/* Capability */
-static inline __u64 obdo_mdsno(struct obdo *oa)
-{
-        return oa->o_gr - FILTER_GROUP_MDS0;
-}
-
 int filter_update_capa_key(struct obd_device *obd, struct lustre_capa_key *key);
 int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid,
                      struct lustre_capa *capa, __u64 opc);
index 7810acc..5e5f1c7 100644 (file)
@@ -103,8 +103,14 @@ static int lprocfs_filter_rd_last_id(char *page, char **start, off_t off,
 
         if (obd == NULL)
                 return 0;
+        rc = snprintf(page, count, LPU64"\n",filter_last_id(filter, 0));
+        if (rc < 0)
+                return rc;
+        page += rc;
+        count -= rc;
+        retval += rc;
 
-        for (i = FILTER_GROUP_MDS0; i < filter->fo_group_count; i++) {
+        for (i = FILTER_GROUP_MDS1_N_BASE + 1; i < filter->fo_group_count; i++) {
                 rc = snprintf(page, count, LPU64"\n",filter_last_id(filter, i));
                 if (rc < 0) {
                         retval = rc;
index b34341b..39cea5a 100644 (file)
@@ -184,7 +184,7 @@ static int oscc_internal_create(struct osc_creator *oscc)
         spin_lock(&oscc->oscc_lock);
         body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
         body->oa.o_gr = oscc->oscc_oa.o_gr;
-        LASSERT(body->oa.o_gr > 0);
+        LASSERT_MDS_GROUP(body->oa.o_gr);
         body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
         spin_unlock(&oscc->oscc_lock);
         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
@@ -317,7 +317,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
 
         LASSERT(oa);
         LASSERT(ea);
-        LASSERT(oa->o_gr > 0);
+        LASSERT_MDS_GROUP(oa->o_gr);
         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 
         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
index 329200b..f36306f 100644 (file)
@@ -96,7 +96,7 @@ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
 
         if (lsm) {
                 LASSERT(lsm->lsm_object_id);
-                LASSERT(lsm->lsm_object_gr);
+                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
         }
@@ -153,7 +153,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                 LASSERT((*lsmp)->lsm_object_id);
-                LASSERT((*lsmp)->lsm_object_gr);
+                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
         }
 
         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
@@ -312,8 +312,10 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
         int                    rc;
         ENTRY;
 
-        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
-                                        oinfo->oi_oa->o_gr > 0);
+        LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
+                 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
+                 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
+                 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
 
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
         if (req == NULL)
@@ -3634,7 +3636,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
 
                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
-                LASSERT(oscc->oscc_oa.o_gr > 0);
+                LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
         }
 
index 57e2f71..1844e6c 100644 (file)
 #include "osd_internal.h"
 #include "osd_igif.h"
 
+/* llo_* api support */
+#include <md_object.h>
+
+static const char MDT_XATTR_NAME[] = "trusted.lma";
+static const char dot[] = ".";
+static const char dotdot[] = "..";
+static const char remote_obj_dir[] = "REM_OBJ_DIR";
+
 struct osd_directory {
         struct iam_container od_container;
         struct iam_descr     od_descr;
@@ -102,6 +110,14 @@ struct osd_object {
         struct osd_directory  *oo_dir;
         /** protects inode attributes. */
         spinlock_t             oo_guard;
+        /**
+         * Following two members are used to indicate the presence of dot and
+         * dotdot in the given directory. This is required for interop mode
+         * (b11826).
+         */
+        int oo_compat_dot_created;
+        int oo_compat_dotdot_created;
+
         const struct lu_env   *oo_owner;
 #ifdef CONFIG_LOCKDEP
         struct lockdep_map     oo_dep_map;
@@ -145,38 +161,60 @@ static int   osd_inode_setattr (const struct lu_env *env,
                                 struct inode *inode, const struct lu_attr *attr);
 static int   osd_param_is_sane (const struct osd_device *dev,
                                 const struct txn_param *param);
-static int   osd_index_lookup  (const struct lu_env *env,
-                                struct dt_object *dt,
-                                struct dt_rec *rec, const struct dt_key *key,
-                                struct lustre_capa *capa);
-static int   osd_index_insert  (const struct lu_env *env,
-                                struct dt_object *dt,
-                                const struct dt_rec *rec,
-                                const struct dt_key *key,
-                                struct thandle *handle,
-                                struct lustre_capa *capa,
-                                int ingore_quota);
-static int   osd_index_delete  (const struct lu_env *env,
-                                struct dt_object *dt, const struct dt_key *key,
-                                struct thandle *handle,
-                                struct lustre_capa *capa);
-static int   osd_index_probe   (const struct lu_env *env,
-                                struct osd_object *o,
-                                const struct dt_index_features *feat);
+static int   osd_index_iam_lookup(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  struct dt_rec *rec, const struct dt_key *key,
+                                  struct lustre_capa *capa);
+static int   osd_index_ea_lookup(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                 struct dt_rec *rec, const struct dt_key *key,
+                                 struct lustre_capa *capa);
+static int   osd_index_iam_insert(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  const struct dt_rec *rec,
+                                  const struct dt_key *key,
+                                  struct thandle *handle,
+                                  struct lustre_capa *capa,
+                                  int ingore_quota);
+static int   osd_index_ea_insert (const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  const struct dt_rec *rec,
+                                  const struct dt_key *key,
+                                  struct thandle *handle,
+                                  struct lustre_capa *capa,
+                                  int ingore_quota);
+static int   osd_index_iam_delete(const struct lu_env *env,
+                                  struct dt_object *dt, const struct dt_key *key,
+                                  struct thandle *handle,
+                                  struct lustre_capa *capa);
+static int   osd_index_ea_delete (const struct lu_env *env,
+                                  struct dt_object *dt, const struct dt_key *key,
+                                  struct thandle *handle,
+                                  struct lustre_capa *capa);
+
+static int   osd_iam_index_probe   (const struct lu_env *env,
+                                    struct osd_object *o,
+                                    const struct dt_index_features *feat);
 static int   osd_index_try     (const struct lu_env *env,
                                 struct dt_object *dt,
                                 const struct dt_index_features *feat);
 static void  osd_index_fini    (struct osd_object *o);
 
-static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
-static int   osd_it_get        (const struct lu_env *env,
+static void  osd_it_iam_fini       (const struct lu_env *env, struct dt_it *di);
+static int   osd_it_iam_get        (const struct lu_env *env,
+                                    struct dt_it *di, const struct dt_key *key);
+static void  osd_it_iam_put        (const struct lu_env *env, struct dt_it *di);
+static int   osd_it_iam_next       (const struct lu_env *env, struct dt_it *di);
+static int   osd_it_iam_key_size   (const struct lu_env *env,
+                                    const struct dt_it *di);
+static void  osd_it_ea_fini    (const struct lu_env *env, struct dt_it *di);
+static int   osd_it_ea_get     (const struct lu_env *env,
                                 struct dt_it *di, const struct dt_key *key);
-static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
-static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
-static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
-                                struct thandle *th);
-static int   osd_it_key_size   (const struct lu_env *env,
+static void  osd_it_ea_put     (const struct lu_env *env, struct dt_it *di);
+static int   osd_it_ea_next    (const struct lu_env *env, struct dt_it *di);
+static int   osd_it_ea_key_size(const struct lu_env *env,
                                 const struct dt_it *di);
+
 static void  osd_conf_get      (const struct lu_env *env,
                                 const struct dt_device *dev,
                                 struct dt_device_param *param);
@@ -202,13 +240,21 @@ static struct inode       *osd_iget         (struct osd_thread_info *info,
                                              struct osd_device *dev,
                                              const struct osd_inode_id *id);
 static struct super_block *osd_sb           (const struct osd_device *dev);
-static struct dt_it       *osd_it_init      (const struct lu_env *env,
-                                             struct dt_object *dt, int wable,
+static struct dt_it       *osd_it_iam_init  (const struct lu_env *env,
+                                             struct dt_object *dt,
+                                             struct lustre_capa *capa);
+static struct dt_key      *osd_it_iam_key   (const struct lu_env *env,
+                                             const struct dt_it *di);
+static struct dt_rec      *osd_it_iam_rec   (const struct lu_env *env,
+                                             const struct dt_it *di);
+static struct dt_it       *osd_it_ea_init   (const struct lu_env *env,
+                                             struct dt_object *dt,
                                              struct lustre_capa *capa);
-static struct dt_key      *osd_it_key       (const struct lu_env *env,
+static struct dt_key      *osd_it_ea_key    (const struct lu_env *env,
                                              const struct dt_it *di);
-static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
+static struct dt_rec      *osd_it_ea_rec    (const struct lu_env *env,
                                              const struct dt_it *di);
+
 static struct timespec    *osd_inode_time   (const struct lu_env *env,
                                              struct inode *inode,
                                              __u64 seconds);
@@ -217,6 +263,12 @@ static struct thandle     *osd_trans_start  (const struct lu_env *env,
                                              struct txn_param *p);
 static journal_t          *osd_journal      (const struct osd_device *dev);
 
+static int __osd_ea_add_rec(struct osd_thread_info *info,
+                            struct osd_object *pobj,
+                            struct osd_object *cobj,
+                            const char *name,
+                            struct thandle *th);
+
 static const struct lu_device_type_operations osd_device_type_ops;
 static       struct lu_device_type            osd_device_type;
 static const struct lu_object_operations      osd_lu_obj_ops;
@@ -224,9 +276,10 @@ static       struct obd_ops                   osd_obd_device_ops;
 static const struct lu_device_operations      osd_lu_ops;
 static       struct lu_context_key            osd_key;
 static const struct dt_object_operations      osd_obj_ops;
+static const struct dt_object_operations      osd_obj_ea_ops;
 static const struct dt_body_operations        osd_body_ops;
-static const struct dt_index_operations       osd_index_ops;
-static const struct dt_index_operations       osd_index_compat_ops;
+static const struct dt_index_operations       osd_index_iam_ops;
+static const struct dt_index_operations       osd_index_ea_ops;
 
 struct osd_thandle {
         struct thandle          ot_super;
@@ -343,7 +396,11 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
 
                 l = &mo->oo_dt.do_lu;
                 dt_object_init(&mo->oo_dt, NULL, d);
-                mo->oo_dt.do_ops = &osd_obj_ops;
+                if (osd_dev(d)->od_iop_mode)
+                        mo->oo_dt.do_ops = &osd_obj_ea_ops;
+                else
+                        mo->oo_dt.do_ops = &osd_obj_ops;
+
                 l->lo_ops = &osd_lu_obj_ops;
                 init_rwsem(&mo->oo_sem);
                 spin_lock_init(&mo->oo_guard);
@@ -398,11 +455,18 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
         OBD_FREE_PTR(obj);
 }
 
-static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
-                                          const struct iam_container *bag)
+static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
+                                             const struct iam_container *bag)
+{
+        return bag->ic_descr->id_ops->id_ipd_alloc(bag,
+                                           osd_oti_get(env)->oti_it_ipd);
+}
+
+static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
+                                              const struct iam_container *bag)
 {
         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
-                                                   osd_oti_get(env)->oti_ipd);
+                                           osd_oti_get(env)->oti_idx_ipd);
 }
 
 static void osd_ipd_put(const struct lu_env *env,
@@ -486,8 +550,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 
         /*
          * If object is unlinked remove fid->ino mapping from object index.
-         *
-         * File body will be deleted by iput().
          */
 
         osd_index_fini(obj);
@@ -501,6 +563,7 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
                                                 "Failed to cleanup: %d\n",
                                                 result);
                 }
+
                 iput(inode);
                 obj->oo_inode = NULL;
         }
@@ -579,6 +642,19 @@ static void osd_conf_get(const struct lu_env *env,
         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
 }
 
+/**
+ * Helper function to get and fill the buffer with input values.
+ */
+static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
+{
+        struct lu_buf *buf;
+
+        buf = &osd_oti_get(env)->oti_buf;
+        buf->lb_buf = area;
+        buf->lb_len = len;
+        return buf;
+}
+
 /*
  * Journal
  */
@@ -767,6 +843,7 @@ static void osd_ro(const struct lu_env *env, struct dt_device *d)
         EXIT;
 }
 
+
 /*
  * Concurrency: serialization provided by callers.
  */
@@ -1259,6 +1336,43 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
 
 extern struct inode *ldiskfs_create_inode(handle_t *handle,
                                           struct inode * dir, int mode);
+extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry,
+                             struct inode *inode);
+extern int ldiskfs_delete_entry(handle_t *handle,
+                                struct inode * dir,
+                                struct ldiskfs_dir_entry_2 * de_del,
+                                struct buffer_head * bh);
+extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry,
+                                               struct ldiskfs_dir_entry_2
+                                               ** res_dir);
+extern int ldiskfs_add_dot_dotdot(handle_t *handle, struct inode *dir,
+                                  struct inode *inode);
+
+extern int ldiskfs_xattr_set_handle(handle_t *handle, struct inode *inode,
+                                    int name_index, const char *name,
+                                    const void *value, size_t value_len,
+                                    int flags);
+
+static struct dentry * osd_child_dentry_get(const struct lu_env *env,
+                                            struct osd_object *obj,
+                                            const char *name,
+                                            const int namelen)
+{
+        struct osd_thread_info *info   = osd_oti_get(env);
+        struct dentry *child_dentry = &info->oti_child_dentry;
+        struct dentry *obj_dentry = &info->oti_obj_dentry;
+
+        obj_dentry->d_inode = obj->oo_inode;
+        obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
+        obj_dentry->d_name.hash = 0;
+
+        child_dentry->d_name.hash = 0;
+        child_dentry->d_parent = obj_dentry;
+        child_dentry->d_name.name = name;
+        child_dentry->d_name.len = namelen;
+        return child_dentry;
+}
+
 
 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
                       umode_t mode,
@@ -1268,7 +1382,7 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
         int result;
         struct osd_device  *osd = osd_obj2dev(obj);
         struct osd_thandle *oth;
-        struct inode       *parent;
+        struct dt_object   *parent;
         struct inode       *inode;
 #ifdef HAVE_QUOTA_SUPPORT
         struct osd_ctxt    *save = &info->oti_ctxt;
@@ -1276,21 +1390,23 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
-        LASSERT(osd->od_obj_area != NULL);
 
         oth = container_of(th, struct osd_thandle, ot_super);
         LASSERT(oth->ot_handle->h_transaction != NULL);
 
         if (hint && hint->dah_parent)
-                parent = osd_dt_obj(hint->dah_parent)->oo_inode;
+                parent = hint->dah_parent;
         else
-                parent = osd->od_obj_area->d_inode;
-        LASSERT(parent->i_op != NULL);
+                parent = osd->od_obj_area;
+
+        LASSERT(parent != NULL);
+        LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
 
 #ifdef HAVE_QUOTA_SUPPORT
         osd_push_ctxt(info->oti_env, save);
 #endif
-        inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
+        inode = ldiskfs_create_inode(oth->ot_handle,
+                                     osd_dt_obj(parent)->oo_inode, mode);
 #ifdef HAVE_QUOTA_SUPPORT
         osd_pop_ctxt(save);
 #endif
@@ -1307,6 +1423,10 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
                            int recsize, handle_t *handle);
 
+extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize,
+                           int recsize, handle_t *handle);
+
+
 enum {
         OSD_NAME_LEN = 255
 };
@@ -1314,22 +1434,25 @@ enum {
 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
                      struct lu_attr *attr,
                      struct dt_allocation_hint *hint,
+                     struct dt_object_format *dof,
                      struct thandle *th)
 {
         int result;
         struct osd_thandle *oth;
+        struct osd_device *osd = osd_obj2dev(obj);
+        __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
 
         LASSERT(S_ISDIR(attr->la_mode));
 
         oth = container_of(th, struct osd_thandle, ot_super);
         LASSERT(oth->ot_handle->h_transaction != NULL);
-        result = osd_mkfile(info, obj, (attr->la_mode &
-                            (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
-        if (result == 0) {
+        result = osd_mkfile(info, obj, mode, hint, th);
+        if (result == 0 && osd->od_iop_mode == 0) {
                 LASSERT(obj->oo_inode != NULL);
                 /*
                  * XXX uh-oh... call low-level iam function directly.
                  */
+
                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
                                          sizeof (struct lu_fid_pack),
                                          oth->ot_handle);
@@ -1337,9 +1460,47 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
         return result;
 }
 
+static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
+                        struct lu_attr *attr,
+                        struct dt_allocation_hint *hint,
+                        struct dt_object_format *dof,
+                        struct thandle *th)
+{
+        int result;
+        struct osd_thandle *oth;
+        const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
+
+        __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
+
+        LASSERT(S_ISREG(attr->la_mode));
+
+        oth = container_of(th, struct osd_thandle, ot_super);
+        LASSERT(oth->ot_handle->h_transaction != NULL);
+
+        result = osd_mkfile(info, obj, mode, hint, th);
+        if (result == 0) {
+                LASSERT(obj->oo_inode != NULL);
+                if (feat->dif_flags & DT_IND_VARKEY)
+                        result = iam_lvar_create(obj->oo_inode,
+                                                 feat->dif_keysize_max,
+                                                 feat->dif_ptrsize,
+                                                 feat->dif_recsize_max,
+                                                 oth->ot_handle);
+                else
+                        result = iam_lfix_create(obj->oo_inode,
+                                                 feat->dif_keysize_max,
+                                                 feat->dif_ptrsize,
+                                                 feat->dif_recsize_max,
+                                                 oth->ot_handle);
+
+        }
+        return result;
+}
+
 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
                      struct lu_attr *attr,
                      struct dt_allocation_hint *hint,
+                     struct dt_object_format *dof,
                      struct thandle *th)
 {
         LASSERT(S_ISREG(attr->la_mode));
@@ -1350,6 +1511,7 @@ static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
                      struct lu_attr *attr,
                      struct dt_allocation_hint *hint,
+                     struct dt_object_format *dof,
                      struct thandle *th)
 {
         LASSERT(S_ISLNK(attr->la_mode));
@@ -1360,22 +1522,17 @@ static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
                      struct lu_attr *attr,
                      struct dt_allocation_hint *hint,
+                     struct dt_object_format *dof,
                      struct thandle *th)
 {
-        int result;
-        struct osd_device *osd = osd_obj2dev(obj);
-        struct inode      *dir;
         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
+        int result;
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
-        LASSERT(osd->od_obj_area != NULL);
         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
                 S_ISFIFO(mode) || S_ISSOCK(mode));
 
-        dir = osd->od_obj_area->d_inode;
-        LASSERT(dir->i_op != NULL);
-
         result = osd_mkfile(info, obj, mode, hint, th);
         if (result == 0) {
                 LASSERT(obj->oo_inode != NULL);
@@ -1388,28 +1545,30 @@ static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
                               struct lu_attr *,
                               struct dt_allocation_hint *hint,
+                              struct dt_object_format *dof,
                               struct thandle *);
 
-static osd_obj_type_f osd_create_type_f(__u32 mode)
+static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
 {
         osd_obj_type_f result;
 
-        switch (mode) {
-        case S_IFDIR:
+        switch (type) {
+        case DFT_DIR:
                 result = osd_mkdir;
                 break;
-        case S_IFREG:
+        case DFT_REGULAR:
                 result = osd_mkreg;
                 break;
-        case S_IFLNK:
+        case DFT_SYM:
                 result = osd_mksym;
                 break;
-        case S_IFCHR:
-        case S_IFBLK:
-        case S_IFIFO:
-        case S_IFSOCK:
+        case DFT_NODE:
                 result = osd_mknod;
                 break;
+        case DFT_INDEX:
+                result = osd_mk_index;
+                break;
+
         default:
                 LBUG();
                 break;
@@ -1428,19 +1587,62 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
         ah->dah_mode = child_mode;
 }
 
+/**
+ * Helper function for osd_object_create()
+ *
+ * \retval 0, on success
+ */
+static int __osd_object_create(struct osd_thread_info *info,
+                               struct osd_object *obj, struct lu_attr *attr,
+                               struct dt_allocation_hint *hint,
+                               struct dt_object_format *dof,
+                               struct thandle *th)
+{
+
+        int result;
 
-/*
- * Concurrency: @dt is write locked.
+        result = osd_create_pre(info, obj, attr, th);
+        if (result == 0) {
+                result = osd_create_type_f(dof->dof_type)(info, obj,
+                                           attr, hint, dof, th);
+                if (result == 0)
+                        result = osd_create_post(info, obj, attr, th);
+        }
+        return result;
+}
+
+/**
+ * Helper function for osd_object_create()
+ *
+ * \retval 0, on success
  */
+static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
+                           const struct lu_fid *fid, struct thandle *th)
+{
+        struct osd_thread_info *info = osd_oti_get(env);
+        struct osd_inode_id    *id   = &info->oti_id;
+        struct osd_device      *osd  = osd_obj2dev(obj);
+        struct md_ucred        *uc   = md_ucred(env);
+
+        LASSERT(obj->oo_inode != NULL);
+        LASSERT(uc != NULL);
+
+        id->oii_ino = obj->oo_inode->i_ino;
+        id->oii_gen = obj->oo_inode->i_generation;
+
+        return osd_oi_insert(info, &osd->od_oi, fid, id, th,
+                             uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
+}
+
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
+                             struct dt_object_format *dof,
                              struct thandle *th)
 {
-        const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
-        struct osd_object      *obj  = osd_dt_obj(dt);
-        struct osd_device      *osd  = osd_obj2dev(obj);
-        struct osd_thread_info *info = osd_oti_get(env);
+        const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
+        struct osd_object      *obj    = osd_dt_obj(dt);
+        struct osd_thread_info *info   = osd_oti_get(env);
         int result;
 
         ENTRY;
@@ -1450,31 +1652,170 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
-        /*
-         * XXX missing: Quote handling.
-         */
+        result = __osd_object_create(info, obj, attr, hint, dof, th);
+        if (result == 0)
+                result = __osd_oi_insert(env, obj, fid, th);
 
-        result = osd_create_pre(info, obj, attr, th);
-        if (result == 0) {
-                result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
-                                                                attr, hint, th);
-                if (result == 0)
-                        result = osd_create_post(info, obj, attr, th);
+        LASSERT(ergo(result == 0, dt_object_exists(dt)));
+        LASSERT(osd_invariant(obj));
+        RETURN(result);
+}
+
+/**
+ * Helper function for osd_xattr_set()
+ */
+static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                           const struct lu_buf *buf, const char *name, int fl)
+{
+        struct osd_object      *obj      = osd_dt_obj(dt);
+        struct inode           *inode    = obj->oo_inode;
+        struct osd_thread_info *info     = osd_oti_get(env);
+        struct dentry          *dentry   = &info->oti_child_dentry;
+        struct timespec        *t        = &info->oti_time;
+        int                     fs_flags = 0;
+        int  rc;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
+        LASSERT(osd_write_locked(env, obj));
+
+        if (fl & LU_XATTR_REPLACE)
+                fs_flags |= XATTR_REPLACE;
+
+        if (fl & LU_XATTR_CREATE)
+                fs_flags |= XATTR_CREATE;
+
+        dentry->d_inode = inode;
+        *t = inode->i_ctime;
+        rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
+                                   buf->lb_len, fs_flags);
+        if (likely(rc == 0)) {
+                spin_lock(&obj->oo_guard);
+                inode->i_ctime = *t;
+                spin_unlock(&obj->oo_guard);
+                mark_inode_dirty(inode);
         }
-        if (result == 0) {
-                struct osd_inode_id *id = &info->oti_id;
-                struct md_ucred     *uc = md_ucred(env);
+        return rc;
+}
 
-                LASSERT(obj->oo_inode != NULL);
-                LASSERT(uc != NULL);
+/**
+ * Put the fid into lustre_mdt_attrs, and then store the structure in
+ * the inode's EA. This fid should not be altered during the lifetime
+ * of the inode.
+ *
+ * \retval   0, on success
+ * \retval -ve, on error
+ *
+ * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
+ */
+static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
+                          const struct lu_fid *fid)
+{
+        struct osd_thread_info  *info      = osd_oti_get(env);
+        struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
+
+        fid_cpu_to_be(&mdt_attrs->lma_self_fid, fid);
+
+        return __osd_xattr_set(env, dt,
+                               osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
+                               MDT_XATTR_NAME, LU_XATTR_CREATE);
+
+}
+
+/**
+ * Helper function to form igif
+ */
+static inline void osd_igif_get(const struct lu_env *env, struct dentry *dentry,
+                                struct lu_fid *fid)
+{
+        struct inode  *inode = dentry->d_inode;
+        lu_igif_build(fid, inode->i_ino, inode->i_generation);
+}
+
+/**
+ * Helper function to pack the fid
+ */
+static inline void osd_fid_pack(const struct lu_env *env, const struct lu_fid *fid,
+                                struct lu_fid_pack *pack)
+{
+        fid_pack(pack, fid, &osd_oti_get(env)->oti_fid);
+}
+
+/**
+ * Try to read the fid from the inode EA into dt_rec. If getxattr returns
+ * a +ve value we got the fid; on -ENODATA we have to form an igif instead.
+ *
+ * \param rec, the data-structure into which fid/igif is read
+ *
+ * \retval 0, on success
+ */
+static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry,
+                          struct dt_rec *rec)
+{
+        struct inode            *inode     = dentry->d_inode;
+        struct osd_thread_info  *info      = osd_oti_get(env);
+        struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
+        struct lu_fid           *fid       = &info->oti_fid;
+        int rc;
+
+        LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
 
-                id->oii_ino = obj->oo_inode->i_ino;
-                id->oii_gen = obj->oo_inode->i_generation;
+        rc = inode->i_op->getxattr(dentry, MDT_XATTR_NAME, (void *)mdt_attrs,
+                                   sizeof *mdt_attrs);
 
-                result = osd_oi_insert(info, &osd->od_oi, fid, id, th,
-                                       uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
+        if (rc > 0) {
+                fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid);
+                rc = 0;
+        } else if (rc == -ENODATA) {
+                osd_igif_get(env, dentry, fid);
+                rc = 0;
         }
 
+        if (rc == 0)
+                osd_fid_pack(env, fid, (struct lu_fid_pack*)rec);
+
+        return rc;
+}
+
+/**
+ * OSD layer object create function for interoperability mode (b11826).
+ * This is mostly similar to osd_object_create(). Only difference being, fid is
+ * inserted into inode ea here.
+ *
+ * \retval   0, on success
+ * \retval -ve, on error
+ */
+static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
+                             struct lu_attr *attr,
+                             struct dt_allocation_hint *hint,
+                             struct dt_object_format *dof,
+                             struct thandle *th)
+{
+        const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
+        struct osd_object      *obj    = osd_dt_obj(dt);
+        struct osd_thread_info *info   = osd_oti_get(env);
+        int result;
+        int is_root = 0;
+
+        ENTRY;
+
+        LASSERT(osd_invariant(obj));
+        LASSERT(!dt_object_exists(dt));
+        LASSERT(osd_write_locked(env, obj));
+        LASSERT(th != NULL);
+
+        result = __osd_object_create(info, obj, attr, hint, dof, th);
+
+        if (hint && hint->dah_parent)
+                is_root = osd_object_is_root(osd_dt_obj(hint->dah_parent));
+
+        /* objects under osd root should have igif fid, so don't add fid EA */
+        if (result == 0 && is_root == 0)
+                result = osd_ea_fid_set(env, dt, fid);
+
+        if (result == 0)
+                result = __osd_oi_insert(env, obj, fid, th);
+
         LASSERT(ergo(result == 0, dt_object_exists(dt)));
         LINVRNT(osd_invariant(obj));
         RETURN(result);
@@ -1538,7 +1879,7 @@ static int osd_xattr_get(const struct lu_env *env,
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_dentry;
+        struct dentry          *dentry = &info->oti_obj_dentry;
 
         LASSERT(dt_object_exists(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
@@ -1551,6 +1892,7 @@ static int osd_xattr_get(const struct lu_env *env,
         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
 }
 
+
 /*
  * Concurrency: @dt is write locked.
  */
@@ -1558,39 +1900,12 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                          const struct lu_buf *buf, const char *name, int fl,
                          struct thandle *handle, struct lustre_capa *capa)
 {
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct inode           *inode  = obj->oo_inode;
-        struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_dentry;
-        struct timespec        *t      = &info->oti_time;
-        int                     fs_flags = 0, rc;
-
-        LASSERT(dt_object_exists(dt));
-        LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
-        LASSERT(osd_write_locked(env, obj));
         LASSERT(handle != NULL);
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
-        if (fl & LU_XATTR_REPLACE)
-                fs_flags |= XATTR_REPLACE;
-
-        if (fl & LU_XATTR_CREATE)
-                fs_flags |= XATTR_CREATE;
-
-        dentry->d_inode = inode;
-        *t = inode->i_ctime;
-        rc = inode->i_op->setxattr(dentry, name,
-                                   buf->lb_buf, buf->lb_len, fs_flags);
-        if (likely(rc == 0)) {
-                /* ctime should not be updated with server-side time. */
-                spin_lock(&obj->oo_guard);
-                inode->i_ctime = *t;
-                spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
-        }
-        return rc;
+        return __osd_xattr_set(env, dt, buf, name, fl);
 }
 
 /*
@@ -1604,7 +1919,7 @@ static int osd_xattr_list(const struct lu_env *env,
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_dentry;
+        struct dentry          *dentry = &info->oti_obj_dentry;
 
         LASSERT(dt_object_exists(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
@@ -1629,7 +1944,7 @@ static int osd_xattr_del(const struct lu_env *env,
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_dentry;
+        struct dentry          *dentry = &info->oti_obj_dentry;
         struct timespec        *t      = &info->oti_time;
         int                     rc;
 
@@ -1747,7 +2062,7 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_dentry;
+        struct dentry          *dentry = &info->oti_obj_dentry;
         struct file            *file   = &info->oti_file;
         ENTRY;
 
@@ -1781,6 +2096,30 @@ static const struct dt_object_operations osd_obj_ops = {
         .do_object_sync  = osd_object_sync,
 };
 
+/**
+ * dt_object_operations for interoperability mode
+ * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
+ */
+static const struct dt_object_operations osd_obj_ea_ops = {
+        .do_read_lock    = osd_object_read_lock,
+        .do_write_lock   = osd_object_write_lock,
+        .do_read_unlock  = osd_object_read_unlock,
+        .do_write_unlock = osd_object_write_unlock,
+        .do_attr_get     = osd_attr_get,
+        .do_attr_set     = osd_attr_set,
+        .do_ah_init      = osd_ah_init,
+        .do_create       = osd_object_ea_create,
+        .do_index_try    = osd_index_try,
+        .do_ref_add      = osd_object_ref_add,
+        .do_ref_del      = osd_object_ref_del,
+        .do_xattr_get    = osd_xattr_get,
+        .do_xattr_set    = osd_xattr_set,
+        .do_xattr_del    = osd_xattr_del,
+        .do_xattr_list   = osd_xattr_list,
+        .do_capa_get     = osd_capa_get,
+        .do_object_sync  = osd_object_sync,
+};
+
 /*
  * Body operations.
  */
@@ -1861,10 +2200,11 @@ static int osd_object_is_root(const struct osd_object *obj)
         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
 }
 
-static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
+static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
                            const struct dt_index_features *feat)
 {
         struct iam_descr *descr;
+        struct dt_object *dt = &o->oo_dt;
 
         if (osd_object_is_root(o))
                 return feat == &dt_directory_features;
@@ -1872,14 +2212,23 @@ static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
         LASSERT(o->oo_dir != NULL);
 
         descr = o->oo_dir->od_container.ic_descr;
-        if (feat == &dt_directory_features)
-                return descr == &iam_htree_compat_param ||
-                        (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
-                         1 /*
-                            * XXX check that index looks like directory.
-                            */
-                                );
-        else
+        if (feat == &dt_directory_features) {
+                if (descr->id_rec_size == sizeof(struct lu_fid_pack))
+                        return 1;
+
+                if (descr == &iam_htree_compat_param) {
+                        /* if it is an HTREE dir then there is a good chance
+                         * that this is an ext3 directory with no FIDs. */
+
+                        if (descr->id_rec_size ==
+                            sizeof ((struct ldiskfs_dir_entry_2 *)NULL)->inode) {
+
+                                dt->do_index_ops = &osd_index_ea_ops;
+                                return 1;
+                        }
+                }
+                return 0;
+        } else {
                 return
                         feat->dif_keysize_min <= descr->id_key_size &&
                         descr->id_key_size <= feat->dif_keysize_max &&
@@ -1890,11 +2239,12 @@ static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
                         ergo(feat->dif_flags & DT_IND_UPDATE,
                              1 /* XXX check that object (and file system) is
                                 * writable */);
+        }
 }
 
-static int osd_container_init(const struct lu_env *env,
-                              struct osd_object *obj,
-                              struct osd_directory *dir)
+static int osd_iam_container_init(const struct lu_env *env,
+                                  struct osd_object *obj,
+                                  struct osd_directory *dir)
 {
         int result;
         struct iam_container *bag;
@@ -1904,7 +2254,7 @@ static int osd_container_init(const struct lu_env *env,
         if (result == 0) {
                 result = iam_container_setup(bag);
                 if (result == 0)
-                        obj->oo_dt.do_index_ops = &osd_index_ops;
+                        obj->oo_dt.do_index_ops = &osd_index_iam_ops;
                 else
                         iam_container_fini(bag);
         }
@@ -1918,16 +2268,25 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_index_features *feat)
 {
         int result;
+        int ea_dir = 0;
         struct osd_object *obj = osd_dt_obj(dt);
+        struct osd_device *osd = osd_obj2dev(obj);
 
         LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
 
         if (osd_object_is_root(obj)) {
-                dt->do_index_ops = &osd_index_compat_ops;
+                dt->do_index_ops = &osd_index_ea_ops;
                 result = 0;
-        } else if (!osd_has_index(obj)) {
-                struct osd_directory *dir;
+        } else if (feat == &dt_directory_features && osd->od_iop_mode) {
+                dt->do_index_ops = &osd_index_ea_ops;
+                if (S_ISDIR(obj->oo_inode->i_mode))
+                        result = 0;
+                else
+                        result = -ENOTDIR;
+                ea_dir = 1;
+        } else if (!osd_has_index(obj)) {
+                struct osd_directory *dir;
 
                 OBD_ALLOC_PTR(dir);
                 if (dir != NULL) {
@@ -1951,7 +2310,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                          * recheck under lock.
                          */
                         if (!osd_has_index(obj))
-                                result = osd_container_init(env, obj, dir);
+                                result = osd_iam_container_init(env, obj, dir);
                         else
                                 result = 0;
                         up(&obj->oo_dir->od_sem);
@@ -1960,8 +2319,8 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
         } else
                 result = 0;
 
-        if (result == 0) {
-                if (!osd_index_probe(env, obj, feat))
+        if (result == 0 && ea_dir == 0) {
+                if (!osd_iam_index_probe(env, obj, feat))
                         result = -ENOTDIR;
         }
         LINVRNT(osd_invariant(obj));
@@ -1969,9 +2328,21 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
         return result;
 }
 
-static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
-                            const struct dt_key *key, struct thandle *handle,
-                            struct lustre_capa *capa)
+/**
+ *      delete a (key, value) pair from index \a dt specified by \a key
+ *
+ *      \param  dt_object      osd index object
+ *      \param  key     key for index
+ *      \param  rec     record reference
+ *      \param  handle  transaction handler
+ *
+ *      \retval  0  success
+ *      \retval -ve   failure
+ */
+
+static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
+                                const struct dt_key *key, struct thandle *handle,
+                                struct lustre_capa *capa)
 {
         struct osd_object     *obj = osd_dt_obj(dt);
         struct osd_thandle    *oh;
@@ -1989,7 +2360,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
                 RETURN(-EACCES);
 
-        ipd = osd_ipd_get(env, bag);
+        ipd = osd_idx_ipd_get(env, bag);
         if (unlikely(ipd == NULL))
                 RETURN(-ENOMEM);
 
@@ -2003,40 +2374,123 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
-static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
-                            struct dt_rec *rec, const struct dt_key *key,
-                            struct lustre_capa *capa)
+/**
+ * Index delete function for interoperability mode (b11826).
+ * It will remove the directory entry added by osd_index_ea_insert().
+ * This entry is needed to maintain name->fid mapping.
+ *
+ * \param key,  key i.e. file entry to be deleted
+ *
+ * \retval   0, on success
+ * \retval -ve, on error
+ */
+static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
+                               const struct dt_key *key, struct thandle *handle,
+                               struct lustre_capa *capa)
+{
+        struct osd_object          *obj    = osd_dt_obj(dt);
+        struct inode               *dir    = obj->oo_inode;
+        struct dentry              *dentry;
+        struct osd_thandle         *oh;
+        struct ldiskfs_dir_entry_2 *de;
+        struct buffer_head         *bh;
+
+        int rc;
+
+        ENTRY;
+
+        LINVRNT(osd_invariant(obj));
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle != NULL);
+        LASSERT(oh->ot_handle->h_transaction != NULL);
+
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
+                RETURN(-EACCES);
+
+        dentry = osd_child_dentry_get(env, obj,
+                                      (char *)key, strlen((char *)key));
+        bh = ldiskfs_find_entry(dentry, &de);
+        if (bh) {
+                rc = ldiskfs_delete_entry(oh->ot_handle,
+                                dir, de, bh);
+                if (!rc)
+                        mark_inode_dirty(dir);
+                brelse(bh);
+        } else
+                rc = -ENOENT;
+
+        LASSERT(osd_invariant(obj));
+        RETURN(rc);
+}
+
+/**
+ *      Lookup index for \a key and copy record to \a rec.
+ *
+ *      \param  dt      osd index object
+ *      \param  key     key for index
+ *      \param  rec     record reference
+ *
+ *      \retval  +ve  success : exact match
+ *      \retval  0    return record with key not greater than \a key
+ *      \retval -ve   failure
+ */
+static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
+                                struct dt_rec *rec, const struct dt_key *key,
+                                struct lustre_capa *capa)
 {
         struct osd_object     *obj = osd_dt_obj(dt);
         struct iam_path_descr *ipd;
         struct iam_container  *bag = &obj->oo_dir->od_container;
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct iam_iterator    *it = &oti->oti_idx_it;
         int rc;
-
         ENTRY;
 
-        LINVRNT(osd_invariant(obj));
+        LASSERT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
         LASSERT(bag->ic_object == obj->oo_inode);
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
-                return -EACCES;
+                RETURN(-EACCES);
 
-        ipd = osd_ipd_get(env, bag);
-        if (unlikely(ipd == NULL))
+        ipd = osd_idx_ipd_get(env, bag);
+        if (IS_ERR(ipd))
                 RETURN(-ENOMEM);
 
-        rc = iam_lookup(bag, (const struct iam_key *)key,
-                        (struct iam_rec *)rec, ipd);
+        /* got ipd now we can start iterator. */
+        iam_it_init(it, bag, 0, ipd);
+
+        rc = iam_it_get(it, (struct iam_key *)key);
+        if (rc >= 0)
+                iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec);
+
+        iam_it_put(it);
+        iam_it_fini(it);
         osd_ipd_put(env, bag, ipd);
+
         LINVRNT(osd_invariant(obj));
 
         RETURN(rc);
 }
 
-static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
-                            const struct dt_rec *rec, const struct dt_key *key,
-                            struct thandle *th, struct lustre_capa *capa,
-                            int ignore_quota)
+/**
+ *      Inserts (key, value) pair in \a dt index object.
+ *
+ *      \param  dt      osd index object
+ *      \param  key     key for index
+ *      \param  rec     record reference
+ *      \param  th      transaction handler
+ *
+ *      \retval  0  success
+ *      \retval -ve failure
+ */
+static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
+                                const struct dt_rec *rec, const struct dt_key *key,
+                                struct thandle *th, struct lustre_capa *capa,
+                                int ignore_quota)
 {
         struct osd_object     *obj = osd_dt_obj(dt);
         struct iam_path_descr *ipd;
@@ -2057,7 +2511,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                 return -EACCES;
 
-        ipd = osd_ipd_get(env, bag);
+        ipd = osd_idx_ipd_get(env, bag);
         if (unlikely(ipd == NULL))
                 RETURN(-ENOMEM);
 
@@ -2080,375 +2534,775 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
-/*
- * Iterator operations.
+/**
+ * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
+ * into the directory. Also sets flags in the osd object to
+ * indicate that dot and dotdot are created. This is required for
+ * interoperability mode (b11826).
+ *
+ * \param dir   directory for dot and dotdot fixup.
+ * \param obj   child object for linking
+ *
+ * \retval   0, on success
+ * \retval -ve, on error
  */
-struct osd_it {
-        struct osd_object     *oi_obj;
-        struct iam_path_descr *oi_ipd;
-        struct iam_iterator    oi_it;
-};
+static int osd_add_dot_dotdot(struct osd_thread_info *info,
+                              struct osd_object *dir,
+                              struct osd_object *obj, const char *name,
+                              struct thandle *th)
+{
+        struct osd_thandle *oth = container_of(th, struct osd_thandle,
+                                               ot_super);
+
+        /* Assert the handle itself before looking inside it, as
+         * __osd_ea_add_rec() does. */
+        LASSERT(oth->ot_handle != NULL);
+        LASSERT(oth->ot_handle->h_transaction != NULL);
+        LASSERT(S_ISDIR(dir->oo_inode->i_mode));
+
+        if (strcmp(name, dot) == 0) {
+                /* This branch only records the flag; the on-disk entries
+                 * are created by the dotdot branch below. */
+                if (dir->oo_compat_dot_created)
+                        return -EEXIST;
+
+                LASSERT(obj == dir);
+                dir->oo_compat_dot_created = 1;
+                return 0;
+        }
+
+        if (strcmp(name, dotdot) == 0) {
+                int result;
+
+                /* dotdot is only accepted after dot has been seen. */
+                if (!dir->oo_compat_dot_created)
+                        return -EINVAL;
+
+                /* Both entries were already created: fall back to
+                 * __osd_ea_add_rec() for this name. */
+                if (dir->oo_compat_dotdot_created)
+                        return __osd_ea_add_rec(info, dir, obj, name, th);
+
+                /* \a obj's inode is passed as the parent directory and
+                 * \a dir's inode as the directory being fixed up. */
+                result = ldiskfs_add_dot_dotdot(oth->ot_handle,
+                                                obj->oo_inode, dir->oo_inode);
+                if (result == 0)
+                        dir->oo_compat_dotdot_created = 1;
+                return result;
+        }
+
+        /* Any other name is left untouched and reported as success. */
+        return 0;
+}
+
+/**
+ * Calls ldiskfs_add_entry() to add a directory entry named \a name
+ * for \a cobj into the directory \a pobj. This is required for
+ * interoperability mode (b11826)
+ *
+ * \param pobj  directory object the entry is added to
+ * \param cobj  object whose inode the new entry refers to
+ * \param name  NUL-terminated entry name
+ * \param th    transaction handle; its journal handle and the handle's
+ *              transaction are asserted to be non-NULL
+ *
+ * \retval   0, on success
+ * \retval -ve, on error
+ */
+static int __osd_ea_add_rec(struct osd_thread_info *info,
+                            struct osd_object *pobj,
+                            struct osd_object *cobj,
+                            const char *name,
+                            struct thandle *th)
+{
+        struct osd_thandle *oth;
+        struct dentry      *child;
+        int                 rc;
+
+        /* Pairs with the RETURN() at the end of the function. */
+        ENTRY;
+
+        oth = container_of(th, struct osd_thandle, ot_super);
+        LASSERT(oth->ot_handle != NULL);
+        LASSERT(oth->ot_handle->h_transaction != NULL);
+
+        child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
+        rc = ldiskfs_add_entry(oth->ot_handle, child, cobj->oo_inode);
+
+        RETURN(rc);
+}
+
+/**
+ * Calls the appropriate osd_add* function and returns the
+ * value returned by that function.
+ */
+static int osd_ea_add_rec(const struct lu_env *env,
+                          struct osd_object *pobj,
+                          struct osd_object *cobj,
+                          const char *name,
+                          struct thandle *th)
+{
+        struct osd_thread_info    *info   = osd_oti_get(env);
+        int rc;
 
-static struct dt_it *osd_it_init(const struct lu_env *env,
-                                 struct dt_object *dt, int writable,
+        if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
+                                                   name[2] =='\0')))
+                rc = osd_add_dot_dotdot(info, pobj, cobj, name, th);
+        else
+                rc = __osd_ea_add_rec(info, pobj, cobj, name, th);
+
+        return rc;
+}
+
+/**
+ * Looks \a key up in directory \a obj with ldiskfs_find_entry(), loads
+ * the inode the entry refers to and fills \a rec through
+ * osd_ea_fid_get(). This is required for interoperability
+ * mode (b11826)
+ *
+ * \param obj   directory object to search
+ * \param rec   record buffer handed to osd_ea_fid_get()
+ * \param key   NUL-terminated entry name
+ *
+ * \retval -ENOENT, if the entry is not found or its inode cannot be loaded
+ * \retval otherwise the value returned by osd_ea_fid_get()
+ */
+static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
+                             struct dt_rec *rec, const struct dt_key *key)
+{
+        struct inode               *dir  = obj->oo_inode;
+        struct osd_thread_info     *info = osd_oti_get(env);
+        struct osd_device          *dev  = osd_dev(obj->oo_dt.do_lu.lo_dev);
+        struct osd_inode_id        *id   = &info->oti_id;
+        struct ldiskfs_dir_entry_2 *de;
+        struct buffer_head         *bh;
+        struct dentry              *dentry;
+        struct inode               *inode;
+        int                         rc;
+
+        /* Pairs with the RETURN() calls below. */
+        ENTRY;
+
+        LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
+
+        dentry = osd_child_dentry_get(env, obj,
+                                      (char *)key, strlen((char *)key));
+        bh = ldiskfs_find_entry(dentry, &de);
+        if (bh == NULL)
+                RETURN(-ENOENT);
+
+        /* Read the entry before the buffer is released, and store the
+         * inode number directly instead of going through a signed int. */
+        id->oii_ino = le32_to_cpu(de->inode);
+        id->oii_gen = OSD_OII_NOGEN;
+        brelse(bh);
+
+        inode = osd_iget(info, dev, id);
+        if (IS_ERR(inode))
+                /* The iget error is reported as a missing entry. */
+                RETURN(-ENOENT);
+
+        dentry->d_inode = inode;
+        rc = osd_ea_fid_get(env, dentry, rec);
+        iput(inode);
+
+        RETURN(rc);
+}
+
+/**
+ * Find the osd object for given fid.
+ *
+ * On success the returned object still holds the reference obtained by
+ * lu_object_find(); release it with osd_object_put(). On every failure
+ * path that reference is dropped before returning.
+ *
+ * \param dt,  object whose device is searched
+ * \param fid, need to find the osd object having this fid
+ *
+ * \retval osd_object, on success
+ * \retval        -ve, on error (as an error pointer)
+ */
+struct osd_object *osd_object_find(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   const struct lu_fid *fid)
+{
+        struct lu_device         *ludev = dt->do_lu.lo_dev;
+        struct osd_object        *child = NULL;
+        struct lu_object         *luch;
+        struct lu_object         *lo;
+
+        luch = lu_object_find(env, ludev, fid, NULL);
+        if (IS_ERR(luch))
+                /* Pass the error pointer through unchanged. */
+                return (void *)luch;
+
+        if (!lu_object_exists(luch)) {
+                LU_OBJECT_DEBUG(D_ERROR, env, luch,
+                                "lu_object does not exists "DFID"\n",
+                                PFID(fid));
+                /* Nothing is handed back to the caller, so the reference
+                 * taken by lu_object_find() must be dropped here. */
+                lu_object_put(env, luch);
+                return ERR_PTR(-ENOENT);
+        }
+
+        lo = lu_object_locate(luch->lo_header, ludev->ld_type);
+        if (lo != NULL)
+                child = osd_obj(lo);
+        else
+                LU_OBJECT_DEBUG(D_ERROR, env, luch,
+                                "lu_object can't be located"
+                                ""DFID"\n", PFID(fid));
+
+        if (child == NULL) {
+                lu_object_put(env, luch);
+                CERROR("Unable to get osd_object\n");
+                return ERR_PTR(-ENOENT);
+        }
+
+        return child;
+}
+
+/**
+ * Drop the reference on an osd object once the caller is done with it.
+ *
+ * \param obj, osd object whose embedded lu_object is put
+ */
+static inline void osd_object_put(const struct lu_env *env,
+                                  struct osd_object *obj)
+{
+        struct lu_object *lo = &obj->oo_dt.do_lu;
+
+        lu_object_put(env, lo);
+}
+
+/**
+ * Index insert for interoperability mode (b11826).
+ * Adds a directory entry; the entry is what maintains the
+ * name->fid mapping.
+ *
+ * \param key, entry name to be inserted
+ * \param rec, value for that key, i.e. the packed fid
+ *
+ * \retval   0, on success
+ * \retval -ve, on error
+ */
+static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
+                               const struct dt_rec *rec,
+                               const struct dt_key *key, struct thandle *th,
+                               struct lustre_capa *capa, int ignore_quota)
+{
+        struct osd_object *dir     = osd_dt_obj(dt);
+        struct lu_fid     *tgt_fid = &osd_oti_get(env)->oti_fid;
+        struct osd_object *target;
+        int                rc;
+
+        ENTRY;
+
+        LASSERT(osd_invariant(dir));
+        LASSERT(dt_object_exists(dt));
+        LASSERT(th != NULL);
+
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+                RETURN(-EACCES);
+
+        /* Unpack the record into the per-thread fid buffer. */
+        rc = fid_unpack((const struct lu_fid_pack *)rec, tgt_fid);
+        if (rc != 0)
+                RETURN(rc);
+
+        target = osd_object_find(env, dt, tgt_fid);
+        if (IS_ERR(target)) {
+                rc = PTR_ERR(target);
+        } else {
+                rc = osd_ea_add_rec(env, dir, target, (const char *)key, th);
+                osd_object_put(env, target);
+        }
+
+        LASSERT(osd_invariant(dir));
+        RETURN(rc);
+}
+
+/**
+ *  Initialize osd Iterator for given osd index object.
+ *
+ *  \param  dt      osd index object
+ */
+
+static struct dt_it *osd_it_iam_init(const struct lu_env *env,
+                                 struct dt_object *dt,
                                  struct lustre_capa *capa)
 {
-        struct osd_it         *it;
+        struct osd_it_iam         *it;
+        struct osd_thread_info *oti = osd_oti_get(env);
         struct osd_object     *obj = osd_dt_obj(dt);
         struct lu_object      *lo  = &dt->do_lu;
         struct iam_path_descr *ipd;
         struct iam_container  *bag = &obj->oo_dir->od_container;
-        __u32                  flags;
 
         LASSERT(lu_object_exists(lo));
 
-        if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
-                            CAPA_OPC_BODY_READ))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
                 return ERR_PTR(-EACCES);
 
-        flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
-        OBD_ALLOC_PTR(it);
-        if (it != NULL) {
-                /*
-                 * XXX: as ipd is allocated within osd_thread_info, assignment
-                 * below implies that iterator usage is confined within single
-                 * environment.
-                 */
-                ipd = osd_ipd_get(env, bag);
-                if (likely(ipd != NULL)) {
-                        it->oi_obj = obj;
-                        it->oi_ipd = ipd;
-                        lu_object_get(lo);
-                        iam_it_init(&it->oi_it, bag, flags, ipd);
-                        return (struct dt_it *)it;
-                } else
-                        OBD_FREE_PTR(it);
+        it = &oti->oti_it;
+        ipd = osd_it_ipd_get(env, bag);
+        if (likely(ipd != NULL)) {
+                it->oi_obj = obj;
+                it->oi_ipd = ipd;
+                lu_object_get(lo);
+                iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
+                return (struct dt_it *)it;
         }
         return ERR_PTR(-ENOMEM);
 }
 
-static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
+/**
+ * free given Iterator.
+ */
+
+static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_it     *it = (struct osd_it *)di;
+        struct osd_it_iam     *it = (struct osd_it_iam *)di;
         struct osd_object *obj = it->oi_obj;
 
         iam_it_fini(&it->oi_it);
         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
         lu_object_put(env, &obj->oo_dt.do_lu);
-        OBD_FREE_PTR(it);
 }
 
-static int osd_it_get(const struct lu_env *env,
+/**
+ *  Move Iterator to record specified by \a key
+ *
+ *  \param  di      osd iterator
+ *  \param  key     key for index
+ *
+ *  \retval +ve  di points to record with least key not larger than key
+ *  \retval  0   di points to exact matched key
+ *  \retval -ve  failure
+ */
+
+static int osd_it_iam_get(const struct lu_env *env,
                       struct dt_it *di, const struct dt_key *key)
 {
-        struct osd_it *it = (struct osd_it *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
 
         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
 }
 
-static void osd_it_put(const struct lu_env *env, struct dt_it *di)
+/**
+ *  Release Iterator
+ *
+ *  \param  di      osd iterator
+ */
+
+static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_it *it = (struct osd_it *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
 
         iam_it_put(&it->oi_it);
 }
 
-static int osd_it_next(const struct lu_env *env, struct dt_it *di)
+/**
+ *  Move iterator by one record
+ *
+ *  \param  di      osd iterator
+ *
+ *  \retval +1   end of container reached
+ *  \retval  0   success
+ *  \retval -ve  failure
+ */
+
+static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_it *it = (struct osd_it *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
 
         return iam_it_next(&it->oi_it);
 }
 
-static int osd_it_del(const struct lu_env *env, struct dt_it *di,
-                      struct thandle *th)
-{
-        struct osd_it      *it = (struct osd_it *)di;
-        struct osd_thandle *oh;
-
-        LASSERT(th != NULL);
-
-        oh = container_of0(th, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle != NULL);
-        LASSERT(oh->ot_handle->h_transaction != NULL);
-
-        return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
-}
+/**
+ * Return pointer to the key under iterator.
+ */
 
-static struct dt_key *osd_it_key(const struct lu_env *env,
+static struct dt_key *osd_it_iam_key(const struct lu_env *env,
                                  const struct dt_it *di)
 {
-        struct osd_it *it = (struct osd_it *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
 
         return (struct dt_key *)iam_it_key_get(&it->oi_it);
 }
 
-static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
+/**
+ * Return size of key under iterator (in bytes)
+ */
+
+static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
 {
-        struct osd_it *it = (struct osd_it *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
 
         return iam_it_key_size(&it->oi_it);
 }
 
-static struct dt_rec *osd_it_rec(const struct lu_env *env,
+/**
+ * Return pointer to the record under iterator.
+ */
+static struct dt_rec *osd_it_iam_rec(const struct lu_env *env,
                                  const struct dt_it *di)
 {
-        struct osd_it *it = (struct osd_it *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
 
         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
 }
 
-static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
+/**
+ * Returns cookie for current Iterator position.
+ */
+static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
 {
-        struct osd_it *it = (struct osd_it *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
 
         return iam_it_store(&it->oi_it);
 }
 
-static int osd_it_load(const struct lu_env *env,
+/**
+ * Restore iterator from cookie.
+ *
+ * \param  di      osd iterator
+ * \param  hash    Iterator location cookie
+ *
+ * \retval +ve  di points to record with least key not larger than key.
+ * \retval  0   di points to exact matched key
+ * \retval -ve  failure
+ */
+
+static int osd_it_iam_load(const struct lu_env *env,
                        const struct dt_it *di, __u64 hash)
 {
-        struct osd_it *it = (struct osd_it *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
 
         return iam_it_load(&it->oi_it, hash);
 }
 
-static const struct dt_index_operations osd_index_ops = {
-        .dio_lookup = osd_index_lookup,
-        .dio_insert = osd_index_insert,
-        .dio_delete = osd_index_delete,
+static const struct dt_index_operations osd_index_iam_ops = {
+        .dio_lookup = osd_index_iam_lookup,
+        .dio_insert = osd_index_iam_insert,
+        .dio_delete = osd_index_iam_delete,
         .dio_it     = {
-                .init     = osd_it_init,
-                .fini     = osd_it_fini,
-                .get      = osd_it_get,
-                .put      = osd_it_put,
-                .del      = osd_it_del,
-                .next     = osd_it_next,
-                .key      = osd_it_key,
-                .key_size = osd_it_key_size,
-                .rec      = osd_it_rec,
-                .store    = osd_it_store,
-                .load     = osd_it_load
+                .init     = osd_it_iam_init,
+                .fini     = osd_it_iam_fini,
+                .get      = osd_it_iam_get,
+                .put      = osd_it_iam_put,
+                .next     = osd_it_iam_next,
+                .key      = osd_it_iam_key,
+                .key_size = osd_it_iam_key_size,
+                .rec      = osd_it_iam_rec,
+                .store    = osd_it_iam_store,
+                .load     = osd_it_iam_load
         }
 };
 
-static int osd_index_compat_delete(const struct lu_env *env,
-                                   struct dt_object *dt,
-                                   const struct dt_key *key,
-                                   struct thandle *handle,
-                                   struct lustre_capa *capa)
+/**
+ * Initializes the directory iterator kept in the calling thread's
+ * osd_thread_info and takes a reference on \a dt for it.
+ * \a capa is not consulted by this function.
+ *
+ * \retval struct osd_it_ea, iterator structure on success
+ *
+ */
+static struct dt_it *osd_it_ea_init(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct lustre_capa *capa)
+{
+        struct osd_object       *dirobj = osd_dt_obj(dt);
+        struct osd_thread_info  *oti    = osd_oti_get(env);
+        struct osd_it_ea        *iter   = &oti->oti_it_ea;
+        struct dentry           *dent   = &oti->oti_obj_dentry;
+        struct inode            *inode;
+        ENTRY;
+        LASSERT(lu_object_exists(&dt->do_lu));
+
+        inode = dirobj->oo_inode;
+
+        /* Per-thread dentry describing the directory being iterated. */
+        dent->d_inode     = inode;
+        dent->d_sb        = osd_sb(osd_obj2dev(dirobj));
+        dent->d_name.hash = 0;
+
+        /* File structure later handed to ->readdir(). */
+        iter->oie_file.f_dentry     = dent;
+        iter->oie_file.f_mapping    = inode->i_mapping;
+        iter->oie_file.f_op         = inode->i_fop;
+        iter->oie_file.private_data = NULL;
+
+        /* Start at position 0 with no entry cached. */
+        iter->oie_obj      = dirobj;
+        iter->oie_namelen  = 0;
+        iter->oie_curr_pos = 0;
+        iter->oie_next_pos = 0;
+
+        /* Matching lu_object_put() is in osd_it_ea_fini(). */
+        lu_object_get(&dt->do_lu);
+
+        RETURN((struct dt_it*) iter);
+}
+
+/**
+ * Destroy or finishes iterator context.
+ *
+ * \param di, struct osd_it_ea, iterator structure to be destroyed
+ */
+static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_object *obj = osd_dt_obj(dt);
+        struct osd_it_ea     *it   = (struct osd_it_ea *)di;
+        struct osd_object    *obj  = it->oie_obj;
+
 
-        LASSERT(handle != NULL);
-        LASSERT(S_ISDIR(obj->oo_inode->i_mode));
         ENTRY;
+        lu_object_put(env, &obj->oo_dt.do_lu);
+        EXIT;
+}
 
-#if 0
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
-                RETURN(-EACCES);
-#endif
+/**
+ * Positions the iterator at the given key, so that the next lookup continues
+ * from that key. It is similar to dio_it->load(), but based on a key
+ * rather than a file position.
+ *
+ * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
+ * to the beginning.
+ *
+ * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
+ */
+static int osd_it_ea_get(const struct lu_env *env,
+                         struct dt_it *di, const struct dt_key *key)
+{
+        struct osd_it_ea     *it   = (struct osd_it_ea *)di;
 
-        RETURN(-EOPNOTSUPP);
+        ENTRY;
+        LASSERT(((const char *)key)[0] == '\0');
+        it->oie_namelen         = 0;
+        it->oie_curr_pos        = 0;
+        it->oie_next_pos        = 0;
+
+        RETURN(+1);
 }
 
-/*
- * Compatibility index operations.
+/**
+ * Does nothing
  */
+static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
+{
+        /* Intentionally empty: osd_it_ea_get() only resets the iterator's
+         * position fields, so there is nothing to release here. */
+}
 
-
-static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
-                           struct dentry *dentry, struct lu_fid_pack *pack)
+/**
+ * It is called internally by ->readdir(). It fills the
+ * iterator's in-memory data structure with required
+ * information i.e. name, namelen, rec_size etc.
+ *
+ * \param buf, the struct osd_it_ea to fill in, passed as an opaque pointer
+ * \param name, name of the file in given dir, \a namelen bytes long
+ *
+ * \retval 0, on success
+ * \retval -ENOENT, if the iterator already holds an entry
+ * \retval -EIO, if \a namelen is not in 1..LDISKFS_NAME_LEN
+ */
+static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
+                               loff_t offset, ino_t ino,
+                               unsigned int d_type)
 {
-        struct inode  *inode = dentry->d_inode;
-        struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
+        struct osd_it_ea   *it     = (struct osd_it_ea *)buf;
+        struct dirent64    *dirent = &it->oie_dirent64;
+        int                 reclen = LDISKFS_DIR_REC_LEN(namelen);
 
-        lu_igif_build(fid, inode->i_ino, inode->i_generation);
-        fid_cpu_to_be(fid, fid);
-        pack->fp_len = sizeof *fid + 1;
-        memcpy(pack->fp_area, fid, sizeof *fid);
+
+        ENTRY;
+        /* An entry is already cached: refuse a second one. */
+        if (it->oie_namelen)
+                RETURN(-ENOENT);
+
+        /* Reject negative lengths too; they would index d_name[] out of
+         * bounds below. */
+        if (namelen <= 0 || namelen > LDISKFS_NAME_LEN)
+                RETURN(-EIO);
+
+        /* Copy exactly \a namelen bytes. Nothing here guarantees that
+         * \a name is NUL-terminated, so strncpy() bounded by
+         * LDISKFS_NAME_LEN could read past the end of the name. */
+        memcpy(dirent->d_name, name, namelen);
+        dirent->d_name[namelen] = 0;
+        dirent->d_ino           = ino;
+        dirent->d_off           = offset;
+        dirent->d_reclen        = reclen;
+        it->oie_namelen         = namelen;
+        it->oie_curr_pos        = offset;
+
+        RETURN(0);
 }
 
-static int osd_index_compat_lookup(const struct lu_env *env,
-                                   struct dt_object *dt,
-                                   struct dt_rec *rec, const struct dt_key *key,
-                                   struct lustre_capa *capa)
+/**
+ * Calls ->readdir() to load one directory entry at a time
+ * and stores it in the iterator's in-memory data structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval   0, on success
+ * \retval -ve, on error
+ */
+int osd_ldiskfs_it_fill(const struct dt_it *di)
 {
-        struct osd_object *obj = osd_dt_obj(dt);
+        struct osd_it_ea   *it    = (struct osd_it_ea *)di;
+        struct osd_object  *obj   = it->oie_obj;
+        struct inode       *inode = obj->oo_inode;
+        int                result = 0;
 
-        struct osd_device      *osd  = osd_obj2dev(obj);
-        struct osd_thread_info *info = osd_oti_get(env);
-        struct inode           *dir;
+        ENTRY;
+        it->oie_namelen    = 0;
+        it->oie_file.f_pos = it->oie_curr_pos;
 
-        int result;
+        result = inode->i_fop->readdir(&it->oie_file, it,
+                                       (filldir_t) osd_ldiskfs_filldir);
 
-        /*
-         * XXX temporary solution.
-         */
-        struct dentry *dentry;
-        struct dentry *parent;
+        it->oie_next_pos = it->oie_file.f_pos;
 
-        LINVRNT(osd_invariant(obj));
-        LASSERT(S_ISDIR(obj->oo_inode->i_mode));
-        LASSERT(osd_has_index(obj));
+        if(!result && it->oie_namelen == 0)
+                result = -EIO;
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
-                return -EACCES;
+        RETURN(result);
+}
 
-        info->oti_str.name = (const char *)key;
-        info->oti_str.len  = strlen((const char *)key);
+/**
+ * It calls osd_ldiskfs_it_fill(), which uses ->readdir()
+ * to load one directory entry at a time and stores it in
+ * the iterator's in-memory data structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval +ve, iterator reached to end
+ * \retval   0, iterator not reached to end
+ * \retval -ve, on error
+ */
+static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
+{
+        struct osd_it_ea *it = (struct osd_it_ea *)di;
+        int rc;
 
-        dir = obj->oo_inode;
-        LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
+        ENTRY;
+        it->oie_curr_pos = it->oie_next_pos;
 
-        parent = d_alloc_root(dir);
-        if (parent == NULL)
-                return -ENOMEM;
-        igrab(dir);
-        dentry = d_alloc(parent, &info->oti_str);
-        if (dentry != NULL) {
-                struct dentry *d;
+        if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
+                rc = +1;
+        else
+                rc = osd_ldiskfs_it_fill(di);
 
-                /*
-                 * XXX passing NULL for nameidata should work for
-                 * ext3/ldiskfs.
-                 */
-                d = dir->i_op->lookup(dir, dentry, NULL);
-                if (d == NULL) {
-                        /*
-                         * normal case, result is in @dentry.
-                         */
-                        if (dentry->d_inode != NULL) {
-                                osd_build_pack(env, osd, dentry,
-                                               (struct lu_fid_pack *)rec);
-                                result = 0;
-                        } else
-                                result = -ENOENT;
-                 } else {
-                        /* What? Disconnected alias? Ppheeeww... */
-                        CERROR("Aliasing where not expected\n");
-                        result = -EIO;
-                        dput(d);
-                }
-                dput(dentry);
-        } else
-                result = -ENOMEM;
-        dput(parent);
-        LINVRNT(osd_invariant(obj));
-        return result;
+        RETURN(rc);
+}
+
+/**
+ * Returns the key, i.e. the entry name cached in the iterator's
+ * in-memory structure for the current position.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval key i.e. struct dt_key on success
+ */
+static struct dt_key *osd_it_ea_key(const struct lu_env *env,
+                                    const struct dt_it *di)
+{
+        struct osd_it_ea *iter = (struct osd_it_ea *)di;
+        char             *name;
+        ENTRY;
+        name = iter->oie_dirent64.d_name;
+        RETURN((struct dt_key *)name);
 }
 
-static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
-                       struct inode *dir, struct inode *inode, const char *name)
+/**
+ * Returns the key's size at current position from iterator's in memory structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval key_size, i.e. the length of the current entry name, on success
+ */
+static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
 {
-        struct dentry *old;
-        struct dentry *new;
-        struct dentry *parent;
+        struct osd_it_ea *it = (struct osd_it_ea *)di;
+        ENTRY;
+        RETURN(it->oie_namelen);
+}
 
-        int result;
+/**
+ * Returns the value (i.e. fid/igif) at current position from iterator's
+ * in memory structure.
+ *
+ * The record is built in the calling thread's osd_thread_info (oti_pack),
+ * so the returned pointer refers to that per-thread buffer.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval value i.e. struct dt_rec on success
+ * \retval error pointer, if the inode cannot be loaded or
+ *         osd_ea_fid_get() fails
+ */
+static struct dt_rec *osd_it_ea_rec(const struct lu_env *env,
+                                    const struct dt_it *di)
+{
+        struct osd_it_ea       *it     = (struct osd_it_ea *)di;
+        struct osd_object      *obj    = it->oie_obj;
+        struct osd_thread_info *info   = osd_oti_get(env);
+        struct osd_inode_id    *id     = &info->oti_id;
+        struct lu_fid_pack     *rec    = &info->oti_pack;
+        struct lu_device       *ldev   = obj->oo_dt.do_lu.lo_dev;
+        struct dentry          *dentry = &info->oti_child_dentry;
+        struct osd_device      *dev;
+        struct inode           *inode;
+        int                    rc;
 
-        info->oti_str.name = name;
-        info->oti_str.len  = strlen(name);
-
-        LASSERT(atomic_read(&dir->i_count) > 0);
-        result = -ENOMEM;
-        old = d_alloc(dev->od_obj_area, &info->oti_str);
-        if (old != NULL) {
-                d_instantiate(old, inode);
-                igrab(inode);
-                LASSERT(atomic_read(&dir->i_count) > 0);
-                parent = d_alloc_root(dir);
-                if (parent != NULL) {
-                        igrab(dir);
-                        LASSERT(atomic_read(&dir->i_count) > 1);
-                        new = d_alloc(parent, &info->oti_str);
-                        LASSERT(atomic_read(&dir->i_count) > 1);
-                        if (new != NULL) {
-                                LASSERT(atomic_read(&dir->i_count) > 1);
-                                result = dir->i_op->link(old, dir, new);
-                                LASSERT(atomic_read(&dir->i_count) > 1);
-                                dput(new);
-                                LASSERT(atomic_read(&dir->i_count) > 1);
-                        }
-                        LASSERT(atomic_read(&dir->i_count) > 1);
-                        dput(parent);
-                        LASSERT(atomic_read(&dir->i_count) > 0);
-                }
-                dput(old);
+        ENTRY;
+        dev  = osd_dev(ldev);
+        /* Load the inode named by the directory entry cached in \a it. */
+        id->oii_ino = it->oie_dirent64.d_ino;
+        id->oii_gen = OSD_OII_NOGEN;
+        inode = osd_iget(info, dev, id);
+        if (IS_ERR(inode)) {
+                CERROR("Error getting inode for ino =%u\n", id->oii_ino);
+                RETURN(ERR_PTR(PTR_ERR(inode)));
         }
-        LASSERT(atomic_read(&dir->i_count) > 0);
-        return result;
+
+        dentry->d_inode = inode;
+        LASSERT(dentry->d_inode->i_sb == osd_sb(dev));
+
+        rc = osd_ea_fid_get(env, dentry, (struct dt_rec*) rec);
+        iput(inode);
+
+        /* Do not hand back a record that was never filled in. */
+        if (rc < 0)
+                RETURN(ERR_PTR(rc));
+
+        RETURN((struct dt_rec *)rec);
 }
 
+/**
+ * Returns a cookie for the iterator's current position, which the
+ * caller can later pass to the load method to restart from there.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval cookie for current position, on success
+ */
+static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
+{
+        struct osd_it_ea *iter = (struct osd_it_ea *)di;
+        __u64             cookie;
+        ENTRY;
+        /* The cookie is simply the current directory position. */
+        cookie = iter->oie_curr_pos;
+        RETURN(cookie);
+}
 
-/*
- * XXX Temporary stuff.
+/**
+ * It calls osd_ldiskfs_it_fill(), which uses ->readdir()
+ * to load one directory entry at a time and stores it
+ * in the iterator's in-memory data structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval +ve, on success
+ * \retval -ve, on error
  */
-static int osd_index_compat_insert(const struct lu_env *env,
-                                   struct dt_object *dt,
-                                   const struct dt_rec *rec,
-                                   const struct dt_key *key, struct thandle *th,
-                                   struct lustre_capa *capa,
-                                   int ignore_quota)
+static int osd_it_ea_load(const struct lu_env *env,
+                          const struct dt_it *di, __u64 hash)
 {
-        struct osd_object     *obj = osd_dt_obj(dt);
+        struct osd_it_ea *it = (struct osd_it_ea *)di;
+        int rc;
 
-        const char          *name = (const char *)key;
+        ENTRY;
+        it->oie_curr_pos = it->oie_next_pos = hash;
 
-        struct lu_device    *ludev = dt->do_lu.lo_dev;
-        struct lu_object    *luch;
+        rc =  osd_ldiskfs_it_fill(di);
+        if (rc == 0)
+                rc = +1;
 
-        struct osd_thread_info   *info = osd_oti_get(env);
-        const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
-        struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
+        RETURN(rc);
+}
+/**
+ * Index and Iterator operations for interoperability
+ * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
+ *
+ * Lookup, insert and delete go through the osd_index_ea_* functions and
+ * iteration through the osd_it_ea_* functions. The iterator table sets
+ * no \a del method.
+ */
+static const struct dt_index_operations osd_index_ea_ops = {
+        .dio_lookup = osd_index_ea_lookup,
+        .dio_insert = osd_index_ea_insert,
+        .dio_delete = osd_index_ea_delete,
+        .dio_it     = {
+                .init     = osd_it_ea_init,
+                .fini     = osd_it_ea_fini,
+                .get      = osd_it_ea_get,
+                .put      = osd_it_ea_put,
+                .next     = osd_it_ea_next,
+                .key      = osd_it_ea_key,
+                .key_size = osd_it_ea_key_size,
+                .rec      = osd_it_ea_rec,
+                .store    = osd_it_ea_store,
+                .load     = osd_it_ea_load
+        }
+};
 
-        int result;
+/**
+ * Index lookup function for interoperability mode (b11826).
+ *
+ * \param key,  key i.e. file name to be searched
+ *
+ * \retval +ve, on success
+ * \retval -ve, on error
+ */
+static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
+                               struct dt_rec *rec, const struct dt_key *key,
+                               struct lustre_capa *capa)
+{
+        struct osd_object *obj = osd_dt_obj(dt);
+        int rc = 0;
+
+        ENTRY;
 
         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
         LINVRNT(osd_invariant(obj));
-        LASSERT(th != NULL);
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
                 return -EACCES;
 
-        result = fid_unpack(pack, fid);
-        if (result != 0)
-                return result;
+        rc = osd_ea_lookup_rec(env, obj, rec, key);
 
-        luch = lu_object_find(env, ludev, fid, NULL);
-        if (!IS_ERR(luch)) {
-                if (lu_object_exists(luch)) {
-                        struct osd_object *child;
-
-                        child = osd_obj(lu_object_locate(luch->lo_header,
-                                                         ludev->ld_type));
-                        if (child != NULL)
-                                result = osd_add_rec(info, osd_obj2dev(obj),
-                                                     obj->oo_inode,
-                                                     child->oo_inode, name);
-                        else {
-                                CERROR("No osd slice.\n");
-                                result = -ENOENT;
-                        }
-                        LINVRNT(osd_invariant(obj));
-                        LINVRNT(osd_invariant(child));
-                } else {
-                        CERROR("Sorry.\n");
-                        result = -ENOENT;
-                }
-                lu_object_put(env, luch);
-        } else
-                result = PTR_ERR(luch);
-        LINVRNT(osd_invariant(obj));
-        return result;
+        if (rc == 0)
+                rc = +1;
+        RETURN(rc);
 }
 
-static const struct dt_index_operations osd_index_compat_ops = {
-        .dio_lookup = osd_index_compat_lookup,
-        .dio_insert = osd_index_compat_insert,
-        .dio_delete = osd_index_compat_delete
-};
-
 /* type constructor/destructor: osd_type_init, osd_type_fini */
 LU_TYPE_INIT_FINI(osd, &osd_key);
 
@@ -2506,7 +3360,7 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
         struct osd_thread_info *info = osd_oti_get(env);
         ENTRY;
         if (o->od_obj_area != NULL) {
-                dput(o->od_obj_area);
+                lu_object_put(env, &o->od_obj_area->do_lu);
                 o->od_obj_area = NULL;
         }
         osd_oi_fini(info, &o->od_oi);
@@ -2519,8 +3373,8 @@ static int osd_mount(const struct lu_env *env,
 {
         struct lustre_mount_info *lmi;
         const char               *dev  = lustre_cfg_string(cfg, 0);
-        struct osd_thread_info   *info = osd_oti_get(env);
-        int result;
+        struct lustre_disk_data  *ldd;
+        struct lustre_sb_info    *lsi;
 
         ENTRY;
 
@@ -2540,20 +3394,17 @@ static int osd_mount(const struct lu_env *env,
         /* save lustre_mount_info in dt_device */
         o->od_mount = lmi;
 
-        result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
-        if (result == 0) {
-                struct dentry *d;
+        lsi = s2lsi(lmi->lmi_sb);
+        ldd = lsi->lsi_ldd;
 
-                d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
-                                 0777, 1);
-                if (!IS_ERR(d)) {
-                        o->od_obj_area = d;
-                } else
-                        result = PTR_ERR(d);
-        }
-        if (result != 0)
-                osd_shutdown(env, o);
-        RETURN(result);
+        if (ldd->ldd_flags & LDD_F_IAM_DIR) {
+                o->od_iop_mode = 0;
+                LCONSOLE_WARN("OSD: IAM mode enabled\n");
+        } else
+                o->od_iop_mode = 1;
+
+        o->od_obj_area = NULL;
+        RETURN(0);
 }
 
 static struct lu_device *osd_device_fini(const struct lu_env *env,
@@ -2640,11 +3491,12 @@ static int osd_process_config(const struct lu_env *env,
                 err = osd_shutdown(env, o);
                 break;
         default:
-                err = -ENOTTY;
+                err = -ENOSYS;
         }
 
         RETURN(err);
 }
+
 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
                                     struct ldiskfs_super_block * es);
 
@@ -2658,6 +3510,49 @@ static int osd_recovery_complete(const struct lu_env *env,
         RETURN(0);
 }
 
+/**
+ * Device "prepare" hook; installed as ->ldo_prepare in osd_lu_ops.
+ *
+ * Runs three steps in order: initializes the object index (osd_oi_init()),
+ * sets up local objects (llo_local_objects_setup()), and opens the
+ * remote_obj_dir object, which is stored in osd->od_obj_area.
+ *
+ * \a pdev is only used through lu2md_dev(); \a dev is this OSD device.
+ *
+ * Returns 0 on success.  On failure returns the error of the failing step;
+ * if opening the remote object directory fails, od_obj_area is set to NULL.
+ *
+ * NOTE(review): when step 2 or 3 fails, nothing in this function undoes
+ * the successful osd_oi_init(); presumably a later shutdown path releases
+ * it -- confirm.
+ */
+static int osd_prepare(const struct lu_env *env,
+                       struct lu_device *pdev,
+                       struct lu_device *dev)
+{
+        struct osd_device *osd = osd_dev(dev);
+        struct lustre_sb_info *lsi;
+        struct lustre_disk_data *ldd;
+        struct lustre_mount_info  *lmi;
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct dt_object *d;
+        int result;
+
+        ENTRY;
+        /* 1. initialize oi before any file create or file open */
+        result = osd_oi_init(oti, &osd->od_oi,
+                             &osd->od_dt_dev, lu2md_dev(pdev));
+        if (result != 0)
+                RETURN(result);
+
+        /* NOTE(review): lsi and ldd are assigned here but not read again
+         * anywhere in this function. */
+        lmi = osd->od_mount;
+        lsi = s2lsi(lmi->lmi_sb);
+        ldd = lsi->lsi_ldd;
+
+        /* 2. setup local objects */
+        result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
+        if (result)
+                goto out;
+
+        /* 3. open remote object dir */
+        d = dt_store_open(env, lu2dt_dev(dev), "",
+                          remote_obj_dir, &oti->oti_fid);
+        if (!IS_ERR(d)) {
+                osd->od_obj_area = d;
+                result = 0;
+        } else {
+                /* dt_store_open() reports failure as an ERR_PTR value */
+                result = PTR_ERR(d);
+                osd->od_obj_area = NULL;
+        }
+
+out:
+        RETURN(result);
+}
+
 static struct inode *osd_iget(struct osd_thread_info *info,
                               struct osd_device *dev,
                               const struct osd_inode_id *id)
@@ -2672,7 +3567,8 @@ static struct inode *osd_iget(struct osd_thread_info *info,
                 CERROR("bad inode\n");
                 iput(inode);
                 inode = ERR_PTR(-ENOENT);
-        } else if (inode->i_generation != id->oii_gen) {
+        } else if (id->oii_gen != OSD_OII_NOGEN &&
+                   inode->i_generation != id->oii_gen) {
                 CERROR("stale inode\n");
                 iput(inode);
                 inode = ERR_PTR(-ESTALE);
@@ -2719,6 +3615,10 @@ static int osd_fid_lookup(const struct lu_env *env,
                 if (!IS_ERR(inode)) {
                         obj->oo_inode = inode;
                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
+                        if (dev->od_iop_mode) {
+                                obj->oo_compat_dot_created = 1;
+                                obj->oo_compat_dotdot_created = 1;
+                        }
                         result = 0;
                 } else
                         /*
@@ -2732,6 +3632,7 @@ static int osd_fid_lookup(const struct lu_env *env,
         } else if (result == -ENOENT)
                 result = 0;
         LINVRNT(osd_invariant(obj));
+
         RETURN(result);
 }
 
@@ -2831,7 +3732,8 @@ static const struct lu_object_operations osd_lu_obj_ops = {
 static const struct lu_device_operations osd_lu_ops = {
         .ldo_object_alloc      = osd_object_alloc,
         .ldo_process_config    = osd_process_config,
-        .ldo_recovery_complete = osd_recovery_complete
+        .ldo_recovery_complete = osd_recovery_complete,
+        .ldo_prepare           = osd_prepare,
 };
 
 static const struct lu_device_type_operations osd_device_type_ops = {
@@ -2862,10 +3764,19 @@ static struct obd_ops osd_obd_device_ops = {
         .o_owner = THIS_MODULE
 };
 
+/**
+ * Local object descriptor for the remote object directory.
+ *
+ * Registered from osd_mod_init() with llo_local_obj_register(); the object
+ * with the same name (remote_obj_dir) is opened in osd_prepare() and kept
+ * in osd_device::od_obj_area.  It is flagged as an index and uses
+ * dt_directory_features.
+ */
+static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
+        .llod_name      = remote_obj_dir,
+        .llod_oid       = OSD_REM_OBJ_DIR_OID,
+        .llod_is_index  = 1,
+        .llod_feat      = &dt_directory_features,
+};
+
 static int __init osd_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
 
+        osd_oi_mod_init();
+        llo_local_obj_register(&llod_osd_rem_obj_dir);
         lprocfs_osd_init_vars(&lvars);
         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
                                    LUSTRE_OSD_NAME, &osd_device_type);
index e187323..952754a 100644 (file)
@@ -54,6 +54,8 @@
 /* struct dentry */
 #include <linux/dcache.h>
 #include <linux/lustre_iam.h>
+/* struct dirent64 */
+#include <linux/dirent.h>
 
 /* LUSTRE_OSD_NAME */
 #include <obd.h>
@@ -66,6 +68,7 @@
 
 struct inode;
 
+#define OSD_OII_NOGEN (0)
 #define OSD_COUNTERS (0)
 
 #ifdef HAVE_QUOTA_SUPPORT
@@ -90,7 +93,7 @@ struct osd_device {
          * XXX temporary stuff for object index: directory where every object
          * is named by its fid.
          */
-        struct dentry            *od_obj_area;
+        struct dt_object         *od_obj_area;
 
         /* Environment for transaction commit callback.
          * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
@@ -117,23 +120,59 @@ struct osd_device {
         cfs_time_t                od_osfs_age;
         struct kstatfs            od_kstatfs;
         spinlock_t                od_osfs_lock;
+
+        /**
+         * The following flag indicates, if it is interop mode or not.
+         * It will be initialized, using mount param.
+         */
+        __u32                     od_iop_mode;
 };
 
+/**
+ * In-memory state of an iterator in interoperability mode, i.e. an
+ * iterator over an ldiskfs-style directory.
+ */
+struct osd_it_ea {
+        /** object this iterator belongs to (presumably the directory being
+         *  iterated -- TODO confirm against osd_it_ea_init()) */
+        struct osd_object   *oie_obj;
+        /** struct file embedded here (not a pointer) for use by the
+         *  ldiskfs iterator */
+        struct file          oie_file;
+        /** used in ldiskfs iterator, to store the directory entry */
+        struct dirent64      oie_dirent64;
+        /** current file position */
+        __u64               oie_curr_pos;
+        /** next file position */
+        __u64               oie_next_pos;
+        /** name length of the file */
+        __u8                oie_namelen;
+
+};
+
+/**
+ * Iterator's in-memory data structure for IAM mode.
+ */
+struct osd_it_iam {
+        /** object this iterator belongs to (presumably the index being
+         *  iterated -- TODO confirm) */
+        struct osd_object     *oi_obj;
+        /** IAM path descriptor used together with oi_it */
+        struct iam_path_descr *oi_ipd;
+        /** the IAM iterator itself, embedded by value */
+        struct iam_iterator    oi_it;
+};
 
 struct osd_thread_info {
         const struct lu_env   *oti_env;
+        /**
+         * used for index operations.
+         */
+        struct dentry          oti_obj_dentry;
+        struct dentry          oti_child_dentry;
 
         struct lu_fid          oti_fid;
         struct osd_inode_id    oti_id;
         /*
          * XXX temporary: for ->i_op calls.
          */
-        struct qstr            oti_str;
         struct txn_param       oti_txn;
         /*
          * XXX temporary: fake dentry used by xattr calls.
          */
-        struct dentry          oti_dentry;
         struct timespec        oti_time;
         /*
          * XXX temporary: fake struct file for osd_object_sync
@@ -147,14 +186,43 @@ struct osd_thread_info {
 
         struct lu_fid_pack     oti_pack;
 
-        /* union to guarantee that ->oti_ipd[] has proper alignment. */
+        /**
+         * following ipd and it structures are used for osd_index_iam_lookup()
+         * these are defined separately as we might do index operation
+         * in open iterator session.
+         */
+
+        /** osd iterator context used for iterator session */
+
+        union {
+                struct osd_it_iam      oti_it;
+                /** ldiskfs iterator data structure, see osd_it_ea_{init, fini} */
+                struct osd_it_ea       oti_it_ea;
+        };
+
+
+        /** IAM iterator for index operation. */
+        struct iam_iterator    oti_idx_it;
+
+        /** union to guarantee that ->oti_ipd[] has proper alignment. */
         union {
-        char                   oti_ipd[DX_IPD_MAX_SIZE];
+                char           oti_it_ipd[DX_IPD_MAX_SIZE];
                 long long      oti_alignment_lieutenant;
         };
+
+        union {
+                char           oti_idx_ipd[DX_IPD_MAX_SIZE];
+                long long      oti_alignment_lieutenant_colonel;
+        };
+
+
         int                    oti_r_locks;
         int                    oti_w_locks;
         int                    oti_txns;
+        /** used in osd_fid_set() to put xattr */
+        struct lu_buf          oti_buf;
+        /** used in osd_ea_fid_set() to set fid into common ea */
+        struct lustre_mdt_attrs oti_mdt_attrs;
 #ifdef HAVE_QUOTA_SUPPORT
         struct osd_ctxt        oti_ctxt;
 #endif
index 79d4082..ad03c3c 100644 (file)
 struct oi_descr {
         int   fid_size;
         char *name;
+        __u32 oid;
+};
+
+/** to serialize concurrent OI index initialization */
+static struct mutex oi_init_lock;
+
+static struct dt_index_features oi_feat = {
+        .dif_flags       = DT_IND_UPDATE,
+        .dif_recsize_min = sizeof(struct osd_inode_id),
+        .dif_recsize_max = sizeof(struct osd_inode_id),
+        .dif_ptrsize     = 4
 };
 
 static const struct oi_descr oi_descr[OSD_OI_FID_NR] = {
         [OSD_OI_FID_SMALL] = {
                 .fid_size = 5,
-                .name     = "oi.5"
+                .name     = "oi.5",
+                .oid      = OSD_OI_FID_SMALL_OID
         },
         [OSD_OI_FID_OTHER] = {
                 .fid_size = sizeof(struct lu_fid),
-                .name     = "oi.16"
+                .name     = "oi.16",
+                .oid      = OSD_OI_FID_OTHER_OID
         }
 };
 
+/**
+ * Create the OI index objects described by oi_descr[].
+ *
+ * For each descriptor a local-object fid is built from its oid, the key
+ * sizes of the file-scope oi_feat are set to the descriptor's fid_size, and
+ * the index is created under "/" with llo_store_create_index().  The
+ * reference returned for each created object is dropped immediately.
+ *
+ * oi_feat is shared, mutable state: the caller in this file, osd_oi_init(),
+ * calls this function while holding oi_init_lock.
+ *
+ * Returns 0 on success, or the error carried by the pointer that
+ * llo_store_create_index() returned; creation stops at the first failure.
+ */
+static int osd_oi_index_create(struct osd_thread_info *info,
+                               struct dt_device *dev,
+                               struct md_device *mdev)
+{
+        const struct lu_env *env = info->oti_env;
+        struct lu_fid *oi_fid = &info->oti_fid;
+        struct md_object *mdo;
+        int i;
+
+        for (i = 0; i < OSD_OI_FID_NR; ++i) {
+                lu_local_obj_fid(oi_fid, oi_descr[i].oid);
+                /* each statement is terminated on its own; the original
+                 * chained these into the call below with ',' */
+                oi_feat.dif_keysize_min = oi_descr[i].fid_size;
+                oi_feat.dif_keysize_max = oi_descr[i].fid_size;
+
+                mdo = llo_store_create_index(env, mdev, dev,
+                                             "/", oi_descr[i].name,
+                                             oi_fid, &oi_feat);
+                if (IS_ERR(mdo))
+                        return PTR_ERR(mdo);
+
+                lu_object_put(env, &mdo->mo_lu);
+        }
+        return 0;
+}
+
 int osd_oi_init(struct osd_thread_info *info,
-                struct osd_oi *oi, struct dt_device *dev)
+                struct osd_oi *oi,
+                struct dt_device *dev,
+                struct md_device *mdev)
 {
+        const struct lu_env *env;
         int rc;
         int i;
-        const struct lu_env *env;
 
         CLASSERT(ARRAY_SIZE(oi->oi_dir) == ARRAY_SIZE(oi_descr));
 
         env = info->oti_env;
-
+        mutex_lock(&oi_init_lock);
         memset(oi, 0, sizeof *oi);
-
-        for (i = rc = 0; i < ARRAY_SIZE(oi->oi_dir) && rc == 0; ++i) {
+retry:
+        for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) {
                 const char       *name;
-                /*
-                 * Allocate on stack---this is initialization.
-                 */
-                const struct dt_index_features feat = {
-                        .dif_flags       = DT_IND_UPDATE,
-                        .dif_keysize_min = oi_descr[i].fid_size,
-                        .dif_keysize_max = oi_descr[i].fid_size,
-                        .dif_recsize_min = sizeof(struct osd_inode_id),
-                        .dif_recsize_max = sizeof(struct osd_inode_id)
-                };
                 struct dt_object *obj;
 
                 name = oi_descr[i].name;
-                obj = dt_store_open(env, dev, name, &info->oti_fid);
+                oi_feat.dif_keysize_min = oi_descr[i].fid_size,
+                oi_feat.dif_keysize_max = oi_descr[i].fid_size,
+
+                obj = dt_store_open(env, dev, "", name, &info->oti_fid);
                 if (!IS_ERR(obj)) {
-                        rc = obj->do_ops->do_index_try(env, obj, &feat);
+                        rc = obj->do_ops->do_index_try(env, obj, &oi_feat);
                         if (rc == 0) {
                                 LASSERT(obj->do_index_ops != NULL);
                                 oi->oi_dir[i] = obj;
@@ -130,17 +169,25 @@ int osd_oi_init(struct osd_thread_info *info,
                         }
                 } else {
                         rc = PTR_ERR(obj);
+                        if (rc == -ENOENT) {
+                                rc = osd_oi_index_create(info, dev, mdev);
+                                if (!rc)
+                                        goto retry;
+                        }
                         CERROR("Cannot open \"%s\": %d\n", name, rc);
                 }
         }
         if (rc != 0)
                 osd_oi_fini(info, oi);
+
+        mutex_unlock(&oi_init_lock);
         return rc;
 }
 
 void osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi)
 {
         int i;
+
         for (i = 0; i < ARRAY_SIZE(oi->oi_dir); ++i) {
                 if (oi->oi_dir[i] != NULL) {
                         lu_object_put(info->oti_env, &oi->oi_dir[i]->do_lu);
@@ -171,6 +218,16 @@ static const struct dt_key *oi_fid_key(struct osd_thread_info *info,
         return NULL;
 }
 
+/**
+ * Tell whether \a fid names one of the OI index objects themselves.
+ *
+ * Such fids have to be filtered out, because they cannot be stored in the
+ * OI while the OI index is being created.
+ *
+ * Returns non-zero when the sequence is FID_SEQ_LOCAL_FILE and the oid is
+ * OSD_OI_FID_SMALL_OID or OSD_OI_FID_OTHER_OID, and 0 otherwise.
+ */
+static inline int fid_is_oi_fid(const struct lu_fid *fid)
+{
+        if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
+                if (fid_oid(fid) == OSD_OI_FID_SMALL_OID)
+                        return 1;
+                if (fid_oid(fid) == OSD_OI_FID_OTHER_OID)
+                        return 1;
+        }
+        return 0;
+}
+
 int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
                   const struct lu_fid *fid, struct osd_inode_id *id)
 {
@@ -183,12 +240,19 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
                 struct dt_object    *idx;
                 const struct dt_key *key;
 
+                if (fid_is_oi_fid(fid))
+                        return -ENOENT;
+
                 key = oi_fid_key(info, oi, fid, &idx);
                 rc = idx->do_index_ops->dio_lookup(info->oti_env, idx,
                                                    (struct dt_rec *)id, key,
                                                    BYPASS_CAPA);
-                id->oii_ino = be32_to_cpu(id->oii_ino);
-                id->oii_gen = be32_to_cpu(id->oii_gen);
+                if (rc > 0) {
+                        id->oii_ino = be32_to_cpu(id->oii_ino);
+                        id->oii_gen = be32_to_cpu(id->oii_gen);
+                        rc = 0;
+                } else if (rc == 0)
+                        rc = -ENOENT;
         }
         return rc;
 }
@@ -204,6 +268,9 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi,
         if (fid_is_igif(fid))
                 return 0;
 
+        if (fid_is_oi_fid(fid))
+                return 0;
+
         key = oi_fid_key(info, oi, fid, &idx);
         id  = &info->oti_id;
         id->oii_ino = cpu_to_be32(id0->oii_ino);
@@ -228,3 +295,9 @@ int osd_oi_delete(struct osd_thread_info *info,
         return idx->do_index_ops->dio_delete(info->oti_env, idx,
                                              key, th, BYPASS_CAPA);
 }
+
+/**
+ * Module-level initialization of the OI code; called from osd_mod_init().
+ *
+ * Initializes oi_init_lock, the mutex that osd_oi_init() takes to
+ * serialize concurrent OI index initialization.  Always returns 0.
+ */
+int osd_oi_mod_init(void)
+{
+        mutex_init(&oi_init_lock);
+        return 0;
+}
index 8e02eb2..fe87768 100644 (file)
@@ -54,6 +54,7 @@
 /* struct rw_semaphore */
 #include <linux/rwsem.h>
 #include <lu_object.h>
+#include <md_object.h>
 
 struct lu_fid;
 struct osd_thread_info;
@@ -90,8 +91,11 @@ struct osd_inode_id {
         __u32 oii_gen; /* inode generation */
 };
 
-int  osd_oi_init(struct osd_thread_info *info,
-                 struct osd_oi *oi, struct dt_device *dev);
+int osd_oi_mod_init(void);
+int osd_oi_init(struct osd_thread_info *info,
+                struct osd_oi *oi,
+                struct dt_device *dev,
+                struct md_device *mdev);
 void osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi);
 
 int  osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
index 6358789..3b573bc 100644 (file)
@@ -29,6 +29,7 @@ TMP=${TMP:-/tmp}
 MDSDEV=${MDSDEV:-$TMP/${FSNAME}-mdt1}
 MDSCOUNT=${MDSCOUNT:-3}
 test $MDSCOUNT -gt 4 && MDSCOUNT=4
+MDSCOUNT=1
 MDSDEVBASE=${MDSDEVBASE:-$TMP/${FSNAME}-mdt}
 MDSSIZE=${MDSSIZE:-100000}
 
index 3cf4e80..f4754f4 100644 (file)
@@ -1022,26 +1022,27 @@ test_32b() {
        [ -z "$TUNEFS" ] && skip "No tunefs" && return
        local DISK1_8=$LUSTRE/tests/disk1_8.tgz
        [ ! -r $DISK1_8 ] && skip "Cannot find $DISK1_8" && return 0
-       mkdir -p $TMP/$tdir
-       tar xjvf $DISK1_8 -C $TMP/$tdir || \
+       local tmpdir=$TMP/$tdir
+       mkdir -p $tmpdir
+       tar xjvf $DISK1_8 -C $tmpdir || \
                { skip "Cannot untar $DISK1_8" && return ; }
 
        load_modules
        lctl set_param debug=$PTLDEBUG
-       NEWNAME=sofia
+       NEWNAME=lustre
 
        # writeconf will cause servers to register with their current nids
        $TUNEFS --writeconf --fsname=$NEWNAME $tmpdir/mds || error "tunefs failed"
-       start mds $tmpdir/mds "-o loop" || return 3
+       start mds1 $tmpdir/mds "-o loop" || return 3
        local UUID=$(lctl get_param -n mdt.${NEWNAME}-MDT0000.uuid)
        echo MDS uuid $UUID
-       [ "$UUID" == "mdsA_UUID" ] || error "UUID is wrong: $UUID" 
+       [ "$UUID" == "${NEWNAME}-MDT0000_UUID" ] || error "UUID is wrong: $UUID" 
 
-       $TUNEFS --mgsnode=`hostname` --fsname=$NEWNAME --writeconf $tmpdir/ost1 || error "tunefs failed"
+       $TUNEFS --mgsnode=`hostname` --writeconf --fsname=$NEWNAME $tmpdir/ost1 || error "tunefs failed"
        start ost1 $tmpdir/ost1 "-o loop" || return 5
        UUID=$(lctl get_param -n obdfilter.${NEWNAME}-OST0000.uuid)
        echo OST uuid $UUID
-       [ "$UUID" == "ost1_UUID" ] || error "UUID is wrong: $UUID"
+       [ "$UUID" == "${NEWNAME}-OST0000_UUID" ] || error "UUID is wrong: $UUID"
 
        echo "OSC changes should succeed:" 
        $LCTL conf_param ${NEWNAME}-OST0000.osc.max_dirty_mb=15 || return 7
@@ -1059,7 +1060,7 @@ test_32b() {
        mount_client $MOUNT
        FSNAME=$OLDFS
        set_and_check client "lctl get_param -n mdc.*.max_rpcs_in_flight" "${NEWNAME}-MDT0000.mdc.max_rpcs_in_flight" || return 11
-       [ "$(cksum $MOUNT/passwd | cut -d' ' -f 1,2)" == "2479747619 779" ] || return 12  
+       [ "$(cksum $MOUNT/passwd | cut -d' ' -f 1,2)" == "94306271 1478" ] || return 12
        echo "ok."
 
        cleanup
diff --git a/lustre/tests/disk1_8.tgz b/lustre/tests/disk1_8.tgz
new file mode 100644 (file)
index 0000000..1657c1e
Binary files /dev/null and b/lustre/tests/disk1_8.tgz differ
index f4b7a48..dce9601 100644 (file)
@@ -15,6 +15,7 @@ export GSS=false
 export GSS_KRB5=false
 export GSS_PIPEFS=false
 export IDENTITY_UPCALL=default
+
 #export PDSH="pdsh -S -Rssh -w"
 
 # eg, assert_env LUSTRE MDSNODES OSTNODES CLIENTS
@@ -1080,6 +1081,11 @@ mdsmkfsopts()
 }
 
 formatall() {
+    if [ "$IAMDIR" == "yes" ]; then
+        MDS_MKFS_OPTS="$MDS_MKFS_OPTS --iam-dir"
+        MDSn_MKFS_OPTS="$MDSn_MKFS_OPTS --iam-dir"
+    fi
+
     [ "$FSTYPE" ] && FSTYPE_OPT="--backfstype $FSTYPE"
 
     if [ ! -z $SEC ]; then
index 0bd83b7..8f54f8b 100644 (file)
@@ -98,6 +98,7 @@ char *progname;
 int verbose = 1;
 static int print_only = 0;
 static int failover = 0;
+static int upgrade_to_18 = 0;
 
 void usage(FILE *out)
 {
@@ -130,6 +131,7 @@ void usage(FILE *out)
                 "\t\t--mkfsoptions=<opts> : format options\n"
                 "\t\t--reformat: overwrite an existing disk\n"
                 "\t\t--stripe-count-hint=#N : used for optimizing MDT inode size\n"
+                "\t\t--iam-dir: make use of IAM directory format on backfs, incompatible with ext3.\n"
 #else
                 "\t\t--erase-params : erase all old parameter settings\n"
                 "\t\t--nomgs: turn off MGS service on this MDT\n"
@@ -716,7 +718,7 @@ void print_ldd(char *str, struct lustre_disk_data *ldd)
         printf("Lustre FS:  %s\n", ldd->ldd_fsname);
         printf("Mount type: %s\n", MT_STR(ldd));
         printf("Flags:      %#x\n", ldd->ldd_flags);
-        printf("              (%s%s%s%s%s%s%s%s)\n",
+        printf("              (%s%s%s%s%s%s%s%s%s)\n",
                IS_MDT(ldd) ? "MDT ":"",
                IS_OST(ldd) ? "OST ":"",
                IS_MGS(ldd) ? "MGS ":"",
@@ -724,6 +726,7 @@ void print_ldd(char *str, struct lustre_disk_data *ldd)
                ldd->ldd_flags & LDD_F_VIRGIN     ? "first_time ":"",
                ldd->ldd_flags & LDD_F_UPDATE     ? "update ":"",
                ldd->ldd_flags & LDD_F_WRITECONF  ? "writeconf ":"",
+               ldd->ldd_flags & LDD_F_IAM_DIR  ? "IAM_dir_format ":"",
                ldd->ldd_flags & LDD_F_UPGRADE14  ? "upgrade1.4 ":"");
         printf("Persistent mount opts: %s\n", ldd->ldd_mount_opts);
         printf("Parameters:%s\n", ldd->ldd_params);
@@ -732,6 +735,67 @@ void print_ldd(char *str, struct lustre_disk_data *ldd)
         printf("\n");
 }
 
+/**
+ * Create \a filename with mode 0600 if it does not exist, or truncate it
+ * if it does, then close it again.
+ *
+ * Returns 0 on success, 1 if \a filename is NULL or open() fails (errno is
+ * left as set by open() in the latter case).
+ */
+static int touch_file(char *filename)
+{
+        int fd;
+
+        if (filename == NULL)
+                return 1;
+
+        /* POSIX leaves O_TRUNC without O_WRONLY/O_RDWR undefined, so ask
+         * for write access explicitly. */
+        fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0600);
+        if (fd < 0)
+                return 1;
+
+        close(fd);
+        return 0;
+}
+
+/* keep it less than LL_FID_NAMELEN */
+#define DUMMY_FILE_NAME_LEN             25
+#define EXT3_DIRENT_SIZE                DUMMY_FILE_NAME_LEN
+
+/* Need to add these many entries to this directory to make HTREE dir. */
+#define MIN_ENTRIES_REQ_FOR_HTREE       ((L_BLOCK_SIZE / EXT3_DIRENT_SIZE))
+
+/**
+ * Populate \a dir with MIN_ENTRIES_REQ_FOR_HTREE empty files whose names
+ * are the loop index zero-padded to DUMMY_FILE_NAME_LEN digits.
+ *
+ * Returns 0 on success.  Returns non-zero, after printing a message to
+ * stderr, as soon as a path does not fit in PATH_MAX or a file cannot be
+ * created.
+ */
+static int add_dummy_files(char *dir)
+{
+        char fpname[PATH_MAX];
+        int i;
+        int len;
+        int rc;
+
+        for (i = 0; i < MIN_ENTRIES_REQ_FOR_HTREE; i++) {
+                len = snprintf(fpname, sizeof(fpname), "%s/%0*d", dir,
+                               DUMMY_FILE_NAME_LEN, i);
+                /* a truncated path would name the wrong file; refuse it */
+                if (len < 0 || len >= (int)sizeof(fpname)) {
+                        fprintf(stderr,
+                                "%s: Dummy file path in %s is too long\n",
+                                progname, dir);
+                        return 1;
+                }
+
+                /* touch_file() only ever returns 0 or 1, so any non-zero
+                 * value is a failure. */
+                rc = touch_file(fpname);
+                if (rc) {
+                        fprintf(stderr,
+                                "%s: Can't create dummy file %s: %s\n",
+                                progname, fpname , strerror(errno));
+                        return rc;
+                }
+        }
+        return 0;
+}
+
+/**
+ * Create directory \a filepnm with \a mode, tolerating a directory that
+ * already exists, and fill it with dummy entries when the IAM directory
+ * format is requested in \a mop.
+ *
+ * Returns 0 on success, the (non-zero) mkdir() result when mkdir() fails
+ * for a reason other than EEXIST, or the result of add_dummy_files().
+ */
+static int __l_mkdir(char * filepnm, int mode , struct mkfs_opts *mop)
+{
+        int ret;
+
+        /* mkdir() reports failure as -1 with the cause in errno; it never
+         * returns -EEXIST itself, so the cause must be read from errno. */
+        ret = mkdir(filepnm, mode);
+        if (ret && errno != EEXIST)
+                return ret;
+
+        /* IAM mode supports ext3 directories of HTREE type only. So add dummy
+         * entries to new directory to create htree type of container for
+         * this directory. */
+        if (mop->mo_ldd.ldd_flags & LDD_F_IAM_DIR)
+                return add_dummy_files(filepnm);
+        return 0;
+}
+
 /* Write the server config files */
 int write_local_files(struct mkfs_opts *mop)
 {
@@ -766,7