-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* in the LICENSE file that accompanied this code).
*
* You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see [sun.com URL with a
- * copy of GPLv2].
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
* if an OST or MDS fails it need only look at log(s) relevant to itself
*
* Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Alex Zhuravlev <bzzz@whamcloud.com>
+ * Author: Mikhail Pershin <tappro@whamcloud.com>
*/
#define DEBUG_SUBSYSTEM S_LOG
-#ifndef EXPORT_SYMTAB
-#define EXPORT_SYMTAB
-#endif
-
#ifndef __KERNEL__
#include <liblustre.h>
#endif
#include <obd_class.h>
#include <lustre_log.h>
-#include <libcfs/list.h>
#include "llog_internal.h"
/* Allocate a new log or catalog handle */
struct llog_handle *loghandle;
ENTRY;
- OBD_ALLOC(loghandle, sizeof(*loghandle));
+ OBD_ALLOC_PTR(loghandle);
if (loghandle == NULL)
RETURN(ERR_PTR(-ENOMEM));
- init_rwsem(&loghandle->lgh_lock);
+ cfs_init_rwsem(&loghandle->lgh_lock);
+ cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
+ CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
RETURN(loghandle);
}
if (!loghandle->lgh_hdr)
goto out;
if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
- list_del_init(&loghandle->u.phd.phd_entry);
+ cfs_list_del_init(&loghandle->u.phd.phd_entry);
if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
- LASSERT(list_empty(&loghandle->u.chd.chd_head));
- OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+ LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
+ LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
+ OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
- out:
- OBD_FREE(loghandle, sizeof(*loghandle));
+out:
+ OBD_FREE_PTR(loghandle);
}
EXPORT_SYMBOL(llog_free_handle);
/* returns negative on error; 0 if success; 1 if success & log destroyed */
-int llog_cancel_rec(struct llog_handle *loghandle, int index)
+/*
+ * Cancel record \a index in \a loghandle: clear its bit in the header
+ * bitmap and decrement llh_count. If the log carries
+ * LLOG_F_ZAP_WHEN_EMPTY, llh_count has dropped to 1 and lgh_last_idx is
+ * the last bitmap slot, the log is destroyed; otherwise the header is
+ * written back through llog_write_rec().
+ *
+ * \param env execution environment handed to llog_destroy() and
+ * llog_write_rec()
+ * \param loghandle log whose record is cancelled
+ * \param index record index to cancel; 0 is rejected
+ *
+ * \retval 1 the log was destroyed
+ * \retval 0 the header was rewritten
+ * \retval -EINVAL \a index is 0
+ * \retval -ENOENT the bit for \a index was already clear
+ * \retval negative error from llog_destroy() or llog_write_rec(); the
+ * bitmap bit and llh_count are restored before returning
+ */
+int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
+ int index)
{
struct llog_log_hdr *llh = loghandle->lgh_hdr;
int rc = 0;
ENTRY;
- CDEBUG(D_RPCTRACE, "canceling %d in log "LPX64"\n",
+ CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
index, loghandle->lgh_id.lgl_oid);
if (index == 0) {
- CERROR("cannot cancel index 0 (which is header)\n");
+ CERROR("Can't cancel index 0 which is header\n");
RETURN(-EINVAL);
}
- if (!ext2_clear_bit(index, llh->llh_bitmap)) {
- CDEBUG(D_RPCTRACE, "catalog index %u already clear?\n", index);
- RETURN(-EINVAL);
+ /* lgh_hdr_lock is held across the bitmap and llh_count updates */
+ cfs_spin_lock(&loghandle->lgh_hdr_lock);
+ if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
+ RETURN(-ENOENT);
}
llh->llh_count--;
if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
(llh->llh_count == 1) &&
(loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
- rc = llog_destroy(loghandle);
- if (rc) {
- CERROR("failure destroying log after last cancel: %d\n",
- rc);
- ext2_set_bit(index, llh->llh_bitmap);
- llh->llh_count++;
- } else {
- rc = 1;
- }
- RETURN(rc);
- }
-
- rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
- if (rc) {
- CERROR("failure re-writing header %d\n", rc);
- ext2_set_bit(index, llh->llh_bitmap);
- llh->llh_count++;
- }
- RETURN(rc);
+ /* the spinlock is released before calling llog_destroy() */
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ rc = llog_destroy(env, loghandle);
+ if (rc < 0) {
+ CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64
+ "#%08x: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_oseq,
+ loghandle->lgh_id.lgl_ogen, rc);
+ GOTO(out_err, rc);
+ }
+ RETURN(1);
+ }
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+
+ rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
+ if (rc < 0) {
+ CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
+ "#%08x: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_oseq,
+ loghandle->lgh_id.lgl_ogen, rc);
+ GOTO(out_err, rc);
+ }
+ RETURN(0);
+out_err:
+ /* undo the cancel: put the bitmap bit and the record count back */
+ cfs_spin_lock(&loghandle->lgh_hdr_lock);
+ ext2_set_bit(index, llh->llh_bitmap);
+ llh->llh_count++;
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ /* leave through RETURN() so the ENTRY trace above stays balanced */
+ RETURN(rc);
}
EXPORT_SYMBOL(llog_cancel_rec);
-int llog_init_handle(struct llog_handle *handle, int flags,
- struct obd_uuid *uuid)
+int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
+ int flags, struct obd_uuid *uuid)
{
int rc;
struct llog_log_hdr *llh;
ENTRY;
LASSERT(handle->lgh_hdr == NULL);
- OBD_ALLOC(llh, sizeof(*llh));
+ OBD_ALLOC_PTR(llh);
if (llh == NULL)
RETURN(-ENOMEM);
handle->lgh_hdr = llh;
/* first assign flags to use llog_client_ops */
llh->llh_flags = flags;
- rc = llog_read_header(handle);
+ rc = llog_read_header(env, handle);
if (rc == 0) {
flags = llh->llh_flags;
if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
out:
if (flags & LLOG_F_IS_CAT) {
- CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
- llh->llh_size = sizeof(struct llog_logid_rec);
- } else if (flags & LLOG_F_IS_PLAIN) {
- CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
- } else {
- CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
- flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
- LBUG();
- }
-
- if (rc) {
- OBD_FREE(llh, sizeof(*llh));
- handle->lgh_hdr = NULL;
- }
- RETURN(rc);
+ LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
+ CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
+ llh->llh_size = sizeof(struct llog_logid_rec);
+ } else if (!(flags & LLOG_F_IS_PLAIN)) {
+ CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
+ flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
+ rc = -EINVAL;
+ }
+
+ if (rc) {
+ OBD_FREE_PTR(llh);
+ handle->lgh_hdr = NULL;
+ }
+ RETURN(rc);
}
EXPORT_SYMBOL(llog_init_handle);
-int llog_close(struct llog_handle *loghandle)
+int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
{
struct llog_operations *lop;
int rc;
GOTO(out, rc);
if (lop->lop_close == NULL)
GOTO(out, -EOPNOTSUPP);
- rc = lop->lop_close(loghandle);
+ rc = lop->lop_close(env, loghandle);
out:
llog_free_handle(loghandle);
RETURN(rc);
static int llog_process_thread(void *arg)
{
- struct llog_process_info *lpi = (struct llog_process_info *)arg;
- struct llog_handle *loghandle = lpi->lpi_loghandle;
- struct llog_log_hdr *llh = loghandle->lgh_hdr;
- struct llog_process_cat_data *cd = lpi->lpi_catdata;
- char *buf;
- __u64 cur_offset = LLOG_CHUNK_SIZE;
- __u64 last_offset;
- int rc = 0, index = 1, last_index;
- int saved_index = 0, last_called_index = 0;
+ struct llog_process_info *lpi = arg;
+ struct llog_handle *loghandle = lpi->lpi_loghandle;
+ struct llog_log_hdr *llh = loghandle->lgh_hdr;
+ struct llog_process_cat_data *cd = lpi->lpi_catdata;
+ char *buf;
+ __u64 cur_offset = LLOG_CHUNK_SIZE;
+ __u64 last_offset;
+ int rc = 0, index = 1, last_index;
+ int saved_index = 0;
+ int last_called_index = 0;
+
+ ENTRY;
LASSERT(llh);
OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
if (!buf) {
lpi->lpi_rc = -ENOMEM;
-#ifdef __KERNEL__
- complete(&lpi->lpi_completion);
-#endif
- return 0;
+ RETURN(0);
}
- cfs_daemonize_ctxt("llog_process_thread");
-
if (cd != NULL) {
- last_called_index = cd->first_idx;
- index = cd->first_idx + 1;
+ last_called_index = cd->lpcd_first_idx;
+ index = cd->lpcd_first_idx + 1;
}
- if (cd != NULL && cd->last_idx)
- last_index = cd->last_idx;
+ if (cd != NULL && cd->lpcd_last_idx)
+ last_index = cd->lpcd_last_idx;
else
last_index = LLOG_BITMAP_BYTES * 8 - 1;
LASSERT(index <= last_index + 1);
if (index == last_index + 1)
break;
-
+repeat:
CDEBUG(D_OTHER, "index: %d last_index %d\n",
index, last_index);
/* get the buf with our target record; avoid old garbage */
memset(buf, 0, LLOG_CHUNK_SIZE);
last_offset = cur_offset;
- rc = llog_next_block(loghandle, &saved_index, index,
- &cur_offset, buf, LLOG_CHUNK_SIZE);
+ rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
+ index, &cur_offset, buf, LLOG_CHUNK_SIZE);
if (rc)
GOTO(out, rc);
rec, rec->lrh_type);
if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
- lustre_swab_llog_rec(rec, NULL);
+ lustre_swab_llog_rec(rec);
CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
rec->lrh_type, rec->lrh_index);
- if (rec->lrh_index == 0)
- GOTO(out, 0); /* no more records */
-
- if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
+ if (rec->lrh_index == 0) {
+ /* probably another rec just got added? */
+ if (index <= loghandle->lgh_last_idx)
+ GOTO(repeat, rc = 0);
+ GOTO(out, rc = 0); /* no more records */
+ }
+ if (rec->lrh_len == 0 ||
+ rec->lrh_len > LLOG_CHUNK_SIZE) {
CWARN("invalid length %d in llog record for "
"index %d/%d\n", rec->lrh_len,
rec->lrh_index, index);
/* if set, process the callback on this record */
if (ext2_test_bit(index, llh->llh_bitmap)) {
- rc = lpi->lpi_cb(loghandle, rec,
- lpi->lpi_cbdata);
- last_called_index = index;
- if (rc == LLOG_PROC_BREAK) {
- CDEBUG(D_HA, "recovery from log: "LPX64
- ":%x stopped\n",
- loghandle->lgh_id.lgl_oid,
- loghandle->lgh_id.lgl_ogen);
- GOTO(out, rc);
- } else if (rc == LLOG_DEL_RECORD) {
- llog_cancel_rec(loghandle,
- rec->lrh_index);
+ rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
+ lpi->lpi_cbdata);
+ last_called_index = index;
+ if (rc == LLOG_PROC_BREAK) {
+ GOTO(out, rc);
+ } else if (rc == LLOG_DEL_RECORD) {
+ llog_cancel_rec(lpi->lpi_env,
+ loghandle,
+ rec->lrh_index);
rc = 0;
}
if (rc)
}
}
- out:
+out:
if (cd != NULL)
- cd->last_idx = last_called_index;
- if (buf)
- OBD_FREE(buf, LLOG_CHUNK_SIZE);
+ cd->lpcd_last_idx = last_called_index;
+
+ OBD_FREE(buf, LLOG_CHUNK_SIZE);
lpi->lpi_rc = rc;
-#ifdef __KERNEL__
- complete(&lpi->lpi_completion);
-#endif
return 0;
}
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata)
+#ifdef __KERNEL__
+/*
+ * Thread entry point used when llog processing is forked. Sets up a
+ * private lu_env for the new thread, runs llog_process_thread() with it
+ * and signals lpi->lpi_completion when done.
+ *
+ * \param arg the struct llog_process_info shared with the spawning thread
+ *
+ * \retval 0 llog_process_thread() ran; its result is in lpi->lpi_rc
+ * \retval negative lu_env_init() failed; the same value is stored in
+ * lpi->lpi_rc
+ */
+static int llog_process_thread_daemonize(void *arg)
+{
+ struct llog_process_info *lpi = arg;
+ struct lu_env env;
+ int rc;
+
+ cfs_daemonize_ctxt("llog_process_thread");
+
+ /* client env has no keys, tags is just 0 */
+ rc = lu_env_init(&env, LCT_LOCAL);
+ if (rc) {
+ /* llog_process_thread() will not run to set lpi_rc, and the
+ * waiter reads only lpi_rc, so record the failure here. */
+ lpi->lpi_rc = rc;
+ goto out;
+ }
+ lpi->lpi_env = &env;
+
+ rc = llog_process_thread(arg);
+
+ lu_env_fini(&env);
+out:
+ cfs_complete(&lpi->lpi_completion);
+ return rc;
+}
+#endif
+
+int llog_process_or_fork(const struct lu_env *env,
+ struct llog_handle *loghandle,
+ llog_cb_t cb, void *data, void *catdata, bool fork)
{
struct llog_process_info *lpi;
int rc;
+
ENTRY;
OBD_ALLOC_PTR(lpi);
lpi->lpi_catdata = catdata;
#ifdef __KERNEL__
- init_completion(&lpi->lpi_completion);
- rc = cfs_kernel_thread(llog_process_thread, lpi, CLONE_VM | CLONE_FILES);
- if (rc < 0) {
- CERROR("cannot start thread: %d\n", rc);
- OBD_FREE_PTR(lpi);
- RETURN(rc);
- }
- wait_for_completion(&lpi->lpi_completion);
+ if (fork) {
+ /* The new thread can't use parent env,
+ * init the new one in llog_process_thread_daemonize. */
+ lpi->lpi_env = NULL;
+ cfs_init_completion(&lpi->lpi_completion);
+ rc = cfs_create_thread(llog_process_thread_daemonize, lpi,
+ CFS_DAEMON_FLAGS);
+ if (rc < 0) {
+ CERROR("%s: cannot start thread: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+ OBD_FREE_PTR(lpi);
+ RETURN(rc);
+ }
+ cfs_wait_for_completion(&lpi->lpi_completion);
+ } else {
+ lpi->lpi_env = env;
+ llog_process_thread(lpi);
+ }
#else
- llog_process_thread(lpi);
+ lpi->lpi_env = env;
+ llog_process_thread(lpi);
#endif
rc = lpi->lpi_rc;
OBD_FREE_PTR(lpi);
RETURN(rc);
}
+
+/*
+ * Process \a loghandle in the calling thread: forwards every argument to
+ * llog_process_or_fork() with fork disabled and returns its result.
+ */
+int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
+ llog_cb_t cb, void *data, void *catdata)
+{
+ int rc;
+
+ rc = llog_process_or_fork(env, loghandle, cb, data, catdata, false);
+ return rc;
+}
EXPORT_SYMBOL(llog_process);
inline int llog_get_size(struct llog_handle *loghandle)
}
EXPORT_SYMBOL(llog_get_size);
-int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata)
+int llog_reverse_process(const struct lu_env *env,
+ struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata)
{
struct llog_log_hdr *llh = loghandle->lgh_hdr;
struct llog_process_cat_data *cd = catdata;
RETURN(-ENOMEM);
if (cd != NULL)
- first_index = cd->first_idx + 1;
- if (cd != NULL && cd->last_idx)
- index = cd->last_idx;
+ first_index = cd->lpcd_first_idx + 1;
+ if (cd != NULL && cd->lpcd_last_idx)
+ index = cd->lpcd_last_idx;
else
index = LLOG_BITMAP_BYTES * 8 - 1;
/* get the buf with our target record; avoid old garbage */
memset(buf, 0, LLOG_CHUNK_SIZE);
- rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
- if (rc)
- GOTO(out, rc);
-
- rec = buf;
- idx = le32_to_cpu(rec->lrh_index);
- if (idx < index)
- CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
+ rc = llog_prev_block(env, loghandle, index, buf,
+ LLOG_CHUNK_SIZE);
+ if (rc)
+ GOTO(out, rc);
+
+ rec = buf;
+ idx = rec->lrh_index;
+ CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
while (idx < index) {
- rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ rec = (void *)rec + rec->lrh_len;
+ if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+ lustre_swab_llog_rec(rec);
idx ++;
}
- tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
+ LASSERT(idx == index);
+ tail = (void *)rec + rec->lrh_len - sizeof(*tail);
/* process records in buffer, starting where we found one */
while ((void *)tail > buf) {
- rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
- sizeof(*tail);
-
- if (rec->lrh_index == 0)
- GOTO(out, 0); /* no more records */
+ if (tail->lrt_index == 0)
+ GOTO(out, rc = 0); /* no more records */
/* if set, process the callback on this record */
if (ext2_test_bit(index, llh->llh_bitmap)) {
- rc = cb(loghandle, rec, data);
- if (rc == LLOG_PROC_BREAK) {
- CWARN("recovery from log: "LPX64":%x"
- " stopped\n",
- loghandle->lgh_id.lgl_oid,
- loghandle->lgh_id.lgl_ogen);
- GOTO(out, rc);
- }
+ rec = (void *)tail - tail->lrt_len +
+ sizeof(*tail);
+
+ rc = cb(env, loghandle, rec, data);
+ if (rc == LLOG_PROC_BREAK) {
+ GOTO(out, rc);
+ } else if (rc == LLOG_DEL_RECORD) {
+ llog_cancel_rec(env, loghandle,
+ tail->lrt_index);
+ rc = 0;
+ }
if (rc)
GOTO(out, rc);
}
--index;
if (index < first_index)
GOTO(out, rc = 0);
- tail = (void *)rec - sizeof(*tail);
+ tail = (void *)tail - tail->lrt_len;
}
}