X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd%2Fosd_handler.c;h=b826a7112165c47c27f7b95a81fd452a1383ed06;hp=0d4b6be5a6ce89b93fa29557f9663434c23ae953;hb=b9b2dae910f6eec020955be6ded8c4118f965b0f;hpb=4201416b775b14d6e4cd89b7c68bb1c1bc950144 diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 0d4b6be..b826a71 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -38,6 +38,7 @@ * Top-level entry points into osd module * * Author: Nikita Danilov + * Pravin Shelar : Added fid in dirent */ #ifndef EXPORT_SYMTAB @@ -55,15 +56,6 @@ #include /* XATTR_{REPLACE,CREATE} */ #include -/* - * XXX temporary stuff: direct access to ldiskfs/jdb. Interface between osd - * and file system is not yet specified. - */ -/* handle_t, journal_start(), journal_stop() */ -#include -/* LDISKFS_SB() */ -#include -#include /* simple_mkdir() */ #include @@ -77,7 +69,6 @@ /* fid_is_local() */ #include -#include #include "osd_internal.h" #include "osd_igif.h" @@ -85,7 +76,6 @@ /* llo_* api support */ #include -static const char MDT_XATTR_NAME[] = "trusted.lma"; static const char dot[] = "."; static const char dotdot[] = ".."; static const char remote_obj_dir[] = "REM_OBJ_DIR"; @@ -93,7 +83,6 @@ static const char remote_obj_dir[] = "REM_OBJ_DIR"; struct osd_directory { struct iam_container od_container; struct iam_descr od_descr; - struct semaphore od_sem; }; struct osd_object { @@ -106,17 +95,21 @@ struct osd_object { * creation, or assigned by osd_object_create() under write lock). */ struct inode *oo_inode; - struct rw_semaphore oo_sem; + /** + * to protect index ops. + */ + cfs_rw_semaphore_t oo_ext_idx_sem; + cfs_rw_semaphore_t oo_sem; struct osd_directory *oo_dir; /** protects inode attributes. */ - spinlock_t oo_guard; + cfs_spinlock_t oo_guard; /** * Following two members are used to indicate the presence of dot and * dotdot in the given directory. This is required for interop mode * (b11826). */ - int oo_compat_dot_created; - int oo_compat_dotdot_created; + int oo_compat_dot_created; + int oo_compat_dotdot_created; const struct lu_env *oo_owner; #ifdef CONFIG_LOCKDEP @@ -124,155 +117,7 @@ struct osd_object { #endif }; -static int osd_root_get (const struct lu_env *env, - struct dt_device *dev, struct lu_fid *f); - -static int lu_device_is_osd (const struct lu_device *d); -static void osd_mod_exit (void) __exit; -static int osd_mod_init (void) __init; -static int osd_type_init (struct lu_device_type *t); -static void osd_type_fini (struct lu_device_type *t); -static int osd_object_init (const struct lu_env *env, - struct lu_object *l, - const struct lu_object_conf *_); -static void osd_object_release(const struct lu_env *env, - struct lu_object *l); -static int osd_object_print (const struct lu_env *env, void *cookie, - lu_printer_t p, const struct lu_object *o); -static struct lu_device *osd_device_free (const struct lu_env *env, - struct lu_device *m); -static void *osd_key_init (const struct lu_context *ctx, - struct lu_context_key *key); -static void osd_key_fini (const struct lu_context *ctx, - struct lu_context_key *key, void *data); -static void osd_key_exit (const struct lu_context *ctx, - struct lu_context_key *key, void *data); -static int osd_has_index (const struct osd_object *obj); -static void osd_object_init0 (struct osd_object *obj); -static int osd_device_init (const struct lu_env *env, - struct lu_device *d, const char *, - struct lu_device *); -static int osd_fid_lookup (const struct lu_env *env, - struct osd_object *obj, - const struct lu_fid *fid); -static void osd_inode_getattr (const struct lu_env *env, - struct inode *inode, struct lu_attr *attr); -static int osd_inode_setattr (const struct lu_env *env, - struct inode *inode, const struct lu_attr *attr); -static int osd_param_is_sane (const struct osd_device *dev, - const struct txn_param *param); -static int osd_index_iam_lookup(const struct lu_env *env, - struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa); -static int osd_index_ea_lookup(const struct lu_env *env, - struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa); -static int osd_index_iam_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa, - int ingore_quota); -static int osd_index_ea_insert (const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa, - int ingore_quota); -static int osd_index_iam_delete(const struct lu_env *env, - struct dt_object *dt, const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa); -static int osd_index_ea_delete (const struct lu_env *env, - struct dt_object *dt, const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa); - -static int osd_iam_index_probe (const struct lu_env *env, - struct osd_object *o, - const struct dt_index_features *feat); -static int osd_index_try (const struct lu_env *env, - struct dt_object *dt, - const struct dt_index_features *feat); -static void osd_index_fini (struct osd_object *o); - -static void osd_it_iam_fini (const struct lu_env *env, struct dt_it *di); -static int osd_it_iam_get (const struct lu_env *env, - struct dt_it *di, const struct dt_key *key); -static void osd_it_iam_put (const struct lu_env *env, struct dt_it *di); -static int osd_it_iam_next (const struct lu_env *env, struct dt_it *di); -static int osd_it_iam_key_size (const struct lu_env *env, - const struct dt_it *di); -static void osd_it_ea_fini (const struct lu_env *env, struct dt_it *di); -static int osd_it_ea_get (const struct lu_env *env, - struct dt_it *di, const struct dt_key *key); -static void osd_it_ea_put (const struct lu_env *env, struct dt_it *di); -static int osd_it_ea_next (const struct lu_env *env, struct dt_it *di); -static int osd_it_ea_key_size(const struct lu_env *env, - const struct dt_it *di); - -static void osd_conf_get (const struct lu_env *env, - const struct dt_device *dev, - struct dt_device_param *param); -static void osd_trans_stop (const struct lu_env *env, - struct thandle *th); -static int osd_object_is_root(const struct osd_object *obj); - -static struct osd_object *osd_obj (const struct lu_object *o); -static struct osd_device *osd_dev (const struct lu_device *d); -static struct osd_device *osd_dt_dev (const struct dt_device *d); -static struct osd_object *osd_dt_obj (const struct dt_object *d); -static struct osd_device *osd_obj2dev (const struct osd_object *o); -static struct lu_device *osd2lu_dev (struct osd_device *osd); -static struct lu_device *osd_device_fini (const struct lu_env *env, - struct lu_device *d); -static struct lu_device *osd_device_alloc (const struct lu_env *env, - struct lu_device_type *t, - struct lustre_cfg *cfg); -static struct lu_object *osd_object_alloc (const struct lu_env *env, - const struct lu_object_header *hdr, - struct lu_device *d); -static struct inode *osd_iget (struct osd_thread_info *info, - struct osd_device *dev, - const struct osd_inode_id *id); -static struct super_block *osd_sb (const struct osd_device *dev); -static struct dt_it *osd_it_iam_init (const struct lu_env *env, - struct dt_object *dt, - struct lustre_capa *capa); -static struct dt_key *osd_it_iam_key (const struct lu_env *env, - const struct dt_it *di); -static struct dt_rec *osd_it_iam_rec (const struct lu_env *env, - const struct dt_it *di); -static struct dt_it *osd_it_ea_init (const struct lu_env *env, - struct dt_object *dt, - struct lustre_capa *capa); -static struct dt_key *osd_it_ea_key (const struct lu_env *env, - const struct dt_it *di); -static struct dt_rec *osd_it_ea_rec (const struct lu_env *env, - const struct dt_it *di); - -static struct timespec *osd_inode_time (const struct lu_env *env, - struct inode *inode, - __u64 seconds); -static struct thandle *osd_trans_start (const struct lu_env *env, - struct dt_device *d, - struct txn_param *p); -static journal_t *osd_journal (const struct osd_device *dev); - -static int __osd_ea_add_rec(struct osd_thread_info *info, - struct osd_object *pobj, - struct osd_object *cobj, - const char *name, - struct thandle *th); - -static const struct lu_device_type_operations osd_device_type_ops; -static struct lu_device_type osd_device_type; static const struct lu_object_operations osd_lu_obj_ops; -static struct obd_ops osd_obd_device_ops; static const struct lu_device_operations osd_lu_ops; static struct lu_context_key osd_key; static const struct dt_object_operations osd_obj_ops; @@ -290,6 +135,72 @@ struct osd_thandle { }; +/* + * Helpers. + */ +static int lu_device_is_osd(const struct lu_device *d) +{ + return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops); +} + +static struct osd_device *osd_dt_dev(const struct dt_device *d) +{ + LASSERT(lu_device_is_osd(&d->dd_lu_dev)); + return container_of0(d, struct osd_device, od_dt_dev); +} + +static struct osd_device *osd_dev(const struct lu_device *d) +{ + LASSERT(lu_device_is_osd(d)); + return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev)); +} + +static struct osd_device *osd_obj2dev(const struct osd_object *o) +{ + return osd_dev(o->oo_dt.do_lu.lo_dev); +} + +static struct super_block *osd_sb(const struct osd_device *dev) +{ + return dev->od_mount->lmi_mnt->mnt_sb; +} + +static int osd_object_is_root(const struct osd_object *obj) +{ + return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode; +} + +static struct osd_object *osd_obj(const struct lu_object *o) +{ + LASSERT(lu_device_is_osd(o->lo_dev)); + return container_of0(o, struct osd_object, oo_dt.do_lu); +} + +static struct osd_object *osd_dt_obj(const struct dt_object *d) +{ + return osd_obj(&d->do_lu); +} + +static struct lu_device *osd2lu_dev(struct osd_device *osd) +{ + return &osd->od_dt_dev.dd_lu_dev; +} + +static journal_t *osd_journal(const struct osd_device *dev) +{ + return LDISKFS_SB(osd_sb(dev))->s_journal; +} + +static int osd_has_index(const struct osd_object *obj) +{ + return obj->oo_dt.do_index_ops != NULL; +} + +static int osd_object_invariant(const struct lu_object *l) +{ + return osd_invariant(osd_obj(l)); +} + #ifdef HAVE_QUOTA_SUPPORT static inline void osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save) @@ -315,32 +226,6 @@ osd_pop_ctxt(struct osd_ctxt *save) } #endif -/* - * Invariants, assertions. - */ - -/* - * XXX: do not enable this, until invariant checking code is made thread safe - * in the face of pdirops locking. - */ -#define OSD_INVARIANT_CHECKS (0) - -#if OSD_INVARIANT_CHECKS -static int osd_invariant(const struct osd_object *obj) -{ - return - obj != NULL && - ergo(obj->oo_inode != NULL, - obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) && - atomic_read(&obj->oo_inode->i_count) > 0) && - ergo(obj->oo_dir != NULL && - obj->oo_dir->od_conationer.ic_object != NULL, - obj->oo_dir->od_conationer.ic_object == obj->oo_inode); -} -#else -#define osd_invariant(obj) (1) -#endif - static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env) { return lu_context_key_get(&env->le_ctx, &osd_key); @@ -402,14 +287,112 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, mo->oo_dt.do_ops = &osd_obj_ops; l->lo_ops = &osd_lu_obj_ops; - init_rwsem(&mo->oo_sem); - spin_lock_init(&mo->oo_guard); + cfs_init_rwsem(&mo->oo_sem); + cfs_init_rwsem(&mo->oo_ext_idx_sem); + cfs_spin_lock_init(&mo->oo_guard); return l; } else return NULL; } /* + * retrieve object from backend ext fs. + **/ +static struct inode *osd_iget(struct osd_thread_info *info, + struct osd_device *dev, + const struct osd_inode_id *id) +{ + struct inode *inode = NULL; + +#ifdef HAVE_EXT4_LDISKFS + inode = ldiskfs_iget(osd_sb(dev), id->oii_ino); + if (IS_ERR(inode)) + /* Newer kernels return an error instead of a NULL pointer */ + inode = NULL; +#else + inode = iget(osd_sb(dev), id->oii_ino); +#endif + if (inode == NULL) { + CERROR("no inode\n"); + inode = ERR_PTR(-EACCES); + } else if (id->oii_gen != OSD_OII_NOGEN && + inode->i_generation != id->oii_gen) { + iput(inode); + inode = ERR_PTR(-ESTALE); + } else if (inode->i_nlink == 0) { + /* due to parallel readdir and unlink, + * we can have dead inode here. */ + CWARN("stale inode\n"); + make_bad_inode(inode); + iput(inode); + inode = ERR_PTR(-ESTALE); + } else if (is_bad_inode(inode)) { + CERROR("bad inode %lx\n",inode->i_ino); + iput(inode); + inode = ERR_PTR(-ENOENT); + } + return inode; +} + +static int osd_fid_lookup(const struct lu_env *env, + struct osd_object *obj, const struct lu_fid *fid) +{ + struct osd_thread_info *info; + struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; + struct osd_device *dev; + struct osd_inode_id *id; + struct osd_oi *oi; + struct inode *inode; + int result; + + LINVRNT(osd_invariant(obj)); + LASSERT(obj->oo_inode == NULL); + LASSERT(fid_is_sane(fid)); + /* + * This assertion checks that osd layer sees only local + * fids. Unfortunately it is somewhat expensive (does a + * cache-lookup). Disabling it for production/acceptance-testing. + */ + LASSERT(1 || fid_is_local(env, ldev->ld_site, fid)); + + ENTRY; + + info = osd_oti_get(env); + dev = osd_dev(ldev); + id = &info->oti_id; + oi = &dev->od_oi; + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) + RETURN(-ENOENT); + + result = osd_oi_lookup(info, oi, fid, id); + if (result == 0) { + inode = osd_iget(info, dev, id); + if (!IS_ERR(inode)) { + obj->oo_inode = inode; + LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); + if (dev->od_iop_mode) { + obj->oo_compat_dot_created = 1; + obj->oo_compat_dotdot_created = 1; + } + result = 0; + } else + /* + * If fid wasn't found in oi, inode-less object is + * created, for which lu_object_exists() returns + * false. This is used in a (frequent) case when + * objects are created as locking anchors or + * place holders for objects yet to be created. + */ + result = PTR_ERR(inode); + } else if (result == -ENOENT) + result = 0; + LINVRNT(osd_invariant(obj)); + + RETURN(result); +} + +/* * Concurrency: shouldn't matter. */ static void osd_object_init0(struct osd_object *obj) @@ -425,7 +408,7 @@ static void osd_object_init0(struct osd_object *obj) * life-cycle. */ static int osd_object_init(const struct lu_env *env, struct lu_object *l, - const struct lu_object_conf *_) + const struct lu_object_conf *unused) { struct osd_object *obj = osd_obj(l); int result; @@ -455,6 +438,9 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l) OBD_FREE_PTR(obj); } +/** + * IAM Iterator + */ static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env, const struct iam_container *bag) { @@ -511,6 +497,147 @@ enum { }; /* + * Journal + */ + +/* + * Concurrency: doesn't access mutable data. + */ +static int osd_param_is_sane(const struct osd_device *dev, + const struct txn_param *param) +{ + return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers; +} + +/* + * Concurrency: shouldn't matter. + */ +static void osd_trans_commit_cb(struct journal_callback *jcb, int error) +{ + struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb); + struct thandle *th = &oh->ot_super; + struct dt_device *dev = th->th_dev; + struct lu_device *lud = &dev->dd_lu_dev; + + LASSERT(dev != NULL); + LASSERT(oh->ot_handle == NULL); + + if (error) { + CERROR("transaction @0x%p commit error: %d\n", th, error); + } else { + struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit; + /* + * This od_env_for_commit is only for commit usage. see + * "struct dt_device" + */ + lu_context_enter(&env->le_ctx); + dt_txn_hook_commit(env, th); + lu_context_exit(&env->le_ctx); + } + + lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th); + lu_device_put(lud); + th->th_dev = NULL; + + lu_context_exit(&th->th_ctx); + lu_context_fini(&th->th_ctx); + OBD_FREE_PTR(oh); +} + +/* + * Concurrency: shouldn't matter. + */ +static struct thandle *osd_trans_start(const struct lu_env *env, + struct dt_device *d, + struct txn_param *p) +{ + struct osd_device *dev = osd_dt_dev(d); + handle_t *jh; + struct osd_thandle *oh; + struct thandle *th; + int hook_res; + + ENTRY; + + hook_res = dt_txn_hook_start(env, d, p); + if (hook_res != 0) + RETURN(ERR_PTR(hook_res)); + + if (osd_param_is_sane(dev, p)) { + OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); + if (oh != NULL) { + struct osd_thread_info *oti = osd_oti_get(env); + + /* + * XXX temporary stuff. Some abstraction layer should + * be used. + */ + + jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits); + if (!IS_ERR(jh)) { + oh->ot_handle = jh; + th = &oh->ot_super; + th->th_dev = d; + th->th_result = 0; + jh->h_sync = p->tp_sync; + lu_device_get(&d->dd_lu_dev); + oh->ot_dev_link = lu_ref_add + (&d->dd_lu_dev.ld_reference, + "osd-tx", th); + /* add commit callback */ + lu_context_init(&th->th_ctx, LCT_TX_HANDLE); + lu_context_enter(&th->th_ctx); + osd_journal_callback_set(jh, osd_trans_commit_cb, + (struct journal_callback *)&oh->ot_jcb); + LASSERT(oti->oti_txns == 0); + LASSERT(oti->oti_r_locks == 0); + LASSERT(oti->oti_w_locks == 0); + oti->oti_txns++; + } else { + OBD_FREE_PTR(oh); + th = (void *)jh; + } + } else + th = ERR_PTR(-ENOMEM); + } else { + CERROR("Invalid transaction parameters\n"); + th = ERR_PTR(-EINVAL); + } + + RETURN(th); +} + +/* + * Concurrency: shouldn't matter. + */ +static void osd_trans_stop(const struct lu_env *env, struct thandle *th) +{ + int result; + struct osd_thandle *oh; + struct osd_thread_info *oti = osd_oti_get(env); + + ENTRY; + + oh = container_of0(th, struct osd_thandle, ot_super); + if (oh->ot_handle != NULL) { + handle_t *hdl = oh->ot_handle; + + LASSERT(oti->oti_txns == 1); + oti->oti_txns--; + LASSERT(oti->oti_r_locks == 0); + LASSERT(oti->oti_w_locks == 0); + result = dt_txn_hook_stop(env, th); + if (result != 0) + CERROR("Failure in transaction hook: %d\n", result); + oh->ot_handle = NULL; + result = ldiskfs_journal_stop(hdl); + if (result != 0) + CERROR("Failure to stop transaction: %d\n", result); + } + EXIT; +} + +/* * Concurrency: no concurrent access is possible that late in object * life-cycle. */ @@ -520,17 +647,22 @@ static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj) struct osd_device *osd = osd_obj2dev(obj); struct osd_thread_info *oti = osd_oti_get(env); struct txn_param *prm = &oti->oti_txn; + struct lu_env *env_del_obj = &oti->oti_obj_delete_tx_env; struct thandle *th; int result; + lu_env_init(env_del_obj, LCT_DT_THREAD); txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + OSD_TXN_INODE_DELETE_CREDITS); - th = osd_trans_start(env, &osd->od_dt_dev, prm); + th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm); if (!IS_ERR(th)) { - result = osd_oi_delete(oti, &osd->od_oi, fid, th); - osd_trans_stop(env, th); + result = osd_oi_delete(osd_oti_get(env_del_obj), + &osd->od_oi, fid, th); + osd_trans_stop(env_del_obj, th); } else result = PTR_ERR(th); + + lu_env_fini(env_del_obj); return result; } @@ -579,7 +711,7 @@ static void osd_object_release(const struct lu_env *env, LASSERT(!lu_object_is_dying(l->lo_header)); if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode)) - set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags); + cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags); } /* @@ -591,209 +723,68 @@ static int osd_object_print(const struct lu_env *env, void *cookie, struct osd_object *o = osd_obj(l); struct iam_descr *d; - if (o->oo_dir != NULL) - d = o->oo_dir->od_container.ic_descr; - else - d = NULL; - return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]", - o, o->oo_inode, - o->oo_inode ? o->oo_inode->i_ino : 0UL, - o->oo_inode ? o->oo_inode->i_generation : 0, - d ? d->id_ops->id_name : "plain"); -} - -/* - * Concurrency: shouldn't matter. - */ -int osd_statfs(const struct lu_env *env, struct dt_device *d, - struct kstatfs *sfs) -{ - struct osd_device *osd = osd_dt_dev(d); - struct super_block *sb = osd_sb(osd); - int result = 0; - - spin_lock(&osd->od_osfs_lock); - /* cache 1 second */ - if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) { - result = ll_do_statfs(sb, &osd->od_kstatfs); - if (likely(result == 0)) /* N.B. statfs can't really fail */ - osd->od_osfs_age = cfs_time_current_64(); - } - - if (likely(result == 0)) - *sfs = osd->od_kstatfs; - spin_unlock(&osd->od_osfs_lock); - - return result; -} - -/* - * Concurrency: doesn't access mutable data. - */ -static void osd_conf_get(const struct lu_env *env, - const struct dt_device *dev, - struct dt_device_param *param) -{ - /* - * XXX should be taken from not-yet-existing fs abstraction layer. - */ - param->ddp_max_name_len = LDISKFS_NAME_LEN; - param->ddp_max_nlink = LDISKFS_LINK_MAX; - param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits; -} - -/** - * Helper function to get and fill the buffer with input values. - */ -static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len) -{ - struct lu_buf *buf; - - buf = &osd_oti_get(env)->oti_buf; - buf->lb_buf = area; - buf->lb_len = len; - return buf; -} - -/* - * Journal - */ - -/* - * Concurrency: doesn't access mutable data. - */ -static int osd_param_is_sane(const struct osd_device *dev, - const struct txn_param *param) -{ - return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers; -} - -/* - * Concurrency: shouldn't matter. - */ -static void osd_trans_commit_cb(struct journal_callback *jcb, int error) -{ - struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb); - struct thandle *th = &oh->ot_super; - struct dt_device *dev = th->th_dev; - struct lu_device *lud = &dev->dd_lu_dev; - - LASSERT(dev != NULL); - LASSERT(oh->ot_handle == NULL); - - if (error) { - CERROR("transaction @0x%p commit error: %d\n", th, error); - } else { - struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit; - /* - * This od_env_for_commit is only for commit usage. see - * "struct dt_device" - */ - lu_context_enter(&env->le_ctx); - dt_txn_hook_commit(env, th); - lu_context_exit(&env->le_ctx); - } - - lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th); - lu_device_put(lud); - th->th_dev = NULL; - - lu_context_exit(&th->th_ctx); - lu_context_fini(&th->th_ctx); - OBD_FREE_PTR(oh); + if (o->oo_dir != NULL) + d = o->oo_dir->od_container.ic_descr; + else + d = NULL; + return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]", + o, o->oo_inode, + o->oo_inode ? o->oo_inode->i_ino : 0UL, + o->oo_inode ? o->oo_inode->i_generation : 0, + d ? d->id_ops->id_name : "plain"); } /* * Concurrency: shouldn't matter. */ -static struct thandle *osd_trans_start(const struct lu_env *env, - struct dt_device *d, - struct txn_param *p) +int osd_statfs(const struct lu_env *env, struct dt_device *d, + cfs_kstatfs_t *sfs) { - struct osd_device *dev = osd_dt_dev(d); - handle_t *jh; - struct osd_thandle *oh; - struct thandle *th; - int hook_res; - - ENTRY; - - hook_res = dt_txn_hook_start(env, d, p); - if (hook_res != 0) - RETURN(ERR_PTR(hook_res)); - - if (osd_param_is_sane(dev, p)) { - OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); - if (oh != NULL) { - struct osd_thread_info *oti = osd_oti_get(env); - - /* - * XXX temporary stuff. Some abstraction layer should - * be used. - */ + struct osd_device *osd = osd_dt_dev(d); + struct super_block *sb = osd_sb(osd); + int result = 0; - jh = journal_start(osd_journal(dev), p->tp_credits); - if (!IS_ERR(jh)) { - oh->ot_handle = jh; - th = &oh->ot_super; - th->th_dev = d; - th->th_result = 0; - jh->h_sync = p->tp_sync; - lu_device_get(&d->dd_lu_dev); - oh->ot_dev_link = lu_ref_add - (&d->dd_lu_dev.ld_reference, - "osd-tx", th); - /* add commit callback */ - lu_context_init(&th->th_ctx, LCT_TX_HANDLE); - lu_context_enter(&th->th_ctx); - journal_callback_set(jh, osd_trans_commit_cb, - (struct journal_callback *)&oh->ot_jcb); - LASSERT(oti->oti_txns == 0); - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - oti->oti_txns++; - } else { - OBD_FREE_PTR(oh); - th = (void *)jh; - } - } else - th = ERR_PTR(-ENOMEM); - } else { - CERROR("Invalid transaction parameters\n"); - th = ERR_PTR(-EINVAL); + cfs_spin_lock(&osd->od_osfs_lock); + /* cache 1 second */ + if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) { + result = ll_do_statfs(sb, &osd->od_kstatfs); + if (likely(result == 0)) /* N.B. statfs can't really fail */ + osd->od_osfs_age = cfs_time_current_64(); } - RETURN(th); + if (likely(result == 0)) + *sfs = osd->od_kstatfs; + cfs_spin_unlock(&osd->od_osfs_lock); + + return result; } /* - * Concurrency: shouldn't matter. + * Concurrency: doesn't access mutable data. */ -static void osd_trans_stop(const struct lu_env *env, struct thandle *th) +static void osd_conf_get(const struct lu_env *env, + const struct dt_device *dev, + struct dt_device_param *param) { - int result; - struct osd_thandle *oh; - struct osd_thread_info *oti = osd_oti_get(env); - - ENTRY; + /* + * XXX should be taken from not-yet-existing fs abstraction layer. + */ + param->ddp_max_name_len = LDISKFS_NAME_LEN; + param->ddp_max_nlink = LDISKFS_LINK_MAX; + param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits; +} - oh = container_of0(th, struct osd_thandle, ot_super); - if (oh->ot_handle != NULL) { - handle_t *hdl = oh->ot_handle; +/** + * Helper function to get and fill the buffer with input values. + */ +static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len) +{ + struct lu_buf *buf; - LASSERT(oti->oti_txns == 1); - oti->oti_txns--; - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - result = dt_txn_hook_stop(env, th); - if (result != 0) - CERROR("Failure in transaction hook: %d\n", result); - oh->ot_handle = NULL; - result = journal_stop(hdl); - if (result != 0) - CERROR("Failure to stop transaction: %d\n", result); - } - EXIT; + buf = &osd_oti_get(env)->oti_buf; + buf->lb_buf = area; + buf->lb_len = len; + return buf; } /* @@ -917,8 +908,8 @@ static const int osd_dto_credits_noquota[DTO_NR] = { /** * Xattr set. The same as xattr of EXT3. * DATA_TRANS_BLOCKS(14) - * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are - * also counted in. Do not know why? + * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS + * are also counted in. Do not know why? */ [DTO_XATTR_SET] = 14, [DTO_LOG_REC] = 14, @@ -932,9 +923,9 @@ static const int osd_dto_credits_noquota[DTO_NR] = { [DTO_WRITE_BLOCK] = 14, /** * Attr set credits for chown. - * 3 (inode bit, group, GDT) + * This is extra credits for setattr, and it is null without quota */ - [DTO_ATTR_SET_CHOWN]= 3 + [DTO_ATTR_SET_CHOWN]= 0 }; /** @@ -956,7 +947,7 @@ static const int osd_dto_credits_quota[DTO_NR] = { [DTO_INDEX_DELETE] = 20, /** * Unused now. - */ + */ [DTO_IDNEX_UPDATE] = 16, /* * Create a object. Same as create object in EXT3 filesystem. @@ -972,7 +963,7 @@ static const int osd_dto_credits_quota[DTO_NR] = { * INDEX_EXTRA_BLOCKS(8) + * 3(inode bits, groups, GDT) + * QUOTA(?) - */ + */ [DTO_OBJECT_DELETE] = 27, /** * Attr set credits. @@ -997,11 +988,11 @@ static const int osd_dto_credits_quota[DTO_NR] = { [DTO_WRITE_BLOCK] = 16, /** * Attr set credits for chown. - * 3 (inode bit, group, GDT) + + * It is added to already set setattr credits * 2 * QUOTA_INIT_BLOCKS(25) + * 2 * QUOTA_DEL_BLOCKS(9) */ - [DTO_ATTR_SET_CHOWN]= 71 + [DTO_ATTR_SET_CHOWN]= 68, }; static int osd_credit_get(const struct lu_env *env, struct dt_device *d, @@ -1041,7 +1032,7 @@ static void osd_object_read_lock(const struct lu_env *env, LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_owner != env); - down_read_nested(&obj->oo_sem, role); + cfs_down_read_nested(&obj->oo_sem, role); LASSERT(obj->oo_owner == NULL); oti->oti_r_locks++; @@ -1056,7 +1047,7 @@ static void osd_object_write_lock(const struct lu_env *env, LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_owner != env); - down_write_nested(&obj->oo_sem, role); + cfs_down_write_nested(&obj->oo_sem, role); LASSERT(obj->oo_owner == NULL); obj->oo_owner = env; @@ -1073,7 +1064,7 @@ static void osd_object_read_unlock(const struct lu_env *env, LASSERT(oti->oti_r_locks > 0); oti->oti_r_locks--; - up_read(&obj->oo_sem); + cfs_up_read(&obj->oo_sem); } static void osd_object_write_unlock(const struct lu_env *env, @@ -1088,7 +1079,17 @@ static void osd_object_write_unlock(const struct lu_env *env, LASSERT(oti->oti_w_locks > 0); oti->oti_w_locks--; obj->oo_owner = NULL; - up_write(&obj->oo_sem); + cfs_up_write(&obj->oo_sem); +} + +static int osd_object_write_locked(const struct lu_env *env, + struct dt_object *dt) +{ + struct osd_object *obj = osd_dt_obj(dt); + + LINVRNT(osd_invariant(obj)); + + return obj->oo_owner == env; } static int capa_is_sane(const struct lu_env *env, @@ -1117,14 +1118,14 @@ static int capa_is_sane(const struct lu_env *env, RETURN(-ESTALE); } - spin_lock(&capa_lock); + cfs_spin_lock(&capa_lock); for (i = 0; i < 2; i++) { if (keys[i].lk_keyid == capa->lc_keyid) { oti->oti_capa_key = keys[i]; break; } } - spin_unlock(&capa_lock); + cfs_spin_unlock(&capa_lock); if (i == 2) { DEBUG_CAPA(D_ERROR, capa, "no matched capa key"); @@ -1191,60 +1192,58 @@ static int osd_object_auth(const struct lu_env *env, struct dt_object *dt, return 0; } -static int osd_attr_get(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct lustre_capa *capa) +static struct timespec *osd_inode_time(const struct lu_env *env, + struct inode *inode, __u64 seconds) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *oti = osd_oti_get(env); + struct timespec *t = &oti->oti_time; - LASSERT(dt_object_exists(dt)); - LINVRNT(osd_invariant(obj)); + t->tv_sec = seconds; + t->tv_nsec = 0; + *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb)); + return t; +} - if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) - return -EACCES; - spin_lock(&obj->oo_guard); - osd_inode_getattr(env, obj->oo_inode, attr); - spin_unlock(&obj->oo_guard); - return 0; +static void osd_inode_getattr(const struct lu_env *env, + struct inode *inode, struct lu_attr *attr) +{ + attr->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE | + LA_SIZE | LA_BLOCKS | LA_UID | LA_GID | + LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE; + + attr->la_atime = LTIME_S(inode->i_atime); + attr->la_mtime = LTIME_S(inode->i_mtime); + attr->la_ctime = LTIME_S(inode->i_ctime); + attr->la_mode = inode->i_mode; + attr->la_size = i_size_read(inode); + attr->la_blocks = inode->i_blocks; + attr->la_uid = inode->i_uid; + attr->la_gid = inode->i_gid; + attr->la_flags = LDISKFS_I(inode)->i_flags; + attr->la_nlink = inode->i_nlink; + attr->la_rdev = inode->i_rdev; + attr->la_blksize = ll_inode_blksize(inode); + attr->la_blkbits = inode->i_blkbits; } -static int osd_attr_set(const struct lu_env *env, +static int osd_attr_get(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, - struct thandle *handle, + struct lu_attr *attr, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); - int rc; - LASSERT(handle != NULL); LASSERT(dt_object_exists(dt)); - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); - if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) return -EACCES; - spin_lock(&obj->oo_guard); - rc = osd_inode_setattr(env, obj->oo_inode, attr); - spin_unlock(&obj->oo_guard); - - if (!rc) - mark_inode_dirty(obj->oo_inode); - return rc; -} - -static struct timespec *osd_inode_time(const struct lu_env *env, - struct inode *inode, __u64 seconds) -{ - struct osd_thread_info *oti = osd_oti_get(env); - struct timespec *t = &oti->oti_time; - - t->tv_sec = seconds; - t->tv_nsec = 0; - *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb)); - return t; + cfs_spin_lock(&obj->oo_guard); + osd_inode_getattr(env, obj->oo_inode, attr); + cfs_spin_unlock(&obj->oo_guard); + return 0; } static int osd_inode_setattr(const struct lu_env *env, @@ -1263,7 +1262,11 @@ static int osd_inode_setattr(const struct lu_env *env, struct iattr iattr; int rc; - iattr.ia_valid = bits & (LA_UID | LA_GID); + iattr.ia_valid = 0; + if (bits & LA_UID) + iattr.ia_valid |= ATTR_UID; + if (bits & LA_GID) + iattr.ia_valid |= ATTR_GID; iattr.ia_uid = attr->la_uid; iattr.ia_gid = attr->la_gid; osd_push_ctxt(env, save); @@ -1284,11 +1287,10 @@ static int osd_inode_setattr(const struct lu_env *env, LDISKFS_I(inode)->i_disksize = attr->la_size; i_size_write(inode, attr->la_size); } -# if 0 - /* - * OSD should not change "i_blocks" which is used by quota. - * "i_blocks" should be changed by ldiskfs only. - * Disable this assignment until SOM to fix some EA field. */ + +#if 0 + /* OSD should not change "i_blocks" which is used by quota. + * "i_blocks" should be changed by ldiskfs only. */ if (bits & LA_BLOCKS) inode->i_blocks = attr->la_blocks; #endif @@ -1304,21 +1306,41 @@ static int osd_inode_setattr(const struct lu_env *env, if (bits & LA_RDEV) inode->i_rdev = attr->la_rdev; - if (bits & LA_FLAGS) { - struct ldiskfs_inode_info *li = LDISKFS_I(inode); - - li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) | - (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE); - } + if (bits & LA_FLAGS) + inode->i_flags = ll_ext_to_inode_flags(attr->la_flags); return 0; } +static int osd_attr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_attr *attr, + struct thandle *handle, + struct lustre_capa *capa) +{ + struct osd_object *obj = osd_dt_obj(dt); + int rc; + + LASSERT(handle != NULL); + LASSERT(dt_object_exists(dt)); + LASSERT(osd_invariant(obj)); + + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) + return -EACCES; + + cfs_spin_lock(&obj->oo_guard); + rc = osd_inode_setattr(env, obj->oo_inode, attr); + cfs_spin_unlock(&obj->oo_guard); + + if (!rc) + mark_inode_dirty(obj->oo_inode); + return rc; +} + /* * Object creation. * * XXX temporary solution. */ - static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct thandle *th) { @@ -1328,31 +1350,10 @@ static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj, static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct thandle *th) { - LASSERT(obj->oo_inode != NULL); - osd_object_init0(obj); return 0; } -extern struct inode *ldiskfs_create_inode(handle_t *handle, - struct inode * dir, int mode); -extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry, - struct inode *inode); -extern int ldiskfs_delete_entry(handle_t *handle, - struct inode * dir, - struct ldiskfs_dir_entry_2 * de_del, - struct buffer_head * bh); -extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry, - struct ldiskfs_dir_entry_2 - ** res_dir); -extern int ldiskfs_add_dot_dotdot(handle_t *handle, struct inode *dir, - struct inode *inode); - -extern int ldiskfs_xattr_set_handle(handle_t *handle, struct inode *inode, - int name_index, const char *name, - const void *value, size_t value_len, - int flags); - static struct dentry * osd_child_dentry_get(const struct lu_env *env, struct osd_object *obj, const char *name, @@ -1375,7 +1376,7 @@ static struct dentry * osd_child_dentry_get(const struct lu_env *env, static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, - umode_t mode, + cfs_umode_t mode, struct dt_allocation_hint *hint, struct thandle *th) { @@ -1419,14 +1420,6 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, return result; } - -extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize, - int recsize, handle_t *handle); - -extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize, - int recsize, handle_t *handle); - - enum { OSD_NAME_LEN = 255 }; @@ -1454,7 +1447,7 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, */ result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4, - sizeof (struct lu_fid_pack), + sizeof (struct osd_fid_pack), oth->ot_handle); } return result; @@ -1525,7 +1518,7 @@ static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj, struct dt_object_format *dof, struct thandle *th) { - umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX); + cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX); int result; LINVRNT(osd_invariant(obj)); @@ -1578,7 +1571,7 @@ static osd_obj_type_f osd_create_type_f(enum dt_format_type type) static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, - struct dt_object *parent, umode_t child_mode) + struct dt_object *parent, cfs_umode_t child_mode) { LASSERT(ah); @@ -1689,12 +1682,11 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, *t = inode->i_ctime; rc = inode->i_op->setxattr(dentry, name, buf->lb_buf, buf->lb_len, fs_flags); - if (likely(rc == 0)) { - spin_lock(&obj->oo_guard); - inode->i_ctime = *t; - spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); - } + /* ctime should not be updated with server-side time. */ + cfs_spin_lock(&obj->oo_guard); + inode->i_ctime = *t; + cfs_spin_unlock(&obj->oo_guard); + mark_inode_dirty(inode); return rc; } @@ -1714,67 +1706,126 @@ static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt, struct osd_thread_info *info = osd_oti_get(env); struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs; - fid_cpu_to_be(&mdt_attrs->lma_self_fid, fid); - + lustre_lma_init(mdt_attrs, fid); + lustre_lma_swab(mdt_attrs); return __osd_xattr_set(env, dt, osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs), - MDT_XATTR_NAME, LU_XATTR_CREATE); + XATTR_NAME_LMA, LU_XATTR_CREATE); } /** * Helper function to form igif */ -static inline void osd_igif_get(const struct lu_env *env, struct dentry *dentry, +static inline void osd_igif_get(const struct lu_env *env, struct inode *inode, struct lu_fid *fid) { - struct inode *inode = dentry->d_inode; lu_igif_build(fid, inode->i_ino, inode->i_generation); } /** - * Helper function to pack the fid + * Helper function to pack the fid, ldiskfs stores fid in packed format. + */ +void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid, + struct lu_fid *befider) +{ + fid_cpu_to_be(befider, (struct lu_fid *)fid); + memcpy(pack->fp_area, befider, sizeof(*befider)); + pack->fp_len = sizeof(*befider) + 1; +} + +/** + * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata. + * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs. + * To have compatilibility with 1.8 ldiskfs driver we need to have + * magic number at start of fid data. + * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs. + * its inmemory API. */ -static inline void osd_fid_pack(const struct lu_env *env, const struct lu_fid *fid, - struct lu_fid_pack *pack) +void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param, + const struct dt_rec *fid) +{ + param->edp_magic = LDISKFS_LUFID_MAGIC; + param->edp_len = sizeof(struct lu_fid) + 1; + + fid_cpu_to_be((struct lu_fid *)param->edp_data, + (struct lu_fid *)fid); +} + +int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack) { - fid_pack(pack, fid, &osd_oti_get(env)->oti_fid); + int result; + + result = 0; + switch (pack->fp_len) { + case sizeof *fid + 1: + memcpy(fid, pack->fp_area, sizeof *fid); + fid_be_to_cpu(fid, fid); + break; + default: + CERROR("Unexpected packed fid size: %d\n", pack->fp_len); + result = -EIO; + } + return result; } /** * Try to read the fid from inode ea into dt_rec, if return value * i.e. rc is +ve, then we got fid, otherwise we will have to form igif * - * \param rec, the data-structure into which fid/igif is read + * \param fid object fid. * - * \retval 0, on success + * \retval 0 on success */ -static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry, - struct dt_rec *rec) +static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj, + __u32 ino, struct lu_fid *fid) { - struct inode *inode = dentry->d_inode; struct osd_thread_info *info = osd_oti_get(env); struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs; - struct lu_fid *fid = &info->oti_fid; - int rc; + struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; + struct dentry *dentry = &info->oti_child_dentry; + struct osd_inode_id *id = &info->oti_id; + struct osd_device *dev; + struct inode *inode; + int rc; - LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); + ENTRY; + dev = osd_dev(ldev); + + id->oii_ino = ino; + id->oii_gen = OSD_OII_NOGEN; + + inode = osd_iget(info, dev, id); + if (IS_ERR(inode)) { + rc = PTR_ERR(inode); + GOTO(out,rc); + } + dentry->d_inode = inode; - rc = inode->i_op->getxattr(dentry, MDT_XATTR_NAME, (void *)mdt_attrs, + LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); + rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs, sizeof *mdt_attrs); + /* Check LMA compatibility */ + if (rc > 0 && + (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) { + CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n", + inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) & + ~LMA_INCOMPAT_SUPP); + return -ENOSYS; + } + if (rc > 0) { - fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid); + lustre_lma_swab(mdt_attrs); + memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid)); rc = 0; } else if (rc == -ENODATA) { - osd_igif_get(env, dentry, fid); + osd_igif_get(env, inode, fid); rc = 0; } - - if (rc == 0) - osd_fid_pack(env, fid, (struct lu_fid_pack*)rec); - - return rc; + iput(inode); +out: + RETURN(rc); } /** @@ -1795,7 +1846,6 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, struct osd_object *obj = osd_dt_obj(dt); struct osd_thread_info *info = osd_oti_get(env); int result; - int is_root = 0; ENTRY; @@ -1806,11 +1856,8 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, result = __osd_object_create(info, obj, attr, hint, dof, th); - if (hint && hint->dah_parent) - is_root = osd_object_is_root(osd_dt_obj(hint->dah_parent)); - /* objects under osd root shld have igif fid, so dont add fid EA */ - if (result == 0 && is_root == 0) + if (result == 0 && fid_seq(fid) >= FID_SEQ_DISTRIBUTED_START) result = osd_ea_fid_set(env, dt, fid); if (result == 0) @@ -1836,10 +1883,10 @@ static void osd_object_ref_add(const struct lu_env *env, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - spin_lock(&obj->oo_guard); + cfs_spin_lock(&obj->oo_guard); LASSERT(inode->i_nlink < LDISKFS_LINK_MAX); inode->i_nlink++; - spin_unlock(&obj->oo_guard); + cfs_spin_unlock(&obj->oo_guard); mark_inode_dirty(inode); LINVRNT(osd_invariant(obj)); } @@ -1859,10 +1906,10 @@ static void osd_object_ref_del(const struct lu_env *env, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - spin_lock(&obj->oo_guard); + cfs_spin_lock(&obj->oo_guard); LASSERT(inode->i_nlink > 0); inode->i_nlink--; - spin_unlock(&obj->oo_guard); + cfs_spin_unlock(&obj->oo_guard); mark_inode_dirty(inode); LINVRNT(osd_invariant(obj)); } @@ -1892,7 +1939,6 @@ static int osd_xattr_get(const struct lu_env *env, return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len); } - /* * Concurrency: @dt is write locked. */ @@ -1959,13 +2005,11 @@ static int osd_xattr_del(const struct lu_env *env, dentry->d_inode = inode; *t = inode->i_ctime; rc = inode->i_op->removexattr(dentry, name); - if (likely(rc == 0)) { - /* ctime should not be updated with server-side time. */ - spin_lock(&obj->oo_guard); - inode->i_ctime = *t; - spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); - } + /* ctime should not be updated with server-side time. */ + cfs_spin_lock(&obj->oo_guard); + inode->i_ctime = *t; + cfs_spin_unlock(&obj->oo_guard); + mark_inode_dirty(inode); return rc; } @@ -2011,9 +2055,9 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, __u32 d[4], s[4]; s[0] = obj->oo_inode->i_uid; - get_random_bytes(&(s[1]), sizeof(__u32)); + ll_get_random_bytes(&(s[1]), sizeof(__u32)); s[2] = obj->oo_inode->i_gid; - get_random_bytes(&(s[3]), sizeof(__u32)); + ll_get_random_bytes(&(s[3]), sizeof(__u32)); rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN); if (unlikely(rc)) RETURN(ERR_PTR(rc)); @@ -2039,9 +2083,9 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, RETURN(oc); } - spin_lock(&capa_lock); + cfs_spin_lock(&capa_lock); *key = dev->od_capa_keys[1]; - spin_unlock(&capa_lock); + cfs_spin_unlock(&capa_lock); capa->lc_keyid = key->lk_keyid; capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout; @@ -2076,11 +2120,171 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) RETURN(rc); } +/* + * Get the 64-bit version for an inode. + */ +static dt_obj_version_t osd_object_version_get(const struct lu_env *env, + struct dt_object *dt) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n", + LDISKFS_I(inode)->i_fs_version, inode->i_ino); + return LDISKFS_I(inode)->i_fs_version; +} + +/* + * Set the 64-bit version and return the old version. + */ +static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt, + dt_obj_version_t new_version) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n", + new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino); + LDISKFS_I(inode)->i_fs_version = new_version; + /** Version is set after all inode operations are finished, + * so we should mark it dirty here */ + inode->i_sb->s_op->dirty_inode(inode); +} + +static int osd_data_get(const struct lu_env *env, struct dt_object *dt, + void **data) +{ + struct osd_object *obj = osd_dt_obj(dt); + ENTRY; + + *data = (void *)obj->oo_inode; + RETURN(0); +} + +/* + * Index operations. + */ + +static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o, + const struct dt_index_features *feat) +{ + struct iam_descr *descr; + + if (osd_object_is_root(o)) + return feat == &dt_directory_features; + + LASSERT(o->oo_dir != NULL); + + descr = o->oo_dir->od_container.ic_descr; + if (feat == &dt_directory_features) { + if (descr->id_rec_size == sizeof(struct osd_fid_pack)) + return 1; + else + return 0; + } else { + return + feat->dif_keysize_min <= descr->id_key_size && + descr->id_key_size <= feat->dif_keysize_max && + feat->dif_recsize_min <= descr->id_rec_size && + descr->id_rec_size <= feat->dif_recsize_max && + !(feat->dif_flags & (DT_IND_VARKEY | + DT_IND_VARREC | DT_IND_NONUNQ)) && + ergo(feat->dif_flags & DT_IND_UPDATE, + 1 /* XXX check that object (and file system) is + * writable */); + } +} + +static int osd_iam_container_init(const struct lu_env *env, + struct osd_object *obj, + struct osd_directory *dir) +{ + int result; + struct iam_container *bag; + + bag = &dir->od_container; + result = iam_container_init(bag, &dir->od_descr, obj->oo_inode); + if (result == 0) { + result = iam_container_setup(bag); + if (result == 0) + obj->oo_dt.do_index_ops = &osd_index_iam_ops; + else + iam_container_fini(bag); + } + return result; +} + + +/* + * Concurrency: no external locking is necessary. + */ +static int osd_index_try(const struct lu_env *env, struct dt_object *dt, + const struct dt_index_features *feat) +{ + int result; + int ea_dir = 0; + struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); + + LINVRNT(osd_invariant(obj)); + LASSERT(dt_object_exists(dt)); + + if (osd_object_is_root(obj)) { + dt->do_index_ops = &osd_index_ea_ops; + result = 0; + } else if (feat == &dt_directory_features && osd->od_iop_mode) { + dt->do_index_ops = &osd_index_ea_ops; + if (S_ISDIR(obj->oo_inode->i_mode)) + result = 0; + else + result = -ENOTDIR; + ea_dir = 1; + } else if (!osd_has_index(obj)) { + struct osd_directory *dir; + + OBD_ALLOC_PTR(dir); + if (dir != NULL) { + + cfs_spin_lock(&obj->oo_guard); + if (obj->oo_dir == NULL) + obj->oo_dir = dir; + else + /* + * Concurrent thread allocated container data. + */ + OBD_FREE_PTR(dir); + cfs_spin_unlock(&obj->oo_guard); + /* + * Now, that we have container data, serialize its + * initialization. + */ + cfs_down_write(&obj->oo_ext_idx_sem); + /* + * recheck under lock. + */ + if (!osd_has_index(obj)) + result = osd_iam_container_init(env, obj, dir); + else + result = 0; + cfs_up_write(&obj->oo_ext_idx_sem); + } else + result = -ENOMEM; + } else + result = 0; + + if (result == 0 && ea_dir == 0) { + if (!osd_iam_index_probe(env, obj, feat)) + result = -ENOTDIR; + } + LINVRNT(osd_invariant(obj)); + + return result; +} + static const struct dt_object_operations osd_obj_ops = { .do_read_lock = osd_object_read_lock, .do_write_lock = osd_object_write_lock, .do_read_unlock = osd_object_read_unlock, .do_write_unlock = osd_object_write_unlock, + .do_write_locked = osd_object_write_locked, .do_attr_get = osd_attr_get, .do_attr_set = osd_attr_set, .do_ah_init = osd_ah_init, @@ -2094,6 +2298,9 @@ static const struct dt_object_operations osd_obj_ops = { .do_xattr_list = osd_xattr_list, .do_capa_get = osd_capa_get, .do_object_sync = osd_object_sync, + .do_version_get = osd_object_version_get, + .do_version_set = osd_object_version_set, + .do_data_get = osd_data_get, }; /** @@ -2105,6 +2312,7 @@ static const struct dt_object_operations osd_obj_ea_ops = { .do_write_lock = osd_object_write_lock, .do_read_unlock = osd_object_read_unlock, .do_write_unlock = osd_object_write_unlock, + .do_write_locked = osd_object_write_locked, .do_attr_get = osd_attr_get, .do_attr_set = osd_attr_set, .do_ah_init = osd_ah_init, @@ -2118,6 +2326,9 @@ static const struct dt_object_operations osd_obj_ea_ops = { .do_xattr_list = osd_xattr_list, .do_capa_get = osd_capa_get, .do_object_sync = osd_object_sync, + .do_version_get = osd_object_version_get, + .do_version_set = osd_object_version_set, + .do_data_get = osd_data_get, }; /* @@ -2135,203 +2346,221 @@ static const struct dt_object_operations osd_obj_ea_ops = { * * which doesn't work for globally shared files like /last-received. */ -int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs); -int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize, - loff_t *offs, handle_t *handle); - -static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos, - struct lustre_capa *capa) +static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen) { - struct inode *inode = osd_dt_obj(dt)->oo_inode; + struct ldiskfs_inode_info *ei = LDISKFS_I(inode); - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) - RETURN(-EACCES); + memcpy(buffer, (char*)ei->i_data, buflen); - return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos); + return buflen; } -static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, struct lustre_capa *capa, - int ignore_quota) +static int osd_ldiskfs_read(struct inode *inode, void *buf, int size, + loff_t *offs) { - struct inode *inode = osd_dt_obj(dt)->oo_inode; - struct osd_thandle *oh; - ssize_t result; -#ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = current->cap_effective; -#endif - - LASSERT(handle != NULL); + struct buffer_head *bh; + unsigned long block; + int osize = size; + int blocksize; + int csize; + int boffs; + int err; - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE)) - RETURN(-EACCES); + /* prevent reading after eof */ + spin_lock(&inode->i_lock); + if (i_size_read(inode) < *offs + size) { + size = i_size_read(inode) - *offs; + spin_unlock(&inode->i_lock); + if (size < 0) { + CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n", + i_size_read(inode), *offs); + return -EBADR; + } else if (size == 0) { + return 0; + } + } else { + spin_unlock(&inode->i_lock); + } - oh = container_of(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle->h_transaction != NULL); -#ifdef HAVE_QUOTA_SUPPORT - if (ignore_quota) - current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; - else - current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; -#endif - result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len, - pos, oh->ot_handle); -#ifdef HAVE_QUOTA_SUPPORT - current->cap_effective = save; -#endif - if (result == 0) - result = buf->lb_len; - return result; -} + blocksize = 1 << inode->i_blkbits; -static const struct dt_body_operations osd_body_ops = { - .dbo_read = osd_read, - .dbo_write = osd_write -}; + while (size > 0) { + block = *offs >> inode->i_blkbits; + boffs = *offs & (blocksize - 1); + csize = min(blocksize - boffs, size); + bh = ldiskfs_bread(NULL, inode, block, 0, &err); + if (!bh) { + CERROR("can't read block: %d\n", err); + return err; + } -/* - * Index operations. - */ + memcpy(buf, bh->b_data + boffs, csize); + brelse(bh); -static int osd_object_is_root(const struct osd_object *obj) -{ - return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode; + *offs += csize; + buf += csize; + size -= csize; + } + return osize; } -static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o, - const struct dt_index_features *feat) +static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *buf, loff_t *pos, + struct lustre_capa *capa) { - struct iam_descr *descr; - struct dt_object *dt = &o->oo_dt; - - if (osd_object_is_root(o)) - return feat == &dt_directory_features; - - LASSERT(o->oo_dir != NULL); - - descr = o->oo_dir->od_container.ic_descr; - if (feat == &dt_directory_features) { - if (descr->id_rec_size == sizeof(struct lu_fid_pack)) - return 1; - - if (descr == &iam_htree_compat_param) { - /* if it is a HTREE dir then there is good chance that, - * we dealing with ext3 directory here with no FIDs. */ - - if (descr->id_rec_size == - sizeof ((struct ldiskfs_dir_entry_2 *)NULL)->inode) { + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + int rc; - dt->do_index_ops = &osd_index_ea_ops; - return 1; - } - } - return 0; - } else { - return - feat->dif_keysize_min <= descr->id_key_size && - descr->id_key_size <= feat->dif_keysize_max && - feat->dif_recsize_min <= descr->id_rec_size && - descr->id_rec_size <= feat->dif_recsize_max && - !(feat->dif_flags & (DT_IND_VARKEY | - DT_IND_VARREC | DT_IND_NONUNQ)) && - ergo(feat->dif_flags & DT_IND_UPDATE, - 1 /* XXX check that object (and file system) is - * writable */); - } -} + if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) + RETURN(-EACCES); -static int osd_iam_container_init(const struct lu_env *env, - struct osd_object *obj, - struct osd_directory *dir) -{ - int result; - struct iam_container *bag; + /* Read small symlink from inode body as we need to maintain correct + * on-disk symlinks for ldiskfs. + */ + if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) && + (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data))) + rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len); + else + rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos); - bag = &dir->od_container; - result = iam_container_init(bag, &dir->od_descr, obj->oo_inode); - if (result == 0) { - result = iam_container_setup(bag); - if (result == 0) - obj->oo_dt.do_index_ops = &osd_index_iam_ops; - else - iam_container_fini(bag); - } - return result; + return rc; } -/* - * Concurrency: no external locking is necessary. - */ -static int osd_index_try(const struct lu_env *env, struct dt_object *dt, - const struct dt_index_features *feat) +static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen) { - int result; - int ea_dir = 0; - struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); - LINVRNT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); + memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer, + buflen); + LDISKFS_I(inode)->i_disksize = buflen; + i_size_write(inode, buflen); + inode->i_sb->s_op->dirty_inode(inode); - if (osd_object_is_root(obj)) { - dt->do_index_ops = &osd_index_ea_ops; - result = 0; - } else if (feat == &dt_directory_features && osd->od_iop_mode) { - dt->do_index_ops = &osd_index_ea_ops; - if (S_ISDIR(obj->oo_inode->i_mode)) - result = 0; - else - result = -ENOTDIR; - ea_dir = 1; - } else if (!osd_has_index(obj)) { - struct osd_directory *dir; + return 0; +} - OBD_ALLOC_PTR(dir); - if (dir != NULL) { - sema_init(&dir->od_sem, 1); +static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize, + loff_t *offs, handle_t *handle) +{ + struct buffer_head *bh = NULL; + loff_t offset = *offs; + loff_t new_size = i_size_read(inode); + unsigned long block; + int blocksize = 1 << inode->i_blkbits; + int err = 0; + int size; + int boffs; + int dirty_inode = 0; + + while (bufsize > 0) { + if (bh != NULL) + brelse(bh); + + block = offset >> inode->i_blkbits; + boffs = offset & (blocksize - 1); + size = min(blocksize - boffs, bufsize); + bh = ldiskfs_bread(handle, inode, block, 1, &err); + if (!bh) { + CERROR("can't read/create block: %d\n", err); + break; + } - spin_lock(&obj->oo_guard); - if (obj->oo_dir == NULL) - obj->oo_dir = dir; - else - /* - * Concurrent thread allocated container data. - */ - OBD_FREE_PTR(dir); - spin_unlock(&obj->oo_guard); - /* - * Now, that we have container data, serialize its - * initialization. - */ - down(&obj->oo_dir->od_sem); - /* - * recheck under lock. - */ - if (!osd_has_index(obj)) - result = osd_iam_container_init(env, obj, dir); - else - result = 0; - up(&obj->oo_dir->od_sem); - } else - result = -ENOMEM; - } else - result = 0; + err = ldiskfs_journal_get_write_access(handle, bh); + if (err) { + CERROR("journal_get_write_access() returned error %d\n", + err); + break; + } + LASSERTF(boffs + size <= bh->b_size, + "boffs %d size %d bh->b_size %lu", + boffs, size, (unsigned long)bh->b_size); + memcpy(bh->b_data + boffs, buf, size); + err = ldiskfs_journal_dirty_metadata(handle, bh); + if (err) + break; - if (result == 0 && ea_dir == 0) { - if (!osd_iam_index_probe(env, obj, feat)) - result = -ENOTDIR; + if (offset + size > new_size) + new_size = offset + size; + offset += size; + bufsize -= size; + buf += size; } - LINVRNT(osd_invariant(obj)); + if (bh) + brelse(bh); + + /* correct in-core and on-disk sizes */ + if (new_size > i_size_read(inode)) { + spin_lock(&inode->i_lock); + if (new_size > i_size_read(inode)) + i_size_write(inode, new_size); + if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) { + LDISKFS_I(inode)->i_disksize = i_size_read(inode); + dirty_inode = 1; + } + spin_unlock(&inode->i_lock); + if (dirty_inode) + inode->i_sb->s_op->dirty_inode(inode); + } + + if (err == 0) + *offs = offset; + return err; +} + +static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t *pos, + struct thandle *handle, struct lustre_capa *capa, + int ignore_quota) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thandle *oh; + ssize_t result = 0; +#ifdef HAVE_QUOTA_SUPPORT + cfs_cap_t save = current->cap_effective; +#endif + + LASSERT(handle != NULL); + + if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE)) + RETURN(-EACCES); + oh = container_of(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle->h_transaction != NULL); +#ifdef HAVE_QUOTA_SUPPORT + if (ignore_quota) + current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; + else + current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; +#endif + /* Write small symlink to inode body as we need to maintain correct + * on-disk symlinks for ldiskfs. + */ + if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) && + (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data))) + result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len); + else + result = osd_ldiskfs_write_record(inode, buf->lb_buf, + buf->lb_len, pos, + oh->ot_handle); +#ifdef HAVE_QUOTA_SUPPORT + current->cap_effective = save; +#endif + if (result == 0) + result = buf->lb_len; return result; } +static const struct dt_body_operations osd_body_ops = { + .dbo_read = osd_read, + .dbo_write = osd_write +}; + + /** * delete a (key, value) pair from index \a dt specified by \a key * - * \param dt_object osd index object + * \param dt osd index object * \param key key for index * \param rec record reference * \param handle transaction handler @@ -2374,6 +2603,19 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de, + struct dt_rec *fid) +{ + struct osd_fid_pack *rec; + int rc = -ENODATA; + + if (de->file_type & LDISKFS_DIRENT_LUFID) { + rec = (struct osd_fid_pack *) (de->name + de->name_len + 1); + rc = osd_fid_unpack((struct lu_fid *)fid, rec); + } + RETURN(rc); +} + /** * Index delete function for interoperability mode (b11826). * It will remove the directory entry added by osd_index_ea_insert(). @@ -2412,16 +2654,29 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); - bh = ldiskfs_find_entry(dentry, &de); + + cfs_down_write(&obj->oo_ext_idx_sem); + bh = ll_ldiskfs_find_entry(dir, dentry, &de); if (bh) { + struct osd_thread_info *oti = osd_oti_get(env); + struct timespec *ctime = &oti->oti_time; + struct timespec *mtime = &oti->oti_time2; + + *ctime = dir->i_ctime; + *mtime = dir->i_mtime; rc = ldiskfs_delete_entry(oh->ot_handle, dir, de, bh); - if (!rc) - mark_inode_dirty(dir); + /* xtime should not be updated with server-side time. */ + cfs_spin_lock(&obj->oo_guard); + dir->i_ctime = *ctime; + dir->i_mtime = *mtime; + cfs_spin_unlock(&obj->oo_guard); + mark_inode_dirty(dir); brelse(bh); } else rc = -ENOENT; + cfs_up_write(&obj->oo_ext_idx_sem); LASSERT(osd_invariant(obj)); RETURN(rc); } @@ -2429,7 +2684,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, /** * Lookup index for \a key and copy record to \a rec. * - * \param dt_object osd index object + * \param dt osd index object * \param key key for index * \param rec record reference * @@ -2446,6 +2701,7 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, struct iam_container *bag = &obj->oo_dir->od_container; struct osd_thread_info *oti = osd_oti_get(env); struct iam_iterator *it = &oti->oti_idx_it; + struct iam_rec *iam_rec; int rc; ENTRY; @@ -2464,9 +2720,17 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, iam_it_init(it, bag, 0, ipd); rc = iam_it_get(it, (struct iam_key *)key); - if (rc >= 0) - iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec); + if (rc >= 0) { + if (S_ISDIR(obj->oo_inode->i_mode)) + iam_rec = (struct iam_rec *)oti->oti_ldp; + else + iam_rec = (struct iam_rec *) rec; + iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec); + if (S_ISDIR(obj->oo_inode->i_mode)) + osd_fid_unpack((struct lu_fid *) rec, + (struct osd_fid_pack *)iam_rec); + } iam_it_put(it); iam_it_fini(it); osd_ipd_put(env, bag, ipd); @@ -2499,6 +2763,8 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, #ifdef HAVE_QUOTA_SUPPORT cfs_cap_t save = current->cap_effective; #endif + struct osd_thread_info *oti = osd_oti_get(env); + struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp; int rc; ENTRY; @@ -2524,8 +2790,12 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, else current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; #endif + if (S_ISDIR(obj->oo_inode->i_mode)) + osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid); + else + iam_rec = (struct iam_rec *) rec; rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key, - (struct iam_rec *)rec, ipd); + iam_rec, ipd); #ifdef HAVE_QUOTA_SUPPORT current->cap_effective = save; #endif @@ -2535,6 +2805,44 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, } /** + * Calls ldiskfs_add_entry() to add directory entry + * into the directory. This is required for + * interoperability mode (b11826) + * + * \retval 0, on success + * \retval -ve, on error + */ +static int __osd_ea_add_rec(struct osd_thread_info *info, + struct osd_object *pobj, + struct inode *cinode, + const char *name, + const struct dt_rec *fid, + struct thandle *th) +{ + struct ldiskfs_dentry_param *ldp; + struct dentry *child; + struct osd_thandle *oth; + int rc; + + oth = container_of(th, struct osd_thandle, ot_super); + LASSERT(oth->ot_handle != NULL); + LASSERT(oth->ot_handle->h_transaction != NULL); + + child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name)); + + if (fid_is_igif((struct lu_fid *)fid) || + fid_seq((struct lu_fid *)fid) >= FID_SEQ_DISTRIBUTED_START) { + ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; + osd_get_ldiskfs_dirent_param(ldp, fid); + child->d_fsdata = (void*) ldp; + } else + child->d_fsdata = NULL; + rc = ldiskfs_add_entry(oth->ot_handle, child, cinode); + + RETURN(rc); +} + +/** * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries * into the directory.Also sets flags into osd object to * indicate dot and dotdot are created. This is required for @@ -2548,11 +2856,14 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, */ static int osd_add_dot_dotdot(struct osd_thread_info *info, struct osd_object *dir, - struct osd_object *obj, const char *name, + struct inode *parent_dir, const char *name, + const struct dt_rec *dot_fid, + const struct dt_rec *dot_dot_fid, struct thandle *th) { - struct inode *parent_dir = obj->oo_inode; struct inode *inode = dir->oo_inode; + struct ldiskfs_dentry_param *dot_ldp; + struct ldiskfs_dentry_param *dot_dot_ldp; struct osd_thandle *oth; int result = 0; @@ -2564,17 +2875,31 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, if (dir->oo_compat_dot_created) { result = -EEXIST; } else { - LASSERT(obj == dir); + LASSERT(inode == parent_dir); dir->oo_compat_dot_created = 1; result = 0; } } else if(strcmp(name, dotdot) == 0) { + dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; + dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2; + if (!dir->oo_compat_dot_created) return -EINVAL; - if (dir->oo_compat_dotdot_created) - return __osd_ea_add_rec(info, dir, obj, name, th); + if (fid_seq((struct lu_fid *) dot_fid) >= FID_SEQ_DISTRIBUTED_START) { + osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid); + osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid); + } else { + dot_ldp = NULL; + dot_dot_ldp = NULL; + } + /* in case of rename, dotdot is already created */ + if (dir->oo_compat_dotdot_created) { + return __osd_ea_add_rec(info, dir, parent_dir, name, + dot_dot_fid, th); + } - result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode); + result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode, + dot_ldp, dot_dot_ldp); if (result == 0) dir->oo_compat_dotdot_created = 1; } @@ -2582,34 +2907,6 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, return result; } -/** - * Calls ldiskfs_add_entry() to add directory entry - * into the directory. This is required for - * interoperability mode (b11826) - * - * \retval 0, on success - * \retval -ve, on error - */ -static int __osd_ea_add_rec(struct osd_thread_info *info, - struct osd_object *pobj, - struct osd_object *cobj, - const char *name, - struct thandle *th) -{ - struct dentry *child; - struct osd_thandle *oth; - struct inode *cinode = cobj->oo_inode; - int rc; - - oth = container_of(th, struct osd_thandle, ot_super); - LASSERT(oth->ot_handle != NULL); - LASSERT(oth->ot_handle->h_transaction != NULL); - - child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name)); - rc = ldiskfs_add_entry(oth->ot_handle, child, cinode); - - RETURN(rc); -} /** * It will call the appropriate osd_add* function and return the @@ -2617,8 +2914,9 @@ static int __osd_ea_add_rec(struct osd_thread_info *info, */ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj, - struct osd_object *cobj, + struct inode *cinode, const char *name, + const struct dt_rec *fid, struct thandle *th) { struct osd_thread_info *info = osd_oti_get(env); @@ -2626,9 +2924,11 @@ static int osd_ea_add_rec(const struct lu_env *env, if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' && name[2] =='\0'))) - rc = osd_add_dot_dotdot(info, pobj, cobj, name, th); + rc = osd_add_dot_dotdot(info, pobj, cinode, name, + (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu), + fid, th); else - rc = __osd_ea_add_rec(info, pobj, cobj, name, th); + rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th); return rc; } @@ -2644,14 +2944,11 @@ static int osd_ea_add_rec(const struct lu_env *env, static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, struct dt_rec *rec, const struct dt_key *key) { - struct inode *dir = obj->oo_inode; - struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry; - struct osd_device *dev = osd_dev(obj->oo_dt.do_lu.lo_dev); - struct osd_inode_id *id = &info->oti_id; + struct inode *dir = obj->oo_inode; + struct dentry *dentry; struct ldiskfs_dir_entry_2 *de; struct buffer_head *bh; - struct inode *inode; + struct lu_fid *fid = (struct lu_fid *) rec; int ino; int rc; @@ -2659,34 +2956,31 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); - bh = ldiskfs_find_entry(dentry, &de); + + cfs_down_read(&obj->oo_ext_idx_sem); + bh = ll_ldiskfs_find_entry(dir, dentry, &de); if (bh) { ino = le32_to_cpu(de->inode); - brelse(bh); - id->oii_ino = ino; - id->oii_gen = OSD_OII_NOGEN; - - inode = osd_iget(info, dev, id); - if (!IS_ERR(inode)) { - dentry->d_inode = inode; + rc = osd_get_fid_from_dentry(de, rec); - rc = osd_ea_fid_get(env, dentry, rec); - iput(inode); - } else - rc = -ENOENT; + /* done with de, release bh */ + brelse(bh); + if (rc != 0) + rc = osd_ea_fid_get(env, obj, ino, fid); } else rc = -ENOENT; + cfs_up_read(&obj->oo_ext_idx_sem); RETURN (rc); } /** * Find the osd object for given fid. * - * \param fid, need to find the osd object having this fid + * \param fid need to find the osd object having this fid * - * \retval osd_object, on success - * \retval -ve, on error + * \retval osd_object on success + * \retval -ve on error */ struct osd_object *osd_object_find(const struct lu_env *env, struct dt_object *dt, @@ -2728,7 +3022,7 @@ struct osd_object *osd_object_find(const struct lu_env *env, /** * Put the osd object once done with it. * - * \param obj, osd object that needs to be put + * \param obj osd object that needs to be put */ static inline void osd_object_put(const struct lu_env *env, struct osd_object *obj) @@ -2741,8 +3035,8 @@ static inline void osd_object_put(const struct lu_env *env, * It will add the directory entry.This entry is needed to * maintain name->fid mapping. * - * \param key, it is key i.e. file entry to be inserted - * \param rec, it is value of given key i.e. fid + * \param key it is key i.e. file entry to be inserted + * \param rec it is value of given key i.e. fid * * \retval 0, on success * \retval -ve, on error @@ -2753,12 +3047,11 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, struct lustre_capa *capa, int ignore_quota) { struct osd_object *obj = osd_dt_obj(dt); - struct lu_fid *fid = &osd_oti_get(env)->oti_fid; - const struct lu_fid_pack *pack = (const struct lu_fid_pack *)rec; + struct lu_fid *fid = (struct lu_fid *) rec; const char *name = (const char *)key; struct osd_object *child; #ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = current->cap_effective; + cfs_cap_t save = current->cap_effective; #endif int rc; @@ -2771,23 +3064,34 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) RETURN(-EACCES); - rc = fid_unpack(pack, fid); - if (rc != 0) - RETURN(rc); child = osd_object_find(env, dt, fid); if (!IS_ERR(child)) { + struct inode *inode = obj->oo_inode; + struct osd_thread_info *oti = osd_oti_get(env); + struct timespec *ctime = &oti->oti_time; + struct timespec *mtime = &oti->oti_time2; + + *ctime = inode->i_ctime; + *mtime = inode->i_mtime; #ifdef HAVE_QUOTA_SUPPORT if (ignore_quota) current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; else current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; #endif - rc = osd_ea_add_rec(env, obj, child, name, th); - + cfs_down_write(&obj->oo_ext_idx_sem); + rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th); + cfs_up_write(&obj->oo_ext_idx_sem); #ifdef HAVE_QUOTA_SUPPORT current->cap_effective = save; #endif osd_object_put(env, child); + /* xtime should not be updated with server-side time. */ + cfs_spin_lock(&obj->oo_guard); + inode->i_ctime = *ctime; + inode->i_mtime = *mtime; + cfs_spin_unlock(&obj->oo_guard); + mark_inode_dirty(inode); } else { rc = PTR_ERR(child); } @@ -2916,15 +3220,89 @@ static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di) return iam_it_key_size(&it->oi_it); } +static inline void osd_it_append_attrs(struct lu_dirent*ent, + __u32 attr, + int len, + __u16 type) +{ + struct luda_type *lt; + const unsigned align = sizeof(struct luda_type) - 1; + + /* check if file type is required */ + if (attr & LUDA_TYPE) { + len = (len + align) & ~align; + + lt = (void *) ent->lde_name + len; + lt->lt_type = cpu_to_le16(CFS_DTTOIF(type)); + ent->lde_attrs |= LUDA_TYPE; + } + + ent->lde_attrs = cpu_to_le32(ent->lde_attrs); +} + +/** + * build lu direct from backend fs dirent. + */ + +static inline void osd_it_pack_dirent(struct lu_dirent *ent, + struct lu_fid *fid, + __u64 offset, + char *name, + __u16 namelen, + __u16 type, + __u32 attr) +{ + fid_cpu_to_le(&ent->lde_fid, fid); + ent->lde_attrs = LUDA_FID; + + ent->lde_hash = cpu_to_le64(offset); + ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr)); + + strncpy(ent->lde_name, name, namelen); + ent->lde_namelen = cpu_to_le16(namelen); + + /* append lustre attributes */ + osd_it_append_attrs(ent, attr, namelen, type); +} + /** * Return pointer to the record under iterator. */ -static struct dt_rec *osd_it_iam_rec(const struct lu_env *env, - const struct dt_it *di) +static int osd_it_iam_rec(const struct lu_env *env, + const struct dt_it *di, + struct lu_dirent *lde, + __u32 attr) { - struct osd_it_iam *it = (struct osd_it_iam *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; + struct osd_thread_info *info = osd_oti_get(env); + struct lu_fid *fid = &info->oti_fid; + const struct osd_fid_pack *rec; + char *name; + int namelen; + __u64 hash; + int rc; - return (struct dt_rec *)iam_it_rec_get(&it->oi_it); + name = (char *)iam_it_key_get(&it->oi_it); + if (IS_ERR(name)) + RETURN(PTR_ERR(name)); + + namelen = iam_it_key_size(&it->oi_it); + + rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it); + if (IS_ERR(rec)) + RETURN(PTR_ERR(rec)); + + rc = osd_fid_unpack(fid, rec); + if (rc) + RETURN(rc); + + hash = iam_it_store(&it->oi_it); + + /* IAM does not store object type in IAM index (dir) */ + osd_it_pack_dirent(lde, fid, hash, name, namelen, + 0, LUDA_FID); + + return 0; } /** @@ -2996,31 +3374,33 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env, obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); obj_dentry->d_name.hash = 0; - it->oie_namelen = 0; - it->oie_curr_pos = 0; - it->oie_next_pos = 0; + it->oie_rd_dirent = 0; + it->oie_it_dirent = 0; + it->oie_dirent = NULL; + it->oie_buf = info->oti_it_ea_buf; it->oie_obj = obj; + it->oie_file.f_pos = 0; it->oie_file.f_dentry = obj_dentry; it->oie_file.f_mapping = obj->oo_inode->i_mapping; it->oie_file.f_op = obj->oo_inode->i_fop; it->oie_file.private_data = NULL; lu_object_get(lo); - - RETURN((struct dt_it*) it); + RETURN((struct dt_it *) it); } /** * Destroy or finishes iterator context. * - * \param di, struct osd_it_ea, iterator structure to be destroyed + * \param di iterator structure to be destroyed */ static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; struct osd_object *obj = it->oie_obj; - + struct inode *inode = obj->oo_inode; ENTRY; + it->oie_file.f_op->release(inode, &it->oie_file); lu_object_put(env, &obj->oo_dt.do_lu); EXIT; } @@ -3042,9 +3422,10 @@ static int osd_it_ea_get(const struct lu_env *env, ENTRY; LASSERT(((const char *)key)[0] == '\0'); - it->oie_namelen = 0; - it->oie_curr_pos = 0; - it->oie_next_pos = 0; + it->oie_file.f_pos = 0; + it->oie_rd_dirent = 0; + it->oie_it_dirent = 0; + it->oie_dirent = NULL; RETURN(+1); } @@ -3061,36 +3442,52 @@ static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di) * iterator's in-memory data structure with required * information i.e. name, namelen, rec_size etc. * - * \param buf, in which information to be filled in. - * \param name, name of the file in given dir + * \param buf in which information to be filled in. + * \param name name of the file in given dir * - * \retval 0, on success - * \retval 1, on buffer full + * \retval 0 on success + * \retval 1 on buffer full */ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, - loff_t offset, ino_t ino, - unsigned int d_type) + loff_t offset, __u64 ino, + unsigned d_type) { - struct osd_it_ea *it = (struct osd_it_ea *)buf; - struct dirent64 *dirent = &it->oie_dirent64; - int reclen = LDISKFS_DIR_REC_LEN(namelen); - - + struct osd_it_ea *it = (struct osd_it_ea *)buf; + struct osd_it_ea_dirent *ent = it->oie_dirent; + struct lu_fid *fid = &ent->oied_fid; + struct osd_fid_pack *rec; ENTRY; - if (it->oie_namelen) - RETURN(-ENOENT); - if (namelen == 0 || namelen > LDISKFS_NAME_LEN) + /* this should never happen */ + if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) { + CERROR("ldiskfs return invalid namelen %d\n", namelen); RETURN(-EIO); + } - strncpy(dirent->d_name, name, LDISKFS_NAME_LEN); - dirent->d_name[namelen] = 0; - dirent->d_ino = ino; - dirent->d_off = offset; - dirent->d_reclen = reclen; - it->oie_namelen = namelen; - it->oie_curr_pos = offset; + if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen > + OSD_IT_EA_BUFSIZE) + RETURN(1); + if (d_type & LDISKFS_DIRENT_LUFID) { + rec = (struct osd_fid_pack*) (name + namelen + 1); + + if (osd_fid_unpack(fid, rec) != 0) + fid_zero(fid); + + d_type &= ~LDISKFS_DIRENT_LUFID; + } else { + fid_zero(fid); + } + + ent->oied_ino = ino; + ent->oied_off = offset; + ent->oied_namelen = namelen; + ent->oied_type = d_type; + + memcpy(ent->oied_name, name, namelen); + + it->oie_rd_dirent++; + it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen); RETURN(0); } @@ -3098,12 +3495,12 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, * Calls ->readdir() to load a directory entry at a time * and stored it in iterator's in-memory data structure. * - * \param di, struct osd_it_ea, iterator's in memory structure + * \param di iterator's in memory structure * - * \retval 0, on success - * \retval -ve, on error + * \retval 0 on success + * \retval -ve on error */ -int osd_ldiskfs_it_fill(const struct dt_it *di) +static int osd_ldiskfs_it_fill(const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; struct osd_object *obj = it->oie_obj; @@ -3111,16 +3508,21 @@ int osd_ldiskfs_it_fill(const struct dt_it *di) int result = 0; ENTRY; - it->oie_namelen = 0; - it->oie_file.f_pos = it->oie_curr_pos; + it->oie_dirent = it->oie_buf; + it->oie_rd_dirent = 0; + cfs_down_read(&obj->oo_ext_idx_sem); result = inode->i_fop->readdir(&it->oie_file, it, (filldir_t) osd_ldiskfs_filldir); - it->oie_next_pos = it->oie_file.f_pos; + cfs_up_read(&obj->oo_ext_idx_sem); - if(!result && it->oie_namelen == 0) + if (it->oie_rd_dirent == 0) { result = -EIO; + } else { + it->oie_dirent = it->oie_buf; + it->oie_it_dirent = 1; + } RETURN(result); } @@ -3130,11 +3532,11 @@ int osd_ldiskfs_it_fill(const struct dt_it *di) * to load a directory entry at a time and stored it in * iterator's in-memory data structure. * - * \param di, struct osd_it_ea, iterator's in memory structure + * \param di iterator's in memory structure * - * \retval +ve, iterator reached to end - * \retval 0, iterator not reached to end - * \retval -ve, on error + * \retval +ve iterator reached to end + * \retval 0 iterator not reached to end + * \retval -ve on error */ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) { @@ -3142,12 +3544,20 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) int rc; ENTRY; - it->oie_curr_pos = it->oie_next_pos; - if (it->oie_curr_pos == LDISKFS_HTREE_EOF) - rc = +1; - else - rc = osd_ldiskfs_it_fill(di); + if (it->oie_it_dirent < it->oie_rd_dirent) { + it->oie_dirent = + (void *) it->oie_dirent + + cfs_size_round(sizeof(struct osd_it_ea_dirent) + + it->oie_dirent->oied_namelen); + it->oie_it_dirent++; + RETURN(0); + } else { + if (it->oie_file.f_pos == LDISKFS_HTREE_EOF) + rc = +1; + else + rc = osd_ldiskfs_it_fill(di); + } RETURN(rc); } @@ -3155,7 +3565,7 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) /** * Returns the key at current position from iterator's in memory structure. * - * \param di, struct osd_it_ea, iterator's in memory structure + * \param di iterator's in memory structure * * \retval key i.e. struct dt_key on success */ @@ -3164,13 +3574,13 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env, { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN((struct dt_key *)it->oie_dirent64.d_name); + RETURN((struct dt_key *)it->oie_dirent->oied_name); } /** * Returns the key's size at current position from iterator's in memory structure. * - * \param di, struct osd_it_ea, iterator's in memory structure + * \param di iterator's in memory structure * * \retval key_size i.e. struct dt_key on success */ @@ -3178,56 +3588,50 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN(it->oie_namelen); + RETURN(it->oie_dirent->oied_namelen); } + /** * Returns the value (i.e. fid/igif) at current position from iterator's * in memory structure. * - * \param di, struct osd_it_ea, iterator's in memory structure + * \param di struct osd_it_ea, iterator's in memory structure + * \param attr attr requested for dirent. + * \param lde lustre dirent * - * \retval value i.e. struct dt_rec on success + * \retval 0 no error and \param lde has correct lustre dirent. + * \retval -ve on error */ -static struct dt_rec *osd_it_ea_rec(const struct lu_env *env, - const struct dt_it *di) +static inline int osd_it_ea_rec(const struct lu_env *env, + const struct dt_it *di, + struct lu_dirent *lde, + __u32 attr) { - struct osd_it_ea *it = (struct osd_it_ea *)di; - struct osd_object *obj = it->oie_obj; - struct osd_thread_info *info = osd_oti_get(env); - struct osd_inode_id *id = &info->oti_id; - struct lu_fid_pack *rec = &info->oti_pack; - struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; - struct dentry *dentry = &info->oti_child_dentry; - struct osd_device *dev; - struct inode *inode; - int rc; + struct osd_it_ea *it = (struct osd_it_ea *)di; + struct osd_object *obj = it->oie_obj; + struct lu_fid *fid = &it->oie_dirent->oied_fid; + int rc = 0; ENTRY; - dev = osd_dev(ldev); - id->oii_ino = it->oie_dirent64.d_ino; - id->oii_gen = OSD_OII_NOGEN; - inode = osd_iget(info, dev, id); - if (!IS_ERR(inode)) { - dentry->d_inode = inode; - LASSERT(dentry->d_inode->i_sb == osd_sb(dev)); - } else { - CERROR("Error getting inode for ino =%d", id->oii_ino); - RETURN((struct dt_rec *) PTR_ERR(inode)); - } - rc = osd_ea_fid_get(env, dentry, (struct dt_rec*) rec); - - iput(inode); - RETURN((struct dt_rec *)rec); + if (!fid_is_sane(fid)) + rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid); + if (rc == 0) + osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off, + it->oie_dirent->oied_name, + it->oie_dirent->oied_namelen, + it->oie_dirent->oied_type, + attr); + RETURN(rc); } /** * Returns a cookie for current position of the iterator head, so that * user can use this cookie to load/start the iterator next time. * - * \param di, struct osd_it_ea, iterator's in memory structure + * \param di iterator's in memory structure * * \retval cookie for current position, on success */ @@ -3235,7 +3639,7 @@ static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN(it->oie_curr_pos); + RETURN(it->oie_dirent->oied_off); } /** @@ -3243,10 +3647,10 @@ static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di) * to load a directory entry at a time and stored it i inn, * in iterator's in-memory data structure. * - * \param di, struct osd_it_ea, iterator's in memory structure + * \param di struct osd_it_ea, iterator's in memory structure * - * \retval +ve, on success - * \retval -ve, on error + * \retval +ve on success + * \retval -ve on error */ static int osd_it_ea_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) @@ -3255,7 +3659,7 @@ static int osd_it_ea_load(const struct lu_env *env, int rc; ENTRY; - it->oie_curr_pos = it->oie_next_pos = hash; + it->oie_file.f_pos = hash; rc = osd_ldiskfs_it_fill(di); if (rc == 0) @@ -3263,27 +3667,6 @@ static int osd_it_ea_load(const struct lu_env *env, RETURN(rc); } -/** - * Index and Iterator operations for interoperability - * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826) - */ -static const struct dt_index_operations osd_index_ea_ops = { - .dio_lookup = osd_index_ea_lookup, - .dio_insert = osd_index_ea_insert, - .dio_delete = osd_index_ea_delete, - .dio_it = { - .init = osd_it_ea_init, - .fini = osd_it_ea_fini, - .get = osd_it_ea_get, - .put = osd_it_ea_put, - .next = osd_it_ea_next, - .key = osd_it_ea_key, - .key_size = osd_it_ea_key_size, - .rec = osd_it_ea_rec, - .store = osd_it_ea_store, - .load = osd_it_ea_load - } -}; /** * Index lookup function for interoperability mode (b11826). @@ -3315,14 +3698,26 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -/* type constructor/destructor: osd_type_init, osd_type_fini */ -LU_TYPE_INIT_FINI(osd, &osd_key); - -static struct lu_context_key osd_key = { - .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD, - .lct_init = osd_key_init, - .lct_fini = osd_key_fini, - .lct_exit = osd_key_exit +/** + * Index and Iterator operations for interoperability + * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826) + */ +static const struct dt_index_operations osd_index_ea_ops = { + .dio_lookup = osd_index_ea_lookup, + .dio_insert = osd_index_ea_insert, + .dio_delete = osd_index_ea_delete, + .dio_it = { + .init = osd_it_ea_init, + .fini = osd_it_ea_fini, + .get = osd_it_ea_get, + .put = osd_it_ea_put, + .next = osd_it_ea_next, + .key = osd_it_ea_key, + .key_size = osd_it_ea_key_size, + .rec = osd_it_ea_rec, + .store = osd_it_ea_store, + .load = osd_it_ea_load + } }; static void *osd_key_init(const struct lu_context *ctx, @@ -3331,15 +3726,29 @@ static void *osd_key_init(const struct lu_context *ctx, struct osd_thread_info *info; OBD_ALLOC_PTR(info); - if (info != NULL) - info->oti_env = container_of(ctx, struct lu_env, le_ctx); - else + if (info != NULL) { + OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + if (info->oti_it_ea_buf != NULL) { + info->oti_env = container_of(ctx, struct lu_env, + le_ctx); + } else { + OBD_FREE_PTR(info); + info = ERR_PTR(-ENOMEM); + } + } else { info = ERR_PTR(-ENOMEM); + } return info; } -/* context key destructor: osd_key_fini */ -LU_KEY_FINI(osd, struct osd_thread_info); +static void osd_key_fini(const struct lu_context *ctx, + struct lu_context_key *key, void* data) +{ + struct osd_thread_info *info = data; + + OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + OBD_FREE_PTR(info); +} static void osd_key_exit(const struct lu_context *ctx, struct lu_context_key *key, void *data) @@ -3351,6 +3760,17 @@ static void osd_key_exit(const struct lu_context *ctx, LASSERT(info->oti_txns == 0); } +/* type constructor/destructor: osd_type_init, osd_type_fini */ +LU_TYPE_INIT_FINI(osd, &osd_key); + +static struct lu_context_key osd_key = { + .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD, + .lct_init = osd_key_init, + .lct_fini = osd_key_fini, + .lct_exit = osd_key_exit +}; + + static int osd_device_init(const struct lu_env *env, struct lu_device *d, const char *name, struct lu_device *next) { @@ -3389,7 +3809,6 @@ static int osd_mount(const struct lu_env *env, struct lustre_sb_info *lsi; ENTRY; - if (o->od_mount != NULL) { CERROR("Already mounted (%s)\n", dev); RETURN(-EEXIST); @@ -3459,7 +3878,7 @@ static struct lu_device *osd_device_alloc(const struct lu_env *env, l = osd2lu_dev(o); l->ld_ops = &osd_lu_ops; o->od_dt_dev.dd_ops = &osd_dt_ops; - spin_lock_init(&o->od_osfs_lock); + cfs_spin_lock_init(&o->od_osfs_lock); o->od_osfs_age = cfs_time_shift_64(-1000); o->od_capa_hash = init_capa_hash(); if (o->od_capa_hash == NULL) { @@ -3509,16 +3928,9 @@ static int osd_process_config(const struct lu_env *env, RETURN(err); } -extern void ldiskfs_orphan_cleanup (struct super_block * sb, - struct ldiskfs_super_block * es); - static int osd_recovery_complete(const struct lu_env *env, struct lu_device *d) { - struct osd_device *o = osd_dev(d); - ENTRY; - /* TODO: orphans handling */ - ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es); RETURN(0); } @@ -3565,173 +3977,6 @@ out: RETURN(result); } -static struct inode *osd_iget(struct osd_thread_info *info, - struct osd_device *dev, - const struct osd_inode_id *id) -{ - struct inode *inode; - - inode = iget(osd_sb(dev), id->oii_ino); - if (inode == NULL) { - CERROR("no inode\n"); - inode = ERR_PTR(-EACCES); - } else if (is_bad_inode(inode)) { - CERROR("bad inode\n"); - iput(inode); - inode = ERR_PTR(-ENOENT); - } else if (id->oii_gen != OSD_OII_NOGEN && - inode->i_generation != id->oii_gen) { - CERROR("stale inode\n"); - iput(inode); - inode = ERR_PTR(-ESTALE); - } - - return inode; - -} - -static int osd_fid_lookup(const struct lu_env *env, - struct osd_object *obj, const struct lu_fid *fid) -{ - struct osd_thread_info *info; - struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; - struct osd_device *dev; - struct osd_inode_id *id; - struct osd_oi *oi; - struct inode *inode; - int result; - - LINVRNT(osd_invariant(obj)); - LASSERT(obj->oo_inode == NULL); - LASSERT(fid_is_sane(fid)); - /* - * This assertion checks that osd layer sees only local - * fids. Unfortunately it is somewhat expensive (does a - * cache-lookup). Disabling it for production/acceptance-testing. - */ - LASSERT(1 || fid_is_local(env, ldev->ld_site, fid)); - - ENTRY; - - info = osd_oti_get(env); - dev = osd_dev(ldev); - id = &info->oti_id; - oi = &dev->od_oi; - - if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) - RETURN(-ENOENT); - - result = osd_oi_lookup(info, oi, fid, id); - if (result == 0) { - inode = osd_iget(info, dev, id); - if (!IS_ERR(inode)) { - obj->oo_inode = inode; - LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); - if (dev->od_iop_mode) { - obj->oo_compat_dot_created = 1; - obj->oo_compat_dotdot_created = 1; - } - result = 0; - } else - /* - * If fid wasn't found in oi, inode-less object is - * created, for which lu_object_exists() returns - * false. This is used in a (frequent) case when - * objects are created as locking anchors or - * place holders for objects yet to be created. - */ - result = PTR_ERR(inode); - } else if (result == -ENOENT) - result = 0; - LINVRNT(osd_invariant(obj)); - - RETURN(result); -} - -static void osd_inode_getattr(const struct lu_env *env, - struct inode *inode, struct lu_attr *attr) -{ - attr->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE | - LA_SIZE | LA_BLOCKS | LA_UID | LA_GID | - LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE; - - attr->la_atime = LTIME_S(inode->i_atime); - attr->la_mtime = LTIME_S(inode->i_mtime); - attr->la_ctime = LTIME_S(inode->i_ctime); - attr->la_mode = inode->i_mode; - attr->la_size = i_size_read(inode); - attr->la_blocks = inode->i_blocks; - attr->la_uid = inode->i_uid; - attr->la_gid = inode->i_gid; - attr->la_flags = LDISKFS_I(inode)->i_flags; - attr->la_nlink = inode->i_nlink; - attr->la_rdev = inode->i_rdev; - attr->la_blksize = ll_inode_blksize(inode); - attr->la_blkbits = inode->i_blkbits; -} - -/* - * Helpers. - */ - -static int lu_device_is_osd(const struct lu_device *d) -{ - return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops); -} - -static struct osd_object *osd_obj(const struct lu_object *o) -{ - LASSERT(lu_device_is_osd(o->lo_dev)); - return container_of0(o, struct osd_object, oo_dt.do_lu); -} - -static struct osd_device *osd_dt_dev(const struct dt_device *d) -{ - LASSERT(lu_device_is_osd(&d->dd_lu_dev)); - return container_of0(d, struct osd_device, od_dt_dev); -} - -static struct osd_device *osd_dev(const struct lu_device *d) -{ - LASSERT(lu_device_is_osd(d)); - return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev)); -} - -static struct osd_object *osd_dt_obj(const struct dt_object *d) -{ - return osd_obj(&d->do_lu); -} - -static struct osd_device *osd_obj2dev(const struct osd_object *o) -{ - return osd_dev(o->oo_dt.do_lu.lo_dev); -} - -static struct lu_device *osd2lu_dev(struct osd_device *osd) -{ - return &osd->od_dt_dev.dd_lu_dev; -} - -static struct super_block *osd_sb(const struct osd_device *dev) -{ - return dev->od_mount->lmi_mnt->mnt_sb; -} - -static journal_t *osd_journal(const struct osd_device *dev) -{ - return LDISKFS_SB(osd_sb(dev))->s_journal; -} - -static int osd_has_index(const struct osd_object *obj) -{ - return obj->oo_dt.do_index_ops != NULL; -} - -static int osd_object_invariant(const struct lu_object *l) -{ - return osd_invariant(osd_obj(l)); -} - static const struct lu_object_operations osd_lu_obj_ops = { .loo_object_init = osd_object_init, .loo_object_delete = osd_object_delete,