Whamcloud - gitweb
Fixes and cleanups in lmv.
[fs/lustre-release.git] / lustre / mds / mds_lov.c
index c6b6839..64e7041 100644 (file)
@@ -30,6 +30,7 @@
 
 #include <linux/module.h>
 #include <linux/lustre_mds.h>
+#include <linux/obd_ost.h>
 #include <linux/lustre_idl.h>
 #include <linux/obd_class.h>
 #include <linux/obd_lov.h>
@@ -117,7 +118,7 @@ int mds_lov_write_objids(struct obd_device *obd)
         RETURN(rc);
 }
 
-static int mds_lov_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
+int mds_lov_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
 {
         int rc;
         struct obdo oa;
@@ -131,7 +132,8 @@ static int mds_lov_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
          * missing objects below this ID, they will be created.  If it finds
          * objects above this ID, they will be removed. */
         memset(&oa, 0, sizeof(oa));
-        oa.o_valid = OBD_MD_FLFLAGS;
+        oa.o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+        oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
         oa.o_flags = OBD_FL_DELORPHAN;
         if (ost_uuid != NULL) {
                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
@@ -155,12 +157,6 @@ int mds_lov_set_nextid(struct obd_device *obd)
 
         rc = obd_set_info(mds->mds_osc_exp, strlen("next_id"), "next_id",
                           mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids);
-        if (rc < 0)
-                GOTO(out, rc);
-
-        rc = mds_lov_clearorphans(mds, NULL /* all OSTs */);
-
-out:
         RETURN(rc);
 }
 
@@ -180,8 +176,9 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0,};
-        int valsize;
-        int rc, i;
+        char name[32] = "CATLIST";
+        int valsize, rc, i;
+        __u32 group;
         ENTRY;
 
         if (IS_ERR(mds->mds_osc_obd))
@@ -220,7 +217,9 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         if (rc)
                 GOTO(err_reg, rc);
 
-        mds->mds_max_mdsize = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
+        i = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
+        if (i > mds->mds_max_mdsize)
+                mds->mds_max_mdsize = i;
         mds->mds_max_cookiesize = mds->mds_lov_desc.ld_tgt_count*
                 sizeof(struct llog_cookie);
         mds->mds_has_lov_desc = 1;
@@ -230,15 +229,18 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 GOTO(err_reg, rc);
         }
 
-        rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
+        rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 
+                                     mds->mds_lov_desc.ld_tgt_count, name);
         if (rc) {
                 CERROR("failed to initialize catalog %d\n", rc);
                 GOTO(err_reg, rc);
         }
 
         /* FIXME before set info call is made, we must initialize logging */
+        group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+        valsize = sizeof(group);
         rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
-                          0, NULL);
+                          valsize, &group);
         if (rc)
                 GOTO(err_reg, rc);
 
@@ -265,28 +267,10 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
          * set_nextid().  The class driver can help us here, because
          * it can use the obd_recovering flag to determine when the
          * the OBD is full available. */
-        if (!obd->obd_recovering) {
-                rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
-                                  obd->u.mds.mds_lov_desc.ld_tgt_count, NULL,
-                                  NULL, NULL);
-                if (rc != 0)
-                        CERROR("faild at llog_origin_connect: %d\n", rc);
-
-                rc = mds_cleanup_orphans(obd);
-                if (rc > 0)
-                        CERROR("Cleanup %d orphans while MDS isn't recovering\n", rc);
-
-                rc = mds_lov_set_nextid(obd);
-                if (rc)
-                        GOTO(err_llog, rc);
-        }
+        if (!obd->obd_recovering)
+                rc = mds_postrecov(obd);
         RETURN(rc);
 
-err_llog:
-        /* cleanup all llogging subsystems */
-        rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
-        if (rc)
-                CERROR("failed to cleanup llogging subsystems\n");
 err_reg:
         obd_register_observer(mds->mds_osc_obd, NULL);
 err_discon:
@@ -304,7 +288,8 @@ int mds_lov_disconnect(struct obd_device *obd, int flags)
 
         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
                 /* cleanup all llogging subsystems */
-                rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
+                rc = obd_llog_finish(obd, &obd->obd_llogs,
+                                     mds->mds_lov_desc.ld_tgt_count);
                 if (rc)
                         CERROR("failed to cleanup llogging subsystems\n");
 
@@ -330,7 +315,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         struct obd_device *obd = exp->exp_obd;
         struct mds_obd *mds = &obd->u.mds;
         struct obd_ioctl_data *data = karg;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         int rc = 0;
 
         switch (cmd) {
@@ -339,15 +324,16 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 if (mds->mds_cfg_llh)
                         RETURN(-EBUSY);
 
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
-                rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                rc = llog_create(llog_get_context(&obd->obd_llogs, 
+                                 LLOG_CONFIG_ORIG_CTXT),
                                  &mds->mds_cfg_llh, NULL, name);
                 if (rc == 0)
                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          &cfg_uuid);
                 else
                         mds->mds_cfg_llh = NULL;
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
                 RETURN(rc);
         }
@@ -356,9 +342,9 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 if (!mds->mds_cfg_llh)
                         RETURN(-EBADF);
 
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = llog_close(mds->mds_cfg_llh);
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
                 mds->mds_cfg_llh = NULL;
                 RETURN(rc);
@@ -369,18 +355,19 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 if (mds->mds_cfg_llh)
                         RETURN(-EBUSY);
 
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
-                rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT), 
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                rc = llog_create(llog_get_context(&obd->obd_llogs, 
+                                 LLOG_CONFIG_ORIG_CTXT),
                                  &mds->mds_cfg_llh, NULL, name);
                 if (rc == 0) {
                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          NULL);
-
+                                                                                                                             
                         rc = llog_destroy(mds->mds_cfg_llh);
                         llog_free_handle(mds->mds_cfg_llh);
                 }
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
-
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                                                                                                                             
                 mds->mds_cfg_llh = NULL;
                 RETURN(rc);
         }
@@ -411,10 +398,10 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         RETURN(rc);
                 }
 
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
                                     cfg_buf, -1);
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
                 OBD_FREE(cfg_buf, data->ioc_plen1);
                 RETURN(rc);
@@ -422,10 +409,10 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         case OBD_IOC_PARSE: {
                 struct llog_ctxt *ctxt =
-                        llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
+                        llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT);
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 if (rc)
                         RETURN(rc);
 
@@ -434,10 +421,10 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         case OBD_IOC_DUMP_LOG: {
                 struct llog_ctxt *ctxt =
-                        llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
+                        llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT);
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 if (rc)
                         RETURN(rc);
 
@@ -453,7 +440,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
                 LASSERT(handle);
-                rc = fsfilt_commit(obd, inode, handle, 1);
+                rc = fsfilt_commit(obd, obd->u.mds.mds_sb, inode, handle, 1);
 
                 dev_set_rdonly(ll_sbdev(obd->u.mds.mds_sb), 2);
                 RETURN(0);
@@ -461,7 +448,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         case OBD_IOC_CATLOGLIST: {
                 int count = mds->mds_lov_desc.ld_tgt_count;
-                rc = llog_catlog_list(obd, count, data);
+                rc = llog_catalog_list(obd, count, data);
                 RETURN(rc);
 
         }
@@ -469,16 +456,20 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_LLOG_CANCEL:
         case OBD_IOC_LLOG_REMOVE: {
                 struct llog_ctxt *ctxt =
-                        llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+                        llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT);
+                char name[32] = "CATLIST";
                 int rc2;
 
-                obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
-                push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+                obd_llog_finish(obd, &obd->obd_llogs,
+                                mds->mds_lov_desc.ld_tgt_count);
+                push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
-                pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-                llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
-                rc2 = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
-                          0, NULL);
+                pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
+                obd_llog_cat_initialize(obd, &obd->obd_llogs, 
+                                        mds->mds_lov_desc.ld_tgt_count,
+                                        name);
+                rc2 = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"),
+                                   "mds_conn", 0, NULL);
                 if (!rc)
                         rc = rc2;
                 RETURN(rc);
@@ -486,11 +477,11 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_LLOG_INFO:
         case OBD_IOC_LLOG_PRINT: {
                 struct llog_ctxt *ctxt =
-                        llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+                        llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT);
 
-                push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+                push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
-                pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
 
                 RETURN(rc);
         }
@@ -504,6 +495,92 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 RETURN(-EINVAL);
         }
         RETURN(0);
+
+}
+
+struct mds_lov_sync_info {
+        struct obd_device *mlsi_obd; /* the lov device to sync */
+        struct obd_uuid   *mlsi_uuid;  /* target to sync */
+};
+
+int mds_lov_synchronize(void *data)
+{
+        struct mds_lov_sync_info *mlsi = data;
+        struct llog_ctxt *ctxt;
+        struct obd_device *obd;
+        struct obd_uuid *uuid;
+        unsigned long flags;
+        int rc;
+
+        lock_kernel();
+        ptlrpc_daemonize();
+
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+
+        obd = mlsi->mlsi_obd;
+        uuid = mlsi->mlsi_uuid;
+
+        OBD_FREE(mlsi, sizeof(*mlsi));
+
+        LASSERT(obd != NULL);
+        LASSERT(uuid != NULL);
+
+        rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), 
+                          "mds_conn", 0, uuid);
+        if (rc != 0)
+                RETURN(rc);
+        
+        ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
+        LASSERT(ctxt != NULL);
+
+        rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
+                          NULL, NULL, uuid);
+        if (rc != 0) {
+                CERROR("%s: failed at llog_origin_connect: %d\n", 
+                       obd->obd_name, rc);
+                RETURN(rc);
+        }
+        
+        CWARN("MDS %s: %s now active, resetting orphans\n",
+              obd->obd_name, uuid->uuid);
+        rc = mds_lov_clearorphans(&obd->u.mds, uuid);
+        if (rc != 0) {
+                CERROR("%s: failed at mds_lov_clearorphans: %d\n", 
+                       obd->obd_name, rc);
+                RETURN(rc);
+        }
+
+        RETURN(0);
+}
+
+int mds_lov_start_synchronize(struct obd_device *obd, struct obd_uuid *uuid)
+{
+        struct mds_lov_sync_info *mlsi;
+        int rc;
+        
+        ENTRY;
+
+        OBD_ALLOC(mlsi, sizeof(*mlsi));
+        if (mlsi == NULL)
+                RETURN(-ENOMEM);
+
+        mlsi->mlsi_obd = obd;
+        mlsi->mlsi_uuid = uuid;
+
+        rc = kernel_thread(mds_lov_synchronize, mlsi, CLONE_VM | CLONE_FILES);
+        if (rc < 0)
+                CERROR("%s: error starting mds_lov_synchronize: %d\n", 
+                       obd->obd_name, rc);
+        else {
+                CDEBUG(D_HA, "%s: mds_lov_synchronize thread: %d\n", 
+                       obd->obd_name, rc);
+                rc = 0;
+        }
+
+        RETURN(rc);
 }
 
 int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
@@ -515,7 +592,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
         if (!active)
                 RETURN(0);
 
-        if (strcmp(watched->obd_type->typ_name, "osc")) {
+        if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
                 CERROR("unexpected notification of %s %s!\n",
                        watched->obd_type->typ_name, watched->obd_name);
                 RETURN(-EINVAL);
@@ -526,24 +603,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
                       obd->obd_name, uuid->uuid);
         } else {
-                LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
-
-                rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), "mds_conn",
-                                  0, uuid);
-                if (rc != 0)
-                        RETURN(rc);
-
-                rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
-                                  obd->u.mds.mds_lov_desc.ld_tgt_count,
-                                  NULL, NULL, uuid);
-                if (rc != 0) {
-                        CERROR("faild at llog_origin_connect: %d\n", rc);
-                        RETURN(rc);
-                }
-
-                CWARN("MDS %s: %s now active, resetting orphans\n",
-                      obd->obd_name, uuid->uuid);
-                rc = mds_lov_clearorphans(&obd->u.mds, uuid);
+                rc = mds_lov_start_synchronize(obd, uuid);
         }
         RETURN(rc);
 }
@@ -590,7 +650,7 @@ int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
 
         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size);
 
-        err = fsfilt_commit(obd, inode, handle, 0);
+        err = fsfilt_commit(obd, obd->u.mds.mds_sb, inode, handle, 0);
         if (!rc)
                 rc = err ? err : lmm_size;
         GOTO(conv_free, rc);