#include <linux/obd_class.h>
#include <linux/obd_lov.h>
#include <linux/init.h>
+#include <linux/random.h>
+#include <linux/slab.h>
#include <asm/div64.h>
+static kmem_cache_t *lov_file_cache;
+
+struct lov_file_handles {
+ struct list_head lfh_list;
+ __u64 lfh_cookie;
+ int lfh_count;
+ struct lustre_handle *lfh_handles;
+};
+
/* obd methods */
static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
obd_uuid_t cluuid, struct recovd_obd *recovd,
struct lov_obd *lov = &obd->u.lov;
struct client_obd *mdc = &lov->mdcobd->u.cli;
struct lov_desc *desc = &lov->desc;
+ struct obd_export *exp;
struct lustre_handle mdc_conn;
obd_uuid_t *uuidarray;
int rc, rc2, i;
if (lov->refcount > 1)
RETURN(0);
+ exp = class_conn2export(conn);
+ INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
+
/* retrieve LOV metadata from MDS */
rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover);
if (rc) {
for (i = 0; i < desc->ld_tgt_count; i++) {
struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
+
if (!tgt) {
CERROR("Target %s not attached\n", uuidarray[i]);
GOTO(out_disc, rc = -EINVAL);
}
+
if (!(tgt->obd_flags & OBD_SET_UP)) {
CERROR("Target %s not set up\n", uuidarray[i]);
GOTO(out_disc, rc = -EINVAL);
}
+
rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
recover);
if (rc) {
{
struct obd_device *obd = class_conn2obd(conn);
struct lov_obd *lov = &obd->u.lov;
+ struct obd_export *exp;
+ struct list_head *p, *n;
int rc, i;
if (!lov->tgts)
goto out_local;
- /* Only disconnect the underlying laters on the final disconnect. */
+ /* Only disconnect the underlying layers on the final disconnect. */
lov->refcount--;
if (lov->refcount != 0)
goto out_local;
lov->bufsize = 0;
lov->tgts = NULL;
+ exp = class_conn2export(conn);
+ list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
+ /* XXX close these, instead of just discarding them? */
+ struct lov_file_handles *lfh;
+ lfh = list_entry(p, typeof(*lfh), lfh_list);
+ CERROR("discarding open LOV handle %p:"LPX64"\n",
+ lfh, lfh->lfh_cookie);
+ list_del(&lfh->lfh_list);
+ OBD_FREE(lfh->lfh_handles,
+ lfh->lfh_count * sizeof(*lfh->lfh_handles));
+ kmem_cache_free(lov_file_cache, lfh);
+ }
+
out_local:
rc = class_disconnect(conn);
if (!rc)
RETURN(rc);
}
+static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
+{
+ struct lov_file_handles *lfh = NULL;
+
+ if (!handle || !handle->addr)
+ RETURN(NULL);
+
+ lfh = (struct lov_file_handles *)(unsigned long)(handle->addr);
+ if (!kmem_cache_validate(lov_file_cache, lfh))
+ RETURN(NULL);
+
+ if (lfh->lfh_cookie != handle->cookie)
+ RETURN(NULL);
+
+ return lfh;
+}
/* the LOV expects oa->o_id to be set to the LOV object id */
static int lov_create(struct lustre_handle *conn, struct obdo *oa,
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
+ struct lov_file_handles *lfh = NULL;
int rc = 0, i;
ENTRY;
if (!export || !export->exp_obd)
RETURN(-ENODEV);
+ if (oa->o_valid & OBD_MD_FLHANDLE)
+ lfh = lov_handle2lfh(obdo_handle(oa));
+
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
- /* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
+ if (lfh)
+ memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
+ sizeof(lfh->lfh_handles[i]));
+ else
+ tmp.o_valid &= ~OBD_MD_FLHANDLE;
rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
if (rc)
CERROR("Error destroying objid "LPX64" subobj "LPX64
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
+ struct lov_file_handles *lfh = NULL;
int rc = 0, i;
int new = 1;
ENTRY;
RETURN(-ENODEV);
lov = &export->exp_obd->u.lov;
- oa->o_size = 0;
- oa->o_blocks = 0;
+
+ if (oa->o_valid & OBD_MD_FLHANDLE)
+ lfh = lov_handle2lfh(obdo_handle(oa));
+
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
int err;
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
+ if (lfh)
+ memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
+ sizeof(lfh->lfh_handles[i]));
+ else
+ tmp.o_valid &= ~OBD_MD_FLHANDLE;
err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
if (err) {
static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *lsm)
{
- struct obdo tmp;
+ struct obdo *tmp;
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
+ struct lov_file_handles *lfh = NULL;
int rc = 0, i;
ENTRY;
/* size changes should go through punch and not setattr */
LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
+ tmp = obdo_alloc();
+ if (!tmp)
+ RETURN(-ENOMEM);
+
+ if (oa->o_valid & OBD_MD_FLHANDLE)
+ lfh = lov_handle2lfh(obdo_handle(oa));
+
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
int err;
- /* create data objects with "parent" OA */
- memcpy(&tmp, oa, sizeof(tmp));
- tmp.o_id = loi->loi_id;
+ obdo_cpy_md(tmp, oa, oa->o_valid);
- err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
+ if (lfh)
+ memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
+ sizeof(lfh->lfh_handles[i]));
+ else
+ tmp->o_valid &= ~OBD_MD_FLHANDLE;
+
+ tmp->o_id = loi->loi_id;
+
+ err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
if (err) {
CERROR("Error setattr objid "LPX64" subobj "LPX64
" on OST idx %d: rc = %d\n",
rc = err;
}
}
+ obdo_free(tmp);
RETURN(rc);
}
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
+ struct lov_file_handles *lfh = NULL;
int new = 1;
int rc = 0, i;
ENTRY;
if (!tmp)
RETURN(-ENOMEM);
+ lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
+ if (!lfh)
+ GOTO(out_tmp, rc = -ENOMEM);
+ OBD_ALLOC(lfh->lfh_handles,
+ lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
+ if (!lfh->lfh_handles)
+ GOTO(out_lfh, rc = -ENOMEM);
+
lov = &export->exp_obd->u.lov;
oa->o_size = 0;
oa->o_blocks = 0;
}
lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
+
+ if (tmp->o_valid & OBD_MD_FLHANDLE)
+ memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
+ sizeof(lfh->lfh_handles[i]));
}
+
+ if (tmp->o_valid & OBD_MD_FLHANDLE) {
+ struct lustre_handle *handle = obdo_handle(oa);
+
+ lfh->lfh_count = lsm->lsm_stripe_count;
+ get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
+
+ handle->addr = (__u64)(unsigned long)lfh;
+ handle->cookie = lfh->lfh_cookie;
+ oa->o_valid |= OBD_MD_FLHANDLE;
+ list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
+ } else
+ goto out_handles;
+
/* FIXME: returning an error, but having opened some objects is a bad
* idea, since they will likely never be closed. We either
* need to not return an error if _some_ objects could be
* hopefully partial error status) or close all opened objects
* and return an error. I think the former is preferred.
*/
+out_tmp:
obdo_free(tmp);
RETURN(rc);
+
+out_handles:
+ OBD_FREE(lfh->lfh_handles,
+ lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
+out_lfh:
+ lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
+ kmem_cache_free(lov_file_cache, lfh);
+ goto out_tmp;
}
static int lov_close(struct lustre_handle *conn, struct obdo *oa,
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
+ struct lov_file_handles *lfh = NULL;
int rc = 0, i;
ENTRY;
if (!export || !export->exp_obd)
RETURN(-ENODEV);
+ if (oa->o_valid & OBD_MD_FLHANDLE)
+ lfh = lov_handle2lfh(obdo_handle(oa));
+
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
int err;
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
+ if (lfh)
+ memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
+ sizeof(lfh->lfh_handles[i]));
+ else
+ tmp.o_valid &= ~OBD_MD_FLHANDLE;
err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
if (err) {
rc = err;
}
}
+ if (lfh) {
+ list_del(&lfh->lfh_list);
+ OBD_FREE(lfh->lfh_handles,
+ lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
+ lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
+ kmem_cache_free(lov_file_cache, lfh);
+ }
+
RETURN(rc);
}
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
+ struct lov_file_handles *lfh = NULL;
int rc = 0, i;
ENTRY;
if (!export || !export->exp_obd)
RETURN(-ENODEV);
+ if (oa->o_valid & OBD_MD_FLHANDLE)
+ lfh = lov_handle2lfh(obdo_handle(oa));
+
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
obd_off starti = lov_stripe_offset(lsm, start, i);
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
+ if (lfh)
+ memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
+ sizeof(lfh->lfh_handles[i]));
+ else
+ tmp.o_valid &= ~OBD_MD_FLHANDLE;
err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
starti, endi);
{
printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
", info@clusterfs.com\n");
+ lov_file_cache = kmem_cache_create("ll_lov_file_data",
+ sizeof(struct lov_file_handles),
+ 0, 0, NULL, NULL);
+ if (!lov_file_cache)
+ RETURN(-ENOMEM);
+
return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
}
static void __exit lov_exit(void)
{
class_unregister_type(OBD_LOV_DEVICENAME);
+ if (kmem_cache_destroy(lov_file_cache))
+ CERROR("couldn't free LOV open cache\n");
}
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
+MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver " LOV_VERSION);
MODULE_LICENSE("GPL");
module_init(lov_init);
#endif
#include <linux/quotaops.h>
#include <linux/init.h>
+#include <linux/random.h>
#include <linux/stringify.h>
-long filter_memory;
+static kmem_cache_t *filter_open_cache;
+static kmem_cache_t *filter_dentry_cache;
#define FILTER_ROOTINO 2
#define FILTER_ROOTINO_STR __stringify(FILTER_ROOTINO)
static inline void f_dput(struct dentry *dentry)
{
+ /* Can't go inside filter_ddelete because it can block */
CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
LASSERT(atomic_read(&dentry->d_count) > 0);
+
dput(dentry);
}
+/* Not racy w.r.t. others, because we are the only user of this dentry */
+static void filter_drelease(struct dentry *dentry)
+{
+ if (dentry->d_fsdata)
+ kmem_cache_free(filter_dentry_cache, dentry->d_fsdata);
+}
+
+struct dentry_operations filter_dops = {
+ .d_release = filter_drelease,
+};
+
/* setup the object store with correct subdirectories */
-static int filter_prep(struct obd_device *obddev)
+static int filter_prep(struct obd_device *obd)
{
struct obd_run_ctxt saved;
- struct filter_obd *filter = &obddev->u.filter;
+ struct filter_obd *filter = &obd->u.filter;
struct dentry *dentry;
struct dentry *root;
struct file *file;
struct inode *inode;
int rc = 0;
- __u64 lastino = 2;
+ __u64 lastobjid = 2;
int mode = 0;
push_ctxt(&saved, &filter->fo_ctxt, NULL);
filter->fo_aops = inode->i_mapping->a_ops;
if (inode->i_size == 0) {
- __u64 disk_lastino = cpu_to_le64(lastino);
- ssize_t retval = file->f_op->write(file, (char *)&disk_lastino,
- sizeof(disk_lastino),
+ __u64 disk_lastobjid = cpu_to_le64(lastobjid);
+ ssize_t retval = file->f_op->write(file,(char *)&disk_lastobjid,
+ sizeof(disk_lastobjid),
&file->f_pos);
- if (retval != sizeof(disk_lastino)) {
- CDEBUG(D_INODE, "OBD filter: error writing lastino\n");
+ if (retval != sizeof(disk_lastobjid)) {
+ CDEBUG(D_INODE,"OBD filter: error writing lastobjid\n");
filp_close(file, 0);
GOTO(out_O_mode, rc = -EIO);
}
} else {
- __u64 disk_lastino;
- ssize_t retval = file->f_op->read(file, (char *)&disk_lastino,
- sizeof(disk_lastino),
+ __u64 disk_lastobjid;
+ ssize_t retval = file->f_op->read(file, (char *)&disk_lastobjid,
+ sizeof(disk_lastobjid),
&file->f_pos);
- if (retval != sizeof(disk_lastino)) {
- CDEBUG(D_INODE, "OBD filter: error reading lastino\n");
+ if (retval != sizeof(disk_lastobjid)) {
+ CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
filp_close(file, 0);
GOTO(out_O_mode, rc = -EIO);
}
- lastino = le64_to_cpu(disk_lastino);
+ lastobjid = le64_to_cpu(disk_lastobjid);
}
- filter->fo_lastino = lastino;
+ filter->fo_lastobjid = lastobjid;
filp_close(file, 0);
rc = 0;
}
/* cleanup the filter: write last used object id to status file */
-static void filter_post(struct obd_device *obddev)
+static void filter_post(struct obd_device *obd)
{
struct obd_run_ctxt saved;
- struct filter_obd *filter = &obddev->u.filter;
- __u64 disk_lastino;
+ struct filter_obd *filter = &obd->u.filter;
+ __u64 disk_lastobjid;
long rc;
struct file *file;
int mode;
}
file->f_pos = 0;
- disk_lastino = cpu_to_le64(filter->fo_lastino);
- rc = file->f_op->write(file, (char *)&disk_lastino,
- sizeof(disk_lastino), &file->f_pos);
- if (rc != sizeof(disk_lastino))
- CERROR("OBD filter: error writing lastino: rc = %ld\n", rc);
+ disk_lastobjid = cpu_to_le64(filter->fo_lastobjid);
+ rc = file->f_op->write(file, (char *)&disk_lastobjid,
+ sizeof(disk_lastobjid), &file->f_pos);
+ if (rc != sizeof(disk_lastobjid))
+ CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
rc = filp_close(file, NULL);
if (rc)
}
-static __u64 filter_next_id(struct obd_device *obddev)
+static __u64 filter_next_id(struct obd_device *obd)
{
obd_id id;
- spin_lock(&obddev->u.filter.fo_lock);
- id = ++obddev->u.filter.fo_lastino;
- spin_unlock(&obddev->u.filter.fo_lock);
+ spin_lock(&obd->u.filter.fo_objidlock);
+ id = ++obd->u.filter.fo_lastobjid;
+ spin_unlock(&obd->u.filter.fo_objidock);
- /* FIXME: write the lastino to disk here */
+ /* FIXME: write the lastobjid to disk here */
return id;
}
/* how to get files, dentries, inodes from object id's */
/* parent i_sem is already held if needed for exclusivity */
-static struct dentry *filter_fid2dentry(struct obd_device *obddev,
+static struct dentry *filter_fid2dentry(struct obd_device *obd,
struct dentry *dparent,
__u64 id, __u32 type)
{
- struct super_block *sb = obddev->u.filter.fo_sb;
+ struct super_block *sb = obd->u.filter.fo_sb;
struct dentry *dchild;
char name[32];
int len;
RETURN(dchild);
}
+ if (!dchild->d_op)
+ dchild->d_op = &filter_dops;
+ else
+ LASSERT(dchild->d_op == &filter_dops);
+
CDEBUG(D_INODE, "got child obj O/%s/%s: %p, count = %d\n",
obd_mode_to_type(type), name, dchild,
atomic_read(&dchild->d_count));
RETURN(dchild);
}
-static struct file *filter_obj_open(struct obd_device *obddev,
+static inline struct dentry *filter_parent(struct obd_device *obd,
+ obd_mode mode)
+{
+ struct filter_obd *filter = &obd->u.filter;
+
+ return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+}
+
+static struct file *filter_obj_open(struct obd_export *export,
__u64 id, __u32 type)
{
- struct super_block *sb = obddev->u.filter.fo_sb;
+ struct filter_obd *filter = &export->exp_obd->u.filter;
+ struct super_block *sb = filter->fo_sb;
+ struct filter_export_data *fed = &export->exp_filter_data;
+ struct filter_dentry_data *fdd;
+ struct filter_file_data *ffd;
struct obd_run_ctxt saved;
char name[24];
struct file *file;
RETURN(ERR_PTR(-EINVAL));
}
+ ffd = kmem_cache_alloc(filter_open_cache, SLAB_KERNEL);
+ if (!ffd) {
+ CERROR("obdfilter: out of memory\n");
+ RETURN(ERR_PTR(-ENOMEM));
+ }
+
+ /* We preallocate this to avoid blocking while holding fo_fddlock */
+ fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL);
+ if (!fdd) {
+ CERROR("obdfilter: out of memory\n");
+ GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
+ }
+
filter_id(name, id, type);
- push_ctxt(&saved, &obddev->u.filter.fo_ctxt, NULL);
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
file = filp_open(name, O_RDONLY | O_LARGEFILE, 0 /* type? */);
pop_ctxt(&saved);
- CDEBUG(D_INODE, "opening obdo %s: rc = %p\n", name, file);
-
if (IS_ERR(file))
- file = NULL;
+ GOTO(out_fdd, file);
+
+ spin_lock(&filter->fo_fddlock);
+ if (file->f_dentry->d_fsdata) {
+ spin_unlock(&filter->fo_fddlock);
+ kmem_cache_free(filter_dentry_cache, fdd);
+ fdd = file->f_dentry->d_fsdata;
+ LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
+ /* should only happen during client recovery */
+ if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
+ CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id);
+ atomic_inc(&fdd->fdd_open_count);
+ } else {
+ atomic_set(&fdd->fdd_open_count, 1);
+ spin_lock_init(&fdd->fdd_lock);
+ fdd->fdd_flags = 0;
+ /* If this is racy, then we can use {cmp}xchg and atomic_add */
+ file->f_dentry->d_fsdata = fdd;
+ spin_unlock(&filter->fo_fddlock);
+ }
+
+ get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
+ ffd->ffd_file = file;
+ file->private_data = ffd;
+
+ spin_lock(&fed->fed_lock);
+ list_add(&ffd->ffd_export_list, &fed->fed_open_head);
+ spin_unlock(&fed->fed_lock);
+
+ CDEBUG(D_INODE, "opening objid "LPX64": rc = %p\n", id, file);
+
+out:
RETURN(file);
+
+out_fdd:
+ kmem_cache_free(filter_dentry_cache, fdd);
+out_ffd:
+ ffd->ffd_servercookie = DEAD_HANDLE_MAGIC;
+ kmem_cache_free(filter_open_cache, ffd);
+ goto out;
}
-static struct dentry *filter_parent(struct obd_device *obddev, obd_mode mode)
+/* Caller must hold i_sem on dir_dentry->d_inode */
+static int filter_destroy_internal(struct obd_device *obd,
+ struct dentry *dir_dentry,
+ struct dentry *object_dentry)
{
- struct filter_obd *filter = &obddev->u.filter;
+ struct obd_run_ctxt saved;
+ struct inode *inode = object_dentry->d_inode;
+ int rc;
- return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+ if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
+ CERROR("destroying objid %*s nlink = %d, count = %d\n",
+ object_dentry->d_name.len,
+ object_dentry->d_name.name,
+ inode->i_nlink, atomic_read(&inode->i_count));
+ }
+
+ push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
+ /* XXX unlink from PENDING directory now too */
+ pop_ctxt(&saved);
+
+ if (rc)
+ CERROR("error unlinking objid %*s: rc %d\n",
+ object_dentry->d_name.len,
+ object_dentry->d_name.name, rc);
+
+ return rc;
}
+static int filter_close_internal(struct obd_device *obd,
+ struct filter_file_data *ffd)
+{
+ struct file *filp = ffd->ffd_file;
+ struct dentry *object_dentry = dget(filp->f_dentry);
+ struct filter_dentry_data *fdd = object_dentry->d_fsdata;
+ int rc, rc2 = 0;
+ ENTRY;
+
+ LASSERT(filp->private_data == ffd);
+ LASSERT(fdd);
+
+ rc = filp_close(filp, 0);
+
+ if (atomic_dec_and_test(&fdd->fdd_open_count) &&
+ fdd->fdd_flags & FILTER_FLAG_DESTROY) {
+ struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
+
+ down(&dir_dentry->d_inode->i_sem);
+ rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
+ if (rc2 && !rc)
+ rc = rc2;
+ up(&dir_dentry->d_inode->i_sem);
+ }
+
+ f_dput(object_dentry);
+ kmem_cache_free(filter_open_cache, ffd);
+
+ RETURN(rc);
+}
/* obd methods */
static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
obd_uuid_t cluuid, struct recovd_obd *recovd,
ptlrpc_recovery_cb_t recover)
{
+ struct obd_export *exp;
int rc;
+
ENTRY;
MOD_INC_USE_COUNT;
rc = class_connect(conn, obd, cluuid);
if (rc)
- MOD_DEC_USE_COUNT;
+ GOTO(out_dec, rc);
+ exp = class_conn2export(conn);
+ LASSERT(exp);
+
+ INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
+ spin_lock_init(&exp->exp_filter_data.fed_lock);
+out:
RETURN(rc);
+
+out_dec:
+ MOD_DEC_USE_COUNT;
+ goto out;
}
static int filter_disconnect(struct lustre_handle *conn)
{
- struct obd_export *export = class_conn2export(conn);
+ struct obd_export *exp = class_conn2export(conn);
+ struct filter_export_data *fed;
int rc;
ENTRY;
- ldlm_cancel_locks_for_export(export);
+ LASSERT(exp);
+ fed = &exp->exp_filter_data;
+ spin_lock(&exp->exp_filter_data);
+ while(!list_empty(&fed->fed_open_head)) {
+ struct filter_file_data *ffd;
+ ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
+ ffd_export_list);
+ list_del(&ffd->ffd_export_list);
+ spin_unlock(&exp->exp_filter_data);
+
+ CERROR("force closing file %*s on disconnect\n",
+ ffd->ffd_file->f_dentry->d_name.len,
+ ffd->ffd_file->f_dentry->d_name.name);
+
+ filter_close_internal(exp->exp_obd, ffd);
+ spin_lock(&exp->exp_filter_data);
+ }
+ spin_unlock(&exp->exp_filter_data);
+
+ ldlm_cancel_locks_for_export(exp);
rc = class_disconnect(conn);
if (!rc)
MOD_DEC_USE_COUNT;
}
/* mount the file system (secretly) */
-static int filter_setup(struct obd_device *obddev, obd_count len, void *buf)
+static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
{
struct obd_ioctl_data* data = buf;
struct filter_obd *filter;
if (IS_ERR(mnt))
GOTO(err_dec, err);
- filter = &obddev->u.filter;;
+ filter = &obd->u.filter;;
filter->fo_vfsmnt = mnt;
filter->fo_fstype = strdup(data->ioc_inlbuf2);
filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
+ CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, filter->fo_vfsmnt);
/* XXX is this even possible if do_kern_mount succeeded? */
if (!filter->fo_sb)
GOTO(err_kfree, err = -ENODEV);
filter->fo_ctxt.pwd = mnt->mnt_root;
filter->fo_ctxt.fs = get_ds();
- err = filter_prep(obddev);
+ err = filter_prep(obd);
if (err)
GOTO(err_kfree, err);
spin_lock_init(&filter->fo_lock);
+ INIT_LIST_HEAD(&filter->fo_export_list);
- obddev->obd_namespace =
+ obd->obd_namespace =
ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
- if (obddev->obd_namespace == NULL)
+ if (obd->obd_namespace == NULL)
LBUG();
ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- "filter_ldlm_client", &obddev->obd_ldlm_client);
+ "filter_ldlm_client", &obd->obd_ldlm_client);
RETURN(0);
}
-static int filter_cleanup(struct obd_device * obddev)
+static int filter_cleanup(struct obd_device *obd)
{
struct super_block *sb;
ENTRY;
- if (!list_empty(&obddev->obd_exports)) {
+ if (!list_empty(&obd->obd_exports)) {
CERROR("still has clients!\n");
- class_disconnect_all(obddev);
- if (!list_empty(&obddev->obd_exports)) {
+ class_disconnect_all(obd);
+ if (!list_empty(&obd->obd_exports)) {
CERROR("still has exports after forced cleanup?\n");
RETURN(-EBUSY);
}
}
- ldlm_namespace_free(obddev->obd_namespace);
+ ldlm_namespace_free(obd->obd_namespace);
- sb = obddev->u.filter.fo_sb;
- if (!obddev->u.filter.fo_sb)
+ sb = obd->u.filter.fo_sb;
+ if (!obd->u.filter.fo_sb)
RETURN(0);
- filter_post(obddev);
+ filter_post(obd);
shrink_dcache_parent(sb->s_root);
unlock_kernel();
- mntput(obddev->u.filter.fo_vfsmnt);
- obddev->u.filter.fo_sb = 0;
- kfree(obddev->u.filter.fo_fstype);
+ mntput(obd->u.filter.fo_vfsmnt);
+ obd->u.filter.fo_sb = 0;
+ kfree(obd->u.filter.fo_fstype);
lock_kernel();
}
-static inline void filter_from_inode(struct obdo *oa, struct inode *inode,
- int valid)
+static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
{
int type = oa->o_mode & S_IFMT;
ENTRY;
EXIT;
}
-static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *md)
+static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
{
- struct obd_device *obddev = class_conn2obd(conn);
- struct dentry *dentry;
- int rc = 0;
+ struct filter_file_data *ffd = NULL;
ENTRY;
- if (!class_conn2export(conn)) {
- CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
- RETURN(-EINVAL);
+ if (!handle || !handle->addr)
+ RETURN(NULL);
+
+ ffd = (struct filter_file_data *)(unsigned long)(handle->addr);
+ if (!kmem_cache_validate(filter_open_cache, (void *)ffd))
+ RETURN(NULL);
+
+ if (ffd->ffd_servercookie != handle->cookie)
+ RETURN(NULL);
+
+ LASSERT(ffd->ffd_file->private_data == ffd);
+ RETURN(ffd);
+}
+
+static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
+ struct obdo *oa, char *what)
+{
+ struct dentry *dentry = NULL;
+
+ if (oa->o_valid & OBD_MD_FLHANDLE) {
+ struct lustre_handle *ost_handle = obdo_handle(oa);
+ struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
+
+ if (ffd)
+ dentry = dget(ffd->ffd_file->f_dentry);
}
- obddev = class_conn2obd(conn);
- dentry = filter_fid2dentry(obddev, filter_parent(obddev, oa->o_mode),
- oa->o_id, oa->o_mode);
- if (IS_ERR(dentry))
- RETURN(PTR_ERR(dentry));
+ if (!dentry) {
+ struct obd_device *obd = class_conn2obd(conn);
+ if (!obd) {
+ CERROR("invalid client "LPX64"\n", conn->addr);
+ RETURN(ERR_PTR(-EINVAL));
+ }
+ dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
+ oa->o_id, oa->o_mode);
+ }
if (!dentry->d_inode) {
- CERROR("getattr on non-existent object: "LPU64"\n", oa->o_id);
- GOTO(out_getattr, rc = -ENOENT);
+ CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
+ f_dput(dentry);
+ dentry = ERR_PTR(-ENOENT);
}
+ return dentry;
+}
+
+#define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__)
+
+static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
+{
+ struct dentry *dentry = NULL;
+ int rc = 0;
+ ENTRY;
+
+ dentry = filter_oa2dentry(conn, oa);
+ if (IS_ERR(dentry))
+ RETURN(PTR_ERR(dentry));
+
filter_from_inode(oa, dentry->d_inode, oa->o_valid);
-out_getattr:
f_dput(dentry);
RETURN(rc);
}
int rc;
ENTRY;
- iattr_from_obdo(&iattr, oa, oa->o_valid);
- iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
- dentry = filter_fid2dentry(obd, filter_parent(obd, iattr.ia_mode),
- oa->o_id, iattr.ia_mode);
+ dentry = filter_oa2dentry(conn, oa);
+
if (IS_ERR(dentry))
RETURN(PTR_ERR(dentry));
+ iattr_from_obdo(&iattr, oa, oa->o_valid);
+ iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
inode = dentry->d_inode;
- if (!inode) {
- CERROR("setattr on non-existent object: "LPU64"\n", oa->o_id);
- GOTO(out_setattr, rc = -ENOENT);
- }
lock_kernel();
if (iattr.ia_valid & ATTR_SIZE)
}
unlock_kernel();
-out_setattr:
f_dput(dentry);
RETURN(rc);
}
struct lov_stripe_md *ea)
{
struct obd_export *export;
- struct obd_device *obd;
- struct dentry *dentry;
- /* ENTRY; */
+ struct lustre_handle *handle;
+ struct filter_file_data *ffd;
+ struct file *filp;
+ int rc = 0;
+ ENTRY;
export = class_conn2export(conn);
if (!export) {
RETURN(-EINVAL);
}
- obd = export->exp_obd;
- dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
- oa->o_id, oa->o_mode);
- if (IS_ERR(dentry))
- RETURN(PTR_ERR(dentry));
-
- if (!dentry->d_inode) {
- CERROR("opening non-existent objid "LPX64"\n", oa->o_id);
- RETURN(-ENOENT);
- }
+ filp = filter_obj_open(export, oa->o_id, oa->o_mode);
+ if (IS_ERR(filp))
+ GOTO(out, rc = PTR_ERR(filp));
- filter_from_inode(oa, dentry->d_inode, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
- OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+ filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
- return 0;
+ ffd = filp->private_data;
+ handle = obdo_handle(oa);
+ handle->addr = (__u64)(unsigned long)ffd;
+ handle->cookie = ffd->ffd_servercookie;
+ oa->o_valid |= OBD_MD_FLHANDLE;
+out:
+ RETURN(rc);
} /* filter_open */
static int filter_close(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *ea)
{
- struct obd_device *obd;
- struct dentry *dentry;
- /* ENTRY; */
+ struct obd_export *exp;
+ struct filter_file_data *ffd;
+ int rc;
+ ENTRY;
- obd = class_conn2obd(conn);
- if (!obd) {
+ exp = class_conn2export(conn);
+ if (!exp) {
CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
RETURN(-EINVAL);
}
- dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
- oa->o_id, oa->o_mode);
- if (IS_ERR(dentry))
- RETURN(PTR_ERR(dentry));
- LASSERT(atomic_read(&dentry->d_count) > 1);
+ if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
+ CERROR("no handle for close of objid "LPX64"\n", oa->o_id);
+ RETURN(-EINVAL);
+ }
- f_dput(dentry); /* for the close */
- f_dput(dentry); /* for this call */
- return 0;
+ ffd = filter_handle2ffd(obdo_handle(oa));
+ if (!ffd) {
+ struct lustre_handle *handle = obdo_handle(oa);
+ CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n",
+ handle->addr, handle->cookie);
+ RETURN(-ESTALE);
+ }
+
+ spin_lock(&exp->exp_filter_data);
+ list_del(&ffd->ffd_export_list);
+ spin_unlock(&exp->exp_filter_data);
+
+ rc = filter_close_internal(exp->exp_obd, ffd);
+
+ RETURN(rc);
} /* filter_close */
static int filter_create(struct lustre_handle* conn, struct obdo *oa,
struct lov_stripe_md **ea)
{
+ struct obd_device *obd = class_conn2obd(conn);
char name[64];
struct obd_run_ctxt saved;
struct dentry *new;
int mode;
- struct obd_device *obd = class_conn2obd(conn);
- struct filter_obd *filter = &obd->u.filter;
struct iattr;
ENTRY;
//filter_id(name, oa->o_id, oa->o_mode);
sprintf(name, LPU64, oa->o_id);
- mode = (oa->o_mode & ~S_IFMT) | S_IFREG;
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- new = simple_mknod(filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT], name,
- mode);
+ new = simple_mknod(filter_parent(obd, oa->o_mode), name, mode);
pop_ctxt(&saved);
if (IS_ERR(new)) {
CERROR("Error mknod obj %s, err %ld\n", name, PTR_ERR(new));
oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
filter_from_inode(oa, new->d_inode, oa->o_valid);
- dput(new);
+ f_dput(new);
return 0;
}
static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *ea)
{
- struct obd_device *obd;
- struct filter_obd *filter;
- struct obd_run_ctxt saved;
- struct inode *inode;
+ struct obd_device *obd = class_conn2obd(conn);
struct dentry *dir_dentry, *object_dentry;
int rc;
ENTRY;
- obd = class_conn2obd(conn);
if (!obd) {
CERROR("invalid client "LPX64"\n", conn->addr);
RETURN(-EINVAL);
dir_dentry = filter_parent(obd, oa->o_mode);
down(&dir_dentry->d_inode->i_sem);
- object_dentry = filter_fid2dentry(obd, dir_dentry, oa->o_id,
- oa->o_mode);
+ object_dentry = filter_oa2dentry(conn, oa);
if (IS_ERR(object_dentry))
GOTO(out, rc = -ENOENT);
- inode = object_dentry->d_inode;
- if (!inode) {
- CERROR("trying to destroy negative inode "LPX64"!\n", oa->o_id);
- GOTO(out, rc = -ENOENT);
- }
-
- if (inode->i_nlink != 1) {
- CERROR("destroying inode with nlink = %d\n", inode->i_nlink);
- LBUG();
- inode->i_nlink = 1;
- }
- inode->i_mode = S_IFREG;
+ if (object_dentry->d_fsdata) {
+ struct filter_dentry_data *fdd = object_dentry->d_fsdata;
- if (atomic_read(&inode->i_count) > 1) {
-#warning FIXME: need to handle open-unlinked case and move to hold dir
- CERROR("inode has count %d\n", atomic_read(&inode->i_count));
+ if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
+ fdd->fdd_flags |= FILTER_FLAG_DESTROY;
+ /* XXX put into PENDING directory in case of crash */
+ }
+ GOTO(out_dput, rc = 0);
}
- filter = &obd->u.filter;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
-
- rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
- pop_ctxt(&saved);
+ rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
+out_dput:
f_dput(object_dentry);
EXIT;
/* NB count and offset are used for punch, but not truncate */
static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *md,
+ struct lov_stripe_md *lsm,
obd_off start, obd_off end)
{
int error;
struct brw_page *pga, brw_callback_t callback,
struct io_cb_data *data)
{
+ struct obd_export *export = class_conn2export(conn);
struct obd_run_ctxt saved;
struct super_block *sb;
int pnum; /* index to pages (bufs) */
unsigned long retval;
int error;
struct file *file;
- struct obd_device *obd = class_conn2obd(conn);
int pg;
ENTRY;
- if (!obd) {
+ if (!export) {
CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
RETURN(-EINVAL);
}
- sb = obd->u.filter.fo_sb;
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ sb = export->exp_obd->u.filter.fo_sb;
+ push_ctxt(&saved, &export->exp_obd->u.filter.fo_ctxt, NULL);
pnum = 0; /* pnum indexes buf 0..num_pages */
- file = filter_obj_open(obd, lsm->lsm_object_id, S_IFREG);
+ file = filter_obj_open(export, lsm->lsm_object_id, S_IFREG);
if (IS_ERR(file))
GOTO(out, retval = PTR_ERR(file));
ENTRY;
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ lock_kernel();
journal_save = current->journal_info;
LASSERT(!journal_save);
current->journal_info = private;
+ unlock_kernel();
for (i = 0, o = obj, r = res; i < objcount; i++, o++) {
int j;
for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) {
f_dput(r->dentry);
}
}
+ lock_kernel();
current->journal_info = journal_save;
+ unlock_kernel();
if (!found_locked)
goto out_ctxt;
static int __init obdfilter_init(void)
{
printk(KERN_INFO "Filtering OBD driver v0.001, info@clusterfs.com\n");
+ filter_open_cache = kmem_cache_create("ll_filter_fdata",
+ sizeof(struct filter_file_data),
+ 0, 0, NULL, NULL);
+ if (!filter_open_cache)
+ RETURN(-ENOMEM);
+
+ filter_dentry_cache = kmem_cache_create("ll_filter_dentry",
+ sizeof(struct filter_dentry_data),
+ 0, 0, NULL, NULL);
+ if (!filter_dentry_cache) {
+ kmem_cache_destroy(filter_open_cache);
+ RETURN(-ENOMEM);
+ }
+
return class_register_type(&filter_obd_ops, OBD_FILTER_DEVICENAME);
}
static void __exit obdfilter_exit(void)
{
class_unregister_type(OBD_FILTER_DEVICENAME);
+ if (kmem_cache_destroy(filter_dentry_cache))
+ CERROR("couldn't free obdfilter dentry cache\n");
+ if (kmem_cache_destroy(filter_open_cache))
+ CERROR("couldn't free obdfilter open cache\n");
}
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");