From d6ab8d43c8b5c47dccee43c31ed9f7b6e6513700 Mon Sep 17 00:00:00 2001 From: wangdi Date: Sat, 16 Nov 2013 00:41:27 -0800 Subject: [PATCH] LU-1187 osd: add agent/local inode for remote directory A local inode will be created for remote directory entry, so to make e2fsck check pass. The remote directory on the remote MDT will be put under the agent directory. Remove LA_TYPE checking during osd_inode_set, because when initialize the remote object, it needs to get TYPE information from remote attr get, where we need type information anyway, (osp_md_attr_get ->out_attr_get->osd_attr_get | LA_TYPE). So osd_attr_get will always return LA_TYPE valid information here, then if it do attr_get/modify/attr_set might hit this ASSERT, So during osd_attr_set, we can just ignore type. Signed-off-by: wang di Change-Id: I7f177fa8ae2ab450cde78a8c2da9a2770ec6b0cb Reviewed-on: http://review.whamcloud.com/4931 Reviewed-by: Alex Zhuravlev Reviewed-by: Andreas Dilger Tested-by: Hudson Tested-by: Maloo --- lustre/osd-ldiskfs/osd_compat.c | 200 +++++++++++++++++++----- lustre/osd-ldiskfs/osd_handler.c | 318 +++++++++++++++++++++++++++++++------- lustre/osd-ldiskfs/osd_internal.h | 21 +++ 3 files changed, 445 insertions(+), 94 deletions(-) diff --git a/lustre/osd-ldiskfs/osd_compat.c b/lustre/osd-ldiskfs/osd_compat.c index 62837d8..09045b7 100644 --- a/lustre/osd-ldiskfs/osd_compat.c +++ b/lustre/osd-ldiskfs/osd_compat.c @@ -115,6 +115,123 @@ out: } /* + * directory structure on legacy MDT: + * + * REM_OBJ_DIR/ per mdt + * AGENT_OBJ_DIR/ per mdt + * + */ +static const char remote_obj_dir[] = "REM_OBJ_DIR"; +static const char agent_obj_dir[] = "AGENT_OBJ_DIR"; +int osd_mdt_init(struct osd_device *dev) +{ + struct lvfs_run_ctxt new; + struct lvfs_run_ctxt save; + struct dentry *parent; + struct osd_mdobj_map *omm; + struct dentry *d; + int rc = 0; + ENTRY; + + OBD_ALLOC_PTR(dev->od_mdt_map); + if (dev->od_mdt_map == NULL) + RETURN(-ENOMEM); + + omm = dev->od_mdt_map; + + LASSERT(dev->od_fsops); + + parent = osd_sb(dev)->s_root; + osd_push_ctxt(dev, &new, &save); + + d = simple_mkdir(parent, dev->od_mnt, agent_obj_dir, + 0755, 1); + if (IS_ERR(d)) + GOTO(cleanup, rc = PTR_ERR(d)); + + omm->omm_agent_dentry = d; + +cleanup: + pop_ctxt(&save, &new, NULL); + if (rc) { + if (omm->omm_agent_dentry != NULL) + dput(omm->omm_agent_dentry); + OBD_FREE_PTR(omm); + dev->od_mdt_map = NULL; + } + RETURN(rc); +} + +static void osd_mdt_fini(struct osd_device *osd) +{ + struct osd_mdobj_map *omm = osd->od_mdt_map; + + if (omm == NULL) + return; + + if (omm->omm_agent_dentry) + dput(omm->omm_agent_dentry); + + OBD_FREE_PTR(omm); + osd->od_ost_map = NULL; +} + +int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd, + struct osd_object *obj, struct osd_thandle *oh) +{ + struct osd_mdobj_map *omm = osd->od_mdt_map; + struct osd_thread_info *oti = osd_oti_get(env); + char *name_buf = oti->oti_name; + struct dentry *agent; + struct dentry *parent; + int rc; + + parent = omm->omm_agent_dentry; + sprintf(name_buf, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu))); + agent = osd_child_dentry_by_inode(env, parent->d_inode, + name_buf, strlen(name_buf)); + mutex_lock(&parent->d_inode->i_mutex); + rc = osd_ldiskfs_add_entry(oh->ot_handle, agent, obj->oo_inode, NULL); + parent->d_inode->i_nlink++; + mark_inode_dirty(parent->d_inode); + mutex_unlock(&parent->d_inode->i_mutex); + if (rc != 0) + CERROR("%s: "DFID" add agent error: rc = %d\n", osd_name(osd), + PFID(lu_object_fid(&obj->oo_dt.do_lu)), rc); + RETURN(rc); +} + +int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd, + struct osd_object *obj, struct osd_thandle *oh) +{ + struct osd_mdobj_map *omm = osd->od_mdt_map; + struct osd_thread_info *oti = osd_oti_get(env); + char *name = oti->oti_name; + struct dentry *agent; + struct dentry *parent; + struct ldiskfs_dir_entry_2 *de; + struct buffer_head *bh; + int rc; + + parent = omm->omm_agent_dentry; + sprintf(name, DFID, PFID(lu_object_fid(&obj->oo_dt.do_lu))); + agent = osd_child_dentry_by_inode(env, parent->d_inode, + name, strlen(name)); + mutex_lock(&parent->d_inode->i_mutex); + bh = osd_ldiskfs_find_entry(parent->d_inode, agent, &de, NULL); + if (bh == NULL) { + mutex_unlock(&parent->d_inode->i_mutex); + RETURN(-ENOENT); + } + rc = ldiskfs_delete_entry(oh->ot_handle, parent->d_inode, de, bh); + parent->d_inode->i_nlink--; + mark_inode_dirty(parent->d_inode); + mutex_unlock(&parent->d_inode->i_mutex); + brelse(bh); + RETURN(rc); +} + +/* * directory structure on legacy OST: * * O//d0-31/ @@ -193,6 +310,30 @@ static void osd_seq_free(struct osd_obj_map *map, OBD_FREE_PTR(osd_seq); } +static void osd_ost_fini(struct osd_device *osd) +{ + struct osd_obj_seq *osd_seq; + struct osd_obj_seq *tmp; + struct osd_obj_map *map = osd->od_ost_map; + ENTRY; + + if (map == NULL) + return; + + write_lock(&map->om_seq_list_lock); + cfs_list_for_each_entry_safe(osd_seq, tmp, + &map->om_seq_list, + oos_seq_list) { + osd_seq_free(map, osd_seq); + } + write_unlock(&map->om_seq_list_lock); + if (map->om_root) + dput(map->om_root); + OBD_FREE_PTR(map); + osd->od_ost_map = NULL; + EXIT; +} + int osd_obj_map_init(struct osd_device *dev) { int rc; @@ -200,8 +341,11 @@ int osd_obj_map_init(struct osd_device *dev) /* prepare structures for OST */ rc = osd_ost_init(dev); + if (rc) + RETURN(rc); /* prepare structures for MDS */ + rc = osd_mdt_init(dev); RETURN(rc); } @@ -229,27 +373,8 @@ struct osd_obj_seq *osd_seq_find(struct osd_obj_map *map, obd_seq seq) void osd_obj_map_fini(struct osd_device *dev) { - struct osd_obj_seq *osd_seq; - struct osd_obj_seq *tmp; - struct osd_obj_map *map = dev->od_ost_map; - ENTRY; - - map = dev->od_ost_map; - if (map == NULL) - return; - - write_lock(&dev->od_ost_map->om_seq_list_lock); - cfs_list_for_each_entry_safe(osd_seq, tmp, - &dev->od_ost_map->om_seq_list, - oos_seq_list) { - osd_seq_free(map, osd_seq); - } - write_unlock(&dev->od_ost_map->om_seq_list_lock); - if (map->om_root) - dput(map->om_root); - OBD_FREE_PTR(dev->od_ost_map); - dev->od_ost_map = NULL; - EXIT; + osd_ost_fini(dev); + osd_mdt_fini(dev); } static int osd_obj_del_entry(struct osd_thread_info *info, @@ -257,26 +382,25 @@ static int osd_obj_del_entry(struct osd_thread_info *info, struct dentry *dird, char *name, struct thandle *th) { - struct ldiskfs_dir_entry_2 *de; - struct buffer_head *bh; - struct osd_thandle *oh; - struct dentry *child; - struct inode *dir = dird->d_inode; - int rc; - - ENTRY; + struct ldiskfs_dir_entry_2 *de; + struct buffer_head *bh; + struct osd_thandle *oh; + struct dentry *child; + struct inode *dir = dird->d_inode; + int rc; + ENTRY; - oh = container_of(th, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle != NULL); - LASSERT(oh->ot_handle->h_transaction != NULL); + oh = container_of(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle != NULL); + LASSERT(oh->ot_handle->h_transaction != NULL); - child = &info->oti_child_dentry; - child->d_name.hash = 0; - child->d_name.name = name; - child->d_name.len = strlen(name); - child->d_parent = dird; - child->d_inode = NULL; + child = &info->oti_child_dentry; + child->d_name.hash = 0; + child->d_name.name = name; + child->d_name.len = strlen(name); + child->d_parent = dird; + child->d_inode = NULL; ll_vfs_dq_init(dir); mutex_lock(&dir->i_mutex); diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index af3f674..b322eeb 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -1546,8 +1546,6 @@ static int osd_inode_setattr(const struct lu_env *env, bits = attr->la_valid; - LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */ - if (bits & LA_ATIME) inode->i_atime = *osd_inode_time(env, inode, attr->la_atime); if (bits & LA_CTIME) @@ -1976,7 +1974,7 @@ int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd, rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range); if (rc != 0) { CERROR("%s can not find "DFID": rc = %d\n", - osd2lu_dev(osd)->ld_obd->obd_name, PFID(fid), rc); + osd_name(osd), PFID(fid), rc); } return rc; } @@ -2142,8 +2140,15 @@ static int osd_object_destroy(const struct lu_env *env, * lock contention. So it will not affect unlink performance. */ mutex_lock(&inode->i_mutex); if (S_ISDIR(inode->i_mode)) { - LASSERT(osd_inode_unlinked(inode) || - inode->i_nlink == 1); + LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1); + /* it will check/delete the agent inode for every dir + * destory, how to optimize it? unlink performance + * impaction XXX */ + result = osd_delete_agent_inode(env, osd, obj, oh); + if (result != 0 && result != -ENOENT) { + CERROR("%s: delete agent inode "DFID": rc = %d\n", + osd_name(osd), PFID(fid), result); + } spin_lock(&obj->oo_guard); clear_nlink(inode); spin_unlock(&obj->oo_guard); @@ -2251,6 +2256,101 @@ static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj, RETURN(0); } +static int osd_add_dot_dotdot_internal(struct osd_thread_info *info, + struct inode *dir, + struct inode *parent_dir, + const struct dt_rec *dot_fid, + const struct dt_rec *dot_dot_fid, + struct osd_thandle *oth) +{ + struct ldiskfs_dentry_param *dot_ldp; + struct ldiskfs_dentry_param *dot_dot_ldp; + + dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2; + osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid); + + dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; + dot_ldp->edp_magic = 0; + return ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, + dir, dot_ldp, dot_dot_ldp); +} + +/** + * Create an local inode for remote entry + */ +static struct inode *osd_create_remote_inode(const struct lu_env *env, + struct osd_device *osd, + struct osd_object *pobj, + const struct lu_fid *fid, + struct thandle *th) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct inode *local; + struct osd_thandle *oh; + int rc; + ENTRY; + + LASSERT(th); + oh = container_of(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle->h_transaction != NULL); + +#ifdef HAVE_QUOTA_SUPPORT + osd_push_ctxt(info->oti_env, save); +#endif + /* FIXME: Insert index api needs to know the mode of + * the remote object. Just use S_IFDIR for now */ + local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR); +#ifdef HAVE_QUOTA_SUPPORT + osd_pop_ctxt(info->oti_env, save); +#endif + if (IS_ERR(local)) { + CERROR("%s: create local error %d\n", osd_name(osd), + (int)PTR_ERR(local)); + RETURN(local); + } + + rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode, + (const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu), + (const struct dt_rec *)fid, oh); + if (rc != 0) { + CERROR("%s: "DFID" add dot dotdot error: rc = %d\n", + osd_name(osd), PFID(fid), rc); + RETURN(ERR_PTR(rc)); + } + + RETURN(local); +} + +/** + * Delete local inode for remote entry + */ +static int osd_delete_remote_inode(const struct lu_env *env, + struct osd_device *osd, + const struct lu_fid *fid, + __u32 ino, struct osd_thandle *oh) +{ + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_inode_id *id = &oti->oti_id; + struct inode *inode; + ENTRY; + + id->oii_ino = le32_to_cpu(ino); + id->oii_gen = OSD_OII_NOGEN; + inode = osd_iget(oti, osd, id); + if (IS_ERR(inode)) { + CERROR("%s: iget error "DFID" id %u:%u\n", osd_name(osd), + PFID(fid), id->oii_ino, id->oii_gen); + RETURN(PTR_ERR(inode)); + } + + clear_nlink(inode); + mark_inode_dirty(inode); + CDEBUG(D_INODE, "%s: delete remote inode "DFID" %lu\n", + osd_name(osd), PFID(fid), inode->i_ino); + iput(inode); + RETURN(0); +} + /** * OSD layer object create function for interoperability mode (b11826). * This is mostly similar to osd_object_create(). Only difference being, fid is @@ -3040,6 +3140,27 @@ static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de, RETURN(rc); } +static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, + struct lu_fid *fid) +{ + struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range; + struct seq_server_site *ss = osd_seq_site(osd); + int rc; + ENTRY; + + if ((!fid_is_norm(fid) && !fid_is_igif(fid)) || ss == NULL) + RETURN(0); + + rc = osd_fld_lookup(env, osd, fid, range); + if (rc != 0) { + CERROR("%s: Can not lookup fld for "DFID"\n", + osd_name(osd), PFID(fid)); + RETURN(rc); + } + + RETURN(ss->ss_node_id != range->lsr_index); +} + /** * Index delete function for interoperability mode (b11826). * It will remove the directory entry added by osd_index_ea_insert(). @@ -3059,11 +3180,12 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, struct inode *dir = obj->oo_inode; struct dentry *dentry; struct osd_thandle *oh; - struct ldiskfs_dir_entry_2 *de; + struct ldiskfs_dir_entry_2 *de = NULL; struct buffer_head *bh; struct htree_lock *hlock = NULL; - int rc; - + struct lu_fid *fid = &osd_oti_get(env)->oti_fid; + struct osd_device *osd = osd_dev(dt->do_lu.lo_dev); + int rc; ENTRY; LINVRNT(osd_invariant(obj)); @@ -3104,6 +3226,24 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, else up_write(&obj->oo_ext_idx_sem); + if (rc != 0) + GOTO(out, rc); + + LASSERT(de != NULL); + rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid); + if (rc == 0 && osd_remote_fid(env, osd, fid)) { + __u32 ino = le32_to_cpu(de->inode); + + rc = osd_delete_remote_inode(env, osd, fid, ino, oh); + if (rc != 0) + CERROR("%s: del local inode "DFID": rc = %d\n", + osd_name(osd), PFID(fid), rc); + } else { + if (rc == -ENODATA) + rc = 0; + } +out: + LASSERT(osd_invariant(obj)); RETURN(rc); } @@ -3319,8 +3459,6 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, struct thandle *th) { struct inode *inode = dir->oo_inode; - struct ldiskfs_dentry_param *dot_ldp; - struct ldiskfs_dentry_param *dot_dot_ldp; struct osd_thandle *oth; int result = 0; @@ -3336,21 +3474,18 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, dir->oo_compat_dot_created = 1; result = 0; } - } else if(strcmp(name, dotdot) == 0) { + } else if (strcmp(name, dotdot) == 0) { if (!dir->oo_compat_dot_created) return -EINVAL; - - dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2; - osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid); /* in case of rename, dotdot is already created */ - if (dir->oo_compat_dotdot_created) + if (dir->oo_compat_dotdot_created) { return __osd_ea_add_rec(info, dir, parent_dir, name, dot_dot_fid, NULL, th); + } - dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; - dot_ldp->edp_magic = 0; - result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, - inode, dot_ldp, dot_dot_ldp); + result = osd_add_dot_dotdot_internal(info, dir->oo_inode, + parent_dir, dot_fid, + dot_dot_fid, oth); if (result == 0) dir->oo_compat_dotdot_created = 1; } @@ -3489,6 +3624,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock); if (bh) { struct osd_thread_info *oti = osd_oti_get(env); + struct osd_inode_id *id = &oti->oti_id; struct osd_idmap_cache *oic = &oti->oti_cache; struct osd_device *dev = osd_obj2dev(obj); struct osd_scrub *scrub = &dev->od_scrub; @@ -3500,14 +3636,15 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, /* done with de, release bh */ brelse(bh); if (rc != 0) - rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid); + rc = osd_ea_fid_get(env, obj, ino, fid, id); else - osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN); - if (rc != 0) { + osd_id_gen(id, ino, OSD_OII_NOGEN); + if (rc != 0 || osd_remote_fid(env, dev, fid)) { fid_zero(&oic->oic_fid); GOTO(out, rc); } + oic->oic_lid = *id; oic->oic_fid = *fid; if ((scrub->os_pos_current <= ino) && (sf->sf_flags & SF_INCONSISTENT || @@ -3602,7 +3739,7 @@ static int osd_index_declare_ea_insert(const struct lu_env *env, struct thandle *handle) { struct osd_thandle *oh; - struct inode *inode; + struct osd_device *osd = osd_dev(dt->do_lu.lo_dev); struct lu_fid *fid = (struct lu_fid *)rec; int rc; ENTRY; @@ -3616,24 +3753,42 @@ static int osd_index_declare_ea_insert(const struct lu_env *env, osd_trans_declare_op(env, oh, OSD_OT_INSERT, osd_dto_credits_noquota[DTO_INDEX_INSERT]); - inode = osd_dt_obj(dt)->oo_inode; - LASSERT(inode); + if (osd_dt_obj(dt)->oo_inode == NULL) { + const char *name = (const char *)key; + /* Object is not being created yet. Only happens when + * 1. declare directory create + * 2. declare insert . + * 3. declare insert .. + */ + LASSERT(strcmp(name, dotdot) == 0 || strcmp(name, dot) == 0); + } else { + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + /* We ignore block quota on meta pool (MDTs), so needn't + * calculate how many blocks will be consumed by this index + * insert */ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, + oh, true, true, NULL, false); + } - /* We ignore block quota on meta pool (MDTs), so needn't - * calculate how many blocks will be consumed by this index - * insert */ - rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, - true, true, NULL, false); if (fid == NULL) RETURN(0); - /* It does fld look up inside declare, and the result will be - * added to fld cache, so the following fld lookup inside insert - * does not need send RPC anymore, so avoid send rpc with holding - * transaction */ - LASSERTF(fid_is_sane(fid), "fid is insane"DFID"\n", PFID(fid)); - osd_fld_lookup(env, osd_dt_dev(handle->th_dev), fid, - &osd_oti_get(env)->oti_seq_range); + rc = osd_remote_fid(env, osd, fid); + if (rc <= 0) + RETURN(rc); + +#ifdef OSD_DECLARE_OP +#define OSD_OT_CREATE create +#define OSD_OT_INSERT insert +#define osd_trans_declare_op(env, oh, op, cred) OSD_DECLARE_OP(oh, op, cred) +#endif + osd_trans_declare_op(env, oh, OSD_OT_CREATE, + osd_dto_credits_noquota[DTO_OBJECT_CREATE]); + osd_trans_declare_op(env, oh, OSD_OT_INSERT, + osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1); + osd_trans_declare_op(env, oh, OSD_OT_INSERT, + osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1); RETURN(rc); } @@ -3654,13 +3809,16 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *th, struct lustre_capa *capa, int ignore_quota) { - struct osd_object *obj = osd_dt_obj(dt); - struct lu_fid *fid = (struct lu_fid *) rec; - const char *name = (const char *)key; - struct osd_object *child; - int rc; - - ENTRY; + struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_dev(dt->do_lu.lo_dev); + struct lu_fid *fid = (struct lu_fid *) rec; + const char *name = (const char *)key; + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_inode_id *id = &oti->oti_id; + struct inode *child_inode = NULL; + struct osd_object *child = NULL; + int rc; + ENTRY; LASSERT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); @@ -3671,16 +3829,59 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) RETURN(-EACCES); - child = osd_object_find(env, dt, fid); - if (!IS_ERR(child)) { - rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th); - osd_object_put(env, child); - } else { - rc = PTR_ERR(child); - } + LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid)); - LASSERT(osd_invariant(obj)); - RETURN(rc); + rc = osd_remote_fid(env, osd, fid); + if (rc < 0) { + CERROR("%s: Can not find object "DFID" rc %d\n", + osd_name(osd), PFID(fid), rc); + RETURN(rc); + } + + if (rc == 1) { + /* Insert remote entry */ + if (strcmp(name, dotdot) == 0 && strlen(name) == 2) { + struct osd_mdobj_map *omm = osd->od_mdt_map; + struct osd_thandle *oh; + + /* If parent on remote MDT, we need put this object + * under AGENT */ + oh = container_of(th, typeof(*oh), ot_super); + rc = osd_create_agent_inode(env, osd, obj, oh); + if (rc != 0) { + CERROR("%s: add agent "DFID" error: rc = %d\n", + osd_name(osd), + PFID(lu_object_fid(&dt->do_lu)), rc); + RETURN(rc); + } + + child_inode = igrab(omm->omm_agent_dentry->d_inode); + } else { + child_inode = osd_create_remote_inode(env, osd, obj, + fid, th); + if (IS_ERR(child_inode)) + RETURN(PTR_ERR(child_inode)); + } + } else { + /* Insert local entry */ + child = osd_object_find(env, dt, fid); + if (IS_ERR(child)) { + CERROR("%s: Can not find object "DFID"%u:%u: rc = %d\n", + osd_name(osd), PFID(fid), + id->oii_ino, id->oii_gen, + (int)PTR_ERR(child_inode)); + RETURN(PTR_ERR(child_inode)); + } + child_inode = igrab(child->oo_inode); + } + + rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th); + + iput(child_inode); + if (child != NULL) + osd_object_put(env, child); + LASSERT(osd_invariant(obj)); + RETURN(rc); } /** @@ -3763,7 +3964,6 @@ static int osd_it_iam_get(const struct lu_env *env, * * \param di osd iterator */ - static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di) { struct osd_it_iam *it = (struct osd_it_iam *)di; @@ -4249,6 +4449,7 @@ static inline int osd_it_ea_rec(const struct lu_env *env, struct osd_scrub *scrub = &dev->od_scrub; struct scrub_file *sf = &scrub->os_file; struct osd_thread_info *oti = osd_oti_get(env); + struct osd_inode_id *id = &oti->oti_id; struct osd_idmap_cache *oic = &oti->oti_cache; struct lu_fid *fid = &it->oie_dirent->oied_fid; struct lu_dirent *lde = (struct lu_dirent *)dtrec; @@ -4257,19 +4458,24 @@ static inline int osd_it_ea_rec(const struct lu_env *env, ENTRY; if (!fid_is_sane(fid)) { - rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid); + rc = osd_ea_fid_get(env, obj, ino, fid, id); if (rc != 0) { fid_zero(&oic->oic_fid); RETURN(rc); } } else { - osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN); + osd_id_gen(id, ino, OSD_OII_NOGEN); } osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off, it->oie_dirent->oied_name, it->oie_dirent->oied_namelen, it->oie_dirent->oied_type, attr); + + if (osd_remote_fid(env, dev, fid)) + RETURN(0); + + oic->oic_lid = *id; oic->oic_fid = *fid; if ((scrub->os_pos_current <= ino) && (sf->sf_flags & SF_INCONSISTENT || diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index e210f74..01ffd80 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -169,6 +169,16 @@ struct osd_obj_map { struct semaphore om_dir_init_sem; }; +struct osd_mdobj { + struct dentry *om_root; /* AGENT/ */ + obd_seq om_index; /* mdt index */ + cfs_list_t om_list; /* list to omm_list */ +}; + +struct osd_mdobj_map { + struct dentry *omm_agent_dentry; +}; + #define osd_ldiskfs_find_entry(dir, dentry, de, lock) \ ll_ldiskfs_find_entry(dir, dentry, de, lock) #define osd_ldiskfs_add_entry(handle, child, cinode, hlock) \ @@ -265,6 +275,7 @@ struct osd_device { struct lu_site od_site; struct osd_obj_map *od_ost_map; + struct osd_mdobj_map *od_mdt_map; unsigned long long od_readcache_max_filesize; int od_read_cache; @@ -591,6 +602,7 @@ struct osd_thread_info { bool oti_rollback; #endif + char oti_name[48]; }; extern int ldiskfs_pdo; @@ -643,6 +655,15 @@ int osd_scrub_dump(struct osd_device *dev, char *buf, int len); int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd, const struct lu_fid *fid, struct lu_seq_range *range); +struct dentry *osd_agent_lookup(struct osd_mdobj_map *omm, int index); +struct dentry *osd_agent_load(const struct osd_device *osd, int mdt_index, + int create); + +int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd, + struct osd_object *obj, struct osd_thandle *oh); +int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd, + struct osd_object *obj, struct osd_thandle *oh); + /* osd_quota_fmt.c */ int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj, int type, uint blk, int depth, uint index, -- 1.8.3.1