Whamcloud - gitweb
LU-2216 mdt: remove obsolete DNE code
authorwangdi <di.wang@whamcloud.com>
Sat, 27 Oct 2012 22:05:56 +0000 (15:05 -0700)
committerOleg Drokin <green@whamcloud.com>
Mon, 29 Oct 2012 05:25:09 +0000 (01:25 -0400)
1. remove split checking and cross-ref code from DNE.
2. remove IAM code on ldiskfs and utils.
3. remove cmm directory.

Change-Id: I0c81d753462863706e8918393369dde94a45030c
Signed-off-by: Wang Di <di.wang@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/4353
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
31 files changed:
lustre/Makefile.in
lustre/autoMakefile.am
lustre/autoconf/lustre-core.m4
lustre/cmm/Makefile.in [deleted file]
lustre/cmm/autoMakefile.am [deleted file]
lustre/cmm/cmm_device.c [deleted file]
lustre/cmm/cmm_internal.h [deleted file]
lustre/cmm/cmm_lproc.c [deleted file]
lustre/cmm/cmm_object.c [deleted file]
lustre/cmm/cmm_split.c [deleted file]
lustre/cmm/mdc_device.c [deleted file]
lustre/cmm/mdc_internal.h [deleted file]
lustre/cmm/mdc_object.c [deleted file]
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_disk.h
lustre/include/md_object.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/obdclass/md_local_object.c
lustre/obdclass/obd_mount.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/ptlrpc/wiretest.c
lustre/tests/cfg/local.sh
lustre/tests/test-framework.sh
lustre/utils/mkfs_lustre.c
lustre/utils/mount_lustre.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index b3f6f60..ae8c508 100644 (file)
@@ -5,7 +5,7 @@ subdir-m += ptlrpc
 subdir-m += obdecho
 subdir-m += mgc
 
-@SERVER_TRUE@subdir-m += mds ost mgs mdt cmm mdd ofd quota osp lod
+@SERVER_TRUE@subdir-m += mds ost mgs mdt mdd ofd quota osp lod
 @CLIENT_TRUE@subdir-m += lov osc mdc lmv llite fld
 @LDISKFS_ENABLED_TRUE@subdir-m += osd-ldiskfs
 @ZFS_ENABLED_TRUE@subdir-m += osd-zfs
index 728b4c2..dafe8c8 100644 (file)
@@ -42,7 +42,7 @@ AUTOMAKE_OPTIONS = foreign
 ALWAYS_SUBDIRS = include lvfs obdclass ldlm ptlrpc obdecho \
        mgc fid fld doc utils tests scripts autoconf contrib conf
 
-SERVER_SUBDIRS = ost mds mgs mdt cmm mdd ofd osd-zfs osd-ldiskfs \
+SERVER_SUBDIRS = ost mds mgs mdt mdd ofd osd-zfs osd-ldiskfs \
        quota osp lod target
 
 CLIENT_SUBDIRS = mdc lmv llite lclient lov osc
index b771c53..9798be0 100644 (file)
@@ -2520,8 +2520,6 @@ lustre/mds/Makefile
 lustre/mds/autoMakefile
 lustre/mdt/Makefile
 lustre/mdt/autoMakefile
-lustre/cmm/Makefile
-lustre/cmm/autoMakefile
 lustre/mdd/Makefile
 lustre/mdd/autoMakefile
 lustre/fld/Makefile
diff --git a/lustre/cmm/Makefile.in b/lustre/cmm/Makefile.in
deleted file mode 100644 (file)
index befc9ea..0000000
+++ /dev/null
@@ -1,6 +0,0 @@
-MODULES := cmm
-cmm-objs := cmm_device.o cmm_object.o cmm_lproc.o mdc_device.o mdc_object.o
-
-@SPLIT_TRUE@cmm-objs += cmm_split.o 
-
-@INCLUDE_RULES@
diff --git a/lustre/cmm/autoMakefile.am b/lustre/cmm/autoMakefile.am
deleted file mode 100644 (file)
index e7355f8..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# GPL HEADER START
-#
-# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 only,
-# as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# General Public License version 2 for more details (a copy is included
-# in the LICENSE file that accompanied this code).
-#
-# You should have received a copy of the GNU General Public License
-# version 2 along with this program; If not, see
-# http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
-#
-# Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
-# CA 95054 USA or visit www.sun.com if you need additional information or
-# have any questions.
-#
-# GPL HEADER END
-#
-
-#
-# Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
-# Use is subject to license terms.
-#
-
-#
-# This file is part of Lustre, http://www.lustre.org/
-# Lustre is a trademark of Sun Microsystems, Inc.
-#
-
-if MODULES
-modulefs_DATA = cmm$(KMODEXT)
-endif
-
-MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ 
-EXTRA_DIST = $(cmm-objs:%.o=%.c) cmm_internal.h mdc_internal.h
diff --git a/lustre/cmm/cmm_device.c b/lustre/cmm/cmm_device.c
deleted file mode 100644 (file)
index a3c9397..0000000
+++ /dev/null
@@ -1,627 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, Whamcloud, Inc.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_device.c
- *
- * Lustre Cluster Metadata Manager (cmm)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-/**
- * \addtogroup cmm
- * @{
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <linux/module.h>
-
-#include <obd.h>
-#include <obd_class.h>
-#include <lprocfs_status.h>
-#include <lustre_ver.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-
-struct obd_ops cmm_obd_device_ops = {
-        .o_owner           = THIS_MODULE
-};
-
-static const struct lu_device_operations cmm_lu_ops;
-
-static inline int lu_device_is_cmm(struct lu_device *d)
-{
-        return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
-}
-
-int cmm_root_get(const struct lu_env *env, struct md_device *md,
-                 struct lu_fid *fid)
-{
-        struct cmm_device *cmm_dev = md2cmm_dev(md);
-        /* valid only on master MDS */
-        if (cmm_dev->cmm_local_num == 0)
-                return cmm_child_ops(cmm_dev)->mdo_root_get(env,
-                                     cmm_dev->cmm_child, fid);
-        else
-                return -EINVAL;
-}
-
-static int cmm_statfs(const struct lu_env *env, struct md_device *md,
-                      struct obd_statfs *sfs)
-{
-        struct cmm_device *cmm_dev = md2cmm_dev(md);
-        int rc;
-
-        ENTRY;
-        rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
-                                                cmm_dev->cmm_child, sfs);
-        RETURN (rc);
-}
-
-static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
-                           int *md_size, int *cookie_size)
-{
-        struct cmm_device *cmm_dev = md2cmm_dev(md);
-        int rc;
-        ENTRY;
-        rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
-                                                     md_size, cookie_size);
-        RETURN(rc);
-}
-
-static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
-                              int mode , unsigned long timeout, __u32 alg,
-                              struct lustre_capa_key *keys)
-{
-        struct cmm_device *cmm_dev = md2cmm_dev(md);
-        int rc;
-        ENTRY;
-        LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
-        rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
-                                                        mode, timeout, alg,
-                                                        keys);
-        RETURN(rc);
-}
-
-static int cmm_update_capa_key(const struct lu_env *env,
-                               struct md_device *md,
-                               struct lustre_capa_key *key)
-{
-        struct cmm_device *cmm_dev = md2cmm_dev(md);
-        int rc;
-        ENTRY;
-        rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
-                                                         cmm_dev->cmm_child,
-                                                         key);
-        RETURN(rc);
-}
-
-static int cmm_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
-                             int idx, void **h)
-{
-        struct cmm_device *cmm_dev = md2cmm_dev(m);
-        int rc;
-        ENTRY;
-
-        rc = cmm_child_ops(cmm_dev)->mdo_llog_ctxt_get(env, cmm_dev->cmm_child,
-                                                       idx, h);
-        RETURN(rc);
-}
-
-int cmm_iocontrol(const struct lu_env *env, struct md_device *m,
-                  unsigned int cmd, int len, void *data)
-{
-        struct md_device *next = md2cmm_dev(m)->cmm_child;
-        int rc;
-
-        ENTRY;
-        rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
-        RETURN(rc);
-}
-
-
-static const struct md_device_operations cmm_md_ops = {
-        .mdo_statfs          = cmm_statfs,
-        .mdo_root_get        = cmm_root_get,
-        .mdo_maxsize_get     = cmm_maxsize_get,
-        .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
-        .mdo_update_capa_key = cmm_update_capa_key,
-        .mdo_llog_ctxt_get   = cmm_llog_ctxt_get,
-        .mdo_iocontrol       = cmm_iocontrol,
-};
-
-extern struct lu_device_type mdc_device_type;
-/**
- * Init MDC.
- */
-static int cmm_post_init_mdc(const struct lu_env *env,
-                             struct cmm_device *cmm)
-{
-        int max_mdsize, max_cookiesize, rc;
-        struct mdc_device *mc, *tmp;
-
-        /* get the max mdsize and cookiesize from lower layer */
-        rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
-                             &max_cookiesize);
-        if (rc)
-                RETURN(rc);
-
-        cfs_spin_lock(&cmm->cmm_tgt_guard);
-        cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
-                                     mc_linkage) {
-                cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
-        }
-        cfs_spin_unlock(&cmm->cmm_tgt_guard);
-        RETURN(rc);
-}
-
-/* --- cmm_lu_operations --- */
-/* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
-static int cmm_add_mdc(const struct lu_env *env,
-                       struct cmm_device *cm, struct lustre_cfg *cfg)
-{
-        struct lu_device_type *ldt = &mdc_device_type;
-        char *p, *num = lustre_cfg_string(cfg, 2);
-        struct mdc_device *mc, *tmp;
-        struct lu_fld_target target;
-        struct lu_device *ld;
-        struct lu_device *cmm_lu = cmm2lu_dev(cm);
-        mdsno_t mdc_num;
-        struct lu_site *site = cmm2lu_dev(cm)->ld_site;
-        int rc;
-        ENTRY;
-
-        /* find out that there is no such mdc */
-        LASSERT(num);
-        mdc_num = simple_strtol(num, &p, 10);
-        if (*p) {
-                CERROR("Invalid index in lustre_cgf, offset 2\n");
-                RETURN(-EINVAL);
-        }
-
-        cfs_spin_lock(&cm->cmm_tgt_guard);
-        cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
-                                     mc_linkage) {
-                if (mc->mc_num == mdc_num) {
-                        cfs_spin_unlock(&cm->cmm_tgt_guard);
-                        RETURN(-EEXIST);
-                }
-        }
-        cfs_spin_unlock(&cm->cmm_tgt_guard);
-        ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
-        if (IS_ERR(ld))
-                RETURN(PTR_ERR(ld));
-
-        ld->ld_site = site;
-
-        rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
-        if (rc) {
-                ldt->ldt_ops->ldto_device_free(env, ld);
-                RETURN(rc);
-        }
-        /* pass config to the just created MDC */
-        rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
-        if (rc) {
-                ldt->ldt_ops->ldto_device_fini(env, ld);
-                ldt->ldt_ops->ldto_device_free(env, ld);
-                RETURN(rc);
-        }
-
-        cfs_spin_lock(&cm->cmm_tgt_guard);
-        cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
-                                     mc_linkage) {
-                if (mc->mc_num == mdc_num) {
-                        cfs_spin_unlock(&cm->cmm_tgt_guard);
-                        ldt->ldt_ops->ldto_device_fini(env, ld);
-                        ldt->ldt_ops->ldto_device_free(env, ld);
-                        RETURN(-EEXIST);
-                }
-        }
-        mc = lu2mdc_dev(ld);
-        cfs_list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
-        cm->cmm_tgt_count++;
-        cfs_spin_unlock(&cm->cmm_tgt_guard);
-
-        lu_device_get(cmm_lu);
-        lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);
-
-        target.ft_srv = NULL;
-        target.ft_idx = mc->mc_num;
-        target.ft_exp = mc->mc_desc.cl_exp;
-        fld_client_add_target(cm->cmm_fld, &target);
-
-        if (mc->mc_num == 0) {
-                /* this is mdt0 -> mc export, fld lookup need this export
-                   to forward fld lookup request. */
-                LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
-                lu_site2md(site)->ms_server_fld->lsf_control_exp =
-                                          mc->mc_desc.cl_exp;
-        }
-        /* Set max md size for the mdc. */
-        rc = cmm_post_init_mdc(env, cm);
-        RETURN(rc);
-}
-
-static void cmm_device_shutdown(const struct lu_env *env,
-                                struct cmm_device *cm,
-                                struct lustre_cfg *cfg)
-{
-        struct mdc_device *mc, *tmp;
-        ENTRY;
-
-        /* Remove local target from FLD. */
-        fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);
-
-        /* Finish all mdc devices. */
-        cfs_spin_lock(&cm->cmm_tgt_guard);
-        cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
-                struct lu_device *ld_m = mdc2lu_dev(mc);
-                fld_client_del_target(cm->cmm_fld, mc->mc_num);
-                ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
-        }
-        cfs_spin_unlock(&cm->cmm_tgt_guard);
-
-        /* remove upcall device*/
-        md_upcall_fini(&cm->cmm_md_dev);
-
-        EXIT;
-}
-
-static int cmm_device_mount(const struct lu_env *env,
-                            struct cmm_device *m, struct lustre_cfg *cfg)
-{
-        const char *index = lustre_cfg_string(cfg, 2);
-        char *p;
-
-        LASSERT(index != NULL);
-
-        m->cmm_local_num = simple_strtol(index, &p, 10);
-        if (*p) {
-                CERROR("Invalid index in lustre_cgf\n");
-                RETURN(-EINVAL);
-        }
-
-        RETURN(0);
-}
-
-static int cmm_process_config(const struct lu_env *env,
-                              struct lu_device *d, struct lustre_cfg *cfg)
-{
-        struct cmm_device *m = lu2cmm_dev(d);
-        struct lu_device *next = md2lu_dev(m->cmm_child);
-        int err;
-        ENTRY;
-
-        switch(cfg->lcfg_command) {
-        case LCFG_ADD_MDC:
-                /* On first ADD_MDC add also local target. */
-                if (!(m->cmm_flags & CMM_INITIALIZED)) {
-                        struct lu_site *ls = cmm2lu_dev(m)->ld_site;
-                        struct lu_fld_target target;
-
-                        target.ft_srv = lu_site2md(ls)->ms_server_fld;
-                        target.ft_idx = m->cmm_local_num;
-                        target.ft_exp = NULL;
-
-                        fld_client_add_target(m->cmm_fld, &target);
-                }
-                err = cmm_add_mdc(env, m, cfg);
-
-                /* The first ADD_MDC can be counted as setup is finished. */
-                if (!(m->cmm_flags & CMM_INITIALIZED))
-                        m->cmm_flags |= CMM_INITIALIZED;
-
-                break;
-        case LCFG_SETUP:
-        {
-                /* lower layers should be set up at first */
-                err = next->ld_ops->ldo_process_config(env, next, cfg);
-                if (err == 0)
-                        err = cmm_device_mount(env, m, cfg);
-                break;
-        }
-        case LCFG_CLEANUP:
-        {
-               lu_dev_del_linkage(d->ld_site, d);
-                cmm_device_shutdown(env, m, cfg);
-        }
-        default:
-                err = next->ld_ops->ldo_process_config(env, next, cfg);
-        }
-        RETURN(err);
-}
-
-static int cmm_recovery_complete(const struct lu_env *env,
-                                 struct lu_device *d)
-{
-        struct cmm_device *m = lu2cmm_dev(d);
-        struct lu_device *next = md2lu_dev(m->cmm_child);
-        int rc;
-        ENTRY;
-        rc = next->ld_ops->ldo_recovery_complete(env, next);
-        RETURN(rc);
-}
-
-static int cmm_prepare(const struct lu_env *env,
-                       struct lu_device *pdev,
-                       struct lu_device *dev)
-{
-        struct cmm_device *cmm = lu2cmm_dev(dev);
-        struct lu_device *next = md2lu_dev(cmm->cmm_child);
-        int rc;
-
-        ENTRY;
-        rc = next->ld_ops->ldo_prepare(env, dev, next);
-        RETURN(rc);
-}
-
-static const struct lu_device_operations cmm_lu_ops = {
-        .ldo_object_alloc      = cmm_object_alloc,
-        .ldo_process_config    = cmm_process_config,
-        .ldo_recovery_complete = cmm_recovery_complete,
-        .ldo_prepare           = cmm_prepare,
-};
-
-/* --- lu_device_type operations --- */
-int cmm_upcall(const struct lu_env *env, struct md_device *md,
-               enum md_upcall_event ev, void *data)
-{
-        int rc;
-        ENTRY;
-
-        switch (ev) {
-                case MD_LOV_SYNC:
-                        rc = cmm_post_init_mdc(env, md2cmm_dev(md));
-                        if (rc)
-                                CERROR("can not init md size %d\n", rc);
-                        /* fall through */
-                default:
-                        rc = md_do_upcall(env, md, ev, data);
-        }
-        RETURN(rc);
-}
-
-static struct lu_device *cmm_device_free(const struct lu_env *env,
-                                         struct lu_device *d)
-{
-        struct cmm_device *m = lu2cmm_dev(d);
-        struct lu_device  *next = md2lu_dev(m->cmm_child);
-        ENTRY;
-
-        LASSERT(m->cmm_tgt_count == 0);
-        LASSERT(cfs_list_empty(&m->cmm_targets));
-        if (m->cmm_fld != NULL) {
-                OBD_FREE_PTR(m->cmm_fld);
-                m->cmm_fld = NULL;
-        }
-        md_device_fini(&m->cmm_md_dev);
-        OBD_FREE_PTR(m);
-        RETURN(next);
-}
-
-static struct lu_device *cmm_device_alloc(const struct lu_env *env,
-                                          struct lu_device_type *t,
-                                          struct lustre_cfg *cfg)
-{
-        struct lu_device  *l;
-        struct cmm_device *m;
-        ENTRY;
-
-        OBD_ALLOC_PTR(m);
-        if (m == NULL) {
-                l = ERR_PTR(-ENOMEM);
-        } else {
-                md_device_init(&m->cmm_md_dev, t);
-                m->cmm_md_dev.md_ops = &cmm_md_ops;
-                md_upcall_init(&m->cmm_md_dev, cmm_upcall);
-                l = cmm2lu_dev(m);
-                l->ld_ops = &cmm_lu_ops;
-
-                OBD_ALLOC_PTR(m->cmm_fld);
-                if (!m->cmm_fld) {
-                        cmm_device_free(env, l);
-                        l = ERR_PTR(-ENOMEM);
-                }
-        }
-        RETURN(l);
-}
-
-/* context key constructor/destructor: cmm_key_init, cmm_key_fini */
-LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);
-
-/* context key: cmm_thread_key */
-LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);
-
-struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
-{
-        struct cmm_thread_info *info;
-
-        info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
-        LASSERT(info != NULL);
-        return info;
-}
-
-/* type constructor/destructor: cmm_type_init/cmm_type_fini */
-LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);
-
-/* 
- * Kludge code : it should be moved mdc_device.c if mdc_(mds)_device
- * is really stacked.
- */
-static int __cmm_type_init(struct lu_device_type *t)
-{
-        int rc;
-        rc = lu_device_type_init(&mdc_device_type);
-        if (rc == 0) {
-                rc = cmm_type_init(t);
-                if (rc)
-                        lu_device_type_fini(&mdc_device_type);
-        }
-        return rc;
-}
-
-static void __cmm_type_fini(struct lu_device_type *t)
-{
-        lu_device_type_fini(&mdc_device_type);
-        cmm_type_fini(t);
-}
-
-static void __cmm_type_start(struct lu_device_type *t)
-{
-        mdc_device_type.ldt_ops->ldto_start(&mdc_device_type);
-        cmm_type_start(t);
-}
-
-static void __cmm_type_stop(struct lu_device_type *t)
-{
-        mdc_device_type.ldt_ops->ldto_stop(&mdc_device_type);
-        cmm_type_stop(t);
-}
-
-static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
-                           const char *name, struct lu_device *next)
-{
-        struct cmm_device *m = lu2cmm_dev(d);
-        struct lu_site *ls;
-        int err = 0;
-        ENTRY;
-
-        cfs_spin_lock_init(&m->cmm_tgt_guard);
-        CFS_INIT_LIST_HEAD(&m->cmm_targets);
-        m->cmm_tgt_count = 0;
-        m->cmm_child = lu2md_dev(next);
-
-        err = fld_client_init(m->cmm_fld, name,
-                              LUSTRE_CLI_FLD_HASH_DHT);
-        if (err) {
-                CERROR("Can't init FLD, err %d\n", err);
-                RETURN(err);
-        }
-
-        /* Assign site's fld client ref, needed for asserts in osd. */
-        ls = cmm2lu_dev(m)->ld_site;
-        lu_site2md(ls)->ms_client_fld = m->cmm_fld;
-        err = cmm_procfs_init(m, name);
-
-        RETURN(err);
-}
-
-static struct lu_device *cmm_device_fini(const struct lu_env *env,
-                                         struct lu_device *ld)
-{
-        struct cmm_device *cm = lu2cmm_dev(ld);
-        struct mdc_device *mc, *tmp;
-        struct lu_site *ls;
-        ENTRY;
-
-        /* Finish all mdc devices */
-        cfs_spin_lock(&cm->cmm_tgt_guard);
-        cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
-                struct lu_device *ld_m = mdc2lu_dev(mc);
-                struct lu_device *ld_c = cmm2lu_dev(cm);
-
-                cfs_list_del_init(&mc->mc_linkage);
-                lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
-                lu_device_put(ld_c);
-                ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
-                ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
-                cm->cmm_tgt_count--;
-        }
-        cfs_spin_unlock(&cm->cmm_tgt_guard);
-
-        fld_client_proc_fini(cm->cmm_fld);
-        fld_client_fini(cm->cmm_fld);
-        ls = cmm2lu_dev(cm)->ld_site;
-        lu_site2md(ls)->ms_client_fld = NULL;
-        cmm_procfs_fini(cm);
-
-        RETURN (md2lu_dev(cm->cmm_child));
-}
-
-static struct lu_device_type_operations cmm_device_type_ops = {
-        .ldto_init = __cmm_type_init,
-        .ldto_fini = __cmm_type_fini,
-
-        .ldto_start = __cmm_type_start,
-        .ldto_stop  = __cmm_type_stop,
-
-        .ldto_device_alloc = cmm_device_alloc,
-        .ldto_device_free  = cmm_device_free,
-
-        .ldto_device_init = cmm_device_init,
-        .ldto_device_fini = cmm_device_fini
-};
-
-static struct lu_device_type cmm_device_type = {
-        .ldt_tags     = LU_DEVICE_MD,
-        .ldt_name     = LUSTRE_CMM_NAME,
-        .ldt_ops      = &cmm_device_type_ops,
-        .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
-};
-
-struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
-        { 0 }
-};
-
-struct lprocfs_vars lprocfs_cmm_module_vars[] = {
-        { 0 }
-};
-
-static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
-{
-    lvars->module_vars  = lprocfs_cmm_module_vars;
-    lvars->obd_vars     = lprocfs_cmm_obd_vars;
-}
-/** @} */
-
-static int __init cmm_mod_init(void)
-{
-        struct lprocfs_static_vars lvars;
-
-        lprocfs_cmm_init_vars(&lvars);
-        return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
-                                   LUSTRE_CMM_NAME, &cmm_device_type);
-}
-
-static void __exit cmm_mod_exit(void)
-{
-        class_unregister_type(LUSTRE_CMM_NAME);
-}
-
-MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
-MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
-MODULE_LICENSE("GPL");
-
-cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);
diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h
deleted file mode 100644 (file)
index 758c1e1..0000000
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_internal.h
- *
- * Lustre Cluster Metadata Manager (cmm)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-/**
- * \defgroup cmm cmm
- * Cluster Metadata Manager.
- * Implementation of md interface to provide operation between MDS-es.
- *
- * CMM has two set of md methods, one is for local operation which uses MDD,
- * second is for remote operations to call different MDS.
- * When object is allocated then proper set of methods are set for it. *
- * @{
- */
-#ifndef _CMM_INTERNAL_H
-#define _CMM_INTERNAL_H
-
-#if defined(__KERNEL__)
-
-#include <obd.h>
-#include <lustre_fld.h>
-#include <md_object.h>
-#include <lustre_acl.h>
-
-/**
- * md_device extention for CMM layer.
- */
-struct cmm_device {
-        /** md_device instance */
-        struct md_device        cmm_md_dev;
-        /** device flags, taken from enum cmm_flags */
-        __u32                   cmm_flags;
-        /** underlaying device in MDS stack, usually MDD */
-        struct md_device       *cmm_child;
-        /** FLD client to talk to FLD */
-        struct lu_client_fld   *cmm_fld;
-        /** Number of this MDS server in cluster */
-        mdsno_t                 cmm_local_num;
-        /** Total number of other MDS */
-        __u32                   cmm_tgt_count;
-        /** linked list of all remove MDS clients */
-        cfs_list_t              cmm_targets;
-        /** lock for cmm_device::cmm_targets operations */
-        cfs_spinlock_t          cmm_tgt_guard;
-        /** /proc entry with CMM data */
-        cfs_proc_dir_entry_t   *cmm_proc_entry;
-        /** CMM statistic */
-        struct lprocfs_stats   *cmm_stats;
-};
-
-/**
- * CMM flags
- */
-enum cmm_flags {
-        /** Device initialization complete. */
-        CMM_INITIALIZED = 1 << 0
-};
-
-/**
- * Wrapper for md_device::md_ops of underlaying device cmm_child.
- */
-static inline const struct md_device_operations *
-cmm_child_ops(struct cmm_device *d)
-{
-        return d->cmm_child->md_ops;
-}
-
-/** Wrapper to get cmm_device by contained md_device */
-static inline struct cmm_device *md2cmm_dev(struct md_device *m)
-{
-        return container_of0(m, struct cmm_device, cmm_md_dev);
-}
-
-/** Wrapper to get cmm_device by contained lu_device */
-static inline struct cmm_device *lu2cmm_dev(struct lu_device *d)
-{
-        return container_of0(d, struct cmm_device, cmm_md_dev.md_lu_dev);
-}
-
-/** Wrapper to get lu_device from cmm_device */
-static inline struct lu_device *cmm2lu_dev(struct cmm_device *d)
-{
-        return (&d->cmm_md_dev.md_lu_dev);
-}
-
-#ifdef HAVE_SPLIT_SUPPORT
-/**
- * \ingroup split
- * States of split operation.
- */
-enum cmm_split_state {
-        CMM_SPLIT_UNKNOWN,
-        CMM_SPLIT_NONE,
-        CMM_SPLIT_NEEDED,
-        CMM_SPLIT_DONE,
-        CMM_SPLIT_DENIED
-};
-#endif
-
-/** CMM container for md_object. */
-struct cmm_object {
-        struct md_object cmo_obj;
-};
-
-/**
- * \defgroup cml cml
- * CMM local operations.
- * @{
- */
-/**
- * Local CMM object.
- */
-struct cml_object {
-        /** cmm_object instance. */
-        struct cmm_object    cmm_obj;
-#ifdef HAVE_SPLIT_SUPPORT
-        /** split state of object (for dirs only) */
-        enum cmm_split_state clo_split;
-#endif
-};
-/** @} */
-
-/**
- * \defgroup cmr cmr
- * CMM remote operations.
- * @{
- */
-/**
- * Remote CMM object.
- */
-struct cmr_object {
-        /** cmm_object instance */
-        struct cmm_object cmm_obj;
-        /** mds number where object is placed */
-        mdsno_t           cmo_num;
-};
-/** @} */
-
-enum {
-        CMM_SPLIT_PAGE_COUNT = 1
-};
-
-/**
- * CMM thread info.
- * This is storage for all variables used in CMM functions and it is needed as
- * stack replacement.
- */
-struct cmm_thread_info {
-        struct md_attr        cmi_ma;
-        struct lu_buf         cmi_buf;
-        struct lu_fid         cmi_fid; /* used for le/cpu conversions */
-        struct lu_rdpg        cmi_rdpg;
-        /** pointers to pages for readpage. */
-        struct page          *cmi_pages[CMM_SPLIT_PAGE_COUNT];
-        struct md_op_spec     cmi_spec;
-        struct lmv_stripe_md  cmi_lmv;
-        char                  cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
-        /** Ops object filename */
-        struct lu_name        cti_name;
-};
-
-/**
- * Wrapper to get cmm_device from cmm_object.
- */
-static inline struct cmm_device *cmm_obj2dev(struct cmm_object *c)
-{
-        return (md2cmm_dev(md_obj2dev(&c->cmo_obj)));
-}
-
-/**
- * Get cmm_device from related lu_device.
- */
-static inline struct cmm_object *lu2cmm_obj(struct lu_object *o)
-{
-        //LASSERT(lu_device_is_cmm(o->lo_dev));
-        return container_of0(o, struct cmm_object, cmo_obj.mo_lu);
-}
-
-/**
- * Get cmm object from corresponding md_object.
- */
-static inline struct cmm_object *md2cmm_obj(struct md_object *o)
-{
-        return container_of0(o, struct cmm_object, cmo_obj);
-}
-
-/**
- * Get next lower-layer md_object from current cmm_object.
- */
-static inline struct md_object *cmm2child_obj(struct cmm_object *o)
-{
-        return (o ? lu2md(lu_object_next(&o->cmo_obj.mo_lu)) : NULL);
-}
-
-/**
- * Extract lu_fid from corresponding cmm_object.
- */
-static inline struct lu_fid* cmm2fid(struct cmm_object *obj)
-{
-       return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
-}
-
-struct cmm_thread_info *cmm_env_info(const struct lu_env *env);
-struct lu_object *cmm_object_alloc(const struct lu_env *env,
-                                   const struct lu_object_header *hdr,
-                                   struct lu_device *);
-/**
- * \addtogroup cml
- * @{
- */
-/** Get cml_object from lu_object. */
-static inline struct cml_object *lu2cml_obj(struct lu_object *o)
-{
-        return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
-}
-/** Get cml_object from md_object. */
-static inline struct cml_object *md2cml_obj(struct md_object *mo)
-{
-        return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
-}
-/** Get cml_object from cmm_object */
-static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
-{
-        return container_of0(co, struct cml_object, cmm_obj);
-}
-/** @} */
-
-int cmm_upcall(const struct lu_env *env, struct md_device *md,
-               enum md_upcall_event ev, void *data);
-
-#ifdef HAVE_SPLIT_SUPPORT
-/**
- * \defgroup split split
- * @{
- */
-
-#define CMM_MD_SIZE(stripes)  (sizeof(struct lmv_stripe_md) +  \
-                               (stripes) * sizeof(struct lu_fid))
-
-/** Get and initialize lu_bug from cmm_thread_info. */
-static inline struct lu_buf *cmm_buf_get(const struct lu_env *env,
-                                         void *area, ssize_t len)
-{
-        struct lu_buf *buf;
-
-        buf = &cmm_env_info(env)->cmi_buf;
-        buf->lb_buf = area;
-        buf->lb_len = len;
-        return buf;
-}
-
-int cmm_split_check(const struct lu_env *env, struct md_object *mp,
-                    const char *name);
-int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
-                     struct md_attr *ma, int *split);
-int cmm_split_dir(const struct lu_env *env, struct md_object *mo);
-int cmm_split_access(const struct lu_env *env, struct md_object *mo,
-                     mdl_mode_t lm);
-/** @} */
-#endif
-
-int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
-                   mdsno_t *mds, const struct lu_env *env);
-
-int cmm_procfs_init(struct cmm_device *cmm, const char *name);
-int cmm_procfs_fini(struct cmm_device *cmm);
-
-void cmm_lprocfs_time_start(const struct lu_env *env);
-void cmm_lprocfs_time_end(const struct lu_env *env, struct cmm_device *cmm,
-                          int idx);
-/**
- * CMM counters.
- */
-enum {
-        LPROC_CMM_SPLIT_CHECK,
-        LPROC_CMM_SPLIT,
-        LPROC_CMM_LOOKUP,
-        LPROC_CMM_CREATE,
-        LPROC_CMM_NR
-};
-
-#endif /* __KERNEL__ */
-#endif /* _CMM_INTERNAL_H */
-/** @} */
diff --git a/lustre/cmm/cmm_lproc.c b/lustre/cmm/cmm_lproc.c
deleted file mode 100644 (file)
index f864cc7..0000000
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_lproc.c
- *
- * CMM lprocfs stuff
- *
- * Author: Wang Di      <wangdi@clusterfs.com>
- * Author: Yury Umanets <umka@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <linux/module.h>
-#include <obd.h>
-#include <obd_class.h>
-#include <lustre_ver.h>
-#include <obd_support.h>
-#include <lprocfs_status.h>
-#include <lu_time.h>
-
-#include <lustre/lustre_idl.h>
-
-#include "cmm_internal.h"
-/**
- * \addtogroup cmm
- * @{
- */
-static const char *cmm_counter_names[LPROC_CMM_NR] = {
-        [LPROC_CMM_SPLIT_CHECK] = "split_check",
-        [LPROC_CMM_SPLIT]       = "split",
-        [LPROC_CMM_LOOKUP]      = "lookup",
-        [LPROC_CMM_CREATE]      = "create"
-};
-
-int cmm_procfs_init(struct cmm_device *cmm, const char *name)
-{
-        struct lu_device    *ld = &cmm->cmm_md_dev.md_lu_dev;
-        struct obd_type     *type;
-        int                  rc;
-        ENTRY;
-
-        type = ld->ld_type->ldt_obd_type;
-
-        LASSERT(name != NULL);
-        LASSERT(type != NULL);
-
-        /* Find the type procroot and add the proc entry for this device. */
-        cmm->cmm_proc_entry = lprocfs_register(name, type->typ_procroot,
-                                               NULL, NULL);
-        if (IS_ERR(cmm->cmm_proc_entry)) {
-                rc = PTR_ERR(cmm->cmm_proc_entry);
-                CERROR("Error %d setting up lprocfs for %s\n",
-                       rc, name);
-                cmm->cmm_proc_entry = NULL;
-                GOTO(out, rc);
-        }
-
-        rc = lu_time_init(&cmm->cmm_stats,
-                          cmm->cmm_proc_entry,
-                          cmm_counter_names, ARRAY_SIZE(cmm_counter_names));
-
-        EXIT;
-out:
-        if (rc)
-               cmm_procfs_fini(cmm);
-       return rc;
-}
-
-int cmm_procfs_fini(struct cmm_device *cmm)
-{
-        if (cmm->cmm_stats)
-                lu_time_fini(&cmm->cmm_stats);
-
-        if (cmm->cmm_proc_entry) {
-                 lprocfs_remove(&cmm->cmm_proc_entry);
-                 cmm->cmm_proc_entry = NULL;
-        }
-        RETURN(0);
-}
-
-void cmm_lprocfs_time_start(const struct lu_env *env)
-{
-        lu_lprocfs_time_start(env);
-}
-
-void cmm_lprocfs_time_end(const struct lu_env *env, struct cmm_device *cmm,
-                          int idx)
-{
-        lu_lprocfs_time_end(env, cmm->cmm_stats, idx);
-}
-/** @} */
diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c
deleted file mode 100644 (file)
index 01e473a..0000000
+++ /dev/null
@@ -1,1548 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2012, Whamcloud, Inc.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_object.c
- *
- * Lustre Cluster Metadata Manager (cmm)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <lustre_fid.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-/**
- * \ingroup cmm
- * Lookup MDS number \a mds by FID \a fid.
- *
- * \param fid FID of object to find MDS
- * \param mds mds number to return.
- */
-int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
-                   mdsno_t *mds, const struct lu_env *env)
-{
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(fid_is_sane(fid));
-
-        rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
-                               LU_SEQ_RANGE_MDT, env);
-        if (rc) {
-                CERROR("Can't find mds by seq "LPX64", rc %d\n",
-                       fid_seq(fid), rc);
-                RETURN(rc);
-        }
-
-        if (*mds > cm->cmm_tgt_count) {
-                CERROR("Got invalid mdsno: %x (max: %x)\n",
-                       *mds, cm->cmm_tgt_count);
-                rc = -EINVAL;
-        } else {
-                CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
-                       LPX64"\n", *mds, fid_seq(fid));
-        }
-
-        RETURN (rc);
-}
-
-/**
- * \addtogroup cml
- * @{
- */
-static const struct md_object_operations cml_mo_ops;
-static const struct md_dir_operations    cml_dir_ops;
-static const struct lu_object_operations cml_obj_ops;
-
-static const struct md_object_operations cmr_mo_ops;
-static const struct md_dir_operations    cmr_dir_ops;
-static const struct lu_object_operations cmr_obj_ops;
-
-/**
- * \ingroup cmm
- * Allocate CMM object.
- */
-struct lu_object *cmm_object_alloc(const struct lu_env *env,
-                                   const struct lu_object_header *loh,
-                                   struct lu_device *ld)
-{
-        const struct lu_fid *fid = &loh->loh_fid;
-        struct lu_object  *lo = NULL;
-        struct cmm_device *cd;
-        mdsno_t mds;
-        int rc = 0;
-
-        ENTRY;
-
-        cd = lu2cmm_dev(ld);
-        if (cd->cmm_flags & CMM_INITIALIZED) {
-                /* get object location */
-                rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
-                if (rc)
-                        RETURN(NULL);
-        } else
-                /*
-                 * Device is not yet initialized, cmm_object is being created
-                 * as part of early bootstrap procedure (it is /ROOT, or /fld,
-                 * etc.). Such object *has* to be local.
-                 */
-                mds = cd->cmm_local_num;
-
-        /* select the proper set of operations based on object location */
-        if (mds == cd->cmm_local_num) {
-                struct cml_object *clo;
-
-                OBD_ALLOC_PTR(clo);
-                if (clo != NULL) {
-                        lo = &clo->cmm_obj.cmo_obj.mo_lu;
-                        lu_object_init(lo, NULL, ld);
-                        clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
-                        clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
-                        lo->lo_ops = &cml_obj_ops;
-                }
-        } else {
-                struct cmr_object *cro;
-
-                OBD_ALLOC_PTR(cro);
-                if (cro != NULL) {
-                        lo = &cro->cmm_obj.cmo_obj.mo_lu;
-                        lu_object_init(lo, NULL, ld);
-                        cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
-                        cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
-                        lo->lo_ops = &cmr_obj_ops;
-                        cro->cmo_num = mds;
-                }
-        }
-        RETURN(lo);
-}
-
-/**
- * Get local child device.
- */
-static struct lu_device *cml_child_dev(struct cmm_device *d)
-{
-        return &d->cmm_child->md_lu_dev;
-}
-
-/**
- * Free cml_object.
- */
-static void cml_object_free(const struct lu_env *env,
-                            struct lu_object *lo)
-{
-        struct cml_object *clo = lu2cml_obj(lo);
-        lu_object_fini(lo);
-        OBD_FREE_PTR(clo);
-}
-
-/**
- * Initialize cml_object.
- */
-static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
-                           const struct lu_object_conf *unused)
-{
-        struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
-        struct lu_device  *c_dev;
-        struct lu_object  *c_obj;
-        int rc;
-
-        ENTRY;
-
-#ifdef HAVE_SPLIT_SUPPORT
-        if (cd->cmm_tgt_count == 0)
-                lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
-        else
-                lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
-#endif
-        c_dev = cml_child_dev(cd);
-        if (c_dev == NULL) {
-                rc = -ENOENT;
-        } else {
-                c_obj = c_dev->ld_ops->ldo_object_alloc(env,
-                                                        lo->lo_header, c_dev);
-                if (c_obj != NULL) {
-                        lu_object_add(lo, c_obj);
-                        rc = 0;
-                } else {
-                        rc = -ENOMEM;
-                }
-        }
-
-        RETURN(rc);
-}
-
-static int cml_object_print(const struct lu_env *env, void *cookie,
-                            lu_printer_t p, const struct lu_object *lo)
-{
-        return (*p)(env, cookie, "[local]");
-}
-
-static const struct lu_object_operations cml_obj_ops = {
-        .loo_object_init    = cml_object_init,
-        .loo_object_free    = cml_object_free,
-        .loo_object_print   = cml_object_print
-};
-
-/**
- * \name CMM local md_object operations.
- * All of them call just corresponding operations on next layer.
- * @{
- */
-static int cml_object_create(const struct lu_env *env,
-                             struct md_object *mo,
-                             const struct md_op_spec *spec,
-                             struct md_attr *attr)
-{
-        int rc;
-        ENTRY;
-        rc = mo_object_create(env, md_object_next(mo), spec, attr);
-        RETURN(rc);
-}
-
-static int cml_permission(const struct lu_env *env,
-                          struct md_object *p, struct md_object *c,
-                          struct md_attr *attr, int mask)
-{
-        int rc;
-        ENTRY;
-        rc = mo_permission(env, md_object_next(p), md_object_next(c),
-                           attr, mask);
-        RETURN(rc);
-}
-
-static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
-                        struct md_attr *attr)
-{
-        int rc;
-        ENTRY;
-        rc = mo_attr_get(env, md_object_next(mo), attr);
-        RETURN(rc);
-}
-
-static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
-                        const struct md_attr *attr)
-{
-        int rc;
-        ENTRY;
-        rc = mo_attr_set(env, md_object_next(mo), attr);
-        RETURN(rc);
-}
-
-static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
-                         struct lu_buf *buf, const char *name)
-{
-        int rc;
-        ENTRY;
-        rc = mo_xattr_get(env, md_object_next(mo), buf, name);
-        RETURN(rc);
-}
-
-static int cml_readlink(const struct lu_env *env, struct md_object *mo,
-                        struct lu_buf *buf)
-{
-        int rc;
-        ENTRY;
-        rc = mo_readlink(env, md_object_next(mo), buf);
-        RETURN(rc);
-}
-
-static int cml_changelog(const struct lu_env *env, enum changelog_rec_type type,
-                         int flags, struct md_object *mo)
-{
-        int rc;
-        ENTRY;
-        rc = mo_changelog(env, type, flags, md_object_next(mo));
-        RETURN(rc);
-}
-
-static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
-                          struct lu_buf *buf)
-{
-        int rc;
-        ENTRY;
-        rc = mo_xattr_list(env, md_object_next(mo), buf);
-        RETURN(rc);
-}
-
-static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
-                         const struct lu_buf *buf, const char *name,
-                         int fl)
-{
-        int rc;
-        ENTRY;
-        rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
-        RETURN(rc);
-}
-
-static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
-                         const char *name)
-{
-        int rc;
-        ENTRY;
-        rc = mo_xattr_del(env, md_object_next(mo), name);
-        RETURN(rc);
-}
-
-static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
-                       const struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-        rc = mo_ref_add(env, md_object_next(mo), ma);
-        RETURN(rc);
-}
-
-static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
-                       struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-        rc = mo_ref_del(env, md_object_next(mo), ma);
-        RETURN(rc);
-}
-
-static int cml_open(const struct lu_env *env, struct md_object *mo,
-                    int flags)
-{
-        int rc;
-        ENTRY;
-        rc = mo_open(env, md_object_next(mo), flags);
-        RETURN(rc);
-}
-
-static int cml_close(const struct lu_env *env, struct md_object *mo,
-                     struct md_attr *ma, int mode)
-{
-        int rc;
-        ENTRY;
-        rc = mo_close(env, md_object_next(mo), ma, mode);
-        RETURN(rc);
-}
-
-static int cml_readpage(const struct lu_env *env, struct md_object *mo,
-                        const struct lu_rdpg *rdpg)
-{
-        int rc;
-        ENTRY;
-        rc = mo_readpage(env, md_object_next(mo), rdpg);
-        RETURN(rc);
-}
-
-static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
-                        struct lustre_capa *capa, int renewal)
-{
-        int rc;
-        ENTRY;
-        rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
-        RETURN(rc);
-}
-
-static int cml_path(const struct lu_env *env, struct md_object *mo,
-                    char *path, int pathlen, __u64 *recno, int *linkno)
-{
-        int rc;
-        ENTRY;
-        rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
-        RETURN(rc);
-}
-
-static int cml_file_lock(const struct lu_env *env, struct md_object *mo,
-                         struct lov_mds_md *lmm, struct ldlm_extent *extent,
-                         struct lustre_handle *lockh)
-{
-        int rc;
-        ENTRY;
-        rc = mo_file_lock(env, md_object_next(mo), lmm, extent, lockh);
-        RETURN(rc);
-}
-
-static int cml_file_unlock(const struct lu_env *env, struct md_object *mo,
-                           struct lov_mds_md *lmm, struct lustre_handle *lockh)
-{
-        int rc;
-        ENTRY;
-        rc = mo_file_unlock(env, md_object_next(mo), lmm, lockh);
-        RETURN(rc);
-}
-
-static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
-{
-        int rc;
-        ENTRY;
-        rc = mo_object_sync(env, md_object_next(mo));
-        RETURN(rc);
-}
-
-static const struct md_object_operations cml_mo_ops = {
-        .moo_permission    = cml_permission,
-        .moo_attr_get      = cml_attr_get,
-        .moo_attr_set      = cml_attr_set,
-        .moo_xattr_get     = cml_xattr_get,
-        .moo_xattr_list    = cml_xattr_list,
-        .moo_xattr_set     = cml_xattr_set,
-        .moo_xattr_del     = cml_xattr_del,
-        .moo_object_create = cml_object_create,
-        .moo_ref_add       = cml_ref_add,
-        .moo_ref_del       = cml_ref_del,
-        .moo_open          = cml_open,
-        .moo_close         = cml_close,
-        .moo_readpage      = cml_readpage,
-        .moo_readlink      = cml_readlink,
-        .moo_changelog     = cml_changelog,
-        .moo_capa_get      = cml_capa_get,
-        .moo_object_sync   = cml_object_sync,
-        .moo_path          = cml_path,
-        .moo_file_lock     = cml_file_lock,
-        .moo_file_unlock   = cml_file_unlock,
-};
-/** @} */
-
-/**
- * \name CMM local md_dir_operations.
- * @{
- */
-/**
- * cml lookup object fid by name.
- * This returns only FID by name.
- */
-static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
-                      const struct lu_name *lname, struct lu_fid *lf,
-                      struct md_op_spec *spec)
-{
-        int rc;
-        ENTRY;
-
-#ifdef HAVE_SPLIT_SUPPORT
-        if (spec != NULL && spec->sp_ck_split) {
-                rc = cmm_split_check(env, mo_p, lname->ln_name);
-                if (rc)
-                        RETURN(rc);
-        }
-#endif
-        rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
-        RETURN(rc);
-
-}
-
-/**
- * Helper to return lock mode. Used in split cases only.
- */
-static mdl_mode_t cml_lock_mode(const struct lu_env *env,
-                                struct md_object *mo, mdl_mode_t lm)
-{
-        int rc = MDL_MINMODE;
-        ENTRY;
-
-#ifdef HAVE_SPLIT_SUPPORT
-        rc = cmm_split_access(env, mo, lm);
-#endif
-
-        RETURN(rc);
-}
-
-/**
- * Create operation for cml.
- * Objects are local, but split can happen.
- * If split is not needed this will call next layer mdo_create().
- *
- * \param mo_p Parent directory. Local object.
- * \param lname name of file to create.
- * \param mo_c Child object. It has no real inode yet.
- * \param spec creation specification.
- * \param ma child object attributes.
- */
-static int cml_create(const struct lu_env *env, struct md_object *mo_p,
-                      const struct lu_name *lname, struct md_object *mo_c,
-                      struct md_op_spec *spec, struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-#ifdef HAVE_SPLIT_SUPPORT
-        /* Lock mode always should be sane. */
-        LASSERT(spec->sp_cr_mode != MDL_MINMODE);
-
-        /*
-         * Sigh... This is long story. MDT may have race with detecting if split
-         * is possible in cmm. We know this race and let it live, because
-         * getting it rid (with some sem or spinlock) will also mean that
-         * PDIROPS for create will not work because we kill parallel work, what
-         * is really bad for performance and makes no sense having PDIROPS. So,
-         * we better allow the race to live, but split dir only if some of
-         * concurrent threads takes EX lock, not matter which one. So that, say,
-         * two concurrent threads may have different lock modes on directory (CW
-         * and EX) and not first one which comes here and see that split is
-         * possible should split the dir, but only that one which has EX
-         * lock. And we do not care that in this case, split may happen a bit
-         * later (when dir size will not be necessarily 64K, but may be a bit
-         * larger). So that, we allow concurrent creates and protect split by EX
-         * lock.
-         */
-        if (spec->sp_cr_mode == MDL_EX) {
-                /**
-                 * Split cases:
-                 * - Try to split \a mo_p upon each create operation.
-                 *   If split is ok, -ERESTART is returned and current thread
-                 *   will not peoceed with create. Instead it sends -ERESTART
-                 *   to client to let it know that correct MDT must be chosen.
-                 * \see cmm_split_dir()
-                 */
-                rc = cmm_split_dir(env, mo_p);
-                if (rc)
-                        /*
-                         * -ERESTART or some split error is returned, we can't
-                         * proceed with create.
-                         */
-                        GOTO(out, rc);
-        }
-
-        if (spec != NULL && spec->sp_ck_split) {
-                /**
-                 * - Directory is split already. Let the caller know that
-                 * it should tell client that directory is split and operation
-                 * should repeat to correct MDT.
-                 * \see cmm_split_check()
-                 */
-                rc = cmm_split_check(env, mo_p, lname->ln_name);
-                if (rc)
-                        GOTO(out, rc);
-        }
-#endif
-
-        rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
-                        spec, ma);
-
-        EXIT;
-#ifdef HAVE_SPLIT_SUPPORT
-out:
-#endif
-        return rc;
-}
-
-/** Call mdo_create_data() on next layer. All objects are local. */
-static int cml_create_data(const struct lu_env *env, struct md_object *p,
-                           struct md_object *o,
-                           const struct md_op_spec *spec,
-                           struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-        rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
-                             spec, ma);
-        RETURN(rc);
-}
-
-/** Call mdo_link() on next layer. All objects are local. */
-static int cml_link(const struct lu_env *env, struct md_object *mo_p,
-                    struct md_object *mo_s, const struct lu_name *lname,
-                    struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-        rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
-                      lname, ma);
-        RETURN(rc);
-}
-
-/** Call mdo_unlink() on next layer. All objects are local. */
-static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
-                      struct md_object *mo_c, const struct lu_name *lname,
-                      struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-        rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
-                        lname, ma);
-        RETURN(rc);
-}
-
-/** Call mdo_lum_lmm_cmp() on next layer */
-static int cml_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
-                           const struct md_op_spec *spec, struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        rc = mdo_lum_lmm_cmp(env, md_object_next(mo_c), spec, ma);
-        RETURN(rc);
-}
-
-/**
- * \ingroup cmm
- * Get mode of object.
- * Used in both cml and cmr hence can produce RPC to another server.
- */
-static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
-                        const struct lu_fid *lf, struct md_attr *ma,
-                        int *remote)
-{
-        struct md_object *mo_s = md_object_find_slice(env, md, lf);
-        struct cmm_thread_info *cmi;
-        struct md_attr *tmp_ma;
-        int rc;
-        ENTRY;
-
-        if (IS_ERR(mo_s))
-                RETURN(PTR_ERR(mo_s));
-
-        if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
-                *remote = 1;
-
-        cmi = cmm_env_info(env);
-        tmp_ma = &cmi->cmi_ma;
-        tmp_ma->ma_need = MA_INODE;
-        tmp_ma->ma_valid = 0;
-        /* get type from src, can be remote req */
-        rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
-        if (rc == 0) {
-                ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
-                ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
-                ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
-                ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
-                ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
-        }
-        lu_object_put(env, &mo_s->mo_lu);
-        RETURN(rc);
-}
-
-/**
- * \ingroup cmm
- * Set ctime for object.
- * Used in both cml and cmr hence can produce RPC to another server.
- */
-static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
-                            const struct lu_fid *lf, struct md_attr *ma)
-{
-        struct md_object *mo_s = md_object_find_slice(env, md, lf);
-        int rc;
-        ENTRY;
-
-        if (IS_ERR(mo_s))
-                RETURN(PTR_ERR(mo_s));
-
-        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-        /* set ctime to obj, can be remote req */
-        rc = mo_attr_set(env, md_object_next(mo_s), ma);
-        lu_object_put(env, &mo_s->mo_lu);
-        RETURN(rc);
-}
-
-/** Helper to output debug information about rename operation. */
-static inline void cml_rename_warn(const char *fname,
-                                  struct md_object *mo_po,
-                                  struct md_object *mo_pn,
-                                  const struct lu_fid *lf,
-                                  const char *s_name,
-                                  struct md_object *mo_t,
-                                  const char *t_name,
-                                  int err)
-{
-        if (mo_t)
-                CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
-                      "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
-                      "[tname %s] [err %d]\n", fname,
-                      PFID(lu_object_fid(&mo_po->mo_lu)),
-                      PFID(lu_object_fid(&mo_pn->mo_lu)),
-                      PFID(lf), s_name,
-                      PFID(lu_object_fid(&mo_t->mo_lu)),
-                      t_name, err);
-        else
-                CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
-                      "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
-                      "[tname %s] [err %d]\n", fname,
-                      PFID(lu_object_fid(&mo_po->mo_lu)),
-                      PFID(lu_object_fid(&mo_pn->mo_lu)),
-                      PFID(lf), s_name,
-                      t_name, err);
-}
-
-/**
- * Rename operation for cml.
- *
- * This is the most complex cross-reference operation. It may consist of up to 4
- * MDS server and require several RPCs to be sent.
- *
- * \param mo_po Old parent object.
- * \param mo_pn New parent object.
- * \param lf FID of object to rename.
- * \param ls_name Source file name.
- * \param mo_t target object. Should be NULL here.
- * \param lt_name Name of target file.
- * \param ma object attributes.
- */
-static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
-                      struct md_object *mo_pn, const struct lu_fid *lf,
-                      const struct lu_name *ls_name, struct md_object *mo_t,
-                      const struct lu_name *lt_name, struct md_attr *ma)
-{
-        struct cmm_thread_info *cmi;
-        struct md_attr *tmp_ma = NULL;
-        struct md_object *tmp_t = mo_t;
-        int remote = 0, rc;
-        ENTRY;
-
-        rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
-        if (rc)
-                RETURN(rc);
-
-        if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
-                /**
-                 * \note \a mo_t is remote object and there is RPC to unlink it.
-                 * Before that, do local sanity check for rename first.
-                 */
-                if (!remote) {
-                        struct md_object *mo_s = md_object_find_slice(env,
-                                                        md_obj2dev(mo_po), lf);
-                        if (IS_ERR(mo_s))
-                                RETURN(PTR_ERR(mo_s));
-
-                        LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
-                        rc = mo_permission(env, md_object_next(mo_po),
-                                           md_object_next(mo_s),
-                                           ma, MAY_RENAME_SRC);
-                        lu_object_put(env, &mo_s->mo_lu);
-                        if (rc)
-                                RETURN(rc);
-                } else {
-                        rc = mo_permission(env, NULL, md_object_next(mo_po),
-                                           ma, MAY_UNLINK | MAY_VTX_FULL);
-                        if (rc)
-                                RETURN(rc);
-                }
-
-                rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
-                                   MAY_UNLINK | MAY_VTX_PART);
-                if (rc)
-                        RETURN(rc);
-
-                /*
-                 * /note \a ma will be changed after mo_ref_del(), but we will use
-                 * it for mdo_rename() later, so save it before mo_ref_del().
-                 */
-                cmi = cmm_env_info(env);
-                tmp_ma = &cmi->cmi_ma;
-                *tmp_ma = *ma;
-                rc = mo_ref_del(env, md_object_next(mo_t), ma);
-                if (rc)
-                        RETURN(rc);
-
-                tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
-                mo_t = NULL;
-        }
-
-        /**
-         * \note for src on remote MDS case, change its ctime before local
-         * rename. Firstly, do local sanity check for rename if necessary.
-         */
-        if (remote) {
-                if (!tmp_ma) {
-                        rc = mo_permission(env, NULL, md_object_next(mo_po),
-                                           ma, MAY_UNLINK | MAY_VTX_FULL);
-                        if (rc)
-                                RETURN(rc);
-
-                        if (mo_t) {
-                                LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
-                                rc = mo_permission(env, md_object_next(mo_pn),
-                                                   md_object_next(mo_t),
-                                                   ma, MAY_RENAME_TAR);
-                                if (rc)
-                                        RETURN(rc);
-                        } else {
-                                int mask;
-
-                                if (mo_po != mo_pn)
-                                        mask = (S_ISDIR(ma->ma_attr.la_mode) ?
-                                                MAY_LINK : MAY_CREATE);
-                                else
-                                        mask = MAY_CREATE;
-                                rc = mo_permission(env, NULL,
-                                                   md_object_next(mo_pn),
-                                                   NULL, mask);
-                                if (rc)
-                                        RETURN(rc);
-                        }
-
-                        ma->ma_attr_flags |= MDS_PERM_BYPASS;
-                } else {
-                        LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
-                }
-
-                rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
-                                      tmp_ma ? tmp_ma : ma);
-                if (rc) {
-                        /* TODO: revoke mo_t if necessary. */
-                        cml_rename_warn("cmm_rename_ctime", mo_po,
-                                        mo_pn, lf, ls_name->ln_name,
-                                        tmp_t, lt_name->ln_name, rc);
-                        RETURN(rc);
-                }
-        }
-
-        /* local rename, mo_t can be NULL */
-        rc = mdo_rename(env, md_object_next(mo_po),
-                        md_object_next(mo_pn), lf, ls_name,
-                        md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
-        if (rc)
-                /* TODO: revoke all cml_rename */
-                cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
-                                ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
-
-        RETURN(rc);
-}
-
-/**
- * Rename target partial operation.
- * Used for cross-ref rename.
- */
-static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
-                          struct md_object *mo_t, const struct lu_fid *lf,
-                          const struct lu_name *lname, struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        rc = mdo_rename_tgt(env, md_object_next(mo_p),
-                            md_object_next(mo_t), lf, lname, ma);
-        RETURN(rc);
-}
-
-/**
- * Name insert only operation.
- * used only in case of rename_tgt() when target doesn't exist.
- */
-static int cml_name_insert(const struct lu_env *env, struct md_object *p,
-                           const struct lu_name *lname, const struct lu_fid *lf,
-                           const struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
-
-        RETURN(rc);
-}
-
-/**
- * \ingroup cmm
- * Check two fids are not subdirectories.
- */
-static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
-                         const struct lu_fid *fid, struct lu_fid *sfid)
-{
-        struct cmm_thread_info *cmi;
-        int rc;
-        ENTRY;
-
-        cmi = cmm_env_info(env);
-        rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
-        if (rc)
-                RETURN(rc);
-
-        if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
-                RETURN(0);
-
-        rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
-        RETURN(rc);
-}
-
-static const struct md_dir_operations cml_dir_ops = {
-        .mdo_is_subdir   = cmm_is_subdir,
-        .mdo_lookup      = cml_lookup,
-        .mdo_lock_mode   = cml_lock_mode,
-        .mdo_create      = cml_create,
-        .mdo_link        = cml_link,
-        .mdo_unlink      = cml_unlink,
-        .mdo_lum_lmm_cmp = cml_lum_lmm_cmp,
-        .mdo_name_insert = cml_name_insert,
-        .mdo_rename      = cml_rename,
-        .mdo_rename_tgt  = cml_rename_tgt,
-        .mdo_create_data = cml_create_data,
-};
-/** @} */
-/** @} */
-
-/**
- * \addtogroup cmr
- * @{
- */
-/**
- * \name cmr helpers
- * @{
- */
-/** Get cmr_object from lu_object. */
-static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
-{
-        return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
-}
-/** Get cmr_object from md_object. */
-static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
-{
-        return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
-}
-/** Get cmr_object from cmm_object. */
-static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
-{
-        return container_of0(co, struct cmr_object, cmm_obj);
-}
-/** @} */
-
-/**
- * Get proper child device from MDCs.
- */
-static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
-{
-        struct lu_device *next = NULL;
-        struct mdc_device *mdc;
-
-        cfs_spin_lock(&d->cmm_tgt_guard);
-        cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
-                if (mdc->mc_num == num) {
-                        next = mdc2lu_dev(mdc);
-                        break;
-                }
-        }
-        cfs_spin_unlock(&d->cmm_tgt_guard);
-        return next;
-}
-
-/**
- * Free cmr_object.
- */
-static void cmr_object_free(const struct lu_env *env,
-                            struct lu_object *lo)
-{
-        struct cmr_object *cro = lu2cmr_obj(lo);
-        lu_object_fini(lo);
-        OBD_FREE_PTR(cro);
-}
-
-/**
- * Initialize cmr object.
- */
-static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
-                           const struct lu_object_conf *unused)
-{
-        struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
-        struct lu_device  *c_dev;
-        struct lu_object  *c_obj;
-        int rc;
-
-        ENTRY;
-
-        c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
-        if (c_dev == NULL) {
-                rc = -ENOENT;
-        } else {
-                c_obj = c_dev->ld_ops->ldo_object_alloc(env,
-                                                        lo->lo_header, c_dev);
-                if (c_obj != NULL) {
-                        lu_object_add(lo, c_obj);
-                        rc = 0;
-                } else {
-                        rc = -ENOMEM;
-                }
-        }
-
-        RETURN(rc);
-}
-
-/**
- * Output lu_object data.
- */
-static int cmr_object_print(const struct lu_env *env, void *cookie,
-                            lu_printer_t p, const struct lu_object *lo)
-{
-        const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
-        return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
-}
-
-/**
- * Cmr instance of lu_object_operations.
- */
-static const struct lu_object_operations cmr_obj_ops = {
-        .loo_object_init    = cmr_object_init,
-        .loo_object_free    = cmr_object_free,
-        .loo_object_print   = cmr_object_print
-};
-
-/**
- * \name cmr remote md_object operations.
- * All operations here are invalid and return errors. There is no local object
- * so these operations return two kinds of error:
- * -# -EFAULT if operation is prohibited.
- * -# -EREMOTE if operation can be done just to notify upper level about remote
- *  object.
- *
- * @{
- */
-static int cmr_object_create(const struct lu_env *env,
-                             struct md_object *mo,
-                             const struct md_op_spec *spec,
-                             struct md_attr *ma)
-{
-        return -EFAULT;
-}
-
-static int cmr_permission(const struct lu_env *env,
-                          struct md_object *p, struct md_object *c,
-                          struct md_attr *attr, int mask)
-{
-        return -EREMOTE;
-}
-
-static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
-                        struct md_attr *attr)
-{
-        return -EREMOTE;
-}
-
-static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
-                        const struct md_attr *attr)
-{
-        return -EFAULT;
-}
-
-static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
-                         struct lu_buf *buf, const char *name)
-{
-        return -EFAULT;
-}
-
-static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
-                        struct lu_buf *buf)
-{
-        return -EFAULT;
-}
-
-static int cmr_changelog(const struct lu_env *env, enum changelog_rec_type type,
-                         int flags, struct md_object *mo)
-{
-        return -EFAULT;
-}
-
-static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
-                          struct lu_buf *buf)
-{
-        return -EFAULT;
-}
-
-static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
-                         const struct lu_buf *buf, const char *name,
-                         int fl)
-{
-        return -EFAULT;
-}
-
-static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
-                         const char *name)
-{
-        return -EFAULT;
-}
-
-static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
-                       const struct md_attr *ma)
-{
-        return -EFAULT;
-}
-
-static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
-                       struct md_attr *ma)
-{
-        return -EFAULT;
-}
-
-static int cmr_open(const struct lu_env *env, struct md_object *mo,
-                    int flags)
-{
-        return -EREMOTE;
-}
-
-static int cmr_close(const struct lu_env *env, struct md_object *mo,
-                     struct md_attr *ma, int mode)
-{
-        return -EFAULT;
-}
-
-static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
-                        const struct lu_rdpg *rdpg)
-{
-        return -EREMOTE;
-}
-
-static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
-                        struct lustre_capa *capa, int renewal)
-{
-        return -EFAULT;
-}
-
-static int cmr_path(const struct lu_env *env, struct md_object *obj,
-                    char *path, int pathlen, __u64 *recno, int *linkno)
-{
-        return -EREMOTE;
-}
-
-static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
-{
-        return -EFAULT;
-}
-
-static int cmr_file_lock(const struct lu_env *env, struct md_object *mo,
-                         struct lov_mds_md *lmm, struct ldlm_extent *extent,
-                         struct lustre_handle *lockh)
-{
-        return -EREMOTE;
-}
-
-static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo,
-                           struct lov_mds_md *lmm, struct lustre_handle *lockh)
-{
-        return -EREMOTE;
-}
-
-static int cmr_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
-                           const struct md_op_spec *spec, struct md_attr *ma)
-{
-        return -EREMOTE;
-}
-
-/** Set of md_object_operations for cmr. */
-static const struct md_object_operations cmr_mo_ops = {
-        .moo_permission    = cmr_permission,
-        .moo_attr_get      = cmr_attr_get,
-        .moo_attr_set      = cmr_attr_set,
-        .moo_xattr_get     = cmr_xattr_get,
-        .moo_xattr_set     = cmr_xattr_set,
-        .moo_xattr_list    = cmr_xattr_list,
-        .moo_xattr_del     = cmr_xattr_del,
-        .moo_object_create = cmr_object_create,
-        .moo_ref_add       = cmr_ref_add,
-        .moo_ref_del       = cmr_ref_del,
-        .moo_open          = cmr_open,
-        .moo_close         = cmr_close,
-        .moo_readpage      = cmr_readpage,
-        .moo_readlink      = cmr_readlink,
-        .moo_changelog     = cmr_changelog,
-        .moo_capa_get      = cmr_capa_get,
-        .moo_object_sync   = cmr_object_sync,
-        .moo_path          = cmr_path,
-        .moo_file_lock     = cmr_file_lock,
-        .moo_file_unlock   = cmr_file_unlock,
-};
-/** @} */
-
-/**
- * \name cmr md_dir operations.
- *
- * All methods below are cross-ref by nature. They consist of remote call and
- * local operation. Due to future rollback functionality there are several
- * limitations for such methods:
- * -# remote call should be done at first to do epoch negotiation between all
- * MDS involved and to avoid the RPC inside transaction.
- * -# only one RPC can be sent - also due to epoch negotiation.
- * For more details see rollback HLD/DLD.
- * @{
- */
-static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
-                      const struct lu_name *lname, struct lu_fid *lf,
-                      struct md_op_spec *spec)
-{
-        /*
-         * This can happens while rename() If new parent is remote dir, lookup
-         * will happen here.
-         */
-
-        return -EREMOTE;
-}
-
-/** Return lock mode. */
-static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
-                                struct md_object *mo, mdl_mode_t lm)
-{
-        return MDL_MINMODE;
-}
-
-/**
- * Create operation for cmr.
- * Remote object creation and local name insert.
- *
- * \param mo_p Parent directory. Local object.
- * \param lchild_name name of file to create.
- * \param mo_c Child object. It has no real inode yet.
- * \param spec creation specification.
- * \param ma child object attributes.
- */
-static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
-                      const struct lu_name *lchild_name, struct md_object *mo_c,
-                      struct md_op_spec *spec,
-                      struct md_attr *ma)
-{
-        struct cmm_thread_info *cmi;
-        struct md_attr *tmp_ma;
-        int rc;
-        ENTRY;
-
-        /* Make sure that name isn't exist before doing remote call. */
-        rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
-                        &cmm_env_info(env)->cmi_fid, NULL);
-        if (rc == 0)
-                RETURN(-EEXIST);
-        else if (rc != -ENOENT)
-                RETURN(rc);
-
-        /* check the SGID attr */
-        cmi = cmm_env_info(env);
-        LASSERT(cmi);
-        tmp_ma = &cmi->cmi_ma;
-        tmp_ma->ma_valid = 0;
-        tmp_ma->ma_need = MA_INODE;
-
-#ifdef CONFIG_FS_POSIX_ACL
-        if (!S_ISLNK(ma->ma_attr.la_mode)) {
-                tmp_ma->ma_acl = cmi->cmi_xattr_buf;
-                tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
-                tmp_ma->ma_need |= MA_ACL_DEF;
-        }
-#endif
-        rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
-        if (rc)
-                RETURN(rc);
-
-        if (tmp_ma->ma_attr.la_mode & S_ISGID) {
-                ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
-                if (S_ISDIR(ma->ma_attr.la_mode)) {
-                        ma->ma_attr.la_mode |= S_ISGID;
-                        ma->ma_attr.la_valid |= LA_MODE;
-                }
-        }
-
-#ifdef CONFIG_FS_POSIX_ACL
-        if (tmp_ma->ma_valid & MA_ACL_DEF) {
-                spec->u.sp_ea.fid = spec->u.sp_pfid;
-                spec->u.sp_ea.eadata = tmp_ma->ma_acl;
-                spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
-                spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
-        }
-#endif
-
-        /* Local permission check for name_insert before remote ops. */
-        rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
-                           (S_ISDIR(ma->ma_attr.la_mode) ?
-                           MAY_LINK : MAY_CREATE));
-        if (rc)
-                RETURN(rc);
-
-        /**
-         * \note \a ma will be changed after mo_object_create(), but we will use
-         * it for mdo_name_insert() later, so save it before mo_object_create().
-         */
-        *tmp_ma = *ma;
-        rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
-        if (rc == 0) {
-                tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
-                rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
-                                     lu_object_fid(&mo_c->mo_lu), tmp_ma);
-                if (unlikely(rc)) {
-                        /* TODO: remove object mo_c on remote MDS */
-                        CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
-                              " [name %s] [mo_c "DFID"] [err %d]\n",
-                              PFID(lu_object_fid(&mo_p->mo_lu)),
-                              lchild_name->ln_name,
-                              PFID(lu_object_fid(&mo_c->mo_lu)), rc);
-                }
-        }
-
-        RETURN(rc);
-}
-
-/**
- * Link operations for cmr.
- *
- * The link RPC is always issued to the server where source parent is living.
- * The first operation to do is object nlink increment on remote server.
- * Second one is local mdo_name_insert().
- *
- * \param mo_p parent directory. It is local.
- * \param mo_s source object to link. It is remote.
- * \param lname Name of link file.
- * \param ma object attributes.
- */
-static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
-                    struct md_object *mo_s, const struct lu_name *lname,
-                    struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        /* Make sure that name isn't exist before doing remote call. */
-        rc = mdo_lookup(env, md_object_next(mo_p), lname,
-                        &cmm_env_info(env)->cmi_fid, NULL);
-        if (rc == 0) {
-                rc = -EEXIST;
-        } else if (rc == -ENOENT) {
-                /* Local permission check for name_insert before remote ops. */
-                rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
-                                   MAY_CREATE);
-                if (rc)
-                        RETURN(rc);
-
-                rc = mo_ref_add(env, md_object_next(mo_s), ma);
-                if (rc == 0) {
-                        ma->ma_attr_flags |= MDS_PERM_BYPASS;
-                        rc = mdo_name_insert(env, md_object_next(mo_p), lname,
-                                             lu_object_fid(&mo_s->mo_lu), ma);
-                        if (unlikely(rc)) {
-                                /* TODO: ref_del from mo_s on remote MDS */
-                                CWARN("cmr_link failed, should revoke: "
-                                      "[mo_p "DFID"] [mo_s "DFID"] "
-                                      "[name %s] [err %d]\n",
-                                      PFID(lu_object_fid(&mo_p->mo_lu)),
-                                      PFID(lu_object_fid(&mo_s->mo_lu)),
-                                      lname->ln_name, rc);
-                        }
-                }
-        }
-        RETURN(rc);
-}
-
-/**
- * Unlink operations for cmr.
- *
- * The unlink RPC is always issued to the server where parent is living. Hence
- * the first operation to do is object unlink on remote server. Second one is
- * local mdo_name_remove().
- *
- * \param mo_p parent md_object. It is local.
- * \param mo_c child object to be unlinked. It is remote.
- * \param lname Name of file to unlink.
- * \param ma object attributes.
- */
-static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
-                      struct md_object *mo_c, const struct lu_name *lname,
-                      struct md_attr *ma)
-{
-        struct cmm_thread_info *cmi;
-        struct md_attr *tmp_ma;
-        int rc;
-        ENTRY;
-
-        /* Local permission check for name_remove before remote ops. */
-        rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
-                           MAY_UNLINK | MAY_VTX_PART);
-        if (rc)
-                RETURN(rc);
-
-        /*
-         * \note \a ma will be changed after mo_ref_del, but we will use
-         * it for mdo_name_remove() later, so save it before mo_ref_del().
-         */
-        cmi = cmm_env_info(env);
-        tmp_ma = &cmi->cmi_ma;
-        *tmp_ma = *ma;
-        rc = mo_ref_del(env, md_object_next(mo_c), ma);
-        if (rc == 0) {
-                tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
-                rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
-                if (unlikely(rc)) {
-                        /* TODO: ref_add to mo_c on remote MDS */
-                        CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
-                              " [mo_c "DFID"] [name %s] [err %d]\n",
-                              PFID(lu_object_fid(&mo_p->mo_lu)),
-                              PFID(lu_object_fid(&mo_c->mo_lu)),
-                              lname->ln_name, rc);
-                }
-        }
-
-        RETURN(rc);
-}
-
-/** Helper which outputs error message during cmr_rename() */
-static inline void cmr_rename_warn(const char *fname,
-                                  struct md_object *mo_po,
-                                  struct md_object *mo_pn,
-                                  const struct lu_fid *lf,
-                                  const char *s_name,
-                                  const char *t_name,
-                                  int err)
-{
-        CWARN("cmr_rename failed for %s, should revoke: "
-              "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
-              "[sname %s] [tname %s] [err %d]\n", fname,
-              PFID(lu_object_fid(&mo_po->mo_lu)),
-              PFID(lu_object_fid(&mo_pn->mo_lu)),
-              PFID(lf), s_name, t_name, err);
-}
-
-/**
- * Rename operation for cmr.
- *
- * This is the most complex cross-reference operation. It may consist of up to 4
- * MDS server and require several RPCs to be sent.
- *
- * \param mo_po Old parent object.
- * \param mo_pn New parent object.
- * \param lf FID of object to rename.
- * \param ls_name Source file name.
- * \param mo_t target object. Should be NULL here.
- * \param lt_name Name of target file.
- * \param ma object attributes.
- */
-static int cmr_rename(const struct lu_env *env,
-                      struct md_object *mo_po, struct md_object *mo_pn,
-                      const struct lu_fid *lf, const struct lu_name *ls_name,
-                      struct md_object *mo_t, const struct lu_name *lt_name,
-                      struct md_attr *ma)
-{
-        struct cmm_thread_info *cmi;
-        struct md_attr *tmp_ma;
-        int rc;
-        ENTRY;
-
-        LASSERT(mo_t == NULL);
-
-        /* get real type of src */
-        rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
-        if (rc)
-                RETURN(rc);
-
-        /* Local permission check for name_remove before remote ops. */
-        rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
-                           MAY_UNLINK | MAY_VTX_FULL);
-        if (rc)
-                RETURN(rc);
-
-        /**
-         * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
-         * for mdo_name_remove() later, so save it before mdo_rename_tgt.
-         */
-        cmi = cmm_env_info(env);
-        tmp_ma = &cmi->cmi_ma;
-        *tmp_ma = *ma;
-        /**
-         * \note The \a mo_pn is remote directory, so we cannot even know if there is
-         * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
-         * lookup and process this further.
-         */
-        rc = mdo_rename_tgt(env, md_object_next(mo_pn),
-                            NULL/* mo_t */, lf, lt_name, ma);
-        if (rc)
-                RETURN(rc);
-
-        tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
-
-        /* src object maybe on remote MDS, do remote ops first. */
-        rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
-        if (unlikely(rc)) {
-                /* TODO: revoke mdo_rename_tgt */
-                cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
-                                ls_name->ln_name, lt_name->ln_name, rc);
-                RETURN(rc);
-        }
-
-        /* only old name is removed localy */
-        rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
-        if (unlikely(rc))
-                /* TODO: revoke all cmr_rename */
-                cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
-                                ls_name->ln_name, lt_name->ln_name, rc);
-
-        RETURN(rc);
-}
-
-/**
- * Part of cross-ref rename().
- * Used to insert new name in new parent and unlink target.
- */
-static int cmr_rename_tgt(const struct lu_env *env,
-                          struct md_object *mo_p, struct md_object *mo_t,
-                          const struct lu_fid *lf, const struct lu_name *lname,
-                          struct md_attr *ma)
-{
-        struct cmm_thread_info *cmi;
-        struct md_attr *tmp_ma;
-        int rc;
-        ENTRY;
-
-        /* target object is remote one */
-        /* Local permission check for rename_tgt before remote ops. */
-        rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
-                           MAY_UNLINK | MAY_VTX_PART);
-        if (rc)
-                RETURN(rc);
-
-        /*
-         * XXX: @ma maybe changed after mo_ref_del, but we will use
-         * it for mdo_rename_tgt later, so save it before mo_ref_del.
-         */
-        cmi = cmm_env_info(env);
-        tmp_ma = &cmi->cmi_ma;
-        *tmp_ma = *ma;
-        rc = mo_ref_del(env, md_object_next(mo_t), ma);
-        /* continue locally with name handling only */
-        if (rc == 0) {
-                tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
-                rc = mdo_rename_tgt(env, md_object_next(mo_p),
-                                    NULL, lf, lname, tmp_ma);
-                if (unlikely(rc)) {
-                        /* TODO: ref_add to mo_t on remote MDS */
-                        CWARN("cmr_rename_tgt failed, should revoke: "
-                              "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
-                              "[name %s] [err %d]\n",
-                              PFID(lu_object_fid(&mo_p->mo_lu)),
-                              PFID(lu_object_fid(&mo_t->mo_lu)),
-                              PFID(lf),
-                              lname->ln_name, rc);
-                }
-        }
-        RETURN(rc);
-}
-/** @} */
-/**
- * The md_dir_operations for cmr.
- */
-static const struct md_dir_operations cmr_dir_ops = {
-        .mdo_is_subdir   = cmm_is_subdir,
-        .mdo_lookup      = cmr_lookup,
-        .mdo_lock_mode   = cmr_lock_mode,
-        .mdo_create      = cmr_create,
-        .mdo_link        = cmr_link,
-        .mdo_unlink      = cmr_unlink,
-        .mdo_lum_lmm_cmp = cmr_lum_lmm_cmp,
-        .mdo_rename      = cmr_rename,
-        .mdo_rename_tgt  = cmr_rename_tgt
-};
-/** @} */
diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c
deleted file mode 100644 (file)
index e799fa9..0000000
+++ /dev/null
@@ -1,758 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/cmm_split.c
- *
- * Lustre splitting dir
- *
- * Author: Alex Thomas  <alex@clusterfs.com>
- * Author: Wang Di      <wangdi@clusterfs.com>
- * Author: Yury Umanets <umka@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <obd_class.h>
-#include <lustre_fid.h>
-#include <lustre_mds.h>
-#include <lustre/lustre_idl.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-
-/**
- * \addtogroup split
- * @{
- */
-enum {
-        CMM_SPLIT_SIZE =  128 * 1024
-};
-
-/**
- * This function checks if passed \a name come to correct server (local MDT).
- *
- * \param mp Parent directory
- * \param name Name to lookup
- * \retval  -ERESTART Let client know that dir was split and client needs to
- * chose correct stripe.
- */
-int cmm_split_check(const struct lu_env *env, struct md_object *mp,
-                    const char *name)
-{
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
-        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
-        struct cml_object *clo = md2cml_obj(mp);
-        int rc, lmv_size;
-        ENTRY;
-
-        cmm_lprocfs_time_start(env);
-
-        /* Not split yet */
-        if (clo->clo_split == CMM_SPLIT_NONE ||
-            clo->clo_split == CMM_SPLIT_DENIED)
-                GOTO(out, rc = 0);
-
-        lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
-
-        /* Try to get the LMV EA */
-        memset(ma, 0, sizeof(*ma));
-
-        ma->ma_need = MA_LMV;
-        ma->ma_lmv_size = lmv_size;
-        OBD_ALLOC(ma->ma_lmv, lmv_size);
-        if (ma->ma_lmv == NULL)
-                GOTO(out, rc = -ENOMEM);
-
-        /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
-        rc = mo_attr_get(env, mp, ma);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        /* No LMV just return */
-        if (!(ma->ma_valid & MA_LMV)) {
-                /* update split state if unknown */
-                if (clo->clo_split == CMM_SPLIT_UNKNOWN)
-                        clo->clo_split = CMM_SPLIT_NONE;
-                GOTO(cleanup, rc = 0);
-        }
-
-        /* Skip checking the slave dirs (mea_count is 0) */
-        if (ma->ma_lmv->mea_count != 0) {
-                int idx;
-
-                /**
-                 * This gets stripe by name to check the name belongs to master
-                 * dir, otherwise return the -ERESTART
-                 */
-                idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
-
-                /**
-                 * When client does not know about split, it sends create() to
-                 * the master MDT and master replay back if directory is split.
-                 * So client should orward request to correct MDT. This
-                 * is why we check here if stripe zero or not. Zero stripe means
-                 * master stripe. If stripe calculated from name is not zero -
-                 * return -ERESTART.
-                 */
-                if (idx != 0)
-                        rc = -ERESTART;
-
-                /* update split state to DONE if unknown */
-                if (clo->clo_split == CMM_SPLIT_UNKNOWN)
-                        clo->clo_split = CMM_SPLIT_DONE;
-        } else {
-                /* split is denied for slave dir */
-                clo->clo_split = CMM_SPLIT_DENIED;
-        }
-        EXIT;
-cleanup:
-        OBD_FREE(ma->ma_lmv, lmv_size);
-out:
-        cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
-        return rc;
-}
-
-/**
- * Return preferable access mode to the caller taking into account the split
- * case and the fact of existing not splittable dirs.
- */
-int cmm_split_access(const struct lu_env *env, struct md_object *mo,
-                     mdl_mode_t lm)
-{
-        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
-        int rc, split;
-        ENTRY;
-
-        memset(ma, 0, sizeof(*ma));
-
-        /*
-         * Check only if we need protection from split.  If not - mdt handles
-         * other cases.
-         */
-        rc = cmm_split_expect(env, mo, ma, &split);
-        if (rc) {
-                CERROR("Can't check for possible split, rc %d\n", rc);
-                RETURN(MDL_MINMODE);
-        }
-
-        /*
-         * Do not take PDO lock on non-splittable objects if this is not PW,
-         * this should speed things up a bit.
-         */
-        if (split == CMM_SPLIT_DONE && lm != MDL_PW)
-                RETURN(MDL_NL);
-
-        /* Protect splitting by exclusive lock. */
-        if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
-                RETURN(MDL_EX);
-
-        /*
-         * Have no idea about lock mode, let it be what higher layer wants.
-         */
-        RETURN(MDL_MINMODE);
-}
-
-/**
- * Check if split is expected for current thread.
- *
- * \param mo Directory to split.
- * \param ma md attributes.
- * \param split Flag to save split information.
- */
-int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
-                     struct md_attr *ma, int *split)
-{
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct cml_object *clo = md2cml_obj(mo);
-        struct lu_fid root_fid;
-        int rc;
-        ENTRY;
-
-        if (clo->clo_split == CMM_SPLIT_DONE ||
-            clo->clo_split == CMM_SPLIT_DENIED) {
-                *split = clo->clo_split;
-                RETURN(0);
-        }
-        /* CMM_SPLIT_UNKNOWN case below */
-
-        /* No need to split root object. */
-        rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
-                                              &root_fid);
-        if (rc)
-                RETURN(rc);
-
-        if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
-                /* update split state */
-                *split = clo->clo_split == CMM_SPLIT_DENIED;
-                RETURN(0);
-        }
-
-        /*
-         * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
-         * for this get_attr.
-         */
-        LASSERT(ma->ma_valid == 0);
-        ma->ma_need = MA_INODE | MA_LMV;
-        rc = mo_attr_get(env, mo, ma);
-        if (rc)
-                RETURN(rc);
-
-        /* No need split for already split object */
-        if (ma->ma_valid & MA_LMV) {
-                LASSERT(ma->ma_lmv_size > 0);
-                *split = clo->clo_split = CMM_SPLIT_DONE;
-                RETURN(0);
-        }
-
-        /* No need split for object whose size < CMM_SPLIT_SIZE */
-        if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
-                *split = clo->clo_split = CMM_SPLIT_NONE;
-                RETURN(0);
-        }
-
-        *split = clo->clo_split = CMM_SPLIT_NEEDED;
-        RETURN(0);
-}
-
-struct cmm_object *cmm_object_find(const struct lu_env *env,
-                                   struct cmm_device *d,
-                                   const struct lu_fid *f)
-{
-        return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
-}
-
-static inline void cmm_object_put(const struct lu_env *env,
-                                  struct cmm_object *o)
-{
-        lu_object_put(env, &o->cmo_obj.mo_lu);
-}
-
-/**
- * Allocate new FID on passed \a mc for slave object which is going to
- * create there soon.
- */
-static int cmm_split_fid_alloc(const struct lu_env *env,
-                               struct cmm_device *cmm,
-                               struct mdc_device *mc,
-                               struct lu_fid *fid)
-{
-        int rc;
-        ENTRY;
-
-        LASSERT(cmm != NULL && mc != NULL && fid != NULL);
-
-        cfs_down(&mc->mc_fid_sem);
-
-        /* Alloc new fid on \a mc. */
-        rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
-        if (rc > 0)
-                rc = 0;
-        cfs_up(&mc->mc_fid_sem);
-
-        RETURN(rc);
-}
-
-/**
- * Allocate new slave object on passed \a mc.
- */
-static int cmm_split_slave_create(const struct lu_env *env,
-                                  struct cmm_device *cmm,
-                                  struct mdc_device *mc,
-                                  struct lu_fid *fid,
-                                  struct md_attr *ma,
-                                  struct lmv_stripe_md *lmv,
-                                  int lmv_size)
-{
-        struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
-        struct cmm_object *obj;
-        int rc;
-        ENTRY;
-
-        /* Allocate new fid and store it to @fid */
-        rc = cmm_split_fid_alloc(env, cmm, mc, fid);
-        if (rc) {
-                CERROR("Can't alloc new fid on "LPU64
-                       ", rc %d\n", mc->mc_num, rc);
-                RETURN(rc);
-        }
-
-        /* Allocate new object on @mc */
-        obj = cmm_object_find(env, cmm, fid);
-        if (IS_ERR(obj))
-                RETURN(PTR_ERR(obj));
-
-        memset(spec, 0, sizeof *spec);
-        spec->u.sp_ea.fid = fid;
-        spec->u.sp_ea.eadata = lmv;
-        spec->u.sp_ea.eadatalen = lmv_size;
-        spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
-        rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
-                              spec, ma);
-        cmm_object_put(env, obj);
-        RETURN(rc);
-}
-
-/**
- * Create so many slaves as number of stripes.
- * This is called in split time before sending pages to slaves.
- */
-static int cmm_split_slaves_create(const struct lu_env *env,
-                                   struct md_object *mo,
-                                   struct md_attr *ma)
-{
-        struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
-        struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
-        struct mdc_device    *mc, *tmp;
-        struct lmv_stripe_md *lmv;
-        int i = 1, rc = 0;
-        ENTRY;
-
-        /* Init the split MEA */
-        lmv = ma->ma_lmv;
-        lmv->mea_master = cmm->cmm_local_num;
-        lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
-        lmv->mea_count = cmm->cmm_tgt_count + 1;
-
-        /*
-         * Store master FID to local node idx number. Local node is always
-         * master and its stripe number if 0.
-         */
-        lmv->mea_ids[0] = *lf;
-
-        memset(slave_lmv, 0, sizeof *slave_lmv);
-        slave_lmv->mea_master = cmm->cmm_local_num;
-        slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
-        slave_lmv->mea_count = 0;
-
-        cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
-                rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
-                                            ma, slave_lmv, sizeof(*slave_lmv));
-                if (rc)
-                        GOTO(cleanup, rc);
-                i++;
-        }
-        EXIT;
-cleanup:
-        return rc;
-}
-
-static inline int cmm_split_special_entry(struct lu_dirent *ent)
-{
-        if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
-            !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
-                return 1;
-        return 0;
-}
-
-/**
- * Convert string to the lu_name structure.
- */
-static inline struct lu_name *cmm_name(const struct lu_env *env,
-                                       char *name, int buflen)
-{
-        struct lu_name *lname;
-        struct cmm_thread_info *cmi;
-
-        LASSERT(buflen > 0);
-        LASSERT(name[buflen - 1] == '\0');
-
-        cmi = cmm_env_info(env);
-        lname = &cmi->cti_name;
-        lname->ln_name = name;
-        /* do NOT count the terminating '\0' of name for length */
-        lname->ln_namelen = buflen - 1;
-        return lname;
-}
-
-/**
- * Helper for cmm_split_remove_page(). It removes one entry from local MDT.
- * Do not corrupt byte order in page, it will be sent to remote MDT.
- */
-static int cmm_split_remove_entry(const struct lu_env *env,
-                                  struct md_object *mo,
-                                  struct lu_dirent *ent)
-{
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct cmm_thread_info *cmi;
-        struct md_attr *ma;
-        struct cmm_object *obj;
-        int is_dir, rc;
-        char *name;
-        struct lu_name *lname;
-        ENTRY;
-
-        if (cmm_split_special_entry(ent))
-                RETURN(0);
-
-        fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
-        obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
-        if (IS_ERR(obj))
-                RETURN(PTR_ERR(obj));
-
-        cmi = cmm_env_info(env);
-        ma = &cmi->cmi_ma;
-
-        if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
-                is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
-        else
-                /**
-                 * \note These days only cross-ref dirs are possible, so for the
-                 * sake of simplicity, in split, we suppose that all cross-ref
-                 * names point to directory and do not do additional getattr to
-                 * remote MDT.
-                 */
-                is_dir = 1;
-
-        OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
-        if (!name)
-                GOTO(cleanup, rc = -ENOMEM);
-
-        memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
-        lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
-        /**
-         * \note When split, no need update parent's ctime,
-         * and no permission check for name_remove.
-         */
-        ma->ma_attr.la_ctime = 0;
-        if (is_dir)
-                ma->ma_attr.la_mode = S_IFDIR;
-        else
-                ma->ma_attr.la_mode = 0;
-        ma->ma_attr.la_valid = LA_MODE;
-        ma->ma_valid = MA_INODE;
-
-        ma->ma_attr_flags |= MDS_PERM_BYPASS;
-        rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
-        OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        /**
-         * \note For each entry transferred to the slave MDS we should know
-         * whether this object is dir or not. Therefore the highest bit of the
-         * hash is used to indicate that (it is unused for hash purposes anyway).
-         */
-        if (is_dir) {
-                ent->lde_hash = le64_to_cpu(ent->lde_hash);
-                ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
-        }
-        EXIT;
-cleanup:
-        cmm_object_put(env, obj);
-        return rc;
-}
-
-/**
- * Remove all entries from passed page.
- * These entries are going to remote MDT and thus should be removed locally.
- */
-static int cmm_split_remove_page(const struct lu_env *env,
-                                 struct md_object *mo,
-                                 struct lu_rdpg *rdpg,
-                                 __u64 hash_end, __u32 *len)
-{
-        struct lu_dirpage *dp;
-        struct lu_dirent  *ent;
-        int rc = 0;
-        ENTRY;
-
-        *len = 0;
-        cfs_kmap(rdpg->rp_pages[0]);
-        dp = page_address(rdpg->rp_pages[0]);
-        for (ent = lu_dirent_start(dp);
-             ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
-             ent = lu_dirent_next(ent)) {
-                rc = cmm_split_remove_entry(env, mo, ent);
-                if (rc) {
-                        /*
-                         * XXX: Error handler to insert remove name back,
-                         * currently we assumed it will success anyway in
-                         * verfication test.
-                         */
-                        CERROR("Can not del %*.*s, rc %d\n",
-                               le16_to_cpu(ent->lde_namelen),
-                               le16_to_cpu(ent->lde_namelen),
-                               ent->lde_name, rc);
-                        GOTO(unmap, rc);
-                }
-                *len += lu_dirent_size(ent);
-        }
-
-        if (ent != lu_dirent_start(dp))
-                *len += sizeof(struct lu_dirpage);
-        EXIT;
-unmap:
-        cfs_kunmap(rdpg->rp_pages[0]);
-        return rc;
-}
-
-/**
- * Send one page of entries to the slave MDT.
- * This page contains entries to be created there.
- */
-static int cmm_split_send_page(const struct lu_env *env,
-                               struct md_object *mo,
-                               struct lu_rdpg *rdpg,
-                               struct lu_fid *fid, int len)
-{
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct cmm_object *obj;
-        int rc = 0;
-        ENTRY;
-
-        obj = cmm_object_find(env, cmm, fid);
-        if (IS_ERR(obj))
-                RETURN(PTR_ERR(obj));
-
-        rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
-                           rdpg->rp_pages[0], len);
-        cmm_object_put(env, obj);
-        RETURN(rc);
-}
-
-/** Read one page of entries from local MDT. */
-static int cmm_split_read_page(const struct lu_env *env,
-                               struct md_object *mo,
-                               struct lu_rdpg *rdpg)
-{
-        int rc;
-        ENTRY;
-        memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
-        cfs_kunmap(rdpg->rp_pages[0]);
-        rc = mo_readpage(env, md_object_next(mo), rdpg);
-        RETURN(rc);
-}
-
-/**
- * This function performs migration of each directory stripe to its MDS.
- */
-static int cmm_split_process_stripe(const struct lu_env *env,
-                                    struct md_object *mo,
-                                    struct lu_rdpg *rdpg,
-                                    struct lu_fid *lf,
-                                    __u64 end)
-{
-        int rc, done = 0;
-        ENTRY;
-
-        LASSERT(rdpg->rp_npages == 1);
-        do {
-                struct lu_dirpage *ldp;
-                __u32 len = 0;
-
-                /** - Read one page of entries from local MDT. */
-                rc = cmm_split_read_page(env, mo, rdpg);
-                if (rc) {
-                        CERROR("Error in readpage: %d\n", rc);
-                        RETURN(rc);
-                }
-
-                /** - Remove local entries which are going to remite MDT. */
-                rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
-                if (rc) {
-                        CERROR("Error in remove stripe entries: %d\n", rc);
-                        RETURN(rc);
-                }
-
-                /**
-                 * - Send entries page to slave MDT and repeat while there are
-                 * more pages.
-                 */
-                if (len > 0) {
-                        rc = cmm_split_send_page(env, mo, rdpg, lf, len);
-                        if (rc) {
-                                CERROR("Error in sending page: %d\n", rc);
-                                RETURN(rc);
-                        }
-                }
-
-                cfs_kmap(rdpg->rp_pages[0]);
-                ldp = page_address(rdpg->rp_pages[0]);
-                if (le64_to_cpu(ldp->ldp_hash_end) >= end)
-                        done = 1;
-
-                rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
-                cfs_kunmap(rdpg->rp_pages[0]);
-        } while (!done);
-
-        RETURN(rc);
-}
-
-/**
- * Directory scanner for split operation.
- *
- * It calculates hashes for names and organizes files to stripes.
- */
-static int cmm_split_process_dir(const struct lu_env *env,
-                                 struct md_object *mo,
-                                 struct md_attr *ma)
-{
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
-        __u64 hash_segment;
-        int rc = 0, i;
-        ENTRY;
-
-        memset(rdpg, 0, sizeof *rdpg);
-        rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
-        rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
-        rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
-
-        for (i = 0; i < rdpg->rp_npages; i++) {
-                rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
-                if (rdpg->rp_pages[i] == NULL)
-                        GOTO(cleanup, rc = -ENOMEM);
-        }
-
-        hash_segment = MAX_HASH_SIZE;
-        /** Whole hash range is divided on segments by number of MDS-es. */
-        do_div(hash_segment, cmm->cmm_tgt_count + 1);
-        /**
-         * For each segment the cmm_split_process_stripe() is called to move
-         * entries on new server.
-         */
-        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
-                struct lu_fid *lf;
-                __u64 hash_end;
-
-                lf = &ma->ma_lmv->mea_ids[i];
-
-                rdpg->rp_hash = i * hash_segment;
-                if (i == cmm->cmm_tgt_count)
-                        hash_end = MAX_HASH_SIZE;
-                else
-                        hash_end = rdpg->rp_hash + hash_segment;
-                rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
-                if (rc) {
-                        CERROR("Error (rc = %d) while splitting for %d: fid="
-                               DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
-                               rdpg->rp_hash, hash_end);
-                        GOTO(cleanup, rc);
-                }
-        }
-        EXIT;
-cleanup:
-        for (i = 0; i < rdpg->rp_npages; i++)
-                if (rdpg->rp_pages[i] != NULL)
-                        cfs_free_page(rdpg->rp_pages[i]);
-        return rc;
-}
-
-/**
- * Directory splitting.
- *
- * Big directory can be split eventually.
- */
-int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
-{
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
-        int                rc = 0, split;
-        struct lu_buf     *buf;
-        ENTRY;
-
-        cmm_lprocfs_time_start(env);
-
-        LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
-        memset(ma, 0, sizeof(*ma));
-
-        /** - Step1: Checking whether the dir needs to be split. */
-        rc = cmm_split_expect(env, mo, ma, &split);
-        if (rc)
-                GOTO(out, rc);
-
-        if (split != CMM_SPLIT_NEEDED) {
-                /* No split is needed, caller may proceed with create. */
-                GOTO(out, rc = 0);
-        }
-
-        /* Split should be done now, let's do it. */
-        CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
-              PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
-
-        /**
-         * /note Disable transactions for split, since there will be so many trans in
-         * this one ops, conflict with current recovery design.
-         */
-        rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS, NULL);
-        if (rc) {
-                CERROR("Can't disable trans for split, rc %d\n", rc);
-                GOTO(out, rc);
-        }
-
-        /** - Step2: Prepare the md memory */
-        ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
-        OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
-        if (ma->ma_lmv == NULL)
-                GOTO(out, rc = -ENOMEM);
-
-        /** - Step3: Create slave objects and fill the ma->ma_lmv */
-        rc = cmm_split_slaves_create(env, mo, ma);
-        if (rc) {
-                CERROR("Can't create slaves for split, rc %d\n", rc);
-                GOTO(cleanup, rc);
-        }
-
-        /** - Step4: Scan and split the object. */
-        rc = cmm_split_process_dir(env, mo, ma);
-        if (rc) {
-                CERROR("Can't scan and split, rc %d\n", rc);
-                GOTO(cleanup, rc);
-        }
-
-        /** - Step5: Set mea to the master object. */
-        buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
-        rc = mo_xattr_set(env, md_object_next(mo), buf,
-                          MDS_LMV_MD_NAME, 0);
-        if (rc) {
-                CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
-                GOTO(cleanup, rc);
-        }
-
-        /* set flag in cmm_object */
-        md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
-
-        /**
-         * - Finally, split succeed, tell client to repeat opetartion on correct
-         * MDT.
-         */
-        CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
-        rc = -ERESTART;
-        EXIT;
-cleanup:
-        OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
-out:
-        cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
-        return rc;
-}
-/** @} */
diff --git a/lustre/cmm/mdc_device.c b/lustre/cmm/mdc_device.c
deleted file mode 100644 (file)
index 58f72f0..0000000
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, Whamcloud, Inc.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/mdc_device.c
- *
- * Lustre Metadata Client (mdc)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <obd.h>
-#include <obd_class.h>
-#include <lprocfs_status.h>
-#include <lustre_ver.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-
-static const struct lu_device_operations mdc_lu_ops;
-/**
- * \addtogroup cmm_mdc
- * @{
- */
-/**
- * The md_device_operation for mdc. It is empty.
- */
-static const struct md_device_operations mdc_md_ops = { 0 };
-
-/**
- * Upcall handler in mdc. Analog of obd_device::o_notify().
- */
-static int mdc_obd_update(struct obd_device *host,
-                          struct obd_device *watched,
-                          enum obd_notify_event ev, void *owner, void *data)
-{
-        struct mdc_device *mc = owner;
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(mc != NULL);
-        CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
-        if (ev == OBD_NOTIFY_ACTIVE) {
-                CDEBUG(D_INFO|D_WARNING, "Device %s is active now\n",
-                       watched->obd_name);
-        } else if (ev == OBD_NOTIFY_INACTIVE) {
-                CDEBUG(D_INFO|D_WARNING, "Device %s is inactive now\n",
-                       watched->obd_name);
-        } else if (ev == OBD_NOTIFY_OCD) {
-                struct obd_connect_data *conn_data =
-                                  &watched->u.cli.cl_import->imp_connect_data;
-                /*
-                 * Update exp_connect_flags.
-                 */
-                mc->mc_desc.cl_exp->exp_connect_flags =
-                                                conn_data->ocd_connect_flags;
-                CDEBUG(D_INFO, "Update connect_flags: "LPX64"\n",
-                       conn_data->ocd_connect_flags);
-        }
-
-        RETURN(rc);
-}
-/**
- * Add new mdc device.
- * Invoked by configuration command LCFG_ADD_MDC.
- *
- * MDC OBD is set up already and connected to the proper MDS
- * mdc_add_obd() find that obd by uuid and connects to it.
- * Local MDT uuid is used for connection.
- */
-static int mdc_obd_add(const struct lu_env *env,
-                       struct mdc_device *mc, struct lustre_cfg *cfg)
-{
-        struct mdc_cli_desc *desc = &mc->mc_desc;
-        struct obd_device *mdc;
-        const char *uuid_str = lustre_cfg_string(cfg, 1);
-        const char *index = lustre_cfg_string(cfg, 2);
-        const char *mdc_uuid_str = lustre_cfg_string(cfg, 4);
-        struct md_site *ms = lu_site2md(mdc2lu_dev(mc)->ld_site);
-        char *p;
-        int rc = 0;
-
-        ENTRY;
-        LASSERT(uuid_str);
-        LASSERT(index);
-
-        mc->mc_num = simple_strtol(index, &p, 10);
-        if (*p) {
-                CERROR("Invalid index in lustre_cgf, offset 2\n");
-                RETURN(-EINVAL);
-        }
-
-        obd_str2uuid(&desc->cl_srv_uuid, uuid_str);
-        obd_str2uuid(&desc->cl_cli_uuid, mdc_uuid_str);
-        /* try to find MDC OBD connected to the needed MDT */
-        mdc = class_find_client_obd(&desc->cl_srv_uuid, LUSTRE_MDC_NAME,
-                                    &desc->cl_cli_uuid);
-        if (!mdc) {
-                CERROR("Cannot find MDC OBD connected to %s\n", uuid_str);
-                rc = -ENOENT;
-        } else if (!mdc->obd_set_up) {
-                CERROR("target %s not set up\n", mdc->obd_name);
-                rc = -EINVAL;
-        } else {
-                struct obd_connect_data *ocd;
-
-                CDEBUG(D_CONFIG, "connect to %s(%s)\n",
-                       mdc->obd_name, mdc->obd_uuid.uuid);
-
-                OBD_ALLOC_PTR(ocd);
-                if (!ocd)
-                        RETURN(-ENOMEM);
-                /*
-                 * The connection between MDS must be local,
-                 * IBITS are needed for rename_lock (INODELOCK_UPDATE)
-                 */
-                ocd->ocd_ibits_known = MDS_INODELOCK_UPDATE;
-                ocd->ocd_connect_flags = OBD_CONNECT_VERSION |
-                                         OBD_CONNECT_ACL |
-                                         OBD_CONNECT_RMT_CLIENT |
-                                         OBD_CONNECT_MDS_CAPA |
-                                         OBD_CONNECT_OSS_CAPA |
-                                         OBD_CONNECT_IBITS |
-                                         OBD_CONNECT_BRW_SIZE |
-                                         OBD_CONNECT_MDS_MDS |
-                                         OBD_CONNECT_FID |
-                                         OBD_CONNECT_AT |
-                                         OBD_CONNECT_FULL20 |
-                                         OBD_CONNECT_64BITHASH;
-                ocd->ocd_brw_size = PTLRPC_MAX_BRW_SIZE;
-                rc = obd_connect(env, &desc->cl_exp, mdc, &mdc->obd_uuid, ocd, NULL);
-                OBD_FREE_PTR(ocd);
-                if (rc) {
-                        CERROR("target %s connect error %d\n",
-                               mdc->obd_name, rc);
-                } else {
-                        /* set seq controller export for MDC0 if exists */
-                        if (mc->mc_num == 0)
-                                ms->ms_control_exp =
-                                        class_export_get(desc->cl_exp);
-                        rc = obd_fid_init(desc->cl_exp);
-                        if (rc)
-                                CERROR("fid init error %d \n", rc);
-                        else {
-                                /* obd notify mechanism */
-                                mdc->obd_upcall.onu_owner = mc;
-                                mdc->obd_upcall.onu_upcall = mdc_obd_update;
-                        }
-                }
-
-                if (rc) {
-                        obd_disconnect(desc->cl_exp);
-                        desc->cl_exp = NULL;
-                }
-        }
-
-        RETURN(rc);
-}
-
-/**
- * Delete mdc device.
- * Called when configuration command LCFG_CLEANUP is issued.
- *
- * This disconnects MDC OBD and cleanup it.
- */
-static int mdc_obd_del(const struct lu_env *env, struct mdc_device *mc,
-                       struct lustre_cfg *cfg)
-{
-        struct mdc_cli_desc *desc = &mc->mc_desc;
-        const char *dev = lustre_cfg_string(cfg, 0);
-        struct obd_device *mdc_obd = class_exp2obd(desc->cl_exp);
-        struct obd_device *mdt_obd;
-        int rc;
-
-        ENTRY;
-
-        CDEBUG(D_CONFIG, "Disconnect from %s\n",
-               mdc_obd->obd_name);
-
-        /* Set mdt_obd flags in shutdown. */
-        mdt_obd = class_name2obd(dev);
-        LASSERT(mdt_obd != NULL);
-        if (mdc_obd) {
-                mdc_obd->obd_no_recov = mdt_obd->obd_no_recov;
-                mdc_obd->obd_force = mdt_obd->obd_force;
-                mdc_obd->obd_fail = 0;
-        }
-
-        rc = obd_fid_fini(desc->cl_exp);
-        if (rc)
-                CERROR("Fid fini error %d\n", rc);
-
-        obd_register_observer(mdc_obd, NULL);
-        mdc_obd->obd_upcall.onu_owner = NULL;
-        mdc_obd->obd_upcall.onu_upcall = NULL;
-        rc = obd_disconnect(desc->cl_exp);
-        if (rc) {
-                CERROR("Target %s disconnect error %d\n",
-                       mdc_obd->obd_name, rc);
-        }
-        class_manual_cleanup(mdc_obd);
-        desc->cl_exp = NULL;
-
-        RETURN(0);
-}
-
-/**
- * Process config command. Passed to the mdc from mdt.
- * Supports two commands only - LCFG_ADD_MDC and LCFG_CLEANUP
- */
-static int mdc_process_config(const struct lu_env *env,
-                              struct lu_device *ld,
-                              struct lustre_cfg *cfg)
-{
-        struct mdc_device *mc = lu2mdc_dev(ld);
-        int rc;
-
-        ENTRY;
-        switch (cfg->lcfg_command) {
-        case LCFG_ADD_MDC:
-                rc = mdc_obd_add(env, mc, cfg);
-                break;
-        case LCFG_CLEANUP:
-                rc = mdc_obd_del(env, mc, cfg);
-                break;
-        default:
-                rc = -EOPNOTSUPP;
-        }
-        RETURN(rc);
-}
-
-/**
- * lu_device_operations instance for mdc.
- */
-static const struct lu_device_operations mdc_lu_ops = {
-        .ldo_object_alloc   = mdc_object_alloc,
-        .ldo_process_config = mdc_process_config
-};
-
-/**
- * Initialize proper easize and cookie size.
- */
-void cmm_mdc_init_ea_size(const struct lu_env *env, struct mdc_device *mc,
-                      int max_mdsize, int max_cookiesize)
-{
-        struct obd_device *obd = class_exp2obd(mc->mc_desc.cl_exp);
-
-        obd->u.cli.cl_max_mds_easize = max_mdsize;
-        obd->u.cli.cl_max_mds_cookiesize = max_cookiesize;
-}
-
-/** Start mdc device */
-static int mdc_device_init(const struct lu_env *env, struct lu_device *ld,
-                           const char *name, struct lu_device *next)
-{
-        return 0;
-}
-
-/** Stop mdc device. */
-static struct lu_device *mdc_device_fini(const struct lu_env *env,
-                                         struct lu_device *ld)
-{
-        ENTRY;
-        RETURN (NULL);
-}
-
-/** Allocate new mdc device */
-static struct lu_device *mdc_device_alloc(const struct lu_env *env,
-                                          struct lu_device_type *ldt,
-                                          struct lustre_cfg *cfg)
-{
-        struct lu_device  *ld;
-        struct mdc_device *mc;
-        ENTRY;
-
-        OBD_ALLOC_PTR(mc);
-        if (mc == NULL) {
-                ld = ERR_PTR(-ENOMEM);
-        } else {
-                md_device_init(&mc->mc_md_dev, ldt);
-                mc->mc_md_dev.md_ops = &mdc_md_ops;
-                ld = mdc2lu_dev(mc);
-                ld->ld_ops = &mdc_lu_ops;
-                cfs_sema_init(&mc->mc_fid_sem, 1);
-        }
-
-        RETURN (ld);
-}
-
-/** Free mdc device */
-static struct lu_device *mdc_device_free(const struct lu_env *env,
-                                         struct lu_device *ld)
-{
-        struct mdc_device *mc = lu2mdc_dev(ld);
-
-        LASSERTF(cfs_atomic_read(&ld->ld_ref) == 0,
-                 "Refcount = %d\n", cfs_atomic_read(&ld->ld_ref));
-        LASSERT(cfs_list_empty(&mc->mc_linkage));
-        md_device_fini(&mc->mc_md_dev);
-        OBD_FREE_PTR(mc);
-        return NULL;
-}
-
-/** context key constructor/destructor: mdc_key_init, mdc_key_fini */
-LU_KEY_INIT_FINI(mdc, struct mdc_thread_info);
-
-/** context key: mdc_thread_key */
-LU_CONTEXT_KEY_DEFINE(mdc, LCT_MD_THREAD|LCT_CL_THREAD);
-
-/** type constructor/destructor: mdc_type_init, mdc_type_fini */
-LU_TYPE_INIT_FINI(mdc, &mdc_thread_key);
-
-static struct lu_device_type_operations mdc_device_type_ops = {
-        .ldto_init = mdc_type_init,
-        .ldto_fini = mdc_type_fini,
-
-        .ldto_start = mdc_type_start,
-        .ldto_stop  = mdc_type_stop,
-
-        .ldto_device_alloc = mdc_device_alloc,
-        .ldto_device_free  = mdc_device_free,
-
-        .ldto_device_init = mdc_device_init,
-        .ldto_device_fini = mdc_device_fini
-};
-
-struct lu_device_type mdc_device_type = {
-        .ldt_tags     = LU_DEVICE_MD,
-        .ldt_name     = LUSTRE_CMM_MDC_NAME,
-        .ldt_ops      = &mdc_device_type_ops,
-        .ldt_ctx_tags = LCT_MD_THREAD|LCT_CL_THREAD
-};
-/** @} */
diff --git a/lustre/cmm/mdc_internal.h b/lustre/cmm/mdc_internal.h
deleted file mode 100644 (file)
index 69eaae0..0000000
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/mdc_internal.h
- *
- * Lustre Cluster Metadata Manager (cmm), MDC device
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-
-#ifndef _CMM_MDC_INTERNAL_H
-#define _CMM_MDC_INTERNAL_H
-
-#if defined(__KERNEL__)
-
-#include <lustre_net.h>
-#include <obd.h>
-#include <md_object.h>
-/**
- * \addtogroup cmm
- * @{
- */
-/**
- * \defgroup cmm_mdc cmm_mdc
- *
- * This is mdc wrapper device to work with old MDC obd-based devices.
- * @{
- */
-/**
- * MDC client description.
- */
-struct mdc_cli_desc {
-        /** uuid of remote MDT to connect */
-        struct obd_uuid          cl_srv_uuid;
-        /** mdc uuid */
-        struct obd_uuid          cl_cli_uuid;
-        /** export of mdc obd */
-        struct obd_export        *cl_exp;
-};
-
-/**
- * MDC device.
- */
-struct mdc_device {
-        /** md_device instance for MDC */
-        struct md_device        mc_md_dev;
-        /** other MD servers in cluster */
-        cfs_list_t              mc_linkage;
-        /** number of current device */
-        mdsno_t                 mc_num;
-        /** mdc client description */
-        struct mdc_cli_desc     mc_desc;
-        /** Protects ??*/
-        cfs_semaphore_t         mc_fid_sem;
-};
-
-/**
- * mdc thread info. Local storage for varios data.
- */
-struct mdc_thread_info {
-        /** Storage for md_op_data */
-        struct md_op_data       mci_opdata;
-        /** Storage for ptlrpc request */
-        struct ptlrpc_request  *mci_req;
-};
-
-/** mdc object. */
-struct mdc_object {
-        /** md_object instance for mdc_object */
-        struct md_object        mco_obj;
-};
-
-/** Get lu_device from mdc_device. */
-static inline struct lu_device *mdc2lu_dev(struct mdc_device *mc)
-{
-        return (&mc->mc_md_dev.md_lu_dev);
-}
-
-/** Get mdc_device from md_device. */
-static inline struct mdc_device *md2mdc_dev(struct md_device *md)
-{
-        return container_of0(md, struct mdc_device, mc_md_dev);
-}
-
-/** Get mdc_device from mdc_object. */
-static inline struct mdc_device *mdc_obj2dev(struct mdc_object *mco)
-{
-        return (md2mdc_dev(md_obj2dev(&mco->mco_obj)));
-}
-
-/** Get mdc_object from lu_object. */
-static inline struct mdc_object *lu2mdc_obj(struct lu_object *lo)
-{
-        return container_of0(lo, struct mdc_object, mco_obj.mo_lu);
-}
-
-/** Get mdc_object from md_object. */
-static inline struct mdc_object *md2mdc_obj(struct md_object *mo)
-{
-        return container_of0(mo, struct mdc_object, mco_obj);
-}
-
-/** Get mdc_device from lu_device. */
-static inline struct mdc_device *lu2mdc_dev(struct lu_device *ld)
-{
-        return container_of0(ld, struct mdc_device, mc_md_dev.md_lu_dev);
-}
-
-struct lu_object *mdc_object_alloc(const struct lu_env *,
-                                   const struct lu_object_header *,
-                                   struct lu_device *);
-
-void cmm_mdc_init_ea_size(const struct lu_env *env, struct mdc_device *mc,
-                      int max_mdsize, int max_cookiesize);
-#ifdef HAVE_SPLIT_SUPPORT
-int mdc_send_page(struct cmm_device *cmm, const struct lu_env *env,
-                  struct md_object *mo, struct page *page, __u32 end);
-#endif
-/** @} */
-/** @} */
-#endif /* __KERNEL__ */
-#endif /* _CMM_MDC_INTERNAL_H */
diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c
deleted file mode 100644 (file)
index 71dd7fa..0000000
+++ /dev/null
@@ -1,668 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/cmm/mdc_object.c
- *
- * Lustre Cluster Metadata Manager (cmm)
- *
- * Author: Mike Pershin <tappro@clusterfs.com>
- */
-
-#define DEBUG_SUBSYSTEM S_MDS
-#include <obd_support.h>
-#include <lustre_lib.h>
-#include <obd_class.h>
-#include <lustre_mdc.h>
-#include "cmm_internal.h"
-#include "mdc_internal.h"
-
-static const struct md_object_operations mdc_mo_ops;
-static const struct md_dir_operations mdc_dir_ops;
-static const struct lu_object_operations mdc_obj_ops;
-
-extern struct lu_context_key mdc_thread_key;
-/**
- * \addtogroup cmm_mdc
- * @{
- */
-/**
- * Allocate new mdc object.
- */
-struct lu_object *mdc_object_alloc(const struct lu_env *env,
-                                   const struct lu_object_header *hdr,
-                                   struct lu_device *ld)
-{
-        struct mdc_object *mco;
-        ENTRY;
-
-        OBD_ALLOC_PTR(mco);
-        if (mco != NULL) {
-                struct lu_object *lo;
-
-                lo = &mco->mco_obj.mo_lu;
-                lu_object_init(lo, NULL, ld);
-                mco->mco_obj.mo_ops = &mdc_mo_ops;
-                mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
-                lo->lo_ops = &mdc_obj_ops;
-                RETURN(lo);
-        } else
-                RETURN(NULL);
-}
-
-/** Free current mdc object */
-static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
-{
-        struct mdc_object *mco = lu2mdc_obj(lo);
-        lu_object_fini(lo);
-        OBD_FREE_PTR(mco);
-}
-
-/**
- * Initialize mdc object. All of them have loh_attr::LOHA_REMOTE set.
- */
-static int mdc_object_init(const struct lu_env *env, struct lu_object *lo,
-                           const struct lu_object_conf *unused)
-{
-        ENTRY;
-        lo->lo_header->loh_attr |= LOHA_REMOTE;
-        RETURN(0);
-}
-
-/**
- * Instance of lu_object_operations for mdc.
- */
-static const struct lu_object_operations mdc_obj_ops = {
-        .loo_object_init    = mdc_object_init,
-        .loo_object_free    = mdc_object_free,
-};
-
-/**
- * \name The set of md_object_operations.
- * @{
- */
-/**
- * Get mdc_thread_info from lu_context
- */
-static
-struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
-{
-        struct mdc_thread_info *mci;
-
-        mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
-        LASSERT(mci);
-        return mci;
-}
-
-/**
- * Initialize mdc_thread_info.
- */
-static
-struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
-{
-        struct mdc_thread_info *mci = mdc_info_get(env);
-        memset(mci, 0, sizeof(*mci));
-        return mci;
-}
-
-/**
- * Convert attributes from mdt_body to the md_attr.
- */
-static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
-{
-        struct lu_attr *la = &ma->ma_attr;
-        /* update time */
-        if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
-                la->la_ctime = body->ctime;
-                if (body->valid & OBD_MD_FLMTIME)
-                        la->la_mtime = body->mtime;
-        }
-
-        if (body->valid & OBD_MD_FLMODE)
-                la->la_mode = body->mode;
-        if (body->valid & OBD_MD_FLSIZE)
-                la->la_size = body->size;
-        if (body->valid & OBD_MD_FLBLOCKS)
-                la->la_blocks = body->blocks;
-        if (body->valid & OBD_MD_FLUID)
-                la->la_uid = body->uid;
-        if (body->valid & OBD_MD_FLGID)
-                la->la_gid = body->gid;
-        if (body->valid & OBD_MD_FLFLAGS)
-                la->la_flags = body->flags;
-        if (body->valid & OBD_MD_FLNLINK)
-                la->la_nlink = body->nlink;
-        if (body->valid & OBD_MD_FLRDEV)
-                la->la_rdev = body->rdev;
-
-        la->la_valid = body->valid;
-        ma->ma_valid = MA_INODE;
-}
-
-/**
- * Fill the md_attr \a ma with attributes from request.
- */
-static int mdc_req2attr_update(const struct lu_env *env,
-                               struct md_attr *ma)
-{
-        struct mdc_thread_info *mci;
-        struct ptlrpc_request *req;
-        struct mdt_body *body;
-        struct lov_mds_md *md;
-        struct llog_cookie *cookie;
-        void *acl;
-
-        ENTRY;
-        mci = mdc_info_get(env);
-        req = mci->mci_req;
-        LASSERT(req);
-        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-        LASSERT(body);
-        mdc_body2attr(body, ma);
-
-        if (body->valid & OBD_MD_FLMDSCAPA) {
-                struct lustre_capa *capa;
-
-                /* create for cross-ref will fetch mds capa from remote obj */
-                capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
-                LASSERT(capa != NULL);
-                LASSERT(ma->ma_capa != NULL);
-                *ma->ma_capa = *capa;
-        }
-
-        if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) {
-                if (body->eadatasize == 0) {
-                        CERROR("No size defined for easize field\n");
-                        RETURN(-EPROTO);
-                }
-
-                md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
-                                                  body->eadatasize);
-                if (md == NULL)
-                        RETURN(-EPROTO);
-
-                LASSERT(ma->ma_lmm != NULL);
-                LASSERT(ma->ma_lmm_size >= body->eadatasize);
-                ma->ma_lmm_size = body->eadatasize;
-                memcpy(ma->ma_lmm, md, ma->ma_lmm_size);
-                ma->ma_valid |= MA_LOV;
-        }
-
-        if (body->valid & OBD_MD_FLCOOKIE) {
-                /*
-                 * ACL and cookie share the same body->aclsize, we need
-                 * to make sure that they both never come here.
-                 */
-                LASSERT(!(body->valid & OBD_MD_FLACL));
-
-                if (body->aclsize == 0) {
-                        CERROR("No size defined for cookie field\n");
-                        RETURN(-EPROTO);
-                }
-
-                cookie = req_capsule_server_sized_get(&req->rq_pill,
-                                                      &RMF_LOGCOOKIES,
-                                                      body->aclsize);
-                if (cookie == NULL)
-                        RETURN(-EPROTO);
-
-                LASSERT(ma->ma_cookie != NULL);
-                LASSERT(ma->ma_cookie_size == body->aclsize);
-                memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
-                ma->ma_valid |= MA_COOKIE;
-        }
-
-#ifdef CONFIG_FS_POSIX_ACL
-        if (body->valid & OBD_MD_FLACL) {
-                if (body->aclsize == 0) {
-                        CERROR("No size defined for acl field\n");
-                        RETURN(-EPROTO);
-                }
-
-                acl = req_capsule_server_sized_get(&req->rq_pill,
-                                                   &RMF_ACL,
-                                                   body->aclsize);
-                if (acl == NULL)
-                        RETURN(-EPROTO);
-
-                LASSERT(ma->ma_acl != NULL);
-                LASSERT(ma->ma_acl_size == body->aclsize);
-                memcpy(ma->ma_acl, acl, ma->ma_acl_size);
-                ma->ma_valid |= MA_ACL_DEF;
-        }
-#endif
-
-        RETURN(0);
-}
-
-/**
- * The md_object_operations::moo_attr_get() in mdc.
- */
-static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
-                        struct md_attr *ma)
-{
-        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
-        struct mdc_thread_info *mci;
-        int rc;
-        ENTRY;
-
-        mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
-        LASSERT(mci);
-
-        memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
-
-        memcpy(&mci->mci_opdata.op_fid1, lu_object_fid(&mo->mo_lu),
-               sizeof (struct lu_fid));
-        mci->mci_opdata.op_valid = OBD_MD_FLMODE | OBD_MD_FLUID |
-                                   OBD_MD_FLGID | OBD_MD_FLFLAGS |
-                                   OBD_MD_FLCROSSREF;
-
-        rc = md_getattr(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
-        if (rc == 0) {
-                /* get attr from request */
-                rc = mdc_req2attr_update(env, ma);
-        }
-
-        ptlrpc_req_finished(mci->mci_req);
-
-        RETURN(rc);
-}
-
-/**
- * Helper to init timspec \a t.
- */
-static inline struct timespec *mdc_attr_time(struct timespec *t, obd_time seconds)
-{
-        t->tv_sec = seconds;
-        t->tv_nsec = 0;
-        return t;
-}
-
-/**
- * The md_object_operations::moo_attr_set() in mdc.
- *
- * \note It is only used for set ctime when rename's source on remote MDS.
- */
-static int mdc_attr_set(const struct lu_env *env, struct md_object *mo,
-                        const struct md_attr *ma)
-{
-        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
-        const struct lu_attr *la = &ma->ma_attr;
-        struct mdc_thread_info *mci;
-        struct md_ucred *uc = md_ucred(env);
-        int rc;
-        ENTRY;
-
-        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-
-        mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
-        LASSERT(mci);
-
-        memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
-
-        mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
-        mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime);
-        mci->mci_opdata.op_attr.ia_mode = la->la_mode;
-        mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET;
-        if (uc &&
-            ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
-                mci->mci_opdata.op_fsuid = uc->mu_fsuid;
-                mci->mci_opdata.op_fsgid = uc->mu_fsgid;
-                mci->mci_opdata.op_cap = uc->mu_cap;
-                if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
-                        mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
-                        mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
-                } else {
-                        mci->mci_opdata.op_suppgids[0] =
-                                mci->mci_opdata.op_suppgids[1] = -1;
-                }
-        } else {
-                mci->mci_opdata.op_fsuid = la->la_uid;
-                mci->mci_opdata.op_fsgid = la->la_gid;
-                mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
-                mci->mci_opdata.op_suppgids[0] =
-                                mci->mci_opdata.op_suppgids[1] = -1;
-        }
-
-        rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata,
-                        NULL, 0, NULL, 0, &mci->mci_req, NULL);
-
-        ptlrpc_req_finished(mci->mci_req);
-
-        RETURN(rc);
-}
-
-/**
- * The md_object_operations::moo_object_create() in mdc.
- */
-static int mdc_object_create(const struct lu_env *env,
-                             struct md_object *mo,
-                             const struct md_op_spec *spec,
-                             struct md_attr *ma)
-{
-        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
-        struct lu_attr *la = &ma->ma_attr;
-        struct mdc_thread_info *mci;
-        const void *symname;
-        struct md_ucred *uc = md_ucred(env);
-        int rc, symlen;
-        uid_t uid;
-        gid_t gid;
-        cfs_cap_t cap;
-        ENTRY;
-
-        LASSERT(S_ISDIR(la->la_mode));
-        LASSERT(spec->u.sp_pfid != NULL);
-
-        mci = mdc_info_init(env);
-        mci->mci_opdata.op_bias = MDS_CROSS_REF;
-        mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
-
-        /* Parent fid is needed to create dotdot on the remote node. */
-        mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid);
-        mci->mci_opdata.op_mod_time = la->la_ctime;
-        if (uc &&
-            ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
-                uid = uc->mu_fsuid;
-                if (la->la_mode & S_ISGID)
-                        gid = la->la_gid;
-                else
-                        gid = uc->mu_fsgid;
-                cap = uc->mu_cap;
-                if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
-                        mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
-                else
-                        mci->mci_opdata.op_suppgids[0] = -1;
-        } else {
-                uid = la->la_uid;
-                gid = la->la_gid;
-                cap = 0;
-                mci->mci_opdata.op_suppgids[0] = -1;
-        }
-
-        /* get data from spec */
-        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
-                symname = spec->u.sp_ea.eadata;
-                symlen = spec->u.sp_ea.eadatalen;
-                mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
-                mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ;
-#ifdef CONFIG_FS_POSIX_ACL
-        } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
-                symname = spec->u.sp_ea.eadata;
-                symlen = spec->u.sp_ea.eadatalen;
-                mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
-                mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL;
-#endif
-        } else {
-                symname = spec->u.sp_symname;
-                symlen = symname ? strlen(symname) + 1 : 0;
-        }
-
-        rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
-                       symname, symlen, la->la_mode, uid, gid,
-                       cap, la->la_rdev, &mci->mci_req);
-
-        if (rc == 0) {
-                /* get attr from request */
-                rc = mdc_req2attr_update(env, ma);
-        }
-
-        ptlrpc_req_finished(mci->mci_req);
-
-        RETURN(rc);
-}
-
-/**
- * The md_object_operations::moo_ref_add() in mdc.
- */
-static int mdc_ref_add(const struct lu_env *env, struct md_object *mo,
-                       const struct md_attr *ma)
-{
-        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
-        const struct lu_attr *la = &ma->ma_attr;
-        struct mdc_thread_info *mci;
-        struct md_ucred *uc = md_ucred(env);
-        int rc;
-        ENTRY;
-
-        mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
-        LASSERT(mci);
-
-        memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
-        mci->mci_opdata.op_bias = MDS_CROSS_REF;
-        mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
-        mci->mci_opdata.op_mod_time = la->la_ctime;
-        if (uc &&
-            ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
-                mci->mci_opdata.op_fsuid = uc->mu_fsuid;
-                mci->mci_opdata.op_fsgid = uc->mu_fsgid;
-                mci->mci_opdata.op_cap = uc->mu_cap;
-                if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
-                        mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
-                        mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
-                } else {
-                        mci->mci_opdata.op_suppgids[0] =
-                                mci->mci_opdata.op_suppgids[1] = -1;
-                }
-        } else {
-                mci->mci_opdata.op_fsuid = la->la_uid;
-                mci->mci_opdata.op_fsgid = la->la_gid;
-                mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
-                mci->mci_opdata.op_suppgids[0] =
-                                mci->mci_opdata.op_suppgids[1] = -1;
-        }
-
-
-        rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
-
-        ptlrpc_req_finished(mci->mci_req);
-
-        RETURN(rc);
-}
-
-/**
- * The md_object_operations::moo_ref_del() in mdc.
- */
-static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
-                       struct md_attr *ma)
-{
-        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
-        struct lu_attr *la = &ma->ma_attr;
-        struct mdc_thread_info *mci;
-        struct md_ucred *uc = md_ucred(env);
-        int rc;
-        ENTRY;
-
-        mci = mdc_info_init(env);
-        mci->mci_opdata.op_bias = MDS_CROSS_REF;
-        if (ma->ma_attr_flags & MDS_VTX_BYPASS)
-                mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
-        else
-                mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
-        mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
-        mci->mci_opdata.op_mode = la->la_mode;
-        mci->mci_opdata.op_mod_time = la->la_ctime;
-        if (uc &&
-            ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
-                mci->mci_opdata.op_fsuid = uc->mu_fsuid;
-                mci->mci_opdata.op_fsgid = uc->mu_fsgid;
-                mci->mci_opdata.op_cap = uc->mu_cap;
-                if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
-                        mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
-                else
-                        mci->mci_opdata.op_suppgids[0] = -1;
-        } else {
-                mci->mci_opdata.op_fsuid = la->la_uid;
-                mci->mci_opdata.op_fsgid = la->la_gid;
-                mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
-                mci->mci_opdata.op_suppgids[0] = -1;
-        }
-
-        rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
-        if (rc == 0) {
-                /* get attr from request */
-                rc = mdc_req2attr_update(env, ma);
-        }
-
-        ptlrpc_req_finished(mci->mci_req);
-
-        RETURN(rc);
-}
-
-#ifdef HAVE_SPLIT_SUPPORT
-/** Send page with directory entries to another MDS. */
-int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
-                  struct md_object *mo, struct page *page, __u32 offset)
-{
-        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
-        int rc;
-        ENTRY;
-
-        rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
-                          page, offset);
-        CDEBUG(rc ? D_ERROR : D_INFO, "send page %p  offset %d fid "DFID
-               " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
-        RETURN(rc);
-}
-#endif
-
-/**
- * Instance of md_object_operations for mdc.
- */
-static const struct md_object_operations mdc_mo_ops = {
-        .moo_attr_get       = mdc_attr_get,
-        .moo_attr_set       = mdc_attr_set,
-        .moo_object_create  = mdc_object_create,
-        .moo_ref_add        = mdc_ref_add,
-        .moo_ref_del        = mdc_ref_del,
-};
-/** @} */
-
-/**
- * \name The set of md_dir_operations.
- * @{
- */
-/**
- * The md_dir_operations::mdo_rename_tgt in mdc.
- */
-static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
-                          struct md_object *mo_t, const struct lu_fid *lf,
-                          const struct lu_name *lname, struct md_attr *ma)
-{
-        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
-        struct lu_attr *la = &ma->ma_attr;
-        struct mdc_thread_info *mci;
-        struct md_ucred *uc = md_ucred(env);
-        int rc;
-        ENTRY;
-
-        mci = mdc_info_init(env);
-        mci->mci_opdata.op_bias = MDS_CROSS_REF;
-        if (ma->ma_attr_flags & MDS_VTX_BYPASS)
-                mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
-        else
-                mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
-        mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
-        mci->mci_opdata.op_fid2 = *lf;
-        mci->mci_opdata.op_mode = la->la_mode;
-        mci->mci_opdata.op_mod_time = la->la_ctime;
-        if (uc &&
-            ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
-                mci->mci_opdata.op_fsuid = uc->mu_fsuid;
-                mci->mci_opdata.op_fsgid = uc->mu_fsgid;
-                mci->mci_opdata.op_cap = uc->mu_cap;
-                if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
-                        mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
-                        mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
-                } else {
-                        mci->mci_opdata.op_suppgids[0] =
-                                mci->mci_opdata.op_suppgids[1] = -1;
-                }
-        } else {
-                mci->mci_opdata.op_fsuid = la->la_uid;
-                mci->mci_opdata.op_fsgid = la->la_gid;
-                mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
-                mci->mci_opdata.op_suppgids[0] =
-                                mci->mci_opdata.op_suppgids[1] = -1;
-        }
-
-        rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
-                       lname->ln_name, lname->ln_namelen, &mci->mci_req);
-        if (rc == 0) {
-                /* get attr from request */
-                mdc_req2attr_update(env, ma);
-        }
-
-        ptlrpc_req_finished(mci->mci_req);
-
-        RETURN(rc);
-}
-/**
- * Check the fids are not relatives.
- * The md_dir_operations::mdo_is_subdir() in mdc.
- *
- * Return resulting fid in sfid.
- * \retval \a sfid = 0 fids are not relatives
- * \retval \a sfid = FID at which search stopped
- */
-static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
-                         const struct lu_fid *fid, struct lu_fid *sfid)
-{
-        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
-        struct mdc_thread_info *mci;
-        struct mdt_body *body;
-        int rc;
-        ENTRY;
-
-        mci = mdc_info_init(env);
-
-        rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
-                          fid, &mci->mci_req);
-        if (rc == 0 || rc == -EREMOTE) {
-                body = req_capsule_server_get(&mci->mci_req->rq_pill,
-                                              &RMF_MDT_BODY);
-                LASSERT(body->valid & OBD_MD_FLID);
-
-                CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
-                       PFID(&body->fid1));
-                *sfid = body->fid1;
-        }
-        ptlrpc_req_finished(mci->mci_req);
-        RETURN(rc);
-}
-
-/** Instance of md_dir_operations for mdc. */
-static const struct md_dir_operations mdc_dir_ops = {
-        .mdo_is_subdir   = mdc_is_subdir,
-        .mdo_rename_tgt  = mdc_rename_tgt
-};
-/** @} */
index 2c24b9c..4fdbe67 100644 (file)
@@ -2083,11 +2083,7 @@ extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa);
                                            * anymore, reserve this flags
                                            * just for preventing such bit
                                            * to be reused. */
-#define MDS_CREATE_RMT_ACL    01000000000 /* indicate create on remote server
-                                           * with default ACL */
-#define MDS_CREATE_SLAVE_OBJ  02000000000 /* indicate create slave object
-                                           * actually, this is for create, not
-                                           * conflict with other open flags */
+
 #define MDS_OPEN_LOCK         04000000000 /* This open requires open lock */
 #define MDS_OPEN_HAS_EA      010000000000 /* specify object create pattern */
 #define MDS_OPEN_HAS_OBJS    020000000000 /* Just set the EA the obj exist */
index 487fdaf..dc49759 100644 (file)
@@ -90,8 +90,6 @@
 #define LDD_F_UPGRADE14     0x0200
 /** process as lctl conf_param */
 #define LDD_F_PARAM         0x0400
-/** backend fs make use of IAM directory format. */
-#define LDD_F_IAM_DIR       0x0800
 /** all nodes are specified as service nodes */
 #define LDD_F_NO_PRIMNODE   0x1000
 /** IR enable flag */
 #define LDD_F_OPC_READY 0x40000000
 #define LDD_F_OPC_MASK  0xf0000000
 
-#define LDD_F_ONDISK_MASK  (LDD_F_SV_TYPE_MASK | LDD_F_IAM_DIR)
+#define LDD_F_ONDISK_MASK  (LDD_F_SV_TYPE_MASK)
 
 #define LDD_F_MASK          0xFFFF
 
index 33c0f9a..376522f 100644 (file)
@@ -214,9 +214,6 @@ struct md_op_spec {
         /** Current lock mode for parent dir where create is performing. */
         mdl_mode_t sp_cr_mode;
 
-        /** Check for split */
-        int        sp_ck_split;
-
         /** to create directory */
         const struct dt_index_features *sp_feat;
 };
index f752015..4a84abe 100644 (file)
@@ -1023,7 +1023,6 @@ static int mdt_getattr(struct mdt_thread_info *info)
         if (unlikely(rc))
                 GOTO(out_shrink, rc);
 
-        info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
 
         /*
@@ -1404,7 +1403,6 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
         LASSERT(repbody != NULL);
 
-        info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
@@ -1569,180 +1567,6 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
-#ifdef HAVE_SPLIT_SUPPORT
-/*
- * Retrieve dir entry from the page and insert it to the slave object, actually,
- * this should be in osd layer, but since it will not in the final product, so
- * just do it here and do not define more moo api anymore for this.
- */
-static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
-                              int size)
-{
-        struct mdt_object *object = info->mti_object;
-        struct lu_fid *lf = &info->mti_tmp_fid2;
-        struct md_attr *ma = &info->mti_attr;
-        struct lu_dirpage *dp;
-        struct lu_dirent *ent;
-        int rc = 0, offset = 0;
-        ENTRY;
-
-        /* Make sure we have at least one entry. */
-        if (size == 0)
-                RETURN(-EINVAL);
-
-        /*
-         * Disable trans for this name insert, since it will include many trans
-         * for this.
-         */
-        info->mti_no_need_trans = 1;
-        /*
-         * When write_dir_page, no need update parent's ctime,
-         * and no permission check for name_insert.
-         */
-        ma->ma_attr.la_ctime = 0;
-        ma->ma_attr.la_valid = LA_MODE;
-        ma->ma_valid = MA_INODE;
-
-        cfs_kmap(page);
-        dp = page_address(page);
-        offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp);
-
-        for (ent = lu_dirent_start(dp); ent != NULL;
-             ent = lu_dirent_next(ent)) {
-                struct lu_name *lname;
-                char *name;
-
-                if (le16_to_cpu(ent->lde_namelen) == 0)
-                        continue;
-
-                fid_le_to_cpu(lf, &ent->lde_fid);
-                if (le64_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT)
-                        ma->ma_attr.la_mode = S_IFDIR;
-                else
-                        ma->ma_attr.la_mode = 0;
-                OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
-                if (name == NULL)
-                        GOTO(out, rc = -ENOMEM);
-
-                memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
-                lname = mdt_name(info->mti_env, name,
-                                 le16_to_cpu(ent->lde_namelen));
-                ma->ma_attr_flags |= (MDS_PERM_BYPASS | MDS_QUOTA_IGNORE);
-                rc = mdo_name_insert(info->mti_env,
-                                     md_object_next(&object->mot_obj),
-                                     lname, lf, ma);
-                OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
-                if (rc) {
-                        CERROR("Can't insert %*.*s, rc %d\n",
-                               le16_to_cpu(ent->lde_namelen),
-                               le16_to_cpu(ent->lde_namelen),
-                               ent->lde_name, rc);
-                        GOTO(out, rc);
-                }
-
-                offset += lu_dirent_size(ent);
-                if (offset >= size)
-                        break;
-        }
-        EXIT;
-out:
-        cfs_kunmap(page);
-        return rc;
-}
-
-static int mdt_bulk_timeout(void *data)
-{
-        ENTRY;
-
-        CERROR("mdt bulk transfer timeout \n");
-
-        RETURN(1);
-}
-
-static int mdt_writepage(struct mdt_thread_info *info)
-{
-        struct ptlrpc_request   *req = mdt_info_req(info);
-        struct mdt_body         *reqbody;
-        struct l_wait_info      *lwi;
-        struct ptlrpc_bulk_desc *desc;
-        struct page             *page;
-        int                rc;
-        ENTRY;
-
-
-        reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
-        if (reqbody == NULL)
-                RETURN(err_serious(-EFAULT));
-
-        desc = ptlrpc_prep_bulk_exp(req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
-        if (desc == NULL)
-                RETURN(err_serious(-ENOMEM));
-
-        /* allocate the page for the desc */
-        page = cfs_alloc_page(CFS_ALLOC_STD);
-        if (page == NULL)
-                GOTO(desc_cleanup, rc = -ENOMEM);
-
-        CDEBUG(D_INFO, "Received page offset %d size %d \n",
-               (int)reqbody->size, (int)reqbody->nlink);
-
-        ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
-                              (int)reqbody->nlink);
-
-        rc = sptlrpc_svc_prep_bulk(req, desc);
-        if (rc != 0)
-                GOTO(cleanup_page, rc);
-        /*
-         * Check if client was evicted while we were doing i/o before touching
-         * network.
-         */
-        OBD_ALLOC_PTR(lwi);
-        if (!lwi)
-                GOTO(cleanup_page, rc = -ENOMEM);
-
-        if (desc->bd_export->exp_failed)
-                rc = -ENOTCONN;
-        else
-                rc = ptlrpc_start_bulk_transfer (desc);
-        if (rc == 0) {
-                *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * CFS_HZ / 4, CFS_HZ,
-                                            mdt_bulk_timeout, desc);
-                rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
-                                  desc->bd_export->exp_failed, lwi);
-                LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                if (rc == -ETIMEDOUT) {
-                        DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
-                        ptlrpc_abort_bulk(desc);
-                } else if (desc->bd_export->exp_failed) {
-                        DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
-                        rc = -ENOTCONN;
-                        ptlrpc_abort_bulk(desc);
-                } else if (!desc->bd_success ||
-                           desc->bd_nob_transferred != desc->bd_nob) {
-                        DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
-                                  desc->bd_success ?
-                                  "truncated" : "network error on",
-                                  desc->bd_nob_transferred, desc->bd_nob);
-                        /* XXX should this be a different errno? */
-                        rc = -ETIMEDOUT;
-                }
-        } else {
-                DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
-        }
-        if (rc)
-                GOTO(cleanup_lwi, rc);
-        rc = mdt_write_dir_page(info, page, reqbody->nlink);
-
-cleanup_lwi:
-        OBD_FREE_PTR(lwi);
-cleanup_page:
-        cfs_free_page(page);
-desc_cleanup:
-       ptlrpc_free_bulk_pin(desc);
-        RETURN(rc);
-}
-#endif
-
 static int mdt_readpage(struct mdt_thread_info *info)
 {
         struct mdt_object *object = info->mti_object;
@@ -3124,7 +2948,6 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
        info->mti_big_lmm_used = 0;
 
         /* To not check for split by default. */
-        info->mti_spec.sp_ck_split = 0;
         info->mti_spec.no_create = 0;
 }
 
@@ -3715,7 +3538,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
         LASSERT(repbody);
 
-        info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
@@ -5147,12 +4969,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 /* CMD is supported only in IAM mode */
                 LASSERT(num);
                 node_id = simple_strtol(num, NULL, 10);
-               if (!(lsi->lsi_flags & LDD_F_IAM_DIR) && node_id) {
-                        CERROR("CMD Operation not allowed in IOP mode\n");
-                        GOTO(err_lmi, rc = -EINVAL);
-                }
-
-                obd->u.obt.obt_magic = OBT_MAGIC;
+               obd->u.obt.obt_magic = OBT_MAGIC;
         }
 
         cfs_rwlock_init(&m->mdt_sptlrpc_lock);
@@ -5183,7 +5000,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        rc = mdt_stack_init((struct lu_env *)env, m, cfg);
        if (rc) {
                CERROR("Can't init device stack, rc %d\n", rc);
-               RETURN(rc);
+               GOTO(err_lmi, rc);
        }
 
        s = m->mdt_md_dev.md_lu_dev.ld_site;
index d5b125b..5b40877 100644 (file)
@@ -939,62 +939,18 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                          LA_CTIME | LA_MTIME | LA_ATIME;
         memset(&sp->u, 0, sizeof(sp->u));
         sp->sp_cr_flags = get_mrc_cr_flags(rec);
-        sp->sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT);
-        info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF);
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
                 mdt_set_capainfo(info, 0, rr->rr_fid1,
                                  req_capsule_client_get(pill, &RMF_CAPA1));
         mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
 
-        if (!info->mti_cross_ref) {
-                rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
-                rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME,
-                                                      RCL_CLIENT) - 1;
-                LASSERT(rr->rr_name && rr->rr_namelen > 0);
-        } else {
-                rr->rr_name = NULL;
-                rr->rr_namelen = 0;
-        }
+       rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+       rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME,
+                                             RCL_CLIENT) - 1;
+       LASSERT(rr->rr_name && rr->rr_namelen > 0);
 
-#ifdef CONFIG_FS_POSIX_ACL
-        if (sp->sp_cr_flags & MDS_CREATE_RMT_ACL) {
-                if (S_ISDIR(attr->la_mode))
-                        sp->u.sp_pfid = rr->rr_fid1;
-                req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
-                LASSERT(req_capsule_field_present(pill, &RMF_EADATA,
-                                                  RCL_CLIENT));
-                rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA);
-                rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
-                                                        RCL_CLIENT);
-                sp->u.sp_ea.eadata = rr->rr_eadata;
-                sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
-                sp->u.sp_ea.fid = rr->rr_fid1;
-                RETURN(0);
-        }
-#endif
-        if (S_ISDIR(attr->la_mode)) {
-                /* pass parent fid for cross-ref cases */
-                sp->u.sp_pfid = rr->rr_fid1;
-                if (sp->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
-                        /* create salve object req, need
-                         * unpack split ea here
-                         */
-                       req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SLAVE);
-                       LASSERT(req_capsule_field_present(pill, &RMF_EADATA,
-                                                         RCL_CLIENT));
-                       rr->rr_eadata = req_capsule_client_get(pill,
-                                                              &RMF_EADATA);
-                       rr->rr_eadatalen = req_capsule_get_size(pill,
-                                                               &RMF_EADATA,
-                                                               RCL_CLIENT);
-                       sp->u.sp_ea.eadata = rr->rr_eadata;
-                       sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
-                       sp->u.sp_ea.fid = rr->rr_fid1;
-                       RETURN(0);
-                }
-                req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
-        } else if (S_ISLNK(attr->la_mode)) {
+       if (S_ISLNK(attr->la_mode)) {
                 const char *tgt = NULL;
 
                 req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SYM);
@@ -1007,6 +963,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
         } else {
                 req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
         }
+
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
 }
@@ -1047,14 +1004,12 @@ static int mdt_link_unpack(struct mdt_thread_info *info)
                 mdt_set_capainfo(info, 1, rr->rr_fid2,
                                  req_capsule_client_get(pill, &RMF_CAPA2));
 
-        info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT);
-        info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF);
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
-        if (!info->mti_cross_ref)
-                LASSERT(rr->rr_namelen > 0);
+
+       LASSERT(rr->rr_namelen > 0);
 
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
@@ -1095,17 +1050,11 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
                 mdt_set_capainfo(info, 0, rr->rr_fid1,
                                  req_capsule_client_get(pill, &RMF_CAPA1));
 
-        info->mti_cross_ref = !!(rec->ul_bias & MDS_CROSS_REF);
-        if (!info->mti_cross_ref) {
-                rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
-                rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
-                if (rr->rr_name == NULL || rr->rr_namelen == 0)
-                        RETURN(-EFAULT);
-        } else {
-                rr->rr_name = NULL;
-                rr->rr_namelen = 0;
-        }
-        info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT);
+       rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+       rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
+       if (rr->rr_name == NULL || rr->rr_namelen == 0)
+               RETURN(-EFAULT);
+
         if (rec->ul_bias & MDS_VTX_BYPASS)
                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
         else
@@ -1156,16 +1105,14 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
                 mdt_set_capainfo(info, 1, rr->rr_fid2,
                                  req_capsule_client_get(pill, &RMF_CAPA2));
 
-        info->mti_spec.sp_ck_split = !!(rec->rn_bias & MDS_CHECK_SPLIT);
-        info->mti_cross_ref = !!(rec->rn_bias & MDS_CROSS_REF);
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
         if (rr->rr_name == NULL || rr->rr_tgt == NULL)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
         rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT) - 1;
-        if (!info->mti_cross_ref)
-                LASSERT(rr->rr_namelen > 0 && rr->rr_tgtlen > 0);
+       LASSERT(rr->rr_namelen > 0 && rr->rr_tgtlen > 0);
+
         if (rec->rn_bias & MDS_VTX_BYPASS)
                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
         else
@@ -1242,7 +1189,6 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
                 RETURN(-EPROTO);
         info->mti_replayepoch = rec->cr_ioepoch;
 
-        info->mti_spec.sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT);
         info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF);
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
index 30e8ece..909bd10 100644 (file)
@@ -268,9 +268,6 @@ static int mdt_server_data_init(const struct lu_env *env,
                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
         }
 
-       if (lsi->lsi_flags & LDD_F_IAM_DIR)
-                lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR;
-
         lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
 
         cfs_spin_lock(&mdt->mdt_lut.lut_translock);
index d4f2c6f..d0c072b 100644 (file)
@@ -357,56 +357,6 @@ out_put_parent:
         RETURN(rc);
 }
 
-/* Partial request to create object only */
-static int mdt_md_mkobj(struct mdt_thread_info *info)
-{
-        struct mdt_device      *mdt = info->mti_mdt;
-        struct mdt_object      *o;
-        struct mdt_body        *repbody;
-        struct md_attr         *ma = &info->mti_attr;
-        int rc;
-        ENTRY;
-
-        DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"",
-                  PFID(info->mti_rr.rr_fid2));
-
-        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-
-        o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
-        if (!IS_ERR(o)) {
-                struct md_object *next = mdt_object_child(o);
-
-                ma->ma_need = MA_INODE;
-                ma->ma_valid = 0;
-
-                /*
-                 * Cross-ref create can encounter already created obj in case of
-                 * recovery, just get attr in that case.
-                 */
-                if (mdt_object_exists(o) == 1) {
-                       rc = mdt_attr_get_complex(info, o, ma);
-                } else {
-                        /*
-                         * Here, NO permission check for object_create,
-                         * such check has been done on the original MDS.
-                         */
-                        rc = mo_object_create(info->mti_env, next,
-                                              &info->mti_spec, ma);
-                }
-                if (rc == 0) {
-                        /* Return fid & attr to client. */
-                        if (ma->ma_valid & MA_INODE)
-                                mdt_pack_attr2body(info, repbody, &ma->ma_attr,
-                                                   mdt_object_fid(o));
-                }
-                mdt_object_put(info->mti_env, o);
-        } else
-                rc = PTR_ERR(o);
-
-        mdt_create_pack_capa(info, rc, o, repbody);
-        RETURN(rc);
-}
-
 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
                  struct md_attr *ma, int flags)
 {
@@ -614,35 +564,29 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         if (info->mti_dlm_req)
                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
 
+       LASSERT(info->mti_rr.rr_namelen > 0);
         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
-        case S_IFDIR:{
-                /* Cross-ref case. */
-                /* TODO: we can add LPROC_MDT_CROSS for cross-ref stats */
-                if (info->mti_cross_ref) {
-                        rc = mdt_md_mkobj(info);
-                } else {
-                        LASSERT(info->mti_rr.rr_namelen > 0);
-                       mdt_counter_incr(req, LPROC_MDT_MKDIR);
-                        rc = mdt_md_create(info);
-                }
-                break;
-        }
+       case S_IFDIR:
+               mdt_counter_incr(req, LPROC_MDT_MKDIR);
+               break;
         case S_IFREG:
         case S_IFLNK:
         case S_IFCHR:
         case S_IFBLK:
         case S_IFIFO:
-        case S_IFSOCK:{
-                /* Special file should stay on the same node as parent. */
-                LASSERT(info->mti_rr.rr_namelen > 0);
+       case S_IFSOCK:
+               /* Special file should stay on the same node as parent. */
                mdt_counter_incr(req, LPROC_MDT_MKNOD);
-                rc = mdt_md_create(info);
-                break;
-        }
-        default:
-                rc = err_serious(-EOPNOTSUPP);
-        }
-        RETURN(rc);
+               break;
+       default:
+               CERROR("%s: Unsupported mode %o\n",
+                      mdt2obd_dev(info->mti_mdt)->obd_name,
+                      info->mti_attr.ma_attr.la_mode);
+               RETURN(err_serious(-EOPNOTSUPP));
+       }
+
+       rc = mdt_md_create(info);
+       RETURN(rc);
 }
 
 /*
@@ -674,29 +618,16 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 RETURN(err_serious(-ENOENT));
 
         /*
-         * step 1: lock the parent. Note, this may be child in case of
-         * remote operation denoted by ->mti_cross_ref flag.
+        * step 1: lock the parent.
          */
         parent_lh = &info->mti_lh[MDT_LH_PARENT];
-        if (info->mti_cross_ref) {
-                /*
-                 * Init reg lock for cross ref case when we need to do only
-                 * ref del locally.
-                 */
-                mdt_lock_reg_init(parent_lh, LCK_PW);
-        } else {
-                mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
-                                  rr->rr_namelen);
-        }
+       mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
+                         rr->rr_namelen);
+
         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
                                   MDS_INODELOCK_UPDATE);
-        if (IS_ERR(mp)) {
-                rc = PTR_ERR(mp);
-                /* errors are possible here in cross-ref cases, see below */
-                if (info->mti_cross_ref)
-                        rc = 0;
-                GOTO(out, rc);
-        }
+       if (IS_ERR(mp))
+               GOTO(out, rc = PTR_ERR(mp));
 
         if (mdt_object_obf(mp))
                 GOTO(out_unlock_parent, rc = -EPERM);
@@ -707,24 +638,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         mdt_reint_init_ma(info, ma);
 
-        if (info->mti_cross_ref) {
-                /*
-                 * Remote partial operation. It is possible that replay may
-                 * happen on parent MDT and this operation will be repeated.
-                 * Therefore the object absense is allowed case and nothing
-                 * should be done here.
-                 */
-                if (mdt_object_exists(mp) > 0) {
-                        mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
-                        rc = mo_ref_del(info->mti_env,
-                                        mdt_object_child(mp), ma);
-                        if (rc == 0)
-                                mdt_handle_last_unlink(info, mp, ma);
-                } else
-                        rc = 0;
-                GOTO(out_unlock_parent, rc);
-        }
-
         /* step 2: find & lock the child */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         /* lookup child object along with version checking */
@@ -819,21 +732,6 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (info->mti_dlm_req)
                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
 
-        if (info->mti_cross_ref) {
-                /* MDT holding name ask us to add ref. */
-                lhs = &info->mti_lh[MDT_LH_CHILD];
-                mdt_lock_reg_init(lhs, LCK_EX);
-                ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
-                                          MDS_INODELOCK_UPDATE);
-                if (IS_ERR(ms))
-                        RETURN(PTR_ERR(ms));
-
-                mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
-                rc = mo_ref_add(info->mti_env, mdt_object_child(ms), ma);
-                mdt_object_unlock_put(info, ms, lhs, rc);
-                RETURN(rc);
-        }
-
         /* Invalid case so return error immediately instead of
          * processing it */
         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
@@ -935,79 +833,6 @@ static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
         return rc;
 }
 
-/* partial operation for rename */
-static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
-{
-        struct mdt_reint_record *rr = &info->mti_rr;
-        struct ptlrpc_request   *req = mdt_info_req(info);
-        struct md_attr          *ma = &info->mti_attr;
-        struct mdt_object       *mtgtdir;
-        struct mdt_object       *mtgt = NULL;
-        struct mdt_lock_handle  *lh_tgtdir;
-        struct mdt_lock_handle  *lh_tgt = NULL;
-        struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
-        struct lu_name          *lname;
-        int                      rc;
-        ENTRY;
-
-        DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
-                  rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
-
-        /* step 1: lookup & lock the tgt dir. */
-        lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
-        mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
-                          rr->rr_tgtlen);
-        mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
-                                       MDS_INODELOCK_UPDATE);
-        if (IS_ERR(mtgtdir))
-                RETURN(PTR_ERR(mtgtdir));
-
-        /* step 2: find & lock the target object if exists. */
-        mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
-        lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
-        rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
-                        lname, tgt_fid, &info->mti_spec);
-        if (rc != 0 && rc != -ENOENT) {
-                GOTO(out_unlock_tgtdir, rc);
-        } else if (rc == 0) {
-                /*
-                 * In case of replay that name can be already inserted, check
-                 * that and do nothing if so.
-                 */
-                if (lu_fid_eq(tgt_fid, rr->rr_fid2))
-                        GOTO(out_unlock_tgtdir, rc);
-
-                lh_tgt = &info->mti_lh[MDT_LH_CHILD];
-                mdt_lock_reg_init(lh_tgt, LCK_EX);
-
-                mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
-                                            MDS_INODELOCK_LOOKUP);
-                if (IS_ERR(mtgt))
-                        GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
-
-                mdt_reint_init_ma(info, ma);
-
-                rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
-                                    mdt_object_child(mtgt), rr->rr_fid2,
-                                    lname, ma);
-        } else /* -ENOENT */ {
-                rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
-                                     lname, rr->rr_fid2, ma);
-        }
-
-        /* handle last link of tgt object */
-        if (rc == 0 && mtgt)
-                mdt_handle_last_unlink(info, mtgt, ma);
-
-        EXIT;
-
-        if (mtgt)
-                mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
-out_unlock_tgtdir:
-        mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
-        return rc;
-}
-
 static int mdt_rename_lock(struct mdt_thread_info *info,
                            struct lustre_handle *lh)
 {
@@ -1126,11 +951,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (info->mti_dlm_req)
                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
 
-        if (info->mti_cross_ref) {
-                rc = mdt_reint_rename_tgt(info);
-                RETURN(rc);
-        }
-
         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
                   PFID(rr->rr_fid1), rr->rr_name,
                   PFID(rr->rr_fid2), rr->rr_tgt);
index 77c9ce8..2b85c6d 100644 (file)
@@ -132,7 +132,6 @@ static int llo_lookup(const struct lu_env  *env,
         spec->sp_cr_flags = 0;
         spec->sp_cr_lookup = 0;
         spec->sp_cr_mode = 0;
-        spec->sp_ck_split = 0;
 
         lname->ln_name = name;
         lname->ln_namelen = strlen(name);
@@ -284,7 +283,6 @@ static struct md_object *llo_create_obj(const struct lu_env *env,
         spec->sp_cr_flags = 0;
         spec->sp_cr_lookup = 1;
         spec->sp_cr_mode = 0;
-        spec->sp_ck_split = 0;
 
         if (feat == &dt_directory_features)
                 la->la_mode = S_IFDIR | S_IXUGO;
index 325acaa..551c385 100644 (file)
@@ -2066,7 +2066,7 @@ static int lsi_prepare(struct lustre_sb_info *lsi)
        lsi->lsi_flags |= rc;
 
        /* Add mount line flags that used to be in ldd:
-        * writeconf, mgs, iam, anything else?
+        * writeconf, mgs, anything else?
         */
        lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_WRITECONF) ?
                LDD_F_WRITECONF : 0;
@@ -2074,8 +2074,6 @@ static int lsi_prepare(struct lustre_sb_info *lsi)
                LDD_F_VIRGIN : 0;
        lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_MGS) ?
                LDD_F_SV_TYPE_MGS : 0;
-       lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_IAM) ?
-               LDD_F_IAM_DIR : 0;
        lsi->lsi_flags |= (lsi->lsi_lmd->lmd_flags & LMD_FLG_NO_PRIMNODE) ?
                LDD_F_NO_PRIMNODE : 0;
 
index d209956..602e4ab 100644 (file)
@@ -150,11 +150,7 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
 
                 l = &mo->oo_dt.do_lu;
                 dt_object_init(&mo->oo_dt, NULL, d);
-                if (osd_dev(d)->od_iop_mode)
-                        mo->oo_dt.do_ops = &osd_obj_ea_ops;
-                else
-                        mo->oo_dt.do_ops = &osd_obj_ops;
-
+               mo->oo_dt.do_ops = &osd_obj_ea_ops;
                 l->lo_ops = &osd_lu_obj_ops;
                 cfs_init_rwsem(&mo->oo_sem);
                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
@@ -397,10 +393,9 @@ trigger:
 
         obj->oo_inode = inode;
         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
-        if (dev->od_iop_mode) {
-                obj->oo_compat_dot_created = 1;
-                obj->oo_compat_dotdot_created = 1;
-        }
+
+       obj->oo_compat_dot_created = 1;
+       obj->oo_compat_dotdot_created = 1;
 
         if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
                GOTO(out, result = 0);
@@ -1690,7 +1685,6 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
 {
         int result;
         struct osd_thandle *oth;
-        struct osd_device *osd = osd_obj2dev(obj);
         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
 
         LASSERT(S_ISDIR(attr->la_mode));
@@ -1698,16 +1692,7 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
         oth = container_of(th, struct osd_thandle, ot_super);
         LASSERT(oth->ot_handle->h_transaction != NULL);
         result = osd_mkfile(info, obj, mode, hint, th);
-        if (result == 0 && osd->od_iop_mode == 0) {
-                LASSERT(obj->oo_inode != NULL);
-                /*
-                 * XXX uh-oh... call low-level iam function directly.
-                 */
 
-                result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
-                                         sizeof (struct osd_fid_pack),
-                                         oth->ot_handle);
-        }
         return result;
 }
 
@@ -2703,7 +2688,6 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
        int                      result;
        int                      skip_iam = 0;
        struct osd_object       *obj = osd_dt_obj(dt);
-       struct osd_device       *osd = osd_obj2dev(obj);
 
         LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
@@ -2711,7 +2695,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_is_root(obj)) {
                 dt->do_index_ops = &osd_index_ea_ops;
                 result = 0;
-        } else if (feat == &dt_directory_features && osd->od_iop_mode) {
+       } else if (feat == &dt_directory_features) {
                 dt->do_index_ops = &osd_index_ea_ops;
                 if (S_ISDIR(obj->oo_inode->i_mode))
                         result = 0;
@@ -3878,6 +3862,7 @@ static const struct dt_index_operations osd_index_iam_ops = {
         }
 };
 
+
 /**
  * Creates or initializes iterator context.
  *
@@ -4447,11 +4432,6 @@ static int osd_mount(const struct lu_env *env,
                GOTO(out, rc = -EINVAL);
        }
 
-       if (lmd_flags & LMD_FLG_IAM) {
-                o->od_iop_mode = 0;
-               LCONSOLE_WARN("%s: OSD: IAM mode enabled\n", name);
-        } else
-                o->od_iop_mode = 1;
        if (lmd_flags & LMD_FLG_NOSCRUB)
                o->od_noscrub = 1;
 
index 9fda286..1bdd60e 100644 (file)
@@ -271,8 +271,7 @@ struct osd_device {
         struct obd_statfs         od_statfs;
         cfs_spinlock_t            od_osfs_lock;
 
-       unsigned int              od_iop_mode:1,
-                                 od_noscrub:1;
+       unsigned int              od_noscrub:1;
 
         struct fsfilt_operations *od_fsops;
        int                       od_connects;
index 514bd64..eaa5f64 100644 (file)
@@ -2000,10 +2000,6 @@ void lustre_assert_wire_constants(void)
                  MDS_OPEN_OWNEROVERRIDE);
         LASSERTF(MDS_OPEN_JOIN_FILE == 000400000000UL, "found 0%.11oUL\n",
                  MDS_OPEN_JOIN_FILE);
-        LASSERTF(MDS_CREATE_RMT_ACL == 001000000000UL, "found 0%.11oUL\n",
-                 MDS_CREATE_RMT_ACL);
-        LASSERTF(MDS_CREATE_SLAVE_OBJ == 002000000000UL, "found 0%.11oUL\n",
-                 MDS_CREATE_SLAVE_OBJ);
         LASSERTF(MDS_OPEN_LOCK == 004000000000UL, "found 0%.11oUL\n",
                  MDS_OPEN_LOCK);
         LASSERTF(MDS_OPEN_HAS_EA == 010000000000UL, "found 0%.11oUL\n",
index 3f056d7..cff13a7 100644 (file)
@@ -15,7 +15,6 @@ TMP=${TMP:-/tmp}
 DAEMONSIZE=${DAEMONSIZE:-500}
 MDSCOUNT=${MDSCOUNT:-1}
 [ $MDSCOUNT -gt 4 ] && MDSCOUNT=4
-[ $MDSCOUNT -gt 1 ] && IAMDIR=yes
 for num in $(seq $MDSCOUNT); do
     eval mds${num}_HOST=\$\{mds${num}_HOST:-$mds_HOST\}
     eval mds${num}failover_HOST=\$\{mds${num}failover_HOST:-$mdsfailover_HOST\}
index 265bbcb..4771bfb 100644 (file)
@@ -2784,8 +2784,6 @@ mkfs_opts() {
                opts+=${L_GETIDENTITY:+" --param=mdt.identity_upcall=$L_GETIDENTITY"}
 
                if [ $fstype == ldiskfs ]; then
-                       opts+=${IAMDIR:+" --iam-dir"}
-
                        fs_mkfs_opts+=${MDSJOURNALSIZE:+" -J size=$MDSJOURNALSIZE"}
                        fs_mkfs_opts+=${MDSISIZE:+" -i $MDSISIZE"}
                fi
index e3cfa81..392e4f1 100644 (file)
@@ -191,7 +191,7 @@ void print_ldd(char *str, struct lustre_disk_data *ldd)
         printf("Lustre FS:  %s\n", ldd->ldd_fsname);
         printf("Mount type: %s\n", MT_STR(ldd));
         printf("Flags:      %#x\n", ldd->ldd_flags);
-        printf("              (%s%s%s%s%s%s%s%s%s%s)\n",
+       printf("              (%s%s%s%s%s%s%s%s%s)\n",
                IS_MDT(ldd) ? "MDT ":"",
                IS_OST(ldd) ? "OST ":"",
                IS_MGS(ldd) ? "MGS ":"",
@@ -199,7 +199,6 @@ void print_ldd(char *str, struct lustre_disk_data *ldd)
                ldd->ldd_flags & LDD_F_VIRGIN     ? "first_time ":"",
                ldd->ldd_flags & LDD_F_UPDATE     ? "update ":"",
                ldd->ldd_flags & LDD_F_WRITECONF  ? "writeconf ":"",
-               ldd->ldd_flags & LDD_F_IAM_DIR  ? "IAM_dir_format ":"",
                ldd->ldd_flags & LDD_F_NO_PRIMNODE? "no_primnode ":"",
                ldd->ldd_flags & LDD_F_UPGRADE14  ? "upgrade1.4 ":"");
         printf("Persistent mount opts: %s\n", ldd->ldd_mount_opts);
@@ -284,7 +283,6 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop,
                char **mountopts)
 {
         static struct option long_opt[] = {
-                {"iam-dir", 0, 0, 'a'},
                 {"backfstype", 1, 0, 'b'},
                 {"stripe-count-hint", 1, 0, 'c'},
                 {"comment", 1, 0, 'u'},
@@ -326,11 +324,6 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop,
         while ((opt = getopt_long(argc, argv, optstring, long_opt, &longidx)) !=
                EOF) {
                 switch (opt) {
-                case 'a': {
-                        if (IS_MDT(&mop->mo_ldd))
-                                mop->mo_ldd.ldd_flags |= LDD_F_IAM_DIR;
-                        break;
-                }
                 case 'b': {
                         int i = 0;
                         while (i < LDD_MT_LAST) {
index 1df69b0..fe1920f 100644 (file)
@@ -364,8 +364,6 @@ static int parse_ldd(char *source, struct mount_opts *mop, char *options)
                append_option(options, "virgin");
        if (ldd->ldd_flags & LDD_F_WRITECONF)
                append_option(options, "writeconf");
-       if (ldd->ldd_flags & LDD_F_IAM_DIR)
-               append_option(options, "iam");
        if (ldd->ldd_flags & LDD_F_NO_PRIMNODE)
                append_option(options, "noprimnode");
 
index c0781a0..a049517 100644 (file)
@@ -900,8 +900,6 @@ check_mdt_body(void)
         CHECK_VALUE_O(MDS_OPEN_DELAY_CREATE);
         CHECK_VALUE_O(MDS_OPEN_OWNEROVERRIDE);
         CHECK_VALUE_O(MDS_OPEN_JOIN_FILE);
-        CHECK_VALUE_O(MDS_CREATE_RMT_ACL);
-        CHECK_VALUE_O(MDS_CREATE_SLAVE_OBJ);
         CHECK_VALUE_O(MDS_OPEN_LOCK);
         CHECK_VALUE_O(MDS_OPEN_HAS_EA);
         CHECK_VALUE_O(MDS_OPEN_HAS_OBJS);
index a3a66e5..bdefdd2 100644 (file)
@@ -2008,10 +2008,6 @@ void lustre_assert_wire_constants(void)
                  MDS_OPEN_OWNEROVERRIDE);
         LASSERTF(MDS_OPEN_JOIN_FILE == 000400000000UL, "found 0%.11oUL\n",
                  MDS_OPEN_JOIN_FILE);
-        LASSERTF(MDS_CREATE_RMT_ACL == 001000000000UL, "found 0%.11oUL\n",
-                 MDS_CREATE_RMT_ACL);
-        LASSERTF(MDS_CREATE_SLAVE_OBJ == 002000000000UL, "found 0%.11oUL\n",
-                 MDS_CREATE_SLAVE_OBJ);
         LASSERTF(MDS_OPEN_LOCK == 004000000000UL, "found 0%.11oUL\n",
                  MDS_OPEN_LOCK);
         LASSERTF(MDS_OPEN_HAS_EA == 010000000000UL, "found 0%.11oUL\n",