From 0279c4de77c351f9fe8c31f51e93f4a29d0fb2c0 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 21 Feb 2009 18:35:17 +0000 Subject: [PATCH] - zero-copy IO methods in ldiskfs OSD - ldiskfs OSD creates object index (fid-to-inode mapping) using internal means --- lustre/osd/osd_handler.c | 328 +++++----------------- lustre/osd/osd_internal.h | 130 +++++++++ lustre/osd/osd_io.c | 689 ++++++++++++++++++++++++++++++++++++++++++++++ lustre/osd/osd_oi.c | 108 ++++++-- lustre/osd/osd_oi.h | 7 +- 5 files changed, 987 insertions(+), 275 deletions(-) create mode 100644 lustre/osd/osd_io.c diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 7950a3e..e17fb7e 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -89,40 +89,6 @@ static const char dot[] = "."; static const char dotdot[] = ".."; static const char remote_obj_dir[] = "REM_OBJ_DIR"; -struct osd_directory { - struct iam_container od_container; - struct iam_descr od_descr; - struct semaphore od_sem; -}; - -struct osd_object { - struct dt_object oo_dt; - /** - * Inode for file system object represented by this osd_object. This - * inode is pinned for the whole duration of lu_object life. - * - * Not modified concurrently (either setup early during object - * creation, or assigned by osd_object_create() under write lock). - */ - struct inode *oo_inode; - struct rw_semaphore oo_sem; - struct osd_directory *oo_dir; - /** protects inode attributes. */ - spinlock_t oo_guard; - /** - * Following two members are used to indicate the presence of dot and - * dotdot in the given directory. This is required for interop mode - * (b11826). 
- */ - int oo_compat_dot_created; - int oo_compat_dotdot_created; - - const struct lu_env *oo_owner; -#ifdef CONFIG_LOCKDEP - struct lockdep_map oo_dep_map; -#endif -}; - static int osd_root_get (const struct lu_env *env, struct dt_device *dev, struct lu_fid *f); @@ -237,9 +203,6 @@ static struct lu_device *osd_device_alloc (const struct lu_env *env, static struct lu_object *osd_object_alloc (const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *d); -static struct inode *osd_iget (struct osd_thread_info *info, - struct osd_device *dev, - const struct osd_inode_id *id); static struct super_block *osd_sb (const struct osd_device *dev); static struct dt_it *osd_it_iam_init (const struct lu_env *env, struct dt_object *dt, @@ -277,71 +240,14 @@ static struct lu_device_type osd_device_type; static const struct lu_object_operations osd_lu_obj_ops; static struct obd_ops osd_obd_device_ops; static const struct lu_device_operations osd_lu_ops; -static struct lu_context_key osd_key; +extern struct lu_context_key osd_key; static const struct dt_object_operations osd_obj_ops; static const struct dt_object_operations osd_obj_ea_ops; -static const struct dt_body_operations osd_body_ops; -static const struct dt_body_operations osd_body_ops_new; +extern const struct dt_body_operations osd_body_ops; +extern const struct dt_body_operations osd_body_ops_new; static const struct dt_index_operations osd_index_iam_ops; static const struct dt_index_operations osd_index_ea_ops; -/** - * Basic transaction credit op - */ -enum dt_txn_op { - DTO_INDEX_INSERT, - DTO_INDEX_DELETE, - DTO_IDNEX_UPDATE, - DTO_OBJECT_CREATE, - DTO_OBJECT_DELETE, - DTO_ATTR_SET_BASE, - DTO_XATTR_SET, - DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. 
*/ - DTO_WRITE_BASE, - DTO_WRITE_BLOCK, - DTO_ATTR_SET_CHOWN, - - DTO_NR -}; - -static const int osd_dto_credits_noquota[DTO_NR]; - -#define OSD_TRACK_DECLARES -#ifdef OSD_TRACK_DECLARES -#define OSD_DECLARE_OP(oh,op) { \ - LASSERT(oh->ot_handle == NULL); \ - ((oh)->ot_declare_ ##op)++;} -#define OSD_EXEC_OP(handle,op) { \ - struct osd_thandle *oh; \ - oh = container_of0(handle, struct osd_thandle, ot_super);\ - LASSERT((oh)->ot_declare_ ##op > 0); \ - ((oh)->ot_declare_ ##op)--;} -#else -#define OSD_DECLARE_OP(oh,op) -#define OSD_EXEC_OP(oh,op) -#endif - -struct osd_thandle { - struct thandle ot_super; - handle_t *ot_handle; - struct journal_callback ot_jcb; - /* Link to the device, for debugging. */ - struct lu_ref_link *ot_dev_link; - int ot_credits; -#ifdef OSD_TRACK_DECLARES - int ot_declare_attr_set; - int ot_declare_punch; - int ot_declare_xattr_set; - int ot_declare_xattr_del; - int ot_declare_create; - int ot_declare_ref_add; - int ot_declare_ref_del; - int ot_declare_write; - int ot_declare_insert; - int ot_declare_delete; -#endif -}; - #ifdef HAVE_QUOTA_SUPPORT static inline void osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save) @@ -367,6 +273,16 @@ osd_pop_ctxt(struct osd_ctxt *save) } #endif +static struct super_block *osd_sb(const struct osd_device *dev) +{ + return dev->od_mount->lmi_mnt->mnt_sb; +} + +static journal_t *osd_journal(const struct osd_device *dev) +{ + return LDISKFS_SB(osd_sb(dev))->s_journal; +} + /* * Invariants, assertions. */ @@ -981,7 +897,7 @@ static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d, * Note: we do not count into QUOTA here. * If we mount with --data_journal we may need more. */ -static const int osd_dto_credits_noquota[DTO_NR] = { +const int osd_dto_credits_noquota[DTO_NR] = { /** * Insert/Delete. 
* INDEX_EXTRA_TRANS_BLOCKS(8) + @@ -1229,8 +1145,8 @@ static int capa_is_sane(const struct lu_env *env, RETURN(0); } -static int osd_object_auth(const struct lu_env *env, struct dt_object *dt, - struct lustre_capa *capa, __u64 opc) +int osd_object_auth(const struct lu_env *env, struct dt_object *dt, + struct lustre_capa *capa, __u64 opc) { const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_device *dev = osd_dev(dt->do_lu.lo_dev); @@ -1442,17 +1358,6 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj, return 0; } -extern struct inode *ldiskfs_create_inode(handle_t *handle, - struct inode * dir, int mode); -extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry, - struct inode *inode); -extern int ldiskfs_delete_entry(handle_t *handle, - struct inode * dir, - struct ldiskfs_dir_entry_2 * de_del, - struct buffer_head * bh); -extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry, - struct ldiskfs_dir_entry_2 - ** res_dir); extern int ldiskfs_add_dot_dotdot(handle_t *handle, struct inode *dir, struct inode *inode); @@ -1461,17 +1366,17 @@ extern int ldiskfs_xattr_set_handle(handle_t *handle, struct inode *inode, const void *value, size_t value_len, int flags); -static struct dentry * osd_child_dentry_get(const struct lu_env *env, - struct osd_object *obj, - const char *name, - const int namelen) +struct dentry * osd_child_dentry_by_inode(const struct lu_env *env, + struct inode *inode, + const char *name, + const int namelen) { struct osd_thread_info *info = osd_oti_get(env); struct dentry *child_dentry = &info->oti_child_dentry; struct dentry *obj_dentry = &info->oti_obj_dentry; - obj_dentry->d_inode = obj->oo_inode; - obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); + obj_dentry->d_inode = inode; + obj_dentry->d_sb = inode->i_sb; obj_dentry->d_name.hash = 0; child_dentry->d_name.hash = 0; @@ -1481,6 +1386,13 @@ static struct dentry * osd_child_dentry_get(const struct lu_env *env, return 
child_dentry; } +struct dentry * osd_child_dentry_get(const struct lu_env *env, + struct osd_object *obj, + const char *name, + const int namelen) +{ + return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen); +} static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, umode_t mode, @@ -1490,7 +1402,7 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, int result; struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oth; - struct dt_object *parent; + struct dt_object *parent = NULL; struct inode *inode; #ifdef HAVE_QUOTA_SUPPORT struct osd_ctxt *save = &info->oti_ctxt; @@ -1507,14 +1419,24 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, else parent = osd->od_obj_area; +#if 0 LASSERT(parent != NULL); LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL); +#else + /* obdfilter can't create directory yet + * i think it's responsibility of osd to + * create all internal objects/dirs by itself + * -bzzzz */ +#endif #ifdef HAVE_QUOTA_SUPPORT osd_push_ctxt(info->oti_env, save); #endif inode = ldiskfs_create_inode(oth->ot_handle, - osd_dt_obj(parent)->oo_inode, mode); + parent ? 
+ osd_dt_obj(parent)->oo_inode : + osd_sb(osd)->s_root->d_inode, + mode); #ifdef HAVE_QUOTA_SUPPORT osd_pop_ctxt(save); #endif @@ -1528,11 +1450,6 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, } -extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize, - int recsize, handle_t *handle); - -extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize, - int recsize, handle_t *handle); enum { @@ -1709,6 +1626,8 @@ static int __osd_object_create(struct osd_thread_info *info, int result; + LASSERT(dof); + result = osd_create_pre(info, obj, attr, th); if (result == 0) { result = osd_create_type_f(dof->dof_type)(info, obj, @@ -1730,16 +1649,20 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj, struct osd_thread_info *info = osd_oti_get(env); struct osd_inode_id *id = &info->oti_id; struct osd_device *osd = osd_obj2dev(obj); +#if 0 struct md_ucred *uc = md_ucred(env); + LASSERT(uc != NULL); +#else + struct md_ucred *uc = NULL; +#endif LASSERT(obj->oo_inode != NULL); - LASSERT(uc != NULL); id->oii_ino = obj->oo_inode->i_ino; id->oii_gen = obj->oo_inode->i_generation; return osd_oi_insert(info, &osd->od_oi, fid, id, th, - uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); + uc ? uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK : 1); } static int osd_declare_object_create(const struct lu_env *env, @@ -2355,102 +2278,6 @@ static const struct dt_object_operations osd_obj_ea_ops = { }; /* - * Body operations. - */ - -/* - * XXX: Another layering violation for now. - * - * We don't want to use ->f_op->read methods, because generic file write - * - * - serializes on ->i_sem, and - * - * - does a lot of extra work like balance_dirty_pages(), - * - * which doesn't work for globally shared files like /last-received. 
- */ -int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs); -int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize, - loff_t *offs, handle_t *handle); - -static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos, - struct lustre_capa *capa) -{ - struct inode *inode = osd_dt_obj(dt)->oo_inode; - - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) - RETURN(-EACCES); - - return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos); -} - -static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, - const loff_t size, loff_t pos, - struct thandle *handle, struct lustre_capa *capa) -{ - struct osd_thandle *oh; - - LASSERT(handle != NULL); - - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE)) - return -EACCES; - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); - - OSD_DECLARE_OP(oh, write); - oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BLOCK]; - - return 0; -} - -static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, int ignore_quota) -{ - struct inode *inode = osd_dt_obj(dt)->oo_inode; - struct osd_thandle *oh; - ssize_t result; -#ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = current->cap_effective; -#endif - - LASSERT(handle != NULL); - OSD_EXEC_OP(handle, write); - - oh = container_of(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle->h_transaction != NULL); -#ifdef HAVE_QUOTA_SUPPORT - if (ignore_quota) - current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; - else - current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; -#endif - result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len, - pos, oh->ot_handle); -#ifdef HAVE_QUOTA_SUPPORT - current->cap_effective = save; -#endif - if (result == 0) - result = buf->lb_len; - return result; -} - -/* - * in some cases we may need 
declare methods for objects being created - * e.g., when we create symlink - */ -static const struct dt_body_operations osd_body_ops_new = { - .dbo_declare_write = osd_declare_write, -}; - -static const struct dt_body_operations osd_body_ops = { - .dbo_read = osd_read, - .dbo_declare_write = osd_declare_write, - .dbo_write = osd_write -}; - -/* * Index operations. */ @@ -3691,7 +3518,7 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, /* type constructor/destructor: osd_type_init, osd_type_fini */ LU_TYPE_INIT_FINI(osd, &osd_key); -static struct lu_context_key osd_key = { +struct lu_context_key osd_key = { .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD, .lct_init = osd_key_init, .lct_fini = osd_key_fini, @@ -3732,7 +3559,7 @@ static int osd_device_init(const struct lu_env *env, struct lu_device *d, /* context for commit hooks */ ctx = &osd_dev(d)->od_env_for_commit.le_ctx; - rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF); + rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_DT_THREAD|LCT_REMEMBER|LCT_NOREF); if (rc == 0) { rc = osd_procfs_init(osd_dev(d), name); ctx->lc_cookie = 0x3; @@ -3782,6 +3609,9 @@ static int osd_mount(const struct lu_env *env, lsi = s2lsi(lmi->lmi_sb); ldd = lsi->lsi_ldd; + o->od_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); + LASSERT(o->od_fsops); + if (ldd->ldd_flags & LDD_F_IAM_DIR) { o->od_iop_mode = 0; LCONSOLE_WARN("OSD: IAM mode enabled\n"); @@ -3910,8 +3740,7 @@ static int osd_prepare(const struct lu_env *env, ENTRY; /* 1. initialize oi before any file create or file open */ - result = osd_oi_init(oti, &osd->od_oi, - &osd->od_dt_dev, lu2md_dev(pdev)); + result = osd_oi_init(oti, &osd->od_oi, osd); if (result != 0) RETURN(result); @@ -3919,29 +3748,32 @@ static int osd_prepare(const struct lu_env *env, lsi = s2lsi(lmi->lmi_sb); ldd = lsi->lsi_ldd; - /* 2. setup local objects */ - result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev)); - if (result) - goto out; - - /* 3. 
open remote object dir */ - d = dt_store_open(env, lu2dt_dev(dev), "", - remote_obj_dir, &oti->oti_fid); - if (!IS_ERR(d)) { - osd->od_obj_area = d; - result = 0; - } else { - result = PTR_ERR(d); - osd->od_obj_area = NULL; + if (lu_device_is_md(pdev)) { + /* 2. setup local objects */ + result = llo_local_objects_setup(env, lu2md_dev(pdev), + lu2dt_dev(dev)); + if (result) + goto out; + + /* 3. open remote object dir */ + d = dt_store_open(env, lu2dt_dev(dev), "", + remote_obj_dir, &oti->oti_fid); + if (!IS_ERR(d)) { + osd->od_obj_area = d; + result = 0; + } else { + result = PTR_ERR(d); + osd->od_obj_area = NULL; + } } + out: RETURN(result); } -static struct inode *osd_iget(struct osd_thread_info *info, - struct osd_device *dev, - const struct osd_inode_id *id) +struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev, + const struct osd_inode_id *id) { struct inode *inode; @@ -4086,16 +3918,6 @@ static struct lu_device *osd2lu_dev(struct osd_device *osd) return &osd->od_dt_dev.dd_lu_dev; } -static struct super_block *osd_sb(const struct osd_device *dev) -{ - return dev->od_mount->lmi_mnt->mnt_sb; -} - -static journal_t *osd_journal(const struct osd_device *dev) -{ - return LDISKFS_SB(osd_sb(dev))->s_journal; -} - static int osd_has_index(const struct osd_object *obj) { return obj->oo_dt.do_index_ops != NULL; diff --git a/lustre/osd/osd_internal.h b/lustre/osd/osd_internal.h index 52a165d..8056971e 100644 --- a/lustre/osd/osd_internal.h +++ b/lustre/osd/osd_internal.h @@ -63,6 +63,8 @@ #include #include +#include + #include #include "osd_oi.h" @@ -79,6 +81,97 @@ struct osd_ctxt { }; #endif +#define OSD_TRACK_DECLARES +#ifdef OSD_TRACK_DECLARES +#define OSD_DECLARE_OP(oh,op) { \ + LASSERT(oh->ot_handle == NULL); \ + ((oh)->ot_declare_ ##op)++;} +#define OSD_EXEC_OP(handle,op) { \ + struct osd_thandle *oh; \ + oh = container_of0(handle, struct osd_thandle, ot_super);\ + LASSERT((oh)->ot_declare_ ##op > 0); \ + ((oh)->ot_declare_ ##op)--;} +#else 
+#define OSD_DECLARE_OP(oh,op) +#define OSD_EXEC_OP(oh,op) +#endif + +struct osd_thandle { + struct thandle ot_super; + handle_t *ot_handle; + struct journal_callback ot_jcb; + /* Link to the device, for debugging. */ + struct lu_ref_link *ot_dev_link; + int ot_credits; +#ifdef OSD_TRACK_DECLARES + int ot_declare_attr_set; + int ot_declare_punch; + int ot_declare_xattr_set; + int ot_declare_xattr_del; + int ot_declare_create; + int ot_declare_ref_add; + int ot_declare_ref_del; + int ot_declare_write; + int ot_declare_insert; + int ot_declare_delete; +#endif +}; + +/** + * Basic transaction credit op + */ +enum dt_txn_op { + DTO_INDEX_INSERT, + DTO_INDEX_DELETE, + DTO_IDNEX_UPDATE, + DTO_OBJECT_CREATE, + DTO_OBJECT_DELETE, + DTO_ATTR_SET_BASE, + DTO_XATTR_SET, + DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */ + DTO_WRITE_BASE, + DTO_WRITE_BLOCK, + DTO_ATTR_SET_CHOWN, + + DTO_NR +}; + +extern const int osd_dto_credits_noquota[DTO_NR]; + +struct osd_directory { + struct iam_container od_container; + struct iam_descr od_descr; + struct semaphore od_sem; +}; + +struct osd_object { + struct dt_object oo_dt; + /** + * Inode for file system object represented by this osd_object. This + * inode is pinned for the whole duration of lu_object life. + * + * Not modified concurrently (either setup early during object + * creation, or assigned by osd_object_create() under write lock). + */ + struct inode *oo_inode; + struct rw_semaphore oo_sem; + struct osd_directory *oo_dir; + /** protects inode attributes. */ + spinlock_t oo_guard; + /** + * Following two members are used to indicate the presence of dot and + * dotdot in the given directory. This is required for interop mode + * (b11826). + */ + int oo_compat_dot_created; + int oo_compat_dotdot_created; + + const struct lu_env *oo_owner; +#ifdef CONFIG_LOCKDEP + struct lockdep_map oo_dep_map; +#endif +}; + /* * osd device. 
*/ @@ -126,6 +219,8 @@ struct osd_device { * It will be initialized, using mount param. */ __u32 od_iop_mode; + + struct fsfilt_operations *od_fsops; }; /** @@ -156,6 +251,19 @@ struct osd_it_iam { struct iam_iterator oi_it; }; +#define MAX_BLOCKS_PER_PAGE (CFS_PAGE_SIZE / 512) + +struct filter_iobuf { + atomic_t dr_numreqs; /* number of reqs being processed */ + wait_queue_head_t dr_wait; + int dr_max_pages; + int dr_npages; + int dr_error; + struct page *dr_pages[PTLRPC_MAX_BRW_PAGES]; + unsigned long dr_blocks[PTLRPC_MAX_BRW_PAGES*MAX_BLOCKS_PER_PAGE]; + unsigned int dr_ignore_quota:1; +}; + struct osd_thread_info { const struct lu_env *oti_env; /** @@ -226,6 +334,9 @@ struct osd_thread_info { #ifdef HAVE_QUOTA_SUPPORT struct osd_ctxt oti_ctxt; #endif + + /** 0-copy IO */ + struct filter_iobuf oti_iobuf; }; #ifdef LPROCFS @@ -240,5 +351,24 @@ void osd_lprocfs_time_end(const struct lu_env *env, int osd_statfs(const struct lu_env *env, struct dt_device *dev, struct kstatfs *sfs); +struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev, + const struct osd_inode_id *id); +extern struct inode *ldiskfs_create_inode(handle_t *handle, + struct inode * dir, int mode); +extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize, + int recsize, handle_t *handle); + +extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize, + int recsize, handle_t *handle); +extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry, + struct inode *inode); +extern int ldiskfs_delete_entry(handle_t *handle, + struct inode * dir, + struct ldiskfs_dir_entry_2 * de_del, + struct buffer_head * bh); +extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry, + struct ldiskfs_dir_entry_2 + ** res_dir); + #endif /* __KERNEL__ */ #endif /* _OSD_INTERNAL_H */ diff --git a/lustre/osd/osd_io.c b/lustre/osd/osd_io.c new file mode 100644 index 0000000..47731b9 --- /dev/null +++ b/lustre/osd/osd_io.c @@ -0,0 +1,689 @@ +/* -*- mode: c; 
c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/osd/osd_io.c + * + * body operations + * + * Author: Nikita Danilov + * Author: Alex Zhuravlev + * + */ + +#include + +/* LUSTRE_VERSION_CODE */ +#include +/* prerequisite for linux/xattr.h */ +#include +/* prerequisite for linux/xattr.h */ +#include +/* + * XXX temporary stuff: direct access to ldiskfs/jdb. Interface between osd + * and file system is not yet specified. 
+ */ +/* handle_t, journal_start(), journal_stop() */ +#include +/* LDISKFS_SB() */ +#include +#include + +/* + * struct OBD_{ALLOC,FREE}*() + * OBD_FAIL_CHECK + */ +#include +/* struct ptlrpc_thread */ +#include + +/* fid_is_local() */ +#include +#include + +#include "osd_internal.h" + +static struct osd_object *osd_obj(const struct lu_object *o) +{ + return container_of0(o, struct osd_object, oo_dt.do_lu); +} + +static struct osd_object *osd_dt_obj(const struct dt_object *d) +{ + return osd_obj(&d->do_lu); +} + +static struct osd_device *osd_dt_dev(const struct dt_device *d) +{ + return container_of0(d, struct osd_device, od_dt_dev); +} + +static struct osd_device *osd_dev(const struct lu_device *d) +{ + return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev)); +} + +static struct osd_device *osd_obj2dev(const struct osd_object *o) +{ + return osd_dev(o->oo_dt.do_lu.lo_dev); +} + +extern struct lu_context_key osd_key; + +static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env) +{ + return lu_context_key_get(&env->le_ctx, &osd_key); +} + +int osd_object_auth(const struct lu_env *env, struct dt_object *dt, + struct lustre_capa *capa, __u64 opc); + +static void filter_init_iobuf(struct filter_iobuf *iobuf) +{ + + init_waitqueue_head(&iobuf->dr_wait); + atomic_set(&iobuf->dr_numreqs, 0); + iobuf->dr_max_pages = PTLRPC_MAX_BRW_PAGES; + iobuf->dr_npages = 0; + iobuf->dr_error = 0; +} + +static void filter_iobuf_add_page(struct filter_iobuf *iobuf, struct page *page) +{ + LASSERT(iobuf->dr_npages < iobuf->dr_max_pages); + iobuf->dr_pages[iobuf->dr_npages++] = page; +} + +static int dio_complete_routine(struct bio *bio, unsigned int done, int error) +{ + struct filter_iobuf *iobuf = bio->bi_private; + struct bio_vec *bvl; + int i; + + /* CAVEAT EMPTOR: possibly in IRQ context + * DO NOT record procfs stats here!!! */ + + if (bio->bi_size) /* Not complete */ + return 1; + + if (unlikely(iobuf == NULL)) { + CERROR("***** bio->bi_private is NULL! 
This should never " + "happen. Normally, I would crash here, but instead I " + "will dump the bio contents to the console. Please " + "report this to , along " + "with any interesting messages leading up to this point " + "(like SCSI errors, perhaps). Because bi_private is " + "NULL, I can't wake up the thread that initiated this " + "IO - you will probably have to reboot this node.\n"); + CERROR("bi_next: %p, bi_flags: %lx, bi_rw: %lu, bi_vcnt: %d, " + "bi_idx: %d, bi->size: %d, bi_end_io: %p, bi_cnt: %d, " + "bi_private: %p\n", bio->bi_next, bio->bi_flags, + bio->bi_rw, bio->bi_vcnt, bio->bi_idx, bio->bi_size, + bio->bi_end_io, atomic_read(&bio->bi_cnt), + bio->bi_private); + return 0; + } + + /* the check is outside of the cycle for performance reason -bzzz */ + if (!test_bit(BIO_RW, &bio->bi_rw)) { + bio_for_each_segment(bvl, bio, i) { + if (likely(error == 0)) + SetPageUptodate(bvl->bv_page); + LASSERT(PageLocked(bvl->bv_page)); + ClearPageConstant(bvl->bv_page); + } + } else { + if (mapping_cap_page_constant_write(iobuf->dr_pages[0]->mapping)){ + bio_for_each_segment(bvl, bio, i) { + ClearPageConstant(bvl->bv_page); + } + } + } + + /* any real error is good enough -bzzz */ + if (error != 0 && iobuf->dr_error == 0) + iobuf->dr_error = error; + + if (atomic_dec_and_test(&iobuf->dr_numreqs)) + wake_up(&iobuf->dr_wait); + + /* Completed bios used to be chained off iobuf->dr_bios and freed in + * filter_clear_dreq(). It was then possible to exhaust the biovec-256 + * mempool when serious on-disk fragmentation was encountered, + * deadlocking the OST. The bios are now released as soon as complete + * so the pool cannot be exhausted while IOs are competing. 
bug 10076 */ + bio_put(bio); + return 0; +} + +static void osd_submit_bio(int rw, struct bio *bio) +{ + LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw); + if (rw == OBD_BRW_READ) + submit_bio(READ, bio); + else + submit_bio(WRITE, bio); +} + +static int can_be_merged(struct bio *bio, sector_t sector) +{ + unsigned int size; + + if (!bio) + return 0; + + size = bio->bi_size >> 9; + return bio->bi_sector + size == sector ? 1 : 0; +} + +static int osd_do_bio(struct inode *inode, struct filter_iobuf *iobuf, int rw) +{ + int blocks_per_page = CFS_PAGE_SIZE >> inode->i_blkbits; + struct page **pages = iobuf->dr_pages; + int npages = iobuf->dr_npages; + unsigned long *blocks = iobuf->dr_blocks; + int total_blocks = npages * blocks_per_page; + int sector_bits = inode->i_sb->s_blocksize_bits - 9; + unsigned int blocksize = inode->i_sb->s_blocksize; + struct bio *bio = NULL; + int frags = 0; + struct page *page; + unsigned int page_offset; + sector_t sector; + int nblocks; + int block_idx; + int page_idx; + int i; + int rc = 0; + ENTRY; + + LASSERT(iobuf->dr_npages == npages); + + for (page_idx = 0, block_idx = 0; + page_idx < npages; + page_idx++, block_idx += blocks_per_page) { + + page = pages[page_idx]; + LASSERT (block_idx + blocks_per_page <= total_blocks); + + for (i = 0, page_offset = 0; + i < blocks_per_page; + i += nblocks, page_offset += blocksize * nblocks) { + + nblocks = 1; + + if (blocks[block_idx + i] == 0) { /* hole */ + LASSERTF(rw == OBD_BRW_READ, + "page_idx %u, block_idx %u, i %u\n", + page_idx, block_idx, i); + memset(kmap(page) + page_offset, 0, blocksize); + kunmap(page); + continue; + } + + sector = (sector_t)blocks[block_idx + i] << sector_bits; + + /* Additional contiguous file blocks? 
*/ + while (i + nblocks < blocks_per_page && + (sector + (nblocks << sector_bits)) == + ((sector_t)blocks[block_idx + i + nblocks] << + sector_bits)) + nblocks++; + + /* I set the page to be constant only if it + * is mapped to a contiguous underlying disk block(s). + * It will then make sure the corresponding device + * cache of raid5 will be overwritten by this page. + * - jay */ + if ((rw == OBD_BRW_WRITE) && + (nblocks == blocks_per_page) && + mapping_cap_page_constant_write(inode->i_mapping)) + SetPageConstant(page); + + if (bio != NULL && + can_be_merged(bio, sector) && + bio_add_page(bio, page, + blocksize * nblocks, page_offset) != 0) + continue; /* added this frag OK */ + + if (bio != NULL) { + request_queue_t *q = + bdev_get_queue(bio->bi_bdev); + + /* Dang! I have to fragment this I/O */ + CDEBUG(D_INODE, "bio++ sz %d vcnt %d(%d) " + "sectors %d(%d) psg %d(%d) hsg %d(%d)\n", + bio->bi_size, + bio->bi_vcnt, bio->bi_max_vecs, + bio->bi_size >> 9, q->max_sectors, + bio_phys_segments(q, bio), + q->max_phys_segments, + bio_hw_segments(q, bio), + q->max_hw_segments); + + atomic_inc(&iobuf->dr_numreqs); + osd_submit_bio(rw, bio); + frags++; + } + + /* allocate new bio, limited by max BIO size, b=9945 */ + bio = bio_alloc(GFP_NOIO, min(BIO_MAX_PAGES, + (npages - page_idx) * + blocks_per_page)); + if (bio == NULL) { + CERROR("Can't allocate bio %u*%u = %u pages\n", + (npages - page_idx), blocks_per_page, + (npages - page_idx) * blocks_per_page); + rc = -ENOMEM; + goto out; + } + + bio->bi_bdev = inode->i_sb->s_bdev; + bio->bi_sector = sector; + bio->bi_end_io = dio_complete_routine; + bio->bi_private = iobuf; + + rc = bio_add_page(bio, page, + blocksize * nblocks, page_offset); + LASSERT (rc != 0); + } + } + + if (bio != NULL) { + atomic_inc(&iobuf->dr_numreqs); + osd_submit_bio(rw, bio); + frags++; + rc = 0; + } + + out: + wait_event(iobuf->dr_wait, atomic_read(&iobuf->dr_numreqs) == 0); + + if (rc == 0) + rc = iobuf->dr_error; + RETURN(rc); +} + +static 
int osd_map_remote_to_local(loff_t offset, ssize_t len, int *nrpages,
+                            struct niobuf_local *res)
+{
+        /*
+         * Split the byte range [offset, offset + len) into per-page
+         * niobuf_local descriptors stored in res[]; the number of
+         * descriptors filled is returned through *nrpages.  Pages are not
+         * attached here (lb->page is left NULL).  Always returns 0.
+         */
+        struct niobuf_local *lb = res;
+        ENTRY;
+
+        *nrpages = 0;
+
+        while (len > 0) {
+                int poff = offset & (CFS_PAGE_SIZE - 1);
+                int plen = CFS_PAGE_SIZE - poff;
+
+                if (plen > len)
+                        plen = len;
+                lb->file_offset = offset;
+                lb->page_offset = poff;
+                lb->len = plen;
+                //lb->flags = rnb->flags;
+                lb->flags = 0;
+                lb->page = NULL;
+                lb->rc = 0;
+                lb->lnb_grant_used = 0;
+
+                /* format specifiers now match the (signed) argument types */
+                LASSERTF(plen <= len, "plen %d, len %ld\n", plen, (long)len);
+                offset += plen;
+                len -= plen;
+                lb++;
+                (*nrpages)++;
+        }
+
+        RETURN(0);
+}
+
+/**
+ * Map the byte range [pos, pos + len) of object \a d onto page cache pages.
+ *
+ * Every descriptor in \a l gets a page obtained with find_or_create_page(),
+ * i.e. locked and with a reference held; they are expected to be returned
+ * through osd_put_bufs().
+ *
+ * \retval >= 0    number of descriptors/pages set up in \a l
+ * \retval -ENOMEM a page could not be obtained; pages grabbed before the
+ *                 failure are unlocked and released again
+ */
+int osd_get_bufs(const struct lu_env *env, struct dt_object *d,
+                 loff_t pos, ssize_t len, struct niobuf_local *l)
+{
+        struct osd_object   *obj = osd_dt_obj(d);
+        struct niobuf_local *lb;
+        int npages, i, rc = 0;
+        ENTRY;
+
+        LASSERT(obj->oo_inode);
+
+        osd_map_remote_to_local(pos, len, &npages, l);
+
+        for (i = 0, lb = l; i < npages; i++, lb++) {
+
+                /* We still set up for ungranted pages so that granted pages
+                 * can be written to disk as they were promised, and portals
+                 * needs to keep the pages all aligned properly. */
+                lb->obj = obj;
+
+                lb->page = find_or_create_page(obj->oo_inode->i_mapping,
+                                               lb->file_offset >> CFS_PAGE_SHIFT,
+                                               GFP_NOFS | __GFP_HIGHMEM);
+                if (lb->page == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
+
+#if 0
+                /* DLM locking protects us from write and truncate competing
+                 * for same region, but truncate can leave dirty page in the
+                 * cache. it's possible the writeout on a such a page is in
+                 * progress when we access it. it's also possible that during
+                 * this writeout we put new (partial) data, but then won't
+                 * be able to proceed in filter_commitrw_write(). thus let's
+                 * just wait for writeout completion, should be rare enough.
+                 * -bzzz */
+                if (obd->u.filter.fo_writethrough_cache)
+                        wait_on_page_writeback(lb->page);
+#endif
+                BUG_ON(PageWriteback(lb->page));
+
+        }
+        RETURN(npages);
+
+cleanup:
+        /* do not leak the locked, referenced pages obtained so far */
+        while (--i >= 0) {
+                unlock_page(l[i].page);
+                page_cache_release(l[i].page);
+                l[i].page = NULL;
+        }
+        RETURN(rc);
+}
+
+/**
+ * Release the pages attached to the first \a npages descriptors of \a lb:
+ * each page is unlocked, its reference dropped and the descriptor cleared.
+ * Descriptors without a page are skipped.  Always returns 0.
+ */
+static int osd_put_bufs(const struct lu_env *env, struct dt_object *dt,
+                        struct niobuf_local *lb, int npages)
+{
+        int i;
+        ENTRY;
+
+        for (i = 0; i < npages; i++) {
+                if (lb[i].page == NULL)
+                        continue;
+                /* No peeking at the page contents here: the pages are
+                 * allocated with __GFP_HIGHMEM, so page_address() may be
+                 * NULL for them. */
+                LASSERT(PageLocked(lb[i].page));
+                unlock_page(lb[i].page);
+                page_cache_release(lb[i].page);
+                lb[i].page = NULL;
+        }
+        RETURN(0);
+}
+
+/**
+ * Prepare the pages in \a lb for a write.
+ *
+ * Fully covered pages need nothing.  A partially covered page that lies
+ * within the current file size is read from disk first; a partial page
+ * beyond the file size just gets its uncovered head and tail zeroed.
+ *
+ * \a used is not touched by this implementation.
+ *
+ * \retval 0   success
+ * \retval < 0 error from block mapping or from the read
+ */
+static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
+                          struct niobuf_local *lb, int npages, unsigned long *used)
+{
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct filter_iobuf *iobuf = &oti->oti_iobuf;
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_device  *osd = osd_obj2dev(osd_dt_obj(dt));
+        loff_t isize;           /* file size must not be held in ssize_t */
+        __s64  maxidx;
+        int rc, i;
+        ENTRY;
+
+        LASSERT(inode);
+
+        filter_init_iobuf(iobuf);
+
+        isize = i_size_read(inode);
+        /* index of the last page holding file data, -1 for an empty file */
+        maxidx = ((isize + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT) - 1;
+
+        for (i = 0; i < npages; i++) {
+                if (lb[i].len == CFS_PAGE_SIZE)
+                        continue;
+
+                /* compare as signed values so that maxidx == -1 is not
+                 * promoted to a huge unsigned number */
+                if (maxidx >= (__s64)lb[i].page->index) {
+                        filter_iobuf_add_page(iobuf, lb[i].page);
+                } else {
+                        long off;
+                        char *p = kmap(lb[i].page);
+
+                        off = lb[i].page_offset;
+                        if (off)
+                                memset(p, 0, off);
+                        off = (lb[i].page_offset + lb[i].len) & ~CFS_PAGE_MASK;
+                        if (off)
+                                memset(p + off, 0, CFS_PAGE_SIZE - off);
+                        kunmap(lb[i].page);
+                }
+        }
+
+        if (iobuf->dr_npages == 0)
+                RETURN(0);
+
+        rc = osd->od_fsops->fs_map_inode_pages(inode, iobuf->dr_pages,
+                                               iobuf->dr_npages,
+                                               iobuf->dr_blocks,
+                                               NULL, 0, NULL);
+        /* do not read through a block map that could not be built */
+        if (rc == 0)
+                rc = osd_do_bio(inode, iobuf, OBD_BRW_READ);
+        RETURN(rc);
+}
+
+/**
+ * Declare a bulk write commit in \a thandle.  Nothing is reserved yet;
+ * always returns 0.
+ */
+static int osd_declare_write_commit(const struct lu_env *env, struct dt_object *dt,
+                                    struct niobuf_local *lb, int npages,
+                                    struct thandle *thandle)
+{
+        ENTRY;
+        RETURN(0);
+}
+
+static int
osd_write_commit(const struct lu_env *env, struct dt_object *dt,
+                 struct niobuf_local *lb, int npages, struct thandle *thandle)
+{
+        /*
+         * Write the pages described by lb[0..npages-1] to disk.
+         *
+         * Descriptors with a non-zero ->rc are skipped.  Blocks are mapped
+         * (and allocated) through fs_map_inode_pages(), the inode size is
+         * extended when the write goes beyond it, then the pages are
+         * submitted with osd_do_bio().
+         *
+         * Returns 0 on success or the error from block mapping / the I/O.
+         */
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct filter_iobuf *iobuf = &oti->oti_iobuf;
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_device  *osd = osd_obj2dev(osd_dt_obj(dt));
+        loff_t isize;
+        int rc, i;
+        ENTRY;
+
+        LASSERT(inode);
+
+        filter_init_iobuf(iobuf);
+        isize = i_size_read(inode);
+
+        for (i = 0; i < npages; i++) {
+                if (lb[i].rc) { /* ENOSPC, network RPC error, etc. */
+                        CDEBUG(D_INODE, "Skipping [%d] == %d\n", i, lb[i].rc);
+                        continue;
+                }
+
+                LASSERT(PageLocked(lb[i].page));
+                LASSERT(!PageWriteback(lb[i].page));
+
+                /* track the furthest byte written to grow i_size below */
+                if (lb[i].file_offset + lb[i].len > isize)
+                        isize = lb[i].file_offset + lb[i].len;
+
+                /* preceding filemap_write_and_wait() should have clean pages */
+#if 0
+                if (fo->fo_writethrough_cache)
+                        clear_page_dirty_for_io(lb[i].page);
+#endif
+                LASSERT(!PageDirty(lb[i].page));
+
+                SetPageUptodate(lb[i].page);
+
+                filter_iobuf_add_page(iobuf, lb[i].page);
+        }
+
+        if (iobuf->dr_npages == 0)
+                RETURN(0);
+
+        rc = osd->od_fsops->fs_map_inode_pages(inode, iobuf->dr_pages,
+                                               iobuf->dr_npages,
+                                               iobuf->dr_blocks,
+                                               NULL, 1, NULL);
+        /* without a block map nothing may be written or accounted */
+        if (rc != 0)
+                RETURN(rc);
+
+        if (isize > i_size_read(inode)) {
+                i_size_write(inode, isize);
+                LDISKFS_I(inode)->i_disksize = isize;
+                mark_inode_dirty(inode);
+        }
+
+        /* report I/O failures to the caller instead of always returning 0 */
+        rc = osd_do_bio(inode, iobuf, OBD_BRW_WRITE);
+        RETURN(rc);
+}
+
+/**
+ * Fill the pages described by \a lb with file data.
+ *
+ * For every descriptor that starts below the file size, lb[i].rc is set to
+ * the number of bytes expected for it and the page is queued for reading.
+ * Processing stops at the first descriptor at or beyond the file size;
+ * its ->rc (and that of all following ones) is left untouched.
+ *
+ * \retval 0   success (including "nothing to read")
+ * \retval < 0 error from block mapping or from the read
+ */
+static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
+                         struct niobuf_local *lb, int npages)
+{
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct filter_iobuf *iobuf = &oti->oti_iobuf;
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_device  *osd = osd_obj2dev(osd_dt_obj(dt));
+        loff_t isize;
+        int rc = 0, i;
+        ENTRY;
+
+        LASSERT(inode);
+
+        filter_init_iobuf(iobuf);
+        isize = i_size_read(inode);
+
+        for (i = 0; i < npages; i++) {
+
+                if (isize <= lb[i].file_offset)
+                        /* If there's no more data, abort early.  lb->rc == 0,
+                         * so it's easy to detect later. */
+                        break;
+
+                /* NOTE(review): the "- 1" makes a descriptor that ends one
+                 * byte past EOF report its full length - confirm intent. */
+                if (isize < lb[i].file_offset + lb[i].len - 1)
+                        lb[i].rc = isize - lb[i].file_offset;
+                else
+                        lb[i].rc = lb[i].len;
+
+                filter_iobuf_add_page(iobuf, lb[i].page);
+        }
+
+        if (iobuf->dr_npages) {
+                rc = osd->od_fsops->fs_map_inode_pages(inode, iobuf->dr_pages,
+                                                       iobuf->dr_npages,
+                                                       iobuf->dr_blocks,
+                                                       NULL, 0, NULL);
+                /* do not read through a block map that could not be built */
+                if (rc == 0)
+                        rc = osd_do_bio(inode, iobuf, OBD_BRW_READ);
+        }
+
+        RETURN(rc);
+}
+
+/*
+ * XXX: Another layering violation for now.
+ *
+ * We don't want to use ->f_op->read methods, because generic file write
+ *
+ *         - serializes on ->i_sem, and
+ *
+ *         - does a lot of extra work like balance_dirty_pages(),
+ *
+ * which doesn't work for globally shared files like /last-received.
+ */
+int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
+int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
+                                loff_t *offs, handle_t *handle);
+
+/**
+ * Read up to buf->lb_len bytes at \a *pos from object \a dt into buf->lb_buf.
+ *
+ * \retval -EACCES \a capa does not authorize CAPA_OPC_BODY_READ
+ * \retval other   whatever fsfilt_ldiskfs_read() returns
+ */
+static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
+                        struct lu_buf *buf, loff_t *pos,
+                        struct lustre_capa *capa)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+        ENTRY;
+
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
+                RETURN(-EACCES);
+
+        RETURN(fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos));
+}
+
+/**
+ * Declare a body write in transaction \a handle: account the credits of
+ * DTO_WRITE_BLOCK.  The transaction must not have been started yet
+ * (asserted).
+ *
+ * \retval 0       success
+ * \retval -EACCES \a capa does not authorize CAPA_OPC_BODY_WRITE
+ */
+static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
+                                 const loff_t size, loff_t pos,
+                                 struct thandle *handle, struct lustre_capa *capa)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
+                return -EACCES;
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, write);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BLOCK];
+
+        return 0;
+}
+
+/**
+ * Write buf->lb_len bytes from buf->lb_buf at \a *pos into object \a dt
+ * within the running transaction \a handle.
+ *
+ * With HAVE_QUOTA_SUPPORT the CFS_CAP_SYS_RESOURCE capability is raised or
+ * dropped for the duration of the write according to \a ignore_quota and
+ * restored afterwards.
+ *
+ * \retval buf->lb_len fsfilt_ldiskfs_write_handle() returned 0
+ * \retval other       non-zero result of fsfilt_ldiskfs_write_handle()
+ */
+static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_buf *buf, loff_t *pos,
+                         struct thandle *handle, int ignore_quota)
+{
+        struct inode       *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_thandle *oh;
+        ssize_t             result;
+#ifdef HAVE_QUOTA_SUPPORT
+        cfs_cap_t           save = current->cap_effective;
+#endif
+
+        LASSERT(handle != NULL);
+        OSD_EXEC_OP(handle, write);
+
+        oh = container_of(handle, struct osd_thandle, ot_super);
+        /* the journal handle itself must exist before it is dereferenced */
+        LASSERT(oh->ot_handle != NULL);
+        LASSERT(oh->ot_handle->h_transaction != NULL);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (ignore_quota)
+                current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+        else
+                current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+#endif
+        result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
+                                             pos, oh->ot_handle);
+#ifdef HAVE_QUOTA_SUPPORT
+        current->cap_effective = save;
+#endif
+        if (result == 0)
+                result = buf->lb_len;
+        return result;
+}
+
+/*
+ * in some cases we may need declare methods for objects being created
+ * e.g., when we create symlink
+ */
+const struct dt_body_operations osd_body_ops_new = {
+        .dbo_declare_write = osd_declare_write,
+};
+
+/* body operations of existing objects, including the zero-copy I/O methods */
+const struct dt_body_operations osd_body_ops = {
+        .dbo_read                 = osd_read,
+        .dbo_declare_write        = osd_declare_write,
+        .dbo_write                = osd_write,
+        .dbo_get_bufs             = osd_get_bufs,
+        .dbo_put_bufs             = osd_put_bufs,
+        .dbo_write_prep           = osd_write_prep,
+        .dbo_declare_write_commit = osd_declare_write_commit,
+        .dbo_write_commit         = osd_write_commit,
+        .dbo_read_prep            = osd_read_prep,
+};
+
diff --git a/lustre/osd/osd_oi.c b/lustre/osd/osd_oi.c
index ad3c424..7759133 100644
--- a/lustre/osd/osd_oi.c
+++ b/lustre/osd/osd_oi.c
@@ -80,6 +80,16 @@ struct oi_descr {
         __u32  oid;
 };
 
+static struct super_block *osd_sb(const struct osd_device *dev)
+{
+        return dev->od_mount->lmi_mnt->mnt_sb;
+}
+
+static journal_t *osd_journal(const struct osd_device *dev)
+{
+        return LDISKFS_SB(osd_sb(dev))->s_journal;
+}
+
 /** to serialize concurrent OI index initialization */
 static struct mutex oi_init_lock;
 
@@ -103,42 +113,99 @@ static const struct oi_descr oi_descr[OSD_OI_FID_NR] = {
         }
 };
 
+struct dentry * osd_child_dentry_by_inode(const struct lu_env *env,
+                                          struct inode *inode,
+                                          const char *name,
+                                          const int namelen);
+extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry,
+                                               struct ldiskfs_dir_entry_2
+                                               ** res_dir);
+
+static int osd_oi_index_create_one(struct osd_thread_info *info,
+                                   struct osd_device *osd, char *name,
+                                   struct dt_index_features *feat)
+{
+        const struct lu_env *env = info->oti_env;
+        struct osd_inode_id *id = &info->oti_id;
+        struct buffer_head *bh;
+        struct inode *inode;
+        struct ldiskfs_dir_entry_2 *de;
+        struct dentry *dentry;
+        handle_t *jh;
+        int rc;
+
+        dentry = osd_child_dentry_by_inode(env, osd_sb(osd)->s_root->d_inode,
+                                           name, strlen(name));
+        bh = ldiskfs_find_entry(dentry, &de);
+        if (bh) {
+                /* de points into bh: read it before the buffer is released */
+                id->oii_ino = le32_to_cpu(de->inode);
+                id->oii_gen = OSD_OII_NOGEN;
+                brelse(bh);
+ + inode = osd_iget(info, osd, id); + if (!IS_ERR(inode)) { + iput(inode); + RETURN(-EEXIST); + } + RETURN(PTR_ERR(inode)); + } + + jh = journal_start(osd_journal(osd), 100); + LASSERT(!IS_ERR(jh)); + + inode = ldiskfs_create_inode(jh, osd_sb(osd)->s_root->d_inode, + (S_IFMT | S_IRWXUGO | S_ISVTX)); + LASSERT(!IS_ERR(inode)); + + if (feat->dif_flags & DT_IND_VARKEY) + rc = iam_lvar_create(inode, feat->dif_keysize_max, + feat->dif_ptrsize, feat->dif_recsize_max, jh); + else + rc = iam_lfix_create(inode, feat->dif_keysize_max, + feat->dif_ptrsize, feat->dif_recsize_max, jh); + + dentry = osd_child_dentry_by_inode(env, osd_sb(osd)->s_root->d_inode, + name, strlen(name)); + rc = ldiskfs_add_entry(jh, dentry, inode); + LASSERT(rc == 0); + + journal_stop(jh); + iput(inode); + + return rc; +} + static int osd_oi_index_create(struct osd_thread_info *info, - struct dt_device *dev, - struct md_device *mdev) + struct osd_device *osd) { const struct lu_env *env; struct lu_fid *oi_fid = &info->oti_fid; - struct md_object *mdo; int i; int rc; env = info->oti_env; - for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) { + for (i = rc = 0; i < OSD_OI_FID_NR; ++i) { char *name; name = oi_descr[i].name; lu_local_obj_fid(oi_fid, oi_descr[i].oid); - oi_feat.dif_keysize_min = oi_descr[i].fid_size, - oi_feat.dif_keysize_max = oi_descr[i].fid_size, - - mdo = llo_store_create_index(env, mdev, dev, - "", name, - oi_fid, &oi_feat); + oi_feat.dif_keysize_min = oi_descr[i].fid_size; + oi_feat.dif_keysize_max = oi_descr[i].fid_size; - if (IS_ERR(mdo)) - RETURN(PTR_ERR(mdo)); - - lu_object_put(env, &mdo->mo_lu); + rc = osd_oi_index_create_one(info, osd, name, &oi_feat); + + if (rc == -ESTALE || rc != -EEXIST) + return(rc); } return 0; } int osd_oi_init(struct osd_thread_info *info, struct osd_oi *oi, - struct dt_device *dev, - struct md_device *mdev) + struct osd_device *osd) { + struct dt_device *dev = &osd->od_dt_dev; const struct lu_env *env; int rc; int i; @@ -147,8 +214,8 @@ int 
osd_oi_init(struct osd_thread_info *info, env = info->oti_env; mutex_lock(&oi_init_lock); - memset(oi, 0, sizeof *oi); retry: + memset(oi, 0, sizeof *oi); for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) { const char *name; struct dt_object *obj; @@ -169,8 +236,10 @@ retry: } } else { rc = PTR_ERR(obj); + while (--i >= 0) + lu_object_put(env, &oi->oi_dir[i]->do_lu); if (rc == -ENOENT) { - rc = osd_oi_index_create(info, dev, mdev); + rc = osd_oi_index_create(info, osd); if (!rc) goto retry; } @@ -244,6 +313,9 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi, return -ENOENT; key = oi_fid_key(info, oi, fid, &idx); + LASSERT(idx); + LASSERT(idx->do_index_ops); + LASSERT(idx->do_index_ops->dio_lookup); rc = idx->do_index_ops->dio_lookup(info->oti_env, idx, (struct dt_rec *)id, key, BYPASS_CAPA); diff --git a/lustre/osd/osd_oi.h b/lustre/osd/osd_oi.h index fe87768..c826827 100644 --- a/lustre/osd/osd_oi.h +++ b/lustre/osd/osd_oi.h @@ -62,6 +62,7 @@ struct lu_site; struct thandle; struct dt_device; +struct osd_device; enum { OSD_OI_FID_SMALL, @@ -92,10 +93,8 @@ struct osd_inode_id { }; int osd_oi_mod_init(void); -int osd_oi_init(struct osd_thread_info *info, - struct osd_oi *oi, - struct dt_device *dev, - struct md_device *mdev); +int osd_oi_init(struct osd_thread_info *info, struct osd_oi *oi, + struct osd_device *osd); void osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi); int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi, -- 1.8.3.1