Whamcloud - gitweb
b=14608
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_LOG
47
48 #ifndef EXPORT_SYMTAB
49 #define EXPORT_SYMTAB
50 #endif
51
52 #ifndef __KERNEL__
53 #include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_log.h>
58 #include <libcfs/list.h>
59 #include "llog_internal.h"
60
61 /* Allocate a new log or catalog handle */
62 struct llog_handle *llog_alloc_handle(void)
63 {
64         struct llog_handle *loghandle;
65         ENTRY;
66
67         OBD_ALLOC(loghandle, sizeof(*loghandle));
68         if (loghandle == NULL)
69                 RETURN(ERR_PTR(-ENOMEM));
70
71         init_rwsem(&loghandle->lgh_lock);
72
73         RETURN(loghandle);
74 }
75 EXPORT_SYMBOL(llog_alloc_handle);
76
77
78 void llog_free_handle(struct llog_handle *loghandle)
79 {
80         if (!loghandle)
81                 return;
82
83         if (!loghandle->lgh_hdr)
84                 goto out;
85         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
86                 list_del_init(&loghandle->u.phd.phd_entry);
87         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
88                 LASSERT(list_empty(&loghandle->u.chd.chd_head));
89         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
90
91  out:
92         OBD_FREE(loghandle, sizeof(*loghandle));
93 }
94 EXPORT_SYMBOL(llog_free_handle);
95
96 /* returns negative on error; 0 if success; 1 if success & log destroyed */
97 int llog_cancel_rec(struct llog_handle *loghandle, int index)
98 {
99         struct llog_log_hdr *llh = loghandle->lgh_hdr;
100         int rc = 0;
101         ENTRY;
102
103         CDEBUG(D_RPCTRACE, "canceling %d in log "LPX64"\n",
104                index, loghandle->lgh_id.lgl_oid);
105
106         if (index == 0) {
107                 CERROR("cannot cancel index 0 (which is header)\n");
108                 RETURN(-EINVAL);
109         }
110
111         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
112                 CDEBUG(D_RPCTRACE, "catalog index %u already clear?\n", index);
113                 RETURN(-EINVAL);
114         }
115
116         llh->llh_count--;
117
118         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
119             (llh->llh_count == 1) &&
120             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
121                 rc = llog_destroy(loghandle);
122                 if (rc) {
123                         CERROR("failure destroying log after last cancel: %d\n",
124                                rc);
125                         ext2_set_bit(index, llh->llh_bitmap);
126                         llh->llh_count++;
127                 } else {
128                         rc = 1;
129                 }
130                 RETURN(rc);
131         }
132
133         rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
134         if (rc) {
135                 CERROR("failure re-writing header %d\n", rc);
136                 ext2_set_bit(index, llh->llh_bitmap);
137                 llh->llh_count++;
138         }
139         RETURN(rc);
140 }
141 EXPORT_SYMBOL(llog_cancel_rec);
142
143 int llog_init_handle(struct llog_handle *handle, int flags,
144                      struct obd_uuid *uuid)
145 {
146         int rc;
147         struct llog_log_hdr *llh;
148         ENTRY;
149         LASSERT(handle->lgh_hdr == NULL);
150
151         OBD_ALLOC(llh, sizeof(*llh));
152         if (llh == NULL)
153                 RETURN(-ENOMEM);
154         handle->lgh_hdr = llh;
155         /* first assign flags to use llog_client_ops */
156         llh->llh_flags = flags;
157         rc = llog_read_header(handle);
158         if (rc == 0) {
159                 flags = llh->llh_flags;
160                 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
161                         CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
162                                (char *)llh->llh_tgtuuid.uuid);
163                         rc = -EEXIST;
164                 }
165                 GOTO(out, rc);
166         } else if (rc != LLOG_EEMPTY || !flags) {
167                 /* set a pesudo flag for initialization */
168                 flags = LLOG_F_IS_CAT;
169                 GOTO(out, rc);
170         }
171         rc = 0;
172
173         handle->lgh_last_idx = 0; /* header is record with index 0 */
174         llh->llh_count = 1;         /* for the header record */
175         llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
176         llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
177         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
178         llh->llh_timestamp = cfs_time_current_sec();
179         if (uuid)
180                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
181         llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
182         ext2_set_bit(0, llh->llh_bitmap);
183
184 out:
185         if (flags & LLOG_F_IS_CAT) {
186                 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
187                 llh->llh_size = sizeof(struct llog_logid_rec);
188         } else if (flags & LLOG_F_IS_PLAIN) {
189                 CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
190         } else {
191                 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
192                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
193                 LBUG();
194         }
195
196         if (rc) {
197                 OBD_FREE(llh, sizeof(*llh));
198                 handle->lgh_hdr = NULL;
199         }
200         RETURN(rc);
201 }
202 EXPORT_SYMBOL(llog_init_handle);
203
204 int llog_close(struct llog_handle *loghandle)
205 {
206         struct llog_operations *lop;
207         int rc;
208         ENTRY;
209
210         rc = llog_handle2ops(loghandle, &lop);
211         if (rc)
212                 GOTO(out, rc);
213         if (lop->lop_close == NULL)
214                 GOTO(out, -EOPNOTSUPP);
215         rc = lop->lop_close(loghandle);
216  out:
217         llog_free_handle(loghandle);
218         RETURN(rc);
219 }
220 EXPORT_SYMBOL(llog_close);
221
222 static int llog_process_thread(void *arg)
223 {
224         struct llog_process_info     *lpi = (struct llog_process_info *)arg;
225         struct llog_handle           *loghandle = lpi->lpi_loghandle;
226         struct llog_log_hdr          *llh = loghandle->lgh_hdr;
227         struct llog_process_cat_data *cd  = lpi->lpi_catdata;
228         char                         *buf;
229         __u64                         cur_offset = LLOG_CHUNK_SIZE;
230         __u64                         last_offset;
231         int                           rc = 0, index = 1, last_index;
232         int                           saved_index = 0, last_called_index = 0;
233
234         LASSERT(llh);
235
236         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
237         if (!buf) {
238                 lpi->lpi_rc = -ENOMEM;
239 #ifdef __KERNEL__
240                 complete(&lpi->lpi_completion);
241 #endif
242                 return 0;
243         }
244
245         cfs_daemonize_ctxt("llog_process_thread");
246
247         if (cd != NULL) {
248                 last_called_index = cd->lpcd_first_idx;
249                 index = cd->lpcd_first_idx + 1;
250         }
251         if (cd != NULL && cd->lpcd_last_idx)
252                 last_index = cd->lpcd_last_idx;
253         else
254                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
255
256         while (rc == 0) {
257                 struct llog_rec_hdr *rec;
258
259                 /* skip records not set in bitmap */
260                 while (index <= last_index &&
261                        !ext2_test_bit(index, llh->llh_bitmap))
262                         ++index;
263
264                 LASSERT(index <= last_index + 1);
265                 if (index == last_index + 1)
266                         break;
267
268                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
269                        index, last_index);
270
271                 /* get the buf with our target record; avoid old garbage */
272                 memset(buf, 0, LLOG_CHUNK_SIZE);
273                 last_offset = cur_offset;
274                 rc = llog_next_block(loghandle, &saved_index, index,
275                                      &cur_offset, buf, LLOG_CHUNK_SIZE);
276                 if (rc)
277                         GOTO(out, rc);
278
279                 /* NB: when rec->lrh_len is accessed it is already swabbed
280                  * since it is used at the "end" of the loop and the rec
281                  * swabbing is done at the beginning of the loop. */
282                 for (rec = (struct llog_rec_hdr *)buf;
283                      (char *)rec < buf + LLOG_CHUNK_SIZE;
284                      rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
285
286                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
287                                rec, rec->lrh_type);
288
289                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
290                                 lustre_swab_llog_rec(rec, NULL);
291
292                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
293                                rec->lrh_type, rec->lrh_index);
294
295                         if (rec->lrh_index == 0)
296                                 GOTO(out, 0); /* no more records */
297
298                         if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
299                                 CWARN("invalid length %d in llog record for "
300                                       "index %d/%d\n", rec->lrh_len,
301                                       rec->lrh_index, index);
302                                 GOTO(out, rc = -EINVAL);
303                         }
304
305                         if (rec->lrh_index < index) {
306                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
307                                        rec->lrh_index);
308                                 continue;
309                         }
310
311                         CDEBUG(D_OTHER,
312                                "lrh_index: %d lrh_len: %d (%d remains)\n",
313                                rec->lrh_index, rec->lrh_len,
314                                (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
315
316                         loghandle->lgh_cur_idx = rec->lrh_index;
317                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
318                                                     last_offset;
319
320                         /* if set, process the callback on this record */
321                         if (ext2_test_bit(index, llh->llh_bitmap)) {
322                                 rc = lpi->lpi_cb(loghandle, rec,
323                                                  lpi->lpi_cbdata);
324                                 last_called_index = index;
325                                 if (rc == LLOG_PROC_BREAK) {
326                                         CDEBUG(D_HA, "recovery from log: "LPX64
327                                                ":%x stopped\n",
328                                                loghandle->lgh_id.lgl_oid,
329                                                loghandle->lgh_id.lgl_ogen);
330                                         GOTO(out, rc);
331                                 } else if (rc == LLOG_DEL_RECORD) {
332                                         llog_cancel_rec(loghandle,
333                                                         rec->lrh_index);
334                                         rc = 0;
335                                 }
336                                 if (rc)
337                                         GOTO(out, rc);
338                         } else {
339                                 CDEBUG(D_OTHER, "Skipped index %d\n", index);
340                         }
341
342                         /* next record, still in buffer? */
343                         ++index;
344                         if (index > last_index)
345                                 GOTO(out, rc = 0);
346                 }
347         }
348
349  out:
350         if (cd != NULL)
351                 cd->lpcd_last_idx = last_called_index;
352         if (buf)
353                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
354         lpi->lpi_rc = rc;
355 #ifdef __KERNEL__
356         complete(&lpi->lpi_completion);
357 #endif
358         return 0;
359 }
360
361 int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
362                  void *data, void *catdata)
363 {
364         struct llog_process_info *lpi;
365         int                      rc;
366         ENTRY;
367
368         OBD_ALLOC_PTR(lpi);
369         if (lpi == NULL) {
370                 CERROR("cannot alloc pointer\n");
371                 RETURN(-ENOMEM);
372         }
373         lpi->lpi_loghandle = loghandle;
374         lpi->lpi_cb        = cb;
375         lpi->lpi_cbdata    = data;
376         lpi->lpi_catdata   = catdata;
377
378 #ifdef __KERNEL__
379         init_completion(&lpi->lpi_completion);
380         rc = cfs_kernel_thread(llog_process_thread, lpi, CLONE_VM | CLONE_FILES);
381         if (rc < 0) {
382                 CERROR("cannot start thread: %d\n", rc);
383                 OBD_FREE_PTR(lpi);
384                 RETURN(rc);
385         }
386         wait_for_completion(&lpi->lpi_completion);
387 #else
388         llog_process_thread(lpi);
389 #endif
390         rc = lpi->lpi_rc;
391         OBD_FREE_PTR(lpi);
392         RETURN(rc);
393 }
394 EXPORT_SYMBOL(llog_process);
395
396 inline int llog_get_size(struct llog_handle *loghandle)
397 {
398         if (loghandle && loghandle->lgh_hdr)
399                 return loghandle->lgh_hdr->llh_count;
400         return 0;
401 }
402 EXPORT_SYMBOL(llog_get_size);
403
404 int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
405                          void *data, void *catdata)
406 {
407         struct llog_log_hdr *llh = loghandle->lgh_hdr;
408         struct llog_process_cat_data *cd = catdata;
409         void *buf;
410         int rc = 0, first_index = 1, index, idx;
411         ENTRY;
412
413         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
414         if (!buf)
415                 RETURN(-ENOMEM);
416
417         if (cd != NULL)
418                 first_index = cd->lpcd_first_idx + 1;
419         if (cd != NULL && cd->lpcd_last_idx)
420                 index = cd->lpcd_last_idx;
421         else
422                 index = LLOG_BITMAP_BYTES * 8 - 1;
423
424         while (rc == 0) {
425                 struct llog_rec_hdr *rec;
426                 struct llog_rec_tail *tail;
427
428                 /* skip records not set in bitmap */
429                 while (index >= first_index &&
430                        !ext2_test_bit(index, llh->llh_bitmap))
431                         --index;
432
433                 LASSERT(index >= first_index - 1);
434                 if (index == first_index - 1)
435                         break;
436
437                 /* get the buf with our target record; avoid old garbage */
438                 memset(buf, 0, LLOG_CHUNK_SIZE);
439                 rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
440                 if (rc)
441                         GOTO(out, rc);
442
443                 rec = buf;
444                 idx = le32_to_cpu(rec->lrh_index);
445                 if (idx < index)
446                         CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
447                 while (idx < index) {
448                         rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
449                         idx ++;
450                 }
451                 tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
452
453                 /* process records in buffer, starting where we found one */
454                 while ((void *)tail > buf) {
455                         rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
456                                 sizeof(*tail);
457
458                         if (rec->lrh_index == 0)
459                                 GOTO(out, 0); /* no more records */
460
461                         /* if set, process the callback on this record */
462                         if (ext2_test_bit(index, llh->llh_bitmap)) {
463                                 rc = cb(loghandle, rec, data);
464                                 if (rc == LLOG_PROC_BREAK) {
465                                         CWARN("recovery from log: "LPX64":%x"
466                                               " stopped\n",
467                                               loghandle->lgh_id.lgl_oid,
468                                               loghandle->lgh_id.lgl_ogen);
469                                         GOTO(out, rc);
470                                 }
471                                 if (rc)
472                                         GOTO(out, rc);
473                         }
474
475                         /* previous record, still in buffer? */
476                         --index;
477                         if (index < first_index)
478                                 GOTO(out, rc = 0);
479                         tail = (void *)rec - sizeof(*tail);
480                 }
481         }
482
483 out:
484         if (buf)
485                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
486         RETURN(rc);
487 }
488 EXPORT_SYMBOL(llog_reverse_process);