Whamcloud - gitweb
mgs_llog_read_header
authorlincent <lincent>
Wed, 2 Nov 2005 14:15:38 +0000 (14:15 +0000)
committerlincent <lincent>
Wed, 2 Nov 2005 14:15:38 +0000 (14:15 +0000)
   mgs_llog_create

lustre/include/linux/lustre_disk.h
lustre/include/linux/lustre_log.h
lustre/include/linux/lustre_mgs.h
lustre/include/linux/lvfs.h
lustre/include/linux/obd.h
lustre/mds/handler.c
lustre/mgs/mgs_handler.c
lustre/mgs/mgs_llog.c
lustre/obdclass/llog_lvfs.c

index a59372c..4b4d8ca 100644 (file)
@@ -37,6 +37,7 @@
    Used before the setup llog can be read. */
 #define MOUNT_CONFIGS_DIR "CONFIGS"
 #define MOUNT_DATA_FILE   MOUNT_CONFIGS_DIR"/mountdata"
+#define SYSTEM_DB_FILE "SYSTEM_DB"
 
 #define LDD_MAGIC 0xbabb0001
 
@@ -121,6 +122,7 @@ struct mkfs_opts {
         char  mo_loopdev[128];          /* in case a loop dev is needed */
         __u64 mo_device_sz;
         int   mo_flags; 
+
         /* Below here is required for writing mdt,ost,or client logs */
         int   mo_stripe_sz;
         int   mo_stripe_count;
index a9fe1a2..9ed9aee 100644 (file)
@@ -189,6 +189,15 @@ struct llog_operations {
 
 /* llog_lvfs.c */
 extern struct llog_operations llog_lvfs_ops;
+int llog_lvfs_write_rec(struct llog_handle *loghandle,
+                        struct llog_rec_hdr *rec,
+                        struct llog_cookie *reccookie, int cookiecount,
+                        void *buf, int idx);
+int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
+                         int next_idx, __u64 *cur_offset, void *buf,
+                         int len);
+int llog_lvfs_close(struct llog_handle *loghandle);
+int llog_lvfs_destroy(struct llog_handle *loghandle);
 extern struct llog_operations mgs_llog_lvfs_ops;
 
 int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
index 23f8ddd..60a0c77 100644 (file)
@@ -75,19 +75,20 @@ struct mgc_op_data {
 
 struct ost_info {
         struct list_head osi_list;
-        char             osi_ostname[40];
-        char             osi_nodename[40];
-        char             osi_ostuuid[40];
+        char             osi_ostname[64];
+        char             osi_nodename[64];
+        char             osi_ostuuid[64];
         lnet_nid_t       osi_nid;
         __u32            osi_nal;
         __u32            osi_stripe_index;
 };
 
 struct system_db {
-        char              fsname[40];
-        char              mds_name[40];  
-        char              mds_uuid[40];
-        char              mds_nodename[40];
+        __u64             version;
+        char              fsname[64];
+        char              mds_name[64];
+        char              mds_uuid[64];
+        char              mds_nodename[64];
         lnet_nid_t        mds_nid;
         struct lov_desc   lovdesc;
         int               ost_number;
@@ -113,10 +114,9 @@ struct mgs_open_llog {
         char                 mol_fsname[40];
         struct llog_handle  *mol_cfg_llh;
         struct dentry       *mol_dentry;
-        __u64                mol_version;
         spinlock_t           mol_lock;
+        int                  mol_refs;
         struct system_db    *mol_system_db;
-        struct list_head     mol_vesion_descs;
 };
 
 int mgs_fs_setup(struct obd_device *obd, struct vfsmount *mnt);
index 967efc5..ac90e02 100644 (file)
@@ -1,28 +1,28 @@
-        /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
        * vim:expandtab:shiftwidth=8:tabstop=8:
        *
        *  Copyright (C) 2001 Cluster File Systems, Inc. <braam@clusterfs.com>
        *
        *   This file is part of Lustre, http://www.lustre.org.
        *
        *   Lustre is free software; you can redistribute it and/or
        *   modify it under the terms of version 2 of the GNU General Public
        *   License as published by the Free Software Foundation.
        *
        *   Lustre is distributed in the hope that it will be useful,
        *   but WITHOUT ANY WARRANTY; without even the implied warranty of
        *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        *   GNU General Public License for more details.
        *
        *   You should have received a copy of the GNU General Public License
        *   along with Lustre; if not, write to the Free Software
        *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
        *
        * lustre VFS/process permission interface
        */
-
-        #ifndef __LVFS_H__
-        #define __LVFS_H__
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001 Cluster File Systems, Inc. <braam@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * lustre VFS/process permission interface
+ */
+
+#ifndef __LVFS_H__
+#define __LVFS_H__
 
 #include <libcfs/kp30.h>
 #include <linux/lustre_ucache.h>
@@ -137,6 +137,55 @@ static inline struct dentry *ll_lookup_one_len(const char *fid_name,
         return dchild;
 }
 
+/* Look up an entry by inode number. */
+/* this function ONLY returns valid dget'd dentries with an initialized inode
+   or errors */
+static inline struct dentry * ll_fid2dentry(struct dentry *parent,
+                                            __u64 ino, __u32 generation)
+{
+        char fid_name[32];
+        struct inode *inode;
+        struct dentry *result;
+
+        if (ino == 0)
+                RETURN(ERR_PTR(-ESTALE));
+
+        snprintf(fid_name, sizeof(fid_name), "0x%lx", (unsigned long)ino);
+
+        /* under ext3 this is neither supposed to return bad inodes
+           nor NULL inodes. */
+        result = ll_lookup_one_len(fid_name, parent, strlen(fid_name));
+        if (IS_ERR(result))
+                RETURN(result);
+
+        inode = result->d_inode;
+        if (!inode)
+                RETURN(ERR_PTR(-ENOENT));
+
+        if (inode->i_generation == 0 || inode->i_nlink == 0) {
+                LCONSOLE_WARN("Found inode with zero generation or link -- this"
+                              " may indicate disk corruption (inode: %lu, link:"
+                              " %lu, count: %d)\n", inode->i_ino,
+                              (unsigned long)inode->i_nlink,
+                              atomic_read(&inode->i_count));
+                dput(result);
+                RETURN(ERR_PTR(-ENOENT));
+        }
+
+        if (generation && inode->i_generation != generation) {
+                /* we didn't find the right inode.. */
+                CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
+                       "count: %d, generation %u/%u\n", inode->i_ino,
+                       (unsigned long)inode->i_nlink,
+                       atomic_read(&inode->i_count), inode->i_generation,
+                       generation);
+                dput(result);
+                RETURN(ERR_PTR(-ENOENT));
+        }
+
+        RETURN(result);
+}
+
 static inline void ll_sleep(int t)
 {
         set_current_state(TASK_INTERRUPTIBLE);
index 0644778..d338b30 100644 (file)
@@ -349,6 +349,7 @@ struct mgs_obd {
         struct vfsmount                 *mgs_vfsmnt;
         struct super_block              *mgs_sb;
         struct dentry                   *mgs_configs_dir;
+        spinlock_t                       mgs_open_llogs_lock;
         struct list_head                 mgs_open_llogs;
         struct llog_handle              *mgs_cfg_llh;
 };
index 5ca65f7..fc9c53f 100644 (file)
@@ -189,56 +189,16 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
         RETURN(retval);
 }
 
-/* Look up an entry by inode number. */
-/* this function ONLY returns valid dget'd dentries with an initialized inode
-   or errors */
+/* Look up an entry by inode number in mds obd */
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt)
 {
-        char fid_name[32];
-        unsigned long ino = fid->id;
-        __u32 generation = fid->generation;
-        struct inode *inode;
         struct dentry *result;
 
-        if (ino == 0)
-                RETURN(ERR_PTR(-ESTALE));
-
-        snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
-
         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
-               ino, generation, mds->mds_sb);
-
-        /* under ext3 this is neither supposed to return bad inodes
-           nor NULL inodes. */
-        result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
-        if (IS_ERR(result))
-                RETURN(result);
-
-        inode = result->d_inode;
-        if (!inode)
-                RETURN(ERR_PTR(-ENOENT));
-
-        if (inode->i_generation == 0 || inode->i_nlink == 0) {
-                LCONSOLE_WARN("Found inode with zero generation or link -- this"
-                              " may indicate disk corruption (inode: %lu, link:"
-                              " %lu, count: %d)\n", inode->i_ino,
-                              (unsigned long)inode->i_nlink,
-                              atomic_read(&inode->i_count));
-                dput(result);
-                RETURN(ERR_PTR(-ENOENT));
-        }
-
-        if (generation && inode->i_generation != generation) {
-                /* we didn't find the right inode.. */
-                CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
-                       "count: %d, generation %u/%u\n", inode->i_ino,
-                       (unsigned long)inode->i_nlink,
-                       atomic_read(&inode->i_count), inode->i_generation,
-                       generation);
-                dput(result);
-                RETURN(ERR_PTR(-ENOENT));
-        }
+               (unsigned long)fid->id, fid->generation, mds->mds_sb);
+
+        result = ll_fid2dentry(mds->mds_fid_de, fid->id, fid->generation);
 
         if (mnt) {
                 *mnt = mds->mds_vfsmnt;
index 3242334..fa19e91 100644 (file)
@@ -213,7 +213,6 @@ static int mgs_setup(struct obd_device *obd, obd_count len, void *buf)
         }
 
         INIT_LIST_HEAD(&mgs->mgs_open_llogs);
- //       INIT_LIST_HEAD(&mgs->mgs_update_llhs);
 
         rc = llog_start_commit_thread();
         if (rc < 0)
@@ -519,8 +518,7 @@ static int mgt_setup(struct obd_device *obd, obd_count len, void *buf)
                 GOTO(err_lprocfs, rc = -ENOMEM);
         }
 
-        rc = ptlrpc_start_n_threads(obd, mgs->mgs_service, MGT_NUM_THREADS,
-                                    "ll_mgt");
+        rc = ptlrpc_start_threads(obd, mgs->mgs_service, "ll_mgt");
         if (rc)
                 GOTO(err_thread, rc);
 
index 898fd17..c40829d 100644 (file)
@@ -24,7 +24,7 @@
  */
 
 #ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
+#define EXPORT_SYMTAB
 #endif
 #define DEBUG_SUBSYSTEM S_MGS
 
@@ -177,10 +177,10 @@ static int record_mount_point(struct obd_device *obd, struct llog_handle *llh,
 
 struct mgs_open_llog* find_mgs_open_llog(struct obd_device *obd, char *name)
 {
-        struct mgs_obd *mgs= &obd->u.mgs;
+        struct mgs_obd *mgs = &obd->u.mgs;
         struct list_head *tmp;
         struct mgs_open_llog *mol;
-        char fsname[40];
+        char fsname[64];
         char *p;
 
         p = strrchr(name, '/');
@@ -197,6 +197,71 @@ struct mgs_open_llog* find_mgs_open_llog(struct obd_device *obd, char *name)
         return NULL;
 }
 
+struct mgs_open_llog* create_mgs_open_llog(struct obd_device *obd, char *name)
+{
+        struct mgs_obd *mgs = &obd->u.mgs;
+        struct mgs_open_llog *mol, *tmp;
+        char *p;
+        int rc;
+
+        OBD_ALLOC(mol, sizeof(*mol));
+        if (!mol) {
+                CERROR("can not allocate memory for mgs_open_llog.\n");
+                return NULL;
+        }
+
+        p = strrchr(name, '/');
+        if (p != NULL)
+                strncpy(mol->mol_fsname, name, p - name);
+        else {
+                CERROR("logname need to include fsname.\n");
+                goto cleanup;
+        }
+
+        rc = mgs_load_system_db(obd, name, &mol->mol_system_db);
+        if (rc) 
+                goto cleanup;
+
+        mol->mol_refs = 1;
+
+        spin_lock_init(&mol->mol_lock);
+
+        spin_lock(&mgs->mgs_open_llogs_lock);
+
+        tmp = find_mgs_open_llog(obd, name);
+        if(tmp) {
+               OBD_FREE(mol->mol_system_db, sizeof(struct system_db));
+               OBD_FREE(mol, sizeof(*mol));
+               mol = tmp;
+        } else 
+               list_add(&mol->mol_list, &mgs->mgs_open_llogs);
+
+        spin_unlock(&mgs->mgs_open_llogs_lock);
+
+        return mol;
+        
+cleanup:
+        OBD_FREE(mol, sizeof(*mol));
+        return NULL;
+}
+
+struct mgs_open_llog* open_mgs_open_llog(struct obd_device *obd, char *name)
+{
+        struct mgs_open_llog *mol;
+
+        mol = find_mgs_open_llog(obd, name);
+        if (!mol) {
+                mol = create_mgs_open_llog(obd, name);
+                return mol;
+        }
+
+        spin_lock(&mol->mol_lock);
+        mol->mol_refs++;
+        spin_unlock(&mol->mol_lock);
+
+        return mol;
+}
+
 static int mgs_start_record(struct obd_device *obd, 
                             struct llog_handle *llh, char *name)
 {
@@ -262,7 +327,6 @@ static int mgs_clear_record(struct obd_device *obd,
 static int mgs_do_record(struct obd_device *obd, struct llog_handle *llh,
                          void *cfg_buf)
 {
-        
         struct lvfs_run_ctxt saved;
         struct llog_rec_hdr rec;
         int rc = 0;
@@ -768,71 +832,6 @@ static int mgs_llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
         RETURN(rc);
 }
 
-static int mgs_llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
-                                struct llog_rec_hdr *rec, void *buf, loff_t off)
-{
-        int rc;
-        struct llog_rec_tail end;
-        loff_t saved_off = file->f_pos;
-        int buflen = rec->lrh_len;
-
-        ENTRY;
-        file->f_pos = off;
-
-        if (!buf) {
-                rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
-                if (rc) {
-                        CERROR("error writing log record: rc %d\n", rc);
-                        goto out;
-                }
-                GOTO(out, rc = 0);
-        }
-
-        /* the buf case */
-        rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
-        rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
-        if (rc) {
-                CERROR("error writing log hdr: rc %d\n", rc);
-                goto out;
-        }
-
-        rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
-        if (rc) {
-                CERROR("error writing log buffer: rc %d\n", rc);
-                goto out;
-        }
-
-        end.lrt_len = rec->lrh_len;
-        end.lrt_index = rec->lrh_index;
-        rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
-        if (rc) {
-                CERROR("error writing log tail: rc %d\n", rc);
-                goto out;
-        }
-
-        rc = 0;
- out:
-        if (saved_off > file->f_pos)
-                file->f_pos = saved_off;
-        LASSERT(rc <= 0);
-        RETURN(rc);
-}
-
-static int mgs_llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
-                                void *buf, int size, loff_t off)
-{
-        loff_t offset = off;
-        int rc;
-        ENTRY;
-
-        rc = fsfilt_read_record(obd, file, buf, size, &offset);
-        if (rc) {
-                CERROR("error reading log record: rc %d\n", rc);
-                RETURN(rc);
-        }
-        RETURN(0);
-}
-
 static int mgs_llog_lvfs_read_header(struct llog_handle *handle)
 {
         struct obd_device *obd;
@@ -848,7 +847,7 @@ static int mgs_llog_lvfs_read_header(struct llog_handle *handle)
                 RETURN(LLOG_EEMPTY);
         }
 
-        rc = mgs_llog_lvfs_read_blob(obd, handle->lgh_file, handle->lgh_hdr,
+        rc = llog_lvfs_read_blob(obd, handle->lgh_file, handle->lgh_hdr,
                                  LLOG_CHUNK_SIZE, 0);
         if (rc) {
                 CERROR("error reading log header from %.*s\n",
@@ -883,130 +882,69 @@ static int mgs_llog_lvfs_read_header(struct llog_handle *handle)
         RETURN(rc);
 }
 
-/* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
-/* appends if idx == -1, otherwise overwrites record idx. */
-static int mgs_llog_lvfs_write_rec(struct llog_handle *loghandle,
-                                   struct llog_rec_hdr *rec,
-                                   struct llog_cookie *reccookie, int cookiecount,
-                                   void *buf, int idx)
+static int mgs_llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
+                                struct llog_rec_hdr *rec, void *buf, loff_t off)
 {
-        struct llog_log_hdr *llh;
-        int reclen = rec->lrh_len, index, rc;
-        struct llog_rec_tail *lrt;
-        struct obd_device *obd;
-        struct file *file;
-        size_t left;
-        ENTRY;
-
-        llh = loghandle->lgh_hdr;
-        file = loghandle->lgh_file;
-        obd = loghandle->lgh_ctxt->loc_exp->exp_obd;
-
-        /* record length should not bigger than LLOG_CHUNK_SIZE */
-        if (buf)
-                rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
-                      sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
-        else
-                rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
-        if (rc)
-                RETURN(rc);
-
-        if (idx != -1) {
-                loff_t saved_offset;
-
-                /* no header: only allowed to insert record 1 */
-                if (idx != 1 && !file->f_dentry->d_inode->i_size) {
-                        CERROR("idx != -1 in empty log\n");
-                        LBUG();
-                }
-
-                if (idx && llh->llh_size && llh->llh_size != reclen)
-                        RETURN(-EINVAL);
+        int rc;
+        struct llog_rec_tail end;
+        loff_t saved_off = file->f_pos;
+        int buflen = rec->lrh_len;
 
-                rc = mgs_llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
-                /* we are done if we only write the header or on error */
-                if (rc || idx == 0)
-                        RETURN(rc);
+        ENTRY;
+        file->f_pos = off;
 
-                saved_offset = sizeof(*llh) + (idx-1)*rec->lrh_len;
-                rc = mgs_llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
-                if (rc == 0 && reccookie) {
-                        reccookie->lgc_lgl = loghandle->lgh_id;
-                        reccookie->lgc_index = idx;
-                        rc = 1;
+        if (!buf) {
+                rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
+                if (rc) {
+                        CERROR("error writing log record: rc %d\n", rc);
+                        goto out;
                 }
-                RETURN(rc);
+                GOTO(out, rc = 0);
         }
 
-        /* Make sure that records don't cross a chunk boundary, so we can
-         * process them page-at-a-time if needed.  If it will cross a chunk
-         * boundary, write in a fake (but referenced) entry to pad the chunk.
-         *
-         * We know that llog_current_log() will return a loghandle that is
-         * big enough to hold reclen, so all we care about is padding here.
-         */
-        left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
-        if (buf)
-                reclen = sizeof(*rec) + rec->lrh_len + 
-                        sizeof(struct llog_rec_tail);
-
-        /* NOTE: padding is a record, but no bit is set */
-        if (left != 0 && left != reclen &&
-            left < (reclen + LLOG_MIN_REC_SIZE)) {
-                loghandle->lgh_last_idx++;
-                rc = mgs_llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx);
-                if (rc)
-                        RETURN(rc);
-                /* if it's the last idx in log file, then return -ENOSPC */
-                if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
-                        RETURN(-ENOSPC);
+        /* the buf case */
+        rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
+        rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
+        if (rc) {
+                CERROR("error writing log hdr: rc %d\n", rc);
+                goto out;
         }
 
-        loghandle->lgh_last_idx++;
-        index = loghandle->lgh_last_idx;
-        LASSERT(index < LLOG_BITMAP_SIZE(llh));
-        rec->lrh_index = index;
-        if (buf == NULL) {
-                lrt = (struct llog_rec_tail *)
-                        ((char *)rec + rec->lrh_len - sizeof(*lrt));
-                lrt->lrt_len = rec->lrh_len;
-                lrt->lrt_index = rec->lrh_index;
+        rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
+        if (rc) {
+                CERROR("error writing log buffer: rc %d\n", rc);
+                goto out;
         }
-        if (ext2_set_bit(index, llh->llh_bitmap)) {
-                CERROR("argh, index %u already set in log bitmap?\n", index);
-                LBUG(); /* should never happen */
+
+        end.lrt_len = rec->lrh_len;
+        end.lrt_index = rec->lrh_index;
+        rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
+        if (rc) {
+                CERROR("error writing log tail: rc %d\n", rc);
+                goto out;
         }
-        llh->llh_count++;
-        llh->llh_tail.lrt_index = index;
 
-        rc = mgs_llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
-        if (rc)
-                RETURN(rc);
+        rc = 0;
+ out:
+        if (saved_off > file->f_pos)
+                file->f_pos = saved_off;
+        LASSERT(rc <= 0);
+        RETURN(rc);
+}
 
-        rc = mgs_llog_lvfs_write_blob(obd, file, rec, buf, file->f_pos);
-        if (rc)
-                RETURN(rc);
+static int mgs_llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
+                                void *buf, int size, loff_t off)
+{
+        loff_t offset = off;
+        int rc;
+        ENTRY;
 
-        CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
-               loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
-        if (rc == 0 && reccookie) {
-                reccookie->lgc_lgl = loghandle->lgh_id;
-                reccookie->lgc_index = index;
-                if ((rec->lrh_type == MDS_UNLINK_REC) || 
-                                (rec->lrh_type == MDS_SETATTR_REC))
-                        reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
-                else if (rec->lrh_type == OST_SZ_REC)
-                        reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
-                else if (rec->lrh_type == OST_RAID1_REC)
-                        reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
-                else
-                        reccookie->lgc_subsys = -1;
-                rc = 1;
+        rc = fsfilt_read_record(obd, file, buf, size, &offset);
+        if (rc) {
+                CERROR("error reading log record: rc %d\n", rc);
+                RETURN(rc);
         }
-        if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
-                rc = 1;
-
-        RETURN(rc);
+        RETURN(0);
 }
 
 /* We can skip reading at least as many log blocks as the number of
@@ -1023,90 +961,19 @@ static void llog_skip_over(__u64 *off, int curr, int goal)
                 ~(LLOG_CHUNK_SIZE - 1);
 }
 
-
-/* sets:
- *  - cur_offset to the furthest point read in the log file
- *  - cur_idx to the log index preceeding cur_offset
- * returns -EIO/-EINVAL on error
- */
-static int mgs_llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
-                                int next_idx, __u64 *cur_offset, void *buf,
-                                int len)
+static struct dentry * 
+mgs_lvfs_logid2dentry(struct obd_device *obd, struct dentry *parent,
+                      struct llog_logid *logid)
 {
-        int rc;
-        ENTRY;
-
-        if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
-                RETURN(-EINVAL);
-
-        CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
-               next_idx, *cur_idx, *cur_offset);
-
-        while (*cur_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
-                struct llog_rec_hdr *rec;
-                struct llog_rec_tail *tail;
-                loff_t ppos;
-
-                llog_skip_over(cur_offset, *cur_idx, next_idx);
-
-                ppos = *cur_offset;
-                rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
-                                        loghandle->lgh_file, buf, len,
-                                        &ppos);
-
-                if (rc) {
-                        CERROR("Cant read llog block at log id "LPU64
-                               "/%u offset "LPU64"\n",
-                               loghandle->lgh_id.lgl_oid,
-                               loghandle->lgh_id.lgl_ogen,
-                               *cur_offset);
-                        RETURN(rc);
-                }
-
-                /* put number of bytes read into rc to make code simpler */
-                rc = ppos - *cur_offset;
-                *cur_offset = ppos;
-
-                if (rc == 0) /* end of file, nothing to do */
-                        RETURN(0);
-
-                if (rc < sizeof(*tail)) {
-                        CERROR("Invalid llog block at log id "LPU64"/%u offset "
-                               LPU64"\n", loghandle->lgh_id.lgl_oid,
-                               loghandle->lgh_id.lgl_ogen, *cur_offset);
-                        RETURN(-EINVAL);
-                }
-
-                rec = buf;
-                tail = (struct llog_rec_tail *)((char *)buf + rc -
-                                                sizeof(struct llog_rec_tail));
-
-                if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) {
-                        lustre_swab_llog_rec(rec, tail);
-                }
+        struct mgs_obd *mgs = &obd->u.mgs;
+        struct ll_fid fid;
+        fid.id = logid->lgl_oid;
+        fid.generation = logid->lgl_ogen;
 
-                *cur_idx = tail->lrt_index;
+        CDEBUG(D_DENTRY, "--> mgs_logid2dentry: ino/gen %lu/%u, sb %p\n",
+               fid.id, fid.generation, mgs->mgs_sb);
 
-                /* this shouldn't happen */
-                if (tail->lrt_index == 0) {
-                        CERROR("Invalid llog tail at log id "LPU64"/%u offset "
-                               LPU64"\n", loghandle->lgh_id.lgl_oid,
-                               loghandle->lgh_id.lgl_ogen, *cur_offset);
-                        RETURN(-EINVAL);
-                }
-                if (tail->lrt_index < next_idx)
-                        continue;
-
-                /* sanity check that the start of the new buffer is no farther
-                 * than the record that we wanted.  This shouldn't happen. */
-                if (rec->lrh_index > next_idx) {
-                        CERROR("missed desired record? %u > %u\n",
-                               rec->lrh_index, next_idx);
-                        RETURN(-ENOENT);
-                }
-                RETURN(0);
-        }
-        RETURN(-EIO);
+        return ll_fid2dentry(parent, fid.id, fid.generation);
 }
 
 static struct file *llog_filp_open(char *name, int flags, int mode)
@@ -1138,13 +1005,13 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
 
 /* This is a callback from the llog_* functions.
  * Assumes caller has already pushed us into the kernel context. */
-static int mgs_llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                            struct llog_logid *logid, char *name)
+static int mgs_llog_lvfs_create(struct llog_ctxt *ctxt,
+                                struct llog_handle **res,
+                                struct llog_logid *logid, char *name)
 {
         struct llog_handle *handle;
         struct obd_device *obd;
         struct l_dentry *dchild = NULL;
-        struct obdo *oa = NULL;
         int rc = 0, cleanup_phase = 1;
         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
         ENTRY;
@@ -1159,8 +1026,13 @@ static int mgs_llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res
         obd = ctxt->loc_exp->exp_obd;
 
         if (logid != NULL) {
-                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
-                                             logid->lgl_ogen, logid->lgl_ogr);
+                struct mgs_open_llog *mol = find_mgs_open_llog(obd, name);
+                if (!mol) {
+                        CERROR("can not find mgs_open_llog: %s\n", name);
+                        GOTO(cleanup, -EINVAL);
+                }
+
+                dchild = mgs_logid2dentry(obd, mol->mol_dentry, logid);
 
                 if (IS_ERR(dchild)) {
                         rc = PTR_ERR(dchild);
@@ -1178,7 +1050,7 @@ static int mgs_llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res
                 }
 
                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
-                                                    O_RDWR | O_LARGEFILE);
+                                                 O_RDWR | O_LARGEFILE);
                 if (IS_ERR(handle->lgh_file)) {
                         rc = PTR_ERR(handle->lgh_file);
                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
@@ -1200,38 +1072,21 @@ static int mgs_llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res
                         handle->lgh_file->f_dentry->d_inode->i_ino;
                 handle->lgh_id.lgl_ogen =
                         handle->lgh_file->f_dentry->d_inode->i_generation;
-                
-        } else {
-                oa = obdo_alloc();
-                if (oa == NULL)
-                        GOTO(cleanup, rc = -ENOMEM);
-                /* XXX get some filter group constants */
-                oa->o_gr = 1;
-                oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
-                rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
-                if (rc)
-                        GOTO(cleanup, rc);
-
-                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
-                                             oa->o_generation, oa->o_gr);
 
-                if (IS_ERR(dchild))
-                        GOTO(cleanup, rc = PTR_ERR(dchild));
-                cleanup_phase = 2;
-                handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
-                                                 open_flags);
-                if (IS_ERR(handle->lgh_file))
-                        GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
-
-                handle->lgh_id.lgl_ogr = oa->o_gr;
-                handle->lgh_id.lgl_oid = oa->o_id;
-                handle->lgh_id.lgl_ogen = oa->o_generation;
+                rc = open_mgs_open_llog(obd, name);
+                if (rc) {
+                        CERROR("can not open mgs_open_llog (%s): rc %d\n",
+                                name, rc);
+                        GOTO(cleanup, rc);
+                }
+        } else {
+                CERROR("No llog id and llog name be specified.\n");
+                GOTO(cleanup, rc = -EINVAL);
         }
 
         handle->lgh_ctxt = ctxt;
- finish:
-        if (oa)
-                obdo_free(oa);
+
+finish:
         RETURN(rc);
 cleanup:
         switch (cleanup_phase) {
@@ -1243,16 +1098,6 @@ cleanup:
         goto finish;
 }
 
-static int mgs_llog_lvfs_close(struct llog_handle *handle)
-{
-        int rc;
-        ENTRY;
-
-        rc = filp_close(handle->lgh_file, 0);
-        if (rc)
-                CERROR("error closing log: rc %d\n", rc);
-        RETURN(rc);
-}
 
 static int mgs_llog_lvfs_destroy(struct llog_handle *handle)
 {
@@ -1392,12 +1237,12 @@ int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
 }
 
 struct llog_operations mgs_llog_lvfs_ops = {
-        lop_write_rec:   mgs_llog_lvfs_write_rec,
-        lop_next_block:  mgs_llog_lvfs_next_block,
-        lop_read_header: mgs_llog_lvfs_read_header,
         lop_create:      mgs_llog_lvfs_create,
-        lop_destroy:     mgs_llog_lvfs_destroy,
-        lop_close:       mgs_llog_lvfs_close,
+        lop_read_header: mgs_llog_lvfs_read_header,
+        lop_write_rec:   llog_lvfs_write_rec,
+        lop_next_block:  llog_lvfs_next_block,
+        lop_destroy:     llog_lvfs_destroy,
+        lop_close:       llog_lvfs_close,
         //        lop_cancel: llog_lvfs_cancel,
 };
 
@@ -1412,9 +1257,9 @@ static int mgs_llog_lvfs_read_header(struct llog_handle *handle)
 }
 
 static int mgs_llog_lvfs_write_rec(struct llog_handle *loghandle,
-                               struct llog_rec_hdr *rec,
-                               struct llog_cookie *reccookie, int cookiecount,
-                               void *buf, int idx)
+                                   struct llog_rec_hdr *rec,
+                                   struct llog_cookie *reccookie, int cookiecount,
+                                   void *buf, int idx)
 {
         LBUG();
         return 0;
@@ -1462,9 +1307,9 @@ int mgs_llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
 }
 
 struct llog_operations mgs_llog_lvfs_ops = {
-        lop_write_rec:   mgs_llog_lvfs_write_rec,
-        lop_next_block:  mgs_llog_lvfs_next_block,
-        lop_read_header: mgs_llog_lvfs_read_header,
+        lop_write_rec:   llog_lvfs_ops.lop_write_rec,
+        lop_next_block:  llog_lvfs_ops.lop_next_block,
+        lop_read_header: llog_lvfs_ops.lop_read_header,
         lop_create:      mgs_llog_lvfs_create,
         lop_destroy:     mgs_llog_lvfs_destroy,
         lop_close:       mgs_llog_lvfs_close,
index 2c8d2f3..5fdc908 100644 (file)
@@ -84,7 +84,7 @@ static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
         RETURN(rc);
 }
 
-static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
+int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
 {
         int rc;
@@ -149,7 +149,7 @@ static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
         RETURN(0);
 }
 
-static int llog_lvfs_read_header(struct llog_handle *handle)
+int llog_lvfs_read_header(struct llog_handle *handle)
 {
         struct obd_device *obd;
         int rc;
@@ -201,7 +201,7 @@ static int llog_lvfs_read_header(struct llog_handle *handle)
 
 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
 /* appends if idx == -1, otherwise overwrites record idx. */
-static int llog_lvfs_write_rec(struct llog_handle *loghandle,
+int llog_lvfs_write_rec(struct llog_handle *loghandle,
                                struct llog_rec_hdr *rec,
                                struct llog_cookie *reccookie, int cookiecount,
                                void *buf, int idx)
@@ -345,7 +345,7 @@ static void llog_skip_over(__u64 *off, int curr, int goal)
  *  - cur_idx to the log index preceeding cur_offset
  * returns -EIO/-EINVAL on error
  */
-static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
+int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
                                 int next_idx, __u64 *cur_offset, void *buf,
                                 int len)
 {
@@ -454,7 +454,7 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
 
 /* This is a callback from the llog_* functions.
  * Assumes caller has already pushed us into the kernel context. */
-static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
+int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                             struct llog_logid *logid, char *name)
 {
         struct llog_handle *handle;
@@ -559,7 +559,7 @@ cleanup:
         goto finish;
 }
 
-static int llog_lvfs_close(struct llog_handle *handle)
+int llog_lvfs_close(struct llog_handle *handle)
 {
         int rc;
         ENTRY;
@@ -570,7 +570,7 @@ static int llog_lvfs_close(struct llog_handle *handle)
         RETURN(rc);
 }
 
-static int llog_lvfs_destroy(struct llog_handle *handle)
+int llog_lvfs_destroy(struct llog_handle *handle)
 {
         struct dentry *fdentry;
         struct obdo *oa;
@@ -717,6 +717,12 @@ struct llog_operations llog_lvfs_ops = {
         //        lop_cancel: llog_lvfs_cancel,
 };
 
+EXPORT_SYMBOL(llog_lvfs_write_rec);
+EXPORT_SYMBOL(llog_lvfs_next_block);
+EXPORT_SYMBOL(llog_lvfs_read_header);
+EXPORT_SYMBOL(llog_lvfs_create);
+EXPORT_SYMBOL(llog_lvfs_destroy);
+EXPORT_SYMBOL(llog_lvfs_close);
 EXPORT_SYMBOL(llog_lvfs_ops);
 
 #else /* !__KERNEL__ */