Whamcloud - gitweb
LU-822 osd: multiple Object Index files
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_oi.c
index c63b7f5..bcad98e 100644 (file)
 #include "osd_igif.h"
 #include "dt_object.h"
 
-struct oi_descr {
-        int   fid_size;
-        char *name;
-        __u32 oid;
-};
+#define OSD_OI_FID_NR         (1UL << OSD_OI_FID_OID_BITS)
+#define OSD_OI_FID_NR_MAX     (1UL << OSD_OI_FID_OID_BITS_MAX)
+
+static unsigned int osd_oi_num = OSD_OI_FID_NR;
+CFS_MODULE_PARM(osd_oi_num, "i", int, 0444,
+                "Number of Object Index containers to be created, "
+                "it's only valid for new filesystem.");
 
 /** to serialize concurrent OI index initialization */
 static cfs_mutex_t oi_init_lock;
@@ -90,39 +92,152 @@ static struct dt_index_features oi_feat = {
         .dif_ptrsize     = 4
 };
 
-static const struct oi_descr oi_descr[OSD_OI_FID_NR] = {
-        [OSD_OI_FID_16] = {
-                .fid_size = sizeof(struct lu_fid),
-                .name     = "oi.16",
-                .oid      = OSD_OI_FID_16_OID
+#define OSD_OI_NAME_BASE        "oi.16"
+
+/**
+ * Open an OI(Ojbect Index) container.
+ *
+ * \param       name    Name of OI container
+ * \param       objp    Pointer of returned OI
+ *
+ * \retval      0       success
+ * \retval      -ve     failure
+ */
+static int
+osd_oi_open(struct osd_thread_info *info,
+            struct dt_device *dev, char *name, struct dt_object **objp)
+{
+        const struct lu_env *env = info->oti_env;
+        struct dt_object    *obj;
+        int                  rc;
+
+        obj = dt_store_open(env, dev, "", name, &info->oti_fid);
+        if (IS_ERR(obj))
+                return PTR_ERR(obj);
+
+        oi_feat.dif_keysize_min = sizeof(info->oti_fid);
+        oi_feat.dif_keysize_max = sizeof(info->oti_fid);
+
+        rc = obj->do_ops->do_index_try(env, obj, &oi_feat);
+        if (rc != 0) {
+                lu_object_put(info->oti_env, &obj->do_lu);
+                CERROR("%s: wrong index %s: rc = %d\n",
+                       dev->dd_lu_dev.ld_obd->obd_name, name, rc);
+                return rc;
         }
-};
 
-static int osd_oi_index_create(struct osd_thread_info *info,
+        *objp = obj;
+        return 0;
+}
+
+
+static void
+osd_oi_table_put(struct osd_thread_info *info,
+                 struct osd_oi *oi_table, unsigned oi_count)
+{
+        int     i;
+
+        for (i = 0; i < oi_count; i++) {
+                LASSERT(oi_table[i].oi_dir != NULL);
+
+                lu_object_put(info->oti_env, &oi_table[i].oi_dir->do_lu);
+                oi_table[i].oi_dir = NULL;
+        }
+}
+
+/**
+ * Open OI(Object Index) table.
+ * If \a oi_count is zero, which means caller doesn't know how many OIs there
+ * will be, this function can either return 0 for new filesystem, or number
+ * of OIs on existed filesystem.
+ *
+ * If \a oi_count is non-zero, which means caller does know number of OIs on
+ * filesystem, this function should return the exactly same number on
+ * success, or error code in failure.
+ *
+ * \param     oi_count  Number of expected OI containers
+ * \param     try_all   Try to open all OIs even see failures
+ *
+ * \retval    +ve       number of opened OI containers
+ * \retval      0       no OI containers found
+ * \retval    -ve       failure
+ */
+static int
+osd_oi_table_open(struct osd_thread_info *info, struct dt_device *dev,
+                  struct osd_oi *oi_table, unsigned oi_count, int try_all)
+{
+        int     count = 0;
+        int     rc = 0;
+        int     i;
+
+        /* NB: oi_count != 0 means that we have already created/known all OIs
+         * and have known exact number of OIs. */
+        LASSERT(oi_count <= OSD_OI_FID_NR_MAX);
+
+        for (i = 0; i < (oi_count != 0 ? oi_count : OSD_OI_FID_NR_MAX); i++) {
+                char name[12];
+
+                sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
+                rc = osd_oi_open(info, dev, name, &oi_table[i].oi_dir);
+                if (rc == 0) {
+                        count++;
+                        continue;
+                }
+
+                if (try_all)
+                        continue;
+
+                if (rc == -ENOENT && oi_count == 0)
+                        return count;
+
+                CERROR("%s: can't open %s: rc = %d\n",
+                       dev->dd_lu_dev.ld_obd->obd_name, name, rc);
+
+                if (oi_count > 0) {
+                        CERROR("%s: expect to open total %d OI files.\n",
+                               dev->dd_lu_dev.ld_obd->obd_name, oi_count);
+                }
+
+                break;
+        }
+
+        if (try_all)
+                return count;
+
+        if (rc < 0) {
+                osd_oi_table_put(info, oi_table, count);
+                return rc;
+        }
+
+        return count;
+}
+
+static int osd_oi_table_create(struct osd_thread_info *info,
                                struct dt_device *dev,
-                               struct md_device *mdev)
+                               struct md_device *mdev, int oi_count)
 {
         const struct lu_env *env;
-        struct lu_fid *oi_fid = &info->oti_fid;
         struct md_object *mdo;
         int i;
-        int rc;
 
         env = info->oti_env;
+        for (i = 0; i < oi_count; ++i) {
+                char name[12];
 
-        for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) {
-                char *name;
-                name = oi_descr[i].name;
-                lu_local_obj_fid(oi_fid, oi_descr[i].oid);
-                oi_feat.dif_keysize_min = oi_descr[i].fid_size,
-                oi_feat.dif_keysize_max = oi_descr[i].fid_size,
+                sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
 
-                mdo = llo_store_create_index(env, mdev, dev,
-                                             "", name,
-                                             oi_fid, &oi_feat);
+                lu_local_obj_fid(&info->oti_fid, OSD_OI_FID_OID_FIRST + i);
+                oi_feat.dif_keysize_min = sizeof(info->oti_fid);
+                oi_feat.dif_keysize_max = sizeof(info->oti_fid);
 
-                if (IS_ERR(mdo))
+                mdo = llo_store_create_index(env, mdev, dev, "", name,
+                                             &info->oti_fid, &oi_feat);
+                if (IS_ERR(mdo)) {
+                        CERROR("Failed to create OI[%d] on %s: %d\n",
+                               i, dev->dd_lu_dev.ld_obd->obd_name,
+                               (int)PTR_ERR(mdo));
                         RETURN(PTR_ERR(mdo));
+                }
 
                 lu_object_put(env, &mdo->mo_lu);
         }
@@ -130,68 +245,62 @@ static int osd_oi_index_create(struct osd_thread_info *info,
 }
 
 int osd_oi_init(struct osd_thread_info *info,
-                struct osd_oi *oi,
+                struct osd_oi **oi_table,
                 struct dt_device *dev,
                 struct md_device *mdev)
 {
-        const struct lu_env *env;
+        struct osd_oi *oi;
         int rc;
-        int i;
 
-        env = info->oti_env;
+        OBD_ALLOC(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
+        if (oi == NULL)
+                return -ENOMEM;
+
         cfs_mutex_lock(&oi_init_lock);
-        memset(oi, 0, sizeof *oi);
-retry:
-        for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) {
-                const char       *name;
-                struct dt_object *obj;
-
-                name = oi_descr[i].name;
-                oi_feat.dif_keysize_min = oi_descr[i].fid_size,
-                oi_feat.dif_keysize_max = oi_descr[i].fid_size,
-
-                obj = dt_store_open(env, dev, "", name, &info->oti_fid);
-                if (!IS_ERR(obj)) {
-                        rc = obj->do_ops->do_index_try(env, obj, &oi_feat);
-                        if (rc == 0) {
-                                LASSERT(obj->do_index_ops != NULL);
-                                oi->oi_dir = obj;
-                        } else {
-                                CERROR("Wrong index \"%s\": %d\n", name, rc);
-                                lu_object_put(env, &obj->do_lu);
-                        }
-                } else {
-                        rc = PTR_ERR(obj);
-                        if (rc == -ENOENT) {
-                                rc = osd_oi_index_create(info, dev, mdev);
-                                if (!rc)
-                                        goto retry;
-                        }
-                        CERROR("Cannot open \"%s\": %d\n", name, rc);
-                }
+
+        rc = osd_oi_table_open(info, dev, oi, 0, 0);
+        if (rc != 0)
+                goto out;
+
+        rc = osd_oi_open(info, dev, OSD_OI_NAME_BASE, &oi[0].oi_dir);
+        if (rc == 0) { /* found single OI from old filesystem */
+                rc = 1;
+                goto out;
+        }
+
+        if (rc != -ENOENT) {
+                CERROR("%s: can't open %s: rc = %d\n",
+                       dev->dd_lu_dev.ld_obd->obd_name, OSD_OI_NAME_BASE, rc);
+                goto out;
         }
+
+        /* create OI objects */
+        rc = osd_oi_table_create(info, dev, mdev, osd_oi_num);
         if (rc != 0)
-                osd_oi_fini(info, oi);
+                goto out;
+
+        rc = osd_oi_table_open(info, dev, oi, osd_oi_num, 0);
+        LASSERT(rc == osd_oi_num || rc < 0);
+
+ out:
+        if (rc < 0)
+                OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
+        else
+                *oi_table = oi;
 
         cfs_mutex_unlock(&oi_init_lock);
         return rc;
 }
 
-void osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi)
+void osd_oi_fini(struct osd_thread_info *info,
+                 struct osd_oi **oi_table, unsigned oi_count)
 {
-        if (oi->oi_dir != NULL) {
-                lu_object_put(info->oti_env, &oi->oi_dir->do_lu);
-                oi->oi_dir = NULL;
-        }
-}
+        struct osd_oi *oi = *oi_table;
 
-static inline int fid_is_oi_fid(const struct lu_fid *fid)
-{
-        /* We need to filter-out oi obj's fid. As we can not store it, while
-         * oi-index create operation.
-         */
-        return (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
-                fid_oid(fid) == OSD_OI_FID_16_OID));
+        osd_oi_table_put(info, oi, oi_count);
+
+        OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
+        *oi_table = NULL;
 }
 
 int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
@@ -207,7 +316,7 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
                 struct dt_object    *idx;
                 const struct dt_key *key;
 
-                if (fid_is_oi_fid(fid))
+                if (!fid_is_norm(fid))
                         return -ENOENT;
 
                 idx = oi->oi_dir;
@@ -235,10 +344,7 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi,
         struct osd_inode_id *id;
         const struct dt_key *key;
 
-        if (osd_fid_is_igif(fid))
-                return 0;
-
-        if (fid_is_oi_fid(fid))
+        if (!fid_is_norm(fid))
                 return 0;
 
         idx = oi->oi_dir;
@@ -262,7 +368,7 @@ int osd_oi_delete(struct osd_thread_info *info,
         struct dt_object    *idx;
         const struct dt_key *key;
 
-        if (osd_fid_is_igif(fid))
+        if (!fid_is_norm(fid))
                 return 0;
 
         idx = oi->oi_dir;
@@ -274,6 +380,9 @@ int osd_oi_delete(struct osd_thread_info *info,
 
 int osd_oi_mod_init()
 {
+        if (osd_oi_num == 0 || osd_oi_num > OSD_OI_FID_NR_MAX)
+                osd_oi_num = OSD_OI_FID_NR;
+
         cfs_mutex_init(&oi_init_lock);
         return 0;
 }