From 60f6258cb27b12e4146bc2f913b2bb990b6f2d35 Mon Sep 17 00:00:00 2001 From: ericm Date: Tue, 9 Sep 2003 14:37:12 +0000 Subject: [PATCH] merge b_devel -> b_eq: 20030909 only kernel pass sanity, liblustre still broken --- lustre/include/linux/lustre_log.h | 217 +++++++++++++- lustre/llite/llite_lib.c | 240 ++++----------- lustre/lov/lov_internal.h | 8 +- lustre/mdc/Makefile.mk | 2 +- lustre/obdclass/recov_log.c | 470 ------------------------------ lustre/obdfilter/filter_internal.h | 49 ++-- lustre/obdfilter/filter_io.c | 581 ++++++++----------------------------- lustre/obdfilter/filter_log.c | 279 ++---------------- lustre/ptlrpc/Makefile.mk | 2 +- 9 files changed, 417 insertions(+), 1431 deletions(-) delete mode 100644 lustre/obdclass/recov_log.c diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h index 2f21583..5d722b4 100644 --- a/lustre/include/linux/lustre_log.h +++ b/lustre/include/linux/lustre_log.h @@ -35,39 +35,56 @@ #ifndef _LUSTRE_LOG_H #define _LUSTRE_LOG_H +#include #include struct obd_trans_info; struct obd_device; struct lov_stripe_md; +struct plain_handle_data { + struct list_head phd_entry; + struct llog_cookie phd_cookie; /* cookie of this log in its cat */ + int phd_last_idx; +}; + +struct cat_handle_data { + struct list_head chd_head; + struct llog_handle *chd_current_log; /* currently open log */ +}; + /* In-memory descriptor for a log object or log catalog */ struct llog_handle { - struct list_head lgh_list; - struct llog_cookie lgh_cookie; struct semaphore lgh_lock; + struct llog_logid lgh_id; /* id of this log */ struct obd_device *lgh_obd; - void *lgh_hdr; + struct llog_log_hdr *lgh_hdr; struct file *lgh_file; - struct obd_uuid *lgh_tgtuuid; - struct llog_handle *lgh_current; - struct llog_handle *(*lgh_log_create)(struct obd_device *obd); - struct llog_handle *(*lgh_log_open)(struct obd_device *obd, - struct llog_cookie *logcookie); - int (*lgh_log_close)(struct llog_handle *cathandle, - struct llog_handle *loghandle); - int lgh_index; + int lgh_last_idx; + union { + struct plain_handle_data phd; + struct cat_handle_data chd; + } u; }; -extern int llog_add_record(struct llog_handle *cathandle, - struct llog_trans_hdr *rec, - struct llog_cookie *logcookies); +#define LLOG_EEMPTY 4711 + +/* llog.c - general API */ +typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *); +int llog_init_handle(struct llog_handle *handle, int flags, struct obd_uuid *uuid); +extern void llog_free_handle(struct llog_handle *handle); + + +/* llog_cat.c - catalog api */ +void llog_cat_put(struct llog_handle *cathandle); +int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, + struct llog_cookie *reccookie, void *buf); + extern int llog_cancel_records(struct llog_handle *cathandle, int count, struct llog_cookie *cookies); extern struct llog_handle *llog_alloc_handle(void); -extern void llog_free_handle(struct llog_handle *handle); extern int llog_init_catalog(struct llog_handle *cathandle, struct obd_uuid *tgtuuid); extern int llog_delete_log(struct llog_handle *cathandle, @@ -76,6 +93,174 @@ extern int llog_close_log(struct llog_handle *cathandle, struct llog_handle *loghandle); extern struct llog_handle *llog_new_log(struct llog_handle *cathandle, struct obd_uuid *tgtuuid); +struct llog_operations { + int (*lop_write_rec)(struct llog_handle *loghandle, + struct llog_rec_hdr *rec, + struct llog_cookie *logcookies, + int numcookies, + void *, + int idx); + int (*lop_destroy)(struct llog_handle *handle); + int (*lop_next_block)(struct llog_handle *h, + int curr_idx, + int next_idx, + __u64 *cur_offset, + void *buf, + int len); + int (*lop_create)(struct obd_device *obd, struct llog_handle **, + struct llog_logid *logid, char *name); + int (*lop_close)(struct llog_handle *handle); + int (*lop_read_header)(struct llog_handle *handle); +}; -#endif +extern struct llog_operations llog_lvfs_ops; + +static inline int llog_obd2ops(struct obd_device *obd, + struct llog_operations **lop) +{ + struct obd_export *exp; + + if (obd == NULL) + return -ENOTCONN; + exp = obd->obd_log_exp; + if (exp == NULL) + return -ENOTCONN; + if (exp->exp_obd == NULL) + return -ENOTCONN; + *lop = exp->exp_obd->obd_logops; + if (*lop == NULL) + return -EOPNOTSUPP; + return 0; +} + +static inline int llog_handle2ops(struct llog_handle *loghandle, + struct llog_operations **lop) +{ + if (loghandle == NULL) + return -EINVAL; + return llog_obd2ops(loghandle->lgh_obd, lop); +} + +static inline int llog_close(struct llog_handle *loghandle) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_handle2ops(loghandle, &lop); + if (rc) + RETURN(rc); + if (lop->lop_close == NULL) + RETURN(-EOPNOTSUPP); + rc = lop->lop_close(loghandle); + RETURN(rc); +} +static inline int llog_write_rec(struct llog_handle *handle, + struct llog_rec_hdr *rec, + struct llog_cookie *logcookies, + int numcookies, void *buf, int idx) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_handle2ops(handle, &lop); + if (rc) + RETURN(rc); + if (lop->lop_write_rec == NULL) + RETURN(-EOPNOTSUPP); + + rc = lop->lop_write_rec(handle, rec, logcookies, numcookies, buf, idx); + RETURN(rc); +} + +static inline int llog_read_header(struct llog_handle *handle) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_handle2ops(handle, &lop); + if (rc) + RETURN(rc); + if (lop->lop_read_header == NULL) + RETURN(-EOPNOTSUPP); + + rc = lop->lop_read_header(handle); + RETURN(rc); +} + +static inline int llog_destroy(struct llog_handle *handle) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_handle2ops(handle, &lop); + if (rc) + RETURN(rc); + if (lop->lop_destroy == NULL) + RETURN(-EOPNOTSUPP); + + rc = lop->lop_destroy(handle); + RETURN(rc); +} + +#if 0 +static inline int llog_cancel(struct obd_export *exp, + struct lov_stripe_md *lsm, int count, + struct llog_cookie *cookies, int flags) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_handle2ops(loghandle, &lop); + if (rc) + RETURN(rc); + if (lop->lop_cancel == NULL) + RETURN(-EOPNOTSUPP); + + rc = lop->lop_cancel(exp, lsm, count, cookies, flags); + RETURN(rc); +} +#endif + +static inline int llog_next_block(struct llog_handle *loghandle, int cur_idx, + int next_idx, __u64 *cur_offset, void *buf, + int len) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_handle2ops(loghandle, &lop); + if (rc) + RETURN(rc); + if (lop->lop_next_block == NULL) + RETURN(-EOPNOTSUPP); + + rc = lop->lop_next_block(loghandle, cur_idx, next_idx, cur_offset, buf, + len); + RETURN(rc); +} + +static inline int llog_create(struct obd_device *obd, struct llog_handle **res, + struct llog_logid *logid, char *name) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_obd2ops(obd, &lop); + if (rc) + RETURN(rc); + if (lop->lop_create == NULL) + RETURN(-EOPNOTSUPP); + + rc = lop->lop_create(obd, res, logid, name); + RETURN(rc); +} + +#endif diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 8c17001..667d8c2 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -100,11 +100,19 @@ void ll_options(char *options, char **ost, char **mds, int *flags) while ((this_char = strsep (&opt_ptr, ",")) != NULL) { #endif CDEBUG(D_SUPER, "this_char %s\n", this_char); - if ((!*ost && (*ost = ll_read_opt("osc", this_char)))|| - (!*mds && (*mds = ll_read_opt("mdc", this_char)))|| - (!(*flags & LL_SBI_NOLCK) && - ((*flags) = (*flags) | - ll_set_opt("nolock", this_char, LL_SBI_NOLCK)))) + if (!*ost && (*ost = ll_read_opt("osc", this_char))) + continue; + if (!*mds && (*mds = ll_read_opt("mdc", this_char))) + continue; + if (!(*flags & LL_SBI_NOLCK) && + ((*flags) = (*flags) | + ll_set_opt("nolock", this_char, + LL_SBI_NOLCK))) + continue; + if (!(*flags & LL_SBI_READAHEAD) && + ((*flags) = (*flags) | + ll_set_opt("readahead", this_char, + LL_SBI_READAHEAD))) continue; } EXIT; @@ -113,17 +121,8 @@ void ll_options(char *options, char **ost, char **mds, int *flags) void ll_lli_init(struct ll_inode_info *lli) { sema_init(&lli->lli_open_sem, 1); - spin_lock_init(&lli->lli_read_extent_lock); - INIT_LIST_HEAD(&lli->lli_read_extents); lli->lli_flags = 0; lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - spin_lock_init(&lli->lli_pg_lock); - INIT_LIST_HEAD(&lli->lli_lc_item); - plist_init(&lli->lli_pl_read); - plist_init(&lli->lli_pl_write); - atomic_set(&lli->lli_in_writepages, 0); -#endif } int ll_fill_super(struct super_block *sb, void *data, int silent) @@ -137,6 +136,8 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) struct ll_fid rootfid; struct obd_statfs osfs; struct ptlrpc_request *request = NULL; + struct lustre_handle osc_conn = {0, }; + struct lustre_handle mdc_conn = {0, }; struct lustre_md md; class_uuid_t uuid; @@ -179,11 +180,12 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) CERROR("could not register mount in /proc/lustre"); } - err = obd_connect(&sbi->ll_mdc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid); if (err) { CERROR("cannot connect to %s: rc = %d\n", mdc, err); GOTO(out_free, err); } + sbi->ll_mdc_exp = class_conn2export(&mdc_conn); err = obd_statfs(obd, &osfs, jiffies - HZ); if (err) @@ -201,13 +203,14 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) GOTO(out_mdc, err); } - err = obd_connect(&sbi->ll_osc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid); if (err) { CERROR("cannot connect to %s: rc = %d\n", osc, err); GOTO(out_mdc, err); } + sbi->ll_osc_exp = class_conn2export(&osc_conn); - err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid); + err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); GOTO(out_osc, err); @@ -219,30 +222,18 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) /* make root inode * XXX: move this to after cbd setup? */ - err = mdc_getattr(&sbi->ll_mdc_conn, &rootfid, + err = mdc_getattr(sbi->ll_mdc_exp, &rootfid, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request); if (err) { CERROR("mdc_getattr failed for root: rc = %d\n", err); GOTO(out_osc, err); } - /* initialize committed transaction callback daemon */ - spin_lock_init(&sbi->ll_commitcbd_lock); - init_waitqueue_head(&sbi->ll_commitcbd_waitq); - init_waitqueue_head(&sbi->ll_commitcbd_ctl_waitq); - sbi->ll_commitcbd_flags = 0; - err = ll_commitcbd_setup(sbi); - if (err) { - CERROR("failed to start commit callback daemon: rc = %d\n",err); - ptlrpc_req_finished (request); - GOTO(out_lliod, err); - } - - err = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md); + err = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); if (err) { CERROR("failed to understand root inode md: rc = %d\n",err); ptlrpc_req_finished (request); - GOTO(out_lliod, err); + GOTO(out_osc, err); } LASSERT(sbi->ll_rootino != 0); @@ -253,17 +244,9 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) if (root == NULL || is_bad_inode(root)) { /* XXX might need iput() for bad inode */ CERROR("lustre_lite: bad iget4 for root\n"); - GOTO(out_cbd, err = -EBADF); + GOTO(out_osc, err = -EBADF); } -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - /* initialize the pagecache writeback thread */ - err = lliod_start(sbi, root); - if (err) { - CERROR("failed to start lliod: rc = %d\n",err); - GOTO(out_root, sb = NULL); - } -#endif sb->s_root = d_alloc_root(root); out_dev: @@ -274,20 +257,11 @@ out_dev: RETURN(err); -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) -out_root: iput(root); -#endif -out_cbd: - ll_commitcbd_cleanup(sbi); -out_lliod: -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - lliod_stop(sbi); -#endif out_osc: - obd_disconnect(&sbi->ll_osc_conn, 0); + obd_disconnect(sbi->ll_osc_exp, 0); out_mdc: - obd_disconnect(&sbi->ll_mdc_conn, 0); + obd_disconnect(sbi->ll_mdc_exp, 0); out_free: lprocfs_unregister_mountpoint(sbi); OBD_FREE(sbi, sizeof(*sbi)); @@ -298,18 +272,14 @@ out_free: void ll_put_super(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); - struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn); + struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp); struct hlist_node *tmp, *next; struct ll_fid rootfid; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); list_del(&sbi->ll_conn_chain); - ll_commitcbd_cleanup(sbi); -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - lliod_stop(sbi); -#endif - obd_disconnect(&sbi->ll_osc_conn, 0); + obd_disconnect(sbi->ll_osc_exp, 0); /* NULL request to force sync on the MDS, and get the last_committed * value to flush remaining RPCs from the sending queue on client. @@ -318,7 +288,7 @@ void ll_put_super(struct super_block *sb) * which we can call for other reasons as well. */ if (!obd->obd_no_recov) - mdc_getstatus(&sbi->ll_mdc_conn, &rootfid); + mdc_getstatus(sbi->ll_mdc_exp, &rootfid); lprocfs_unregister_mountpoint(sbi); if (sbi->ll_proc_root) { @@ -326,9 +296,9 @@ void ll_put_super(struct super_block *sb) sbi->ll_proc_root = NULL; } - obd_disconnect(&sbi->ll_mdc_conn, 0); + obd_disconnect(sbi->ll_mdc_exp, 0); -#warning Why do we need this? +#warning We do this to get rid of orphaned dentries. That is not really trw. spin_lock(&dcache_lock); hlist_for_each_safe(tmp, next, &sbi->ll_orphan_dentry_list) { struct dentry *dentry = hlist_entry(tmp, struct dentry, d_hash); @@ -376,14 +346,15 @@ void ll_clear_inode(struct inode *inode) inode->i_generation, inode); ll_inode2fid(&fid, inode); - mdc_change_cbdata(&sbi->ll_mdc_conn, &fid, null_if_equal, inode); + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(ll_i2info(inode)->lli_flags)); + mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode); if (lli->lli_smd) - obd_change_cbdata(&sbi->ll_osc_conn, lli->lli_smd, + obd_change_cbdata(sbi->ll_osc_exp, lli->lli_smd, null_if_equal, inode); if (lli->lli_smd) { - obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd); + obd_free_memmd(sbi->ll_osc_exp, &lli->lli_smd); lli->lli_smd = NULL; } @@ -396,98 +367,6 @@ void ll_clear_inode(struct inode *inode) EXIT; } -/* like inode_setattr, but doesn't mark the inode dirty */ -int ll_attr2inode(struct inode *inode, struct iattr *attr, int trunc) -{ - unsigned int ia_valid = attr->ia_valid; - int error = 0; - - if ((ia_valid & ATTR_SIZE) && trunc) { - if (attr->ia_size > ll_file_maxbytes(inode)) { - error = -EFBIG; - goto out; - } - error = vmtruncate(inode, attr->ia_size); - if (error) - goto out; - } else if (ia_valid & ATTR_SIZE) - inode->i_size = attr->ia_size; - - if (ia_valid & ATTR_UID) - inode->i_uid = attr->ia_uid; - if (ia_valid & ATTR_GID) - inode->i_gid = attr->ia_gid; - if (ia_valid & ATTR_ATIME) - inode->i_atime = attr->ia_atime; - if (ia_valid & ATTR_MTIME) - inode->i_mtime = attr->ia_mtime; - if (ia_valid & ATTR_CTIME) - inode->i_ctime = attr->ia_ctime; - if (ia_valid & ATTR_MODE) { - inode->i_mode = attr->ia_mode; - if (!in_group_p(inode->i_gid) && !capable(CAP_FSETID)) - inode->i_mode &= ~S_ISGID; - } -out: - return error; -} - -int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc) -{ - struct ptlrpc_request *request = NULL; - struct ll_sb_info *sbi = ll_i2sbi(inode); - int err = 0; - ENTRY; - - /* change incore inode */ - err = ll_attr2inode(inode, attr, do_trunc); - if (err) - RETURN(err); - - /* Don't send size changes to MDS to avoid "fast EA" problems, and - * also avoid a pointless RPC (we get file size from OST anyways). - */ - attr->ia_valid &= ~ATTR_SIZE; - if (attr->ia_valid) { - struct mdc_op_data op_data; - - ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); - err = mdc_setattr(&sbi->ll_mdc_conn, &op_data, - attr, NULL, 0, NULL, 0, &request); - if (err) - CERROR("mdc_setattr fails: err = %d\n", err); - - ptlrpc_req_finished(request); - if (S_ISREG(inode->i_mode) && attr->ia_valid & ATTR_MTIME_SET) { - struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - struct obdo oa; - int err2; - -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", - inode->i_ino, attr->ia_mtime); - oa.o_mtime = attr->ia_mtime; -#else - CDEBUG(D_INODE, "set mtime on OST inode %lu to " - LPU64"\n", inode->i_ino, - ll_ts2u64(&attr->ia_mtime)); - oa.o_mtime = ll_ts2u64(&attr->ia_mtime); -#endif - oa.o_id = lsm->lsm_object_id; - oa.o_mode = S_IFREG; - oa.o_valid = OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME; - err2 = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL); - if (err2) { - CERROR("obd_setattr fails: rc=%d\n", err); - if (!err) - err = err2; - } - } - } - - RETURN(err); -} - /* If this inode has objects allocated to it (lsm != NULL), then the OST * object(s) determine the file size and mtime. Otherwise, the MDS will * keep these values until such a time that objects are allocated for it. @@ -554,8 +433,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) struct lustre_md md; ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); - rc = mdc_setattr(&sbi->ll_mdc_conn, &op_data, - attr, NULL, 0, NULL, 0, &request); + rc = mdc_setattr(sbi->ll_mdc_exp, &op_data, + attr, NULL, 0, NULL, 0, &request); if (rc) { ptlrpc_req_finished(request); @@ -564,7 +443,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) RETURN(rc); } - rc = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md); + rc = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); if (rc) { ptlrpc_req_finished(request); RETURN(rc); @@ -600,7 +479,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) } if (ia_valid & ATTR_SIZE) { - struct ldlm_extent extent = { .start = attr->ia_size, + struct ldlm_extent extent = { .start = 0, .end = OBD_OBJECT_EOF }; struct lustre_handle lockh = { 0 }; int err, ast_flags = 0; @@ -622,7 +501,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) if (extent.start == 0) ast_flags = LDLM_AST_DISCARD_DATA; /* bug 1639: avoid write/truncate i_sem/DLM deadlock */ - LASSERT(atomic_read(&inode->i_sem.count) == 0); + LASSERT(atomic_read(&inode->i_sem.count) <= 0); up(&inode->i_sem); rc = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW, &extent, &lockh, ast_flags); @@ -635,7 +514,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) rc = vmtruncate(inode, attr->ia_size); if (rc == 0) - set_bit(LLI_F_HAVE_SIZE_LOCK, + set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &ll_i2info(inode)->lli_flags); /* unlock now as we don't mind others file lockers racing with @@ -655,7 +534,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) oa.o_valid = OBD_MD_FLID; obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME); - rc = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL); + rc = obd_setattr(sbi->ll_osc_exp, &oa, lsm, NULL); if (rc) CERROR("obd_setattr fails: rc=%d\n", rc); } @@ -664,13 +543,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) int ll_setattr(struct dentry *de, struct iattr *attr) { - int rc = inode_change_ok(de->d_inode, attr); - CDEBUG(D_VFSTRACE, "VFS Op:name=%s\n", de->d_name.name); - if (rc) - return rc; - - lprocfs_counter_incr(ll_i2sbi(de->d_inode)->ll_stats, LPROC_LL_SETATTR); - return ll_inode_setattr(de->d_inode, attr, 1); + LBUG(); /* code is unused, but leave this in case of VFS changes */ + RETURN(-ENOSYS); } int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, @@ -681,7 +555,7 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, int rc; ENTRY; - rc = obd_statfs(class_conn2obd(&sbi->ll_mdc_conn), osfs, max_age); + rc = obd_statfs(class_exp2obd(sbi->ll_mdc_exp), osfs, max_age); if (rc) { CERROR("mdc_statfs fails: rc = %d\n", rc); RETURN(rc); @@ -690,7 +564,7 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); - rc = obd_statfs(class_conn2obd(&sbi->ll_osc_conn), &obd_osfs, max_age); + rc = obd_statfs(class_exp2obd(sbi->ll_osc_exp), &obd_osfs, max_age); if (rc) { CERROR("obd_statfs fails: rc = %d\n", rc); RETURN(rc); @@ -869,16 +743,6 @@ void ll_read_inode2(struct inode *inode, void *opaque) } } -int it_disposition(struct lookup_intent *it, int flag) -{ - return it->d.lustre.it_disposition & flag; -} - -void it_set_disposition(struct lookup_intent *it, int flag) -{ - it->d.lustre.it_disposition |= flag; -} - void ll_umount_begin(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); @@ -887,27 +751,27 @@ void ll_umount_begin(struct super_block *sb) ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:\n"); - obd = class_conn2obd(&sbi->ll_mdc_conn); + obd = class_exp2obd(sbi->ll_mdc_exp); if (obd == NULL) { CERROR("Invalid MDC connection handle "LPX64"\n", - sbi->ll_mdc_conn.cookie); + sbi->ll_mdc_exp->exp_handle.h_cookie); EXIT; return; } obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_mdc_conn, sizeof ioc_data, + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_mdc_exp, sizeof ioc_data, &ioc_data, NULL); - obd = class_conn2obd(&sbi->ll_osc_conn); + obd = class_exp2obd(sbi->ll_osc_exp); if (obd == NULL) { CERROR("Invalid LOV connection handle "LPX64"\n", - sbi->ll_osc_conn.cookie); + sbi->ll_osc_exp->exp_handle.h_cookie); EXIT; return; } obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_osc_conn, sizeof ioc_data, + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_osc_exp, sizeof ioc_data, &ioc_data, NULL); /* Really, we'd like to wait until there are no requests outstanding, diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index f3bc191..f9b629e 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -13,13 +13,13 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count); void lov_free_memmd(struct lov_stripe_md **lsmp); /* lov_pack.c */ -int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm, +int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmm, struct lov_stripe_md *lsm); -int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm, +int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsm, struct lov_mds_md *lmm, int lmmsize); -int lov_setstripe(struct lustre_handle *conn, +int lov_setstripe(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_mds_md *lmmu); -int lov_getstripe(struct lustre_handle *conn, +int lov_getstripe(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_mds_md *lmmu); /* lproc_lov.c */ diff --git a/lustre/mdc/Makefile.mk b/lustre/mdc/Makefile.mk index b12e5fc..a93f1cf 100644 --- a/lustre/mdc/Makefile.mk +++ b/lustre/mdc/Makefile.mk @@ -6,4 +6,4 @@ include $(src)/../portals/Kernelenv obj-y += mdc.o -mdc-objs := mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o +mdc-objs := mdc_locks.o mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o diff --git a/lustre/obdclass/recov_log.c b/lustre/obdclass/recov_log.c deleted file mode 100644 index bff90f3..0000000 --- a/lustre/obdclass/recov_log.c +++ /dev/null @@ -1,470 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (C) 2001-2003 Cluster File Systems, Inc. - * Author: Andreas Dilger - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. - * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - * - * OST<->MDS recovery logging infrastructure. - * - * Invariants in implementation: - * - we do not share logs among different OST<->MDS connections, so that - * if an OST or MDS fails it need only look at log(s) relevant to itself - */ - -#define DEBUG_SUBSYSTEM S_LOG - -#ifndef EXPORT_SYMTAB -#define EXPORT_SYMTAB -#endif - -#include -#include -#include -#include - -/* Allocate a new log or catalog handle */ -struct llog_handle *llog_alloc_handle(void) -{ - struct llog_handle *loghandle; - ENTRY; - - OBD_ALLOC(loghandle, sizeof(*loghandle)); - if (loghandle == NULL) - RETURN(ERR_PTR(-ENOMEM)); - - OBD_ALLOC(loghandle->lgh_hdr, LLOG_CHUNK_SIZE); - if (loghandle->lgh_hdr == NULL) { - OBD_FREE(loghandle, sizeof(*loghandle)); - RETURN(ERR_PTR(-ENOMEM)); - } - - INIT_LIST_HEAD(&loghandle->lgh_list); - sema_init(&loghandle->lgh_lock, 1); - - RETURN(loghandle); -} -EXPORT_SYMBOL(llog_alloc_handle); - -void llog_free_handle(struct llog_handle *loghandle) -{ - if (!loghandle) - return; - - list_del_init(&loghandle->lgh_list); - OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE); - OBD_FREE(loghandle, sizeof(*loghandle)); -} -EXPORT_SYMBOL(llog_free_handle); - -/* Create a new log handle and add it to the open list. - * This log handle will be closed when all of the records in it are removed. - * - * Assumes caller has already pushed us into the kernel context and is locking. - */ -struct llog_handle *llog_new_log(struct llog_handle *cathandle, - struct obd_uuid *tgtuuid) -{ - struct llog_handle *loghandle; - struct llog_object_hdr *llh; - loff_t offset; - int rc, index, bitmap_size, i; - ENTRY; - - LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE); - - loghandle = cathandle->lgh_log_create(cathandle->lgh_obd); - if (IS_ERR(loghandle)) - RETURN(loghandle); - - llh = loghandle->lgh_hdr; - llh->llh_hdr.lth_type = LLOG_OBJECT_MAGIC; - llh->llh_hdr.lth_len = llh->llh_hdr_end_len = sizeof(*llh); - llh->llh_timestamp = LTIME_S(CURRENT_TIME); - llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap); - memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid)); - loghandle->lgh_tgtuuid = &llh->llh_tgtuuid; - - llh = cathandle->lgh_hdr; - bitmap_size = sizeof(llh->llh_bitmap) * 8; - /* This should basically always find the first entry free */ - for (i = 0, index = llh->llh_count; i < bitmap_size; i++, index++) { - index %= bitmap_size; - if (ext2_set_bit(index, llh->llh_bitmap)) { - /* XXX This should trigger log clean up or similar */ - CERROR("catalog index %d is still in use\n", index); - } else { - llh->llh_count = (index + 1) % bitmap_size; - break; - } - } - if (i == bitmap_size) - CERROR("no free catalog slots for log...\n"); - - CDEBUG(D_HA, "new recovery log "LPX64":%x catalog index %u\n", - loghandle->lgh_cookie.lgc_lgl.lgl_oid, - loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index); - loghandle->lgh_cookie.lgc_index = index; - - offset = sizeof(*llh) + index * sizeof(loghandle->lgh_cookie); - - /* XXX Hmm, what to do if the catalog update fails? Under normal - * operations we would clean this handle up anyways, and at - * worst we leak some objects, but there is little point in - * doing the logging in that case... - * - * We don't want to mark a catalog in-use if it wasn't written. - * The only danger is if the OST crashes - the log is lost. - */ - rc = lustre_fwrite(cathandle->lgh_file, &loghandle->lgh_cookie, - sizeof(loghandle->lgh_cookie), &offset); - if (rc != sizeof(loghandle->lgh_cookie)) { - CERROR("error adding log "LPX64" to catalog: rc %d\n", - loghandle->lgh_cookie.lgc_lgl.lgl_oid, rc); - rc = rc < 0 ? : -ENOSPC; - } else { - offset = 0; - rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh), - &offset); - if (rc != sizeof(*llh)) { - CERROR("error marking catalog entry %d in use: rc %d\n", - index, rc); - rc = rc < 0 ? : -ENOSPC; - } - } - cathandle->lgh_current = loghandle; - list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list); - - RETURN(loghandle); -} -EXPORT_SYMBOL(llog_new_log); - -/* Assumes caller has already pushed us into the kernel context. */ -int llog_init_catalog(struct llog_handle *cathandle, struct obd_uuid *tgtuuid) -{ - struct llog_object_hdr *llh; - loff_t offset = 0; - int rc = 0; - ENTRY; - - LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE); - - down(&cathandle->lgh_lock); - llh = cathandle->lgh_hdr; - - if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) { -write_hdr: llh->llh_hdr.lth_type = LLOG_CATALOG_MAGIC; - llh->llh_hdr.lth_len = llh->llh_hdr_end_len = LLOG_CHUNK_SIZE; - llh->llh_timestamp = LTIME_S(CURRENT_TIME); - llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap); - memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid)); - rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE, - &offset); - if (rc != LLOG_CHUNK_SIZE) { - CERROR("error writing catalog header: rc %d\n", rc); - OBD_FREE(llh, sizeof(*llh)); - if (rc >= 0) - rc = -ENOSPC; - } else - rc = 0; - } else { - rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE, - &offset); - if (rc != LLOG_CHUNK_SIZE) { - CERROR("error reading catalog header: rc %d\n", rc); - /* Can we do much else if the header is bad? */ - goto write_hdr; - } else - rc = 0; - } - - cathandle->lgh_tgtuuid = &llh->llh_tgtuuid; - up(&cathandle->lgh_lock); - RETURN(rc); -} -EXPORT_SYMBOL(llog_init_catalog); - -/* Return the currently active log handle. If the current log handle doesn't - * have enough space left for the current record, start a new one. - * - * If reclen is 0, we only want to know what the currently active log is, - * otherwise we get a lock on this log so nobody can steal our space. - * - * Assumes caller has already pushed us into the kernel context and is locking. - */ -static struct llog_handle *llog_current_log(struct llog_handle *cathandle, - int reclen) -{ - struct llog_handle *loghandle = NULL; - ENTRY; - - loghandle = cathandle->lgh_current; - if (loghandle) { - struct llog_object_hdr *llh = loghandle->lgh_hdr; - if (llh->llh_count < sizeof(llh->llh_bitmap) * 8) - RETURN(loghandle); - } - - if (reclen) - loghandle = llog_new_log(cathandle, cathandle->lgh_tgtuuid); - RETURN(loghandle); -} - -/* Add a single record to the recovery log(s). - * Returns number of bytes in returned logcookies, or negative error code. - * - * Assumes caller has already pushed us into the kernel context. - */ -int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec, - struct llog_cookie *logcookies) -{ - struct llog_handle *loghandle; - struct llog_object_hdr *llh; - int reclen = rec->lth_len; - struct file *file; - loff_t offset; - size_t left; - int index; - int rc; - ENTRY; - - LASSERT(rec->lth_len <= LLOG_CHUNK_SIZE); - down(&cathandle->lgh_lock); - loghandle = llog_current_log(cathandle, reclen); - if (IS_ERR(loghandle)) { - up(&cathandle->lgh_lock); - RETURN(PTR_ERR(loghandle)); - } - down(&loghandle->lgh_lock); - up(&cathandle->lgh_lock); - - llh = loghandle->lgh_hdr; - file = loghandle->lgh_file; - - /* Make sure that records don't cross a chunk boundary, so we can - * process them page-at-a-time if needed. If it will cross a chunk - * boundary, write in a fake (but referenced) entry to pad the chunk. - * - * We know that llog_current_log() will return a loghandle that is - * big enough to hold reclen, so all we care about is padding here. - */ - left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1)); - if (left != 0 && left != reclen && left < reclen + LLOG_MIN_REC_SIZE) { - struct llog_null_trans { - struct llog_trans_hdr hdr; - __u32 padding[6]; - } pad = { .hdr = { .lth_len = left } }; - - LASSERT(left >= LLOG_MIN_REC_SIZE); - if (left <= sizeof(pad)) - *(__u32 *)((char *)&pad + left - sizeof(__u32)) = left; - - rc = lustre_fwrite(loghandle->lgh_file, &pad, - min(sizeof(pad), left), - &loghandle->lgh_file->f_pos); - if (rc != min(sizeof(pad), left)) { - CERROR("error writing padding record: rc %d\n", rc); - GOTO(out, rc = rc < 0 ? rc : -EIO); - } - - left -= rc; - if (left) { - LASSERT(left >= sizeof(__u32)); - loghandle->lgh_file->f_pos += left - sizeof(__u32); - rc = lustre_fwrite(loghandle->lgh_file, &pad, - sizeof(__u32), - &loghandle->lgh_file->f_pos); - if (rc != sizeof(__u32)) { - CERROR("error writing padding end: rc %d\n", - rc); - GOTO(out, rc < 0 ? rc : -ENOSPC); - } - } - - loghandle->lgh_index++; - } - - index = loghandle->lgh_index++; - if (ext2_set_bit(index, llh->llh_bitmap)) { - CERROR("argh, index %u already set in log bitmap?\n", index); - LBUG(); /* should never happen */ - } - llh->llh_count++; - - offset = 0; - rc = lustre_fwrite(loghandle->lgh_file, llh, sizeof(*llh), &offset); - if (rc != sizeof(*llh)) { - CERROR("error writing log header: rc %d\n", rc); - GOTO(out, rc < 0 ? rc : -EIO); - } - - rc = lustre_fwrite(loghandle->lgh_file, rec, reclen, - &loghandle->lgh_file->f_pos); - if (rc != reclen) { - CERROR("error writing log record: rc %d\n", rc); - GOTO(out, rc < 0 ? rc : -ENOSPC); - } - - CDEBUG(D_HA, "added record "LPX64":%x+%u, %u bytes\n", - loghandle->lgh_cookie.lgc_lgl.lgl_oid, - loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index, rec->lth_len); - *logcookies = loghandle->lgh_cookie; - logcookies->lgc_index = index; - - rc = 0; -out: - up(&loghandle->lgh_lock); - RETURN(rc); -} -EXPORT_SYMBOL(llog_add_record); - -/* Remove a log entry from the catalog. - * Assumes caller has already pushed us into the kernel context and is locking. - */ -int llog_delete_log(struct llog_handle *cathandle,struct llog_handle *loghandle) -{ - struct llog_cookie *lgc = &loghandle->lgh_cookie; - int catindex = lgc->lgc_index; - struct llog_object_hdr *llh = cathandle->lgh_hdr; - loff_t offset = 0; - int rc = 0; - ENTRY; - - CDEBUG(D_HA, "log "LPX64":%x empty, closing\n", - lgc->lgc_lgl.lgl_oid, lgc->lgc_lgl.lgl_ogen); - - if (ext2_clear_bit(catindex, llh->llh_bitmap)) { - CERROR("catalog index %u already clear?\n", catindex); - LBUG(); - } else { - rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh), - &offset); - - if (rc != sizeof(*llh)) { - CERROR("log %u cancel error: rc %d\n", catindex, rc); - if (rc >= 0) - rc = -EIO; - } else - rc = 0; - } - RETURN(rc); -} -EXPORT_SYMBOL(llog_delete_log); - -/* Assumes caller has already pushed us into the kernel context and is locking. - * We return a lock on the handle to ensure nobody yanks it from us. - */ -static struct llog_handle *llog_id2handle(struct llog_handle *cathandle, - struct llog_cookie *logcookie) -{ - struct llog_handle *loghandle; - struct llog_logid *lgl = &logcookie->lgc_lgl; - ENTRY; - - if (cathandle == NULL) - RETURN(ERR_PTR(-EBADF)); - - list_for_each_entry(loghandle, &cathandle->lgh_list, lgh_list) { - struct llog_logid *cgl = &loghandle->lgh_cookie.lgc_lgl; - if (cgl->lgl_oid == lgl->lgl_oid) { - if (cgl->lgl_ogen != lgl->lgl_ogen) { - CERROR("log "LPX64" generation %x != %x\n", - lgl->lgl_oid, cgl->lgl_ogen, - lgl->lgl_ogen); - continue; - } - GOTO(out, loghandle); - } - } - - loghandle = cathandle->lgh_log_open(cathandle->lgh_obd, logcookie); - if (IS_ERR(loghandle)) { - CERROR("error opening log id "LPX64":%x: rc %d\n", - lgl->lgl_oid, lgl->lgl_ogen, (int)PTR_ERR(loghandle)); - } else { - list_add(&loghandle->lgh_list, &cathandle->lgh_list); - } - -out: - RETURN(loghandle); -} - -/* For each cookie in the cookie array, we clear the log in-use bit and either: - * - the log is empty, so mark it free in the catalog header and delete it - * - the log is not empty, just write out the log header - * - * The cookies may be in different log files, so we need to get new logs - * each time. - * - * Assumes caller has already pushed us into the kernel context. - */ -int llog_cancel_records(struct llog_handle *cathandle, int count, - struct llog_cookie *cookies) -{ - int i, rc = 0; - ENTRY; - - down(&cathandle->lgh_lock); - for (i = 0; i < count; i++, cookies++) { - struct llog_handle *loghandle; - struct llog_object_hdr *llh; - struct llog_logid *lgl = &cookies->lgc_lgl; - - loghandle = llog_id2handle(cathandle, cookies); - if (IS_ERR(loghandle)) { - if (!rc) - rc = PTR_ERR(loghandle); - continue; - } - - down(&loghandle->lgh_lock); - llh = loghandle->lgh_hdr; - CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n", - lgl->lgl_oid, cookies->lgc_index, - ext2_test_bit(cookies->lgc_index, llh->llh_bitmap)); - if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) { - CERROR("log index %u in "LPX64":%x already clear?\n", - cookies->lgc_index, lgl->lgl_oid, lgl->lgl_ogen); - } else if (--llh->llh_count == 0 && - loghandle != llog_current_log(cathandle, 0)) { - loghandle->lgh_log_close(cathandle, loghandle); - } else { - loff_t offset = 0; - int ret = lustre_fwrite(loghandle->lgh_file, llh, - sizeof(*llh), &offset); - - if (ret != sizeof(*llh)) { - CERROR("error cancelling index %u: rc %d\n", - cookies->lgc_index, ret); - /* XXX mark handle bad? */ - if (!rc) - rc = ret; - } - } - up(&loghandle->lgh_lock); - } - up(&cathandle->lgh_lock); - - RETURN(rc); -} -EXPORT_SYMBOL(llog_cancel_records); - -int llog_close_log(struct llog_handle *cathandle, struct llog_handle *loghandle) -{ - return loghandle->lgh_log_close(cathandle, loghandle); -} -EXPORT_SYMBOL(llog_close_log); diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 9f0b5ed..81f2c66 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -16,6 +16,8 @@ #include #include +#define FILTER_LAYOUT_VERSION "2" + #ifndef OBD_FILTER_DEVICENAME # define OBD_FILTER_DEVICENAME "obdfilter" #endif @@ -25,7 +27,7 @@ #endif #define LAST_RCVD "last_rcvd" -#define FILTER_INIT_OBJID 2 +#define FILTER_INIT_OBJID 0 #define FILTER_LR_SERVER_SIZE 512 @@ -37,6 +39,7 @@ #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long)) #define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */ +#define FILTER_GROUPS 2 /* must be at least 2; not dynamic yet */ #define FILTER_MOUNT_RECOV 2 #define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */ @@ -45,7 +48,7 @@ struct filter_server_data { __u8 fsd_uuid[37]; /* server UUID */ __u8 fsd_uuid_padding[3]; /* unused */ - __u64 fsd_last_objid; /* last created object ID */ + __u64 fsd_unused; __u64 fsd_last_transno; /* last completed transaction ID */ __u64 fsd_mount_count; /* FILTER incarnation number */ __u32 fsd_feature_compat; /* compatible feature flags */ @@ -72,22 +75,6 @@ struct filter_client_data { __u8 fcd_padding[FILTER_LR_CLIENT_SIZE - 64]; }; -/* file data for open files on OST */ -struct filter_file_data { - struct portals_handle ffd_handle; - atomic_t ffd_refcount; - struct list_head ffd_export_list; /* export open list - fed_lock */ - struct file *ffd_file; /* file handle */ -}; - -struct filter_dentry_data { - struct llog_cookie fdd_cookie; - obd_id fdd_objid; - __u32 fdd_magic; - atomic_t fdd_open_count; - int fdd_flags; -}; - #define FILTER_DENTRY_MAGIC 0x9efba101 #define FILTER_FLAG_DESTROY 0x0001 /* destroy dentry on last file close */ @@ -103,21 +90,21 @@ enum { }; /* filter.c */ -struct dentry *filter_parent(struct obd_device *, obd_mode mode, obd_id objid); -struct dentry *filter_parent_lock(struct obd_device *, obd_mode mode, - obd_id objid, ldlm_mode_t lock_mode, - struct lustre_handle *lockh); +struct dentry *filter_parent(struct obd_device *, obd_gr group, obd_id objid); +struct dentry *filter_parent_lock(struct obd_device *, obd_gr, obd_id, + ldlm_mode_t, struct lustre_handle *); void f_dput(struct dentry *); struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir, - obd_mode mode, obd_id id); + obd_gr group, obd_id id); struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa, const char *what); #define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__) int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc); -__u64 filter_next_id(struct filter_obd *); +__u64 filter_next_id(struct filter_obd *, struct obdo *); int filter_update_server_data(struct obd_device *, struct file *, struct filter_server_data *, int force_sync); +int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync); int filter_common_setup(struct obd_device *, obd_count len, void *buf, char *option); @@ -128,23 +115,29 @@ int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount, int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount, struct obd_ioobj *, int niocount, struct niobuf_local *, struct obd_trans_info *); -int filter_brw(int cmd, struct lustre_handle *, struct obdo *, +int filter_brw(int cmd, struct obd_export *, struct obdo *, struct lov_stripe_md *, obd_count oa_bufs, struct brw_page *, struct obd_trans_info *); +void flip_into_page_cache(struct inode *inode, struct page *new_page); + +/* filter_io_*.c */ +int filter_commitrw_write(struct obd_export *exp, int objcount, + struct obd_ioobj *obj, int niocount, + struct niobuf_local *res, + struct obd_trans_info *oti); /* filter_log.c */ -int filter_log_cancel(struct lustre_handle *, struct lov_stripe_md *, - int num_cookies, struct llog_cookie *, int flags); int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid, obd_id oid, obd_count ogen, struct llog_cookie *); int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid, obd_count ogen, struct llog_cookie *); struct llog_handle *filter_get_catalog(struct obd_device *); -void filter_put_catalog(struct llog_handle *cathandle); + /* filter_san.c */ int filter_san_setup(struct obd_device *obd, obd_count len, void *buf); int filter_san_preprw(int cmd, struct obd_export *, struct obdo *, int objcount, struct obd_ioobj *, int niocount, struct niobuf_remote *); + #endif diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 971cf1d..a8d77c5 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -43,8 +43,8 @@ static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb) int rc; page = grab_cache_page(mapping, index); /* locked page */ - if (IS_ERR(page)) - return lnb->rc = PTR_ERR(page); + if (page == NULL) + return lnb->rc = -ENOMEM; LASSERT(page->mapping == mapping); @@ -99,138 +99,6 @@ err_page: return lnb->rc; } -static struct page *lustre_get_page_write(struct inode *inode, - unsigned long index) -{ - struct address_space *mapping = inode->i_mapping; - struct page *page; - int rc; - - page = grab_cache_page(mapping, index); /* locked page */ - - if (!IS_ERR(page)) { - /* Note: Called with "O" and "PAGE_SIZE" this is essentially - * a no-op for most filesystems, because we write the whole - * page. For partial-page I/O this will read in the page. - */ - rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE); - if (rc) { - CERROR("page index %lu, rc = %d\n", index, rc); - if (rc != -ENOSPC) - LBUG(); - GOTO(err_unlock, rc); - } - /* XXX not sure if we need this if we are overwriting page */ - if (PageError(page)) { - CERROR("error on page index %lu, rc = %d\n", index, rc); - LBUG(); - GOTO(err_unlock, rc = -EIO); - } - } - return page; - -err_unlock: - unlock_page(page); - page_cache_release(page); - return ERR_PTR(rc); -} - -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) -int wait_on_page_locked(struct page *page) -{ - waitfor_one_page(page); - return 0; -} - -/* We should only change the file mtime (and not the ctime, like - * update_inode_times() in generic_file_write()) when we only change data. */ -static inline void inode_update_time(struct inode *inode, int ctime_too) -{ - time_t now = CURRENT_TIME; - if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now)) - return; - inode->i_mtime = now; - if (ctime_too) - inode->i_ctime = now; - mark_inode_dirty_sync(inode); -} -#endif - -static int lustre_commit_write(struct niobuf_local *lnb) -{ - struct page *page = lnb->page; - unsigned from = lnb->offset & ~PAGE_MASK; - unsigned to = from + lnb->len; - struct inode *inode = page->mapping->host; - int err; - - LASSERT(to <= PAGE_SIZE); - err = page->mapping->a_ops->commit_write(NULL, page, from, to); -#warning 2.4 folks: wait_on_page_locked does NOT return its error here. - if (!err && IS_SYNC(inode)) - wait_on_page_locked(page); - //SetPageUptodate(page); // the client commit_write will do this - - SetPageReferenced(page); - unlock_page(page); - page_cache_release(page); - return err; -} - -int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb, - int *pglocked) -{ - unsigned long index = lnb->offset >> PAGE_SHIFT; - struct address_space *mapping = inode->i_mapping; - struct page *page; - int rc; - - //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL)); - if (*pglocked) - page = grab_cache_page_nowait(mapping, index); /* locked page */ - else - page = grab_cache_page(mapping, index); /* locked page */ - - - /* This page is currently locked, so get a temporary page instead. */ - if (page == NULL) { - CDEBUG(D_INFO, "ino %lu page %ld locked\n", inode->i_ino,index); - page = alloc_pages(GFP_KERNEL, 0); /* locked page */ - if (page == NULL) { - CERROR("no memory for a temp page\n"); - GOTO(err, rc = -ENOMEM); - } - page->index = index; - lnb->page = page; - lnb->flags |= N_LOCAL_TEMP_PAGE; - } else if (!IS_ERR(page)) { - unsigned from = lnb->offset & ~PAGE_MASK, to = from + lnb->len; - (*pglocked)++; - - rc = mapping->a_ops->prepare_write(NULL, page, from, to); - if (rc) { - if (rc != -ENOSPC) - CERROR("page index %lu, rc = %d\n", index, rc); - GOTO(err_unlock, rc); - } - /* XXX not sure if we need this if we are overwriting page */ - if (PageError(page)) { - CERROR("error on page index %lu, rc = %d\n", index, rc); - LBUG(); - GOTO(err_unlock, rc = -EIO); - } - lnb->page = page; - } - - return 0; - -err_unlock: - unlock_page(page); - page_cache_release(page); -err: - return lnb->rc = rc; -} - static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, @@ -240,7 +108,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, struct obd_run_ctxt saved; struct obd_ioobj *o; struct niobuf_remote *rnb; - struct niobuf_local *lnb; + struct niobuf_local *lnb = NULL; struct fsfilt_objinfo *fso; struct dentry *dentry; struct inode *inode; @@ -258,9 +126,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, memset(res, 0, niocount * sizeof(*res)); - push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL); + push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); for (i = 0, o = obj; i < objcount; i++, o++) { - struct filter_dentry_data *fdd; LASSERT(o->ioo_bufcnt); dentry = filter_oa2dentry(exp->exp_obd, oa); @@ -276,15 +143,13 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, fso[i].fso_dentry = dentry; fso[i].fso_bufcnt = o->ioo_bufcnt; - - fdd = dentry->d_fsdata; - if (fdd == NULL || !atomic_read(&fdd->fdd_open_count)) - CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n", - o->ioo_id); } if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow prep setup %lus\n", (jiffies - now) / HZ); + CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ); + else + CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n", + (jiffies - now)); for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) { dentry = fso[i].fso_dentry; @@ -325,7 +190,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, } if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow prep get page %lus\n", (jiffies - now) / HZ); + CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ); + else + CDEBUG(D_INFO, "start_page_read: %lu jiffies\n", + (jiffies - now)); lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); @@ -340,7 +208,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, } if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ); + CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ); + else + CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n", + (jiffies - now)); EXIT; @@ -355,49 +226,25 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, f_dput(res->dentry); else CERROR("NULL dentry in cleanup -- tell CFS\n"); - res->dentry = NULL; case 0: OBD_FREE(fso, objcount * sizeof(*fso)); - pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL); + pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); } return rc; } -/* We need to balance prepare_write() calls with commit_write() calls. - * If the page has been prepared, but we have no data for it, we don't - * want to overwrite valid data on disk, but we still need to zero out - * data for space which was newly allocated. Like part of what happens - * in __block_prepare_write() for newly allocated blocks. - * - * XXX currently __block_prepare_write() creates buffers for all the - * pages, and the filesystems mark these buffers as BH_New if they - * were newly allocated from disk. We use the BH_New flag similarly. */ -static int filter_commit_write(struct niobuf_local *lnb, int err) +static int filter_start_page_write(struct inode *inode, + struct niobuf_local *lnb) { -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - if (err) { - unsigned block_start, block_end; - struct buffer_head *bh, *head = lnb->page->buffers; - unsigned blocksize = head->b_size; - - /* debugging: just seeing if this ever happens */ - CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR, - "called for ino %lu:%lu on err %d\n", - lnb->page->mapping->host->i_ino, lnb->page->index, err); - - /* Currently one buffer per page, but in the future... */ - for (bh = head, block_start = 0; bh != head || !block_start; - block_start = block_end, bh = bh->b_this_page) { - block_end = block_start + blocksize; - if (buffer_new(bh)) { - memset(kmap(lnb->page) + block_start, 0, - blocksize); - kunmap(lnb->page); - } - } + struct page *page = alloc_pages(GFP_HIGHUSER, 0); + if (page == NULL) { + CERROR("no memory for a temp page\n"); + RETURN(lnb->rc = -ENOMEM); } -#endif - return lustre_commit_write(lnb); + page->index = lnb->offset >> PAGE_SHIFT; + lnb->page = page; + + return 0; } /* If we ever start to support multi-object BRW RPCs, we will need to get locks @@ -417,124 +264,72 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, struct obd_trans_info *oti) { struct obd_run_ctxt saved; - struct obd_ioobj *o; struct niobuf_remote *rnb; - struct niobuf_local *lnb; - struct fsfilt_objinfo *fso; + struct niobuf_local *lnb = NULL; + struct fsfilt_objinfo fso; struct dentry *dentry; - int pglocked = 0, rc = 0, i, j, tot_bytes = 0; + int rc = 0, i, tot_bytes = 0; unsigned long now = jiffies; ENTRY; LASSERT(objcount == 1); - - OBD_ALLOC(fso, objcount * sizeof(*fso)); - if (fso == NULL) - RETURN(-ENOMEM); + LASSERT(obj->ioo_bufcnt > 0); memset(res, 0, niocount * sizeof(*res)); - push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL); - for (i = 0, o = obj; i < objcount; i++, o++) { - struct filter_dentry_data *fdd; - LASSERT(o->ioo_bufcnt); - - dentry = filter_oa2dentry(exp->exp_obd, oa); - if (IS_ERR(dentry)) - GOTO(out_objinfo, rc = PTR_ERR(dentry)); - - if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - o->ioo_id); - f_dput(dentry); - GOTO(out_objinfo, rc = -ENOENT); - } - - fso[i].fso_dentry = dentry; - fso[i].fso_bufcnt = o->ioo_bufcnt; - - down(&dentry->d_inode->i_sem); - fdd = dentry->d_fsdata; - if (fdd == NULL || !atomic_read(&fdd->fdd_open_count)) - CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n", - o->ioo_id); - } + push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); + dentry = filter_fid2dentry(exp->exp_obd, NULL, 0, obj->ioo_id); + if (IS_ERR(dentry)) + GOTO(cleanup, rc = PTR_ERR(dentry)); - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow prep setup %lus\n", (jiffies - now) / HZ); - - LASSERT(oti != NULL); - oti->oti_handle = fsfilt_brw_start(exp->exp_obd, objcount, fso, - niocount, oti); - if (IS_ERR(oti->oti_handle)) { - rc = PTR_ERR(oti->oti_handle); - CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, - "error starting transaction: rc = %d\n", rc); - oti->oti_handle = NULL; - GOTO(out_objinfo, rc); + if (dentry->d_inode == NULL) { + CERROR("trying to BRW to non-existent file "LPU64"\n", + obj->ioo_id); + f_dput(dentry); + GOTO(cleanup, rc = -ENOENT); } - for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) { - dentry = fso[i].fso_dentry; - for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) { - if (j == 0) - lnb->dentry = dentry; - else - lnb->dentry = dget(dentry); - - lnb->offset = rnb->offset; - lnb->len = rnb->len; - lnb->flags = rnb->flags; - lnb->start = jiffies; - - rc = filter_get_page_write(dentry->d_inode, lnb, - &pglocked); - if (rc) - up(&dentry->d_inode->i_sem); + fso.fso_dentry = dentry; + fso.fso_bufcnt = obj->ioo_bufcnt; - if (rc) { - CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, - "page err %u@"LPU64" %u/%u %p: rc %d\n", - lnb->len, lnb->offset, j, o->ioo_bufcnt, - dentry, rc); - f_dput(dentry); - GOTO(out_pages, rc); - } - tot_bytes += lnb->len; + if (time_after(jiffies, now + 15 * HZ)) + CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ); + else + CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n", + (jiffies - now)); + + for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; + i++, lnb++, rnb++) { + lnb->dentry = dentry; + lnb->offset = rnb->offset; + lnb->len = rnb->len; + lnb->flags = rnb->flags; + lnb->start = jiffies; + + rc = filter_start_page_write(dentry->d_inode, lnb); + if (rc) { + CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@" + LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset, + i, obj->ioo_bufcnt, dentry, rc); + while (lnb-- > res) + __free_pages(lnb->page, 0); + f_dput(dentry); + GOTO(cleanup, rc); } + tot_bytes += lnb->len; } if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow prep get page %lus\n", (jiffies - now) / HZ); + CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ); + else + CDEBUG(D_INFO, "start_page_write: %lu jiffies\n", + (jiffies - now)); lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes); - EXIT; -out: - OBD_FREE(fso, objcount * sizeof(*fso)); - /* we saved the journal handle into oti->oti_handle instead */ - current->journal_info = NULL; - pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL); +cleanup: + pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); return rc; - -out_pages: - while (lnb-- > res) { - filter_commit_write(lnb, rc); - up(&lnb->dentry->d_inode->i_sem); - f_dput(lnb->dentry); - } - filter_finish_transno(exp, oti, rc); - fsfilt_commit(exp->exp_obd, - filter_parent(exp->exp_obd,S_IFREG,obj->ioo_id)->d_inode, - oti->oti_handle, 0); - goto out; /* dropped the dentry refs already (one per page) */ - -out_objinfo: - for (i = 0; i < objcount && fso[i].fso_dentry; i++) { - up(&fso[i].fso_dentry->d_inode->i_sem); - f_dput(fso[i].fso_dentry); - } - goto out; } int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, @@ -551,55 +346,9 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, niocount, nb, res, oti); LBUG(); - return -EPROTO; } -/* It is highly unlikely that we would ever get an error here. The page we want - * to get was previously locked, so it had to have already allocated the space, - * and we were just writing over the same data, so there would be no hole in the - * file. - * - * XXX: possibility of a race with truncate could exist, need to check that. - * There are no guarantees w.r.t. write order even on a local filesystem, - * although the normal response would be to return the number of bytes - * successfully written and leave the rest to the app. */ -static int filter_write_locked_page(struct niobuf_local *lnb) -{ - struct page *lpage; - void *lpage_addr, *lnb_addr; - int rc; - ENTRY; - - lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index); - if (IS_ERR(lpage)) { - rc = PTR_ERR(lpage); - CERROR("error getting locked page index %ld: rc = %d\n", - lnb->page->index, rc); - LBUG(); - lustre_commit_write(lnb); - RETURN(rc); - } - - /* 2 kmaps == vanishingly small deadlock opportunity */ - lpage_addr = kmap(lpage); - lnb_addr = kmap(lnb->page); - - memcpy(lpage_addr, lnb_addr, PAGE_SIZE); - - kunmap(lnb->page); - kunmap(lpage); - - page_cache_release(lnb->page); - - lnb->page = lpage; - rc = lustre_commit_write(lnb); - if (rc) - CERROR("error committing locked page %ld: rc = %d\n", - lnb->page->index, rc); - RETURN(rc); -} - static int filter_commitrw_read(struct obd_export *exp, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, @@ -621,144 +370,50 @@ static int filter_commitrw_read(struct obd_export *exp, int objcount, RETURN(0); } -static int -filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_local *res, struct obd_trans_info *oti) +void flip_into_page_cache(struct inode *inode, struct page *new_page) { - struct obd_run_ctxt saved; - struct obd_ioobj *o; - struct niobuf_local *lnb; - struct obd_device *obd = exp->exp_obd; - int found_locked = 0, rc = 0, i; - int nested_trans = current->journal_info != NULL; - unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */ + struct page *old_page; + int rc; ENTRY; - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - - if (cmd & OBD_BRW_WRITE) { - LASSERT(oti); - LASSERT(current->journal_info == NULL || - current->journal_info == oti->oti_handle); - current->journal_info = oti->oti_handle; - } - - for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) { - struct inode *inode; - int j; - - /* If all of the page reads were beyond EOF, let's pretend - * this read didn't really happen at all. */ - if (lnb->dentry == NULL) { - oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM); - continue; - } - - inode = igrab(lnb->dentry->d_inode); - - if (cmd & OBD_BRW_WRITE) { - /* FIXME: MULTI OBJECT BRW */ - if (oa && oa->o_valid & (OBD_MD_FLMTIME|OBD_MD_FLCTIME)) - obdo_refresh_inode(inode, oa, OBD_MD_FLATIME | - OBD_MD_FLMTIME | - OBD_MD_FLCTIME); - else - inode_update_time(lnb->dentry->d_inode, 1); - } else if (oa && oa->o_valid & OBD_MD_FLATIME) { - /* Note that we don't necessarily write this to disk */ - obdo_refresh_inode(inode, oa, OBD_MD_FLATIME); - } - - for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) { - if (lnb->page == NULL) { - continue; - } - - if (lnb->flags & N_LOCAL_TEMP_PAGE) { - found_locked++; - continue; - } - - if (time_after(jiffies, lnb->start + 15 * HZ)) - CERROR("slow commitrw %lus (%lus)\n", - (jiffies - lnb->start) / HZ, - (jiffies - now) / HZ); - - if (cmd & OBD_BRW_WRITE) { - int err = filter_commit_write(lnb, 0); - - if (!rc) - rc = err; - } else { - page_cache_release(lnb->page); - } - - f_dput(lnb->dentry); - if (time_after(jiffies, lnb->start + 15 * HZ)) - CERROR("slow commit_write %lus (%lus)\n", - (jiffies - lnb->start) / HZ, - (jiffies - now) / HZ); + do { + /* the dlm is protecting us from read/write concurrency, so we + * expect this find_lock_page to return quickly. even if we + * race with another writer it won't be doing much work with + * the page locked. we do this 'cause t_c_p expects a + * locked page, and it wants to grab the pagecache lock + * as well. */ + old_page = find_lock_page(inode->i_mapping, new_page->index); + if (old_page) { +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + truncate_complete_page(old_page); +#else + truncate_complete_page(old_page->mapping, old_page); +#endif + unlock_page(old_page); + page_cache_release(old_page); } - /* FIXME: MULTI OBJECT BRW */ - if (oa) { - oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM); - obdo_from_inode(oa, inode, FILTER_VALID_FLAGS); +#if 0 /* this should be a /proc tunable someday */ + /* racing o_directs (no locking ioctl) could race adding + * their pages, so we repeat the page invalidation unless + * we successfully added our new page */ + rc = add_to_page_cache_unique(new_page, inode->i_mapping, + new_page->index, + page_hash(inode->i_mapping, + new_page->index)); + if (rc == 0) { + /* add_to_page_cache clears uptodate|dirty and locks + * the page */ + SetPageUptodate(new_page); + unlock_page(new_page); } +#else + rc = 0; +#endif + } while (rc != 0); - if (cmd & OBD_BRW_WRITE) - up(&inode->i_sem); - - iput(inode); - } - - for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount; - i++, o++) { - int j; - - for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) { - int err; - if (!(lnb->flags & N_LOCAL_TEMP_PAGE)) - continue; - - if (time_after(jiffies, lnb->start + 15 * HZ)) - CERROR("slow commitrw locked %lus (%lus)\n", - (jiffies - lnb->start) / HZ, - (jiffies - now) / HZ); - - err = filter_write_locked_page(lnb); - if (!rc) - rc = err; - f_dput(lnb->dentry); - found_locked--; - - if (time_after(jiffies, lnb->start + 15 * HZ)) - CERROR("slow commit_write locked %lus (%lus)\n", - (jiffies - lnb->start) / HZ, - (jiffies - now) / HZ); - } - } - - if (cmd & OBD_BRW_WRITE) { - /* We just want any dentry for the commit, for now */ - struct dentry *dparent = filter_parent(obd, S_IFREG, 0); - int err; - - rc = filter_finish_transno(exp, oti, rc); - err = fsfilt_commit(obd, dparent->d_inode, oti->oti_handle, - obd_sync_filter); - if (err) - rc = err; - if (obd_sync_filter) - LASSERT(oti->oti_transno <= obd->obd_last_committed); - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ); - } - - LASSERT(nested_trans || current->journal_info == NULL); - pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - RETURN(rc); + EXIT; } /* XXX needs to trickle its oa down */ @@ -767,8 +422,8 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, struct niobuf_local *res, struct obd_trans_info *oti) { if (cmd == OBD_BRW_WRITE) - return filter_commitrw_write(cmd, exp, oa, objcount, obj, - niocount, res, oti); + return filter_commitrw_write(exp, objcount, obj, niocount, + res, oti); if (cmd == OBD_BRW_READ) return filter_commitrw_read(exp, objcount, obj, niocount, res, oti); @@ -776,11 +431,10 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, return -EPROTO; } -int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa, +int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *lsm, obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *oti) { - struct obd_export *exp; struct obd_ioobj ioo; struct niobuf_local *lnb; struct niobuf_remote *rnb; @@ -788,12 +442,6 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa, int ret = 0; ENTRY; - exp = class_conn2export(conn); - if (exp == NULL) { - CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie); - RETURN(-EINVAL); - } - OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local)); OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote)); @@ -826,8 +474,8 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa, else memcpy(virt + off, addr + off, pga[i].count); - kunmap(addr); - kunmap(virt); + kunmap(lnb[i].page); + kunmap(pga[i].pg); } ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti); @@ -837,6 +485,5 @@ out: OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local)); if (rnb) OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote)); - class_export_put(exp); RETURN(ret); } diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index 77eb078..de21a1f 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -37,202 +37,6 @@ #include "filter_internal.h" -static struct llog_handle *filter_log_create(struct obd_device *obd); - -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static int filter_log_close(struct llog_handle *cathandle, - struct llog_handle *loghandle) -{ - struct llog_object_hdr *llh = loghandle->lgh_hdr; - struct file *file = loghandle->lgh_file; - struct dentry *dparent = NULL, *dchild = NULL; - struct lustre_handle parent_lockh; - struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl; - int rc; - ENTRY; - - /* If we are going to delete this log, grab a ref before we close - * it so we don't have to immediately do another lookup. */ - if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){ - CDEBUG(D_INODE, "deleting log file "LPX64":%x\n", - lgl->lgl_oid, lgl->lgl_ogen); - dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG, - lgl->lgl_oid,LCK_PW,&parent_lockh); - if (IS_ERR(dparent)) { - rc = PTR_ERR(dparent); - CERROR("error locking parent, orphan log %*s: rc %d\n", - file->f_dentry->d_name.len, - file->f_dentry->d_name.name, rc); - RETURN(rc); - } else { - dchild = dget(file->f_dentry); - llog_delete_log(cathandle, loghandle); - } - } else { - CDEBUG(D_INODE, "closing log file "LPX64":%x\n", - lgl->lgl_oid, lgl->lgl_ogen); - } - - rc = filp_close(file, 0); - - llog_free_handle(loghandle); /* also removes loghandle from list */ - - if (dchild != NULL) { - int err = vfs_unlink(dparent->d_inode, dchild); - if (err) { - CERROR("error unlinking empty log %*s: rc %d\n", - dchild->d_name.len, dchild->d_name.name, err); - if (!rc) - rc = err; - } - f_dput(dchild); - ldlm_lock_decref(&parent_lockh, LCK_PW); - } - RETURN(rc); -} - -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static struct llog_handle *filter_log_open(struct obd_device *obd, - struct llog_cookie *logcookie) -{ - struct llog_logid *lgl = &logcookie->lgc_lgl; - struct llog_handle *loghandle; - struct dentry *dchild; - int rc; - ENTRY; - - loghandle = llog_alloc_handle(); - if (!loghandle) - RETURN(ERR_PTR(-ENOMEM)); - - dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid); - if (IS_ERR(dchild)) - GOTO(out_handle, rc = PTR_ERR(dchild)); - - if (dchild->d_inode == NULL) { - CERROR("logcookie references non-existent object %*s\n", - dchild->d_name.len, dchild->d_name.name); - GOTO(out_dentry, rc = -ENOENT); - } - - if (dchild->d_inode->i_generation != lgl->lgl_ogen) { - CERROR("logcookie for %*s had different generation %x != %x\n", - dchild->d_name.len, dchild->d_name.name, - dchild->d_inode->i_generation, lgl->lgl_ogen); - GOTO(out_dentry, rc = -ESTALE); - } - - /* dentry_open does a dput(dchild) and mntput(mnt) on error */ - mntget(obd->u.filter.fo_vfsmnt); - loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt, - O_RDWR); - if (IS_ERR(loghandle->lgh_file)) { - rc = PTR_ERR(loghandle->lgh_file); - CERROR("error opening logfile %*s: rc %d\n", - dchild->d_name.len, dchild->d_name.name, rc); - GOTO(out_dentry, rc); - } - memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie)); - loghandle->lgh_log_create = filter_log_create; - loghandle->lgh_log_open = filter_log_open; - loghandle->lgh_log_close = filter_log_close; - loghandle->lgh_obd = obd; - RETURN(loghandle); - -out_dentry: - f_dput(dchild); -out_handle: - llog_free_handle(loghandle); - RETURN(ERR_PTR(rc)); -} - -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static struct llog_handle *filter_log_create(struct obd_device *obd) -{ - struct filter_obd *filter = &obd->u.filter; - struct lustre_handle parent_lockh; - struct dentry *dparent, *dchild; - struct llog_handle *loghandle; - struct file *file; - int err, rc; - obd_id id; - ENTRY; - - loghandle = llog_alloc_handle(); - if (!loghandle) - RETURN(ERR_PTR(-ENOMEM)); - - retry: - id = filter_next_id(filter); - - dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh); - if (IS_ERR(dparent)) - GOTO(out_ctxt, rc = PTR_ERR(dparent)); - - dchild = filter_fid2dentry(obd, dparent, S_IFREG, id); - if (IS_ERR(dchild)) - GOTO(out_lock, rc = PTR_ERR(dchild)); - - if (dchild->d_inode != NULL) { - /* This would only happen if lastobjid was bad on disk */ - CERROR("Serious error: objid %*s already exists; is this " - "filesystem corrupt? I will try to work around it.\n", - dchild->d_name.len, dchild->d_name.name); - f_dput(dchild); - ldlm_lock_decref(&parent_lockh, LCK_PW); - goto retry; - } - - rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL); - if (rc) { - CERROR("log create failed rc = %d\n", rc); - GOTO(out_child, rc); - } - - rc = filter_update_server_data(obd, filter->fo_rcvd_filp, - filter->fo_fsd, 0); - if (rc) { - CERROR("can't write lastobjid but log created: rc %d\n",rc); - GOTO(out_destroy, rc); - } - - /* dentry_open does a dput(dchild) and mntput(mnt) on error */ - mntget(filter->fo_vfsmnt); - file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE); - if (IS_ERR(file)) { - rc = PTR_ERR(file); - CERROR("error opening log file "LPX64": rc %d\n", id, rc); - GOTO(out_destroy, rc); - } - ldlm_lock_decref(&parent_lockh, LCK_PW); - - loghandle->lgh_file = file; - loghandle->lgh_cookie.lgc_lgl.lgl_oid = id; - loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation; - loghandle->lgh_log_create = filter_log_create; - loghandle->lgh_log_open = filter_log_open; - loghandle->lgh_log_close = filter_log_close; - loghandle->lgh_obd = obd; - - RETURN(loghandle); - -out_destroy: - err = vfs_unlink(dparent->d_inode, dchild); - if (err) - CERROR("error unlinking %*s on error: rc %d\n", - dchild->d_name.len, dchild->d_name.name, err); -out_child: - f_dput(dchild); -out_lock: - ldlm_lock_decref(&parent_lockh, LCK_PW); -out_ctxt: - llog_free_handle(loghandle); - RETURN(ERR_PTR(rc)); -} - /* This is called from filter_setup() and should be single threaded */ struct llog_handle *filter_get_catalog(struct obd_device *obd) { @@ -240,20 +44,18 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd) struct filter_server_data *fsd = filter->fo_fsd; struct obd_run_ctxt saved; struct llog_handle *cathandle = NULL; + struct llog_logid logid; int rc; ENTRY; - push_ctxt(&saved, &filter->fo_ctxt, NULL); + push_ctxt(&saved, &obd->obd_ctxt, NULL); if (fsd->fsd_catalog_oid) { - struct llog_cookie catcookie; - - catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid); - catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen); - cathandle = filter_log_open(obd, &catcookie); - if (IS_ERR(cathandle)) { + logid.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid); + logid.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen); + rc = llog_create(obd, &cathandle, &logid, NULL); + if (rc) { CERROR("error opening catalog "LPX64":%x: rc %d\n", - catcookie.lgc_lgl.lgl_oid, - catcookie.lgc_lgl.lgl_ogen, + logid.lgl_oid, logid.lgl_ogen, (int)PTR_ERR(cathandle)); fsd->fsd_catalog_oid = 0; fsd->fsd_catalog_ogen = 0; @@ -261,17 +63,15 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd) } if (!fsd->fsd_catalog_oid) { - struct llog_logid *lgl; - - cathandle = filter_log_create(obd); - if (IS_ERR(cathandle)) { - CERROR("error creating new catalog: rc %d\n", - (int)PTR_ERR(cathandle)); + rc = llog_create(obd, &cathandle, NULL, NULL); + if (rc) { + CERROR("error creating new catalog: rc %d\n", rc); + cathandle = ERR_PTR(rc); GOTO(out, cathandle); } - lgl = &cathandle->lgh_cookie.lgc_lgl; - fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid); - fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen); + logid = cathandle->lgh_id; + fsd->fsd_catalog_oid = cpu_to_le64(logid.lgl_oid); + fsd->fsd_catalog_ogen = cpu_to_le32(logid.lgl_ogen); rc = filter_update_server_data(obd, filter->fo_rcvd_filp,fsd,0); if (rc) { CERROR("error writing new catalog to disk: rc %d\n",rc); @@ -279,52 +79,19 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd) } } - rc = llog_init_catalog(cathandle, &obd->u.filter.fo_mdc_uuid); + rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->u.filter.fo_mdc_uuid); if (rc) GOTO(out_handle, rc); out: - pop_ctxt(&saved, &filter->fo_ctxt, NULL); + pop_ctxt(&saved, &obd->obd_ctxt, NULL); RETURN(cathandle); out_handle: - filter_log_close(cathandle, cathandle); + llog_close(cathandle); cathandle = ERR_PTR(rc); goto out; } -void filter_put_catalog(struct llog_handle *cathandle) -{ - struct llog_handle *loghandle, *n; - int rc; - ENTRY; - - list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list) - filter_log_close(cathandle, loghandle); - - rc = filp_close(cathandle->lgh_file, 0); - if (rc) - CERROR("error closing catalog: rc %d\n", rc); - - llog_free_handle(cathandle); - EXIT; -} - -int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm, - int num_cookies, struct llog_cookie *logcookies, - int flags) -{ - struct obd_device *obd = class_conn2obd(conn); - struct obd_run_ctxt saved; - int rc; - ENTRY; - - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies, - logcookies); - pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - - RETURN(rc); -} int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid, obd_id oid, obd_count ogen, @@ -337,15 +104,15 @@ int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid, OBD_ALLOC(lcr, sizeof(*lcr)); if (lcr == NULL) RETURN(-ENOMEM); - lcr->lcr_hdr.lth_len = lcr->lcr_end_len = sizeof(*lcr); - lcr->lcr_hdr.lth_type = OST_CREATE_REC; + lcr->lcr_hdr.lrh_len = lcr->lcr_tail.lrt_len = sizeof(*lcr); + lcr->lcr_hdr.lrh_type = OST_CREATE_REC; lcr->lcr_fid.id = mds_fid->id; lcr->lcr_fid.generation = mds_fid->generation; lcr->lcr_fid.f_type = mds_fid->f_type; lcr->lcr_oid = oid; lcr->lcr_ogen = ogen; - rc = llog_add_record(cathandle, &lcr->lcr_hdr, logcookie); + rc = llog_cat_add_rec(cathandle, &lcr->lcr_hdr, logcookie, NULL); OBD_FREE(lcr, sizeof(*lcr)); if (rc > 0) { @@ -365,12 +132,12 @@ int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid, OBD_ALLOC(lor, sizeof(*lor)); if (lor == NULL) RETURN(-ENOMEM); - lor->lor_hdr.lth_len = lor->lor_end_len = sizeof(*lor); - lor->lor_hdr.lth_type = OST_ORPHAN_REC; + lor->lor_hdr.lrh_len = lor->lor_tail.lrt_len = sizeof(*lor); + lor->lor_hdr.lrh_type = OST_ORPHAN_REC; lor->lor_oid = oid; lor->lor_ogen = ogen; - rc = llog_add_record(cathandle, &lor->lor_hdr, logcookie); + rc = llog_cat_add_rec(cathandle, &lor->lor_hdr, logcookie, NULL); if (rc > 0) { LASSERT(rc == sizeof(*logcookie)); diff --git a/lustre/ptlrpc/Makefile.mk b/lustre/ptlrpc/Makefile.mk index 064cf51..f7fb9d4 100644 --- a/lustre/ptlrpc/Makefile.mk +++ b/lustre/ptlrpc/Makefile.mk @@ -8,4 +8,4 @@ include $(src)/../portals/Kernelenv obj-y += ptlrpc.o ptlrpc-objs := recover.o connection.o ptlrpc_module.o events.o service.o \ client.o niobuf.o pack_generic.o lproc_ptlrpc.o pinger.o \ - recov_thread.o ptlrpc_lib.o + recov_thread.o ptlrpc_lib.o import.o -- 1.8.3.1