X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd-ldiskfs%2Fosd_handler.c;h=84e7573c3fc1bec1d42242a4062b05712ffdd287;hp=3645cdc8292605f7c1da00df678a0f970ac2bdf4;hb=f2f09b6ecd85ceb05cd64907a71f87bbc49bfc21;hpb=bb82568ca39a55bfdcf9977be972d1dea8a705e1 diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 3645cdc..84e7573 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -28,6 +28,8 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -76,128 +78,24 @@ /* llo_* api support */ #include +#ifdef HAVE_LDISKFS_PDO +int ldiskfs_pdo = 1; +CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644, + "ldiskfs with parallel directory operations"); +#else +int ldiskfs_pdo = 0; +#endif + static const char dot[] = "."; static const char dotdot[] = ".."; static const char remote_obj_dir[] = "REM_OBJ_DIR"; -struct osd_directory { - struct iam_container od_container; - struct iam_descr od_descr; -}; - -struct osd_object { - struct dt_object oo_dt; - /** - * Inode for file system object represented by this osd_object. This - * inode is pinned for the whole duration of lu_object life. - * - * Not modified concurrently (either setup early during object - * creation, or assigned by osd_object_create() under write lock). - */ - struct inode *oo_inode; - /** - * to protect index ops. - */ - cfs_rw_semaphore_t oo_ext_idx_sem; - cfs_rw_semaphore_t oo_sem; - struct osd_directory *oo_dir; - /** protects inode attributes. */ - cfs_spinlock_t oo_guard; - /** - * Following two members are used to indicate the presence of dot and - * dotdot in the given directory. This is required for interop mode - * (b11826). - */ - int oo_compat_dot_created; - int oo_compat_dotdot_created; - - const struct lu_env *oo_owner; -#ifdef CONFIG_LOCKDEP - struct lockdep_map oo_dep_map; -#endif -}; - static const struct lu_object_operations osd_lu_obj_ops; -static const struct lu_device_operations osd_lu_ops; -static struct lu_context_key osd_key; static const struct dt_object_operations osd_obj_ops; static const struct dt_object_operations osd_obj_ea_ops; -static const struct dt_body_operations osd_body_ops; static const struct dt_index_operations osd_index_iam_ops; static const struct dt_index_operations osd_index_ea_ops; -struct osd_thandle { - struct thandle ot_super; - handle_t *ot_handle; - struct journal_callback ot_jcb; - /* Link to the device, for debugging. */ - struct lu_ref_link *ot_dev_link; - -#if OSD_THANDLE_STATS - /** time when this handle was allocated */ - cfs_time_t oth_alloced; - - /** time when this thanle was started */ - cfs_time_t oth_started; -#endif -}; - -/* - * Helpers. - */ -static int lu_device_is_osd(const struct lu_device *d) -{ - return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops); -} - -static struct osd_device *osd_dt_dev(const struct dt_device *d) -{ - LASSERT(lu_device_is_osd(&d->dd_lu_dev)); - return container_of0(d, struct osd_device, od_dt_dev); -} - -static struct osd_device *osd_dev(const struct lu_device *d) -{ - LASSERT(lu_device_is_osd(d)); - return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev)); -} - -static struct osd_device *osd_obj2dev(const struct osd_object *o) -{ - return osd_dev(o->oo_dt.do_lu.lo_dev); -} - -static struct super_block *osd_sb(const struct osd_device *dev) -{ - return dev->od_mount->lmi_mnt->mnt_sb; -} - -static int osd_object_is_root(const struct osd_object *obj) -{ - return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode; -} - -static struct osd_object *osd_obj(const struct lu_object *o) -{ - LASSERT(lu_device_is_osd(o->lo_dev)); - return container_of0(o, struct osd_object, oo_dt.do_lu); -} - -static struct osd_object *osd_dt_obj(const struct dt_object *d) -{ - return osd_obj(&d->do_lu); -} - -static struct lu_device *osd2lu_dev(struct osd_device *osd) -{ - return &osd->od_dt_dev.dd_lu_dev; -} - -static journal_t *osd_journal(const struct osd_device *dev) -{ - return LDISKFS_SB(osd_sb(dev))->s_journal; -} - static int osd_has_index(const struct osd_object *obj) { return obj->oo_dt.do_index_ops != NULL; @@ -212,8 +110,8 @@ static int osd_object_invariant(const struct lu_object *l) static inline void osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save) { - struct md_ucred *uc = md_ucred(env); - struct cred *tc; + struct md_ucred *uc = md_ucred(env); + struct cred *tc; LASSERT(uc != NULL); @@ -223,9 +121,10 @@ osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save) if ((tc = prepare_creds())) { tc->fsuid = uc->mu_fsuid; tc->fsgid = uc->mu_fsgid; - tc->cap_effective = uc->mu_cap; commit_creds(tc); } + /* XXX not suboptimal */ + cfs_curproc_cap_unpack(uc->mu_cap); } static inline void @@ -242,11 +141,6 @@ osd_pop_ctxt(struct osd_ctxt *save) } #endif -static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env) -{ - return lu_context_key_get(&env->le_ctx, &osd_key); -} - /* * Concurrency: doesn't matter */ @@ -277,6 +171,51 @@ static int osd_root_get(const struct lu_env *env, return 0; } +static inline int osd_qid_type(struct osd_thandle *oh, int i) +{ + return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA; +} + +static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type) +{ + oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0); +} + +void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh, + int type, uid_t id, struct inode *inode) +{ +#ifdef CONFIG_QUOTA + int i, allocated = 0; + struct osd_object *obj; + + LASSERT(dt != NULL); + LASSERT(oh != NULL); + LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%u", + oh->ot_id_cnt); + + /* id entry is allocated in the quota file */ + if (inode && inode->i_dquot[type] && inode->i_dquot[type]->dq_off) + allocated = 1; + + for (i = 0; i < oh->ot_id_cnt; i++) { + if (oh->ot_id_array[i] == id && osd_qid_type(oh, i) == type) + return; + } + + if (unlikely(i >= OSD_MAX_UGID_CNT)) { + CERROR("more than %d uid/gids for a transaction?\n", i); + return; + } + + oh->ot_id_array[i] = id; + osd_qid_set_type(oh, i, type); + oh->ot_id_cnt++; + obj = osd_dt_obj(dt); + oh->ot_credits += (allocated || id == 0) ? + 1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(osd_obj2dev(obj))); +#endif +} + /* * OSD object methods. */ @@ -307,30 +246,23 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, cfs_init_rwsem(&mo->oo_ext_idx_sem); cfs_spin_lock_init(&mo->oo_guard); return l; - } else + } else { return NULL; + } } /* * retrieve object from backend ext fs. **/ -static struct inode *osd_iget(struct osd_thread_info *info, - struct osd_device *dev, - const struct osd_inode_id *id) +struct inode *osd_iget(struct osd_thread_info *info, + struct osd_device *dev, + const struct osd_inode_id *id) { struct inode *inode = NULL; -#ifdef HAVE_EXT4_LDISKFS inode = ldiskfs_iget(osd_sb(dev), id->oii_ino); - if (IS_ERR(inode)) - /* Newer kernels return an error instead of a NULL pointer */ - inode = NULL; -#else - inode = iget(osd_sb(dev), id->oii_ino); -#endif - if (inode == NULL) { - CERROR("no inode\n"); - inode = ERR_PTR(-EACCES); + if (IS_ERR(inode)) { + CERROR("Cannot get inode, rc = %li\n", PTR_ERR(inode)); } else if (id->oii_gen != OSD_OII_NOGEN && inode->i_generation != id->oii_gen) { iput(inode); @@ -346,6 +278,14 @@ static struct inode *osd_iget(struct osd_thread_info *info, CERROR("bad inode %lx\n",inode->i_ino); iput(inode); inode = ERR_PTR(-ENOENT); + } else { + /* Do not update file c/mtime in ldiskfs. + * NB: we don't have any lock to protect this because we don't + * have reference on osd_object now, but contention with + * another lookup + attr_set can't happen in the tiny window + * between if (...) and set S_NOCMTIME. */ + if (!(inode->i_flags & S_NOCMTIME)) + inode->i_flags |= S_NOCMTIME; } return inode; } @@ -357,13 +297,12 @@ static int osd_fid_lookup(const struct lu_env *env, struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; struct osd_device *dev; struct osd_inode_id *id; - struct osd_oi *oi; struct inode *inode; int result; LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); - LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid)); + LASSERTF(fid_is_sane(fid) || osd_fid_is_root(fid), DFID, PFID(fid)); /* * This assertion checks that osd layer sees only local * fids. Unfortunately it is somewhat expensive (does a @@ -374,37 +313,52 @@ static int osd_fid_lookup(const struct lu_env *env, ENTRY; info = osd_oti_get(env); + LASSERT(info); dev = osd_dev(ldev); id = &info->oti_id; - oi = &dev->od_oi; if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) RETURN(-ENOENT); - result = osd_oi_lookup(info, oi, fid, id); - if (result == 0) { - inode = osd_iget(info, dev, id); - if (!IS_ERR(inode)) { - obj->oo_inode = inode; - LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); - if (dev->od_iop_mode) { - obj->oo_compat_dot_created = 1; - obj->oo_compat_dotdot_created = 1; - } + result = osd_oi_lookup(info, dev, fid, id); + if (result != 0) { + if (result == -ENOENT) result = 0; - } else - /* - * If fid wasn't found in oi, inode-less object is - * created, for which lu_object_exists() returns - * false. This is used in a (frequent) case when - * objects are created as locking anchors or - * place holders for objects yet to be created. - */ - result = PTR_ERR(inode); - } else if (result == -ENOENT) - result = 0; - LINVRNT(osd_invariant(obj)); + GOTO(out, result); + } + inode = osd_iget(info, dev, id); + if (IS_ERR(inode)) { + /* + * If fid wasn't found in oi, inode-less object is + * created, for which lu_object_exists() returns + * false. This is used in a (frequent) case when + * objects are created as locking anchors or + * place holders for objects yet to be created. + */ + result = PTR_ERR(inode); + GOTO(out, result); + } + + obj->oo_inode = inode; + LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); + if (dev->od_iop_mode) { + obj->oo_compat_dot_created = 1; + obj->oo_compat_dotdot_created = 1; + } + + if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */ + goto out; + + LASSERT(obj->oo_hl_head == NULL); + obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF); + if (obj->oo_hl_head == NULL) { + obj->oo_inode = NULL; + iput(inode); + result = -ENOMEM; + } +out: + LINVRNT(osd_invariant(obj)); RETURN(result); } @@ -432,6 +386,7 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l, LINVRNT(osd_invariant(obj)); result = osd_fid_lookup(env, obj, lu_object_fid(l)); + obj->oo_dt.do_body_ops = &osd_body_ops_new; if (result == 0) { if (obj->oo_inode != NULL) osd_object_init0(obj); @@ -451,33 +406,11 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l) LINVRNT(osd_invariant(obj)); dt_object_fini(&obj->oo_dt); + if (obj->oo_hl_head != NULL) + ldiskfs_htree_lock_head_free(obj->oo_hl_head); OBD_FREE_PTR(obj); } -/** - * IAM Iterator - */ -static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env, - const struct iam_container *bag) -{ - return bag->ic_descr->id_ops->id_ipd_alloc(bag, - osd_oti_get(env)->oti_it_ipd); -} - -static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env, - const struct iam_container *bag) -{ - return bag->ic_descr->id_ops->id_ipd_alloc(bag, - osd_oti_get(env)->oti_idx_ipd); -} - -static void osd_ipd_put(const struct lu_env *env, - const struct iam_container *bag, - struct iam_path_descr *ipd) -{ - bag->ic_descr->id_ops->id_ipd_free(ipd); -} - /* * Concurrency: no concurrent access is possible that late in object * life-cycle. @@ -597,36 +530,38 @@ static void __osd_th_check_slow(void *oth, struct osd_device *dev, * Concurrency: doesn't access mutable data. */ static int osd_param_is_sane(const struct osd_device *dev, - const struct txn_param *param) + const struct thandle *th) { - return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers; + struct osd_thandle *oh; + oh = container_of0(th, struct osd_thandle, ot_super); + return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers; } /* * Concurrency: shouldn't matter. */ +#ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD +static void osd_trans_commit_cb(struct super_block *sb, + struct journal_callback *jcb, int error) +#else static void osd_trans_commit_cb(struct journal_callback *jcb, int error) +#endif { struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb); struct thandle *th = &oh->ot_super; - struct dt_device *dev = th->th_dev; - struct lu_device *lud = &dev->dd_lu_dev; + struct lu_device *lud = &th->th_dev->dd_lu_dev; + struct dt_txn_commit_cb *dcb, *tmp; - LASSERT(dev != NULL); LASSERT(oh->ot_handle == NULL); - if (error) { + if (error) CERROR("transaction @0x%p commit error: %d\n", th, error); - } else { - struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit; - /* - * This od_env_for_commit is only for commit usage. see - * "struct dt_device" - */ - lu_context_enter(&env->le_ctx); - dt_txn_hook_commit(env, th); - lu_context_exit(&env->le_ctx); - } + + dt_txn_hook_commit(th); + + /* call per-transaction callbacks if any */ + cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) + dcb->dcb_func(NULL, th, dcb, error); lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th); lu_device_put(lud); @@ -637,129 +572,200 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error) OBD_FREE_PTR(oh); } +static struct thandle *osd_trans_create(const struct lu_env *env, + struct dt_device *d) +{ + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_iobuf *iobuf = &oti->oti_iobuf; + struct osd_thandle *oh; + struct thandle *th; + ENTRY; + + /* on pending IO in this thread should left from prev. request */ + LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0); + + th = ERR_PTR(-ENOMEM); + OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); + if (oh != NULL) { + th = &oh->ot_super; + th->th_dev = d; + th->th_result = 0; + th->th_tags = LCT_TX_HANDLE; + oh->ot_credits = 0; + oti->oti_dev = osd_dt_dev(d); + CFS_INIT_LIST_HEAD(&oh->ot_dcb_list); + osd_th_alloced(oh); + } + RETURN(th); +} + /* * Concurrency: shouldn't matter. */ -static struct thandle *osd_trans_start(const struct lu_env *env, - struct dt_device *d, - struct txn_param *p) +int osd_trans_start(const struct lu_env *env, struct dt_device *d, + struct thandle *th) { + struct osd_thread_info *oti = osd_oti_get(env); struct osd_device *dev = osd_dt_dev(d); handle_t *jh; struct osd_thandle *oh; - struct thandle *th; - int hook_res; + int rc; ENTRY; - hook_res = dt_txn_hook_start(env, d, p); - if (hook_res != 0) - RETURN(ERR_PTR(hook_res)); + LASSERT(current->journal_info == NULL); - if (osd_param_is_sane(dev, p)) { - OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); - if (oh != NULL) { - struct osd_thread_info *oti = osd_oti_get(env); + oh = container_of0(th, struct osd_thandle, ot_super); + LASSERT(oh != NULL); + LASSERT(oh->ot_handle == NULL); - /* - * XXX temporary stuff. Some abstraction layer should - * be used. - */ - oti->oti_dev = dev; - osd_th_alloced(oh); - jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits); - osd_th_started(oh); - if (!IS_ERR(jh)) { - oh->ot_handle = jh; - th = &oh->ot_super; - th->th_dev = d; - th->th_result = 0; - jh->h_sync = p->tp_sync; - lu_device_get(&d->dd_lu_dev); - oh->ot_dev_link = lu_ref_add - (&d->dd_lu_dev.ld_reference, - "osd-tx", th); - /* add commit callback */ - lu_context_init(&th->th_ctx, LCT_TX_HANDLE); - lu_context_enter(&th->th_ctx); - osd_journal_callback_set(jh, osd_trans_commit_cb, - (struct journal_callback *)&oh->ot_jcb); - LASSERT(oti->oti_txns == 0); - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - oti->oti_txns++; - } else { - OBD_FREE_PTR(oh); - th = (void *)jh; - } - } else - th = ERR_PTR(-ENOMEM); - } else { - CERROR("Invalid transaction parameters\n"); - th = ERR_PTR(-EINVAL); + rc = dt_txn_hook_start(env, d, th); + if (rc != 0) + GOTO(out, rc); + + if (!osd_param_is_sane(dev, th)) { + CWARN("%s: too many transaction credits (%d > %d)\n", + d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits, + osd_journal(dev)->j_max_transaction_buffers); + /* XXX Limit the credits to 'max_transaction_buffers', and + * let the underlying filesystem to catch the error if + * we really need so many credits. + * + * This should be removed when we can calculate the + * credits precisely. */ + oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers; +#ifdef OSD_TRACK_DECLARES + CERROR(" attr_set: %d, punch: %d, xattr_set: %d,\n", + oh->ot_declare_attr_set, oh->ot_declare_punch, + oh->ot_declare_xattr_set); + CERROR(" create: %d, ref_add: %d, ref_del: %d, write: %d\n", + oh->ot_declare_create, oh->ot_declare_ref_add, + oh->ot_declare_ref_del, oh->ot_declare_write); + CERROR(" insert: %d, delete: %d, destroy: %d\n", + oh->ot_declare_insert, oh->ot_declare_delete, + oh->ot_declare_destroy); +#endif } - RETURN(th); + /* + * XXX temporary stuff. Some abstraction layer should + * be used. + */ + jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits); + osd_th_started(oh); + if (!IS_ERR(jh)) { + oh->ot_handle = jh; + LASSERT(oti->oti_txns == 0); + lu_context_init(&th->th_ctx, th->th_tags); + lu_context_enter(&th->th_ctx); + + lu_device_get(&d->dd_lu_dev); + oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference, + "osd-tx", th); + + /* + * XXX: current rule is that we first start tx, + * then lock object(s), but we can't use + * this rule for data (due to locking specifics + * in ldiskfs). also in long-term we'd like to + * use usually-used (locks;tx) ordering. so, + * UGLY thing is that we'll use one ordering for + * data (ofd) and reverse ordering for metadata + * (mdd). then at some point we'll fix the latter + */ + if (lu_device_is_md(&d->dd_lu_dev)) { + LASSERT(oti->oti_r_locks == 0); + LASSERT(oti->oti_w_locks == 0); + } + + oti->oti_txns++; + rc = 0; + } else { + rc = PTR_ERR(jh); + } +out: + RETURN(rc); } /* * Concurrency: shouldn't matter. */ -static void osd_trans_stop(const struct lu_env *env, struct thandle *th) +static int osd_trans_stop(const struct lu_env *env, struct thandle *th) { - int result; - struct osd_thandle *oh; + int rc = 0; + struct osd_thandle *oh; struct osd_thread_info *oti = osd_oti_get(env); + struct osd_iobuf *iobuf = &oti->oti_iobuf; ENTRY; oh = container_of0(th, struct osd_thandle, ot_super); + if (oh->ot_handle != NULL) { handle_t *hdl = oh->ot_handle; + hdl->h_sync = th->th_sync; + + /* + * add commit callback + * notice we don't do this in osd_trans_start() + * as underlying transaction can change during truncate + */ + osd_journal_callback_set(hdl, osd_trans_commit_cb, + &oh->ot_jcb); + LASSERT(oti->oti_txns == 1); oti->oti_txns--; - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - result = dt_txn_hook_stop(env, th); - if (result != 0) - CERROR("Failure in transaction hook: %d\n", result); + /* + * XXX: current rule is that we first start tx, + * then lock object(s), but we can't use + * this rule for data (due to locking specifics + * in ldiskfs). also in long-term we'd like to + * use usually-used (locks;tx) ordering. so, + * UGLY thing is that we'll use one ordering for + * data (ofd) and reverse ordering for metadata + * (mdd). then at some point we'll fix the latter + */ + if (lu_device_is_md(&th->th_dev->dd_lu_dev)) { + LASSERT(oti->oti_r_locks == 0); + LASSERT(oti->oti_w_locks == 0); + } + rc = dt_txn_hook_stop(env, th); + if (rc != 0) + CERROR("Failure in transaction hook: %d\n", rc); oh->ot_handle = NULL; OSD_CHECK_SLOW_TH(oh, oti->oti_dev, - result = ldiskfs_journal_stop(hdl)); - if (result != 0) - CERROR("Failure to stop transaction: %d\n", result); + rc = ldiskfs_journal_stop(hdl)); + if (rc != 0) + CERROR("Failure to stop transaction: %d\n", rc); + } else { + OBD_FREE_PTR(oh); } - EXIT; + + /* as we want IO to journal and data IO be concurrent, we don't block + * awaiting data IO completion in osd_do_bio(), instead we wait here + * once transaction is submitted to the journal. all reqular requests + * don't do direct IO (except read/write), thus this wait_event becomes + * no-op for them. + * + * IMPORTANT: we have to wait till any IO submited by the thread is + * completed otherwise iobuf may be corrupted by different request + */ + cfs_wait_event(iobuf->dr_wait, cfs_atomic_read(&iobuf->dr_numreqs)==0); + if (!rc) + rc = iobuf->dr_error; + + RETURN(rc); } -/* - * Concurrency: no concurrent access is possible that late in object - * life-cycle. - */ -static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj) +static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb) { - const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu); - struct osd_device *osd = osd_obj2dev(obj); - struct osd_thread_info *oti = osd_oti_get(env); - struct txn_param *prm = &oti->oti_txn; - struct lu_env *env_del_obj = &oti->oti_obj_delete_tx_env; - struct thandle *th; - int result; + struct osd_thandle *oh = container_of0(th, struct osd_thandle, + ot_super); - lu_env_init(env_del_obj, LCT_DT_THREAD); - txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + - OSD_TXN_INODE_DELETE_CREDITS); - th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm); - if (!IS_ERR(th)) { - result = osd_oi_delete(osd_oti_get(env_del_obj), - &osd->od_oi, fid, th); - osd_trans_stop(env_del_obj, th); - } else - result = PTR_ERR(th); + cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list); - lu_env_fini(env_del_obj); - return result; + return 0; } /* @@ -782,16 +788,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) osd_index_fini(obj); if (inode != NULL) { - int result; - - if (osd_inode_unlinked(inode)) { - result = osd_inode_remove(env, obj); - if (result != 0) - LU_OBJECT_DEBUG(D_ERROR, env, l, - "Failed to cleanup: %d\n", - result); - } - iput(inode); obj->oo_inode = NULL; } @@ -803,11 +799,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) static void osd_object_release(const struct lu_env *env, struct lu_object *l) { - struct osd_object *o = osd_obj(l); - - LASSERT(!lu_object_is_dying(l->lo_header)); - if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode)) - cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags); } /* @@ -862,12 +853,27 @@ static void osd_conf_get(const struct lu_env *env, const struct dt_device *dev, struct dt_device_param *param) { + struct super_block *sb = osd_sb(osd_dt_dev(dev)); + /* * XXX should be taken from not-yet-existing fs abstraction layer. */ - param->ddp_max_name_len = LDISKFS_NAME_LEN; - param->ddp_max_nlink = LDISKFS_LINK_MAX; - param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits; + param->ddp_max_name_len = LDISKFS_NAME_LEN; + param->ddp_max_nlink = LDISKFS_LINK_MAX; + param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits; + param->ddp_mntopts = 0; + if (test_opt(sb, XATTR_USER)) + param->ddp_mntopts |= MNTOPT_USERXATTR; + if (test_opt(sb, POSIX_ACL)) + param->ddp_mntopts |= MNTOPT_ACL; + +#if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE) + if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE)) + param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE; + else +#endif + param->ddp_max_ea_size = sb->s_blocksize; + } /** @@ -917,20 +923,18 @@ static int osd_commit_async(const struct lu_env *env, /* * Concurrency: shouldn't matter. */ -lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *); static void osd_ro(const struct lu_env *env, struct dt_device *d) { + struct super_block *sb = osd_sb(osd_dt_dev(d)); ENTRY; CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME); - __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))), - fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d)))); + __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev); EXIT; } - /* * Concurrency: serialization provided by callers. */ @@ -971,7 +975,7 @@ static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d, * Note: we do not count into QUOTA here. * If we mount with --data_journal we may need more. */ -static const int osd_dto_credits_noquota[DTO_NR] = { +const int osd_dto_credits_noquota[DTO_NR] = { /** * Insert/Delete. * INDEX_EXTRA_TRANS_BLOCKS(8) + @@ -984,7 +988,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = { /** * Unused now */ - [DTO_IDNEX_UPDATE] = 16, + [DTO_INDEX_UPDATE] = 16, /** * Create a object. The same as create object in EXT3. * DATA_TRANS_BLOCKS(14) + @@ -993,14 +997,13 @@ static const int osd_dto_credits_noquota[DTO_NR] = { */ [DTO_OBJECT_CREATE] = 25, /** - * Unused now + * XXX: real credits to be fixed */ [DTO_OBJECT_DELETE] = 25, /** - * Attr set credits. - * 3(inode bits, group, GDT) + * Attr set credits (inode) */ - [DTO_ATTR_SET_BASE] = 3, + [DTO_ATTR_SET_BASE] = 1, /** * Xattr set. The same as xattr of EXT3. * DATA_TRANS_BLOCKS(14) @@ -1010,7 +1013,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = { [DTO_XATTR_SET] = 14, [DTO_LOG_REC] = 14, /** - * creadits for inode change during write. + * credits for inode change during write. */ [DTO_WRITE_BASE] = 3, /** @@ -1024,97 +1027,17 @@ static const int osd_dto_credits_noquota[DTO_NR] = { [DTO_ATTR_SET_CHOWN]= 0 }; -/** - * Note: we count into QUOTA here. - * If we mount with --data_journal we may need more. - */ -static const int osd_dto_credits_quota[DTO_NR] = { - /** - * INDEX_EXTRA_TRANS_BLOCKS(8) + - * SINGLEDATA_TRANS_BLOCKS(8) + - * 2 * QUOTA_TRANS_BLOCKS(2) - */ - [DTO_INDEX_INSERT] = 20, - /** - * INDEX_EXTRA_TRANS_BLOCKS(8) + - * SINGLEDATA_TRANS_BLOCKS(8) + - * 2 * QUOTA_TRANS_BLOCKS(2) - */ - [DTO_INDEX_DELETE] = 20, - /** - * Unused now. - */ - [DTO_IDNEX_UPDATE] = 16, - /* - * Create a object. Same as create object in EXT3 filesystem. - * DATA_TRANS_BLOCKS(16) + - * INDEX_EXTRA_BLOCKS(8) + - * 3(inode bits, groups, GDT) + - * 2 * QUOTA_INIT_BLOCKS(25) - */ - [DTO_OBJECT_CREATE] = 77, - /* - * Unused now. - * DATA_TRANS_BLOCKS(16) + - * INDEX_EXTRA_BLOCKS(8) + - * 3(inode bits, groups, GDT) + - * QUOTA(?) - */ - [DTO_OBJECT_DELETE] = 27, - /** - * Attr set credits. - * 3 (inode bit, group, GDT) + - */ - [DTO_ATTR_SET_BASE] = 3, - /** - * Xattr set. The same as xattr of EXT3. - * DATA_TRANS_BLOCKS(16) - * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are - * also counted in. Do not know why? - */ - [DTO_XATTR_SET] = 16, - [DTO_LOG_REC] = 16, - /** - * creadits for inode change during write. - */ - [DTO_WRITE_BASE] = 3, - /** - * credits for single block write. - */ - [DTO_WRITE_BLOCK] = 16, - /** - * Attr set credits for chown. - * It is added to already set setattr credits - * 2 * QUOTA_INIT_BLOCKS(25) + - * 2 * QUOTA_DEL_BLOCKS(9) - */ - [DTO_ATTR_SET_CHOWN]= 68, -}; - -static int osd_credit_get(const struct lu_env *env, struct dt_device *d, - enum dt_txn_op op) -{ - LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) == - ARRAY_SIZE(osd_dto_credits_quota)); - LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota)); -#ifdef HAVE_QUOTA_SUPPORT - if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA)) - return osd_dto_credits_quota[op]; - else -#endif - return osd_dto_credits_noquota[op]; -} - static const struct dt_device_operations osd_dt_ops = { .dt_root_get = osd_root_get, .dt_statfs = osd_statfs, + .dt_trans_create = osd_trans_create, .dt_trans_start = osd_trans_start, .dt_trans_stop = osd_trans_stop, + .dt_trans_cb_add = osd_trans_cb_add, .dt_conf_get = osd_conf_get, .dt_sync = osd_sync, .dt_ro = osd_ro, .dt_commit_async = osd_commit_async, - .dt_credit_get = osd_credit_get, .dt_init_capa_ctxt = osd_init_capa_ctxt, .dt_init_quota_ctxt= osd_init_quota_ctxt, }; @@ -1243,8 +1166,8 @@ static int capa_is_sane(const struct lu_env *env, RETURN(0); } -static int osd_object_auth(const struct lu_env *env, struct dt_object *dt, - struct lustre_capa *capa, __u64 opc) +int osd_object_auth(const struct lu_env *env, struct dt_object *dt, + struct lustre_capa *capa, __u64 opc) { const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_device *dev = osd_dev(dt->do_lu.lo_dev); @@ -1342,6 +1265,42 @@ static int osd_attr_get(const struct lu_env *env, return 0; } +static int osd_declare_attr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_attr *attr, + struct thandle *handle) +{ + struct osd_thandle *oh; + struct osd_object *obj; + + LASSERT(dt != NULL); + LASSERT(handle != NULL); + + obj = osd_dt_obj(dt); + LASSERT(osd_invariant(obj)); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, attr_set); + oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + + if (attr && attr->la_valid & LA_UID) { + if (obj->oo_inode) + osd_declare_qid(dt, oh, USRQUOTA, obj->oo_inode->i_uid, + obj->oo_inode); + osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL); + } + if (attr && attr->la_valid & LA_GID) { + if (obj->oo_inode) + osd_declare_qid(dt, oh, GRPQUOTA, obj->oo_inode->i_gid, + obj->oo_inode); + osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL); + } + + return 0; +} + static int osd_inode_setattr(const struct lu_env *env, struct inode *inode, const struct lu_attr *attr) { @@ -1366,7 +1325,7 @@ static int osd_inode_setattr(const struct lu_env *env, iattr.ia_uid = attr->la_uid; iattr.ia_gid = attr->la_gid; osd_push_ctxt(env, save); - rc = DQUOT_TRANSFER(inode, &iattr) ? -EDQUOT : 0; + rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0; osd_pop_ctxt(save); if (rc != 0) return rc; @@ -1402,8 +1361,11 @@ static int osd_inode_setattr(const struct lu_env *env, if (bits & LA_RDEV) inode->i_rdev = attr->la_rdev; - if (bits & LA_FLAGS) - inode->i_flags = ll_ext_to_inode_flags(attr->la_flags); + if (bits & LA_FLAGS) { + /* always keep S_NOCMTIME */ + inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) | + S_NOCMTIME; + } return 0; } @@ -1423,12 +1385,14 @@ static int osd_attr_set(const struct lu_env *env, if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; + OSD_EXEC_OP(handle, attr_set); + cfs_spin_lock(&obj->oo_guard); rc = osd_inode_setattr(env, obj->oo_inode, attr); cfs_spin_unlock(&obj->oo_guard); if (!rc) - mark_inode_dirty(obj->oo_inode); + obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode); return rc; } @@ -1447,30 +1411,18 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct thandle *th) { osd_object_init0(obj); + if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW)) + unlock_new_inode(obj->oo_inode); return 0; } -static struct dentry * osd_child_dentry_get(const struct lu_env *env, - struct osd_object *obj, - const char *name, - const int namelen) +struct dentry *osd_child_dentry_get(const struct lu_env *env, + struct osd_object *obj, + const char *name, const int namelen) { - struct osd_thread_info *info = osd_oti_get(env); - struct dentry *child_dentry = &info->oti_child_dentry; - struct dentry *obj_dentry = &info->oti_obj_dentry; - - obj_dentry->d_inode = obj->oo_inode; - obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); - obj_dentry->d_name.hash = 0; - - child_dentry->d_name.hash = 0; - child_dentry->d_parent = obj_dentry; - child_dentry->d_name.name = name; - child_dentry->d_name.len = namelen; - return child_dentry; + return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen); } - static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, cfs_umode_t mode, struct dt_allocation_hint *hint, @@ -1487,6 +1439,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); + LASSERT(obj->oo_hl_head == NULL); + + if (S_ISDIR(mode) && ldiskfs_pdo) { + obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF); + if (obj->oo_hl_head == NULL) + return -ENOMEM; + } oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle->h_transaction != NULL); @@ -1496,22 +1455,30 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, else parent = osd->od_obj_area; - LASSERT(parent != NULL); - LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL); - #ifdef HAVE_QUOTA_SUPPORT osd_push_ctxt(info->oti_env, save); #endif inode = ldiskfs_create_inode(oth->ot_handle, - osd_dt_obj(parent)->oo_inode, mode); + parent ? osd_dt_obj(parent)->oo_inode : + osd_sb(osd)->s_root->d_inode, + mode); #ifdef HAVE_QUOTA_SUPPORT osd_pop_ctxt(save); #endif if (!IS_ERR(inode)) { + /* Do not update file c/mtime in ldiskfs. + * NB: don't need any lock because no contention at this + * early stage */ + inode->i_flags |= S_NOCMTIME; obj->oo_inode = inode; result = 0; - } else + } else { + if (obj->oo_hl_head != NULL) { + ldiskfs_htree_lock_head_free(obj->oo_hl_head); + obj->oo_hl_head = NULL; + } result = PTR_ERR(inode); + } LINVRNT(osd_invariant(obj)); return result; } @@ -1719,10 +1686,50 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj, id->oii_ino = obj->oo_inode->i_ino; id->oii_gen = obj->oo_inode->i_generation; - return osd_oi_insert(info, &osd->od_oi, fid, id, th, + return osd_oi_insert(info, osd, fid, id, th, uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); } +static int osd_declare_object_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, create); + oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE]; + /* XXX: So far, only normal fid needs be inserted into the oi, + * things could be changed later. Revise following code then. */ + if (fid_is_norm(lu_object_fid(&dt->do_lu))) { + OSD_DECLARE_OP(oh, insert); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; + } + /* If this is directory, then we expect . and .. to be inserted as + * well. The one directory block always needs to be created for the + * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap, + * block), there is no danger of needing a tree for the first block. + */ + if (attr && S_ISDIR(attr->la_mode)) { + OSD_DECLARE_OP(oh, insert); + OSD_DECLARE_OP(oh, insert); + oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BASE]; + } + + if (attr) { + osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL); + osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL); + } + return 0; +} + static int osd_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, @@ -1741,6 +1748,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); + OSD_EXEC_OP(th, create); + result = __osd_object_create(info, obj, attr, hint, dof, th); if (result == 0) result = __osd_oi_insert(env, obj, fid, th); @@ -1751,6 +1760,77 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, } /** + * Called to destroy on-disk representation of the object + * + * Concurrency: must be locked + */ +static int osd_declare_object_destroy(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thandle *oh; + + ENTRY; + + oh = container_of0(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + LASSERT(inode); + + OSD_DECLARE_OP(oh, destroy); + OSD_DECLARE_OP(oh, delete); + oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE]; + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; + + osd_declare_qid(dt, oh, USRQUOTA, inode->i_uid, inode); + osd_declare_qid(dt, oh, GRPQUOTA, inode->i_gid, inode); + + RETURN(0); +} + +static int osd_object_destroy(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_device *osd = osd_obj2dev(obj); + struct osd_thandle *oh; + int result; + ENTRY; + + oh = container_of0(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle); + LASSERT(inode); + LASSERT(!lu_object_is_dying(dt->do_lu.lo_header)); + + if (S_ISDIR(inode->i_mode)) { + LASSERT(osd_inode_unlinked(inode) || + inode->i_nlink == 1); + cfs_spin_lock(&obj->oo_guard); + inode->i_nlink = 0; + cfs_spin_unlock(&obj->oo_guard); + inode->i_sb->s_op->dirty_inode(inode); + } else { + LASSERT(osd_inode_unlinked(inode)); + } + + OSD_EXEC_OP(th, destroy); + + result = osd_oi_delete(osd_oti_get(env), osd, fid, th); + + /* XXX: add to ext3 orphan list */ + /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */ + + /* not needed in the cache anymore */ + set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); + + RETURN(0); +} + +/** * Helper function for osd_xattr_set() */ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, @@ -1760,9 +1840,8 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); struct dentry *dentry = &info->oti_child_dentry; - struct timespec *t = &info->oti_time; int fs_flags = 0; - int rc; + int rc; LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL); @@ -1775,14 +1854,8 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, fs_flags |= XATTR_CREATE; dentry->d_inode = inode; - *t = inode->i_ctime; rc = inode->i_op->setxattr(dentry, name, buf->lb_buf, buf->lb_len, fs_flags); - /* ctime should not be updated with server-side time. */ - cfs_spin_lock(&obj->oo_guard); - inode->i_ctime = *t; - cfs_spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); return rc; } @@ -1820,17 +1893,6 @@ static inline void osd_igif_get(const struct lu_env *env, struct inode *inode, } /** - * Helper function to pack the fid, ldiskfs stores fid in packed format. - */ -void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid, - struct lu_fid *befider) -{ - fid_cpu_to_be(befider, (struct lu_fid *)fid); - memcpy(pack->fp_area, befider, sizeof(*befider)); - pack->fp_len = sizeof(*befider) + 1; -} - -/** * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata. * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs. * To have compatilibility with 1.8 ldiskfs driver we need to have @@ -1848,23 +1910,6 @@ void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param, (struct lu_fid *)fid); } -int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack) -{ - int result; - - result = 0; - switch (pack->fp_len) { - case sizeof *fid + 1: - memcpy(fid, pack->fp_area, sizeof *fid); - fid_be_to_cpu(fid, fid); - break; - default: - CERROR("Unexpected packed fid size: %d\n", pack->fp_len); - result = -EIO; - } - return result; -} - /** * Try to read the fid from inode ea into dt_rec, if return value * i.e. rc is +ve, then we got fid, otherwise we will have to form igif @@ -1933,15 +1978,15 @@ out: * \retval -ve, on error */ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) { const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_object *obj = osd_dt_obj(dt); struct osd_thread_info *info = osd_oti_get(env); - int result; + int result; ENTRY; @@ -1950,6 +1995,8 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); + OSD_EXEC_OP(th, create); + result = __osd_object_create(info, obj, attr, hint, dof, th); /* objects under osd root shld have igif fid, so dont add fid EA */ @@ -1964,59 +2011,133 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, RETURN(result); } +static int osd_declare_object_ref_add(const struct lu_env *env, + struct dt_object *dt, + struct thandle *handle) +{ + struct osd_thandle *oh; + + /* it's possible that object doesn't exist yet */ + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, ref_add); + oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + + return 0; +} + /* * Concurrency: @dt is write locked. */ -static void osd_object_ref_add(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int osd_object_ref_add(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; + struct inode *inode = obj->oo_inode; LINVRNT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); + OSD_EXEC_OP(th, ref_add); + + /* + * DIR_NLINK feature is set for compatibility reasons if: + * 1) nlinks > LDISKFS_LINK_MAX, or + * 2) nlinks == 2, since this indicates i_nlink was previously 1. + * + * It is easier to always set this flag (rather than check and set), + * since it has less overhead, and the superblock will be dirtied + * at some point. Both e2fsprogs and any Lustre-supported ldiskfs + * do not actually care whether this flag is set or not. + */ cfs_spin_lock(&obj->oo_guard); - LASSERT(inode->i_nlink < LDISKFS_LINK_MAX); inode->i_nlink++; + if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) { + if (inode->i_nlink >= LDISKFS_LINK_MAX || + inode->i_nlink == 2) + inode->i_nlink = 1; + } + LASSERT(inode->i_nlink < LDISKFS_LINK_MAX); cfs_spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); + inode->i_sb->s_op->dirty_inode(inode); LINVRNT(osd_invariant(obj)); + + return 0; +} + +static int osd_declare_object_ref_del(const struct lu_env *env, + struct dt_object *dt, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, ref_del); + oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + + return 0; } /* * Concurrency: @dt is write locked. */ -static void osd_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; + struct inode *inode = obj->oo_inode; LINVRNT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); + OSD_EXEC_OP(th, ref_del); + cfs_spin_lock(&obj->oo_guard); LASSERT(inode->i_nlink > 0); inode->i_nlink--; + /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX) + * then the nlink count is 1. Don't let it be set to 0 or the directory + * inode will be deleted incorrectly. */ + if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0) + inode->i_nlink++; cfs_spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); + inode->i_sb->s_op->dirty_inode(inode); LINVRNT(osd_invariant(obj)); + + return 0; +} + +/* + * Get the 64-bit version for an inode. + */ +static int osd_object_version_get(const struct lu_env *env, + struct dt_object *dt, dt_obj_version_t *ver) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n", + LDISKFS_I(inode)->i_fs_version, inode->i_ino); + *ver = LDISKFS_I(inode)->i_fs_version; + return 0; } /* * Concurrency: @dt is read locked. */ -static int osd_xattr_get(const struct lu_env *env, - struct dt_object *dt, - struct lu_buf *buf, - const char *name, +static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *buf, const char *name, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); @@ -2024,6 +2145,15 @@ static int osd_xattr_get(const struct lu_env *env, struct osd_thread_info *info = osd_oti_get(env); struct dentry *dentry = &info->oti_obj_dentry; + /* version get is not real XATTR but uses xattr API */ + if (strcmp(name, XATTR_NAME_VERSION) == 0) { + /* for version we are just using xattr API but change inode + * field instead */ + LASSERT(buf->lb_len == sizeof(dt_obj_version_t)); + osd_object_version_get(env, dt, buf->lb_buf); + return sizeof(dt_obj_version_t); + } + LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj)); @@ -2035,6 +2165,48 @@ static int osd_xattr_get(const struct lu_env *env, return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len); } + +static int osd_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, const char *name, + int fl, struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(handle != NULL); + + if (strcmp(name, XATTR_NAME_VERSION) == 0) { + /* no credits for version */ + return 0; + } + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, xattr_set); + oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET]; + + return 0; +} + +/* + * Set the 64-bit version for object + */ +static void osd_object_version_set(const struct lu_env *env, + struct dt_object *dt, + dt_obj_version_t *new_version) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n", + *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino); + + LDISKFS_I(inode)->i_fs_version = *new_version; + /** Version is set after all inode operations are finished, + * so we should mark it dirty here */ + inode->i_sb->s_op->dirty_inode(inode); +} + /* * Concurrency: @dt is write locked. */ @@ -2044,19 +2216,27 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, { LASSERT(handle != NULL); + /* version set is not real XATTR */ + if (strcmp(name, XATTR_NAME_VERSION) == 0) { + /* for version we are just using xattr API but change inode + * field instead */ + LASSERT(buf->lb_len == sizeof(dt_obj_version_t)); + osd_object_version_set(env, dt, buf->lb_buf); + return sizeof(dt_obj_version_t); + } + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; + OSD_EXEC_OP(handle, xattr_set); return __osd_xattr_set(env, dt, buf, name, fl); } /* * Concurrency: @dt is read locked. */ -static int osd_xattr_list(const struct lu_env *env, - struct dt_object *dt, - struct lu_buf *buf, - struct lustre_capa *capa) +static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *buf, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -2074,20 +2254,35 @@ static int osd_xattr_list(const struct lu_env *env, return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len); } +static int osd_declare_xattr_del(const struct lu_env *env, + struct dt_object *dt, const char *name, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, xattr_set); + oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET]; + + return 0; +} + /* * Concurrency: @dt is write locked. */ -static int osd_xattr_del(const struct lu_env *env, - struct dt_object *dt, - const char *name, - struct thandle *handle, +static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *handle, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); struct dentry *dentry = &info->oti_obj_dentry; - struct timespec *t = &info->oti_time; int rc; LASSERT(dt_object_exists(dt)); @@ -2098,14 +2293,10 @@ static int osd_xattr_del(const struct lu_env *env, if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; + OSD_EXEC_OP(handle, xattr_set); + dentry->d_inode = inode; - *t = inode->i_ctime; rc = inode->i_op->removexattr(dentry, name); - /* ctime should not be updated with server-side time. */ - cfs_spin_lock(&obj->oo_guard); - inode->i_ctime = *t; - cfs_spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); return rc; } @@ -2198,51 +2389,23 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) { - int rc; struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); struct dentry *dentry = &info->oti_obj_dentry; struct file *file = &info->oti_file; - ENTRY; - - dentry->d_inode = inode; - file->f_dentry = dentry; - file->f_mapping = inode->i_mapping; - file->f_op = inode->i_fop; - LOCK_INODE_MUTEX(inode); - rc = file->f_op->fsync(file, dentry, 0); - UNLOCK_INODE_MUTEX(inode); - RETURN(rc); -} - -/* - * Get the 64-bit version for an inode. - */ -static dt_obj_version_t osd_object_version_get(const struct lu_env *env, - struct dt_object *dt) -{ - struct inode *inode = osd_dt_obj(dt)->oo_inode; - - CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n", - LDISKFS_I(inode)->i_fs_version, inode->i_ino); - return LDISKFS_I(inode)->i_fs_version; -} + int rc; -/* - * Set the 64-bit version and return the old version. - */ -static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt, - dt_obj_version_t new_version) -{ - struct inode *inode = osd_dt_obj(dt)->oo_inode; + ENTRY; - CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n", - new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino); - LDISKFS_I(inode)->i_fs_version = new_version; - /** Version is set after all inode operations are finished, - * so we should mark it dirty here */ - inode->i_sb->s_op->dirty_inode(inode); + dentry->d_inode = inode; + file->f_dentry = dentry; + file->f_mapping = inode->i_mapping; + file->f_op = inode->i_fop; + LOCK_INODE_MUTEX(inode); + rc = file->f_op->fsync(file, dentry, 0); + UNLOCK_INODE_MUTEX(inode); + RETURN(rc); } static int osd_data_get(const struct lu_env *env, struct dt_object *dt, @@ -2293,18 +2456,30 @@ static int osd_iam_container_init(const struct lu_env *env, struct osd_object *obj, struct osd_directory *dir) { + struct iam_container *bag = &dir->od_container; int result; - struct iam_container *bag; - bag = &dir->od_container; result = iam_container_init(bag, &dir->od_descr, obj->oo_inode); - if (result == 0) { - result = iam_container_setup(bag); - if (result == 0) - obj->oo_dt.do_index_ops = &osd_index_iam_ops; - else - iam_container_fini(bag); + if (result != 0) + return result; + + result = iam_container_setup(bag); + if (result != 0) + goto out; + + if (osd_obj2dev(obj)->od_iop_mode) { + u32 ptr = bag->ic_descr->id_ops->id_root_ptr(bag); + + bag->ic_root_bh = ldiskfs_bread(NULL, obj->oo_inode, + ptr, 0, &result); } + + out: + if (result == 0) + obj->oo_dt.do_index_ops = &osd_index_iam_ops; + else + iam_container_fini(bag); + return result; } @@ -2361,10 +2536,12 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, else result = 0; cfs_up_write(&obj->oo_ext_idx_sem); - } else + } else { result = -ENOMEM; - } else + } + } else { result = 0; + } if (result == 0 && ea_dir == 0) { if (!osd_iam_index_probe(env, obj, feat)) @@ -2376,27 +2553,33 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, } static const struct dt_object_operations osd_obj_ops = { - .do_read_lock = osd_object_read_lock, - .do_write_lock = osd_object_write_lock, - .do_read_unlock = osd_object_read_unlock, - .do_write_unlock = osd_object_write_unlock, - .do_write_locked = osd_object_write_locked, - .do_attr_get = osd_attr_get, - .do_attr_set = osd_attr_set, - .do_ah_init = osd_ah_init, - .do_create = osd_object_create, - .do_index_try = osd_index_try, - .do_ref_add = osd_object_ref_add, - .do_ref_del = osd_object_ref_del, - .do_xattr_get = osd_xattr_get, - .do_xattr_set = osd_xattr_set, - .do_xattr_del = osd_xattr_del, - .do_xattr_list = osd_xattr_list, - .do_capa_get = osd_capa_get, - .do_object_sync = osd_object_sync, - .do_version_get = osd_object_version_get, - .do_version_set = osd_object_version_set, - .do_data_get = osd_data_get, + .do_read_lock = osd_object_read_lock, + .do_write_lock = osd_object_write_lock, + .do_read_unlock = osd_object_read_unlock, + .do_write_unlock = osd_object_write_unlock, + .do_write_locked = osd_object_write_locked, + .do_attr_get = osd_attr_get, + .do_declare_attr_set = osd_declare_attr_set, + .do_attr_set = osd_attr_set, + .do_ah_init = osd_ah_init, + .do_declare_create = osd_declare_object_create, + .do_create = osd_object_create, + .do_declare_destroy = osd_declare_object_destroy, + .do_destroy = osd_object_destroy, + .do_index_try = osd_index_try, + .do_declare_ref_add = osd_declare_object_ref_add, + .do_ref_add = osd_object_ref_add, + .do_declare_ref_del = osd_declare_object_ref_del, + .do_ref_del = osd_object_ref_del, + .do_xattr_get = osd_xattr_get, + .do_declare_xattr_set = osd_declare_xattr_set, + .do_xattr_set = osd_xattr_set, + .do_declare_xattr_del = osd_declare_xattr_del, + .do_xattr_del = osd_xattr_del, + .do_xattr_list = osd_xattr_list, + .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_data_get = osd_data_get, }; /** @@ -2404,255 +2587,51 @@ static const struct dt_object_operations osd_obj_ops = { * (i.e. to run 2.0 mds on 1.8 disk) (b11826) */ static const struct dt_object_operations osd_obj_ea_ops = { - .do_read_lock = osd_object_read_lock, - .do_write_lock = osd_object_write_lock, - .do_read_unlock = osd_object_read_unlock, - .do_write_unlock = osd_object_write_unlock, - .do_write_locked = osd_object_write_locked, - .do_attr_get = osd_attr_get, - .do_attr_set = osd_attr_set, - .do_ah_init = osd_ah_init, - .do_create = osd_object_ea_create, - .do_index_try = osd_index_try, - .do_ref_add = osd_object_ref_add, - .do_ref_del = osd_object_ref_del, - .do_xattr_get = osd_xattr_get, - .do_xattr_set = osd_xattr_set, - .do_xattr_del = osd_xattr_del, - .do_xattr_list = osd_xattr_list, - .do_capa_get = osd_capa_get, - .do_object_sync = osd_object_sync, - .do_version_get = osd_object_version_get, - .do_version_set = osd_object_version_set, - .do_data_get = osd_data_get, + .do_read_lock = osd_object_read_lock, + .do_write_lock = osd_object_write_lock, + .do_read_unlock = osd_object_read_unlock, + .do_write_unlock = osd_object_write_unlock, + .do_write_locked = osd_object_write_locked, + .do_attr_get = osd_attr_get, + .do_declare_attr_set = osd_declare_attr_set, + .do_attr_set = osd_attr_set, + .do_ah_init = osd_ah_init, + .do_declare_create = osd_declare_object_create, + .do_create = osd_object_ea_create, + .do_declare_destroy = osd_declare_object_destroy, + .do_destroy = osd_object_destroy, + .do_index_try = osd_index_try, + .do_declare_ref_add = osd_declare_object_ref_add, + .do_ref_add = osd_object_ref_add, + .do_declare_ref_del = osd_declare_object_ref_del, + .do_ref_del = osd_object_ref_del, + .do_xattr_get = osd_xattr_get, + .do_declare_xattr_set = osd_declare_xattr_set, + .do_xattr_set = osd_xattr_set, + .do_declare_xattr_del = osd_declare_xattr_del, + .do_xattr_del = osd_xattr_del, + .do_xattr_list = osd_xattr_list, + .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_data_get = osd_data_get, }; -/* - * Body operations. - */ - -/* - * XXX: Another layering violation for now. - * - * We don't want to use ->f_op->read methods, because generic file write - * - * - serializes on ->i_sem, and - * - * - does a lot of extra work like balance_dirty_pages(), - * - * which doesn't work for globally shared files like /last-received. - */ -static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen) -{ - struct ldiskfs_inode_info *ei = LDISKFS_I(inode); - - memcpy(buffer, (char*)ei->i_data, buflen); - - return buflen; -} - -static int osd_ldiskfs_read(struct inode *inode, void *buf, int size, - loff_t *offs) -{ - struct buffer_head *bh; - unsigned long block; - int osize = size; - int blocksize; - int csize; - int boffs; - int err; - - /* prevent reading after eof */ - spin_lock(&inode->i_lock); - if (i_size_read(inode) < *offs + size) { - size = i_size_read(inode) - *offs; - spin_unlock(&inode->i_lock); - if (size < 0) { - CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n", - i_size_read(inode), *offs); - return -EBADR; - } else if (size == 0) { - return 0; - } - } else { - spin_unlock(&inode->i_lock); - } - - blocksize = 1 << inode->i_blkbits; - - while (size > 0) { - block = *offs >> inode->i_blkbits; - boffs = *offs & (blocksize - 1); - csize = min(blocksize - boffs, size); - bh = ldiskfs_bread(NULL, inode, block, 0, &err); - if (!bh) { - CERROR("can't read block: %d\n", err); - return err; - } - - memcpy(buf, bh->b_data + boffs, csize); - brelse(bh); - - *offs += csize; - buf += csize; - size -= csize; - } - return osize; -} - -static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos, - struct lustre_capa *capa) +static int osd_index_declare_iam_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) { - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - int rc; - - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) - RETURN(-EACCES); - - /* Read small symlink from inode body as we need to maintain correct - * on-disk symlinks for ldiskfs. - */ - if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) && - (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data))) - rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len); - else - rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos); - - return rc; -} + struct osd_thandle *oh; -static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen) -{ + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); - memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer, - buflen); - LDISKFS_I(inode)->i_disksize = buflen; - i_size_write(inode, buflen); - inode->i_sb->s_op->dirty_inode(inode); + OSD_DECLARE_OP(oh, delete); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; return 0; } -static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize, - loff_t *offs, handle_t *handle) -{ - struct buffer_head *bh = NULL; - loff_t offset = *offs; - loff_t new_size = i_size_read(inode); - unsigned long block; - int blocksize = 1 << inode->i_blkbits; - int err = 0; - int size; - int boffs; - int dirty_inode = 0; - - while (bufsize > 0) { - if (bh != NULL) - brelse(bh); - - block = offset >> inode->i_blkbits; - boffs = offset & (blocksize - 1); - size = min(blocksize - boffs, bufsize); - bh = ldiskfs_bread(handle, inode, block, 1, &err); - if (!bh) { - CERROR("can't read/create block: %d\n", err); - break; - } - - err = ldiskfs_journal_get_write_access(handle, bh); - if (err) { - CERROR("journal_get_write_access() returned error %d\n", - err); - break; - } - LASSERTF(boffs + size <= bh->b_size, - "boffs %d size %d bh->b_size %lu", - boffs, size, (unsigned long)bh->b_size); - memcpy(bh->b_data + boffs, buf, size); - err = ldiskfs_journal_dirty_metadata(handle, bh); - if (err) - break; - - if (offset + size > new_size) - new_size = offset + size; - offset += size; - bufsize -= size; - buf += size; - } - if (bh) - brelse(bh); - - /* correct in-core and on-disk sizes */ - if (new_size > i_size_read(inode)) { - spin_lock(&inode->i_lock); - if (new_size > i_size_read(inode)) - i_size_write(inode, new_size); - if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) { - LDISKFS_I(inode)->i_disksize = i_size_read(inode); - dirty_inode = 1; - } - spin_unlock(&inode->i_lock); - if (dirty_inode) - inode->i_sb->s_op->dirty_inode(inode); - } - - if (err == 0) - *offs = offset; - return err; -} - -static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, struct lustre_capa *capa, - int ignore_quota) -{ - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - struct osd_thandle *oh; - ssize_t result = 0; -#ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = current->cap_effective; -#endif - - LASSERT(handle != NULL); - - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE)) - RETURN(-EACCES); - - oh = container_of(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle->h_transaction != NULL); -#ifdef HAVE_QUOTA_SUPPORT - if (ignore_quota) - current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; - else - current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; -#endif - /* Write small symlink to inode body as we need to maintain correct - * on-disk symlinks for ldiskfs. - */ - if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) && - (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data))) - result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len); - else - result = osd_ldiskfs_write_record(inode, buf->lb_buf, - buf->lb_len, pos, - oh->ot_handle); -#ifdef HAVE_QUOTA_SUPPORT - current->cap_effective = save; -#endif - if (result == 0) - result = buf->lb_len; - return result; -} - -static const struct dt_body_operations osd_body_ops = { - .dbo_read = osd_read, - .dbo_write = osd_write -}; - - /** * delete a (key, value) pair from index \a dt specified by \a key * @@ -2666,14 +2645,15 @@ static const struct dt_body_operations osd_body_ops = { */ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *handle, + const struct dt_key *key, + struct thandle *handle, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct osd_thandle *oh; struct iam_path_descr *ipd; struct iam_container *bag = &obj->oo_dir->od_container; - int rc; + int rc; ENTRY; @@ -2685,6 +2665,8 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) RETURN(-EACCES); + OSD_EXEC_OP(handle, delete); + ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) RETURN(-ENOMEM); @@ -2699,11 +2681,36 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +static int osd_index_declare_ea_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, delete); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; + + LASSERT(osd_dt_obj(dt)->oo_inode); + osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, + osd_dt_obj(dt)->oo_inode); + osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, + osd_dt_obj(dt)->oo_inode); + + return 0; +} + static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de, struct dt_rec *fid) { struct osd_fid_pack *rec; - int rc = -ENODATA; + int rc = -ENODATA; if (de->file_type & LDISKFS_DIRENT_LUFID) { rec = (struct osd_fid_pack *) (de->name + de->name_len + 1); @@ -2723,7 +2730,8 @@ static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de, * \retval -ve, on error */ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *handle, + const struct dt_key *key, + struct thandle *handle, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); @@ -2732,8 +2740,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, struct osd_thandle *oh; struct ldiskfs_dir_entry_2 *de; struct buffer_head *bh; - - int rc; + struct htree_lock *hlock = NULL; + int rc; ENTRY; @@ -2741,6 +2749,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, LASSERT(dt_object_exists(dt)); LASSERT(handle != NULL); + OSD_EXEC_OP(handle, delete); + oh = container_of(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle != NULL); LASSERT(oh->ot_handle->h_transaction != NULL); @@ -2751,28 +2761,27 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); - cfs_down_write(&obj->oo_ext_idx_sem); - bh = ll_ldiskfs_find_entry(dir, dentry, &de); - if (bh) { - struct osd_thread_info *oti = osd_oti_get(env); - struct timespec *ctime = &oti->oti_time; - struct timespec *mtime = &oti->oti_time2; + if (obj->oo_hl_head != NULL) { + hlock = osd_oti_get(env)->oti_hlock; + ldiskfs_htree_lock(hlock, obj->oo_hl_head, + dir, LDISKFS_HLOCK_DEL); + } else { + cfs_down_write(&obj->oo_ext_idx_sem); + } - *ctime = dir->i_ctime; - *mtime = dir->i_mtime; + bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock); + if (bh) { rc = ldiskfs_delete_entry(oh->ot_handle, - dir, de, bh); - /* xtime should not be updated with server-side time. */ - cfs_spin_lock(&obj->oo_guard); - dir->i_ctime = *ctime; - dir->i_mtime = *mtime; - cfs_spin_unlock(&obj->oo_guard); - mark_inode_dirty(dir); + dir, de, bh); brelse(bh); - } else + } else { rc = -ENOENT; + } + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + cfs_up_write(&obj->oo_ext_idx_sem); - cfs_up_write(&obj->oo_ext_idx_sem); LASSERT(osd_invariant(obj)); RETURN(rc); } @@ -2792,13 +2801,14 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_rec *rec, const struct dt_key *key, struct lustre_capa *capa) { - struct osd_object *obj = osd_dt_obj(dt); - struct iam_path_descr *ipd; - struct iam_container *bag = &obj->oo_dir->od_container; + struct osd_object *obj = osd_dt_obj(dt); + struct iam_path_descr *ipd; + struct iam_container *bag = &obj->oo_dir->od_container; struct osd_thread_info *oti = osd_oti_get(env); struct iam_iterator *it = &oti->oti_idx_it; - struct iam_rec *iam_rec; - int rc; + struct iam_rec *iam_rec; + int rc; + ENTRY; LASSERT(osd_invariant(obj)); @@ -2836,6 +2846,26 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +static int osd_index_declare_iam_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, insert); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; + + return 0; +} + /** * Inserts (key, value) pair in \a dt index object. * @@ -2848,20 +2878,20 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, * \retval -ve failure */ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, - const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, struct lustre_capa *capa, - int ignore_quota) + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th, + struct lustre_capa *capa, int ignore_quota) { struct osd_object *obj = osd_dt_obj(dt); struct iam_path_descr *ipd; struct osd_thandle *oh; struct iam_container *bag = &obj->oo_dir->od_container; #ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = current->cap_effective; + cfs_cap_t save = cfs_curproc_cap_pack(); #endif struct osd_thread_info *oti = osd_oti_get(env); - struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp; - int rc; + struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp; + int rc; ENTRY; @@ -2873,6 +2903,8 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) return -EACCES; + OSD_EXEC_OP(th, insert); + ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) RETURN(-ENOMEM); @@ -2882,9 +2914,9 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, LASSERT(oh->ot_handle->h_transaction != NULL); #ifdef HAVE_QUOTA_SUPPORT if (ignore_quota) - current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; + cfs_cap_raise(CFS_CAP_SYS_RESOURCE); else - current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; + cfs_cap_lower(CFS_CAP_SYS_RESOURCE); #endif if (S_ISDIR(obj->oo_inode->i_mode)) osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid); @@ -2893,7 +2925,7 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key, iam_rec, ipd); #ifdef HAVE_QUOTA_SUPPORT - current->cap_effective = save; + cfs_curproc_cap_unpack(save); #endif osd_ipd_put(env, bag, ipd); LINVRNT(osd_invariant(obj)); @@ -2909,16 +2941,14 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, * \retval -ve, on error */ static int __osd_ea_add_rec(struct osd_thread_info *info, - struct osd_object *pobj, - struct inode *cinode, - const char *name, - const struct dt_rec *fid, - struct thandle *th) + struct osd_object *pobj, struct inode *cinode, + const char *name, const struct dt_rec *fid, + struct htree_lock *hlock, struct thandle *th) { struct ldiskfs_dentry_param *ldp; - struct dentry *child; - struct osd_thandle *oth; - int rc; + struct dentry *child; + struct osd_thandle *oth; + int rc; oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle != NULL); @@ -2927,13 +2957,13 @@ static int __osd_ea_add_rec(struct osd_thread_info *info, child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name)); if (fid_is_igif((struct lu_fid *)fid) || - fid_seq((struct lu_fid *)fid) >= FID_SEQ_NORMAL) { + fid_is_norm((struct lu_fid *)fid)) { ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; osd_get_ldiskfs_dirent_param(ldp, fid); child->d_fsdata = (void*) ldp; } else child->d_fsdata = NULL; - rc = ldiskfs_add_entry(oth->ot_handle, child, cinode); + rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock); RETURN(rc); } @@ -2991,11 +3021,11 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, /* in case of rename, dotdot is already created */ if (dir->oo_compat_dotdot_created) { return __osd_ea_add_rec(info, dir, parent_dir, name, - dot_dot_fid, th); + dot_dot_fid, NULL, th); } - result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode, - dot_ldp, dot_dot_ldp); + result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, + inode, dot_ldp, dot_dot_ldp); if (result == 0) dir->oo_compat_dotdot_created = 1; } @@ -3008,23 +3038,42 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, * It will call the appropriate osd_add* function and return the * value, return by respective functions. */ -static int osd_ea_add_rec(const struct lu_env *env, - struct osd_object *pobj, - struct inode *cinode, - const char *name, - const struct dt_rec *fid, - struct thandle *th) -{ - struct osd_thread_info *info = osd_oti_get(env); - int rc; +static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj, + struct inode *cinode, const char *name, + const struct dt_rec *fid, struct thandle *th) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct htree_lock *hlock; + int rc; + + hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL; if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' && - name[2] =='\0'))) + name[2] =='\0'))) { + if (hlock != NULL) { + ldiskfs_htree_lock(hlock, pobj->oo_hl_head, + pobj->oo_inode, 0); + } else { + cfs_down_write(&pobj->oo_ext_idx_sem); + } rc = osd_add_dot_dotdot(info, pobj, cinode, name, (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu), fid, th); + } else { + if (hlock != NULL) { + ldiskfs_htree_lock(hlock, pobj->oo_hl_head, + pobj->oo_inode, LDISKFS_HLOCK_ADD); + } else { + cfs_down_write(&pobj->oo_ext_idx_sem); + } + + rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, + hlock, th); + } + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); else - rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th); + cfs_up_write(&pobj->oo_ext_idx_sem); return rc; } @@ -3045,16 +3094,24 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, struct ldiskfs_dir_entry_2 *de; struct buffer_head *bh; struct lu_fid *fid = (struct lu_fid *) rec; - int ino; - int rc; + struct htree_lock *hlock = NULL; + int ino; + int rc; LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL); dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); - cfs_down_read(&obj->oo_ext_idx_sem); - bh = ll_ldiskfs_find_entry(dir, dentry, &de); + if (obj->oo_hl_head != NULL) { + hlock = osd_oti_get(env)->oti_hlock; + ldiskfs_htree_lock(hlock, obj->oo_hl_head, + dir, LDISKFS_HLOCK_LOOKUP); + } else { + cfs_down_read(&obj->oo_ext_idx_sem); + } + + bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock); if (bh) { ino = le32_to_cpu(de->inode); rc = osd_get_fid_from_dentry(de, rec); @@ -3063,10 +3120,14 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, brelse(bh); if (rc != 0) rc = osd_ea_fid_get(env, obj, ino, fid); - } else + } else { rc = -ENOENT; + } - cfs_up_read(&obj->oo_ext_idx_sem); + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + cfs_up_read(&obj->oo_ext_idx_sem); RETURN (rc); } @@ -3082,10 +3143,10 @@ struct osd_object *osd_object_find(const struct lu_env *env, struct dt_object *dt, const struct lu_fid *fid) { - struct lu_device *ludev = dt->do_lu.lo_dev; - struct osd_object *child = NULL; - struct lu_object *luch; - struct lu_object *lo; + struct lu_device *ludev = dt->do_lu.lo_dev; + struct osd_object *child = NULL; + struct lu_object *luch; + struct lu_object *lo; luch = lu_object_find(env, ludev, fid, NULL); if (!IS_ERR(luch)) { @@ -3126,6 +3187,32 @@ static inline void osd_object_put(const struct lu_env *env, lu_object_put(env, &obj->oo_dt.do_lu); } +static int osd_index_declare_ea_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, insert); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; + + LASSERT(osd_dt_obj(dt)->oo_inode); + osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, + osd_dt_obj(dt)->oo_inode); + osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, + osd_dt_obj(dt)->oo_inode); + + return 0; +} + /** * Index add function for interoperability mode (b11826). * It will add the directory entry.This entry is needed to @@ -3142,14 +3229,14 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *th, struct lustre_capa *capa, int ignore_quota) { - struct osd_object *obj = osd_dt_obj(dt); - struct lu_fid *fid = (struct lu_fid *) rec; - const char *name = (const char *)key; - struct osd_object *child; + struct osd_object *obj = osd_dt_obj(dt); + struct lu_fid *fid = (struct lu_fid *) rec; + const char *name = (const char *)key; + struct osd_object *child; #ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = current->cap_effective; + cfs_cap_t save = cfs_curproc_cap_pack(); #endif - int rc; + int rc; ENTRY; @@ -3162,32 +3249,17 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, child = osd_object_find(env, dt, fid); if (!IS_ERR(child)) { - struct inode *inode = obj->oo_inode; - struct osd_thread_info *oti = osd_oti_get(env); - struct timespec *ctime = &oti->oti_time; - struct timespec *mtime = &oti->oti_time2; - - *ctime = inode->i_ctime; - *mtime = inode->i_mtime; #ifdef HAVE_QUOTA_SUPPORT if (ignore_quota) - current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK; + cfs_cap_raise(CFS_CAP_SYS_RESOURCE); else - current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; + cfs_cap_lower(CFS_CAP_SYS_RESOURCE); #endif - cfs_down_write(&obj->oo_ext_idx_sem); rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th); - cfs_up_write(&obj->oo_ext_idx_sem); #ifdef HAVE_QUOTA_SUPPORT - current->cap_effective = save; + cfs_curproc_cap_unpack(save); #endif osd_object_put(env, child); - /* xtime should not be updated with server-side time. */ - cfs_spin_lock(&obj->oo_guard); - inode->i_ctime = *ctime; - inode->i_mtime = *mtime; - cfs_spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); } else { rc = PTR_ERR(child); } @@ -3203,15 +3275,16 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, */ static struct dt_it *osd_it_iam_init(const struct lu_env *env, - struct dt_object *dt, - struct lustre_capa *capa) + struct dt_object *dt, + __u32 unused, + struct lustre_capa *capa) { - struct osd_it_iam *it; + struct osd_it_iam *it; struct osd_thread_info *oti = osd_oti_get(env); - struct osd_object *obj = osd_dt_obj(dt); - struct lu_object *lo = &dt->do_lu; - struct iam_path_descr *ipd; - struct iam_container *bag = &obj->oo_dir->od_container; + struct osd_object *obj = osd_dt_obj(dt); + struct lu_object *lo = &dt->do_lu; + struct iam_path_descr *ipd; + struct iam_container *bag = &obj->oo_dir->od_container; LASSERT(lu_object_exists(lo)); @@ -3236,7 +3309,7 @@ static struct dt_it *osd_it_iam_init(const struct lu_env *env, static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di) { - struct osd_it_iam *it = (struct osd_it_iam *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; struct osd_object *obj = it->oi_obj; iam_it_fini(&it->oi_it); @@ -3256,7 +3329,7 @@ static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di) */ static int osd_it_iam_get(const struct lu_env *env, - struct dt_it *di, const struct dt_key *key) + struct dt_it *di, const struct dt_key *key) { struct osd_it_iam *it = (struct osd_it_iam *)di; @@ -3316,13 +3389,11 @@ static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di) return iam_it_key_size(&it->oi_it); } -static inline void osd_it_append_attrs(struct lu_dirent*ent, - __u32 attr, - int len, - __u16 type) +static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr, + int len, __u16 type) { - struct luda_type *lt; - const unsigned align = sizeof(struct luda_type) - 1; + struct luda_type *lt; + const unsigned align = sizeof(struct luda_type) - 1; /* check if file type is required */ if (attr & LUDA_TYPE) { @@ -3341,12 +3412,9 @@ static inline void osd_it_append_attrs(struct lu_dirent*ent, */ static inline void osd_it_pack_dirent(struct lu_dirent *ent, - struct lu_fid *fid, - __u64 offset, - char *name, - __u16 namelen, - __u16 type, - __u32 attr) + struct lu_fid *fid, __u64 offset, + char *name, __u16 namelen, + __u16 type, __u32 attr) { fid_cpu_to_le(&ent->lde_fid, fid); ent->lde_attrs = LUDA_FID; @@ -3366,13 +3434,13 @@ static inline void osd_it_pack_dirent(struct lu_dirent *ent, */ static int osd_it_iam_rec(const struct lu_env *env, const struct dt_it *di, - struct lu_dirent *lde, - __u32 attr) + struct dt_rec *dtrec, __u32 attr) { struct osd_it_iam *it = (struct osd_it_iam *)di; struct osd_thread_info *info = osd_oti_get(env); struct lu_fid *fid = &info->oti_fid; const struct osd_fid_pack *rec; + struct lu_dirent *lde = (struct lu_dirent *)dtrec; char *name; int namelen; __u64 hash; @@ -3423,7 +3491,7 @@ static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di) */ static int osd_it_iam_load(const struct lu_env *env, - const struct dt_it *di, __u64 hash) + const struct dt_it *di, __u64 hash) { struct osd_it_iam *it = (struct osd_it_iam *)di; @@ -3431,9 +3499,11 @@ static int osd_it_iam_load(const struct lu_env *env, } static const struct dt_index_operations osd_index_iam_ops = { - .dio_lookup = osd_index_iam_lookup, - .dio_insert = osd_index_iam_insert, - .dio_delete = osd_index_iam_delete, + .dio_lookup = osd_index_iam_lookup, + .dio_declare_insert = osd_index_declare_iam_insert, + .dio_insert = osd_index_iam_insert, + .dio_declare_delete = osd_index_declare_iam_delete, + .dio_delete = osd_index_iam_delete, .dio_it = { .init = osd_it_iam_init, .fini = osd_it_iam_fini, @@ -3456,6 +3526,7 @@ static const struct dt_index_operations osd_index_iam_ops = { */ static struct dt_it *osd_it_ea_init(const struct lu_env *env, struct dt_object *dt, + __u32 attr, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); @@ -3477,6 +3548,10 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env, it->oie_obj = obj; it->oie_file.f_pos = 0; it->oie_file.f_dentry = obj_dentry; + if (attr & LUDA_64BITHASH) + it->oie_file.f_flags = O_64BITHASH; + else + it->oie_file.f_flags = O_32BITHASH; it->oie_file.f_mapping = obj->oo_inode->i_mapping; it->oie_file.f_op = obj->oo_inode->i_fop; it->oie_file.private_data = NULL; @@ -3596,22 +3671,34 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, * \retval 0 on success * \retval -ve on error */ -static int osd_ldiskfs_it_fill(const struct dt_it *di) +static int osd_ldiskfs_it_fill(const struct lu_env *env, + const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; struct osd_object *obj = it->oie_obj; struct inode *inode = obj->oo_inode; - int result = 0; + struct htree_lock *hlock = NULL; + int result = 0; ENTRY; it->oie_dirent = it->oie_buf; it->oie_rd_dirent = 0; - cfs_down_read(&obj->oo_ext_idx_sem); + if (obj->oo_hl_head != NULL) { + hlock = osd_oti_get(env)->oti_hlock; + ldiskfs_htree_lock(hlock, obj->oo_hl_head, + inode, LDISKFS_HLOCK_READDIR); + } else { + cfs_down_read(&obj->oo_ext_idx_sem); + } + result = inode->i_fop->readdir(&it->oie_file, it, (filldir_t) osd_ldiskfs_filldir); - cfs_up_read(&obj->oo_ext_idx_sem); + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + cfs_up_read(&obj->oo_ext_idx_sem); if (it->oie_rd_dirent == 0) { result = -EIO; @@ -3652,7 +3739,7 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) if (it->oie_file.f_pos == LDISKFS_HTREE_EOF) rc = +1; else - rc = osd_ldiskfs_it_fill(di); + rc = osd_ldiskfs_it_fill(env, di); } RETURN(rc); @@ -3669,8 +3756,8 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; - ENTRY; - RETURN((struct dt_key *)it->oie_dirent->oied_name); + + return (struct dt_key *)it->oie_dirent->oied_name; } /** @@ -3683,8 +3770,8 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env, static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; - ENTRY; - RETURN(it->oie_dirent->oied_namelen); + + return it->oie_dirent->oied_namelen; } @@ -3701,12 +3788,12 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) */ static inline int osd_it_ea_rec(const struct lu_env *env, const struct dt_it *di, - struct lu_dirent *lde, - __u32 attr) + struct dt_rec *dtrec, __u32 attr) { struct osd_it_ea *it = (struct osd_it_ea *)di; struct osd_object *obj = it->oie_obj; struct lu_fid *fid = &it->oie_dirent->oied_fid; + struct lu_dirent *lde = (struct lu_dirent *)dtrec; int rc = 0; ENTRY; @@ -3734,8 +3821,8 @@ static inline int osd_it_ea_rec(const struct lu_env *env, static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; - ENTRY; - RETURN(it->oie_dirent->oied_off); + + return it->oie_dirent->oied_off; } /** @@ -3757,7 +3844,7 @@ static int osd_it_ea_load(const struct lu_env *env, ENTRY; it->oie_file.f_pos = hash; - rc = osd_ldiskfs_it_fill(di); + rc = osd_ldiskfs_it_fill(env, di); if (rc == 0) rc = +1; @@ -3799,9 +3886,11 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826) */ static const struct dt_index_operations osd_index_ea_ops = { - .dio_lookup = osd_index_ea_lookup, - .dio_insert = osd_index_ea_insert, - .dio_delete = osd_index_ea_delete, + .dio_lookup = osd_index_ea_lookup, + .dio_declare_insert = osd_index_declare_ea_insert, + .dio_insert = osd_index_ea_insert, + .dio_declare_delete = osd_index_declare_ea_delete, + .dio_delete = osd_index_ea_delete, .dio_it = { .init = osd_it_ea_init, .fini = osd_it_ea_fini, @@ -3822,19 +3911,26 @@ static void *osd_key_init(const struct lu_context *ctx, struct osd_thread_info *info; OBD_ALLOC_PTR(info); - if (info != NULL) { - OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); - if (info->oti_it_ea_buf != NULL) { - info->oti_env = container_of(ctx, struct lu_env, - le_ctx); - } else { - OBD_FREE_PTR(info); - info = ERR_PTR(-ENOMEM); - } - } else { - info = ERR_PTR(-ENOMEM); - } + if (info == NULL) + return ERR_PTR(-ENOMEM); + + OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + if (info->oti_it_ea_buf == NULL) + goto out_free_info; + + info->oti_env = container_of(ctx, struct lu_env, le_ctx); + + info->oti_hlock = ldiskfs_htree_lock_alloc(); + if (info->oti_hlock == NULL) + goto out_free_ea; + return info; + + out_free_ea: + OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + out_free_info: + OBD_FREE_PTR(info); + return ERR_PTR(-ENOMEM); } static void osd_key_fini(const struct lu_context *ctx, @@ -3842,6 +3938,8 @@ static void osd_key_fini(const struct lu_context *ctx, { struct osd_thread_info *info = data; + if (info->oti_hlock != NULL) + ldiskfs_htree_lock_free(info->oti_hlock); OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); OBD_FREE_PTR(info); } @@ -3859,7 +3957,7 @@ static void osd_key_exit(const struct lu_context *ctx, /* type constructor/destructor: osd_type_init, osd_type_fini */ LU_TYPE_INIT_FINI(osd, &osd_key); -static struct lu_context_key osd_key = { +struct lu_context_key osd_key = { .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD, .lct_init = osd_key_init, .lct_fini = osd_key_fini, @@ -3870,17 +3968,7 @@ static struct lu_context_key osd_key = { static int osd_device_init(const struct lu_env *env, struct lu_device *d, const char *name, struct lu_device *next) { - int rc; - struct lu_context *ctx; - - /* context for commit hooks */ - ctx = &osd_dev(d)->od_env_for_commit.le_ctx; - rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF); - if (rc == 0) { - rc = osd_procfs_init(osd_dev(d), name); - ctx->lc_cookie = 0x3; - } - return rc; + return osd_procfs_init(osd_dev(d), name); } static int osd_shutdown(const struct lu_env *env, struct osd_device *o) @@ -3891,7 +3979,13 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o) lu_object_put(env, &o->od_obj_area->do_lu); o->od_obj_area = NULL; } - osd_oi_fini(info, &o->od_oi); + if (o->od_oi_table != NULL) + osd_oi_fini(info, o); + + if (o->od_fsops) { + fsfilt_put_ops(o->od_fsops); + o->od_fsops = NULL; + } RETURN(0); } @@ -3905,6 +3999,13 @@ static int osd_mount(const struct lu_env *env, struct lustre_sb_info *lsi; ENTRY; + + o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS)); + if (o->od_fsops == NULL) { + CERROR("Can't find fsfilt_ldiskfs\n"); + RETURN(-ENOTSUPP); + } + if (o->od_mount != NULL) { CERROR("Already mounted (%s)\n", dev); RETURN(-EEXIST); @@ -3954,7 +4055,6 @@ static struct lu_device *osd_device_fini(const struct lu_env *env, osd_dev(d)->od_mount->lmi_mnt); osd_dev(d)->od_mount = NULL; - lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx); RETURN(NULL); } @@ -4044,11 +4144,13 @@ static int osd_prepare(const struct lu_env *env, ENTRY; /* 1. initialize oi before any file create or file open */ - result = osd_oi_init(oti, &osd->od_oi, - &osd->od_dt_dev, lu2md_dev(pdev)); - if (result != 0) + result = osd_oi_init(oti, osd); + if (result < 0) RETURN(result); + if (!lu_device_is_md(pdev)) + RETURN(0); + lmi = osd->od_mount; lsi = s2lsi(lmi->lmi_sb); ldd = lsi->lsi_ldd; @@ -4082,7 +4184,7 @@ static const struct lu_object_operations osd_lu_obj_ops = { .loo_object_invariant = osd_object_invariant }; -static const struct lu_device_operations osd_lu_ops = { +const struct lu_device_operations osd_lu_ops = { .ldo_object_alloc = osd_object_alloc, .ldo_process_config = osd_process_config, .ldo_recovery_complete = osd_recovery_complete, @@ -4145,4 +4247,4 @@ MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")"); MODULE_LICENSE("GPL"); -cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit); +cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit);