4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/llog.c
34 * OST<->MDS recovery logging infrastructure.
35 * Invariants in implementation:
36 * - we do not share logs among different OST<->MDS connections, so that
37 * if an OST or MDS fails it need only look at log(s) relevant to itself
39 * Author: Andreas Dilger <adilger@clusterfs.com>
40 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41 * Author: Mikhail Pershin <tappro@whamcloud.com>
44 #define DEBUG_SUBSYSTEM S_LOG
46 #include <linux/pid_namespace.h>
47 #include <linux/kthread.h>
48 #include <llog_swab.h>
49 #include <lustre_log.h>
50 #include <obd_support.h>
51 #include <obd_class.h>
52 #include "llog_internal.h"
54 * Allocate a new log or catalog handle
55 * Used inside llog_open().
/*
 * Allocate a llog handle and initialise its rw-semaphore, header mutex,
 * plain-log list entry and reference count (which starts at 1).
 * NOTE(review): braces, the action taken when the allocation fails and the
 * final return are not present in this extract of the file.
 */
57 static struct llog_handle *llog_alloc_handle(void)
59 struct llog_handle *loghandle;
61 OBD_ALLOC_PTR(loghandle);
/* allocation failure check; the statement it guards is elided here */
62 if (loghandle == NULL)
65 init_rwsem(&loghandle->lgh_lock);
66 mutex_init(&loghandle->lgh_hdr_mutex);
67 INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
/* the caller receives the handle holding one reference */
68 atomic_set(&loghandle->lgh_refcount, 1);
74 * Free llog handle and header data if exists. Used in llog_close() only
/*
 * Release a llog handle. If a header buffer is attached, assert that the
 * handle is no longer linked on the list matching its header type flag,
 * free the header buffer, then free the handle itself.
 * NOTE(review): the statement guarded by the lgh_hdr == NULL check is
 * elided in this extract.
 */
76 static void llog_free_handle(struct llog_handle *loghandle)
78 LASSERT(loghandle != NULL);
80 /* failed llog_init_handle */
81 if (loghandle->lgh_hdr == NULL)
/* plain logs are checked on phd_entry, catalogs on chd_head */
84 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
85 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
86 else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
87 LASSERT(list_empty(&loghandle->u.chd.chd_head));
/* the header is freed with the size recorded in lgh_hdr_size */
88 OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
90 OBD_FREE_PTR(loghandle);
/* Take one additional reference on the llog handle. */
93 void llog_handle_get(struct llog_handle *loghandle)
95 atomic_inc(&loghandle->lgh_refcount);
/*
 * Drop one reference on the llog handle; the handle is freed through
 * llog_free_handle() when the count reaches zero.
 */
98 void llog_handle_put(struct llog_handle *loghandle)
/* a put without a matching reference is a caller bug */
100 LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
101 if (atomic_dec_and_test(&loghandle->lgh_refcount))
102 llog_free_handle(loghandle);
/*
 * Declare the destruction of a llog inside transaction 'th' by dispatching
 * to the lop_declare_destroy method of the handle's operations table.
 * NOTE(review): the tail of the parameter list, the error checks and the
 * return statements are elided in this extract.
 */
105 static int llog_declare_destroy(const struct lu_env *env,
106 struct llog_handle *handle,
109 struct llog_operations *lop;
/* look up the operations table bound to this handle */
114 rc = llog_handle2ops(handle, &lop);
/* the method pointer may be unset; the handling of that case is elided */
117 if (lop->lop_declare_destroy == NULL)
120 rc = lop->lop_declare_destroy(env, handle, th);
/*
 * Destroy a llog inside an already started transaction 'th' by calling the
 * handle's lop_destroy method. The handle must have a backing object.
 * NOTE(review): the tail of the parameter list and the statements guarded
 * by the checks below are elided in this extract.
 */
125 int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
128 struct llog_operations *lop;
132 rc = llog_handle2ops(handle, &lop);
135 if (lop->lop_destroy == NULL)
138 LASSERT(handle->lgh_obj != NULL);
/* checked before calling lop_destroy; the guarded statement is elided */
139 if (!dt_object_exists(handle->lgh_obj))
142 rc = lop->lop_destroy(env, handle, th);
/*
 * Destroy a llog, handling the transaction internally.
 *
 * When the handle has no backing object (client side destroy) lop_destroy
 * is called without a transaction. Otherwise a transaction is created on
 * the object's dt device, the destroy is declared, the transaction is
 * started locally, lop_destroy is run and the transaction is stopped.
 *
 * NOTE(review): braces, error checks, labels and return statements are
 * elided in this extract of the file.
 */
147 int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
149 struct llog_operations *lop;
150 struct dt_device *dt;
156 rc = llog_handle2ops(handle, &lop);
159 if (lop->lop_destroy == NULL)
162 if (handle->lgh_obj == NULL) {
163 /* if lgh_obj == NULL, then it is from client side destroy */
164 rc = lop->lop_destroy(env, handle, NULL);
168 if (!dt_object_exists(handle->lgh_obj))
171 dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
/* read-only device check; a single branch hint is sufficient */
173 if (unlikely(dt->dd_rdonly))
176 th = dt_trans_create(env, dt);
180 rc = llog_declare_destroy(env, handle, th);
184 rc = dt_trans_start_local(env, dt, th);
188 rc = lop->lop_destroy(env, handle, th);
191 dt_trans_stop(env, dt, th);
195 EXPORT_SYMBOL(llog_destroy);
197 /* returns negative on error; 0 if success; 1 if success & log destroyed */
/*
 * Cancel the record at 'index' in a llog.
 *
 * Inside a local transaction the record's bit is cleared in the header
 * bitmap, llh_count is decremented and the header is rewritten. If the llog
 * carries LLOG_F_ZAP_WHEN_EMPTY, only the header record remains, and the
 * log is either full or is not its catalog's current log, the llog is
 * destroyed in the same transaction. When the operation fails after the
 * count was decremented, the bit and the count are restored.
 *
 * NOTE(review): the tail of the parameter list, several declarations,
 * braces, error checks, labels and return statements are elided in this
 * extract of the file.
 */
198 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
201 struct llog_thread_info *lgi = llog_info(env);
202 struct dt_device *dt;
203 struct llog_log_hdr *llh;
208 bool subtract_count = false;
212 LASSERT(loghandle != NULL);
213 LASSERT(loghandle->lgh_ctxt != NULL);
214 LASSERT(loghandle->lgh_obj != NULL);
216 llh = loghandle->lgh_hdr;
218 CDEBUG(D_RPCTRACE, "Canceling %d in log "DFID"\n", index,
219 PFID(&loghandle->lgh_id.lgl_oi.oi_fid));
222 CERROR("Can't cancel index 0 which is header\n");
226 dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
/* read-only device check; a single branch hint is sufficient */
228 if (unlikely(dt->dd_rdonly))
231 th = dt_trans_create(env, dt);
235 rc = llog_declare_write_rec(env, loghandle, &llh->llh_hdr, index, th);
/* a destroy may follow the cancel, so it has to be declared up front */
239 if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY)) {
240 rc = llog_declare_destroy(env, loghandle, th);
245 th->th_wait_submit = 1;
246 rc = dt_trans_start_local(env, dt, th);
250 down_write(&loghandle->lgh_lock);
252 mutex_lock(&loghandle->lgh_hdr_mutex);
253 if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
254 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
255 GOTO(out_unlock, rc);
258 loghandle->lgh_hdr->llh_count--;
/* remember that the in-memory header was changed, for rollback on error */
259 subtract_count = true;
261 /* Since llog_process_thread use lgi_cookie, it`s better to save them
262 * and restore after using
264 tmp_lgc_index = lgi->lgi_cookie.lgc_index;
265 /* Pass this index to llog_osd_write_rec(), which will use the index
266 * to only update the necessary bitmap. */
267 lgi->lgi_cookie.lgc_index = index;
269 rc = llog_write_rec(env, loghandle, &llh->llh_hdr, &lgi->lgi_cookie,
270 LLOG_HEADER_IDX, th);
271 lgi->lgi_cookie.lgc_index = tmp_lgc_index;
274 GOTO(out_unlock, rc);
276 if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
277 (llh->llh_count == 1) &&
278 ((loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
279 (loghandle->u.phd.phd_cat_handle != NULL &&
280 loghandle->u.phd.phd_cat_handle->u.chd.chd_current_log !=
282 /* never try to destroy it again */
283 llh->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
284 rc = llog_trans_destroy(env, loghandle, th);
286 /* Sigh, can not destroy the final plain llog, but
287 * the bitmap has been cleared, so the record can not
288 * be accessed anymore, let's return 0 for now, and
289 * the orphan will be handled by LFSCK. */
290 CERROR("%s: can't destroy empty llog "DFID": rc = %d\n",
291 loghandle->lgh_ctxt->loc_obd->obd_name,
292 PFID(&loghandle->lgh_id.lgl_oi.oi_fid), rc);
293 GOTO(out_unlock, rc = 0);
299 mutex_unlock(&loghandle->lgh_hdr_mutex);
300 up_write(&loghandle->lgh_lock);
302 rc1 = dt_trans_stop(env, dt, th);
/* roll back the in-memory bitmap bit and count if the cancel failed */
305 if (rc < 0 && subtract_count) {
306 mutex_lock(&loghandle->lgh_hdr_mutex);
307 loghandle->lgh_hdr->llh_count++;
308 ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
309 mutex_unlock(&loghandle->lgh_hdr_mutex);
/*
 * Read the llog header through the handle's lop_read_header method.
 *
 * When the method reports LLOG_EEMPTY, a fresh header is built in
 * handle->lgh_hdr: record type, length (the context's chunk size), index,
 * timestamp, target uuid, bitmap offset, a zeroed bitmap with only bit 0
 * set (the header record) and a matching tail.
 *
 * NOTE(review): braces, error checks and return statements are elided in
 * this extract of the file.
 */
314 int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
315 const struct obd_uuid *uuid)
317 struct llog_operations *lop;
321 rc = llog_handle2ops(handle, &lop);
325 if (lop->lop_read_header == NULL)
328 rc = lop->lop_read_header(env, handle);
329 if (rc == LLOG_EEMPTY) {
330 struct llog_log_hdr *llh = handle->lgh_hdr;
332 /* lrh_len should be initialized in llog_init_handle */
333 handle->lgh_last_idx = 0; /* header is record with index 0 */
334 llh->llh_count = 1; /* for the header record */
335 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
336 LASSERT(handle->lgh_ctxt->loc_chunk_size >=
337 LLOG_MIN_CHUNK_SIZE);
/* the header record occupies one whole chunk */
338 llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size;
339 llh->llh_hdr.lrh_index = 0;
340 llh->llh_timestamp = ktime_get_real_seconds();
342 memcpy(&llh->llh_tgtuuid, uuid,
343 sizeof(llh->llh_tgtuuid));
344 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
345 /* Since update llog header might also call this function,
346 * let's reset the bitmap to 0 here */
347 memset(LLOG_HDR_BITMAP(llh), 0, llh->llh_hdr.lrh_len -
348 llh->llh_bitmap_offset -
349 sizeof(llh->llh_tail));
/* bit 0 marks the header record itself as in use */
350 ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
/* the tail mirrors the length and index stored in the record header */
351 LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
352 LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
357 EXPORT_SYMBOL(llog_read_header);
/*
 * Attach a header buffer to an opened llog handle and read the header.
 *
 * A buffer of the context's chunk size is allocated and stored in
 * handle->lgh_hdr, the requested flags are placed in it and
 * llog_read_header() is called. The llog type found in the header is
 * checked against the requested type (plain vs. catalog), the target uuid
 * is compared with the one passed in, and for catalogs the handle list and
 * fixed record size are set up. On the error path the buffer is freed and
 * handle->lgh_hdr is reset to NULL.
 *
 * NOTE(review): braces, several conditions, labels and return statements
 * are elided in this extract of the file.
 */
359 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
360 int flags, struct obd_uuid *uuid)
362 struct llog_log_hdr *llh;
/* extension flags requested by the caller, re-applied to the header below */
363 enum llog_flag fmt = flags & LLOG_F_EXT_MASK;
365 int chunk_size = handle->lgh_ctxt->loc_chunk_size;
/* a handle must not be initialised twice */
368 LASSERT(handle->lgh_hdr == NULL);
370 LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE);
371 OBD_ALLOC_LARGE(llh, chunk_size);
375 handle->lgh_hdr = llh;
376 handle->lgh_hdr_size = chunk_size;
377 /* first assign flags to use llog_client_ops */
378 llh->llh_flags = flags;
379 rc = llog_read_header(env, handle, uuid);
/* refuse to open a plain llog as a catalog and vice versa */
381 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
382 flags & LLOG_F_IS_CAT) ||
383 (llh->llh_flags & LLOG_F_IS_CAT &&
384 flags & LLOG_F_IS_PLAIN))) {
385 CERROR("%s: llog type is %s but initializing %s\n",
386 handle->lgh_ctxt->loc_obd->obd_name,
387 llh->llh_flags & LLOG_F_IS_CAT ?
389 flags & LLOG_F_IS_CAT ? "catalog" : "plain");
390 GOTO(out, rc = -EINVAL);
391 } else if (llh->llh_flags &
392 (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
394 * it is possible to open llog without specifying llog
395 * type so it is taken from llh_flags
397 flags = llh->llh_flags;
399 /* for some reason the llh_flags has no type set */
400 CERROR("llog type is not specified!\n");
401 GOTO(out, rc = -EINVAL);
404 !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
405 CERROR("%s: llog uuid mismatch: %s/%s\n",
406 handle->lgh_ctxt->loc_obd->obd_name,
408 (char *)llh->llh_tgtuuid.uuid);
409 GOTO(out, rc = -EEXIST);
/* catalogs hold fixed-size llog_logid_rec records */
412 if (flags & LLOG_F_IS_CAT) {
413 LASSERT(list_empty(&handle->u.chd.chd_head));
414 INIT_LIST_HEAD(&handle->u.chd.chd_head);
415 llh->llh_size = sizeof(struct llog_logid_rec);
416 llh->llh_flags |= LLOG_F_IS_FIXSIZE;
417 } else if (!(flags & LLOG_F_IS_PLAIN)) {
418 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
419 handle->lgh_ctxt->loc_obd->obd_name,
420 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
423 llh->llh_flags |= fmt;
/* release the header buffer and detach it from the handle */
426 OBD_FREE_LARGE(llh, chunk_size);
427 handle->lgh_hdr = NULL;
431 EXPORT_SYMBOL(llog_init_handle);
/*
 * Walk the records of a llog and invoke the callback stored in the
 * llog_process_info passed as 'arg' on every record whose bit is set in
 * the header bitmap.
 *
 * Records are read one chunk at a time with llog_next_block(). The range
 * is taken from the optional llog_process_cat_data (lpcd_first_idx /
 * lpcd_last_idx), otherwise it runs up to the last bitmap index. A partial
 * chunk may be re-read once to pick up records appended concurrently. The
 * callback result LLOG_DEL_RECORD cancels the record just processed. On
 * -EIO a remote llog is reported for retry, while for a local llog the
 * remaining records that are set in the bitmap are cancelled.
 *
 * The thread-info cookie (lti->lgi_cookie) is loaded with the offset and
 * index of the current record around each callback and restored afterwards.
 *
 * NOTE(review): braces, several declarations, conditions, labels and
 * return statements are elided in this extract of the file.
 */
433 static int llog_process_thread(void *arg)
435 struct llog_process_info *lpi = arg;
436 struct llog_handle *loghandle = lpi->lpi_loghandle;
437 struct llog_log_hdr *llh = loghandle->lgh_hdr;
438 struct llog_process_cat_data *cd = lpi->lpi_catdata;
439 struct llog_thread_info *lti;
443 int rc = 0, index = 1, last_index;
445 int last_called_index = 0;
446 bool repeated = false;
/* without an environment there is no thread info to borrow a cookie from */
453 lti = lpi->lpi_env == NULL ? NULL : llog_info(lpi->lpi_env);
455 cur_offset = chunk_size = llh->llh_hdr.lrh_len;
456 /* expect chunk_size to be power of two */
457 LASSERT(is_power_of_2(chunk_size));
459 OBD_ALLOC_LARGE(buf, chunk_size);
461 lpi->lpi_rc = -ENOMEM;
466 last_called_index = cd->lpcd_first_idx;
467 index = cd->lpcd_first_idx + 1;
469 if (cd != NULL && cd->lpcd_last_idx)
470 last_index = cd->lpcd_last_idx;
472 last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
475 struct llog_rec_hdr *rec;
476 off_t chunk_offset = 0;
477 unsigned int buf_offset = 0;
481 /* skip records not set in bitmap */
482 while (index <= last_index &&
483 !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
486 /* There are no indices prior the last_index */
487 if (index > last_index)
490 CDEBUG(D_OTHER, "index: %d last_index %d\n", index,
494 /* get the buf with our target record; avoid old garbage */
495 memset(buf, 0, chunk_size);
496 /* the record index for outdated chunk data */
497 lh_last_idx = loghandle->lgh_last_idx + 1;
498 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
499 index, &cur_offset, buf, chunk_size);
501 CDEBUG(D_OTHER, "cur_offset %llu, chunk_offset %llu,"
502 " buf_offset %u, rc = %d\n", cur_offset,
503 (__u64)chunk_offset, buf_offset, rc);
504 /* we`ve tried to reread the chunk, but there is no
506 if (rc == -EIO && repeated && (chunk_offset + buf_offset) ==
512 /* NB: after llog_next_block() call the cur_offset is the
513 * offset of the next block after read one.
514 * The absolute offset of the current chunk is calculated
515 * from cur_offset value and stored in chunk_offset variable.
517 if ((cur_offset & (chunk_size - 1)) != 0) {
518 partial_chunk = true;
519 chunk_offset = cur_offset & ~(chunk_size - 1);
521 partial_chunk = false;
522 chunk_offset = cur_offset - chunk_size;
525 /* NB: when rec->lrh_len is accessed it is already swabbed
526 * since it is used at the "end" of the loop and the rec
527 * swabbing is done at the beginning of the loop. */
528 for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
529 (char *)rec < buf + chunk_size;
530 rec = llog_rec_hdr_next(rec)) {
532 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
535 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
536 lustre_swab_llog_rec(rec);
538 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
539 rec->lrh_type, rec->lrh_index);
541 /* for partial chunk the end of it is zeroed, check
542 * for index 0 to distinguish it. */
543 if (partial_chunk && rec->lrh_index == 0) {
544 /* concurrent llog_add() might add new records
545 * while llog_processing, check this is not
546 * the case and re-read the current chunk
549 /* lgh_last_idx could be less then index
550 * for catalog, if catalog is wrapped */
551 if ((index > loghandle->lgh_last_idx &&
552 !(loghandle->lgh_hdr->llh_flags &
553 LLOG_F_IS_CAT)) || repeated ||
554 (loghandle->lgh_obj != NULL &&
555 dt_object_remote(loghandle->lgh_obj)))
557 /* <2 records means no more records
558 * if the last record we processed was
559 * the final one, then the underlying
560 * object might have been destroyed yet.
561 * we better don't access that.. */
562 mutex_lock(&loghandle->lgh_hdr_mutex);
563 records = loghandle->lgh_hdr->llh_count;
564 mutex_unlock(&loghandle->lgh_hdr_mutex);
567 CDEBUG(D_OTHER, "Re-read last llog buffer for "
568 "new records, index %u, last %u\n",
569 index, loghandle->lgh_last_idx);
570 /* save offset inside buffer for the re-read */
571 buf_offset = (char *)rec - (char *)buf;
572 cur_offset = chunk_offset;
579 if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
580 CWARN("%s: invalid length %d in llog "DFID
581 "record for index %d/%d\n",
582 loghandle->lgh_ctxt->loc_obd->obd_name,
584 PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
585 rec->lrh_index, index);
587 GOTO(out, rc = -EINVAL);
590 if (rec->lrh_index < index) {
591 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
596 if (rec->lrh_index != index) {
597 CERROR("%s: "DFID" Invalid record: index %u"
598 " but expected %u\n",
599 loghandle->lgh_ctxt->loc_obd->obd_name,
600 PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
601 rec->lrh_index, index);
602 GOTO(out, rc = -ERANGE);
606 "lrh_index: %d lrh_len: %d (%d remains)\n",
607 rec->lrh_index, rec->lrh_len,
608 (int)(buf + chunk_size - (char *)rec));
610 /* lgh_cur_offset is used only at llog_test_3 */
611 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
614 OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PROCESS_TIMEOUT, 2);
616 /* if set, process the callback on this record */
617 if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
618 struct llog_cookie *lgc;
621 /* the bitmap could be changed during processing
622 * records from the chunk. For wrapped catalog
623 * it means we can read deleted record and try to
624 * process it. Check this case and reread the chunk.
625 * Checking the race with llog_add the bit is set
626 * after incrementation of lgh_last_idx */
627 if (index == lh_last_idx &&
629 (loghandle->lgh_last_idx + 1)) {
630 /* save offset inside buffer for
632 buf_offset = (char *)rec - (char *)buf;
633 cur_offset = chunk_offset;
/* borrow the cookie embedded in this thread's llog_thread_info */
639 lgc = &lti->lgi_cookie;
640 /* store lu_env for recursive calls */
641 tmp_off = lgc->lgc_offset;
642 tmp_idx = lgc->lgc_index;
644 lgc->lgc_offset = (char *)rec -
645 (char *)buf + chunk_offset;
646 lgc->lgc_index = rec->lrh_index;
648 /* using lu_env for passing record offset to
649 * llog_write through various callbacks */
650 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
652 last_called_index = index;
/* restore the cookie values saved before the callback */
655 lgc->lgc_offset = tmp_off;
656 lgc->lgc_index = tmp_idx;
659 if (rc == LLOG_PROC_BREAK) {
661 } else if (rc == LLOG_DEL_RECORD) {
662 rc = llog_cancel_rec(lpi->lpi_env,
668 /* some stupid callbacks directly cancel records
669 * and delete llog. Check it and stop
671 if (loghandle->lgh_hdr == NULL ||
672 loghandle->lgh_hdr->llh_count == 1)
675 /* exit if the last index is reached */
676 if (index >= last_index)
/* report back to the caller how far processing got */
684 cd->lpcd_last_idx = last_called_index;
686 if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
687 if (dt_object_remote(loghandle->lgh_obj)) {
688 /* If it is remote object, then -EIO might means
689 * disconnection or eviction, let's return -EAGAIN,
690 * so for update recovery log processing, it will
691 * retry until the umount or abort recovery, see
692 * lod_sub_recovery_thread() */
693 CERROR("%s retry remote llog process\n",
694 loghandle->lgh_ctxt->loc_obd->obd_name);
697 /* something bad happened to the processing of a local
698 * llog file, probably I/O error or the log got
699 * corrupted to be able to finally release the log we
700 * discard any remaining bits in the header */
701 CERROR("%s: Local llog found corrupted #"DOSTID":%x"
702 " %s index %d count %d\n",
703 loghandle->lgh_ctxt->loc_obd->obd_name,
704 POSTID(&loghandle->lgh_id.lgl_oi),
705 loghandle->lgh_id.lgl_ogen,
706 ((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" :
707 "plain"), index, llh->llh_count);
709 while (index <= last_index) {
710 if (ext2_test_bit(index,
711 LLOG_HDR_BITMAP(llh)) != 0)
712 llog_cancel_rec(lpi->lpi_env, loghandle,
720 OBD_FREE_LARGE(buf, chunk_size);
/*
 * Kernel-thread entry point used when llog processing is forked.
 *
 * The thread switches its nsproxy to the one of the reference task stored
 * in lpi_reftask (under task_lock), initialises its own lu_env, runs
 * llog_process_thread() and then signals lpi_completion.
 *
 * NOTE(review): braces, declarations of 'env' and 'rc', error checks and
 * return statements are elided in this extract of the file.
 */
725 static int llog_process_thread_daemonize(void *arg)
727 struct llog_process_info *lpi = arg;
730 struct nsproxy *new_ns, *curr_ns = current->nsproxy;
/* lpi_reftask's nsproxy is read while its task lock is held */
732 task_lock(lpi->lpi_reftask);
733 new_ns = lpi->lpi_reftask->nsproxy;
734 if (curr_ns != new_ns) {
737 current->nsproxy = new_ns;
738 /* XXX: we should call put_nsproxy() instead of
739 * atomic_dec(&ns->count) directly. But put_nsproxy() cannot be
740 * used outside of the kernel itself, because it calls
741 * free_nsproxy() which is not exported by the kernel
742 * (defined in kernel/nsproxy.c) */
743 atomic_dec(&curr_ns->count);
745 task_unlock(lpi->lpi_reftask);
749 /* client env has no keys, tags is just 0 */
750 rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
755 rc = llog_process_thread(arg);
/* wake up the waiter in llog_process_or_fork() */
759 complete(&lpi->lpi_completion);
/*
 * Process the records of a llog with callback 'cb', either in a newly
 * started kernel thread or in the calling context.
 *
 * A llog_process_info is filled with the handle, callback data and catalog
 * data. In the forked case a "llog_process_thread" kthread running
 * llog_process_thread_daemonize() is started and waited for through
 * lpi_completion; otherwise llog_process_thread() is called directly.
 *
 * NOTE(review): the allocation of 'lpi', the branch on 'fork', error
 * checks, cleanup and return statements are elided in this extract.
 */
763 int llog_process_or_fork(const struct lu_env *env,
764 struct llog_handle *loghandle,
765 llog_cb_t cb, void *data, void *catdata, bool fork)
767 struct llog_process_info *lpi;
774 CERROR("cannot alloc pointer\n");
777 lpi->lpi_loghandle = loghandle;
779 lpi->lpi_cbdata = data;
780 lpi->lpi_catdata = catdata;
783 struct task_struct *task;
785 /* The new thread can't use parent env,
786 * init the new one in llog_process_thread_daemonize. */
788 init_completion(&lpi->lpi_completion);
789 /* take reference to current, so that
790 * llog_process_thread_daemonize() can use it to switch to
791 * namespace associated with current */
792 lpi->lpi_reftask = current;
793 task = kthread_run(llog_process_thread_daemonize, lpi,
794 "llog_process_thread");
797 CERROR("%s: cannot start thread: rc = %d\n",
798 loghandle->lgh_ctxt->loc_obd->obd_name, rc);
/* block until the worker thread signals that it has finished */
801 wait_for_completion(&lpi->lpi_completion);
804 llog_process_thread(lpi);
812 EXPORT_SYMBOL(llog_process_or_fork);
/*
 * Process a llog through llog_process_or_fork() with fork enabled.
 * A result of LLOG_DEL_PLAIN is reported to the caller as 0; every other
 * result is passed through unchanged.
 */
814 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
815 llog_cb_t cb, void *data, void *catdata)
818 rc = llog_process_or_fork(env, loghandle, cb, data, catdata, true);
819 return rc == LLOG_DEL_PLAIN ? 0 : rc;
821 EXPORT_SYMBOL(llog_process);
/*
 * Walk the records of a llog from the highest index downwards and call
 * 'cb' on each record whose bit is set in the header bitmap.
 *
 * The range comes from the optional llog_process_cat_data in 'catdata'
 * (lpcd_first_idx / lpcd_last_idx); otherwise the walk starts at the last
 * bitmap index. Chunks are read with llog_prev_block() and records inside a
 * chunk are stepped backwards using the length stored in each record tail.
 * The callback result LLOG_DEL_RECORD cancels the record.
 *
 * NOTE(review): braces, the declaration of 'buf', several statements,
 * labels and return statements are elided in this extract of the file.
 */
823 int llog_reverse_process(const struct lu_env *env,
824 struct llog_handle *loghandle, llog_cb_t cb,
825 void *data, void *catdata)
827 struct llog_log_hdr *llh = loghandle->lgh_hdr;
828 struct llog_process_cat_data *cd = catdata;
830 int rc = 0, first_index = 1, index, idx;
831 __u32 chunk_size = llh->llh_hdr.lrh_len;
834 OBD_ALLOC_LARGE(buf, chunk_size);
839 first_index = cd->lpcd_first_idx + 1;
840 if (cd != NULL && cd->lpcd_last_idx)
841 index = cd->lpcd_last_idx;
843 index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
846 struct llog_rec_hdr *rec;
847 struct llog_rec_tail *tail;
849 /* skip records not set in bitmap */
850 while (index >= first_index &&
851 !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
854 LASSERT(index >= first_index - 1);
855 if (index == first_index - 1)
858 /* get the buf with our target record; avoid old garbage */
859 memset(buf, 0, chunk_size);
860 rc = llog_prev_block(env, loghandle, index, buf, chunk_size);
865 idx = rec->lrh_index;
866 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
/* step forward through the chunk until the record with 'index' is found */
867 while (idx < index) {
868 rec = (void *)rec + rec->lrh_len;
869 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
870 lustre_swab_llog_rec(rec);
873 LASSERT(idx == index);
/* the tail structure sits at the very end of the record */
874 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
876 /* process records in buffer, starting where we found one */
877 while ((void *)tail > buf) {
878 if (tail->lrt_index == 0)
879 GOTO(out, rc = 0); /* no more records */
881 /* if set, process the callback on this record */
882 if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
883 rec = (void *)tail - tail->lrt_len +
886 rc = cb(env, loghandle, rec, data);
887 if (rc == LLOG_PROC_BREAK) {
889 } else if (rc == LLOG_DEL_RECORD) {
890 rc = llog_cancel_rec(env, loghandle,
897 /* previous record, still in buffer? */
899 if (index < first_index)
/* move to the tail of the preceding record */
901 tail = (void *)tail - tail->lrt_len;
907 OBD_FREE_LARGE(buf, chunk_size);
910 EXPORT_SYMBOL(llog_reverse_process);
916 * llog_open - open llog, may not exist
917 * llog_exist - check if llog exists
918 * llog_close - close opened llog, pair for open, frees llog_handle
919 * llog_declare_create - declare llog creation
920 * llog_create - create new llog on disk, need transaction handle
921 * llog_declare_write_rec - declaration of llog write
922 * llog_write_rec - write llog record on disk, need transaction handle
923 * llog_declare_add - declare llog catalog record addition
924 * llog_add - add llog record in catalog, need transaction handle
/*
 * Query whether the llog behind 'loghandle' exists by dispatching to the
 * lop_exist method of the handle's operations table.
 * NOTE(review): error checks and return statements are elided in this
 * extract of the file.
 */
926 int llog_exist(struct llog_handle *loghandle)
928 struct llog_operations *lop;
933 rc = llog_handle2ops(loghandle, &lop);
936 if (lop->lop_exist == NULL)
939 rc = lop->lop_exist(loghandle);
942 EXPORT_SYMBOL(llog_exist);
/*
 * Declare the creation of a llog in transaction 'th' through the handle's
 * lop_declare_create method. CFS_CAP_SYS_RESOURCE is raised around the call
 * and lowered afterwards.
 * NOTE(review): the conditions around the raise/lower calls (presumably
 * keyed on 'raised' - confirm), error checks and return statements are
 * elided in this extract of the file.
 */
944 int llog_declare_create(const struct lu_env *env,
945 struct llog_handle *loghandle, struct thandle *th)
947 struct llog_operations *lop;
952 rc = llog_handle2ops(loghandle, &lop);
955 if (lop->lop_declare_create == NULL)
/* remember whether the capability was already raised on entry */
958 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
960 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
961 rc = lop->lop_declare_create(env, loghandle, th);
963 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
/*
 * Create a llog on disk through the handle's lop_create method inside
 * transaction 'th'. CFS_CAP_SYS_RESOURCE is raised around the call and
 * lowered afterwards.
 * NOTE(review): the tail of the parameter list, the conditions around the
 * raise/lower calls, error checks and return statements are elided in this
 * extract of the file.
 */
967 int llog_create(const struct lu_env *env, struct llog_handle *handle,
970 struct llog_operations *lop;
975 rc = llog_handle2ops(handle, &lop);
978 if (lop->lop_create == NULL)
/* remember whether the capability was already raised on entry */
981 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
983 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
984 rc = lop->lop_create(env, handle, th);
986 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
/*
 * Declare the write of record 'rec' at index 'idx' in transaction 'th'
 * through the handle's lop_declare_write_rec method. Returns -EOPNOTSUPP
 * when the method is not provided. CFS_CAP_SYS_RESOURCE is raised around
 * the call and lowered afterwards.
 * NOTE(review): the tail of the parameter list, the conditions around the
 * raise/lower calls and the remaining return statements are elided in this
 * extract of the file.
 */
990 int llog_declare_write_rec(const struct lu_env *env,
991 struct llog_handle *handle,
992 struct llog_rec_hdr *rec, int idx,
995 struct llog_operations *lop;
1000 rc = llog_handle2ops(handle, &lop);
1004 if (lop->lop_declare_write_rec == NULL)
1005 RETURN(-EOPNOTSUPP);
/* remember whether the capability was already raised on entry */
1007 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
1009 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
1010 rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
1012 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
/*
 * Write record 'rec' into a llog at index 'idx' inside transaction 'th'
 * through the handle's lop_write_rec method.
 *
 * The arguments are sanity-checked first: the handle, its backing object,
 * the transaction handle and the header must all be present, and an error
 * is logged for each missing one. Returns -EOPNOTSUPP when the method is
 * not provided. The record length must already be rounded as required by
 * cfs_size_round(). CFS_CAP_SYS_RESOURCE is raised around the call and
 * lowered afterwards.
 *
 * NOTE(review): the values returned by the sanity checks, the conditions
 * around the raise/lower calls and the remaining return statements are
 * elided in this extract of the file.
 */
1016 int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
1017 struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
1018 int idx, struct thandle *th)
1020 struct llog_operations *lop;
1021 int raised, rc, buflen;
1025 /* API sanity checks */
1026 if (handle == NULL) {
1027 CERROR("loghandle is missed\n");
1029 } else if (handle->lgh_obj == NULL) {
1030 CERROR("loghandle %p with NULL object\n",
1033 } else if (th == NULL) {
1034 CERROR("%s: missed transaction handle\n",
1035 handle->lgh_obj->do_lu.lo_dev->ld_obd->obd_name);
1037 } else if (handle->lgh_hdr == NULL) {
1038 CERROR("%s: loghandle %p with no header\n",
1039 handle->lgh_obj->do_lu.lo_dev->ld_obd->obd_name,
1044 rc = llog_handle2ops(handle, &lop);
1048 if (lop->lop_write_rec == NULL)
1049 RETURN(-EOPNOTSUPP);
/* the record length must equal its cfs_size_round() value */
1051 buflen = rec->lrh_len;
1052 LASSERT(cfs_size_round(buflen) == buflen);
/* remember whether the capability was already raised on entry */
1054 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
1056 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
1057 rc = lop->lop_write_rec(env, handle, rec, logcookies, idx, th);
1059 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
/*
 * Add record 'rec' to a llog through the lop_add method of the handle's
 * operations table (lgh_logops) inside transaction 'th'. Returns
 * -EOPNOTSUPP when the method is not provided. CFS_CAP_SYS_RESOURCE is
 * raised around the call and lowered afterwards.
 * NOTE(review): the tail of the parameter list, the conditions around the
 * raise/lower calls and the final return are elided in this extract.
 */
1063 int llog_add(const struct lu_env *env, struct llog_handle *lgh,
1064 struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
1071 if (lgh->lgh_logops->lop_add == NULL)
1072 RETURN(-EOPNOTSUPP);
/* remember whether the capability was already raised on entry */
1074 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
1076 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
1077 rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, th);
1079 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
1082 EXPORT_SYMBOL(llog_add);
/*
 * Declare the addition of record 'rec' in transaction 'th' through the
 * lop_declare_add method of the handle's operations table. Returns
 * -EOPNOTSUPP when the method is not provided. CFS_CAP_SYS_RESOURCE is
 * raised around the call and lowered afterwards.
 * NOTE(review): the conditions around the raise/lower calls and the final
 * return are elided in this extract of the file.
 */
1084 int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
1085 struct llog_rec_hdr *rec, struct thandle *th)
1091 if (lgh->lgh_logops->lop_declare_add == NULL)
1092 RETURN(-EOPNOTSUPP);
/* remember whether the capability was already raised on entry */
1094 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
1096 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
1097 rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
1099 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
1102 EXPORT_SYMBOL(llog_declare_add);
1105 * Helper function to open llog or create it if doesn't exist.
1106 * It hides all transaction handling from caller.
/*
 * Open a llog and create it on disk when it does not exist yet, handling
 * the transaction internally.
 *
 * The llog is opened with LLOG_OPEN_NEW. If it is not found to exist, a
 * transaction is created on the object's dt device, the creation is
 * declared, the transaction is started locally, the llog is created and the
 * transaction is stopped. The handle is closed again on the failure path.
 *
 * NOTE(review): the tail of the parameter list, braces, error checks,
 * labels and return statements are elided in this extract of the file.
 */
1108 int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
1109 struct llog_handle **res, struct llog_logid *logid,
1112 struct dt_device *d;
1118 rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
1122 if (llog_exist(*res))
1125 LASSERT((*res)->lgh_obj != NULL);
1127 d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
/* read-only device check; a single branch hint is sufficient */
1129 if (unlikely(d->dd_rdonly))
1132 th = dt_trans_create(env, d);
1134 GOTO(out, rc = PTR_ERR(th));
1136 /* Create update llog object synchronously, which
1137 * happens during initialization process see
1138 * lod_sub_prep_llog(), to make sure the update
1139 * llog object is created before cross-MDT writing
1140 * updates into the llog object */
1141 if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
1144 th->th_wait_submit = 1;
1145 rc = llog_declare_create(env, *res, th);
1147 rc = dt_trans_start_local(env, d, th);
1149 rc = llog_create(env, *res, th);
1151 dt_trans_stop(env, d, th);
1154 llog_close(env, *res);
1157 EXPORT_SYMBOL(llog_open_create);
1160 * Helper function to delete existent llog.
/*
 * Delete an existing llog identified by 'logid' or 'name'.
 *
 * The llog is opened with LLOG_OPEN_EXISTS, initialised as a plain llog,
 * destroyed with llog_destroy() and closed. Nothing is opened when both
 * 'name' and 'logid' are NULL.
 *
 * NOTE(review): error checks, the way 'rc' and 'rc2' are combined and the
 * return statements are elided in this extract of the file.
 */
1162 int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
1163 struct llog_logid *logid, char *name)
1165 struct llog_handle *handle;
1170 /* nothing to erase */
1171 if (name == NULL && logid == NULL)
1174 rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
1178 rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
1180 rc = llog_destroy(env, handle);
/* the close result is kept separately from the destroy result */
1182 rc2 = llog_close(env, handle);
1187 EXPORT_SYMBOL(llog_erase);
1190 * Helper function for write record in llog.
1191 * It hides all transaction handling from caller.
1192 * Valid only with local llog.
/*
 * Write record 'rec' into a local llog, handling the transaction
 * internally.
 *
 * A transaction is created on the object's dt device, the write is
 * declared and the transaction started locally. Under the handle's write
 * lock the record is written with llog_write_rec(). When 'idx' is neither
 * LLOG_HEADER_IDX nor LLOG_NEXT_IDX, the cookie from this thread's
 * llog_thread_info is passed along with rec->lrh_index, and a result of 1
 * is mapped to 0 because the caller supplied no cookie of its own.
 * Otherwise the record is written at 'idx' without a cookie.
 *
 * NOTE(review): braces, some declarations, error checks, the branch on
 * 'need_cookie', labels and return statements are elided in this extract.
 */
1194 int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
1195 struct llog_rec_hdr *rec, int idx)
1197 struct dt_device *dt;
1205 LASSERT(loghandle->lgh_ctxt);
1206 LASSERT(loghandle->lgh_obj != NULL);
1208 dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
/* read-only device check; a single branch hint is sufficient */
1210 if (unlikely(dt->dd_rdonly))
1213 th = dt_trans_create(env, dt);
1215 RETURN(PTR_ERR(th));
1217 rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
1219 GOTO(out_trans, rc);
1221 th->th_wait_submit = 1;
1222 rc = dt_trans_start_local(env, dt, th);
1224 GOTO(out_trans, rc);
/* a cookie is only needed when an explicit record index is rewritten */
1226 need_cookie = !(idx == LLOG_HEADER_IDX || idx == LLOG_NEXT_IDX);
1228 down_write(&loghandle->lgh_lock);
1230 struct llog_thread_info *lti = llog_info(env);
1232 /* cookie comes from llog_process_thread */
1233 rc = llog_write_rec(env, loghandle, rec, &lti->lgi_cookie,
1234 rec->lrh_index, th);
1235 /* upper layer didn`t pass cookie so change rc */
1236 rc = (rc == 1 ? 0 : rc);
1238 rc = llog_write_rec(env, loghandle, rec, NULL, idx, th);
1241 up_write(&loghandle->lgh_lock);
1243 dt_trans_stop(env, dt, th);
1246 EXPORT_SYMBOL(llog_write);
/*
 * Open a llog identified by 'logid' or 'name' in context 'ctxt'.
 *
 * Returns -EOPNOTSUPP when the context's operations do not provide
 * lop_open. Otherwise a handle is allocated, bound to the context and its
 * operations, and lop_open is called with CFS_CAP_SYS_RESOURCE raised
 * around the call. The handle is freed again on the failure path.
 *
 * NOTE(review): braces, error checks, the conditions around the raise/lower
 * calls and the remaining return statements are elided in this extract.
 */
1248 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
1249 struct llog_handle **lgh, struct llog_logid *logid,
1250 char *name, enum llog_open_param open_param)
1258 LASSERT(ctxt->loc_logops);
1260 if (ctxt->loc_logops->lop_open == NULL) {
1262 RETURN(-EOPNOTSUPP);
1265 *lgh = llog_alloc_handle();
/* bind the new handle to its context and operations table */
1268 (*lgh)->lgh_ctxt = ctxt;
1269 (*lgh)->lgh_logops = ctxt->loc_logops;
/* remember whether the capability was already raised on entry */
1271 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
1273 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
1274 rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
1276 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
1278 llog_free_handle(*lgh);
1283 EXPORT_SYMBOL(llog_open);
/*
 * Close an opened llog: call the handle's lop_close method (rc is set to
 * -EOPNOTSUPP when it is missing) and drop the handle reference with
 * llog_handle_put().
 * NOTE(review): error checks, the 'out' label and the return statement are
 * elided in this extract of the file.
 */
1285 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
1287 struct llog_operations *lop;
1292 rc = llog_handle2ops(loghandle, &lop);
1295 if (lop->lop_close == NULL)
1296 GOTO(out, rc = -EOPNOTSUPP);
1297 rc = lop->lop_close(env, loghandle);
/* releases the reference held on the handle */
1299 llog_handle_put(loghandle);
1302 EXPORT_SYMBOL(llog_close);
1305 * Helper function to get the llog size in records. It is used by MGS
1306 * mostly to check that config llog exists and contains data.
1308 * \param[in] env execution environment
1309 * \param[in] ctxt llog context
1310 * \param[in] name llog name
1312 * \retval true if there are records in llog besides a header
1313 * \retval false on error or llog without records
/*
 * Check whether the named llog holds any record besides its header.
 *
 * The llog is opened with LLOG_OPEN_EXISTS, initialised as a plain llog and
 * its size in records is fetched with llog_get_size() before it is closed.
 *
 * NOTE(review): the \retval lines in the comment preceding this function
 * read as inverted relative to the function name and to the "considered as
 * empty" remark below; the return statement is elided in this extract, so
 * confirm against the full source.
 */
1315 int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt,
1318 struct llog_handle *llh;
1321 rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
/* a missing llog is the expected failure; its handling is elided here */
1323 if (likely(rc == -ENOENT))
1328 rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1330 GOTO(out_close, rc);
1331 rc = llog_get_size(llh);
1334 llog_close(env, llh);
1336 /* The header is record 1, the llog is still considered as empty
1337 * if there is only header */
1340 EXPORT_SYMBOL(llog_is_empty);
/*
 * llog processing callback that copies one record: 'data' is the handle of
 * the destination llog and 'rec' is appended to it with llog_write() at
 * LLOG_NEXT_IDX. The source handle 'llh' is not used.
 */
1342 int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
1343 struct llog_rec_hdr *rec, void *data)
1345 struct llog_handle *copy_llh = data;
1347 /* Append all records */
1348 return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
1351 /* backup plain llog */
/*
 * Copy the plain llog 'name' from context 'ctxt' into a llog called
 * 'backup' in context 'bctxt'.
 *
 * Steps: open and initialise the original llog, erase any old backup (a
 * missing one is not an error), open/create the backup llog, refuse to copy
 * a llog onto itself (-EEXIST), initialise the backup handle and copy every
 * record with llog_copy_handler() via llog_process_or_fork(). Both handles
 * are closed before returning.
 *
 * NOTE(review): braces, error checks, labels, the trailing arguments of the
 * llog_process_or_fork() call and the return statements are elided in this
 * extract of the file.
 */
1352 int llog_backup(const struct lu_env *env, struct obd_device *obd,
1353 struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
1354 char *name, char *backup)
1356 struct llog_handle *llh, *bllh;
1361 /* open original log */
1362 rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1364 /* the -ENOENT case is also reported to the caller
1365 * but silently so it should handle that if needed.
1368 CERROR("%s: failed to open log %s: rc = %d\n",
1369 obd->obd_name, name, rc);
1373 rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1375 GOTO(out_close, rc);
1377 /* Make sure there's no old backup log */
1378 rc = llog_erase(env, bctxt, NULL, backup);
1379 if (rc < 0 && rc != -ENOENT)
1380 GOTO(out_close, rc);
1382 /* open backup log */
1383 rc = llog_open_create(env, bctxt, &bllh, NULL, backup);
1385 CERROR("%s: failed to open backup logfile %s: rc = %d\n",
1386 obd->obd_name, backup, rc);
1387 GOTO(out_close, rc);
1390 /* check that backup llog is not the same object as original one */
1391 if (llh->lgh_obj == bllh->lgh_obj) {
1392 CERROR("%s: backup llog %s to itself (%s), objects %p/%p\n",
1393 obd->obd_name, name, backup, llh->lgh_obj,
1395 GOTO(out_backup, rc = -EEXIST);
1398 rc = llog_init_handle(env, bllh, LLOG_F_IS_PLAIN, NULL);
1400 GOTO(out_backup, rc);
1402 /* Copy log record by record */
1403 rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
1406 CERROR("%s: failed to backup log %s: rc = %d\n",
1407 obd->obd_name, name, rc);
/* close the backup first, then the original */
1409 llog_close(env, bllh);
1411 llog_close(env, llh);
1414 EXPORT_SYMBOL(llog_backup);
1416 /* Get size of llog */
/*
 * Query the attributes of the llog's backing object through its
 * do_attr_get method; a failure is logged with the obd name and the llog
 * FID.
 * NOTE(review): the declarations of 'la' and 'rc', the error check and the
 * return statements (presumably the size taken from 'la' - confirm) are
 * elided in this extract of the file.
 */
1417 __u64 llog_size(const struct lu_env *env, struct llog_handle *llh)
1422 rc = llh->lgh_obj->do_ops->do_attr_get(env, llh->lgh_obj, &la);
1424 CERROR("%s: attr_get failed for "DFID": rc = %d\n",
1425 llh->lgh_ctxt->loc_obd->obd_name,
1426 PFID(&llh->lgh_id.lgl_oi.oi_fid), rc);
1432 EXPORT_SYMBOL(llog_size);