4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lustre/obdclass/llog.c
36 * OST<->MDS recovery logging infrastructure.
37 * Invariants in implementation:
38 * - we do not share logs among different OST<->MDS connections, so that
39 * if an OST or MDS fails it need only look at log(s) relevant to itself
41 * Author: Andreas Dilger <adilger@clusterfs.com>
44 #define DEBUG_SUBSYSTEM S_LOG
47 #include <liblustre.h>
50 #include <obd_class.h>
51 #include <lustre_log.h>
52 #include <libcfs/list.h>
53 #include "llog_internal.h"
55 /* Allocate a new log or catalog handle */
/*
 * Allocates a struct llog_handle with OBD_ALLOC and initialises its
 * lgh_lock read/write semaphore.
 *
 * Returns ERR_PTR(-ENOMEM) when the allocation fails, so callers must
 * test the result as an error pointer rather than against NULL.
 *
 * NOTE(review): the embedded line numbers skip 57, 59-60, 64 and 66-68,
 * so the braces and the success-path return are not visible in this
 * listing; presumably the new handle is returned -- confirm against the
 * full file.
 */
56 struct llog_handle *llog_alloc_handle(void)
58 struct llog_handle *loghandle;
61 OBD_ALLOC(loghandle, sizeof(*loghandle));
62 if (loghandle == NULL)
63 RETURN(ERR_PTR(-ENOMEM));
65 cfs_init_rwsem(&loghandle->lgh_lock);
69 EXPORT_SYMBOL(llog_alloc_handle);
/*
 * Release a log handle and, when present, its in-memory header.
 *
 * For a handle that has a header (lgh_hdr):
 *  - a plain log (LLOG_F_IS_PLAIN) is unlinked from the list it hangs on
 *    through u.phd.phd_entry;
 *  - a catalog (LLOG_F_IS_CAT) is asserted to have an empty u.chd.chd_head
 *    list;
 *  - the header is freed with size LLOG_CHUNK_SIZE.
 * The handle structure itself is then freed.
 *
 * NOTE(review): lines 73-76, 78 and 84-85 of the original are missing from
 * this listing, so the statement controlled by the !lgh_hdr test (and any
 * check made before it) cannot be seen here; presumably it skips straight
 * to freeing the handle -- confirm against the full file.
 */
72 void llog_free_handle(struct llog_handle *loghandle)
77 if (!loghandle->lgh_hdr)
79 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
80 cfs_list_del_init(&loghandle->u.phd.phd_entry);
81 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
82 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
83 OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
86 OBD_FREE(loghandle, sizeof(*loghandle));
88 EXPORT_SYMBOL(llog_free_handle);
90 /* returns negative on error; 0 if success; 1 if success & log destroyed */
/*
 * Cancel the record at position `index` of the log behind `loghandle` by
 * clearing its bit in the header bitmap (llh_bitmap).
 *
 * Visible behaviour:
 *  - index 0 is rejected with an error message because it is the header;
 *  - if the bit is reported as already clear, a debug message is logged;
 *  - when the header has LLOG_F_ZAP_WHEN_EMPTY set, llh_count is 1 and
 *    lgh_last_idx is the last bitmap slot, the log is destroyed with
 *    llog_destroy();
 *  - otherwise the header is written back with llog_write_rec();
 *  - on failure of either call the bitmap bit is set again so that the
 *    in-memory bitmap is rolled back.
 *
 * NOTE(review): the declaration of rc, the index == 0 test, the return
 * statements and any llh_count adjustments fall on lines that are missing
 * from this listing (94-96, 99-100, 102-104, 107-111, 116, 120-126, 128,
 * 131-134); the exact error codes cannot be confirmed from here.
 */
91 int llog_cancel_rec(struct llog_handle *loghandle, int index)
93 struct llog_log_hdr *llh = loghandle->lgh_hdr;
97 CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
98 index, loghandle->lgh_id.lgl_oid);
101 CERROR("Can't cancel index 0 which is header\n");
/* presumably ext2_clear_bit returns the previous bit value -- TODO confirm */
105 if (!ext2_clear_bit(index, llh->llh_bitmap)) {
106 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
/* destroy the log only when it is marked zap-when-empty, holds nothing but
 * the header record, and its last index is the final bitmap slot */
112 if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
113 (llh->llh_count == 1) &&
114 (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
115 rc = llog_destroy(loghandle);
117 CERROR("Failure destroying log after last cancel: %d\n",
/* roll back the bitmap change when the destroy failed */
119 ext2_set_bit(index, llh->llh_bitmap);
/* persist the updated header (bitmap) for the normal cancel path */
127 rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
129 CERROR("Failure re-writing header %d\n", rc);
/* roll back the bitmap change when the header could not be written */
130 ext2_set_bit(index, llh->llh_bitmap);
135 EXPORT_SYMBOL(llog_cancel_rec);
/*
 * Attach an in-memory log header to `handle` and initialise it.
 *
 * handle: must not already have a header (asserted).
 * flags:  stored into llh_flags before the header is read; after a read
 *         the flags are taken from the header itself.
 * uuid:   optional; when given it is compared with the target uuid found
 *         in the header and a mismatch is reported with CERROR.  The same
 *         uuid is copied into a freshly initialised header.
 *
 * Visible flow:
 *  1. allocate the header and publish it in handle->lgh_hdr;
 *  2. call llog_read_header();
 *  3. on one path, initialise a new header: index 0 is the header record,
 *     llh_count starts at 1, type/len/index/timestamp/bitmap offset are
 *     filled in and bit 0 of the bitmap is set;
 *  4. initialise the catalog list head (LLOG_F_IS_CAT, which also sets
 *     llh_size to sizeof(struct llog_logid_rec)) or the plain-log list
 *     entry (LLOG_F_IS_PLAIN); any other flag value is reported;
 *  5. a cleanup sequence frees the header and resets handle->lgh_hdr to
 *     NULL.
 *
 * NOTE(review): the declaration of rc, the allocation check, the tests on
 * rc after llog_read_header(), the GOTO/label lines, the guard around the
 * memcpy of uuid, and the condition guarding the cleanup are on lines
 * missing from this listing (139-140, 142, 144, 146-147, 152, 157-159,
 * 163-166, 173, 177-178, 184, 187-190, 193-195).  Which of the steps above
 * run on which path cannot be confirmed from here.
 */
137 int llog_init_handle(struct llog_handle *handle, int flags,
138 struct obd_uuid *uuid)
141 struct llog_log_hdr *llh;
143 LASSERT(handle->lgh_hdr == NULL);
145 OBD_ALLOC(llh, sizeof(*llh));
148 handle->lgh_hdr = llh;
149 /* first assign flags to use llog_client_ops */
150 llh->llh_flags = flags;
151 rc = llog_read_header(handle);
/* from here on the flags come from the header, not from the caller */
153 flags = llh->llh_flags;
154 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
155 CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
156 (char *)llh->llh_tgtuuid.uuid);
160 } else if (rc != LLOG_EEMPTY || !flags) {
161 /* set a pseudo flag for initialization */
162 flags = LLOG_F_IS_CAT;
167 handle->lgh_last_idx = 0; /* header is record with index 0 */
168 llh->llh_count = 1; /* for the header record */
169 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
170 llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
171 llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
172 llh->llh_timestamp = cfs_time_current_sec();
/* NOTE(review): uuid is optional at line 154; the guard for this copy is
 * presumably on the missing line 173 -- confirm */
174 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
175 llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
/* bit 0 marks the header record itself as in use */
176 ext2_set_bit(0, llh->llh_bitmap);
179 if (flags & LLOG_F_IS_CAT) {
180 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
181 llh->llh_size = sizeof(struct llog_logid_rec);
182 } else if (flags & LLOG_F_IS_PLAIN) {
183 CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
/* NOTE(review): the format string below has an unbalanced '(' */
185 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
186 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
/* cleanup: drop the header again and leave the handle without one */
191 OBD_FREE(llh, sizeof(*llh));
192 handle->lgh_hdr = NULL;
196 EXPORT_SYMBOL(llog_init_handle);
/*
 * Close a log handle through its operations table and free the handle.
 *
 * The operations are looked up with llog_handle2ops().  If the table has
 * no lop_close method the function jumps to `out` with -EOPNOTSUPP;
 * otherwise the result of lop_close() is kept in rc.  The handle is
 * released with llog_free_handle().
 *
 * NOTE(review): the declaration of rc, the error check after
 * llog_handle2ops(), the position of the `out` label and the final return
 * are on lines missing from this listing (199, 201-203, 205-206, 210,
 * 212-213); whether the handle is freed on every path cannot be confirmed
 * from here.
 */
198 int llog_close(struct llog_handle *loghandle)
200 struct llog_operations *lop;
204 rc = llog_handle2ops(loghandle, &lop);
207 if (lop->lop_close == NULL)
208 GOTO(out, -EOPNOTSUPP);
209 rc = lop->lop_close(loghandle);
211 llog_free_handle(loghandle);
214 EXPORT_SYMBOL(llog_close);
/*
 * Worker that walks one log in ascending index order and runs a callback
 * on every record whose bit is set in the header bitmap.
 *
 * arg is a struct llog_process_info carrying the log handle, the callback
 * (lpi_cb) and its data, optional catalog data (lpi_catdata) and flags.
 *
 * Range: when catalog data is used, processing starts after
 * cd->lpcd_first_idx; the upper bound is cd->lpcd_last_idx when that is
 * non-zero, otherwise the last bitmap slot (LLOG_BITMAP_BYTES * 8 - 1).
 * On the way out the index of the last record passed to the callback is
 * stored back into cd->lpcd_last_idx.
 *
 * Per iteration: cancelled indices are skipped, a LLOG_CHUNK_SIZE block is
 * fetched with llog_next_block() into a zeroed buffer, and the records in
 * that block are walked by lrh_len.  Records are byte-swapped first when
 * LLOG_REC_HDR_NEEDS_SWABBING says so.  A record with index 0 ends the
 * walk; a zero or oversized lrh_len aborts with -EINVAL.  A callback
 * result of LLOG_DEL_RECORD leads to llog_cancel_rec() on that record.
 *
 * Completion is signalled through lpi->lpi_completion.  An allocation
 * failure is reported as -ENOMEM in lpi->lpi_rc.
 *
 * NOTE(review): many lines are missing from this listing (for example the
 * declarations of buf and last_offset, the outer loop head, the index
 * increments, the handling of LLOG_PROC_BREAK, the `out` label, the final
 * store to lpi_rc and the return).  Line 243 dereferences cd while line
 * 246 tests cd != NULL; the guard is presumably on the missing line 242 --
 * confirm against the full file.
 */
216 static int llog_process_thread(void *arg)
218 struct llog_process_info *lpi = (struct llog_process_info *)arg;
219 struct llog_handle *loghandle = lpi->lpi_loghandle;
220 struct llog_log_hdr *llh = loghandle->lgh_hdr;
221 struct llog_process_cat_data *cd = lpi->lpi_catdata;
/* block reads start right after the header chunk */
223 __u64 cur_offset = LLOG_CHUNK_SIZE;
225 int rc = 0, index = 1, last_index;
226 int saved_index = 0, last_called_index = 0;
230 OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
/* allocation failure: report it and wake whoever waits on the completion */
232 lpi->lpi_rc = -ENOMEM;
234 cfs_complete(&lpi->lpi_completion);
239 if (!(lpi->lpi_flags & LLOG_FLAG_NODEAMON))
240 cfs_daemonize_ctxt("llog_process_thread");
243 last_called_index = cd->lpcd_first_idx;
244 index = cd->lpcd_first_idx + 1;
246 if (cd != NULL && cd->lpcd_last_idx)
247 last_index = cd->lpcd_last_idx;
249 last_index = LLOG_BITMAP_BYTES * 8 - 1;
252 struct llog_rec_hdr *rec;
254 /* skip records not set in bitmap */
255 while (index <= last_index &&
256 !ext2_test_bit(index, llh->llh_bitmap))
259 LASSERT(index <= last_index + 1);
/* ran past the last requested index: nothing left to process */
260 if (index == last_index + 1)
263 CDEBUG(D_OTHER, "index: %d last_index %d\n",
266 /* get the buf with our target record; avoid old garbage */
267 memset(buf, 0, LLOG_CHUNK_SIZE);
/* remember where this block starts so record offsets can be computed */
268 last_offset = cur_offset;
269 rc = llog_next_block(loghandle, &saved_index, index,
270 &cur_offset, buf, LLOG_CHUNK_SIZE);
274 /* NB: when rec->lrh_len is accessed it is already swabbed
275 * since it is used at the "end" of the loop and the rec
276 * swabbing is done at the beginning of the loop. */
277 for (rec = (struct llog_rec_hdr *)buf;
278 (char *)rec < buf + LLOG_CHUNK_SIZE;
279 rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
281 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
284 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
285 lustre_swab_llog_rec(rec, NULL);
287 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
288 rec->lrh_type, rec->lrh_index);
290 if (rec->lrh_index == 0)
291 GOTO(out, 0); /* no more records */
/* a zero length would stall the walk; an oversized one would leave buf */
293 if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
294 CWARN("invalid length %d in llog record for "
295 "index %d/%d\n", rec->lrh_len,
296 rec->lrh_index, index);
297 GOTO(out, rc = -EINVAL);
/* records in the block that precede the wanted index are passed over */
300 if (rec->lrh_index < index) {
301 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
307 "lrh_index: %d lrh_len: %d (%d remains)\n",
308 rec->lrh_index, rec->lrh_len,
309 (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
/* publish the current position on the handle */
311 loghandle->lgh_cur_idx = rec->lrh_index;
312 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
315 /* if set, process the callback on this record */
316 if (ext2_test_bit(index, llh->llh_bitmap)) {
317 rc = lpi->lpi_cb(loghandle, rec,
319 last_called_index = index;
320 if (rc == LLOG_PROC_BREAK) {
322 } else if (rc == LLOG_DEL_RECORD) {
323 llog_cancel_rec(loghandle,
330 CDEBUG(D_OTHER, "Skipped index %d\n", index);
333 /* next record, still in buffer? */
335 if (index > last_index)
/* hand the last index given to the callback back to the caller */
342 cd->lpcd_last_idx = last_called_index;
344 OBD_FREE(buf, LLOG_CHUNK_SIZE);
347 cfs_complete(&lpi->lpi_completion);
/*
 * Process the records of `loghandle` with callback `cb`.
 *
 * A struct llog_process_info is filled with the handle, the callback data
 * (`data`), the catalog data (`catdata`) and `flags`, and is handed to
 * llog_process_thread().  Two ways of running it are visible: a thread is
 * created with cfs_create_thread() and waited for through lpi_completion,
 * and llog_process_thread() is also called directly.
 *
 * NOTE(review): the allocation of lpi, the assignment of `cb` into it,
 * the conditionals selecting between the threaded and the direct call
 * (presumably a build-time switch), the result handling and the release
 * of lpi are on lines missing from this listing (354, 356-360, 362-363,
 * 365, 369-370, 373, 375-377, 379, 381-385) -- confirm against the full
 * file.
 */
352 int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
353 void *data, void *catdata, int flags)
355 struct llog_process_info *lpi;
361 CERROR("cannot alloc pointer\n");
364 lpi->lpi_loghandle = loghandle;
366 lpi->lpi_cbdata = data;
367 lpi->lpi_catdata = catdata;
368 lpi->lpi_flags = flags;
/* the worker signals this completion when it is done */
371 cfs_init_completion(&lpi->lpi_completion);
372 rc = cfs_create_thread(llog_process_thread, lpi, CFS_DAEMON_FLAGS);
374 CERROR("cannot start thread: %d\n", rc);
378 cfs_wait_for_completion(&lpi->lpi_completion);
380 llog_process_thread(lpi);
386 EXPORT_SYMBOL(llog_process_flags);
/*
 * Convenience wrapper: process `loghandle` with callback `cb` by calling
 * llog_process_flags() with flags set to 0.  `data` and `catdata` are
 * passed through unchanged and the wrapped call's result is returned.
 */
388 int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
389 void *data, void *catdata)
391 return llog_process_flags(loghandle, cb, data, catdata, 0);
393 EXPORT_SYMBOL(llog_process);
/*
 * Return the record count stored in the log header (llh_count) when both
 * the handle and its header are non-NULL.
 *
 * NOTE(review): the value returned when either pointer is NULL is on a
 * line missing from this listing (399) -- confirm against the full file.
 */
395 inline int llog_get_size(struct llog_handle *loghandle)
397 if (loghandle && loghandle->lgh_hdr)
398 return loghandle->lgh_hdr->llh_count;
401 EXPORT_SYMBOL(llog_get_size);
/*
 * Walk a log in descending index order and call `cb` on every record
 * whose bit is set in the header bitmap.
 *
 * catdata, when used, is a struct llog_process_cat_data: the lower bound
 * is cd->lpcd_first_idx + 1 (default 1) and the starting index is
 * cd->lpcd_last_idx when non-zero, otherwise the last bitmap slot
 * (LLOG_BITMAP_BYTES * 8 - 1).
 *
 * Per iteration: cancelled indices are skipped, the block holding `index`
 * is fetched with llog_prev_block() into a zeroed buffer, the walk moves
 * forward by lrh_len to the record with the wanted index, and then steps
 * backwards through the buffer using each record's tail (lrt_len).  A
 * record with index 0 ends the walk.
 *
 * NOTE(review): several lines are missing from this listing (for example
 * the declaration of buf, the allocation check, the outer loop head, the
 * index decrements, the initial assignment of rec, the handling of
 * LLOG_PROC_BREAK, the `out` label and the return).  Line 417
 * dereferences cd while line 418 tests cd != NULL; the guard is
 * presumably on the missing line 416 -- confirm against the full file.
 */
403 int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
404 void *data, void *catdata)
406 struct llog_log_hdr *llh = loghandle->lgh_hdr;
407 struct llog_process_cat_data *cd = catdata;
409 int rc = 0, first_index = 1, index, idx;
412 OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
417 first_index = cd->lpcd_first_idx + 1;
418 if (cd != NULL && cd->lpcd_last_idx)
419 index = cd->lpcd_last_idx;
421 index = LLOG_BITMAP_BYTES * 8 - 1;
424 struct llog_rec_hdr *rec;
425 struct llog_rec_tail *tail;
427 /* skip records not set in bitmap */
428 while (index >= first_index &&
429 !ext2_test_bit(index, llh->llh_bitmap))
432 LASSERT(index >= first_index - 1);
/* walked below the first requested index: nothing left to process */
433 if (index == first_index - 1)
436 /* get the buf with our target record; avoid old garbage */
437 memset(buf, 0, LLOG_CHUNK_SIZE);
438 rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
443 idx = le32_to_cpu(rec->lrh_index);
445 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
/* advance within the block until the record with the wanted index */
446 while (idx < index) {
447 rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
/* the tail structure occupies the last bytes of the record */
450 tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
452 /* process records in buffer, starting where we found one */
453 while ((void *)tail > buf) {
454 rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
/* comparison with zero gives the same result in either byte order */
457 if (rec->lrh_index == 0)
458 GOTO(out, 0); /* no more records */
460 /* if set, process the callback on this record */
461 if (ext2_test_bit(index, llh->llh_bitmap)) {
462 rc = cb(loghandle, rec, data);
463 if (rc == LLOG_PROC_BREAK) {
470 /* previous record, still in buffer? */
472 if (index < first_index)
/* the previous record's tail sits immediately before this record */
474 tail = (void *)rec - sizeof(*tail);
480 OBD_FREE(buf, LLOG_CHUNK_SIZE);
483 EXPORT_SYMBOL(llog_reverse_process);