--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001 Cluster File Systems, Inc. <info@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Structures relating to the log commit thread.
+ */
+
+#ifndef _LUSTRE_COMMIT_CONFD_H
+#define _LUSTRE_COMMIT_CONFD_H
+
+#include <linux/lustre_log.h>
+
+struct llog_commit_data {
+ struct list_head llcd_list; /* free or pending struct list */
+ struct obd_import *llcd_import;
+ struct llog_commit_master *llcd_lcm;
+ int llcd_tries; /* number of tries to send */
+ int llcd_cookiebytes;
+ struct llog_cookie llcd_cookies[0];
+};
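+
+/* Cookies live in the llcd_cookies[] tail of the single PAGE_SIZE
+ * allocation done in llcd_alloc() (recov_thread.c), so one llcd can
+ * carry at most (PAGE_SIZE - sizeof(struct llog_commit_data)) /
+ * sizeof(struct llog_cookie) cookies; llcd_cookiebytes tracks how much
+ * of that tail is currently in use. */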
+
+struct llog_commit_master {
+ struct list_head lcm_thread_busy; /* list of busy daemons */
+ struct list_head lcm_thread_idle; /* list of idle daemons */
+ spinlock_t lcm_thread_lock; /* protects thread_list */
+ atomic_t lcm_thread_numidle;/* number of idle threads */
+ int lcm_thread_total; /* total number of threads */
+ int lcm_thread_max; /* <= num_osts normally */
+
+ int lcm_flags;
+ wait_queue_head_t lcm_waitq;
+
+ struct list_head lcm_llcd_pending; /* llog_commit_data to send */
+ struct list_head lcm_llcd_resend; /* try to resend this data */
+ struct list_head lcm_llcd_free; /* free llog_commit_data */
+ spinlock_t lcm_llcd_lock; /* protects llcd_free */
+ atomic_t lcm_llcd_numfree; /* items on llcd_free */
+ int lcm_llcd_minfree; /* min free on llcd_free */
+ int lcm_llcd_maxfree; /* max free on llcd_free */
+};
+
+#define LLOG_LCM_FL_EXIT 0x01
+#define LLOG_LCM_FL_EXIT_FORCE 0x02
+
+/* the thread data that collects local commits and makes RPCs */
+struct llog_commit_daemon {
+ struct list_head lcd_lcm_list; /* list of daemon threads */
+ struct list_head lcd_llcd_list; /* list of pending RPCs */
+ struct llog_commit_master *lcd_lcm; /* pointer back to parent */
+};
+
+/* ptlrpc/recov_thread.c */
+int llog_start_commit_thread(void);
+struct llog_commit_data *llcd_grab(void);
+void llcd_send(struct llog_commit_data *llcd);
+
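+/* Typical producer flow, as a sketch (error handling elided; 'import'
+ * and the cookie buffer belong to the caller):
+ *
+ * struct llog_commit_data *llcd = llcd_grab();
+ * if (llcd != NULL) {
+ * llcd->llcd_import = import;
+ * memcpy(llcd->llcd_cookies, cookies, cookiebytes);
+ * llcd->llcd_cookiebytes = cookiebytes;
+ * llcd_send(llcd); // queue for a commit thread
+ * }
+ */
+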
+#endif /* _LUSTRE_COMMIT_CONFD_H */
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001 Cluster File Systems, Inc. <info@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Generic infrastructure for managing a collection of logs.
+ *
+ * These logs are used for:
+ *
+ * - orphan recovery: OST adds record on create
+ * - mtime/size consistency: the OST adds a record on first write
+ * - open/unlinked objects: OST adds a record on destroy
+ *
+ * - MDS unlink log: the MDS adds an entry upon delete
+ *
+ * - raid1 replication log between OSTs
+ * - MDS replication logs
+ */
+
+#ifndef _LUSTRE_LOG_H
+#define _LUSTRE_LOG_H
+
+#include <linux/lustre_idl.h>
+
+struct obd_trans_info;
+struct obd_device;
+struct lov_stripe_md;
+
+/* In-memory descriptor for a log object or log catalog */
+struct llog_handle {
+ struct list_head lgh_list;
+ struct llog_cookie lgh_cookie;
+ struct semaphore lgh_lock;
+ struct obd_device *lgh_obd;
+ void *lgh_hdr;
+ struct file *lgh_file;
+ struct obd_uuid *lgh_tgtuuid;
+ struct llog_handle *lgh_current;
+ struct llog_handle *(*lgh_log_create)(struct obd_device *obd);
+ struct llog_handle *(*lgh_log_open)(struct obd_device *obd,
+ struct llog_cookie *logcookie);
+ int (*lgh_log_close)(struct llog_handle *cathandle,
+ struct llog_handle *loghandle);
+ int lgh_index;
+};
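+
+/* A backend supplies the three method pointers before the handle is
+ * used; a sketch with hypothetical mds_log_* helpers (not functions
+ * defined in this patch):
+ *
+ * cathandle = llog_alloc_handle();
+ * cathandle->lgh_obd = obd;
+ * cathandle->lgh_log_create = mds_log_create;
+ * cathandle->lgh_log_open = mds_log_open;
+ * cathandle->lgh_log_close = mds_log_close;
+ * rc = llog_init_catalog(cathandle, &mds_uuid);
+ */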
+
+extern int llog_add_record(struct llog_handle *cathandle,
+ struct llog_trans_hdr *rec,
+ struct llog_cookie *logcookies);
+
+extern int llog_cancel_records(struct llog_handle *cathandle, int count,
+ struct llog_cookie *cookies);
+
+extern struct llog_handle *llog_alloc_handle(void);
+extern void llog_free_handle(struct llog_handle *handle);
+extern int llog_init_catalog(struct llog_handle *cathandle,
+ struct obd_uuid *tgtuuid);
+extern int llog_delete_log(struct llog_handle *cathandle,
+ struct llog_handle *loghandle);
+extern int llog_close_log(struct llog_handle *cathandle,
+ struct llog_handle *loghandle);
+extern struct llog_handle *llog_new_log(struct llog_handle *cathandle,
+ struct obd_uuid *tgtuuid);
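+
+/* Typical record lifecycle, as a sketch (locking, the kernel-context
+ * push and error handling elided):
+ *
+ * struct llog_cookie cookie;
+ * rc = llog_add_record(cathandle, rec, &cookie); // rec built by caller
+ * ... later, once the update commits on the peer ...
+ * rc = llog_cancel_records(cathandle, 1, &cookie);
+ */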
+
+#endif /* _LUSTRE_LOG_H */
+
--- /dev/null
+
+/* Queue a received commit record on its import's pending list (a
+ * sketch: the parameter type and the llcconf_entry/import_cc_list
+ * linkage are assumed from commit_confd_conf_import() below). */
+void commit_add(struct llog_commit_data *rec)
+{
+ struct obd_import *import = commit_uuid2import(rec->uuid);
+
+ if (!import) {
+ CERROR("unaware of OST UUID %s - dropping\n", rec->uuid);
+ EXIT;
+ return;
+ }
+
+ spin_lock(&import->llcconf_lock);
+ list_add(&rec->llcconf_entry, &import->import_cc_list);
+ atomic_inc(&import->import_cc_count);
+ spin_unlock(&import->llcconf_lock);
+ EXIT;
+ return;
+}
+
+void commit_confd_conf_import(struct obd_import *import,
+ struct llog_commit_confirm_daemon *lccd)
+{
+ struct list_head *tmp, *save;
+
+ /* drain records for this import down to the low watermark */
+ list_for_each_safe(tmp, save, &import->import_cc_list) {
+ struct llog_commit_data *cd;
+
+ if (atomic_read(&import->import_cc_count) <=
+ lccd->llcconf_lowwater)
+ break;
+
+ cd = list_entry(tmp, struct llog_commit_data, llcconf_entry);
+ list_del(&cd->llcconf_entry);
+ atomic_dec(&import->import_cc_count);
+ commit_confd_add_and_fire(cd);
+ }
+ EXIT;
+ return;
+}
+
+
+int commit_confd_main(void *data)
+{
+ struct llog_commit_confirm_daemon *lccd = data;
+ struct list_head *tmp, *save;
+
+ while (1) {
+ /* something has happened */
+ event_wait();
+
+ if (lccd->flags & LCCD_STOP)
+ break;
+
+
+ /* walk the lccd import list and flush any import that is
+ * above the high watermark */
+ spin_lock(&lccd->llcconf_lock);
+ list_for_each_safe(tmp, save, &lccd->llcconf_list) {
+ struct obd_import *import;
+ import = list_entry(tmp, struct obd_import,
+ import_entry);
+ get_import(import);
+ spin_unlock(&lccd->llcconf_lock);
+ if (atomic_read(&import->import_cc_count) >
+ lccd->llcconf_highwater)
+ commit_confd_conf_import(import, lccd);
+ put_import(import);
+ /* 'save' may be stale once the lock was dropped; worst
+ * case we stop early and handle the rest next wakeup */
+ spin_lock(&lccd->llcconf_lock);
+ }
+ spin_unlock(&lccd->llcconf_lock);
+
+ }
+
+ lccd->flags = LCCD_STOPPED;
+ RETURN(0);
+}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * OST<->MDS recovery logging infrastructure.
+ *
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ * if an OST or MDS fails it need only look at log(s) relevant to itself
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+#define EXPORT_SYMTAB
+#endif
+
+#include <linux/fs.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_log.h>
+#include <portals/list.h>
+
+/* Allocate a new log or catalog handle */
+struct llog_handle *llog_alloc_handle(void)
+{
+ struct llog_handle *loghandle;
+ ENTRY;
+
+ OBD_ALLOC(loghandle, sizeof(*loghandle));
+ if (loghandle == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ OBD_ALLOC(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+ if (loghandle->lgh_hdr == NULL) {
+ OBD_FREE(loghandle, sizeof(*loghandle));
+ RETURN(ERR_PTR(-ENOMEM));
+ }
+
+ INIT_LIST_HEAD(&loghandle->lgh_list);
+ sema_init(&loghandle->lgh_lock, 1);
+
+ RETURN(loghandle);
+}
+EXPORT_SYMBOL(llog_alloc_handle);
+
+void llog_free_handle(struct llog_handle *loghandle)
+{
+ if (!loghandle)
+ return;
+
+ list_del_init(&loghandle->lgh_list);
+ OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+ OBD_FREE(loghandle, sizeof(*loghandle));
+}
+EXPORT_SYMBOL(llog_free_handle);
+
+/* Create a new log handle and add it to the open list.
+ * This log handle will be closed when all of the records in it are removed.
+ *
+ * Assumes caller has already pushed us into the kernel context and is locking.
+ */
+struct llog_handle *llog_new_log(struct llog_handle *cathandle,
+ struct obd_uuid *tgtuuid)
+{
+ struct llog_handle *loghandle;
+ struct llog_object_hdr *llh;
+ loff_t offset;
+ int rc, index, bitmap_size, i;
+ ENTRY;
+
+ LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
+
+ loghandle = cathandle->lgh_log_create(cathandle->lgh_obd);
+ if (IS_ERR(loghandle))
+ RETURN(loghandle);
+
+ llh = loghandle->lgh_hdr;
+ llh->llh_hdr.lth_type = LLOG_OBJECT_MAGIC;
+ llh->llh_hdr.lth_len = llh->llh_hdr_end_len = sizeof(*llh);
+ llh->llh_timestamp = CURRENT_TIME;
+ llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+ memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
+ loghandle->lgh_tgtuuid = &llh->llh_tgtuuid;
+
+ llh = cathandle->lgh_hdr;
+ bitmap_size = sizeof(llh->llh_bitmap) * 8;
+ /* This should basically always find the first entry free */
+ for (i = 0, index = llh->llh_count; i < bitmap_size; i++, index++) {
+ index %= bitmap_size;
+ if (ext2_set_bit(index, llh->llh_bitmap)) {
+ /* XXX This should trigger log clean up or similar */
+ CERROR("catalog index %d is still in use\n", index);
+ } else {
+ llh->llh_count = (index + 1) % bitmap_size;
+ break;
+ }
+ }
+ if (i == bitmap_size)
+ CERROR("no free catalog slots for log...\n");
+
+ CDEBUG(D_HA, "new recovery log "LPX64":%x catalog index %u\n",
+ loghandle->lgh_cookie.lgc_lgl.lgl_oid,
+ loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index);
+ loghandle->lgh_cookie.lgc_index = index;
+
+ offset = sizeof(*llh) + index * sizeof(loghandle->lgh_cookie);
+
+ /* XXX Hmm, what to do if the catalog update fails? Under normal
+ * operations we would clean this handle up anyways, and at
+ * worst we leak some objects, but there is little point in
+ * doing the logging in that case...
+ *
+ * We don't want to mark a catalog in-use if it wasn't written.
+ * The only danger is if the OST crashes - the log is lost.
+ */
+ rc = lustre_fwrite(cathandle->lgh_file, &loghandle->lgh_cookie,
+ sizeof(loghandle->lgh_cookie), &offset);
+ if (rc != sizeof(loghandle->lgh_cookie)) {
+ CERROR("error adding log "LPX64" to catalog: rc %d\n",
+ loghandle->lgh_cookie.lgc_lgl.lgl_oid, rc);
+ rc = rc < 0 ? rc : -ENOSPC;
+ } else {
+ offset = 0;
+ rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
+ &offset);
+ if (rc != sizeof(*llh)) {
+ CERROR("error marking catalog entry %d in use: rc %d\n",
+ index, rc);
+ rc = rc < 0 ? rc : -ENOSPC;
+ }
+ }
+ cathandle->lgh_current = loghandle;
+ list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list);
+
+ RETURN(loghandle);
+}
+EXPORT_SYMBOL(llog_new_log);
+
+/* Assumes caller has already pushed us into the kernel context. */
+int llog_init_catalog(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
+{
+ struct llog_object_hdr *llh;
+ loff_t offset = 0;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
+
+ down(&cathandle->lgh_lock);
+ llh = cathandle->lgh_hdr;
+
+ if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
+write_hdr: llh->llh_hdr.lth_type = LLOG_CATALOG_MAGIC;
+ llh->llh_hdr.lth_len = llh->llh_hdr_end_len = LLOG_CHUNK_SIZE;
+ llh->llh_timestamp = CURRENT_TIME;
+ llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+ memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
+ rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
+ &offset);
+ if (rc != LLOG_CHUNK_SIZE) {
+ CERROR("error writing catalog header: rc %d\n", rc);
+ /* llh is still cathandle->lgh_hdr, so leave it for
+ * llog_free_handle() to release instead of freeing
+ * it here and leaving a dangling pointer behind */
+ if (rc >= 0)
+ rc = -ENOSPC;
+ } else
+ rc = 0;
+ } else {
+ rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
+ &offset);
+ if (rc != LLOG_CHUNK_SIZE) {
+ CERROR("error reading catalog header: rc %d\n", rc);
+ /* Can we do much else if the header is bad? */
+ offset = 0; /* rewind any partial read first */
+ goto write_hdr;
+ } else
+ rc = 0;
+ }
+
+ cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
+ up(&cathandle->lgh_lock);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_init_catalog);
+
+/* Return the currently active log handle. If the current log handle doesn't
+ * have enough space left for the current record, start a new one.
+ *
+ * If reclen is 0, we only want to know what the currently active log is,
+ * otherwise we get a lock on this log so nobody can steal our space.
+ *
+ * Assumes caller has already pushed us into the kernel context and is locking.
+ */
+static struct llog_handle *llog_current_log(struct llog_handle *cathandle,
+ int reclen)
+{
+ struct llog_handle *loghandle = NULL;
+ ENTRY;
+
+ loghandle = cathandle->lgh_current;
+ if (loghandle) {
+ struct llog_object_hdr *llh = loghandle->lgh_hdr;
+ if (llh->llh_count < sizeof(llh->llh_bitmap) * 8)
+ RETURN(loghandle);
+ }
+
+ if (reclen)
+ loghandle = llog_new_log(cathandle, cathandle->lgh_tgtuuid);
+ RETURN(loghandle);
+}
+
+/* Add a single record to the recovery log(s).
+ * Returns number of bytes in returned logcookies, or negative error code.
+ *
+ * Assumes caller has already pushed us into the kernel context.
+ */
+int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
+ struct llog_cookie *logcookies)
+{
+ struct llog_handle *loghandle;
+ struct llog_object_hdr *llh;
+ int reclen = rec->lth_len;
+ struct file *file;
+ loff_t offset;
+ size_t left;
+ int index;
+ int rc;
+ ENTRY;
+
+ LASSERT(rec->lth_len <= LLOG_CHUNK_SIZE);
+ down(&cathandle->lgh_lock);
+ loghandle = llog_current_log(cathandle, reclen);
+ if (IS_ERR(loghandle)) {
+ up(&cathandle->lgh_lock);
+ RETURN(PTR_ERR(loghandle));
+ }
+ down(&loghandle->lgh_lock);
+ up(&cathandle->lgh_lock);
+
+ llh = loghandle->lgh_hdr;
+ file = loghandle->lgh_file;
+
+ /* Make sure that records don't cross a chunk boundary, so we can
+ * process them page-at-a-time if needed. If it will cross a chunk
+ * boundary, write in a fake (but referenced) entry to pad the chunk.
+ *
+ * We know that llog_current_log() will return a loghandle that is
+ * big enough to hold reclen, so all we care about is padding here.
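+ *
+ * Worked example (a sketch, taking LLOG_CHUNK_SIZE as 8192 and
+ * LLOG_MIN_REC_SIZE as 32): with f_pos at 8000 and a 300-byte
+ * record, left = 192, which is neither 0 nor reclen and is less
+ * than 300 + 32, so a 192-byte padding record fills the chunk and
+ * the real record starts at the boundary, offset 8192.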
+ */
+ left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
+ if (left != 0 && left != reclen && left < reclen + LLOG_MIN_REC_SIZE) {
+ struct llog_null_trans {
+ struct llog_trans_hdr hdr;
+ __u32 padding[6];
+ } pad = { .hdr = { .lth_len = left } };
+
+ LASSERT(left >= LLOG_MIN_REC_SIZE);
+ if (left <= sizeof(pad))
+ *(__u32 *)((char *)&pad + left - sizeof(__u32)) = left;
+
+ rc = lustre_fwrite(loghandle->lgh_file, &pad,
+ min(sizeof(pad), left),
+ &loghandle->lgh_file->f_pos);
+ if (rc != min(sizeof(pad), left)) {
+ CERROR("error writing padding record: rc %d\n", rc);
+ GOTO(out, rc = rc < 0 ? rc : -EIO);
+ }
+
+ left -= rc;
+ if (left) {
+ LASSERT(left >= sizeof(__u32));
+ loghandle->lgh_file->f_pos += left - sizeof(__u32);
+ rc = lustre_fwrite(loghandle->lgh_file, &pad,
+ sizeof(__u32),
+ &loghandle->lgh_file->f_pos);
+ if (rc != sizeof(__u32)) {
+ CERROR("error writing padding end: rc %d\n",
+ rc);
+ GOTO(out, rc = rc < 0 ? rc : -ENOSPC);
+ }
+ }
+
+ loghandle->lgh_index++;
+ }
+
+ index = loghandle->lgh_index++;
+ if (ext2_set_bit(index, llh->llh_bitmap)) {
+ CERROR("argh, index %u already set in log bitmap?\n", index);
+ LBUG(); /* should never happen */
+ }
+ llh->llh_count++;
+
+ offset = 0;
+ rc = lustre_fwrite(loghandle->lgh_file, llh, sizeof(*llh), &offset);
+ if (rc != sizeof(*llh)) {
+ CERROR("error writing log header: rc %d\n", rc);
+ GOTO(out, rc = rc < 0 ? rc : -EIO);
+ }
+
+ rc = lustre_fwrite(loghandle->lgh_file, rec, reclen,
+ &loghandle->lgh_file->f_pos);
+ if (rc != reclen) {
+ CERROR("error writing log record: rc %d\n", rc);
+ GOTO(out, rc = rc < 0 ? rc : -ENOSPC);
+ }
+
+ CDEBUG(D_HA, "added record "LPX64":%x+%u, %u bytes\n",
+ loghandle->lgh_cookie.lgc_lgl.lgl_oid,
+ loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index, rec->lth_len);
+ *logcookies = loghandle->lgh_cookie;
+ logcookies->lgc_index = index;
+
+ rc = 0;
+out:
+ up(&loghandle->lgh_lock);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_add_record);
+
+/* Remove a log entry from the catalog.
+ * Assumes caller has already pushed us into the kernel context and is locking.
+ */
+int llog_delete_log(struct llog_handle *cathandle,
+ struct llog_handle *loghandle)
+{
+ struct llog_cookie *lgc = &loghandle->lgh_cookie;
+ int catindex = lgc->lgc_index;
+ struct llog_object_hdr *llh = cathandle->lgh_hdr;
+ loff_t offset = 0;
+ int rc = 0;
+ ENTRY;
+
+ CDEBUG(D_HA, "log "LPX64":%x empty, closing\n",
+ lgc->lgc_lgl.lgl_oid, lgc->lgc_lgl.lgl_ogen);
+
+ if (!ext2_clear_bit(catindex, llh->llh_bitmap)) {
+ CERROR("catalog index %u already clear?\n", catindex);
+ LBUG();
+ } else {
+ rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
+ &offset);
+
+ if (rc != sizeof(*llh)) {
+ CERROR("log %u cancel error: rc %d\n", catindex, rc);
+ if (rc >= 0)
+ rc = -EIO;
+ } else
+ rc = 0;
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_delete_log);
+
+/* Assumes caller has already pushed us into the kernel context and is locking.
+ * We return a lock on the handle to ensure nobody yanks it from us.
+ */
+static struct llog_handle *llog_id2handle(struct llog_handle *cathandle,
+ struct llog_cookie *logcookie)
+{
+ struct llog_handle *loghandle;
+ struct llog_logid *lgl = &logcookie->lgc_lgl;
+ ENTRY;
+
+ if (cathandle == NULL)
+ RETURN(ERR_PTR(-EBADF));
+
+ list_for_each_entry(loghandle, &cathandle->lgh_list, lgh_list) {
+ struct llog_logid *cgl = &loghandle->lgh_cookie.lgc_lgl;
+ if (cgl->lgl_oid == lgl->lgl_oid) {
+ if (cgl->lgl_ogen != lgl->lgl_ogen) {
+ CERROR("log "LPX64" generation %x != %x\n",
+ lgl->lgl_oid, cgl->lgl_ogen,
+ lgl->lgl_ogen);
+ continue;
+ }
+ GOTO(out, loghandle);
+ }
+ }
+
+ loghandle = cathandle->lgh_log_open(cathandle->lgh_obd, logcookie);
+ if (IS_ERR(loghandle)) {
+ CERROR("error opening log id "LPX64":%x: rc %d\n",
+ lgl->lgl_oid, lgl->lgl_ogen, (int)PTR_ERR(loghandle));
+ } else {
+ list_add(&loghandle->lgh_list, &cathandle->lgh_list);
+ }
+
+out:
+ RETURN(loghandle);
+}
+
+/* For each cookie in the cookie array, we clear the log in-use bit and either:
+ * - the log is empty, so mark it free in the catalog header and delete it
+ * - the log is not empty, just write out the log header
+ *
+ * The cookies may be in different log files, so we need to get new logs
+ * each time.
+ *
+ * Assumes caller has already pushed us into the kernel context.
+ */
+int llog_cancel_records(struct llog_handle *cathandle, int count,
+ struct llog_cookie *cookies)
+{
+ int i, rc = 0;
+ ENTRY;
+
+ down(&cathandle->lgh_lock);
+ for (i = 0; i < count; i++, cookies++) {
+ struct llog_handle *loghandle;
+ struct llog_object_hdr *llh;
+ struct llog_logid *lgl = &cookies->lgc_lgl;
+
+ loghandle = llog_id2handle(cathandle, cookies);
+ if (IS_ERR(loghandle)) {
+ if (!rc)
+ rc = PTR_ERR(loghandle);
+ continue;
+ }
+
+ down(&loghandle->lgh_lock);
+ llh = loghandle->lgh_hdr;
+ CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n",
+ lgl->lgl_oid, cookies->lgc_index,
+ ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
+ if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) {
+ CERROR("log index %u in "LPX64":%x already clear?\n",
+ cookies->lgc_index, lgl->lgl_oid, lgl->lgl_ogen);
+ } else if (--llh->llh_count == 0 &&
+ loghandle != llog_current_log(cathandle, 0)) {
+ loghandle->lgh_log_close(cathandle, loghandle);
+ } else {
+ loff_t offset = 0;
+ int ret = lustre_fwrite(loghandle->lgh_file, llh,
+ sizeof(*llh), &offset);
+
+ if (ret != sizeof(*llh)) {
+ CERROR("error cancelling index %u: rc %d\n",
+ cookies->lgc_index, ret);
+ /* XXX mark handle bad? */
+ if (!rc)
+ rc = ret;
+ }
+ }
+ up(&loghandle->lgh_lock);
+ }
+ up(&cathandle->lgh_lock);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cancel_records);
+
+int llog_close_log(struct llog_handle *cathandle, struct llog_handle *loghandle)
+{
+ return loghandle->lgh_log_close(cathandle, loghandle);
+}
+EXPORT_SYMBOL(llog_close_log);
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2003 Cluster File Systems, Inc.
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * OST<->MDS recovery logging thread.
+ *
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ * if an OST or MDS fails it need only look at log(s) relevant to itself
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#include <linux/fs.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_commit_confd.h>
+#include <portals/list.h>
+
+static struct llog_commit_master lustre_lcm;
+static struct llog_commit_master *lcm = &lustre_lcm;
+
+/* Allocate new commit structs in case we do not have enough */
+static int llcd_alloc(void)
+{
+ struct llog_commit_data *llcd;
+
+ OBD_ALLOC(llcd, PAGE_SIZE);
+ if (llcd == NULL)
+ return -ENOMEM;
+
+ llcd->llcd_lcm = lcm;
+
+ spin_lock(&lcm->lcm_llcd_lock);
+ list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
+ atomic_inc(&lcm->lcm_llcd_numfree);
+ spin_unlock(&lcm->lcm_llcd_lock);
+
+ return 0;
+}
+
+/* Get a free cookie struct from the list */
+struct llog_commit_data *llcd_grab(void)
+{
+ struct llog_commit_data *llcd;
+
+ spin_lock(&lcm->lcm_llcd_lock);
+ if (list_empty(&lcm->lcm_llcd_free)) {
+ spin_unlock(&lcm->lcm_llcd_lock);
+ if (llcd_alloc() < 0) {
+ CERROR("unable to allocate log commit data!\n");
+ return NULL;
+ }
+ spin_lock(&lcm->lcm_llcd_lock);
+ }
+
+ llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
+ list_del(&llcd->llcd_list);
+ atomic_dec(&lcm->lcm_llcd_numfree);
+ spin_unlock(&lcm->lcm_llcd_lock);
+
+ llcd->llcd_tries = 0;
+ llcd->llcd_cookiebytes = 0;
+
+ return llcd;
+}
+EXPORT_SYMBOL(llcd_grab);
+
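+/* Return an llcd to the free list, or free it outright once the cache
+ * already holds at least lcm_llcd_maxfree entries. */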
+static void llcd_put(struct llog_commit_data *llcd)
+{
+ if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
+ OBD_FREE(llcd, PAGE_SIZE);
+ } else {
+ spin_lock(&lcm->lcm_llcd_lock);
+ list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
+ atomic_inc(&lcm->lcm_llcd_numfree);
+ spin_unlock(&lcm->lcm_llcd_lock);
+ }
+}
+
+/* Send some cookies to the appropriate target */
+void llcd_send(struct llog_commit_data *llcd)
+{
+ spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
+ list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
+ spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
+
+ wake_up_nr(&llcd->llcd_lcm->lcm_waitq, 1);
+}
+EXPORT_SYMBOL(llcd_send);
+
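+/* Body of a commit daemon: wait for pending llcds, batch the ones bound
+ * for a single import, and send each batch to the target as an
+ * OBD_LOG_CANCEL RPC, requeueing failed batches on the resend list. */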
+static int log_commit_thread(void *arg)
+{
+ struct llog_commit_master *lcm = arg;
+ struct llog_commit_daemon *lcd;
+ struct llog_commit_data *llcd, *n;
+ long flags;
+
+ OBD_ALLOC(lcd, sizeof(*lcd));
+ if (lcd == NULL)
+ RETURN(-ENOMEM);
+
+ INIT_LIST_HEAD(&lcd->lcd_lcm_list);
+ INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+ lcd->lcd_lcm = lcm;
+
+ lock_kernel();
+ daemonize(); /* thread never needs to do IO */
+
+ SIGNAL_MASK_LOCK(current, flags);
+ sigfillset(&current->blocked);
+ RECALC_SIGPENDING;
+ SIGNAL_MASK_UNLOCK(current, flags);
+
+ spin_lock(&lcm->lcm_thread_lock);
+ THREAD_NAME(current->comm, "ll_log_commit_%d", lcm->lcm_thread_total++);
+ spin_unlock(&lcm->lcm_thread_lock);
+ unlock_kernel();
+
+ CDEBUG(D_HA, "%s started\n", current->comm);
+ do {
+ struct ptlrpc_request *request;
+ struct obd_import *import = NULL;
+ struct list_head *sending_list;
+ int rc = 0;
+
+ /* If we do not have enough pages available, allocate some */
+ while (atomic_read(&lcm->lcm_llcd_numfree) <
+ lcm->lcm_llcd_minfree) {
+ if (llcd_alloc() < 0)
+ break;
+ }
+
+ spin_lock(&lcm->lcm_thread_lock);
+ atomic_inc(&lcm->lcm_thread_numidle);
+ list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
+ spin_unlock(&lcm->lcm_thread_lock);
+
+ wait_event_interruptible(lcm->lcm_waitq,
+ !list_empty(&lcm->lcm_llcd_pending) ||
+ lcm->lcm_flags & LLOG_LCM_FL_EXIT);
+
+ /* If we are the last available thread, start a new one in case
+ * we get blocked on an RPC (nobody else will start a new one).
+ */
+ spin_lock(&lcm->lcm_thread_lock);
+ atomic_dec(&lcm->lcm_thread_numidle);
+ list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
+ spin_unlock(&lcm->lcm_thread_lock);
+
+ sending_list = &lcm->lcm_llcd_pending;
+ resend:
+ if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
+ lcm->lcm_llcd_maxfree = 0;
+ lcm->lcm_llcd_minfree = 0;
+ lcm->lcm_thread_max = 0;
+
+ if (list_empty(&lcm->lcm_llcd_pending) ||
+ lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
+ break;
+ }
+
+ if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
+ lcm->lcm_thread_total < lcm->lcm_thread_max) {
+ rc = llog_start_commit_thread();
+ if (rc < 0)
+ CERROR("error starting thread: rc %d\n", rc);
+ }
+
+ /* Move all of the pending cancels from the same OST off of
+ * the list, so we don't get multiple threads blocked and/or
+ * doing upcalls on the same OST in case of failure.
+ */
+ spin_lock(&lcm->lcm_llcd_lock);
+ if (!list_empty(sending_list)) {
+ list_move_tail(sending_list->next,
+ &lcd->lcd_llcd_list);
+ llcd = list_entry(lcd->lcd_llcd_list.next,
+ typeof(*llcd), llcd_list);
+ LASSERT(llcd->llcd_lcm == lcm);
+ import = llcd->llcd_import;
+ }
+ list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
+ LASSERT(llcd->llcd_lcm == lcm);
+ if (import == llcd->llcd_import)
+ list_move_tail(&llcd->llcd_list,
+ &lcd->lcd_llcd_list);
+ }
+ if (sending_list != &lcm->lcm_llcd_resend) {
+ list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
+ llcd_list) {
+ LASSERT(llcd->llcd_lcm == lcm);
+ if (import == llcd->llcd_import)
+ list_move_tail(&llcd->llcd_list,
+ &lcd->lcd_llcd_list);
+ }
+ }
+ spin_unlock(&lcm->lcm_llcd_lock);
+
+ /* We are the only one manipulating our local list - no lock */
+ list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list, llcd_list) {
+ char *bufs[1] = {(char *)llcd->llcd_cookies};
+ list_del(&llcd->llcd_list);
+
+ request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
+ &llcd->llcd_cookiebytes,
+ bufs);
+ if (request == NULL) {
+ rc = -ENOMEM;
+ CERROR("error preparing commit: rc %d\n", rc);
+
+ spin_lock(&lcm->lcm_llcd_lock);
+ list_splice(&lcd->lcd_llcd_list,
+ &lcm->lcm_llcd_resend);
+ INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+ spin_unlock(&lcm->lcm_llcd_lock);
+ break;
+ }
+
+ request->rq_replen = lustre_msg_size(0, NULL);
+ rc = ptlrpc_queue_wait(request);
+ ptlrpc_req_finished(request);
+
+ /* If the RPC failed, we put this and the remaining
+ * messages onto the resend list for another time. */
+ if (rc == 0) {
+ llcd_put(llcd);
+ continue;
+ }
+
+ spin_lock(&lcm->lcm_llcd_lock);
+ /* splice leaves the source list head stale, so reinit it */
+ list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
+ INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+ if (++llcd->llcd_tries < 5) {
+ CERROR("commit %p failed %dx: rc %d\n",
+ llcd, llcd->llcd_tries, rc);
+
+ list_add_tail(&llcd->llcd_list,
+ &lcm->lcm_llcd_resend);
+ spin_unlock(&lcm->lcm_llcd_lock);
+ } else {
+ spin_unlock(&lcm->lcm_llcd_lock);
+ CERROR("commit %p dropped %d cookies: rc %d\n",
+ llcd, llcd->llcd_cookiebytes /
+ sizeof(*llcd->llcd_cookies), rc);
+ llcd_put(llcd);
+ }
+ break;
+ }
+
+ if (rc == 0) {
+ sending_list = &lcm->lcm_llcd_resend;
+ if (!list_empty(sending_list))
+ goto resend;
+ }
+ } while(1);
+
+ /* If we are force exiting, just drop all of the cookies. */
+ if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
+ spin_lock(&lcm->lcm_llcd_lock);
+ list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
+ list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
+ list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
+ spin_unlock(&lcm->lcm_llcd_lock);
+
+ list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list, llcd_list)
+ llcd_put(llcd);
+ }
+
+ CDEBUG(D_HA, "%s exiting\n", current->comm);
+ OBD_FREE(lcd, sizeof(*lcd));
+ return 0;
+}
+
+int llog_start_commit_thread(void)
+{
+ int rc;
+ ENTRY;
+
+ rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
+ if (rc < 0) {
+ CERROR("error starting thread #%d: %d\n", lcm->lcm_thread_total,
+ rc);
+ RETURN(rc);
+ }
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(llog_start_commit_thread);
+
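+/* One-time setup of the global commit master: list heads, locks,
+ * counters and the daemon wait queue. */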
+int llog_init_commit_master(void)
+{
+ INIT_LIST_HEAD(&lcm->lcm_thread_busy);
+ INIT_LIST_HEAD(&lcm->lcm_thread_idle);
+ spin_lock_init(&lcm->lcm_thread_lock);
+ atomic_set(&lcm->lcm_thread_numidle, 0);
+ init_waitqueue_head(&lcm->lcm_waitq);
+ INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
+ INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
+ INIT_LIST_HEAD(&lcm->lcm_llcd_free);
+ spin_lock_init(&lcm->lcm_llcd_lock);
+ atomic_set(&lcm->lcm_llcd_numfree, 0);
+ lcm->lcm_llcd_minfree = 0;
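+ /* note: lcm_llcd_maxfree is left at zero here, so llcd_put()
+ * frees every llcd instead of caching it until the watermarks
+ * are raised elsewhere */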
+ return 0;
+}
+
+int llog_cleanup_commit_master(void)
+{
+ return 0;
+}