Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index b451ff8..d7de59a 100644 (file)
@@ -1974,7 +1974,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         obd->obd_lvfs_ctxt.fs = get_ds();
         obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
 
-        sema_init(&filter->fo_init_lock, 1);
+        init_mutex(&filter->fo_init_lock);
         filter->fo_committed_group = 0;
 
         rc = filter_prep(obd);
@@ -2137,6 +2137,25 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         return rc;
 }
 
+static int filter_group_llog_finish(struct obd_llog_group *olg)
+{
+        struct llog_ctxt *ctxt;
+        int rc = 0, rc2 = 0;
+        ENTRY;
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        if (ctxt)
+                rc = llog_cleanup(ctxt);
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
+        if (ctxt)
+                rc2 = llog_cleanup(ctxt);
+        if (!rc)
+                rc = rc2;
+
+        RETURN(rc);
+}
+
 static struct llog_operations filter_mds_ost_repl_logops /* initialized below*/;
 static struct llog_operations filter_size_orig_logops = {
         lop_setup: llog_obd_origin_setup,
@@ -2144,59 +2163,62 @@ static struct llog_operations filter_size_orig_logops = {
         lop_add: llog_obd_origin_add
 };
 
-static int filter_llog_init(struct obd_device *obd, int group,
-                            struct obd_device *tgt, int count,
-                            struct llog_catid *catid,
-                            struct obd_uuid *uuid)
+static int filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
+                           struct obd_device *tgt)
+{
+        int rc;
+
+        rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+                        &filter_mds_ost_repl_logops);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+                        &filter_size_orig_logops);
+        if (rc)
+                GOTO(cleanup, rc);
+cleanup:
+        if (rc)
+              filter_group_llog_finish(olg);
+        RETURN(rc);
+}
+
+static int 
+filter_default_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
+                        struct obd_device *tgt)
 {
         struct filter_obd *filter = &obd->u.filter;
-        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc;
-        ENTRY;
 
-        olg = filter_find_olg(obd, group);
-        if (IS_ERR(olg))
-                RETURN(PTR_ERR(olg));
-
-        if (group == OBD_LLOG_GROUP) {
-                LASSERT(filter->fo_lcm == NULL);
-                OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
-                if (!filter->fo_lcm)
-                        RETURN(-ENOMEM);
+        LASSERT(filter->fo_lcm == NULL);
+        OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+        if (!filter->fo_lcm)
+                RETURN(-ENOMEM);
 
-                rc = llog_init_commit_master((struct llog_commit_master *)
-                                             filter->fo_lcm);
-                if (rc)
-                        GOTO(cleanup, rc);
+        rc = llog_init_commit_master((struct llog_commit_master *)
+                                     filter->fo_lcm);
+        if (rc)
+                GOTO(cleanup, rc);
 
         filter_mds_ost_repl_logops = llog_client_ops;
         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
-        } else {
-                LASSERT(filter->fo_lcm != NULL);
-        }
-        rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
-                        &filter_mds_ost_repl_logops);
+
+        rc = filter_olg_init(obd, olg, tgt);
         if (rc)
                 GOTO(cleanup, rc);
 
-        /* FIXME - assign unlink_cb for filter's recovery */
-        LASSERT(olg);
         ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
-
         LASSERT(ctxt != NULL);
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
         ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        /* Only start log commit thread for default llog ctxt*/
         rc = llog_start_commit_thread(ctxt->loc_lcm);
         llog_ctxt_put(ctxt);
         if (rc)
                 GOTO(cleanup, rc);
-
-        rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
-                        &filter_size_orig_logops);
-
 cleanup:
         if (rc) {
                 llog_cleanup_commit_master(filter->fo_lcm, 0);
@@ -2205,22 +2227,33 @@ cleanup:
         }
         RETURN(rc);
 }
-
-static int filter_group_llog_finish(struct obd_llog_group *olg)
+                     
+static int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
+                            struct obd_device *tgt, int count,
+                            struct llog_catid *catid,
+                            struct obd_uuid *uuid)
 {
+        struct filter_obd *filter = &obd->u.filter;
         struct llog_ctxt *ctxt;
-        int rc = 0, rc2 = 0;
+        int rc;
         ENTRY;
 
-        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
-        if (ctxt)
-                rc = llog_cleanup(ctxt);
+        LASSERT(olg != NULL);
+        if (olg == &obd->obd_olg)
+                return filter_default_olg_init(obd, olg, tgt);
 
-        ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
-        if (ctxt)
-                rc2 = llog_cleanup(ctxt);
-        if (!rc)
-                rc = rc2;
+        /* For other group llog, which will be initialized in
+         * llog_connect(after default_olg_init, so fo_lcm must
+         * already be initialized */ 
+        LASSERT(filter->fo_lcm != NULL);
+        rc = filter_olg_init(obd, olg, tgt);
+        if (rc)
+                RETURN(rc);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERT(ctxt != NULL);
+        ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+        ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        llog_ctxt_put(ctxt);
 
         RETURN(rc);
 }
@@ -2243,9 +2276,39 @@ static int filter_llog_finish(struct obd_device *obd, int count)
         RETURN(rc);
 }
 
+static struct obd_llog_group *
+filter_find_olg_internal(struct filter_obd *filter, int group)
+{
+        struct obd_llog_group *olg;
+
+        LASSERT_SPIN_LOCKED(&filter->fo_llog_list_lock);
+        list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                if (olg->olg_group == group)
+                        RETURN(olg);
+        }
+        RETURN(NULL);
+}
+
 struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
 {
-        struct obd_llog_group *olg, *nolg;
+        struct obd_llog_group *olg = NULL;
+        struct filter_obd *filter;
+
+        filter = &obd->u.filter;
+
+        if (group == OBD_LLOG_GROUP)
+                RETURN(&obd->obd_olg);
+
+        spin_lock(&filter->fo_llog_list_lock);
+        olg = filter_find_olg_internal(filter, group);
+        spin_unlock(&filter->fo_llog_list_lock);
+
+        RETURN(olg); 
+}
+
+struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group)
+{
+        struct obd_llog_group *olg = NULL;
         struct filter_obd *filter;
         int rc;
 
@@ -2255,38 +2318,42 @@ struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
                 RETURN(&obd->obd_olg);
 
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
-                if (olg->olg_group == group) {
-                        spin_unlock(&filter->fo_llog_list_lock);
-                        RETURN(olg);
+        olg = filter_find_olg_internal(filter, group);
+        if (olg) {
+                if (olg->olg_initializing) {
+                        GOTO(out_unlock, olg = ERR_PTR(-EBUSY));
+                } else {
+                        GOTO(out_unlock, olg);
                 }
         }
-        spin_unlock(&filter->fo_llog_list_lock);
-
         OBD_ALLOC_PTR(olg);
         if (olg == NULL)
-                RETURN(ERR_PTR(-ENOMEM));
+               GOTO(out_unlock, olg = ERR_PTR(-ENOMEM));
 
         llog_group_init(olg, group);
-        spin_lock(&filter->fo_llog_list_lock);
-        list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) {
-                LASSERT(nolg->olg_group != group);
-        }
         list_add(&olg->olg_list, &filter->fo_llog_list);
+        olg->olg_initializing = 1;
         spin_unlock(&filter->fo_llog_list_lock);
 
         rc = llog_cat_initialize(obd, olg, 1, NULL);
         if (rc) {
-                spin_lock(&filter->fo_llog_list_lock);
-                list_del(&olg->olg_list);
-                spin_unlock(&filter->fo_llog_list_lock);
-                OBD_FREE_PTR(olg);
-                RETURN(ERR_PTR(rc));
+               spin_lock(&filter->fo_llog_list_lock);
+               list_del(&olg->olg_list);
+               spin_unlock(&filter->fo_llog_list_lock);
+               OBD_FREE_PTR(olg);
+               GOTO(out, olg = ERR_PTR(-ENOMEM));
         }
+        spin_lock(&filter->fo_llog_list_lock);
+        olg->olg_initializing = 0;
+        spin_unlock(&filter->fo_llog_list_lock);
         CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n",
-               obd->obd_name, group, olg);
-
+              obd->obd_name, group, olg);
+out:
         RETURN(olg);
+
+out_unlock:
+        spin_unlock(&filter->fo_llog_list_lock);
+        GOTO(out, olg);
 }
 
 static int filter_llog_connect(struct obd_export *exp,
@@ -2304,8 +2371,11 @@ static int filter_llog_connect(struct obd_export *exp,
                 (unsigned) body->lgdc_logid.lgl_ogen);
 
         olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
-        if (IS_ERR(olg))
-                RETURN(PTR_ERR(olg));
+        if (!olg) {
+                CERROR(" %s: can not find olg of group %d\n",
+                       obd->obd_name, (int)body->lgdc_logid.lgl_ogr); 
+                RETURN(-ENOENT);
+        }
         llog_group_set_export(olg, exp);
 
         ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
@@ -2323,8 +2393,9 @@ static int filter_llog_connect(struct obd_export *exp,
 
 static int filter_llog_preclean (struct obd_device *obd)
 {
-        struct obd_llog_group *olg;
+        struct obd_llog_group *olg, *tmp;
         struct filter_obd *filter;
+        struct list_head  remove_list;
         int rc = 0;
         ENTRY;
 
@@ -2333,21 +2404,25 @@ static int filter_llog_preclean (struct obd_device *obd)
                 CERROR("failed to cleanup llogging subsystem\n");
 
         filter = &obd->u.filter;
+        CFS_INIT_LIST_HEAD(&remove_list);
+
         spin_lock(&filter->fo_llog_list_lock);
         while (!list_empty(&filter->fo_llog_list)) {
                 olg = list_entry(filter->fo_llog_list.next,
                                  struct obd_llog_group, olg_list);
                 list_del(&olg->olg_list);
-                spin_unlock(&filter->fo_llog_list_lock);
-
+                list_add(&olg->olg_list, &remove_list);
+        }
+        spin_unlock(&filter->fo_llog_list_lock);
+        
+        list_for_each_entry_safe(olg, tmp, &remove_list, olg_list) {  
+                list_del_init(&olg->olg_list);
                 rc = filter_group_llog_finish(olg);
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
                                olg->olg_group);
                 OBD_FREE_PTR(olg);
-                spin_lock(&filter->fo_llog_list_lock);
         }
-        spin_unlock(&filter->fo_llog_list_lock);
 
         RETURN(rc);
 }
@@ -3740,8 +3815,11 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                         struct obd_llog_group *olg;
                         fcc = obdo_logcookie(oa);
                         olg = filter_find_olg(obd, oa->o_gr);
-                        if (IS_ERR(olg))
-                                GOTO(cleanup, rc = PTR_ERR(olg));
+                        if (!olg) {
+                               CERROR(" %s: can not find olg of group %d\n",
+                                      obd->obd_name, (int)oa->o_gr);
+                               GOTO(cleanup, rc = PTR_ERR(olg));
+                        }
                         llog_group_set_export(olg, exp);
 
                         ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
@@ -4039,11 +4117,12 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
         group = (int)(*(__u32 *)val);
         LASSERT(group >= FILTER_GROUP_MDS0);
 
-        olg = filter_find_olg(obd, group);
+        olg = filter_find_create_olg(obd, group);
         if (IS_ERR(olg))
                 RETURN(PTR_ERR(olg));
-        llog_group_set_export(olg, exp);
 
+        llog_group_set_export(olg, exp);
+        
         ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
         LASSERTF(ctxt != NULL, "ctxt is null\n"),