Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Mon, 26 Jun 2006 03:36:41 +0000 (03:36 +0000)
committerwangdi <wangdi>
Mon, 26 Jun 2006 03:36:41 +0000 (03:36 +0000)
add mdd_lov prototype

lustre/mdd/Makefile.in
lustre/mdd/mdd_handler.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lov.c [new file with mode: 0644]
lustre/osd/osd_handler.c

index 2ed1249..a2a7249 100644 (file)
@@ -1,4 +1,4 @@
 MODULES := mdd
-mdd-objs := mdd_handler.o
+mdd-objs := mdd_handler.o mdd_lov.o
 
 @INCLUDE_RULES@
index 3dfce1f..a0b9f6f 100644 (file)
@@ -415,24 +415,7 @@ static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
         return next;
 }
 
-static int mdd_lov_init(struct mdd_device *mdd, struct lustre_cfg *cfg)
-{
-        int rc;
-        ENTRY;
-
-        /*FIXME lov device is a dt or obd device in this cycle?*/
-
-        rc = dt_device_init(&mdd->mdd_lov_dev, NULL);
-        if (rc)
-                GOTO(out, rc);
-
-        mdd->mdd_lov_dev.dd_lu_dev.ld_obd =
-                class_name2obd(lustre_cfg_string(cfg, 3));
-out:
-        RETURN(rc);
-}
-
-static int mdd_process_config(const struct lu_context *ctx,
+static int mdd_process_config(const struct lu_context *ctxt,
                               struct lu_device *d, struct lustre_cfg *cfg)
 {
         struct mdd_device *m = lu2mdd_dev(d);
@@ -441,13 +424,13 @@ static int mdd_process_config(const struct lu_context *ctx,
 
         switch(cfg->lcfg_command) {
         case LCFG_SETUP:
-                rc = next->ld_ops->ldo_process_config(ctx, next, cfg);
+                rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
                 if (rc)
                         GOTO(out, rc);
-                rc = mdd_mount(ctx, m);
+                rc = mdd_mount(ctxt, m);
                 if (rc)
                         GOTO(out, rc);
-                rc = mdd_lov_init(m, cfg);
+                rc = mdd_lov_init(ctxt, m, cfg);
                 if (rc) {
                         CERROR("lov init error %d \n", rc);
                         /*FIXME umount the mdd*/
@@ -455,7 +438,7 @@ static int mdd_process_config(const struct lu_context *ctx,
                 }
                 break;
         default:
-                rc = next->ld_ops->ldo_process_config(ctx, next, cfg);
+                rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
                 break;
         }
 out:
@@ -742,15 +725,11 @@ mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
         if (rc)
                 GOTO(cleanup, rc);
 
-        rc = __mdd_ref_del(ctxt, mdd_pobj, handle);
-        if (rc)
-                GOTO(cleanup, rc);
-
         rc = __mdd_ref_del(ctxt, mdd_cobj, handle);
         if (rc)
                 GOTO(cleanup, rc);
 cleanup:
-       /*FIXME: error handling*/
+       /*FIXME: error handling?*/
         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
         mdd_trans_stop(ctxt, mdd, handle);
         RETURN(rc);
index bae7b00..0188285 100644 (file)
@@ -13,11 +13,25 @@ struct lr_server_data;
 struct dentry;
 struct llog_handle;
 
+struct mdd_lov_info {
+        struct obd_device               *mdd_lov_obd; 
+        struct obd_uuid                  mdd_lov_uuid;
+        struct lov_desc                  mdd_lov_desc;
+        obd_id                          *mdd_lov_objids;
+        int                              mdd_lov_objids_size;
+        __u32                            mdd_lov_objids_in_file;
+        int                              mdd_lov_nextid_set;
+        struct lu_fid                    mdd_lov_objid_fid;
+        struct dt_object                *mdd_lov_objid_obj;
+        unsigned int                     mdd_lov_objids_dirty:1;
+};
+
 struct mdd_device {
         struct md_device                 mdd_md_dev;
         struct dt_device                *mdd_child;
+        struct mdd_lov_info              mdd_lov_info;
         struct dt_device                 mdd_lov_dev; 
-        int                              mdd_max_mddize;
+        int                              mdd_max_mdsize;
         int                              mdd_max_cookiesize;
         struct file                     *mdd_rcvd_filp;
         struct lu_fid                    mdd_root_fid;
@@ -42,4 +56,7 @@ struct mdd_object {
         struct md_object  mod_obj;
 };
 
+int mdd_lov_init(const struct lu_context *ctxt, struct mdd_device *mdd,
+                 struct lustre_cfg *cfg);
+int mdd_lov_fini(const struct lu_context *ctxt, struct mdd_device *mdd);
 #endif
diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c
new file mode 100644 (file)
index 0000000..253a89d
--- /dev/null
@@ -0,0 +1,357 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  linux/mds/mds_lov.c
+ *  Lustre Metadata Server (mds) handling of striped file data
+ *
+ *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *           wangdi <wangdi@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/module.h>
+
+#include <obd.h>
+#include <obd_class.h>
+#include <lustre_ver.h>
+#include <obd_support.h>
+#include <obd_lov.h>
+#include <lprocfs_status.h>
+
+#include <lu_object.h>
+#include <md_object.h>
+#include <dt_object.h>
+
+#include "mdd_internal.h"
+
+const char *mdd_lov_objid_name = "lov_objid";
+
+static int mdd_lov_read_objids(const struct lu_context *ctxt, 
+                               struct mdd_device *mdd)
+{
+        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+        struct dt_object *obj_ids = lov_info->mdd_lov_objid_obj;
+        struct lu_attr *lu_attr = NULL;
+        obd_id *ids;
+        loff_t off = 0;
+        int i, rc;
+        ENTRY;
+
+        LASSERT(!lov_info->mdd_lov_objids_size);
+        LASSERT(!lov_info->mdd_lov_objids_dirty);
+        
+        /* Read everything in the file, even if our current lov desc 
+           has fewer targets. Old targets not in the lov descriptor 
+           during mds setup may still have valid objids. */
+        OBD_ALLOC_PTR(lu_attr);
+        if (!lu_attr) 
+                RETURN(-ENOMEM);
+       
+        rc = obj_ids->do_ops->do_attr_get(ctxt, obj_ids, lu_attr);
+        if (rc)
+                GOTO(out, rc);
+        
+        if (lu_attr->la_size == 0)
+                RETURN(0);
+
+        OBD_ALLOC(ids, lu_attr->la_size);
+        if (ids == NULL)
+                RETURN(-ENOMEM);
+
+        lov_info->mdd_lov_objids = ids;
+        lov_info->mdd_lov_objids_size = lu_attr->la_size;
+
+        rc = obj_ids->do_body_ops->dbo_read(ctxt, obj_ids, ids, 
+                                            lu_attr->la_size, &off);
+        if (rc < 0) {
+                CERROR("Error reading objids %d\n", rc);
+                RETURN(rc);
+        }
+                
+        lov_info->mdd_lov_objids_in_file = lu_attr->la_size / sizeof(*ids);
+
+        for (i = 0; i < lov_info->mdd_lov_objids_in_file; i++) {
+                CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
+                       lov_info->mdd_lov_objids[i], i);
+        }
+out:
+        if (lu_attr)
+                OBD_FREE_PTR(lu_attr);
+        RETURN(0);
+}
+
+/* Update the lov desc for a new size lov. */
+static int mdd_lov_update_desc(const struct lu_context *ctxt, 
+                               struct mdd_device *mdd)
+{
+        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+        __u32 size, stripes, valsize = sizeof(lov_info->mdd_lov_desc);
+        struct lov_desc *ld;
+        struct obd_device *lov_obd = lov_info->mdd_lov_obd;
+        int rc = 0;
+        ENTRY;
+
+        OBD_ALLOC(ld, sizeof(*ld));
+        if (!ld)
+                RETURN(-ENOMEM);
+
+        rc = obd_get_info(lov_obd->obd_self_export, strlen(KEY_LOVDESC) + 1, 
+                          KEY_LOVDESC, &valsize, ld);
+        if (rc)
+                GOTO(out, rc);
+
+        /* The size of the LOV target table may have increased. */
+        size = ld->ld_tgt_count * sizeof(obd_id);
+        if ((lov_info->mdd_lov_objids_size == 0) || 
+            (size > lov_info->mdd_lov_objids_size)) {
+                obd_id *ids;
+
+                /* add room by powers of 2 */
+                size = 1;
+                while (size < ld->ld_tgt_count)
+                        size = size << 1;
+                size = size * sizeof(obd_id);
+
+                OBD_ALLOC(ids, size);
+                if (ids == NULL)
+                        GOTO(out, rc = -ENOMEM);
+                memset(ids, 0, size);
+                if (lov_info->mdd_lov_objids_size) {
+                        obd_id *old_ids = lov_info->mdd_lov_objids;
+                        memcpy(ids, lov_info->mdd_lov_objids,
+                               lov_info->mdd_lov_objids_size);
+                        lov_info->mdd_lov_objids = ids;
+                        OBD_FREE(old_ids, lov_info->mdd_lov_objids_size);
+                }
+                lov_info->mdd_lov_objids = ids;
+                lov_info->mdd_lov_objids_size = size;
+        }
+
+        /* Don't change the mds_lov_desc until the objids size matches the
+           count (paranoia) */
+        lov_info->mdd_lov_desc = *ld;
+        CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
+               lov_info->mdd_lov_desc.ld_tgt_count);
+
+        stripes = min((__u32)LOV_MAX_STRIPE_COUNT,
+                      max(lov_info->mdd_lov_desc.ld_tgt_count,
+                          lov_info->mdd_lov_objids_in_file));
+        mdd->mdd_max_mdsize = lov_mds_md_size(stripes);
+        mdd->mdd_max_cookiesize = stripes * sizeof(struct llog_cookie);
+        CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n",
+               mdd->mdd_max_mdsize, mdd->mdd_max_cookiesize);
+out:
+        OBD_FREE(ld, sizeof(*ld));
+        RETURN(rc);
+}
+
+int mdd_lov_write_objids(const struct lu_context *ctxt, 
+                         struct mdd_lov_info *lov_info)
+{
+        struct dt_object *ids_obj = lov_info->mdd_lov_objid_obj;
+        loff_t off = 0;
+        int i, rc, tgts;
+        ENTRY;
+
+        if (!lov_info->mdd_lov_objids_dirty)
+                RETURN(0);
+
+        tgts = max(lov_info->mdd_lov_desc.ld_tgt_count, 
+                   lov_info->mdd_lov_objids_in_file);
+        if (!tgts)
+                RETURN(0);
+
+        for (i = 0; i < tgts; i++)
+                CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
+                       lov_info->mdd_lov_objids[i], i);
+
+        rc = ids_obj->do_body_ops->dbo_write(ctxt, ids_obj,
+                                             lov_info->mdd_lov_objids,
+                                             tgts * sizeof(obd_id), &off);
+        if (rc >= 0) {
+                lov_info->mdd_lov_objids_dirty = 0;
+                rc = 0;
+        }
+
+        RETURN(rc);
+}
+
+static int mdd_lov_connect(const struct lu_context *ctxt, 
+                           struct mdd_device *mdd, char *lov_name)
+{
+        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+        struct lustre_handle conn = {0,};
+        struct obd_connect_data *data;
+        int rc = 0;
+        ENTRY;
+
+        /*connect to obd*/
+        OBD_ALLOC(data, sizeof(*data));
+        if (data == NULL)
+                RETURN(-ENOMEM);
+        data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
+                                  OBD_CONNECT_REQPORTAL;
+        data->ocd_version = LUSTRE_VERSION_CODE;
+        /* NB: lov_connect() needs to fill in .ocd_index for each OST */
+        rc = obd_connect(&conn, lov_info->mdd_lov_obd, &lov_info->mdd_lov_uuid,
+                         data);
+        OBD_FREE(data, sizeof(*data));
+        if (rc) {
+                CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
+                lov_info->mdd_lov_obd = ERR_PTR(rc);
+                RETURN(rc);
+        }
+#if 0
+        /*FIXME: register observer of lov, need obd method, 
+         * but mdd is not obd now*/
+        rc = md_register_observer(mds->mds_osc_obd, obd);
+        if (rc) {
+                CERROR("MDS cannot register as observer of LOV %s (%d)\n",
+                       lov_name, rc);
+                GOTO(err_discon, rc);
+        }
+#endif
+        /* open and test the lov objd file */
+
+        rc = mdd_lov_read_objids(ctxt, mdd);
+        if (rc) {
+                CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
+                GOTO(out, rc);
+        }
+
+        rc = mdd_lov_update_desc(ctxt, mdd);
+        if (rc)
+                GOTO(out, rc);
+#if 0
+        /* tgt_count may be 0! */
+        rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
+        if (rc) {
+                CERROR("failed to initialize catalog %d\n", rc);
+                GOTO(err_reg, rc);
+        }
+#endif
+        /* If we're mounting this code for the first time on an existing FS,
+         * we need to populate the objids array from the real OST values */
+        if (lov_info->mdd_lov_desc.ld_tgt_count > 
+                      lov_info->mdd_lov_objids_in_file) {
+                int size = sizeof(obd_id) * lov_info->mdd_lov_desc.ld_tgt_count;
+                int i;
+
+                rc = obd_get_info(lov_info->mdd_lov_obd->obd_self_export, 
+                                  strlen("last_id"), "last_id", &size, 
+                                  lov_info->mdd_lov_objids);
+                if (!rc) {
+                        for (i = 0; i < lov_info->mdd_lov_desc.ld_tgt_count; i++)
+                                CWARN("got last object "LPU64" from OST %d\n",
+                                      lov_info->mdd_lov_objids[i], i);
+                        lov_info->mdd_lov_objids_dirty = 1;
+                        rc = mdd_lov_write_objids(ctxt, lov_info);
+                        if (rc)
+                                CERROR("got last objids from OSTs, but error "
+                                       "writing objids file: %d\n", rc);
+                }
+        }
+        /* I want to see a callback happen when the OBD moves to a
+         * "For General Use" state, and that's when we'll call
+         * set_nextid().  The class driver can help us here, because
+         * it can use the obd_recovering flag to determine when the
+         * the OBD is full available. */
+#if 0
+        if (!obd->obd_recovering)
+                rc = mds_postrecov(obd);
+#endif
+out:
+        RETURN(rc);
+}
+
+int mdd_lov_fini(const struct lu_context *ctxt, struct mdd_device *mdd)
+{
+        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+        
+        dt_object_fini(lov_info->mdd_lov_objid_obj);
+        return 0;
+}
+
+int mdd_lov_init(const struct lu_context *ctxt, struct mdd_device *mdd,
+                 struct lustre_cfg *cfg)
+{
+        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+        struct dt_object *obj_id;
+        char *lov_name = NULL;
+        int rc = 0;
+        ENTRY;
+        if (IS_ERR(lov_info->mdd_lov_obd))
+                RETURN(PTR_ERR(lov_info->mdd_lov_obd));
+
+        lov_name = lustre_cfg_string(cfg, 3);
+        LASSERTF(lov_name != NULL, "MDD need lov \n");
+        lov_info->mdd_lov_obd = class_name2obd(lov_name);
+        if (!lov_info->mdd_lov_obd) {
+                CERROR("MDS cannot locate LOV %s\n", lov_name);
+                lov_info->mdd_lov_obd = ERR_PTR(-ENOTCONN);
+                RETURN(-ENOTCONN);
+        }
+
+        obj_id = dt_store_open(ctxt, mdd->mdd_child, mdd_lov_objid_name,
+                               &lov_info->mdd_lov_objid_fid);
+        if (IS_ERR(obj_id)){
+                rc = PTR_ERR(obj_id);
+                RETURN(rc); 
+        }
+
+        LASSERT(obj_id != NULL);
+        lov_info->mdd_lov_objid_obj = obj_id;
+
+        obd_str2uuid(&lov_info->mdd_lov_uuid, lustre_cfg_string(cfg, 1));
+
+        rc = mdd_lov_connect(ctxt, mdd, lov_name);
+        if (rc)
+                GOTO(out, rc);
+out: 
+        if (rc)
+                mdd_lov_fini(ctxt, mdd);
+        RETURN(rc);
+}
+
+/* update the LOV-OSC knowledge of the last used object id's */
+int mdd_lov_set_nextid(struct mdd_device *mdd)
+{
+        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+        int rc;
+        ENTRY;
+
+        LASSERT(lov_info->mdd_lov_objids != NULL);
+
+        rc = obd_set_info_async(lov_info->mdd_lov_obd->obd_self_export, 
+                                strlen(KEY_NEXT_ID), KEY_NEXT_ID,
+                                lov_info->mdd_lov_desc.ld_tgt_count,
+                                lov_info->mdd_lov_objids, NULL);
+
+        if (rc) 
+                CERROR ("mdd_lov_set_nextid failed (%d)\n", rc);
+
+        RETURN(rc);
+}
index df4b0a4..2b68637 100644 (file)
@@ -1163,6 +1163,14 @@ static int osd_mount(const struct lu_context *ctx,
                                 result = PTR_ERR(d);
                 } else
                         result = PTR_ERR(d);
+                /*
+                 * XXX temporary resolution: OBJ_IDS should also be done in mkfs
+                 */
+                d = simple_mknod(osd_sb(o)->s_root, "lov_objid", 0777, 1);
+                if (!IS_ERR(d))
+                        dput(d);
+                else
+                        result = PTR_ERR(d);
         }
         if (result != 0)
                 osd_device_fini(ctx, osd2lu_dev(o));