#include <linux/random.h>
#include <linux/lustre_fsfilt.h>
#include <linux/lprocfs_status.h>
-
+#include <linux/version.h>
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+#include <linux/mount.h>
+#endif
static kmem_cache_t *filter_open_cache;
static kmem_cache_t *filter_dentry_cache;
__u64 st_create_reqs;
__u64 st_destroy_reqs;
__u64 st_statfs_reqs;
+ __u64 st_syncfs_reqs;
__u64 st_open_reqs;
__u64 st_close_reqs;
__u64 st_punch_reqs;
xprocfs_iostats[smp_processor_id()].field += (count); \
} while (0)
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
#define DECLARE_XPROCFS_SUM_STAT(field) \
static long long \
xprocfs_sum_##field (void) \
DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
+DECLARE_XPROCFS_SUM_STAT (st_syncfs_reqs)
DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
+#endif
static int
xprocfs_rd_stat (char *page, char **start, off_t off, int count,
return;
}
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
xprocfs_add_stat ("read_bytes", xprocfs_sum_st_read_bytes);
xprocfs_add_stat ("read_reqs", xprocfs_sum_st_read_reqs);
xprocfs_add_stat ("write_bytes", xprocfs_sum_st_write_bytes);
xprocfs_add_stat ("create_reqs", xprocfs_sum_st_create_reqs);
xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
xprocfs_add_stat ("statfs_reqs", xprocfs_sum_st_statfs_reqs);
+ xprocfs_add_stat ("syncfs_reqs", xprocfs_sum_st_syncfs_reqs);
xprocfs_add_stat ("open_reqs", xprocfs_sum_st_open_reqs);
xprocfs_add_stat ("close_reqs", xprocfs_sum_st_close_reqs);
xprocfs_add_stat ("punch_reqs", xprocfs_sum_st_punch_reqs);
+#endif
}
void xprocfs_fini (void)
remove_proc_entry ("create_reqs", xprocfs_dir);
remove_proc_entry ("destroy_reqs", xprocfs_dir);
remove_proc_entry ("statfs_reqs", xprocfs_dir);
+ remove_proc_entry ("syncfs_reqs", xprocfs_dir);
remove_proc_entry ("open_reqs", xprocfs_dir);
remove_proc_entry ("close_reqs", xprocfs_dir);
remove_proc_entry ("punch_reqs", xprocfs_dir);
void filter_start_transno(struct obd_export *export)
{
+#ifdef FILTER_TRANSNO_SEM
struct obd_device * obd = export->exp_obd;
ENTRY;
down(&obd->u.filter.fo_transno_sem);
+#endif
}
/* Assumes caller has already pushed us into the kernel context. */
ssize_t written;
/* Propagate error code. */
- if (rc)
- GOTO(out, rc);
+ if (rc) {
+#ifdef FILTER_TRANSNO_SEM
+ up(&filter->fo_transno_sem);
+#endif
+ RETURN(rc);
+ }
+
+ if (!(obd->obd_flags & OBD_REPLAYABLE)) {
+ RETURN(0);
+ }
/* we don't allocate new transnos for replayed requests */
#if 0
GOTO(out, rc = 0);
#endif
- off = FILTER_LR_CLIENT_START + fed->fed_lr_off * FILTER_LR_CLIENT_SIZE;
+ off = fed->fed_lr_off;
- last_rcvd = ++filter->fo_fsd->fsd_last_rcvd;
+#ifndef FILTER_TRANSNO_SEM
+ spin_lock(&filter->fo_translock);
+#endif
+ last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
+ filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
+#ifndef FILTER_TRANSNO_SEM
+ spin_unlock(&filter->fo_translock);
+#endif
if (oti)
oti->oti_transno = last_rcvd;
fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
- fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
+ fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
/* get this from oti */
#if 0
written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
&off);
CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
- LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_off, written);
+ LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
+#ifdef FILTER_TRANSNO_SEM
+ up(&filter->fo_transno_sem);
+#endif
if (written == sizeof(*fcd))
- GOTO(out, rc = 0);
- CERROR("error writing to last_rcvd file: rc = %d\n", rc);
+ RETURN(0);
+ CERROR("error writing to last_rcvd file: rc = %d\n", written);
if (written >= 0)
- GOTO(out, rc = -EIO);
-
- rc = 0;
-
- EXIT;
- out:
+ RETURN(-EIO);
- up(&filter->fo_transno_sem);
- return rc;
+ RETURN(written);
}
/* write the pathname into the string */
-static int filter_id(char *buf, obd_id id, obd_mode mode)
+static char *filter_id(char *buf, struct filter_obd *filter, obd_id id,
+ obd_mode mode)
{
- return sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
+ if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
+ sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
+ else
+ sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode),
+ (int)id & (filter->fo_subdir_count - 1), id);
+
+ return buf;
}
static inline void f_dput(struct dentry *dentry)
#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
#define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
-static unsigned long filter_last_rcvd_slots[FILTER_LR_MAX_CLIENT_WORDS];
-
/* Add client data to the FILTER. We use a bitmap to locate a free space
- * in the last_rcvd file if cl_off is -1 (i.e. a new client).
+ * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
* Otherwise, we have just read the data from the last_rcvd file and
* we know its offset.
*/
int filter_client_add(struct filter_obd *filter,
- struct filter_export_data *fed, int cl_off)
+ struct filter_export_data *fed, int cl_idx)
{
- int new_client = (cl_off == -1);
+ int new_client = (cl_idx == -1);
+
+ LASSERT(filter->fo_last_rcvd_slots != NULL);
- /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
+ /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
* there's no need for extra complication here
*/
if (new_client) {
- cl_off = find_first_zero_bit(filter_last_rcvd_slots,
+ cl_idx = find_first_zero_bit(filter->fo_last_rcvd_slots,
FILTER_LR_MAX_CLIENTS);
repeat:
- if (cl_off >= FILTER_LR_MAX_CLIENTS) {
+ if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
return -ENOMEM;
}
- if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
+ if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
CERROR("FILTER client %d: found bit is set in bitmap\n",
- cl_off);
- cl_off = find_next_zero_bit(filter_last_rcvd_slots,
+ cl_idx);
+ cl_idx = find_next_zero_bit(filter->fo_last_rcvd_slots,
FILTER_LR_MAX_CLIENTS,
- cl_off);
+ cl_idx);
goto repeat;
}
} else {
- if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
+ if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
CERROR("FILTER client %d: bit already set in bitmap!\n",
- cl_off);
+ cl_idx);
LBUG();
}
}
- CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
- cl_off, fed->fed_fcd->fcd_uuid);
+ fed->fed_lr_idx = cl_idx;
+ fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
+ cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
- fed->fed_lr_off = cl_off;
+ CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
+ fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
if (new_client) {
struct obd_run_ctxt saved;
- loff_t off = FILTER_LR_CLIENT_START +
- (cl_off * FILTER_LR_CLIENT_SIZE);
+ loff_t off = fed->fed_lr_off;
ssize_t written;
+ CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
+ fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
+
push_ctxt(&saved, &filter->fo_ctxt, NULL);
written = lustre_fwrite(filter->fo_rcvd_filp,
(char *)fed->fed_fcd,
RETURN(written);
RETURN(-EIO);
}
- CDEBUG(D_INFO, "wrote client fcd at off %u (len %u)\n",
- FILTER_LR_CLIENT_START + (cl_off*FILTER_LR_CLIENT_SIZE),
- (unsigned int)sizeof(*fed->fed_fcd));
}
return 0;
}
if (!fed->fed_fcd)
RETURN(0);
- off = FILTER_LR_CLIENT_START + (fed->fed_lr_off*FILTER_LR_CLIENT_SIZE);
+ LASSERT(filter->fo_last_rcvd_slots != NULL);
+
+ off = fed->fed_lr_off;
- CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
- fed->fed_lr_off, off, fed->fed_fcd->fcd_uuid);
+ CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
+ fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
- if (!test_and_clear_bit(fed->fed_lr_off, filter_last_rcvd_slots)) {
+ if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
CERROR("FILTER client %u: bit already clear in bitmap!!\n",
- fed->fed_lr_off);
+ fed->fed_lr_idx);
LBUG();
}
sizeof(zero_fcd), &off);
/* XXX: this write gets lost sometimes, unless this sync is here. */
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
-#else
- file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
-#endif
+ file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
pop_ctxt(&saved, &filter->fo_ctxt, NULL);
if (written != sizeof(zero_fcd)) {
- CERROR("error zeroing out client %s off %d in %s: %d\n",
- fed->fed_fcd->fcd_uuid, fed->fed_lr_off, LAST_RCVD,
- written);
+ CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
+ fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
+ LAST_RCVD, written);
} else {
CDEBUG(D_INFO,
- "zeroed disconnecting client %s at off %d ("LPX64")\n",
- fed->fed_fcd->fcd_uuid, fed->fed_lr_off, off);
+ "zeroed disconnecting client %s at idx %u (%llu)\n",
+ fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
}
OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
return 0;
}
-static void filter_unpack_fsd(struct filter_server_data *fsd)
-{
- fsd->fsd_last_objid = le64_to_cpu(fsd->fsd_last_objid);
- fsd->fsd_last_rcvd = le64_to_cpu(fsd->fsd_last_rcvd);
- fsd->fsd_mount_count = le64_to_cpu(fsd->fsd_mount_count);
-}
-
-static void filter_pack_fsd(struct filter_server_data *disk_fsd,
- struct filter_server_data *fsd)
-{
- memset(disk_fsd, 0, sizeof(*disk_fsd));
- memcpy(disk_fsd->fsd_uuid, fsd->fsd_uuid, sizeof(fsd->fsd_uuid));
- disk_fsd->fsd_last_objid = cpu_to_le64(fsd->fsd_last_objid);
- disk_fsd->fsd_last_rcvd = cpu_to_le64(fsd->fsd_last_rcvd);
- disk_fsd->fsd_mount_count = cpu_to_le64(fsd->fsd_mount_count);
-}
-
static int filter_free_server_data(struct filter_obd *filter)
{
OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
filter->fo_fsd = NULL;
-
+ OBD_FREE(filter->fo_last_rcvd_slots,
+ FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
+ filter->fo_last_rcvd_slots = NULL;
return 0;
}
-/* assumes caller has already in kernel ctxt */
+/* assumes caller is already in kernel ctxt */
static int filter_update_server_data(struct file *filp,
struct filter_server_data *fsd)
{
- struct filter_server_data disk_fsd;
loff_t off = 0;
int rc;
CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
- CDEBUG(D_INODE, "server last_objid: "LPU64"\n", fsd->fsd_last_objid);
- CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n", fsd->fsd_last_rcvd);
- CDEBUG(D_INODE, "server last_mount: "LPU64"\n", fsd->fsd_mount_count);
-
- filter_pack_fsd(&disk_fsd, fsd);
- rc = lustre_fwrite(filp, (char *)&disk_fsd,
- sizeof(disk_fsd), &off);
- if (rc != sizeof(disk_fsd)) {
+ CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
+ le64_to_cpu(fsd->fsd_last_objid));
+ CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
+ le64_to_cpu(fsd->fsd_last_rcvd));
+ CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
+ le64_to_cpu(fsd->fsd_mount_count));
+
+ rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
+ if (rc != sizeof(*fsd)) {
CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
rc);
RETURN(-EIO);
}
/* assumes caller has already in kernel ctxt */
-static int filter_init_server_data(struct obd_device *obd,
- struct file * filp,
+static int filter_init_server_data(struct obd_device *obd, struct file * filp,
__u64 init_lastobjid)
{
struct filter_obd *filter = &obd->u.filter;
struct filter_client_data *fcd = NULL;
struct inode *inode = filp->f_dentry->d_inode;
unsigned long last_rcvd_size = inode->i_size;
- int cl_off;
+ __u64 mount_count;
+ int cl_idx;
loff_t off = 0;
int rc;
RETURN(-ENOMEM);
filter->fo_fsd = fsd;
+ OBD_ALLOC(filter->fo_last_rcvd_slots,
+ FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
+ if (filter->fo_last_rcvd_slots == NULL) {
+ OBD_FREE(fsd, sizeof(*fsd));
+ RETURN(-ENOMEM);
+ }
+
if (last_rcvd_size == 0) {
CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
- fsd->fsd_last_objid = init_lastobjid;
+ fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
fsd->fsd_last_rcvd = 0;
- fsd->fsd_mount_count = 0;
-
+ mount_count = fsd->fsd_mount_count = 0;
+ fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
+ fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
+ fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
+ fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
+ filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
} else {
- ssize_t retval = lustre_fread(filp, (char *)fsd,
- sizeof(*fsd),
+ ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
&off);
if (retval != sizeof(*fsd)) {
CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
GOTO(out, rc = -EIO);
}
- filter_unpack_fsd(fsd);
+ mount_count = le64_to_cpu(fsd->fsd_mount_count);
+ filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
+ }
+
+ if (fsd->fsd_feature_incompat) {
+ CERROR("unsupported feature %x\n",
+ le32_to_cpu(fsd->fsd_feature_incompat));
+ RETURN(-EINVAL);
+ }
+ if (fsd->fsd_feature_rocompat) {
+ CERROR("read-only feature %x\n",
+ le32_to_cpu(fsd->fsd_feature_rocompat));
+ /* Do something like remount filesystem read-only */
+ RETURN(-EINVAL);
}
CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
- obd->obd_name, fsd->fsd_last_objid);
+ obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
- obd->obd_name, fsd->fsd_last_rcvd);
+ obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
- obd->obd_name, fsd->fsd_mount_count);
+ obd->obd_name, mount_count);
+ CDEBUG(D_INODE, "%s: server data size: %u\n",
+ obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
+ CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+ obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
+ CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+ obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
+ CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
+ obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
/*
* When we do a clean FILTER shutdown, we save the last_rcvd into
* the header. If we find clients with higher last_rcvd values
* then those clients may need recovery done.
*/
- /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
- for (off = FILTER_LR_CLIENT_START, cl_off = 0; off < last_rcvd_size;
- cl_off++) {
- __u64 last_rcvd;
- int mount_age;
-
- if (!fcd) {
- OBD_ALLOC(fcd, sizeof(*fcd));
- if (!fcd)
- GOTO(err_fsd, rc = -ENOMEM);
- }
-
- rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
- if (rc != sizeof(*fcd)) {
- CERROR("error reading FILTER %s offset %d: rc = %d\n",
- LAST_RCVD, cl_off, rc);
- if (rc > 0) /* XXX fatal error or just abort reading? */
- rc = -EIO;
- break;
- }
-
- if (fcd->fcd_uuid[0] == '\0') {
- CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
- cl_off);
- continue;
- }
+ if (obd->obd_flags & OBD_REPLAYABLE) {
+ for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
+ __u64 last_rcvd;
+ int mount_age;
+
+ if (!fcd) {
+ OBD_ALLOC(fcd, sizeof(*fcd));
+ if (!fcd)
+ GOTO(err_fsd, rc = -ENOMEM);
+ }
- last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
+ /* Don't assume off is incremented properly, in case
+ * sizeof(fsd) isn't the same as fsd->fsd_client_size.
+ */
+ off = le32_to_cpu(fsd->fsd_client_start) +
+ cl_idx * le16_to_cpu(fsd->fsd_client_size);
+ rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
+ if (rc != sizeof(*fcd)) {
+ CERROR("error reading FILTER %s offset %d: rc = %d\n",
+ LAST_RCVD, cl_idx, rc);
+ if (rc > 0) /* XXX fatal error or just abort reading? */
+ rc = -EIO;
+ break;
+ }
- /* These exports are cleaned up by filter_disconnect(), so they
- * need to be set up like real exports as filter_connect() does.
- */
- mount_age = fsd->fsd_mount_count -
- le64_to_cpu(fcd->fcd_mount_count);
- if (mount_age < FILTER_MOUNT_RECOV) {
- CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
- "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
- "\n", fcd->fcd_uuid, cl_off,
- last_rcvd, fsd->fsd_last_rcvd,
- le64_to_cpu(fcd->fcd_mount_count),
- fsd->fsd_mount_count);
-#if 0
- /* disabled until OST recovery is actually working */
- struct obd_export *exp = class_new_export(obd);
- struct filter_export_data *fed;
+ if (fcd->fcd_uuid[0] == '\0') {
+ CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+ cl_idx);
+ continue;
+ }
- if (!exp) {
- rc = -ENOMEM;
- break;
+ last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
+
+ /* These exports are cleaned up by filter_disconnect(), so they
+ * need to be set up like real exports as filter_connect() does.
+ */
+ mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
+ if (mount_age < FILTER_MOUNT_RECOV) {
+ struct obd_export *exp = class_new_export(obd);
+ struct filter_export_data *fed;
+ CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+ " srv lr: "LPU64" mnt: "LPU64" last mount: "
+ LPU64"\n", fcd->fcd_uuid, cl_idx,
+ last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
+ le64_to_cpu(fcd->fcd_mount_count), mount_count);
+ /* disabled until OST recovery is actually working */
+
+ if (!exp) {
+ rc = -ENOMEM;
+ break;
+ }
+ memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
+ sizeof exp->exp_client_uuid.uuid);
+ fed = &exp->exp_filter_data;
+ fed->fed_fcd = fcd;
+ filter_client_add(filter, fed, cl_idx);
+ /* create helper if export init gets more complex */
+ INIT_LIST_HEAD(&fed->fed_open_head);
+ spin_lock_init(&fed->fed_lock);
+
+ fcd = NULL;
+ obd->obd_recoverable_clients++;
+ } else {
+ CDEBUG(D_INFO,
+ "discarded client %d UUID '%s' count "LPU64"\n",
+ cl_idx, fcd->fcd_uuid,
+ le64_to_cpu(fcd->fcd_mount_count));
}
- fed = &exp->exp_filter_data;
- fed->fed_fcd = fcd;
- filter_client_add(filter, fed, cl_off);
- /* create helper if export init gets more complex */
- INIT_LIST_HEAD(&fed->fed_open_head);
- spin_lock_init(&fed->fed_lock);
+ CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
+ cl_idx, last_rcvd);
- fcd = NULL;
- filter->fo_recoverable_clients++;
-#endif
- } else {
- CDEBUG(D_INFO,
- "discarded client %d, UUID '%s', count %Ld\n",
- cl_off, fcd->fcd_uuid,
- (long long)le64_to_cpu(fcd->fcd_mount_count));
+ if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
+ filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
}
- CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
- cl_off, (unsigned long long)last_rcvd);
+ obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
+ if (obd->obd_recoverable_clients) {
+ CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
+ obd->obd_recoverable_clients,
+ le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
+ obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+ obd->obd_flags |= OBD_RECOVERING;
+ }
- if (last_rcvd > filter->fo_fsd->fsd_last_rcvd)
- filter->fo_fsd->fsd_last_rcvd = last_rcvd;
- }
+ if (fcd)
+ OBD_FREE(fcd, sizeof(*fcd));
- obd->obd_last_committed = filter->fo_fsd->fsd_last_rcvd;
- if (filter->fo_recoverable_clients) {
- CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
- filter->fo_recoverable_clients,
- filter->fo_fsd->fsd_last_rcvd);
- filter->fo_next_recovery_transno = obd->obd_last_committed + 1;
- obd->obd_flags |= OBD_RECOVERING;
+ } else {
+ CERROR("%s: recovery support OFF\n", obd->obd_name);
}
- if (fcd)
- OBD_FREE(fcd, sizeof(*fcd));
-
- fsd->fsd_mount_count++;
+ fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
/* save it,so mount count and last_recvd is current */
rc = filter_update_server_data(filp, filter->fo_fsd);
{
struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
- struct dentry *dentry;
+ struct dentry *dentry, *O_dentry;
struct file *file;
struct inode *inode;
+ int i;
int rc = 0;
int mode = 0;
* Create directories and/or get dentries for each object type.
* This saves us from having to do multiple lookups for each one.
*/
+ O_dentry = filter->fo_dentry_O;
for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
char *name = obd_type_by_mode[mode];
filter->fo_dentry_O_mode[mode] = NULL;
continue;
}
- dentry = simple_mkdir(filter->fo_dentry_O, name, 0700);
+ dentry = simple_mkdir(O_dentry, name, 0700);
CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
CERROR("cannot create O/%s: rc = %d\n", name, rc);
- GOTO(out_O_mode, rc);
+ GOTO(err_O_mode, rc);
}
filter->fo_dentry_O_mode[mode] = dentry;
}
file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
- if ( !file || IS_ERR(file) ) {
+ if (!file || IS_ERR(file)) {
rc = PTR_ERR(file);
CERROR("OBD filter: cannot open/create %s: rc = %d\n",
LAST_RCVD, rc);
- GOTO(out_O_mode, rc);
+ GOTO(err_O_mode, rc);
}
if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
}
filter->fo_rcvd_filp = file;
+ if (filter->fo_subdir_count) {
+ O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
+ OBD_ALLOC(filter->fo_dentry_O_sub,
+ FILTER_SUBDIR_COUNT * sizeof(dentry));
+ if (!filter->fo_dentry_O_sub)
+ GOTO(err_client, rc = -ENOMEM);
+
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ char dir[20];
+ snprintf(dir, sizeof(dir), "d%u", i);
+
+ dentry = simple_mkdir(O_dentry, dir, 0700);
+ CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
+ if (IS_ERR(dentry)) {
+ rc = PTR_ERR(dentry);
+ CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
+ GOTO(err_O_sub, rc);
+ }
+ filter->fo_dentry_O_sub[i] = dentry;
+ }
+ }
rc = 0;
out:
pop_ctxt(&saved, &filter->fo_ctxt, NULL);
return(rc);
+err_O_sub:
+ while (i-- > 0) {
+ struct dentry *dentry = filter->fo_dentry_O_sub[i];
+ if (dentry) {
+ f_dput(dentry);
+ filter->fo_dentry_O_sub[i] = NULL;
+ }
+ }
+ OBD_FREE(filter->fo_dentry_O_sub,
+ filter->fo_subdir_count * sizeof(dentry));
err_client:
class_disconnect_all(obd);
err_filp:
if (filp_close(file, 0))
CERROR("can't close %s after error\n", LAST_RCVD);
filter->fo_rcvd_filp = NULL;
- out_O_mode:
+err_O_mode:
while (mode-- > 0) {
struct dentry *dentry = filter->fo_dentry_O_mode[mode];
if (dentry) {
rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
if (rc)
CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
- filter_free_server_data(filter);
if (filter->fo_rcvd_filp) {
- /* broken sync at umount bug workaround */
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- rc = fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
-#else
rc = file_fsync(filter->fo_rcvd_filp,
filter->fo_rcvd_filp->f_dentry, 1);
-#endif
filp_close(filter->fo_rcvd_filp, 0);
filter->fo_rcvd_filp = NULL;
if (rc)
- CERROR("last_rcvd file won't closek rc = %ld\n", rc);
+ CERROR("last_rcvd file won't closed rc = %ld\n", rc);
}
+ if (filter->fo_subdir_count) {
+ int i;
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ struct dentry *dentry = filter->fo_dentry_O_sub[i];
+ f_dput(dentry);
+ filter->fo_dentry_O_sub[i] = NULL;
+ }
+ OBD_FREE(filter->fo_dentry_O_sub,
+ filter->fo_subdir_count *
+ sizeof(*filter->fo_dentry_O_sub));
+ }
for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
struct dentry *dentry = filter->fo_dentry_O_mode[mode];
if (dentry) {
}
}
f_dput(filter->fo_dentry_O);
+ filter_free_server_data(filter);
pop_ctxt(&saved, &filter->fo_ctxt, NULL);
}
LASSERT(obd->u.filter.fo_fsd != NULL);
spin_lock(&obd->u.filter.fo_objidlock);
- id = ++obd->u.filter.fo_fsd->fsd_last_objid;
+ id = le64_to_cpu(obd->u.filter.fo_fsd->fsd_last_objid);
+ obd->u.filter.fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
spin_unlock(&obd->u.filter.fo_objidlock);
return id;
}
static inline struct dentry *filter_parent(struct obd_device *obd,
- obd_mode mode)
+ obd_mode mode, obd_id objid)
{
struct filter_obd *filter = &obd->u.filter;
LASSERT((mode & S_IFMT) == S_IFREG); /* only regular files for now */
- return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+ if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
+ return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+
+ return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
}
static struct file *filter_obj_open(struct obd_export *export,
GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
}
- filter_id(name, id, type);
push_ctxt(&saved, &filter->fo_ctxt, NULL);
- file = filp_open(name, O_RDWR | O_LARGEFILE, 0 /* type? */);
+ file = filp_open(filter_id(name, filter, id, type),
+ O_RDWR | O_LARGEFILE, type);
pop_ctxt(&saved, &filter->fo_ctxt, NULL);
if (IS_ERR(file)) {
LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
/* should only happen during client recovery */
if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
- CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id);
+ CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
atomic_inc(&fdd->fdd_open_count);
} else {
atomic_set(&fdd->fdd_open_count, 1);
fdd->fdd_flags = 0;
+ fdd->fdd_objid = id;
/* If this is racy, then we can use {cmp}xchg and atomic_add */
dentry->d_fsdata = fdd;
spin_unlock(&filter->fo_fddlock);
get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
ffd->ffd_file = file;
+ LASSERT(file->private_data == NULL);
file->private_data = ffd;
if (!dentry->d_op)
list_add(&ffd->ffd_export_list, &fed->fed_open_head);
spin_unlock(&fed->fed_lock);
- CDEBUG(D_INODE, "opened objid "LPX64": rc = %p\n", id, file);
+ CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
EXIT;
out:
return file;
if (atomic_dec_and_test(&fdd->fdd_open_count) &&
fdd->fdd_flags & FILTER_FLAG_DESTROY) {
- struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
+ struct dentry *dir_dentry = filter_parent(obd, S_IFREG, fdd->fdd_objid);
struct obd_run_ctxt saved;
void *handle;
/* obd methods */
/* mount the file system (secretly) */
-static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
+static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
+ char *option)
{
struct obd_ioctl_data* data = buf;
struct filter_obd *filter;
ENTRY;
if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
- RETURN(rc = -EINVAL);
+ RETURN(-EINVAL);
obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
if (IS_ERR(obd->obd_fsops))
- RETURN(rc = PTR_ERR(obd->obd_fsops));
+ RETURN(PTR_ERR(obd->obd_fsops));
- mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
+ mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
rc = PTR_ERR(mnt);
- if (IS_ERR(mnt))
+ if (IS_ERR(mnt)) {
+ CERROR("mount of %s as type %s failed: rc %d\n",
+ data->ioc_inlbuf2, data->ioc_inlbuf1, rc);
GOTO(err_ops, rc);
+ }
+#if OST_RECOVERY
obd->obd_flags |= OBD_REPLAYABLE;
+#endif
filter = &obd->u.filter;;
- init_MUTEX(&filter->fo_transno_sem);
filter->fo_vfsmnt = mnt;
filter->fo_fstype = strdup(data->ioc_inlbuf2);
filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
if (rc)
GOTO(err_kfree, rc);
+#ifdef FILTER_TRANSNO_SEM
+ init_MUTEX(&filter->fo_transno_sem);
+#else
+ spin_lock_init(&filter->fo_translock);
+#endif
spin_lock_init(&filter->fo_fddlock);
spin_lock_init(&filter->fo_objidlock);
INIT_LIST_HEAD(&filter->fo_export_list);
return rc;
}
+static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
+{
+ return filter_common_setup(obd, len, buf, NULL);
+}
+
+/* sanobd setup methods - use a specific mount option */
+static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
+{
+ struct obd_ioctl_data* data = buf;
+ char *option = NULL;
+
+ if (!data->ioc_inlbuf2)
+ RETURN(-EINVAL);
+
+ /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
+ if (!strcmp(data->ioc_inlbuf2, "extN") ||
+ !strcmp(data->ioc_inlbuf2, "ext3"))
+ option = "data=writeback";
+ else
+ LBUG(); /* just a reminder */
+
+ return filter_common_setup(obd, len, buf, option);
+}
static int filter_cleanup(struct obd_device *obd)
{
INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
spin_lock_init(&exp->exp_filter_data.fed_lock);
- rc = filter_client_add(filter, fed, -1);
- if (rc)
- GOTO(out_fcd, rc);
+ if (obd->obd_flags & OBD_REPLAYABLE) {
+ rc = filter_client_add(filter, fed, -1);
+ if (rc)
+ GOTO(out_fcd, rc);
+ }
RETURN(rc);
spin_unlock(&fed->fed_lock);
ldlm_cancel_locks_for_export(exp);
- filter_client_free(exp);
+
+ if (exp->exp_obd->obd_flags & OBD_REPLAYABLE)
+ filter_client_free(exp);
rc = class_disconnect(conn);
int type = oa->o_mode & S_IFMT;
ENTRY;
- CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPX64" valid 0x%08x\n",
+ CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
inode->i_ino, inode, oa->o_id, valid);
/* Don't copy the inode number in place of the object ID */
obdo_from_inode(oa, inode, valid);
CERROR("invalid client "LPX64"\n", conn->addr);
RETURN(ERR_PTR(-EINVAL));
}
- dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
+ dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode,
+ oa->o_id),
oa->o_id, locked);
}
if (IS_ERR(dentry)) {
- CERROR("%s error looking up object: "LPX64"\n", what, oa->o_id);
+ CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
RETURN(dentry);
}
if (!dentry->d_inode) {
- CERROR("%s on non-existent object: "LPX64"\n", what, oa->o_id);
+ CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
f_dput(dentry);
LBUG();
RETURN(ERR_PTR(-ENOENT));
XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
- CERROR("no handle for close of objid "LPX64"\n", oa->o_id);
+ CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
RETURN(-EINVAL);
}
oa->o_id = filter_next_id(obd);
push_ctxt(&saved, &filter->fo_ctxt, NULL);
- dir_dentry = filter_parent(obd, oa->o_mode);
+ dir_dentry = filter_parent(obd, S_IFREG, oa->o_id);
down(&dir_dentry->d_inode->i_sem);
new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
if (IS_ERR(new))
GOTO(out, rc = PTR_ERR(new));
if (new->d_inode) {
+ char buf[32];
+
/* This would only happen if lastobjid was bad on disk */
- CERROR("objid O/%*s/"LPU64" already exists\n",
- dir_dentry->d_name.len, dir_dentry->d_name.name,
- oa->o_id);
+ CERROR("objid %s already exists\n",
+ filter_id(buf, filter, S_IFREG, oa->o_id));
LBUG();
GOTO(out, rc = -EEXIST);
}
XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
- CDEBUG(D_INODE, "destroying objid "LPX64"\n", oa->o_id);
+ CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
- dir_dentry = filter_parent(obd, oa->o_mode);
+ dir_dentry = filter_parent(obd, oa->o_mode, oa->o_id);
down(&dir_dentry->d_inode->i_sem);
object_dentry = filter_oa2dentry(conn, oa, 0);
fdd->fdd_flags |= FILTER_FLAG_DESTROY;
/* XXX put into PENDING directory in case of crash */
CDEBUG(D_INODE,
- "defer destroy of %dx open objid "LPX64"\n",
+ "defer destroy of %dx open objid "LPU64"\n",
atomic_read(&fdd->fdd_open_count), oa->o_id);
} else
CDEBUG(D_INODE,
- "repeat destroy of %dx open objid "LPX64"\n",
+ "repeat destroy of %dx open objid "LPU64"\n",
atomic_read(&fdd->fdd_open_count), oa->o_id);
GOTO(out_commit, rc = 0);
}
if (end != OBD_OBJECT_EOF)
CERROR("PUNCH not supported, only truncate works\n");
- CDEBUG(D_INODE, "calling truncate for object "LPX64", valid = %x, "
+ CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
"o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
oa->o_size = start;
error = filter_setattr(conn, oa, NULL, oti);
static struct page *
-lustre_get_page_read(struct inode *inode, struct niobuf_remote *rnb)
+lustre_get_page_read(struct inode *inode, struct niobuf_local *lnb)
{
- unsigned long index = rnb->offset >> PAGE_SHIFT;
+ unsigned long index = lnb->offset >> PAGE_SHIFT;
struct address_space *mapping = inode->i_mapping;
struct page *page;
int rc;
(filler_t*)mapping->a_ops->readpage, NULL);
if (!IS_ERR(page)) {
wait_on_page(page);
- kmap(page);
+ lnb->addr = kmap(page);
+ lnb->page = page;
if (!PageUptodate(page)) {
CERROR("page index %lu not uptodate\n", index);
GOTO(err_page, rc = -EIO);
}
struct page *filter_get_page_write(struct inode *inode,
- struct niobuf_remote *rnb,
struct niobuf_local *lnb, int *pglocked)
{
- unsigned long index = rnb->offset >> PAGE_SHIFT;
+ unsigned long index = lnb->offset >> PAGE_SHIFT;
struct address_space *mapping = inode->i_mapping;
-
struct page *page;
int rc;
addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
if (!addr) {
CERROR("no memory for a temp page\n");
- LBUG();
GOTO(err, rc = -ENOMEM);
}
- /* XXX debugging */
- memset((void *)addr, 0xBA, PAGE_SIZE);
+ POISON((void *)addr, 0xBA, PAGE_SIZE);
page = virt_to_page(addr);
kmap(page);
page->index = index;
+ lnb->addr = (void *)addr;
+ lnb->page = page;
lnb->flags |= N_LOCAL_TEMP_PAGE;
} else if (!IS_ERR(page)) {
(*pglocked)++;
kmap(page);
rc = mapping->a_ops->prepare_write(NULL, page,
- rnb->offset % PAGE_SIZE,
- rnb->len);
+ lnb->offset & ~PAGE_MASK,
+ lnb->len);
if (rc) {
- CERROR("page index %lu, rc = %d\n", index, rc);
if (rc != -ENOSPC)
- LBUG();
+ CERROR("page index %lu, rc = %d\n", index, rc);
GOTO(err_unlock, rc);
}
/* XXX not sure if we need this if we are overwriting page */
LBUG();
GOTO(err_unlock, rc = -EIO);
}
+ lnb->addr = page_address(page);
+ lnb->page = page;
}
+
return page;
err_unlock:
unsigned blocksize = head->b_size;
/* debugging: just seeing if this ever happens */
- CERROR("called filter_commit_write for ino %lu:%lu on err %d\n",
+ CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
+ "called for ino %lu:%lu on err %d\n",
lnb->page->mapping->host->i_ino, lnb->page->index, err);
/* Currently one buffer per page, but in the future... */
struct obd_ioobj *o;
struct niobuf_remote *rnb = nb;
struct niobuf_local *lnb = res;
- struct dentry *dir_dentry;
struct fsfilt_objinfo *fso;
int pglocked = 0;
int rc = 0;
RETURN(-ENOMEM);
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- dir_dentry = filter_parent(obd, S_IFREG);
for (i = 0, o = obj; i < objcount; i++, o++) {
struct filter_dentry_data *fdd;
LASSERT(o->ioo_bufcnt);
- dentry = filter_fid2dentry(obd, dir_dentry, o->ioo_id, 0);
+ dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
+ o->ioo_id),
+ o->ioo_id, 0);
if (IS_ERR(dentry))
GOTO(out_objinfo, rc = PTR_ERR(dentry));
if (!dentry->d_inode) {
CERROR("trying to BRW to non-existent file "LPU64"\n",
o->ioo_id);
+ f_dput(dentry);
GOTO(out_objinfo, rc = -ENOENT);
}
fdd = dentry->d_fsdata;
if (!fdd || !atomic_read(&fdd->fdd_open_count))
- CDEBUG(D_PAGE, "I/O to unopened object "LPX64"\n",
+ CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
o->ioo_id);
}
if (cmd & OBD_BRW_WRITE) {
-#warning "FIXME: we need to get inode->i_sem for each object here"
+#warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
/* Even worse, we need to get locks on mulitple inodes (in
* order) or use the DLM to do the locking for us (and use
* the same locking in filter_setattr() for truncate. The
filter_start_transno(export);
*desc_private = fsfilt_brw_start(obd, objcount, fso,
niocount, nb);
- if (IS_ERR(*desc_private))
- GOTO(out_objinfo, rc = PTR_ERR(*desc_private));
+ if (IS_ERR(*desc_private)) {
+ rc = PTR_ERR(*desc_private);
+ CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+ "error starting transaction: rc = %d\n", rc);
+ *desc_private = NULL;
+ GOTO(out_objinfo, rc);
+ }
}
obd_kmap_get(niocount, 1);
else
lnb->dentry = dget(dentry);
+ /* lnb->offset is aligned, while rnb->offset isn't,
+ * and we need to copy the fields to lnb anyways.
+ */
+ memcpy(lnb, rnb, sizeof(*rnb));
if (cmd & OBD_BRW_WRITE) {
- page = filter_get_page_write(inode, rnb, lnb,
+ page = filter_get_page_write(inode, lnb,
&pglocked);
- XPROCFS_BUMP_MYCPU_IOSTAT (st_write_bytes,
- rnb->len);
+ XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes,
+ lnb->len);
} else {
- page = lustre_get_page_read(inode, rnb);
+ page = lustre_get_page_read(inode, lnb);
- XPROCFS_BUMP_MYCPU_IOSTAT (st_read_bytes,
- rnb->len);
+ XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes,
+ lnb->len);
}
if (IS_ERR(page)) {
rc = PTR_ERR(page);
+ CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+ "error on page @"LPU64"%u/%u: rc = %d\n",
+ lnb->offset, j, o->ioo_bufcnt, rc);
f_dput(dentry);
GOTO(out_pages, rc);
}
-
- lnb->addr = page_address(page);
- lnb->offset = rnb->offset;
- lnb->page = page;
- lnb->len = rnb->len;
}
}
out_pages:
while (lnb-- > res) {
- CERROR("%d error cleanup on brw\n", rc);
if (cmd & OBD_BRW_WRITE)
filter_commit_write(lnb, rc);
else
f_dput(lnb->dentry);
}
obd_kmap_put(niocount);
- goto out_err; /* dropped the dentry refs already (one per page) */
+ if (cmd & OBD_BRW_WRITE) {
+ filter_finish_transno(export, *desc_private, oti, rc);
+ fsfilt_commit(obd,
+ filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
+ *desc_private);
+ }
+ goto out; /* dropped the dentry refs already (one per page) */
out_objinfo:
for (i = 0; i < objcount && fso[i].fso_dentry; i++)
f_dput(fso[i].fso_dentry);
-out_err:
- if (cmd & OBD_BRW_WRITE) {
- filter_finish_transno(export, *desc_private, oti, rc);
- fsfilt_commit(obd, dir_dentry->d_inode, *desc_private);
- }
goto out;
}
RETURN(rc);
}
-static int filter_sync(struct obd_device *obd)
+static int filter_syncfs(struct lustre_handle *conn)
{
+ struct obd_device *obd;
+ ENTRY;
+
+ obd = class_conn2obd(conn);
+
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1);
+
RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
}
}
if (cmd & OBD_BRW_WRITE) {
+ /* We just want any dentry for the commit, for now */
+ struct dentry *dir_dentry = filter_parent(obd, S_IFREG, 0);
int err;
- struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
rc = filter_finish_transno(export, desc_private, oti, rc);
err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
rc = err;
if (obd_sync_filter) {
/* this can fail with ENOMEM, what should we do then? */
- filter_sync(obd);
+ filter_syncfs(conn);
}
/* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
}
RETURN(ret);
}
+static int filter_san_preprw(int cmd, struct lustre_handle *conn,
+ int objcount, struct obd_ioobj *obj,
+ int niocount, struct niobuf_remote *nb)
+{
+ struct obd_device *obd;
+ struct obd_ioobj *o = obj;
+ struct niobuf_remote *rnb = nb;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ if ((cmd & OBD_BRW_WRITE) != 0)
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
+ else
+ XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
+
+ obd = class_conn2obd(conn);
+ if (!obd) {
+ CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
+ RETURN(-EINVAL);
+ }
+
+ for (i = 0; i < objcount; i++, o++) {
+ struct dentry *dentry;
+ struct inode *inode;
+ int j;
+
+ dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
+ o->ioo_id),
+ o->ioo_id, 0);
+ if (IS_ERR(dentry))
+ GOTO(out, rc = PTR_ERR(dentry));
+ inode = dentry->d_inode;
+ if (!inode) {
+ CERROR("trying to BRW to non-existent file "LPU64"\n",
+ o->ioo_id);
+ f_dput(dentry);
+ GOTO(out, rc = -ENOENT);
+ }
+
+ for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
+ long block;
+
+ block = rnb->offset >> PAGE_SHIFT;
+
+ if (cmd == OBD_BRW_READ) {
+ block = inode->i_mapping->a_ops->bmap(
+ inode->i_mapping, block);
+ } else {
+ loff_t newsize = rnb->offset + rnb->len;
+ /* fs_prep_san_write will also update inode
+ * size for us:
+ * (1) new alloced block
+ * (2) existed block but size extented
+ */
+ /* FIXME We could call fs_prep_san_write()
+ * only once for all the blocks allocation.
+ * Now call it once for each block, for
+ * simplicity. And if error happens, we
+ * probably need to release previous alloced
+ * block */
+ rc = fs_prep_san_write(obd, inode, &block,
+ 1, newsize);
+ if (rc)
+ break;
+ }
+
+ rnb->offset = block;
+ }
+ f_dput(dentry);
+ }
+out:
+ RETURN(rc);
+}
+
static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
{
struct obd_device *obd;
o_connect: filter_connect,
o_disconnect: filter_disconnect,
o_statfs: filter_statfs,
+ o_syncfs: filter_syncfs,
o_getattr: filter_getattr,
o_create: filter_create,
o_setattr: filter_setattr,
o_preprw: filter_preprw,
o_commitrw: filter_commitrw
#if 0
+ o_san_preprw: filter_san_preprw,
+ o_preallocate: filter_preallocate_inodes,
+ o_migrate: filter_migrate,
+ o_copy: filter_copy_data,
+ o_iterate: filter_iterate
+#endif
+};
+
+static struct obd_ops filter_sanobd_ops = {
+ o_owner: THIS_MODULE,
+ o_attach: filter_attach,
+ o_detach: filter_detach,
+ o_get_info: filter_get_info,
+ o_setup: filter_san_setup,
+ o_cleanup: filter_cleanup,
+ o_connect: filter_connect,
+ o_disconnect: filter_disconnect,
+ o_statfs: filter_statfs,
+ o_getattr: filter_getattr,
+ o_create: filter_create,
+ o_setattr: filter_setattr,
+ o_destroy: filter_destroy,
+ o_open: filter_open,
+ o_close: filter_close,
+ o_brw: filter_brw,
+ o_punch: filter_truncate,
+ o_preprw: filter_preprw,
+ o_commitrw: filter_commitrw,
+ o_san_preprw: filter_san_preprw,
+#if 0
o_preallocate: filter_preallocate_inodes,
o_migrate: filter_migrate,
o_copy: filter_copy_data,
static int __init obdfilter_init(void)
{
struct lprocfs_static_vars lvars;
+ int rc;
printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
filter_open_cache = kmem_cache_create("ll_filter_fdata",
sizeof(struct filter_dentry_data),
0, 0, NULL, NULL);
if (!filter_dentry_cache) {
- kmem_cache_destroy(filter_open_cache);
- RETURN(-ENOMEM);
+ rc = -ENOMEM;
+ goto err1;
}
xprocfs_init ("filter");
lprocfs_init_vars(&lvars);
- return class_register_type(&filter_obd_ops, lvars.module_vars,
- OBD_FILTER_DEVICENAME);
+
+ rc = class_register_type(&filter_obd_ops, lvars.module_vars,
+ OBD_FILTER_DEVICENAME);
+ if (rc)
+ goto err2;
+
+ rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
+ OBD_FILTER_SAN_DEVICENAME);
+ if (rc)
+ goto err3;
+
+ return 0;
+err3:
+ class_unregister_type(OBD_FILTER_DEVICENAME);
+err2:
+ kmem_cache_destroy(filter_dentry_cache);
+err1:
+ kmem_cache_destroy(filter_open_cache);
+ return rc;
}
static void __exit obdfilter_exit(void)
{
+ class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
class_unregister_type(OBD_FILTER_DEVICENAME);
if (kmem_cache_destroy(filter_dentry_cache))
CERROR("couldn't free obdfilter dentry cache\n");