Whamcloud - gitweb
LU-1187 osd: allocate osd_compat_objid_seq dynamically
authorwangdi <di.wang@whamcloud.com>
Tue, 24 Sep 2013 09:16:38 +0000 (02:16 -0700)
committerOleg Drokin <green@whamcloud.com>
Thu, 10 Jan 2013 15:26:18 +0000 (10:26 -0500)
1. Allocate osd_compat_objid_seq dynamically, so osd can load
as many as seq it wants.
2. Change group to seq.
3. remove the compat from name, because we will still use
the same disk-layout for FID on OST.

Change-Id: I52d40ecd805aa44074838496e44ed2b71fd39f92
Signed-off-by: Wang Di <di.wang@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/4323
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Hudson
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
lustre/include/lustre_disk.h
lustre/include/lustre_fid.h
lustre/osd-ldiskfs/osd_compat.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_oi.c

index 8902c3c..7869803 100644 (file)
@@ -60,6 +60,7 @@
 #define MOUNT_DATA_FILE    MOUNT_CONFIGS_DIR"/"CONFIGS_FILE
 #define LAST_RCVD         "last_rcvd"
 #define LOV_OBJID         "lov_objid"
+#define LOV_OBJSEQ             "lov_objseq"
 #define HEALTH_CHECK      "health_check"
 #define CAPA_KEYS         "capa_keys"
 #define CHANGELOG_USERS   "changelog_users"
index 0ab7b08..78207ca 100644 (file)
@@ -224,6 +224,7 @@ enum local_oid {
         LLOG_CATALOGS_OID       = 4118UL,
         MGS_CONFIGS_OID         = 4119UL,
         OFD_HEALTH_CHECK_OID    = 4120UL,
+       MDD_LOV_OBJ_OSEQ        = 4121UL,
 };
 
 static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
index 27a1f2c..2a79ae1 100644 (file)
@@ -35,7 +35,7 @@
  *
  * lustre/osd/osd_compat.c
  *
- * on-disk compatibility stuff for OST
+ * on-disk structure for managing /O
  *
  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
  */
 #include "osd_internal.h"
 #include "osd_oi.h"
 
-struct osd_compat_objid_seq {
-        /* protects on-fly initialization */
-       struct semaphore        dir_init_sem;
-        /* file storing last created objid */
-        struct osd_inode_id    last_id;
-        struct dentry         *groot; /* O/<seq> */
-        struct dentry        **dirs;  /* O/<seq>/d0-dXX */
-};
-
-#define MAX_OBJID_GROUP (FID_SEQ_ECHO + 1)
-
-struct osd_compat_objid {
-        int                          subdir_count;
-        struct dentry               *root;
-        struct osd_inode_id          last_rcvd_id;
-        struct osd_inode_id          last_seq_id;
-        struct osd_compat_objid_seq  groups[MAX_OBJID_GROUP];
-};
-
 static void osd_push_ctxt(const struct osd_device *dev,
                           struct lvfs_run_ctxt *newctxt,
                           struct lvfs_run_ctxt *save)
@@ -88,89 +69,11 @@ static void osd_push_ctxt(const struct osd_device *dev,
         push_ctxt(save, newctxt, NULL);
 }
 
-void osd_compat_seq_fini(struct osd_device *osd, int seq)
+static void osd_pop_ctxt(const struct osd_device *dev,
+                        struct lvfs_run_ctxt *new,
+                        struct lvfs_run_ctxt *save)
 {
-        struct osd_compat_objid_seq *grp;
-        struct osd_compat_objid     *map = osd->od_ost_map;
-        int                          i;
-
-        ENTRY;
-
-        grp = &map->groups[seq];
-        if (grp->groot ==NULL)
-                RETURN_EXIT;
-        LASSERT(grp->dirs);
-
-        for (i = 0; i < map->subdir_count; i++) {
-                if (grp->dirs[i] == NULL)
-                        break;
-                dput(grp->dirs[i]);
-        }
-
-        OBD_FREE(grp->dirs, sizeof(struct dentry *) * map->subdir_count);
-        dput(grp->groot);
-        EXIT;
-}
-
-int osd_compat_seq_init(struct osd_device *osd, int seq)
-{
-        struct osd_compat_objid_seq *grp;
-        struct osd_compat_objid     *map;
-        struct dentry               *d;
-        int                          rc = 0;
-        char                         name[32];
-        int                          i;
-        ENTRY;
-
-        map = osd->od_ost_map;
-        LASSERT(map);
-        LASSERT(map->root);
-        grp = &map->groups[seq];
-
-        if (grp->groot != NULL)
-                RETURN(0);
-
-       down(&grp->dir_init_sem);
-
-        sprintf(name, "%d", seq);
-        d = simple_mkdir(map->root, osd->od_mnt, name, 0755, 1);
-        if (IS_ERR(d)) {
-                rc = PTR_ERR(d);
-                GOTO(out, rc);
-        } else if (d->d_inode == NULL) {
-                rc = -EFAULT;
-                dput(d);
-                GOTO(out, rc);
-        }
-
-        LASSERT(grp->dirs == NULL);
-        OBD_ALLOC(grp->dirs, sizeof(d) * map->subdir_count);
-        if (grp->dirs == NULL) {
-                dput(d);
-                GOTO(out, rc = -ENOMEM);
-        }
-
-        grp->groot = d;
-        for (i = 0; i < map->subdir_count; i++) {
-                sprintf(name, "d%d", i);
-                d = simple_mkdir(grp->groot, osd->od_mnt, name, 0755, 1);
-                if (IS_ERR(d)) {
-                        rc = PTR_ERR(d);
-                        break;
-                } else if (d->d_inode == NULL) {
-                        rc = -EFAULT;
-                        dput(d);
-                        break;
-                }
-
-                grp->dirs[i] = d;
-        }
-
-        if (rc)
-                osd_compat_seq_fini(osd, seq);
-out:
-       up(&grp->dir_init_sem);
-        RETURN(rc);
+       pop_ctxt(save, new, NULL);
 }
 
 int osd_last_rcvd_subdir_count(struct osd_device *osd)
@@ -211,25 +114,6 @@ out:
        return count;
 }
 
-void osd_compat_fini(struct osd_device *dev)
-{
-        int i;
-
-        ENTRY;
-
-        if (dev->od_ost_map == NULL)
-                RETURN_EXIT;
-
-        for (i = 0; i < MAX_OBJID_GROUP; i++)
-                osd_compat_seq_fini(dev, i);
-
-        dput(dev->od_ost_map->root);
-        OBD_FREE_PTR(dev->od_ost_map);
-        dev->od_ost_map = NULL;
-
-        EXIT;
-}
-
 /*
  * directory structure on legacy OST:
  *
@@ -240,15 +124,13 @@ void osd_compat_fini(struct osd_device *dev)
  * CONFIGS
  *
  */
-int osd_compat_init(struct osd_device *dev)
+int osd_ost_init(struct osd_device *dev)
 {
        struct lvfs_run_ctxt  new;
        struct lvfs_run_ctxt  save;
        struct dentry        *rootd = osd_sb(dev)->s_root;
        struct dentry        *d;
        int                   rc;
-       int                   i;
-
        ENTRY;
 
        OBD_ALLOC_PTR(dev->od_ost_map);
@@ -262,36 +144,117 @@ int osd_compat_init(struct osd_device *dev)
                RETURN(rc);
        }
 
-        dev->od_ost_map->subdir_count = rc;
+       dev->od_ost_map->om_subdir_count = rc;
         rc = 0;
 
+       CFS_INIT_LIST_HEAD(&dev->od_ost_map->om_seq_list);
+       rwlock_init(&dev->od_ost_map->om_seq_list_lock);
+       sema_init(&dev->od_ost_map->om_dir_init_sem, 1);
+
         LASSERT(dev->od_fsops);
         osd_push_ctxt(dev, &new, &save);
 
         d = simple_mkdir(rootd, dev->od_mnt, "O", 0755, 1);
-        pop_ctxt(&save, &new, NULL);
+       if (IS_ERR(d))
+               GOTO(cleanup, rc = PTR_ERR(d));
+
+       dev->od_ost_map->om_root = d;
+
+cleanup:
+       osd_pop_ctxt(dev, &new, &save);
         if (IS_ERR(d)) {
                 OBD_FREE_PTR(dev->od_ost_map);
                 RETURN(PTR_ERR(d));
         }
 
-        dev->od_ost_map->root = d;
+       RETURN(rc);
+}
+
+static void osd_seq_free(struct osd_obj_map *map,
+                        struct osd_obj_seq *osd_seq)
+{
+       int j;
 
-        /* Initialize all groups */
-        for (i = 0; i < MAX_OBJID_GROUP; i++) {
-               sema_init(&dev->od_ost_map->groups[i].dir_init_sem, 1);
-                rc = osd_compat_seq_init(dev, i);
-                if (rc) {
-                        osd_compat_fini(dev);
-                        break;
+       cfs_list_del_init(&osd_seq->oos_seq_list);
+
+       if (osd_seq->oos_dirs) {
+               for (j = 0; j < osd_seq->oos_subdir_count; j++) {
+                       if (osd_seq->oos_dirs[j])
+                               dput(osd_seq->oos_dirs[j]);
                 }
+               OBD_FREE(osd_seq->oos_dirs,
+                        sizeof(struct dentry *) * osd_seq->oos_subdir_count);
         }
 
+       if (osd_seq->oos_root)
+               dput(osd_seq->oos_root);
+
+       OBD_FREE_PTR(osd_seq);
+}
+
+int osd_obj_map_init(struct osd_device *dev)
+{
+       int rc;
+       ENTRY;
+
+       /* prepare structures for OST */
+       rc = osd_ost_init(dev);
+
+       /* prepare structures for MDS */
+
         RETURN(rc);
 }
 
-int osd_compat_del_entry(struct osd_thread_info *info, struct osd_device *osd,
-                         struct dentry *dird, char *name, struct thandle *th)
+struct osd_obj_seq *osd_seq_find_locked(struct osd_obj_map *map, obd_seq seq)
+{
+       struct osd_obj_seq *osd_seq;
+
+       cfs_list_for_each_entry(osd_seq, &map->om_seq_list, oos_seq_list) {
+               if (osd_seq->oos_seq == seq)
+                       return osd_seq;
+       }
+       return NULL;
+}
+
+struct osd_obj_seq *osd_seq_find(struct osd_obj_map *map, obd_seq seq)
+{
+       struct osd_obj_seq *osd_seq;
+
+       read_lock(&map->om_seq_list_lock);
+       osd_seq = osd_seq_find_locked(map, seq);
+       read_unlock(&map->om_seq_list_lock);
+       return osd_seq;
+}
+
+void osd_obj_map_fini(struct osd_device *dev)
+{
+       struct osd_obj_seq    *osd_seq;
+       struct osd_obj_seq    *tmp;
+       struct osd_obj_map    *map = dev->od_ost_map;
+       ENTRY;
+
+       map = dev->od_ost_map;
+       if (map == NULL)
+               return;
+
+       write_lock(&dev->od_ost_map->om_seq_list_lock);
+       cfs_list_for_each_entry_safe(osd_seq, tmp,
+                                    &dev->od_ost_map->om_seq_list,
+                                    oos_seq_list) {
+               osd_seq_free(map, osd_seq);
+       }
+       write_unlock(&dev->od_ost_map->om_seq_list_lock);
+       if (map->om_root)
+               dput(map->om_root);
+       OBD_FREE_PTR(dev->od_ost_map);
+       dev->od_ost_map = NULL;
+       EXIT;
+}
+
+static int osd_obj_del_entry(struct osd_thread_info *info,
+                            struct osd_device *osd,
+                            struct dentry *dird, char *name,
+                            struct thandle *th)
 {
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
@@ -327,9 +290,11 @@ int osd_compat_del_entry(struct osd_thread_info *info, struct osd_device *osd,
        RETURN(rc);
 }
 
-int osd_compat_add_entry(struct osd_thread_info *info, struct osd_device *osd,
-                         struct dentry *dir, char *name,
-                         const struct osd_inode_id *id, struct thandle *th)
+int osd_obj_add_entry(struct osd_thread_info *info,
+                     struct osd_device *osd,
+                     struct dentry *dir, char *name,
+                     const struct osd_inode_id *id,
+                     struct thandle *th)
 {
         struct osd_thandle *oh;
         struct dentry *child;
@@ -362,48 +327,186 @@ int osd_compat_add_entry(struct osd_thread_info *info, struct osd_device *osd,
        RETURN(rc);
 }
 
-int osd_compat_objid_lookup(struct osd_thread_info *info,
-                            struct osd_device *dev, const struct lu_fid *fid,
-                            struct osd_inode_id *id)
+/**
+ * Use LPU64 for legacy OST sequences, but use LPX64i for new
+ * sequences names, so that the O/{seq}/dN/{oid} more closely
+ * follows the DFID/PFID format. This makes it easier to map from
+ * debug messages to objects in the future, and the legacy space
+ * of FID_SEQ_OST_MDT0 will be unused in the future.
+ **/
+static inline void osd_seq_name(char *seq_name, obd_seq seq)
 {
-        struct osd_compat_objid    *map;
-        struct dentry              *d;
-        struct dentry              *d_seq;
-        struct ost_id              *ostid = &info->oti_ostid;
-        int                         dirn;
-        char                        name[32];
-        struct ldiskfs_dir_entry_2 *de;
-        struct buffer_head         *bh;
-        struct inode               *dir;
-       struct inode               *inode;
+       sprintf(seq_name, (fid_seq_is_rsvd(seq) ||
+                          fid_seq_is_mdt0(seq)) ? LPU64 : LPX64i,
+               fid_seq_is_idif(seq) ? 0 : seq);
+}
+
+static inline void osd_oid_name(char *name, const struct lu_fid *fid, obd_id id)
+{
+       sprintf(name, (fid_seq_is_rsvd(fid_seq(fid)) ||
+               fid_seq_is_mdt0(fid_seq(fid)) ||
+               fid_seq_is_idif(fid_seq(fid))) ? LPU64 : LPX64i, id);
+}
+
+/* external locking is required */
+static int osd_seq_load_locked(struct osd_device *osd,
+                              struct osd_obj_seq *osd_seq)
+{
+       struct osd_obj_map  *map = osd->od_ost_map;
+       struct dentry       *seq_dir;
+       int                 rc = 0;
+       int                 i;
+       char                seq_name[32];
+       ENTRY;
+
+       if (osd_seq->oos_root != NULL)
+               RETURN(0);
+
+       LASSERT(map);
+       LASSERT(map->om_root);
+
+       osd_seq_name(seq_name, osd_seq->oos_seq);
+
+       seq_dir = simple_mkdir(map->om_root, osd->od_mnt, seq_name, 0755, 1);
+       if (IS_ERR(seq_dir))
+               GOTO(out_err, rc = PTR_ERR(seq_dir));
+       else if (seq_dir->d_inode == NULL)
+               GOTO(out_put, rc = -EFAULT);
+
+       osd_seq->oos_root = seq_dir;
+
+       LASSERT(osd_seq->oos_dirs == NULL);
+       OBD_ALLOC(osd_seq->oos_dirs,
+                 sizeof(seq_dir) * osd_seq->oos_subdir_count);
+       if (osd_seq->oos_dirs == NULL)
+               GOTO(out_put, rc = -ENOMEM);
+
+       for (i = 0; i < osd_seq->oos_subdir_count; i++) {
+               struct dentry   *dir;
+               char         name[32];
+
+               sprintf(name, "d%u", i);
+               dir = simple_mkdir(osd_seq->oos_root, osd->od_mnt, name,
+                                  0700, 1);
+               if (IS_ERR(dir)) {
+                       rc = PTR_ERR(dir);
+               } else if (dir->d_inode) {
+                       osd_seq->oos_dirs[i] = dir;
+                       rc = 0;
+               } else {
+                       LBUG();
+               }
+       }
+
+       if (rc != 0)
+               osd_seq_free(map, osd_seq);
+out_put:
+       if (rc != 0) {
+               dput(seq_dir);
+               osd_seq->oos_root = NULL;
+       }
+out_err:
+       RETURN(rc);
+}
+
+static struct osd_obj_seq *osd_seq_load(struct osd_device *osd, obd_seq seq)
+{
+       struct osd_obj_map      *map;
+       struct osd_obj_seq      *osd_seq;
+       int                     rc = 0;
+       ENTRY;
+
+       map = osd->od_ost_map;
+       LASSERT(map);
+       LASSERT(map->om_root);
+
+       osd_seq = osd_seq_find(map, seq);
+       if (likely(osd_seq != NULL))
+               RETURN(osd_seq);
+
+       /* Serializing init process */
+       down(&map->om_dir_init_sem);
+
+       /* Check whether the seq has been added */
+       read_lock(&map->om_seq_list_lock);
+       osd_seq = osd_seq_find_locked(map, seq);
+       if (osd_seq != NULL) {
+               read_unlock(&map->om_seq_list_lock);
+               GOTO(cleanup, rc = 0);
+       }
+       read_unlock(&map->om_seq_list_lock);
+
+       OBD_ALLOC_PTR(osd_seq);
+       if (osd_seq == NULL)
+               GOTO(cleanup, rc = -ENOMEM);
+
+       CFS_INIT_LIST_HEAD(&osd_seq->oos_seq_list);
+       osd_seq->oos_seq = seq;
+       /* Init subdir count to be 32, but each seq can have
+        * different subdir count */
+       osd_seq->oos_subdir_count = map->om_subdir_count;
+       rc = osd_seq_load_locked(osd, osd_seq);
+       if (rc != 0)
+               GOTO(cleanup, rc);
+
+       write_lock(&map->om_seq_list_lock);
+       cfs_list_add(&osd_seq->oos_seq_list, &map->om_seq_list);
+       write_unlock(&map->om_seq_list_lock);
+
+cleanup:
+       up(&map->om_dir_init_sem);
+       if (rc != 0) {
+               if (osd_seq != NULL)
+                       OBD_FREE_PTR(osd_seq);
+               RETURN(ERR_PTR(rc));
+       }
+
+       RETURN(osd_seq);
+}
+
+int osd_obj_map_lookup(struct osd_thread_info *info, struct osd_device *dev,
+                      const struct lu_fid *fid, struct osd_inode_id *id)
+{
+       struct osd_obj_map              *map;
+       struct osd_obj_seq              *osd_seq;
+       struct dentry                   *d_seq;
+       struct dentry                   *child;
+       struct ost_id                   *ostid = &info->oti_ostid;
+       int                             dirn;
+       char                            name[32];
+       struct ldiskfs_dir_entry_2      *de;
+       struct buffer_head              *bh;
+       struct inode                    *dir;
+       struct inode                    *inode;
         ENTRY;
 
         /* on the very first lookup we find and open directories */
 
         map = dev->od_ost_map;
         LASSERT(map);
-        LASSERT(map->root);
+       LASSERT(map->om_root);
 
         fid_ostid_pack(fid, ostid);
-        LASSERT(ostid->oi_seq < MAX_OBJID_GROUP);
-        LASSERT(map->subdir_count > 0);
-        LASSERT(map->groups[ostid->oi_seq].groot);
+       osd_seq = osd_seq_load(dev, ostid->oi_seq);
+       if (IS_ERR(osd_seq))
+               RETURN(PTR_ERR(osd_seq));
 
-        dirn = ostid->oi_id & (map->subdir_count - 1);
-        d = map->groups[ostid->oi_seq].dirs[dirn];
-        LASSERT(d);
+       dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+       d_seq = osd_seq->oos_dirs[dirn];
+       LASSERT(d_seq);
+
+       osd_oid_name(name, fid, ostid->oi_id);
 
-        sprintf(name, "%llu", ostid->oi_id);
-        d_seq = &info->oti_child_dentry;
-        d_seq->d_parent = d;
-        d_seq->d_name.hash = 0;
-        d_seq->d_name.name = name;
-        /* XXX: we can use rc from sprintf() instead of strlen() */
-        d_seq->d_name.len = strlen(name);
+       child = &info->oti_child_dentry;
+       child->d_parent = d_seq;
+       child->d_name.hash = 0;
+       child->d_name.name = name;
+       /* XXX: we can use rc from sprintf() instead of strlen() */
+       child->d_name.len = strlen(name);
 
-       dir = d->d_inode;
+       dir = d_seq->d_inode;
        mutex_lock(&dir->i_mutex);
-       bh = osd_ldiskfs_find_entry(dir, d_seq, &de, NULL);
+       bh = osd_ldiskfs_find_entry(dir, child, &de, NULL);
        mutex_unlock(&dir->i_mutex);
 
        if (bh == NULL)
@@ -420,63 +523,68 @@ int osd_compat_objid_lookup(struct osd_thread_info *info,
        RETURN(0);
 }
 
-int osd_compat_objid_insert(struct osd_thread_info *info,
-                            struct osd_device *osd,
-                            const struct lu_fid *fid,
-                            const struct osd_inode_id *id,
-                            struct thandle *th)
+int osd_obj_map_insert(struct osd_thread_info *info,
+                      struct osd_device *osd,
+                      const struct lu_fid *fid,
+                      const struct osd_inode_id *id,
+                      struct thandle *th)
 {
-        struct osd_compat_objid *map;
-        struct dentry           *d;
-        struct ost_id           *ostid = &info->oti_ostid;
-        int                      dirn, rc = 0;
-        char                     name[32];
+       struct osd_obj_map      *map;
+       struct osd_obj_seq      *osd_seq;
+       struct dentry           *d;
+       struct ost_id           *ostid = &info->oti_ostid;
+       int                     dirn, rc = 0;
+       char                    name[32];
         ENTRY;
 
         map = osd->od_ost_map;
         LASSERT(map);
-        LASSERT(map->root);
-        LASSERT(map->subdir_count > 0);
-        LASSERT(map->groups[ostid->oi_seq].groot);
 
-        /* map fid to group:objid */
+       /* map fid to seq:objid */
         fid_ostid_pack(fid, ostid);
-        dirn = ostid->oi_id & (map->subdir_count - 1);
-        d = map->groups[ostid->oi_seq].dirs[dirn];
+
+       osd_seq = osd_seq_load(osd, ostid->oi_seq);
+       if (IS_ERR(osd_seq))
+               RETURN(PTR_ERR(osd_seq));
+
+       dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+       d = osd_seq->oos_dirs[dirn];
         LASSERT(d);
 
-        sprintf(name, "%llu", ostid->oi_id);
-        rc = osd_compat_add_entry(info, osd, d, name, id, th);
+       osd_oid_name(name, fid, ostid->oi_id);
+       rc = osd_obj_add_entry(info, osd, d, name, id, th);
 
         RETURN(rc);
 }
 
-int osd_compat_objid_delete(struct osd_thread_info *info,
-                            struct osd_device *osd,
-                            const struct lu_fid *fid, struct thandle *th)
+int osd_obj_map_delete(struct osd_thread_info *info, struct osd_device *osd,
+                      const struct lu_fid *fid, struct thandle *th)
 {
-        struct osd_compat_objid *map;
-        struct dentry           *d;
-        struct ost_id           *ostid = &info->oti_ostid;
-        int                      dirn, rc = 0;
-        char                     name[32];
+       struct osd_obj_map      *map;
+       struct osd_obj_seq      *osd_seq;
+       struct dentry           *d;
+       struct ost_id           *ostid = &info->oti_ostid;
+       int                     dirn, rc = 0;
+       char                    name[32];
         ENTRY;
 
         map = osd->od_ost_map;
         LASSERT(map);
-        LASSERT(map->root);
-        LASSERT(map->subdir_count > 0);
-        LASSERT(map->groups[ostid->oi_seq].groot);
 
-        /* map fid to group:objid */
+       /* map fid to seq:objid */
         fid_ostid_pack(fid, ostid);
-        dirn = ostid->oi_id & (map->subdir_count - 1);
-        d = map->groups[ostid->oi_seq].dirs[dirn];
-        LASSERT(d);
 
-        sprintf(name, "%llu", ostid->oi_id);
-        rc = osd_compat_del_entry(info, osd, d, name, th);
+       osd_seq = osd_seq_load(osd, ostid->oi_seq);
+       if (IS_ERR(osd_seq))
+               GOTO(cleanup, rc = PTR_ERR(osd_seq));
 
+       dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+       d = osd_seq->oos_dirs[dirn];
+       LASSERT(d);
+
+       osd_oid_name(name, fid, ostid->oi_id);
+       rc = osd_obj_del_entry(info, osd, d, name, th);
+cleanup:
         RETURN(rc);
 }
 
@@ -501,6 +609,7 @@ static const struct named_oid oids[] = {
        { LLOG_CATALOGS_OID,    "CATALOGS" },
        { MGS_CONFIGS_OID,      "" /* MOUNT_CONFIGS_DIR */ },
        { OFD_HEALTH_CHECK_OID, HEALTH_CHECK },
+       { MDD_LOV_OBJ_OSEQ,     LOV_OBJSEQ },
        { 0,                    NULL }
 };
 
@@ -516,41 +625,40 @@ static char *oid2name(const unsigned long oid)
         return NULL;
 }
 
-int osd_compat_spec_insert(struct osd_thread_info *info,
-                           struct osd_device *osd, const struct lu_fid *fid,
-                           const struct osd_inode_id *id, struct thandle *th)
+int osd_obj_spec_insert(struct osd_thread_info *info, struct osd_device *osd,
+                       const struct lu_fid *fid,
+                       const struct osd_inode_id *id,
+                       struct thandle *th)
 {
-        struct osd_compat_objid *map = osd->od_ost_map;
-        struct dentry           *root = osd_sb(osd)->s_root;
-        char                    *name;
-        int                      rc = 0;
-        int                      seq;
-        ENTRY;
+       struct osd_obj_map      *map = osd->od_ost_map;
+       struct dentry           *root = osd_sb(osd)->s_root;
+       char                    *name;
+       int                     rc = 0;
+       ENTRY;
 
         if (fid_oid(fid) >= OFD_GROUP0_LAST_OID &&
             fid_oid(fid) < OFD_GROUP4K_LAST_OID) {
-                /* on creation of LAST_ID we create O/<group> hierarchy */
-                LASSERT(map);
-                seq = fid_oid(fid) - OFD_GROUP0_LAST_OID;
-                LASSERT(seq < MAX_OBJID_GROUP);
-                LASSERT(map->groups[seq].groot);
-               rc = osd_compat_add_entry(info, osd, map->groups[seq].groot,
-                                         "LAST_ID", id, th);
-        } else {
-                name = oid2name(fid_oid(fid));
-                if (name == NULL)
-                        CWARN("UNKNOWN COMPAT FID "DFID"\n", PFID(fid));
-                else if (name[0])
-                        rc = osd_compat_add_entry(info, osd, root, name, id,
-                                                  th);
-        }
+               struct osd_obj_seq      *osd_seq;
+               /* on creation of LAST_ID we create O/<seq> hierarchy */
+               LASSERT(map);
+               osd_seq = osd_seq_load(osd, fid_seq(fid));
+               if (IS_ERR(osd_seq))
+                       RETURN(PTR_ERR(osd_seq));
+               rc = osd_obj_add_entry(info, osd, osd_seq->oos_root,
+                                      "LAST_ID", id, th);
+       } else {
+               name = oid2name(fid_oid(fid));
+               if (name == NULL)
+                       CWARN("UNKNOWN COMPAT FID "DFID"\n", PFID(fid));
+               else if (name[0])
+                       rc = osd_obj_add_entry(info, osd, root, name, id, th);
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
-int osd_compat_spec_lookup(struct osd_thread_info *info,
-                          struct osd_device *osd, const struct lu_fid *fid,
-                          struct osd_inode_id *id)
+int osd_obj_spec_lookup(struct osd_thread_info *info, struct osd_device *osd,
+                       const struct lu_fid *fid, struct osd_inode_id *id)
 {
        struct dentry   *root;
        struct dentry *dentry;
@@ -561,14 +669,12 @@ int osd_compat_spec_lookup(struct osd_thread_info *info,
 
        if (fid_oid(fid) >= OFD_GROUP0_LAST_OID &&
            fid_oid(fid) < OFD_GROUP4K_LAST_OID) {
-               struct osd_compat_objid *map = osd->od_ost_map;
-               int                      seq;
+               struct osd_obj_seq *osd_seq;
 
-               LASSERT(map);
-               seq = fid_oid(fid) - OFD_GROUP0_LAST_OID;
-               LASSERT(seq < MAX_OBJID_GROUP);
-               LASSERT(map->groups[seq].groot);
-               root = map->groups[seq].groot;
+               osd_seq = osd_seq_load(osd, fid_seq(fid));
+               if (IS_ERR(osd_seq))
+                       RETURN(PTR_ERR(osd_seq));
+               root = osd_seq->oos_root;
                name = "LAST_ID";
        } else {
                root = osd_sb(osd)->s_root;
index 27ac5fa..35f8768 100644 (file)
@@ -4461,7 +4461,7 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
 
        rc = osd_shutdown(env, osd_dev(d));
 
-        osd_compat_fini(osd_dev(d));
+       osd_obj_map_fini(osd_dev(d));
 
         shrink_dcache_sb(osd_sb(osd_dev(d)));
         osd_sync(env, lu2dt_dev(d));
@@ -4522,7 +4522,7 @@ static int osd_device_init0(const struct lu_env *env,
        strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
                        sizeof(o->od_svname) - 1);
 
-       rc = osd_compat_init(o);
+       rc = osd_obj_map_init(o);
        if (rc != 0)
                GOTO(out_scrub, rc);
 
@@ -4559,7 +4559,7 @@ out_procfs:
 out_site:
        lu_site_fini(&o->od_site);
 out_compat:
-       osd_compat_fini(o);
+       osd_obj_map_fini(o);
 out_scrub:
        osd_scrub_cleanup(env, o);
 out_mnt:
index bf75e01..e62af79 100644 (file)
@@ -140,6 +140,23 @@ struct osd_object {
 #endif
 };
 
+struct osd_obj_seq {
+       /* protects on-fly initialization */
+       int              oos_subdir_count; /* subdir count for each seq */
+       struct dentry    *oos_root;        /* O/<seq> */
+       struct dentry    **oos_dirs;       /* O/<seq>/d0-dXX */
+       obd_seq          oos_seq;          /* seq number */
+       cfs_list_t       oos_seq_list;     /* list to seq_list */
+};
+
+struct osd_obj_map {
+       struct dentry    *om_root;        /* dentry for /O */
+       rwlock_t         om_seq_list_lock; /* lock for seq_list */
+       cfs_list_t       om_seq_list;      /* list head for seq */
+       int              om_subdir_count;
+       struct semaphore om_dir_init_sem;
+};
+
 #ifdef HAVE_LDISKFS_PDO
 
 #define osd_ldiskfs_find_entry(dir, dentry, de, lock)   \
@@ -277,10 +294,7 @@ struct osd_device {
        int                       od_connects;
        struct lu_site            od_site;
 
-        /*
-         * mapping for legacy OST objids
-         */
-        struct osd_compat_objid  *od_ost_map;
+       struct osd_obj_map      *od_ost_map;
 
         unsigned long long        od_readcache_max_filesize;
         int                       od_read_cache;
@@ -651,25 +665,20 @@ struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
 struct inode *osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
                           struct osd_inode_id *id, struct lu_fid *fid);
 
-int osd_compat_init(struct osd_device *dev);
-void osd_compat_fini(struct osd_device *dev);
-int osd_compat_objid_lookup(struct osd_thread_info *info,
-                            struct osd_device *osd,
-                            const struct lu_fid *fid, struct osd_inode_id *id);
-int osd_compat_objid_insert(struct osd_thread_info *info,
-                            struct osd_device *osd,
-                            const struct lu_fid *fid,
-                            const struct osd_inode_id *id, struct thandle *th);
-int osd_compat_objid_delete(struct osd_thread_info *info,
-                            struct osd_device *osd,
-                            const struct lu_fid *fid, struct thandle *th);
-int osd_compat_spec_lookup(struct osd_thread_info *info,
-                           struct osd_device *osd,
-                           const struct lu_fid *fid, struct osd_inode_id *id);
-int osd_compat_spec_insert(struct osd_thread_info *info,
-                           struct osd_device *osd,
-                           const struct lu_fid *fid,
-                           const struct osd_inode_id *id, struct thandle *th);
+int osd_obj_map_init(struct osd_device *osd);
+void osd_obj_map_fini(struct osd_device *dev);
+int osd_obj_map_lookup(struct osd_thread_info *info, struct osd_device *osd,
+                       const struct lu_fid *fid, struct osd_inode_id *id);
+int osd_obj_map_insert(struct osd_thread_info *info, struct osd_device *osd,
+                      const struct lu_fid *fid, const struct osd_inode_id *id,
+                      struct thandle *th);
+int osd_obj_map_delete(struct osd_thread_info *info, struct osd_device *osd,
+                       const struct lu_fid *fid, struct thandle *th);
+int osd_obj_spec_lookup(struct osd_thread_info *info, struct osd_device *osd,
+                       const struct lu_fid *fid, struct osd_inode_id *id);
+int osd_obj_spec_insert(struct osd_thread_info *info, struct osd_device *osd,
+                       const struct lu_fid *fid, const struct osd_inode_id *id,
+                       struct thandle *th);
 
 void osd_scrub_file_reset(struct osd_scrub *scrub, __u8 *uuid, __u64 flags);
 int osd_scrub_file_store(struct osd_scrub *scrub);
@@ -816,6 +825,17 @@ static inline journal_t *osd_journal(const struct osd_device *dev)
         return LDISKFS_SB(osd_sb(dev))->s_journal;
 }
 
+static inline struct md_site *osd_md_site(struct osd_device *osd)
+{
+       return osd->od_dt_dev.dd_lu_dev.ld_site->ld_md_site;
+}
+
+static inline char *osd_name(struct osd_device *osd)
+{
+       return osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name;
+}
+
+
 extern const struct dt_body_operations osd_body_ops;
 extern struct lu_context_key osd_key;
 
index 9137865..30d2f54 100644 (file)
@@ -504,7 +504,8 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
 
         if (fid_is_idif(fid) || fid_seq(fid) == FID_SEQ_LLOG) {
                 /* old OSD obj id */
-                rc = osd_compat_objid_lookup(info, osd, fid, id);
+               /* FIXME: actually for all of the OST object */
+               rc = osd_obj_map_lookup(info, osd, fid, id);
         } else if (fid_is_igif(fid)) {
                 lu_igif_to_id(fid, id);
         } else if (fid_is_fs_root(fid)) {
@@ -515,7 +516,7 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
                        return osd_acct_obj_lookup(info, osd, fid, id);
 
                if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE))
-                       return osd_compat_spec_lookup(info, osd, fid, id);
+                       return osd_obj_spec_lookup(info, osd, fid, id);
 
                rc = __osd_oi_lookup(info, osd, fid, id);
         }
@@ -568,11 +569,11 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_device *osd,
                return 0;
 
        if (fid_is_idif(fid) || fid_seq(fid) == FID_SEQ_LLOG)
-               return osd_compat_objid_insert(info, osd, fid, id, th);
+               return osd_obj_map_insert(info, osd, fid, id, th);
 
        /* Server mount should not depends on OI files */
        if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE))
-               return osd_compat_spec_insert(info, osd, fid, id, th);
+               return osd_obj_spec_insert(info, osd, fid, id, th);
 
        fid_cpu_to_be(oi_fid, fid);
        osd_id_pack(oi_id, id);
@@ -621,7 +622,7 @@ int osd_oi_delete(struct osd_thread_info *info,
        LASSERT(fid_seq(fid) != FID_SEQ_LOCAL_FILE);
 
        if (fid_is_idif(fid) || fid_seq(fid) == FID_SEQ_LLOG)
-               return osd_compat_objid_delete(info, osd, fid, th);
+               return osd_obj_map_delete(info, osd, fid, th);
 
        fid_cpu_to_be(oi_fid, fid);
        return osd_oi_iam_delete(info, osd_fid2oi(osd, fid),