Whamcloud - gitweb
LU-911 osd: OI is implemented internally within OSD
author Alex Zhuravlev <bzzz@whamcloud.com>
Mon, 12 Dec 2011 16:16:19 +0000 (19:16 +0300)
committer Oleg Drokin <green@whamcloud.com>
Thu, 29 Mar 2012 03:20:15 +0000 (23:20 -0400)
Previously the lu infrastructure was used, and it was impossible
to initialize the OI without MDD (which is the case on an OST).
Instead, the ldiskfs OSD uses ldiskfs directly.

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: Iaca243959dbb83bbbe81145b2ceaaa97f95be70c
Reviewed-on: http://review.whamcloud.com/1835
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_fid.h
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_oi.c
lustre/osd-ldiskfs/osd_oi.h

index 32d680c..a9aff3a 100644 (file)
@@ -124,12 +124,6 @@ enum local_oid {
         LLOG_CATALOGS_OID       = 4118UL,
         MGS_CONFIGS_OID         = 4119UL,
         OFD_HEALTH_CHECK_OID    = 4120UL,
         LLOG_CATALOGS_OID       = 4118UL,
         MGS_CONFIGS_OID         = 4119UL,
         OFD_HEALTH_CHECK_OID    = 4120UL,
-
-        /** first OID for first OI fid */
-        OSD_OI_FID_OID_FIRST    = 5000UL,
-        /** reserve enough in case we want to have more in the future */
-        OSD_OI_FID_OID_MAX      = OSD_OI_FID_OID_FIRST +
-                                  (1UL << OSD_OI_FID_OID_BITS_MAX),
 };
 
 static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
 };
 
 static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
index 1ff54ba..84e7573 100644 (file)
@@ -110,8 +110,8 @@ static int osd_object_invariant(const struct lu_object *l)
 static inline void
 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
 {
 static inline void
 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
 {
-        struct md_ucred    *uc = md_ucred(env);
-        struct cred        *tc;
+        struct md_ucred *uc = md_ucred(env);
+        struct cred     *tc;
 
         LASSERT(uc != NULL);
 
 
         LASSERT(uc != NULL);
 
@@ -254,9 +254,9 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
 /*
  * retrieve object from backend ext fs.
  **/
 /*
  * retrieve object from backend ext fs.
  **/
-static struct inode *osd_iget(struct osd_thread_info *info,
-                              struct osd_device *dev,
-                              const struct osd_inode_id *id)
+struct inode *osd_iget(struct osd_thread_info *info,
+                       struct osd_device *dev,
+                       const struct osd_inode_id *id)
 {
         struct inode *inode = NULL;
 
 {
         struct inode *inode = NULL;
 
@@ -313,17 +313,18 @@ static int osd_fid_lookup(const struct lu_env *env,
         ENTRY;
 
         info = osd_oti_get(env);
         ENTRY;
 
         info = osd_oti_get(env);
+        LASSERT(info);
         dev  = osd_dev(ldev);
         id   = &info->oti_id;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
                 RETURN(-ENOENT);
 
         dev  = osd_dev(ldev);
         id   = &info->oti_id;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
                 RETURN(-ENOENT);
 
-        result = osd_oi_lookup(info, osd_fid2oi(dev, fid), fid, id);
+        result = osd_oi_lookup(info, dev, fid, id);
         if (result != 0) {
                 if (result == -ENOENT)
                         result = 0;
         if (result != 0) {
                 if (result == -ENOENT)
                         result = 0;
-                goto out;
+                GOTO(out, result);
         }
 
         inode = osd_iget(info, dev, id);
         }
 
         inode = osd_iget(info, dev, id);
@@ -336,7 +337,7 @@ static int osd_fid_lookup(const struct lu_env *env,
                  * place holders for objects yet to be created.
                  */
                 result = PTR_ERR(inode);
                  * place holders for objects yet to be created.
                  */
                 result = PTR_ERR(inode);
-                goto out;
+                GOTO(out, result);
         }
 
         obj->oo_inode = inode;
         }
 
         obj->oo_inode = inode;
@@ -358,7 +359,6 @@ static int osd_fid_lookup(const struct lu_env *env,
         }
 out:
         LINVRNT(osd_invariant(obj));
         }
 out:
         LINVRNT(osd_invariant(obj));
-
         RETURN(result);
 }
 
         RETURN(result);
 }
 
@@ -411,30 +411,6 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
         OBD_FREE_PTR(obj);
 }
 
         OBD_FREE_PTR(obj);
 }
 
-/**
- * IAM Iterator
- */
-static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
-                                             const struct iam_container *bag)
-{
-        return bag->ic_descr->id_ops->id_ipd_alloc(bag,
-                                           osd_oti_get(env)->oti_it_ipd);
-}
-
-static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
-                                              const struct iam_container *bag)
-{
-        return bag->ic_descr->id_ops->id_ipd_alloc(bag,
-                                           osd_oti_get(env)->oti_idx_ipd);
-}
-
-static void osd_ipd_put(const struct lu_env *env,
-                        const struct iam_container *bag,
-                        struct iam_path_descr *ipd)
-{
-        bag->ic_descr->id_ops->id_ipd_free(ipd);
-}
-
 /*
  * Concurrency: no concurrent access is possible that late in object
  * life-cycle.
 /*
  * Concurrency: no concurrent access is possible that late in object
  * life-cycle.
@@ -1440,27 +1416,13 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
         return 0;
 }
 
         return 0;
 }
 
-static struct dentry * osd_child_dentry_get(const struct lu_env *env,
-                                            struct osd_object *obj,
-                                            const char *name,
-                                            const int namelen)
+struct dentry *osd_child_dentry_get(const struct lu_env *env,
+                                    struct osd_object *obj,
+                                    const char *name, const int namelen)
 {
 {
-        struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry *child_dentry = &info->oti_child_dentry;
-        struct dentry *obj_dentry = &info->oti_obj_dentry;
-
-        obj_dentry->d_inode = obj->oo_inode;
-        obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
-        obj_dentry->d_name.hash = 0;
-
-        child_dentry->d_name.hash = 0;
-        child_dentry->d_parent = obj_dentry;
-        child_dentry->d_name.name = name;
-        child_dentry->d_name.len = namelen;
-        return child_dentry;
+        return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen);
 }
 
 }
 
-
 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
                       cfs_umode_t mode,
                       struct dt_allocation_hint *hint,
 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
                       cfs_umode_t mode,
                       struct dt_allocation_hint *hint,
@@ -1724,7 +1686,7 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
         id->oii_ino = obj->oo_inode->i_ino;
         id->oii_gen = obj->oo_inode->i_generation;
 
         id->oii_ino = obj->oo_inode->i_ino;
         id->oii_gen = obj->oo_inode->i_generation;
 
-        return osd_oi_insert(info, osd_fid2oi(osd, fid), fid, id, th,
+        return osd_oi_insert(info, osd, fid, id, th,
                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
 }
 
                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
 }
 
@@ -1857,8 +1819,7 @@ static int osd_object_destroy(const struct lu_env *env,
 
         OSD_EXEC_OP(th, destroy);
 
 
         OSD_EXEC_OP(th, destroy);
 
-        result = osd_oi_delete(osd_oti_get(env),
-                               osd_fid2oi(osd, fid), fid, th);
+        result = osd_oi_delete(osd_oti_get(env), osd, fid, th);
 
         /* XXX: add to ext3 orphan list */
         /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
 
         /* XXX: add to ext3 orphan list */
         /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
@@ -1932,17 +1893,6 @@ static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
 }
 
 /**
 }
 
 /**
- * Helper function to pack the fid, ldiskfs stores fid in packed format.
- */
-void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
-                  struct lu_fid *befider)
-{
-        fid_cpu_to_be(befider, (struct lu_fid *)fid);
-        memcpy(pack->fp_area, befider, sizeof(*befider));
-        pack->fp_len =  sizeof(*befider) + 1;
-}
-
-/**
  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
  * To have compatilibility with 1.8 ldiskfs driver we need to have
  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
  * To have compatilibility with 1.8 ldiskfs driver we need to have
@@ -1960,23 +1910,6 @@ void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
                       (struct lu_fid *)fid);
 }
 
                       (struct lu_fid *)fid);
 }
 
-int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
-{
-        int result;
-
-        result = 0;
-        switch (pack->fp_len) {
-        case sizeof *fid + 1:
-                memcpy(fid, pack->fp_area, sizeof *fid);
-                fid_be_to_cpu(fid, fid);
-                break;
-        default:
-                CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
-                result = -EIO;
-        }
-        return result;
-}
-
 /**
  * Try to read the fid from inode ea into dt_rec, if return value
  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
 /**
  * Try to read the fid from inode ea into dt_rec, if return value
  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
@@ -4047,7 +3980,7 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
                 o->od_obj_area = NULL;
         }
         if (o->od_oi_table != NULL)
                 o->od_obj_area = NULL;
         }
         if (o->od_oi_table != NULL)
-                osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
+                osd_oi_fini(info, o);
 
         if (o->od_fsops) {
                 fsfilt_put_ops(o->od_fsops);
 
         if (o->od_fsops) {
                 fsfilt_put_ops(o->od_fsops);
@@ -4211,13 +4144,12 @@ static int osd_prepare(const struct lu_env *env,
 
         ENTRY;
         /* 1. initialize oi before any file create or file open */
 
         ENTRY;
         /* 1. initialize oi before any file create or file open */
-        result = osd_oi_init(oti, &osd->od_oi_table,
-                             &osd->od_dt_dev, lu2md_dev(pdev));
+        result = osd_oi_init(oti, osd);
         if (result < 0)
                 RETURN(result);
 
         if (result < 0)
                 RETURN(result);
 
-        LASSERT(result > 0);
-        osd->od_oi_count = result;
+        if (!lu_device_is_md(pdev))
+                RETURN(0);
 
         lmi = osd->od_mount;
         lsi = s2lsi(lmi->lmi_sb);
 
         lmi = osd->od_mount;
         lsi = s2lsi(lmi->lmi_sb);
index 068c9af..b94c0f2 100644 (file)
@@ -99,6 +99,19 @@ struct osd_directory {
         struct iam_descr     od_descr;
 };
 
         struct iam_descr     od_descr;
 };
 
+/*
+ * Object Index (oi) instance.
+ */
+struct osd_oi {
+        /*
+         * underlying index object, where fid->id mapping in stored.
+         */
+        struct inode         *oi_inode;
+        struct osd_directory   oi_dir;
+};
+
+extern const int osd_dto_credits_noquota[];
+
 struct osd_object {
         struct dt_object        oo_dt;
         /**
 struct osd_object {
         struct dt_object        oo_dt;
         /**
@@ -197,7 +210,7 @@ struct osd_device {
          */
         struct dt_object         *od_obj_area;
         /* object index */
          */
         struct dt_object         *od_obj_area;
         /* object index */
-        struct osd_oi            *od_oi_table;
+        struct osd_oi           **od_oi_table;
         /* total number of OI containers */
         int                       od_oi_count;
         /*
         /* total number of OI containers */
         int                       od_oi_count;
         /*
@@ -517,6 +530,9 @@ int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
                     struct lustre_capa *capa, __u64 opc);
 void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
                      int type, uid_t id, struct inode *inode);
                     struct lustre_capa *capa, __u64 opc);
 void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
                      int type, uid_t id, struct inode *inode);
+struct inode *osd_iget(struct osd_thread_info *info,
+                       struct osd_device *dev,
+                       const struct osd_inode_id *id);
 int generic_error_remove_page(struct address_space *mapping,
                                      struct page *page);
 
 int generic_error_remove_page(struct address_space *mapping,
                                      struct page *page);
 
@@ -562,8 +578,8 @@ static inline int osd_fid_is_igif(const struct lu_fid *fid)
         return fid_is_igif(fid) || osd_fid_is_root(fid);
 }
 
         return fid_is_igif(fid) || osd_fid_is_root(fid);
 }
 
-static inline struct osd_oi *
-osd_fid2oi(struct osd_device *osd, const struct lu_fid *fid)
+static inline struct osd_oi *osd_fid2oi(struct osd_device *osd,
+                                        const struct lu_fid *fid)
 {
         if (!fid_is_norm(fid))
                 return NULL;
 {
         if (!fid_is_norm(fid))
                 return NULL;
@@ -571,12 +587,9 @@ osd_fid2oi(struct osd_device *osd, const struct lu_fid *fid)
         LASSERT(osd->od_oi_table != NULL && osd->od_oi_count >= 1);
         /* It can work even od_oi_count equals to 1 although it's unexpected,
          * the only reason we set it to 1 is for performance measurement */
         LASSERT(osd->od_oi_table != NULL && osd->od_oi_count >= 1);
         /* It can work even od_oi_count equals to 1 although it's unexpected,
          * the only reason we set it to 1 is for performance measurement */
-        return &osd->od_oi_table[fid->f_seq & (osd->od_oi_count - 1)];
+        return osd->od_oi_table[fid->f_seq & (osd->od_oi_count - 1)];
 }
 
 }
 
-/*
- * Helpers.
- */
 extern const struct lu_device_operations  osd_lu_ops;
 
 static inline int lu_device_is_osd(const struct lu_device *d)
 extern const struct lu_device_operations  osd_lu_ops;
 
 static inline int lu_device_is_osd(const struct lu_device *d)
@@ -642,5 +655,81 @@ static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
 
 extern const struct dt_body_operations osd_body_ops_new;
 
 
 extern const struct dt_body_operations osd_body_ops_new;
 
+/**
+ * IAM Iterator
+ */
+static inline
+struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
+                                      const struct iam_container *bag)
+{
+        return bag->ic_descr->id_ops->id_ipd_alloc(bag,
+                                           osd_oti_get(env)->oti_it_ipd);
+}
+
+static inline
+struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
+                                       const struct iam_container *bag)
+{
+        return bag->ic_descr->id_ops->id_ipd_alloc(bag,
+                                           osd_oti_get(env)->oti_idx_ipd);
+}
+
+static inline void osd_ipd_put(const struct lu_env *env,
+                               const struct iam_container *bag,
+                               struct iam_path_descr *ipd)
+{
+        bag->ic_descr->id_ops->id_ipd_free(ipd);
+}
+
+static inline
+struct dentry *osd_child_dentry_by_inode(const struct lu_env *env,
+                                         struct inode *inode,
+                                         const char *name, const int namelen)
+{
+        struct osd_thread_info *info   = osd_oti_get(env);
+        struct dentry *child_dentry = &info->oti_child_dentry;
+        struct dentry *obj_dentry = &info->oti_obj_dentry;
+
+        obj_dentry->d_inode = inode;
+        obj_dentry->d_sb = inode->i_sb;
+        obj_dentry->d_name.hash = 0;
+
+        child_dentry->d_name.hash = 0;
+        child_dentry->d_parent = obj_dentry;
+        child_dentry->d_name.name = name;
+        child_dentry->d_name.len = namelen;
+        return child_dentry;
+}
+
+/**
+ * Helper function to pack the fid, ldiskfs stores fid in packed format.
+ */
+static inline
+void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
+                  struct lu_fid *befider)
+{
+        fid_cpu_to_be(befider, (struct lu_fid *)fid);
+        memcpy(pack->fp_area, befider, sizeof(*befider));
+        pack->fp_len =  sizeof(*befider) + 1;
+}
+
+static inline
+int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
+{
+        int result;
+
+        result = 0;
+        switch (pack->fp_len) {
+        case sizeof *fid + 1:
+                memcpy(fid, pack->fp_area, sizeof *fid);
+                fid_be_to_cpu(fid, fid);
+                break;
+        default:
+                CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
+                result = -EIO;
+        }
+        return result;
+}
+
 #endif /* __KERNEL__ */
 #endif /* _OSD_INTERNAL_H */
 #endif /* __KERNEL__ */
 #endif /* _OSD_INTERNAL_H */
index ac6744f..c1ce0a6 100644 (file)
@@ -96,6 +96,131 @@ static struct dt_index_features oi_feat = {
 
 #define OSD_OI_NAME_BASE        "oi.16"
 
 
 #define OSD_OI_NAME_BASE        "oi.16"
 
+static void osd_oi_table_put(struct osd_thread_info *info,
+                             struct osd_oi **oi_table, unsigned oi_count)
+{
+        struct iam_container *bag;
+        int                   i;
+
+        for (i = 0; i < oi_count; i++) {
+                LASSERT(oi_table[i] != NULL);
+                LASSERT(oi_table[i]->oi_inode != NULL);
+
+                bag = &(oi_table[i]->oi_dir.od_container);
+                if (bag->ic_object == oi_table[i]->oi_inode)
+                        iam_container_fini(bag);
+                iput(oi_table[i]->oi_inode);
+                oi_table[i]->oi_inode = NULL;
+                OBD_FREE_PTR(oi_table[i]);
+        }
+}
+
+static int osd_oi_index_create_one(struct osd_thread_info *info,
+                                   struct osd_device *osd, const char *name,
+                                   struct dt_index_features *feat)
+{
+        const struct lu_env             *env = info->oti_env;
+        struct osd_inode_id             *id  = &info->oti_id;
+        struct buffer_head              *bh;
+        struct inode                    *inode;
+        struct ldiskfs_dir_entry_2      *de;
+        struct dentry                   *dentry;
+        struct inode                    *dir;
+        handle_t                        *jh;
+        int                              rc;
+
+        dentry = osd_child_dentry_by_inode(env, osd_sb(osd)->s_root->d_inode,
+                                           name, strlen(name));
+        dir = osd_sb(osd)->s_root->d_inode;
+        bh = osd_ldiskfs_find_entry(dir, dentry, &de, NULL);
+        if (bh) {
+                brelse(bh);
+
+                id->oii_ino = le32_to_cpu(de->inode);
+                id->oii_gen = OSD_OII_NOGEN;
+
+                inode = osd_iget(info, osd, id);
+                if (!IS_ERR(inode)) {
+                        iput(inode);
+                        RETURN(-EEXIST);
+                }
+                RETURN(PTR_ERR(inode));
+        }
+
+        jh = ldiskfs_journal_start_sb(osd_sb(osd), 100);
+        LASSERT(!IS_ERR(jh));
+
+        inode = ldiskfs_create_inode(jh, osd_sb(osd)->s_root->d_inode,
+                                     (S_IFREG | S_IRUGO | S_IWUSR));
+        LASSERT(!IS_ERR(inode));
+
+        if (feat->dif_flags & DT_IND_VARKEY)
+                rc = iam_lvar_create(inode, feat->dif_keysize_max,
+                                     feat->dif_ptrsize, feat->dif_recsize_max,
+                                     jh);
+        else
+                rc = iam_lfix_create(inode, feat->dif_keysize_max,
+                                     feat->dif_ptrsize, feat->dif_recsize_max,
+                                     jh);
+
+        dentry = osd_child_dentry_by_inode(env, osd_sb(osd)->s_root->d_inode,
+                                           name, strlen(name));
+        rc = osd_ldiskfs_add_entry(jh, dentry, inode, NULL);
+        LASSERT(rc == 0);
+
+        ldiskfs_journal_stop(jh);
+        iput(inode);
+
+        return rc;
+}
+
+static struct inode *osd_oi_index_open(struct osd_thread_info *info,
+                                       struct osd_device *osd,
+                                       const char *name,
+                                       struct dt_index_features *f,
+                                       bool create)
+{
+        struct dentry *dentry;
+        struct inode  *inode;
+        int            rc;
+
+        dentry = ll_lookup_one_len(name, osd_sb(osd)->s_root, strlen(name));
+        if (IS_ERR(dentry))
+                return (void *) dentry;
+
+        if (dentry->d_inode) {
+                LASSERT(!is_bad_inode(dentry->d_inode));
+                inode = dentry->d_inode;
+                atomic_inc(&inode->i_count);
+                dput(dentry);
+                return inode;
+        }
+
+        /* create */
+        dput(dentry);
+        shrink_dcache_parent(osd_sb(osd)->s_root);
+        if (!create)
+                return ERR_PTR(-ENOENT);
+
+        rc = osd_oi_index_create_one(info, osd, name, f);
+        if (rc)
+                RETURN(ERR_PTR(rc));
+
+        dentry = ll_lookup_one_len(name, osd_sb(osd)->s_root, strlen(name));
+        if (IS_ERR(dentry))
+                return (void *) dentry;
+
+        if (dentry->d_inode) {
+                LASSERT(!is_bad_inode(dentry->d_inode));
+                inode = dentry->d_inode;
+                atomic_inc(&inode->i_count);
+                dput(dentry);
+                return inode;
+        }
+
+        return ERR_PTR(-ENOENT);
+}
+
 /**
  * Open an OI(Ojbect Index) container.
  *
 /**
  * Open an OI(Ojbect Index) container.
  *
@@ -105,46 +230,50 @@ static struct dt_index_features oi_feat = {
  * \retval      0       success
  * \retval      -ve     failure
  */
  * \retval      0       success
  * \retval      -ve     failure
  */
-static int
-osd_oi_open(struct osd_thread_info *info,
-            struct dt_device *dev, char *name, struct dt_object **objp)
+static int osd_oi_open(struct osd_thread_info *info, struct osd_device *osd,
+                       char *name, struct osd_oi **oi_slot, bool create)
 {
 {
-        const struct lu_env *env = info->oti_env;
-        struct dt_object    *obj;
-        int                  rc;
+        struct osd_directory *dir;
+        struct iam_container *bag;
+        struct inode         *inode;
+        struct osd_oi        *oi;
+        int                   rc;
 
 
-        obj = dt_store_open(env, dev, "", name, &info->oti_fid);
-        if (IS_ERR(obj))
-                return PTR_ERR(obj);
+        ENTRY;
 
 
-        oi_feat.dif_keysize_min = sizeof(info->oti_fid);
-        oi_feat.dif_keysize_max = sizeof(info->oti_fid);
+        oi_feat.dif_keysize_min = sizeof(struct lu_fid);
+        oi_feat.dif_keysize_max = sizeof(struct lu_fid);
 
 
-        rc = obj->do_ops->do_index_try(env, obj, &oi_feat);
-        if (rc != 0) {
-                lu_object_put(info->oti_env, &obj->do_lu);
-                CERROR("%s: wrong index %s: rc = %d\n",
-                       dev->dd_lu_dev.ld_obd->obd_name, name, rc);
-                return rc;
-        }
+        inode = osd_oi_index_open(info, osd, name, &oi_feat, create);
+        if (IS_ERR(inode))
+                RETURN(PTR_ERR(inode));
 
 
-        *objp = obj;
-        return 0;
-}
+        OBD_ALLOC_PTR(oi);
+        if (oi == NULL)
+                GOTO(out_inode, rc = -ENOMEM);
 
 
+        oi->oi_inode = inode;
+        dir = &oi->oi_dir;
 
 
-static void
-osd_oi_table_put(struct osd_thread_info *info,
-                 struct osd_oi *oi_table, unsigned oi_count)
-{
-        int     i;
+        bag = &dir->od_container;
+        rc = iam_container_init(bag, &dir->od_descr, inode);
+        if (rc < 0)
+                GOTO(out_free, rc);
 
 
-        for (i = 0; i < oi_count; i++) {
-                LASSERT(oi_table[i].oi_dir != NULL);
+        rc = iam_container_setup(bag);
+        if (rc < 0)
+                GOTO(out_container, rc);
 
 
-                lu_object_put(info->oti_env, &oi_table[i].oi_dir->do_lu);
-                oi_table[i].oi_dir = NULL;
-        }
+        *oi_slot = oi;
+        RETURN(0);
+
+out_container:
+        iam_container_fini(bag);
+out_free:
+        OBD_FREE_PTR(oi);
+out_inode:
+        iput(inode);
+        return rc;
 }
 
 /**
 }
 
 /**
@@ -158,19 +287,20 @@ osd_oi_table_put(struct osd_thread_info *info,
  * success, or error code in failure.
  *
  * \param     oi_count  Number of expected OI containers
  * success, or error code in failure.
  *
  * \param     oi_count  Number of expected OI containers
- * \param     try_all   Try to open all OIs even see failures
+ * \param     create    Create OIs if doesn't exist
  *
  * \retval    +ve       number of opened OI containers
  * \retval      0       no OI containers found
  * \retval    -ve       failure
  */
 static int
  *
  * \retval    +ve       number of opened OI containers
  * \retval      0       no OI containers found
  * \retval    -ve       failure
  */
 static int
-osd_oi_table_open(struct osd_thread_info *info, struct dt_device *dev,
-                  struct osd_oi *oi_table, unsigned oi_count, int try_all)
+osd_oi_table_open(struct osd_thread_info *info, struct osd_device *osd,
+                  struct osd_oi **oi_table, unsigned oi_count, bool create)
 {
 {
-        int     count = 0;
-        int     rc = 0;
-        int     i;
+        struct dt_device *dev = &osd->od_dt_dev;
+        int               count = 0;
+        int               rc = 0;
+        int               i;
 
         /* NB: oi_count != 0 means that we have already created/known all OIs
          * and have known exact number of OIs. */
 
         /* NB: oi_count != 0 means that we have already created/known all OIs
          * and have known exact number of OIs. */
@@ -180,32 +310,24 @@ osd_oi_table_open(struct osd_thread_info *info, struct dt_device *dev,
                 char name[12];
 
                 sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
                 char name[12];
 
                 sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
-                rc = osd_oi_open(info, dev, name, &oi_table[i].oi_dir);
+                rc = osd_oi_open(info, osd, name, &oi_table[i], create);
                 if (rc == 0) {
                         count++;
                         continue;
                 }
 
                 if (rc == 0) {
                         count++;
                         continue;
                 }
 
-                if (try_all)
-                        continue;
-
                 if (rc == -ENOENT && oi_count == 0)
                         return count;
 
                 CERROR("%s: can't open %s: rc = %d\n",
                        dev->dd_lu_dev.ld_obd->obd_name, name, rc);
                 if (rc == -ENOENT && oi_count == 0)
                         return count;
 
                 CERROR("%s: can't open %s: rc = %d\n",
                        dev->dd_lu_dev.ld_obd->obd_name, name, rc);
-
                 if (oi_count > 0) {
                         CERROR("%s: expect to open total %d OI files.\n",
                                dev->dd_lu_dev.ld_obd->obd_name, oi_count);
                 }
                 if (oi_count > 0) {
                         CERROR("%s: expect to open total %d OI files.\n",
                                dev->dd_lu_dev.ld_obd->obd_name, oi_count);
                 }
-
                 break;
         }
 
                 break;
         }
 
-        if (try_all)
-                return count;
-
         if (rc < 0) {
                 osd_oi_table_put(info, oi_table, count);
                 return rc;
         if (rc < 0) {
                 osd_oi_table_put(info, oi_table, count);
                 return rc;
@@ -214,172 +336,248 @@ osd_oi_table_open(struct osd_thread_info *info, struct dt_device *dev,
         return count;
 }
 
-static int osd_oi_table_create(struct osd_thread_info *info,
-                               struct dt_device *dev,
-                               struct md_device *mdev, int oi_count)
-{
-        const struct lu_env *env;
-        struct md_object *mdo;
-        int i;
-
-        env = info->oti_env;
-        for (i = 0; i < oi_count; ++i) {
-                char name[12];
-
-                sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
-
-                lu_local_obj_fid(&info->oti_fid, OSD_OI_FID_OID_FIRST + i);
-                oi_feat.dif_keysize_min = sizeof(info->oti_fid);
-                oi_feat.dif_keysize_max = sizeof(info->oti_fid);
-
-                mdo = llo_store_create_index(env, mdev, dev, "", name,
-                                             &info->oti_fid, &oi_feat);
-                if (IS_ERR(mdo)) {
-                        CERROR("Failed to create OI[%d] on %s: %d\n",
-                               i, dev->dd_lu_dev.ld_obd->obd_name,
-                               (int)PTR_ERR(mdo));
-                        RETURN(PTR_ERR(mdo));
-                }
-
-                lu_object_put(env, &mdo->mo_lu);
-        }
-        return 0;
-}
-
-int osd_oi_init(struct osd_thread_info *info,
-                struct osd_oi **oi_table,
-                struct dt_device *dev,
-                struct md_device *mdev)
+int osd_oi_init(struct osd_thread_info *info, struct osd_device *osd)
 {
-        struct osd_oi *oi;
-        int rc;
+        struct dt_device *dev = &osd->od_dt_dev;
+        struct osd_oi   **oi;
+        int               rc;
 
         OBD_ALLOC(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
         if (oi == NULL)
                 return -ENOMEM;
 
         cfs_mutex_lock(&oi_init_lock);
-
-        rc = osd_oi_table_open(info, dev, oi, 0, 0);
+        /* try to open existing multiple OIs first */
+        rc = osd_oi_table_open(info, osd, oi, 0, false);
         if (rc != 0)
                 goto out;
 
-        rc = osd_oi_open(info, dev, OSD_OI_NAME_BASE, &oi[0].oi_dir);
+        /* if previous failed then try found single OI from old filesystem */
+        rc = osd_oi_open(info, osd, OSD_OI_NAME_BASE, &oi[0], false);
         if (rc == 0) { /* found single OI from old filesystem */
                 rc = 1;
                 goto out;
-        }
-
-        if (rc != -ENOENT) {
+        } else if (rc != -ENOENT) {
                 CERROR("%s: can't open %s: rc = %d\n",
                        dev->dd_lu_dev.ld_obd->obd_name, OSD_OI_NAME_BASE, rc);
                 goto out;
         }
 
-        /* create OI objects */
-        rc = osd_oi_table_create(info, dev, mdev, osd_oi_count);
-        if (rc != 0)
-                goto out;
-
-        rc = osd_oi_table_open(info, dev, oi, osd_oi_count, 0);
-        LASSERT(rc == osd_oi_count || rc < 0);
-
- out:
+        /* No OIs exist, new filesystem, create OI objects */
+        rc = osd_oi_table_open(info, osd, oi, osd_oi_count, true);
+        LASSERT(ergo(rc >= 0, rc == osd_oi_count));
+out:
         if (rc < 0) {
                 OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
         } else {
                 LASSERT((rc & (rc - 1)) == 0);
-                *oi_table = oi;
+                osd->od_oi_table = oi;
+                osd->od_oi_count = rc;
+                rc = 0;
         }
 
         cfs_mutex_unlock(&oi_init_lock);
         return rc;
 }
 
-void osd_oi_fini(struct osd_thread_info *info,
-                 struct osd_oi **oi_table, unsigned oi_count)
+void osd_oi_fini(struct osd_thread_info *info, struct osd_device *osd)
 {
-        struct osd_oi *oi = *oi_table;
+        osd_oi_table_put(info, osd->od_oi_table, osd->od_oi_count);
 
-        osd_oi_table_put(info, oi, oi_count);
+        OBD_FREE(osd->od_oi_table,
+                 sizeof(*(osd->od_oi_table)) * OSD_OI_FID_NR_MAX);
+        osd->od_oi_table = NULL;
+}
 
-        OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
-        *oi_table = NULL;
+static inline int fid_is_fs_root(const struct lu_fid *fid)
+{
+        /* Map root inode to special local object FID */
+        return (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
+                         fid_oid(fid) == OSD_FS_ROOT_OID));
 }
 
-int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
+static int osd_oi_iam_lookup(struct osd_thread_info *oti,
+                             struct osd_oi *oi, struct dt_rec *rec,
+                             const struct dt_key *key)
+{
+        struct iam_container  *bag;
+        struct iam_iterator   *it = &oti->oti_idx_it;
+        struct iam_rec        *iam_rec;
+        struct iam_path_descr *ipd;
+        int                    rc;
+        ENTRY;
+
+        LASSERT(oi);
+        LASSERT(oi->oi_inode);
+
+        bag = &oi->oi_dir.od_container;
+        ipd = osd_idx_ipd_get(oti->oti_env, bag);
+        if (IS_ERR(ipd))
+                RETURN(-ENOMEM);
+
+        /* got ipd now we can start iterator. */
+        iam_it_init(it, bag, 0, ipd);
+
+        rc = iam_it_get(it, (struct iam_key *)key);
+        if (rc >= 0) {
+                if (S_ISDIR(oi->oi_inode->i_mode))
+                        iam_rec = (struct iam_rec *)oti->oti_ldp;
+                else
+                        iam_rec = (struct iam_rec *)rec;
+
+                iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
+                if (S_ISDIR(oi->oi_inode->i_mode))
+                        osd_fid_unpack((struct lu_fid *)rec,
+                                       (struct osd_fid_pack *)iam_rec);
+        }
+        iam_it_put(it);
+        iam_it_fini(it);
+        osd_ipd_put(oti->oti_env, bag, ipd);
+
+        LINVRNT(osd_invariant(obj));
+
+        RETURN(rc);
+}
+
+int osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
                   const struct lu_fid *fid, struct osd_inode_id *id)
 {
-        struct lu_fid *oi_fid = &info->oti_fid;
-        int rc;
+        struct lu_fid       *oi_fid = &info->oti_fid;
+        const struct dt_key *key;
+        int                  rc     = 0;
 
         if (osd_fid_is_igif(fid)) {
                 lu_igif_to_id(fid, id);
                 rc = 0;
         } else {
-                struct dt_object    *idx;
-                const struct dt_key *key;
-
                 if (!fid_is_norm(fid))
                         return -ENOENT;
 
-                idx = oi->oi_dir;
                 fid_cpu_to_be(oi_fid, fid);
                 key = (struct dt_key *) oi_fid;
-                rc = idx->do_index_ops->dio_lookup(info->oti_env, idx,
-                                                   (struct dt_rec *)id, key,
-                                                   BYPASS_CAPA);
+
+                rc = osd_oi_iam_lookup(info, osd_fid2oi(osd, fid),
+                                       (struct dt_rec *)id, key);
+
                 if (rc > 0) {
                         id->oii_ino = be32_to_cpu(id->oii_ino);
                         id->oii_gen = be32_to_cpu(id->oii_gen);
                         rc = 0;
-                } else if (rc == 0)
+                } else if (rc == 0) {
                         rc = -ENOENT;
+                }
         }
         return rc;
 }
 
-int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi,
+static int osd_oi_iam_insert(struct osd_thread_info *oti, struct osd_oi *oi,
+                             const struct dt_rec *rec, const struct dt_key *key,
+                             struct thandle *th, int ignore_quota)
+{
+        struct iam_container  *bag;
+        struct iam_rec        *iam_rec = (struct iam_rec *)oti->oti_ldp;
+        struct iam_path_descr *ipd;
+        struct osd_thandle    *oh;
+        int                    rc;
+#ifdef HAVE_QUOTA_SUPPORT
+        cfs_cap_t              save    = cfs_curproc_cap_pack();
+#endif
+        ENTRY;
+
+        LASSERT(oi);
+        LASSERT(oi->oi_inode);
+
+        bag = &oi->oi_dir.od_container;
+        ipd = osd_idx_ipd_get(oti->oti_env, bag);
+        if (unlikely(ipd == NULL))
+                RETURN(-ENOMEM);
+
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle != NULL);
+        LASSERT(oh->ot_handle->h_transaction != NULL);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (ignore_quota)
+                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+        else
+                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+#endif
+        if (S_ISDIR(oi->oi_inode->i_mode))
+                osd_fid_pack((struct osd_fid_pack *)iam_rec, rec,
+                             &oti->oti_fid);
+        else
+                iam_rec = (struct iam_rec *) rec;
+        rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
+                        iam_rec, ipd);
+#ifdef HAVE_QUOTA_SUPPORT
+        cfs_curproc_cap_unpack(save);
+#endif
+        osd_ipd_put(oti->oti_env, bag, ipd);
+        LINVRNT(osd_invariant(obj));
+        RETURN(rc);
+}
+
+int osd_oi_insert(struct osd_thread_info *info, struct osd_device *osd,
                   const struct lu_fid *fid, const struct osd_inode_id *id0,
                   struct thandle *th, int ignore_quota)
 {
-        struct lu_fid *oi_fid = &info->oti_fid;
-        struct dt_object    *idx;
+        struct lu_fid       *oi_fid = &info->oti_fid;
         struct osd_inode_id *id;
         const struct dt_key *key;
 
         if (!fid_is_norm(fid))
                 return 0;
 
-        idx = oi->oi_dir;
         fid_cpu_to_be(oi_fid, fid);
-        key = (struct dt_key *) oi_fid;
+        key = (struct dt_key *)oi_fid;
 
         id  = &info->oti_id;
         id->oii_ino = cpu_to_be32(id0->oii_ino);
         id->oii_gen = cpu_to_be32(id0->oii_gen);
-        return idx->do_index_ops->dio_insert(info->oti_env, idx,
-                                             (struct dt_rec *)id,
-                                             key, th, BYPASS_CAPA,
-                                             ignore_quota);
+
+        return osd_oi_iam_insert(info, osd_fid2oi(osd, fid),
+                                 (struct dt_rec *)id, key, th, ignore_quota);
+}
+
+static int osd_oi_iam_delete(struct osd_thread_info *oti, struct osd_oi *oi,
+                             const struct dt_key *key, struct thandle *handle)
+{
+        struct iam_container  *bag;
+        struct iam_path_descr *ipd;
+        struct osd_thandle    *oh;
+        int                    rc;
+        ENTRY;
+
+        LASSERT(oi);
+
+        bag = &oi->oi_dir.od_container;
+        ipd = osd_idx_ipd_get(oti->oti_env, bag);
+        if (unlikely(ipd == NULL))
+                RETURN(-ENOMEM);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle != NULL);
+        LASSERT(oh->ot_handle->h_transaction != NULL);
+
+        rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
+        osd_ipd_put(oti->oti_env, bag, ipd);
+        LINVRNT(osd_invariant(obj));
+        RETURN(rc);
 }
 
 int osd_oi_delete(struct osd_thread_info *info,
-                  struct osd_oi *oi, const struct lu_fid *fid,
+                  struct osd_device *osd, const struct lu_fid *fid,
                   struct thandle *th)
 {
-        struct lu_fid *oi_fid = &info->oti_fid;
-        struct dt_object    *idx;
+        struct lu_fid       *oi_fid = &info->oti_fid;
         const struct dt_key *key;
 
         if (!fid_is_norm(fid))
                 return 0;
 
-        idx = oi->oi_dir;
         fid_cpu_to_be(oi_fid, fid);
-        key = (struct dt_key *) oi_fid;
-        return idx->do_index_ops->dio_delete(info->oti_env, idx,
-                                             key, th, BYPASS_CAPA);
+        key = (struct dt_key *)oi_fid;
+
+        return osd_oi_iam_delete(info, osd_fid2oi(osd, fid), key, th);
 }
 
 int osd_oi_mod_init()
index 94efae1..ce68721 100644 (file)
@@ -65,16 +65,8 @@ struct lu_site;
 struct thandle;
 
 struct dt_device;
-
-/*
- * Object Index (oi) instance.
- */
-struct osd_oi {
-        /*
-         * underlying index object, where fid->id mapping in stored.
-         */
-        struct dt_object *oi_dir;
-};
+struct osd_device;
+struct osd_oi;
 
 /*
  * Storage cookie. Datum uniquely identifying inode on the underlying file
@@ -89,20 +81,15 @@ struct osd_inode_id {
 };
 
 int osd_oi_mod_init(void);
-int osd_oi_init(struct osd_thread_info *info,
-                struct osd_oi **oi_table,
-                struct dt_device *dev,
-                struct md_device *mdev);
-void osd_oi_fini(struct osd_thread_info *info,
-                 struct osd_oi **oi_table, unsigned oi_count);
-
-int  osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
+int osd_oi_init(struct osd_thread_info *info, struct osd_device *osd);
+void osd_oi_fini(struct osd_thread_info *info, struct osd_device *osd);
+int  osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
                    const struct lu_fid *fid, struct osd_inode_id *id);
-int  osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi,
+int  osd_oi_insert(struct osd_thread_info *info, struct osd_device *osd,
                    const struct lu_fid *fid, const struct osd_inode_id *id,
                    struct thandle *th, int ingore_quota);
 int  osd_oi_delete(struct osd_thread_info *info,
-                   struct osd_oi *oi, const struct lu_fid *fid,
+                   struct osd_device *osd, const struct lu_fid *fid,
                    struct thandle *th);
 
 #endif /* __KERNEL__ */