From 467521a236e4c4ef2ad57358cb645083e89f8bdf Mon Sep 17 00:00:00 2001 From: wangdi Date: Tue, 24 Sep 2013 02:16:38 -0700 Subject: [PATCH] LU-1187 osd: allocate osd_compat_objid_seq dynamically 1. Allocate osd_compat_objid_seq dynamically, so osd can load as many as seq it wants. 2. Change group to seq. 3. remove the compat from name, because we will still use the same disk-layout for FID on OST. Change-Id: I52d40ecd805aa44074838496e44ed2b71fd39f92 Signed-off-by: Wang Di Reviewed-on: http://review.whamcloud.com/4323 Reviewed-by: Andreas Dilger Tested-by: Hudson Reviewed-by: Fan Yong Tested-by: Maloo Reviewed-by: Alex Zhuravlev --- lustre/include/lustre_disk.h | 1 + lustre/include/lustre_fid.h | 1 + lustre/osd-ldiskfs/osd_compat.c | 582 ++++++++++++++++++++++---------------- lustre/osd-ldiskfs/osd_handler.c | 6 +- lustre/osd-ldiskfs/osd_internal.h | 66 +++-- lustre/osd-ldiskfs/osd_oi.c | 11 +- 6 files changed, 398 insertions(+), 269 deletions(-) diff --git a/lustre/include/lustre_disk.h b/lustre/include/lustre_disk.h index 8902c3c..7869803 100644 --- a/lustre/include/lustre_disk.h +++ b/lustre/include/lustre_disk.h @@ -60,6 +60,7 @@ #define MOUNT_DATA_FILE MOUNT_CONFIGS_DIR"/"CONFIGS_FILE #define LAST_RCVD "last_rcvd" #define LOV_OBJID "lov_objid" +#define LOV_OBJSEQ "lov_objseq" #define HEALTH_CHECK "health_check" #define CAPA_KEYS "capa_keys" #define CHANGELOG_USERS "changelog_users" diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index 0ab7b08..78207ca 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -224,6 +224,7 @@ enum local_oid { LLOG_CATALOGS_OID = 4118UL, MGS_CONFIGS_OID = 4119UL, OFD_HEALTH_CHECK_OID = 4120UL, + MDD_LOV_OBJ_OSEQ = 4121UL, }; static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid) diff --git a/lustre/osd-ldiskfs/osd_compat.c b/lustre/osd-ldiskfs/osd_compat.c index 27a1f2c..2a79ae1 100644 --- a/lustre/osd-ldiskfs/osd_compat.c +++ b/lustre/osd-ldiskfs/osd_compat.c @@ -35,7 +35,7 @@ * * lustre/osd/osd_compat.c * - * on-disk compatibility stuff for OST + * on-disk structure for managing /O * * Author: Alex Zhuravlev */ @@ -57,25 +57,6 @@ #include "osd_internal.h" #include "osd_oi.h" -struct osd_compat_objid_seq { - /* protects on-fly initialization */ - struct semaphore dir_init_sem; - /* file storing last created objid */ - struct osd_inode_id last_id; - struct dentry *groot; /* O/ */ - struct dentry **dirs; /* O//d0-dXX */ -}; - -#define MAX_OBJID_GROUP (FID_SEQ_ECHO + 1) - -struct osd_compat_objid { - int subdir_count; - struct dentry *root; - struct osd_inode_id last_rcvd_id; - struct osd_inode_id last_seq_id; - struct osd_compat_objid_seq groups[MAX_OBJID_GROUP]; -}; - static void osd_push_ctxt(const struct osd_device *dev, struct lvfs_run_ctxt *newctxt, struct lvfs_run_ctxt *save) @@ -88,89 +69,11 @@ static void osd_push_ctxt(const struct osd_device *dev, push_ctxt(save, newctxt, NULL); } -void osd_compat_seq_fini(struct osd_device *osd, int seq) +static void osd_pop_ctxt(const struct osd_device *dev, + struct lvfs_run_ctxt *new, + struct lvfs_run_ctxt *save) { - struct osd_compat_objid_seq *grp; - struct osd_compat_objid *map = osd->od_ost_map; - int i; - - ENTRY; - - grp = &map->groups[seq]; - if (grp->groot ==NULL) - RETURN_EXIT; - LASSERT(grp->dirs); - - for (i = 0; i < map->subdir_count; i++) { - if (grp->dirs[i] == NULL) - break; - dput(grp->dirs[i]); - } - - OBD_FREE(grp->dirs, sizeof(struct dentry *) * map->subdir_count); - dput(grp->groot); - EXIT; -} - -int osd_compat_seq_init(struct osd_device *osd, int seq) -{ - struct osd_compat_objid_seq *grp; - struct osd_compat_objid *map; - struct dentry *d; - int rc = 0; - char name[32]; - int i; - ENTRY; - - map = osd->od_ost_map; - LASSERT(map); - LASSERT(map->root); - grp = &map->groups[seq]; - - if (grp->groot != NULL) - RETURN(0); - - down(&grp->dir_init_sem); - - sprintf(name, "%d", seq); - d = simple_mkdir(map->root, osd->od_mnt, name, 0755, 1); - if (IS_ERR(d)) { - rc = PTR_ERR(d); - GOTO(out, rc); - } else if (d->d_inode == NULL) { - rc = -EFAULT; - dput(d); - GOTO(out, rc); - } - - LASSERT(grp->dirs == NULL); - OBD_ALLOC(grp->dirs, sizeof(d) * map->subdir_count); - if (grp->dirs == NULL) { - dput(d); - GOTO(out, rc = -ENOMEM); - } - - grp->groot = d; - for (i = 0; i < map->subdir_count; i++) { - sprintf(name, "d%d", i); - d = simple_mkdir(grp->groot, osd->od_mnt, name, 0755, 1); - if (IS_ERR(d)) { - rc = PTR_ERR(d); - break; - } else if (d->d_inode == NULL) { - rc = -EFAULT; - dput(d); - break; - } - - grp->dirs[i] = d; - } - - if (rc) - osd_compat_seq_fini(osd, seq); -out: - up(&grp->dir_init_sem); - RETURN(rc); + pop_ctxt(save, new, NULL); } int osd_last_rcvd_subdir_count(struct osd_device *osd) @@ -211,25 +114,6 @@ out: return count; } -void osd_compat_fini(struct osd_device *dev) -{ - int i; - - ENTRY; - - if (dev->od_ost_map == NULL) - RETURN_EXIT; - - for (i = 0; i < MAX_OBJID_GROUP; i++) - osd_compat_seq_fini(dev, i); - - dput(dev->od_ost_map->root); - OBD_FREE_PTR(dev->od_ost_map); - dev->od_ost_map = NULL; - - EXIT; -} - /* * directory structure on legacy OST: * @@ -240,15 +124,13 @@ void osd_compat_fini(struct osd_device *dev) * CONFIGS * */ -int osd_compat_init(struct osd_device *dev) +int osd_ost_init(struct osd_device *dev) { struct lvfs_run_ctxt new; struct lvfs_run_ctxt save; struct dentry *rootd = osd_sb(dev)->s_root; struct dentry *d; int rc; - int i; - ENTRY; OBD_ALLOC_PTR(dev->od_ost_map); @@ -262,36 +144,117 @@ int osd_compat_init(struct osd_device *dev) RETURN(rc); } - dev->od_ost_map->subdir_count = rc; + dev->od_ost_map->om_subdir_count = rc; rc = 0; + CFS_INIT_LIST_HEAD(&dev->od_ost_map->om_seq_list); + rwlock_init(&dev->od_ost_map->om_seq_list_lock); + sema_init(&dev->od_ost_map->om_dir_init_sem, 1); + LASSERT(dev->od_fsops); osd_push_ctxt(dev, &new, &save); d = simple_mkdir(rootd, dev->od_mnt, "O", 0755, 1); - pop_ctxt(&save, &new, NULL); + if (IS_ERR(d)) + GOTO(cleanup, rc = PTR_ERR(d)); + + dev->od_ost_map->om_root = d; + +cleanup: + osd_pop_ctxt(dev, &new, &save); if (IS_ERR(d)) { OBD_FREE_PTR(dev->od_ost_map); RETURN(PTR_ERR(d)); } - dev->od_ost_map->root = d; + RETURN(rc); +} + +static void osd_seq_free(struct osd_obj_map *map, + struct osd_obj_seq *osd_seq) +{ + int j; - /* Initialize all groups */ - for (i = 0; i < MAX_OBJID_GROUP; i++) { - sema_init(&dev->od_ost_map->groups[i].dir_init_sem, 1); - rc = osd_compat_seq_init(dev, i); - if (rc) { - osd_compat_fini(dev); - break; + cfs_list_del_init(&osd_seq->oos_seq_list); + + if (osd_seq->oos_dirs) { + for (j = 0; j < osd_seq->oos_subdir_count; j++) { + if (osd_seq->oos_dirs[j]) + dput(osd_seq->oos_dirs[j]); } + OBD_FREE(osd_seq->oos_dirs, + sizeof(struct dentry *) * osd_seq->oos_subdir_count); } + if (osd_seq->oos_root) + dput(osd_seq->oos_root); + + OBD_FREE_PTR(osd_seq); +} + +int osd_obj_map_init(struct osd_device *dev) +{ + int rc; + ENTRY; + + /* prepare structures for OST */ + rc = osd_ost_init(dev); + + /* prepare structures for MDS */ + RETURN(rc); } -int osd_compat_del_entry(struct osd_thread_info *info, struct osd_device *osd, - struct dentry *dird, char *name, struct thandle *th) +struct osd_obj_seq *osd_seq_find_locked(struct osd_obj_map *map, obd_seq seq) +{ + struct osd_obj_seq *osd_seq; + + cfs_list_for_each_entry(osd_seq, &map->om_seq_list, oos_seq_list) { + if (osd_seq->oos_seq == seq) + return osd_seq; + } + return NULL; +} + +struct osd_obj_seq *osd_seq_find(struct osd_obj_map *map, obd_seq seq) +{ + struct osd_obj_seq *osd_seq; + + read_lock(&map->om_seq_list_lock); + osd_seq = osd_seq_find_locked(map, seq); + read_unlock(&map->om_seq_list_lock); + return osd_seq; +} + +void osd_obj_map_fini(struct osd_device *dev) +{ + struct osd_obj_seq *osd_seq; + struct osd_obj_seq *tmp; + struct osd_obj_map *map = dev->od_ost_map; + ENTRY; + + map = dev->od_ost_map; + if (map == NULL) + return; + + write_lock(&dev->od_ost_map->om_seq_list_lock); + cfs_list_for_each_entry_safe(osd_seq, tmp, + &dev->od_ost_map->om_seq_list, + oos_seq_list) { + osd_seq_free(map, osd_seq); + } + write_unlock(&dev->od_ost_map->om_seq_list_lock); + if (map->om_root) + dput(map->om_root); + OBD_FREE_PTR(dev->od_ost_map); + dev->od_ost_map = NULL; + EXIT; +} + +static int osd_obj_del_entry(struct osd_thread_info *info, + struct osd_device *osd, + struct dentry *dird, char *name, + struct thandle *th) { struct ldiskfs_dir_entry_2 *de; struct buffer_head *bh; @@ -327,9 +290,11 @@ int osd_compat_del_entry(struct osd_thread_info *info, struct osd_device *osd, RETURN(rc); } -int osd_compat_add_entry(struct osd_thread_info *info, struct osd_device *osd, - struct dentry *dir, char *name, - const struct osd_inode_id *id, struct thandle *th) +int osd_obj_add_entry(struct osd_thread_info *info, + struct osd_device *osd, + struct dentry *dir, char *name, + const struct osd_inode_id *id, + struct thandle *th) { struct osd_thandle *oh; struct dentry *child; @@ -362,48 +327,186 @@ int osd_compat_add_entry(struct osd_thread_info *info, struct osd_device *osd, RETURN(rc); } -int osd_compat_objid_lookup(struct osd_thread_info *info, - struct osd_device *dev, const struct lu_fid *fid, - struct osd_inode_id *id) +/** + * Use LPU64 for legacy OST sequences, but use LPX64i for new + * sequences names, so that the O/{seq}/dN/{oid} more closely + * follows the DFID/PFID format. This makes it easier to map from + * debug messages to objects in the future, and the legacy space + * of FID_SEQ_OST_MDT0 will be unused in the future. + **/ +static inline void osd_seq_name(char *seq_name, obd_seq seq) { - struct osd_compat_objid *map; - struct dentry *d; - struct dentry *d_seq; - struct ost_id *ostid = &info->oti_ostid; - int dirn; - char name[32]; - struct ldiskfs_dir_entry_2 *de; - struct buffer_head *bh; - struct inode *dir; - struct inode *inode; + sprintf(seq_name, (fid_seq_is_rsvd(seq) || + fid_seq_is_mdt0(seq)) ? LPU64 : LPX64i, + fid_seq_is_idif(seq) ? 0 : seq); +} + +static inline void osd_oid_name(char *name, const struct lu_fid *fid, obd_id id) +{ + sprintf(name, (fid_seq_is_rsvd(fid_seq(fid)) || + fid_seq_is_mdt0(fid_seq(fid)) || + fid_seq_is_idif(fid_seq(fid))) ? LPU64 : LPX64i, id); +} + +/* external locking is required */ +static int osd_seq_load_locked(struct osd_device *osd, + struct osd_obj_seq *osd_seq) +{ + struct osd_obj_map *map = osd->od_ost_map; + struct dentry *seq_dir; + int rc = 0; + int i; + char seq_name[32]; + ENTRY; + + if (osd_seq->oos_root != NULL) + RETURN(0); + + LASSERT(map); + LASSERT(map->om_root); + + osd_seq_name(seq_name, osd_seq->oos_seq); + + seq_dir = simple_mkdir(map->om_root, osd->od_mnt, seq_name, 0755, 1); + if (IS_ERR(seq_dir)) + GOTO(out_err, rc = PTR_ERR(seq_dir)); + else if (seq_dir->d_inode == NULL) + GOTO(out_put, rc = -EFAULT); + + osd_seq->oos_root = seq_dir; + + LASSERT(osd_seq->oos_dirs == NULL); + OBD_ALLOC(osd_seq->oos_dirs, + sizeof(seq_dir) * osd_seq->oos_subdir_count); + if (osd_seq->oos_dirs == NULL) + GOTO(out_put, rc = -ENOMEM); + + for (i = 0; i < osd_seq->oos_subdir_count; i++) { + struct dentry *dir; + char name[32]; + + sprintf(name, "d%u", i); + dir = simple_mkdir(osd_seq->oos_root, osd->od_mnt, name, + 0700, 1); + if (IS_ERR(dir)) { + rc = PTR_ERR(dir); + } else if (dir->d_inode) { + osd_seq->oos_dirs[i] = dir; + rc = 0; + } else { + LBUG(); + } + } + + if (rc != 0) + osd_seq_free(map, osd_seq); +out_put: + if (rc != 0) { + dput(seq_dir); + osd_seq->oos_root = NULL; + } +out_err: + RETURN(rc); +} + +static struct osd_obj_seq *osd_seq_load(struct osd_device *osd, obd_seq seq) +{ + struct osd_obj_map *map; + struct osd_obj_seq *osd_seq; + int rc = 0; + ENTRY; + + map = osd->od_ost_map; + LASSERT(map); + LASSERT(map->om_root); + + osd_seq = osd_seq_find(map, seq); + if (likely(osd_seq != NULL)) + RETURN(osd_seq); + + /* Serializing init process */ + down(&map->om_dir_init_sem); + + /* Check whether the seq has been added */ + read_lock(&map->om_seq_list_lock); + osd_seq = osd_seq_find_locked(map, seq); + if (osd_seq != NULL) { + read_unlock(&map->om_seq_list_lock); + GOTO(cleanup, rc = 0); + } + read_unlock(&map->om_seq_list_lock); + + OBD_ALLOC_PTR(osd_seq); + if (osd_seq == NULL) + GOTO(cleanup, rc = -ENOMEM); + + CFS_INIT_LIST_HEAD(&osd_seq->oos_seq_list); + osd_seq->oos_seq = seq; + /* Init subdir count to be 32, but each seq can have + * different subdir count */ + osd_seq->oos_subdir_count = map->om_subdir_count; + rc = osd_seq_load_locked(osd, osd_seq); + if (rc != 0) + GOTO(cleanup, rc); + + write_lock(&map->om_seq_list_lock); + cfs_list_add(&osd_seq->oos_seq_list, &map->om_seq_list); + write_unlock(&map->om_seq_list_lock); + +cleanup: + up(&map->om_dir_init_sem); + if (rc != 0) { + if (osd_seq != NULL) + OBD_FREE_PTR(osd_seq); + RETURN(ERR_PTR(rc)); + } + + RETURN(osd_seq); +} + +int osd_obj_map_lookup(struct osd_thread_info *info, struct osd_device *dev, + const struct lu_fid *fid, struct osd_inode_id *id) +{ + struct osd_obj_map *map; + struct osd_obj_seq *osd_seq; + struct dentry *d_seq; + struct dentry *child; + struct ost_id *ostid = &info->oti_ostid; + int dirn; + char name[32]; + struct ldiskfs_dir_entry_2 *de; + struct buffer_head *bh; + struct inode *dir; + struct inode *inode; ENTRY; /* on the very first lookup we find and open directories */ map = dev->od_ost_map; LASSERT(map); - LASSERT(map->root); + LASSERT(map->om_root); fid_ostid_pack(fid, ostid); - LASSERT(ostid->oi_seq < MAX_OBJID_GROUP); - LASSERT(map->subdir_count > 0); - LASSERT(map->groups[ostid->oi_seq].groot); + osd_seq = osd_seq_load(dev, ostid->oi_seq); + if (IS_ERR(osd_seq)) + RETURN(PTR_ERR(osd_seq)); - dirn = ostid->oi_id & (map->subdir_count - 1); - d = map->groups[ostid->oi_seq].dirs[dirn]; - LASSERT(d); + dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1); + d_seq = osd_seq->oos_dirs[dirn]; + LASSERT(d_seq); + + osd_oid_name(name, fid, ostid->oi_id); - sprintf(name, "%llu", ostid->oi_id); - d_seq = &info->oti_child_dentry; - d_seq->d_parent = d; - d_seq->d_name.hash = 0; - d_seq->d_name.name = name; - /* XXX: we can use rc from sprintf() instead of strlen() */ - d_seq->d_name.len = strlen(name); + child = &info->oti_child_dentry; + child->d_parent = d_seq; + child->d_name.hash = 0; + child->d_name.name = name; + /* XXX: we can use rc from sprintf() instead of strlen() */ + child->d_name.len = strlen(name); - dir = d->d_inode; + dir = d_seq->d_inode; mutex_lock(&dir->i_mutex); - bh = osd_ldiskfs_find_entry(dir, d_seq, &de, NULL); + bh = osd_ldiskfs_find_entry(dir, child, &de, NULL); mutex_unlock(&dir->i_mutex); if (bh == NULL) @@ -420,63 +523,68 @@ int osd_compat_objid_lookup(struct osd_thread_info *info, RETURN(0); } -int osd_compat_objid_insert(struct osd_thread_info *info, - struct osd_device *osd, - const struct lu_fid *fid, - const struct osd_inode_id *id, - struct thandle *th) +int osd_obj_map_insert(struct osd_thread_info *info, + struct osd_device *osd, + const struct lu_fid *fid, + const struct osd_inode_id *id, + struct thandle *th) { - struct osd_compat_objid *map; - struct dentry *d; - struct ost_id *ostid = &info->oti_ostid; - int dirn, rc = 0; - char name[32]; + struct osd_obj_map *map; + struct osd_obj_seq *osd_seq; + struct dentry *d; + struct ost_id *ostid = &info->oti_ostid; + int dirn, rc = 0; + char name[32]; ENTRY; map = osd->od_ost_map; LASSERT(map); - LASSERT(map->root); - LASSERT(map->subdir_count > 0); - LASSERT(map->groups[ostid->oi_seq].groot); - /* map fid to group:objid */ + /* map fid to seq:objid */ fid_ostid_pack(fid, ostid); - dirn = ostid->oi_id & (map->subdir_count - 1); - d = map->groups[ostid->oi_seq].dirs[dirn]; + + osd_seq = osd_seq_load(osd, ostid->oi_seq); + if (IS_ERR(osd_seq)) + RETURN(PTR_ERR(osd_seq)); + + dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1); + d = osd_seq->oos_dirs[dirn]; LASSERT(d); - sprintf(name, "%llu", ostid->oi_id); - rc = osd_compat_add_entry(info, osd, d, name, id, th); + osd_oid_name(name, fid, ostid->oi_id); + rc = osd_obj_add_entry(info, osd, d, name, id, th); RETURN(rc); } -int osd_compat_objid_delete(struct osd_thread_info *info, - struct osd_device *osd, - const struct lu_fid *fid, struct thandle *th) +int osd_obj_map_delete(struct osd_thread_info *info, struct osd_device *osd, + const struct lu_fid *fid, struct thandle *th) { - struct osd_compat_objid *map; - struct dentry *d; - struct ost_id *ostid = &info->oti_ostid; - int dirn, rc = 0; - char name[32]; + struct osd_obj_map *map; + struct osd_obj_seq *osd_seq; + struct dentry *d; + struct ost_id *ostid = &info->oti_ostid; + int dirn, rc = 0; + char name[32]; ENTRY; map = osd->od_ost_map; LASSERT(map); - LASSERT(map->root); - LASSERT(map->subdir_count > 0); - LASSERT(map->groups[ostid->oi_seq].groot); - /* map fid to group:objid */ + /* map fid to seq:objid */ fid_ostid_pack(fid, ostid); - dirn = ostid->oi_id & (map->subdir_count - 1); - d = map->groups[ostid->oi_seq].dirs[dirn]; - LASSERT(d); - sprintf(name, "%llu", ostid->oi_id); - rc = osd_compat_del_entry(info, osd, d, name, th); + osd_seq = osd_seq_load(osd, ostid->oi_seq); + if (IS_ERR(osd_seq)) + GOTO(cleanup, rc = PTR_ERR(osd_seq)); + dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1); + d = osd_seq->oos_dirs[dirn]; + LASSERT(d); + + osd_oid_name(name, fid, ostid->oi_id); + rc = osd_obj_del_entry(info, osd, d, name, th); +cleanup: RETURN(rc); } @@ -501,6 +609,7 @@ static const struct named_oid oids[] = { { LLOG_CATALOGS_OID, "CATALOGS" }, { MGS_CONFIGS_OID, "" /* MOUNT_CONFIGS_DIR */ }, { OFD_HEALTH_CHECK_OID, HEALTH_CHECK }, + { MDD_LOV_OBJ_OSEQ, LOV_OBJSEQ }, { 0, NULL } }; @@ -516,41 +625,40 @@ static char *oid2name(const unsigned long oid) return NULL; } -int osd_compat_spec_insert(struct osd_thread_info *info, - struct osd_device *osd, const struct lu_fid *fid, - const struct osd_inode_id *id, struct thandle *th) +int osd_obj_spec_insert(struct osd_thread_info *info, struct osd_device *osd, + const struct lu_fid *fid, + const struct osd_inode_id *id, + struct thandle *th) { - struct osd_compat_objid *map = osd->od_ost_map; - struct dentry *root = osd_sb(osd)->s_root; - char *name; - int rc = 0; - int seq; - ENTRY; + struct osd_obj_map *map = osd->od_ost_map; + struct dentry *root = osd_sb(osd)->s_root; + char *name; + int rc = 0; + ENTRY; if (fid_oid(fid) >= OFD_GROUP0_LAST_OID && fid_oid(fid) < OFD_GROUP4K_LAST_OID) { - /* on creation of LAST_ID we create O/ hierarchy */ - LASSERT(map); - seq = fid_oid(fid) - OFD_GROUP0_LAST_OID; - LASSERT(seq < MAX_OBJID_GROUP); - LASSERT(map->groups[seq].groot); - rc = osd_compat_add_entry(info, osd, map->groups[seq].groot, - "LAST_ID", id, th); - } else { - name = oid2name(fid_oid(fid)); - if (name == NULL) - CWARN("UNKNOWN COMPAT FID "DFID"\n", PFID(fid)); - else if (name[0]) - rc = osd_compat_add_entry(info, osd, root, name, id, - th); - } + struct osd_obj_seq *osd_seq; + /* on creation of LAST_ID we create O/ hierarchy */ + LASSERT(map); + osd_seq = osd_seq_load(osd, fid_seq(fid)); + if (IS_ERR(osd_seq)) + RETURN(PTR_ERR(osd_seq)); + rc = osd_obj_add_entry(info, osd, osd_seq->oos_root, + "LAST_ID", id, th); + } else { + name = oid2name(fid_oid(fid)); + if (name == NULL) + CWARN("UNKNOWN COMPAT FID "DFID"\n", PFID(fid)); + else if (name[0]) + rc = osd_obj_add_entry(info, osd, root, name, id, th); + } - RETURN(rc); + RETURN(rc); } -int osd_compat_spec_lookup(struct osd_thread_info *info, - struct osd_device *osd, const struct lu_fid *fid, - struct osd_inode_id *id) +int osd_obj_spec_lookup(struct osd_thread_info *info, struct osd_device *osd, + const struct lu_fid *fid, struct osd_inode_id *id) { struct dentry *root; struct dentry *dentry; @@ -561,14 +669,12 @@ int osd_compat_spec_lookup(struct osd_thread_info *info, if (fid_oid(fid) >= OFD_GROUP0_LAST_OID && fid_oid(fid) < OFD_GROUP4K_LAST_OID) { - struct osd_compat_objid *map = osd->od_ost_map; - int seq; + struct osd_obj_seq *osd_seq; - LASSERT(map); - seq = fid_oid(fid) - OFD_GROUP0_LAST_OID; - LASSERT(seq < MAX_OBJID_GROUP); - LASSERT(map->groups[seq].groot); - root = map->groups[seq].groot; + osd_seq = osd_seq_load(osd, fid_seq(fid)); + if (IS_ERR(osd_seq)) + RETURN(PTR_ERR(osd_seq)); + root = osd_seq->oos_root; name = "LAST_ID"; } else { root = osd_sb(osd)->s_root; diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 27ac5fa..35f8768 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -4461,7 +4461,7 @@ static struct lu_device *osd_device_fini(const struct lu_env *env, rc = osd_shutdown(env, osd_dev(d)); - osd_compat_fini(osd_dev(d)); + osd_obj_map_fini(osd_dev(d)); shrink_dcache_sb(osd_sb(osd_dev(d))); osd_sync(env, lu2dt_dev(d)); @@ -4522,7 +4522,7 @@ static int osd_device_init0(const struct lu_env *env, strncpy(o->od_svname, lustre_cfg_string(cfg, 4), sizeof(o->od_svname) - 1); - rc = osd_compat_init(o); + rc = osd_obj_map_init(o); if (rc != 0) GOTO(out_scrub, rc); @@ -4559,7 +4559,7 @@ out_procfs: out_site: lu_site_fini(&o->od_site); out_compat: - osd_compat_fini(o); + osd_obj_map_fini(o); out_scrub: osd_scrub_cleanup(env, o); out_mnt: diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index bf75e012..e62af79 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -140,6 +140,23 @@ struct osd_object { #endif }; +struct osd_obj_seq { + /* protects on-fly initialization */ + int oos_subdir_count; /* subdir count for each seq */ + struct dentry *oos_root; /* O/ */ + struct dentry **oos_dirs; /* O//d0-dXX */ + obd_seq oos_seq; /* seq number */ + cfs_list_t oos_seq_list; /* list to seq_list */ +}; + +struct osd_obj_map { + struct dentry *om_root; /* dentry for /O */ + rwlock_t om_seq_list_lock; /* lock for seq_list */ + cfs_list_t om_seq_list; /* list head for seq */ + int om_subdir_count; + struct semaphore om_dir_init_sem; +}; + #ifdef HAVE_LDISKFS_PDO #define osd_ldiskfs_find_entry(dir, dentry, de, lock) \ @@ -277,10 +294,7 @@ struct osd_device { int od_connects; struct lu_site od_site; - /* - * mapping for legacy OST objids - */ - struct osd_compat_objid *od_ost_map; + struct osd_obj_map *od_ost_map; unsigned long long od_readcache_max_filesize; int od_read_cache; @@ -651,25 +665,20 @@ struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev, struct inode *osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev, struct osd_inode_id *id, struct lu_fid *fid); -int osd_compat_init(struct osd_device *dev); -void osd_compat_fini(struct osd_device *dev); -int osd_compat_objid_lookup(struct osd_thread_info *info, - struct osd_device *osd, - const struct lu_fid *fid, struct osd_inode_id *id); -int osd_compat_objid_insert(struct osd_thread_info *info, - struct osd_device *osd, - const struct lu_fid *fid, - const struct osd_inode_id *id, struct thandle *th); -int osd_compat_objid_delete(struct osd_thread_info *info, - struct osd_device *osd, - const struct lu_fid *fid, struct thandle *th); -int osd_compat_spec_lookup(struct osd_thread_info *info, - struct osd_device *osd, - const struct lu_fid *fid, struct osd_inode_id *id); -int osd_compat_spec_insert(struct osd_thread_info *info, - struct osd_device *osd, - const struct lu_fid *fid, - const struct osd_inode_id *id, struct thandle *th); +int osd_obj_map_init(struct osd_device *osd); +void osd_obj_map_fini(struct osd_device *dev); +int osd_obj_map_lookup(struct osd_thread_info *info, struct osd_device *osd, + const struct lu_fid *fid, struct osd_inode_id *id); +int osd_obj_map_insert(struct osd_thread_info *info, struct osd_device *osd, + const struct lu_fid *fid, const struct osd_inode_id *id, + struct thandle *th); +int osd_obj_map_delete(struct osd_thread_info *info, struct osd_device *osd, + const struct lu_fid *fid, struct thandle *th); +int osd_obj_spec_lookup(struct osd_thread_info *info, struct osd_device *osd, + const struct lu_fid *fid, struct osd_inode_id *id); +int osd_obj_spec_insert(struct osd_thread_info *info, struct osd_device *osd, + const struct lu_fid *fid, const struct osd_inode_id *id, + struct thandle *th); void osd_scrub_file_reset(struct osd_scrub *scrub, __u8 *uuid, __u64 flags); int osd_scrub_file_store(struct osd_scrub *scrub); @@ -816,6 +825,17 @@ static inline journal_t *osd_journal(const struct osd_device *dev) return LDISKFS_SB(osd_sb(dev))->s_journal; } +static inline struct md_site *osd_md_site(struct osd_device *osd) +{ + return osd->od_dt_dev.dd_lu_dev.ld_site->ld_md_site; +} + +static inline char *osd_name(struct osd_device *osd) +{ + return osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name; +} + + extern const struct dt_body_operations osd_body_ops; extern struct lu_context_key osd_key; diff --git a/lustre/osd-ldiskfs/osd_oi.c b/lustre/osd-ldiskfs/osd_oi.c index 9137865..30d2f54 100644 --- a/lustre/osd-ldiskfs/osd_oi.c +++ b/lustre/osd-ldiskfs/osd_oi.c @@ -504,7 +504,8 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd, if (fid_is_idif(fid) || fid_seq(fid) == FID_SEQ_LLOG) { /* old OSD obj id */ - rc = osd_compat_objid_lookup(info, osd, fid, id); + /* FIXME: actually for all of the OST object */ + rc = osd_obj_map_lookup(info, osd, fid, id); } else if (fid_is_igif(fid)) { lu_igif_to_id(fid, id); } else if (fid_is_fs_root(fid)) { @@ -515,7 +516,7 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd, return osd_acct_obj_lookup(info, osd, fid, id); if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) - return osd_compat_spec_lookup(info, osd, fid, id); + return osd_obj_spec_lookup(info, osd, fid, id); rc = __osd_oi_lookup(info, osd, fid, id); } @@ -568,11 +569,11 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_device *osd, return 0; if (fid_is_idif(fid) || fid_seq(fid) == FID_SEQ_LLOG) - return osd_compat_objid_insert(info, osd, fid, id, th); + return osd_obj_map_insert(info, osd, fid, id, th); /* Server mount should not depends on OI files */ if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) - return osd_compat_spec_insert(info, osd, fid, id, th); + return osd_obj_spec_insert(info, osd, fid, id, th); fid_cpu_to_be(oi_fid, fid); osd_id_pack(oi_id, id); @@ -621,7 +622,7 @@ int osd_oi_delete(struct osd_thread_info *info, LASSERT(fid_seq(fid) != FID_SEQ_LOCAL_FILE); if (fid_is_idif(fid) || fid_seq(fid) == FID_SEQ_LLOG) - return osd_compat_objid_delete(info, osd, fid, th); + return osd_obj_map_delete(info, osd, fid, th); fid_cpu_to_be(oi_fid, fid); return osd_oi_iam_delete(info, osd_fid2oi(osd, fid), -- 1.8.3.1