Whamcloud - gitweb
refactor dual checksum support.
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index b53ea58..c9ce969 100644 (file)
 #include <linux/init.h>
 #include <linux/version.h>
 #include <linux/sched.h>
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-# include <linux/mount.h>
-# include <linux/buffer_head.h>
-#endif
+#include <linux/mount.h>
+#include <linux/buffer_head.h>
 
+#include <obd_cksum.h>
 #include <obd_class.h>
 #include <obd_lov.h>
 #include <lustre_dlm.h>
@@ -65,7 +64,7 @@
 #include "filter_internal.h"
 
 /* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS 3
+#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0
 static struct lvfs_callback_ops filter_lvfs_ops;
 cfs_mem_cache_t *ll_fmd_cachep;
 
@@ -84,7 +83,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
         struct filter_client_data *fcd = fed->fed_fcd;
         __u64 last_rcvd;
         loff_t off;
-        int err, log_pri = D_HA;
+        int err, log_pri = D_RPCTRACE;
 
         /* Propagate error code. */
         if (rc)
@@ -191,7 +190,8 @@ static int filter_export_stats_init(struct obd_device *obd,
         /* Create a per export proc entry for ops stats */
         num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) +
                      LPROC_FILTER_LAST - 1;
-        exp->exp_ops_stats = lprocfs_alloc_stats(num_stats);
+        exp->exp_ops_stats = lprocfs_alloc_stats(num_stats,
+                                                 LPROCFS_STATS_FLAG_NOPERCPU);
         if (exp->exp_ops_stats == NULL)
               RETURN(-ENOMEM);
         lprocfs_init_ops_stats(LPROC_FILTER_LAST, exp->exp_ops_stats);
@@ -1389,6 +1389,8 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
 
         if (IS_ERR(dparent))
                 return dparent;
+        if (dparent == NULL)
+                return ERR_PTR(-ENOENT);
 
         rc = filter_lock_dentry(obd, dparent);
         fsfilt_check_slow(obd, now, obd_timeout, "parent lock");
@@ -1527,11 +1529,9 @@ int filter_vfs_unlink(struct inode *dir, struct dentry *dentry)
          *       here) or some other ordering issue. */
         DQUOT_INIT(dir);
 
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
         rc = security_inode_unlink(dir, dentry);
         if (rc)
                 GOTO(out, rc);
-#endif
 
         rc = dir->i_op->unlink(dir, dentry);
 out:
@@ -1567,6 +1567,53 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         return(rc);
 }
 
+struct filter_intent_args {
+        struct ldlm_lock **victim;
+        __u64 size;
+        int *liblustre;
+};
+
+static enum interval_iter filter_intent_cb(struct interval_node *n,
+                                           void *args)
+{
+        struct ldlm_interval *node = (struct ldlm_interval *)n;
+        struct filter_intent_args *arg = (struct filter_intent_args*)args;
+        __u64 size = arg->size;
+        struct ldlm_lock **v = arg->victim;
+        struct ldlm_lock *lck;
+
+        /* If the interval is lower than the current file size,
+         * just break. */
+        if (interval_high(n) <= size)
+                return INTERVAL_ITER_STOP;
+
+        list_for_each_entry(lck, &node->li_group, l_sl_policy) {
+                /* Don't send glimpse ASTs to liblustre clients.
+                 * They aren't listening for them, and they do
+                 * entirely synchronous I/O anyways. */
+                if (lck->l_export == NULL ||
+                    lck->l_export->exp_libclient == 1)
+                        continue;
+
+                if (*arg->liblustre)
+                        *arg->liblustre = 0;
+
+                if (*v == NULL) {
+                        *v = LDLM_LOCK_GET(lck);
+                } else if ((*v)->l_policy_data.l_extent.start <
+                           lck->l_policy_data.l_extent.start) {
+                        LDLM_LOCK_PUT(*v);
+                        *v = LDLM_LOCK_GET(lck);
+                }
+
+                /* the same policy group - every lock has the
+                 * same extent, so needn't do it any more */
+                break;
+        }
+
+        return INTERVAL_ITER_CONT;
+}
+
 static int filter_intent_policy(struct ldlm_namespace *ns,
                                 struct ldlm_lock **lockp, void *req_cookie,
                                 ldlm_mode_t mode, int flags, void *data)
@@ -1578,9 +1625,10 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         ldlm_processing_policy policy;
         struct ost_lvb *res_lvb, *reply_lvb;
         struct ldlm_reply *rep;
-        struct list_head *tmp;
         ldlm_error_t err;
-        int rc, tmpflags = 0, only_liblustre = 0;
+        int idx, rc, tmpflags = 0, only_liblustre = 1;
+        struct ldlm_interval_tree *tree;
+        struct filter_intent_args arg;
         int repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                            [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                            [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb) };
@@ -1605,7 +1653,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
 
         /* If we grant any lock at all, it will be a whole-file read lock.
          * Call the extent policy function to see if our request can be
-         * granted, or is blocked. */
+         * granted, or is blocked. 
+         * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse lock
+         */
         lock->l_policy_data.l_extent.start = 0;
         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
         lock->l_req_mode = LCK_PR;
@@ -1635,8 +1685,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                  * list (and potentially being added to l_pending_list by an
                  * AST) when we are going to drop this lock ASAP. */
                 if (lock->l_export->exp_libclient ||
-                    OBD_FAIL_CHECK(OBD_FAIL_LDLM_GLIMPSE)) {
-                        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
+                    OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
                         ldlm_resource_unlink_lock(lock);
                         err = ELDLM_LOCK_ABORTED;
                 } else {
@@ -1654,42 +1703,23 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         LASSERT(res_lvb != NULL);
         *reply_lvb = *res_lvb;
 
-        list_for_each(tmp, &res->lr_granted) {
-                struct ldlm_lock *tmplock =
-                        list_entry(tmp, struct ldlm_lock, l_res_link);
-
-                if (tmplock->l_granted_mode == LCK_PR)
-                        continue;
-                /*
-                 * ->ns_lock guarantees that no new locks are granted, and,
-                 * therefore, that res->lr_lvb_data cannot increase beyond the
-                 * end of already granted lock. As a result, it is safe to
-                 * check against "stale" reply_lvb->lvb_size value without
-                 * res->lr_lvb_sem.
-                 */
-                if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
-                        continue;
-
-                /* Don't send glimpse ASTs to liblustre clients.  They aren't
-                 * listening for them, and they do entirely synchronous I/O
-                 * anyways. */
-                if (tmplock->l_export == NULL ||
-                    tmplock->l_export->exp_libclient == 1) {
-                        only_liblustre = 1;
-                        continue;
-                }
-
-                if (l == NULL) {
-                        l = LDLM_LOCK_GET(tmplock);
-                        continue;
-                }
-
-                if (l->l_policy_data.l_extent.start >
-                    tmplock->l_policy_data.l_extent.start)
+        /*
+         * ->ns_lock guarantees that no new locks are granted, and,
+         * therefore, that res->lr_lvb_data cannot increase beyond the
+         * end of already granted lock. As a result, it is safe to
+         * check against "stale" reply_lvb->lvb_size value without
+         * res->lr_lvb_sem.
+         */
+        arg.size = reply_lvb->lvb_size;
+        arg.victim = &l;
+        arg.liblustre = &only_liblustre;
+        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+                tree = &res->lr_itree[idx];
+                if (tree->lit_mode == LCK_PR)
                         continue;
 
-                LDLM_LOCK_PUT(l);
-                l = LDLM_LOCK_GET(tmplock);
+                interval_iterate_reverse(tree->lit_root, 
+                                         filter_intent_cb, &arg);
         }
         unlock_res(res);
 
@@ -1706,8 +1736,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                          *
                          * Of course, this will all disappear when we switch to
                          * taking liblustre locks on the OST. */
-                        if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
-                                ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+                        ldlm_res_lvbo_update(res, NULL, 0, 1);
                 }
                 RETURN(ELDLM_LOCK_ABORTED);
         }
@@ -1733,8 +1762,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
          * XXX nikita: situation when ldlm_server_glimpse_ast() failed before
          * sending ast is not handled. This can result in lost client writes.
          */
-        if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
-                ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+        if (rc != 0)
+                ldlm_res_lvbo_update(res, NULL, 0, 1);
 
         lock_res(res);
         *reply_lvb = *res_lvb;
@@ -1934,7 +1963,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
 
         spin_lock_init(&filter->fo_translock);
         spin_lock_init(&filter->fo_objidlock);
-        INIT_LIST_HEAD(&filter->fo_export_list);
+        CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         sema_init(&filter->fo_alloc_lock, 1);
         init_brw_stats(&filter->fo_filter_stats);
         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
@@ -1944,6 +1973,9 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         INIT_LIST_HEAD(&filter->fo_llog_list);
         spin_lock_init(&filter->fo_llog_list_lock);
 
+        filter->fo_sptlrpc_lock = RW_LOCK_UNLOCKED;
+        sptlrpc_rule_set_init(&filter->fo_sptlrpc_rset);
+
         filter->fo_fl_oss_capa = 0;
         INIT_LIST_HEAD(&filter->fo_capa_keys);
         filter->fo_capa_hash = init_capa_hash();
@@ -1951,7 +1983,8 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
                 GOTO(err_ops, rc = -ENOMEM);
 
         sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
-        obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+        obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
+                                                LDLM_NAMESPACE_GREEDY);
         if (obd->obd_namespace == NULL)
                 GOTO(err_post, rc = -ENOMEM);
         obd->obd_namespace->ns_lvbp = obd;
@@ -1961,7 +1994,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        rc = llog_cat_initialize(obd, NULL, 1, NULL);
+        rc = llog_cat_initialize(obd, &obd->obd_olg, 1, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
@@ -2019,7 +2052,8 @@ err_mntput:
 static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
 {
         struct lprocfs_static_vars lvars;
-        unsigned long page;
+        unsigned long addr;
+        struct page *page;
         int rc;
 
         CLASSERT(offsetof(struct obd_device, u.obt) ==
@@ -2029,13 +2063,15 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                 RETURN(-EINVAL);
 
         /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */
-        page = get_zeroed_page(GFP_KERNEL);
+        OBD_PAGE_ALLOC(page, CFS_ALLOC_STD);
         if (!page)
                 RETURN(-ENOMEM);
+        addr = (unsigned long)cfs_page_address(page);
+        clear_page((void *)addr);
 
         /* lprocfs must be setup before the filter so state can be safely added
          * to /proc incrementally as the filter is setup */
-        lprocfs_init_vars(filter, &lvars);
+        lprocfs_filter_init_vars(&lvars);
         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
             lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) {
                 /* Init obdfilter private stats here */
@@ -2051,10 +2087,10 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                                                    obd->obd_proc_entry);
         }
 
-        memcpy((void *)page, lustre_cfg_buf(lcfg, 4),
+        memcpy((void *)addr, lustre_cfg_buf(lcfg, 4),
                LUSTRE_CFG_BUFLEN(lcfg, 4));
-        rc = filter_common_setup(obd, lcfg, (void *)page);
-        free_page(page);
+        rc = filter_common_setup(obd, lcfg, (void *)addr);
+        OBD_PAGE_FREE(page);
 
         if (rc) {
                 lprocfs_obd_cleanup(obd);
@@ -2071,67 +2107,81 @@ static struct llog_operations filter_size_orig_logops = {
         lop_add: llog_obd_origin_add
 };
 
-static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int filter_llog_init(struct obd_device *obd, int group,
                             struct obd_device *tgt, int count,
                             struct llog_catid *catid,
                             struct obd_uuid *uuid)
 {
+        struct filter_obd *filter = &obd->u.filter;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
+        olg = filter_find_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+
+        if (group == OBD_LLOG_GROUP) {
+                LASSERT(filter->fo_lcm == NULL);
+                OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+                if (!filter->fo_lcm)
+                        RETURN(-ENOMEM);
+
+                rc = llog_init_commit_master((struct llog_commit_master *)
+                                             filter->fo_lcm);
+                if (rc)
+                        GOTO(cleanup, rc);
+
         filter_mds_ost_repl_logops = llog_client_ops;
         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
-
-        rc = llog_setup(obd, llogs, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+        } else {
+                LASSERT(filter->fo_lcm != NULL);
+        }
+        rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
                         &filter_mds_ost_repl_logops);
         if (rc)
-                RETURN(rc);
+                GOTO(cleanup, rc);
 
         /* FIXME - assign unlink_cb for filter's recovery */
-        if (!llogs)
-                ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        else
-                ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+        LASSERT(olg);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
 
         LASSERT(ctxt != NULL);
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+        ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        rc = llog_start_commit_thread(ctxt->loc_lcm);
+        llog_ctxt_put(ctxt);
+        if (rc)
+                GOTO(cleanup, rc);
 
-        rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
                         &filter_size_orig_logops);
-        RETURN(rc);
-}
-
-static int filter_group_llog_cleanup(struct llog_ctxt *ctxt)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (CTXTP(ctxt, cleanup))
-                rc = CTXTP(ctxt, cleanup)(ctxt);
-
-        if (ctxt->loc_exp)
-                class_export_put(ctxt->loc_exp);
-        OBD_FREE(ctxt, sizeof(*ctxt));
 
+cleanup:
+        if (rc) {
+                llog_cleanup_commit_master(filter->fo_lcm, 0);
+                OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master));
+                filter->fo_lcm = NULL;
+        }
         RETURN(rc);
 }
 
-static int filter_group_llog_finish(struct obd_llogs *llogs)
+static int filter_group_llog_finish(struct obd_llog_group *olg)
 {
         struct llog_ctxt *ctxt;
         int rc = 0, rc2 = 0;
         ENTRY;
 
-        ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
         if (ctxt)
-                rc = filter_group_llog_cleanup(ctxt);
+                rc = llog_cleanup(ctxt);
 
-        ctxt = llog_get_context_from_llogs(llogs, LLOG_SIZE_ORIG_CTXT);
+        ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
         if (ctxt)
-                rc2 = filter_group_llog_cleanup(ctxt);
+                rc2 = llog_cleanup(ctxt);
         if (!rc)
                 rc = rc2;
 
@@ -2140,89 +2190,66 @@ static int filter_group_llog_finish(struct obd_llogs *llogs)
 
 static int filter_llog_finish(struct obd_device *obd, int count)
 {
-        struct llog_ctxt *ctxt;
-        int rc = 0, rc2 = 0;
+        int rc;
         ENTRY;
 
-        ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        if (ctxt)
-                rc = llog_cleanup(ctxt);
-
-        ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT);
-        if (ctxt)
-                rc2 = llog_cleanup(ctxt);
-        if (!rc)
-                rc = rc2;
+        if (obd->u.filter.fo_lcm) { 
+                llog_cleanup_commit_master((struct llog_commit_master *)
+                                           obd->u.filter.fo_lcm, 0);
+                OBD_FREE(obd->u.filter.fo_lcm, 
+                         sizeof(struct llog_commit_master));
+                obd->u.filter.fo_lcm = NULL;
+        }
+        /* finish obd llog group */
+        rc = filter_group_llog_finish(&obd->obd_olg);
 
         RETURN(rc);
 }
 
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
-                                             struct obd_export *export)
+struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
 {
-        struct filter_group_llog *fglog, *nlog;
+        struct obd_llog_group *olg, *nolg;
         struct filter_obd *filter;
-        struct llog_ctxt *ctxt;
-        struct list_head *cur;
         int rc;
 
         filter = &obd->u.filter;
 
+        if (group == OBD_LLOG_GROUP)
+                RETURN(&obd->obd_olg);
+
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each(cur, &filter->fo_llog_list) {
-                fglog = list_entry(cur, struct filter_group_llog, list);
-                if (fglog->group == group) {
-                        if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
-                                CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
-                                      obd->obd_name, group, fglog->exp, export);
+        list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                if (olg->olg_group == group) {
                         spin_unlock(&filter->fo_llog_list_lock);
-                        goto init;
+                        RETURN(olg);
                 }
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
-        if (export == NULL)
-                RETURN(NULL);
-
-        OBD_ALLOC_PTR(fglog);
-        if (fglog == NULL)
-                RETURN(NULL);
-        fglog->group = group;
-
-        OBD_ALLOC_PTR(fglog->llogs);
-        if (fglog->llogs == NULL) {
-                OBD_FREE_PTR(fglog);
-                RETURN(NULL);
-        }
+        OBD_ALLOC_PTR(olg);
+        if (olg == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
 
+        llog_group_init(olg, group);
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each(cur, &filter->fo_llog_list) {
-                nlog = list_entry(cur, struct filter_group_llog, list);
-                LASSERT(nlog->group != group);
+        list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) {
+                LASSERT(nolg->olg_group != group);
         }
-        list_add(&fglog->list, &filter->fo_llog_list);
+        list_add(&olg->olg_list, &filter->fo_llog_list);
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = llog_cat_initialize(obd, fglog->llogs, 1, NULL);
+        rc = llog_cat_initialize(obd, olg, 1, NULL);
         if (rc) {
-                OBD_FREE_PTR(fglog->llogs);
-                OBD_FREE_PTR(fglog);
-                RETURN(NULL);
-        }
-
-init:
-        if (export) {
-                fglog->exp = export;
-                ctxt = llog_get_context_from_llogs(fglog->llogs,
-                                               LLOG_MDS_OST_REPL_CTXT);
-                LASSERT(ctxt != NULL);
-
-                llog_receptor_accept(ctxt, export->exp_imp_reverse);
+                spin_lock(&filter->fo_llog_list_lock);
+                list_del(&olg->olg_list);
+                spin_unlock(&filter->fo_llog_list_lock);
+                OBD_FREE_PTR(olg);
+                RETURN(ERR_PTR(rc));
         }
-        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
-               obd->obd_name, fglog->llogs, group);
+        CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n",
+               obd->obd_name, group, olg);
 
-        RETURN(fglog->llogs);
+        RETURN(olg);
 }
 
 static int filter_llog_connect(struct obd_export *exp,
@@ -2230,7 +2257,7 @@ static int filter_llog_connect(struct obd_export *exp,
 {
         struct obd_device *obd = exp->exp_obd;
         struct llog_ctxt *ctxt;
-        struct obd_llogs *llog;
+        struct obd_llog_group *olg;
         int rc;
         ENTRY;
 
@@ -2239,13 +2266,17 @@ static int filter_llog_connect(struct obd_export *exp,
                 (unsigned) body->lgdc_logid.lgl_oid,
                 (unsigned) body->lgdc_logid.lgl_ogen);
 
-        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
-        LASSERT(llog != NULL);
-        ctxt = llog_get_context_from_llogs(llog, body->lgdc_ctxt_idx);
+        olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
         LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n",
                  body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
                           &body->lgdc_gen, NULL);
+        llog_ctxt_put(ctxt);
         if (rc != 0)
                 CERROR("failed to connect rc %d idx %d\n", rc,
                                 body->lgdc_ctxt_idx);
@@ -2255,33 +2286,32 @@ static int filter_llog_connect(struct obd_export *exp,
 
 static int filter_llog_preclean (struct obd_device *obd)
 {
-        struct filter_group_llog *log;
+        struct obd_llog_group *olg;
         struct filter_obd *filter;
         int rc = 0;
         ENTRY;
 
+        rc = obd_llog_finish(obd, 0);
+        if (rc)
+                CERROR("failed to cleanup llogging subsystem\n");
+
         filter = &obd->u.filter;
         spin_lock(&filter->fo_llog_list_lock);
         while (!list_empty(&filter->fo_llog_list)) {
-                log = list_entry(filter->fo_llog_list.next,
-                                 struct filter_group_llog, list);
-                list_del(&log->list);
+                olg = list_entry(filter->fo_llog_list.next,
+                                 struct obd_llog_group, olg_list);
+                list_del(&olg->olg_list);
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                rc = filter_group_llog_finish(log->llogs);
+                rc = filter_group_llog_finish(olg);
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
-                                log->group);
-                OBD_FREE_PTR(log->llogs);
-                OBD_FREE_PTR(log);
+                               olg->olg_group);
+                OBD_FREE_PTR(olg);
                 spin_lock(&filter->fo_llog_list_lock);
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = obd_llog_finish(obd, 0);
-        if (rc)
-                CERROR("failed to cleanup llogging subsystem\n");
-
         RETURN(rc);
 }
 
@@ -2296,9 +2326,9 @@ static int filter_precleanup(struct obd_device *obd,
                 break;
         case OBD_CLEANUP_EXPORTS:
                 target_cleanup_recovery(obd);
+                rc = filter_llog_preclean(obd);
                 break;
         case OBD_CLEANUP_SELF_EXP:
-                rc = filter_llog_preclean(obd);
                 break;
         case OBD_CLEANUP_OBD:
                 break;
@@ -2334,6 +2364,8 @@ static int filter_cleanup(struct obd_device *obd)
 
         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
 
+        sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset);
+
         if (obd->u.obt.obt_sb == NULL)
                 RETURN(0);
 
@@ -2371,6 +2403,14 @@ static int filter_connect_internal(struct obd_export *exp,
         exp->exp_connect_flags = data->ocd_connect_flags;
         data->ocd_version = LUSTRE_VERSION_CODE;
 
+        if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
+                CWARN("%s: OST requires FID support (flag="LPX64
+                      "), but client not\n",
+                      exp->exp_obd->obd_name,
+                      exp->exp_connect_flags);
+                RETURN(-EBADF);
+        }
+
         if (exp->exp_connect_flags & OBD_CONNECT_GRANT) {
                 struct filter_export_data *fed = &exp->exp_filter_data;
                 obd_size left, want;
@@ -2419,6 +2459,30 @@ static int filter_connect_internal(struct obd_export *exp,
                 LASSERT(data->ocd_brw_size);
         }
 
+        if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) {
+                __u32 cksum_types = data->ocd_cksum_types;
+
+                /* The client set in ocd_cksum_types the checksum types it
+                 * supports. We have to mask off the algorithms that we don't
+                 * support */
+                if (cksum_types & OBD_CKSUM_ALL)
+                        data->ocd_cksum_types &= OBD_CKSUM_ALL;
+                else
+                        data->ocd_cksum_types = OBD_CKSUM_CRC32;
+
+                CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
+                                   "%x\n", exp->exp_obd->obd_name,
+                                   obd_export_nid2str(exp), cksum_types,
+                                   data->ocd_cksum_types);
+        } else {
+                /* This client does not support OBD_CONNECT_CKSUM
+                 * fall back to CRC32 */
+                CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
+                                   "OBD_CONNECT_CKSUM, CRC32 will be used\n",
+                                   exp->exp_obd->obd_name,
+                                   obd_export_nid2str(exp));
+        }
+
         /* FIXME: Do the same with the MDS UUID and fsd_peeruuid.
          * FIXME: We don't strictly need the COMPAT flag for that,
          * FIXME: as fsd_peeruuid[0] will tell us if that is set.
@@ -2427,7 +2491,8 @@ static int filter_connect_internal(struct obd_export *exp,
         RETURN(0);
 }
 
-static int filter_reconnect(struct obd_export *exp, struct obd_device *obd,
+static int filter_reconnect(const struct lu_env *env,
+                            struct obd_export *exp, struct obd_device *obd,
                             struct obd_uuid *cluuid,
                             struct obd_connect_data *data)
 {
@@ -2646,6 +2711,10 @@ static int filter_destroy_export(struct obd_export *exp)
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
                        exp, exp->exp_filter_data.fed_pending);
 
+        /* Not ported yet the b1_6 quota functionality
+         * lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
+         */
+
         target_destroy_export(exp);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
@@ -2669,7 +2738,7 @@ static int filter_destroy_export(struct obd_export *exp)
 
 static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
 {
-        struct filter_group_llog *fglog, *nlog;
+        struct obd_llog_group *olg_min, *olg;
         struct filter_obd *filter;
         int worked = 0, group;
         struct llog_ctxt *ctxt;
@@ -2682,35 +2751,41 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
          * group order and skip already synced llogs -bzzz */
         do {
                 /* look for group with min. number, but > worked */
-                fglog = NULL;
+                olg_min = NULL;
                 group = 1 << 30;
                 spin_lock(&filter->fo_llog_list_lock);
-                list_for_each_entry(nlog, &filter->fo_llog_list, list) {
-                        if (nlog->group <= worked) {
+                list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                        if (olg->olg_group <= worked) {
                                 /* this group is already synced */
                                 continue;
                         }
-                        if (group < nlog->group) {
+                        if (group < olg->olg_group) {
                                 /* we have group with smaller number to sync */
                                 continue;
                         }
                         /* store current minimal group */
-                        fglog = nlog;
-                        group = nlog->group;
+                        olg_min = olg;
+                        group = olg->olg_group;
                 }
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                if (fglog == NULL)
+                if (olg_min == NULL)
                         break;
 
-                worked = fglog->group;
-                if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
-                        ctxt = llog_get_context_from_llogs(fglog->llogs,
+                worked = olg_min->olg_group;
+                if (olg_min->olg_exp &&
+                    (dexp == olg_min->olg_exp || dexp == NULL)) {
+                        int err;
+                        ctxt = llog_group_get_ctxt(olg_min,
                                                 LLOG_MDS_OST_REPL_CTXT);
                         LASSERT(ctxt != NULL);
-                        llog_sync(ctxt, fglog->exp);
+                        err = llog_sync(ctxt, olg_min->olg_exp);
+                        llog_ctxt_put(ctxt);
+                        if (err)
+                                CERROR("error flushing logs to MDS: rc %d\n",
+                                       err);                        
                 }
-        } while (fglog != NULL);
+        } while (olg_min != NULL);
 }
 
 /* also incredibly similar to mds_disconnect */
@@ -2773,8 +2848,8 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
         dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
 
         if (IS_ERR(dchild)) {
-                CERROR("%s error looking up object: "LPU64"\n",
-                       what, oa->o_id);
+                CERROR("%s error looking up object: "LPU64":"LPU64"\n",
+                       what, group, oa->o_id);
                 RETURN(dchild);
         }
 
@@ -3007,7 +3082,6 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
 {
         struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id, 0,
                                                 oinfo->oi_oa->o_gr, 0 } };
-        struct ldlm_valblock_ops *ns_lvbo;
         struct filter_mod_data *fmd;
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
@@ -3047,9 +3121,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
                                 &res_id, LDLM_EXTENT, 0);
 
         if (res != NULL) {
-                ns_lvbo = res->lr_namespace->ns_lvbo;
-                if (ns_lvbo && ns_lvbo->lvbo_update)
-                        rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
+                rc = ldlm_res_lvbo_update(res, NULL, 0, 0);
                 ldlm_resource_putref(res);
         }
 
@@ -3236,7 +3308,7 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa,
                         diff = 1;
                 else
                         diff = oa->o_id - filter_last_id(filter, group);
-                CDEBUG(D_HA, "filter_last_id() = "LPU64" -> diff = %d\n",
+                CDEBUG(D_RPCTRACE, "filter_last_id() = "LPU64" -> diff = %d\n",
                        filter_last_id(filter, group), diff);
 
                 LASSERTF(diff >= 0,"%s: "LPU64" - "LPU64" = %d\n",obd->obd_name,
@@ -3259,7 +3331,7 @@ out:
 }
 
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                         __u64 max_age)
+                         __u64 max_age, __u32 flags)
 {
         struct filter_obd *filter = &obd->u.filter;
         int blockbits = obd->u.obt.obt_sb->s_blocksize_bits;
@@ -3294,6 +3366,31 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         RETURN(rc);
 }
 
+static int filter_use_existing_obj(struct obd_device *obd,
+                                   struct dentry *dchild, void **handle,
+                                   int *cleanup_phase)
+{
+        struct inode *inode = dchild->d_inode;
+        struct iattr iattr;
+        int rc;
+
+        if ((inode->i_mode & (S_ISUID | S_ISGID)) == (S_ISUID|S_ISGID))
+                return 0;
+
+        *handle = fsfilt_start_log(obd, inode, FSFILT_OP_SETATTR, NULL, 1);
+        if (IS_ERR(*handle))
+                return PTR_ERR(*handle);
+
+        iattr.ia_valid = ATTR_MODE;
+        iattr.ia_mode = S_ISUID | S_ISGID |0666;
+        rc = fsfilt_setattr(obd, dchild, *handle, &iattr, 1);
+        if (rc == 0)
+                *cleanup_phase = 3;
+
+        return rc;
+}
+
+
 /* We rely on the fact that only one thread will be creating files in a given
  * group at a time, which is why we don't need an atomic filter_get_new_id.
  * Even if we had that atomic function, the following race would exist:
@@ -3330,10 +3427,10 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 OBD_ALLOC(osfs, sizeof(*osfs));
                 if (osfs == NULL)
                         RETURN(-ENOMEM);
-                rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ);
+                rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ, 0);
                 if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
-                        CDEBUG(D_HA,"%s: not enough space for create "LPU64"\n",
-                               obd->obd_name, osfs->os_bavail <<
+                        CDEBUG(D_RPCTRACE,"%s: not enough space for create "
+                               LPU64"\n", obd->obd_name, osfs->os_bavail <<
                                filter->fo_vfsmnt->mnt_sb->s_blocksize_bits);
                         *num = 0;
                         rc = -ENOSPC;
@@ -3343,8 +3440,8 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                         RETURN(rc);
         }
 
-        CDEBUG(D_HA, "%s: precreating %d objects in group "LPU64" at "LPU64"\n",
-               obd->obd_name, *num, group, oa->o_id);
+        CDEBUG(D_RPCTRACE, "%s: precreating %d objects in group "LPU64
+               " at "LPU64"\n", obd->obd_name, *num, group, oa->o_id);
 
         for (i = 0; i < *num && err == 0; i++) {
                 int cleanup_phase = 0;
@@ -3383,14 +3480,24 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
 
                 if (dchild->d_inode != NULL) {
                         /* This would only happen if lastobjid was bad on disk*/
-                        /* Could also happen if recreating missing obj but
-                         * already exists
-                         */
+                        /* Could also happen if recreating missing obj but it
+                         * already exists. */
                         if (recreate_obj) {
                                 CERROR("%s: recreating existing object %.*s?\n",
                                        obd->obd_name, dchild->d_name.len,
                                        dchild->d_name.name);
                         } else {
+                                /* Use these existing objects if they are
+                                 * zero length. */
+                                if (dchild->d_inode->i_size == 0) {
+                                        rc = filter_use_existing_obj(obd,dchild,
+                                                      &handle, &cleanup_phase);
+                                        if (rc == 0)
+                                                goto set_last_id;
+                                        else
+                                                GOTO(cleanup, rc);
+                                }
+
                                 CERROR("%s: Serious error: objid %.*s already "
                                        "exists; is this filesystem corrupt?\n",
                                        obd->obd_name, dchild->d_name.len,
@@ -3416,6 +3523,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                         GOTO(cleanup, rc);
                 }
 
+set_last_id:
                 if (!recreate_obj) {
                         filter_set_last_id(filter, next_id, group);
                         err = filter_update_last_objid(obd, group, 0);
@@ -3444,15 +3552,17 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 if (rc)
                         break;
                 if (time_after(jiffies, enough_time)) {
-                        CDEBUG(D_HA, "%s: precreate slow - want %d got %d \n",
+                        CDEBUG(D_RPCTRACE,
+                               "%s: precreate slow - want %d got %d \n",
                                obd->obd_name, *num, i);
                         break;
                 }
         }
         *num = i;
 
-        CDEBUG(D_HA, "%s: created %d objects for group "LPU64": "LPU64"\n",
-               obd->obd_name, i, group, filter->fo_last_objids[group]);
+        CDEBUG(D_RPCTRACE,
+               "%s: created %d objects for group "LPU64": "LPU64" rc %d\n",
+               obd->obd_name, i, group, filter->fo_last_objids[group], rc);
 
         RETURN(rc);
 }
@@ -3565,9 +3675,17 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                        oa->o_id);
                 /* If object already gone, cancel cookie right now */
                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                        struct llog_ctxt *ctxt;
+                        struct obd_llog_group *olg;
                         fcc = obdo_logcookie(oa);
-                        llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1),
-                                    NULL, 1, fcc, 0);
+                        olg = filter_find_olg(obd, oa->o_gr);
+                        if (IS_ERR(olg))
+                                GOTO(cleanup, rc = PTR_ERR(olg));
+                        llog_group_set_export(olg, exp);
+
+                        ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
+                        llog_cancel(ctxt, NULL, 1, fcc, 0);
+                        llog_ctxt_put(ctxt);
                         fcc = NULL; /* we didn't allocate fcc, don't free it */
                 }
                 GOTO(cleanup, rc = -ENOENT);
@@ -3680,8 +3798,8 @@ cleanup:
         rc2 = lquota_adjust(filter_quota_interface_ref, obd, qcids, NULL, rc,
                             FSFILT_OP_UNLINK);
 
-        CDEBUG(rc ? D_ERROR : D_QUOTA,
-               "filter adjust qunit! (rc:%d)\n", rc? rc : rc2);
+        if (rc2)
+                CDEBUG(D_QUOTA, "filter adjust qunit! (rc:%d)\n", rc2);
         return rc;
 }
 
@@ -3720,7 +3838,6 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
-        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
@@ -3735,8 +3852,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
                 /* flush any remaining cancel messages out to the target */
-                ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT);
-                llog_sync(ctxt, exp);
+                filter_sync_llogs(exp->exp_obd, exp);
                 RETURN(rc);
         }
 
@@ -3783,28 +3899,41 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen,
                 RETURN(-EINVAL);
         }
 
-        if (keylen == strlen("blocksize") &&
-            memcmp(key, "blocksize", keylen) == 0) {
+        if (KEY_IS("blocksize")) {
                 __u32 *blocksize = val;
+                if (blocksize) {
+                        if (*vallen < sizeof(*blocksize))
+                                RETURN(-EOVERFLOW);
+                        *blocksize = obd->u.obt.obt_sb->s_blocksize;
+                }
                 *vallen = sizeof(*blocksize);
-                *blocksize = obd->u.obt.obt_sb->s_blocksize;
                 RETURN(0);
         }
 
-        if (keylen == strlen("blocksize_bits") &&
-            memcmp(key, "blocksize_bits", keylen) == 0) {
+        if (KEY_IS("blocksize_bits")) {
                 __u32 *blocksize_bits = val;
+                if (blocksize_bits) {
+                        if (*vallen < sizeof(*blocksize_bits))
+                                RETURN(-EOVERFLOW);
+                        *blocksize_bits = obd->u.obt.obt_sb->s_blocksize_bits;
+                }
                 *vallen = sizeof(*blocksize_bits);
-                *blocksize_bits = obd->u.obt.obt_sb->s_blocksize_bits;
                 RETURN(0);
         }
 
-        if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
+        if (KEY_IS("last_id")) {
                 obd_id *last_id = val;
                 /* FIXME: object groups */
-                *last_id = filter_last_id(&obd->u.filter, 0);
+                if (last_id) {
+                        if (*vallen < sizeof(*last_id))
+                                RETURN(-EOVERFLOW);
+                        *last_id = filter_last_id(&obd->u.filter,
+                                                  exp->exp_filter_data.fed_group);
+                }
+                *vallen = sizeof(*last_id);
                 RETURN(0);
         }
+
         CDEBUG(D_IOCTL, "invalid key\n");
         RETURN(-EINVAL);
 }
@@ -3814,7 +3943,7 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                                  struct ptlrpc_request_set *set)
 {
         struct obd_device *obd;
-        struct obd_llogs *llog;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc = 0, group;
         ENTRY;
@@ -3837,8 +3966,7 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                 RETURN(0);
         }
 
-        if (keylen < strlen(KEY_MDS_CONN) ||
-            memcmp(key, KEY_MDS_CONN, keylen) != 0)
+        if (!KEY_IS(KEY_MDS_CONN))
                 RETURN(-EINVAL);
 
         LCONSOLE_WARN("%s: received MDS connection from %s\n", obd->obd_name,
@@ -3850,12 +3978,16 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
         group = (int)(*(__u32 *)val);
         LASSERT(group >= FILTER_GROUP_MDS0);
 
-        llog = filter_grab_llog_for_group(obd, group, exp);
-        LASSERT(llog != NULL);
-        ctxt = llog_get_context_from_llogs(llog, LLOG_MDS_OST_REPL_CTXT);
-        LASSERTF(ctxt != NULL, "ctxt is not null\n"),
+        olg = filter_find_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERTF(ctxt != NULL, "ctxt is null\n"),
 
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        llog_ctxt_put(ctxt);
 
         lquota_setinfo(filter_quota_interface_ref, exp, obd);
 
@@ -3877,7 +4009,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
         }
 
         case OBD_IOC_SYNC: {
-                CDEBUG(D_HA, "syncing ost %s\n", obd->obd_name);
+                CDEBUG(D_RPCTRACE, "syncing ost %s\n", obd->obd_name);
                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
                 RETURN(rc);
         }
@@ -3964,9 +4096,43 @@ static int filter_process_config(struct obd_device *obd, obd_count len,
         struct lprocfs_static_vars lvars;
         int rc = 0;
 
-        lprocfs_init_vars(filter, &lvars);
+        switch (lcfg->lcfg_command) {
+        case LCFG_SPTLRPC_CONF: {
+                struct filter_obd       *filter = &obd->u.filter;
+                struct sptlrpc_conf_log *log;
+                struct sptlrpc_rule_set  tmp_rset;
+
+                log = sptlrpc_conf_log_extract(lcfg);
+                if (IS_ERR(log)) {
+                        rc = PTR_ERR(log);
+                        break;
+                }
+
+                sptlrpc_rule_set_init(&tmp_rset);
+
+                rc = sptlrpc_rule_set_from_log(&tmp_rset, log);
+                if (rc) {
+                        CERROR("obd %s: failed get sptlrpc rules: %d\n",
+                               obd->obd_name, rc);
+                        break;
+                }
+
+                write_lock(&filter->fo_sptlrpc_lock);
+                sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset);
+                filter->fo_sptlrpc_rset = tmp_rset;
+                write_unlock(&filter->fo_sptlrpc_lock);
+
+                sptlrpc_target_update_exp_flavor(obd, &tmp_rset);
+                break;
+        }
+        default:
+                lprocfs_filter_init_vars(&lvars);
+
+                rc = class_process_proc_param(PARAM_OST, lvars.obd_vars,
+                                              lcfg, obd);
+                break;
+        }
 
-        rc = class_process_proc_param(PARAM_OST, lvars.obd_vars, lcfg, obd);
         return rc;
 }
 
@@ -4014,7 +4180,7 @@ static int __init obdfilter_init(void)
         struct lprocfs_static_vars lvars;
         int rc;
 
-        lprocfs_init_vars(filter, &lvars);
+        lprocfs_filter_init_vars(&lvars);
 
         request_module("lquota");
         OBD_ALLOC(obdfilter_created_scratchpad,
@@ -4024,8 +4190,8 @@ static int __init obdfilter_init(void)
                 return -ENOMEM;
 
         ll_fmd_cachep = cfs_mem_cache_create("ll_fmd_cache",
-                                         sizeof(struct filter_mod_data),
-                                         0, 0);
+                                             sizeof(struct filter_mod_data),
+                                             0, 0);
         if (!ll_fmd_cachep)
                 GOTO(out, rc = -ENOMEM);