Whamcloud - gitweb
- CROW-related fixes from b_hd_mdref
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index 2d4360b..80a55f7 100644 (file)
 #include <linux/dcache.h>
 #include <linux/init.h>
 #include <linux/version.h>
+#include <linux/jbd.h>
+#include <linux/ext3_fs.h>
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 # include <linux/mount.h>
 # include <linux/buffer_head.h>
+# include <linux/bio.h>
 #endif
 
 #include <linux/obd_class.h>
 #include <linux/obd_lov.h>
+#include <linux/obd_ost.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_log.h>
 #include <linux/lustre_commit_confd.h>
-#include <portals/list.h>
+#include <libcfs/list.h>
 
+#include <linux/lustre_smfs.h>
+#include <linux/lustre_sec.h>
 #include "filter_internal.h"
 
+/* Group 0 is no longer a legal group, to catch uninitialized IDs */
+#define FILTER_MIN_GROUPS 3
+
 static struct lvfs_callback_ops filter_lvfs_ops;
 
 static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *ea, struct obd_trans_info *);
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *,
+                                             int, struct obd_export *);
 
 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                              void *cb_data, int error)
@@ -101,6 +112,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                                 cpu_to_le64(last_rcvd);
                 spin_unlock(&filter->fo_translock);
         }
+        
         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
 
         /* could get xid from oti, if it's ever needed */
@@ -149,7 +161,7 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
         LASSERT(bitmap != NULL);
 
         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
-        if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid))
+        if (!strcmp((char *)fed->fed_fcd->fcd_uuid, (char *)obd->obd_uuid.uuid))
                 RETURN(0);
 
         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
@@ -240,7 +252,7 @@ static int filter_client_free(struct obd_export *exp, int flags)
                 GOTO(free, 0);
 
         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
-        if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0)
+        if (!strcmp((char *)fed->fed_fcd->fcd_uuid, (char *)obd->obd_uuid.uuid))
                 GOTO(free, 0);
 
         LASSERT(filter->fo_last_rcvd_slots != NULL);
@@ -262,6 +274,10 @@ static int filter_client_free(struct obd_export *exp, int flags)
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
                                  sizeof(zero_fcd), &off, 1);
+        if (rc == 0)
+                /* update server's transno */
+                filter_update_server_data(obd, filter->fo_rcvd_filp,
+                                          filter->fo_fsd, 1);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
@@ -285,8 +301,7 @@ static int filter_free_server_data(struct filter_obd *filter)
 {
         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
         filter->fo_fsd = NULL;
-        OBD_FREE(filter->fo_last_rcvd_slots,
-                 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
+        OBD_FREE(filter->fo_last_rcvd_slots, FILTER_LR_MAX_CLIENTS/8);
         filter->fo_last_rcvd_slots = NULL;
         return 0;
 }
@@ -363,8 +378,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 RETURN(-ENOMEM);
         filter->fo_fsd = fsd;
 
-        OBD_ALLOC(filter->fo_last_rcvd_slots,
-                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
+        OBD_ALLOC(filter->fo_last_rcvd_slots, FILTER_LR_MAX_CLIENTS/8);
         if (filter->fo_last_rcvd_slots == NULL) {
                 OBD_FREE(fsd, sizeof(*fsd));
                 RETURN(-ENOMEM);
@@ -388,7 +402,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                                LAST_RCVD, rc);
                         GOTO(err_fsd, rc);
                 }
-                if (strcmp(fsd->fsd_uuid, obd->obd_uuid.uuid) != 0) {
+                if (strcmp((char *)fsd->fsd_uuid, (char *)obd->obd_uuid.uuid)) {
                         CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
                                obd->obd_uuid.uuid, fsd->fsd_uuid);
                         GOTO(err_fsd, rc = -EINVAL);
@@ -469,9 +483,10 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                  * need to be set up like real exports as filter_connect() does.
                  */
                 exp = class_new_export(obd);
-                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                       " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
-                       last_rcvd, le64_to_cpu(fsd->fsd_last_transno));
+                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64" "
+                       "srv lr: "LPU64" fcd_group %d \n", fcd->fcd_uuid, cl_idx,
+                       last_rcvd, le64_to_cpu(fsd->fsd_last_transno), 
+                       le32_to_cpu(fcd->fcd_group));
                 if (exp == NULL)
                         GOTO(err_client, rc = -ENOMEM);
 
@@ -479,12 +494,19 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                        sizeof exp->exp_client_uuid.uuid);
                 fed = &exp->exp_filter_data;
                 fed->fed_fcd = fcd;
+                fed->fed_group = le32_to_cpu(fcd->fcd_group);
                 filter_client_add(obd, filter, fed, cl_idx);
                 /* create helper if export init gets more complex */
                 spin_lock_init(&fed->fed_lock);
 
                 fcd = NULL;
+                exp->exp_connected = 0;
+                exp->exp_req_replay_needed = 1;
+                exp->exp_lock_replay_needed = 1;
+                atomic_inc(&obd->obd_req_replay_clients);
+                atomic_inc(&obd->obd_lock_replay_clients);
                 obd->obd_recoverable_clients++;
+                obd->obd_max_recoverable_clients++;
                 class_export_put(exp);
 
                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
@@ -498,11 +520,12 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         obd->obd_last_committed = le64_to_cpu(fsd->fsd_last_transno);
 
         if (obd->obd_recoverable_clients) {
-                CWARN("RECOVERY: %d recoverable clients, last_rcvd "
-                      LPU64"\n", obd->obd_recoverable_clients,
+                CWARN("RECOVERY: service %s, %d recoverable clients, "
+                      "last_transno "LPU64"\n", obd->obd_name,
+                      obd->obd_recoverable_clients,
                       le64_to_cpu(fsd->fsd_last_transno));
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
-                obd->obd_recovering = 1;
+                target_start_recovery_thread(obd, ost_handle);
         }
 
         if (fcd)
@@ -514,8 +537,9 @@ out:
 
         /* save it, so mount count and last_transno is current */
         rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
-
-        RETURN(rc);
+        if (rc)
+                GOTO(err_client, rc);
+        RETURN(0);
 
 err_client:
         class_disconnect_exports(obd, 0);
@@ -566,12 +590,56 @@ static int filter_cleanup_groups(struct obd_device *obd)
         RETURN(0);
 }
 
+static int filter_update_last_group(struct obd_device *obd, int group)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        struct file *filp = NULL;
+        int last_group = 0, rc;
+        loff_t off = 0;
+        ENTRY;
+
+        if (group <= filter->fo_committed_group)
+                RETURN(0);
+
+        filp = filp_open("LAST_GROUP", O_RDWR, 0700);
+        if (IS_ERR(filp)) {
+                rc = PTR_ERR(filp);
+                filp = NULL;
+                CERROR("cannot open LAST_GROUP: rc = %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+        if (rc) {
+                CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc);
+                GOTO(cleanup, rc);
+        }
+        LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS);
+        CDEBUG(D_INODE, "%s: previous %d, new %d\n",
+               obd->obd_name, last_group, group);
+
+        off = 0;
+        last_group = group;
+        /* must be sync: bXXXX */
+        rc = fsfilt_write_record(obd, filp, &last_group, sizeof(__u32), &off, 1);
+        if (rc) {
+                CDEBUG(D_INODE, "error updating LAST_GROUP: rc %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        filter->fo_committed_group = group;
+cleanup:
+        if (filp)
+                filp_close(filp, 0);
+        RETURN(rc);
+}
+
 static int filter_read_group_internal(struct obd_device *obd, int group,
                                       int create)
 {
         struct filter_obd *filter = &obd->u.filter;
         __u64 *new_objids = NULL;
-        struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs;
+        struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs = NULL;
         struct dentry **new_groups = NULL;
         struct file **new_files = NULL;
         struct dentry *dentry;
@@ -685,6 +753,8 @@ static int filter_read_group_internal(struct obd_device *obd, int group,
                 OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
         }
 
+        filter_update_last_group(obd, group);
+        
         if (filp->f_dentry->d_inode->i_size == 0) {
                 filter->fo_last_objids[group] = FILTER_INIT_OBJID;
                 RETURN(0);
@@ -725,8 +795,10 @@ static int filter_read_groups(struct obd_device *obd, int last_group,
                               int create)
 {
         struct filter_obd *filter = &obd->u.filter;
-        int old_count = filter->fo_group_count, group = old_count, rc = 0;
+        int old_count, group, rc = 0;
 
+        down(&filter->fo_init_lock);
+        old_count = filter->fo_group_count;
         for (group = old_count; group <= last_group; group++) {
                 if (group == 0)
                         continue; /* no group zero */
@@ -735,6 +807,7 @@ static int filter_read_groups(struct obd_device *obd, int last_group,
                 if (rc != 0)
                         break;
         }
+        up(&filter->fo_init_lock);
         return rc;
 }
 
@@ -743,6 +816,9 @@ static int filter_prep_groups(struct obd_device *obd)
         struct filter_obd *filter = &obd->u.filter;
         struct dentry *dentry, *O_dentry;
         int rc = 0, cleanup_phase = 0;
+        struct file *filp = NULL;
+        int last_group;
+        loff_t off = 0;
         ENTRY;
 
         O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
@@ -805,16 +881,40 @@ static int filter_prep_groups(struct obd_device *obd)
 
         cleanup_phase = 2; /* groups */
 
-        /* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS 3
-        rc = filter_read_groups(obd, FILTER_MIN_GROUPS, 1);
-        if (rc)
+        /* we have to initialize all groups before first connections from
+         * clients because they may send create/destroy for any group -bzzz */
+        filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700);
+        if (IS_ERR(filp)) {
+                CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp));
+                GOTO(cleanup, rc = PTR_ERR(filp));
+        }
+        cleanup_phase = 3; /* filp */
+
+        rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+        if (rc) {
+                CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc);
                 GOTO(cleanup, rc);
+        }
+        if (off == 0) {
+                last_group = FILTER_MIN_GROUPS;
+        } else {
+                LASSERT(last_group >= FILTER_MIN_GROUPS);
+        }
 
+        CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name,
+              FILTER_MIN_GROUPS, last_group);
+        filter->fo_committed_group = last_group;
+        rc = filter_read_groups(obd, last_group, 1);
+        if (rc)
+                GOTO(cleanup, rc);
+        
+        filp_close(filp, 0);
         RETURN(0);
 
  cleanup:
         switch (cleanup_phase) {
+        case 3:
+                filp_close(filp, 0);
         case 2:
                 filter_cleanup_groups(obd);
         case 1:
@@ -988,16 +1088,36 @@ static int filter_blocking_ast(struct ldlm_lock *lock,
         RETURN(0);
 }
 
-static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
+extern void *lock_dir(struct inode *dir, struct qstr *name);
+extern void unlock_dir(struct inode *dir, void *lock);
+
+static void *filter_lock_dentry(struct obd_device *obd,
+                                struct dentry *dparent,
+                                obd_id id)
 {
+#ifdef S_PDIROPS
+        struct qstr qstr;
+        char name[32];
+        int len;
+
+        len = sprintf(name, LPU64, id);
+        qstr_assign(&qstr, (char *)name, len);
+        return lock_dir(dparent->d_inode, &qstr);
+#else
         down(&dparent->d_inode->i_sem);
+#endif
         return 0;
 }
 
 /* We never dget the object parent, so DON'T dput it either */
-static void filter_parent_unlock(struct dentry *dparent)
+static void filter_parent_unlock(struct dentry *dparent, void *lock)
 {
+#ifdef S_PDIROPS
+        LASSERT(lock != NULL);
+        unlock_dir(dparent->d_inode, lock);
+#else
         up(&dparent->d_inode->i_sem);
+#endif
 }
 
 /* We never dget the object parent, so DON'T dput it either */
@@ -1015,11 +1135,10 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
 
 /* We never dget the object parent, so DON'T dput it either */
 struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
-                                  obd_id objid)
+                                  obd_id objid, void **lock)
 {
         unsigned long now = jiffies;
         struct dentry *dparent = filter_parent(obd, group, objid);
-        int rc;
 
         if (IS_ERR(dparent))
                 return dparent;
@@ -1027,10 +1146,9 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
         LASSERT(dparent);
         LASSERT(dparent->d_inode);
 
-        rc = filter_lock_dentry(obd, dparent);
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
-        return rc ? ERR_PTR(rc) : dparent;
+        *lock = filter_lock_dentry(obd, dparent, objid);
+        fsfilt_check_slow(now, obd_timeout, "parent lock");
+        return dparent;
 }
 
 /* How to get files, dentries, inodes from object id's.
@@ -1040,12 +1158,13 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
  * dir_dentry is NULL, we do a read lock while we do the lookup to
  * avoid races with create/destroy and such changing the directory
  * internal to the filesystem code. */
-struct dentry *filter_fid2dentry(struct obd_device *obd,
-                                 struct dentry *dir_dentry,
-                                 obd_gr group, obd_id id)
+struct dentry *filter_id2dentry(struct obd_device *obd,
+                                struct dentry *dir_dentry,
+                                obd_gr group, obd_id id)
 {
         struct dentry *dparent = dir_dentry;
         struct dentry *dchild;
+        void *lock = NULL;
         char name[32];
         int len;
         ENTRY;
@@ -1057,20 +1176,31 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
 
         len = sprintf(name, LPU64, id);
         if (dir_dentry == NULL) {
-                dparent = filter_parent_lock(obd, group, id);
-                if (IS_ERR(dparent))
+                dparent = filter_parent_lock(obd, group, id, &lock);
+                if (IS_ERR(dparent)) {
+                        CERROR("%s: error getting object "LPU64":"LPU64
+                               " parent: rc %ld\n", obd->obd_name,
+                               id, group, PTR_ERR(dparent));
                         RETURN(dparent);
+                }
         }
-        CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
+        CDEBUG(D_INODE, "looking up object O/%.*s/%s\n",
                dparent->d_name.len, dparent->d_name.name, name);
         dchild = /*ll_*/lookup_one_len(name, dparent, len);
         if (dir_dentry == NULL)
-                filter_parent_unlock(dparent);
+                filter_parent_unlock(dparent, lock);
         if (IS_ERR(dchild)) {
-                CERROR("child lookup error %ld\n", PTR_ERR(dchild));
+                CERROR("%s: child lookup error %ld\n", obd->obd_name,
+                       PTR_ERR(dchild));
                 RETURN(dchild);
         }
 
+        if (dchild->d_inode != NULL && is_bad_inode(dchild->d_inode)) {
+                CERROR("%s: got bad inode "LPU64"\n", obd->obd_name, id);
+                f_dput(dchild);
+                RETURN(ERR_PTR(-ENOENT));
+        }
+
         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
                name, dchild, atomic_read(&dchild->d_count));
 
@@ -1113,7 +1243,7 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         ENTRY;
 
         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
-                CERROR("destroying objid %*s nlink = %lu, count = %d\n",
+                CERROR("destroying objid %.*s nlink = %lu, count = %d\n",
                        dchild->d_name.len, dchild->d_name.name,
                        (unsigned long)inode->i_nlink,
                        atomic_read(&inode->i_count));
@@ -1122,7 +1252,7 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         rc = vfs_unlink(dparent->d_inode, dchild);
 
         if (rc)
-                CERROR("error unlinking objid %*s: rc %d\n",
+                CERROR("error unlinking objid %.*s: rc %d\n",
                        dchild->d_name.len, dchild->d_name.name, rc);
 
         RETURN(rc);
@@ -1138,10 +1268,10 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         struct ldlm_resource *res = lock->l_resource;
         ldlm_processing_policy policy;
         struct ost_lvb *res_lvb, *reply_lvb;
+        struct ldlm_reply *rep;
         struct list_head *tmp;
         ldlm_error_t err;
-        int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply),
-                                            sizeof(struct ost_lvb) };
+        int tmpflags = 0, rc, repsize[2] = {sizeof(*rep), sizeof(*reply_lvb)};
         ENTRY;
 
         policy = ldlm_get_processing_policy(res);
@@ -1152,6 +1282,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         if (rc)
                 RETURN(req->rq_status = rc);
 
+        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
+        LASSERT(rep != NULL);
+
         reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
         LASSERT(reply_lvb != NULL);
 
@@ -1193,8 +1326,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         down(&res->lr_lvb_sem);
         res_lvb = res->lr_lvb_data;
         LASSERT(res_lvb != NULL);
-        reply_lvb->lvb_size = res_lvb->lvb_size;
-        reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
+        *reply_lvb = *res_lvb;
         up(&res->lr_lvb_sem);
 
         list_for_each(tmp, &res->lr_granted) {
@@ -1225,7 +1357,14 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         if (l == NULL)
                 RETURN(ELDLM_LOCK_ABORTED);
 
-        LASSERT(l->l_glimpse_ast != NULL);
+        if (l->l_glimpse_ast == NULL) {
+                /* We are racing with unlink(); just return -ENOENT */
+                rep->lock_policy_res1 = -ENOENT;
+                goto out;
+        }
+
+        LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
+
         rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
         if (rc != 0 && res->lr_namespace->ns_lvbo &&
             res->lr_namespace->ns_lvbo->lvbo_update) {
@@ -1233,10 +1372,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         }
 
         down(&res->lr_lvb_sem);
-        reply_lvb->lvb_size = res_lvb->lvb_size;
-        reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
+        *reply_lvb = *res_lvb;
         up(&res->lr_lvb_sem);
-
+out:
         LDLM_LOCK_PUT(l);
 
         RETURN(ELDLM_LOCK_ABORTED);
@@ -1251,7 +1389,8 @@ static int filter_post_fs_cleanup(struct obd_device *obd)
         RETURN(rc);
 }
 
-static int filter_group_set_kml_flags(struct obd_device *obd, int group)
+#if 0
+static int filter_group_set_fs_flags(struct obd_device *obd, int group)
 {
         struct filter_obd *filter = &obd->u.filter;
         int rc = 0, i = 0;
@@ -1263,73 +1402,68 @@ static int filter_group_set_kml_flags(struct obd_device *obd, int group)
         for (i = 0; i < filter->fo_subdir_count; i++) {
                 struct dentry *dentry;
                 dentry = (filter->fo_subdirs + group)->dentry[i];
-                rc = fsfilt_set_kml_flags(obd, dentry->d_inode);
+                rc = fsfilt_set_fs_flags(obd, dentry->d_inode, 
+                                         SM_DO_REC | SM_DO_COW);
                 if (rc)
                         RETURN(rc);
         }
         RETURN(rc);
 }
+#endif
+
 static int filter_post_fs_setup(struct obd_device *obd)
 {
         struct filter_obd *filter = &obd->u.filter;
-        int rc = 0, j = 0;
-        struct llog_ctxt *ctxt = NULL;
-
-        rc = fsfilt_post_setup(obd);
-        if (rc)
-                RETURN(rc);
+        int rc = 0;
         
-        for (j = 0; j < filter->fo_group_count; j++) {
-                rc = filter_group_set_kml_flags(obd, j);
-                if (rc)
-                        return rc;
-        } 
+        rc = fsfilt_post_setup(obd, filter->fo_dentry_O);
 
-        fsfilt_get_reint_log_ctxt(obd, filter->fo_sb, &ctxt);
-        if (ctxt) {
-                ctxt->loc_obd = obd;
-                ctxt->loc_idx = LLOG_REINT_ORIG_CTXT;
-                obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT] = ctxt;
-        }
-        fsfilt_set_ost_flags(obd, filter->fo_sb);
         return rc;
 }
 
 /* mount the file system (secretly) */
-int filter_common_setup(struct obd_device *obd, obd_count len,
-                        void *buf, char *option)
+int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
+                        char *option)
 {
-        struct lustre_cfglcfg = buf;
+        struct lustre_cfg *lcfg = buf;
         struct filter_obd *filter = &obd->u.filter;
+        struct lvfs_obd_ctxt *lvfs_ctxt = NULL;
         struct vfsmount *mnt;
-        char name[32] = "CATLIST";
-        int rc = 0;
+        char *str;
+        char ns_name[48];
+        int rc = 0, i;
         ENTRY;
 
-        dev_clear_rdonly(2);
-
-        if (!lcfg->lcfg_inlbuf1 || !lcfg->lcfg_inlbuf2)
+        if ((LUSTRE_CFG_BUFLEN(lcfg, 1)) < 1 || 
+            (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1)) 
                 RETURN(-EINVAL);
 
-        obd->obd_fsops = fsfilt_get_ops(lcfg->lcfg_inlbuf2);
+        obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
         if (IS_ERR(obd->obd_fsops))
                 RETURN(PTR_ERR(obd->obd_fsops));
 
-        mnt = do_kern_mount(lcfg->lcfg_inlbuf2, MS_NOATIME | MS_NODIRATIME,
-                            lcfg->lcfg_inlbuf1, option);
-        rc = PTR_ERR(mnt);
-        if (IS_ERR(mnt))
+        rc = lvfs_mount_fs(lustre_cfg_string(lcfg, 1), 
+                           lustre_cfg_string(lcfg, 2), 
+                           option, MS_NOATIME | MS_NODIRATIME, &lvfs_ctxt);
+        if (rc) {
+                CERROR("lvfs_mount_fs failed: rc = %d\n", rc);
                 GOTO(err_ops, rc);
+        }
+        LASSERT(lvfs_ctxt);
 
-        if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3) {
-                if (*lcfg->lcfg_inlbuf3 == 'f') {
+        mnt = lvfs_ctxt->loc_mnt;
+        filter->fo_lvfs_ctxt = lvfs_ctxt;
+
+        if (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0 && lustre_cfg_buf(lcfg, 3)) {
+                str = lustre_cfg_string(lcfg, 3);
+                if (*str == 'f') {
                         obd->obd_replayable = 1;
                         obd_sync_filter = 1;
                         CWARN("%s: recovery enabled\n", obd->obd_name);
                 } else {
-                        if (*lcfg->lcfg_inlbuf3 != 'n') {
+                        if (*str != 'n') {
                                 CERROR("unrecognised flag '%c'\n",
-                                       *lcfg->lcfg_inlbuf3);
+                                       *str);
                         }
                         // XXX Robert? Why do we get errors here
                         // GOTO(err_mntput, rc = -EINVAL);
@@ -1347,17 +1481,21 @@ int filter_common_setup(struct obd_device *obd, obd_count len,
         obd->obd_lvfs_ctxt.fs = get_ds();
         obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
 
+        ll_clear_rdonly(ll_sbdev(filter->fo_sb));
+        
         rc = fsfilt_setup(obd, mnt->mnt_sb);
         if (rc)
                 GOTO(err_mntput, rc);
 
+        sema_init(&filter->fo_init_lock, 1);
+        filter->fo_committed_group = 0;
         rc = filter_prep(obd);
         if (rc)
                 GOTO(err_mntput, rc);
 
-
-        filter->fo_destroy_in_progress = 0;
-        sema_init(&filter->fo_create_lock, 1);
+        filter->fo_destroys_in_progress = 0;
+        for (i = 0; i < 32; i++)
+                sema_init(&filter->fo_create_locks[i], 1);
 
         spin_lock_init(&filter->fo_translock);
         spin_lock_init(&filter->fo_objidlock);
@@ -1374,8 +1512,9 @@ int filter_common_setup(struct obd_device *obd, obd_count len,
         INIT_LIST_HEAD(&filter->fo_llog_list);
         spin_lock_init(&filter->fo_llog_list_lock);
 
-        obd->obd_namespace = ldlm_namespace_new("filter-tgt",
-                                                LDLM_NAMESPACE_SERVER);
+        sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
+        obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+
         if (obd->obd_namespace == NULL)
                 GOTO(err_post, rc = -ENOMEM);
         obd->obd_namespace->ns_lvbp = obd;
@@ -1385,7 +1524,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 1, name);
+        rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 1, CATLIST);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
@@ -1396,7 +1535,7 @@ err_post:
         filter_post(obd);
 err_mntput:
         unlock_kernel();
-        mntput(mnt);
+        lvfs_umount_fs(filter->fo_lvfs_ctxt);
         filter->fo_sb = 0;
         lock_kernel();
 err_ops:
@@ -1435,13 +1574,29 @@ static int filter_detach(struct obd_device *dev)
 
 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
 {
-        struct lustre_cfg* lcfg = buf;
+        struct filter_obd *filter = &obd->u.filter;
+        struct lustre_cfg *lcfg = buf;
+        unsigned long page;
         int rc;
         ENTRY;
+
+        spin_lock_init(&filter->fo_denylist_lock);
+        INIT_LIST_HEAD(&filter->fo_denylist);
+
+        /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */
+        page = get_zeroed_page(GFP_KERNEL);
+        if (!page)
+                RETURN(-ENOMEM);
+
+        memcpy((void *)page, lustre_cfg_buf(lcfg, 4),
+               LUSTRE_CFG_BUFLEN(lcfg, 4));
+        
         /* all mount options including errors=remount-ro and asyncdel are passed
          * using 4th lcfg param. And it is good, finally we have got rid of
          * hardcoded fs types in the code. */
-        rc = filter_common_setup(obd, len, buf, lcfg->lcfg_inlbuf4);
+        rc = filter_common_setup(obd, len, buf, (void *)page);
+        free_page(page);
+        
         if (rc)
                 RETURN(rc);
         rc = filter_post_fs_setup(obd);
@@ -1451,6 +1606,7 @@ static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
 static int filter_cleanup(struct obd_device *obd, int flags)
 {
         struct filter_obd *filter = &obd->u.filter;
+        ll_sbdev_type save_dev;
         ENTRY;
 
         if (flags & OBD_OPT_FAILOVER)
@@ -1466,43 +1622,121 @@ static int filter_cleanup(struct obd_device *obd, int flags)
                 }
         }
 
+        target_cleanup_recovery(obd);
+        
         ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
 
         if (filter->fo_sb == NULL)
                 RETURN(0);
 
+        save_dev = ll_sbdev(filter->fo_sb);
         filter_post_fs_cleanup(obd);
         filter_post(obd);
 
         shrink_dcache_parent(filter->fo_sb->s_root);
         filter->fo_sb = 0;
 
-        if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1)
-                CERROR("%s: mount point %p busy, mnt_count: %d\n",
-                       obd->obd_name, filter->fo_vfsmnt,
-                       atomic_read(&filter->fo_vfsmnt->mnt_count));
+        spin_lock(&filter->fo_denylist_lock);
+        while (!list_empty(&filter->fo_denylist)) {
+                deny_sec_t *p_deny_sec = list_entry(filter->fo_denylist.next,
+                                                    deny_sec_t, list);
+                list_del(&p_deny_sec->list);
+                OBD_FREE(p_deny_sec, sizeof(*p_deny_sec));
+        }
+        spin_unlock(&filter->fo_denylist_lock);
 
         unlock_kernel();
-        mntput(filter->fo_vfsmnt);
+        lvfs_umount_fs(filter->fo_lvfs_ctxt);
         //destroy_buffers(filter->fo_sb->s_dev);
         filter->fo_sb = NULL;
         fsfilt_put_ops(obd->obd_fsops);
         lock_kernel();
 
-        dev_clear_rdonly(2);
+        ll_clear_rdonly(save_dev);
 
         RETURN(0);
 }
 
+static int filter_process_config(struct obd_device *obd, obd_count len, void *buf)
+{
+        struct lustre_cfg *lcfg = buf;
+        struct filter_obd *filter = &obd->u.filter;
+        int rc = 0;
+        ENTRY;
+
+        switch(lcfg->lcfg_command) {
+        case LCFG_SET_SECURITY: {
+                if ((LUSTRE_CFG_BUFLEN(lcfg, 1) == 0) ||
+                    (LUSTRE_CFG_BUFLEN(lcfg, 2) == 0))
+                        GOTO(out, rc = -EINVAL);
+
+                if (!strcmp(lustre_cfg_string(lcfg, 1), "deny_sec")){
+                        spin_lock(&filter->fo_denylist_lock);
+                        rc = add_deny_security(lustre_cfg_string(lcfg, 2),
+                                               &filter->fo_denylist);
+                        spin_unlock(&filter->fo_denylist_lock);
+                }else {
+                        CERROR("Unrecognized key\n");
+                        rc = -EINVAL;
+                }
+                break;
+        }
+        default: {
+                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
+                GOTO(out, rc = -EINVAL);
+
+        }
+        }
+out:
+        RETURN(rc);
+}
+
+static int filter_connect_post(struct obd_export *exp, unsigned initial,
+                               unsigned long connect_flags)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_export_data *fed;
+        char str[PTL_NALFMT_SIZE];
+        struct obd_llogs *llog;
+        struct llog_ctxt *ctxt;
+        int rc = 0;
+        ENTRY;
+
+        fed = &exp->exp_filter_data;
+        if (fed->fed_group < FILTER_MIN_GROUPS)
+                RETURN(0);
+
+        /* initialize llogs for connections from MDS */
+        llog = filter_grab_llog_for_group(obd, fed->fed_group, exp);
+        LASSERT(llog != NULL);
+
+        ctxt = llog_get_context(llog, LLOG_UNLINK_REPL_CTXT);
+        LASSERT(ctxt != NULL);
+
+        rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        
+        portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+                        exp->exp_connection->c_peer.peer_id.nid, str);
+        
+        CDEBUG(D_OTHER, "%s: init llog ctxt for export "LPX64"/%s, group %d\n",
+               obd->obd_name, exp->exp_connection->c_peer.peer_id.nid,
+               str, fed->fed_group);
+
+        RETURN(rc);
+}
+
 /* nearly identical to mds_connect */
 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
-                          struct obd_uuid *cluuid)
+                          struct obd_uuid *cluuid,
+                          struct obd_connect_data *data,
+                          unsigned long connect_flags)
 {
         struct obd_export *exp;
         struct filter_export_data *fed;
         struct filter_client_data *fcd = NULL;
         struct filter_obd *filter = &obd->u.filter;
-        int rc;
+        struct lvfs_run_ctxt saved;
+        int rc, group;
         ENTRY;
 
         if (conn == NULL || obd == NULL || cluuid == NULL)
@@ -1518,20 +1752,55 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         spin_lock_init(&fed->fed_lock);
 
-        if (!obd->obd_replayable)
-                GOTO(cleanup, rc = 0);
+       /* connection from MDS */
+        group = connect_flags;
+        if (obd->obd_replayable) {
+                OBD_ALLOC(fcd, sizeof(*fcd));
+                if (!fcd) {
+                        CERROR("filter: out of memory for client data\n");
+                        GOTO(cleanup, rc = -ENOMEM);
+                }
 
-        OBD_ALLOC(fcd, sizeof(*fcd));
-        if (!fcd) {
-                CERROR("filter: out of memory for client data\n");
-                GOTO(cleanup, rc = -ENOMEM);
+                memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
+                fed->fed_fcd = fcd;
+                fed->fed_fcd->fcd_group = group;
+                rc = filter_client_add(obd, filter, fed, -1);
+                if (rc)
+                        GOTO(cleanup, rc);
         }
+        CWARN("%s: Received MDS connection ("LPX64"); group %d\n",
+              obd->obd_name, exp->exp_handle.h_cookie, group);
+        if (group == 0)
+                GOTO(cleanup, rc);
+        
+        if (fed->fed_group != 0 && fed->fed_group != group) {
+                char str[PTL_NALFMT_SIZE];
+                portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+                                exp->exp_connection->c_peer.peer_id.nid, str);
+                CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+                       "earlier; now it's trying to use group %d!  This could "
+                       "be a bug in the MDS.  Tell CFS.\n",
+                       exp->exp_connection->c_peer.peer_id.nid, str,
+                       fed->fed_group, group);
+                GOTO(cleanup, rc = -EPROTO);
+        }
+        fed->fed_group = group;
 
-        memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
-        fed->fed_fcd = fcd;
-
-        rc = filter_client_add(obd, filter, fed, -1);
-
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        rc = filter_read_groups(obd, group, 1);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        if (rc != 0) {
+                CERROR("can't read group %u\n", group);
+                GOTO(cleanup, rc);
+        }
+#if 0
+        rc = filter_group_set_fs_flags(obd, group);
+        if (rc != 0) {
+                CERROR("can't set kml flags %u\n", group);
+                GOTO(cleanup, rc);
+        }
+#endif
 cleanup:
         if (rc) {
                 if (fcd)
@@ -1586,6 +1855,7 @@ static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
         obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
         obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
         obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
+        int level = D_CACHE;
 
         if (list_empty(&obd->obd_exports))
                 return;
@@ -1594,13 +1864,20 @@ static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
         spin_lock(&obd->obd_dev_lock);
         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
                 fed = &exp->exp_filter_data;
-                LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
-                         "cli %s/%p %lu+%lu > "LPU64"\n",
-                         exp->exp_client_uuid.uuid, exp,
-                         fed->fed_grant, fed->fed_pending, maxsize);
-                LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
-                         exp->exp_client_uuid.uuid, exp,fed->fed_dirty,maxsize);
-                CDEBUG(D_CACHE,"%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+                if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
+                    fed->fed_dirty < 0)
+                        level = D_ERROR;
+                if (maxsize > 0) { /* we may not have done a statfs yet */
+                        LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
+                                 "cli %s/%p %ld+%ld > "LPU64"\n",
+                                 exp->exp_client_uuid.uuid, exp,
+                                 fed->fed_grant, fed->fed_pending, maxsize);
+                        LASSERTF(fed->fed_dirty <= maxsize,
+                                 "cli %s/%p %ld > "LPU64"\n",
+                                 exp->exp_client_uuid.uuid, exp,
+                                 fed->fed_dirty, maxsize);
+                }
+                CDEBUG(level, "%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
                 tot_granted += fed->fed_grant + fed->fed_pending;
@@ -1643,27 +1920,30 @@ static void filter_grant_discard(struct obd_export *exp)
         struct obd_device *obd = exp->exp_obd;
         struct filter_obd *filter = &obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
+        int level = D_CACHE;
 
         spin_lock(&obd->obd_osfs_lock);
         spin_lock(&exp->exp_obd->obd_dev_lock);
         list_del_init(&exp->exp_obd_chain);
         spin_unlock(&exp->exp_obd->obd_dev_lock);
 
-        CDEBUG(D_CACHE, "%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+        if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0)
+                level = D_ERROR;
+        CDEBUG(level, "%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
                obd->obd_name, exp->exp_client_uuid.uuid, exp,
                fed->fed_dirty, fed->fed_pending, fed->fed_grant);
 
         LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
-                 "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
+                 "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
                  obd->obd_name, filter->fo_tot_granted,
                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
         filter->fo_tot_granted -= fed->fed_grant;
-        LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
-                 "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
+        LASSERTF(filter->fo_tot_pending >= fed->fed_pending,
+                 "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
                  obd->obd_name, filter->fo_tot_pending,
                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
         LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
-                 "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
+                 "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
                  obd->obd_name, filter->fo_tot_dirty,
                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
         filter->fo_tot_dirty -= fed->fed_dirty;
@@ -1694,13 +1974,13 @@ static int filter_destroy_export(struct obd_export *exp)
         RETURN(0);
 }
 
-static void filter_sync_llogs(struct obd_export *dexp)
+static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
 {
         struct filter_group_llog *fglog, *nlog;
-        struct obd_device *obd = dexp->exp_obd;
         struct filter_obd *filter;
         int worked = 0, group;
         struct llog_ctxt *ctxt;
+        ENTRY;
 
         filter = &obd->u.filter;
 
@@ -1730,17 +2010,21 @@ static void filter_sync_llogs(struct obd_export *dexp)
                 }
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                if (fglog) {
-                        worked = fglog->group;
+                if (fglog == NULL)
+                        break;
+
+                worked = fglog->group;
+                if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
                         ctxt = llog_get_context(fglog->llogs,
-                                                LLOG_UNLINK_REPL_CTXT);
-                        llog_sync(ctxt, dexp);
+                                        LLOG_UNLINK_REPL_CTXT);
+                        LASSERT(ctxt != NULL);
+                        llog_sync(ctxt, fglog->exp);
                 }
         } while (fglog != NULL);
 }
 
 /* also incredibly similar to mds_disconnect */
-static int filter_disconnect(struct obd_export *exp, int flags)
+static int filter_disconnect(struct obd_export *exp, unsigned long flags)
 {
         struct obd_device *obd = exp->exp_obd;
         unsigned long irqflags;
@@ -1766,8 +2050,7 @@ static int filter_disconnect(struct obd_export *exp, int flags)
         fsfilt_sync(obd, obd->u.filter.fo_sb);
 
         /* flush any remaining cancel messages out to the target */
-        filter_sync_llogs(exp);
-
+        filter_sync_llogs(obd, exp);
         class_export_put(exp);
         RETURN(rc);
 }
@@ -1777,25 +2060,27 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd,
 {
         struct dentry *dchild = NULL;
         obd_gr group = 0;
+        ENTRY;
 
         if (oa->o_valid & OBD_MD_FLGROUP)
                 group = oa->o_gr;
 
-        dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
+        dchild = filter_id2dentry(obd, NULL, group, oa->o_id);
 
         if (IS_ERR(dchild)) {
-                CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
+                CERROR("%s error looking up object: "LPU64"\n",
+                       what, oa->o_id);
                 RETURN(dchild);
         }
 
         if (dchild->d_inode == NULL) {
-                CERROR("%s: %s on non-existent object: "LPU64"\n",
-                       obd->obd_name, what, oa->o_id);
+                CDEBUG(D_INFO, "%s: %s on non-existent object: "
+                       LPU64"\n", obd->obd_name, what, oa->o_id);
                 f_dput(dchild);
                 RETURN(ERR_PTR(-ENOENT));
         }
 
-        return dchild;
+        RETURN(dchild);
 }
 
 static int filter_getattr(struct obd_export *exp, struct obdo *oa,
@@ -1825,82 +2110,106 @@ static int filter_getattr(struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-/* this is called from filter_truncate() until we have filter_punch() */
-static int filter_setattr(struct obd_export *exp, struct obdo *oa,
-                          struct lov_stripe_md *md, struct obd_trans_info *oti)
+int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
+                            struct obdo *oa, struct obd_trans_info *oti)
 {
-        struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
-        struct dentry *dentry;
         struct iattr iattr;
-        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
-        struct ldlm_resource *res;
         void *handle;
-        int rc, rc2;
+        int rc, err;
         ENTRY;
 
-        LASSERT(oti != NULL);
-
-        dentry = filter_oa2dentry(exp->exp_obd, oa);
-        if (IS_ERR(dentry))
-                RETURN(PTR_ERR(dentry));
-
+        LASSERT(dentry != NULL);
+        LASSERT(!IS_ERR(dentry));
+        LASSERT(dentry->d_inode != NULL);
+        
         filter = &exp->exp_obd->u.filter;
-
         iattr_from_obdo(&iattr, oa, oa->o_valid);
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        lock_kernel();
-
         if (iattr.ia_valid & ATTR_SIZE)
                 down(&dentry->d_inode->i_sem);
-        handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
-                              oti);
+        handle = fsfilt_start(exp->exp_obd, dentry->d_inode,
+                              FSFILT_OP_SETATTR, oti);
         if (IS_ERR(handle))
                 GOTO(out_unlock, rc = PTR_ERR(handle));
 
         /* XXX this could be a rwsem instead, if filter_preprw played along */
         if (iattr.ia_valid & ATTR_ATTR_FLAG)
-                rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
-                                      EXT3_IOC_SETFLAGS,
+                rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode,
+                                      NULL, EXT3_IOC_SETFLAGS,
                                       (long)&iattr.ia_attr_flags);
         else
-                rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
+                rc = fsfilt_setattr(exp->exp_obd, dentry, handle,
+                                    &iattr, 1);
+        
         rc = filter_finish_transno(exp, oti, rc);
-        rc2 = fsfilt_commit(exp->exp_obd, filter->fo_sb, dentry->d_inode, handle, 0);
-        if (rc2) {
-                CERROR("error on commit, err = %d\n", rc2);
+        
+        err = fsfilt_commit(exp->exp_obd, filter->fo_sb,
+                            dentry->d_inode, handle,
+                            exp->exp_sync);
+        if (err) {
+                CERROR("error on commit, err = %d\n", err);
                 if (!rc)
-                        rc = rc2;
+                        rc = err;
         }
+        EXIT;
+out_unlock:
+        if (iattr.ia_valid & ATTR_SIZE)
+                up(&dentry->d_inode->i_sem);
+        return rc;
+}
 
-        if (iattr.ia_valid & ATTR_SIZE) {
-                res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
-                                        res_id, LDLM_EXTENT, 0);
-                if (res == NULL) {
-                        CERROR("!!! resource_get failed for object "LPU64" -- "
-                               "filter_setattr with no lock?\n", oa->o_id);
-                } else {
-                        if (res->lr_namespace->ns_lvbo &&
-                            res->lr_namespace->ns_lvbo->lvbo_update) {
-                                rc = res->lr_namespace->ns_lvbo->lvbo_update
-                                        (res, NULL, 0, 0);
-                        }
-                        ldlm_resource_putref(res);
-                }
-        }
+/* this is called from filter_truncate() until we have filter_punch() */
+int filter_setattr(struct obd_export *exp, struct obdo *oa,
+                   struct lov_stripe_md *md, struct obd_trans_info *oti)
+{
+        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
+        struct ldlm_valblock_ops *ns_lvbo;
+        struct lvfs_run_ctxt saved;
+        struct filter_obd *filter;
+        struct ldlm_resource *res;
+        struct dentry *dentry;
+        int rc;
+        ENTRY;
+
+        LASSERT(oti != NULL);
+
+        filter = &exp->exp_obd->u.filter;
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+        /* make sure that object is allocated. */
+        dentry = filter_crow_object(exp->exp_obd,
+                                    oa->o_gr, oa->o_id);
+        if (IS_ERR(dentry))
+                GOTO(out_pop, rc = PTR_ERR(dentry));
+
+        lock_kernel();
+
+        /* setting objects attributes (including owner/group) */
+        rc = filter_setattr_internal(exp, dentry, oa, oti);
+        if (rc)
+                GOTO(out_unlock, rc);
 
+        res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
+                                res_id, LDLM_EXTENT, 0);
+        
+        if (res != NULL) {
+                ns_lvbo = res->lr_namespace->ns_lvbo;
+                if (ns_lvbo && ns_lvbo->lvbo_update)
+                        rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
+                ldlm_resource_putref(res);
+        }
+        
         oa->o_valid = OBD_MD_FLID;
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
+        EXIT;
 out_unlock:
-        if (iattr.ia_valid & ATTR_SIZE)
-                up(&dentry->d_inode->i_sem);
         unlock_kernel();
-        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-
         f_dput(dentry);
-        RETURN(rc);
+out_pop:
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+        return rc;
 }
 
 /* XXX identical to osc_unpackmd */
@@ -1953,353 +2262,346 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         RETURN(lsm_size);
 }
 
-static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
-                                      struct filter_obd *filter)
+static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
+                         unsigned long max_age)
 {
-        struct obdo doa; /* XXX obdo on stack */
-        __u64 last, id;
+        struct filter_obd *filter = &obd->u.filter;
+        int blockbits = filter->fo_sb->s_blocksize_bits;
+        int rc;
         ENTRY;
-        LASSERT(oa);
-
-        memset(&doa, 0, sizeof(doa));
-        if (oa->o_valid & OBD_MD_FLGROUP) {
-                doa.o_valid |= OBD_MD_FLGROUP;
-                doa.o_gr = oa->o_gr;
-        } else {
-                doa.o_gr = 0;
-        }
-        doa.o_mode = S_IFREG;
-        doa.o_gr = oa->o_gr;
-        doa.o_valid = oa->o_valid & OBD_MD_FLGROUP;
-
-        filter->fo_destroy_in_progress = 1;
-        down(&filter->fo_create_lock);
-        if (!filter->fo_destroy_in_progress) {
-                CERROR("%s: destroy_in_progress already cleared\n",
-                        exp->exp_obd->obd_name);
-                up(&filter->fo_create_lock);
-                EXIT;
-                return;
-        }
 
-        last = filter_last_id(filter, doa.o_gr);
-        CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
-               exp->exp_obd->obd_name, oa->o_id + 1, last);
-        for (id = oa->o_id + 1; id <= last; id++) {
-                doa.o_id = id;
-                filter_destroy(exp, &doa, NULL, NULL);
-        }
+        /* at least try to account for cached pages.  its still racey and
+         * might be under-reporting if clients haven't announced their
+         * caches with brw recently */
+        spin_lock(&obd->obd_osfs_lock);
+        rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
+        memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
+        spin_unlock(&obd->obd_osfs_lock);
 
-        CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
-               exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
-
-        spin_lock(&filter->fo_objidlock);
-        filter->fo_last_objids[doa.o_gr] = oa->o_id;
-        spin_unlock(&filter->fo_objidlock);
-
-        filter->fo_destroy_in_progress = 0;
-        up(&filter->fo_create_lock);
-
-        EXIT;
-}
+        CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
+               " pending "LPU64" free "LPU64" avail "LPU64"\n",
+               filter->fo_tot_dirty, filter->fo_tot_granted,
+               filter->fo_tot_pending,
+               osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
 
-/* returns a negative error or a nonnegative number of files to create */
-static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
-                                   obd_gr group)
-{
-        struct obd_device *obd = exp->exp_obd;
-        struct filter_obd *filter = &obd->u.filter;
-        int diff, rc;
-        ENTRY;
+        filter_grant_sanity_check(obd, __FUNCTION__);
 
-        diff = oa->o_id - filter_last_id(filter, oa->o_gr);
-        CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
-               filter_last_id(filter, oa->o_gr), diff);
-
-        /* delete orphans request */
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_DELORPHAN)) {
-                if (diff >= 0)
-                        RETURN(diff);
-                if (-diff > 10000) { /* XXX make this smarter */
-                        CERROR("ignoring bogus orphan destroy request: obdid "
-                               LPU64" last_id "LPU64"\n",
-                               oa->o_id, filter_last_id(filter, oa->o_gr));
-                        RETURN(-EINVAL);
-                }
-                filter_destroy_precreated(exp, oa, filter);
-                rc = filter_update_last_objid(obd, group, 0);
-                if (rc)
-                        CERROR("unable to write lastobjid, but orphans"
-                               "were deleted\n");
-                RETURN(0);
-        } else {
-                /* only precreate if group == 0 and o_id is specfied */
-                if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
-                    (group != 0 || oa->o_id == 0))
-                        RETURN(1);
+        osfs->os_bavail -= min(osfs->os_bavail,
+                               (filter->fo_tot_dirty + filter->fo_tot_pending +
+                                osfs->os_bsize -1) >> blockbits);
 
-                LASSERT(diff >= 0);
-                RETURN(diff);
-        }
-}
-static int filter_precreate_rec(struct obd_device *obd, struct dentry *dentry, 
-                                int *number, struct obdo *oa)
-{
-        int    rc = 0;
-        ENTRY;       
-         
-        rc = fsfilt_precreate_rec(obd, dentry, number, oa);
-  
         RETURN(rc);
 }
-/* We rely on the fact that only one thread will be creating files in a given
- * group at a time, which is why we don't need an atomic filter_get_new_id.
- * Even if we had that atomic function, the following race would exist:
- *
- * thread 1: gets id x from filter_next_id
- * thread 2: gets id (x + 1) from filter_next_id
- * thread 2: creates object (x + 1)
- * thread 1: tries to create object x, gets -ENOSPC
- */
-static int filter_precreate(struct obd_device *obd, struct obdo *oa,
-                            obd_gr group, int *num)
+
+int filter_create_object(struct obd_device *obd, struct obdo *oa,
+                         obd_gr group)
 {
-        struct dentry *dchild = NULL, *dparent = NULL;
+        struct dentry *dparent = NULL;
+        struct dentry *dchild = NULL;
         struct filter_obd *filter;
-        int err = 0, rc = 0, recreate_obj = 0, i;
-        __u64 next_id;
+        struct obd_statfs *osfs;
+        int cleanup_phase = 0;
+        int err = 0, rc = 0;
         void *handle = NULL;
+        void *lock = NULL;
         ENTRY;
 
         filter = &obd->u.filter;
 
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
-                recreate_obj = 1;
+        OBD_ALLOC(osfs, sizeof(*osfs));
+        if (osfs == NULL)
+                RETURN(-ENOMEM);
+        rc = filter_statfs(obd, osfs, jiffies - HZ);
+        if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
+                CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
+                       osfs->os_bavail << filter->fo_sb->s_blocksize_bits);
+                rc = -ENOSPC;
         }
+        OBD_FREE(osfs, sizeof(*osfs));
+        if (rc)
+                RETURN(rc);
 
-        CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
+        down(&filter->fo_create_locks[group]);
 
-        down(&filter->fo_create_lock);
+        if (test_bit(group, &filter->fo_destroys_in_progress)) {
+                CWARN("%s: precreate aborted by destroy\n",
+                      obd->obd_name);
+                GOTO(out, rc = -EALREADY);
+        }
 
-        for (i = 0; i < *num && err == 0; i++) {
-                int cleanup_phase = 0;
+        CDEBUG(D_INFO, "precreate objid "LPU64"\n", oa->o_id);
 
-                if (filter->fo_destroy_in_progress) {
-                        CWARN("%s: precreate aborted by destroy\n",
-                              obd->obd_name);
-                        break;
-                }
+        dparent = filter_parent_lock(obd, group, oa->o_id, &lock);
+        if (IS_ERR(dparent))
+                GOTO(cleanup, rc = PTR_ERR(dparent));
+        cleanup_phase = 1;
 
-                if (recreate_obj) {
-                        __u64 last_id;
-                        next_id = oa->o_id;
-                        last_id = filter_last_id(filter, group);
-                        if (next_id > last_id) {
-                                CERROR("Error: Trying to recreate obj greater"
-                                       "than last id "LPD64" > "LPD64"\n",
-                                       next_id, last_id);
-                                GOTO(cleanup, rc = -EINVAL);
-                        }
-                } else {
-                        next_id = filter_last_id(filter, group) + 1;
-                }
+        dchild = filter_id2dentry(obd, dparent, group, oa->o_id);
+        if (IS_ERR(dchild))
+                GOTO(cleanup, rc = PTR_ERR(dchild));
+        cleanup_phase = 2;
 
-                CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
+        if (dchild->d_inode != NULL)
+                GOTO(cleanup, rc = 0);
 
-                dparent = filter_parent_lock(obd, group, next_id);
-                if (IS_ERR(dparent))
-                        GOTO(cleanup, rc = PTR_ERR(dparent));
-                cleanup_phase = 1;
+        handle = fsfilt_start_log(obd, dparent->d_inode,
+                                  FSFILT_OP_CREATE, NULL, 1);
+        if (IS_ERR(handle))
+                GOTO(cleanup, rc = PTR_ERR(handle));
+        cleanup_phase = 3;
 
-                /*only do precreate rec record. so clean kml flags here*/
-                fsfilt_clear_kml_flags(obd, dparent->d_inode);
-                
-                dchild = filter_fid2dentry(obd, dparent, group, next_id);
-                if (IS_ERR(dchild))
-                        GOTO(cleanup, rc = PTR_ERR(dchild));
-                cleanup_phase = 2;
-
-                if (dchild->d_inode != NULL) {
-                        /* This would only happen if lastobjid was bad on disk*/
-                        /* Could also happen if recreating missing obj but
-                         * already exists
-                         */
-                        if (recreate_obj) {
-                                CERROR("%s: Serious error: recreating obj %*s "
-                                       "but obj already exists \n",
-                                       obd->obd_name, dchild->d_name.len,
-                                       dchild->d_name.name);
-                                LBUG();
-                        } else {
-                                CERROR("%s: Serious error: objid %*s already "
-                                       "exists; is this filesystem corrupt?\n",
-                                       obd->obd_name, dchild->d_name.len,
-                                       dchild->d_name.name);
-                                LBUG();
-                        }
-                        GOTO(cleanup, rc = -EEXIST);
-                }
+        rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+        if (rc) {
+                CERROR("create failed rc = %d\n", rc);
+                GOTO(cleanup, rc);
+        }
 
-                handle = fsfilt_start_log(obd, dparent->d_inode,
-                                          FSFILT_OP_CREATE, NULL, 1);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
-                cleanup_phase = 3;
+        fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
 
-                rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+        if (oa->o_id > filter_last_id(filter, group)) {
+                /*
+                 * saving last created object id, it will be needed in recovery
+                 * for deleting orphanes.
+                 */
+                filter_set_last_id(filter, group, oa->o_id);
+
+                rc = filter_update_last_objid(obd, group, 0);
                 if (rc) {
-                        CERROR("create failed rc = %d\n", rc);
-                        GOTO(cleanup, rc);
+                        CERROR("unable to write lastobjid, but "
+                               "orphans were deleted, err = %d\n",
+                               rc);
+                        rc = 0;
                 }
-
-                if (!recreate_obj) {
-                        filter_set_last_id(filter, group, next_id);
-                        err = filter_update_last_objid(obd, group, 0);
-                        if (err)
-                                CERROR("unable to write lastobjid "
-                                       "but file created\n");
+        }
+cleanup:
+        switch(cleanup_phase) {
+        case 3:
+                err = fsfilt_commit(obd, filter->fo_sb,
+                                    dparent->d_inode, handle, 0);
+                if (err) {
+                        CERROR("error on commit, err = %d\n", err);
+                        if (!rc)
+                                rc = err;
                 }
-                fsfilt_set_kml_flags(obd, dparent->d_inode);
+        case 2:
+                f_dput(dchild);
+        case 1:
+                filter_parent_unlock(dparent, lock);
+        case 0:
+                break;
+        }
+
+        if (rc)
+                GOTO(out, rc);
+
+out:
+        up(&filter->fo_create_locks[group]);
+        RETURN(rc);
+}
+
+struct dentry *filter_crow_object(struct obd_device *obd,
+                                  __u64 ogr, __u64 oid)
+{
+        struct dentry *dentry;
+        struct obdo *oa;
+        int rc = 0;
+        ENTRY;
+
+        /* check if object is already allocated */
+        dentry = filter_id2dentry(obd, NULL, ogr, oid);
+        if (IS_ERR(dentry))
+                RETURN(dentry);
+
+        if (dentry->d_inode)
+                RETURN(dentry);
+
+        f_dput(dentry);
         
-        cleanup:
-                switch(cleanup_phase) {
-                case 3:
-                        err = fsfilt_commit(obd, filter->fo_sb, dparent->d_inode, handle, 0);
-                        if (err) {
-                                CERROR("error on commit, err = %d\n", err);
-                                if (!rc)
-                                        rc = err;
-                        }
-                case 2:
-                        f_dput(dchild);
-                case 1:
-                        filter_parent_unlock(dparent);
-                case 0:
-                        break;
-                }
+        /* allocate object as it does not exist */
+        oa = obdo_alloc();
+        if (oa == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
 
-                if (rc)
-                        break;
+        oa->o_id = oid;
+        oa->o_gr = ogr;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+        CDEBUG(D_INODE, "OSS object "LPU64"/"LPU64
+               " does not exists - allocate now\n",
+               oid, ogr);
+
+        rc = filter_create_object(obd, oa, oa->o_gr);
+        if (rc) {
+                CERROR("cannot create OSS object "LPU64"/"LPU64
+                       ", err = %d\n", oa->o_id, oa->o_gr, rc);
+                GOTO(out_free_oa, dentry = ERR_PTR(rc));
+        }
+
+        /* lookup for just created object and return it to caller */
+        dentry = filter_id2dentry(obd, NULL, ogr, oid);
+        if (IS_ERR(dentry))
+                GOTO(out_free_oa, dentry);
+                
+        if (dentry->d_inode == NULL) {
+                f_dput(dentry);
+                dentry = ERR_PTR(-ENOENT);
+                CERROR("cannot find just created OSS object "
+                       LPU64"/"LPU64" err = %d\n", oid,
+                       ogr, (int)PTR_ERR(dentry));
+                GOTO(out_free_oa, dentry);
         }
-        *num = i;
 
-        rc = filter_precreate_rec(obd, dparent, num, oa);
+        EXIT;
+out_free_oa:
+        obdo_free(oa);
+        return dentry;
+}
+
+static int
+filter_clear_orphans(struct obd_export *exp, struct obdo *oa)
+{
+        struct obd_device *obd = NULL;
+        struct filter_obd *filter;
+        struct obdo *doa = NULL;
+        int rc = 0, orphans;
+        __u64 last, id;
+        ENTRY;
         
-        up(&filter->fo_create_lock);
+        LASSERT(oa);
+        LASSERT(oa->o_gr != 0);
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+        obd = exp->exp_obd;
+        filter = &obd->u.filter;
+
+        last = filter_last_id(filter, oa->o_gr);
+        orphans = last - oa->o_id;
         
-        CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
-               obd->obd_name, group, filter->fo_last_objids[group]);
+        if (orphans <= 0)
+                RETURN(0);
+                
+       doa = obdo_alloc();
+        if (doa == NULL)
+                RETURN(-ENOMEM);
 
-        CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
-               obd->obd_name, i);
-        RETURN(rc);
+        doa->o_gr = oa->o_gr;
+        doa->o_mode = S_IFREG;
+        doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
+
+        set_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+        down(&filter->fo_create_locks[doa->o_gr]);
+        if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) {
+                CERROR("%s:["LPU64"] destroy_in_progress already cleared\n",
+                       exp->exp_obd->obd_name, doa->o_gr);
+                up(&filter->fo_create_locks[doa->o_gr]);
+                GOTO(out_free_doa, 0);
+        }
+
+        CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "
+              LPU64"\n", exp->exp_obd->obd_name, doa->o_gr,
+              oa->o_id + 1, last);
+        
+        for (id = oa->o_id + 1; id <= last; id++) {
+                doa->o_id = id;
+                filter_destroy(exp, doa, NULL, NULL);
+        }
+
+        CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "
+               LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
+
+        /* return next free id to be used as a new start of sequence -bzzz */
+        oa->o_id = last + 1;
+
+        filter_set_last_id(filter, oa->o_gr, oa->o_id);
+        clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+        up(&filter->fo_create_locks[oa->o_gr]);
+
+        EXIT;
+out_free_doa:
+        obdo_free(doa);
+        return rc;
 }
 
-static int filter_create(struct obd_export *exp, struct obdo *oa,
-                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
+/*
+ * by now this function is only needed as entry point for deleting orphanes on
+ * OSS as objects are created on first write attempt. --umka
+ */
+static int
+filter_create(struct obd_export *exp, struct obdo *oa, void *acl,
+              int acl_size, struct lov_stripe_md **ea,
+              struct obd_trans_info *oti)
 {
+        struct filter_export_data *fed;
         struct obd_device *obd = NULL;
-        struct filter_obd *filter;
+        int group = oa->o_gr, rc = 0;
         struct lvfs_run_ctxt saved;
-        struct lov_stripe_md *lsm = NULL;
-        struct filter_export_data *fed;
+        struct filter_obd *filter;
         char str[PTL_NALFMT_SIZE];
-        int group = oa->o_gr, rc = 0, diff, recreate_objs = 0;
         ENTRY;
 
+        LASSERT(acl == NULL && acl_size == 0);
+
         if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
-                                exp->exp_connection->c_peer.peer_nid, str);
+                                exp->exp_connection->c_peer.peer_id.nid, str);
                 CERROR("!!! nid "LPX64"/%s sent invalid object group %d\n",
-                       exp->exp_connection->c_peer.peer_nid, str, group);
+                       exp->exp_connection->c_peer.peer_id.nid, str, group);
                 RETURN(-EINVAL);
         }
 
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
-                recreate_objs = 1;
-        }
-
         obd = exp->exp_obd;
         fed = &exp->exp_filter_data;
         filter = &obd->u.filter;
 
-        if (fed->fed_group != group && !recreate_objs &&
-            !(oa->o_valid & OBD_MD_REINT)) {
+        if (fed->fed_group != group) {
                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
-                                exp->exp_connection->c_peer.peer_nid, str);
-                CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+                                exp->exp_connection->c_peer.peer_id.nid, str);
+                CERROR("!!! this export (nid "LPX64"/%s) used object group %d "
                        "earlier; now it's trying to use group %d!  This could "
                        "be a bug in the MDS.  Tell CFS.\n",
-                       exp->exp_connection->c_peer.peer_nid, str,
+                       exp->exp_connection->c_peer.peer_id.nid, str,
                        fed->fed_group, group);
                 RETURN(-ENOTUNIQ);
         }
 
         CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n",
                group, oa->o_id);
-        if (ea != NULL) {
-                lsm = *ea;
-                if (lsm == NULL) {
-                        rc = obd_alloc_memmd(exp, &lsm);
-                        if (rc < 0)
-                                RETURN(rc);
-                }
-        }
 
         obd = exp->exp_obd;
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
-        if (oa->o_valid & OBD_MD_REINT) {
-                int num = *((int*)oa->o_inline);  
-                rc = filter_precreate(obd, oa, oa->o_gr, &num);
-        } else if (recreate_objs) {
-                if (oa->o_id > filter_last_id(&obd->u.filter, group)) {
-                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
-                               oa->o_id, filter_last_id(&obd->u.filter, group));
-                        rc = -EINVAL;
-                } else {
-                        diff = 1;
-                        rc = filter_precreate(obd, oa, group, &diff);
-                }
+        LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
+                (oa->o_flags == OBD_FL_DELORPHAN));
+                
+        rc = filter_clear_orphans(exp, oa);
+        if (rc) {
+                CERROR("cannot clear orphanes starting from "
+                       LPU64", err = %d\n", oa->o_id, rc);
         } else {
-                diff = filter_should_precreate(exp, oa, group);
-                if (diff > 0) {
-                        oa->o_id = filter_last_id(&obd->u.filter, group);
-                        rc = filter_precreate(obd, oa, group, &diff);
-                        oa->o_id += diff;
-                        oa->o_valid = OBD_MD_FLID;
+                rc = filter_update_last_objid(obd, group, 0);
+                if (rc) {
+                        CERROR("unable to write lastobjid, but "
+                               "orphans were deleted, err = %d\n",
+                               rc);
                 }
         }
-
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        if (rc && ea != NULL && *ea != lsm) {
-                obd_free_memmd(exp, &lsm);
-        } else if (rc == 0 && ea != NULL) {
-                /* XXX LOV STACKING: the lsm that is passed to us from
-                 * LOV does not have valid lsm_oinfo data structs, so
-                 * don't go touching that.  This needs to be fixed in a
-                 * big way. */
-                lsm->lsm_object_id = oa->o_id;
-                lsm->lsm_object_gr = oa->o_gr;
-                *ea = lsm;
-        }
-
-        RETURN(rc);
+        
+        RETURN(0);
 }
 
 static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
 {
-        struct obd_device *obd;
-        struct filter_obd *filter;
+        int rc, rc2, cleanup_phase = 0, have_prepared = 0;
         struct dentry *dchild = NULL, *dparent = NULL;
+        struct llog_cookie *fcc = NULL;
         struct lvfs_run_ctxt saved;
+        struct filter_obd *filter;
+        struct obd_device *obd;
         void *handle = NULL;
-        struct llog_cookie *fcc = NULL;
-        int rc, rc2, cleanup_phase = 0, have_prepared = 0;
+        void *lock = NULL;
+        struct iattr iattr;
         ENTRY;
 
         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
@@ -2310,14 +2612,14 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
  acquire_locks:
-        dparent = filter_parent_lock(obd, oa->o_gr, oa->o_id);
+        dparent = filter_parent_lock(obd, oa->o_gr, oa->o_id, &lock);
         if (IS_ERR(dparent))
                 GOTO(cleanup, rc = PTR_ERR(dparent));
         cleanup_phase = 1;
 
-        dchild = filter_fid2dentry(obd, dparent, oa->o_gr, oa->o_id);
+        dchild = filter_id2dentry(obd, dparent, oa->o_gr, oa->o_id);
         if (IS_ERR(dchild))
-                GOTO(cleanup, rc = -ENOENT);
+                GOTO(cleanup, rc = PTR_ERR(dchild));
         cleanup_phase = 2;
 
         if (dchild->d_inode == NULL) {
@@ -2340,19 +2642,13 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                  * complication of condition the above code to skip it on the
                  * second time through. */
                 f_dput(dchild);
-                filter_parent_unlock(dparent);
+                filter_parent_unlock(dparent, lock);
 
                 filter_prepare_destroy(obd, oa->o_id, oa->o_gr);
                 have_prepared = 1;
                 goto acquire_locks;
         }
 
-        handle = fsfilt_start_log(obd, dparent->d_inode, FSFILT_OP_UNLINK, oti, 1);
-        if (IS_ERR(handle))
-                GOTO(cleanup, rc = PTR_ERR(handle));
-
-        cleanup_phase = 3;
-
         /* Our MDC connection is established by the MDS to us */
         if (oa->o_valid & OBD_MD_FLCOOKIE) {
                 OBD_ALLOC(fcc, sizeof(*fcc));
@@ -2360,26 +2656,51 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
         }
 
+        /* we're gonna truncate it first in order to avoid possible
+         * deadlock:
+         *      P1                      P2
+         * open trasaction      open transaction
+         * down(i_zombie)       down(i_zombie)
+         *                      restart transaction
+         * (see BUG 4180) -bzzz
+         */
+        down(&dchild->d_inode->i_sem);
+        handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_SETATTR,NULL,1);
+        if (IS_ERR(handle)) {
+                up(&dchild->d_inode->i_sem);
+                GOTO(cleanup, rc = PTR_ERR(handle));
+        }
+
+        iattr.ia_valid = ATTR_SIZE;
+        iattr.ia_size = 0;
+        rc = fsfilt_setattr(obd, dchild, handle, &iattr, 1);
+
+        rc2 = fsfilt_commit(obd, filter->fo_sb, dparent->d_inode, handle, 0);
+        up(&dchild->d_inode->i_sem);
+        if (rc)
+                GOTO(cleanup, rc);
+        if (rc2)
+                GOTO(cleanup, rc = rc2);
+
+        handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_UNLINK,oti,1);
+        if (IS_ERR(handle))
+                GOTO(cleanup, rc = PTR_ERR(handle));
+
+        cleanup_phase = 3;
+
         rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
 
 cleanup:
         switch(cleanup_phase) {
         case 3:
                 if (fcc != NULL) {
-                        if (oti != NULL)
-                                fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
-                                                      oti->oti_handle,
-                                                      filter_cancel_cookies_cb,
-                                                      fcc);
-                        else
-                                fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
-                                                      handle,
-                                                      filter_cancel_cookies_cb,
-                                                      fcc);
+                        fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
+                                              oti ? oti->oti_handle : handle,
+                                              filter_cancel_cookies_cb, fcc);
                 }
                 rc = filter_finish_transno(exp, oti, rc);
                 rc2 = fsfilt_commit(obd, filter->fo_sb, dparent->d_inode, 
-                                    handle, 0);
+                                    handle, exp->exp_sync);
                 if (rc2) {
                         CERROR("error on commit, err = %d\n", rc2);
                         if (!rc)
@@ -2388,7 +2709,7 @@ cleanup:
         case 2:
                 f_dput(dchild);
         case 1:
-                filter_parent_unlock(dparent);
+                filter_parent_unlock(dparent, lock);
         case 0:
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 break;
@@ -2413,7 +2734,7 @@ static int filter_truncate(struct obd_export *exp, struct obdo *oa,
                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
                        end);
 
-        CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
+        CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = "LPU64", "
                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
         oa->o_size = start;
         error = filter_setattr(exp, oa, NULL, oti);
@@ -2427,7 +2748,6 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
-        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
@@ -2437,8 +2757,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(obd, filter->fo_sb);
                 /* flush any remaining cancel messages out to the target */
-                ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_REPL_CTXT);
-                llog_sync(ctxt, exp);
+                filter_sync_llogs(obd, NULL);
                 RETURN(rc);
         }
 
@@ -2449,7 +2768,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
         down(&dentry->d_inode->i_sem);
-        rc = filemap_fdatasync(dentry->d_inode->i_mapping);
+        rc = filemap_fdatawrite(dentry->d_inode->i_mapping);
         if (rc == 0) {
                 /* just any file to grab fsync method - "file" arg unused */
                 struct file *file = filter->fo_rcvd_filp;
@@ -2472,37 +2791,6 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                         unsigned long max_age)
-{
-        struct filter_obd *filter = &obd->u.filter;
-        int blockbits = filter->fo_sb->s_blocksize_bits;
-        int rc;
-        ENTRY;
-
-        /* at least try to account for cached pages.  its still racey and
-         * might be under-reporting if clients haven't announced their
-         * caches with brw recently */
-        spin_lock(&obd->obd_osfs_lock);
-        rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
-        memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
-        spin_unlock(&obd->obd_osfs_lock);
-
-        CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
-               " pending "LPU64" free "LPU64" avail "LPU64"\n",
-               filter->fo_tot_dirty, filter->fo_tot_granted,
-               filter->fo_tot_pending,
-               osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
-
-        filter_grant_sanity_check(obd, __FUNCTION__);
-
-        osfs->os_bavail -= min(osfs->os_bavail,
-                               (filter->fo_tot_dirty + filter->fo_tot_pending +
-                                osfs->os_bsize -1) >> blockbits);
-
-        RETURN(rc);
-}
-
 static int filter_get_info(struct obd_export *exp, __u32 keylen,
                            void *key, __u32 *vallen, void *val)
 {
@@ -2557,11 +2845,13 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen,
         RETURN(-EINVAL);
 }
 
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
+                                             struct obd_export *export)
 {
         struct filter_group_llog *fglog, *nlog;
         char name[32] = "CATLIST";
         struct filter_obd *filter;
+        struct llog_ctxt *ctxt;
         struct list_head *cur;
         int rc;
 
@@ -2571,12 +2861,18 @@ struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
         list_for_each(cur, &filter->fo_llog_list) {
                 fglog = list_entry(cur, struct filter_group_llog, list);
                 if (fglog->group == group) {
+                        if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
+                                CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
+                                      obd->obd_name, group, fglog->exp, export);
                         spin_unlock(&filter->fo_llog_list_lock);
-                        RETURN(fglog->llogs);
+                        goto init;
                 }
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
+        if (export == NULL)
+                RETURN(NULL);
+
         OBD_ALLOC(fglog, sizeof(*fglog));
         if (fglog == NULL)
                 RETURN(NULL);
@@ -2603,75 +2899,18 @@ struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
                 RETURN(NULL);
         }
 
-        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n", obd->obd_name,
-                fglog->llogs, group);
-
-        RETURN(fglog->llogs);
-}
-
-static int filter_set_info(struct obd_export *exp, __u32 keylen,
-                           void *key, __u32 vallen, void *val)
-{
-        struct lvfs_run_ctxt saved;
-        struct filter_export_data *fed = &exp->exp_filter_data;
-        struct obd_device *obd;
-        struct lustre_handle conn;
-        struct obd_llogs *llog;
-        struct llog_ctxt *ctxt;
-        __u32 group;
-        int rc = 0;
-        ENTRY;
-
-        conn.cookie = exp->exp_handle.h_cookie;
-
-        obd = exp->exp_obd;
-        if (obd == NULL) {
-                CDEBUG(D_IOCTL, "invalid exp %p cookie "LPX64"\n",
-                       exp, conn.cookie);
-                RETURN(-EINVAL);
-        }
-
-        if (keylen < strlen("mds_conn") ||
-            memcmp(key, "mds_conn", keylen) != 0)
-                RETURN(-EINVAL);
-
-        group = *((__u32 *)val);
-        if (fed->fed_group != 0 && fed->fed_group != group) {
-                char str[PTL_NALFMT_SIZE];
-                portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
-                                exp->exp_connection->c_peer.peer_nid, str);
-                CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
-                       "earlier; now it's trying to use group %d!  This could "
-                       "be a bug in the MDS.  Tell CFS.\n",
-                       exp->exp_connection->c_peer.peer_nid, str,
-                       fed->fed_group, group);
-                RETURN(-EPROTO);
-        }
-        fed->fed_group = group;
-        CWARN("Received MDS connection ("LPX64"); group %d\n", conn.cookie,
-              group);
-
-        LASSERT(rc == 0);
+init:
+        if (export) {
+                fglog->exp = export;
+                ctxt = llog_get_context(fglog->llogs, LLOG_UNLINK_REPL_CTXT);
+                LASSERT(ctxt != NULL);
 
-        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = filter_read_groups(obd, group, 1);
-        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        if (rc != 0) {
-                CERROR("can't read group %u\n", group);
-                RETURN(rc);
+                llog_receptor_accept(ctxt, export->exp_imp_reverse);
         }
-        rc = filter_group_set_kml_flags(obd, group);
-         if (rc != 0) {
-                CERROR("can't set kml flags %u\n", group);
-                RETURN(rc);
-        }
-        llog = filter_grab_llog_for_group(obd, group);
-        LASSERT(llog != NULL);
+        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
+               obd->obd_name, fglog->llogs, group);
 
-        ctxt = llog_get_context(llog, LLOG_UNLINK_REPL_CTXT);
-        LASSERT(ctxt != NULL);
-        rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
-        RETURN(rc);
+        RETURN(fglog->llogs);
 }
 
 int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
@@ -2683,8 +2922,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
 
         switch (cmd) {
         case OBD_IOC_ABORT_RECOVERY:
-                CERROR("aborting recovery for device %s\n", obd->obd_name);
-                target_abort_recovery(obd);
+                target_stop_recovery_thread(obd);
                 RETURN(0);
 
         case OBD_IOC_SET_READONLY: {
@@ -2699,7 +2937,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
                 LASSERT(handle);
                 (void)fsfilt_commit(obd, sb, inode, handle, 1);
 
-                dev_set_rdonly(ll_sbdev(obd->u.filter.fo_sb), 2);
+                ll_set_rdonly(ll_sbdev(obd->u.filter.fo_sb));
                 RETURN(0);
         }
 
@@ -2707,7 +2945,6 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
                 rc = llog_catalog_list(obd, 1, data);
                 RETURN(rc);
         }
-
         case OBD_IOC_LLOG_CANCEL:
         case OBD_IOC_LLOG_REMOVE:
         case OBD_IOC_LLOG_INFO:
@@ -2780,9 +3017,10 @@ static int filter_llog_finish(struct obd_device *obd,
         RETURN(rc);
 }
 
-static int filter_llog_connect(struct obd_device *obd,
+static int filter_llog_connect(struct obd_export *exp,
                                struct llogd_conn_body *body) 
 {
+        struct obd_device *obd = exp->exp_obd;
         struct llog_ctxt *ctxt;
         struct obd_llogs *llog;
         int rc;
@@ -2792,7 +3030,7 @@ static int filter_llog_connect(struct obd_device *obd,
                (unsigned) body->lgdc_logid.lgl_ogr,
                (unsigned) body->lgdc_logid.lgl_oid,
                (unsigned) body->lgdc_logid.lgl_ogen);
-        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr);
+        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
         LASSERT(llog != NULL);
         ctxt = llog_get_context(llog, body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
@@ -2803,14 +3041,14 @@ static int filter_llog_connect(struct obd_device *obd,
         RETURN(rc);
 }
 
-static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
-                                             void *data)
+static struct dentry *filter_lvfs_id2dentry(__u64 id, __u32 gen, 
+                                           __u64 gr, void *data)
 {
-        return filter_fid2dentry(data, NULL, gr, id);
+        return filter_id2dentry(data, NULL, gr, id);
 }
 
 static struct lvfs_callback_ops filter_lvfs_ops = {
-        l_fid2dentry:     filter_lvfs_fid2dentry,
+        l_id2dentry:     filter_lvfs_id2dentry,
 };
 
 static struct obd_ops filter_obd_ops = {
@@ -2818,11 +3056,12 @@ static struct obd_ops filter_obd_ops = {
         .o_attach         = filter_attach,
         .o_detach         = filter_detach,
         .o_get_info       = filter_get_info,
-        .o_set_info       = filter_set_info,
         .o_setup          = filter_setup,
         .o_precleanup     = filter_precleanup,
         .o_cleanup        = filter_cleanup,
+        .o_process_config = filter_process_config,
         .o_connect        = filter_connect,
+        .o_connect_post   = filter_connect_post,
         .o_disconnect     = filter_disconnect,
         .o_statfs         = filter_statfs,
         .o_getattr        = filter_getattr,
@@ -2835,6 +3074,7 @@ static struct obd_ops filter_obd_ops = {
         .o_sync           = filter_sync,
         .o_preprw         = filter_preprw,
         .o_commitrw       = filter_commitrw,
+        .o_do_cow         = filter_do_cow,
         .o_write_extents  = filter_write_extents,
         .o_destroy_export = filter_destroy_export,
         .o_llog_init      = filter_llog_init,
@@ -2848,11 +3088,11 @@ static struct obd_ops filter_sanobd_ops = {
         .o_attach         = filter_attach,
         .o_detach         = filter_detach,
         .o_get_info       = filter_get_info,
-        .o_set_info       = filter_set_info,
         .o_setup          = filter_san_setup,
         .o_precleanup     = filter_precleanup,
         .o_cleanup        = filter_cleanup,
         .o_connect        = filter_connect,
+        .o_connect_post   = filter_connect_post,
         .o_disconnect     = filter_disconnect,
         .o_statfs         = filter_statfs,
         .o_getattr        = filter_getattr,
@@ -2865,6 +3105,7 @@ static struct obd_ops filter_sanobd_ops = {
         .o_sync           = filter_sync,
         .o_preprw         = filter_preprw,
         .o_commitrw       = filter_commitrw,
+        .o_do_cow         = filter_do_cow,
         .o_write_extents  = filter_write_extents,
         .o_san_preprw     = filter_san_preprw,
         .o_destroy_export = filter_destroy_export,
@@ -2877,21 +3118,34 @@ static struct obd_ops filter_sanobd_ops = {
 static int __init obdfilter_init(void)
 {
         struct lprocfs_static_vars lvars;
-        int rc;
+        int size, rc;
 
         printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n");
 
         lprocfs_init_vars(filter, &lvars);
 
+        size = OBDFILTER_CREATED_SCRATCHPAD_ENTRIES * 
+                sizeof(*obdfilter_created_scratchpad);
+        
+        OBD_ALLOC(obdfilter_created_scratchpad, size);
+        if (obdfilter_created_scratchpad == NULL) {
+                CERROR ("Can't allocate scratchpad\n");
+                return -ENOMEM;
+        }
+
         rc = class_register_type(&filter_obd_ops, NULL, lvars.module_vars,
                                  OBD_FILTER_DEVICENAME);
-        if (rc)
+        if (rc) {
+                OBD_FREE(obdfilter_created_scratchpad, size);
                 return rc;
+        }
 
         rc = class_register_type(&filter_sanobd_ops, NULL, lvars.module_vars,
                                  OBD_FILTER_SAN_DEVICENAME);
-        if (rc)
+        if (rc) {
                 class_unregister_type(OBD_FILTER_DEVICENAME);
+                OBD_FREE(obdfilter_created_scratchpad, size);
+        }
         return rc;
 }
 
@@ -2899,6 +3153,9 @@ static void __exit obdfilter_exit(void)
 {
         class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
         class_unregister_type(OBD_FILTER_DEVICENAME);
+        OBD_FREE(obdfilter_created_scratchpad,
+                 OBDFILTER_CREATED_SCRATCHPAD_ENTRIES * 
+                 sizeof(*obdfilter_created_scratchpad));
 }
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");