#include <lustre_log.h>
#include "llog_internal.h"
-/* Allocate a new log or catalog handle */
+/*
+ * Allocate a new log or catalog handle
+ * Used inside llog_open().
+ */
struct llog_handle *llog_alloc_handle(void)
{
- struct llog_handle *loghandle;
- ENTRY;
+ struct llog_handle *loghandle;
OBD_ALLOC_PTR(loghandle);
- if (loghandle == NULL)
- RETURN(ERR_PTR(-ENOMEM));
+ if (loghandle == NULL)
+ return ERR_PTR(-ENOMEM);
- cfs_init_rwsem(&loghandle->lgh_lock);
+ cfs_init_rwsem(&loghandle->lgh_lock);
cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
- RETURN(loghandle);
+ return loghandle;
}
-EXPORT_SYMBOL(llog_alloc_handle);
-
+/*
+ * Free llog handle and header data if exists. Used in llog_close() only
+ */
void llog_free_handle(struct llog_handle *loghandle)
{
- if (!loghandle)
- return;
-
- if (!loghandle->lgh_hdr)
- goto out;
- if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
- cfs_list_del_init(&loghandle->u.phd.phd_entry);
- if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
- LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
+ if (!loghandle)
+ return;
+
+ if (!loghandle->lgh_hdr)
+ goto out;
+ if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
+ cfs_list_del_init(&loghandle->u.phd.phd_entry);
+ if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
+ LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
out:
OBD_FREE_PTR(loghandle);
}
-EXPORT_SYMBOL(llog_free_handle);
/* returns negative on error; 0 if success; 1 if success & log destroyed */
int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
}
cfs_spin_unlock(&loghandle->lgh_hdr_lock);
- rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
+ rc = llog_write(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
if (rc < 0) {
CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
"#%08x: rc = %d\n",
}
EXPORT_SYMBOL(llog_cancel_rec);
+static int llog_read_header(const struct lu_env *env,
+ struct llog_handle *handle,
+ struct obd_uuid *uuid)
+{
+ struct llog_operations *lop;
+ int rc;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+
+ if (lop->lop_read_header == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_read_header(env, handle);
+ if (rc == LLOG_EEMPTY) {
+ struct llog_log_hdr *llh = handle->lgh_hdr;
+
+ handle->lgh_last_idx = 0; /* header is record with index 0 */
+ llh->llh_count = 1; /* for the header record */
+ llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
+ llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
+ llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
+ llh->llh_timestamp = cfs_time_current_sec();
+ if (uuid)
+ memcpy(&llh->llh_tgtuuid, uuid,
+ sizeof(llh->llh_tgtuuid));
+ llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+ ext2_set_bit(0, llh->llh_bitmap);
+ rc = 0;
+ }
+ return rc;
+}
+
int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
int flags, struct obd_uuid *uuid)
{
- int rc;
- struct llog_log_hdr *llh;
- ENTRY;
- LASSERT(handle->lgh_hdr == NULL);
+ struct llog_log_hdr *llh;
+ int rc;
- OBD_ALLOC_PTR(llh);
- if (llh == NULL)
- RETURN(-ENOMEM);
- handle->lgh_hdr = llh;
- /* first assign flags to use llog_client_ops */
- llh->llh_flags = flags;
- rc = llog_read_header(env, handle);
- if (rc == 0) {
- flags = llh->llh_flags;
- if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
- CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
- (char *)llh->llh_tgtuuid.uuid);
- rc = -EEXIST;
- }
- GOTO(out, rc);
- } else if (rc != LLOG_EEMPTY || !flags) {
- /* set a pesudo flag for initialization */
- flags = LLOG_F_IS_CAT;
- GOTO(out, rc);
- }
- rc = 0;
-
- handle->lgh_last_idx = 0; /* header is record with index 0 */
- llh->llh_count = 1; /* for the header record */
- llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
- llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
- llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
- llh->llh_timestamp = cfs_time_current_sec();
- if (uuid)
- memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
- llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
- ext2_set_bit(0, llh->llh_bitmap);
+ ENTRY;
+ LASSERT(handle->lgh_hdr == NULL);
-out:
- if (flags & LLOG_F_IS_CAT) {
+ OBD_ALLOC_PTR(llh);
+ if (llh == NULL)
+ RETURN(-ENOMEM);
+ handle->lgh_hdr = llh;
+ /* first assign flags to use llog_client_ops */
+ llh->llh_flags = flags;
+ rc = llog_read_header(env, handle, uuid);
+ if (rc == 0) {
+ if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
+ flags & LLOG_F_IS_CAT) ||
+ (llh->llh_flags & LLOG_F_IS_CAT &&
+ flags & LLOG_F_IS_PLAIN))) {
+ CERROR("%s: llog type is %s but initializing %s\n",
+ handle->lgh_ctxt->loc_obd->obd_name,
+ llh->llh_flags & LLOG_F_IS_CAT ?
+ "catalog" : "plain",
+ flags & LLOG_F_IS_CAT ? "catalog" : "plain");
+ GOTO(out, rc = -EINVAL);
+ } else if (llh->llh_flags &
+ (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
+ /*
+ * it is possible to open llog without specifying llog
+ * type so it is taken from llh_flags
+ */
+ flags = llh->llh_flags;
+ } else {
+ /* for some reason the llh_flags has no type set */
+ CERROR("llog type is not specified!\n");
+ GOTO(out, rc = -EINVAL);
+ }
+ if (unlikely(uuid &&
+ !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
+ CERROR("%s: llog uuid mismatch: %s/%s\n",
+ handle->lgh_ctxt->loc_obd->obd_name,
+ (char *)uuid->uuid,
+ (char *)llh->llh_tgtuuid.uuid);
+ GOTO(out, rc = -EEXIST);
+ }
+ }
+ if (flags & LLOG_F_IS_CAT) {
LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
llh->llh_size = sizeof(struct llog_logid_rec);
} else if (!(flags & LLOG_F_IS_PLAIN)) {
- CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
+ CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
+ handle->lgh_ctxt->loc_obd->obd_name,
flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
rc = -EINVAL;
}
-
+out:
if (rc) {
OBD_FREE_PTR(llh);
handle->lgh_hdr = NULL;
}
EXPORT_SYMBOL(llog_init_handle);
-int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
-{
- struct llog_operations *lop;
- int rc;
- ENTRY;
-
- rc = llog_handle2ops(loghandle, &lop);
- if (rc)
- GOTO(out, rc);
- if (lop->lop_close == NULL)
- GOTO(out, -EOPNOTSUPP);
- rc = lop->lop_close(env, loghandle);
- out:
- llog_free_handle(loghandle);
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_close);
-
static int llog_process_thread(void *arg)
{
struct llog_process_info *lpi = arg;
OBD_FREE_PTR(lpi);
RETURN(rc);
}
+EXPORT_SYMBOL(llog_process_or_fork);
int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
llog_cb_t cb, void *data, void *catdata)
{
- return llog_process_or_fork(env, loghandle, cb, data, catdata, false);
+ return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
}
EXPORT_SYMBOL(llog_process);
RETURN(rc);
}
EXPORT_SYMBOL(llog_reverse_process);
+
+/**
+ * new llog API
+ *
+ * API functions:
+ * llog_open - open llog, may not exist
+ * llog_exist - check if llog exists
+ * llog_close - close opened llog, pair for open, frees llog_handle
+ * llog_declare_create - declare llog creation
+ * llog_create - create new llog on disk, need transaction handle
+ * llog_declare_write_rec - declaration of llog write
+ * llog_write_rec - write llog record on disk, need transaction handle
+ * llog_declare_add - declare llog catalog record addition
+ * llog_add - add llog record in catalog, need transaction handle
+ */
+int llog_exist(struct llog_handle *loghandle)
+{
+ struct llog_operations *lop;
+ int rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_exist == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_exist(loghandle);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_exist);
+
+int llog_declare_create(const struct lu_env *env,
+ struct llog_handle *loghandle, struct thandle *th)
+{
+ struct llog_operations *lop;
+ int raised, rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_declare_create == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lop->lop_declare_create(env, loghandle, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_create);
+
+int llog_create(const struct lu_env *env, struct llog_handle *handle,
+ struct thandle *th)
+{
+ struct llog_operations *lop;
+ int raised, rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_create == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lop->lop_create(env, handle, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_create);
+
+int llog_declare_write_rec(const struct lu_env *env,
+ struct llog_handle *handle,
+ struct llog_rec_hdr *rec, int idx,
+ struct thandle *th)
+{
+ struct llog_operations *lop;
+ int raised, rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+ LASSERT(lop);
+ if (lop->lop_declare_write_rec == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_write_rec);
+
+int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
+ struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
+ int numcookies, void *buf, int idx, struct thandle *th)
+{
+ struct llog_operations *lop;
+ int raised, rc, buflen;
+
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+
+ LASSERT(lop);
+ if (lop->lop_write_rec == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ if (buf)
+ buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) +
+ sizeof(struct llog_rec_tail);
+ else
+ buflen = rec->lrh_len;
+ LASSERT(cfs_size_round(buflen) == buflen);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies,
+ buf, idx, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_write_rec);
+
+int llog_add(const struct lu_env *env, struct llog_handle *lgh,
+ struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
+ void *buf, struct thandle *th)
+{
+ int raised, rc;
+
+ ENTRY;
+
+ if (lgh->lgh_logops->lop_add == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, buf, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_add);
+
+int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
+ struct llog_rec_hdr *rec, struct thandle *th)
+{
+ int raised, rc;
+
+ ENTRY;
+
+ if (lgh->lgh_logops->lop_declare_add == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_add);
+
+/**
+ * Helper function to open llog or create it if doesn't exist.
+ * It hides all transaction handling from caller.
+ */
+int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
+ struct llog_handle **res, struct llog_logid *logid,
+ char *name)
+{
+ struct thandle *th;
+ int rc;
+
+ ENTRY;
+
+ rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
+ if (rc)
+ RETURN(rc);
+
+ if (llog_exist(*res))
+ RETURN(0);
+
+ if ((*res)->lgh_obj != NULL) {
+ struct dt_device *d;
+
+ d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
+
+ th = dt_trans_create(env, d);
+ if (IS_ERR(th))
+ GOTO(out, rc = PTR_ERR(th));
+
+ rc = llog_declare_create(env, *res, th);
+ if (rc == 0) {
+ rc = dt_trans_start_local(env, d, th);
+ if (rc == 0)
+ rc = llog_create(env, *res, th);
+ }
+ dt_trans_stop(env, d, th);
+ } else {
+ /* lvfs compat code */
+ LASSERT((*res)->lgh_file == NULL);
+ rc = llog_create(env, *res, NULL);
+ }
+out:
+ if (rc)
+ llog_close(env, *res);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_open_create);
+
+/**
+ * Helper function to delete existent llog.
+ */
+int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
+ struct llog_logid *logid, char *name)
+{
+ struct llog_handle *handle;
+ int rc = 0, rc2;
+
+ ENTRY;
+
+ /* nothing to erase */
+ if (name == NULL && logid == NULL)
+ RETURN(0);
+
+ rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
+ if (rc < 0)
+ RETURN(rc);
+
+ rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
+ if (rc == 0)
+ rc = llog_destroy(env, handle);
+
+ rc2 = llog_close(env, handle);
+ if (rc == 0)
+ rc = rc2;
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_erase);
+
+/*
+ * Helper function for write record in llog.
+ * It hides all transaction handling from caller.
+ * Valid only with local llog.
+ */
+int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
+ struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
+ int cookiecount, void *buf, int idx)
+{
+ int rc;
+
+ ENTRY;
+
+ LASSERT(loghandle);
+ LASSERT(loghandle->lgh_ctxt);
+
+ if (loghandle->lgh_obj != NULL) {
+ struct dt_device *dt;
+ struct thandle *th;
+
+ dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
+
+ th = dt_trans_create(env, dt);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
+ if (rc)
+ GOTO(out_trans, rc);
+
+ rc = dt_trans_start_local(env, dt, th);
+ if (rc)
+ GOTO(out_trans, rc);
+
+ cfs_down_write(&loghandle->lgh_lock);
+ rc = llog_write_rec(env, loghandle, rec, reccookie,
+ cookiecount, buf, idx, th);
+ cfs_up_write(&loghandle->lgh_lock);
+out_trans:
+ dt_trans_stop(env, dt, th);
+ } else { /* lvfs compatibility */
+ cfs_down_write(&loghandle->lgh_lock);
+ rc = llog_write_rec(env, loghandle, rec, reccookie,
+ cookiecount, buf, idx, NULL);
+ cfs_up_write(&loghandle->lgh_lock);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_write);
+
+int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
+ struct llog_handle **lgh, struct llog_logid *logid,
+ char *name, enum llog_open_param open_param)
+{
+ int raised;
+ int rc;
+
+ ENTRY;
+
+ LASSERT(ctxt);
+ LASSERT(ctxt->loc_logops);
+
+ if (ctxt->loc_logops->lop_open == NULL) {
+ *lgh = NULL;
+ RETURN(-EOPNOTSUPP);
+ }
+
+ *lgh = llog_alloc_handle();
+ if (*lgh == NULL)
+ RETURN(-ENOMEM);
+ (*lgh)->lgh_ctxt = ctxt;
+ (*lgh)->lgh_logops = ctxt->loc_logops;
+
+ raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+ if (!raised)
+ cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+ rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
+ if (!raised)
+ cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+ if (rc) {
+ llog_free_handle(*lgh);
+ *lgh = NULL;
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_open);
+
+int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
+{
+ struct llog_operations *lop;
+ int rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ GOTO(out, rc);
+ if (lop->lop_close == NULL)
+ GOTO(out, -EOPNOTSUPP);
+ rc = lop->lop_close(env, loghandle);
+out:
+ llog_free_handle(loghandle);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_close);