1. remove split checking and cross-ref code from DNE.
2. remove IAM code on ldiskfs and utils.
3. remove cmm directory.
Change-Id: I0c81d753462863706e8918393369dde94a45030c
Signed-off-by: Wang Di <di.wang@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/4353
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
subdir-m += obdecho
subdir-m += mgc
-@SERVER_TRUE@subdir-m += mds ost mgs mdt cmm mdd ofd quota osp lod
+@SERVER_TRUE@subdir-m += mds ost mgs mdt mdd ofd quota osp lod
@CLIENT_TRUE@subdir-m += lov osc mdc lmv llite fld
@LDISKFS_ENABLED_TRUE@subdir-m += osd-ldiskfs
@ZFS_ENABLED_TRUE@subdir-m += osd-zfs
ALWAYS_SUBDIRS = include lvfs obdclass ldlm ptlrpc obdecho \
mgc fid fld doc utils tests scripts autoconf contrib conf
-SERVER_SUBDIRS = ost mds mgs mdt cmm mdd ofd osd-zfs osd-ldiskfs \
+SERVER_SUBDIRS = ost mds mgs mdt mdd ofd osd-zfs osd-ldiskfs \
quota osp lod target
CLIENT_SUBDIRS = mdc lmv llite lclient lov osc
lustre/mds/autoMakefile
lustre/mdt/Makefile
lustre/mdt/autoMakefile
-lustre/cmm/Makefile
-lustre/cmm/autoMakefile
lustre/mdd/Makefile
lustre/mdd/autoMakefile
lustre/fld/Makefile
+++ /dev/null
-MODULES := cmm
-cmm-objs := cmm_device.o cmm_object.o cmm_lproc.o mdc_device.o mdc_object.o
-
-@SPLIT_TRUE@cmm-objs += cmm_split.o
-
-@INCLUDE_RULES@
+++ /dev/null
-#
-# GPL HEADER START
-#
-# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 only,
-# as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# General Public License version 2 for more details (a copy is included
-# in the LICENSE file that accompanied this code).
-#
-# You should have received a copy of the GNU General Public License
-# version 2 along with this program; If not, see
-# http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
-#
-# Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
-# CA 95054 USA or visit www.sun.com if you need additional information or
-# have any questions.
-#
-# GPL HEADER END
-#
-
-#
-# Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
-# Use is subject to license terms.
-#
-
-#
-# This file is part of Lustre, http://www.lustre.org/
-# Lustre is a trademark of Sun Microsystems, Inc.
-#
-
-if MODULES
-modulefs_DATA = cmm$(KMODEXT)
-endif
-
-MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
-EXTRA_DIST = $(cmm-objs:%.o=%.c) cmm_internal.h mdc_internal.h
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, Whamcloud, Inc.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_device.c
- *
- * Lustre Cluster Metadata Manager (cmm)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-/**
- * \addtogroup cmm
- * @{
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <linux/module.h>
-
-#include <obd.h>
-#include <obd_class.h>
-#include <lprocfs_status.h>
-#include <lustre_ver.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-
-struct obd_ops cmm_obd_device_ops = {
- .o_owner = THIS_MODULE
-};
-
-static const struct lu_device_operations cmm_lu_ops;
-
-static inline int lu_device_is_cmm(struct lu_device *d)
-{
- return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
-}
-
-int cmm_root_get(const struct lu_env *env, struct md_device *md,
- struct lu_fid *fid)
-{
- struct cmm_device *cmm_dev = md2cmm_dev(md);
- /* valid only on master MDS */
- if (cmm_dev->cmm_local_num == 0)
- return cmm_child_ops(cmm_dev)->mdo_root_get(env,
- cmm_dev->cmm_child, fid);
- else
- return -EINVAL;
-}
-
-static int cmm_statfs(const struct lu_env *env, struct md_device *md,
- struct obd_statfs *sfs)
-{
- struct cmm_device *cmm_dev = md2cmm_dev(md);
- int rc;
-
- ENTRY;
- rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
- cmm_dev->cmm_child, sfs);
- RETURN (rc);
-}
-
-static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
- int *md_size, int *cookie_size)
-{
- struct cmm_device *cmm_dev = md2cmm_dev(md);
- int rc;
- ENTRY;
- rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
- md_size, cookie_size);
- RETURN(rc);
-}
-
-static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
- int mode , unsigned long timeout, __u32 alg,
- struct lustre_capa_key *keys)
-{
- struct cmm_device *cmm_dev = md2cmm_dev(md);
- int rc;
- ENTRY;
- LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
- rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
- mode, timeout, alg,
- keys);
- RETURN(rc);
-}
-
-static int cmm_update_capa_key(const struct lu_env *env,
- struct md_device *md,
- struct lustre_capa_key *key)
-{
- struct cmm_device *cmm_dev = md2cmm_dev(md);
- int rc;
- ENTRY;
- rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
- cmm_dev->cmm_child,
- key);
- RETURN(rc);
-}
-
-static int cmm_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
- int idx, void **h)
-{
- struct cmm_device *cmm_dev = md2cmm_dev(m);
- int rc;
- ENTRY;
-
- rc = cmm_child_ops(cmm_dev)->mdo_llog_ctxt_get(env, cmm_dev->cmm_child,
- idx, h);
- RETURN(rc);
-}
-
-int cmm_iocontrol(const struct lu_env *env, struct md_device *m,
- unsigned int cmd, int len, void *data)
-{
- struct md_device *next = md2cmm_dev(m)->cmm_child;
- int rc;
-
- ENTRY;
- rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
- RETURN(rc);
-}
-
-
-static const struct md_device_operations cmm_md_ops = {
- .mdo_statfs = cmm_statfs,
- .mdo_root_get = cmm_root_get,
- .mdo_maxsize_get = cmm_maxsize_get,
- .mdo_init_capa_ctxt = cmm_init_capa_ctxt,
- .mdo_update_capa_key = cmm_update_capa_key,
- .mdo_llog_ctxt_get = cmm_llog_ctxt_get,
- .mdo_iocontrol = cmm_iocontrol,
-};
-
-extern struct lu_device_type mdc_device_type;
-/**
- * Init MDC.
- */
-static int cmm_post_init_mdc(const struct lu_env *env,
- struct cmm_device *cmm)
-{
- int max_mdsize, max_cookiesize, rc;
- struct mdc_device *mc, *tmp;
-
- /* get the max mdsize and cookiesize from lower layer */
- rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
- &max_cookiesize);
- if (rc)
- RETURN(rc);
-
- cfs_spin_lock(&cmm->cmm_tgt_guard);
- cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
- mc_linkage) {
- cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
- }
- cfs_spin_unlock(&cmm->cmm_tgt_guard);
- RETURN(rc);
-}
-
-/* --- cmm_lu_operations --- */
-/* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
-static int cmm_add_mdc(const struct lu_env *env,
- struct cmm_device *cm, struct lustre_cfg *cfg)
-{
- struct lu_device_type *ldt = &mdc_device_type;
- char *p, *num = lustre_cfg_string(cfg, 2);
- struct mdc_device *mc, *tmp;
- struct lu_fld_target target;
- struct lu_device *ld;
- struct lu_device *cmm_lu = cmm2lu_dev(cm);
- mdsno_t mdc_num;
- struct lu_site *site = cmm2lu_dev(cm)->ld_site;
- int rc;
- ENTRY;
-
- /* find out that there is no such mdc */
- LASSERT(num);
- mdc_num = simple_strtol(num, &p, 10);
- if (*p) {
- CERROR("Invalid index in lustre_cgf, offset 2\n");
- RETURN(-EINVAL);
- }
-
- cfs_spin_lock(&cm->cmm_tgt_guard);
- cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
- mc_linkage) {
- if (mc->mc_num == mdc_num) {
- cfs_spin_unlock(&cm->cmm_tgt_guard);
- RETURN(-EEXIST);
- }
- }
- cfs_spin_unlock(&cm->cmm_tgt_guard);
- ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
- if (IS_ERR(ld))
- RETURN(PTR_ERR(ld));
-
- ld->ld_site = site;
-
- rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
- if (rc) {
- ldt->ldt_ops->ldto_device_free(env, ld);
- RETURN(rc);
- }
- /* pass config to the just created MDC */
- rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
- if (rc) {
- ldt->ldt_ops->ldto_device_fini(env, ld);
- ldt->ldt_ops->ldto_device_free(env, ld);
- RETURN(rc);
- }
-
- cfs_spin_lock(&cm->cmm_tgt_guard);
- cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
- mc_linkage) {
- if (mc->mc_num == mdc_num) {
- cfs_spin_unlock(&cm->cmm_tgt_guard);
- ldt->ldt_ops->ldto_device_fini(env, ld);
- ldt->ldt_ops->ldto_device_free(env, ld);
- RETURN(-EEXIST);
- }
- }
- mc = lu2mdc_dev(ld);
- cfs_list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
- cm->cmm_tgt_count++;
- cfs_spin_unlock(&cm->cmm_tgt_guard);
-
- lu_device_get(cmm_lu);
- lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);
-
- target.ft_srv = NULL;
- target.ft_idx = mc->mc_num;
- target.ft_exp = mc->mc_desc.cl_exp;
- fld_client_add_target(cm->cmm_fld, &target);
-
- if (mc->mc_num == 0) {
- /* this is mdt0 -> mc export, fld lookup need this export
- to forward fld lookup request. */
- LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
- lu_site2md(site)->ms_server_fld->lsf_control_exp =
- mc->mc_desc.cl_exp;
- }
- /* Set max md size for the mdc. */
- rc = cmm_post_init_mdc(env, cm);
- RETURN(rc);
-}
-
-static void cmm_device_shutdown(const struct lu_env *env,
- struct cmm_device *cm,
- struct lustre_cfg *cfg)
-{
- struct mdc_device *mc, *tmp;
- ENTRY;
-
- /* Remove local target from FLD. */
- fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);
-
- /* Finish all mdc devices. */
- cfs_spin_lock(&cm->cmm_tgt_guard);
- cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
- struct lu_device *ld_m = mdc2lu_dev(mc);
- fld_client_del_target(cm->cmm_fld, mc->mc_num);
- ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
- }
- cfs_spin_unlock(&cm->cmm_tgt_guard);
-
- /* remove upcall device*/
- md_upcall_fini(&cm->cmm_md_dev);
-
- EXIT;
-}
-
-static int cmm_device_mount(const struct lu_env *env,
- struct cmm_device *m, struct lustre_cfg *cfg)
-{
- const char *index = lustre_cfg_string(cfg, 2);
- char *p;
-
- LASSERT(index != NULL);
-
- m->cmm_local_num = simple_strtol(index, &p, 10);
- if (*p) {
- CERROR("Invalid index in lustre_cgf\n");
- RETURN(-EINVAL);
- }
-
- RETURN(0);
-}
-
-static int cmm_process_config(const struct lu_env *env,
- struct lu_device *d, struct lustre_cfg *cfg)
-{
- struct cmm_device *m = lu2cmm_dev(d);
- struct lu_device *next = md2lu_dev(m->cmm_child);
- int err;
- ENTRY;
-
- switch(cfg->lcfg_command) {
- case LCFG_ADD_MDC:
- /* On first ADD_MDC add also local target. */
- if (!(m->cmm_flags & CMM_INITIALIZED)) {
- struct lu_site *ls = cmm2lu_dev(m)->ld_site;
- struct lu_fld_target target;
-
- target.ft_srv = lu_site2md(ls)->ms_server_fld;
- target.ft_idx = m->cmm_local_num;
- target.ft_exp = NULL;
-
- fld_client_add_target(m->cmm_fld, &target);
- }
- err = cmm_add_mdc(env, m, cfg);
-
- /* The first ADD_MDC can be counted as setup is finished. */
- if (!(m->cmm_flags & CMM_INITIALIZED))
- m->cmm_flags |= CMM_INITIALIZED;
-
- break;
- case LCFG_SETUP:
- {
- /* lower layers should be set up at first */
- err = next->ld_ops->ldo_process_config(env, next, cfg);
- if (err == 0)
- err = cmm_device_mount(env, m, cfg);
- break;
- }
- case LCFG_CLEANUP:
- {
- lu_dev_del_linkage(d->ld_site, d);
- cmm_device_shutdown(env, m, cfg);
- }
- default:
- err = next->ld_ops->ldo_process_config(env, next, cfg);
- }
- RETURN(err);
-}
-
-static int cmm_recovery_complete(const struct lu_env *env,
- struct lu_device *d)
-{
- struct cmm_device *m = lu2cmm_dev(d);
- struct lu_device *next = md2lu_dev(m->cmm_child);
- int rc;
- ENTRY;
- rc = next->ld_ops->ldo_recovery_complete(env, next);
- RETURN(rc);
-}
-
-static int cmm_prepare(const struct lu_env *env,
- struct lu_device *pdev,
- struct lu_device *dev)
-{
- struct cmm_device *cmm = lu2cmm_dev(dev);
- struct lu_device *next = md2lu_dev(cmm->cmm_child);
- int rc;
-
- ENTRY;
- rc = next->ld_ops->ldo_prepare(env, dev, next);
- RETURN(rc);
-}
-
-static const struct lu_device_operations cmm_lu_ops = {
- .ldo_object_alloc = cmm_object_alloc,
- .ldo_process_config = cmm_process_config,
- .ldo_recovery_complete = cmm_recovery_complete,
- .ldo_prepare = cmm_prepare,
-};
-
-/* --- lu_device_type operations --- */
-int cmm_upcall(const struct lu_env *env, struct md_device *md,
- enum md_upcall_event ev, void *data)
-{
- int rc;
- ENTRY;
-
- switch (ev) {
- case MD_LOV_SYNC:
- rc = cmm_post_init_mdc(env, md2cmm_dev(md));
- if (rc)
- CERROR("can not init md size %d\n", rc);
- /* fall through */
- default:
- rc = md_do_upcall(env, md, ev, data);
- }
- RETURN(rc);
-}
-
-static struct lu_device *cmm_device_free(const struct lu_env *env,
- struct lu_device *d)
-{
- struct cmm_device *m = lu2cmm_dev(d);
- struct lu_device *next = md2lu_dev(m->cmm_child);
- ENTRY;
-
- LASSERT(m->cmm_tgt_count == 0);
- LASSERT(cfs_list_empty(&m->cmm_targets));
- if (m->cmm_fld != NULL) {
- OBD_FREE_PTR(m->cmm_fld);
- m->cmm_fld = NULL;
- }
- md_device_fini(&m->cmm_md_dev);
- OBD_FREE_PTR(m);
- RETURN(next);
-}
-
-static struct lu_device *cmm_device_alloc(const struct lu_env *env,
- struct lu_device_type *t,
- struct lustre_cfg *cfg)
-{
- struct lu_device *l;
- struct cmm_device *m;
- ENTRY;
-
- OBD_ALLOC_PTR(m);
- if (m == NULL) {
- l = ERR_PTR(-ENOMEM);
- } else {
- md_device_init(&m->cmm_md_dev, t);
- m->cmm_md_dev.md_ops = &cmm_md_ops;
- md_upcall_init(&m->cmm_md_dev, cmm_upcall);
- l = cmm2lu_dev(m);
- l->ld_ops = &cmm_lu_ops;
-
- OBD_ALLOC_PTR(m->cmm_fld);
- if (!m->cmm_fld) {
- cmm_device_free(env, l);
- l = ERR_PTR(-ENOMEM);
- }
- }
- RETURN(l);
-}
-
-/* context key constructor/destructor: cmm_key_init, cmm_key_fini */
-LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);
-
-/* context key: cmm_thread_key */
-LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);
-
-struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
-{
- struct cmm_thread_info *info;
-
- info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
- LASSERT(info != NULL);
- return info;
-}
-
-/* type constructor/destructor: cmm_type_init/cmm_type_fini */
-LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);
-
-/*
- * Kludge code : it should be moved mdc_device.c if mdc_(mds)_device
- * is really stacked.
- */
-static int __cmm_type_init(struct lu_device_type *t)
-{
- int rc;
- rc = lu_device_type_init(&mdc_device_type);
- if (rc == 0) {
- rc = cmm_type_init(t);
- if (rc)
- lu_device_type_fini(&mdc_device_type);
- }
- return rc;
-}
-
-static void __cmm_type_fini(struct lu_device_type *t)
-{
- lu_device_type_fini(&mdc_device_type);
- cmm_type_fini(t);
-}
-
-static void __cmm_type_start(struct lu_device_type *t)
-{
- mdc_device_type.ldt_ops->ldto_start(&mdc_device_type);
- cmm_type_start(t);
-}
-
-static void __cmm_type_stop(struct lu_device_type *t)
-{
- mdc_device_type.ldt_ops->ldto_stop(&mdc_device_type);
- cmm_type_stop(t);
-}
-
-static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
- const char *name, struct lu_device *next)
-{
- struct cmm_device *m = lu2cmm_dev(d);
- struct lu_site *ls;
- int err = 0;
- ENTRY;
-
- cfs_spin_lock_init(&m->cmm_tgt_guard);
- CFS_INIT_LIST_HEAD(&m->cmm_targets);
- m->cmm_tgt_count = 0;
- m->cmm_child = lu2md_dev(next);
-
- err = fld_client_init(m->cmm_fld, name,
- LUSTRE_CLI_FLD_HASH_DHT);
- if (err) {
- CERROR("Can't init FLD, err %d\n", err);
- RETURN(err);
- }
-
- /* Assign site's fld client ref, needed for asserts in osd. */
- ls = cmm2lu_dev(m)->ld_site;
- lu_site2md(ls)->ms_client_fld = m->cmm_fld;
- err = cmm_procfs_init(m, name);
-
- RETURN(err);
-}
-
-static struct lu_device *cmm_device_fini(const struct lu_env *env,
- struct lu_device *ld)
-{
- struct cmm_device *cm = lu2cmm_dev(ld);
- struct mdc_device *mc, *tmp;
- struct lu_site *ls;
- ENTRY;
-
- /* Finish all mdc devices */
- cfs_spin_lock(&cm->cmm_tgt_guard);
- cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
- struct lu_device *ld_m = mdc2lu_dev(mc);
- struct lu_device *ld_c = cmm2lu_dev(cm);
-
- cfs_list_del_init(&mc->mc_linkage);
- lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
- lu_device_put(ld_c);
- ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
- ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
- cm->cmm_tgt_count--;
- }
- cfs_spin_unlock(&cm->cmm_tgt_guard);
-
- fld_client_proc_fini(cm->cmm_fld);
- fld_client_fini(cm->cmm_fld);
- ls = cmm2lu_dev(cm)->ld_site;
- lu_site2md(ls)->ms_client_fld = NULL;
- cmm_procfs_fini(cm);
-
- RETURN (md2lu_dev(cm->cmm_child));
-}
-
-static struct lu_device_type_operations cmm_device_type_ops = {
- .ldto_init = __cmm_type_init,
- .ldto_fini = __cmm_type_fini,
-
- .ldto_start = __cmm_type_start,
- .ldto_stop = __cmm_type_stop,
-
- .ldto_device_alloc = cmm_device_alloc,
- .ldto_device_free = cmm_device_free,
-
- .ldto_device_init = cmm_device_init,
- .ldto_device_fini = cmm_device_fini
-};
-
-static struct lu_device_type cmm_device_type = {
- .ldt_tags = LU_DEVICE_MD,
- .ldt_name = LUSTRE_CMM_NAME,
- .ldt_ops = &cmm_device_type_ops,
- .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
-};
-
-struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
- { 0 }
-};
-
-struct lprocfs_vars lprocfs_cmm_module_vars[] = {
- { 0 }
-};
-
-static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
-{
- lvars->module_vars = lprocfs_cmm_module_vars;
- lvars->obd_vars = lprocfs_cmm_obd_vars;
-}
-/** @} */
-
-static int __init cmm_mod_init(void)
-{
- struct lprocfs_static_vars lvars;
-
- lprocfs_cmm_init_vars(&lvars);
- return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
- LUSTRE_CMM_NAME, &cmm_device_type);
-}
-
-static void __exit cmm_mod_exit(void)
-{
- class_unregister_type(LUSTRE_CMM_NAME);
-}
-
-MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
-MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
-MODULE_LICENSE("GPL");
-
-cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_internal.h
- *
- * Lustre Cluster Metadata Manager (cmm)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-/**
- * \defgroup cmm cmm
- * Cluster Metadata Manager.
- * Implementation of md interface to provide operation between MDS-es.
- *
- * CMM has two set of md methods, one is for local operation which uses MDD,
- * second is for remote operations to call different MDS.
- * When object is allocated then proper set of methods are set for it. *
- * @{
- */
-#ifndef _CMM_INTERNAL_H
-#define _CMM_INTERNAL_H
-
-#if defined(__KERNEL__)
-
-#include <obd.h>
-#include <lustre_fld.h>
-#include <md_object.h>
-#include <lustre_acl.h>
-
-/**
- * md_device extention for CMM layer.
- */
-struct cmm_device {
- /** md_device instance */
- struct md_device cmm_md_dev;
- /** device flags, taken from enum cmm_flags */
- __u32 cmm_flags;
- /** underlaying device in MDS stack, usually MDD */
- struct md_device *cmm_child;
- /** FLD client to talk to FLD */
- struct lu_client_fld *cmm_fld;
- /** Number of this MDS server in cluster */
- mdsno_t cmm_local_num;
- /** Total number of other MDS */
- __u32 cmm_tgt_count;
- /** linked list of all remove MDS clients */
- cfs_list_t cmm_targets;
- /** lock for cmm_device::cmm_targets operations */
- cfs_spinlock_t cmm_tgt_guard;
- /** /proc entry with CMM data */
- cfs_proc_dir_entry_t *cmm_proc_entry;
- /** CMM statistic */
- struct lprocfs_stats *cmm_stats;
-};
-
-/**
- * CMM flags
- */
-enum cmm_flags {
- /** Device initialization complete. */
- CMM_INITIALIZED = 1 << 0
-};
-
-/**
- * Wrapper for md_device::md_ops of underlaying device cmm_child.
- */
-static inline const struct md_device_operations *
-cmm_child_ops(struct cmm_device *d)
-{
- return d->cmm_child->md_ops;
-}
-
-/** Wrapper to get cmm_device by contained md_device */
-static inline struct cmm_device *md2cmm_dev(struct md_device *m)
-{
- return container_of0(m, struct cmm_device, cmm_md_dev);
-}
-
-/** Wrapper to get cmm_device by contained lu_device */
-static inline struct cmm_device *lu2cmm_dev(struct lu_device *d)
-{
- return container_of0(d, struct cmm_device, cmm_md_dev.md_lu_dev);
-}
-
-/** Wrapper to get lu_device from cmm_device */
-static inline struct lu_device *cmm2lu_dev(struct cmm_device *d)
-{
- return (&d->cmm_md_dev.md_lu_dev);
-}
-
-#ifdef HAVE_SPLIT_SUPPORT
-/**
- * \ingroup split
- * States of split operation.
- */
-enum cmm_split_state {
- CMM_SPLIT_UNKNOWN,
- CMM_SPLIT_NONE,
- CMM_SPLIT_NEEDED,
- CMM_SPLIT_DONE,
- CMM_SPLIT_DENIED
-};
-#endif
-
-/** CMM container for md_object. */
-struct cmm_object {
- struct md_object cmo_obj;
-};
-
-/**
- * \defgroup cml cml
- * CMM local operations.
- * @{
- */
-/**
- * Local CMM object.
- */
-struct cml_object {
- /** cmm_object instance. */
- struct cmm_object cmm_obj;
-#ifdef HAVE_SPLIT_SUPPORT
- /** split state of object (for dirs only) */
- enum cmm_split_state clo_split;
-#endif
-};
-/** @} */
-
-/**
- * \defgroup cmr cmr
- * CMM remote operations.
- * @{
- */
-/**
- * Remote CMM object.
- */
-struct cmr_object {
- /** cmm_object instance */
- struct cmm_object cmm_obj;
- /** mds number where object is placed */
- mdsno_t cmo_num;
-};
-/** @} */
-
-enum {
- CMM_SPLIT_PAGE_COUNT = 1
-};
-
-/**
- * CMM thread info.
- * This is storage for all variables used in CMM functions and it is needed as
- * stack replacement.
- */
-struct cmm_thread_info {
- struct md_attr cmi_ma;
- struct lu_buf cmi_buf;
- struct lu_fid cmi_fid; /* used for le/cpu conversions */
- struct lu_rdpg cmi_rdpg;
- /** pointers to pages for readpage. */
- struct page *cmi_pages[CMM_SPLIT_PAGE_COUNT];
- struct md_op_spec cmi_spec;
- struct lmv_stripe_md cmi_lmv;
- char cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
- /** Ops object filename */
- struct lu_name cti_name;
-};
-
-/**
- * Wrapper to get cmm_device from cmm_object.
- */
-static inline struct cmm_device *cmm_obj2dev(struct cmm_object *c)
-{
- return (md2cmm_dev(md_obj2dev(&c->cmo_obj)));
-}
-
-/**
- * Get cmm_device from related lu_device.
- */
-static inline struct cmm_object *lu2cmm_obj(struct lu_object *o)
-{
- //LASSERT(lu_device_is_cmm(o->lo_dev));
- return container_of0(o, struct cmm_object, cmo_obj.mo_lu);
-}
-
-/**
- * Get cmm object from corresponding md_object.
- */
-static inline struct cmm_object *md2cmm_obj(struct md_object *o)
-{
- return container_of0(o, struct cmm_object, cmo_obj);
-}
-
-/**
- * Get next lower-layer md_object from current cmm_object.
- */
-static inline struct md_object *cmm2child_obj(struct cmm_object *o)
-{
- return (o ? lu2md(lu_object_next(&o->cmo_obj.mo_lu)) : NULL);
-}
-
-/**
- * Extract lu_fid from corresponding cmm_object.
- */
-static inline struct lu_fid* cmm2fid(struct cmm_object *obj)
-{
- return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
-}
-
-struct cmm_thread_info *cmm_env_info(const struct lu_env *env);
-struct lu_object *cmm_object_alloc(const struct lu_env *env,
- const struct lu_object_header *hdr,
- struct lu_device *);
-/**
- * \addtogroup cml
- * @{
- */
-/** Get cml_object from lu_object. */
-static inline struct cml_object *lu2cml_obj(struct lu_object *o)
-{
- return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
-}
-/** Get cml_object from md_object. */
-static inline struct cml_object *md2cml_obj(struct md_object *mo)
-{
- return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
-}
-/** Get cml_object from cmm_object */
-static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
-{
- return container_of0(co, struct cml_object, cmm_obj);
-}
-/** @} */
-
-int cmm_upcall(const struct lu_env *env, struct md_device *md,
- enum md_upcall_event ev, void *data);
-
-#ifdef HAVE_SPLIT_SUPPORT
-/**
- * \defgroup split split
- * @{
- */
-
-#define CMM_MD_SIZE(stripes) (sizeof(struct lmv_stripe_md) + \
- (stripes) * sizeof(struct lu_fid))
-
-/** Get and initialize lu_bug from cmm_thread_info. */
-static inline struct lu_buf *cmm_buf_get(const struct lu_env *env,
- void *area, ssize_t len)
-{
- struct lu_buf *buf;
-
- buf = &cmm_env_info(env)->cmi_buf;
- buf->lb_buf = area;
- buf->lb_len = len;
- return buf;
-}
-
-int cmm_split_check(const struct lu_env *env, struct md_object *mp,
- const char *name);
-int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma, int *split);
-int cmm_split_dir(const struct lu_env *env, struct md_object *mo);
-int cmm_split_access(const struct lu_env *env, struct md_object *mo,
- mdl_mode_t lm);
-/** @} */
-#endif
-
-int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
- mdsno_t *mds, const struct lu_env *env);
-
-int cmm_procfs_init(struct cmm_device *cmm, const char *name);
-int cmm_procfs_fini(struct cmm_device *cmm);
-
-void cmm_lprocfs_time_start(const struct lu_env *env);
-void cmm_lprocfs_time_end(const struct lu_env *env, struct cmm_device *cmm,
- int idx);
-/**
- * CMM counters.
- */
-enum {
- LPROC_CMM_SPLIT_CHECK,
- LPROC_CMM_SPLIT,
- LPROC_CMM_LOOKUP,
- LPROC_CMM_CREATE,
- LPROC_CMM_NR
-};
-
-#endif /* __KERNEL__ */
-#endif /* _CMM_INTERNAL_H */
-/** @} */
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_lproc.c
- *
- * CMM lprocfs stuff
- *
- * Author: Wang Di <wangdi@clusterfs.com>
- * Author: Yury Umanets <umka@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <linux/module.h>
-#include <obd.h>
-#include <obd_class.h>
-#include <lustre_ver.h>
-#include <obd_support.h>
-#include <lprocfs_status.h>
-#include <lu_time.h>
-
-#include <lustre/lustre_idl.h>
-
-#include "cmm_internal.h"
-/**
- * \addtogroup cmm
- * @{
- */
-static const char *cmm_counter_names[LPROC_CMM_NR] = {
- [LPROC_CMM_SPLIT_CHECK] = "split_check",
- [LPROC_CMM_SPLIT] = "split",
- [LPROC_CMM_LOOKUP] = "lookup",
- [LPROC_CMM_CREATE] = "create"
-};
-
-int cmm_procfs_init(struct cmm_device *cmm, const char *name)
-{
- struct lu_device *ld = &cmm->cmm_md_dev.md_lu_dev;
- struct obd_type *type;
- int rc;
- ENTRY;
-
- type = ld->ld_type->ldt_obd_type;
-
- LASSERT(name != NULL);
- LASSERT(type != NULL);
-
- /* Find the type procroot and add the proc entry for this device. */
- cmm->cmm_proc_entry = lprocfs_register(name, type->typ_procroot,
- NULL, NULL);
- if (IS_ERR(cmm->cmm_proc_entry)) {
- rc = PTR_ERR(cmm->cmm_proc_entry);
- CERROR("Error %d setting up lprocfs for %s\n",
- rc, name);
- cmm->cmm_proc_entry = NULL;
- GOTO(out, rc);
- }
-
- rc = lu_time_init(&cmm->cmm_stats,
- cmm->cmm_proc_entry,
- cmm_counter_names, ARRAY_SIZE(cmm_counter_names));
-
- EXIT;
-out:
- if (rc)
- cmm_procfs_fini(cmm);
- return rc;
-}
-
-int cmm_procfs_fini(struct cmm_device *cmm)
-{
- if (cmm->cmm_stats)
- lu_time_fini(&cmm->cmm_stats);
-
- if (cmm->cmm_proc_entry) {
- lprocfs_remove(&cmm->cmm_proc_entry);
- cmm->cmm_proc_entry = NULL;
- }
- RETURN(0);
-}
-
-void cmm_lprocfs_time_start(const struct lu_env *env)
-{
- lu_lprocfs_time_start(env);
-}
-
-void cmm_lprocfs_time_end(const struct lu_env *env, struct cmm_device *cmm,
- int idx)
-{
- lu_lprocfs_time_end(env, cmm->cmm_stats, idx);
-}
-/** @} */
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2012, Whamcloud, Inc.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_object.c
- *
- * Lustre Cluster Metadata Manager (cmm)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <lustre_fid.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-/**
- * \ingroup cmm
- * Lookup MDS number \a mds by FID \a fid.
- *
- * \param fid FID of object to find MDS
- * \param mds mds number to return.
- */
-int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
- mdsno_t *mds, const struct lu_env *env)
-{
- int rc = 0;
- ENTRY;
-
- LASSERT(fid_is_sane(fid));
-
- rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
- LU_SEQ_RANGE_MDT, env);
- if (rc) {
- CERROR("Can't find mds by seq "LPX64", rc %d\n",
- fid_seq(fid), rc);
- RETURN(rc);
- }
-
- if (*mds > cm->cmm_tgt_count) {
- CERROR("Got invalid mdsno: %x (max: %x)\n",
- *mds, cm->cmm_tgt_count);
- rc = -EINVAL;
- } else {
- CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
- LPX64"\n", *mds, fid_seq(fid));
- }
-
- RETURN (rc);
-}
-
-/**
- * \addtogroup cml
- * @{
- */
-static const struct md_object_operations cml_mo_ops;
-static const struct md_dir_operations cml_dir_ops;
-static const struct lu_object_operations cml_obj_ops;
-
-static const struct md_object_operations cmr_mo_ops;
-static const struct md_dir_operations cmr_dir_ops;
-static const struct lu_object_operations cmr_obj_ops;
-
-/**
- * \ingroup cmm
- * Allocate CMM object.
- */
-struct lu_object *cmm_object_alloc(const struct lu_env *env,
- const struct lu_object_header *loh,
- struct lu_device *ld)
-{
- const struct lu_fid *fid = &loh->loh_fid;
- struct lu_object *lo = NULL;
- struct cmm_device *cd;
- mdsno_t mds;
- int rc = 0;
-
- ENTRY;
-
- cd = lu2cmm_dev(ld);
- if (cd->cmm_flags & CMM_INITIALIZED) {
- /* get object location */
- rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
- if (rc)
- RETURN(NULL);
- } else
- /*
- * Device is not yet initialized, cmm_object is being created
- * as part of early bootstrap procedure (it is /ROOT, or /fld,
- * etc.). Such object *has* to be local.
- */
- mds = cd->cmm_local_num;
-
- /* select the proper set of operations based on object location */
- if (mds == cd->cmm_local_num) {
- struct cml_object *clo;
-
- OBD_ALLOC_PTR(clo);
- if (clo != NULL) {
- lo = &clo->cmm_obj.cmo_obj.mo_lu;
- lu_object_init(lo, NULL, ld);
- clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
- clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
- lo->lo_ops = &cml_obj_ops;
- }
- } else {
- struct cmr_object *cro;
-
- OBD_ALLOC_PTR(cro);
- if (cro != NULL) {
- lo = &cro->cmm_obj.cmo_obj.mo_lu;
- lu_object_init(lo, NULL, ld);
- cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
- cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
- lo->lo_ops = &cmr_obj_ops;
- cro->cmo_num = mds;
- }
- }
- RETURN(lo);
-}
-
-/**
- * Get local child device.
- */
-static struct lu_device *cml_child_dev(struct cmm_device *d)
-{
- return &d->cmm_child->md_lu_dev;
-}
-
-/**
- * Free cml_object.
- */
-static void cml_object_free(const struct lu_env *env,
- struct lu_object *lo)
-{
- struct cml_object *clo = lu2cml_obj(lo);
- lu_object_fini(lo);
- OBD_FREE_PTR(clo);
-}
-
-/**
- * Initialize cml_object.
- */
-static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
- const struct lu_object_conf *unused)
-{
- struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
- struct lu_device *c_dev;
- struct lu_object *c_obj;
- int rc;
-
- ENTRY;
-
-#ifdef HAVE_SPLIT_SUPPORT
- if (cd->cmm_tgt_count == 0)
- lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
- else
- lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
-#endif
- c_dev = cml_child_dev(cd);
- if (c_dev == NULL) {
- rc = -ENOENT;
- } else {
- c_obj = c_dev->ld_ops->ldo_object_alloc(env,
- lo->lo_header, c_dev);
- if (c_obj != NULL) {
- lu_object_add(lo, c_obj);
- rc = 0;
- } else {
- rc = -ENOMEM;
- }
- }
-
- RETURN(rc);
-}
-
-static int cml_object_print(const struct lu_env *env, void *cookie,
- lu_printer_t p, const struct lu_object *lo)
-{
- return (*p)(env, cookie, "[local]");
-}
-
-static const struct lu_object_operations cml_obj_ops = {
- .loo_object_init = cml_object_init,
- .loo_object_free = cml_object_free,
- .loo_object_print = cml_object_print
-};
-
-/**
- * \name CMM local md_object operations.
- * All of them call just corresponding operations on next layer.
- * @{
- */
-static int cml_object_create(const struct lu_env *env,
- struct md_object *mo,
- const struct md_op_spec *spec,
- struct md_attr *attr)
-{
- int rc;
- ENTRY;
- rc = mo_object_create(env, md_object_next(mo), spec, attr);
- RETURN(rc);
-}
-
-static int cml_permission(const struct lu_env *env,
- struct md_object *p, struct md_object *c,
- struct md_attr *attr, int mask)
-{
- int rc;
- ENTRY;
- rc = mo_permission(env, md_object_next(p), md_object_next(c),
- attr, mask);
- RETURN(rc);
-}
-
-static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
- struct md_attr *attr)
-{
- int rc;
- ENTRY;
- rc = mo_attr_get(env, md_object_next(mo), attr);
- RETURN(rc);
-}
-
-static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
- const struct md_attr *attr)
-{
- int rc;
- ENTRY;
- rc = mo_attr_set(env, md_object_next(mo), attr);
- RETURN(rc);
-}
-
-static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
- struct lu_buf *buf, const char *name)
-{
- int rc;
- ENTRY;
- rc = mo_xattr_get(env, md_object_next(mo), buf, name);
- RETURN(rc);
-}
-
-static int cml_readlink(const struct lu_env *env, struct md_object *mo,
- struct lu_buf *buf)
-{
- int rc;
- ENTRY;
- rc = mo_readlink(env, md_object_next(mo), buf);
- RETURN(rc);
-}
-
-static int cml_changelog(const struct lu_env *env, enum changelog_rec_type type,
- int flags, struct md_object *mo)
-{
- int rc;
- ENTRY;
- rc = mo_changelog(env, type, flags, md_object_next(mo));
- RETURN(rc);
-}
-
-static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
- struct lu_buf *buf)
-{
- int rc;
- ENTRY;
- rc = mo_xattr_list(env, md_object_next(mo), buf);
- RETURN(rc);
-}
-
-static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
- const struct lu_buf *buf, const char *name,
- int fl)
-{
- int rc;
- ENTRY;
- rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
- RETURN(rc);
-}
-
-static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
- const char *name)
-{
- int rc;
- ENTRY;
- rc = mo_xattr_del(env, md_object_next(mo), name);
- RETURN(rc);
-}
-
-static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
- const struct md_attr *ma)
-{
- int rc;
- ENTRY;
- rc = mo_ref_add(env, md_object_next(mo), ma);
- RETURN(rc);
-}
-
-static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma)
-{
- int rc;
- ENTRY;
- rc = mo_ref_del(env, md_object_next(mo), ma);
- RETURN(rc);
-}
-
-static int cml_open(const struct lu_env *env, struct md_object *mo,
- int flags)
-{
- int rc;
- ENTRY;
- rc = mo_open(env, md_object_next(mo), flags);
- RETURN(rc);
-}
-
-static int cml_close(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma, int mode)
-{
- int rc;
- ENTRY;
- rc = mo_close(env, md_object_next(mo), ma, mode);
- RETURN(rc);
-}
-
-static int cml_readpage(const struct lu_env *env, struct md_object *mo,
- const struct lu_rdpg *rdpg)
-{
- int rc;
- ENTRY;
- rc = mo_readpage(env, md_object_next(mo), rdpg);
- RETURN(rc);
-}
-
-static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
- struct lustre_capa *capa, int renewal)
-{
- int rc;
- ENTRY;
- rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
- RETURN(rc);
-}
-
-static int cml_path(const struct lu_env *env, struct md_object *mo,
- char *path, int pathlen, __u64 *recno, int *linkno)
-{
- int rc;
- ENTRY;
- rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
- RETURN(rc);
-}
-
-static int cml_file_lock(const struct lu_env *env, struct md_object *mo,
- struct lov_mds_md *lmm, struct ldlm_extent *extent,
- struct lustre_handle *lockh)
-{
- int rc;
- ENTRY;
- rc = mo_file_lock(env, md_object_next(mo), lmm, extent, lockh);
- RETURN(rc);
-}
-
-static int cml_file_unlock(const struct lu_env *env, struct md_object *mo,
- struct lov_mds_md *lmm, struct lustre_handle *lockh)
-{
- int rc;
- ENTRY;
- rc = mo_file_unlock(env, md_object_next(mo), lmm, lockh);
- RETURN(rc);
-}
-
-static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
-{
- int rc;
- ENTRY;
- rc = mo_object_sync(env, md_object_next(mo));
- RETURN(rc);
-}
-
-static const struct md_object_operations cml_mo_ops = {
- .moo_permission = cml_permission,
- .moo_attr_get = cml_attr_get,
- .moo_attr_set = cml_attr_set,
- .moo_xattr_get = cml_xattr_get,
- .moo_xattr_list = cml_xattr_list,
- .moo_xattr_set = cml_xattr_set,
- .moo_xattr_del = cml_xattr_del,
- .moo_object_create = cml_object_create,
- .moo_ref_add = cml_ref_add,
- .moo_ref_del = cml_ref_del,
- .moo_open = cml_open,
- .moo_close = cml_close,
- .moo_readpage = cml_readpage,
- .moo_readlink = cml_readlink,
- .moo_changelog = cml_changelog,
- .moo_capa_get = cml_capa_get,
- .moo_object_sync = cml_object_sync,
- .moo_path = cml_path,
- .moo_file_lock = cml_file_lock,
- .moo_file_unlock = cml_file_unlock,
-};
-/** @} */
-
-/**
- * \name CMM local md_dir_operations.
- * @{
- */
-/**
- * cml lookup object fid by name.
- * This returns only FID by name.
- */
-static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
- const struct lu_name *lname, struct lu_fid *lf,
- struct md_op_spec *spec)
-{
- int rc;
- ENTRY;
-
-#ifdef HAVE_SPLIT_SUPPORT
- if (spec != NULL && spec->sp_ck_split) {
- rc = cmm_split_check(env, mo_p, lname->ln_name);
- if (rc)
- RETURN(rc);
- }
-#endif
- rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
- RETURN(rc);
-
-}
-
-/**
- * Helper to return lock mode. Used in split cases only.
- */
-static mdl_mode_t cml_lock_mode(const struct lu_env *env,
- struct md_object *mo, mdl_mode_t lm)
-{
- int rc = MDL_MINMODE;
- ENTRY;
-
-#ifdef HAVE_SPLIT_SUPPORT
- rc = cmm_split_access(env, mo, lm);
-#endif
-
- RETURN(rc);
-}
-
-/**
- * Create operation for cml.
- * Objects are local, but split can happen.
- * If split is not needed this will call next layer mdo_create().
- *
- * \param mo_p Parent directory. Local object.
- * \param lname name of file to create.
- * \param mo_c Child object. It has no real inode yet.
- * \param spec creation specification.
- * \param ma child object attributes.
- */
-static int cml_create(const struct lu_env *env, struct md_object *mo_p,
- const struct lu_name *lname, struct md_object *mo_c,
- struct md_op_spec *spec, struct md_attr *ma)
-{
- int rc;
- ENTRY;
-
-#ifdef HAVE_SPLIT_SUPPORT
- /* Lock mode always should be sane. */
- LASSERT(spec->sp_cr_mode != MDL_MINMODE);
-
- /*
- * Sigh... This is long story. MDT may have race with detecting if split
- * is possible in cmm. We know this race and let it live, because
- * getting it rid (with some sem or spinlock) will also mean that
- * PDIROPS for create will not work because we kill parallel work, what
- * is really bad for performance and makes no sense having PDIROPS. So,
- * we better allow the race to live, but split dir only if some of
- * concurrent threads takes EX lock, not matter which one. So that, say,
- * two concurrent threads may have different lock modes on directory (CW
- * and EX) and not first one which comes here and see that split is
- * possible should split the dir, but only that one which has EX
- * lock. And we do not care that in this case, split may happen a bit
- * later (when dir size will not be necessarily 64K, but may be a bit
- * larger). So that, we allow concurrent creates and protect split by EX
- * lock.
- */
- if (spec->sp_cr_mode == MDL_EX) {
- /**
- * Split cases:
- * - Try to split \a mo_p upon each create operation.
- * If split is ok, -ERESTART is returned and current thread
- * will not peoceed with create. Instead it sends -ERESTART
- * to client to let it know that correct MDT must be chosen.
- * \see cmm_split_dir()
- */
- rc = cmm_split_dir(env, mo_p);
- if (rc)
- /*
- * -ERESTART or some split error is returned, we can't
- * proceed with create.
- */
- GOTO(out, rc);
- }
-
- if (spec != NULL && spec->sp_ck_split) {
- /**
- * - Directory is split already. Let the caller know that
- * it should tell client that directory is split and operation
- * should repeat to correct MDT.
- * \see cmm_split_check()
- */
- rc = cmm_split_check(env, mo_p, lname->ln_name);
- if (rc)
- GOTO(out, rc);
- }
-#endif
-
- rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
- spec, ma);
-
- EXIT;
-#ifdef HAVE_SPLIT_SUPPORT
-out:
-#endif
- return rc;
-}
-
-/** Call mdo_create_data() on next layer. All objects are local. */
-static int cml_create_data(const struct lu_env *env, struct md_object *p,
- struct md_object *o,
- const struct md_op_spec *spec,
- struct md_attr *ma)
-{
- int rc;
- ENTRY;
- rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
- spec, ma);
- RETURN(rc);
-}
-
-/** Call mdo_link() on next layer. All objects are local. */
-static int cml_link(const struct lu_env *env, struct md_object *mo_p,
- struct md_object *mo_s, const struct lu_name *lname,
- struct md_attr *ma)
-{
- int rc;
- ENTRY;
- rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
- lname, ma);
- RETURN(rc);
-}
-
-/** Call mdo_unlink() on next layer. All objects are local. */
-static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
- struct md_object *mo_c, const struct lu_name *lname,
- struct md_attr *ma)
-{
- int rc;
- ENTRY;
- rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
- lname, ma);
- RETURN(rc);
-}
-
-/** Call mdo_lum_lmm_cmp() on next layer */
-static int cml_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
- const struct md_op_spec *spec, struct md_attr *ma)
-{
- int rc;
- ENTRY;
-
- rc = mdo_lum_lmm_cmp(env, md_object_next(mo_c), spec, ma);
- RETURN(rc);
-}
-
-/**
- * \ingroup cmm
- * Get mode of object.
- * Used in both cml and cmr hence can produce RPC to another server.
- */
-static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
- const struct lu_fid *lf, struct md_attr *ma,
- int *remote)
-{
- struct md_object *mo_s = md_object_find_slice(env, md, lf);
- struct cmm_thread_info *cmi;
- struct md_attr *tmp_ma;
- int rc;
- ENTRY;
-
- if (IS_ERR(mo_s))
- RETURN(PTR_ERR(mo_s));
-
- if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
- *remote = 1;
-
- cmi = cmm_env_info(env);
- tmp_ma = &cmi->cmi_ma;
- tmp_ma->ma_need = MA_INODE;
- tmp_ma->ma_valid = 0;
- /* get type from src, can be remote req */
- rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
- if (rc == 0) {
- ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
- ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
- ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
- ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
- ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
- }
- lu_object_put(env, &mo_s->mo_lu);
- RETURN(rc);
-}
-
-/**
- * \ingroup cmm
- * Set ctime for object.
- * Used in both cml and cmr hence can produce RPC to another server.
- */
-static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
- const struct lu_fid *lf, struct md_attr *ma)
-{
- struct md_object *mo_s = md_object_find_slice(env, md, lf);
- int rc;
- ENTRY;
-
- if (IS_ERR(mo_s))
- RETURN(PTR_ERR(mo_s));
-
- LASSERT(ma->ma_attr.la_valid & LA_CTIME);
- /* set ctime to obj, can be remote req */
- rc = mo_attr_set(env, md_object_next(mo_s), ma);
- lu_object_put(env, &mo_s->mo_lu);
- RETURN(rc);
-}
-
-/** Helper to output debug information about rename operation. */
-static inline void cml_rename_warn(const char *fname,
- struct md_object *mo_po,
- struct md_object *mo_pn,
- const struct lu_fid *lf,
- const char *s_name,
- struct md_object *mo_t,
- const char *t_name,
- int err)
-{
- if (mo_t)
- CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
- "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
- "[tname %s] [err %d]\n", fname,
- PFID(lu_object_fid(&mo_po->mo_lu)),
- PFID(lu_object_fid(&mo_pn->mo_lu)),
- PFID(lf), s_name,
- PFID(lu_object_fid(&mo_t->mo_lu)),
- t_name, err);
- else
- CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
- "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
- "[tname %s] [err %d]\n", fname,
- PFID(lu_object_fid(&mo_po->mo_lu)),
- PFID(lu_object_fid(&mo_pn->mo_lu)),
- PFID(lf), s_name,
- t_name, err);
-}
-
-/**
- * Rename operation for cml.
- *
- * This is the most complex cross-reference operation. It may consist of up to 4
- * MDS server and require several RPCs to be sent.
- *
- * \param mo_po Old parent object.
- * \param mo_pn New parent object.
- * \param lf FID of object to rename.
- * \param ls_name Source file name.
- * \param mo_t target object. Should be NULL here.
- * \param lt_name Name of target file.
- * \param ma object attributes.
- */
-static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
- struct md_object *mo_pn, const struct lu_fid *lf,
- const struct lu_name *ls_name, struct md_object *mo_t,
- const struct lu_name *lt_name, struct md_attr *ma)
-{
- struct cmm_thread_info *cmi;
- struct md_attr *tmp_ma = NULL;
- struct md_object *tmp_t = mo_t;
- int remote = 0, rc;
- ENTRY;
-
- rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
- if (rc)
- RETURN(rc);
-
- if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
- /**
- * \note \a mo_t is remote object and there is RPC to unlink it.
- * Before that, do local sanity check for rename first.
- */
- if (!remote) {
- struct md_object *mo_s = md_object_find_slice(env,
- md_obj2dev(mo_po), lf);
- if (IS_ERR(mo_s))
- RETURN(PTR_ERR(mo_s));
-
- LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
- rc = mo_permission(env, md_object_next(mo_po),
- md_object_next(mo_s),
- ma, MAY_RENAME_SRC);
- lu_object_put(env, &mo_s->mo_lu);
- if (rc)
- RETURN(rc);
- } else {
- rc = mo_permission(env, NULL, md_object_next(mo_po),
- ma, MAY_UNLINK | MAY_VTX_FULL);
- if (rc)
- RETURN(rc);
- }
-
- rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
- MAY_UNLINK | MAY_VTX_PART);
- if (rc)
- RETURN(rc);
-
- /*
- * /note \a ma will be changed after mo_ref_del(), but we will use
- * it for mdo_rename() later, so save it before mo_ref_del().
- */
- cmi = cmm_env_info(env);
- tmp_ma = &cmi->cmi_ma;
- *tmp_ma = *ma;
- rc = mo_ref_del(env, md_object_next(mo_t), ma);
- if (rc)
- RETURN(rc);
-
- tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
- mo_t = NULL;
- }
-
- /**
- * \note for src on remote MDS case, change its ctime before local
- * rename. Firstly, do local sanity check for rename if necessary.
- */
- if (remote) {
- if (!tmp_ma) {
- rc = mo_permission(env, NULL, md_object_next(mo_po),
- ma, MAY_UNLINK | MAY_VTX_FULL);
- if (rc)
- RETURN(rc);
-
- if (mo_t) {
- LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
- rc = mo_permission(env, md_object_next(mo_pn),
- md_object_next(mo_t),
- ma, MAY_RENAME_TAR);
- if (rc)
- RETURN(rc);
- } else {
- int mask;
-
- if (mo_po != mo_pn)
- mask = (S_ISDIR(ma->ma_attr.la_mode) ?
- MAY_LINK : MAY_CREATE);
- else
- mask = MAY_CREATE;
- rc = mo_permission(env, NULL,
- md_object_next(mo_pn),
- NULL, mask);
- if (rc)
- RETURN(rc);
- }
-
- ma->ma_attr_flags |= MDS_PERM_BYPASS;
- } else {
- LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
- }
-
- rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
- tmp_ma ? tmp_ma : ma);
- if (rc) {
- /* TODO: revoke mo_t if necessary. */
- cml_rename_warn("cmm_rename_ctime", mo_po,
- mo_pn, lf, ls_name->ln_name,
- tmp_t, lt_name->ln_name, rc);
- RETURN(rc);
- }
- }
-
- /* local rename, mo_t can be NULL */
- rc = mdo_rename(env, md_object_next(mo_po),
- md_object_next(mo_pn), lf, ls_name,
- md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
- if (rc)
- /* TODO: revoke all cml_rename */
- cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
- ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
-
- RETURN(rc);
-}
-
-/**
- * Rename target partial operation.
- * Used for cross-ref rename.
- */
-static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
- struct md_object *mo_t, const struct lu_fid *lf,
- const struct lu_name *lname, struct md_attr *ma)
-{
- int rc;
- ENTRY;
-
- rc = mdo_rename_tgt(env, md_object_next(mo_p),
- md_object_next(mo_t), lf, lname, ma);
- RETURN(rc);
-}
-
-/**
- * Name insert only operation.
- * used only in case of rename_tgt() when target doesn't exist.
- */
-static int cml_name_insert(const struct lu_env *env, struct md_object *p,
- const struct lu_name *lname, const struct lu_fid *lf,
- const struct md_attr *ma)
-{
- int rc;
- ENTRY;
-
- rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
-
- RETURN(rc);
-}
-
-/**
- * \ingroup cmm
- * Check two fids are not subdirectories.
- */
-static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
- const struct lu_fid *fid, struct lu_fid *sfid)
-{
- struct cmm_thread_info *cmi;
- int rc;
- ENTRY;
-
- cmi = cmm_env_info(env);
- rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
- if (rc)
- RETURN(rc);
-
- if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
- RETURN(0);
-
- rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
- RETURN(rc);
-}
-
-static const struct md_dir_operations cml_dir_ops = {
- .mdo_is_subdir = cmm_is_subdir,
- .mdo_lookup = cml_lookup,
- .mdo_lock_mode = cml_lock_mode,
- .mdo_create = cml_create,
- .mdo_link = cml_link,
- .mdo_unlink = cml_unlink,
- .mdo_lum_lmm_cmp = cml_lum_lmm_cmp,
- .mdo_name_insert = cml_name_insert,
- .mdo_rename = cml_rename,
- .mdo_rename_tgt = cml_rename_tgt,
- .mdo_create_data = cml_create_data,
-};
-/** @} */
-/** @} */
-
-/**
- * \addtogroup cmr
- * @{
- */
-/**
- * \name cmr helpers
- * @{
- */
-/** Get cmr_object from lu_object. */
-static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
-{
- return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
-}
-/** Get cmr_object from md_object. */
-static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
-{
- return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
-}
-/** Get cmr_object from cmm_object. */
-static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
-{
- return container_of0(co, struct cmr_object, cmm_obj);
-}
-/** @} */
-
-/**
- * Get proper child device from MDCs.
- */
-static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
-{
- struct lu_device *next = NULL;
- struct mdc_device *mdc;
-
- cfs_spin_lock(&d->cmm_tgt_guard);
- cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
- if (mdc->mc_num == num) {
- next = mdc2lu_dev(mdc);
- break;
- }
- }
- cfs_spin_unlock(&d->cmm_tgt_guard);
- return next;
-}
-
-/**
- * Free cmr_object.
- */
-static void cmr_object_free(const struct lu_env *env,
- struct lu_object *lo)
-{
- struct cmr_object *cro = lu2cmr_obj(lo);
- lu_object_fini(lo);
- OBD_FREE_PTR(cro);
-}
-
-/**
- * Initialize cmr object.
- */
-static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
- const struct lu_object_conf *unused)
-{
- struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
- struct lu_device *c_dev;
- struct lu_object *c_obj;
- int rc;
-
- ENTRY;
-
- c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
- if (c_dev == NULL) {
- rc = -ENOENT;
- } else {
- c_obj = c_dev->ld_ops->ldo_object_alloc(env,
- lo->lo_header, c_dev);
- if (c_obj != NULL) {
- lu_object_add(lo, c_obj);
- rc = 0;
- } else {
- rc = -ENOMEM;
- }
- }
-
- RETURN(rc);
-}
-
-/**
- * Output lu_object data.
- */
-static int cmr_object_print(const struct lu_env *env, void *cookie,
- lu_printer_t p, const struct lu_object *lo)
-{
- const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
- return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
-}
-
-/**
- * Cmr instance of lu_object_operations.
- */
-static const struct lu_object_operations cmr_obj_ops = {
- .loo_object_init = cmr_object_init,
- .loo_object_free = cmr_object_free,
- .loo_object_print = cmr_object_print
-};
-
-/**
- * \name cmr remote md_object operations.
- * All operations here are invalid and return errors. There is no local object
- * so these operations return two kinds of error:
- * -# -EFAULT if operation is prohibited.
- * -# -EREMOTE if operation can be done just to notify upper level about remote
- * object.
- *
- * @{
- */
-static int cmr_object_create(const struct lu_env *env,
- struct md_object *mo,
- const struct md_op_spec *spec,
- struct md_attr *ma)
-{
- return -EFAULT;
-}
-
-static int cmr_permission(const struct lu_env *env,
- struct md_object *p, struct md_object *c,
- struct md_attr *attr, int mask)
-{
- return -EREMOTE;
-}
-
-static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
- struct md_attr *attr)
-{
- return -EREMOTE;
-}
-
-static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
- const struct md_attr *attr)
-{
- return -EFAULT;
-}
-
-static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
- struct lu_buf *buf, const char *name)
-{
- return -EFAULT;
-}
-
-static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
- struct lu_buf *buf)
-{
- return -EFAULT;
-}
-
-static int cmr_changelog(const struct lu_env *env, enum changelog_rec_type type,
- int flags, struct md_object *mo)
-{
- return -EFAULT;
-}
-
-static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
- struct lu_buf *buf)
-{
- return -EFAULT;
-}
-
-static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
- const struct lu_buf *buf, const char *name,
- int fl)
-{
- return -EFAULT;
-}
-
-static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
- const char *name)
-{
- return -EFAULT;
-}
-
-static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
- const struct md_attr *ma)
-{
- return -EFAULT;
-}
-
-static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma)
-{
- return -EFAULT;
-}
-
-static int cmr_open(const struct lu_env *env, struct md_object *mo,
- int flags)
-{
- return -EREMOTE;
-}
-
-static int cmr_close(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma, int mode)
-{
- return -EFAULT;
-}
-
-static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
- const struct lu_rdpg *rdpg)
-{
- return -EREMOTE;
-}
-
-static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
- struct lustre_capa *capa, int renewal)
-{
- return -EFAULT;
-}
-
-static int cmr_path(const struct lu_env *env, struct md_object *obj,
- char *path, int pathlen, __u64 *recno, int *linkno)
-{
- return -EREMOTE;
-}
-
-static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
-{
- return -EFAULT;
-}
-
-static int cmr_file_lock(const struct lu_env *env, struct md_object *mo,
- struct lov_mds_md *lmm, struct ldlm_extent *extent,
- struct lustre_handle *lockh)
-{
- return -EREMOTE;
-}
-
-static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo,
- struct lov_mds_md *lmm, struct lustre_handle *lockh)
-{
- return -EREMOTE;
-}
-
-static int cmr_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
- const struct md_op_spec *spec, struct md_attr *ma)
-{
- return -EREMOTE;
-}
-
-/** Set of md_object_operations for cmr. */
-static const struct md_object_operations cmr_mo_ops = {
- .moo_permission = cmr_permission,
- .moo_attr_get = cmr_attr_get,
- .moo_attr_set = cmr_attr_set,
- .moo_xattr_get = cmr_xattr_get,
- .moo_xattr_set = cmr_xattr_set,
- .moo_xattr_list = cmr_xattr_list,
- .moo_xattr_del = cmr_xattr_del,
- .moo_object_create = cmr_object_create,
- .moo_ref_add = cmr_ref_add,
- .moo_ref_del = cmr_ref_del,
- .moo_open = cmr_open,
- .moo_close = cmr_close,
- .moo_readpage = cmr_readpage,
- .moo_readlink = cmr_readlink,
- .moo_changelog = cmr_changelog,
- .moo_capa_get = cmr_capa_get,
- .moo_object_sync = cmr_object_sync,
- .moo_path = cmr_path,
- .moo_file_lock = cmr_file_lock,
- .moo_file_unlock = cmr_file_unlock,
-};
-/** @} */
-
-/**
- * \name cmr md_dir operations.
- *
- * All methods below are cross-ref by nature. They consist of remote call and
- * local operation. Due to future rollback functionality there are several
- * limitations for such methods:
- * -# remote call should be done at first to do epoch negotiation between all
- * MDS involved and to avoid the RPC inside transaction.
- * -# only one RPC can be sent - also due to epoch negotiation.
- * For more details see rollback HLD/DLD.
- * @{
- */
-static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
- const struct lu_name *lname, struct lu_fid *lf,
- struct md_op_spec *spec)
-{
- /*
- * This can happens while rename() If new parent is remote dir, lookup
- * will happen here.
- */
-
- return -EREMOTE;
-}
-
-/** Return lock mode. */
-static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
- struct md_object *mo, mdl_mode_t lm)
-{
- return MDL_MINMODE;
-}
-
-/**
- * Create operation for cmr.
- * Remote object creation and local name insert.
- *
- * \param mo_p Parent directory. Local object.
- * \param lchild_name name of file to create.
- * \param mo_c Child object. It has no real inode yet.
- * \param spec creation specification.
- * \param ma child object attributes.
- */
-static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
- const struct lu_name *lchild_name, struct md_object *mo_c,
- struct md_op_spec *spec,
- struct md_attr *ma)
-{
- struct cmm_thread_info *cmi;
- struct md_attr *tmp_ma;
- int rc;
- ENTRY;
-
- /* Make sure that name isn't exist before doing remote call. */
- rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
- &cmm_env_info(env)->cmi_fid, NULL);
- if (rc == 0)
- RETURN(-EEXIST);
- else if (rc != -ENOENT)
- RETURN(rc);
-
- /* check the SGID attr */
- cmi = cmm_env_info(env);
- LASSERT(cmi);
- tmp_ma = &cmi->cmi_ma;
- tmp_ma->ma_valid = 0;
- tmp_ma->ma_need = MA_INODE;
-
-#ifdef CONFIG_FS_POSIX_ACL
- if (!S_ISLNK(ma->ma_attr.la_mode)) {
- tmp_ma->ma_acl = cmi->cmi_xattr_buf;
- tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
- tmp_ma->ma_need |= MA_ACL_DEF;
- }
-#endif
- rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
- if (rc)
- RETURN(rc);
-
- if (tmp_ma->ma_attr.la_mode & S_ISGID) {
- ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
- if (S_ISDIR(ma->ma_attr.la_mode)) {
- ma->ma_attr.la_mode |= S_ISGID;
- ma->ma_attr.la_valid |= LA_MODE;
- }
- }
-
-#ifdef CONFIG_FS_POSIX_ACL
- if (tmp_ma->ma_valid & MA_ACL_DEF) {
- spec->u.sp_ea.fid = spec->u.sp_pfid;
- spec->u.sp_ea.eadata = tmp_ma->ma_acl;
- spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
- spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
- }
-#endif
-
- /* Local permission check for name_insert before remote ops. */
- rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
- (S_ISDIR(ma->ma_attr.la_mode) ?
- MAY_LINK : MAY_CREATE));
- if (rc)
- RETURN(rc);
-
- /**
- * \note \a ma will be changed after mo_object_create(), but we will use
- * it for mdo_name_insert() later, so save it before mo_object_create().
- */
- *tmp_ma = *ma;
- rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
- if (rc == 0) {
- tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
- rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
- lu_object_fid(&mo_c->mo_lu), tmp_ma);
- if (unlikely(rc)) {
- /* TODO: remove object mo_c on remote MDS */
- CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
- " [name %s] [mo_c "DFID"] [err %d]\n",
- PFID(lu_object_fid(&mo_p->mo_lu)),
- lchild_name->ln_name,
- PFID(lu_object_fid(&mo_c->mo_lu)), rc);
- }
- }
-
- RETURN(rc);
-}
-
-/**
- * Link operations for cmr.
- *
- * The link RPC is always issued to the server where source parent is living.
- * The first operation to do is object nlink increment on remote server.
- * Second one is local mdo_name_insert().
- *
- * \param mo_p parent directory. It is local.
- * \param mo_s source object to link. It is remote.
- * \param lname Name of link file.
- * \param ma object attributes.
- */
-static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
- struct md_object *mo_s, const struct lu_name *lname,
- struct md_attr *ma)
-{
- int rc;
- ENTRY;
-
- /* Make sure that name isn't exist before doing remote call. */
- rc = mdo_lookup(env, md_object_next(mo_p), lname,
- &cmm_env_info(env)->cmi_fid, NULL);
- if (rc == 0) {
- rc = -EEXIST;
- } else if (rc == -ENOENT) {
- /* Local permission check for name_insert before remote ops. */
- rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
- MAY_CREATE);
- if (rc)
- RETURN(rc);
-
- rc = mo_ref_add(env, md_object_next(mo_s), ma);
- if (rc == 0) {
- ma->ma_attr_flags |= MDS_PERM_BYPASS;
- rc = mdo_name_insert(env, md_object_next(mo_p), lname,
- lu_object_fid(&mo_s->mo_lu), ma);
- if (unlikely(rc)) {
- /* TODO: ref_del from mo_s on remote MDS */
- CWARN("cmr_link failed, should revoke: "
- "[mo_p "DFID"] [mo_s "DFID"] "
- "[name %s] [err %d]\n",
- PFID(lu_object_fid(&mo_p->mo_lu)),
- PFID(lu_object_fid(&mo_s->mo_lu)),
- lname->ln_name, rc);
- }
- }
- }
- RETURN(rc);
-}
-
-/**
- * Unlink operations for cmr.
- *
- * The unlink RPC is always issued to the server where parent is living. Hence
- * the first operation to do is object unlink on remote server. Second one is
- * local mdo_name_remove().
- *
- * \param mo_p parent md_object. It is local.
- * \param mo_c child object to be unlinked. It is remote.
- * \param lname Name of file to unlink.
- * \param ma object attributes.
- */
-static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
- struct md_object *mo_c, const struct lu_name *lname,
- struct md_attr *ma)
-{
- struct cmm_thread_info *cmi;
- struct md_attr *tmp_ma;
- int rc;
- ENTRY;
-
- /* Local permission check for name_remove before remote ops. */
- rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
- MAY_UNLINK | MAY_VTX_PART);
- if (rc)
- RETURN(rc);
-
- /*
- * \note \a ma will be changed after mo_ref_del, but we will use
- * it for mdo_name_remove() later, so save it before mo_ref_del().
- */
- cmi = cmm_env_info(env);
- tmp_ma = &cmi->cmi_ma;
- *tmp_ma = *ma;
- rc = mo_ref_del(env, md_object_next(mo_c), ma);
- if (rc == 0) {
- tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
- rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
- if (unlikely(rc)) {
- /* TODO: ref_add to mo_c on remote MDS */
- CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
- " [mo_c "DFID"] [name %s] [err %d]\n",
- PFID(lu_object_fid(&mo_p->mo_lu)),
- PFID(lu_object_fid(&mo_c->mo_lu)),
- lname->ln_name, rc);
- }
- }
-
- RETURN(rc);
-}
-
-/** Helper which outputs error message during cmr_rename() */
-static inline void cmr_rename_warn(const char *fname,
- struct md_object *mo_po,
- struct md_object *mo_pn,
- const struct lu_fid *lf,
- const char *s_name,
- const char *t_name,
- int err)
-{
- CWARN("cmr_rename failed for %s, should revoke: "
- "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
- "[sname %s] [tname %s] [err %d]\n", fname,
- PFID(lu_object_fid(&mo_po->mo_lu)),
- PFID(lu_object_fid(&mo_pn->mo_lu)),
- PFID(lf), s_name, t_name, err);
-}
-
-/**
- * Rename operation for cmr.
- *
- * This is the most complex cross-reference operation. It may consist of up to 4
- * MDS server and require several RPCs to be sent.
- *
- * \param mo_po Old parent object.
- * \param mo_pn New parent object.
- * \param lf FID of object to rename.
- * \param ls_name Source file name.
- * \param mo_t target object. Should be NULL here.
- * \param lt_name Name of target file.
- * \param ma object attributes.
- */
-static int cmr_rename(const struct lu_env *env,
- struct md_object *mo_po, struct md_object *mo_pn,
- const struct lu_fid *lf, const struct lu_name *ls_name,
- struct md_object *mo_t, const struct lu_name *lt_name,
- struct md_attr *ma)
-{
- struct cmm_thread_info *cmi;
- struct md_attr *tmp_ma;
- int rc;
- ENTRY;
-
- LASSERT(mo_t == NULL);
-
- /* get real type of src */
- rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
- if (rc)
- RETURN(rc);
-
- /* Local permission check for name_remove before remote ops. */
- rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
- MAY_UNLINK | MAY_VTX_FULL);
- if (rc)
- RETURN(rc);
-
- /**
- * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
- * for mdo_name_remove() later, so save it before mdo_rename_tgt.
- */
- cmi = cmm_env_info(env);
- tmp_ma = &cmi->cmi_ma;
- *tmp_ma = *ma;
- /**
- * \note The \a mo_pn is remote directory, so we cannot even know if there is
- * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
- * lookup and process this further.
- */
- rc = mdo_rename_tgt(env, md_object_next(mo_pn),
- NULL/* mo_t */, lf, lt_name, ma);
- if (rc)
- RETURN(rc);
-
- tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
-
- /* src object maybe on remote MDS, do remote ops first. */
- rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
- if (unlikely(rc)) {
- /* TODO: revoke mdo_rename_tgt */
- cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
- ls_name->ln_name, lt_name->ln_name, rc);
- RETURN(rc);
- }
-
- /* only old name is removed localy */
- rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
- if (unlikely(rc))
- /* TODO: revoke all cmr_rename */
- cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
- ls_name->ln_name, lt_name->ln_name, rc);
-
- RETURN(rc);
-}
-
-/**
- * Part of cross-ref rename().
- * Used to insert new name in new parent and unlink target.
- */
-static int cmr_rename_tgt(const struct lu_env *env,
- struct md_object *mo_p, struct md_object *mo_t,
- const struct lu_fid *lf, const struct lu_name *lname,
- struct md_attr *ma)
-{
- struct cmm_thread_info *cmi;
- struct md_attr *tmp_ma;
- int rc;
- ENTRY;
-
- /* target object is remote one */
- /* Local permission check for rename_tgt before remote ops. */
- rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
- MAY_UNLINK | MAY_VTX_PART);
- if (rc)
- RETURN(rc);
-
- /*
- * XXX: @ma maybe changed after mo_ref_del, but we will use
- * it for mdo_rename_tgt later, so save it before mo_ref_del.
- */
- cmi = cmm_env_info(env);
- tmp_ma = &cmi->cmi_ma;
- *tmp_ma = *ma;
- rc = mo_ref_del(env, md_object_next(mo_t), ma);
- /* continue locally with name handling only */
- if (rc == 0) {
- tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
- rc = mdo_rename_tgt(env, md_object_next(mo_p),
- NULL, lf, lname, tmp_ma);
- if (unlikely(rc)) {
- /* TODO: ref_add to mo_t on remote MDS */
- CWARN("cmr_rename_tgt failed, should revoke: "
- "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
- "[name %s] [err %d]\n",
- PFID(lu_object_fid(&mo_p->mo_lu)),
- PFID(lu_object_fid(&mo_t->mo_lu)),
- PFID(lf),
- lname->ln_name, rc);
- }
- }
- RETURN(rc);
-}
-/** @} */
-/**
- * The md_dir_operations for cmr.
- */
-static const struct md_dir_operations cmr_dir_ops = {
- .mdo_is_subdir = cmm_is_subdir,
- .mdo_lookup = cmr_lookup,
- .mdo_lock_mode = cmr_lock_mode,
- .mdo_create = cmr_create,
- .mdo_link = cmr_link,
- .mdo_unlink = cmr_unlink,
- .mdo_lum_lmm_cmp = cmr_lum_lmm_cmp,
- .mdo_rename = cmr_rename,
- .mdo_rename_tgt = cmr_rename_tgt
-};
-/** @} */
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_split.c
- *
- * Lustre splitting dir
- *
- * Author: Alex Thomas <alex@clusterfs.com>
- * Author: Wang Di <wangdi@clusterfs.com>
- * Author: Yury Umanets <umka@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <obd_class.h>
-#include <lustre_fid.h>
-#include <lustre_mds.h>
-#include <lustre/lustre_idl.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-
-/**
- * \addtogroup split
- * @{
- */
-enum {
- CMM_SPLIT_SIZE = 128 * 1024
-};
-
-/**
- * This function checks if passed \a name come to correct server (local MDT).
- *
- * \param mp Parent directory
- * \param name Name to lookup
- * \retval -ERESTART Let client know that dir was split and client needs to
- * chose correct stripe.
- */
-int cmm_split_check(const struct lu_env *env, struct md_object *mp,
- const char *name)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
- struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
- struct cml_object *clo = md2cml_obj(mp);
- int rc, lmv_size;
- ENTRY;
-
- cmm_lprocfs_time_start(env);
-
- /* Not split yet */
- if (clo->clo_split == CMM_SPLIT_NONE ||
- clo->clo_split == CMM_SPLIT_DENIED)
- GOTO(out, rc = 0);
-
- lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
-
- /* Try to get the LMV EA */
- memset(ma, 0, sizeof(*ma));
-
- ma->ma_need = MA_LMV;
- ma->ma_lmv_size = lmv_size;
- OBD_ALLOC(ma->ma_lmv, lmv_size);
- if (ma->ma_lmv == NULL)
- GOTO(out, rc = -ENOMEM);
-
- /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
- rc = mo_attr_get(env, mp, ma);
- if (rc)
- GOTO(cleanup, rc);
-
- /* No LMV just return */
- if (!(ma->ma_valid & MA_LMV)) {
- /* update split state if unknown */
- if (clo->clo_split == CMM_SPLIT_UNKNOWN)
- clo->clo_split = CMM_SPLIT_NONE;
- GOTO(cleanup, rc = 0);
- }
-
- /* Skip checking the slave dirs (mea_count is 0) */
- if (ma->ma_lmv->mea_count != 0) {
- int idx;
-
- /**
- * This gets stripe by name to check the name belongs to master
- * dir, otherwise return the -ERESTART
- */
- idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
-
- /**
- * When client does not know about split, it sends create() to
- * the master MDT and master replay back if directory is split.
- * So client should orward request to correct MDT. This
- * is why we check here if stripe zero or not. Zero stripe means
- * master stripe. If stripe calculated from name is not zero -
- * return -ERESTART.
- */
- if (idx != 0)
- rc = -ERESTART;
-
- /* update split state to DONE if unknown */
- if (clo->clo_split == CMM_SPLIT_UNKNOWN)
- clo->clo_split = CMM_SPLIT_DONE;
- } else {
- /* split is denied for slave dir */
- clo->clo_split = CMM_SPLIT_DENIED;
- }
- EXIT;
-cleanup:
- OBD_FREE(ma->ma_lmv, lmv_size);
-out:
- cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
- return rc;
-}
-
-/**
- * Return preferable access mode to the caller taking into account the split
- * case and the fact of existing not splittable dirs.
- */
-int cmm_split_access(const struct lu_env *env, struct md_object *mo,
- mdl_mode_t lm)
-{
- struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
- int rc, split;
- ENTRY;
-
- memset(ma, 0, sizeof(*ma));
-
- /*
- * Check only if we need protection from split. If not - mdt handles
- * other cases.
- */
- rc = cmm_split_expect(env, mo, ma, &split);
- if (rc) {
- CERROR("Can't check for possible split, rc %d\n", rc);
- RETURN(MDL_MINMODE);
- }
-
- /*
- * Do not take PDO lock on non-splittable objects if this is not PW,
- * this should speed things up a bit.
- */
- if (split == CMM_SPLIT_DONE && lm != MDL_PW)
- RETURN(MDL_NL);
-
- /* Protect splitting by exclusive lock. */
- if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
- RETURN(MDL_EX);
-
- /*
- * Have no idea about lock mode, let it be what higher layer wants.
- */
- RETURN(MDL_MINMODE);
-}
-
-/**
- * Check if split is expected for current thread.
- *
- * \param mo Directory to split.
- * \param ma md attributes.
- * \param split Flag to save split information.
- */
-int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma, int *split)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct cml_object *clo = md2cml_obj(mo);
- struct lu_fid root_fid;
- int rc;
- ENTRY;
-
- if (clo->clo_split == CMM_SPLIT_DONE ||
- clo->clo_split == CMM_SPLIT_DENIED) {
- *split = clo->clo_split;
- RETURN(0);
- }
- /* CMM_SPLIT_UNKNOWN case below */
-
- /* No need to split root object. */
- rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
- &root_fid);
- if (rc)
- RETURN(rc);
-
- if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
- /* update split state */
- *split = clo->clo_split == CMM_SPLIT_DENIED;
- RETURN(0);
- }
-
- /*
- * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
- * for this get_attr.
- */
- LASSERT(ma->ma_valid == 0);
- ma->ma_need = MA_INODE | MA_LMV;
- rc = mo_attr_get(env, mo, ma);
- if (rc)
- RETURN(rc);
-
- /* No need split for already split object */
- if (ma->ma_valid & MA_LMV) {
- LASSERT(ma->ma_lmv_size > 0);
- *split = clo->clo_split = CMM_SPLIT_DONE;
- RETURN(0);
- }
-
- /* No need split for object whose size < CMM_SPLIT_SIZE */
- if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
- *split = clo->clo_split = CMM_SPLIT_NONE;
- RETURN(0);
- }
-
- *split = clo->clo_split = CMM_SPLIT_NEEDED;
- RETURN(0);
-}
-
-struct cmm_object *cmm_object_find(const struct lu_env *env,
- struct cmm_device *d,
- const struct lu_fid *f)
-{
- return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
-}
-
-static inline void cmm_object_put(const struct lu_env *env,
- struct cmm_object *o)
-{
- lu_object_put(env, &o->cmo_obj.mo_lu);
-}
-
-/**
- * Allocate new FID on passed \a mc for slave object which is going to
- * create there soon.
- */
-static int cmm_split_fid_alloc(const struct lu_env *env,
- struct cmm_device *cmm,
- struct mdc_device *mc,
- struct lu_fid *fid)
-{
- int rc;
- ENTRY;
-
- LASSERT(cmm != NULL && mc != NULL && fid != NULL);
-
- cfs_down(&mc->mc_fid_sem);
-
- /* Alloc new fid on \a mc. */
- rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
- if (rc > 0)
- rc = 0;
- cfs_up(&mc->mc_fid_sem);
-
- RETURN(rc);
-}
-
-/**
- * Allocate new slave object on passed \a mc.
- */
-static int cmm_split_slave_create(const struct lu_env *env,
- struct cmm_device *cmm,
- struct mdc_device *mc,
- struct lu_fid *fid,
- struct md_attr *ma,
- struct lmv_stripe_md *lmv,
- int lmv_size)
-{
- struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
- struct cmm_object *obj;
- int rc;
- ENTRY;
-
- /* Allocate new fid and store it to @fid */
- rc = cmm_split_fid_alloc(env, cmm, mc, fid);
- if (rc) {
- CERROR("Can't alloc new fid on "LPU64
- ", rc %d\n", mc->mc_num, rc);
- RETURN(rc);
- }
-
- /* Allocate new object on @mc */
- obj = cmm_object_find(env, cmm, fid);
- if (IS_ERR(obj))
- RETURN(PTR_ERR(obj));
-
- memset(spec, 0, sizeof *spec);
- spec->u.sp_ea.fid = fid;
- spec->u.sp_ea.eadata = lmv;
- spec->u.sp_ea.eadatalen = lmv_size;
- spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
- rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
- spec, ma);
- cmm_object_put(env, obj);
- RETURN(rc);
-}
-
-/**
- * Create so many slaves as number of stripes.
- * This is called in split time before sending pages to slaves.
- */
-static int cmm_split_slaves_create(const struct lu_env *env,
- struct md_object *mo,
- struct md_attr *ma)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
- struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
- struct mdc_device *mc, *tmp;
- struct lmv_stripe_md *lmv;
- int i = 1, rc = 0;
- ENTRY;
-
- /* Init the split MEA */
- lmv = ma->ma_lmv;
- lmv->mea_master = cmm->cmm_local_num;
- lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
- lmv->mea_count = cmm->cmm_tgt_count + 1;
-
- /*
- * Store master FID to local node idx number. Local node is always
- * master and its stripe number if 0.
- */
- lmv->mea_ids[0] = *lf;
-
- memset(slave_lmv, 0, sizeof *slave_lmv);
- slave_lmv->mea_master = cmm->cmm_local_num;
- slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
- slave_lmv->mea_count = 0;
-
- cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
- rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
- ma, slave_lmv, sizeof(*slave_lmv));
- if (rc)
- GOTO(cleanup, rc);
- i++;
- }
- EXIT;
-cleanup:
- return rc;
-}
-
-static inline int cmm_split_special_entry(struct lu_dirent *ent)
-{
- if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
- !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
- return 1;
- return 0;
-}
-
-/**
- * Convert string to the lu_name structure.
- */
-static inline struct lu_name *cmm_name(const struct lu_env *env,
- char *name, int buflen)
-{
- struct lu_name *lname;
- struct cmm_thread_info *cmi;
-
- LASSERT(buflen > 0);
- LASSERT(name[buflen - 1] == '\0');
-
- cmi = cmm_env_info(env);
- lname = &cmi->cti_name;
- lname->ln_name = name;
- /* do NOT count the terminating '\0' of name for length */
- lname->ln_namelen = buflen - 1;
- return lname;
-}
-
-/**
- * Helper for cmm_split_remove_page(). It removes one entry from local MDT.
- * Do not corrupt byte order in page, it will be sent to remote MDT.
- */
-static int cmm_split_remove_entry(const struct lu_env *env,
- struct md_object *mo,
- struct lu_dirent *ent)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct cmm_thread_info *cmi;
- struct md_attr *ma;
- struct cmm_object *obj;
- int is_dir, rc;
- char *name;
- struct lu_name *lname;
- ENTRY;
-
- if (cmm_split_special_entry(ent))
- RETURN(0);
-
- fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
- obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
- if (IS_ERR(obj))
- RETURN(PTR_ERR(obj));
-
- cmi = cmm_env_info(env);
- ma = &cmi->cmi_ma;
-
- if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
- is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
- else
- /**
- * \note These days only cross-ref dirs are possible, so for the
- * sake of simplicity, in split, we suppose that all cross-ref
- * names point to directory and do not do additional getattr to
- * remote MDT.
- */
- is_dir = 1;
-
- OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
- if (!name)
- GOTO(cleanup, rc = -ENOMEM);
-
- memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
- lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
- /**
- * \note When split, no need update parent's ctime,
- * and no permission check for name_remove.
- */
- ma->ma_attr.la_ctime = 0;
- if (is_dir)
- ma->ma_attr.la_mode = S_IFDIR;
- else
- ma->ma_attr.la_mode = 0;
- ma->ma_attr.la_valid = LA_MODE;
- ma->ma_valid = MA_INODE;
-
- ma->ma_attr_flags |= MDS_PERM_BYPASS;
- rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
- OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
- if (rc)
- GOTO(cleanup, rc);
-
- /**
- * \note For each entry transferred to the slave MDS we should know
- * whether this object is dir or not. Therefore the highest bit of the
- * hash is used to indicate that (it is unused for hash purposes anyway).
- */
- if (is_dir) {
- ent->lde_hash = le64_to_cpu(ent->lde_hash);
- ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
- }
- EXIT;
-cleanup:
- cmm_object_put(env, obj);
- return rc;
-}
-
-/**
- * Remove all entries from passed page.
- * These entries are going to remote MDT and thus should be removed locally.
- */
-static int cmm_split_remove_page(const struct lu_env *env,
- struct md_object *mo,
- struct lu_rdpg *rdpg,
- __u64 hash_end, __u32 *len)
-{
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc = 0;
- ENTRY;
-
- *len = 0;
- cfs_kmap(rdpg->rp_pages[0]);
- dp = page_address(rdpg->rp_pages[0]);
- for (ent = lu_dirent_start(dp);
- ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
- ent = lu_dirent_next(ent)) {
- rc = cmm_split_remove_entry(env, mo, ent);
- if (rc) {
- /*
- * XXX: Error handler to insert remove name back,
- * currently we assumed it will success anyway in
- * verfication test.
- */
- CERROR("Can not del %*.*s, rc %d\n",
- le16_to_cpu(ent->lde_namelen),
- le16_to_cpu(ent->lde_namelen),
- ent->lde_name, rc);
- GOTO(unmap, rc);
- }
- *len += lu_dirent_size(ent);
- }
-
- if (ent != lu_dirent_start(dp))
- *len += sizeof(struct lu_dirpage);
- EXIT;
-unmap:
- cfs_kunmap(rdpg->rp_pages[0]);
- return rc;
-}
-
-/**
- * Send one page of entries to the slave MDT.
- * This page contains entries to be created there.
- */
-static int cmm_split_send_page(const struct lu_env *env,
- struct md_object *mo,
- struct lu_rdpg *rdpg,
- struct lu_fid *fid, int len)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct cmm_object *obj;
- int rc = 0;
- ENTRY;
-
- obj = cmm_object_find(env, cmm, fid);
- if (IS_ERR(obj))
- RETURN(PTR_ERR(obj));
-
- rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
- rdpg->rp_pages[0], len);
- cmm_object_put(env, obj);
- RETURN(rc);
-}
-
-/** Read one page of entries from local MDT. */
-static int cmm_split_read_page(const struct lu_env *env,
- struct md_object *mo,
- struct lu_rdpg *rdpg)
-{
- int rc;
- ENTRY;
- memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
- cfs_kunmap(rdpg->rp_pages[0]);
- rc = mo_readpage(env, md_object_next(mo), rdpg);
- RETURN(rc);
-}
-
-/**
- * This function performs migration of each directory stripe to its MDS.
- */
-static int cmm_split_process_stripe(const struct lu_env *env,
- struct md_object *mo,
- struct lu_rdpg *rdpg,
- struct lu_fid *lf,
- __u64 end)
-{
- int rc, done = 0;
- ENTRY;
-
- LASSERT(rdpg->rp_npages == 1);
- do {
- struct lu_dirpage *ldp;
- __u32 len = 0;
-
- /** - Read one page of entries from local MDT. */
- rc = cmm_split_read_page(env, mo, rdpg);
- if (rc) {
- CERROR("Error in readpage: %d\n", rc);
- RETURN(rc);
- }
-
- /** - Remove local entries which are going to remite MDT. */
- rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
- if (rc) {
- CERROR("Error in remove stripe entries: %d\n", rc);
- RETURN(rc);
- }
-
- /**
- * - Send entries page to slave MDT and repeat while there are
- * more pages.
- */
- if (len > 0) {
- rc = cmm_split_send_page(env, mo, rdpg, lf, len);
- if (rc) {
- CERROR("Error in sending page: %d\n", rc);
- RETURN(rc);
- }
- }
-
- cfs_kmap(rdpg->rp_pages[0]);
- ldp = page_address(rdpg->rp_pages[0]);
- if (le64_to_cpu(ldp->ldp_hash_end) >= end)
- done = 1;
-
- rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
- cfs_kunmap(rdpg->rp_pages[0]);
- } while (!done);
-
- RETURN(rc);
-}
-
-/**
- * Directory scanner for split operation.
- *
- * It calculates hashes for names and organizes files to stripes.
- */
-static int cmm_split_process_dir(const struct lu_env *env,
- struct md_object *mo,
- struct md_attr *ma)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
- __u64 hash_segment;
- int rc = 0, i;
- ENTRY;
-
- memset(rdpg, 0, sizeof *rdpg);
- rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
- rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
- rdpg->rp_pages = cmm_env_info(env)->cmi_pages;
-
- for (i = 0; i < rdpg->rp_npages; i++) {
- rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
- if (rdpg->rp_pages[i] == NULL)
- GOTO(cleanup, rc = -ENOMEM);
- }
-
- hash_segment = MAX_HASH_SIZE;
- /** Whole hash range is divided on segments by number of MDS-es. */
- do_div(hash_segment, cmm->cmm_tgt_count + 1);
- /**
- * For each segment the cmm_split_process_stripe() is called to move
- * entries on new server.
- */
- for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
- struct lu_fid *lf;
- __u64 hash_end;
-
- lf = &ma->ma_lmv->mea_ids[i];
-
- rdpg->rp_hash = i * hash_segment;
- if (i == cmm->cmm_tgt_count)
- hash_end = MAX_HASH_SIZE;
- else
- hash_end = rdpg->rp_hash + hash_segment;
- rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
- if (rc) {
- CERROR("Error (rc = %d) while splitting for %d: fid="
- DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
- rdpg->rp_hash, hash_end);
- GOTO(cleanup, rc);
- }
- }
- EXIT;
-cleanup:
- for (i = 0; i < rdpg->rp_npages; i++)
- if (rdpg->rp_pages[i] != NULL)
- cfs_free_page(rdpg->rp_pages[i]);
- return rc;
-}
-
-/**
- * Directory splitting.
- *
- * Big directory can be split eventually.
- */
-int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
-{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
- int rc = 0, split;
- struct lu_buf *buf;
- ENTRY;
-
- cmm_lprocfs_time_start(env);
-
- LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
- memset(ma, 0, sizeof(*ma));
-
- /** - Step1: Checking whether the dir needs to be split. */
- rc = cmm_split_expect(env, mo, ma, &split);
- if (rc)
- GOTO(out, rc);
-
- if (split != CMM_SPLIT_NEEDED) {
- /* No split is needed, caller may proceed with create. */
- GOTO(out, rc = 0);
- }
-
- /* Split should be done now, let's do it. */
- CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
- PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
-
- /**
- * /note Disable transactions for split, since there will be so many trans in
- * this one ops, conflict with current recovery design.
- */
- rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS, NULL);
- if (rc) {
- CERROR("Can't disable trans for split, rc %d\n", rc);
- GOTO(out, rc);
- }
-
- /** - Step2: Prepare the md memory */
- ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
- OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
- if (ma->ma_lmv == NULL)
- GOTO(out, rc = -ENOMEM);
-
- /** - Step3: Create slave objects and fill the ma->ma_lmv */
- rc = cmm_split_slaves_create(env, mo, ma);
- if (rc) {
- CERROR("Can't create slaves for split, rc %d\n", rc);
- GOTO(cleanup, rc);
- }
-
- /** - Step4: Scan and split the object. */
- rc = cmm_split_process_dir(env, mo, ma);
- if (rc) {
- CERROR("Can't scan and split, rc %d\n", rc);
- GOTO(cleanup, rc);
- }
-
- /** - Step5: Set mea to the master object. */
- buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
- rc = mo_xattr_set(env, md_object_next(mo), buf,
- MDS_LMV_MD_NAME, 0);
- if (rc) {
- CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
- GOTO(cleanup, rc);
- }
-
- /* set flag in cmm_object */
- md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
-
- /**
- * - Finally, split succeed, tell client to repeat opetartion on correct
- * MDT.
- */
- CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
- rc = -ERESTART;
- EXIT;
-cleanup:
- OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
-out:
- cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
- return rc;
-}
-/** @} */
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, Whamcloud, Inc.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/mdc_device.c
- *
- * Lustre Metadata Client (mdc)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <obd.h>
-#include <obd_class.h>
-#include <lprocfs_status.h>
-#include <lustre_ver.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-
-static const struct lu_device_operations mdc_lu_ops;
-/**
- * \addtogroup cmm_mdc
- * @{
- */
-/**
- * The md_device_operation for mdc. It is empty.
- */
-static const struct md_device_operations mdc_md_ops = { 0 };
-
-/**
- * Upcall handler in mdc. Analog of obd_device::o_notify().
- */
-static int mdc_obd_update(struct obd_device *host,
- struct obd_device *watched,
- enum obd_notify_event ev, void *owner, void *data)
-{
- struct mdc_device *mc = owner;
- int rc = 0;
- ENTRY;
-
- LASSERT(mc != NULL);
- CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
- if (ev == OBD_NOTIFY_ACTIVE) {
- CDEBUG(D_INFO|D_WARNING, "Device %s is active now\n",
- watched->obd_name);
- } else if (ev == OBD_NOTIFY_INACTIVE) {
- CDEBUG(D_INFO|D_WARNING, "Device %s is inactive now\n",
- watched->obd_name);
- } else if (ev == OBD_NOTIFY_OCD) {
- struct obd_connect_data *conn_data =
- &watched->u.cli.cl_import->imp_connect_data;
- /*
- * Update exp_connect_flags.
- */
- mc->mc_desc.cl_exp->exp_connect_flags =
- conn_data->ocd_connect_flags;
- CDEBUG(D_INFO, "Update connect_flags: "LPX64"\n",
- conn_data->ocd_connect_flags);
- }
-
- RETURN(rc);
-}
-/**
- * Add new mdc device.
- * Invoked by configuration command LCFG_ADD_MDC.
- *
- * MDC OBD is set up already and connected to the proper MDS
- * mdc_add_obd() find that obd by uuid and connects to it.
- * Local MDT uuid is used for connection.
- */
-static int mdc_obd_add(const struct lu_env *env,
- struct mdc_device *mc, struct lustre_cfg *cfg)
-{
- struct mdc_cli_desc *desc = &mc->mc_desc;
- struct obd_device *mdc;
- const char *uuid_str = lustre_cfg_string(cfg, 1);
- const char *index = lustre_cfg_string(cfg, 2);
- const char *mdc_uuid_str = lustre_cfg_string(cfg, 4);
- struct md_site *ms = lu_site2md(mdc2lu_dev(mc)->ld_site);
- char *p;
- int rc = 0;
-
- ENTRY;
- LASSERT(uuid_str);
- LASSERT(index);
-
- mc->mc_num = simple_strtol(index, &p, 10);
- if (*p) {
- CERROR("Invalid index in lustre_cgf, offset 2\n");
- RETURN(-EINVAL);
- }
-
- obd_str2uuid(&desc->cl_srv_uuid, uuid_str);
- obd_str2uuid(&desc->cl_cli_uuid, mdc_uuid_str);
- /* try to find MDC OBD connected to the needed MDT */
- mdc = class_find_client_obd(&desc->cl_srv_uuid, LUSTRE_MDC_NAME,
- &desc->cl_cli_uuid);
- if (!mdc) {
- CERROR("Cannot find MDC OBD connected to %s\n", uuid_str);
- rc = -ENOENT;
- } else if (!mdc->obd_set_up) {
- CERROR("target %s not set up\n", mdc->obd_name);
- rc = -EINVAL;
- } else {
- struct obd_connect_data *ocd;
-
- CDEBUG(D_CONFIG, "connect to %s(%s)\n",
- mdc->obd_name, mdc->obd_uuid.uuid);
-
- OBD_ALLOC_PTR(ocd);
- if (!ocd)
- RETURN(-ENOMEM);
- /*
- * The connection between MDS must be local,
- * IBITS are needed for rename_lock (INODELOCK_UPDATE)
- */
- ocd->ocd_ibits_known = MDS_INODELOCK_UPDATE;
- ocd->ocd_connect_flags = OBD_CONNECT_VERSION |
- OBD_CONNECT_ACL |
- OBD_CONNECT_RMT_CLIENT |
- OBD_CONNECT_MDS_CAPA |
- OBD_CONNECT_OSS_CAPA |
- OBD_CONNECT_IBITS |
- OBD_CONNECT_BRW_SIZE |
- OBD_CONNECT_MDS_MDS |
- OBD_CONNECT_FID |
- OBD_CONNECT_AT |
- OBD_CONNECT_FULL20 |
- OBD_CONNECT_64BITHASH;
- ocd->ocd_brw_size = PTLRPC_MAX_BRW_SIZE;
- rc = obd_connect(env, &desc->cl_exp, mdc, &mdc->obd_uuid, ocd, NULL);
- OBD_FREE_PTR(ocd);
- if (rc) {
- CERROR("target %s connect error %d\n",
- mdc->obd_name, rc);
- } else {
- /* set seq controller export for MDC0 if exists */
- if (mc->mc_num == 0)
- ms->ms_control_exp =
- class_export_get(desc->cl_exp);
- rc = obd_fid_init(desc->cl_exp);
- if (rc)
- CERROR("fid init error %d \n", rc);
- else {
- /* obd notify mechanism */
- mdc->obd_upcall.onu_owner = mc;
- mdc->obd_upcall.onu_upcall = mdc_obd_update;
- }
- }
-
- if (rc) {
- obd_disconnect(desc->cl_exp);
- desc->cl_exp = NULL;
- }
- }
-
- RETURN(rc);
-}
-
-/**
- * Delete mdc device.
- * Called when configuration command LCFG_CLEANUP is issued.
- *
- * This disconnects MDC OBD and cleanup it.
- */
-static int mdc_obd_del(const struct lu_env *env, struct mdc_device *mc,
- struct lustre_cfg *cfg)
-{
- struct mdc_cli_desc *desc = &mc->mc_desc;
- const char *dev = lustre_cfg_string(cfg, 0);
- struct obd_device *mdc_obd = class_exp2obd(desc->cl_exp);
- struct obd_device *mdt_obd;
- int rc;
-
- ENTRY;
-
- CDEBUG(D_CONFIG, "Disconnect from %s\n",
- mdc_obd->obd_name);
-
- /* Set mdt_obd flags in shutdown. */
- mdt_obd = class_name2obd(dev);
- LASSERT(mdt_obd != NULL);
- if (mdc_obd) {
- mdc_obd->obd_no_recov = mdt_obd->obd_no_recov;
- mdc_obd->obd_force = mdt_obd->obd_force;
- mdc_obd->obd_fail = 0;
- }
-
- rc = obd_fid_fini(desc->cl_exp);
- if (rc)
- CERROR("Fid fini error %d\n", rc);
-
- obd_register_observer(mdc_obd, NULL);
- mdc_obd->obd_upcall.onu_owner = NULL;
- mdc_obd->obd_upcall.onu_upcall = NULL;
- rc = obd_disconnect(desc->cl_exp);
- if (rc) {
- CERROR("Target %s disconnect error %d\n",
- mdc_obd->obd_name, rc);
- }
- class_manual_cleanup(mdc_obd);
- desc->cl_exp = NULL;
-
- RETURN(0);
-}
-
-/**
- * Process config command. Passed to the mdc from mdt.
- * Supports two commands only - LCFG_ADD_MDC and LCFG_CLEANUP
- */
-static int mdc_process_config(const struct lu_env *env,
- struct lu_device *ld,
- struct lustre_cfg *cfg)
-{
- struct mdc_device *mc = lu2mdc_dev(ld);
- int rc;
-
- ENTRY;
- switch (cfg->lcfg_command) {
- case LCFG_ADD_MDC:
- rc = mdc_obd_add(env, mc, cfg);
- break;
- case LCFG_CLEANUP:
- rc = mdc_obd_del(env, mc, cfg);
- break;
- default:
- rc = -EOPNOTSUPP;
- }
- RETURN(rc);
-}
-
-/**
- * lu_device_operations instance for mdc.
- */
-static const struct lu_device_operations mdc_lu_ops = {
- .ldo_object_alloc = mdc_object_alloc,
- .ldo_process_config = mdc_process_config
-};
-
-/**
- * Initialize proper easize and cookie size.
- */
-void cmm_mdc_init_ea_size(const struct lu_env *env, struct mdc_device *mc,
- int max_mdsize, int max_cookiesize)
-{
- struct obd_device *obd = class_exp2obd(mc->mc_desc.cl_exp);
-
- obd->u.cli.cl_max_mds_easize = max_mdsize;
- obd->u.cli.cl_max_mds_cookiesize = max_cookiesize;
-}
-
-/** Start mdc device */
-static int mdc_device_init(const struct lu_env *env, struct lu_device *ld,
- const char *name, struct lu_device *next)
-{
- return 0;
-}
-
-/** Stop mdc device. */
-static struct lu_device *mdc_device_fini(const struct lu_env *env,
- struct lu_device *ld)
-{
- ENTRY;
- RETURN (NULL);
-}
-
-/** Allocate new mdc device */
-static struct lu_device *mdc_device_alloc(const struct lu_env *env,
- struct lu_device_type *ldt,
- struct lustre_cfg *cfg)
-{
- struct lu_device *ld;
- struct mdc_device *mc;
- ENTRY;
-
- OBD_ALLOC_PTR(mc);
- if (mc == NULL) {
- ld = ERR_PTR(-ENOMEM);
- } else {
- md_device_init(&mc->mc_md_dev, ldt);
- mc->mc_md_dev.md_ops = &mdc_md_ops;
- ld = mdc2lu_dev(mc);
- ld->ld_ops = &mdc_lu_ops;
- cfs_sema_init(&mc->mc_fid_sem, 1);
- }
-
- RETURN (ld);
-}
-
-/** Free mdc device */
-static struct lu_device *mdc_device_free(const struct lu_env *env,
- struct lu_device *ld)
-{
- struct mdc_device *mc = lu2mdc_dev(ld);
-
- LASSERTF(cfs_atomic_read(&ld->ld_ref) == 0,
- "Refcount = %d\n", cfs_atomic_read(&ld->ld_ref));
- LASSERT(cfs_list_empty(&mc->mc_linkage));
- md_device_fini(&mc->mc_md_dev);
- OBD_FREE_PTR(mc);
- return NULL;
-}
-
-/** context key constructor/destructor: mdc_key_init, mdc_key_fini */
-LU_KEY_INIT_FINI(mdc, struct mdc_thread_info);
-
-/** context key: mdc_thread_key */
-LU_CONTEXT_KEY_DEFINE(mdc, LCT_MD_THREAD|LCT_CL_THREAD);
-
-/** type constructor/destructor: mdc_type_init, mdc_type_fini */
-LU_TYPE_INIT_FINI(mdc, &mdc_thread_key);
-
-static struct lu_device_type_operations mdc_device_type_ops = {
- .ldto_init = mdc_type_init,
- .ldto_fini = mdc_type_fini,
-
- .ldto_start = mdc_type_start,
- .ldto_stop = mdc_type_stop,
-
- .ldto_device_alloc = mdc_device_alloc,
- .ldto_device_free = mdc_device_free,
-
- .ldto_device_init = mdc_device_init,
- .ldto_device_fini = mdc_device_fini
-};
-
-struct lu_device_type mdc_device_type = {
- .ldt_tags = LU_DEVICE_MD,
- .ldt_name = LUSTRE_CMM_MDC_NAME,
- .ldt_ops = &mdc_device_type_ops,
- .ldt_ctx_tags = LCT_MD_THREAD|LCT_CL_THREAD
-};
-/** @} */
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/mdc_internal.h
- *
- * Lustre Cluster Metadata Manager (cmm), MDC device
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-
-#ifndef _CMM_MDC_INTERNAL_H
-#define _CMM_MDC_INTERNAL_H
-
-#if defined(__KERNEL__)
-
-#include <lustre_net.h>
-#include <obd.h>
-#include <md_object.h>
-/**
- * \addtogroup cmm
- * @{
- */
-/**
- * \defgroup cmm_mdc cmm_mdc
- *
- * This is mdc wrapper device to work with old MDC obd-based devices.
- * @{
- */
-/**
- * MDC client description.
- */
-struct mdc_cli_desc {
- /** uuid of remote MDT to connect */
- struct obd_uuid cl_srv_uuid;
- /** mdc uuid */
- struct obd_uuid cl_cli_uuid;
- /** export of mdc obd */
- struct obd_export *cl_exp;
-};
-
-/**
- * MDC device.
- */
-struct mdc_device {
- /** md_device instance for MDC */
- struct md_device mc_md_dev;
- /** other MD servers in cluster */
- cfs_list_t mc_linkage;
- /** number of current device */
- mdsno_t mc_num;
- /** mdc client description */
- struct mdc_cli_desc mc_desc;
- /** Protects ??*/
- cfs_semaphore_t mc_fid_sem;
-};
-
-/**
- * mdc thread info. Local storage for varios data.
- */
-struct mdc_thread_info {
- /** Storage for md_op_data */
- struct md_op_data mci_opdata;
- /** Storage for ptlrpc request */
- struct ptlrpc_request *mci_req;
-};
-
-/** mdc object. */
-struct mdc_object {
- /** md_object instance for mdc_object */
- struct md_object mco_obj;
-};
-
-/** Get lu_device from mdc_device. */
-static inline struct lu_device *mdc2lu_dev(struct mdc_device *mc)
-{
- return (&mc->mc_md_dev.md_lu_dev);
-}
-
-/** Get mdc_device from md_device. */
-static inline struct mdc_device *md2mdc_dev(struct md_device *md)
-{
- return container_of0(md, struct mdc_device, mc_md_dev);
-}
-
-/** Get mdc_device from mdc_object. */
-static inline struct mdc_device *mdc_obj2dev(struct mdc_object *mco)
-{
- return (md2mdc_dev(md_obj2dev(&mco->mco_obj)));
-}
-
-/** Get mdc_object from lu_object. */
-static inline struct mdc_object *lu2mdc_obj(struct lu_object *lo)
-{
- return container_of0(lo, struct mdc_object, mco_obj.mo_lu);
-}
-
-/** Get mdc_object from md_object. */
-static inline struct mdc_object *md2mdc_obj(struct md_object *mo)
-{
- return container_of0(mo, struct mdc_object, mco_obj);
-}
-
-/** Get mdc_device from lu_device. */
-static inline struct mdc_device *lu2mdc_dev(struct lu_device *ld)
-{
- return container_of0(ld, struct mdc_device, mc_md_dev.md_lu_dev);
-}
-
-struct lu_object *mdc_object_alloc(const struct lu_env *,
- const struct lu_object_header *,
- struct lu_device *);
-
-void cmm_mdc_init_ea_size(const struct lu_env *env, struct mdc_device *mc,
- int max_mdsize, int max_cookiesize);
-#ifdef HAVE_SPLIT_SUPPORT
-int mdc_send_page(struct cmm_device *cmm, const struct lu_env *env,
- struct md_object *mo, struct page *page, __u32 end);
-#endif
-/** @} */
-/** @} */
-#endif /* __KERNEL__ */
-#endif /* _CMM_MDC_INTERNAL_H */
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/mdc_object.c
- *
- * Lustre Cluster Metadata Manager (cmm)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-#include <obd_support.h>
-#include <lustre_lib.h>
-#include <obd_class.h>
-#include <lustre_mdc.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-
-static const struct md_object_operations mdc_mo_ops;
-static const struct md_dir_operations mdc_dir_ops;
-static const struct lu_object_operations mdc_obj_ops;
-
-extern struct lu_context_key mdc_thread_key;
-/**
- * \addtogroup cmm_mdc
- * @{
- */
-/**
- * Allocate new mdc object.
- */
-struct lu_object *mdc_object_alloc(const struct lu_env *env,
- const struct lu_object_header *hdr,
- struct lu_device *ld)
-{
- struct mdc_object *mco;
- ENTRY;
-
- OBD_ALLOC_PTR(mco);
- if (mco != NULL) {
- struct lu_object *lo;
-
- lo = &mco->mco_obj.mo_lu;
- lu_object_init(lo, NULL, ld);
- mco->mco_obj.mo_ops = &mdc_mo_ops;
- mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
- lo->lo_ops = &mdc_obj_ops;
- RETURN(lo);
- } else
- RETURN(NULL);
-}
-
-/** Free current mdc object */
-static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
-{
- struct mdc_object *mco = lu2mdc_obj(lo);
- lu_object_fini(lo);
- OBD_FREE_PTR(mco);
-}
-
-/**
- * Initialize mdc object. All of them have loh_attr::LOHA_REMOTE set.
- */
-static int mdc_object_init(const struct lu_env *env, struct lu_object *lo,
- const struct lu_object_conf *unused)
-{
- ENTRY;
- lo->lo_header->loh_attr |= LOHA_REMOTE;
- RETURN(0);
-}
-
-/**
- * Instance of lu_object_operations for mdc.
- */
-static const struct lu_object_operations mdc_obj_ops = {
- .loo_object_init = mdc_object_init,
- .loo_object_free = mdc_object_free,
-};
-
-/**
- * \name The set of md_object_operations.
- * @{
- */
-/**
- * Get mdc_thread_info from lu_context
- */
-static
-struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
-{
- struct mdc_thread_info *mci;
-
- mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
- LASSERT(mci);
- return mci;
-}
-
-/**
- * Initialize mdc_thread_info.
- */
-static
-struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
-{
- struct mdc_thread_info *mci = mdc_info_get(env);
- memset(mci, 0, sizeof(*mci));
- return mci;
-}
-
-/**
- * Convert attributes from mdt_body to the md_attr.
- */
-static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
-{
- struct lu_attr *la = &ma->ma_attr;
- /* update time */
- if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
- la->la_ctime = body->ctime;
- if (body->valid & OBD_MD_FLMTIME)
- la->la_mtime = body->mtime;
- }
-
- if (body->valid & OBD_MD_FLMODE)
- la->la_mode = body->mode;
- if (body->valid & OBD_MD_FLSIZE)
- la->la_size = body->size;
- if (body->valid & OBD_MD_FLBLOCKS)
- la->la_blocks = body->blocks;
- if (body->valid & OBD_MD_FLUID)
- la->la_uid = body->uid;
- if (body->valid & OBD_MD_FLGID)
- la->la_gid = body->gid;
- if (body->valid & OBD_MD_FLFLAGS)
- la->la_flags = body->flags;
- if (body->valid & OBD_MD_FLNLINK)
- la->la_nlink = body->nlink;
- if (body->valid & OBD_MD_FLRDEV)
- la->la_rdev = body->rdev;
-
- la->la_valid = body->valid;
- ma->ma_valid = MA_INODE;
-}
-
-/**
- * Fill the md_attr \a ma with attributes from request.
- */
-static int mdc_req2attr_update(const struct lu_env *env,
- struct md_attr *ma)
-{
- struct mdc_thread_info *mci;
- struct ptlrpc_request *req;
- struct mdt_body *body;
- struct lov_mds_md *md;
- struct llog_cookie *cookie;
- void *acl;
-
- ENTRY;
- mci = mdc_info_get(env);
- req = mci->mci_req;
- LASSERT(req);
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- LASSERT(body);
- mdc_body2attr(body, ma);
-
- if (body->valid & OBD_MD_FLMDSCAPA) {
- struct lustre_capa *capa;
-
- /* create for cross-ref will fetch mds capa from remote obj */
- capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
- LASSERT(capa != NULL);
- LASSERT(ma->ma_capa != NULL);
- *ma->ma_capa = *capa;
- }
-
- if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) {
- if (body->eadatasize == 0) {
- CERROR("No size defined for easize field\n");
- RETURN(-EPROTO);
- }
-
- md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
- body->eadatasize);
- if (md == NULL)
- RETURN(-EPROTO);
-
- LASSERT(ma->ma_lmm != NULL);
- LASSERT(ma->ma_lmm_size >= body->eadatasize);
- ma->ma_lmm_size = body->eadatasize;
- memcpy(ma->ma_lmm, md, ma->ma_lmm_size);
- ma->ma_valid |= MA_LOV;
- }
-
- if (body->valid & OBD_MD_FLCOOKIE) {
- /*
- * ACL and cookie share the same body->aclsize, we need
- * to make sure that they both never come here.
- */
- LASSERT(!(body->valid & OBD_MD_FLACL));
-
- if (body->aclsize == 0) {
- CERROR("No size defined for cookie field\n");
- RETURN(-EPROTO);
- }
-
- cookie = req_capsule_server_sized_get(&req->rq_pill,
- &RMF_LOGCOOKIES,
- body->aclsize);
- if (cookie == NULL)
- RETURN(-EPROTO);
-
- LASSERT(ma->ma_cookie != NULL);
- LASSERT(ma->ma_cookie_size == body->aclsize);
- memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
- ma->ma_valid |= MA_COOKIE;
- }
-
-#ifdef CONFIG_FS_POSIX_ACL
- if (body->valid & OBD_MD_FLACL) {
- if (body->aclsize == 0) {
- CERROR("No size defined for acl field\n");
- RETURN(-EPROTO);
- }
-
- acl = req_capsule_server_sized_get(&req->rq_pill,
- &RMF_ACL,
- body->aclsize);
- if (acl == NULL)
- RETURN(-EPROTO);
-
- LASSERT(ma->ma_acl != NULL);
- LASSERT(ma->ma_acl_size == body->aclsize);
- memcpy(ma->ma_acl, acl, ma->ma_acl_size);
- ma->ma_valid |= MA_ACL_DEF;
- }
-#endif
-
- RETURN(0);
-}
-
-/**
- * The md_object_operations::moo_attr_get() in mdc.
- */
-static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma)
-{
- struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
- struct mdc_thread_info *mci;
- int rc;
- ENTRY;
-
- mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
- LASSERT(mci);
-
- memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
-
- memcpy(&mci->mci_opdata.op_fid1, lu_object_fid(&mo->mo_lu),
- sizeof (struct lu_fid));
- mci->mci_opdata.op_valid = OBD_MD_FLMODE | OBD_MD_FLUID |
- OBD_MD_FLGID | OBD_MD_FLFLAGS |
- OBD_MD_FLCROSSREF;
-
- rc = md_getattr(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
- if (rc == 0) {
- /* get attr from request */
- rc = mdc_req2attr_update(env, ma);
- }
-
- ptlrpc_req_finished(mci->mci_req);
-
- RETURN(rc);
-}
-
-/**
- * Helper to init timspec \a t.
- */
-static inline struct timespec *mdc_attr_time(struct timespec *t, obd_time seconds)
-{
- t->tv_sec = seconds;
- t->tv_nsec = 0;
- return t;
-}
-
-/**
- * The md_object_operations::moo_attr_set() in mdc.
- *
- * \note It is only used for set ctime when rename's source on remote MDS.
- */
-static int mdc_attr_set(const struct lu_env *env, struct md_object *mo,
- const struct md_attr *ma)
-{
- struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
- const struct lu_attr *la = &ma->ma_attr;
- struct mdc_thread_info *mci;
- struct md_ucred *uc = md_ucred(env);
- int rc;
- ENTRY;
-
- LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-
- mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
- LASSERT(mci);
-
- memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
-
- mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
- mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime);
- mci->mci_opdata.op_attr.ia_mode = la->la_mode;
- mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET;
- if (uc &&
- ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
- mci->mci_opdata.op_fsuid = uc->mu_fsuid;
- mci->mci_opdata.op_fsgid = uc->mu_fsgid;
- mci->mci_opdata.op_cap = uc->mu_cap;
- if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
- mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
- mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
- } else {
- mci->mci_opdata.op_suppgids[0] =
- mci->mci_opdata.op_suppgids[1] = -1;
- }
- } else {
- mci->mci_opdata.op_fsuid = la->la_uid;
- mci->mci_opdata.op_fsgid = la->la_gid;
- mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
- mci->mci_opdata.op_suppgids[0] =
- mci->mci_opdata.op_suppgids[1] = -1;
- }
-
- rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata,
- NULL, 0, NULL, 0, &mci->mci_req, NULL);
-
- ptlrpc_req_finished(mci->mci_req);
-
- RETURN(rc);
-}
-
-/**
- * The md_object_operations::moo_object_create() in mdc.
- */
-static int mdc_object_create(const struct lu_env *env,
- struct md_object *mo,
- const struct md_op_spec *spec,
- struct md_attr *ma)
-{
- struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
- struct lu_attr *la = &ma->ma_attr;
- struct mdc_thread_info *mci;
- const void *symname;
- struct md_ucred *uc = md_ucred(env);
- int rc, symlen;
- uid_t uid;
- gid_t gid;
- cfs_cap_t cap;
- ENTRY;
-
- LASSERT(S_ISDIR(la->la_mode));
- LASSERT(spec->u.sp_pfid != NULL);
-
- mci = mdc_info_init(env);
- mci->mci_opdata.op_bias = MDS_CROSS_REF;
- mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
-
- /* Parent fid is needed to create dotdot on the remote node. */
- mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid);
- mci->mci_opdata.op_mod_time = la->la_ctime;
- if (uc &&
- ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
- uid = uc->mu_fsuid;
- if (la->la_mode & S_ISGID)
- gid = la->la_gid;
- else
- gid = uc->mu_fsgid;
- cap = uc->mu_cap;
- if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
- mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
- else
- mci->mci_opdata.op_suppgids[0] = -1;
- } else {
- uid = la->la_uid;
- gid = la->la_gid;
- cap = 0;
- mci->mci_opdata.op_suppgids[0] = -1;
- }
-
- /* get data from spec */
- if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
- symname = spec->u.sp_ea.eadata;
- symlen = spec->u.sp_ea.eadatalen;
- mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
- mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ;
-#ifdef CONFIG_FS_POSIX_ACL
- } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
- symname = spec->u.sp_ea.eadata;
- symlen = spec->u.sp_ea.eadatalen;
- mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
- mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL;
-#endif
- } else {
- symname = spec->u.sp_symname;
- symlen = symname ? strlen(symname) + 1 : 0;
- }
-
- rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
- symname, symlen, la->la_mode, uid, gid,
- cap, la->la_rdev, &mci->mci_req);
-
- if (rc == 0) {
- /* get attr from request */
- rc = mdc_req2attr_update(env, ma);
- }
-
- ptlrpc_req_finished(mci->mci_req);
-
- RETURN(rc);
-}
-
-/**
- * The md_object_operations::moo_ref_add() in mdc.
- */
-static int mdc_ref_add(const struct lu_env *env, struct md_object *mo,
- const struct md_attr *ma)
-{
- struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
- const struct lu_attr *la = &ma->ma_attr;
- struct mdc_thread_info *mci;
- struct md_ucred *uc = md_ucred(env);
- int rc;
- ENTRY;
-
- mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
- LASSERT(mci);
-
- memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
- mci->mci_opdata.op_bias = MDS_CROSS_REF;
- mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
- mci->mci_opdata.op_mod_time = la->la_ctime;
- if (uc &&
- ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
- mci->mci_opdata.op_fsuid = uc->mu_fsuid;
- mci->mci_opdata.op_fsgid = uc->mu_fsgid;
- mci->mci_opdata.op_cap = uc->mu_cap;
- if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
- mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
- mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
- } else {
- mci->mci_opdata.op_suppgids[0] =
- mci->mci_opdata.op_suppgids[1] = -1;
- }
- } else {
- mci->mci_opdata.op_fsuid = la->la_uid;
- mci->mci_opdata.op_fsgid = la->la_gid;
- mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
- mci->mci_opdata.op_suppgids[0] =
- mci->mci_opdata.op_suppgids[1] = -1;
- }
-
-
- rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
-
- ptlrpc_req_finished(mci->mci_req);
-
- RETURN(rc);
-}
-
-/**
- * The md_object_operations::moo_ref_del() in mdc.
- */
-static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma)
-{
- struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
- struct lu_attr *la = &ma->ma_attr;
- struct mdc_thread_info *mci;
- struct md_ucred *uc = md_ucred(env);
- int rc;
- ENTRY;
-
- mci = mdc_info_init(env);
- mci->mci_opdata.op_bias = MDS_CROSS_REF;
- if (ma->ma_attr_flags & MDS_VTX_BYPASS)
- mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
- else
- mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
- mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
- mci->mci_opdata.op_mode = la->la_mode;
- mci->mci_opdata.op_mod_time = la->la_ctime;
- if (uc &&
- ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
- mci->mci_opdata.op_fsuid = uc->mu_fsuid;
- mci->mci_opdata.op_fsgid = uc->mu_fsgid;
- mci->mci_opdata.op_cap = uc->mu_cap;
- if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
- mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
- else
- mci->mci_opdata.op_suppgids[0] = -1;
- } else {
- mci->mci_opdata.op_fsuid = la->la_uid;
- mci->mci_opdata.op_fsgid = la->la_gid;
- mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
- mci->mci_opdata.op_suppgids[0] = -1;
- }
-
- rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
- if (rc == 0) {
- /* get attr from request */
- rc = mdc_req2attr_update(env, ma);
- }
-
- ptlrpc_req_finished(mci->mci_req);
-
- RETURN(rc);
-}
-
-#ifdef HAVE_SPLIT_SUPPORT
-/** Send page with directory entries to another MDS. */
-int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
- struct md_object *mo, struct page *page, __u32 offset)
-{
- struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
- int rc;
- ENTRY;
-
- rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
- page, offset);
- CDEBUG(rc ? D_ERROR : D_INFO, "send page %p offset %d fid "DFID
- " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
- RETURN(rc);
-}
-#endif
-
-/**
- * Instance of md_object_operations for mdc.
- */
-static const struct md_object_operations mdc_mo_ops = {
- .moo_attr_get = mdc_attr_get,
- .moo_attr_set = mdc_attr_set,
- .moo_object_create = mdc_object_create,
- .moo_ref_add = mdc_ref_add,
- .moo_ref_del = mdc_ref_del,
-};
-/** @} */
-
-/**
- * \name The set of md_dir_operations.
- * @{
- */
-/**
- * The md_dir_operations::mdo_rename_tgt in mdc.
- */
-static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
- struct md_object *mo_t, const struct lu_fid *lf,
- const struct lu_name *lname, struct md_attr *ma)
-{
- struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
- struct lu_attr *la = &ma->ma_attr;
- struct mdc_thread_info *mci;
- struct md_ucred *uc = md_ucred(env);
- int rc;
- ENTRY;
-
- mci = mdc_info_init(env);
- mci->mci_opdata.op_bias = MDS_CROSS_REF;
- if (ma->ma_attr_flags & MDS_VTX_BYPASS)
- mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
- else
- mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
- mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
- mci->mci_opdata.op_fid2 = *lf;
- mci->mci_opdata.op_mode = la->la_mode;
- mci->mci_opdata.op_mod_time = la->la_ctime;
- if (uc &&
- ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
- mci->mci_opdata.op_fsuid = uc->mu_fsuid;
- mci->mci_opdata.op_fsgid = uc->mu_fsgid;
- mci->mci_opdata.op_cap = uc->mu_cap;
- if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
- mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
- mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
- } else {
- mci->mci_opdata.op_suppgids[0] =
- mci->mci_opdata.op_suppgids[1] = -1;
- }
- } else {
- mci->mci_opdata.op_fsuid = la->la_uid;
- mci->mci_opdata.op_fsgid = la->la_gid;
- mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
- mci->mci_opdata.op_suppgids[0] =
- mci->mci_opdata.op_suppgids[1] = -1;
- }
-
- rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
- lname->ln_name, lname->ln_namelen, &mci->mci_req);
- if (rc == 0) {
- /* get attr from request */
- mdc_req2attr_update(env, ma);
- }
-
- ptlrpc_req_finished(mci->mci_req);
-
- RETURN(rc);
-}
-/**
- * Check the fids are not relatives.
- * The md_dir_operations::mdo_is_subdir() in mdc.
- *
- * Return resulting fid in sfid.
- * \retval \a sfid = 0 fids are not relatives
- * \retval \a sfid = FID at which search stopped
- */
-static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
- const struct lu_fid *fid, struct lu_fid *sfid)
-{
- struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
- struct mdc_thread_info *mci;
- struct mdt_body *body;
- int rc;
- ENTRY;
-
- mci = mdc_info_init(env);
-
- rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
- fid, &mci->mci_req);
- if (rc == 0 || rc == -EREMOTE) {
- body = req_capsule_server_get(&mci->mci_req->rq_pill,
- &RMF_MDT_BODY);
- LASSERT(body->valid & OBD_MD_FLID);
-
- CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
- PFID(&body->fid1));
- *sfid = body->fid1;
- }
- ptlrpc_req_finished(mci->mci_req);
- RETURN(rc);
-}
-
-/** Instance of md_dir_operations for mdc. */
-static const struct md_dir_operations mdc_dir_ops = {
- .mdo_is_subdir = mdc_is_subdir,
- .mdo_rename_tgt = mdc_rename_tgt
-};
-/** @} */
* anymore, reserve this flags
* just for preventing such bit
* to be reused. */
-#define MDS_CREATE_RMT_ACL 01000000000 /* indicate create on remote server
- * with default ACL */
-#define MDS_CREATE_SLAVE_OBJ 02000000000 /* indicate create slave object
- * actually, this is for create, not
- * conflict with other open flags */
+
#define MDS_OPEN_LOCK 04000000000 /* This open requires open lock */
#define MDS_OPEN_HAS_EA 010000000000 /* specify object create pattern */
#define MDS_OPEN_HAS_OBJS 020000000000 /* Just set the EA the obj exist */
#define LDD_F_UPGRADE14 0x0200
/** process as lctl conf_param */
#define LDD_F_PARAM 0x0400
-/** backend fs make use of IAM directory format. */
-#define LDD_F_IAM_DIR 0x0800
/** all nodes are specified as service nodes */
#define LDD_F_NO_PRIMNODE 0x1000
/** IR enable flag */
#define LDD_F_OPC_READY 0x40000000
#define LDD_F_OPC_MASK 0xf0000000
-#define LDD_F_ONDISK_MASK (LDD_F_SV_TYPE_MASK | LDD_F_IAM_DIR)
+#define LDD_F_ONDISK_MASK (LDD_F_SV_TYPE_MASK)
#define LDD_F_MASK 0xFFFF
/** Current lock mode for parent dir where create is performing. */
mdl_mode_t sp_cr_mode;
- /** Check for split */
- int sp_ck_split;
-
/** to create directory */
const struct dt_index_features *sp_feat;
};
if (unlikely(rc))
GOTO(out_shrink, rc);
- info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
/*
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(repbody != NULL);
- info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
repbody->eadatasize = 0;
repbody->aclsize = 0;
RETURN(rc);
}
-#ifdef HAVE_SPLIT_SUPPORT
-/*
- * Retrieve dir entry from the page and insert it to the slave object, actually,
- * this should be in osd layer, but since it will not in the final product, so
- * just do it here and do not define more moo api anymore for this.
- */
-static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
- int size)
-{
- struct mdt_object *object = info->mti_object;
- struct lu_fid *lf = &info->mti_tmp_fid2;
- struct md_attr *ma = &info->mti_attr;
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc = 0, offset = 0;
- ENTRY;
-
- /* Make sure we have at least one entry. */
- if (size == 0)
- RETURN(-EINVAL);
-
- /*
- * Disable trans for this name insert, since it will include many trans
- * for this.
- */
- info->mti_no_need_trans = 1;
- /*
- * When write_dir_page, no need update parent's ctime,
- * and no permission check for name_insert.
- */
- ma->ma_attr.la_ctime = 0;
- ma->ma_attr.la_valid = LA_MODE;
- ma->ma_valid = MA_INODE;
-
- cfs_kmap(page);
- dp = page_address(page);
- offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp);
-
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent)) {
- struct lu_name *lname;
- char *name;
-
- if (le16_to_cpu(ent->lde_namelen) == 0)
- continue;
-
- fid_le_to_cpu(lf, &ent->lde_fid);
- if (le64_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT)
- ma->ma_attr.la_mode = S_IFDIR;
- else
- ma->ma_attr.la_mode = 0;
- OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
- if (name == NULL)
- GOTO(out, rc = -ENOMEM);
-
- memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
- lname = mdt_name(info->mti_env, name,
- le16_to_cpu(ent->lde_namelen));
- ma->ma_attr_flags |= (MDS_PERM_BYPASS | MDS_QUOTA_IGNORE);
- rc = mdo_name_insert(info->mti_env,
- md_object_next(&object->mot_obj),
- lname, lf, ma);
- OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
- if (rc) {
- CERROR("Can't insert %*.*s, rc %d\n",
- le16_to_cpu(ent->lde_namelen),
- le16_to_cpu(ent->lde_namelen),
- ent->lde_name, rc);
- GOTO(out, rc);
- }
-
- offset += lu_dirent_size(ent);
- if (offset >= size)
- break;
- }
- EXIT;
-out:
- cfs_kunmap(page);
- return rc;
-}
-
-static int mdt_bulk_timeout(void *data)
-{
- ENTRY;
-
- CERROR("mdt bulk transfer timeout \n");
-
- RETURN(1);
-}
-
-static int mdt_writepage(struct mdt_thread_info *info)
-{
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_body *reqbody;
- struct l_wait_info *lwi;
- struct ptlrpc_bulk_desc *desc;
- struct page *page;
- int rc;
- ENTRY;
-
-
- reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
- if (reqbody == NULL)
- RETURN(err_serious(-EFAULT));
-
- desc = ptlrpc_prep_bulk_exp(req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
- if (desc == NULL)
- RETURN(err_serious(-ENOMEM));
-
- /* allocate the page for the desc */
- page = cfs_alloc_page(CFS_ALLOC_STD);
- if (page == NULL)
- GOTO(desc_cleanup, rc = -ENOMEM);
-
- CDEBUG(D_INFO, "Received page offset %d size %d \n",
- (int)reqbody->size, (int)reqbody->nlink);
-
- ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
- (int)reqbody->nlink);
-
- rc = sptlrpc_svc_prep_bulk(req, desc);
- if (rc != 0)
- GOTO(cleanup_page, rc);
- /*
- * Check if client was evicted while we were doing i/o before touching
- * network.
- */
- OBD_ALLOC_PTR(lwi);
- if (!lwi)
- GOTO(cleanup_page, rc = -ENOMEM);
-
- if (desc->bd_export->exp_failed)
- rc = -ENOTCONN;
- else
- rc = ptlrpc_start_bulk_transfer (desc);
- if (rc == 0) {
- *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * CFS_HZ / 4, CFS_HZ,
- mdt_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
- desc->bd_export->exp_failed, lwi);
- LASSERT(rc == 0 || rc == -ETIMEDOUT);
- if (rc == -ETIMEDOUT) {
- DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
- ptlrpc_abort_bulk(desc);
- } else if (desc->bd_export->exp_failed) {
- DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
- rc = -ENOTCONN;
- ptlrpc_abort_bulk(desc);
- } else if (!desc->bd_success ||
- desc->bd_nob_transferred != desc->bd_nob) {
- DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
- desc->bd_success ?
- "truncated" : "network error on",
- desc->bd_nob_transferred, desc->bd_nob);
- /* XXX should this be a different errno? */
- rc = -ETIMEDOUT;
- }
- } else {
- DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
- }
- if (rc)
- GOTO(cleanup_lwi, rc);
- rc = mdt_write_dir_page(info, page, reqbody->nlink);
-
-cleanup_lwi:
- OBD_FREE_PTR(lwi);
-cleanup_page:
- cfs_free_page(page);
-desc_cleanup:
- ptlrpc_free_bulk_pin(desc);
- RETURN(rc);
-}
-#endif
-
static int mdt_readpage(struct mdt_thread_info *info)
{
struct mdt_object *object = info->mti_object;
info->mti_big_lmm_used = 0;
/* To not check for split by default. */
- info->mti_spec.sp_ck_split = 0;
info->mti_spec.no_create = 0;
}
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(repbody);
- info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
repbody->eadatasize = 0;
repbody->aclsize = 0;
/* CMD is supported only in IAM mode */
LASSERT(num);
node_id = simple_strtol(num, NULL, 10);
- if (!(lsi->lsi_flags & LDD_F_IAM_DIR) && node_id) {
- CERROR("CMD Operation not allowed in IOP mode\n");
- GOTO(err_lmi, rc = -EINVAL);
- }
-
- obd->u.obt.obt_magic = OBT_MAGIC;
+ obd->u.obt.obt_magic = OBT_MAGIC;
}
cfs_rwlock_init(&m->mdt_sptlrpc_lock);
rc = mdt_stack_init((struct lu_env *)env, m, cfg);
if (rc) {
CERROR("Can't init device stack, rc %d\n", rc);
- RETURN(rc);
+ GOTO(err_lmi, rc);
}
s = m->mdt_md_dev.md_lu_dev.ld_site;
LA_CTIME | LA_MTIME | LA_ATIME;
memset(&sp->u, 0, sizeof(sp->u));
sp->sp_cr_flags = get_mrc_cr_flags(rec);
- sp->sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT);
- info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF);
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
mdt_set_capainfo(info, 0, rr->rr_fid1,
req_capsule_client_get(pill, &RMF_CAPA1));
mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
- if (!info->mti_cross_ref) {
- rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
- rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME,
- RCL_CLIENT) - 1;
- LASSERT(rr->rr_name && rr->rr_namelen > 0);
- } else {
- rr->rr_name = NULL;
- rr->rr_namelen = 0;
- }
+ rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME,
+ RCL_CLIENT) - 1;
+ LASSERT(rr->rr_name && rr->rr_namelen > 0);
-#ifdef CONFIG_FS_POSIX_ACL
- if (sp->sp_cr_flags & MDS_CREATE_RMT_ACL) {
- if (S_ISDIR(attr->la_mode))
- sp->u.sp_pfid = rr->rr_fid1;
- req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
- LASSERT(req_capsule_field_present(pill, &RMF_EADATA,
- RCL_CLIENT));
- rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA);
- rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
- RCL_CLIENT);
- sp->u.sp_ea.eadata = rr->rr_eadata;
- sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
- sp->u.sp_ea.fid = rr->rr_fid1;
- RETURN(0);
- }
-#endif
- if (S_ISDIR(attr->la_mode)) {
- /* pass parent fid for cross-ref cases */
- sp->u.sp_pfid = rr->rr_fid1;
- if (sp->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
- /* create salve object req, need
- * unpack split ea here
- */
- req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SLAVE);
- LASSERT(req_capsule_field_present(pill, &RMF_EADATA,
- RCL_CLIENT));
- rr->rr_eadata = req_capsule_client_get(pill,
- &RMF_EADATA);
- rr->rr_eadatalen = req_capsule_get_size(pill,
- &RMF_EADATA,
- RCL_CLIENT);
- sp->u.sp_ea.eadata = rr->rr_eadata;
- sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
- sp->u.sp_ea.fid = rr->rr_fid1;
- RETURN(0);
- }
- req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
- } else if (S_ISLNK(attr->la_mode)) {
+ if (S_ISLNK(attr->la_mode)) {
const char *tgt = NULL;
req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SYM);
} else {
req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
}
+
rc = mdt_dlmreq_unpack(info);
RETURN(rc);
}
mdt_set_capainfo(info, 1, rr->rr_fid2,
req_capsule_client_get(pill, &RMF_CAPA2));
- info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT);
- info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF);
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
RETURN(-EFAULT);
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
- if (!info->mti_cross_ref)
- LASSERT(rr->rr_namelen > 0);
+
+ LASSERT(rr->rr_namelen > 0);
rc = mdt_dlmreq_unpack(info);
RETURN(rc);
mdt_set_capainfo(info, 0, rr->rr_fid1,
req_capsule_client_get(pill, &RMF_CAPA1));
- info->mti_cross_ref = !!(rec->ul_bias & MDS_CROSS_REF);
- if (!info->mti_cross_ref) {
- rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
- rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
- if (rr->rr_name == NULL || rr->rr_namelen == 0)
- RETURN(-EFAULT);
- } else {
- rr->rr_name = NULL;
- rr->rr_namelen = 0;
- }
- info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT);
+ rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
+ if (rr->rr_name == NULL || rr->rr_namelen == 0)
+ RETURN(-EFAULT);
+
if (rec->ul_bias & MDS_VTX_BYPASS)
ma->ma_attr_flags |= MDS_VTX_BYPASS;
else
mdt_set_capainfo(info, 1, rr->rr_fid2,
req_capsule_client_get(pill, &RMF_CAPA2));
- info->mti_spec.sp_ck_split = !!(rec->rn_bias & MDS_CHECK_SPLIT);
- info->mti_cross_ref = !!(rec->rn_bias & MDS_CROSS_REF);
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
if (rr->rr_name == NULL || rr->rr_tgt == NULL)
RETURN(-EFAULT);
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT) - 1;
- if (!info->mti_cross_ref)
- LASSERT(rr->rr_namelen > 0 && rr->rr_tgtlen > 0);
+ LASSERT(rr->rr_namelen > 0 && rr->rr_tgtlen > 0);
+
if (rec->rn_bias & MDS_VTX_BYPASS)
ma->ma_attr_flags |= MDS_VTX_BYPASS;
else
RETURN(-EPROTO);
info->mti_replayepoch = rec->cr_ioepoch;
- info->mti_spec.sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT);
info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF);
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
lsd->lsd_feature_compat |= OBD_COMPAT_20;
}
- if (lsi->lsi_flags & LDD_F_IAM_DIR)
- lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR;
-
lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
cfs_spin_lock(&mdt->mdt_lut.lut_translock);
RETURN(rc);
}
-/* Partial request to create object only */
-static int mdt_md_mkobj(struct mdt_thread_info *info)
-{
- struct mdt_device *mdt = info->mti_mdt;
- struct mdt_object *o;
- struct mdt_body *repbody;
- struct md_attr *ma = &info->mti_attr;
- int rc;
- ENTRY;
-
- DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"",
- PFID(info->mti_rr.rr_fid2));
-
- repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-
- o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
- if (!IS_ERR(o)) {
- struct md_object *next = mdt_object_child(o);
-
- ma->ma_need = MA_INODE;
- ma->ma_valid = 0;
-
- /*
- * Cross-ref create can encounter already created obj in case of
- * recovery, just get attr in that case.
- */
- if (mdt_object_exists(o) == 1) {
- rc = mdt_attr_get_complex(info, o, ma);
- } else {
- /*
- * Here, NO permission check for object_create,
- * such check has been done on the original MDS.
- */
- rc = mo_object_create(info->mti_env, next,
- &info->mti_spec, ma);
- }
- if (rc == 0) {
- /* Return fid & attr to client. */
- if (ma->ma_valid & MA_INODE)
- mdt_pack_attr2body(info, repbody, &ma->ma_attr,
- mdt_object_fid(o));
- }
- mdt_object_put(info->mti_env, o);
- } else
- rc = PTR_ERR(o);
-
- mdt_create_pack_capa(info, rc, o, repbody);
- RETURN(rc);
-}
-
int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
struct md_attr *ma, int flags)
{
if (info->mti_dlm_req)
ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
+ LASSERT(info->mti_rr.rr_namelen > 0);
switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
- case S_IFDIR:{
- /* Cross-ref case. */
- /* TODO: we can add LPROC_MDT_CROSS for cross-ref stats */
- if (info->mti_cross_ref) {
- rc = mdt_md_mkobj(info);
- } else {
- LASSERT(info->mti_rr.rr_namelen > 0);
- mdt_counter_incr(req, LPROC_MDT_MKDIR);
- rc = mdt_md_create(info);
- }
- break;
- }
+ case S_IFDIR:
+ mdt_counter_incr(req, LPROC_MDT_MKDIR);
+ break;
case S_IFREG:
case S_IFLNK:
case S_IFCHR:
case S_IFBLK:
case S_IFIFO:
- case S_IFSOCK:{
- /* Special file should stay on the same node as parent. */
- LASSERT(info->mti_rr.rr_namelen > 0);
+ case S_IFSOCK:
+ /* Special file should stay on the same node as parent. */
mdt_counter_incr(req, LPROC_MDT_MKNOD);
- rc = mdt_md_create(info);
- break;
- }
- default:
- rc = err_serious(-EOPNOTSUPP);
- }
- RETURN(rc);
+ break;
+ default:
+ CERROR("%s: Unsupported mode %o\n",
+ mdt2obd_dev(info->mti_mdt)->obd_name,
+ info->mti_attr.ma_attr.la_mode);
+ RETURN(err_serious(-EOPNOTSUPP));
+ }
+
+ rc = mdt_md_create(info);
+ RETURN(rc);
}
/*
RETURN(err_serious(-ENOENT));
/*
- * step 1: lock the parent. Note, this may be child in case of
- * remote operation denoted by ->mti_cross_ref flag.
+ * step 1: lock the parent.
*/
parent_lh = &info->mti_lh[MDT_LH_PARENT];
- if (info->mti_cross_ref) {
- /*
- * Init reg lock for cross ref case when we need to do only
- * ref del locally.
- */
- mdt_lock_reg_init(parent_lh, LCK_PW);
- } else {
- mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
- rr->rr_namelen);
- }
+ mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
+ rr->rr_namelen);
+
mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
MDS_INODELOCK_UPDATE);
- if (IS_ERR(mp)) {
- rc = PTR_ERR(mp);
- /* errors are possible here in cross-ref cases, see below */
- if (info->mti_cross_ref)
- rc = 0;
- GOTO(out, rc);
- }
+ if (IS_ERR(mp))
+ GOTO(out, rc = PTR_ERR(mp));
if (mdt_object_obf(mp))
GOTO(out_unlock_parent, rc = -EPERM);
mdt_reint_init_ma(info, ma);
- if (info->mti_cross_ref) {
- /*
- * Remote partial operation. It is possible that replay may
- * happen on parent MDT and this operation will be repeated.
- * Therefore the object absense is allowed case and nothing
- * should be done here.
- */
- if (mdt_object_exists(mp) > 0) {
- mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
- rc = mo_ref_del(info->mti_env,
- mdt_object_child(mp), ma);
- if (rc == 0)
- mdt_handle_last_unlink(info, mp, ma);
- } else
- rc = 0;
- GOTO(out_unlock_parent, rc);
- }
-
/* step 2: find & lock the child */
lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
/* lookup child object along with version checking */
if (info->mti_dlm_req)
ldlm_request_cancel(req, info->mti_dlm_req, 0);
- if (info->mti_cross_ref) {
- /* MDT holding name ask us to add ref. */
- lhs = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lhs, LCK_EX);
- ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
- MDS_INODELOCK_UPDATE);
- if (IS_ERR(ms))
- RETURN(PTR_ERR(ms));
-
- mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
- rc = mo_ref_add(info->mti_env, mdt_object_child(ms), ma);
- mdt_object_unlock_put(info, ms, lhs, rc);
- RETURN(rc);
- }
-
/* Invalid case so return error immediately instead of
* processing it */
if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
return rc;
}
-/* partial operation for rename */
-static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
-{
- struct mdt_reint_record *rr = &info->mti_rr;
- struct ptlrpc_request *req = mdt_info_req(info);
- struct md_attr *ma = &info->mti_attr;
- struct mdt_object *mtgtdir;
- struct mdt_object *mtgt = NULL;
- struct mdt_lock_handle *lh_tgtdir;
- struct mdt_lock_handle *lh_tgt = NULL;
- struct lu_fid *tgt_fid = &info->mti_tmp_fid1;
- struct lu_name *lname;
- int rc;
- ENTRY;
-
- DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
- rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
-
- /* step 1: lookup & lock the tgt dir. */
- lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
- rr->rr_tgtlen);
- mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
- MDS_INODELOCK_UPDATE);
- if (IS_ERR(mtgtdir))
- RETURN(PTR_ERR(mtgtdir));
-
- /* step 2: find & lock the target object if exists. */
- mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
- lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
- rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
- lname, tgt_fid, &info->mti_spec);
- if (rc != 0 && rc != -ENOENT) {
- GOTO(out_unlock_tgtdir, rc);
- } else if (rc == 0) {
- /*
- * In case of replay that name can be already inserted, check
- * that and do nothing if so.
- */
- if (lu_fid_eq(tgt_fid, rr->rr_fid2))
- GOTO(out_unlock_tgtdir, rc);
-
- lh_tgt = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lh_tgt, LCK_EX);
-
- mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
- MDS_INODELOCK_LOOKUP);
- if (IS_ERR(mtgt))
- GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
-
- mdt_reint_init_ma(info, ma);
-
- rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
- mdt_object_child(mtgt), rr->rr_fid2,
- lname, ma);
- } else /* -ENOENT */ {
- rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
- lname, rr->rr_fid2, ma);
- }
-
- /* handle last link of tgt object */
- if (rc == 0 && mtgt)
- mdt_handle_last_unlink(info, mtgt, ma);
-
- EXIT;
-
- if (mtgt)
- mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
-out_unlock_tgtdir:
- mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
- return rc;
-}
-
static int mdt_rename_lock(struct mdt_thread_info *info,
struct lustre_handle *lh)
{
if (info->mti_dlm_req)
ldlm_request_cancel(req, info->mti_dlm_req, 0);
- if (info->mti_cross_ref) {
- rc = mdt_reint_rename_tgt(info);
- RETURN(rc);
- }
-
DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
PFID(rr->rr_fid1), rr->rr_name,
PFID(rr->rr_fid2), rr->rr_tgt);
spec->sp_cr_flags = 0;
spec->sp_cr_lookup = 0;
spec->sp_cr_mode = 0;
- spec->sp_ck_split = 0;
lname->ln_name = name;
lname->ln_namelen = strlen(name);
spec->sp_cr_flags = 0;
spec->sp_cr_lookup = 1;
spec->sp_cr_mode = 0;
- spec->sp_ck_split = 0;
if (feat == &dt_directory_features)
la->la_mode = S_IFDIR | S_IXUGO;
lsi->lsi_flags |= rc;
/* Add mount line flags that used to be in ldd:
- * writeconf, mgs, iam, anything else?
+ * writeconf, mgs, anything else?
*/
lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_WRITECONF) ?
LDD_F_WRITECONF : 0;
LDD_F_VIRGIN : 0;
lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_MGS) ?
LDD_F_SV_TYPE_MGS : 0;
- lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_IAM) ?
- LDD_F_IAM_DIR : 0;
lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_NO_PRIMNODE) ?
LDD_F_NO_PRIMNODE : 0;
l = &mo->oo_dt.do_lu;
dt_object_init(&mo->oo_dt, NULL, d);
- if (osd_dev(d)->od_iop_mode)
- mo->oo_dt.do_ops = &osd_obj_ea_ops;
- else
- mo->oo_dt.do_ops = &osd_obj_ops;
-
+ mo->oo_dt.do_ops = &osd_obj_ea_ops;
l->lo_ops = &osd_lu_obj_ops;
cfs_init_rwsem(&mo->oo_sem);
cfs_init_rwsem(&mo->oo_ext_idx_sem);
obj->oo_inode = inode;
LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
- if (dev->od_iop_mode) {
- obj->oo_compat_dot_created = 1;
- obj->oo_compat_dotdot_created = 1;
- }
+
+ obj->oo_compat_dot_created = 1;
+ obj->oo_compat_dotdot_created = 1;
if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
GOTO(out, result = 0);
{
int result;
struct osd_thandle *oth;
- struct osd_device *osd = osd_obj2dev(obj);
__u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
LASSERT(S_ISDIR(attr->la_mode));
oth = container_of(th, struct osd_thandle, ot_super);
LASSERT(oth->ot_handle->h_transaction != NULL);
result = osd_mkfile(info, obj, mode, hint, th);
- if (result == 0 && osd->od_iop_mode == 0) {
- LASSERT(obj->oo_inode != NULL);
- /*
- * XXX uh-oh... call low-level iam function directly.
- */
- result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
- sizeof (struct osd_fid_pack),
- oth->ot_handle);
- }
return result;
}
int result;
int skip_iam = 0;
struct osd_object *obj = osd_dt_obj(dt);
- struct osd_device *osd = osd_obj2dev(obj);
LINVRNT(osd_invariant(obj));
LASSERT(dt_object_exists(dt));
if (osd_object_is_root(obj)) {
dt->do_index_ops = &osd_index_ea_ops;
result = 0;
- } else if (feat == &dt_directory_features && osd->od_iop_mode) {
+ } else if (feat == &dt_directory_features) {
dt->do_index_ops = &osd_index_ea_ops;
if (S_ISDIR(obj->oo_inode->i_mode))
result = 0;
}
};
+
/**
* Creates or initializes iterator context.
*
GOTO(out, rc = -EINVAL);
}
- if (lmd_flags & LMD_FLG_IAM) {
- o->od_iop_mode = 0;
- LCONSOLE_WARN("%s: OSD: IAM mode enabled\n", name);
- } else
- o->od_iop_mode = 1;
if (lmd_flags & LMD_FLG_NOSCRUB)
o->od_noscrub = 1;
struct obd_statfs od_statfs;
cfs_spinlock_t od_osfs_lock;
- unsigned int od_iop_mode:1,
- od_noscrub:1;
+ unsigned int od_noscrub:1;
struct fsfilt_operations *od_fsops;
int od_connects;
MDS_OPEN_OWNEROVERRIDE);
LASSERTF(MDS_OPEN_JOIN_FILE == 000400000000UL, "found 0%.11oUL\n",
MDS_OPEN_JOIN_FILE);
- LASSERTF(MDS_CREATE_RMT_ACL == 001000000000UL, "found 0%.11oUL\n",
- MDS_CREATE_RMT_ACL);
- LASSERTF(MDS_CREATE_SLAVE_OBJ == 002000000000UL, "found 0%.11oUL\n",
- MDS_CREATE_SLAVE_OBJ);
LASSERTF(MDS_OPEN_LOCK == 004000000000UL, "found 0%.11oUL\n",
MDS_OPEN_LOCK);
LASSERTF(MDS_OPEN_HAS_EA == 010000000000UL, "found 0%.11oUL\n",
DAEMONSIZE=${DAEMONSIZE:-500}
MDSCOUNT=${MDSCOUNT:-1}
[ $MDSCOUNT -gt 4 ] && MDSCOUNT=4
-[ $MDSCOUNT -gt 1 ] && IAMDIR=yes
for num in $(seq $MDSCOUNT); do
eval mds${num}_HOST=\$\{mds${num}_HOST:-$mds_HOST\}
eval mds${num}failover_HOST=\$\{mds${num}failover_HOST:-$mdsfailover_HOST\}
opts+=${L_GETIDENTITY:+" --param=mdt.identity_upcall=$L_GETIDENTITY"}
if [ $fstype == ldiskfs ]; then
- opts+=${IAMDIR:+" --iam-dir"}
-
fs_mkfs_opts+=${MDSJOURNALSIZE:+" -J size=$MDSJOURNALSIZE"}
fs_mkfs_opts+=${MDSISIZE:+" -i $MDSISIZE"}
fi
printf("Lustre FS: %s\n", ldd->ldd_fsname);
printf("Mount type: %s\n", MT_STR(ldd));
printf("Flags: %#x\n", ldd->ldd_flags);
- printf(" (%s%s%s%s%s%s%s%s%s%s)\n",
+ printf(" (%s%s%s%s%s%s%s%s%s)\n",
IS_MDT(ldd) ? "MDT ":"",
IS_OST(ldd) ? "OST ":"",
IS_MGS(ldd) ? "MGS ":"",
ldd->ldd_flags & LDD_F_VIRGIN ? "first_time ":"",
ldd->ldd_flags & LDD_F_UPDATE ? "update ":"",
ldd->ldd_flags & LDD_F_WRITECONF ? "writeconf ":"",
- ldd->ldd_flags & LDD_F_IAM_DIR ? "IAM_dir_format ":"",
ldd->ldd_flags & LDD_F_NO_PRIMNODE? "no_primnode ":"",
ldd->ldd_flags & LDD_F_UPGRADE14 ? "upgrade1.4 ":"");
printf("Persistent mount opts: %s\n", ldd->ldd_mount_opts);
char **mountopts)
{
static struct option long_opt[] = {
- {"iam-dir", 0, 0, 'a'},
{"backfstype", 1, 0, 'b'},
{"stripe-count-hint", 1, 0, 'c'},
{"comment", 1, 0, 'u'},
while ((opt = getopt_long(argc, argv, optstring, long_opt, &longidx)) !=
EOF) {
switch (opt) {
- case 'a': {
- if (IS_MDT(&mop->mo_ldd))
- mop->mo_ldd.ldd_flags |= LDD_F_IAM_DIR;
- break;
- }
case 'b': {
int i = 0;
while (i < LDD_MT_LAST) {
append_option(options, "virgin");
if (ldd->ldd_flags & LDD_F_WRITECONF)
append_option(options, "writeconf");
- if (ldd->ldd_flags & LDD_F_IAM_DIR)
- append_option(options, "iam");
if (ldd->ldd_flags & LDD_F_NO_PRIMNODE)
append_option(options, "noprimnode");
CHECK_VALUE_O(MDS_OPEN_DELAY_CREATE);
CHECK_VALUE_O(MDS_OPEN_OWNEROVERRIDE);
CHECK_VALUE_O(MDS_OPEN_JOIN_FILE);
- CHECK_VALUE_O(MDS_CREATE_RMT_ACL);
- CHECK_VALUE_O(MDS_CREATE_SLAVE_OBJ);
CHECK_VALUE_O(MDS_OPEN_LOCK);
CHECK_VALUE_O(MDS_OPEN_HAS_EA);
CHECK_VALUE_O(MDS_OPEN_HAS_OBJS);
MDS_OPEN_OWNEROVERRIDE);
LASSERTF(MDS_OPEN_JOIN_FILE == 000400000000UL, "found 0%.11oUL\n",
MDS_OPEN_JOIN_FILE);
- LASSERTF(MDS_CREATE_RMT_ACL == 001000000000UL, "found 0%.11oUL\n",
- MDS_CREATE_RMT_ACL);
- LASSERTF(MDS_CREATE_SLAVE_OBJ == 002000000000UL, "found 0%.11oUL\n",
- MDS_CREATE_SLAVE_OBJ);
LASSERTF(MDS_OPEN_LOCK == 004000000000UL, "found 0%.11oUL\n",
MDS_OPEN_LOCK);
LASSERTF(MDS_OPEN_HAS_EA == 010000000000UL, "found 0%.11oUL\n",