X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog.c;h=932b3847b8013ddc72e271f3739bcb13bcac2f37;hb=e2677595ab7fff054c5a728cb1b3e5080bd01f5e;hp=a69023305e054b5c8ef792dd1cfa9d63efe934d7;hpb=c3d20d0a57e1eebd021ec34323b1223a20e7df16;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index a690233..932b384 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -47,9 +47,6 @@ #define DEBUG_SUBSYSTEM S_LOG -#ifndef __KERNEL__ -#include -#endif #include #include @@ -69,8 +66,8 @@ struct llog_handle *llog_alloc_handle(void) init_rwsem(&loghandle->lgh_lock); spin_lock_init(&loghandle->lgh_hdr_lock); - CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry); - cfs_atomic_set(&loghandle->lgh_refcount, 1); + INIT_LIST_HEAD(&loghandle->u.phd.phd_entry); + atomic_set(&loghandle->lgh_refcount, 1); return loghandle; } @@ -87,9 +84,9 @@ void llog_free_handle(struct llog_handle *loghandle) goto out; if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) - LASSERT(cfs_list_empty(&loghandle->u.phd.phd_entry)); + LASSERT(list_empty(&loghandle->u.phd.phd_entry)); else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) - LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head)); + LASSERT(list_empty(&loghandle->u.chd.chd_head)); LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE); OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE); out: @@ -98,13 +95,13 @@ out: void llog_handle_get(struct llog_handle *loghandle) { - cfs_atomic_inc(&loghandle->lgh_refcount); + atomic_inc(&loghandle->lgh_refcount); } void llog_handle_put(struct llog_handle *loghandle) { - LASSERT(cfs_atomic_read(&loghandle->lgh_refcount) > 0); - if (cfs_atomic_dec_and_test(&loghandle->lgh_refcount)) + LASSERT(atomic_read(&loghandle->lgh_refcount) > 0); + if (atomic_dec_and_test(&loghandle->lgh_refcount)) llog_free_handle(loghandle); } @@ -146,11 +143,11 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, loghandle->lgh_id.lgl_ogen, rc); GOTO(out_err, rc); } - RETURN(1); + RETURN(LLOG_DEL_PLAIN); } spin_unlock(&loghandle->lgh_hdr_lock); - rc = llog_write(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0); + rc = llog_write(env, loghandle, &llh->llh_hdr, LLOG_HEADER_IDX); if (rc < 0) { CERROR("%s: fail to write header for llog #"DOSTID "#%08x: rc = %d\n", @@ -252,8 +249,8 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle, } } if (flags & LLOG_F_IS_CAT) { - LASSERT(cfs_list_empty(&handle->u.chd.chd_head)); - CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head); + LASSERT(list_empty(&handle->u.chd.chd_head)); + INIT_LIST_HEAD(&handle->u.chd.chd_head); llh->llh_size = sizeof(struct llog_logid_rec); } else if (!(flags & LLOG_F_IS_PLAIN)) { CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n", @@ -378,10 +375,9 @@ repeat: if (rc == LLOG_PROC_BREAK) { GOTO(out, rc); } else if (rc == LLOG_DEL_RECORD) { - llog_cancel_rec(lpi->lpi_env, - loghandle, - rec->lrh_index); - rc = 0; + rc = llog_cancel_rec(lpi->lpi_env, + loghandle, + rec->lrh_index); } if (rc) GOTO(out, rc); @@ -400,12 +396,25 @@ out: if (cd != NULL) cd->lpcd_last_idx = last_called_index; + if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) { + /* something bad happened to the processing of a local + * llog file, probably I/O error or the log got corrupted.. + * to be able to finally release the log we discard any + * remaining bits in the header */ + CERROR("Local llog found corrupted\n"); + while (index <= last_index) { + if (ext2_test_bit(index, llh->llh_bitmap) != 0) + llog_cancel_rec(lpi->lpi_env, loghandle, index); + index++; + } + rc = 0; + } + OBD_FREE(buf, LLOG_CHUNK_SIZE); lpi->lpi_rc = rc; return 0; } -#ifdef __KERNEL__ static int llog_process_thread_daemonize(void *arg) { struct llog_process_info *lpi = arg; @@ -427,7 +436,6 @@ out: complete(&lpi->lpi_completion); return rc; } -#endif int llog_process_or_fork(const struct lu_env *env, struct llog_handle *loghandle, @@ -448,39 +456,40 @@ int llog_process_or_fork(const struct lu_env *env, lpi->lpi_cbdata = data; lpi->lpi_catdata = catdata; -#ifdef __KERNEL__ if (fork) { + struct task_struct *task; + /* The new thread can't use parent env, * init the new one in llog_process_thread_daemonize. */ lpi->lpi_env = NULL; init_completion(&lpi->lpi_completion); - rc = PTR_ERR(kthread_run(llog_process_thread_daemonize, lpi, - "llog_process_thread")); - if (IS_ERR_VALUE(rc)) { + task = kthread_run(llog_process_thread_daemonize, lpi, + "llog_process_thread"); + if (IS_ERR(task)) { + rc = PTR_ERR(task); CERROR("%s: cannot start thread: rc = %d\n", loghandle->lgh_ctxt->loc_obd->obd_name, rc); - OBD_FREE_PTR(lpi); - RETURN(rc); + GOTO(out_lpi, rc); } wait_for_completion(&lpi->lpi_completion); } else { lpi->lpi_env = env; llog_process_thread(lpi); } -#else - lpi->lpi_env = env; - llog_process_thread(lpi); -#endif - rc = lpi->lpi_rc; - OBD_FREE_PTR(lpi); - RETURN(rc); + rc = lpi->lpi_rc; + +out_lpi: + OBD_FREE_PTR(lpi); + RETURN(rc); } EXPORT_SYMBOL(llog_process_or_fork); int llog_process(const struct lu_env *env, struct llog_handle *loghandle, llog_cb_t cb, void *data, void *catdata) { - return llog_process_or_fork(env, loghandle, cb, data, catdata, true); + int rc; + rc = llog_process_or_fork(env, loghandle, cb, data, catdata, true); + return rc == LLOG_DEL_PLAIN ? 0 : rc; } EXPORT_SYMBOL(llog_process); @@ -551,9 +560,8 @@ int llog_reverse_process(const struct lu_env *env, if (rc == LLOG_PROC_BREAK) { GOTO(out, rc); } else if (rc == LLOG_DEL_RECORD) { - llog_cancel_rec(env, loghandle, - tail->lrt_index); - rc = 0; + rc = llog_cancel_rec(env, loghandle, + tail->lrt_index); } if (rc) GOTO(out, rc); @@ -683,7 +691,7 @@ EXPORT_SYMBOL(llog_declare_write_rec); int llog_write_rec(const struct lu_env *env, struct llog_handle *handle, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, - int numcookies, void *buf, int idx, struct thandle *th) + int idx, struct thandle *th) { struct llog_operations *lop; int raised, rc, buflen; @@ -698,18 +706,13 @@ int llog_write_rec(const struct lu_env *env, struct llog_handle *handle, if (lop->lop_write_rec == NULL) RETURN(-EOPNOTSUPP); - if (buf) - buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) + - sizeof(struct llog_rec_tail); - else - buflen = rec->lrh_len; + buflen = rec->lrh_len; LASSERT(cfs_size_round(buflen) == buflen); raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE); if (!raised) cfs_cap_raise(CFS_CAP_SYS_RESOURCE); - rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies, - buf, idx, th); + rc = lop->lop_write_rec(env, handle, rec, logcookies, idx, th); if (!raised) cfs_cap_lower(CFS_CAP_SYS_RESOURCE); RETURN(rc); @@ -718,7 +721,7 @@ EXPORT_SYMBOL(llog_write_rec); int llog_add(const struct lu_env *env, struct llog_handle *lgh, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, - void *buf, struct thandle *th) + struct thandle *th) { int raised, rc; @@ -730,7 +733,7 @@ int llog_add(const struct lu_env *env, struct llog_handle *lgh, raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE); if (!raised) cfs_cap_raise(CFS_CAP_SYS_RESOURCE); - rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, buf, th); + rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, th); if (!raised) cfs_cap_lower(CFS_CAP_SYS_RESOURCE); RETURN(rc); @@ -836,8 +839,7 @@ EXPORT_SYMBOL(llog_erase); * Valid only with local llog. */ int llog_write(const struct lu_env *env, struct llog_handle *loghandle, - struct llog_rec_hdr *rec, struct llog_cookie *reccookie, - int cookiecount, void *buf, int idx) + struct llog_rec_hdr *rec, int idx) { struct dt_device *dt; struct thandle *th; @@ -864,8 +866,7 @@ int llog_write(const struct lu_env *env, struct llog_handle *loghandle, GOTO(out_trans, rc); down_write(&loghandle->lgh_lock); - rc = llog_write_rec(env, loghandle, rec, reccookie, - cookiecount, buf, idx, th); + rc = llog_write_rec(env, loghandle, rec, NULL, idx, th); up_write(&loghandle->lgh_lock); out_trans: dt_trans_stop(env, dt, th); @@ -929,6 +930,17 @@ out: } EXPORT_SYMBOL(llog_close); +/** + * Helper function to get the llog size in records. It is used by MGS + * mostly to check that config llog exists and contains data. + * + * \param[in] env execution environment + * \param[in] ctxt llog context + * \param[in] name llog name + * + * \retval true if there are records in llog besides a header + * \retval false on error or llog without records + */ int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt, char *name) { @@ -950,7 +962,8 @@ int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt, out_close: llog_close(env, llh); out: - /* header is record 1 */ + /* The header is record 1, the llog is still considered as empty + * if there is only header */ return (rc <= 1); } EXPORT_SYMBOL(llog_is_empty); @@ -961,7 +974,7 @@ int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh, struct llog_handle *copy_llh = data; /* Append all records */ - return llog_write(env, copy_llh, rec, NULL, 0, NULL, -1); + return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX); } EXPORT_SYMBOL(llog_copy_handler);