X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosd-ldiskfs%2Fosd_handler.c;h=e1bc36f0d994723066f43638ac25de0f809ea515;hb=9857b9d2bcf39c5bced8f4f7eedbe3170331a9ae;hp=18a3a861d6faca097c603b38968c4d95859f4813;hpb=14ede0dd6b76c4161a95c7f021e4857942415a7b;p=fs%2Flustre-release.git diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 18a3a86..e1bc36f 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -29,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -43,9 +41,6 @@ * Pravin Shelar : Added fid in dirent */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDS #include @@ -58,8 +53,6 @@ #include /* XATTR_{REPLACE,CREATE} */ #include -/* simple_mkdir() */ -#include /* * struct OBD_{ALLOC,FREE}*() @@ -68,23 +61,17 @@ #include /* struct ptlrpc_thread */ #include - -/* fid_is_local() */ #include #include "osd_internal.h" -#include "osd_igif.h" /* llo_* api support */ #include +#include -#ifdef HAVE_LDISKFS_PDO int ldiskfs_pdo = 1; CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644, "ldiskfs with parallel directory operations"); -#else -int ldiskfs_pdo = 0; -#endif static const char dot[] = "."; static const char dotdot[] = ".."; @@ -93,10 +80,26 @@ static const char remote_obj_dir[] = "REM_OBJ_DIR"; static const struct lu_object_operations osd_lu_obj_ops; static const struct dt_object_operations osd_obj_ops; static const struct dt_object_operations osd_obj_ea_ops; -static const struct dt_body_operations osd_body_ops_new; +static const struct dt_object_operations osd_obj_otable_it_ops; static const struct dt_index_operations osd_index_iam_ops; static const struct dt_index_operations osd_index_ea_ops; +#ifdef OSD_TRACK_DECLARES +int osd_trans_declare_op2rb[] = { + [OSD_OT_ATTR_SET] = OSD_OT_ATTR_SET, + [OSD_OT_PUNCH] = OSD_OT_MAX, + [OSD_OT_XATTR_SET] = OSD_OT_XATTR_SET, + [OSD_OT_CREATE] = OSD_OT_DESTROY, + [OSD_OT_DESTROY] = OSD_OT_CREATE, + [OSD_OT_REF_ADD] = OSD_OT_REF_DEL, + [OSD_OT_REF_DEL] = OSD_OT_REF_ADD, + [OSD_OT_WRITE] = OSD_OT_WRITE, + [OSD_OT_INSERT] = OSD_OT_DELETE, + [OSD_OT_DELETE] = OSD_OT_INSERT, + [OSD_OT_QUOTA] = OSD_OT_MAX, +}; +#endif + static int osd_has_index(const struct osd_object *obj) { return obj->oo_dt.do_index_ops != NULL; @@ -107,41 +110,6 @@ static int osd_object_invariant(const struct lu_object *l) return osd_invariant(osd_obj(l)); } -#ifdef HAVE_QUOTA_SUPPORT -static inline void -osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save) -{ - struct md_ucred *uc = md_ucred(env); - struct cred *tc; - - LASSERT(uc != NULL); - - save->oc_uid = current_fsuid(); - save->oc_gid = current_fsgid(); - save->oc_cap = current_cap(); - if ((tc = prepare_creds())) { - tc->fsuid = uc->mu_fsuid; - tc->fsgid = uc->mu_fsgid; - commit_creds(tc); - } - /* XXX not suboptimal */ - cfs_curproc_cap_unpack(uc->mu_cap); -} - -static inline void -osd_pop_ctxt(struct osd_ctxt *save) -{ - struct cred *tc; - - if ((tc = prepare_creds())) { - tc->fsuid = save->oc_uid; - tc->fsgid = save->oc_gid; - tc->cap_effective = save->oc_cap; - commit_creds(tc); - } -} -#endif - /* * Concurrency: doesn't matter */ @@ -165,58 +133,10 @@ static int osd_write_locked(const struct lu_env *env, struct osd_object *o) static int osd_root_get(const struct lu_env *env, struct dt_device *dev, struct lu_fid *f) { - struct inode *inode; - - inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode; - LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation); + lu_local_obj_fid(f, OSD_FS_ROOT_OID); return 0; } -static inline int osd_qid_type(struct osd_thandle *oh, int i) -{ - return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA; -} - -static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type) -{ - oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0); -} - -static void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh, - int type, uid_t id, struct inode *inode) -{ -#ifdef CONFIG_QUOTA - int i, allocated = 0; - struct osd_object *obj; - - LASSERT(dt != NULL); - LASSERT(oh != NULL); - LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%u", - oh->ot_id_cnt); - - /* id entry is allocated in the quota file */ - if (inode && inode->i_dquot[type] && inode->i_dquot[type]->dq_off) - allocated = 1; - - for (i = 0; i < oh->ot_id_cnt; i++) { - if (oh->ot_id_array[i] == id && osd_qid_type(oh, i) == type) - return; - } - - if (unlikely(i >= OSD_MAX_UGID_CNT)) { - CERROR("more than %d uid/gids for a transaction?\n", i); - return; - } - - oh->ot_id_array[i] = id; - osd_qid_set_type(oh, i, type); - oh->ot_id_cnt++; - obj = osd_dt_obj(dt); - oh->ot_credits += (allocated || id == 0) ? - 1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(osd_obj2dev(obj))); -#endif -} - /* * OSD object methods. */ @@ -237,130 +157,286 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, l = &mo->oo_dt.do_lu; dt_object_init(&mo->oo_dt, NULL, d); - if (osd_dev(d)->od_iop_mode) - mo->oo_dt.do_ops = &osd_obj_ea_ops; - else - mo->oo_dt.do_ops = &osd_obj_ops; - + mo->oo_dt.do_ops = &osd_obj_ea_ops; l->lo_ops = &osd_lu_obj_ops; - cfs_init_rwsem(&mo->oo_sem); - cfs_init_rwsem(&mo->oo_ext_idx_sem); - cfs_spin_lock_init(&mo->oo_guard); + init_rwsem(&mo->oo_sem); + init_rwsem(&mo->oo_ext_idx_sem); + spin_lock_init(&mo->oo_guard); return l; } else { return NULL; } } +static inline int __osd_xattr_get(struct inode *inode, struct dentry *dentry, + const char *name, void *buf, int len) +{ + dentry->d_inode = inode; + return inode->i_op->getxattr(dentry, name, buf, len); +} + +int osd_get_lma(struct osd_thread_info *info, struct inode *inode, + struct dentry *dentry, struct lustre_mdt_attrs *lma) +{ + int rc; + + rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMA, (void *)lma, + sizeof(*lma)); + if (rc == -ERANGE) { + /* try with old lma size */ + rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, + info->oti_mdt_attrs_old, + LMA_OLD_SIZE); + if (rc > 0) + memcpy(lma, info->oti_mdt_attrs_old, sizeof(*lma)); + } + if (rc > 0) { + /* Check LMA compatibility */ + if (lma->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP)) { + CWARN("%.16s: unsupported incompat LMA feature(s) " + "%lx/%#x\n", + LDISKFS_SB(inode->i_sb)->s_es->s_volume_name, + inode->i_ino, le32_to_cpu(lma->lma_incompat) & + ~LMA_INCOMPAT_SUPP); + rc = -ENOSYS; + } else { + lustre_lma_swab(lma); + rc = 0; + } + } else if (rc == 0) { + rc = -ENODATA; + } + + return rc; +} + /* * retrieve object from backend ext fs. **/ -static struct inode *osd_iget(struct osd_thread_info *info, - struct osd_device *dev, - const struct osd_inode_id *id) -{ - struct inode *inode = NULL; - - inode = ldiskfs_iget(osd_sb(dev), id->oii_ino); - if (IS_ERR(inode)) { - CERROR("Cannot get inode, rc = %li\n", PTR_ERR(inode)); - } else if (id->oii_gen != OSD_OII_NOGEN && - inode->i_generation != id->oii_gen) { - iput(inode); - inode = ERR_PTR(-ESTALE); - } else if (inode->i_nlink == 0) { - /* due to parallel readdir and unlink, - * we can have dead inode here. */ - CWARN("stale inode\n"); - make_bad_inode(inode); - iput(inode); - inode = ERR_PTR(-ESTALE); - } else if (is_bad_inode(inode)) { - CERROR("bad inode %lx\n",inode->i_ino); - iput(inode); - inode = ERR_PTR(-ENOENT); - } else { - /* Do not update file c/mtime in ldiskfs. - * NB: we don't have any lock to protect this because we don't - * have reference on osd_object now, but contention with - * another lookup + attr_set can't happen in the tiny window - * between if (...) and set S_NOCMTIME. */ - if (!(inode->i_flags & S_NOCMTIME)) - inode->i_flags |= S_NOCMTIME; - } - return inode; -} - -static int osd_fid_lookup(const struct lu_env *env, - struct osd_object *obj, const struct lu_fid *fid) -{ - struct osd_thread_info *info; - struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; - struct osd_device *dev; - struct osd_inode_id *id; - struct inode *inode; - int result; - - LINVRNT(osd_invariant(obj)); - LASSERT(obj->oo_inode == NULL); - LASSERTF(fid_is_sane(fid) || osd_fid_is_root(fid), DFID, PFID(fid)); - /* - * This assertion checks that osd layer sees only local - * fids. Unfortunately it is somewhat expensive (does a - * cache-lookup). Disabling it for production/acceptance-testing. - */ - LASSERT(1 || fid_is_local(env, ldev->ld_site, fid)); - - ENTRY; - - info = osd_oti_get(env); - dev = osd_dev(ldev); - id = &info->oti_id; - - if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) - RETURN(-ENOENT); - - result = osd_oi_lookup(info, osd_fid2oi(dev, fid), fid, id); - if (result != 0) { - if (result == -ENOENT) - result = 0; - goto out; - } - - inode = osd_iget(info, dev, id); - if (IS_ERR(inode)) { - /* - * If fid wasn't found in oi, inode-less object is - * created, for which lu_object_exists() returns - * false. This is used in a (frequent) case when - * objects are created as locking anchors or - * place holders for objects yet to be created. - */ - result = PTR_ERR(inode); - goto out; +struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev, + struct osd_inode_id *id) +{ + struct inode *inode = NULL; + + inode = ldiskfs_iget(osd_sb(dev), id->oii_ino); + if (IS_ERR(inode)) { + CDEBUG(D_INODE, "no inode: ino = %u, rc = %ld\n", + id->oii_ino, PTR_ERR(inode)); + } else if (id->oii_gen != OSD_OII_NOGEN && + inode->i_generation != id->oii_gen) { + CDEBUG(D_INODE, "unmatched inode: ino = %u, gen0 = %u, " + "gen1 = %u\n", + id->oii_ino, id->oii_gen, inode->i_generation); + iput(inode); + inode = ERR_PTR(-ESTALE); + } else if (inode->i_nlink == 0) { + /* due to parallel readdir and unlink, + * we can have dead inode here. */ + CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino); + make_bad_inode(inode); + iput(inode); + inode = ERR_PTR(-ESTALE); + } else if (is_bad_inode(inode)) { + CWARN("%.16s: bad inode: ino = %u\n", + LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name, id->oii_ino); + iput(inode); + inode = ERR_PTR(-ENOENT); + } else { + if (id->oii_gen == OSD_OII_NOGEN) + osd_id_gen(id, inode->i_ino, inode->i_generation); + + /* Do not update file c/mtime in ldiskfs. + * NB: we don't have any lock to protect this because we don't + * have reference on osd_object now, but contention with + * another lookup + attr_set can't happen in the tiny window + * between if (...) and set S_NOCMTIME. */ + if (!(inode->i_flags & S_NOCMTIME)) + inode->i_flags |= S_NOCMTIME; + } + return inode; +} + +static struct inode * +osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev, + struct osd_inode_id *id, struct lu_fid *fid) +{ + struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; + struct inode *inode; + int rc; + + inode = osd_iget(info, dev, id); + if (IS_ERR(inode)) + return inode; + + rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma); + if (rc == 0) { + *fid = lma->lma_self_fid; + } else if (rc == -ENODATA) { + if (unlikely(inode == osd_sb(dev)->s_root->d_inode)) + lu_local_obj_fid(fid, OSD_FS_ROOT_OID); + else + lu_igif_build(fid, inode->i_ino, inode->i_generation); + } else { + iput(inode); + inode = ERR_PTR(rc); + } + return inode; +} + +static struct inode * +osd_iget_verify(struct osd_thread_info *info, struct osd_device *dev, + struct osd_inode_id *id, const struct lu_fid *fid) +{ + struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; + struct inode *inode; + int rc; + + inode = osd_iget(info, dev, id); + if (IS_ERR(inode)) + return inode; + + rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma); + if (rc == -ENODATA) + return inode; + + if (rc != 0) { + iput(inode); + return ERR_PTR(rc); + } + + if (!lu_fid_eq(fid, &lma->lma_self_fid)) { + CDEBUG(D_LFSCK, "inconsistent obj: "DFID", %lu, "DFID"\n", + PFID(&lma->lma_self_fid), inode->i_ino, PFID(fid)); + iput(inode); + return ERR_PTR(-EREMCHG); + } + + return inode; +} + +static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj, + const struct lu_fid *fid, + const struct lu_object_conf *conf) +{ + struct osd_thread_info *info; + struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; + struct osd_device *dev; + struct osd_idmap_cache *oic; + struct osd_inode_id *id; + struct inode *inode; + struct osd_scrub *scrub; + struct scrub_file *sf; + int result; + int verify = 0; + ENTRY; + + LINVRNT(osd_invariant(obj)); + LASSERT(obj->oo_inode == NULL); + LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid)); + + dev = osd_dev(ldev); + scrub = &dev->od_scrub; + sf = &scrub->os_file; + info = osd_oti_get(env); + LASSERT(info); + oic = &info->oti_cache; + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) + RETURN(-ENOENT); + + /* Search order: 1. per-thread cache. */ + if (lu_fid_eq(fid, &oic->oic_fid)) { + id = &oic->oic_lid; + goto iget; + } + + id = &info->oti_id; + if (!cfs_list_empty(&scrub->os_inconsistent_items)) { + /* Search order: 2. OI scrub pending list. */ + result = osd_oii_lookup(dev, fid, id); + if (result == 0) + goto iget; + } + + if (sf->sf_flags & SF_INCONSISTENT) + verify = 1; + + /* + * Objects are created as locking anchors or place holders for objects + * yet to be created. No need to osd_oi_lookup() at here because FID + * shouldn't never be re-used, if it's really a duplicate FID from + * unexpected reason, we should be able to detect it later by calling + * do_create->osd_oi_insert() + */ + if (conf != NULL && conf->loc_flags & LOC_F_NEW) + GOTO(out, result = 0); + + /* Search order: 3. OI files. */ + result = osd_oi_lookup(info, dev, fid, id, true); + if (result == -ENOENT) { + if (!fid_is_norm(fid) || fid_is_on_ost(info, dev, fid) || + !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid), + sf->sf_oi_bitmap)) + GOTO(out, result = 0); + + goto trigger; + } + + if (result != 0) + GOTO(out, result); + +iget: + if (verify == 0) + inode = osd_iget(info, dev, id); + else + inode = osd_iget_verify(info, dev, id, fid); + if (IS_ERR(inode)) { + result = PTR_ERR(inode); + if (result == -ENOENT || result == -ESTALE) { + fid_zero(&oic->oic_fid); + result = 0; + } else if (result == -EREMCHG) { + +trigger: + if (thread_is_running(&scrub->os_thread)) { + result = -EINPROGRESS; + } else if (!dev->od_noscrub) { + result = osd_scrub_start(dev); + LCONSOLE_ERROR("%.16s: trigger OI scrub by RPC " + "for "DFID", rc = %d [1]\n", + LDISKFS_SB(osd_sb(dev))->s_es->\ + s_volume_name,PFID(fid), result); + if (result == 0 || result == -EALREADY) + result = -EINPROGRESS; + else + result = -EREMCHG; + } + } + + GOTO(out, result); } obj->oo_inode = inode; LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); - if (dev->od_iop_mode) { - obj->oo_compat_dot_created = 1; - obj->oo_compat_dotdot_created = 1; - } + + obj->oo_compat_dot_created = 1; + obj->oo_compat_dotdot_created = 1; if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */ - goto out; + GOTO(out, result = 0); - LASSERT(obj->oo_hl_head == NULL); - obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF); - if (obj->oo_hl_head == NULL) { - obj->oo_inode = NULL; - iput(inode); - result = -ENOMEM; - } -out: - LINVRNT(osd_invariant(obj)); + LASSERT(obj->oo_hl_head == NULL); + obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF); + if (obj->oo_hl_head == NULL) { + obj->oo_inode = NULL; + iput(inode); + GOTO(out, result = -ENOMEM); + } + GOTO(out, result = 0); - RETURN(result); +out: + LINVRNT(osd_invariant(obj)); + return result; } /* @@ -379,21 +455,26 @@ static void osd_object_init0(struct osd_object *obj) * life-cycle. */ static int osd_object_init(const struct lu_env *env, struct lu_object *l, - const struct lu_object_conf *unused) + const struct lu_object_conf *conf) { - struct osd_object *obj = osd_obj(l); - int result; + struct osd_object *obj = osd_obj(l); + int result; - LINVRNT(osd_invariant(obj)); + LINVRNT(osd_invariant(obj)); - result = osd_fid_lookup(env, obj, lu_object_fid(l)); - obj->oo_dt.do_body_ops = &osd_body_ops_new; - if (result == 0) { - if (obj->oo_inode != NULL) - osd_object_init0(obj); - } - LINVRNT(osd_invariant(obj)); - return result; + if (fid_is_otable_it(&l->lo_header->loh_fid)) { + obj->oo_dt.do_ops = &osd_obj_otable_it_ops; + l->lo_header->loh_attr |= LOHA_EXISTS; + return 0; + } + + result = osd_fid_lookup(env, obj, lu_object_fid(l), conf); + obj->oo_dt.do_body_ops = &osd_body_ops_new; + if (result == 0 && obj->oo_inode != NULL) + osd_object_init0(obj); + + LINVRNT(osd_invariant(obj)); + return result; } /* @@ -412,30 +493,6 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l) OBD_FREE_PTR(obj); } -/** - * IAM Iterator - */ -static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env, - const struct iam_container *bag) -{ - return bag->ic_descr->id_ops->id_ipd_alloc(bag, - osd_oti_get(env)->oti_it_ipd); -} - -static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env, - const struct iam_container *bag) -{ - return bag->ic_descr->id_ops->id_ipd_alloc(bag, - osd_oti_get(env)->oti_idx_ipd); -} - -static void osd_ipd_put(const struct lu_env *env, - const struct iam_container *bag, - struct iam_path_descr *ipd) -{ - bag->ic_descr->id_ops->id_ipd_free(ipd); -} - /* * Concurrency: no concurrent access is possible that late in object * life-cycle. @@ -493,8 +550,7 @@ static void osd_th_started(struct osd_thandle *oth) /** * Helper function to convert time interval to microseconds packed in - * long int (default time units for the counter in "stats" initialized - * by lu_time_init() ) + * long int. */ static long interval_to_usec(cfs_time_t start, cfs_time_t end) { @@ -554,23 +610,19 @@ static void __osd_th_check_slow(void *oth, struct osd_device *dev, /* * Concurrency: doesn't access mutable data. */ -static int osd_param_is_sane(const struct osd_device *dev, - const struct thandle *th) +static int osd_param_is_not_sane(const struct osd_device *dev, + const struct thandle *th) { - struct osd_thandle *oh; - oh = container_of0(th, struct osd_thandle, ot_super); - return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers; + struct osd_thandle *oh = container_of(th, typeof(*oh), ot_super); + + return oh->ot_credits > osd_journal(dev)->j_max_transaction_buffers; } /* * Concurrency: shouldn't matter. */ -#ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD static void osd_trans_commit_cb(struct super_block *sb, - struct journal_callback *jcb, int error) -#else -static void osd_trans_commit_cb(struct journal_callback *jcb, int error) -#endif + struct ldiskfs_journal_cb_entry *jcb, int error) { struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb); struct thandle *th = &oh->ot_super; @@ -584,9 +636,14 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error) dt_txn_hook_commit(th); - /* call per-transaction callbacks if any */ - cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) - dcb->dcb_func(NULL, th, dcb, error); + /* call per-transaction callbacks if any */ + cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) { + LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC, + "commit callback entry: magic=%x name='%s'\n", + dcb->dcb_magic, dcb->dcb_name); + cfs_list_del_init(&dcb->dcb_linkage); + dcb->dcb_func(NULL, th, dcb, error); + } lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th); lu_device_put(lud); @@ -601,13 +658,19 @@ static struct thandle *osd_trans_create(const struct lu_env *env, struct dt_device *d) { struct osd_thread_info *oti = osd_oti_get(env); + struct osd_iobuf *iobuf = &oti->oti_iobuf; struct osd_thandle *oh; struct thandle *th; ENTRY; + /* on pending IO in this thread should left from prev. request */ + LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0); + th = ERR_PTR(-ENOMEM); OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); if (oh != NULL) { + oh->ot_quota_trans = &oti->oti_quota_trans; + memset(oh->ot_quota_trans, 0, sizeof(*oh->ot_quota_trans)); th = &oh->ot_super; th->th_dev = d; th->th_result = 0; @@ -616,6 +679,11 @@ static struct thandle *osd_trans_create(const struct lu_env *env, oti->oti_dev = osd_dt_dev(d); CFS_INIT_LIST_HEAD(&oh->ot_dcb_list); osd_th_alloced(oh); + + memset(oti->oti_declare_ops, 0, OSD_OT_MAX); + memset(oti->oti_declare_ops_rb, 0, OSD_OT_MAX); + memset(oti->oti_declare_ops_cred, 0, OSD_OT_MAX); + oti->oti_rollback = false; } RETURN(th); } @@ -644,29 +712,60 @@ int osd_trans_start(const struct lu_env *env, struct dt_device *d, if (rc != 0) GOTO(out, rc); - if (!osd_param_is_sane(dev, th)) { - CWARN("%s: too many transaction credits (%d > %d)\n", - d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits, - osd_journal(dev)->j_max_transaction_buffers); - /* XXX Limit the credits to 'max_transaction_buffers', and - * let the underlying filesystem to catch the error if - * we really need so many credits. - * - * This should be removed when we can calculate the - * credits precisely. */ - oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers; + if (unlikely(osd_param_is_not_sane(dev, th))) { + static unsigned long last_printed; + static int last_credits; + + CWARN("%.16s: too many transaction credits (%d > %d)\n", + LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name, + oh->ot_credits, + osd_journal(dev)->j_max_transaction_buffers); #ifdef OSD_TRACK_DECLARES - CERROR(" attr_set: %d, punch: %d, xattr_set: %d,\n", - oh->ot_declare_attr_set, oh->ot_declare_punch, - oh->ot_declare_xattr_set); - CERROR(" create: %d, ref_add: %d, ref_del: %d, write: %d\n", - oh->ot_declare_create, oh->ot_declare_ref_add, - oh->ot_declare_ref_del, oh->ot_declare_write); - CERROR(" insert: %d, delete: %d, destroy: %d\n", - oh->ot_declare_insert, oh->ot_declare_delete, - oh->ot_declare_destroy); + CWARN(" create: %u/%u, delete: %u/%u, destroy: %u/%u\n", + oti->oti_declare_ops[OSD_OT_CREATE], + oti->oti_declare_ops_cred[OSD_OT_CREATE], + oti->oti_declare_ops[OSD_OT_DELETE], + oti->oti_declare_ops_cred[OSD_OT_DELETE], + oti->oti_declare_ops[OSD_OT_DESTROY], + oti->oti_declare_ops_cred[OSD_OT_DESTROY]); + CWARN(" attr_set: %u/%u, xattr_set: %u/%u\n", + oti->oti_declare_ops[OSD_OT_ATTR_SET], + oti->oti_declare_ops_cred[OSD_OT_ATTR_SET], + oti->oti_declare_ops[OSD_OT_XATTR_SET], + oti->oti_declare_ops_cred[OSD_OT_XATTR_SET]); + CWARN(" write: %u/%u, punch: %u/%u, quota %u/%u\n", + oti->oti_declare_ops[OSD_OT_WRITE], + oti->oti_declare_ops_cred[OSD_OT_WRITE], + oti->oti_declare_ops[OSD_OT_PUNCH], + oti->oti_declare_ops_cred[OSD_OT_PUNCH], + oti->oti_declare_ops[OSD_OT_QUOTA], + oti->oti_declare_ops_cred[OSD_OT_QUOTA]); + CWARN(" insert: %u/%u, delete: %u/%u\n", + oti->oti_declare_ops[OSD_OT_INSERT], + oti->oti_declare_ops_cred[OSD_OT_INSERT], + oti->oti_declare_ops[OSD_OT_DESTROY], + oti->oti_declare_ops_cred[OSD_OT_DESTROY]); + CWARN(" ref_add: %u/%u, ref_del: %u/%u\n", + oti->oti_declare_ops[OSD_OT_REF_ADD], + oti->oti_declare_ops_cred[OSD_OT_REF_ADD], + oti->oti_declare_ops[OSD_OT_REF_DEL], + oti->oti_declare_ops_cred[OSD_OT_REF_DEL]); + + if (last_credits != oh->ot_credits && + time_after(jiffies, last_printed + 60 * HZ)) { + libcfs_debug_dumpstack(NULL); + last_credits = oh->ot_credits; + last_printed = jiffies; + } #endif - } + /* XXX Limit the credits to 'max_transaction_buffers', and + * let the underlying filesystem to catch the error if + * we really need so many credits. + * + * This should be removed when we can calculate the + * credits precisely. */ + oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers; + } /* * XXX temporary stuff. Some abstraction layer should @@ -683,22 +782,6 @@ int osd_trans_start(const struct lu_env *env, struct dt_device *d, lu_device_get(&d->dd_lu_dev); oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference, "osd-tx", th); - - /* - * XXX: current rule is that we first start tx, - * then lock object(s), but we can't use - * this rule for data (due to locking specifics - * in ldiskfs). also in long-term we'd like to - * use usually-used (locks;tx) ordering. so, - * UGLY thing is that we'll use one ordering for - * data (ofd) and reverse ordering for metadata - * (mdd). then at some point we'll fix the latter - */ - if (lu_device_is_md(&d->dd_lu_dev)) { - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - } - oti->oti_txns++; rc = 0; } else { @@ -716,43 +799,38 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th) int rc = 0; struct osd_thandle *oh; struct osd_thread_info *oti = osd_oti_get(env); - + struct osd_iobuf *iobuf = &oti->oti_iobuf; + struct qsd_instance *qsd = oti->oti_dev->od_quota_slave; ENTRY; oh = container_of0(th, struct osd_thandle, ot_super); + if (qsd != NULL) + /* inform the quota slave device that the transaction is + * stopping */ + qsd_op_end(env, qsd, oh->ot_quota_trans); + oh->ot_quota_trans = NULL; + if (oh->ot_handle != NULL) { handle_t *hdl = oh->ot_handle; - hdl->h_sync = th->th_sync; - /* * add commit callback * notice we don't do this in osd_trans_start() * as underlying transaction can change during truncate */ - osd_journal_callback_set(hdl, osd_trans_commit_cb, + ldiskfs_journal_callback_add(hdl, osd_trans_commit_cb, &oh->ot_jcb); LASSERT(oti->oti_txns == 1); oti->oti_txns--; - /* - * XXX: current rule is that we first start tx, - * then lock object(s), but we can't use - * this rule for data (due to locking specifics - * in ldiskfs). also in long-term we'd like to - * use usually-used (locks;tx) ordering. so, - * UGLY thing is that we'll use one ordering for - * data (ofd) and reverse ordering for metadata - * (mdd). then at some point we'll fix the latter - */ - if (lu_device_is_md(&th->th_dev->dd_lu_dev)) { - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - } rc = dt_txn_hook_stop(env, th); if (rc != 0) CERROR("Failure in transaction hook: %d\n", rc); + + /* hook functions might modify th_sync */ + hdl->h_sync = th->th_sync; + oh->ot_handle = NULL; OSD_CHECK_SLOW_TH(oh, oti->oti_dev, rc = ldiskfs_journal_stop(hdl)); @@ -762,17 +840,33 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th) OBD_FREE_PTR(oh); } + /* as we want IO to journal and data IO be concurrent, we don't block + * awaiting data IO completion in osd_do_bio(), instead we wait here + * once transaction is submitted to the journal. all reqular requests + * don't do direct IO (except read/write), thus this wait_event becomes + * no-op for them. + * + * IMPORTANT: we have to wait till any IO submited by the thread is + * completed otherwise iobuf may be corrupted by different request + */ + cfs_wait_event(iobuf->dr_wait, + cfs_atomic_read(&iobuf->dr_numreqs) == 0); + if (!rc) + rc = iobuf->dr_error; + RETURN(rc); } static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb) { - struct osd_thandle *oh = container_of0(th, struct osd_thandle, - ot_super); + struct osd_thandle *oh = container_of0(th, struct osd_thandle, + ot_super); - cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list); + LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC); + LASSERT(&dcb->dcb_func != NULL); + cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list); - return 0; + return 0; } /* @@ -795,8 +889,24 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) osd_index_fini(obj); if (inode != NULL) { + struct qsd_instance *qsd = osd_obj2dev(obj)->od_quota_slave; + qid_t uid = inode->i_uid; + qid_t gid = inode->i_gid; + iput(inode); obj->oo_inode = NULL; + + if (qsd != NULL) { + struct osd_thread_info *info = osd_oti_get(env); + struct lquota_id_info *qi = &info->oti_qi; + + /* Release granted quota to master if necessary */ + qi->lqi_id.qid_uid = uid; + qsd_op_adjust(env, qsd, &qi->lqi_id, USRQUOTA); + + qi->lqi_id.qid_uid = gid; + qsd_op_adjust(env, qsd, &qi->lqi_id, GRPQUOTA); + } } } @@ -821,7 +931,8 @@ static int osd_object_print(const struct lu_env *env, void *cookie, d = o->oo_dir->od_container.ic_descr; else d = NULL; - return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]", + return (*p)(env, cookie, + LUSTRE_OSD_LDISKFS_NAME"-object@%p(i:%p:%lu/%u)[%s]", o, o->oo_inode, o->oo_inode ? o->oo_inode->i_ino : 0UL, o->oo_inode ? o->oo_inode->i_generation : 0, @@ -832,27 +943,58 @@ static int osd_object_print(const struct lu_env *env, void *cookie, * Concurrency: shouldn't matter. */ int osd_statfs(const struct lu_env *env, struct dt_device *d, - cfs_kstatfs_t *sfs) + struct obd_statfs *sfs) { - struct osd_device *osd = osd_dt_dev(d); + struct osd_device *osd = osd_dt_dev(d); struct super_block *sb = osd_sb(osd); + struct kstatfs *ksfs; int result = 0; - cfs_spin_lock(&osd->od_osfs_lock); - /* cache 1 second */ - if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) { - result = ll_do_statfs(sb, &osd->od_kstatfs); - if (likely(result == 0)) /* N.B. statfs can't really fail */ - osd->od_osfs_age = cfs_time_current_64(); + if (unlikely(osd->od_mnt == NULL)) + return -EINPROGRESS; + + /* osd_lproc.c call this without env, allocate ksfs for that case */ + if (unlikely(env == NULL)) { + OBD_ALLOC_PTR(ksfs); + if (ksfs == NULL) + return -ENOMEM; + } else { + ksfs = &osd_oti_get(env)->oti_ksfs; } - if (likely(result == 0)) - *sfs = osd->od_kstatfs; - cfs_spin_unlock(&osd->od_osfs_lock); + spin_lock(&osd->od_osfs_lock); + /* cache 1 second */ + if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) { + result = sb->s_op->statfs(sb->s_root, ksfs); + if (likely(result == 0)) { /* N.B. statfs can't really fail */ + osd->od_osfs_age = cfs_time_current_64(); + statfs_pack(&osd->od_statfs, ksfs); + if (sb->s_flags & MS_RDONLY) + sfs->os_state = OS_STATE_READONLY; + } + } + + if (likely(result == 0)) + *sfs = osd->od_statfs; + spin_unlock(&osd->od_osfs_lock); + + if (unlikely(env == NULL)) + OBD_FREE_PTR(ksfs); return result; } +/** + * Estimate space needed for file creations. We assume the largest filename + * which is 2^64 - 1, hence a filename of 20 chars. + * This is 28 bytes per object which is 28MB for 1M objects ... no so bad. + */ +#ifdef __LDISKFS_DIR_REC_LEN +#define PER_OBJ_USAGE __LDISKFS_DIR_REC_LEN(20) +#else +#define PER_OBJ_USAGE LDISKFS_DIR_REC_LEN(20) +#endif + /* * Concurrency: doesn't access mutable data. */ @@ -865,9 +1007,20 @@ static void osd_conf_get(const struct lu_env *env, /* * XXX should be taken from not-yet-existing fs abstraction layer. */ + param->ddp_mnt = osd_dt_dev(dev)->od_mnt; param->ddp_max_name_len = LDISKFS_NAME_LEN; param->ddp_max_nlink = LDISKFS_LINK_MAX; - param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits; + param->ddp_block_shift = sb->s_blocksize_bits; + param->ddp_mount_type = LDD_MT_LDISKFS; + param->ddp_maxbytes = sb->s_maxbytes; + /* Overhead estimate should be fairly accurate, so we really take a tiny + * error margin which also avoids fragmenting the filesystem too much */ + param->ddp_grant_reserved = 2; /* end up to be 1.9% after conversion */ + /* inode are statically allocated, so per-inode space consumption + * is the space consumed by the directory entry */ + param->ddp_inodespace = PER_OBJ_USAGE; + /* per-fragment overhead to be used by the client code */ + param->ddp_grant_frag = 6 * LDISKFS_BLOCK_SIZE(sb); param->ddp_mntopts = 0; if (test_opt(sb, XATTR_USER)) param->ddp_mntopts |= MNTOPT_USERXATTR; @@ -883,25 +1036,12 @@ static void osd_conf_get(const struct lu_env *env, } -/** - * Helper function to get and fill the buffer with input values. - */ -static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len) -{ - struct lu_buf *buf; - - buf = &osd_oti_get(env)->oti_buf; - buf->lb_buf = area; - buf->lb_len = len; - return buf; -} - /* * Concurrency: shouldn't matter. */ static int osd_sync(const struct lu_env *env, struct dt_device *d) { - CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME); + CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_LDISKFS_NAME); return ldiskfs_force_commit(osd_sb(osd_dt_dev(d))); } @@ -923,7 +1063,7 @@ static int osd_commit_async(const struct lu_env *env, struct super_block *s = osd_sb(osd_dt_dev(d)); ENTRY; - CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME); + CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_LDISKFS_NAME); RETURN(s->s_op->sync_fs(s, 0)); } @@ -931,15 +1071,33 @@ static int osd_commit_async(const struct lu_env *env, * Concurrency: shouldn't matter. */ -static void osd_ro(const struct lu_env *env, struct dt_device *d) +static int osd_ro(const struct lu_env *env, struct dt_device *d) { - struct super_block *sb = osd_sb(osd_dt_dev(d)); - ENTRY; - - CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME); - - __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev); - EXIT; + struct super_block *sb = osd_sb(osd_dt_dev(d)); + struct block_device *dev = sb->s_bdev; +#ifdef HAVE_DEV_SET_RDONLY + struct block_device *jdev = LDISKFS_SB(sb)->journal_bdev; + int rc = 0; +#else + int rc = -EOPNOTSUPP; +#endif + ENTRY; + +#ifdef HAVE_DEV_SET_RDONLY + CERROR("*** setting %s read-only ***\n", osd_dt_dev(d)->od_svname); + + if (jdev && (jdev != dev)) { + CDEBUG(D_IOCTL | D_HA, "set journal dev %lx rdonly\n", + (long)jdev); + dev_set_rdonly(jdev); + } + CDEBUG(D_IOCTL | D_HA, "set dev %lx rdonly\n", (long)dev); + dev_set_rdonly(dev); +#else + CERROR("%s: %lx CANNOT BE SET READONLY: rc = %d\n", + osd_dt_dev(d)->od_svname, (long)dev, rc); +#endif + RETURN(rc); } /* @@ -960,25 +1118,6 @@ static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d, } /** - * Concurrency: serialization provided by callers. - */ -static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d, - struct dt_quota_ctxt *ctxt, void *data) -{ - struct obd_device *obd = (void *)ctxt; - struct vfsmount *mnt = (struct vfsmount *)data; - ENTRY; - - obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb; - OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt); - obd->obd_lvfs_ctxt.pwdmnt = mnt; - obd->obd_lvfs_ctxt.pwd = mnt->mnt_root; - obd->obd_lvfs_ctxt.fs = get_ds(); - - EXIT; -} - -/** * Note: we do not count into QUOTA here. * If we mount with --data_journal we may need more. */ @@ -993,7 +1132,7 @@ const int osd_dto_credits_noquota[DTO_NR] = { [DTO_INDEX_INSERT] = 16, [DTO_INDEX_DELETE] = 16, /** - * Unused now + * Used for OI scrub */ [DTO_INDEX_UPDATE] = 16, /** @@ -1046,7 +1185,6 @@ static const struct dt_device_operations osd_dt_ops = { .dt_ro = osd_ro, .dt_commit_async = osd_commit_async, .dt_init_capa_ctxt = osd_init_capa_ctxt, - .dt_init_quota_ctxt= osd_init_quota_ctxt, }; static void osd_object_read_lock(const struct lu_env *env, @@ -1058,7 +1196,7 @@ static void osd_object_read_lock(const struct lu_env *env, LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_owner != env); - cfs_down_read_nested(&obj->oo_sem, role); + down_read_nested(&obj->oo_sem, role); LASSERT(obj->oo_owner == NULL); oti->oti_r_locks++; @@ -1073,7 +1211,7 @@ static void osd_object_write_lock(const struct lu_env *env, LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_owner != env); - cfs_down_write_nested(&obj->oo_sem, role); + down_write_nested(&obj->oo_sem, role); LASSERT(obj->oo_owner == NULL); obj->oo_owner = env; @@ -1090,7 +1228,7 @@ static void osd_object_read_unlock(const struct lu_env *env, LASSERT(oti->oti_r_locks > 0); oti->oti_r_locks--; - cfs_up_read(&obj->oo_sem); + up_read(&obj->oo_sem); } static void osd_object_write_unlock(const struct lu_env *env, @@ -1105,7 +1243,7 @@ static void osd_object_write_unlock(const struct lu_env *env, LASSERT(oti->oti_w_locks > 0); oti->oti_w_locks--; obj->oo_owner = NULL; - cfs_up_write(&obj->oo_sem); + up_write(&obj->oo_sem); } static int osd_object_write_locked(const struct lu_env *env, @@ -1144,14 +1282,14 @@ static int capa_is_sane(const struct lu_env *env, RETURN(-ESTALE); } - cfs_spin_lock(&capa_lock); - for (i = 0; i < 2; i++) { - if (keys[i].lk_keyid == capa->lc_keyid) { - oti->oti_capa_key = keys[i]; - break; - } - } - cfs_spin_unlock(&capa_lock); + spin_lock(&capa_lock); + for (i = 0; i < 2; i++) { + if (keys[i].lk_keyid == capa->lc_keyid) { + oti->oti_capa_key = keys[i]; + break; + } + } + spin_unlock(&capa_lock); if (i == 2) { DEBUG_CAPA(D_ERROR, capa, "no matched capa key"); @@ -1219,15 +1357,15 @@ int osd_object_auth(const struct lu_env *env, struct dt_object *dt, } static struct timespec *osd_inode_time(const struct lu_env *env, - struct inode *inode, __u64 seconds) + struct inode *inode, __u64 seconds) { - struct osd_thread_info *oti = osd_oti_get(env); - struct timespec *t = &oti->oti_time; + struct osd_thread_info *oti = osd_oti_get(env); + struct timespec *t = &oti->oti_time; - t->tv_sec = seconds; - t->tv_nsec = 0; - *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb)); - return t; + t->tv_sec = seconds; + t->tv_nsec = 0; + *t = timespec_trunc(*t, inode->i_sb->s_time_gran); + return t; } @@ -1236,7 +1374,8 @@ static void osd_inode_getattr(const struct lu_env *env, { attr->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE | LA_SIZE | LA_BLOCKS | LA_UID | LA_GID | - LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE; + LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE | + LA_TYPE; attr->la_atime = LTIME_S(inode->i_atime); attr->la_mtime = LTIME_S(inode->i_mtime); @@ -1249,8 +1388,8 @@ static void osd_inode_getattr(const struct lu_env *env, attr->la_flags = LDISKFS_I(inode)->i_flags; attr->la_nlink = inode->i_nlink; attr->la_rdev = inode->i_rdev; - attr->la_blksize = ll_inode_blksize(inode); - attr->la_blkbits = inode->i_blkbits; + attr->la_blksize = 1 << inode->i_blkbits; + attr->la_blkbits = inode->i_blkbits; } static int osd_attr_get(const struct lu_env *env, @@ -1260,16 +1399,16 @@ static int osd_attr_get(const struct lu_env *env, { struct osd_object *obj = osd_dt_obj(dt); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LINVRNT(osd_invariant(obj)); if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) return -EACCES; - cfs_spin_lock(&obj->oo_guard); - osd_inode_getattr(env, obj->oo_inode, attr); - cfs_spin_unlock(&obj->oo_guard); - return 0; + spin_lock(&obj->oo_guard); + osd_inode_getattr(env, obj->oo_inode, attr); + spin_unlock(&obj->oo_guard); + return 0; } static int osd_declare_attr_set(const struct lu_env *env, @@ -1277,35 +1416,139 @@ static int osd_declare_attr_set(const struct lu_env *env, const struct lu_attr *attr, struct thandle *handle) { - struct osd_thandle *oh; - struct osd_object *obj; - - LASSERT(dt != NULL); - LASSERT(handle != NULL); - - obj = osd_dt_obj(dt); - LASSERT(osd_invariant(obj)); - - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); - - OSD_DECLARE_OP(oh, attr_set); - oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; - - if (attr && attr->la_valid & LA_UID) { - if (obj->oo_inode) - osd_declare_qid(dt, oh, USRQUOTA, obj->oo_inode->i_uid, - obj->oo_inode); - osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL); - } - if (attr && attr->la_valid & LA_GID) { - if (obj->oo_inode) - osd_declare_qid(dt, oh, GRPQUOTA, obj->oo_inode->i_gid, - obj->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL); - } - - return 0; + struct osd_thandle *oh; + struct osd_object *obj; + struct osd_thread_info *info = osd_oti_get(env); + struct lquota_id_info *qi = &info->oti_qi; + long long bspace; + int rc = 0; + bool allocated; + ENTRY; + + LASSERT(dt != NULL); + LASSERT(handle != NULL); + + obj = osd_dt_obj(dt); + LASSERT(osd_invariant(obj)); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + osd_trans_declare_op(env, oh, OSD_OT_ATTR_SET, + osd_dto_credits_noquota[DTO_ATTR_SET_BASE]); + + if (attr == NULL || obj->oo_inode == NULL) + RETURN(rc); + + bspace = obj->oo_inode->i_blocks; + bspace <<= obj->oo_inode->i_sb->s_blocksize_bits; + bspace = toqb(bspace); + + /* Changing ownership is always preformed by super user, it should not + * fail with EDQUOT. + * + * We still need to call the osd_declare_qid() to calculate the journal + * credits for updating quota accounting files and to trigger quota + * space adjustment once the operation is completed.*/ + if ((attr->la_valid & LA_UID) != 0 && + attr->la_uid != obj->oo_inode->i_uid) { + qi->lqi_type = USRQUOTA; + + /* inode accounting */ + qi->lqi_is_blk = false; + + /* one more inode for the new owner ... */ + qi->lqi_id.qid_uid = attr->la_uid; + qi->lqi_space = 1; + allocated = (attr->la_uid == 0) ? true : false; + rc = osd_declare_qid(env, oh, qi, allocated, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* and one less inode for the current uid */ + qi->lqi_id.qid_uid = obj->oo_inode->i_uid; + qi->lqi_space = -1; + rc = osd_declare_qid(env, oh, qi, true, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* block accounting */ + qi->lqi_is_blk = true; + + /* more blocks for the new owner ... */ + qi->lqi_id.qid_uid = attr->la_uid; + qi->lqi_space = bspace; + allocated = (attr->la_uid == 0) ? true : false; + rc = osd_declare_qid(env, oh, qi, allocated, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* and finally less blocks for the current owner */ + qi->lqi_id.qid_uid = obj->oo_inode->i_uid; + qi->lqi_space = -bspace; + rc = osd_declare_qid(env, oh, qi, true, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + } + + if (attr->la_valid & LA_GID && + attr->la_gid != obj->oo_inode->i_gid) { + qi->lqi_type = GRPQUOTA; + + /* inode accounting */ + qi->lqi_is_blk = false; + + /* one more inode for the new group owner ... */ + qi->lqi_id.qid_gid = attr->la_gid; + qi->lqi_space = 1; + allocated = (attr->la_gid == 0) ? true : false; + rc = osd_declare_qid(env, oh, qi, allocated, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* and one less inode for the current gid */ + qi->lqi_id.qid_gid = obj->oo_inode->i_gid; + qi->lqi_space = -1; + rc = osd_declare_qid(env, oh, qi, true, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* block accounting */ + qi->lqi_is_blk = true; + + /* more blocks for the new owner ... */ + qi->lqi_id.qid_gid = attr->la_gid; + qi->lqi_space = bspace; + allocated = (attr->la_gid == 0) ? true : false; + rc = osd_declare_qid(env, oh, qi, allocated, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* and finally less blocks for the current owner */ + qi->lqi_id.qid_gid = obj->oo_inode->i_gid; + qi->lqi_space = -bspace; + rc = osd_declare_qid(env, oh, qi, true, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + } + + RETURN(rc); } static int osd_inode_setattr(const struct lu_env *env, @@ -1315,30 +1558,6 @@ static int osd_inode_setattr(const struct lu_env *env, bits = attr->la_valid; - LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */ - -#ifdef HAVE_QUOTA_SUPPORT - if ((bits & LA_UID && attr->la_uid != inode->i_uid) || - (bits & LA_GID && attr->la_gid != inode->i_gid)) { - struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt; - struct iattr iattr; - int rc; - - iattr.ia_valid = 0; - if (bits & LA_UID) - iattr.ia_valid |= ATTR_UID; - if (bits & LA_GID) - iattr.ia_valid |= ATTR_GID; - iattr.ia_uid = attr->la_uid; - iattr.ia_gid = attr->la_gid; - osd_push_ctxt(env, save); - rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0; - osd_pop_ctxt(save); - if (rc != 0) - return rc; - } -#endif - if (bits & LA_ATIME) inode->i_atime = *osd_inode_time(env, inode, attr->la_atime); if (bits & LA_CTIME) @@ -1364,7 +1583,7 @@ static int osd_inode_setattr(const struct lu_env *env, if (bits & LA_GID) inode->i_gid = attr->la_gid; if (bits & LA_NLINK) - inode->i_nlink = attr->la_nlink; + set_nlink(inode, attr->la_nlink); if (bits & LA_RDEV) inode->i_rdev = attr->la_rdev; @@ -1376,6 +1595,32 @@ static int osd_inode_setattr(const struct lu_env *env, return 0; } +static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr) +{ + if ((attr->la_valid & LA_UID && attr->la_uid != inode->i_uid) || + (attr->la_valid & LA_GID && attr->la_gid != inode->i_gid)) { + struct iattr iattr; + int rc; + + iattr.ia_valid = 0; + if (attr->la_valid & LA_UID) + iattr.ia_valid |= ATTR_UID; + if (attr->la_valid & LA_GID) + iattr.ia_valid |= ATTR_GID; + iattr.ia_uid = attr->la_uid; + iattr.ia_gid = attr->la_gid; + + rc = ll_vfs_dq_transfer(inode, &iattr); + if (rc) { + CERROR("%s: quota transfer failed: rc = %d. Is quota " + "enforcement enabled on the ldiskfs filesystem?", + inode->i_sb->s_id, rc); + return rc; + } + } + return 0; +} + static int osd_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, @@ -1383,67 +1628,66 @@ static int osd_attr_set(const struct lu_env *env, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode; int rc; LASSERT(handle != NULL); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(osd_invariant(obj)); if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; - OSD_EXEC_OP(handle, attr_set); - - cfs_spin_lock(&obj->oo_guard); - rc = osd_inode_setattr(env, obj->oo_inode, attr); - cfs_spin_unlock(&obj->oo_guard); + osd_trans_exec_op(env, handle, OSD_OT_ATTR_SET); + + if (OBD_FAIL_CHECK(OBD_FAIL_OSD_FID_MAPPING)) { + struct osd_thread_info *oti = osd_oti_get(env); + const struct lu_fid *fid0 = lu_object_fid(&dt->do_lu); + struct lu_fid *fid1 = &oti->oti_fid; + struct osd_inode_id *id = &oti->oti_id; + struct iam_path_descr *ipd; + struct iam_container *bag; + struct osd_thandle *oh; + int rc; + + fid_cpu_to_be(fid1, fid0); + memset(id, 1, sizeof(*id)); + bag = &osd_fid2oi(osd_dev(dt->do_lu.lo_dev), + fid0)->oi_dir.od_container; + ipd = osd_idx_ipd_get(env, bag); + if (unlikely(ipd == NULL)) + RETURN(-ENOMEM); + + oh = container_of0(handle, struct osd_thandle, ot_super); + rc = iam_update(oh->ot_handle, bag, (const struct iam_key *)fid1, + (const struct iam_rec *)id, ipd); + osd_ipd_put(env, bag, ipd); + return(rc > 0 ? 0 : rc); + } + + inode = obj->oo_inode; + ll_vfs_dq_init(inode); + + rc = osd_quota_transfer(inode, attr); + if (rc) + return rc; + + spin_lock(&obj->oo_guard); + rc = osd_inode_setattr(env, inode, attr); + spin_unlock(&obj->oo_guard); if (!rc) - obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode); + ll_dirty_inode(inode, I_DIRTY_DATASYNC); return rc; } -/* - * Object creation. - * - * XXX temporary solution. - */ -static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj, - struct lu_attr *attr, struct thandle *th) -{ - return 0; -} - -static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj, - struct lu_attr *attr, struct thandle *th) -{ - osd_object_init0(obj); - if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW)) - unlock_new_inode(obj->oo_inode); - return 0; -} - -static struct dentry * osd_child_dentry_get(const struct lu_env *env, - struct osd_object *obj, - const char *name, - const int namelen) +struct dentry *osd_child_dentry_get(const struct lu_env *env, + struct osd_object *obj, + const char *name, const int namelen) { - struct osd_thread_info *info = osd_oti_get(env); - struct dentry *child_dentry = &info->oti_child_dentry; - struct dentry *obj_dentry = &info->oti_obj_dentry; - - obj_dentry->d_inode = obj->oo_inode; - obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); - obj_dentry->d_name.hash = 0; - - child_dentry->d_name.hash = 0; - child_dentry->d_parent = obj_dentry; - child_dentry->d_name.name = name; - child_dentry->d_name.len = namelen; - return child_dentry; + return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen); } - static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, cfs_umode_t mode, struct dt_allocation_hint *hint, @@ -1452,11 +1696,8 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, int result; struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oth; - struct dt_object *parent; + struct dt_object *parent = NULL; struct inode *inode; -#ifdef HAVE_QUOTA_SUPPORT - struct osd_ctxt *save = &info->oti_ctxt; -#endif LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); @@ -1473,25 +1714,20 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, if (hint && hint->dah_parent) parent = hint->dah_parent; - else - parent = osd->od_obj_area; - - LASSERT(parent != NULL); - LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL); -#ifdef HAVE_QUOTA_SUPPORT - osd_push_ctxt(info->oti_env, save); -#endif inode = ldiskfs_create_inode(oth->ot_handle, - osd_dt_obj(parent)->oo_inode, mode); -#ifdef HAVE_QUOTA_SUPPORT - osd_pop_ctxt(save); -#endif + parent ? osd_dt_obj(parent)->oo_inode : + osd_sb(osd)->s_root->d_inode, + mode); if (!IS_ERR(inode)) { /* Do not update file c/mtime in ldiskfs. * NB: don't need any lock because no contention at this * early stage */ inode->i_flags |= S_NOCMTIME; + + /* For new created object, it must be consistent, + * and it is unnecessary to scrub against it. */ + ldiskfs_set_inode_state(inode, LDISKFS_STATE_LUSTRE_NOSCRUB); obj->oo_inode = inode; result = 0; } else { @@ -1517,7 +1753,6 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, { int result; struct osd_thandle *oth; - struct osd_device *osd = osd_obj2dev(obj); __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX)); LASSERT(S_ISDIR(attr->la_mode)); @@ -1525,16 +1760,7 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle->h_transaction != NULL); result = osd_mkfile(info, obj, mode, hint, th); - if (result == 0 && osd->od_iop_mode == 0) { - LASSERT(obj->oo_inode != NULL); - /* - * XXX uh-oh... call low-level iam function directly. - */ - result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4, - sizeof (struct osd_fid_pack), - oth->ot_handle); - } return result; } @@ -1548,7 +1774,7 @@ static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj, struct osd_thandle *oth; const struct dt_index_features *feat = dof->u.dof_idx.di_feat; - __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX)); + __u32 mode = (attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX)); LASSERT(S_ISREG(attr->la_mode)); @@ -1583,7 +1809,7 @@ static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj, { LASSERT(S_ISREG(attr->la_mode)); return osd_mkfile(info, obj, (attr->la_mode & - (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th); + (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th); } static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj, @@ -1594,7 +1820,7 @@ static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj, { LASSERT(S_ISLNK(attr->la_mode)); return osd_mkfile(info, obj, (attr->la_mode & - (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th); + (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th); } static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj, @@ -1603,7 +1829,7 @@ static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj, struct dt_object_format *dof, struct thandle *th) { - cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX); + cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX); int result; LINVRNT(osd_invariant(obj)); @@ -1614,7 +1840,12 @@ static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj, result = osd_mkfile(info, obj, mode, hint, th); if (result == 0) { LASSERT(obj->oo_inode != NULL); - init_special_inode(obj->oo_inode, mode, attr->la_rdev); + /* + * This inode should be marked dirty for i_rdev. Currently + * that is done in the osd_attr_init(). + */ + init_special_inode(obj->oo_inode, obj->oo_inode->i_mode, + attr->la_rdev); } LINVRNT(osd_invariant(obj)); return result; @@ -1656,7 +1887,8 @@ static osd_obj_type_f osd_create_type_f(enum dt_format_type type) static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, - struct dt_object *parent, cfs_umode_t child_mode) + struct dt_object *parent, struct dt_object *child, + cfs_umode_t child_mode) { LASSERT(ah); @@ -1665,6 +1897,44 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, ah->dah_mode = child_mode; } +static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj, + struct lu_attr *attr, struct dt_object_format *dof) +{ + struct inode *inode = obj->oo_inode; + __u64 valid = attr->la_valid; + int result; + + attr->la_valid &= ~(LA_TYPE | LA_MODE); + + if (dof->dof_type != DFT_NODE) + attr->la_valid &= ~LA_RDEV; + if ((valid & LA_ATIME) && (attr->la_atime == LTIME_S(inode->i_atime))) + attr->la_valid &= ~LA_ATIME; + if ((valid & LA_CTIME) && (attr->la_ctime == LTIME_S(inode->i_ctime))) + attr->la_valid &= ~LA_CTIME; + if ((valid & LA_MTIME) && (attr->la_mtime == LTIME_S(inode->i_mtime))) + attr->la_valid &= ~LA_MTIME; + + result = osd_quota_transfer(inode, attr); + if (result) + return; + + if (attr->la_valid != 0) { + result = osd_inode_setattr(info->oti_env, inode, attr); + /* + * The osd_inode_setattr() should always succeed here. The + * only error that could be returned is EDQUOT when we are + * trying to change the UID or GID of the inode. However, this + * should not happen since quota enforcement is no longer + * enabled on ldiskfs (lquota takes care of it). + */ + LASSERTF(result == 0, "%d", result); + ll_dirty_inode(inode, I_DIRTY_DATASYNC); + } + + attr->la_valid = valid; +} + /** * Helper function for osd_object_create() * @@ -1676,17 +1946,27 @@ static int __osd_object_create(struct osd_thread_info *info, struct dt_object_format *dof, struct thandle *th) { + int result; + __u32 umask; - int result; + /* we drop umask so that permissions we pass are not affected */ + umask = current->fs->umask; + current->fs->umask = 0; - result = osd_create_pre(info, obj, attr, th); + result = osd_create_type_f(dof->dof_type)(info, obj, attr, hint, dof, + th); if (result == 0) { - result = osd_create_type_f(dof->dof_type)(info, obj, - attr, hint, dof, th); - if (result == 0) - result = osd_create_post(info, obj, attr, th); + osd_attr_init(info, obj, attr, dof); + osd_object_init0(obj); + /* bz 24037 */ + if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW)) + unlock_new_inode(obj->oo_inode); } - return result; + + /* restore previous umask value */ + current->fs->umask = umask; + + return result; } /** @@ -1700,60 +1980,107 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj, struct osd_thread_info *info = osd_oti_get(env); struct osd_inode_id *id = &info->oti_id; struct osd_device *osd = osd_obj2dev(obj); - struct md_ucred *uc = md_ucred(env); LASSERT(obj->oo_inode != NULL); - LASSERT(uc != NULL); - - id->oii_ino = obj->oo_inode->i_ino; - id->oii_gen = obj->oo_inode->i_generation; - return osd_oi_insert(info, osd_fid2oi(osd, fid), fid, id, th, - uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); + osd_id_gen(id, obj->oo_inode->i_ino, obj->oo_inode->i_generation); + return osd_oi_insert(info, osd, fid, id, th); } -static int osd_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *handle) +int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd, + const struct lu_fid *fid, struct lu_seq_range *range) { - struct osd_thandle *oh; - - LASSERT(handle != NULL); + struct seq_server_site *ss = osd_seq_site(osd); + int rc; - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); + if (fid_is_idif(fid)) { + range->lsr_flags = LU_SEQ_RANGE_OST; + range->lsr_index = fid_idif_ost_idx(fid); + return 0; + } - OSD_DECLARE_OP(oh, create); - oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE]; - /* XXX: So far, only normal fid needs be inserted into the oi, - * things could be changed later. Revise following code then. */ - if (fid_is_norm(lu_object_fid(&dt->do_lu))) { - OSD_DECLARE_OP(oh, insert); - oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; - } - /* If this is directory, then we expect . and .. to be inserted as - * well. The one directory block always needs to be created for the - * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap, - * block), there is no danger of needing a tree for the first block. - */ - if (attr && S_ISDIR(attr->la_mode)) { - OSD_DECLARE_OP(oh, insert); - OSD_DECLARE_OP(oh, insert); - oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BASE]; - } + if (!fid_seq_in_fldb(fid_seq(fid))) { + range->lsr_flags = LU_SEQ_RANGE_MDT; + if (ss != NULL) + /* FIXME: If ss is NULL, it suppose not get lsr_index + * at all */ + range->lsr_index = ss->ss_node_id; + return 0; + } - if (attr) { - osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL); - osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL); - } - return 0; + LASSERT(ss != NULL); + range->lsr_flags = -1; + rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range); + if (rc != 0) { + CERROR("%s can not find "DFID": rc = %d\n", + osd_name(osd), PFID(fid), rc); + } + return rc; } -static int osd_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, +/* + * Concurrency: no external locking is necessary. + */ +static int osd_declare_object_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *handle) +{ + struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range; + struct osd_thandle *oh; + int rc; + ENTRY; + + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + osd_trans_declare_op(env, oh, OSD_OT_CREATE, + osd_dto_credits_noquota[DTO_OBJECT_CREATE]); + if (!fid_is_on_ost(osd_oti_get(env), osd_dt_dev(handle->th_dev), + lu_object_fid(&dt->do_lu))) + /* Reuse idle OI block may cause additional one OI block + * to be changed. */ + osd_trans_declare_op(env, oh, OSD_OT_INSERT, + osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1); + + /* If this is directory, then we expect . and .. to be inserted as + * well. The one directory block always needs to be created for the + * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap, + * block), there is no danger of needing a tree for the first block. + */ + if (attr && S_ISDIR(attr->la_mode)) { + osd_trans_declare_op(env, oh, OSD_OT_INSERT, + osd_dto_credits_noquota[DTO_WRITE_BASE]); + osd_trans_declare_op(env, oh, OSD_OT_INSERT, 0); + } + + if (!attr) + RETURN(0); + + rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh, + false, false, NULL, false); + if (rc != 0) + RETURN(rc); + + /* It does fld look up inside declare, and the result will be + * added to fld cache, so the following fld lookup inside insert + * does not need send RPC anymore, so avoid send rpc with holding + * transaction */ + if (fid_is_norm(lu_object_fid(&dt->do_lu)) && + !fid_is_last_id(lu_object_fid(&dt->do_lu))) + osd_fld_lookup(env, osd_dt_dev(handle->th_dev), + lu_object_fid(&dt->do_lu), range); + + + RETURN(rc); +} + +static int osd_object_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) @@ -1766,17 +2093,25 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, ENTRY; LINVRNT(osd_invariant(obj)); - LASSERT(!dt_object_exists(dt)); + LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - OSD_EXEC_OP(th, create); + if (unlikely(fid_is_acct(fid))) + /* Quota files can't be created from the kernel any more, + * 'tune2fs -O quota' will take care of creating them */ + RETURN(-EPERM); + + osd_trans_exec_op(env, th, OSD_OT_CREATE); + osd_trans_declare_rb(env, th, OSD_OT_REF_ADD); result = __osd_object_create(info, obj, attr, hint, dof, th); if (result == 0) result = __osd_oi_insert(env, obj, fid, th); - LASSERT(ergo(result == 0, dt_object_exists(dt))); + LASSERT(ergo(result == 0, + dt_object_exists(dt) && !dt_object_remote(dt))); + LASSERT(osd_invariant(obj)); RETURN(result); } @@ -1787,28 +2122,35 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, * Concurrency: must be locked */ static int osd_declare_object_destroy(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) -{ - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - struct osd_thandle *oh; - - ENTRY; - - oh = container_of0(th, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); - LASSERT(inode); - - OSD_DECLARE_OP(oh, destroy); - OSD_DECLARE_OP(oh, delete); - oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE]; - oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; - - osd_declare_qid(dt, oh, USRQUOTA, inode->i_uid, inode); - osd_declare_qid(dt, oh, GRPQUOTA, inode->i_gid, inode); - - RETURN(0); + struct dt_object *dt, + struct thandle *th) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thandle *oh; + int rc; + ENTRY; + + oh = container_of0(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + LASSERT(inode); + + osd_trans_declare_op(env, oh, OSD_OT_DELETE, + osd_dto_credits_noquota[DTO_OBJECT_DELETE]); + /* Recycle idle OI leaf may cause additional three OI blocks + * to be changed. */ + osd_trans_declare_op(env, oh, OSD_OT_DESTROY, + osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3); + + /* one less inode */ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh, + false, true, NULL, false); + if (rc) + RETURN(rc); + /* data to be truncated */ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, + true, true, NULL, false); + RETURN(rc); } static int osd_object_destroy(const struct lu_env *env, @@ -1828,21 +2170,32 @@ static int osd_object_destroy(const struct lu_env *env, LASSERT(inode); LASSERT(!lu_object_is_dying(dt->do_lu.lo_header)); - if (S_ISDIR(inode->i_mode)) { - LASSERT(osd_inode_unlinked(inode) || - inode->i_nlink == 1); - cfs_spin_lock(&obj->oo_guard); - inode->i_nlink = 0; - cfs_spin_unlock(&obj->oo_guard); - inode->i_sb->s_op->dirty_inode(inode); - } else { - LASSERT(osd_inode_unlinked(inode)); - } - - OSD_EXEC_OP(th, destroy); - - result = osd_oi_delete(osd_oti_get(env), - osd_fid2oi(osd, fid), fid, th); + if (unlikely(fid_is_acct(fid))) + RETURN(-EPERM); + + /* Parallel control for OI scrub. For most of cases, there is no + * lock contention. So it will not affect unlink performance. */ + mutex_lock(&inode->i_mutex); + if (S_ISDIR(inode->i_mode)) { + LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1); + /* it will check/delete the agent inode for every dir + * destory, how to optimize it? unlink performance + * impaction XXX */ + result = osd_delete_from_agent(env, osd, obj, oh); + if (result != 0 && result != -ENOENT) { + CERROR("%s: delete agent inode "DFID": rc = %d\n", + osd_name(osd), PFID(fid), result); + } + spin_lock(&obj->oo_guard); + clear_nlink(inode); + spin_unlock(&obj->oo_guard); + ll_dirty_inode(inode, I_DIRTY_DATASYNC); + } + + osd_trans_exec_op(env, th, OSD_OT_DESTROY); + + result = osd_oi_delete(osd_oti_get(env), osd, fid, th); + mutex_unlock(&inode->i_mutex); /* XXX: add to ext3 orphan list */ /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */ @@ -1853,33 +2206,15 @@ static int osd_object_destroy(const struct lu_env *env, RETURN(0); } -/** - * Helper function for osd_xattr_set() - */ -static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, const char *name, int fl) -{ - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_child_dentry; - int fs_flags = 0; - int rc; - - LASSERT(dt_object_exists(dt)); - LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL); - LASSERT(osd_write_locked(env, obj)); - - if (fl & LU_XATTR_REPLACE) - fs_flags |= XATTR_REPLACE; - - if (fl & LU_XATTR_CREATE) - fs_flags |= XATTR_CREATE; +static inline int __osd_xattr_set(struct osd_thread_info *info, + struct inode *inode, const char *name, + const void *buf, int buflen, int fl) +{ + struct dentry *dentry = &info->oti_child_dentry; - dentry->d_inode = inode; - rc = inode->i_op->setxattr(dentry, name, buf->lb_buf, - buf->lb_len, fs_flags); - return rc; + ll_vfs_dq_init(inode); + dentry->d_inode = inode; + return inode->i_op->setxattr(dentry, name, buf, buflen, fl); } /** @@ -1892,38 +2227,27 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, * * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here */ -static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_fid *fid) +int osd_ea_fid_set(struct osd_thread_info *info, struct inode *inode, + const struct lu_fid *fid) { - struct osd_thread_info *info = osd_oti_get(env); - struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs; + struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; + int rc; - lustre_lma_init(mdt_attrs, fid); - lustre_lma_swab(mdt_attrs); - return __osd_xattr_set(env, dt, - osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs), - XATTR_NAME_LMA, LU_XATTR_CREATE); + if (OBD_FAIL_CHECK(OBD_FAIL_FID_INLMA)) + return 0; -} + if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF) && fid_is_client_visible(fid)) + return 0; -/** - * Helper function to form igif - */ -static inline void osd_igif_get(const struct lu_env *env, struct inode *inode, - struct lu_fid *fid) -{ - LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation); -} + lustre_lma_init(lma, fid); + lustre_lma_swab(lma); -/** - * Helper function to pack the fid, ldiskfs stores fid in packed format. - */ -void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid, - struct lu_fid *befider) -{ - fid_cpu_to_be(befider, (struct lu_fid *)fid); - memcpy(pack->fp_area, befider, sizeof(*befider)); - pack->fp_len = sizeof(*befider) + 1; + rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, lma, sizeof(*lma), + XATTR_CREATE); + /* Someone may created the EA by race. */ + if (unlikely(rc == -EEXIST)) + rc = 0; + return rc; } /** @@ -1935,89 +2259,129 @@ void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid, * its inmemory API. */ void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param, - const struct dt_rec *fid) + const struct dt_rec *fid) { - param->edp_magic = LDISKFS_LUFID_MAGIC; - param->edp_len = sizeof(struct lu_fid) + 1; + if (!fid_is_client_mdt_visible((const struct lu_fid *)fid)) { + param->edp_magic = 0; + return; + } - fid_cpu_to_be((struct lu_fid *)param->edp_data, - (struct lu_fid *)fid); -} - -int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack) -{ - int result; - - result = 0; - switch (pack->fp_len) { - case sizeof *fid + 1: - memcpy(fid, pack->fp_area, sizeof *fid); - fid_be_to_cpu(fid, fid); - break; - default: - CERROR("Unexpected packed fid size: %d\n", pack->fp_len); - result = -EIO; - } - return result; + param->edp_magic = LDISKFS_LUFID_MAGIC; + param->edp_len = sizeof(struct lu_fid) + 1; + fid_cpu_to_be((struct lu_fid *)param->edp_data, (struct lu_fid *)fid); } /** - * Try to read the fid from inode ea into dt_rec, if return value - * i.e. rc is +ve, then we got fid, otherwise we will have to form igif + * Try to read the fid from inode ea into dt_rec. * * \param fid object fid. * * \retval 0 on success */ static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj, - __u32 ino, struct lu_fid *fid) + __u32 ino, struct lu_fid *fid, + struct osd_inode_id *id) { - struct osd_thread_info *info = osd_oti_get(env); - struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs; - struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; - struct dentry *dentry = &info->oti_child_dentry; - struct osd_inode_id *id = &info->oti_id; - struct osd_device *dev; - struct inode *inode; - int rc; + struct osd_thread_info *info = osd_oti_get(env); + struct inode *inode; + ENTRY; - ENTRY; - dev = osd_dev(ldev); + osd_id_gen(id, ino, OSD_OII_NOGEN); + inode = osd_iget_fid(info, osd_obj2dev(obj), id, fid); + if (IS_ERR(inode)) + RETURN(PTR_ERR(inode)); - id->oii_ino = ino; - id->oii_gen = OSD_OII_NOGEN; + iput(inode); + RETURN(0); +} - inode = osd_iget(info, dev, id); - if (IS_ERR(inode)) { - rc = PTR_ERR(inode); - GOTO(out,rc); - } - dentry->d_inode = inode; +static int osd_add_dot_dotdot_internal(struct osd_thread_info *info, + struct inode *dir, + struct inode *parent_dir, + const struct dt_rec *dot_fid, + const struct dt_rec *dot_dot_fid, + struct osd_thandle *oth) +{ + struct ldiskfs_dentry_param *dot_ldp; + struct ldiskfs_dentry_param *dot_dot_ldp; - LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); - rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs, - sizeof *mdt_attrs); - - /* Check LMA compatibility */ - if (rc > 0 && - (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) { - CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n", - inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) & - ~LMA_INCOMPAT_SUPP); - return -ENOSYS; - } + dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2; + osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid); - if (rc > 0) { - lustre_lma_swab(mdt_attrs); - memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid)); - rc = 0; - } else if (rc == -ENODATA) { - osd_igif_get(env, inode, fid); - rc = 0; - } - iput(inode); -out: - RETURN(rc); + dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; + dot_ldp->edp_magic = 0; + return ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, + dir, dot_ldp, dot_dot_ldp); +} + +/** + * Create an local inode for remote entry + */ +static struct inode *osd_create_remote_inode(const struct lu_env *env, + struct osd_device *osd, + struct osd_object *pobj, + const struct lu_fid *fid, + struct thandle *th) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct inode *local; + struct osd_thandle *oh; + int rc; + ENTRY; + + LASSERT(th); + oh = container_of(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle->h_transaction != NULL); + + /* FIXME: Insert index api needs to know the mode of + * the remote object. Just use S_IFDIR for now */ + local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR); + if (IS_ERR(local)) { + CERROR("%s: create local error %d\n", osd_name(osd), + (int)PTR_ERR(local)); + RETURN(local); + } + + rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode, + (const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu), + (const struct dt_rec *)fid, oh); + if (rc != 0) { + CERROR("%s: "DFID" add dot dotdot error: rc = %d\n", + osd_name(osd), PFID(fid), rc); + RETURN(ERR_PTR(rc)); + } + + RETURN(local); +} + +/** + * Delete local inode for remote entry + */ +static int osd_delete_remote_inode(const struct lu_env *env, + struct osd_device *osd, + const struct lu_fid *fid, + __u32 ino, struct osd_thandle *oh) +{ + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_inode_id *id = &oti->oti_id; + struct inode *inode; + ENTRY; + + id->oii_ino = le32_to_cpu(ino); + id->oii_gen = OSD_OII_NOGEN; + inode = osd_iget(oti, osd, id); + if (IS_ERR(inode)) { + CERROR("%s: iget error "DFID" id %u:%u\n", osd_name(osd), + PFID(fid), id->oii_ino, id->oii_gen); + RETURN(PTR_ERR(inode)); + } + + clear_nlink(inode); + mark_inode_dirty(inode); + CDEBUG(D_INODE, "%s: delete remote inode "DFID" %lu\n", + osd_name(osd), PFID(fid), inode->i_ino); + iput(inode); + RETURN(0); } /** @@ -2042,22 +2406,29 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(osd_invariant(obj)); - LASSERT(!dt_object_exists(dt)); + LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - OSD_EXEC_OP(th, create); + if (unlikely(fid_is_acct(fid))) + /* Quota files can't be created from the kernel any more, + * 'tune2fs -O quota' will take care of creating them */ + RETURN(-EPERM); - result = __osd_object_create(info, obj, attr, hint, dof, th); + osd_trans_exec_op(env, th, OSD_OT_CREATE); + osd_trans_declare_rb(env, th, OSD_OT_REF_ADD); - /* objects under osd root shld have igif fid, so dont add fid EA */ - if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL) - result = osd_ea_fid_set(env, dt, fid); + result = __osd_object_create(info, obj, attr, hint, dof, th); + if ((result == 0) && + (fid_is_last_id(fid) || + !fid_is_on_ost(info, osd_dt_dev(th->th_dev), fid))) + result = osd_ea_fid_set(info, obj->oo_inode, fid); if (result == 0) result = __osd_oi_insert(env, obj, fid, th); - LASSERT(ergo(result == 0, dt_object_exists(dt))); + LASSERT(ergo(result == 0, + dt_object_exists(dt) && !dt_object_remote(dt))); LINVRNT(osd_invariant(obj)); RETURN(result); } @@ -2066,7 +2437,7 @@ static int osd_declare_object_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *handle) { - struct osd_thandle *oh; + struct osd_thandle *oh; /* it's possible that object doesn't exist yet */ LASSERT(handle != NULL); @@ -2074,10 +2445,10 @@ static int osd_declare_object_ref_add(const struct lu_env *env, oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle == NULL); - OSD_DECLARE_OP(oh, ref_add); - oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + osd_trans_declare_op(env, oh, OSD_OT_REF_ADD, + osd_dto_credits_noquota[DTO_ATTR_SET_BASE]); - return 0; + return 0; } /* @@ -2090,35 +2461,39 @@ static int osd_object_ref_add(const struct lu_env *env, struct inode *inode = obj->oo_inode; LINVRNT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - OSD_EXEC_OP(th, ref_add); - - /* - * DIR_NLINK feature is set for compatibility reasons if: - * 1) nlinks > LDISKFS_LINK_MAX, or - * 2) nlinks == 2, since this indicates i_nlink was previously 1. - * - * It is easier to always set this flag (rather than check and set), - * since it has less overhead, and the superblock will be dirtied - * at some point. Both e2fsprogs and any Lustre-supported ldiskfs - * do not actually care whether this flag is set or not. - */ - cfs_spin_lock(&obj->oo_guard); - inode->i_nlink++; - if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) { - if (inode->i_nlink >= LDISKFS_LINK_MAX || - inode->i_nlink == 2) - inode->i_nlink = 1; - } - LASSERT(inode->i_nlink < LDISKFS_LINK_MAX); - cfs_spin_unlock(&obj->oo_guard); - inode->i_sb->s_op->dirty_inode(inode); - LINVRNT(osd_invariant(obj)); - - return 0; + osd_trans_exec_op(env, th, OSD_OT_REF_ADD); + + /* + * DIR_NLINK feature is set for compatibility reasons if: + * 1) nlinks > LDISKFS_LINK_MAX, or + * 2) nlinks == 2, since this indicates i_nlink was previously 1. + * + * It is easier to always set this flag (rather than check and set), + * since it has less overhead, and the superblock will be dirtied + * at some point. Both e2fsprogs and any Lustre-supported ldiskfs + * do not actually care whether this flag is set or not. + */ + spin_lock(&obj->oo_guard); + /* inc_nlink from 0 may cause WARN_ON */ + if(inode->i_nlink == 0) + set_nlink(inode, 1); + else + inc_nlink(inode); + if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) { + if (inode->i_nlink >= LDISKFS_LINK_MAX || + inode->i_nlink == 2) + set_nlink(inode, 1); + } + LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX); + spin_unlock(&obj->oo_guard); + ll_dirty_inode(inode, I_DIRTY_DATASYNC); + LINVRNT(osd_invariant(obj)); + + return 0; } static int osd_declare_object_ref_del(const struct lu_env *env, @@ -2127,16 +2502,16 @@ static int osd_declare_object_ref_del(const struct lu_env *env, { struct osd_thandle *oh; - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(handle != NULL); oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle == NULL); - OSD_DECLARE_OP(oh, ref_del); - oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + osd_trans_declare_op(env, oh, OSD_OT_REF_DEL, + osd_dto_credits_noquota[DTO_ATTR_SET_BASE]); - return 0; + return 0; } /* @@ -2149,25 +2524,25 @@ static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt, struct inode *inode = obj->oo_inode; LINVRNT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - OSD_EXEC_OP(th, ref_del); - - cfs_spin_lock(&obj->oo_guard); - LASSERT(inode->i_nlink > 0); - inode->i_nlink--; - /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX) - * then the nlink count is 1. Don't let it be set to 0 or the directory - * inode will be deleted incorrectly. */ - if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0) - inode->i_nlink++; - cfs_spin_unlock(&obj->oo_guard); - inode->i_sb->s_op->dirty_inode(inode); - LINVRNT(osd_invariant(obj)); + osd_trans_exec_op(env, th, OSD_OT_REF_DEL); - return 0; + spin_lock(&obj->oo_guard); + LASSERT(inode->i_nlink > 0); + drop_nlink(inode); + /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX) + * then the nlink count is 1. Don't let it be set to 0 or the directory + * inode will be deleted incorrectly. */ + if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0) + set_nlink(inode, 1); + spin_unlock(&obj->oo_guard); + ll_dirty_inode(inode, I_DIRTY_DATASYNC); + LINVRNT(osd_invariant(obj)); + + return 0; } /* @@ -2205,15 +2580,13 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, return sizeof(dt_obj_version_t); } - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); - LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj)); if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) return -EACCES; - dentry->d_inode = inode; - return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len); + return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len); } @@ -2222,22 +2595,19 @@ static int osd_declare_xattr_set(const struct lu_env *env, const struct lu_buf *buf, const char *name, int fl, struct thandle *handle) { - struct osd_thandle *oh; + struct osd_thandle *oh; - LASSERT(handle != NULL); - - if (strcmp(name, XATTR_NAME_VERSION) == 0) { - /* no credits for version */ - return 0; - } + LASSERT(handle != NULL); - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); - OSD_DECLARE_OP(oh, xattr_set); - oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET]; + osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET, + strcmp(name, XATTR_NAME_VERSION) == 0 ? + osd_dto_credits_noquota[DTO_ATTR_SET_BASE] : + osd_dto_credits_noquota[DTO_XATTR_SET]); - return 0; + return 0; } /* @@ -2255,7 +2625,7 @@ static void osd_object_version_set(const struct lu_env *env, LDISKFS_I(inode)->i_fs_version = *new_version; /** Version is set after all inode operations are finished, * so we should mark it dirty here */ - inode->i_sb->s_op->dirty_inode(inode); + ll_dirty_inode(inode, I_DIRTY_DATASYNC); } /* @@ -2265,6 +2635,12 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *handle, struct lustre_capa *capa) { + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thread_info *info = osd_oti_get(env); + int fs_flags = 0; + ENTRY; + LASSERT(handle != NULL); /* version set is not real XATTR */ @@ -2279,8 +2655,15 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; - OSD_EXEC_OP(handle, xattr_set); - return __osd_xattr_set(env, dt, buf, name, fl); + osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET); + if (fl & LU_XATTR_REPLACE) + fs_flags |= XATTR_REPLACE; + + if (fl & LU_XATTR_CREATE) + fs_flags |= XATTR_CREATE; + + return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len, + fs_flags); } /* @@ -2294,7 +2677,7 @@ static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt, struct osd_thread_info *info = osd_oti_get(env); struct dentry *dentry = &info->oti_obj_dentry; - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL); LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj)); @@ -2311,16 +2694,16 @@ static int osd_declare_xattr_del(const struct lu_env *env, { struct osd_thandle *oh; - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(handle != NULL); oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle == NULL); - OSD_DECLARE_OP(oh, xattr_set); - oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET]; + osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET, + osd_dto_credits_noquota[DTO_XATTR_SET]); - return 0; + return 0; } /* @@ -2336,16 +2719,16 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, struct dentry *dentry = &info->oti_obj_dentry; int rc; - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL); - LASSERT(osd_write_locked(env, obj)); LASSERT(handle != NULL); if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; - OSD_EXEC_OP(handle, xattr_set); + osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET); + ll_vfs_dq_init(inode); dentry->d_inode = inode; rc = inode->i_op->removexattr(dentry, name); return rc; @@ -2370,7 +2753,7 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, if (!dev->od_fl_capa) RETURN(ERR_PTR(-ENOENT)); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LINVRNT(osd_invariant(obj)); /* renewal sanity check */ @@ -2421,9 +2804,9 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, RETURN(oc); } - cfs_spin_lock(&capa_lock); - *key = dev->od_capa_keys[1]; - cfs_spin_unlock(&capa_lock); + spin_lock(&capa_lock); + *key = dev->od_capa_keys[1]; + spin_unlock(&capa_lock); capa->lc_keyid = key->lk_keyid; capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout; @@ -2440,23 +2823,27 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) { - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_obj_dentry; - struct file *file = &info->oti_file; - int rc; - - ENTRY; - - dentry->d_inode = inode; - file->f_dentry = dentry; - file->f_mapping = inode->i_mapping; - file->f_op = inode->i_fop; - LOCK_INODE_MUTEX(inode); - rc = file->f_op->fsync(file, dentry, 0); - UNLOCK_INODE_MUTEX(inode); - RETURN(rc); + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *dentry = &info->oti_obj_dentry; + struct file *file = &info->oti_file; + int rc; + + ENTRY; + + dentry->d_inode = inode; + file->f_dentry = dentry; + file->f_mapping = inode->i_mapping; + file->f_op = inode->i_fop; +#ifndef HAVE_FILE_FSYNC_4ARGS + mutex_lock(&inode->i_mutex); +#endif + rc = do_fsync(file, 0); +#ifndef HAVE_FILE_FSYNC_4ARGS + mutex_unlock(&inode->i_mutex); +#endif + RETURN(rc); } static int osd_data_get(const struct lu_env *env, struct dt_object *dt, @@ -2515,17 +2902,6 @@ static int osd_iam_container_init(const struct lu_env *env, return result; result = iam_container_setup(bag); - if (result != 0) - goto out; - - if (osd_obj2dev(obj)->od_iop_mode) { - u32 ptr = bag->ic_descr->id_ops->id_root_ptr(bag); - - bag->ic_root_bh = ldiskfs_bread(NULL, obj->oo_inode, - ptr, 0, &result); - } - - out: if (result == 0) obj->oo_dt.do_index_ops = &osd_index_iam_ops; else @@ -2541,52 +2917,57 @@ static int osd_iam_container_init(const struct lu_env *env, static int osd_index_try(const struct lu_env *env, struct dt_object *dt, const struct dt_index_features *feat) { - int result; - int ea_dir = 0; - struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); + int result; + int skip_iam = 0; + struct osd_object *obj = osd_dt_obj(dt); LINVRNT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); if (osd_object_is_root(obj)) { dt->do_index_ops = &osd_index_ea_ops; result = 0; - } else if (feat == &dt_directory_features && osd->od_iop_mode) { + } else if (feat == &dt_directory_features) { dt->do_index_ops = &osd_index_ea_ops; - if (S_ISDIR(obj->oo_inode->i_mode)) + if (obj->oo_inode != NULL && S_ISDIR(obj->oo_inode->i_mode)) result = 0; else result = -ENOTDIR; - ea_dir = 1; + skip_iam = 1; + } else if (unlikely(feat == &dt_otable_features)) { + dt->do_index_ops = &osd_otable_ops; + return 0; + } else if (unlikely(feat == &dt_acct_features)) { + dt->do_index_ops = &osd_acct_index_ops; + result = 0; + skip_iam = 1; } else if (!osd_has_index(obj)) { struct osd_directory *dir; OBD_ALLOC_PTR(dir); if (dir != NULL) { - cfs_spin_lock(&obj->oo_guard); - if (obj->oo_dir == NULL) - obj->oo_dir = dir; - else - /* - * Concurrent thread allocated container data. - */ - OBD_FREE_PTR(dir); - cfs_spin_unlock(&obj->oo_guard); - /* - * Now, that we have container data, serialize its - * initialization. - */ - cfs_down_write(&obj->oo_ext_idx_sem); - /* - * recheck under lock. - */ - if (!osd_has_index(obj)) - result = osd_iam_container_init(env, obj, dir); - else - result = 0; - cfs_up_write(&obj->oo_ext_idx_sem); + spin_lock(&obj->oo_guard); + if (obj->oo_dir == NULL) + obj->oo_dir = dir; + else + /* + * Concurrent thread allocated container data. + */ + OBD_FREE_PTR(dir); + spin_unlock(&obj->oo_guard); + /* + * Now, that we have container data, serialize its + * initialization. + */ + down_write(&obj->oo_ext_idx_sem); + /* + * recheck under lock. + */ + if (!osd_has_index(obj)) + result = osd_iam_container_init(env, obj, dir); + else + result = 0; + up_write(&obj->oo_ext_idx_sem); } else { result = -ENOMEM; } @@ -2594,348 +2975,95 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, result = 0; } - if (result == 0 && ea_dir == 0) { - if (!osd_iam_index_probe(env, obj, feat)) - result = -ENOTDIR; - } - LINVRNT(osd_invariant(obj)); - - return result; -} - -static const struct dt_object_operations osd_obj_ops = { - .do_read_lock = osd_object_read_lock, - .do_write_lock = osd_object_write_lock, - .do_read_unlock = osd_object_read_unlock, - .do_write_unlock = osd_object_write_unlock, - .do_write_locked = osd_object_write_locked, - .do_attr_get = osd_attr_get, - .do_declare_attr_set = osd_declare_attr_set, - .do_attr_set = osd_attr_set, - .do_ah_init = osd_ah_init, - .do_declare_create = osd_declare_object_create, - .do_create = osd_object_create, - .do_declare_destroy = osd_declare_object_destroy, - .do_destroy = osd_object_destroy, - .do_index_try = osd_index_try, - .do_declare_ref_add = osd_declare_object_ref_add, - .do_ref_add = osd_object_ref_add, - .do_declare_ref_del = osd_declare_object_ref_del, - .do_ref_del = osd_object_ref_del, - .do_xattr_get = osd_xattr_get, - .do_declare_xattr_set = osd_declare_xattr_set, - .do_xattr_set = osd_xattr_set, - .do_declare_xattr_del = osd_declare_xattr_del, - .do_xattr_del = osd_xattr_del, - .do_xattr_list = osd_xattr_list, - .do_capa_get = osd_capa_get, - .do_object_sync = osd_object_sync, - .do_data_get = osd_data_get, -}; - -/** - * dt_object_operations for interoperability mode - * (i.e. to run 2.0 mds on 1.8 disk) (b11826) - */ -static const struct dt_object_operations osd_obj_ea_ops = { - .do_read_lock = osd_object_read_lock, - .do_write_lock = osd_object_write_lock, - .do_read_unlock = osd_object_read_unlock, - .do_write_unlock = osd_object_write_unlock, - .do_write_locked = osd_object_write_locked, - .do_attr_get = osd_attr_get, - .do_declare_attr_set = osd_declare_attr_set, - .do_attr_set = osd_attr_set, - .do_ah_init = osd_ah_init, - .do_declare_create = osd_declare_object_create, - .do_create = osd_object_ea_create, - .do_declare_destroy = osd_declare_object_destroy, - .do_destroy = osd_object_destroy, - .do_index_try = osd_index_try, - .do_declare_ref_add = osd_declare_object_ref_add, - .do_ref_add = osd_object_ref_add, - .do_declare_ref_del = osd_declare_object_ref_del, - .do_ref_del = osd_object_ref_del, - .do_xattr_get = osd_xattr_get, - .do_declare_xattr_set = osd_declare_xattr_set, - .do_xattr_set = osd_xattr_set, - .do_declare_xattr_del = osd_declare_xattr_del, - .do_xattr_del = osd_xattr_del, - .do_xattr_list = osd_xattr_list, - .do_capa_get = osd_capa_get, - .do_object_sync = osd_object_sync, - .do_data_get = osd_data_get, -}; - -/* - * Body operations. - */ - -/* - * XXX: Another layering violation for now. - * - * We don't want to use ->f_op->read methods, because generic file write - * - * - serializes on ->i_sem, and - * - * - does a lot of extra work like balance_dirty_pages(), - * - * which doesn't work for globally shared files like /last-received. - */ -static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen) -{ - struct ldiskfs_inode_info *ei = LDISKFS_I(inode); - - memcpy(buffer, (char*)ei->i_data, buflen); - - return buflen; -} - -static int osd_ldiskfs_read(struct inode *inode, void *buf, int size, - loff_t *offs) -{ - struct buffer_head *bh; - unsigned long block; - int osize = size; - int blocksize; - int csize; - int boffs; - int err; - - /* prevent reading after eof */ - spin_lock(&inode->i_lock); - if (i_size_read(inode) < *offs + size) { - size = i_size_read(inode) - *offs; - spin_unlock(&inode->i_lock); - if (size < 0) { - CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n", - i_size_read(inode), *offs); - return -EBADR; - } else if (size == 0) { - return 0; - } - } else { - spin_unlock(&inode->i_lock); - } - - blocksize = 1 << inode->i_blkbits; - - while (size > 0) { - block = *offs >> inode->i_blkbits; - boffs = *offs & (blocksize - 1); - csize = min(blocksize - boffs, size); - bh = ldiskfs_bread(NULL, inode, block, 0, &err); - if (!bh) { - CERROR("can't read block: %d\n", err); - return err; - } - - memcpy(buf, bh->b_data + boffs, csize); - brelse(bh); - - *offs += csize; - buf += csize; - size -= csize; - } - return osize; -} - -static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos, - struct lustre_capa *capa) -{ - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - int rc; - - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) - RETURN(-EACCES); - - /* Read small symlink from inode body as we need to maintain correct - * on-disk symlinks for ldiskfs. - */ - if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) && - (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data))) - rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len); - else - rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos); - - return rc; -} - -static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen) -{ - - memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer, - buflen); - LDISKFS_I(inode)->i_disksize = buflen; - i_size_write(inode, buflen); - inode->i_sb->s_op->dirty_inode(inode); - - return 0; -} - -static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize, - loff_t *offs, handle_t *handle) -{ - struct buffer_head *bh = NULL; - loff_t offset = *offs; - loff_t new_size = i_size_read(inode); - unsigned long block; - int blocksize = 1 << inode->i_blkbits; - int err = 0; - int size; - int boffs; - int dirty_inode = 0; - - while (bufsize > 0) { - if (bh != NULL) - brelse(bh); - - block = offset >> inode->i_blkbits; - boffs = offset & (blocksize - 1); - size = min(blocksize - boffs, bufsize); - bh = ldiskfs_bread(handle, inode, block, 1, &err); - if (!bh) { - CERROR("can't read/create block: %d\n", err); - break; - } - - err = ldiskfs_journal_get_write_access(handle, bh); - if (err) { - CERROR("journal_get_write_access() returned error %d\n", - err); - break; - } - LASSERTF(boffs + size <= bh->b_size, - "boffs %d size %d bh->b_size %lu", - boffs, size, (unsigned long)bh->b_size); - memcpy(bh->b_data + boffs, buf, size); - err = ldiskfs_journal_dirty_metadata(handle, bh); - if (err) - break; - - if (offset + size > new_size) - new_size = offset + size; - offset += size; - bufsize -= size; - buf += size; - } - if (bh) - brelse(bh); - - /* correct in-core and on-disk sizes */ - if (new_size > i_size_read(inode)) { - spin_lock(&inode->i_lock); - if (new_size > i_size_read(inode)) - i_size_write(inode, new_size); - if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) { - LDISKFS_I(inode)->i_disksize = i_size_read(inode); - dirty_inode = 1; - } - spin_unlock(&inode->i_lock); - if (dirty_inode) - inode->i_sb->s_op->dirty_inode(inode); - } - - if (err == 0) - *offs = offset; - return err; -} - -static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, - const loff_t size, loff_t pos, - struct thandle *handle) -{ - struct osd_thandle *oh; - int credits; - - LASSERT(handle != NULL); - - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); - - /* XXX: size == 0 or INT_MAX indicating a catalog header update or - * llog write, see comment in mdd_declare_llog_record(). - * - * This hack should be removed in 2.3 - */ - if (size == DECLARE_LLOG_REWRITE) - credits = 2; - else if (size == DECLARE_LLOG_WRITE) - credits = 6; - else - credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK]; - - OSD_DECLARE_OP(oh, write); - oh->ot_credits += credits; - - if (osd_dt_obj(dt)->oo_inode == NULL) - return 0; - - osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, - osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, - osd_dt_obj(dt)->oo_inode); - return 0; -} - -static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, struct lustre_capa *capa, - int ignore_quota) -{ - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - struct osd_thandle *oh; - ssize_t result = 0; -#ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = cfs_curproc_cap_pack(); -#endif - - LASSERT(handle != NULL); + if (result == 0 && skip_iam == 0) { + if (!osd_iam_index_probe(env, obj, feat)) + result = -ENOTDIR; + } + LINVRNT(osd_invariant(obj)); - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE)) - RETURN(-EACCES); + if (result == 0 && is_quota_glb_feat(feat) && + fid_seq(lu_object_fid(&dt->do_lu)) == FID_SEQ_QUOTA_GLB) + result = osd_quota_migration(env, dt, feat); - oh = container_of(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle->h_transaction != NULL); -#ifdef HAVE_QUOTA_SUPPORT - if (ignore_quota) - cfs_cap_raise(CFS_CAP_SYS_RESOURCE); - else - cfs_cap_lower(CFS_CAP_SYS_RESOURCE); -#endif - /* Write small symlink to inode body as we need to maintain correct - * on-disk symlinks for ldiskfs. - */ - if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) && - (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data))) - result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len); - else - result = osd_ldiskfs_write_record(inode, buf->lb_buf, - buf->lb_len, pos, - oh->ot_handle); -#ifdef HAVE_QUOTA_SUPPORT - cfs_curproc_cap_unpack(save); -#endif - if (result == 0) - result = buf->lb_len; return result; } -/* - * in some cases we may need declare methods for objects being created - * e.g., when we create symlink +static int osd_otable_it_attr_get(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct lustre_capa *capa) +{ + attr->la_valid = 0; + return 0; +} + +static const struct dt_object_operations osd_obj_ops = { + .do_read_lock = osd_object_read_lock, + .do_write_lock = osd_object_write_lock, + .do_read_unlock = osd_object_read_unlock, + .do_write_unlock = osd_object_write_unlock, + .do_write_locked = osd_object_write_locked, + .do_attr_get = osd_attr_get, + .do_declare_attr_set = osd_declare_attr_set, + .do_attr_set = osd_attr_set, + .do_ah_init = osd_ah_init, + .do_declare_create = osd_declare_object_create, + .do_create = osd_object_create, + .do_declare_destroy = osd_declare_object_destroy, + .do_destroy = osd_object_destroy, + .do_index_try = osd_index_try, + .do_declare_ref_add = osd_declare_object_ref_add, + .do_ref_add = osd_object_ref_add, + .do_declare_ref_del = osd_declare_object_ref_del, + .do_ref_del = osd_object_ref_del, + .do_xattr_get = osd_xattr_get, + .do_declare_xattr_set = osd_declare_xattr_set, + .do_xattr_set = osd_xattr_set, + .do_declare_xattr_del = osd_declare_xattr_del, + .do_xattr_del = osd_xattr_del, + .do_xattr_list = osd_xattr_list, + .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_data_get = osd_data_get, +}; + +/** + * dt_object_operations for interoperability mode + * (i.e. to run 2.0 mds on 1.8 disk) (b11826) */ -static const struct dt_body_operations osd_body_ops_new = { - .dbo_declare_write = osd_declare_write, +static const struct dt_object_operations osd_obj_ea_ops = { + .do_read_lock = osd_object_read_lock, + .do_write_lock = osd_object_write_lock, + .do_read_unlock = osd_object_read_unlock, + .do_write_unlock = osd_object_write_unlock, + .do_write_locked = osd_object_write_locked, + .do_attr_get = osd_attr_get, + .do_declare_attr_set = osd_declare_attr_set, + .do_attr_set = osd_attr_set, + .do_ah_init = osd_ah_init, + .do_declare_create = osd_declare_object_create, + .do_create = osd_object_ea_create, + .do_declare_destroy = osd_declare_object_destroy, + .do_destroy = osd_object_destroy, + .do_index_try = osd_index_try, + .do_declare_ref_add = osd_declare_object_ref_add, + .do_ref_add = osd_object_ref_add, + .do_declare_ref_del = osd_declare_object_ref_del, + .do_ref_del = osd_object_ref_del, + .do_xattr_get = osd_xattr_get, + .do_declare_xattr_set = osd_declare_xattr_set, + .do_xattr_set = osd_xattr_set, + .do_declare_xattr_del = osd_declare_xattr_del, + .do_xattr_del = osd_xattr_del, + .do_xattr_list = osd_xattr_list, + .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_data_get = osd_data_get, }; -const struct dt_body_operations osd_body_ops = { - .dbo_read = osd_read, - .dbo_declare_write = osd_declare_write, - .dbo_write = osd_write +static const struct dt_object_operations osd_obj_otable_it_ops = { + .do_attr_get = osd_otable_it_attr_get, + .do_index_try = osd_index_try, }; static int osd_index_declare_iam_delete(const struct lu_env *env, @@ -2948,10 +3076,10 @@ static int osd_index_declare_iam_delete(const struct lu_env *env, oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle == NULL); - OSD_DECLARE_OP(oh, delete); - oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; + osd_trans_declare_op(env, oh, OSD_OT_DELETE, + osd_dto_credits_noquota[DTO_INDEX_DELETE]); - return 0; + return 0; } /** @@ -2971,23 +3099,24 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, struct thandle *handle, struct lustre_capa *capa) { - struct osd_object *obj = osd_dt_obj(dt); - struct osd_thandle *oh; - struct iam_path_descr *ipd; - struct iam_container *bag = &obj->oo_dir->od_container; - int rc; + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_object *obj = osd_dt_obj(dt); + struct osd_thandle *oh; + struct iam_path_descr *ipd; + struct iam_container *bag = &obj->oo_dir->od_container; + int rc; ENTRY; LINVRNT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(bag->ic_object == obj->oo_inode); LASSERT(handle != NULL); if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) RETURN(-EACCES); - OSD_EXEC_OP(handle, delete); + osd_trans_exec_op(env, handle, OSD_OT_DELETE); ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) @@ -2997,6 +3126,12 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, LASSERT(oh->ot_handle != NULL); LASSERT(oh->ot_handle->h_transaction != NULL); + if (fid_is_quota(lu_object_fid(&dt->do_lu))) { + /* swab quota uid/gid provided by caller */ + oti->oti_quota_id = cpu_to_le64(*((__u64 *)key)); + key = (const struct dt_key *)&oti->oti_quota_id; + } + rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd); osd_ipd_put(env, bag, ipd); LINVRNT(osd_invariant(obj)); @@ -3004,28 +3139,30 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, } static int osd_index_declare_ea_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *handle) + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) { - struct osd_thandle *oh; + struct osd_thandle *oh; + struct inode *inode; + int rc; + ENTRY; - LASSERT(dt_object_exists(dt)); - LASSERT(handle != NULL); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); + LASSERT(handle != NULL); - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); - OSD_DECLARE_OP(oh, delete); - oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; + osd_trans_declare_op(env, oh, OSD_OT_DELETE, + osd_dto_credits_noquota[DTO_INDEX_DELETE]); - LASSERT(osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, - osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, - osd_dt_obj(dt)->oo_inode); + inode = osd_dt_obj(dt)->oo_inode; + LASSERT(inode); - return 0; + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, + true, true, NULL, false); + RETURN(rc); } static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de, @@ -3038,7 +3175,29 @@ static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de, rec = (struct osd_fid_pack *) (de->name + de->name_len + 1); rc = osd_fid_unpack((struct lu_fid *)fid, rec); } - RETURN(rc); + return rc; +} + +static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, + struct lu_fid *fid) +{ + struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range; + struct seq_server_site *ss = osd_seq_site(osd); + int rc; + ENTRY; + + /* Those FID seqs, which are not in FLDB, must be local seq */ + if (unlikely(!fid_seq_in_fldb(fid_seq(fid)) || ss == NULL)) + RETURN(0); + + rc = osd_fld_lookup(env, osd, fid, range); + if (rc != 0) { + CERROR("%s: Can not lookup fld for "DFID"\n", + osd_name(osd), PFID(fid)); + RETURN(rc); + } + + RETURN(ss->ss_node_id != range->lsr_index); } /** @@ -3060,18 +3219,19 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, struct inode *dir = obj->oo_inode; struct dentry *dentry; struct osd_thandle *oh; - struct ldiskfs_dir_entry_2 *de; + struct ldiskfs_dir_entry_2 *de = NULL; struct buffer_head *bh; struct htree_lock *hlock = NULL; - int rc; - + struct lu_fid *fid = &osd_oti_get(env)->oti_fid; + struct osd_device *osd = osd_dev(dt->do_lu.lo_dev); + int rc; ENTRY; LINVRNT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(handle != NULL); - OSD_EXEC_OP(handle, delete); + osd_trans_exec_op(env, handle, OSD_OT_DELETE); oh = container_of(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle != NULL); @@ -3080,6 +3240,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) RETURN(-EACCES); + ll_vfs_dq_init(dir); dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); @@ -3088,7 +3249,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, ldiskfs_htree_lock(hlock, obj->oo_hl_head, dir, LDISKFS_HLOCK_DEL); } else { - cfs_down_write(&obj->oo_ext_idx_sem); + down_write(&obj->oo_ext_idx_sem); } bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock); @@ -3102,7 +3263,31 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, if (hlock != NULL) ldiskfs_htree_unlock(hlock); else - cfs_up_write(&obj->oo_ext_idx_sem); + up_write(&obj->oo_ext_idx_sem); + + if (rc != 0) + GOTO(out, rc); + + /* For inode on the remote MDT, .. will point to + * /Agent directory. So do not try to lookup/delete + * remote inode for .. */ + if (strcmp((char *)key, dotdot) == 0) + GOTO(out, rc = 0); + + LASSERT(de != NULL); + rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid); + if (rc == 0 && osd_remote_fid(env, osd, fid)) { + __u32 ino = le32_to_cpu(de->inode); + + rc = osd_delete_remote_inode(env, osd, fid, ino, oh); + if (rc != 0) + CERROR("%s: del local inode "DFID": rc = %d\n", + osd_name(osd), PFID(fid), rc); + } else { + if (rc == -ENODATA) + rc = 0; + } +out: LASSERT(osd_invariant(obj)); RETURN(rc); @@ -3134,7 +3319,7 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(bag->ic_object == obj->oo_inode); if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) @@ -3147,6 +3332,12 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, /* got ipd now we can start iterator. */ iam_it_init(it, bag, 0, ipd); + if (fid_is_quota(lu_object_fid(&dt->do_lu))) { + /* swab quota uid/gid provided by caller */ + oti->oti_quota_id = cpu_to_le64(*((__u64 *)key)); + key = (const struct dt_key *)&oti->oti_quota_id; + } + rc = iam_it_get(it, (struct iam_key *)key); if (rc >= 0) { if (S_ISDIR(obj->oo_inode->i_mode)) @@ -3155,10 +3346,14 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, iam_rec = (struct iam_rec *) rec; iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec); + if (S_ISDIR(obj->oo_inode->i_mode)) osd_fid_unpack((struct lu_fid *) rec, (struct osd_fid_pack *)iam_rec); + else if (fid_is_quota(lu_object_fid(&dt->do_lu))) + osd_quota_unpack(obj, rec); } + iam_it_put(it); iam_it_fini(it); osd_ipd_put(env, bag, ipd); @@ -3176,16 +3371,15 @@ static int osd_index_declare_iam_insert(const struct lu_env *env, { struct osd_thandle *oh; - LASSERT(dt_object_exists(dt)); LASSERT(handle != NULL); oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle == NULL); - OSD_DECLARE_OP(oh, insert); - oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; + osd_trans_declare_op(env, oh, OSD_OT_INSERT, + osd_dto_credits_noquota[DTO_INDEX_INSERT]); - return 0; + return 0; } /** @@ -3208,24 +3402,21 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, struct iam_path_descr *ipd; struct osd_thandle *oh; struct iam_container *bag = &obj->oo_dir->od_container; -#ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = cfs_curproc_cap_pack(); -#endif struct osd_thread_info *oti = osd_oti_get(env); - struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp; + struct iam_rec *iam_rec; int rc; ENTRY; LINVRNT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(bag->ic_object == obj->oo_inode); LASSERT(th != NULL); if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) - return -EACCES; + RETURN(-EACCES); - OSD_EXEC_OP(th, insert); + osd_trans_exec_op(env, th, OSD_OT_INSERT); ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) @@ -3234,21 +3425,22 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, oh = container_of0(th, struct osd_thandle, ot_super); LASSERT(oh->ot_handle != NULL); LASSERT(oh->ot_handle->h_transaction != NULL); -#ifdef HAVE_QUOTA_SUPPORT - if (ignore_quota) - cfs_cap_raise(CFS_CAP_SYS_RESOURCE); - else - cfs_cap_lower(CFS_CAP_SYS_RESOURCE); -#endif - if (S_ISDIR(obj->oo_inode->i_mode)) - osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid); - else - iam_rec = (struct iam_rec *) rec; + if (S_ISDIR(obj->oo_inode->i_mode)) { + iam_rec = (struct iam_rec *)oti->oti_ldp; + osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid); + } else if (fid_is_quota(lu_object_fid(&dt->do_lu))) { + /* pack quota uid/gid */ + oti->oti_quota_id = cpu_to_le64(*((__u64 *)key)); + key = (const struct dt_key *)&oti->oti_quota_id; + /* pack quota record */ + rec = osd_quota_pack(obj, rec, &oti->oti_quota_rec); + iam_rec = (struct iam_rec *)rec; + } else { + iam_rec = (struct iam_rec *)rec; + } + rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key, iam_rec, ipd); -#ifdef HAVE_QUOTA_SUPPORT - cfs_curproc_cap_unpack(save); -#endif osd_ipd_put(env, bag, ipd); LINVRNT(osd_invariant(obj)); RETURN(rc); @@ -3275,19 +3467,20 @@ static int __osd_ea_add_rec(struct osd_thread_info *info, oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle != NULL); LASSERT(oth->ot_handle->h_transaction != NULL); + LASSERT(pobj->oo_inode); - child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name)); - - if (fid_is_igif((struct lu_fid *)fid) || - fid_is_norm((struct lu_fid *)fid)) { - ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; - osd_get_ldiskfs_dirent_param(ldp, fid); - child->d_fsdata = (void*) ldp; - } else - child->d_fsdata = NULL; - rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock); + ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; + if (unlikely(pobj->oo_inode == + osd_sb(osd_obj2dev(pobj))->s_root->d_inode)) + ldp->edp_magic = 0; + else + osd_get_ldiskfs_dirent_param(ldp, fid); + child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name)); + child->d_fsdata = (void *)ldp; + ll_vfs_dq_init(pobj->oo_inode); + rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock); - RETURN(rc); + RETURN(rc); } /** @@ -3309,10 +3502,8 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, const struct dt_rec *dot_dot_fid, struct thandle *th) { - struct inode *inode = dir->oo_inode; - struct ldiskfs_dentry_param *dot_ldp; - struct ldiskfs_dentry_param *dot_dot_ldp; - struct osd_thandle *oth; + struct inode *inode = dir->oo_inode; + struct osd_thandle *oth; int result = 0; oth = container_of(th, struct osd_thandle, ot_super); @@ -3327,32 +3518,23 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, dir->oo_compat_dot_created = 1; result = 0; } - } else if(strcmp(name, dotdot) == 0) { - dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp; - dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2; - - if (!dir->oo_compat_dot_created) - return -EINVAL; - if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) { - osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid); - osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid); - } else { - dot_ldp = NULL; - dot_dot_ldp = NULL; - } - /* in case of rename, dotdot is already created */ - if (dir->oo_compat_dotdot_created) { - return __osd_ea_add_rec(info, dir, parent_dir, name, - dot_dot_fid, NULL, th); - } + } else if (strcmp(name, dotdot) == 0) { + if (!dir->oo_compat_dot_created) + return -EINVAL; + /* in case of rename, dotdot is already created */ + if (dir->oo_compat_dotdot_created) { + return __osd_ea_add_rec(info, dir, parent_dir, name, + dot_dot_fid, NULL, th); + } - result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, - inode, dot_ldp, dot_dot_ldp); - if (result == 0) - dir->oo_compat_dotdot_created = 1; - } + result = osd_add_dot_dotdot_internal(info, dir->oo_inode, + parent_dir, dot_fid, + dot_dot_fid, oth); + if (result == 0) + dir->oo_compat_dotdot_created = 1; + } - return result; + return result; } @@ -3376,7 +3558,7 @@ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj, ldiskfs_htree_lock(hlock, pobj->oo_hl_head, pobj->oo_inode, 0); } else { - cfs_down_write(&pobj->oo_ext_idx_sem); + down_write(&pobj->oo_ext_idx_sem); } rc = osd_add_dot_dotdot(info, pobj, cinode, name, (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu), @@ -3386,20 +3568,104 @@ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj, ldiskfs_htree_lock(hlock, pobj->oo_hl_head, pobj->oo_inode, LDISKFS_HLOCK_ADD); } else { - cfs_down_write(&pobj->oo_ext_idx_sem); + down_write(&pobj->oo_ext_idx_sem); } - rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, - hlock, th); + if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR)) { + struct lu_fid *tfid = &info->oti_fid; + + *tfid = *(const struct lu_fid *)fid; + tfid->f_ver = ~0; + rc = __osd_ea_add_rec(info, pobj, cinode, name, + (const struct dt_rec *)tfid, + hlock, th); + } else { + rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, + hlock, th); + } } if (hlock != NULL) ldiskfs_htree_unlock(hlock); else - cfs_up_write(&pobj->oo_ext_idx_sem); + up_write(&pobj->oo_ext_idx_sem); return rc; } +static void +osd_consistency_check(struct osd_thread_info *oti, struct osd_device *dev, + struct osd_idmap_cache *oic) +{ + struct osd_scrub *scrub = &dev->od_scrub; + struct lu_fid *fid = &oic->oic_fid; + struct osd_inode_id *id = &oti->oti_id; + int once = 0; + int rc; + ENTRY; + + if (!fid_is_norm(fid) && !fid_is_igif(fid)) + RETURN_EXIT; + +again: + rc = osd_oi_lookup(oti, dev, fid, id, true); + if (rc != 0 && rc != -ENOENT) + RETURN_EXIT; + + if (rc == 0 && osd_id_eq(id, &oic->oic_lid)) + RETURN_EXIT; + + if (thread_is_running(&scrub->os_thread)) { + rc = osd_oii_insert(dev, oic, rc == -ENOENT); + /* There is race condition between osd_oi_lookup and OI scrub. + * The OI scrub finished just after osd_oi_lookup() failure. + * Under such case, it is unnecessary to trigger OI scrub again, + * but try to call osd_oi_lookup() again. */ + if (unlikely(rc == -EAGAIN)) + goto again; + + RETURN_EXIT; + } + + if (!dev->od_noscrub && ++once == 1) { + CDEBUG(D_LFSCK, "Trigger OI scrub by RPC for "DFID"\n", + PFID(fid)); + rc = osd_scrub_start(dev); + LCONSOLE_ERROR("%.16s: trigger OI scrub by RPC for "DFID + ", rc = %d [2]\n", + LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name, + PFID(fid), rc); + if (rc == 0) + goto again; + } + + EXIT; +} + +static int osd_fail_fid_lookup(struct osd_thread_info *oti, + struct osd_device *dev, + struct osd_idmap_cache *oic, + struct lu_fid *fid, __u32 ino) +{ + struct lustre_mdt_attrs *lma = &oti->oti_mdt_attrs; + struct inode *inode; + int rc; + + osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN); + inode = osd_iget(oti, dev, &oic->oic_lid); + if (IS_ERR(inode)) { + fid_zero(&oic->oic_fid); + return PTR_ERR(inode); + } + + rc = osd_get_lma(oti, inode, &oti->oti_obj_dentry, lma); + iput(inode); + if (rc != 0) + fid_zero(&oic->oic_fid); + else + *fid = oic->oic_fid = lma->lma_self_fid; + return rc; +} + /** * Calls ->lookup() to find dentry. From dentry get inode and * read inode's ea to get fid. This is required for interoperability @@ -3419,6 +3685,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, struct htree_lock *hlock = NULL; int ino; int rc; + ENTRY; LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL); @@ -3430,27 +3697,58 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, ldiskfs_htree_lock(hlock, obj->oo_hl_head, dir, LDISKFS_HLOCK_LOOKUP); } else { - cfs_down_read(&obj->oo_ext_idx_sem); + down_read(&obj->oo_ext_idx_sem); } bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock); if (bh) { - ino = le32_to_cpu(de->inode); - rc = osd_get_fid_from_dentry(de, rec); - - /* done with de, release bh */ - brelse(bh); - if (rc != 0) - rc = osd_ea_fid_get(env, obj, ino, fid); - } else { - rc = -ENOENT; - } + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_inode_id *id = &oti->oti_id; + struct osd_idmap_cache *oic = &oti->oti_cache; + struct osd_device *dev = osd_obj2dev(obj); + struct osd_scrub *scrub = &dev->od_scrub; + struct scrub_file *sf = &scrub->os_file; + + ino = le32_to_cpu(de->inode); + if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP)) { + brelse(bh); + rc = osd_fail_fid_lookup(oti, dev, oic, fid, ino); + GOTO(out, rc); + } + + rc = osd_get_fid_from_dentry(de, rec); + + /* done with de, release bh */ + brelse(bh); + if (rc != 0) + rc = osd_ea_fid_get(env, obj, ino, fid, id); + else + osd_id_gen(id, ino, OSD_OII_NOGEN); + if (rc != 0 || osd_remote_fid(env, dev, fid)) { + fid_zero(&oic->oic_fid); + GOTO(out, rc); + } + + oic->oic_lid = *id; + oic->oic_fid = *fid; + if ((scrub->os_pos_current <= ino) && + ((sf->sf_flags & SF_INCONSISTENT) || + (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) || + ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), + sf->sf_oi_bitmap))) + osd_consistency_check(oti, dev, oic); + } else { + rc = -ENOENT; + } + + GOTO(out, rc); - if (hlock != NULL) - ldiskfs_htree_unlock(hlock); - else - cfs_up_read(&obj->oo_ext_idx_sem); - RETURN (rc); +out: + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + up_read(&obj->oo_ext_idx_sem); + return rc; } /** @@ -3470,7 +3768,17 @@ struct osd_object *osd_object_find(const struct lu_env *env, struct lu_object *luch; struct lu_object *lo; - luch = lu_object_find(env, ludev, fid, NULL); + /* + * at this point topdev might not exist yet + * (i.e. MGS is preparing profiles). so we can + * not rely on topdev and instead lookup with + * our device passed as topdev. this can't work + * if the object isn't cached yet (as osd doesn't + * allocate lu_header). IOW, the object must be + * in the cache, otherwise lu_object_alloc() crashes + * -bzzz + */ + luch = lu_object_find_at(env, ludev, fid, NULL); if (!IS_ERR(luch)) { if (lu_object_exists(luch)) { lo = lu_object_locate(luch->lo_header, ludev->ld_type); @@ -3479,7 +3787,7 @@ struct osd_object *osd_object_find(const struct lu_env *env, else LU_OBJECT_DEBUG(D_ERROR, env, luch, "lu_object can't be located" - ""DFID"\n", PFID(fid)); + DFID"\n", PFID(fid)); if (child == NULL) { lu_object_put(env, luch); @@ -3490,6 +3798,7 @@ struct osd_object *osd_object_find(const struct lu_env *env, LU_OBJECT_DEBUG(D_ERROR, env, luch, "lu_object does not exists "DFID"\n", PFID(fid)); + lu_object_put(env, luch); child = ERR_PTR(-ENOENT); } } else @@ -3510,29 +3819,61 @@ static inline void osd_object_put(const struct lu_env *env, } static int osd_index_declare_ea_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *handle) -{ - struct osd_thandle *oh; - - LASSERT(dt_object_exists(dt)); - LASSERT(handle != NULL); - - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); - - OSD_DECLARE_OP(oh, insert); - oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; - - LASSERT(osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, - osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, - osd_dt_obj(dt)->oo_inode); - - return 0; + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + struct osd_thandle *oh; + struct osd_device *osd = osd_dev(dt->do_lu.lo_dev); + struct lu_fid *fid = (struct lu_fid *)rec; + int rc; + ENTRY; + + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + osd_trans_declare_op(env, oh, OSD_OT_INSERT, + osd_dto_credits_noquota[DTO_INDEX_INSERT]); + + if (osd_dt_obj(dt)->oo_inode == NULL) { + const char *name = (const char *)key; + /* Object is not being created yet. Only happens when + * 1. declare directory create + * 2. declare insert . + * 3. declare insert .. + */ + LASSERT(strcmp(name, dotdot) == 0 || strcmp(name, dot) == 0); + } else { + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + /* We ignore block quota on meta pool (MDTs), so needn't + * calculate how many blocks will be consumed by this index + * insert */ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, + oh, true, true, NULL, false); + } + + if (fid == NULL) + RETURN(0); + + rc = osd_remote_fid(env, osd, fid); + if (rc <= 0) + RETURN(rc); + + rc = 0; + + osd_trans_declare_op(env, oh, OSD_OT_CREATE, + osd_dto_credits_noquota[DTO_OBJECT_CREATE]); + osd_trans_declare_op(env, oh, OSD_OT_INSERT, + osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1); + osd_trans_declare_op(env, oh, OSD_OT_INSERT, + osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1); + + RETURN(rc); } /** @@ -3551,43 +3892,79 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *th, struct lustre_capa *capa, int ignore_quota) { - struct osd_object *obj = osd_dt_obj(dt); - struct lu_fid *fid = (struct lu_fid *) rec; - const char *name = (const char *)key; - struct osd_object *child; -#ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = cfs_curproc_cap_pack(); -#endif - int rc; - - ENTRY; + struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_dev(dt->do_lu.lo_dev); + struct lu_fid *fid = (struct lu_fid *) rec; + const char *name = (const char *)key; + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_inode_id *id = &oti->oti_id; + struct inode *child_inode = NULL; + struct osd_object *child = NULL; + int rc; + ENTRY; LASSERT(osd_invariant(obj)); - LASSERT(dt_object_exists(dt)); + LASSERT(dt_object_exists(dt) && !dt_object_remote(dt)); LASSERT(th != NULL); + osd_trans_exec_op(env, th, OSD_OT_INSERT); + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) RETURN(-EACCES); - child = osd_object_find(env, dt, fid); - if (!IS_ERR(child)) { -#ifdef HAVE_QUOTA_SUPPORT - if (ignore_quota) - cfs_cap_raise(CFS_CAP_SYS_RESOURCE); - else - cfs_cap_lower(CFS_CAP_SYS_RESOURCE); -#endif - rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th); -#ifdef HAVE_QUOTA_SUPPORT - cfs_curproc_cap_unpack(save); -#endif - osd_object_put(env, child); - } else { - rc = PTR_ERR(child); - } - - LASSERT(osd_invariant(obj)); - RETURN(rc); + LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid)); + + rc = osd_remote_fid(env, osd, fid); + if (rc < 0) { + CERROR("%s: Can not find object "DFID" rc %d\n", + osd_name(osd), PFID(fid), rc); + RETURN(rc); + } + + if (rc == 1) { + /* Insert remote entry */ + if (strcmp(name, dotdot) == 0 && strlen(name) == 2) { + struct osd_mdobj_map *omm = osd->od_mdt_map; + struct osd_thandle *oh; + + /* If parent on remote MDT, we need put this object + * under AGENT */ + oh = container_of(th, typeof(*oh), ot_super); + rc = osd_add_to_agent(env, osd, obj, oh); + if (rc != 0) { + CERROR("%s: add agent "DFID" error: rc = %d\n", + osd_name(osd), + PFID(lu_object_fid(&dt->do_lu)), rc); + RETURN(rc); + } + + child_inode = igrab(omm->omm_agent_dentry->d_inode); + } else { + child_inode = osd_create_remote_inode(env, osd, obj, + fid, th); + if (IS_ERR(child_inode)) + RETURN(PTR_ERR(child_inode)); + } + } else { + /* Insert local entry */ + child = osd_object_find(env, dt, fid); + if (IS_ERR(child)) { + CERROR("%s: Can not find object "DFID"%u:%u: rc = %d\n", + osd_name(osd), PFID(fid), + id->oii_ino, id->oii_gen, + (int)PTR_ERR(child_inode)); + RETURN(PTR_ERR(child_inode)); + } + child_inode = igrab(child->oo_inode); + } + + rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th); + + iput(child_inode); + if (child != NULL) + osd_object_put(env, child); + LASSERT(osd_invariant(obj)); + RETURN(rc); } /** @@ -3653,7 +4030,14 @@ static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di) static int osd_it_iam_get(const struct lu_env *env, struct dt_it *di, const struct dt_key *key) { - struct osd_it_iam *it = (struct osd_it_iam *)di; + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_it_iam *it = (struct osd_it_iam *)di; + + if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) { + /* swab quota uid/gid */ + oti->oti_quota_id = cpu_to_le64(*((__u64 *)key)); + key = (struct dt_key *)&oti->oti_quota_id; + } return iam_it_get(&it->oi_it, (const struct iam_key *)key); } @@ -3663,7 +4047,6 @@ static int osd_it_iam_get(const struct lu_env *env, * * \param di osd iterator */ - static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di) { struct osd_it_iam *it = (struct osd_it_iam *)di; @@ -3695,9 +4078,20 @@ static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di) static struct dt_key *osd_it_iam_key(const struct lu_env *env, const struct dt_it *di) { - struct osd_it_iam *it = (struct osd_it_iam *)di; + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_it_iam *it = (struct osd_it_iam *)di; + struct osd_object *obj = it->oi_obj; + struct dt_key *key; + + key = (struct dt_key *)iam_it_key_get(&it->oi_it); + + if (!IS_ERR(key) && fid_is_quota(lu_object_fid(&obj->oo_dt.do_lu))) { + /* swab quota uid/gid */ + oti->oti_quota_id = le64_to_cpu(*((__u64 *)key)); + key = (struct dt_key *)&oti->oti_quota_id; + } - return (struct dt_key *)iam_it_key_get(&it->oi_it); + return key; } /** @@ -3711,44 +4105,42 @@ static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di) return iam_it_key_size(&it->oi_it); } -static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr, - int len, __u16 type) +static inline void +osd_it_append_attrs(struct lu_dirent *ent, int len, __u16 type) { - struct luda_type *lt; - const unsigned align = sizeof(struct luda_type) - 1; - - /* check if file type is required */ - if (attr & LUDA_TYPE) { - len = (len + align) & ~align; + /* check if file type is required */ + if (ent->lde_attrs & LUDA_TYPE) { + struct luda_type *lt; + int align = sizeof(*lt) - 1; - lt = (void *) ent->lde_name + len; - lt->lt_type = cpu_to_le16(CFS_DTTOIF(type)); - ent->lde_attrs |= LUDA_TYPE; - } + len = (len + align) & ~align; + lt = (struct luda_type *)(ent->lde_name + len); + lt->lt_type = cpu_to_le16(DTTOIF(type)); + } - ent->lde_attrs = cpu_to_le32(ent->lde_attrs); + ent->lde_attrs = cpu_to_le32(ent->lde_attrs); } /** * build lu direct from backend fs dirent. */ -static inline void osd_it_pack_dirent(struct lu_dirent *ent, - struct lu_fid *fid, __u64 offset, - char *name, __u16 namelen, - __u16 type, __u32 attr) +static inline void +osd_it_pack_dirent(struct lu_dirent *ent, struct lu_fid *fid, __u64 offset, + char *name, __u16 namelen, __u16 type, __u32 attr) { - fid_cpu_to_le(&ent->lde_fid, fid); - ent->lde_attrs = LUDA_FID; + ent->lde_attrs = attr | LUDA_FID; + fid_cpu_to_le(&ent->lde_fid, fid); - ent->lde_hash = cpu_to_le64(offset); - ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr)); + ent->lde_hash = cpu_to_le64(offset); + ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr)); - strncpy(ent->lde_name, name, namelen); - ent->lde_namelen = cpu_to_le16(namelen); + strncpy(ent->lde_name, name, namelen); + ent->lde_name[namelen] = '\0'; + ent->lde_namelen = cpu_to_le16(namelen); - /* append lustre attributes */ - osd_it_append_attrs(ent, attr, namelen, type); + /* append lustre attributes */ + osd_it_append_attrs(ent, namelen, type); } /** @@ -3758,37 +4150,48 @@ static int osd_it_iam_rec(const struct lu_env *env, const struct dt_it *di, struct dt_rec *dtrec, __u32 attr) { - struct osd_it_iam *it = (struct osd_it_iam *)di; - struct osd_thread_info *info = osd_oti_get(env); - struct lu_fid *fid = &info->oti_fid; - const struct osd_fid_pack *rec; - struct lu_dirent *lde = (struct lu_dirent *)dtrec; - char *name; - int namelen; - __u64 hash; - int rc; + struct osd_it_iam *it = (struct osd_it_iam *)di; + struct osd_thread_info *info = osd_oti_get(env); + ENTRY; - name = (char *)iam_it_key_get(&it->oi_it); - if (IS_ERR(name)) - RETURN(PTR_ERR(name)); + if (S_ISDIR(it->oi_obj->oo_inode->i_mode)) { + const struct osd_fid_pack *rec; + struct lu_fid *fid = &info->oti_fid; + struct lu_dirent *lde = (struct lu_dirent *)dtrec; + char *name; + int namelen; + __u64 hash; + int rc; - namelen = iam_it_key_size(&it->oi_it); + name = (char *)iam_it_key_get(&it->oi_it); + if (IS_ERR(name)) + RETURN(PTR_ERR(name)); - rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it); - if (IS_ERR(rec)) - RETURN(PTR_ERR(rec)); + namelen = iam_it_key_size(&it->oi_it); - rc = osd_fid_unpack(fid, rec); - if (rc) - RETURN(rc); + rec = (const struct osd_fid_pack *)iam_it_rec_get(&it->oi_it); + if (IS_ERR(rec)) + RETURN(PTR_ERR(rec)); - hash = iam_it_store(&it->oi_it); + rc = osd_fid_unpack(fid, rec); + if (rc) + RETURN(rc); - /* IAM does not store object type in IAM index (dir) */ - osd_it_pack_dirent(lde, fid, hash, name, namelen, - 0, LUDA_FID); + hash = iam_it_store(&it->oi_it); - return 0; + /* IAM does not store object type in IAM index (dir) */ + osd_it_pack_dirent(lde, fid, hash, name, namelen, + 0, LUDA_FID); + } else if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) { + iam_reccpy(&it->oi_it.ii_path.ip_leaf, + (struct iam_rec *)dtrec); + osd_quota_unpack(it->oi_obj, dtrec); + } else { + iam_reccpy(&it->oi_it.ii_path.ip_leaf, + (struct iam_rec *)dtrec); + } + + RETURN(0); } /** @@ -3840,6 +4243,7 @@ static const struct dt_index_operations osd_index_iam_ops = { } }; + /** * Creates or initializes iterator context. * @@ -3871,9 +4275,9 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env, it->oie_file.f_pos = 0; it->oie_file.f_dentry = obj_dentry; if (attr & LUDA_64BITHASH) - it->oie_file.f_flags = O_64BITHASH; + it->oie_file.f_mode |= FMODE_64BITHASH; else - it->oie_file.f_flags = O_32BITHASH; + it->oie_file.f_mode |= FMODE_32BITHASH; it->oie_file.f_mapping = obj->oo_inode->i_mapping; it->oie_file.f_op = obj->oo_inode->i_fop; it->oie_file.private_data = NULL; @@ -3946,6 +4350,7 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, unsigned d_type) { struct osd_it_ea *it = (struct osd_it_ea *)buf; + struct osd_object *obj = it->oie_obj; struct osd_it_ea_dirent *ent = it->oie_dirent; struct lu_fid *fid = &ent->oied_fid; struct osd_fid_pack *rec; @@ -3961,16 +4366,23 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, OSD_IT_EA_BUFSIZE) RETURN(1); - if (d_type & LDISKFS_DIRENT_LUFID) { - rec = (struct osd_fid_pack*) (name + namelen + 1); - - if (osd_fid_unpack(fid, rec) != 0) - fid_zero(fid); - - d_type &= ~LDISKFS_DIRENT_LUFID; - } else { - fid_zero(fid); - } + /* "." is just the object itself. */ + if (namelen == 1 && name[0] == '.') { + *fid = obj->oo_dt.do_lu.lo_header->loh_fid; + } else if (d_type & LDISKFS_DIRENT_LUFID) { + rec = (struct osd_fid_pack*) (name + namelen + 1); + if (osd_fid_unpack(fid, rec) != 0) + fid_zero(fid); + } else { + fid_zero(fid); + } + d_type &= ~LDISKFS_DIRENT_LUFID; + + /* NOT export local root. */ + if (unlikely(osd_sb(osd_obj2dev(obj))->s_root->d_inode->i_ino == ino)) { + ino = obj->oo_inode->i_ino; + *fid = obj->oo_dt.do_lu.lo_header->loh_fid; + } ent->oied_ino = ino; ent->oied_off = offset; @@ -4011,7 +4423,7 @@ static int osd_ldiskfs_it_fill(const struct lu_env *env, ldiskfs_htree_lock(hlock, obj->oo_hl_head, inode, LDISKFS_HLOCK_READDIR); } else { - cfs_down_read(&obj->oo_ext_idx_sem); + down_read(&obj->oo_ext_idx_sem); } result = inode->i_fop->readdir(&it->oie_file, it, @@ -4020,7 +4432,7 @@ static int osd_ldiskfs_it_fill(const struct lu_env *env, if (hlock != NULL) ldiskfs_htree_unlock(hlock); else - cfs_up_read(&obj->oo_ext_idx_sem); + up_read(&obj->oo_ext_idx_sem); if (it->oie_rd_dirent == 0) { result = -EIO; @@ -4058,7 +4470,7 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) it->oie_it_dirent++; RETURN(0); } else { - if (it->oie_file.f_pos == LDISKFS_HTREE_EOF) + if (it->oie_file.f_pos == ldiskfs_get_htree_eof(&it->oie_file)) rc = +1; else rc = osd_ldiskfs_it_fill(env, di); @@ -4096,10 +4508,337 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) return it->oie_dirent->oied_namelen; } +static int +osd_dirent_update(handle_t *jh, struct super_block *sb, + struct osd_it_ea_dirent *ent, struct lu_fid *fid, + struct buffer_head *bh, struct ldiskfs_dir_entry_2 *de) +{ + struct osd_fid_pack *rec; + int rc; + ENTRY; + + LASSERT(de->file_type & LDISKFS_DIRENT_LUFID); + LASSERT(de->rec_len >= de->name_len + sizeof(struct osd_fid_pack)); + + rc = ldiskfs_journal_get_write_access(jh, bh); + if (rc != 0) { + CERROR("%.16s: fail to write access for update dirent: " + "name = %.*s, rc = %d\n", + LDISKFS_SB(sb)->s_es->s_volume_name, + ent->oied_namelen, ent->oied_name, rc); + RETURN(rc); + } + + rec = (struct osd_fid_pack *)(de->name + de->name_len + 1); + fid_cpu_to_be((struct lu_fid *)rec->fp_area, fid); + rc = ldiskfs_journal_dirty_metadata(jh, bh); + if (rc != 0) + CERROR("%.16s: fail to dirty metadata for update dirent: " + "name = %.*s, rc = %d\n", + LDISKFS_SB(sb)->s_es->s_volume_name, + ent->oied_namelen, ent->oied_name, rc); + + RETURN(rc); +} + +static inline int +osd_dirent_has_space(__u16 reclen, __u16 namelen, unsigned blocksize) +{ + if (ldiskfs_rec_len_from_disk(reclen, blocksize) >= + __LDISKFS_DIR_REC_LEN(namelen + 1 + sizeof(struct osd_fid_pack))) + return 1; + else + return 0; +} + +static int +osd_dirent_reinsert(const struct lu_env *env, handle_t *jh, + struct inode *dir, struct inode *inode, + struct osd_it_ea_dirent *ent, struct lu_fid *fid, + struct buffer_head *bh, struct ldiskfs_dir_entry_2 *de, + struct htree_lock *hlock) +{ + struct dentry *dentry; + struct osd_fid_pack *rec; + struct ldiskfs_dentry_param *ldp; + int rc; + ENTRY; + + if (!LDISKFS_HAS_INCOMPAT_FEATURE(inode->i_sb, + LDISKFS_FEATURE_INCOMPAT_DIRDATA)) + RETURN(0); + + /* There is enough space to hold the FID-in-dirent. */ + if (osd_dirent_has_space(de->rec_len, ent->oied_namelen, + dir->i_sb->s_blocksize)) { + rc = ldiskfs_journal_get_write_access(jh, bh); + if (rc != 0) { + CERROR("%.16s: fail to write access for reinsert " + "dirent: name = %.*s, rc = %d\n", + LDISKFS_SB(inode->i_sb)->s_es->s_volume_name, + ent->oied_namelen, ent->oied_name, rc); + RETURN(rc); + } + + de->name[de->name_len] = 0; + rec = (struct osd_fid_pack *)(de->name + de->name_len + 1); + rec->fp_len = sizeof(struct lu_fid) + 1; + fid_cpu_to_be((struct lu_fid *)rec->fp_area, fid); + de->file_type |= LDISKFS_DIRENT_LUFID; + + rc = ldiskfs_journal_dirty_metadata(jh, bh); + if (rc != 0) + CERROR("%.16s: fail to dirty metadata for reinsert " + "dirent: name = %.*s, rc = %d\n", + LDISKFS_SB(inode->i_sb)->s_es->s_volume_name, + ent->oied_namelen, ent->oied_name, rc); + + RETURN(rc); + } + + rc = ldiskfs_delete_entry(jh, dir, de, bh); + if (rc != 0) { + CERROR("%.16s: fail to delete entry for reinsert dirent: " + "name = %.*s, rc = %d\n", + LDISKFS_SB(inode->i_sb)->s_es->s_volume_name, + ent->oied_namelen, ent->oied_name, rc); + RETURN(rc); + } + + dentry = osd_child_dentry_by_inode(env, dir, ent->oied_name, + ent->oied_namelen); + ldp = (struct ldiskfs_dentry_param *)osd_oti_get(env)->oti_ldp; + osd_get_ldiskfs_dirent_param(ldp, (const struct dt_rec *)fid); + dentry->d_fsdata = (void *)ldp; + ll_vfs_dq_init(dir); + rc = osd_ldiskfs_add_entry(jh, dentry, inode, hlock); + /* It is too bad, we cannot reinsert the name entry back. + * That means we lose it! */ + if (rc != 0) + CERROR("%.16s: fail to insert entry for reinsert dirent: " + "name = %.*s, rc = %d\n", + LDISKFS_SB(inode->i_sb)->s_es->s_volume_name, + ent->oied_namelen, ent->oied_name, rc); + + RETURN(rc); +} + +static int +osd_dirent_check_repair(const struct lu_env *env, struct osd_object *obj, + struct osd_it_ea *it, struct lu_fid *fid, + struct osd_inode_id *id, __u32 *attr) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; + struct osd_device *dev = osd_obj2dev(obj); + struct super_block *sb = osd_sb(dev); + const char *devname = + LDISKFS_SB(sb)->s_es->s_volume_name; + struct osd_it_ea_dirent *ent = it->oie_dirent; + struct inode *dir = obj->oo_inode; + struct htree_lock *hlock = NULL; + struct buffer_head *bh = NULL; + handle_t *jh = NULL; + struct ldiskfs_dir_entry_2 *de; + struct dentry *dentry; + struct inode *inode; + int credits; + int rc; + bool dirty = false; + bool is_dotdot = false; + ENTRY; + + if (ent->oied_name[0] == '.') { + /* Skip dot entry, even if it has stale FID-in-dirent, because + * we do not use such FID-in-dirent anymore, it is harmless. */ + if (ent->oied_namelen == 1) + RETURN(0); + + if (ent->oied_namelen == 2 && ent->oied_name[1] == '.') + is_dotdot = true; + } + + dentry = osd_child_dentry_get(env, obj, ent->oied_name, + ent->oied_namelen); + + /* We need to ensure that the name entry is still valid. + * Because it may be removed or renamed by other already. + * + * The unlink or rename operation will start journal before PDO lock, + * so to avoid deadlock, here we need to start journal handle before + * related PDO lock also. But because we do not know whether there + * will be something to be repaired before PDO lock, we just start + * journal without conditions. + * + * We may need to remove the name entry firstly, then insert back. + * One credit is for user quota file update. + * One credit is for group quota file update. + * Two credits are for dirty inode. */ + credits = osd_dto_credits_noquota[DTO_INDEX_DELETE] + + osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1 + 1 + 2; + +again: + if (dev->od_dirent_journal) { + jh = ldiskfs_journal_start_sb(sb, credits); + if (IS_ERR(jh)) { + rc = PTR_ERR(jh); + CERROR("%.16s: fail to start trans for dirent " + "check_repair: credits %d, name %.*s, rc %d\n", + devname, credits, ent->oied_namelen, + ent->oied_name, rc); + RETURN(rc); + } + } + + if (obj->oo_hl_head != NULL) { + hlock = osd_oti_get(env)->oti_hlock; + ldiskfs_htree_lock(hlock, obj->oo_hl_head, dir, + LDISKFS_HLOCK_DEL); + } else { + down_write(&obj->oo_ext_idx_sem); + } + + bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock); + /* For dotdot entry, if there is not enough space to hold FID-in-dirent, + * just keep it there. It only happens when the device upgraded from 1.8 + * or restored from MDT file-level backup. For the whole directory, only + * dotdot entry has no FID-in-dirent and needs to get FID from LMA when + * readdir, it will not affect the performance much. */ + if ((bh == NULL) || (le32_to_cpu(de->inode) != ent->oied_ino) || + (is_dotdot && !osd_dirent_has_space(de->rec_len, + ent->oied_namelen, + sb->s_blocksize))) { + *attr |= LUDA_IGNORE; + GOTO(out_journal, rc = 0); + } + + osd_id_gen(id, ent->oied_ino, OSD_OII_NOGEN); + inode = osd_iget(info, dev, id); + if (IS_ERR(inode)) { + rc = PTR_ERR(inode); + if (rc == -ENOENT || rc == -ESTALE) { + *attr |= LUDA_IGNORE; + rc = 0; + } + + GOTO(out_journal, rc); + } + + rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma); + if (rc == 0) { + if (fid_is_sane(fid)) { + /* FID-in-dirent is valid. */ + if (lu_fid_eq(fid, &lma->lma_self_fid)) + GOTO(out_inode, rc = 0); + + /* Do not repair under dryrun mode. */ + if (*attr & LUDA_VERIFY_DRYRUN) { + *attr |= LUDA_REPAIR; + GOTO(out_inode, rc = 0); + } + + if (!dev->od_dirent_journal) { + iput(inode); + brelse(bh); + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + up_write(&obj->oo_ext_idx_sem); + dev->od_dirent_journal = 1; + goto again; + } + + *fid = lma->lma_self_fid; + dirty = true; + /* Update the FID-in-dirent. */ + rc = osd_dirent_update(jh, sb, ent, fid, bh, de); + if (rc == 0) + *attr |= LUDA_REPAIR; + } else { + /* Do not repair under dryrun mode. */ + if (*attr & LUDA_VERIFY_DRYRUN) { + *attr |= LUDA_REPAIR; + GOTO(out_inode, rc = 0); + } + + if (!dev->od_dirent_journal) { + iput(inode); + brelse(bh); + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + up_write(&obj->oo_ext_idx_sem); + dev->od_dirent_journal = 1; + goto again; + } + + *fid = lma->lma_self_fid; + dirty = true; + /* Append the FID-in-dirent. */ + rc = osd_dirent_reinsert(env, jh, dir, inode, ent, + fid, bh, de, hlock); + if (rc == 0) + *attr |= LUDA_REPAIR; + } + } else if (rc == -ENODATA) { + /* Do not repair under dryrun mode. */ + if (*attr & LUDA_VERIFY_DRYRUN) { + if (fid_is_sane(fid)) + *attr |= LUDA_REPAIR; + else + *attr |= LUDA_UPGRADE; + GOTO(out_inode, rc = 0); + } + + if (!dev->od_dirent_journal) { + iput(inode); + brelse(bh); + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + up_write(&obj->oo_ext_idx_sem); + dev->od_dirent_journal = 1; + goto again; + } + + dirty = true; + if (unlikely(fid_is_sane(fid))) { + /* FID-in-dirent exists, but FID-in-LMA is lost. + * Trust the FID-in-dirent, and add FID-in-LMA. */ + rc = osd_ea_fid_set(info, inode, fid); + if (rc == 0) + *attr |= LUDA_REPAIR; + } else { + lu_igif_build(fid, inode->i_ino, inode->i_generation); + /* It is probably IGIF object. Only aappend the + * FID-in-dirent. OI scrub will process FID-in-LMA. */ + rc = osd_dirent_reinsert(env, jh, dir, inode, ent, + fid, bh, de, hlock); + if (rc == 0) + *attr |= LUDA_UPGRADE; + } + } + + GOTO(out_inode, rc); + +out_inode: + iput(inode); + +out_journal: + brelse(bh); + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + up_write(&obj->oo_ext_idx_sem); + if (jh != NULL) + ldiskfs_journal_stop(jh); + if (rc >= 0 && !dirty) + dev->od_dirent_journal = 0; + return rc; +} /** - * Returns the value (i.e. fid/igif) at current position from iterator's - * in memory structure. + * Returns the value at current position from iterator's in memory structure. * * \param di struct osd_it_ea, iterator's in memory structure * \param attr attr requested for dirent. @@ -4109,27 +4848,69 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) * \retval -ve on error */ static inline int osd_it_ea_rec(const struct lu_env *env, - const struct dt_it *di, - struct dt_rec *dtrec, __u32 attr) -{ - struct osd_it_ea *it = (struct osd_it_ea *)di; - struct osd_object *obj = it->oie_obj; - struct lu_fid *fid = &it->oie_dirent->oied_fid; - struct lu_dirent *lde = (struct lu_dirent *)dtrec; - int rc = 0; - - ENTRY; - - if (!fid_is_sane(fid)) - rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid); - - if (rc == 0) - osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off, - it->oie_dirent->oied_name, - it->oie_dirent->oied_namelen, - it->oie_dirent->oied_type, - attr); - RETURN(rc); + const struct dt_it *di, + struct dt_rec *dtrec, __u32 attr) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + struct osd_object *obj = it->oie_obj; + struct osd_device *dev = osd_obj2dev(obj); + struct osd_scrub *scrub = &dev->od_scrub; + struct scrub_file *sf = &scrub->os_file; + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_inode_id *id = &oti->oti_id; + struct osd_idmap_cache *oic = &oti->oti_cache; + struct lu_fid *fid = &it->oie_dirent->oied_fid; + struct lu_dirent *lde = (struct lu_dirent *)dtrec; + __u32 ino = it->oie_dirent->oied_ino; + int rc = 0; + ENTRY; + + if (attr & LUDA_VERIFY) { + attr |= LUDA_TYPE; + if (unlikely(ino == osd_sb(dev)->s_root->d_inode->i_ino)) { + attr |= LUDA_IGNORE; + rc = 0; + goto pack; + } + + rc = osd_dirent_check_repair(env, obj, it, fid, id, &attr); + } else { + attr &= ~LU_DIRENT_ATTRS_MASK; + if (!fid_is_sane(fid)) { + if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP)) + RETURN(-ENOENT); + + rc = osd_ea_fid_get(env, obj, ino, fid, id); + } else { + osd_id_gen(id, ino, OSD_OII_NOGEN); + } + } + + if (rc < 0) + RETURN(rc); + +pack: + osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off, + it->oie_dirent->oied_name, + it->oie_dirent->oied_namelen, + it->oie_dirent->oied_type, attr); + + if (osd_remote_fid(env, dev, fid)) + RETURN(0); + + if (likely(!(attr & LUDA_IGNORE))) { + oic->oic_lid = *id; + oic->oic_fid = *fid; + } + + if (!(attr & LUDA_VERIFY) && + (scrub->os_pos_current <= ino) && + ((sf->sf_flags & SF_INCONSISTENT) || + (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) || + ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), sf->sf_oi_bitmap))) + osd_consistency_check(oti, dev, oic); + + RETURN(rc); } /** @@ -4197,7 +4978,6 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, return -EACCES; rc = osd_ea_lookup_rec(env, obj, rec, key); - if (rc == 0) rc = +1; RETURN(rc); @@ -4258,12 +5038,14 @@ static void *osd_key_init(const struct lu_context *ctx, static void osd_key_fini(const struct lu_context *ctx, struct lu_context_key *key, void* data) { - struct osd_thread_info *info = data; + struct osd_thread_info *info = data; - if (info->oti_hlock != NULL) - ldiskfs_htree_lock_free(info->oti_hlock); - OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); - OBD_FREE_PTR(info); + if (info->oti_hlock != NULL) + ldiskfs_htree_lock_free(info->oti_hlock); + OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + lu_buf_free(&info->oti_iobuf.dr_pg_buf); + lu_buf_free(&info->oti_iobuf.dr_bl_buf); + OBD_FREE_PTR(info); } static void osd_key_exit(const struct lu_context *ctx, @@ -4280,7 +5062,7 @@ static void osd_key_exit(const struct lu_context *ctx, LU_TYPE_INIT_FINI(osd, &osd_key); struct lu_context_key osd_key = { - .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD, + .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL, .lct_init = osd_key_init, .lct_fini = osd_key_fini, .lct_exit = osd_key_exit @@ -4290,59 +5072,130 @@ struct lu_context_key osd_key = { static int osd_device_init(const struct lu_env *env, struct lu_device *d, const char *name, struct lu_device *next) { - return osd_procfs_init(osd_dev(d), name); + struct osd_device *osd = osd_dev(d); + + if (strlcpy(osd->od_svname, name, sizeof(osd->od_svname)) + >= sizeof(osd->od_svname)) + return -E2BIG; + return osd_procfs_init(osd, name); } static int osd_shutdown(const struct lu_env *env, struct osd_device *o) { - struct osd_thread_info *info = osd_oti_get(env); - ENTRY; - if (o->od_obj_area != NULL) { - lu_object_put(env, &o->od_obj_area->do_lu); - o->od_obj_area = NULL; - } - if (o->od_oi_table != NULL) - osd_oi_fini(info, &o->od_oi_table, o->od_oi_count); + ENTRY; - RETURN(0); + osd_scrub_cleanup(env, o); + + if (o->od_fsops) { + fsfilt_put_ops(o->od_fsops); + o->od_fsops = NULL; + } + + /* shutdown quota slave instance associated with the device */ + if (o->od_quota_slave != NULL) { + qsd_fini(env, o->od_quota_slave); + o->od_quota_slave = NULL; + } + + RETURN(0); } static int osd_mount(const struct lu_env *env, struct osd_device *o, struct lustre_cfg *cfg) { - struct lustre_mount_info *lmi; - const char *dev = lustre_cfg_string(cfg, 0); - struct lustre_disk_data *ldd; - struct lustre_sb_info *lsi; - + const char *name = lustre_cfg_string(cfg, 0); + const char *dev = lustre_cfg_string(cfg, 1); + const char *opts; + unsigned long page, s_flags, lmd_flags = 0; + struct page *__page; + struct file_system_type *type; + char *options = NULL; + char *str; + int rc = 0; ENTRY; - if (o->od_mount != NULL) { - CERROR("Already mounted (%s)\n", dev); - RETURN(-EEXIST); - } - /* get mount */ - lmi = server_get_mount(dev); - if (lmi == NULL) { - CERROR("Cannot get mount info for %s!\n", dev); - RETURN(-EFAULT); + if (o->od_mnt != NULL) + RETURN(0); + + if (strlen(dev) >= sizeof(o->od_mntdev)) + RETURN(-E2BIG); + strcpy(o->od_mntdev, dev); + + o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS)); + if (o->od_fsops == NULL) { + CERROR("Can't find fsfilt_ldiskfs\n"); + RETURN(-ENOTSUPP); } - LASSERT(lmi != NULL); - /* save lustre_mount_info in dt_device */ - o->od_mount = lmi; + OBD_PAGE_ALLOC(__page, CFS_ALLOC_STD); + if (__page == NULL) + GOTO(out, rc = -ENOMEM); + + str = lustre_cfg_string(cfg, 2); + s_flags = simple_strtoul(str, NULL, 0); + str = strstr(str, ":"); + if (str) + lmd_flags = simple_strtoul(str + 1, NULL, 0); + opts = lustre_cfg_string(cfg, 3); + page = (unsigned long)cfs_page_address(__page); + options = (char *)page; + *options = '\0'; + if (opts == NULL) + strcat(options, "user_xattr,acl"); + else + strcat(options, opts); + + /* Glom up mount options */ + if (*options != '\0') + strcat(options, ","); + strlcat(options, "no_mbcache", CFS_PAGE_SIZE); + + type = get_fs_type("ldiskfs"); + if (!type) { + CERROR("%s: cannot find ldiskfs module\n", name); + GOTO(out, rc = -ENODEV); + } + + o->od_mnt = vfs_kern_mount(type, s_flags, dev, options); + cfs_module_put(type->owner); + + if (IS_ERR(o->od_mnt)) { + rc = PTR_ERR(o->od_mnt); + CERROR("%s: can't mount %s: %d\n", name, dev, rc); + o->od_mnt = NULL; + GOTO(out, rc); + } + +#ifdef HAVE_DEV_SET_RDONLY + if (dev_check_rdonly(o->od_mnt->mnt_sb->s_bdev)) { + CERROR("%s: underlying device %s is marked as read-only. " + "Setup failed\n", name, dev); + mntput(o->od_mnt); + o->od_mnt = NULL; + GOTO(out, rc = -EROFS); + } +#endif + + if (!LDISKFS_HAS_COMPAT_FEATURE(o->od_mnt->mnt_sb, + LDISKFS_FEATURE_COMPAT_HAS_JOURNAL)) { + CERROR("%s: device %s is mounted w/o journal\n", name, dev); + mntput(o->od_mnt); + o->od_mnt = NULL; + GOTO(out, rc = -EINVAL); + } - lsi = s2lsi(lmi->lmi_sb); - ldd = lsi->lsi_ldd; + ldiskfs_set_inode_state(osd_sb(o)->s_root->d_inode, + LDISKFS_STATE_LUSTRE_NO_OI); + if (lmd_flags & LMD_FLG_NOSCRUB) + o->od_noscrub = 1; - if (ldd->ldd_flags & LDD_F_IAM_DIR) { - o->od_iop_mode = 0; - LCONSOLE_WARN("OSD: IAM mode enabled\n"); - } else - o->od_iop_mode = 1; +out: + if (__page) + OBD_PAGE_FREE(__page); + if (rc) + fsfilt_put_ops(o->od_fsops); - o->od_obj_area = NULL; - RETURN(0); + RETURN(rc); } static struct lu_device *osd_device_fini(const struct lu_env *env, @@ -4351,6 +5204,10 @@ static struct lu_device *osd_device_fini(const struct lu_env *env, int rc; ENTRY; + rc = osd_shutdown(env, osd_dev(d)); + + osd_obj_map_fini(osd_dev(d)); + shrink_dcache_sb(osd_sb(osd_dev(d))); osd_sync(env, lu2dt_dev(d)); @@ -4360,45 +5217,138 @@ static struct lu_device *osd_device_fini(const struct lu_env *env, RETURN (ERR_PTR(rc)); } - if (osd_dev(d)->od_mount) - server_put_mount(osd_dev(d)->od_mount->lmi_name, - osd_dev(d)->od_mount->lmi_mnt); - osd_dev(d)->od_mount = NULL; + if (osd_dev(d)->od_mnt) { + mntput(osd_dev(d)->od_mnt); + osd_dev(d)->od_mnt = NULL; + } RETURN(NULL); } +static int osd_device_init0(const struct lu_env *env, + struct osd_device *o, + struct lustre_cfg *cfg) +{ + struct lu_device *l = osd2lu_dev(o); + struct osd_thread_info *info; + int rc; + int cplen = 0; + + /* if the module was re-loaded, env can loose its keys */ + rc = lu_env_refill((struct lu_env *) env); + if (rc) + GOTO(out, rc); + info = osd_oti_get(env); + LASSERT(info); + + l->ld_ops = &osd_lu_ops; + o->od_dt_dev.dd_ops = &osd_dt_ops; + + spin_lock_init(&o->od_osfs_lock); + mutex_init(&o->od_otable_mutex); + o->od_osfs_age = cfs_time_shift_64(-1000); + + o->od_capa_hash = init_capa_hash(); + if (o->od_capa_hash == NULL) + GOTO(out, rc = -ENOMEM); + + o->od_read_cache = 1; + o->od_writethrough_cache = 1; + o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE; + + rc = osd_mount(env, o, cfg); + if (rc) + GOTO(out_capa, rc); + + CFS_INIT_LIST_HEAD(&o->od_ios_list); + /* setup scrub, including OI files initialization */ + rc = osd_scrub_setup(env, o); + if (rc < 0) + GOTO(out_mnt, rc); + + cplen = strlcpy(o->od_svname, lustre_cfg_string(cfg, 4), + sizeof(o->od_svname)); + if (cplen >= sizeof(o->od_svname)) { + rc = -E2BIG; + GOTO(out_mnt, rc); + } + + rc = osd_obj_map_init(o); + if (rc != 0) + GOTO(out_scrub, rc); + + rc = lu_site_init(&o->od_site, l); + if (rc) + GOTO(out_compat, rc); + o->od_site.ls_bottom_dev = l; + + rc = lu_site_init_finish(&o->od_site); + if (rc) + GOTO(out_site, rc); + + rc = osd_procfs_init(o, o->od_svname); + if (rc != 0) { + CERROR("%s: can't initialize procfs: rc = %d\n", + o->od_svname, rc); + GOTO(out_site, rc); + } + + LASSERT(l->ld_site->ls_linkage.next && l->ld_site->ls_linkage.prev); + + /* initialize quota slave instance */ + o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev, + o->od_proc_entry); + if (IS_ERR(o->od_quota_slave)) { + rc = PTR_ERR(o->od_quota_slave); + o->od_quota_slave = NULL; + GOTO(out_procfs, rc); + } + + RETURN(0); +out_procfs: + osd_procfs_fini(o); +out_site: + lu_site_fini(&o->od_site); +out_compat: + osd_obj_map_fini(o); +out_scrub: + osd_scrub_cleanup(env, o); +out_mnt: + osd_oi_fini(info, o); + osd_shutdown(env, o); + mntput(o->od_mnt); + o->od_mnt = NULL; +out_capa: + cleanup_capa_hash(o->od_capa_hash); +out: + RETURN(rc); +} + static struct lu_device *osd_device_alloc(const struct lu_env *env, struct lu_device_type *t, struct lustre_cfg *cfg) { - struct lu_device *l; - struct osd_device *o; - - OBD_ALLOC_PTR(o); - if (o != NULL) { - int result; - - result = dt_device_init(&o->od_dt_dev, t); - if (result == 0) { - l = osd2lu_dev(o); - l->ld_ops = &osd_lu_ops; - o->od_dt_dev.dd_ops = &osd_dt_ops; - cfs_spin_lock_init(&o->od_osfs_lock); - o->od_osfs_age = cfs_time_shift_64(-1000); - o->od_capa_hash = init_capa_hash(); - if (o->od_capa_hash == NULL) { - dt_device_fini(&o->od_dt_dev); - l = ERR_PTR(-ENOMEM); - } - } else - l = ERR_PTR(result); + struct osd_device *o; + int rc; - if (IS_ERR(l)) - OBD_FREE_PTR(o); - } else - l = ERR_PTR(-ENOMEM); - return l; + OBD_ALLOC_PTR(o); + if (o == NULL) + return ERR_PTR(-ENOMEM); + + rc = dt_device_init(&o->od_dt_dev, t); + if (rc == 0) { + /* Because the ctx might be revived in dt_device_init, + * refill the env here */ + lu_env_refill((struct lu_env *)env); + rc = osd_device_init0(env, o, cfg); + if (rc) + dt_device_fini(&o->od_dt_dev); + } + + if (unlikely(rc != 0)) + OBD_FREE_PTR(o); + + return rc == 0 ? osd2lu_dev(o) : ERR_PTR(rc); } static struct lu_device *osd_device_free(const struct lu_env *env, @@ -4408,6 +5358,14 @@ static struct lu_device *osd_device_free(const struct lu_env *env, ENTRY; cleanup_capa_hash(o->od_capa_hash); + /* XXX: make osd top device in order to release reference */ + d->ld_site->ls_top_dev = d; + lu_site_purge(env, d->ld_site, -1); + if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) { + LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL); + lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer); + } + lu_site_fini(&o->od_site); dt_device_fini(&o->od_dt_dev); OBD_FREE_PTR(o); RETURN(NULL); @@ -4425,8 +5383,9 @@ static int osd_process_config(const struct lu_env *env, err = osd_mount(env, o, cfg); break; case LCFG_CLEANUP: - err = osd_shutdown(env, o); - break; + lu_dev_del_linkage(d->ld_site, d); + err = osd_shutdown(env, o); + break; default: err = -ENOSYS; } @@ -4437,53 +5396,92 @@ static int osd_process_config(const struct lu_env *env, static int osd_recovery_complete(const struct lu_env *env, struct lu_device *d) { - RETURN(0); + struct osd_device *osd = osd_dev(d); + int rc = 0; + ENTRY; + + if (osd->od_quota_slave == NULL) + RETURN(0); + + /* start qsd instance on recovery completion, this notifies the quota + * slave code that we are about to process new requests now */ + rc = qsd_start(env, osd->od_quota_slave); + RETURN(rc); +} + +/* + * we use exports to track all osd users + */ +static int osd_obd_connect(const struct lu_env *env, struct obd_export **exp, + struct obd_device *obd, struct obd_uuid *cluuid, + struct obd_connect_data *data, void *localdata) +{ + struct osd_device *osd = osd_dev(obd->obd_lu_dev); + struct lustre_handle conn; + int rc; + ENTRY; + + CDEBUG(D_CONFIG, "connect #%d\n", osd->od_connects); + + rc = class_connect(&conn, obd, cluuid); + if (rc) + RETURN(rc); + + *exp = class_conn2export(&conn); + + spin_lock(&osd->od_osfs_lock); + osd->od_connects++; + spin_unlock(&osd->od_osfs_lock); + + RETURN(0); +} + +/* + * once last export (we don't count self-export) disappeared + * osd can be released + */ +static int osd_obd_disconnect(struct obd_export *exp) +{ + struct obd_device *obd = exp->exp_obd; + struct osd_device *osd = osd_dev(obd->obd_lu_dev); + int rc, release = 0; + ENTRY; + + /* Only disconnect the underlying layers on the final disconnect. */ + spin_lock(&osd->od_osfs_lock); + osd->od_connects--; + if (osd->od_connects == 0) + release = 1; + spin_unlock(&osd->od_osfs_lock); + + rc = class_disconnect(exp); /* bz 9811 */ + + if (rc == 0 && release) + class_manual_cleanup(obd); + RETURN(rc); } -static int osd_prepare(const struct lu_env *env, - struct lu_device *pdev, +static int osd_prepare(const struct lu_env *env, struct lu_device *pdev, struct lu_device *dev) { - struct osd_device *osd = osd_dev(dev); - struct lustre_sb_info *lsi; - struct lustre_disk_data *ldd; - struct lustre_mount_info *lmi; - struct osd_thread_info *oti = osd_oti_get(env); - struct dt_object *d; - int result; + struct osd_device *osd = osd_dev(dev); + int result = 0; + ENTRY; - ENTRY; - /* 1. initialize oi before any file create or file open */ - result = osd_oi_init(oti, &osd->od_oi_table, - &osd->od_dt_dev, lu2md_dev(pdev)); - if (result < 0) - RETURN(result); - - LASSERT(result > 0); - osd->od_oi_count = result; - - lmi = osd->od_mount; - lsi = s2lsi(lmi->lmi_sb); - ldd = lsi->lsi_ldd; - - /* 2. setup local objects */ - result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev)); - if (result) - goto out; - - /* 3. open remote object dir */ - d = dt_store_open(env, lu2dt_dev(dev), "", - remote_obj_dir, &oti->oti_fid); - if (!IS_ERR(d)) { - osd->od_obj_area = d; - result = 0; - } else { - result = PTR_ERR(d); - osd->od_obj_area = NULL; - } + if (dev->ld_site && lu_device_is_md(dev->ld_site->ls_top_dev)) { + /* MDT/MDD still use old infrastructure to create + * special files */ + result = llo_local_objects_setup(env, lu2md_dev(pdev), + lu2dt_dev(dev)); + if (result) + RETURN(result); + } -out: - RETURN(result); + if (osd->od_quota_slave != NULL) + /* set up quota slave objects */ + result = qsd_prepare(env, osd->od_quota_slave); + + RETURN(result); } static const struct lu_object_operations osd_lu_obj_ops = { @@ -4516,25 +5514,20 @@ static const struct lu_device_type_operations osd_device_type_ops = { .ldto_device_fini = osd_device_fini }; -static struct lu_device_type osd_device_type = { +struct lu_device_type osd_device_type = { .ldt_tags = LU_DEVICE_DT, - .ldt_name = LUSTRE_OSD_NAME, + .ldt_name = LUSTRE_OSD_LDISKFS_NAME, .ldt_ops = &osd_device_type_ops, - .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD + .ldt_ctx_tags = LCT_LOCAL, }; /* * lprocfs legacy support. */ static struct obd_ops osd_obd_device_ops = { - .o_owner = THIS_MODULE -}; - -static struct lu_local_obj_desc llod_osd_rem_obj_dir = { - .llod_name = remote_obj_dir, - .llod_oid = OSD_REM_OBJ_DIR_OID, - .llod_is_index = 1, - .llod_feat = &dt_directory_features, + .o_owner = THIS_MODULE, + .o_connect = osd_obd_connect, + .o_disconnect = osd_obd_disconnect }; static int __init osd_mod_init(void) @@ -4542,20 +5535,18 @@ static int __init osd_mod_init(void) struct lprocfs_static_vars lvars; osd_oi_mod_init(); - llo_local_obj_register(&llod_osd_rem_obj_dir); lprocfs_osd_init_vars(&lvars); return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars, - LUSTRE_OSD_NAME, &osd_device_type); + LUSTRE_OSD_LDISKFS_NAME, &osd_device_type); } static void __exit osd_mod_exit(void) { - llo_local_obj_unregister(&llod_osd_rem_obj_dir); - class_unregister_type(LUSTRE_OSD_NAME); + class_unregister_type(LUSTRE_OSD_LDISKFS_NAME); } MODULE_AUTHOR("Sun Microsystems, Inc. "); -MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")"); +MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_LDISKFS_NAME")"); MODULE_LICENSE("GPL"); cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit);