-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
* if an OST or MDS fails it need only look at log(s) relevant to itself
*
* Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Alex Zhuravlev <bzzz@whamcloud.com>
+ * Author: Mikhail Pershin <tappro@whamcloud.com>
*/
#define DEBUG_SUBSYSTEM S_LOG
-#ifndef EXPORT_SYMTAB
-#define EXPORT_SYMTAB
-#endif
-
#ifndef __KERNEL__
#include <liblustre.h>
#endif
#include <obd_class.h>
#include <lustre_log.h>
-#include <libcfs/list.h>
#include "llog_internal.h"
-/* Allocate a new log or catalog handle */
+/*
+ * Allocate a new log or catalog handle
+ * Used inside llog_open().
+ */
struct llog_handle *llog_alloc_handle(void)
{
- struct llog_handle *loghandle;
- ENTRY;
+ struct llog_handle *loghandle;
- OBD_ALLOC(loghandle, sizeof(*loghandle));
- if (loghandle == NULL)
- RETURN(ERR_PTR(-ENOMEM));
+ OBD_ALLOC_PTR(loghandle);
+ if (loghandle == NULL)
+ return ERR_PTR(-ENOMEM);
- cfs_init_rwsem(&loghandle->lgh_lock);
+ cfs_init_rwsem(&loghandle->lgh_lock);
+ cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
+ CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
- RETURN(loghandle);
+ return loghandle;
}
-EXPORT_SYMBOL(llog_alloc_handle);
-
+/*
+ * Free llog handle and header data if exists. Used in llog_close() only
+ */
void llog_free_handle(struct llog_handle *loghandle)
{
- if (!loghandle)
- return;
-
- if (!loghandle->lgh_hdr)
- goto out;
- if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
- cfs_list_del_init(&loghandle->u.phd.phd_entry);
- if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
- LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
- OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
-
- out:
- OBD_FREE(loghandle, sizeof(*loghandle));
+ if (!loghandle)
+ return;
+
+ if (!loghandle->lgh_hdr)
+ goto out;
+ if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
+ cfs_list_del_init(&loghandle->u.phd.phd_entry);
+ if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
+ LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
+ LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
+ OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+
+out:
+ OBD_FREE_PTR(loghandle);
}
-EXPORT_SYMBOL(llog_free_handle);
/* returns negative on error; 0 if success; 1 if success & log destroyed */
-int llog_cancel_rec(struct llog_handle *loghandle, int index)
+int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
+ int index)
{
struct llog_log_hdr *llh = loghandle->lgh_hdr;
int rc = 0;
RETURN(-EINVAL);
}
- if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+ cfs_spin_lock(&loghandle->lgh_hdr_lock);
+ if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
RETURN(-ENOENT);
}
if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
(llh->llh_count == 1) &&
(loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
- rc = llog_destroy(loghandle);
- if (rc) {
- CERROR("Failure destroying log after last cancel: %d\n",
- rc);
- ext2_set_bit(index, llh->llh_bitmap);
- llh->llh_count++;
- } else {
- rc = 1;
- }
- RETURN(rc);
- }
-
- rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
- if (rc) {
- CERROR("Failure re-writing header %d\n", rc);
- ext2_set_bit(index, llh->llh_bitmap);
- llh->llh_count++;
- }
- RETURN(rc);
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ rc = llog_destroy(env, loghandle);
+ if (rc < 0) {
+ CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64
+ "#%08x: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_oseq,
+ loghandle->lgh_id.lgl_ogen, rc);
+ GOTO(out_err, rc);
+ }
+ RETURN(1);
+ }
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+
+ rc = llog_write(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
+ if (rc < 0) {
+ CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
+ "#%08x: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_oseq,
+ loghandle->lgh_id.lgl_ogen, rc);
+ GOTO(out_err, rc);
+ }
+ RETURN(0);
+out_err:
+ cfs_spin_lock(&loghandle->lgh_hdr_lock);
+ ext2_set_bit(index, llh->llh_bitmap);
+ llh->llh_count++;
+ cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ return rc;
}
EXPORT_SYMBOL(llog_cancel_rec);
-int llog_init_handle(struct llog_handle *handle, int flags,
- struct obd_uuid *uuid)
+static int llog_read_header(const struct lu_env *env,
+ struct llog_handle *handle,
+ struct obd_uuid *uuid)
{
- int rc;
- struct llog_log_hdr *llh;
- ENTRY;
- LASSERT(handle->lgh_hdr == NULL);
-
- OBD_ALLOC(llh, sizeof(*llh));
- if (llh == NULL)
- RETURN(-ENOMEM);
- handle->lgh_hdr = llh;
- /* first assign flags to use llog_client_ops */
- llh->llh_flags = flags;
- rc = llog_read_header(handle);
- if (rc == 0) {
- flags = llh->llh_flags;
- if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
- CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
- (char *)llh->llh_tgtuuid.uuid);
- rc = -EEXIST;
- }
- GOTO(out, rc);
- } else if (rc != LLOG_EEMPTY || !flags) {
- /* set a pesudo flag for initialization */
- flags = LLOG_F_IS_CAT;
- GOTO(out, rc);
- }
- rc = 0;
-
- handle->lgh_last_idx = 0; /* header is record with index 0 */
- llh->llh_count = 1; /* for the header record */
- llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
- llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
- llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
- llh->llh_timestamp = cfs_time_current_sec();
- if (uuid)
- memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
- llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
- ext2_set_bit(0, llh->llh_bitmap);
-
-out:
- if (flags & LLOG_F_IS_CAT) {
- CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
- llh->llh_size = sizeof(struct llog_logid_rec);
- } else if (flags & LLOG_F_IS_PLAIN) {
- CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
- } else {
- CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
- flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
- LBUG();
- }
-
- if (rc) {
- OBD_FREE(llh, sizeof(*llh));
- handle->lgh_hdr = NULL;
- }
- RETURN(rc);
+ struct llog_operations *lop;
+ int rc;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+
+ if (lop->lop_read_header == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_read_header(env, handle);
+ if (rc == LLOG_EEMPTY) {
+ struct llog_log_hdr *llh = handle->lgh_hdr;
+
+ handle->lgh_last_idx = 0; /* header is record with index 0 */
+ llh->llh_count = 1; /* for the header record */
+ llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
+ llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
+ llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
+ llh->llh_timestamp = cfs_time_current_sec();
+ if (uuid)
+ memcpy(&llh->llh_tgtuuid, uuid,
+ sizeof(llh->llh_tgtuuid));
+ llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+ ext2_set_bit(0, llh->llh_bitmap);
+ rc = 0;
+ }
+ return rc;
}
-EXPORT_SYMBOL(llog_init_handle);
-int llog_close(struct llog_handle *loghandle)
+int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
+ int flags, struct obd_uuid *uuid)
{
- struct llog_operations *lop;
- int rc;
- ENTRY;
-
- rc = llog_handle2ops(loghandle, &lop);
- if (rc)
- GOTO(out, rc);
- if (lop->lop_close == NULL)
- GOTO(out, -EOPNOTSUPP);
- rc = lop->lop_close(loghandle);
- out:
- llog_free_handle(loghandle);
- RETURN(rc);
+ struct llog_log_hdr *llh;
+ int rc;
+
+ ENTRY;
+ LASSERT(handle->lgh_hdr == NULL);
+
+ OBD_ALLOC_PTR(llh);
+ if (llh == NULL)
+ RETURN(-ENOMEM);
+ handle->lgh_hdr = llh;
+ /* first assign flags to use llog_client_ops */
+ llh->llh_flags = flags;
+ rc = llog_read_header(env, handle, uuid);
+ if (rc == 0) {
+ if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
+ flags & LLOG_F_IS_CAT) ||
+ (llh->llh_flags & LLOG_F_IS_CAT &&
+ flags & LLOG_F_IS_PLAIN))) {
+ CERROR("%s: llog type is %s but initializing %s\n",
+ handle->lgh_ctxt->loc_obd->obd_name,
+ llh->llh_flags & LLOG_F_IS_CAT ?
+ "catalog" : "plain",
+ flags & LLOG_F_IS_CAT ? "catalog" : "plain");
+ GOTO(out, rc = -EINVAL);
+ } else if (llh->llh_flags &
+ (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
+ /*
+ * it is possible to open llog without specifying llog
+ * type so it is taken from llh_flags
+ */
+ flags = llh->llh_flags;
+ } else {
+ /* for some reason the llh_flags has no type set */
+ CERROR("llog type is not specified!\n");
+ GOTO(out, rc = -EINVAL);
+ }
+ if (unlikely(uuid &&
+ !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
+ CERROR("%s: llog uuid mismatch: %s/%s\n",
+ handle->lgh_ctxt->loc_obd->obd_name,
+ (char *)uuid->uuid,
+ (char *)llh->llh_tgtuuid.uuid);
+ GOTO(out, rc = -EEXIST);
+ }
+ }
+ if (flags & LLOG_F_IS_CAT) {
+ LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
+ CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
+ llh->llh_size = sizeof(struct llog_logid_rec);
+ } else if (!(flags & LLOG_F_IS_PLAIN)) {
+ CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
+ handle->lgh_ctxt->loc_obd->obd_name,
+ flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
+ rc = -EINVAL;
+ }
+out:
+ if (rc) {
+ OBD_FREE_PTR(llh);
+ handle->lgh_hdr = NULL;
+ }
+ RETURN(rc);
}
-EXPORT_SYMBOL(llog_close);
+EXPORT_SYMBOL(llog_init_handle);
static int llog_process_thread(void *arg)
{
- struct llog_process_info *lpi = (struct llog_process_info *)arg;
- struct llog_handle *loghandle = lpi->lpi_loghandle;
- struct llog_log_hdr *llh = loghandle->lgh_hdr;
- struct llog_process_cat_data *cd = lpi->lpi_catdata;
- char *buf;
- __u64 cur_offset = LLOG_CHUNK_SIZE;
- __u64 last_offset;
- int rc = 0, index = 1, last_index;
- int saved_index = 0, last_called_index = 0;
+ struct llog_process_info *lpi = arg;
+ struct llog_handle *loghandle = lpi->lpi_loghandle;
+ struct llog_log_hdr *llh = loghandle->lgh_hdr;
+ struct llog_process_cat_data *cd = lpi->lpi_catdata;
+ char *buf;
+ __u64 cur_offset = LLOG_CHUNK_SIZE;
+ __u64 last_offset;
+ int rc = 0, index = 1, last_index;
+ int saved_index = 0;
+ int last_called_index = 0;
+
+ ENTRY;
LASSERT(llh);
OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
if (!buf) {
lpi->lpi_rc = -ENOMEM;
-#ifdef __KERNEL__
- cfs_complete(&lpi->lpi_completion);
-#endif
- return 0;
+ RETURN(0);
}
- if (!(lpi->lpi_flags & LLOG_FLAG_NODEAMON))
- cfs_daemonize_ctxt("llog_process_thread");
-
if (cd != NULL) {
last_called_index = cd->lpcd_first_idx;
index = cd->lpcd_first_idx + 1;
LASSERT(index <= last_index + 1);
if (index == last_index + 1)
break;
-
+repeat:
CDEBUG(D_OTHER, "index: %d last_index %d\n",
index, last_index);
/* get the buf with our target record; avoid old garbage */
memset(buf, 0, LLOG_CHUNK_SIZE);
last_offset = cur_offset;
- rc = llog_next_block(loghandle, &saved_index, index,
- &cur_offset, buf, LLOG_CHUNK_SIZE);
+ rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
+ index, &cur_offset, buf, LLOG_CHUNK_SIZE);
if (rc)
GOTO(out, rc);
rec, rec->lrh_type);
if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
- lustre_swab_llog_rec(rec, NULL);
+ lustre_swab_llog_rec(rec);
CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
rec->lrh_type, rec->lrh_index);
- if (rec->lrh_index == 0)
- GOTO(out, 0); /* no more records */
-
- if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
+ if (rec->lrh_index == 0) {
+ /* probably another rec just got added? */
+ if (index <= loghandle->lgh_last_idx)
+ GOTO(repeat, rc = 0);
+ GOTO(out, rc = 0); /* no more records */
+ }
+ if (rec->lrh_len == 0 ||
+ rec->lrh_len > LLOG_CHUNK_SIZE) {
CWARN("invalid length %d in llog record for "
"index %d/%d\n", rec->lrh_len,
rec->lrh_index, index);
/* if set, process the callback on this record */
if (ext2_test_bit(index, llh->llh_bitmap)) {
- rc = lpi->lpi_cb(loghandle, rec,
- lpi->lpi_cbdata);
- last_called_index = index;
- if (rc == LLOG_PROC_BREAK) {
- GOTO(out, rc);
- } else if (rc == LLOG_DEL_RECORD) {
- llog_cancel_rec(loghandle,
- rec->lrh_index);
+ rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
+ lpi->lpi_cbdata);
+ last_called_index = index;
+ if (rc == LLOG_PROC_BREAK) {
+ GOTO(out, rc);
+ } else if (rc == LLOG_DEL_RECORD) {
+ llog_cancel_rec(lpi->lpi_env,
+ loghandle,
+ rec->lrh_index);
rc = 0;
}
if (rc)
}
}
- out:
+out:
if (cd != NULL)
cd->lpcd_last_idx = last_called_index;
- if (buf)
- OBD_FREE(buf, LLOG_CHUNK_SIZE);
+
+ OBD_FREE(buf, LLOG_CHUNK_SIZE);
lpi->lpi_rc = rc;
-#ifdef __KERNEL__
- cfs_complete(&lpi->lpi_completion);
-#endif
return 0;
}
-int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata, int flags)
+#ifdef __KERNEL__
+static int llog_process_thread_daemonize(void *arg)
+{
+ struct llog_process_info *lpi = arg;
+ struct lu_env env;
+ int rc;
+
+ cfs_daemonize_ctxt("llog_process_thread");
+
+ /* client env has no keys, tags is just 0 */
+ rc = lu_env_init(&env, LCT_LOCAL);
+ if (rc)
+ goto out;
+ lpi->lpi_env = &env;
+
+ rc = llog_process_thread(arg);
+
+ lu_env_fini(&env);
+out:
+ cfs_complete(&lpi->lpi_completion);
+ return rc;
+}
+#endif
+
+int llog_process_or_fork(const struct lu_env *env,
+ struct llog_handle *loghandle,
+ llog_cb_t cb, void *data, void *catdata, bool fork)
{
struct llog_process_info *lpi;
int rc;
+
ENTRY;
OBD_ALLOC_PTR(lpi);
lpi->lpi_cb = cb;
lpi->lpi_cbdata = data;
lpi->lpi_catdata = catdata;
- lpi->lpi_flags = flags;
#ifdef __KERNEL__
- cfs_init_completion(&lpi->lpi_completion);
- rc = cfs_kernel_thread(llog_process_thread, lpi, CLONE_VM | CLONE_FILES);
- if (rc < 0) {
- CERROR("cannot start thread: %d\n", rc);
- OBD_FREE_PTR(lpi);
- RETURN(rc);
- }
- cfs_wait_for_completion(&lpi->lpi_completion);
+ if (fork) {
+ /* The new thread can't use parent env,
+ * init the new one in llog_process_thread_daemonize. */
+ lpi->lpi_env = NULL;
+ cfs_init_completion(&lpi->lpi_completion);
+ rc = cfs_create_thread(llog_process_thread_daemonize, lpi,
+ CFS_DAEMON_FLAGS);
+ if (rc < 0) {
+ CERROR("%s: cannot start thread: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+ OBD_FREE_PTR(lpi);
+ RETURN(rc);
+ }
+ cfs_wait_for_completion(&lpi->lpi_completion);
+ } else {
+ lpi->lpi_env = env;
+ llog_process_thread(lpi);
+ }
#else
- llog_process_thread(lpi);
+ lpi->lpi_env = env;
+ llog_process_thread(lpi);
#endif
rc = lpi->lpi_rc;
OBD_FREE_PTR(lpi);
RETURN(rc);
}
-EXPORT_SYMBOL(llog_process_flags);
+EXPORT_SYMBOL(llog_process_or_fork);
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata)
+int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
+ llog_cb_t cb, void *data, void *catdata)
{
- return llog_process_flags(loghandle, cb, data, catdata, 0);
+ return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
}
EXPORT_SYMBOL(llog_process);
}
EXPORT_SYMBOL(llog_get_size);
-int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata)
+int llog_reverse_process(const struct lu_env *env,
+ struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata)
{
struct llog_log_hdr *llh = loghandle->lgh_hdr;
struct llog_process_cat_data *cd = catdata;
/* get the buf with our target record; avoid old garbage */
memset(buf, 0, LLOG_CHUNK_SIZE);
- rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
- if (rc)
- GOTO(out, rc);
-
- rec = buf;
- idx = le32_to_cpu(rec->lrh_index);
- if (idx < index)
- CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
+ rc = llog_prev_block(env, loghandle, index, buf,
+ LLOG_CHUNK_SIZE);
+ if (rc)
+ GOTO(out, rc);
+
+ rec = buf;
+ idx = rec->lrh_index;
+ CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
while (idx < index) {
- rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ rec = (void *)rec + rec->lrh_len;
+ if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+ lustre_swab_llog_rec(rec);
idx ++;
}
- tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
+ LASSERT(idx == index);
+ tail = (void *)rec + rec->lrh_len - sizeof(*tail);
/* process records in buffer, starting where we found one */
while ((void *)tail > buf) {
- rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
- sizeof(*tail);
-
- if (rec->lrh_index == 0)
- GOTO(out, 0); /* no more records */
+ if (tail->lrt_index == 0)
+ GOTO(out, rc = 0); /* no more records */
/* if set, process the callback on this record */
if (ext2_test_bit(index, llh->llh_bitmap)) {
- rc = cb(loghandle, rec, data);
- if (rc == LLOG_PROC_BREAK) {
- GOTO(out, rc);
- }
+ rec = (void *)tail - tail->lrt_len +
+ sizeof(*tail);
+
+ rc = cb(env, loghandle, rec, data);
+ if (rc == LLOG_PROC_BREAK) {
+ GOTO(out, rc);
+ } else if (rc == LLOG_DEL_RECORD) {
+ llog_cancel_rec(env, loghandle,
+ tail->lrt_index);
+ rc = 0;
+ }
if (rc)
GOTO(out, rc);
}
--index;
if (index < first_index)
GOTO(out, rc = 0);
- tail = (void *)rec - sizeof(*tail);
+ tail = (void *)tail - tail->lrt_len;
}
}
RETURN(rc);
}
EXPORT_SYMBOL(llog_reverse_process);
+
+/**
+ * new llog API
+ *
+ * API functions:
+ * llog_open - open llog, may not exist
+ * llog_exist - check if llog exists
+ * llog_close - close opened llog, pair for open, frees llog_handle
+ * llog_declare_create - declare llog creation
+ * llog_create - create new llog on disk, need transaction handle
+ * llog_declare_write_rec - declaration of llog write
+ * llog_write_rec - write llog record on disk, need transaction handle
+ * llog_declare_add - declare llog catalog record addition
+ * llog_add - add llog record in catalog, need transaction handle
+ */
+int llog_exist(struct llog_handle *loghandle)
+{
+ struct llog_operations *lop;
+ int rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_exist == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_exist(loghandle);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_exist);
+
+int llog_declare_create(const struct lu_env *env,
+ struct llog_handle *loghandle, struct thandle *th)
+{
+ struct llog_operations *lop;
+ int raised, rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_declare_create == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lop->lop_declare_create(env, loghandle, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_create);
+
+int llog_create(const struct lu_env *env, struct llog_handle *handle,
+ struct thandle *th)
+{
+ struct llog_operations *lop;
+ int raised, rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_create == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lop->lop_create(env, handle, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_create);
+
+int llog_declare_write_rec(const struct lu_env *env,
+ struct llog_handle *handle,
+ struct llog_rec_hdr *rec, int idx,
+ struct thandle *th)
+{
+ struct llog_operations *lop;
+ int raised, rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+ LASSERT(lop);
+ if (lop->lop_declare_write_rec == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_write_rec);
+
+int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
+ struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
+ int numcookies, void *buf, int idx, struct thandle *th)
+{
+ struct llog_operations *lop;
+ int raised, rc, buflen;
+
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+
+ LASSERT(lop);
+ if (lop->lop_write_rec == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ if (buf)
+ buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) +
+ sizeof(struct llog_rec_tail);
+ else
+ buflen = rec->lrh_len;
+ LASSERT(cfs_size_round(buflen) == buflen);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies,
+ buf, idx, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_write_rec);
+
+int llog_add(const struct lu_env *env, struct llog_handle *lgh,
+ struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
+ void *buf, struct thandle *th)
+{
+ int raised, rc;
+
+ ENTRY;
+
+ if (lgh->lgh_logops->lop_add == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, buf, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_add);
+
+int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
+ struct llog_rec_hdr *rec, struct thandle *th)
+{
+ int raised, rc;
+
+ ENTRY;
+
+ if (lgh->lgh_logops->lop_declare_add == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_add);
+
+/**
+ * Helper function to open llog or create it if doesn't exist.
+ * It hides all transaction handling from caller.
+ */
+int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
+ struct llog_handle **res, struct llog_logid *logid,
+ char *name)
+{
+ struct thandle *th;
+ int rc;
+
+ ENTRY;
+
+ rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
+ if (rc)
+ RETURN(rc);
+
+ if (llog_exist(*res))
+ RETURN(0);
+
+ if ((*res)->lgh_obj != NULL) {
+ struct dt_device *d;
+
+ d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
+
+ th = dt_trans_create(env, d);
+ if (IS_ERR(th))
+ GOTO(out, rc = PTR_ERR(th));
+
+ rc = llog_declare_create(env, *res, th);
+ if (rc == 0) {
+ rc = dt_trans_start_local(env, d, th);
+ if (rc == 0)
+ rc = llog_create(env, *res, th);
+ }
+ dt_trans_stop(env, d, th);
+ } else {
+ /* lvfs compat code */
+ LASSERT((*res)->lgh_file == NULL);
+ rc = llog_create(env, *res, NULL);
+ }
+out:
+ if (rc)
+ llog_close(env, *res);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_open_create);
+
+/**
+ * Helper function to delete existent llog.
+ */
+int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
+ struct llog_logid *logid, char *name)
+{
+ struct llog_handle *handle;
+ int rc = 0, rc2;
+
+ ENTRY;
+
+ /* nothing to erase */
+ if (name == NULL && logid == NULL)
+ RETURN(0);
+
+ rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
+ if (rc < 0)
+ RETURN(rc);
+
+ rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
+ if (rc == 0)
+ rc = llog_destroy(env, handle);
+
+ rc2 = llog_close(env, handle);
+ if (rc == 0)
+ rc = rc2;
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_erase);
+
+/*
+ * Helper function for write record in llog.
+ * It hides all transaction handling from caller.
+ * Valid only with local llog.
+ */
+int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
+ struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
+ int cookiecount, void *buf, int idx)
+{
+ int rc;
+
+ ENTRY;
+
+ LASSERT(loghandle);
+ LASSERT(loghandle->lgh_ctxt);
+
+ if (loghandle->lgh_obj != NULL) {
+ struct dt_device *dt;
+ struct thandle *th;
+
+ dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
+
+ th = dt_trans_create(env, dt);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
+ if (rc)
+ GOTO(out_trans, rc);
+
+ rc = dt_trans_start_local(env, dt, th);
+ if (rc)
+ GOTO(out_trans, rc);
+
+ cfs_down_write(&loghandle->lgh_lock);
+ rc = llog_write_rec(env, loghandle, rec, reccookie,
+ cookiecount, buf, idx, th);
+ cfs_up_write(&loghandle->lgh_lock);
+out_trans:
+ dt_trans_stop(env, dt, th);
+ } else { /* lvfs compatibility */
+ cfs_down_write(&loghandle->lgh_lock);
+ rc = llog_write_rec(env, loghandle, rec, reccookie,
+ cookiecount, buf, idx, NULL);
+ cfs_up_write(&loghandle->lgh_lock);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_write);
+
+int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
+ struct llog_handle **lgh, struct llog_logid *logid,
+ char *name, enum llog_open_param open_param)
+{
+ int raised;
+ int rc;
+
+ ENTRY;
+
+ LASSERT(ctxt);
+ LASSERT(ctxt->loc_logops);
+
+ if (ctxt->loc_logops->lop_open == NULL) {
+ *lgh = NULL;
+ RETURN(-EOPNOTSUPP);
+ }
+
+ *lgh = llog_alloc_handle();
+ if (*lgh == NULL)
+ RETURN(-ENOMEM);
+ (*lgh)->lgh_ctxt = ctxt;
+ (*lgh)->lgh_logops = ctxt->loc_logops;
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ if (rc) {
+ llog_free_handle(*lgh);
+ *lgh = NULL;
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_open);
+
+int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
+{
+ struct llog_operations *lop;
+ int rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ GOTO(out, rc);
+ if (lop->lop_close == NULL)
+ GOTO(out, -EOPNOTSUPP);
+ rc = lop->lop_close(env, loghandle);
+out:
+ llog_free_handle(loghandle);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_close);