Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Fri, 14 Jul 2006 07:58:22 +0000 (07:58 +0000)
committerwangdi <wangdi>
Fri, 14 Jul 2006 07:58:22 +0000 (07:58 +0000)
1)reorganize MDS to share mds lov api with mdd
2)serval fixes about mdd lov

lustre/include/lustre_mds.h
lustre/include/obd.h
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lov.c
lustre/mds/mds_fs.c
lustre/mds/mds_internal.h
lustre/mds/mds_lov.c
lustre/mds/mds_reint.c

index edbf088..b0ef627 100644 (file)
@@ -81,6 +81,15 @@ struct mds_file_data {
 int mds_reint_rec(struct mds_update_record *r, int offset,
                   struct ptlrpc_request *req, struct lustre_handle *);
 
+int md_lov_connect(struct obd_device *obd, struct md_lov_info *mli,
+                   char *lov_name, struct obd_uuid *uuid, 
+                   struct md_lov_ops *mlo, const void *ctxt);
+int md_lov_notity_pre(struct obd_device *obd, struct md_lov_info *mli,
+                      struct obd_device *watched, enum obd_notify_event ev,
+                      void *data);
+int md_lov_start_synchronize(struct obd_device *obd, struct md_lov_info *mli,
+                             struct obd_device *watched,
+                             void *data, int nonblock, const void *ctxt);
 /* ioctls for trying requests */
 #define IOC_REQUEST_TYPE                   'f'
 #define IOC_REQUEST_MIN_NR                 30
index 83fa845..6838b97 100644 (file)
@@ -402,31 +402,47 @@ struct mgs_obd {
         struct semaphore                 mgs_sem;
 };
 
+struct md_lov_info;
+
+struct md_lov_ops {
+        int (*ml_read_objids)(struct obd_device *obd, struct md_lov_info *mli, 
+                              const void *ctxt);
+        int (*ml_write_objids)(struct obd_device *obd, struct md_lov_info *mli,
+                               const void *ctxt);
+};
 struct md_lov_info {
-        struct obd_device               *md_osc_obd; /* XXX lov_obd */
+        struct obd_device               *md_lov_obd; /* XXX lov_obd */
         struct obd_uuid                  md_lov_uuid;
-        struct obd_export               *md_osc_exp; /* XXX lov_exp */
+        struct obd_export               *md_lov_exp; /* XXX lov_exp */
         struct lov_desc                  md_lov_desc;
         obd_id                          *md_lov_objids;
         int                              md_lov_objids_size;
         __u32                            md_lov_objids_in_file;
         unsigned int                     md_lov_objids_dirty:1;
         int                              md_lov_nextid_set;
-        struct file                     *md_lov_objid_filp;
+        void                            *md_lov_objid_obj;
+        struct lu_fid                    md_lov_objid_fid;
         unsigned long                    md_lov_objids_valid:1;
+        int                              md_lov_max_mdsize;
+        int                              md_lov_max_cookiesize;
+        struct semaphore                 md_lov_orphan_recovery_sem;
+        struct md_lov_ops                *md_lov_ops;
 };
 
-#define mds_osc_obd             mds_lov_info.md_osc_obd
+#define mds_osc_obd             mds_lov_info.md_lov_obd
 #define mds_lov_uuid            mds_lov_info.md_lov_uuid
-#define mds_osc_exp             mds_lov_info.md_osc_exp
+#define mds_osc_exp             mds_lov_info.md_lov_exp
 #define mds_lov_desc            mds_lov_info.md_lov_desc
 #define mds_lov_objids          mds_lov_info.md_lov_objids
 #define mds_lov_objids_size     mds_lov_info.md_lov_objids_size
 #define mds_lov_objids_in_file  mds_lov_info.md_lov_objids_in_file
 #define mds_lov_objids_dirty    mds_lov_info.md_lov_objids_dirty
 #define mds_lov_nextid_set      mds_lov_info.md_lov_nextid_set
-#define mds_lov_objid_filp      mds_lov_info.md_lov_objid_filp
+#define mds_lov_objid_filp      mds_lov_info.md_lov_objid_obj
 #define mds_lov_objids_valid    mds_lov_info.md_lov_objids_valid
+#define mds_max_mdsize          mds_lov_info.md_lov_max_mdsize
+#define mds_max_cookiesize      mds_lov_info.md_lov_max_cookiesize
+#define mds_orphan_recovery_sem mds_lov_info.md_lov_orphan_recovery_sem
 
 struct mds_obd {
         /* NB this field MUST be first */
@@ -436,8 +452,6 @@ struct mds_obd {
         struct ptlrpc_service           *mds_readpage_service;
         struct vfsmount                 *mds_vfsmnt;
         cfs_dentry_t                    *mds_fid_de;
-        int                              mds_max_mdsize;
-        int                              mds_max_cookiesize;
         struct file                     *mds_rcvd_filp;
         spinlock_t                       mds_transno_lock;
         __u64                            mds_last_transno;
@@ -456,7 +470,6 @@ struct mds_obd {
         char                            *mds_profile;
         struct file                     *mds_health_check_filp;
         unsigned long                   *mds_client_bitmap;
-        struct semaphore                 mds_orphan_recovery_sem;
         struct upcall_cache             *mds_group_hash;
 
         struct lustre_quota_info         mds_quota_info;
index 7a9844b..f66f939 100644 (file)
 
 struct dt_device;
 
-struct mdd_lov_info {
-        struct obd_device               *mdd_lov_obd;
-        struct obd_uuid                  mdd_lov_uuid;
-        struct lov_desc                  mdd_lov_desc;
-        obd_id                          *mdd_lov_objids;
-        int                              mdd_lov_objids_size;
-        __u32                            mdd_lov_objids_in_file;
-        int                              mdd_lov_nextid_set;
-        struct lu_fid                    mdd_lov_objid_fid;
-        struct dt_object                *mdd_lov_objid_obj;
-        unsigned int                     mdd_lov_objids_dirty:1;
-};
-
 struct mdd_device {
         struct md_device                 mdd_md_dev;
         struct dt_device                *mdd_child;
-        struct mdd_lov_info              mdd_lov_info;
+        struct md_lov_info               mdd_lov_info;
         struct dt_device                 mdd_lov_dev;
-        int                              mdd_max_mdsize;
-        int                              mdd_max_cookiesize;
         struct lu_fid                    mdd_root_fid;
 };
 
index 2f190d3..6a89c13 100644 (file)
 #include <md_object.h>
 #include <dt_object.h>
 #include <lustre_mds.h>
+#include <lustre/lustre_idl.h>
 
 #include "mdd_internal.h"
 
 static const char mdd_lov_objid_name[] = "lov_objid";
 
-static int mdd_lov_read_objids(const struct lu_context *ctxt,
-                               struct mdd_device *mdd)
+static int mdd_lov_read_objids(struct obd_device *obd, struct md_lov_info *mli, 
+                               const void *ctxt)
 {
-        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
-        struct dt_object *obj_ids = lov_info->mdd_lov_objid_obj;
+        struct dt_object *obj_ids = mli->md_lov_objid_obj;
         struct lu_attr *lu_attr = &mdd_ctx_info(ctxt)->mti_attr;
         obd_id *ids;
         int i, rc;
         ENTRY;
 
-        LASSERT(!lov_info->mdd_lov_objids_size);
-        LASSERT(!lov_info->mdd_lov_objids_dirty);
+        LASSERT(!mli->md_lov_objids_size);
+        LASSERT(!mli->md_lov_objids_dirty);
 
         /* Read everything in the file, even if our current lov desc
            has fewer targets. Old targets not in the lov descriptor
@@ -77,8 +77,8 @@ static int mdd_lov_read_objids(const struct lu_context *ctxt,
         if (ids == NULL)
                 RETURN(-ENOMEM);
 
-        lov_info->mdd_lov_objids = ids;
-        lov_info->mdd_lov_objids_size = lu_attr->la_size;
+        mli->md_lov_objids = ids;
+        mli->md_lov_objids_size = lu_attr->la_size;
 
 #if 0
         rc = obj_ids->do_body_ops->dbo_read(ctxt, obj_ids, ids,
@@ -88,475 +88,101 @@ static int mdd_lov_read_objids(const struct lu_context *ctxt,
                 RETURN(rc);
         }
 #endif
-        lov_info->mdd_lov_objids_in_file = lu_attr->la_size / sizeof(*ids);
+        mli->md_lov_objids_in_file = lu_attr->la_size / sizeof(*ids);
 
-        for (i = 0; i < lov_info->mdd_lov_objids_in_file; i++) {
+        for (i = 0; i < mli->md_lov_objids_in_file; i++) {
                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
-                       lov_info->mdd_lov_objids[i], i);
+                       mli->md_lov_objids[i], i);
         }
 out:
         RETURN(0);
 }
 
-/* Update the lov desc for a new size lov. */
-static int mdd_lov_update_desc(const struct lu_context *ctxt,
-                               struct mdd_device *mdd)
-{
-        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
-        __u32 size, stripes, valsize = sizeof(lov_info->mdd_lov_desc);
-        struct lov_desc *ld;
-        struct obd_device *lov_obd = lov_info->mdd_lov_obd;
-        int rc = 0;
-        ENTRY;
-
-        ld = &mdd_ctx_info(ctxt)->mti_ld;
-
-        rc = obd_get_info(lov_obd->obd_self_export, strlen(KEY_LOVDESC) + 1,
-                          KEY_LOVDESC, &valsize, ld);
-        if (rc)
-                GOTO(out, rc);
-
-        /* The size of the LOV target table may have increased. */
-        size = ld->ld_tgt_count * sizeof(obd_id);
-        if ((lov_info->mdd_lov_objids_size == 0) ||
-            (size > lov_info->mdd_lov_objids_size)) {
-                obd_id *ids;
-
-                /* add room by powers of 2 */
-                size = 1;
-                while (size < ld->ld_tgt_count)
-                        size = size << 1;
-                size = size * sizeof(obd_id);
-
-                OBD_ALLOC(ids, size);
-                if (ids == NULL)
-                        GOTO(out, rc = -ENOMEM);
-                memset(ids, 0, size);
-                if (lov_info->mdd_lov_objids_size) {
-                        obd_id *old_ids = lov_info->mdd_lov_objids;
-                        memcpy(ids, lov_info->mdd_lov_objids,
-                               lov_info->mdd_lov_objids_size);
-                        lov_info->mdd_lov_objids = ids;
-                        OBD_FREE(old_ids, lov_info->mdd_lov_objids_size);
-                }
-                lov_info->mdd_lov_objids = ids;
-                lov_info->mdd_lov_objids_size = size;
-        }
-
-        /* Don't change the mds_lov_desc until the objids size matches the
-           count (paranoia) */
-        lov_info->mdd_lov_desc = *ld;
-        CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
-               lov_info->mdd_lov_desc.ld_tgt_count);
-
-        stripes = min((__u32)LOV_MAX_STRIPE_COUNT,
-                      max(lov_info->mdd_lov_desc.ld_tgt_count,
-                          lov_info->mdd_lov_objids_in_file));
-        mdd->mdd_max_mdsize = lov_mds_md_size(stripes);
-        mdd->mdd_max_cookiesize = stripes * sizeof(struct llog_cookie);
-        CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n",
-               mdd->mdd_max_mdsize, mdd->mdd_max_cookiesize);
-out:
-        RETURN(rc);
-}
-
-int mdd_lov_write_objids(const struct lu_context *ctxt,
-                         struct mdd_lov_info *lov_info)
+int mdd_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli, 
+                         const void *ctxt)
 {
         int i, rc = 0, tgts;
         ENTRY;
 
-        if (!lov_info->mdd_lov_objids_dirty)
+        if (!mli->md_lov_objids_dirty)
                 RETURN(0);
 
-        tgts = max(lov_info->mdd_lov_desc.ld_tgt_count,
-                   lov_info->mdd_lov_objids_in_file);
+        tgts = max(mli->md_lov_desc.ld_tgt_count,
+                   mli->md_lov_objids_in_file);
         if (!tgts)
                 RETURN(0);
 
         for (i = 0; i < tgts; i++)
                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
-                       lov_info->mdd_lov_objids[i], i);
+                       mli->md_lov_objids[i], i);
 #if 0
         rc = ids_obj->do_body_ops->dbo_write(ctxt, ids_obj,
-                                             lov_info->mdd_lov_objids,
+                                             mli->mdd_lov_objids,
                                              tgts * sizeof(obd_id), &off);
         if (rc >= 0) {
-                lov_info->mdd_lov_objids_dirty = 0;
+                mli->mdd_lov_objids_dirty = 0;
                 rc = 0;
         }
 #endif
         RETURN(rc);
 }
 
-static int mdd_lov_connect(const struct lu_context *ctxt,
-                           struct mdd_device *mdd, char *lov_name)
-{
-        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
-        struct lustre_handle conn = {0,};
-        struct obd_connect_data *data;
-        int rc = 0;
-        ENTRY;
-
-        /*connect to obd*/
-        OBD_ALLOC(data, sizeof(*data));
-        if (data == NULL)
-                RETURN(-ENOMEM);
-        data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
-                                  OBD_CONNECT_REQPORTAL;
-        data->ocd_version = LUSTRE_VERSION_CODE;
-        /* NB: lov_connect() needs to fill in .ocd_index for each OST */
-        rc = obd_connect(&conn, lov_info->mdd_lov_obd, &lov_info->mdd_lov_uuid,
-                         data);
-        OBD_FREE(data, sizeof(*data));
-        if (rc) {
-                CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
-                lov_info->mdd_lov_obd = ERR_PTR(rc);
-                RETURN(rc);
-        }
-
-        /* open and test the lov objd file */
-
-        rc = mdd_lov_read_objids(ctxt, mdd);
-        if (rc) {
-                CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
-                GOTO(out, rc);
-        }
-
-        rc = mdd_lov_update_desc(ctxt, mdd);
-        if (rc)
-                GOTO(out, rc);
-#if 0
-        /* tgt_count may be 0! */
-        rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
-        if (rc) {
-                CERROR("failed to initialize catalog %d\n", rc);
-                GOTO(err_reg, rc);
-        }
-#endif
-        /* If we're mounting this code for the first time on an existing FS,
-         * we need to populate the objids array from the real OST values */
-        if (lov_info->mdd_lov_desc.ld_tgt_count >
-                      lov_info->mdd_lov_objids_in_file) {
-                int size = sizeof(obd_id) * lov_info->mdd_lov_desc.ld_tgt_count;
-                int i;
-
-                rc = obd_get_info(lov_info->mdd_lov_obd->obd_self_export,
-                                  strlen("last_id"), "last_id", &size,
-                                  lov_info->mdd_lov_objids);
-                if (!rc) {
-                        for (i = 0; i < lov_info->mdd_lov_desc.ld_tgt_count; i++)
-                                CWARN("got last object "LPU64" from OST %d\n",
-                                      lov_info->mdd_lov_objids[i], i);
-                        lov_info->mdd_lov_objids_dirty = 1;
-                        rc = mdd_lov_write_objids(ctxt, lov_info);
-                        if (rc)
-                                CERROR("got last objids from OSTs, but error "
-                                       "writing objids file: %d\n", rc);
-                }
-        }
-        /* I want to see a callback happen when the OBD moves to a
-         * "For General Use" state, and that's when we'll call
-         * set_nextid().  The class driver can help us here, because
-         * it can use the obd_recovering flag to determine when the
-         * the OBD is full available. */
-#if 0
-        if (!obd->obd_recovering)
-                rc = mds_postrecov(obd);
-#endif
-out:
-        if (rc)
-                obd_disconnect(lov_info->mdd_lov_obd->obd_self_export);
-        RETURN(rc);
-}
+struct md_lov_ops mdd_lov_ops = {
+        .ml_read_objids = mdd_lov_read_objids,
+        .ml_write_objids = mdd_lov_write_objids,
+};
 
 int mdd_lov_fini(const struct lu_context *ctxt, struct mdd_device *mdd)
 {
-        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+        struct md_lov_info *mli = &mdd->mdd_lov_info;
 
-        dt_object_fini(lov_info->mdd_lov_objid_obj);
+        obd_register_observer(mli->md_lov_obd, NULL);
+        
+        if (mli->md_lov_exp) {
+                obd_disconnect(mli->md_lov_exp);
+                mli->md_lov_exp = NULL;
+        }
+        
+        dt_object_fini(mli->md_lov_objid_obj);
         return 0;
 }
 
 int mdd_lov_init(const struct lu_context *ctxt, struct mdd_device *mdd,
                  struct lustre_cfg *cfg)
 {
-        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+        struct md_lov_info *lov_info = &mdd->mdd_lov_info;
         struct dt_object *obj_id;
         struct obd_device *obd = NULL;
         char *lov_name = NULL, *srv = NULL;
         int rc = 0;
         ENTRY;
 
-        if (IS_ERR(lov_info->mdd_lov_obd))
-                RETURN(PTR_ERR(lov_info->mdd_lov_obd));
+        if (IS_ERR(lov_info->md_lov_obd))
+                RETURN(PTR_ERR(lov_info->md_lov_obd));
 
         lov_name = lustre_cfg_string(cfg, 3);
         LASSERTF(lov_name != NULL, "MDD need lov \n");
-        lov_info->mdd_lov_obd = class_name2obd(lov_name);
-        if (!lov_info->mdd_lov_obd) {
-                CERROR("MDS cannot locate LOV %s\n", lov_name);
-                lov_info->mdd_lov_obd = ERR_PTR(-ENOTCONN);
-                RETURN(-ENOTCONN);
-        }
 
         obj_id = dt_store_open(ctxt, mdd->mdd_child, mdd_lov_objid_name,
-                               &lov_info->mdd_lov_objid_fid);
+                               &lov_info->md_lov_objid_fid);
         if (IS_ERR(obj_id)){
                 rc = PTR_ERR(obj_id);
                 RETURN(rc);
         }
 
         LASSERT(obj_id != NULL);
-        lov_info->mdd_lov_objid_obj = obj_id;
-
-        obd_str2uuid(&lov_info->mdd_lov_uuid, lustre_cfg_string(cfg, 1));
+        lov_info->md_lov_objid_obj = obj_id;
 
-        rc = mdd_lov_connect(ctxt, mdd, lov_name);
-        if (rc)
-                GOTO(out, rc);
-
-        /*register the obd server for lov*/
         srv = lustre_cfg_string(cfg, 0);
         obd = class_name2obd(srv);
         if (obd == NULL) {
                 CERROR("No such OBD %s\n", srv);
                 LBUG();
         }
-        rc = obd_register_observer(lov_info->mdd_lov_obd, obd);
-        if (rc) {
-                CERROR("MDS cannot register as observer of LOV %s (%d)\n",
-                       lov_name, rc);
-                GOTO(out, rc);
-        }
-        EXIT;
-out:
+        rc = md_lov_connect(obd, lov_info, lov_name, 
+                            &obd->obd_uuid, &mdd_lov_ops, ctxt);
         if (rc)
                 mdd_lov_fini(ctxt, mdd);
-        return rc;
-}
-
-/* update the LOV-OSC knowledge of the last used object id's */
-int mdd_lov_set_nextid(struct mdd_device *mdd)
-{
-        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
-        int rc;
-        ENTRY;
-
-        LASSERT(lov_info->mdd_lov_objids != NULL);
-
-        rc = obd_set_info_async(lov_info->mdd_lov_obd->obd_self_export,
-                                strlen(KEY_NEXT_ID), KEY_NEXT_ID,
-                                lov_info->mdd_lov_desc.ld_tgt_count,
-                                lov_info->mdd_lov_objids, NULL);
-
-        if (rc)
-                CERROR ("mdd_lov_set_nextid failed (%d)\n", rc);
-
-        RETURN(rc);
-}
-
-struct mdd_lov_sync_info {
-        struct lu_context *mlsi_ctxt;
-        struct lu_device  *mlsi_ld;     /* the lov device to sync */
-        struct obd_device *mlsi_watched; /* target osc */
-        __u32              mlsi_index;   /* index of target */
-};
-
-#define MDSLOV_NO_INDEX -1
-
-/* Inform MDS about new/updated target */
-static int mdd_lov_update_mds(struct lu_context *ctxt,
-                              struct lu_device *ld,
-                              struct obd_device *watched,
-                              __u32 idx)
-{
-        struct mdd_device *mdd = lu2mdd_dev(ld);
-        struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
-        int old_count;
-        int rc = 0;
-        ENTRY;
-
-        old_count = lov_info->mdd_lov_desc.ld_tgt_count;
-        rc = mdd_lov_update_desc(ctxt, mdd);
-        if (rc)
-                RETURN(rc);
-
-        /*
-         * idx is set as data from lov_notify.
-         * XXX did not consider recovery here
-         */
-        if (idx != MDSLOV_NO_INDEX) {
-                if (idx >= lov_info->mdd_lov_desc.ld_tgt_count) {
-                        CERROR("index %d > count %d!\n", idx,
-                               lov_info->mdd_lov_desc.ld_tgt_count);
-                        RETURN(-EINVAL);
-                }
-
-                if (idx >= lov_info->mdd_lov_objids_in_file) {
-                        /* We never read this lastid; ask the osc */
-                        obd_id lastid;
-                        __u32 size = sizeof(lastid);
-                        rc = obd_get_info(watched->obd_self_export,
-                                          strlen("last_id"),
-                                          "last_id", &size, &lastid);
-                        if (rc)
-                                RETURN(rc);
-                        lov_info->mdd_lov_objids[idx] = lastid;
-                        lov_info->mdd_lov_objids_dirty = 1;
-                        mdd_lov_write_objids(ctxt, lov_info);
-                } else {
-                        /* We have read this lastid from disk; tell the osc.
-                           Don't call this during recovery. */
-                        rc = mdd_lov_set_nextid(mdd);
-                }
-
-                CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
-                      lov_info->mdd_lov_objids[idx], idx);
-        }
-
-        RETURN(rc);
-}
-
-static int mdd_lov_clear_orphans(struct mdd_lov_info *mli,
-                                 struct obd_uuid *ost_uuid)
-{
-        int rc;
-        struct obdo oa;
-        struct obd_trans_info oti = {0};
-        struct lov_stripe_md  *empty_ea = NULL;
-        ENTRY;
-
-        LASSERT(mli->mdd_lov_objids != NULL);
-
-        /* This create will in fact either create or destroy:  If the OST is
-         * missing objects below this ID, they will be created.  If it finds
-         * objects above this ID, they will be removed. */
-        memset(&oa, 0, sizeof(oa));
-        oa.o_valid = OBD_MD_FLFLAGS;
-        oa.o_flags = OBD_FL_DELORPHAN;
-        if (ost_uuid != NULL) {
-                memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
-                oa.o_valid |= OBD_MD_FLINLINE;
-        }
-        rc = obd_create(mli->mdd_lov_obd->obd_self_export, &oa,
-                        &empty_ea, &oti);
-
-        RETURN(rc);
-}
-
-/* We only sync one osc at a time, so that we don't have to hold
-   any kind of lock on the whole mds_lov_desc, which may change
-   (grow) as a result of mds_lov_add_ost.  This also avoids any
-   kind of mismatch between the lov_desc and the mds_lov_desc,
-   which are not in lock-step during lov_add_obd */
-static int __mdd_lov_synchronize(void *data)
-{
-        struct mdd_lov_sync_info *mlsi = data;
-        struct lu_device *ld = mlsi->mlsi_ld;
-        struct obd_device *watched = mlsi->mlsi_watched;
-        struct lu_context *ctxt = mlsi->mlsi_ctxt;
-        struct mdd_device *mdd = lu2mdd_dev(ld);
-        struct obd_uuid *uuid;
-        __u32  idx = mlsi->mlsi_index;
-        int rc = 0;
-        ENTRY;
-
-        OBD_FREE(mlsi, sizeof(*mlsi));
-
-        LASSERT(ld);
-        LASSERT(watched);
-        uuid = &watched->u.cli.cl_target_uuid;
-        LASSERT(uuid);
-
-        rc = mdd_lov_update_mds(ctxt, ld, watched, idx);
-        if (rc != 0)
-                GOTO(out, rc);
-
-        rc = obd_set_info_async(mdd->mdd_lov_info.mdd_lov_obd->obd_self_export,
-                                strlen(KEY_MDS_CONN), KEY_MDS_CONN, 0, uuid,
-                                NULL);
-        if (rc != 0) {
-                CERROR("failed at obd_set_info_async: %d\n", rc);
-                GOTO(out, rc);
-        }
-
-        rc = mdd_lov_clear_orphans(&mdd->mdd_lov_info, uuid);
-        if (rc != 0) {
-                CERROR("failed at mds_lov_clear_orphans: %d\n", rc);
-                GOTO(out, rc);
-        }
-out:
-        EXIT;
-        lu_device_put(ld);
-        return rc;
-}
-
-int mdd_lov_synchronize(void *data)
-{
-        struct mdd_lov_sync_info *mlsi = data;
-        char name[20];
-
-        sprintf(name, "ll_mlov_sync_%02u", mlsi->mlsi_index);
-        ptlrpc_daemonize(name);
-
-        RETURN(__mdd_lov_synchronize(data));
-}
-
-int mdd_lov_start_synchronize(const struct lu_context *ctxt,
-                              struct lu_device *ld,
-                              struct obd_device *watched,
-                              void *data, int nonblock)
-{
-        struct mdd_lov_sync_info *mlsi;
-        int rc;
-
-        ENTRY;
-
-        LASSERT(watched);
-
-        OBD_ALLOC(mlsi, sizeof(*mlsi));
-        if (mlsi == NULL)
-                RETURN(-ENOMEM);
-
-        mlsi->mlsi_ctxt = (struct lu_context *)ctxt;
-        mlsi->mlsi_ld = ld;
-        mlsi->mlsi_watched = watched;
-        if (data)
-                mlsi->mlsi_index = *(__u32 *)data;
-        else
-                mlsi->mlsi_index = MDSLOV_NO_INDEX;
-
-        /* Although class_export_get(obd->obd_self_export) would lock
-           the MDS in place, since it's only a self-export
-           it doesn't lock the LOV in place.  The LOV can be disconnected
-           during MDS precleanup, leaving nothing for __mdd_lov_synchronize.
-           Simply taking an export ref on the LOV doesn't help, because it's
-           still disconnected. Taking an obd reference insures that we don't
-           disconnect the LOV.  This of course means a cleanup won't
-           finish for as long as the sync is blocking. */
-        lu_device_get(ld);
-#if 0
-        if (nonblock) {
-                /* Synchronize in the background */
-                rc = cfs_kernel_thread(mdd_lov_synchronize, mlsi,
-                                       CLONE_VM | CLONE_FILES);
-                if (rc < 0) {
-                        CERROR("error starting mdd_lov_synchronize: %d\n", rc);
-                        lu_device_put(ld);
-                } else {
-                        CDEBUG(D_HA, "mdd_lov_synchronize idx=%d thread=%d\n",
-                               mlsi->mlsi_index, rc);
-                        rc = 0;
-                }
-        } else {
-                rc = __mdd_lov_synchronize((void *)mlsi);
-        }
-#else
-        /*FIXME: Did not implement the nonblock lov sync here. because ctxt can not
-         * be shared, maybe we need ref_count for ctxt */
-        rc = __mdd_lov_synchronize((void *)mlsi);
-#endif
         RETURN(rc);
 }
 
@@ -569,38 +195,16 @@ int mdd_notify(const struct lu_context *ctxt, struct lu_device *ld,
         int rc = 0;
         ENTRY;
 
-        switch (ev) {
-        /* We only handle these: */
-        case OBD_NOTIFY_ACTIVE:
-        case OBD_NOTIFY_SYNC:
-        case OBD_NOTIFY_SYNC_NONBLOCK:
-                break;
-        default:
-                RETURN(0);
-        }
-
-        CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
-
-        if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
-                CERROR("unexpected notification of %s %s!\n",
-                       watched->obd_type->typ_name, watched->obd_name);
-                RETURN(-EINVAL);
-        }
-
-        /*FIXME later, Recovery stuff still not be designed */
-        if (obd->obd_recovering) {
-                CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
-                      obd->obd_name,
-                      obd_uuid2str(&watched->u.cli.cl_target_uuid));
-                /* We still have to fix the lov descriptor for ost's added
-                   after the mdt in the config log. They didn't make it into
-                   mds_lov_connect. */
-                rc = mdd_lov_update_desc(ctxt, mdd);
+        rc = md_lov_notity_pre(obd, &mdd->mdd_lov_info, watched, ev, data);
+        if (rc) {
+                if (rc == -ENOENT || rc == -EBUSY)
+                        rc = 0;
                 RETURN(rc);
         }
 
-        rc = mdd_lov_start_synchronize(ctxt, ld, watched, data,
-                                       !(ev == OBD_NOTIFY_SYNC));
+        rc = md_lov_start_synchronize(obd, &mdd->mdd_lov_info, watched, data, 
+                                      !(ev == OBD_NOTIFY_SYNC), ctxt);
+
         RETURN(rc);
 }
 
@@ -651,7 +255,7 @@ int mdd_lov_set_md(const struct lu_context *ctxt, struct md_object *pobj,
 int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
                    struct mdd_object *child)
 {
-        struct mdd_lov_info *mli = &mdd->mdd_lov_info;
+        struct md_lov_info *mli = &mdd->mdd_lov_info;
         struct obdo *oa;
         struct lov_mds_md *lmm = NULL;
         struct lov_stripe_md *lsm = NULL;
@@ -667,19 +271,18 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
         oa->o_size = 0;
 
-        rc = obd_create(mli->mdd_lov_obd->obd_self_export, oa, &lsm, NULL);
+        rc = obd_create(mli->md_lov_exp, oa, &lsm, NULL);
         if (rc)
                 GOTO(out_oa, rc);
 
-        rc = obd_packmd(mli->mdd_lov_obd->obd_self_export, &lmm, lsm);
+        rc = obd_packmd(mli->md_lov_exp, &lmm, lsm);
         if (rc < 0) {
                 CERROR("cannot pack lsm, err = %d\n", rc);
                 GOTO(out_oa, rc);
         }
         lmm_size = rc;
-
-        rc = mdd_xattr_set(ctxt, &child->mod_obj, lmm, lmm_size,
-                           MDS_LOV_MD_NAME);
+        rc = 0;
+        /*FIXME: did not set MD here */
 out_oa:
         obdo_free(oa);
         RETURN(rc);
index 124d178..82214b0 100644 (file)
@@ -555,7 +555,8 @@ err_health_check:
             filp_close(mds->mds_health_check_filp, 0))
                 CERROR("can't close %s after error\n", HEALTH_CHECK);
 err_lov_objid:
-        if (mds->mds_lov_objid_filp && filp_close(mds->mds_lov_objid_filp, 0))
+        if (mds->mds_lov_objid_filp && 
+                filp_close((struct file *)mds->mds_lov_objid_filp, 0))
                 CERROR("can't close %s after error\n", LOV_OBJID);
 err_client:
         class_disconnect_exports(obd);
@@ -595,7 +596,7 @@ int mds_fs_cleanup(struct obd_device *obd)
                         CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
         }
         if (mds->mds_lov_objid_filp) {
-                rc = filp_close(mds->mds_lov_objid_filp, 0);
+                rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
                 mds->mds_lov_objid_filp = NULL;
                 if (rc)
                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
index f67a19f..3a61aa9 100644 (file)
@@ -183,9 +183,10 @@ int mds_llog_finish(struct obd_device *obd, int count);
 /* mds/mds_lov.c */
 int mds_lov_connect(struct obd_device *obd, char * lov_name);
 int mds_lov_disconnect(struct obd_device *obd);
-int mds_lov_write_objids(struct obd_device *obd);
-void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
+int mds_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli, 
+                         const void *ctxt);
 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
+void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
 int mds_lov_set_nextid(struct obd_device *obd);
 int mds_lov_start_synchronize(struct obd_device *obd, 
                               struct obd_device *watched,
index ed4296b..a9b2c32 100644 (file)
@@ -53,7 +53,6 @@ void md_lov_info_update_objids(struct md_lov_info *mli, obd_id *ids)
                 }
         unlock_kernel();
 }
-EXPORT_SYMBOL(md_lov_info_update_objids);
 
 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
 {
@@ -62,77 +61,83 @@ void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
         md_lov_info_update_objids(&mds->mds_lov_info, ids);
 }
 
-static int mds_lov_read_objids(struct obd_device *obd)
+static int mds_lov_read_objids(struct obd_device *obd, struct md_lov_info *mli, 
+                               const void *ctxt)
 {
-        struct mds_obd *mds = &obd->u.mds;
+        struct file *filp = (struct file *)mli->md_lov_objid_obj;
         obd_id *ids;
         loff_t off = 0;
         int i, rc, size;
         ENTRY;
 
-        LASSERT(!mds->mds_lov_objids_size);
-        LASSERT(!mds->mds_lov_objids_dirty);
+        LASSERT(!mli->md_lov_objids_size);
+        LASSERT(!mli->md_lov_objids_dirty);
 
         /* Read everything in the file, even if our current lov desc 
            has fewer targets. Old targets not in the lov descriptor 
            during mds setup may still have valid objids. */
-        size = mds->mds_lov_objid_filp->f_dentry->d_inode->i_size;
+        size = filp->f_dentry->d_inode->i_size;
         if (size == 0)
                 RETURN(0);
 
         OBD_ALLOC(ids, size);
         if (ids == NULL)
                 RETURN(-ENOMEM);
-        mds->mds_lov_objids = ids;
-        mds->mds_lov_objids_size = size;
+        mli->md_lov_objids = ids;
+        mli->md_lov_objids_size = size;
 
-        rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
+        rc = fsfilt_read_record(obd, filp, ids, size, &off);
         if (rc < 0) {
                 CERROR("Error reading objids %d\n", rc);
                 RETURN(rc);
         }
                 
-        mds->mds_lov_objids_in_file = size / sizeof(*ids); 
+        mli->md_lov_objids_in_file = size / sizeof(*ids); 
         
-        for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
+        for (i = 0; i < mli->md_lov_objids_in_file; i++) {
                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
-                       mds->mds_lov_objids[i], i);
+                       mli->md_lov_objids[i], i);
         }
         RETURN(0);
 }
 
-int mds_lov_write_objids(struct obd_device *obd)
+int mds_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli, 
+                         const void *ctxt)
 {
-        struct mds_obd *mds = &obd->u.mds;
+        struct file *filp = (struct file *)mli->md_lov_objid_obj;
         loff_t off = 0;
         int i, rc, tgts; 
         ENTRY;
 
-        if (!mds->mds_lov_objids_dirty)
+        if (!mli->md_lov_objids_dirty)
                 RETURN(0);
 
-        tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
+        tgts = max(mli->md_lov_desc.ld_tgt_count, mli->md_lov_objids_in_file);
 
         if (!tgts)
                 RETURN(0);
 
         for (i = 0; i < tgts; i++)
                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
-                       mds->mds_lov_objids[i], i);
+                       mli->md_lov_objids[i], i);
 
-        rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
-                                 mds->mds_lov_objids, tgts * sizeof(obd_id),
+        rc = fsfilt_write_record(obd, filp, mli->md_lov_objids, 
+                                 tgts * sizeof(obd_id),
                                  &off, 0);
         if (rc >= 0) {
-                mds->mds_lov_objids_dirty = 0;
+                mli->md_lov_objids_dirty = 0;
                 rc = 0;
         }
 
         RETURN(rc);
 }
 
-int md_lov_info_clear_orphans(struct md_lov_info *mli, 
-                              struct obd_uuid *ost_uuid)
+struct md_lov_ops mli_ops = {
+        .ml_read_objids = mds_lov_read_objids,
+        .ml_write_objids = mds_lov_write_objids,
+};
+
+int md_lov_clear_orphans(struct md_lov_info *mli, struct obd_uuid *ost_uuid)
 {
         int rc;
         struct obdo oa;
@@ -152,54 +157,44 @@ int md_lov_info_clear_orphans(struct md_lov_info *mli,
                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
                 oa.o_valid |= OBD_MD_FLINLINE;
         }
-        rc = obd_create(mli->md_osc_exp, &oa, &empty_ea, &oti);
+        rc = obd_create(mli->md_lov_exp, &oa, &empty_ea, &oti);
 
         RETURN(rc);
 }
-EXPORT_SYMBOL(md_lov_info_clear_orphans);
 
 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
 {
-        return md_lov_info_clear_orphans(&mds->mds_lov_info, ost_uuid);
+        return md_lov_clear_orphans(&mds->mds_lov_info, ost_uuid);
 }
 
-int md_lov_info_set_nextid(struct md_lov_info *mli)
+/* update the LOV-OSC knowledge of the last used object id's */
+static int md_lov_info_set_nextid(struct obd_device *obd, 
+                                  struct md_lov_info *mli)
 {
         int rc;
         ENTRY;
-        
+
+        LASSERT(!obd->obd_recovering);
         LASSERT(mli->md_lov_objids != NULL);
 
-        rc = obd_set_info_async(mli->md_osc_exp, strlen(KEY_NEXT_ID),
+        rc = obd_set_info_async(mli->md_lov_exp, strlen(KEY_NEXT_ID),
                                 KEY_NEXT_ID,
                                 mli->md_lov_desc.ld_tgt_count,
                                 mli->md_lov_objids, NULL);
-        
 
         RETURN(rc);
-
 }
-EXPORT_SYMBOL(md_lov_info_set_nextid);
 
-/* update the LOV-OSC knowledge of the last used object id's */
 int mds_lov_set_nextid(struct obd_device *obd)
 {
-        struct mds_obd *mds = &obd->u.mds;
-        int rc;
-
-        LASSERT(!obd->obd_recovering);
-        
-        rc = md_lov_info_set_nextid(&mds->mds_lov_info);
-        if (rc) 
-                CERROR ("%s: mds_lov_set_nextid failed (%d)\n", 
-                        obd->obd_name, rc);
-        RETURN(rc);
+        struct md_lov_info *mli = &obd->u.mds.mds_lov_info;
+        return md_lov_info_set_nextid(obd, mli);
 }
 
 int md_lov_info_update_desc(struct md_lov_info *mli, struct obd_export *lov)
 {
         struct lov_desc *ld; 
-        __u32 size, valsize = sizeof(mli->md_lov_desc);
+        __u32 size, stripes, valsize = sizeof(mli->md_lov_desc);
         int rc = 0;
         ENTRY;
 
@@ -244,65 +239,50 @@ int md_lov_info_update_desc(struct md_lov_info *mli, struct obd_export *lov)
         mli->md_lov_desc = *ld;
         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
                mli->md_lov_desc.ld_tgt_count);
-out:
-        OBD_FREE(ld, sizeof(*ld));
-        RETURN(rc);
-}
-
-/* Update the lov desc for a new size lov. */
-static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
-{
-        struct mds_obd *mds = &obd->u.mds;
-        __u32 stripes;
-        int rc = 0;
-        ENTRY;
 
-        rc = md_lov_info_update_desc(&mds->mds_lov_info, lov);
-        if (rc)
-                GOTO(out, rc);
-        
         stripes = min((__u32)LOV_MAX_STRIPE_COUNT, 
-                      max(mds->mds_lov_desc.ld_tgt_count,
-                          mds->mds_lov_objids_in_file));
-        mds->mds_max_mdsize = lov_mds_md_size(stripes);
-        mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
+                      max(mli->md_lov_desc.ld_tgt_count,
+                          mli->md_lov_objids_in_file));
+
+        mli->md_lov_max_mdsize = lov_mds_md_size(stripes);
+        mli->md_lov_max_cookiesize = stripes * sizeof(struct llog_cookie);
         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n",
-               mds->mds_max_mdsize, mds->mds_max_cookiesize);
+               mli->md_lov_max_mdsize, mli->md_lov_max_cookiesize);
+
 out:
+        OBD_FREE(ld, sizeof(*ld));
         RETURN(rc);
 }
 
-
 #define MDSLOV_NO_INDEX -1
-
 /* Inform MDS about new/updated target */
-static int mds_lov_update_mds(struct obd_device *obd,   
+static int mds_lov_update_mds(struct obd_device *obd,
+                              struct md_lov_info *mli,
                               struct obd_device *watched, 
-                              __u32 idx)
+                              __u32 idx, const void  *ctxt)
 {
-        struct mds_obd *mds = &obd->u.mds;
         int old_count;
-        int rc = 0;
+        int rc;
         ENTRY;
 
-        old_count = mds->mds_lov_desc.ld_tgt_count;
-        rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+        old_count = mli->md_lov_desc.ld_tgt_count;
+        rc = md_lov_info_update_desc(mli, mli->md_lov_exp);
         if (rc)
                 RETURN(rc);
 
         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
-               idx, obd->obd_recovering, obd->obd_async_recov, old_count, 
-               mds->mds_lov_desc.ld_tgt_count);
+               idx, obd->obd_recovering, obd->obd_async_recov, old_count,
+               mli->md_lov_desc.ld_tgt_count);
 
         /* idx is set as data from lov_notify. */
         if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
-                if (idx >= mds->mds_lov_desc.ld_tgt_count) {
+                if (idx >= mli->md_lov_desc.ld_tgt_count) {
                         CERROR("index %d > count %d!\n", idx, 
-                               mds->mds_lov_desc.ld_tgt_count);
+                               mli->md_lov_desc.ld_tgt_count);
                         RETURN(-EINVAL);
                 }
                 
-                if (idx >= mds->mds_lov_objids_in_file) {
+                if (idx >= mli->md_lov_objids_in_file) {
                         /* We never read this lastid; ask the osc */
                         obd_id lastid;
                         __u32 size = sizeof(lastid);
@@ -311,113 +291,143 @@ static int mds_lov_update_mds(struct obd_device *obd,
                                           "last_id", &size, &lastid);
                         if (rc)
                                 RETURN(rc);
-                        mds->mds_lov_objids[idx] = lastid;
-                        mds->mds_lov_objids_dirty = 1;
-                        mds_lov_write_objids(obd);
+                        mli->md_lov_objids[idx] = lastid;
+                        mli->md_lov_objids_dirty = 1;
+                        mli->md_lov_ops->ml_write_objids(obd, mli, ctxt);
                 } else {
                         /* We have read this lastid from disk; tell the osc.
                            Don't call this during recovery. */ 
-                        rc = mds_lov_set_nextid(obd);
+                        rc = md_lov_info_set_nextid(obd, mli);
                 }
         
                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
-                      mds->mds_lov_objids[idx], idx);
+                       mli->md_lov_objids[idx], idx);
         }
-
+#if 0
+      /*FIXME: Do not support llog in mdd, so disable this temporarily*/
         /* If we added a target we have to reconnect the llogs */
         /* Only do this at first add (idx), or the first time after recovery */
         if (idx != MDSLOV_NO_INDEX || 1/*FIXME*/) {
                 CDEBUG(D_CONFIG, "reset llogs idx=%d\n", idx);
                 /* These two must be atomic */
-                down(&mds->mds_orphan_recovery_sem);
+                down(&mli->md_lov_orphan_recovery_sem);
                 obd_llog_finish(obd, old_count);
-                llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
-                up(&mds->mds_orphan_recovery_sem);
+                llog_cat_initialize(obd, mli->md_lov_desc.ld_tgt_count);
+                up(&mli->md_lov_orphan_recovery_sem);
         }
-
+#endif
         RETURN(rc);
 }
 
-/* update the LOV-OSC knowledge of the last used object id's */
-int mds_lov_connect(struct obd_device *obd, char * lov_name)
+int md_lov_connect(struct obd_device *obd, struct md_lov_info *mli,
+                   char *lov_name, struct obd_uuid *uuid, 
+                   struct md_lov_ops *mlo, const void *ctxt)
 {
-        struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0,};
         struct obd_connect_data *data;
-        int rc, i;
-        ENTRY;
+        int rc;
 
-        if (IS_ERR(mds->mds_osc_obd))
-                RETURN(PTR_ERR(mds->mds_osc_obd));
+        if (IS_ERR(mli->md_lov_obd))
+                RETURN(PTR_ERR(mli->md_lov_obd));
 
-        if (mds->mds_osc_obd)
+        if (mli->md_lov_obd)
                 RETURN(0);
 
-        mds->mds_osc_obd = class_name2obd(lov_name);
-        if (!mds->mds_osc_obd) {
+        mli->md_lov_obd = class_name2obd(lov_name);
+        if (!mli->md_lov_obd) {
                 CERROR("MDS cannot locate LOV %s\n", lov_name);
-                mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
+                mli->md_lov_obd = ERR_PTR(-ENOTCONN);
                 RETURN(-ENOTCONN);
         }
 
+        mli->md_lov_ops = mlo; 
+
         OBD_ALLOC(data, sizeof(*data));
         if (data == NULL)
                 RETURN(-ENOMEM);
         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
                                   OBD_CONNECT_REQPORTAL;
         data->ocd_version = LUSTRE_VERSION_CODE;
+        
         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
-        rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data);
+        rc = obd_connect(&conn, mli->md_lov_obd, uuid, data);
         OBD_FREE(data, sizeof(*data));
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
-                mds->mds_osc_obd = ERR_PTR(rc);
-                RETURN(rc);
+                GOTO(out, rc);
         }
-        mds->mds_osc_exp = class_conn2export(&conn);
+        mli->md_lov_exp = class_conn2export(&conn);
 
-        rc = obd_register_observer(mds->mds_osc_obd, obd);
+        rc = obd_register_observer(mli->md_lov_obd, obd);
         if (rc) {
                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
                        lov_name, rc);
-                GOTO(err_discon, rc);
+                GOTO(out, rc);
         }
-
-        rc = mds_lov_read_objids(obd);
+        
+        rc = mli->md_lov_ops->ml_read_objids(obd, mli, ctxt);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
-                GOTO(err_reg, rc);
+                GOTO(out, rc);
         }
 
-        rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+        rc = md_lov_info_update_desc(mli, mli->md_lov_exp);
         if (rc)
-                GOTO(err_reg, rc);
+                GOTO(out, rc);
+out:
+        RETURN(rc);
+}
+EXPORT_SYMBOL(md_lov_connect);
 
-        /* tgt_count may be 0! */
-        rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
-        if (rc) {
-                CERROR("failed to initialize catalog %d\n", rc);
-                GOTO(err_reg, rc);
-        }
+int md_lov_update_objids(struct obd_device *obd, struct md_lov_info *mli, 
+                         const void *ctxt)
+{
+        int rc = 0, i;
 
         /* If we're mounting this code for the first time on an existing FS,
          * we need to populate the objids array from the real OST values */
-        if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
-                int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
-                rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
-                                  "last_id", &size, mds->mds_lov_objids);
+        if (mli->md_lov_desc.ld_tgt_count > mli->md_lov_objids_in_file) {
+                int size = sizeof(obd_id) * mli->md_lov_desc.ld_tgt_count;
+                rc = obd_get_info(mli->md_lov_exp, strlen("last_id"),
+                                  "last_id", &size, mli->md_lov_objids);
                 if (!rc) {
-                        for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
+                        for (i = 0; i < mli->md_lov_desc.ld_tgt_count; i++)
                                 CWARN("got last object "LPU64" from OST %d\n",
-                                      mds->mds_lov_objids[i], i);
-                        mds->mds_lov_objids_dirty = 1;
-                        rc = mds_lov_write_objids(obd);
+                                      mli->md_lov_objids[i], i);
+                        mli->md_lov_objids_dirty = 1;
+                        rc = mli->md_lov_ops->ml_write_objids(obd, mli, ctxt);
                         if (rc)
                                 CERROR("got last objids from OSTs, but error "
                                        "writing objids file: %d\n", rc);
                 }
         }
+        return rc;
+}
+
+/* update the LOV-OSC knowledge of the last used object id's */
+int mds_lov_connect(struct obd_device *obd, char * lov_name)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct md_lov_info *mli = &mds->mds_lov_info;
+        int rc;
+        ENTRY;
+
+        rc = md_lov_connect(obd, mli, lov_name, &obd->obd_uuid, &mli_ops, 
+                            NULL);
+        if (rc)
+                GOTO(err_reg, rc);
 
+        /* tgt_count may be 0! */
+        rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
+        if (rc) {
+                CERROR("failed to initialize catalog %d\n", rc);
+                GOTO(err_reg, rc);
+        }
+
+        /* If we're mounting this code for the first time on an existing FS,
+         * we need to populate the objids array from the real OST values */
+        rc = md_lov_update_objids(obd, mli, NULL);
+        
         /* I want to see a callback happen when the OBD moves to a
          * "For General Use" state, and that's when we'll call
          * set_nextid().  The class driver can help us here, because
@@ -429,10 +439,11 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
 
 err_reg:
         obd_register_observer(mds->mds_osc_obd, NULL);
-err_discon:
-        obd_disconnect(mds->mds_osc_exp);
-        mds->mds_osc_exp = NULL;
-        mds->mds_osc_obd = ERR_PTR(rc);
+        if (mli->md_lov_exp) {
+                obd_disconnect(mli->md_lov_exp);
+                mli->md_lov_exp = NULL;
+        }
+        mli->md_lov_obd = ERR_PTR(rc);
         RETURN(rc);
 }
 
@@ -654,9 +665,11 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 }
 
 struct mds_lov_sync_info {
-        struct obd_device *mlsi_obd;     /* the lov device to sync */
-        struct obd_device *mlsi_watched; /* target osc */
-        __u32              mlsi_index;   /* index of target */
+        struct obd_device  *mlsi_obd;     /* the lov device to sync */
+        struct md_lov_info *mlsi_mli; 
+        struct obd_device  *mlsi_watched; /* target osc */
+        __u32               mlsi_index;   /* index of target */
+        const void         *mlsi_ctxt;
 };
 
 
@@ -670,7 +683,8 @@ static int __mds_lov_synchronize(void *data)
         struct mds_lov_sync_info *mlsi = data;
         struct obd_device *obd = mlsi->mlsi_obd;
         struct obd_device *watched = mlsi->mlsi_watched;
-        struct mds_obd *mds = &obd->u.mds;
+        struct md_lov_info *mli = mlsi->mlsi_mli;
+        const void *ctxt = mlsi->mlsi_ctxt;
         struct obd_uuid *uuid;
         __u32  idx = mlsi->mlsi_index;
         int rc = 0;
@@ -683,15 +697,16 @@ static int __mds_lov_synchronize(void *data)
         uuid = &watched->u.cli.cl_target_uuid;
         LASSERT(uuid);
 
-        rc = mds_lov_update_mds(obd, watched, idx);
+        rc = mds_lov_update_mds(obd, mli, watched, idx, ctxt);
         if (rc != 0)
                 GOTO(out, rc);
         
-        rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
+        rc = obd_set_info_async(mli->md_lov_exp, strlen(KEY_MDS_CONN),
                                 KEY_MDS_CONN, 0, uuid, NULL);
         if (rc != 0)
                 GOTO(out, rc);
-
+#if 0
+        /*disable for not support llog in mdd*/
         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
                           mds->mds_lov_desc.ld_tgt_count,
                           NULL, NULL, uuid);
@@ -701,16 +716,16 @@ static int __mds_lov_synchronize(void *data)
                        obd->obd_name, rc);
                 GOTO(out, rc);
         }
-
+#endif
         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
               obd->obd_name, obd_uuid2str(uuid));
 
         if (obd->obd_stopping)
                 GOTO(out, rc = -ENODEV);
 
-        rc = mds_lov_clear_orphans(mds, uuid);
+        rc = md_lov_clear_orphans(mli, uuid);
         if (rc != 0) {
-                CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
+                CERROR("%s: failed at md_lov_clear_orphans: %d\n",
                        obd->obd_name, rc);
                 GOTO(out, rc);
         }
@@ -731,9 +746,9 @@ int mds_lov_synchronize(void *data)
         RETURN(__mds_lov_synchronize(data));
 }
 
-int mds_lov_start_synchronize(struct obd_device *obd, 
-                              struct obd_device *watched,
-                              void *data, int nonblock)
+int md_lov_start_synchronize(struct obd_device *obd, struct md_lov_info *mli,
+                             struct obd_device *watched,
+                             void *data, int nonblock, const void *ctxt)
 {
         struct mds_lov_sync_info *mlsi;
         int rc;
@@ -748,6 +763,8 @@ int mds_lov_start_synchronize(struct obd_device *obd,
 
         mlsi->mlsi_obd = obd;
         mlsi->mlsi_watched = watched;
+        mlsi->mlsi_mli = mli;
+        mlsi->mlsi_ctxt = ctxt;
         if (data) 
                 mlsi->mlsi_index = *(__u32 *)data;
         else
@@ -762,7 +779,7 @@ int mds_lov_start_synchronize(struct obd_device *obd,
            disconnect the LOV.  This of course means a cleanup won't
            finish for as long as the sync is blocking. */
         class_incref(obd);
-
+#if 0
         if (nonblock) {
                 /* Synchronize in the background */
                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
@@ -780,15 +797,26 @@ int mds_lov_start_synchronize(struct obd_device *obd,
         } else {
                 rc = __mds_lov_synchronize((void *)mlsi);
         }
-
+#else
+                rc = __mds_lov_synchronize((void *)mlsi);
+#endif
         RETURN(rc);
 }
+EXPORT_SYMBOL(md_lov_start_synchronize);
 
-int mds_notify(struct obd_device *obd, struct obd_device *watched,
-               enum obd_notify_event ev, void *data)
+int mds_lov_start_synchronize(struct obd_device *obd, 
+                              struct obd_device *watched,
+                              void *data, int nonblock)
+{
+        return md_lov_start_synchronize(obd, &obd->u.mds.mds_lov_info,
+                                        watched, data, nonblock, NULL);
+}
+
+int md_lov_notity_pre(struct obd_device *obd, struct md_lov_info *mli,
+                      struct obd_device *watched, enum obd_notify_event ev,
+                      void *data)
 {
         int rc = 0;
-        ENTRY;
 
         switch (ev) {
         /* We only handle these: */
@@ -797,7 +825,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
         case OBD_NOTIFY_SYNC_NONBLOCK:
                 break;
         default:
-                RETURN(0);
+                RETURN(-ENOENT);
         }
 
         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
@@ -815,16 +843,33 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
                 /* We still have to fix the lov descriptor for ost's added 
                    after the mdt in the config log.  They didn't make it into
                    mds_lov_connect. */
-                rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
+                rc = md_lov_info_update_desc(mli, mli->md_lov_exp);
+                RETURN(-EBUSY);
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(md_lov_notity_pre);
+
+int mds_notify(struct obd_device *obd, struct obd_device *watched,
+               enum obd_notify_event ev, void *data)
+{
+        int rc = 0;
+        struct md_lov_info *mli = &obd->u.mds.mds_lov_info;
+        ENTRY;
+
+        rc = md_lov_notity_pre(obd, mli, watched, ev, data);
+        if (rc) {
+                if (rc == -ENOENT || rc == -EBUSY)
+                        rc = 0;
                 RETURN(rc);
         }
 
         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
         rc = mds_lov_start_synchronize(obd, watched, data, 
                                        !(ev == OBD_NOTIFY_SYNC));
-        
+
         lquota_recovery(quota_interface, obd);
-                
+
         RETURN(rc);
 }
 
index 43e1ba3..02ad8b7 100644 (file)
@@ -186,7 +186,7 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                      "wrote trans #"LPU64" rc %d client %s at idx %u: err = %d",
                      transno, rc, mcd->mcd_uuid, med->med_lr_idx, err);
 
-        err = mds_lov_write_objids(obd);
+        err = mds_lov_write_objids(obd, &mds->mds_lov_info, NULL);
         if (err) {
                 log_pri = D_ERROR;
                 if (rc == 0)