4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lustre/obdclass/llog.c
36 * OST<->MDS recovery logging infrastructure.
37 * Invariants in implementation:
38 * - we do not share logs among different OST<->MDS connections, so that
39 * if an OST or MDS fails it need only look at log(s) relevant to itself
41 * Author: Andreas Dilger <adilger@clusterfs.com>
42 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
43 * Author: Mikhail Pershin <tappro@whamcloud.com>
46 #define DEBUG_SUBSYSTEM S_LOG
49 #include <liblustre.h>
52 #include <obd_class.h>
53 #include <lustre_log.h>
54 #include "llog_internal.h"
57 * Allocate a new log or catalog handle
58 * Used inside llog_open().
60 struct llog_handle *llog_alloc_handle(void)
62 struct llog_handle *loghandle;
64 OBD_ALLOC_PTR(loghandle);
65 if (loghandle == NULL)
66 return ERR_PTR(-ENOMEM);
68 cfs_init_rwsem(&loghandle->lgh_lock);
69 cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
70 CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
76 * Free the llog handle and its header data, if any. Used by llog_close() and llog_open().
/*
 * Release \a loghandle and, when a header is attached to it, the header
 * buffer as well.
 *
 * NOTE(review): the embedded line numbers skip (78 -> 83, 83 -> 85,
 * 90 -> 93), so short statements such as braces, early exits and labels are
 * not visible in this excerpt.  The comments describe the visible lines only.
 */
78 void llog_free_handle(struct llog_handle *loghandle)
/* no header attached: presumably the header teardown below is skipped
 * (the statement controlled by this test is not visible -- confirm) */
83 if (!loghandle->lgh_hdr)
/* a plain log is taken off the list it is linked on through phd_entry */
85 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
86 cfs_list_del_init(&loghandle->u.phd.phd_entry);
/* a catalog must not have anything left on its chd_head list */
87 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
88 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
/* the header is freed with LLOG_CHUNK_SIZE, so its type must be that big */
89 LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
90 OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
/* finally release the handle itself */
93 OBD_FREE_PTR(loghandle);
/*
 * Cancel one record of the log behind \a loghandle by clearing its bit in
 * the header bitmap, then either destroy the log (when it has become empty
 * and is flagged LLOG_F_ZAP_WHEN_EMPTY) or write the updated header back.
 *
 * NOTE(review): the embedded line numbers skip throughout this function
 * (e.g. 97 -> 100, 105 -> 108, 115 -> 121, 146 -> 151), so the end of the
 * signature, several conditions, counter updates and the return statements
 * are not visible in this excerpt.  The comments describe visible lines only.
 */
96 /* returns negative on error; 0 if success; 1 if success & log destroyed */
97 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
100 struct llog_log_hdr *llh = loghandle->lgh_hdr;
104 CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
105 index, loghandle->lgh_id.lgl_oid);
/* index 0 is the header record and cannot be cancelled (the test guarding
 * this message is not visible here) */
108 CERROR("Can't cancel index 0 which is header\n");
/* the header bitmap is only modified with lgh_hdr_lock held */
112 cfs_spin_lock(&loghandle->lgh_hdr_lock);
/* a zero result from ext2_clear_bit() is treated as "bit was already clear" */
113 if (!ext2_clear_bit(index, llh->llh_bitmap)) {
114 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
115 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
/* destroy the log when it is flagged zap-when-empty, llh_count is down to 1
 * (llog_read_header() sets 1 "for the header record") and the last used
 * index is the last bit of the bitmap */
121 if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
122 (llh->llh_count == 1) &&
123 (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
/* the spinlock is dropped before calling llog_destroy() */
124 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
125 rc = llog_destroy(env, loghandle);
127 CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64
129 loghandle->lgh_ctxt->loc_obd->obd_name,
130 loghandle->lgh_id.lgl_oid,
131 loghandle->lgh_id.lgl_oseq,
132 loghandle->lgh_id.lgl_ogen, rc);
137 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
/* write the header record (with the updated bitmap) back to the log */
139 rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
141 CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
143 loghandle->lgh_ctxt->loc_obd->obd_name,
144 loghandle->lgh_id.lgl_oid,
145 loghandle->lgh_id.lgl_oseq,
146 loghandle->lgh_id.lgl_ogen, rc);
/* the bit is set again under the lock; presumably this rolls the in-memory
 * header back after a failed header write (line 153 is not visible --
 * confirm what else is restored there) */
151 cfs_spin_lock(&loghandle->lgh_hdr_lock);
152 ext2_set_bit(index, llh->llh_bitmap);
154 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
157 EXPORT_SYMBOL(llog_cancel_rec);
/*
 * Read the header of \a handle through the backend's lop_read_header()
 * method and, when the backend answers LLOG_EEMPTY, fill in the in-memory
 * header (handle->lgh_hdr) for a new, empty log instead.
 *
 * NOTE(review): the embedded line numbers skip (163 -> 166, 166 -> 170,
 * 170 -> 173, 182 -> 184, 187 -> 193); the rc declaration, the error returns
 * and the final return value are not visible in this excerpt.
 */
159 static int llog_read_header(const struct lu_env *env,
160 struct llog_handle *handle,
161 struct obd_uuid *uuid)
163 struct llog_operations *lop;
/* look up the operations table that belongs to this handle */
166 rc = llog_handle2ops(handle, &lop);
/* the read-header method may be absent from the operations table */
170 if (lop->lop_read_header == NULL)
173 rc = lop->lop_read_header(env, handle);
/* nothing stored yet: describe a fresh log whose only record is the header */
174 if (rc == LLOG_EEMPTY) {
175 struct llog_log_hdr *llh = handle->lgh_hdr;
177 handle->lgh_last_idx = 0; /* header is record with index 0 */
178 llh->llh_count = 1; /* for the header record */
/* record header and record tail both describe one LLOG_CHUNK_SIZE-sized
 * record at index 0 */
179 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
180 llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
181 llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
182 llh->llh_timestamp = cfs_time_current_sec();
/* store the caller's target uuid in the header.
 * NOTE(review): llog_erase() reaches this function with uuid == NULL via
 * llog_init_handle(); confirm that hidden line 183 guards this copy */
184 memcpy(&llh->llh_tgtuuid, uuid,
185 sizeof(llh->llh_tgtuuid));
186 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
/* bit 0 of the bitmap stands for the header record: mark it in use */
187 ext2_set_bit(0, llh->llh_bitmap);
/*
 * Attach a header to a freshly opened \a handle: the header is read (or
 * initialised) by llog_read_header(), its type flags are checked against the
 * requested \a flags and, if \a uuid is used for the check, against the
 * target uuid stored in the header.
 *
 * Visible error codes: -EINVAL for a plain/catalog type mismatch or a header
 * without any type flag, -EEXIST for a uuid mismatch.
 *
 * NOTE(review): the embedded line numbers skip (196 -> 200, 200 -> 205,
 * 208 -> 210, 230 -> 233, 248 -> 254, 254 -> 258); the header allocation, the
 * tests on rc, the "out:" cleanup and the return are not visible in this
 * excerpt.
 */
193 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
194 int flags, struct obd_uuid *uuid)
196 struct llog_log_hdr *llh;
/* a handle may only be initialised once */
200 LASSERT(handle->lgh_hdr == NULL);
205 handle->lgh_hdr = llh;
206 /* first assign flags to use llog_client_ops */
207 llh->llh_flags = flags;
208 rc = llog_read_header(env, handle, uuid);
/* the header's type and the requested type must not contradict each other */
210 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
211 flags & LLOG_F_IS_CAT) ||
212 (llh->llh_flags & LLOG_F_IS_CAT &&
213 flags & LLOG_F_IS_PLAIN))) {
214 CERROR("%s: llog type is %s but initializing %s\n",
215 handle->lgh_ctxt->loc_obd->obd_name,
216 llh->llh_flags & LLOG_F_IS_CAT ?
218 flags & LLOG_F_IS_CAT ? "catalog" : "plain");
219 GOTO(out, rc = -EINVAL);
220 } else if (llh->llh_flags &
221 (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
223 * it is possible to open llog without specifying llog
224 * type so it is taken from llh_flags
226 flags = llh->llh_flags;
228 /* for some reason the llh_flags has no type set */
229 CERROR("llog type is not specified!\n");
230 GOTO(out, rc = -EINVAL);
/* the uuid recorded in the header has to match the caller's uuid (the first
 * half of this condition, on line 232, is not visible) */
233 !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
234 CERROR("%s: llog uuid mismatch: %s/%s\n",
235 handle->lgh_ctxt->loc_obd->obd_name,
237 (char *)llh->llh_tgtuuid.uuid);
238 GOTO(out, rc = -EEXIST);
/* catalog: (re)initialise the chd_head list, which is asserted to be empty
 * at this point, and record the size of a catalog entry in llh_size */
241 if (flags & LLOG_F_IS_CAT) {
242 LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
243 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
244 llh->llh_size = sizeof(struct llog_logid_rec);
245 } else if (!(flags & LLOG_F_IS_PLAIN)) {
246 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
247 handle->lgh_ctxt->loc_obd->obd_name,
248 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
/* the handle is left without a header again; presumably this is the failure
 * cleanup after "out:" (label and condition are not visible -- confirm) */
254 handle->lgh_hdr = NULL;
258 EXPORT_SYMBOL(llog_init_handle);
/*
 * Walk the records of the log described by \a arg (a struct
 * llog_process_info) in ascending index order and invoke lpi->lpi_cb on
 * every record whose bit is set in the header bitmap.  The log is read one
 * LLOG_CHUNK_SIZE block at a time through llog_next_block().
 *
 * When catalog data (lpi_catdata) is supplied, lpcd_first_idx and
 * lpcd_last_idx bound the range, and lpcd_last_idx is updated with the last
 * index the callback was invoked for.
 *
 * NOTE(review): the embedded line numbers skip throughout this function
 * (e.g. 271 -> 277, 279 -> 284, 297 -> 300, 368 -> 376, 381 -> 388), so the
 * buffer declaration, the outer loop header, index increments, several
 * conditions, labels and the return are not visible in this excerpt.  The
 * comments describe visible lines only.
 */
260 static int llog_process_thread(void *arg)
262 struct llog_process_info *lpi = arg;
263 struct llog_handle *loghandle = lpi->lpi_loghandle;
264 struct llog_log_hdr *llh = loghandle->lgh_hdr;
265 struct llog_process_cat_data *cd = lpi->lpi_catdata;
/* reading starts one LLOG_CHUNK_SIZE into the log; the header record
 * occupies LLOG_CHUNK_SIZE bytes (see the lrh_len set in llog_read_header()) */
267 __u64 cur_offset = LLOG_CHUNK_SIZE;
/* index 0 is the header, so the first candidate record has index 1 */
269 int rc = 0, index = 1, last_index;
271 int last_called_index = 0;
277 OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
/* the result is handed back to the caller through lpi_rc; here an
 * allocation failure (the test itself is not visible) */
279 lpi->lpi_rc = -ENOMEM;
/* catalog data, when given, moves the starting point past lpcd_first_idx */
284 last_called_index = cd->lpcd_first_idx;
285 index = cd->lpcd_first_idx + 1;
/* upper bound: lpcd_last_idx when set, otherwise the last bit of the bitmap */
287 if (cd != NULL && cd->lpcd_last_idx)
288 last_index = cd->lpcd_last_idx;
290 last_index = LLOG_BITMAP_BYTES * 8 - 1;
293 struct llog_rec_hdr *rec;
295 /* skip records not set in bitmap */
296 while (index <= last_index &&
297 !ext2_test_bit(index, llh->llh_bitmap))
/* index == last_index + 1 means no further bit is set in the range */
300 LASSERT(index <= last_index + 1);
301 if (index == last_index + 1)
304 CDEBUG(D_OTHER, "index: %d last_index %d\n",
307 /* get the buf with our target record; avoid old garbage */
308 memset(buf, 0, LLOG_CHUNK_SIZE);
/* remember where this block starts; it is used below for lgh_cur_offset */
309 last_offset = cur_offset;
310 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
311 index, &cur_offset, buf, LLOG_CHUNK_SIZE);
315 /* NB: when rec->lrh_len is accessed it is already swabbed
316 * since it is used at the "end" of the loop and the rec
317 * swabbing is done at the beginning of the loop. */
318 for (rec = (struct llog_rec_hdr *)buf;
319 (char *)rec < buf + LLOG_CHUNK_SIZE;
320 rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
322 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
325 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
326 lustre_swab_llog_rec(rec);
328 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
329 rec->lrh_type, rec->lrh_index);
/* a zero lrh_index means the zero-filled part of the buffer was reached:
 * re-read if the log has grown past "index" meanwhile, otherwise finish */
331 if (rec->lrh_index == 0) {
332 /* probably another rec just got added? */
333 if (index <= loghandle->lgh_last_idx)
334 GOTO(repeat, rc = 0);
335 GOTO(out, rc = 0); /* no more records */
/* a length of 0 would never advance the loop and a length above the chunk
 * size cannot fit in the buffer: treat both as corruption */
337 if (rec->lrh_len == 0 ||
338 rec->lrh_len > LLOG_CHUNK_SIZE) {
339 CWARN("invalid length %d in llog record for "
340 "index %d/%d\n", rec->lrh_len,
341 rec->lrh_index, index);
342 GOTO(out, rc = -EINVAL);
/* records in this block that precede the wanted index are passed over */
345 if (rec->lrh_index < index) {
346 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
352 "lrh_index: %d lrh_len: %d (%d remains)\n",
353 rec->lrh_index, rec->lrh_len,
354 (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
/* publish the position of the current record in the handle */
356 loghandle->lgh_cur_idx = rec->lrh_index;
357 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
360 /* if set, process the callback on this record */
361 if (ext2_test_bit(index, llh->llh_bitmap)) {
362 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
364 last_called_index = index;
/* LLOG_PROC_BREAK and LLOG_DEL_RECORD are special callback results; for the
 * latter the record is cancelled via llog_cancel_rec() */
365 if (rc == LLOG_PROC_BREAK) {
367 } else if (rc == LLOG_DEL_RECORD) {
368 llog_cancel_rec(lpi->lpi_env,
376 CDEBUG(D_OTHER, "Skipped index %d\n", index);
379 /* next record, still in buffer? */
381 if (index > last_index)
/* report the last index the callback ran for back through the catalog data
 * (the guard on cd is not visible here) */
388 cd->lpcd_last_idx = last_called_index;
390 OBD_FREE(buf, LLOG_CHUNK_SIZE);
/*
 * Thread entry point started by llog_process_or_fork(): daemonizes the
 * thread, initialises an lu_env of its own, runs llog_process_thread() on
 * \a arg and finally signals lpi_completion, which the creator waits on.
 *
 * NOTE(review): the embedded line numbers skip (398 -> 402, 405 -> 410,
 * 410 -> 414); the env/rc declarations, the handling of an lu_env_init()
 * failure, the env teardown and the return are not visible in this excerpt.
 */
396 static int llog_process_thread_daemonize(void *arg)
398 struct llog_process_info *lpi = arg;
402 cfs_daemonize_ctxt("llog_process_thread");
/* NOTE(review): the comment below says the tags are 0, but the call passes
 * LCT_LOCAL -- confirm which of the two is intended */
404 /* client env has no keys, tags is just 0 */
405 rc = lu_env_init(&env, LCT_LOCAL);
410 rc = llog_process_thread(arg);
/* wake up llog_process_or_fork(), which blocks on this completion */
414 cfs_complete(&lpi->lpi_completion);
/*
 * Process the records of \a loghandle with callback \a cb, either in a
 * newly created thread (llog_process_thread_daemonize(), whose completion is
 * waited for) or by calling llog_process_thread() directly.
 *
 * \a data is passed on as lpi_cbdata and \a catdata as lpi_catdata of the
 * struct llog_process_info that is handed to the worker.
 *
 * NOTE(review): the embedded line numbers skip (423 -> 430, 436 -> 440,
 * 448 -> 452, 452 -> 455, 455 -> 459); the allocation of lpi, the lpi_cb and
 * lpi_env assignments, the conditionals that select between the threaded
 * path and the two direct calls, and the return are not visible in this
 * excerpt.
 */
419 int llog_process_or_fork(const struct lu_env *env,
420 struct llog_handle *loghandle,
421 llog_cb_t cb, void *data, void *catdata, bool fork)
423 struct llog_process_info *lpi;
430 CERROR("cannot alloc pointer\n");
433 lpi->lpi_loghandle = loghandle;
435 lpi->lpi_cbdata = data;
436 lpi->lpi_catdata = catdata;
440 /* The new thread can't use parent env,
441 * init the new one in llog_process_thread_daemonize. */
/* the completion is signalled by the worker thread when it has finished */
443 cfs_init_completion(&lpi->lpi_completion);
444 rc = cfs_create_thread(llog_process_thread_daemonize, lpi,
447 CERROR("%s: cannot start thread: rc = %d\n",
448 loghandle->lgh_ctxt->loc_obd->obd_name, rc);
/* block until the worker thread has processed the log */
452 cfs_wait_for_completion(&lpi->lpi_completion);
/* direct (same-thread) processing; two call sites are visible, presumably
 * one for fork == false and one for a build without thread support --
 * the selecting conditionals are not visible, confirm */
455 llog_process_thread(lpi);
459 llog_process_thread(lpi);
466 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
467 llog_cb_t cb, void *data, void *catdata)
469 return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
471 EXPORT_SYMBOL(llog_process);
473 inline int llog_get_size(struct llog_handle *loghandle)
475 if (loghandle && loghandle->lgh_hdr)
476 return loghandle->lgh_hdr->llh_count;
479 EXPORT_SYMBOL(llog_get_size);
/*
 * Walk the records of \a loghandle in descending index order and invoke
 * \a cb (with \a data) on every record whose bit is set in the header
 * bitmap.  Blocks are fetched with llog_prev_block() and walked backwards
 * through the record tails.
 *
 * \a catdata, when not NULL, is a struct llog_process_cat_data whose
 * lpcd_first_idx / lpcd_last_idx bound the index range.
 *
 * NOTE(review): the embedded line numbers skip throughout this function
 * (e.g. 488 -> 491, 491 -> 496, 508 -> 511, 517 -> 523, 548 -> 556,
 * 560 -> 566), so the buffer declaration, the outer loop header, index
 * decrements, several conditions, the "out:" label and the return are not
 * visible in this excerpt.  The comments describe visible lines only.
 */
481 int llog_reverse_process(const struct lu_env *env,
482 struct llog_handle *loghandle, llog_cb_t cb,
483 void *data, void *catdata)
485 struct llog_log_hdr *llh = loghandle->lgh_hdr;
486 struct llog_process_cat_data *cd = catdata;
/* index 0 is the header, so the lowest candidate record has index 1 */
488 int rc = 0, first_index = 1, index, idx;
491 OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
/* lower bound taken from the catalog data (its guard is not visible here) */
496 first_index = cd->lpcd_first_idx + 1;
/* starting point: lpcd_last_idx when set, otherwise the last bitmap bit */
497 if (cd != NULL && cd->lpcd_last_idx)
498 index = cd->lpcd_last_idx;
500 index = LLOG_BITMAP_BYTES * 8 - 1;
503 struct llog_rec_hdr *rec;
504 struct llog_rec_tail *tail;
506 /* skip records not set in bitmap */
507 while (index >= first_index &&
508 !ext2_test_bit(index, llh->llh_bitmap))
/* index == first_index - 1 means no further bit is set in the range */
511 LASSERT(index >= first_index - 1);
512 if (index == first_index - 1)
515 /* get the buf with our target record; avoid old garbage */
516 memset(buf, 0, LLOG_CHUNK_SIZE);
517 rc = llog_prev_block(env, loghandle, index, buf,
/* step forward through the block, record by record, until the record with
 * the wanted index is reached (the idx increment is not visible) */
523 idx = rec->lrh_index;
524 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
525 while (idx < index) {
526 rec = (void *)rec + rec->lrh_len;
527 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
528 lustre_swab_llog_rec(rec);
531 LASSERT(idx == index);
/* the tail occupies the last sizeof(*tail) bytes of the record */
532 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
534 /* process records in buffer, starting where we found one */
535 while ((void *)tail > buf) {
536 if (tail->lrt_index == 0)
537 GOTO(out, rc = 0); /* no more records */
539 /* if set, process the callback on this record */
540 if (ext2_test_bit(index, llh->llh_bitmap)) {
/* locate the start of the record from its tail and lrt_len */
541 rec = (void *)tail - tail->lrt_len +
544 rc = cb(env, loghandle, rec, data);
/* LLOG_PROC_BREAK and LLOG_DEL_RECORD are special callback results; for the
 * latter the record is cancelled via llog_cancel_rec() */
545 if (rc == LLOG_PROC_BREAK) {
547 } else if (rc == LLOG_DEL_RECORD) {
548 llog_cancel_rec(env, loghandle,
556 /* previous record, still in buffer? */
558 if (index < first_index)
/* move to the tail of the preceding record */
560 tail = (void *)tail - tail->lrt_len;
566 OBD_FREE(buf, LLOG_CHUNK_SIZE);
569 EXPORT_SYMBOL(llog_reverse_process);
572 * Helper function to open an llog, creating it if it doesn't exist.
573 * It hides all transaction handling from caller.
/*
 * Open the llog identified by \a logid / name with LLOG_OPEN_NEW and create
 * it when llog_exist() does not report it as existing.  For handles with an
 * lgh_obj the creation runs inside a local dt transaction that is created,
 * started and stopped here; otherwise llog_create() is called without a
 * transaction handle.
 *
 * NOTE(review): the embedded line numbers skip (576 -> 584, 584 -> 588,
 * 588 -> 591, 596 -> 598, 610 -> 614, 614 -> 617); the end of the signature,
 * the local declarations, the tests on rc, the "out:" label and the returns
 * are not visible in this excerpt.
 */
575 int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
576 struct llog_handle **res, struct llog_logid *logid,
584 rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
/* nothing to create when the log is already there */
588 if (llog_exist(*res))
591 if ((*res)->lgh_obj != NULL) {
/* the dt device is derived from the lu_device of the backing object */
594 d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
596 th = dt_trans_create(env, d);
598 GOTO(out, rc = PTR_ERR(th));
/* declare, start the transaction, create, stop the transaction */
600 rc = llog_declare_create(env, *res, th);
602 rc = dt_trans_start_local(env, d, th);
604 rc = llog_create(env, *res, th);
606 dt_trans_stop(env, d, th);
608 /* lvfs compat code */
609 LASSERT((*res)->lgh_file == NULL);
610 rc = llog_create(env, *res, NULL);
/* the handle is closed again here; presumably only on failure (the guarding
 * condition is not visible -- confirm) */
614 llog_close(env, *res);
617 EXPORT_SYMBOL(llog_open_create);
620 * Helper function to delete an existing llog.
/*
 * Delete the llog identified by \a logid or \a name: the log is opened with
 * LLOG_OPEN_EXISTS, initialised as a plain log (no uuid check, uuid is
 * passed as NULL), destroyed and closed again.
 *
 * NOTE(review): the embedded line numbers skip (625 -> 630, 631 -> 634,
 * 634 -> 638, 638 -> 640, 642 -> 647); the rc/rc2 declaration, the tests on
 * rc between the calls, the way rc and rc2 are combined and the returns are
 * not visible in this excerpt.
 */
622 int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
623 struct llog_logid *logid, char *name)
625 struct llog_handle *handle;
630 /* nothing to erase */
631 if (name == NULL && logid == NULL)
634 rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
638 rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
640 rc = llog_destroy(env, handle);
/* the close result is kept separately in rc2 so it does not overwrite rc */
642 rc2 = llog_close(env, handle);
647 EXPORT_SYMBOL(llog_erase);
/*
 * Allocate a handle for the llog identified by \a logid / \a name in context
 * \a ctxt, bind it to the context and its operations table, and let the
 * backend's lop_open() method open it according to \a open_param.  The new
 * handle is returned through \a lgh.
 *
 * NOTE(review): the embedded line numbers skip (651 -> 659, 661 -> 666,
 * 666 -> 669, 672 -> 674, 675 -> 677, 677 -> 679, 679 -> 684); the local
 * declarations, the body of the lop_open == NULL branch, the check of the
 * allocation result, the conditions around the capability calls, the
 * condition guarding llog_free_handle() and the returns are not visible in
 * this excerpt.
 */
649 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
650 struct llog_handle **lgh, struct llog_logid *logid,
651 char *name, enum llog_open_param open_param)
659 LASSERT(ctxt->loc_logops);
/* opening is impossible without a lop_open() method */
661 if (ctxt->loc_logops->lop_open == NULL) {
/* NOTE(review): llog_alloc_handle() reports failure as ERR_PTR(-ENOMEM) and
 * never returns NULL; make sure the check on the hidden lines 667-668 uses
 * IS_ERR(), otherwise the dereference below hits an error pointer */
666 *lgh = llog_alloc_handle();
669 (*lgh)->lgh_ctxt = ctxt;
670 (*lgh)->lgh_logops = ctxt->loc_logops;
/* remember whether CFS_CAP_SYS_RESOURCE was already raised, so the
 * capability can be put back to its previous state after lop_open()
 * (the conditions on "raised" are not visible here) */
672 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
674 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
675 rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
677 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
/* the handle is released again; presumably only when lop_open() failed (the
 * guarding condition is not visible -- confirm) */
679 llog_free_handle(*lgh);
684 EXPORT_SYMBOL(llog_open);
686 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
688 struct llog_operations *lop;
693 rc = llog_handle2ops(loghandle, &lop);
696 if (lop->lop_close == NULL)
697 GOTO(out, -EOPNOTSUPP);
698 rc = lop->lop_close(env, loghandle);
700 llog_free_handle(loghandle);
703 EXPORT_SYMBOL(llog_close);