Whamcloud - gitweb
Support for delayed-destroy of objects on the OST. It isn't perfect,
authoradilger <adilger>
Sat, 26 Oct 2002 08:29:39 +0000 (08:29 +0000)
committeradilger <adilger>
Sat, 26 Oct 2002 08:29:39 +0000 (08:29 +0000)
in the sense that there is still some work to be done for handling
multi-client unlink properly, but it passes all of the normal tests,
so it is no worse than what we have now.

lustre/include/linux/obd.h
lustre/include/linux/obd_filter.h
lustre/llite/file.c
lustre/lov/lov_obd.c
lustre/obdfilter/filter.c

index e5af36b..75759d3 100644 (file)
@@ -92,12 +92,13 @@ struct filter_obd {
         struct obd_run_ctxt fo_ctxt;
         struct dentry *fo_dentry_O;
         struct dentry *fo_dentry_O_mode[16];
-        spinlock_t fo_lock;
-        __u64 fo_lastino;
+        spinlock_t fo_objidlock;        /* protects fo_lastobjid increment */
+        __u64 fo_lastobjid;
         struct file_operations *fo_fop;
         struct inode_operations *fo_iop;
         struct address_space_operations *fo_aops;
         struct list_head fo_export_list;
+        spinlock_t fo_fddlock;          /* protects setting dentry->d_fsdata */
 };
 
 struct mds_server_data;
@@ -198,7 +199,7 @@ struct lov_tgt_desc {
 struct lov_obd {
         spinlock_t lov_lock;
         struct obd_device *mdcobd;
- struct lov_desc desc;
       struct lov_desc desc;
         int bufsize;
         int refcount;
         struct lov_tgt_desc *tgts;
index d37a198..fb3d1ff 100644 (file)
 /* In-memory access to client data from OST struct */
 struct filter_export_data {
         struct list_head  fed_open_head; /* files to close on disconnect */
-        spinlock_t        fed_lock;
+        spinlock_t        fed_lock;      /* protects fed_open_head */
 };
 
 /* file data for open files on OST */
 struct filter_file_data {
-        struct list_head  ffd_dentry_list;  /* dentry open list */
-        struct list_head  ffd_export_list;  /* export open list */
+        struct list_head  ffd_export_list;  /* export open list - fed_lock */
         struct file      *ffd_file;         /* file handle */
         __u64             ffd_servercookie; /* cookie for lustre handle */
 };
 
 struct filter_dentry_data {
-        struct list_head fdd_open_head; /* files that have this dentry open */
+        atomic_t         fdd_open_count;
         int              fdd_flags;
 };
 
index e5041a0..9429b79 100644 (file)
@@ -276,6 +276,7 @@ static int ll_file_release(struct inode *inode, struct file *file)
 
         /* If this fails and we goto out_fd, the file size on the MDS is out of
          * date.  Is that a big deal? */
+#warning "FIXME: don't do this if the file is unlinked already"
         if (file->f_mode & FMODE_WRITE) {
                 struct lustre_handle *lockhs;
 
index 05544f1..471c50d 100644 (file)
 #include <linux/obd_class.h>
 #include <linux/obd_lov.h>
 #include <linux/init.h>
+#include <linux/random.h>
+#include <linux/slab.h>
 #include <asm/div64.h>
 
+static kmem_cache_t *lov_file_cache;
+
+struct lov_file_handles {
+        struct list_head lfh_list;
+        __u64 lfh_cookie;
+        int lfh_count;
+        struct lustre_handle *lfh_handles;
+};
+
 /* obd methods */
 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                        obd_uuid_t cluuid, struct recovd_obd *recovd,
@@ -35,6 +46,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         struct lov_obd *lov = &obd->u.lov;
         struct client_obd *mdc = &lov->mdcobd->u.cli;
         struct lov_desc *desc = &lov->desc;
+        struct obd_export *exp;
         struct lustre_handle mdc_conn;
         obd_uuid_t *uuidarray;
         int rc, rc2, i;
@@ -52,6 +64,9 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         if (lov->refcount > 1)
                 RETURN(0);
 
+        exp = class_conn2export(conn);
+        INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
+
         /* retrieve LOV metadata from MDS */
         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover);
         if (rc) {
@@ -128,14 +143,17 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         for (i = 0; i < desc->ld_tgt_count; i++) {
                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
+
                 if (!tgt) {
                         CERROR("Target %s not attached\n", uuidarray[i]);
                         GOTO(out_disc, rc = -EINVAL);
                 }
+
                 if (!(tgt->obd_flags & OBD_SET_UP)) {
                         CERROR("Target %s not set up\n", uuidarray[i]);
                         GOTO(out_disc, rc = -EINVAL);
                 }
+
                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
                                  recover);
                 if (rc) {
@@ -177,12 +195,14 @@ static int lov_disconnect(struct lustre_handle *conn)
 {
         struct obd_device *obd = class_conn2obd(conn);
         struct lov_obd *lov = &obd->u.lov;
+        struct obd_export *exp;
+        struct list_head *p, *n;
         int rc, i;
 
         if (!lov->tgts)
                 goto out_local;
 
-        /* Only disconnect the underlying laters on the final disconnect. */
+        /* Only disconnect the underlying layers on the final disconnect. */
         lov->refcount--;
         if (lov->refcount != 0)
                 goto out_local;
@@ -207,6 +227,19 @@ static int lov_disconnect(struct lustre_handle *conn)
         lov->bufsize = 0;
         lov->tgts = NULL;
 
+        exp = class_conn2export(conn);
+        list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
+                /* XXX close these, instead of just discarding them? */
+                struct lov_file_handles *lfh;
+                lfh = list_entry(p, typeof(*lfh), lfh_list);
+                CERROR("discarding open LOV handle %p:"LPX64"\n",
+                       lfh, lfh->lfh_cookie);
+                list_del(&lfh->lfh_list);
+                OBD_FREE(lfh->lfh_handles,
+                         lfh->lfh_count * sizeof(*lfh->lfh_handles));
+                kmem_cache_free(lov_file_cache, lfh);
+        }
+
  out_local:
         rc = class_disconnect(conn);
         if (!rc)
@@ -299,6 +332,22 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
         RETURN(rc);
 }
 
+static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
+{
+        struct lov_file_handles *lfh = NULL;
+
+        if (!handle || !handle->addr)
+                RETURN(NULL);
+
+        lfh = (struct lov_file_handles *)(unsigned long)(handle->addr);
+        if (!kmem_cache_validate(lov_file_cache, lfh))
+                RETURN(NULL);
+
+        if (lfh->lfh_cookie != handle->cookie)
+                RETURN(NULL);
+
+        return lfh;
+}
 
 /* the LOV expects oa->o_id to be set to the LOV object id */
 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
@@ -444,6 +493,7 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        struct lov_file_handles *lfh = NULL;
         int rc = 0, i;
         ENTRY;
 
@@ -461,11 +511,18 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
+        if (oa->o_valid & OBD_MD_FLHANDLE)
+                lfh = lov_handle2lfh(obdo_handle(oa));
+
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
-                /* create data objects with "parent" OA */
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
+                if (lfh)
+                        memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
+                               sizeof(lfh->lfh_handles[i]));
+                else
+                        tmp.o_valid &= ~OBD_MD_FLHANDLE;
                 rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
                 if (rc)
                         CERROR("Error destroying objid "LPX64" subobj "LPX64
@@ -531,6 +588,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        struct lov_file_handles *lfh = NULL;
         int rc = 0, i;
         int new = 1;
         ENTRY;
@@ -550,8 +608,10 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                 RETURN(-ENODEV);
 
         lov = &export->exp_obd->u.lov;
-        oa->o_size = 0;
-        oa->o_blocks = 0;
+
+        if (oa->o_valid & OBD_MD_FLHANDLE)
+                lfh = lov_handle2lfh(obdo_handle(oa));
+
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
 
@@ -563,6 +623,11 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                 /* create data objects with "parent" OA */
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
+                if (lfh)
+                        memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
+                               sizeof(lfh->lfh_handles[i]));
+                else
+                        tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
                 if (err) {
@@ -581,10 +646,11 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
                        struct lov_stripe_md *lsm)
 {
-        struct obdo tmp;
+        struct obdo *tmp;
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        struct lov_file_handles *lfh = NULL;
         int rc = 0, i;
         ENTRY;
 
@@ -610,15 +676,28 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
         /* size changes should go through punch and not setattr */
         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
 
+        tmp = obdo_alloc();
+        if (!tmp)
+                RETURN(-ENOMEM);
+
+        if (oa->o_valid & OBD_MD_FLHANDLE)
+                lfh = lov_handle2lfh(obdo_handle(oa));
+
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
 
-                /* create data objects with "parent" OA */
-                memcpy(&tmp, oa, sizeof(tmp));
-                tmp.o_id = loi->loi_id;
+                obdo_cpy_md(tmp, oa, oa->o_valid);
 
-                err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
+                if (lfh)
+                        memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
+                                sizeof(lfh->lfh_handles[i]));
+                else
+                        tmp->o_valid &= ~OBD_MD_FLHANDLE;
+
+                tmp->o_id = loi->loi_id;
+
+                err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
                 if (err) {
                         CERROR("Error setattr objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
@@ -627,6 +706,7 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
                                 rc = err;
                 }
         }
+        obdo_free(tmp);
         RETURN(rc);
 }
 
@@ -637,6 +717,7 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        struct lov_file_handles *lfh = NULL;
         int new = 1;
         int rc = 0, i;
         ENTRY;
@@ -659,6 +740,14 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         if (!tmp)
                 RETURN(-ENOMEM);
 
+        lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
+        if (!lfh)
+                GOTO(out_tmp, rc = -ENOMEM);
+        OBD_ALLOC(lfh->lfh_handles,
+                  lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
+        if (!lfh->lfh_handles)
+                GOTO(out_lfh, rc = -ENOMEM);
+
         lov = &export->exp_obd->u.lov;
         oa->o_size = 0;
         oa->o_blocks = 0;
@@ -680,7 +769,25 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                 }
 
                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
+
+                if (tmp->o_valid & OBD_MD_FLHANDLE)
+                        memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
+                               sizeof(lfh->lfh_handles[i]));
         }
+
+        if (tmp->o_valid & OBD_MD_FLHANDLE) {
+                struct lustre_handle *handle = obdo_handle(oa);
+
+                lfh->lfh_count = lsm->lsm_stripe_count;
+                get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
+
+                handle->addr = (__u64)(unsigned long)lfh;
+                handle->cookie = lfh->lfh_cookie;
+                oa->o_valid |= OBD_MD_FLHANDLE;
+                list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
+        } else
+                goto out_handles;
+
         /* FIXME: returning an error, but having opened some objects is a bad
          *        idea, since they will likely never be closed.  We either
          *        need to not return an error if _some_ objects could be
@@ -688,8 +795,17 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
          *        hopefully partial error status) or close all opened objects
          *        and return an error.  I think the former is preferred.
          */
+out_tmp:
         obdo_free(tmp);
         RETURN(rc);
+
+out_handles:
+        OBD_FREE(lfh->lfh_handles,
+                 lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
+out_lfh:
+        lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
+        kmem_cache_free(lov_file_cache, lfh);
+        goto out_tmp;
 }
 
 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
@@ -699,6 +815,7 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        struct lov_file_handles *lfh = NULL;
         int rc = 0, i;
         ENTRY;
 
@@ -716,6 +833,9 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
+        if (oa->o_valid & OBD_MD_FLHANDLE)
+                lfh = lov_handle2lfh(obdo_handle(oa));
+
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
@@ -723,6 +843,11 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
                 /* create data objects with "parent" OA */
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
+                if (lfh)
+                        memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
+                               sizeof(lfh->lfh_handles[i]));
+                else
+                        tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
                 if (err) {
@@ -733,6 +858,14 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
                                 rc = err;
                 }
         }
+        if (lfh) {
+                list_del(&lfh->lfh_list);
+                OBD_FREE(lfh->lfh_handles,
+                         lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
+                lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
+                kmem_cache_free(lov_file_cache, lfh);
+        }
+
         RETURN(rc);
 }
 
@@ -794,6 +927,7 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        struct lov_file_handles *lfh = NULL;
         int rc = 0, i;
         ENTRY;
 
@@ -811,6 +945,9 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
+        if (oa->o_valid & OBD_MD_FLHANDLE)
+                lfh = lov_handle2lfh(obdo_handle(oa));
+
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 obd_off starti = lov_stripe_offset(lsm, start, i);
@@ -822,6 +959,11 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
                 /* create data objects with "parent" OA */
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
+                if (lfh)
+                        memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
+                               sizeof(lfh->lfh_handles[i]));
+                else
+                        tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
                                 starti, endi);
@@ -1197,16 +1339,24 @@ static int __init lov_init(void)
 {
         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
                ", info@clusterfs.com\n");
+        lov_file_cache = kmem_cache_create("ll_lov_file_data",
+                                           sizeof(struct lov_file_handles),
+                                           0, 0, NULL, NULL);
+        if (!lov_file_cache)
+                RETURN(-ENOMEM);
+
         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
 }
 
 static void __exit lov_exit(void)
 {
         class_unregister_type(OBD_LOV_DEVICENAME);
+        if (kmem_cache_destroy(lov_file_cache))
+                CERROR("couldn't free LOV open cache\n");
 }
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
+MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver " LOV_VERSION);
 MODULE_LICENSE("GPL");
 
 module_init(lov_init);
index 70ea821..15cfb2f 100644 (file)
 #endif
 #include <linux/quotaops.h>
 #include <linux/init.h>
+#include <linux/random.h>
 #include <linux/stringify.h>
 
-long filter_memory;
+static kmem_cache_t *filter_open_cache;
+static kmem_cache_t *filter_dentry_cache;
 
 #define FILTER_ROOTINO 2
 #define FILTER_ROOTINO_STR __stringify(FILTER_ROOTINO)
@@ -60,23 +62,36 @@ static int filter_id(char *buf, obd_id id, obd_mode mode)
 
 static inline void f_dput(struct dentry *dentry)
 {
+        /* Can't go inside filter_ddelete because it can block */
         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
         LASSERT(atomic_read(&dentry->d_count) > 0);
+
         dput(dentry);
 }
 
+/* Not racy w.r.t. others, because we are the only user of this dentry */
+static void filter_drelease(struct dentry *dentry)
+{
+        if (dentry->d_fsdata)
+                kmem_cache_free(filter_dentry_cache, dentry->d_fsdata);
+}
+
+struct dentry_operations filter_dops = {
+        .d_release = filter_drelease,
+};
+
 /* setup the object store with correct subdirectories */
-static int filter_prep(struct obd_device *obddev)
+static int filter_prep(struct obd_device *obd)
 {
         struct obd_run_ctxt saved;
-        struct filter_obd *filter = &obddev->u.filter;
+        struct filter_obd *filter = &obd->u.filter;
         struct dentry *dentry;
         struct dentry *root;
         struct file *file;
         struct inode *inode;
         int rc = 0;
-        __u64 lastino = 2;
+        __u64 lastobjid = 2;
         int mode = 0;
 
         push_ctxt(&saved, &filter->fo_ctxt, NULL);
@@ -150,28 +165,28 @@ static int filter_prep(struct obd_device *obddev)
         filter->fo_aops = inode->i_mapping->a_ops;
 
         if (inode->i_size == 0) {
-                __u64 disk_lastino = cpu_to_le64(lastino);
-                ssize_t retval = file->f_op->write(file, (char *)&disk_lastino,
-                                                   sizeof(disk_lastino),
+                __u64 disk_lastobjid = cpu_to_le64(lastobjid);
+                ssize_t retval = file->f_op->write(file,(char *)&disk_lastobjid,
+                                                   sizeof(disk_lastobjid),
                                                    &file->f_pos);
-                if (retval != sizeof(disk_lastino)) {
-                        CDEBUG(D_INODE, "OBD filter: error writing lastino\n");
+                if (retval != sizeof(disk_lastobjid)) {
+                        CDEBUG(D_INODE,"OBD filter: error writing lastobjid\n");
                         filp_close(file, 0);
                         GOTO(out_O_mode, rc = -EIO);
                 }
         } else {
-                __u64 disk_lastino;
-                ssize_t retval = file->f_op->read(file, (char *)&disk_lastino,
-                                                  sizeof(disk_lastino),
+                __u64 disk_lastobjid;
+                ssize_t retval = file->f_op->read(file, (char *)&disk_lastobjid,
+                                                  sizeof(disk_lastobjid),
                                                   &file->f_pos);
-                if (retval != sizeof(disk_lastino)) {
-                        CDEBUG(D_INODE, "OBD filter: error reading lastino\n");
+                if (retval != sizeof(disk_lastobjid)) {
+                        CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
                         filp_close(file, 0);
                         GOTO(out_O_mode, rc = -EIO);
                 }
-                lastino = le64_to_cpu(disk_lastino);
+                lastobjid = le64_to_cpu(disk_lastobjid);
         }
-        filter->fo_lastino = lastino;
+        filter->fo_lastobjid = lastobjid;
         filp_close(file, 0);
 
         rc = 0;
@@ -195,11 +210,11 @@ out_O:
 }
 
 /* cleanup the filter: write last used object id to status file */
-static void filter_post(struct obd_device *obddev)
+static void filter_post(struct obd_device *obd)
 {
         struct obd_run_ctxt saved;
-        struct filter_obd *filter = &obddev->u.filter;
-        __u64 disk_lastino;
+        struct filter_obd *filter = &obd->u.filter;
+        __u64 disk_lastobjid;
         long rc;
         struct file *file;
         int mode;
@@ -212,11 +227,11 @@ static void filter_post(struct obd_device *obddev)
         }
 
         file->f_pos = 0;
-        disk_lastino = cpu_to_le64(filter->fo_lastino);
-        rc = file->f_op->write(file, (char *)&disk_lastino,
-                       sizeof(disk_lastino), &file->f_pos);
-        if (rc != sizeof(disk_lastino))
-                CERROR("OBD filter: error writing lastino: rc = %ld\n", rc);
+        disk_lastobjid = cpu_to_le64(filter->fo_lastobjid);
+        rc = file->f_op->write(file, (char *)&disk_lastobjid,
+                       sizeof(disk_lastobjid), &file->f_pos);
+        if (rc != sizeof(disk_lastobjid))
+                CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
 
         rc = filp_close(file, NULL);
         if (rc)
@@ -235,25 +250,25 @@ out:
 }
 
 
-static __u64 filter_next_id(struct obd_device *obddev)
+static __u64 filter_next_id(struct obd_device *obd)
 {
         obd_id id;
 
-        spin_lock(&obddev->u.filter.fo_lock);
-        id = ++obddev->u.filter.fo_lastino;
-        spin_unlock(&obddev->u.filter.fo_lock);
+        spin_lock(&obd->u.filter.fo_objidlock);
+        id = ++obd->u.filter.fo_lastobjid;
+        spin_unlock(&obd->u.filter.fo_objidock);
 
-        /* FIXME: write the lastino to disk here */
+        /* FIXME: write the lastobjid to disk here */
         return id;
 }
 
 /* how to get files, dentries, inodes from object id's */
 /* parent i_sem is already held if needed for exclusivity */
-static struct dentry *filter_fid2dentry(struct obd_device *obddev,
+static struct dentry *filter_fid2dentry(struct obd_device *obd,
                                         struct dentry *dparent,
                                         __u64 id, __u32 type)
 {
-        struct super_block *sb = obddev->u.filter.fo_sb;
+        struct super_block *sb = obd->u.filter.fo_sb;
         struct dentry *dchild;
         char name[32];
         int len;
@@ -285,6 +300,11 @@ static struct dentry *filter_fid2dentry(struct obd_device *obddev,
                 RETURN(dchild);
         }
 
+        if (!dchild->d_op)
+                dchild->d_op = &filter_dops;
+        else
+                LASSERT(dchild->d_op == &filter_dops);
+
         CDEBUG(D_INODE, "got child obj O/%s/%s: %p, count = %d\n",
                obd_mode_to_type(type), name, dchild,
                atomic_read(&dchild->d_count));
@@ -294,10 +314,22 @@ static struct dentry *filter_fid2dentry(struct obd_device *obddev,
         RETURN(dchild);
 }
 
-static struct file *filter_obj_open(struct obd_device *obddev,
+static inline struct dentry *filter_parent(struct obd_device *obd,
+                                           obd_mode mode)
+{
+        struct filter_obd *filter = &obd->u.filter;
+
+        return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+}
+
+static struct file *filter_obj_open(struct obd_export *export,
                                     __u64 id, __u32 type)
 {
-        struct super_block *sb = obddev->u.filter.fo_sb;
+        struct filter_obd *filter = &export->exp_obd->u.filter;
+        struct super_block *sb = filter->fo_sb;
+        struct filter_export_data *fed = &export->exp_filter_data;
+        struct filter_dentry_data *fdd;
+        struct filter_file_data *ffd;
         struct obd_run_ctxt saved;
         char name[24];
         struct file *file;
@@ -319,48 +351,181 @@ static struct file *filter_obj_open(struct obd_device *obddev,
                 RETURN(ERR_PTR(-EINVAL));
         }
 
+        ffd = kmem_cache_alloc(filter_open_cache, SLAB_KERNEL);
+        if (!ffd) {
+                CERROR("obdfilter: out of memory\n");
+                RETURN(ERR_PTR(-ENOMEM));
+        }
+
+        /* We preallocate this to avoid blocking while holding fo_fddlock */
+        fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL);
+        if (!fdd) {
+                CERROR("obdfilter: out of memory\n");
+                GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
+        }
+
         filter_id(name, id, type);
-        push_ctxt(&saved, &obddev->u.filter.fo_ctxt, NULL);
+        push_ctxt(&saved, &filter->fo_ctxt, NULL);
         file = filp_open(name, O_RDONLY | O_LARGEFILE, 0 /* type? */);
         pop_ctxt(&saved);
 
-        CDEBUG(D_INODE, "opening obdo %s: rc = %p\n", name, file);
-
         if (IS_ERR(file))
-                file = NULL;
+                GOTO(out_fdd, file);
+
+        spin_lock(&filter->fo_fddlock);
+        if (file->f_dentry->d_fsdata) {
+                spin_unlock(&filter->fo_fddlock);
+                kmem_cache_free(filter_dentry_cache, fdd);
+                fdd = file->f_dentry->d_fsdata;
+                LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
+                /* should only happen during client recovery */
+                if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
+                        CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id);
+                atomic_inc(&fdd->fdd_open_count);
+        } else {
+                atomic_set(&fdd->fdd_open_count, 1);
+                spin_lock_init(&fdd->fdd_lock);
+                fdd->fdd_flags = 0;
+                /* If this is racy, then we can use {cmp}xchg and atomic_add */
+                file->f_dentry->d_fsdata = fdd;
+                spin_unlock(&filter->fo_fddlock);
+        }
+
+        get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
+        ffd->ffd_file = file;
+        file->private_data = ffd;
+
+        spin_lock(&fed->fed_lock);
+        list_add(&ffd->ffd_export_list, &fed->fed_open_head);
+        spin_unlock(&fed->fed_lock);
+
+        CDEBUG(D_INODE, "opening objid "LPX64": rc = %p\n", id, file);
+
+out:
         RETURN(file);
+
+out_fdd:
+        kmem_cache_free(filter_dentry_cache, fdd);
+out_ffd:
+        ffd->ffd_servercookie = DEAD_HANDLE_MAGIC;
+        kmem_cache_free(filter_open_cache, ffd);
+        goto out;
 }
 
-static struct dentry *filter_parent(struct obd_device *obddev, obd_mode mode)
+/* Caller must hold i_sem on dir_dentry->d_inode */
+static int filter_destroy_internal(struct obd_device *obd,
+                                   struct dentry *dir_dentry,
+                                   struct dentry *object_dentry)
 {
-        struct filter_obd *filter = &obddev->u.filter;
+        struct obd_run_ctxt saved;
+        struct inode *inode = object_dentry->d_inode;
+        int rc;
 
-        return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+        if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
+                CERROR("destroying objid %*s nlink = %d, count = %d\n",
+                       object_dentry->d_name.len,
+                       object_dentry->d_name.name,
+                       inode->i_nlink, atomic_read(&inode->i_count));
+        }
+
+        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
+        /* XXX unlink from PENDING directory now too */
+        pop_ctxt(&saved);
+
+        if (rc)
+                CERROR("error unlinking objid %*s: rc %d\n",
+                       object_dentry->d_name.len,
+                       object_dentry->d_name.name, rc);
+
+        return rc;
 }
 
+static int filter_close_internal(struct obd_device *obd,
+                                 struct filter_file_data *ffd)
+{
+        struct file *filp = ffd->ffd_file;
+        struct dentry *object_dentry = dget(filp->f_dentry);
+        struct filter_dentry_data *fdd = object_dentry->d_fsdata;
+        int rc, rc2 = 0;
+        ENTRY;
+
+        LASSERT(filp->private_data == ffd);
+        LASSERT(fdd);
+
+        rc = filp_close(filp, 0);
+
+        if (atomic_dec_and_test(&fdd->fdd_open_count) &&
+            fdd->fdd_flags & FILTER_FLAG_DESTROY) {
+                struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
+
+                down(&dir_dentry->d_inode->i_sem);
+                rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
+                if (rc2 && !rc)
+                        rc = rc2;
+                up(&dir_dentry->d_inode->i_sem);
+        }
+
+        f_dput(object_dentry);
+        kmem_cache_free(filter_open_cache, ffd);
+
+        RETURN(rc);
+}
 
 /* obd methods */
 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
                           obd_uuid_t cluuid, struct recovd_obd *recovd,
                           ptlrpc_recovery_cb_t recover)
 {
+        struct obd_export *exp;
         int rc;
+
         ENTRY;
         MOD_INC_USE_COUNT;
         rc = class_connect(conn, obd, cluuid);
         if (rc)
-                MOD_DEC_USE_COUNT;
+                GOTO(out_dec, rc);
+        exp = class_conn2export(conn);
+        LASSERT(exp);
+
+        INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
+        spin_lock_init(&exp->exp_filter_data.fed_lock);
+out:
         RETURN(rc);
+
+out_dec:
+        MOD_DEC_USE_COUNT;
+        goto out;
 }
 
 static int filter_disconnect(struct lustre_handle *conn)
 {
-        struct obd_export *export = class_conn2export(conn);
+        struct obd_export *exp = class_conn2export(conn);
+        struct filter_export_data *fed;
         int rc;
         ENTRY;
 
-        ldlm_cancel_locks_for_export(export);
+        LASSERT(exp);
+        fed = &exp->exp_filter_data;
+        spin_lock(&exp->exp_filter_data);
+        while(!list_empty(&fed->fed_open_head)) {
+                struct filter_file_data *ffd;
 
+                ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
+                                 ffd_export_list);
+                list_del(&ffd->ffd_export_list);
+                spin_unlock(&exp->exp_filter_data);
+
+                CERROR("force closing file %*s on disconnect\n",
+                       ffd->ffd_file->f_dentry->d_name.len,
+                       ffd->ffd_file->f_dentry->d_name.name);
+
+                filter_close_internal(exp->exp_obd, ffd);
+                spin_lock(&exp->exp_filter_data);
+        }
+        spin_unlock(&exp->exp_filter_data);
+
+        ldlm_cancel_locks_for_export(exp);
         rc = class_disconnect(conn);
         if (!rc)
                 MOD_DEC_USE_COUNT;
@@ -370,7 +535,7 @@ static int filter_disconnect(struct lustre_handle *conn)
 }
 
 /* mount the file system (secretly) */
-static int filter_setup(struct obd_device *obddev, obd_count len, void *buf)
+static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         struct obd_ioctl_data* data = buf;
         struct filter_obd *filter;
@@ -387,10 +552,11 @@ static int filter_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (IS_ERR(mnt))
                 GOTO(err_dec, err);
 
-        filter = &obddev->u.filter;;
+        filter = &obd->u.filter;;
         filter->fo_vfsmnt = mnt;
         filter->fo_fstype = strdup(data->ioc_inlbuf2);
         filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
+        CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, filter->fo_vfsmnt);
         /* XXX is this even possible if do_kern_mount succeeded? */
         if (!filter->fo_sb)
                 GOTO(err_kfree, err = -ENODEV);
@@ -400,18 +566,19 @@ static int filter_setup(struct obd_device *obddev, obd_count len, void *buf)
         filter->fo_ctxt.pwd = mnt->mnt_root;
         filter->fo_ctxt.fs = get_ds();
 
-        err = filter_prep(obddev);
+        err = filter_prep(obd);
         if (err)
                 GOTO(err_kfree, err);
         spin_lock_init(&filter->fo_lock);
+        INIT_LIST_HEAD(&filter->fo_export_list);
 
-        obddev->obd_namespace =
+        obd->obd_namespace =
                 ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
-        if (obddev->obd_namespace == NULL)
+        if (obd->obd_namespace == NULL)
                 LBUG();
 
         ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           "filter_ldlm_client", &obddev->obd_ldlm_client);
+                           "filter_ldlm_client", &obd->obd_ldlm_client);
 
         RETURN(0);
 
@@ -428,33 +595,33 @@ err_dec:
 }
 
 
-static int filter_cleanup(struct obd_device * obddev)
+static int filter_cleanup(struct obd_device *obd)
 {
         struct super_block *sb;
         ENTRY;
 
-        if (!list_empty(&obddev->obd_exports)) {
+        if (!list_empty(&obd->obd_exports)) {
                 CERROR("still has clients!\n");
-                class_disconnect_all(obddev);
-                if (!list_empty(&obddev->obd_exports)) {
+                class_disconnect_all(obd);
+                if (!list_empty(&obd->obd_exports)) {
                         CERROR("still has exports after forced cleanup?\n");
                         RETURN(-EBUSY);
                 }
         }
 
-        ldlm_namespace_free(obddev->obd_namespace);
+        ldlm_namespace_free(obd->obd_namespace);
 
-        sb = obddev->u.filter.fo_sb;
-        if (!obddev->u.filter.fo_sb)
+        sb = obd->u.filter.fo_sb;
+        if (!obd->u.filter.fo_sb)
                 RETURN(0);
 
-        filter_post(obddev);
+        filter_post(obd);
 
         shrink_dcache_parent(sb->s_root);
         unlock_kernel();
-        mntput(obddev->u.filter.fo_vfsmnt);
-        obddev->u.filter.fo_sb = 0;
-        kfree(obddev->u.filter.fo_fstype);
+        mntput(obd->u.filter.fo_vfsmnt);
+        obd->u.filter.fo_sb = 0;
+        kfree(obd->u.filter.fo_fstype);
 
         lock_kernel();
 
@@ -463,8 +630,7 @@ static int filter_cleanup(struct obd_device * obddev)
 }
 
 
-static inline void filter_from_inode(struct obdo *oa, struct inode *inode,
-                                     int valid)
+static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
 {
         int type = oa->o_mode & S_IFMT;
         ENTRY;
@@ -485,33 +651,72 @@ static inline void filter_from_inode(struct obdo *oa, struct inode *inode,
         EXIT;
 }
 
-static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
-                          struct lov_stripe_md *md)
+static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
 {
-        struct obd_device *obddev = class_conn2obd(conn);
-        struct dentry *dentry;
-        int rc = 0;
+        struct filter_file_data *ffd = NULL;
         ENTRY;
 
-        if (!class_conn2export(conn)) {
-                CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
-                RETURN(-EINVAL);
+        if (!handle || !handle->addr)
+                RETURN(NULL);
+
+        ffd = (struct filter_file_data *)(unsigned long)(handle->addr);
+        if (!kmem_cache_validate(filter_open_cache, (void *)ffd))
+                RETURN(NULL);
+
+        if (ffd->ffd_servercookie != handle->cookie)
+                RETURN(NULL);
+
+        LASSERT(ffd->ffd_file->private_data == ffd);
+        RETURN(ffd);
+}
+
+static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
+                                         struct obdo *oa, char *what)
+{
+        struct dentry *dentry = NULL;
+
+        if (oa->o_valid & OBD_MD_FLHANDLE) {
+                struct lustre_handle *ost_handle = obdo_handle(oa);
+                struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
+
+                if (ffd)
+                        dentry = dget(ffd->ffd_file->f_dentry);
         }
 
-        obddev = class_conn2obd(conn);
-        dentry = filter_fid2dentry(obddev, filter_parent(obddev, oa->o_mode),
-                                   oa->o_id, oa->o_mode);
-        if (IS_ERR(dentry))
-                RETURN(PTR_ERR(dentry));
+        if (!dentry) {
+                struct obd_device *obd = class_conn2obd(conn);
+                if (!obd) {
+                        CERROR("invalid client "LPX64"\n", conn->addr);
+                        RETURN(ERR_PTR(-EINVAL));
+                }
+                dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
+                                           oa->o_id, oa->o_mode);
+        }
 
         if (!dentry->d_inode) {
-                CERROR("getattr on non-existent object: "LPU64"\n", oa->o_id);
-                GOTO(out_getattr, rc = -ENOENT);
+                CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
+                f_dput(dentry);
+                dentry = ERR_PTR(-ENOENT);
         }
 
+        return dentry;
+}
+
+#define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__)
+
+static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
+                          struct lov_stripe_md *md)
+{
+        struct dentry *dentry = NULL;
+        int rc = 0;
+        ENTRY;
+
+        dentry = filter_oa2dentry(conn, oa);
+        if (IS_ERR(dentry))
+                RETURN(PTR_ERR(dentry));
+
         filter_from_inode(oa, dentry->d_inode, oa->o_valid);
 
-out_getattr:
         f_dput(dentry);
         RETURN(rc);
 }
@@ -527,18 +732,14 @@ static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
         int rc;
         ENTRY;
 
-        iattr_from_obdo(&iattr, oa, oa->o_valid);
-        iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
-        dentry = filter_fid2dentry(obd, filter_parent(obd, iattr.ia_mode),
-                                   oa->o_id, iattr.ia_mode);
+        dentry = filter_oa2dentry(conn, oa);
+
         if (IS_ERR(dentry))
                 RETURN(PTR_ERR(dentry));
 
+        iattr_from_obdo(&iattr, oa, oa->o_valid);
+        iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
         inode = dentry->d_inode;
-        if (!inode) {
-                CERROR("setattr on non-existent object: "LPU64"\n", oa->o_id);
-                GOTO(out_setattr, rc = -ENOENT);
-        }
 
         lock_kernel();
         if (iattr.ia_valid & ATTR_SIZE)
@@ -556,7 +757,6 @@ static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
         }
         unlock_kernel();
 
-out_setattr:
         f_dput(dentry);
         RETURN(rc);
 }
@@ -565,9 +765,11 @@ static int filter_open(struct lustre_handle *conn, struct obdo *oa,
                        struct lov_stripe_md *ea)
 {
         struct obd_export *export;
-        struct obd_device *obd;
-        struct dentry *dentry;
-        /* ENTRY; */
+        struct lustre_handle *handle;
+        struct filter_file_data *ffd;
+        struct file *filp;
+        int rc = 0;
+        ENTRY;
 
         export = class_conn2export(conn);
         if (!export) {
@@ -575,56 +777,65 @@ static int filter_open(struct lustre_handle *conn, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
-        obd = export->exp_obd;
-        dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
-                                   oa->o_id, oa->o_mode);
-        if (IS_ERR(dentry))
-                RETURN(PTR_ERR(dentry));
-
-        if (!dentry->d_inode) {
-                CERROR("opening non-existent objid "LPX64"\n", oa->o_id);
-                RETURN(-ENOENT);
-        }
+        filp = filter_obj_open(export, oa->o_id, oa->o_mode);
+        if (IS_ERR(filp))
+                GOTO(out, rc = PTR_ERR(filp));
 
-        filter_from_inode(oa, dentry->d_inode, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                          OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+        filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
 
-        return 0;
+        ffd = filp->private_data;
+        handle = obdo_handle(oa);
+        handle->addr = (__u64)(unsigned long)ffd;
+        handle->cookie = ffd->ffd_servercookie;
+        oa->o_valid |= OBD_MD_FLHANDLE;
+out:
+        RETURN(rc);
 } /* filter_open */
 
 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
                         struct lov_stripe_md *ea)
 {
-        struct obd_device *obd;
-        struct dentry *dentry;
-        /* ENTRY; */
+        struct obd_export *exp;
+        struct filter_file_data *ffd;
+        int rc;
+        ENTRY;
 
-        obd = class_conn2obd(conn);
-        if (!obd) {
+        exp = class_conn2export(conn);
+        if (!exp) {
                 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
                 RETURN(-EINVAL);
         }
 
-        dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
-                                   oa->o_id, oa->o_mode);
-        if (IS_ERR(dentry))
-                RETURN(PTR_ERR(dentry));
-        LASSERT(atomic_read(&dentry->d_count) > 1);
+        if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
+                CERROR("no handle for close of objid "LPX64"\n", oa->o_id);
+                RETURN(-EINVAL);
+        }
 
-        f_dput(dentry);  /* for the close */
-        f_dput(dentry);  /* for this call */
-        return 0;
+        ffd = filter_handle2ffd(obdo_handle(oa));
+        if (!ffd) {
+                struct lustre_handle *handle = obdo_handle(oa);
+                CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n",
+                       handle->addr, handle->cookie);
+                RETURN(-ESTALE);
+        }
+
+        spin_lock(&exp->exp_filter_data);
+        list_del(&ffd->ffd_export_list);
+        spin_unlock(&exp->exp_filter_data);
+
+        rc = filter_close_internal(exp->exp_obd, ffd);
+
+        RETURN(rc);
 } /* filter_close */
 
 static int filter_create(struct lustre_handle* conn, struct obdo *oa,
                          struct lov_stripe_md **ea)
 {
+        struct obd_device *obd = class_conn2obd(conn);
         char name[64];
         struct obd_run_ctxt saved;
         struct dentry *new;
         int mode;
-        struct obd_device *obd = class_conn2obd(conn);
-        struct filter_obd *filter = &obd->u.filter;
         struct iattr;
         ENTRY;
 
@@ -643,10 +854,8 @@ static int filter_create(struct lustre_handle* conn, struct obdo *oa,
 
         //filter_id(name, oa->o_id, oa->o_mode);
         sprintf(name, LPU64, oa->o_id);
-        mode = (oa->o_mode & ~S_IFMT) | S_IFREG;
         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-        new = simple_mknod(filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT], name,
-                           mode);
+        new = simple_mknod(filter_parent(obd, oa->o_mode), name, mode);
         pop_ctxt(&saved);
         if (IS_ERR(new)) {
                 CERROR("Error mknod obj %s, err %ld\n", name, PTR_ERR(new));
@@ -657,7 +866,7 @@ static int filter_create(struct lustre_handle* conn, struct obdo *oa,
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
                  OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
         filter_from_inode(oa, new->d_inode, oa->o_valid);
-        dput(new);
+        f_dput(new);
 
         return 0;
 }
@@ -665,15 +874,11 @@ static int filter_create(struct lustre_handle* conn, struct obdo *oa,
 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
                           struct lov_stripe_md *ea)
 {
-        struct obd_device *obd;
-        struct filter_obd *filter;
-        struct obd_run_ctxt saved;
-        struct inode *inode;
+        struct obd_device *obd = class_conn2obd(conn);
         struct dentry *dir_dentry, *object_dentry;
         int rc;
         ENTRY;
 
-        obd = class_conn2obd(conn);
         if (!obd) {
                 CERROR("invalid client "LPX64"\n", conn->addr);
                 RETURN(-EINVAL);
@@ -684,34 +889,22 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
         dir_dentry = filter_parent(obd, oa->o_mode);
         down(&dir_dentry->d_inode->i_sem);
 
-        object_dentry = filter_fid2dentry(obd, dir_dentry, oa->o_id,
-                                          oa->o_mode);
+        object_dentry = filter_oa2dentry(conn, oa);
         if (IS_ERR(object_dentry))
                 GOTO(out, rc = -ENOENT);
 
-        inode = object_dentry->d_inode;
-        if (!inode) {
-                CERROR("trying to destroy negative inode "LPX64"!\n", oa->o_id);
-                GOTO(out, rc = -ENOENT);
-        }
-
-        if (inode->i_nlink != 1) {
-                CERROR("destroying inode with nlink = %d\n", inode->i_nlink);
-                LBUG();
-                inode->i_nlink = 1;
-        }
-        inode->i_mode = S_IFREG;
+        if (object_dentry->d_fsdata) {
+                struct filter_dentry_data *fdd = object_dentry->d_fsdata;
 
-        if (atomic_read(&inode->i_count) > 1) {
-#warning FIXME: need to handle open-unlinked case and move to hold dir
-                CERROR("inode has count %d\n", atomic_read(&inode->i_count));
+                if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
+                        fdd->fdd_flags |= FILTER_FLAG_DESTROY;
+                        /* XXX put into PENDING directory in case of crash */
+                }
+                GOTO(out_dput, rc = 0);
         }
 
-        filter = &obd->u.filter;
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
-
-        rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
-        pop_ctxt(&saved);
+        rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
+out_dput:
         f_dput(object_dentry);
 
         EXIT;
@@ -722,7 +915,7 @@ out:
 
 /* NB count and offset are used for punch, but not truncate */
 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
-                           struct lov_stripe_md *md,
+                           struct lov_stripe_md *lsm,
                            obd_off start, obd_off end)
 {
         int error;
@@ -743,26 +936,26 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn,
                               struct brw_page *pga, brw_callback_t callback,
                               struct io_cb_data *data)
 {
+        struct obd_export       *export = class_conn2export(conn);
         struct obd_run_ctxt      saved;
         struct super_block      *sb;
         int                      pnum;          /* index to pages (bufs) */
         unsigned long            retval;
         int                      error;
         struct file             *file;
-        struct obd_device       *obd = class_conn2obd(conn);
         int pg;
         ENTRY;
 
-        if (!obd) {
+        if (!export) {
                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
                 RETURN(-EINVAL);
         }
 
-        sb = obd->u.filter.fo_sb;
-        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        sb = export->exp_obd->u.filter.fo_sb;
+        push_ctxt(&saved, &export->exp_obd->u.filter.fo_ctxt, NULL);
         pnum = 0; /* pnum indexes buf 0..num_pages */
 
-        file = filter_obj_open(obd, lsm->lsm_object_id, S_IFREG);
+        file = filter_obj_open(export, lsm->lsm_object_id, S_IFREG);
         if (IS_ERR(file))
                 GOTO(out, retval = PTR_ERR(file));
 
@@ -1357,10 +1550,12 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn,
         ENTRY;
 
         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        lock_kernel();
         journal_save = current->journal_info;
         LASSERT(!journal_save);
 
         current->journal_info = private;
+        unlock_kernel();
         for (i = 0, o = obj, r = res; i < objcount; i++, o++) {
                 int j;
                 for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) {
@@ -1386,7 +1581,9 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn,
                         f_dput(r->dentry);
                 }
         }
+        lock_kernel();
         current->journal_info = journal_save;
+        unlock_kernel();
 
         if (!found_locked)
                 goto out_ctxt;
@@ -1579,12 +1776,30 @@ static struct obd_ops filter_obd_ops = {
 static int __init obdfilter_init(void)
 {
         printk(KERN_INFO "Filtering OBD driver  v0.001, info@clusterfs.com\n");
+        filter_open_cache = kmem_cache_create("ll_filter_fdata",
+                                              sizeof(struct filter_file_data),
+                                              0, 0, NULL, NULL);
+        if (!filter_open_cache)
+                RETURN(-ENOMEM);
+
+        filter_dentry_cache = kmem_cache_create("ll_filter_dentry",
+                                        sizeof(struct filter_dentry_data),
+                                        0, 0, NULL, NULL);
+        if (!filter_dentry_cache) {
+                kmem_cache_destroy(filter_open_cache);
+                RETURN(-ENOMEM);
+        }
+
         return class_register_type(&filter_obd_ops, OBD_FILTER_DEVICENAME);
 }
 
 static void __exit obdfilter_exit(void)
 {
         class_unregister_type(OBD_FILTER_DEVICENAME);
+        if (kmem_cache_destroy(filter_dentry_cache))
+                CERROR("couldn't free obdfilter dentry cache\n");
+        if (kmem_cache_destroy(filter_open_cache))
+                CERROR("couldn't free obdfilter open cache\n");
 }
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");