Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Wed, 30 Aug 2006 14:51:24 +0000 (14:51 +0000)
committerwangdi <wangdi>
Wed, 30 Aug 2006 14:51:24 +0000 (14:51 +0000)
1)add part of split support, still not finished.
2)some fixes for this splitting.

lustre/autoconf/lustre-core.m4
lustre/cmm/Makefile.in
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c [new file with mode: 0644]
lustre/mdd/mdd_handler.c
lustre/mdt/mdt_handler.c

index a7e1fc3..95f6796 100644 (file)
@@ -597,6 +597,7 @@ if test x$enable_server = xyes ; then
 fi
 LC_CONFIG_PINGER
 LC_CONFIG_QUOTA
+LC_CONFIG_SPLIT
 
 LC_STRUCT_KIOBUF
 LC_FUNC_COND_RESCHED
@@ -696,7 +697,27 @@ if test x$enable_quota != xno; then
    AC_DEFINE(HAVE_QUOTA_SUPPORT, 1, [Enable quota support])
 fi
 ])
-  
+#
+# LC_CONFIG_SPLIT
+#
+# whether to enable quota support
+#
+AC_DEFUN([LC_CONFIG_SPLIT],
+[AC_MSG_CHECKING([whether to enable split support])
+AC_ARG_ENABLE([split], 
+       AC_HELP_STRING([--enable-split],
+                       [enable split support]),
+       [],[enable_split='no'])
+AC_MSG_RESULT([$enable_split])
+if test x$linux25 != xyes; then
+   enable_split='no'
+fi
+if test x$enable_split != xno; then
+   AC_DEFINE(HAVE_SPLIT_SUPPORT, 1, [Enable split support])
+fi
+])
 #
 # LC_CONFIGURE
 #
@@ -750,6 +771,7 @@ AM_CONDITIONAL(MPITESTS, test x$enable_mpitests = xyes, Build MPI Tests)
 AM_CONDITIONAL(CLIENT, test x$enable_client = xyes)
 AM_CONDITIONAL(SERVER, test x$enable_server = xyes)
 AM_CONDITIONAL(QUOTA, test x$enable_quota = xyes)
+AM_CONDITIONAL(SPLIT, test x$enable_split = xyes)
 AM_CONDITIONAL(BLKID, test x$ac_cv_header_blkid_blkid_h = xyes)
 AM_CONDITIONAL(EXT2FS_DEVEL, test x$ac_cv_header_ext2fs_ext2fs_h = xyes)
 ])
index 6264e0f..c4dfdcb 100644 (file)
@@ -1,4 +1,6 @@
 MODULES := cmm
 cmm-objs := cmm_device.o cmm_object.o mdc_device.o mdc_object.o
 
+@SPLIT_TRUE@cmm-objs += cmm_split.o 
+
 @INCLUDE_RULES@
index ea56ed5..7cabd90 100644 (file)
@@ -356,11 +356,12 @@ static int cml_create(const struct lu_context *ctx, struct md_object *mo_p,
         int rc;
         ENTRY;
 
-#ifdef SPLIT_ENABLE
+#ifdef HAVE_SPLIT_SUPPORT
         rc = cml_try_to_split(ctx, mo_p);
         if (rc)
                 RETURN(rc);
 #endif
+
         rc = mdo_create(ctx, md_object_next(mo_p), child_name,
                         md_object_next(mo_c), spec, ma);
 
diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c
new file mode 100644 (file)
index 0000000..921f883
--- /dev/null
@@ -0,0 +1,227 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/cmm/cmm_split.c
+ *  Lustre splitting dir 
+ *
+ *  Copyright (c) 2006 Cluster File Systems, Inc.
+ *   Author: Alex thomas <alex@clusterfs.com>
+ *           Wang Di     <wangdi@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <obd_class.h>
+#include <lustre_fid.h>
+#include <lustre_mds.h>
+#include "cmm_internal.h"
+#include "mdc_internal.h"
+
+struct cmm_thread_info {
+        struct md_attr   cti_ma;
+};
+
+struct lu_context_key cmm_thread_key;
+struct cmm_thread_info *cmm_ctx_info(const struct lu_context *ctx)
+{
+        struct cmm_thread_info *info;
+
+        info = lu_context_key_get(ctx, &cmm_thread_key);
+        LASSERT(info != NULL);
+        return info;
+}
+
+#define CMM_NO_SPLIT_EXPECTED   0
+#define CMM_EXPECT_SPLIT        1
+#define CMM_NO_SPLITTABLE       2
+
+#define SPLIT_SIZE 64*1024
+
+static int cmm_expect_splitting(const struct lu_context *ctx,
+                                struct md_object *mo, struct md_attr *ma)
+{
+        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        ENTRY;
+
+        if (cmm->cmm_tgt_count == 1)
+                RETURN(CMM_NO_SPLIT_EXPECTED);
+
+        if (ma->ma_attr.la_size < SPLIT_SIZE)
+                RETURN(CMM_NO_SPLIT_EXPECTED);
+
+        if (ma->ma_lmv_size)
+                RETURN(CMM_NO_SPLIT_EXPECTED);
+                       
+        RETURN(CMM_EXPECT_SPLIT);
+}
+
+static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
+{
+       return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
+}
+
+#define cmm_md_size(stripes)                            \
+       (sizeof(struct lmv_stripe_md) + stripes * sizeof(struct lu_fid))
+
+static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
+                         struct lu_fid *fid, int count)
+{
+        struct  mdc_device *mc, *tmp;
+        int rc = 0, i = 0;
+        
+        LASSERT(count == cmm->cmm_tgt_count);
+        
+        /*FIXME: this spin_lock maybe not proper, 
+         * because fid_alloc may need RPC*/
+        spin_lock(&cmm->cmm_tgt_guard);
+        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
+                                 mc_linkage) {
+                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
+                if (rc) {
+                        spin_unlock(&cmm->cmm_tgt_guard);
+                        RETURN(rc);
+                }
+        }
+        spin_unlock(&cmm->cmm_tgt_guard);
+        RETURN(rc);
+}
+
+struct cmm_object *cmm_object_find(const struct lu_context *ctxt,
+                                   struct cmm_device *d,
+                                   const struct lu_fid *f)
+{
+        struct lu_object *o;
+        struct cmm_object *m;
+        ENTRY;
+
+        o = lu_object_find(ctxt, d->cmm_md_dev.md_lu_dev.ld_site, f);
+        if (IS_ERR(o))
+                m = (struct cmm_object *)o;
+        else
+                m = lu2cmm_obj(lu_object_locate(o->lo_header,
+                               d->cmm_md_dev.md_lu_dev.ld_type));
+        RETURN(m);
+}
+
+static int cmm_creat_remote_obj(const struct lu_context *ctx, 
+                                struct cmm_device *cmm,
+                                struct lu_fid *fid, struct md_attr *ma)
+{
+        struct cmm_object *obj;
+        struct md_create_spec *spec;
+        int rc;
+        ENTRY;
+
+        obj = cmm_object_find(ctx, cmm, fid);
+        if (IS_ERR(obj))
+                RETURN(PTR_ERR(obj));
+
+        OBD_ALLOC_PTR(spec);
+        rc = mo_object_create(ctx, md_object_next(&obj->cmo_obj), 
+                              spec, ma);
+        OBD_FREE_PTR(spec);
+
+        RETURN(0);
+}
+
+static int cmm_create_slave_objects(const struct lu_context *ctx,
+                                    struct md_object *mo, struct md_attr *ma)
+{
+        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct lmv_stripe_md *lmv = NULL;
+        int lmv_size, i, rc;
+        struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
+        ENTRY;
+
+        lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
+
+        OBD_ALLOC(lmv, lmv_size);
+        if (!lmv)
+                RETURN(-ENOMEM);
+
+        lmv->mea_master = -1; 
+        lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
+        lmv->mea_count = cmm->cmm_tgt_count + 1;
+
+        lmv->mea_ids[0] = *lf;
+        /*create object*/
+        rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        for (i = 0; i < cmm->cmm_tgt_count; i ++) {
+                rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
+
+        rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size, 
+                          MDS_LMV_MD_NAME, 0);
+cleanup:
+        OBD_FREE(lmv, lmv_size);
+        RETURN(rc);
+}
+
+static int cmm_scan_and_split(const struct lu_context *ctx,
+                              struct md_object *mo, struct md_attr *ma)
+{
+        RETURN(0);
+}
+
+int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
+{
+        struct md_attr *ma;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
+       
+        OBD_ALLOC_PTR(ma);
+        if (ma == NULL)
+                RETURN(-ENOMEM);
+
+        ma->ma_need = MA_INODE;
+        rc = mo_attr_get(ctx, mo, ma);
+        if (rc)
+                GOTO(cleanup, ma);
+
+        /*step1: checking whether the dir need to be splitted*/
+        rc = cmm_expect_splitting(ctx, mo, ma);
+        if (rc != CMM_EXPECT_SPLIT)
+                GOTO(cleanup, rc = 0);
+
+        /*step2: create slave objects*/
+        rc = cmm_create_slave_objects(ctx, mo, ma);
+        if (rc)
+                GOTO(cleanup, ma);
+
+        /*step3: scan and split the object*/
+        rc = cmm_scan_and_split(ctx, mo, ma);
+cleanup:
+        OBD_FREE_PTR(ma);
+
+        RETURN(rc);
+}
+
index b289ae1..f3100f6 100644 (file)
@@ -290,6 +290,7 @@ static int __mdd_lmm_get(const struct lu_context *ctxt,
         RETURN(rc);
 }
 
+#ifdef HAVE_SPLIT_SUPPORT
 /* get lmv EA only*/
 static int __mdd_lmv_get(const struct lu_context *ctxt,
                          struct mdd_object *mdd_obj, struct md_attr *ma)
@@ -304,6 +305,7 @@ static int __mdd_lmv_get(const struct lu_context *ctxt,
         }
         RETURN(rc);
 }
+#endif
 
 static int mdd_attr_get_internal(const struct lu_context *ctxt,
                                  struct mdd_object *mdd_obj,
@@ -320,11 +322,12 @@ static int mdd_attr_get_internal(const struct lu_context *ctxt,
                     S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmm_get(ctxt, mdd_obj, ma);
         }
+#ifdef HAVE_SPLIT_SUPPORT 
         if (rc == 0 && ma->ma_need & MA_LMV) {
                 if (S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmv_get(ctxt, mdd_obj, ma);
         }
-        
+#endif        
         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
                         rc, ma->ma_valid);
         RETURN(rc);
index e8acce1..6a4554c 100644 (file)
@@ -1907,6 +1907,11 @@ static int mdt_seq_init_cli(const struct lu_context *ctx,
 
                         if (rc)
                                 RETURN(rc);
+                        /*FIXME: add client seq to mdc obd for 
+                         *allocating fid in create slave objects,
+                         *may need better way to fix it,
+                         *why not init client seq in cmm_add_mdc?*/
+                        mdc->u.cli.cl_seq = ls->ls_client_seq;
 
                         LASSERT(ls->ls_server_seq != NULL);