X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter.c;h=8486c22db43d461f410bbeb8ab6fe259f39cc68b;hp=591005e1ee27433333ea2bc7ab1c6fe6d6a5c9f5;hb=040033cef24c5aca2967daf2da7a862abcd074cf;hpb=5c0b2855f617d17e3c0749244edeca1b706f8a7e diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 591005e..8486c22 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -48,7 +48,10 @@ #include #include #include - +#include +#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) +#include +#endif static kmem_cache_t *filter_open_cache; static kmem_cache_t *filter_dentry_cache; @@ -64,6 +67,7 @@ struct xprocfs_io_stat { __u64 st_create_reqs; __u64 st_destroy_reqs; __u64 st_statfs_reqs; + __u64 st_syncfs_reqs; __u64 st_open_reqs; __u64 st_close_reqs; __u64 st_punch_reqs; @@ -77,6 +81,7 @@ do { \ xprocfs_iostats[smp_processor_id()].field += (count); \ } while (0) +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) #define DECLARE_XPROCFS_SUM_STAT(field) \ static long long \ xprocfs_sum_##field (void) \ @@ -98,9 +103,11 @@ DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs) DECLARE_XPROCFS_SUM_STAT (st_create_reqs) DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs) DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs) +DECLARE_XPROCFS_SUM_STAT (st_syncfs_reqs) DECLARE_XPROCFS_SUM_STAT (st_open_reqs) DECLARE_XPROCFS_SUM_STAT (st_close_reqs) DECLARE_XPROCFS_SUM_STAT (st_punch_reqs) +#endif static int xprocfs_rd_stat (char *page, char **start, off_t off, int count, @@ -148,6 +155,7 @@ xprocfs_init (char *name) return; } +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) xprocfs_add_stat ("read_bytes", xprocfs_sum_st_read_bytes); xprocfs_add_stat ("read_reqs", xprocfs_sum_st_read_reqs); xprocfs_add_stat ("write_bytes", xprocfs_sum_st_write_bytes); @@ -157,9 +165,11 @@ xprocfs_init (char *name) xprocfs_add_stat ("create_reqs", xprocfs_sum_st_create_reqs); xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs); xprocfs_add_stat ("statfs_reqs", xprocfs_sum_st_statfs_reqs); + xprocfs_add_stat ("syncfs_reqs", xprocfs_sum_st_syncfs_reqs); xprocfs_add_stat ("open_reqs", xprocfs_sum_st_open_reqs); xprocfs_add_stat ("close_reqs", xprocfs_sum_st_close_reqs); xprocfs_add_stat ("punch_reqs", xprocfs_sum_st_punch_reqs); +#endif } void xprocfs_fini (void) @@ -176,6 +186,7 @@ void xprocfs_fini (void) remove_proc_entry ("create_reqs", xprocfs_dir); remove_proc_entry ("destroy_reqs", xprocfs_dir); remove_proc_entry ("statfs_reqs", xprocfs_dir); + remove_proc_entry ("syncfs_reqs", xprocfs_dir); remove_proc_entry ("open_reqs", xprocfs_dir); remove_proc_entry ("close_reqs", xprocfs_dir); remove_proc_entry ("punch_reqs", xprocfs_dir); @@ -212,10 +223,12 @@ static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd, void filter_start_transno(struct obd_export *export) { +#ifdef FILTER_TRANSNO_SEM struct obd_device * obd = export->exp_obd; ENTRY; down(&obd->u.filter.fo_transno_sem); +#endif } /* Assumes caller has already pushed us into the kernel context. */ @@ -231,8 +244,16 @@ int filter_finish_transno(struct obd_export *export, void *handle, ssize_t written; /* Propagate error code. */ - if (rc) - GOTO(out, rc); + if (rc) { +#ifdef FILTER_TRANSNO_SEM + up(&filter->fo_transno_sem); +#endif + RETURN(rc); + } + + if (!(obd->obd_flags & OBD_REPLAYABLE)) { + RETURN(0); + } /* we don't allocate new transnos for replayed requests */ #if 0 @@ -241,13 +262,20 @@ int filter_finish_transno(struct obd_export *export, void *handle, GOTO(out, rc = 0); #endif - off = FILTER_LR_CLIENT_START + fed->fed_lr_off * FILTER_LR_CLIENT_SIZE; + off = fed->fed_lr_off; - last_rcvd = ++filter->fo_fsd->fsd_last_rcvd; +#ifndef FILTER_TRANSNO_SEM + spin_lock(&filter->fo_translock); +#endif + last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd); + filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1); +#ifndef FILTER_TRANSNO_SEM + spin_unlock(&filter->fo_translock); +#endif if (oti) oti->oti_transno = last_rcvd; fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd); - fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count); + fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count; /* get this from oti */ #if 0 @@ -261,27 +289,31 @@ int filter_finish_transno(struct obd_export *export, void *handle, written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd), &off); CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = " - LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_off, written); + LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written); +#ifdef FILTER_TRANSNO_SEM + up(&filter->fo_transno_sem); +#endif if (written == sizeof(*fcd)) - GOTO(out, rc = 0); - CERROR("error writing to last_rcvd file: rc = %d\n", rc); + RETURN(0); + CERROR("error writing to last_rcvd file: rc = %d\n", written); if (written >= 0) - GOTO(out, rc = -EIO); - - rc = 0; - - EXIT; - out: + RETURN(-EIO); - up(&filter->fo_transno_sem); - return rc; + RETURN(written); } /* write the pathname into the string */ -static int filter_id(char *buf, obd_id id, obd_mode mode) +static char *filter_id(char *buf, struct filter_obd *filter, obd_id id, + obd_mode mode) { - return sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id); + if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0) + sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id); + else + sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode), + (int)id & (filter->fo_subdir_count - 1), id); + + return buf; } static inline void f_dput(struct dentry *dentry) @@ -312,56 +344,60 @@ struct dentry_operations filter_dops = { #define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8) #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long)) -static unsigned long filter_last_rcvd_slots[FILTER_LR_MAX_CLIENT_WORDS]; - /* Add client data to the FILTER. We use a bitmap to locate a free space - * in the last_rcvd file if cl_off is -1 (i.e. a new client). + * in the last_rcvd file if cl_idx is -1 (i.e. a new client). * Otherwise, we have just read the data from the last_rcvd file and * we know its offset. */ int filter_client_add(struct filter_obd *filter, - struct filter_export_data *fed, int cl_off) + struct filter_export_data *fed, int cl_idx) { - int new_client = (cl_off == -1); + int new_client = (cl_idx == -1); + + LASSERT(filter->fo_last_rcvd_slots != NULL); - /* the bitmap operations can handle cl_off > sizeof(long) * 8, so + /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so * there's no need for extra complication here */ if (new_client) { - cl_off = find_first_zero_bit(filter_last_rcvd_slots, + cl_idx = find_first_zero_bit(filter->fo_last_rcvd_slots, FILTER_LR_MAX_CLIENTS); repeat: - if (cl_off >= FILTER_LR_MAX_CLIENTS) { + if (cl_idx >= FILTER_LR_MAX_CLIENTS) { CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n"); return -ENOMEM; } - if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) { + if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) { CERROR("FILTER client %d: found bit is set in bitmap\n", - cl_off); - cl_off = find_next_zero_bit(filter_last_rcvd_slots, + cl_idx); + cl_idx = find_next_zero_bit(filter->fo_last_rcvd_slots, FILTER_LR_MAX_CLIENTS, - cl_off); + cl_idx); goto repeat; } } else { - if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) { + if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) { CERROR("FILTER client %d: bit already set in bitmap!\n", - cl_off); + cl_idx); LBUG(); } } - CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n", - cl_off, fed->fed_fcd->fcd_uuid); + fed->fed_lr_idx = cl_idx; + fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) + + cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size); - fed->fed_lr_off = cl_off; + CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n", + fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid); if (new_client) { struct obd_run_ctxt saved; - loff_t off = FILTER_LR_CLIENT_START + - (cl_off * FILTER_LR_CLIENT_SIZE); + loff_t off = fed->fed_lr_off; ssize_t written; + CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n", + fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd)); + push_ctxt(&saved, &filter->fo_ctxt, NULL); written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fed->fed_fcd, @@ -373,9 +409,6 @@ int filter_client_add(struct filter_obd *filter, RETURN(written); RETURN(-EIO); } - CDEBUG(D_INFO, "wrote client fcd at off %u (len %u)\n", - FILTER_LR_CLIENT_START + (cl_off*FILTER_LR_CLIENT_SIZE), - (unsigned int)sizeof(*fed->fed_fcd)); } return 0; } @@ -392,14 +425,16 @@ int filter_client_free(struct obd_export *exp) if (!fed->fed_fcd) RETURN(0); - off = FILTER_LR_CLIENT_START + (fed->fed_lr_off*FILTER_LR_CLIENT_SIZE); + LASSERT(filter->fo_last_rcvd_slots != NULL); + + off = fed->fed_lr_off; - CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n", - fed->fed_lr_off, off, fed->fed_fcd->fcd_uuid); + CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n", + fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid); - if (!test_and_clear_bit(fed->fed_lr_off, filter_last_rcvd_slots)) { + if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) { CERROR("FILTER client %u: bit already clear in bitmap!!\n", - fed->fed_lr_off); + fed->fed_lr_idx); LBUG(); } @@ -409,21 +444,17 @@ int filter_client_free(struct obd_export *exp) sizeof(zero_fcd), &off); /* XXX: this write gets lost sometimes, unless this sync is here. */ -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev); -#else - file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1); -#endif + file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1); pop_ctxt(&saved, &filter->fo_ctxt, NULL); if (written != sizeof(zero_fcd)) { - CERROR("error zeroing out client %s off %d in %s: %d\n", - fed->fed_fcd->fcd_uuid, fed->fed_lr_off, LAST_RCVD, - written); + CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n", + fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off, + LAST_RCVD, written); } else { CDEBUG(D_INFO, - "zeroed disconnecting client %s at off %d ("LPX64")\n", - fed->fed_fcd->fcd_uuid, fed->fed_lr_off, off); + "zeroed disconnecting client %s at idx %u (%llu)\n", + fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off); } OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd)); @@ -431,49 +462,34 @@ int filter_client_free(struct obd_export *exp) return 0; } -static void filter_unpack_fsd(struct filter_server_data *fsd) -{ - fsd->fsd_last_objid = le64_to_cpu(fsd->fsd_last_objid); - fsd->fsd_last_rcvd = le64_to_cpu(fsd->fsd_last_rcvd); - fsd->fsd_mount_count = le64_to_cpu(fsd->fsd_mount_count); -} - -static void filter_pack_fsd(struct filter_server_data *disk_fsd, - struct filter_server_data *fsd) -{ - memset(disk_fsd, 0, sizeof(*disk_fsd)); - memcpy(disk_fsd->fsd_uuid, fsd->fsd_uuid, sizeof(fsd->fsd_uuid)); - disk_fsd->fsd_last_objid = cpu_to_le64(fsd->fsd_last_objid); - disk_fsd->fsd_last_rcvd = cpu_to_le64(fsd->fsd_last_rcvd); - disk_fsd->fsd_mount_count = cpu_to_le64(fsd->fsd_mount_count); -} - static int filter_free_server_data(struct filter_obd *filter) { OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd)); filter->fo_fsd = NULL; - + OBD_FREE(filter->fo_last_rcvd_slots, + FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long)); + filter->fo_last_rcvd_slots = NULL; return 0; } -/* assumes caller has already in kernel ctxt */ +/* assumes caller is already in kernel ctxt */ static int filter_update_server_data(struct file *filp, struct filter_server_data *fsd) { - struct filter_server_data disk_fsd; loff_t off = 0; int rc; CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid); - CDEBUG(D_INODE, "server last_objid: "LPU64"\n", fsd->fsd_last_objid); - CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n", fsd->fsd_last_rcvd); - CDEBUG(D_INODE, "server last_mount: "LPU64"\n", fsd->fsd_mount_count); - - filter_pack_fsd(&disk_fsd, fsd); - rc = lustre_fwrite(filp, (char *)&disk_fsd, - sizeof(disk_fsd), &off); - if (rc != sizeof(disk_fsd)) { + CDEBUG(D_INODE, "server last_objid: "LPU64"\n", + le64_to_cpu(fsd->fsd_last_objid)); + CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n", + le64_to_cpu(fsd->fsd_last_rcvd)); + CDEBUG(D_INODE, "server last_mount: "LPU64"\n", + le64_to_cpu(fsd->fsd_mount_count)); + + rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off); + if (rc != sizeof(*fsd)) { CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n", rc); RETURN(-EIO); @@ -482,8 +498,7 @@ static int filter_update_server_data(struct file *filp, } /* assumes caller has already in kernel ctxt */ -static int filter_init_server_data(struct obd_device *obd, - struct file * filp, +static int filter_init_server_data(struct obd_device *obd, struct file * filp, __u64 init_lastobjid) { struct filter_obd *filter = &obd->u.filter; @@ -491,7 +506,8 @@ static int filter_init_server_data(struct obd_device *obd, struct filter_client_data *fcd = NULL; struct inode *inode = filp->f_dentry->d_inode; unsigned long last_rcvd_size = inode->i_size; - int cl_off; + __u64 mount_count; + int cl_idx; loff_t off = 0; int rc; @@ -506,125 +522,161 @@ static int filter_init_server_data(struct obd_device *obd, RETURN(-ENOMEM); filter->fo_fsd = fsd; + OBD_ALLOC(filter->fo_last_rcvd_slots, + FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long)); + if (filter->fo_last_rcvd_slots == NULL) { + OBD_FREE(fsd, sizeof(*fsd)); + RETURN(-ENOMEM); + } + if (last_rcvd_size == 0) { CERROR("%s: initializing new last_rcvd\n", obd->obd_name); memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid)); - fsd->fsd_last_objid = init_lastobjid; + fsd->fsd_last_objid = cpu_to_le64(init_lastobjid); fsd->fsd_last_rcvd = 0; - fsd->fsd_mount_count = 0; - + mount_count = fsd->fsd_mount_count = 0; + fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE); + fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START); + fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE); + fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT); + filter->fo_subdir_count = FILTER_SUBDIR_COUNT; } else { - ssize_t retval = lustre_fread(filp, (char *)fsd, - sizeof(*fsd), + ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd), &off); if (retval != sizeof(*fsd)) { CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n"); GOTO(out, rc = -EIO); } - filter_unpack_fsd(fsd); + mount_count = le64_to_cpu(fsd->fsd_mount_count); + filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count); + } + + if (fsd->fsd_feature_incompat) { + CERROR("unsupported feature %x\n", + le32_to_cpu(fsd->fsd_feature_incompat)); + RETURN(-EINVAL); + } + if (fsd->fsd_feature_rocompat) { + CERROR("read-only feature %x\n", + le32_to_cpu(fsd->fsd_feature_rocompat)); + /* Do something like remount filesystem read-only */ + RETURN(-EINVAL); } CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n", - obd->obd_name, fsd->fsd_last_objid); + obd->obd_name, le64_to_cpu(fsd->fsd_last_objid)); CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n", - obd->obd_name, fsd->fsd_last_rcvd); + obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd)); CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n", - obd->obd_name, fsd->fsd_mount_count); + obd->obd_name, mount_count); + CDEBUG(D_INODE, "%s: server data size: %u\n", + obd->obd_name, le32_to_cpu(fsd->fsd_server_size)); + CDEBUG(D_INODE, "%s: per-client data start: %u\n", + obd->obd_name, le32_to_cpu(fsd->fsd_client_start)); + CDEBUG(D_INODE, "%s: per-client data size: %u\n", + obd->obd_name, le32_to_cpu(fsd->fsd_client_size)); + CDEBUG(D_INODE, "%s: server subdir_count: %u\n", + obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count)); /* * When we do a clean FILTER shutdown, we save the last_rcvd into * the header. If we find clients with higher last_rcvd values * then those clients may need recovery done. */ - /* off is adjusted by lustre_fread, so we don't adjust it in the loop */ - for (off = FILTER_LR_CLIENT_START, cl_off = 0; off < last_rcvd_size; - cl_off++) { - __u64 last_rcvd; - int mount_age; - - if (!fcd) { - OBD_ALLOC(fcd, sizeof(*fcd)); - if (!fcd) - GOTO(err_fsd, rc = -ENOMEM); - } - - rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off); - if (rc != sizeof(*fcd)) { - CERROR("error reading FILTER %s offset %d: rc = %d\n", - LAST_RCVD, cl_off, rc); - if (rc > 0) /* XXX fatal error or just abort reading? */ - rc = -EIO; - break; - } - - if (fcd->fcd_uuid[0] == '\0') { - CDEBUG(D_INFO, "skipping zeroed client at offset %d\n", - cl_off); - continue; - } + if (obd->obd_flags & OBD_REPLAYABLE) { + for (cl_idx = 0; off < last_rcvd_size; cl_idx++) { + __u64 last_rcvd; + int mount_age; + + if (!fcd) { + OBD_ALLOC(fcd, sizeof(*fcd)); + if (!fcd) + GOTO(err_fsd, rc = -ENOMEM); + } - last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd); + /* Don't assume off is incremented properly, in case + * sizeof(fsd) isn't the same as fsd->fsd_client_size. + */ + off = le32_to_cpu(fsd->fsd_client_start) + + cl_idx * le16_to_cpu(fsd->fsd_client_size); + rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off); + if (rc != sizeof(*fcd)) { + CERROR("error reading FILTER %s offset %d: rc = %d\n", + LAST_RCVD, cl_idx, rc); + if (rc > 0) /* XXX fatal error or just abort reading? */ + rc = -EIO; + break; + } - /* These exports are cleaned up by filter_disconnect(), so they - * need to be set up like real exports as filter_connect() does. - */ - mount_age = fsd->fsd_mount_count - - le64_to_cpu(fcd->fcd_mount_count); - if (mount_age < FILTER_MOUNT_RECOV) { - CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64 - "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64 - "\n", fcd->fcd_uuid, cl_off, - last_rcvd, fsd->fsd_last_rcvd, - le64_to_cpu(fcd->fcd_mount_count), - fsd->fsd_mount_count); -#if 0 - /* disabled until OST recovery is actually working */ - struct obd_export *exp = class_new_export(obd); - struct filter_export_data *fed; + if (fcd->fcd_uuid[0] == '\0') { + CDEBUG(D_INFO, "skipping zeroed client at offset %d\n", + cl_idx); + continue; + } - if (!exp) { - rc = -ENOMEM; - break; + last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd); + + /* These exports are cleaned up by filter_disconnect(), so they + * need to be set up like real exports as filter_connect() does. + */ + mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count); + if (mount_age < FILTER_MOUNT_RECOV) { + struct obd_export *exp = class_new_export(obd); + struct filter_export_data *fed; + CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 + " srv lr: "LPU64" mnt: "LPU64" last mount: " + LPU64"\n", fcd->fcd_uuid, cl_idx, + last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd), + le64_to_cpu(fcd->fcd_mount_count), mount_count); + /* disabled until OST recovery is actually working */ + + if (!exp) { + rc = -ENOMEM; + break; + } + memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid, + sizeof exp->exp_client_uuid.uuid); + fed = &exp->exp_filter_data; + fed->fed_fcd = fcd; + filter_client_add(filter, fed, cl_idx); + /* create helper if export init gets more complex */ + INIT_LIST_HEAD(&fed->fed_open_head); + spin_lock_init(&fed->fed_lock); + + fcd = NULL; + obd->obd_recoverable_clients++; + } else { + CDEBUG(D_INFO, + "discarded client %d UUID '%s' count "LPU64"\n", + cl_idx, fcd->fcd_uuid, + le64_to_cpu(fcd->fcd_mount_count)); } - fed = &exp->exp_filter_data; - fed->fed_fcd = fcd; - filter_client_add(filter, fed, cl_off); - /* create helper if export init gets more complex */ - INIT_LIST_HEAD(&fed->fed_open_head); - spin_lock_init(&fed->fed_lock); + CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n", + cl_idx, last_rcvd); - fcd = NULL; - filter->fo_recoverable_clients++; -#endif - } else { - CDEBUG(D_INFO, - "discarded client %d, UUID '%s', count %Ld\n", - cl_off, fcd->fcd_uuid, - (long long)le64_to_cpu(fcd->fcd_mount_count)); + if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd)) + filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd); } - CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n", - cl_off, (unsigned long long)last_rcvd); + obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd); + if (obd->obd_recoverable_clients) { + CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n", + obd->obd_recoverable_clients, + le64_to_cpu(filter->fo_fsd->fsd_last_rcvd)); + obd->obd_next_recovery_transno = obd->obd_last_committed + 1; + obd->obd_flags |= OBD_RECOVERING; + } - if (last_rcvd > filter->fo_fsd->fsd_last_rcvd) - filter->fo_fsd->fsd_last_rcvd = last_rcvd; - } + if (fcd) + OBD_FREE(fcd, sizeof(*fcd)); - obd->obd_last_committed = filter->fo_fsd->fsd_last_rcvd; - if (filter->fo_recoverable_clients) { - CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n", - filter->fo_recoverable_clients, - filter->fo_fsd->fsd_last_rcvd); - filter->fo_next_recovery_transno = obd->obd_last_committed + 1; - obd->obd_flags |= OBD_RECOVERING; + } else { + CERROR("%s: recovery support OFF\n", obd->obd_name); } - if (fcd) - OBD_FREE(fcd, sizeof(*fcd)); - - fsd->fsd_mount_count++; + fsd->fsd_mount_count = cpu_to_le64(mount_count + 1); /* save it,so mount count and last_recvd is current */ rc = filter_update_server_data(filp, filter->fo_fsd); @@ -642,9 +694,10 @@ static int filter_prep(struct obd_device *obd) { struct obd_run_ctxt saved; struct filter_obd *filter = &obd->u.filter; - struct dentry *dentry; + struct dentry *dentry, *O_dentry; struct file *file; struct inode *inode; + int i; int rc = 0; int mode = 0; @@ -662,6 +715,7 @@ static int filter_prep(struct obd_device *obd) * Create directories and/or get dentries for each object type. * This saves us from having to do multiple lookups for each one. */ + O_dentry = filter->fo_dentry_O; for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) { char *name = obd_type_by_mode[mode]; @@ -669,22 +723,22 @@ static int filter_prep(struct obd_device *obd) filter->fo_dentry_O_mode[mode] = NULL; continue; } - dentry = simple_mkdir(filter->fo_dentry_O, name, 0700); + dentry = simple_mkdir(O_dentry, name, 0700); CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry); if (IS_ERR(dentry)) { rc = PTR_ERR(dentry); CERROR("cannot create O/%s: rc = %d\n", name, rc); - GOTO(out_O_mode, rc); + GOTO(err_O_mode, rc); } filter->fo_dentry_O_mode[mode] = dentry; } file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700); - if ( !file || IS_ERR(file) ) { + if (!file || IS_ERR(file)) { rc = PTR_ERR(file); CERROR("OBD filter: cannot open/create %s: rc = %d\n", LAST_RCVD, rc); - GOTO(out_O_mode, rc); + GOTO(err_O_mode, rc); } if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { @@ -711,19 +765,50 @@ static int filter_prep(struct obd_device *obd) } filter->fo_rcvd_filp = file; + if (filter->fo_subdir_count) { + O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT]; + OBD_ALLOC(filter->fo_dentry_O_sub, + FILTER_SUBDIR_COUNT * sizeof(dentry)); + if (!filter->fo_dentry_O_sub) + GOTO(err_client, rc = -ENOMEM); + + for (i = 0; i < filter->fo_subdir_count; i++) { + char dir[20]; + snprintf(dir, sizeof(dir), "d%u", i); + + dentry = simple_mkdir(O_dentry, dir, 0700); + CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("can't create O/R/%s: rc = %d\n",dir,rc); + GOTO(err_O_sub, rc); + } + filter->fo_dentry_O_sub[i] = dentry; + } + } rc = 0; out: pop_ctxt(&saved, &filter->fo_ctxt, NULL); return(rc); +err_O_sub: + while (i-- > 0) { + struct dentry *dentry = filter->fo_dentry_O_sub[i]; + if (dentry) { + f_dput(dentry); + filter->fo_dentry_O_sub[i] = NULL; + } + } + OBD_FREE(filter->fo_dentry_O_sub, + filter->fo_subdir_count * sizeof(dentry)); err_client: class_disconnect_all(obd); err_filp: if (filp_close(file, 0)) CERROR("can't close %s after error\n", LAST_RCVD); filter->fo_rcvd_filp = NULL; - out_O_mode: +err_O_mode: while (mode-- > 0) { struct dentry *dentry = filter->fo_dentry_O_mode[mode]; if (dentry) { @@ -752,23 +837,28 @@ static void filter_post(struct obd_device *obd) rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd); if (rc) CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc); - filter_free_server_data(filter); if (filter->fo_rcvd_filp) { - /* broken sync at umount bug workaround */ -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - rc = fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev); -#else rc = file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1); -#endif filp_close(filter->fo_rcvd_filp, 0); filter->fo_rcvd_filp = NULL; if (rc) - CERROR("last_rcvd file won't closek rc = %ld\n", rc); + CERROR("last_rcvd file won't closed rc = %ld\n", rc); } + if (filter->fo_subdir_count) { + int i; + for (i = 0; i < filter->fo_subdir_count; i++) { + struct dentry *dentry = filter->fo_dentry_O_sub[i]; + f_dput(dentry); + filter->fo_dentry_O_sub[i] = NULL; + } + OBD_FREE(filter->fo_dentry_O_sub, + filter->fo_subdir_count * + sizeof(*filter->fo_dentry_O_sub)); + } for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) { struct dentry *dentry = filter->fo_dentry_O_mode[mode]; if (dentry) { @@ -777,6 +867,7 @@ static void filter_post(struct obd_device *obd) } } f_dput(filter->fo_dentry_O); + filter_free_server_data(filter); pop_ctxt(&saved, &filter->fo_ctxt, NULL); } @@ -787,7 +878,8 @@ static __u64 filter_next_id(struct obd_device *obd) LASSERT(obd->u.filter.fo_fsd != NULL); spin_lock(&obd->u.filter.fo_objidlock); - id = ++obd->u.filter.fo_fsd->fsd_last_objid; + id = le64_to_cpu(obd->u.filter.fo_fsd->fsd_last_objid); + obd->u.filter.fo_fsd->fsd_last_objid = cpu_to_le64(id + 1); spin_unlock(&obd->u.filter.fo_objidlock); return id; @@ -839,12 +931,15 @@ static struct dentry *filter_fid2dentry(struct obd_device *obd, } static inline struct dentry *filter_parent(struct obd_device *obd, - obd_mode mode) + obd_mode mode, obd_id objid) { struct filter_obd *filter = &obd->u.filter; LASSERT((mode & S_IFMT) == S_IFREG); /* only regular files for now */ - return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT]; + if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0) + return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT]; + + return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)]; } static struct file *filter_obj_open(struct obd_export *export, @@ -890,9 +985,9 @@ static struct file *filter_obj_open(struct obd_export *export, GOTO(out_ffd, file = ERR_PTR(-ENOMEM)); } - filter_id(name, id, type); push_ctxt(&saved, &filter->fo_ctxt, NULL); - file = filp_open(name, O_RDWR | O_LARGEFILE, 0 /* type? */); + file = filp_open(filter_id(name, filter, id, type), + O_RDWR | O_LARGEFILE, type); pop_ctxt(&saved, &filter->fo_ctxt, NULL); if (IS_ERR(file)) { @@ -909,11 +1004,12 @@ static struct file *filter_obj_open(struct obd_export *export, LASSERT(kmem_cache_validate(filter_dentry_cache, fdd)); /* should only happen during client recovery */ if (fdd->fdd_flags & FILTER_FLAG_DESTROY) - CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id); + CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id); atomic_inc(&fdd->fdd_open_count); } else { atomic_set(&fdd->fdd_open_count, 1); fdd->fdd_flags = 0; + fdd->fdd_objid = id; /* If this is racy, then we can use {cmp}xchg and atomic_add */ dentry->d_fsdata = fdd; spin_unlock(&filter->fo_fddlock); @@ -921,6 +1017,7 @@ static struct file *filter_obj_open(struct obd_export *export, get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie)); ffd->ffd_file = file; + LASSERT(file->private_data == NULL); file->private_data = ffd; if (!dentry->d_op) @@ -932,7 +1029,7 @@ static struct file *filter_obj_open(struct obd_export *export, list_add(&ffd->ffd_export_list, &fed->fed_open_head); spin_unlock(&fed->fed_lock); - CDEBUG(D_INODE, "opened objid "LPX64": rc = %p\n", id, file); + CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file); EXIT; out: return file; @@ -991,7 +1088,7 @@ static int filter_close_internal(struct obd_export *export, if (atomic_dec_and_test(&fdd->fdd_open_count) && fdd->fdd_flags & FILTER_FLAG_DESTROY) { - struct dentry *dir_dentry = filter_parent(obd, S_IFREG); + struct dentry *dir_dentry = filter_parent(obd, S_IFREG, fdd->fdd_objid); struct obd_run_ctxt saved; void *handle; @@ -1029,7 +1126,8 @@ static int filter_close_internal(struct obd_export *export, /* obd methods */ /* mount the file system (secretly) */ -static int filter_setup(struct obd_device *obd, obd_count len, void *buf) +static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, + char *option) { struct obd_ioctl_data* data = buf; struct filter_obd *filter; @@ -1038,21 +1136,25 @@ static int filter_setup(struct obd_device *obd, obd_count len, void *buf) ENTRY; if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2) - RETURN(rc = -EINVAL); + RETURN(-EINVAL); obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2); if (IS_ERR(obd->obd_fsops)) - RETURN(rc = PTR_ERR(obd->obd_fsops)); + RETURN(PTR_ERR(obd->obd_fsops)); - mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); + mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option); rc = PTR_ERR(mnt); - if (IS_ERR(mnt)) + if (IS_ERR(mnt)) { + CERROR("mount of %s as type %s failed: rc %d\n", + data->ioc_inlbuf2, data->ioc_inlbuf1, rc); GOTO(err_ops, rc); + } +#if OST_RECOVERY obd->obd_flags |= OBD_REPLAYABLE; +#endif filter = &obd->u.filter;; - init_MUTEX(&filter->fo_transno_sem); filter->fo_vfsmnt = mnt; filter->fo_fstype = strdup(data->ioc_inlbuf2); filter->fo_sb = mnt->mnt_root->d_inode->i_sb; @@ -1067,6 +1169,11 @@ static int filter_setup(struct obd_device *obd, obd_count len, void *buf) if (rc) GOTO(err_kfree, rc); +#ifdef FILTER_TRANSNO_SEM + init_MUTEX(&filter->fo_transno_sem); +#else + spin_lock_init(&filter->fo_translock); +#endif spin_lock_init(&filter->fo_fddlock); spin_lock_init(&filter->fo_objidlock); INIT_LIST_HEAD(&filter->fo_export_list); @@ -1094,6 +1201,29 @@ err_ops: return rc; } +static int filter_setup(struct obd_device *obd, obd_count len, void *buf) +{ + return filter_common_setup(obd, len, buf, NULL); +} + +/* sanobd setup methods - use a specific mount option */ +static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf) +{ + struct obd_ioctl_data* data = buf; + char *option = NULL; + + if (!data->ioc_inlbuf2) + RETURN(-EINVAL); + + /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */ + if (!strcmp(data->ioc_inlbuf2, "extN") || + !strcmp(data->ioc_inlbuf2, "ext3")) + option = "data=writeback"; + else + LBUG(); /* just a reminder */ + + return filter_common_setup(obd, len, buf, option); +} static int filter_cleanup(struct obd_device *obd) { @@ -1178,9 +1308,11 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd, INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head); spin_lock_init(&exp->exp_filter_data.fed_lock); - rc = filter_client_add(filter, fed, -1); - if (rc) - GOTO(out_fcd, rc); + if (obd->obd_flags & OBD_REPLAYABLE) { + rc = filter_client_add(filter, fed, -1); + if (rc) + GOTO(out_fcd, rc); + } RETURN(rc); @@ -1222,7 +1354,9 @@ static int filter_disconnect(struct lustre_handle *conn) spin_unlock(&fed->fed_lock); ldlm_cancel_locks_for_export(exp); - filter_client_free(exp); + + if (exp->exp_obd->obd_flags & OBD_REPLAYABLE) + filter_client_free(exp); rc = class_disconnect(conn); @@ -1235,7 +1369,7 @@ static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid) int type = oa->o_mode & S_IFMT; ENTRY; - CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPX64" valid 0x%08x\n", + CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n", inode->i_ino, inode, oa->o_id, valid); /* Don't copy the inode number in place of the object ID */ obdo_from_inode(oa, inode, valid); @@ -1289,17 +1423,18 @@ static struct dentry *__filter_oa2dentry(struct lustre_handle *conn, CERROR("invalid client "LPX64"\n", conn->addr); RETURN(ERR_PTR(-EINVAL)); } - dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode), + dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode, + oa->o_id), oa->o_id, locked); } if (IS_ERR(dentry)) { - CERROR("%s error looking up object: "LPX64"\n", what, oa->o_id); + CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id); RETURN(dentry); } if (!dentry->d_inode) { - CERROR("%s on non-existent object: "LPX64"\n", what, oa->o_id); + CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id); f_dput(dentry); LBUG(); RETURN(ERR_PTR(-ENOENT)); @@ -1446,7 +1581,7 @@ static int filter_close(struct lustre_handle *conn, struct obdo *oa, XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1); if (!(oa->o_valid & OBD_MD_FLHANDLE)) { - CERROR("no handle for close of objid "LPX64"\n", oa->o_id); + CERROR("no handle for close of objid "LPU64"\n", oa->o_id); RETURN(-EINVAL); } @@ -1492,17 +1627,18 @@ static int filter_create(struct lustre_handle *conn, struct obdo *oa, oa->o_id = filter_next_id(obd); push_ctxt(&saved, &filter->fo_ctxt, NULL); - dir_dentry = filter_parent(obd, oa->o_mode); + dir_dentry = filter_parent(obd, S_IFREG, oa->o_id); down(&dir_dentry->d_inode->i_sem); new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0); if (IS_ERR(new)) GOTO(out, rc = PTR_ERR(new)); if (new->d_inode) { + char buf[32]; + /* This would only happen if lastobjid was bad on disk */ - CERROR("objid O/%*s/"LPU64" already exists\n", - dir_dentry->d_name.len, dir_dentry->d_name.name, - oa->o_id); + CERROR("objid %s already exists\n", + filter_id(buf, filter, S_IFREG, oa->o_id)); LBUG(); GOTO(out, rc = -EEXIST); } @@ -1568,9 +1704,9 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa, XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1); - CDEBUG(D_INODE, "destroying objid "LPX64"\n", oa->o_id); + CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id); - dir_dentry = filter_parent(obd, oa->o_mode); + dir_dentry = filter_parent(obd, oa->o_mode, oa->o_id); down(&dir_dentry->d_inode->i_sem); object_dentry = filter_oa2dentry(conn, oa, 0); @@ -1591,11 +1727,11 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa, fdd->fdd_flags |= FILTER_FLAG_DESTROY; /* XXX put into PENDING directory in case of crash */ CDEBUG(D_INODE, - "defer destroy of %dx open objid "LPX64"\n", + "defer destroy of %dx open objid "LPU64"\n", atomic_read(&fdd->fdd_open_count), oa->o_id); } else CDEBUG(D_INODE, - "repeat destroy of %dx open objid "LPX64"\n", + "repeat destroy of %dx open objid "LPU64"\n", atomic_read(&fdd->fdd_open_count), oa->o_id); GOTO(out_commit, rc = 0); } @@ -1635,7 +1771,7 @@ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa, if (end != OBD_OBJECT_EOF) CERROR("PUNCH not supported, only truncate works\n"); - CDEBUG(D_INODE, "calling truncate for object "LPX64", valid = %x, " + CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, " "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start); oa->o_size = start; error = filter_setattr(conn, oa, NULL, oti); @@ -1650,9 +1786,9 @@ static inline void lustre_put_page(struct page *page) static struct page * -lustre_get_page_read(struct inode *inode, struct niobuf_remote *rnb) +lustre_get_page_read(struct inode *inode, struct niobuf_local *lnb) { - unsigned long index = rnb->offset >> PAGE_SHIFT; + unsigned long index = lnb->offset >> PAGE_SHIFT; struct address_space *mapping = inode->i_mapping; struct page *page; int rc; @@ -1661,7 +1797,8 @@ lustre_get_page_read(struct inode *inode, struct niobuf_remote *rnb) (filler_t*)mapping->a_ops->readpage, NULL); if (!IS_ERR(page)) { wait_on_page(page); - kmap(page); + lnb->addr = kmap(page); + lnb->page = page; if (!PageUptodate(page)) { CERROR("page index %lu not uptodate\n", index); GOTO(err_page, rc = -EIO); @@ -1760,12 +1897,10 @@ static int lustre_commit_write(struct niobuf_local *lnb) } struct page *filter_get_page_write(struct inode *inode, - struct niobuf_remote *rnb, struct niobuf_local *lnb, int *pglocked) { - unsigned long index = rnb->offset >> PAGE_SHIFT; + unsigned long index = lnb->offset >> PAGE_SHIFT; struct address_space *mapping = inode->i_mapping; - struct page *page; int rc; @@ -1791,26 +1926,25 @@ struct page *filter_get_page_write(struct inode *inode, addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */ if (!addr) { CERROR("no memory for a temp page\n"); - LBUG(); GOTO(err, rc = -ENOMEM); } - /* XXX debugging */ - memset((void *)addr, 0xBA, PAGE_SIZE); + POISON((void *)addr, 0xBA, PAGE_SIZE); page = virt_to_page(addr); kmap(page); page->index = index; + lnb->addr = (void *)addr; + lnb->page = page; lnb->flags |= N_LOCAL_TEMP_PAGE; } else if (!IS_ERR(page)) { (*pglocked)++; kmap(page); rc = mapping->a_ops->prepare_write(NULL, page, - rnb->offset % PAGE_SIZE, - rnb->len); + lnb->offset & ~PAGE_MASK, + lnb->len); if (rc) { - CERROR("page index %lu, rc = %d\n", index, rc); if (rc != -ENOSPC) - LBUG(); + CERROR("page index %lu, rc = %d\n", index, rc); GOTO(err_unlock, rc); } /* XXX not sure if we need this if we are overwriting page */ @@ -1819,7 +1953,10 @@ struct page *filter_get_page_write(struct inode *inode, LBUG(); GOTO(err_unlock, rc = -EIO); } + lnb->addr = page_address(page); + lnb->page = page; } + return page; err_unlock: @@ -1849,7 +1986,8 @@ static int filter_commit_write(struct niobuf_local *lnb, int err) unsigned blocksize = head->b_size; /* debugging: just seeing if this ever happens */ - CERROR("called filter_commit_write for ino %lu:%lu on err %d\n", + CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR, + "called for ino %lu:%lu on err %d\n", lnb->page->mapping->host->i_ino, lnb->page->index, err); /* Currently one buffer per page, but in the future... */ @@ -1876,7 +2014,6 @@ static int filter_preprw(int cmd, struct lustre_handle *conn, struct obd_ioobj *o; struct niobuf_remote *rnb = nb; struct niobuf_local *lnb = res; - struct dentry *dir_dentry; struct fsfilt_objinfo *fso; int pglocked = 0; int rc = 0; @@ -1904,7 +2041,6 @@ static int filter_preprw(int cmd, struct lustre_handle *conn, RETURN(-ENOMEM); push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - dir_dentry = filter_parent(obd, S_IFREG); for (i = 0, o = obj; i < objcount; i++, o++) { struct filter_dentry_data *fdd; @@ -1912,7 +2048,9 @@ static int filter_preprw(int cmd, struct lustre_handle *conn, LASSERT(o->ioo_bufcnt); - dentry = filter_fid2dentry(obd, dir_dentry, o->ioo_id, 0); + dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG, + o->ioo_id), + o->ioo_id, 0); if (IS_ERR(dentry)) GOTO(out_objinfo, rc = PTR_ERR(dentry)); @@ -1923,17 +2061,18 @@ static int filter_preprw(int cmd, struct lustre_handle *conn, if (!dentry->d_inode) { CERROR("trying to BRW to non-existent file "LPU64"\n", o->ioo_id); + f_dput(dentry); GOTO(out_objinfo, rc = -ENOENT); } fdd = dentry->d_fsdata; if (!fdd || !atomic_read(&fdd->fdd_open_count)) - CDEBUG(D_PAGE, "I/O to unopened object "LPX64"\n", + CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n", o->ioo_id); } if (cmd & OBD_BRW_WRITE) { -#warning "FIXME: we need to get inode->i_sem for each object here" +#warning "FIXME: we need inode->i_sem for each object to protect vs truncate" /* Even worse, we need to get locks on mulitple inodes (in * order) or use the DLM to do the locking for us (and use * the same locking in filter_setattr() for truncate. The @@ -1951,8 +2090,13 @@ static int filter_preprw(int cmd, struct lustre_handle *conn, filter_start_transno(export); *desc_private = fsfilt_brw_start(obd, objcount, fso, niocount, nb); - if (IS_ERR(*desc_private)) - GOTO(out_objinfo, rc = PTR_ERR(*desc_private)); + if (IS_ERR(*desc_private)) { + rc = PTR_ERR(*desc_private); + CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, + "error starting transaction: rc = %d\n", rc); + *desc_private = NULL; + GOTO(out_objinfo, rc); + } } obd_kmap_get(niocount, 1); @@ -1973,29 +2117,31 @@ static int filter_preprw(int cmd, struct lustre_handle *conn, else lnb->dentry = dget(dentry); + /* lnb->offset is aligned, while rnb->offset isn't, + * and we need to copy the fields to lnb anyways. + */ + memcpy(lnb, rnb, sizeof(*rnb)); if (cmd & OBD_BRW_WRITE) { - page = filter_get_page_write(inode, rnb, lnb, + page = filter_get_page_write(inode, lnb, &pglocked); - XPROCFS_BUMP_MYCPU_IOSTAT (st_write_bytes, - rnb->len); + XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes, + lnb->len); } else { - page = lustre_get_page_read(inode, rnb); + page = lustre_get_page_read(inode, lnb); - XPROCFS_BUMP_MYCPU_IOSTAT (st_read_bytes, - rnb->len); + XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes, + lnb->len); } if (IS_ERR(page)) { rc = PTR_ERR(page); + CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, + "error on page @"LPU64"%u/%u: rc = %d\n", + lnb->offset, j, o->ioo_bufcnt, rc); f_dput(dentry); GOTO(out_pages, rc); } - - lnb->addr = page_address(page); - lnb->offset = rnb->offset; - lnb->page = page; - lnb->len = rnb->len; } } @@ -2008,7 +2154,6 @@ out: out_pages: while (lnb-- > res) { - CERROR("%d error cleanup on brw\n", rc); if (cmd & OBD_BRW_WRITE) filter_commit_write(lnb, rc); else @@ -2016,16 +2161,17 @@ out_pages: f_dput(lnb->dentry); } obd_kmap_put(niocount); - goto out_err; /* dropped the dentry refs already (one per page) */ + if (cmd & OBD_BRW_WRITE) { + filter_finish_transno(export, *desc_private, oti, rc); + fsfilt_commit(obd, + filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode, + *desc_private); + } + goto out; /* dropped the dentry refs already (one per page) */ out_objinfo: for (i = 0; i < objcount && fso[i].fso_dentry; i++) f_dput(fso[i].fso_dentry); -out_err: - if (cmd & OBD_BRW_WRITE) { - filter_finish_transno(export, *desc_private, oti, rc); - fsfilt_commit(obd, dir_dentry->d_inode, *desc_private); - } goto out; } @@ -2072,8 +2218,15 @@ static int filter_write_locked_page(struct niobuf_local *lnb) RETURN(rc); } -static int filter_sync(struct obd_device *obd) +static int filter_syncfs(struct lustre_handle *conn) { + struct obd_device *obd; + ENTRY; + + obd = class_conn2obd(conn); + + XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1); + RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb)); } @@ -2139,8 +2292,9 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn, } if (cmd & OBD_BRW_WRITE) { + /* We just want any dentry for the commit, for now */ + struct dentry *dir_dentry = filter_parent(obd, S_IFREG, 0); int err; - struct dentry *dir_dentry = filter_parent(obd, S_IFREG); rc = filter_finish_transno(export, desc_private, oti, rc); err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private); @@ -2148,7 +2302,7 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn, rc = err; if (obd_sync_filter) { /* this can fail with ENOMEM, what should we do then? */ - filter_sync(obd); + filter_syncfs(conn); } /* XXX LASSERT(last_rcvd == last_committed)*/ } @@ -2216,6 +2370,81 @@ out: RETURN(ret); } +static int filter_san_preprw(int cmd, struct lustre_handle *conn, + int objcount, struct obd_ioobj *obj, + int niocount, struct niobuf_remote *nb) +{ + struct obd_device *obd; + struct obd_ioobj *o = obj; + struct niobuf_remote *rnb = nb; + int rc = 0; + int i; + ENTRY; + + if ((cmd & OBD_BRW_WRITE) != 0) + XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1); + else + XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1); + + obd = class_conn2obd(conn); + if (!obd) { + CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr); + RETURN(-EINVAL); + } + + for (i = 0; i < objcount; i++, o++) { + struct dentry *dentry; + struct inode *inode; + int j; + + dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG, + o->ioo_id), + o->ioo_id, 0); + if (IS_ERR(dentry)) + GOTO(out, rc = PTR_ERR(dentry)); + inode = dentry->d_inode; + if (!inode) { + CERROR("trying to BRW to non-existent file "LPU64"\n", + o->ioo_id); + f_dput(dentry); + GOTO(out, rc = -ENOENT); + } + + for (j = 0; j < o->ioo_bufcnt; j++, rnb++) { + long block; + + block = rnb->offset >> PAGE_SHIFT; + + if (cmd == OBD_BRW_READ) { + block = inode->i_mapping->a_ops->bmap( + inode->i_mapping, block); + } else { + loff_t newsize = rnb->offset + rnb->len; + /* fs_prep_san_write will also update inode + * size for us: + * (1) new alloced block + * (2) existed block but size extented + */ + /* FIXME We could call fs_prep_san_write() + * only once for all the blocks allocation. + * Now call it once for each block, for + * simplicity. And if error happens, we + * probably need to release previous alloced + * block */ + rc = fs_prep_san_write(obd, inode, &block, + 1, newsize); + if (rc) + break; + } + + rnb->offset = block; + } + f_dput(dentry); + } +out: + RETURN(rc); +} + static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs) { struct obd_device *obd; @@ -2358,6 +2587,7 @@ static struct obd_ops filter_obd_ops = { o_connect: filter_connect, o_disconnect: filter_disconnect, o_statfs: filter_statfs, + o_syncfs: filter_syncfs, o_getattr: filter_getattr, o_create: filter_create, o_setattr: filter_setattr, @@ -2369,6 +2599,36 @@ static struct obd_ops filter_obd_ops = { o_preprw: filter_preprw, o_commitrw: filter_commitrw #if 0 + o_san_preprw: filter_san_preprw, + o_preallocate: filter_preallocate_inodes, + o_migrate: filter_migrate, + o_copy: filter_copy_data, + o_iterate: filter_iterate +#endif +}; + +static struct obd_ops filter_sanobd_ops = { + o_owner: THIS_MODULE, + o_attach: filter_attach, + o_detach: filter_detach, + o_get_info: filter_get_info, + o_setup: filter_san_setup, + o_cleanup: filter_cleanup, + o_connect: filter_connect, + o_disconnect: filter_disconnect, + o_statfs: filter_statfs, + o_getattr: filter_getattr, + o_create: filter_create, + o_setattr: filter_setattr, + o_destroy: filter_destroy, + o_open: filter_open, + o_close: filter_close, + o_brw: filter_brw, + o_punch: filter_truncate, + o_preprw: filter_preprw, + o_commitrw: filter_commitrw, + o_san_preprw: filter_san_preprw, +#if 0 o_preallocate: filter_preallocate_inodes, o_migrate: filter_migrate, o_copy: filter_copy_data, @@ -2380,6 +2640,7 @@ static struct obd_ops filter_obd_ops = { static int __init obdfilter_init(void) { struct lprocfs_static_vars lvars; + int rc; printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n"); filter_open_cache = kmem_cache_create("ll_filter_fdata", @@ -2392,19 +2653,37 @@ static int __init obdfilter_init(void) sizeof(struct filter_dentry_data), 0, 0, NULL, NULL); if (!filter_dentry_cache) { - kmem_cache_destroy(filter_open_cache); - RETURN(-ENOMEM); + rc = -ENOMEM; + goto err1; } xprocfs_init ("filter"); lprocfs_init_vars(&lvars); - return class_register_type(&filter_obd_ops, lvars.module_vars, - OBD_FILTER_DEVICENAME); + + rc = class_register_type(&filter_obd_ops, lvars.module_vars, + OBD_FILTER_DEVICENAME); + if (rc) + goto err2; + + rc = class_register_type(&filter_sanobd_ops, lvars.module_vars, + OBD_FILTER_SAN_DEVICENAME); + if (rc) + goto err3; + + return 0; +err3: + class_unregister_type(OBD_FILTER_DEVICENAME); +err2: + kmem_cache_destroy(filter_dentry_cache); +err1: + kmem_cache_destroy(filter_open_cache); + return rc; } static void __exit obdfilter_exit(void) { + class_unregister_type(OBD_FILTER_SAN_DEVICENAME); class_unregister_type(OBD_FILTER_DEVICENAME); if (kmem_cache_destroy(filter_dentry_cache)) CERROR("couldn't free obdfilter dentry cache\n");