#ifndef _LUSTRE_LOG_H
#define _LUSTRE_LOG_H
+#include <linux/obd.h>
#include <linux/lustre_idl.h>
struct obd_trans_info;
struct obd_device;
struct lov_stripe_md;
+struct plain_handle_data { /* embedded in struct llog_handle as u.phd;
+                           * presumably the state kept for a plain
+                           * (non-catalog) log - TODO confirm */
+ struct list_head phd_entry; /* list linkage; presumably chained on a
+                              * catalog's chd_head - verify in llog_cat.c */
+ struct llog_cookie phd_cookie; /* cookie of this log in its cat */
+ int phd_last_idx;
+};
+/* Catalog-side state, embedded in struct llog_handle as u.chd.
+ * Presumably only meaningful when the handle refers to a catalog rather
+ * than a plain log - TODO confirm.
+ */
+struct cat_handle_data {
+ struct list_head chd_head; /* list head; presumably of plain_handle_data.phd_entry - verify */
+ struct llog_handle *chd_current_log; /* currently open log */
+};
+
/* In-memory descriptor for a log object or log catalog */
struct llog_handle {
- struct list_head lgh_list;
- struct llog_cookie lgh_cookie;
struct semaphore lgh_lock;
+ struct llog_logid lgh_id; /* id of this log */
struct obd_device *lgh_obd;
- void *lgh_hdr;
+ struct llog_log_hdr *lgh_hdr; /* typed header pointer; replaces the former void * */
struct file *lgh_file;
- struct obd_uuid *lgh_tgtuuid;
- struct llog_handle *lgh_current;
- struct llog_handle *(*lgh_log_create)(struct obd_device *obd);
- struct llog_handle *(*lgh_log_open)(struct obd_device *obd,
- struct llog_cookie *logcookie);
- int (*lgh_log_close)(struct llog_handle *cathandle,
- struct llog_handle *loghandle);
- int lgh_index;
+ int lgh_last_idx;
+ union { /* type-specific state: u.phd or u.chd.
+         * NOTE(review): no discriminator field is visible in this
+         * struct - confirm how callers tell which member is valid. */
+ struct plain_handle_data phd;
+ struct cat_handle_data chd;
+ } u;
};
-extern int llog_add_record(struct llog_handle *cathandle,
- struct llog_trans_hdr *rec,
- struct llog_cookie *logcookies);
+#define LLOG_EEMPTY 4711
+
+/* llog.c - general API */
+typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *);
+int llog_init_handle(struct llog_handle *handle, int flags, struct obd_uuid *uuid);
+extern void llog_free_handle(struct llog_handle *handle);
+
+
+/* llog_cat.c - catalog api */
+void llog_cat_put(struct llog_handle *cathandle);
+int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
+ struct llog_cookie *reccookie, void *buf);
+
extern int llog_cancel_records(struct llog_handle *cathandle, int count,
struct llog_cookie *cookies);
extern struct llog_handle *llog_alloc_handle(void);
-extern void llog_free_handle(struct llog_handle *handle);
extern int llog_init_catalog(struct llog_handle *cathandle,
struct obd_uuid *tgtuuid);
extern int llog_delete_log(struct llog_handle *cathandle,
struct llog_handle *loghandle);
extern struct llog_handle *llog_new_log(struct llog_handle *cathandle,
struct obd_uuid *tgtuuid);
+struct llog_operations { /* Method table for a log backend.  The static
+                         * inline llog_*() wrappers in this header fetch
+                         * it through llog_obd2ops()/llog_handle2ops()
+                         * and return -EOPNOTSUPP when the member they
+                         * need is NULL, so any member may be left unset. */
+ int (*lop_write_rec)(struct llog_handle *loghandle,
+ struct llog_rec_hdr *rec,
+ struct llog_cookie *logcookies,
+ int numcookies,
+ void *, /* unnamed here; llog_write_rec() passes its buf argument */
+ int idx);
+ int (*lop_destroy)(struct llog_handle *handle);
+ int (*lop_next_block)(struct llog_handle *h,
+ int curr_idx,
+ int next_idx,
+ __u64 *cur_offset,
+ void *buf,
+ int len);
+ int (*lop_create)(struct obd_device *obd, struct llog_handle **,
+ struct llog_logid *logid, char *name);
+ int (*lop_close)(struct llog_handle *handle);
+ int (*lop_read_header)(struct llog_handle *handle);
+};
-#endif
+extern struct llog_operations llog_lvfs_ops;
+/* Look up the llog operations table for @obd and store it in *@lop.
+ *
+ * The table is read from obd->obd_log_exp->exp_obd->obd_logops.
+ * Returns -ENOTCONN if @obd, its obd_log_exp, or that export's exp_obd
+ * is NULL (*@lop is not written in those cases), -EOPNOTSUPP if the
+ * table pointer itself is NULL (*@lop has been set to NULL by then),
+ * and 0 on success.
+ */
+static inline int llog_obd2ops(struct obd_device *obd,
+ struct llog_operations **lop)
+{
+ struct obd_export *exp;
+
+ if (obd == NULL)
+ return -ENOTCONN;
+ exp = obd->obd_log_exp;
+ if (exp == NULL)
+ return -ENOTCONN;
+ if (exp->exp_obd == NULL)
+ return -ENOTCONN;
+ *lop = exp->exp_obd->obd_logops;
+ if (*lop == NULL)
+ return -EOPNOTSUPP;
+ return 0;
+}
+/* Look up the llog operations table for the device behind @loghandle.
+ *
+ * Returns -EINVAL if @loghandle is NULL; otherwise the result of
+ * llog_obd2ops() applied to loghandle->lgh_obd, which fills in *@lop.
+ */
+static inline int llog_handle2ops(struct llog_handle *loghandle,
+ struct llog_operations **lop)
+{
+ if (loghandle == NULL)
+ return -EINVAL;
+ return llog_obd2ops(loghandle->lgh_obd, lop);
+}
+/* Close @loghandle through the backend's lop_close method.
+ *
+ * Returns the error from llog_handle2ops() if the ops lookup fails,
+ * -EOPNOTSUPP if the backend has no lop_close, otherwise whatever
+ * lop_close() returns.  ENTRY/RETURN are macros defined outside this
+ * header.
+ */
+static inline int llog_close(struct llog_handle *loghandle)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_close == NULL)
+ RETURN(-EOPNOTSUPP);
+ rc = lop->lop_close(loghandle);
+ RETURN(rc);
+}
+static inline int llog_write_rec(struct llog_handle *handle,
+ struct llog_rec_hdr *rec,
+ struct llog_cookie *logcookies,
+ int numcookies, void *buf, int idx)
+{ /* Hand @rec to the backend's lop_write_rec method, forwarding
+   * @logcookies, @numcookies, @buf and @idx unchanged.  Returns the
+   * error from llog_handle2ops() if the ops lookup fails, -EOPNOTSUPP
+   * if the backend has no lop_write_rec, otherwise the method's
+   * return value. */
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_write_rec == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_write_rec(handle, rec, logcookies, numcookies, buf, idx);
+ RETURN(rc);
+}
+/* Invoke the backend's lop_read_header method on @handle.
+ *
+ * Returns the error from llog_handle2ops() if the ops lookup fails,
+ * -EOPNOTSUPP if the backend has no lop_read_header, otherwise the
+ * method's return value.
+ */
+static inline int llog_read_header(struct llog_handle *handle)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_read_header == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_read_header(handle);
+ RETURN(rc);
+}
+/* Invoke the backend's lop_destroy method on @handle.
+ *
+ * Returns the error from llog_handle2ops() if the ops lookup fails,
+ * -EOPNOTSUPP if the backend has no lop_destroy, otherwise the
+ * method's return value.
+ */
+static inline int llog_destroy(struct llog_handle *handle)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_destroy == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_destroy(handle);
+ RETURN(rc);
+}
+/* Disabled cancel wrapper.  It cannot be enabled as written: the body
+ * passes `loghandle` to llog_handle2ops() but no such variable or
+ * parameter is declared here, and struct llog_operations as declared in
+ * this header has no lop_cancel member.
+ */
+#if 0
+static inline int llog_cancel(struct obd_export *exp,
+ struct lov_stripe_md *lsm, int count,
+ struct llog_cookie *cookies, int flags)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_cancel == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_cancel(exp, lsm, count, cookies, flags);
+ RETURN(rc);
+}
+#endif
+/* Invoke the backend's lop_next_block method on @loghandle, forwarding
+ * @cur_idx, @next_idx, @cur_offset, @buf and @len unchanged.
+ *
+ * Returns the error from llog_handle2ops() if the ops lookup fails,
+ * -EOPNOTSUPP if the backend has no lop_next_block, otherwise the
+ * method's return value.
+ */
+static inline int llog_next_block(struct llog_handle *loghandle, int cur_idx,
+ int next_idx, __u64 *cur_offset, void *buf,
+ int len)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_next_block == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_next_block(loghandle, cur_idx, next_idx, cur_offset, buf,
+ len);
+ RETURN(rc);
+}
+/* Invoke the backend's lop_create method for @obd, forwarding @res,
+ * @logid and @name unchanged.  Unlike the handle-based wrappers, the
+ * ops table is looked up from @obd directly via llog_obd2ops().
+ *
+ * Returns the error from llog_obd2ops() if the ops lookup fails,
+ * -EOPNOTSUPP if the backend has no lop_create, otherwise the method's
+ * return value.
+ */
+static inline int llog_create(struct obd_device *obd, struct llog_handle **res,
+ struct llog_logid *logid, char *name)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_obd2ops(obd, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_create == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_create(obd, res, logid, name);
+ RETURN(rc);
+}
+
+#endif
while ((this_char = strsep (&opt_ptr, ",")) != NULL) {
#endif
CDEBUG(D_SUPER, "this_char %s\n", this_char);
- if ((!*ost && (*ost = ll_read_opt("osc", this_char)))||
- (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
- (!(*flags & LL_SBI_NOLCK) &&
- ((*flags) = (*flags) |
- ll_set_opt("nolock", this_char, LL_SBI_NOLCK))))
+ if (!*ost && (*ost = ll_read_opt("osc", this_char)))
+ continue;
+ if (!*mds && (*mds = ll_read_opt("mdc", this_char)))
+ continue;
+ if (!(*flags & LL_SBI_NOLCK) &&
+ ((*flags) = (*flags) |
+ ll_set_opt("nolock", this_char,
+ LL_SBI_NOLCK)))
+ continue;
+ if (!(*flags & LL_SBI_READAHEAD) &&
+ ((*flags) = (*flags) |
+ ll_set_opt("readahead", this_char,
+ LL_SBI_READAHEAD)))
continue;
}
EXIT;
void ll_lli_init(struct ll_inode_info *lli)
{
sema_init(&lli->lli_open_sem, 1);
- spin_lock_init(&lli->lli_read_extent_lock);
- INIT_LIST_HEAD(&lli->lli_read_extents);
lli->lli_flags = 0;
lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
- spin_lock_init(&lli->lli_pg_lock);
- INIT_LIST_HEAD(&lli->lli_lc_item);
- plist_init(&lli->lli_pl_read);
- plist_init(&lli->lli_pl_write);
- atomic_set(&lli->lli_in_writepages, 0);
-#endif
}
int ll_fill_super(struct super_block *sb, void *data, int silent)
struct ll_fid rootfid;
struct obd_statfs osfs;
struct ptlrpc_request *request = NULL;
+ struct lustre_handle osc_conn = {0, };
+ struct lustre_handle mdc_conn = {0, };
struct lustre_md md;
class_uuid_t uuid;
CERROR("could not register mount in /proc/lustre");
}
- err = obd_connect(&sbi->ll_mdc_conn, obd, &sbi->ll_sb_uuid);
+ err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid);
if (err) {
CERROR("cannot connect to %s: rc = %d\n", mdc, err);
GOTO(out_free, err);
}
+ sbi->ll_mdc_exp = class_conn2export(&mdc_conn);
err = obd_statfs(obd, &osfs, jiffies - HZ);
if (err)
GOTO(out_mdc, err);
}
- err = obd_connect(&sbi->ll_osc_conn, obd, &sbi->ll_sb_uuid);
+ err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid);
if (err) {
CERROR("cannot connect to %s: rc = %d\n", osc, err);
GOTO(out_mdc, err);
}
+ sbi->ll_osc_exp = class_conn2export(&osc_conn);
- err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid);
+ err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
GOTO(out_osc, err);
/* make root inode
* XXX: move this to after cbd setup? */
- err = mdc_getattr(&sbi->ll_mdc_conn, &rootfid,
+ err = mdc_getattr(sbi->ll_mdc_exp, &rootfid,
OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request);
if (err) {
CERROR("mdc_getattr failed for root: rc = %d\n", err);
GOTO(out_osc, err);
}
- /* initialize committed transaction callback daemon */
- spin_lock_init(&sbi->ll_commitcbd_lock);
- init_waitqueue_head(&sbi->ll_commitcbd_waitq);
- init_waitqueue_head(&sbi->ll_commitcbd_ctl_waitq);
- sbi->ll_commitcbd_flags = 0;
- err = ll_commitcbd_setup(sbi);
- if (err) {
- CERROR("failed to start commit callback daemon: rc = %d\n",err);
- ptlrpc_req_finished (request);
- GOTO(out_lliod, err);
- }
-
- err = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md);
+ err = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md);
if (err) {
CERROR("failed to understand root inode md: rc = %d\n",err);
ptlrpc_req_finished (request);
- GOTO(out_lliod, err);
+ GOTO(out_osc, err);
}
LASSERT(sbi->ll_rootino != 0);
if (root == NULL || is_bad_inode(root)) {
/* XXX might need iput() for bad inode */
CERROR("lustre_lite: bad iget4 for root\n");
- GOTO(out_cbd, err = -EBADF);
+ GOTO(out_osc, err = -EBADF);
}
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
- /* initialize the pagecache writeback thread */
- err = lliod_start(sbi, root);
- if (err) {
- CERROR("failed to start lliod: rc = %d\n",err);
- GOTO(out_root, sb = NULL);
- }
-#endif
sb->s_root = d_alloc_root(root);
out_dev:
RETURN(err);
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-out_root:
iput(root);
-#endif
-out_cbd:
- ll_commitcbd_cleanup(sbi);
-out_lliod:
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
- lliod_stop(sbi);
-#endif
out_osc:
- obd_disconnect(&sbi->ll_osc_conn, 0);
+ obd_disconnect(sbi->ll_osc_exp, 0);
out_mdc:
- obd_disconnect(&sbi->ll_mdc_conn, 0);
+ obd_disconnect(sbi->ll_mdc_exp, 0);
out_free:
lprocfs_unregister_mountpoint(sbi);
OBD_FREE(sbi, sizeof(*sbi));
void ll_put_super(struct super_block *sb)
{
struct ll_sb_info *sbi = ll_s2sbi(sb);
- struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn);
+ struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp);
struct hlist_node *tmp, *next;
struct ll_fid rootfid;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb);
list_del(&sbi->ll_conn_chain);
- ll_commitcbd_cleanup(sbi);
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
- lliod_stop(sbi);
-#endif
- obd_disconnect(&sbi->ll_osc_conn, 0);
+ obd_disconnect(sbi->ll_osc_exp, 0);
/* NULL request to force sync on the MDS, and get the last_committed
* value to flush remaining RPCs from the sending queue on client.
* which we can call for other reasons as well.
*/
if (!obd->obd_no_recov)
- mdc_getstatus(&sbi->ll_mdc_conn, &rootfid);
+ mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
lprocfs_unregister_mountpoint(sbi);
if (sbi->ll_proc_root) {
sbi->ll_proc_root = NULL;
}
- obd_disconnect(&sbi->ll_mdc_conn, 0);
+ obd_disconnect(sbi->ll_mdc_exp, 0);
-#warning Why do we need this?
+#warning We do this to get rid of orphaned dentries. That is not really trw.
spin_lock(&dcache_lock);
hlist_for_each_safe(tmp, next, &sbi->ll_orphan_dentry_list) {
struct dentry *dentry = hlist_entry(tmp, struct dentry, d_hash);
inode->i_generation, inode);
ll_inode2fid(&fid, inode);
- mdc_change_cbdata(&sbi->ll_mdc_conn, &fid, null_if_equal, inode);
+ clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(ll_i2info(inode)->lli_flags));
+ mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode);
if (lli->lli_smd)
- obd_change_cbdata(&sbi->ll_osc_conn, lli->lli_smd,
+ obd_change_cbdata(sbi->ll_osc_exp, lli->lli_smd,
null_if_equal, inode);
if (lli->lli_smd) {
- obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd);
+ obd_free_memmd(sbi->ll_osc_exp, &lli->lli_smd);
lli->lli_smd = NULL;
}
EXIT;
}
-/* like inode_setattr, but doesn't mark the inode dirty */
-int ll_attr2inode(struct inode *inode, struct iattr *attr, int trunc)
-{
- unsigned int ia_valid = attr->ia_valid;
- int error = 0;
-
- if ((ia_valid & ATTR_SIZE) && trunc) {
- if (attr->ia_size > ll_file_maxbytes(inode)) {
- error = -EFBIG;
- goto out;
- }
- error = vmtruncate(inode, attr->ia_size);
- if (error)
- goto out;
- } else if (ia_valid & ATTR_SIZE)
- inode->i_size = attr->ia_size;
-
- if (ia_valid & ATTR_UID)
- inode->i_uid = attr->ia_uid;
- if (ia_valid & ATTR_GID)
- inode->i_gid = attr->ia_gid;
- if (ia_valid & ATTR_ATIME)
- inode->i_atime = attr->ia_atime;
- if (ia_valid & ATTR_MTIME)
- inode->i_mtime = attr->ia_mtime;
- if (ia_valid & ATTR_CTIME)
- inode->i_ctime = attr->ia_ctime;
- if (ia_valid & ATTR_MODE) {
- inode->i_mode = attr->ia_mode;
- if (!in_group_p(inode->i_gid) && !capable(CAP_FSETID))
- inode->i_mode &= ~S_ISGID;
- }
-out:
- return error;
-}
-
-int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc)
-{
- struct ptlrpc_request *request = NULL;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- int err = 0;
- ENTRY;
-
- /* change incore inode */
- err = ll_attr2inode(inode, attr, do_trunc);
- if (err)
- RETURN(err);
-
- /* Don't send size changes to MDS to avoid "fast EA" problems, and
- * also avoid a pointless RPC (we get file size from OST anyways).
- */
- attr->ia_valid &= ~ATTR_SIZE;
- if (attr->ia_valid) {
- struct mdc_op_data op_data;
-
- ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0);
- err = mdc_setattr(&sbi->ll_mdc_conn, &op_data,
- attr, NULL, 0, NULL, 0, &request);
- if (err)
- CERROR("mdc_setattr fails: err = %d\n", err);
-
- ptlrpc_req_finished(request);
- if (S_ISREG(inode->i_mode) && attr->ia_valid & ATTR_MTIME_SET) {
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
- struct obdo oa;
- int err2;
-
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n",
- inode->i_ino, attr->ia_mtime);
- oa.o_mtime = attr->ia_mtime;
-#else
- CDEBUG(D_INODE, "set mtime on OST inode %lu to "
- LPU64"\n", inode->i_ino,
- ll_ts2u64(&attr->ia_mtime));
- oa.o_mtime = ll_ts2u64(&attr->ia_mtime);
-#endif
- oa.o_id = lsm->lsm_object_id;
- oa.o_mode = S_IFREG;
- oa.o_valid = OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME;
- err2 = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL);
- if (err2) {
- CERROR("obd_setattr fails: rc=%d\n", err);
- if (!err)
- err = err2;
- }
- }
- }
-
- RETURN(err);
-}
-
/* If this inode has objects allocated to it (lsm != NULL), then the OST
* object(s) determine the file size and mtime. Otherwise, the MDS will
* keep these values until such a time that objects are allocated for it.
struct lustre_md md;
ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0);
- rc = mdc_setattr(&sbi->ll_mdc_conn, &op_data,
- attr, NULL, 0, NULL, 0, &request);
+ rc = mdc_setattr(sbi->ll_mdc_exp, &op_data,
+ attr, NULL, 0, NULL, 0, &request);
if (rc) {
ptlrpc_req_finished(request);
RETURN(rc);
}
- rc = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md);
+ rc = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md);
if (rc) {
ptlrpc_req_finished(request);
RETURN(rc);
}
if (ia_valid & ATTR_SIZE) {
- struct ldlm_extent extent = { .start = attr->ia_size,
+ struct ldlm_extent extent = { .start = 0,
.end = OBD_OBJECT_EOF };
struct lustre_handle lockh = { 0 };
int err, ast_flags = 0;
if (extent.start == 0)
ast_flags = LDLM_AST_DISCARD_DATA;
/* bug 1639: avoid write/truncate i_sem/DLM deadlock */
- LASSERT(atomic_read(&inode->i_sem.count) == 0);
+ LASSERT(atomic_read(&inode->i_sem.count) <= 0);
up(&inode->i_sem);
rc = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW,
&extent, &lockh, ast_flags);
rc = vmtruncate(inode, attr->ia_size);
if (rc == 0)
- set_bit(LLI_F_HAVE_SIZE_LOCK,
+ set_bit(LLI_F_HAVE_OST_SIZE_LOCK,
&ll_i2info(inode)->lli_flags);
/* unlock now as we don't mind others file lockers racing with
oa.o_valid = OBD_MD_FLID;
obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
OBD_MD_FLMTIME | OBD_MD_FLCTIME);
- rc = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL);
+ rc = obd_setattr(sbi->ll_osc_exp, &oa, lsm, NULL);
if (rc)
CERROR("obd_setattr fails: rc=%d\n", rc);
}
int ll_setattr(struct dentry *de, struct iattr *attr)
{
- int rc = inode_change_ok(de->d_inode, attr);
- CDEBUG(D_VFSTRACE, "VFS Op:name=%s\n", de->d_name.name);
- if (rc)
- return rc;
-
- lprocfs_counter_incr(ll_i2sbi(de->d_inode)->ll_stats, LPROC_LL_SETATTR);
- return ll_inode_setattr(de->d_inode, attr, 1);
+ LBUG(); /* code is unused, but leave this in case of VFS changes */
+ RETURN(-ENOSYS); /* NOTE(review): the new body has no ENTRY before this RETURN - confirm the macros tolerate that */
}
int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs,
int rc;
ENTRY;
- rc = obd_statfs(class_conn2obd(&sbi->ll_mdc_conn), osfs, max_age);
+ rc = obd_statfs(class_exp2obd(sbi->ll_mdc_exp), osfs, max_age);
if (rc) {
CERROR("mdc_statfs fails: rc = %d\n", rc);
RETURN(rc);
CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n",
osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files);
- rc = obd_statfs(class_conn2obd(&sbi->ll_osc_conn), &obd_osfs, max_age);
+ rc = obd_statfs(class_exp2obd(sbi->ll_osc_exp), &obd_osfs, max_age);
if (rc) {
CERROR("obd_statfs fails: rc = %d\n", rc);
RETURN(rc);
}
}
-int it_disposition(struct lookup_intent *it, int flag)
-{
- return it->d.lustre.it_disposition & flag;
-}
-
-void it_set_disposition(struct lookup_intent *it, int flag)
-{
- it->d.lustre.it_disposition |= flag;
-}
-
void ll_umount_begin(struct super_block *sb)
{
struct ll_sb_info *sbi = ll_s2sbi(sb);
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:\n");
- obd = class_conn2obd(&sbi->ll_mdc_conn);
+ obd = class_exp2obd(sbi->ll_mdc_exp);
if (obd == NULL) {
CERROR("Invalid MDC connection handle "LPX64"\n",
- sbi->ll_mdc_conn.cookie);
+ sbi->ll_mdc_exp->exp_handle.h_cookie);
EXIT;
return;
}
obd->obd_no_recov = 1;
- obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_mdc_conn, sizeof ioc_data,
+ obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_mdc_exp, sizeof ioc_data,
&ioc_data, NULL);
- obd = class_conn2obd(&sbi->ll_osc_conn);
+ obd = class_exp2obd(sbi->ll_osc_exp);
if (obd == NULL) {
CERROR("Invalid LOV connection handle "LPX64"\n",
- sbi->ll_osc_conn.cookie);
+ sbi->ll_osc_exp->exp_handle.h_cookie);
EXIT;
return;
}
obd->obd_no_recov = 1;
- obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_osc_conn, sizeof ioc_data,
+ obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_osc_exp, sizeof ioc_data,
&ioc_data, NULL);
/* Really, we'd like to wait until there are no requests outstanding,
void lov_free_memmd(struct lov_stripe_md **lsmp);
/* lov_pack.c */
-int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm,
+int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmm,
struct lov_stripe_md *lsm);
-int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm,
+int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsm,
struct lov_mds_md *lmm, int lmmsize);
-int lov_setstripe(struct lustre_handle *conn,
+int lov_setstripe(struct obd_export *exp,
struct lov_stripe_md **lsmp, struct lov_mds_md *lmmu);
-int lov_getstripe(struct lustre_handle *conn,
+int lov_getstripe(struct obd_export *exp,
struct lov_stripe_md *lsm, struct lov_mds_md *lmmu);
/* lproc_lov.c */
include $(src)/../portals/Kernelenv
obj-y += mdc.o
-mdc-objs := mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o
+mdc-objs := mdc_locks.o mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (C) 2001-2003 Cluster File Systems, Inc.
- * Author: Andreas Dilger <adilger@clusterfs.com>
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- *
- * OST<->MDS recovery logging infrastructure.
- *
- * Invariants in implementation:
- * - we do not share logs among different OST<->MDS connections, so that
- * if an OST or MDS fails it need only look at log(s) relevant to itself
- */
-
-#define DEBUG_SUBSYSTEM S_LOG
-
-#ifndef EXPORT_SYMTAB
-#define EXPORT_SYMTAB
-#endif
-
-#include <linux/fs.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_log.h>
-#include <portals/list.h>
-
-/* Allocate a new log or catalog handle */
-struct llog_handle *llog_alloc_handle(void)
-{
- struct llog_handle *loghandle;
- ENTRY;
-
- OBD_ALLOC(loghandle, sizeof(*loghandle));
- if (loghandle == NULL)
- RETURN(ERR_PTR(-ENOMEM));
-
- OBD_ALLOC(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
- if (loghandle->lgh_hdr == NULL) {
- OBD_FREE(loghandle, sizeof(*loghandle));
- RETURN(ERR_PTR(-ENOMEM));
- }
-
- INIT_LIST_HEAD(&loghandle->lgh_list);
- sema_init(&loghandle->lgh_lock, 1);
-
- RETURN(loghandle);
-}
-EXPORT_SYMBOL(llog_alloc_handle);
-
-void llog_free_handle(struct llog_handle *loghandle)
-{
- if (!loghandle)
- return;
-
- list_del_init(&loghandle->lgh_list);
- OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
- OBD_FREE(loghandle, sizeof(*loghandle));
-}
-EXPORT_SYMBOL(llog_free_handle);
-
-/* Create a new log handle and add it to the open list.
- * This log handle will be closed when all of the records in it are removed.
- *
- * Assumes caller has already pushed us into the kernel context and is locking.
- */
-struct llog_handle *llog_new_log(struct llog_handle *cathandle,
- struct obd_uuid *tgtuuid)
-{
- struct llog_handle *loghandle;
- struct llog_object_hdr *llh;
- loff_t offset;
- int rc, index, bitmap_size, i;
- ENTRY;
-
- LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
-
- loghandle = cathandle->lgh_log_create(cathandle->lgh_obd);
- if (IS_ERR(loghandle))
- RETURN(loghandle);
-
- llh = loghandle->lgh_hdr;
- llh->llh_hdr.lth_type = LLOG_OBJECT_MAGIC;
- llh->llh_hdr.lth_len = llh->llh_hdr_end_len = sizeof(*llh);
- llh->llh_timestamp = LTIME_S(CURRENT_TIME);
- llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
- memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
- loghandle->lgh_tgtuuid = &llh->llh_tgtuuid;
-
- llh = cathandle->lgh_hdr;
- bitmap_size = sizeof(llh->llh_bitmap) * 8;
- /* This should basically always find the first entry free */
- for (i = 0, index = llh->llh_count; i < bitmap_size; i++, index++) {
- index %= bitmap_size;
- if (ext2_set_bit(index, llh->llh_bitmap)) {
- /* XXX This should trigger log clean up or similar */
- CERROR("catalog index %d is still in use\n", index);
- } else {
- llh->llh_count = (index + 1) % bitmap_size;
- break;
- }
- }
- if (i == bitmap_size)
- CERROR("no free catalog slots for log...\n");
-
- CDEBUG(D_HA, "new recovery log "LPX64":%x catalog index %u\n",
- loghandle->lgh_cookie.lgc_lgl.lgl_oid,
- loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index);
- loghandle->lgh_cookie.lgc_index = index;
-
- offset = sizeof(*llh) + index * sizeof(loghandle->lgh_cookie);
-
- /* XXX Hmm, what to do if the catalog update fails? Under normal
- * operations we would clean this handle up anyways, and at
- * worst we leak some objects, but there is little point in
- * doing the logging in that case...
- *
- * We don't want to mark a catalog in-use if it wasn't written.
- * The only danger is if the OST crashes - the log is lost.
- */
- rc = lustre_fwrite(cathandle->lgh_file, &loghandle->lgh_cookie,
- sizeof(loghandle->lgh_cookie), &offset);
- if (rc != sizeof(loghandle->lgh_cookie)) {
- CERROR("error adding log "LPX64" to catalog: rc %d\n",
- loghandle->lgh_cookie.lgc_lgl.lgl_oid, rc);
- rc = rc < 0 ? : -ENOSPC;
- } else {
- offset = 0;
- rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
- &offset);
- if (rc != sizeof(*llh)) {
- CERROR("error marking catalog entry %d in use: rc %d\n",
- index, rc);
- rc = rc < 0 ? : -ENOSPC;
- }
- }
- cathandle->lgh_current = loghandle;
- list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list);
-
- RETURN(loghandle);
-}
-EXPORT_SYMBOL(llog_new_log);
-
-/* Assumes caller has already pushed us into the kernel context. */
-int llog_init_catalog(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
-{
- struct llog_object_hdr *llh;
- loff_t offset = 0;
- int rc = 0;
- ENTRY;
-
- LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
-
- down(&cathandle->lgh_lock);
- llh = cathandle->lgh_hdr;
-
- if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
-write_hdr: llh->llh_hdr.lth_type = LLOG_CATALOG_MAGIC;
- llh->llh_hdr.lth_len = llh->llh_hdr_end_len = LLOG_CHUNK_SIZE;
- llh->llh_timestamp = LTIME_S(CURRENT_TIME);
- llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
- memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
- rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
- &offset);
- if (rc != LLOG_CHUNK_SIZE) {
- CERROR("error writing catalog header: rc %d\n", rc);
- OBD_FREE(llh, sizeof(*llh));
- if (rc >= 0)
- rc = -ENOSPC;
- } else
- rc = 0;
- } else {
- rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
- &offset);
- if (rc != LLOG_CHUNK_SIZE) {
- CERROR("error reading catalog header: rc %d\n", rc);
- /* Can we do much else if the header is bad? */
- goto write_hdr;
- } else
- rc = 0;
- }
-
- cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
- up(&cathandle->lgh_lock);
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_init_catalog);
-
-/* Return the currently active log handle. If the current log handle doesn't
- * have enough space left for the current record, start a new one.
- *
- * If reclen is 0, we only want to know what the currently active log is,
- * otherwise we get a lock on this log so nobody can steal our space.
- *
- * Assumes caller has already pushed us into the kernel context and is locking.
- */
-static struct llog_handle *llog_current_log(struct llog_handle *cathandle,
- int reclen)
-{
- struct llog_handle *loghandle = NULL;
- ENTRY;
-
- loghandle = cathandle->lgh_current;
- if (loghandle) {
- struct llog_object_hdr *llh = loghandle->lgh_hdr;
- if (llh->llh_count < sizeof(llh->llh_bitmap) * 8)
- RETURN(loghandle);
- }
-
- if (reclen)
- loghandle = llog_new_log(cathandle, cathandle->lgh_tgtuuid);
- RETURN(loghandle);
-}
-
-/* Add a single record to the recovery log(s).
- * Returns number of bytes in returned logcookies, or negative error code.
- *
- * Assumes caller has already pushed us into the kernel context.
- */
-int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
- struct llog_cookie *logcookies)
-{
- struct llog_handle *loghandle;
- struct llog_object_hdr *llh;
- int reclen = rec->lth_len;
- struct file *file;
- loff_t offset;
- size_t left;
- int index;
- int rc;
- ENTRY;
-
- LASSERT(rec->lth_len <= LLOG_CHUNK_SIZE);
- down(&cathandle->lgh_lock);
- loghandle = llog_current_log(cathandle, reclen);
- if (IS_ERR(loghandle)) {
- up(&cathandle->lgh_lock);
- RETURN(PTR_ERR(loghandle));
- }
- down(&loghandle->lgh_lock);
- up(&cathandle->lgh_lock);
-
- llh = loghandle->lgh_hdr;
- file = loghandle->lgh_file;
-
- /* Make sure that records don't cross a chunk boundary, so we can
- * process them page-at-a-time if needed. If it will cross a chunk
- * boundary, write in a fake (but referenced) entry to pad the chunk.
- *
- * We know that llog_current_log() will return a loghandle that is
- * big enough to hold reclen, so all we care about is padding here.
- */
- left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
- if (left != 0 && left != reclen && left < reclen + LLOG_MIN_REC_SIZE) {
- struct llog_null_trans {
- struct llog_trans_hdr hdr;
- __u32 padding[6];
- } pad = { .hdr = { .lth_len = left } };
-
- LASSERT(left >= LLOG_MIN_REC_SIZE);
- if (left <= sizeof(pad))
- *(__u32 *)((char *)&pad + left - sizeof(__u32)) = left;
-
- rc = lustre_fwrite(loghandle->lgh_file, &pad,
- min(sizeof(pad), left),
- &loghandle->lgh_file->f_pos);
- if (rc != min(sizeof(pad), left)) {
- CERROR("error writing padding record: rc %d\n", rc);
- GOTO(out, rc = rc < 0 ? rc : -EIO);
- }
-
- left -= rc;
- if (left) {
- LASSERT(left >= sizeof(__u32));
- loghandle->lgh_file->f_pos += left - sizeof(__u32);
- rc = lustre_fwrite(loghandle->lgh_file, &pad,
- sizeof(__u32),
- &loghandle->lgh_file->f_pos);
- if (rc != sizeof(__u32)) {
- CERROR("error writing padding end: rc %d\n",
- rc);
- GOTO(out, rc < 0 ? rc : -ENOSPC);
- }
- }
-
- loghandle->lgh_index++;
- }
-
- index = loghandle->lgh_index++;
- if (ext2_set_bit(index, llh->llh_bitmap)) {
- CERROR("argh, index %u already set in log bitmap?\n", index);
- LBUG(); /* should never happen */
- }
- llh->llh_count++;
-
- offset = 0;
- rc = lustre_fwrite(loghandle->lgh_file, llh, sizeof(*llh), &offset);
- if (rc != sizeof(*llh)) {
- CERROR("error writing log header: rc %d\n", rc);
- GOTO(out, rc < 0 ? rc : -EIO);
- }
-
- rc = lustre_fwrite(loghandle->lgh_file, rec, reclen,
- &loghandle->lgh_file->f_pos);
- if (rc != reclen) {
- CERROR("error writing log record: rc %d\n", rc);
- GOTO(out, rc < 0 ? rc : -ENOSPC);
- }
-
- CDEBUG(D_HA, "added record "LPX64":%x+%u, %u bytes\n",
- loghandle->lgh_cookie.lgc_lgl.lgl_oid,
- loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index, rec->lth_len);
- *logcookies = loghandle->lgh_cookie;
- logcookies->lgc_index = index;
-
- rc = 0;
-out:
- up(&loghandle->lgh_lock);
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_add_record);
-
-/* Remove a log entry from the catalog.
- * Assumes caller has already pushed us into the kernel context and is locking.
- */
-int llog_delete_log(struct llog_handle *cathandle,struct llog_handle *loghandle)
-{
- struct llog_cookie *lgc = &loghandle->lgh_cookie;
- int catindex = lgc->lgc_index;
- struct llog_object_hdr *llh = cathandle->lgh_hdr;
- loff_t offset = 0;
- int rc = 0;
- ENTRY;
-
- CDEBUG(D_HA, "log "LPX64":%x empty, closing\n",
- lgc->lgc_lgl.lgl_oid, lgc->lgc_lgl.lgl_ogen);
-
- if (ext2_clear_bit(catindex, llh->llh_bitmap)) {
- CERROR("catalog index %u already clear?\n", catindex);
- LBUG();
- } else {
- rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
- &offset);
-
- if (rc != sizeof(*llh)) {
- CERROR("log %u cancel error: rc %d\n", catindex, rc);
- if (rc >= 0)
- rc = -EIO;
- } else
- rc = 0;
- }
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_delete_log);
-
-/* Assumes caller has already pushed us into the kernel context and is locking.
- * We return a lock on the handle to ensure nobody yanks it from us.
- */
-static struct llog_handle *llog_id2handle(struct llog_handle *cathandle,
- struct llog_cookie *logcookie)
-{
- struct llog_handle *loghandle;
- struct llog_logid *lgl = &logcookie->lgc_lgl;
- ENTRY;
-
- if (cathandle == NULL)
- RETURN(ERR_PTR(-EBADF));
-
- list_for_each_entry(loghandle, &cathandle->lgh_list, lgh_list) {
- struct llog_logid *cgl = &loghandle->lgh_cookie.lgc_lgl;
- if (cgl->lgl_oid == lgl->lgl_oid) {
- if (cgl->lgl_ogen != lgl->lgl_ogen) {
- CERROR("log "LPX64" generation %x != %x\n",
- lgl->lgl_oid, cgl->lgl_ogen,
- lgl->lgl_ogen);
- continue;
- }
- GOTO(out, loghandle);
- }
- }
-
- loghandle = cathandle->lgh_log_open(cathandle->lgh_obd, logcookie);
- if (IS_ERR(loghandle)) {
- CERROR("error opening log id "LPX64":%x: rc %d\n",
- lgl->lgl_oid, lgl->lgl_ogen, (int)PTR_ERR(loghandle));
- } else {
- list_add(&loghandle->lgh_list, &cathandle->lgh_list);
- }
-
-out:
- RETURN(loghandle);
-}
-
-/* For each cookie in the cookie array, we clear the log in-use bit and either:
- * - the log is empty, so mark it free in the catalog header and delete it
- * - the log is not empty, just write out the log header
- *
- * The cookies may be in different log files, so we need to get new logs
- * each time.
- *
- * Assumes caller has already pushed us into the kernel context.
- */
-int llog_cancel_records(struct llog_handle *cathandle, int count,
- struct llog_cookie *cookies)
-{
- int i, rc = 0;
- ENTRY;
-
- down(&cathandle->lgh_lock);
- for (i = 0; i < count; i++, cookies++) {
- struct llog_handle *loghandle;
- struct llog_object_hdr *llh;
- struct llog_logid *lgl = &cookies->lgc_lgl;
-
- loghandle = llog_id2handle(cathandle, cookies);
- if (IS_ERR(loghandle)) {
- if (!rc)
- rc = PTR_ERR(loghandle);
- continue;
- }
-
- down(&loghandle->lgh_lock);
- llh = loghandle->lgh_hdr;
- CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n",
- lgl->lgl_oid, cookies->lgc_index,
- ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
- if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) {
- CERROR("log index %u in "LPX64":%x already clear?\n",
- cookies->lgc_index, lgl->lgl_oid, lgl->lgl_ogen);
- } else if (--llh->llh_count == 0 &&
- loghandle != llog_current_log(cathandle, 0)) {
- loghandle->lgh_log_close(cathandle, loghandle);
- } else {
- loff_t offset = 0;
- int ret = lustre_fwrite(loghandle->lgh_file, llh,
- sizeof(*llh), &offset);
-
- if (ret != sizeof(*llh)) {
- CERROR("error cancelling index %u: rc %d\n",
- cookies->lgc_index, ret);
- /* XXX mark handle bad? */
- if (!rc)
- rc = ret;
- }
- }
- up(&loghandle->lgh_lock);
- }
- up(&cathandle->lgh_lock);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_cancel_records);
-
-int llog_close_log(struct llog_handle *cathandle, struct llog_handle *loghandle)
-{
- return loghandle->lgh_log_close(cathandle, loghandle);
-}
-EXPORT_SYMBOL(llog_close_log);
#include <linux/lustre_handles.h>
#include <linux/obd.h>
+#define FILTER_LAYOUT_VERSION "2"
+
#ifndef OBD_FILTER_DEVICENAME
# define OBD_FILTER_DEVICENAME "obdfilter"
#endif
#endif
#define LAST_RCVD "last_rcvd"
-#define FILTER_INIT_OBJID 2
+#define FILTER_INIT_OBJID 0
#define FILTER_LR_SERVER_SIZE 512
#define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */
+#define FILTER_GROUPS 2 /* must be at least 2; not dynamic yet */
#define FILTER_MOUNT_RECOV 2
#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
struct filter_server_data {
__u8 fsd_uuid[37]; /* server UUID */
__u8 fsd_uuid_padding[3]; /* unused */
- __u64 fsd_last_objid; /* last created object ID */
+ __u64 fsd_unused;
__u64 fsd_last_transno; /* last completed transaction ID */
__u64 fsd_mount_count; /* FILTER incarnation number */
__u32 fsd_feature_compat; /* compatible feature flags */
__u8 fcd_padding[FILTER_LR_CLIENT_SIZE - 64];
};
-/* file data for open files on OST */
-struct filter_file_data {
- struct portals_handle ffd_handle;
- atomic_t ffd_refcount;
- struct list_head ffd_export_list; /* export open list - fed_lock */
- struct file *ffd_file; /* file handle */
-};
-
-struct filter_dentry_data {
- struct llog_cookie fdd_cookie;
- obd_id fdd_objid;
- __u32 fdd_magic;
- atomic_t fdd_open_count;
- int fdd_flags;
-};
-
#define FILTER_DENTRY_MAGIC 0x9efba101
#define FILTER_FLAG_DESTROY 0x0001 /* destroy dentry on last file close */
};
/* filter.c */
-struct dentry *filter_parent(struct obd_device *, obd_mode mode, obd_id objid);
-struct dentry *filter_parent_lock(struct obd_device *, obd_mode mode,
- obd_id objid, ldlm_mode_t lock_mode,
- struct lustre_handle *lockh);
+struct dentry *filter_parent(struct obd_device *, obd_gr group, obd_id objid);
+struct dentry *filter_parent_lock(struct obd_device *, obd_gr, obd_id,
+ ldlm_mode_t, struct lustre_handle *);
void f_dput(struct dentry *);
struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir,
- obd_mode mode, obd_id id);
+ obd_gr group, obd_id id);
struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
const char *what);
#define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__)
int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc);
-__u64 filter_next_id(struct filter_obd *);
+__u64 filter_next_id(struct filter_obd *, struct obdo *);
int filter_update_server_data(struct obd_device *, struct file *,
struct filter_server_data *, int force_sync);
+int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
int filter_common_setup(struct obd_device *, obd_count len, void *buf,
char *option);
int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount,
struct obd_ioobj *, int niocount, struct niobuf_local *,
struct obd_trans_info *);
-int filter_brw(int cmd, struct lustre_handle *, struct obdo *,
+int filter_brw(int cmd, struct obd_export *, struct obdo *,
struct lov_stripe_md *, obd_count oa_bufs, struct brw_page *,
struct obd_trans_info *);
+void flip_into_page_cache(struct inode *inode, struct page *new_page);
+
+/* filter_io_*.c */
+int filter_commitrw_write(struct obd_export *exp, int objcount,
+ struct obd_ioobj *obj, int niocount,
+ struct niobuf_local *res,
+ struct obd_trans_info *oti);
/* filter_log.c */
-int filter_log_cancel(struct lustre_handle *, struct lov_stripe_md *,
- int num_cookies, struct llog_cookie *, int flags);
int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
obd_id oid, obd_count ogen, struct llog_cookie *);
int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid,
obd_count ogen, struct llog_cookie *);
struct llog_handle *filter_get_catalog(struct obd_device *);
-void filter_put_catalog(struct llog_handle *cathandle);
+
/* filter_san.c */
int filter_san_setup(struct obd_device *obd, obd_count len, void *buf);
int filter_san_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
struct obd_ioobj *, int niocount, struct niobuf_remote *);
+
#endif
int rc;
page = grab_cache_page(mapping, index); /* locked page */
- if (IS_ERR(page))
- return lnb->rc = PTR_ERR(page);
+ if (page == NULL)
+ return lnb->rc = -ENOMEM;
LASSERT(page->mapping == mapping);
return lnb->rc;
}
-static struct page *lustre_get_page_write(struct inode *inode,
- unsigned long index)
-{
- struct address_space *mapping = inode->i_mapping;
- struct page *page;
- int rc;
-
- page = grab_cache_page(mapping, index); /* locked page */
-
- if (!IS_ERR(page)) {
- /* Note: Called with "O" and "PAGE_SIZE" this is essentially
- * a no-op for most filesystems, because we write the whole
- * page. For partial-page I/O this will read in the page.
- */
- rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
- if (rc) {
- CERROR("page index %lu, rc = %d\n", index, rc);
- if (rc != -ENOSPC)
- LBUG();
- GOTO(err_unlock, rc);
- }
- /* XXX not sure if we need this if we are overwriting page */
- if (PageError(page)) {
- CERROR("error on page index %lu, rc = %d\n", index, rc);
- LBUG();
- GOTO(err_unlock, rc = -EIO);
- }
- }
- return page;
-
-err_unlock:
- unlock_page(page);
- page_cache_release(page);
- return ERR_PTR(rc);
-}
-
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-int wait_on_page_locked(struct page *page)
-{
- waitfor_one_page(page);
- return 0;
-}
-
-/* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data. */
-static inline void inode_update_time(struct inode *inode, int ctime_too)
-{
- time_t now = CURRENT_TIME;
- if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
- return;
- inode->i_mtime = now;
- if (ctime_too)
- inode->i_ctime = now;
- mark_inode_dirty_sync(inode);
-}
-#endif
-
-static int lustre_commit_write(struct niobuf_local *lnb)
-{
- struct page *page = lnb->page;
- unsigned from = lnb->offset & ~PAGE_MASK;
- unsigned to = from + lnb->len;
- struct inode *inode = page->mapping->host;
- int err;
-
- LASSERT(to <= PAGE_SIZE);
- err = page->mapping->a_ops->commit_write(NULL, page, from, to);
-#warning 2.4 folks: wait_on_page_locked does NOT return its error here.
- if (!err && IS_SYNC(inode))
- wait_on_page_locked(page);
- //SetPageUptodate(page); // the client commit_write will do this
-
- SetPageReferenced(page);
- unlock_page(page);
- page_cache_release(page);
- return err;
-}
-
-int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
- int *pglocked)
-{
- unsigned long index = lnb->offset >> PAGE_SHIFT;
- struct address_space *mapping = inode->i_mapping;
- struct page *page;
- int rc;
-
- //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
- if (*pglocked)
- page = grab_cache_page_nowait(mapping, index); /* locked page */
- else
- page = grab_cache_page(mapping, index); /* locked page */
-
-
- /* This page is currently locked, so get a temporary page instead. */
- if (page == NULL) {
- CDEBUG(D_INFO, "ino %lu page %ld locked\n", inode->i_ino,index);
- page = alloc_pages(GFP_KERNEL, 0); /* locked page */
- if (page == NULL) {
- CERROR("no memory for a temp page\n");
- GOTO(err, rc = -ENOMEM);
- }
- page->index = index;
- lnb->page = page;
- lnb->flags |= N_LOCAL_TEMP_PAGE;
- } else if (!IS_ERR(page)) {
- unsigned from = lnb->offset & ~PAGE_MASK, to = from + lnb->len;
- (*pglocked)++;
-
- rc = mapping->a_ops->prepare_write(NULL, page, from, to);
- if (rc) {
- if (rc != -ENOSPC)
- CERROR("page index %lu, rc = %d\n", index, rc);
- GOTO(err_unlock, rc);
- }
- /* XXX not sure if we need this if we are overwriting page */
- if (PageError(page)) {
- CERROR("error on page index %lu, rc = %d\n", index, rc);
- LBUG();
- GOTO(err_unlock, rc = -EIO);
- }
- lnb->page = page;
- }
-
- return 0;
-
-err_unlock:
- unlock_page(page);
- page_cache_release(page);
-err:
- return lnb->rc = rc;
-}
-
static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_remote *nb,
struct obd_run_ctxt saved;
struct obd_ioobj *o;
struct niobuf_remote *rnb;
- struct niobuf_local *lnb;
+ struct niobuf_local *lnb = NULL;
struct fsfilt_objinfo *fso;
struct dentry *dentry;
struct inode *inode;
memset(res, 0, niocount * sizeof(*res));
- push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+ push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
for (i = 0, o = obj; i < objcount; i++, o++) {
- struct filter_dentry_data *fdd;
LASSERT(o->ioo_bufcnt);
dentry = filter_oa2dentry(exp->exp_obd, oa);
fso[i].fso_dentry = dentry;
fso[i].fso_bufcnt = o->ioo_bufcnt;
-
- fdd = dentry->d_fsdata;
- if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
- CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
- o->ioo_id);
}
if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+ CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
+ else
+ CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
+ (jiffies - now));
for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
dentry = fso[i].fso_dentry;
}
if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+ CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
+ else
+ CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
+ (jiffies - now));
lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES,
tot_bytes);
}
if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+ CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
+ else
+ CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
+ (jiffies - now));
EXIT;
f_dput(res->dentry);
else
CERROR("NULL dentry in cleanup -- tell CFS\n");
- res->dentry = NULL;
case 0:
OBD_FREE(fso, objcount * sizeof(*fso));
- pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+ pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
}
return rc;
}
-/* We need to balance prepare_write() calls with commit_write() calls.
- * If the page has been prepared, but we have no data for it, we don't
- * want to overwrite valid data on disk, but we still need to zero out
- * data for space which was newly allocated. Like part of what happens
- * in __block_prepare_write() for newly allocated blocks.
- *
- * XXX currently __block_prepare_write() creates buffers for all the
- * pages, and the filesystems mark these buffers as BH_New if they
- * were newly allocated from disk. We use the BH_New flag similarly. */
+/* Allocate a fresh page for one write niobuf and attach it to @lnb.
+ * The page is not added to the page cache here; only page->index is set,
+ * derived from lnb->offset. Returns 0 on success. On allocation failure
+ * returns -ENOMEM and stores the same value in lnb->rc.
+ * @inode is currently unused. */
-static int filter_commit_write(struct niobuf_local *lnb, int err)
+static int filter_start_page_write(struct inode *inode,
+ struct niobuf_local *lnb)
{
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- if (err) {
- unsigned block_start, block_end;
- struct buffer_head *bh, *head = lnb->page->buffers;
- unsigned blocksize = head->b_size;
-
- /* debugging: just seeing if this ever happens */
- CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
- "called for ino %lu:%lu on err %d\n",
- lnb->page->mapping->host->i_ino, lnb->page->index, err);
-
- /* Currently one buffer per page, but in the future... */
- for (bh = head, block_start = 0; bh != head || !block_start;
- block_start = block_end, bh = bh->b_this_page) {
- block_end = block_start + blocksize;
- if (buffer_new(bh)) {
- memset(kmap(lnb->page) + block_start, 0,
- blocksize);
- kunmap(lnb->page);
- }
- }
+ struct page *page = alloc_pages(GFP_HIGHUSER, 0);
+ if (page == NULL) {
+ CERROR("no memory for a temp page\n");
+ /* plain return: this function has no ENTRY to pair with RETURN() */
+ return lnb->rc = -ENOMEM;
}
-#endif
- return lustre_commit_write(lnb);
+ page->index = lnb->offset >> PAGE_SHIFT;
+ lnb->page = page;
+
+ return 0;
}
/* If we ever start to support multi-object BRW RPCs, we will need to get locks
struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
- struct obd_ioobj *o;
struct niobuf_remote *rnb;
- struct niobuf_local *lnb;
- struct fsfilt_objinfo *fso;
+ struct niobuf_local *lnb = NULL;
+ struct fsfilt_objinfo fso;
struct dentry *dentry;
- int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
+ int rc = 0, i, tot_bytes = 0;
unsigned long now = jiffies;
ENTRY;
LASSERT(objcount == 1);
-
- OBD_ALLOC(fso, objcount * sizeof(*fso));
- if (fso == NULL)
- RETURN(-ENOMEM);
+ LASSERT(obj->ioo_bufcnt > 0);
memset(res, 0, niocount * sizeof(*res));
- push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
- for (i = 0, o = obj; i < objcount; i++, o++) {
- struct filter_dentry_data *fdd;
- LASSERT(o->ioo_bufcnt);
-
- dentry = filter_oa2dentry(exp->exp_obd, oa);
- if (IS_ERR(dentry))
- GOTO(out_objinfo, rc = PTR_ERR(dentry));
-
- if (dentry->d_inode == NULL) {
- CERROR("trying to BRW to non-existent file "LPU64"\n",
- o->ioo_id);
- f_dput(dentry);
- GOTO(out_objinfo, rc = -ENOENT);
- }
-
- fso[i].fso_dentry = dentry;
- fso[i].fso_bufcnt = o->ioo_bufcnt;
-
- down(&dentry->d_inode->i_sem);
- fdd = dentry->d_fsdata;
- if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
- CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
- o->ioo_id);
- }
+ push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+ dentry = filter_fid2dentry(exp->exp_obd, NULL, 0, obj->ioo_id);
+ if (IS_ERR(dentry))
+ GOTO(cleanup, rc = PTR_ERR(dentry));
- if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
-
- LASSERT(oti != NULL);
- oti->oti_handle = fsfilt_brw_start(exp->exp_obd, objcount, fso,
- niocount, oti);
- if (IS_ERR(oti->oti_handle)) {
- rc = PTR_ERR(oti->oti_handle);
- CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
- "error starting transaction: rc = %d\n", rc);
- oti->oti_handle = NULL;
- GOTO(out_objinfo, rc);
+ if (dentry->d_inode == NULL) {
+ CERROR("trying to BRW to non-existent file "LPU64"\n",
+ obj->ioo_id);
+ f_dput(dentry);
+ GOTO(cleanup, rc = -ENOENT);
}
- for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
- dentry = fso[i].fso_dentry;
- for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
- if (j == 0)
- lnb->dentry = dentry;
- else
- lnb->dentry = dget(dentry);
-
- lnb->offset = rnb->offset;
- lnb->len = rnb->len;
- lnb->flags = rnb->flags;
- lnb->start = jiffies;
-
- rc = filter_get_page_write(dentry->d_inode, lnb,
- &pglocked);
- if (rc)
- up(&dentry->d_inode->i_sem);
+ fso.fso_dentry = dentry;
+ fso.fso_bufcnt = obj->ioo_bufcnt;
- if (rc) {
- CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
- "page err %u@"LPU64" %u/%u %p: rc %d\n",
- lnb->len, lnb->offset, j, o->ioo_bufcnt,
- dentry, rc);
- f_dput(dentry);
- GOTO(out_pages, rc);
- }
- tot_bytes += lnb->len;
+ if (time_after(jiffies, now + 15 * HZ))
+ CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
+ else
+ CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
+ (jiffies - now));
+
+ for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
+ i++, lnb++, rnb++) {
+ lnb->dentry = dentry;
+ lnb->offset = rnb->offset;
+ lnb->len = rnb->len;
+ lnb->flags = rnb->flags;
+ lnb->start = jiffies;
+
+ rc = filter_start_page_write(dentry->d_inode, lnb);
+ if (rc) {
+ CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
+ LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
+ i, obj->ioo_bufcnt, dentry, rc);
+ while (lnb-- > res)
+ __free_pages(lnb->page, 0);
+ f_dput(dentry);
+ GOTO(cleanup, rc);
}
+ tot_bytes += lnb->len;
}
if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+ CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
+ else
+ CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
+ (jiffies - now));
lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
tot_bytes);
-
EXIT;
-out:
- OBD_FREE(fso, objcount * sizeof(*fso));
- /* we saved the journal handle into oti->oti_handle instead */
- current->journal_info = NULL;
- pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+cleanup:
+ pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
return rc;
-
-out_pages:
- while (lnb-- > res) {
- filter_commit_write(lnb, rc);
- up(&lnb->dentry->d_inode->i_sem);
- f_dput(lnb->dentry);
- }
- filter_finish_transno(exp, oti, rc);
- fsfilt_commit(exp->exp_obd,
- filter_parent(exp->exp_obd,S_IFREG,obj->ioo_id)->d_inode,
- oti->oti_handle, 0);
- goto out; /* dropped the dentry refs already (one per page) */
-
-out_objinfo:
- for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
- up(&fso[i].fso_dentry->d_inode->i_sem);
- f_dput(fso[i].fso_dentry);
- }
- goto out;
}
int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
niocount, nb, res, oti);
LBUG();
-
return -EPROTO;
}
-/* It is highly unlikely that we would ever get an error here. The page we want
- * to get was previously locked, so it had to have already allocated the space,
- * and we were just writing over the same data, so there would be no hole in the
- * file.
- *
- * XXX: possibility of a race with truncate could exist, need to check that.
- * There are no guarantees w.r.t. write order even on a local filesystem,
- * although the normal response would be to return the number of bytes
- * successfully written and leave the rest to the app. */
-static int filter_write_locked_page(struct niobuf_local *lnb)
-{
- struct page *lpage;
- void *lpage_addr, *lnb_addr;
- int rc;
- ENTRY;
-
- lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
- if (IS_ERR(lpage)) {
- rc = PTR_ERR(lpage);
- CERROR("error getting locked page index %ld: rc = %d\n",
- lnb->page->index, rc);
- LBUG();
- lustre_commit_write(lnb);
- RETURN(rc);
- }
-
- /* 2 kmaps == vanishingly small deadlock opportunity */
- lpage_addr = kmap(lpage);
- lnb_addr = kmap(lnb->page);
-
- memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
-
- kunmap(lnb->page);
- kunmap(lpage);
-
- page_cache_release(lnb->page);
-
- lnb->page = lpage;
- rc = lustre_commit_write(lnb);
- if (rc)
- CERROR("error committing locked page %ld: rc = %d\n",
- lnb->page->index, rc);
- RETURN(rc);
-}
-
static int filter_commitrw_read(struct obd_export *exp, int objcount,
struct obd_ioobj *obj, int niocount,
struct niobuf_local *res,
RETURN(0);
}
-static int
-filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa,
- int objcount, struct obd_ioobj *obj, int niocount,
- struct niobuf_local *res, struct obd_trans_info *oti)
+void flip_into_page_cache(struct inode *inode, struct page *new_page)
{
- struct obd_run_ctxt saved;
- struct obd_ioobj *o;
- struct niobuf_local *lnb;
- struct obd_device *obd = exp->exp_obd;
- int found_locked = 0, rc = 0, i;
- int nested_trans = current->journal_info != NULL;
- unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
+ struct page *old_page;
+ int rc;
ENTRY;
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-
- if (cmd & OBD_BRW_WRITE) {
- LASSERT(oti);
- LASSERT(current->journal_info == NULL ||
- current->journal_info == oti->oti_handle);
- current->journal_info = oti->oti_handle;
- }
-
- for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
- struct inode *inode;
- int j;
-
- /* If all of the page reads were beyond EOF, let's pretend
- * this read didn't really happen at all. */
- if (lnb->dentry == NULL) {
- oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM);
- continue;
- }
-
- inode = igrab(lnb->dentry->d_inode);
-
- if (cmd & OBD_BRW_WRITE) {
- /* FIXME: MULTI OBJECT BRW */
- if (oa && oa->o_valid & (OBD_MD_FLMTIME|OBD_MD_FLCTIME))
- obdo_refresh_inode(inode, oa, OBD_MD_FLATIME |
- OBD_MD_FLMTIME |
- OBD_MD_FLCTIME);
- else
- inode_update_time(lnb->dentry->d_inode, 1);
- } else if (oa && oa->o_valid & OBD_MD_FLATIME) {
- /* Note that we don't necessarily write this to disk */
- obdo_refresh_inode(inode, oa, OBD_MD_FLATIME);
- }
-
- for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
- if (lnb->page == NULL) {
- continue;
- }
-
- if (lnb->flags & N_LOCAL_TEMP_PAGE) {
- found_locked++;
- continue;
- }
-
- if (time_after(jiffies, lnb->start + 15 * HZ))
- CERROR("slow commitrw %lus (%lus)\n",
- (jiffies - lnb->start) / HZ,
- (jiffies - now) / HZ);
-
- if (cmd & OBD_BRW_WRITE) {
- int err = filter_commit_write(lnb, 0);
-
- if (!rc)
- rc = err;
- } else {
- page_cache_release(lnb->page);
- }
-
- f_dput(lnb->dentry);
- if (time_after(jiffies, lnb->start + 15 * HZ))
- CERROR("slow commit_write %lus (%lus)\n",
- (jiffies - lnb->start) / HZ,
- (jiffies - now) / HZ);
+ do {
+ /* the dlm is protecting us from read/write concurrency, so we
+ * expect this find_lock_page to return quickly. even if we
+ * race with another writer it won't be doing much work with
+ * the page locked. we take the page lock because
+ * truncate_complete_page() expects a locked page, and it
+ * wants to grab the pagecache lock as well. */
+ old_page = find_lock_page(inode->i_mapping, new_page->index);
+ if (old_page) {
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ truncate_complete_page(old_page);
+#else
+ truncate_complete_page(old_page->mapping, old_page);
+#endif
+ unlock_page(old_page);
+ page_cache_release(old_page);
}
- /* FIXME: MULTI OBJECT BRW */
- if (oa) {
- oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM);
- obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
+#if 0 /* this should be a /proc tunable someday */
+ /* racing o_directs (no locking ioctl) could race adding
+ * their pages, so we repeat the page invalidation unless
+ * we successfully added our new page */
+ rc = add_to_page_cache_unique(new_page, inode->i_mapping,
+ new_page->index,
+ page_hash(inode->i_mapping,
+ new_page->index));
+ if (rc == 0) {
+ /* add_to_page_cache clears uptodate|dirty and locks
+ * the page */
+ SetPageUptodate(new_page);
+ unlock_page(new_page);
}
+#else
+ rc = 0;
+#endif
+ } while (rc != 0);
- if (cmd & OBD_BRW_WRITE)
- up(&inode->i_sem);
-
- iput(inode);
- }
-
- for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
- i++, o++) {
- int j;
-
- for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
- int err;
- if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
- continue;
-
- if (time_after(jiffies, lnb->start + 15 * HZ))
- CERROR("slow commitrw locked %lus (%lus)\n",
- (jiffies - lnb->start) / HZ,
- (jiffies - now) / HZ);
-
- err = filter_write_locked_page(lnb);
- if (!rc)
- rc = err;
- f_dput(lnb->dentry);
- found_locked--;
-
- if (time_after(jiffies, lnb->start + 15 * HZ))
- CERROR("slow commit_write locked %lus (%lus)\n",
- (jiffies - lnb->start) / HZ,
- (jiffies - now) / HZ);
- }
- }
-
- if (cmd & OBD_BRW_WRITE) {
- /* We just want any dentry for the commit, for now */
- struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
- int err;
-
- rc = filter_finish_transno(exp, oti, rc);
- err = fsfilt_commit(obd, dparent->d_inode, oti->oti_handle,
- obd_sync_filter);
- if (err)
- rc = err;
- if (obd_sync_filter)
- LASSERT(oti->oti_transno <= obd->obd_last_committed);
- if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
- }
-
- LASSERT(nested_trans || current->journal_info == NULL);
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- RETURN(rc);
+ EXIT;
}
/* XXX needs to trickle its oa down */
struct niobuf_local *res, struct obd_trans_info *oti)
{
if (cmd == OBD_BRW_WRITE)
- return filter_commitrw_write(cmd, exp, oa, objcount, obj,
- niocount, res, oti);
+ return filter_commitrw_write(exp, objcount, obj, niocount,
+ res, oti);
if (cmd == OBD_BRW_READ)
return filter_commitrw_read(exp, objcount, obj, niocount,
res, oti);
return -EPROTO;
}
-int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
+int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *lsm, obd_count oa_bufs,
struct brw_page *pga, struct obd_trans_info *oti)
{
- struct obd_export *exp;
struct obd_ioobj ioo;
struct niobuf_local *lnb;
struct niobuf_remote *rnb;
int ret = 0;
ENTRY;
- exp = class_conn2export(conn);
- if (exp == NULL) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
- RETURN(-EINVAL);
- }
-
OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
else
memcpy(virt + off, addr + off, pga[i].count);
- kunmap(addr);
- kunmap(virt);
+ kunmap(lnb[i].page);
+ kunmap(pga[i].pg);
}
ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
if (rnb)
OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
- class_export_put(exp);
RETURN(ret);
}
#include "filter_internal.h"
-static struct llog_handle *filter_log_create(struct obd_device *obd);
-
-/* This is a callback from the llog_* functions.
- * Assumes caller has already pushed us into the kernel context. */
-static int filter_log_close(struct llog_handle *cathandle,
- struct llog_handle *loghandle)
-{
- struct llog_object_hdr *llh = loghandle->lgh_hdr;
- struct file *file = loghandle->lgh_file;
- struct dentry *dparent = NULL, *dchild = NULL;
- struct lustre_handle parent_lockh;
- struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl;
- int rc;
- ENTRY;
-
- /* If we are going to delete this log, grab a ref before we close
- * it so we don't have to immediately do another lookup. */
- if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
- CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
- lgl->lgl_oid, lgl->lgl_ogen);
- dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG,
- lgl->lgl_oid,LCK_PW,&parent_lockh);
- if (IS_ERR(dparent)) {
- rc = PTR_ERR(dparent);
- CERROR("error locking parent, orphan log %*s: rc %d\n",
- file->f_dentry->d_name.len,
- file->f_dentry->d_name.name, rc);
- RETURN(rc);
- } else {
- dchild = dget(file->f_dentry);
- llog_delete_log(cathandle, loghandle);
- }
- } else {
- CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
- lgl->lgl_oid, lgl->lgl_ogen);
- }
-
- rc = filp_close(file, 0);
-
- llog_free_handle(loghandle); /* also removes loghandle from list */
-
- if (dchild != NULL) {
- int err = vfs_unlink(dparent->d_inode, dchild);
- if (err) {
- CERROR("error unlinking empty log %*s: rc %d\n",
- dchild->d_name.len, dchild->d_name.name, err);
- if (!rc)
- rc = err;
- }
- f_dput(dchild);
- ldlm_lock_decref(&parent_lockh, LCK_PW);
- }
- RETURN(rc);
-}
-
-/* This is a callback from the llog_* functions.
- * Assumes caller has already pushed us into the kernel context. */
-static struct llog_handle *filter_log_open(struct obd_device *obd,
- struct llog_cookie *logcookie)
-{
- struct llog_logid *lgl = &logcookie->lgc_lgl;
- struct llog_handle *loghandle;
- struct dentry *dchild;
- int rc;
- ENTRY;
-
- loghandle = llog_alloc_handle();
- if (!loghandle)
- RETURN(ERR_PTR(-ENOMEM));
-
- dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid);
- if (IS_ERR(dchild))
- GOTO(out_handle, rc = PTR_ERR(dchild));
-
- if (dchild->d_inode == NULL) {
- CERROR("logcookie references non-existent object %*s\n",
- dchild->d_name.len, dchild->d_name.name);
- GOTO(out_dentry, rc = -ENOENT);
- }
-
- if (dchild->d_inode->i_generation != lgl->lgl_ogen) {
- CERROR("logcookie for %*s had different generation %x != %x\n",
- dchild->d_name.len, dchild->d_name.name,
- dchild->d_inode->i_generation, lgl->lgl_ogen);
- GOTO(out_dentry, rc = -ESTALE);
- }
-
- /* dentry_open does a dput(dchild) and mntput(mnt) on error */
- mntget(obd->u.filter.fo_vfsmnt);
- loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt,
- O_RDWR);
- if (IS_ERR(loghandle->lgh_file)) {
- rc = PTR_ERR(loghandle->lgh_file);
- CERROR("error opening logfile %*s: rc %d\n",
- dchild->d_name.len, dchild->d_name.name, rc);
- GOTO(out_dentry, rc);
- }
- memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
- loghandle->lgh_log_create = filter_log_create;
- loghandle->lgh_log_open = filter_log_open;
- loghandle->lgh_log_close = filter_log_close;
- loghandle->lgh_obd = obd;
- RETURN(loghandle);
-
-out_dentry:
- f_dput(dchild);
-out_handle:
- llog_free_handle(loghandle);
- RETURN(ERR_PTR(rc));
-}
-
-/* This is a callback from the llog_* functions.
- * Assumes caller has already pushed us into the kernel context. */
-static struct llog_handle *filter_log_create(struct obd_device *obd)
-{
- struct filter_obd *filter = &obd->u.filter;
- struct lustre_handle parent_lockh;
- struct dentry *dparent, *dchild;
- struct llog_handle *loghandle;
- struct file *file;
- int err, rc;
- obd_id id;
- ENTRY;
-
- loghandle = llog_alloc_handle();
- if (!loghandle)
- RETURN(ERR_PTR(-ENOMEM));
-
- retry:
- id = filter_next_id(filter);
-
- dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh);
- if (IS_ERR(dparent))
- GOTO(out_ctxt, rc = PTR_ERR(dparent));
-
- dchild = filter_fid2dentry(obd, dparent, S_IFREG, id);
- if (IS_ERR(dchild))
- GOTO(out_lock, rc = PTR_ERR(dchild));
-
- if (dchild->d_inode != NULL) {
- /* This would only happen if lastobjid was bad on disk */
- CERROR("Serious error: objid %*s already exists; is this "
- "filesystem corrupt? I will try to work around it.\n",
- dchild->d_name.len, dchild->d_name.name);
- f_dput(dchild);
- ldlm_lock_decref(&parent_lockh, LCK_PW);
- goto retry;
- }
-
- rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
- if (rc) {
- CERROR("log create failed rc = %d\n", rc);
- GOTO(out_child, rc);
- }
-
- rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
- filter->fo_fsd, 0);
- if (rc) {
- CERROR("can't write lastobjid but log created: rc %d\n",rc);
- GOTO(out_destroy, rc);
- }
-
- /* dentry_open does a dput(dchild) and mntput(mnt) on error */
- mntget(filter->fo_vfsmnt);
- file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
- if (IS_ERR(file)) {
- rc = PTR_ERR(file);
- CERROR("error opening log file "LPX64": rc %d\n", id, rc);
- GOTO(out_destroy, rc);
- }
- ldlm_lock_decref(&parent_lockh, LCK_PW);
-
- loghandle->lgh_file = file;
- loghandle->lgh_cookie.lgc_lgl.lgl_oid = id;
- loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation;
- loghandle->lgh_log_create = filter_log_create;
- loghandle->lgh_log_open = filter_log_open;
- loghandle->lgh_log_close = filter_log_close;
- loghandle->lgh_obd = obd;
-
- RETURN(loghandle);
-
-out_destroy:
- err = vfs_unlink(dparent->d_inode, dchild);
- if (err)
- CERROR("error unlinking %*s on error: rc %d\n",
- dchild->d_name.len, dchild->d_name.name, err);
-out_child:
- f_dput(dchild);
-out_lock:
- ldlm_lock_decref(&parent_lockh, LCK_PW);
-out_ctxt:
- llog_free_handle(loghandle);
- RETURN(ERR_PTR(rc));
-}
-
/* This is called from filter_setup() and should be single threaded */
+/* Opens the catalog whose id is recorded in the server data, if any. If
+ * there is none, or opening it fails, a new catalog is created and its id
+ * is stored back into the server data. Returns the catalog handle, or an
+ * ERR_PTR() value on failure. */
struct llog_handle *filter_get_catalog(struct obd_device *obd)
{
struct filter_server_data *fsd = filter->fo_fsd;
struct obd_run_ctxt saved;
struct llog_handle *cathandle = NULL;
+ struct llog_logid logid;
int rc;
ENTRY;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
if (fsd->fsd_catalog_oid) {
- struct llog_cookie catcookie;
-
- catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid);
- catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen);
- cathandle = filter_log_open(obd, &catcookie);
- if (IS_ERR(cathandle)) {
+ logid.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid);
+ logid.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen);
+ rc = llog_create(obd, &cathandle, &logid, NULL);
+ if (rc) {
+ /* the error is in rc; cathandle is not an ERR_PTR here */
CERROR("error opening catalog "LPX64":%x: rc %d\n",
- catcookie.lgc_lgl.lgl_oid,
- catcookie.lgc_lgl.lgl_ogen,
- (int)PTR_ERR(cathandle));
+ logid.lgl_oid, logid.lgl_ogen,
+ rc);
fsd->fsd_catalog_oid = 0;
fsd->fsd_catalog_ogen = 0;
}
if (!fsd->fsd_catalog_oid) {
- struct llog_logid *lgl;
-
- cathandle = filter_log_create(obd);
- if (IS_ERR(cathandle)) {
- CERROR("error creating new catalog: rc %d\n",
- (int)PTR_ERR(cathandle));
+ rc = llog_create(obd, &cathandle, NULL, NULL);
+ if (rc) {
+ CERROR("error creating new catalog: rc %d\n", rc);
+ cathandle = ERR_PTR(rc);
GOTO(out, cathandle);
}
- lgl = &cathandle->lgh_cookie.lgc_lgl;
- fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
- fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
+ logid = cathandle->lgh_id;
+ fsd->fsd_catalog_oid = cpu_to_le64(logid.lgl_oid);
+ fsd->fsd_catalog_ogen = cpu_to_le32(logid.lgl_ogen);
rc = filter_update_server_data(obd, filter->fo_rcvd_filp,fsd,0);
if (rc) {
CERROR("error writing new catalog to disk: rc %d\n",rc);
}
}
- rc = llog_init_catalog(cathandle, &obd->u.filter.fo_mdc_uuid);
+ rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->u.filter.fo_mdc_uuid);
if (rc)
GOTO(out_handle, rc);
out:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
RETURN(cathandle);
out_handle:
- filter_log_close(cathandle, cathandle);
+ llog_close(cathandle);
cathandle = ERR_PTR(rc);
goto out;
}
-void filter_put_catalog(struct llog_handle *cathandle)
-{
- struct llog_handle *loghandle, *n;
- int rc;
- ENTRY;
-
- list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
- filter_log_close(cathandle, loghandle);
-
- rc = filp_close(cathandle->lgh_file, 0);
- if (rc)
- CERROR("error closing catalog: rc %d\n", rc);
-
- llog_free_handle(cathandle);
- EXIT;
-}
-
-int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
- int num_cookies, struct llog_cookie *logcookies,
- int flags)
-{
- struct obd_device *obd = class_conn2obd(conn);
- struct obd_run_ctxt saved;
- int rc;
- ENTRY;
-
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies,
- logcookies);
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-
- RETURN(rc);
-}
int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
obd_id oid, obd_count ogen,
OBD_ALLOC(lcr, sizeof(*lcr));
if (lcr == NULL)
RETURN(-ENOMEM);
- lcr->lcr_hdr.lth_len = lcr->lcr_end_len = sizeof(*lcr);
- lcr->lcr_hdr.lth_type = OST_CREATE_REC;
+ lcr->lcr_hdr.lrh_len = lcr->lcr_tail.lrt_len = sizeof(*lcr);
+ lcr->lcr_hdr.lrh_type = OST_CREATE_REC;
lcr->lcr_fid.id = mds_fid->id;
lcr->lcr_fid.generation = mds_fid->generation;
lcr->lcr_fid.f_type = mds_fid->f_type;
lcr->lcr_oid = oid;
lcr->lcr_ogen = ogen;
- rc = llog_add_record(cathandle, &lcr->lcr_hdr, logcookie);
+ rc = llog_cat_add_rec(cathandle, &lcr->lcr_hdr, logcookie, NULL);
OBD_FREE(lcr, sizeof(*lcr));
if (rc > 0) {
OBD_ALLOC(lor, sizeof(*lor));
if (lor == NULL)
RETURN(-ENOMEM);
- lor->lor_hdr.lth_len = lor->lor_end_len = sizeof(*lor);
- lor->lor_hdr.lth_type = OST_ORPHAN_REC;
+ lor->lor_hdr.lrh_len = lor->lor_tail.lrt_len = sizeof(*lor);
+ lor->lor_hdr.lrh_type = OST_ORPHAN_REC;
lor->lor_oid = oid;
lor->lor_ogen = ogen;
- rc = llog_add_record(cathandle, &lor->lor_hdr, logcookie);
+ rc = llog_cat_add_rec(cathandle, &lor->lor_hdr, logcookie, NULL);
if (rc > 0) {
LASSERT(rc == sizeof(*logcookie));
obj-y += ptlrpc.o
ptlrpc-objs := recover.o connection.o ptlrpc_module.o events.o service.o \
client.o niobuf.o pack_generic.o lproc_ptlrpc.o pinger.o \
- recov_thread.o ptlrpc_lib.o
+ recov_thread.o ptlrpc_lib.o import.o