Whamcloud - gitweb
Branch: HEAD
authorwangdi <wangdi>
Thu, 3 Jul 2008 20:11:49 +0000 (20:11 +0000)
committerwangdi <wangdi>
Thu, 3 Jul 2008 20:11:49 +0000 (20:11 +0000)
1.Filter group llog cleanup (patch from shadow).
2.Separate filter group llog create and find. then filter group llog will
  only be created in the mds/ost syncing process.
3.Some minor fixes.
b=14629
i=robert,yury

17 files changed:
lustre/ChangeLog
lustre/include/lustre_log.h
lustre/include/obd.h
lustre/lmv/lmv_obd.c
lustre/lov/lov_internal.h
lustre/lov/lov_log.c
lustre/mdc/mdc_request.c
lustre/mds/mds_internal.h
lustre/mds/mds_log.c
lustre/mgc/libmgc.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_handler.c
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_test.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_log.c
lustre/osc/osc_request.c

index bb8cb73..2f2bda4 100644 (file)
@@ -1143,6 +1143,12 @@ Details    : The direct IO path doesn't call check_rpcs to submit a new RPC once
             one is completed. As a result, some RPCs are stuck in the queue
             and are never sent.
 
+Severity   : normal
+Bugzilla   : 14629 
+Description: filter threads hungs on waiting journal commit  
+Details    : Cleanup filter group llog code, then only filter group llog will
+            be only created in the MDS/OST syncing process.
+
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
index 31bd543..5cd9d62 100644 (file)
@@ -130,7 +130,7 @@ int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
 int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
                 int count, struct llog_cookie *cookies, int flags);
 
-int llog_obd_origin_setup(struct obd_device *obd, struct obd_llog_group *olg, 
+int llog_obd_origin_setup(struct obd_device *obd, struct obd_llog_group *olg,
                           int index, struct obd_device *disk_obd, int count,
                           struct llog_logid *logid);
 int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
@@ -140,8 +140,8 @@ int llog_obd_origin_add(struct llog_ctxt *ctxt,
 
 int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
                         int count, struct obd_uuid *uuid);
-int obd_llog_init(struct obd_device *obd, int group,
-                  struct obd_device *disk_obd, int count, 
+int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
+                  struct obd_device *disk_obd, int count,
                   struct llog_catid *logid, struct obd_uuid *uuid);
 
 int obd_llog_finish(struct obd_device *obd, int count);
index 7cc09ec..1995330 100644 (file)
@@ -861,6 +861,7 @@ struct obd_llog_group {
         cfs_waitq_t        olg_waitq;
         spinlock_t         olg_lock;
         struct obd_export *olg_exp;
+        int                olg_initializing;
 };
 
 /* corresponds to one of the obd's */
@@ -1275,7 +1276,7 @@ struct obd_ops {
                              int cmd, obd_off *);
 
         /* llog related obd_methods */
-        int (*o_llog_init)(struct obd_device *obd, int group,
+        int (*o_llog_init)(struct obd_device *obd, struct obd_llog_group *grp,
                            struct obd_device *disk_obd, int count,
                            struct llog_catid *logid, struct obd_uuid *uuid);
         int (*o_llog_finish)(struct obd_device *obd, int count);
index 13aac50..73aba2a 100644 (file)
@@ -499,7 +499,7 @@ int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
                         RETURN(-EINVAL);
                 }
 
-                rc = obd_llog_init(obd, OBD_LLOG_GROUP, mdc_obd, 0, NULL, tgt_uuid);
+                rc = obd_llog_init(obd, &obd->obd_olg, mdc_obd, 0, NULL, tgt_uuid);
                 if (rc) {
                         lmv_init_unlock(lmv);
                         CERROR("lmv failed to setup llogging subsystems\n");
@@ -2487,7 +2487,7 @@ repeat:
         RETURN(rc);
 }
 
-static int lmv_llog_init(struct obd_device *obd, int group,
+static int lmv_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
index 9981ed8..129f1d5 100644 (file)
@@ -236,7 +236,7 @@ void lov_getref(struct obd_device *obd);
 void lov_putref(struct obd_device *obd);
 
 /* lov_log.c */
-int lov_llog_init(struct obd_device *obd, int group, 
+int lov_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid);
 int lov_llog_finish(struct obd_device *obd, int count);
index 8c2186b..b39c83b 100644 (file)
@@ -182,7 +182,7 @@ static struct llog_operations lov_size_repl_logops = {
         lop_cancel: lov_llog_repl_cancel
 };
 
-int lov_llog_init(struct obd_device *obd, int group, 
+int lov_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid)
 {
@@ -191,13 +191,13 @@ int lov_llog_init(struct obd_device *obd, int group,
         int i, rc = 0, err = 0;
         ENTRY;
 
-        LASSERT(group == OBD_LLOG_GROUP);
-        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+        LASSERT(olg == &obd->obd_olg);
+        rc = llog_setup(obd, olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
                         &lov_mds_ost_orig_logops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
                         &lov_size_repl_logops);
         if (rc)
                 RETURN(rc);
@@ -212,7 +212,7 @@ int lov_llog_init(struct obd_device *obd, int group,
                 CDEBUG(D_CONFIG, "init %d/%d\n", i, count);
                 LASSERT(lov->lov_tgts[i]->ltd_exp);
                 child = lov->lov_tgts[i]->ltd_exp->exp_obd;
-                rc = obd_llog_init(child, group, tgt, 1, logid + i, uuid);
+                rc = obd_llog_init(child, &child->obd_olg, tgt, 1, logid + i, uuid);
                 if (rc) {
                         CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
                                "(rc=%d)\n", i, child->obd_name, tgt->obd_name,
index cb4b12d..cf6d57a 100644 (file)
@@ -1524,7 +1524,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
         sptlrpc_lprocfs_cliobd_attach(obd);
         ptlrpc_lprocfs_register_obd(obd);
 
-        rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
         if (rc) {
                 mdc_cleanup(obd);
                 CERROR("failed to setup llogging subsystems\n");
@@ -1611,17 +1611,15 @@ static int mdc_cleanup(struct obd_device *obd)
 }
 
 
-static int mdc_llog_init(struct obd_device *obd, int group,
+static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
-        struct obd_llog_group *olg = &obd->obd_olg;
         int rc;
         ENTRY;
 
-        LASSERT(group == OBD_LLOG_GROUP);
-        LASSERT(olg->olg_group == group);
+        LASSERT(olg == &obd->obd_olg);
 
         rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0,
                         NULL, &llog_client_ops);
index 03d4888..e0a3b17 100644 (file)
@@ -12,7 +12,7 @@ int mds_cleanup_pending(struct obd_device *obd);
 
 
 /* mds/mds_log.c */
-int mds_llog_init(struct obd_device *obd, int group,
+int mds_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                   struct obd_device *tgt, int count,
                   struct llog_catid *logid, struct obd_uuid *uuid);
 int mds_llog_finish(struct obd_device *obd, int count);
index e1488d2..93d48a6 100644 (file)
@@ -102,7 +102,7 @@ static struct llog_operations mds_size_repl_logops = {
         lop_cancel:     mds_llog_repl_cancel,
 };
 
-int mds_llog_init(struct obd_device *obd, int group,
+int mds_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid)
 {
@@ -110,7 +110,7 @@ int mds_llog_init(struct obd_device *obd, int group,
         int rc;
         ENTRY;
 
-        LASSERT(group == OBD_LLOG_GROUP);
+        LASSERT(olg == &obd->obd_olg);
         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
                         &mds_ost_orig_logops);
         if (rc)
@@ -121,7 +121,7 @@ int mds_llog_init(struct obd_device *obd, int group,
         if (rc)
                 RETURN(rc);
 
-        rc = obd_llog_init(lov_obd, group, tgt, count, logid, uuid);
+        rc = obd_llog_init(lov_obd, &lov_obd->obd_olg, tgt, count, logid, uuid);
         if (rc)
                 CERROR("lov_llog_init err %d\n", rc);
 
index 87b51d2..da6975f 100644 (file)
@@ -51,7 +51,7 @@ static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (rc)
                 GOTO(err_decref, rc);
 
-        rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_cleanup, rc);
@@ -93,24 +93,22 @@ static int mgc_cleanup(struct obd_device *obd)
         ENTRY;
 
         LASSERT(cli->cl_mgc_vfsmnt == NULL);
-        
+
         ptlrpcd_decref();
 
         rc = client_obd_cleanup(obd);
         RETURN(rc);
 }
 
-static int mgc_llog_init(struct obd_device *obd, int group,
+static int mgc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                          struct obd_device *tgt, int count, 
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
-        struct obd_llog_group *olg = &obd->obd_olg;
         int rc;
         ENTRY;
 
-        LASSERT(group == olg->olg_group);
-        LASSERT(group == OBD_LLOG_GROUP);
+        LASSERT(olg == &obd->obd_olg);
         rc = llog_setup(obd, olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
         if (rc == 0) {
index 37135c5..e8ddf27 100644 (file)
@@ -512,7 +512,7 @@ static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (rc)
                 GOTO(err_decref, rc);
 
-        rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_cleanup, rc);
@@ -528,7 +528,7 @@ static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 cfs_waitq_init(&rq_waitq);
         }
         spin_unlock(&config_list_lock);
-        
+
         RETURN(rc);
 
 err_cleanup:
@@ -890,17 +890,15 @@ static int mgc_import_event(struct obd_device *obd,
         RETURN(rc);
 }
 
-static int mgc_llog_init(struct obd_device *obd, int group,
+static int mgc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
-        struct obd_llog_group *olg = &obd->obd_olg;
         int rc;
         ENTRY;
 
-        LASSERT(group == OBD_LLOG_GROUP);
-        LASSERT(olg->olg_group == group);
+        LASSERT(olg == &obd->obd_olg);
 
         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
                         &llog_lvfs_ops);
index af67705..cdbc8d1 100644 (file)
@@ -126,17 +126,14 @@ static int mgs_disconnect(struct obd_export *exp)
 static int mgs_cleanup(struct obd_device *obd);
 static int mgs_handle(struct ptlrpc_request *req);
 
-static int mgs_llog_init(struct obd_device *obd, int group,
+static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
-        struct obd_llog_group *olg = &obd->obd_olg;
         int rc;
         ENTRY;
 
-        LASSERT(group == OBD_LLOG_GROUP);
-        LASSERT(olg->olg_group == group);
-
+        LASSERT(olg == &obd->obd_olg);
         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         RETURN(rc);
@@ -198,7 +195,7 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 GOTO(err_ns, rc);
         }
 
-        rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
         if (rc)
                 GOTO(err_fs, rc);
 
index 9e2ac5f..a26e336 100644 (file)
@@ -295,7 +295,7 @@ int llog_obd_origin_setup(struct obd_device *obd, struct obd_llog_group *olg,
 
         LASSERT(olg != NULL);
         ctxt = llog_group_get_ctxt(olg, index);
-        
+
         LASSERT(ctxt);
         llog_gen_init(ctxt);
 
@@ -406,7 +406,7 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
                 GOTO(out, rc);
         }
 
-        rc = obd_llog_init(obd, olg->olg_group, obd, count, idarray, uuid);
+        rc = obd_llog_init(obd, olg, obd, count, idarray, uuid);
         if (rc) {
                 CERROR("rc: %d\n", rc);
                 GOTO(out, rc);
@@ -425,8 +425,8 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
 }
 EXPORT_SYMBOL(llog_cat_initialize);
 
-int obd_llog_init(struct obd_device *obd, int group, 
-                  struct obd_device *disk_obd, int count, 
+int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
+                  struct obd_device *disk_obd, int count,
                   struct llog_catid *logid, struct obd_uuid *uuid)
 {
         int rc;
@@ -434,7 +434,7 @@ int obd_llog_init(struct obd_device *obd, int group,
         OBD_CHECK_DT_OP(obd, llog_init, 0);
         OBD_COUNTER_INCREMENT(obd, llog_init);
 
-        rc = OBP(obd, llog_init)(obd, group, disk_obd, count, logid, uuid);
+        rc = OBP(obd, llog_init)(obd, olg, disk_obd, count, logid, uuid);
         RETURN(rc);
 }
 EXPORT_SYMBOL(obd_llog_init);
index efb1228..7551909 100644 (file)
@@ -609,14 +609,15 @@ static int llog_run_tests(struct obd_device *obd)
 }
 
 
-static int llog_test_llog_init(struct obd_device *obd, int group,
-                               struct obd_device *tgt, int count, 
+static int llog_test_llog_init(struct obd_device *obd,
+                               struct obd_llog_group *olg,
+                               struct obd_device *tgt, int count,
                                struct llog_catid *logid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, &obd->obd_olg, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL, 
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL,
                         &llog_lvfs_ops);
         RETURN(rc);
 }
index b451ff8..d7de59a 100644 (file)
@@ -1974,7 +1974,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         obd->obd_lvfs_ctxt.fs = get_ds();
         obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
 
-        sema_init(&filter->fo_init_lock, 1);
+        init_mutex(&filter->fo_init_lock);
         filter->fo_committed_group = 0;
 
         rc = filter_prep(obd);
@@ -2137,6 +2137,25 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         return rc;
 }
 
+static int filter_group_llog_finish(struct obd_llog_group *olg)
+{
+        struct llog_ctxt *ctxt;
+        int rc = 0, rc2 = 0;
+        ENTRY;
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        if (ctxt)
+                rc = llog_cleanup(ctxt);
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
+        if (ctxt)
+                rc2 = llog_cleanup(ctxt);
+        if (!rc)
+                rc = rc2;
+
+        RETURN(rc);
+}
+
 static struct llog_operations filter_mds_ost_repl_logops /* initialized below*/;
 static struct llog_operations filter_size_orig_logops = {
         lop_setup: llog_obd_origin_setup,
@@ -2144,59 +2163,62 @@ static struct llog_operations filter_size_orig_logops = {
         lop_add: llog_obd_origin_add
 };
 
-static int filter_llog_init(struct obd_device *obd, int group,
-                            struct obd_device *tgt, int count,
-                            struct llog_catid *catid,
-                            struct obd_uuid *uuid)
+static int filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
+                           struct obd_device *tgt)
+{
+        int rc;
+
+        rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+                        &filter_mds_ost_repl_logops);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+                        &filter_size_orig_logops);
+        if (rc)
+                GOTO(cleanup, rc);
+cleanup:
+        if (rc)
+              filter_group_llog_finish(olg);
+        RETURN(rc);
+}
+
+static int 
+filter_default_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
+                        struct obd_device *tgt)
 {
         struct filter_obd *filter = &obd->u.filter;
-        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc;
-        ENTRY;
 
-        olg = filter_find_olg(obd, group);
-        if (IS_ERR(olg))
-                RETURN(PTR_ERR(olg));
-
-        if (group == OBD_LLOG_GROUP) {
-                LASSERT(filter->fo_lcm == NULL);
-                OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
-                if (!filter->fo_lcm)
-                        RETURN(-ENOMEM);
+        LASSERT(filter->fo_lcm == NULL);
+        OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+        if (!filter->fo_lcm)
+                RETURN(-ENOMEM);
 
-                rc = llog_init_commit_master((struct llog_commit_master *)
-                                             filter->fo_lcm);
-                if (rc)
-                        GOTO(cleanup, rc);
+        rc = llog_init_commit_master((struct llog_commit_master *)
+                                     filter->fo_lcm);
+        if (rc)
+                GOTO(cleanup, rc);
 
         filter_mds_ost_repl_logops = llog_client_ops;
         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
-        } else {
-                LASSERT(filter->fo_lcm != NULL);
-        }
-        rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
-                        &filter_mds_ost_repl_logops);
+
+        rc = filter_olg_init(obd, olg, tgt);
         if (rc)
                 GOTO(cleanup, rc);
 
-        /* FIXME - assign unlink_cb for filter's recovery */
-        LASSERT(olg);
         ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
-
         LASSERT(ctxt != NULL);
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
         ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        /* Only start log commit thread for default llog ctxt*/
         rc = llog_start_commit_thread(ctxt->loc_lcm);
         llog_ctxt_put(ctxt);
         if (rc)
                 GOTO(cleanup, rc);
-
-        rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
-                        &filter_size_orig_logops);
-
 cleanup:
         if (rc) {
                 llog_cleanup_commit_master(filter->fo_lcm, 0);
@@ -2205,22 +2227,33 @@ cleanup:
         }
         RETURN(rc);
 }
-
-static int filter_group_llog_finish(struct obd_llog_group *olg)
+                     
+static int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
+                            struct obd_device *tgt, int count,
+                            struct llog_catid *catid,
+                            struct obd_uuid *uuid)
 {
+        struct filter_obd *filter = &obd->u.filter;
         struct llog_ctxt *ctxt;
-        int rc = 0, rc2 = 0;
+        int rc;
         ENTRY;
 
-        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
-        if (ctxt)
-                rc = llog_cleanup(ctxt);
+        LASSERT(olg != NULL);
+        if (olg == &obd->obd_olg)
+                return filter_default_olg_init(obd, olg, tgt);
 
-        ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
-        if (ctxt)
-                rc2 = llog_cleanup(ctxt);
-        if (!rc)
-                rc = rc2;
+        /* For other group llog, which will be initialized in
+         * llog_connect(after default_olg_init, so fo_lcm must
+         * already be initialized */ 
+        LASSERT(filter->fo_lcm != NULL);
+        rc = filter_olg_init(obd, olg, tgt);
+        if (rc)
+                RETURN(rc);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERT(ctxt != NULL);
+        ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+        ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        llog_ctxt_put(ctxt);
 
         RETURN(rc);
 }
@@ -2243,9 +2276,39 @@ static int filter_llog_finish(struct obd_device *obd, int count)
         RETURN(rc);
 }
 
+static struct obd_llog_group *
+filter_find_olg_internal(struct filter_obd *filter, int group)
+{
+        struct obd_llog_group *olg;
+
+        LASSERT_SPIN_LOCKED(&filter->fo_llog_list_lock);
+        list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                if (olg->olg_group == group)
+                        RETURN(olg);
+        }
+        RETURN(NULL);
+}
+
 struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
 {
-        struct obd_llog_group *olg, *nolg;
+        struct obd_llog_group *olg = NULL;
+        struct filter_obd *filter;
+
+        filter = &obd->u.filter;
+
+        if (group == OBD_LLOG_GROUP)
+                RETURN(&obd->obd_olg);
+
+        spin_lock(&filter->fo_llog_list_lock);
+        olg = filter_find_olg_internal(filter, group);
+        spin_unlock(&filter->fo_llog_list_lock);
+
+        RETURN(olg); 
+}
+
+struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group)
+{
+        struct obd_llog_group *olg = NULL;
         struct filter_obd *filter;
         int rc;
 
@@ -2255,38 +2318,42 @@ struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
                 RETURN(&obd->obd_olg);
 
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
-                if (olg->olg_group == group) {
-                        spin_unlock(&filter->fo_llog_list_lock);
-                        RETURN(olg);
+        olg = filter_find_olg_internal(filter, group);
+        if (olg) {
+                if (olg->olg_initializing) {
+                        GOTO(out_unlock, olg = ERR_PTR(-EBUSY));
+                } else {
+                        GOTO(out_unlock, olg);
                 }
         }
-        spin_unlock(&filter->fo_llog_list_lock);
-
         OBD_ALLOC_PTR(olg);
         if (olg == NULL)
-                RETURN(ERR_PTR(-ENOMEM));
+               GOTO(out_unlock, olg = ERR_PTR(-ENOMEM));
 
         llog_group_init(olg, group);
-        spin_lock(&filter->fo_llog_list_lock);
-        list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) {
-                LASSERT(nolg->olg_group != group);
-        }
         list_add(&olg->olg_list, &filter->fo_llog_list);
+        olg->olg_initializing = 1;
         spin_unlock(&filter->fo_llog_list_lock);
 
         rc = llog_cat_initialize(obd, olg, 1, NULL);
         if (rc) {
-                spin_lock(&filter->fo_llog_list_lock);
-                list_del(&olg->olg_list);
-                spin_unlock(&filter->fo_llog_list_lock);
-                OBD_FREE_PTR(olg);
-                RETURN(ERR_PTR(rc));
+               spin_lock(&filter->fo_llog_list_lock);
+               list_del(&olg->olg_list);
+               spin_unlock(&filter->fo_llog_list_lock);
+               OBD_FREE_PTR(olg);
+               GOTO(out, olg = ERR_PTR(-ENOMEM));
         }
+        spin_lock(&filter->fo_llog_list_lock);
+        olg->olg_initializing = 0;
+        spin_unlock(&filter->fo_llog_list_lock);
         CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n",
-               obd->obd_name, group, olg);
-
+              obd->obd_name, group, olg);
+out:
         RETURN(olg);
+
+out_unlock:
+        spin_unlock(&filter->fo_llog_list_lock);
+        GOTO(out, olg);
 }
 
 static int filter_llog_connect(struct obd_export *exp,
@@ -2304,8 +2371,11 @@ static int filter_llog_connect(struct obd_export *exp,
                 (unsigned) body->lgdc_logid.lgl_ogen);
 
         olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
-        if (IS_ERR(olg))
-                RETURN(PTR_ERR(olg));
+        if (!olg) {
+                CERROR(" %s: can not find olg of group %d\n",
+                       obd->obd_name, (int)body->lgdc_logid.lgl_ogr); 
+                RETURN(-ENOENT);
+        }
         llog_group_set_export(olg, exp);
 
         ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
@@ -2323,8 +2393,9 @@ static int filter_llog_connect(struct obd_export *exp,
 
 static int filter_llog_preclean (struct obd_device *obd)
 {
-        struct obd_llog_group *olg;
+        struct obd_llog_group *olg, *tmp;
         struct filter_obd *filter;
+        struct list_head  remove_list;
         int rc = 0;
         ENTRY;
 
@@ -2333,21 +2404,25 @@ static int filter_llog_preclean (struct obd_device *obd)
                 CERROR("failed to cleanup llogging subsystem\n");
 
         filter = &obd->u.filter;
+        CFS_INIT_LIST_HEAD(&remove_list);
+
         spin_lock(&filter->fo_llog_list_lock);
         while (!list_empty(&filter->fo_llog_list)) {
                 olg = list_entry(filter->fo_llog_list.next,
                                  struct obd_llog_group, olg_list);
                 list_del(&olg->olg_list);
-                spin_unlock(&filter->fo_llog_list_lock);
-
+                list_add(&olg->olg_list, &remove_list);
+        }
+        spin_unlock(&filter->fo_llog_list_lock);
+        
+        list_for_each_entry_safe(olg, tmp, &remove_list, olg_list) {  
+                list_del_init(&olg->olg_list);
                 rc = filter_group_llog_finish(olg);
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
                                olg->olg_group);
                 OBD_FREE_PTR(olg);
-                spin_lock(&filter->fo_llog_list_lock);
         }
-        spin_unlock(&filter->fo_llog_list_lock);
 
         RETURN(rc);
 }
@@ -3740,8 +3815,11 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                         struct obd_llog_group *olg;
                         fcc = obdo_logcookie(oa);
                         olg = filter_find_olg(obd, oa->o_gr);
-                        if (IS_ERR(olg))
-                                GOTO(cleanup, rc = PTR_ERR(olg));
+                        if (!olg) {
+                               CERROR(" %s: can not find olg of group %d\n",
+                                      obd->obd_name, (int)oa->o_gr);
+                               GOTO(cleanup, rc = PTR_ERR(olg));
+                        }
                         llog_group_set_export(olg, exp);
 
                         ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
@@ -4039,11 +4117,12 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
         group = (int)(*(__u32 *)val);
         LASSERT(group >= FILTER_GROUP_MDS0);
 
-        olg = filter_find_olg(obd, group);
+        olg = filter_find_create_olg(obd, group);
         if (IS_ERR(olg))
                 RETURN(PTR_ERR(olg));
-        llog_group_set_export(olg, exp);
 
+        llog_group_set_export(olg, exp);
+        
         ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
         LASSERTF(ctxt != NULL, "ctxt is null\n"),
 
index 20afcf3..b06f563 100644 (file)
@@ -104,7 +104,7 @@ void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
         struct llog_cookie *cookie = cb_data;
         struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
-        int rc;        
+        int rc;
         
         /* we have to find context for right group */
         if (error != 0 || obd->obd_stopping) {
@@ -114,23 +114,24 @@ void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
         }
 
         olg = filter_find_olg(obd, cookie->lgc_lgl.lgl_ogr);
-        if (!IS_ERR(olg)) {
-                ctxt = llog_group_get_ctxt(olg, cookie->lgc_subsys + 1);
-                if (!ctxt) {
-                        CERROR("no valid context for group "LPU64"\n",
-                                cookie->lgc_lgl.lgl_ogr);
-                        GOTO(out, rc = 0);
-                }
-
-                OBD_FAIL_TIMEOUT(OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT, 30);
-
-                rc = llog_cancel(ctxt, NULL, 1, cookie, 0);
-                if (rc)
-                        CERROR("error cancelling log cookies: rc = %d\n", rc);
-                llog_ctxt_put(ctxt);
-        } else {
+        if (!olg) { 
                 CDEBUG(D_HA, "unknown group "LPU64"!\n", cookie->lgc_lgl.lgl_ogr);
+                GOTO(out, rc = 0);
         }
+        
+        ctxt = llog_group_get_ctxt(olg, cookie->lgc_subsys + 1);
+        if (!ctxt) {
+                CERROR("no valid context for group "LPU64"\n",
+                        cookie->lgc_lgl.lgl_ogr);
+                GOTO(out, rc = 0);
+        }
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT, 30);
+
+        rc = llog_cancel(ctxt, NULL, 1, cookie, 0);
+        if (rc)
+                CERROR("error cancelling log cookies: rc = %d\n", rc);
+        llog_ctxt_put(ctxt);
 out:
         OBD_FREE(cookie, sizeof(*cookie));
 }
index 72661a4..38bd034 100644 (file)
@@ -3713,13 +3713,14 @@ static struct llog_operations osc_size_repl_logops = {
 };
 
 static struct llog_operations osc_mds_ost_orig_logops;
-static int osc_llog_init(struct obd_device *obd, int group,
+static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                          struct obd_device *tgt, int count,
                          struct llog_catid *catid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
-        LASSERT(group == OBD_LLOG_GROUP);
+
+        LASSERT(olg == &obd->obd_olg);
         spin_lock(&obd->obd_dev_lock);
         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                 osc_mds_ost_orig_logops = llog_lvfs_ops;