Whamcloud - gitweb
- use OSD to maintain lovobjids files
authoralex <alex>
Tue, 29 Sep 2009 12:34:07 +0000 (12:34 +0000)
committeralex <alex>
Tue, 29 Sep 2009 12:34:07 +0000 (12:34 +0000)
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_trans.c
lustre/mds/handler.c
lustre/mds/mds_lov.c

index d692736..ea3ba0b 100644 (file)
@@ -103,6 +103,7 @@ int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd,
         struct lustre_cfg_bufs *bufs;
         struct lustre_cfg      *lcfg;
         struct obd_device      *obd;
+        struct mds_obd         *mds;
         ENTRY;
 
         mds_id = lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id;
@@ -145,6 +146,8 @@ int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd,
                 LBUG();
         }
 
+        mds = &obd->u.mds;
+        mds->mds_next_dev = mdd->mdd_child;
         obd->obd_recovering = 1;
         obd->u.mds.mds_id = mds_id;
         rc = class_setup(obd, lcfg);
index 91db093..f94d0de 100644 (file)
@@ -94,7 +94,7 @@ int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
         struct obd_device *obd = mdd2obd_dev(mdd);
 
         LASSERT(obd);
-        return mds_lov_write_objids(obd);
+        return mds_lov_write_objids(env, obd, txn);
 }
 
 int mdd_txn_commit_cb(const struct lu_env *env, struct thandle *txn,
index 228af5d..7beb9f7 100644 (file)
@@ -177,24 +177,17 @@ static int mds_lov_clean(struct obd_device *obd)
         RETURN(0);
 }
 
-static int mds_postsetup(struct obd_device *obd)
+int mds_lov_init(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct llog_ctxt *ctxt;
-        int rc = 0;
+        int rc;
         ENTRY;
 
-        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
-                        &llog_lvfs_ops);
-        if (rc)
-                RETURN(rc);
-
-        rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
-                        &llog_lvfs_ops);
-        if (rc)
-                GOTO(err_llog, rc);
-
-        mds_changelog_llog_init(obd, obd);
+        rc = mds_lov_init_objids(obd);
+        if (rc != 0) {
+               CERROR("cannot init lov objid rc = %d\n", rc);
+               GOTO(out, rc );
+        }
 
         if (mds->mds_profile) {
                 struct lustre_profile *lprof;
@@ -206,16 +199,38 @@ static int mds_postsetup(struct obd_device *obd)
                 lprof = class_get_profile(mds->mds_profile);
                 if (lprof == NULL) {
                         CERROR("No profile found: %s\n", mds->mds_profile);
-                        GOTO(err_cleanup, rc = -ENOENT);
+                        GOTO(out, rc = -ENOENT);
                 }
                 rc = mds_lov_connect(obd, lprof->lp_dt);
                 if (rc)
-                        GOTO(err_cleanup, rc);
+                        GOTO(out, rc);
         }
 
+out:
+        RETURN(rc);
+}
+EXPORT_SYMBOL(mds_lov_init);
+
+static int mds_postsetup(struct obd_device *obd)
+{
+        struct llog_ctxt *ctxt;
+        int rc = 0;
+        ENTRY;
+
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+                        &llog_lvfs_ops);
+        if (rc)
+                RETURN(rc);
+
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+                        &llog_lvfs_ops);
+        if (rc)
+                GOTO(err_llog, rc);
+
+        mds_changelog_llog_init(obd, obd);
+
         RETURN(rc);
 
-err_cleanup:
         mds_lov_clean(obd);
         ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
         if (ctxt)
@@ -362,7 +377,7 @@ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         lmi->lmi_dt->dd_ops->dt_conf_get(NULL, lmi->lmi_dt, &dt_param);
         mnt = dt_param.ddp_mnt;
         if (mnt == NULL) {
-                CERROR("non-ldiskfs underlying filesystem\n");
+                //CERROR("non-ldiskfs underlying filesystem\n");
                 goto new_diskfs;
         }
 
@@ -394,12 +409,6 @@ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         }
 
 new_diskfs:
-        rc = mds_lov_init_objids(obd);
-        if (rc != 0) {
-               CERROR("cannot init lov objid rc = %d\n", rc);
-               GOTO(err_fid, rc );
-        }
-
         rc = mds_lov_presetup(mds, lcfg);
         if (rc < 0)
                 GOTO(err_objects, rc);
index 7eb3de3..2c92b21 100644 (file)
@@ -52,6 +52,7 @@
 #include <obd_cksum.h>
 
 #include "mds_internal.h"
+#include <lustre_fid.h>
 
 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
 {
@@ -94,7 +95,10 @@ int mds_lov_init_objids(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
-        struct file *file;
+        struct dt_object *o;
+        struct dt_device *d;
+        struct lu_env env;
+        struct lu_fid fid;
         int rc;
         ENTRY;
 
@@ -104,11 +108,57 @@ int mds_lov_init_objids(struct obd_device *obd)
         if (mds->mds_lov_page_dirty == NULL)
                 RETURN(-ENOMEM);
 
-
         OBD_ALLOC(mds->mds_lov_page_array, size);
         if (mds->mds_lov_page_array == NULL)
                 GOTO(err_free_bitmap, rc = -ENOMEM);
 
+        d = mds->mds_next_dev;
+        LASSERT(d);
+
+        rc = lu_env_init(&env, d->dd_lu_dev.ld_type->ldt_ctx_tags);
+        if (rc) {
+                CERROR("can't initialize env: %d\n", rc);
+                GOTO(err_free, rc);
+        }
+
+        lu_local_obj_fid(&fid, MDD_LOV_OBJ_OID);
+        o = dt_locate(&env, d, &fid);
+        if (IS_ERR(o))
+                GOTO(err_free, rc = PTR_ERR(o));
+
+        if (!dt_object_exists(o)) {
+                struct dt_object_format  dof;
+                struct lu_attr           attr;
+                struct thandle          *th;
+
+                memset(&attr, 0, sizeof(attr));
+                attr.la_valid = LA_MODE;
+                attr.la_mode = S_IFREG | 0666;
+                dof.dof_type = dt_mode_to_dft(S_IFREG);
+
+                th = dt_trans_create(&env, d);
+                LASSERT(!IS_ERR(th));
+
+                rc = dt_declare_create(&env, o, &attr, NULL, &dof, th);
+                LASSERT(rc == 0);
+
+                rc = dt_trans_start(&env, d, th);
+                LASSERT(rc == 0);
+
+                dt_write_lock(&env, o, 0);
+                LASSERT(!dt_object_exists(o));
+
+                rc = dt_create(&env, o, &attr, NULL, &dof, th);
+                LASSERT(rc == 0);
+
+                dt_write_unlock(&env, o);
+
+                dt_trans_stop(&env, d, th);
+        }
+
+        mds->mds_lov_objid_dt = o;
+
+#if 0
         /* open and test the lov objd file */
         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
         if (IS_ERR(file)) {
@@ -122,16 +172,22 @@ int mds_lov_init_objids(struct obd_device *obd)
                 GOTO(err_open, rc = -ENOENT);
         }
         mds->mds_lov_objid_filp = file;
+#endif
 
+        lu_env_fini(&env);
         RETURN (0);
+#if 0
 err_open:
         if (filp_close((struct file *)file, 0))
                 CERROR("can't close %s after error\n", LOV_OBJID);
+#endif
 err_free:
         OBD_FREE(mds->mds_lov_page_array, size);
 err_free_bitmap:
         FREE_BITMAP(mds->mds_lov_page_dirty);
 
+        lu_env_fini(&env);
+
         RETURN(rc);
 }
 
@@ -151,6 +207,24 @@ void mds_lov_destroy_objids(struct obd_device *obd)
                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
         }
 
+        if (mds->mds_lov_objid_dt) {
+                struct lu_env env;
+                struct dt_device *dt;
+
+                dt = mds->mds_next_dev;
+                LASSERT(dt);
+
+                rc = lu_env_init(&env, dt->dd_lu_dev.ld_type->ldt_ctx_tags);
+                if (rc) {
+                        CERROR("can't initialize env: %d\n", rc);
+                        GOTO(out, rc);
+                }
+
+                lu_object_put(&env, &mds->mds_lov_objid_dt->do_lu);
+
+                lu_env_fini(&env);
+        }
+
         if (mds->mds_lov_objid_filp) {
                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
                 mds->mds_lov_objid_filp = NULL;
@@ -158,6 +232,7 @@ void mds_lov_destroy_objids(struct obd_device *obd)
                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
         }
 
+out:
         FREE_BITMAP(mds->mds_lov_page_dirty);
         EXIT;
 }
@@ -382,23 +457,43 @@ static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
 static int mds_lov_read_objids(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
+        struct lu_env env;
+        struct dt_device *dt;
+        struct dt_object *o;
+        struct lu_attr    attr;
         loff_t off = 0;
         int i, rc = 0, count = 0, page = 0;
         unsigned long size;
         ENTRY;
 
+        dt = mds->mds_next_dev;
+        LASSERT(dt);
+        o = mds->mds_lov_objid_dt;
+        LASSERT(o);
+
+        rc = lu_env_init(&env, dt->dd_lu_dev.ld_type->ldt_ctx_tags);
+        if (rc) {
+                CERROR("can't initialize env: %d\n", rc);
+                RETURN(rc);
+        }
+
         /* Read everything in the file, even if our current lov desc
            has fewer targets. Old targets not in the lov descriptor
            during mds setup may still have valid objids. */
-        size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
+        rc = dt_attr_get(&env, o, &attr, NULL);
+        LASSERT(rc == 0);
+        LASSERT(attr.la_valid & LA_SIZE);
+        size = attr.la_size;
         if (size == 0)
-                RETURN(0);
+                GOTO(out2, rc = 0);
 
-        page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
+        page = (size + (OBJID_PER_PAGE() * sizeof(obd_id) - sizeof(obd_id)) /
+                (OBJID_PER_PAGE() * sizeof(obd_id)));
         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
         for (i = 0; i < page; i++) {
                 obd_id *data;
                 loff_t off_old = off;
+                struct lu_buf lb;
 
                 LASSERT(mds->mds_lov_page_array[i] == NULL);
                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
@@ -407,8 +502,12 @@ static int mds_lov_read_objids(struct obd_device *obd)
 
                 data = mds->mds_lov_page_array[i];
 
-                rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
-                                        OBJID_PER_PAGE()*sizeof(obd_id), &off);
+                lb.lb_vmalloc = 0;
+                lb.lb_buf = data;
+                lb.lb_len = OBJID_PER_PAGE()*sizeof(obd_id);
+                if (off + lb.lb_len > size)
+                        lb.lb_len = size - off;
+                rc = dt_record_read(&env, o, &lb, &off);
                 if (rc < 0) {
                         CERROR("Error reading objids %d\n", rc);
                         GOTO(out, rc);
@@ -430,16 +529,29 @@ static int mds_lov_read_objids(struct obd_device *obd)
                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
 out:
         mds_lov_dump_objids("read",obd);
+out2:
+        lu_env_fini(&env);
 
         RETURN(rc);
 }
 
-int mds_lov_write_objids(struct obd_device *obd)
+int mds_lov_write_objids(const struct lu_env *env,
+                         struct obd_device *obd,
+                         struct thandle *th)
 {
         struct mds_obd *mds = &obd->u.mds;
+        struct dt_object *o;
+        struct lu_buf lb;
         int i = 0, rc = 0;
         ENTRY;
 
+        if (mds->mds_next_dev == NULL)
+                RETURN(0);
+
+        o = mds->mds_lov_objid_dt;
+        if (o == NULL)
+                RETURN(0);
+
         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
                 RETURN(0);
 
@@ -459,16 +571,13 @@ int mds_lov_write_objids(struct obd_device *obd)
                 if (i == mds->mds_lov_objid_lastpage)
                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
 
-                CDEBUG(D_INFO, "write %lld - %u\n", off, size);
-                if (obd->obd_fsops) {
-                        rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
-                                         size, &off, 0);
-                        if (rc < 0) {
-                                cfs_bitmap_set(mds->mds_lov_page_dirty, i);
-                                break;
-                        }
-                } else {
-                        CERROR("not implemented yet\n");
+                lb.lb_vmalloc = 0;
+                lb.lb_buf = data;
+                lb.lb_len = size;
+                rc = dt_record_write(env, o, &lb, &off, th);
+                if (rc < 0) {
+                        cfs_bitmap_set(mds->mds_lov_page_dirty, i);
+                        break;
                 }
         }
         if (rc >= 0)