From: wangdi Date: Sat, 27 Oct 2012 22:05:56 +0000 (-0700) Subject: LU-2216 mdt: remove obsolete DNE code X-Git-Tag: 2.3.54~10 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=54fe9796ec837698a27420c8a92d9493c733b6a9 LU-2216 mdt: remove obsolete DNE code 1. remove split checking and cross-ref code from DNE. 2. remove IAM code on ldiskfs and utils. 3. remove cmm directory. Change-Id: I0c81d753462863706e8918393369dde94a45030c Signed-off-by: Wang Di Reviewed-on: http://review.whamcloud.com/4353 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Andreas Dilger --- diff --git a/lustre/Makefile.in b/lustre/Makefile.in index b3f6f60..ae8c508 100644 --- a/lustre/Makefile.in +++ b/lustre/Makefile.in @@ -5,7 +5,7 @@ subdir-m += ptlrpc subdir-m += obdecho subdir-m += mgc -@SERVER_TRUE@subdir-m += mds ost mgs mdt cmm mdd ofd quota osp lod +@SERVER_TRUE@subdir-m += mds ost mgs mdt mdd ofd quota osp lod @CLIENT_TRUE@subdir-m += lov osc mdc lmv llite fld @LDISKFS_ENABLED_TRUE@subdir-m += osd-ldiskfs @ZFS_ENABLED_TRUE@subdir-m += osd-zfs diff --git a/lustre/autoMakefile.am b/lustre/autoMakefile.am index 728b4c2..dafe8c8 100644 --- a/lustre/autoMakefile.am +++ b/lustre/autoMakefile.am @@ -42,7 +42,7 @@ AUTOMAKE_OPTIONS = foreign ALWAYS_SUBDIRS = include lvfs obdclass ldlm ptlrpc obdecho \ mgc fid fld doc utils tests scripts autoconf contrib conf -SERVER_SUBDIRS = ost mds mgs mdt cmm mdd ofd osd-zfs osd-ldiskfs \ +SERVER_SUBDIRS = ost mds mgs mdt mdd ofd osd-zfs osd-ldiskfs \ quota osp lod target CLIENT_SUBDIRS = mdc lmv llite lclient lov osc diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4 index b771c53..9798be0 100644 --- a/lustre/autoconf/lustre-core.m4 +++ b/lustre/autoconf/lustre-core.m4 @@ -2520,8 +2520,6 @@ lustre/mds/Makefile lustre/mds/autoMakefile lustre/mdt/Makefile lustre/mdt/autoMakefile -lustre/cmm/Makefile -lustre/cmm/autoMakefile lustre/mdd/Makefile lustre/mdd/autoMakefile lustre/fld/Makefile diff --git a/lustre/cmm/Makefile.in b/lustre/cmm/Makefile.in deleted file mode 100644 index befc9ea5..0000000 --- a/lustre/cmm/Makefile.in +++ /dev/null @@ -1,6 +0,0 @@ -MODULES := cmm -cmm-objs := cmm_device.o cmm_object.o cmm_lproc.o mdc_device.o mdc_object.o - -@SPLIT_TRUE@cmm-objs += cmm_split.o - -@INCLUDE_RULES@ diff --git a/lustre/cmm/autoMakefile.am b/lustre/cmm/autoMakefile.am deleted file mode 100644 index e7355f8..0000000 --- a/lustre/cmm/autoMakefile.am +++ /dev/null @@ -1,42 +0,0 @@ -# -# GPL HEADER START -# -# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 only, -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License version 2 for more details (a copy is included -# in the LICENSE file that accompanied this code). -# -# You should have received a copy of the GNU General Public License -# version 2 along with this program; If not, see -# http://www.sun.com/software/products/lustre/docs/GPLv2.pdf -# -# Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, -# CA 95054 USA or visit www.sun.com if you need additional information or -# have any questions. -# -# GPL HEADER END -# - -# -# Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. -# Use is subject to license terms. -# - -# -# This file is part of Lustre, http://www.lustre.org/ -# Lustre is a trademark of Sun Microsystems, Inc. -# - -if MODULES -modulefs_DATA = cmm$(KMODEXT) -endif - -MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ -EXTRA_DIST = $(cmm-objs:%.o=%.c) cmm_internal.h mdc_internal.h diff --git a/lustre/cmm/cmm_device.c b/lustre/cmm/cmm_device.c deleted file mode 100644 index a3c9397..0000000 --- a/lustre/cmm/cmm_device.c +++ /dev/null @@ -1,627 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - * - * Copyright (c) 2011, Whamcloud, Inc. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/cmm/cmm_device.c - * - * Lustre Cluster Metadata Manager (cmm) - * - * Author: Mike Pershin - */ -/** - * \addtogroup cmm - * @{ - */ - -#define DEBUG_SUBSYSTEM S_MDS - -#include - -#include -#include -#include -#include -#include "cmm_internal.h" -#include "mdc_internal.h" - -struct obd_ops cmm_obd_device_ops = { - .o_owner = THIS_MODULE -}; - -static const struct lu_device_operations cmm_lu_ops; - -static inline int lu_device_is_cmm(struct lu_device *d) -{ - return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops); -} - -int cmm_root_get(const struct lu_env *env, struct md_device *md, - struct lu_fid *fid) -{ - struct cmm_device *cmm_dev = md2cmm_dev(md); - /* valid only on master MDS */ - if (cmm_dev->cmm_local_num == 0) - return cmm_child_ops(cmm_dev)->mdo_root_get(env, - cmm_dev->cmm_child, fid); - else - return -EINVAL; -} - -static int cmm_statfs(const struct lu_env *env, struct md_device *md, - struct obd_statfs *sfs) -{ - struct cmm_device *cmm_dev = md2cmm_dev(md); - int rc; - - ENTRY; - rc = cmm_child_ops(cmm_dev)->mdo_statfs(env, - cmm_dev->cmm_child, sfs); - RETURN (rc); -} - -static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md, - int *md_size, int *cookie_size) -{ - struct cmm_device *cmm_dev = md2cmm_dev(md); - int rc; - ENTRY; - rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child, - md_size, cookie_size); - RETURN(rc); -} - -static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md, - int mode , unsigned long timeout, __u32 alg, - struct lustre_capa_key *keys) -{ - struct cmm_device *cmm_dev = md2cmm_dev(md); - int rc; - ENTRY; - LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt); - rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child, - mode, timeout, alg, - keys); - RETURN(rc); -} - -static int cmm_update_capa_key(const struct lu_env *env, - struct md_device *md, - struct lustre_capa_key *key) -{ - struct cmm_device *cmm_dev = md2cmm_dev(md); - int rc; - ENTRY; - rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env, - cmm_dev->cmm_child, - key); - RETURN(rc); -} - -static int cmm_llog_ctxt_get(const struct lu_env *env, struct md_device *m, - int idx, void **h) -{ - struct cmm_device *cmm_dev = md2cmm_dev(m); - int rc; - ENTRY; - - rc = cmm_child_ops(cmm_dev)->mdo_llog_ctxt_get(env, cmm_dev->cmm_child, - idx, h); - RETURN(rc); -} - -int cmm_iocontrol(const struct lu_env *env, struct md_device *m, - unsigned int cmd, int len, void *data) -{ - struct md_device *next = md2cmm_dev(m)->cmm_child; - int rc; - - ENTRY; - rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data); - RETURN(rc); -} - - -static const struct md_device_operations cmm_md_ops = { - .mdo_statfs = cmm_statfs, - .mdo_root_get = cmm_root_get, - .mdo_maxsize_get = cmm_maxsize_get, - .mdo_init_capa_ctxt = cmm_init_capa_ctxt, - .mdo_update_capa_key = cmm_update_capa_key, - .mdo_llog_ctxt_get = cmm_llog_ctxt_get, - .mdo_iocontrol = cmm_iocontrol, -}; - -extern struct lu_device_type mdc_device_type; -/** - * Init MDC. - */ -static int cmm_post_init_mdc(const struct lu_env *env, - struct cmm_device *cmm) -{ - int max_mdsize, max_cookiesize, rc; - struct mdc_device *mc, *tmp; - - /* get the max mdsize and cookiesize from lower layer */ - rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize, - &max_cookiesize); - if (rc) - RETURN(rc); - - cfs_spin_lock(&cmm->cmm_tgt_guard); - cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, - mc_linkage) { - cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize); - } - cfs_spin_unlock(&cmm->cmm_tgt_guard); - RETURN(rc); -} - -/* --- cmm_lu_operations --- */ -/* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */ -static int cmm_add_mdc(const struct lu_env *env, - struct cmm_device *cm, struct lustre_cfg *cfg) -{ - struct lu_device_type *ldt = &mdc_device_type; - char *p, *num = lustre_cfg_string(cfg, 2); - struct mdc_device *mc, *tmp; - struct lu_fld_target target; - struct lu_device *ld; - struct lu_device *cmm_lu = cmm2lu_dev(cm); - mdsno_t mdc_num; - struct lu_site *site = cmm2lu_dev(cm)->ld_site; - int rc; - ENTRY; - - /* find out that there is no such mdc */ - LASSERT(num); - mdc_num = simple_strtol(num, &p, 10); - if (*p) { - CERROR("Invalid index in lustre_cgf, offset 2\n"); - RETURN(-EINVAL); - } - - cfs_spin_lock(&cm->cmm_tgt_guard); - cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, - mc_linkage) { - if (mc->mc_num == mdc_num) { - cfs_spin_unlock(&cm->cmm_tgt_guard); - RETURN(-EEXIST); - } - } - cfs_spin_unlock(&cm->cmm_tgt_guard); - ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg); - if (IS_ERR(ld)) - RETURN(PTR_ERR(ld)); - - ld->ld_site = site; - - rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL); - if (rc) { - ldt->ldt_ops->ldto_device_free(env, ld); - RETURN(rc); - } - /* pass config to the just created MDC */ - rc = ld->ld_ops->ldo_process_config(env, ld, cfg); - if (rc) { - ldt->ldt_ops->ldto_device_fini(env, ld); - ldt->ldt_ops->ldto_device_free(env, ld); - RETURN(rc); - } - - cfs_spin_lock(&cm->cmm_tgt_guard); - cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, - mc_linkage) { - if (mc->mc_num == mdc_num) { - cfs_spin_unlock(&cm->cmm_tgt_guard); - ldt->ldt_ops->ldto_device_fini(env, ld); - ldt->ldt_ops->ldto_device_free(env, ld); - RETURN(-EEXIST); - } - } - mc = lu2mdc_dev(ld); - cfs_list_add_tail(&mc->mc_linkage, &cm->cmm_targets); - cm->cmm_tgt_count++; - cfs_spin_unlock(&cm->cmm_tgt_guard); - - lu_device_get(cmm_lu); - lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld); - - target.ft_srv = NULL; - target.ft_idx = mc->mc_num; - target.ft_exp = mc->mc_desc.cl_exp; - fld_client_add_target(cm->cmm_fld, &target); - - if (mc->mc_num == 0) { - /* this is mdt0 -> mc export, fld lookup need this export - to forward fld lookup request. */ - LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp); - lu_site2md(site)->ms_server_fld->lsf_control_exp = - mc->mc_desc.cl_exp; - } - /* Set max md size for the mdc. */ - rc = cmm_post_init_mdc(env, cm); - RETURN(rc); -} - -static void cmm_device_shutdown(const struct lu_env *env, - struct cmm_device *cm, - struct lustre_cfg *cfg) -{ - struct mdc_device *mc, *tmp; - ENTRY; - - /* Remove local target from FLD. */ - fld_client_del_target(cm->cmm_fld, cm->cmm_local_num); - - /* Finish all mdc devices. */ - cfs_spin_lock(&cm->cmm_tgt_guard); - cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) { - struct lu_device *ld_m = mdc2lu_dev(mc); - fld_client_del_target(cm->cmm_fld, mc->mc_num); - ld_m->ld_ops->ldo_process_config(env, ld_m, cfg); - } - cfs_spin_unlock(&cm->cmm_tgt_guard); - - /* remove upcall device*/ - md_upcall_fini(&cm->cmm_md_dev); - - EXIT; -} - -static int cmm_device_mount(const struct lu_env *env, - struct cmm_device *m, struct lustre_cfg *cfg) -{ - const char *index = lustre_cfg_string(cfg, 2); - char *p; - - LASSERT(index != NULL); - - m->cmm_local_num = simple_strtol(index, &p, 10); - if (*p) { - CERROR("Invalid index in lustre_cgf\n"); - RETURN(-EINVAL); - } - - RETURN(0); -} - -static int cmm_process_config(const struct lu_env *env, - struct lu_device *d, struct lustre_cfg *cfg) -{ - struct cmm_device *m = lu2cmm_dev(d); - struct lu_device *next = md2lu_dev(m->cmm_child); - int err; - ENTRY; - - switch(cfg->lcfg_command) { - case LCFG_ADD_MDC: - /* On first ADD_MDC add also local target. */ - if (!(m->cmm_flags & CMM_INITIALIZED)) { - struct lu_site *ls = cmm2lu_dev(m)->ld_site; - struct lu_fld_target target; - - target.ft_srv = lu_site2md(ls)->ms_server_fld; - target.ft_idx = m->cmm_local_num; - target.ft_exp = NULL; - - fld_client_add_target(m->cmm_fld, &target); - } - err = cmm_add_mdc(env, m, cfg); - - /* The first ADD_MDC can be counted as setup is finished. */ - if (!(m->cmm_flags & CMM_INITIALIZED)) - m->cmm_flags |= CMM_INITIALIZED; - - break; - case LCFG_SETUP: - { - /* lower layers should be set up at first */ - err = next->ld_ops->ldo_process_config(env, next, cfg); - if (err == 0) - err = cmm_device_mount(env, m, cfg); - break; - } - case LCFG_CLEANUP: - { - lu_dev_del_linkage(d->ld_site, d); - cmm_device_shutdown(env, m, cfg); - } - default: - err = next->ld_ops->ldo_process_config(env, next, cfg); - } - RETURN(err); -} - -static int cmm_recovery_complete(const struct lu_env *env, - struct lu_device *d) -{ - struct cmm_device *m = lu2cmm_dev(d); - struct lu_device *next = md2lu_dev(m->cmm_child); - int rc; - ENTRY; - rc = next->ld_ops->ldo_recovery_complete(env, next); - RETURN(rc); -} - -static int cmm_prepare(const struct lu_env *env, - struct lu_device *pdev, - struct lu_device *dev) -{ - struct cmm_device *cmm = lu2cmm_dev(dev); - struct lu_device *next = md2lu_dev(cmm->cmm_child); - int rc; - - ENTRY; - rc = next->ld_ops->ldo_prepare(env, dev, next); - RETURN(rc); -} - -static const struct lu_device_operations cmm_lu_ops = { - .ldo_object_alloc = cmm_object_alloc, - .ldo_process_config = cmm_process_config, - .ldo_recovery_complete = cmm_recovery_complete, - .ldo_prepare = cmm_prepare, -}; - -/* --- lu_device_type operations --- */ -int cmm_upcall(const struct lu_env *env, struct md_device *md, - enum md_upcall_event ev, void *data) -{ - int rc; - ENTRY; - - switch (ev) { - case MD_LOV_SYNC: - rc = cmm_post_init_mdc(env, md2cmm_dev(md)); - if (rc) - CERROR("can not init md size %d\n", rc); - /* fall through */ - default: - rc = md_do_upcall(env, md, ev, data); - } - RETURN(rc); -} - -static struct lu_device *cmm_device_free(const struct lu_env *env, - struct lu_device *d) -{ - struct cmm_device *m = lu2cmm_dev(d); - struct lu_device *next = md2lu_dev(m->cmm_child); - ENTRY; - - LASSERT(m->cmm_tgt_count == 0); - LASSERT(cfs_list_empty(&m->cmm_targets)); - if (m->cmm_fld != NULL) { - OBD_FREE_PTR(m->cmm_fld); - m->cmm_fld = NULL; - } - md_device_fini(&m->cmm_md_dev); - OBD_FREE_PTR(m); - RETURN(next); -} - -static struct lu_device *cmm_device_alloc(const struct lu_env *env, - struct lu_device_type *t, - struct lustre_cfg *cfg) -{ - struct lu_device *l; - struct cmm_device *m; - ENTRY; - - OBD_ALLOC_PTR(m); - if (m == NULL) { - l = ERR_PTR(-ENOMEM); - } else { - md_device_init(&m->cmm_md_dev, t); - m->cmm_md_dev.md_ops = &cmm_md_ops; - md_upcall_init(&m->cmm_md_dev, cmm_upcall); - l = cmm2lu_dev(m); - l->ld_ops = &cmm_lu_ops; - - OBD_ALLOC_PTR(m->cmm_fld); - if (!m->cmm_fld) { - cmm_device_free(env, l); - l = ERR_PTR(-ENOMEM); - } - } - RETURN(l); -} - -/* context key constructor/destructor: cmm_key_init, cmm_key_fini */ -LU_KEY_INIT_FINI(cmm, struct cmm_thread_info); - -/* context key: cmm_thread_key */ -LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD); - -struct cmm_thread_info *cmm_env_info(const struct lu_env *env) -{ - struct cmm_thread_info *info; - - info = lu_context_key_get(&env->le_ctx, &cmm_thread_key); - LASSERT(info != NULL); - return info; -} - -/* type constructor/destructor: cmm_type_init/cmm_type_fini */ -LU_TYPE_INIT_FINI(cmm, &cmm_thread_key); - -/* - * Kludge code : it should be moved mdc_device.c if mdc_(mds)_device - * is really stacked. - */ -static int __cmm_type_init(struct lu_device_type *t) -{ - int rc; - rc = lu_device_type_init(&mdc_device_type); - if (rc == 0) { - rc = cmm_type_init(t); - if (rc) - lu_device_type_fini(&mdc_device_type); - } - return rc; -} - -static void __cmm_type_fini(struct lu_device_type *t) -{ - lu_device_type_fini(&mdc_device_type); - cmm_type_fini(t); -} - -static void __cmm_type_start(struct lu_device_type *t) -{ - mdc_device_type.ldt_ops->ldto_start(&mdc_device_type); - cmm_type_start(t); -} - -static void __cmm_type_stop(struct lu_device_type *t) -{ - mdc_device_type.ldt_ops->ldto_stop(&mdc_device_type); - cmm_type_stop(t); -} - -static int cmm_device_init(const struct lu_env *env, struct lu_device *d, - const char *name, struct lu_device *next) -{ - struct cmm_device *m = lu2cmm_dev(d); - struct lu_site *ls; - int err = 0; - ENTRY; - - cfs_spin_lock_init(&m->cmm_tgt_guard); - CFS_INIT_LIST_HEAD(&m->cmm_targets); - m->cmm_tgt_count = 0; - m->cmm_child = lu2md_dev(next); - - err = fld_client_init(m->cmm_fld, name, - LUSTRE_CLI_FLD_HASH_DHT); - if (err) { - CERROR("Can't init FLD, err %d\n", err); - RETURN(err); - } - - /* Assign site's fld client ref, needed for asserts in osd. */ - ls = cmm2lu_dev(m)->ld_site; - lu_site2md(ls)->ms_client_fld = m->cmm_fld; - err = cmm_procfs_init(m, name); - - RETURN(err); -} - -static struct lu_device *cmm_device_fini(const struct lu_env *env, - struct lu_device *ld) -{ - struct cmm_device *cm = lu2cmm_dev(ld); - struct mdc_device *mc, *tmp; - struct lu_site *ls; - ENTRY; - - /* Finish all mdc devices */ - cfs_spin_lock(&cm->cmm_tgt_guard); - cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) { - struct lu_device *ld_m = mdc2lu_dev(mc); - struct lu_device *ld_c = cmm2lu_dev(cm); - - cfs_list_del_init(&mc->mc_linkage); - lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m); - lu_device_put(ld_c); - ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m); - ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m); - cm->cmm_tgt_count--; - } - cfs_spin_unlock(&cm->cmm_tgt_guard); - - fld_client_proc_fini(cm->cmm_fld); - fld_client_fini(cm->cmm_fld); - ls = cmm2lu_dev(cm)->ld_site; - lu_site2md(ls)->ms_client_fld = NULL; - cmm_procfs_fini(cm); - - RETURN (md2lu_dev(cm->cmm_child)); -} - -static struct lu_device_type_operations cmm_device_type_ops = { - .ldto_init = __cmm_type_init, - .ldto_fini = __cmm_type_fini, - - .ldto_start = __cmm_type_start, - .ldto_stop = __cmm_type_stop, - - .ldto_device_alloc = cmm_device_alloc, - .ldto_device_free = cmm_device_free, - - .ldto_device_init = cmm_device_init, - .ldto_device_fini = cmm_device_fini -}; - -static struct lu_device_type cmm_device_type = { - .ldt_tags = LU_DEVICE_MD, - .ldt_name = LUSTRE_CMM_NAME, - .ldt_ops = &cmm_device_type_ops, - .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD -}; - -struct lprocfs_vars lprocfs_cmm_obd_vars[] = { - { 0 } -}; - -struct lprocfs_vars lprocfs_cmm_module_vars[] = { - { 0 } -}; - -static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars) -{ - lvars->module_vars = lprocfs_cmm_module_vars; - lvars->obd_vars = lprocfs_cmm_obd_vars; -} -/** @} */ - -static int __init cmm_mod_init(void) -{ - struct lprocfs_static_vars lvars; - - lprocfs_cmm_init_vars(&lvars); - return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars, - LUSTRE_CMM_NAME, &cmm_device_type); -} - -static void __exit cmm_mod_exit(void) -{ - class_unregister_type(LUSTRE_CMM_NAME); -} - -MODULE_AUTHOR("Sun Microsystems, Inc. "); -MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")"); -MODULE_LICENSE("GPL"); - -cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit); diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h deleted file mode 100644 index 758c1e1..0000000 --- a/lustre/cmm/cmm_internal.h +++ /dev/null @@ -1,319 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/cmm/cmm_internal.h - * - * Lustre Cluster Metadata Manager (cmm) - * - * Author: Mike Pershin - */ -/** - * \defgroup cmm cmm - * Cluster Metadata Manager. - * Implementation of md interface to provide operation between MDS-es. - * - * CMM has two set of md methods, one is for local operation which uses MDD, - * second is for remote operations to call different MDS. - * When object is allocated then proper set of methods are set for it. * - * @{ - */ -#ifndef _CMM_INTERNAL_H -#define _CMM_INTERNAL_H - -#if defined(__KERNEL__) - -#include -#include -#include -#include - -/** - * md_device extention for CMM layer. - */ -struct cmm_device { - /** md_device instance */ - struct md_device cmm_md_dev; - /** device flags, taken from enum cmm_flags */ - __u32 cmm_flags; - /** underlaying device in MDS stack, usually MDD */ - struct md_device *cmm_child; - /** FLD client to talk to FLD */ - struct lu_client_fld *cmm_fld; - /** Number of this MDS server in cluster */ - mdsno_t cmm_local_num; - /** Total number of other MDS */ - __u32 cmm_tgt_count; - /** linked list of all remove MDS clients */ - cfs_list_t cmm_targets; - /** lock for cmm_device::cmm_targets operations */ - cfs_spinlock_t cmm_tgt_guard; - /** /proc entry with CMM data */ - cfs_proc_dir_entry_t *cmm_proc_entry; - /** CMM statistic */ - struct lprocfs_stats *cmm_stats; -}; - -/** - * CMM flags - */ -enum cmm_flags { - /** Device initialization complete. */ - CMM_INITIALIZED = 1 << 0 -}; - -/** - * Wrapper for md_device::md_ops of underlaying device cmm_child. - */ -static inline const struct md_device_operations * -cmm_child_ops(struct cmm_device *d) -{ - return d->cmm_child->md_ops; -} - -/** Wrapper to get cmm_device by contained md_device */ -static inline struct cmm_device *md2cmm_dev(struct md_device *m) -{ - return container_of0(m, struct cmm_device, cmm_md_dev); -} - -/** Wrapper to get cmm_device by contained lu_device */ -static inline struct cmm_device *lu2cmm_dev(struct lu_device *d) -{ - return container_of0(d, struct cmm_device, cmm_md_dev.md_lu_dev); -} - -/** Wrapper to get lu_device from cmm_device */ -static inline struct lu_device *cmm2lu_dev(struct cmm_device *d) -{ - return (&d->cmm_md_dev.md_lu_dev); -} - -#ifdef HAVE_SPLIT_SUPPORT -/** - * \ingroup split - * States of split operation. - */ -enum cmm_split_state { - CMM_SPLIT_UNKNOWN, - CMM_SPLIT_NONE, - CMM_SPLIT_NEEDED, - CMM_SPLIT_DONE, - CMM_SPLIT_DENIED -}; -#endif - -/** CMM container for md_object. */ -struct cmm_object { - struct md_object cmo_obj; -}; - -/** - * \defgroup cml cml - * CMM local operations. - * @{ - */ -/** - * Local CMM object. - */ -struct cml_object { - /** cmm_object instance. */ - struct cmm_object cmm_obj; -#ifdef HAVE_SPLIT_SUPPORT - /** split state of object (for dirs only) */ - enum cmm_split_state clo_split; -#endif -}; -/** @} */ - -/** - * \defgroup cmr cmr - * CMM remote operations. - * @{ - */ -/** - * Remote CMM object. - */ -struct cmr_object { - /** cmm_object instance */ - struct cmm_object cmm_obj; - /** mds number where object is placed */ - mdsno_t cmo_num; -}; -/** @} */ - -enum { - CMM_SPLIT_PAGE_COUNT = 1 -}; - -/** - * CMM thread info. - * This is storage for all variables used in CMM functions and it is needed as - * stack replacement. - */ -struct cmm_thread_info { - struct md_attr cmi_ma; - struct lu_buf cmi_buf; - struct lu_fid cmi_fid; /* used for le/cpu conversions */ - struct lu_rdpg cmi_rdpg; - /** pointers to pages for readpage. */ - struct page *cmi_pages[CMM_SPLIT_PAGE_COUNT]; - struct md_op_spec cmi_spec; - struct lmv_stripe_md cmi_lmv; - char cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE]; - /** Ops object filename */ - struct lu_name cti_name; -}; - -/** - * Wrapper to get cmm_device from cmm_object. - */ -static inline struct cmm_device *cmm_obj2dev(struct cmm_object *c) -{ - return (md2cmm_dev(md_obj2dev(&c->cmo_obj))); -} - -/** - * Get cmm_device from related lu_device. - */ -static inline struct cmm_object *lu2cmm_obj(struct lu_object *o) -{ - //LASSERT(lu_device_is_cmm(o->lo_dev)); - return container_of0(o, struct cmm_object, cmo_obj.mo_lu); -} - -/** - * Get cmm object from corresponding md_object. - */ -static inline struct cmm_object *md2cmm_obj(struct md_object *o) -{ - return container_of0(o, struct cmm_object, cmo_obj); -} - -/** - * Get next lower-layer md_object from current cmm_object. - */ -static inline struct md_object *cmm2child_obj(struct cmm_object *o) -{ - return (o ? lu2md(lu_object_next(&o->cmo_obj.mo_lu)) : NULL); -} - -/** - * Extract lu_fid from corresponding cmm_object. - */ -static inline struct lu_fid* cmm2fid(struct cmm_object *obj) -{ - return &(obj->cmo_obj.mo_lu.lo_header->loh_fid); -} - -struct cmm_thread_info *cmm_env_info(const struct lu_env *env); -struct lu_object *cmm_object_alloc(const struct lu_env *env, - const struct lu_object_header *hdr, - struct lu_device *); -/** - * \addtogroup cml - * @{ - */ -/** Get cml_object from lu_object. */ -static inline struct cml_object *lu2cml_obj(struct lu_object *o) -{ - return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu); -} -/** Get cml_object from md_object. */ -static inline struct cml_object *md2cml_obj(struct md_object *mo) -{ - return container_of0(mo, struct cml_object, cmm_obj.cmo_obj); -} -/** Get cml_object from cmm_object */ -static inline struct cml_object *cmm2cml_obj(struct cmm_object *co) -{ - return container_of0(co, struct cml_object, cmm_obj); -} -/** @} */ - -int cmm_upcall(const struct lu_env *env, struct md_device *md, - enum md_upcall_event ev, void *data); - -#ifdef HAVE_SPLIT_SUPPORT -/** - * \defgroup split split - * @{ - */ - -#define CMM_MD_SIZE(stripes) (sizeof(struct lmv_stripe_md) + \ - (stripes) * sizeof(struct lu_fid)) - -/** Get and initialize lu_bug from cmm_thread_info. */ -static inline struct lu_buf *cmm_buf_get(const struct lu_env *env, - void *area, ssize_t len) -{ - struct lu_buf *buf; - - buf = &cmm_env_info(env)->cmi_buf; - buf->lb_buf = area; - buf->lb_len = len; - return buf; -} - -int cmm_split_check(const struct lu_env *env, struct md_object *mp, - const char *name); -int cmm_split_expect(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma, int *split); -int cmm_split_dir(const struct lu_env *env, struct md_object *mo); -int cmm_split_access(const struct lu_env *env, struct md_object *mo, - mdl_mode_t lm); -/** @} */ -#endif - -int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid, - mdsno_t *mds, const struct lu_env *env); - -int cmm_procfs_init(struct cmm_device *cmm, const char *name); -int cmm_procfs_fini(struct cmm_device *cmm); - -void cmm_lprocfs_time_start(const struct lu_env *env); -void cmm_lprocfs_time_end(const struct lu_env *env, struct cmm_device *cmm, - int idx); -/** - * CMM counters. - */ -enum { - LPROC_CMM_SPLIT_CHECK, - LPROC_CMM_SPLIT, - LPROC_CMM_LOOKUP, - LPROC_CMM_CREATE, - LPROC_CMM_NR -}; - -#endif /* __KERNEL__ */ -#endif /* _CMM_INTERNAL_H */ -/** @} */ diff --git a/lustre/cmm/cmm_lproc.c b/lustre/cmm/cmm_lproc.c deleted file mode 100644 index f864cc7..0000000 --- a/lustre/cmm/cmm_lproc.c +++ /dev/null @@ -1,122 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/cmm/cmm_lproc.c - * - * CMM lprocfs stuff - * - * Author: Wang Di - * Author: Yury Umanets - */ - -#define DEBUG_SUBSYSTEM S_MDS - -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "cmm_internal.h" -/** - * \addtogroup cmm - * @{ - */ -static const char *cmm_counter_names[LPROC_CMM_NR] = { - [LPROC_CMM_SPLIT_CHECK] = "split_check", - [LPROC_CMM_SPLIT] = "split", - [LPROC_CMM_LOOKUP] = "lookup", - [LPROC_CMM_CREATE] = "create" -}; - -int cmm_procfs_init(struct cmm_device *cmm, const char *name) -{ - struct lu_device *ld = &cmm->cmm_md_dev.md_lu_dev; - struct obd_type *type; - int rc; - ENTRY; - - type = ld->ld_type->ldt_obd_type; - - LASSERT(name != NULL); - LASSERT(type != NULL); - - /* Find the type procroot and add the proc entry for this device. */ - cmm->cmm_proc_entry = lprocfs_register(name, type->typ_procroot, - NULL, NULL); - if (IS_ERR(cmm->cmm_proc_entry)) { - rc = PTR_ERR(cmm->cmm_proc_entry); - CERROR("Error %d setting up lprocfs for %s\n", - rc, name); - cmm->cmm_proc_entry = NULL; - GOTO(out, rc); - } - - rc = lu_time_init(&cmm->cmm_stats, - cmm->cmm_proc_entry, - cmm_counter_names, ARRAY_SIZE(cmm_counter_names)); - - EXIT; -out: - if (rc) - cmm_procfs_fini(cmm); - return rc; -} - -int cmm_procfs_fini(struct cmm_device *cmm) -{ - if (cmm->cmm_stats) - lu_time_fini(&cmm->cmm_stats); - - if (cmm->cmm_proc_entry) { - lprocfs_remove(&cmm->cmm_proc_entry); - cmm->cmm_proc_entry = NULL; - } - RETURN(0); -} - -void cmm_lprocfs_time_start(const struct lu_env *env) -{ - lu_lprocfs_time_start(env); -} - -void cmm_lprocfs_time_end(const struct lu_env *env, struct cmm_device *cmm, - int idx) -{ - lu_lprocfs_time_end(env, cmm->cmm_stats, idx); -} -/** @} */ diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c deleted file mode 100644 index 01e473a..0000000 --- a/lustre/cmm/cmm_object.c +++ /dev/null @@ -1,1548 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - * - * Copyright (c) 2012, Whamcloud, Inc. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/cmm/cmm_object.c - * - * Lustre Cluster Metadata Manager (cmm) - * - * Author: Mike Pershin - */ - -#define DEBUG_SUBSYSTEM S_MDS - -#include -#include "cmm_internal.h" -#include "mdc_internal.h" -/** - * \ingroup cmm - * Lookup MDS number \a mds by FID \a fid. - * - * \param fid FID of object to find MDS - * \param mds mds number to return. - */ -int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid, - mdsno_t *mds, const struct lu_env *env) -{ - int rc = 0; - ENTRY; - - LASSERT(fid_is_sane(fid)); - - rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, - LU_SEQ_RANGE_MDT, env); - if (rc) { - CERROR("Can't find mds by seq "LPX64", rc %d\n", - fid_seq(fid), rc); - RETURN(rc); - } - - if (*mds > cm->cmm_tgt_count) { - CERROR("Got invalid mdsno: %x (max: %x)\n", - *mds, cm->cmm_tgt_count); - rc = -EINVAL; - } else { - CDEBUG(D_INFO, "CMM: got MDS %x for sequence: " - LPX64"\n", *mds, fid_seq(fid)); - } - - RETURN (rc); -} - -/** - * \addtogroup cml - * @{ - */ -static const struct md_object_operations cml_mo_ops; -static const struct md_dir_operations cml_dir_ops; -static const struct lu_object_operations cml_obj_ops; - -static const struct md_object_operations cmr_mo_ops; -static const struct md_dir_operations cmr_dir_ops; -static const struct lu_object_operations cmr_obj_ops; - -/** - * \ingroup cmm - * Allocate CMM object. - */ -struct lu_object *cmm_object_alloc(const struct lu_env *env, - const struct lu_object_header *loh, - struct lu_device *ld) -{ - const struct lu_fid *fid = &loh->loh_fid; - struct lu_object *lo = NULL; - struct cmm_device *cd; - mdsno_t mds; - int rc = 0; - - ENTRY; - - cd = lu2cmm_dev(ld); - if (cd->cmm_flags & CMM_INITIALIZED) { - /* get object location */ - rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env); - if (rc) - RETURN(NULL); - } else - /* - * Device is not yet initialized, cmm_object is being created - * as part of early bootstrap procedure (it is /ROOT, or /fld, - * etc.). Such object *has* to be local. - */ - mds = cd->cmm_local_num; - - /* select the proper set of operations based on object location */ - if (mds == cd->cmm_local_num) { - struct cml_object *clo; - - OBD_ALLOC_PTR(clo); - if (clo != NULL) { - lo = &clo->cmm_obj.cmo_obj.mo_lu; - lu_object_init(lo, NULL, ld); - clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops; - clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops; - lo->lo_ops = &cml_obj_ops; - } - } else { - struct cmr_object *cro; - - OBD_ALLOC_PTR(cro); - if (cro != NULL) { - lo = &cro->cmm_obj.cmo_obj.mo_lu; - lu_object_init(lo, NULL, ld); - cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops; - cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops; - lo->lo_ops = &cmr_obj_ops; - cro->cmo_num = mds; - } - } - RETURN(lo); -} - -/** - * Get local child device. - */ -static struct lu_device *cml_child_dev(struct cmm_device *d) -{ - return &d->cmm_child->md_lu_dev; -} - -/** - * Free cml_object. - */ -static void cml_object_free(const struct lu_env *env, - struct lu_object *lo) -{ - struct cml_object *clo = lu2cml_obj(lo); - lu_object_fini(lo); - OBD_FREE_PTR(clo); -} - -/** - * Initialize cml_object. - */ -static int cml_object_init(const struct lu_env *env, struct lu_object *lo, - const struct lu_object_conf *unused) -{ - struct cmm_device *cd = lu2cmm_dev(lo->lo_dev); - struct lu_device *c_dev; - struct lu_object *c_obj; - int rc; - - ENTRY; - -#ifdef HAVE_SPLIT_SUPPORT - if (cd->cmm_tgt_count == 0) - lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED; - else - lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN; -#endif - c_dev = cml_child_dev(cd); - if (c_dev == NULL) { - rc = -ENOENT; - } else { - c_obj = c_dev->ld_ops->ldo_object_alloc(env, - lo->lo_header, c_dev); - if (c_obj != NULL) { - lu_object_add(lo, c_obj); - rc = 0; - } else { - rc = -ENOMEM; - } - } - - RETURN(rc); -} - -static int cml_object_print(const struct lu_env *env, void *cookie, - lu_printer_t p, const struct lu_object *lo) -{ - return (*p)(env, cookie, "[local]"); -} - -static const struct lu_object_operations cml_obj_ops = { - .loo_object_init = cml_object_init, - .loo_object_free = cml_object_free, - .loo_object_print = cml_object_print -}; - -/** - * \name CMM local md_object operations. - * All of them call just corresponding operations on next layer. - * @{ - */ -static int cml_object_create(const struct lu_env *env, - struct md_object *mo, - const struct md_op_spec *spec, - struct md_attr *attr) -{ - int rc; - ENTRY; - rc = mo_object_create(env, md_object_next(mo), spec, attr); - RETURN(rc); -} - -static int cml_permission(const struct lu_env *env, - struct md_object *p, struct md_object *c, - struct md_attr *attr, int mask) -{ - int rc; - ENTRY; - rc = mo_permission(env, md_object_next(p), md_object_next(c), - attr, mask); - RETURN(rc); -} - -static int cml_attr_get(const struct lu_env *env, struct md_object *mo, - struct md_attr *attr) -{ - int rc; - ENTRY; - rc = mo_attr_get(env, md_object_next(mo), attr); - RETURN(rc); -} - -static int cml_attr_set(const struct lu_env *env, struct md_object *mo, - const struct md_attr *attr) -{ - int rc; - ENTRY; - rc = mo_attr_set(env, md_object_next(mo), attr); - RETURN(rc); -} - -static int cml_xattr_get(const struct lu_env *env, struct md_object *mo, - struct lu_buf *buf, const char *name) -{ - int rc; - ENTRY; - rc = mo_xattr_get(env, md_object_next(mo), buf, name); - RETURN(rc); -} - -static int cml_readlink(const struct lu_env *env, struct md_object *mo, - struct lu_buf *buf) -{ - int rc; - ENTRY; - rc = mo_readlink(env, md_object_next(mo), buf); - RETURN(rc); -} - -static int cml_changelog(const struct lu_env *env, enum changelog_rec_type type, - int flags, struct md_object *mo) -{ - int rc; - ENTRY; - rc = mo_changelog(env, type, flags, md_object_next(mo)); - RETURN(rc); -} - -static int cml_xattr_list(const struct lu_env *env, struct md_object *mo, - struct lu_buf *buf) -{ - int rc; - ENTRY; - rc = mo_xattr_list(env, md_object_next(mo), buf); - RETURN(rc); -} - -static int cml_xattr_set(const struct lu_env *env, struct md_object *mo, - const struct lu_buf *buf, const char *name, - int fl) -{ - int rc; - ENTRY; - rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl); - RETURN(rc); -} - -static int cml_xattr_del(const struct lu_env *env, struct md_object *mo, - const char *name) -{ - int rc; - ENTRY; - rc = mo_xattr_del(env, md_object_next(mo), name); - RETURN(rc); -} - -static int cml_ref_add(const struct lu_env *env, struct md_object *mo, - const struct md_attr *ma) -{ - int rc; - ENTRY; - rc = mo_ref_add(env, md_object_next(mo), ma); - RETURN(rc); -} - -static int cml_ref_del(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma) -{ - int rc; - ENTRY; - rc = mo_ref_del(env, md_object_next(mo), ma); - RETURN(rc); -} - -static int cml_open(const struct lu_env *env, struct md_object *mo, - int flags) -{ - int rc; - ENTRY; - rc = mo_open(env, md_object_next(mo), flags); - RETURN(rc); -} - -static int cml_close(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma, int mode) -{ - int rc; - ENTRY; - rc = mo_close(env, md_object_next(mo), ma, mode); - RETURN(rc); -} - -static int cml_readpage(const struct lu_env *env, struct md_object *mo, - const struct lu_rdpg *rdpg) -{ - int rc; - ENTRY; - rc = mo_readpage(env, md_object_next(mo), rdpg); - RETURN(rc); -} - -static int cml_capa_get(const struct lu_env *env, struct md_object *mo, - struct lustre_capa *capa, int renewal) -{ - int rc; - ENTRY; - rc = mo_capa_get(env, md_object_next(mo), capa, renewal); - RETURN(rc); -} - -static int cml_path(const struct lu_env *env, struct md_object *mo, - char *path, int pathlen, __u64 *recno, int *linkno) -{ - int rc; - ENTRY; - rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno); - RETURN(rc); -} - -static int cml_file_lock(const struct lu_env *env, struct md_object *mo, - struct lov_mds_md *lmm, struct ldlm_extent *extent, - struct lustre_handle *lockh) -{ - int rc; - ENTRY; - rc = mo_file_lock(env, md_object_next(mo), lmm, extent, lockh); - RETURN(rc); -} - -static int cml_file_unlock(const struct lu_env *env, struct md_object *mo, - struct lov_mds_md *lmm, struct lustre_handle *lockh) -{ - int rc; - ENTRY; - rc = mo_file_unlock(env, md_object_next(mo), lmm, lockh); - RETURN(rc); -} - -static int cml_object_sync(const struct lu_env *env, struct md_object *mo) -{ - int rc; - ENTRY; - rc = mo_object_sync(env, md_object_next(mo)); - RETURN(rc); -} - -static const struct md_object_operations cml_mo_ops = { - .moo_permission = cml_permission, - .moo_attr_get = cml_attr_get, - .moo_attr_set = cml_attr_set, - .moo_xattr_get = cml_xattr_get, - .moo_xattr_list = cml_xattr_list, - .moo_xattr_set = cml_xattr_set, - .moo_xattr_del = cml_xattr_del, - .moo_object_create = cml_object_create, - .moo_ref_add = cml_ref_add, - .moo_ref_del = cml_ref_del, - .moo_open = cml_open, - .moo_close = cml_close, - .moo_readpage = cml_readpage, - .moo_readlink = cml_readlink, - .moo_changelog = cml_changelog, - .moo_capa_get = cml_capa_get, - .moo_object_sync = cml_object_sync, - .moo_path = cml_path, - .moo_file_lock = cml_file_lock, - .moo_file_unlock = cml_file_unlock, -}; -/** @} */ - -/** - * \name CMM local md_dir_operations. - * @{ - */ -/** - * cml lookup object fid by name. - * This returns only FID by name. - */ -static int cml_lookup(const struct lu_env *env, struct md_object *mo_p, - const struct lu_name *lname, struct lu_fid *lf, - struct md_op_spec *spec) -{ - int rc; - ENTRY; - -#ifdef HAVE_SPLIT_SUPPORT - if (spec != NULL && spec->sp_ck_split) { - rc = cmm_split_check(env, mo_p, lname->ln_name); - if (rc) - RETURN(rc); - } -#endif - rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec); - RETURN(rc); - -} - -/** - * Helper to return lock mode. Used in split cases only. - */ -static mdl_mode_t cml_lock_mode(const struct lu_env *env, - struct md_object *mo, mdl_mode_t lm) -{ - int rc = MDL_MINMODE; - ENTRY; - -#ifdef HAVE_SPLIT_SUPPORT - rc = cmm_split_access(env, mo, lm); -#endif - - RETURN(rc); -} - -/** - * Create operation for cml. - * Objects are local, but split can happen. - * If split is not needed this will call next layer mdo_create(). - * - * \param mo_p Parent directory. Local object. - * \param lname name of file to create. - * \param mo_c Child object. It has no real inode yet. - * \param spec creation specification. - * \param ma child object attributes. - */ -static int cml_create(const struct lu_env *env, struct md_object *mo_p, - const struct lu_name *lname, struct md_object *mo_c, - struct md_op_spec *spec, struct md_attr *ma) -{ - int rc; - ENTRY; - -#ifdef HAVE_SPLIT_SUPPORT - /* Lock mode always should be sane. */ - LASSERT(spec->sp_cr_mode != MDL_MINMODE); - - /* - * Sigh... This is long story. MDT may have race with detecting if split - * is possible in cmm. We know this race and let it live, because - * getting it rid (with some sem or spinlock) will also mean that - * PDIROPS for create will not work because we kill parallel work, what - * is really bad for performance and makes no sense having PDIROPS. So, - * we better allow the race to live, but split dir only if some of - * concurrent threads takes EX lock, not matter which one. So that, say, - * two concurrent threads may have different lock modes on directory (CW - * and EX) and not first one which comes here and see that split is - * possible should split the dir, but only that one which has EX - * lock. And we do not care that in this case, split may happen a bit - * later (when dir size will not be necessarily 64K, but may be a bit - * larger). So that, we allow concurrent creates and protect split by EX - * lock. - */ - if (spec->sp_cr_mode == MDL_EX) { - /** - * Split cases: - * - Try to split \a mo_p upon each create operation. - * If split is ok, -ERESTART is returned and current thread - * will not peoceed with create. Instead it sends -ERESTART - * to client to let it know that correct MDT must be chosen. - * \see cmm_split_dir() - */ - rc = cmm_split_dir(env, mo_p); - if (rc) - /* - * -ERESTART or some split error is returned, we can't - * proceed with create. - */ - GOTO(out, rc); - } - - if (spec != NULL && spec->sp_ck_split) { - /** - * - Directory is split already. Let the caller know that - * it should tell client that directory is split and operation - * should repeat to correct MDT. - * \see cmm_split_check() - */ - rc = cmm_split_check(env, mo_p, lname->ln_name); - if (rc) - GOTO(out, rc); - } -#endif - - rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c), - spec, ma); - - EXIT; -#ifdef HAVE_SPLIT_SUPPORT -out: -#endif - return rc; -} - -/** Call mdo_create_data() on next layer. All objects are local. */ -static int cml_create_data(const struct lu_env *env, struct md_object *p, - struct md_object *o, - const struct md_op_spec *spec, - struct md_attr *ma) -{ - int rc; - ENTRY; - rc = mdo_create_data(env, md_object_next(p), md_object_next(o), - spec, ma); - RETURN(rc); -} - -/** Call mdo_link() on next layer. All objects are local. */ -static int cml_link(const struct lu_env *env, struct md_object *mo_p, - struct md_object *mo_s, const struct lu_name *lname, - struct md_attr *ma) -{ - int rc; - ENTRY; - rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s), - lname, ma); - RETURN(rc); -} - -/** Call mdo_unlink() on next layer. All objects are local. */ -static int cml_unlink(const struct lu_env *env, struct md_object *mo_p, - struct md_object *mo_c, const struct lu_name *lname, - struct md_attr *ma) -{ - int rc; - ENTRY; - rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c), - lname, ma); - RETURN(rc); -} - -/** Call mdo_lum_lmm_cmp() on next layer */ -static int cml_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c, - const struct md_op_spec *spec, struct md_attr *ma) -{ - int rc; - ENTRY; - - rc = mdo_lum_lmm_cmp(env, md_object_next(mo_c), spec, ma); - RETURN(rc); -} - -/** - * \ingroup cmm - * Get mode of object. - * Used in both cml and cmr hence can produce RPC to another server. - */ -static int cmm_mode_get(const struct lu_env *env, struct md_device *md, - const struct lu_fid *lf, struct md_attr *ma, - int *remote) -{ - struct md_object *mo_s = md_object_find_slice(env, md, lf); - struct cmm_thread_info *cmi; - struct md_attr *tmp_ma; - int rc; - ENTRY; - - if (IS_ERR(mo_s)) - RETURN(PTR_ERR(mo_s)); - - if (remote && (lu_object_exists(&mo_s->mo_lu) < 0)) - *remote = 1; - - cmi = cmm_env_info(env); - tmp_ma = &cmi->cmi_ma; - tmp_ma->ma_need = MA_INODE; - tmp_ma->ma_valid = 0; - /* get type from src, can be remote req */ - rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma); - if (rc == 0) { - ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode; - ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid; - ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid; - ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags; - ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS; - } - lu_object_put(env, &mo_s->mo_lu); - RETURN(rc); -} - -/** - * \ingroup cmm - * Set ctime for object. - * Used in both cml and cmr hence can produce RPC to another server. - */ -static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md, - const struct lu_fid *lf, struct md_attr *ma) -{ - struct md_object *mo_s = md_object_find_slice(env, md, lf); - int rc; - ENTRY; - - if (IS_ERR(mo_s)) - RETURN(PTR_ERR(mo_s)); - - LASSERT(ma->ma_attr.la_valid & LA_CTIME); - /* set ctime to obj, can be remote req */ - rc = mo_attr_set(env, md_object_next(mo_s), ma); - lu_object_put(env, &mo_s->mo_lu); - RETURN(rc); -} - -/** Helper to output debug information about rename operation. */ -static inline void cml_rename_warn(const char *fname, - struct md_object *mo_po, - struct md_object *mo_pn, - const struct lu_fid *lf, - const char *s_name, - struct md_object *mo_t, - const char *t_name, - int err) -{ - if (mo_t) - CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] " - "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] " - "[tname %s] [err %d]\n", fname, - PFID(lu_object_fid(&mo_po->mo_lu)), - PFID(lu_object_fid(&mo_pn->mo_lu)), - PFID(lf), s_name, - PFID(lu_object_fid(&mo_t->mo_lu)), - t_name, err); - else - CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] " - "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] " - "[tname %s] [err %d]\n", fname, - PFID(lu_object_fid(&mo_po->mo_lu)), - PFID(lu_object_fid(&mo_pn->mo_lu)), - PFID(lf), s_name, - t_name, err); -} - -/** - * Rename operation for cml. - * - * This is the most complex cross-reference operation. It may consist of up to 4 - * MDS server and require several RPCs to be sent. - * - * \param mo_po Old parent object. - * \param mo_pn New parent object. - * \param lf FID of object to rename. - * \param ls_name Source file name. - * \param mo_t target object. Should be NULL here. - * \param lt_name Name of target file. - * \param ma object attributes. - */ -static int cml_rename(const struct lu_env *env, struct md_object *mo_po, - struct md_object *mo_pn, const struct lu_fid *lf, - const struct lu_name *ls_name, struct md_object *mo_t, - const struct lu_name *lt_name, struct md_attr *ma) -{ - struct cmm_thread_info *cmi; - struct md_attr *tmp_ma = NULL; - struct md_object *tmp_t = mo_t; - int remote = 0, rc; - ENTRY; - - rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote); - if (rc) - RETURN(rc); - - if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) { - /** - * \note \a mo_t is remote object and there is RPC to unlink it. - * Before that, do local sanity check for rename first. - */ - if (!remote) { - struct md_object *mo_s = md_object_find_slice(env, - md_obj2dev(mo_po), lf); - if (IS_ERR(mo_s)) - RETURN(PTR_ERR(mo_s)); - - LASSERT(lu_object_exists(&mo_s->mo_lu) > 0); - rc = mo_permission(env, md_object_next(mo_po), - md_object_next(mo_s), - ma, MAY_RENAME_SRC); - lu_object_put(env, &mo_s->mo_lu); - if (rc) - RETURN(rc); - } else { - rc = mo_permission(env, NULL, md_object_next(mo_po), - ma, MAY_UNLINK | MAY_VTX_FULL); - if (rc) - RETURN(rc); - } - - rc = mo_permission(env, NULL, md_object_next(mo_pn), ma, - MAY_UNLINK | MAY_VTX_PART); - if (rc) - RETURN(rc); - - /* - * /note \a ma will be changed after mo_ref_del(), but we will use - * it for mdo_rename() later, so save it before mo_ref_del(). - */ - cmi = cmm_env_info(env); - tmp_ma = &cmi->cmi_ma; - *tmp_ma = *ma; - rc = mo_ref_del(env, md_object_next(mo_t), ma); - if (rc) - RETURN(rc); - - tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS; - mo_t = NULL; - } - - /** - * \note for src on remote MDS case, change its ctime before local - * rename. Firstly, do local sanity check for rename if necessary. - */ - if (remote) { - if (!tmp_ma) { - rc = mo_permission(env, NULL, md_object_next(mo_po), - ma, MAY_UNLINK | MAY_VTX_FULL); - if (rc) - RETURN(rc); - - if (mo_t) { - LASSERT(lu_object_exists(&mo_t->mo_lu) > 0); - rc = mo_permission(env, md_object_next(mo_pn), - md_object_next(mo_t), - ma, MAY_RENAME_TAR); - if (rc) - RETURN(rc); - } else { - int mask; - - if (mo_po != mo_pn) - mask = (S_ISDIR(ma->ma_attr.la_mode) ? - MAY_LINK : MAY_CREATE); - else - mask = MAY_CREATE; - rc = mo_permission(env, NULL, - md_object_next(mo_pn), - NULL, mask); - if (rc) - RETURN(rc); - } - - ma->ma_attr_flags |= MDS_PERM_BYPASS; - } else { - LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS); - } - - rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, - tmp_ma ? tmp_ma : ma); - if (rc) { - /* TODO: revoke mo_t if necessary. */ - cml_rename_warn("cmm_rename_ctime", mo_po, - mo_pn, lf, ls_name->ln_name, - tmp_t, lt_name->ln_name, rc); - RETURN(rc); - } - } - - /* local rename, mo_t can be NULL */ - rc = mdo_rename(env, md_object_next(mo_po), - md_object_next(mo_pn), lf, ls_name, - md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma); - if (rc) - /* TODO: revoke all cml_rename */ - cml_rename_warn("mdo_rename", mo_po, mo_pn, lf, - ls_name->ln_name, tmp_t, lt_name->ln_name, rc); - - RETURN(rc); -} - -/** - * Rename target partial operation. - * Used for cross-ref rename. - */ -static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p, - struct md_object *mo_t, const struct lu_fid *lf, - const struct lu_name *lname, struct md_attr *ma) -{ - int rc; - ENTRY; - - rc = mdo_rename_tgt(env, md_object_next(mo_p), - md_object_next(mo_t), lf, lname, ma); - RETURN(rc); -} - -/** - * Name insert only operation. - * used only in case of rename_tgt() when target doesn't exist. - */ -static int cml_name_insert(const struct lu_env *env, struct md_object *p, - const struct lu_name *lname, const struct lu_fid *lf, - const struct md_attr *ma) -{ - int rc; - ENTRY; - - rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma); - - RETURN(rc); -} - -/** - * \ingroup cmm - * Check two fids are not subdirectories. - */ -static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo, - const struct lu_fid *fid, struct lu_fid *sfid) -{ - struct cmm_thread_info *cmi; - int rc; - ENTRY; - - cmi = cmm_env_info(env); - rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL); - if (rc) - RETURN(rc); - - if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode)) - RETURN(0); - - rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid); - RETURN(rc); -} - -static const struct md_dir_operations cml_dir_ops = { - .mdo_is_subdir = cmm_is_subdir, - .mdo_lookup = cml_lookup, - .mdo_lock_mode = cml_lock_mode, - .mdo_create = cml_create, - .mdo_link = cml_link, - .mdo_unlink = cml_unlink, - .mdo_lum_lmm_cmp = cml_lum_lmm_cmp, - .mdo_name_insert = cml_name_insert, - .mdo_rename = cml_rename, - .mdo_rename_tgt = cml_rename_tgt, - .mdo_create_data = cml_create_data, -}; -/** @} */ -/** @} */ - -/** - * \addtogroup cmr - * @{ - */ -/** - * \name cmr helpers - * @{ - */ -/** Get cmr_object from lu_object. */ -static inline struct cmr_object *lu2cmr_obj(struct lu_object *o) -{ - return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu); -} -/** Get cmr_object from md_object. */ -static inline struct cmr_object *md2cmr_obj(struct md_object *mo) -{ - return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj); -} -/** Get cmr_object from cmm_object. */ -static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co) -{ - return container_of0(co, struct cmr_object, cmm_obj); -} -/** @} */ - -/** - * Get proper child device from MDCs. - */ -static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num) -{ - struct lu_device *next = NULL; - struct mdc_device *mdc; - - cfs_spin_lock(&d->cmm_tgt_guard); - cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) { - if (mdc->mc_num == num) { - next = mdc2lu_dev(mdc); - break; - } - } - cfs_spin_unlock(&d->cmm_tgt_guard); - return next; -} - -/** - * Free cmr_object. - */ -static void cmr_object_free(const struct lu_env *env, - struct lu_object *lo) -{ - struct cmr_object *cro = lu2cmr_obj(lo); - lu_object_fini(lo); - OBD_FREE_PTR(cro); -} - -/** - * Initialize cmr object. - */ -static int cmr_object_init(const struct lu_env *env, struct lu_object *lo, - const struct lu_object_conf *unused) -{ - struct cmm_device *cd = lu2cmm_dev(lo->lo_dev); - struct lu_device *c_dev; - struct lu_object *c_obj; - int rc; - - ENTRY; - - c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num); - if (c_dev == NULL) { - rc = -ENOENT; - } else { - c_obj = c_dev->ld_ops->ldo_object_alloc(env, - lo->lo_header, c_dev); - if (c_obj != NULL) { - lu_object_add(lo, c_obj); - rc = 0; - } else { - rc = -ENOMEM; - } - } - - RETURN(rc); -} - -/** - * Output lu_object data. - */ -static int cmr_object_print(const struct lu_env *env, void *cookie, - lu_printer_t p, const struct lu_object *lo) -{ - const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo); - return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num); -} - -/** - * Cmr instance of lu_object_operations. - */ -static const struct lu_object_operations cmr_obj_ops = { - .loo_object_init = cmr_object_init, - .loo_object_free = cmr_object_free, - .loo_object_print = cmr_object_print -}; - -/** - * \name cmr remote md_object operations. - * All operations here are invalid and return errors. There is no local object - * so these operations return two kinds of error: - * -# -EFAULT if operation is prohibited. - * -# -EREMOTE if operation can be done just to notify upper level about remote - * object. - * - * @{ - */ -static int cmr_object_create(const struct lu_env *env, - struct md_object *mo, - const struct md_op_spec *spec, - struct md_attr *ma) -{ - return -EFAULT; -} - -static int cmr_permission(const struct lu_env *env, - struct md_object *p, struct md_object *c, - struct md_attr *attr, int mask) -{ - return -EREMOTE; -} - -static int cmr_attr_get(const struct lu_env *env, struct md_object *mo, - struct md_attr *attr) -{ - return -EREMOTE; -} - -static int cmr_attr_set(const struct lu_env *env, struct md_object *mo, - const struct md_attr *attr) -{ - return -EFAULT; -} - -static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo, - struct lu_buf *buf, const char *name) -{ - return -EFAULT; -} - -static int cmr_readlink(const struct lu_env *env, struct md_object *mo, - struct lu_buf *buf) -{ - return -EFAULT; -} - -static int cmr_changelog(const struct lu_env *env, enum changelog_rec_type type, - int flags, struct md_object *mo) -{ - return -EFAULT; -} - -static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo, - struct lu_buf *buf) -{ - return -EFAULT; -} - -static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo, - const struct lu_buf *buf, const char *name, - int fl) -{ - return -EFAULT; -} - -static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo, - const char *name) -{ - return -EFAULT; -} - -static int cmr_ref_add(const struct lu_env *env, struct md_object *mo, - const struct md_attr *ma) -{ - return -EFAULT; -} - -static int cmr_ref_del(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma) -{ - return -EFAULT; -} - -static int cmr_open(const struct lu_env *env, struct md_object *mo, - int flags) -{ - return -EREMOTE; -} - -static int cmr_close(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma, int mode) -{ - return -EFAULT; -} - -static int cmr_readpage(const struct lu_env *env, struct md_object *mo, - const struct lu_rdpg *rdpg) -{ - return -EREMOTE; -} - -static int cmr_capa_get(const struct lu_env *env, struct md_object *mo, - struct lustre_capa *capa, int renewal) -{ - return -EFAULT; -} - -static int cmr_path(const struct lu_env *env, struct md_object *obj, - char *path, int pathlen, __u64 *recno, int *linkno) -{ - return -EREMOTE; -} - -static int cmr_object_sync(const struct lu_env *env, struct md_object *mo) -{ - return -EFAULT; -} - -static int cmr_file_lock(const struct lu_env *env, struct md_object *mo, - struct lov_mds_md *lmm, struct ldlm_extent *extent, - struct lustre_handle *lockh) -{ - return -EREMOTE; -} - -static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo, - struct lov_mds_md *lmm, struct lustre_handle *lockh) -{ - return -EREMOTE; -} - -static int cmr_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c, - const struct md_op_spec *spec, struct md_attr *ma) -{ - return -EREMOTE; -} - -/** Set of md_object_operations for cmr. */ -static const struct md_object_operations cmr_mo_ops = { - .moo_permission = cmr_permission, - .moo_attr_get = cmr_attr_get, - .moo_attr_set = cmr_attr_set, - .moo_xattr_get = cmr_xattr_get, - .moo_xattr_set = cmr_xattr_set, - .moo_xattr_list = cmr_xattr_list, - .moo_xattr_del = cmr_xattr_del, - .moo_object_create = cmr_object_create, - .moo_ref_add = cmr_ref_add, - .moo_ref_del = cmr_ref_del, - .moo_open = cmr_open, - .moo_close = cmr_close, - .moo_readpage = cmr_readpage, - .moo_readlink = cmr_readlink, - .moo_changelog = cmr_changelog, - .moo_capa_get = cmr_capa_get, - .moo_object_sync = cmr_object_sync, - .moo_path = cmr_path, - .moo_file_lock = cmr_file_lock, - .moo_file_unlock = cmr_file_unlock, -}; -/** @} */ - -/** - * \name cmr md_dir operations. - * - * All methods below are cross-ref by nature. They consist of remote call and - * local operation. Due to future rollback functionality there are several - * limitations for such methods: - * -# remote call should be done at first to do epoch negotiation between all - * MDS involved and to avoid the RPC inside transaction. - * -# only one RPC can be sent - also due to epoch negotiation. - * For more details see rollback HLD/DLD. - * @{ - */ -static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p, - const struct lu_name *lname, struct lu_fid *lf, - struct md_op_spec *spec) -{ - /* - * This can happens while rename() If new parent is remote dir, lookup - * will happen here. - */ - - return -EREMOTE; -} - -/** Return lock mode. */ -static mdl_mode_t cmr_lock_mode(const struct lu_env *env, - struct md_object *mo, mdl_mode_t lm) -{ - return MDL_MINMODE; -} - -/** - * Create operation for cmr. - * Remote object creation and local name insert. - * - * \param mo_p Parent directory. Local object. - * \param lchild_name name of file to create. - * \param mo_c Child object. It has no real inode yet. - * \param spec creation specification. - * \param ma child object attributes. - */ -static int cmr_create(const struct lu_env *env, struct md_object *mo_p, - const struct lu_name *lchild_name, struct md_object *mo_c, - struct md_op_spec *spec, - struct md_attr *ma) -{ - struct cmm_thread_info *cmi; - struct md_attr *tmp_ma; - int rc; - ENTRY; - - /* Make sure that name isn't exist before doing remote call. */ - rc = mdo_lookup(env, md_object_next(mo_p), lchild_name, - &cmm_env_info(env)->cmi_fid, NULL); - if (rc == 0) - RETURN(-EEXIST); - else if (rc != -ENOENT) - RETURN(rc); - - /* check the SGID attr */ - cmi = cmm_env_info(env); - LASSERT(cmi); - tmp_ma = &cmi->cmi_ma; - tmp_ma->ma_valid = 0; - tmp_ma->ma_need = MA_INODE; - -#ifdef CONFIG_FS_POSIX_ACL - if (!S_ISLNK(ma->ma_attr.la_mode)) { - tmp_ma->ma_acl = cmi->cmi_xattr_buf; - tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf); - tmp_ma->ma_need |= MA_ACL_DEF; - } -#endif - rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma); - if (rc) - RETURN(rc); - - if (tmp_ma->ma_attr.la_mode & S_ISGID) { - ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid; - if (S_ISDIR(ma->ma_attr.la_mode)) { - ma->ma_attr.la_mode |= S_ISGID; - ma->ma_attr.la_valid |= LA_MODE; - } - } - -#ifdef CONFIG_FS_POSIX_ACL - if (tmp_ma->ma_valid & MA_ACL_DEF) { - spec->u.sp_ea.fid = spec->u.sp_pfid; - spec->u.sp_ea.eadata = tmp_ma->ma_acl; - spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size; - spec->sp_cr_flags |= MDS_CREATE_RMT_ACL; - } -#endif - - /* Local permission check for name_insert before remote ops. */ - rc = mo_permission(env, NULL, md_object_next(mo_p), NULL, - (S_ISDIR(ma->ma_attr.la_mode) ? - MAY_LINK : MAY_CREATE)); - if (rc) - RETURN(rc); - - /** - * \note \a ma will be changed after mo_object_create(), but we will use - * it for mdo_name_insert() later, so save it before mo_object_create(). - */ - *tmp_ma = *ma; - rc = mo_object_create(env, md_object_next(mo_c), spec, ma); - if (rc == 0) { - tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS; - rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name, - lu_object_fid(&mo_c->mo_lu), tmp_ma); - if (unlikely(rc)) { - /* TODO: remove object mo_c on remote MDS */ - CWARN("cmr_create failed, should revoke: [mo_p "DFID"]" - " [name %s] [mo_c "DFID"] [err %d]\n", - PFID(lu_object_fid(&mo_p->mo_lu)), - lchild_name->ln_name, - PFID(lu_object_fid(&mo_c->mo_lu)), rc); - } - } - - RETURN(rc); -} - -/** - * Link operations for cmr. - * - * The link RPC is always issued to the server where source parent is living. - * The first operation to do is object nlink increment on remote server. - * Second one is local mdo_name_insert(). - * - * \param mo_p parent directory. It is local. - * \param mo_s source object to link. It is remote. - * \param lname Name of link file. - * \param ma object attributes. - */ -static int cmr_link(const struct lu_env *env, struct md_object *mo_p, - struct md_object *mo_s, const struct lu_name *lname, - struct md_attr *ma) -{ - int rc; - ENTRY; - - /* Make sure that name isn't exist before doing remote call. */ - rc = mdo_lookup(env, md_object_next(mo_p), lname, - &cmm_env_info(env)->cmi_fid, NULL); - if (rc == 0) { - rc = -EEXIST; - } else if (rc == -ENOENT) { - /* Local permission check for name_insert before remote ops. */ - rc = mo_permission(env, NULL, md_object_next(mo_p), NULL, - MAY_CREATE); - if (rc) - RETURN(rc); - - rc = mo_ref_add(env, md_object_next(mo_s), ma); - if (rc == 0) { - ma->ma_attr_flags |= MDS_PERM_BYPASS; - rc = mdo_name_insert(env, md_object_next(mo_p), lname, - lu_object_fid(&mo_s->mo_lu), ma); - if (unlikely(rc)) { - /* TODO: ref_del from mo_s on remote MDS */ - CWARN("cmr_link failed, should revoke: " - "[mo_p "DFID"] [mo_s "DFID"] " - "[name %s] [err %d]\n", - PFID(lu_object_fid(&mo_p->mo_lu)), - PFID(lu_object_fid(&mo_s->mo_lu)), - lname->ln_name, rc); - } - } - } - RETURN(rc); -} - -/** - * Unlink operations for cmr. - * - * The unlink RPC is always issued to the server where parent is living. Hence - * the first operation to do is object unlink on remote server. Second one is - * local mdo_name_remove(). - * - * \param mo_p parent md_object. It is local. - * \param mo_c child object to be unlinked. It is remote. - * \param lname Name of file to unlink. - * \param ma object attributes. - */ -static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p, - struct md_object *mo_c, const struct lu_name *lname, - struct md_attr *ma) -{ - struct cmm_thread_info *cmi; - struct md_attr *tmp_ma; - int rc; - ENTRY; - - /* Local permission check for name_remove before remote ops. */ - rc = mo_permission(env, NULL, md_object_next(mo_p), ma, - MAY_UNLINK | MAY_VTX_PART); - if (rc) - RETURN(rc); - - /* - * \note \a ma will be changed after mo_ref_del, but we will use - * it for mdo_name_remove() later, so save it before mo_ref_del(). - */ - cmi = cmm_env_info(env); - tmp_ma = &cmi->cmi_ma; - *tmp_ma = *ma; - rc = mo_ref_del(env, md_object_next(mo_c), ma); - if (rc == 0) { - tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS; - rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma); - if (unlikely(rc)) { - /* TODO: ref_add to mo_c on remote MDS */ - CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]" - " [mo_c "DFID"] [name %s] [err %d]\n", - PFID(lu_object_fid(&mo_p->mo_lu)), - PFID(lu_object_fid(&mo_c->mo_lu)), - lname->ln_name, rc); - } - } - - RETURN(rc); -} - -/** Helper which outputs error message during cmr_rename() */ -static inline void cmr_rename_warn(const char *fname, - struct md_object *mo_po, - struct md_object *mo_pn, - const struct lu_fid *lf, - const char *s_name, - const char *t_name, - int err) -{ - CWARN("cmr_rename failed for %s, should revoke: " - "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] " - "[sname %s] [tname %s] [err %d]\n", fname, - PFID(lu_object_fid(&mo_po->mo_lu)), - PFID(lu_object_fid(&mo_pn->mo_lu)), - PFID(lf), s_name, t_name, err); -} - -/** - * Rename operation for cmr. - * - * This is the most complex cross-reference operation. It may consist of up to 4 - * MDS server and require several RPCs to be sent. - * - * \param mo_po Old parent object. - * \param mo_pn New parent object. - * \param lf FID of object to rename. - * \param ls_name Source file name. - * \param mo_t target object. Should be NULL here. - * \param lt_name Name of target file. - * \param ma object attributes. - */ -static int cmr_rename(const struct lu_env *env, - struct md_object *mo_po, struct md_object *mo_pn, - const struct lu_fid *lf, const struct lu_name *ls_name, - struct md_object *mo_t, const struct lu_name *lt_name, - struct md_attr *ma) -{ - struct cmm_thread_info *cmi; - struct md_attr *tmp_ma; - int rc; - ENTRY; - - LASSERT(mo_t == NULL); - - /* get real type of src */ - rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL); - if (rc) - RETURN(rc); - - /* Local permission check for name_remove before remote ops. */ - rc = mo_permission(env, NULL, md_object_next(mo_po), ma, - MAY_UNLINK | MAY_VTX_FULL); - if (rc) - RETURN(rc); - - /** - * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it - * for mdo_name_remove() later, so save it before mdo_rename_tgt. - */ - cmi = cmm_env_info(env); - tmp_ma = &cmi->cmi_ma; - *tmp_ma = *ma; - /** - * \note The \a mo_pn is remote directory, so we cannot even know if there is - * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do - * lookup and process this further. - */ - rc = mdo_rename_tgt(env, md_object_next(mo_pn), - NULL/* mo_t */, lf, lt_name, ma); - if (rc) - RETURN(rc); - - tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS; - - /* src object maybe on remote MDS, do remote ops first. */ - rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma); - if (unlikely(rc)) { - /* TODO: revoke mdo_rename_tgt */ - cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf, - ls_name->ln_name, lt_name->ln_name, rc); - RETURN(rc); - } - - /* only old name is removed localy */ - rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma); - if (unlikely(rc)) - /* TODO: revoke all cmr_rename */ - cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf, - ls_name->ln_name, lt_name->ln_name, rc); - - RETURN(rc); -} - -/** - * Part of cross-ref rename(). - * Used to insert new name in new parent and unlink target. - */ -static int cmr_rename_tgt(const struct lu_env *env, - struct md_object *mo_p, struct md_object *mo_t, - const struct lu_fid *lf, const struct lu_name *lname, - struct md_attr *ma) -{ - struct cmm_thread_info *cmi; - struct md_attr *tmp_ma; - int rc; - ENTRY; - - /* target object is remote one */ - /* Local permission check for rename_tgt before remote ops. */ - rc = mo_permission(env, NULL, md_object_next(mo_p), ma, - MAY_UNLINK | MAY_VTX_PART); - if (rc) - RETURN(rc); - - /* - * XXX: @ma maybe changed after mo_ref_del, but we will use - * it for mdo_rename_tgt later, so save it before mo_ref_del. - */ - cmi = cmm_env_info(env); - tmp_ma = &cmi->cmi_ma; - *tmp_ma = *ma; - rc = mo_ref_del(env, md_object_next(mo_t), ma); - /* continue locally with name handling only */ - if (rc == 0) { - tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS; - rc = mdo_rename_tgt(env, md_object_next(mo_p), - NULL, lf, lname, tmp_ma); - if (unlikely(rc)) { - /* TODO: ref_add to mo_t on remote MDS */ - CWARN("cmr_rename_tgt failed, should revoke: " - "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] " - "[name %s] [err %d]\n", - PFID(lu_object_fid(&mo_p->mo_lu)), - PFID(lu_object_fid(&mo_t->mo_lu)), - PFID(lf), - lname->ln_name, rc); - } - } - RETURN(rc); -} -/** @} */ -/** - * The md_dir_operations for cmr. - */ -static const struct md_dir_operations cmr_dir_ops = { - .mdo_is_subdir = cmm_is_subdir, - .mdo_lookup = cmr_lookup, - .mdo_lock_mode = cmr_lock_mode, - .mdo_create = cmr_create, - .mdo_link = cmr_link, - .mdo_unlink = cmr_unlink, - .mdo_lum_lmm_cmp = cmr_lum_lmm_cmp, - .mdo_rename = cmr_rename, - .mdo_rename_tgt = cmr_rename_tgt -}; -/** @} */ diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c deleted file mode 100644 index e799fa9..0000000 --- a/lustre/cmm/cmm_split.c +++ /dev/null @@ -1,758 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/cmm/cmm_split.c - * - * Lustre splitting dir - * - * Author: Alex Thomas - * Author: Wang Di - * Author: Yury Umanets - */ - -#define DEBUG_SUBSYSTEM S_MDS - -#include -#include -#include -#include -#include "cmm_internal.h" -#include "mdc_internal.h" - -/** - * \addtogroup split - * @{ - */ -enum { - CMM_SPLIT_SIZE = 128 * 1024 -}; - -/** - * This function checks if passed \a name come to correct server (local MDT). - * - * \param mp Parent directory - * \param name Name to lookup - * \retval -ERESTART Let client know that dir was split and client needs to - * chose correct stripe. - */ -int cmm_split_check(const struct lu_env *env, struct md_object *mp, - const char *name) -{ - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp)); - struct md_attr *ma = &cmm_env_info(env)->cmi_ma; - struct cml_object *clo = md2cml_obj(mp); - int rc, lmv_size; - ENTRY; - - cmm_lprocfs_time_start(env); - - /* Not split yet */ - if (clo->clo_split == CMM_SPLIT_NONE || - clo->clo_split == CMM_SPLIT_DENIED) - GOTO(out, rc = 0); - - lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1); - - /* Try to get the LMV EA */ - memset(ma, 0, sizeof(*ma)); - - ma->ma_need = MA_LMV; - ma->ma_lmv_size = lmv_size; - OBD_ALLOC(ma->ma_lmv, lmv_size); - if (ma->ma_lmv == NULL) - GOTO(out, rc = -ENOMEM); - - /* Get LMV EA, Note: refresh valid here for getting LMV_EA */ - rc = mo_attr_get(env, mp, ma); - if (rc) - GOTO(cleanup, rc); - - /* No LMV just return */ - if (!(ma->ma_valid & MA_LMV)) { - /* update split state if unknown */ - if (clo->clo_split == CMM_SPLIT_UNKNOWN) - clo->clo_split = CMM_SPLIT_NONE; - GOTO(cleanup, rc = 0); - } - - /* Skip checking the slave dirs (mea_count is 0) */ - if (ma->ma_lmv->mea_count != 0) { - int idx; - - /** - * This gets stripe by name to check the name belongs to master - * dir, otherwise return the -ERESTART - */ - idx = mea_name2idx(ma->ma_lmv, name, strlen(name)); - - /** - * When client does not know about split, it sends create() to - * the master MDT and master replay back if directory is split. - * So client should orward request to correct MDT. This - * is why we check here if stripe zero or not. Zero stripe means - * master stripe. If stripe calculated from name is not zero - - * return -ERESTART. - */ - if (idx != 0) - rc = -ERESTART; - - /* update split state to DONE if unknown */ - if (clo->clo_split == CMM_SPLIT_UNKNOWN) - clo->clo_split = CMM_SPLIT_DONE; - } else { - /* split is denied for slave dir */ - clo->clo_split = CMM_SPLIT_DENIED; - } - EXIT; -cleanup: - OBD_FREE(ma->ma_lmv, lmv_size); -out: - cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK); - return rc; -} - -/** - * Return preferable access mode to the caller taking into account the split - * case and the fact of existing not splittable dirs. - */ -int cmm_split_access(const struct lu_env *env, struct md_object *mo, - mdl_mode_t lm) -{ - struct md_attr *ma = &cmm_env_info(env)->cmi_ma; - int rc, split; - ENTRY; - - memset(ma, 0, sizeof(*ma)); - - /* - * Check only if we need protection from split. If not - mdt handles - * other cases. - */ - rc = cmm_split_expect(env, mo, ma, &split); - if (rc) { - CERROR("Can't check for possible split, rc %d\n", rc); - RETURN(MDL_MINMODE); - } - - /* - * Do not take PDO lock on non-splittable objects if this is not PW, - * this should speed things up a bit. - */ - if (split == CMM_SPLIT_DONE && lm != MDL_PW) - RETURN(MDL_NL); - - /* Protect splitting by exclusive lock. */ - if (split == CMM_SPLIT_NEEDED && lm == MDL_PW) - RETURN(MDL_EX); - - /* - * Have no idea about lock mode, let it be what higher layer wants. - */ - RETURN(MDL_MINMODE); -} - -/** - * Check if split is expected for current thread. - * - * \param mo Directory to split. - * \param ma md attributes. - * \param split Flag to save split information. - */ -int cmm_split_expect(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma, int *split) -{ - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct cml_object *clo = md2cml_obj(mo); - struct lu_fid root_fid; - int rc; - ENTRY; - - if (clo->clo_split == CMM_SPLIT_DONE || - clo->clo_split == CMM_SPLIT_DENIED) { - *split = clo->clo_split; - RETURN(0); - } - /* CMM_SPLIT_UNKNOWN case below */ - - /* No need to split root object. */ - rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, - &root_fid); - if (rc) - RETURN(rc); - - if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) { - /* update split state */ - *split = clo->clo_split == CMM_SPLIT_DENIED; - RETURN(0); - } - - /* - * Assumption: ma_valid = 0 here, we only need get inode and lmv_size - * for this get_attr. - */ - LASSERT(ma->ma_valid == 0); - ma->ma_need = MA_INODE | MA_LMV; - rc = mo_attr_get(env, mo, ma); - if (rc) - RETURN(rc); - - /* No need split for already split object */ - if (ma->ma_valid & MA_LMV) { - LASSERT(ma->ma_lmv_size > 0); - *split = clo->clo_split = CMM_SPLIT_DONE; - RETURN(0); - } - - /* No need split for object whose size < CMM_SPLIT_SIZE */ - if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) { - *split = clo->clo_split = CMM_SPLIT_NONE; - RETURN(0); - } - - *split = clo->clo_split = CMM_SPLIT_NEEDED; - RETURN(0); -} - -struct cmm_object *cmm_object_find(const struct lu_env *env, - struct cmm_device *d, - const struct lu_fid *f) -{ - return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid)); -} - -static inline void cmm_object_put(const struct lu_env *env, - struct cmm_object *o) -{ - lu_object_put(env, &o->cmo_obj.mo_lu); -} - -/** - * Allocate new FID on passed \a mc for slave object which is going to - * create there soon. - */ -static int cmm_split_fid_alloc(const struct lu_env *env, - struct cmm_device *cmm, - struct mdc_device *mc, - struct lu_fid *fid) -{ - int rc; - ENTRY; - - LASSERT(cmm != NULL && mc != NULL && fid != NULL); - - cfs_down(&mc->mc_fid_sem); - - /* Alloc new fid on \a mc. */ - rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL); - if (rc > 0) - rc = 0; - cfs_up(&mc->mc_fid_sem); - - RETURN(rc); -} - -/** - * Allocate new slave object on passed \a mc. - */ -static int cmm_split_slave_create(const struct lu_env *env, - struct cmm_device *cmm, - struct mdc_device *mc, - struct lu_fid *fid, - struct md_attr *ma, - struct lmv_stripe_md *lmv, - int lmv_size) -{ - struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec; - struct cmm_object *obj; - int rc; - ENTRY; - - /* Allocate new fid and store it to @fid */ - rc = cmm_split_fid_alloc(env, cmm, mc, fid); - if (rc) { - CERROR("Can't alloc new fid on "LPU64 - ", rc %d\n", mc->mc_num, rc); - RETURN(rc); - } - - /* Allocate new object on @mc */ - obj = cmm_object_find(env, cmm, fid); - if (IS_ERR(obj)) - RETURN(PTR_ERR(obj)); - - memset(spec, 0, sizeof *spec); - spec->u.sp_ea.fid = fid; - spec->u.sp_ea.eadata = lmv; - spec->u.sp_ea.eadatalen = lmv_size; - spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ; - rc = mo_object_create(env, md_object_next(&obj->cmo_obj), - spec, ma); - cmm_object_put(env, obj); - RETURN(rc); -} - -/** - * Create so many slaves as number of stripes. - * This is called in split time before sending pages to slaves. - */ -static int cmm_split_slaves_create(const struct lu_env *env, - struct md_object *mo, - struct md_attr *ma) -{ - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct lu_fid *lf = cmm2fid(md2cmm_obj(mo)); - struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv; - struct mdc_device *mc, *tmp; - struct lmv_stripe_md *lmv; - int i = 1, rc = 0; - ENTRY; - - /* Init the split MEA */ - lmv = ma->ma_lmv; - lmv->mea_master = cmm->cmm_local_num; - lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT; - lmv->mea_count = cmm->cmm_tgt_count + 1; - - /* - * Store master FID to local node idx number. Local node is always - * master and its stripe number if 0. - */ - lmv->mea_ids[0] = *lf; - - memset(slave_lmv, 0, sizeof *slave_lmv); - slave_lmv->mea_master = cmm->cmm_local_num; - slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT; - slave_lmv->mea_count = 0; - - cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) { - rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i], - ma, slave_lmv, sizeof(*slave_lmv)); - if (rc) - GOTO(cleanup, rc); - i++; - } - EXIT; -cleanup: - return rc; -} - -static inline int cmm_split_special_entry(struct lu_dirent *ent) -{ - if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) || - !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen))) - return 1; - return 0; -} - -/** - * Convert string to the lu_name structure. - */ -static inline struct lu_name *cmm_name(const struct lu_env *env, - char *name, int buflen) -{ - struct lu_name *lname; - struct cmm_thread_info *cmi; - - LASSERT(buflen > 0); - LASSERT(name[buflen - 1] == '\0'); - - cmi = cmm_env_info(env); - lname = &cmi->cti_name; - lname->ln_name = name; - /* do NOT count the terminating '\0' of name for length */ - lname->ln_namelen = buflen - 1; - return lname; -} - -/** - * Helper for cmm_split_remove_page(). It removes one entry from local MDT. - * Do not corrupt byte order in page, it will be sent to remote MDT. - */ -static int cmm_split_remove_entry(const struct lu_env *env, - struct md_object *mo, - struct lu_dirent *ent) -{ - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct cmm_thread_info *cmi; - struct md_attr *ma; - struct cmm_object *obj; - int is_dir, rc; - char *name; - struct lu_name *lname; - ENTRY; - - if (cmm_split_special_entry(ent)) - RETURN(0); - - fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid); - obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid); - if (IS_ERR(obj)) - RETURN(PTR_ERR(obj)); - - cmi = cmm_env_info(env); - ma = &cmi->cmi_ma; - - if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0) - is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu)); - else - /** - * \note These days only cross-ref dirs are possible, so for the - * sake of simplicity, in split, we suppose that all cross-ref - * names point to directory and do not do additional getattr to - * remote MDT. - */ - is_dir = 1; - - OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1); - if (!name) - GOTO(cleanup, rc = -ENOMEM); - - memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen)); - lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1); - /** - * \note When split, no need update parent's ctime, - * and no permission check for name_remove. - */ - ma->ma_attr.la_ctime = 0; - if (is_dir) - ma->ma_attr.la_mode = S_IFDIR; - else - ma->ma_attr.la_mode = 0; - ma->ma_attr.la_valid = LA_MODE; - ma->ma_valid = MA_INODE; - - ma->ma_attr_flags |= MDS_PERM_BYPASS; - rc = mdo_name_remove(env, md_object_next(mo), lname, ma); - OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1); - if (rc) - GOTO(cleanup, rc); - - /** - * \note For each entry transferred to the slave MDS we should know - * whether this object is dir or not. Therefore the highest bit of the - * hash is used to indicate that (it is unused for hash purposes anyway). - */ - if (is_dir) { - ent->lde_hash = le64_to_cpu(ent->lde_hash); - ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT); - } - EXIT; -cleanup: - cmm_object_put(env, obj); - return rc; -} - -/** - * Remove all entries from passed page. - * These entries are going to remote MDT and thus should be removed locally. - */ -static int cmm_split_remove_page(const struct lu_env *env, - struct md_object *mo, - struct lu_rdpg *rdpg, - __u64 hash_end, __u32 *len) -{ - struct lu_dirpage *dp; - struct lu_dirent *ent; - int rc = 0; - ENTRY; - - *len = 0; - cfs_kmap(rdpg->rp_pages[0]); - dp = page_address(rdpg->rp_pages[0]); - for (ent = lu_dirent_start(dp); - ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end; - ent = lu_dirent_next(ent)) { - rc = cmm_split_remove_entry(env, mo, ent); - if (rc) { - /* - * XXX: Error handler to insert remove name back, - * currently we assumed it will success anyway in - * verfication test. - */ - CERROR("Can not del %*.*s, rc %d\n", - le16_to_cpu(ent->lde_namelen), - le16_to_cpu(ent->lde_namelen), - ent->lde_name, rc); - GOTO(unmap, rc); - } - *len += lu_dirent_size(ent); - } - - if (ent != lu_dirent_start(dp)) - *len += sizeof(struct lu_dirpage); - EXIT; -unmap: - cfs_kunmap(rdpg->rp_pages[0]); - return rc; -} - -/** - * Send one page of entries to the slave MDT. - * This page contains entries to be created there. - */ -static int cmm_split_send_page(const struct lu_env *env, - struct md_object *mo, - struct lu_rdpg *rdpg, - struct lu_fid *fid, int len) -{ - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct cmm_object *obj; - int rc = 0; - ENTRY; - - obj = cmm_object_find(env, cmm, fid); - if (IS_ERR(obj)) - RETURN(PTR_ERR(obj)); - - rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj), - rdpg->rp_pages[0], len); - cmm_object_put(env, obj); - RETURN(rc); -} - -/** Read one page of entries from local MDT. */ -static int cmm_split_read_page(const struct lu_env *env, - struct md_object *mo, - struct lu_rdpg *rdpg) -{ - int rc; - ENTRY; - memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE); - cfs_kunmap(rdpg->rp_pages[0]); - rc = mo_readpage(env, md_object_next(mo), rdpg); - RETURN(rc); -} - -/** - * This function performs migration of each directory stripe to its MDS. - */ -static int cmm_split_process_stripe(const struct lu_env *env, - struct md_object *mo, - struct lu_rdpg *rdpg, - struct lu_fid *lf, - __u64 end) -{ - int rc, done = 0; - ENTRY; - - LASSERT(rdpg->rp_npages == 1); - do { - struct lu_dirpage *ldp; - __u32 len = 0; - - /** - Read one page of entries from local MDT. */ - rc = cmm_split_read_page(env, mo, rdpg); - if (rc) { - CERROR("Error in readpage: %d\n", rc); - RETURN(rc); - } - - /** - Remove local entries which are going to remite MDT. */ - rc = cmm_split_remove_page(env, mo, rdpg, end, &len); - if (rc) { - CERROR("Error in remove stripe entries: %d\n", rc); - RETURN(rc); - } - - /** - * - Send entries page to slave MDT and repeat while there are - * more pages. - */ - if (len > 0) { - rc = cmm_split_send_page(env, mo, rdpg, lf, len); - if (rc) { - CERROR("Error in sending page: %d\n", rc); - RETURN(rc); - } - } - - cfs_kmap(rdpg->rp_pages[0]); - ldp = page_address(rdpg->rp_pages[0]); - if (le64_to_cpu(ldp->ldp_hash_end) >= end) - done = 1; - - rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end); - cfs_kunmap(rdpg->rp_pages[0]); - } while (!done); - - RETURN(rc); -} - -/** - * Directory scanner for split operation. - * - * It calculates hashes for names and organizes files to stripes. - */ -static int cmm_split_process_dir(const struct lu_env *env, - struct md_object *mo, - struct md_attr *ma) -{ - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg; - __u64 hash_segment; - int rc = 0, i; - ENTRY; - - memset(rdpg, 0, sizeof *rdpg); - rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT; - rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages; - rdpg->rp_pages = cmm_env_info(env)->cmi_pages; - - for (i = 0; i < rdpg->rp_npages; i++) { - rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD); - if (rdpg->rp_pages[i] == NULL) - GOTO(cleanup, rc = -ENOMEM); - } - - hash_segment = MAX_HASH_SIZE; - /** Whole hash range is divided on segments by number of MDS-es. */ - do_div(hash_segment, cmm->cmm_tgt_count + 1); - /** - * For each segment the cmm_split_process_stripe() is called to move - * entries on new server. - */ - for (i = 1; i < cmm->cmm_tgt_count + 1; i++) { - struct lu_fid *lf; - __u64 hash_end; - - lf = &ma->ma_lmv->mea_ids[i]; - - rdpg->rp_hash = i * hash_segment; - if (i == cmm->cmm_tgt_count) - hash_end = MAX_HASH_SIZE; - else - hash_end = rdpg->rp_hash + hash_segment; - rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end); - if (rc) { - CERROR("Error (rc = %d) while splitting for %d: fid=" - DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf), - rdpg->rp_hash, hash_end); - GOTO(cleanup, rc); - } - } - EXIT; -cleanup: - for (i = 0; i < rdpg->rp_npages; i++) - if (rdpg->rp_pages[i] != NULL) - cfs_free_page(rdpg->rp_pages[i]); - return rc; -} - -/** - * Directory splitting. - * - * Big directory can be split eventually. - */ -int cmm_split_dir(const struct lu_env *env, struct md_object *mo) -{ - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct md_attr *ma = &cmm_env_info(env)->cmi_ma; - int rc = 0, split; - struct lu_buf *buf; - ENTRY; - - cmm_lprocfs_time_start(env); - - LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu))); - memset(ma, 0, sizeof(*ma)); - - /** - Step1: Checking whether the dir needs to be split. */ - rc = cmm_split_expect(env, mo, ma, &split); - if (rc) - GOTO(out, rc); - - if (split != CMM_SPLIT_NEEDED) { - /* No split is needed, caller may proceed with create. */ - GOTO(out, rc = 0); - } - - /* Split should be done now, let's do it. */ - CWARN("Dir "DFID" is going to split (size: "LPU64")\n", - PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size); - - /** - * /note Disable transactions for split, since there will be so many trans in - * this one ops, conflict with current recovery design. - */ - rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS, NULL); - if (rc) { - CERROR("Can't disable trans for split, rc %d\n", rc); - GOTO(out, rc); - } - - /** - Step2: Prepare the md memory */ - ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1); - OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); - if (ma->ma_lmv == NULL) - GOTO(out, rc = -ENOMEM); - - /** - Step3: Create slave objects and fill the ma->ma_lmv */ - rc = cmm_split_slaves_create(env, mo, ma); - if (rc) { - CERROR("Can't create slaves for split, rc %d\n", rc); - GOTO(cleanup, rc); - } - - /** - Step4: Scan and split the object. */ - rc = cmm_split_process_dir(env, mo, ma); - if (rc) { - CERROR("Can't scan and split, rc %d\n", rc); - GOTO(cleanup, rc); - } - - /** - Step5: Set mea to the master object. */ - buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size); - rc = mo_xattr_set(env, md_object_next(mo), buf, - MDS_LMV_MD_NAME, 0); - if (rc) { - CERROR("Can't set MEA to master dir, " "rc %d\n", rc); - GOTO(cleanup, rc); - } - - /* set flag in cmm_object */ - md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE; - - /** - * - Finally, split succeed, tell client to repeat opetartion on correct - * MDT. - */ - CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu))); - rc = -ERESTART; - EXIT; -cleanup: - OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); -out: - cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT); - return rc; -} -/** @} */ diff --git a/lustre/cmm/mdc_device.c b/lustre/cmm/mdc_device.c deleted file mode 100644 index 58f72f0..0000000 --- a/lustre/cmm/mdc_device.c +++ /dev/null @@ -1,365 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - * - * Copyright (c) 2011, Whamcloud, Inc. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/cmm/mdc_device.c - * - * Lustre Metadata Client (mdc) - * - * Author: Mike Pershin - */ - -#define DEBUG_SUBSYSTEM S_MDS - -#include -#include -#include -#include -#include "cmm_internal.h" -#include "mdc_internal.h" - -static const struct lu_device_operations mdc_lu_ops; -/** - * \addtogroup cmm_mdc - * @{ - */ -/** - * The md_device_operation for mdc. It is empty. - */ -static const struct md_device_operations mdc_md_ops = { 0 }; - -/** - * Upcall handler in mdc. Analog of obd_device::o_notify(). - */ -static int mdc_obd_update(struct obd_device *host, - struct obd_device *watched, - enum obd_notify_event ev, void *owner, void *data) -{ - struct mdc_device *mc = owner; - int rc = 0; - ENTRY; - - LASSERT(mc != NULL); - CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev); - if (ev == OBD_NOTIFY_ACTIVE) { - CDEBUG(D_INFO|D_WARNING, "Device %s is active now\n", - watched->obd_name); - } else if (ev == OBD_NOTIFY_INACTIVE) { - CDEBUG(D_INFO|D_WARNING, "Device %s is inactive now\n", - watched->obd_name); - } else if (ev == OBD_NOTIFY_OCD) { - struct obd_connect_data *conn_data = - &watched->u.cli.cl_import->imp_connect_data; - /* - * Update exp_connect_flags. - */ - mc->mc_desc.cl_exp->exp_connect_flags = - conn_data->ocd_connect_flags; - CDEBUG(D_INFO, "Update connect_flags: "LPX64"\n", - conn_data->ocd_connect_flags); - } - - RETURN(rc); -} -/** - * Add new mdc device. - * Invoked by configuration command LCFG_ADD_MDC. - * - * MDC OBD is set up already and connected to the proper MDS - * mdc_add_obd() find that obd by uuid and connects to it. - * Local MDT uuid is used for connection. - */ -static int mdc_obd_add(const struct lu_env *env, - struct mdc_device *mc, struct lustre_cfg *cfg) -{ - struct mdc_cli_desc *desc = &mc->mc_desc; - struct obd_device *mdc; - const char *uuid_str = lustre_cfg_string(cfg, 1); - const char *index = lustre_cfg_string(cfg, 2); - const char *mdc_uuid_str = lustre_cfg_string(cfg, 4); - struct md_site *ms = lu_site2md(mdc2lu_dev(mc)->ld_site); - char *p; - int rc = 0; - - ENTRY; - LASSERT(uuid_str); - LASSERT(index); - - mc->mc_num = simple_strtol(index, &p, 10); - if (*p) { - CERROR("Invalid index in lustre_cgf, offset 2\n"); - RETURN(-EINVAL); - } - - obd_str2uuid(&desc->cl_srv_uuid, uuid_str); - obd_str2uuid(&desc->cl_cli_uuid, mdc_uuid_str); - /* try to find MDC OBD connected to the needed MDT */ - mdc = class_find_client_obd(&desc->cl_srv_uuid, LUSTRE_MDC_NAME, - &desc->cl_cli_uuid); - if (!mdc) { - CERROR("Cannot find MDC OBD connected to %s\n", uuid_str); - rc = -ENOENT; - } else if (!mdc->obd_set_up) { - CERROR("target %s not set up\n", mdc->obd_name); - rc = -EINVAL; - } else { - struct obd_connect_data *ocd; - - CDEBUG(D_CONFIG, "connect to %s(%s)\n", - mdc->obd_name, mdc->obd_uuid.uuid); - - OBD_ALLOC_PTR(ocd); - if (!ocd) - RETURN(-ENOMEM); - /* - * The connection between MDS must be local, - * IBITS are needed for rename_lock (INODELOCK_UPDATE) - */ - ocd->ocd_ibits_known = MDS_INODELOCK_UPDATE; - ocd->ocd_connect_flags = OBD_CONNECT_VERSION | - OBD_CONNECT_ACL | - OBD_CONNECT_RMT_CLIENT | - OBD_CONNECT_MDS_CAPA | - OBD_CONNECT_OSS_CAPA | - OBD_CONNECT_IBITS | - OBD_CONNECT_BRW_SIZE | - OBD_CONNECT_MDS_MDS | - OBD_CONNECT_FID | - OBD_CONNECT_AT | - OBD_CONNECT_FULL20 | - OBD_CONNECT_64BITHASH; - ocd->ocd_brw_size = PTLRPC_MAX_BRW_SIZE; - rc = obd_connect(env, &desc->cl_exp, mdc, &mdc->obd_uuid, ocd, NULL); - OBD_FREE_PTR(ocd); - if (rc) { - CERROR("target %s connect error %d\n", - mdc->obd_name, rc); - } else { - /* set seq controller export for MDC0 if exists */ - if (mc->mc_num == 0) - ms->ms_control_exp = - class_export_get(desc->cl_exp); - rc = obd_fid_init(desc->cl_exp); - if (rc) - CERROR("fid init error %d \n", rc); - else { - /* obd notify mechanism */ - mdc->obd_upcall.onu_owner = mc; - mdc->obd_upcall.onu_upcall = mdc_obd_update; - } - } - - if (rc) { - obd_disconnect(desc->cl_exp); - desc->cl_exp = NULL; - } - } - - RETURN(rc); -} - -/** - * Delete mdc device. - * Called when configuration command LCFG_CLEANUP is issued. - * - * This disconnects MDC OBD and cleanup it. - */ -static int mdc_obd_del(const struct lu_env *env, struct mdc_device *mc, - struct lustre_cfg *cfg) -{ - struct mdc_cli_desc *desc = &mc->mc_desc; - const char *dev = lustre_cfg_string(cfg, 0); - struct obd_device *mdc_obd = class_exp2obd(desc->cl_exp); - struct obd_device *mdt_obd; - int rc; - - ENTRY; - - CDEBUG(D_CONFIG, "Disconnect from %s\n", - mdc_obd->obd_name); - - /* Set mdt_obd flags in shutdown. */ - mdt_obd = class_name2obd(dev); - LASSERT(mdt_obd != NULL); - if (mdc_obd) { - mdc_obd->obd_no_recov = mdt_obd->obd_no_recov; - mdc_obd->obd_force = mdt_obd->obd_force; - mdc_obd->obd_fail = 0; - } - - rc = obd_fid_fini(desc->cl_exp); - if (rc) - CERROR("Fid fini error %d\n", rc); - - obd_register_observer(mdc_obd, NULL); - mdc_obd->obd_upcall.onu_owner = NULL; - mdc_obd->obd_upcall.onu_upcall = NULL; - rc = obd_disconnect(desc->cl_exp); - if (rc) { - CERROR("Target %s disconnect error %d\n", - mdc_obd->obd_name, rc); - } - class_manual_cleanup(mdc_obd); - desc->cl_exp = NULL; - - RETURN(0); -} - -/** - * Process config command. Passed to the mdc from mdt. - * Supports two commands only - LCFG_ADD_MDC and LCFG_CLEANUP - */ -static int mdc_process_config(const struct lu_env *env, - struct lu_device *ld, - struct lustre_cfg *cfg) -{ - struct mdc_device *mc = lu2mdc_dev(ld); - int rc; - - ENTRY; - switch (cfg->lcfg_command) { - case LCFG_ADD_MDC: - rc = mdc_obd_add(env, mc, cfg); - break; - case LCFG_CLEANUP: - rc = mdc_obd_del(env, mc, cfg); - break; - default: - rc = -EOPNOTSUPP; - } - RETURN(rc); -} - -/** - * lu_device_operations instance for mdc. - */ -static const struct lu_device_operations mdc_lu_ops = { - .ldo_object_alloc = mdc_object_alloc, - .ldo_process_config = mdc_process_config -}; - -/** - * Initialize proper easize and cookie size. - */ -void cmm_mdc_init_ea_size(const struct lu_env *env, struct mdc_device *mc, - int max_mdsize, int max_cookiesize) -{ - struct obd_device *obd = class_exp2obd(mc->mc_desc.cl_exp); - - obd->u.cli.cl_max_mds_easize = max_mdsize; - obd->u.cli.cl_max_mds_cookiesize = max_cookiesize; -} - -/** Start mdc device */ -static int mdc_device_init(const struct lu_env *env, struct lu_device *ld, - const char *name, struct lu_device *next) -{ - return 0; -} - -/** Stop mdc device. */ -static struct lu_device *mdc_device_fini(const struct lu_env *env, - struct lu_device *ld) -{ - ENTRY; - RETURN (NULL); -} - -/** Allocate new mdc device */ -static struct lu_device *mdc_device_alloc(const struct lu_env *env, - struct lu_device_type *ldt, - struct lustre_cfg *cfg) -{ - struct lu_device *ld; - struct mdc_device *mc; - ENTRY; - - OBD_ALLOC_PTR(mc); - if (mc == NULL) { - ld = ERR_PTR(-ENOMEM); - } else { - md_device_init(&mc->mc_md_dev, ldt); - mc->mc_md_dev.md_ops = &mdc_md_ops; - ld = mdc2lu_dev(mc); - ld->ld_ops = &mdc_lu_ops; - cfs_sema_init(&mc->mc_fid_sem, 1); - } - - RETURN (ld); -} - -/** Free mdc device */ -static struct lu_device *mdc_device_free(const struct lu_env *env, - struct lu_device *ld) -{ - struct mdc_device *mc = lu2mdc_dev(ld); - - LASSERTF(cfs_atomic_read(&ld->ld_ref) == 0, - "Refcount = %d\n", cfs_atomic_read(&ld->ld_ref)); - LASSERT(cfs_list_empty(&mc->mc_linkage)); - md_device_fini(&mc->mc_md_dev); - OBD_FREE_PTR(mc); - return NULL; -} - -/** context key constructor/destructor: mdc_key_init, mdc_key_fini */ -LU_KEY_INIT_FINI(mdc, struct mdc_thread_info); - -/** context key: mdc_thread_key */ -LU_CONTEXT_KEY_DEFINE(mdc, LCT_MD_THREAD|LCT_CL_THREAD); - -/** type constructor/destructor: mdc_type_init, mdc_type_fini */ -LU_TYPE_INIT_FINI(mdc, &mdc_thread_key); - -static struct lu_device_type_operations mdc_device_type_ops = { - .ldto_init = mdc_type_init, - .ldto_fini = mdc_type_fini, - - .ldto_start = mdc_type_start, - .ldto_stop = mdc_type_stop, - - .ldto_device_alloc = mdc_device_alloc, - .ldto_device_free = mdc_device_free, - - .ldto_device_init = mdc_device_init, - .ldto_device_fini = mdc_device_fini -}; - -struct lu_device_type mdc_device_type = { - .ldt_tags = LU_DEVICE_MD, - .ldt_name = LUSTRE_CMM_MDC_NAME, - .ldt_ops = &mdc_device_type_ops, - .ldt_ctx_tags = LCT_MD_THREAD|LCT_CL_THREAD -}; -/** @} */ diff --git a/lustre/cmm/mdc_internal.h b/lustre/cmm/mdc_internal.h deleted file mode 100644 index 69eaae0..0000000 --- a/lustre/cmm/mdc_internal.h +++ /dev/null @@ -1,152 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/cmm/mdc_internal.h - * - * Lustre Cluster Metadata Manager (cmm), MDC device - * - * Author: Mike Pershin - */ - -#ifndef _CMM_MDC_INTERNAL_H -#define _CMM_MDC_INTERNAL_H - -#if defined(__KERNEL__) - -#include -#include -#include -/** - * \addtogroup cmm - * @{ - */ -/** - * \defgroup cmm_mdc cmm_mdc - * - * This is mdc wrapper device to work with old MDC obd-based devices. - * @{ - */ -/** - * MDC client description. - */ -struct mdc_cli_desc { - /** uuid of remote MDT to connect */ - struct obd_uuid cl_srv_uuid; - /** mdc uuid */ - struct obd_uuid cl_cli_uuid; - /** export of mdc obd */ - struct obd_export *cl_exp; -}; - -/** - * MDC device. - */ -struct mdc_device { - /** md_device instance for MDC */ - struct md_device mc_md_dev; - /** other MD servers in cluster */ - cfs_list_t mc_linkage; - /** number of current device */ - mdsno_t mc_num; - /** mdc client description */ - struct mdc_cli_desc mc_desc; - /** Protects ??*/ - cfs_semaphore_t mc_fid_sem; -}; - -/** - * mdc thread info. Local storage for varios data. - */ -struct mdc_thread_info { - /** Storage for md_op_data */ - struct md_op_data mci_opdata; - /** Storage for ptlrpc request */ - struct ptlrpc_request *mci_req; -}; - -/** mdc object. */ -struct mdc_object { - /** md_object instance for mdc_object */ - struct md_object mco_obj; -}; - -/** Get lu_device from mdc_device. */ -static inline struct lu_device *mdc2lu_dev(struct mdc_device *mc) -{ - return (&mc->mc_md_dev.md_lu_dev); -} - -/** Get mdc_device from md_device. */ -static inline struct mdc_device *md2mdc_dev(struct md_device *md) -{ - return container_of0(md, struct mdc_device, mc_md_dev); -} - -/** Get mdc_device from mdc_object. */ -static inline struct mdc_device *mdc_obj2dev(struct mdc_object *mco) -{ - return (md2mdc_dev(md_obj2dev(&mco->mco_obj))); -} - -/** Get mdc_object from lu_object. */ -static inline struct mdc_object *lu2mdc_obj(struct lu_object *lo) -{ - return container_of0(lo, struct mdc_object, mco_obj.mo_lu); -} - -/** Get mdc_object from md_object. */ -static inline struct mdc_object *md2mdc_obj(struct md_object *mo) -{ - return container_of0(mo, struct mdc_object, mco_obj); -} - -/** Get mdc_device from lu_device. */ -static inline struct mdc_device *lu2mdc_dev(struct lu_device *ld) -{ - return container_of0(ld, struct mdc_device, mc_md_dev.md_lu_dev); -} - -struct lu_object *mdc_object_alloc(const struct lu_env *, - const struct lu_object_header *, - struct lu_device *); - -void cmm_mdc_init_ea_size(const struct lu_env *env, struct mdc_device *mc, - int max_mdsize, int max_cookiesize); -#ifdef HAVE_SPLIT_SUPPORT -int mdc_send_page(struct cmm_device *cmm, const struct lu_env *env, - struct md_object *mo, struct page *page, __u32 end); -#endif -/** @} */ -/** @} */ -#endif /* __KERNEL__ */ -#endif /* _CMM_MDC_INTERNAL_H */ diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c deleted file mode 100644 index 71dd7fa..0000000 --- a/lustre/cmm/mdc_object.c +++ /dev/null @@ -1,668 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/cmm/mdc_object.c - * - * Lustre Cluster Metadata Manager (cmm) - * - * Author: Mike Pershin - */ - -#define DEBUG_SUBSYSTEM S_MDS -#include -#include -#include -#include -#include "cmm_internal.h" -#include "mdc_internal.h" - -static const struct md_object_operations mdc_mo_ops; -static const struct md_dir_operations mdc_dir_ops; -static const struct lu_object_operations mdc_obj_ops; - -extern struct lu_context_key mdc_thread_key; -/** - * \addtogroup cmm_mdc - * @{ - */ -/** - * Allocate new mdc object. - */ -struct lu_object *mdc_object_alloc(const struct lu_env *env, - const struct lu_object_header *hdr, - struct lu_device *ld) -{ - struct mdc_object *mco; - ENTRY; - - OBD_ALLOC_PTR(mco); - if (mco != NULL) { - struct lu_object *lo; - - lo = &mco->mco_obj.mo_lu; - lu_object_init(lo, NULL, ld); - mco->mco_obj.mo_ops = &mdc_mo_ops; - mco->mco_obj.mo_dir_ops = &mdc_dir_ops; - lo->lo_ops = &mdc_obj_ops; - RETURN(lo); - } else - RETURN(NULL); -} - -/** Free current mdc object */ -static void mdc_object_free(const struct lu_env *env, struct lu_object *lo) -{ - struct mdc_object *mco = lu2mdc_obj(lo); - lu_object_fini(lo); - OBD_FREE_PTR(mco); -} - -/** - * Initialize mdc object. All of them have loh_attr::LOHA_REMOTE set. - */ -static int mdc_object_init(const struct lu_env *env, struct lu_object *lo, - const struct lu_object_conf *unused) -{ - ENTRY; - lo->lo_header->loh_attr |= LOHA_REMOTE; - RETURN(0); -} - -/** - * Instance of lu_object_operations for mdc. - */ -static const struct lu_object_operations mdc_obj_ops = { - .loo_object_init = mdc_object_init, - .loo_object_free = mdc_object_free, -}; - -/** - * \name The set of md_object_operations. - * @{ - */ -/** - * Get mdc_thread_info from lu_context - */ -static -struct mdc_thread_info *mdc_info_get(const struct lu_env *env) -{ - struct mdc_thread_info *mci; - - mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key); - LASSERT(mci); - return mci; -} - -/** - * Initialize mdc_thread_info. - */ -static -struct mdc_thread_info *mdc_info_init(const struct lu_env *env) -{ - struct mdc_thread_info *mci = mdc_info_get(env); - memset(mci, 0, sizeof(*mci)); - return mci; -} - -/** - * Convert attributes from mdt_body to the md_attr. - */ -static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma) -{ - struct lu_attr *la = &ma->ma_attr; - /* update time */ - if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) { - la->la_ctime = body->ctime; - if (body->valid & OBD_MD_FLMTIME) - la->la_mtime = body->mtime; - } - - if (body->valid & OBD_MD_FLMODE) - la->la_mode = body->mode; - if (body->valid & OBD_MD_FLSIZE) - la->la_size = body->size; - if (body->valid & OBD_MD_FLBLOCKS) - la->la_blocks = body->blocks; - if (body->valid & OBD_MD_FLUID) - la->la_uid = body->uid; - if (body->valid & OBD_MD_FLGID) - la->la_gid = body->gid; - if (body->valid & OBD_MD_FLFLAGS) - la->la_flags = body->flags; - if (body->valid & OBD_MD_FLNLINK) - la->la_nlink = body->nlink; - if (body->valid & OBD_MD_FLRDEV) - la->la_rdev = body->rdev; - - la->la_valid = body->valid; - ma->ma_valid = MA_INODE; -} - -/** - * Fill the md_attr \a ma with attributes from request. - */ -static int mdc_req2attr_update(const struct lu_env *env, - struct md_attr *ma) -{ - struct mdc_thread_info *mci; - struct ptlrpc_request *req; - struct mdt_body *body; - struct lov_mds_md *md; - struct llog_cookie *cookie; - void *acl; - - ENTRY; - mci = mdc_info_get(env); - req = mci->mci_req; - LASSERT(req); - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - LASSERT(body); - mdc_body2attr(body, ma); - - if (body->valid & OBD_MD_FLMDSCAPA) { - struct lustre_capa *capa; - - /* create for cross-ref will fetch mds capa from remote obj */ - capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1); - LASSERT(capa != NULL); - LASSERT(ma->ma_capa != NULL); - *ma->ma_capa = *capa; - } - - if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) { - if (body->eadatasize == 0) { - CERROR("No size defined for easize field\n"); - RETURN(-EPROTO); - } - - md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, - body->eadatasize); - if (md == NULL) - RETURN(-EPROTO); - - LASSERT(ma->ma_lmm != NULL); - LASSERT(ma->ma_lmm_size >= body->eadatasize); - ma->ma_lmm_size = body->eadatasize; - memcpy(ma->ma_lmm, md, ma->ma_lmm_size); - ma->ma_valid |= MA_LOV; - } - - if (body->valid & OBD_MD_FLCOOKIE) { - /* - * ACL and cookie share the same body->aclsize, we need - * to make sure that they both never come here. - */ - LASSERT(!(body->valid & OBD_MD_FLACL)); - - if (body->aclsize == 0) { - CERROR("No size defined for cookie field\n"); - RETURN(-EPROTO); - } - - cookie = req_capsule_server_sized_get(&req->rq_pill, - &RMF_LOGCOOKIES, - body->aclsize); - if (cookie == NULL) - RETURN(-EPROTO); - - LASSERT(ma->ma_cookie != NULL); - LASSERT(ma->ma_cookie_size == body->aclsize); - memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size); - ma->ma_valid |= MA_COOKIE; - } - -#ifdef CONFIG_FS_POSIX_ACL - if (body->valid & OBD_MD_FLACL) { - if (body->aclsize == 0) { - CERROR("No size defined for acl field\n"); - RETURN(-EPROTO); - } - - acl = req_capsule_server_sized_get(&req->rq_pill, - &RMF_ACL, - body->aclsize); - if (acl == NULL) - RETURN(-EPROTO); - - LASSERT(ma->ma_acl != NULL); - LASSERT(ma->ma_acl_size == body->aclsize); - memcpy(ma->ma_acl, acl, ma->ma_acl_size); - ma->ma_valid |= MA_ACL_DEF; - } -#endif - - RETURN(0); -} - -/** - * The md_object_operations::moo_attr_get() in mdc. - */ -static int mdc_attr_get(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma) -{ - struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); - struct mdc_thread_info *mci; - int rc; - ENTRY; - - mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key); - LASSERT(mci); - - memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata)); - - memcpy(&mci->mci_opdata.op_fid1, lu_object_fid(&mo->mo_lu), - sizeof (struct lu_fid)); - mci->mci_opdata.op_valid = OBD_MD_FLMODE | OBD_MD_FLUID | - OBD_MD_FLGID | OBD_MD_FLFLAGS | - OBD_MD_FLCROSSREF; - - rc = md_getattr(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req); - if (rc == 0) { - /* get attr from request */ - rc = mdc_req2attr_update(env, ma); - } - - ptlrpc_req_finished(mci->mci_req); - - RETURN(rc); -} - -/** - * Helper to init timspec \a t. - */ -static inline struct timespec *mdc_attr_time(struct timespec *t, obd_time seconds) -{ - t->tv_sec = seconds; - t->tv_nsec = 0; - return t; -} - -/** - * The md_object_operations::moo_attr_set() in mdc. - * - * \note It is only used for set ctime when rename's source on remote MDS. - */ -static int mdc_attr_set(const struct lu_env *env, struct md_object *mo, - const struct md_attr *ma) -{ - struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); - const struct lu_attr *la = &ma->ma_attr; - struct mdc_thread_info *mci; - struct md_ucred *uc = md_ucred(env); - int rc; - ENTRY; - - LASSERT(ma->ma_attr.la_valid & LA_CTIME); - - mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key); - LASSERT(mci); - - memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata)); - - mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu); - mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime); - mci->mci_opdata.op_attr.ia_mode = la->la_mode; - mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET; - if (uc && - ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { - mci->mci_opdata.op_fsuid = uc->mu_fsuid; - mci->mci_opdata.op_fsgid = uc->mu_fsgid; - mci->mci_opdata.op_cap = uc->mu_cap; - if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) { - mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; - mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1]; - } else { - mci->mci_opdata.op_suppgids[0] = - mci->mci_opdata.op_suppgids[1] = -1; - } - } else { - mci->mci_opdata.op_fsuid = la->la_uid; - mci->mci_opdata.op_fsgid = la->la_gid; - mci->mci_opdata.op_cap = cfs_curproc_cap_pack(); - mci->mci_opdata.op_suppgids[0] = - mci->mci_opdata.op_suppgids[1] = -1; - } - - rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata, - NULL, 0, NULL, 0, &mci->mci_req, NULL); - - ptlrpc_req_finished(mci->mci_req); - - RETURN(rc); -} - -/** - * The md_object_operations::moo_object_create() in mdc. - */ -static int mdc_object_create(const struct lu_env *env, - struct md_object *mo, - const struct md_op_spec *spec, - struct md_attr *ma) -{ - struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); - struct lu_attr *la = &ma->ma_attr; - struct mdc_thread_info *mci; - const void *symname; - struct md_ucred *uc = md_ucred(env); - int rc, symlen; - uid_t uid; - gid_t gid; - cfs_cap_t cap; - ENTRY; - - LASSERT(S_ISDIR(la->la_mode)); - LASSERT(spec->u.sp_pfid != NULL); - - mci = mdc_info_init(env); - mci->mci_opdata.op_bias = MDS_CROSS_REF; - mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu); - - /* Parent fid is needed to create dotdot on the remote node. */ - mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid); - mci->mci_opdata.op_mod_time = la->la_ctime; - if (uc && - ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { - uid = uc->mu_fsuid; - if (la->la_mode & S_ISGID) - gid = la->la_gid; - else - gid = uc->mu_fsgid; - cap = uc->mu_cap; - if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) - mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; - else - mci->mci_opdata.op_suppgids[0] = -1; - } else { - uid = la->la_uid; - gid = la->la_gid; - cap = 0; - mci->mci_opdata.op_suppgids[0] = -1; - } - - /* get data from spec */ - if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) { - symname = spec->u.sp_ea.eadata; - symlen = spec->u.sp_ea.eadatalen; - mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid); - mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ; -#ifdef CONFIG_FS_POSIX_ACL - } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) { - symname = spec->u.sp_ea.eadata; - symlen = spec->u.sp_ea.eadatalen; - mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid); - mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL; -#endif - } else { - symname = spec->u.sp_symname; - symlen = symname ? strlen(symname) + 1 : 0; - } - - rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata, - symname, symlen, la->la_mode, uid, gid, - cap, la->la_rdev, &mci->mci_req); - - if (rc == 0) { - /* get attr from request */ - rc = mdc_req2attr_update(env, ma); - } - - ptlrpc_req_finished(mci->mci_req); - - RETURN(rc); -} - -/** - * The md_object_operations::moo_ref_add() in mdc. - */ -static int mdc_ref_add(const struct lu_env *env, struct md_object *mo, - const struct md_attr *ma) -{ - struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); - const struct lu_attr *la = &ma->ma_attr; - struct mdc_thread_info *mci; - struct md_ucred *uc = md_ucred(env); - int rc; - ENTRY; - - mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key); - LASSERT(mci); - - memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata)); - mci->mci_opdata.op_bias = MDS_CROSS_REF; - mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu); - mci->mci_opdata.op_mod_time = la->la_ctime; - if (uc && - ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { - mci->mci_opdata.op_fsuid = uc->mu_fsuid; - mci->mci_opdata.op_fsgid = uc->mu_fsgid; - mci->mci_opdata.op_cap = uc->mu_cap; - if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) { - mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; - mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1]; - } else { - mci->mci_opdata.op_suppgids[0] = - mci->mci_opdata.op_suppgids[1] = -1; - } - } else { - mci->mci_opdata.op_fsuid = la->la_uid; - mci->mci_opdata.op_fsgid = la->la_gid; - mci->mci_opdata.op_cap = cfs_curproc_cap_pack(); - mci->mci_opdata.op_suppgids[0] = - mci->mci_opdata.op_suppgids[1] = -1; - } - - - rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req); - - ptlrpc_req_finished(mci->mci_req); - - RETURN(rc); -} - -/** - * The md_object_operations::moo_ref_del() in mdc. - */ -static int mdc_ref_del(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma) -{ - struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); - struct lu_attr *la = &ma->ma_attr; - struct mdc_thread_info *mci; - struct md_ucred *uc = md_ucred(env); - int rc; - ENTRY; - - mci = mdc_info_init(env); - mci->mci_opdata.op_bias = MDS_CROSS_REF; - if (ma->ma_attr_flags & MDS_VTX_BYPASS) - mci->mci_opdata.op_bias |= MDS_VTX_BYPASS; - else - mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS; - mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu); - mci->mci_opdata.op_mode = la->la_mode; - mci->mci_opdata.op_mod_time = la->la_ctime; - if (uc && - ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { - mci->mci_opdata.op_fsuid = uc->mu_fsuid; - mci->mci_opdata.op_fsgid = uc->mu_fsgid; - mci->mci_opdata.op_cap = uc->mu_cap; - if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) - mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; - else - mci->mci_opdata.op_suppgids[0] = -1; - } else { - mci->mci_opdata.op_fsuid = la->la_uid; - mci->mci_opdata.op_fsgid = la->la_gid; - mci->mci_opdata.op_cap = cfs_curproc_cap_pack(); - mci->mci_opdata.op_suppgids[0] = -1; - } - - rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req); - if (rc == 0) { - /* get attr from request */ - rc = mdc_req2attr_update(env, ma); - } - - ptlrpc_req_finished(mci->mci_req); - - RETURN(rc); -} - -#ifdef HAVE_SPLIT_SUPPORT -/** Send page with directory entries to another MDS. */ -int mdc_send_page(struct cmm_device *cm, const struct lu_env *env, - struct md_object *mo, struct page *page, __u32 offset) -{ - struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); - int rc; - ENTRY; - - rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), - page, offset); - CDEBUG(rc ? D_ERROR : D_INFO, "send page %p offset %d fid "DFID - " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc); - RETURN(rc); -} -#endif - -/** - * Instance of md_object_operations for mdc. - */ -static const struct md_object_operations mdc_mo_ops = { - .moo_attr_get = mdc_attr_get, - .moo_attr_set = mdc_attr_set, - .moo_object_create = mdc_object_create, - .moo_ref_add = mdc_ref_add, - .moo_ref_del = mdc_ref_del, -}; -/** @} */ - -/** - * \name The set of md_dir_operations. - * @{ - */ -/** - * The md_dir_operations::mdo_rename_tgt in mdc. - */ -static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p, - struct md_object *mo_t, const struct lu_fid *lf, - const struct lu_name *lname, struct md_attr *ma) -{ - struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p)); - struct lu_attr *la = &ma->ma_attr; - struct mdc_thread_info *mci; - struct md_ucred *uc = md_ucred(env); - int rc; - ENTRY; - - mci = mdc_info_init(env); - mci->mci_opdata.op_bias = MDS_CROSS_REF; - if (ma->ma_attr_flags & MDS_VTX_BYPASS) - mci->mci_opdata.op_bias |= MDS_VTX_BYPASS; - else - mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS; - mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu); - mci->mci_opdata.op_fid2 = *lf; - mci->mci_opdata.op_mode = la->la_mode; - mci->mci_opdata.op_mod_time = la->la_ctime; - if (uc && - ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { - mci->mci_opdata.op_fsuid = uc->mu_fsuid; - mci->mci_opdata.op_fsgid = uc->mu_fsgid; - mci->mci_opdata.op_cap = uc->mu_cap; - if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) { - mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; - mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1]; - } else { - mci->mci_opdata.op_suppgids[0] = - mci->mci_opdata.op_suppgids[1] = -1; - } - } else { - mci->mci_opdata.op_fsuid = la->la_uid; - mci->mci_opdata.op_fsgid = la->la_gid; - mci->mci_opdata.op_cap = cfs_curproc_cap_pack(); - mci->mci_opdata.op_suppgids[0] = - mci->mci_opdata.op_suppgids[1] = -1; - } - - rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0, - lname->ln_name, lname->ln_namelen, &mci->mci_req); - if (rc == 0) { - /* get attr from request */ - mdc_req2attr_update(env, ma); - } - - ptlrpc_req_finished(mci->mci_req); - - RETURN(rc); -} -/** - * Check the fids are not relatives. - * The md_dir_operations::mdo_is_subdir() in mdc. - * - * Return resulting fid in sfid. - * \retval \a sfid = 0 fids are not relatives - * \retval \a sfid = FID at which search stopped - */ -static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo, - const struct lu_fid *fid, struct lu_fid *sfid) -{ - struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); - struct mdc_thread_info *mci; - struct mdt_body *body; - int rc; - ENTRY; - - mci = mdc_info_init(env); - - rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), - fid, &mci->mci_req); - if (rc == 0 || rc == -EREMOTE) { - body = req_capsule_server_get(&mci->mci_req->rq_pill, - &RMF_MDT_BODY); - LASSERT(body->valid & OBD_MD_FLID); - - CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n", - PFID(&body->fid1)); - *sfid = body->fid1; - } - ptlrpc_req_finished(mci->mci_req); - RETURN(rc); -} - -/** Instance of md_dir_operations for mdc. */ -static const struct md_dir_operations mdc_dir_ops = { - .mdo_is_subdir = mdc_is_subdir, - .mdo_rename_tgt = mdc_rename_tgt -}; -/** @} */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 2c24b9c..4fdbe67 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -2083,11 +2083,7 @@ extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa); * anymore, reserve this flags * just for preventing such bit * to be reused. */ -#define MDS_CREATE_RMT_ACL 01000000000 /* indicate create on remote server - * with default ACL */ -#define MDS_CREATE_SLAVE_OBJ 02000000000 /* indicate create slave object - * actually, this is for create, not - * conflict with other open flags */ + #define MDS_OPEN_LOCK 04000000000 /* This open requires open lock */ #define MDS_OPEN_HAS_EA 010000000000 /* specify object create pattern */ #define MDS_OPEN_HAS_OBJS 020000000000 /* Just set the EA the obj exist */ diff --git a/lustre/include/lustre_disk.h b/lustre/include/lustre_disk.h index 487fdaf..dc49759 100644 --- a/lustre/include/lustre_disk.h +++ b/lustre/include/lustre_disk.h @@ -90,8 +90,6 @@ #define LDD_F_UPGRADE14 0x0200 /** process as lctl conf_param */ #define LDD_F_PARAM 0x0400 -/** backend fs make use of IAM directory format. */ -#define LDD_F_IAM_DIR 0x0800 /** all nodes are specified as service nodes */ #define LDD_F_NO_PRIMNODE 0x1000 /** IR enable flag */ @@ -105,7 +103,7 @@ #define LDD_F_OPC_READY 0x40000000 #define LDD_F_OPC_MASK 0xf0000000 -#define LDD_F_ONDISK_MASK (LDD_F_SV_TYPE_MASK | LDD_F_IAM_DIR) +#define LDD_F_ONDISK_MASK (LDD_F_SV_TYPE_MASK) #define LDD_F_MASK 0xFFFF diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 33c0f9a..376522f 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -214,9 +214,6 @@ struct md_op_spec { /** Current lock mode for parent dir where create is performing. */ mdl_mode_t sp_cr_mode; - /** Check for split */ - int sp_ck_split; - /** to create directory */ const struct dt_index_features *sp_feat; }; diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index f752015..4a84abe 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1023,7 +1023,6 @@ static int mdt_getattr(struct mdt_thread_info *info) if (unlikely(rc)) GOTO(out_shrink, rc); - info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT); info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF); /* @@ -1404,7 +1403,6 @@ static int mdt_getattr_name(struct mdt_thread_info *info) repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); LASSERT(repbody != NULL); - info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT); info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF); repbody->eadatasize = 0; repbody->aclsize = 0; @@ -1569,180 +1567,6 @@ static int mdt_sendpage(struct mdt_thread_info *info, RETURN(rc); } -#ifdef HAVE_SPLIT_SUPPORT -/* - * Retrieve dir entry from the page and insert it to the slave object, actually, - * this should be in osd layer, but since it will not in the final product, so - * just do it here and do not define more moo api anymore for this. - */ -static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page, - int size) -{ - struct mdt_object *object = info->mti_object; - struct lu_fid *lf = &info->mti_tmp_fid2; - struct md_attr *ma = &info->mti_attr; - struct lu_dirpage *dp; - struct lu_dirent *ent; - int rc = 0, offset = 0; - ENTRY; - - /* Make sure we have at least one entry. */ - if (size == 0) - RETURN(-EINVAL); - - /* - * Disable trans for this name insert, since it will include many trans - * for this. - */ - info->mti_no_need_trans = 1; - /* - * When write_dir_page, no need update parent's ctime, - * and no permission check for name_insert. - */ - ma->ma_attr.la_ctime = 0; - ma->ma_attr.la_valid = LA_MODE; - ma->ma_valid = MA_INODE; - - cfs_kmap(page); - dp = page_address(page); - offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp); - - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - struct lu_name *lname; - char *name; - - if (le16_to_cpu(ent->lde_namelen) == 0) - continue; - - fid_le_to_cpu(lf, &ent->lde_fid); - if (le64_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT) - ma->ma_attr.la_mode = S_IFDIR; - else - ma->ma_attr.la_mode = 0; - OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1); - if (name == NULL) - GOTO(out, rc = -ENOMEM); - - memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen)); - lname = mdt_name(info->mti_env, name, - le16_to_cpu(ent->lde_namelen)); - ma->ma_attr_flags |= (MDS_PERM_BYPASS | MDS_QUOTA_IGNORE); - rc = mdo_name_insert(info->mti_env, - md_object_next(&object->mot_obj), - lname, lf, ma); - OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1); - if (rc) { - CERROR("Can't insert %*.*s, rc %d\n", - le16_to_cpu(ent->lde_namelen), - le16_to_cpu(ent->lde_namelen), - ent->lde_name, rc); - GOTO(out, rc); - } - - offset += lu_dirent_size(ent); - if (offset >= size) - break; - } - EXIT; -out: - cfs_kunmap(page); - return rc; -} - -static int mdt_bulk_timeout(void *data) -{ - ENTRY; - - CERROR("mdt bulk transfer timeout \n"); - - RETURN(1); -} - -static int mdt_writepage(struct mdt_thread_info *info) -{ - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_body *reqbody; - struct l_wait_info *lwi; - struct ptlrpc_bulk_desc *desc; - struct page *page; - int rc; - ENTRY; - - - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); - if (reqbody == NULL) - RETURN(err_serious(-EFAULT)); - - desc = ptlrpc_prep_bulk_exp(req, 1, BULK_GET_SINK, MDS_BULK_PORTAL); - if (desc == NULL) - RETURN(err_serious(-ENOMEM)); - - /* allocate the page for the desc */ - page = cfs_alloc_page(CFS_ALLOC_STD); - if (page == NULL) - GOTO(desc_cleanup, rc = -ENOMEM); - - CDEBUG(D_INFO, "Received page offset %d size %d \n", - (int)reqbody->size, (int)reqbody->nlink); - - ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size, - (int)reqbody->nlink); - - rc = sptlrpc_svc_prep_bulk(req, desc); - if (rc != 0) - GOTO(cleanup_page, rc); - /* - * Check if client was evicted while we were doing i/o before touching - * network. - */ - OBD_ALLOC_PTR(lwi); - if (!lwi) - GOTO(cleanup_page, rc = -ENOMEM); - - if (desc->bd_export->exp_failed) - rc = -ENOTCONN; - else - rc = ptlrpc_start_bulk_transfer (desc); - if (rc == 0) { - *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * CFS_HZ / 4, CFS_HZ, - mdt_bulk_timeout, desc); - rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) || - desc->bd_export->exp_failed, lwi); - LASSERT(rc == 0 || rc == -ETIMEDOUT); - if (rc == -ETIMEDOUT) { - DEBUG_REQ(D_ERROR, req, "timeout on bulk GET"); - ptlrpc_abort_bulk(desc); - } else if (desc->bd_export->exp_failed) { - DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET"); - rc = -ENOTCONN; - ptlrpc_abort_bulk(desc); - } else if (!desc->bd_success || - desc->bd_nob_transferred != desc->bd_nob) { - DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)", - desc->bd_success ? - "truncated" : "network error on", - desc->bd_nob_transferred, desc->bd_nob); - /* XXX should this be a different errno? */ - rc = -ETIMEDOUT; - } - } else { - DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc); - } - if (rc) - GOTO(cleanup_lwi, rc); - rc = mdt_write_dir_page(info, page, reqbody->nlink); - -cleanup_lwi: - OBD_FREE_PTR(lwi); -cleanup_page: - cfs_free_page(page); -desc_cleanup: - ptlrpc_free_bulk_pin(desc); - RETURN(rc); -} -#endif - static int mdt_readpage(struct mdt_thread_info *info) { struct mdt_object *object = info->mti_object; @@ -3124,7 +2948,6 @@ static void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_big_lmm_used = 0; /* To not check for split by default. */ - info->mti_spec.sp_ck_split = 0; info->mti_spec.no_create = 0; } @@ -3715,7 +3538,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); LASSERT(repbody); - info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT); info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF); repbody->eadatasize = 0; repbody->aclsize = 0; @@ -5147,12 +4969,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, /* CMD is supported only in IAM mode */ LASSERT(num); node_id = simple_strtol(num, NULL, 10); - if (!(lsi->lsi_flags & LDD_F_IAM_DIR) && node_id) { - CERROR("CMD Operation not allowed in IOP mode\n"); - GOTO(err_lmi, rc = -EINVAL); - } - - obd->u.obt.obt_magic = OBT_MAGIC; + obd->u.obt.obt_magic = OBT_MAGIC; } cfs_rwlock_init(&m->mdt_sptlrpc_lock); @@ -5183,7 +5000,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, rc = mdt_stack_init((struct lu_env *)env, m, cfg); if (rc) { CERROR("Can't init device stack, rc %d\n", rc); - RETURN(rc); + GOTO(err_lmi, rc); } s = m->mdt_md_dev.md_lu_dev.ld_site; diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index d5b125b..5b40877 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -939,62 +939,18 @@ static int mdt_create_unpack(struct mdt_thread_info *info) LA_CTIME | LA_MTIME | LA_ATIME; memset(&sp->u, 0, sizeof(sp->u)); sp->sp_cr_flags = get_mrc_cr_flags(rec); - sp->sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT); - info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF); if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) mdt_set_capainfo(info, 0, rr->rr_fid1, req_capsule_client_get(pill, &RMF_CAPA1)); mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); - if (!info->mti_cross_ref) { - rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); - rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, - RCL_CLIENT) - 1; - LASSERT(rr->rr_name && rr->rr_namelen > 0); - } else { - rr->rr_name = NULL; - rr->rr_namelen = 0; - } + rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, + RCL_CLIENT) - 1; + LASSERT(rr->rr_name && rr->rr_namelen > 0); -#ifdef CONFIG_FS_POSIX_ACL - if (sp->sp_cr_flags & MDS_CREATE_RMT_ACL) { - if (S_ISDIR(attr->la_mode)) - sp->u.sp_pfid = rr->rr_fid1; - req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL); - LASSERT(req_capsule_field_present(pill, &RMF_EADATA, - RCL_CLIENT)); - rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA); - rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT); - sp->u.sp_ea.eadata = rr->rr_eadata; - sp->u.sp_ea.eadatalen = rr->rr_eadatalen; - sp->u.sp_ea.fid = rr->rr_fid1; - RETURN(0); - } -#endif - if (S_ISDIR(attr->la_mode)) { - /* pass parent fid for cross-ref cases */ - sp->u.sp_pfid = rr->rr_fid1; - if (sp->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) { - /* create salve object req, need - * unpack split ea here - */ - req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SLAVE); - LASSERT(req_capsule_field_present(pill, &RMF_EADATA, - RCL_CLIENT)); - rr->rr_eadata = req_capsule_client_get(pill, - &RMF_EADATA); - rr->rr_eadatalen = req_capsule_get_size(pill, - &RMF_EADATA, - RCL_CLIENT); - sp->u.sp_ea.eadata = rr->rr_eadata; - sp->u.sp_ea.eadatalen = rr->rr_eadatalen; - sp->u.sp_ea.fid = rr->rr_fid1; - RETURN(0); - } - req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL); - } else if (S_ISLNK(attr->la_mode)) { + if (S_ISLNK(attr->la_mode)) { const char *tgt = NULL; req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SYM); @@ -1007,6 +963,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info) } else { req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL); } + rc = mdt_dlmreq_unpack(info); RETURN(rc); } @@ -1047,14 +1004,12 @@ static int mdt_link_unpack(struct mdt_thread_info *info) mdt_set_capainfo(info, 1, rr->rr_fid2, req_capsule_client_get(pill, &RMF_CAPA2)); - info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT); - info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (rr->rr_name == NULL) RETURN(-EFAULT); rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; - if (!info->mti_cross_ref) - LASSERT(rr->rr_namelen > 0); + + LASSERT(rr->rr_namelen > 0); rc = mdt_dlmreq_unpack(info); RETURN(rc); @@ -1095,17 +1050,11 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) mdt_set_capainfo(info, 0, rr->rr_fid1, req_capsule_client_get(pill, &RMF_CAPA1)); - info->mti_cross_ref = !!(rec->ul_bias & MDS_CROSS_REF); - if (!info->mti_cross_ref) { - rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); - rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; - if (rr->rr_name == NULL || rr->rr_namelen == 0) - RETURN(-EFAULT); - } else { - rr->rr_name = NULL; - rr->rr_namelen = 0; - } - info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT); + rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; + if (rr->rr_name == NULL || rr->rr_namelen == 0) + RETURN(-EFAULT); + if (rec->ul_bias & MDS_VTX_BYPASS) ma->ma_attr_flags |= MDS_VTX_BYPASS; else @@ -1156,16 +1105,14 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) mdt_set_capainfo(info, 1, rr->rr_fid2, req_capsule_client_get(pill, &RMF_CAPA2)); - info->mti_spec.sp_ck_split = !!(rec->rn_bias & MDS_CHECK_SPLIT); - info->mti_cross_ref = !!(rec->rn_bias & MDS_CROSS_REF); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT); if (rr->rr_name == NULL || rr->rr_tgt == NULL) RETURN(-EFAULT); rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT) - 1; - if (!info->mti_cross_ref) - LASSERT(rr->rr_namelen > 0 && rr->rr_tgtlen > 0); + LASSERT(rr->rr_namelen > 0 && rr->rr_tgtlen > 0); + if (rec->rn_bias & MDS_VTX_BYPASS) ma->ma_attr_flags |= MDS_VTX_BYPASS; else @@ -1242,7 +1189,6 @@ static int mdt_open_unpack(struct mdt_thread_info *info) RETURN(-EPROTO); info->mti_replayepoch = rec->cr_ioepoch; - info->mti_spec.sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT); info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF); if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 30e8ece..909bd10 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -268,9 +268,6 @@ static int mdt_server_data_init(const struct lu_env *env, lsd->lsd_feature_compat |= OBD_COMPAT_20; } - if (lsi->lsi_flags & LDD_F_IAM_DIR) - lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR; - lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID; cfs_spin_lock(&mdt->mdt_lut.lut_translock); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index d4f2c6f..d0c072b 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -357,56 +357,6 @@ out_put_parent: RETURN(rc); } -/* Partial request to create object only */ -static int mdt_md_mkobj(struct mdt_thread_info *info) -{ - struct mdt_device *mdt = info->mti_mdt; - struct mdt_object *o; - struct mdt_body *repbody; - struct md_attr *ma = &info->mti_attr; - int rc; - ENTRY; - - DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"", - PFID(info->mti_rr.rr_fid2)); - - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - - o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2); - if (!IS_ERR(o)) { - struct md_object *next = mdt_object_child(o); - - ma->ma_need = MA_INODE; - ma->ma_valid = 0; - - /* - * Cross-ref create can encounter already created obj in case of - * recovery, just get attr in that case. - */ - if (mdt_object_exists(o) == 1) { - rc = mdt_attr_get_complex(info, o, ma); - } else { - /* - * Here, NO permission check for object_create, - * such check has been done on the original MDS. - */ - rc = mo_object_create(info->mti_env, next, - &info->mti_spec, ma); - } - if (rc == 0) { - /* Return fid & attr to client. */ - if (ma->ma_valid & MA_INODE) - mdt_pack_attr2body(info, repbody, &ma->ma_attr, - mdt_object_fid(o)); - } - mdt_object_put(info->mti_env, o); - } else - rc = PTR_ERR(o); - - mdt_create_pack_capa(info, rc, o, repbody); - RETURN(rc); -} - int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, struct md_attr *ma, int flags) { @@ -614,35 +564,29 @@ static int mdt_reint_create(struct mdt_thread_info *info, if (info->mti_dlm_req) ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0); + LASSERT(info->mti_rr.rr_namelen > 0); switch (info->mti_attr.ma_attr.la_mode & S_IFMT) { - case S_IFDIR:{ - /* Cross-ref case. */ - /* TODO: we can add LPROC_MDT_CROSS for cross-ref stats */ - if (info->mti_cross_ref) { - rc = mdt_md_mkobj(info); - } else { - LASSERT(info->mti_rr.rr_namelen > 0); - mdt_counter_incr(req, LPROC_MDT_MKDIR); - rc = mdt_md_create(info); - } - break; - } + case S_IFDIR: + mdt_counter_incr(req, LPROC_MDT_MKDIR); + break; case S_IFREG: case S_IFLNK: case S_IFCHR: case S_IFBLK: case S_IFIFO: - case S_IFSOCK:{ - /* Special file should stay on the same node as parent. */ - LASSERT(info->mti_rr.rr_namelen > 0); + case S_IFSOCK: + /* Special file should stay on the same node as parent. */ mdt_counter_incr(req, LPROC_MDT_MKNOD); - rc = mdt_md_create(info); - break; - } - default: - rc = err_serious(-EOPNOTSUPP); - } - RETURN(rc); + break; + default: + CERROR("%s: Unsupported mode %o\n", + mdt2obd_dev(info->mti_mdt)->obd_name, + info->mti_attr.ma_attr.la_mode); + RETURN(err_serious(-EOPNOTSUPP)); + } + + rc = mdt_md_create(info); + RETURN(rc); } /* @@ -674,29 +618,16 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, RETURN(err_serious(-ENOENT)); /* - * step 1: lock the parent. Note, this may be child in case of - * remote operation denoted by ->mti_cross_ref flag. + * step 1: lock the parent. */ parent_lh = &info->mti_lh[MDT_LH_PARENT]; - if (info->mti_cross_ref) { - /* - * Init reg lock for cross ref case when we need to do only - * ref del locally. - */ - mdt_lock_reg_init(parent_lh, LCK_PW); - } else { - mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name, - rr->rr_namelen); - } + mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name, + rr->rr_namelen); + mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh, MDS_INODELOCK_UPDATE); - if (IS_ERR(mp)) { - rc = PTR_ERR(mp); - /* errors are possible here in cross-ref cases, see below */ - if (info->mti_cross_ref) - rc = 0; - GOTO(out, rc); - } + if (IS_ERR(mp)) + GOTO(out, rc = PTR_ERR(mp)); if (mdt_object_obf(mp)) GOTO(out_unlock_parent, rc = -EPERM); @@ -707,24 +638,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, mdt_reint_init_ma(info, ma); - if (info->mti_cross_ref) { - /* - * Remote partial operation. It is possible that replay may - * happen on parent MDT and this operation will be repeated. - * Therefore the object absense is allowed case and nothing - * should be done here. - */ - if (mdt_object_exists(mp) > 0) { - mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA); - rc = mo_ref_del(info->mti_env, - mdt_object_child(mp), ma); - if (rc == 0) - mdt_handle_last_unlink(info, mp, ma); - } else - rc = 0; - GOTO(out_unlock_parent, rc); - } - /* step 2: find & lock the child */ lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); /* lookup child object along with version checking */ @@ -819,21 +732,6 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0); - if (info->mti_cross_ref) { - /* MDT holding name ask us to add ref. */ - lhs = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_reg_init(lhs, LCK_EX); - ms = mdt_object_find_lock(info, rr->rr_fid1, lhs, - MDS_INODELOCK_UPDATE); - if (IS_ERR(ms)) - RETURN(PTR_ERR(ms)); - - mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA); - rc = mo_ref_add(info->mti_env, mdt_object_child(ms), ma); - mdt_object_unlock_put(info, ms, lhs, rc); - RETURN(rc); - } - /* Invalid case so return error immediately instead of * processing it */ if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) @@ -935,79 +833,6 @@ static int mdt_pdir_hash_lock(struct mdt_thread_info *info, return rc; } -/* partial operation for rename */ -static int mdt_reint_rename_tgt(struct mdt_thread_info *info) -{ - struct mdt_reint_record *rr = &info->mti_rr; - struct ptlrpc_request *req = mdt_info_req(info); - struct md_attr *ma = &info->mti_attr; - struct mdt_object *mtgtdir; - struct mdt_object *mtgt = NULL; - struct mdt_lock_handle *lh_tgtdir; - struct mdt_lock_handle *lh_tgt = NULL; - struct lu_fid *tgt_fid = &info->mti_tmp_fid1; - struct lu_name *lname; - int rc; - ENTRY; - - DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID, - rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1)); - - /* step 1: lookup & lock the tgt dir. */ - lh_tgtdir = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt, - rr->rr_tgtlen); - mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir, - MDS_INODELOCK_UPDATE); - if (IS_ERR(mtgtdir)) - RETURN(PTR_ERR(mtgtdir)); - - /* step 2: find & lock the target object if exists. */ - mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA); - lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen); - rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir), - lname, tgt_fid, &info->mti_spec); - if (rc != 0 && rc != -ENOENT) { - GOTO(out_unlock_tgtdir, rc); - } else if (rc == 0) { - /* - * In case of replay that name can be already inserted, check - * that and do nothing if so. - */ - if (lu_fid_eq(tgt_fid, rr->rr_fid2)) - GOTO(out_unlock_tgtdir, rc); - - lh_tgt = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_reg_init(lh_tgt, LCK_EX); - - mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt, - MDS_INODELOCK_LOOKUP); - if (IS_ERR(mtgt)) - GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt)); - - mdt_reint_init_ma(info, ma); - - rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir), - mdt_object_child(mtgt), rr->rr_fid2, - lname, ma); - } else /* -ENOENT */ { - rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir), - lname, rr->rr_fid2, ma); - } - - /* handle last link of tgt object */ - if (rc == 0 && mtgt) - mdt_handle_last_unlink(info, mtgt, ma); - - EXIT; - - if (mtgt) - mdt_object_unlock_put(info, mtgt, lh_tgt, rc); -out_unlock_tgtdir: - mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc); - return rc; -} - static int mdt_rename_lock(struct mdt_thread_info *info, struct lustre_handle *lh) { @@ -1126,11 +951,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info, if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0); - if (info->mti_cross_ref) { - rc = mdt_reint_rename_tgt(info); - RETURN(rc); - } - DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s", PFID(rr->rr_fid1), rr->rr_name, PFID(rr->rr_fid2), rr->rr_tgt); diff --git a/lustre/obdclass/md_local_object.c b/lustre/obdclass/md_local_object.c index 77c9ce8..2b85c6d 100644 --- a/lustre/obdclass/md_local_object.c +++ b/lustre/obdclass/md_local_object.c @@ -132,7 +132,6 @@ static int llo_lookup(const struct lu_env *env, spec->sp_cr_flags = 0; spec->sp_cr_lookup = 0; spec->sp_cr_mode = 0; - spec->sp_ck_split = 0; lname->ln_name = name; lname->ln_namelen = strlen(name); @@ -284,7 +283,6 @@ static struct md_object *llo_create_obj(const struct lu_env *env, spec->sp_cr_flags = 0; spec->sp_cr_lookup = 1; spec->sp_cr_mode = 0; - spec->sp_ck_split = 0; if (feat == &dt_directory_features) la->la_mode = S_IFDIR | S_IXUGO; diff --git a/lustre/obdclass/obd_mount.c b/lustre/obdclass/obd_mount.c index 325acaa..551c385 100644 --- a/lustre/obdclass/obd_mount.c +++ b/lustre/obdclass/obd_mount.c @@ -2066,7 +2066,7 @@ static int lsi_prepare(struct lustre_sb_info *lsi) lsi->lsi_flags |= rc; /* Add mount line flags that used to be in ldd: - * writeconf, mgs, iam, anything else? + * writeconf, mgs, anything else? */ lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_WRITECONF) ? LDD_F_WRITECONF : 0; @@ -2074,8 +2074,6 @@ static int lsi_prepare(struct lustre_sb_info *lsi) LDD_F_VIRGIN : 0; lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_MGS) ? LDD_F_SV_TYPE_MGS : 0; - lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_IAM) ? - LDD_F_IAM_DIR : 0; lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_NO_PRIMNODE) ? LDD_F_NO_PRIMNODE : 0; diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index d209956..602e4ab 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -150,11 +150,7 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, l = &mo->oo_dt.do_lu; dt_object_init(&mo->oo_dt, NULL, d); - if (osd_dev(d)->od_iop_mode) - mo->oo_dt.do_ops = &osd_obj_ea_ops; - else - mo->oo_dt.do_ops = &osd_obj_ops; - + mo->oo_dt.do_ops = &osd_obj_ea_ops; l->lo_ops = &osd_lu_obj_ops; cfs_init_rwsem(&mo->oo_sem); cfs_init_rwsem(&mo->oo_ext_idx_sem); @@ -397,10 +393,9 @@ trigger: obj->oo_inode = inode; LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); - if (dev->od_iop_mode) { - obj->oo_compat_dot_created = 1; - obj->oo_compat_dotdot_created = 1; - } + + obj->oo_compat_dot_created = 1; + obj->oo_compat_dotdot_created = 1; if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */ GOTO(out, result = 0); @@ -1690,7 +1685,6 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, { int result; struct osd_thandle *oth; - struct osd_device *osd = osd_obj2dev(obj); __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX)); LASSERT(S_ISDIR(attr->la_mode)); @@ -1698,16 +1692,7 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle->h_transaction != NULL); result = osd_mkfile(info, obj, mode, hint, th); - if (result == 0 && osd->od_iop_mode == 0) { - LASSERT(obj->oo_inode != NULL); - /* - * XXX uh-oh... call low-level iam function directly. - */ - result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4, - sizeof (struct osd_fid_pack), - oth->ot_handle); - } return result; } @@ -2703,7 +2688,6 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, int result; int skip_iam = 0; struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); LINVRNT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); @@ -2711,7 +2695,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, if (osd_object_is_root(obj)) { dt->do_index_ops = &osd_index_ea_ops; result = 0; - } else if (feat == &dt_directory_features && osd->od_iop_mode) { + } else if (feat == &dt_directory_features) { dt->do_index_ops = &osd_index_ea_ops; if (S_ISDIR(obj->oo_inode->i_mode)) result = 0; @@ -3878,6 +3862,7 @@ static const struct dt_index_operations osd_index_iam_ops = { } }; + /** * Creates or initializes iterator context. * @@ -4447,11 +4432,6 @@ static int osd_mount(const struct lu_env *env, GOTO(out, rc = -EINVAL); } - if (lmd_flags & LMD_FLG_IAM) { - o->od_iop_mode = 0; - LCONSOLE_WARN("%s: OSD: IAM mode enabled\n", name); - } else - o->od_iop_mode = 1; if (lmd_flags & LMD_FLG_NOSCRUB) o->od_noscrub = 1; diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index 9fda286..1bdd60e 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -271,8 +271,7 @@ struct osd_device { struct obd_statfs od_statfs; cfs_spinlock_t od_osfs_lock; - unsigned int od_iop_mode:1, - od_noscrub:1; + unsigned int od_noscrub:1; struct fsfilt_operations *od_fsops; int od_connects; diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 514bd64..eaa5f64 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -2000,10 +2000,6 @@ void lustre_assert_wire_constants(void) MDS_OPEN_OWNEROVERRIDE); LASSERTF(MDS_OPEN_JOIN_FILE == 000400000000UL, "found 0%.11oUL\n", MDS_OPEN_JOIN_FILE); - LASSERTF(MDS_CREATE_RMT_ACL == 001000000000UL, "found 0%.11oUL\n", - MDS_CREATE_RMT_ACL); - LASSERTF(MDS_CREATE_SLAVE_OBJ == 002000000000UL, "found 0%.11oUL\n", - MDS_CREATE_SLAVE_OBJ); LASSERTF(MDS_OPEN_LOCK == 004000000000UL, "found 0%.11oUL\n", MDS_OPEN_LOCK); LASSERTF(MDS_OPEN_HAS_EA == 010000000000UL, "found 0%.11oUL\n", diff --git a/lustre/tests/cfg/local.sh b/lustre/tests/cfg/local.sh index 3f056d7..cff13a7 100644 --- a/lustre/tests/cfg/local.sh +++ b/lustre/tests/cfg/local.sh @@ -15,7 +15,6 @@ TMP=${TMP:-/tmp} DAEMONSIZE=${DAEMONSIZE:-500} MDSCOUNT=${MDSCOUNT:-1} [ $MDSCOUNT -gt 4 ] && MDSCOUNT=4 -[ $MDSCOUNT -gt 1 ] && IAMDIR=yes for num in $(seq $MDSCOUNT); do eval mds${num}_HOST=\$\{mds${num}_HOST:-$mds_HOST\} eval mds${num}failover_HOST=\$\{mds${num}failover_HOST:-$mdsfailover_HOST\} diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index 265bbcb..4771bfb 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -2784,8 +2784,6 @@ mkfs_opts() { opts+=${L_GETIDENTITY:+" --param=mdt.identity_upcall=$L_GETIDENTITY"} if [ $fstype == ldiskfs ]; then - opts+=${IAMDIR:+" --iam-dir"} - fs_mkfs_opts+=${MDSJOURNALSIZE:+" -J size=$MDSJOURNALSIZE"} fs_mkfs_opts+=${MDSISIZE:+" -i $MDSISIZE"} fi diff --git a/lustre/utils/mkfs_lustre.c b/lustre/utils/mkfs_lustre.c index e3cfa81..392e4f1 100644 --- a/lustre/utils/mkfs_lustre.c +++ b/lustre/utils/mkfs_lustre.c @@ -191,7 +191,7 @@ void print_ldd(char *str, struct lustre_disk_data *ldd) printf("Lustre FS: %s\n", ldd->ldd_fsname); printf("Mount type: %s\n", MT_STR(ldd)); printf("Flags: %#x\n", ldd->ldd_flags); - printf(" (%s%s%s%s%s%s%s%s%s%s)\n", + printf(" (%s%s%s%s%s%s%s%s%s)\n", IS_MDT(ldd) ? "MDT ":"", IS_OST(ldd) ? "OST ":"", IS_MGS(ldd) ? "MGS ":"", @@ -199,7 +199,6 @@ void print_ldd(char *str, struct lustre_disk_data *ldd) ldd->ldd_flags & LDD_F_VIRGIN ? "first_time ":"", ldd->ldd_flags & LDD_F_UPDATE ? "update ":"", ldd->ldd_flags & LDD_F_WRITECONF ? "writeconf ":"", - ldd->ldd_flags & LDD_F_IAM_DIR ? "IAM_dir_format ":"", ldd->ldd_flags & LDD_F_NO_PRIMNODE? "no_primnode ":"", ldd->ldd_flags & LDD_F_UPGRADE14 ? "upgrade1.4 ":""); printf("Persistent mount opts: %s\n", ldd->ldd_mount_opts); @@ -284,7 +283,6 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop, char **mountopts) { static struct option long_opt[] = { - {"iam-dir", 0, 0, 'a'}, {"backfstype", 1, 0, 'b'}, {"stripe-count-hint", 1, 0, 'c'}, {"comment", 1, 0, 'u'}, @@ -326,11 +324,6 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop, while ((opt = getopt_long(argc, argv, optstring, long_opt, &longidx)) != EOF) { switch (opt) { - case 'a': { - if (IS_MDT(&mop->mo_ldd)) - mop->mo_ldd.ldd_flags |= LDD_F_IAM_DIR; - break; - } case 'b': { int i = 0; while (i < LDD_MT_LAST) { diff --git a/lustre/utils/mount_lustre.c b/lustre/utils/mount_lustre.c index 1df69b0..fe1920f 100644 --- a/lustre/utils/mount_lustre.c +++ b/lustre/utils/mount_lustre.c @@ -364,8 +364,6 @@ static int parse_ldd(char *source, struct mount_opts *mop, char *options) append_option(options, "virgin"); if (ldd->ldd_flags & LDD_F_WRITECONF) append_option(options, "writeconf"); - if (ldd->ldd_flags & LDD_F_IAM_DIR) - append_option(options, "iam"); if (ldd->ldd_flags & LDD_F_NO_PRIMNODE) append_option(options, "noprimnode"); diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index c0781a0..a049517 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -900,8 +900,6 @@ check_mdt_body(void) CHECK_VALUE_O(MDS_OPEN_DELAY_CREATE); CHECK_VALUE_O(MDS_OPEN_OWNEROVERRIDE); CHECK_VALUE_O(MDS_OPEN_JOIN_FILE); - CHECK_VALUE_O(MDS_CREATE_RMT_ACL); - CHECK_VALUE_O(MDS_CREATE_SLAVE_OBJ); CHECK_VALUE_O(MDS_OPEN_LOCK); CHECK_VALUE_O(MDS_OPEN_HAS_EA); CHECK_VALUE_O(MDS_OPEN_HAS_OBJS); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index a3a66e5..bdefdd2 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -2008,10 +2008,6 @@ void lustre_assert_wire_constants(void) MDS_OPEN_OWNEROVERRIDE); LASSERTF(MDS_OPEN_JOIN_FILE == 000400000000UL, "found 0%.11oUL\n", MDS_OPEN_JOIN_FILE); - LASSERTF(MDS_CREATE_RMT_ACL == 001000000000UL, "found 0%.11oUL\n", - MDS_CREATE_RMT_ACL); - LASSERTF(MDS_CREATE_SLAVE_OBJ == 002000000000UL, "found 0%.11oUL\n", - MDS_CREATE_SLAVE_OBJ); LASSERTF(MDS_OPEN_LOCK == 004000000000UL, "found 0%.11oUL\n", MDS_OPEN_LOCK); LASSERTF(MDS_OPEN_HAS_EA == 010000000000UL, "found 0%.11oUL\n",