*
* linux/fs/obdfilter/filter.c
*
- * Copyright (c) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (c) 2001-2003 Cluster File Systems, Inc.
* Author: Peter Braam <braam@clusterfs.com>
* Author: Andreas Dilger <adilger@clusterfs.com>
*
*/
/*
- * Invariant: get O/R i_sem for lookup, if needed, before any journal ops
+ * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
* (which need to get journal_lock, may block if journal full).
+ *
+ * Invariant: Call filter_start_transno() before any journal ops to avoid the
+ * same deadlock problem. We can (and want) to get rid of the
+ * transno sem in favour of the dir/inode i_sem to avoid single
+ * threaded operation on the OST.
*/
#define EXPORT_SYMTAB
#include <linux/lustre_fsfilt.h>
#include <linux/lprocfs_status.h>
-extern struct lprocfs_vars status_class_var[];
-extern struct lprocfs_vars status_var_nm_1[];
static kmem_cache_t *filter_open_cache;
static kmem_cache_t *filter_dentry_cache;
+/* should be generic per-obd stats... */
+struct xprocfs_io_stat {
+ __u64 st_read_bytes;
+ __u64 st_read_reqs;
+ __u64 st_write_bytes;
+ __u64 st_write_reqs;
+ __u64 st_getattr_reqs;
+ __u64 st_setattr_reqs;
+ __u64 st_create_reqs;
+ __u64 st_destroy_reqs;
+ __u64 st_statfs_reqs;
+ __u64 st_open_reqs;
+ __u64 st_close_reqs;
+ __u64 st_punch_reqs;
+};
+
+static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
+static struct proc_dir_entry *xprocfs_dir;
+
+#define XPROCFS_BUMP_MYCPU_IOSTAT(field, count) \
+do { \
+ xprocfs_iostats[smp_processor_id()].field += (count); \
+} while (0)
+
+#define DECLARE_XPROCFS_SUM_STAT(field) \
+static long long \
+xprocfs_sum_##field (void) \
+{ \
+ long long stat = 0; \
+ int i; \
+ \
+ for (i = 0; i < smp_num_cpus; i++) \
+ stat += xprocfs_iostats[i].field; \
+ return (stat); \
+}
+
+DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
+DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
+DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
+
+static int
+xprocfs_rd_stat (char *page, char **start, off_t off, int count,
+ int *eof, void *data)
+{
+ long long (*fn)(void) = (long long(*)(void))data;
+ int len;
+
+ *eof = 1;
+ if (off != 0)
+ return (0);
+
+ len = snprintf (page, count, "%Ld\n", fn());
+ *start = page;
+ return (len);
+}
+
+
+static void
+xprocfs_add_stat(char *name, long long (*fn)(void))
+{
+ struct proc_dir_entry *entry;
+
+ entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
+ if (entry == NULL) {
+ CERROR ("Can't add procfs stat %s\n", name);
+ return;
+ }
+
+ entry->data = fn;
+ entry->read_proc = xprocfs_rd_stat;
+ entry->write_proc = NULL;
+}
+
+static void
+xprocfs_init (char *name)
+{
+ char dirname[64];
+
+ snprintf (dirname, sizeof (dirname), "sys/%s", name);
+
+ xprocfs_dir = proc_mkdir ("sys/obdfilter", NULL);
+ if (xprocfs_dir == NULL) {
+ CERROR ("Can't make dir\n");
+ return;
+ }
+
+ xprocfs_add_stat ("read_bytes", xprocfs_sum_st_read_bytes);
+ xprocfs_add_stat ("read_reqs", xprocfs_sum_st_read_reqs);
+ xprocfs_add_stat ("write_bytes", xprocfs_sum_st_write_bytes);
+ xprocfs_add_stat ("write_reqs", xprocfs_sum_st_write_reqs);
+ xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
+ xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
+ xprocfs_add_stat ("create_reqs", xprocfs_sum_st_create_reqs);
+ xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
+ xprocfs_add_stat ("statfs_reqs", xprocfs_sum_st_statfs_reqs);
+ xprocfs_add_stat ("open_reqs", xprocfs_sum_st_open_reqs);
+ xprocfs_add_stat ("close_reqs", xprocfs_sum_st_close_reqs);
+ xprocfs_add_stat ("punch_reqs", xprocfs_sum_st_punch_reqs);
+}
+
+void xprocfs_fini (void)
+{
+ if (xprocfs_dir == NULL)
+ return;
+
+ remove_proc_entry ("read_bytes", xprocfs_dir);
+ remove_proc_entry ("read_reqs", xprocfs_dir);
+ remove_proc_entry ("write_bytes", xprocfs_dir);
+ remove_proc_entry ("write_reqs", xprocfs_dir);
+ remove_proc_entry ("getattr_reqs", xprocfs_dir);
+ remove_proc_entry ("setattr_reqs", xprocfs_dir);
+ remove_proc_entry ("create_reqs", xprocfs_dir);
+ remove_proc_entry ("destroy_reqs", xprocfs_dir);
+ remove_proc_entry ("statfs_reqs", xprocfs_dir);
+ remove_proc_entry ("open_reqs", xprocfs_dir);
+ remove_proc_entry ("close_reqs", xprocfs_dir);
+ remove_proc_entry ("punch_reqs", xprocfs_dir);
+
+ remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
+ xprocfs_dir = NULL;
+}
+
#define S_SHIFT 12
static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
[0] NULL,
return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
}
+static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd,
+ int error)
+{
+ CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
+ last_rcvd, error);
+ if (!error && last_rcvd > obd->obd_last_committed)
+ obd->obd_last_committed = last_rcvd;
+}
+
+void filter_start_transno(struct obd_export *export)
+{
+ struct obd_device * obd = export->exp_obd;
+ ENTRY;
+
+ down(&obd->u.filter.fo_transno_sem);
+}
+
+/* Assumes caller has already pushed us into the kernel context. */
+int filter_finish_transno(struct obd_export *export, void *handle,
+ struct obd_trans_info *oti, int rc)
+{
+ __u64 last_rcvd;
+ struct obd_device *obd = export->exp_obd;
+ struct filter_obd *filter = &obd->u.filter;
+ struct filter_export_data *fed = &export->exp_filter_data;
+ struct filter_client_data *fcd = fed->fed_fcd;
+ loff_t off;
+ ssize_t written;
+
+ /* Propagate error code. */
+ if (rc)
+ GOTO(out, rc);
+
+ /* we don't allocate new transnos for replayed requests */
+#if 0
+ /* perhaps if transno already set? or should level be in oti? */
+ if (req->rq_level == LUSTRE_CONN_RECOVD)
+ GOTO(out, rc = 0);
+#endif
+
+ off = FILTER_LR_CLIENT_START + fed->fed_lr_off * FILTER_LR_CLIENT_SIZE;
+
+ last_rcvd = ++filter->fo_fsd->fsd_last_rcvd;
+ if (oti)
+ oti->oti_transno = last_rcvd;
+ fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
+ fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
+
+ /* get this from oti */
+#if 0
+ if (oti)
+ fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
+ else
+#else
+ fcd->fcd_last_xid = 0;
+#endif
+ fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_last_rcvd_cb);
+ written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
+ &off);
+ CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
+ LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_off, written);
+
+ if (written == sizeof(*fcd))
+ GOTO(out, rc = 0);
+ CERROR("error writing to last_rcvd file: rc = %d\n", rc);
+ if (written >= 0)
+ GOTO(out, rc = -EIO);
+
+ rc = 0;
+
+ EXIT;
+ out:
+
+ up(&filter->fo_transno_sem);
+ return rc;
+}
+
/* write the pathname into the string */
static int filter_id(char *buf, obd_id id, obd_mode mode)
{
};
#define LAST_RCVD "last_rcvd"
+#define INIT_OBJID 2
+
+/* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
+#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
+#define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
+
+static unsigned long filter_last_rcvd_slots[FILTER_LR_MAX_CLIENT_WORDS];
+
+/* Add client data to the FILTER. We use a bitmap to locate a free space
+ * in the last_rcvd file if cl_off is -1 (i.e. a new client).
+ * Otherwise, we have just read the data from the last_rcvd file and
+ * we know its offset.
+ */
+int filter_client_add(struct filter_obd *filter,
+ struct filter_export_data *fed, int cl_off)
+{
+ int new_client = (cl_off == -1);
+
+ /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
+ * there's no need for extra complication here
+ */
+ if (new_client) {
+ cl_off = find_first_zero_bit(filter_last_rcvd_slots,
+ FILTER_LR_MAX_CLIENTS);
+ repeat:
+ if (cl_off >= FILTER_LR_MAX_CLIENTS) {
+ CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
+ return -ENOMEM;
+ }
+ if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
+ CERROR("FILTER client %d: found bit is set in bitmap\n",
+ cl_off);
+ cl_off = find_next_zero_bit(filter_last_rcvd_slots,
+ FILTER_LR_MAX_CLIENTS,
+ cl_off);
+ goto repeat;
+ }
+ } else {
+ if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
+ CERROR("FILTER client %d: bit already set in bitmap!\n",
+ cl_off);
+ LBUG();
+ }
+ }
+
+ CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
+ cl_off, fed->fed_fcd->fcd_uuid);
+
+ fed->fed_lr_off = cl_off;
+
+ if (new_client) {
+ struct obd_run_ctxt saved;
+ loff_t off = FILTER_LR_CLIENT_START +
+ (cl_off * FILTER_LR_CLIENT_SIZE);
+ ssize_t written;
+
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ written = lustre_fwrite(filter->fo_rcvd_filp,
+ (char *)fed->fed_fcd,
+ sizeof(*fed->fed_fcd), &off);
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+
+ if (written != sizeof(*fed->fed_fcd)) {
+ if (written < 0)
+ RETURN(written);
+ RETURN(-EIO);
+ }
+ CDEBUG(D_INFO, "wrote client fcd at off %u (len %u)\n",
+ FILTER_LR_CLIENT_START + (cl_off*FILTER_LR_CLIENT_SIZE),
+ (unsigned int)sizeof(*fed->fed_fcd));
+ }
+ return 0;
+}
+
+int filter_client_free(struct obd_export *exp)
+{
+ struct filter_export_data *fed = &exp->exp_filter_data;
+ struct filter_obd *filter = &exp->exp_obd->u.filter;
+ struct filter_client_data zero_fcd;
+ struct obd_run_ctxt saved;
+ int written;
+ loff_t off;
+
+ if (!fed->fed_fcd)
+ RETURN(0);
+
+ off = FILTER_LR_CLIENT_START + (fed->fed_lr_off*FILTER_LR_CLIENT_SIZE);
+
+ CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
+ fed->fed_lr_off, off, fed->fed_fcd->fcd_uuid);
+
+ if (!test_and_clear_bit(fed->fed_lr_off, filter_last_rcvd_slots)) {
+ CERROR("FILTER client %u: bit already clear in bitmap!!\n",
+ fed->fed_lr_off);
+ LBUG();
+ }
+
+ memset(&zero_fcd, 0, sizeof zero_fcd);
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
+ sizeof(zero_fcd), &off);
+
+ /* XXX: this write gets lost sometimes, unless this sync is here. */
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
+#else
+ file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
+#endif
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+
+ if (written != sizeof(zero_fcd)) {
+ CERROR("error zeroing out client %s off %d in %s: %d\n",
+ fed->fed_fcd->fcd_uuid, fed->fed_lr_off, LAST_RCVD,
+ written);
+ } else {
+ CDEBUG(D_INFO,
+ "zeroed disconnecting client %s at off %d ("LPX64")\n",
+ fed->fed_fcd->fcd_uuid, fed->fed_lr_off, off);
+ }
+
+ OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
+
+ return 0;
+}
+
+static void filter_unpack_fsd(struct filter_server_data *fsd)
+{
+ fsd->fsd_last_objid = le64_to_cpu(fsd->fsd_last_objid);
+ fsd->fsd_last_rcvd = le64_to_cpu(fsd->fsd_last_rcvd);
+ fsd->fsd_mount_count = le64_to_cpu(fsd->fsd_mount_count);
+}
+
+static void filter_pack_fsd(struct filter_server_data *disk_fsd,
+ struct filter_server_data *fsd)
+{
+ memset(disk_fsd, 0, sizeof(*disk_fsd));
+ memcpy(disk_fsd->fsd_uuid, fsd->fsd_uuid, sizeof(fsd->fsd_uuid));
+ disk_fsd->fsd_last_objid = cpu_to_le64(fsd->fsd_last_objid);
+ disk_fsd->fsd_last_rcvd = cpu_to_le64(fsd->fsd_last_rcvd);
+ disk_fsd->fsd_mount_count = cpu_to_le64(fsd->fsd_mount_count);
+}
+
+static int filter_free_server_data(struct filter_obd *filter)
+{
+ OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
+ filter->fo_fsd = NULL;
+
+ return 0;
+}
+
+
+/* assumes caller has already in kernel ctxt */
+static int filter_update_server_data(struct file *filp,
+ struct filter_server_data *fsd)
+{
+ struct filter_server_data disk_fsd;
+ loff_t off = 0;
+ int rc;
+
+ CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
+ CDEBUG(D_INODE, "server last_objid: "LPU64"\n", fsd->fsd_last_objid);
+ CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n", fsd->fsd_last_rcvd);
+ CDEBUG(D_INODE, "server last_mount: "LPU64"\n", fsd->fsd_mount_count);
+
+ filter_pack_fsd(&disk_fsd, fsd);
+ rc = lustre_fwrite(filp, (char *)&disk_fsd,
+ sizeof(disk_fsd), &off);
+ if (rc != sizeof(disk_fsd)) {
+ CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
+ rc);
+ RETURN(-EIO);
+ }
+ RETURN(0);
+}
+
+/* assumes caller has already in kernel ctxt */
+static int filter_init_server_data(struct obd_device *obd,
+ struct file * filp,
+ __u64 init_lastobjid)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ struct filter_server_data *fsd;
+ struct filter_client_data *fcd = NULL;
+ struct inode *inode = filp->f_dentry->d_inode;
+ unsigned long last_rcvd_size = inode->i_size;
+ int cl_off;
+ loff_t off = 0;
+ int rc;
+
+ /* ensure padding in the struct is the correct size */
+ LASSERT (offsetof(struct filter_server_data, fsd_padding) +
+ sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
+ LASSERT (offsetof(struct filter_client_data, fcd_padding) +
+ sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
+
+ OBD_ALLOC(fsd, sizeof(*fsd));
+ if (!fsd)
+ RETURN(-ENOMEM);
+ filter->fo_fsd = fsd;
+
+ if (last_rcvd_size == 0) {
+ CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
+
+ memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
+ fsd->fsd_last_objid = init_lastobjid;
+ fsd->fsd_last_rcvd = 0;
+ fsd->fsd_mount_count = 0;
+
+ } else {
+ ssize_t retval = lustre_fread(filp, (char *)fsd,
+ sizeof(*fsd),
+ &off);
+ if (retval != sizeof(*fsd)) {
+ CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
+ GOTO(out, rc = -EIO);
+ }
+ filter_unpack_fsd(fsd);
+ }
+
+ CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
+ obd->obd_name, fsd->fsd_last_objid);
+ CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
+ obd->obd_name, fsd->fsd_last_rcvd);
+ CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
+ obd->obd_name, fsd->fsd_mount_count);
+
+ /*
+ * When we do a clean FILTER shutdown, we save the last_rcvd into
+ * the header. If we find clients with higher last_rcvd values
+ * then those clients may need recovery done.
+ */
+ /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
+ for (off = FILTER_LR_CLIENT_START, cl_off = 0; off < last_rcvd_size;
+ cl_off++) {
+ __u64 last_rcvd;
+ int mount_age;
+
+ if (!fcd) {
+ OBD_ALLOC(fcd, sizeof(*fcd));
+ if (!fcd)
+ GOTO(err_fsd, rc = -ENOMEM);
+ }
+
+ rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
+ if (rc != sizeof(*fcd)) {
+ CERROR("error reading FILTER %s offset %d: rc = %d\n",
+ LAST_RCVD, cl_off, rc);
+ if (rc > 0) /* XXX fatal error or just abort reading? */
+ rc = -EIO;
+ break;
+ }
+
+ if (fcd->fcd_uuid[0] == '\0') {
+ CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+ cl_off);
+ continue;
+ }
+
+ last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
+
+ /* These exports are cleaned up by filter_disconnect(), so they
+ * need to be set up like real exports as filter_connect() does.
+ */
+ mount_age = fsd->fsd_mount_count -
+ le64_to_cpu(fcd->fcd_mount_count);
+ if (mount_age < FILTER_MOUNT_RECOV) {
+ CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
+ "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
+ "\n", fcd->fcd_uuid, cl_off,
+ last_rcvd, fsd->fsd_last_rcvd,
+ le64_to_cpu(fcd->fcd_mount_count),
+ fsd->fsd_mount_count);
+#if 0
+ /* disabled until OST recovery is actually working */
+ struct obd_export *exp = class_new_export(obd);
+ struct filter_export_data *fed;
+
+ if (!exp) {
+ rc = -ENOMEM;
+ break;
+ }
+
+ fed = &exp->exp_filter_data;
+ fed->fed_fcd = fcd;
+ filter_client_add(filter, fed, cl_off);
+ /* create helper if export init gets more complex */
+ INIT_LIST_HEAD(&fed->fed_open_head);
+ spin_lock_init(&fed->fed_lock);
+
+ fcd = NULL;
+ filter->fo_recoverable_clients++;
+#endif
+ } else {
+ CDEBUG(D_INFO,
+ "discarded client %d, UUID '%s', count %Ld\n",
+ cl_off, fcd->fcd_uuid,
+ (long long)le64_to_cpu(fcd->fcd_mount_count));
+ }
+
+ CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
+ cl_off, (unsigned long long)last_rcvd);
+
+ if (last_rcvd > filter->fo_fsd->fsd_last_rcvd)
+ filter->fo_fsd->fsd_last_rcvd = last_rcvd;
+ }
+
+ obd->obd_last_committed = filter->fo_fsd->fsd_last_rcvd;
+ if (filter->fo_recoverable_clients) {
+ CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
+ filter->fo_recoverable_clients,
+ filter->fo_fsd->fsd_last_rcvd);
+ filter->fo_next_recovery_transno = obd->obd_last_committed + 1;
+ obd->obd_flags |= OBD_RECOVERING;
+ }
+
+ if (fcd)
+ OBD_FREE(fcd, sizeof(*fcd));
+
+ fsd->fsd_mount_count++;
+
+ /* save it,so mount count and last_recvd is current */
+ rc = filter_update_server_data(filp, filter->fo_fsd);
+
+out:
+ RETURN(rc);
+
+err_fsd:
+ filter_free_server_data(filter);
+ RETURN(rc);
+}
/* setup the object store with correct subdirectories */
static int filter_prep(struct obd_device *obd)
struct file *file;
struct inode *inode;
int rc = 0;
- __u64 lastobjid = 2;
int mode = 0;
push_ctxt(&saved, &filter->fo_ctxt, NULL);
GOTO(out_O_mode, rc);
}
+ if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
+ CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
+ file->f_dentry->d_inode->i_mode);
+ GOTO(err_filp, rc = -ENOENT);
+ }
+
+ rc = fsfilt_journal_data(obd, file);
+ if (rc) {
+ CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
+ GOTO(err_filp, rc);
+ }
/* steal operations */
inode = file->f_dentry->d_inode;
filter->fo_fop = file->f_op;
filter->fo_iop = inode->i_op;
filter->fo_aops = inode->i_mapping->a_ops;
- if (inode->i_size == 0) {
- __u64 disk_lastobjid = cpu_to_le64(lastobjid);
- ssize_t retval = file->f_op->write(file,(char *)&disk_lastobjid,
- sizeof(disk_lastobjid),
- &file->f_pos);
- if (retval != sizeof(disk_lastobjid)) {
- CDEBUG(D_INODE,"OBD filter: error writing lastobjid\n");
- filp_close(file, 0);
- GOTO(out_O_mode, rc = -EIO);
- }
- } else {
- __u64 disk_lastobjid;
- ssize_t retval = file->f_op->read(file, (char *)&disk_lastobjid,
- sizeof(disk_lastobjid),
- &file->f_pos);
- if (retval != sizeof(disk_lastobjid)) {
- CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
- filp_close(file, 0);
- GOTO(out_O_mode, rc = -EIO);
- }
- lastobjid = le64_to_cpu(disk_lastobjid);
+ rc = filter_init_server_data(obd, file, INIT_OBJID);
+ if (rc) {
+ CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
+ GOTO(err_client, rc);
}
- filter->fo_lastobjid = lastobjid;
- filp_close(file, 0);
+ filter->fo_rcvd_filp = file;
rc = 0;
out:
return(rc);
+err_client:
+ class_disconnect_all(obd);
+err_filp:
+ if (filp_close(file, 0))
+ CERROR("can't close %s after error\n", LAST_RCVD);
+ filter->fo_rcvd_filp = NULL;
out_O_mode:
while (mode-- > 0) {
struct dentry *dentry = filter->fo_dentry_O_mode[mode];
{
struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
- __u64 disk_lastobjid;
long rc;
- struct file *file;
int mode;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
- if (IS_ERR(file)) {
- CERROR("OBD filter: cannot create %s\n", LAST_RCVD);
- goto out;
- }
+ /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
+ * best to start a transaction with h_sync, because we removed this
+ * from lastobjid */
- file->f_pos = 0;
- disk_lastobjid = cpu_to_le64(filter->fo_lastobjid);
- rc = file->f_op->write(file, (char *)&disk_lastobjid,
- sizeof(disk_lastobjid), &file->f_pos);
- if (rc != sizeof(disk_lastobjid))
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
+ if (rc)
CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
+ filter_free_server_data(filter);
- rc = filp_close(file, NULL);
- if (rc)
- CERROR("OBD filter: cannot close status file: rc = %ld\n", rc);
+
+ if (filter->fo_rcvd_filp) {
+ /* broken sync at umount bug workaround */
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ rc = fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
+#else
+ rc = file_fsync(filter->fo_rcvd_filp,
+ filter->fo_rcvd_filp->f_dentry, 1);
+#endif
+ filp_close(filter->fo_rcvd_filp, 0);
+ filter->fo_rcvd_filp = NULL;
+ if (rc)
+ CERROR("last_rcvd file won't closek rc = %ld\n", rc);
+ }
for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
struct dentry *dentry = filter->fo_dentry_O_mode[mode];
}
}
f_dput(filter->fo_dentry_O);
-out:
pop_ctxt(&saved, &filter->fo_ctxt, NULL);
}
static __u64 filter_next_id(struct obd_device *obd)
{
obd_id id;
+ LASSERT(obd->u.filter.fo_fsd != NULL);
spin_lock(&obd->u.filter.fo_objidlock);
- id = ++obd->u.filter.fo_lastobjid;
+ id = ++obd->u.filter.fo_fsd->fsd_last_objid;
spin_unlock(&obd->u.filter.fo_objidlock);
return id;
/* parent i_sem is already held if needed for exclusivity */
static struct dentry *filter_fid2dentry(struct obd_device *obd,
struct dentry *dparent,
- __u64 id, int locked)
+ __u64 id, int lockit)
{
struct super_block *sb = obd->u.filter.fo_sb;
struct dentry *dchild;
}
len = sprintf(name, LPU64, id);
- CDEBUG(D_INODE, "opening object O/%*s/%s\n",
+ CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
dparent->d_name.len, dparent->d_name.name, name);
- //if (!locked)
- //down(&dparent->d_inode->i_sem);
+ if (lockit)
+ down(&dparent->d_inode->i_sem);
dchild = lookup_one_len(name, dparent, len);
- //if (!locked)
- //up(&dparent->d_inode->i_sem);
+ if (lockit)
+ up(&dparent->d_inode->i_sem);
if (IS_ERR(dchild)) {
CERROR("child lookup error %ld\n", PTR_ERR(dchild));
RETURN(dchild);
spin_unlock(&fed->fed_lock);
CDEBUG(D_INODE, "opened objid "LPX64": rc = %p\n", id, file);
-
+ EXIT;
out:
- RETURN(file);
+ return file;
out_fdd:
kmem_cache_free(filter_dentry_cache, fdd);
}
/* Caller must hold i_sem on dir_dentry->d_inode */
+/* Caller must push us into kernel context */
static int filter_destroy_internal(struct obd_device *obd,
struct dentry *dir_dentry,
struct dentry *object_dentry)
{
- struct obd_run_ctxt saved;
struct inode *inode = object_dentry->d_inode;
int rc;
ENTRY;
inode->i_nlink, atomic_read(&inode->i_count));
}
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
if (rc)
CERROR("error unlinking objid %*s: rc %d\n",
RETURN(rc);
}
-static int filter_close_internal(struct obd_device *obd,
- struct filter_file_data *ffd)
+static int filter_close_internal(struct obd_export *export,
+ struct filter_file_data *ffd,
+ struct obd_trans_info *oti)
{
+ struct obd_device *obd = export->exp_obd;
+ struct filter_obd *filter = &obd->u.filter;
struct file *filp = ffd->ffd_file;
struct dentry *object_dentry = dget(filp->f_dentry);
struct filter_dentry_data *fdd = object_dentry->d_fsdata;
- int rc, rc2 = 0;
+ int rc, rc2;
ENTRY;
LASSERT(filp->private_data == ffd);
if (atomic_dec_and_test(&fdd->fdd_open_count) &&
fdd->fdd_flags & FILTER_FLAG_DESTROY) {
struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
+ struct obd_run_ctxt saved;
+ void *handle;
down(&dir_dentry->d_inode->i_sem);
- /* XXX start transaction */
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ filter_start_transno(export);
+ handle = fsfilt_start(obd, dir_dentry->d_inode,
+ FSFILT_OP_UNLINK);
+ if (IS_ERR(handle)) {
+ rc = filter_finish_transno(export, handle, oti,
+ PTR_ERR(handle));
+ GOTO(out, rc);
+ }
/* XXX unlink from PENDING directory now too */
rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
- /* XXX finish transaction */
if (rc2 && !rc)
rc = rc2;
+ rc = filter_finish_transno(export, handle, oti, rc);
+ rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
+ if (rc2) {
+ CERROR("error on commit, err = %d\n", rc2);
+ if (!rc)
+ rc = rc2;
+ }
+ out:
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
up(&dir_dentry->d_inode->i_sem);
}
int rc = 0;
ENTRY;
- MOD_INC_USE_COUNT;
if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
- GOTO(err_dec, rc = -EINVAL);
+ RETURN(rc = -EINVAL);
obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
if (IS_ERR(obd->obd_fsops))
- GOTO(err_dec, rc = PTR_ERR(obd->obd_fsops));
+ RETURN(rc = PTR_ERR(obd->obd_fsops));
mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
rc = PTR_ERR(mnt);
if (IS_ERR(mnt))
GOTO(err_ops, rc);
+ obd->obd_flags |= OBD_REPLAYABLE;
+
filter = &obd->u.filter;;
+ init_MUTEX(&filter->fo_transno_sem);
filter->fo_vfsmnt = mnt;
filter->fo_fstype = strdup(data->ioc_inlbuf2);
filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
lock_kernel();
err_ops:
fsfilt_put_ops(obd->obd_fsops);
-err_dec:
- MOD_DEC_USE_COUNT;
return rc;
}
lock_kernel();
- MOD_DEC_USE_COUNT;
RETURN(0);
}
int filter_attach(struct obd_device *dev, obd_count len, void *data)
{
- return lprocfs_reg_obd(dev, status_var_nm_1, dev);
+ struct lprocfs_static_vars lvars;
+
+ lprocfs_init_vars(&lvars);
+ return lprocfs_obd_attach(dev, lvars.obd_vars);
}
int filter_detach(struct obd_device *dev)
{
- return lprocfs_dereg_obd(dev);
+ return lprocfs_obd_detach(dev);
}
+/* nearly identical to mds_connect */
static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
- obd_uuid_t cluuid, struct recovd_obd *recovd,
+ struct obd_uuid *cluuid, struct recovd_obd *recovd,
ptlrpc_recovery_cb_t recover)
{
struct obd_export *exp;
+ struct filter_export_data *fed;
+ struct filter_client_data *fcd;
+ struct filter_obd *filter = &obd->u.filter;
int rc;
ENTRY;
- MOD_INC_USE_COUNT;
+
+ if (!conn || !obd || !cluuid)
+ RETURN(-EINVAL);
+
rc = class_connect(conn, obd, cluuid);
if (rc)
- GOTO(out_dec, rc);
+ RETURN(rc);
exp = class_conn2export(conn);
LASSERT(exp);
+ fed = &exp->exp_filter_data;
+
+ OBD_ALLOC(fcd, sizeof(*fcd));
+ if (!fcd) {
+ CERROR("filter: out of memory for client data\n");
+ GOTO(out_export, rc = -ENOMEM);
+ }
+
+ memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
+ fed->fed_fcd = fcd;
+ fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
spin_lock_init(&exp->exp_filter_data.fed_lock);
-out:
+
+ rc = filter_client_add(filter, fed, -1);
+ if (rc)
+ GOTO(out_fcd, rc);
+
RETURN(rc);
-out_dec:
- MOD_DEC_USE_COUNT;
- goto out;
+out_fcd:
+ OBD_FREE(fcd, sizeof(*fcd));
+out_export:
+ class_disconnect(conn);
+
+ RETURN(rc);
}
+/* also incredibly similar to mds_disconnect */
static int filter_disconnect(struct lustre_handle *conn)
{
struct obd_export *exp = class_conn2export(conn);
list_del(&ffd->ffd_export_list);
spin_unlock(&fed->fed_lock);
- CERROR("force closing file %*s on disconnect\n",
+ CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
ffd->ffd_file->f_dentry->d_name.len,
- ffd->ffd_file->f_dentry->d_name.name);
+ ffd->ffd_file->f_dentry->d_name.name,
+ ffd, ffd->ffd_servercookie);
- filter_close_internal(exp->exp_obd, ffd);
+ filter_close_internal(exp, ffd, NULL);
spin_lock(&fed->fed_lock);
}
spin_unlock(&fed->fed_lock);
ldlm_cancel_locks_for_export(exp);
+ filter_client_free(exp);
+
rc = class_disconnect(conn);
- if (!rc)
- MOD_DEC_USE_COUNT;
/* XXX cleanup preallocated inodes */
RETURN(rc);
if (!dentry->d_inode) {
CERROR("%s on non-existent object: "LPX64"\n", what, oa->o_id);
f_dput(dentry);
+ LBUG();
RETURN(ERR_PTR(-ENOENT));
}
int rc = 0;
ENTRY;
- dentry = filter_oa2dentry(conn, oa, 0);
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
+
+ dentry = filter_oa2dentry(conn, oa, 1);
if (IS_ERR(dentry))
RETURN(PTR_ERR(dentry));
/* this is called from filter_truncate() until we have filter_punch() */
static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *md)
+ struct lov_stripe_md *md, struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
+ struct obd_export *export = class_conn2export(conn);
struct obd_device *obd = class_conn2obd(conn);
+ struct filter_obd *filter = &obd->u.filter;
struct dentry *dentry;
struct iattr iattr;
struct inode *inode;
- int rc;
+ void * handle;
+ int rc, rc2;
ENTRY;
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
+
dentry = filter_oa2dentry(conn, oa, 0);
if (IS_ERR(dentry))
iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
inode = dentry->d_inode;
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
lock_kernel();
if (iattr.ia_valid & ATTR_SIZE)
down(&inode->i_sem);
- /* XXX start transaction */
+ filter_start_transno(export);
+ handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
+ if (IS_ERR(handle)) {
+ rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
+ GOTO(out_unlock, rc);
+ }
+
if (inode->i_op->setattr)
rc = inode->i_op->setattr(dentry, &iattr);
else
rc = inode_setattr(inode, &iattr);
- /* XXX update last_rcvd, finish transaction */
+ rc = filter_finish_transno(export, handle, oti, rc);
+ rc2 = fsfilt_commit(obd, dentry->d_inode, handle);
+ if (rc2) {
+ CERROR("error on commit, err = %d\n", rc2);
+ if (!rc)
+ rc = rc2;
+ }
if (iattr.ia_valid & ATTR_SIZE) {
up(&inode->i_sem);
obdo_from_inode(oa, inode, oa->o_valid);
}
+out_unlock:
unlock_kernel();
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
f_dput(dentry);
RETURN(rc);
}
static int filter_open(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea)
+ struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
struct obd_export *export;
struct lustre_handle *handle;
RETURN(-EINVAL);
}
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
+
filp = filter_obj_open(export, oa->o_id, oa->o_mode);
if (IS_ERR(filp))
GOTO(out, rc = PTR_ERR(filp));
} /* filter_open */
static int filter_close(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea)
+ struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
struct obd_export *exp;
struct filter_file_data *ffd;
RETURN(-EINVAL);
}
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
+
if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
CERROR("no handle for close of objid "LPX64"\n", oa->o_id);
RETURN(-EINVAL);
list_del(&ffd->ffd_export_list);
spin_unlock(&fed->fed_lock);
- rc = filter_close_internal(exp->exp_obd, ffd);
+ rc = filter_close_internal(exp, ffd, oti);
RETURN(rc);
} /* filter_close */
static int filter_create(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md **ea)
+ struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
+ struct obd_export *export = class_conn2export(conn);
struct obd_device *obd = class_conn2obd(conn);
+ struct filter_obd *filter = &obd->u.filter;
struct obd_run_ctxt saved;
struct dentry *dir_dentry;
struct dentry *new;
struct iattr;
- int rc;
+ void *handle;
+ int err, rc;
ENTRY;
if (!obd) {
return -EINVAL;
}
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
+
oa->o_id = filter_next_id(obd);
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
dir_dentry = filter_parent(obd, oa->o_mode);
down(&dir_dentry->d_inode->i_sem);
- new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 1);
+ new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
if (IS_ERR(new))
GOTO(out, rc = PTR_ERR(new));
GOTO(out, rc = -EEXIST);
}
- /* XXX start transaction */
+ filter_start_transno(export);
+ handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
+ if (IS_ERR(handle)) {
+ rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
+ GOTO(out_put, rc);
+ }
rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
if (rc)
+ CERROR("create failed rc = %d\n", rc);
+
+ rc = filter_finish_transno(export, handle, oti, rc);
+ err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
+ if (err) {
+ CERROR("unable to write lastobjid but file created\n");
+ if (!rc)
+ rc = err;
+ }
+ err = fsfilt_commit(obd, dir_dentry->d_inode, handle);
+ if (err) {
+ CERROR("error on commit, err = %d\n", err);
+ if (!rc)
+ rc = err;
+ }
+
+ if (rc)
GOTO(out_put, rc);
- /* XXX update last_rcvd+lastobjid on disk, finish transaction */
/* Set flags for fields we have set in the inode struct */
oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
f_dput(new);
out:
up(&dir_dentry->d_inode->i_sem);
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
return rc;
}
static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea)
+ struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
+ struct obd_export *export = class_conn2export(conn);
struct obd_device *obd = class_conn2obd(conn);
+ struct filter_obd *filter = &obd->u.filter;
struct dentry *dir_dentry, *object_dentry;
struct filter_dentry_data *fdd;
- int rc;
+ struct obd_run_ctxt saved;
+ void *handle;
+ int rc, rc2;
ENTRY;
if (!obd) {
RETURN(-EINVAL);
}
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
+
CDEBUG(D_INODE, "destroying objid "LPX64"\n", oa->o_id);
dir_dentry = filter_parent(obd, oa->o_mode);
down(&dir_dentry->d_inode->i_sem);
- object_dentry = filter_oa2dentry(conn, oa, 1);
+ object_dentry = filter_oa2dentry(conn, oa, 0);
if (IS_ERR(object_dentry))
GOTO(out, rc = -ENOENT);
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ filter_start_transno(export);
+ handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
+ if (IS_ERR(handle)) {
+ rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
+ GOTO(out_ctxt, rc);
+ }
+
fdd = object_dentry->d_fsdata;
- /* XXX start transaction */
if (fdd && atomic_read(&fdd->fdd_open_count)) {
if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
fdd->fdd_flags |= FILTER_FLAG_DESTROY;
CDEBUG(D_INODE,
"repeat destroy of %dx open objid "LPX64"\n",
atomic_read(&fdd->fdd_open_count), oa->o_id);
- GOTO(out_dput, rc = 0);
+ GOTO(out_commit, rc = 0);
}
rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
-out_dput:
- /* XXX update last_rcvd on disk, finish transaction */
+
+out_commit:
+ /* XXX save last_rcvd on disk */
+ rc = filter_finish_transno(export, handle, oti, rc);
+ rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
+ if (rc2) {
+ CERROR("error on commit, err = %d\n", rc2);
+ if (!rc)
+ rc = rc2;
+ }
+out_ctxt:
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
f_dput(object_dentry);
EXIT;
/* NB start and end are used for punch, but not truncate */
static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *lsm,
- obd_off start, obd_off end)
+ obd_off start, obd_off end,
+ struct obd_trans_info *oti)
{
int error;
ENTRY;
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
+
if (end != OBD_OBJECT_EOF)
CERROR("PUNCH not supported, only truncate works\n");
CDEBUG(D_INODE, "calling truncate for object "LPX64", valid = %x, "
"o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
oa->o_size = start;
- error = filter_setattr(conn, oa, NULL);
+ error = filter_setattr(conn, oa, NULL, oti);
RETURN(error);
}
}
#endif
-static int lustre_commit_write(struct page *page, unsigned from, unsigned to)
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+/* We should only change the file mtime (and not the ctime, like
+ * update_inode_times() in generic_file_write()) when we only change data.
+ */
+static inline void inode_update_time(struct inode *inode, int ctime_too)
+{
+ time_t now = CURRENT_TIME;
+ if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
+ return;
+ inode->i_mtime = now;
+ if (ctime_too)
+ inode->i_ctime = now;
+ mark_inode_dirty_sync(inode);
+}
+#endif
+
+static int lustre_commit_write(struct niobuf_local *lnb)
{
+ struct page *page = lnb->page;
+ unsigned from = lnb->offset & ~PAGE_MASK;
+ unsigned to = from + lnb->len;
struct inode *inode = page->mapping->host;
int err;
+ LASSERT(to <= PAGE_SIZE);
err = page->mapping->a_ops->commit_write(NULL, page, from, to);
if (!err && IS_SYNC(inode))
err = waitfor_one_page(page);
*/
if (!page) {
unsigned long addr;
- CDEBUG(D_PAGE, "ino %lu page %ld locked\n", inode->i_ino,index);
+ CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
if (!addr) {
CERROR("no memory for a temp page\n");
* pages, and the filesystems mark these buffers as BH_New if they
* were newly allocated from disk. We use the BH_New flag similarly.
*/
-static int filter_commit_write(struct page *page, unsigned from, unsigned to,
- int err)
+static int filter_commit_write(struct niobuf_local *lnb, int err)
{
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
if (err) {
unsigned block_start, block_end;
- struct buffer_head *bh, *head = page->buffers;
+ struct buffer_head *bh, *head = lnb->page->buffers;
unsigned blocksize = head->b_size;
- void *addr = page_address(page);
/* debugging: just seeing if this ever happens */
CERROR("called filter_commit_write for ino %lu:%lu on err %d\n",
- page->mapping->host->i_ino, page->index, err);
+ lnb->page->mapping->host->i_ino, lnb->page->index, err);
/* Currently one buffer per page, but in the future... */
for (bh = head, block_start = 0; bh != head || !block_start;
block_start = block_end, bh = bh->b_this_page) {
block_end = block_start + blocksize;
if (buffer_new(bh))
- memset(addr + block_start, 0, blocksize);
+ memset(lnb->addr + block_start, 0, blocksize);
}
}
#endif
- return lustre_commit_write(page, from, to);
+ return lustre_commit_write(lnb);
}
static int filter_preprw(int cmd, struct lustre_handle *conn,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_remote *nb,
- struct niobuf_local *res, void **desc_private)
+ struct niobuf_local *res, void **desc_private,
+ struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
+ struct obd_export *export;
struct obd_device *obd;
struct obd_ioobj *o;
struct niobuf_remote *rnb = nb;
int i;
ENTRY;
+ if ((cmd & OBD_BRW_WRITE) != 0)
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
+ else
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
+
memset(res, 0, niocount * sizeof(*res));
+ export = class_conn2export(conn);
obd = class_conn2obd(conn);
if (!obd) {
CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
}
if (cmd & OBD_BRW_WRITE) {
+#warning "FIXME: we need to get inode->i_sem for each object here"
+ /* Even worse, we need to get locks on mulitple inodes (in
+ * order) or use the DLM to do the locking for us (and use
+ * the same locking in filter_setattr() for truncate. The
+ * handling gets very ugly when dealing with locked pages.
+ * It may be easier to just get rid of the locked page code
+ * (which has problems of its own) and either discover we do
+ * not need it anymore (i.e. it was a symptom of another bug)
+ * or ensure we get the page locks in an appropriate order.
+ */
+ /* Danger, Will Robinson! You are taking a lock here and also
+ * starting a transaction and releasing/finishing then in
+ * filter_commitrw(), so you must call fsfilt_commit() and
+ * finish_transno() if an error occurs in this function.
+ */
+ filter_start_transno(export);
*desc_private = fsfilt_brw_start(obd, objcount, fso,
niocount, nb);
if (IS_ERR(*desc_private))
obd_kmap_get(niocount, 1);
for (i = 0, o = obj; i < objcount; i++, o++) {
- struct dentry *dentry = fso->fso_dentry;
- struct inode *inode = dentry->d_inode;
+ struct dentry *dentry;
+ struct inode *inode;
int j;
+ dentry = fso[i].fso_dentry;
+ inode = dentry->d_inode;
+
for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
struct page *page;
else
lnb->dentry = dget(dentry);
- if (cmd & OBD_BRW_WRITE)
+ if (cmd & OBD_BRW_WRITE) {
page = filter_get_page_write(inode, rnb, lnb,
&pglocked);
- else
+
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_write_bytes,
+ rnb->len);
+ } else {
page = lustre_get_page_read(inode, rnb);
- if (IS_ERR(page)) {
- if (cmd & OBD_BRW_WRITE)
- fsfilt_commit(obd, dir_dentry->d_inode,
- *desc_private);
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_read_bytes,
+ rnb->len);
+ }
- GOTO(out_pages, rc = PTR_ERR(page));
+ if (IS_ERR(page)) {
+ rc = PTR_ERR(page);
+ f_dput(dentry);
+ GOTO(out_pages, rc);
}
lnb->addr = page_address(page);
}
}
- if (cmd & OBD_BRW_WRITE) {
- int err = fsfilt_commit(obd, dir_dentry->d_inode,
- *desc_private);
- if (err)
- GOTO(out_pages, rc = err);
- }
-
EXIT;
out:
OBD_FREE(fso, objcount * sizeof(*fso));
out_pages:
while (lnb-- > res) {
- CERROR("error cleanup on brw\n");
+ CERROR("%d error cleanup on brw\n", rc);
if (cmd & OBD_BRW_WRITE)
- filter_commit_write(lnb->page, 0, PAGE_SIZE, rc);
+ filter_commit_write(lnb, rc);
else
lustre_put_page(lnb->page);
+ f_dput(lnb->dentry);
}
obd_kmap_put(niocount);
+ goto out_err; /* dropped the dentry refs already (one per page) */
+
out_objinfo:
for (i = 0; i < objcount && fso[i].fso_dentry; i++)
f_dput(fso[i].fso_dentry);
-
+out_err:
+ if (cmd & OBD_BRW_WRITE) {
+ filter_finish_transno(export, *desc_private, oti, rc);
+ fsfilt_commit(obd, dir_dentry->d_inode, *desc_private);
+ }
goto out;
}
{
struct page *lpage;
int rc;
+ ENTRY;
lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
if (IS_ERR(lpage)) {
rc = PTR_ERR(lpage);
CERROR("error getting locked page index %ld: rc = %d\n",
lnb->page->index, rc);
- GOTO(out, rc);
+ LBUG();
+ lustre_commit_write(lnb);
+ RETURN(rc);
}
/* lpage is kmapped in lustre_get_page_write() above and kunmapped in
* filter_get_page_write() and kunmapped in lustre_put_page() below.
*/
memcpy(page_address(lpage), page_address(lnb->page), PAGE_SIZE);
- rc = lustre_commit_write(lpage, 0, PAGE_SIZE);
+ lustre_put_page(lnb->page);
+
+ lnb->page = lpage;
+ rc = lustre_commit_write(lnb);
if (rc)
CERROR("error committing locked page %ld: rc = %d\n",
lnb->page->index, rc);
-out:
- lustre_put_page(lnb->page);
- return rc;
+ RETURN(rc);
+}
+
+static int filter_sync(struct obd_device *obd)
+{
+ RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
}
static int filter_commitrw(int cmd, struct lustre_handle *conn,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *res,
- void *private)
+ void *desc_private, struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
struct obd_ioobj *o;
- struct niobuf_local *r;
+ struct niobuf_local *lnb;
+ struct obd_export *export = class_conn2export(conn);
struct obd_device *obd = class_conn2obd(conn);
int found_locked = 0;
int rc = 0;
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
LASSERT(!current->journal_info);
- current->journal_info = private;
+ current->journal_info = desc_private;
- for (i = 0, o = obj, r = res; i < objcount; i++, o++) {
+ for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
int j;
- for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) {
- struct page *page = r->page;
-
- if (!page)
- LBUG();
-
- if (r->flags & N_LOCAL_TEMP_PAGE) {
+ if (cmd & OBD_BRW_WRITE)
+ inode_update_time(lnb->dentry->d_inode, 1);
+ for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+ if (lnb->flags & N_LOCAL_TEMP_PAGE) {
found_locked++;
continue;
}
if (cmd & OBD_BRW_WRITE) {
- int err = filter_commit_write(page, 0,
- r->len, 0);
+ int err = filter_commit_write(lnb, 0);
if (!rc)
rc = err;
} else
- lustre_put_page(page);
+ lustre_put_page(lnb->page);
obd_kmap_put(1);
- f_dput(r->dentry);
+ f_dput(lnb->dentry);
}
}
- if (!found_locked)
- goto out_ctxt;
-
- for (i = 0, o = obj, r = res; i < objcount; i++, o++) {
+ for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
+ i++, o++) {
int j;
- for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) {
+ for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
int err;
- if (!(r->flags & N_LOCAL_TEMP_PAGE))
+ if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
continue;
- err = filter_write_locked_page(r);
+ err = filter_write_locked_page(lnb);
obd_kmap_put(1);
if (!rc)
rc = err;
- f_dput(r->dentry);
+ f_dput(lnb->dentry);
+ found_locked--;
}
}
-out_ctxt:
+ if (cmd & OBD_BRW_WRITE) {
+ int err;
+ struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
+
+ rc = filter_finish_transno(export, desc_private, oti, rc);
+ err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
+ if (err)
+ rc = err;
+ if (obd_sync_filter) {
+ /* this can fail with ENOMEM, what should we do then? */
+ filter_sync(obd);
+ }
+ /* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
+ }
+
LASSERT(!current->journal_info);
- current->journal_info = NULL;
pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
RETURN(rc);
static int filter_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *lsm, obd_count oa_bufs,
- struct brw_page *pga, struct obd_brw_set *set)
+ struct brw_page *pga, struct obd_brw_set *set,
+ struct obd_trans_info *oti)
{
struct obd_ioobj ioo;
struct niobuf_local *lnb;
ioo.ioo_bufcnt = oa_bufs;
ret = filter_preprw(cmd, conn, 1, &ioo, oa_bufs, rnb, lnb,
- &desc_private);
+ &desc_private, oti);
if (ret != 0)
GOTO(out, ret);
kunmap(virt);
}
- ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private);
+ ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private,
+ oti);
out:
if (lnb)
obd = class_conn2obd(conn);
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_statfs_reqs, 1);
+
RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
}
int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
struct lustre_handle *src_conn, struct obdo *src,
- obd_size count, obd_off offset)
+ obd_size count, obd_off offset, struct obd_trans_info *oti)
{
struct page *page;
struct lov_stripe_md srcmd, dstmd;
page->index = index;
set->brw_callback = ll_brw_sync_wait;
- err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set);
+ err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
obd_brw_set_free(set);
if (err) {
EXIT;
CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
set->brw_callback = ll_brw_sync_wait;
- err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set);
+ err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
obd_brw_set_free(set);
/* XXX should handle dst->o_size, dst->o_blocks here */
static int __init obdfilter_init(void)
{
- printk(KERN_INFO "Filtering OBD driver v0.001, info@clusterfs.com\n");
+ struct lprocfs_static_vars lvars;
+
+ printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
filter_open_cache = kmem_cache_create("ll_filter_fdata",
sizeof(struct filter_file_data),
0, 0, NULL, NULL);
RETURN(-ENOMEM);
}
- return class_register_type(&filter_obd_ops, status_class_var,
+ xprocfs_init ("filter");
+
+ lprocfs_init_vars(&lvars);
+ return class_register_type(&filter_obd_ops, lvars.module_vars,
OBD_FILTER_DEVICENAME);
}
CERROR("couldn't free obdfilter dentry cache\n");
if (kmem_cache_destroy(filter_open_cache))
CERROR("couldn't free obdfilter open cache\n");
+ xprocfs_fini ();
}
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre Filtering OBD driver v1.0");
+MODULE_DESCRIPTION("Lustre Filtering OBD driver");
MODULE_LICENSE("GPL");
module_init(obdfilter_init);