From 323523e904fc3e43fba1bc9f506555cae6795190 Mon Sep 17 00:00:00 2001
From: pschwan
Date: Fri, 4 Jul 2003 08:56:58 +0000
Subject: [PATCH] merge b_orphan into b_merge, with orphan code disabled

---
 lustre/include/linux/lustre_commit_confd.h |  73 +++++
 lustre/include/linux/lustre_log.h          |  81 +++++
 lustre/mds/commit_confd.c                  |  76 +++++
 lustre/obdclass/recov_log.c                | 470 ++++++++++++++++++++++++++++
 lustre/ptlrpc/recov_thread.c               | 329 +++++++++++++++++++++
 5 files changed, 1029 insertions(+)
 create mode 100644 lustre/include/linux/lustre_commit_confd.h
 create mode 100644 lustre/include/linux/lustre_log.h
 create mode 100644 lustre/mds/commit_confd.c
 create mode 100644 lustre/obdclass/recov_log.c
 create mode 100644 lustre/ptlrpc/recov_thread.c

diff --git a/lustre/include/linux/lustre_commit_confd.h b/lustre/include/linux/lustre_commit_confd.h
new file mode 100644
index 0000000..5a021a8
--- /dev/null
+++ b/lustre/include/linux/lustre_commit_confd.h
@@ -0,0 +1,73 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Structures relating to the log commit thread.
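+ *
+ * Overview (sketch): a server records llog cookies for each logged
+ * operation; once the local transaction commits, a commit daemon batches
+ * the cookies into OBD_LOG_CANCEL RPCs so the peer can cancel the
+ * matching log records.  The structures below carry the daemon's state.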
+ */
+
+#ifndef _LUSTRE_COMMIT_CONFD_H
+#define _LUSTRE_COMMIT_CONFD_H
+
+#include
+
+struct llog_commit_data {
+        struct list_head           llcd_list;   /* free or pending struct list */
+        struct obd_import         *llcd_import;
+        struct llog_commit_master *llcd_lcm;
+        int                        llcd_tries;  /* number of tries to send */
+        int                        llcd_cookiebytes;
+        struct llog_cookie         llcd_cookies[0];
+};
+
+struct llog_commit_master {
+        struct list_head  lcm_thread_busy;   /* list of busy daemons */
+        struct list_head  lcm_thread_idle;   /* list of idle daemons */
+        spinlock_t        lcm_thread_lock;   /* protects thread_list */
+        atomic_t          lcm_thread_numidle;/* number of idle threads */
+        int               lcm_thread_total;  /* total number of threads */
+        int               lcm_thread_max;    /* <= num_osts normally */
+
+        int               lcm_flags;
+        wait_queue_head_t lcm_waitq;
+
+        struct list_head  lcm_llcd_pending;  /* llog_commit_data to send */
+        struct list_head  lcm_llcd_resend;   /* try to resend this data */
+        struct list_head  lcm_llcd_free;     /* free llog_commit_data */
+        spinlock_t        lcm_llcd_lock;     /* protects llcd_free */
+        atomic_t          lcm_llcd_numfree;  /* items on llcd_free */
+        int               lcm_llcd_minfree;  /* min free on llcd_free */
+        int               lcm_llcd_maxfree;  /* max free on llcd_free */
+};
+
+#define LLOG_LCM_FL_EXIT        0x01
+#define LLOG_LCM_FL_EXIT_FORCE  0x02
+
+/* the thread data that collects local commits and makes rpc's */
+struct llog_commit_daemon {
+        struct list_head           lcd_lcm_list;  /* list of daemon threads */
+        struct list_head           lcd_llcd_list; /* list of pending RPCs */
+        struct llog_commit_master *lcd_lcm;       /* pointer back to parent */
+};
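+
+/* Typical use (a sketch, not normative): a server grabs a free llcd with
+ * llcd_grab(), copies the llog cookies it wants cancelled into
+ * llcd_cookies[], and hands it to llcd_send(), which queues it on
+ * lcm_llcd_pending and wakes a commit daemon to turn it into an
+ * OBD_LOG_CANCEL RPC. */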
+
+/* ptlrpc/recov_thread.c */
+int llog_start_commit_thread(void);
+struct llog_commit_data *llcd_grab(void);
+void llcd_send(struct llog_commit_data *llcd);
+
+#endif /* _LUSTRE_COMMIT_CONFD_H */
diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h
new file mode 100644
index 0000000..2f21583
--- /dev/null
+++ b/lustre/include/linux/lustre_log.h
@@ -0,0 +1,81 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Generic infrastructure for managing a collection of logs.
+ *
+ * These logs are used for:
+ *
+ * - orphan recovery: the OST adds a record on create
+ * - mtime/size consistency: the OST adds a record on first write
+ * - open/unlinked objects: the OST adds a record on destroy
+ *
+ * - mds unlink log: the MDS adds an entry upon delete
+ *
+ * - raid1 replication log between OSTs
+ * - MDS replication logs
+ */
+
+#ifndef _LUSTRE_LOG_H
+#define _LUSTRE_LOG_H
+
+#include
+
+struct obd_trans_info;
+struct obd_device;
+struct lov_stripe_md;
+
+/* In-memory descriptor for a log object or log catalog */
+struct llog_handle {
+        struct list_head        lgh_list;
+        struct llog_cookie      lgh_cookie;
+        struct semaphore        lgh_lock;
+        struct obd_device      *lgh_obd;
+        void                   *lgh_hdr;
+        struct file            *lgh_file;
+        struct obd_uuid        *lgh_tgtuuid;
+        struct llog_handle     *lgh_current;
+        struct llog_handle    *(*lgh_log_create)(struct obd_device *obd);
+        struct llog_handle    *(*lgh_log_open)(struct obd_device *obd,
+                                               struct llog_cookie *logcookie);
+        int                   (*lgh_log_close)(struct llog_handle *cathandle,
+                                               struct llog_handle *loghandle);
+        int                     lgh_index;
+};
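+
+/* A catalog handle is a log of logs: its records are the llog_cookies of
+ * per-target log objects, and the lgh_log_create/open/close methods let
+ * each backing store supply its own object I/O.  Intended call order
+ * (a sketch inferred from obdclass/recov_log.c, not a fixed contract):
+ *
+ *      llog_init_catalog(cathandle, tgtuuid);
+ *      llog_add_record(cathandle, rec, &cookie);    repeated per operation
+ *      llog_cancel_records(cathandle, 1, &cookie);  once commit confirmed
+ */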
+
+extern int llog_add_record(struct llog_handle *cathandle,
+                           struct llog_trans_hdr *rec,
+                           struct llog_cookie *logcookies);
+
+extern int llog_cancel_records(struct llog_handle *cathandle, int count,
+                               struct llog_cookie *cookies);
+
+extern struct llog_handle *llog_alloc_handle(void);
+extern void llog_free_handle(struct llog_handle *handle);
+extern int llog_init_catalog(struct llog_handle *cathandle,
+                             struct obd_uuid *tgtuuid);
+extern int llog_delete_log(struct llog_handle *cathandle,
+                           struct llog_handle *loghandle);
+extern int llog_close_log(struct llog_handle *cathandle,
+                          struct llog_handle *loghandle);
+extern struct llog_handle *llog_new_log(struct llog_handle *cathandle,
+                                        struct obd_uuid *tgtuuid);
+
+#endif
diff --git a/lustre/mds/commit_confd.c b/lustre/mds/commit_confd.c
new file mode 100644
index 0000000..557dc55
--- /dev/null
+++ b/lustre/mds/commit_confd.c
@@ -0,0 +1,76 @@
+/* Commit confirmation daemon (disabled in this merge).
+ * XXX: this file was checked in as a sketch; the stubbed parameter and
+ * member names below are assumed completions, not final interfaces. */
+
+void commit_add(struct llog_commit_data *rec)
+{
+        struct obd_import *import = commit_uuid2import(rec->uuid);
+
+        if (!import) {
+                CERROR("unaware of OST UUID %s - dropping\n", rec->uuid);
+                EXIT;
+                return;
+        }
+
+        spin_lock(&import->llcconf_lock);
+        list_add(&rec->llcconf_entry, &import->import_cc_list);
+        atomic_inc(&import->import_cc_count);
+        spin_unlock(&import->llcconf_lock);
+        EXIT;
+        return;
+}
+
+void commit_confd_conf_import(struct obd_import *import,
+                              struct llog_commit_confirm_daemon *lccd)
+{
+        struct list_head *tmp, *save;
+
+        list_for_each_safe(tmp, save, &import->import_cc_list) {
+                struct llog_commit_data *cd;
+
+                if (atomic_read(&import->import_cc_count) <=
+                    lccd->llcconf_lowwater)
+                        break;
+
+                cd = list_entry(tmp, struct llog_commit_data, llcconf_entry);
+                atomic_dec(&import->import_cc_count);
+                commit_confd_add_and_fire(cd);
+        }
+        EXIT;
+        return;
+}
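+
+/* Watermark scheme (sketch of the apparent intent): commit_add() pins
+ * confirmed-commit records on their import; once import_cc_count rises
+ * above llcconf_highwater, the daemon below drains the import back down
+ * to llcconf_lowwater.  The watermarks are tuning knobs, not fixed here. */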
+
+int commit_confd_main(void *data)
+{
+        struct llog_commit_confirm_daemon *lccd = data;
+        struct list_head *tmp, *save;
+
+        while (1) {
+                /* something has happened */
+                event_wait();
+
+                if (lccd->flags & LCCD_STOP)
+                        break;
+
+                /* lock the lccd import list */
+                spin_lock(&lccd->llcconf_lock);
+                list_for_each_safe(tmp, save, &lccd->llcconf_list) {
+                        struct obd_import *import;
+
+                        import = list_entry(tmp, struct obd_import,
+                                            import_entry);
+                        get_import(import);
+                        spin_unlock(&lccd->llcconf_lock);
+                        if (atomic_read(&import->import_cc_count) >
+                            lccd->llcconf_highwater)
+                                commit_confd_conf_import(import, lccd);
+                        put_import(import);
+                        spin_lock(&lccd->llcconf_lock);
+                }
+                spin_unlock(&lccd->llcconf_lock);
+        }
+
+        lccd->flags = LCCD_STOPPED;
+        RETURN(0);
+}
diff --git a/lustre/obdclass/recov_log.c b/lustre/obdclass/recov_log.c
new file mode 100644
index 0000000..7c6a3f5
--- /dev/null
+++ b/lustre/obdclass/recov_log.c
@@ -0,0 +1,470 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ * Author: Andreas Dilger
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * OST<->MDS recovery logging infrastructure.
+ *
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ *   if an OST or MDS fails it need only look at log(s) relevant to itself
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+#define EXPORT_SYMTAB
+#endif
+
+#include
+#include
+#include
+#include
+
+/* Allocate a new log or catalog handle */
+struct llog_handle *llog_alloc_handle(void)
+{
+        struct llog_handle *loghandle;
+        ENTRY;
+
+        OBD_ALLOC(loghandle, sizeof(*loghandle));
+        if (loghandle == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+
+        OBD_ALLOC(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+        if (loghandle->lgh_hdr == NULL) {
+                OBD_FREE(loghandle, sizeof(*loghandle));
+                RETURN(ERR_PTR(-ENOMEM));
+        }
+
+        INIT_LIST_HEAD(&loghandle->lgh_list);
+        sema_init(&loghandle->lgh_lock, 1);
+
+        RETURN(loghandle);
+}
+EXPORT_SYMBOL(llog_alloc_handle);
+
+void llog_free_handle(struct llog_handle *loghandle)
+{
+        if (!loghandle)
+                return;
+
+        list_del_init(&loghandle->lgh_list);
+        OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+        OBD_FREE(loghandle, sizeof(*loghandle));
+}
+EXPORT_SYMBOL(llog_free_handle);
+
+/* Create a new log handle and add it to the open list.
+ * This log handle will be closed when all of the records in it are removed.
+ *
+ * Assumes caller has already pushed us into the kernel context and is
+ * locking. */
+struct llog_handle *llog_new_log(struct llog_handle *cathandle,
+                                 struct obd_uuid *tgtuuid)
+{
+        struct llog_handle *loghandle;
+        struct llog_object_hdr *llh;
+        loff_t offset;
+        int rc, index, bitmap_size, i;
+        ENTRY;
+
+        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
+
+        loghandle = cathandle->lgh_log_create(cathandle->lgh_obd);
+        if (IS_ERR(loghandle))
+                RETURN(loghandle);
+
+        llh = loghandle->lgh_hdr;
+        llh->llh_hdr.lth_type = LLOG_OBJECT_MAGIC;
+        llh->llh_hdr.lth_len = llh->llh_hdr_end_len = sizeof(*llh);
+        llh->llh_timestamp = CURRENT_TIME;
+        llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+        memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
+        loghandle->lgh_tgtuuid = &llh->llh_tgtuuid;
+
+        llh = cathandle->lgh_hdr;
+        bitmap_size = sizeof(llh->llh_bitmap) * 8;
+        /* This should basically always find the first entry free */
+        for (i = 0, index = llh->llh_count; i < bitmap_size; i++, index++) {
+                index %= bitmap_size;
+                if (ext2_set_bit(index, llh->llh_bitmap)) {
+                        /* XXX This should trigger log clean up or similar */
+                        CERROR("catalog index %d is still in use\n", index);
+                } else {
+                        llh->llh_count = (index + 1) % bitmap_size;
+                        break;
+                }
+        }
+        if (i == bitmap_size)
+                CERROR("no free catalog slots for log...\n");
+
+        CDEBUG(D_HA, "new recovery log "LPX64":%x catalog index %u\n",
+               loghandle->lgh_cookie.lgc_lgl.lgl_oid,
+               loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index);
+        loghandle->lgh_cookie.lgc_index = index;
+
+        offset = sizeof(*llh) + index * sizeof(loghandle->lgh_cookie);
+
+        /* XXX Hmm, what to do if the catalog update fails?  Under normal
+         * operations we would clean this handle up anyways, and at
+         * worst we leak some objects, but there is little point in
+         * doing the logging in that case...
+         *
+         * We don't want to mark a catalog in-use if it wasn't written.
+         * The only danger is if the OST crashes - the log is lost.
+         */
+        rc = lustre_fwrite(cathandle->lgh_file, &loghandle->lgh_cookie,
+                           sizeof(loghandle->lgh_cookie), &offset);
+        if (rc != sizeof(loghandle->lgh_cookie)) {
+                CERROR("error adding log "LPX64" to catalog: rc %d\n",
+                       loghandle->lgh_cookie.lgc_lgl.lgl_oid, rc);
+                rc = rc < 0 ? rc : -ENOSPC;
+        } else {
+                offset = 0;
+                rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
+                                   &offset);
+                if (rc != sizeof(*llh)) {
+                        CERROR("error marking catalog entry %d in use: "
+                               "rc %d\n", index, rc);
+                        rc = rc < 0 ? rc : -ENOSPC;
+                }
+        }
+        cathandle->lgh_current = loghandle;
+        list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list);
+
+        RETURN(loghandle);
+}
+EXPORT_SYMBOL(llog_new_log);
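+
+/* Catalog layout implied above: one LLOG_CHUNK_SIZE header (the LASSERT
+ * pins sizeof(*llh) to it) holding the in-use bitmap, followed by
+ * fixed-size cookie slots, so slot i starts at byte
+ * sizeof(*llh) + i * sizeof(struct llog_cookie).  For illustration only:
+ * with an 8192-byte header and 64-byte cookies, slot 3 starts at 8384. */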
+
+/* Assumes caller has already pushed us into the kernel context. */
+int llog_init_catalog(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
+{
+        struct llog_object_hdr *llh;
+        loff_t offset = 0;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
+
+        down(&cathandle->lgh_lock);
+        llh = cathandle->lgh_hdr;
+
+        if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
+write_hdr:      llh->llh_hdr.lth_type = LLOG_CATALOG_MAGIC;
+                llh->llh_hdr.lth_len = llh->llh_hdr_end_len = LLOG_CHUNK_SIZE;
+                llh->llh_timestamp = CURRENT_TIME;
+                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+                memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
+                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
+                                   &offset);
+                if (rc != LLOG_CHUNK_SIZE) {
+                        CERROR("error writing catalog header: rc %d\n", rc);
+                        /* don't free llh here - the handle still owns it and
+                         * llog_free_handle() frees it; freeing it now would
+                         * leave a dangling lgh_hdr behind us */
+                        if (rc >= 0)
+                                rc = -ENOSPC;
+                } else
+                        rc = 0;
+        } else {
+                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
+                                  &offset);
+                if (rc != LLOG_CHUNK_SIZE) {
+                        CERROR("error reading catalog header: rc %d\n", rc);
+                        /* Can we do much else if the header is bad? */
+                        goto write_hdr;
+                } else
+                        rc = 0;
+        }
+
+        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
+        up(&cathandle->lgh_lock);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_init_catalog);
+
+/* Return the currently active log handle.  If the current log handle
+ * doesn't have enough space left for the current record, start a new one.
+ *
+ * If reclen is 0, we only want to know what the currently active log is,
+ * otherwise we get a lock on this log so nobody can steal our space.
+ *
+ * Assumes caller has already pushed us into the kernel context and is
+ * locking. */
+static struct llog_handle *llog_current_log(struct llog_handle *cathandle,
+                                            int reclen)
+{
+        struct llog_handle *loghandle = NULL;
+        ENTRY;
+
+        loghandle = cathandle->lgh_current;
+        if (loghandle) {
+                struct llog_object_hdr *llh = loghandle->lgh_hdr;
+
+                if (llh->llh_count < sizeof(llh->llh_bitmap) * 8)
+                        RETURN(loghandle);
+        }
+
+        if (reclen)
+                loghandle = llog_new_log(cathandle, cathandle->lgh_tgtuuid);
+        RETURN(loghandle);
+}
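+
+/* Records never straddle an LLOG_CHUNK_SIZE boundary, so a reader can
+ * process a log one chunk (page) at a time.  Illustrative numbers for the
+ * padding rule in llog_add_record() below: if f_pos sits 48 bytes short of
+ * a boundary and the next record needs 64, a 48-byte pad record is written
+ * first (lth_len == 48, repeated in its trailing __u32) so the real record
+ * starts on the next chunk. */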
+
+/* Add a single record to the recovery log(s).
+ * Returns number of bytes in returned logcookies, or negative error code.
+ *
+ * Assumes caller has already pushed us into the kernel context. */
+int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
+                    struct llog_cookie *logcookies)
+{
+        struct llog_handle *loghandle;
+        struct llog_object_hdr *llh;
+        int reclen = rec->lth_len;
+        struct file *file;
+        loff_t offset;
+        size_t left;
+        int index;
+        int rc;
+        ENTRY;
+
+        LASSERT(rec->lth_len <= LLOG_CHUNK_SIZE);
+        down(&cathandle->lgh_lock);
+        loghandle = llog_current_log(cathandle, reclen);
+        if (IS_ERR(loghandle)) {
+                up(&cathandle->lgh_lock);
+                RETURN(PTR_ERR(loghandle));
+        }
+        down(&loghandle->lgh_lock);
+        up(&cathandle->lgh_lock);
+
+        llh = loghandle->lgh_hdr;
+        file = loghandle->lgh_file;
+
+        /* Make sure that records don't cross a chunk boundary, so we can
+         * process them page-at-a-time if needed.  If it will cross a chunk
+         * boundary, write in a fake (but referenced) entry to pad the chunk.
+         *
+         * We know that llog_current_log() will return a loghandle that is
+         * big enough to hold reclen, so all we care about is padding here.
+         */
+        left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
+        if (left != 0 && left != reclen && left < reclen + LLOG_MIN_REC_SIZE) {
+                struct llog_null_trans {
+                        struct llog_trans_hdr hdr;
+                        __u32 padding[6];
+                } pad = { .hdr = { .lth_len = left } };
+
+                LASSERT(left >= LLOG_MIN_REC_SIZE);
+                if (left <= sizeof(pad))
+                        *(__u32 *)((char *)&pad + left - sizeof(__u32)) = left;
+
+                rc = lustre_fwrite(loghandle->lgh_file, &pad,
+                                   min(sizeof(pad), left),
+                                   &loghandle->lgh_file->f_pos);
+                if (rc != min(sizeof(pad), left)) {
+                        CERROR("error writing padding record: rc %d\n", rc);
+                        GOTO(out, rc = rc < 0 ? rc : -EIO);
+                }
+
+                left -= rc;
+                if (left) {
+                        LASSERT(left >= sizeof(__u32));
+                        loghandle->lgh_file->f_pos += left - sizeof(__u32);
+                        rc = lustre_fwrite(loghandle->lgh_file, &pad,
+                                           sizeof(__u32),
+                                           &loghandle->lgh_file->f_pos);
+                        if (rc != sizeof(__u32)) {
+                                CERROR("error writing padding end: rc %d\n",
+                                       rc);
+                                GOTO(out, rc = rc < 0 ? rc : -ENOSPC);
+                        }
+                }
+
+                loghandle->lgh_index++;
+        }
+
+        index = loghandle->lgh_index++;
+        if (ext2_set_bit(index, llh->llh_bitmap)) {
+                CERROR("argh, index %u already set in log bitmap?\n", index);
+                LBUG(); /* should never happen */
+        }
+        llh->llh_count++;
+
+        offset = 0;
+        rc = lustre_fwrite(loghandle->lgh_file, llh, sizeof(*llh), &offset);
+        if (rc != sizeof(*llh)) {
+                CERROR("error writing log header: rc %d\n", rc);
+                GOTO(out, rc = rc < 0 ? rc : -EIO);
+        }
+
+        rc = lustre_fwrite(loghandle->lgh_file, rec, reclen,
+                           &loghandle->lgh_file->f_pos);
+        if (rc != reclen) {
+                CERROR("error writing log record: rc %d\n", rc);
+                GOTO(out, rc = rc < 0 ? rc : -ENOSPC);
+        }
+
+        CDEBUG(D_HA, "added record "LPX64":%x+%u, %u bytes\n",
+               loghandle->lgh_cookie.lgc_lgl.lgl_oid,
+               loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index, rec->lth_len);
+        *logcookies = loghandle->lgh_cookie;
+        logcookies->lgc_index = index;
+
+        rc = 0;
+out:
+        up(&loghandle->lgh_lock);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_add_record);
+
+/* Remove a log entry from the catalog.
+ * Assumes caller has already pushed us into the kernel context and is
+ * locking. */
+int llog_delete_log(struct llog_handle *cathandle,
+                    struct llog_handle *loghandle)
+{
+        struct llog_cookie *lgc = &loghandle->lgh_cookie;
+        int catindex = lgc->lgc_index;
+        struct llog_object_hdr *llh = cathandle->lgh_hdr;
+        loff_t offset = 0;
+        int rc = 0;
+        ENTRY;
+
+        CDEBUG(D_HA, "log "LPX64":%x empty, closing\n",
+               lgc->lgc_lgl.lgl_oid, lgc->lgc_lgl.lgl_ogen);
+
+        /* ext2_clear_bit() returns the old value, so "already clear" is the
+         * zero case */
+        if (!ext2_clear_bit(catindex, llh->llh_bitmap)) {
+                CERROR("catalog index %u already clear?\n", catindex);
+                LBUG();
+        } else {
+                rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
+                                   &offset);
+
+                if (rc != sizeof(*llh)) {
+                        CERROR("log %u cancel error: rc %d\n", catindex, rc);
+                        if (rc >= 0)
+                                rc = -EIO;
+                } else
+                        rc = 0;
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_delete_log);
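+
+/* Ordering note: llog_add_record() writes the updated header (bitmap and
+ * count) before the record itself, so a crash in between can at worst
+ * leave an allocated index with no record behind it - apparently a
+ * deliberate trade against ever referencing an unwritten record. */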
+
+/* Assumes caller has already pushed us into the kernel context and is
+ * locking.  We return a lock on the handle to ensure nobody yanks it
+ * from us. */
+static struct llog_handle *llog_id2handle(struct llog_handle *cathandle,
+                                          struct llog_cookie *logcookie)
+{
+        struct llog_handle *loghandle;
+        struct llog_logid *lgl = &logcookie->lgc_lgl;
+        ENTRY;
+
+        if (cathandle == NULL)
+                RETURN(ERR_PTR(-EBADF));
+
+        list_for_each_entry(loghandle, &cathandle->lgh_list, lgh_list) {
+                struct llog_logid *cgl = &loghandle->lgh_cookie.lgc_lgl;
+
+                if (cgl->lgl_oid == lgl->lgl_oid) {
+                        if (cgl->lgl_ogen != lgl->lgl_ogen) {
+                                CERROR("log "LPX64" generation %x != %x\n",
+                                       lgl->lgl_oid, cgl->lgl_ogen,
+                                       lgl->lgl_ogen);
+                                continue;
+                        }
+                        GOTO(out, loghandle);
+                }
+        }
+
+        loghandle = cathandle->lgh_log_open(cathandle->lgh_obd, logcookie);
+        if (IS_ERR(loghandle)) {
+                CERROR("error opening log id "LPX64":%x: rc %d\n",
+                       lgl->lgl_oid, lgl->lgl_ogen, (int)PTR_ERR(loghandle));
+        } else {
+                list_add(&loghandle->lgh_list, &cathandle->lgh_list);
+        }
+
+out:
+        RETURN(loghandle);
+}
+
+/* For each cookie in the cookie array, we clear the log in-use bit and
+ * either:
+ * - the log is empty, so mark it free in the catalog header and delete it
+ * - the log is not empty, just write out the log header
+ *
+ * The cookies may be in different log files, so we need to get new logs
+ * each time.
+ *
+ * Assumes caller has already pushed us into the kernel context. */
+int llog_cancel_records(struct llog_handle *cathandle, int count,
+                        struct llog_cookie *cookies)
+{
+        int i, rc = 0;
+        ENTRY;
+
+        down(&cathandle->lgh_lock);
+        for (i = 0; i < count; i++, cookies++) {
+                struct llog_handle *loghandle;
+                struct llog_object_hdr *llh;
+                struct llog_logid *lgl = &cookies->lgc_lgl;
+
+                loghandle = llog_id2handle(cathandle, cookies);
+                if (IS_ERR(loghandle)) {
+                        if (!rc)
+                                rc = PTR_ERR(loghandle);
+                        continue;
+                }
+
+                down(&loghandle->lgh_lock);
+                llh = loghandle->lgh_hdr;
+                CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n",
+                       lgl->lgl_oid, cookies->lgc_index,
+                       ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
+                if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) {
+                        CERROR("log index %u in "LPX64":%x already clear?\n",
+                               cookies->lgc_index, lgl->lgl_oid,
+                               lgl->lgl_ogen);
+                } else if (--llh->llh_count == 0 &&
+                           loghandle != llog_current_log(cathandle, 0)) {
+                        loghandle->lgh_log_close(cathandle, loghandle);
+                } else {
+                        loff_t offset = 0;
+                        int ret = lustre_fwrite(loghandle->lgh_file, llh,
+                                                sizeof(*llh), &offset);
+
+                        if (ret != sizeof(*llh)) {
+                                CERROR("error cancelling index %u: rc %d\n",
+                                       cookies->lgc_index, ret);
+                                /* XXX mark handle bad? */
+                                if (!rc)
+                                        rc = ret;
+                        }
+                }
+                up(&loghandle->lgh_lock);
+        }
+        up(&cathandle->lgh_lock);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cancel_records);
+
+int llog_close_log(struct llog_handle *cathandle,
+                   struct llog_handle *loghandle)
+{
+        return loghandle->lgh_log_close(cathandle, loghandle);
+}
+EXPORT_SYMBOL(llog_close_log);
diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c
new file mode 100644
index 0000000..1048629
--- /dev/null
+++ b/lustre/ptlrpc/recov_thread.c
@@ -0,0 +1,329 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2003 Cluster File Systems, Inc.
+ * Author: Andreas Dilger
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * OST<->MDS recovery logging thread.
+ *
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ *   if an OST or MDS fails it need only look at log(s) relevant to itself
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#include
+#include
+#include
+#include
+
+static struct llog_commit_master lustre_lcm;
+static struct llog_commit_master *lcm = &lustre_lcm;
+
+/* Allocate new commit structs in case we do not have enough */
+static int llcd_alloc(void)
+{
+        struct llog_commit_data *llcd;
+
+        OBD_ALLOC(llcd, PAGE_SIZE);
+        if (llcd == NULL)
+                return -ENOMEM;
+
+        llcd->llcd_lcm = lcm;
+
+        spin_lock(&lcm->lcm_llcd_lock);
+        list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
+        atomic_inc(&lcm->lcm_llcd_numfree);
+        spin_unlock(&lcm->lcm_llcd_lock);
+
+        return 0;
+}
+
+/* Get a free cookie struct from the list */
+struct llog_commit_data *llcd_grab(void)
+{
+        struct llog_commit_data *llcd;
+
+        spin_lock(&lcm->lcm_llcd_lock);
+        if (list_empty(&lcm->lcm_llcd_free)) {
+                spin_unlock(&lcm->lcm_llcd_lock);
+                if (llcd_alloc() < 0) {
+                        CERROR("unable to allocate log commit data!\n");
+                        return NULL;
+                }
+                spin_lock(&lcm->lcm_llcd_lock);
+        }
+
+        llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
+        list_del(&llcd->llcd_list);
+        atomic_dec(&lcm->lcm_llcd_numfree);
+        spin_unlock(&lcm->lcm_llcd_lock);
+
+        llcd->llcd_tries = 0;
+        llcd->llcd_cookiebytes = 0;
+
+        return llcd;
+}
+EXPORT_SYMBOL(llcd_grab);
+
+static void llcd_put(struct llog_commit_data *llcd)
+{
+        if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
+                OBD_FREE(llcd, PAGE_SIZE);
+        } else {
+                spin_lock(&lcm->lcm_llcd_lock);
+                list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
+                atomic_inc(&lcm->lcm_llcd_numfree);
+                spin_unlock(&lcm->lcm_llcd_lock);
+        }
+}
+
+/* Send some cookies to the appropriate target */
+void llcd_send(struct llog_commit_data *llcd)
+{
+        spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
+        list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
+        spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
+
+        wake_up_nr(&llcd->llcd_lcm->lcm_waitq, 1);
+}
+EXPORT_SYMBOL(llcd_send);
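+
+/* llcd_grab()/llcd_put() recycle llog_commit_data buffers through
+ * lcm_llcd_free: grab allocates on demand when the list runs dry, put
+ * frees outright once lcm_llcd_maxfree is reached.  Each llcd is one
+ * page, so the pool bounds the memory pinned by in-flight cookies. */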
+
+static int log_commit_thread(void *arg)
+{
+        struct llog_commit_master *lcm = arg;
+        struct llog_commit_daemon *lcd;
+        struct llog_commit_data *llcd, *n;
+        long flags;
+
+        OBD_ALLOC(lcd, sizeof(*lcd));
+        if (lcd == NULL)
+                RETURN(-ENOMEM);
+
+        INIT_LIST_HEAD(&lcd->lcd_lcm_list);
+        INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+        lcd->lcd_lcm = lcm;
+
+        lock_kernel();
+        daemonize(); /* thread never needs to do IO */
+
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+
+        spin_lock(&lcm->lcm_thread_lock);
+        THREAD_NAME(current->comm, "ll_log_commit_%d",
+                    lcm->lcm_thread_total++);
+        spin_unlock(&lcm->lcm_thread_lock);
+        unlock_kernel();
+
+        CDEBUG(D_HA, "%s started\n", current->comm);
+        do {
+                struct ptlrpc_request *request;
+                struct obd_import *import = NULL;
+                struct list_head *sending_list;
+                int rc = 0;
+
+                /* If we do not have enough pages available, allocate some */
+                while (atomic_read(&lcm->lcm_llcd_numfree) <
+                       lcm->lcm_llcd_minfree) {
+                        if (llcd_alloc() < 0)
+                                break;
+                }
+
+                spin_lock(&lcm->lcm_thread_lock);
+                atomic_inc(&lcm->lcm_thread_numidle);
+                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
+                spin_unlock(&lcm->lcm_thread_lock);
+
+                wait_event_interruptible(lcm->lcm_waitq,
+                                         !list_empty(&lcm->lcm_llcd_pending) ||
+                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);
+
+                /* If we are the last available thread, start a new one in
+                 * case we get blocked on an RPC (nobody else will start a
+                 * new one). */
+                spin_lock(&lcm->lcm_thread_lock);
+                atomic_dec(&lcm->lcm_thread_numidle);
+                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
+                spin_unlock(&lcm->lcm_thread_lock);
+
+                sending_list = &lcm->lcm_llcd_pending;
+        resend:
+                if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
+                        lcm->lcm_llcd_maxfree = 0;
+                        lcm->lcm_llcd_minfree = 0;
+                        lcm->lcm_thread_max = 0;
+
+                        if (list_empty(&lcm->lcm_llcd_pending) ||
+                            lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
+                                break;
+                }
+
+                if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
+                    lcm->lcm_thread_total < lcm->lcm_thread_max) {
+                        rc = llog_start_commit_thread();
+                        if (rc < 0)
+                                CERROR("error starting thread: rc %d\n", rc);
+                }
+
+                /* Move all of the pending cancels from the same OST off of
+                 * the list, so we don't get multiple threads blocked and/or
+                 * doing upcalls on the same OST in case of failure. */
+                spin_lock(&lcm->lcm_llcd_lock);
+                if (!list_empty(sending_list)) {
+                        list_move_tail(sending_list->next,
+                                       &lcd->lcd_llcd_list);
+                        llcd = list_entry(lcd->lcd_llcd_list.next,
+                                          typeof(*llcd), llcd_list);
+                        LASSERT(llcd->llcd_lcm == lcm);
+                        import = llcd->llcd_import;
+                }
+                list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
+                        LASSERT(llcd->llcd_lcm == lcm);
+                        if (import == llcd->llcd_import)
+                                list_move_tail(&llcd->llcd_list,
+                                               &lcd->lcd_llcd_list);
+                }
+                if (sending_list != &lcm->lcm_llcd_resend) {
+                        list_for_each_entry_safe(llcd, n,
+                                                 &lcm->lcm_llcd_resend,
+                                                 llcd_list) {
+                                LASSERT(llcd->llcd_lcm == lcm);
+                                if (import == llcd->llcd_import)
+                                        list_move_tail(&llcd->llcd_list,
+                                                       &lcd->lcd_llcd_list);
+                        }
+                }
+                spin_unlock(&lcm->lcm_llcd_lock);
+
+                /* We are the only one manipulating our local list - no
+                 * lock needed */
+                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,
+                                         llcd_list) {
+                        char *bufs[1] = { (char *)llcd->llcd_cookies };
+
+                        list_del(&llcd->llcd_list);
+
+                        request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
+                                                  &llcd->llcd_cookiebytes,
+                                                  bufs);
+                        if (request == NULL) {
+                                rc = -ENOMEM;
+                                CERROR("error preparing commit: rc %d\n", rc);
+
+                                spin_lock(&lcm->lcm_llcd_lock);
+                                list_splice(&lcd->lcd_llcd_list,
+                                            &lcm->lcm_llcd_resend);
+                                INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+                                spin_unlock(&lcm->lcm_llcd_lock);
+                                break;
+                        }
+
+                        request->rq_replen = lustre_msg_size(0, NULL);
+                        rc = ptlrpc_queue_wait(request);
+                        ptlrpc_req_finished(request);
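+
+                        /* Retry policy as coded below: up to 5 sends per
+                         * llcd; after that the cookies are dropped and the
+                         * corresponding log records are left for later log
+                         * processing to clean up.  The limit of 5 is an
+                         * arbitrary guess. */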
+                        /* If the RPC failed, we put this and the remaining
+                         * messages onto the resend list for another time. */
+                        if (rc == 0) {
+                                llcd_put(llcd);
+                                continue;
+                        }
+
+                        spin_lock(&lcm->lcm_llcd_lock);
+                        list_splice(&lcd->lcd_llcd_list,
+                                    &lcm->lcm_llcd_resend);
+                        /* list_splice() does not reinitialize the source
+                         * list, and we reuse it on the next pass */
+                        INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+                        if (++llcd->llcd_tries < 5) {
+                                CERROR("commit %p failed %dx: rc %d\n",
+                                       llcd, llcd->llcd_tries, rc);
+
+                                list_add_tail(&llcd->llcd_list,
+                                              &lcm->lcm_llcd_resend);
+                                spin_unlock(&lcm->lcm_llcd_lock);
+                        } else {
+                                spin_unlock(&lcm->lcm_llcd_lock);
+                                CERROR("commit %p dropped %d cookies: "
+                                       "rc %d\n", llcd,
+                                       (int)(llcd->llcd_cookiebytes /
+                                             sizeof(*llcd->llcd_cookies)),
+                                       rc);
+                                llcd_put(llcd);
+                        }
+                        break;
+                }
+
+                if (rc == 0) {
+                        sending_list = &lcm->lcm_llcd_resend;
+                        if (!list_empty(sending_list))
+                                goto resend;
+                }
+        } while (1);
+
+        /* If we are force exiting, just drop all of the cookies. */
+        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
+                spin_lock(&lcm->lcm_llcd_lock);
+                list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
+                list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
+                list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
+                spin_unlock(&lcm->lcm_llcd_lock);
+
+                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,
+                                         llcd_list)
+                        llcd_put(llcd);
+        }
+
+        CDEBUG(D_HA, "%s exiting\n", current->comm);
+        OBD_FREE(lcd, sizeof(*lcd));
+        return 0;
+}
+
+int llog_start_commit_thread(void)
+{
+        int rc;
+        ENTRY;
+
+        rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
+        if (rc < 0) {
+                CERROR("error starting thread #%d: %d\n",
+                       lcm->lcm_thread_total, rc);
+                RETURN(rc);
+        }
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(llog_start_commit_thread);
+
+int llog_init_commit_master(void)
+{
+        INIT_LIST_HEAD(&lcm->lcm_thread_busy);
+        INIT_LIST_HEAD(&lcm->lcm_thread_idle);
+        spin_lock_init(&lcm->lcm_thread_lock);
+        atomic_set(&lcm->lcm_thread_numidle, 0);
+        init_waitqueue_head(&lcm->lcm_waitq);
+        INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
+        INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
+        INIT_LIST_HEAD(&lcm->lcm_llcd_free);
+        spin_lock_init(&lcm->lcm_llcd_lock);
+        atomic_set(&lcm->lcm_llcd_numfree, 0);
+        lcm->lcm_llcd_minfree = 0;
+        return 0;
+}
+
+int llog_cleanup_commit_master(void)
+{
+        return 0;
+}
-- 
1.8.3.1