-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
* if an OST or MDS fails it need only look at log(s) relevant to itself
*
* Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Alex Zhuravlev <bzzz@whamcloud.com>
+ * Author: Mikhail Pershin <tappro@whamcloud.com>
*/
#define DEBUG_SUBSYSTEM S_LOG
-#ifndef EXPORT_SYMTAB
-#define EXPORT_SYMTAB
-#endif
-
#ifndef __KERNEL__
#include <liblustre.h>
#endif
#include <obd_class.h>
#include <lustre_log.h>
-#include <libcfs/list.h>
#include "llog_internal.h"
/* Allocate a new log or catalog handle */
struct llog_handle *loghandle;
ENTRY;
- OBD_ALLOC(loghandle, sizeof(*loghandle));
+ OBD_ALLOC_PTR(loghandle);
if (loghandle == NULL)
RETURN(ERR_PTR(-ENOMEM));
- init_rwsem(&loghandle->lgh_lock);
+ cfs_init_rwsem(&loghandle->lgh_lock);
+ cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
+ CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
RETURN(loghandle);
}
if (!loghandle->lgh_hdr)
goto out;
if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
- list_del_init(&loghandle->u.phd.phd_entry);
+ cfs_list_del_init(&loghandle->u.phd.phd_entry);
if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
- LASSERT(list_empty(&loghandle->u.chd.chd_head));
- OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+ LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
+ LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
+ OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
- out:
- OBD_FREE(loghandle, sizeof(*loghandle));
+out:
+ OBD_FREE_PTR(loghandle);
}
EXPORT_SYMBOL(llog_free_handle);
RETURN(-EINVAL);
}
- if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+ cfs_spin_lock(&loghandle->lgh_hdr_lock);
+ if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
RETURN(-ENOENT);
}
if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
(llh->llh_count == 1) &&
(loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
- rc = llog_destroy(loghandle);
- if (rc) {
- CERROR("Failure destroying log after last cancel: %d\n",
- rc);
- ext2_set_bit(index, llh->llh_bitmap);
- llh->llh_count++;
- } else {
- rc = 1;
- }
- RETURN(rc);
- }
-
- rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
- if (rc) {
- CERROR("Failure re-writing header %d\n", rc);
- ext2_set_bit(index, llh->llh_bitmap);
- llh->llh_count++;
- }
- RETURN(rc);
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ rc = llog_destroy(loghandle);
+ if (rc < 0) {
+ CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64
+ "#%08x: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_oseq,
+ loghandle->lgh_id.lgl_ogen, rc);
+ GOTO(out_err, rc);
+ }
+ RETURN(1);
+ }
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+
+ rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
+ if (rc < 0) {
+ CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
+ "#%08x: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_oseq,
+ loghandle->lgh_id.lgl_ogen, rc);
+ GOTO(out_err, rc);
+ }
+ RETURN(0);
+out_err:
+ cfs_spin_lock(&loghandle->lgh_hdr_lock);
+ ext2_set_bit(index, llh->llh_bitmap);
+ llh->llh_count++;
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ return rc;
}
EXPORT_SYMBOL(llog_cancel_rec);
ENTRY;
LASSERT(handle->lgh_hdr == NULL);
- OBD_ALLOC(llh, sizeof(*llh));
+ OBD_ALLOC_PTR(llh);
if (llh == NULL)
RETURN(-ENOMEM);
handle->lgh_hdr = llh;
out:
if (flags & LLOG_F_IS_CAT) {
- CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
- llh->llh_size = sizeof(struct llog_logid_rec);
- } else if (flags & LLOG_F_IS_PLAIN) {
- CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
- } else {
- CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
- flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
- LBUG();
- }
-
- if (rc) {
- OBD_FREE(llh, sizeof(*llh));
- handle->lgh_hdr = NULL;
- }
- RETURN(rc);
+ LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
+ CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
+ llh->llh_size = sizeof(struct llog_logid_rec);
+ } else if (!(flags & LLOG_F_IS_PLAIN)) {
+ CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
+ flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
+ rc = -EINVAL;
+ }
+
+ if (rc) {
+ OBD_FREE_PTR(llh);
+ handle->lgh_hdr = NULL;
+ }
+ RETURN(rc);
}
EXPORT_SYMBOL(llog_init_handle);
if (!buf) {
lpi->lpi_rc = -ENOMEM;
#ifdef __KERNEL__
- complete(&lpi->lpi_completion);
+ cfs_complete(&lpi->lpi_completion);
#endif
return 0;
}
- cfs_daemonize_ctxt("llog_process_thread");
+ if (!(lpi->lpi_flags & LLOG_FLAG_NODEAMON))
+ cfs_daemonize_ctxt("llog_process_thread");
if (cd != NULL) {
last_called_index = cd->lpcd_first_idx;
OBD_FREE(buf, LLOG_CHUNK_SIZE);
lpi->lpi_rc = rc;
#ifdef __KERNEL__
- complete(&lpi->lpi_completion);
+ cfs_complete(&lpi->lpi_completion);
#endif
return 0;
}
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata)
+int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata, int flags)
{
struct llog_process_info *lpi;
int rc;
lpi->lpi_cb = cb;
lpi->lpi_cbdata = data;
lpi->lpi_catdata = catdata;
+ lpi->lpi_flags = flags;
#ifdef __KERNEL__
- init_completion(&lpi->lpi_completion);
- rc = cfs_kernel_thread(llog_process_thread, lpi, CLONE_VM | CLONE_FILES);
+ cfs_init_completion(&lpi->lpi_completion);
+ rc = cfs_create_thread(llog_process_thread, lpi, CFS_DAEMON_FLAGS);
if (rc < 0) {
CERROR("cannot start thread: %d\n", rc);
OBD_FREE_PTR(lpi);
RETURN(rc);
}
- wait_for_completion(&lpi->lpi_completion);
+ cfs_wait_for_completion(&lpi->lpi_completion);
#else
llog_process_thread(lpi);
#endif
OBD_FREE_PTR(lpi);
RETURN(rc);
}
+EXPORT_SYMBOL(llog_process_flags);
+
+int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata)
+{
+ return llog_process_flags(loghandle, cb, data, catdata, 0);
+}
EXPORT_SYMBOL(llog_process);
inline int llog_get_size(struct llog_handle *loghandle)