X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter.c;h=d2f6369ced9cd5da0bf2d474ad1fd7dcfcc83ef9;hp=0ae84c86f2666a35e197215ebcc1895e7a624892;hb=abac4d40166d4f9e594dec0c2939fb802852b007;hpb=404d2dc2fc01f49ae469a3e7c81bac431bd84936 diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 0ae84c8..d2f6369 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -3,68 +3,128 @@ * * linux/fs/obdfilter/filter.c * - * Copyright (C) 2001, 2002 Cluster File Systems, Inc. + * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * Author: Peter Braam + * Author: Andreas Dilger * - * This code is issued under the GNU General Public License. - * See the file COPYING in this distribution + * This file is part of Lustre, http://www.lustre.org. * - * by Peter Braam - * and Andreas Dilger + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +/* + * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops + * (which need to get journal_lock, may block if journal full). + * + * Invariant: Call filter_start_transno() before any journal ops to avoid the + * same deadlock problem. We can (and want) to get rid of the + * transno sem in favour of the dir/inode i_sem to avoid single + * threaded operation on the OST. */ -#define EXPORT_SYMTAB #define DEBUG_SUBSYSTEM S_FILTER +#include #include -#include #include #include +#include +#include +#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) +# include +# include +#endif + #include +#include #include -#include -#include -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) -#include -#endif -#include -#include -#include -#include +#include #include +#include +#include +#include -extern struct lprocfs_vars status_class_var[]; -extern struct lprocfs_vars status_var_nm_1[]; - -static kmem_cache_t *filter_open_cache; -static kmem_cache_t *filter_dentry_cache; - -#define FILTER_ROOTINO 2 -#define FILTER_ROOTINO_STR __stringify(FILTER_ROOTINO) - -#define S_SHIFT 12 -static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = { - [0] NULL, - [S_IFREG >> S_SHIFT] "R", - [S_IFDIR >> S_SHIFT] "D", - [S_IFCHR >> S_SHIFT] "C", - [S_IFBLK >> S_SHIFT] "B", - [S_IFIFO >> S_SHIFT] "F", - [S_IFSOCK >> S_SHIFT] "S", - [S_IFLNK >> S_SHIFT] "L" -}; +#include "filter_internal.h" + +static struct lvfs_callback_ops filter_lvfs_ops; -static inline const char *obd_mode_to_type(int mode) +static int filter_destroy(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *ea, struct obd_trans_info *); + +static void filter_commit_cb(struct obd_device *obd, __u64 transno, + void *cb_data, int error) { - return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT]; + obd_transno_commit_cb(obd, transno, error); } -/* write the pathname into the string */ -static int filter_id(char *buf, obd_id id, obd_mode mode) + +/* Assumes caller has already pushed us into the kernel context. */ +int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, + int rc) { - return sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id); + struct filter_obd *filter = &exp->exp_obd->u.filter; + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_client_data *fcd = fed->fed_fcd; + __u64 last_rcvd; + loff_t off; + int err, log_pri = D_HA; + + /* Propagate error code. */ + if (rc) + RETURN(rc); + + if (!exp->exp_obd->obd_replayable || oti == NULL) + RETURN(rc); + + /* we don't allocate new transnos for replayed requests */ + if (oti->oti_transno == 0) { + spin_lock(&filter->fo_translock); + last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1; + filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd); + spin_unlock(&filter->fo_translock); + oti->oti_transno = last_rcvd; + } else { + spin_lock(&filter->fo_translock); + last_rcvd = oti->oti_transno; + if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno)) + filter->fo_fsd->fsd_last_transno = + cpu_to_le64(last_rcvd); + spin_unlock(&filter->fo_translock); + } + fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd); + + /* could get xid from oti, if it's ever needed */ + fcd->fcd_last_xid = 0; + + off = fed->fed_lr_off; + fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle, + filter_commit_cb, NULL); + err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd, + sizeof(*fcd), &off, 0); + if (err) { + log_pri = D_ERROR; + if (rc == 0) + rc = err; + } + + CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n", + last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err); + + RETURN(rc); } -static inline void f_dput(struct dentry *dentry) +void f_dput(struct dentry *dentry) { /* Can't go inside filter_ddelete because it can block */ CDEBUG(D_INODE, "putting %s: %p, count = %d\n", @@ -74,92 +134,626 @@ static inline void f_dput(struct dentry *dentry) dput(dentry); } -/* Not racy w.r.t. others, because we are the only user of this dentry */ -static void filter_drelease(struct dentry *dentry) +/* Add client data to the FILTER. We use a bitmap to locate a free space + * in the last_rcvd file if cl_idx is -1 (i.e. a new client). + * Otherwise, we have just read the data from the last_rcvd file and + * we know its offset. */ +static int filter_client_add(struct obd_device *obd, struct filter_obd *filter, + struct filter_export_data *fed, int cl_idx) { - if (dentry->d_fsdata) - kmem_cache_free(filter_dentry_cache, dentry->d_fsdata); -} + unsigned long *bitmap = filter->fo_last_rcvd_slots; + int new_client = (cl_idx == -1); + ENTRY; -struct dentry_operations filter_dops = { - .d_release = filter_drelease, -}; + LASSERT(bitmap != NULL); -/* setup the object store with correct subdirectories */ -static int filter_prep(struct obd_device *obd) + /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */ + if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid)) + RETURN(0); + + /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so + * there's no need for extra complication here + */ + if (new_client) { + cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS); + repeat: + if (cl_idx >= FILTER_LR_MAX_CLIENTS) { + CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n"); + RETURN(-ENOMEM); + } + if (test_and_set_bit(cl_idx, bitmap)) { + CERROR("FILTER client %d: found bit is set in bitmap\n", + cl_idx); + cl_idx = find_next_zero_bit(bitmap, + FILTER_LR_MAX_CLIENTS, + cl_idx); + goto repeat; + } + } else { + if (test_and_set_bit(cl_idx, bitmap)) { + CERROR("FILTER client %d: bit already set in bitmap!\n", + cl_idx); + LBUG(); + } + } + + fed->fed_lr_idx = cl_idx; + fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) + + cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size); + + CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n", + fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid); + + if (new_client) { + struct obd_run_ctxt saved; + loff_t off = fed->fed_lr_off; + int err; + void *handle; + + CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n", + fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd)); + + push_ctxt(&saved, &obd->obd_ctxt, NULL); + /* Transaction needed to fix bug 1403 */ + handle = fsfilt_start(obd, + filter->fo_rcvd_filp->f_dentry->d_inode, + FSFILT_OP_SETATTR, NULL); + if (IS_ERR(handle)) { + err = PTR_ERR(handle); + CERROR("unable to start transaction: rc %d\n", err); + } else { + err = fsfilt_write_record(obd, filter->fo_rcvd_filp, + fed->fed_fcd, + sizeof(*fed->fed_fcd), + &off, 1); + fsfilt_commit(obd, + filter->fo_rcvd_filp->f_dentry->d_inode, + handle, 1); + } + pop_ctxt(&saved, &obd->obd_ctxt, NULL); + + if (err) { + CERROR("error writing %s client idx %u: rc %d\n", + LAST_RCVD, fed->fed_lr_idx, err); + RETURN(err); + } + } + RETURN(0); +} + +static int filter_client_free(struct obd_export *exp, int flags) { + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_obd *filter = &exp->exp_obd->u.filter; + struct obd_device *obd = exp->exp_obd; + struct filter_client_data zero_fcd; struct obd_run_ctxt saved; + int rc; + loff_t off; + ENTRY; + + if (fed->fed_fcd == NULL) + RETURN(0); + + if (flags & OBD_OPT_FAILOVER) + GOTO(free, 0); + + /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */ + if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0) + GOTO(free, 0); + + LASSERT(filter->fo_last_rcvd_slots != NULL); + + off = fed->fed_lr_off; + + CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n", + fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid); + + /* Clear the bit _after_ zeroing out the client so we don't + race with filter_client_add and zero out new clients.*/ + if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) { + CERROR("FILTER client %u: bit already clear in bitmap!!\n", + fed->fed_lr_idx); + LBUG(); + } + + memset(&zero_fcd, 0, sizeof zero_fcd); + push_ctxt(&saved, &obd->obd_ctxt, NULL); + rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd, + sizeof(zero_fcd), &off, 1); + pop_ctxt(&saved, &obd->obd_ctxt, NULL); + + CDEBUG(rc == 0 ? D_INFO : D_ERROR, + "zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n", + fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off, + LAST_RCVD, rc); + + if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) { + CERROR("FILTER client %u: bit already clear in bitmap!!\n", + fed->fed_lr_idx); + LBUG(); + } + +free: + OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd)); + + RETURN(0); +} + +static int filter_free_server_data(struct filter_obd *filter) +{ + OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd)); + filter->fo_fsd = NULL; + OBD_FREE(filter->fo_last_rcvd_slots, + FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long)); + filter->fo_last_rcvd_slots = NULL; + return 0; +} + +/* assumes caller is already in kernel ctxt */ +int filter_update_server_data(struct obd_device *obd, struct file *filp, + struct filter_server_data *fsd, int force_sync) +{ + loff_t off = 0; + int rc; + ENTRY; + + CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid); + CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n", + le64_to_cpu(fsd->fsd_last_transno)); + CDEBUG(D_INODE, "server last_mount: "LPU64"\n", + le64_to_cpu(fsd->fsd_mount_count)); + + rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off,force_sync); + if (rc) + CERROR("error writing filter_server_data: rc = %d\n", rc); + + RETURN(rc); +} + +int filter_update_last_objid(struct obd_device *obd, obd_gr group, + int force_sync) +{ struct filter_obd *filter = &obd->u.filter; - struct dentry *dentry; - struct dentry *root; - struct file *file; - struct inode *inode; - int rc = 0; - __u64 lastobjid = 2; - int mode = 0; - - push_ctxt(&saved, &filter->fo_ctxt, NULL); - dentry = simple_mkdir(current->fs->pwd, "O", 0700); - CDEBUG(D_INODE, "got/created O: %p\n", dentry); - if (IS_ERR(dentry)) { - rc = PTR_ERR(dentry); - CERROR("cannot open/create O: rc = %d\n", rc); - GOTO(out, rc); + __u64 tmp; + loff_t off = 0; + int rc; + ENTRY; + + CDEBUG(D_INODE, "server last_objid for group "LPU64": "LPU64"\n", + group, filter->fo_last_objids[group]); + + tmp = cpu_to_le64(filter->fo_last_objids[group]); + rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group], + &tmp, sizeof(tmp), &off, force_sync); + if (rc) + CERROR("error writing group "LPU64" last objid: rc = %d\n", + group, rc); + RETURN(rc); +} + +/* assumes caller has already in kernel ctxt */ +static int filter_init_server_data(struct obd_device *obd, struct file * filp) +{ + struct filter_obd *filter = &obd->u.filter; + struct filter_server_data *fsd; + struct filter_client_data *fcd = NULL; + struct inode *inode = filp->f_dentry->d_inode; + unsigned long last_rcvd_size = inode->i_size; + __u64 mount_count; + int cl_idx; + loff_t off = 0; + int rc; + + /* ensure padding in the struct is the correct size */ + LASSERT (offsetof(struct filter_server_data, fsd_padding) + + sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE); + LASSERT (offsetof(struct filter_client_data, fcd_padding) + + sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE); + + OBD_ALLOC(fsd, sizeof(*fsd)); + if (!fsd) + RETURN(-ENOMEM); + filter->fo_fsd = fsd; + + OBD_ALLOC(filter->fo_last_rcvd_slots, + FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long)); + if (filter->fo_last_rcvd_slots == NULL) { + OBD_FREE(fsd, sizeof(*fsd)); + RETURN(-ENOMEM); } - filter->fo_dentry_O = dentry; - dentry = simple_mkdir(current->fs->pwd, "P", 0700); - CDEBUG(D_INODE, "got/created P: %p\n", dentry); - if (IS_ERR(dentry)) { - rc = PTR_ERR(dentry); - CERROR("cannot open/create P: rc = %d\n", rc); - GOTO(out_O, rc); + + if (last_rcvd_size == 0) { + CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD); + + memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid)); + fsd->fsd_last_transno = 0; + mount_count = fsd->fsd_mount_count = 0; + fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE); + fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START); + fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE); + fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT); + filter->fo_subdir_count = FILTER_SUBDIR_COUNT; + } else { + rc = fsfilt_read_record(obd, filp, fsd, sizeof(*fsd), &off); + if (rc) { + CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n", + LAST_RCVD, rc); + GOTO(err_fsd, rc); + } + if (strcmp(fsd->fsd_uuid, obd->obd_uuid.uuid) != 0) { + CERROR("OBD UUID %s does not match last_rcvd UUID %s\n", + obd->obd_uuid.uuid, fsd->fsd_uuid); + GOTO(err_fsd, rc = -EINVAL); + } + mount_count = le64_to_cpu(fsd->fsd_mount_count); + filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count); } - f_dput(dentry); - dentry = simple_mkdir(current->fs->pwd, "D", 0700); - CDEBUG(D_INODE, "got/created D: %p\n", dentry); - if (IS_ERR(dentry)) { - rc = PTR_ERR(dentry); - CERROR("cannot open/create D: rc = %d\n", rc); - GOTO(out_O, rc); + + if (fsd->fsd_feature_incompat & ~cpu_to_le32(FILTER_INCOMPAT_SUPP)) { + CERROR("unsupported feature %x\n", + le32_to_cpu(fsd->fsd_feature_incompat) & + ~FILTER_INCOMPAT_SUPP); + GOTO(err_fsd, rc = -EINVAL); + } + if (fsd->fsd_feature_rocompat & ~cpu_to_le32(FILTER_ROCOMPAT_SUPP)) { + CERROR("read-only feature %x\n", + le32_to_cpu(fsd->fsd_feature_rocompat) & + ~FILTER_ROCOMPAT_SUPP); + /* Do something like remount filesystem read-only */ + GOTO(err_fsd, rc = -EINVAL); } - root = simple_mknod(dentry, FILTER_ROOTINO_STR, S_IFREG | 0755); - f_dput(dentry); - if (IS_ERR(root)) { - rc = PTR_ERR(root); - CERROR("OBD filter: cannot open/create root %d: rc = %d\n", - FILTER_ROOTINO, rc); - GOTO(out_O, rc); + CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n", + obd->obd_name, le64_to_cpu(fsd->fsd_last_transno)); + CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n", + obd->obd_name, mount_count + 1); + CDEBUG(D_INODE, "%s: server data size: %u\n", + obd->obd_name, le32_to_cpu(fsd->fsd_server_size)); + CDEBUG(D_INODE, "%s: per-client data start: %u\n", + obd->obd_name, le32_to_cpu(fsd->fsd_client_start)); + CDEBUG(D_INODE, "%s: per-client data size: %u\n", + obd->obd_name, le32_to_cpu(fsd->fsd_client_size)); + CDEBUG(D_INODE, "%s: server subdir_count: %u\n", + obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count)); + CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name, + last_rcvd_size <= FILTER_LR_CLIENT_START ? 0 : + (last_rcvd_size-FILTER_LR_CLIENT_START) /FILTER_LR_CLIENT_SIZE); + + if (!obd->obd_replayable) { + CWARN("%s: recovery support OFF\n", obd->obd_name); + GOTO(out, rc = 0); } - f_dput(root); - /* - * Create directories and/or get dentries for each object type. - * This saves us from having to do multiple lookups for each one. - */ - for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) { - char *type = obd_type_by_mode[mode]; + for (cl_idx = 0, off = le32_to_cpu(fsd->fsd_client_start); + off < last_rcvd_size; cl_idx++) { + __u64 last_rcvd; + struct obd_export *exp; + struct filter_export_data *fed; - if (!type) { - filter->fo_dentry_O_mode[mode] = NULL; + if (!fcd) { + OBD_ALLOC(fcd, sizeof(*fcd)); + if (!fcd) + GOTO(err_client, rc = -ENOMEM); + } + + /* Don't assume off is incremented properly by + * fsfilt_read_record(), in case sizeof(*fcd) + * isn't the same as fsd->fsd_client_size. */ + off = le32_to_cpu(fsd->fsd_client_start) + + cl_idx * le16_to_cpu(fsd->fsd_client_size); + rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off); + if (rc) { + CERROR("error reading FILT %s idx %d off %llu: rc %d\n", + LAST_RCVD, cl_idx, off, rc); + break; /* read error shouldn't cause startup to fail */ + } + + if (fcd->fcd_uuid[0] == '\0') { + CDEBUG(D_INFO, "skipping zeroed client at offset %d\n", + cl_idx); continue; } - dentry = simple_mkdir(filter->fo_dentry_O, type, 0700); - CDEBUG(D_INODE, "got/created O/%s: %p\n", type, dentry); + + last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd); + + /* These exports are cleaned up by filter_disconnect(), so they + * need to be set up like real exports as filter_connect() does. + */ + exp = class_new_export(obd); + CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 + " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx, + last_rcvd, le64_to_cpu(fsd->fsd_last_transno)); + if (exp == NULL) + GOTO(err_client, rc = -ENOMEM); + + memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid, + sizeof exp->exp_client_uuid.uuid); + fed = &exp->exp_filter_data; + fed->fed_fcd = fcd; + filter_client_add(obd, filter, fed, cl_idx); + /* create helper if export init gets more complex */ + spin_lock_init(&fed->fed_lock); + + fcd = NULL; + obd->obd_recoverable_clients++; + class_export_put(exp); + + CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n", + cl_idx, last_rcvd); + + if (last_rcvd > le64_to_cpu(fsd->fsd_last_transno)) + fsd->fsd_last_transno = cpu_to_le64(last_rcvd); + + } + + obd->obd_last_committed = le64_to_cpu(fsd->fsd_last_transno); + + if (obd->obd_recoverable_clients) { + CWARN("RECOVERY: %d recoverable clients, last_rcvd " + LPU64"\n", obd->obd_recoverable_clients, + le64_to_cpu(fsd->fsd_last_transno)); + obd->obd_next_recovery_transno = obd->obd_last_committed + 1; + obd->obd_recovering = 1; + } + + if (fcd) + OBD_FREE(fcd, sizeof(*fcd)); + +out: + filter->fo_mount_count = mount_count + 1; + fsd->fsd_mount_count = cpu_to_le64(filter->fo_mount_count); + + /* save it, so mount count and last_transno is current */ + rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1); + + RETURN(rc); + +err_client: + class_disconnect_exports(obd, 0); +err_fsd: + filter_free_server_data(filter); + RETURN(rc); +} + +static int filter_cleanup_groups(struct obd_device *obd) +{ + struct filter_obd *filter = &obd->u.filter; + int i; + ENTRY; + + if (filter->fo_dentry_O_groups != NULL && + filter->fo_last_objids != NULL && + filter->fo_last_objid_files != NULL) { + for (i = 0; i < FILTER_GROUPS; i++) { + struct dentry *dentry = filter->fo_dentry_O_groups[i]; + struct file *filp = filter->fo_last_objid_files[i]; + if (dentry != NULL) { + f_dput(dentry); + filter->fo_dentry_O_groups[i] = NULL; + } + if (filp != NULL) { + filp_close(filp, 0); + filter->fo_last_objid_files[i] = NULL; + } + } + } + if (filter->fo_dentry_O_sub != NULL && filter->fo_subdir_count) { + for (i = 0; i < filter->fo_subdir_count; i++) { + struct dentry *dentry = filter->fo_dentry_O_sub[i]; + if (dentry != NULL) { + f_dput(dentry); + filter->fo_dentry_O_sub[i] = NULL; + } + } + OBD_FREE(filter->fo_dentry_O_sub, + filter->fo_subdir_count * + sizeof(*filter->fo_dentry_O_sub)); + } + if (filter->fo_dentry_O_groups != NULL) + OBD_FREE(filter->fo_dentry_O_groups, + FILTER_GROUPS * sizeof(struct dentry *)); + if (filter->fo_last_objids != NULL) + OBD_FREE(filter->fo_last_objids, + FILTER_GROUPS * sizeof(__u64)); + if (filter->fo_last_objid_files != NULL) + OBD_FREE(filter->fo_last_objid_files, + FILTER_GROUPS * sizeof(struct file *)); + RETURN(0); +} + +/* FIXME: object groups */ +static int filter_prep_groups(struct obd_device *obd) +{ + struct filter_obd *filter = &obd->u.filter; + struct dentry *dentry, *O_dentry; + struct file *filp; + int i, rc = 0, cleanup_phase = 0; + ENTRY; + + O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1); + CDEBUG(D_INODE, "got/created O: %p\n", O_dentry); + if (IS_ERR(O_dentry)) { + rc = PTR_ERR(O_dentry); + CERROR("cannot open/create O: rc = %d\n", rc); + GOTO(cleanup, rc); + } + filter->fo_dentry_O = O_dentry; + cleanup_phase = 1; /* O_dentry */ + + /* Lookup "R" to tell if we're on an old OST FS and need to convert + * from O/R// to O/0//. This can be removed + * some time post 1.0 when all old-style OSTs have converted along + * with the init_objid hack. */ + dentry = ll_lookup_one_len("R", O_dentry, 1); + if (IS_ERR(dentry)) + GOTO(cleanup, rc = PTR_ERR(dentry)); + if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) { + struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1); + ENTRY; + + CWARN("converting OST to new object layout\n"); + if (IS_ERR(O0_dentry)) { + rc = PTR_ERR(O0_dentry); + CERROR("error looking up O/0: rc %d\n", rc); + GOTO(cleanup_R, rc); + } + + if (O0_dentry->d_inode) { + CERROR("Both O/R and O/0 exist. Fix manually.\n"); + GOTO(cleanup_O0, rc = -EEXIST); + } + + down(&O_dentry->d_inode->i_sem); + rc = vfs_rename(O_dentry->d_inode, dentry, + O_dentry->d_inode, O0_dentry); + up(&O_dentry->d_inode->i_sem); + + if (rc) { + CERROR("error renaming O/R to O/0: rc %d\n", rc); + GOTO(cleanup_O0, rc); + } + filter->fo_fsd->fsd_feature_incompat |= + cpu_to_le32(FILTER_INCOMPAT_GROUPS); + rc = filter_update_server_data(obd, filter->fo_rcvd_filp, + filter->fo_fsd, 1); + GOTO(cleanup_O0, rc); + + cleanup_O0: + dput(O0_dentry); + cleanup_R: + dput(dentry); + if (rc) + GOTO(cleanup, rc); + } else { + dput(dentry); + } + + OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64)); + if (filter->fo_last_objids == NULL) + GOTO(cleanup, rc = -ENOMEM); + cleanup_phase = 2; /* groups */ + + OBD_ALLOC(filter->fo_dentry_O_groups, FILTER_GROUPS * sizeof(dentry)); + if (filter->fo_dentry_O_groups == NULL) + GOTO(cleanup, rc = -ENOMEM); + OBD_ALLOC(filter->fo_last_objid_files, FILTER_GROUPS * sizeof(filp)); + if (filter->fo_last_objid_files == NULL) + GOTO(cleanup, rc = -ENOMEM); + + for (i = 0; i < FILTER_GROUPS; i++) { + char name[25]; + loff_t off = 0; + + sprintf(name, "%d", i); + dentry = simple_mkdir(O_dentry, name, 0700, 1); + CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot lookup/create O/%s: rc = %d\n", + name, rc); + GOTO(cleanup, rc); + } + filter->fo_dentry_O_groups[i] = dentry; + + sprintf(name, "O/%d/LAST_ID", i); + filp = filp_open(name, O_CREAT | O_RDWR, 0700); if (IS_ERR(dentry)) { rc = PTR_ERR(dentry); - CERROR("cannot create O/%s: rc = %d\n", type, rc); - GOTO(out_O_mode, rc); + CERROR("cannot create %s: rc = %d\n", name, rc); + GOTO(cleanup, rc); } - filter->fo_dentry_O_mode[mode] = dentry; + filter->fo_last_objid_files[i] = filp; + + if (filp->f_dentry->d_inode->i_size == 0) { + if (i == 0 && filter->fo_fsd->fsd_unused != 0) { + /* OST conversion, remove sometime post 1.0 */ + filter->fo_last_objids[i] = + le64_to_cpu(filter->fo_fsd->fsd_unused); + CWARN("saving old objid "LPU64" to LAST_ID\n", + filter->fo_last_objids[i]); + rc = filter_update_last_objid(obd, 0, 1); + if (rc) + GOTO(cleanup, rc); + } else { + filter->fo_last_objids[i] = FILTER_INIT_OBJID; + } + continue; + } + + rc = fsfilt_read_record(obd, filp, &filter->fo_last_objids[i], + sizeof(__u64), &off); + if (rc) { + CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n", + name, rc); + GOTO(cleanup, rc); + } + filter->fo_last_objids[i] = + le64_to_cpu(filter->fo_last_objids[i]); + CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n", + obd->obd_name, i, filter->fo_last_objids[i]); + } + + if (filter->fo_subdir_count) { + O_dentry = filter->fo_dentry_O_groups[0]; + OBD_ALLOC(filter->fo_dentry_O_sub, + filter->fo_subdir_count * sizeof(dentry)); + if (filter->fo_dentry_O_sub == NULL) + GOTO(cleanup, rc = -ENOMEM); + + for (i = 0; i < filter->fo_subdir_count; i++) { + char dir[20]; + snprintf(dir, sizeof(dir), "d%u", i); + + dentry = simple_mkdir(O_dentry, dir, 0700, 1); + CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("can't lookup/create O/0/%s: rc = %d\n", + dir, rc); + GOTO(cleanup, rc); + } + filter->fo_dentry_O_sub[i] = dentry; + } + } + RETURN(0); + + cleanup: + switch (cleanup_phase) { + case 2: + filter_cleanup_groups(obd); + case 1: + f_dput(filter->fo_dentry_O); + filter->fo_dentry_O = NULL; + default: + break; } + return rc; +} + +/* setup the object store with correct subdirectories */ +static int filter_prep(struct obd_device *obd) +{ + struct obd_run_ctxt saved; + struct filter_obd *filter = &obd->u.filter; + struct file *file; + struct inode *inode; + int rc = 0; + ENTRY; - file = filp_open("D/status", O_RDWR | O_CREAT, 0700); - if ( !file || IS_ERR(file) ) { + push_ctxt(&saved, &obd->obd_ctxt, NULL); + file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700); + if (!file || IS_ERR(file)) { rc = PTR_ERR(file); - CERROR("OBD filter: cannot open/create status %s: rc = %d\n", - "D/status", rc); - GOTO(out_O_mode, rc); + CERROR("OBD filter: cannot open/create %s: rc = %d\n", + LAST_RCVD, rc); + GOTO(out, rc); + } + + if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { + CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD, + file->f_dentry->d_inode->i_mode); + GOTO(err_filp, rc = -ENOENT); } /* steal operations */ @@ -168,48 +762,29 @@ static int filter_prep(struct obd_device *obd) filter->fo_iop = inode->i_op; filter->fo_aops = inode->i_mapping->a_ops; - if (inode->i_size == 0) { - __u64 disk_lastobjid = cpu_to_le64(lastobjid); - ssize_t retval = file->f_op->write(file,(char *)&disk_lastobjid, - sizeof(disk_lastobjid), - &file->f_pos); - if (retval != sizeof(disk_lastobjid)) { - CDEBUG(D_INODE,"OBD filter: error writing lastobjid\n"); - filp_close(file, 0); - GOTO(out_O_mode, rc = -EIO); - } - } else { - __u64 disk_lastobjid; - ssize_t retval = file->f_op->read(file, (char *)&disk_lastobjid, - sizeof(disk_lastobjid), - &file->f_pos); - if (retval != sizeof(disk_lastobjid)) { - CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n"); - filp_close(file, 0); - GOTO(out_O_mode, rc = -EIO); - } - lastobjid = le64_to_cpu(disk_lastobjid); + rc = filter_init_server_data(obd, file); + if (rc) { + CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc); + GOTO(err_filp, rc); } - filter->fo_lastobjid = lastobjid; - filp_close(file, 0); + filter->fo_rcvd_filp = file; + + rc = filter_prep_groups(obd); + if (rc) + GOTO(err_server_data, rc); - rc = 0; out: - pop_ctxt(&saved); + pop_ctxt(&saved, &obd->obd_ctxt, NULL); return(rc); -out_O_mode: - while (mode-- > 0) { - struct dentry *dentry = filter->fo_dentry_O_mode[mode]; - if (dentry) { - f_dput(dentry); - filter->fo_dentry_O_mode[mode] = NULL; - } - } -out_O: - f_dput(filter->fo_dentry_O); - filter->fo_dentry_O = NULL; + err_server_data: + //class_disconnect_exports(obd, 0); + filter_free_server_data(filter); + err_filp: + if (filp_close(file, 0)) + CERROR("can't close %s after error\n", LAST_RCVD); + filter->fo_rcvd_filp = NULL; goto out; } @@ -218,1622 +793,1684 @@ static void filter_post(struct obd_device *obd) { struct obd_run_ctxt saved; struct filter_obd *filter = &obd->u.filter; - __u64 disk_lastobjid; - long rc; - struct file *file; - int mode; + int rc, i; - push_ctxt(&saved, &filter->fo_ctxt, NULL); - file = filp_open("D/status", O_RDWR | O_CREAT, 0700); - if (IS_ERR(file)) { - CERROR("OBD filter: cannot create status file\n"); - goto out; - } + /* XXX: filter_update_lastobjid used to call fsync_dev. It might be + * best to start a transaction with h_sync, because we removed this + * from lastobjid */ - file->f_pos = 0; - disk_lastobjid = cpu_to_le64(filter->fo_lastobjid); - rc = file->f_op->write(file, (char *)&disk_lastobjid, - sizeof(disk_lastobjid), &file->f_pos); - if (rc != sizeof(disk_lastobjid)) - CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc); - - rc = filp_close(file, NULL); + push_ctxt(&saved, &obd->obd_ctxt, NULL); + rc = filter_update_server_data(obd, filter->fo_rcvd_filp, + filter->fo_fsd, 0); if (rc) - CERROR("OBD filter: cannot close status file: rc = %ld\n", rc); + CERROR("error writing server data: rc = %d\n", rc); - for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) { - struct dentry *dentry = filter->fo_dentry_O_mode[mode]; - if (dentry) { - f_dput(dentry); - filter->fo_dentry_O_mode[mode] = NULL; - } + for (i = 0; i < FILTER_GROUPS; i++) { + rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1)); + if (rc) + CERROR("error writing group %d lastobjid: rc = %d\n", + i, rc); } + + filp_close(filter->fo_rcvd_filp, 0); + filter->fo_rcvd_filp = NULL; + if (rc) + CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc); + + filter_cleanup_groups(obd); f_dput(filter->fo_dentry_O); -out: - pop_ctxt(&saved); + filter_free_server_data(filter); + pop_ctxt(&saved, &obd->obd_ctxt, NULL); } +static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa, + obd_id id) +{ + obd_gr group = 0; + LASSERT(filter->fo_fsd != NULL); + + if (oa != NULL) { + LASSERT(oa->o_gr <= FILTER_GROUPS); + group = oa->o_gr; + } + + spin_lock(&filter->fo_objidlock); + filter->fo_last_objids[group] = id; + spin_unlock(&filter->fo_objidlock); +} -static __u64 filter_next_id(struct obd_device *obd) +__u64 filter_last_id(struct filter_obd *filter, struct obdo *oa) { obd_id id; + obd_gr group = 0; + LASSERT(filter->fo_fsd != NULL); - spin_lock(&obd->u.filter.fo_objidlock); - id = ++obd->u.filter.fo_lastobjid; - spin_unlock(&obd->u.filter.fo_objidlock); + if (oa != NULL) { + LASSERT(oa->o_gr <= FILTER_GROUPS); + group = oa->o_gr; + } + + /* FIXME: object groups */ + spin_lock(&filter->fo_objidlock); + id = filter->fo_last_objids[group]; + spin_unlock(&filter->fo_objidlock); - /* FIXME: write the lastobjid to disk here */ return id; } -/* how to get files, dentries, inodes from object id's */ -/* parent i_sem is already held if needed for exclusivity */ -static struct dentry *filter_fid2dentry(struct obd_device *obd, - struct dentry *dparent, - __u64 id, __u32 type) +/* direct cut-n-paste of mds_blocking_ast() */ +static int filter_blocking_ast(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, + void *data, int flag) { - struct super_block *sb = obd->u.filter.fo_sb; - struct dentry *dchild; - char name[32]; - int len; + int do_ast; ENTRY; - if (!sb || !sb->s_dev) { - CERROR("fatal: device not initialized.\n"); - RETURN(ERR_PTR(-ENXIO)); + if (flag == LDLM_CB_CANCELING) { + /* Don't need to do anything here. */ + RETURN(0); } - if (id == 0) { - CERROR("fatal: invalid object #0\n"); - LBUG(); - RETURN(ERR_PTR(-ESTALE)); + /* XXX layering violation! -phil */ + l_lock(&lock->l_resource->lr_namespace->ns_lock); + /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy, + * such that filter_blocking_ast is called just before l_i_p takes the + * ns_lock, then by the time we get the lock, we might not be the + * correct blocking function anymore. So check, and return early, if + * so. */ + if (lock->l_blocking_ast != filter_blocking_ast) { + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + RETURN(0); } - if (!(type & S_IFMT)) { - CERROR("OBD %s, object "LPU64" has bad type: %o\n", - __FUNCTION__, id, type); - RETURN(ERR_PTR(-EINVAL)); - } + lock->l_flags |= LDLM_FL_CBPENDING; + do_ast = (!lock->l_readers && !lock->l_writers); + l_unlock(&lock->l_resource->lr_namespace->ns_lock); - len = sprintf(name, LPU64, id); - CDEBUG(D_INODE, "opening object O/%s/%s\n", obd_mode_to_type(type), - name); - dchild = lookup_one_len(name, dparent, len); - if (IS_ERR(dchild)) { - CERROR("child lookup error %ld\n", PTR_ERR(dchild)); - RETURN(dchild); - } + if (do_ast) { + struct lustre_handle lockh; + int rc; - CDEBUG(D_INODE, "got child obj O/%s/%s: %p, count = %d\n", - obd_mode_to_type(type), name, dchild, - atomic_read(&dchild->d_count)); + LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) + CERROR("ldlm_cli_cancel: %d\n", rc); + } else { + LDLM_DEBUG(lock, "Lock still has references, will be " + "cancelled later"); + } + RETURN(0); +} - LASSERT(atomic_read(&dchild->d_count) > 0); +static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent) +{ + down(&dparent->d_inode->i_sem); + return 0; +} - RETURN(dchild); +/* We never dget the object parent, so DON'T dput it either */ +static void filter_parent_unlock(struct dentry *dparent) +{ + up(&dparent->d_inode->i_sem); } -static inline struct dentry *filter_parent(struct obd_device *obd, - obd_mode mode) +/* We never dget the object parent, so DON'T dput it either */ +struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid) { struct filter_obd *filter = &obd->u.filter; + LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */ + + if (group > 0 || filter->fo_subdir_count == 0) + return filter->fo_dentry_O_groups[group]; - return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT]; + return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)]; } -static struct file *filter_obj_open(struct obd_export *export, - __u64 id, __u32 type) +/* We never dget the object parent, so DON'T dput it either */ +struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group, + obd_id objid) { - struct filter_obd *filter = &export->exp_obd->u.filter; - struct super_block *sb = filter->fo_sb; - struct dentry *dentry; - struct filter_export_data *fed = &export->exp_filter_data; - struct filter_dentry_data *fdd; - struct filter_file_data *ffd; - struct obd_run_ctxt saved; - char name[24]; - struct file *file; - ENTRY; + unsigned long now = jiffies; + struct dentry *dparent = filter_parent(obd, group, objid); + int rc; - if (!sb || !sb->s_dev) { - CERROR("fatal: device not initialized.\n"); - RETURN(ERR_PTR(-ENXIO)); - } + if (IS_ERR(dparent)) + return dparent; - if (!id) { - CERROR("fatal: invalid obdo "LPU64"\n", id); - RETURN(ERR_PTR(-ESTALE)); - } + rc = filter_lock_dentry(obd, dparent); + if (time_after(jiffies, now + 15 * HZ)) + CERROR("slow parent lock %lus\n", (jiffies - now) / HZ); + return rc ? ERR_PTR(rc) : dparent; +} - if (!(type & S_IFMT)) { - CERROR("OBD %s, object "LPU64" has bad type: %o\n", - __FUNCTION__, id, type); - RETURN(ERR_PTR(-EINVAL)); - } +/* How to get files, dentries, inodes from object id's. + * + * If dir_dentry is passed, the caller has already locked the parent + * appropriately for this operation (normally a write lock). If + * dir_dentry is NULL, we do a read lock while we do the lookup to + * avoid races with create/destroy and such changing the directory + * internal to the filesystem code. */ +struct dentry *filter_fid2dentry(struct obd_device *obd, + struct dentry *dir_dentry, + obd_gr group, obd_id id) +{ + struct dentry *dparent = dir_dentry; + struct dentry *dchild; + char name[32]; + int len; + ENTRY; - ffd = kmem_cache_alloc(filter_open_cache, SLAB_KERNEL); - if (!ffd) { - CERROR("obdfilter: out of memory\n"); - RETURN(ERR_PTR(-ENOMEM)); + if (id == 0) { + CERROR("fatal: invalid object id 0\n"); + RETURN(ERR_PTR(-ESTALE)); } - /* We preallocate this to avoid blocking while holding fo_fddlock */ - fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL); - if (!fdd) { - CERROR("obdfilter: out of memory\n"); - GOTO(out_ffd, file = ERR_PTR(-ENOMEM)); + len = sprintf(name, LPU64, id); + if (dir_dentry == NULL) { + dparent = filter_parent_lock(obd, group, id); + if (IS_ERR(dparent)) + RETURN(dparent); } - - filter_id(name, id, type); - push_ctxt(&saved, &filter->fo_ctxt, NULL); - file = filp_open(name, O_RDONLY | O_LARGEFILE, 0 /* type? */); - pop_ctxt(&saved); - - if (IS_ERR(file)) - GOTO(out_fdd, file); - - dentry = file->f_dentry; - spin_lock(&filter->fo_fddlock); - if (dentry->d_fsdata) { - spin_unlock(&filter->fo_fddlock); - kmem_cache_free(filter_dentry_cache, fdd); - fdd = dentry->d_fsdata; - LASSERT(kmem_cache_validate(filter_dentry_cache, fdd)); - /* should only happen during client recovery */ - if (fdd->fdd_flags & FILTER_FLAG_DESTROY) - CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id); - atomic_inc(&fdd->fdd_open_count); - } else { - atomic_set(&fdd->fdd_open_count, 1); - fdd->fdd_flags = 0; - /* If this is racy, then we can use {cmp}xchg and atomic_add */ - dentry->d_fsdata = fdd; - spin_unlock(&filter->fo_fddlock); + CDEBUG(D_INODE, "looking up object O/%*s/%s\n", + dparent->d_name.len, dparent->d_name.name, name); + dchild = /*ll_*/lookup_one_len(name, dparent, len); + if (dir_dentry == NULL) + filter_parent_unlock(dparent); + if (IS_ERR(dchild)) { + CERROR("child lookup error %ld\n", PTR_ERR(dchild)); + RETURN(dchild); } - get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie)); - ffd->ffd_file = file; - file->private_data = ffd; + CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n", + name, dchild, atomic_read(&dchild->d_count)); - if (!dentry->d_op) - dentry->d_op = &filter_dops; - else - LASSERT(dentry->d_op == &filter_dops); + LASSERT(atomic_read(&dchild->d_count) > 0); + + RETURN(dchild); +} - spin_lock(&fed->fed_lock); - list_add(&ffd->ffd_export_list, &fed->fed_open_head); - spin_unlock(&fed->fed_lock); +static int filter_prepare_destroy(struct obd_device *obd, obd_id objid) +{ + struct lustre_handle lockh; + int flags = LDLM_AST_DISCARD_DATA, rc; + struct ldlm_res_id res_id = { .name = { objid } }; + ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; - CDEBUG(D_INODE, "opening objid "LPX64": rc = %p\n", id, file); + ENTRY; + /* Tell the clients that the object is gone now and that they should + * throw away any cached pages. */ + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, + LDLM_EXTENT, &policy, LCK_PW, + &flags, filter_blocking_ast, ldlm_completion_ast, + NULL, NULL, NULL, 0, NULL, &lockh); -out: - RETURN(file); + /* We only care about the side-effects, just drop the lock. */ + if (rc == ELDLM_OK) + ldlm_lock_decref(&lockh, LCK_PW); -out_fdd: - kmem_cache_free(filter_dentry_cache, fdd); -out_ffd: - ffd->ffd_servercookie = DEAD_HANDLE_MAGIC; - kmem_cache_free(filter_open_cache, ffd); - goto out; + RETURN(rc); } -/* Caller must hold i_sem on dir_dentry->d_inode */ -static int filter_destroy_internal(struct obd_device *obd, - struct dentry *dir_dentry, - struct dentry *object_dentry) +/* Caller must hold LCK_PW on parent and push us into kernel context. + * Caller is also required to ensure that dchild->d_inode exists. */ +static int filter_destroy_internal(struct obd_device *obd, obd_id objid, + struct dentry *dparent, + struct dentry *dchild) { - struct obd_run_ctxt saved; - struct inode *inode = object_dentry->d_inode; + struct inode *inode = dchild->d_inode; int rc; ENTRY; if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) { - CERROR("destroying objid %*s nlink = %d, count = %d\n", - object_dentry->d_name.len, - object_dentry->d_name.name, - inode->i_nlink, atomic_read(&inode->i_count)); + CERROR("destroying objid %*s nlink = %lu, count = %d\n", + dchild->d_name.len, dchild->d_name.name, + (unsigned long)inode->i_nlink, + atomic_read(&inode->i_count)); } - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - rc = vfs_unlink(dir_dentry->d_inode, object_dentry); - /* XXX unlink from PENDING directory now too */ - pop_ctxt(&saved); + rc = vfs_unlink(dparent->d_inode, dchild); if (rc) CERROR("error unlinking objid %*s: rc %d\n", - object_dentry->d_name.len, - object_dentry->d_name.name, rc); + dchild->d_name.len, dchild->d_name.name, rc); RETURN(rc); } -static int filter_close_internal(struct obd_device *obd, - struct filter_file_data *ffd) +static int filter_intent_policy(struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + ldlm_mode_t mode, int flags, void *data) { - struct file *filp = ffd->ffd_file; - struct dentry *object_dentry = dget(filp->f_dentry); - struct filter_dentry_data *fdd = object_dentry->d_fsdata; - int rc, rc2 = 0; + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + struct ptlrpc_request *req = req_cookie; + struct ldlm_lock *lock = *lockp, *l = NULL; + struct ldlm_resource *res = lock->l_resource; + ldlm_processing_policy policy; + struct ost_lvb *res_lvb, *reply_lvb; + struct list_head *tmp; + ldlm_error_t err; + int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply), + sizeof(struct ost_lvb) }; ENTRY; - LASSERT(filp->private_data == ffd); - LASSERT(fdd); - - rc = filp_close(filp, 0); - - if (atomic_dec_and_test(&fdd->fdd_open_count) && - fdd->fdd_flags & FILTER_FLAG_DESTROY) { - struct dentry *dir_dentry = filter_parent(obd, S_IFREG); + policy = ldlm_get_processing_policy(res); + LASSERT(policy != NULL); + LASSERT(req != NULL); - down(&dir_dentry->d_inode->i_sem); - rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry); - if (rc2 && !rc) - rc = rc2; - up(&dir_dentry->d_inode->i_sem); + rc = lustre_pack_reply(req, 2, repsize, NULL); + if (rc) + RETURN(req->rq_status = rc); + + reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb)); + LASSERT(reply_lvb != NULL); + + //fixup_handle_for_resent_req(req, lock, &lockh); + + /* If we grant any lock at all, it will be a whole-file read lock. + * Call the extent policy function to see if our request can be + * granted, or is blocked. */ + lock->l_policy_data.l_extent.start = 0; + lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF; + lock->l_req_mode = LCK_PR; + + l_lock(&res->lr_namespace->ns_lock); + + res->lr_tmp = &rpc_list; + rc = policy(lock, &tmpflags, 0, &err); + res->lr_tmp = NULL; + + /* FIXME: we should change the policy function slightly, to not make + * this list at all, since we just turn around and free it */ + while (!list_empty(&rpc_list)) { + struct ldlm_ast_work *w = + list_entry(rpc_list.next, struct ldlm_ast_work, w_list); + list_del(&w->w_list); + LDLM_LOCK_PUT(w->w_lock); + OBD_FREE(w, sizeof(*w)); } - f_dput(object_dentry); - kmem_cache_free(filter_open_cache, ffd); - - RETURN(rc); -} - -/* obd methods */ -static int filter_connect(struct lustre_handle *conn, struct obd_device *obd, - obd_uuid_t cluuid, struct recovd_obd *recovd, - ptlrpc_recovery_cb_t recover) -{ - struct obd_export *exp; - int rc; - - ENTRY; - MOD_INC_USE_COUNT; - rc = class_connect(conn, obd, cluuid); - if (rc) - GOTO(out_dec, rc); - exp = class_conn2export(conn); - LASSERT(exp); + if (rc == LDLM_ITER_CONTINUE) { + /* The lock met with no resistance; we're finished. */ + l_unlock(&res->lr_namespace->ns_lock); + RETURN(ELDLM_LOCK_REPLACED); + } - INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head); - spin_lock_init(&exp->exp_filter_data.fed_lock); -out: - RETURN(rc); + /* Do not grant any lock, but instead send GL callbacks. The extent + * policy nicely created a list of all PW locks for us. We will choose + * the highest of those which are larger than the size in the LVB, if + * any, and perform a glimpse callback. */ + down(&res->lr_lvb_sem); + res_lvb = res->lr_lvb_data; + LASSERT(res_lvb != NULL); + reply_lvb->lvb_size = res_lvb->lvb_size; + up(&res->lr_lvb_sem); + + list_for_each(tmp, &res->lr_granted) { + struct ldlm_lock *tmplock = + list_entry(tmp, struct ldlm_lock, l_res_link); + + if (tmplock->l_granted_mode == LCK_PR) + continue; -out_dec: - MOD_DEC_USE_COUNT; - goto out; -} + if (tmplock->l_policy_data.l_extent.end <= + reply_lvb->lvb_size) + continue; -static int filter_disconnect(struct lustre_handle *conn) -{ - struct obd_export *exp = class_conn2export(conn); - struct filter_export_data *fed; - int rc; - ENTRY; + if (l == NULL) { + l = LDLM_LOCK_GET(tmplock); + continue; + } - LASSERT(exp); - fed = &exp->exp_filter_data; - spin_lock(&fed->fed_lock); - while (!list_empty(&fed->fed_open_head)) { - struct filter_file_data *ffd; + if (l->l_policy_data.l_extent.start > + tmplock->l_policy_data.l_extent.start) + continue; - ffd = list_entry(fed->fed_open_head.next, typeof(*ffd), - ffd_export_list); - list_del(&ffd->ffd_export_list); - spin_unlock(&fed->fed_lock); + LDLM_LOCK_PUT(l); + l = LDLM_LOCK_GET(tmplock); + } + l_unlock(&res->lr_namespace->ns_lock); - CERROR("force closing file %*s on disconnect\n", - ffd->ffd_file->f_dentry->d_name.len, - ffd->ffd_file->f_dentry->d_name.name); + /* There were no PW locks beyond the size in the LVB; finished. */ + if (l == NULL) + RETURN(ELDLM_LOCK_ABORTED); - filter_close_internal(exp->exp_obd, ffd); - spin_lock(&fed->fed_lock); + LASSERT(l->l_glimpse_ast != NULL); + rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */ + if (rc != 0 && res->lr_namespace->ns_lvbo && + res->lr_namespace->ns_lvbo->lvbo_update) { + res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1); } - spin_unlock(&fed->fed_lock); - ldlm_cancel_locks_for_export(exp); - rc = class_disconnect(conn); - if (!rc) - MOD_DEC_USE_COUNT; + down(&res->lr_lvb_sem); + reply_lvb->lvb_size = res_lvb->lvb_size; + up(&res->lr_lvb_sem); - /* XXX cleanup preallocated inodes */ - RETURN(rc); + LDLM_LOCK_PUT(l); + + RETURN(ELDLM_LOCK_ABORTED); } /* mount the file system (secretly) */ -static int filter_setup(struct obd_device *obd, obd_count len, void *buf) +int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, + char *option) { - struct obd_ioctl_data* data = buf; - struct filter_obd *filter; + struct lustre_cfg* lcfg = buf; + struct filter_obd *filter = &obd->u.filter; struct vfsmount *mnt; - int err = 0; + int rc = 0; ENTRY; - if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2) + dev_clear_rdonly(2); + + if (!lcfg->lcfg_inlbuf1 || !lcfg->lcfg_inlbuf2) RETURN(-EINVAL); - MOD_INC_USE_COUNT; - mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); - err = PTR_ERR(mnt); + obd->obd_fsops = fsfilt_get_ops(lcfg->lcfg_inlbuf2); + if (IS_ERR(obd->obd_fsops)) + RETURN(PTR_ERR(obd->obd_fsops)); + + mnt = do_kern_mount(lcfg->lcfg_inlbuf2, MS_NOATIME | MS_NODIRATIME, + lcfg->lcfg_inlbuf1, option); + rc = PTR_ERR(mnt); if (IS_ERR(mnt)) - GOTO(err_dec, err); + GOTO(err_ops, rc); + + if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3) { + if (*lcfg->lcfg_inlbuf3 == 'f') { + obd->obd_replayable = 1; + obd_sync_filter = 1; + CWARN("%s: recovery enabled\n", obd->obd_name); + } else { + if (*lcfg->lcfg_inlbuf3 != 'n') { + CERROR("unrecognised flag '%c'\n", + *lcfg->lcfg_inlbuf3); + } + // XXX Robert? Why do we get errors here + // GOTO(err_mntput, rc = -EINVAL); + } + } - filter = &obd->u.filter;; filter->fo_vfsmnt = mnt; - filter->fo_fstype = strdup(data->ioc_inlbuf2); - filter->fo_sb = mnt->mnt_root->d_inode->i_sb; - CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, filter->fo_vfsmnt); - /* XXX is this even possible if do_kern_mount succeeded? */ - if (!filter->fo_sb) - GOTO(err_kfree, err = -ENODEV); - - OBD_SET_CTXT_MAGIC(&filter->fo_ctxt); - filter->fo_ctxt.pwdmnt = mnt; - filter->fo_ctxt.pwd = mnt->mnt_root; - filter->fo_ctxt.fs = get_ds(); - - err = filter_prep(obd); - if (err) - GOTO(err_kfree, err); - spin_lock_init(&filter->fo_fddlock); + filter->fo_sb = mnt->mnt_sb; + filter->fo_fstype = mnt->mnt_sb->s_type->name; + CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt); + + OBD_SET_CTXT_MAGIC(&obd->obd_ctxt); + obd->obd_ctxt.pwdmnt = mnt; + obd->obd_ctxt.pwd = mnt->mnt_root; + obd->obd_ctxt.fs = get_ds(); + obd->obd_ctxt.cb_ops = filter_lvfs_ops; + + rc = filter_prep(obd); + if (rc) + GOTO(err_mntput, rc); + + spin_lock_init(&filter->fo_translock); spin_lock_init(&filter->fo_objidlock); INIT_LIST_HEAD(&filter->fo_export_list); - - obd->obd_namespace = - ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER); + sema_init(&filter->fo_alloc_lock, 1); + spin_lock_init(&filter->fo_r_pages.oh_lock); + spin_lock_init(&filter->fo_w_pages.oh_lock); + spin_lock_init(&filter->fo_r_discont_pages.oh_lock); + spin_lock_init(&filter->fo_w_discont_pages.oh_lock); + spin_lock_init(&filter->fo_r_discont_blocks.oh_lock); + spin_lock_init(&filter->fo_w_discont_blocks.oh_lock); + filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE; + + obd->obd_namespace = ldlm_namespace_new("filter-tgt", + LDLM_NAMESPACE_SERVER); if (obd->obd_namespace == NULL) - LBUG(); + GOTO(err_post, rc = -ENOMEM); + obd->obd_namespace->ns_lvbp = obd; + obd->obd_namespace->ns_lvbo = &filter_lvbo; + ldlm_register_intent(obd->obd_namespace, filter_intent_policy); ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, "filter_ldlm_cb_client", &obd->obd_ldlm_client); + rc = llog_cat_initialize(obd, 1); + if (rc) { + CERROR("failed to setup llogging subsystems\n"); + GOTO(err_post, rc); + } + RETURN(0); -err_kfree: - kfree(filter->fo_fstype); +err_post: + filter_post(obd); +err_mntput: unlock_kernel(); - mntput(filter->fo_vfsmnt); + mntput(mnt); filter->fo_sb = 0; lock_kernel(); - -err_dec: - MOD_DEC_USE_COUNT; - return err; +err_ops: + fsfilt_put_ops(obd->obd_fsops); + return rc; } +static int filter_setup(struct obd_device *obd, obd_count len, void *buf) +{ + struct lustre_cfg* lcfg = buf; + const char *str = NULL; + char *option = NULL; + int n = 0; + int rc; + + if (!strcmp(lcfg->lcfg_inlbuf2, "ext3")) { +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + /* bug 1577: implement async-delete for 2.5 */ + str = "errors=remount-ro,asyncdel"; +#else + str = "errors=remount-ro"; +#endif + n = strlen(str) + 1; + OBD_ALLOC(option, n); + if (option == NULL) + RETURN(-ENOMEM); + strcpy(option, str); + } + + rc = filter_common_setup(obd, len, buf, option); + if (option) + OBD_FREE(option, n); + return rc; +} -static int filter_cleanup(struct obd_device *obd) +static int filter_cleanup(struct obd_device *obd, int flags) { - struct super_block *sb; + struct filter_obd *filter = &obd->u.filter; ENTRY; + if (flags & OBD_OPT_FAILOVER) + CERROR("%s: shutting down for failover; client state will" + " be preserved.\n", obd->obd_name); + if (!list_empty(&obd->obd_exports)) { - CERROR("still has clients!\n"); - class_disconnect_all(obd); + CERROR("%s: still has clients!\n", obd->obd_name); + class_disconnect_exports(obd, flags); if (!list_empty(&obd->obd_exports)) { CERROR("still has exports after forced cleanup?\n"); RETURN(-EBUSY); } } - ldlm_namespace_free(obd->obd_namespace); + ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE); - sb = obd->u.filter.fo_sb; - if (!obd->u.filter.fo_sb) + if (filter->fo_sb == NULL) RETURN(0); filter_post(obd); - shrink_dcache_parent(sb->s_root); - unlock_kernel(); - mntput(obd->u.filter.fo_vfsmnt); - obd->u.filter.fo_sb = 0; - kfree(obd->u.filter.fo_fstype); + shrink_dcache_parent(filter->fo_sb->s_root); + filter->fo_sb = 0; + if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1) + CERROR("%s: mount point %p busy, mnt_count: %d\n", + obd->obd_name, filter->fo_vfsmnt, + atomic_read(&filter->fo_vfsmnt->mnt_count)); + + unlock_kernel(); + mntput(filter->fo_vfsmnt); + //destroy_buffers(filter->fo_sb->s_dev); + filter->fo_sb = NULL; + fsfilt_put_ops(obd->obd_fsops); lock_kernel(); - MOD_DEC_USE_COUNT; + dev_clear_rdonly(2); + RETURN(0); } - -static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid) +static int filter_attach(struct obd_device *obd, obd_count len, void *data) { - int type = oa->o_mode & S_IFMT; - ENTRY; + struct lprocfs_static_vars lvars; + int rc; - CDEBUG(D_INFO, "src inode %ld (%p), dst obdo %ld valid 0x%08x\n", - inode->i_ino, inode, (long)oa->o_id, valid); - /* Don't copy the inode number in place of the object ID */ - obdo_from_inode(oa, inode, valid); - oa->o_mode &= ~S_IFMT; - oa->o_mode |= type; + lprocfs_init_vars(filter, &lvars); + rc = lprocfs_obd_attach(obd, lvars.obd_vars); + if (rc != 0) + return rc; - if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) { - obd_rdev rdev = kdev_t_to_nr(inode->i_rdev); - oa->o_rdev = rdev; - oa->o_valid |= OBD_MD_FLRDEV; - } + rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST); + if (rc != 0) + return rc; - EXIT; + /* Init obdfilter private stats here */ + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES, + LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, + LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes"); + + return lproc_filter_attach_seqstat(obd); } -static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle) +static int filter_detach(struct obd_device *dev) { - struct filter_file_data *ffd = NULL; - ENTRY; + lprocfs_free_obd_stats(dev); + return lprocfs_obd_detach(dev); +} - if (!handle || !handle->addr) - RETURN(NULL); +/* nearly identical to mds_connect */ +static int filter_connect(struct lustre_handle *conn, struct obd_device *obd, + struct obd_uuid *cluuid) +{ + struct obd_export *exp; + struct filter_export_data *fed; + struct filter_client_data *fcd = NULL; + struct filter_obd *filter = &obd->u.filter; + int rc; + ENTRY; - ffd = (struct filter_file_data *)(unsigned long)(handle->addr); - if (!kmem_cache_validate(filter_open_cache, (void *)ffd)) - RETURN(NULL); + if (conn == NULL || obd == NULL || cluuid == NULL) + RETURN(-EINVAL); - if (ffd->ffd_servercookie != handle->cookie) - RETURN(NULL); + rc = class_connect(conn, obd, cluuid); + if (rc) + RETURN(rc); + exp = class_conn2export(conn); + LASSERT(exp != NULL); - LASSERT(ffd->ffd_file->private_data == ffd); - RETURN(ffd); -} + fed = &exp->exp_filter_data; -static struct dentry *__filter_oa2dentry(struct lustre_handle *conn, - struct obdo *oa, char *what) -{ - struct dentry *dentry = NULL; + spin_lock_init(&fed->fed_lock); - if (oa->o_valid & OBD_MD_FLHANDLE) { - struct lustre_handle *ost_handle = obdo_handle(oa); - struct filter_file_data *ffd = filter_handle2ffd(ost_handle); + if (!obd->obd_replayable) + GOTO(cleanup, rc = 0); - if (ffd) - dentry = dget(ffd->ffd_file->f_dentry); + OBD_ALLOC(fcd, sizeof(*fcd)); + if (!fcd) { + CERROR("filter: out of memory for client data\n"); + GOTO(cleanup, rc = -ENOMEM); } - if (!dentry) { - struct obd_device *obd = class_conn2obd(conn); - if (!obd) { - CERROR("invalid client "LPX64"\n", conn->addr); - RETURN(ERR_PTR(-EINVAL)); - } - dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode), - oa->o_id, oa->o_mode); - } + memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid)); + fed->fed_fcd = fcd; - if (!dentry->d_inode) { - CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id); - f_dput(dentry); - dentry = ERR_PTR(-ENOENT); - } + rc = filter_client_add(obd, filter, fed, -1); - return dentry; +cleanup: + if (rc) { + if (fcd) + OBD_FREE(fcd, sizeof(*fcd)); + class_disconnect(exp, 0); + } else { + class_export_put(exp); + } + return rc; } -#define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__) - -static int filter_getattr(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md) +static int filter_precleanup(struct obd_device *obd, int flags) { - struct dentry *dentry = NULL; int rc = 0; ENTRY; - dentry = filter_oa2dentry(conn, oa); - if (IS_ERR(dentry)) - RETURN(PTR_ERR(dentry)); - - filter_from_inode(oa, dentry->d_inode, oa->o_valid); + rc = obd_llog_finish(obd, 0); + if (rc) + CERROR("failed to cleanup llogging subsystem\n"); - f_dput(dentry); RETURN(rc); } -static int filter_setattr(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md) +/* Do extra sanity checks for grant accounting. We do this at connect, + * disconnect, and statfs RPC time, so it shouldn't be too bad. We can + * always get rid of it or turn it off when we know accounting is good. */ +static void filter_grant_sanity_check(struct obd_device *obd, char *func) { - struct obd_run_ctxt saved; - struct obd_device *obd = class_conn2obd(conn); - struct dentry *dentry; - struct iattr iattr; - struct inode *inode; - int rc; - ENTRY; - - dentry = filter_oa2dentry(conn, oa); - - if (IS_ERR(dentry)) - RETURN(PTR_ERR(dentry)); - - iattr_from_obdo(&iattr, oa, oa->o_valid); - iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG; - inode = dentry->d_inode; - - lock_kernel(); - if (iattr.ia_valid & ATTR_SIZE) - down(&inode->i_sem); - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - if (inode->i_op->setattr) - rc = inode->i_op->setattr(dentry, &iattr); - else - rc = inode_setattr(inode, &iattr); - pop_ctxt(&saved); - if (iattr.ia_valid & ATTR_SIZE) { - up(&inode->i_sem); - oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME; - obdo_from_inode(oa, inode, oa->o_valid); + struct filter_export_data *fed; + struct obd_export *exp; + obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize; + obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0; + obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted; + + if (list_empty(&obd->obd_exports)) + return; + + spin_lock(&obd->obd_osfs_lock); + spin_lock(&obd->obd_dev_lock); + list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) { + fed = &exp->exp_filter_data; + LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize, + "cli %s/%p %lu+%lu > "LPU64"\n", + exp->exp_client_uuid.uuid, exp, + fed->fed_grant, fed->fed_pending, maxsize); + LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n", + exp->exp_client_uuid.uuid, exp,fed->fed_dirty,maxsize); + CDEBUG(D_CACHE,"%s: cli %s/%p dirty %lu pend %lu grant %lu\n", + obd->obd_name, exp->exp_client_uuid.uuid, exp, + fed->fed_dirty, fed->fed_pending, fed->fed_grant); + tot_granted += fed->fed_grant + fed->fed_pending; + tot_pending += fed->fed_pending; + tot_dirty += fed->fed_dirty; } - unlock_kernel(); + fo_tot_granted = obd->u.filter.fo_tot_granted; + fo_tot_pending = obd->u.filter.fo_tot_pending; + fo_tot_dirty = obd->u.filter.fo_tot_dirty; + spin_unlock(&obd->obd_dev_lock); + spin_unlock(&obd->obd_osfs_lock); + + /* Do these assertions outside the spinlocks so we don't kill system */ + if (tot_granted != fo_tot_granted) + CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n", + func, tot_granted, fo_tot_granted); + if (tot_pending != fo_tot_pending) + CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n", + func, tot_pending, fo_tot_pending); + if (tot_dirty != fo_tot_dirty) + CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n", + func, tot_dirty, fo_tot_dirty); + if (tot_pending > tot_granted) + CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n", + func, tot_pending, tot_granted); + if (tot_granted > maxsize) + CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n", + func, tot_granted, maxsize); + if (tot_dirty > maxsize) + CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n", + func, tot_dirty, maxsize); +} - f_dput(dentry); - RETURN(rc); +/* Remove this client from the grant accounting totals. We also remove + * the export from the obd device under the osfs and dev locks to ensure + * that the filter_grant_sanity_check() calculations are always valid. + * The client should do something similar when it invalidates its import. */ +static void filter_grant_discard(struct obd_export *exp) +{ + struct obd_device *obd = exp->exp_obd; + struct filter_obd *filter = &obd->u.filter; + struct filter_export_data *fed = &exp->exp_filter_data; + + spin_lock(&obd->obd_osfs_lock); + spin_lock(&exp->exp_obd->obd_dev_lock); + list_del_init(&exp->exp_obd_chain); + spin_unlock(&exp->exp_obd->obd_dev_lock); + + CDEBUG(D_CACHE, "%s: cli %s/%p dirty %lu pend %lu grant %lu\n", + obd->obd_name, exp->exp_client_uuid.uuid, exp, + fed->fed_dirty, fed->fed_pending, fed->fed_grant); + + LASSERTF(filter->fo_tot_granted >= fed->fed_grant, + "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n", + obd->obd_name, filter->fo_tot_granted, + exp->exp_client_uuid.uuid, exp, fed->fed_grant); + filter->fo_tot_granted -= fed->fed_grant; + LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending, + "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n", + obd->obd_name, filter->fo_tot_pending, + exp->exp_client_uuid.uuid, exp, fed->fed_pending); + LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty, + "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n", + obd->obd_name, filter->fo_tot_dirty, + exp->exp_client_uuid.uuid, exp, fed->fed_dirty); + filter->fo_tot_dirty -= fed->fed_dirty; + fed->fed_dirty = 0; + fed->fed_grant = 0; + + spin_unlock(&obd->obd_osfs_lock); } -static int filter_open(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *ea) +static int filter_destroy_export(struct obd_export *exp) { - struct obd_export *export; - struct lustre_handle *handle; - struct filter_file_data *ffd; - struct file *filp; - int rc = 0; ENTRY; - export = class_conn2export(conn); - if (!export) { - CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr); - RETURN(-EINVAL); - } + if (exp->exp_filter_data.fed_pending) + CERROR("%s: cli %s/%p has %lu pending on destroyed export\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, + exp, exp->exp_filter_data.fed_pending); - filp = filter_obj_open(export, oa->o_id, oa->o_mode); - if (IS_ERR(filp)) - GOTO(out, rc = PTR_ERR(filp)); + target_destroy_export(exp); - filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid); + if (exp->exp_obd->obd_replayable) + filter_client_free(exp, exp->exp_flags); - ffd = filp->private_data; - handle = obdo_handle(oa); - handle->addr = (__u64)(unsigned long)ffd; - handle->cookie = ffd->ffd_servercookie; - oa->o_valid |= OBD_MD_FLHANDLE; -out: - RETURN(rc); -} /* filter_open */ + filter_grant_discard(exp); + if (!(exp->exp_flags & OBD_OPT_FORCE)) + filter_grant_sanity_check(exp->exp_obd, __FUNCTION__); -static int filter_close(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *ea) + RETURN(0); +} + +/* also incredibly similar to mds_disconnect */ +static int filter_disconnect(struct obd_export *exp, int flags) { - struct obd_export *exp; - struct filter_file_data *ffd; - struct filter_export_data *fed; + struct obd_device *obd = exp->exp_obd; + unsigned long irqflags; + struct llog_ctxt *ctxt; int rc; ENTRY; - exp = class_conn2export(conn); - if (!exp) { - CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr); - RETURN(-EINVAL); - } - - if (!(oa->o_valid & OBD_MD_FLHANDLE)) { - CERROR("no handle for close of objid "LPX64"\n", oa->o_id); - RETURN(-EINVAL); - } - - ffd = filter_handle2ffd(obdo_handle(oa)); - if (!ffd) { - struct lustre_handle *handle = obdo_handle(oa); - CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n", - handle->addr, handle->cookie); - RETURN(-ESTALE); - } - - fed = &exp->exp_filter_data; - spin_lock(&fed->fed_lock); - list_del(&ffd->ffd_export_list); - spin_unlock(&fed->fed_lock); - - rc = filter_close_internal(exp->exp_obd, ffd); - - RETURN(rc); -} /* filter_close */ + LASSERT(exp); + class_export_get(exp); -static int filter_create(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md **ea) -{ - struct obd_device *obd = class_conn2obd(conn); - char name[64]; - struct obd_run_ctxt saved; - struct dentry *new; - struct iattr; - ENTRY; + spin_lock_irqsave(&exp->exp_lock, irqflags); + exp->exp_flags = flags; + spin_unlock_irqrestore(&exp->exp_lock, irqflags); - if (!obd) { - CERROR("invalid client "LPX64"\n", conn->addr); - return -EINVAL; - } + if (!(flags & OBD_OPT_FORCE)) + filter_grant_sanity_check(obd, __FUNCTION__); + filter_grant_discard(exp); - if (!(oa->o_mode & S_IFMT)) { - CERROR("OBD %s, object "LPU64" has bad type: %o\n", - __FUNCTION__, oa->o_id, oa->o_mode); - return -ENOENT; - } + /* Disconnect early so that clients can't keep using export */ + rc = class_disconnect(exp, flags); - oa->o_id = filter_next_id(obd); + ldlm_cancel_locks_for_export(exp); - //filter_id(name, oa->o_id, oa->o_mode); - sprintf(name, LPU64, oa->o_id); - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - new = simple_mknod(filter_parent(obd, oa->o_mode), name, oa->o_mode); - pop_ctxt(&saved); - if (IS_ERR(new)) { - CERROR("Error mknod obj %s, err %ld\n", name, PTR_ERR(new)); - return -ENOENT; - } + fsfilt_sync(obd, obd->u.filter.fo_sb); - /* Set flags for fields we have set in the inode struct */ - oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS | - OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME; - filter_from_inode(oa, new->d_inode, oa->o_valid); - f_dput(new); + /* flush any remaining cancel messages out to the target */ + ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT); + llog_sync(ctxt, exp); - return 0; + class_export_put(exp); + RETURN(rc); } -static int filter_destroy(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *ea) +struct dentry *__filter_oa2dentry(struct obd_device *obd, + struct obdo *oa, const char *what) { - struct obd_device *obd = class_conn2obd(conn); - struct dentry *dir_dentry, *object_dentry; - struct filter_dentry_data *fdd; - int rc; - ENTRY; - - if (!obd) { - CERROR("invalid client "LPX64"\n", conn->addr); - RETURN(-EINVAL); - } - - CDEBUG(D_INODE, "destroying objid "LPX64"\n", oa->o_id); + struct dentry *dchild = NULL; + obd_gr group = 0; - dir_dentry = filter_parent(obd, oa->o_mode); - down(&dir_dentry->d_inode->i_sem); + if (oa->o_valid & OBD_MD_FLGROUP) + group = oa->o_gr; - object_dentry = filter_oa2dentry(conn, oa); - if (IS_ERR(object_dentry)) - GOTO(out, rc = -ENOENT); + dchild = filter_fid2dentry(obd, NULL, group, oa->o_id); - fdd = object_dentry->d_fsdata; - if (fdd && atomic_read(&fdd->fdd_open_count)) { - if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) { - fdd->fdd_flags |= FILTER_FLAG_DESTROY; - /* XXX put into PENDING directory in case of crash */ - CDEBUG(D_INODE, - "defer destroy of %dx open objid "LPX64"\n", - atomic_read(&fdd->fdd_open_count), oa->o_id); - } else - CDEBUG(D_INODE, - "repeat destroy of %dx open objid "LPX64"\n", - atomic_read(&fdd->fdd_open_count), oa->o_id); - GOTO(out_dput, rc = 0); + if (IS_ERR(dchild)) { + CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id); + RETURN(dchild); } - rc = filter_destroy_internal(obd, dir_dentry, object_dentry); -out_dput: - f_dput(object_dentry); - - EXIT; -out: - up(&dir_dentry->d_inode->i_sem); - return rc; -} - -/* NB count and offset are used for punch, but not truncate */ -static int filter_truncate(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *lsm, - obd_off start, obd_off end) -{ - int error; - ENTRY; - - if (end != OBD_OBJECT_EOF) - CERROR("PUNCH not supported, only truncate works\n"); + if (dchild->d_inode == NULL) { + CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id); + f_dput(dchild); + RETURN(ERR_PTR(-ENOENT)); + } - CDEBUG(D_INODE, "calling truncate for object "LPX64", valid = %x, " - "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start); - oa->o_size = start; - error = filter_setattr(conn, oa, NULL); - RETURN(error); + return dchild; } -static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, - struct lov_stripe_md *lsm, obd_count oa_bufs, - struct brw_page *pga, brw_cb_t callback, - struct brw_cb_data *brw_cbd) +static int filter_getattr(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *md) { - struct obd_export *export = class_conn2export(conn); - struct obd_run_ctxt saved; - struct super_block *sb; - int pnum; /* index to pages (bufs) */ - unsigned long retval; - int error; - struct file *file; - int pg; + struct dentry *dentry = NULL; + struct obd_device *obd; + int rc = 0; ENTRY; - if (!export) { - CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr); + obd = class_exp2obd(exp); + if (obd == NULL) { + CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n", + exp->exp_handle.h_cookie); RETURN(-EINVAL); } - sb = export->exp_obd->u.filter.fo_sb; - push_ctxt(&saved, &export->exp_obd->u.filter.fo_ctxt, NULL); - pnum = 0; /* pnum indexes buf 0..num_pages */ - - file = filter_obj_open(export, lsm->lsm_object_id, S_IFREG); - if (IS_ERR(file)) - GOTO(out, retval = PTR_ERR(file)); - - /* count doubles as retval */ - for (pg = 0; pg < oa_bufs; pg++) { - CDEBUG(D_INODE, "OP %d obdo pgno: (%d) (%ld,"LPU64 - ") off count ("LPU64",%d)\n", - cmd, pnum, file->f_dentry->d_inode->i_ino, - pga[pnum].off >> PAGE_CACHE_SHIFT, pga[pnum].off, - (int)pga[pnum].count); - if (cmd & OBD_BRW_WRITE) { - loff_t off; - char *buffer; - off = pga[pnum].off; - buffer = kmap(pga[pnum].pg); - retval = file->f_op->write(file, buffer, - pga[pnum].count, - &off); - kunmap(pga[pnum].pg); - CDEBUG(D_INODE, "retval %ld\n", retval); - } else { - loff_t off = pga[pnum].off; - char *buffer = kmap(pga[pnum].pg); - - if (off >= file->f_dentry->d_inode->i_size) { - memset(buffer, 0, pga[pnum].count); - retval = pga[pnum].count; - } else { - retval = file->f_op->read(file, buffer, - pga[pnum].count, &off); - } - kunmap(pga[pnum].pg); - - if (retval != pga[pnum].count) { - filp_close(file, 0); - GOTO(out, retval = -EIO); - } - CDEBUG(D_INODE, "retval %ld\n", retval); - } - pnum++; - } - /* sizes and blocks are set by generic_file_write */ - /* ctimes/mtimes will follow with a setattr call */ - filp_close(file, 0); + dentry = filter_oa2dentry(obd, oa); + if (IS_ERR(dentry)) + RETURN(PTR_ERR(dentry)); - /* XXX: do something with callback if it is set? */ + /* Limit the valid bits in the return data to what we actually use */ + oa->o_valid = OBD_MD_FLID; + obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS); - EXIT; -out: - pop_ctxt(&saved); - error = (retval >= 0) ? 0 : retval; - return error; + f_dput(dentry); + RETURN(rc); } -/* - * Calculate the number of buffer credits needed to write multiple pages in - * a single ext3/extN transaction. No, this shouldn't be here, but as yet - * ext3 doesn't have a nice API for calculating this sort of thing in advance. - * - * See comment above ext3_writepage_trans_blocks for details. We assume - * no data journaling is being done, but it does allow for all of the pages - * being non-contiguous. If we are guaranteed contiguous pages we could - * reduce the number of (d)indirect blocks a lot. - * - * With N blocks per page and P pages, for each inode we have at most: - * N*P indirect - * min(N*P, blocksize/4 + 1) dindirect blocks - * 1 tindirect - * - * For the entire filesystem, we have at most: - * min(sum(nindir + P), ngroups) bitmap blocks (from the above) - * min(sum(nindir + P), gdblocks) group descriptor blocks (from the above) - * 1 inode block - * 1 superblock - * 2 * EXT3_SINGLEDATA_TRANS_BLOCKS for the quota files - */ -static int ext3_credits_needed(struct super_block *sb, int objcount, - struct obd_ioobj *obj) +/* this is called from filter_truncate() until we have filter_punch() */ +static int filter_setattr(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *md, struct obd_trans_info *oti) { - struct obd_ioobj *o = obj; - int blockpp = 1 << (PAGE_CACHE_SHIFT - sb->s_blocksize_bits); - int addrpp = EXT3_ADDR_PER_BLOCK(sb) * blockpp; - int nbitmaps = 0; - int ngdblocks = 0; - int needed = objcount + 1; - int i; - - for (i = 0; i < objcount; i++, o++) { - int nblocks = o->ioo_bufcnt * blockpp; - int ndindirect = min(nblocks, addrpp + 1); - int nindir = nblocks + ndindirect + 1; + struct obd_run_ctxt saved; + struct filter_obd *filter; + struct dentry *dentry; + struct iattr iattr; + struct ldlm_res_id res_id = { .name = { oa->o_id } }; + struct ldlm_resource *res; + void *handle; + int rc, rc2; + ENTRY; - nbitmaps += nindir + nblocks; - ngdblocks += nindir + nblocks; + LASSERT(oti != NULL); - needed += nindir; - } + dentry = filter_oa2dentry(exp->exp_obd, oa); + if (IS_ERR(dentry)) + RETURN(PTR_ERR(dentry)); - /* Assumes ext3 and extN have same sb_info layout at the start. */ - if (nbitmaps > EXT3_SB(sb)->s_groups_count) - nbitmaps = EXT3_SB(sb)->s_groups_count; - if (ngdblocks > EXT3_SB(sb)->s_gdb_count) - ngdblocks = EXT3_SB(sb)->s_gdb_count; + filter = &exp->exp_obd->u.filter; - needed += nbitmaps + ngdblocks; + iattr_from_obdo(&iattr, oa, oa->o_valid); -#ifdef CONFIG_QUOTA - /* We assume that there will be 1 bit set in s_dquot.flags for each - * quota file that is active. This is at least true for now. - */ - needed += hweight32(sb_any_quota_enabled(sb)) * - EXT3_SINGLEDATA_TRANS_BLOCKS; -#endif + push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); + lock_kernel(); - return needed; -} + if (iattr.ia_valid & ATTR_SIZE) + down(&dentry->d_inode->i_sem); + handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR, + oti); + if (IS_ERR(handle)) + GOTO(out_unlock, rc = PTR_ERR(handle)); -/* We have to start a huge journal transaction here to hold all of the - * metadata for the pages being written here. This is necessitated by - * the fact that we do lots of prepare_write operations before we do - * any of the matching commit_write operations, so even if we split - * up to use "smaller" transactions none of them could complete until - * all of them were opened. By having a single journal transaction, - * we eliminate duplicate reservations for common blocks like the - * superblock and group descriptors or bitmaps. - * - * We will start the transaction here, but each prepare_write will - * add a refcount to the transaction, and each commit_write will - * remove a refcount. The transaction will be closed when all of - * the pages have been written. - */ -static void *ext3_filter_journal_start(struct filter_obd *filter, - int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb) -{ - journal_t *journal = NULL; - handle_t *handle = NULL; - int needed; + /* XXX this could be a rwsem instead, if filter_preprw played along */ + if (iattr.ia_valid & ATTR_ATTR_FLAG) + rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL, + EXT3_IOC_SETFLAGS, + (long)&iattr.ia_attr_flags); + else + rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1); + rc = filter_finish_transno(exp, oti, rc); + rc2 = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0); + if (rc2) { + CERROR("error on commit, err = %d\n", rc2); + if (!rc) + rc = rc2; + } - /* It appears that some kernels have different values for - * EXT*_MAX_GROUP_LOADED (either 8 or 32), so we cannot - * assume anything after s_inode_bitmap_number is the same. - */ - if (!strcmp(filter->fo_fstype, "ext3")) - journal = EXT3_SB(filter->fo_sb)->s_journal; -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - else if (!strcmp(filter->fo_fstype, "extN")) - journal = EXTN_SB(filter->fo_sb)->s_journal; -#endif - needed = ext3_credits_needed(filter->fo_sb, objcount, obj); - - /* The number of blocks we could _possibly_ dirty can very large. - * We reduce our request if it is absurd (and we couldn't get that - * many credits for a single handle anyways). - * - * At some point we have to limit the size of I/Os sent at one time, - * increase the size of the journal, or we have to calculate the - * actual journal requirements more carefully by checking all of - * the blocks instead of being maximally pessimistic. It remains to - * be seen if this is a real problem or not. - */ - if (needed > journal->j_max_transaction_buffers) { - CERROR("want too many journal credits (%d) using %d instead\n", - needed, journal->j_max_transaction_buffers); - needed = journal->j_max_transaction_buffers; + if (iattr.ia_valid & ATTR_SIZE) { + res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL, + res_id, LDLM_EXTENT, 0); + if (res == NULL) { + CERROR("!!! resource_get failed for object "LPU64" -- " + "filter_setattr with no lock?\n", oa->o_id); + } else { + if (res->lr_namespace->ns_lvbo && + res->lr_namespace->ns_lvbo->lvbo_update) { + rc = res->lr_namespace->ns_lvbo->lvbo_update + (res, NULL, 0, 0); + } + ldlm_resource_putref(res); + } } - lock_kernel(); - handle = journal_start(journal, needed); + oa->o_valid = OBD_MD_FLID; + obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS); + +out_unlock: + if (iattr.ia_valid & ATTR_SIZE) + up(&dentry->d_inode->i_sem); unlock_kernel(); - if (IS_ERR(handle)) - CERROR("can't get handle for %d credits: rc = %ld\n", needed, - PTR_ERR(handle)); + pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - return(handle); + f_dput(dentry); + RETURN(rc); } -static void *filter_journal_start(void **journal_save, - struct filter_obd *filter, - int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb) +/* XXX identical to osc_unpackmd */ +static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, + struct lov_mds_md *lmm, int lmm_bytes) { - void *handle = NULL; - - /* This may not be necessary - we probably never have a - * transaction started when we enter here, so we can - * remove the saving of the journal state entirely. - * For now leave it in just to see if it ever happens. - */ - *journal_save = current->journal_info; - if (*journal_save) { - CERROR("Already have handle %p???\n", *journal_save); - LBUG(); - current->journal_info = NULL; - } + int lsm_size; + ENTRY; - if (!strcmp(filter->fo_fstype, "ext3") || - !strcmp(filter->fo_fstype, "extN")) - handle = ext3_filter_journal_start(filter, objcount, obj, - niocount, nb); - return handle; -} + if (lmm != NULL) { + if (lmm_bytes < sizeof (*lmm)) { + CERROR("lov_mds_md too small: %d, need %d\n", + lmm_bytes, (int)sizeof(*lmm)); + RETURN(-EINVAL); + } + /* XXX LOV_MAGIC etc check? */ -static int ext3_filter_journal_stop(void *handle) -{ - int rc; + if (lmm->lmm_object_id == cpu_to_le64(0)) { + CERROR("lov_mds_md: zero lmm_object_id\n"); + RETURN(-EINVAL); + } + } - /* We got a refcount on the handle for each call to prepare_write, - * so we can drop the "parent" handle here to avoid the need for - * osc to call back into filterobd to close the handle. The - * remaining references will be dropped in commit_write. - */ - lock_kernel(); - rc = journal_stop((handle_t *)handle); - unlock_kernel(); + lsm_size = lov_stripe_md_size(1); + if (lsmp == NULL) + RETURN(lsm_size); - return rc; -} + if (*lsmp != NULL && lmm == NULL) { + OBD_FREE(*lsmp, lsm_size); + *lsmp = NULL; + RETURN(0); + } -static int filter_journal_stop(void *journal_save, struct filter_obd *filter, - void *handle) -{ - int rc = 0; + if (*lsmp == NULL) { + OBD_ALLOC(*lsmp, lsm_size); + if (*lsmp == NULL) + RETURN(-ENOMEM); - if (!strcmp(filter->fo_fstype, "ext3") || - !strcmp(filter->fo_fstype, "extN")) - rc = ext3_filter_journal_stop(handle); + loi_init((*lsmp)->lsm_oinfo); + } - if (rc) - CERROR("error on journal stop: rc = %d\n", rc); + if (lmm != NULL) { + /* XXX zero *lsmp? */ + (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id); + LASSERT((*lsmp)->lsm_object_id); + } - current->journal_info = journal_save; + (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; - return rc; + RETURN(lsm_size); } -static inline void lustre_put_page(struct page *page) +static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, + struct filter_obd *filter) { - kunmap(page); - page_cache_release(page); -} + struct obdo doa; /* XXX obdo on stack */ + __u64 last, id; + ENTRY; + LASSERT(oa); + memset(&doa, 0, sizeof(doa)); + if (oa->o_valid & OBD_MD_FLGROUP) + doa.o_gr = oa->o_gr; + else + doa.o_gr = 0; + doa.o_mode = S_IFREG; + last = filter_last_id(filter, &doa); /* FIXME: object groups */ + CWARN("deleting orphan objects from "LPU64" to "LPU64"\n", + oa->o_id + 1, last); + for (id = oa->o_id + 1; id <= last; id++) { + doa.o_id = id; + filter_destroy(exp, &doa, NULL, NULL); + } + spin_lock(&filter->fo_objidlock); + filter->fo_last_objids[0] = oa->o_id; /* FIXME: object groups */ + spin_unlock(&filter->fo_objidlock); + EXIT; +} -static struct page * -lustre_get_page_read(struct inode *inode, struct niobuf_remote *rnb) +/* returns a negative error or a nonnegative number of files to create */ +static int filter_should_precreate(struct obd_export *exp, struct obdo *oa, + int group) { - unsigned long index = rnb->offset >> PAGE_SHIFT; - struct address_space *mapping = inode->i_mapping; - struct page *page; - int rc; + struct obd_device *obd = exp->exp_obd; + struct filter_obd *filter = &obd->u.filter; + int diff, rc; + ENTRY; - page = read_cache_page(mapping, index, - (filler_t*)mapping->a_ops->readpage, NULL); - if (!IS_ERR(page)) { - wait_on_page(page); - kmap(page); - if (!PageUptodate(page)) { - CERROR("page index %lu not uptodate\n", index); - GOTO(err_page, rc = -EIO); - } - if (PageError(page)) { - CERROR("page index %lu has error\n", index); - GOTO(err_page, rc = -EIO); + diff = oa->o_id - filter_last_id(filter, oa); + CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n", + filter_last_id(filter, oa), diff); + + /* delete orphans request */ + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_DELORPHAN)) { + if (diff >= 0) + RETURN(diff); + if (-diff > 10000) { /* XXX make this smarter */ + CERROR("ignoring bogus orphan destroy request: obdid " + LPU64" last_id "LPU64"\n", + oa->o_id, filter_last_id(filter, oa)); + RETURN(-EINVAL); } - } - return page; + filter_destroy_precreated(exp, oa, filter); + rc = filter_update_last_objid(obd, group, 0); + if (rc) + CERROR("unable to write lastobjid, but orphans" + "were deleted\n"); + RETURN(0); + } else { + /* only precreate if group == 0 and o_id is specfied */ + if (!(oa->o_valid & OBD_FL_DELORPHAN) && + (group != 0 || oa->o_id == 0)) + RETURN(1); -err_page: - lustre_put_page(page); - return ERR_PTR(rc); + LASSERT(diff >= 0); + RETURN(diff); + } } -static struct page * -lustre_get_page_write(struct inode *inode, unsigned long index) +/* We rely on the fact that only one thread will be creating files in a given + * group at a time, which is why we don't need an atomic filter_get_new_id. + * Even if we had that atomic function, the following race would exist: + * + * thread 1: gets id x from filter_next_id + * thread 2: gets id (x + 1) from filter_next_id + * thread 2: creates object (x + 1) + * thread 1: tries to create object x, gets -ENOSPC + */ +static int filter_precreate(struct obd_device *obd, struct obdo *oa, + obd_gr group, int *num) { - struct address_space *mapping = inode->i_mapping; - struct page *page; - int rc; + struct dentry *dchild = NULL; + struct filter_obd *filter; + struct dentry *dparent; + int err = 0, rc = 0, i; + __u64 next_id; + int recreate_obj = 0; + void *handle = NULL; + ENTRY; - page = grab_cache_page(mapping, index); /* locked page */ + filter = &obd->u.filter; - if (!IS_ERR(page)) { - kmap(page); - /* Note: Called with "O" and "PAGE_SIZE" this is essentially - * a no-op for most filesystems, because we write the whole - * page. For partial-page I/O this will read in the page. - */ - rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE); + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_RECREATE_OBJS)) { + recreate_obj = 1; + } + + for (i = 0; i < *num && err == 0; i++) { + int cleanup_phase = 0; + + if (recreate_obj) { + __u64 last_id; + next_id = oa->o_id; + last_id = filter_last_id(filter, NULL); + if (next_id > last_id) { + CERROR("Error: Trying to recreate obj greater" + "than last id "LPD64" > "LPD64"\n", + next_id, last_id); + RETURN(-EINVAL); + } + } else + next_id = filter_last_id(filter, NULL) + 1; + + CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id); + + dparent = filter_parent_lock(obd, group, next_id); + if (IS_ERR(dparent)) + GOTO(cleanup, rc = PTR_ERR(dparent)); + cleanup_phase = 1; + + dchild = filter_fid2dentry(obd, dparent, group, next_id); + if (IS_ERR(dchild)) + GOTO(cleanup, rc = PTR_ERR(dchild)); + cleanup_phase = 2; + + if (dchild->d_inode != NULL) { + /* This would only happen if lastobjid was bad on disk*/ + /* Could also happen if recreating missing obj but + * already exists + */ + if (recreate_obj) { + CERROR("Serious error: recreating obj %*s but " + "obj already exists \n", + dchild->d_name.len, dchild->d_name.name); + } else { + CERROR("Serious error: objid %*s already " + "exists; is this filesystem corrupt?\n", + dchild->d_name.len, dchild->d_name.name); + } + GOTO(cleanup, rc = -EEXIST); + } + + handle = fsfilt_start(obd, dparent->d_inode, + FSFILT_OP_CREATE_LOG, NULL); + if (IS_ERR(handle)) + GOTO(cleanup, rc = PTR_ERR(handle)); + cleanup_phase = 3; + + rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL); if (rc) { - CERROR("page index %lu, rc = %d\n", index, rc); - if (rc != -ENOSPC) - LBUG(); - GOTO(err_unlock, rc); + CERROR("create failed rc = %d\n", rc); + GOTO(cleanup, rc); } - /* XXX not sure if we need this if we are overwriting page */ - if (PageError(page)) { - CERROR("error on page index %lu, rc = %d\n", index, rc); - LBUG(); - GOTO(err_unlock, rc = -EIO); + + if (!recreate_obj) { + filter_set_last_id(filter, NULL, next_id); + err = filter_update_last_objid(obd, group, 0); + if (err) + CERROR("unable to write lastobjid " + "but file created\n"); } - } - return page; -err_unlock: - unlock_page(page); - lustre_put_page(page); - return ERR_PTR(rc); -} + cleanup: + switch(cleanup_phase) { + case 3: + err = fsfilt_commit(obd, dparent->d_inode, handle, 0); + if (err) { + CERROR("error on commit, err = %d\n", err); + if (!rc) + rc = err; + } + case 2: + f_dput(dchild); + case 1: + filter_parent_unlock(dparent); + case 0: + break; + } -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) -int waitfor_one_page(struct page *page) -{ - wait_on_page_locked(page); - return 0; -} -#endif + if (rc) + break; + } + *num = i; -static int lustre_commit_write(struct page *page, unsigned from, unsigned to) -{ - struct inode *inode = page->mapping->host; - int err; - - err = page->mapping->a_ops->commit_write(NULL, page, from, to); - if (!err && IS_SYNC(inode)) - err = waitfor_one_page(page); - //SetPageUptodate(page); // the client commit_write will do this - - SetPageReferenced(page); - unlock_page(page); - lustre_put_page(page); - return err; + CDEBUG(D_INFO, "filter_precreate() created %d objects\n", i); + RETURN(rc); } -struct page *filter_get_page_write(struct inode *inode, - struct niobuf_remote *rnb, - struct niobuf_local *lnb, int *pglocked) +static int filter_create(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti) { - unsigned long index = rnb->offset >> PAGE_SHIFT; - struct address_space *mapping = inode->i_mapping; - - struct page *page; - int rc; + struct obd_device *obd = NULL; + struct obd_run_ctxt saved; + struct lov_stripe_md *lsm = NULL; + obd_gr group = 0; + int rc = 0, diff; + ENTRY; - //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL)); - if (*pglocked) - page = grab_cache_page_nowait(mapping, index); /* locked page */ - else - page = grab_cache_page(mapping, index); /* locked page */ + if (oa->o_valid & OBD_MD_FLGROUP) + group = oa->o_gr; + + CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n", + group, oa->o_id); + if (ea != NULL) { + lsm = *ea; + if (lsm == NULL) { + rc = obd_alloc_memmd(exp, &lsm); + if (rc < 0) + RETURN(rc); + } + } + obd = exp->exp_obd; + push_ctxt(&saved, &obd->obd_ctxt, NULL); - /* This page is currently locked, so get a temporary page instead. */ - /* XXX I believe this is a very dangerous thing to do - consider if - * we had multiple writers for the same file (definitely the case - * if we are using this codepath). If writer A locks the page, - * writer B writes to a copy (as here), writer A drops the page - * lock, and writer C grabs the lock before B does, then B will - * later overwrite the data from C, even if C had LDLM locked - * and initiated the write after B did. - */ - if (!page) { - unsigned long addr; - CDEBUG(D_PAGE, "ino %ld page %ld locked\n", inode->i_ino,index); - addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */ - if (!addr) { - CERROR("no memory for a temp page\n"); - LBUG(); - GOTO(err, rc = -ENOMEM); - } - /* XXX debugging */ - memset((void *)addr, 0xBA, PAGE_SIZE); - page = virt_to_page(addr); - kmap(page); - page->index = index; - lnb->flags |= N_LOCAL_TEMP_PAGE; - } else if (!IS_ERR(page)) { - (*pglocked)++; - kmap(page); - - rc = mapping->a_ops->prepare_write(NULL, page, - rnb->offset % PAGE_SIZE, - rnb->len); - if (rc) { - CERROR("page index %lu, rc = %d\n", index, rc); - if (rc != -ENOSPC) - LBUG(); - GOTO(err_unlock, rc); + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_RECREATE_OBJS)) { + if (oa->o_id > filter_last_id(&obd->u.filter, oa)) { + CERROR("recreate objid "LPU64" > last id "LPU64"\n", + oa->o_id, filter_last_id(&obd->u.filter, oa)); + rc = -EINVAL; + } else { + diff = 1; + rc = filter_precreate(obd, oa, group, &diff); } - /* XXX not sure if we need this if we are overwriting page */ - if (PageError(page)) { - CERROR("error on page index %lu, rc = %d\n", index, rc); - LBUG(); - GOTO(err_unlock, rc = -EIO); + } else { + diff = filter_should_precreate(exp, oa, group); + if (diff > 0) { + oa->o_id = filter_last_id(&obd->u.filter, oa); + rc = filter_precreate(obd, oa, group, &diff); + oa->o_id += diff; + oa->o_valid = OBD_MD_FLID; } } - return page; - -err_unlock: - unlock_page(page); - lustre_put_page(page); -err: - return ERR_PTR(rc); -} -/* - * We need to balance prepare_write() calls with commit_write() calls. - * If the page has been prepared, but we have no data for it, we don't - * want to overwrite valid data on disk, but we still need to zero out - * data for space which was newly allocated. Like part of what happens - * in __block_prepare_write() for newly allocated blocks. - * - * XXX currently __block_prepare_write() creates buffers for all the - * pages, and the filesystems mark these buffers as BH_New if they - * were newly allocated from disk. We use the BH_New flag similarly. - */ -static int filter_commit_write(struct page *page, unsigned from, unsigned to, - int err) -{ -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - if (err) { - unsigned block_start, block_end; - struct buffer_head *bh, *head = page->buffers; - unsigned blocksize = head->b_size; - void *addr = page_address(page); - - /* debugging: just seeing if this ever happens */ - CERROR("called filter_commit_write for obj %ld:%ld on err %d\n", - page->index, page->mapping->host->i_ino, err); - - /* Currently one buffer per page, but in the future... */ - for (bh = head, block_start = 0; bh != head || !block_start; - block_start = block_end, bh = bh->b_this_page) { - block_end = block_start + blocksize; - if (buffer_new(bh)) - memset(addr + block_start, 0, blocksize); - } + pop_ctxt(&saved, &obd->obd_ctxt, NULL); + if (rc && ea != NULL && *ea != lsm) { + obd_free_memmd(exp, &lsm); + } else if (rc == 0 && ea != NULL) { + /* XXX LOV STACKING: the lsm that is passed to us from + * LOV does not have valid lsm_oinfo data structs, so + * don't go touching that. This needs to be fixed in a + * big way. */ + lsm->lsm_object_id = oa->o_id; + *ea = lsm; } -#endif - return lustre_commit_write(page, from, to); + + RETURN(rc); } -static int filter_preprw(int cmd, struct lustre_handle *conn, - int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb, - struct niobuf_local *res, void **desc_private) +static int filter_destroy(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *ea, struct obd_trans_info *oti) { - struct obd_run_ctxt saved; struct obd_device *obd; - struct obd_ioobj *o = obj; - struct niobuf_remote *rnb = nb; - struct niobuf_local *lnb = res; - void *journal_save = NULL; - int pglocked = 0; - int rc = 0; - int i; + struct filter_obd *filter; + struct dentry *dchild = NULL, *dparent = NULL; + struct obd_run_ctxt saved; + void *handle = NULL; + struct llog_cookie *fcc = NULL; + int rc, rc2, cleanup_phase = 0, have_prepared = 0; + obd_gr group = 0; ENTRY; - obd = class_conn2obd(conn); - if (!obd) { - CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr); - RETURN(-EINVAL); - } - memset(res, 0, sizeof(*res) * niocount); + if (oa->o_valid & OBD_MD_FLGROUP) + group = oa->o_gr; - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - - if (cmd & OBD_BRW_WRITE) { - *desc_private = filter_journal_start(&journal_save, - &obd->u.filter, - objcount, obj, niocount, - nb); - if (IS_ERR(*desc_private)) - GOTO(out_ctxt, rc = PTR_ERR(*desc_private)); - } + obd = exp->exp_obd; + filter = &obd->u.filter; - obd_kmap_get(niocount, 1); + push_ctxt(&saved, &obd->obd_ctxt, NULL); - for (i = 0; i < objcount; i++, o++) { - struct dentry *dentry; - struct inode *inode; - int j; + acquire_locks: + dparent = filter_parent_lock(obd, group, oa->o_id); + if (IS_ERR(dparent)) + GOTO(cleanup, rc = PTR_ERR(dparent)); + cleanup_phase = 1; - dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG), - o->ioo_id, S_IFREG); - if (IS_ERR(dentry)) - GOTO(out_clean, rc = PTR_ERR(dentry)); - inode = dentry->d_inode; - if (!inode) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - o->ioo_id); - f_dput(dentry); - GOTO(out_clean, rc = -ENOENT); - } + dchild = filter_fid2dentry(obd, dparent, group, oa->o_id); + if (IS_ERR(dchild)) + GOTO(cleanup, rc = -ENOENT); + cleanup_phase = 2; - for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) { - struct page *page; + if (dchild->d_inode == NULL) { + CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n", + oa->o_id); + GOTO(cleanup, rc = -ENOENT); + } - if (j == 0) - lnb->dentry = dentry; - else - lnb->dentry = dget(dentry); + if (!have_prepared) { + /* If we're really going to destroy the object, get ready + * by getting the clients to discard their cached data. + * + * We have to drop the parent lock, because + * filter_prepare_destroy will acquire a PW on the object, and + * we don't want to deadlock with an incoming write to the + * object, which has the extent PW and then wants to get the + * parent dentry to do the lookup. + * + * We dput the child because it's not worth the extra + * complication of condition the above code to skip it on the + * second time through. */ + f_dput(dchild); + filter_parent_unlock(dparent); + + filter_prepare_destroy(obd, oa->o_id); + have_prepared = 1; + goto acquire_locks; + } - if (cmd & OBD_BRW_WRITE) - page = filter_get_page_write(inode, rnb, lnb, - &pglocked); - else - page = lustre_get_page_read(inode, rnb); + handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK_LOG, oti); + if (IS_ERR(handle)) + GOTO(cleanup, rc = PTR_ERR(handle)); + cleanup_phase = 3; + + /* Our MDC connection is established by the MDS to us */ + if (oa->o_valid & OBD_MD_FLCOOKIE) { + OBD_ALLOC(fcc, sizeof(*fcc)); + if (fcc != NULL) + memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc)); + } - if (IS_ERR(page)) { - f_dput(dentry); - GOTO(out_clean, rc = PTR_ERR(page)); - } + rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild); - lnb->addr = page_address(page); - lnb->offset = rnb->offset; - lnb->page = page; - lnb->len = rnb->len; +cleanup: + switch(cleanup_phase) { + case 3: + if (fcc != NULL) { + if (oti != NULL) + fsfilt_add_journal_cb(obd, 0, oti->oti_handle, + filter_cancel_cookies_cb, + fcc); + else + fsfilt_add_journal_cb(obd, 0, handle, + filter_cancel_cookies_cb, + fcc); } + rc = filter_finish_transno(exp, oti, rc); + rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0); + if (rc2) { + CERROR("error on commit, err = %d\n", rc2); + if (!rc) + rc = rc2; + } + case 2: + f_dput(dchild); + case 1: + filter_parent_unlock(dparent); + case 0: + pop_ctxt(&saved, &obd->obd_ctxt, NULL); + break; + default: + CERROR("invalid cleanup_phase %d\n", cleanup_phase); + LBUG(); } -out_stop: - if (cmd & OBD_BRW_WRITE) { - int err = filter_journal_stop(journal_save, &obd->u.filter, - *desc_private); - if (!rc) - rc = err; - } -out_ctxt: - pop_ctxt(&saved); RETURN(rc); -out_clean: - while (lnb-- > res) { - CERROR("error cleanup on brw\n"); - f_dput(lnb->dentry); - if (cmd & OBD_BRW_WRITE) - filter_commit_write(lnb->page, 0, PAGE_SIZE, rc); - else - lustre_put_page(lnb->page); - } - obd_kmap_put(niocount); - goto out_stop; } -static int filter_write_locked_page(struct niobuf_local *lnb) +/* NB start and end are used for punch, but not truncate */ +static int filter_truncate(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *lsm, + obd_off start, obd_off end, + struct obd_trans_info *oti) { - struct page *lpage; - int rc; - - lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index); - if (IS_ERR(lpage)) { - /* It is highly unlikely that we would ever get an error here. - * The page we want to get was previously locked, so it had to - * have already allocated the space, and we were just writing - * over the same data, so there would be no hole in the file. - * - * XXX: possibility of a race with truncate could exist, need - * to check that. There are no guarantees w.r.t. - * write order even on a local filesystem, although the - * normal response would be to return the number of bytes - * successfully written and leave the rest to the app. - */ - rc = PTR_ERR(lpage); - CERROR("error getting locked page index %ld: rc = %d\n", - lnb->page->index, rc); - GOTO(out, rc); - } + int error; + ENTRY; - /* lpage is kmapped in lustre_get_page_write() above and kunmapped in - * lustre_commit_write() below, lnb->page was kmapped previously in - * filter_get_page_write() and kunmapped in lustre_put_page() below. - */ - memcpy(page_address(lpage), page_address(lnb->page), PAGE_SIZE); - rc = lustre_commit_write(lpage, 0, PAGE_SIZE); - if (rc) - CERROR("error committing locked page %ld: rc = %d\n", - lnb->page->index, rc); -out: - lustre_put_page(lnb->page); + if (end != OBD_OBJECT_EOF) + CERROR("PUNCH not supported, only truncate: end = "LPX64"\n", + end); - return rc; + CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, " + "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start); + oa->o_size = start; + error = filter_setattr(exp, oa, NULL, oti); + RETURN(error); } -static int filter_commitrw(int cmd, struct lustre_handle *conn, - int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *res, - void *private) +static int filter_sync(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *lsm, obd_off start, obd_off end) { struct obd_run_ctxt saved; - struct obd_ioobj *o; - struct niobuf_local *r; - struct obd_device *obd = class_conn2obd(conn); - void *journal_save; - int found_locked = 0; - int rc = 0; - int i; + struct filter_obd *filter; + struct dentry *dentry; + struct llog_ctxt *ctxt; + int rc, rc2; ENTRY; - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - lock_kernel(); - journal_save = current->journal_info; - LASSERT(!journal_save); + filter = &exp->exp_obd->u.filter; - current->journal_info = private; - unlock_kernel(); - for (i = 0, o = obj, r = res; i < objcount; i++, o++) { - int j; - for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) { - struct page *page = r->page; + /* an objid of zero is taken to mean "sync whole filesystem" */ + if (!oa || !(oa->o_valid & OBD_MD_FLID)) { + rc = fsfilt_sync(exp->exp_obd, filter->fo_sb); + /* flush any remaining cancel messages out to the target */ + ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT); + llog_sync(ctxt, exp); + RETURN(rc); + } - if (!page) - LBUG(); + dentry = filter_oa2dentry(exp->exp_obd, oa); + if (IS_ERR(dentry)) + RETURN(PTR_ERR(dentry)); - if (r->flags & N_LOCAL_TEMP_PAGE) { - found_locked++; - continue; - } + push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - if (cmd & OBD_BRW_WRITE) { - int err = filter_commit_write(page, 0, - r->len, 0); + down(&dentry->d_inode->i_sem); + rc = filemap_fdatasync(dentry->d_inode->i_mapping); + if (rc == 0) { + /* just any file to grab fsync method - "file" arg unused */ + struct file *file = filter->fo_rcvd_filp; - if (!rc) - rc = err; - } else - lustre_put_page(page); + if (file->f_op && file->f_op->fsync) + rc = file->f_op->fsync(NULL, dentry, 1); - obd_kmap_put(1); - f_dput(r->dentry); - } + rc2 = filemap_fdatawait(dentry->d_inode->i_mapping); + if (!rc) + rc = rc2; } - lock_kernel(); - current->journal_info = journal_save; - unlock_kernel(); + up(&dentry->d_inode->i_sem); - if (!found_locked) - goto out_ctxt; + oa->o_valid = OBD_MD_FLID; + obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS); - for (i = 0, o = obj, r = res; i < objcount; i++, o++) { - int j; - for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) { - int err; - if (!(r->flags & N_LOCAL_TEMP_PAGE)) - continue; + pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - err = filter_write_locked_page(r); - obd_kmap_put(1); - if (!rc) - rc = err; - f_dput(r->dentry); - } - } - -out_ctxt: - pop_ctxt(&saved); + f_dput(dentry); RETURN(rc); } -static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs) +static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, + unsigned long max_age) { - struct obd_device *obd = class_conn2obd(conn); - struct statfs sfs; + struct filter_obd *filter = &obd->u.filter; + int blockbits = filter->fo_sb->s_blocksize_bits; int rc; - ENTRY; - rc = vfs_statfs(obd->u.filter.fo_sb, &sfs); - if (!rc) - statfs_pack(osfs, &sfs); - return rc; + /* at least try to account for cached pages. its still racey and + * might be under-reporting if clients haven't announced their + * caches with brw recently */ + spin_lock(&obd->obd_osfs_lock); + rc = fsfilt_statfs(obd, filter->fo_sb, max_age); + memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); + spin_unlock(&obd->obd_osfs_lock); + + CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64 + " pending "LPU64" free "LPU64" avail "LPU64"\n", + filter->fo_tot_dirty, filter->fo_tot_granted, + filter->fo_tot_pending, + osfs->os_bfree << blockbits, osfs->os_bavail << blockbits); + + filter_grant_sanity_check(obd, __FUNCTION__); + + osfs->os_bavail -= min(osfs->os_bavail, + (filter->fo_tot_dirty + filter->fo_tot_pending + + osfs->os_bsize -1) >> blockbits); + + RETURN(rc); } -static int filter_get_info(struct lustre_handle *conn, obd_count keylen, - void *key, obd_count *vallen, void **val) +static int filter_get_info(struct obd_export *exp, __u32 keylen, + void *key, __u32 *vallen, void *val) { struct obd_device *obd; ENTRY; - obd = class_conn2obd(conn); - if (!obd) { - CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr); + obd = class_exp2obd(exp); + if (obd == NULL) { + CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n", + exp->exp_handle.h_cookie); RETURN(-EINVAL); } - if ( keylen == strlen("blocksize") && - memcmp(key, "blocksize", keylen) == 0 ) { - *vallen = sizeof(long); - *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize; + if (keylen == strlen("blocksize") && + memcmp(key, "blocksize", keylen) == 0) { + __u32 *blocksize = val; + *vallen = sizeof(*blocksize); + *blocksize = obd->u.filter.fo_sb->s_blocksize; RETURN(0); } - if ( keylen == strlen("blocksize_bits") && - memcmp(key, "blocksize_bits", keylen) == 0 ){ - *vallen = sizeof(long); - *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize_bits; + if (keylen == strlen("blocksize_bits") && + memcmp(key, "blocksize_bits", keylen) == 0) { + __u32 *blocksize_bits = val; + *vallen = sizeof(*blocksize_bits); + *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits; RETURN(0); } - if ( keylen == strlen("root_ino") && - memcmp(key, "root_ino", keylen) == 0 ){ - *vallen = sizeof(obd_id); - *val = (void *)(obd_id)FILTER_ROOTINO; + if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) { + obd_id *last_id = val; + /* FIXME: object groups */ + *last_id = filter_last_id(&obd->u.filter, 0); RETURN(0); } - CDEBUG(D_IOCTL, "invalid key\n"); RETURN(-EINVAL); } -int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst, - struct lustre_handle *src_conn, struct obdo *src, - obd_size count, obd_off offset) +static int filter_set_info(struct obd_export *exp, __u32 keylen, + void *key, __u32 vallen, void *val) { - struct page *page; - struct lov_stripe_md srcmd, dstmd; - unsigned long index = 0; - int err = 0; + struct obd_device *obd; + struct lustre_handle conn; + struct llog_ctxt *ctxt; + int rc = 0; + ENTRY; - memset(&srcmd, 0, sizeof(srcmd)); - memset(&dstmd, 0, sizeof(dstmd)); - srcmd.lsm_object_id = src->o_id; - dstmd.lsm_object_id = dst->o_id; + conn.cookie = exp->exp_handle.h_cookie; - ENTRY; - CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64 - ", dst: ino "LPU64"\n", - src->o_id, src->o_blocks, src->o_size, dst->o_id); - page = alloc_page(GFP_USER); - if (page == NULL) - RETURN(-ENOMEM); + obd = exp->exp_obd; + if (obd == NULL) { + CDEBUG(D_IOCTL, "invalid exp %p cookie "LPX64"\n", + exp, conn.cookie); + RETURN(-EINVAL); + } -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - while (TryLockPage(page)) - ___wait_on_page(page); -#else - wait_on_page_locked(page); -#endif + if (keylen < strlen("mds_conn") || + memcmp(key, "mds_conn", keylen) != 0) + RETURN(-EINVAL); - /* XXX with brw vector I/O, we could batch up reads and writes here, - * all we need to do is allocate multiple pages to handle the I/Os - * and arrays to handle the request parameters. - */ - while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) { - struct brw_page pg; - struct brw_cb_data *brw_cbd = ll_init_brw_cb_data(); + CWARN("Received MDS connection ("LPX64")\n", conn.cookie); + memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn)); + ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT); + rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse); + RETURN(rc); +} - if (!brw_cbd) { - err = -ENOMEM; - EXIT; - break; - } +int filter_iocontrol(unsigned int cmd, struct obd_export *exp, + int len, void *karg, void *uarg) +{ + struct obd_device *obd = exp->exp_obd; + struct obd_ioctl_data *data = karg; + int rc = 0; - pg.pg = page; - pg.count = PAGE_SIZE; - pg.off = (page->index) << PAGE_SHIFT; - pg.flag = 0; + switch (cmd) { + case OBD_IOC_ABORT_RECOVERY: + CERROR("aborting recovery for device %s\n", obd->obd_name); + target_abort_recovery(obd); + RETURN(0); - page->index = index; - err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, - ll_sync_brw_cb, brw_cbd); + case OBD_IOC_SET_READONLY: { + void *handle; + struct super_block *sb = obd->u.filter.fo_sb; + struct inode *inode = sb->s_root->d_inode; + BDEVNAME_DECLARE_STORAGE(tmp); + CERROR("setting device %s read-only\n", + ll_bdevname(sb, tmp)); - if ( err ) { - EXIT; - break; - } + handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL); + LASSERT(handle); + (void)fsfilt_commit(obd, inode, handle, 1); - brw_cbd = ll_init_brw_cb_data(); - if (!brw_cbd) { - err = -ENOMEM; - EXIT; - break; - } - pg.flag = OBD_BRW_CREATE; - CDEBUG(D_INFO, "Read page %ld ...\n", page->index); + dev_set_rdonly(ll_sbdev(obd->u.filter.fo_sb), 2); + RETURN(0); + } + + case OBD_IOC_CATLOGLIST: { + rc = llog_catlog_list(obd, 1, data); + RETURN(rc); + } - err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, - ll_sync_brw_cb, brw_cbd); + case OBD_IOC_LLOG_CANCEL: + case OBD_IOC_LLOG_REMOVE: + case OBD_IOC_LLOG_INFO: + case OBD_IOC_LLOG_PRINT: { + /* FIXME to be finished */ + RETURN(-EOPNOTSUPP); +/* + struct llog_ctxt *ctxt = NULL; - /* XXX should handle dst->o_size, dst->o_blocks here */ - if ( err ) { - EXIT; - break; - } + push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); + rc = llog_ioctl(ctxt, cmd, data); + pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); + + RETURN(rc); +*/ + } - CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index); - index++; + default: + RETURN(-EINVAL); } - dst->o_size = src->o_size; - dst->o_blocks = src->o_blocks; - dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - unlock_page(page); - __free_page(page); + RETURN(0); +} + +static struct llog_operations filter_unlink_repl_logops; +static struct llog_operations filter_size_orig_logops = { + lop_setup: llog_obd_origin_setup, + lop_cleanup: llog_obd_origin_cleanup, + lop_add: llog_obd_origin_add +}; + +static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt, + int count, struct llog_catid *logid) +{ + struct llog_ctxt *ctxt; + int rc; + ENTRY; + + filter_unlink_repl_logops = llog_client_ops; + filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel; + filter_unlink_repl_logops.lop_connect = llog_repl_connect; + filter_unlink_repl_logops.lop_sync = llog_obd_repl_sync; - RETURN(err); + rc = llog_setup(obd, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL, + &filter_unlink_repl_logops); + if (rc) + RETURN(rc); + /* FIXME - assign unlink_cb for filter's recovery */ + ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT); + ctxt->llog_proc_cb = filter_recov_log_unlink_cb; + + rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL, + &filter_size_orig_logops); + RETURN(rc); } -int filter_attach(struct obd_device *dev, - obd_count len, void *data) + +static int filter_llog_finish(struct obd_device *obd, int count) { - return lprocfs_reg_obd(dev, status_var_nm_1, dev); + int rc; + ENTRY; + + rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_REPL_CTXT)); + if (rc) + RETURN(rc); + + rc = llog_cleanup(llog_get_context(obd, LLOG_SIZE_ORIG_CTXT)); + RETURN(rc); } -int filter_detach(struct obd_device *dev) +static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr, + void *data) { - return lprocfs_dereg_obd(dev); + return filter_fid2dentry(data, NULL, gr, id); } + +static struct lvfs_callback_ops filter_lvfs_ops = { + l_fid2dentry: filter_lvfs_fid2dentry, +}; + static struct obd_ops filter_obd_ops = { - o_attach: filter_attach, - o_detach: filter_detach, - o_get_info: filter_get_info, - o_setup: filter_setup, - o_cleanup: filter_cleanup, - o_connect: filter_connect, - o_disconnect: filter_disconnect, - o_statfs: filter_statfs, - o_getattr: filter_getattr, - o_create: filter_create, - o_setattr: filter_setattr, - o_destroy: filter_destroy, - o_open: filter_open, - o_close: filter_close, - o_brw: filter_pgcache_brw, - o_punch: filter_truncate, - o_preprw: filter_preprw, - o_commitrw: filter_commitrw -#if 0 - o_preallocate: filter_preallocate_inodes, - o_migrate: filter_migrate, - o_copy: filter_copy_data, - o_iterate: filter_iterate -#endif + o_owner: THIS_MODULE, + o_attach: filter_attach, + o_detach: filter_detach, + o_get_info: filter_get_info, + o_set_info: filter_set_info, + o_setup: filter_setup, + o_precleanup: filter_precleanup, + o_cleanup: filter_cleanup, + o_connect: filter_connect, + o_disconnect: filter_disconnect, + o_statfs: filter_statfs, + o_getattr: filter_getattr, + o_unpackmd: filter_unpackmd, + o_create: filter_create, + o_setattr: filter_setattr, + o_destroy: filter_destroy, + o_brw: filter_brw, + o_punch: filter_truncate, + o_sync: filter_sync, + o_preprw: filter_preprw, + o_commitrw: filter_commitrw, + o_destroy_export: filter_destroy_export, + o_llog_init: filter_llog_init, + o_llog_finish: filter_llog_finish, + o_iocontrol: filter_iocontrol, }; +static struct obd_ops filter_sanobd_ops = { + o_owner: THIS_MODULE, + o_attach: filter_attach, + o_detach: filter_detach, + o_get_info: filter_get_info, + o_set_info: filter_set_info, + o_setup: filter_san_setup, + o_precleanup: filter_precleanup, + o_cleanup: filter_cleanup, + o_connect: filter_connect, + o_disconnect: filter_disconnect, + o_statfs: filter_statfs, + o_getattr: filter_getattr, + o_unpackmd: filter_unpackmd, + o_create: filter_create, + o_setattr: filter_setattr, + o_destroy: filter_destroy, + o_brw: filter_brw, + o_punch: filter_truncate, + o_sync: filter_sync, + o_preprw: filter_preprw, + o_commitrw: filter_commitrw, + o_san_preprw: filter_san_preprw, + o_destroy_export: filter_destroy_export, + o_llog_init: filter_llog_init, + o_llog_finish: filter_llog_finish, + o_iocontrol: filter_iocontrol, +}; static int __init obdfilter_init(void) { - printk(KERN_INFO "Filtering OBD driver v0.001, info@clusterfs.com\n"); - filter_open_cache = kmem_cache_create("ll_filter_fdata", - sizeof(struct filter_file_data), - 0, 0, NULL, NULL); - if (!filter_open_cache) - RETURN(-ENOMEM); + struct lprocfs_static_vars lvars; + int rc; - filter_dentry_cache = kmem_cache_create("ll_filter_dentry", - sizeof(struct filter_dentry_data), - 0, 0, NULL, NULL); - if (!filter_dentry_cache) { - kmem_cache_destroy(filter_open_cache); - RETURN(-ENOMEM); - } + printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n"); + + lprocfs_init_vars(filter, &lvars); - return class_register_type(&filter_obd_ops, status_class_var, - OBD_FILTER_DEVICENAME); + rc = class_register_type(&filter_obd_ops, lvars.module_vars, + OBD_FILTER_DEVICENAME); + if (rc) + return rc; + + rc = class_register_type(&filter_sanobd_ops, lvars.module_vars, + OBD_FILTER_SAN_DEVICENAME); + if (rc) + class_unregister_type(OBD_FILTER_DEVICENAME); + return rc; } static void __exit obdfilter_exit(void) { + class_unregister_type(OBD_FILTER_SAN_DEVICENAME); class_unregister_type(OBD_FILTER_DEVICENAME); - if (kmem_cache_destroy(filter_dentry_cache)) - CERROR("couldn't free obdfilter dentry cache\n"); - if (kmem_cache_destroy(filter_open_cache)) - CERROR("couldn't free obdfilter open cache\n"); } MODULE_AUTHOR("Cluster File Systems, Inc. "); -MODULE_DESCRIPTION("Lustre Filtering OBD driver v1.0"); +MODULE_DESCRIPTION("Lustre Filtering OBD driver"); MODULE_LICENSE("GPL"); module_init(obdfilter_init);