#include <linux/mount.h>
#endif
-static kmem_cache_t *filter_open_cache;
-static kmem_cache_t *filter_dentry_cache;
+enum {
+ LPROC_FILTER_READS = 0,
+ LPROC_FILTER_READ_BYTES = 1,
+ LPROC_FILTER_WRITES = 2,
+ LPROC_FILTER_WRITE_BYTES = 3,
+ LPROC_FILTER_LAST = LPROC_FILTER_WRITE_BYTES +1
+};
/* should be generic per-obd stats... */
struct xprocfs_io_stat {
snprintf (dirname, sizeof (dirname), "sys/%s", name);
- xprocfs_dir = proc_mkdir ("sys/obdfilter", NULL);
+ xprocfs_dir = proc_mkdir (dirname, NULL);
if (xprocfs_dir == NULL) {
- CERROR ("Can't make dir\n");
+ CERROR ("Can't make procfs dir %s\n", dirname);
return;
}
return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
}
-static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd,
- int error)
+static void filter_ffd_addref(void *ffdp)
+{
+ struct filter_file_data *ffd = ffdp;
+
+ atomic_inc(&ffd->ffd_refcount);
+ CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
+ atomic_read(&ffd->ffd_refcount));
+}
+
+static struct filter_file_data *filter_ffd_new(void)
{
- CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
- last_rcvd, error);
- if (!error && last_rcvd > obd->obd_last_committed)
- obd->obd_last_committed = last_rcvd;
+ struct filter_file_data *ffd;
+
+ OBD_ALLOC(ffd, sizeof *ffd);
+ if (ffd == NULL) {
+ CERROR("out of memory\n");
+ return NULL;
+ }
+
+ atomic_set(&ffd->ffd_refcount, 2);
+
+ INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
+ class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
+
+ return ffd;
}
-void filter_start_transno(struct obd_export *export)
+static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
{
-#ifdef FILTER_TRANSNO_SEM
- struct obd_device * obd = export->exp_obd;
+ struct filter_file_data *ffd = NULL;
ENTRY;
+ LASSERT(handle != NULL);
+ ffd = class_handle2object(handle->cookie);
+ if (ffd != NULL)
+ LASSERT(ffd->ffd_file->private_data == ffd);
+ RETURN(ffd);
+}
- down(&obd->u.filter.fo_transno_sem);
-#endif
+static void filter_ffd_put(struct filter_file_data *ffd)
+{
+ CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
+ atomic_read(&ffd->ffd_refcount) - 1);
+ LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
+ atomic_read(&ffd->ffd_refcount) < 0x5a5a);
+ if (atomic_dec_and_test(&ffd->ffd_refcount)) {
+ LASSERT(list_empty(&ffd->ffd_handle.h_link));
+ OBD_FREE(ffd, sizeof *ffd);
+ }
}
+static void filter_ffd_destroy(struct filter_file_data *ffd)
+{
+ class_handle_unhash(&ffd->ffd_handle);
+ filter_ffd_put(ffd);
+}
+
+static void filter_commit_cb(struct obd_device *obd, __u64 transno, int error)
+{
+ obd_transno_commit_cb(obd, transno, error);
+}
/* Assumes caller has already pushed us into the kernel context. */
int filter_finish_transno(struct obd_export *export, void *handle,
struct obd_trans_info *oti, int rc)
ssize_t written;
/* Propagate error code. */
- if (rc) {
-#ifdef FILTER_TRANSNO_SEM
- up(&filter->fo_transno_sem);
-#endif
+ if (rc)
RETURN(rc);
- }
- if (!(obd->obd_flags & OBD_REPLAYABLE)) {
- RETURN(0);
- }
+ if (!obd->obd_replayable)
+ RETURN(rc);
/* we don't allocate new transnos for replayed requests */
#if 0
off = fed->fed_lr_off;
-#ifndef FILTER_TRANSNO_SEM
spin_lock(&filter->fo_translock);
-#endif
last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
-#ifndef FILTER_TRANSNO_SEM
spin_unlock(&filter->fo_translock);
-#endif
if (oti)
oti->oti_transno = last_rcvd;
fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
#else
fcd->fcd_last_xid = 0;
#endif
- fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_last_rcvd_cb);
+ fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
&off);
CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
-#ifdef FILTER_TRANSNO_SEM
- up(&filter->fo_transno_sem);
-#endif
if (written == sizeof(*fcd))
RETURN(0);
CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
/* write the pathname into the string */
static char *filter_id(char *buf, struct filter_obd *filter, obd_id id,
- obd_mode mode)
+ obd_mode mode)
{
- if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
+ if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
else
sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode),
static void filter_drelease(struct dentry *dentry)
{
if (dentry->d_fsdata)
- kmem_cache_free(filter_dentry_cache, dentry->d_fsdata);
+ OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
}
struct dentry_operations filter_dops = {
* Otherwise, we have just read the data from the last_rcvd file and
* we know its offset.
*/
-int filter_client_add(struct filter_obd *filter,
+int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
struct filter_export_data *fed, int cl_idx)
{
+ unsigned long *bitmap = filter->fo_last_rcvd_slots;
int new_client = (cl_idx == -1);
- LASSERT(filter->fo_last_rcvd_slots != NULL);
+ LASSERT(bitmap != NULL);
+
+ /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
+ if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
+ RETURN(0);
/* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
* there's no need for extra complication here
*/
if (new_client) {
- cl_idx = find_first_zero_bit(filter->fo_last_rcvd_slots,
- FILTER_LR_MAX_CLIENTS);
+ cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
repeat:
if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
return -ENOMEM;
}
- if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
+ if (test_and_set_bit(cl_idx, bitmap)) {
CERROR("FILTER client %d: found bit is set in bitmap\n",
cl_idx);
- cl_idx = find_next_zero_bit(filter->fo_last_rcvd_slots,
+ cl_idx = find_next_zero_bit(bitmap,
FILTER_LR_MAX_CLIENTS,
cl_idx);
goto repeat;
}
} else {
- if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
+ if (test_and_set_bit(cl_idx, bitmap)) {
CERROR("FILTER client %d: bit already set in bitmap!\n",
cl_idx);
LBUG();
struct obd_run_ctxt saved;
loff_t off = fed->fed_lr_off;
ssize_t written;
+ void *handle;
CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
push_ctxt(&saved, &filter->fo_ctxt, NULL);
- written = lustre_fwrite(filter->fo_rcvd_filp,
+ /* Transaction eeded to fix for bug 1403 */
+ handle = fsfilt_start(obd,
+ filter->fo_rcvd_filp->f_dentry->d_inode,
+ FSFILT_OP_SETATTR);
+ if (IS_ERR(handle)) {
+ written = PTR_ERR(handle);
+ CERROR("unable to start transaction: rc %d\n",
+ (int)written);
+ } else {
+ written = lustre_fwrite(filter->fo_rcvd_filp,
(char *)fed->fed_fcd,
sizeof(*fed->fed_fcd), &off);
+ fsfilt_commit(obd,
+ filter->fo_rcvd_filp->f_dentry->d_inode,
+ handle, 0);
+ }
pop_ctxt(&saved, &filter->fo_ctxt, NULL);
if (written != sizeof(*fed->fed_fcd)) {
return 0;
}
-int filter_client_free(struct obd_export *exp)
+int filter_client_free(struct obd_export *exp, int failover)
{
struct filter_export_data *fed = &exp->exp_filter_data;
struct filter_obd *filter = &exp->exp_obd->u.filter;
if (!fed->fed_fcd)
RETURN(0);
+ if (failover != 0) {
+ OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
+ RETURN(0);
+ }
+
LASSERT(filter->fo_last_rcvd_slots != NULL);
off = fed->fed_lr_off;
sizeof(zero_fcd), &off);
/* XXX: this write gets lost sometimes, unless this sync is here. */
- file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
+ if (written > 0)
+ file_fsync(filter->fo_rcvd_filp,
+ filter->fo_rcvd_filp->f_dentry, 1);
pop_ctxt(&saved, &filter->fo_ctxt, NULL);
if (written != sizeof(zero_fcd)) {
RETURN(-ENOMEM);
filter->fo_fsd = fsd;
- OBD_ALLOC(filter->fo_last_rcvd_slots,
+ OBD_ALLOC(filter->fo_last_rcvd_slots,
FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
if (filter->fo_last_rcvd_slots == NULL) {
OBD_FREE(fsd, sizeof(*fsd));
* the header. If we find clients with higher last_rcvd values
* then those clients may need recovery done.
*/
- if (!(obd->obd_flags & OBD_REPLAYABLE)) {
+ if (!obd->obd_replayable) {
CERROR("%s: recovery support OFF\n", obd->obd_name);
GOTO(out, rc = 0);
}
LPU64"\n", fcd->fcd_uuid, cl_idx,
last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
le64_to_cpu(fcd->fcd_mount_count), mount_count);
- /* disabled until OST recovery is actually working */
-
- if (!exp) {
+ if (exp == NULL) {
+ /* XXX this rc is ignored */
rc = -ENOMEM;
break;
}
sizeof exp->exp_client_uuid.uuid);
fed = &exp->exp_filter_data;
fed->fed_fcd = fcd;
- filter_client_add(filter, fed, cl_idx);
+ filter_client_add(obd, filter, fed, cl_idx);
/* create helper if export init gets more complex */
INIT_LIST_HEAD(&fed->fed_open_head);
spin_lock_init(&fed->fed_lock);
fcd = NULL;
obd->obd_recoverable_clients++;
+ class_export_put(exp);
} else {
CDEBUG(D_INFO,
"discarded client %d UUID '%s' count "LPU64"\n",
if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
- obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
+ obd->obd_last_committed =
+ le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
if (obd->obd_recoverable_clients) {
CERROR("RECOVERY: %d recoverable clients, last_rcvd "
LPU64"\n", obd->obd_recoverable_clients,
le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
- obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
- obd->obd_flags |= OBD_RECOVERING;
+ obd->obd_next_recovery_transno =
+ obd->obd_last_committed + 1;
+ obd->obd_recovering = 1;
}
- if (fcd)
- OBD_FREE(fcd, sizeof(*fcd));
-
}
+ if (fcd)
+ OBD_FREE(fcd, sizeof(*fcd));
+
out:
fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
OBD_FREE(filter->fo_dentry_O_sub,
filter->fo_subdir_count * sizeof(dentry));
err_client:
- class_disconnect_all(obd);
+ class_disconnect_exports(obd, 0);
err_filp:
if (filp_close(file, 0))
CERROR("can't close %s after error\n", LAST_RCVD);
RETURN(dchild);
}
+/* direct cut-n-paste of mds_blocking_ast() */
+int filter_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag)
+{
+ int do_ast;
+ ENTRY;
+
+ if (flag == LDLM_CB_CANCELING) {
+ /* Don't need to do anything here. */
+ RETURN(0);
+ }
+
+ /* XXX layering violation! -phil */
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
+ * such that mds_blocking_ast is called just before l_i_p takes the
+ * ns_lock, then by the time we get the lock, we might not be the
+ * correct blocking function anymore. So check, and return early, if
+ * so. */
+ if (lock->l_blocking_ast != filter_blocking_ast) {
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ RETURN(0);
+ }
+
+ lock->l_flags |= LDLM_FL_CBPENDING;
+ do_ast = (!lock->l_readers && !lock->l_writers);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+ if (do_ast) {
+ struct lustre_handle lockh;
+ int rc;
+
+ LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
+ ldlm_lock2handle(lock, &lockh);
+ rc = ldlm_cli_cancel(&lockh);
+ if (rc < 0)
+ CERROR("ldlm_cli_cancel: %d\n", rc);
+ } else {
+ LDLM_DEBUG(lock, "Lock still has references, will be "
+ "cancelled later");
+ }
+ RETURN(0);
+}
+
+static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
+ int lock_mode, struct lustre_handle *lockh)
+{
+ struct ldlm_res_id res_id = { .name = {0} };
+ int flags = 0, rc;
+ ENTRY;
+
+ res_id.name[0] = de->d_inode->i_ino;
+ res_id.name[1] = de->d_inode->i_generation;
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
+ res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+ &flags, ldlm_completion_ast,
+ filter_blocking_ast, NULL, lockh);
+
+ RETURN(rc == ELDLM_OK ? 0 : -ENOLCK); /* XXX translate ldlm code */
+}
+
static inline struct dentry *filter_parent(struct obd_device *obd,
obd_mode mode, obd_id objid)
{
struct filter_obd *filter = &obd->u.filter;
- LASSERT((mode & S_IFMT) == S_IFREG); /* only regular files for now */
- if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
+ LASSERT(S_ISREG(mode)); /* only regular files for now */
+ if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
}
+static inline struct dentry *filter_parent_lock(struct obd_device *obd,
+ obd_mode mode, obd_id objid,
+ int lock_mode,
+ struct lustre_handle *lockh)
+{
+ struct dentry *de = filter_parent(obd, mode, objid);
+ int rc;
+
+ if (IS_ERR(de))
+ return de;
+
+ rc = filter_lock_dentry(obd, de, lock_mode, lockh);
+ return rc ? ERR_PTR(rc) : de;
+}
+
static struct file *filter_obj_open(struct obd_export *export,
- __u64 id, __u32 type)
+ __u64 id, __u32 type, int parent_mode,
+ struct lustre_handle *parent_lockh)
{
- struct filter_obd *filter = &export->exp_obd->u.filter;
+ struct obd_device *obd = export->exp_obd;
+ struct filter_obd *filter = &obd->u.filter;
struct super_block *sb = filter->fo_sb;
- struct dentry *dentry;
+ struct dentry *dchild = NULL, *parent;
struct filter_export_data *fed = &export->exp_filter_data;
- struct filter_dentry_data *fdd;
- struct filter_file_data *ffd;
+ struct filter_dentry_data *fdd = NULL;
+ struct filter_file_data *ffd = NULL;
struct obd_run_ctxt saved;
char name[24];
struct file *file;
+ int len, cleanup_phase = 0;
ENTRY;
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
+
if (!sb || !sb->s_dev) {
CERROR("fatal: device not initialized.\n");
- RETURN(ERR_PTR(-ENXIO));
+ GOTO(cleanup, file = ERR_PTR(-ENXIO));
}
if (!id) {
CERROR("fatal: invalid obdo "LPU64"\n", id);
- RETURN(ERR_PTR(-ESTALE));
+ GOTO(cleanup, file = ERR_PTR(-ESTALE));
}
if (!(type & S_IFMT)) {
CERROR("OBD %s, object "LPU64" has bad type: %o\n",
__FUNCTION__, id, type);
- RETURN(ERR_PTR(-EINVAL));
+ GOTO(cleanup, file = ERR_PTR(-EINVAL));
}
- PORTAL_SLAB_ALLOC(ffd, filter_open_cache, sizeof(*ffd));
- if (!ffd) {
+ ffd = filter_ffd_new();
+ if (ffd == NULL) {
CERROR("obdfilter: out of memory\n");
- RETURN(ERR_PTR(-ENOMEM));
+ GOTO(cleanup, file = ERR_PTR(-ENOMEM));
}
+ cleanup_phase = 1;
+
/* We preallocate this to avoid blocking while holding fo_fddlock */
- fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL);
- if (!fdd) {
+ OBD_ALLOC(fdd, sizeof *fdd);
+ if (fdd == NULL) {
CERROR("obdfilter: out of memory\n");
- GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
+ GOTO(cleanup, file = ERR_PTR(-ENOMEM));
}
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- file = filp_open(filter_id(name, filter, id, type),
- O_RDWR | O_LARGEFILE, type);
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ cleanup_phase = 2;
+
+ parent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
+ if (IS_ERR(parent))
+ GOTO(cleanup, file = (void *)parent);
+
+ cleanup_phase = 3;
+
+ len = snprintf(name, sizeof(name), LPU64, id);
+ dchild = lookup_one_len(name, parent, len);
+ if (IS_ERR(dchild))
+ GOTO(cleanup, file = (void *)dchild);
+ LASSERT(dchild->d_inode);
+ cleanup_phase = 4;
+
+ /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
+ mntget(filter->fo_vfsmnt);
+ file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
if (IS_ERR(file)) {
+ dchild = NULL; /* prevent a double dput in step 4 */
CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
- GOTO(out_fdd, file);
+ GOTO(cleanup, file);
}
- dentry = file->f_dentry;
spin_lock(&filter->fo_fddlock);
- if (dentry->d_fsdata) {
+ if (dchild->d_fsdata) {
spin_unlock(&filter->fo_fddlock);
- kmem_cache_free(filter_dentry_cache, fdd);
- fdd = dentry->d_fsdata;
- LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
+ OBD_FREE(fdd, sizeof *fdd);
+ fdd = dchild->d_fsdata;
/* should only happen during client recovery */
if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
fdd->fdd_flags = 0;
fdd->fdd_objid = id;
/* If this is racy, then we can use {cmp}xchg and atomic_add */
- dentry->d_fsdata = fdd;
+ dchild->d_fsdata = fdd;
spin_unlock(&filter->fo_fddlock);
}
- get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
ffd->ffd_file = file;
LASSERT(file->private_data == NULL);
file->private_data = ffd;
- if (!dentry->d_op)
- dentry->d_op = &filter_dops;
+ if (!dchild->d_op)
+ dchild->d_op = &filter_dops;
else
- LASSERT(dentry->d_op == &filter_dops);
+ LASSERT(dchild->d_op == &filter_dops);
spin_lock(&fed->fed_lock);
list_add(&ffd->ffd_export_list, &fed->fed_open_head);
spin_unlock(&fed->fed_lock);
CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
- EXIT;
-out:
- return file;
-
-out_fdd:
- kmem_cache_free(filter_dentry_cache, fdd);
-out_ffd:
- ffd->ffd_servercookie = DEAD_HANDLE_MAGIC;
- PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
- goto out;
+cleanup:
+ switch (cleanup_phase) {
+ case 4:
+ if (IS_ERR(file))
+ l_dput(dchild);
+ case 3:
+ if (IS_ERR(file))
+ ldlm_lock_decref(parent_lockh, parent_mode);
+ case 2:
+ if (IS_ERR(file))
+ OBD_FREE(fdd, sizeof *fdd);
+ case 1:
+ if (IS_ERR(file))
+ filter_ffd_destroy(ffd);
+ filter_ffd_put(ffd);
+ case 0:
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ }
+ RETURN(file);
}
/* Caller must hold i_sem on dir_dentry->d_inode */
RETURN(rc);
}
+/* If closing because we are failing this device, then
+ don't do the unlink on close.
+*/
static int filter_close_internal(struct obd_export *export,
struct filter_file_data *ffd,
- struct obd_trans_info *oti)
+ struct obd_trans_info *oti,
+ int failover)
{
struct obd_device *obd = export->exp_obd;
struct filter_obd *filter = &obd->u.filter;
struct file *filp = ffd->ffd_file;
struct dentry *object_dentry = dget(filp->f_dentry);
struct filter_dentry_data *fdd = object_dentry->d_fsdata;
- int rc, rc2;
+ struct lustre_handle parent_lockh;
+ int rc, rc2, cleanup_phase = 0;
+ struct dentry *dir_dentry;
+ struct obd_run_ctxt saved;
ENTRY;
LASSERT(filp->private_data == ffd);
rc = filp_close(filp, 0);
if (atomic_dec_and_test(&fdd->fdd_open_count) &&
- fdd->fdd_flags & FILTER_FLAG_DESTROY) {
- struct dentry *dir_dentry = filter_parent(obd, S_IFREG, fdd->fdd_objid);
- struct obd_run_ctxt saved;
+ fdd->fdd_flags & FILTER_FLAG_DESTROY && !failover) {
void *handle;
- down(&dir_dentry->d_inode->i_sem);
push_ctxt(&saved, &filter->fo_ctxt, NULL);
- filter_start_transno(export);
+ cleanup_phase = 1;
+
+ dir_dentry = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
+ LCK_PW, &parent_lockh);
+ if (IS_ERR(dir_dentry))
+ GOTO(cleanup, rc = PTR_ERR(dir_dentry));
+ cleanup_phase = 2;
+
handle = fsfilt_start(obd, dir_dentry->d_inode,
FSFILT_OP_UNLINK);
- if (IS_ERR(handle)) {
- rc = filter_finish_transno(export, handle, oti,
- PTR_ERR(handle));
- GOTO(out, rc);
- }
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
+
/* XXX unlink from PENDING directory now too */
rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
if (rc2 && !rc)
rc = rc2;
rc = filter_finish_transno(export, handle, oti, rc);
- rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
+ rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
if (rc2) {
CERROR("error on commit, err = %d\n", rc2);
if (!rc)
rc = rc2;
}
- out:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- up(&dir_dentry->d_inode->i_sem);
}
- f_dput(object_dentry);
- PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
+cleanup:
+ switch(cleanup_phase) {
+ case 2:
+ if (rc || oti == NULL) {
+ ldlm_lock_decref(&parent_lockh, LCK_PW);
+ } else {
+ memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
+ sizeof(parent_lockh));
+ oti->oti_ack_locks[0].mode = LCK_PW;
+ }
+ case 1:
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ case 0:
+ f_dput(object_dentry);
+ filter_ffd_destroy(ffd);
+ break;
+ default:
+ CERROR("invalid cleanup_phase %d\n", cleanup_phase);
+ LBUG();
+ }
RETURN(rc);
}
if (IS_ERR(mnt))
GOTO(err_ops, rc);
-#if OST_RECOVERY
- obd->obd_flags |= OBD_REPLAYABLE;
-#endif
+ if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
+ if (*data->ioc_inlbuf3 == 'f') {
+ obd->obd_replayable = 1;
+ obd_sync_filter = 1;
+ CERROR("%s: configured for recovery and sync write\n",
+ obd->obd_name);
+ } else {
+ CERROR("unrecognised flag '%c'\n",
+ *data->ioc_inlbuf3);
+ }
+ }
filter = &obd->u.filter;
filter->fo_vfsmnt = mnt;
if (rc)
GOTO(err_kfree, rc);
-#ifdef FILTER_TRANSNO_SEM
- init_MUTEX(&filter->fo_transno_sem);
-#else
spin_lock_init(&filter->fo_translock);
-#endif
spin_lock_init(&filter->fo_fddlock);
spin_lock_init(&filter->fo_objidlock);
INIT_LIST_HEAD(&filter->fo_export_list);
static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
{
- return filter_common_setup(obd, len, buf, NULL);
+ struct obd_ioctl_data* data = buf;
+ char *option = NULL;
+
+ if (!strcmp(data->ioc_inlbuf2, "ext3"))
+ option = "asyncdel";
+
+ return filter_common_setup(obd, len, buf, option);
}
/* sanobd setup methods - use a specific mount option */
RETURN(-EINVAL);
/* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
- if (!strcmp(data->ioc_inlbuf2, "extN") ||
- !strcmp(data->ioc_inlbuf2, "ext3"))
+ if (!strcmp(data->ioc_inlbuf2, "extN"))
option = "data=writeback";
+ else if (!strcmp(data->ioc_inlbuf2, "ext3"))
+ option = "data=writeback,asyncdel";
else
LBUG(); /* just a reminder */
return filter_common_setup(obd, len, buf, option);
}
-static int filter_cleanup(struct obd_device *obd)
+static int filter_cleanup(struct obd_device *obd, int force, int failover)
{
struct super_block *sb;
ENTRY;
+ if (failover)
+ CERROR("%s: shutting down for failover; client state will"
+ " be preserved.\n", obd->obd_name);
+
if (!list_empty(&obd->obd_exports)) {
- CERROR("still has clients!\n");
- class_disconnect_all(obd);
+ CERROR("%s: still has clients!\n", obd->obd_name);
+ class_disconnect_exports(obd, failover);
if (!list_empty(&obd->obd_exports)) {
CERROR("still has exports after forced cleanup?\n");
RETURN(-EBUSY);
shrink_dcache_parent(sb->s_root);
unlock_kernel();
+
+ if (atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count) > 1){
+ CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
+ atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count));
+ }
+
mntput(obd->u.filter.fo_vfsmnt);
obd->u.filter.fo_sb = 0;
+/* destroy_buffers(obd->u.filter.fo_sb->s_dev);*/
+
kfree(obd->u.filter.fo_fstype);
fsfilt_put_ops(obd->obd_fsops);
int filter_attach(struct obd_device *dev, obd_count len, void *data)
{
struct lprocfs_static_vars lvars;
+ struct lprocfs_counters* cntrs;
+ int rc;
lprocfs_init_vars(&lvars);
- return lprocfs_obd_attach(dev, lvars.obd_vars);
+ rc = lprocfs_obd_attach(dev, lvars.obd_vars);
+ if (rc != 0)
+ return rc;
+
+ rc = lprocfs_alloc_obd_counters(dev, LPROC_FILTER_LAST);
+ if (rc != 0)
+ return rc;
+
+ /* Init obdfilter private counters here */
+ cntrs = dev->counters;
+ LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_READS],
+ 0, NULL, "read", "reqs");
+ LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_READ_BYTES],
+ LPROCFS_CNTR_AVGMINMAX,
+ NULL, "read_bytes", "bytes");
+ LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_WRITES],
+ 0, NULL, "write", "reqs");
+
+ LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_WRITE_BYTES],
+ LPROCFS_CNTR_AVGMINMAX,
+ NULL, "write_bytes", "bytes");
+ return rc;
}
int filter_detach(struct obd_device *dev)
{
+ lprocfs_free_obd_counters(dev);
return lprocfs_obd_detach(dev);
}
/* nearly identical to mds_connect */
static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
- struct obd_uuid *cluuid, struct recovd_obd *recovd,
- ptlrpc_recovery_cb_t recover)
+ struct obd_uuid *cluuid)
{
struct obd_export *exp;
struct filter_export_data *fed;
LASSERT(exp);
fed = &exp->exp_filter_data;
+ class_export_put(exp);
INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
spin_lock_init(&exp->exp_filter_data.fed_lock);
- if (!(obd->obd_flags & OBD_REPLAYABLE))
+ if (!obd->obd_replayable)
RETURN(0);
OBD_ALLOC(fcd, sizeof(*fcd));
fed->fed_fcd = fcd;
fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
- rc = filter_client_add(filter, fed, -1);
+ rc = filter_client_add(obd, filter, fed, -1);
if (rc)
GOTO(out_fcd, rc);
out_fcd:
OBD_FREE(fcd, sizeof(*fcd));
out_export:
- class_disconnect(conn);
+ class_disconnect(conn, 0);
RETURN(rc);
}
-/* also incredibly similar to mds_disconnect */
-static int filter_disconnect(struct lustre_handle *conn)
+static void filter_destroy_export(struct obd_export *exp)
{
- struct obd_export *exp = class_conn2export(conn);
- struct filter_export_data *fed;
- int rc;
- ENTRY;
+ struct filter_export_data *fed = &exp->exp_filter_data;
- LASSERT(exp);
- fed = &exp->exp_filter_data;
+ ENTRY;
spin_lock(&fed->fed_lock);
while (!list_empty(&fed->fed_open_head)) {
struct filter_file_data *ffd;
CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
ffd->ffd_file->f_dentry->d_name.len,
ffd->ffd_file->f_dentry->d_name.name,
- ffd, ffd->ffd_servercookie);
+ ffd, ffd->ffd_handle.h_cookie);
- filter_close_internal(exp, ffd, NULL);
+ filter_close_internal(exp, ffd, NULL, exp->exp_failover);
spin_lock(&fed->fed_lock);
}
spin_unlock(&fed->fed_lock);
+ if (exp->exp_obd->obd_replayable)
+ filter_client_free(exp, exp->exp_failover);
+ EXIT;
+}
+
+/* also incredibly similar to mds_disconnect */
+static int filter_disconnect(struct lustre_handle *conn, int failover)
+{
+ struct obd_export *exp = class_conn2export(conn);
+ int rc;
+ unsigned long flags;
+ ENTRY;
+
+ LASSERT(exp);
ldlm_cancel_locks_for_export(exp);
- if (exp->exp_obd->obd_flags & OBD_REPLAYABLE)
- filter_client_free(exp);
+ spin_lock_irqsave(&exp->exp_lock, flags);
+ exp->exp_failover = failover;
+ spin_unlock_irqrestore(&exp->exp_lock, flags);
- rc = class_disconnect(conn);
+ rc = class_disconnect(conn, failover);
+ fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
+ class_export_put(exp);
/* XXX cleanup preallocated inodes */
RETURN(rc);
}
EXIT;
}
-static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
-{
- struct filter_file_data *ffd = NULL;
- ENTRY;
-
- if (!handle || !handle->addr)
- RETURN(NULL);
-
- ffd = (struct filter_file_data *)(unsigned long)(handle->addr);
- if (!kmem_cache_validate(filter_open_cache, (void *)ffd))
- RETURN(NULL);
-
- if (ffd->ffd_servercookie != handle->cookie)
- RETURN(NULL);
-
- LASSERT(ffd->ffd_file->private_data == ffd);
- RETURN(ffd);
-}
-
static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
struct obdo *oa, int locked,char *what)
{
struct lustre_handle *ost_handle = obdo_handle(oa);
struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
- if (ffd)
+ if (ffd != NULL) {
dentry = dget(ffd->ffd_file->f_dentry);
+ filter_ffd_put(ffd);
+ }
}
if (!dentry) {
struct obd_device *obd = class_conn2obd(conn);
if (!obd) {
- CERROR("invalid client "LPX64"\n", conn->addr);
+ CERROR("invalid client cookie "LPX64"\n", conn->cookie);
RETURN(ERR_PTR(-EINVAL));
}
dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode,
if (!dentry->d_inode) {
CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
f_dput(dentry);
- LBUG();
RETURN(ERR_PTR(-ENOENT));
}
dentry = filter_oa2dentry(conn, oa, 0);
if (IS_ERR(dentry))
- RETURN(PTR_ERR(dentry));
+ GOTO(out_exp, rc = PTR_ERR(dentry));
iattr_from_obdo(&iattr, oa, oa->o_valid);
iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
if (iattr.ia_valid & ATTR_SIZE)
down(&inode->i_sem);
- filter_start_transno(export);
handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
- if (IS_ERR(handle)) {
- rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
- GOTO(out_unlock, rc);
- }
+ if (IS_ERR(handle))
+ GOTO(out_unlock, rc = PTR_ERR(handle));
if (inode->i_op->setattr)
rc = inode->i_op->setattr(dentry, &iattr);
else
rc = inode_setattr(inode, &iattr);
rc = filter_finish_transno(export, handle, oti, rc);
- rc2 = fsfilt_commit(obd, dentry->d_inode, handle);
+ rc2 = fsfilt_commit(obd, dentry->d_inode, handle, 0);
if (rc2) {
CERROR("error on commit, err = %d\n", rc2);
if (!rc)
pop_ctxt(&saved, &filter->fo_ctxt, NULL);
f_dput(dentry);
+ out_exp:
+ class_export_put(export);
RETURN(rc);
}
static int filter_open(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea, struct obd_trans_info *oti)
+ struct lov_stripe_md *ea, struct obd_trans_info *oti,
+ struct obd_client_handle *och)
{
struct obd_export *export;
struct lustre_handle *handle;
struct filter_file_data *ffd;
struct file *filp;
+ struct lustre_handle parent_lockh;
int rc = 0;
ENTRY;
export = class_conn2export(conn);
if (!export) {
- CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
- RETURN(-EINVAL);
+ CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ conn->cookie);
+ GOTO(out, rc = -EINVAL);
}
XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
- filp = filter_obj_open(export, oa->o_id, oa->o_mode);
+ filp = filter_obj_open(export, oa->o_id, oa->o_mode,
+ LCK_PR, &parent_lockh);
if (IS_ERR(filp))
GOTO(out, rc = PTR_ERR(filp));
ffd = filp->private_data;
handle = obdo_handle(oa);
- handle->addr = (__u64)(unsigned long)ffd;
- handle->cookie = ffd->ffd_servercookie;
+ handle->cookie = ffd->ffd_handle.h_cookie;
oa->o_valid |= OBD_MD_FLHANDLE;
- EXIT;
+
out:
- return rc;
-} /* filter_open */
+ class_export_put(export);
+ if (!rc) {
+ memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
+ sizeof(parent_lockh));
+ oti->oti_ack_locks[0].mode = LCK_PR;
+ }
+ RETURN(rc);
+}
static int filter_close(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
- struct obd_export *exp;
+ struct obd_export *exp = class_conn2export(conn);
struct filter_file_data *ffd;
struct filter_export_data *fed;
int rc;
ENTRY;
- exp = class_conn2export(conn);
if (!exp) {
- CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
- RETURN(-EINVAL);
+ CDEBUG(D_IOCTL, "invalid client cookie"LPX64"\n", conn->cookie);
+ GOTO(out, rc = -EINVAL);
}
XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
- RETURN(-EINVAL);
+ GOTO(out, rc = -EINVAL);
}
ffd = filter_handle2ffd(obdo_handle(oa));
- if (!ffd) {
- struct lustre_handle *handle = obdo_handle(oa);
- CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n",
- handle->addr, handle->cookie);
- RETURN(-ESTALE);
+ if (ffd == NULL) {
+ CERROR("bad handle ("LPX64") for close\n",
+ obdo_handle(oa)->cookie);
+ GOTO(out, rc = -ESTALE);
}
fed = &exp->exp_filter_data;
list_del(&ffd->ffd_export_list);
spin_unlock(&fed->fed_lock);
- rc = filter_close_internal(exp, ffd, oti);
-
- RETURN(rc);
-} /* filter_close */
+ rc = filter_close_internal(exp, ffd, oti, 0);
+ filter_ffd_put(ffd);
+ GOTO(out, rc);
+ out:
+ class_export_put(exp);
+ return rc;
+}
static int filter_create(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
- struct obd_export *export = class_conn2export(conn);
+ struct obd_export *export;
struct obd_device *obd = class_conn2obd(conn);
struct filter_obd *filter = &obd->u.filter;
struct obd_run_ctxt saved;
struct dentry *dir_dentry;
- struct dentry *new;
+ struct lustre_handle parent_lockh;
+ struct dentry *new = NULL;
struct iattr;
void *handle;
- int err, rc;
+ int err, rc, cleanup_phase;
ENTRY;
if (!obd) {
- CERROR("invalid client "LPX64"\n", conn->addr);
- return -EINVAL;
+ CERROR("invalid client cookie "LPX64"\n", conn->cookie);
+ RETURN(-EINVAL);
}
+ export = class_conn2export(conn);
XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
oa->o_id = filter_next_id(obd);
push_ctxt(&saved, &filter->fo_ctxt, NULL);
- dir_dentry = filter_parent(obd, S_IFREG, oa->o_id);
- down(&dir_dentry->d_inode->i_sem);
+ retry:
+ cleanup_phase = 0;
+ dir_dentry = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
+ &parent_lockh);
+ if (IS_ERR(dir_dentry))
+ GOTO(cleanup, rc = PTR_ERR(dir_dentry));
+ cleanup_phase = 1;
+
new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
if (IS_ERR(new))
- GOTO(out, rc = PTR_ERR(new));
-
+ GOTO(cleanup, rc = PTR_ERR(new));
if (new->d_inode) {
char buf[32];
/* This would only happen if lastobjid was bad on disk */
- CERROR("objid %s already exists\n",
- filter_id(buf, filter, oa->o_mode, oa->o_id));
- LBUG();
- GOTO(out, rc = -EEXIST);
+ CERROR("Serious error: objid %s already exists; is this "
+ "filesystem corrupt? I will try to work around it.\n",
+ filter_id(buf, filter, oa->o_id, oa->o_mode));
+ f_dput(new);
+ ldlm_lock_decref(&parent_lockh, LCK_PW);
+ oa->o_id = filter_next_id(obd);
+ goto retry;
}
- filter_start_transno(export);
+ cleanup_phase = 2;
handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
- if (IS_ERR(handle)) {
- rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
- GOTO(out_put, rc);
- }
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
+
rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
if (rc)
CERROR("create failed rc = %d\n", rc);
if (!rc)
rc = err;
}
- err = fsfilt_commit(obd, dir_dentry->d_inode, handle);
+ err = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
if (err) {
CERROR("error on commit, err = %d\n", err);
if (!rc)
}
if (rc)
- GOTO(out_put, rc);
+ GOTO(cleanup, rc);
/* Set flags for fields we have set in the inode struct */
oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
filter_from_inode(oa, new->d_inode, oa->o_valid);
EXIT;
-out_put:
- f_dput(new);
-out:
- up(&dir_dentry->d_inode->i_sem);
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- return rc;
+cleanup:
+ switch(cleanup_phase) {
+ case 2:
+ f_dput(new);
+ case 1: /* locked parent dentry */
+ if (rc || oti == NULL) {
+ ldlm_lock_decref(&parent_lockh, LCK_PW);
+ } else {
+ memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
+ sizeof(parent_lockh));
+ oti->oti_ack_locks[0].mode = LCK_PW;
+ }
+ case 0:
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ class_export_put(export);
+ break;
+ default:
+ CERROR("invalid cleanup_phase %d\n", cleanup_phase);
+ LBUG();
+ }
+
+ RETURN(rc);
}
static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
- struct obd_export *export = class_conn2export(conn);
+ struct obd_export *export;
struct obd_device *obd = class_conn2obd(conn);
struct filter_obd *filter = &obd->u.filter;
- struct dentry *dir_dentry, *object_dentry;
+ struct dentry *dir_dentry, *object_dentry = NULL;
struct filter_dentry_data *fdd;
struct obd_run_ctxt saved;
- void *handle;
- int rc, rc2;
+ void *handle = NULL;
+ struct lustre_handle parent_lockh;
+ int rc, rc2, cleanup_phase = 0;
ENTRY;
if (!obd) {
- CERROR("invalid client "LPX64"\n", conn->addr);
+ CERROR("invalid client cookie "LPX64"\n", conn->cookie);
RETURN(-EINVAL);
}
+ export = class_conn2export(conn);
XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
- dir_dentry = filter_parent(obd, oa->o_mode, oa->o_id);
- down(&dir_dentry->d_inode->i_sem);
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ dir_dentry = filter_parent_lock(obd, oa->o_mode, oa->o_id,
+ LCK_PW, &parent_lockh);
+ if (IS_ERR(dir_dentry))
+ GOTO(cleanup, rc = PTR_ERR(dir_dentry));
+ cleanup_phase = 1;
object_dentry = filter_oa2dentry(conn, oa, 0);
if (IS_ERR(object_dentry))
- GOTO(out, rc = -ENOENT);
+ GOTO(cleanup, rc = -ENOENT);
+ cleanup_phase = 2;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- filter_start_transno(export);
handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
- if (IS_ERR(handle)) {
- rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
- GOTO(out_ctxt, rc);
- }
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
+ cleanup_phase = 3;
fdd = object_dentry->d_fsdata;
if (fdd && atomic_read(&fdd->fdd_open_count)) {
CDEBUG(D_INODE,
"repeat destroy of %dx open objid "LPU64"\n",
atomic_read(&fdd->fdd_open_count), oa->o_id);
- GOTO(out_commit, rc = 0);
+ GOTO(cleanup, rc = 0);
}
rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
-out_commit:
- /* XXX save last_rcvd on disk */
- rc = filter_finish_transno(export, handle, oti, rc);
- rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
- if (rc2) {
- CERROR("error on commit, err = %d\n", rc2);
- if (!rc)
- rc = rc2;
+cleanup:
+ switch(cleanup_phase) {
+ case 3:
+ rc = filter_finish_transno(export, handle, oti, rc);
+ rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
+ if (rc2) {
+ CERROR("error on commit, err = %d\n", rc2);
+ if (!rc)
+ rc = rc2;
+ }
+ case 2:
+ f_dput(object_dentry);
+ case 1:
+ if (rc || oti == NULL) {
+ ldlm_lock_decref(&parent_lockh, LCK_PW);
+ } else {
+ memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
+ sizeof(parent_lockh));
+ oti->oti_ack_locks[0].mode = LCK_PW;
+ }
+ case 0:
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ class_export_put(export);
+ break;
+ default:
+ CERROR("invalid cleanup_phase %d\n", cleanup_phase);
+ LBUG();
}
-out_ctxt:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- f_dput(object_dentry);
- EXIT;
-out:
- up(&dir_dentry->d_inode->i_sem);
- return rc;
+ RETURN(rc);
}
/* NB start and end are used for punch, but not truncate */
XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
if (end != OBD_OBJECT_EOF)
- CERROR("PUNCH not supported, only truncate works\n");
+ CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
+ end);
CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
"o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
static inline void lustre_put_page(struct page *page)
{
- kunmap(page);
page_cache_release(page);
}
-
-static struct page *
-lustre_get_page_read(struct inode *inode, struct niobuf_local *lnb)
+static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
{
- unsigned long index = lnb->offset >> PAGE_SHIFT;
struct address_space *mapping = inode->i_mapping;
struct page *page;
+ unsigned long index = lnb->offset >> PAGE_SHIFT;
int rc;
- page = read_cache_page(mapping, index,
- (filler_t*)mapping->a_ops->readpage, NULL);
- if (!IS_ERR(page)) {
- wait_on_page(page);
- lnb->addr = kmap(page);
- lnb->page = page;
- if (!PageUptodate(page)) {
- CERROR("page index %lu not uptodate\n", index);
- GOTO(err_page, rc = -EIO);
- }
- if (PageError(page)) {
- CERROR("page index %lu has error\n", index);
- GOTO(err_page, rc = -EIO);
- }
+ page = grab_cache_page(mapping, index); /* locked page */
+ if (IS_ERR(page))
+ return lnb->rc = PTR_ERR(page);
+
+ lnb->page = page;
+
+ if (inode->i_size < lnb->offset + lnb->len - 1)
+ lnb->rc = inode->i_size - lnb->offset;
+ else
+ lnb->rc = lnb->len;
+
+ if (PageUptodate(page)) {
+ unlock_page(page);
+ return 0;
+ }
+
+ rc = mapping->a_ops->readpage(NULL, page);
+ if (rc < 0) {
+ CERROR("page index %lu, rc = %d\n", index, rc);
+ lnb->page = NULL;
+ lustre_put_page(page);
+ return lnb->rc = rc;
}
- return page;
+
+ return 0;
+}
+
+static int filter_finish_page_read(struct niobuf_local *lnb)
+{
+ if (lnb->page == NULL)
+ return 0;
+
+ if (PageUptodate(lnb->page))
+ return 0;
+
+ wait_on_page(lnb->page);
+ if (!PageUptodate(lnb->page)) {
+ CERROR("page index %lu/offset "LPX64" not uptodate\n",
+ lnb->page->index, lnb->offset);
+ GOTO(err_page, lnb->rc = -EIO);
+ }
+ if (PageError(lnb->page)) {
+ CERROR("page index %lu/offset "LPX64" has error\n",
+ lnb->page->index, lnb->offset);
+ GOTO(err_page, lnb->rc = -EIO);
+ }
+
+ return 0;
err_page:
- lustre_put_page(page);
- return ERR_PTR(rc);
+ lustre_put_page(lnb->page);
+ lnb->page = NULL;
+ return lnb->rc;
}
-static struct page *
-lustre_get_page_write(struct inode *inode, unsigned long index)
+static struct page *lustre_get_page_write(struct inode *inode,
+ unsigned long index)
{
struct address_space *mapping = inode->i_mapping;
struct page *page;
page = grab_cache_page(mapping, index); /* locked page */
if (!IS_ERR(page)) {
- kmap(page);
/* Note: Called with "O" and "PAGE_SIZE" this is essentially
* a no-op for most filesystems, because we write the whole
* page. For partial-page I/O this will read in the page.
LASSERT(to <= PAGE_SIZE);
err = page->mapping->a_ops->commit_write(NULL, page, from, to);
if (!err && IS_SYNC(inode))
- err = waitfor_one_page(page);
+ waitfor_one_page(page);
//SetPageUptodate(page); // the client commit_write will do this
SetPageReferenced(page);
return err;
}
-struct page *filter_get_page_write(struct inode *inode,
- struct niobuf_local *lnb, int *pglocked)
+int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
+ int *pglocked)
{
unsigned long index = lnb->offset >> PAGE_SHIFT;
struct address_space *mapping = inode->i_mapping;
}
POISON((void *)addr, 0xBA, PAGE_SIZE);
page = virt_to_page(addr);
- kmap(page);
page->index = index;
- lnb->addr = (void *)addr;
lnb->page = page;
lnb->flags |= N_LOCAL_TEMP_PAGE;
} else if (!IS_ERR(page)) {
(*pglocked)++;
- kmap(page);
rc = mapping->a_ops->prepare_write(NULL, page,
lnb->offset & ~PAGE_MASK,
LBUG();
GOTO(err_unlock, rc = -EIO);
}
- lnb->addr = page_address(page);
lnb->page = page;
}
- return page;
+ return 0;
err_unlock:
unlock_page(page);
lustre_put_page(page);
err:
- return ERR_PTR(rc);
+ return lnb->rc = rc;
}
/*
for (bh = head, block_start = 0; bh != head || !block_start;
block_start = block_end, bh = bh->b_this_page) {
block_end = block_start + blocksize;
- if (buffer_new(bh))
- memset(lnb->addr + block_start, 0, blocksize);
+ if (buffer_new(bh)) {
+ memset(kmap(lnb->page) + block_start, 0,
+ blocksize);
+ kunmap(lnb->page);
+ }
}
}
#endif
return lustre_commit_write(lnb);
}
-static int filter_preprw(int cmd, struct lustre_handle *conn,
+static int filter_preprw(int cmd, struct obd_export *export,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_remote *nb,
struct niobuf_local *res, void **desc_private,
struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
- struct obd_export *export;
struct obd_device *obd;
struct obd_ioobj *o;
- struct niobuf_remote *rnb = nb;
- struct niobuf_local *lnb = res;
+ struct niobuf_remote *rnb;
+ struct niobuf_local *lnb;
struct fsfilt_objinfo *fso;
- int pglocked = 0;
- int rc = 0;
- int i;
+ struct dentry *dentry;
+ struct inode *inode;
+ struct lprocfs_counters *cntrs;
+ int pglocked = 0, rc = 0, i, j;
+
ENTRY;
if ((cmd & OBD_BRW_WRITE) != 0)
memset(res, 0, niocount * sizeof(*res));
- export = class_conn2export(conn);
- obd = class_conn2obd(conn);
- if (!obd) {
- CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
+ obd = export->exp_obd;
+ if (obd == NULL)
RETURN(-EINVAL);
- }
- LASSERT(objcount < 16); // theoretically we support multi-obj BRW
+ cntrs = obd->counters;
+ if ((cmd & OBD_BRW_WRITE) != 0)
+ LPROCFS_COUNTER_INCBY1(&cntrs->cntr[LPROC_FILTER_WRITES]);
+ else
+ LPROCFS_COUNTER_INCBY1(&cntrs->cntr[LPROC_FILTER_READS]);
+
+ // theoretically we support multi-obj BRW RPCs, but until then...
+ LASSERT(objcount == 1);
OBD_ALLOC(fso, objcount * sizeof(*fso));
if (!fso)
for (i = 0, o = obj; i < objcount; i++, o++) {
struct filter_dentry_data *fdd;
- struct dentry *dentry;
LASSERT(o->ioo_bufcnt);
o->ioo_id),
o->ioo_id, 0);
- if (IS_ERR(dentry))
+ if (IS_ERR(dentry))
GOTO(out_objinfo, rc = PTR_ERR(dentry));
fso[i].fso_dentry = dentry;
if (!dentry->d_inode) {
CERROR("trying to BRW to non-existent file "LPU64"\n",
o->ioo_id);
- f_dput(dentry);
GOTO(out_objinfo, rc = -ENOENT);
}
+ /* If we ever start to support mutli-object BRW RPCs, we will
+ * need to get locks on mulitple inodes (in order) or use the
+ * DLM to do the locking for us (and use the same locking in
+ * filter_setattr() for truncate). That isn't all, because
+ * there still exists the possibility of a truncate starting
+ * a new transaction while holding the ext3 rwsem = write
+ * while some writes (which have started their transactions
+ * here) blocking on the ext3 rwsem = read => lock inversion.
+ *
+ * The handling gets very ugly when dealing with locked pages.
+ * It may be easier to just get rid of the locked page code
+ * (which has problems of its own) and either discover we do
+ * not need it anymore (i.e. it was a symptom of another bug)
+ * or ensure we get the page locks in an appropriate order.
+ */
+ if (cmd & OBD_BRW_WRITE)
+ down(&dentry->d_inode->i_sem);
fdd = dentry->d_fsdata;
if (!fdd || !atomic_read(&fdd->fdd_open_count))
CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
}
if (cmd & OBD_BRW_WRITE) {
-#warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
- /* Even worse, we need to get locks on mulitple inodes (in
- * order) or use the DLM to do the locking for us (and use
- * the same locking in filter_setattr() for truncate. The
- * handling gets very ugly when dealing with locked pages.
- * It may be easier to just get rid of the locked page code
- * (which has problems of its own) and either discover we do
- * not need it anymore (i.e. it was a symptom of another bug)
- * or ensure we get the page locks in an appropriate order.
- */
- /* Danger, Will Robinson! You are taking a lock here and also
- * starting a transaction and releasing/finishing then in
- * filter_commitrw(), so you must call fsfilt_commit() and
- * finish_transno() if an error occurs in this function.
- */
- filter_start_transno(export);
*desc_private = fsfilt_brw_start(obd, objcount, fso,
niocount, nb);
if (IS_ERR(*desc_private)) {
}
}
- obd_kmap_get(niocount, 1);
-
- for (i = 0, o = obj; i < objcount; i++, o++) {
- struct dentry *dentry;
- struct inode *inode;
- int j;
-
+ for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
dentry = fso[i].fso_dentry;
inode = dentry->d_inode;
for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
- struct page *page;
-
if (j == 0)
lnb->dentry = dentry;
else
lnb->dentry = dget(dentry);
- /* lnb->offset is aligned, while rnb->offset isn't,
- * and we need to copy the fields to lnb anyways.
- */
- memcpy(lnb, rnb, sizeof(*rnb));
+ lnb->offset = rnb->offset;
+ lnb->len = rnb->len;
+ lnb->flags = rnb->flags;
+
if (cmd & OBD_BRW_WRITE) {
- page = filter_get_page_write(inode, lnb,
- &pglocked);
+ rc = filter_get_page_write(inode,lnb,&pglocked);
XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes,
lnb->len);
+ LPROCFS_COUNTER_INCR(&cntrs->cntr[LPROC_FILTER_WRITE_BYTES], lnb->len);
+ } else if (inode->i_size <= rnb->offset) {
+ /* If there's no more data, abort early.
+ * lnb->page == NULL and lnb->rc == 0, so it's
+ * easy to detect later. */
+ f_dput(lnb->dentry);
+ lnb->dentry = NULL;
+ break;
} else {
- page = lustre_get_page_read(inode, lnb);
+ rc = filter_start_page_read(inode, lnb);
XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes,
lnb->len);
+ LPROCFS_COUNTER_INCR(&cntrs->cntr[LPROC_FILTER_READ_BYTES], lnb->len);
}
- if (IS_ERR(page)) {
- rc = PTR_ERR(page);
+ if (rc) {
CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
"error on page @"LPU64"%u/%u: rc = %d\n",
lnb->offset, j, o->ioo_bufcnt, rc);
f_dput(dentry);
GOTO(out_pages, rc);
}
+
+ if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
+ /* Likewise with a partial read */
+ break;
+ }
}
}
+ while ((cmd & OBD_BRW_READ) && lnb-- > res) {
+ rc = filter_finish_page_read(lnb);
+ if (rc) {
+ CERROR("error on page %u@"LPU64": rc = %d\n",
+ lnb->len, lnb->offset, rc);
+ f_dput(lnb->dentry);
+ GOTO(out_pages, rc);
+ }
+ }
EXIT;
out:
OBD_FREE(fso, objcount * sizeof(*fso));
out_pages:
while (lnb-- > res) {
- if (cmd & OBD_BRW_WRITE)
+ if (cmd & OBD_BRW_WRITE) {
filter_commit_write(lnb, rc);
- else
+ up(&lnb->dentry->d_inode->i_sem);
+ } else {
lustre_put_page(lnb->page);
+ }
f_dput(lnb->dentry);
}
- obd_kmap_put(niocount);
if (cmd & OBD_BRW_WRITE) {
filter_finish_transno(export, *desc_private, oti, rc);
fsfilt_commit(obd,
filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
- *desc_private);
+ *desc_private, 0);
}
goto out; /* dropped the dentry refs already (one per page) */
out_objinfo:
- for (i = 0; i < objcount && fso[i].fso_dentry; i++)
+ for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
+ if (cmd & OBD_BRW_WRITE)
+ up(&fso[i].fso_dentry->d_inode->i_sem);
f_dput(fso[i].fso_dentry);
+ }
goto out;
}
static int filter_write_locked_page(struct niobuf_local *lnb)
{
struct page *lpage;
+ void *lpage_addr;
+ void *lnb_addr;
int rc;
ENTRY;
RETURN(rc);
}
- /* lpage is kmapped in lustre_get_page_write() above and kunmapped in
- * lustre_commit_write() below, lnb->page was kmapped previously in
- * filter_get_page_write() and kunmapped in lustre_put_page() below.
- */
- memcpy(page_address(lpage), page_address(lnb->page), PAGE_SIZE);
+ /* 2 kmaps == vanishingly small deadlock opportunity */
+ lpage_addr = kmap(lpage);
+ lnb_addr = kmap(lnb->page);
+
+ memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
+
+ kunmap(lnb->page);
+ kunmap(lpage);
+
lustre_put_page(lnb->page);
lnb->page = lpage;
RETURN(rc);
}
-static int filter_syncfs(struct lustre_handle *conn)
+static int filter_syncfs(struct obd_export *exp)
{
- struct obd_device *obd;
+ struct obd_device *obd = exp->exp_obd;
ENTRY;
- obd = class_conn2obd(conn);
-
XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1);
RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
}
-static int filter_commitrw(int cmd, struct lustre_handle *conn,
+static int filter_commitrw(int cmd, struct obd_export *export,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *res,
void *desc_private, struct obd_trans_info *oti)
struct obd_run_ctxt saved;
struct obd_ioobj *o;
struct niobuf_local *lnb;
- struct obd_export *export = class_conn2export(conn);
- struct obd_device *obd = class_conn2obd(conn);
- int found_locked = 0;
- int rc = 0;
- int i;
+ struct obd_device *obd = export->exp_obd;
+ int found_locked = 0, rc = 0, i;
ENTRY;
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
int j;
- if (cmd & OBD_BRW_WRITE)
+ if (cmd & OBD_BRW_WRITE) {
inode_update_time(lnb->dentry->d_inode, 1);
+ up(&lnb->dentry->d_inode->i_sem);
+ }
for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+ if (lnb->page == NULL) {
+ continue;
+ }
if (lnb->flags & N_LOCAL_TEMP_PAGE) {
found_locked++;
continue;
if (!rc)
rc = err;
- } else
+ } else {
lustre_put_page(lnb->page);
+ }
- obd_kmap_put(1);
f_dput(lnb->dentry);
}
}
for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
- i++, o++) {
+ i++, o++) {
int j;
for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
int err;
continue;
err = filter_write_locked_page(lnb);
- obd_kmap_put(1);
if (!rc)
rc = err;
f_dput(lnb->dentry);
int err;
rc = filter_finish_transno(export, desc_private, oti, rc);
- err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
+ err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private,
+ obd_sync_filter);
if (err)
rc = err;
- if (obd_sync_filter) {
- /* this can fail with ENOMEM, what should we do then? */
- filter_syncfs(conn);
- }
- /* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
+ if (obd_sync_filter)
+ LASSERT(oti->oti_transno <= obd->obd_last_committed);
+
}
LASSERT(!current->journal_info);
static int filter_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *lsm, obd_count oa_bufs,
- struct brw_page *pga, struct obd_brw_set *set,
- struct obd_trans_info *oti)
+ struct brw_page *pga, struct obd_trans_info *oti)
{
+ struct obd_export *export = class_conn2export(conn);
struct obd_ioobj ioo;
struct niobuf_local *lnb;
struct niobuf_remote *rnb;
int ret = 0;
ENTRY;
+ if (export == NULL)
+ RETURN(-EINVAL);
+
OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
ioo.ioo_type = S_IFREG;
ioo.ioo_bufcnt = oa_bufs;
- ret = filter_preprw(cmd, conn, 1, &ioo, oa_bufs, rnb, lnb,
+ ret = filter_preprw(cmd, export, 1, &ioo, oa_bufs, rnb, lnb,
&desc_private, oti);
if (ret != 0)
GOTO(out, ret);
for (i = 0; i < oa_bufs; i++) {
void *virt = kmap(pga[i].pg);
obd_off off = pga[i].off & ~PAGE_MASK;
+ void *addr = kmap(lnb[i].page);
+
+ /* 2 kmaps == vanishingly small deadlock opportunity */
if (cmd & OBD_BRW_WRITE)
- memcpy(lnb[i].addr + off, virt + off, pga[i].count);
+ memcpy(addr + off, virt + off, pga[i].count);
else
- memcpy(virt + off, lnb[i].addr + off, pga[i].count);
+ memcpy(virt + off, addr + off, pga[i].count);
+ kunmap(addr);
kunmap(virt);
}
- ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private,
+ ret = filter_commitrw(cmd, export, 1, &ioo, oa_bufs, lnb, desc_private,
oti);
out:
OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
if (rnb)
OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
+ class_export_put(export);
RETURN(ret);
}
obd = class_conn2obd(conn);
if (!obd) {
- CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
+ CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ conn->cookie);
RETURN(-EINVAL);
}
RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
}
-static int filter_get_info(struct lustre_handle *conn, obd_count keylen,
- void *key, obd_count *vallen, void **val)
+static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
+ void *key, __u32 *vallen, void *val)
{
struct obd_device *obd;
ENTRY;
obd = class_conn2obd(conn);
if (!obd) {
- CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
+ CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ conn->cookie);
RETURN(-EINVAL);
}
- if ( keylen == strlen("blocksize") &&
- memcmp(key, "blocksize", keylen) == 0 ) {
- *vallen = sizeof(long);
- *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize;
+ if (keylen == strlen("blocksize") &&
+ memcmp(key, "blocksize", keylen) == 0) {
+ __u32 *blocksize = val;
+ *vallen = sizeof(*blocksize);
+ *blocksize = obd->u.filter.fo_sb->s_blocksize;
RETURN(0);
}
- if ( keylen == strlen("blocksize_bits") &&
- memcmp(key, "blocksize_bits", keylen) == 0 ){
- *vallen = sizeof(long);
- *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize_bits;
+ if (keylen == strlen("blocksize_bits") &&
+ memcmp(key, "blocksize_bits", keylen) == 0) {
+ __u32 *blocksize_bits = val;
+ *vallen = sizeof(*blocksize_bits);
+ *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
RETURN(0);
}
if (page == NULL)
RETURN(-ENOMEM);
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- while (TryLockPage(page))
- ___wait_on_page(page);
-#else
- wait_on_page_locked(page);
-#endif
+ wait_on_page(page);
/* XXX with brw vector I/O, we could batch up reads and writes here,
* all we need to do is allocate multiple pages to handle the I/Os
*/
while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
struct brw_page pg;
- struct obd_brw_set *set;
-
- set = obd_brw_set_new();
- if (set == NULL) {
- err = -ENOMEM;
- EXIT;
- break;
- }
pg.pg = page;
pg.count = PAGE_SIZE;
pg.flag = 0;
page->index = index;
- set->brw_callback = ll_brw_sync_wait;
- err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
- obd_brw_set_decref(set);
+ err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, NULL);
if (err) {
EXIT;
break;
}
- set = obd_brw_set_new();
- if (set == NULL) {
- err = -ENOMEM;
- EXIT;
- break;
- }
pg.flag = OBD_BRW_CREATE;
CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
- set->brw_callback = ll_brw_sync_wait;
- err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
- obd_brw_set_decref(set);
+ err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, oti);
/* XXX should handle dst->o_size, dst->o_blocks here */
if (err) {
}
static struct obd_ops filter_obd_ops = {
- o_owner: THIS_MODULE,
- o_attach: filter_attach,
- o_detach: filter_detach,
- o_get_info: filter_get_info,
- o_setup: filter_setup,
- o_cleanup: filter_cleanup,
- o_connect: filter_connect,
- o_disconnect: filter_disconnect,
- o_statfs: filter_statfs,
- o_syncfs: filter_syncfs,
- o_getattr: filter_getattr,
- o_create: filter_create,
- o_setattr: filter_setattr,
- o_destroy: filter_destroy,
- o_open: filter_open,
- o_close: filter_close,
- o_brw: filter_brw,
- o_punch: filter_truncate,
- o_preprw: filter_preprw,
- o_commitrw: filter_commitrw
+ o_owner: THIS_MODULE,
+ o_attach: filter_attach,
+ o_detach: filter_detach,
+ o_get_info: filter_get_info,
+ o_setup: filter_setup,
+ o_cleanup: filter_cleanup,
+ o_connect: filter_connect,
+ o_disconnect: filter_disconnect,
+ o_statfs: filter_statfs,
+ o_syncfs: filter_syncfs,
+ o_getattr: filter_getattr,
+ o_create: filter_create,
+ o_setattr: filter_setattr,
+ o_destroy: filter_destroy,
+ o_open: filter_open,
+ o_close: filter_close,
+ o_brw: filter_brw,
+ o_punch: filter_truncate,
+ o_preprw: filter_preprw,
+ o_commitrw: filter_commitrw,
+ o_destroy_export: filter_destroy_export,
#if 0
o_san_preprw: filter_san_preprw,
o_preallocate: filter_preallocate_inodes,
};
static struct obd_ops filter_sanobd_ops = {
- o_owner: THIS_MODULE,
- o_attach: filter_attach,
- o_detach: filter_detach,
- o_get_info: filter_get_info,
- o_setup: filter_san_setup,
- o_cleanup: filter_cleanup,
- o_connect: filter_connect,
- o_disconnect: filter_disconnect,
- o_statfs: filter_statfs,
- o_getattr: filter_getattr,
- o_create: filter_create,
- o_setattr: filter_setattr,
- o_destroy: filter_destroy,
- o_open: filter_open,
- o_close: filter_close,
- o_brw: filter_brw,
- o_punch: filter_truncate,
- o_preprw: filter_preprw,
- o_commitrw: filter_commitrw,
- o_san_preprw: filter_san_preprw,
+ o_owner: THIS_MODULE,
+ o_attach: filter_attach,
+ o_detach: filter_detach,
+ o_get_info: filter_get_info,
+ o_setup: filter_san_setup,
+ o_cleanup: filter_cleanup,
+ o_connect: filter_connect,
+ o_disconnect: filter_disconnect,
+ o_statfs: filter_statfs,
+ o_getattr: filter_getattr,
+ o_create: filter_create,
+ o_setattr: filter_setattr,
+ o_destroy: filter_destroy,
+ o_open: filter_open,
+ o_close: filter_close,
+ o_brw: filter_brw,
+ o_punch: filter_truncate,
+ o_preprw: filter_preprw,
+ o_commitrw: filter_commitrw,
+ o_san_preprw: filter_san_preprw,
+ o_destroy_export: filter_destroy_export
#if 0
o_preallocate: filter_preallocate_inodes,
o_migrate: filter_migrate,
int rc;
printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
- filter_open_cache = kmem_cache_create("ll_filter_fdata",
- sizeof(struct filter_file_data),
- 0, 0, NULL, NULL);
- if (!filter_open_cache)
- RETURN(-ENOMEM);
-
- filter_dentry_cache = kmem_cache_create("ll_filter_dentry",
- sizeof(struct filter_dentry_data),
- 0, 0, NULL, NULL);
- if (!filter_dentry_cache) {
- rc = -ENOMEM;
- goto err1;
- }
xprocfs_init ("filter");
-
lprocfs_init_vars(&lvars);
rc = class_register_type(&filter_obd_ops, lvars.module_vars,
OBD_FILTER_DEVICENAME);
if (rc)
- goto err2;
+ return rc;
rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
OBD_FILTER_SAN_DEVICENAME);
if (rc)
- goto err3;
-
- return 0;
-err3:
- class_unregister_type(OBD_FILTER_DEVICENAME);
-err2:
- kmem_cache_destroy(filter_dentry_cache);
-err1:
- kmem_cache_destroy(filter_open_cache);
+ class_unregister_type(OBD_FILTER_DEVICENAME);
return rc;
}
{
class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
class_unregister_type(OBD_FILTER_DEVICENAME);
- if (kmem_cache_destroy(filter_dentry_cache))
- CERROR("couldn't free obdfilter dentry cache\n");
- if (kmem_cache_destroy(filter_open_cache))
- CERROR("couldn't free obdfilter open cache\n");
xprocfs_fini ();
}