From: adilger Date: Sat, 26 Oct 2002 08:29:39 +0000 (+0000) Subject: Support for delayed-destroy of objects on the OST. It isn't perfect, X-Git-Tag: v1_7_100~4400 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=34e3d7751d9c9f37623b284169244658c271fcee;p=fs%2Flustre-release.git Support for delayed-destroy of objects on the OST. It isn't perfect, in the sense that there is still some work to be done for handling multi-client unlink properly, but it passes all of the normal tests, so it is no worse than what we have now. --- diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index e5af36b..75759d3 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -92,12 +92,13 @@ struct filter_obd { struct obd_run_ctxt fo_ctxt; struct dentry *fo_dentry_O; struct dentry *fo_dentry_O_mode[16]; - spinlock_t fo_lock; - __u64 fo_lastino; + spinlock_t fo_objidlock; /* protects fo_lastobjid increment */ + __u64 fo_lastobjid; struct file_operations *fo_fop; struct inode_operations *fo_iop; struct address_space_operations *fo_aops; struct list_head fo_export_list; + spinlock_t fo_fddlock; /* protects setting dentry->d_fsdata */ }; struct mds_server_data; @@ -198,7 +199,7 @@ struct lov_tgt_desc { struct lov_obd { spinlock_t lov_lock; struct obd_device *mdcobd; - struct lov_desc desc; + struct lov_desc desc; int bufsize; int refcount; struct lov_tgt_desc *tgts; diff --git a/lustre/include/linux/obd_filter.h b/lustre/include/linux/obd_filter.h index d37a198..fb3d1ff 100644 --- a/lustre/include/linux/obd_filter.h +++ b/lustre/include/linux/obd_filter.h @@ -30,19 +30,18 @@ /* In-memory access to client data from OST struct */ struct filter_export_data { struct list_head fed_open_head; /* files to close on disconnect */ - spinlock_t fed_lock; + spinlock_t fed_lock; /* protects fed_open_head */ }; /* file data for open files on OST */ struct filter_file_data { - struct list_head ffd_dentry_list; /* dentry open list */ - struct list_head ffd_export_list; /* export open list */ + struct list_head ffd_export_list; /* export open list - fed_lock */ struct file *ffd_file; /* file handle */ __u64 ffd_servercookie; /* cookie for lustre handle */ }; struct filter_dentry_data { - struct list_head fdd_open_head; /* files that have this dentry open */ + atomic_t fdd_open_count; int fdd_flags; }; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index e5041a0..9429b79 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -276,6 +276,7 @@ static int ll_file_release(struct inode *inode, struct file *file) /* If this fails and we goto out_fd, the file size on the MDS is out of * date. Is that a big deal? */ +#warning "FIXME: don't do this if the file is unlinked already" if (file->f_mode & FMODE_WRITE) { struct lustre_handle *lockhs; diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 05544f1..471c50d 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -24,8 +24,19 @@ #include #include #include +#include +#include #include +static kmem_cache_t *lov_file_cache; + +struct lov_file_handles { + struct list_head lfh_list; + __u64 lfh_cookie; + int lfh_count; + struct lustre_handle *lfh_handles; +}; + /* obd methods */ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, obd_uuid_t cluuid, struct recovd_obd *recovd, @@ -35,6 +46,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, struct lov_obd *lov = &obd->u.lov; struct client_obd *mdc = &lov->mdcobd->u.cli; struct lov_desc *desc = &lov->desc; + struct obd_export *exp; struct lustre_handle mdc_conn; obd_uuid_t *uuidarray; int rc, rc2, i; @@ -52,6 +64,9 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, if (lov->refcount > 1) RETURN(0); + exp = class_conn2export(conn); + INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head); + /* retrieve LOV metadata from MDS */ rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover); if (rc) { @@ -128,14 +143,17 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, for (i = 0; i < desc->ld_tgt_count; i++) { struct obd_device *tgt = class_uuid2obd(uuidarray[i]); + if (!tgt) { CERROR("Target %s not attached\n", uuidarray[i]); GOTO(out_disc, rc = -EINVAL); } + if (!(tgt->obd_flags & OBD_SET_UP)) { CERROR("Target %s not set up\n", uuidarray[i]); GOTO(out_disc, rc = -EINVAL); } + rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd, recover); if (rc) { @@ -177,12 +195,14 @@ static int lov_disconnect(struct lustre_handle *conn) { struct obd_device *obd = class_conn2obd(conn); struct lov_obd *lov = &obd->u.lov; + struct obd_export *exp; + struct list_head *p, *n; int rc, i; if (!lov->tgts) goto out_local; - /* Only disconnect the underlying laters on the final disconnect. */ + /* Only disconnect the underlying layers on the final disconnect. */ lov->refcount--; if (lov->refcount != 0) goto out_local; @@ -207,6 +227,19 @@ static int lov_disconnect(struct lustre_handle *conn) lov->bufsize = 0; lov->tgts = NULL; + exp = class_conn2export(conn); + list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) { + /* XXX close these, instead of just discarding them? */ + struct lov_file_handles *lfh; + lfh = list_entry(p, typeof(*lfh), lfh_list); + CERROR("discarding open LOV handle %p:"LPX64"\n", + lfh, lfh->lfh_cookie); + list_del(&lfh->lfh_list); + OBD_FREE(lfh->lfh_handles, + lfh->lfh_count * sizeof(*lfh->lfh_handles)); + kmem_cache_free(lov_file_cache, lfh); + } + out_local: rc = class_disconnect(conn); if (!rc) @@ -299,6 +332,22 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf) RETURN(rc); } +static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle) +{ + struct lov_file_handles *lfh = NULL; + + if (!handle || !handle->addr) + RETURN(NULL); + + lfh = (struct lov_file_handles *)(unsigned long)(handle->addr); + if (!kmem_cache_validate(lov_file_cache, lfh)) + RETURN(NULL); + + if (lfh->lfh_cookie != handle->cookie) + RETURN(NULL); + + return lfh; +} /* the LOV expects oa->o_id to be set to the LOV object id */ static int lov_create(struct lustre_handle *conn, struct obdo *oa, @@ -444,6 +493,7 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; + struct lov_file_handles *lfh = NULL; int rc = 0, i; ENTRY; @@ -461,11 +511,18 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, if (!export || !export->exp_obd) RETURN(-ENODEV); + if (oa->o_valid & OBD_MD_FLHANDLE) + lfh = lov_handle2lfh(obdo_handle(oa)); + lov = &export->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { - /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; + if (lfh) + memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i], + sizeof(lfh->lfh_handles[i])); + else + tmp.o_valid &= ~OBD_MD_FLHANDLE; rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL); if (rc) CERROR("Error destroying objid "LPX64" subobj "LPX64 @@ -531,6 +588,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; + struct lov_file_handles *lfh = NULL; int rc = 0, i; int new = 1; ENTRY; @@ -550,8 +608,10 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, RETURN(-ENODEV); lov = &export->exp_obd->u.lov; - oa->o_size = 0; - oa->o_blocks = 0; + + if (oa->o_valid & OBD_MD_FLHANDLE) + lfh = lov_handle2lfh(obdo_handle(oa)); + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { int err; @@ -563,6 +623,11 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; + if (lfh) + memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i], + sizeof(lfh->lfh_handles[i])); + else + tmp.o_valid &= ~OBD_MD_FLHANDLE; err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL); if (err) { @@ -581,10 +646,11 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *lsm) { - struct obdo tmp; + struct obdo *tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; + struct lov_file_handles *lfh = NULL; int rc = 0, i; ENTRY; @@ -610,15 +676,28 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, /* size changes should go through punch and not setattr */ LASSERT(!(oa->o_valid & OBD_MD_FLSIZE)); + tmp = obdo_alloc(); + if (!tmp) + RETURN(-ENOMEM); + + if (oa->o_valid & OBD_MD_FLHANDLE) + lfh = lov_handle2lfh(obdo_handle(oa)); + lov = &export->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { int err; - /* create data objects with "parent" OA */ - memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = loi->loi_id; + obdo_cpy_md(tmp, oa, oa->o_valid); - err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL); + if (lfh) + memcpy(obdo_handle(tmp), &lfh->lfh_handles[i], + sizeof(lfh->lfh_handles[i])); + else + tmp->o_valid &= ~OBD_MD_FLHANDLE; + + tmp->o_id = loi->loi_id; + + err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL); if (err) { CERROR("Error setattr objid "LPX64" subobj "LPX64 " on OST idx %d: rc = %d\n", @@ -627,6 +706,7 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, rc = err; } } + obdo_free(tmp); RETURN(rc); } @@ -637,6 +717,7 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; + struct lov_file_handles *lfh = NULL; int new = 1; int rc = 0, i; ENTRY; @@ -659,6 +740,14 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, if (!tmp) RETURN(-ENOMEM); + lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL); + if (!lfh) + GOTO(out_tmp, rc = -ENOMEM); + OBD_ALLOC(lfh->lfh_handles, + lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles)); + if (!lfh->lfh_handles) + GOTO(out_lfh, rc = -ENOMEM); + lov = &export->exp_obd->u.lov; oa->o_size = 0; oa->o_blocks = 0; @@ -680,7 +769,25 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, } lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new); + + if (tmp->o_valid & OBD_MD_FLHANDLE) + memcpy(&lfh->lfh_handles[i], obdo_handle(tmp), + sizeof(lfh->lfh_handles[i])); } + + if (tmp->o_valid & OBD_MD_FLHANDLE) { + struct lustre_handle *handle = obdo_handle(oa); + + lfh->lfh_count = lsm->lsm_stripe_count; + get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie)); + + handle->addr = (__u64)(unsigned long)lfh; + handle->cookie = lfh->lfh_cookie; + oa->o_valid |= OBD_MD_FLHANDLE; + list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head); + } else + goto out_handles; + /* FIXME: returning an error, but having opened some objects is a bad * idea, since they will likely never be closed. We either * need to not return an error if _some_ objects could be @@ -688,8 +795,17 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, * hopefully partial error status) or close all opened objects * and return an error. I think the former is preferred. */ +out_tmp: obdo_free(tmp); RETURN(rc); + +out_handles: + OBD_FREE(lfh->lfh_handles, + lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles)); +out_lfh: + lfh->lfh_cookie = DEAD_HANDLE_MAGIC; + kmem_cache_free(lov_file_cache, lfh); + goto out_tmp; } static int lov_close(struct lustre_handle *conn, struct obdo *oa, @@ -699,6 +815,7 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; + struct lov_file_handles *lfh = NULL; int rc = 0, i; ENTRY; @@ -716,6 +833,9 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, if (!export || !export->exp_obd) RETURN(-ENODEV); + if (oa->o_valid & OBD_MD_FLHANDLE) + lfh = lov_handle2lfh(obdo_handle(oa)); + lov = &export->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { int err; @@ -723,6 +843,11 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; + if (lfh) + memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i], + sizeof(lfh->lfh_handles[i])); + else + tmp.o_valid &= ~OBD_MD_FLHANDLE; err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL); if (err) { @@ -733,6 +858,14 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, rc = err; } } + if (lfh) { + list_del(&lfh->lfh_list); + OBD_FREE(lfh->lfh_handles, + lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles)); + lfh->lfh_cookie = DEAD_HANDLE_MAGIC; + kmem_cache_free(lov_file_cache, lfh); + } + RETURN(rc); } @@ -794,6 +927,7 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; + struct lov_file_handles *lfh = NULL; int rc = 0, i; ENTRY; @@ -811,6 +945,9 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, if (!export || !export->exp_obd) RETURN(-ENODEV); + if (oa->o_valid & OBD_MD_FLHANDLE) + lfh = lov_handle2lfh(obdo_handle(oa)); + lov = &export->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { obd_off starti = lov_stripe_offset(lsm, start, i); @@ -822,6 +959,11 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; + if (lfh) + memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i], + sizeof(lfh->lfh_handles[i])); + else + tmp.o_valid &= ~OBD_MD_FLHANDLE; err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL, starti, endi); @@ -1197,16 +1339,24 @@ static int __init lov_init(void) { printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION ", info@clusterfs.com\n"); + lov_file_cache = kmem_cache_create("ll_lov_file_data", + sizeof(struct lov_file_handles), + 0, 0, NULL, NULL); + if (!lov_file_cache) + RETURN(-ENOMEM); + return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME); } static void __exit lov_exit(void) { class_unregister_type(OBD_LOV_DEVICENAME); + if (kmem_cache_destroy(lov_file_cache)) + CERROR("couldn't free LOV open cache\n"); } MODULE_AUTHOR("Cluster File Systems, Inc. "); -MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1"); +MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver " LOV_VERSION); MODULE_LICENSE("GPL"); module_init(lov_init); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 70ea821..15cfb2f 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -28,9 +28,11 @@ #endif #include #include +#include #include -long filter_memory; +static kmem_cache_t *filter_open_cache; +static kmem_cache_t *filter_dentry_cache; #define FILTER_ROOTINO 2 #define FILTER_ROOTINO_STR __stringify(FILTER_ROOTINO) @@ -60,23 +62,36 @@ static int filter_id(char *buf, obd_id id, obd_mode mode) static inline void f_dput(struct dentry *dentry) { + /* Can't go inside filter_ddelete because it can block */ CDEBUG(D_INODE, "putting %s: %p, count = %d\n", dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1); LASSERT(atomic_read(&dentry->d_count) > 0); + dput(dentry); } +/* Not racy w.r.t. others, because we are the only user of this dentry */ +static void filter_drelease(struct dentry *dentry) +{ + if (dentry->d_fsdata) + kmem_cache_free(filter_dentry_cache, dentry->d_fsdata); +} + +struct dentry_operations filter_dops = { + .d_release = filter_drelease, +}; + /* setup the object store with correct subdirectories */ -static int filter_prep(struct obd_device *obddev) +static int filter_prep(struct obd_device *obd) { struct obd_run_ctxt saved; - struct filter_obd *filter = &obddev->u.filter; + struct filter_obd *filter = &obd->u.filter; struct dentry *dentry; struct dentry *root; struct file *file; struct inode *inode; int rc = 0; - __u64 lastino = 2; + __u64 lastobjid = 2; int mode = 0; push_ctxt(&saved, &filter->fo_ctxt, NULL); @@ -150,28 +165,28 @@ static int filter_prep(struct obd_device *obddev) filter->fo_aops = inode->i_mapping->a_ops; if (inode->i_size == 0) { - __u64 disk_lastino = cpu_to_le64(lastino); - ssize_t retval = file->f_op->write(file, (char *)&disk_lastino, - sizeof(disk_lastino), + __u64 disk_lastobjid = cpu_to_le64(lastobjid); + ssize_t retval = file->f_op->write(file,(char *)&disk_lastobjid, + sizeof(disk_lastobjid), &file->f_pos); - if (retval != sizeof(disk_lastino)) { - CDEBUG(D_INODE, "OBD filter: error writing lastino\n"); + if (retval != sizeof(disk_lastobjid)) { + CDEBUG(D_INODE,"OBD filter: error writing lastobjid\n"); filp_close(file, 0); GOTO(out_O_mode, rc = -EIO); } } else { - __u64 disk_lastino; - ssize_t retval = file->f_op->read(file, (char *)&disk_lastino, - sizeof(disk_lastino), + __u64 disk_lastobjid; + ssize_t retval = file->f_op->read(file, (char *)&disk_lastobjid, + sizeof(disk_lastobjid), &file->f_pos); - if (retval != sizeof(disk_lastino)) { - CDEBUG(D_INODE, "OBD filter: error reading lastino\n"); + if (retval != sizeof(disk_lastobjid)) { + CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n"); filp_close(file, 0); GOTO(out_O_mode, rc = -EIO); } - lastino = le64_to_cpu(disk_lastino); + lastobjid = le64_to_cpu(disk_lastobjid); } - filter->fo_lastino = lastino; + filter->fo_lastobjid = lastobjid; filp_close(file, 0); rc = 0; @@ -195,11 +210,11 @@ out_O: } /* cleanup the filter: write last used object id to status file */ -static void filter_post(struct obd_device *obddev) +static void filter_post(struct obd_device *obd) { struct obd_run_ctxt saved; - struct filter_obd *filter = &obddev->u.filter; - __u64 disk_lastino; + struct filter_obd *filter = &obd->u.filter; + __u64 disk_lastobjid; long rc; struct file *file; int mode; @@ -212,11 +227,11 @@ static void filter_post(struct obd_device *obddev) } file->f_pos = 0; - disk_lastino = cpu_to_le64(filter->fo_lastino); - rc = file->f_op->write(file, (char *)&disk_lastino, - sizeof(disk_lastino), &file->f_pos); - if (rc != sizeof(disk_lastino)) - CERROR("OBD filter: error writing lastino: rc = %ld\n", rc); + disk_lastobjid = cpu_to_le64(filter->fo_lastobjid); + rc = file->f_op->write(file, (char *)&disk_lastobjid, + sizeof(disk_lastobjid), &file->f_pos); + if (rc != sizeof(disk_lastobjid)) + CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc); rc = filp_close(file, NULL); if (rc) @@ -235,25 +250,25 @@ out: } -static __u64 filter_next_id(struct obd_device *obddev) +static __u64 filter_next_id(struct obd_device *obd) { obd_id id; - spin_lock(&obddev->u.filter.fo_lock); - id = ++obddev->u.filter.fo_lastino; - spin_unlock(&obddev->u.filter.fo_lock); + spin_lock(&obd->u.filter.fo_objidlock); + id = ++obd->u.filter.fo_lastobjid; + spin_unlock(&obd->u.filter.fo_objidock); - /* FIXME: write the lastino to disk here */ + /* FIXME: write the lastobjid to disk here */ return id; } /* how to get files, dentries, inodes from object id's */ /* parent i_sem is already held if needed for exclusivity */ -static struct dentry *filter_fid2dentry(struct obd_device *obddev, +static struct dentry *filter_fid2dentry(struct obd_device *obd, struct dentry *dparent, __u64 id, __u32 type) { - struct super_block *sb = obddev->u.filter.fo_sb; + struct super_block *sb = obd->u.filter.fo_sb; struct dentry *dchild; char name[32]; int len; @@ -285,6 +300,11 @@ static struct dentry *filter_fid2dentry(struct obd_device *obddev, RETURN(dchild); } + if (!dchild->d_op) + dchild->d_op = &filter_dops; + else + LASSERT(dchild->d_op == &filter_dops); + CDEBUG(D_INODE, "got child obj O/%s/%s: %p, count = %d\n", obd_mode_to_type(type), name, dchild, atomic_read(&dchild->d_count)); @@ -294,10 +314,22 @@ static struct dentry *filter_fid2dentry(struct obd_device *obddev, RETURN(dchild); } -static struct file *filter_obj_open(struct obd_device *obddev, +static inline struct dentry *filter_parent(struct obd_device *obd, + obd_mode mode) +{ + struct filter_obd *filter = &obd->u.filter; + + return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT]; +} + +static struct file *filter_obj_open(struct obd_export *export, __u64 id, __u32 type) { - struct super_block *sb = obddev->u.filter.fo_sb; + struct filter_obd *filter = &export->exp_obd->u.filter; + struct super_block *sb = filter->fo_sb; + struct filter_export_data *fed = &export->exp_filter_data; + struct filter_dentry_data *fdd; + struct filter_file_data *ffd; struct obd_run_ctxt saved; char name[24]; struct file *file; @@ -319,48 +351,181 @@ static struct file *filter_obj_open(struct obd_device *obddev, RETURN(ERR_PTR(-EINVAL)); } + ffd = kmem_cache_alloc(filter_open_cache, SLAB_KERNEL); + if (!ffd) { + CERROR("obdfilter: out of memory\n"); + RETURN(ERR_PTR(-ENOMEM)); + } + + /* We preallocate this to avoid blocking while holding fo_fddlock */ + fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL); + if (!fdd) { + CERROR("obdfilter: out of memory\n"); + GOTO(out_ffd, file = ERR_PTR(-ENOMEM)); + } + filter_id(name, id, type); - push_ctxt(&saved, &obddev->u.filter.fo_ctxt, NULL); + push_ctxt(&saved, &filter->fo_ctxt, NULL); file = filp_open(name, O_RDONLY | O_LARGEFILE, 0 /* type? */); pop_ctxt(&saved); - CDEBUG(D_INODE, "opening obdo %s: rc = %p\n", name, file); - if (IS_ERR(file)) - file = NULL; + GOTO(out_fdd, file); + + spin_lock(&filter->fo_fddlock); + if (file->f_dentry->d_fsdata) { + spin_unlock(&filter->fo_fddlock); + kmem_cache_free(filter_dentry_cache, fdd); + fdd = file->f_dentry->d_fsdata; + LASSERT(kmem_cache_validate(filter_dentry_cache, fdd)); + /* should only happen during client recovery */ + if (fdd->fdd_flags & FILTER_FLAG_DESTROY) + CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id); + atomic_inc(&fdd->fdd_open_count); + } else { + atomic_set(&fdd->fdd_open_count, 1); + spin_lock_init(&fdd->fdd_lock); + fdd->fdd_flags = 0; + /* If this is racy, then we can use {cmp}xchg and atomic_add */ + file->f_dentry->d_fsdata = fdd; + spin_unlock(&filter->fo_fddlock); + } + + get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie)); + ffd->ffd_file = file; + file->private_data = ffd; + + spin_lock(&fed->fed_lock); + list_add(&ffd->ffd_export_list, &fed->fed_open_head); + spin_unlock(&fed->fed_lock); + + CDEBUG(D_INODE, "opening objid "LPX64": rc = %p\n", id, file); + +out: RETURN(file); + +out_fdd: + kmem_cache_free(filter_dentry_cache, fdd); +out_ffd: + ffd->ffd_servercookie = DEAD_HANDLE_MAGIC; + kmem_cache_free(filter_open_cache, ffd); + goto out; } -static struct dentry *filter_parent(struct obd_device *obddev, obd_mode mode) +/* Caller must hold i_sem on dir_dentry->d_inode */ +static int filter_destroy_internal(struct obd_device *obd, + struct dentry *dir_dentry, + struct dentry *object_dentry) { - struct filter_obd *filter = &obddev->u.filter; + struct obd_run_ctxt saved; + struct inode *inode = object_dentry->d_inode; + int rc; - return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT]; + if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) { + CERROR("destroying objid %*s nlink = %d, count = %d\n", + object_dentry->d_name.len, + object_dentry->d_name.name, + inode->i_nlink, atomic_read(&inode->i_count)); + } + + push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); + rc = vfs_unlink(dir_dentry->d_inode, object_dentry); + /* XXX unlink from PENDING directory now too */ + pop_ctxt(&saved); + + if (rc) + CERROR("error unlinking objid %*s: rc %d\n", + object_dentry->d_name.len, + object_dentry->d_name.name, rc); + + return rc; } +static int filter_close_internal(struct obd_device *obd, + struct filter_file_data *ffd) +{ + struct file *filp = ffd->ffd_file; + struct dentry *object_dentry = dget(filp->f_dentry); + struct filter_dentry_data *fdd = object_dentry->d_fsdata; + int rc, rc2 = 0; + ENTRY; + + LASSERT(filp->private_data == ffd); + LASSERT(fdd); + + rc = filp_close(filp, 0); + + if (atomic_dec_and_test(&fdd->fdd_open_count) && + fdd->fdd_flags & FILTER_FLAG_DESTROY) { + struct dentry *dir_dentry = filter_parent(obd, S_IFREG); + + down(&dir_dentry->d_inode->i_sem); + rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry); + if (rc2 && !rc) + rc = rc2; + up(&dir_dentry->d_inode->i_sem); + } + + f_dput(object_dentry); + kmem_cache_free(filter_open_cache, ffd); + + RETURN(rc); +} /* obd methods */ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd, obd_uuid_t cluuid, struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover) { + struct obd_export *exp; int rc; + ENTRY; MOD_INC_USE_COUNT; rc = class_connect(conn, obd, cluuid); if (rc) - MOD_DEC_USE_COUNT; + GOTO(out_dec, rc); + exp = class_conn2export(conn); + LASSERT(exp); + + INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head); + spin_lock_init(&exp->exp_filter_data.fed_lock); +out: RETURN(rc); + +out_dec: + MOD_DEC_USE_COUNT; + goto out; } static int filter_disconnect(struct lustre_handle *conn) { - struct obd_export *export = class_conn2export(conn); + struct obd_export *exp = class_conn2export(conn); + struct filter_export_data *fed; int rc; ENTRY; - ldlm_cancel_locks_for_export(export); + LASSERT(exp); + fed = &exp->exp_filter_data; + spin_lock(&exp->exp_filter_data); + while(!list_empty(&fed->fed_open_head)) { + struct filter_file_data *ffd; + ffd = list_entry(fed->fed_open_head.next, typeof(*ffd), + ffd_export_list); + list_del(&ffd->ffd_export_list); + spin_unlock(&exp->exp_filter_data); + + CERROR("force closing file %*s on disconnect\n", + ffd->ffd_file->f_dentry->d_name.len, + ffd->ffd_file->f_dentry->d_name.name); + + filter_close_internal(exp->exp_obd, ffd); + spin_lock(&exp->exp_filter_data); + } + spin_unlock(&exp->exp_filter_data); + + ldlm_cancel_locks_for_export(exp); rc = class_disconnect(conn); if (!rc) MOD_DEC_USE_COUNT; @@ -370,7 +535,7 @@ static int filter_disconnect(struct lustre_handle *conn) } /* mount the file system (secretly) */ -static int filter_setup(struct obd_device *obddev, obd_count len, void *buf) +static int filter_setup(struct obd_device *obd, obd_count len, void *buf) { struct obd_ioctl_data* data = buf; struct filter_obd *filter; @@ -387,10 +552,11 @@ static int filter_setup(struct obd_device *obddev, obd_count len, void *buf) if (IS_ERR(mnt)) GOTO(err_dec, err); - filter = &obddev->u.filter;; + filter = &obd->u.filter;; filter->fo_vfsmnt = mnt; filter->fo_fstype = strdup(data->ioc_inlbuf2); filter->fo_sb = mnt->mnt_root->d_inode->i_sb; + CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, filter->fo_vfsmnt); /* XXX is this even possible if do_kern_mount succeeded? */ if (!filter->fo_sb) GOTO(err_kfree, err = -ENODEV); @@ -400,18 +566,19 @@ static int filter_setup(struct obd_device *obddev, obd_count len, void *buf) filter->fo_ctxt.pwd = mnt->mnt_root; filter->fo_ctxt.fs = get_ds(); - err = filter_prep(obddev); + err = filter_prep(obd); if (err) GOTO(err_kfree, err); spin_lock_init(&filter->fo_lock); + INIT_LIST_HEAD(&filter->fo_export_list); - obddev->obd_namespace = + obd->obd_namespace = ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER); - if (obddev->obd_namespace == NULL) + if (obd->obd_namespace == NULL) LBUG(); ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, - "filter_ldlm_client", &obddev->obd_ldlm_client); + "filter_ldlm_client", &obd->obd_ldlm_client); RETURN(0); @@ -428,33 +595,33 @@ err_dec: } -static int filter_cleanup(struct obd_device * obddev) +static int filter_cleanup(struct obd_device *obd) { struct super_block *sb; ENTRY; - if (!list_empty(&obddev->obd_exports)) { + if (!list_empty(&obd->obd_exports)) { CERROR("still has clients!\n"); - class_disconnect_all(obddev); - if (!list_empty(&obddev->obd_exports)) { + class_disconnect_all(obd); + if (!list_empty(&obd->obd_exports)) { CERROR("still has exports after forced cleanup?\n"); RETURN(-EBUSY); } } - ldlm_namespace_free(obddev->obd_namespace); + ldlm_namespace_free(obd->obd_namespace); - sb = obddev->u.filter.fo_sb; - if (!obddev->u.filter.fo_sb) + sb = obd->u.filter.fo_sb; + if (!obd->u.filter.fo_sb) RETURN(0); - filter_post(obddev); + filter_post(obd); shrink_dcache_parent(sb->s_root); unlock_kernel(); - mntput(obddev->u.filter.fo_vfsmnt); - obddev->u.filter.fo_sb = 0; - kfree(obddev->u.filter.fo_fstype); + mntput(obd->u.filter.fo_vfsmnt); + obd->u.filter.fo_sb = 0; + kfree(obd->u.filter.fo_fstype); lock_kernel(); @@ -463,8 +630,7 @@ static int filter_cleanup(struct obd_device * obddev) } -static inline void filter_from_inode(struct obdo *oa, struct inode *inode, - int valid) +static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid) { int type = oa->o_mode & S_IFMT; ENTRY; @@ -485,33 +651,72 @@ static inline void filter_from_inode(struct obdo *oa, struct inode *inode, EXIT; } -static int filter_getattr(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md) +static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle) { - struct obd_device *obddev = class_conn2obd(conn); - struct dentry *dentry; - int rc = 0; + struct filter_file_data *ffd = NULL; ENTRY; - if (!class_conn2export(conn)) { - CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr); - RETURN(-EINVAL); + if (!handle || !handle->addr) + RETURN(NULL); + + ffd = (struct filter_file_data *)(unsigned long)(handle->addr); + if (!kmem_cache_validate(filter_open_cache, (void *)ffd)) + RETURN(NULL); + + if (ffd->ffd_servercookie != handle->cookie) + RETURN(NULL); + + LASSERT(ffd->ffd_file->private_data == ffd); + RETURN(ffd); +} + +static struct dentry *__filter_oa2dentry(struct lustre_handle *conn, + struct obdo *oa, char *what) +{ + struct dentry *dentry = NULL; + + if (oa->o_valid & OBD_MD_FLHANDLE) { + struct lustre_handle *ost_handle = obdo_handle(oa); + struct filter_file_data *ffd = filter_handle2ffd(ost_handle); + + if (ffd) + dentry = dget(ffd->ffd_file->f_dentry); } - obddev = class_conn2obd(conn); - dentry = filter_fid2dentry(obddev, filter_parent(obddev, oa->o_mode), - oa->o_id, oa->o_mode); - if (IS_ERR(dentry)) - RETURN(PTR_ERR(dentry)); + if (!dentry) { + struct obd_device *obd = class_conn2obd(conn); + if (!obd) { + CERROR("invalid client "LPX64"\n", conn->addr); + RETURN(ERR_PTR(-EINVAL)); + } + dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode), + oa->o_id, oa->o_mode); + } if (!dentry->d_inode) { - CERROR("getattr on non-existent object: "LPU64"\n", oa->o_id); - GOTO(out_getattr, rc = -ENOENT); + CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id); + f_dput(dentry); + dentry = ERR_PTR(-ENOENT); } + return dentry; +} + +#define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__) + +static int filter_getattr(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) +{ + struct dentry *dentry = NULL; + int rc = 0; + ENTRY; + + dentry = filter_oa2dentry(conn, oa); + if (IS_ERR(dentry)) + RETURN(PTR_ERR(dentry)); + filter_from_inode(oa, dentry->d_inode, oa->o_valid); -out_getattr: f_dput(dentry); RETURN(rc); } @@ -527,18 +732,14 @@ static int filter_setattr(struct lustre_handle *conn, struct obdo *oa, int rc; ENTRY; - iattr_from_obdo(&iattr, oa, oa->o_valid); - iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG; - dentry = filter_fid2dentry(obd, filter_parent(obd, iattr.ia_mode), - oa->o_id, iattr.ia_mode); + dentry = filter_oa2dentry(conn, oa); + if (IS_ERR(dentry)) RETURN(PTR_ERR(dentry)); + iattr_from_obdo(&iattr, oa, oa->o_valid); + iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG; inode = dentry->d_inode; - if (!inode) { - CERROR("setattr on non-existent object: "LPU64"\n", oa->o_id); - GOTO(out_setattr, rc = -ENOENT); - } lock_kernel(); if (iattr.ia_valid & ATTR_SIZE) @@ -556,7 +757,6 @@ static int filter_setattr(struct lustre_handle *conn, struct obdo *oa, } unlock_kernel(); -out_setattr: f_dput(dentry); RETURN(rc); } @@ -565,9 +765,11 @@ static int filter_open(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *ea) { struct obd_export *export; - struct obd_device *obd; - struct dentry *dentry; - /* ENTRY; */ + struct lustre_handle *handle; + struct filter_file_data *ffd; + struct file *filp; + int rc = 0; + ENTRY; export = class_conn2export(conn); if (!export) { @@ -575,56 +777,65 @@ static int filter_open(struct lustre_handle *conn, struct obdo *oa, RETURN(-EINVAL); } - obd = export->exp_obd; - dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode), - oa->o_id, oa->o_mode); - if (IS_ERR(dentry)) - RETURN(PTR_ERR(dentry)); - - if (!dentry->d_inode) { - CERROR("opening non-existent objid "LPX64"\n", oa->o_id); - RETURN(-ENOENT); - } + filp = filter_obj_open(export, oa->o_id, oa->o_mode); + if (IS_ERR(filp)) + GOTO(out, rc = PTR_ERR(filp)); - filter_from_inode(oa, dentry->d_inode, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLMTIME | OBD_MD_FLCTIME); + filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid); - return 0; + ffd = filp->private_data; + handle = obdo_handle(oa); + handle->addr = (__u64)(unsigned long)ffd; + handle->cookie = ffd->ffd_servercookie; + oa->o_valid |= OBD_MD_FLHANDLE; +out: + RETURN(rc); } /* filter_open */ static int filter_close(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *ea) { - struct obd_device *obd; - struct dentry *dentry; - /* ENTRY; */ + struct obd_export *exp; + struct filter_file_data *ffd; + int rc; + ENTRY; - obd = class_conn2obd(conn); - if (!obd) { + exp = class_conn2export(conn); + if (!exp) { CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr); RETURN(-EINVAL); } - dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode), - oa->o_id, oa->o_mode); - if (IS_ERR(dentry)) - RETURN(PTR_ERR(dentry)); - LASSERT(atomic_read(&dentry->d_count) > 1); + if (!(oa->o_valid & OBD_MD_FLHANDLE)) { + CERROR("no handle for close of objid "LPX64"\n", oa->o_id); + RETURN(-EINVAL); + } - f_dput(dentry); /* for the close */ - f_dput(dentry); /* for this call */ - return 0; + ffd = filter_handle2ffd(obdo_handle(oa)); + if (!ffd) { + struct lustre_handle *handle = obdo_handle(oa); + CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n", + handle->addr, handle->cookie); + RETURN(-ESTALE); + } + + spin_lock(&exp->exp_filter_data); + list_del(&ffd->ffd_export_list); + spin_unlock(&exp->exp_filter_data); + + rc = filter_close_internal(exp->exp_obd, ffd); + + RETURN(rc); } /* filter_close */ static int filter_create(struct lustre_handle* conn, struct obdo *oa, struct lov_stripe_md **ea) { + struct obd_device *obd = class_conn2obd(conn); char name[64]; struct obd_run_ctxt saved; struct dentry *new; int mode; - struct obd_device *obd = class_conn2obd(conn); - struct filter_obd *filter = &obd->u.filter; struct iattr; ENTRY; @@ -643,10 +854,8 @@ static int filter_create(struct lustre_handle* conn, struct obdo *oa, //filter_id(name, oa->o_id, oa->o_mode); sprintf(name, LPU64, oa->o_id); - mode = (oa->o_mode & ~S_IFMT) | S_IFREG; push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - new = simple_mknod(filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT], name, - mode); + new = simple_mknod(filter_parent(obd, oa->o_mode), name, mode); pop_ctxt(&saved); if (IS_ERR(new)) { CERROR("Error mknod obj %s, err %ld\n", name, PTR_ERR(new)); @@ -657,7 +866,7 @@ static int filter_create(struct lustre_handle* conn, struct obdo *oa, oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME; filter_from_inode(oa, new->d_inode, oa->o_valid); - dput(new); + f_dput(new); return 0; } @@ -665,15 +874,11 @@ static int filter_create(struct lustre_handle* conn, struct obdo *oa, static int filter_destroy(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *ea) { - struct obd_device *obd; - struct filter_obd *filter; - struct obd_run_ctxt saved; - struct inode *inode; + struct obd_device *obd = class_conn2obd(conn); struct dentry *dir_dentry, *object_dentry; int rc; ENTRY; - obd = class_conn2obd(conn); if (!obd) { CERROR("invalid client "LPX64"\n", conn->addr); RETURN(-EINVAL); @@ -684,34 +889,22 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa, dir_dentry = filter_parent(obd, oa->o_mode); down(&dir_dentry->d_inode->i_sem); - object_dentry = filter_fid2dentry(obd, dir_dentry, oa->o_id, - oa->o_mode); + object_dentry = filter_oa2dentry(conn, oa); if (IS_ERR(object_dentry)) GOTO(out, rc = -ENOENT); - inode = object_dentry->d_inode; - if (!inode) { - CERROR("trying to destroy negative inode "LPX64"!\n", oa->o_id); - GOTO(out, rc = -ENOENT); - } - - if (inode->i_nlink != 1) { - CERROR("destroying inode with nlink = %d\n", inode->i_nlink); - LBUG(); - inode->i_nlink = 1; - } - inode->i_mode = S_IFREG; + if (object_dentry->d_fsdata) { + struct filter_dentry_data *fdd = object_dentry->d_fsdata; - if (atomic_read(&inode->i_count) > 1) { -#warning FIXME: need to handle open-unlinked case and move to hold dir - CERROR("inode has count %d\n", atomic_read(&inode->i_count)); + if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) { + fdd->fdd_flags |= FILTER_FLAG_DESTROY; + /* XXX put into PENDING directory in case of crash */ + } + GOTO(out_dput, rc = 0); } - filter = &obd->u.filter; - push_ctxt(&saved, &filter->fo_ctxt, NULL); - - rc = vfs_unlink(dir_dentry->d_inode, object_dentry); - pop_ctxt(&saved); + rc = filter_destroy_internal(obd, dir_dentry, object_dentry); +out_dput: f_dput(object_dentry); EXIT; @@ -722,7 +915,7 @@ out: /* NB count and offset are used for punch, but not truncate */ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md, + struct lov_stripe_md *lsm, obd_off start, obd_off end) { int error; @@ -743,26 +936,26 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, struct brw_page *pga, brw_callback_t callback, struct io_cb_data *data) { + struct obd_export *export = class_conn2export(conn); struct obd_run_ctxt saved; struct super_block *sb; int pnum; /* index to pages (bufs) */ unsigned long retval; int error; struct file *file; - struct obd_device *obd = class_conn2obd(conn); int pg; ENTRY; - if (!obd) { + if (!export) { CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr); RETURN(-EINVAL); } - sb = obd->u.filter.fo_sb; - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); + sb = export->exp_obd->u.filter.fo_sb; + push_ctxt(&saved, &export->exp_obd->u.filter.fo_ctxt, NULL); pnum = 0; /* pnum indexes buf 0..num_pages */ - file = filter_obj_open(obd, lsm->lsm_object_id, S_IFREG); + file = filter_obj_open(export, lsm->lsm_object_id, S_IFREG); if (IS_ERR(file)) GOTO(out, retval = PTR_ERR(file)); @@ -1357,10 +1550,12 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn, ENTRY; push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); + lock_kernel(); journal_save = current->journal_info; LASSERT(!journal_save); current->journal_info = private; + unlock_kernel(); for (i = 0, o = obj, r = res; i < objcount; i++, o++) { int j; for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) { @@ -1386,7 +1581,9 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn, f_dput(r->dentry); } } + lock_kernel(); current->journal_info = journal_save; + unlock_kernel(); if (!found_locked) goto out_ctxt; @@ -1579,12 +1776,30 @@ static struct obd_ops filter_obd_ops = { static int __init obdfilter_init(void) { printk(KERN_INFO "Filtering OBD driver v0.001, info@clusterfs.com\n"); + filter_open_cache = kmem_cache_create("ll_filter_fdata", + sizeof(struct filter_file_data), + 0, 0, NULL, NULL); + if (!filter_open_cache) + RETURN(-ENOMEM); + + filter_dentry_cache = kmem_cache_create("ll_filter_dentry", + sizeof(struct filter_dentry_data), + 0, 0, NULL, NULL); + if (!filter_dentry_cache) { + kmem_cache_destroy(filter_open_cache); + RETURN(-ENOMEM); + } + return class_register_type(&filter_obd_ops, OBD_FILTER_DEVICENAME); } static void __exit obdfilter_exit(void) { class_unregister_type(OBD_FILTER_DEVICENAME); + if (kmem_cache_destroy(filter_dentry_cache)) + CERROR("couldn't free obdfilter dentry cache\n"); + if (kmem_cache_destroy(filter_open_cache)) + CERROR("couldn't free obdfilter open cache\n"); } MODULE_AUTHOR("Cluster File Systems, Inc. ");