X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd%2Fosd_handler.c;h=97452a682f58c8ddcc7292b2a31e1a1873abf594;hp=7324e2780d0a8c440e276d712a396758467d2966;hb=c159c408293fbebf71a948e630aa9f637f3c8ffe;hpb=5e72bf1720374198302a1f82d6e2022423e8cb39 diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 7324e27..97452a6 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -1,29 +1,43 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/osd/osd_handler.c - * Top-level entry points into osd module + * GPL HEADER START * - * Copyright (c) 2006 Cluster File Systems, Inc. - * Author: Nikita Danilov + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/osd/osd_handler.c + * + * Top-level entry points into osd module + * + * Author: Nikita Danilov */ #ifndef EXPORT_SYMTAB @@ -60,11 +74,6 @@ #include /* struct ptlrpc_thread */ #include -/* LUSTRE_OSD_NAME */ -#include -/* class_register_type(), class_unregister_type(), class_get_type() */ -#include -#include /* fid_is_local() */ #include @@ -73,6 +82,13 @@ #include "osd_internal.h" #include "osd_igif.h" +/* llo_* api support */ +#include + +static const char dot[] = "."; +static const char dotdot[] = ".."; +static const char remote_obj_dir[] = "REM_OBJ_DIR"; + struct osd_directory { struct iam_container od_container; struct iam_descr od_descr; @@ -81,7 +97,7 @@ struct osd_directory { struct osd_object { struct dt_object oo_dt; - /* + /** * Inode for file system object represented by this osd_object. This * inode is pinned for the whole duration of lu_object life. * @@ -91,58 +107,24 @@ struct osd_object { struct inode *oo_inode; struct rw_semaphore oo_sem; struct osd_directory *oo_dir; - /* protects inode attributes. */ + /** protects inode attributes. */ spinlock_t oo_guard; -#if OSD_COUNTERS - const struct lu_env *oo_owner; -#endif -}; - -/* - * osd device. - */ -struct osd_device { - /* super-class */ - struct dt_device od_dt_dev; - /* information about underlying file system */ - struct lustre_mount_info *od_mount; - /* object index */ - struct osd_oi od_oi; - /* - * XXX temporary stuff for object index: directory where every object - * is named by its fid. - */ - struct dentry *od_obj_area; - - /* Environment for transaction commit callback. - * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD - * is serialized, that is there is no more than one transaction commit - * at a time (JBD journal_commit_transaction() is serialized). - * This means that it's enough to have _one_ lu_context. + /** + * Following two members are used to indicate the presence of dot and + * dotdot in the given directory. This is required for interop mode + * (b11826). */ - struct lu_env od_env_for_commit; + int oo_compat_dot_created; + int oo_compat_dotdot_created; - /* - * Fid Capability - */ - unsigned int od_fl_capa:1; - unsigned long od_capa_timeout; - __u32 od_capa_alg; - struct lustre_capa_key *od_capa_keys; - struct hlist_head *od_capa_hash; - - /* - * statfs optimization: we cache a bit. - */ - cfs_time_t od_osfs_age; - struct kstatfs od_kstatfs; - spinlock_t od_osfs_lock; + const struct lu_env *oo_owner; +#ifdef CONFIG_LOCKDEP + struct lockdep_map oo_dep_map; +#endif }; static int osd_root_get (const struct lu_env *env, struct dt_device *dev, struct lu_fid *f); -static int osd_statfs (const struct lu_env *env, - struct dt_device *dev, struct kstatfs *sfs); static int lu_device_is_osd (const struct lu_device *d); static void osd_mod_exit (void) __exit; @@ -150,12 +132,13 @@ static int osd_mod_init (void) __init; static int osd_type_init (struct lu_device_type *t); static void osd_type_fini (struct lu_device_type *t); static int osd_object_init (const struct lu_env *env, - struct lu_object *l); + struct lu_object *l, + const struct lu_object_conf *_); static void osd_object_release(const struct lu_env *env, struct lu_object *l); static int osd_object_print (const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o); -static void osd_device_free (const struct lu_env *env, +static struct lu_device *osd_device_free (const struct lu_env *env, struct lu_device *m); static void *osd_key_init (const struct lu_context *ctx, struct lu_context_key *key); @@ -173,41 +156,64 @@ static int osd_fid_lookup (const struct lu_env *env, const struct lu_fid *fid); static void osd_inode_getattr (const struct lu_env *env, struct inode *inode, struct lu_attr *attr); -static void osd_inode_setattr (const struct lu_env *env, +static int osd_inode_setattr (const struct lu_env *env, struct inode *inode, const struct lu_attr *attr); static int osd_param_is_sane (const struct osd_device *dev, const struct txn_param *param); -static int osd_index_lookup (const struct lu_env *env, - struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa); -static int osd_index_insert (const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa); -static int osd_index_delete (const struct lu_env *env, - struct dt_object *dt, const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa); -static int osd_index_probe (const struct lu_env *env, - struct osd_object *o, - const struct dt_index_features *feat); +static int osd_index_iam_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa); +static int osd_index_ea_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa); +static int osd_index_iam_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ingore_quota); +static int osd_index_ea_insert (const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ingore_quota); +static int osd_index_iam_delete(const struct lu_env *env, + struct dt_object *dt, const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa); +static int osd_index_ea_delete (const struct lu_env *env, + struct dt_object *dt, const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa); + +static int osd_iam_index_probe (const struct lu_env *env, + struct osd_object *o, + const struct dt_index_features *feat); static int osd_index_try (const struct lu_env *env, struct dt_object *dt, const struct dt_index_features *feat); static void osd_index_fini (struct osd_object *o); -static void osd_it_fini (const struct lu_env *env, struct dt_it *di); -static int osd_it_get (const struct lu_env *env, +static void osd_it_iam_fini (const struct lu_env *env, struct dt_it *di); +static int osd_it_iam_get (const struct lu_env *env, + struct dt_it *di, const struct dt_key *key); +static void osd_it_iam_put (const struct lu_env *env, struct dt_it *di); +static int osd_it_iam_next (const struct lu_env *env, struct dt_it *di); +static int osd_it_iam_key_size (const struct lu_env *env, + const struct dt_it *di); +static void osd_it_ea_fini (const struct lu_env *env, struct dt_it *di); +static int osd_it_ea_get (const struct lu_env *env, struct dt_it *di, const struct dt_key *key); -static void osd_it_put (const struct lu_env *env, struct dt_it *di); -static int osd_it_next (const struct lu_env *env, struct dt_it *di); -static int osd_it_del (const struct lu_env *env, struct dt_it *di, - struct thandle *th); -static int osd_it_key_size (const struct lu_env *env, +static void osd_it_ea_put (const struct lu_env *env, struct dt_it *di); +static int osd_it_ea_next (const struct lu_env *env, struct dt_it *di); +static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di); + static void osd_conf_get (const struct lu_env *env, const struct dt_device *dev, struct dt_device_param *param); @@ -233,13 +239,21 @@ static struct inode *osd_iget (struct osd_thread_info *info, struct osd_device *dev, const struct osd_inode_id *id); static struct super_block *osd_sb (const struct osd_device *dev); -static struct dt_it *osd_it_init (const struct lu_env *env, - struct dt_object *dt, int wable, +static struct dt_it *osd_it_iam_init (const struct lu_env *env, + struct dt_object *dt, struct lustre_capa *capa); -static struct dt_key *osd_it_key (const struct lu_env *env, +static struct dt_key *osd_it_iam_key (const struct lu_env *env, const struct dt_it *di); -static struct dt_rec *osd_it_rec (const struct lu_env *env, +static struct dt_rec *osd_it_iam_rec (const struct lu_env *env, const struct dt_it *di); +static struct dt_it *osd_it_ea_init (const struct lu_env *env, + struct dt_object *dt, + struct lustre_capa *capa); +static struct dt_key *osd_it_ea_key (const struct lu_env *env, + const struct dt_it *di); +static struct dt_rec *osd_it_ea_rec (const struct lu_env *env, + const struct dt_it *di); + static struct timespec *osd_inode_time (const struct lu_env *env, struct inode *inode, __u64 seconds); @@ -248,25 +262,58 @@ static struct thandle *osd_trans_start (const struct lu_env *env, struct txn_param *p); static journal_t *osd_journal (const struct osd_device *dev); -static struct lu_device_type_operations osd_device_type_ops; -static struct lu_device_type osd_device_type; -static struct lu_object_operations osd_lu_obj_ops; -static struct obd_ops osd_obd_device_ops; -static struct lprocfs_vars lprocfs_osd_module_vars[]; -static struct lprocfs_vars lprocfs_osd_obd_vars[]; -static struct lu_device_operations osd_lu_ops; -static struct lu_context_key osd_key; -static struct dt_object_operations osd_obj_ops; -static struct dt_body_operations osd_body_ops; -static struct dt_index_operations osd_index_ops; -static struct dt_index_operations osd_index_compat_ops; +static int __osd_ea_add_rec(struct osd_thread_info *info, + struct osd_object *pobj, + struct osd_object *cobj, + const char *name, + struct thandle *th); + +static const struct lu_device_type_operations osd_device_type_ops; +static struct lu_device_type osd_device_type; +static const struct lu_object_operations osd_lu_obj_ops; +static struct obd_ops osd_obd_device_ops; +static const struct lu_device_operations osd_lu_ops; +static struct lu_context_key osd_key; +static const struct dt_object_operations osd_obj_ops; +static const struct dt_object_operations osd_obj_ea_ops; +static const struct dt_body_operations osd_body_ops; +static const struct dt_index_operations osd_index_iam_ops; +static const struct dt_index_operations osd_index_ea_ops; struct osd_thandle { struct thandle ot_super; handle_t *ot_handle; struct journal_callback ot_jcb; + /* Link to the device, for debugging. */ + struct lu_ref_link *ot_dev_link; + }; +#ifdef HAVE_QUOTA_SUPPORT +static inline void +osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save) +{ + struct md_ucred *uc = md_ucred(env); + + LASSERT(uc != NULL); + + save->oc_uid = current->fsuid; + save->oc_gid = current->fsgid; + save->oc_cap = current->cap_effective; + current->fsuid = uc->mu_fsuid; + current->fsgid = uc->mu_fsgid; + current->cap_effective = uc->mu_cap; +} + +static inline void +osd_pop_ctxt(struct osd_ctxt *save) +{ + current->fsuid = save->oc_uid; + current->fsgid = save->oc_gid; + current->cap_effective = save->oc_cap; +} +#endif + /* * Invariants, assertions. */ @@ -298,7 +345,6 @@ static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env) return lu_context_key_get(&env->le_ctx, &osd_key); } -#if OSD_COUNTERS /* * Concurrency: doesn't matter */ @@ -316,15 +362,6 @@ static int osd_write_locked(const struct lu_env *env, struct osd_object *o) return oti->oti_w_locks > 0 && o->oo_owner == env; } -#define OSD_COUNTERS_DO(exp) exp -#else - - -#define osd_read_locked(env, o) (1) -#define osd_write_locked(env, o) (1) -#define OSD_COUNTERS_DO(exp) ((void)0) -#endif - /* * Concurrency: doesn't access mutable data */ @@ -358,7 +395,11 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, l = &mo->oo_dt.do_lu; dt_object_init(&mo->oo_dt, NULL, d); - mo->oo_dt.do_ops = &osd_obj_ops; + if (osd_dev(d)->od_iop_mode) + mo->oo_dt.do_ops = &osd_obj_ea_ops; + else + mo->oo_dt.do_ops = &osd_obj_ops; + l->lo_ops = &osd_lu_obj_ops; init_rwsem(&mo->oo_sem); spin_lock_init(&mo->oo_guard); @@ -382,19 +423,20 @@ static void osd_object_init0(struct osd_object *obj) * Concurrency: no concurrent access is possible that early in object * life-cycle. */ -static int osd_object_init(const struct lu_env *env, struct lu_object *l) +static int osd_object_init(const struct lu_env *env, struct lu_object *l, + const struct lu_object_conf *_) { struct osd_object *obj = osd_obj(l); int result; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); result = osd_fid_lookup(env, obj, lu_object_fid(l)); if (result == 0) { if (obj->oo_inode != NULL) osd_object_init0(obj); } - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); return result; } @@ -406,17 +448,24 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l) { struct osd_object *obj = osd_obj(l); - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); dt_object_fini(&obj->oo_dt); OBD_FREE_PTR(obj); } -static struct iam_path_descr *osd_ipd_get(const struct lu_env *env, - const struct iam_container *bag) +static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env, + const struct iam_container *bag) { return bag->ic_descr->id_ops->id_ipd_alloc(bag, - osd_oti_get(env)->oti_ipd); + osd_oti_get(env)->oti_it_ipd); +} + +static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env, + const struct iam_container *bag) +{ + return bag->ic_descr->id_ops->id_ipd_alloc(bag, + osd_oti_get(env)->oti_idx_ipd); } static void osd_ipd_put(const struct lu_env *env, @@ -473,7 +522,7 @@ static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj) struct thandle *th; int result; - txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + + txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + OSD_TXN_INODE_DELETE_CREDITS); th = osd_trans_start(env, &osd->od_dt_dev, prm); if (!IS_ERR(th)) { @@ -496,12 +545,10 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) struct osd_object *obj = osd_obj(l); struct inode *inode = obj->oo_inode; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); /* * If object is unlinked remove fid->ino mapping from object index. - * - * File body will be deleted by iput(). */ osd_index_fini(obj); @@ -515,6 +562,7 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) "Failed to cleanup: %d\n", result); } + iput(inode); obj->oo_inode = NULL; } @@ -556,8 +604,8 @@ static int osd_object_print(const struct lu_env *env, void *cookie, /* * Concurrency: shouldn't matter. */ -static int osd_statfs(const struct lu_env *env, - struct dt_device *d, struct kstatfs *sfs) +int osd_statfs(const struct lu_env *env, struct dt_device *d, + struct kstatfs *sfs) { struct osd_device *osd = osd_dt_dev(d); struct super_block *sb = osd_sb(osd); @@ -572,7 +620,7 @@ static int osd_statfs(const struct lu_env *env, } if (likely(result == 0)) - *sfs = osd->od_kstatfs; + *sfs = osd->od_kstatfs; spin_unlock(&osd->od_osfs_lock); return result; @@ -593,6 +641,19 @@ static void osd_conf_get(const struct lu_env *env, param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits; } +/** + * Helper function to get and fill the buffer with input values. + */ +static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len) +{ + struct lu_buf *buf; + + buf = &osd_oti_get(env)->oti_buf; + buf->lb_buf = area; + buf->lb_len = len; + return buf; +} + /* * Journal */ @@ -612,8 +673,9 @@ static int osd_param_is_sane(const struct osd_device *dev, static void osd_trans_commit_cb(struct journal_callback *jcb, int error) { struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb); - struct thandle *th = &oh->ot_super; + struct thandle *th = &oh->ot_super; struct dt_device *dev = th->th_dev; + struct lu_device *lud = &dev->dd_lu_dev; LASSERT(dev != NULL); LASSERT(oh->ot_handle == NULL); @@ -621,14 +683,18 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error) if (error) { CERROR("transaction @0x%p commit error: %d\n", th, error); } else { + struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit; /* * This od_env_for_commit is only for commit usage. see * "struct dt_device" */ - dt_txn_hook_commit(&osd_dt_dev(dev)->od_env_for_commit, th); + lu_context_enter(&env->le_ctx); + dt_txn_hook_commit(env, th); + lu_context_exit(&env->le_ctx); } - lu_device_put(&dev->dd_lu_dev); + lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th); + lu_device_put(lud); th->th_dev = NULL; lu_context_exit(&th->th_ctx); @@ -658,6 +724,8 @@ static struct thandle *osd_trans_start(const struct lu_env *env, if (osd_param_is_sane(dev, p)) { OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); if (oh != NULL) { + struct osd_thread_info *oti = osd_oti_get(env); + /* * XXX temporary stuff. Some abstraction layer should * be used. @@ -671,22 +739,18 @@ static struct thandle *osd_trans_start(const struct lu_env *env, th->th_result = 0; jh->h_sync = p->tp_sync; lu_device_get(&d->dd_lu_dev); + oh->ot_dev_link = lu_ref_add + (&d->dd_lu_dev.ld_reference, + "osd-tx", th); /* add commit callback */ lu_context_init(&th->th_ctx, LCT_TX_HANDLE); lu_context_enter(&th->th_ctx); journal_callback_set(jh, osd_trans_commit_cb, (struct journal_callback *)&oh->ot_jcb); -#if OSD_COUNTERS - { - struct osd_thread_info *oti = - osd_oti_get(env); - LASSERT(oti->oti_txns == 0); LASSERT(oti->oti_r_locks == 0); LASSERT(oti->oti_w_locks == 0); oti->oti_txns++; - } -#endif } else { OBD_FREE_PTR(oh); th = (void *)jh; @@ -708,35 +772,25 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th) { int result; struct osd_thandle *oh; + struct osd_thread_info *oti = osd_oti_get(env); ENTRY; oh = container_of0(th, struct osd_thandle, ot_super); if (oh->ot_handle != NULL) { handle_t *hdl = oh->ot_handle; - /* - * XXX temporary stuff. Some abstraction layer should be used. - */ + + LASSERT(oti->oti_txns == 1); + oti->oti_txns--; + LASSERT(oti->oti_r_locks == 0); + LASSERT(oti->oti_w_locks == 0); result = dt_txn_hook_stop(env, th); if (result != 0) CERROR("Failure in transaction hook: %d\n", result); - - /**/ oh->ot_handle = NULL; result = journal_stop(hdl); if (result != 0) CERROR("Failure to stop transaction: %d\n", result); - -#if OSD_COUNTERS - { - struct osd_thread_info *oti = osd_oti_get(env); - - LASSERT(oti->oti_txns == 1); - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - oti->oti_txns--; - } -#endif } EXIT; } @@ -750,6 +804,28 @@ static int osd_sync(const struct lu_env *env, struct dt_device *d) return ldiskfs_force_commit(osd_sb(osd_dt_dev(d))); } +/** + * Start commit for OSD device. + * + * An implementation of dt_commit_async method for OSD device. + * Asychronously starts underlayng fs sync and thereby a transaction + * commit. + * + * \param env environment + * \param d dt device + * + * \see dt_device_operations + */ +static int osd_commit_async(const struct lu_env *env, + struct dt_device *d) +{ + struct super_block *s = osd_sb(osd_dt_dev(d)); + ENTRY; + + CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME); + RETURN(s->s_op->sync_fs(s, 0)); +} + /* * Concurrency: shouldn't matter. */ @@ -766,6 +842,7 @@ static void osd_ro(const struct lu_env *env, struct dt_device *d) EXIT; } + /* * Concurrency: serialization provided by callers. */ @@ -783,49 +860,164 @@ static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d, RETURN(0); } -/* Note: we did not count into QUOTA here, If we mount with --data_journal - * we may need more*/ -static const int osd_dto_credits[DTO_NR] = { - /* - * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) + - * EXT3_SINGLEDATA_TRANS_BLOCKS 8 XXX Note: maybe iam need more,since - * iam have more level than Ext3 htree +/** + * Concurrency: serialization provided by callers. + */ +static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d, + struct dt_quota_ctxt *ctxt, void *data) +{ + struct obd_device *obd = (void *)ctxt; + struct vfsmount *mnt = (struct vfsmount *)data; + ENTRY; + + obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb; + OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt); + obd->obd_lvfs_ctxt.pwdmnt = mnt; + obd->obd_lvfs_ctxt.pwd = mnt->mnt_root; + obd->obd_lvfs_ctxt.fs = get_ds(); + + EXIT; +} + +/** + * Note: we do not count into QUOTA here. + * If we mount with --data_journal we may need more. + */ +static const int osd_dto_credits_noquota[DTO_NR] = { + /** + * Insert/Delete. + * INDEX_EXTRA_TRANS_BLOCKS(8) + + * SINGLEDATA_TRANS_BLOCKS(8) + * XXX Note: maybe iam need more, since iam have more level than + * EXT3 htree. */ [DTO_INDEX_INSERT] = 16, [DTO_INDEX_DELETE] = 16, + /** + * Unused now + */ [DTO_IDNEX_UPDATE] = 16, - /* - * Create a object. Same as create object in Ext3 filesystem, but did - * not count QUOTA i EXT3_DATA_TRANS_BLOCKS(12) + - * INDEX_EXTRA_BLOCKS(8) + 3(inode bits,groups, GDT) + /** + * Create a object. The same as create object in EXT3. + * DATA_TRANS_BLOCKS(14) + + * INDEX_EXTRA_BLOCKS(8) + + * 3(inode bits, groups, GDT) + */ + [DTO_OBJECT_CREATE] = 25, + /** + * Unused now + */ + [DTO_OBJECT_DELETE] = 25, + /** + * Attr set credits. + * 3(inode bits, group, GDT) + */ + [DTO_ATTR_SET_BASE] = 3, + /** + * Xattr set. The same as xattr of EXT3. + * DATA_TRANS_BLOCKS(14) + * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are + * also counted in. Do not know why? + */ + [DTO_XATTR_SET] = 14, + [DTO_LOG_REC] = 14, + /** + * creadits for inode change during write. + */ + [DTO_WRITE_BASE] = 3, + /** + * credits for single block write. + */ + [DTO_WRITE_BLOCK] = 14, + /** + * Attr set credits for chown. + * 3 (inode bit, group, GDT) */ - [DTO_OBJECT_CREATE] = 23, - [DTO_OBJECT_DELETE] = 23, + [DTO_ATTR_SET_CHOWN]= 3 +}; + +/** + * Note: we count into QUOTA here. + * If we mount with --data_journal we may need more. + */ +static const int osd_dto_credits_quota[DTO_NR] = { + /** + * INDEX_EXTRA_TRANS_BLOCKS(8) + + * SINGLEDATA_TRANS_BLOCKS(8) + + * 2 * QUOTA_TRANS_BLOCKS(2) + */ + [DTO_INDEX_INSERT] = 20, + /** + * INDEX_EXTRA_TRANS_BLOCKS(8) + + * SINGLEDATA_TRANS_BLOCKS(8) + + * 2 * QUOTA_TRANS_BLOCKS(2) + */ + [DTO_INDEX_DELETE] = 20, + /** + * Unused now. + */ + [DTO_IDNEX_UPDATE] = 16, /* - * Attr set credits 3 inode, group, GDT + * Create a object. Same as create object in EXT3 filesystem. + * DATA_TRANS_BLOCKS(16) + + * INDEX_EXTRA_BLOCKS(8) + + * 3(inode bits, groups, GDT) + + * 2 * QUOTA_INIT_BLOCKS(25) */ - [DTO_ATTR_SET] = 3, + [DTO_OBJECT_CREATE] = 77, /* - * XATTR_SET. SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS XXX Note: - * in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are - * also counted in. Do not know why? + * Unused now. + * DATA_TRANS_BLOCKS(16) + + * INDEX_EXTRA_BLOCKS(8) + + * 3(inode bits, groups, GDT) + + * QUOTA(?) + */ + [DTO_OBJECT_DELETE] = 27, + /** + * Attr set credits. + * 3 (inode bit, group, GDT) + + */ + [DTO_ATTR_SET_BASE] = 3, + /** + * Xattr set. The same as xattr of EXT3. + * DATA_TRANS_BLOCKS(16) + * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are + * also counted in. Do not know why? */ [DTO_XATTR_SET] = 16, [DTO_LOG_REC] = 16, - /* creadits for inode change during write */ + /** + * creadits for inode change during write. + */ [DTO_WRITE_BASE] = 3, - /* credits for single block write */ - [DTO_WRITE_BLOCK] = 12 + /** + * credits for single block write. + */ + [DTO_WRITE_BLOCK] = 16, + /** + * Attr set credits for chown. + * 3 (inode bit, group, GDT) + + * 2 * QUOTA_INIT_BLOCKS(25) + + * 2 * QUOTA_DEL_BLOCKS(9) + */ + [DTO_ATTR_SET_CHOWN]= 71 }; static int osd_credit_get(const struct lu_env *env, struct dt_device *d, enum dt_txn_op op) { - LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits)); - return osd_dto_credits[op]; + LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) == + ARRAY_SIZE(osd_dto_credits_quota)); + LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota)); +#ifdef HAVE_QUOTA_SUPPORT + if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA)) + return osd_dto_credits_quota[op]; + else +#endif + return osd_dto_credits_noquota[op]; } -static struct dt_device_operations osd_dt_ops = { +static const struct dt_device_operations osd_dt_ops = { .dt_root_get = osd_root_get, .dt_statfs = osd_statfs, .dt_trans_start = osd_trans_start, @@ -833,63 +1025,53 @@ static struct dt_device_operations osd_dt_ops = { .dt_conf_get = osd_conf_get, .dt_sync = osd_sync, .dt_ro = osd_ro, + .dt_commit_async = osd_commit_async, .dt_credit_get = osd_credit_get, .dt_init_capa_ctxt = osd_init_capa_ctxt, + .dt_init_quota_ctxt= osd_init_quota_ctxt, }; static void osd_object_read_lock(const struct lu_env *env, - struct dt_object *dt) + struct dt_object *dt, unsigned role) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *oti = osd_oti_get(env); - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); - OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env)); - down_read(&obj->oo_sem); -#if OSD_COUNTERS - { - struct osd_thread_info *oti = osd_oti_get(env); + LASSERT(obj->oo_owner != env); + down_read_nested(&obj->oo_sem, role); - LASSERT(obj->oo_owner == NULL); - oti->oti_r_locks++; - } -#endif + LASSERT(obj->oo_owner == NULL); + oti->oti_r_locks++; } static void osd_object_write_lock(const struct lu_env *env, - struct dt_object *dt) + struct dt_object *dt, unsigned role) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *oti = osd_oti_get(env); - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); - OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env)); - down_write(&obj->oo_sem); -#if OSD_COUNTERS - { - struct osd_thread_info *oti = osd_oti_get(env); + LASSERT(obj->oo_owner != env); + down_write_nested(&obj->oo_sem, role); - LASSERT(obj->oo_owner == NULL); - obj->oo_owner = env; - oti->oti_w_locks++; - } -#endif + LASSERT(obj->oo_owner == NULL); + obj->oo_owner = env; + oti->oti_w_locks++; } static void osd_object_read_unlock(const struct lu_env *env, struct dt_object *dt) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *oti = osd_oti_get(env); - LASSERT(osd_invariant(obj)); -#if OSD_COUNTERS - { - struct osd_thread_info *oti = osd_oti_get(env); + LINVRNT(osd_invariant(obj)); - LASSERT(oti->oti_r_locks > 0); - oti->oti_r_locks--; - } -#endif + LASSERT(oti->oti_r_locks > 0); + oti->oti_r_locks--; up_read(&obj->oo_sem); } @@ -897,18 +1079,14 @@ static void osd_object_write_unlock(const struct lu_env *env, struct dt_object *dt) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *oti = osd_oti_get(env); - LASSERT(osd_invariant(obj)); -#if OSD_COUNTERS - { - struct osd_thread_info *oti = osd_oti_get(env); - - LASSERT(obj->oo_owner == env); - LASSERT(oti->oti_w_locks > 0); - oti->oti_w_locks--; - obj->oo_owner = NULL; - } -#endif + LINVRNT(osd_invariant(obj)); + + LASSERT(obj->oo_owner == env); + LASSERT(oti->oti_w_locks > 0); + oti->oti_w_locks--; + obj->oo_owner = NULL; up_write(&obj->oo_sem); } @@ -918,6 +1096,7 @@ static int capa_is_sane(const struct lu_env *env, struct lustre_capa_key *keys) { struct osd_thread_info *oti = osd_oti_get(env); + struct lustre_capa *tcapa = &oti->oti_capa; struct obd_capa *oc; int i, rc = 0; ENTRY; @@ -932,6 +1111,11 @@ static int capa_is_sane(const struct lu_env *env, RETURN(rc); } + if (capa_is_expired_sec(capa)) { + DEBUG_CAPA(D_ERROR, capa, "expired"); + RETURN(-ESTALE); + } + spin_lock(&capa_lock); for (i = 0; i < 2; i++) { if (keys[i].lk_keyid == capa->lc_keyid) { @@ -946,11 +1130,11 @@ static int capa_is_sane(const struct lu_env *env, RETURN(-ESTALE); } - rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key); + rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key); if (rc) RETURN(rc); - if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) - { + + if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) { DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch"); RETURN(-EACCES); } @@ -966,6 +1150,7 @@ static int osd_object_auth(const struct lu_env *env, struct dt_object *dt, { const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_device *dev = osd_dev(dt->do_lu.lo_dev); + struct md_capainfo *ci; int rc; if (!dev->od_fl_capa) @@ -974,6 +1159,13 @@ static int osd_object_auth(const struct lu_env *env, struct dt_object *dt, if (capa == BYPASS_CAPA) return 0; + ci = md_capainfo(env); + if (unlikely(!ci)) + return 0; + + if (ci->mc_auth == LC_ID_NONE) + return 0; + if (!capa) { CERROR("no capability is provided for fid "DFID"\n", PFID(fid)); return -EACCES; @@ -1006,7 +1198,7 @@ static int osd_attr_get(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); LASSERT(dt_object_exists(dt)); - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) return -EACCES; @@ -1024,6 +1216,7 @@ static int osd_attr_set(const struct lu_env *env, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); + int rc; LASSERT(handle != NULL); LASSERT(dt_object_exists(dt)); @@ -1033,11 +1226,12 @@ static int osd_attr_set(const struct lu_env *env, return -EACCES; spin_lock(&obj->oo_guard); - osd_inode_setattr(env, obj->oo_inode, attr); + rc = osd_inode_setattr(env, obj->oo_inode, attr); spin_unlock(&obj->oo_guard); - mark_inode_dirty(obj->oo_inode); - return 0; + if (!rc) + mark_inode_dirty(obj->oo_inode); + return rc; } static struct timespec *osd_inode_time(const struct lu_env *env, @@ -1052,8 +1246,8 @@ static struct timespec *osd_inode_time(const struct lu_env *env, return t; } -static void osd_inode_setattr(const struct lu_env *env, - struct inode *inode, const struct lu_attr *attr) +static int osd_inode_setattr(const struct lu_env *env, + struct inode *inode, const struct lu_attr *attr) { __u64 bits; @@ -1061,6 +1255,28 @@ static void osd_inode_setattr(const struct lu_env *env, LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */ +#ifdef HAVE_QUOTA_SUPPORT + if ((bits & LA_UID && attr->la_uid != inode->i_uid) || + (bits & LA_GID && attr->la_gid != inode->i_gid)) { + struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt; + struct iattr iattr; + int rc; + + iattr.ia_valid = 0; + if (bits & LA_UID) + iattr.ia_valid |= ATTR_UID; + if (bits & LA_GID) + iattr.ia_valid |= ATTR_GID; + iattr.ia_uid = attr->la_uid; + iattr.ia_gid = attr->la_gid; + osd_push_ctxt(env, save); + rc = DQUOT_TRANSFER(inode, &iattr) ? -EDQUOT : 0; + osd_pop_ctxt(save); + if (rc != 0) + return rc; + } +#endif + if (bits & LA_ATIME) inode->i_atime = *osd_inode_time(env, inode, attr->la_atime); if (bits & LA_CTIME) @@ -1071,8 +1287,14 @@ static void osd_inode_setattr(const struct lu_env *env, LDISKFS_I(inode)->i_disksize = attr->la_size; i_size_write(inode, attr->la_size); } +# if 0 + /* + * OSD should not change "i_blocks" which is used by quota. + * "i_blocks" should be changed by ldiskfs only. + * Disable this assignment until SOM to fix some EA field. */ if (bits & LA_BLOCKS) inode->i_blocks = attr->la_blocks; +#endif if (bits & LA_MODE) inode->i_mode = (inode->i_mode & S_IFMT) | (attr->la_mode & ~S_IFMT); @@ -1091,6 +1313,7 @@ static void osd_inode_setattr(const struct lu_env *env, li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) | (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE); } + return 0; } /* @@ -1116,6 +1339,43 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj, extern struct inode *ldiskfs_create_inode(handle_t *handle, struct inode * dir, int mode); +extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry, + struct inode *inode); +extern int ldiskfs_delete_entry(handle_t *handle, + struct inode * dir, + struct ldiskfs_dir_entry_2 * de_del, + struct buffer_head * bh); +extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry, + struct ldiskfs_dir_entry_2 + ** res_dir); +extern int ldiskfs_add_dot_dotdot(handle_t *handle, struct inode *dir, + struct inode *inode); + +extern int ldiskfs_xattr_set_handle(handle_t *handle, struct inode *inode, + int name_index, const char *name, + const void *value, size_t value_len, + int flags); + +static struct dentry * osd_child_dentry_get(const struct lu_env *env, + struct osd_object *obj, + const char *name, + const int namelen) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *child_dentry = &info->oti_child_dentry; + struct dentry *obj_dentry = &info->oti_obj_dentry; + + obj_dentry->d_inode = obj->oo_inode; + obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); + obj_dentry->d_name.hash = 0; + + child_dentry->d_name.hash = 0; + child_dentry->d_parent = obj_dentry; + child_dentry->d_name.name = name; + child_dentry->d_name.len = namelen; + return child_dentry; +} + static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, umode_t mode, @@ -1125,29 +1385,40 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, int result; struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oth; - struct inode *parent; + struct dt_object *parent; struct inode *inode; +#ifdef HAVE_QUOTA_SUPPORT + struct osd_ctxt *save = &info->oti_ctxt; +#endif - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); - LASSERT(osd->od_obj_area != NULL); oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle->h_transaction != NULL); if (hint && hint->dah_parent) - parent = osd_dt_obj(hint->dah_parent)->oo_inode; + parent = hint->dah_parent; else - parent = osd->od_obj_area->d_inode; - LASSERT(parent->i_op != NULL); + parent = osd->od_obj_area; - inode = ldiskfs_create_inode(oth->ot_handle, parent, mode); + LASSERT(parent != NULL); + LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL); + +#ifdef HAVE_QUOTA_SUPPORT + osd_push_ctxt(info->oti_env, save); +#endif + inode = ldiskfs_create_inode(oth->ot_handle, + osd_dt_obj(parent)->oo_inode, mode); +#ifdef HAVE_QUOTA_SUPPORT + osd_pop_ctxt(save); +#endif if (!IS_ERR(inode)) { obj->oo_inode = inode; result = 0; } else result = PTR_ERR(inode); - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); return result; } @@ -1155,6 +1426,10 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize, int recsize, handle_t *handle); +extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize, + int recsize, handle_t *handle); + + enum { OSD_NAME_LEN = 255 }; @@ -1162,22 +1437,25 @@ enum { static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { int result; struct osd_thandle *oth; + struct osd_device *osd = osd_obj2dev(obj); + __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX)); LASSERT(S_ISDIR(attr->la_mode)); oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle->h_transaction != NULL); - result = osd_mkfile(info, obj, (attr->la_mode & - (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th); - if (result == 0) { + result = osd_mkfile(info, obj, mode, hint, th); + if (result == 0 && osd->od_iop_mode == 0) { LASSERT(obj->oo_inode != NULL); /* * XXX uh-oh... call low-level iam function directly. */ + result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4, sizeof (struct lu_fid_pack), oth->ot_handle); @@ -1185,9 +1463,47 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, return result; } +static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + int result; + struct osd_thandle *oth; + const struct dt_index_features *feat = dof->u.dof_idx.di_feat; + + __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX)); + + LASSERT(S_ISREG(attr->la_mode)); + + oth = container_of(th, struct osd_thandle, ot_super); + LASSERT(oth->ot_handle->h_transaction != NULL); + + result = osd_mkfile(info, obj, mode, hint, th); + if (result == 0) { + LASSERT(obj->oo_inode != NULL); + if (feat->dif_flags & DT_IND_VARKEY) + result = iam_lvar_create(obj->oo_inode, + feat->dif_keysize_max, + feat->dif_ptrsize, + feat->dif_recsize_max, + oth->ot_handle); + else + result = iam_lfix_create(obj->oo_inode, + feat->dif_keysize_max, + feat->dif_ptrsize, + feat->dif_recsize_max, + oth->ot_handle); + + } + return result; +} + static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { LASSERT(S_ISREG(attr->la_mode)); @@ -1198,6 +1514,7 @@ static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj, static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { LASSERT(S_ISLNK(attr->la_mode)); @@ -1208,56 +1525,53 @@ static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj, static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - int result; - struct osd_device *osd = osd_obj2dev(obj); - struct inode *dir; umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX); + int result; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); - LASSERT(osd->od_obj_area != NULL); LASSERT(S_ISCHR(mode) || S_ISBLK(mode) || S_ISFIFO(mode) || S_ISSOCK(mode)); - dir = osd->od_obj_area->d_inode; - LASSERT(dir->i_op != NULL); - result = osd_mkfile(info, obj, mode, hint, th); if (result == 0) { LASSERT(obj->oo_inode != NULL); init_special_inode(obj->oo_inode, mode, attr->la_rdev); } - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); return result; } typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *, struct lu_attr *, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *); -static osd_obj_type_f osd_create_type_f(__u32 mode) +static osd_obj_type_f osd_create_type_f(enum dt_format_type type) { osd_obj_type_f result; - switch (mode) { - case S_IFDIR: + switch (type) { + case DFT_DIR: result = osd_mkdir; break; - case S_IFREG: + case DFT_REGULAR: result = osd_mkreg; break; - case S_IFLNK: + case DFT_SYM: result = osd_mksym; break; - case S_IFCHR: - case S_IFBLK: - case S_IFIFO: - case S_IFSOCK: + case DFT_NODE: result = osd_mknod; break; + case DFT_INDEX: + result = osd_mk_index; + break; + default: LBUG(); break; @@ -1276,81 +1590,261 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, ah->dah_mode = child_mode; } +/** + * Helper function for osd_object_create() + * + * \retval 0, on success + */ +static int __osd_object_create(struct osd_thread_info *info, + struct osd_object *obj, struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + + int result; -/* - * Concurrency: @dt is write locked. + result = osd_create_pre(info, obj, attr, th); + if (result == 0) { + result = osd_create_type_f(dof->dof_type)(info, obj, + attr, hint, dof, th); + if (result == 0) + result = osd_create_post(info, obj, attr, th); + } + return result; +} + +/** + * Helper function for osd_object_create() + * + * \retval 0, on success */ +static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj, + const struct lu_fid *fid, struct thandle *th) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct osd_inode_id *id = &info->oti_id; + struct osd_device *osd = osd_obj2dev(obj); + struct md_ucred *uc = md_ucred(env); + + LASSERT(obj->oo_inode != NULL); + LASSERT(uc != NULL); + + id->oii_ino = obj->oo_inode->i_ino; + id->oii_gen = obj->oo_inode->i_generation; + + return osd_oi_insert(info, &osd->od_oi, fid, id, th, + uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); +} + static int osd_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - const struct lu_fid *fid = lu_object_fid(&dt->do_lu); - struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); - struct osd_thread_info *info = osd_oti_get(env); + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *info = osd_oti_get(env); int result; ENTRY; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); LASSERT(!dt_object_exists(dt)); LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - /* - * XXX missing: Quote handling. - */ - - result = osd_create_pre(info, obj, attr, th); - if (result == 0) { - result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj, - attr, hint, th); - if (result == 0) - result = osd_create_post(info, obj, attr, th); - } - if (result == 0) { - struct osd_inode_id *id = &info->oti_id; - - LASSERT(obj->oo_inode != NULL); - - id->oii_ino = obj->oo_inode->i_ino; - id->oii_gen = obj->oo_inode->i_generation; - - result = osd_oi_insert(info, &osd->od_oi, fid, id, th); - } + result = __osd_object_create(info, obj, attr, hint, dof, th); + if (result == 0) + result = __osd_oi_insert(env, obj, fid, th); LASSERT(ergo(result == 0, dt_object_exists(dt))); LASSERT(osd_invariant(obj)); RETURN(result); } -/* - * Concurrency: @dt is write locked. +/** + * Helper function for osd_xattr_set() */ -static void osd_object_ref_add(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, int fl) { - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *dentry = &info->oti_child_dentry; + struct timespec *t = &info->oti_time; + int fs_flags = 0; + int rc; - LASSERT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); + LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL); LASSERT(osd_write_locked(env, obj)); - LASSERT(th != NULL); - spin_lock(&obj->oo_guard); - if (inode->i_nlink < LDISKFS_LINK_MAX) { - inode->i_nlink ++; + if (fl & LU_XATTR_REPLACE) + fs_flags |= XATTR_REPLACE; + + if (fl & LU_XATTR_CREATE) + fs_flags |= XATTR_CREATE; + + dentry->d_inode = inode; + *t = inode->i_ctime; + rc = inode->i_op->setxattr(dentry, name, buf->lb_buf, + buf->lb_len, fs_flags); + if (likely(rc == 0)) { + spin_lock(&obj->oo_guard); + inode->i_ctime = *t; spin_unlock(&obj->oo_guard); mark_inode_dirty(inode); - } else { - spin_unlock(&obj->oo_guard); - LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu, - "Overflowed nlink\n"); } + return rc; +} + +/** + * Put the fid into lustre_mdt_attrs, and then place the structure + * inode's ea. This fid should not be altered during the life time + * of the inode. + * + * \retval +ve, on success + * \retval -ve, on error + * + * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here + */ +static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_fid *fid) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs; + + fid_cpu_to_be(&mdt_attrs->lma_self_fid, fid); + + return __osd_xattr_set(env, dt, + osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs), + XATTR_NAME_LMA, LU_XATTR_CREATE); + +} + +/** + * Helper function to form igif + */ +static inline void osd_igif_get(const struct lu_env *env, struct dentry *dentry, + struct lu_fid *fid) +{ + struct inode *inode = dentry->d_inode; + lu_igif_build(fid, inode->i_ino, inode->i_generation); +} + +/** + * Helper function to pack the fid + */ +static inline void osd_fid_pack(const struct lu_env *env, const struct lu_fid *fid, + struct lu_fid_pack *pack) +{ + fid_pack(pack, fid, &osd_oti_get(env)->oti_fid); +} + +/** + * Try to read the fid from inode ea into dt_rec, if return value + * i.e. rc is +ve, then we got fid, otherwise we will have to form igif + * + * \param rec, the data-structure into which fid/igif is read + * + * \retval 0, on success + */ +static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry, + struct dt_rec *rec) +{ + struct inode *inode = dentry->d_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs; + struct lu_fid *fid = &info->oti_fid; + int rc; + + LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); + + rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs, + sizeof *mdt_attrs); + + if (rc > 0) { + fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid); + rc = 0; + } else if (rc == -ENODATA) { + osd_igif_get(env, dentry, fid); + rc = 0; + } + + if (rc == 0) + osd_fid_pack(env, fid, (struct lu_fid_pack*)rec); + + return rc; +} + +/** + * OSD layer object create function for interoperability mode (b11826). + * This is mostly similar to osd_object_create(). Only difference being, fid is + * inserted into inode ea here. + * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *info = osd_oti_get(env); + int result; + int is_root = 0; + + ENTRY; + LASSERT(osd_invariant(obj)); + LASSERT(!dt_object_exists(dt)); + LASSERT(osd_write_locked(env, obj)); + LASSERT(th != NULL); + + result = __osd_object_create(info, obj, attr, hint, dof, th); + + if (hint && hint->dah_parent) + is_root = osd_object_is_root(osd_dt_obj(hint->dah_parent)); + + /* objects under osd root shld have igif fid, so dont add fid EA */ + if (result == 0 && is_root == 0) + result = osd_ea_fid_set(env, dt, fid); + + if (result == 0) + result = __osd_oi_insert(env, obj, fid, th); + + LASSERT(ergo(result == 0, dt_object_exists(dt))); + LINVRNT(osd_invariant(obj)); + RETURN(result); +} + +/* + * Concurrency: @dt is write locked. + */ +static void osd_object_ref_add(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + + LINVRNT(osd_invariant(obj)); + LASSERT(dt_object_exists(dt)); + LASSERT(osd_write_locked(env, obj)); + LASSERT(th != NULL); + + spin_lock(&obj->oo_guard); + LASSERT(inode->i_nlink < LDISKFS_LINK_MAX); + inode->i_nlink++; + spin_unlock(&obj->oo_guard); + mark_inode_dirty(inode); + LINVRNT(osd_invariant(obj)); } /* @@ -1363,22 +1857,17 @@ static void osd_object_ref_del(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); spin_lock(&obj->oo_guard); - if (inode->i_nlink > 0) { - inode->i_nlink --; - spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); - } else { - spin_unlock(&obj->oo_guard); - LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu, - "Underflowed nlink\n"); - } - LASSERT(osd_invariant(obj)); + LASSERT(inode->i_nlink > 0); + inode->i_nlink--; + spin_unlock(&obj->oo_guard); + mark_inode_dirty(inode); + LINVRNT(osd_invariant(obj)); } /* @@ -1393,7 +1882,7 @@ static int osd_xattr_get(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; + struct dentry *dentry = &info->oti_obj_dentry; LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); @@ -1406,6 +1895,7 @@ static int osd_xattr_get(const struct lu_env *env, return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len); } + /* * Concurrency: @dt is write locked. */ @@ -1413,32 +1903,12 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *handle, struct lustre_capa *capa) { - int fs_flags; - - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; - - LASSERT(dt_object_exists(dt)); - LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL); - LASSERT(osd_write_locked(env, obj)); LASSERT(handle != NULL); if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; - dentry->d_inode = inode; - - fs_flags = 0; - if (fl & LU_XATTR_REPLACE) - fs_flags |= XATTR_REPLACE; - - if (fl & LU_XATTR_CREATE) - fs_flags |= XATTR_CREATE; - - return inode->i_op->setxattr(dentry, name, - buf->lb_buf, buf->lb_len, fs_flags); + return __osd_xattr_set(env, dt, buf, name, fl); } /* @@ -1452,7 +1922,7 @@ static int osd_xattr_list(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; + struct dentry *dentry = &info->oti_obj_dentry; LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL); @@ -1477,7 +1947,9 @@ static int osd_xattr_del(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; + struct dentry *dentry = &info->oti_obj_dentry; + struct timespec *t = &info->oti_time; + int rc; LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL); @@ -1488,7 +1960,16 @@ static int osd_xattr_del(const struct lu_env *env, return -EACCES; dentry->d_inode = inode; - return inode->i_op->removexattr(dentry, name); + *t = inode->i_ctime; + rc = inode->i_op->removexattr(dentry, name); + if (likely(rc == 0)) { + /* ctime should not be updated with server-side time. */ + spin_lock(&obj->oo_guard); + inode->i_ctime = *t; + spin_unlock(&obj->oo_guard); + mark_inode_dirty(inode); + } + return rc; } static struct obd_capa *osd_capa_get(const struct lu_env *env, @@ -1503,6 +1984,7 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, struct lustre_capa_key *key = &info->oti_capa_key; struct lustre_capa *capa = &info->oti_capa; struct obd_capa *oc; + struct md_capainfo *ci; int rc; ENTRY; @@ -1510,16 +1992,47 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, RETURN(ERR_PTR(-ENOENT)); LASSERT(dt_object_exists(dt)); - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); /* renewal sanity check */ if (old && osd_object_auth(env, dt, old, opc)) RETURN(ERR_PTR(-EACCES)); + ci = md_capainfo(env); + if (unlikely(!ci)) + RETURN(ERR_PTR(-ENOENT)); + + switch (ci->mc_auth) { + case LC_ID_NONE: + RETURN(NULL); + case LC_ID_PLAIN: + capa->lc_uid = obj->oo_inode->i_uid; + capa->lc_gid = obj->oo_inode->i_gid; + capa->lc_flags = LC_ID_PLAIN; + break; + case LC_ID_CONVERT: { + __u32 d[4], s[4]; + + s[0] = obj->oo_inode->i_uid; + get_random_bytes(&(s[1]), sizeof(__u32)); + s[2] = obj->oo_inode->i_gid; + get_random_bytes(&(s[3]), sizeof(__u32)); + rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN); + if (unlikely(rc)) + RETURN(ERR_PTR(rc)); + + capa->lc_uid = ((__u64)d[1] << 32) | d[0]; + capa->lc_gid = ((__u64)d[3] << 32) | d[2]; + capa->lc_flags = LC_ID_CONVERT; + break; + } + default: + RETURN(ERR_PTR(-EINVAL)); + } + capa->lc_fid = *fid; capa->lc_opc = opc; - capa->lc_uid = 0; - capa->lc_flags = dev->od_capa_alg << 24; + capa->lc_flags |= dev->od_capa_alg << 24; capa->lc_timeout = dev->od_capa_timeout; capa->lc_expiry = 0; @@ -1534,7 +2047,7 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, spin_unlock(&capa_lock); capa->lc_keyid = key->lk_keyid; - capa->lc_expiry = CURRENT_SECONDS + dev->od_capa_timeout; + capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout; rc = capa_hmac(capa->lc_hmac, capa, key->lk_key); if (rc) { @@ -1546,7 +2059,37 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, RETURN(oc); } -static struct dt_object_operations osd_obj_ops = { +static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) +{ + int rc; + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *dentry = &info->oti_obj_dentry; + struct file *file = &info->oti_file; + ENTRY; + + dentry->d_inode = inode; + file->f_dentry = dentry; + file->f_mapping = inode->i_mapping; + file->f_op = inode->i_fop; + LOCK_INODE_MUTEX(inode); + rc = file->f_op->fsync(file, dentry, 0); + UNLOCK_INODE_MUTEX(inode); + RETURN(rc); +} + +static int osd_data_get(const struct lu_env *env, struct dt_object *dt, + void **data) +{ + struct osd_object *obj = osd_dt_obj(dt); + ENTRY; + + *data = (void *)obj->oo_inode; + RETURN(0); +} + +static const struct dt_object_operations osd_obj_ops = { .do_read_lock = osd_object_read_lock, .do_write_lock = osd_object_write_lock, .do_read_unlock = osd_object_read_unlock, @@ -1563,6 +2106,33 @@ static struct dt_object_operations osd_obj_ops = { .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_data_get = osd_data_get, +}; + +/** + * dt_object_operations for interoperability mode + * (i.e. to run 2.0 mds on 1.8 disk) (b11826) + */ +static const struct dt_object_operations osd_obj_ea_ops = { + .do_read_lock = osd_object_read_lock, + .do_write_lock = osd_object_write_lock, + .do_read_unlock = osd_object_read_unlock, + .do_write_unlock = osd_object_write_unlock, + .do_attr_get = osd_attr_get, + .do_attr_set = osd_attr_set, + .do_ah_init = osd_ah_init, + .do_create = osd_object_ea_create, + .do_index_try = osd_index_try, + .do_ref_add = osd_object_ref_add, + .do_ref_del = osd_object_ref_del, + .do_xattr_get = osd_xattr_get, + .do_xattr_set = osd_xattr_set, + .do_xattr_del = osd_xattr_del, + .do_xattr_list = osd_xattr_list, + .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_data_get = osd_data_get, }; /* @@ -1598,11 +2168,15 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, struct lustre_capa *capa) + struct thandle *handle, struct lustre_capa *capa, + int ignore_quota) { struct inode *inode = osd_dt_obj(dt)->oo_inode; struct osd_thandle *oh; ssize_t result; +#ifdef HAVE_QUOTA_SUPPORT + cfs_cap_t save = current->cap_effective; +#endif LASSERT(handle != NULL); @@ -1611,14 +2185,23 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, oh = container_of(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle->h_transaction != NULL); +#ifdef HAVE_QUOTA_SUPPORT + if (ignore_quota) + current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; + else + current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; +#endif result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len, pos, oh->ot_handle); +#ifdef HAVE_QUOTA_SUPPORT + current->cap_effective = save; +#endif if (result == 0) result = buf->lb_len; return result; } -static struct dt_body_operations osd_body_ops = { +static const struct dt_body_operations osd_body_ops = { .dbo_read = osd_read, .dbo_write = osd_write }; @@ -1632,10 +2215,11 @@ static int osd_object_is_root(const struct osd_object *obj) return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode; } -static int osd_index_probe(const struct lu_env *env, struct osd_object *o, +static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o, const struct dt_index_features *feat) { struct iam_descr *descr; + struct dt_object *dt = &o->oo_dt; if (osd_object_is_root(o)) return feat == &dt_directory_features; @@ -1643,14 +2227,23 @@ static int osd_index_probe(const struct lu_env *env, struct osd_object *o, LASSERT(o->oo_dir != NULL); descr = o->oo_dir->od_container.ic_descr; - if (feat == &dt_directory_features) - return descr == &iam_htree_compat_param || - (descr->id_rec_size == sizeof(struct lu_fid_pack) && - 1 /* - * XXX check that index looks like directory. - */ - ); - else + if (feat == &dt_directory_features) { + if (descr->id_rec_size == sizeof(struct lu_fid_pack)) + return 1; + + if (descr == &iam_htree_compat_param) { + /* if it is a HTREE dir then there is good chance that, + * we dealing with ext3 directory here with no FIDs. */ + + if (descr->id_rec_size == + sizeof ((struct ldiskfs_dir_entry_2 *)NULL)->inode) { + + dt->do_index_ops = &osd_index_ea_ops; + return 1; + } + } + return 0; + } else { return feat->dif_keysize_min <= descr->id_key_size && descr->id_key_size <= feat->dif_keysize_max && @@ -1661,11 +2254,12 @@ static int osd_index_probe(const struct lu_env *env, struct osd_object *o, ergo(feat->dif_flags & DT_IND_UPDATE, 1 /* XXX check that object (and file system) is * writable */); + } } -static int osd_container_init(const struct lu_env *env, - struct osd_object *obj, - struct osd_directory *dir) +static int osd_iam_container_init(const struct lu_env *env, + struct osd_object *obj, + struct osd_directory *dir) { int result; struct iam_container *bag; @@ -1675,7 +2269,7 @@ static int osd_container_init(const struct lu_env *env, if (result == 0) { result = iam_container_setup(bag); if (result == 0) - obj->oo_dt.do_index_ops = &osd_index_ops; + obj->oo_dt.do_index_ops = &osd_index_iam_ops; else iam_container_fini(bag); } @@ -1689,14 +2283,23 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, const struct dt_index_features *feat) { int result; + int ea_dir = 0; struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); if (osd_object_is_root(obj)) { - dt->do_index_ops = &osd_index_compat_ops; + dt->do_index_ops = &osd_index_ea_ops; result = 0; + } else if (feat == &dt_directory_features && osd->od_iop_mode) { + dt->do_index_ops = &osd_index_ea_ops; + if (S_ISDIR(obj->oo_inode->i_mode)) + result = 0; + else + result = -ENOTDIR; + ea_dir = 1; } else if (!osd_has_index(obj)) { struct osd_directory *dir; @@ -1722,7 +2325,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, * recheck under lock. */ if (!osd_has_index(obj)) - result = osd_container_init(env, obj, dir); + result = osd_iam_container_init(env, obj, dir); else result = 0; up(&obj->oo_dir->od_sem); @@ -1731,18 +2334,30 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, } else result = 0; - if (result == 0) { - if (!osd_index_probe(env, obj, feat)) + if (result == 0 && ea_dir == 0) { + if (!osd_iam_index_probe(env, obj, feat)) result = -ENOTDIR; } - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); return result; } -static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *handle, - struct lustre_capa *capa) +/** + * delete a (key, value) pair from index \a dt specified by \a key + * + * \param dt_object osd index object + * \param key key for index + * \param rec record reference + * \param handle transaction handler + * + * \retval 0 success + * \retval -ve failure + */ + +static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *key, struct thandle *handle, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct osd_thandle *oh; @@ -1752,7 +2367,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, ENTRY; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); LASSERT(bag->ic_object == obj->oo_inode); LASSERT(handle != NULL); @@ -1760,7 +2375,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) RETURN(-EACCES); - ipd = osd_ipd_get(env, bag); + ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) RETURN(-ENOMEM); @@ -1770,19 +2385,83 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd); osd_ipd_put(env, bag, ipd); + LINVRNT(osd_invariant(obj)); + RETURN(rc); +} + +/** + * Index delete function for interoperability mode (b11826). + * It will remove the directory entry added by osd_index_ea_insert(). + * This entry is needed to maintain name->fid mapping. + * + * \param key, key i.e. file entry to be deleted + * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *key, struct thandle *handle, + struct lustre_capa *capa) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct inode *dir = obj->oo_inode; + struct dentry *dentry; + struct osd_thandle *oh; + struct ldiskfs_dir_entry_2 *de; + struct buffer_head *bh; + + int rc; + + ENTRY; + + LINVRNT(osd_invariant(obj)); + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle != NULL); + LASSERT(oh->ot_handle->h_transaction != NULL); + + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) + RETURN(-EACCES); + + dentry = osd_child_dentry_get(env, obj, + (char *)key, strlen((char *)key)); + bh = ldiskfs_find_entry(dentry, &de); + if (bh) { + rc = ldiskfs_delete_entry(oh->ot_handle, + dir, de, bh); + if (!rc) + mark_inode_dirty(dir); + brelse(bh); + } else + rc = -ENOENT; + LASSERT(osd_invariant(obj)); RETURN(rc); } -static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) +/** + * Lookup index for \a key and copy record to \a rec. + * + * \param dt_object osd index object + * \param key key for index + * \param rec record reference + * + * \retval +ve success : exact mach + * \retval 0 return record with key not greater than \a key + * \retval -ve failure + */ +static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct iam_path_descr *ipd; struct iam_container *bag = &obj->oo_dir->od_container; + struct osd_thread_info *oti = osd_oti_get(env); + struct iam_iterator *it = &oti->oti_idx_it; int rc; - ENTRY; LASSERT(osd_invariant(obj)); @@ -1790,33 +2469,56 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, LASSERT(bag->ic_object == obj->oo_inode); if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) - return -EACCES; + RETURN(-EACCES); - ipd = osd_ipd_get(env, bag); - if (unlikely(ipd == NULL)) + ipd = osd_idx_ipd_get(env, bag); + if (IS_ERR(ipd)) RETURN(-ENOMEM); - rc = iam_lookup(bag, (const struct iam_key *)key, - (struct iam_rec *)rec, ipd); + /* got ipd now we can start iterator. */ + iam_it_init(it, bag, 0, ipd); + + rc = iam_it_get(it, (struct iam_key *)key); + if (rc >= 0) + iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec); + + iam_it_put(it); + iam_it_fini(it); osd_ipd_put(env, bag, ipd); - LASSERT(osd_invariant(obj)); + + LINVRNT(osd_invariant(obj)); RETURN(rc); } -static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, - const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, struct lustre_capa *capa) +/** + * Inserts (key, value) pair in \a dt index object. + * + * \param dt osd index object + * \param key key for index + * \param rec record reference + * \param th transaction handler + * + * \retval 0 success + * \retval -ve failure + */ +static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, const struct dt_key *key, + struct thandle *th, struct lustre_capa *capa, + int ignore_quota) { struct osd_object *obj = osd_dt_obj(dt); struct iam_path_descr *ipd; struct osd_thandle *oh; struct iam_container *bag = &obj->oo_dir->od_container; +#ifdef HAVE_QUOTA_SUPPORT + cfs_cap_t save = current->cap_effective; +#endif int rc; ENTRY; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); LASSERT(bag->ic_object == obj->oo_inode); LASSERT(th != NULL); @@ -1824,388 +2526,810 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) return -EACCES; - ipd = osd_ipd_get(env, bag); - if (unlikely(ipd == NULL)) - RETURN(-ENOMEM); + ipd = osd_idx_ipd_get(env, bag); + if (unlikely(ipd == NULL)) + RETURN(-ENOMEM); + + oh = container_of0(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle != NULL); + LASSERT(oh->ot_handle->h_transaction != NULL); +#ifdef HAVE_QUOTA_SUPPORT + if (ignore_quota) + current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; + else + current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; +#endif + rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key, + (struct iam_rec *)rec, ipd); +#ifdef HAVE_QUOTA_SUPPORT + current->cap_effective = save; +#endif + osd_ipd_put(env, bag, ipd); + LINVRNT(osd_invariant(obj)); + RETURN(rc); +} + +/** + * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries + * into the directory.Also sets flags into osd object to + * indicate dot and dotdot are created. This is required for + * interoperability mode (b11826) + * + * \param dir directory for dot and dotdot fixup. + * \param obj child object for linking + * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_add_dot_dotdot(struct osd_thread_info *info, + struct osd_object *dir, + struct osd_object *obj, const char *name, + struct thandle *th) +{ + struct inode *parent_dir = obj->oo_inode; + struct inode *inode = dir->oo_inode; + struct osd_thandle *oth; + int result = 0; + + oth = container_of(th, struct osd_thandle, ot_super); + LASSERT(oth->ot_handle->h_transaction != NULL); + LASSERT(S_ISDIR(dir->oo_inode->i_mode)); + + if (strcmp(name, dot) == 0) { + if (dir->oo_compat_dot_created) { + result = -EEXIST; + } else { + LASSERT(obj == dir); + dir->oo_compat_dot_created = 1; + result = 0; + } + } else if(strcmp(name, dotdot) == 0) { + if (!dir->oo_compat_dot_created) + return -EINVAL; + if (dir->oo_compat_dotdot_created) + return __osd_ea_add_rec(info, dir, obj, name, th); + + result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode); + if (result == 0) + dir->oo_compat_dotdot_created = 1; + } + + return result; +} + +/** + * Calls ldiskfs_add_entry() to add directory entry + * into the directory. This is required for + * interoperability mode (b11826) + * + * \retval 0, on success + * \retval -ve, on error + */ +static int __osd_ea_add_rec(struct osd_thread_info *info, + struct osd_object *pobj, + struct osd_object *cobj, + const char *name, + struct thandle *th) +{ + struct dentry *child; + struct osd_thandle *oth; + struct inode *cinode = cobj->oo_inode; + int rc; + + oth = container_of(th, struct osd_thandle, ot_super); + LASSERT(oth->ot_handle != NULL); + LASSERT(oth->ot_handle->h_transaction != NULL); + + child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name)); + rc = ldiskfs_add_entry(oth->ot_handle, child, cinode); + + RETURN(rc); +} + +/** + * It will call the appropriate osd_add* function and return the + * value, return by respective functions. + */ +static int osd_ea_add_rec(const struct lu_env *env, + struct osd_object *pobj, + struct osd_object *cobj, + const char *name, + struct thandle *th) +{ + struct osd_thread_info *info = osd_oti_get(env); + int rc; + + if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' && + name[2] =='\0'))) + rc = osd_add_dot_dotdot(info, pobj, cobj, name, th); + else + rc = __osd_ea_add_rec(info, pobj, cobj, name, th); + + return rc; +} + +/** + * Calls ->lookup() to find dentry. From dentry get inode and + * read inode's ea to get fid. This is required for interoperability + * mode (b11826) + * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, + struct dt_rec *rec, const struct dt_key *key) +{ + struct inode *dir = obj->oo_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *dentry; + struct osd_device *dev = osd_dev(obj->oo_dt.do_lu.lo_dev); + struct osd_inode_id *id = &info->oti_id; + struct ldiskfs_dir_entry_2 *de; + struct buffer_head *bh; + struct inode *inode; + int ino; + int rc; + + LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL); + + dentry = osd_child_dentry_get(env, obj, + (char *)key, strlen((char *)key)); + bh = ldiskfs_find_entry(dentry, &de); + if (bh) { + ino = le32_to_cpu(de->inode); + brelse(bh); + id->oii_ino = ino; + id->oii_gen = OSD_OII_NOGEN; + + inode = osd_iget(info, dev, id); + if (!IS_ERR(inode)) { + dentry->d_inode = inode; + + rc = osd_ea_fid_get(env, dentry, rec); + iput(inode); + } else + rc = -ENOENT; + } else + rc = -ENOENT; + + RETURN (rc); +} + +/** + * Find the osd object for given fid. + * + * \param fid, need to find the osd object having this fid + * + * \retval osd_object, on success + * \retval -ve, on error + */ +struct osd_object *osd_object_find(const struct lu_env *env, + struct dt_object *dt, + const struct lu_fid *fid) +{ + struct lu_device *ludev = dt->do_lu.lo_dev; + struct osd_object *child = NULL; + struct lu_object *luch; + struct lu_object *lo; + + luch = lu_object_find(env, ludev, fid, NULL); + if (!IS_ERR(luch)) { + if (lu_object_exists(luch)) { + lo = lu_object_locate(luch->lo_header, ludev->ld_type); + if (lo != NULL) + child = osd_obj(lo); + else + LU_OBJECT_DEBUG(D_ERROR, env, luch, + "lu_object can't be located" + ""DFID"\n", PFID(fid)); + + if (child == NULL) { + lu_object_put(env, luch); + CERROR("Unable to get osd_object\n"); + child = ERR_PTR(-ENOENT); + } + } else { + LU_OBJECT_DEBUG(D_ERROR, env, luch, + "lu_object does not exists "DFID"\n", + PFID(fid)); + child = ERR_PTR(-ENOENT); + } + } else + child = (void *)luch; + + return child; +} + +/** + * Put the osd object once done with it. + * + * \param obj, osd object that needs to be put + */ +static inline void osd_object_put(const struct lu_env *env, + struct osd_object *obj) +{ + lu_object_put(env, &obj->oo_dt.do_lu); +} + +/** + * Index add function for interoperability mode (b11826). + * It will add the directory entry.This entry is needed to + * maintain name->fid mapping. + * + * \param key, it is key i.e. file entry to be inserted + * \param rec, it is value of given key i.e. fid + * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th, + struct lustre_capa *capa, int ignore_quota) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct lu_fid *fid = &osd_oti_get(env)->oti_fid; + const struct lu_fid_pack *pack = (const struct lu_fid_pack *)rec; + const char *name = (const char *)key; + struct osd_object *child; +#ifdef HAVE_QUOTA_SUPPORT + cfs_cap_t save = current->cap_effective; +#endif + int rc; + + ENTRY; + + LASSERT(osd_invariant(obj)); + LASSERT(dt_object_exists(dt)); + LASSERT(th != NULL); + + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) + RETURN(-EACCES); + + rc = fid_unpack(pack, fid); + if (rc != 0) + RETURN(rc); + child = osd_object_find(env, dt, fid); + if (!IS_ERR(child)) { +#ifdef HAVE_QUOTA_SUPPORT + if (ignore_quota) + current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; + else + current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; +#endif + rc = osd_ea_add_rec(env, obj, child, name, th); + +#ifdef HAVE_QUOTA_SUPPORT + current->cap_effective = save; +#endif + osd_object_put(env, child); + } else { + rc = PTR_ERR(child); + } - oh = container_of0(th, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle != NULL); - LASSERT(oh->ot_handle->h_transaction != NULL); - rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key, - (struct iam_rec *)rec, ipd); - osd_ipd_put(env, bag, ipd); LASSERT(osd_invariant(obj)); RETURN(rc); } -/* - * Iterator operations. +/** + * Initialize osd Iterator for given osd index object. + * + * \param dt osd index object */ -struct osd_it { - struct osd_object *oi_obj; - struct iam_path_descr *oi_ipd; - struct iam_iterator oi_it; -}; -static struct dt_it *osd_it_init(const struct lu_env *env, - struct dt_object *dt, int writable, +static struct dt_it *osd_it_iam_init(const struct lu_env *env, + struct dt_object *dt, struct lustre_capa *capa) { - struct osd_it *it; + struct osd_it_iam *it; + struct osd_thread_info *oti = osd_oti_get(env); struct osd_object *obj = osd_dt_obj(dt); struct lu_object *lo = &dt->do_lu; struct iam_path_descr *ipd; struct iam_container *bag = &obj->oo_dir->od_container; - __u32 flags; LASSERT(lu_object_exists(lo)); - if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE : - CAPA_OPC_BODY_READ)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) return ERR_PTR(-EACCES); - flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE; - OBD_ALLOC_PTR(it); - if (it != NULL) { - /* - * XXX: as ipd is allocated within osd_thread_info, assignment - * below implies that iterator usage is confined within single - * environment. - */ - ipd = osd_ipd_get(env, bag); - if (likely(ipd != NULL)) { - it->oi_obj = obj; - it->oi_ipd = ipd; - lu_object_get(lo); - iam_it_init(&it->oi_it, bag, flags, ipd); - return (struct dt_it *)it; - } else - OBD_FREE_PTR(it); + it = &oti->oti_it; + ipd = osd_it_ipd_get(env, bag); + if (likely(ipd != NULL)) { + it->oi_obj = obj; + it->oi_ipd = ipd; + lu_object_get(lo); + iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd); + return (struct dt_it *)it; } return ERR_PTR(-ENOMEM); } -static void osd_it_fini(const struct lu_env *env, struct dt_it *di) +/** + * free given Iterator. + */ + +static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; struct osd_object *obj = it->oi_obj; iam_it_fini(&it->oi_it); osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd); lu_object_put(env, &obj->oo_dt.do_lu); - OBD_FREE_PTR(it); } -static int osd_it_get(const struct lu_env *env, +/** + * Move Iterator to record specified by \a key + * + * \param di osd iterator + * \param key key for index + * + * \retval +ve di points to record with least key not larger than key + * \retval 0 di points to exact matched key + * \retval -ve failure + */ + +static int osd_it_iam_get(const struct lu_env *env, struct dt_it *di, const struct dt_key *key) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_get(&it->oi_it, (const struct iam_key *)key); } -static void osd_it_put(const struct lu_env *env, struct dt_it *di) +/** + * Release Iterator + * + * \param di osd iterator + */ + +static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; iam_it_put(&it->oi_it); } -static int osd_it_next(const struct lu_env *env, struct dt_it *di) +/** + * Move iterator by one record + * + * \param di osd iterator + * + * \retval +1 end of container reached + * \retval 0 success + * \retval -ve failure + */ + +static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_next(&it->oi_it); } -static int osd_it_del(const struct lu_env *env, struct dt_it *di, - struct thandle *th) -{ - struct osd_it *it = (struct osd_it *)di; - struct osd_thandle *oh; - - LASSERT(th != NULL); - - oh = container_of0(th, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle != NULL); - LASSERT(oh->ot_handle->h_transaction != NULL); - - return iam_it_rec_delete(oh->ot_handle, &it->oi_it); -} +/** + * Return pointer to the key under iterator. + */ -static struct dt_key *osd_it_key(const struct lu_env *env, +static struct dt_key *osd_it_iam_key(const struct lu_env *env, const struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return (struct dt_key *)iam_it_key_get(&it->oi_it); } -static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di) +/** + * Return size of key under iterator (in bytes) + */ + +static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_key_size(&it->oi_it); } -static struct dt_rec *osd_it_rec(const struct lu_env *env, +/** + * Return pointer to the record under iterator. + */ +static struct dt_rec *osd_it_iam_rec(const struct lu_env *env, const struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return (struct dt_rec *)iam_it_rec_get(&it->oi_it); } -static __u32 osd_it_store(const struct lu_env *env, const struct dt_it *di) +/** + * Returns cookie for current Iterator position. + */ +static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_store(&it->oi_it); } -static int osd_it_load(const struct lu_env *env, - const struct dt_it *di, __u32 hash) +/** + * Restore iterator from cookie. + * + * \param di osd iterator + * \param hash Iterator location cookie + * + * \retval +ve di points to record with least key not larger than key. + * \retval 0 di points to exact matched key + * \retval -ve failure + */ + +static int osd_it_iam_load(const struct lu_env *env, + const struct dt_it *di, __u64 hash) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_load(&it->oi_it, hash); } -static struct dt_index_operations osd_index_ops = { - .dio_lookup = osd_index_lookup, - .dio_insert = osd_index_insert, - .dio_delete = osd_index_delete, +static const struct dt_index_operations osd_index_iam_ops = { + .dio_lookup = osd_index_iam_lookup, + .dio_insert = osd_index_iam_insert, + .dio_delete = osd_index_iam_delete, .dio_it = { - .init = osd_it_init, - .fini = osd_it_fini, - .get = osd_it_get, - .put = osd_it_put, - .del = osd_it_del, - .next = osd_it_next, - .key = osd_it_key, - .key_size = osd_it_key_size, - .rec = osd_it_rec, - .store = osd_it_store, - .load = osd_it_load + .init = osd_it_iam_init, + .fini = osd_it_iam_fini, + .get = osd_it_iam_get, + .put = osd_it_iam_put, + .next = osd_it_iam_next, + .key = osd_it_iam_key, + .key_size = osd_it_iam_key_size, + .rec = osd_it_iam_rec, + .store = osd_it_iam_store, + .load = osd_it_iam_load } }; -static int osd_index_compat_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa) +/** + * Creates or initializes iterator context. + * + * \retval struct osd_it_ea, iterator structure on success + * + */ +static struct dt_it *osd_it_ea_init(const struct lu_env *env, + struct dt_object *dt, + struct lustre_capa *capa) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *info = osd_oti_get(env); + struct osd_it_ea *it = &info->oti_it_ea; + struct lu_object *lo = &dt->do_lu; + struct dentry *obj_dentry = &info->oti_it_dentry; + ENTRY; + LASSERT(lu_object_exists(lo)); + + obj_dentry->d_inode = obj->oo_inode; + obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); + obj_dentry->d_name.hash = 0; + + it->oie_namelen = 0; + it->oie_curr_pos = 0; + it->oie_next_pos = 0; + it->oie_obj = obj; + it->oie_file.f_dentry = obj_dentry; + it->oie_file.f_mapping = obj->oo_inode->i_mapping; + it->oie_file.f_op = obj->oo_inode->i_fop; + it->oie_file.private_data = NULL; + lu_object_get(lo); + + RETURN((struct dt_it*) it); +} + +/** + * Destroy or finishes iterator context. + * + * \param di, struct osd_it_ea, iterator structure to be destroyed + */ +static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_it_ea *it = (struct osd_it_ea *)di; + struct osd_object *obj = it->oie_obj; + - LASSERT(handle != NULL); - LASSERT(S_ISDIR(obj->oo_inode->i_mode)); ENTRY; + lu_object_put(env, &obj->oo_dt.do_lu); + EXIT; +} -#if 0 - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) - RETURN(-EACCES); -#endif +/** + * It position the iterator at given key, so that next lookup continues from + * that key Or it is similar to dio_it->load() but based on a key, + * rather than file position. + * + * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator + * to the beginning. + * + * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty(). + */ +static int osd_it_ea_get(const struct lu_env *env, + struct dt_it *di, const struct dt_key *key) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; - RETURN(-EOPNOTSUPP); + ENTRY; + LASSERT(((const char *)key)[0] == '\0'); + it->oie_namelen = 0; + it->oie_curr_pos = 0; + it->oie_next_pos = 0; + + RETURN(+1); } -/* - * Compatibility index operations. +/** + * Does nothing */ +static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di) +{ +} - -static void osd_build_pack(const struct lu_env *env, struct osd_device *osd, - struct dentry *dentry, struct lu_fid_pack *pack) +/** + * It is called internally by ->readdir(). It fills the + * iterator's in-memory data structure with required + * information i.e. name, namelen, rec_size etc. + * + * \param buf, in which information to be filled in. + * \param name, name of the file in given dir + * + * \retval 0, on success + * \retval 1, on buffer full + */ +static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, + loff_t offset, ino_t ino, + unsigned int d_type) { - struct inode *inode = dentry->d_inode; - struct lu_fid *fid = &osd_oti_get(env)->oti_fid; + struct osd_it_ea *it = (struct osd_it_ea *)buf; + struct dirent64 *dirent = &it->oie_dirent64; + int reclen = LDISKFS_DIR_REC_LEN(namelen); - lu_igif_build(fid, inode->i_ino, inode->i_generation); - fid_cpu_to_be(fid, fid); - pack->fp_len = sizeof *fid + 1; - memcpy(pack->fp_area, fid, sizeof *fid); + + ENTRY; + if (it->oie_namelen) + RETURN(-ENOENT); + + if (namelen == 0 || namelen > LDISKFS_NAME_LEN) + RETURN(-EIO); + + strncpy(dirent->d_name, name, LDISKFS_NAME_LEN); + dirent->d_name[namelen] = 0; + dirent->d_ino = ino; + dirent->d_off = offset; + dirent->d_reclen = reclen; + it->oie_namelen = namelen; + it->oie_curr_pos = offset; + + RETURN(0); } -static int osd_index_compat_lookup(const struct lu_env *env, - struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) +/** + * Calls ->readdir() to load a directory entry at a time + * and stored it in iterator's in-memory data structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval 0, on success + * \retval -ve, on error + */ +int osd_ldiskfs_it_fill(const struct dt_it *di) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_it_ea *it = (struct osd_it_ea *)di; + struct osd_object *obj = it->oie_obj; + struct inode *inode = obj->oo_inode; + int result = 0; - struct osd_device *osd = osd_obj2dev(obj); - struct osd_thread_info *info = osd_oti_get(env); - struct inode *dir; + ENTRY; + it->oie_namelen = 0; + it->oie_file.f_pos = it->oie_curr_pos; - int result; + result = inode->i_fop->readdir(&it->oie_file, it, + (filldir_t) osd_ldiskfs_filldir); - /* - * XXX temporary solution. - */ - struct dentry *dentry; - struct dentry *parent; + it->oie_next_pos = it->oie_file.f_pos; - LASSERT(osd_invariant(obj)); - LASSERT(S_ISDIR(obj->oo_inode->i_mode)); - LASSERT(osd_has_index(obj)); + if(!result && it->oie_namelen == 0) + result = -EIO; - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) - return -EACCES; + RETURN(result); +} - info->oti_str.name = (const char *)key; - info->oti_str.len = strlen((const char *)key); +/** + * It calls osd_ldiskfs_it_fill() which will use ->readdir() + * to load a directory entry at a time and stored it in + * iterator's in-memory data structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval +ve, iterator reached to end + * \retval 0, iterator not reached to end + * \retval -ve, on error + */ +static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + int rc; - dir = obj->oo_inode; - LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL); + ENTRY; + it->oie_curr_pos = it->oie_next_pos; - parent = d_alloc_root(dir); - if (parent == NULL) - return -ENOMEM; - igrab(dir); - dentry = d_alloc(parent, &info->oti_str); - if (dentry != NULL) { - struct dentry *d; + if (it->oie_curr_pos == LDISKFS_HTREE_EOF) + rc = +1; + else + rc = osd_ldiskfs_it_fill(di); - /* - * XXX passing NULL for nameidata should work for - * ext3/ldiskfs. - */ - d = dir->i_op->lookup(dir, dentry, NULL); - if (d == NULL) { - /* - * normal case, result is in @dentry. - */ - if (dentry->d_inode != NULL) { - osd_build_pack(env, osd, dentry, - (struct lu_fid_pack *)rec); - result = 0; - } else - result = -ENOENT; - } else { - /* What? Disconnected alias? Ppheeeww... */ - CERROR("Aliasing where not expected\n"); - result = -EIO; - dput(d); - } - dput(dentry); - } else - result = -ENOMEM; - dput(parent); - LASSERT(osd_invariant(obj)); - return result; + RETURN(rc); } -static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev, - struct inode *dir, struct inode *inode, const char *name) +/** + * Returns the key at current position from iterator's in memory structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval key i.e. struct dt_key on success + */ +static struct dt_key *osd_it_ea_key(const struct lu_env *env, + const struct dt_it *di) { - struct dentry *old; - struct dentry *new; - struct dentry *parent; + struct osd_it_ea *it = (struct osd_it_ea *)di; + ENTRY; + RETURN((struct dt_key *)it->oie_dirent64.d_name); +} - int result; +/** + * Returns the key's size at current position from iterator's in memory structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval key_size i.e. struct dt_key on success + */ +static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + ENTRY; + RETURN(it->oie_namelen); +} - info->oti_str.name = name; - info->oti_str.len = strlen(name); - - LASSERT(atomic_read(&dir->i_count) > 0); - result = -ENOMEM; - old = d_alloc(dev->od_obj_area, &info->oti_str); - if (old != NULL) { - d_instantiate(old, inode); - igrab(inode); - LASSERT(atomic_read(&dir->i_count) > 0); - parent = d_alloc_root(dir); - if (parent != NULL) { - igrab(dir); - LASSERT(atomic_read(&dir->i_count) > 1); - new = d_alloc(parent, &info->oti_str); - LASSERT(atomic_read(&dir->i_count) > 1); - if (new != NULL) { - LASSERT(atomic_read(&dir->i_count) > 1); - result = dir->i_op->link(old, dir, new); - LASSERT(atomic_read(&dir->i_count) > 1); - dput(new); - LASSERT(atomic_read(&dir->i_count) > 1); - } - LASSERT(atomic_read(&dir->i_count) > 1); - dput(parent); - LASSERT(atomic_read(&dir->i_count) > 0); - } - dput(old); +/** + * Returns the value (i.e. fid/igif) at current position from iterator's + * in memory structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval value i.e. struct dt_rec on success + */ +static struct dt_rec *osd_it_ea_rec(const struct lu_env *env, + const struct dt_it *di) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + struct osd_object *obj = it->oie_obj; + struct osd_thread_info *info = osd_oti_get(env); + struct osd_inode_id *id = &info->oti_id; + struct lu_fid_pack *rec = &info->oti_pack; + struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; + struct dentry *dentry = &info->oti_child_dentry; + struct osd_device *dev; + struct inode *inode; + int rc; + + ENTRY; + dev = osd_dev(ldev); + id->oii_ino = it->oie_dirent64.d_ino; + id->oii_gen = OSD_OII_NOGEN; + inode = osd_iget(info, dev, id); + if (!IS_ERR(inode)) { + dentry->d_inode = inode; + LASSERT(dentry->d_inode->i_sb == osd_sb(dev)); + } else { + CERROR("Error getting inode for ino =%d", id->oii_ino); + RETURN((struct dt_rec *) PTR_ERR(inode)); } - LASSERT(atomic_read(&dir->i_count) > 0); - return result; + + rc = osd_ea_fid_get(env, dentry, (struct dt_rec*) rec); + + iput(inode); + RETURN((struct dt_rec *)rec); + } +/** + * Returns a cookie for current position of the iterator head, so that + * user can use this cookie to load/start the iterator next time. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval cookie for current position, on success + */ +static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + ENTRY; + RETURN(it->oie_curr_pos); +} -/* - * XXX Temporary stuff. +/** + * It calls osd_ldiskfs_it_fill() which will use ->readdir() + * to load a directory entry at a time and stored it i inn, + * in iterator's in-memory data structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval +ve, on success + * \retval -ve, on error */ -static int osd_index_compat_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa) +static int osd_it_ea_load(const struct lu_env *env, + const struct dt_it *di, __u64 hash) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_it_ea *it = (struct osd_it_ea *)di; + int rc; + + ENTRY; + it->oie_curr_pos = it->oie_next_pos = hash; - const char *name = (const char *)key; + rc = osd_ldiskfs_it_fill(di); + if (rc == 0) + rc = +1; - struct lu_device *ludev = dt->do_lu.lo_dev; - struct lu_object *luch; + RETURN(rc); +} +/** + * Index and Iterator operations for interoperability + * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826) + */ +static const struct dt_index_operations osd_index_ea_ops = { + .dio_lookup = osd_index_ea_lookup, + .dio_insert = osd_index_ea_insert, + .dio_delete = osd_index_ea_delete, + .dio_it = { + .init = osd_it_ea_init, + .fini = osd_it_ea_fini, + .get = osd_it_ea_get, + .put = osd_it_ea_put, + .next = osd_it_ea_next, + .key = osd_it_ea_key, + .key_size = osd_it_ea_key_size, + .rec = osd_it_ea_rec, + .store = osd_it_ea_store, + .load = osd_it_ea_load + } +}; - struct osd_thread_info *info = osd_oti_get(env); - const struct lu_fid_pack *pack = (const struct lu_fid_pack *)rec; - struct lu_fid *fid = &osd_oti_get(env)->oti_fid; +/** + * Index lookup function for interoperability mode (b11826). + * + * \param key, key i.e. file name to be searched + * + * \retval +ve, on success + * \retval -ve, on error + */ +static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa) +{ + struct osd_object *obj = osd_dt_obj(dt); + int rc = 0; - int result; + ENTRY; LASSERT(S_ISDIR(obj->oo_inode->i_mode)); - LASSERT(osd_invariant(obj)); - LASSERT(th != NULL); + LINVRNT(osd_invariant(obj)); - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) return -EACCES; - result = fid_unpack(pack, fid); - if (result != 0) - return result; + rc = osd_ea_lookup_rec(env, obj, rec, key); - luch = lu_object_find(env, ludev->ld_site, fid); - if (!IS_ERR(luch)) { - if (lu_object_exists(luch)) { - struct osd_object *child; - - child = osd_obj(lu_object_locate(luch->lo_header, - ludev->ld_type)); - if (child != NULL) - result = osd_add_rec(info, osd_obj2dev(obj), - obj->oo_inode, - child->oo_inode, name); - else { - CERROR("No osd slice.\n"); - result = -ENOENT; - } - LASSERT(osd_invariant(obj)); - LASSERT(osd_invariant(child)); - } else { - CERROR("Sorry.\n"); - result = -ENOENT; - } - lu_object_put(env, luch); - } else - result = PTR_ERR(luch); - LASSERT(osd_invariant(obj)); - return result; + if (rc == 0) + rc = +1; + RETURN(rc); } -static struct dt_index_operations osd_index_compat_ops = { - .dio_lookup = osd_index_compat_lookup, - .dio_insert = osd_index_compat_insert, - .dio_delete = osd_index_compat_delete -}; - /* type constructor/destructor: osd_type_init, osd_type_fini */ LU_TYPE_INIT_FINI(osd, &osd_key); @@ -2235,19 +3359,27 @@ LU_KEY_FINI(osd, struct osd_thread_info); static void osd_key_exit(const struct lu_context *ctx, struct lu_context_key *key, void *data) { -#if OSD_COUNTERS struct osd_thread_info *info = data; LASSERT(info->oti_r_locks == 0); LASSERT(info->oti_w_locks == 0); LASSERT(info->oti_txns == 0); -#endif } static int osd_device_init(const struct lu_env *env, struct lu_device *d, const char *name, struct lu_device *next) { - return lu_env_init(&osd_dev(d)->od_env_for_commit, NULL, LCT_MD_THREAD); + int rc; + struct lu_context *ctx; + + /* context for commit hooks */ + ctx = &osd_dev(d)->od_env_for_commit.le_ctx; + rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF); + if (rc == 0) { + rc = osd_procfs_init(osd_dev(d), name); + ctx->lc_cookie = 0x3; + } + return rc; } static int osd_shutdown(const struct lu_env *env, struct osd_device *o) @@ -2255,7 +3387,7 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o) struct osd_thread_info *info = osd_oti_get(env); ENTRY; if (o->od_obj_area != NULL) { - dput(o->od_obj_area); + lu_object_put(env, &o->od_obj_area->do_lu); o->od_obj_area = NULL; } osd_oi_fini(info, &o->od_oi); @@ -2268,8 +3400,8 @@ static int osd_mount(const struct lu_env *env, { struct lustre_mount_info *lmi; const char *dev = lustre_cfg_string(cfg, 0); - struct osd_thread_info *info = osd_oti_get(env); - int result; + struct lustre_disk_data *ldd; + struct lustre_sb_info *lsi; ENTRY; @@ -2289,35 +3421,40 @@ static int osd_mount(const struct lu_env *env, /* save lustre_mount_info in dt_device */ o->od_mount = lmi; - result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev); - if (result == 0) { - struct dentry *d; + lsi = s2lsi(lmi->lmi_sb); + ldd = lsi->lsi_ldd; - d = simple_mkdir(osd_sb(o)->s_root, "*OBJ-TEMP*", 0777, 1); - if (!IS_ERR(d)) { - o->od_obj_area = d; - } else - result = PTR_ERR(d); - } - if (result != 0) - osd_shutdown(env, o); - RETURN(result); + if (ldd->ldd_flags & LDD_F_IAM_DIR) { + o->od_iop_mode = 0; + LCONSOLE_WARN("OSD: IAM mode enabled\n"); + } else + o->od_iop_mode = 1; + + o->od_obj_area = NULL; + RETURN(0); } static struct lu_device *osd_device_fini(const struct lu_env *env, struct lu_device *d) { + int rc; ENTRY; shrink_dcache_sb(osd_sb(osd_dev(d))); osd_sync(env, lu2dt_dev(d)); + rc = osd_procfs_fini(osd_dev(d)); + if (rc) { + CERROR("proc fini error %d \n", rc); + RETURN (ERR_PTR(rc)); + } + if (osd_dev(d)->od_mount) server_put_mount(osd_dev(d)->od_mount->lmi_name, osd_dev(d)->od_mount->lmi_mnt); osd_dev(d)->od_mount = NULL; - lu_env_fini(&osd_dev(d)->od_env_for_commit); + lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx); RETURN(NULL); } @@ -2340,22 +3477,30 @@ static struct lu_device *osd_device_alloc(const struct lu_env *env, spin_lock_init(&o->od_osfs_lock); o->od_osfs_age = cfs_time_shift_64(-1000); o->od_capa_hash = init_capa_hash(); - if (o->od_capa_hash == NULL) + if (o->od_capa_hash == NULL) { + dt_device_fini(&o->od_dt_dev); l = ERR_PTR(-ENOMEM); + } } else l = ERR_PTR(result); + + if (IS_ERR(l)) + OBD_FREE_PTR(o); } else l = ERR_PTR(-ENOMEM); return l; } -static void osd_device_free(const struct lu_env *env, struct lu_device *d) +static struct lu_device *osd_device_free(const struct lu_env *env, + struct lu_device *d) { struct osd_device *o = osd_dev(d); + ENTRY; cleanup_capa_hash(o->od_capa_hash); dt_device_fini(&o->od_dt_dev); OBD_FREE_PTR(o); + RETURN(NULL); } static int osd_process_config(const struct lu_env *env, @@ -2373,13 +3518,14 @@ static int osd_process_config(const struct lu_env *env, err = osd_shutdown(env, o); break; default: - err = -ENOTTY; + err = -ENOSYS; } RETURN(err); } + extern void ldiskfs_orphan_cleanup (struct super_block * sb, - struct ldiskfs_super_block * es); + struct ldiskfs_super_block * es); static int osd_recovery_complete(const struct lu_env *env, struct lu_device *d) @@ -2391,6 +3537,49 @@ static int osd_recovery_complete(const struct lu_env *env, RETURN(0); } +static int osd_prepare(const struct lu_env *env, + struct lu_device *pdev, + struct lu_device *dev) +{ + struct osd_device *osd = osd_dev(dev); + struct lustre_sb_info *lsi; + struct lustre_disk_data *ldd; + struct lustre_mount_info *lmi; + struct osd_thread_info *oti = osd_oti_get(env); + struct dt_object *d; + int result; + + ENTRY; + /* 1. initialize oi before any file create or file open */ + result = osd_oi_init(oti, &osd->od_oi, + &osd->od_dt_dev, lu2md_dev(pdev)); + if (result != 0) + RETURN(result); + + lmi = osd->od_mount; + lsi = s2lsi(lmi->lmi_sb); + ldd = lsi->lsi_ldd; + + /* 2. setup local objects */ + result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev)); + if (result) + goto out; + + /* 3. open remote object dir */ + d = dt_store_open(env, lu2dt_dev(dev), "", + remote_obj_dir, &oti->oti_fid); + if (!IS_ERR(d)) { + osd->od_obj_area = d; + result = 0; + } else { + result = PTR_ERR(d); + osd->od_obj_area = NULL; + } + +out: + RETURN(result); +} + static struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev, const struct osd_inode_id *id) @@ -2405,7 +3594,8 @@ static struct inode *osd_iget(struct osd_thread_info *info, CERROR("bad inode\n"); iput(inode); inode = ERR_PTR(-ENOENT); - } else if (inode->i_generation != id->oii_gen) { + } else if (id->oii_gen != OSD_OII_NOGEN && + inode->i_generation != id->oii_gen) { CERROR("stale inode\n"); iput(inode); inode = ERR_PTR(-ESTALE); @@ -2426,7 +3616,7 @@ static int osd_fid_lookup(const struct lu_env *env, struct inode *inode; int result; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); LASSERT(fid_is_sane(fid)); /* @@ -2434,7 +3624,7 @@ static int osd_fid_lookup(const struct lu_env *env, * fids. Unfortunately it is somewhat expensive (does a * cache-lookup). Disabling it for production/acceptance-testing. */ - LASSERT(1 || fid_is_local(ldev->ld_site, fid)); + LASSERT(1 || fid_is_local(env, ldev->ld_site, fid)); ENTRY; @@ -2452,6 +3642,10 @@ static int osd_fid_lookup(const struct lu_env *env, if (!IS_ERR(inode)) { obj->oo_inode = inode; LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); + if (dev->od_iop_mode) { + obj->oo_compat_dot_created = 1; + obj->oo_compat_dotdot_created = 1; + } result = 0; } else /* @@ -2464,7 +3658,8 @@ static int osd_fid_lookup(const struct lu_env *env, result = PTR_ERR(inode); } else if (result == -ENOENT) result = 0; - LASSERT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); + RETURN(result); } @@ -2552,7 +3747,7 @@ static int osd_object_invariant(const struct lu_object *l) return osd_invariant(osd_obj(l)); } -static struct lu_object_operations osd_lu_obj_ops = { +static const struct lu_object_operations osd_lu_obj_ops = { .loo_object_init = osd_object_init, .loo_object_delete = osd_object_delete, .loo_object_release = osd_object_release, @@ -2561,16 +3756,20 @@ static struct lu_object_operations osd_lu_obj_ops = { .loo_object_invariant = osd_object_invariant }; -static struct lu_device_operations osd_lu_ops = { +static const struct lu_device_operations osd_lu_ops = { .ldo_object_alloc = osd_object_alloc, .ldo_process_config = osd_process_config, - .ldo_recovery_complete = osd_recovery_complete + .ldo_recovery_complete = osd_recovery_complete, + .ldo_prepare = osd_prepare, }; -static struct lu_device_type_operations osd_device_type_ops = { +static const struct lu_device_type_operations osd_device_type_ops = { .ldto_init = osd_type_init, .ldto_fini = osd_type_fini, + .ldto_start = osd_type_start, + .ldto_stop = osd_type_stop, + .ldto_device_alloc = osd_device_alloc, .ldto_device_free = osd_device_free, @@ -2588,29 +3787,23 @@ static struct lu_device_type osd_device_type = { /* * lprocfs legacy support. */ -static struct lprocfs_vars lprocfs_osd_obd_vars[] = { - { 0 } -}; - -static struct lprocfs_vars lprocfs_osd_module_vars[] = { - { 0 } -}; - static struct obd_ops osd_obd_device_ops = { .o_owner = THIS_MODULE }; -static void lprocfs_osd_init_vars(struct lprocfs_static_vars *lvars) -{ - lvars->module_vars = lprocfs_osd_module_vars; - lvars->obd_vars = lprocfs_osd_obd_vars; -} - +static struct lu_local_obj_desc llod_osd_rem_obj_dir = { + .llod_name = remote_obj_dir, + .llod_oid = OSD_REM_OBJ_DIR_OID, + .llod_is_index = 1, + .llod_feat = &dt_directory_features, +}; static int __init osd_mod_init(void) { struct lprocfs_static_vars lvars; + osd_oi_mod_init(); + llo_local_obj_register(&llod_osd_rem_obj_dir); lprocfs_osd_init_vars(&lvars); return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars, LUSTRE_OSD_NAME, &osd_device_type); @@ -2618,10 +3811,11 @@ static int __init osd_mod_init(void) static void __exit osd_mod_exit(void) { + llo_local_obj_unregister(&llod_osd_rem_obj_dir); class_unregister_type(LUSTRE_OSD_NAME); } -MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")"); MODULE_LICENSE("GPL");