* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
+/*
+ * Invariant: get O/R i_sem for lookup, if needed, before any journal ops
+ * (which need to get journal_lock, may block if journal full).
+ */
+
#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_FILTER
+#include <linux/config.h>
#include <linux/module.h>
-#include <linux/pagemap.h>
+#include <linux/pagemap.h> // XXX kill me soon
#include <linux/fs.h>
#include <linux/dcache.h>
#include <linux/obd_class.h>
#include <linux/lustre_dlm.h>
#include <linux/obd_filter.h>
-#include <linux/ext3_jbd.h>
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-#include <linux/extN_jbd.h>
-#endif
-#include <linux/quotaops.h>
#include <linux/init.h>
#include <linux/random.h>
-#include <linux/stringify.h>
+#include <linux/lustre_fsfilt.h>
#include <linux/lprocfs_status.h>
extern struct lprocfs_vars status_class_var[];
static kmem_cache_t *filter_open_cache;
static kmem_cache_t *filter_dentry_cache;
-#define FILTER_ROOTINO 2
-#define FILTER_ROOTINO_STR __stringify(FILTER_ROOTINO)
-
#define S_SHIFT 12
static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
[0] NULL,
.d_release = filter_drelease,
};
+#define LAST_RCVD "last_rcvd"
+
/* setup the object store with correct subdirectories */
static int filter_prep(struct obd_device *obd)
{
struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
struct dentry *dentry;
- struct dentry *root;
struct file *file;
struct inode *inode;
int rc = 0;
GOTO(out, rc);
}
filter->fo_dentry_O = dentry;
- dentry = simple_mkdir(current->fs->pwd, "P", 0700);
- CDEBUG(D_INODE, "got/created P: %p\n", dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("cannot open/create P: rc = %d\n", rc);
- GOTO(out_O, rc);
- }
- f_dput(dentry);
- dentry = simple_mkdir(current->fs->pwd, "D", 0700);
- CDEBUG(D_INODE, "got/created D: %p\n", dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("cannot open/create D: rc = %d\n", rc);
- GOTO(out_O, rc);
- }
-
- root = simple_mknod(dentry, FILTER_ROOTINO_STR, S_IFREG | 0755);
- f_dput(dentry);
- if (IS_ERR(root)) {
- rc = PTR_ERR(root);
- CERROR("OBD filter: cannot open/create root %d: rc = %d\n",
- FILTER_ROOTINO, rc);
- GOTO(out_O, rc);
- }
- f_dput(root);
/*
* Create directories and/or get dentries for each object type.
* This saves us from having to do multiple lookups for each one.
*/
for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
- char *type = obd_type_by_mode[mode];
+ char *name = obd_type_by_mode[mode];
- if (!type) {
+ if (!name) {
filter->fo_dentry_O_mode[mode] = NULL;
continue;
}
- dentry = simple_mkdir(filter->fo_dentry_O, type, 0700);
- CDEBUG(D_INODE, "got/created O/%s: %p\n", type, dentry);
+ dentry = simple_mkdir(filter->fo_dentry_O, name, 0700);
+ CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
- CERROR("cannot create O/%s: rc = %d\n", type, rc);
+ CERROR("cannot create O/%s: rc = %d\n", name, rc);
GOTO(out_O_mode, rc);
}
filter->fo_dentry_O_mode[mode] = dentry;
}
- file = filp_open("D/status", O_RDWR | O_CREAT, 0700);
+ file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
if ( !file || IS_ERR(file) ) {
rc = PTR_ERR(file);
- CERROR("OBD filter: cannot open/create status %s: rc = %d\n",
- "D/status", rc);
+ CERROR("OBD filter: cannot open/create %s: rc = %d\n",
+ LAST_RCVD, rc);
GOTO(out_O_mode, rc);
}
return(rc);
-out_O_mode:
+ out_O_mode:
while (mode-- > 0) {
struct dentry *dentry = filter->fo_dentry_O_mode[mode];
if (dentry) {
filter->fo_dentry_O_mode[mode] = NULL;
}
}
-out_O:
f_dput(filter->fo_dentry_O);
filter->fo_dentry_O = NULL;
goto out;
int mode;
push_ctxt(&saved, &filter->fo_ctxt, NULL);
- file = filp_open("D/status", O_RDWR | O_CREAT, 0700);
+ file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
if (IS_ERR(file)) {
- CERROR("OBD filter: cannot create status file\n");
+ CERROR("OBD filter: cannot create %s\n", LAST_RCVD);
goto out;
}
id = ++obd->u.filter.fo_lastobjid;
spin_unlock(&obd->u.filter.fo_objidlock);
- /* FIXME: write the lastobjid to disk here */
return id;
}
/* parent i_sem is already held if needed for exclusivity */
static struct dentry *filter_fid2dentry(struct obd_device *obd,
struct dentry *dparent,
- __u64 id, __u32 type, int locked)
+ __u64 id, int locked)
{
struct super_block *sb = obd->u.filter.fo_sb;
struct dentry *dchild;
}
if (id == 0) {
- CERROR("fatal: invalid object #0\n");
+ CERROR("fatal: invalid object id 0\n");
LBUG();
RETURN(ERR_PTR(-ESTALE));
}
- if (!(type & S_IFMT)) {
- CERROR("OBD %s, object "LPU64" has bad type: %o\n",
- __FUNCTION__, id, type);
- RETURN(ERR_PTR(-EINVAL));
- }
-
len = sprintf(name, LPU64, id);
- CDEBUG(D_INODE, "opening object O/%s/%s\n", obd_mode_to_type(type),
- name);
+ CDEBUG(D_INODE, "opening object O/%*s/%s\n",
+ dparent->d_name.len, dparent->d_name.name, name);
if (!locked)
down(&dparent->d_inode->i_sem);
dchild = lookup_one_len(name, dparent, len);
RETURN(dchild);
}
- CDEBUG(D_INODE, "got child obj O/%s/%s: %p, count = %d\n",
- obd_mode_to_type(type), name, dchild,
+ CDEBUG(D_INODE, "got child obj O/%*s/%s: %p, count = %d\n",
+ dparent->d_name.len, dparent->d_name.name, name, dchild,
atomic_read(&dchild->d_count));
LASSERT(atomic_read(&dchild->d_count) > 0);
{
struct filter_obd *filter = &obd->u.filter;
+ LASSERT((mode & S_IFMT) == S_IFREG); /* only regular files for now */
return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
}
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
- /* XXX unlink from PENDING directory now too */
pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
if (rc)
struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
down(&dir_dentry->d_inode->i_sem);
+ /* XXX start transaction */
+ /* XXX unlink from PENDING directory now too */
rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
+ /* XXX finish transaction */
if (rc2 && !rc)
rc = rc2;
up(&dir_dentry->d_inode->i_sem);
}
/* obd methods */
-static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
- obd_uuid_t cluuid, struct recovd_obd *recovd,
- ptlrpc_recovery_cb_t recover)
-{
- struct obd_export *exp;
- int rc;
-
- ENTRY;
- MOD_INC_USE_COUNT;
- rc = class_connect(conn, obd, cluuid);
- if (rc)
- GOTO(out_dec, rc);
- exp = class_conn2export(conn);
- LASSERT(exp);
-
- INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
- spin_lock_init(&exp->exp_filter_data.fed_lock);
-out:
- RETURN(rc);
-
-out_dec:
- MOD_DEC_USE_COUNT;
- goto out;
-}
-
-static int filter_disconnect(struct lustre_handle *conn)
-{
- struct obd_export *exp = class_conn2export(conn);
- struct filter_export_data *fed;
- int rc;
- ENTRY;
-
- LASSERT(exp);
- fed = &exp->exp_filter_data;
- spin_lock(&fed->fed_lock);
- while (!list_empty(&fed->fed_open_head)) {
- struct filter_file_data *ffd;
-
- ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
- ffd_export_list);
- list_del(&ffd->ffd_export_list);
- spin_unlock(&fed->fed_lock);
-
- CERROR("force closing file %*s on disconnect\n",
- ffd->ffd_file->f_dentry->d_name.len,
- ffd->ffd_file->f_dentry->d_name.name);
-
- filter_close_internal(exp->exp_obd, ffd);
- spin_lock(&fed->fed_lock);
- }
- spin_unlock(&fed->fed_lock);
-
- ldlm_cancel_locks_for_export(exp);
- rc = class_disconnect(conn);
- if (!rc)
- MOD_DEC_USE_COUNT;
-
- /* XXX cleanup preallocated inodes */
- RETURN(rc);
-}
-
/* mount the file system (secretly) */
static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
{
struct obd_ioctl_data* data = buf;
struct filter_obd *filter;
struct vfsmount *mnt;
- int err = 0;
+ int rc = 0;
ENTRY;
+ MOD_INC_USE_COUNT;
if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
- RETURN(-EINVAL);
+ GOTO(err_dec, rc = -EINVAL);
+
+ obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
+ if (IS_ERR(obd->obd_fsops))
+ GOTO(err_dec, rc = PTR_ERR(obd->obd_fsops));
- MOD_INC_USE_COUNT;
mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
- err = PTR_ERR(mnt);
+ rc = PTR_ERR(mnt);
if (IS_ERR(mnt))
- GOTO(err_dec, err);
+ GOTO(err_ops, rc);
filter = &obd->u.filter;;
filter->fo_vfsmnt = mnt;
filter->fo_fstype = strdup(data->ioc_inlbuf2);
filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
- CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, filter->fo_vfsmnt);
- /* XXX is this even possible if do_kern_mount succeeded? */
- if (!filter->fo_sb)
- GOTO(err_kfree, err = -ENODEV);
+ CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
filter->fo_ctxt.pwdmnt = mnt;
filter->fo_ctxt.pwd = mnt->mnt_root;
filter->fo_ctxt.fs = get_ds();
- err = filter_prep(obd);
- if (err)
- GOTO(err_kfree, err);
+ rc = filter_prep(obd);
+ if (rc)
+ GOTO(err_kfree, rc);
+
spin_lock_init(&filter->fo_fddlock);
spin_lock_init(&filter->fo_objidlock);
INIT_LIST_HEAD(&filter->fo_export_list);
obd->obd_namespace =
ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
- if (obd->obd_namespace == NULL)
- LBUG();
+ if (!obd->obd_namespace)
+ GOTO(err_post, rc = -ENOMEM);
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
RETURN(0);
+err_post:
+ filter_post(obd);
err_kfree:
kfree(filter->fo_fstype);
unlock_kernel();
mntput(filter->fo_vfsmnt);
filter->fo_sb = 0;
lock_kernel();
-
+err_ops:
+ fsfilt_put_ops(obd->obd_fsops);
err_dec:
MOD_DEC_USE_COUNT;
- return err;
+ return rc;
}
mntput(obd->u.filter.fo_vfsmnt);
obd->u.filter.fo_sb = 0;
kfree(obd->u.filter.fo_fstype);
+ fsfilt_put_ops(obd->obd_fsops);
lock_kernel();
RETURN(0);
}
+int filter_attach(struct obd_device *dev, obd_count len, void *data)
+{
+ return lprocfs_reg_obd(dev, status_var_nm_1, dev);
+}
+
+int filter_detach(struct obd_device *dev)
+{
+ return lprocfs_dereg_obd(dev);
+}
+
+static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
+ obd_uuid_t cluuid, struct recovd_obd *recovd,
+ ptlrpc_recovery_cb_t recover)
+{
+ struct obd_export *exp;
+ int rc;
+
+ ENTRY;
+ MOD_INC_USE_COUNT;
+ rc = class_connect(conn, obd, cluuid);
+ if (rc)
+ GOTO(out_dec, rc);
+ exp = class_conn2export(conn);
+ LASSERT(exp);
+
+ INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
+ spin_lock_init(&exp->exp_filter_data.fed_lock);
+out:
+ RETURN(rc);
+
+out_dec:
+ MOD_DEC_USE_COUNT;
+ goto out;
+}
+
+static int filter_disconnect(struct lustre_handle *conn)
+{
+ struct obd_export *exp = class_conn2export(conn);
+ struct filter_export_data *fed;
+ int rc;
+ ENTRY;
+
+ LASSERT(exp);
+ fed = &exp->exp_filter_data;
+ spin_lock(&fed->fed_lock);
+ while (!list_empty(&fed->fed_open_head)) {
+ struct filter_file_data *ffd;
+
+ ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
+ ffd_export_list);
+ list_del(&ffd->ffd_export_list);
+ spin_unlock(&fed->fed_lock);
+
+ CERROR("force closing file %*s on disconnect\n",
+ ffd->ffd_file->f_dentry->d_name.len,
+ ffd->ffd_file->f_dentry->d_name.name);
+
+ filter_close_internal(exp->exp_obd, ffd);
+ spin_lock(&fed->fed_lock);
+ }
+ spin_unlock(&fed->fed_lock);
+
+ ldlm_cancel_locks_for_export(exp);
+ rc = class_disconnect(conn);
+ if (!rc)
+ MOD_DEC_USE_COUNT;
+
+ /* XXX cleanup preallocated inodes */
+ RETURN(rc);
+}
static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
{
RETURN(ERR_PTR(-EINVAL));
}
dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
- oa->o_id, oa->o_mode, locked);
+ oa->o_id, locked);
}
if (IS_ERR(dentry)) {
RETURN(rc);
}
+/* this is called from filter_truncate() until we have filter_punch() */
static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *md)
{
iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
inode = dentry->d_inode;
+ push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
lock_kernel();
if (iattr.ia_valid & ATTR_SIZE)
down(&inode->i_sem);
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+
+ /* XXX start transaction */
if (inode->i_op->setattr)
rc = inode->i_op->setattr(dentry, &iattr);
else
rc = inode_setattr(inode, &iattr);
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ /* XXX update last_rcvd, finish transaction */
+
if (iattr.ia_valid & ATTR_SIZE) {
up(&inode->i_sem);
oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
obdo_from_inode(oa, inode, oa->o_valid);
}
+
unlock_kernel();
+ pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
f_dput(dentry);
RETURN(rc);
struct lov_stripe_md **ea)
{
struct obd_device *obd = class_conn2obd(conn);
- char name[64];
struct obd_run_ctxt saved;
+ struct dentry *dir_dentry;
struct dentry *new;
struct iattr;
+ int rc;
ENTRY;
if (!obd) {
return -EINVAL;
}
- if (!(oa->o_mode & S_IFMT)) {
- CERROR("OBD %s, object "LPU64" has bad type: %o\n",
- __FUNCTION__, oa->o_id, oa->o_mode);
- return -ENOENT;
- }
-
oa->o_id = filter_next_id(obd);
- //filter_id(name, oa->o_id, oa->o_mode);
- sprintf(name, LPU64, oa->o_id);
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- new = simple_mknod(filter_parent(obd, oa->o_mode), name, oa->o_mode);
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- if (IS_ERR(new)) {
- CERROR("Error mknod obj %s, err %ld\n", name, PTR_ERR(new));
- return -ENOENT;
+ dir_dentry = filter_parent(obd, oa->o_mode);
+ down(&dir_dentry->d_inode->i_sem);
+ new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 1);
+ if (IS_ERR(new))
+ GOTO(out, rc = PTR_ERR(new));
+
+ if (new->d_inode) {
+ /* This would only happen if lastobjid was bad on disk */
+ CERROR("objid O/%*s/"LPU64" already exists\n",
+ dir_dentry->d_name.len, dir_dentry->d_name.name,
+ oa->o_id);
+ LBUG();
+ GOTO(out, rc = -EEXIST);
}
+ /* XXX start transaction */
+ rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
+ if (rc)
+ GOTO(out_put, rc);
+ /* XXX update last_rcvd+lastobjid on disk, finish transaction */
+
/* Set flags for fields we have set in the inode struct */
oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
filter_from_inode(oa, new->d_inode, oa->o_valid);
- f_dput(new);
- return 0;
+ EXIT;
+out_put:
+ f_dput(new);
+out:
+ up(&dir_dentry->d_inode->i_sem);
+ pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ return rc;
}
static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
GOTO(out, rc = -ENOENT);
fdd = object_dentry->d_fsdata;
+ /* XXX start transaction */
if (fdd && atomic_read(&fdd->fdd_open_count)) {
if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
fdd->fdd_flags |= FILTER_FLAG_DESTROY;
rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
out_dput:
+ /* XXX update last_rcvd on disk, finish transaction */
f_dput(object_dentry);
EXIT;
return rc;
}
-/* NB count and offset are used for punch, but not truncate */
+/* NB start and end are used for punch, but not truncate */
static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *lsm,
obd_off start, obd_off end)
RETURN(error);
}
-static int filter_pgcache_brw(int cmd, struct lustre_handle *conn,
- struct lov_stripe_md *lsm, obd_count oa_bufs,
- struct brw_page *pga, struct obd_brw_set *set)
-{
- struct obd_export *export = class_conn2export(conn);
- struct obd_run_ctxt saved;
- struct super_block *sb;
- int pnum; /* index to pages (bufs) */
- unsigned long retval;
- int error;
- struct file *file;
- int pg;
- ENTRY;
-
- if (!export) {
- CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
- RETURN(-EINVAL);
- }
-
- sb = export->exp_obd->u.filter.fo_sb;
- push_ctxt(&saved, &export->exp_obd->u.filter.fo_ctxt, NULL);
- pnum = 0; /* pnum indexes buf 0..num_pages */
-
- file = filter_obj_open(export, lsm->lsm_object_id, S_IFREG);
- if (IS_ERR(file))
- GOTO(out, retval = PTR_ERR(file));
-
- /* count doubles as retval */
- for (pg = 0; pg < oa_bufs; pg++) {
- CDEBUG(D_INODE, "OP %d inode %lu pgno: (%d) "LPU64
- ") off count ("LPU64",%d)\n",
- cmd, file->f_dentry->d_inode->i_ino, pnum,
- pga[pnum].off >> PAGE_CACHE_SHIFT, pga[pnum].off,
- (int)pga[pnum].count);
- if (cmd & OBD_BRW_WRITE) {
- loff_t off;
- char *buffer;
- off = pga[pnum].off;
- buffer = kmap(pga[pnum].pg);
- retval = file->f_op->write(file, buffer,
- pga[pnum].count,
- &off);
- kunmap(pga[pnum].pg);
- CDEBUG(D_INODE, "retval %ld\n", retval);
- } else {
- loff_t off = pga[pnum].off;
- char *buffer = kmap(pga[pnum].pg);
-
- if (off >= file->f_dentry->d_inode->i_size) {
- memset(buffer, 0, pga[pnum].count);
- retval = pga[pnum].count;
- } else {
- retval = file->f_op->read(file, buffer,
- pga[pnum].count, &off);
- }
- kunmap(pga[pnum].pg);
-
- if (retval != pga[pnum].count) {
- filp_close(file, 0);
- GOTO(out, retval = -EIO);
- }
- CDEBUG(D_INODE, "retval %ld\n", retval);
- }
- pnum++;
- }
- /* sizes and blocks are set by generic_file_write */
- /* ctimes/mtimes will follow with a setattr call */
- filp_close(file, 0);
-
- /* XXX: do something with callback if it is set? */
-
- EXIT;
-out:
- pop_ctxt(&saved, &export->exp_obd->u.filter.fo_ctxt, NULL);
- error = (retval >= 0) ? 0 : retval;
- return error;
-}
-
-/*
- * Calculate the number of buffer credits needed to write multiple pages in
- * a single ext3/extN transaction. No, this shouldn't be here, but as yet
- * ext3 doesn't have a nice API for calculating this sort of thing in advance.
- *
- * See comment above ext3_writepage_trans_blocks for details. We assume
- * no data journaling is being done, but it does allow for all of the pages
- * being non-contiguous. If we are guaranteed contiguous pages we could
- * reduce the number of (d)indirect blocks a lot.
- *
- * With N blocks per page and P pages, for each inode we have at most:
- * N*P indirect
- * min(N*P, blocksize/4 + 1) dindirect blocks
- * 1 tindirect
- *
- * For the entire filesystem, we have at most:
- * min(sum(nindir + P), ngroups) bitmap blocks (from the above)
- * min(sum(nindir + P), gdblocks) group descriptor blocks (from the above)
- * 1 inode block
- * 1 superblock
- * 2 * EXT3_SINGLEDATA_TRANS_BLOCKS for the quota files
- */
-static int ext3_credits_needed(struct super_block *sb, int objcount,
- struct obd_ioobj *obj)
-{
- struct obd_ioobj *o = obj;
- int blockpp = 1 << (PAGE_CACHE_SHIFT - sb->s_blocksize_bits);
- int addrpp = EXT3_ADDR_PER_BLOCK(sb) * blockpp;
- int nbitmaps = 0;
- int ngdblocks = 0;
- int needed = objcount + 1;
- int i;
-
- for (i = 0; i < objcount; i++, o++) {
- int nblocks = o->ioo_bufcnt * blockpp;
- int ndindirect = min(nblocks, addrpp + 1);
- int nindir = nblocks + ndindirect + 1;
-
- nbitmaps += nindir + nblocks;
- ngdblocks += nindir + nblocks;
-
- needed += nindir;
- }
-
- /* Assumes ext3 and extN have same sb_info layout at the start. */
- if (nbitmaps > EXT3_SB(sb)->s_groups_count)
- nbitmaps = EXT3_SB(sb)->s_groups_count;
- if (ngdblocks > EXT3_SB(sb)->s_gdb_count)
- ngdblocks = EXT3_SB(sb)->s_gdb_count;
-
- needed += nbitmaps + ngdblocks;
-
-#ifdef CONFIG_QUOTA
- /* We assume that there will be 1 bit set in s_dquot.flags for each
- * quota file that is active. This is at least true for now.
- */
- needed += hweight32(sb_any_quota_enabled(sb)) *
- EXT3_SINGLEDATA_TRANS_BLOCKS;
-#endif
-
- return needed;
-}
-
-/* We have to start a huge journal transaction here to hold all of the
- * metadata for the pages being written here. This is necessitated by
- * the fact that we do lots of prepare_write operations before we do
- * any of the matching commit_write operations, so even if we split
- * up to use "smaller" transactions none of them could complete until
- * all of them were opened. By having a single journal transaction,
- * we eliminate duplicate reservations for common blocks like the
- * superblock and group descriptors or bitmaps.
- *
- * We will start the transaction here, but each prepare_write will
- * add a refcount to the transaction, and each commit_write will
- * remove a refcount. The transaction will be closed when all of
- * the pages have been written.
- */
-static void *ext3_filter_journal_start(struct filter_obd *filter,
- int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *nb)
-{
- journal_t *journal = NULL;
- handle_t *handle = NULL;
- int needed;
-
- /* It appears that some kernels have different values for
- * EXT*_MAX_GROUP_LOADED (either 8 or 32), so we cannot
- * assume anything after s_inode_bitmap_number is the same.
- */
- if (!strcmp(filter->fo_fstype, "ext3"))
- journal = EXT3_SB(filter->fo_sb)->s_journal;
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- else if (!strcmp(filter->fo_fstype, "extN"))
- journal = EXTN_SB(filter->fo_sb)->s_journal;
-#endif
- needed = ext3_credits_needed(filter->fo_sb, objcount, obj);
-
- /* The number of blocks we could _possibly_ dirty can very large.
- * We reduce our request if it is absurd (and we couldn't get that
- * many credits for a single handle anyways).
- *
- * At some point we have to limit the size of I/Os sent at one time,
- * increase the size of the journal, or we have to calculate the
- * actual journal requirements more carefully by checking all of
- * the blocks instead of being maximally pessimistic. It remains to
- * be seen if this is a real problem or not.
- */
- if (needed > journal->j_max_transaction_buffers) {
- CERROR("want too many journal credits (%d) using %d instead\n",
- needed, journal->j_max_transaction_buffers);
- needed = journal->j_max_transaction_buffers;
- }
-
- lock_kernel();
- handle = journal_start(journal, needed);
- unlock_kernel();
- if (IS_ERR(handle))
- CERROR("can't get handle for %d credits: rc = %ld\n", needed,
- PTR_ERR(handle));
-
- return(handle);
-}
-
-static void *filter_journal_start(void **journal_save,
- struct filter_obd *filter,
- int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *nb)
-{
- void *handle = NULL;
-
- /* This may not be necessary - we probably never have a
- * transaction started when we enter here, so we can
- * remove the saving of the journal state entirely.
- * For now leave it in just to see if it ever happens.
- */
- *journal_save = current->journal_info;
- if (*journal_save) {
- CERROR("Already have handle %p???\n", *journal_save);
- LBUG();
- current->journal_info = NULL;
- }
-
- if (!strcmp(filter->fo_fstype, "ext3") ||
- !strcmp(filter->fo_fstype, "extN"))
- handle = ext3_filter_journal_start(filter, objcount, obj,
- niocount, nb);
- return handle;
-}
-
-static int ext3_filter_journal_stop(void *handle)
-{
- int rc;
-
- /* We got a refcount on the handle for each call to prepare_write,
- * so we can drop the "parent" handle here to avoid the need for
- * osc to call back into filterobd to close the handle. The
- * remaining references will be dropped in commit_write.
- */
- lock_kernel();
- rc = journal_stop((handle_t *)handle);
- unlock_kernel();
-
- return rc;
-}
-
-static int filter_journal_stop(void *journal_save, struct filter_obd *filter,
- void *handle)
-{
- int rc = 0;
-
- if (!strcmp(filter->fo_fstype, "ext3") ||
- !strcmp(filter->fo_fstype, "extN"))
- rc = ext3_filter_journal_stop(handle);
-
- if (rc)
- CERROR("error on journal stop: rc = %d\n", rc);
-
- current->journal_info = journal_save;
-
- return rc;
-}
-
static inline void lustre_put_page(struct page *page)
{
kunmap(page);
{
struct obd_run_ctxt saved;
struct obd_device *obd;
- struct obd_ioobj *o = obj;
+ struct obd_ioobj *o;
struct niobuf_remote *rnb = nb;
struct niobuf_local *lnb = res;
- void *journal_save = NULL;
+ struct dentry *dir_dentry;
+ struct fsfilt_objinfo *fso;
int pglocked = 0;
int rc = 0;
int i;
ENTRY;
+ memset(res, 0, niocount * sizeof(*res));
+
obd = class_conn2obd(conn);
if (!obd) {
CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
RETURN(-EINVAL);
}
- memset(res, 0, sizeof(*res) * niocount);
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ LASSERT(objcount < 16); // theoretically we support multi-obj BRW
- if (cmd & OBD_BRW_WRITE) {
- *desc_private = filter_journal_start(&journal_save,
- &obd->u.filter,
- objcount, obj, niocount,
- nb);
- if (IS_ERR(*desc_private))
- GOTO(out_ctxt, rc = PTR_ERR(*desc_private));
- }
+ OBD_ALLOC(fso, objcount * sizeof(*fso));
+ if (!fso)
+ RETURN(-ENOMEM);
- obd_kmap_get(niocount, 1);
+ push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ dir_dentry = filter_parent(obd, S_IFREG);
- for (i = 0; i < objcount; i++, o++) {
+ for (i = 0, o = obj; i < objcount; i++, o++) {
struct filter_dentry_data *fdd;
struct dentry *dentry;
- struct inode *inode;
- int j;
- dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG),
- o->ioo_id, S_IFREG, 0);
+ LASSERT(o->ioo_bufcnt);
- if (!(fdd = dentry->d_fsdata) || !atomic_read(&fdd->fdd_open_count))
- CERROR("I/O to unopened object "LPX64"\n", o->ioo_id);
+ dentry = filter_fid2dentry(obd, dir_dentry, o->ioo_id, 0);
if (IS_ERR(dentry))
- GOTO(out_clean, rc = PTR_ERR(dentry));
- inode = dentry->d_inode;
- if (!inode) {
+ GOTO(out_objinfo, rc = PTR_ERR(dentry));
+
+ fso[i].fso_dentry = dentry;
+ fso[i].fso_bufcnt = o->ioo_bufcnt;
+
+ if (!dentry->d_inode) {
CERROR("trying to BRW to non-existent file "LPU64"\n",
o->ioo_id);
- f_dput(dentry);
- GOTO(out_clean, rc = -ENOENT);
+ GOTO(out_objinfo, rc = -ENOENT);
}
+ fdd = dentry->d_fsdata;
+ if (!fdd || !atomic_read(&fdd->fdd_open_count))
+ CDEBUG(D_PAGE, "I/O to unopened object "LPX64"\n",
+ o->ioo_id);
+ }
+
+ if (cmd & OBD_BRW_WRITE) {
+ *desc_private = fsfilt_brw_start(obd, objcount, fso,
+ niocount, nb);
+ if (IS_ERR(*desc_private))
+ GOTO(out_objinfo, rc = PTR_ERR(*desc_private));
+ }
+
+ obd_kmap_get(niocount, 1);
+
+ for (i = 0, o = obj; i < objcount; i++, o++) {
+ struct dentry *dentry = fso->fso_dentry;
+ struct inode *inode = dentry->d_inode;
+ int j;
+
for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
struct page *page;
page = lustre_get_page_read(inode, rnb);
if (IS_ERR(page)) {
- f_dput(dentry);
- GOTO(out_clean, rc = PTR_ERR(page));
+ if (cmd & OBD_BRW_WRITE)
+ fsfilt_commit(obd, dir_dentry->d_inode,
+ *desc_private);
+
+ GOTO(out_pages, rc = PTR_ERR(page));
}
lnb->addr = page_address(page);
}
}
-out_stop:
if (cmd & OBD_BRW_WRITE) {
- int err = filter_journal_stop(journal_save, &obd->u.filter,
- *desc_private);
- if (!rc)
- rc = err;
+ int err = fsfilt_commit(obd, dir_dentry->d_inode,
+ *desc_private);
+ if (err)
+ GOTO(out_pages, rc = err);
}
-out_ctxt:
+
+ EXIT;
+out:
+ OBD_FREE(fso, objcount * sizeof(*fso));
+ current->journal_info = NULL;
pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- RETURN(rc);
-out_clean:
+ return rc;
+
+out_pages:
while (lnb-- > res) {
CERROR("error cleanup on brw\n");
- f_dput(lnb->dentry);
if (cmd & OBD_BRW_WRITE)
filter_commit_write(lnb->page, 0, PAGE_SIZE, rc);
else
lustre_put_page(lnb->page);
}
obd_kmap_put(niocount);
- goto out_stop;
+out_objinfo:
+ for (i = 0; i < objcount && fso[i].fso_dentry; i++)
+ f_dput(fso[i].fso_dentry);
+
+ goto out;
}
static int filter_write_locked_page(struct niobuf_local *lnb)
struct obd_ioobj *o;
struct niobuf_local *r;
struct obd_device *obd = class_conn2obd(conn);
- void *journal_save;
int found_locked = 0;
int rc = 0;
int i;
ENTRY;
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- lock_kernel();
- journal_save = current->journal_info;
- LASSERT(!journal_save);
+ LASSERT(!current->journal_info);
current->journal_info = private;
- unlock_kernel();
+
for (i = 0, o = obj, r = res; i < objcount; i++, o++) {
int j;
+
for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) {
struct page *page = r->page;
f_dput(r->dentry);
}
}
- lock_kernel();
- current->journal_info = journal_save;
- unlock_kernel();
if (!found_locked)
goto out_ctxt;
}
out_ctxt:
+ LASSERT(!current->journal_info);
+ current->journal_info = NULL;
+
pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
RETURN(rc);
}
-static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+static int filter_brw(int cmd, struct lustre_handle *conn,
+ struct lov_stripe_md *lsm, obd_count oa_bufs,
+ struct brw_page *pga, struct obd_brw_set *set)
{
- struct obd_device *obd = class_conn2obd(conn);
- struct statfs sfs;
- int rc;
+ struct obd_ioobj ioo;
+ struct niobuf_local *lnb;
+ struct niobuf_remote *rnb;
+ obd_count i;
+ void *desc_private;
+ int ret = 0;
+ ENTRY;
+
+ OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
+ OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
+
+ if ( lnb == NULL || rnb == NULL )
+ GOTO(out, ret = -ENOMEM);
+
+ for ( i = 0 ; i < oa_bufs ; i++ ) {
+ rnb[i].offset = pga[i].off;
+ rnb[i].len = pga[i].count;
+ }
+
+ ioo.ioo_id = lsm->lsm_object_id;
+ ioo.ioo_gr = 0;
+ ioo.ioo_type = S_IFREG;
+ ioo.ioo_bufcnt = oa_bufs;
+
+ ret = filter_preprw(cmd, conn, 1, &ioo, oa_bufs, rnb, lnb,
+ &desc_private);
+ if ( ret != 0 )
+ GOTO(out, ret);
+
+ for ( i = 0; i < oa_bufs ; i++ ) {
+ void *virt = kmap(pga[i].pg);
+ obd_off off = pga[i].off & ~PAGE_MASK;
+
+ if ( cmd & OBD_BRW_WRITE )
+ memcpy(lnb[i].addr + off, virt + off, pga[i].count);
+ else
+ memcpy(virt + off, lnb[i].addr + off, pga[i].count);
+
+ kunmap(virt);
+ }
+ ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private);
+
+out:
+ if ( lnb )
+ OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
+ if ( rnb )
+ OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
+ RETURN(ret);
+}
+
+static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+{
+ struct obd_device *obd;
ENTRY;
- rc = vfs_statfs(obd->u.filter.fo_sb, &sfs);
- if (!rc)
- statfs_pack(osfs, &sfs);
- return rc;
+ obd = class_conn2obd(conn);
+
+ RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
}
static int filter_get_info(struct lustre_handle *conn, obd_count keylen,
RETURN(0);
}
- if ( keylen == strlen("root_ino") &&
- memcmp(key, "root_ino", keylen) == 0 ){
- *vallen = sizeof(obd_id);
- *val = (void *)(obd_id)FILTER_ROOTINO;
- RETURN(0);
- }
-
CDEBUG(D_IOCTL, "invalid key\n");
RETURN(-EINVAL);
}
RETURN(err);
}
-int filter_attach(struct obd_device *dev, obd_count len, void *data)
-{
- return lprocfs_reg_obd(dev, status_var_nm_1, dev);
-}
-int filter_detach(struct obd_device *dev)
-{
- return lprocfs_dereg_obd(dev);
-}
static struct obd_ops filter_obd_ops = {
o_attach: filter_attach,
o_detach: filter_detach,
o_destroy: filter_destroy,
o_open: filter_open,
o_close: filter_close,
- o_brw: filter_pgcache_brw,
+ o_brw: filter_brw,
o_punch: filter_truncate,
o_preprw: filter_preprw,
o_commitrw: filter_commitrw