Whamcloud - gitweb
LU-957 revert OI scrub patch
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/obdclass/llog.c
35  *
36  * OST<->MDS recovery logging infrastructure.
37  * Invariants in implementation:
38  * - we do not share logs among different OST<->MDS connections, so that
39  *   if an OST or MDS fails it need only look at log(s) relevant to itself
40  *
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOG
45
46 #ifndef EXPORT_SYMTAB
47 #define EXPORT_SYMTAB
48 #endif
49
50 #ifndef __KERNEL__
51 #include <liblustre.h>
52 #endif
53
54 #include <obd_class.h>
55 #include <lustre_log.h>
56 #include <libcfs/list.h>
57 #include "llog_internal.h"
58
59 /* Allocate a new log or catalog handle */
60 struct llog_handle *llog_alloc_handle(void)
61 {
62         struct llog_handle *loghandle;
63         ENTRY;
64
65         OBD_ALLOC(loghandle, sizeof(*loghandle));
66         if (loghandle == NULL)
67                 RETURN(ERR_PTR(-ENOMEM));
68
69         cfs_init_rwsem(&loghandle->lgh_lock);
70
71         RETURN(loghandle);
72 }
73 EXPORT_SYMBOL(llog_alloc_handle);
74
75
76 void llog_free_handle(struct llog_handle *loghandle)
77 {
78         if (!loghandle)
79                 return;
80
81         if (!loghandle->lgh_hdr)
82                 goto out;
83         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
84                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
85         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
86                 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
87         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
88
89  out:
90         OBD_FREE(loghandle, sizeof(*loghandle));
91 }
92 EXPORT_SYMBOL(llog_free_handle);
93
94 /* returns negative on error; 0 if success; 1 if success & log destroyed */
95 int llog_cancel_rec(struct llog_handle *loghandle, int index)
96 {
97         struct llog_log_hdr *llh = loghandle->lgh_hdr;
98         int rc = 0;
99         ENTRY;
100
101         CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
102                index, loghandle->lgh_id.lgl_oid);
103
104         if (index == 0) {
105                 CERROR("Can't cancel index 0 which is header\n");
106                 RETURN(-EINVAL);
107         }
108
109         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
110                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
111                 RETURN(-ENOENT);
112         }
113
114         llh->llh_count--;
115
116         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
117             (llh->llh_count == 1) &&
118             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
119                 rc = llog_destroy(loghandle);
120                 if (rc) {
121                         CERROR("Failure destroying log after last cancel: %d\n",
122                                rc);
123                         ext2_set_bit(index, llh->llh_bitmap);
124                         llh->llh_count++;
125                 } else {
126                         rc = 1;
127                 }
128                 RETURN(rc);
129         }
130
131         rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
132         if (rc) {
133                 CERROR("Failure re-writing header %d\n", rc);
134                 ext2_set_bit(index, llh->llh_bitmap);
135                 llh->llh_count++;
136         }
137         RETURN(rc);
138 }
139 EXPORT_SYMBOL(llog_cancel_rec);
140
141 int llog_init_handle(struct llog_handle *handle, int flags,
142                      struct obd_uuid *uuid)
143 {
144         int rc;
145         struct llog_log_hdr *llh;
146         ENTRY;
147         LASSERT(handle->lgh_hdr == NULL);
148
149         OBD_ALLOC(llh, sizeof(*llh));
150         if (llh == NULL)
151                 RETURN(-ENOMEM);
152         handle->lgh_hdr = llh;
153         /* first assign flags to use llog_client_ops */
154         llh->llh_flags = flags;
155         rc = llog_read_header(handle);
156         if (rc == 0) {
157                 flags = llh->llh_flags;
158                 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
159                         CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
160                                (char *)llh->llh_tgtuuid.uuid);
161                         rc = -EEXIST;
162                 }
163                 GOTO(out, rc);
164         } else if (rc != LLOG_EEMPTY || !flags) {
165                 /* set a pesudo flag for initialization */
166                 flags = LLOG_F_IS_CAT;
167                 GOTO(out, rc);
168         }
169         rc = 0;
170
171         handle->lgh_last_idx = 0; /* header is record with index 0 */
172         llh->llh_count = 1;         /* for the header record */
173         llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
174         llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
175         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
176         llh->llh_timestamp = cfs_time_current_sec();
177         if (uuid)
178                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
179         llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
180         ext2_set_bit(0, llh->llh_bitmap);
181
182 out:
183         if (flags & LLOG_F_IS_CAT) {
184                 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
185                 llh->llh_size = sizeof(struct llog_logid_rec);
186         } else if (flags & LLOG_F_IS_PLAIN) {
187                 CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
188         } else {
189                 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
190                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
191                 LBUG();
192         }
193
194         if (rc) {
195                 OBD_FREE(llh, sizeof(*llh));
196                 handle->lgh_hdr = NULL;
197         }
198         RETURN(rc);
199 }
200 EXPORT_SYMBOL(llog_init_handle);
201
202 int llog_close(struct llog_handle *loghandle)
203 {
204         struct llog_operations *lop;
205         int rc;
206         ENTRY;
207
208         rc = llog_handle2ops(loghandle, &lop);
209         if (rc)
210                 GOTO(out, rc);
211         if (lop->lop_close == NULL)
212                 GOTO(out, -EOPNOTSUPP);
213         rc = lop->lop_close(loghandle);
214  out:
215         llog_free_handle(loghandle);
216         RETURN(rc);
217 }
218 EXPORT_SYMBOL(llog_close);
219
220 static int llog_process_thread(void *arg)
221 {
222         struct llog_process_info     *lpi = (struct llog_process_info *)arg;
223         struct llog_handle           *loghandle = lpi->lpi_loghandle;
224         struct llog_log_hdr          *llh = loghandle->lgh_hdr;
225         struct llog_process_cat_data *cd  = lpi->lpi_catdata;
226         char                         *buf;
227         __u64                         cur_offset = LLOG_CHUNK_SIZE;
228         __u64                         last_offset;
229         int                           rc = 0, index = 1, last_index;
230         int                           saved_index = 0, last_called_index = 0;
231
232         LASSERT(llh);
233
234         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
235         if (!buf) {
236                 lpi->lpi_rc = -ENOMEM;
237 #ifdef __KERNEL__
238                 cfs_complete(&lpi->lpi_completion);
239 #endif
240                 return 0;
241         }
242
243         if (!(lpi->lpi_flags & LLOG_FLAG_NODEAMON))
244                 cfs_daemonize_ctxt("llog_process_thread");
245
246         if (cd != NULL) {
247                 last_called_index = cd->lpcd_first_idx;
248                 index = cd->lpcd_first_idx + 1;
249         }
250         if (cd != NULL && cd->lpcd_last_idx)
251                 last_index = cd->lpcd_last_idx;
252         else
253                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
254
255         while (rc == 0) {
256                 struct llog_rec_hdr *rec;
257
258                 /* skip records not set in bitmap */
259                 while (index <= last_index &&
260                        !ext2_test_bit(index, llh->llh_bitmap))
261                         ++index;
262
263                 LASSERT(index <= last_index + 1);
264                 if (index == last_index + 1)
265                         break;
266
267                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
268                        index, last_index);
269
270                 /* get the buf with our target record; avoid old garbage */
271                 memset(buf, 0, LLOG_CHUNK_SIZE);
272                 last_offset = cur_offset;
273                 rc = llog_next_block(loghandle, &saved_index, index,
274                                      &cur_offset, buf, LLOG_CHUNK_SIZE);
275                 if (rc)
276                         GOTO(out, rc);
277
278                 /* NB: when rec->lrh_len is accessed it is already swabbed
279                  * since it is used at the "end" of the loop and the rec
280                  * swabbing is done at the beginning of the loop. */
281                 for (rec = (struct llog_rec_hdr *)buf;
282                      (char *)rec < buf + LLOG_CHUNK_SIZE;
283                      rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
284
285                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
286                                rec, rec->lrh_type);
287
288                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
289                                 lustre_swab_llog_rec(rec, NULL);
290
291                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
292                                rec->lrh_type, rec->lrh_index);
293
294                         if (rec->lrh_index == 0)
295                                 GOTO(out, 0); /* no more records */
296
297                         if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
298                                 CWARN("invalid length %d in llog record for "
299                                       "index %d/%d\n", rec->lrh_len,
300                                       rec->lrh_index, index);
301                                 GOTO(out, rc = -EINVAL);
302                         }
303
304                         if (rec->lrh_index < index) {
305                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
306                                        rec->lrh_index);
307                                 continue;
308                         }
309
310                         CDEBUG(D_OTHER,
311                                "lrh_index: %d lrh_len: %d (%d remains)\n",
312                                rec->lrh_index, rec->lrh_len,
313                                (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
314
315                         loghandle->lgh_cur_idx = rec->lrh_index;
316                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
317                                                     last_offset;
318
319                         /* if set, process the callback on this record */
320                         if (ext2_test_bit(index, llh->llh_bitmap)) {
321                                 rc = lpi->lpi_cb(loghandle, rec,
322                                                  lpi->lpi_cbdata);
323                                 last_called_index = index;
324                                 if (rc == LLOG_PROC_BREAK) {
325                                         GOTO(out, rc);
326                                 } else if (rc == LLOG_DEL_RECORD) {
327                                         llog_cancel_rec(loghandle,
328                                                         rec->lrh_index);
329                                         rc = 0;
330                                 }
331                                 if (rc)
332                                         GOTO(out, rc);
333                         } else {
334                                 CDEBUG(D_OTHER, "Skipped index %d\n", index);
335                         }
336
337                         /* next record, still in buffer? */
338                         ++index;
339                         if (index > last_index)
340                                 GOTO(out, rc = 0);
341                 }
342         }
343
344  out:
345         if (cd != NULL)
346                 cd->lpcd_last_idx = last_called_index;
347         if (buf)
348                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
349         lpi->lpi_rc = rc;
350 #ifdef __KERNEL__
351         cfs_complete(&lpi->lpi_completion);
352 #endif
353         return 0;
354 }
355
356 int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
357                        void *data, void *catdata, int flags)
358 {
359         struct llog_process_info *lpi;
360         int                      rc;
361         ENTRY;
362
363         OBD_ALLOC_PTR(lpi);
364         if (lpi == NULL) {
365                 CERROR("cannot alloc pointer\n");
366                 RETURN(-ENOMEM);
367         }
368         lpi->lpi_loghandle = loghandle;
369         lpi->lpi_cb        = cb;
370         lpi->lpi_cbdata    = data;
371         lpi->lpi_catdata   = catdata;
372         lpi->lpi_flags     = flags;
373
374 #ifdef __KERNEL__
375         cfs_init_completion(&lpi->lpi_completion);
376         rc = cfs_create_thread(llog_process_thread, lpi, CFS_DAEMON_FLAGS);
377         if (rc < 0) {
378                 CERROR("cannot start thread: %d\n", rc);
379                 OBD_FREE_PTR(lpi);
380                 RETURN(rc);
381         }
382         cfs_wait_for_completion(&lpi->lpi_completion);
383 #else
384         llog_process_thread(lpi);
385 #endif
386         rc = lpi->lpi_rc;
387         OBD_FREE_PTR(lpi);
388         RETURN(rc);
389 }
390 EXPORT_SYMBOL(llog_process_flags);
391
392 int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
393                  void *data, void *catdata)
394 {
395         return llog_process_flags(loghandle, cb, data, catdata, 0);
396 }
397 EXPORT_SYMBOL(llog_process);
398
399 inline int llog_get_size(struct llog_handle *loghandle)
400 {
401         if (loghandle && loghandle->lgh_hdr)
402                 return loghandle->lgh_hdr->llh_count;
403         return 0;
404 }
405 EXPORT_SYMBOL(llog_get_size);
406
407 int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
408                          void *data, void *catdata)
409 {
410         struct llog_log_hdr *llh = loghandle->lgh_hdr;
411         struct llog_process_cat_data *cd = catdata;
412         void *buf;
413         int rc = 0, first_index = 1, index, idx;
414         ENTRY;
415
416         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
417         if (!buf)
418                 RETURN(-ENOMEM);
419
420         if (cd != NULL)
421                 first_index = cd->lpcd_first_idx + 1;
422         if (cd != NULL && cd->lpcd_last_idx)
423                 index = cd->lpcd_last_idx;
424         else
425                 index = LLOG_BITMAP_BYTES * 8 - 1;
426
427         while (rc == 0) {
428                 struct llog_rec_hdr *rec;
429                 struct llog_rec_tail *tail;
430
431                 /* skip records not set in bitmap */
432                 while (index >= first_index &&
433                        !ext2_test_bit(index, llh->llh_bitmap))
434                         --index;
435
436                 LASSERT(index >= first_index - 1);
437                 if (index == first_index - 1)
438                         break;
439
440                 /* get the buf with our target record; avoid old garbage */
441                 memset(buf, 0, LLOG_CHUNK_SIZE);
442                 rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
443                 if (rc)
444                         GOTO(out, rc);
445
446                 rec = buf;
447                 idx = le32_to_cpu(rec->lrh_index);
448                 if (idx < index)
449                         CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
450                 while (idx < index) {
451                         rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
452                         idx ++;
453                 }
454                 tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
455
456                 /* process records in buffer, starting where we found one */
457                 while ((void *)tail > buf) {
458                         rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
459                                 sizeof(*tail);
460
461                         if (rec->lrh_index == 0)
462                                 GOTO(out, 0); /* no more records */
463
464                         /* if set, process the callback on this record */
465                         if (ext2_test_bit(index, llh->llh_bitmap)) {
466                                 rc = cb(loghandle, rec, data);
467                                 if (rc == LLOG_PROC_BREAK) {
468                                         GOTO(out, rc);
469                                 }
470                                 if (rc)
471                                         GOTO(out, rc);
472                         }
473
474                         /* previous record, still in buffer? */
475                         --index;
476                         if (index < first_index)
477                                 GOTO(out, rc = 0);
478                         tail = (void *)rec - sizeof(*tail);
479                 }
480         }
481
482 out:
483         if (buf)
484                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
485         RETURN(rc);
486 }
487 EXPORT_SYMBOL(llog_reverse_process);