Whamcloud - gitweb
Stop error spew if no real error, doh.
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index e566fbb..d2f6369 100644 (file)
@@ -53,6 +53,7 @@
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_log.h>
 #include <linux/lustre_commit_confd.h>
+#include <portals/list.h>
 
 #include "filter_internal.h"
 
@@ -93,7 +94,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                 filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
                 spin_unlock(&filter->fo_translock);
                 oti->oti_transno = last_rcvd;
-        } else { 
+        } else {
                 spin_lock(&filter->fo_translock);
                 last_rcvd = oti->oti_transno;
                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
@@ -102,7 +103,6 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                 spin_unlock(&filter->fo_translock);
         }
         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
-        fcd->fcd_mount_count = cpu_to_le64(filter->fo_mount_count);
 
         /* could get xid from oti, if it's ever needed */
         fcd->fcd_last_xid = 0;
@@ -249,7 +249,9 @@ static int filter_client_free(struct obd_export *exp, int flags)
         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
 
-        if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
+        /* Clear the bit _after_ zeroing out the client so we don't
+           race with filter_client_add and zero out new clients.*/
+        if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
                        fed->fed_lr_idx);
                 LBUG();
@@ -266,6 +268,12 @@ static int filter_client_free(struct obd_export *exp, int flags)
                fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
                LAST_RCVD, rc);
 
+        if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
+                CERROR("FILTER client %u: bit already clear in bitmap!!\n",
+                       fed->fed_lr_idx);
+                LBUG();
+        }
+
 free:
         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
 
@@ -420,7 +428,8 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         for (cl_idx = 0, off = le32_to_cpu(fsd->fsd_client_start);
              off < last_rcvd_size; cl_idx++) {
                 __u64 last_rcvd;
-                int mount_age;
+                struct obd_export *exp;
+                struct filter_export_data *fed;
 
                 if (!fcd) {
                         OBD_ALLOC(fcd, sizeof(*fcd));
@@ -451,34 +460,24 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 /* These exports are cleaned up by filter_disconnect(), so they
                  * need to be set up like real exports as filter_connect() does.
                  */
-                mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
-                if (mount_age < FILTER_MOUNT_RECOV) {
-                        struct obd_export *exp = class_new_export(obd);
-                        struct filter_export_data *fed;
-                        CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                               " srv lr: "LPU64" mnt: "LPU64" last mount: "
-                               LPU64"\n", fcd->fcd_uuid, cl_idx,
-                               last_rcvd, le64_to_cpu(fsd->fsd_last_transno),
-                               le64_to_cpu(fcd->fcd_mount_count), mount_count);
-                        if (exp == NULL)
-                                GOTO(err_client, rc = -ENOMEM);
-
-                        memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
-                               sizeof exp->exp_client_uuid.uuid);
-                        fed = &exp->exp_filter_data;
-                        fed->fed_fcd = fcd;
-                        filter_client_add(obd, filter, fed, cl_idx);
-                        /* create helper if export init gets more complex */
-                        spin_lock_init(&fed->fed_lock);
-
-                        fcd = NULL;
-                        obd->obd_recoverable_clients++;
-                        class_export_put(exp);
-                } else {
-                        CDEBUG(D_INFO, "discarded client %d UUID '%s' count "
-                               LPU64"\n", cl_idx, fcd->fcd_uuid,
-                               le64_to_cpu(fcd->fcd_mount_count));
-                }
+                exp = class_new_export(obd);
+                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                       " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
+                       last_rcvd, le64_to_cpu(fsd->fsd_last_transno));
+                if (exp == NULL)
+                        GOTO(err_client, rc = -ENOMEM);
+
+                memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
+                       sizeof exp->exp_client_uuid.uuid);
+                fed = &exp->exp_filter_data;
+                fed->fed_fcd = fcd;
+                filter_client_add(obd, filter, fed, cl_idx);
+                /* create helper if export init gets more complex */
+                spin_lock_init(&fed->fed_lock);
+
+                fcd = NULL;
+                obd->obd_recoverable_clients++;
+                class_export_put(exp);
 
                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
                        cl_idx, last_rcvd);
@@ -491,7 +490,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         obd->obd_last_committed = le64_to_cpu(fsd->fsd_last_transno);
 
         if (obd->obd_recoverable_clients) {
-                CERROR("RECOVERY: %d recoverable clients, last_rcvd "
+                CWARN("RECOVERY: %d recoverable clients, last_rcvd "
                        LPU64"\n", obd->obd_recoverable_clients,
                        le64_to_cpu(fsd->fsd_last_transno));
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
@@ -572,7 +571,7 @@ static int filter_prep_groups(struct obd_device *obd)
         int i, rc = 0, cleanup_phase = 0;
         ENTRY;
 
-        O_dentry = simple_mkdir(current->fs->pwd, "O", 0700);
+        O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
         CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
         if (IS_ERR(O_dentry)) {
                 rc = PTR_ERR(O_dentry);
@@ -647,7 +646,7 @@ static int filter_prep_groups(struct obd_device *obd)
                 loff_t off = 0;
 
                 sprintf(name, "%d", i);
-                dentry = simple_mkdir(O_dentry, name, 0700);
+                dentry = simple_mkdir(O_dentry, name, 0700, 1);
                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
                 if (IS_ERR(dentry)) {
                         rc = PTR_ERR(dentry);
@@ -706,7 +705,7 @@ static int filter_prep_groups(struct obd_device *obd)
                         char dir[20];
                         snprintf(dir, sizeof(dir), "d%u", i);
 
-                        dentry = simple_mkdir(O_dentry, dir, 0700);
+                        dentry = simple_mkdir(O_dentry, dir, 0700, 1);
                         CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
                         if (IS_ERR(dentry)) {
                                 rc = PTR_ERR(dentry);
@@ -904,29 +903,16 @@ static int filter_blocking_ast(struct ldlm_lock *lock,
         RETURN(0);
 }
 
-static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
-                              ldlm_mode_t lock_mode,struct lustre_handle *lockh)
+static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
 {
-        struct ldlm_res_id res_id = { .name = {0} };
-        int flags = 0, rc;
-        ENTRY;
-
-        res_id.name[0] = de->d_inode->i_ino;
-        res_id.name[1] = de->d_inode->i_generation;
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                              res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                              &flags, ldlm_completion_ast,
-                              filter_blocking_ast, NULL, lockh);
-
-        RETURN(rc == ELDLM_OK ? 0 : -EIO);  /* XXX translate ldlm code */
+        down(&dparent->d_inode->i_sem);
+        return 0;
 }
 
 /* We never dget the object parent, so DON'T dput it either */
-static void filter_parent_unlock(struct dentry *dparent,
-                                 struct lustre_handle *lockh,
-                                 ldlm_mode_t lock_mode)
+static void filter_parent_unlock(struct dentry *dparent)
 {
-        ldlm_lock_decref(lockh, lock_mode);
+        up(&dparent->d_inode->i_sem);
 }
 
 /* We never dget the object parent, so DON'T dput it either */
@@ -943,20 +929,19 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
 
 /* We never dget the object parent, so DON'T dput it either */
 struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
-                                  obd_id objid, ldlm_mode_t lock_mode,
-                                  struct lustre_handle *lockh)
+                                  obd_id objid)
 {
         unsigned long now = jiffies;
-        struct dentry *de = filter_parent(obd, group, objid);
+        struct dentry *dparent = filter_parent(obd, group, objid);
         int rc;
 
-        if (IS_ERR(de))
-                return de;
+        if (IS_ERR(dparent))
+                return dparent;
 
-        rc = filter_lock_dentry(obd, de, lock_mode, lockh);
+        rc = filter_lock_dentry(obd, dparent);
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
-        return rc ? ERR_PTR(rc) : de;
+        return rc ? ERR_PTR(rc) : dparent;
 }
 
 /* How to get files, dentries, inodes from object id's.
@@ -970,7 +955,6 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
                                  struct dentry *dir_dentry,
                                  obd_gr group, obd_id id)
 {
-        struct lustre_handle lockh;
         struct dentry *dparent = dir_dentry;
         struct dentry *dchild;
         char name[32];
@@ -984,15 +968,15 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
 
         len = sprintf(name, LPU64, id);
         if (dir_dentry == NULL) {
-                dparent = filter_parent_lock(obd, group, id, LCK_PR, &lockh);
+                dparent = filter_parent_lock(obd, group, id);
                 if (IS_ERR(dparent))
                         RETURN(dparent);
         }
         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
                dparent->d_name.len, dparent->d_name.name, name);
-        dchild = ll_lookup_one_len(name, dparent, len);
+        dchild = /*ll_*/lookup_one_len(name, dparent, len);
         if (dir_dentry == NULL)
-                filter_parent_unlock(dparent, &lockh, LCK_PR);
+                filter_parent_unlock(dparent);
         if (IS_ERR(dchild)) {
                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
                 RETURN(dchild);
@@ -1011,21 +995,15 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
         struct lustre_handle lockh;
         int flags = LDLM_AST_DISCARD_DATA, rc;
         struct ldlm_res_id res_id = { .name = { objid } };
-        struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
-        ENTRY;
+        ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
 
+        ENTRY;
         /* Tell the clients that the object is gone now and that they should
-         * throw away any cached pages.  If we're the OST at stripe 0 in the
-         * file then this enqueue will communicate the DISCARD to all the
-         * clients.  This assumes that we always destroy all the objects for
-         * a file at a time, as is currently the case.  If we're not the
-         * OST at stripe 0 then we'll harmlessly get a very lonely lock in 
-         * the local DLM and immediately drop it. */
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                              res_id, LDLM_EXTENT, &extent,
-                              sizeof(extent), LCK_PW, &flags,
-                              ldlm_completion_ast, filter_blocking_ast,
-                              NULL, &lockh);
+         * throw away any cached pages. */
+        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
+                              LDLM_EXTENT, &policy, LCK_PW,
+                              &flags, filter_blocking_ast, ldlm_completion_ast,
+                              NULL, NULL, NULL, 0, NULL, &lockh);
 
         /* We only care about the side-effects, just drop the lock. */
         if (rc == ELDLM_OK)
@@ -1047,7 +1025,7 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
                 CERROR("destroying objid %*s nlink = %lu, count = %d\n",
                        dchild->d_name.len, dchild->d_name.name,
-                       (unsigned long)inode->i_nlink, 
+                       (unsigned long)inode->i_nlink,
                        atomic_read(&inode->i_count));
         }
 
@@ -1060,6 +1038,119 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         RETURN(rc);
 }
 
+static int filter_intent_policy(struct ldlm_namespace *ns,
+                                struct ldlm_lock **lockp, void *req_cookie,
+                                ldlm_mode_t mode, int flags, void *data)
+{
+        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+        struct ptlrpc_request *req = req_cookie;
+        struct ldlm_lock *lock = *lockp, *l = NULL;
+        struct ldlm_resource *res = lock->l_resource;
+        ldlm_processing_policy policy;
+        struct ost_lvb *res_lvb, *reply_lvb;
+        struct list_head *tmp;
+        ldlm_error_t err;
+        int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply),
+                                            sizeof(struct ost_lvb) };
+        ENTRY;
+
+        policy = ldlm_get_processing_policy(res);
+        LASSERT(policy != NULL);
+        LASSERT(req != NULL);
+
+        rc = lustre_pack_reply(req, 2, repsize, NULL);
+        if (rc)
+                RETURN(req->rq_status = rc);
+
+        reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
+        LASSERT(reply_lvb != NULL);
+
+        //fixup_handle_for_resent_req(req, lock, &lockh);
+
+        /* If we grant any lock at all, it will be a whole-file read lock.
+         * Call the extent policy function to see if our request can be
+         * granted, or is blocked. */
+        lock->l_policy_data.l_extent.start = 0;
+        lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
+        lock->l_req_mode = LCK_PR;
+
+        l_lock(&res->lr_namespace->ns_lock);
+
+        res->lr_tmp = &rpc_list;
+        rc = policy(lock, &tmpflags, 0, &err);
+        res->lr_tmp = NULL;
+
+        /* FIXME: we should change the policy function slightly, to not make
+         * this list at all, since we just turn around and free it */
+        while (!list_empty(&rpc_list)) {
+                struct ldlm_ast_work *w =
+                        list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
+                list_del(&w->w_list);
+                LDLM_LOCK_PUT(w->w_lock);
+                OBD_FREE(w, sizeof(*w));
+        }
+
+        if (rc == LDLM_ITER_CONTINUE) {
+                /* The lock met with no resistance; we're finished. */
+                l_unlock(&res->lr_namespace->ns_lock);
+                RETURN(ELDLM_LOCK_REPLACED);
+        }
+
+        /* Do not grant any lock, but instead send GL callbacks.  The extent
+         * policy nicely created a list of all PW locks for us.  We will choose
+         * the highest of those which are larger than the size in the LVB, if
+         * any, and perform a glimpse callback. */
+        down(&res->lr_lvb_sem);
+        res_lvb = res->lr_lvb_data;
+        LASSERT(res_lvb != NULL);
+        reply_lvb->lvb_size = res_lvb->lvb_size;
+        up(&res->lr_lvb_sem);
+
+        list_for_each(tmp, &res->lr_granted) {
+                struct ldlm_lock *tmplock =
+                        list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (tmplock->l_granted_mode == LCK_PR)
+                        continue;
+
+                if (tmplock->l_policy_data.l_extent.end <=
+                    reply_lvb->lvb_size)
+                        continue;
+
+                if (l == NULL) {
+                        l = LDLM_LOCK_GET(tmplock);
+                        continue;
+                }
+
+                if (l->l_policy_data.l_extent.start >
+                    tmplock->l_policy_data.l_extent.start)
+                        continue;
+
+                LDLM_LOCK_PUT(l);
+                l = LDLM_LOCK_GET(tmplock);
+        }
+        l_unlock(&res->lr_namespace->ns_lock);
+
+        /* There were no PW locks beyond the size in the LVB; finished. */
+        if (l == NULL)
+                RETURN(ELDLM_LOCK_ABORTED);
+
+        LASSERT(l->l_glimpse_ast != NULL);
+        rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+        if (rc != 0 && res->lr_namespace->ns_lvbo &&
+            res->lr_namespace->ns_lvbo->lvbo_update) {
+                res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+        }
+
+        down(&res->lr_lvb_sem);
+        reply_lvb->lvb_size = res_lvb->lvb_size;
+        up(&res->lr_lvb_sem);
+
+        LDLM_LOCK_PUT(l);
+
+        RETURN(ELDLM_LOCK_ABORTED);
+}
+
 /* mount the file system (secretly) */
 int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
                         char *option)
@@ -1089,7 +1180,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
                 if (*lcfg->lcfg_inlbuf3 == 'f') {
                         obd->obd_replayable = 1;
                         obd_sync_filter = 1;
-                        CERROR("%s: recovery enabled\n", obd->obd_name);
+                        CWARN("%s: recovery enabled\n", obd->obd_name);
                 } else {
                         if (*lcfg->lcfg_inlbuf3 != 'n') {
                                 CERROR("unrecognised flag '%c'\n",
@@ -1119,15 +1210,31 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         spin_lock_init(&filter->fo_objidlock);
         INIT_LIST_HEAD(&filter->fo_export_list);
         sema_init(&filter->fo_alloc_lock, 1);
+        spin_lock_init(&filter->fo_r_pages.oh_lock);
+        spin_lock_init(&filter->fo_w_pages.oh_lock);
+        spin_lock_init(&filter->fo_r_discont_pages.oh_lock);
+        spin_lock_init(&filter->fo_w_discont_pages.oh_lock);
+        spin_lock_init(&filter->fo_r_discont_blocks.oh_lock);
+        spin_lock_init(&filter->fo_w_discont_blocks.oh_lock);
+        filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
 
         obd->obd_namespace = ldlm_namespace_new("filter-tgt",
                                                 LDLM_NAMESPACE_SERVER);
         if (obd->obd_namespace == NULL)
                 GOTO(err_post, rc = -ENOMEM);
+        obd->obd_namespace->ns_lvbp = obd;
+        obd->obd_namespace->ns_lvbo = &filter_lvbo;
+        ldlm_register_intent(obd->obd_namespace, filter_intent_policy);
 
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
+        rc = llog_cat_initialize(obd, 1);
+        if (rc) {
+                CERROR("failed to setup llogging subsystems\n");
+                GOTO(err_post, rc);
+        }
+
         RETURN(0);
 
 err_post:
@@ -1170,20 +1277,6 @@ static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
         return rc;
 }
 
-static int filter_postsetup(struct obd_device *obd)
-{
-        int rc = 0;
-        ENTRY;
-
-        // XXX add a storage location for the logid for size changes
-#ifdef ENABLE_ORPHANS
-        rc = llog_cat_initialize(obd, 1);
-        if (rc)
-                CERROR("failed to setup llogging subsystems\n");
-#endif
-        RETURN(rc);
-}
-
 static int filter_cleanup(struct obd_device *obd, int flags)
 {
         struct filter_obd *filter = &obd->u.filter;
@@ -1248,7 +1341,8 @@ static int filter_attach(struct obd_device *obd, obd_count len, void *data)
                              LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                              LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
-        return rc;
+
+        return lproc_filter_attach_seqstat(obd);
 }
 
 static int filter_detach(struct obd_device *dev)
@@ -1292,7 +1386,6 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
         fed->fed_fcd = fcd;
-        fcd->fcd_mount_count = cpu_to_le64(filter->fo_mount_count);
 
         rc = filter_client_add(obd, filter, fed, -1);
 
@@ -1312,65 +1405,163 @@ static int filter_precleanup(struct obd_device *obd, int flags)
         int rc = 0;
         ENTRY;
 
-#ifdef ENABLE_ORPHANS
         rc = obd_llog_finish(obd, 0);
         if (rc)
                 CERROR("failed to cleanup llogging subsystem\n");
-#endif
 
         RETURN(rc);
 }
 
+/* Do extra sanity checks for grant accounting.  We do this at connect,
+ * disconnect, and statfs RPC time, so it shouldn't be too bad.  We can
+ * always get rid of it or turn it off when we know accounting is good. */
+static void filter_grant_sanity_check(struct obd_device *obd, char *func)
+{
+        struct filter_export_data *fed;
+        struct obd_export *exp;
+        obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
+        obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
+        obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
+
+        if (list_empty(&obd->obd_exports))
+                return;
+
+        spin_lock(&obd->obd_osfs_lock);
+        spin_lock(&obd->obd_dev_lock);
+        list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
+                fed = &exp->exp_filter_data;
+                LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
+                         "cli %s/%p %lu+%lu > "LPU64"\n",
+                         exp->exp_client_uuid.uuid, exp,
+                         fed->fed_grant, fed->fed_pending, maxsize);
+                LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
+                         exp->exp_client_uuid.uuid, exp,fed->fed_dirty,maxsize);
+                CDEBUG(D_CACHE,"%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                       fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+                tot_granted += fed->fed_grant + fed->fed_pending;
+                tot_pending += fed->fed_pending;
+                tot_dirty += fed->fed_dirty;
+        }
+        fo_tot_granted = obd->u.filter.fo_tot_granted;
+        fo_tot_pending = obd->u.filter.fo_tot_pending;
+        fo_tot_dirty = obd->u.filter.fo_tot_dirty;
+        spin_unlock(&obd->obd_dev_lock);
+        spin_unlock(&obd->obd_osfs_lock);
+
+        /* Do these assertions outside the spinlocks so we don't kill system */
+        if (tot_granted != fo_tot_granted)
+                CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
+                       func, tot_granted, fo_tot_granted);
+        if (tot_pending != fo_tot_pending)
+                CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
+                       func, tot_pending, fo_tot_pending);
+        if (tot_dirty != fo_tot_dirty)
+                CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
+                       func, tot_dirty, fo_tot_dirty);
+        if (tot_pending > tot_granted)
+                CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
+                       func, tot_pending, tot_granted);
+        if (tot_granted > maxsize)
+                CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
+                       func, tot_granted, maxsize);
+        if (tot_dirty > maxsize)
+                CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
+                       func, tot_dirty, maxsize);
+}
+
+/* Remove this client from the grant accounting totals.  We also remove
+ * the export from the obd device under the osfs and dev locks to ensure
+ * that the filter_grant_sanity_check() calculations are always valid.
+ * The client should do something similar when it invalidates its import. */
+static void filter_grant_discard(struct obd_export *exp)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_obd *filter = &obd->u.filter;
+        struct filter_export_data *fed = &exp->exp_filter_data;
+
+        spin_lock(&obd->obd_osfs_lock);
+        spin_lock(&exp->exp_obd->obd_dev_lock);
+        list_del_init(&exp->exp_obd_chain);
+        spin_unlock(&exp->exp_obd->obd_dev_lock);
+
+        CDEBUG(D_CACHE, "%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+               obd->obd_name, exp->exp_client_uuid.uuid, exp,
+               fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+
+        LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
+                 "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
+                 obd->obd_name, filter->fo_tot_granted,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_grant);
+        filter->fo_tot_granted -= fed->fed_grant;
+        LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
+                 "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
+                 obd->obd_name, filter->fo_tot_pending,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_pending);
+        LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
+                 "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
+                 obd->obd_name, filter->fo_tot_dirty,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
+        filter->fo_tot_dirty -= fed->fed_dirty;
+        fed->fed_dirty = 0;
+        fed->fed_grant = 0;
+
+        spin_unlock(&obd->obd_osfs_lock);
+}
+
 static int filter_destroy_export(struct obd_export *exp)
 {
         ENTRY;
 
+        if (exp->exp_filter_data.fed_pending)
+                CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
+                       exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+                       exp, exp->exp_filter_data.fed_pending);
+
         target_destroy_export(exp);
 
         if (exp->exp_obd->obd_replayable)
                 filter_client_free(exp, exp->exp_flags);
+
+        filter_grant_discard(exp);
+        if (!(exp->exp_flags & OBD_OPT_FORCE))
+                filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
+
         RETURN(0);
 }
 
 /* also incredibly similar to mds_disconnect */
 static int filter_disconnect(struct obd_export *exp, int flags)
 {
-        struct filter_obd *filter = &exp->exp_obd->u.filter;
-        struct filter_export_data *fed = &exp->exp_filter_data;
+        struct obd_device *obd = exp->exp_obd;
         unsigned long irqflags;
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
         LASSERT(exp);
-
-        /* XXX should this go into filter_destroy_export() instead? */
-        /* forget what this client had cached, I bet this needs to be
-         * matched with appropriate client behaviour in the face of
-         * disconnects */
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
-        filter->fo_tot_cached -= fed->fed_cached;
-        filter->fo_tot_granted -= fed->fed_grant;
-        fed->fed_cached = 0;
-        fed->fed_grant = 0;
-        fed->fed_grant_sent = 0;
-        fed->fed_grant_waiting = 0;
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
-
-        ldlm_cancel_locks_for_export(exp);
+        class_export_get(exp);
 
         spin_lock_irqsave(&exp->exp_lock, irqflags);
         exp->exp_flags = flags;
         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
 
-        fsfilt_sync(exp->exp_obd, filter->fo_sb);
-        /* XXX cleanup preallocated inodes */
+        if (!(flags & OBD_OPT_FORCE))
+                filter_grant_sanity_check(obd, __FUNCTION__);
+        filter_grant_discard(exp);
+
+        /* Disconnect early so that clients can't keep using export */
+        rc = class_disconnect(exp, flags);
+
+        ldlm_cancel_locks_for_export(exp);
+
+        fsfilt_sync(obd, obd->u.filter.fo_sb);
 
         /* flush any remaining cancel messages out to the target */
-        ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+        ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
         llog_sync(ctxt, exp);
 
-        rc = class_disconnect(exp, flags);
+        class_export_put(exp);
         RETURN(rc);
 }
 
@@ -1434,6 +1625,8 @@ static int filter_setattr(struct obd_export *exp, struct obdo *oa,
         struct filter_obd *filter;
         struct dentry *dentry;
         struct iattr iattr;
+        struct ldlm_res_id res_id = { .name = { oa->o_id } };
+        struct ldlm_resource *res;
         void *handle;
         int rc, rc2;
         ENTRY;
@@ -1473,6 +1666,22 @@ static int filter_setattr(struct obd_export *exp, struct obdo *oa,
                         rc = rc2;
         }
 
+        if (iattr.ia_valid & ATTR_SIZE) {
+                res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
+                                        res_id, LDLM_EXTENT, 0);
+                if (res == NULL) {
+                        CERROR("!!! resource_get failed for object "LPU64" -- "
+                               "filter_setattr with no lock?\n", oa->o_id);
+                } else {
+                        if (res->lr_namespace->ns_lvbo &&
+                            res->lr_namespace->ns_lvbo->lvbo_update) {
+                                rc = res->lr_namespace->ns_lvbo->lvbo_update
+                                        (res, NULL, 0, 0);
+                        }
+                        ldlm_resource_putref(res);
+                }
+        }
+
         oa->o_valid = OBD_MD_FLID;
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
@@ -1575,29 +1784,33 @@ static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
         diff = oa->o_id - filter_last_id(filter, oa);
         CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
                filter_last_id(filter, oa), diff);
-       
+
         /* delete orphans request */
-        if ((oa->o_valid & OBD_MD_FLFLAGS) && 
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
             (oa->o_flags & OBD_FL_DELORPHAN)) {
-                LASSERT(diff <= 0);
-                if (diff == 0)
-                        RETURN(0);
+                if (diff >= 0)
+                        RETURN(diff);
+                if (-diff > 10000) { /* XXX make this smarter */
+                        CERROR("ignoring bogus orphan destroy request: obdid "
+                               LPU64" last_id "LPU64"\n",
+                               oa->o_id, filter_last_id(filter, oa));
+                        RETURN(-EINVAL);
+                }
                 filter_destroy_precreated(exp, oa, filter);
                 rc = filter_update_last_objid(obd, group, 0);
                 if (rc)
-                        CERROR("unable to write lastobjid, but orphans" 
+                        CERROR("unable to write lastobjid, but orphans"
                                "were deleted\n");
                 RETURN(0);
         } else {
                 /* only precreate if group == 0 and o_id is specfied */
-                if (!(oa->o_valid & OBD_FL_DELORPHAN) && 
+                if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
                     (group != 0 || oa->o_id == 0))
                         RETURN(1);
 
                 LASSERT(diff >= 0);
                 RETURN(diff);
         }
-
 }
 
 /* We rely on the fact that only one thread will be creating files in a given
@@ -1612,25 +1825,41 @@ static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
 static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                             obd_gr group, int *num)
 {
-        struct lustre_handle parent_lockh;
         struct dentry *dchild = NULL;
         struct filter_obd *filter;
         struct dentry *dparent;
         int err = 0, rc = 0, i;
         __u64 next_id;
-        void *handle;
+        int recreate_obj = 0;
+        void *handle = NULL;
         ENTRY;
 
         filter = &obd->u.filter;
 
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+                recreate_obj = 1;
+        }
+
         for (i = 0; i < *num && err == 0; i++) {
                 int cleanup_phase = 0;
 
-                next_id = filter_last_id(filter, oa) + 1;
+                if (recreate_obj) {
+                        __u64 last_id;
+                        next_id = oa->o_id;
+                        last_id = filter_last_id(filter, NULL);
+                        if (next_id > last_id) {
+                                CERROR("Error: Trying to recreate obj greater"
+                                       "than last id "LPD64" > "LPD64"\n",
+                                       next_id, last_id);
+                                RETURN(-EINVAL);
+                        }
+                } else
+                        next_id = filter_last_id(filter, NULL) + 1;
+
                 CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
 
-                dparent = filter_parent_lock(obd, group, next_id, LCK_PW,
-                                             &parent_lockh);
+                dparent = filter_parent_lock(obd, group, next_id);
                 if (IS_ERR(dparent))
                         GOTO(cleanup, rc = PTR_ERR(dparent));
                 cleanup_phase = 1;
@@ -1642,9 +1871,18 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
 
                 if (dchild->d_inode != NULL) {
                         /* This would only happen if lastobjid was bad on disk*/
-                        CERROR("Serious error: objid %*s already exists; is "
-                               "this filesystem corrupt?\n",
-                               dchild->d_name.len, dchild->d_name.name);
+                        /* Could also happen if recreating missing obj but
+                         * already exists
+                         */
+                        if (recreate_obj) {
+                                CERROR("Serious error: recreating obj %*s but "
+                                       "obj already exists \n",
+                                       dchild->d_name.len, dchild->d_name.name);
+                        } else {
+                                CERROR("Serious error: objid %*s already "
+                                       "exists; is this filesystem corrupt?\n",
+                                        dchild->d_name.len, dchild->d_name.name);
+                        }
                         GOTO(cleanup, rc = -EEXIST);
                 }
 
@@ -1658,12 +1896,15 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 if (rc) {
                         CERROR("create failed rc = %d\n", rc);
                         GOTO(cleanup, rc);
-                } 
+                }
 
-                filter_set_last_id(filter, oa, next_id);
-                err = filter_update_last_objid(obd, group, 0);
-                if (err)
-                        CERROR("unable to write lastobjid but file created\n");
+                if (!recreate_obj) {
+                        filter_set_last_id(filter, NULL, next_id);
+                        err = filter_update_last_objid(obd, group, 0);
+                        if (err)
+                                CERROR("unable to write lastobjid "
+                                       "but file created\n");
+                }
 
         cleanup:
                 switch(cleanup_phase) {
@@ -1677,11 +1918,11 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 case 2:
                         f_dput(dchild);
                 case 1:
-                        filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
+                        filter_parent_unlock(dparent);
                 case 0:
                         break;
                 }
-                
+
                 if (rc)
                         break;
         }
@@ -1718,12 +1959,24 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
         obd = exp->exp_obd;
         push_ctxt(&saved, &obd->obd_ctxt, NULL);
 
-        diff = filter_should_precreate(exp, oa, group);
-        if (diff > 0) {
-                oa->o_id = filter_last_id(&obd->u.filter, oa);
-                rc = filter_precreate(obd, oa, group, &diff);
-                oa->o_id += diff;
-                oa->o_valid = OBD_MD_FLID;
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+                if (oa->o_id > filter_last_id(&obd->u.filter, oa)) {
+                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
+                               oa->o_id, filter_last_id(&obd->u.filter, oa));
+                        rc = -EINVAL;
+                } else {
+                        diff = 1;
+                        rc = filter_precreate(obd, oa, group, &diff);
+                }
+        } else {
+                diff = filter_should_precreate(exp, oa, group);
+                if (diff > 0) {
+                        oa->o_id = filter_last_id(&obd->u.filter, oa);
+                        rc = filter_precreate(obd, oa, group, &diff);
+                        oa->o_id += diff;
+                        oa->o_valid = OBD_MD_FLID;
+                }
         }
 
         pop_ctxt(&saved, &obd->obd_ctxt, NULL);
@@ -1749,7 +2002,6 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
         struct dentry *dchild = NULL, *dparent = NULL;
         struct obd_run_ctxt saved;
         void *handle = NULL;
-        struct lustre_handle parent_lockh;
         struct llog_cookie *fcc = NULL;
         int rc, rc2, cleanup_phase = 0, have_prepared = 0;
         obd_gr group = 0;
@@ -1764,8 +2016,7 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
         push_ctxt(&saved, &obd->obd_ctxt, NULL);
 
  acquire_locks:
-        dparent = filter_parent_lock(obd, group, oa->o_id, LCK_PW,
-                                     &parent_lockh);
+        dparent = filter_parent_lock(obd, group, oa->o_id);
         if (IS_ERR(dparent))
                 GOTO(cleanup, rc = PTR_ERR(dparent));
         cleanup_phase = 1;
@@ -1776,7 +2027,8 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
         cleanup_phase = 2;
 
         if (dchild->d_inode == NULL) {
-                CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
+                CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n",
+                       oa->o_id);
                 GOTO(cleanup, rc = -ENOENT);
         }
 
@@ -1794,7 +2046,7 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                  * complication of condition the above code to skip it on the
                  * second time through. */
                 f_dput(dchild);
-                filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
+                filter_parent_unlock(dparent);
 
                 filter_prepare_destroy(obd, oa->o_id);
                 have_prepared = 1;
@@ -1818,9 +2070,16 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
 cleanup:
         switch(cleanup_phase) {
         case 3:
-                if (fcc != NULL)
-                        fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
-                                              filter_cancel_cookies_cb, fcc);
+                if (fcc != NULL) {
+                        if (oti != NULL)
+                                fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
+                                                      filter_cancel_cookies_cb,
+                                                      fcc);
+                        else
+                                fsfilt_add_journal_cb(obd, 0, handle,
+                                                      filter_cancel_cookies_cb,
+                                                      fcc);
+                }
                 rc = filter_finish_transno(exp, oti, rc);
                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
                 if (rc2) {
@@ -1831,13 +2090,7 @@ cleanup:
         case 2:
                 f_dput(dchild);
         case 1:
-                if (rc || oti == NULL) {
-                        filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
-                } else {
-                        memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
-                               sizeof(parent_lockh));
-                        oti->oti_ack_locks[0].mode = LCK_PW;
-                }
+                filter_parent_unlock(dparent);
         case 0:
                 pop_ctxt(&saved, &obd->obd_ctxt, NULL);
                 break;
@@ -1875,6 +2128,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         struct obd_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
+        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
@@ -1883,6 +2137,9 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         /* an objid of zero is taken to mean "sync whole filesystem" */
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(exp->exp_obd, filter->fo_sb);
+                /* flush any remaining cancel messages out to the target */
+                ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+                llog_sync(ctxt, exp);
                 RETURN(rc);
         }
 
@@ -1916,32 +2173,10 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-/* debugging to make sure that nothing bad happens, can be turned off soon */
-static void filter_grant_sanity_check(struct obd_device *obd,
-                                      struct filter_obd *filter, __u64 maxsize)
-{
-        struct filter_export_data *fed;
-        struct obd_export *exp_pos;
-        obd_size tot_cached = 0, tot_granted = 0;
-
-        list_for_each_entry(exp_pos, &obd->obd_exports, exp_obd_chain) {
-                fed = &exp_pos->exp_filter_data;
-                LASSERT(fed->fed_cached <= maxsize);
-                LASSERT(fed->fed_grant <= maxsize);
-                tot_cached += fed->fed_cached;
-                tot_granted += fed->fed_grant;
-        }
-        LASSERT(tot_cached == filter->fo_tot_cached);
-        LASSERT(tot_granted == filter->fo_tot_granted);
-        LASSERT(tot_cached <= maxsize);
-        LASSERT(tot_granted <= maxsize);
-}
-
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                          unsigned long max_age)
 {
         struct filter_obd *filter = &obd->u.filter;
-        __u64 cached;
         int blockbits = filter->fo_sb->s_blocksize_bits;
         int rc;
         ENTRY;
@@ -1952,18 +2187,19 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         spin_lock(&obd->obd_osfs_lock);
         rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
         memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
-        filter_grant_sanity_check(obd, filter, osfs->os_blocks << blockbits);
-        cached = filter->fo_tot_cached + osfs->os_bsize - 1;
         spin_unlock(&obd->obd_osfs_lock);
 
-        cached >>= blockbits;
-        if (cached > osfs->os_bavail) {
-                CERROR("cached "LPU64" > bavail "LPU64", clamping\n", cached,
-                       osfs->os_bavail);
-                cached = osfs->os_bavail;
-        }
-        osfs->os_bavail -= cached;
-        osfs->os_bfree -= cached;
+        CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
+               " pending "LPU64" free "LPU64" avail "LPU64"\n",
+               filter->fo_tot_dirty, filter->fo_tot_granted,
+               filter->fo_tot_pending,
+               osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
+
+        filter_grant_sanity_check(obd, __FUNCTION__);
+
+        osfs->os_bavail -= min(osfs->os_bavail,
+                               (filter->fo_tot_dirty + filter->fo_tot_pending +
+                                osfs->os_bsize -1) >> blockbits);
 
         RETURN(rc);
 }
@@ -2012,9 +2248,7 @@ static int filter_set_info(struct obd_export *exp, __u32 keylen,
 {
         struct obd_device *obd;
         struct lustre_handle conn;
-#ifdef ENABLE_ORPHANS
         struct llog_ctxt *ctxt;
-#endif
         int rc = 0;
         ENTRY;
 
@@ -2033,10 +2267,8 @@ static int filter_set_info(struct obd_export *exp, __u32 keylen,
 
         CWARN("Received MDS connection ("LPX64")\n", conn.cookie);
         memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn));
-#ifdef ENABLE_ORPHANS
         ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
-#endif
         RETURN(rc);
 }
 
@@ -2060,7 +2292,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
                 BDEVNAME_DECLARE_STORAGE(tmp);
                 CERROR("setting device %s read-only\n",
                        ll_bdevname(sb, tmp));
-                
+
                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
                 LASSERT(handle);
                 (void)fsfilt_commit(obd, inode, handle, 1);
@@ -2075,18 +2307,18 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
         }
 
         case OBD_IOC_LLOG_CANCEL:
-        case OBD_IOC_LLOG_REMOVE: 
+        case OBD_IOC_LLOG_REMOVE:
         case OBD_IOC_LLOG_INFO:
         case OBD_IOC_LLOG_PRINT: {
                 /* FIXME to be finished */
                 RETURN(-EOPNOTSUPP);
 /*
                 struct llog_ctxt *ctxt = NULL;
-                
+
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-                
+
                 RETURN(rc);
 */
         }
@@ -2106,12 +2338,12 @@ static struct llog_operations filter_size_orig_logops = {
 };
 
 static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                            int count, struct llog_logid *logid) 
+                            int count, struct llog_catid *logid)
 {
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
-        
+
         filter_unlink_repl_logops = llog_client_ops;
         filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel;
         filter_unlink_repl_logops.lop_connect = llog_repl_connect;
@@ -2134,7 +2366,7 @@ static int filter_llog_finish(struct obd_device *obd, int count)
 {
         int rc;
         ENTRY;
-        
+
         rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_REPL_CTXT));
         if (rc)
                 RETURN(rc);
@@ -2160,7 +2392,6 @@ static struct obd_ops filter_obd_ops = {
         o_get_info:       filter_get_info,
         o_set_info:       filter_set_info,
         o_setup:          filter_setup,
-        o_postsetup:      filter_postsetup,
         o_precleanup:     filter_precleanup,
         o_cleanup:        filter_cleanup,
         o_connect:        filter_connect,