Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index 591005e..8486c22 100644 (file)
 #include <linux/random.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/lprocfs_status.h>
-
+#include <linux/version.h>
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+#include <linux/mount.h>
+#endif
 
 static kmem_cache_t *filter_open_cache;
 static kmem_cache_t *filter_dentry_cache;
@@ -64,6 +67,7 @@ struct xprocfs_io_stat {
         __u64    st_create_reqs;
         __u64    st_destroy_reqs;
         __u64    st_statfs_reqs;
+        __u64    st_syncfs_reqs;
         __u64    st_open_reqs;
         __u64    st_close_reqs;
         __u64    st_punch_reqs;
@@ -77,6 +81,7 @@ do {                                                            \
         xprocfs_iostats[smp_processor_id()].field += (count);   \
 } while (0)
 
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 #define DECLARE_XPROCFS_SUM_STAT(field)                 \
 static long long                                        \
 xprocfs_sum_##field (void)                              \
@@ -98,9 +103,11 @@ DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
 DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
 DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
 DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_syncfs_reqs)
 DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
 DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
 DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
+#endif
 
 static int
 xprocfs_rd_stat (char *page, char **start, off_t off, int count,
@@ -148,6 +155,7 @@ xprocfs_init (char *name)
                 return;
         }
 
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
         xprocfs_add_stat ("read_bytes",   xprocfs_sum_st_read_bytes);
         xprocfs_add_stat ("read_reqs",    xprocfs_sum_st_read_reqs);
         xprocfs_add_stat ("write_bytes",  xprocfs_sum_st_write_bytes);
@@ -157,9 +165,11 @@ xprocfs_init (char *name)
         xprocfs_add_stat ("create_reqs",  xprocfs_sum_st_create_reqs);
         xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
         xprocfs_add_stat ("statfs_reqs",  xprocfs_sum_st_statfs_reqs);
+        xprocfs_add_stat ("syncfs_reqs",  xprocfs_sum_st_syncfs_reqs);
         xprocfs_add_stat ("open_reqs",    xprocfs_sum_st_open_reqs);
         xprocfs_add_stat ("close_reqs",   xprocfs_sum_st_close_reqs);
         xprocfs_add_stat ("punch_reqs",   xprocfs_sum_st_punch_reqs);
+#endif
 }
 
 void xprocfs_fini (void)
@@ -176,6 +186,7 @@ void xprocfs_fini (void)
         remove_proc_entry ("create_reqs",  xprocfs_dir);
         remove_proc_entry ("destroy_reqs", xprocfs_dir);
         remove_proc_entry ("statfs_reqs",  xprocfs_dir);
+        remove_proc_entry ("syncfs_reqs",  xprocfs_dir);
         remove_proc_entry ("open_reqs",    xprocfs_dir);
         remove_proc_entry ("close_reqs",   xprocfs_dir);
         remove_proc_entry ("punch_reqs",   xprocfs_dir);
@@ -212,10 +223,12 @@ static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd,
 
 void filter_start_transno(struct obd_export *export)
 {
+#ifdef FILTER_TRANSNO_SEM
         struct obd_device * obd = export->exp_obd;
         ENTRY;
 
         down(&obd->u.filter.fo_transno_sem);
+#endif
 }
 
 /* Assumes caller has already pushed us into the kernel context. */
@@ -231,8 +244,16 @@ int filter_finish_transno(struct obd_export *export, void *handle,
         ssize_t written;
 
         /* Propagate error code. */
-        if (rc)
-                GOTO(out, rc);
+        if (rc) {
+#ifdef FILTER_TRANSNO_SEM
+                up(&filter->fo_transno_sem);
+#endif
+                RETURN(rc);
+        }
+
+        if (!(obd->obd_flags & OBD_REPLAYABLE)) {
+                RETURN(0);
+        }
 
         /* we don't allocate new transnos for replayed requests */
 #if 0
@@ -241,13 +262,20 @@ int filter_finish_transno(struct obd_export *export, void *handle,
                 GOTO(out, rc = 0);
 #endif
 
-        off = FILTER_LR_CLIENT_START + fed->fed_lr_off * FILTER_LR_CLIENT_SIZE;
+        off = fed->fed_lr_off;
 
-        last_rcvd = ++filter->fo_fsd->fsd_last_rcvd;
+#ifndef FILTER_TRANSNO_SEM
+        spin_lock(&filter->fo_translock);
+#endif
+        last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
+        filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
+#ifndef FILTER_TRANSNO_SEM
+        spin_unlock(&filter->fo_translock);
+#endif
         if (oti)
                 oti->oti_transno = last_rcvd;
         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
-        fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
+        fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
 
         /* get this from oti */
 #if 0
@@ -261,27 +289,31 @@ int filter_finish_transno(struct obd_export *export, void *handle,
         written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
                                 &off);
         CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
-               LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_off, written);
+               LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
 
+#ifdef FILTER_TRANSNO_SEM
+        up(&filter->fo_transno_sem);
+#endif
         if (written == sizeof(*fcd))
-                GOTO(out, rc = 0);
-        CERROR("error writing to last_rcvd file: rc = %d\n", rc);
+                RETURN(0);
+        CERROR("error writing to last_rcvd file: rc = %d\n", written);
         if (written >= 0)
-                GOTO(out, rc = -EIO);
-
-        rc = 0;
-
-        EXIT;
- out:
+                RETURN(-EIO);
 
-        up(&filter->fo_transno_sem);
-        return rc;
+        RETURN(written);
 }
 
 /* write the pathname into the string */
-static int filter_id(char *buf, obd_id id, obd_mode mode)
+static char *filter_id(char *buf, struct filter_obd *filter, obd_id id,
+                     obd_mode mode)
 {
-        return sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
+        if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
+                sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
+        else
+                sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode),
+                       (int)id & (filter->fo_subdir_count - 1), id);
+
+        return buf;
 }
 
 static inline void f_dput(struct dentry *dentry)
@@ -312,56 +344,60 @@ struct dentry_operations filter_dops = {
 #define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
 #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
 
-static unsigned long filter_last_rcvd_slots[FILTER_LR_MAX_CLIENT_WORDS];
-
 /* Add client data to the FILTER.  We use a bitmap to locate a free space
- * in the last_rcvd file if cl_off is -1 (i.e. a new client).
+ * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
  * Otherwise, we have just read the data from the last_rcvd file and
  * we know its offset.
  */
 int filter_client_add(struct filter_obd *filter,
-                      struct filter_export_data *fed, int cl_off)
+                      struct filter_export_data *fed, int cl_idx)
 {
-        int new_client = (cl_off == -1);
+        int new_client = (cl_idx == -1);
+
+        LASSERT(filter->fo_last_rcvd_slots != NULL);
 
-        /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
+        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
          * there's no need for extra complication here
          */
         if (new_client) {
-                cl_off = find_first_zero_bit(filter_last_rcvd_slots,
+                cl_idx = find_first_zero_bit(filter->fo_last_rcvd_slots,
                                              FILTER_LR_MAX_CLIENTS);
         repeat:
-                if (cl_off >= FILTER_LR_MAX_CLIENTS) {
+                if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
                         return -ENOMEM;
                 }
-                if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
+                if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
                         CERROR("FILTER client %d: found bit is set in bitmap\n",
-                               cl_off);
-                        cl_off = find_next_zero_bit(filter_last_rcvd_slots,
+                               cl_idx);
+                        cl_idx = find_next_zero_bit(filter->fo_last_rcvd_slots,
                                                     FILTER_LR_MAX_CLIENTS,
-                                                    cl_off);
+                                                    cl_idx);
                         goto repeat;
                 }
         } else {
-                if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
+                if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
                         CERROR("FILTER client %d: bit already set in bitmap!\n",
-                               cl_off);
+                               cl_idx);
                         LBUG();
                 }
         }
 
-        CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
-               cl_off, fed->fed_fcd->fcd_uuid);
+        fed->fed_lr_idx = cl_idx;
+        fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
+                cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
 
-        fed->fed_lr_off = cl_off;
+        CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
+               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
 
         if (new_client) {
                 struct obd_run_ctxt saved;
-                loff_t off = FILTER_LR_CLIENT_START +
-                        (cl_off * FILTER_LR_CLIENT_SIZE);
+                loff_t off = fed->fed_lr_off;
                 ssize_t written;
 
+                CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
+                       fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
+
                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
                 written = lustre_fwrite(filter->fo_rcvd_filp,
                                                 (char *)fed->fed_fcd,
@@ -373,9 +409,6 @@ int filter_client_add(struct filter_obd *filter,
                                 RETURN(written);
                         RETURN(-EIO);
                 }
-                CDEBUG(D_INFO, "wrote client fcd at off %u (len %u)\n",
-                       FILTER_LR_CLIENT_START + (cl_off*FILTER_LR_CLIENT_SIZE),
-                       (unsigned int)sizeof(*fed->fed_fcd));
         }
         return 0;
 }
@@ -392,14 +425,16 @@ int filter_client_free(struct obd_export *exp)
         if (!fed->fed_fcd)
                 RETURN(0);
 
-        off = FILTER_LR_CLIENT_START + (fed->fed_lr_off*FILTER_LR_CLIENT_SIZE);
+        LASSERT(filter->fo_last_rcvd_slots != NULL);
+
+        off = fed->fed_lr_off;
 
-        CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
-               fed->fed_lr_off, off, fed->fed_fcd->fcd_uuid);
+        CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
+               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
 
-        if (!test_and_clear_bit(fed->fed_lr_off, filter_last_rcvd_slots)) {
+        if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
-                       fed->fed_lr_off);
+                       fed->fed_lr_idx);
                 LBUG();
         }
 
@@ -409,21 +444,17 @@ int filter_client_free(struct obd_export *exp)
                                 sizeof(zero_fcd), &off);
 
         /* XXX: this write gets lost sometimes, unless this sync is here. */
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
-#else
-        file_fsync(filter->fo_rcvd_filp,  filter->fo_rcvd_filp->f_dentry, 1);
-#endif
+        file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
 
         if (written != sizeof(zero_fcd)) {
-                CERROR("error zeroing out client %s off %d in %s: %d\n",
-                       fed->fed_fcd->fcd_uuid, fed->fed_lr_off, LAST_RCVD,
-                       written);
+                CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
+                       fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
+                       LAST_RCVD, written);
         } else {
                 CDEBUG(D_INFO,
-                       "zeroed disconnecting client %s at off %d ("LPX64")\n",
-                       fed->fed_fcd->fcd_uuid, fed->fed_lr_off, off);
+                       "zeroed disconnecting client %s at idx %u (%llu)\n",
+                       fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
         }
 
         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
@@ -431,49 +462,34 @@ int filter_client_free(struct obd_export *exp)
         return 0;
 }
 
-static void filter_unpack_fsd(struct filter_server_data *fsd)
-{
-        fsd->fsd_last_objid = le64_to_cpu(fsd->fsd_last_objid);
-        fsd->fsd_last_rcvd = le64_to_cpu(fsd->fsd_last_rcvd);
-        fsd->fsd_mount_count = le64_to_cpu(fsd->fsd_mount_count);
-}
-
-static void filter_pack_fsd(struct filter_server_data *disk_fsd,
-                            struct filter_server_data *fsd)
-{
-        memset(disk_fsd, 0, sizeof(*disk_fsd));
-        memcpy(disk_fsd->fsd_uuid, fsd->fsd_uuid, sizeof(fsd->fsd_uuid));
-        disk_fsd->fsd_last_objid = cpu_to_le64(fsd->fsd_last_objid);
-        disk_fsd->fsd_last_rcvd = cpu_to_le64(fsd->fsd_last_rcvd);
-        disk_fsd->fsd_mount_count = cpu_to_le64(fsd->fsd_mount_count);
-}
-
 static int filter_free_server_data(struct filter_obd *filter)
 {
         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
         filter->fo_fsd = NULL;
-
+        OBD_FREE(filter->fo_last_rcvd_slots,
+                 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
+        filter->fo_last_rcvd_slots = NULL;
         return 0;
 }
 
 
-/* assumes caller has already in kernel ctxt */
+/* assumes caller is already in kernel ctxt */
 static int filter_update_server_data(struct file *filp,
                                      struct filter_server_data *fsd)
 {
-        struct filter_server_data disk_fsd;
         loff_t off = 0;
         int rc;
 
         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
-        CDEBUG(D_INODE, "server last_objid: "LPU64"\n", fsd->fsd_last_objid);
-        CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n", fsd->fsd_last_rcvd);
-        CDEBUG(D_INODE, "server last_mount: "LPU64"\n", fsd->fsd_mount_count);
-
-        filter_pack_fsd(&disk_fsd, fsd);
-        rc = lustre_fwrite(filp, (char *)&disk_fsd,
-                           sizeof(disk_fsd), &off);
-        if (rc != sizeof(disk_fsd)) {
+        CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
+               le64_to_cpu(fsd->fsd_last_objid));
+        CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
+               le64_to_cpu(fsd->fsd_last_rcvd));
+        CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
+               le64_to_cpu(fsd->fsd_mount_count));
+
+        rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
+        if (rc != sizeof(*fsd)) {
                 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
                        rc);
                 RETURN(-EIO);
@@ -482,8 +498,7 @@ static int filter_update_server_data(struct file *filp,
 }
 
 /* assumes caller has already in kernel ctxt */
-static int filter_init_server_data(struct obd_device *obd,
-                                   struct file * filp,
+static int filter_init_server_data(struct obd_device *obd, struct file * filp,
                                    __u64 init_lastobjid)
 {
         struct filter_obd *filter = &obd->u.filter;
@@ -491,7 +506,8 @@ static int filter_init_server_data(struct obd_device *obd,
         struct filter_client_data *fcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = inode->i_size;
-        int cl_off;
+        __u64 mount_count;
+        int cl_idx;
         loff_t off = 0;
         int rc;
 
@@ -506,125 +522,161 @@ static int filter_init_server_data(struct obd_device *obd,
                 RETURN(-ENOMEM);
         filter->fo_fsd = fsd;
 
+        OBD_ALLOC(filter->fo_last_rcvd_slots, 
+                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
+        if (filter->fo_last_rcvd_slots == NULL) {
+                OBD_FREE(fsd, sizeof(*fsd));
+                RETURN(-ENOMEM);
+        }
+
         if (last_rcvd_size == 0) {
                 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
 
                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
-                fsd->fsd_last_objid = init_lastobjid;
+                fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
                 fsd->fsd_last_rcvd = 0;
-                fsd->fsd_mount_count = 0;
-
+                mount_count = fsd->fsd_mount_count = 0;
+                fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
+                fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
+                fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
+                fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
+                filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
         } else {
-                ssize_t  retval = lustre_fread(filp, (char *)fsd,
-                                              sizeof(*fsd),
+                ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
                                               &off);
                 if (retval != sizeof(*fsd)) {
                         CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
                         GOTO(out, rc = -EIO);
                 }
-                filter_unpack_fsd(fsd);
+                mount_count = le64_to_cpu(fsd->fsd_mount_count);
+                filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
+        }
+
+        if (fsd->fsd_feature_incompat) {
+                CERROR("unsupported feature %x\n",
+                       le32_to_cpu(fsd->fsd_feature_incompat));
+                RETURN(-EINVAL);
+        }
+        if (fsd->fsd_feature_rocompat) {
+                CERROR("read-only feature %x\n",
+                       le32_to_cpu(fsd->fsd_feature_rocompat));
+                /* Do something like remount filesystem read-only */
+                RETURN(-EINVAL);
         }
 
         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
-               obd->obd_name, fsd->fsd_last_objid);
+               obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
-               obd->obd_name, fsd->fsd_last_rcvd);
+               obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
-               obd->obd_name, fsd->fsd_mount_count);
+               obd->obd_name, mount_count);
+        CDEBUG(D_INODE, "%s: server data size: %u\n",
+               obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
+        CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+               obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
+        CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+               obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
+        CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
+               obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
 
         /*
          * When we do a clean FILTER shutdown, we save the last_rcvd into
          * the header.  If we find clients with higher last_rcvd values
          * then those clients may need recovery done.
          */
-        /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
-       for (off = FILTER_LR_CLIENT_START, cl_off = 0; off < last_rcvd_size;
-            cl_off++) {
-                __u64 last_rcvd;
-                int mount_age;
-
-                if (!fcd) {
-                        OBD_ALLOC(fcd, sizeof(*fcd));
-                        if (!fcd)
-                                GOTO(err_fsd, rc = -ENOMEM);
-                }
-
-                rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
-                if (rc != sizeof(*fcd)) {
-                        CERROR("error reading FILTER %s offset %d: rc = %d\n",
-                               LAST_RCVD, cl_off, rc);
-                        if (rc > 0) /* XXX fatal error or just abort reading? */
-                                rc = -EIO;
-                        break;
-                }
-
-                if (fcd->fcd_uuid[0] == '\0') {
-                        CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
-                               cl_off);
-                        continue;
-                }
+        if (obd->obd_flags & OBD_REPLAYABLE) {
+                for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
+                        __u64 last_rcvd;
+                        int mount_age;
+
+                        if (!fcd) {
+                                OBD_ALLOC(fcd, sizeof(*fcd));
+                                if (!fcd)
+                                        GOTO(err_fsd, rc = -ENOMEM);
+                        }
 
-                last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
+                        /* Don't assume off is incremented properly, in case
+                         * sizeof(fsd) isn't the same as fsd->fsd_client_size.
+                         */
+                        off = le32_to_cpu(fsd->fsd_client_start) +
+                                cl_idx * le16_to_cpu(fsd->fsd_client_size);
+                        rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
+                        if (rc != sizeof(*fcd)) {
+                                CERROR("error reading FILTER %s offset %d: rc = %d\n",
+                                       LAST_RCVD, cl_idx, rc);
+                                if (rc > 0) /* XXX fatal error or just abort reading? */
+                                        rc = -EIO;
+                                break;
+                        }
 
-                /* These exports are cleaned up by filter_disconnect(), so they
-                 * need to be set up like real exports as filter_connect() does.
-                 */
-                mount_age = fsd->fsd_mount_count -
-                        le64_to_cpu(fcd->fcd_mount_count);
-                if (mount_age < FILTER_MOUNT_RECOV) {
-                        CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
-                               "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
-                               "\n", fcd->fcd_uuid, cl_off,
-                               last_rcvd, fsd->fsd_last_rcvd,
-                               le64_to_cpu(fcd->fcd_mount_count),
-                               fsd->fsd_mount_count);
-#if 0
-                        /* disabled until OST recovery is actually working */
-                        struct obd_export *exp = class_new_export(obd);
-                        struct filter_export_data *fed;
+                        if (fcd->fcd_uuid[0] == '\0') {
+                                CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+                                       cl_idx);
+                                continue;
+                        }
 
-                        if (!exp) {
-                                rc = -ENOMEM;
-                                break;
+                        last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
+
+                        /* These exports are cleaned up by filter_disconnect(), so they
+                         * need to be set up like real exports as filter_connect() does.
+                         */
+                        mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
+                        if (mount_age < FILTER_MOUNT_RECOV) {
+                                struct obd_export *exp = class_new_export(obd);
+                                struct filter_export_data *fed;
+                                CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                                       " srv lr: "LPU64" mnt: "LPU64" last mount: "
+                                       LPU64"\n", fcd->fcd_uuid, cl_idx,
+                                       last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
+                                       le64_to_cpu(fcd->fcd_mount_count), mount_count);
+                                /* disabled until OST recovery is actually working */
+
+                                if (!exp) {
+                                        rc = -ENOMEM;
+                                        break;
+                                }
+                                memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
+                                       sizeof exp->exp_client_uuid.uuid);
+                                fed = &exp->exp_filter_data;
+                                fed->fed_fcd = fcd;
+                                filter_client_add(filter, fed, cl_idx);
+                                /* create helper if export init gets more complex */
+                                INIT_LIST_HEAD(&fed->fed_open_head);
+                                spin_lock_init(&fed->fed_lock);
+
+                                fcd = NULL;
+                                obd->obd_recoverable_clients++;
+                        } else {
+                                CDEBUG(D_INFO,
+                                       "discarded client %d UUID '%s' count "LPU64"\n",
+                                       cl_idx, fcd->fcd_uuid,
+                                       le64_to_cpu(fcd->fcd_mount_count));
                         }
 
-                        fed = &exp->exp_filter_data;
-                        fed->fed_fcd = fcd;
-                        filter_client_add(filter, fed, cl_off);
-                        /* create helper if export init gets more complex */
-                        INIT_LIST_HEAD(&fed->fed_open_head);
-                        spin_lock_init(&fed->fed_lock);
+                        CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
+                               cl_idx, last_rcvd);
 
-                        fcd = NULL;
-                        filter->fo_recoverable_clients++;
-#endif
-                } else {
-                        CDEBUG(D_INFO,
-                               "discarded client %d, UUID '%s', count %Ld\n",
-                               cl_off, fcd->fcd_uuid,
-                               (long long)le64_to_cpu(fcd->fcd_mount_count));
+                        if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
+                                filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
                 }
 
-                CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
-                       cl_off, (unsigned long long)last_rcvd);
+                obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
+                if (obd->obd_recoverable_clients) {
+                        CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
+                               obd->obd_recoverable_clients,
+                               le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
+                        obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+                        obd->obd_flags |= OBD_RECOVERING;
+                }
 
-                if (last_rcvd > filter->fo_fsd->fsd_last_rcvd)
-                        filter->fo_fsd->fsd_last_rcvd = last_rcvd;
-        }
+                if (fcd)
+                        OBD_FREE(fcd, sizeof(*fcd));
 
-        obd->obd_last_committed = filter->fo_fsd->fsd_last_rcvd;
-        if (filter->fo_recoverable_clients) {
-                CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
-                       filter->fo_recoverable_clients,
-                       filter->fo_fsd->fsd_last_rcvd);
-                filter->fo_next_recovery_transno = obd->obd_last_committed + 1;
-                obd->obd_flags |= OBD_RECOVERING;
+        } else {
+                CERROR("%s: recovery support OFF\n", obd->obd_name);
         }
 
-        if (fcd)
-                OBD_FREE(fcd, sizeof(*fcd));
-
-        fsd->fsd_mount_count++;
+        fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
 
         /* save it,so mount count and last_recvd is current */
         rc = filter_update_server_data(filp, filter->fo_fsd);
@@ -642,9 +694,10 @@ static int filter_prep(struct obd_device *obd)
 {
         struct obd_run_ctxt saved;
         struct filter_obd *filter = &obd->u.filter;
-        struct dentry *dentry;
+        struct dentry *dentry, *O_dentry;
         struct file *file;
         struct inode *inode;
+        int i;
         int rc = 0;
         int mode = 0;
 
@@ -662,6 +715,7 @@ static int filter_prep(struct obd_device *obd)
          * Create directories and/or get dentries for each object type.
          * This saves us from having to do multiple lookups for each one.
          */
+        O_dentry = filter->fo_dentry_O;
         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
                 char *name = obd_type_by_mode[mode];
 
@@ -669,22 +723,22 @@ static int filter_prep(struct obd_device *obd)
                         filter->fo_dentry_O_mode[mode] = NULL;
                         continue;
                 }
-                dentry = simple_mkdir(filter->fo_dentry_O, name, 0700);
+                dentry = simple_mkdir(O_dentry, name, 0700);
                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
                 if (IS_ERR(dentry)) {
                         rc = PTR_ERR(dentry);
                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
-                        GOTO(out_O_mode, rc);
+                        GOTO(err_O_mode, rc);
                 }
                 filter->fo_dentry_O_mode[mode] = dentry;
         }
 
         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
-        if ( !file || IS_ERR(file) ) {
+        if (!file || IS_ERR(file)) {
                 rc = PTR_ERR(file);
                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
                        LAST_RCVD, rc);
-                GOTO(out_O_mode, rc);
+                GOTO(err_O_mode, rc);
         }
 
         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
@@ -711,19 +765,50 @@ static int filter_prep(struct obd_device *obd)
         }
         filter->fo_rcvd_filp = file;
 
+        if (filter->fo_subdir_count) {
+                O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
+                OBD_ALLOC(filter->fo_dentry_O_sub,
+                          FILTER_SUBDIR_COUNT * sizeof(dentry));
+                if (!filter->fo_dentry_O_sub)
+                        GOTO(err_client, rc = -ENOMEM);
+
+                for (i = 0; i < filter->fo_subdir_count; i++) {
+                        char dir[20];
+                        snprintf(dir, sizeof(dir), "d%u", i);
+
+                        dentry = simple_mkdir(O_dentry, dir, 0700);
+                        CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
+                        if (IS_ERR(dentry)) {
+                                rc = PTR_ERR(dentry);
+                                CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
+                                GOTO(err_O_sub, rc);
+                        }
+                        filter->fo_dentry_O_sub[i] = dentry;
+                }
+        }
         rc = 0;
  out:
         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
 
         return(rc);
 
+err_O_sub:
+        while (i-- > 0) {
+                struct dentry *dentry = filter->fo_dentry_O_sub[i];
+                if (dentry) {
+                        f_dput(dentry);
+                        filter->fo_dentry_O_sub[i] = NULL;
+                }
+        }
+        OBD_FREE(filter->fo_dentry_O_sub,
+                 filter->fo_subdir_count * sizeof(dentry));
 err_client:
         class_disconnect_all(obd);
 err_filp:
         if (filp_close(file, 0))
                 CERROR("can't close %s after error\n", LAST_RCVD);
         filter->fo_rcvd_filp = NULL;
- out_O_mode:
+err_O_mode:
         while (mode-- > 0) {
                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
                 if (dentry) {
@@ -752,23 +837,28 @@ static void filter_post(struct obd_device *obd)
         rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
         if (rc)
                 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
-        filter_free_server_data(filter);
 
 
         if (filter->fo_rcvd_filp) {
-                /* broken sync at umount bug workaround  */
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-                rc = fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
-#else
                 rc = file_fsync(filter->fo_rcvd_filp,
                                 filter->fo_rcvd_filp->f_dentry, 1);
-#endif
                 filp_close(filter->fo_rcvd_filp, 0);
                 filter->fo_rcvd_filp = NULL;
                 if (rc)
-                        CERROR("last_rcvd file won't closek rc = %ld\n", rc);
+                        CERROR("last_rcvd file won't closed rc = %ld\n", rc);
         }
 
+        if (filter->fo_subdir_count) {
+                int i;
+                for (i = 0; i < filter->fo_subdir_count; i++) {
+                        struct dentry *dentry = filter->fo_dentry_O_sub[i];
+                        f_dput(dentry);
+                        filter->fo_dentry_O_sub[i] = NULL;
+                }
+                OBD_FREE(filter->fo_dentry_O_sub,
+                         filter->fo_subdir_count *
+                         sizeof(*filter->fo_dentry_O_sub));
+        }
         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
                 if (dentry) {
@@ -777,6 +867,7 @@ static void filter_post(struct obd_device *obd)
                 }
         }
         f_dput(filter->fo_dentry_O);
+        filter_free_server_data(filter);
         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
 }
 
@@ -787,7 +878,8 @@ static __u64 filter_next_id(struct obd_device *obd)
         LASSERT(obd->u.filter.fo_fsd != NULL);
 
         spin_lock(&obd->u.filter.fo_objidlock);
-        id = ++obd->u.filter.fo_fsd->fsd_last_objid;
+        id = le64_to_cpu(obd->u.filter.fo_fsd->fsd_last_objid);
+        obd->u.filter.fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
         spin_unlock(&obd->u.filter.fo_objidlock);
 
         return id;
@@ -839,12 +931,15 @@ static struct dentry *filter_fid2dentry(struct obd_device *obd,
 }
 
 static inline struct dentry *filter_parent(struct obd_device *obd,
-                                           obd_mode mode)
+                                           obd_mode mode, obd_id objid)
 {
         struct filter_obd *filter = &obd->u.filter;
 
         LASSERT((mode & S_IFMT) == S_IFREG);   /* only regular files for now */
-        return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+        if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
+                return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+
+        return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
 }
 
 static struct file *filter_obj_open(struct obd_export *export,
@@ -890,9 +985,9 @@ static struct file *filter_obj_open(struct obd_export *export,
                 GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
         }
 
-        filter_id(name, id, type);
         push_ctxt(&saved, &filter->fo_ctxt, NULL);
-        file = filp_open(name, O_RDWR | O_LARGEFILE, 0 /* type? */);
+        file = filp_open(filter_id(name, filter, id, type),
+                         O_RDWR | O_LARGEFILE, type);
         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
 
         if (IS_ERR(file)) {
@@ -909,11 +1004,12 @@ static struct file *filter_obj_open(struct obd_export *export,
                 LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
                 /* should only happen during client recovery */
                 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
-                        CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id);
+                        CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
                 atomic_inc(&fdd->fdd_open_count);
         } else {
                 atomic_set(&fdd->fdd_open_count, 1);
                 fdd->fdd_flags = 0;
+                fdd->fdd_objid = id;
                 /* If this is racy, then we can use {cmp}xchg and atomic_add */
                 dentry->d_fsdata = fdd;
                 spin_unlock(&filter->fo_fddlock);
@@ -921,6 +1017,7 @@ static struct file *filter_obj_open(struct obd_export *export,
 
         get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
         ffd->ffd_file = file;
+        LASSERT(file->private_data == NULL);
         file->private_data = ffd;
 
         if (!dentry->d_op)
@@ -932,7 +1029,7 @@ static struct file *filter_obj_open(struct obd_export *export,
         list_add(&ffd->ffd_export_list, &fed->fed_open_head);
         spin_unlock(&fed->fed_lock);
 
-        CDEBUG(D_INODE, "opened objid "LPX64": rc = %p\n", id, file);
+        CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
         EXIT;
 out:
         return file;
@@ -991,7 +1088,7 @@ static int filter_close_internal(struct obd_export *export,
 
         if (atomic_dec_and_test(&fdd->fdd_open_count) &&
             fdd->fdd_flags & FILTER_FLAG_DESTROY) {
-                struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
+                struct dentry *dir_dentry = filter_parent(obd, S_IFREG, fdd->fdd_objid);
                 struct obd_run_ctxt saved;
                 void *handle;
 
@@ -1029,7 +1126,8 @@ static int filter_close_internal(struct obd_export *export,
 
 /* obd methods */
 /* mount the file system (secretly) */
-static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
+static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
+                               char *option)
 {
         struct obd_ioctl_data* data = buf;
         struct filter_obd *filter;
@@ -1038,21 +1136,25 @@ static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
         ENTRY;
 
         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
-                RETURN(rc = -EINVAL);
+                RETURN(-EINVAL);
 
         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
         if (IS_ERR(obd->obd_fsops))
-                RETURN(rc = PTR_ERR(obd->obd_fsops));
+                RETURN(PTR_ERR(obd->obd_fsops));
 
-        mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
+        mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
         rc = PTR_ERR(mnt);
-        if (IS_ERR(mnt))
+        if (IS_ERR(mnt)) {
+                CERROR("mount of %s as type %s failed: rc %d\n",
+                       data->ioc_inlbuf2, data->ioc_inlbuf1, rc);
                 GOTO(err_ops, rc);
+        }
 
+#if OST_RECOVERY
         obd->obd_flags |= OBD_REPLAYABLE;
+#endif
 
         filter = &obd->u.filter;;
-        init_MUTEX(&filter->fo_transno_sem);
         filter->fo_vfsmnt = mnt;
         filter->fo_fstype = strdup(data->ioc_inlbuf2);
         filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
@@ -1067,6 +1169,11 @@ static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
         if (rc)
                 GOTO(err_kfree, rc);
 
+#ifdef FILTER_TRANSNO_SEM
+        init_MUTEX(&filter->fo_transno_sem);
+#else
+        spin_lock_init(&filter->fo_translock);
+#endif
         spin_lock_init(&filter->fo_fddlock);
         spin_lock_init(&filter->fo_objidlock);
         INIT_LIST_HEAD(&filter->fo_export_list);
@@ -1094,6 +1201,29 @@ err_ops:
         return rc;
 }
 
+static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
+{
+        return filter_common_setup(obd, len, buf, NULL);
+}
+
+/* sanobd setup methods - use a specific mount option */
+static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
+{
+        struct obd_ioctl_data* data = buf;
+        char *option = NULL;
+
+        if (!data->ioc_inlbuf2)
+                RETURN(-EINVAL);
+
+        /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
+        if (!strcmp(data->ioc_inlbuf2, "extN") ||
+            !strcmp(data->ioc_inlbuf2, "ext3"))
+                option = "data=writeback";
+        else
+                LBUG(); /* just a reminder */
+
+        return filter_common_setup(obd, len, buf, option);
+}
 
 static int filter_cleanup(struct obd_device *obd)
 {
@@ -1178,9 +1308,11 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
         INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
         spin_lock_init(&exp->exp_filter_data.fed_lock);
 
-        rc = filter_client_add(filter, fed, -1);
-        if (rc)
-                GOTO(out_fcd, rc);
+        if (obd->obd_flags & OBD_REPLAYABLE) {
+                rc = filter_client_add(filter, fed, -1);
+                if (rc)
+                        GOTO(out_fcd, rc);
+        }
 
         RETURN(rc);
 
@@ -1222,7 +1354,9 @@ static int filter_disconnect(struct lustre_handle *conn)
         spin_unlock(&fed->fed_lock);
 
         ldlm_cancel_locks_for_export(exp);
-        filter_client_free(exp);
+
+        if (exp->exp_obd->obd_flags & OBD_REPLAYABLE) 
+                filter_client_free(exp);
 
         rc = class_disconnect(conn);
 
@@ -1235,7 +1369,7 @@ static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
         int type = oa->o_mode & S_IFMT;
         ENTRY;
 
-        CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPX64" valid 0x%08x\n",
+        CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
                inode->i_ino, inode, oa->o_id, valid);
         /* Don't copy the inode number in place of the object ID */
         obdo_from_inode(oa, inode, valid);
@@ -1289,17 +1423,18 @@ static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
                         CERROR("invalid client "LPX64"\n", conn->addr);
                         RETURN(ERR_PTR(-EINVAL));
                 }
-                dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
+                dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode,
+                                                              oa->o_id),
                                            oa->o_id, locked);
         }
 
         if (IS_ERR(dentry)) {
-                CERROR("%s error looking up object: "LPX64"\n", what, oa->o_id);
+                CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
                 RETURN(dentry);
         }
 
         if (!dentry->d_inode) {
-                CERROR("%s on non-existent object: "LPX64"\n", what, oa->o_id);
+                CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
                 f_dput(dentry);
                 LBUG();
                 RETURN(ERR_PTR(-ENOENT));
@@ -1446,7 +1581,7 @@ static int filter_close(struct lustre_handle *conn, struct obdo *oa,
         XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
 
         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
-                CERROR("no handle for close of objid "LPX64"\n", oa->o_id);
+                CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
                 RETURN(-EINVAL);
         }
 
@@ -1492,17 +1627,18 @@ static int filter_create(struct lustre_handle *conn, struct obdo *oa,
         oa->o_id = filter_next_id(obd);
 
         push_ctxt(&saved, &filter->fo_ctxt, NULL);
-        dir_dentry = filter_parent(obd, oa->o_mode);
+        dir_dentry = filter_parent(obd, S_IFREG, oa->o_id);
         down(&dir_dentry->d_inode->i_sem);
         new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
         if (IS_ERR(new))
                 GOTO(out, rc = PTR_ERR(new));
 
         if (new->d_inode) {
+                char buf[32];
+
                 /* This would only happen if lastobjid was bad on disk */
-                CERROR("objid O/%*s/"LPU64" already exists\n",
-                       dir_dentry->d_name.len, dir_dentry->d_name.name,
-                       oa->o_id);
+                CERROR("objid %s already exists\n",
+                       filter_id(buf, filter, S_IFREG, oa->o_id));
                 LBUG();
                 GOTO(out, rc = -EEXIST);
         }
@@ -1568,9 +1704,9 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
 
         XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
 
-        CDEBUG(D_INODE, "destroying objid "LPX64"\n", oa->o_id);
+        CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
 
-        dir_dentry = filter_parent(obd, oa->o_mode);
+        dir_dentry = filter_parent(obd, oa->o_mode, oa->o_id);
         down(&dir_dentry->d_inode->i_sem);
 
         object_dentry = filter_oa2dentry(conn, oa, 0);
@@ -1591,11 +1727,11 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
                         fdd->fdd_flags |= FILTER_FLAG_DESTROY;
                         /* XXX put into PENDING directory in case of crash */
                         CDEBUG(D_INODE,
-                               "defer destroy of %dx open objid "LPX64"\n",
+                               "defer destroy of %dx open objid "LPU64"\n",
                                atomic_read(&fdd->fdd_open_count), oa->o_id);
                 } else
                         CDEBUG(D_INODE,
-                               "repeat destroy of %dx open objid "LPX64"\n",
+                               "repeat destroy of %dx open objid "LPU64"\n",
                                atomic_read(&fdd->fdd_open_count), oa->o_id);
                 GOTO(out_commit, rc = 0);
         }
@@ -1635,7 +1771,7 @@ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
         if (end != OBD_OBJECT_EOF)
                 CERROR("PUNCH not supported, only truncate works\n");
 
-        CDEBUG(D_INODE, "calling truncate for object "LPX64", valid = %x, "
+        CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
         oa->o_size = start;
         error = filter_setattr(conn, oa, NULL, oti);
@@ -1650,9 +1786,9 @@ static inline void lustre_put_page(struct page *page)
 
 
 static struct page *
-lustre_get_page_read(struct inode *inode, struct niobuf_remote *rnb)
+lustre_get_page_read(struct inode *inode, struct niobuf_local *lnb)
 {
-        unsigned long index = rnb->offset >> PAGE_SHIFT;
+        unsigned long index = lnb->offset >> PAGE_SHIFT;
         struct address_space *mapping = inode->i_mapping;
         struct page *page;
         int rc;
@@ -1661,7 +1797,8 @@ lustre_get_page_read(struct inode *inode, struct niobuf_remote *rnb)
                                (filler_t*)mapping->a_ops->readpage, NULL);
         if (!IS_ERR(page)) {
                 wait_on_page(page);
-                kmap(page);
+                lnb->addr = kmap(page);
+                lnb->page = page;
                 if (!PageUptodate(page)) {
                         CERROR("page index %lu not uptodate\n", index);
                         GOTO(err_page, rc = -EIO);
@@ -1760,12 +1897,10 @@ static int lustre_commit_write(struct niobuf_local *lnb)
 }
 
 struct page *filter_get_page_write(struct inode *inode,
-                                   struct niobuf_remote *rnb,
                                    struct niobuf_local *lnb, int *pglocked)
 {
-        unsigned long index = rnb->offset >> PAGE_SHIFT;
+        unsigned long index = lnb->offset >> PAGE_SHIFT;
         struct address_space *mapping = inode->i_mapping;
-
         struct page *page;
         int rc;
 
@@ -1791,26 +1926,25 @@ struct page *filter_get_page_write(struct inode *inode,
                 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
                 if (!addr) {
                         CERROR("no memory for a temp page\n");
-                        LBUG();
                         GOTO(err, rc = -ENOMEM);
                 }
-                /* XXX debugging */
-                memset((void *)addr, 0xBA, PAGE_SIZE);
+                POISON((void *)addr, 0xBA, PAGE_SIZE);
                 page = virt_to_page(addr);
                 kmap(page);
                 page->index = index;
+                lnb->addr = (void *)addr;
+                lnb->page = page;
                 lnb->flags |= N_LOCAL_TEMP_PAGE;
         } else if (!IS_ERR(page)) {
                 (*pglocked)++;
                 kmap(page);
 
                 rc = mapping->a_ops->prepare_write(NULL, page,
-                                                   rnb->offset % PAGE_SIZE,
-                                                   rnb->len);
+                                                   lnb->offset & ~PAGE_MASK,
+                                                   lnb->len);
                 if (rc) {
-                        CERROR("page index %lu, rc = %d\n", index, rc);
                         if (rc != -ENOSPC)
-                                LBUG();
+                                CERROR("page index %lu, rc = %d\n", index, rc);
                         GOTO(err_unlock, rc);
                 }
                 /* XXX not sure if we need this if we are overwriting page */
@@ -1819,7 +1953,10 @@ struct page *filter_get_page_write(struct inode *inode,
                         LBUG();
                         GOTO(err_unlock, rc = -EIO);
                 }
+                lnb->addr = page_address(page);
+                lnb->page = page;
         }
+
         return page;
 
 err_unlock:
@@ -1849,7 +1986,8 @@ static int filter_commit_write(struct niobuf_local *lnb, int err)
                 unsigned blocksize = head->b_size;
 
                 /* debugging: just seeing if this ever happens */
-                CERROR("called filter_commit_write for ino %lu:%lu on err %d\n",
+                CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
+                       "called for ino %lu:%lu on err %d\n",
                        lnb->page->mapping->host->i_ino, lnb->page->index, err);
 
                 /* Currently one buffer per page, but in the future... */
@@ -1876,7 +2014,6 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
         struct obd_ioobj *o;
         struct niobuf_remote *rnb = nb;
         struct niobuf_local *lnb = res;
-        struct dentry *dir_dentry;
         struct fsfilt_objinfo *fso;
         int pglocked = 0;
         int rc = 0;
@@ -1904,7 +2041,6 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
                 RETURN(-ENOMEM);
 
         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-        dir_dentry = filter_parent(obd, S_IFREG);
 
         for (i = 0, o = obj; i < objcount; i++, o++) {
                 struct filter_dentry_data *fdd;
@@ -1912,7 +2048,9 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
 
                 LASSERT(o->ioo_bufcnt);
 
-                dentry = filter_fid2dentry(obd, dir_dentry, o->ioo_id, 0);
+                dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
+                                                              o->ioo_id),
+                                           o->ioo_id, 0);
 
                 if (IS_ERR(dentry))
                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
@@ -1923,17 +2061,18 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
                 if (!dentry->d_inode) {
                         CERROR("trying to BRW to non-existent file "LPU64"\n",
                                o->ioo_id);
+                        f_dput(dentry);
                         GOTO(out_objinfo, rc = -ENOENT);
                 }
 
                 fdd = dentry->d_fsdata;
                 if (!fdd || !atomic_read(&fdd->fdd_open_count))
-                        CDEBUG(D_PAGE, "I/O to unopened object "LPX64"\n",
+                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
                                o->ioo_id);
         }
 
         if (cmd & OBD_BRW_WRITE) {
-#warning "FIXME: we need to get inode->i_sem for each object here"
+#warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
                 /* Even worse, we need to get locks on mulitple inodes (in
                  * order) or use the DLM to do the locking for us (and use
                  * the same locking in filter_setattr() for truncate.  The
@@ -1951,8 +2090,13 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
                 filter_start_transno(export);
                 *desc_private = fsfilt_brw_start(obd, objcount, fso,
                                                  niocount, nb);
-                if (IS_ERR(*desc_private))
-                        GOTO(out_objinfo, rc = PTR_ERR(*desc_private));
+                if (IS_ERR(*desc_private)) {
+                        rc = PTR_ERR(*desc_private);
+                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                               "error starting transaction: rc = %d\n", rc);
+                        *desc_private = NULL;
+                        GOTO(out_objinfo, rc);
+                }
         }
 
         obd_kmap_get(niocount, 1);
@@ -1973,29 +2117,31 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
                         else
                                 lnb->dentry = dget(dentry);
 
+                        /* lnb->offset is aligned, while rnb->offset isn't,
+                         * and we need to copy the fields to lnb anyways.
+                         */
+                        memcpy(lnb, rnb, sizeof(*rnb));
                         if (cmd & OBD_BRW_WRITE) {
-                                page = filter_get_page_write(inode, rnb, lnb,
+                                page = filter_get_page_write(inode, lnb,
                                                              &pglocked);
 
-                                XPROCFS_BUMP_MYCPU_IOSTAT (st_write_bytes,
-                                                           rnb->len);
+                                XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes,
+                                                          lnb->len);
                         } else {
-                                page = lustre_get_page_read(inode, rnb);
+                                page = lustre_get_page_read(inode, lnb);
 
-                                XPROCFS_BUMP_MYCPU_IOSTAT (st_read_bytes,
-                                                           rnb->len);
+                                XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes,
+                                                          lnb->len);
                         }
 
                         if (IS_ERR(page)) {
                                 rc = PTR_ERR(page);
+                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                                       "error on page @"LPU64"%u/%u: rc = %d\n",
+                                       lnb->offset, j, o->ioo_bufcnt, rc);
                                 f_dput(dentry);
                                 GOTO(out_pages, rc);
                         }
-
-                        lnb->addr = page_address(page);
-                        lnb->offset = rnb->offset;
-                        lnb->page = page;
-                        lnb->len = rnb->len;
                 }
         }
 
@@ -2008,7 +2154,6 @@ out:
 
 out_pages:
         while (lnb-- > res) {
-                CERROR("%d error cleanup on brw\n", rc);
                 if (cmd & OBD_BRW_WRITE)
                         filter_commit_write(lnb, rc);
                 else
@@ -2016,16 +2161,17 @@ out_pages:
                 f_dput(lnb->dentry);
         }
         obd_kmap_put(niocount);
-        goto out_err; /* dropped the dentry refs already (one per page) */
+        if (cmd & OBD_BRW_WRITE) {
+                filter_finish_transno(export, *desc_private, oti, rc);
+                fsfilt_commit(obd,
+                              filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
+                              *desc_private);
+        }
+        goto out; /* dropped the dentry refs already (one per page) */
 
 out_objinfo:
         for (i = 0; i < objcount && fso[i].fso_dentry; i++)
                 f_dput(fso[i].fso_dentry);
-out_err:
-        if (cmd & OBD_BRW_WRITE) {
-                filter_finish_transno(export, *desc_private, oti, rc);
-                fsfilt_commit(obd, dir_dentry->d_inode, *desc_private);
-        }
         goto out;
 }
 
@@ -2072,8 +2218,15 @@ static int filter_write_locked_page(struct niobuf_local *lnb)
         RETURN(rc);
 }
 
-static int filter_sync(struct obd_device *obd)
+static int filter_syncfs(struct lustre_handle *conn)
 {
+        struct obd_device *obd;
+        ENTRY;
+
+        obd = class_conn2obd(conn);
+
+        XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1);
+
         RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
 }
 
@@ -2139,8 +2292,9 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn,
         }
 
         if (cmd & OBD_BRW_WRITE) {
+                /* We just want any dentry for the commit, for now */
+                struct dentry *dir_dentry = filter_parent(obd, S_IFREG, 0);
                 int err;
-                struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
 
                 rc = filter_finish_transno(export, desc_private, oti, rc);
                 err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
@@ -2148,7 +2302,7 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn,
                         rc = err;
                 if (obd_sync_filter) {
                         /* this can fail with ENOMEM, what should we do then? */
-                        filter_sync(obd);
+                        filter_syncfs(conn);
                 }
                 /* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
         }
@@ -2216,6 +2370,81 @@ out:
         RETURN(ret);
 }
 
+static int filter_san_preprw(int cmd, struct lustre_handle *conn,
+                             int objcount, struct obd_ioobj *obj,
+                             int niocount, struct niobuf_remote *nb)
+{
+        struct obd_device *obd;
+        struct obd_ioobj *o = obj;
+        struct niobuf_remote *rnb = nb;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        if ((cmd & OBD_BRW_WRITE) != 0)
+                XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
+        else
+                XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
+
+        obd = class_conn2obd(conn);
+        if (!obd) {
+                CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
+                RETURN(-EINVAL);
+        }
+
+        for (i = 0; i < objcount; i++, o++) {
+                struct dentry *dentry;
+                struct inode *inode;
+                int j;
+
+                dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
+                                                              o->ioo_id),
+                                           o->ioo_id, 0);
+                if (IS_ERR(dentry))
+                        GOTO(out, rc = PTR_ERR(dentry));
+                inode = dentry->d_inode;
+                if (!inode) {
+                        CERROR("trying to BRW to non-existent file "LPU64"\n",
+                               o->ioo_id);
+                        f_dput(dentry);
+                        GOTO(out, rc = -ENOENT);
+                }
+
+                for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
+                        long block;
+
+                        block = rnb->offset >> PAGE_SHIFT;
+
+                        if (cmd == OBD_BRW_READ) {
+                                block = inode->i_mapping->a_ops->bmap(
+                                                inode->i_mapping, block);
+                        } else {
+                                loff_t newsize = rnb->offset + rnb->len;
+                                /* fs_prep_san_write will also update inode
+                                 * size for us:
+                                 * (1) new alloced block
+                                 * (2) existed block but size extented
+                                 */
+                                /* FIXME We could call fs_prep_san_write()
+                                 * only once for all the blocks allocation.
+                                 * Now call it once for each block, for
+                                 * simplicity. And if error happens, we
+                                 * probably need to release previous alloced
+                                 * block */
+                                rc = fs_prep_san_write(obd, inode, &block,
+                                                       1, newsize);
+                                if (rc)
+                                        break;
+                        }
+
+                        rnb->offset = block;
+                }
+                f_dput(dentry);
+        }
+out:
+        RETURN(rc);
+}
+
 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
 {
         struct obd_device *obd;
@@ -2358,6 +2587,7 @@ static struct obd_ops filter_obd_ops = {
         o_connect:      filter_connect,
         o_disconnect:   filter_disconnect,
         o_statfs:       filter_statfs,
+        o_syncfs:       filter_syncfs,
         o_getattr:      filter_getattr,
         o_create:       filter_create,
         o_setattr:      filter_setattr,
@@ -2369,6 +2599,36 @@ static struct obd_ops filter_obd_ops = {
         o_preprw:       filter_preprw,
         o_commitrw:     filter_commitrw
 #if 0
+        o_san_preprw:  filter_san_preprw,
+        o_preallocate: filter_preallocate_inodes,
+        o_migrate:     filter_migrate,
+        o_copy:        filter_copy_data,
+        o_iterate:     filter_iterate
+#endif
+};
+
+static struct obd_ops filter_sanobd_ops = {
+        o_owner:        THIS_MODULE,
+        o_attach:       filter_attach,
+        o_detach:       filter_detach,
+        o_get_info:     filter_get_info,
+        o_setup:        filter_san_setup,
+        o_cleanup:      filter_cleanup,
+        o_connect:      filter_connect,
+        o_disconnect:   filter_disconnect,
+        o_statfs:       filter_statfs,
+        o_getattr:      filter_getattr,
+        o_create:       filter_create,
+        o_setattr:      filter_setattr,
+        o_destroy:      filter_destroy,
+        o_open:         filter_open,
+        o_close:        filter_close,
+        o_brw:          filter_brw,
+        o_punch:        filter_truncate,
+        o_preprw:       filter_preprw,
+        o_commitrw:     filter_commitrw,
+        o_san_preprw:   filter_san_preprw,
+#if 0
         o_preallocate:  filter_preallocate_inodes,
         o_migrate:      filter_migrate,
         o_copy:         filter_copy_data,
@@ -2380,6 +2640,7 @@ static struct obd_ops filter_obd_ops = {
 static int __init obdfilter_init(void)
 {
         struct lprocfs_static_vars lvars;
+        int rc;
 
         printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
         filter_open_cache = kmem_cache_create("ll_filter_fdata",
@@ -2392,19 +2653,37 @@ static int __init obdfilter_init(void)
                                         sizeof(struct filter_dentry_data),
                                         0, 0, NULL, NULL);
         if (!filter_dentry_cache) {
-                kmem_cache_destroy(filter_open_cache);
-                RETURN(-ENOMEM);
+                rc = -ENOMEM;
+                goto err1;
         }
 
         xprocfs_init ("filter");
 
         lprocfs_init_vars(&lvars);
-        return class_register_type(&filter_obd_ops, lvars.module_vars,
-                                   OBD_FILTER_DEVICENAME);
+
+        rc = class_register_type(&filter_obd_ops, lvars.module_vars,
+                                 OBD_FILTER_DEVICENAME);
+        if (rc)
+                goto err2;
+
+        rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
+                                 OBD_FILTER_SAN_DEVICENAME);
+        if (rc)
+                goto err3;
+
+        return 0;
+err3:
+        class_unregister_type(OBD_FILTER_DEVICENAME);
+err2:
+        kmem_cache_destroy(filter_dentry_cache);
+err1:
+        kmem_cache_destroy(filter_open_cache);
+        return rc;
 }
 
 static void __exit obdfilter_exit(void)
 {
+        class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
         class_unregister_type(OBD_FILTER_DEVICENAME);
         if (kmem_cache_destroy(filter_dentry_cache))
                 CERROR("couldn't free obdfilter dentry cache\n");