From 2fca7e2cf4d6a0c6f4fd9328a7686b7addad6dd2 Mon Sep 17 00:00:00 2001 From: wangdi Date: Thu, 19 Oct 2006 15:49:53 +0000 Subject: [PATCH] Branch: b_new_cmd update for reorganizing mdd files --- lustre/mdd/Makefile.in | 2 +- lustre/mdd/mdd_dir.c | 1357 ++++++++++++++++++++++++++++++++++++ lustre/mdd/mdd_handler.c | 1688 +++++---------------------------------------- lustre/mdd/mdd_internal.h | 48 +- lustre/mdd/mdd_orphans.c | 2 +- 5 files changed, 1580 insertions(+), 1517 deletions(-) create mode 100644 lustre/mdd/mdd_dir.c diff --git a/lustre/mdd/Makefile.in b/lustre/mdd/Makefile.in index cda9afd..faa2af3 100644 --- a/lustre/mdd/Makefile.in +++ b/lustre/mdd/Makefile.in @@ -1,5 +1,5 @@ MODULES := mdd -mdd-objs := mdd_handler.o mdd_lov.o mdd_orphans.o mdd_lproc.o +mdd-objs := mdd_handler.o mdd_lov.o mdd_orphans.o mdd_lproc.o mdd_dir.o mdd-objs += mdd_device.o mdd_trans.o mdd_object.o mdd_permission.o EXTRA_PRE_CFLAGS := -I@LINUX@/fs -I@LUSTRE@ -I@LUSTRE@/ldiskfs diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c new file mode 100644 index 0000000..f11d5d5 --- /dev/null +++ b/lustre/mdd/mdd_dir.c @@ -0,0 +1,1357 @@ +/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * mdd/mdd_handler.c + * Lustre Metadata Server (mdd) routines + * + * Copyright (C) 2006 Cluster File Systems, Inc. + * Author: Wang Di + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. 
+ * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. + */ +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_MDS + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "mdd_internal.h" + +static const char dot[] = "."; +static const char dotdot[] = ".."; + +static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, + const char *name, const struct lu_fid* fid, int mask); +static int +__mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, + const char *name, const struct lu_fid* fid, int mask) +{ + struct mdd_object *mdd_obj = md2mdd_obj(pobj); + int rc; + + mdd_read_lock(env, mdd_obj); + rc = __mdd_lookup(env, pobj, name, fid, mask); + mdd_read_unlock(env, mdd_obj); + + return rc; +} + +static int mdd_lookup(const struct lu_env *env, + struct md_object *pobj, const char *name, + struct lu_fid* fid) +{ + int rc; + ENTRY; + rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC); + RETURN(rc); +} + + +static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, + struct lu_fid *fid) +{ + return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0); +} + +/* + * return 1: if lf is the fid of the ancestor of p1; + * return 0: if not; + * + * return -EREMOTE: if remote object is found, in this + * case fid of remote object is saved to @pf; + * + * otherwise: values < 0, errors. 
+ */ +static int mdd_is_parent(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *p1, + const struct lu_fid *lf, + struct lu_fid *pf) +{ + struct mdd_object *parent = NULL; + struct lu_fid *pfid; + int rc; + ENTRY; + + LASSERT(!lu_fid_eq(mdo2fid(p1), lf)); + pfid = &mdd_env_info(env)->mti_fid; + + /* Do not lookup ".." in root, they do not exist there. */ + if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid)) + RETURN(0); + + for(;;) { + rc = mdd_parent_fid(env, p1, pfid); + if (rc) + GOTO(out, rc); + if (lu_fid_eq(pfid, &mdd->mdd_root_fid)) + GOTO(out, rc = 0); + if (lu_fid_eq(pfid, lf)) + GOTO(out, rc = 1); + if (parent) + mdd_object_put(env, parent); + parent = mdd_object_find(env, mdd, pfid); + + /* cross-ref parent */ + if (parent == NULL) { + if (pf != NULL) + *pf = *pfid; + GOTO(out, rc = EREMOTE); + } else if (IS_ERR(parent)) + GOTO(out, rc = PTR_ERR(parent)); + p1 = parent; + } + EXIT; +out: + if (parent && !IS_ERR(parent)) + mdd_object_put(env, parent); + return rc; +} + +/* + * No permission check is needed. 
+ * + * returns 1: if fid is ancestor of @mo; + * returns 0: if fid is not a ancestor of @mo; + * + * returns EREMOTE if remote object is found, fid of remote object is saved to + * @fid; + * + * returns < 0: if error + */ +static int mdd_is_subdir(const struct lu_env *env, + struct md_object *mo, const struct lu_fid *fid, + struct lu_fid *sfid) +{ + struct mdd_device *mdd = mdo2mdd(mo); + int rc; + ENTRY; + + if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo)))) + RETURN(0); + + rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid); + + RETURN(rc); +} + +/*Check whether it may create the cobj under the pobj*/ +static int mdd_may_create(const struct lu_env *env, + struct mdd_object *pobj, struct mdd_object *cobj, + int need_check) +{ + int rc = 0; + ENTRY; + + if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu)) + RETURN(-EEXIST); + + if (mdd_is_dead_obj(pobj)) + RETURN(-ENOENT); + + /*check pobj may create or not*/ + if (need_check) + rc = mdd_permission_internal(env, pobj, + MAY_WRITE | MAY_EXEC); + + RETURN(rc); +} + +/* + * It's inline, so penalty for filesystems that don't use sticky bit is + * minimal. + */ +static inline int mdd_is_sticky(const struct lu_env *env, + struct mdd_object *pobj, + struct mdd_object *cobj) +{ + struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; + struct md_ucred *uc = md_ucred(env); + int rc; + + rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA); + if (rc) { + return rc; + } else if (tmp_la->la_uid == uc->mu_fsuid) { + return 0; + } else { + rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA); + if (rc) + return rc; + else if (!(tmp_la->la_mode & S_ISVTX)) + return 0; + else if (tmp_la->la_uid == uc->mu_fsuid) + return 0; + else + return !mdd_capable(uc, CAP_FOWNER); + } +} + +/* Check whether it may delete the cobj under the pobj. 
*/ +static int mdd_may_delete(const struct lu_env *env, + struct mdd_object *pobj, + struct mdd_object *cobj, + int is_dir, int need_check) +{ + struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj); + int rc = 0; + ENTRY; + + LASSERT(cobj); + + if (!lu_object_exists(&cobj->mod_obj.mo_lu)) + RETURN(-ENOENT); + + if (mdd_is_immutable(cobj) || mdd_is_append(cobj)) + RETURN(-EPERM); + + if (is_dir) { + if (!S_ISDIR(mdd_object_type(cobj))) + RETURN(-ENOTDIR); + + if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid)) + RETURN(-EBUSY); + + } else if (S_ISDIR(mdd_object_type(cobj))) { + RETURN(-EISDIR); + } + + if (pobj) { + if (mdd_is_dead_obj(pobj)) + RETURN(-ENOENT); + + if (mdd_is_sticky(env, pobj, cobj)) + RETURN(-EPERM); + + if (need_check) + rc = mdd_permission_internal(env, pobj, + MAY_WRITE | MAY_EXEC); + } + RETURN(rc); +} + +int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj, + struct mdd_object *src_obj) +{ + int rc = 0; + ENTRY; + + if (tgt_obj) { + rc = mdd_may_create(env, tgt_obj, NULL, 1); + if (rc) + RETURN(rc); + } + + if (S_ISDIR(mdd_object_type(src_obj))) + RETURN(-EPERM); + + if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj)) + RETURN(-EPERM); + + RETURN(rc); +} + +static void mdd_lock2(const struct lu_env *env, + struct mdd_object *o0, struct mdd_object *o1) +{ + mdd_write_lock(env, o0); + mdd_write_lock(env, o1); +} + +static void mdd_unlock2(const struct lu_env *env, + struct mdd_object *o0, struct mdd_object *o1) +{ + mdd_write_unlock(env, o1); + mdd_write_unlock(env, o0); +} + +/* insert new index, add reference if isdir, update times */ +static int __mdd_index_insert(const struct lu_env *env, + struct mdd_object *pobj, const struct lu_fid *lf, + const char *name, int isdir, struct thandle *th, + struct lustre_capa *capa) +{ + int rc; + struct dt_object *next = mdd_object_child(pobj); + ENTRY; + +#if 0 + struct lu_attr *la = &mdd_env_info(env)->mti_la; +#endif + + if (dt_try_as_dir(env, next)) + rc = 
next->do_index_ops->dio_insert(env, next, + (struct dt_rec *)lf, + (struct dt_key *)name, + th, capa); + else + rc = -ENOTDIR; + + if (rc == 0) { + if (isdir) + mdd_ref_add_internal(env, pobj, th); +#if 0 + la->la_valid = LA_MTIME|LA_CTIME; + la->la_atime = ma->ma_attr.la_atime; + la->la_ctime = ma->ma_attr.la_ctime; + rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0); +#endif + } + return rc; +} + +static int __mdd_index_delete(const struct lu_env *env, + struct mdd_object *pobj, const char *name, + int is_dir, struct thandle *handle, + struct lustre_capa *capa) +{ + int rc; + struct dt_object *next = mdd_object_child(pobj); + ENTRY; + + if (dt_try_as_dir(env, next)) { + rc = next->do_index_ops->dio_delete(env, next, + (struct dt_key *)name, + handle, capa); + if (rc == 0 && is_dir) + mdd_ref_del_internal(env, pobj, handle); + } else + rc = -ENOTDIR; + RETURN(rc); +} + + +static int __mdd_index_insert_only(const struct lu_env *env, + struct mdd_object *pobj, + const struct lu_fid *lf, + const char *name, struct thandle *th, + struct lustre_capa *capa) +{ + int rc; + struct dt_object *next = mdd_object_child(pobj); + ENTRY; + + if (dt_try_as_dir(env, next)) + rc = next->do_index_ops->dio_insert(env, next, + (struct dt_rec *)lf, + (struct dt_key *)name, th, capa); + else + rc = -ENOTDIR; + RETURN(rc); +} + +static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, + struct md_object *src_obj, const char *name, + struct md_attr *ma) +{ + struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj); + struct mdd_object *mdd_sobj = md2mdd_obj(src_obj); + struct mdd_device *mdd = mdo2mdd(src_obj); + struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; + struct thandle *handle; + int rc; + ENTRY; + + mdd_txn_param_build(env, MDD_TXN_LINK_OP); + handle = mdd_trans_start(env, mdd); + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + mdd_lock2(env, mdd_tobj, mdd_sobj); + + rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj); + if (rc) + GOTO(out, rc); + 
+ rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj), + name, handle, + mdd_object_capa(env, mdd_tobj)); + if (rc == 0) + mdd_ref_add_internal(env, mdd_sobj, handle); + + *la_copy = ma->ma_attr; + la_copy->la_valid = LA_CTIME; + rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0); + if (rc) + GOTO(out, rc); + + la_copy->la_valid = LA_CTIME | LA_MTIME; + rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0); + +out: + mdd_unlock2(env, mdd_tobj, mdd_sobj); + mdd_trans_stop(env, mdd, rc, handle); + RETURN(rc); +} + +/* caller should take a lock before calling */ +int mdd_finish_unlink(const struct lu_env *env, + struct mdd_object *obj, struct md_attr *ma, + struct thandle *th) +{ + int rc; + ENTRY; + + rc = mdd_iattr_get(env, obj, ma); + if (rc == 0 && ma->ma_attr.la_nlink == 0) { + /* add new orphan and the object + * will be deleted during the object_put() */ + if (__mdd_orphan_add(env, obj, th) == 0) + set_bit(LU_OBJECT_ORPHAN, + &mdd2lu_obj(obj)->lo_header->loh_flags); + + if (obj->mod_count == 0) + rc = mdd_object_kill(env, obj, ma); + } + RETURN(rc); +} + +/* + * Check that @dir contains no entries except (possibly) dot and dotdot. + * + * Returns: + * + * 0 empty + * -ENOTEMPTY not empty + * -ve other error + * + */ +static int mdd_dir_is_empty(const struct lu_env *env, + struct mdd_object *dir) +{ + struct dt_it *it; + struct dt_object *obj; + struct dt_it_ops *iops; + int result; + ENTRY; + + obj = mdd_object_child(dir); + iops = &obj->do_index_ops->dio_it; + it = iops->init(env, obj, 0); + if (it != NULL) { + result = iops->get(env, it, (const void *)""); + if (result > 0) { + int i; + for (result = 0, i = 0; result == 0 && i < 3; ++i) + result = iops->next(env, it); + if (result == 0) + result = -ENOTEMPTY; + else if (result == +1) + result = 0; + } else if (result == 0) + /* + * Huh? Index contains no zero key? 
+ */ + result = -EIO; + + iops->put(env, it); + iops->fini(env, it); + } else + result = -ENOMEM; + RETURN(result); +} + +int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj, + struct mdd_object *cobj, struct md_attr *ma) +{ + struct dt_object *dt_cobj = mdd_object_child(cobj); + int rc = 0; + ENTRY; + + rc = mdd_may_delete(env, pobj, cobj, + S_ISDIR(ma->ma_attr.la_mode), 1); + if (rc) + RETURN(rc); + + if (S_ISDIR(mdd_object_type(cobj))) { + if (dt_try_as_dir(env, dt_cobj)) + rc = mdd_dir_is_empty(env, cobj); + else + rc = -ENOTDIR; + } + + RETURN(rc); +} + +static int mdd_unlink(const struct lu_env *env, + struct md_object *pobj, struct md_object *cobj, + const char *name, struct md_attr *ma) +{ + struct mdd_device *mdd = mdo2mdd(pobj); + struct mdd_object *mdd_pobj = md2mdd_obj(pobj); + struct mdd_object *mdd_cobj = md2mdd_obj(cobj); + struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; + struct thandle *handle; + int rc, is_dir; + ENTRY; + + mdd_txn_param_build(env, MDD_TXN_UNLINK_OP); + handle = mdd_trans_start(env, mdd); + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + mdd_lock2(env, mdd_pobj, mdd_cobj); + + rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma); + if (rc) + GOTO(cleanup, rc); + + is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu)); + rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle, + mdd_object_capa(env, mdd_pobj)); + if (rc) + GOTO(cleanup, rc); + + mdd_ref_del_internal(env, mdd_cobj, handle); + *la_copy = ma->ma_attr; + if (is_dir) { + /* unlink dot */ + mdd_ref_del_internal(env, mdd_cobj, handle); + } else { + la_copy->la_valid = LA_CTIME; + rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0); + if (rc) + GOTO(cleanup, rc); + } + + la_copy->la_valid = LA_CTIME | LA_MTIME; + rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0); + if (rc) + GOTO(cleanup, rc); + + rc = mdd_finish_unlink(env, mdd_cobj, ma, handle); + + if (rc == 0) + 
obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp, + strlen("unlinked"), "unlinked", 0, + NULL, NULL); + +cleanup: + mdd_unlock2(env, mdd_pobj, mdd_cobj); + mdd_trans_stop(env, mdd, rc, handle); + RETURN(rc); +} + +/* + * Partial operation. Be aware, this is called with write lock taken, so we use + * locksless version of __mdd_lookup() here. + */ +static int mdd_ni_sanity_check(const struct lu_env *env, + struct md_object *pobj, + const char *name, + const struct lu_fid *fid) +{ + struct mdd_object *obj = md2mdd_obj(pobj); +#if 0 + int rc; +#endif + ENTRY; + + /* EEXIST check */ + if (mdd_is_dead_obj(obj)) + RETURN(-ENOENT); + + /* The exist of the name will be checked in _index_insert. */ +#if 0 + rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC); + if (rc != -ENOENT) + RETURN(rc ? : -EEXIST); + else + RETURN(0); +#endif + RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC)); +} + +static int mdd_name_insert(const struct lu_env *env, + struct md_object *pobj, + const char *name, const struct lu_fid *fid, + int isdir) +{ + struct mdd_object *mdd_obj = md2mdd_obj(pobj); + struct thandle *handle; + int rc; + ENTRY; + + mdd_txn_param_build(env, MDD_TXN_INDEX_INSERT_OP); + handle = mdd_trans_start(env, mdo2mdd(pobj)); + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + mdd_write_lock(env, mdd_obj); + rc = mdd_ni_sanity_check(env, pobj, name, fid); + if (rc) + GOTO(out_unlock, rc); + + rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle, + BYPASS_CAPA); + +out_unlock: + mdd_write_unlock(env, mdd_obj); + + mdd_trans_stop(env, mdo2mdd(pobj), rc, handle); + RETURN(rc); +} + +/* + * Be aware, this is called with write lock taken, so we use locksless version + * of __mdd_lookup() here. 
+ */ +static int mdd_nr_sanity_check(const struct lu_env *env, + struct md_object *pobj, + const char *name) +{ + struct mdd_object *obj = md2mdd_obj(pobj); +#if 0 + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_fid *fid = &info->mti_fid; + int rc; +#endif + ENTRY; + + /* EEXIST check */ + if (mdd_is_dead_obj(obj)) + RETURN(-ENOENT); + + /* The exist of the name will be checked in _index_delete. */ +#if 0 + rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC); + RETURN(rc); +#endif + RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC)); +} + +static int mdd_name_remove(const struct lu_env *env, + struct md_object *pobj, + const char *name, int is_dir) +{ + struct mdd_device *mdd = mdo2mdd(pobj); + struct mdd_object *mdd_obj = md2mdd_obj(pobj); + struct thandle *handle; + int rc; + ENTRY; + + mdd_txn_param_build(env, MDD_TXN_INDEX_DELETE_OP); + handle = mdd_trans_start(env, mdd); + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + mdd_write_lock(env, mdd_obj); + rc = mdd_nr_sanity_check(env, pobj, name); + if (rc) + GOTO(out_unlock, rc); + + rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle, + BYPASS_CAPA); + +out_unlock: + mdd_write_unlock(env, mdd_obj); + + mdd_trans_stop(env, mdd, rc, handle); + RETURN(rc); +} +static int mdd_rt_sanity_check(const struct lu_env *env, + struct mdd_object *tgt_pobj, + struct mdd_object *tobj, + const struct lu_fid *sfid, + const char *name, struct md_attr *ma) +{ + int rc, src_is_dir; + ENTRY; + + /* EEXIST check */ + if (mdd_is_dead_obj(tgt_pobj)) + RETURN(-ENOENT); + + src_is_dir = S_ISDIR(ma->ma_attr.la_mode); + if (tobj) { + rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1); + if (!rc && S_ISDIR(mdd_object_type(tobj)) && + mdd_dir_is_empty(env, tobj)) + RETURN(-ENOTEMPTY); + } else { + rc = mdd_may_create(env, tgt_pobj, NULL, 1); + } + + RETURN(rc); +} + +static int mdd_rename_tgt(const struct lu_env *env, + struct md_object *pobj, struct md_object *tobj, + const struct lu_fid 
*lf, const char *name, + struct md_attr *ma) +{ + struct mdd_device *mdd = mdo2mdd(pobj); + struct mdd_object *mdd_tpobj = md2mdd_obj(pobj); + struct mdd_object *mdd_tobj = md2mdd_obj(tobj); + struct thandle *handle; + int rc; + ENTRY; + + mdd_txn_param_build(env, MDD_TXN_RENAME_TGT_OP); + handle = mdd_trans_start(env, mdd); + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + if (mdd_tobj) + mdd_lock2(env, mdd_tpobj, mdd_tobj); + else + mdd_write_lock(env, mdd_tpobj); + + /*TODO rename sanity checking*/ + rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma); + if (rc) + GOTO(cleanup, rc); + + /* if rename_tgt is called then we should just re-insert name with + * correct fid, no need to dec/inc parent nlink if obj is dir */ + rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA); + if (rc) + GOTO(cleanup, rc); + + rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle, + BYPASS_CAPA); + if (rc) + GOTO(cleanup, rc); + + if (tobj && lu_object_exists(&tobj->mo_lu)) + mdd_ref_del_internal(env, mdd_tobj, handle); +cleanup: + if (tobj) + mdd_unlock2(env, mdd_tpobj, mdd_tobj); + else + mdd_write_unlock(env, mdd_tpobj); + mdd_trans_stop(env, mdd, rc, handle); + RETURN(rc); +} + +/* + * The permission has been checked when obj created, + * no need check again. 
+ */ +static int mdd_cd_sanity_check(const struct lu_env *env, + struct mdd_object *obj) +{ + int rc = 0; + ENTRY; + + /* EEXIST check */ + if (!obj || mdd_is_dead_obj(obj)) + RETURN(-ENOENT); + +#if 0 + mdd_read_lock(env, obj); + rc = mdd_permission_internal(env, obj, MAY_WRITE); + mdd_read_unlock(env, obj); +#endif + + RETURN(rc); + +} + +static int mdd_create_data(const struct lu_env *env, + struct md_object *pobj, struct md_object *cobj, + const struct md_create_spec *spec, + struct md_attr *ma) +{ + struct mdd_device *mdd = mdo2mdd(cobj); + struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */ + struct mdd_object *son = md2mdd_obj(cobj); + struct lu_attr *attr = &ma->ma_attr; + struct lov_mds_md *lmm = NULL; + int lmm_size = 0; + struct thandle *handle; + int rc; + ENTRY; + + rc = mdd_cd_sanity_check(env, son); + if (rc) + RETURN(rc); + + if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE || + !(spec->sp_cr_flags & FMODE_WRITE)) + RETURN(0); + rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, + attr); + if (rc) + RETURN(rc); + + mdd_txn_param_build(env, MDD_TXN_CREATE_DATA_OP); + handle = mdd_trans_start(env, mdd); + if (IS_ERR(handle)) + RETURN(rc = PTR_ERR(handle)); + + /* + * XXX: Setting the lov ea is not locked but setting the attr is locked? + */ + + /* Replay creates has objects already */ + if (spec->u.sp_ea.no_lov_create) { + CDEBUG(D_INFO, "we already have lov ea\n"); + rc = mdd_lov_set_md(env, mdd_pobj, son, + (struct lov_mds_md *)spec->u.sp_ea.eadata, + spec->u.sp_ea.eadatalen, handle, 0); + } else + rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, + lmm_size, handle, 0); + + if (rc == 0) + rc = mdd_attr_get_internal_locked(env, son, ma); + + /* Finish mdd_lov_create() stuff. 
*/ + mdd_lov_create_finish(env, mdd, rc); + mdd_trans_stop(env, mdd, rc, handle); + if (lmm) + OBD_FREE(lmm, lmm_size); + RETURN(rc); +} + +static int +__mdd_lookup(const struct lu_env *env, struct md_object *pobj, + const char *name, const struct lu_fid* fid, int mask) +{ + struct mdd_object *mdd_obj = md2mdd_obj(pobj); + struct dt_object *dir = mdd_object_child(mdd_obj); + struct dt_rec *rec = (struct dt_rec *)fid; + const struct dt_key *key = (const struct dt_key *)name; + int rc; + ENTRY; + + if (mdd_is_dead_obj(mdd_obj)) + RETURN(-ESTALE); + + rc = lu_object_exists(mdd2lu_obj(mdd_obj)); + if (rc == 0) + RETURN(-ESTALE); + else if (rc < 0) { + CERROR("Object "DFID" locates on remote server\n", + PFID(mdo2fid(mdd_obj))); + LBUG(); + } + +#if 0 + if (mask == MAY_EXEC) + rc = mdd_exec_permission_lite(env, mdd_obj); + else +#endif + rc = mdd_permission_internal(env, mdd_obj, mask); + if (rc) + RETURN(rc); + + if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) + rc = dir->do_index_ops->dio_lookup(env, dir, rec, key, + mdd_object_capa(env, mdd_obj)); + else + rc = -ENOTDIR; + + RETURN(rc); +} + +int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, + struct mdd_object *child, struct md_attr *ma, + struct thandle *handle) +{ + int rc; + ENTRY; + + /* update attributes for child. + * FIXME: + * (1) the valid bits should be converted between Lustre and Linux; + * (2) maybe, the child attributes should be set in OSD when creation. + */ + + rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0); + if (rc != 0) + RETURN(rc); + + if (S_ISDIR(ma->ma_attr.la_mode)) { + /* add . and .. 
for newly created dir */ + mdd_ref_add_internal(env, child, handle); + rc = __mdd_index_insert_only(env, child, mdo2fid(child), + dot, handle, BYPASS_CAPA); + if (rc == 0) { + rc = __mdd_index_insert_only(env, child, pfid, + dotdot, handle, + BYPASS_CAPA); + if (rc != 0) { + int rc2; + + rc2 = __mdd_index_delete(env, child, dot, 0, + handle, BYPASS_CAPA); + if (rc2 != 0) + CERROR("Failure to cleanup after dotdot" + " creation: %d (%d)\n", rc2, rc); + else + mdd_ref_del_internal(env, child, handle); + } + } + } + RETURN(rc); +} + +static int mdd_create_sanity_check(const struct lu_env *env, + struct md_object *pobj, + const char *name, struct md_attr *ma) +{ + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_attr *la = &info->mti_la; + struct lu_fid *fid = &info->mti_fid; + struct mdd_object *obj = md2mdd_obj(pobj); + int rc; + ENTRY; + + /* EEXIST check */ + if (mdd_is_dead_obj(obj)) + RETURN(-ENOENT); + + /* + * Check if the name already exist, though it will be checked + * in _index_insert also, for avoiding rolling back if exists + * _index_insert. + */ + rc = __mdd_lookup_locked(env, pobj, name, fid, + MAY_WRITE | MAY_EXEC); + if (rc != -ENOENT) + RETURN(rc ? : -EEXIST); + + /* sgid check */ + mdd_read_lock(env, obj); + rc = mdd_la_get(env, obj, la, BYPASS_CAPA); + mdd_read_unlock(env, obj); + if (rc != 0) + RETURN(rc); + + if (la->la_mode & S_ISGID) { + ma->ma_attr.la_gid = la->la_gid; + if (S_ISDIR(ma->ma_attr.la_mode)) { + ma->ma_attr.la_mode |= S_ISGID; + ma->ma_attr.la_valid |= LA_MODE; + } + } + + switch (ma->ma_attr.la_mode & S_IFMT) { + case S_IFREG: + case S_IFDIR: + case S_IFLNK: + case S_IFCHR: + case S_IFBLK: + case S_IFIFO: + case S_IFSOCK: + rc = 0; + break; + default: + rc = -EINVAL; + break; + } + RETURN(rc); +} + +/* + * Create object and insert it into namespace. 
+ */ +static int mdd_create(const struct lu_env *env, + struct md_object *pobj, const char *name, + struct md_object *child, + struct md_create_spec *spec, + struct md_attr* ma) +{ + struct mdd_device *mdd = mdo2mdd(pobj); + struct mdd_object *mdd_pobj = md2mdd_obj(pobj); + struct mdd_object *son = md2mdd_obj(child); + struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; + struct lu_attr *attr = &ma->ma_attr; + struct lov_mds_md *lmm = NULL; + struct thandle *handle; + int rc, created = 0, inserted = 0, lmm_size = 0; + struct timeval start; + ENTRY; + + mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE); + /* + * Two operations have to be performed: + * + * - allocation of new object (->do_create()), and + * + * - insertion into parent index (->dio_insert()). + * + * Due to locking, operation order is not important, when both are + * successful, *but* error handling cases are quite different: + * + * - if insertion is done first, and following object creation fails, + * insertion has to be rolled back, but this operation might fail + * also leaving us with dangling index entry. + * + * - if creation is done first, is has to be undone if insertion + * fails, leaving us with leaked space, which is neither good, nor + * fatal. + * + * It seems that creation-first is simplest solution, but it is + * sub-optimal in the frequent + * + * $ mkdir foo + * $ mkdir foo + * + * case, because second mkdir is bound to create object, only to + * destroy it immediately. + * + * To avoid this follow local file systems that do double lookup: + * + * 0. lookup -> -EEXIST (mdd_create_sanity_check()) + * + * 1. create (mdd_object_create_internal()) + * + * 2. 
insert (__mdd_index_insert(), lookup again) + */ + + /* sanity checks before big job */ + rc = mdd_create_sanity_check(env, pobj, name, ma); + if (rc) + RETURN(rc); + + /* no RPC inside the transaction, so OST objects should be created at + * first */ + if (S_ISREG(attr->la_mode)) { + rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, + spec, attr); + if (rc) + RETURN(rc); + } + + mdd_txn_param_build(env, MDD_TXN_MKDIR_OP); + handle = mdd_trans_start(env, mdd); + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + mdd_write_lock(env, mdd_pobj); + + /* + * XXX check that link can be added to the parent in mkdir case. + */ + + mdd_write_lock(env, son); + rc = mdd_object_create_internal(env, son, ma, handle); + if (rc) { + mdd_write_unlock(env, son); + GOTO(cleanup, rc); + } + + created = 1; + +#ifdef CONFIG_FS_POSIX_ACL + rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle); + if (rc) { + mdd_write_unlock(env, son); + GOTO(cleanup, rc); + } else { + ma->ma_attr.la_valid |= LA_MODE; + } +#endif + + rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), + son, ma, handle); + mdd_write_unlock(env, son); + if (rc) + /* + * Object has no links, so it will be destroyed when last + * reference is released. (XXX not now.) 
+ */ + GOTO(cleanup, rc); + + rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son), + name, S_ISDIR(attr->la_mode), handle, + mdd_object_capa(env, mdd_pobj)); + + if (rc) + GOTO(cleanup, rc); + + inserted = 1; + /* replay creates has objects already */ + if (spec->u.sp_ea.no_lov_create) { + CDEBUG(D_INFO, "we already have lov ea\n"); + rc = mdd_lov_set_md(env, mdd_pobj, son, + (struct lov_mds_md *)spec->u.sp_ea.eadata, + spec->u.sp_ea.eadatalen, handle, 0); + } else + rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, + lmm_size, handle, 0); + if (rc) { + CERROR("error on stripe info copy %d \n", rc); + GOTO(cleanup, rc); + } + + if (S_ISLNK(attr->la_mode)) { + struct dt_object *dt = mdd_object_child(son); + const char *target_name = spec->u.sp_symname; + int sym_len = strlen(target_name); + const struct lu_buf *buf; + loff_t pos = 0; + + buf = mdd_buf_get_const(env, target_name, sym_len); + rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle, + mdd_object_capa(env, son)); + if (rc == sym_len) + rc = 0; + else + rc = -EFAULT; + } + + *la_copy = ma->ma_attr; + la_copy->la_valid = LA_CTIME | LA_MTIME; + rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0); + if (rc) + GOTO(cleanup, rc); + + /* return attr back */ + rc = mdd_attr_get_internal_locked(env, son, ma); +cleanup: + if (rc && created) { + int rc2 = 0; + + if (inserted) { + rc2 = __mdd_index_delete(env, mdd_pobj, name, + S_ISDIR(attr->la_mode), + handle, BYPASS_CAPA); + if (rc2) + CERROR("error can not cleanup destroy %d\n", + rc2); + } + if (rc2 == 0) { + mdd_write_lock(env, son); + mdd_ref_del_internal(env, son, handle); + mdd_write_unlock(env, son); + } + } + /* finish mdd_lov_create() stuff */ + mdd_lov_create_finish(env, mdd, rc); + if (lmm) + OBD_FREE(lmm, lmm_size); + mdd_write_unlock(env, mdd_pobj); + mdd_trans_stop(env, mdd, rc, handle); + mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE); + RETURN(rc); +} + + +static int mdd_rename_lock(const struct lu_env *env, + struct mdd_device *mdd, + 
struct mdd_object *src_pobj, + struct mdd_object *tgt_pobj) +{ + int rc; + ENTRY; + + if (src_pobj == tgt_pobj) { + mdd_write_lock(env, src_pobj); + RETURN(0); + } + + /* compared the parent child relationship of src_p&tgt_p */ + if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){ + mdd_lock2(env, src_pobj, tgt_pobj); + RETURN(0); + } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) { + mdd_lock2(env, tgt_pobj, src_pobj); + RETURN(0); + } + + rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL); + if (rc < 0) + RETURN(rc); + + if (rc == 1) { + mdd_lock2(env, tgt_pobj, src_pobj); + RETURN(0); + } + + mdd_lock2(env, src_pobj, tgt_pobj); + RETURN(0); +} + +static void mdd_rename_unlock(const struct lu_env *env, + struct mdd_object *src_pobj, + struct mdd_object *tgt_pobj) +{ + mdd_write_unlock(env, src_pobj); + if (src_pobj != tgt_pobj) + mdd_write_unlock(env, tgt_pobj); +} + +static int mdd_rename_sanity_check(const struct lu_env *env, + struct mdd_object *src_pobj, + struct mdd_object *tgt_pobj, + const struct lu_fid *sfid, + int src_is_dir, + struct mdd_object *tobj) +{ + int rc; + ENTRY; + + if (mdd_is_dead_obj(src_pobj)) + RETURN(-ENOENT); + + /* The sobj maybe on the remote, check parent permission only here */ + rc = mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC); + if (rc) + RETURN(rc); + + if (!tobj) { + rc = mdd_may_create(env, tgt_pobj, NULL, + (src_pobj != tgt_pobj)); + } else { + mdd_read_lock(env, tobj); + rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, + (src_pobj != tgt_pobj)); + if (rc == 0) + if (S_ISDIR(mdd_object_type(tobj)) + && mdd_dir_is_empty(env, tobj)) + rc = -ENOTEMPTY; + mdd_read_unlock(env, tobj); + } + + RETURN(rc); +} +/* src object can be remote that is why we use only fid and type of object */ +static int mdd_rename(const struct lu_env *env, + struct md_object *src_pobj, struct md_object *tgt_pobj, + const struct lu_fid *lf, const char *sname, + struct md_object *tobj, const char *tname, + 
+ struct md_attr *ma) +{ + struct mdd_device *mdd = mdo2mdd(src_pobj); + struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); + struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); + struct mdd_object *mdd_sobj = NULL; + struct mdd_object *mdd_tobj = NULL; + struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; + struct thandle *handle; + int is_dir; + int rc; + ENTRY; + + /* ma must carry a file type; it presumably describes the source object */ + LASSERT(ma->ma_attr.la_mode & S_IFMT); + is_dir = S_ISDIR(ma->ma_attr.la_mode); + /* refuse the rename when ma is flagged append-only or immutable */ + if (ma->ma_attr.la_valid & LA_FLAGS && + ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)) + RETURN(-EPERM); + + if (tobj) + mdd_tobj = md2mdd_obj(tobj); + + mdd_txn_param_build(env, MDD_TXN_RENAME_OP); + handle = mdd_trans_start(env, mdd); + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + /* FIXME: Should consider tobj and sobj too in rename_lock. */ + rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj); + if (rc) + GOTO(cleanup_unlocked, rc); + + rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, + lf, is_dir, mdd_tobj); + if (rc) + GOTO(cleanup, rc); + + rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle, + mdd_object_capa(env, mdd_spobj)); + if (rc) + GOTO(cleanup, rc); + + /* + * Here tobj can be remote one, so we do index_delete unconditionally + * and -ENOENT is allowed. + */ + rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, + mdd_object_capa(env, mdd_tpobj)); + if (rc != 0 && rc != -ENOENT) + GOTO(cleanup, rc); + + rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle, + mdd_object_capa(env, mdd_tpobj)); + if (rc) + GOTO(cleanup, rc); + + mdd_sobj = mdd_object_find(env, mdd, lf); + if (IS_ERR(mdd_sobj)) { + /* + * An error pointer is neither a usable object nor NULL: clear it + * so that neither the code below nor the cleanup path touches it. + */ + rc = PTR_ERR(mdd_sobj); + mdd_sobj = NULL; + GOTO(cleanup, rc); + } + *la_copy = ma->ma_attr; + la_copy->la_valid = LA_CTIME; + if (mdd_sobj) { + /*XXX: how to update ctime for remote sobj? 
*/ + rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle); + if (rc) + GOTO(cleanup, rc); + } + if (tobj && lu_object_exists(&tobj->mo_lu)) { + mdd_write_lock(env, mdd_tobj); + mdd_ref_del_internal(env, mdd_tobj, handle); + /* remove dot reference */ + if (is_dir) + mdd_ref_del_internal(env, mdd_tobj, handle); + + la_copy->la_valid = LA_CTIME; + rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0); + if (rc) { + /* the cleanup path only unlocks the parents: drop the target lock here */ + mdd_write_unlock(env, mdd_tobj); + GOTO(cleanup, rc); + } + + rc = mdd_finish_unlink(env, mdd_tobj, ma, handle); + mdd_write_unlock(env, mdd_tobj); + if (rc) + GOTO(cleanup, rc); + } + + /* stamp ctime/mtime on the source parent, then on the target parent if it differs */ + la_copy->la_valid = LA_CTIME | LA_MTIME; + rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0); + if (rc) + GOTO(cleanup, rc); + + if (mdd_spobj != mdd_tpobj) { + la_copy->la_valid = LA_CTIME | LA_MTIME; + rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0); + } + +cleanup: + mdd_rename_unlock(env, mdd_spobj, mdd_tpobj); +cleanup_unlocked: + mdd_trans_stop(env, mdd, rc, handle); + if (mdd_sobj) + mdd_object_put(env, mdd_sobj); + RETURN(rc); +} + +struct md_dir_operations mdd_dir_ops = { + .mdo_is_subdir = mdd_is_subdir, + .mdo_lookup = mdd_lookup, + .mdo_create = mdd_create, + .mdo_rename = mdd_rename, + .mdo_link = mdd_link, + .mdo_unlink = mdd_unlink, + .mdo_name_insert = mdd_name_insert, + .mdo_name_remove = mdd_name_remove, + .mdo_rename_tgt = mdd_rename_tgt, + .mdo_create_data = mdd_create_data +}; diff --git a/lustre/mdd/mdd_handler.c b/lustre/mdd/mdd_handler.c index 78d7d2a..e40ed11 100644 --- a/lustre/mdd/mdd_handler.c +++ b/lustre/mdd/mdd_handler.c @@ -44,26 +44,12 @@ #include "mdd_internal.h" - -static const char dot[] = "."; -static const char dotdot[] = ".."; - -static inline int mdd_is_append(struct mdd_object *obj) -{ - return obj->mod_flags & APPEND_OBJ; -} - static inline void mdd_set_dead_obj(struct mdd_object *obj) { if (obj) obj->mod_flags |= DEAD_OBJ; } -static inline int mdd_is_dead_obj(struct mdd_object *obj) -{ - return obj && obj->mod_flags & 
DEAD_OBJ; -} - int mdd_la_get(const struct lu_env *env, struct mdd_object *obj, struct lu_attr *la, struct lustre_capa *capa) { @@ -97,8 +83,8 @@ int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj) RETURN(rc); } -static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj, - struct thandle *handle) +void mdd_ref_add_internal(const struct lu_env *env, struct mdd_object *obj, + struct thandle *handle) { struct dt_object *next; @@ -107,9 +93,8 @@ static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj, next->do_ops->do_ref_add(env, next, handle); } -static void -__mdd_ref_del(const struct lu_env *env, struct mdd_object *obj, - struct thandle *handle) +void mdd_ref_del_internal(const struct lu_env *env, struct mdd_object *obj, + struct thandle *handle) { struct dt_object *next = mdd_object_child(obj); ENTRY; @@ -120,105 +105,9 @@ __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj, EXIT; } - -/*Check whether it may create the cobj under the pobj*/ -static int mdd_may_create(const struct lu_env *env, - struct mdd_object *pobj, struct mdd_object *cobj, - int need_check) -{ - int rc = 0; - ENTRY; - - if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu)) - RETURN(-EEXIST); - - if (mdd_is_dead_obj(pobj)) - RETURN(-ENOENT); - - /*check pobj may create or not*/ - if (need_check) - rc = mdd_permission_internal(env, pobj, - MAY_WRITE | MAY_EXEC); - - RETURN(rc); -} - -/* - * It's inline, so penalty for filesystems that don't use sticky bit is - * minimal. 
- */ -static inline int mdd_is_sticky(const struct lu_env *env, - struct mdd_object *pobj, - struct mdd_object *cobj) -{ - struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - struct md_ucred *uc = md_ucred(env); - int rc; - - rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA); - if (rc) { - return rc; - } else if (tmp_la->la_uid == uc->mu_fsuid) { - return 0; - } else { - rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA); - if (rc) - return rc; - else if (!(tmp_la->la_mode & S_ISVTX)) - return 0; - else if (tmp_la->la_uid == uc->mu_fsuid) - return 0; - else - return !mdd_capable(uc, CAP_FOWNER); - } -} - -/* Check whether it may delete the cobj under the pobj. */ -static int mdd_may_delete(const struct lu_env *env, - struct mdd_object *pobj, - struct mdd_object *cobj, - int is_dir, int need_check) -{ - struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj); - int rc = 0; - ENTRY; - - LASSERT(cobj); - - if (!lu_object_exists(&cobj->mod_obj.mo_lu)) - RETURN(-ENOENT); - - if (mdd_is_immutable(cobj) || mdd_is_append(cobj)) - RETURN(-EPERM); - - if (is_dir) { - if (!S_ISDIR(mdd_object_type(cobj))) - RETURN(-ENOTDIR); - - if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid)) - RETURN(-EBUSY); - - } else if (S_ISDIR(mdd_object_type(cobj))) { - RETURN(-EISDIR); - } - - if (pobj) { - if (mdd_is_dead_obj(pobj)) - RETURN(-ENOENT); - - if (mdd_is_sticky(env, pobj, cobj)) - RETURN(-EPERM); - - if (need_check) - rc = mdd_permission_internal(env, pobj, - MAY_WRITE | MAY_EXEC); - } - RETURN(rc); -} - /* get only inode attributes */ -static int __mdd_iattr_get(const struct lu_env *env, - struct mdd_object *mdd_obj, struct md_attr *ma) +int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, + struct md_attr *ma) { int rc = 0; ENTRY; @@ -270,7 +159,7 @@ static int mdd_attr_get_internal(const struct lu_env *env, ENTRY; if (ma->ma_need & MA_INODE) - rc = __mdd_iattr_get(env, mdd_obj, ma); + rc = mdd_iattr_get(env, mdd_obj, ma); if (rc == 0 && ma->ma_need & MA_LOV) { if 
(S_ISREG(mdd_object_type(mdd_obj)) || @@ -292,9 +181,8 @@ static int mdd_attr_get_internal(const struct lu_env *env, RETURN(rc); } -static inline int mdd_attr_get_internal_locked(const struct lu_env *env, - struct mdd_object *mdd_obj, - struct md_attr *ma) +int mdd_attr_get_internal_locked(const struct lu_env *env, + struct mdd_object *mdd_obj, struct md_attr *ma) { int rc; mdd_read_lock(env, mdd_obj); @@ -412,21 +300,8 @@ void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj) next->do_ops->do_read_unlock(env, next); } -static void mdd_lock2(const struct lu_env *env, - struct mdd_object *o0, struct mdd_object *o1) -{ - mdd_write_lock(env, o0); - mdd_write_lock(env, o1); -} - -static void mdd_unlock2(const struct lu_env *env, - struct mdd_object *o0, struct mdd_object *o1) -{ - mdd_write_unlock(env, o1); - mdd_write_unlock(env, o0); -} -static int __mdd_object_create(const struct lu_env *env, +int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *obj, struct md_attr *ma, struct thandle *handle) { @@ -870,1416 +745,205 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj, RETURN(rc); } -static int __mdd_index_insert_only(const struct lu_env *env, - struct mdd_object *pobj, - const struct lu_fid *lf, - const char *name, struct thandle *th, - struct lustre_capa *capa) +/* partial unlink */ +static int mdd_ref_del(const struct lu_env *env, struct md_object *obj, + struct md_attr *ma) { + struct mdd_object *mdd_obj = md2mdd_obj(obj); + struct mdd_device *mdd = mdo2mdd(obj); + struct thandle *handle; int rc; - struct dt_object *next = mdd_object_child(pobj); ENTRY; - if (dt_try_as_dir(env, next)) - rc = next->do_index_ops->dio_insert(env, next, - (struct dt_rec *)lf, - (struct dt_key *)name, th, capa); - else - rc = -ENOTDIR; - RETURN(rc); -} + mdd_txn_param_build(env, MDD_TXN_UNLINK_OP); + handle = mdd_trans_start(env, mdd); + /* hand back the actual mdd_trans_start() failure rather than a blanket -ENOMEM */ + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); -/* insert new index, add reference if isdir, update 
times */ -static int __mdd_index_insert(const struct lu_env *env, - struct mdd_object *pobj, const struct lu_fid *lf, - const char *name, int isdir, struct thandle *th, - struct lustre_capa *capa) -{ - int rc; - struct dt_object *next = mdd_object_child(pobj); - ENTRY; + mdd_write_lock(env, mdd_obj); -#if 0 - struct lu_attr *la = &mdd_env_info(env)->mti_la; -#endif + rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma); + if (rc) + GOTO(cleanup, rc); - if (dt_try_as_dir(env, next)) - rc = next->do_index_ops->dio_insert(env, next, - (struct dt_rec *)lf, - (struct dt_key *)name, - th, capa); - else - rc = -ENOTDIR; + mdd_ref_del_internal(env, mdd_obj, handle); - if (rc == 0) { - if (isdir) - __mdd_ref_add(env, pobj, th); -#if 0 - la->la_valid = LA_MTIME|LA_CTIME; - la->la_atime = ma->ma_attr.la_atime; - la->la_ctime = ma->ma_attr.la_ctime; - rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0); -#endif + if (S_ISDIR(lu_object_attr(&obj->mo_lu))) { + /* unlink dot */ + mdd_ref_del_internal(env, mdd_obj, handle); } - return rc; -} -static int __mdd_index_delete(const struct lu_env *env, - struct mdd_object *pobj, const char *name, - int is_dir, struct thandle *handle, - struct lustre_capa *capa) -{ - int rc; - struct dt_object *next = mdd_object_child(pobj); - ENTRY; + rc = mdd_finish_unlink(env, mdd_obj, ma, handle); - if (dt_try_as_dir(env, next)) { - rc = next->do_index_ops->dio_delete(env, next, - (struct dt_key *)name, - handle, capa); - if (rc == 0 && is_dir) - __mdd_ref_del(env, pobj, handle); - } else - rc = -ENOTDIR; - RETURN(rc); + EXIT; +cleanup: + mdd_write_unlock(env, mdd_obj); + mdd_trans_stop(env, mdd, rc, handle); + return rc; } -static int mdd_link_sanity_check(const struct lu_env *env, - struct mdd_object *tgt_obj, - struct mdd_object *src_obj) +/* partial operation */ +static int mdd_oc_sanity_check(const struct lu_env *env, + struct mdd_object *obj, + struct md_attr *ma) { - int rc = 0; + int rc; ENTRY; - if (tgt_obj) { - rc = mdd_may_create(env, 
tgt_obj, NULL, 1); - if (rc) - RETURN(rc); + switch (ma->ma_attr.la_mode & S_IFMT) { + case S_IFREG: + case S_IFDIR: + case S_IFLNK: + case S_IFCHR: + case S_IFBLK: + case S_IFIFO: + case S_IFSOCK: + rc = 0; + break; + default: + rc = -EINVAL; + break; } - - if (S_ISDIR(mdd_object_type(src_obj))) - RETURN(-EPERM); - - if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj)) - RETURN(-EPERM); - RETURN(rc); } -static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, - struct md_object *src_obj, const char *name, - struct md_attr *ma) +static int mdd_object_create(const struct lu_env *env, + struct md_object *obj, + const struct md_create_spec *spec, + struct md_attr *ma) { - struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj); - struct mdd_object *mdd_sobj = md2mdd_obj(src_obj); - struct mdd_device *mdd = mdo2mdd(src_obj); - struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; + + struct mdd_device *mdd = mdo2mdd(obj); + struct mdd_object *mdd_obj = md2mdd_obj(obj); + const struct lu_fid *pfid = spec->u.sp_pfid; struct thandle *handle; int rc; ENTRY; - mdd_txn_param_build(env, MDD_TXN_LINK_OP); + rc = mdd_oc_sanity_check(env, mdd_obj, ma); + if (rc) + RETURN(rc); + + mdd_txn_param_build(env, MDD_TXN_OBJECT_CREATE_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); - mdd_lock2(env, mdd_tobj, mdd_sobj); - - rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj); + mdd_write_lock(env, mdd_obj); + rc = mdd_object_create_internal(env, mdd_obj, ma, handle); if (rc) - GOTO(out, rc); + GOTO(unlock, rc); - rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj), - name, handle, - mdd_object_capa(env, mdd_tobj)); - if (rc == 0) - __mdd_ref_add(env, mdd_sobj, handle); + if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) { + /* If creating the slave object, set slave EA here. 
*/ + int lmv_size = spec->u.sp_ea.eadatalen; + struct lmv_stripe_md *lmv; - *la_copy = ma->ma_attr; - la_copy->la_valid = LA_CTIME; - rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0); - if (rc) - GOTO(out, rc); + lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata; + LASSERT(lmv != NULL && lmv_size > 0); + + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, lmv, lmv_size), + MDS_LMV_MD_NAME, 0, handle); + if (rc) + GOTO(unlock, rc); + pfid = spec->u.sp_ea.fid; - la_copy->la_valid = LA_CTIME | LA_MTIME; - rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0); + CDEBUG(D_INFO, "Set slave ea "DFID", eadatalen %d, rc %d\n", + PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc); + rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr, handle, 0); + } else { +#ifdef CONFIG_FS_POSIX_ACL + if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) { + struct lu_buf *buf = &mdd_env_info(env)->mti_buf; + + buf->lb_buf = (void *)spec->u.sp_ea.eadata; + buf->lb_len = spec->u.sp_ea.eadatalen; + if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) { + rc = __mdd_acl_init(env, mdd_obj, buf, + &ma->ma_attr.la_mode, + handle); + if (rc) + GOTO(unlock, rc); + else + ma->ma_attr.la_valid |= LA_MODE; + } + } +#endif + rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle); + } + EXIT; +unlock: + mdd_write_unlock(env, mdd_obj); + if (rc == 0) + rc = mdd_attr_get_internal_locked(env, mdd_obj, ma); -out: - mdd_unlock2(env, mdd_tobj, mdd_sobj); mdd_trans_stop(env, mdd, rc, handle); - RETURN(rc); + return rc; } /* - * Check that @dir contains no entries except (possibly) dot and dotdot. - * - * Returns: - * - * 0 empty - * -ENOTEMPTY not empty - * -ve other error - * + * XXX: if permission check is needed here? 
*/ -static int mdd_dir_is_empty(const struct lu_env *env, - struct mdd_object *dir) +static int mdd_ref_add(const struct lu_env *env, + struct md_object *obj) { - struct dt_it *it; - struct dt_object *obj; - struct dt_it_ops *iops; - int result; + struct mdd_object *mdd_obj = md2mdd_obj(obj); + struct mdd_device *mdd = mdo2mdd(obj); + struct thandle *handle; + int rc; ENTRY; - obj = mdd_object_child(dir); - iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, 0); - if (it != NULL) { - result = iops->get(env, it, (const void *)""); - if (result > 0) { - int i; - for (result = 0, i = 0; result == 0 && i < 3; ++i) - result = iops->next(env, it); - if (result == 0) - result = -ENOTEMPTY; - else if (result == +1) - result = 0; - } else if (result == 0) - /* - * Huh? Index contains no zero key? - */ - result = -EIO; - - iops->put(env, it); - iops->fini(env, it); - } else - result = -ENOMEM; - RETURN(result); + mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP); + handle = mdd_trans_start(env, mdd); + /* hand back the actual mdd_trans_start() failure rather than a blanket -ENOMEM */ + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + mdd_write_lock(env, mdd_obj); + /* the reference is only taken when the link sanity check passes */ + rc = mdd_link_sanity_check(env, NULL, mdd_obj); + if (!rc) + mdd_ref_add_internal(env, mdd_obj, handle); + mdd_write_unlock(env, mdd_obj); + + mdd_trans_stop(env, mdd, 0, handle); + + /* report a failed sanity check instead of claiming success */ + RETURN(rc); } -/* return md_attr back, - * if it is last unlink then return lov ea + llog cookie*/ -int __mdd_object_kill(const struct lu_env *env, - struct mdd_object *obj, - struct md_attr *ma) +/* do NOT or the MAY_*'s, you'll get the weakest */ +static int accmode(struct mdd_object *mdd_obj, int flags) { - int rc = 0; - ENTRY; + int res = 0; - mdd_set_dead_obj(obj); - if (S_ISREG(mdd_object_type(obj))) { - /* Return LOV & COOKIES unconditionally here. We clean evth up. - * Caller must be ready for that. 
*/ - rc = __mdd_lmm_get(env, obj, ma); - if ((ma->ma_valid & MA_LOV)) - rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj), - obj, ma); - } - RETURN(rc); +#if 0 + /* Sadly, NFSD reopens a file repeatedly during operation, so the + * "acc_mode = 0" allowance for newly-created files isn't honoured. + * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file + * owner can write to a file even if it is marked readonly to hide + * its brokenness. (bug 5781) */ + if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid) + return 0; +#endif + if (flags & FMODE_READ) + res = MAY_READ; + if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND)) + res |= MAY_WRITE; + if (flags & MDS_FMODE_EXEC) + res = MAY_EXEC; + return res; } -/* caller should take a lock before calling */ -static int __mdd_finish_unlink(const struct lu_env *env, - struct mdd_object *obj, struct md_attr *ma, - struct thandle *th) +static int mdd_open_sanity_check(const struct lu_env *env, + struct mdd_object *obj, int flag) { + struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; + int mode = accmode(obj, flag); int rc; ENTRY; - rc = __mdd_iattr_get(env, obj, ma); - if (rc == 0 && ma->ma_attr.la_nlink == 0) { - /* add new orphan and the object - * will be deleted during the object_put() */ - if (__mdd_orphan_add(env, obj, th) == 0) - set_bit(LU_OBJECT_ORPHAN, - &mdd2lu_obj(obj)->lo_header->loh_flags); - - if (obj->mod_count == 0) - rc = __mdd_object_kill(env, obj, ma); - } - RETURN(rc); -} - -static int mdd_unlink_sanity_check(const struct lu_env *env, - struct mdd_object *pobj, - struct mdd_object *cobj, - struct md_attr *ma) -{ - struct dt_object *dt_cobj = mdd_object_child(cobj); - int rc = 0; - ENTRY; - - rc = mdd_may_delete(env, pobj, cobj, - S_ISDIR(ma->ma_attr.la_mode), 1); - if (rc) - RETURN(rc); - - if (S_ISDIR(mdd_object_type(cobj))) { - if (dt_try_as_dir(env, dt_cobj)) - rc = mdd_dir_is_empty(env, cobj); - else - rc = -ENOTDIR; - } - - RETURN(rc); -} - -static int 
mdd_unlink(const struct lu_env *env, - struct md_object *pobj, struct md_object *cobj, - const char *name, struct md_attr *ma) -{ - struct mdd_device *mdd = mdo2mdd(pobj); - struct mdd_object *mdd_pobj = md2mdd_obj(pobj); - struct mdd_object *mdd_cobj = md2mdd_obj(cobj); - struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; - struct thandle *handle; - int rc, is_dir; - ENTRY; - - mdd_txn_param_build(env, MDD_TXN_UNLINK_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); - - mdd_lock2(env, mdd_pobj, mdd_cobj); - - rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma); - if (rc) - GOTO(cleanup, rc); - - is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu)); - rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle, - mdd_object_capa(env, mdd_pobj)); - if (rc) - GOTO(cleanup, rc); - - __mdd_ref_del(env, mdd_cobj, handle); - *la_copy = ma->ma_attr; - if (is_dir) { - /* unlink dot */ - __mdd_ref_del(env, mdd_cobj, handle); - } else { - la_copy->la_valid = LA_CTIME; - rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0); - if (rc) - GOTO(cleanup, rc); - } - - la_copy->la_valid = LA_CTIME | LA_MTIME; - rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0); - if (rc) - GOTO(cleanup, rc); - - rc = __mdd_finish_unlink(env, mdd_cobj, ma, handle); - - if (rc == 0) - obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp, - strlen("unlinked"), "unlinked", 0, - NULL, NULL); - -cleanup: - mdd_unlock2(env, mdd_pobj, mdd_cobj); - mdd_trans_stop(env, mdd, rc, handle); - RETURN(rc); -} - -/* partial unlink */ -static int mdd_ref_del(const struct lu_env *env, struct md_object *obj, - struct md_attr *ma) -{ - struct mdd_object *mdd_obj = md2mdd_obj(obj); - struct mdd_device *mdd = mdo2mdd(obj); - struct thandle *handle; - int rc; - ENTRY; - - mdd_txn_param_build(env, MDD_TXN_UNLINK_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(-ENOMEM); - - mdd_write_lock(env, mdd_obj); - - rc = 
mdd_unlink_sanity_check(env, NULL, mdd_obj, ma); - if (rc) - GOTO(cleanup, rc); - - __mdd_ref_del(env, mdd_obj, handle); - - if (S_ISDIR(lu_object_attr(&obj->mo_lu))) { - /* unlink dot */ - __mdd_ref_del(env, mdd_obj, handle); - } - - rc = __mdd_finish_unlink(env, mdd_obj, ma, handle); - - EXIT; -cleanup: - mdd_write_unlock(env, mdd_obj); - mdd_trans_stop(env, mdd, rc, handle); - return rc; -} - -static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, - const char *name, const struct lu_fid* fid, int mask); - -static int -__mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, - const char *name, const struct lu_fid* fid, int mask) -{ - struct mdd_object *mdd_obj = md2mdd_obj(pobj); - int rc; - - mdd_read_lock(env, mdd_obj); - rc = __mdd_lookup(env, pobj, name, fid, mask); - mdd_read_unlock(env, mdd_obj); - - return rc; -} - -static int mdd_lookup(const struct lu_env *env, - struct md_object *pobj, const char *name, - struct lu_fid* fid) -{ - int rc; - ENTRY; - rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC); - RETURN(rc); -} - -static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, - struct lu_fid *fid) -{ - return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0); -} - -/* - * return 1: if lf is the fid of the ancestor of p1; - * return 0: if not; - * - * return -EREMOTE: if remote object is found, in this - * case fid of remote object is saved to @pf; - * - * otherwise: values < 0, errors. - */ -static int mdd_is_parent(const struct lu_env *env, - struct mdd_device *mdd, - struct mdd_object *p1, - const struct lu_fid *lf, - struct lu_fid *pf) -{ - struct mdd_object *parent = NULL; - struct lu_fid *pfid; - int rc; - ENTRY; - - LASSERT(!lu_fid_eq(mdo2fid(p1), lf)); - pfid = &mdd_env_info(env)->mti_fid; - - /* Do not lookup ".." in root, they do not exist there. 
*/ - if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid)) - RETURN(0); - - for(;;) { - rc = mdd_parent_fid(env, p1, pfid); - if (rc) - GOTO(out, rc); - if (lu_fid_eq(pfid, &mdd->mdd_root_fid)) - GOTO(out, rc = 0); - if (lu_fid_eq(pfid, lf)) - GOTO(out, rc = 1); - if (parent) - mdd_object_put(env, parent); - parent = mdd_object_find(env, mdd, pfid); - - /* cross-ref parent */ - if (parent == NULL) { - if (pf != NULL) - *pf = *pfid; - GOTO(out, rc = EREMOTE); - } else if (IS_ERR(parent)) - GOTO(out, rc = PTR_ERR(parent)); - p1 = parent; - } - EXIT; -out: - if (parent && !IS_ERR(parent)) - mdd_object_put(env, parent); - return rc; -} - -static int mdd_rename_lock(const struct lu_env *env, - struct mdd_device *mdd, - struct mdd_object *src_pobj, - struct mdd_object *tgt_pobj) -{ - int rc; - ENTRY; - - if (src_pobj == tgt_pobj) { - mdd_write_lock(env, src_pobj); - RETURN(0); - } - - /* compared the parent child relationship of src_p&tgt_p */ - if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){ - mdd_lock2(env, src_pobj, tgt_pobj); - RETURN(0); - } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) { - mdd_lock2(env, tgt_pobj, src_pobj); - RETURN(0); - } - - rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL); - if (rc < 0) - RETURN(rc); - - if (rc == 1) { - mdd_lock2(env, tgt_pobj, src_pobj); - RETURN(0); - } - - mdd_lock2(env, src_pobj, tgt_pobj); - RETURN(0); -} - -static void mdd_rename_unlock(const struct lu_env *env, - struct mdd_object *src_pobj, - struct mdd_object *tgt_pobj) -{ - mdd_write_unlock(env, src_pobj); - if (src_pobj != tgt_pobj) - mdd_write_unlock(env, tgt_pobj); -} - -static int mdd_rename_sanity_check(const struct lu_env *env, - struct mdd_object *src_pobj, - struct mdd_object *tgt_pobj, - const struct lu_fid *sfid, - int src_is_dir, - struct mdd_object *tobj) -{ - int rc; - ENTRY; - - if (mdd_is_dead_obj(src_pobj)) - RETURN(-ENOENT); - - /* The sobj maybe on the remote, check parent permission only here */ - rc = 
mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC); - if (rc) - RETURN(rc); - - if (!tobj) { - rc = mdd_may_create(env, tgt_pobj, NULL, - (src_pobj != tgt_pobj)); - } else { - mdd_read_lock(env, tobj); - rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, - (src_pobj != tgt_pobj)); - if (rc == 0) - if (S_ISDIR(mdd_object_type(tobj)) - && mdd_dir_is_empty(env, tobj)) - rc = -ENOTEMPTY; - mdd_read_unlock(env, tobj); - } - - RETURN(rc); -} -/* src object can be remote that is why we use only fid and type of object */ -static int mdd_rename(const struct lu_env *env, - struct md_object *src_pobj, struct md_object *tgt_pobj, - const struct lu_fid *lf, const char *sname, - struct md_object *tobj, const char *tname, - struct md_attr *ma) -{ - struct mdd_device *mdd = mdo2mdd(src_pobj); - struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); - struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); - struct mdd_object *mdd_sobj = NULL; - struct mdd_object *mdd_tobj = NULL; - struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; - struct thandle *handle; - int is_dir; - int rc; - ENTRY; - - LASSERT(ma->ma_attr.la_mode & S_IFMT); - is_dir = S_ISDIR(ma->ma_attr.la_mode); - if (ma->ma_attr.la_valid & LA_FLAGS && - ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)) - RETURN(-EPERM); - - if (tobj) - mdd_tobj = md2mdd_obj(tobj); - - mdd_txn_param_build(env, MDD_TXN_RENAME_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); - - /* FIXME: Should consider tobj and sobj too in rename_lock. 
*/ - rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj); - if (rc) - GOTO(cleanup_unlocked, rc); - - rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, - lf, is_dir, mdd_tobj); - if (rc) - GOTO(cleanup, rc); - - rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle, - mdd_object_capa(env, mdd_spobj)); - if (rc) - GOTO(cleanup, rc); - - /* - * Here tobj can be remote one, so we do index_delete unconditionally - * and -ENOENT is allowed. - */ - rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, - mdd_object_capa(env, mdd_tpobj)); - if (rc != 0 && rc != -ENOENT) - GOTO(cleanup, rc); - - rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle, - mdd_object_capa(env, mdd_tpobj)); - if (rc) - GOTO(cleanup, rc); - - mdd_sobj = mdd_object_find(env, mdd, lf); - *la_copy = ma->ma_attr; - la_copy->la_valid = LA_CTIME; - if (mdd_sobj) { - /*XXX: how to update ctime for remote sobj? */ - rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle); - if (rc) - GOTO(cleanup, rc); - } - if (tobj && lu_object_exists(&tobj->mo_lu)) { - mdd_write_lock(env, mdd_tobj); - __mdd_ref_del(env, mdd_tobj, handle); - /* remove dot reference */ - if (is_dir) - __mdd_ref_del(env, mdd_tobj, handle); - - la_copy->la_valid = LA_CTIME; - rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0); - if (rc) - GOTO(cleanup, rc); - - rc = __mdd_finish_unlink(env, mdd_tobj, ma, handle); - mdd_write_unlock(env, mdd_tobj); - if (rc) - GOTO(cleanup, rc); - } - - la_copy->la_valid = LA_CTIME | LA_MTIME; - rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0); - if (rc) - GOTO(cleanup, rc); - - if (mdd_spobj != mdd_tpobj) { - la_copy->la_valid = LA_CTIME | LA_MTIME; - rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0); - } - -cleanup: - mdd_rename_unlock(env, mdd_spobj, mdd_tpobj); -cleanup_unlocked: - mdd_trans_stop(env, mdd, rc, handle); - if (mdd_sobj) - mdd_object_put(env, mdd_sobj); - RETURN(rc); -} - -static int -__mdd_lookup(const 
struct lu_env *env, struct md_object *pobj, - const char *name, const struct lu_fid* fid, int mask) -{ - struct mdd_object *mdd_obj = md2mdd_obj(pobj); - struct dt_object *dir = mdd_object_child(mdd_obj); - struct dt_rec *rec = (struct dt_rec *)fid; - const struct dt_key *key = (const struct dt_key *)name; - int rc; - ENTRY; - - if (mdd_is_dead_obj(mdd_obj)) - RETURN(-ESTALE); - - rc = lu_object_exists(mdd2lu_obj(mdd_obj)); - if (rc == 0) - RETURN(-ESTALE); - else if (rc < 0) { - CERROR("Object "DFID" locates on remote server\n", - PFID(mdo2fid(mdd_obj))); - LBUG(); - } - -#if 0 - if (mask == MAY_EXEC) - rc = mdd_exec_permission_lite(env, mdd_obj); - else -#endif - rc = mdd_permission_internal(env, mdd_obj, mask); - if (rc) - RETURN(rc); - - if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) - rc = dir->do_index_ops->dio_lookup(env, dir, rec, key, - mdd_object_capa(env, mdd_obj)); - else - rc = -ENOTDIR; - - RETURN(rc); -} - -/* - * No permission check is needed. - * - * returns 1: if fid is ancestor of @mo; - * returns 0: if fid is not a ancestor of @mo; - * - * returns EREMOTE if remote object is found, fid of remote object is saved to - * @fid; - * - * returns < 0: if error - */ -static int mdd_is_subdir(const struct lu_env *env, - struct md_object *mo, const struct lu_fid *fid, - struct lu_fid *sfid) -{ - struct mdd_device *mdd = mdo2mdd(mo); - int rc; - ENTRY; - - if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo)))) - RETURN(0); - - rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid); - - RETURN(rc); -} - -static int __mdd_object_initialize(const struct lu_env *env, - const struct lu_fid *pfid, - struct mdd_object *child, - struct md_attr *ma, struct thandle *handle) -{ - int rc; - ENTRY; - - /* update attributes for child. - * FIXME: - * (1) the valid bits should be converted between Lustre and Linux; - * (2) maybe, the child attributes should be set in OSD when creation. 
- */ - - rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0); - if (rc != 0) - RETURN(rc); - - if (S_ISDIR(ma->ma_attr.la_mode)) { - /* add . and .. for newly created dir */ - __mdd_ref_add(env, child, handle); - rc = __mdd_index_insert_only(env, child, mdo2fid(child), - dot, handle, BYPASS_CAPA); - if (rc == 0) { - rc = __mdd_index_insert_only(env, child, pfid, - dotdot, handle, - BYPASS_CAPA); - if (rc != 0) { - int rc2; - - rc2 = __mdd_index_delete(env, child, dot, 0, - handle, BYPASS_CAPA); - if (rc2 != 0) - CERROR("Failure to cleanup after dotdot" - " creation: %d (%d)\n", rc2, rc); - else - __mdd_ref_del(env, child, handle); - } - } - } - RETURN(rc); -} - -/* - * The permission has been checked when obj created, - * no need check again. - */ -static int mdd_cd_sanity_check(const struct lu_env *env, - struct mdd_object *obj) -{ - int rc = 0; - ENTRY; - - /* EEXIST check */ - if (!obj || mdd_is_dead_obj(obj)) - RETURN(-ENOENT); - -#if 0 - mdd_read_lock(env, obj); - rc = mdd_permission_internal(env, obj, MAY_WRITE); - mdd_read_unlock(env, obj); -#endif - - RETURN(rc); - -} - -static int mdd_create_data(const struct lu_env *env, - struct md_object *pobj, struct md_object *cobj, - const struct md_create_spec *spec, - struct md_attr *ma) -{ - struct mdd_device *mdd = mdo2mdd(cobj); - struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */ - struct mdd_object *son = md2mdd_obj(cobj); - struct lu_attr *attr = &ma->ma_attr; - struct lov_mds_md *lmm = NULL; - int lmm_size = 0; - struct thandle *handle; - int rc; - ENTRY; - - rc = mdd_cd_sanity_check(env, son); - if (rc) - RETURN(rc); - - if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE || - !(spec->sp_cr_flags & FMODE_WRITE)) - RETURN(0); - rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, - attr); - if (rc) - RETURN(rc); - - mdd_txn_param_build(env, MDD_TXN_CREATE_DATA_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(rc = PTR_ERR(handle)); - - /* - * XXX: 
Setting the lov ea is not locked but setting the attr is locked? - */ - - /* Replay creates has objects already */ - if (spec->u.sp_ea.no_lov_create) { - CDEBUG(D_INFO, "we already have lov ea\n"); - rc = mdd_lov_set_md(env, mdd_pobj, son, - (struct lov_mds_md *)spec->u.sp_ea.eadata, - spec->u.sp_ea.eadatalen, handle, 0); - } else - rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, - lmm_size, handle, 0); - - if (rc == 0) - rc = mdd_attr_get_internal_locked(env, son, ma); - - /* Finish mdd_lov_create() stuff. */ - mdd_lov_create_finish(env, mdd, rc); - mdd_trans_stop(env, mdd, rc, handle); - if (lmm) - OBD_FREE(lmm, lmm_size); - RETURN(rc); -} - -static int mdd_create_sanity_check(const struct lu_env *env, - struct md_object *pobj, - const char *name, struct md_attr *ma) -{ - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_attr *la = &info->mti_la; - struct lu_fid *fid = &info->mti_fid; - struct mdd_object *obj = md2mdd_obj(pobj); - int rc; - ENTRY; - - /* EEXIST check */ - if (mdd_is_dead_obj(obj)) - RETURN(-ENOENT); - - /* - * Check if the name already exist, though it will be checked - * in _index_insert also, for avoiding rolling back if exists - * _index_insert. - */ - rc = __mdd_lookup_locked(env, pobj, name, fid, - MAY_WRITE | MAY_EXEC); - if (rc != -ENOENT) - RETURN(rc ? : -EEXIST); - - /* sgid check */ - mdd_read_lock(env, obj); - rc = mdd_la_get(env, obj, la, BYPASS_CAPA); - mdd_read_unlock(env, obj); - if (rc != 0) - RETURN(rc); - - if (la->la_mode & S_ISGID) { - ma->ma_attr.la_gid = la->la_gid; - if (S_ISDIR(ma->ma_attr.la_mode)) { - ma->ma_attr.la_mode |= S_ISGID; - ma->ma_attr.la_valid |= LA_MODE; - } - } - - switch (ma->ma_attr.la_mode & S_IFMT) { - case S_IFREG: - case S_IFDIR: - case S_IFLNK: - case S_IFCHR: - case S_IFBLK: - case S_IFIFO: - case S_IFSOCK: - rc = 0; - break; - default: - rc = -EINVAL; - break; - } - RETURN(rc); -} - -/* - * Create object and insert it into namespace. 
- */ -static int mdd_create(const struct lu_env *env, - struct md_object *pobj, const char *name, - struct md_object *child, - struct md_create_spec *spec, - struct md_attr* ma) -{ - struct mdd_device *mdd = mdo2mdd(pobj); - struct mdd_object *mdd_pobj = md2mdd_obj(pobj); - struct mdd_object *son = md2mdd_obj(child); - struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; - struct lu_attr *attr = &ma->ma_attr; - struct lov_mds_md *lmm = NULL; - struct thandle *handle; - int rc, created = 0, inserted = 0, lmm_size = 0; - struct timeval start; - ENTRY; - - mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE); - /* - * Two operations have to be performed: - * - * - allocation of new object (->do_create()), and - * - * - insertion into parent index (->dio_insert()). - * - * Due to locking, operation order is not important, when both are - * successful, *but* error handling cases are quite different: - * - * - if insertion is done first, and following object creation fails, - * insertion has to be rolled back, but this operation might fail - * also leaving us with dangling index entry. - * - * - if creation is done first, is has to be undone if insertion - * fails, leaving us with leaked space, which is neither good, nor - * fatal. - * - * It seems that creation-first is simplest solution, but it is - * sub-optimal in the frequent - * - * $ mkdir foo - * $ mkdir foo - * - * case, because second mkdir is bound to create object, only to - * destroy it immediately. - * - * To avoid this follow local file systems that do double lookup: - * - * 0. lookup -> -EEXIST (mdd_create_sanity_check()) - * - * 1. create (__mdd_object_create()) - * - * 2. 
insert (__mdd_index_insert(), lookup again) - */ - - /* sanity checks before big job */ - rc = mdd_create_sanity_check(env, pobj, name, ma); - if (rc) - RETURN(rc); - - /* no RPC inside the transaction, so OST objects should be created at - * first */ - if (S_ISREG(attr->la_mode)) { - rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, - spec, attr); - if (rc) - RETURN(rc); - } - - mdd_txn_param_build(env, MDD_TXN_MKDIR_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); - - mdd_write_lock(env, mdd_pobj); - - /* - * XXX check that link can be added to the parent in mkdir case. - */ - - mdd_write_lock(env, son); - rc = __mdd_object_create(env, son, ma, handle); - if (rc) { - mdd_write_unlock(env, son); - GOTO(cleanup, rc); - } - - created = 1; - -#ifdef CONFIG_FS_POSIX_ACL - rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle); - if (rc) { - mdd_write_unlock(env, son); - GOTO(cleanup, rc); - } else { - ma->ma_attr.la_valid |= LA_MODE; - } -#endif - - rc = __mdd_object_initialize(env, mdo2fid(mdd_pobj), - son, ma, handle); - mdd_write_unlock(env, son); - if (rc) - /* - * Object has no links, so it will be destroyed when last - * reference is released. (XXX not now.) 
- */ - GOTO(cleanup, rc); - - rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son), - name, S_ISDIR(attr->la_mode), handle, - mdd_object_capa(env, mdd_pobj)); - - if (rc) - GOTO(cleanup, rc); - - inserted = 1; - /* replay creates has objects already */ - if (spec->u.sp_ea.no_lov_create) { - CDEBUG(D_INFO, "we already have lov ea\n"); - rc = mdd_lov_set_md(env, mdd_pobj, son, - (struct lov_mds_md *)spec->u.sp_ea.eadata, - spec->u.sp_ea.eadatalen, handle, 0); - } else - rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, - lmm_size, handle, 0); - if (rc) { - CERROR("error on stripe info copy %d \n", rc); - GOTO(cleanup, rc); - } - - if (S_ISLNK(attr->la_mode)) { - struct dt_object *dt = mdd_object_child(son); - const char *target_name = spec->u.sp_symname; - int sym_len = strlen(target_name); - const struct lu_buf *buf; - loff_t pos = 0; - - buf = mdd_buf_get_const(env, target_name, sym_len); - rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle, - mdd_object_capa(env, son)); - if (rc == sym_len) - rc = 0; - else - rc = -EFAULT; - } - - *la_copy = ma->ma_attr; - la_copy->la_valid = LA_CTIME | LA_MTIME; - rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0); - if (rc) - GOTO(cleanup, rc); - - /* return attr back */ - rc = mdd_attr_get_internal_locked(env, son, ma); -cleanup: - if (rc && created) { - int rc2 = 0; - - if (inserted) { - rc2 = __mdd_index_delete(env, mdd_pobj, name, - S_ISDIR(attr->la_mode), - handle, BYPASS_CAPA); - if (rc2) - CERROR("error can not cleanup destroy %d\n", - rc2); - } - if (rc2 == 0) { - mdd_write_lock(env, son); - __mdd_ref_del(env, son, handle); - mdd_write_unlock(env, son); - } - } - /* finish mdd_lov_create() stuff */ - mdd_lov_create_finish(env, mdd, rc); - if (lmm) - OBD_FREE(lmm, lmm_size); - mdd_write_unlock(env, mdd_pobj); - mdd_trans_stop(env, mdd, rc, handle); - mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE); - RETURN(rc); -} - -/* partial operation */ -static int mdd_oc_sanity_check(const struct lu_env *env, - struct 
mdd_object *obj, - struct md_attr *ma) -{ - int rc; - ENTRY; - - switch (ma->ma_attr.la_mode & S_IFMT) { - case S_IFREG: - case S_IFDIR: - case S_IFLNK: - case S_IFCHR: - case S_IFBLK: - case S_IFIFO: - case S_IFSOCK: - rc = 0; - break; - default: - rc = -EINVAL; - break; - } - RETURN(rc); -} - -static int mdd_object_create(const struct lu_env *env, - struct md_object *obj, - const struct md_create_spec *spec, - struct md_attr *ma) -{ - - struct mdd_device *mdd = mdo2mdd(obj); - struct mdd_object *mdd_obj = md2mdd_obj(obj); - const struct lu_fid *pfid = spec->u.sp_pfid; - struct thandle *handle; - int rc; - ENTRY; - - rc = mdd_oc_sanity_check(env, mdd_obj, ma); - if (rc) - RETURN(rc); - - mdd_txn_param_build(env, MDD_TXN_OBJECT_CREATE_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); - - mdd_write_lock(env, mdd_obj); - rc = __mdd_object_create(env, mdd_obj, ma, handle); - if (rc) - GOTO(unlock, rc); - - if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) { - /* If creating the slave object, set slave EA here. 
*/ - int lmv_size = spec->u.sp_ea.eadatalen; - struct lmv_stripe_md *lmv; - - lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata; - LASSERT(lmv != NULL && lmv_size > 0); - - rc = __mdd_xattr_set(env, mdd_obj, - mdd_buf_get_const(env, lmv, lmv_size), - MDS_LMV_MD_NAME, 0, handle); - if (rc) - GOTO(unlock, rc); - pfid = spec->u.sp_ea.fid; - - CDEBUG(D_INFO, "Set slave ea "DFID", eadatalen %d, rc %d\n", - PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc); - rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr, handle, 0); - } else { -#ifdef CONFIG_FS_POSIX_ACL - if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) { - struct lu_buf *buf = &mdd_env_info(env)->mti_buf; - - buf->lb_buf = (void *)spec->u.sp_ea.eadata; - buf->lb_len = spec->u.sp_ea.eadatalen; - if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) { - rc = __mdd_acl_init(env, mdd_obj, buf, - &ma->ma_attr.la_mode, - handle); - if (rc) - GOTO(unlock, rc); - else - ma->ma_attr.la_valid |= LA_MODE; - } - } -#endif - rc = __mdd_object_initialize(env, pfid, mdd_obj, ma, handle); - } - EXIT; -unlock: - mdd_write_unlock(env, mdd_obj); - if (rc == 0) - rc = mdd_attr_get_internal_locked(env, mdd_obj, ma); - - mdd_trans_stop(env, mdd, rc, handle); - return rc; -} - -/* - * Partial operation. Be aware, this is called with write lock taken, so we use - * locksless version of __mdd_lookup() here. - */ -static int mdd_ni_sanity_check(const struct lu_env *env, - struct md_object *pobj, - const char *name, - const struct lu_fid *fid) -{ - struct mdd_object *obj = md2mdd_obj(pobj); -#if 0 - int rc; -#endif - ENTRY; - - /* EEXIST check */ - if (mdd_is_dead_obj(obj)) - RETURN(-ENOENT); - - /* The exist of the name will be checked in _index_insert. */ -#if 0 - rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC); - if (rc != -ENOENT) - RETURN(rc ? 
: -EEXIST); - else - RETURN(0); -#endif - RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC)); -} - -static int mdd_name_insert(const struct lu_env *env, - struct md_object *pobj, - const char *name, const struct lu_fid *fid, - int isdir) -{ - struct mdd_object *mdd_obj = md2mdd_obj(pobj); - struct thandle *handle; - int rc; - ENTRY; - - mdd_txn_param_build(env, MDD_TXN_INDEX_INSERT_OP); - handle = mdd_trans_start(env, mdo2mdd(pobj)); - if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); - - mdd_write_lock(env, mdd_obj); - rc = mdd_ni_sanity_check(env, pobj, name, fid); - if (rc) - GOTO(out_unlock, rc); - - rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle, - BYPASS_CAPA); - -out_unlock: - mdd_write_unlock(env, mdd_obj); - - mdd_trans_stop(env, mdo2mdd(pobj), rc, handle); - RETURN(rc); -} - -/* - * Be aware, this is called with write lock taken, so we use locksless version - * of __mdd_lookup() here. - */ -static int mdd_nr_sanity_check(const struct lu_env *env, - struct md_object *pobj, - const char *name) -{ - struct mdd_object *obj = md2mdd_obj(pobj); -#if 0 - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_fid *fid = &info->mti_fid; - int rc; -#endif - ENTRY; - - /* EEXIST check */ - if (mdd_is_dead_obj(obj)) - RETURN(-ENOENT); - - /* The exist of the name will be checked in _index_delete. 
*/ -#if 0 - rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC); - RETURN(rc); -#endif - RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC)); -} - -static int mdd_name_remove(const struct lu_env *env, - struct md_object *pobj, - const char *name, int is_dir) -{ - struct mdd_device *mdd = mdo2mdd(pobj); - struct mdd_object *mdd_obj = md2mdd_obj(pobj); - struct thandle *handle; - int rc; - ENTRY; - - mdd_txn_param_build(env, MDD_TXN_INDEX_DELETE_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); - - mdd_write_lock(env, mdd_obj); - rc = mdd_nr_sanity_check(env, pobj, name); - if (rc) - GOTO(out_unlock, rc); - - rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle, - BYPASS_CAPA); - -out_unlock: - mdd_write_unlock(env, mdd_obj); - - mdd_trans_stop(env, mdd, rc, handle); - RETURN(rc); -} - -static int mdd_rt_sanity_check(const struct lu_env *env, - struct mdd_object *tgt_pobj, - struct mdd_object *tobj, - const struct lu_fid *sfid, - const char *name, struct md_attr *ma) -{ - int rc, src_is_dir; - ENTRY; - - /* EEXIST check */ - if (mdd_is_dead_obj(tgt_pobj)) - RETURN(-ENOENT); - - src_is_dir = S_ISDIR(ma->ma_attr.la_mode); - if (tobj) { - rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1); - if (!rc && S_ISDIR(mdd_object_type(tobj)) && - mdd_dir_is_empty(env, tobj)) - RETURN(-ENOTEMPTY); - } else { - rc = mdd_may_create(env, tgt_pobj, NULL, 1); - } - - RETURN(rc); -} - -static int mdd_rename_tgt(const struct lu_env *env, - struct md_object *pobj, struct md_object *tobj, - const struct lu_fid *lf, const char *name, - struct md_attr *ma) -{ - struct mdd_device *mdd = mdo2mdd(pobj); - struct mdd_object *mdd_tpobj = md2mdd_obj(pobj); - struct mdd_object *mdd_tobj = md2mdd_obj(tobj); - struct thandle *handle; - int rc; - ENTRY; - - mdd_txn_param_build(env, MDD_TXN_RENAME_TGT_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); - - if (mdd_tobj) - mdd_lock2(env, 
mdd_tpobj, mdd_tobj); - else - mdd_write_lock(env, mdd_tpobj); - - /*TODO rename sanity checking*/ - rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma); - if (rc) - GOTO(cleanup, rc); - - /* if rename_tgt is called then we should just re-insert name with - * correct fid, no need to dec/inc parent nlink if obj is dir */ - rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA); - if (rc) - GOTO(cleanup, rc); - - rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle, - BYPASS_CAPA); - if (rc) - GOTO(cleanup, rc); - - if (tobj && lu_object_exists(&tobj->mo_lu)) - __mdd_ref_del(env, mdd_tobj, handle); -cleanup: - if (tobj) - mdd_unlock2(env, mdd_tpobj, mdd_tobj); - else - mdd_write_unlock(env, mdd_tpobj); - mdd_trans_stop(env, mdd, rc, handle); - RETURN(rc); -} - -/* - * XXX: if permission check is needed here? - */ -static int mdd_ref_add(const struct lu_env *env, - struct md_object *obj) -{ - struct mdd_object *mdd_obj = md2mdd_obj(obj); - struct mdd_device *mdd = mdo2mdd(obj); - struct thandle *handle; - int rc; - ENTRY; - - mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP); - handle = mdd_trans_start(env, mdd); - if (IS_ERR(handle)) - RETURN(-ENOMEM); - - mdd_write_lock(env, mdd_obj); - rc = mdd_link_sanity_check(env, NULL, mdd_obj); - if (!rc) - __mdd_ref_add(env, mdd_obj, handle); - mdd_write_unlock(env, mdd_obj); - - mdd_trans_stop(env, mdd, 0, handle); - - RETURN(0); -} - -/* do NOT or the MAY_*'s, you'll get the weakest */ -static int accmode(struct mdd_object *mdd_obj, int flags) -{ - int res = 0; - -#if 0 - /* Sadly, NFSD reopens a file repeatedly during operation, so the - * "acc_mode = 0" allowance for newly-created files isn't honoured. - * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file - * owner can write to a file even if it is marked readonly to hide - * its brokenness. 
(bug 5781) */ - if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid) - return 0; -#endif - if (flags & FMODE_READ) - res = MAY_READ; - if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND)) - res |= MAY_WRITE; - if (flags & MDS_FMODE_EXEC) - res = MAY_EXEC; - return res; -} - -static int mdd_open_sanity_check(const struct lu_env *env, - struct mdd_object *obj, int flag) -{ - struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - int mode = accmode(obj, flag); - int rc; - ENTRY; - - /* EEXIST check */ - if (mdd_is_dead_obj(obj)) - RETURN(-ENOENT); + /* EEXIST check */ + if (mdd_is_dead_obj(obj)) + RETURN(-ENOENT); rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA); if (rc) @@ -2344,6 +1008,26 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj, return rc; } +/* return md_attr back, + * if it is last unlink then return lov ea + llog cookie*/ +int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, + struct md_attr *ma) +{ + int rc = 0; + ENTRY; + + mdd_set_dead_obj(obj); + if (S_ISREG(mdd_object_type(obj))) { + /* Return LOV & COOKIES unconditionally here. We clean evth up. + * Caller must be ready for that. */ + rc = __mdd_lmm_get(env, obj, ma); + if ((ma->ma_valid & MA_LOV)) + rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj), + obj, ma); + } + RETURN(rc); +} + /* * No permission check is needed. 
*/ @@ -2358,10 +1042,10 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, /* release open count */ mdd_obj->mod_count --; - rc = __mdd_iattr_get(env, mdd_obj, ma); + rc = mdd_iattr_get(env, mdd_obj, ma); if (rc == 0 && mdd_obj->mod_count == 0) { if (ma->ma_attr.la_nlink == 0) - rc = __mdd_object_kill(env, mdd_obj, ma); + rc = mdd_object_kill(env, mdd_obj, ma); } mdd_write_unlock(env, mdd_obj); RETURN(rc); @@ -2414,19 +1098,6 @@ out_unlock: RETURN(rc); } -struct md_dir_operations mdd_dir_ops = { - .mdo_is_subdir = mdd_is_subdir, - .mdo_lookup = mdd_lookup, - .mdo_create = mdd_create, - .mdo_rename = mdd_rename, - .mdo_link = mdd_link, - .mdo_unlink = mdd_unlink, - .mdo_name_insert = mdd_name_insert, - .mdo_name_remove = mdd_name_remove, - .mdo_rename_tgt = mdd_rename_tgt, - .mdo_create_data = mdd_create_data -}; - struct md_object_operations mdd_obj_ops = { .moo_permission = mdd_permission, .moo_attr_get = mdd_attr_get, @@ -2444,4 +1115,3 @@ struct md_object_operations mdd_obj_ops = { .moo_readlink = mdd_readlink, .moo_capa_get = mdd_capa_get }; - diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 110637c..33fb538 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -109,13 +109,41 @@ int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj, void *md, int *md_size, const char *name); int mdd_la_get(const struct lu_env *env, struct mdd_object *obj, struct lu_attr *la, struct lustre_capa *capa); - -int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, - struct mdd_object *mdd_cobj, struct md_attr *ma); - int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o, const struct lu_attr *attr, struct thandle *handle, const int needacl); +int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, + struct md_attr *ma); +int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, + struct md_attr *ma); +int mdd_attr_get_internal_locked(const 
struct lu_env *env, + struct mdd_object *mdd_obj, + struct md_attr *ma); +int mdd_object_create_internal(const struct lu_env *env, + struct mdd_object *obj, struct md_attr *ma, + struct thandle *handle); +int mdd_attr_set_internal_locked(const struct lu_env *env, + struct mdd_object *o, + const struct lu_attr *attr, + struct thandle *handle); +/* mdd_dir.c */ +int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj, + struct mdd_object *cobj, struct md_attr *ma); +int mdd_finish_unlink(const struct lu_env *env, struct mdd_object *obj, + struct md_attr *ma, struct thandle *th); +int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, + struct mdd_object *child, struct md_attr *ma, + struct thandle *handle); +int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj, + struct mdd_object *src_obj); +void mdd_ref_add_internal(const struct lu_env *env, struct mdd_object *obj, + struct thandle *handle); +void mdd_ref_del_internal(const struct lu_env *env, struct mdd_object *obj, + struct thandle *handle); +/* mdd_lov.c */ +int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, + struct mdd_object *mdd_cobj, struct md_attr *ma); + int mdd_get_cookie_size(const struct lu_env *env, struct mdd_device *mdd, struct lov_mds_md *lmm); @@ -140,8 +168,6 @@ int __mdd_orphan_del(const struct lu_env *, struct mdd_object *, struct thandle *); int orph_index_init(const struct lu_env *env, struct mdd_device *mdd); void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd); -int __mdd_object_kill(const struct lu_env *, struct mdd_object *, - struct md_attr *); int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd); int mdd_procfs_init(struct mdd_device *mdd); @@ -317,6 +343,16 @@ static inline int mdd_is_immutable(struct mdd_object *obj) return obj->mod_flags & IMMUTE_OBJ; } +static inline int mdd_is_dead_obj(struct mdd_object *obj) +{ + return obj && obj->mod_flags & DEAD_OBJ; 
+} + +static inline int mdd_is_append(struct mdd_object *obj) +{ + return obj->mod_flags & APPEND_OBJ; +} + static inline struct lustre_capa *mdd_object_capa(const struct lu_env *env, const struct mdd_object *obj) { diff --git a/lustre/mdd/mdd_orphans.c b/lustre/mdd/mdd_orphans.c index dfa05ab..06bee79 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -125,7 +125,7 @@ static void orph_key_test_and_del(const struct lu_env *env, /* non-opened orphan, let's delete it */ struct md_attr *ma = &mdd_env_info(env)->mti_ma; CWARN("Found orphan!\n"); - __mdd_object_kill(env, mdo, ma); + mdd_object_kill(env, mdo, ma); /* TODO: now handle OST objects */ //mdd_ost_objects_destroy(env, ma); /* TODO: destroy index entry */ -- 1.8.3.1