Whamcloud - gitweb
merge b_orphan into b_merge, with orphan code disabled
authorpschwan <pschwan>
Fri, 4 Jul 2003 08:56:58 +0000 (08:56 +0000)
committerpschwan <pschwan>
Fri, 4 Jul 2003 08:56:58 +0000 (08:56 +0000)
lustre/include/linux/lustre_commit_confd.h [new file with mode: 0644]
lustre/include/linux/lustre_log.h [new file with mode: 0644]
lustre/mds/commit_confd.c [new file with mode: 0644]
lustre/obdclass/recov_log.c [new file with mode: 0644]
lustre/ptlrpc/recov_thread.c [new file with mode: 0644]

diff --git a/lustre/include/linux/lustre_commit_confd.h b/lustre/include/linux/lustre_commit_confd.h
new file mode 100644 (file)
index 0000000..5a021a8
--- /dev/null
@@ -0,0 +1,73 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001 Cluster File Systems, Inc. <info@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Structures relating to the log commit thread.
+ */
+
+#ifndef _LUSTRE_COMMIT_CONFD_H
+#define _LUSTRE_COMMIT_CONFD_H
+
+#include <linux/lustre_log.h>
+
+struct llog_commit_data {
+        struct list_head           llcd_list;  /* free or pending struct list */
+        struct obd_import         *llcd_import;
+        struct llog_commit_master *llcd_lcm;
+        int                        llcd_tries; /* number of tries to send */
+        int                        llcd_cookiebytes;
+        struct llog_cookie         llcd_cookies[0];
+};
+
+struct llog_commit_master {
+        struct list_head        lcm_thread_busy;  /* list of busy daemons */
+        struct list_head        lcm_thread_idle;  /* list of idle daemons */
+        spinlock_t              lcm_thread_lock;  /* protects thread_list */
+        atomic_t                lcm_thread_numidle;/* number of idle threads */
+        int                     lcm_thread_total; /* total number of threads */
+        int                     lcm_thread_max;   /* <= num_osts normally */
+
+        int                     lcm_flags;
+        wait_queue_head_t       lcm_waitq;
+
+        struct list_head        lcm_llcd_pending; /* llog_commit_data to send */
+        struct list_head        lcm_llcd_resend;  /* try to resend this data */
+        struct list_head        lcm_llcd_free;    /* free llog_commit_data */
+        spinlock_t              lcm_llcd_lock;    /* protects llcd_free */
+        atomic_t                lcm_llcd_numfree; /* items on llcd_free */
+        int                     lcm_llcd_minfree; /* min free on llcd_free */
+        int                     lcm_llcd_maxfree; /* max free on llcd_free */
+};
+
+#define LLOG_LCM_FL_EXIT        0x01
+#define LLOG_LCM_FL_EXIT_FORCE  0x02
+
+/* the thread data that collects local commits and makes rpc's */
+struct llog_commit_daemon {
+        struct list_head           lcd_lcm_list;  /* list of daemon threads */
+        struct list_head           lcd_llcd_list; /* list of pending RPCs */
+        struct llog_commit_master *lcd_lcm;       /* pointer back to parent */
+};
+
+/* ptlrpc/recov_thread.c */
+int llog_start_commit_thread(void);
+struct llog_commit_data *llcd_grab(void);
+void llcd_send(struct llog_commit_data *llcd);
+
+#endif /* _LUSTRE_COMMIT_CONFD_H */
diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h
new file mode 100644 (file)
index 0000000..2f21583
--- /dev/null
@@ -0,0 +1,81 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001 Cluster File Systems, Inc. <info@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Generic infrastructure for managing a collection of logs.
+ *
+ * These logs are used for:
+ *
+ * - orphan recovery: OST adds record on create
+ * - mtime/size consistency: the OST adds a record on first write
+ * - open/unlinked objects: OST adds a record on destroy
+ *
+ * - mds unlink log: the MDS adds an entry upon delete
+ *
+ * - raid1 replication log between OST's
+ * - MDS replication logs
+ */
+
+#ifndef _LUSTRE_LOG_H
+#define _LUSTRE_LOG_H
+
+#include <linux/lustre_idl.h>
+
+struct obd_trans_info;
+struct obd_device;
+struct lov_stripe_md;
+
+/* In-memory descriptor for a log object or log catalog */
+struct llog_handle {
+        struct list_head        lgh_list;
+        struct llog_cookie      lgh_cookie;
+        struct semaphore        lgh_lock;
+        struct obd_device      *lgh_obd;
+        void                   *lgh_hdr;
+        struct file            *lgh_file;
+        struct obd_uuid        *lgh_tgtuuid;
+        struct llog_handle     *lgh_current;
+        struct llog_handle     *(*lgh_log_create)(struct obd_device *obd);
+        struct llog_handle     *(*lgh_log_open)(struct obd_device *obd,
+                                                struct llog_cookie *logcookie);
+        int                     (*lgh_log_close)(struct llog_handle *cathandle,
+                                                 struct llog_handle *loghandle);
+        int                     lgh_index;
+};
+
+extern int llog_add_record(struct llog_handle *cathandle,
+                           struct llog_trans_hdr *rec,
+                           struct llog_cookie *logcookies);
+
+extern int llog_cancel_records(struct llog_handle *cathandle, int count,
+                               struct llog_cookie *cookies);
+
+extern struct llog_handle *llog_alloc_handle(void);
+extern void llog_free_handle(struct llog_handle *handle);
+extern int llog_init_catalog(struct llog_handle *cathandle,
+                             struct obd_uuid *tgtuuid);
+extern int llog_delete_log(struct llog_handle *cathandle,
+                           struct llog_handle *loghandle);
+extern int llog_close_log(struct llog_handle *cathandle,
+                          struct llog_handle *loghandle);
+extern struct llog_handle *llog_new_log(struct llog_handle *cathandle,
+                                        struct obd_uuid *tgtuuid);
+
+#endif
+
diff --git a/lustre/mds/commit_confd.c b/lustre/mds/commit_confd.c
new file mode 100644 (file)
index 0000000..557dc55
--- /dev/null
@@ -0,0 +1,76 @@
+
+
+void commit_add(struct )
+{
+        struct obd_import *import = commit_uuid2import(rec->  uuid);
+
+        if (!import) {
+                CERROR("unaware of OST UUID %s - dorpping\n", rec-> uuid);
+                EXIT;
+                return;
+        }
+
+        spin_lock(&import->llcconf_lock);
+        list_add(&rec->  &import);
+        spin_unlock(&import->llcconf_lock);
+        EXIT;
+        return;
+}
+
+void commit_confd_conf_import(struct obd_import *import,
+                              struct llog_commit_confirm_daemon *lccd)
+{
+        struct list_head *tmp, *save;
+
+
+        list_for_each_safe(&import->import_cc_list, tmp, save) {
+                struct llog_commit_data *cd;
+
+                if (atomic_read(import->import_cc_count) <=
+                    lccd->llcconf_lowwater)
+                        break;
+
+                cd = list_entry(tmp, struct llog_commit_data *, llcconf_entry);
+                atomic_dec(&import->import_cc_count);
+                commit_confd_add_and_fire(cd);
+        }
+        EXIT;
+        return;
+}
+
+
+int commit_confd_main(void *data)
+{
+        struct llog_commit_confirm_daemon *lccd = data;
+
+        while (1) {
+                /* something has happened */
+                event_wait();
+
+                if (lccd->flags & LCCD_STOP)
+                        break;
+
+
+                /* lock llccd imporlist */
+                spin_lock(&lccd->llcconf_lock);
+                list_for_each_safe(&lccd->llcconf_list,   ) {
+                        struct obd_import *import;
+                        import = list_entry(&lccd->llcconf_list,
+                                            struct obd_import,
+                                            import_entry);
+                        get_import(import);
+                        spin_unlock(&lccd->llcconf_lock);
+                        if (atomic_read(import->import_cc_count) >
+                            lccd->llcconf_highwater)
+                                commit_confd_conf_import(import);
+                        put_import(import);
+                        spin_lock(&lccd->llcconf_lock);
+
+                }
+                spin_unlock(&lccd->llcconf_lock);
+
+        }
+
+        lccd->flags = LCCD_STOPPED;
+        RETURN(0);
+}
diff --git a/lustre/obdclass/recov_log.c b/lustre/obdclass/recov_log.c
new file mode 100644 (file)
index 0000000..7c6a3f5
--- /dev/null
@@ -0,0 +1,470 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * OST<->MDS recovery logging infrastructure.
+ *
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ *   if an OST or MDS fails it need only look at log(s) relevant to itself
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+#define EXPORT_SYMTAB
+#endif
+
+#include <linux/fs.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_log.h>
+#include <portals/list.h>
+
+/* Allocate a new log or catalog handle */
+struct llog_handle *llog_alloc_handle(void)
+{
+        struct llog_handle *loghandle;
+        ENTRY;
+
+        OBD_ALLOC(loghandle, sizeof(*loghandle));
+        if (loghandle == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+
+        OBD_ALLOC(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+        if (loghandle->lgh_hdr == NULL) {
+                OBD_FREE(loghandle, sizeof(*loghandle));
+                RETURN(ERR_PTR(-ENOMEM));
+        }
+
+        INIT_LIST_HEAD(&loghandle->lgh_list);
+        sema_init(&loghandle->lgh_lock, 1);
+
+        RETURN(loghandle);
+}
+EXPORT_SYMBOL(llog_alloc_handle);
+
+void llog_free_handle(struct llog_handle *loghandle)
+{
+        if (!loghandle)
+                return;
+
+        list_del_init(&loghandle->lgh_list);
+        OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+        OBD_FREE(loghandle, sizeof(*loghandle));
+}
+EXPORT_SYMBOL(llog_free_handle);
+
+/* Create a new log handle and add it to the open list.
+ * This log handle will be closed when all of the records in it are removed.
+ *
+ * Assumes caller has already pushed us into the kernel context and is locking.
+ */
+struct llog_handle *llog_new_log(struct llog_handle *cathandle,
+                                 struct obd_uuid *tgtuuid)
+{
+        struct llog_handle *loghandle;
+        struct llog_object_hdr *llh;
+        loff_t offset;
+        int rc, index, bitmap_size, i;
+        ENTRY;
+
+        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
+
+        loghandle = cathandle->lgh_log_create(cathandle->lgh_obd);
+        if (IS_ERR(loghandle))
+                RETURN(loghandle);
+
+        llh = loghandle->lgh_hdr;
+        llh->llh_hdr.lth_type = LLOG_OBJECT_MAGIC;
+        llh->llh_hdr.lth_len = llh->llh_hdr_end_len = sizeof(*llh);
+        llh->llh_timestamp = CURRENT_TIME;
+        llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+        memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
+        loghandle->lgh_tgtuuid = &llh->llh_tgtuuid;
+
+        llh = cathandle->lgh_hdr;
+        bitmap_size = sizeof(llh->llh_bitmap) * 8;
+        /* This should basically always find the first entry free */
+        for (i = 0, index = llh->llh_count; i < bitmap_size; i++, index++) {
+                index %= bitmap_size;
+                if (ext2_set_bit(index, llh->llh_bitmap)) {
+                        /* XXX This should trigger log clean up or similar */
+                        CERROR("catalog index %d is still in use\n", index);
+                } else {
+                        llh->llh_count = (index + 1) % bitmap_size;
+                        break;
+                }
+        }
+        if (i == bitmap_size)
+                CERROR("no free catalog slots for log...\n");
+
+        CDEBUG(D_HA, "new recovery log "LPX64":%x catalog index %u\n",
+               loghandle->lgh_cookie.lgc_lgl.lgl_oid,
+               loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index);
+        loghandle->lgh_cookie.lgc_index = index;
+
+        offset = sizeof(*llh) + index * sizeof(loghandle->lgh_cookie);
+
+        /* XXX Hmm, what to do if the catalog update fails?  Under normal
+         *     operations we would clean this handle up anyways, and at
+         *     worst we leak some objects, but there is little point in
+         *     doing the logging in that case...
+         *
+         *     We don't want to mark a catalog in-use if it wasn't written.
+         *     The only danger is if the OST crashes - the log is lost.
+         */
+        rc = lustre_fwrite(cathandle->lgh_file, &loghandle->lgh_cookie,
+                           sizeof(loghandle->lgh_cookie), &offset);
+        if (rc != sizeof(loghandle->lgh_cookie)) {
+                CERROR("error adding log "LPX64" to catalog: rc %d\n",
+                       loghandle->lgh_cookie.lgc_lgl.lgl_oid, rc);
+                rc = rc < 0 ? : -ENOSPC;
+        } else {
+                offset = 0;
+                rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
+                                   &offset);
+                if (rc != sizeof(*llh)) {
+                        CERROR("error marking catalog entry %d in use: rc %d\n",
+                               index, rc);
+                        rc = rc < 0 ? : -ENOSPC;
+                }
+        }
+        cathandle->lgh_current = loghandle;
+        list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list);
+
+        RETURN(loghandle);
+}
+EXPORT_SYMBOL(llog_new_log);
+
+/* Assumes caller has already pushed us into the kernel context. */
+int llog_init_catalog(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
+{
+        struct llog_object_hdr *llh;
+        loff_t offset = 0;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
+
+        down(&cathandle->lgh_lock);
+        llh = cathandle->lgh_hdr;
+
+        if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
+write_hdr:      llh->llh_hdr.lth_type = LLOG_CATALOG_MAGIC;
+                llh->llh_hdr.lth_len = llh->llh_hdr_end_len = LLOG_CHUNK_SIZE;
+                llh->llh_timestamp = CURRENT_TIME;
+                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+                memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
+                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
+                                   &offset);
+                if (rc != LLOG_CHUNK_SIZE) {
+                        CERROR("error writing catalog header: rc %d\n", rc);
+                        OBD_FREE(llh, sizeof(*llh));
+                        if (rc >= 0)
+                                rc = -ENOSPC;
+                } else
+                        rc = 0;
+        } else {
+                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
+                                  &offset);
+                if (rc != LLOG_CHUNK_SIZE) {
+                        CERROR("error reading catalog header: rc %d\n", rc);
+                        /* Can we do much else if the header is bad? */
+                        goto write_hdr;
+                } else
+                        rc = 0;
+        }
+
+        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
+        up(&cathandle->lgh_lock);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_init_catalog);
+
+/* Return the currently active log handle.  If the current log handle doesn't
+ * have enough space left for the current record, start a new one.
+ *
+ * If reclen is 0, we only want to know what the currently active log is,
+ * otherwise we get a lock on this log so nobody can steal our space.
+ *
+ * Assumes caller has already pushed us into the kernel context and is locking.
+ */
+static struct llog_handle *llog_current_log(struct llog_handle *cathandle,
+                                            int reclen)
+{
+        struct llog_handle *loghandle = NULL;
+        ENTRY;
+
+        loghandle = cathandle->lgh_current;
+        if (loghandle) {
+                struct llog_object_hdr *llh = loghandle->lgh_hdr;
+                if (llh->llh_count < sizeof(llh->llh_bitmap) * 8)
+                        RETURN(loghandle);
+        }
+
+        if (reclen)
+                loghandle = llog_new_log(cathandle, cathandle->lgh_tgtuuid);
+        RETURN(loghandle);
+}
+
+/* Add a single record to the recovery log(s).
+ * Returns number of bytes in returned logcookies, or negative error code.
+ *
+ * Assumes caller has already pushed us into the kernel context.
+ */
+int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
+                    struct llog_cookie *logcookies)
+{
+        struct llog_handle *loghandle;
+        struct llog_object_hdr *llh;
+        int reclen = rec->lth_len;
+        struct file *file;
+        loff_t offset;
+        size_t left;
+        int index;
+        int rc;
+        ENTRY;
+
+        LASSERT(rec->lth_len <= LLOG_CHUNK_SIZE);
+        down(&cathandle->lgh_lock);
+        loghandle = llog_current_log(cathandle, reclen);
+        if (IS_ERR(loghandle)) {
+                up(&cathandle->lgh_lock);
+                RETURN(PTR_ERR(loghandle));
+        }
+        down(&loghandle->lgh_lock);
+        up(&cathandle->lgh_lock);
+
+        llh = loghandle->lgh_hdr;
+        file = loghandle->lgh_file;
+
+        /* Make sure that records don't cross a chunk boundary, so we can
+         * process them page-at-a-time if needed.  If it will cross a chunk
+         * boundary, write in a fake (but referenced) entry to pad the chunk.
+         *
+         * We know that llog_current_log() will return a loghandle that is
+         * big enough to hold reclen, so all we care about is padding here.
+         */
+        left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
+        if (left != 0 && left != reclen && left < reclen + LLOG_MIN_REC_SIZE) {
+                struct llog_null_trans {
+                        struct llog_trans_hdr hdr;
+                        __u32 padding[6];
+                } pad = { .hdr = { .lth_len = left } };
+
+                LASSERT(left >= LLOG_MIN_REC_SIZE);
+                if (left <= sizeof(pad))
+                        *(__u32 *)((char *)&pad + left - sizeof(__u32)) = left;
+
+                rc = lustre_fwrite(loghandle->lgh_file, &pad,
+                                   min(sizeof(pad), left),
+                                   &loghandle->lgh_file->f_pos);
+                if (rc != min(sizeof(pad), left)) {
+                        CERROR("error writing padding record: rc %d\n", rc);
+                        GOTO(out, rc = rc < 0 ? rc : -EIO);
+                }
+
+                left -= rc;
+                if (left) {
+                        LASSERT(left >= sizeof(__u32));
+                        loghandle->lgh_file->f_pos += left - sizeof(__u32);
+                        rc = lustre_fwrite(loghandle->lgh_file, &pad,
+                                           sizeof(__u32),
+                                           &loghandle->lgh_file->f_pos);
+                        if (rc != sizeof(__u32)) {
+                                CERROR("error writing padding end: rc %d\n",
+                                       rc);
+                                GOTO(out, rc < 0 ? rc : -ENOSPC);
+                        }
+                }
+
+                loghandle->lgh_index++;
+        }
+
+        index = loghandle->lgh_index++;
+        if (ext2_set_bit(index, llh->llh_bitmap)) {
+                CERROR("argh, index %u already set in log bitmap?\n", index);
+                LBUG(); /* should never happen */
+        }
+        llh->llh_count++;
+
+        offset = 0;
+        rc = lustre_fwrite(loghandle->lgh_file, llh, sizeof(*llh), &offset);
+        if (rc != sizeof(*llh)) {
+                CERROR("error writing log header: rc %d\n", rc);
+                GOTO(out, rc < 0 ? rc : -EIO);
+        }
+
+        rc = lustre_fwrite(loghandle->lgh_file, rec, reclen,
+                           &loghandle->lgh_file->f_pos);
+        if (rc != reclen) {
+                CERROR("error writing log record: rc %d\n", rc);
+                GOTO(out, rc < 0 ? rc : -ENOSPC);
+        }
+
+        CDEBUG(D_HA, "added record "LPX64":%x+%u, %u bytes\n",
+               loghandle->lgh_cookie.lgc_lgl.lgl_oid,
+               loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index, rec->lth_len);
+        *logcookies = loghandle->lgh_cookie;
+        logcookies->lgc_index = index;
+
+        rc = 0;
+out:
+        up(&loghandle->lgh_lock);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_add_record);
+
+/* Remove a log entry from the catalog.
+ * Assumes caller has already pushed us into the kernel context and is locking.
+ */
+int llog_delete_log(struct llog_handle *cathandle,struct llog_handle *loghandle)
+{
+        struct llog_cookie *lgc = &loghandle->lgh_cookie;
+        int catindex = lgc->lgc_index;
+        struct llog_object_hdr *llh = cathandle->lgh_hdr;
+        loff_t offset = 0;
+        int rc = 0;
+        ENTRY;
+
+        CDEBUG(D_HA, "log "LPX64":%x empty, closing\n",
+               lgc->lgc_lgl.lgl_oid, lgc->lgc_lgl.lgl_ogen);
+
+        if (ext2_clear_bit(catindex, llh->llh_bitmap)) {
+                CERROR("catalog index %u already clear?\n", catindex);
+                LBUG();
+        } else {
+                rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
+                                   &offset);
+
+                if (rc != sizeof(*llh)) {
+                        CERROR("log %u cancel error: rc %d\n", catindex, rc);
+                        if (rc >= 0)
+                                rc = -EIO;
+                } else
+                        rc = 0;
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_delete_log);
+
+/* Assumes caller has already pushed us into the kernel context and is locking.
+ * We return a lock on the handle to ensure nobody yanks it from us.
+ */
+static struct llog_handle *llog_id2handle(struct llog_handle *cathandle,
+                                          struct llog_cookie *logcookie)
+{
+        struct llog_handle *loghandle;
+        struct llog_logid *lgl = &logcookie->lgc_lgl;
+        ENTRY;
+
+        if (cathandle == NULL)
+                RETURN(ERR_PTR(-EBADF));
+
+        list_for_each_entry(loghandle, &cathandle->lgh_list, lgh_list) {
+                struct llog_logid *cgl = &loghandle->lgh_cookie.lgc_lgl;
+                if (cgl->lgl_oid == lgl->lgl_oid) {
+                        if (cgl->lgl_ogen != lgl->lgl_ogen) {
+                                CERROR("log "LPX64" generation %x != %x\n",
+                                       lgl->lgl_oid, cgl->lgl_ogen,
+                                       lgl->lgl_ogen);
+                                continue;
+                        }
+                        GOTO(out, loghandle);
+                }
+        }
+
+        loghandle = cathandle->lgh_log_open(cathandle->lgh_obd, logcookie);
+        if (IS_ERR(loghandle)) {
+                CERROR("error opening log id "LPX64":%x: rc %d\n",
+                       lgl->lgl_oid, lgl->lgl_ogen, (int)PTR_ERR(loghandle));
+        } else {
+                list_add(&loghandle->lgh_list, &cathandle->lgh_list);
+        }
+
+out:
+        RETURN(loghandle);
+}
+
+/* For each cookie in the cookie array, we clear the log in-use bit and either:
+ * - the log is empty, so mark it free in the catalog header and delete it
+ * - the log is not empty, just write out the log header
+ *
+ * The cookies may be in different log files, so we need to get new logs
+ * each time.
+ *
+ * Assumes caller has already pushed us into the kernel context.
+ */
+int llog_cancel_records(struct llog_handle *cathandle, int count,
+                        struct llog_cookie *cookies)
+{
+        int i, rc = 0;
+        ENTRY;
+
+        down(&cathandle->lgh_lock);
+        for (i = 0; i < count; i++, cookies++) {
+                struct llog_handle *loghandle;
+                struct llog_object_hdr *llh;
+                struct llog_logid *lgl = &cookies->lgc_lgl;
+
+                loghandle = llog_id2handle(cathandle, cookies);
+                if (IS_ERR(loghandle)) {
+                        if (!rc)
+                                rc = PTR_ERR(loghandle);
+                        continue;
+                }
+
+                down(&loghandle->lgh_lock);
+                llh = loghandle->lgh_hdr;
+                CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n",
+                       lgl->lgl_oid, cookies->lgc_index,
+                       ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
+                if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) {
+                        CERROR("log index %u in "LPX64":%x already clear?\n",
+                               cookies->lgc_index, lgl->lgl_oid, lgl->lgl_ogen);
+                } else if (--llh->llh_count == 0 &&
+                           loghandle != llog_current_log(cathandle, 0)) {
+                        loghandle->lgh_log_close(cathandle, loghandle);
+                } else {
+                        loff_t offset = 0;
+                        int ret = lustre_fwrite(loghandle->lgh_file, llh,
+                                                sizeof(*llh), &offset);
+
+                        if (ret != sizeof(*llh)) {
+                                CERROR("error cancelling index %u: rc %d\n",
+                                       cookies->lgc_index, ret);
+                                /* XXX mark handle bad? */
+                                if (!rc)
+                                        rc = ret;
+                        }
+                }
+                up(&loghandle->lgh_lock);
+        }
+        up(&cathandle->lgh_lock);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cancel_records);
+
+int llog_close_log(struct llog_handle *cathandle, struct llog_handle *loghandle)
+{
+        return loghandle->lgh_log_close(cathandle, loghandle);
+}
+EXPORT_SYMBOL(llog_close_log);
diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c
new file mode 100644 (file)
index 0000000..1048629
--- /dev/null
@@ -0,0 +1,329 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2003 Cluster File Systems, Inc.
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * OST<->MDS recovery logging thread.
+ *
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ *   if an OST or MDS fails it need only look at log(s) relevant to itself
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#include <linux/fs.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_commit_confd.h>
+#include <portals/list.h>
+
+static struct llog_commit_master lustre_lcm;
+static struct llog_commit_master *lcm = &lustre_lcm;
+
+/* Allocate new commit structs in case we do not have enough */
+static int llcd_alloc(void)
+{
+        struct llog_commit_data *llcd;
+
+        OBD_ALLOC(llcd, PAGE_SIZE);
+        if (llcd == NULL)
+                return -ENOMEM;
+
+        llcd->llcd_lcm = lcm;
+
+        spin_lock(&lcm->lcm_llcd_lock);
+        list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
+        atomic_inc(&lcm->lcm_llcd_numfree);
+        spin_unlock(&lcm->lcm_llcd_lock);
+
+        return 0;
+}
+
+/* Get a free cookie struct from the list */
+struct llog_commit_data *llcd_grab(void)
+{
+        struct llog_commit_data *llcd;
+
+        spin_lock(&lcm->lcm_llcd_lock);
+        if (list_empty(&lcm->lcm_llcd_free)) {
+                spin_unlock(&lcm->lcm_llcd_lock);
+                if (llcd_alloc() < 0) {
+                        CERROR("unable to allocate log commit data!\n");
+                        return NULL;
+                }
+                spin_lock(&lcm->lcm_llcd_lock);
+        }
+
+        llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
+        list_del(&llcd->llcd_list);
+        atomic_dec(&lcm->lcm_llcd_numfree);
+        spin_unlock(&lcm->lcm_llcd_lock);
+
+        llcd->llcd_tries = 0;
+        llcd->llcd_cookiebytes = 0;
+
+        return llcd;
+}
+EXPORT_SYMBOL(llcd_grab);
+
+static void llcd_put(struct llog_commit_data *llcd)
+{
+        if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
+                OBD_FREE(llcd, PAGE_SIZE);
+        } else {
+                spin_lock(&lcm->lcm_llcd_lock);
+                list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
+                atomic_inc(&lcm->lcm_llcd_numfree);
+                spin_unlock(&lcm->lcm_llcd_lock);
+        }
+}
+
+/* Send some cookies to the appropriate target */
+void llcd_send(struct llog_commit_data *llcd)
+{
+        spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
+        list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
+        spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
+
+        wake_up_nr(&llcd->llcd_lcm->lcm_waitq, 1);
+}
+EXPORT_SYMBOL(llcd_send);
+
+static int log_commit_thread(void *arg)
+{
+        struct llog_commit_master *lcm = arg;
+        struct llog_commit_daemon *lcd;
+        struct llog_commit_data *llcd, *n;
+        long flags;
+
+        OBD_ALLOC(lcd, sizeof(*lcd));
+        if (lcd == NULL)
+                RETURN(-ENOMEM);
+
+        INIT_LIST_HEAD(&lcd->lcd_lcm_list);
+        INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+        lcd->lcd_lcm = lcm;
+
+        lock_kernel();
+        daemonize(); /* thread never needs to do IO */
+
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+
+        spin_lock(&lcm->lcm_thread_lock);
+        THREAD_NAME(current->comm, "ll_log_commit_%d", lcm->lcm_thread_total++);
+        spin_unlock(&lcm->lcm_thread_lock);
+        unlock_kernel();
+
+        CDEBUG(D_HA, "%s started\n", current->comm);
+        do {
+                struct ptlrpc_request *request;
+                struct obd_import *import;
+                struct list_head *sending_list;
+                int rc = 0;
+
+                /* If we do not have enough pages available, allocate some */
+                while (atomic_read(&lcm->lcm_llcd_numfree) <
+                       lcm->lcm_llcd_minfree) {
+                        if (llcd_alloc() < 0)
+                                break;
+                }
+
+                spin_lock(&lcm->lcm_thread_lock);
+                atomic_inc(&lcm->lcm_thread_numidle);
+                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
+                spin_unlock(&lcm->lcm_thread_lock);
+
+                wait_event_interruptible(lcm->lcm_waitq,
+                                         !list_empty(&lcm->lcm_llcd_pending) ||
+                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);
+
+                /* If we are the last available thread, start a new one in case
+                 * we get blocked on an RPC (nobody else will start a new one).
+                 */
+                spin_lock(&lcm->lcm_thread_lock);
+                atomic_dec(&lcm->lcm_thread_numidle);
+                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
+                spin_unlock(&lcm->lcm_thread_lock);
+
+                sending_list = &lcm->lcm_llcd_pending;
+        resend:
+                if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
+                        lcm->lcm_llcd_maxfree = 0;
+                        lcm->lcm_llcd_minfree = 0;
+                        lcm->lcm_thread_max = 0;
+
+                        if (list_empty(&lcm->lcm_llcd_pending) ||
+                            lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
+                                break;
+                }
+
+                if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
+                    lcm->lcm_thread_total < lcm->lcm_thread_max) {
+                        rc = llog_start_commit_thread();
+                        if (rc < 0)
+                                CERROR("error starting thread: rc %d\n", rc);
+                }
+
+                /* Move all of the pending cancels from the same OST off of
+                 * the list, so we don't get multiple threads blocked and/or
+                 * doing upcalls on the same OST in case of failure.
+                 */
+                spin_lock(&lcm->lcm_llcd_lock);
+                if (!list_empty(sending_list)) {
+                        list_move_tail(sending_list->next,
+                                       &lcd->lcd_llcd_list);
+                        llcd = list_entry(lcd->lcd_llcd_list.next,
+                                          typeof(*llcd), llcd_list);
+                        LASSERT(llcd->llcd_lcm == lcm);
+                        import = llcd->llcd_import;
+                }
+                list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
+                        LASSERT(llcd->llcd_lcm == lcm);
+                        if (import == llcd->llcd_import)
+                                list_move_tail(&llcd->llcd_list,
+                                               &lcd->lcd_llcd_list);
+                }
+                if (sending_list != &lcm->lcm_llcd_resend) {
+                        list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
+                                                 llcd_list) {
+                                LASSERT(llcd->llcd_lcm == lcm);
+                                if (import == llcd->llcd_import)
+                                        list_move_tail(&llcd->llcd_list,
+                                                       &lcd->lcd_llcd_list);
+                        }
+                }
+                spin_unlock(&lcm->lcm_llcd_lock);
+
+                /* We are the only one manipulating our local list - no lock */
+                list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
+                        char *bufs[1] = {(char *)llcd->llcd_cookies};
+                        list_del(&llcd->llcd_list);
+
+                        request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
+                                                  &llcd->llcd_cookiebytes,
+                                                  bufs);
+                        if (request == NULL) {
+                                rc = -ENOMEM;
+                                CERROR("error preparing commit: rc %d\n", rc);
+
+                                spin_lock(&lcm->lcm_llcd_lock);
+                                list_splice(&lcd->lcd_llcd_list,
+                                            &lcm->lcm_llcd_resend);
+                                INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+                                spin_unlock(&lcm->lcm_llcd_lock);
+                                break;
+                        }
+
+                        request->rq_replen = lustre_msg_size(0, NULL);
+                        rc = ptlrpc_queue_wait(request);
+                        ptlrpc_req_finished(request);
+
+                        /* If the RPC failed, we put this and the remaining
+                         * messages onto the resend list for another time. */
+                        if (rc == 0) {
+                                llcd_put(llcd);
+                                continue;
+                        }
+
+                        spin_lock(&lcm->lcm_llcd_lock);
+                        list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
+                        if (++llcd->llcd_tries < 5) {
+                                CERROR("commit %p failed %dx: rc %d\n",
+                                       llcd, llcd->llcd_tries, rc);
+
+                                list_add_tail(&llcd->llcd_list,
+                                              &lcm->lcm_llcd_resend);
+                                spin_unlock(&lcm->lcm_llcd_lock);
+                        } else {
+                                spin_unlock(&lcm->lcm_llcd_lock);
+                                CERROR("commit %p dropped %d cookies: rc %d\n",
+                                       llcd, llcd->llcd_cookiebytes /
+                                       sizeof(*llcd->llcd_cookies), rc);
+                                llcd_put(llcd);
+                        }
+                        break;
+                }
+
+                if (rc == 0) {
+                        sending_list = &lcm->lcm_llcd_resend;
+                        if (!list_empty(sending_list))
+                                goto resend;
+                }
+        } while(1);
+
+        /* If we are force exiting, just drop all of the cookies. */
+        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
+                spin_lock(&lcm->lcm_llcd_lock);
+                list_splice(&lcm->lcm_llcd_pending,&lcd->lcd_llcd_list);
+                list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
+                list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
+                spin_unlock(&lcm->lcm_llcd_lock);
+
+                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
+                        llcd_put(llcd);
+        }
+
+        CDEBUG(D_HA, "%s exiting\n", current->comm);
+        OBD_FREE(lcd, sizeof(*lcd));
+        return 0;
+}
+
+int llog_start_commit_thread(void)
+{
+        int rc;
+        ENTRY;
+
+        rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
+        if (rc < 0) {
+                CERROR("error starting thread #%d: %d\n", lcm->lcm_thread_total,
+                       rc);
+                RETURN(rc);
+        }
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(llog_start_commit_thread);
+
+int llog_init_commit_master(void)
+{
+        INIT_LIST_HEAD(&lcm->lcm_thread_busy);
+        INIT_LIST_HEAD(&lcm->lcm_thread_idle);
+        spin_lock_init(&lcm->lcm_thread_lock);
+        atomic_set(&lcm->lcm_thread_numidle, 0);
+        init_waitqueue_head(&lcm->lcm_waitq);
+        INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
+        INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
+        INIT_LIST_HEAD(&lcm->lcm_llcd_free);
+        spin_lock_init(&lcm->lcm_llcd_lock);
+        atomic_set(&lcm->lcm_llcd_numfree, 0);
+        lcm->lcm_llcd_minfree = 0;
+        return 0;
+}
+
+int llog_cleanup_commit_master(void)
+{
+        return 0;
+}