Whamcloud - gitweb
Use a special macro for printing time_t; clean up includes.
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index bcdd5f5..f00b738 100644 (file)
@@ -47,6 +47,7 @@
 #include <linux/mount.h>
 #include <linux/buffer_head.h>
 
+#include <obd_cksum.h>
 #include <obd_class.h>
 #include <obd_lov.h>
 #include <lustre_dlm.h>
@@ -63,7 +64,7 @@
 #include "filter_internal.h"
 
 /* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS 3
+#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0
 static struct lvfs_callback_ops filter_lvfs_ops;
 cfs_mem_cache_t *ll_fmd_cachep;
 
@@ -82,7 +83,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
         struct filter_client_data *fcd = fed->fed_fcd;
         __u64 last_rcvd;
         loff_t off;
-        int err, log_pri = D_HA;
+        int err, log_pri = D_RPCTRACE;
 
         /* Propagate error code. */
         if (rc)
@@ -159,14 +160,34 @@ static void init_brw_stats(struct brw_stats *brw_stats)
                 spin_lock_init(&brw_stats->hist[i].oh_lock);
 }
 
+static int lprocfs_init_rw_stats(struct obd_device *obd,
+                                 struct lprocfs_stats **stats)
+{       /* Allocate a stats block, store it in *stats and initialise its
+         * counters.  Returns 0 on success, or -ENOMEM (with *stats left
+         * NULL) if lprocfs_alloc_stats() fails. */
+        int num_stats;  /* slots: one per pointer-sized entry of the device
+                         * type's typ_dt_ops table, plus
+                         * LPROC_FILTER_LAST - 1 more */
+
+        num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) +
+                                                        LPROC_FILTER_LAST - 1;
+        *stats = lprocfs_alloc_stats(num_stats, 0); /* flags argument is 0 */
+        if (*stats == NULL)
+                return -ENOMEM;
+
+        lprocfs_init_ops_stats(LPROC_FILTER_LAST, *stats); /* presumably sets
+                         * up the per-operation counters that follow the
+                         * first LPROC_FILTER_LAST slots - TODO confirm.
+                         * The two calls below add the read and write byte
+                         * counters, both with LPROCFS_CNTR_AVGMINMAX. */
+        lprocfs_counter_init(*stats, LPROC_FILTER_READ_BYTES,
+                             LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
+        lprocfs_counter_init(*stats, LPROC_FILTER_WRITE_BYTES,
+                             LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
+
+        return(0);
+}
+
 /* brw_stats are 2128, ops are 3916, ldlm are 204, so 6248 bytes per client,
    plus the procfs overhead :( */
 static int filter_export_stats_init(struct obd_device *obd,
-                                    struct obd_export *exp)
+                                    struct obd_export *exp,
+                                    void *client_nid)
 {
         struct filter_export_data *fed = &exp->exp_filter_data;
-        struct proc_dir_entry *brw_entry;
-        int rc, num_stats;
+        int rc, newnid = 0;
         ENTRY;
 
         init_brw_stats(&fed->fed_brw_stats);
@@ -175,30 +196,35 @@ static int filter_export_stats_init(struct obd_device *obd,
                 /* Self-export gets no proc entry */
                 RETURN(0);
 
-        rc = lprocfs_exp_setup(exp);
+        rc = lprocfs_exp_setup(exp, client_nid, &newnid);
         if (rc)
                 RETURN(rc);
 
-        /* Create a per export proc entry for brw_stats */
-        brw_entry = create_proc_entry("brw_stats", 0644, exp->exp_proc);
-        if (brw_entry == NULL)
-               RETURN(-ENOMEM);
-        brw_entry->proc_fops = &filter_per_export_stats_fops;
-        brw_entry->data = fed;
+        if (newnid) {
+                struct nid_stat *tmp = exp->exp_nid_stats;
+                LASSERT(tmp != NULL);
+
+                OBD_ALLOC(tmp->nid_brw_stats, sizeof(struct brw_stats));
+                if (tmp->nid_brw_stats == NULL)
+                        RETURN(-ENOMEM);
+
+                init_brw_stats(tmp->nid_brw_stats);
+                rc = lprocfs_seq_create(exp->exp_nid_stats->nid_proc, "brw_stats",
+                                        0644, &filter_per_nid_stats_fops,
+                                        exp->exp_nid_stats);
+                if (rc)
+                        CWARN("Error adding the brw_stats file\n");
+
+                rc = lprocfs_init_rw_stats(obd, &exp->exp_nid_stats->nid_stats);
+                if (rc)
+                        RETURN(rc);
+
+                rc = lprocfs_register_stats(tmp->nid_proc, "stats",
+                                            tmp->nid_stats);
+                if (rc)
+                        RETURN(rc);
+        }
 
-        /* Create a per export proc entry for ops stats */
-        num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) +
-                     LPROC_FILTER_LAST - 1;
-        exp->exp_ops_stats = lprocfs_alloc_stats(num_stats,
-                                                 LPROCFS_STATS_FLAG_NOPERCPU);
-        if (exp->exp_ops_stats == NULL)
-              RETURN(-ENOMEM);
-        lprocfs_init_ops_stats(LPROC_FILTER_LAST, exp->exp_ops_stats);
-        lprocfs_counter_init(exp->exp_ops_stats, LPROC_FILTER_READ_BYTES,
-                             LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
-        lprocfs_counter_init(exp->exp_ops_stats, LPROC_FILTER_WRITE_BYTES,
-                             LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
-        lprocfs_register_stats(exp->exp_proc, "stats", exp->exp_ops_stats);
         RETURN(0);
 }
 
@@ -548,8 +574,8 @@ static void filter_fmd_cleanup(struct obd_export *exp)
 static int filter_init_export(struct obd_export *exp)
 {
         spin_lock_init(&exp->exp_filter_data.fed_lock);
-        INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list);
-       
+        CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list);
+
         spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
@@ -771,7 +797,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                         fed = &exp->exp_filter_data;
                         fed->fed_fcd = fcd;
                         fed->fed_group = le32_to_cpu(fcd->fcd_group);
-                        filter_export_stats_init(obd, exp);
+                        filter_export_stats_init(obd, exp, NULL);
                         rc = filter_client_add(obd, exp, cl_idx);
                         /* can't fail for existing client */
                         LASSERTF(rc == 0, "rc = %d\n", rc);
@@ -1388,6 +1414,8 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
 
         if (IS_ERR(dparent))
                 return dparent;
+        if (dparent == NULL)
+                return ERR_PTR(-ENOENT);
 
         rc = filter_lock_dentry(obd, dparent);
         fsfilt_check_slow(obd, now, obd_timeout, "parent lock");
@@ -1564,20 +1592,68 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         return(rc);
 }
 
+struct filter_intent_args {     /* state handed to filter_intent_cb() */
+        struct ldlm_lock **victim;      /* in/out: candidate lock chosen so
+                                         * far, NULL while none was found */
+        __u64 size;             /* the walk stops at the first interval whose
+                                 * high end is <= size */
+        int *liblustre;         /* set to 0 by the callback once it sees a
+                                 * lock that has an export and is not a
+                                 * liblustre client */
+};
+
+static enum interval_iter filter_intent_cb(struct interval_node *n,
+                                           void *args)
+{       /* Callback handed to interval_iterate_reverse() by
+         * filter_intent_policy(); args points to a struct
+         * filter_intent_args.  Inspects the locks grouped under interval
+         * node n and keeps in *arg->victim (obtained via LDLM_LOCK_GET) the
+         * lock with the highest extent start seen so far.
+         * Returns INTERVAL_ITER_STOP when n's high end is <= arg->size,
+         * INTERVAL_ITER_CONT otherwise. */
+        struct ldlm_interval *node = (struct ldlm_interval *)n;
+        struct filter_intent_args *arg = (struct filter_intent_args*)args;
+        __u64 size = arg->size;
+        struct ldlm_lock **v = arg->victim;
+        struct ldlm_lock *lck;
+
+        /* If the interval's high end is at or below the file size supplied
+         * by the caller, stop iterating. */
+        if (interval_high(n) <= size)
+                return INTERVAL_ITER_STOP;
+
+        list_for_each_entry(lck, &node->li_group, l_sl_policy) {
+                /* Don't send glimpse ASTs to liblustre clients.
+                 * They aren't listening for them, and they do
+                 * entirely synchronous I/O anyway.  Locks without an
+                 * export are skipped in the same way. */
+                if (lck->l_export == NULL ||
+                    lck->l_export->exp_libclient == 1)
+                        continue;
+
+                if (*arg->liblustre)
+                        *arg->liblustre = 0; /* a lock not skipped above */
+
+                if (*v == NULL) {
+                        *v = LDLM_LOCK_GET(lck); /* first candidate */
+                } else if ((*v)->l_policy_data.l_extent.start <
+                           lck->l_policy_data.l_extent.start) {
+                        LDLM_LOCK_PUT(*v); /* this lock starts higher: let go
+                                            * of the old candidate and take
+                                            * this one instead */
+                        *v = LDLM_LOCK_GET(lck);
+                }
+
+                /* All locks of one policy group share the same extent, so
+                 * looking at the first eligible one is enough. */
+                break;
+        }
+
+        return INTERVAL_ITER_CONT;
+}
+
 static int filter_intent_policy(struct ldlm_namespace *ns,
                                 struct ldlm_lock **lockp, void *req_cookie,
                                 ldlm_mode_t mode, int flags, void *data)
 {
-        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+        CFS_LIST_HEAD(rpc_list);
         struct ptlrpc_request *req = req_cookie;
         struct ldlm_lock *lock = *lockp, *l = NULL;
         struct ldlm_resource *res = lock->l_resource;
         ldlm_processing_policy policy;
         struct ost_lvb *res_lvb, *reply_lvb;
         struct ldlm_reply *rep;
-        struct list_head *tmp;
         ldlm_error_t err;
-        int rc, tmpflags = 0, only_liblustre = 0;
+        int idx, rc, tmpflags = 0, only_liblustre = 1;
+        struct ldlm_interval_tree *tree;
+        struct filter_intent_args arg;
         int repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                            [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                            [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb) };
@@ -1602,7 +1678,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
 
         /* If we grant any lock at all, it will be a whole-file read lock.
          * Call the extent policy function to see if our request can be
-         * granted, or is blocked. */
+         * granted, or is blocked.
+         * An OST lock with LDLM_FL_HAS_INTENT set is a glimpse lock.
+         */
         lock->l_policy_data.l_extent.start = 0;
         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
         lock->l_req_mode = LCK_PR;
@@ -1650,42 +1728,23 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         LASSERT(res_lvb != NULL);
         *reply_lvb = *res_lvb;
 
-        list_for_each(tmp, &res->lr_granted) {
-                struct ldlm_lock *tmplock =
-                        list_entry(tmp, struct ldlm_lock, l_res_link);
-
-                if (tmplock->l_granted_mode == LCK_PR)
-                        continue;
-                /*
-                 * ->ns_lock guarantees that no new locks are granted, and,
-                 * therefore, that res->lr_lvb_data cannot increase beyond the
-                 * end of already granted lock. As a result, it is safe to
-                 * check against "stale" reply_lvb->lvb_size value without
-                 * res->lr_lvb_sem.
-                 */
-                if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
-                        continue;
-
-                /* Don't send glimpse ASTs to liblustre clients.  They aren't
-                 * listening for them, and they do entirely synchronous I/O
-                 * anyways. */
-                if (tmplock->l_export == NULL ||
-                    tmplock->l_export->exp_libclient == 1) {
-                        only_liblustre = 1;
-                        continue;
-                }
-
-                if (l == NULL) {
-                        l = LDLM_LOCK_GET(tmplock);
-                        continue;
-                }
-
-                if (l->l_policy_data.l_extent.start >
-                    tmplock->l_policy_data.l_extent.start)
+        /*
+         * ->ns_lock guarantees that no new locks are granted, and,
+         * therefore, that res->lr_lvb_data cannot increase beyond the
+         * end of already granted lock. As a result, it is safe to
+         * check against "stale" reply_lvb->lvb_size value without
+         * res->lr_lvb_sem.
+         */
+        arg.size = reply_lvb->lvb_size;
+        arg.victim = &l;
+        arg.liblustre = &only_liblustre;
+        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+                tree = &res->lr_itree[idx];
+                if (tree->lit_mode == LCK_PR)
                         continue;
 
-                LDLM_LOCK_PUT(l);
-                l = LDLM_LOCK_GET(tmplock);
+                interval_iterate_reverse(tree->lit_root, 
+                                         filter_intent_cb, &arg);
         }
         unlock_res(res);
 
@@ -1929,18 +1988,21 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
 
         spin_lock_init(&filter->fo_translock);
         spin_lock_init(&filter->fo_objidlock);
-        INIT_LIST_HEAD(&filter->fo_export_list);
+        CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         sema_init(&filter->fo_alloc_lock, 1);
         init_brw_stats(&filter->fo_filter_stats);
         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
         filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
         filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
 
-        INIT_LIST_HEAD(&filter->fo_llog_list);
+        CFS_INIT_LIST_HEAD(&filter->fo_llog_list);
         spin_lock_init(&filter->fo_llog_list_lock);
 
+        filter->fo_sptlrpc_lock = RW_LOCK_UNLOCKED;
+        sptlrpc_rule_set_init(&filter->fo_sptlrpc_rset);
+
         filter->fo_fl_oss_capa = 0;
-        INIT_LIST_HEAD(&filter->fo_capa_keys);
+        CFS_INIT_LIST_HEAD(&filter->fo_capa_keys);
         filter->fo_capa_hash = init_capa_hash();
         if (filter->fo_capa_hash == NULL)
                 GOTO(err_ops, rc = -ENOMEM);
@@ -1957,7 +2019,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        rc = llog_cat_initialize(obd, NULL, 1, NULL);
+        rc = llog_cat_initialize(obd, &obd->obd_olg, 1, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
@@ -2034,7 +2096,7 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
 
         /* lprocfs must be setup before the filter so state can be safely added
          * to /proc incrementally as the filter is setup */
-        lprocfs_init_vars(filter, &lvars);
+        lprocfs_filter_init_vars(&lvars);
         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
             lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) {
                 /* Init obdfilter private stats here */
@@ -2046,9 +2108,20 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                                      "write_bytes", "bytes");
 
                 lproc_filter_attach_seqstat(obd);
-                obd->obd_proc_exports = proc_mkdir("exports",
-                                                   obd->obd_proc_entry);
+                obd->obd_proc_exports_entry = lprocfs_register("exports",
+                                                        obd->obd_proc_entry,
+                                                        NULL, NULL);
+                if (IS_ERR(obd->obd_proc_exports_entry)) {
+                        rc = PTR_ERR(obd->obd_proc_exports_entry);
+                        CERROR("error %d setting up lprocfs for %s\n",
+                               rc, "exports");
+                        obd->obd_proc_exports_entry = NULL;
+                }
         }
+        if (obd->obd_proc_exports_entry)
+                lprocfs_add_simple(obd->obd_proc_exports_entry, "clear",
+                                   lprocfs_nid_stats_clear_read,
+                                   lprocfs_nid_stats_clear_write, obd);
 
         memcpy((void *)addr, lustre_cfg_buf(lcfg, 4),
                LUSTRE_CFG_BUFLEN(lcfg, 4));
@@ -2056,8 +2129,10 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         OBD_PAGE_FREE(page);
 
         if (rc) {
-                lprocfs_obd_cleanup(obd);
+                lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
+                lprocfs_free_per_client_stats(obd);
                 lprocfs_free_obd_stats(obd);
+                lprocfs_obd_cleanup(obd);
         }
 
         return rc;
@@ -2070,67 +2145,81 @@ static struct llog_operations filter_size_orig_logops = {
         lop_add: llog_obd_origin_add
 };
 
-static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int filter_llog_init(struct obd_device *obd, int group,
                             struct obd_device *tgt, int count,
                             struct llog_catid *catid,
                             struct obd_uuid *uuid)
 {
+        struct filter_obd *filter = &obd->u.filter;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
+        olg = filter_find_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+
+        if (group == OBD_LLOG_GROUP) {
+                LASSERT(filter->fo_lcm == NULL);
+                OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+                if (!filter->fo_lcm)
+                        RETURN(-ENOMEM);
+
+                rc = llog_init_commit_master((struct llog_commit_master *)
+                                             filter->fo_lcm);
+                if (rc)
+                        GOTO(cleanup, rc);
+
         filter_mds_ost_repl_logops = llog_client_ops;
         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
-
-        rc = llog_setup(obd, llogs, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+        } else {
+                LASSERT(filter->fo_lcm != NULL);
+        }
+        rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
                         &filter_mds_ost_repl_logops);
         if (rc)
-                RETURN(rc);
+                GOTO(cleanup, rc);
 
         /* FIXME - assign unlink_cb for filter's recovery */
-        if (!llogs)
-                ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        else
-                ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+        LASSERT(olg);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
 
         LASSERT(ctxt != NULL);
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+        ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        rc = llog_start_commit_thread(ctxt->loc_lcm);
+        llog_ctxt_put(ctxt);
+        if (rc)
+                GOTO(cleanup, rc);
 
-        rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
                         &filter_size_orig_logops);
-        RETURN(rc);
-}
-
-static int filter_group_llog_cleanup(struct llog_ctxt *ctxt)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (CTXTP(ctxt, cleanup))
-                rc = CTXTP(ctxt, cleanup)(ctxt);
-
-        if (ctxt->loc_exp)
-                class_export_put(ctxt->loc_exp);
-        OBD_FREE(ctxt, sizeof(*ctxt));
 
+cleanup:
+        if (rc) {
+                llog_cleanup_commit_master(filter->fo_lcm, 0);
+                OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master));
+                filter->fo_lcm = NULL;
+        }
         RETURN(rc);
 }
 
-static int filter_group_llog_finish(struct obd_llogs *llogs)
+static int filter_group_llog_finish(struct obd_llog_group *olg)
 {
         struct llog_ctxt *ctxt;
         int rc = 0, rc2 = 0;
         ENTRY;
 
-        ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
         if (ctxt)
-                rc = filter_group_llog_cleanup(ctxt);
+                rc = llog_cleanup(ctxt);
 
-        ctxt = llog_get_context_from_llogs(llogs, LLOG_SIZE_ORIG_CTXT);
+        ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
         if (ctxt)
-                rc2 = filter_group_llog_cleanup(ctxt);
+                rc2 = llog_cleanup(ctxt);
         if (!rc)
                 rc = rc2;
 
@@ -2139,89 +2228,66 @@ static int filter_group_llog_finish(struct obd_llogs *llogs)
 
 static int filter_llog_finish(struct obd_device *obd, int count)
 {
-        struct llog_ctxt *ctxt;
-        int rc = 0, rc2 = 0;
+        int rc; /* result of cleaning up the device's own llog group, which
+                 * is what this function returns; the count argument is not
+                 * used */
         ENTRY;
 
-        ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        if (ctxt)
-                rc = llog_cleanup(ctxt);
-
-        ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT);
-        if (ctxt)
-                rc2 = llog_cleanup(ctxt);
-        if (!rc)
-                rc = rc2;
+        if (obd->u.filter.fo_lcm) { /* a commit master exists: clean it up,
+                                     * free it and clear the pointer */
+                llog_cleanup_commit_master((struct llog_commit_master *)
+                                           obd->u.filter.fo_lcm, 0);
+                OBD_FREE(obd->u.filter.fo_lcm, 
+                         sizeof(struct llog_commit_master));
+                obd->u.filter.fo_lcm = NULL; /* NOTE(review): the master is
+                         * freed before the contexts are cleaned up below,
+                         * yet filter_llog_init() stored it in
+                         * ctxt->loc_lcm - confirm llog_cleanup() no longer
+                         * needs it */
+        }
+        /* Clean up the contexts of the device's own llog group (obd_olg). */
+        rc = filter_group_llog_finish(&obd->obd_olg);
 
         RETURN(rc);
 }
 
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
-                                             struct obd_export *export)
+struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group) /*
+ * Return the llog group with number `group` on filter device `obd`,
+ * creating and initialising it when it is not yet on fo_llog_list.
+ * OBD_LLOG_GROUP maps to the group embedded in the device (obd->obd_olg).
+ * On failure an ERR_PTR() is returned (-ENOMEM or the error from
+ * llog_cat_initialize()), never NULL.
+ */
 {
-        struct filter_group_llog *fglog, *nlog;
+        struct obd_llog_group *olg, *nolg; /* olg: result; nolg: cursor for
+                                            * the duplicate check */
         struct filter_obd *filter;
-        struct llog_ctxt *ctxt;
-        struct list_head *cur;
         int rc;
 
         filter = &obd->u.filter;
 
+        if (group == OBD_LLOG_GROUP)
+                RETURN(&obd->obd_olg); /* default group is not on the list */
+
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each(cur, &filter->fo_llog_list) {
-                fglog = list_entry(cur, struct filter_group_llog, list);
-                if (fglog->group == group) {
-                        if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
-                                CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
-                                      obd->obd_name, group, fglog->exp, export);
+        list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                if (olg->olg_group == group) {
                         spin_unlock(&filter->fo_llog_list_lock);
-                        goto init;
+                        RETURN(olg); /* existing group found */
                 }
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
-        if (export == NULL)
-                RETURN(NULL);
-
-        OBD_ALLOC_PTR(fglog);
-        if (fglog == NULL)
-                RETURN(NULL);
-        fglog->group = group;
-
-        OBD_ALLOC_PTR(fglog->llogs);
-        if (fglog->llogs == NULL) {
-                OBD_FREE_PTR(fglog);
-                RETURN(NULL);
-        }
+        OBD_ALLOC_PTR(olg); /* not found: allocate outside the spinlock */
+        if (olg == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
 
+        llog_group_init(olg, group);
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each(cur, &filter->fo_llog_list) {
-                nlog = list_entry(cur, struct filter_group_llog, list);
-                LASSERT(nlog->group != group);
+        list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) {
+                LASSERT(nolg->olg_group != group); /* NOTE(review): the lock
+                         * was dropped between the lookup above and this
+                         * insert, so two concurrent callers asking for the
+                         * same new group would trip this assertion -
+                         * confirm callers are serialised */
         }
-        list_add(&fglog->list, &filter->fo_llog_list);
+        list_add(&olg->olg_list, &filter->fo_llog_list); /* on the list
+                         * before initialisation; presumably so that
+                         * filter_llog_init(), which looks the group up via
+                         * filter_find_olg(), can find it while
+                         * llog_cat_initialize() runs - confirm */
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = llog_cat_initialize(obd, fglog->llogs, 1, NULL);
+        rc = llog_cat_initialize(obd, olg, 1, NULL);
         if (rc) {
-                OBD_FREE_PTR(fglog->llogs);
-                OBD_FREE_PTR(fglog);
-                RETURN(NULL);
-        }
-
-init:
-        if (export) {
-                fglog->exp = export;
-                ctxt = llog_get_context_from_llogs(fglog->llogs,
-                                               LLOG_MDS_OST_REPL_CTXT);
-                LASSERT(ctxt != NULL);
-
-                llog_receptor_accept(ctxt, export->exp_imp_reverse);
+                spin_lock(&filter->fo_llog_list_lock); /* undo: unlink the
+                         * group again and free it */
+                list_del(&olg->olg_list);
+                spin_unlock(&filter->fo_llog_list_lock);
+                OBD_FREE_PTR(olg);
+                RETURN(ERR_PTR(rc));
         }
-        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
-               obd->obd_name, fglog->llogs, group);
+        CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n",
+               obd->obd_name, group, olg);
 
-        RETURN(fglog->llogs);
+        RETURN(olg);
 }
 
 static int filter_llog_connect(struct obd_export *exp,
@@ -2229,7 +2295,7 @@ static int filter_llog_connect(struct obd_export *exp,
 {
         struct obd_device *obd = exp->exp_obd;
         struct llog_ctxt *ctxt;
-        struct obd_llogs *llog;
+        struct obd_llog_group *olg;
         int rc;
         ENTRY;
 
@@ -2238,13 +2304,17 @@ static int filter_llog_connect(struct obd_export *exp,
                 (unsigned) body->lgdc_logid.lgl_oid,
                 (unsigned) body->lgdc_logid.lgl_ogen);
 
-        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
-        LASSERT(llog != NULL);
-        ctxt = llog_get_context_from_llogs(llog, body->lgdc_ctxt_idx);
+        olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
         LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n",
                  body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
                           &body->lgdc_gen, NULL);
+        llog_ctxt_put(ctxt);
         if (rc != 0)
                 CERROR("failed to connect rc %d idx %d\n", rc,
                                 body->lgdc_ctxt_idx);
@@ -2254,33 +2324,32 @@ static int filter_llog_connect(struct obd_export *exp,
 
 static int filter_llog_preclean (struct obd_device *obd)
 {
-        struct filter_group_llog *log;
+        struct obd_llog_group *olg; /* group currently being torn down */
         struct filter_obd *filter;
         int rc = 0;
         ENTRY;
 
+        rc = obd_llog_finish(obd, 0); /* Finish the device-level llog state
+                         * first (presumably reaching filter_llog_finish() -
+                         * confirm), then every group on fo_llog_list.
+                         * NOTE(review): this rc is only logged; the loop
+                         * below overwrites it whenever the list is not
+                         * empty, so the function returns the status of the
+                         * last group processed. */
+        if (rc)
+                CERROR("failed to cleanup llogging subsystem\n");
+
         filter = &obd->u.filter;
         spin_lock(&filter->fo_llog_list_lock);
         while (!list_empty(&filter->fo_llog_list)) {
-                log = list_entry(filter->fo_llog_list.next,
-                                 struct filter_group_llog, list);
-                list_del(&log->list);
+                olg = list_entry(filter->fo_llog_list.next,
+                                 struct obd_llog_group, olg_list);
+                list_del(&olg->olg_list); /* unlink while holding the lock;
+                         * the cleanup itself runs with the lock released */
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                rc = filter_group_llog_finish(log->llogs);
+                rc = filter_group_llog_finish(olg); /* replaces earlier rc */
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
-                                log->group);
-                OBD_FREE_PTR(log->llogs);
-                OBD_FREE_PTR(log);
+                               olg->olg_group);
+                OBD_FREE_PTR(olg); /* freed even if its cleanup failed */
                 spin_lock(&filter->fo_llog_list_lock);
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = obd_llog_finish(obd, 0);
-        if (rc)
-                CERROR("failed to cleanup llogging subsystem\n");
-
         RETURN(rc);
 }
 
@@ -2295,9 +2364,9 @@ static int filter_precleanup(struct obd_device *obd,
                 break;
         case OBD_CLEANUP_EXPORTS:
                 target_cleanup_recovery(obd);
+                rc = filter_llog_preclean(obd);
                 break;
         case OBD_CLEANUP_SELF_EXP:
-                rc = filter_llog_preclean(obd);
                 break;
         case OBD_CLEANUP_OBD:
                 break;
@@ -2323,8 +2392,10 @@ static int filter_cleanup(struct obd_device *obd)
                 }
         }
 
-        lprocfs_obd_cleanup(obd);
+        lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
+        lprocfs_free_per_client_stats(obd);
         lprocfs_free_obd_stats(obd);
+        lprocfs_obd_cleanup(obd);
         lquota_cleanup(filter_quota_interface_ref, obd);
 
         /* Stop recovery before namespace cleanup. */
@@ -2333,14 +2404,15 @@ static int filter_cleanup(struct obd_device *obd)
 
         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
 
+        sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset);
+
         if (obd->u.obt.obt_sb == NULL)
                 RETURN(0);
 
         filter_post(obd);
 
-        shrink_dcache_parent(obd->u.obt.obt_sb->s_root);
-
         LL_DQUOT_OFF(obd->u.obt.obt_sb);
+        shrink_dcache_sb(obd->u.obt.obt_sb);
 
         server_put_mount(obd->obd_name, filter->fo_vfsmnt);
         obd->u.obt.obt_sb = NULL;
@@ -2370,6 +2442,14 @@ static int filter_connect_internal(struct obd_export *exp,
         exp->exp_connect_flags = data->ocd_connect_flags;
         data->ocd_version = LUSTRE_VERSION_CODE;
 
+        if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
+                CWARN("%s: OST requires FID support (flag="LPX64
+                      "), but client not\n",
+                      exp->exp_obd->obd_name,
+                      exp->exp_connect_flags);
+                RETURN(-EBADF);
+        }
+
         if (exp->exp_connect_flags & OBD_CONNECT_GRANT) {
                 struct filter_export_data *fed = &exp->exp_filter_data;
                 obd_size left, want;
@@ -2418,6 +2498,30 @@ static int filter_connect_internal(struct obd_export *exp,
                 LASSERT(data->ocd_brw_size);
         }
 
+        if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) {
+                __u32 cksum_types = data->ocd_cksum_types;
+
+                /* The client set in ocd_cksum_types the checksum types it
+                 * supports. We have to mask off the algorithms that we don't
+                 * support */
+                if (cksum_types & OBD_CKSUM_ALL)
+                        data->ocd_cksum_types &= OBD_CKSUM_ALL;
+                else
+                        data->ocd_cksum_types = OBD_CKSUM_CRC32;
+
+                CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
+                                   "%x\n", exp->exp_obd->obd_name,
+                                   obd_export_nid2str(exp), cksum_types,
+                                   data->ocd_cksum_types);
+        } else {
+                /* This client does not support OBD_CONNECT_CKSUM
+                 * fall back to CRC32 */
+                CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
+                                   "OBD_CONNECT_CKSUM, CRC32 will be used\n",
+                                   exp->exp_obd->obd_name,
+                                   obd_export_nid2str(exp));
+        }
+
         /* FIXME: Do the same with the MDS UUID and fsd_peeruuid.
          * FIXME: We don't strictly need the COMPAT flag for that,
          * FIXME: as fsd_peeruuid[0] will tell us if that is set.
@@ -2426,7 +2530,8 @@ static int filter_connect_internal(struct obd_export *exp,
         RETURN(0);
 }
 
-static int filter_reconnect(struct obd_export *exp, struct obd_device *obd,
+static int filter_reconnect(const struct lu_env *env,
+                            struct obd_export *exp, struct obd_device *obd,
                             struct obd_uuid *cluuid,
                             struct obd_connect_data *data)
 {
@@ -2445,7 +2550,7 @@ static int filter_reconnect(struct obd_export *exp, struct obd_device *obd,
 static int filter_connect(const struct lu_env *env,
                           struct lustre_handle *conn, struct obd_device *obd,
                           struct obd_uuid *cluuid,
-                          struct obd_connect_data *data)
+                          struct obd_connect_data *data, void *localdata)
 {
         struct lvfs_run_ctxt saved;
         struct obd_export *exp;
@@ -2470,7 +2575,7 @@ static int filter_connect(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
-        filter_export_stats_init(obd, exp);
+        filter_export_stats_init(obd, exp, localdata);
         group = data->ocd_group;
         if (obd->obd_replayable) {
                 OBD_ALLOC(fcd, sizeof(*fcd));
@@ -2645,6 +2750,10 @@ static int filter_destroy_export(struct obd_export *exp)
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
                        exp, exp->exp_filter_data.fed_pending);
 
+        /* The b1_6 quota functionality has not been ported yet:
+         * lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
+         */
+
         target_destroy_export(exp);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
@@ -2668,7 +2777,7 @@ static int filter_destroy_export(struct obd_export *exp)
 
 static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
 {
-        struct filter_group_llog *fglog, *nlog;
+        struct obd_llog_group *olg_min, *olg;
         struct filter_obd *filter;
         int worked = 0, group;
         struct llog_ctxt *ctxt;
@@ -2681,35 +2790,41 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
          * group order and skip already synced llogs -bzzz */
         do {
                 /* look for group with min. number, but > worked */
-                fglog = NULL;
+                olg_min = NULL;
                 group = 1 << 30;
                 spin_lock(&filter->fo_llog_list_lock);
-                list_for_each_entry(nlog, &filter->fo_llog_list, list) {
-                        if (nlog->group <= worked) {
+                list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                        if (olg->olg_group <= worked) {
                                 /* this group is already synced */
                                 continue;
                         }
-                        if (group < nlog->group) {
+                        if (group < olg->olg_group) {
                                 /* we have group with smaller number to sync */
                                 continue;
                         }
                         /* store current minimal group */
-                        fglog = nlog;
-                        group = nlog->group;
+                        olg_min = olg;
+                        group = olg->olg_group;
                 }
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                if (fglog == NULL)
+                if (olg_min == NULL)
                         break;
 
-                worked = fglog->group;
-                if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
-                        ctxt = llog_get_context_from_llogs(fglog->llogs,
+                worked = olg_min->olg_group;
+                if (olg_min->olg_exp &&
+                    (dexp == olg_min->olg_exp || dexp == NULL)) {
+                        int err;
+                        ctxt = llog_group_get_ctxt(olg_min,
                                                 LLOG_MDS_OST_REPL_CTXT);
                         LASSERT(ctxt != NULL);
-                        llog_sync(ctxt, fglog->exp);
+                        err = llog_sync(ctxt, olg_min->olg_exp);
+                        llog_ctxt_put(ctxt);
+                        if (err)
+                                CERROR("error flushing logs to MDS: rc %d\n",
+                                       err);                        
                 }
-        } while (fglog != NULL);
+        } while (olg_min != NULL);
 }
 
 /* also incredibly similar to mds_disconnect */
@@ -2772,8 +2887,8 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
         dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
 
         if (IS_ERR(dchild)) {
-                CERROR("%s error looking up object: "LPU64"\n",
-                       what, oa->o_id);
+                CERROR("%s error looking up object: "LPU64":"LPU64"\n",
+                       what, group, oa->o_id);
                 RETURN(dchild);
         }
 
@@ -3019,6 +3134,26 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
         if (rc)
                 RETURN(rc);
 
+        /* This would be very bad - accidentally truncating a file when
+         * changing the time or similar - bug 12203. */
+        if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE && 
+            oinfo->oi_policy.l_extent.end != OBD_OBJECT_EOF) {
+                static char mdsinum[48];
+
+                if (oinfo->oi_oa->o_valid & OBD_MD_FLFID)
+                        snprintf(mdsinum, sizeof(mdsinum) - 1,
+                                 " of inode "LPU64"/%u", oinfo->oi_oa->o_fid,
+                                 oinfo->oi_oa->o_generation);
+                else
+                        mdsinum[0] = '\0';
+
+                CERROR("%s: setattr from %s trying to truncate objid "LPU64
+                       " %s\n",
+                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
+                       oinfo->oi_oa->o_id, mdsinum);
+                RETURN(-EPERM);
+        }
+
         dentry = __filter_oa2dentry(exp->exp_obd, oinfo->oi_oa,
                                     __FUNCTION__, 1);
         if (IS_ERR(dentry))
@@ -3255,7 +3390,7 @@ out:
 }
 
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                         __u64 max_age)
+                         __u64 max_age, __u32 flags)
 {
         struct filter_obd *filter = &obd->u.filter;
         int blockbits = obd->u.obt.obt_sb->s_blocksize_bits;
@@ -3351,7 +3486,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 OBD_ALLOC(osfs, sizeof(*osfs));
                 if (osfs == NULL)
                         RETURN(-ENOMEM);
-                rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ);
+                rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ, 0);
                 if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
                         CDEBUG(D_RPCTRACE,"%s: not enough space for create "
                                LPU64"\n", obd->obd_name, osfs->os_bavail <<
@@ -3599,9 +3734,17 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                        oa->o_id);
                 /* If object already gone, cancel cookie right now */
                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                        struct llog_ctxt *ctxt;
+                        struct obd_llog_group *olg;
                         fcc = obdo_logcookie(oa);
-                        llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1),
-                                    NULL, 1, fcc, 0);
+                        olg = filter_find_olg(obd, oa->o_gr);
+                        if (IS_ERR(olg))
+                                GOTO(cleanup, rc = PTR_ERR(olg));
+                        llog_group_set_export(olg, exp);
+
+                        ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
+                        llog_cancel(ctxt, NULL, 1, fcc, 0);
+                        llog_ctxt_put(ctxt);
                         fcc = NULL; /* we didn't allocate fcc, don't free it */
                 }
                 GOTO(cleanup, rc = -ENOENT);
@@ -3754,7 +3897,6 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
-        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
@@ -3769,8 +3911,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
                 /* flush any remaining cancel messages out to the target */
-                ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT);
-                llog_sync(ctxt, exp);
+                filter_sync_llogs(exp->exp_obd, exp);
                 RETURN(rc);
         }
 
@@ -3817,28 +3958,41 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen,
                 RETURN(-EINVAL);
         }
 
-        if (keylen == strlen("blocksize") &&
-            memcmp(key, "blocksize", keylen) == 0) {
+        if (KEY_IS("blocksize")) {
                 __u32 *blocksize = val;
+                if (blocksize) {
+                        if (*vallen < sizeof(*blocksize))
+                                RETURN(-EOVERFLOW);
+                        *blocksize = obd->u.obt.obt_sb->s_blocksize;
+                }
                 *vallen = sizeof(*blocksize);
-                *blocksize = obd->u.obt.obt_sb->s_blocksize;
                 RETURN(0);
         }
 
-        if (keylen == strlen("blocksize_bits") &&
-            memcmp(key, "blocksize_bits", keylen) == 0) {
+        if (KEY_IS("blocksize_bits")) {
                 __u32 *blocksize_bits = val;
+                if (blocksize_bits) {
+                        if (*vallen < sizeof(*blocksize_bits))
+                                RETURN(-EOVERFLOW);
+                        *blocksize_bits = obd->u.obt.obt_sb->s_blocksize_bits;
+                }
                 *vallen = sizeof(*blocksize_bits);
-                *blocksize_bits = obd->u.obt.obt_sb->s_blocksize_bits;
                 RETURN(0);
         }
 
-        if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
+        if (KEY_IS("last_id")) {
                 obd_id *last_id = val;
                 /* FIXME: object groups */
-                *last_id = filter_last_id(&obd->u.filter, 0);
+                if (last_id) {
+                        if (*vallen < sizeof(*last_id))
+                                RETURN(-EOVERFLOW);
+                        *last_id = filter_last_id(&obd->u.filter,
+                                                  exp->exp_filter_data.fed_group);
+                }
+                *vallen = sizeof(*last_id);
                 RETURN(0);
         }
+
         CDEBUG(D_IOCTL, "invalid key\n");
         RETURN(-EINVAL);
 }
@@ -3848,7 +4002,7 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                                  struct ptlrpc_request_set *set)
 {
         struct obd_device *obd;
-        struct obd_llogs *llog;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc = 0, group;
         ENTRY;
@@ -3871,8 +4025,7 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                 RETURN(0);
         }
 
-        if (keylen < strlen(KEY_MDS_CONN) ||
-            memcmp(key, KEY_MDS_CONN, keylen) != 0)
+        if (!KEY_IS(KEY_MDS_CONN))
                 RETURN(-EINVAL);
 
         LCONSOLE_WARN("%s: received MDS connection from %s\n", obd->obd_name,
@@ -3884,12 +4037,16 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
         group = (int)(*(__u32 *)val);
         LASSERT(group >= FILTER_GROUP_MDS0);
 
-        llog = filter_grab_llog_for_group(obd, group, exp);
-        LASSERT(llog != NULL);
-        ctxt = llog_get_context_from_llogs(llog, LLOG_MDS_OST_REPL_CTXT);
-        LASSERTF(ctxt != NULL, "ctxt is not null\n"),
+        olg = filter_find_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERTF(ctxt != NULL, "ctxt is null\n"),
 
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        llog_ctxt_put(ctxt);
 
         lquota_setinfo(filter_quota_interface_ref, exp, obd);
 
@@ -3998,9 +4155,43 @@ static int filter_process_config(struct obd_device *obd, obd_count len,
         struct lprocfs_static_vars lvars;
         int rc = 0;
 
-        lprocfs_init_vars(filter, &lvars);
+        switch (lcfg->lcfg_command) {
+        case LCFG_SPTLRPC_CONF: {
+                struct filter_obd       *filter = &obd->u.filter;
+                struct sptlrpc_conf_log *log;
+                struct sptlrpc_rule_set  tmp_rset;
+
+                log = sptlrpc_conf_log_extract(lcfg);
+                if (IS_ERR(log)) {
+                        rc = PTR_ERR(log);
+                        break;
+                }
+
+                sptlrpc_rule_set_init(&tmp_rset);
+
+                rc = sptlrpc_rule_set_from_log(&tmp_rset, log);
+                if (rc) {
+                        CERROR("obd %s: failed get sptlrpc rules: %d\n",
+                               obd->obd_name, rc);
+                        break;
+                }
+
+                write_lock(&filter->fo_sptlrpc_lock);
+                sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset);
+                filter->fo_sptlrpc_rset = tmp_rset;
+                write_unlock(&filter->fo_sptlrpc_lock);
+
+                sptlrpc_target_update_exp_flavor(obd, &tmp_rset);
+                break;
+        }
+        default:
+                lprocfs_filter_init_vars(&lvars);
+
+                rc = class_process_proc_param(PARAM_OST, lvars.obd_vars,
+                                              lcfg, obd);
+                break;
+        }
 
-        rc = class_process_proc_param(PARAM_OST, lvars.obd_vars, lcfg, obd);
         return rc;
 }
 
@@ -4048,7 +4239,7 @@ static int __init obdfilter_init(void)
         struct lprocfs_static_vars lvars;
         int rc;
 
-        lprocfs_init_vars(filter, &lvars);
+        lprocfs_filter_init_vars(&lvars);
 
         request_module("lquota");
         OBD_ALLOC(obdfilter_created_scratchpad,