4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lustre/obdclass/llog.c
36 * OST<->MDS recovery logging infrastructure.
37 * Invariants in implementation:
38 * - we do not share logs among different OST<->MDS connections, so that
39 * if an OST or MDS fails it need only look at log(s) relevant to itself
41 * Author: Andreas Dilger <adilger@clusterfs.com>
42 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
43 * Author: Mikhail Pershin <tappro@whamcloud.com>
46 #define DEBUG_SUBSYSTEM S_LOG
49 #include <liblustre.h>
52 #include <obd_class.h>
53 #include <lustre_log.h>
54 #include "llog_internal.h"
56 /* Allocate a new log or catalog handle */
/*
 * The handle is obtained with OBD_ALLOC_PTR(); its lgh_lock rw-semaphore,
 * its lgh_hdr_lock spinlock and its u.phd.phd_entry list head are
 * initialized here.
 *
 * Returns ERR_PTR(-ENOMEM) if the allocation fails, so the result has to be
 * checked as an error pointer rather than against NULL.
 * NOTE(review): the success return is not among the lines visible here;
 * presumably the initialized handle is returned -- confirm.
 */
57 struct llog_handle *llog_alloc_handle(void)
59 struct llog_handle *loghandle;
62 OBD_ALLOC_PTR(loghandle);
63 if (loghandle == NULL)
64 RETURN(ERR_PTR(-ENOMEM));
/* set up the per-handle locks and the list linkage used by plain logs */
66 cfs_init_rwsem(&loghandle->lgh_lock);
67 cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
68 CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
72 EXPORT_SYMBOL(llog_alloc_handle);
/*
 * Release a log handle and, if one is attached, its header buffer.
 *
 * With a header present: a handle flagged LLOG_F_IS_PLAIN is unlinked through
 * u.phd.phd_entry, and a handle flagged LLOG_F_IS_CAT is asserted to have an
 * empty u.chd.chd_head list.  The header is then freed as LLOG_CHUNK_SIZE
 * bytes, and finally the handle itself is freed.
 *
 * NOTE(review): loghandle is dereferenced without a NULL check in the lines
 * visible here -- confirm whether a guard exists in the full function.
 */
75 void llog_free_handle(struct llog_handle *loghandle)
/*
 * No header attached.  NOTE(review): the statement controlled by this test is
 * not visible here; presumably it skips to the final free of the handle.
 */
80 if (!loghandle->lgh_hdr)
82 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
83 cfs_list_del_init(&loghandle->u.phd.phd_entry);
84 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
85 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
/* the header is freed by size below, so its type must be exactly one chunk */
86 LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
87 OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
90 OBD_FREE_PTR(loghandle);
92 EXPORT_SYMBOL(llog_free_handle);
94 /* returns negative on error; 0 if success; 1 if success & log destroyed */
/*
 * Cancel one record of a log by clearing its bit in the header bitmap.
 *
 * env       - passed through to llog_destroy() and llog_write_rec()
 * loghandle - log whose header (lgh_hdr) is updated
 * index     - record index to cancel; index 0 is the header and is refused.
 *             NOTE(review): the signature line declaring this parameter is
 *             not visible here; the name is taken from its uses below.
 *
 * The bitmap is only touched while lgh_hdr_lock is held.  After the bit is
 * cleared one of two things happens:
 *  - if the log is flagged LLOG_F_ZAP_WHEN_EMPTY, llh_count is 1 and
 *    lgh_last_idx is the last bitmap index, the log is destroyed with
 *    llog_destroy();
 *  - otherwise the header is written back with llog_write_rec().
 *
 * NOTE(review): the llh_count update and the error-path labels are not
 * visible in these lines -- confirm against the full function.
 */
95 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
98 struct llog_log_hdr *llh = loghandle->lgh_hdr;
102 CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
103 index, loghandle->lgh_id.lgl_oid);
106 CERROR("Can't cancel index 0 which is header\n");
110 cfs_spin_lock(&loghandle->lgh_hdr_lock);
/* a zero result from the clear is treated as "bit was already clear" */
111 if (!ext2_clear_bit(index, llh->llh_bitmap)) {
112 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
113 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
/* only the header record is left and the log has used its last index */
119 if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
120 (llh->llh_count == 1) &&
121 (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
/* the spinlock is dropped before calling llog_destroy() */
122 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
123 rc = llog_destroy(env, loghandle);
125 CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64
127 loghandle->lgh_ctxt->loc_obd->obd_name,
128 loghandle->lgh_id.lgl_oid,
129 loghandle->lgh_id.lgl_oseq,
130 loghandle->lgh_id.lgl_ogen, rc);
135 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
/* write the updated header (passed as the record) back to the log */
137 rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
139 CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
141 loghandle->lgh_ctxt->loc_obd->obd_name,
142 loghandle->lgh_id.lgl_oid,
143 loghandle->lgh_id.lgl_oseq,
144 loghandle->lgh_id.lgl_ogen, rc);
/*
 * The bit is set again under the lock.  NOTE(review): presumably this is the
 * rollback taken when one of the operations above failed; the label that
 * selects this path is not visible here.
 */
149 cfs_spin_lock(&loghandle->lgh_hdr_lock);
150 ext2_set_bit(index, llh->llh_bitmap);
152 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
155 EXPORT_SYMBOL(llog_cancel_rec);
/*
 * Attach a header to a log handle, either by reading an existing header or by
 * initializing a fresh one.
 *
 * env    - passed through to llog_read_header()
 * handle - handle to initialize; it must not have a header yet (asserted)
 * flags  - requested log flags; expected to contain LLOG_F_IS_CAT or
 *          LLOG_F_IS_PLAIN, anything else is reported as "Unknown flags"
 * uuid   - optional target uuid; when given it is compared with the uuid
 *          stored in a header that was read, and copied into a new header
 *
 * The header is first stored in handle->lgh_hdr with the requested flags and
 * llog_read_header() is called.  If a header was read, its flags replace the
 * requested ones and the uuid is checked.  For a new log the header fields
 * are filled in: count 1 (the header record itself), type LLOG_HDR_MAGIC,
 * length LLOG_CHUNK_SIZE, index 0, current timestamp, bitmap offset, and
 * bit 0 set in the bitmap.
 *
 * handle->lgh_hdr is reset to NULL further down.  NOTE(review): presumably
 * this is the failure path where the header is freed; the freeing statement,
 * the header allocation and the return statements are not visible here.
 */
157 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
158 int flags, struct obd_uuid *uuid)
161 struct llog_log_hdr *llh;
163 LASSERT(handle->lgh_hdr == NULL);
168 handle->lgh_hdr = llh;
169 /* first assign flags to use llog_client_ops */
170 llh->llh_flags = flags;
171 rc = llog_read_header(env, handle);
/* from here on the flags stored in the header take precedence */
173 flags = llh->llh_flags;
174 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
175 CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
176 (char *)llh->llh_tgtuuid.uuid);
180 } else if (rc != LLOG_EEMPTY || !flags) {
181 /* set a pseudo flag for initialization */
182 flags = LLOG_F_IS_CAT;
187 handle->lgh_last_idx = 0; /* header is record with index 0 */
188 llh->llh_count = 1; /* for the header record */
189 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
190 llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
191 llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
192 llh->llh_timestamp = cfs_time_current_sec();
/*
 * NOTE(review): uuid is treated as optional at the mismatch check above;
 * confirm this copy is guarded against a NULL uuid in the full function.
 */
194 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
195 llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
/* bit 0 stands for the header record, which is always present */
196 ext2_set_bit(0, llh->llh_bitmap);
199 if (flags & LLOG_F_IS_CAT) {
200 LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
201 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
/* catalog records have a fixed size: one struct llog_logid_rec each */
202 llh->llh_size = sizeof(struct llog_logid_rec);
203 } else if (!(flags & LLOG_F_IS_PLAIN)) {
/*
 * NOTE(review): the message below opens a parenthesis that is never closed;
 * left untouched here because it is runtime text.
 */
204 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
205 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
211 handle->lgh_hdr = NULL;
215 EXPORT_SYMBOL(llog_init_handle);
/*
 * Close a log: run the backend's lop_close method, then free the handle.
 *
 * The operations table is looked up with llog_handle2ops().  A backend
 * without a lop_close method is treated as -EOPNOTSUPP.  The handle is
 * released with llog_free_handle(), so it must not be used after this call.
 * NOTE(review): the label placement is not visible here; presumably the free
 * sits at the `out` label and therefore also runs on the error paths.
 *
 * NOTE(review): GOTO(out, -EOPNOTSUPP) passes a bare value, while other
 * call sites in this file pass an assignment (e.g. GOTO(out, rc = -EINVAL)).
 * If GOTO does not itself store its second argument, rc would not carry
 * -EOPNOTSUPP to the caller -- confirm and change to `rc = -EOPNOTSUPP`
 * if so.
 */
217 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
219 struct llog_operations *lop;
223 rc = llog_handle2ops(loghandle, &lop);
226 if (lop->lop_close == NULL)
227 GOTO(out, -EOPNOTSUPP);
228 rc = lop->lop_close(env, loghandle);
230 llog_free_handle(loghandle);
233 EXPORT_SYMBOL(llog_close);
/*
 * Walk the records of one log in increasing index order and invoke the
 * caller's callback on every record whose bit is still set in the header
 * bitmap.
 *
 * arg is a struct llog_process_info.  The fields used in the visible lines
 * are lpi_loghandle (the log), lpi_catdata (optional range, see below),
 * lpi_env and lpi_cb (callback and its environment) and lpi_rc (set to
 * -ENOMEM when the work buffer cannot be allocated).
 *
 * Range: when catalog data (cd) is supplied, processing starts at
 * cd->lpcd_first_idx + 1 and, if cd->lpcd_last_idx is non-zero, ends at that
 * index; otherwise the upper bound is the last bitmap bit
 * (LLOG_BITMAP_BYTES * 8 - 1).  Before returning, cd->lpcd_last_idx is set
 * to the last index the callback was invoked for.
 *
 * Callback results handled here: LLOG_PROC_BREAK, and LLOG_DEL_RECORD, for
 * which the record is cancelled with llog_cancel_rec().
 *
 * The log is read one LLOG_CHUNK_SIZE block at a time into a temporary
 * buffer, which is freed at the end of the function.
 *
 * NOTE(review): several declarations (buf, last_offset, saved_index), the
 * loop headers, the labels and the return statements are not visible in
 * these lines; comments below describe only what is shown.
 */
235 static int llog_process_thread(void *arg)
237 struct llog_process_info *lpi = arg;
238 struct llog_handle *loghandle = lpi->lpi_loghandle;
239 struct llog_log_hdr *llh = loghandle->lgh_hdr;
240 struct llog_process_cat_data *cd = lpi->lpi_catdata;
/* reading starts one chunk into the log, i.e. right after the header */
242 __u64 cur_offset = LLOG_CHUNK_SIZE;
244 int rc = 0, index = 1, last_index;
246 int last_called_index = 0;
252 OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
254 lpi->lpi_rc = -ENOMEM;
/*
 * Resume after the caller-supplied first index.  NOTE(review): cd is tested
 * against NULL just below; confirm these two reads are guarded as well.
 */
259 last_called_index = cd->lpcd_first_idx;
260 index = cd->lpcd_first_idx + 1;
262 if (cd != NULL && cd->lpcd_last_idx)
263 last_index = cd->lpcd_last_idx;
265 last_index = LLOG_BITMAP_BYTES * 8 - 1;
268 struct llog_rec_hdr *rec;
270 /* skip records not set in bitmap */
271 while (index <= last_index &&
272 !ext2_test_bit(index, llh->llh_bitmap))
/* after the skip loop index is at most one past the end of the range */
275 LASSERT(index <= last_index + 1);
276 if (index == last_index + 1)
279 CDEBUG(D_OTHER, "index: %d last_index %d\n",
282 /* get the buf with our target record; avoid old garbage */
283 memset(buf, 0, LLOG_CHUNK_SIZE);
/* remember where this block starts; used for lgh_cur_offset below */
284 last_offset = cur_offset;
285 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
286 index, &cur_offset, buf, LLOG_CHUNK_SIZE);
290 /* NB: when rec->lrh_len is accessed it is already swabbed
291 * since it is used at the "end" of the loop and the rec
292 * swabbing is done at the beginning of the loop. */
293 for (rec = (struct llog_rec_hdr *)buf;
294 (char *)rec < buf + LLOG_CHUNK_SIZE;
295 rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
297 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
300 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
301 lustre_swab_llog_rec(rec);
303 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
304 rec->lrh_type, rec->lrh_index);
/* buf was zeroed before the read, so index 0 means no record is stored here */
306 if (rec->lrh_index == 0) {
307 /* probably another rec just got added? */
308 if (index <= loghandle->lgh_last_idx)
309 GOTO(repeat, rc = 0);
310 GOTO(out, rc = 0); /* no more records */
/* a zero length would stall the walk above; an oversized one cannot be valid */
312 if (rec->lrh_len == 0 ||
313 rec->lrh_len > LLOG_CHUNK_SIZE) {
314 CWARN("invalid length %d in llog record for "
315 "index %d/%d\n", rec->lrh_len,
316 rec->lrh_index, index);
317 GOTO(out, rc = -EINVAL);
/* the block may begin with records that precede the wanted index */
320 if (rec->lrh_index < index) {
321 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
327 "lrh_index: %d lrh_len: %d (%d remains)\n",
328 rec->lrh_index, rec->lrh_len,
329 (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
/* publish the current position on the handle before calling the callback */
331 loghandle->lgh_cur_idx = rec->lrh_index;
332 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
335 /* if set, process the callback on this record */
336 if (ext2_test_bit(index, llh->llh_bitmap)) {
337 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
339 last_called_index = index;
340 if (rc == LLOG_PROC_BREAK) {
342 } else if (rc == LLOG_DEL_RECORD) {
343 llog_cancel_rec(lpi->lpi_env,
351 CDEBUG(D_OTHER, "Skipped index %d\n", index);
354 /* next record, still in buffer? */
356 if (index > last_index)
/* report back how far the callback got */
363 cd->lpcd_last_idx = last_called_index;
365 OBD_FREE(buf, LLOG_CHUNK_SIZE);
/*
 * Thread entry point used when log processing runs in its own thread.
 *
 * arg is the struct llog_process_info prepared by the creator.  The thread
 * daemonizes itself under the name "llog_process_thread", initializes an
 * lu_env with LCT_LOCAL, runs llog_process_thread() on the same argument and
 * finally signals lpi_completion, which the creating thread waits on.
 *
 * NOTE(review): the declaration of env, how it is handed to the processing
 * code, its finalization and the return statement are not visible here.
 */
371 static int llog_process_thread_daemonize(void *arg)
373 struct llog_process_info *lpi = arg;
377 cfs_daemonize_ctxt("llog_process_thread");
379 /* client env has no keys, tags is just 0 */
380 rc = lu_env_init(&env, LCT_LOCAL);
385 rc = llog_process_thread(arg);
/* wake the creator waiting on the completion */
389 cfs_complete(&lpi->lpi_completion);
/*
 * Process a log either directly in the calling context or in a helper thread.
 *
 * env       - caller's environment; not usable by a new thread (see below)
 * loghandle - log to process
 * cb        - per-record callback
 * data      - opaque argument stored in lpi_cbdata
 * catdata   - optional range data stored in lpi_catdata
 * fork      - when true, processing is meant to run in a separate thread
 *
 * A struct llog_process_info is filled in from the arguments.  On the thread
 * path a completion is initialized, llog_process_thread_daemonize() is
 * started with cfs_create_thread(), and this function waits for the
 * completion.  Otherwise llog_process_thread() is called directly.
 *
 * NOTE(review): the allocation and release of lpi, the conditionals that
 * select between the thread path and the two direct calls, and the return
 * value are not visible in these lines.  The two direct calls presumably sit
 * on mutually exclusive branches -- confirm.
 */
394 int llog_process_or_fork(const struct lu_env *env,
395 struct llog_handle *loghandle,
396 llog_cb_t cb, void *data, void *catdata, bool fork)
398 struct llog_process_info *lpi;
405 CERROR("cannot alloc pointer\n");
408 lpi->lpi_loghandle = loghandle;
410 lpi->lpi_cbdata = data;
411 lpi->lpi_catdata = catdata;
415 /* The new thread can't use parent env,
416 * init the new one in llog_process_thread_daemonize. */
418 cfs_init_completion(&lpi->lpi_completion);
419 rc = cfs_create_thread(llog_process_thread_daemonize, lpi,
422 CERROR("%s: cannot start thread: rc = %d\n",
423 loghandle->lgh_ctxt->loc_obd->obd_name, rc);
/* block until the helper thread signals that processing has finished */
427 cfs_wait_for_completion(&lpi->lpi_completion);
430 llog_process_thread(lpi);
434 llog_process_thread(lpi);
/*
 * Process a log in the calling context: a convenience wrapper that forwards
 * all arguments to llog_process_or_fork() with fork set to false and returns
 * its result.
 */
441 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
442 llog_cb_t cb, void *data, void *catdata)
444 return llog_process_or_fork(env, loghandle, cb, data, catdata, false);
446 EXPORT_SYMBOL(llog_process);
/*
 * Return the record count stored in the log header (llh_count).  A freshly
 * initialized header has a count of 1 for the header record itself, so the
 * value includes the header.
 *
 * Both loghandle and its header may be NULL.  NOTE(review): the value
 * returned in that case is not visible in these lines -- confirm.
 */
448 inline int llog_get_size(struct llog_handle *loghandle)
450 if (loghandle && loghandle->lgh_hdr)
451 return loghandle->lgh_hdr->llh_count;
454 EXPORT_SYMBOL(llog_get_size);
/*
 * Walk the records of one log in decreasing index order and invoke cb on
 * every record whose bit is still set in the header bitmap.
 *
 * env       - passed to llog_prev_block(), cb and llog_cancel_rec()
 * loghandle - log to process
 * cb        - per-record callback, called as cb(env, loghandle, rec, data)
 * data      - opaque argument handed to cb
 * catdata   - optional struct llog_process_cat_data limiting the range
 *
 * Range: the walk starts at cd->lpcd_last_idx when it is non-zero, otherwise
 * at the last bitmap bit (LLOG_BITMAP_BYTES * 8 - 1), and goes down to
 * first_index, which is 1 by default or cd->lpcd_first_idx + 1.
 *
 * For each block fetched with llog_prev_block() the code first advances
 * through the records until it reaches the wanted index, then steps
 * backwards record by record using the length stored in each record tail.
 *
 * Callback results handled here: LLOG_PROC_BREAK, and LLOG_DEL_RECORD, for
 * which the record is cancelled with llog_cancel_rec().
 *
 * NOTE(review): unlike the forward walker in this file, the record and tail
 * lengths are not range-checked in the visible lines; a zero or corrupt
 * length would misdirect the pointer stepping -- confirm.
 * NOTE(review): the declaration of buf, the loop headers, index updates,
 * labels and return statements are not visible here.
 */
456 int llog_reverse_process(const struct lu_env *env,
457 struct llog_handle *loghandle, llog_cb_t cb,
458 void *data, void *catdata)
460 struct llog_log_hdr *llh = loghandle->lgh_hdr;
461 struct llog_process_cat_data *cd = catdata;
463 int rc = 0, first_index = 1, index, idx;
466 OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
/*
 * NOTE(review): cd is tested against NULL just below; confirm this read is
 * guarded as well.
 */
471 first_index = cd->lpcd_first_idx + 1;
472 if (cd != NULL && cd->lpcd_last_idx)
473 index = cd->lpcd_last_idx;
475 index = LLOG_BITMAP_BYTES * 8 - 1;
478 struct llog_rec_hdr *rec;
479 struct llog_rec_tail *tail;
481 /* skip records not set in bitmap */
482 while (index >= first_index &&
483 !ext2_test_bit(index, llh->llh_bitmap))
/* after the skip loop index is at most one below the start of the range */
486 LASSERT(index >= first_index - 1);
487 if (index == first_index - 1)
490 /* get the buf with our target record; avoid old garbage */
491 memset(buf, 0, LLOG_CHUNK_SIZE);
492 rc = llog_prev_block(env, loghandle, index, buf,
498 idx = rec->lrh_index;
499 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
/* move forward inside the block until the record with the wanted index */
500 while (idx < index) {
/* arithmetic on void * relies on the GNU C extension (byte-sized steps) */
501 rec = (void *)rec + rec->lrh_len;
502 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
503 lustre_swab_llog_rec(rec);
506 LASSERT(idx == index);
/* the tail occupies the last sizeof(*tail) bytes of the record */
507 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
509 /* process records in buffer, starting where we found one */
510 while ((void *)tail > buf) {
511 if (tail->lrt_index == 0)
512 GOTO(out, rc = 0); /* no more records */
514 /* if set, process the callback on this record */
515 if (ext2_test_bit(index, llh->llh_bitmap)) {
/* locate the start of the record from its tail and the stored length */
516 rec = (void *)tail - tail->lrt_len +
519 rc = cb(env, loghandle, rec, data);
520 if (rc == LLOG_PROC_BREAK) {
522 } else if (rc == LLOG_DEL_RECORD) {
523 llog_cancel_rec(env, loghandle,
531 /* previous record, still in buffer? */
533 if (index < first_index)
/* step back one whole record to reach the tail of the preceding one */
535 tail = (void *)tail - tail->lrt_len;
541 OBD_FREE(buf, LLOG_CHUNK_SIZE);
544 EXPORT_SYMBOL(llog_reverse_process);