Whamcloud - gitweb
merge b_devel -> b_eq: 20030909
authorericm <ericm>
Tue, 9 Sep 2003 14:37:12 +0000 (14:37 +0000)
committerericm <ericm>
Tue, 9 Sep 2003 14:37:12 +0000 (14:37 +0000)
only kernel pass sanity, liblustre still broken

lustre/include/linux/lustre_log.h
lustre/llite/llite_lib.c
lustre/lov/lov_internal.h
lustre/mdc/Makefile.mk
lustre/obdclass/recov_log.c [deleted file]
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_log.c
lustre/ptlrpc/Makefile.mk

index 2f21583..5d722b4 100644 (file)
 #ifndef _LUSTRE_LOG_H
 #define _LUSTRE_LOG_H
 
+#include <linux/obd.h>
 #include <linux/lustre_idl.h>
 
 struct obd_trans_info;
 struct obd_device;
 struct lov_stripe_md;
 
+struct plain_handle_data {
+        struct list_head   phd_entry;
+        struct llog_cookie phd_cookie; /* cookie of this log in its cat */
+        int                phd_last_idx;
+};
+
+struct cat_handle_data {
+        struct list_head        chd_head;
+        struct llog_handle     *chd_current_log; /* currently open log */
+};
+
 /* In-memory descriptor for a log object or log catalog */
 struct llog_handle {
-        struct list_head        lgh_list;
-        struct llog_cookie      lgh_cookie;
         struct semaphore        lgh_lock;
+        struct llog_logid       lgh_id;              /* id of this log */
         struct obd_device      *lgh_obd;
-        void                   *lgh_hdr;
+        struct llog_log_hdr    *lgh_hdr;
         struct file            *lgh_file;
-        struct obd_uuid        *lgh_tgtuuid;
-        struct llog_handle     *lgh_current;
-        struct llog_handle     *(*lgh_log_create)(struct obd_device *obd);
-        struct llog_handle     *(*lgh_log_open)(struct obd_device *obd,
-                                                struct llog_cookie *logcookie);
-        int                     (*lgh_log_close)(struct llog_handle *cathandle,
-                                                 struct llog_handle *loghandle);
-        int                     lgh_index;
+        int                     lgh_last_idx;
+        union {
+                struct plain_handle_data phd;
+                struct cat_handle_data   chd;
+        } u;
 };
 
-extern int llog_add_record(struct llog_handle *cathandle,
-                           struct llog_trans_hdr *rec,
-                           struct llog_cookie *logcookies);
+#define LLOG_EEMPTY 4711
+
+/* llog.c  -  general API */
+typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *);
+int llog_init_handle(struct llog_handle *handle, int flags, struct obd_uuid *uuid);
+extern void llog_free_handle(struct llog_handle *handle);
+
+
+/* llog_cat.c   -  catalog api */
+void llog_cat_put(struct llog_handle *cathandle);
+int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
+                     struct llog_cookie *reccookie, void *buf);
+
 
 extern int llog_cancel_records(struct llog_handle *cathandle, int count,
                                struct llog_cookie *cookies);
 
 extern struct llog_handle *llog_alloc_handle(void);
-extern void llog_free_handle(struct llog_handle *handle);
 extern int llog_init_catalog(struct llog_handle *cathandle,
                              struct obd_uuid *tgtuuid);
 extern int llog_delete_log(struct llog_handle *cathandle,
@@ -76,6 +93,174 @@ extern int llog_close_log(struct llog_handle *cathandle,
                           struct llog_handle *loghandle);
 extern struct llog_handle *llog_new_log(struct llog_handle *cathandle,
                                         struct obd_uuid *tgtuuid);
+struct llog_operations {
+        int (*lop_write_rec)(struct llog_handle *loghandle,
+                             struct llog_rec_hdr *rec, 
+                             struct llog_cookie *logcookies, 
+                             int numcookies, 
+                             void *,
+                             int idx);
+        int (*lop_destroy)(struct llog_handle *handle);
+        int (*lop_next_block)(struct llog_handle *h, 
+                              int curr_idx,  
+                              int next_idx, 
+                              __u64 *cur_offset, 
+                              void *buf, 
+                              int len);
+        int (*lop_create)(struct obd_device *obd, struct llog_handle **,
+                          struct llog_logid *logid, char *name);
+        int (*lop_close)(struct llog_handle *handle);
+        int (*lop_read_header)(struct llog_handle *handle);
+};
 
-#endif
+extern struct llog_operations llog_lvfs_ops;
+
+static inline int llog_obd2ops(struct obd_device *obd,
+                               struct llog_operations **lop)
+{
+        struct obd_export *exp;
+
+        if (obd == NULL)
+                 return -ENOTCONN;
+        exp = obd->obd_log_exp;
+        if (exp == NULL)
+                return -ENOTCONN;
+        if (exp->exp_obd == NULL)
+                return -ENOTCONN;
+        *lop = exp->exp_obd->obd_logops;
+        if (*lop == NULL)
+                return -EOPNOTSUPP;
+        return 0;
+}
+
+static inline int llog_handle2ops(struct llog_handle *loghandle,
+                                  struct llog_operations **lop)
+{
+        if (loghandle == NULL)
+                return -EINVAL;
+        return llog_obd2ops(loghandle->lgh_obd, lop);
+}
+
+static inline int llog_close(struct llog_handle *loghandle)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+
+        rc = llog_handle2ops(loghandle, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_close == NULL)
+                RETURN(-EOPNOTSUPP);
+        rc = lop->lop_close(loghandle);
+        RETURN(rc);
+}
 
+static inline int llog_write_rec(struct llog_handle *handle,
+                                 struct llog_rec_hdr *rec,
+                                 struct llog_cookie *logcookies,
+                                 int numcookies, void *buf, int idx)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+        
+        rc = llog_handle2ops(handle, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_write_rec == NULL)
+                RETURN(-EOPNOTSUPP);
+
+        rc = lop->lop_write_rec(handle, rec, logcookies, numcookies, buf, idx);
+        RETURN(rc);
+}
+
+static inline int llog_read_header(struct llog_handle *handle)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+        
+        rc = llog_handle2ops(handle, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_read_header == NULL)
+                RETURN(-EOPNOTSUPP);
+
+        rc = lop->lop_read_header(handle);
+        RETURN(rc);
+}
+
+static inline int llog_destroy(struct llog_handle *handle)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+        
+        rc = llog_handle2ops(handle, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_destroy == NULL)
+                RETURN(-EOPNOTSUPP);
+
+        rc = lop->lop_destroy(handle);
+        RETURN(rc);
+}
+
+#if 0
+static inline int llog_cancel(struct obd_export *exp,
+                              struct lov_stripe_md *lsm, int count,
+                              struct llog_cookie *cookies, int flags)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+
+        rc = llog_handle2ops(loghandle, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_cancel == NULL)
+                RETURN(-EOPNOTSUPP);
+
+        rc = lop->lop_cancel(exp, lsm, count, cookies, flags);
+        RETURN(rc);
+}
+#endif 
+
+static inline int llog_next_block(struct llog_handle *loghandle, int cur_idx,
+                                  int next_idx, __u64 *cur_offset, void *buf,
+                                  int len)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+
+        rc = llog_handle2ops(loghandle, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_next_block == NULL)
+                RETURN(-EOPNOTSUPP);
+
+        rc = lop->lop_next_block(loghandle, cur_idx, next_idx, cur_offset, buf,
+                                 len);
+        RETURN(rc);
+}
+
+static inline int llog_create(struct obd_device *obd, struct llog_handle **res,
+                              struct llog_logid *logid, char *name)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+
+        rc = llog_obd2ops(obd, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_create == NULL)
+                RETURN(-EOPNOTSUPP);
+
+        rc = lop->lop_create(obd, res, logid, name);
+        RETURN(rc);
+}
+
+#endif
index 8c17001..667d8c2 100644 (file)
@@ -100,11 +100,19 @@ void ll_options(char *options, char **ost, char **mds, int *flags)
         while ((this_char = strsep (&opt_ptr, ",")) != NULL) {
 #endif
                 CDEBUG(D_SUPER, "this_char %s\n", this_char);
-                if ((!*ost && (*ost = ll_read_opt("osc", this_char)))||
-                    (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
-                    (!(*flags & LL_SBI_NOLCK) &&
-                     ((*flags) = (*flags) |
-                      ll_set_opt("nolock", this_char, LL_SBI_NOLCK))))
+                if (!*ost && (*ost = ll_read_opt("osc", this_char)))
+                        continue;
+                if (!*mds && (*mds = ll_read_opt("mdc", this_char)))
+                        continue;
+                if (!(*flags & LL_SBI_NOLCK) &&
+                    ((*flags) = (*flags) |
+                                ll_set_opt("nolock", this_char,
+                                           LL_SBI_NOLCK)))
+                        continue;
+                if (!(*flags & LL_SBI_READAHEAD) &&
+                    ((*flags) = (*flags) |
+                                ll_set_opt("readahead", this_char,
+                                           LL_SBI_READAHEAD)))
                         continue;
         }
         EXIT;
@@ -113,17 +121,8 @@ void ll_options(char *options, char **ost, char **mds, int *flags)
 void ll_lli_init(struct ll_inode_info *lli)
 {
         sema_init(&lli->lli_open_sem, 1);
-        spin_lock_init(&lli->lli_read_extent_lock);
-        INIT_LIST_HEAD(&lli->lli_read_extents);
         lli->lli_flags = 0;
         lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-        spin_lock_init(&lli->lli_pg_lock);
-        INIT_LIST_HEAD(&lli->lli_lc_item);
-        plist_init(&lli->lli_pl_read);
-        plist_init(&lli->lli_pl_write);
-        atomic_set(&lli->lli_in_writepages, 0);
-#endif
 }
 
 int ll_fill_super(struct super_block *sb, void *data, int silent)
@@ -137,6 +136,8 @@ int ll_fill_super(struct super_block *sb, void *data, int silent)
         struct ll_fid rootfid;
         struct obd_statfs osfs;
         struct ptlrpc_request *request = NULL;
+        struct lustre_handle osc_conn = {0, };
+        struct lustre_handle mdc_conn = {0, };
         struct lustre_md md;
         class_uuid_t uuid;
 
@@ -179,11 +180,12 @@ int ll_fill_super(struct super_block *sb, void *data, int silent)
                         CERROR("could not register mount in /proc/lustre");
         }
 
-        err = obd_connect(&sbi->ll_mdc_conn, obd, &sbi->ll_sb_uuid);
+        err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", mdc, err);
                 GOTO(out_free, err);
         }
+        sbi->ll_mdc_exp = class_conn2export(&mdc_conn);
 
         err = obd_statfs(obd, &osfs, jiffies - HZ);
         if (err)
@@ -201,13 +203,14 @@ int ll_fill_super(struct super_block *sb, void *data, int silent)
                 GOTO(out_mdc, err);
         }
 
-        err = obd_connect(&sbi->ll_osc_conn, obd, &sbi->ll_sb_uuid);
+        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", osc, err);
                 GOTO(out_mdc, err);
         }
+        sbi->ll_osc_exp = class_conn2export(&osc_conn);
 
-        err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid);
+        err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
                 GOTO(out_osc, err);
@@ -219,30 +222,18 @@ int ll_fill_super(struct super_block *sb, void *data, int silent)
 
         /* make root inode 
          * XXX: move this to after cbd setup? */
-        err = mdc_getattr(&sbi->ll_mdc_conn, &rootfid,
+        err = mdc_getattr(sbi->ll_mdc_exp, &rootfid,
                           OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request);
         if (err) {
                 CERROR("mdc_getattr failed for root: rc = %d\n", err);
                 GOTO(out_osc, err);
         }
 
-        /* initialize committed transaction callback daemon */
-        spin_lock_init(&sbi->ll_commitcbd_lock);
-        init_waitqueue_head(&sbi->ll_commitcbd_waitq);
-        init_waitqueue_head(&sbi->ll_commitcbd_ctl_waitq);
-        sbi->ll_commitcbd_flags = 0;
-        err = ll_commitcbd_setup(sbi);
-        if (err) {
-                CERROR("failed to start commit callback daemon: rc = %d\n",err);
-                ptlrpc_req_finished (request);
-                GOTO(out_lliod, err);
-        }
-
-        err = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md);
+        err = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n",err);
                 ptlrpc_req_finished (request);
-                GOTO(out_lliod, err);
+                GOTO(out_osc, err);
         }
 
         LASSERT(sbi->ll_rootino != 0);
@@ -253,17 +244,9 @@ int ll_fill_super(struct super_block *sb, void *data, int silent)
         if (root == NULL || is_bad_inode(root)) {
                 /* XXX might need iput() for bad inode */
                 CERROR("lustre_lite: bad iget4 for root\n");
-                GOTO(out_cbd, err = -EBADF);
+                GOTO(out_osc, err = -EBADF);
         }
 
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-        /* initialize the pagecache writeback thread */
-        err = lliod_start(sbi, root);
-        if (err) {
-                CERROR("failed to start lliod: rc = %d\n",err);
-                GOTO(out_root, sb = NULL);
-        }
-#endif
         sb->s_root = d_alloc_root(root);
 
 out_dev:
@@ -274,20 +257,11 @@ out_dev:
 
         RETURN(err);
 
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-out_root:
         iput(root);
-#endif
-out_cbd:
-        ll_commitcbd_cleanup(sbi);
-out_lliod:
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-        lliod_stop(sbi);
-#endif
 out_osc:
-        obd_disconnect(&sbi->ll_osc_conn, 0);
+        obd_disconnect(sbi->ll_osc_exp, 0);
 out_mdc:
-        obd_disconnect(&sbi->ll_mdc_conn, 0);
+        obd_disconnect(sbi->ll_mdc_exp, 0);
 out_free:
         lprocfs_unregister_mountpoint(sbi);
         OBD_FREE(sbi, sizeof(*sbi));
@@ -298,18 +272,14 @@ out_free:
 void ll_put_super(struct super_block *sb)
 {
         struct ll_sb_info *sbi = ll_s2sbi(sb);
-        struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn);
+        struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp);
         struct hlist_node *tmp, *next;
         struct ll_fid rootfid;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb);
         list_del(&sbi->ll_conn_chain);
-        ll_commitcbd_cleanup(sbi);
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-        lliod_stop(sbi);
-#endif
-        obd_disconnect(&sbi->ll_osc_conn, 0);
+        obd_disconnect(sbi->ll_osc_exp, 0);
 
         /* NULL request to force sync on the MDS, and get the last_committed
          * value to flush remaining RPCs from the sending queue on client.
@@ -318,7 +288,7 @@ void ll_put_super(struct super_block *sb)
          *     which we can call for other reasons as well.
          */
         if (!obd->obd_no_recov)
-                mdc_getstatus(&sbi->ll_mdc_conn, &rootfid);
+                mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
 
         lprocfs_unregister_mountpoint(sbi);
         if (sbi->ll_proc_root) {
@@ -326,9 +296,9 @@ void ll_put_super(struct super_block *sb)
                 sbi->ll_proc_root = NULL;
         }
 
-        obd_disconnect(&sbi->ll_mdc_conn, 0);
+        obd_disconnect(sbi->ll_mdc_exp, 0);
 
-#warning Why do we need this?
+#warning We do this to get rid of orphaned dentries. That is not really trw.
         spin_lock(&dcache_lock);
         hlist_for_each_safe(tmp, next, &sbi->ll_orphan_dentry_list) {
                 struct dentry *dentry = hlist_entry(tmp, struct dentry, d_hash);
@@ -376,14 +346,15 @@ void ll_clear_inode(struct inode *inode)
                inode->i_generation, inode);
 
         ll_inode2fid(&fid, inode);
-        mdc_change_cbdata(&sbi->ll_mdc_conn, &fid, null_if_equal, inode);
+        clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(ll_i2info(inode)->lli_flags));
+        mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode);
 
         if (lli->lli_smd)
-                obd_change_cbdata(&sbi->ll_osc_conn, lli->lli_smd,
+                obd_change_cbdata(sbi->ll_osc_exp, lli->lli_smd,
                                   null_if_equal, inode);
 
         if (lli->lli_smd) {
-                obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd);
+                obd_free_memmd(sbi->ll_osc_exp, &lli->lli_smd);
                 lli->lli_smd = NULL;
         }
 
@@ -396,98 +367,6 @@ void ll_clear_inode(struct inode *inode)
         EXIT;
 }
 
-/* like inode_setattr, but doesn't mark the inode dirty */
-int ll_attr2inode(struct inode *inode, struct iattr *attr, int trunc)
-{
-        unsigned int ia_valid = attr->ia_valid;
-        int error = 0;
-
-        if ((ia_valid & ATTR_SIZE) && trunc) {
-                if (attr->ia_size > ll_file_maxbytes(inode)) {
-                        error = -EFBIG;
-                        goto out;
-                }
-                error = vmtruncate(inode, attr->ia_size);
-                if (error)
-                        goto out;
-        } else if (ia_valid & ATTR_SIZE)
-                inode->i_size = attr->ia_size;
-
-        if (ia_valid & ATTR_UID)
-                inode->i_uid = attr->ia_uid;
-        if (ia_valid & ATTR_GID)
-                inode->i_gid = attr->ia_gid;
-        if (ia_valid & ATTR_ATIME)
-                inode->i_atime = attr->ia_atime;
-        if (ia_valid & ATTR_MTIME)
-                inode->i_mtime = attr->ia_mtime;
-        if (ia_valid & ATTR_CTIME)
-                inode->i_ctime = attr->ia_ctime;
-        if (ia_valid & ATTR_MODE) {
-                inode->i_mode = attr->ia_mode;
-                if (!in_group_p(inode->i_gid) && !capable(CAP_FSETID))
-                        inode->i_mode &= ~S_ISGID;
-        }
-out:
-        return error;
-}
-
-int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc)
-{
-        struct ptlrpc_request *request = NULL;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
-        int err = 0;
-        ENTRY;
-
-        /* change incore inode */
-        err = ll_attr2inode(inode, attr, do_trunc);
-        if (err)
-                RETURN(err);
-
-        /* Don't send size changes to MDS to avoid "fast EA" problems, and
-         * also avoid a pointless RPC (we get file size from OST anyways).
-         */
-        attr->ia_valid &= ~ATTR_SIZE;
-        if (attr->ia_valid) {
-                struct mdc_op_data op_data;
-
-                ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0);
-                err = mdc_setattr(&sbi->ll_mdc_conn, &op_data,
-                                  attr, NULL, 0, NULL, 0, &request);
-                if (err)
-                        CERROR("mdc_setattr fails: err = %d\n", err);
-
-                ptlrpc_req_finished(request);
-                if (S_ISREG(inode->i_mode) && attr->ia_valid & ATTR_MTIME_SET) {
-                        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-                        struct obdo oa;
-                        int err2;
-
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-                        CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n",
-                               inode->i_ino, attr->ia_mtime);
-                        oa.o_mtime = attr->ia_mtime;
-#else
-                        CDEBUG(D_INODE, "set mtime on OST inode %lu to "
-                               LPU64"\n", inode->i_ino, 
-                               ll_ts2u64(&attr->ia_mtime));
-                        oa.o_mtime = ll_ts2u64(&attr->ia_mtime);
-#endif
-                        oa.o_id = lsm->lsm_object_id;
-                        oa.o_mode = S_IFREG;
-                        oa.o_valid = OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME;
-                        err2 = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL);
-                        if (err2) {
-                                CERROR("obd_setattr fails: rc=%d\n", err);
-                                if (!err)
-                                        err = err2;
-                        }
-                }
-        }
-
-        RETURN(err);
-}
-
 /* If this inode has objects allocated to it (lsm != NULL), then the OST
  * object(s) determine the file size and mtime.  Otherwise, the MDS will
  * keep these values until such a time that objects are allocated for it.
@@ -554,8 +433,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 struct lustre_md md;
                 ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0);
 
-                rc = mdc_setattr(&sbi->ll_mdc_conn, &op_data,
-                                  attr, NULL, 0, NULL, 0, &request);
+                rc = mdc_setattr(sbi->ll_mdc_exp, &op_data,
+                                 attr, NULL, 0, NULL, 0, &request);
 
                 if (rc) {
                         ptlrpc_req_finished(request);
@@ -564,7 +443,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                         RETURN(rc);
                 }
 
-                rc = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md);
+                rc = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md);
                 if (rc) {
                         ptlrpc_req_finished(request);
                         RETURN(rc);
@@ -600,7 +479,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
         }
 
         if (ia_valid & ATTR_SIZE) {
-                struct ldlm_extent extent = { .start = attr->ia_size,
+                struct ldlm_extent extent = { .start = 0,
                                               .end = OBD_OBJECT_EOF };
                 struct lustre_handle lockh = { 0 };
                 int err, ast_flags = 0;
@@ -622,7 +501,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 if (extent.start == 0)
                         ast_flags = LDLM_AST_DISCARD_DATA;
                 /* bug 1639: avoid write/truncate i_sem/DLM deadlock */
-                LASSERT(atomic_read(&inode->i_sem.count) == 0);
+                LASSERT(atomic_read(&inode->i_sem.count) <= 0);
                 up(&inode->i_sem);
                 rc = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW,
                                                 &extent, &lockh, ast_flags);
@@ -635,7 +514,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
 
                 rc = vmtruncate(inode, attr->ia_size);
                 if (rc == 0)
-                        set_bit(LLI_F_HAVE_SIZE_LOCK,
+                        set_bit(LLI_F_HAVE_OST_SIZE_LOCK,
                                 &ll_i2info(inode)->lli_flags);
 
                 /* unlock now as we don't mind others file lockers racing with
@@ -655,7 +534,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 oa.o_valid = OBD_MD_FLID;
                 obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-                rc = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL);
+                rc = obd_setattr(sbi->ll_osc_exp, &oa, lsm, NULL);
                 if (rc)
                         CERROR("obd_setattr fails: rc=%d\n", rc);
         }
@@ -664,13 +543,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
 
 int ll_setattr(struct dentry *de, struct iattr *attr)
 {
-        int rc = inode_change_ok(de->d_inode, attr);
-        CDEBUG(D_VFSTRACE, "VFS Op:name=%s\n", de->d_name.name);
-        if (rc)
-                return rc;
-
-        lprocfs_counter_incr(ll_i2sbi(de->d_inode)->ll_stats, LPROC_LL_SETATTR);
-        return ll_inode_setattr(de->d_inode, attr, 1);
+        LBUG(); /* code is unused, but leave this in case of VFS changes */
+        RETURN(-ENOSYS);
 }
 
 int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs,
@@ -681,7 +555,7 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs,
         int rc;
         ENTRY;
 
-        rc = obd_statfs(class_conn2obd(&sbi->ll_mdc_conn), osfs, max_age);
+        rc = obd_statfs(class_exp2obd(sbi->ll_mdc_exp), osfs, max_age);
         if (rc) {
                 CERROR("mdc_statfs fails: rc = %d\n", rc);
                 RETURN(rc);
@@ -690,7 +564,7 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs,
         CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n",
                osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files);
 
-        rc = obd_statfs(class_conn2obd(&sbi->ll_osc_conn), &obd_osfs, max_age);
+        rc = obd_statfs(class_exp2obd(sbi->ll_osc_exp), &obd_osfs, max_age);
         if (rc) {
                 CERROR("obd_statfs fails: rc = %d\n", rc);
                 RETURN(rc);
@@ -869,16 +743,6 @@ void ll_read_inode2(struct inode *inode, void *opaque)
         }
 }
 
-int it_disposition(struct lookup_intent *it, int flag)
-{
-        return it->d.lustre.it_disposition & flag;
-}
-
-void it_set_disposition(struct lookup_intent *it, int flag)
-{
-        it->d.lustre.it_disposition |= flag;
-}
-
 void ll_umount_begin(struct super_block *sb)
 {
         struct ll_sb_info *sbi = ll_s2sbi(sb);
@@ -887,27 +751,27 @@ void ll_umount_begin(struct super_block *sb)
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:\n");
 
-        obd = class_conn2obd(&sbi->ll_mdc_conn);
+        obd = class_exp2obd(sbi->ll_mdc_exp);
         if (obd == NULL) {
                 CERROR("Invalid MDC connection handle "LPX64"\n",
-                       sbi->ll_mdc_conn.cookie);
+                       sbi->ll_mdc_exp->exp_handle.h_cookie);
                 EXIT;
                 return;
         }
         obd->obd_no_recov = 1;
-        obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_mdc_conn, sizeof ioc_data,
+        obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_mdc_exp, sizeof ioc_data,
                       &ioc_data, NULL);
 
-        obd = class_conn2obd(&sbi->ll_osc_conn);
+        obd = class_exp2obd(sbi->ll_osc_exp);
         if (obd == NULL) {
                 CERROR("Invalid LOV connection handle "LPX64"\n",
-                       sbi->ll_osc_conn.cookie);
+                       sbi->ll_osc_exp->exp_handle.h_cookie);
                 EXIT;
                 return;
         }
 
         obd->obd_no_recov = 1;
-        obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_osc_conn, sizeof ioc_data,
+        obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_osc_exp, sizeof ioc_data,
                       &ioc_data, NULL);
 
         /* Really, we'd like to wait until there are no requests outstanding,
index f3bc191..f9b629e 100644 (file)
@@ -13,13 +13,13 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count);
 void lov_free_memmd(struct lov_stripe_md **lsmp);
 
 /* lov_pack.c */
-int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm,
+int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmm,
                struct lov_stripe_md *lsm);
-int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm,
+int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsm,
                  struct lov_mds_md *lmm, int lmmsize);
-int lov_setstripe(struct lustre_handle *conn,
+int lov_setstripe(struct obd_export *exp,
                   struct lov_stripe_md **lsmp, struct lov_mds_md *lmmu);
-int lov_getstripe(struct lustre_handle *conn,
+int lov_getstripe(struct obd_export *exp,
                   struct lov_stripe_md *lsm, struct lov_mds_md *lmmu);
 
 /* lproc_lov.c */
index b12e5fc..a93f1cf 100644 (file)
@@ -6,4 +6,4 @@
 include $(src)/../portals/Kernelenv
 
 obj-y += mdc.o
-mdc-objs := mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o
+mdc-objs := mdc_locks.o mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o
diff --git a/lustre/obdclass/recov_log.c b/lustre/obdclass/recov_log.c
deleted file mode 100644 (file)
index bff90f3..0000000
+++ /dev/null
@@ -1,470 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
- *   Author: Andreas Dilger <adilger@clusterfs.com>
- *
- *   This file is part of Lustre, http://www.lustre.org.
- *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
- *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
- *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- *
- * OST<->MDS recovery logging infrastructure.
- *
- * Invariants in implementation:
- * - we do not share logs among different OST<->MDS connections, so that
- *   if an OST or MDS fails it need only look at log(s) relevant to itself
- */
-
-#define DEBUG_SUBSYSTEM S_LOG
-
-#ifndef EXPORT_SYMTAB
-#define EXPORT_SYMTAB
-#endif
-
-#include <linux/fs.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_log.h>
-#include <portals/list.h>
-
-/* Allocate a new log or catalog handle */
-struct llog_handle *llog_alloc_handle(void)
-{
-        struct llog_handle *loghandle;
-        ENTRY;
-
-        OBD_ALLOC(loghandle, sizeof(*loghandle));
-        if (loghandle == NULL)
-                RETURN(ERR_PTR(-ENOMEM));
-
-        OBD_ALLOC(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
-        if (loghandle->lgh_hdr == NULL) {
-                OBD_FREE(loghandle, sizeof(*loghandle));
-                RETURN(ERR_PTR(-ENOMEM));
-        }
-
-        INIT_LIST_HEAD(&loghandle->lgh_list);
-        sema_init(&loghandle->lgh_lock, 1);
-
-        RETURN(loghandle);
-}
-EXPORT_SYMBOL(llog_alloc_handle);
-
-void llog_free_handle(struct llog_handle *loghandle)
-{
-        if (!loghandle)
-                return;
-
-        list_del_init(&loghandle->lgh_list);
-        OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
-        OBD_FREE(loghandle, sizeof(*loghandle));
-}
-EXPORT_SYMBOL(llog_free_handle);
-
-/* Create a new log handle and add it to the open list.
- * This log handle will be closed when all of the records in it are removed.
- *
- * Assumes caller has already pushed us into the kernel context and is locking.
- */
-struct llog_handle *llog_new_log(struct llog_handle *cathandle,
-                                 struct obd_uuid *tgtuuid)
-{
-        struct llog_handle *loghandle;
-        struct llog_object_hdr *llh;
-        loff_t offset;
-        int rc, index, bitmap_size, i;
-        ENTRY;
-
-        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
-
-        loghandle = cathandle->lgh_log_create(cathandle->lgh_obd);
-        if (IS_ERR(loghandle))
-                RETURN(loghandle);
-
-        llh = loghandle->lgh_hdr;
-        llh->llh_hdr.lth_type = LLOG_OBJECT_MAGIC;
-        llh->llh_hdr.lth_len = llh->llh_hdr_end_len = sizeof(*llh);
-        llh->llh_timestamp = LTIME_S(CURRENT_TIME);
-        llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
-        memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
-        loghandle->lgh_tgtuuid = &llh->llh_tgtuuid;
-
-        llh = cathandle->lgh_hdr;
-        bitmap_size = sizeof(llh->llh_bitmap) * 8;
-        /* This should basically always find the first entry free */
-        for (i = 0, index = llh->llh_count; i < bitmap_size; i++, index++) {
-                index %= bitmap_size;
-                if (ext2_set_bit(index, llh->llh_bitmap)) {
-                        /* XXX This should trigger log clean up or similar */
-                        CERROR("catalog index %d is still in use\n", index);
-                } else {
-                        llh->llh_count = (index + 1) % bitmap_size;
-                        break;
-                }
-        }
-        if (i == bitmap_size)
-                CERROR("no free catalog slots for log...\n");
-
-        CDEBUG(D_HA, "new recovery log "LPX64":%x catalog index %u\n",
-               loghandle->lgh_cookie.lgc_lgl.lgl_oid,
-               loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index);
-        loghandle->lgh_cookie.lgc_index = index;
-
-        offset = sizeof(*llh) + index * sizeof(loghandle->lgh_cookie);
-
-        /* XXX Hmm, what to do if the catalog update fails?  Under normal
-         *     operations we would clean this handle up anyways, and at
-         *     worst we leak some objects, but there is little point in
-         *     doing the logging in that case...
-         *
-         *     We don't want to mark a catalog in-use if it wasn't written.
-         *     The only danger is if the OST crashes - the log is lost.
-         */
-        rc = lustre_fwrite(cathandle->lgh_file, &loghandle->lgh_cookie,
-                           sizeof(loghandle->lgh_cookie), &offset);
-        if (rc != sizeof(loghandle->lgh_cookie)) {
-                CERROR("error adding log "LPX64" to catalog: rc %d\n",
-                       loghandle->lgh_cookie.lgc_lgl.lgl_oid, rc);
-                rc = rc < 0 ? : -ENOSPC;
-        } else {
-                offset = 0;
-                rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
-                                   &offset);
-                if (rc != sizeof(*llh)) {
-                        CERROR("error marking catalog entry %d in use: rc %d\n",
-                               index, rc);
-                        rc = rc < 0 ? : -ENOSPC;
-                }
-        }
-        cathandle->lgh_current = loghandle;
-        list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list);
-
-        RETURN(loghandle);
-}
-EXPORT_SYMBOL(llog_new_log);
-
-/* Assumes caller has already pushed us into the kernel context. */
-int llog_init_catalog(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
-{
-        struct llog_object_hdr *llh;
-        loff_t offset = 0;
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
-
-        down(&cathandle->lgh_lock);
-        llh = cathandle->lgh_hdr;
-
-        if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
-write_hdr:      llh->llh_hdr.lth_type = LLOG_CATALOG_MAGIC;
-                llh->llh_hdr.lth_len = llh->llh_hdr_end_len = LLOG_CHUNK_SIZE;
-                llh->llh_timestamp = LTIME_S(CURRENT_TIME);
-                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
-                memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
-                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
-                                   &offset);
-                if (rc != LLOG_CHUNK_SIZE) {
-                        CERROR("error writing catalog header: rc %d\n", rc);
-                        OBD_FREE(llh, sizeof(*llh));
-                        if (rc >= 0)
-                                rc = -ENOSPC;
-                } else
-                        rc = 0;
-        } else {
-                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
-                                  &offset);
-                if (rc != LLOG_CHUNK_SIZE) {
-                        CERROR("error reading catalog header: rc %d\n", rc);
-                        /* Can we do much else if the header is bad? */
-                        goto write_hdr;
-                } else
-                        rc = 0;
-        }
-
-        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
-        up(&cathandle->lgh_lock);
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_init_catalog);
-
-/* Return the currently active log handle.  If the current log handle doesn't
- * have enough space left for the current record, start a new one.
- *
- * If reclen is 0, we only want to know what the currently active log is,
- * otherwise we get a lock on this log so nobody can steal our space.
- *
- * Assumes caller has already pushed us into the kernel context and is locking.
- */
-static struct llog_handle *llog_current_log(struct llog_handle *cathandle,
-                                            int reclen)
-{
-        struct llog_handle *loghandle = NULL;
-        ENTRY;
-
-        loghandle = cathandle->lgh_current;
-        if (loghandle) {
-                struct llog_object_hdr *llh = loghandle->lgh_hdr;
-                if (llh->llh_count < sizeof(llh->llh_bitmap) * 8)
-                        RETURN(loghandle);
-        }
-
-        if (reclen)
-                loghandle = llog_new_log(cathandle, cathandle->lgh_tgtuuid);
-        RETURN(loghandle);
-}
-
-/* Add a single record to the recovery log(s).
- * Returns number of bytes in returned logcookies, or negative error code.
- *
- * Assumes caller has already pushed us into the kernel context.
- */
-int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
-                    struct llog_cookie *logcookies)
-{
-        struct llog_handle *loghandle;
-        struct llog_object_hdr *llh;
-        int reclen = rec->lth_len;
-        struct file *file;
-        loff_t offset;
-        size_t left;
-        int index;
-        int rc;
-        ENTRY;
-
-        LASSERT(rec->lth_len <= LLOG_CHUNK_SIZE);
-        down(&cathandle->lgh_lock);
-        loghandle = llog_current_log(cathandle, reclen);
-        if (IS_ERR(loghandle)) {
-                up(&cathandle->lgh_lock);
-                RETURN(PTR_ERR(loghandle));
-        }
-        down(&loghandle->lgh_lock);
-        up(&cathandle->lgh_lock);
-
-        llh = loghandle->lgh_hdr;
-        file = loghandle->lgh_file;
-
-        /* Make sure that records don't cross a chunk boundary, so we can
-         * process them page-at-a-time if needed.  If it will cross a chunk
-         * boundary, write in a fake (but referenced) entry to pad the chunk.
-         *
-         * We know that llog_current_log() will return a loghandle that is
-         * big enough to hold reclen, so all we care about is padding here.
-         */
-        left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
-        if (left != 0 && left != reclen && left < reclen + LLOG_MIN_REC_SIZE) {
-                struct llog_null_trans {
-                        struct llog_trans_hdr hdr;
-                        __u32 padding[6];
-                } pad = { .hdr = { .lth_len = left } };
-
-                LASSERT(left >= LLOG_MIN_REC_SIZE);
-                if (left <= sizeof(pad))
-                        *(__u32 *)((char *)&pad + left - sizeof(__u32)) = left;
-
-                rc = lustre_fwrite(loghandle->lgh_file, &pad,
-                                   min(sizeof(pad), left),
-                                   &loghandle->lgh_file->f_pos);
-                if (rc != min(sizeof(pad), left)) {
-                        CERROR("error writing padding record: rc %d\n", rc);
-                        GOTO(out, rc = rc < 0 ? rc : -EIO);
-                }
-
-                left -= rc;
-                if (left) {
-                        LASSERT(left >= sizeof(__u32));
-                        loghandle->lgh_file->f_pos += left - sizeof(__u32);
-                        rc = lustre_fwrite(loghandle->lgh_file, &pad,
-                                           sizeof(__u32),
-                                           &loghandle->lgh_file->f_pos);
-                        if (rc != sizeof(__u32)) {
-                                CERROR("error writing padding end: rc %d\n",
-                                       rc);
-                                GOTO(out, rc < 0 ? rc : -ENOSPC);
-                        }
-                }
-
-                loghandle->lgh_index++;
-        }
-
-        index = loghandle->lgh_index++;
-        if (ext2_set_bit(index, llh->llh_bitmap)) {
-                CERROR("argh, index %u already set in log bitmap?\n", index);
-                LBUG(); /* should never happen */
-        }
-        llh->llh_count++;
-
-        offset = 0;
-        rc = lustre_fwrite(loghandle->lgh_file, llh, sizeof(*llh), &offset);
-        if (rc != sizeof(*llh)) {
-                CERROR("error writing log header: rc %d\n", rc);
-                GOTO(out, rc < 0 ? rc : -EIO);
-        }
-
-        rc = lustre_fwrite(loghandle->lgh_file, rec, reclen,
-                           &loghandle->lgh_file->f_pos);
-        if (rc != reclen) {
-                CERROR("error writing log record: rc %d\n", rc);
-                GOTO(out, rc < 0 ? rc : -ENOSPC);
-        }
-
-        CDEBUG(D_HA, "added record "LPX64":%x+%u, %u bytes\n",
-               loghandle->lgh_cookie.lgc_lgl.lgl_oid,
-               loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index, rec->lth_len);
-        *logcookies = loghandle->lgh_cookie;
-        logcookies->lgc_index = index;
-
-        rc = 0;
-out:
-        up(&loghandle->lgh_lock);
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_add_record);
-
-/* Remove a log entry from the catalog.
- * Assumes caller has already pushed us into the kernel context and is locking.
- */
-int llog_delete_log(struct llog_handle *cathandle,struct llog_handle *loghandle)
-{
-        struct llog_cookie *lgc = &loghandle->lgh_cookie;
-        int catindex = lgc->lgc_index;
-        struct llog_object_hdr *llh = cathandle->lgh_hdr;
-        loff_t offset = 0;
-        int rc = 0;
-        ENTRY;
-
-        CDEBUG(D_HA, "log "LPX64":%x empty, closing\n",
-               lgc->lgc_lgl.lgl_oid, lgc->lgc_lgl.lgl_ogen);
-
-        if (ext2_clear_bit(catindex, llh->llh_bitmap)) {
-                CERROR("catalog index %u already clear?\n", catindex);
-                LBUG();
-        } else {
-                rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
-                                   &offset);
-
-                if (rc != sizeof(*llh)) {
-                        CERROR("log %u cancel error: rc %d\n", catindex, rc);
-                        if (rc >= 0)
-                                rc = -EIO;
-                } else
-                        rc = 0;
-        }
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_delete_log);
-
-/* Assumes caller has already pushed us into the kernel context and is locking.
- * We return a lock on the handle to ensure nobody yanks it from us.
- */
-static struct llog_handle *llog_id2handle(struct llog_handle *cathandle,
-                                          struct llog_cookie *logcookie)
-{
-        struct llog_handle *loghandle;
-        struct llog_logid *lgl = &logcookie->lgc_lgl;
-        ENTRY;
-
-        if (cathandle == NULL)
-                RETURN(ERR_PTR(-EBADF));
-
-        list_for_each_entry(loghandle, &cathandle->lgh_list, lgh_list) {
-                struct llog_logid *cgl = &loghandle->lgh_cookie.lgc_lgl;
-                if (cgl->lgl_oid == lgl->lgl_oid) {
-                        if (cgl->lgl_ogen != lgl->lgl_ogen) {
-                                CERROR("log "LPX64" generation %x != %x\n",
-                                       lgl->lgl_oid, cgl->lgl_ogen,
-                                       lgl->lgl_ogen);
-                                continue;
-                        }
-                        GOTO(out, loghandle);
-                }
-        }
-
-        loghandle = cathandle->lgh_log_open(cathandle->lgh_obd, logcookie);
-        if (IS_ERR(loghandle)) {
-                CERROR("error opening log id "LPX64":%x: rc %d\n",
-                       lgl->lgl_oid, lgl->lgl_ogen, (int)PTR_ERR(loghandle));
-        } else {
-                list_add(&loghandle->lgh_list, &cathandle->lgh_list);
-        }
-
-out:
-        RETURN(loghandle);
-}
-
-/* For each cookie in the cookie array, we clear the log in-use bit and either:
- * - the log is empty, so mark it free in the catalog header and delete it
- * - the log is not empty, just write out the log header
- *
- * The cookies may be in different log files, so we need to get new logs
- * each time.
- *
- * Assumes caller has already pushed us into the kernel context.
- */
-int llog_cancel_records(struct llog_handle *cathandle, int count,
-                        struct llog_cookie *cookies)
-{
-        int i, rc = 0;
-        ENTRY;
-
-        down(&cathandle->lgh_lock);
-        for (i = 0; i < count; i++, cookies++) {
-                struct llog_handle *loghandle;
-                struct llog_object_hdr *llh;
-                struct llog_logid *lgl = &cookies->lgc_lgl;
-
-                loghandle = llog_id2handle(cathandle, cookies);
-                if (IS_ERR(loghandle)) {
-                        if (!rc)
-                                rc = PTR_ERR(loghandle);
-                        continue;
-                }
-
-                down(&loghandle->lgh_lock);
-                llh = loghandle->lgh_hdr;
-                CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n",
-                       lgl->lgl_oid, cookies->lgc_index,
-                       ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
-                if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) {
-                        CERROR("log index %u in "LPX64":%x already clear?\n",
-                               cookies->lgc_index, lgl->lgl_oid, lgl->lgl_ogen);
-                } else if (--llh->llh_count == 0 &&
-                           loghandle != llog_current_log(cathandle, 0)) {
-                        loghandle->lgh_log_close(cathandle, loghandle);
-                } else {
-                        loff_t offset = 0;
-                        int ret = lustre_fwrite(loghandle->lgh_file, llh,
-                                                sizeof(*llh), &offset);
-
-                        if (ret != sizeof(*llh)) {
-                                CERROR("error cancelling index %u: rc %d\n",
-                                       cookies->lgc_index, ret);
-                                /* XXX mark handle bad? */
-                                if (!rc)
-                                        rc = ret;
-                        }
-                }
-                up(&loghandle->lgh_lock);
-        }
-        up(&cathandle->lgh_lock);
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_cancel_records);
-
-int llog_close_log(struct llog_handle *cathandle, struct llog_handle *loghandle)
-{
-        return loghandle->lgh_log_close(cathandle, loghandle);
-}
-EXPORT_SYMBOL(llog_close_log);
index 9f0b5ed..81f2c66 100644 (file)
@@ -16,6 +16,8 @@
 #include <linux/lustre_handles.h>
 #include <linux/obd.h>
 
+#define FILTER_LAYOUT_VERSION "2"
+
 #ifndef OBD_FILTER_DEVICENAME
 # define OBD_FILTER_DEVICENAME "obdfilter"
 #endif
@@ -25,7 +27,7 @@
 #endif
 
 #define LAST_RCVD "last_rcvd"
-#define FILTER_INIT_OBJID 2
+#define FILTER_INIT_OBJID 0
 
 #define FILTER_LR_SERVER_SIZE    512
 
@@ -37,6 +39,7 @@
 #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
 
 #define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
+#define FILTER_GROUPS 2 /* must be at least 2; not dynamic yet */
 
 #define FILTER_MOUNT_RECOV 2
 #define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
@@ -45,7 +48,7 @@
 struct filter_server_data {
         __u8  fsd_uuid[37];        /* server UUID */
         __u8  fsd_uuid_padding[3]; /* unused */
-        __u64 fsd_last_objid;      /* last created object ID */
+        __u64 fsd_unused;
         __u64 fsd_last_transno;    /* last completed transaction ID */
         __u64 fsd_mount_count;     /* FILTER incarnation number */
         __u32 fsd_feature_compat;  /* compatible feature flags */
@@ -72,22 +75,6 @@ struct filter_client_data {
         __u8  fcd_padding[FILTER_LR_CLIENT_SIZE - 64];
 };
 
-/* file data for open files on OST */
-struct filter_file_data {
-        struct portals_handle ffd_handle;
-        atomic_t              ffd_refcount;
-        struct list_head      ffd_export_list; /* export open list - fed_lock */
-        struct file          *ffd_file;         /* file handle */
-};
-
-struct filter_dentry_data {
-        struct llog_cookie      fdd_cookie;
-        obd_id                  fdd_objid;
-        __u32                   fdd_magic;
-        atomic_t                fdd_open_count;
-        int                     fdd_flags;
-};
-
 #define FILTER_DENTRY_MAGIC 0x9efba101
 #define FILTER_FLAG_DESTROY 0x0001      /* destroy dentry on last file close */
 
@@ -103,21 +90,21 @@ enum {
 };
 
 /* filter.c */
-struct dentry *filter_parent(struct obd_device *, obd_mode mode, obd_id objid);
-struct dentry *filter_parent_lock(struct obd_device *, obd_mode mode,
-                                  obd_id objid, ldlm_mode_t lock_mode,
-                                  struct lustre_handle *lockh);
+struct dentry *filter_parent(struct obd_device *, obd_gr group, obd_id objid);
+struct dentry *filter_parent_lock(struct obd_device *, obd_gr, obd_id,
+                                  ldlm_mode_t, struct lustre_handle *);
 void f_dput(struct dentry *);
 struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir,
-                                 obd_mode mode, obd_id id);
+                                 obd_gr group, obd_id id);
 struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
                                   const char *what);
 #define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__)
 
 int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc);
-__u64 filter_next_id(struct filter_obd *);
+__u64 filter_next_id(struct filter_obd *, struct obdo *);
 int filter_update_server_data(struct obd_device *, struct file *,
                               struct filter_server_data *, int force_sync);
+int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
 int filter_common_setup(struct obd_device *, obd_count len, void *buf,
                         char *option);
 
@@ -128,23 +115,29 @@ int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
 int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount,
                     struct obd_ioobj *, int niocount, struct niobuf_local *,
                     struct obd_trans_info *);
-int filter_brw(int cmd, struct lustre_handle *, struct obdo *,
+int filter_brw(int cmd, struct obd_export *, struct obdo *,
               struct lov_stripe_md *, obd_count oa_bufs, struct brw_page *,
               struct obd_trans_info *);
+void flip_into_page_cache(struct inode *inode, struct page *new_page);
+
+/* filter_io_*.c */
+int filter_commitrw_write(struct obd_export *exp, int objcount,
+                          struct obd_ioobj *obj, int niocount,
+                          struct niobuf_local *res,
+                          struct obd_trans_info *oti);
 
 /* filter_log.c */
-int filter_log_cancel(struct lustre_handle *, struct lov_stripe_md *,
-                      int num_cookies, struct llog_cookie *, int flags);
 int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
                          obd_id oid, obd_count ogen, struct llog_cookie *);
 int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid,
                          obd_count ogen, struct llog_cookie *);
 struct llog_handle *filter_get_catalog(struct obd_device *);
-void filter_put_catalog(struct llog_handle *cathandle);
+
 
 /* filter_san.c */
 int filter_san_setup(struct obd_device *obd, obd_count len, void *buf);
 int filter_san_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
                       struct obd_ioobj *, int niocount, struct niobuf_remote *);
 
+
 #endif
index 971cf1d..a8d77c5 100644 (file)
@@ -43,8 +43,8 @@ static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
         int rc;
 
         page = grab_cache_page(mapping, index); /* locked page */
-        if (IS_ERR(page))
-                return lnb->rc = PTR_ERR(page);
+        if (page == NULL)
+                return lnb->rc = -ENOMEM;
 
         LASSERT(page->mapping == mapping);
 
@@ -99,138 +99,6 @@ err_page:
         return lnb->rc;
 }
 
-static struct page *lustre_get_page_write(struct inode *inode,
-                                          unsigned long index)
-{
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        int rc;
-
-        page = grab_cache_page(mapping, index); /* locked page */
-
-        if (!IS_ERR(page)) {
-                /* Note: Called with "O" and "PAGE_SIZE" this is essentially
-                 * a no-op for most filesystems, because we write the whole
-                 * page.  For partial-page I/O this will read in the page.
-                 */
-                rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
-                if (rc) {
-                        CERROR("page index %lu, rc = %d\n", index, rc);
-                        if (rc != -ENOSPC)
-                                LBUG();
-                        GOTO(err_unlock, rc);
-                }
-                /* XXX not sure if we need this if we are overwriting page */
-                if (PageError(page)) {
-                        CERROR("error on page index %lu, rc = %d\n", index, rc);
-                        LBUG();
-                        GOTO(err_unlock, rc = -EIO);
-                }
-        }
-        return page;
-
-err_unlock:
-        unlock_page(page);
-        page_cache_release(page);
-        return ERR_PTR(rc);
-}
-
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-int wait_on_page_locked(struct page *page)
-{
-        waitfor_one_page(page);
-        return 0;
-}
-
-/* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data. */
-static inline void inode_update_time(struct inode *inode, int ctime_too)
-{
-        time_t now = CURRENT_TIME;
-        if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
-                return;
-        inode->i_mtime = now;
-        if (ctime_too)
-                inode->i_ctime = now;
-        mark_inode_dirty_sync(inode);
-}
-#endif
-
-static int lustre_commit_write(struct niobuf_local *lnb)
-{
-        struct page *page = lnb->page;
-        unsigned from = lnb->offset & ~PAGE_MASK;
-        unsigned to = from + lnb->len;
-        struct inode *inode = page->mapping->host;
-        int err;
-
-        LASSERT(to <= PAGE_SIZE);
-        err = page->mapping->a_ops->commit_write(NULL, page, from, to);
-#warning 2.4 folks: wait_on_page_locked does NOT return its error here.
-        if (!err && IS_SYNC(inode))
-                wait_on_page_locked(page);
-        //SetPageUptodate(page); // the client commit_write will do this
-
-        SetPageReferenced(page);
-        unlock_page(page);
-        page_cache_release(page);
-        return err;
-}
-
-int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
-                          int *pglocked)
-{
-        unsigned long index = lnb->offset >> PAGE_SHIFT;
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        int rc;
-
-        //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
-        if (*pglocked)
-                page = grab_cache_page_nowait(mapping, index); /* locked page */
-        else
-                page = grab_cache_page(mapping, index); /* locked page */
-
-
-        /* This page is currently locked, so get a temporary page instead. */
-        if (page == NULL) {
-                CDEBUG(D_INFO, "ino %lu page %ld locked\n", inode->i_ino,index);
-                page = alloc_pages(GFP_KERNEL, 0); /* locked page */
-                if (page == NULL) {
-                        CERROR("no memory for a temp page\n");
-                        GOTO(err, rc = -ENOMEM);
-                }
-                page->index = index;
-                lnb->page = page;
-                lnb->flags |= N_LOCAL_TEMP_PAGE;
-        } else if (!IS_ERR(page)) {
-                unsigned from = lnb->offset & ~PAGE_MASK, to = from + lnb->len;
-                (*pglocked)++;
-
-                rc = mapping->a_ops->prepare_write(NULL, page, from, to);
-                if (rc) {
-                        if (rc != -ENOSPC)
-                                CERROR("page index %lu, rc = %d\n", index, rc);
-                        GOTO(err_unlock, rc);
-                }
-                /* XXX not sure if we need this if we are overwriting page */
-                if (PageError(page)) {
-                        CERROR("error on page index %lu, rc = %d\n", index, rc);
-                        LBUG();
-                        GOTO(err_unlock, rc = -EIO);
-                }
-                lnb->page = page;
-        }
-
-        return 0;
-
-err_unlock:
-        unlock_page(page);
-        page_cache_release(page);
-err:
-        return lnb->rc = rc;
-}
-
 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                               int objcount, struct obd_ioobj *obj,
                               int niocount, struct niobuf_remote *nb,
@@ -240,7 +108,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         struct obd_run_ctxt saved;
         struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
+        struct niobuf_local *lnb = NULL;
         struct fsfilt_objinfo *fso;
         struct dentry *dentry;
         struct inode *inode;
@@ -258,9 +126,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
         memset(res, 0, niocount * sizeof(*res));
 
-        push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
         for (i = 0, o = obj; i < objcount; i++, o++) {
-                struct filter_dentry_data *fdd;
                 LASSERT(o->ioo_bufcnt);
 
                 dentry = filter_oa2dentry(exp->exp_obd, oa);
@@ -276,15 +143,13 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
                 fso[i].fso_dentry = dentry;
                 fso[i].fso_bufcnt = o->ioo_bufcnt;
-
-                fdd = dentry->d_fsdata;
-                if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
-                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
-                               o->ioo_id);
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+                CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
+                       (jiffies - now));
 
         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
                 dentry = fso[i].fso_dentry;
@@ -325,7 +190,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
+                       (jiffies - now));
 
         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES,
                             tot_bytes);
@@ -340,7 +208,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
+                       (jiffies - now));
 
         EXIT;
 
@@ -355,49 +226,25 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                         f_dput(res->dentry);
                 else
                         CERROR("NULL dentry in cleanup -- tell CFS\n");
-                res->dentry = NULL;
         case 0:
                 OBD_FREE(fso, objcount * sizeof(*fso));
-                pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+                pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
         }
         return rc;
 }
 
-/* We need to balance prepare_write() calls with commit_write() calls.
- * If the page has been prepared, but we have no data for it, we don't
- * want to overwrite valid data on disk, but we still need to zero out
- * data for space which was newly allocated.  Like part of what happens
- * in __block_prepare_write() for newly allocated blocks.
- *
- * XXX currently __block_prepare_write() creates buffers for all the
- *     pages, and the filesystems mark these buffers as BH_New if they
- *     were newly allocated from disk. We use the BH_New flag similarly. */
-static int filter_commit_write(struct niobuf_local *lnb, int err)
+static int filter_start_page_write(struct inode *inode,
+                                   struct niobuf_local *lnb)
 {
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        if (err) {
-                unsigned block_start, block_end;
-                struct buffer_head *bh, *head = lnb->page->buffers;
-                unsigned blocksize = head->b_size;
-
-                /* debugging: just seeing if this ever happens */
-                CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
-                       "called for ino %lu:%lu on err %d\n",
-                       lnb->page->mapping->host->i_ino, lnb->page->index, err);
-
-                /* Currently one buffer per page, but in the future... */
-                for (bh = head, block_start = 0; bh != head || !block_start;
-                     block_start = block_end, bh = bh->b_this_page) {
-                        block_end = block_start + blocksize;
-                        if (buffer_new(bh)) {
-                                memset(kmap(lnb->page) + block_start, 0,
-                                       blocksize);
-                                kunmap(lnb->page);
-                        }
-                }
+        struct page *page = alloc_pages(GFP_HIGHUSER, 0);
+        if (page == NULL) {
+                CERROR("no memory for a temp page\n");
+                RETURN(lnb->rc = -ENOMEM);
         }
-#endif
-        return lustre_commit_write(lnb);
+        page->index = lnb->offset >> PAGE_SHIFT;
+        lnb->page = page;
+
+        return 0;
 }
 
 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
@@ -417,124 +264,72 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                struct obd_trans_info *oti)
 {
         struct obd_run_ctxt saved;
-        struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
-        struct fsfilt_objinfo *fso;
+        struct niobuf_local *lnb = NULL;
+        struct fsfilt_objinfo fso;
         struct dentry *dentry;
-        int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
+        int rc = 0, i, tot_bytes = 0;
         unsigned long now = jiffies;
         ENTRY;
         LASSERT(objcount == 1);
-
-        OBD_ALLOC(fso, objcount * sizeof(*fso));
-        if (fso == NULL)
-                RETURN(-ENOMEM);
+        LASSERT(obj->ioo_bufcnt > 0);
 
         memset(res, 0, niocount * sizeof(*res));
 
-        push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
-        for (i = 0, o = obj; i < objcount; i++, o++) {
-                struct filter_dentry_data *fdd;
-                LASSERT(o->ioo_bufcnt);
-
-                dentry = filter_oa2dentry(exp->exp_obd, oa);
-                if (IS_ERR(dentry))
-                        GOTO(out_objinfo, rc = PTR_ERR(dentry));
-
-                if (dentry->d_inode == NULL) {
-                        CERROR("trying to BRW to non-existent file "LPU64"\n",
-                               o->ioo_id);
-                        f_dput(dentry);
-                        GOTO(out_objinfo, rc = -ENOENT);
-                }
-
-                fso[i].fso_dentry = dentry;
-                fso[i].fso_bufcnt = o->ioo_bufcnt;
-
-                down(&dentry->d_inode->i_sem);
-                fdd = dentry->d_fsdata;
-                if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
-                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
-                               o->ioo_id);
-        }
+        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+        dentry = filter_fid2dentry(exp->exp_obd, NULL, 0, obj->ioo_id);
+        if (IS_ERR(dentry))
+                GOTO(cleanup, rc = PTR_ERR(dentry));
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
-
-        LASSERT(oti != NULL);
-        oti->oti_handle = fsfilt_brw_start(exp->exp_obd, objcount, fso,
-                                           niocount, oti);
-        if (IS_ERR(oti->oti_handle)) {
-                rc = PTR_ERR(oti->oti_handle);
-                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                       "error starting transaction: rc = %d\n", rc);
-                oti->oti_handle = NULL;
-                GOTO(out_objinfo, rc);
+        if (dentry->d_inode == NULL) {
+                CERROR("trying to BRW to non-existent file "LPU64"\n",
+                       obj->ioo_id);
+                f_dput(dentry);
+                GOTO(cleanup, rc = -ENOENT);
         }
 
-        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
-                dentry = fso[i].fso_dentry;
-                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
-                        if (j == 0)
-                                lnb->dentry = dentry;
-                        else
-                                lnb->dentry = dget(dentry);
-
-                        lnb->offset = rnb->offset;
-                        lnb->len    = rnb->len;
-                        lnb->flags  = rnb->flags;
-                        lnb->start  = jiffies;
-
-                        rc = filter_get_page_write(dentry->d_inode, lnb,
-                                                   &pglocked);
-                        if (rc)
-                                up(&dentry->d_inode->i_sem);
+        fso.fso_dentry = dentry;
+        fso.fso_bufcnt = obj->ioo_bufcnt;
 
-                        if (rc) {
-                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
-                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
-                                       dentry, rc);
-                                f_dput(dentry);
-                                GOTO(out_pages, rc);
-                        }
-                        tot_bytes += lnb->len;
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
+                       (jiffies - now));
+
+        for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
+             i++, lnb++, rnb++) {
+                lnb->dentry = dentry;
+                lnb->offset = rnb->offset;
+                lnb->len    = rnb->len;
+                lnb->flags  = rnb->flags;
+                lnb->start  = jiffies;
+
+                rc = filter_start_page_write(dentry->d_inode, lnb);
+                if (rc) {
+                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
+                               LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
+                               i, obj->ioo_bufcnt, dentry, rc);
+                        while (lnb-- > res)
+                                __free_pages(lnb->page, 0);
+                        f_dput(dentry);
+                        GOTO(cleanup, rc);
                 }
+                tot_bytes += lnb->len;
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
+                       (jiffies - now));
 
         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                             tot_bytes);
-
         EXIT;
-out:
-        OBD_FREE(fso, objcount * sizeof(*fso));
-        /* we saved the journal handle into oti->oti_handle instead */
-        current->journal_info = NULL;
-        pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+cleanup:
+        pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
         return rc;
-
-out_pages:
-        while (lnb-- > res) {
-                filter_commit_write(lnb, rc);
-                up(&lnb->dentry->d_inode->i_sem);
-                f_dput(lnb->dentry);
-        }
-        filter_finish_transno(exp, oti, rc);
-        fsfilt_commit(exp->exp_obd,
-                      filter_parent(exp->exp_obd,S_IFREG,obj->ioo_id)->d_inode,
-                      oti->oti_handle, 0);
-        goto out; /* dropped the dentry refs already (one per page) */
-
-out_objinfo:
-        for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
-                up(&fso[i].fso_dentry->d_inode->i_sem);
-                f_dput(fso[i].fso_dentry);
-        }
-        goto out;
 }
 
 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -551,55 +346,9 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
                                           niocount, nb, res, oti);
 
         LBUG();
-
         return -EPROTO;
 }
 
-/* It is highly unlikely that we would ever get an error here.  The page we want
- * to get was previously locked, so it had to have already allocated the space,
- * and we were just writing over the same data, so there would be no hole in the
- * file.
- *
- * XXX: possibility of a race with truncate could exist, need to check that.
- *      There are no guarantees w.r.t. write order even on a local filesystem,
- *      although the normal response would be to return the number of bytes
- *      successfully written and leave the rest to the app. */
-static int filter_write_locked_page(struct niobuf_local *lnb)
-{
-        struct page *lpage;
-        void *lpage_addr, *lnb_addr;
-        int rc;
-        ENTRY;
-
-        lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
-        if (IS_ERR(lpage)) {
-                rc = PTR_ERR(lpage);
-                CERROR("error getting locked page index %ld: rc = %d\n",
-                       lnb->page->index, rc);
-                LBUG();
-                lustre_commit_write(lnb);
-                RETURN(rc);
-        }
-
-        /* 2 kmaps == vanishingly small deadlock opportunity */
-        lpage_addr = kmap(lpage);
-        lnb_addr = kmap(lnb->page);
-
-        memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
-
-        kunmap(lnb->page);
-        kunmap(lpage);
-
-        page_cache_release(lnb->page);
-
-        lnb->page = lpage;
-        rc = lustre_commit_write(lnb);
-        if (rc)
-                CERROR("error committing locked page %ld: rc = %d\n",
-                       lnb->page->index, rc);
-        RETURN(rc);
-}
-
 static int filter_commitrw_read(struct obd_export *exp, int objcount,
                                 struct obd_ioobj *obj, int niocount,
                                 struct niobuf_local *res,
@@ -621,144 +370,50 @@ static int filter_commitrw_read(struct obd_export *exp, int objcount,
         RETURN(0);
 }
 
-static int
-filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa,
-                      int objcount, struct obd_ioobj *obj, int niocount,
-                      struct niobuf_local *res, struct obd_trans_info *oti)
+void flip_into_page_cache(struct inode *inode, struct page *new_page)
 {
-        struct obd_run_ctxt saved;
-        struct obd_ioobj *o;
-        struct niobuf_local *lnb;
-        struct obd_device *obd = exp->exp_obd;
-        int found_locked = 0, rc = 0, i;
-        int nested_trans = current->journal_info != NULL;
-        unsigned long now = jiffies;  /* DEBUGGING OST TIMEOUTS */
+        struct page *old_page;
+        int rc;
         ENTRY;
 
-        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-
-        if (cmd & OBD_BRW_WRITE) {
-                LASSERT(oti);
-                LASSERT(current->journal_info == NULL ||
-                        current->journal_info == oti->oti_handle);
-                current->journal_info = oti->oti_handle;
-        }
-
-        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
-                struct inode *inode;
-                int j;
-
-                /* If all of the page reads were beyond EOF, let's pretend
-                 * this read didn't really happen at all. */
-                if (lnb->dentry == NULL) {
-                        oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM);
-                        continue;
-                }
-
-                inode = igrab(lnb->dentry->d_inode);
-
-                if (cmd & OBD_BRW_WRITE) {
-                        /* FIXME: MULTI OBJECT BRW */
-                        if (oa && oa->o_valid & (OBD_MD_FLMTIME|OBD_MD_FLCTIME))
-                                obdo_refresh_inode(inode, oa, OBD_MD_FLATIME |
-                                                   OBD_MD_FLMTIME |
-                                                   OBD_MD_FLCTIME);
-                        else
-                                inode_update_time(lnb->dentry->d_inode, 1);
-                } else if (oa && oa->o_valid & OBD_MD_FLATIME) {
-                        /* Note that we don't necessarily write this to disk */
-                        obdo_refresh_inode(inode, oa, OBD_MD_FLATIME);
-                }
-
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        if (lnb->page == NULL) {
-                                continue;
-                        }
-
-                        if (lnb->flags & N_LOCAL_TEMP_PAGE) {
-                                found_locked++;
-                                continue;
-                        }
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commitrw %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-
-                        if (cmd & OBD_BRW_WRITE) {
-                                int err = filter_commit_write(lnb, 0);
-
-                                if (!rc)
-                                        rc = err;
-                        } else {
-                                page_cache_release(lnb->page);
-                        }
-
-                        f_dput(lnb->dentry);
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commit_write %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
+        do {
+                /* the dlm is protecting us from read/write concurrency, so we
+                 * expect this find_lock_page to return quickly.  even if we
+                 * race with another writer it won't be doing much work with
+                 * the page locked.  we do this 'cause t_c_p expects a 
+                 * locked page, and it wants to grab the pagecache lock
+                 * as well. */
+                old_page = find_lock_page(inode->i_mapping, new_page->index);
+                if (old_page) {
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+                        truncate_complete_page(old_page);
+#else
+                        truncate_complete_page(old_page->mapping, old_page);
+#endif
+                        unlock_page(old_page);
+                        page_cache_release(old_page);
                 }
 
-                /* FIXME: MULTI OBJECT BRW */
-                if (oa) {
-                        oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM);
-                        obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
+#if 0 /* this should be a /proc tunable someday */
+                /* racing o_directs (no locking ioctl) could race adding
+                 * their pages, so we repeat the page invalidation unless
+                 * we successfully added our new page */
+                rc = add_to_page_cache_unique(new_page, inode->i_mapping, 
+                                              new_page->index,
+                                              page_hash(inode->i_mapping, 
+                                                        new_page->index));
+                if (rc == 0) {
+                        /* add_to_page_cache clears uptodate|dirty and locks
+                         * the page */
+                        SetPageUptodate(new_page);
+                        unlock_page(new_page);
                 }
+#else   
+                rc = 0;
+#endif
+        } while (rc != 0);
 
-                if (cmd & OBD_BRW_WRITE)
-                        up(&inode->i_sem);
-
-                iput(inode);
-        }
-
-        for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
-             i++, o++) {
-                int j;
-
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        int err;
-                        if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
-                                continue;
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commitrw locked %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-
-                        err = filter_write_locked_page(lnb);
-                        if (!rc)
-                                rc = err;
-                        f_dput(lnb->dentry);
-                        found_locked--;
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commit_write locked %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-                }
-        }
-
-        if (cmd & OBD_BRW_WRITE) {
-                /* We just want any dentry for the commit, for now */
-                struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
-                int err;
-
-                rc = filter_finish_transno(exp, oti, rc);
-                err = fsfilt_commit(obd, dparent->d_inode, oti->oti_handle,
-                                    obd_sync_filter);
-                if (err)
-                        rc = err;
-                if (obd_sync_filter)
-                        LASSERT(oti->oti_transno <= obd->obd_last_committed);
-                if (time_after(jiffies, now + 15 * HZ))
-                        CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
-        }
-
-        LASSERT(nested_trans || current->journal_info == NULL);
-        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-        RETURN(rc);
+        EXIT;
 }
 
 /* XXX needs to trickle its oa down */
@@ -767,8 +422,8 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                     struct niobuf_local *res, struct obd_trans_info *oti)
 {
         if (cmd == OBD_BRW_WRITE)
-                return filter_commitrw_write(cmd, exp, oa, objcount, obj,
-                                             niocount, res, oti);
+                return filter_commitrw_write(exp, objcount, obj, niocount,
+                                             res, oti);
         if (cmd == OBD_BRW_READ)
                 return filter_commitrw_read(exp, objcount, obj, niocount,
                                             res, oti);
@@ -776,11 +431,10 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         return -EPROTO;
 }
 
-int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
+int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md *lsm, obd_count oa_bufs,
                struct brw_page *pga, struct obd_trans_info *oti)
 {
-        struct obd_export *exp;
         struct obd_ioobj ioo;
         struct niobuf_local *lnb;
         struct niobuf_remote *rnb;
@@ -788,12 +442,6 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
         int ret = 0;
         ENTRY;
 
-        exp = class_conn2export(conn);
-        if (exp == NULL) {
-                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
-                RETURN(-EINVAL);
-        }
-
         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
 
@@ -826,8 +474,8 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
                 else
                         memcpy(virt + off, addr + off, pga[i].count);
 
-                kunmap(addr);
-                kunmap(virt);
+                kunmap(lnb[i].page);
+                kunmap(pga[i].pg);
         }
 
         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
@@ -837,6 +485,5 @@ out:
                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
         if (rnb)
                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
-        class_export_put(exp);
         RETURN(ret);
 }
index 77eb078..de21a1f 100644 (file)
 
 #include "filter_internal.h"
 
-static struct llog_handle *filter_log_create(struct obd_device *obd);
-
-/* This is a callback from the llog_* functions.
- * Assumes caller has already pushed us into the kernel context. */
-static int filter_log_close(struct llog_handle *cathandle,
-                            struct llog_handle *loghandle)
-{
-        struct llog_object_hdr *llh = loghandle->lgh_hdr;
-        struct file *file = loghandle->lgh_file;
-        struct dentry *dparent = NULL, *dchild = NULL;
-        struct lustre_handle parent_lockh;
-        struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl;
-        int rc;
-        ENTRY;
-
-        /* If we are going to delete this log, grab a ref before we close
-         * it so we don't have to immediately do another lookup. */
-        if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
-                CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
-                       lgl->lgl_oid, lgl->lgl_ogen);
-                dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG,
-                                             lgl->lgl_oid,LCK_PW,&parent_lockh);
-                if (IS_ERR(dparent)) {
-                        rc = PTR_ERR(dparent);
-                        CERROR("error locking parent, orphan log %*s: rc %d\n",
-                               file->f_dentry->d_name.len,
-                               file->f_dentry->d_name.name, rc);
-                        RETURN(rc);
-                } else {
-                        dchild = dget(file->f_dentry);
-                        llog_delete_log(cathandle, loghandle);
-                }
-        } else {
-                CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
-                       lgl->lgl_oid, lgl->lgl_ogen);
-        }
-
-        rc = filp_close(file, 0);
-
-        llog_free_handle(loghandle); /* also removes loghandle from list */
-
-        if (dchild != NULL) {
-                int err = vfs_unlink(dparent->d_inode, dchild);
-                if (err) {
-                        CERROR("error unlinking empty log %*s: rc %d\n",
-                               dchild->d_name.len, dchild->d_name.name, err);
-                        if (!rc)
-                                rc = err;
-                }
-                f_dput(dchild);
-                ldlm_lock_decref(&parent_lockh, LCK_PW);
-        }
-        RETURN(rc);
-}
-
-/* This is a callback from the llog_* functions.
- * Assumes caller has already pushed us into the kernel context. */
-static struct llog_handle *filter_log_open(struct obd_device *obd,
-                                           struct llog_cookie *logcookie)
-{
-        struct llog_logid *lgl = &logcookie->lgc_lgl;
-        struct llog_handle *loghandle;
-        struct dentry *dchild;
-        int rc;
-        ENTRY;
-
-        loghandle = llog_alloc_handle();
-        if (!loghandle)
-                RETURN(ERR_PTR(-ENOMEM));
-
-        dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid);
-        if (IS_ERR(dchild))
-                GOTO(out_handle, rc = PTR_ERR(dchild));
-
-        if (dchild->d_inode == NULL) {
-                CERROR("logcookie references non-existent object %*s\n",
-                       dchild->d_name.len, dchild->d_name.name);
-                GOTO(out_dentry, rc = -ENOENT);
-        }
-
-        if (dchild->d_inode->i_generation != lgl->lgl_ogen) {
-                CERROR("logcookie for %*s had different generation %x != %x\n",
-                       dchild->d_name.len, dchild->d_name.name,
-                       dchild->d_inode->i_generation, lgl->lgl_ogen);
-                GOTO(out_dentry, rc = -ESTALE);
-        }
-
-        /* dentry_open does a dput(dchild) and mntput(mnt) on error */
-        mntget(obd->u.filter.fo_vfsmnt);
-        loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt,
-                                          O_RDWR);
-        if (IS_ERR(loghandle->lgh_file)) {
-                rc = PTR_ERR(loghandle->lgh_file);
-                CERROR("error opening logfile %*s: rc %d\n",
-                       dchild->d_name.len, dchild->d_name.name, rc);
-                GOTO(out_dentry, rc);
-        }
-        memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
-        loghandle->lgh_log_create = filter_log_create;
-        loghandle->lgh_log_open = filter_log_open;
-        loghandle->lgh_log_close = filter_log_close;
-        loghandle->lgh_obd = obd;
-        RETURN(loghandle);
-
-out_dentry:
-        f_dput(dchild);
-out_handle:
-        llog_free_handle(loghandle);
-        RETURN(ERR_PTR(rc));
-}
-
-/* This is a callback from the llog_* functions.
- * Assumes caller has already pushed us into the kernel context. */
-static struct llog_handle *filter_log_create(struct obd_device *obd)
-{
-        struct filter_obd *filter = &obd->u.filter;
-        struct lustre_handle parent_lockh;
-        struct dentry *dparent, *dchild;
-        struct llog_handle *loghandle;
-        struct file *file;
-        int err, rc;
-        obd_id id;
-        ENTRY;
-
-        loghandle = llog_alloc_handle();
-        if (!loghandle)
-                RETURN(ERR_PTR(-ENOMEM));
-
- retry:
-        id = filter_next_id(filter);
-
-        dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh);
-        if (IS_ERR(dparent))
-                GOTO(out_ctxt, rc = PTR_ERR(dparent));
-
-        dchild = filter_fid2dentry(obd, dparent, S_IFREG, id);
-        if (IS_ERR(dchild))
-                GOTO(out_lock, rc = PTR_ERR(dchild));
-
-        if (dchild->d_inode != NULL) {
-                /* This would only happen if lastobjid was bad on disk */
-                CERROR("Serious error: objid %*s already exists; is this "
-                       "filesystem corrupt?  I will try to work around it.\n",
-                       dchild->d_name.len, dchild->d_name.name);
-                f_dput(dchild);
-                ldlm_lock_decref(&parent_lockh, LCK_PW);
-                goto retry;
-        }
-
-        rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
-        if (rc) {
-                CERROR("log create failed rc = %d\n", rc);
-                GOTO(out_child, rc);
-        }
-
-        rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
-                                       filter->fo_fsd, 0);
-        if (rc) {
-                CERROR("can't write lastobjid but log created: rc %d\n",rc);
-                GOTO(out_destroy, rc);
-        }
-
-        /* dentry_open does a dput(dchild) and mntput(mnt) on error */
-        mntget(filter->fo_vfsmnt);
-        file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
-        if (IS_ERR(file)) {
-                rc = PTR_ERR(file);
-                CERROR("error opening log file "LPX64": rc %d\n", id, rc);
-                GOTO(out_destroy, rc);
-        }
-        ldlm_lock_decref(&parent_lockh, LCK_PW);
-
-        loghandle->lgh_file = file;
-        loghandle->lgh_cookie.lgc_lgl.lgl_oid = id;
-        loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation;
-        loghandle->lgh_log_create = filter_log_create;
-        loghandle->lgh_log_open = filter_log_open;
-        loghandle->lgh_log_close = filter_log_close;
-        loghandle->lgh_obd = obd;
-
-        RETURN(loghandle);
-
-out_destroy:
-        err = vfs_unlink(dparent->d_inode, dchild);
-        if (err)
-                CERROR("error unlinking %*s on error: rc %d\n",
-                       dchild->d_name.len, dchild->d_name.name, err);
-out_child:
-        f_dput(dchild);
-out_lock:
-        ldlm_lock_decref(&parent_lockh, LCK_PW);
-out_ctxt:
-        llog_free_handle(loghandle);
-        RETURN(ERR_PTR(rc));
-}
-
 /* This is called from filter_setup() and should be single threaded */
 struct llog_handle *filter_get_catalog(struct obd_device *obd)
 {
@@ -240,20 +44,18 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd)
         struct filter_server_data *fsd = filter->fo_fsd;
         struct obd_run_ctxt saved;
         struct llog_handle *cathandle = NULL;
+        struct llog_logid logid;
         int rc;
         ENTRY;
 
-        push_ctxt(&saved, &filter->fo_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
         if (fsd->fsd_catalog_oid) {
-                struct llog_cookie catcookie;
-
-                catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid);
-                catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen);
-                cathandle = filter_log_open(obd, &catcookie);
-                if (IS_ERR(cathandle)) {
+                logid.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid);
+                logid.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen);
+                rc = llog_create(obd, &cathandle, &logid, NULL);
+                if (rc) {
                         CERROR("error opening catalog "LPX64":%x: rc %d\n",
-                               catcookie.lgc_lgl.lgl_oid,
-                               catcookie.lgc_lgl.lgl_ogen,
+                               logid.lgl_oid, logid.lgl_ogen,
                                (int)PTR_ERR(cathandle));
                         fsd->fsd_catalog_oid = 0;
                         fsd->fsd_catalog_ogen = 0;
@@ -261,17 +63,15 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd)
         }
 
         if (!fsd->fsd_catalog_oid) {
-                struct llog_logid *lgl;
-
-                cathandle = filter_log_create(obd);
-                if (IS_ERR(cathandle)) {
-                        CERROR("error creating new catalog: rc %d\n",
-                               (int)PTR_ERR(cathandle));
+                rc = llog_create(obd, &cathandle, NULL, NULL);
+                if (rc) {
+                        CERROR("error creating new catalog: rc %d\n", rc);
+                        cathandle = ERR_PTR(rc);
                         GOTO(out, cathandle);
                 }
-                lgl = &cathandle->lgh_cookie.lgc_lgl;
-                fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
-                fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
+                logid = cathandle->lgh_id;
+                fsd->fsd_catalog_oid = cpu_to_le64(logid.lgl_oid);
+                fsd->fsd_catalog_ogen = cpu_to_le32(logid.lgl_ogen);
                 rc = filter_update_server_data(obd, filter->fo_rcvd_filp,fsd,0);
                 if (rc) {
                         CERROR("error writing new catalog to disk: rc %d\n",rc);
@@ -279,52 +79,19 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd)
                 }
         }
 
-        rc = llog_init_catalog(cathandle, &obd->u.filter.fo_mdc_uuid);
+        rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->u.filter.fo_mdc_uuid);
         if (rc)
                 GOTO(out_handle, rc);
 out:
-        pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
         RETURN(cathandle);
 
 out_handle:
-        filter_log_close(cathandle, cathandle);
+        llog_close(cathandle);
         cathandle = ERR_PTR(rc);
         goto out;
 }
 
-void filter_put_catalog(struct llog_handle *cathandle)
-{
-        struct llog_handle *loghandle, *n;
-        int rc;
-        ENTRY;
-
-        list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
-                filter_log_close(cathandle, loghandle);
-
-        rc = filp_close(cathandle->lgh_file, 0);
-        if (rc)
-                CERROR("error closing catalog: rc %d\n", rc);
-
-        llog_free_handle(cathandle);
-        EXIT;
-}
-
-int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
-                      int num_cookies, struct llog_cookie *logcookies,
-                      int flags)
-{
-        struct obd_device *obd = class_conn2obd(conn);
-        struct obd_run_ctxt saved;
-        int rc;
-        ENTRY;
-
-        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-        rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies,
-                                 logcookies);
-        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-
-        RETURN(rc);
-}
 
 int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
                          obd_id oid, obd_count ogen,
@@ -337,15 +104,15 @@ int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
         OBD_ALLOC(lcr, sizeof(*lcr));
         if (lcr == NULL)
                 RETURN(-ENOMEM);
-        lcr->lcr_hdr.lth_len = lcr->lcr_end_len = sizeof(*lcr);
-        lcr->lcr_hdr.lth_type = OST_CREATE_REC;
+        lcr->lcr_hdr.lrh_len = lcr->lcr_tail.lrt_len = sizeof(*lcr);
+        lcr->lcr_hdr.lrh_type = OST_CREATE_REC;
         lcr->lcr_fid.id = mds_fid->id;
         lcr->lcr_fid.generation = mds_fid->generation;
         lcr->lcr_fid.f_type = mds_fid->f_type;
         lcr->lcr_oid = oid;
         lcr->lcr_ogen = ogen;
 
-        rc = llog_add_record(cathandle, &lcr->lcr_hdr, logcookie);
+        rc = llog_cat_add_rec(cathandle, &lcr->lcr_hdr, logcookie, NULL);
         OBD_FREE(lcr, sizeof(*lcr));
 
         if (rc > 0) {
@@ -365,12 +132,12 @@ int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid,
         OBD_ALLOC(lor, sizeof(*lor));
         if (lor == NULL)
                 RETURN(-ENOMEM);
-        lor->lor_hdr.lth_len = lor->lor_end_len = sizeof(*lor);
-        lor->lor_hdr.lth_type = OST_ORPHAN_REC;
+        lor->lor_hdr.lrh_len = lor->lor_tail.lrt_len = sizeof(*lor);
+        lor->lor_hdr.lrh_type = OST_ORPHAN_REC;
         lor->lor_oid = oid;
         lor->lor_ogen = ogen;
 
-        rc = llog_add_record(cathandle, &lor->lor_hdr, logcookie);
+        rc = llog_cat_add_rec(cathandle, &lor->lor_hdr, logcookie, NULL);
 
         if (rc > 0) {
                 LASSERT(rc == sizeof(*logcookie));
index 064cf51..f7fb9d4 100644 (file)
@@ -8,4 +8,4 @@ include $(src)/../portals/Kernelenv
 obj-y += ptlrpc.o
 ptlrpc-objs := recover.o connection.o ptlrpc_module.o events.o service.o \
                client.o niobuf.o pack_generic.o lproc_ptlrpc.o pinger.o \
-               recov_thread.o ptlrpc_lib.o
+               recov_thread.o ptlrpc_lib.o import.o