Whamcloud - gitweb
refactor dual checksum support.
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index 053c10f..c9ce969 100644 (file)
@@ -47,6 +47,7 @@
 #include <linux/mount.h>
 #include <linux/buffer_head.h>
 
+#include <obd_cksum.h>
 #include <obd_class.h>
 #include <obd_lov.h>
 #include <lustre_dlm.h>
@@ -63,7 +64,7 @@
 #include "filter_internal.h"
 
 /* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS 3
+#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0
 static struct lvfs_callback_ops filter_lvfs_ops;
 cfs_mem_cache_t *ll_fmd_cachep;
 
@@ -82,7 +83,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
         struct filter_client_data *fcd = fed->fed_fcd;
         __u64 last_rcvd;
         loff_t off;
-        int err, log_pri = D_HA;
+        int err, log_pri = D_RPCTRACE;
 
         /* Propagate error code. */
         if (rc)
@@ -1566,6 +1567,53 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         return(rc);
 }
 
+struct filter_intent_args {
+        struct ldlm_lock **victim;
+        __u64 size;
+        int *liblustre;
+};
+
+static enum interval_iter filter_intent_cb(struct interval_node *n,
+                                           void *args)
+{
+        struct ldlm_interval *node = (struct ldlm_interval *)n;
+        struct filter_intent_args *arg = (struct filter_intent_args*)args;
+        __u64 size = arg->size;
+        struct ldlm_lock **v = arg->victim;
+        struct ldlm_lock *lck;
+
+        /* If the interval is lower than the current file size,
+         * just break. */
+        if (interval_high(n) <= size)
+                return INTERVAL_ITER_STOP;
+
+        list_for_each_entry(lck, &node->li_group, l_sl_policy) {
+                /* Don't send glimpse ASTs to liblustre clients.
+                 * They aren't listening for them, and they do
+                 * entirely synchronous I/O anyways. */
+                if (lck->l_export == NULL ||
+                    lck->l_export->exp_libclient == 1)
+                        continue;
+
+                if (*arg->liblustre)
+                        *arg->liblustre = 0;
+
+                if (*v == NULL) {
+                        *v = LDLM_LOCK_GET(lck);
+                } else if ((*v)->l_policy_data.l_extent.start <
+                           lck->l_policy_data.l_extent.start) {
+                        LDLM_LOCK_PUT(*v);
+                        *v = LDLM_LOCK_GET(lck);
+                }
+
+                /* the same policy group - every lock has the
+                 * same extent, so needn't do it any more */
+                break;
+        }
+
+        return INTERVAL_ITER_CONT;
+}
+
 static int filter_intent_policy(struct ldlm_namespace *ns,
                                 struct ldlm_lock **lockp, void *req_cookie,
                                 ldlm_mode_t mode, int flags, void *data)
@@ -1577,9 +1625,10 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         ldlm_processing_policy policy;
         struct ost_lvb *res_lvb, *reply_lvb;
         struct ldlm_reply *rep;
-        struct list_head *tmp;
         ldlm_error_t err;
-        int rc, tmpflags = 0, only_liblustre = 0;
+        int idx, rc, tmpflags = 0, only_liblustre = 1;
+        struct ldlm_interval_tree *tree;
+        struct filter_intent_args arg;
         int repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                            [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                            [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb) };
@@ -1604,7 +1653,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
 
         /* If we grant any lock at all, it will be a whole-file read lock.
          * Call the extent policy function to see if our request can be
-         * granted, or is blocked. */
+         * granted, or is blocked. 
+         * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse lock
+         */
         lock->l_policy_data.l_extent.start = 0;
         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
         lock->l_req_mode = LCK_PR;
@@ -1652,42 +1703,23 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         LASSERT(res_lvb != NULL);
         *reply_lvb = *res_lvb;
 
-        list_for_each(tmp, &res->lr_granted) {
-                struct ldlm_lock *tmplock =
-                        list_entry(tmp, struct ldlm_lock, l_res_link);
-
-                if (tmplock->l_granted_mode == LCK_PR)
-                        continue;
-                /*
-                 * ->ns_lock guarantees that no new locks are granted, and,
-                 * therefore, that res->lr_lvb_data cannot increase beyond the
-                 * end of already granted lock. As a result, it is safe to
-                 * check against "stale" reply_lvb->lvb_size value without
-                 * res->lr_lvb_sem.
-                 */
-                if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
-                        continue;
-
-                /* Don't send glimpse ASTs to liblustre clients.  They aren't
-                 * listening for them, and they do entirely synchronous I/O
-                 * anyways. */
-                if (tmplock->l_export == NULL ||
-                    tmplock->l_export->exp_libclient == 1) {
-                        only_liblustre = 1;
-                        continue;
-                }
-
-                if (l == NULL) {
-                        l = LDLM_LOCK_GET(tmplock);
-                        continue;
-                }
-
-                if (l->l_policy_data.l_extent.start >
-                    tmplock->l_policy_data.l_extent.start)
+        /*
+         * ->ns_lock guarantees that no new locks are granted, and,
+         * therefore, that res->lr_lvb_data cannot increase beyond the
+         * end of already granted lock. As a result, it is safe to
+         * check against "stale" reply_lvb->lvb_size value without
+         * res->lr_lvb_sem.
+         */
+        arg.size = reply_lvb->lvb_size;
+        arg.victim = &l;
+        arg.liblustre = &only_liblustre;
+        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+                tree = &res->lr_itree[idx];
+                if (tree->lit_mode == LCK_PR)
                         continue;
 
-                LDLM_LOCK_PUT(l);
-                l = LDLM_LOCK_GET(tmplock);
+                interval_iterate_reverse(tree->lit_root, 
+                                         filter_intent_cb, &arg);
         }
         unlock_res(res);
 
@@ -1931,7 +1963,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
 
         spin_lock_init(&filter->fo_translock);
         spin_lock_init(&filter->fo_objidlock);
-        INIT_LIST_HEAD(&filter->fo_export_list);
+        CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         sema_init(&filter->fo_alloc_lock, 1);
         init_brw_stats(&filter->fo_filter_stats);
         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
@@ -1962,7 +1994,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        rc = llog_cat_initialize(obd, NULL, 1, NULL);
+        rc = llog_cat_initialize(obd, &obd->obd_olg, 1, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
@@ -2075,67 +2107,81 @@ static struct llog_operations filter_size_orig_logops = {
         lop_add: llog_obd_origin_add
 };
 
-static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int filter_llog_init(struct obd_device *obd, int group,
                             struct obd_device *tgt, int count,
                             struct llog_catid *catid,
                             struct obd_uuid *uuid)
 {
+        struct filter_obd *filter = &obd->u.filter;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
+        olg = filter_find_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+
+        if (group == OBD_LLOG_GROUP) {
+                LASSERT(filter->fo_lcm == NULL);
+                OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+                if (!filter->fo_lcm)
+                        RETURN(-ENOMEM);
+
+                rc = llog_init_commit_master((struct llog_commit_master *)
+                                             filter->fo_lcm);
+                if (rc)
+                        GOTO(cleanup, rc);
+
         filter_mds_ost_repl_logops = llog_client_ops;
         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
-
-        rc = llog_setup(obd, llogs, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+        } else {
+                LASSERT(filter->fo_lcm != NULL);
+        }
+        rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
                         &filter_mds_ost_repl_logops);
         if (rc)
-                RETURN(rc);
+                GOTO(cleanup, rc);
 
         /* FIXME - assign unlink_cb for filter's recovery */
-        if (!llogs)
-                ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        else
-                ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+        LASSERT(olg);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
 
         LASSERT(ctxt != NULL);
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+        ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        rc = llog_start_commit_thread(ctxt->loc_lcm);
+        llog_ctxt_put(ctxt);
+        if (rc)
+                GOTO(cleanup, rc);
 
-        rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
                         &filter_size_orig_logops);
-        RETURN(rc);
-}
-
-static int filter_group_llog_cleanup(struct llog_ctxt *ctxt)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (CTXTP(ctxt, cleanup))
-                rc = CTXTP(ctxt, cleanup)(ctxt);
-
-        if (ctxt->loc_exp)
-                class_export_put(ctxt->loc_exp);
-        OBD_FREE(ctxt, sizeof(*ctxt));
 
+cleanup:
+        if (rc) {
+                llog_cleanup_commit_master(filter->fo_lcm, 0);
+                OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master));
+                filter->fo_lcm = NULL;
+        }
         RETURN(rc);
 }
 
-static int filter_group_llog_finish(struct obd_llogs *llogs)
+static int filter_group_llog_finish(struct obd_llog_group *olg)
 {
         struct llog_ctxt *ctxt;
         int rc = 0, rc2 = 0;
         ENTRY;
 
-        ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
         if (ctxt)
-                rc = filter_group_llog_cleanup(ctxt);
+                rc = llog_cleanup(ctxt);
 
-        ctxt = llog_get_context_from_llogs(llogs, LLOG_SIZE_ORIG_CTXT);
+        ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
         if (ctxt)
-                rc2 = filter_group_llog_cleanup(ctxt);
+                rc2 = llog_cleanup(ctxt);
         if (!rc)
                 rc = rc2;
 
@@ -2144,89 +2190,66 @@ static int filter_group_llog_finish(struct obd_llogs *llogs)
 
 static int filter_llog_finish(struct obd_device *obd, int count)
 {
-        struct llog_ctxt *ctxt;
-        int rc = 0, rc2 = 0;
+        int rc;
         ENTRY;
 
-        ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        if (ctxt)
-                rc = llog_cleanup(ctxt);
-
-        ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT);
-        if (ctxt)
-                rc2 = llog_cleanup(ctxt);
-        if (!rc)
-                rc = rc2;
+        if (obd->u.filter.fo_lcm) { 
+                llog_cleanup_commit_master((struct llog_commit_master *)
+                                           obd->u.filter.fo_lcm, 0);
+                OBD_FREE(obd->u.filter.fo_lcm, 
+                         sizeof(struct llog_commit_master));
+                obd->u.filter.fo_lcm = NULL;
+        }
+        /* finish obd llog group */
+        rc = filter_group_llog_finish(&obd->obd_olg);
 
         RETURN(rc);
 }
 
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
-                                             struct obd_export *export)
+struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
 {
-        struct filter_group_llog *fglog, *nlog;
+        struct obd_llog_group *olg, *nolg;
         struct filter_obd *filter;
-        struct llog_ctxt *ctxt;
-        struct list_head *cur;
         int rc;
 
         filter = &obd->u.filter;
 
+        if (group == OBD_LLOG_GROUP)
+                RETURN(&obd->obd_olg);
+
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each(cur, &filter->fo_llog_list) {
-                fglog = list_entry(cur, struct filter_group_llog, list);
-                if (fglog->group == group) {
-                        if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
-                                CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
-                                      obd->obd_name, group, fglog->exp, export);
+        list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                if (olg->olg_group == group) {
                         spin_unlock(&filter->fo_llog_list_lock);
-                        goto init;
+                        RETURN(olg);
                 }
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
-        if (export == NULL)
-                RETURN(NULL);
-
-        OBD_ALLOC_PTR(fglog);
-        if (fglog == NULL)
-                RETURN(NULL);
-        fglog->group = group;
-
-        OBD_ALLOC_PTR(fglog->llogs);
-        if (fglog->llogs == NULL) {
-                OBD_FREE_PTR(fglog);
-                RETURN(NULL);
-        }
+        OBD_ALLOC_PTR(olg);
+        if (olg == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
 
+        llog_group_init(olg, group);
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each(cur, &filter->fo_llog_list) {
-                nlog = list_entry(cur, struct filter_group_llog, list);
-                LASSERT(nlog->group != group);
+        list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) {
+                LASSERT(nolg->olg_group != group);
         }
-        list_add(&fglog->list, &filter->fo_llog_list);
+        list_add(&olg->olg_list, &filter->fo_llog_list);
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = llog_cat_initialize(obd, fglog->llogs, 1, NULL);
+        rc = llog_cat_initialize(obd, olg, 1, NULL);
         if (rc) {
-                OBD_FREE_PTR(fglog->llogs);
-                OBD_FREE_PTR(fglog);
-                RETURN(NULL);
-        }
-
-init:
-        if (export) {
-                fglog->exp = export;
-                ctxt = llog_get_context_from_llogs(fglog->llogs,
-                                               LLOG_MDS_OST_REPL_CTXT);
-                LASSERT(ctxt != NULL);
-
-                llog_receptor_accept(ctxt, export->exp_imp_reverse);
+                spin_lock(&filter->fo_llog_list_lock);
+                list_del(&olg->olg_list);
+                spin_unlock(&filter->fo_llog_list_lock);
+                OBD_FREE_PTR(olg);
+                RETURN(ERR_PTR(rc));
         }
-        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
-               obd->obd_name, fglog->llogs, group);
+        CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n",
+               obd->obd_name, group, olg);
 
-        RETURN(fglog->llogs);
+        RETURN(olg);
 }
 
 static int filter_llog_connect(struct obd_export *exp,
@@ -2234,7 +2257,7 @@ static int filter_llog_connect(struct obd_export *exp,
 {
         struct obd_device *obd = exp->exp_obd;
         struct llog_ctxt *ctxt;
-        struct obd_llogs *llog;
+        struct obd_llog_group *olg;
         int rc;
         ENTRY;
 
@@ -2243,13 +2266,17 @@ static int filter_llog_connect(struct obd_export *exp,
                 (unsigned) body->lgdc_logid.lgl_oid,
                 (unsigned) body->lgdc_logid.lgl_ogen);
 
-        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
-        LASSERT(llog != NULL);
-        ctxt = llog_get_context_from_llogs(llog, body->lgdc_ctxt_idx);
+        olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
         LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n",
                  body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
                           &body->lgdc_gen, NULL);
+        llog_ctxt_put(ctxt);
         if (rc != 0)
                 CERROR("failed to connect rc %d idx %d\n", rc,
                                 body->lgdc_ctxt_idx);
@@ -2259,33 +2286,32 @@ static int filter_llog_connect(struct obd_export *exp,
 
 static int filter_llog_preclean (struct obd_device *obd)
 {
-        struct filter_group_llog *log;
+        struct obd_llog_group *olg;
         struct filter_obd *filter;
         int rc = 0;
         ENTRY;
 
+        rc = obd_llog_finish(obd, 0);
+        if (rc)
+                CERROR("failed to cleanup llogging subsystem\n");
+
         filter = &obd->u.filter;
         spin_lock(&filter->fo_llog_list_lock);
         while (!list_empty(&filter->fo_llog_list)) {
-                log = list_entry(filter->fo_llog_list.next,
-                                 struct filter_group_llog, list);
-                list_del(&log->list);
+                olg = list_entry(filter->fo_llog_list.next,
+                                 struct obd_llog_group, olg_list);
+                list_del(&olg->olg_list);
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                rc = filter_group_llog_finish(log->llogs);
+                rc = filter_group_llog_finish(olg);
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
-                                log->group);
-                OBD_FREE_PTR(log->llogs);
-                OBD_FREE_PTR(log);
+                               olg->olg_group);
+                OBD_FREE_PTR(olg);
                 spin_lock(&filter->fo_llog_list_lock);
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = obd_llog_finish(obd, 0);
-        if (rc)
-                CERROR("failed to cleanup llogging subsystem\n");
-
         RETURN(rc);
 }
 
@@ -2433,6 +2459,30 @@ static int filter_connect_internal(struct obd_export *exp,
                 LASSERT(data->ocd_brw_size);
         }
 
+        if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) {
+                __u32 cksum_types = data->ocd_cksum_types;
+
+                /* The client set in ocd_cksum_types the checksum types it
+                 * supports. We have to mask off the algorithms that we don't
+                 * support */
+                if (cksum_types & OBD_CKSUM_ALL)
+                        data->ocd_cksum_types &= OBD_CKSUM_ALL;
+                else
+                        data->ocd_cksum_types = OBD_CKSUM_CRC32;
+
+                CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
+                                   "%x\n", exp->exp_obd->obd_name,
+                                   obd_export_nid2str(exp), cksum_types,
+                                   data->ocd_cksum_types);
+        } else {
+                /* This client does not support OBD_CONNECT_CKSUM
+                 * fall back to CRC32 */
+                CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
+                                   "OBD_CONNECT_CKSUM, CRC32 will be used\n",
+                                   exp->exp_obd->obd_name,
+                                   obd_export_nid2str(exp));
+        }
+
         /* FIXME: Do the same with the MDS UUID and fsd_peeruuid.
          * FIXME: We don't strictly need the COMPAT flag for that,
          * FIXME: as fsd_peeruuid[0] will tell us if that is set.
@@ -2661,6 +2711,10 @@ static int filter_destroy_export(struct obd_export *exp)
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
                        exp, exp->exp_filter_data.fed_pending);
 
+        /* Not ported yet the b1_6 quota functionality
+         * lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
+         */
+
         target_destroy_export(exp);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
@@ -2684,7 +2738,7 @@ static int filter_destroy_export(struct obd_export *exp)
 
 static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
 {
-        struct filter_group_llog *fglog, *nlog;
+        struct obd_llog_group *olg_min, *olg;
         struct filter_obd *filter;
         int worked = 0, group;
         struct llog_ctxt *ctxt;
@@ -2697,35 +2751,41 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
          * group order and skip already synced llogs -bzzz */
         do {
                 /* look for group with min. number, but > worked */
-                fglog = NULL;
+                olg_min = NULL;
                 group = 1 << 30;
                 spin_lock(&filter->fo_llog_list_lock);
-                list_for_each_entry(nlog, &filter->fo_llog_list, list) {
-                        if (nlog->group <= worked) {
+                list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                        if (olg->olg_group <= worked) {
                                 /* this group is already synced */
                                 continue;
                         }
-                        if (group < nlog->group) {
+                        if (group < olg->olg_group) {
                                 /* we have group with smaller number to sync */
                                 continue;
                         }
                         /* store current minimal group */
-                        fglog = nlog;
-                        group = nlog->group;
+                        olg_min = olg;
+                        group = olg->olg_group;
                 }
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                if (fglog == NULL)
+                if (olg_min == NULL)
                         break;
 
-                worked = fglog->group;
-                if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
-                        ctxt = llog_get_context_from_llogs(fglog->llogs,
+                worked = olg_min->olg_group;
+                if (olg_min->olg_exp &&
+                    (dexp == olg_min->olg_exp || dexp == NULL)) {
+                        int err;
+                        ctxt = llog_group_get_ctxt(olg_min,
                                                 LLOG_MDS_OST_REPL_CTXT);
                         LASSERT(ctxt != NULL);
-                        llog_sync(ctxt, fglog->exp);
+                        err = llog_sync(ctxt, olg_min->olg_exp);
+                        llog_ctxt_put(ctxt);
+                        if (err)
+                                CERROR("error flushing logs to MDS: rc %d\n",
+                                       err);                        
                 }
-        } while (fglog != NULL);
+        } while (olg_min != NULL);
 }
 
 /* also incredibly similar to mds_disconnect */
@@ -3271,7 +3331,7 @@ out:
 }
 
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                         __u64 max_age)
+                         __u64 max_age, __u32 flags)
 {
         struct filter_obd *filter = &obd->u.filter;
         int blockbits = obd->u.obt.obt_sb->s_blocksize_bits;
@@ -3367,7 +3427,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 OBD_ALLOC(osfs, sizeof(*osfs));
                 if (osfs == NULL)
                         RETURN(-ENOMEM);
-                rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ);
+                rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ, 0);
                 if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
                         CDEBUG(D_RPCTRACE,"%s: not enough space for create "
                                LPU64"\n", obd->obd_name, osfs->os_bavail <<
@@ -3615,9 +3675,17 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                        oa->o_id);
                 /* If object already gone, cancel cookie right now */
                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                        struct llog_ctxt *ctxt;
+                        struct obd_llog_group *olg;
                         fcc = obdo_logcookie(oa);
-                        llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1),
-                                    NULL, 1, fcc, 0);
+                        olg = filter_find_olg(obd, oa->o_gr);
+                        if (IS_ERR(olg))
+                                GOTO(cleanup, rc = PTR_ERR(olg));
+                        llog_group_set_export(olg, exp);
+
+                        ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
+                        llog_cancel(ctxt, NULL, 1, fcc, 0);
+                        llog_ctxt_put(ctxt);
                         fcc = NULL; /* we didn't allocate fcc, don't free it */
                 }
                 GOTO(cleanup, rc = -ENOENT);
@@ -3770,7 +3838,6 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
-        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
@@ -3785,8 +3852,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
                 /* flush any remaining cancel messages out to the target */
-                ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT);
-                llog_sync(ctxt, exp);
+                filter_sync_llogs(exp->exp_obd, exp);
                 RETURN(rc);
         }
 
@@ -3877,7 +3943,7 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                                  struct ptlrpc_request_set *set)
 {
         struct obd_device *obd;
-        struct obd_llogs *llog;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc = 0, group;
         ENTRY;
@@ -3912,12 +3978,16 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
         group = (int)(*(__u32 *)val);
         LASSERT(group >= FILTER_GROUP_MDS0);
 
-        llog = filter_grab_llog_for_group(obd, group, exp);
-        LASSERT(llog != NULL);
-        ctxt = llog_get_context_from_llogs(llog, LLOG_MDS_OST_REPL_CTXT);
-        LASSERTF(ctxt != NULL, "ctxt is not null\n"),
+        olg = filter_find_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERTF(ctxt != NULL, "ctxt is null\n"),
 
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        llog_ctxt_put(ctxt);
 
         lquota_setinfo(filter_quota_interface_ref, exp, obd);