Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index e2b1d30..9a2743e 100644 (file)
@@ -47,6 +47,7 @@
 #endif
 
 #include <linux/obd_class.h>
+#include <linux/obd_lov.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/lprocfs_status.h>
 
 #include "filter_internal.h"
 
-#define S_SHIFT 12
-static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
-        [0]                     NULL,
-        [S_IFREG >> S_SHIFT]    "R",
-        [S_IFDIR >> S_SHIFT]    "D",
-        [S_IFCHR >> S_SHIFT]    "C",
-        [S_IFBLK >> S_SHIFT]    "B",
-        [S_IFIFO >> S_SHIFT]    "F",
-        [S_IFSOCK >> S_SHIFT]   "S",
-        [S_IFLNK >> S_SHIFT]    "L"
-};
-
-static inline const char *obd_mode_to_type(int mode)
-{
-        return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
-}
-
-static void filter_ffd_addref(void *ffdp)
-{
-        struct filter_file_data *ffd = ffdp;
-
-        atomic_inc(&ffd->ffd_refcount);
-        CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
-               atomic_read(&ffd->ffd_refcount));
-}
-
-static struct filter_file_data *filter_ffd_new(void)
-{
-        struct filter_file_data *ffd;
-
-        OBD_ALLOC(ffd, sizeof *ffd);
-        if (ffd == NULL) {
-                CERROR("out of memory\n");
-                return NULL;
-        }
-
-        atomic_set(&ffd->ffd_refcount, 2);
-
-        INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
-        class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
-
-        return ffd;
-}
+static struct lvfs_callback_ops filter_lvfs_ops;
 
-static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
-{
-        struct filter_file_data *ffd = NULL;
-        ENTRY;
-        LASSERT(handle != NULL);
-        ffd = class_handle2object(handle->cookie);
-        if (ffd != NULL)
-                LASSERT(ffd->ffd_file->private_data == ffd);
-        RETURN(ffd);
-}
-
-static void filter_ffd_put(struct filter_file_data *ffd)
-{
-        CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
-               atomic_read(&ffd->ffd_refcount) - 1);
-        LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
-                atomic_read(&ffd->ffd_refcount) < 0x5a5a);
-        if (atomic_dec_and_test(&ffd->ffd_refcount)) {
-                LASSERT(list_empty(&ffd->ffd_handle.h_link));
-                OBD_FREE(ffd, sizeof *ffd);
-        }
-}
-
-static void filter_ffd_destroy(struct filter_file_data *ffd)
-{
-        class_handle_unhash(&ffd->ffd_handle);
-        filter_ffd_put(ffd);
-}
+static int filter_destroy(struct obd_export *exp, struct obdo *oa,
+                          struct lov_stripe_md *ea, struct obd_trans_info *);
 
 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                              void *cb_data, int error)
@@ -134,64 +67,6 @@ static void filter_commit_cb(struct obd_device *obd, __u64 transno,
         obd_transno_commit_cb(obd, transno, error);
 }
 
-static int filter_client_log_cancel(struct lustre_handle *conn,
-                                    struct lov_stripe_md *lsm, int count,
-                                    struct llog_cookie *cookies, int flags)
-{
-        struct obd_device *obd = class_conn2obd(conn);
-        struct llog_commit_data *llcd;
-        struct filter_obd *filter = &obd->u.filter;
-        int rc = 0;
-        ENTRY;
-
-        if (count == 0 || cookies == NULL) {
-                down(&filter->fo_sem);
-                if (filter->fo_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
-                        GOTO(out, rc);
-
-                llcd = filter->fo_llcd;
-                GOTO(send_now, rc);
-        }
-
-        down(&filter->fo_sem);
-        llcd = filter->fo_llcd;
-        if (llcd == NULL) {
-                llcd = llcd_grab();
-                if (llcd == NULL) {
-                        CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n",
-                               cookies->lgc_lgl.lgl_oid,
-                               cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
-                        GOTO(out, rc = -ENOMEM);
-                }
-                llcd->llcd_import = filter->fo_mdc_imp;
-                filter->fo_llcd = llcd;
-        }
-
-        memcpy(llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
-               sizeof(*cookies));
-        llcd->llcd_cookiebytes += sizeof(*cookies);
-
-        GOTO(send_now, rc);
-send_now:
-        if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
-             flags & OBD_LLOG_FL_SENDNOW)) {
-                filter->fo_llcd = NULL;
-                llcd_send(llcd);
-        }
-out:
-        up(&filter->fo_sem);
-
-        return rc;
-}
-
-/* When this (destroy) operation is committed, return the cancel cookie */
-static void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
-                                     void *cb_data, int error)
-{
-        filter_client_log_cancel(&obd->u.filter.fo_mdc_conn, NULL, 1,
-                                 cb_data, OBD_LLOG_FL_SENDNOW);
-        OBD_FREE(cb_data, sizeof(struct llog_cookie));
-}
 
 /* Assumes caller has already pushed us into the kernel context. */
 int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
@@ -202,48 +77,51 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
         struct filter_client_data *fcd = fed->fed_fcd;
         __u64 last_rcvd;
         loff_t off;
-        ssize_t written;
+        int err, log_pri = D_HA;
 
         /* Propagate error code. */
         if (rc)
                 RETURN(rc);
 
-        if (!exp->exp_obd->obd_replayable)
+        if (!exp->exp_obd->obd_replayable || oti == NULL)
                 RETURN(rc);
 
         /* we don't allocate new transnos for replayed requests */
-        if (oti != NULL && oti->oti_transno == 0) {
+        if (oti->oti_transno == 0) {
                 spin_lock(&filter->fo_translock);
                 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1;
                 filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
                 spin_unlock(&filter->fo_translock);
                 oti->oti_transno = last_rcvd;
-                fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
-                fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
-
-                /* could get xid from oti, if it's ever needed */
-                fcd->fcd_last_xid = 0;
-
-                off = fed->fed_lr_off;
-                fsfilt_set_last_rcvd(exp->exp_obd, last_rcvd, oti->oti_handle,
-                                     filter_commit_cb, NULL);
-                written = fsfilt_write_record(exp->exp_obd,
-                                              filter->fo_rcvd_filp, fcd,
-                                              sizeof(*fcd), &off);
-                CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
-                       "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid,
-                       fed->fed_lr_idx, written);
-
-                if (written == sizeof(*fcd))
-                        RETURN(0);
-                CERROR("error writing to %s: rc = %d\n", LAST_RCVD,
-                       (int)written);
-                if (written >= 0)
-                        RETURN(-ENOSPC);
-                RETURN(written);
+        } else { 
+                spin_lock(&filter->fo_translock);
+                last_rcvd = oti->oti_transno;
+                if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
+                        filter->fo_fsd->fsd_last_transno =
+                                cpu_to_le64(last_rcvd);
+                spin_unlock(&filter->fo_translock);
         }
+        fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
+        fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
 
-        RETURN(0);
+        /* could get xid from oti, if it's ever needed */
+        fcd->fcd_last_xid = 0;
+
+        off = fed->fed_lr_off;
+        fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle,
+                              filter_commit_cb, NULL);
+        err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd,
+                                  sizeof(*fcd), &off, 0);
+        if (err) {
+                log_pri = D_ERROR;
+                if (rc == 0)
+                        rc = err;
+        }
+
+        CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n",
+               last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err);
+
+        RETURN(rc);
 }
 
 void f_dput(struct dentry *dentry)
@@ -256,17 +134,6 @@ void f_dput(struct dentry *dentry)
         dput(dentry);
 }
 
-/* Not racy w.r.t. others, because we are the only user of this dentry */
-static void filter_drelease(struct dentry *dentry)
-{
-        if (dentry->d_fsdata)
-                OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
-}
-
-struct dentry_operations filter_dops = {
-        d_release: filter_drelease,
-};
-
 /* Add client data to the FILTER.  We use a bitmap to locate a free space
  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
  * Otherwise, we have just read the data from the last_rcvd file and
@@ -281,7 +148,7 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
         LASSERT(bitmap != NULL);
 
         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
-        if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
+        if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid))
                 RETURN(0);
 
         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
@@ -320,38 +187,35 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
         if (new_client) {
                 struct obd_run_ctxt saved;
                 loff_t off = fed->fed_lr_off;
-                int written;
+                int err;
                 void *handle;
 
                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
 
-                push_ctxt(&saved, &filter->fo_ctxt, NULL);
+                push_ctxt(&saved, &obd->obd_ctxt, NULL);
                 /* Transaction needed to fix bug 1403 */
                 handle = fsfilt_start(obd,
                                       filter->fo_rcvd_filp->f_dentry->d_inode,
                                       FSFILT_OP_SETATTR, NULL);
                 if (IS_ERR(handle)) {
-                        written = PTR_ERR(handle);
-                        CERROR("unable to start transaction: rc %d\n",
-                               (int)written);
+                        err = PTR_ERR(handle);
+                        CERROR("unable to start transaction: rc %d\n", err);
                 } else {
-                        written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
-                                                      fed->fed_fcd,
-                                                      sizeof(*fed->fed_fcd),
-                                                      &off);
+                        err = fsfilt_write_record(obd, filter->fo_rcvd_filp,
+                                                  fed->fed_fcd,
+                                                  sizeof(*fed->fed_fcd),
+                                                  &off, 1);
                         fsfilt_commit(obd,
                                       filter->fo_rcvd_filp->f_dentry->d_inode,
-                                      handle, 0);
+                                      handle, 1);
                 }
-                pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
 
-                if (written != sizeof(*fed->fed_fcd)) {
+                if (err) {
                         CERROR("error writing %s client idx %u: rc %d\n",
-                               LAST_RCVD, fed->fed_lr_idx, written);
-                        if (written < 0)
-                                RETURN(written);
-                        RETURN(-ENOSPC);
+                               LAST_RCVD, fed->fed_lr_idx, err);
+                        RETURN(err);
                 }
         }
         RETURN(0);
@@ -364,7 +228,7 @@ static int filter_client_free(struct obd_export *exp, int flags)
         struct obd_device *obd = exp->exp_obd;
         struct filter_client_data zero_fcd;
         struct obd_run_ctxt saved;
-        int written;
+        int rc;
         loff_t off;
         ENTRY;
 
@@ -375,7 +239,7 @@ static int filter_client_free(struct obd_export *exp, int flags)
                 GOTO(free, 0);
 
         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
-        if (strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID") == 0)
+        if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0)
                 GOTO(free, 0);
 
         LASSERT(filter->fo_last_rcvd_slots != NULL);
@@ -392,25 +256,15 @@ static int filter_client_free(struct obd_export *exp, int flags)
         }
 
         memset(&zero_fcd, 0, sizeof zero_fcd);
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
-        written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
-                                      &zero_fcd, sizeof(zero_fcd), &off);
-
-        /* XXX: this write gets lost sometimes, unless this sync is here. */
-        if (written > 0)
-                file_fsync(filter->fo_rcvd_filp,
-                           filter->fo_rcvd_filp->f_dentry, 1);
-        pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-
-        if (written != sizeof(zero_fcd)) {
-                CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
-                       fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
-                       LAST_RCVD, written);
-        } else {
-                CDEBUG(D_INFO,
-                       "zeroed disconnecting client %s at idx %u (%llu)\n",
-                       fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
-        }
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
+                                 sizeof(zero_fcd), &off, 1);
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+
+        CDEBUG(rc == 0 ? D_INFO : D_ERROR,
+               "zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n",
+               fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
+               LAST_RCVD, rc);
 
 free:
         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
@@ -429,41 +283,56 @@ static int filter_free_server_data(struct filter_obd *filter)
 }
 
 /* assumes caller is already in kernel ctxt */
-int filter_update_server_data(struct obd_device *obd,
-                              struct file *filp, struct filter_server_data *fsd)
+int filter_update_server_data(struct obd_device *obd, struct file *filp,
+                              struct filter_server_data *fsd, int force_sync)
 {
         loff_t off = 0;
         int rc;
         ENTRY;
 
         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
-        CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
-               le64_to_cpu(fsd->fsd_last_objid));
         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
                le64_to_cpu(fsd->fsd_last_transno));
         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
                le64_to_cpu(fsd->fsd_mount_count));
 
-        rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off);
-        if (rc == sizeof(*fsd))
-                RETURN(0);
+        rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off,force_sync);
+        if (rc)
+                CERROR("error writing filter_server_data: rc = %d\n", rc);
+
+        RETURN(rc);
+}
+
+int filter_update_last_objid(struct obd_device *obd, obd_gr group,
+                             int force_sync)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        __u64 tmp;
+        loff_t off = 0;
+        int rc;
+        ENTRY;
+
+        CDEBUG(D_INODE, "server last_objid for group "LPU64": "LPU64"\n",
+               group, filter->fo_last_objids[group]);
 
-        CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n", rc);
-        if (rc >= 0)
-                RETURN(-ENOSPC);
+        tmp = cpu_to_le64(filter->fo_last_objids[group]);
+        rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
+                                 &tmp, sizeof(tmp), &off, force_sync);
+        if (rc)
+                CERROR("error writing group "LPU64" last objid: rc = %d\n",
+                       group, rc);
         RETURN(rc);
 }
 
 /* assumes caller has already in kernel ctxt */
-static int filter_init_server_data(struct obd_device *obd, struct file * filp,
-                                   __u64 init_lastobjid)
+static int filter_init_server_data(struct obd_device *obd, struct file * filp)
 {
         struct filter_obd *filter = &obd->u.filter;
         struct filter_server_data *fsd;
         struct filter_client_data *fcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = inode->i_size;
-        __u64 mount_count = 0;
+        __u64 mount_count;
         int cl_idx;
         loff_t off = 0;
         int rc;
@@ -490,7 +359,6 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
 
                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
-                fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
                 fsd->fsd_last_transno = 0;
                 mount_count = fsd->fsd_mount_count = 0;
                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
@@ -499,34 +367,35 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
         } else {
-                int retval = fsfilt_read_record(obd, filp, fsd,
-                                                sizeof(*fsd), &off);
-                if (retval != sizeof(*fsd)) {
+                rc = fsfilt_read_record(obd, filp, fsd, sizeof(*fsd), &off);
+                if (rc) {
                         CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
-                               LAST_RCVD, retval);
-                        GOTO(err_fsd, rc = -EIO);
+                               LAST_RCVD, rc);
+                        GOTO(err_fsd, rc);
+                }
+                if (strcmp(fsd->fsd_uuid, obd->obd_uuid.uuid) != 0) {
+                        CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
+                               obd->obd_uuid.uuid, fsd->fsd_uuid);
+                        GOTO(err_fsd, rc = -EINVAL);
                 }
                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
-                fsd->fsd_last_objid =
-                        cpu_to_le64(le64_to_cpu(fsd->fsd_last_objid) +
-                                    FILTER_SKIP_OBJID);
         }
 
-        if (fsd->fsd_feature_incompat) {
+        if (fsd->fsd_feature_incompat & ~le32_to_cpu(FILTER_INCOMPAT_SUPP)) {
                 CERROR("unsupported feature %x\n",
-                       le32_to_cpu(fsd->fsd_feature_incompat));
+                       le32_to_cpu(fsd->fsd_feature_incompat) &
+                       ~FILTER_INCOMPAT_SUPP);
                 GOTO(err_fsd, rc = -EINVAL);
         }
-        if (fsd->fsd_feature_rocompat) {
+        if (fsd->fsd_feature_rocompat & ~le32_to_cpu(FILTER_ROCOMPAT_SUPP)) {
                 CERROR("read-only feature %x\n",
-                       le32_to_cpu(fsd->fsd_feature_rocompat));
+                       le32_to_cpu(fsd->fsd_feature_rocompat) &
+                       ~FILTER_ROCOMPAT_SUPP);
                 /* Do something like remount filesystem read-only */
                 GOTO(err_fsd, rc = -EINVAL);
         }
 
-        CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
-               obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
                obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
@@ -539,34 +408,36 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
+        CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
+               last_rcvd_size <= FILTER_LR_CLIENT_START ? 0 :
+               (last_rcvd_size-FILTER_LR_CLIENT_START) /FILTER_LR_CLIENT_SIZE);
 
         if (!obd->obd_replayable) {
                 CWARN("%s: recovery support OFF\n", obd->obd_name);
                 GOTO(out, rc = 0);
         }
 
-        for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
+        for (cl_idx = 0, off = le32_to_cpu(fsd->fsd_client_start);
+             off < last_rcvd_size; cl_idx++) {
                 __u64 last_rcvd;
                 int mount_age;
 
                 if (!fcd) {
                         OBD_ALLOC(fcd, sizeof(*fcd));
                         if (!fcd)
-                                GOTO(err_fsd, rc = -ENOMEM);
+                                GOTO(err_client, rc = -ENOMEM);
                 }
 
-                /* Don't assume off is incremented properly, in case
-                 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
-                 */
+                /* Don't assume off is incremented properly by
+                 * fsfilt_read_record(), in case sizeof(*fcd)
+                 * isn't the same as fsd->fsd_client_size.  */
                 off = le32_to_cpu(fsd->fsd_client_start) +
                         cl_idx * le16_to_cpu(fsd->fsd_client_size);
                 rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
-                if (rc != sizeof(*fcd)) {
-                        CERROR("error reading FILTER %s offset %d: rc = %d\n",
-                               LAST_RCVD, cl_idx, rc);
-                        if (rc > 0) /* XXX fatal error or just abort reading? */
-                                rc = -EIO;
-                        break;
+                if (rc) {
+                        CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
+                               LAST_RCVD, cl_idx, off, rc);
+                        break; /* read error shouldn't cause startup to fail */
                 }
 
                 if (fcd->fcd_uuid[0] == '\0') {
@@ -589,27 +460,23 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
                                LPU64"\n", fcd->fcd_uuid, cl_idx,
                                last_rcvd, le64_to_cpu(fsd->fsd_last_transno),
                                le64_to_cpu(fcd->fcd_mount_count), mount_count);
-                        if (exp == NULL) {
-                                /* XXX this rc is ignored  */
-                                rc = -ENOMEM;
-                                break;
-                        }
+                        if (exp == NULL)
+                                GOTO(err_client, rc = -ENOMEM);
+
                         memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
                                sizeof exp->exp_client_uuid.uuid);
                         fed = &exp->exp_filter_data;
                         fed->fed_fcd = fcd;
                         filter_client_add(obd, filter, fed, cl_idx);
                         /* create helper if export init gets more complex */
-                        INIT_LIST_HEAD(&fed->fed_open_head);
                         spin_lock_init(&fed->fed_lock);
 
                         fcd = NULL;
                         obd->obd_recoverable_clients++;
                         class_export_put(exp);
                 } else {
-                        CDEBUG(D_INFO,
-                               "discarded client %d UUID '%s' count "LPU64"\n",
-                               cl_idx, fcd->fcd_uuid,
+                        CDEBUG(D_INFO, "discarded client %d UUID '%s' count "
+                               LPU64"\n", cl_idx, fcd->fcd_uuid,
                                le64_to_cpu(fcd->fcd_mount_count));
                 }
 
@@ -619,18 +486,16 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
                         filter->fo_fsd->fsd_last_transno=cpu_to_le64(last_rcvd);
 
-                obd->obd_last_committed =
-                        le64_to_cpu(filter->fo_fsd->fsd_last_transno);
+        }
 
-                if (obd->obd_recoverable_clients) {
-                        CERROR("RECOVERY: %d recoverable clients, last_rcvd "
-                               LPU64"\n", obd->obd_recoverable_clients,
-                               le64_to_cpu(filter->fo_fsd->fsd_last_transno));
-                        obd->obd_next_recovery_transno =
-                                obd->obd_last_committed + 1;
-                        obd->obd_recovering = 1;
-                }
+        obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_transno);
 
+        if (obd->obd_recoverable_clients) {
+                CERROR("RECOVERY: %d recoverable clients, last_rcvd "
+                       LPU64"\n", obd->obd_recoverable_clients,
+                       le64_to_cpu(filter->fo_fsd->fsd_last_transno));
+                obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+                obd->obd_recovering = 1;
         }
 
         if (fcd)
@@ -640,65 +505,245 @@ out:
         fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
 
         /* save it, so mount count and last_transno is current */
-        rc = filter_update_server_data(obd, filp, filter->fo_fsd);
+        rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
 
         RETURN(rc);
 
+err_client:
+        class_disconnect_exports(obd, 0);
 err_fsd:
         filter_free_server_data(filter);
         RETURN(rc);
 }
 
-/* setup the object store with correct subdirectories */
-static int filter_prep(struct obd_device *obd)
+static int filter_cleanup_groups(struct obd_device *obd)
 {
-        struct obd_run_ctxt saved;
         struct filter_obd *filter = &obd->u.filter;
-        struct dentry *dentry, *O_dentry;
-        struct file *file;
-        struct inode *inode;
         int i;
-        int rc = 0;
-        int mode = 0;
+        ENTRY;
+
+        if (filter->fo_subdir_count) {
+                for (i = 0; i < filter->fo_subdir_count; i++) {
+                        struct dentry *dentry = filter->fo_dentry_O_sub[i];
+                        f_dput(dentry);
+                        filter->fo_dentry_O_sub[i] = NULL;
+                }
+                OBD_FREE(filter->fo_dentry_O_sub,
+                         filter->fo_subdir_count *
+                         sizeof(*filter->fo_dentry_O_sub));
+        }
+        if (filter->fo_dentry_O_groups != NULL &&
+            filter->fo_last_objids != NULL &&
+            filter->fo_last_objid_files != NULL) {
+                for (i = 0; i < FILTER_GROUPS; i++) {
+                        struct dentry *dentry = filter->fo_dentry_O_groups[i];
+                        struct file *filp = filter->fo_last_objid_files[i];
+                        if (dentry != NULL) {
+                                f_dput(dentry);
+                                filter->fo_dentry_O_groups[i] = NULL;
+                        }
+                        if (filp != NULL) {
+                                filp_close(filp, 0);
+                                filter->fo_last_objid_files[i] = NULL;
+                        }
+                }
+        }
+        if (filter->fo_dentry_O_groups != NULL)
+                OBD_FREE(filter->fo_dentry_O_groups,
+                         FILTER_GROUPS * sizeof(struct dentry *));
+        if (filter->fo_last_objids != NULL)
+                OBD_FREE(filter->fo_last_objids,
+                         FILTER_GROUPS * sizeof(__u64));
+        if (filter->fo_last_objid_files != NULL)
+                OBD_FREE(filter->fo_last_objid_files,
+                         FILTER_GROUPS * sizeof(struct file *));
+        RETURN(0);
+}
+
+/* FIXME: object groups */
+static int filter_prep_groups(struct obd_device *obd)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        struct dentry *dentry, *O_dentry;
+        struct file *filp;
+        int i, rc = 0, cleanup_phase = 0;
+        ENTRY;
 
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
-        dentry = simple_mkdir(current->fs->pwd, "O", 0700);
-        CDEBUG(D_INODE, "got/created O: %p\n", dentry);
-        if (IS_ERR(dentry)) {
-                rc = PTR_ERR(dentry);
+        O_dentry = simple_mkdir(current->fs->pwd, "O", 0700);
+        CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
+        if (IS_ERR(O_dentry)) {
+                rc = PTR_ERR(O_dentry);
                 CERROR("cannot open/create O: rc = %d\n", rc);
-                GOTO(out, rc);
+                GOTO(cleanup, rc);
         }
-        filter->fo_dentry_O = dentry;
+        filter->fo_dentry_O = O_dentry;
+        cleanup_phase = 1; /* O_dentry */
 
-        /*
-         * Create directories and/or get dentries for each object type.
-         * This saves us from having to do multiple lookups for each one.
-         */
-        O_dentry = filter->fo_dentry_O;
-        for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
-                char *name = obd_type_by_mode[mode];
+        /* Lookup "R" to tell if we're on an old OST FS and need to convert
+         * from O/R/<dir>/<objid> to O/0/<dir>/<objid>.  This can be removed
+         * some time post 1.0 when all old-style OSTs have converted along
+         * with the init_objid hack. */
+        dentry = ll_lookup_one_len("R", O_dentry, 1);
+        if (IS_ERR(dentry))
+                GOTO(cleanup, rc = PTR_ERR(dentry));
+        if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
+                struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1);
+                ENTRY;
+
+                CWARN("converting OST to new object layout\n");
+                if (IS_ERR(O0_dentry)) {
+                        rc = PTR_ERR(O0_dentry);
+                        CERROR("error looking up O/0: rc %d\n", rc);
+                        GOTO(cleanup_R, rc);
+                }
 
-                if (!name) {
-                        filter->fo_dentry_O_mode[mode] = NULL;
-                        continue;
+                if (O0_dentry->d_inode) {
+                        CERROR("Both O/R and O/0 exist. Fix manually.\n");
+                        GOTO(cleanup_O0, rc = -EEXIST);
+                }
+
+                down(&O_dentry->d_inode->i_sem);
+                rc = vfs_rename(O_dentry->d_inode, dentry,
+                                O_dentry->d_inode, O0_dentry);
+                up(&O_dentry->d_inode->i_sem);
+
+                if (rc) {
+                        CERROR("error renaming O/R to O/0: rc %d\n", rc);
+                        GOTO(cleanup_O0, rc);
                 }
+                filter->fo_fsd->fsd_feature_incompat |=
+                        cpu_to_le32(FILTER_INCOMPAT_GROUPS);
+                rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
+                                               filter->fo_fsd, 1);
+                GOTO(cleanup_O0, rc);
+
+        cleanup_O0:
+                dput(O0_dentry);
+        cleanup_R:
+                dput(dentry);
+                if (rc)
+                        GOTO(cleanup, rc);
+        } else {
+                dput(dentry);
+        }
+
+        OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64));
+        if (filter->fo_last_objids == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+        cleanup_phase = 2; /* groups */
+
+        OBD_ALLOC(filter->fo_dentry_O_groups, FILTER_GROUPS * sizeof(dentry));
+        if (filter->fo_dentry_O_groups == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+        OBD_ALLOC(filter->fo_last_objid_files, FILTER_GROUPS * sizeof(filp));
+        if (filter->fo_last_objid_files == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+
+        for (i = 0; i < FILTER_GROUPS; i++) {
+                char name[25];
+                loff_t off = 0;
+
+                sprintf(name, "%d", i);
                 dentry = simple_mkdir(O_dentry, name, 0700);
                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
                 if (IS_ERR(dentry)) {
                         rc = PTR_ERR(dentry);
                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
-                        GOTO(err_O_mode, rc);
+                        GOTO(cleanup, rc);
+                }
+                filter->fo_dentry_O_groups[i] = dentry;
+
+                sprintf(name, "O/%d/LAST_ID", i);
+                filp = filp_open(name, O_CREAT | O_RDWR, 0700);
+                if (IS_ERR(dentry)) {
+                        rc = PTR_ERR(dentry);
+                        CERROR("cannot create %s: rc = %d\n", name, rc);
+                        GOTO(cleanup, rc);
+                }
+                filter->fo_last_objid_files[i] = filp;
+
+                if (filp->f_dentry->d_inode->i_size == 0) {
+                        if (i == 0 && filter->fo_fsd->fsd_unused != 0) {
+                                /* OST conversion, remove sometime post 1.0 */
+                                filter->fo_last_objids[i] =
+                                        le64_to_cpu(filter->fo_fsd->fsd_unused);
+                                CWARN("saving old objid "LPU64" to LAST_ID\n",
+                                      filter->fo_last_objids[i]);
+                                rc = filter_update_last_objid(obd, 0, 1);
+                                if (rc)
+                                        GOTO(cleanup, rc);
+                        } else {
+                                filter->fo_last_objids[i] = FILTER_INIT_OBJID;
+                        }
+                        continue;
+                }
+
+                rc = fsfilt_read_record(obd, filp, &filter->fo_last_objids[i],
+                                        sizeof(__u64), &off);
+                if (rc) {
+                        CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
+                               name, rc);
+                        GOTO(cleanup, rc);
+                }
+                filter->fo_last_objids[i] =
+                        le64_to_cpu(filter->fo_last_objids[i]);
+                CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
+                       obd->obd_name, i, filter->fo_last_objids[i]);
+        }
+
+        if (filter->fo_subdir_count) {
+                O_dentry = filter->fo_dentry_O_groups[0];
+                OBD_ALLOC(filter->fo_dentry_O_sub,
+                          filter->fo_subdir_count * sizeof(dentry));
+                if (filter->fo_dentry_O_sub == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
+
+                for (i = 0; i < filter->fo_subdir_count; i++) {
+                        char dir[20];
+                        snprintf(dir, sizeof(dir), "d%u", i);
+
+                        dentry = simple_mkdir(O_dentry, dir, 0700);
+                        CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
+                        if (IS_ERR(dentry)) {
+                                rc = PTR_ERR(dentry);
+                                CERROR("can't create O/0/%s: rc = %d\n",dir,rc);
+                                GOTO(cleanup, rc);
+                        }
+                        filter->fo_dentry_O_sub[i] = dentry;
                 }
-                filter->fo_dentry_O_mode[mode] = dentry;
         }
+        RETURN(0);
+
+ cleanup:
+        switch (cleanup_phase) {
+        case 2:
+                filter_cleanup_groups(obd);
+        case 1:
+                f_dput(filter->fo_dentry_O);
+                filter->fo_dentry_O = NULL;
+        default:
+                break;
+        }
+        return rc;
+}
+
+/* setup the object store with correct subdirectories */
+static int filter_prep(struct obd_device *obd)
+{
+        struct obd_run_ctxt saved;
+        struct filter_obd *filter = &obd->u.filter;
+        struct file *file;
+        struct inode *inode;
+        int rc = 0;
+        ENTRY;
 
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
         if (!file || IS_ERR(file)) {
                 rc = PTR_ERR(file);
                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
                        LAST_RCVD, rc);
-                GOTO(err_O_mode, rc);
+                GOTO(out, rc);
         }
 
         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
@@ -707,84 +752,35 @@ static int filter_prep(struct obd_device *obd)
                 GOTO(err_filp, rc = -ENOENT);
         }
 
-        rc = fsfilt_journal_data(obd, file);
-        if (rc) {
-                CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
-                GOTO(err_filp, rc);
-        }
         /* steal operations */
         inode = file->f_dentry->d_inode;
         filter->fo_fop = file->f_op;
         filter->fo_iop = inode->i_op;
         filter->fo_aops = inode->i_mapping->a_ops;
-#ifdef I_SKIP_PDFLUSH
-        /*
-         * we need this to protect from deadlock
-         * pdflush vs. lustre_fwrite()
-         */
-        inode->i_flags |= I_SKIP_PDFLUSH;
-#endif
 
-        rc = filter_init_server_data(obd, file, FILTER_INIT_OBJID);
+        rc = filter_init_server_data(obd, file);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
-                GOTO(err_client, rc);
+                GOTO(err_filp, rc);
         }
         filter->fo_rcvd_filp = file;
 
-        if (filter->fo_subdir_count) {
-                O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
-                OBD_ALLOC(filter->fo_dentry_O_sub,
-                          filter->fo_subdir_count * sizeof(dentry));
-                if (!filter->fo_dentry_O_sub)
-                        GOTO(err_client, rc = -ENOMEM);
-
-                for (i = 0; i < filter->fo_subdir_count; i++) {
-                        char dir[20];
-                        snprintf(dir, sizeof(dir), "d%u", i);
+        rc = filter_prep_groups(obd);
+        if (rc)
+                GOTO(err_server_data, rc);
 
-                        dentry = simple_mkdir(O_dentry, dir, 0700);
-                        CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
-                        if (IS_ERR(dentry)) {
-                                rc = PTR_ERR(dentry);
-                                CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
-                                GOTO(err_O_sub, rc);
-                        }
-                        filter->fo_dentry_O_sub[i] = dentry;
-                }
-        }
-        rc = 0;
  out:
-        pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
 
         return(rc);
 
-err_O_sub:
-        while (i-- > 0) {
-                struct dentry *dentry = filter->fo_dentry_O_sub[i];
-                if (dentry) {
-                        f_dput(dentry);
-                        filter->fo_dentry_O_sub[i] = NULL;
-                }
-        }
-        OBD_FREE(filter->fo_dentry_O_sub,
-                 filter->fo_subdir_count * sizeof(dentry));
-err_client:
-        class_disconnect_exports(obd, 0);
-err_filp:
+ err_server_data:
+        //class_disconnect_exports(obd, 0);
+        filter_free_server_data(filter);
+ err_filp:
         if (filp_close(file, 0))
                 CERROR("can't close %s after error\n", LAST_RCVD);
         filter->fo_rcvd_filp = NULL;
-err_O_mode:
-        while (mode-- > 0) {
-                struct dentry *dentry = filter->fo_dentry_O_mode[mode];
-                if (dentry) {
-                        f_dput(dentry);
-                        filter->fo_dentry_O_mode[mode] = NULL;
-                }
-        }
-        f_dput(filter->fo_dentry_O);
-        filter->fo_dentry_O = NULL;
         goto out;
 }
 
@@ -793,60 +789,66 @@ static void filter_post(struct obd_device *obd)
 {
         struct obd_run_ctxt saved;
         struct filter_obd *filter = &obd->u.filter;
-        long rc;
-        int mode;
+        int rc, i;
 
         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
          * best to start a transaction with h_sync, because we removed this
          * from lastobjid */
 
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
         rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
-                                       filter->fo_fsd);
+                                       filter->fo_fsd, 0);
         if (rc)
-                CERROR("error writing lastobjid: rc = %ld\n", rc);
+                CERROR("error writing server data: rc = %d\n", rc);
 
-
-        if (filter->fo_rcvd_filp) {
-                rc = file_fsync(filter->fo_rcvd_filp,
-                                filter->fo_rcvd_filp->f_dentry, 1);
-                filp_close(filter->fo_rcvd_filp, 0);
-                filter->fo_rcvd_filp = NULL;
+        for (i = 0; i < FILTER_GROUPS; i++) {
+                rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1));
                 if (rc)
-                        CERROR("error closing %s: rc = %ld\n", LAST_RCVD, rc);
+                        CERROR("error writing group %d lastobjid: rc = %d\n",
+                               i, rc);
         }
 
-        if (filter->fo_subdir_count) {
-                int i;
-                for (i = 0; i < filter->fo_subdir_count; i++) {
-                        struct dentry *dentry = filter->fo_dentry_O_sub[i];
-                        f_dput(dentry);
-                        filter->fo_dentry_O_sub[i] = NULL;
-                }
-                OBD_FREE(filter->fo_dentry_O_sub,
-                         filter->fo_subdir_count *
-                         sizeof(*filter->fo_dentry_O_sub));
-        }
-        for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
-                struct dentry *dentry = filter->fo_dentry_O_mode[mode];
-                if (dentry) {
-                        f_dput(dentry);
-                        filter->fo_dentry_O_mode[mode] = NULL;
-                }
-        }
+        filp_close(filter->fo_rcvd_filp, 0);
+        filter->fo_rcvd_filp = NULL;
+        if (rc)
+                CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
+
+        filter_cleanup_groups(obd);
         f_dput(filter->fo_dentry_O);
         filter_free_server_data(filter);
-        pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+}
+
+static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa,
+                               obd_id id)
+{
+        obd_gr group = 0;
+        LASSERT(filter->fo_fsd != NULL);
+
+        if (oa != NULL) {
+                LASSERT(oa->o_gr <= FILTER_GROUPS);
+                group = oa->o_gr;
+        }
+
+        spin_lock(&filter->fo_objidlock);
+        filter->fo_last_objids[group] = id;
+        spin_unlock(&filter->fo_objidlock);
 }
 
-__u64 filter_next_id(struct filter_obd *filter)
+__u64 filter_last_id(struct filter_obd *filter, struct obdo *oa)
 {
         obd_id id;
+        obd_gr group = 0;
         LASSERT(filter->fo_fsd != NULL);
 
+        if (oa != NULL) {
+                LASSERT(oa->o_gr <= FILTER_GROUPS);
+                group = oa->o_gr;
+        }
+
+        /* FIXME: object groups */
         spin_lock(&filter->fo_objidlock);
-        id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
-        filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
+        id = filter->fo_last_objids[group];
         spin_unlock(&filter->fo_objidlock);
 
         return id;
@@ -868,7 +870,7 @@ static int filter_blocking_ast(struct ldlm_lock *lock,
         /* XXX layering violation!  -phil */
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
-         * such that mds_blocking_ast is called just before l_i_p takes the
+         * such that filter_blocking_ast is called just before l_i_p takes the
          * ns_lock, then by the time we get the lock, we might not be the
          * correct blocking function anymore.  So check, and return early, if
          * so. */
@@ -911,7 +913,7 @@ static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
                               &flags, ldlm_completion_ast,
                               filter_blocking_ast, NULL, lockh);
 
-        RETURN(rc == ELDLM_OK ? 0 : -ENOLCK);  /* XXX translate ldlm code */
+        RETURN(rc == ELDLM_OK ? 0 : -EIO);  /* XXX translate ldlm code */
 }
 
 /* We never dget the object parent, so DON'T dput it either */
@@ -923,25 +925,24 @@ static void filter_parent_unlock(struct dentry *dparent,
 }
 
 /* We never dget the object parent, so DON'T dput it either */
-struct dentry *filter_parent(struct obd_device *obd, obd_mode mode,
-                             obd_id objid)
+struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
 {
         struct filter_obd *filter = &obd->u.filter;
+        LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */
 
-        LASSERT(S_ISREG(mode));   /* only regular files for now */
-        if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
-                return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+        if (group > 0 || filter->fo_subdir_count == 0)
+                return filter->fo_dentry_O_groups[group];
 
         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
 }
 
 /* We never dget the object parent, so DON'T dput it either */
-struct dentry *filter_parent_lock(struct obd_device *obd, obd_mode mode,
+struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
                                   obd_id objid, ldlm_mode_t lock_mode,
                                   struct lustre_handle *lockh)
 {
         unsigned long now = jiffies;
-        struct dentry *de = filter_parent(obd, mode, objid);
+        struct dentry *de = filter_parent(obd, group, objid);
         int rc;
 
         if (IS_ERR(de))
@@ -962,7 +963,7 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_mode mode,
  * internal to the filesystem code. */
 struct dentry *filter_fid2dentry(struct obd_device *obd,
                                  struct dentry *dir_dentry,
-                                 obd_mode mode, obd_id id)
+                                 obd_gr group, obd_id id)
 {
         struct lustre_handle lockh;
         struct dentry *dparent = dir_dentry;
@@ -973,13 +974,12 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
 
         if (id == 0) {
                 CERROR("fatal: invalid object id 0\n");
-                LBUG();
                 RETURN(ERR_PTR(-ESTALE));
         }
 
         len = sprintf(name, LPU64, id);
         if (dir_dentry == NULL) {
-                dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
+                dparent = filter_parent_lock(obd, group, id, LCK_PR, &lockh);
                 if (IS_ERR(dparent))
                         RETURN(dparent);
         }
@@ -1001,135 +1001,32 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
         RETURN(dchild);
 }
 
-static struct file *filter_obj_open(struct obd_export *export,
-                                    struct obd_trans_info *oti,
-                                    __u64 id, __u32 type, int parent_mode,
-                                    struct lustre_handle *parent_lockh)
+static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
 {
-        struct obd_device *obd = export->exp_obd;
-        struct filter_obd *filter = &obd->u.filter;
-        struct dentry *dchild = NULL, *dparent = NULL;
-        struct filter_export_data *fed = &export->exp_filter_data;
-        struct filter_dentry_data *fdd = NULL;
-        struct filter_file_data *ffd = NULL;
-        struct obd_run_ctxt saved;
-        char name[24];
-        struct file *file;
-        int len, cleanup_phase = 0;
+        struct lustre_handle lockh;
+        int flags = LDLM_AST_DISCARD_DATA, rc;
+        struct ldlm_res_id res_id = { .name = { objid } };
+        struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
         ENTRY;
 
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
-
-        if (!id) {
-                CERROR("fatal: invalid obdo "LPU64"\n", id);
-                GOTO(cleanup, file = ERR_PTR(-ESTALE));
-        }
+        /* Tell the clients that the object is gone now and that they should
+         * throw away any cached pages.  If we're the OST at stripe 0 in the
+         * file then this enqueue will communicate the DISCARD to all the
+         * clients.  This assumes that we always destroy all the objects for
+         * a file at a time, as is currently the case.  If we're not the
+         * OST at stripe 0 then we'll harmlessly get a very lonely lock in 
+         * the local DLM and immediately drop it. */
+        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
+                              res_id, LDLM_EXTENT, &extent,
+                              sizeof(extent), LCK_PW, &flags,
+                              ldlm_completion_ast, filter_blocking_ast,
+                              NULL, &lockh);
 
-        if (!(type & S_IFMT)) {
-                CERROR("OBD %s, object "LPU64" has bad type: %o\n",
-                       __FUNCTION__, id, type);
-                GOTO(cleanup, file = ERR_PTR(-EINVAL));
-        }
+        /* We only care about the side-effects, just drop the lock. */
+        if (rc == ELDLM_OK)
+                ldlm_lock_decref(&lockh, LCK_PW);
 
-        ffd = filter_ffd_new();
-        if (ffd == NULL) {
-                CERROR("obdfilter: out of memory\n");
-                GOTO(cleanup, file = ERR_PTR(-ENOMEM));
-        }
-
-        cleanup_phase = 1;
-
-        /* We preallocate this to avoid blocking while holding fo_fddlock */
-        OBD_ALLOC(fdd, sizeof *fdd);
-        if (fdd == NULL) {
-                CERROR("obdfilter: out of memory\n");
-                GOTO(cleanup, file = ERR_PTR(-ENOMEM));
-        }
-
-        cleanup_phase = 2;
-
-        dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
-        if (IS_ERR(dparent))
-                GOTO(cleanup, file = (void *)dparent);
-
-        cleanup_phase = 3;
-
-        len = snprintf(name, sizeof(name), LPU64, id);
-        dchild = ll_lookup_one_len(name, dparent, len);
-        if (IS_ERR(dchild))
-                GOTO(cleanup, file = (void *)dchild);
-
-        cleanup_phase = 4;
-
-        if (dchild->d_inode == NULL) {
-                CERROR("opening non-existent object %s - O_CREAT?\n", name);
-                /* dput(dchild); call filter_create_internal here */
-                file = ERR_PTR(-ENOENT);
-                GOTO(cleanup, file);
-        }
-
-        /* dentry_open does a dput(dchild) and mntput(mnt) on error */
-        mntget(filter->fo_vfsmnt);
-        file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
-        if (IS_ERR(file)) {
-                dchild = NULL; /* prevent a double dput in step 4 */
-                CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
-                GOTO(cleanup, file);
-        }
-
-        spin_lock(&filter->fo_fddlock);
-        if (dchild->d_fsdata) {
-                spin_unlock(&filter->fo_fddlock);
-                OBD_FREE(fdd, sizeof *fdd);
-                fdd = dchild->d_fsdata;
-                LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
-                /* should only happen during client recovery */
-                if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
-                        CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
-                atomic_inc(&fdd->fdd_open_count);
-        } else {
-                atomic_set(&fdd->fdd_open_count, 1);
-                fdd->fdd_magic = FILTER_DENTRY_MAGIC;
-                fdd->fdd_flags = 0;
-                fdd->fdd_objid = id;
-                /* If this is racy, then we can use {cmp}xchg and atomic_add */
-                dchild->d_fsdata = fdd;
-                spin_unlock(&filter->fo_fddlock);
-        }
-
-        ffd->ffd_file = file;
-        LASSERT(file->private_data == NULL);
-        file->private_data = ffd;
-
-        if (!dchild->d_op)
-                dchild->d_op = &filter_dops;
-        else
-                LASSERT(dchild->d_op == &filter_dops);
-
-        spin_lock(&fed->fed_lock);
-        list_add(&ffd->ffd_export_list, &fed->fed_open_head);
-        spin_unlock(&fed->fed_lock);
-
-        CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
-cleanup:
-        switch (cleanup_phase) {
-        case 4:
-                if (IS_ERR(file))
-                        f_dput(dchild);
-        case 3:
-                if (IS_ERR(file))
-                        filter_parent_unlock(dparent, parent_lockh,parent_mode);
-        case 2:
-                if (IS_ERR(file))
-                        OBD_FREE(fdd, sizeof *fdd);
-        case 1:
-                if (IS_ERR(file))
-                        filter_ffd_destroy(ffd);
-                filter_ffd_put(ffd);
-        case 0:
-                pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-        }
-        RETURN(file);
+        RETURN(rc);
 }
 
 /* Caller must hold LCK_PW on parent and push us into kernel context.
@@ -1143,42 +1040,10 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         ENTRY;
 
         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
-                CERROR("destroying objid %*s nlink = %d, count = %d\n",
+                CERROR("destroying objid %*s nlink = %lu, count = %d\n",
                        dchild->d_name.len, dchild->d_name.name,
-                       inode->i_nlink, atomic_read(&inode->i_count));
-        }
-
-        
-#if 0
-        /* Tell the clients that the object is gone now and that they should
-         * throw away any cached pages.  We don't need to wait until they're
-         * done, so just decref the lock right away and let ldlm_completion_ast
-         * clean up when it's all over. */
-        ldlm_cli_enqueue(..., LCK_PW, AST_INTENT_DESTROY, &lockh);
-        ldlm_lock_decref(&lockh, LCK_PW);
-#endif
-
-        if (0) {
-                struct lustre_handle lockh;
-                int flags = 0, rc;
-                struct ldlm_res_id res_id = { .name = { objid } };
-
-                /* This part is a wee bit iffy: we really only want to bust the
-                 * locks on our stripe, so that we don't end up bouncing
-                 * [0->EOF] locks around on each of the OSTs as the rest of the
-                 * destroys get processed.  Because we're only talking to
-                 * the local LDLM, though, we should only end up locking the 
-                 * whole of our stripe.  When bug 1425 (take all locks on OST
-                 * for stripe 0) is fixed, this code should be revisited. */
-                struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
-
-                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                                      res_id, LDLM_EXTENT, &extent,
-                                      sizeof(extent), LCK_PW, &flags,
-                                      ldlm_completion_ast, filter_blocking_ast,
-                                      NULL, &lockh);
-                /* We only care about the side-effects, just drop the lock. */
-                ldlm_lock_decref(&lockh, LCK_PW);
+                       (unsigned long)inode->i_nlink, 
+                       atomic_read(&inode->i_count));
         }
 
         rc = vfs_unlink(dparent->d_inode, dchild);
@@ -1190,150 +1055,43 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
         RETURN(rc);
 }
 
-/* If closing because we are failing this device, then
-   don't do the unlink on close.
-*/
-static int filter_close_internal(struct obd_export *exp,
-                                 struct filter_file_data *ffd,
-                                 struct obd_trans_info *oti, int flags)
-{
-        struct obd_device *obd = exp->exp_obd;
-        struct filter_obd *filter = &obd->u.filter;
-        struct file *filp = ffd->ffd_file;
-        struct dentry *dchild = dget(filp->f_dentry);
-        struct filter_dentry_data *fdd = dchild->d_fsdata;
-        struct lustre_handle parent_lockh;
-        int rc, rc2, cleanup_phase = 0;
-        struct dentry *dparent = NULL;
-        struct obd_run_ctxt saved;
-        int nested_trans = (current->journal_info != NULL);
-        ENTRY;
-
-        LASSERT(filp->private_data == ffd);
-        LASSERT(fdd != NULL);
-        LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
-
-        rc = filp_close(filp, 0);
-
-        if (atomic_dec_and_test(&fdd->fdd_open_count) &&
-            (fdd->fdd_flags & FILTER_FLAG_DESTROY) &&
-            !(flags & OBD_OPT_FAILOVER)) {
-                void *handle;
-
-                push_ctxt(&saved, &filter->fo_ctxt, NULL);
-                cleanup_phase = 1;
-
-                LASSERT(fdd->fdd_objid > 0);
-                dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
-                                             LCK_PW, &parent_lockh);
-                if (IS_ERR(dparent))
-                        GOTO(cleanup, rc = PTR_ERR(dparent));
-                cleanup_phase = 2;
-
-                handle = fsfilt_start(obd, dparent->d_inode,
-                                      FSFILT_OP_UNLINK_LOG, oti);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
-
-                if (oti != NULL) {
-                        if (oti->oti_handle == NULL)
-                                oti->oti_handle = handle;
-                        else
-                                LASSERT(oti->oti_handle == handle);
-                }
-
-#ifdef ENABLE_ORPHANS
-                /* Remove orphan unlink record from log */
-                llog_cancel_records(filter->fo_catalog, 1, &fdd->fdd_cookie);
-#endif
-                /* XXX unlink from PENDING directory now too */
-                rc2 = filter_destroy_internal(obd, fdd->fdd_objid, dparent,
-                                              dchild);
-                if (rc2 && !rc)
-                        rc = rc2;
-                rc = filter_finish_transno(exp, oti, rc);
-                rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
-                if (rc2) {
-                        CERROR("error on commit, err = %d\n", rc2);
-                        if (!rc)
-                                rc = rc2;
-                }
-                if (nested_trans == 0) {
-                        LASSERT(current->journal_info == NULL);
-                        if (oti != NULL)
-                                oti->oti_handle = NULL;
-                }
-        }
-
-cleanup:
-        switch(cleanup_phase) {
-        case 2:
-                if (rc || oti == NULL) {
-                        filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
-                } else {
-                        memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
-                               sizeof(parent_lockh));
-                        oti->oti_ack_locks[0].mode = LCK_PW;
-                }
-        case 1:
-                pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-        case 0:
-                f_dput(dchild);
-                filter_ffd_destroy(ffd);
-                break;
-        default:
-                CERROR("invalid cleanup_phase %d\n", cleanup_phase);
-                LBUG();
-        }
-
-        RETURN(rc);
-}
-
 /* mount the file system (secretly) */
 int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
                         char *option)
 {
-        struct obd_ioctl_data* data = buf;
+        struct lustre_cfg* lcfg = buf;
         struct filter_obd *filter = &obd->u.filter;
         struct vfsmount *mnt;
         int rc = 0;
         ENTRY;
 
-        if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
+        dev_clear_rdonly(2);
+
+        if (!lcfg->lcfg_inlbuf1 || !lcfg->lcfg_inlbuf2)
                 RETURN(-EINVAL);
 
-        obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
+        obd->obd_fsops = fsfilt_get_ops(lcfg->lcfg_inlbuf2);
         if (IS_ERR(obd->obd_fsops))
                 RETURN(PTR_ERR(obd->obd_fsops));
 
-        mnt = do_kern_mount(data->ioc_inlbuf2, MS_NOATIME | MS_NODIRATIME,
-                            data->ioc_inlbuf1, option);
+        mnt = do_kern_mount(lcfg->lcfg_inlbuf2, MS_NOATIME | MS_NODIRATIME,
+                            lcfg->lcfg_inlbuf1, option);
         rc = PTR_ERR(mnt);
         if (IS_ERR(mnt))
                 GOTO(err_ops, rc);
 
-        if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
-                if (*data->ioc_inlbuf3 == 'f') {
+        if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3) {
+                if (*lcfg->lcfg_inlbuf3 == 'f') {
                         obd->obd_replayable = 1;
                         obd_sync_filter = 1;
-                        CERROR("%s: configured for recovery and sync write\n",
-                               obd->obd_name);
+                        CERROR("%s: recovery enabled\n", obd->obd_name);
                 } else {
-                        if (*data->ioc_inlbuf3 != 'n') {
+                        if (*lcfg->lcfg_inlbuf3 != 'n') {
                                 CERROR("unrecognised flag '%c'\n",
-                                       *data->ioc_inlbuf3);
+                                       *lcfg->lcfg_inlbuf3);
                         }
-                }
-        }
-
-        if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
-                if (*data->ioc_inlbuf4 == '/') {
-                        CERROR("filter namespace mount: %s\n",
-                               data->ioc_inlbuf4);
-                        filter->fo_nspath = strdup(data->ioc_inlbuf4);
-                } else {
-                        CERROR("namespace mount must be absolute path: '%s'\n",
-                               data->ioc_inlbuf4);
+                        // XXX Robert? Why do we get errors here
+                        // GOTO(err_mntput, rc = -EINVAL);
                 }
         }
 
@@ -1342,23 +1100,20 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         filter->fo_fstype = mnt->mnt_sb->s_type->name;
         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
 
-        OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
-        filter->fo_ctxt.pwdmnt = mnt;
-        filter->fo_ctxt.pwd = mnt->mnt_root;
-        filter->fo_ctxt.fs = get_ds();
+        OBD_SET_CTXT_MAGIC(&obd->obd_ctxt);
+        obd->obd_ctxt.pwdmnt = mnt;
+        obd->obd_ctxt.pwd = mnt->mnt_root;
+        obd->obd_ctxt.fs = get_ds();
+        obd->obd_ctxt.cb_ops = filter_lvfs_ops;
 
         rc = filter_prep(obd);
         if (rc)
                 GOTO(err_mntput, rc);
 
         spin_lock_init(&filter->fo_translock);
-        spin_lock_init(&filter->fo_fddlock);
         spin_lock_init(&filter->fo_objidlock);
         INIT_LIST_HEAD(&filter->fo_export_list);
-
-        ptlrpc_init_client(MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
-                           "filter_mdc", &filter->fo_mdc_client);
-        sema_init(&filter->fo_sem, 1);
+        sema_init(&filter->fo_alloc_lock, 1);
 
         obd->obd_namespace = ldlm_namespace_new("filter-tgt",
                                                 LDLM_NAMESPACE_SERVER);
@@ -1368,15 +1123,6 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        /* Create a non-replaying connection for recovery logging, so that
-         * we don't create a client entry for this local connection, and do
-         * not log or assign transaction numbers for logging operations. */
-#ifdef ENABLE_ORPHANS
-        filter->fo_catalog = filter_get_catalog(obd);
-        if (IS_ERR(filter->fo_catalog))
-                GOTO(err_post, rc = PTR_ERR(filter->fo_catalog));
-#endif
-
         RETURN(0);
 
 err_post:
@@ -1393,16 +1139,44 @@ err_ops:
 
 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
 {
-        struct obd_ioctl_data* data = buf;
+        struct lustre_cfg* lcfg = buf;
+        const char *str = NULL;
         char *option = NULL;
+        int n = 0;
+        int rc;
 
+        if (!strcmp(lcfg->lcfg_inlbuf2, "ext3")) {
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
         /* bug 1577: implement async-delete for 2.5 */
-        if (!strcmp(data->ioc_inlbuf2, "ext3"))
-                option = "asyncdel";
+                str = "errors=remount-ro,asyncdel";
+#else
+                str = "errors=remount-ro";
 #endif
+                n = strlen(str) + 1;
+                OBD_ALLOC(option, n);
+                if (option == NULL)
+                        RETURN(-ENOMEM);
+                strcpy(option, str);
+        }
 
-        return filter_common_setup(obd, len, buf, option);
+        rc = filter_common_setup(obd, len, buf, option);
+        if (option)
+                OBD_FREE(option, n);
+        return rc;
+}
+
+static int filter_postsetup(struct obd_device *obd)
+{
+        int rc = 0;
+        ENTRY;
+
+        // XXX add a storage location for the logid for size changes
+#ifdef ENABLE_ORPHANS
+        rc = llog_cat_initialize(obd, 1);
+        if (rc)
+                CERROR("failed to setup llogging subsystems\n");
+#endif
+        RETURN(rc);
 }
 
 static int filter_cleanup(struct obd_device *obd, int flags)
@@ -1423,11 +1197,7 @@ static int filter_cleanup(struct obd_device *obd, int flags)
                 }
         }
 
-#ifdef ENABLE_ORPHANS
-        filter_put_catalog(filter->fo_catalog);
-#endif
-
-        ldlm_namespace_free(obd->obd_namespace);
+        ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
 
         if (filter->fo_sb == NULL)
                 RETURN(0);
@@ -1438,7 +1208,8 @@ static int filter_cleanup(struct obd_device *obd, int flags)
         filter->fo_sb = 0;
 
         if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1)
-                CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
+                CERROR("%s: mount point %p busy, mnt_count: %d\n",
+                       obd->obd_name, filter->fo_vfsmnt,
                        atomic_read(&filter->fo_vfsmnt->mnt_count));
 
         unlock_kernel();
@@ -1448,6 +1219,8 @@ static int filter_cleanup(struct obd_device *obd, int flags)
         fsfilt_put_ops(obd->obd_fsops);
         lock_kernel();
 
+        dev_clear_rdonly(2);
+
         RETURN(0);
 }
 
@@ -1485,7 +1258,7 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
 {
         struct obd_export *exp;
         struct filter_export_data *fed;
-        struct filter_client_data *fcd;
+        struct filter_client_data *fcd = NULL;
         struct filter_obd *filter = &obd->u.filter;
         int rc;
         ENTRY;
@@ -1500,18 +1273,16 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
         LASSERT(exp != NULL);
 
         fed = &exp->exp_filter_data;
-        class_export_put(exp);
 
-        INIT_LIST_HEAD(&fed->fed_open_head);
         spin_lock_init(&fed->fed_lock);
 
         if (!obd->obd_replayable)
-                RETURN(0);
+                GOTO(cleanup, rc = 0);
 
         OBD_ALLOC(fcd, sizeof(*fcd));
         if (!fcd) {
                 CERROR("filter: out of memory for client data\n");
-                GOTO(out_export, rc = -ENOMEM);
+                GOTO(cleanup, rc = -ENOMEM);
         }
 
         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
@@ -1519,53 +1290,48 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
 
         rc = filter_client_add(obd, filter, fed, -1);
-        if (rc)
-                GOTO(out_fcd, rc);
 
-        RETURN(rc);
+cleanup:
+        if (rc) {
+                if (fcd)
+                        OBD_FREE(fcd, sizeof(*fcd));
+                class_disconnect(exp, 0);
+        } else {
+                class_export_put(exp);
+        }
+        return rc;
+}
 
-out_fcd:
-        OBD_FREE(fcd, sizeof(*fcd));
-out_export:
-        class_disconnect(conn, 0);
+static int filter_precleanup(struct obd_device *obd, int flags)
+{
+        int rc = 0;
+        ENTRY;
+
+#ifdef ENABLE_ORPHANS
+        rc = obd_llog_finish(obd, 0);
+        if (rc)
+                CERROR("failed to cleanup llogging subsystem\n");
+#endif
 
         RETURN(rc);
 }
 
-static void filter_destroy_export(struct obd_export *exp)
+static int filter_destroy_export(struct obd_export *exp)
 {
-        struct filter_export_data *fed = &exp->exp_filter_data;
-
         ENTRY;
-        spin_lock(&fed->fed_lock);
-        while (!list_empty(&fed->fed_open_head)) {
-                struct filter_file_data *ffd;
 
-                ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
-                                 ffd_export_list);
-                list_del(&ffd->ffd_export_list);
-                spin_unlock(&fed->fed_lock);
-
-                CDEBUG(D_INFO, "force close file %*s (hdl %p:"LPX64") on "
-                       "disconnect\n", ffd->ffd_file->f_dentry->d_name.len,
-                       ffd->ffd_file->f_dentry->d_name.name,
-                       ffd, ffd->ffd_handle.h_cookie);
-
-                filter_close_internal(exp, ffd, NULL, exp->exp_flags);
-                spin_lock(&fed->fed_lock);
-        }
-        spin_unlock(&fed->fed_lock);
+        target_destroy_export(exp);
 
         if (exp->exp_obd->obd_replayable)
                 filter_client_free(exp, exp->exp_flags);
-        EXIT;
+        RETURN(0);
 }
 
 /* also incredibly similar to mds_disconnect */
-static int filter_disconnect(struct lustre_handle *conn, int flags)
+static int filter_disconnect(struct obd_export *exp, int flags)
 {
-        struct obd_export *exp = class_conn2export(conn);
         unsigned long irqflags;
+        struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
@@ -1576,11 +1342,14 @@ static int filter_disconnect(struct lustre_handle *conn, int flags)
         exp->exp_flags = flags;
         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
 
-        rc = class_disconnect(conn, flags);
-
         fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
-        class_export_put(exp);
         /* XXX cleanup preallocated inodes */
+
+        /* flush any remaining cancel messages out to the target */
+        ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+        llog_sync(ctxt, exp);
+
+        rc = class_disconnect(exp, flags);
         RETURN(rc);
 }
 
@@ -1588,33 +1357,19 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd,
                                   struct obdo *oa, const char *what)
 {
         struct dentry *dchild = NULL;
+        obd_gr group = 0;
 
-        if (oa->o_valid & OBD_MD_FLHANDLE) {
-                struct lustre_handle *ost_handle = obdo_handle(oa);
-                struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
+        if (oa->o_valid & OBD_MD_FLGROUP)
+                group = oa->o_gr;
 
-                if (ffd != NULL) {
-                        struct filter_dentry_data *fdd;
-                        dchild = dget(ffd->ffd_file->f_dentry);
-                        fdd = dchild->d_fsdata;
-                        LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
-                        filter_ffd_put(ffd);
-
-                        CDEBUG(D_INODE,"%s got child objid %*s: %p, count %d\n",
-                               what, dchild->d_name.len, dchild->d_name.name,
-                               dchild, atomic_read(&dchild->d_count));
-                }
-        }
-
-        if (!dchild)
-                dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
+        dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
 
         if (IS_ERR(dchild)) {
                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
                 RETURN(dchild);
         }
 
-        if (!dchild->d_inode) {
+        if (dchild->d_inode == NULL) {
                 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
                 f_dput(dchild);
                 RETURN(ERR_PTR(-ENOENT));
@@ -1623,7 +1378,7 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd,
         return dchild;
 }
 
-static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
+static int filter_getattr(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *md)
 {
         struct dentry *dentry = NULL;
@@ -1631,9 +1386,10 @@ static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
         int rc = 0;
         ENTRY;
 
-        obd = class_conn2obd(conn);
+        obd = class_exp2obd(exp);
         if (obd == NULL) {
-                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
+                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+                       exp->exp_handle.h_cookie);
                 RETURN(-EINVAL);
         }
 
@@ -1650,11 +1406,10 @@ static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
 }
 
 /* this is called from filter_truncate() until we have filter_punch() */
-static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
+static int filter_setattr(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *md, struct obd_trans_info *oti)
 {
         struct obd_run_ctxt saved;
-        struct obd_export *exp;
         struct filter_obd *filter;
         struct dentry *dentry;
         struct iattr iattr;
@@ -1663,33 +1418,32 @@ static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
         ENTRY;
 
         LASSERT(oti != NULL);
-        exp = class_conn2export(conn);
-        if (!exp) {
-                CERROR("invalid client cookie "LPX64"\n", conn->cookie);
-                RETURN(-EINVAL);
-        }
 
         dentry = filter_oa2dentry(exp->exp_obd, oa);
         if (IS_ERR(dentry))
-                GOTO(out_exp, rc = PTR_ERR(dentry));
+                RETURN(PTR_ERR(dentry));
 
         filter = &exp->exp_obd->u.filter;
 
         iattr_from_obdo(&iattr, oa, oa->o_valid);
 
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
+        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
         lock_kernel();
 
-        /* XXX this could be a rwsem instead, if filter_preprw played along */
         if (iattr.ia_valid & ATTR_SIZE)
                 down(&dentry->d_inode->i_sem);
-
         handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
                               oti);
         if (IS_ERR(handle))
                 GOTO(out_unlock, rc = PTR_ERR(handle));
 
-        rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
+        /* XXX this could be a rwsem instead, if filter_preprw played along */
+        if (iattr.ia_valid & ATTR_ATTR_FLAG)
+                rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
+                                      EXT3_IOC_SETFLAGS,
+                                      (long)&iattr.ia_attr_flags);
+        else
+                rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
         rc = filter_finish_transno(exp, oti, rc);
         rc2 = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
         if (rc2) {
@@ -1698,260 +1452,302 @@ static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
                         rc = rc2;
         }
 
-        if (iattr.ia_valid & ATTR_SIZE)
-                up(&dentry->d_inode->i_sem);
-
         oa->o_valid = OBD_MD_FLID;
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
 out_unlock:
+        if (iattr.ia_valid & ATTR_SIZE)
+                up(&dentry->d_inode->i_sem);
         unlock_kernel();
-        pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+        pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
 
         f_dput(dentry);
- out_exp:
-        class_export_put(exp);
         RETURN(rc);
 }
 
-static int filter_open(struct lustre_handle *conn, struct obdo *oa,
-                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
-                       struct obd_client_handle *och)
+/* XXX identical to osc_unpackmd */
+static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
+                           struct lov_mds_md *lmm, int lmm_bytes)
 {
-        struct obd_export *exp;
-        struct lustre_handle *handle;
-        struct filter_file_data *ffd;
-        struct file *filp;
-        struct lustre_handle parent_lockh;
-        int rc = 0;
+        int lsm_size;
         ENTRY;
 
-        exp = class_conn2export(conn);
-        if (exp == NULL) {
-                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
-                RETURN(-EINVAL);
+        if (lmm != NULL) {
+                if (lmm_bytes < sizeof (*lmm)) {
+                        CERROR("lov_mds_md too small: %d, need %d\n",
+                               lmm_bytes, (int)sizeof(*lmm));
+                        RETURN(-EINVAL);
+                }
+                /* XXX LOV_MAGIC etc check? */
+
+                if (lmm->lmm_object_id == cpu_to_le64(0)) {
+                        CERROR("lov_mds_md: zero lmm_object_id\n");
+                        RETURN(-EINVAL);
+                }
         }
 
-        filp = filter_obj_open(exp, oti, oa->o_id, oa->o_mode,
-                               LCK_PR, &parent_lockh);
-        if (IS_ERR(filp))
-                GOTO(out, rc = PTR_ERR(filp));
+        lsm_size = lov_stripe_md_size(1);
+        if (lsmp == NULL)
+                RETURN(lsm_size);
 
-        oa->o_valid = OBD_MD_FLID;
-        obdo_from_inode(oa, filp->f_dentry->d_inode, FILTER_VALID_FLAGS);
+        if (*lsmp != NULL && lmm == NULL) {
+                OBD_FREE(*lsmp, lsm_size);
+                *lsmp = NULL;
+                RETURN(0);
+        }
 
-        ffd = filp->private_data;
-        handle = obdo_handle(oa);
-        handle->cookie = ffd->ffd_handle.h_cookie;
-        oa->o_valid |= OBD_MD_FLHANDLE;
+        if (*lsmp == NULL) {
+                OBD_ALLOC(*lsmp, lsm_size);
+                if (*lsmp == NULL)
+                        RETURN(-ENOMEM);
 
-out:
-        class_export_put(exp);
-        if (!rc) {
-                memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
-                       sizeof(parent_lockh));
-                oti->oti_ack_locks[0].mode = LCK_PR;
+                loi_init((*lsmp)->lsm_oinfo);
         }
-        RETURN(rc);
+
+        if (lmm != NULL) {
+                /* XXX zero *lsmp? */
+                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
+                LASSERT((*lsmp)->lsm_object_id);
+        }
+
+        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+
+        RETURN(lsm_size);
 }
 
-static int filter_close(struct lustre_handle *conn, struct obdo *oa,
-                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
+static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
+                                      struct filter_obd *filter)
 {
-        struct obd_export *exp;
-        struct filter_file_data *ffd;
-        struct filter_export_data *fed;
-        int rc;
+        struct obdo doa; /* XXX obdo on stack */
+        __u64 last, id;
         ENTRY;
+        LASSERT(oa);
 
-        exp = class_conn2export(conn);
-        if (exp == NULL) {
-                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
-                RETURN(-EINVAL);
+        memset(&doa, 0, sizeof(doa));
+        if (oa->o_valid & OBD_MD_FLGROUP)
+                doa.o_gr = oa->o_gr;
+        else
+                doa.o_gr = 0;
+        doa.o_mode = S_IFREG;
+        last = filter_last_id(filter, &doa); /* FIXME: object groups */
+        CWARN("deleting orphan objects from "LPU64" to "LPU64"\n",
+               oa->o_id + 1, last);
+        for (id = oa->o_id + 1; id <= last; id++) {
+                doa.o_id = id;
+                filter_destroy(exp, &doa, NULL, NULL);
         }
+        spin_lock(&filter->fo_objidlock);
+        filter->fo_last_objids[0] = oa->o_id; /* FIXME: object groups */
+        spin_unlock(&filter->fo_objidlock);
+        EXIT;
+}
 
-        if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
-                CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
-                GOTO(out, rc = -EINVAL);
-        }
+/* returns a negative error or a nonnegative number of files to create */
+static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
+                                   int group)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_obd *filter = &obd->u.filter;
+        int diff, rc;
+        ENTRY;
 
-        ffd = filter_handle2ffd(obdo_handle(oa));
-        if (ffd == NULL) {
-                CERROR("bad handle ("LPX64") for close\n",
-                       obdo_handle(oa)->cookie);
-                GOTO(out, rc = -ESTALE);
-        }
+        /* only precreate if group == 0 and o_id is specfied */
+        if (group != 0 || oa->o_id == 0)
+                RETURN(1);
 
-        fed = &exp->exp_filter_data;
-        spin_lock(&fed->fed_lock);
-        list_del(&ffd->ffd_export_list);
-        spin_unlock(&fed->fed_lock);
+        diff = oa->o_id - filter_last_id(filter, oa);
+        CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
+               filter_last_id(filter, oa), diff);
+        if (diff >= 0)
+                RETURN(diff);
 
-        oa->o_valid = OBD_MD_FLID;
-        obdo_from_inode(oa,ffd->ffd_file->f_dentry->d_inode,FILTER_VALID_FLAGS);
+        if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+            !(oa->o_flags & OBD_FL_DELORPHAN)) {
+                CERROR("filter asked to delete %d objects, but DELORPHAN flag "
+                       "isn't set!\n", -diff);
+                RETURN(0);
+        }
 
-        rc = filter_close_internal(exp, ffd, oti, 0);
-        filter_ffd_put(ffd);
-        GOTO(out, rc);
- out:
-        class_export_put(exp);
-        return rc;
+        /* delete orphan request */
+        filter_destroy_precreated(exp, oa, filter);
+        rc = filter_update_last_objid(obd, group, 0);
+        if (rc)
+                CERROR("unable to write lastobjid, but orphans were deleted\n");
+        RETURN(rc);
 }
 
-static int filter_create(struct lustre_handle *conn, struct obdo *oa,
-                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
+/* We rely on the fact that only one thread will be creating files in a given
+ * group at a time, which is why we don't need an atomic filter_get_new_id.
+ * Even if we had that atomic function, the following race would exist:
+ *
+ * thread 1: gets id x from filter_next_id
+ * thread 2: gets id (x + 1) from filter_next_id
+ * thread 2: creates object (x + 1)
+ * thread 1: tries to create object x, gets -ENOSPC
+ */
+static int filter_precreate(struct obd_device *obd, struct obdo *oa,
+                            obd_gr group, int *num)
 {
-        struct obd_export *exp;
-        struct obd_device *obd;
-        struct filter_obd *filter;
-        struct obd_run_ctxt saved;
         struct lustre_handle parent_lockh;
-        struct dentry *dparent;
-        struct ll_fid mds_fid = { .id = 0 };
         struct dentry *dchild = NULL;
+        struct filter_obd *filter;
+        struct dentry *dparent;
+        struct iattr attr;
+        int err = 0, rc = 0, i;
+        __u64 next_id;
         void *handle;
-        int err, rc, cleanup_phase;
         ENTRY;
 
-        exp = class_conn2export(conn);
-        if (exp == NULL) {
-                CDEBUG(D_IOCTL,"invalid client cookie "LPX64"\n", conn->cookie);
-                RETURN(-EINVAL);
-        }
-
-        obd = exp->exp_obd;
         filter = &obd->u.filter;
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
- retry:
-        oa->o_id = filter_next_id(filter);
 
-        cleanup_phase = 0;
-        dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
-                                     &parent_lockh);
-        if (IS_ERR(dparent))
-                GOTO(cleanup, rc = PTR_ERR(dparent));
-        cleanup_phase = 1;
+        for (i = 0; i < *num && err == 0; i++) {
+                next_id = filter_last_id(filter, NULL) + 1;
+                CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
 
-        dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
-        if (IS_ERR(dchild))
-                GOTO(cleanup, rc = PTR_ERR(dchild));
-        if (dchild->d_inode) {
-                /* This would only happen if lastobjid was bad on disk */
-                CERROR("Serious error: objid %*s already exists; is this "
-                       "filesystem corrupt?  I will try to work around it.\n",
-                       dchild->d_name.len, dchild->d_name.name);
+                dparent = filter_parent_lock(obd, group, next_id, LCK_PW,
+                                             &parent_lockh);
+                if (IS_ERR(dparent)) {
+                        rc = PTR_ERR(dparent);
+                        break;
+                }
+
+                dchild = filter_fid2dentry(obd, dparent, group, next_id);
+                if (IS_ERR(dchild))
+                        GOTO(cleanup_lock, rc = PTR_ERR(dchild));
+
+                if (dchild->d_inode != NULL) {
+                        /* This would only happen if lastobjid was bad on disk*/
+                        CERROR("Serious error: objid %*s already exists; is "
+                               "this filesystem corrupt?\n",
+                               dchild->d_name.len, dchild->d_name.name);
+                        GOTO(cleanup_dchild, rc = -EEXIST);
+                }
+
+                handle = fsfilt_start(obd, dparent->d_inode,
+                                      FSFILT_OP_CREATE_LOG, NULL);
+                if (IS_ERR(handle))
+                        GOTO(cleanup_dchild, rc = PTR_ERR(handle));
+
+                rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+                if (rc) {
+                        CERROR("create failed rc = %d\n", rc);
+                        GOTO(cleanup_commit, rc);
+                } else if (oa != NULL &&
+                           (oa->o_valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME |
+                                           OBD_MD_FLSIZE))) {
+                        iattr_from_obdo(&attr, oa, oa->o_valid);
+                        rc = fsfilt_setattr(obd, dchild, handle, &attr, 1);
+                        if (rc)
+                                CERROR("create setattr failed rc = %d\n", rc);
+                }
+                filter_set_last_id(filter, NULL, next_id);
+                err = filter_update_last_objid(obd, group, 0);
+                if (err)
+                        CERROR("unable to write lastobjid but file created\n");
+        cleanup_commit:
+                err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
+                if (err) {
+                        CERROR("error on commit, err = %d\n", err);
+                        if (!rc)
+                                rc = err;
+                }
+                if (dchild->d_inode != NULL && oa != NULL)
+                        obdo_from_inode(oa, dchild->d_inode,
+                                        FILTER_VALID_FLAGS);
+        cleanup_dchild:
                 f_dput(dchild);
+        cleanup_lock:
                 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
-                goto retry;
+                if (rc)
+                        break;
+                oa = NULL; /* oa applies for first iteration only */
         }
+        *num = i;
 
-        cleanup_phase = 2;
-        handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE_LOG, oti);
-        if (IS_ERR(handle))
-                GOTO(cleanup, rc = PTR_ERR(handle));
+        CDEBUG(D_INFO, "filter_precreate() created %d objects\n", i);
+        RETURN(rc);
+}
 
-        rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
-        if (rc) {
-                CERROR("create failed rc = %d\n", rc);
-        } else if (oa->o_valid & (OBD_MD_FLCTIME|OBD_MD_FLMTIME|OBD_MD_FLSIZE)){
-                struct iattr attr;
+static int filter_create(struct obd_export *exp, struct obdo *oa,
+                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        struct obd_device *obd = NULL;
+        struct obd_run_ctxt saved;
+        struct lov_stripe_md *lsm = NULL;
+        obd_gr group = 0;
+        int rc = 0, diff;
+        ENTRY;
 
-                iattr_from_obdo(&attr, oa, oa->o_valid);
-                rc = fsfilt_setattr(obd, dchild, handle, &attr, 1);
-                if (rc)
-                        CERROR("create setattr failed rc = %d\n", rc);
-        }
-        rc = filter_finish_transno(exp, oti, rc);
-        err = filter_update_server_data(obd, filter->fo_rcvd_filp,
-                                        filter->fo_fsd);
-        if (err)
-                CERROR("unable to write lastobjid but file created\n");
-
-        /* Set flags for fields we have set in the inode struct */
-        if (!rc && mds_fid.id && (oa->o_valid & OBD_MD_FLCOOKIE)) {
-                err = filter_log_op_create(obd->u.filter.fo_catalog, &mds_fid,
-                                           dchild->d_inode->i_ino,
-                                           dchild->d_inode->i_generation,
-                                           oti->oti_logcookies);
-                if (err) {
-                        CERROR("error logging create record: rc %d\n", err);
-                        oa->o_valid = OBD_MD_FLID;
-                } else {
-                        oa->o_valid = OBD_MD_FLID | OBD_MD_FLCOOKIE;
+        if (oa->o_valid & OBD_MD_FLGROUP)
+                group = oa->o_gr;
+
+        CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
+               group, oa->o_id);
+        if (ea != NULL) {
+                lsm = *ea;
+                if (lsm == NULL) {
+                        rc = obd_alloc_memmd(exp, &lsm);
+                        if (rc < 0)
+                                RETURN(rc);
                 }
-        } else
-                oa->o_valid = OBD_MD_FLID;
-
-        err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
-        if (err) {
-                CERROR("error on commit, err = %d\n", err);
-                if (!rc)
-                        rc = err;
         }
 
-        if (rc)
-                GOTO(cleanup, rc);
+        obd = exp->exp_obd;
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
 
-        /* Set flags for fields we have set in the inode struct */
-        obdo_from_inode(oa, dchild->d_inode, FILTER_VALID_FLAGS);
+        diff = filter_should_precreate(exp, oa, group);
+        if (diff > 0) {
+                oa->o_id = filter_last_id(&obd->u.filter, oa);
+                rc = filter_precreate(obd, oa, group, &diff);
+                oa->o_id += diff;
+                oa->o_valid = OBD_MD_FLID;
+        }
 
-        EXIT;
-cleanup:
-        switch(cleanup_phase) {
-        case 2:
-                f_dput(dchild);
-        case 1: /* locked parent dentry */
-                if (rc || oti == NULL) {
-                        filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
-                } else {
-                        memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
-                               sizeof(parent_lockh));
-                        oti->oti_ack_locks[0].mode = LCK_PW;
-                }
-        case 0:
-                pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-                class_export_put(exp);
-                break;
-        default:
-                CERROR("invalid cleanup_phase %d\n", cleanup_phase);
-                LBUG();
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        if (rc && ea != NULL && *ea != lsm) {
+                obd_free_memmd(exp, &lsm);
+        } else if (rc == 0 && ea != NULL) {
+                /* XXX LOV STACKING: the lsm that is passed to us from
+                 * LOV does not have valid lsm_oinfo data structs, so
+                 * don't go touching that.  This needs to be fixed in a
+                 * big way. */
+                lsm->lsm_object_id = oa->o_id;
+                *ea = lsm;
         }
 
         RETURN(rc);
 }
 
-static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
+static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
 {
-        struct obd_export *exp;
         struct obd_device *obd;
         struct filter_obd *filter;
         struct dentry *dchild = NULL, *dparent = NULL;
-        struct filter_dentry_data *fdd;
         struct obd_run_ctxt saved;
         void *handle = NULL;
         struct lustre_handle parent_lockh;
         struct llog_cookie *fcc = NULL;
-        int rc, rc2, cleanup_phase = 0;
+        int rc, rc2, cleanup_phase = 0, have_prepared = 0;
+        obd_gr group = 0;
         ENTRY;
 
-        exp = class_conn2export(conn);
-        if (exp == NULL) {
-                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
-                RETURN(-EINVAL);
-        }
+        if (oa->o_valid & OBD_MD_FLGROUP)
+                group = oa->o_gr;
 
         obd = exp->exp_obd;
         filter = &obd->u.filter;
 
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
-        dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
-                                     LCK_PW, &parent_lockh);
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+
+ acquire_locks:
+        dparent = filter_parent_lock(obd, group, oa->o_id, LCK_PW,
+                                     &parent_lockh);
         if (IS_ERR(dparent))
                 GOTO(cleanup, rc = PTR_ERR(dparent));
         cleanup_phase = 1;
 
-        dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
+        dchild = filter_fid2dentry(obd, dparent, group, oa->o_id);
         if (IS_ERR(dchild))
                 GOTO(cleanup, rc = -ENOENT);
         cleanup_phase = 2;
@@ -1960,48 +1756,48 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
                 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
                 GOTO(cleanup, rc = -ENOENT);
         }
+
+        if (!have_prepared) {
+                /* If we're really going to destroy the object, get ready
+                 * by getting the clients to discard their cached data.
+                 *
+                 * We have to drop the parent lock, because
+                 * filter_prepare_destroy will acquire a PW on the object, and
+                 * we don't want to deadlock with an incoming write to the
+                 * object, which has the extent PW and then wants to get the
+                 * parent dentry to do the lookup.
+                 *
+                 * We dput the child because it's not worth the extra
+                 * complication of condition the above code to skip it on the
+                 * second time through. */
+                f_dput(dchild);
+                filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
+
+                filter_prepare_destroy(obd, oa->o_id);
+                have_prepared = 1;
+                goto acquire_locks;
+        }
+
         handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK_LOG, oti);
         if (IS_ERR(handle))
                 GOTO(cleanup, rc = PTR_ERR(handle));
         cleanup_phase = 3;
 
-        fdd = dchild->d_fsdata;
-
         /* Our MDC connection is established by the MDS to us */
-        if ((oa->o_valid & OBD_MD_FLCOOKIE) && filter->fo_mdc_imp != NULL) {
+        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                 OBD_ALLOC(fcc, sizeof(*fcc));
                 if (fcc != NULL)
                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
         }
 
-        if (fdd != NULL && atomic_read(&fdd->fdd_open_count)) {
-                LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
-                if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
-                        fdd->fdd_flags |= FILTER_FLAG_DESTROY;
-
-#ifdef ENABLE_ORPHANS
-                        filter_log_op_orphan(filter->fo_catalog, oa->o_id,
-                                             oa->o_generation,&fdd->fdd_cookie);
-#endif
-                        CDEBUG(D_INODE,
-                               "defer destroy of %dx open objid "LPU64"\n",
-                               atomic_read(&fdd->fdd_open_count), oa->o_id);
-                } else {
-                        CDEBUG(D_INODE,
-                               "repeat destroy of %dx open objid "LPU64"\n",
-                               atomic_read(&fdd->fdd_open_count), oa->o_id);
-                }
-                GOTO(cleanup, rc = 0);
-        }
-
         rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
 
 cleanup:
         switch(cleanup_phase) {
         case 3:
                 if (fcc != NULL)
-                        fsfilt_set_last_rcvd(obd, 0, oti->oti_handle,
-                                             filter_cancel_cookies_cb, fcc);
+                        fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
+                                              filter_cancel_cookies_cb, fcc);
                 rc = filter_finish_transno(exp, oti, rc);
                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
                 if (rc2) {
@@ -2020,8 +1816,7 @@ cleanup:
                         oti->oti_ack_locks[0].mode = LCK_PW;
                 }
         case 0:
-                pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-                class_export_put(exp);
+                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
                 break;
         default:
                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
@@ -2032,7 +1827,7 @@ cleanup:
 }
 
 /* NB start and end are used for punch, but not truncate */
-static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
+static int filter_truncate(struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_off start, obd_off end,
                            struct obd_trans_info *oti)
@@ -2047,15 +1842,57 @@ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
         oa->o_size = start;
-        error = filter_setattr(conn, oa, NULL, oti);
+        error = filter_setattr(exp, oa, NULL, oti);
         RETURN(error);
 }
 
-static int filter_syncfs(struct obd_export *exp)
+static int filter_sync(struct obd_export *exp, struct obdo *oa,
+                       struct lov_stripe_md *lsm, obd_off start, obd_off end)
 {
+        struct obd_run_ctxt saved;
+        struct filter_obd *filter;
+        struct dentry *dentry;
+        int rc, rc2;
         ENTRY;
 
-        RETURN(fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb));
+        filter = &exp->exp_obd->u.filter;
+
+        /* an objid of zero is taken to mean "sync whole filesystem" */
+        if (!oa || !oa->o_valid & OBD_MD_FLID) {
+                rc = fsfilt_sync(exp->exp_obd, filter->fo_sb);
+                GOTO(out_exp, rc);
+        }
+
+        dentry = filter_oa2dentry(exp->exp_obd, oa);
+        if (IS_ERR(dentry))
+                GOTO(out_exp, rc = PTR_ERR(dentry));
+
+        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+
+        down(&dentry->d_inode->i_sem);
+        rc = filemap_fdatasync(dentry->d_inode->i_mapping);
+        if (rc == 0) {
+                /* just any file to grab fsync method - "file" arg unused */
+                struct file *file = filter->fo_rcvd_filp;
+
+                if (file->f_op && file->f_op->fsync)
+                        rc = file->f_op->fsync(NULL, dentry, 1);
+
+                rc2 = filemap_fdatawait(dentry->d_inode->i_mapping);
+                if (!rc)
+                        rc = rc2;
+        }
+        up(&dentry->d_inode->i_sem);
+
+        oa->o_valid = OBD_MD_FLID;
+        obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
+
+        pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+
+        f_dput(dentry);
+out_exp:
+        class_export_put(exp);
+        RETURN(rc);
 }
 
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
@@ -2065,16 +1902,16 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
 }
 
-static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
+static int filter_get_info(struct obd_export *exp, __u32 keylen,
                            void *key, __u32 *vallen, void *val)
 {
         struct obd_device *obd;
         ENTRY;
 
-        obd = class_conn2obd(conn);
+        obd = class_exp2obd(exp);
         if (obd == NULL) {
                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
-                       conn->cookie);
+                       exp->exp_handle.h_cookie);
                 RETURN(-EINVAL);
         }
 
@@ -2094,22 +1931,33 @@ static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
                 RETURN(0);
         }
 
+        if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
+                obd_id *last_id = val;
+                /* FIXME: object groups */
+                *last_id = filter_last_id(&obd->u.filter, 0);
+                RETURN(0);
+        }
         CDEBUG(D_IOCTL, "invalid key\n");
         RETURN(-EINVAL);
 }
 
-static int filter_set_info(struct lustre_handle *conn, __u32 keylen,
+static int filter_set_info(struct obd_export *exp, __u32 keylen,
                            void *key, __u32 vallen, void *val)
 {
         struct obd_device *obd;
-        struct obd_export *exp;
-        struct obd_import *imp;
+        struct lustre_handle conn;
+#ifdef ENABLE_ORPHANS
+        struct llog_ctxt *ctxt;
+#endif
+        int rc = 0;
         ENTRY;
 
-        obd = class_conn2obd(conn);
+        conn.cookie = exp->exp_handle.h_cookie;
+
+        obd = exp->exp_obd;
         if (obd == NULL) {
-                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
-                       conn->cookie);
+                CDEBUG(D_IOCTL, "invalid exp %p cookie "LPX64"\n",
+                       exp, conn.cookie);
                 RETURN(-EINVAL);
         }
 
@@ -2117,29 +1965,21 @@ static int filter_set_info(struct lustre_handle *conn, __u32 keylen,
             memcmp(key, "mds_conn", keylen) != 0)
                 RETURN(-EINVAL);
 
-        CERROR("Received MDS connection ("LPX64")\n", conn->cookie);
-        memcpy(&obd->u.filter.fo_mdc_conn, conn, sizeof(*conn));
-
-        imp = obd->u.filter.fo_mdc_imp = class_new_import();
-
-        exp = class_conn2export(conn);
-        imp->imp_connection = ptlrpc_connection_addref(exp->exp_connection);
-        class_export_put(exp);
-
-        imp->imp_client = &obd->u.filter.fo_mdc_client;
-        imp->imp_remote_handle = *conn;
-        imp->imp_obd = obd;
-        imp->imp_dlm_fake = 1; /* XXX rename imp_dlm_fake to something else */
-        imp->imp_level = LUSTRE_CONN_FULL;
-        class_import_put(imp);
-
-        RETURN(0);
+        CWARN("Received MDS connection ("LPX64")\n", conn.cookie);
+        memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn));
+#ifdef ENABLE_ORPHANS
+        ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
+        rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+#endif
+        RETURN(rc);
 }
 
-int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
+int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
                      int len, void *karg, void *uarg)
 {
-        struct obd_device *obd = class_conn2obd(conn);
+        struct obd_device *obd = exp->exp_obd;
+        struct obd_ioctl_data *data = karg;
+        int rc = 0;
 
         switch (cmd) {
         case OBD_IOC_ABORT_RECOVERY:
@@ -2147,12 +1987,106 @@ int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
                 target_abort_recovery(obd);
                 RETURN(0);
 
+        case OBD_IOC_SET_READONLY: {
+                void *handle;
+                struct super_block *sb = obd->u.filter.fo_sb;
+                struct inode *inode = sb->s_root->d_inode;
+                BDEVNAME_DECLARE_STORAGE(tmp);
+                CERROR("setting device %s read-only\n",
+                       ll_bdevname(sb, tmp));
+                
+                handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
+                LASSERT(handle);
+                (void)fsfilt_commit(obd, inode, handle, 1);
+
+                dev_set_rdonly(ll_sbdev(obd->u.filter.fo_sb), 2);
+                RETURN(0);
+        }
+
+        case OBD_IOC_CATLOGLIST: {
+                rc = llog_catlog_list(obd, 1, data);
+                RETURN(rc);
+        }
+
+        case OBD_IOC_LLOG_CANCEL:
+        case OBD_IOC_LLOG_REMOVE: 
+        case OBD_IOC_LLOG_INFO:
+        case OBD_IOC_LLOG_PRINT: {
+                /* FIXME to be finished */
+                RETURN(-EOPNOTSUPP);
+/*
+                struct llog_ctxt *ctxt = NULL;
+                
+                push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+                rc = llog_ioctl(ctxt, cmd, data);
+                pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+                
+                RETURN(rc);
+*/
+        }
+
+
         default:
                 RETURN(-EINVAL);
         }
         RETURN(0);
 }
 
+static struct llog_operations filter_unlink_repl_logops;
+static struct llog_operations filter_size_orig_logops = {
+        lop_setup: llog_obd_origin_setup,
+        lop_cleanup: llog_obd_origin_cleanup,
+        lop_add: llog_obd_origin_add
+};
+
+static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
+                            int count, struct llog_logid *logid) 
+{
+        struct llog_ctxt *ctxt;
+        int rc;
+        ENTRY;
+        
+        filter_unlink_repl_logops = llog_client_ops;
+        filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel;
+        filter_unlink_repl_logops.lop_connect = llog_repl_connect;
+        filter_unlink_repl_logops.lop_sync = llog_obd_repl_sync;
+
+        rc = llog_setup(obd, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
+                        &filter_unlink_repl_logops);
+        if (rc)
+                RETURN(rc);
+        /* FIXME - assign unlink_cb for filter's recovery */
+        ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
+        ctxt->llog_proc_cb = filter_recov_log_unlink_cb;
+
+        rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+                        &filter_size_orig_logops);
+        RETURN(rc);
+}
+
+static int filter_llog_finish(struct obd_device *obd, int count)
+{
+        int rc;
+        ENTRY;
+        
+        rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_REPL_CTXT));
+        if (rc)
+                RETURN(rc);
+
+        rc = llog_cleanup(llog_get_context(obd, LLOG_SIZE_ORIG_CTXT));
+        RETURN(rc);
+}
+
+static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
+                                             void *data)
+{
+        return filter_fid2dentry(data, NULL, gr, id);
+}
+
+static struct lvfs_callback_ops filter_lvfs_ops = {
+        l_fid2dentry:     filter_lvfs_fid2dentry,
+};
+
 static struct obd_ops filter_obd_ops = {
         o_owner:          THIS_MODULE,
         o_attach:         filter_attach,
@@ -2160,23 +2094,25 @@ static struct obd_ops filter_obd_ops = {
         o_get_info:       filter_get_info,
         o_set_info:       filter_set_info,
         o_setup:          filter_setup,
+        o_postsetup:      filter_postsetup,
+        o_precleanup:     filter_precleanup,
         o_cleanup:        filter_cleanup,
         o_connect:        filter_connect,
         o_disconnect:     filter_disconnect,
         o_statfs:         filter_statfs,
-        o_syncfs:         filter_syncfs,
         o_getattr:        filter_getattr,
+        o_unpackmd:       filter_unpackmd,
         o_create:         filter_create,
         o_setattr:        filter_setattr,
         o_destroy:        filter_destroy,
-        o_open:           filter_open,
-        o_close:          filter_close,
         o_brw:            filter_brw,
         o_punch:          filter_truncate,
+        o_sync:           filter_sync,
         o_preprw:         filter_preprw,
         o_commitrw:       filter_commitrw,
-        o_log_cancel:     filter_log_cancel,
         o_destroy_export: filter_destroy_export,
+        o_llog_init:      filter_llog_init,
+        o_llog_finish:    filter_llog_finish,
         o_iocontrol:      filter_iocontrol,
 };
 
@@ -2187,23 +2123,25 @@ static struct obd_ops filter_sanobd_ops = {
         o_get_info:       filter_get_info,
         o_set_info:       filter_set_info,
         o_setup:          filter_san_setup,
+        o_precleanup:     filter_precleanup,
         o_cleanup:        filter_cleanup,
         o_connect:        filter_connect,
         o_disconnect:     filter_disconnect,
         o_statfs:         filter_statfs,
         o_getattr:        filter_getattr,
+        o_unpackmd:       filter_unpackmd,
         o_create:         filter_create,
         o_setattr:        filter_setattr,
         o_destroy:        filter_destroy,
-        o_open:           filter_open,
-        o_close:          filter_close,
         o_brw:            filter_brw,
         o_punch:          filter_truncate,
+        o_sync:           filter_sync,
         o_preprw:         filter_preprw,
         o_commitrw:       filter_commitrw,
-        o_log_cancel:     filter_log_cancel,
         o_san_preprw:     filter_san_preprw,
         o_destroy_export: filter_destroy_export,
+        o_llog_init:      filter_llog_init,
+        o_llog_finish:    filter_llog_finish,
         o_iocontrol:      filter_iocontrol,
 };
 
@@ -2212,7 +2150,7 @@ static int __init obdfilter_init(void)
         struct lprocfs_static_vars lvars;
         int rc;
 
-        printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
+        printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n");
 
         lprocfs_init_vars(filter, &lvars);