Whamcloud - gitweb
f08d54b040c05d191e63ff0e14c00886d0319409
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/obdclass/llog.c
35  *
36  * OST<->MDS recovery logging infrastructure.
37  * Invariants in implementation:
38  * - we do not share logs among different OST<->MDS connections, so that
39  *   if an OST or MDS fails it need only look at log(s) relevant to itself
40  *
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOG
45
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49
50 #include <obd_class.h>
51 #include <lustre_log.h>
52 #include <libcfs/list.h>
53 #include "llog_internal.h"
54
55 /* Allocate a new log or catalog handle */
56 struct llog_handle *llog_alloc_handle(void)
57 {
58         struct llog_handle *loghandle;
59         ENTRY;
60
61         OBD_ALLOC(loghandle, sizeof(*loghandle));
62         if (loghandle == NULL)
63                 RETURN(ERR_PTR(-ENOMEM));
64
65         cfs_init_rwsem(&loghandle->lgh_lock);
66
67         RETURN(loghandle);
68 }
69 EXPORT_SYMBOL(llog_alloc_handle);
70
71
72 void llog_free_handle(struct llog_handle *loghandle)
73 {
74         if (!loghandle)
75                 return;
76
77         if (!loghandle->lgh_hdr)
78                 goto out;
79         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
80                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
81         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
82                 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
83         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
84
85  out:
86         OBD_FREE(loghandle, sizeof(*loghandle));
87 }
88 EXPORT_SYMBOL(llog_free_handle);
89
90 /* returns negative on error; 0 if success; 1 if success & log destroyed */
91 int llog_cancel_rec(struct llog_handle *loghandle, int index)
92 {
93         struct llog_log_hdr *llh = loghandle->lgh_hdr;
94         int rc = 0;
95         ENTRY;
96
97         CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
98                index, loghandle->lgh_id.lgl_oid);
99
100         if (index == 0) {
101                 CERROR("Can't cancel index 0 which is header\n");
102                 RETURN(-EINVAL);
103         }
104
105         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
106                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
107                 RETURN(-ENOENT);
108         }
109
110         llh->llh_count--;
111
112         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
113             (llh->llh_count == 1) &&
114             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
115                 rc = llog_destroy(loghandle);
116                 if (rc) {
117                         CERROR("Failure destroying log after last cancel: %d\n",
118                                rc);
119                         ext2_set_bit(index, llh->llh_bitmap);
120                         llh->llh_count++;
121                 } else {
122                         rc = 1;
123                 }
124                 RETURN(rc);
125         }
126
127         rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
128         if (rc) {
129                 CERROR("Failure re-writing header %d\n", rc);
130                 ext2_set_bit(index, llh->llh_bitmap);
131                 llh->llh_count++;
132         }
133         RETURN(rc);
134 }
135 EXPORT_SYMBOL(llog_cancel_rec);
136
137 int llog_init_handle(struct llog_handle *handle, int flags,
138                      struct obd_uuid *uuid)
139 {
140         int rc;
141         struct llog_log_hdr *llh;
142         ENTRY;
143         LASSERT(handle->lgh_hdr == NULL);
144
145         OBD_ALLOC(llh, sizeof(*llh));
146         if (llh == NULL)
147                 RETURN(-ENOMEM);
148         handle->lgh_hdr = llh;
149         /* first assign flags to use llog_client_ops */
150         llh->llh_flags = flags;
151         rc = llog_read_header(handle);
152         if (rc == 0) {
153                 flags = llh->llh_flags;
154                 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
155                         CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
156                                (char *)llh->llh_tgtuuid.uuid);
157                         rc = -EEXIST;
158                 }
159                 GOTO(out, rc);
160         } else if (rc != LLOG_EEMPTY || !flags) {
161                 /* set a pesudo flag for initialization */
162                 flags = LLOG_F_IS_CAT;
163                 GOTO(out, rc);
164         }
165         rc = 0;
166
167         handle->lgh_last_idx = 0; /* header is record with index 0 */
168         llh->llh_count = 1;         /* for the header record */
169         llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
170         llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
171         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
172         llh->llh_timestamp = cfs_time_current_sec();
173         if (uuid)
174                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
175         llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
176         ext2_set_bit(0, llh->llh_bitmap);
177
178 out:
179         if (flags & LLOG_F_IS_CAT) {
180                 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
181                 llh->llh_size = sizeof(struct llog_logid_rec);
182         } else if (flags & LLOG_F_IS_PLAIN) {
183                 CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
184         } else {
185                 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
186                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
187                 LBUG();
188         }
189
190         if (rc) {
191                 OBD_FREE(llh, sizeof(*llh));
192                 handle->lgh_hdr = NULL;
193         }
194         RETURN(rc);
195 }
196 EXPORT_SYMBOL(llog_init_handle);
197
198 int llog_close(struct llog_handle *loghandle)
199 {
200         struct llog_operations *lop;
201         int rc;
202         ENTRY;
203
204         rc = llog_handle2ops(loghandle, &lop);
205         if (rc)
206                 GOTO(out, rc);
207         if (lop->lop_close == NULL)
208                 GOTO(out, -EOPNOTSUPP);
209         rc = lop->lop_close(loghandle);
210  out:
211         llog_free_handle(loghandle);
212         RETURN(rc);
213 }
214 EXPORT_SYMBOL(llog_close);
215
216 static int llog_process_thread(void *arg)
217 {
218         struct llog_process_info     *lpi = (struct llog_process_info *)arg;
219         struct llog_handle           *loghandle = lpi->lpi_loghandle;
220         struct llog_log_hdr          *llh = loghandle->lgh_hdr;
221         struct llog_process_cat_data *cd  = lpi->lpi_catdata;
222         char                         *buf;
223         __u64                         cur_offset = LLOG_CHUNK_SIZE;
224         __u64                         last_offset;
225         int                           rc = 0, index = 1, last_index;
226         int                           saved_index = 0, last_called_index = 0;
227
228         LASSERT(llh);
229
230         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
231         if (!buf) {
232                 lpi->lpi_rc = -ENOMEM;
233 #ifdef __KERNEL__
234                 cfs_complete(&lpi->lpi_completion);
235 #endif
236                 return 0;
237         }
238
239         if (!(lpi->lpi_flags & LLOG_FLAG_NODEAMON))
240                 cfs_daemonize_ctxt("llog_process_thread");
241
242         if (cd != NULL) {
243                 last_called_index = cd->lpcd_first_idx;
244                 index = cd->lpcd_first_idx + 1;
245         }
246         if (cd != NULL && cd->lpcd_last_idx)
247                 last_index = cd->lpcd_last_idx;
248         else
249                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
250
251         while (rc == 0) {
252                 struct llog_rec_hdr *rec;
253
254                 /* skip records not set in bitmap */
255                 while (index <= last_index &&
256                        !ext2_test_bit(index, llh->llh_bitmap))
257                         ++index;
258
259                 LASSERT(index <= last_index + 1);
260                 if (index == last_index + 1)
261                         break;
262
263                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
264                        index, last_index);
265
266                 /* get the buf with our target record; avoid old garbage */
267                 memset(buf, 0, LLOG_CHUNK_SIZE);
268                 last_offset = cur_offset;
269                 rc = llog_next_block(loghandle, &saved_index, index,
270                                      &cur_offset, buf, LLOG_CHUNK_SIZE);
271                 if (rc)
272                         GOTO(out, rc);
273
274                 /* NB: when rec->lrh_len is accessed it is already swabbed
275                  * since it is used at the "end" of the loop and the rec
276                  * swabbing is done at the beginning of the loop. */
277                 for (rec = (struct llog_rec_hdr *)buf;
278                      (char *)rec < buf + LLOG_CHUNK_SIZE;
279                      rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
280
281                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
282                                rec, rec->lrh_type);
283
284                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
285                                 lustre_swab_llog_rec(rec, NULL);
286
287                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
288                                rec->lrh_type, rec->lrh_index);
289
290                         if (rec->lrh_index == 0)
291                                 GOTO(out, 0); /* no more records */
292
293                         if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
294                                 CWARN("invalid length %d in llog record for "
295                                       "index %d/%d\n", rec->lrh_len,
296                                       rec->lrh_index, index);
297                                 GOTO(out, rc = -EINVAL);
298                         }
299
300                         if (rec->lrh_index < index) {
301                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
302                                        rec->lrh_index);
303                                 continue;
304                         }
305
306                         CDEBUG(D_OTHER,
307                                "lrh_index: %d lrh_len: %d (%d remains)\n",
308                                rec->lrh_index, rec->lrh_len,
309                                (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
310
311                         loghandle->lgh_cur_idx = rec->lrh_index;
312                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
313                                                     last_offset;
314
315                         /* if set, process the callback on this record */
316                         if (ext2_test_bit(index, llh->llh_bitmap)) {
317                                 rc = lpi->lpi_cb(loghandle, rec,
318                                                  lpi->lpi_cbdata);
319                                 last_called_index = index;
320                                 if (rc == LLOG_PROC_BREAK) {
321                                         GOTO(out, rc);
322                                 } else if (rc == LLOG_DEL_RECORD) {
323                                         llog_cancel_rec(loghandle,
324                                                         rec->lrh_index);
325                                         rc = 0;
326                                 }
327                                 if (rc)
328                                         GOTO(out, rc);
329                         } else {
330                                 CDEBUG(D_OTHER, "Skipped index %d\n", index);
331                         }
332
333                         /* next record, still in buffer? */
334                         ++index;
335                         if (index > last_index)
336                                 GOTO(out, rc = 0);
337                 }
338         }
339
340  out:
341         if (cd != NULL)
342                 cd->lpcd_last_idx = last_called_index;
343         if (buf)
344                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
345         lpi->lpi_rc = rc;
346 #ifdef __KERNEL__
347         cfs_complete(&lpi->lpi_completion);
348 #endif
349         return 0;
350 }
351
352 int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
353                        void *data, void *catdata, int flags)
354 {
355         struct llog_process_info *lpi;
356         int                      rc;
357         ENTRY;
358
359         OBD_ALLOC_PTR(lpi);
360         if (lpi == NULL) {
361                 CERROR("cannot alloc pointer\n");
362                 RETURN(-ENOMEM);
363         }
364         lpi->lpi_loghandle = loghandle;
365         lpi->lpi_cb        = cb;
366         lpi->lpi_cbdata    = data;
367         lpi->lpi_catdata   = catdata;
368         lpi->lpi_flags     = flags;
369
370 #ifdef __KERNEL__
371         cfs_init_completion(&lpi->lpi_completion);
372         rc = cfs_create_thread(llog_process_thread, lpi, CFS_DAEMON_FLAGS);
373         if (rc < 0) {
374                 CERROR("cannot start thread: %d\n", rc);
375                 OBD_FREE_PTR(lpi);
376                 RETURN(rc);
377         }
378         cfs_wait_for_completion(&lpi->lpi_completion);
379 #else
380         llog_process_thread(lpi);
381 #endif
382         rc = lpi->lpi_rc;
383         OBD_FREE_PTR(lpi);
384         RETURN(rc);
385 }
386 EXPORT_SYMBOL(llog_process_flags);
387
388 int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
389                  void *data, void *catdata)
390 {
391         return llog_process_flags(loghandle, cb, data, catdata, 0);
392 }
393 EXPORT_SYMBOL(llog_process);
394
395 inline int llog_get_size(struct llog_handle *loghandle)
396 {
397         if (loghandle && loghandle->lgh_hdr)
398                 return loghandle->lgh_hdr->llh_count;
399         return 0;
400 }
401 EXPORT_SYMBOL(llog_get_size);
402
403 int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
404                          void *data, void *catdata)
405 {
406         struct llog_log_hdr *llh = loghandle->lgh_hdr;
407         struct llog_process_cat_data *cd = catdata;
408         void *buf;
409         int rc = 0, first_index = 1, index, idx;
410         ENTRY;
411
412         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
413         if (!buf)
414                 RETURN(-ENOMEM);
415
416         if (cd != NULL)
417                 first_index = cd->lpcd_first_idx + 1;
418         if (cd != NULL && cd->lpcd_last_idx)
419                 index = cd->lpcd_last_idx;
420         else
421                 index = LLOG_BITMAP_BYTES * 8 - 1;
422
423         while (rc == 0) {
424                 struct llog_rec_hdr *rec;
425                 struct llog_rec_tail *tail;
426
427                 /* skip records not set in bitmap */
428                 while (index >= first_index &&
429                        !ext2_test_bit(index, llh->llh_bitmap))
430                         --index;
431
432                 LASSERT(index >= first_index - 1);
433                 if (index == first_index - 1)
434                         break;
435
436                 /* get the buf with our target record; avoid old garbage */
437                 memset(buf, 0, LLOG_CHUNK_SIZE);
438                 rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
439                 if (rc)
440                         GOTO(out, rc);
441
442                 rec = buf;
443                 idx = le32_to_cpu(rec->lrh_index);
444                 if (idx < index)
445                         CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
446                 while (idx < index) {
447                         rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
448                         idx ++;
449                 }
450                 tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
451
452                 /* process records in buffer, starting where we found one */
453                 while ((void *)tail > buf) {
454                         rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
455                                 sizeof(*tail);
456
457                         if (rec->lrh_index == 0)
458                                 GOTO(out, 0); /* no more records */
459
460                         /* if set, process the callback on this record */
461                         if (ext2_test_bit(index, llh->llh_bitmap)) {
462                                 rc = cb(loghandle, rec, data);
463                                 if (rc == LLOG_PROC_BREAK) {
464                                         GOTO(out, rc);
465                                 }
466                                 if (rc)
467                                         GOTO(out, rc);
468                         }
469
470                         /* previous record, still in buffer? */
471                         --index;
472                         if (index < first_index)
473                                 GOTO(out, rc = 0);
474                         tail = (void *)rec - sizeof(*tail);
475                 }
476         }
477
478 out:
479         if (buf)
480                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
481         RETURN(rc);
482 }
483 EXPORT_SYMBOL(llog_reverse_process);