Whamcloud - gitweb
b=16715 NFS cannot files in HEAD, MDS crashing
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_LOG
47
48 #ifndef EXPORT_SYMTAB
49 #define EXPORT_SYMTAB
50 #endif
51
52 #ifndef __KERNEL__
53 #include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_log.h>
58 #include <libcfs/list.h>
59 #include "llog_internal.h"
60
61 /* Allocate a new log or catalog handle */
62 struct llog_handle *llog_alloc_handle(void)
63 {
64         struct llog_handle *loghandle;
65         ENTRY;
66
67         OBD_ALLOC(loghandle, sizeof(*loghandle));
68         if (loghandle == NULL)
69                 RETURN(ERR_PTR(-ENOMEM));
70
71         cfs_init_rwsem(&loghandle->lgh_lock);
72
73         RETURN(loghandle);
74 }
75 EXPORT_SYMBOL(llog_alloc_handle);
76
77
78 void llog_free_handle(struct llog_handle *loghandle)
79 {
80         if (!loghandle)
81                 return;
82
83         if (!loghandle->lgh_hdr)
84                 goto out;
85         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
86                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
87         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
88                 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
89         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
90
91  out:
92         OBD_FREE(loghandle, sizeof(*loghandle));
93 }
94 EXPORT_SYMBOL(llog_free_handle);
95
96 /* returns negative on error; 0 if success; 1 if success & log destroyed */
97 int llog_cancel_rec(struct llog_handle *loghandle, int index)
98 {
99         struct llog_log_hdr *llh = loghandle->lgh_hdr;
100         int rc = 0;
101         ENTRY;
102
103         CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
104                index, loghandle->lgh_id.lgl_oid);
105
106         if (index == 0) {
107                 CERROR("Can't cancel index 0 which is header\n");
108                 RETURN(-EINVAL);
109         }
110
111         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
112                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
113                 RETURN(-ENOENT);
114         }
115
116         llh->llh_count--;
117
118         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
119             (llh->llh_count == 1) &&
120             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
121                 rc = llog_destroy(loghandle);
122                 if (rc) {
123                         CERROR("Failure destroying log after last cancel: %d\n",
124                                rc);
125                         ext2_set_bit(index, llh->llh_bitmap);
126                         llh->llh_count++;
127                 } else {
128                         rc = 1;
129                 }
130                 RETURN(rc);
131         }
132
133         rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
134         if (rc) {
135                 CERROR("Failure re-writing header %d\n", rc);
136                 ext2_set_bit(index, llh->llh_bitmap);
137                 llh->llh_count++;
138         }
139         RETURN(rc);
140 }
141 EXPORT_SYMBOL(llog_cancel_rec);
142
143 int llog_init_handle(struct llog_handle *handle, int flags,
144                      struct obd_uuid *uuid)
145 {
146         int rc;
147         struct llog_log_hdr *llh;
148         ENTRY;
149         LASSERT(handle->lgh_hdr == NULL);
150
151         OBD_ALLOC(llh, sizeof(*llh));
152         if (llh == NULL)
153                 RETURN(-ENOMEM);
154         handle->lgh_hdr = llh;
155         /* first assign flags to use llog_client_ops */
156         llh->llh_flags = flags;
157         rc = llog_read_header(handle);
158         if (rc == 0) {
159                 flags = llh->llh_flags;
160                 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
161                         CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
162                                (char *)llh->llh_tgtuuid.uuid);
163                         rc = -EEXIST;
164                 }
165                 GOTO(out, rc);
166         } else if (rc != LLOG_EEMPTY || !flags) {
167                 /* set a pesudo flag for initialization */
168                 flags = LLOG_F_IS_CAT;
169                 GOTO(out, rc);
170         }
171         rc = 0;
172
173         handle->lgh_last_idx = 0; /* header is record with index 0 */
174         llh->llh_count = 1;         /* for the header record */
175         llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
176         llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
177         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
178         llh->llh_timestamp = cfs_time_current_sec();
179         if (uuid)
180                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
181         llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
182         ext2_set_bit(0, llh->llh_bitmap);
183
184 out:
185         if (flags & LLOG_F_IS_CAT) {
186                 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
187                 llh->llh_size = sizeof(struct llog_logid_rec);
188         } else if (flags & LLOG_F_IS_PLAIN) {
189                 CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
190         } else {
191                 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
192                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
193                 LBUG();
194         }
195
196         if (rc) {
197                 OBD_FREE(llh, sizeof(*llh));
198                 handle->lgh_hdr = NULL;
199         }
200         RETURN(rc);
201 }
202 EXPORT_SYMBOL(llog_init_handle);
203
204 int llog_close(struct llog_handle *loghandle)
205 {
206         struct llog_operations *lop;
207         int rc;
208         ENTRY;
209
210         rc = llog_handle2ops(loghandle, &lop);
211         if (rc)
212                 GOTO(out, rc);
213         if (lop->lop_close == NULL)
214                 GOTO(out, -EOPNOTSUPP);
215         rc = lop->lop_close(loghandle);
216  out:
217         llog_free_handle(loghandle);
218         RETURN(rc);
219 }
220 EXPORT_SYMBOL(llog_close);
221
/*
 * Worker behind llog_process(): walk the records of lpi->lpi_loghandle in
 * increasing index order and invoke lpi->lpi_cb on every record whose bit
 * is set in the header bitmap.
 *
 * \a arg is a struct llog_process_info.  If it carries catalog data, the
 * walk starts just after lpcd_first_idx and, when lpcd_last_idx is
 * non-zero, stops at lpcd_last_idx; otherwise the whole bitmap range is
 * covered.  On exit through "out", lpcd_last_idx is overwritten with the
 * index of the last record the callback was invoked for.
 *
 * The outcome is stored in lpi->lpi_rc (and, in kernel builds, signalled
 * through lpi->lpi_completion); the function itself always returns 0.
 */
static int llog_process_thread(void *arg)
{
        struct llog_process_info     *lpi = (struct llog_process_info *)arg;
        struct llog_handle           *loghandle = lpi->lpi_loghandle;
        struct llog_log_hdr          *llh = loghandle->lgh_hdr;
        struct llog_process_cat_data *cd  = lpi->lpi_catdata;
        char                         *buf;
        __u64                         cur_offset = LLOG_CHUNK_SIZE;
        __u64                         last_offset;
        int                           rc = 0, index = 1, last_index;
        int                           saved_index = 0, last_called_index = 0;

        LASSERT(llh);

        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
        if (!buf) {
                /* report the failure through lpi; cd is left untouched */
                lpi->lpi_rc = -ENOMEM;
#ifdef __KERNEL__
                cfs_complete(&lpi->lpi_completion);
#endif
                return 0;
        }

        cfs_daemonize_ctxt("llog_process_thread");

        /* work out the index window to process */
        if (cd != NULL) {
                last_called_index = cd->lpcd_first_idx;
                index = cd->lpcd_first_idx + 1;
        }
        if (cd != NULL && cd->lpcd_last_idx)
                last_index = cd->lpcd_last_idx;
        else
                last_index = LLOG_BITMAP_BYTES * 8 - 1;

        /* each pass fetches one chunk and processes the records in it */
        while (rc == 0) {
                struct llog_rec_hdr *rec;

                /* skip records not set in bitmap */
                while (index <= last_index &&
                       !ext2_test_bit(index, llh->llh_bitmap))
                        ++index;

                /* ran past the window: nothing left to process */
                LASSERT(index <= last_index + 1);
                if (index == last_index + 1)
                        break;

                CDEBUG(D_OTHER, "index: %d last_index %d\n",
                       index, last_index);

                /* get the buf with our target record; avoid old garbage */
                memset(buf, 0, LLOG_CHUNK_SIZE);
                /* remember where this chunk starts, for lgh_cur_offset */
                last_offset = cur_offset;
                rc = llog_next_block(loghandle, &saved_index, index,
                                     &cur_offset, buf, LLOG_CHUNK_SIZE);
                if (rc)
                        GOTO(out, rc);

                /* NB: when rec->lrh_len is accessed it is already swabbed
                 * since it is used at the "end" of the loop and the rec
                 * swabbing is done at the beginning of the loop. */
                for (rec = (struct llog_rec_hdr *)buf;
                     (char *)rec < buf + LLOG_CHUNK_SIZE;
                     rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){

                        CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
                               rec, rec->lrh_type);

                        if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
                                lustre_swab_llog_rec(rec, NULL);

                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
                               rec->lrh_type, rec->lrh_index);

                        /* the buffer was zeroed above, so index 0 marks
                         * the end of the data in this chunk */
                        if (rec->lrh_index == 0)
                                GOTO(out, 0); /* no more records */

                        /* a zero length would stall the loop; an oversized
                         * one cannot fit in a chunk */
                        if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
                                CWARN("invalid length %d in llog record for "
                                      "index %d/%d\n", rec->lrh_len,
                                      rec->lrh_index, index);
                                GOTO(out, rc = -EINVAL);
                        }

                        /* records before the wanted index are stepped over */
                        if (rec->lrh_index < index) {
                                CDEBUG(D_OTHER, "skipping lrh_index %d\n",
                                       rec->lrh_index);
                                continue;
                        }

                        CDEBUG(D_OTHER,
                               "lrh_index: %d lrh_len: %d (%d remains)\n",
                               rec->lrh_index, rec->lrh_len,
                               (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));

                        /* publish the current position in the handle */
                        loghandle->lgh_cur_idx = rec->lrh_index;
                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
                                                    last_offset;

                        /* if set, process the callback on this record */
                        if (ext2_test_bit(index, llh->llh_bitmap)) {
                                rc = lpi->lpi_cb(loghandle, rec,
                                                 lpi->lpi_cbdata);
                                last_called_index = index;
                                if (rc == LLOG_PROC_BREAK) {
                                        GOTO(out, rc);
                                } else if (rc == LLOG_DEL_RECORD) {
                                        /* the callback asked for the record
                                         * to be cancelled; the result of the
                                         * cancel is not checked here */
                                        llog_cancel_rec(loghandle,
                                                        rec->lrh_index);
                                        rc = 0;
                                }
                                if (rc)
                                        GOTO(out, rc);
                        } else {
                                CDEBUG(D_OTHER, "Skipped index %d\n", index);
                        }

                        /* next record, still in buffer? */
                        ++index;
                        if (index > last_index)
                                GOTO(out, rc = 0);
                }
        }

 out:
        /* tell the caller how far the callbacks got */
        if (cd != NULL)
                cd->lpcd_last_idx = last_called_index;
        if (buf)
                OBD_FREE(buf, LLOG_CHUNK_SIZE);
        lpi->lpi_rc = rc;
#ifdef __KERNEL__
        cfs_complete(&lpi->lpi_completion);
#endif
        return 0;
}
356
357 int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
358                  void *data, void *catdata)
359 {
360         struct llog_process_info *lpi;
361         int                      rc;
362         ENTRY;
363
364         OBD_ALLOC_PTR(lpi);
365         if (lpi == NULL) {
366                 CERROR("cannot alloc pointer\n");
367                 RETURN(-ENOMEM);
368         }
369         lpi->lpi_loghandle = loghandle;
370         lpi->lpi_cb        = cb;
371         lpi->lpi_cbdata    = data;
372         lpi->lpi_catdata   = catdata;
373
374 #ifdef __KERNEL__
375         cfs_init_completion(&lpi->lpi_completion);
376         rc = cfs_kernel_thread(llog_process_thread, lpi, CLONE_VM | CLONE_FILES);
377         if (rc < 0) {
378                 CERROR("cannot start thread: %d\n", rc);
379                 OBD_FREE_PTR(lpi);
380                 RETURN(rc);
381         }
382         cfs_wait_for_completion(&lpi->lpi_completion);
383 #else
384         llog_process_thread(lpi);
385 #endif
386         rc = lpi->lpi_rc;
387         OBD_FREE_PTR(lpi);
388         RETURN(rc);
389 }
390 EXPORT_SYMBOL(llog_process);
391
392 inline int llog_get_size(struct llog_handle *loghandle)
393 {
394         if (loghandle && loghandle->lgh_hdr)
395                 return loghandle->lgh_hdr->llh_count;
396         return 0;
397 }
398 EXPORT_SYMBOL(llog_get_size);
399
400 int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
401                          void *data, void *catdata)
402 {
403         struct llog_log_hdr *llh = loghandle->lgh_hdr;
404         struct llog_process_cat_data *cd = catdata;
405         void *buf;
406         int rc = 0, first_index = 1, index, idx;
407         ENTRY;
408
409         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
410         if (!buf)
411                 RETURN(-ENOMEM);
412
413         if (cd != NULL)
414                 first_index = cd->lpcd_first_idx + 1;
415         if (cd != NULL && cd->lpcd_last_idx)
416                 index = cd->lpcd_last_idx;
417         else
418                 index = LLOG_BITMAP_BYTES * 8 - 1;
419
420         while (rc == 0) {
421                 struct llog_rec_hdr *rec;
422                 struct llog_rec_tail *tail;
423
424                 /* skip records not set in bitmap */
425                 while (index >= first_index &&
426                        !ext2_test_bit(index, llh->llh_bitmap))
427                         --index;
428
429                 LASSERT(index >= first_index - 1);
430                 if (index == first_index - 1)
431                         break;
432
433                 /* get the buf with our target record; avoid old garbage */
434                 memset(buf, 0, LLOG_CHUNK_SIZE);
435                 rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
436                 if (rc)
437                         GOTO(out, rc);
438
439                 rec = buf;
440                 idx = le32_to_cpu(rec->lrh_index);
441                 if (idx < index)
442                         CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
443                 while (idx < index) {
444                         rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
445                         idx ++;
446                 }
447                 tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
448
449                 /* process records in buffer, starting where we found one */
450                 while ((void *)tail > buf) {
451                         rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
452                                 sizeof(*tail);
453
454                         if (rec->lrh_index == 0)
455                                 GOTO(out, 0); /* no more records */
456
457                         /* if set, process the callback on this record */
458                         if (ext2_test_bit(index, llh->llh_bitmap)) {
459                                 rc = cb(loghandle, rec, data);
460                                 if (rc == LLOG_PROC_BREAK) {
461                                         GOTO(out, rc);
462                                 }
463                                 if (rc)
464                                         GOTO(out, rc);
465                         }
466
467                         /* previous record, still in buffer? */
468                         --index;
469                         if (index < first_index)
470                                 GOTO(out, rc = 0);
471                         tail = (void *)rec - sizeof(*tail);
472                 }
473         }
474
475 out:
476         if (buf)
477                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
478         RETURN(rc);
479 }
480 EXPORT_SYMBOL(llog_reverse_process);