Whamcloud - gitweb
make CATALOG processing more safe.
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog_cat.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  *
40  * Invariants in implementation:
41  * - we do not share logs among different OST<->MDS connections, so that
42  *   if an OST or MDS fails it need only look at log(s) relevant to itself
43  *
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  */
46
47 #define DEBUG_SUBSYSTEM S_LOG
48
49 #ifndef EXPORT_SYMTAB
50 #define EXPORT_SYMTAB
51 #endif
52
53 #ifndef __KERNEL__
54 #include <liblustre.h>
55 #endif
56
57 #include <obd_class.h>
58 #include <lustre_log.h>
59 #include <libcfs/list.h>
60
61 /* Create a new log handle and add it to the open list.
62  * This log handle will be closed when all of the records in it are removed.
63  *
64  * Assumes caller has already pushed us into the kernel context and is locking.
65  */
66 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
67                                             struct llog_logid *lid)
68 {
69         struct llog_handle *loghandle;
70         struct llog_log_hdr *llh;
71         struct llog_logid_rec rec = { { 0 }, };
72         int rc, index, bitmap_size;
73         ENTRY;
74
75         llh = cathandle->lgh_hdr;
76         bitmap_size = LLOG_BITMAP_SIZE(llh);
77
78         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
79
80         /* maximum number of available slots in catlog is bitmap_size - 2 */
81         if (llh->llh_cat_idx == index) {
82                 CERROR("no free catalog slots for log...\n");
83                 RETURN(ERR_PTR(-ENOSPC));
84         }
85
86         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
87                 RETURN(ERR_PTR(-ENOSPC));
88
89         if (lid != NULL && lid->lgl_oid == 0)
90                 lid = NULL;
91
92         rc = llog_create(cathandle->lgh_ctxt, &loghandle, lid, NULL);
93         if (rc)
94                 RETURN(ERR_PTR(rc));
95
96         rc = llog_init_handle(loghandle,
97                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
98                               &cathandle->lgh_hdr->llh_tgtuuid);
99         if (rc)
100                 GOTO(out_destroy, rc);
101
102         if (index == 0)
103                 index = 1;
104         if (ext2_set_bit(index, llh->llh_bitmap)) {
105                 CERROR("argh, index %u already set in log bitmap?\n",
106                        index);
107                 LBUG(); /* should never happen */
108         }
109         cathandle->lgh_last_idx = index;
110         llh->llh_count++;
111         llh->llh_tail.lrt_index = index;
112
113         CDEBUG(D_RPCTRACE,"new recovery log "LPX64":%x for index %u of catalog "
114                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
115                index, cathandle->lgh_id.lgl_oid);
116         /* build the record for this log in the catalog */
117         rec.lid_hdr.lrh_len = sizeof(rec);
118         rec.lid_hdr.lrh_index = index;
119         rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
120         rec.lid_id = loghandle->lgh_id;
121         rec.lid_tail.lrt_len = sizeof(rec);
122         rec.lid_tail.lrt_index = index;
123
124         /* update the catalog: header and record */
125         rc = llog_write_rec(cathandle, &rec.lid_hdr,
126                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
127         if (rc < 0) {
128                 GOTO(out_destroy, rc);
129         }
130
131         loghandle->lgh_hdr->llh_cat_idx = index;
132         cathandle->u.chd.chd_current_log = loghandle;
133         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
134         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
135
136 out_destroy:
137         if (rc < 0)
138                 llog_destroy(loghandle);
139
140         RETURN(loghandle);
141 }
142
143 /* Open an existent log handle and add it to the open list.
144  * This log handle will be closed when all of the records in it are removed.
145  *
146  * Assumes caller has already pushed us into the kernel context and is locking.
147  * We return a lock on the handle to ensure nobody yanks it from us.
148  */
149 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
150                        struct llog_logid *logid)
151 {
152         struct llog_handle *loghandle;
153         int rc = 0;
154         ENTRY;
155
156         if (cathandle == NULL)
157                 RETURN(-EBADF);
158
159         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
160                             u.phd.phd_entry) {
161                 struct llog_logid *cgl = &loghandle->lgh_id;
162                 if (cgl->lgl_oid == logid->lgl_oid) {
163                         if (cgl->lgl_ogen != logid->lgl_ogen) {
164                                 CERROR("log "LPX64" generation %x != %x\n",
165                                        logid->lgl_oid, cgl->lgl_ogen,
166                                        logid->lgl_ogen);
167                                 continue;
168                         }
169                         loghandle->u.phd.phd_cat_handle = cathandle;
170                         GOTO(out, rc = 0);
171                 }
172         }
173
174         rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
175         if (rc) {
176                 CERROR("error opening log id "LPX64":%x: rc %d\n",
177                        logid->lgl_oid, logid->lgl_ogen, rc);
178         } else {
179                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
180                 if (!rc) {
181                         list_add(&loghandle->u.phd.phd_entry,
182                                  &cathandle->u.chd.chd_head);
183                 }
184         }
185         if (!rc) {
186                 loghandle->u.phd.phd_cat_handle = cathandle;
187                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
188                 loghandle->u.phd.phd_cookie.lgc_index = 
189                         loghandle->lgh_hdr->llh_cat_idx;
190         }
191
192 out:
193         *res = loghandle;
194         RETURN(rc);
195 }
196
197 int llog_cat_put(struct llog_handle *cathandle)
198 {
199         struct llog_handle *loghandle, *n;
200         int rc;
201         ENTRY;
202
203         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
204                                  u.phd.phd_entry) {
205                 int err = llog_close(loghandle);
206                 if (err)
207                         CERROR("error closing loghandle\n");
208         }
209         rc = llog_close(cathandle);
210         RETURN(rc);
211 }
212 EXPORT_SYMBOL(llog_cat_put);
213
214 /* Return the currently active log handle.  If the current log handle doesn't
215  * have enough space left for the current record, start a new one.
216  *
217  * If reclen is 0, we only want to know what the currently active log is,
218  * otherwise we get a lock on this log so nobody can steal our space.
219  *
220  * Assumes caller has already pushed us into the kernel context and is locking.
221  *
222  * NOTE: loghandle is write-locked upon successful return
223  */
224 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
225                                                 struct llog_logid *lid,
226                                                 int create)
227 {
228         struct llog_handle *loghandle = NULL;
229         ENTRY;
230
231         down_read(&cathandle->lgh_lock);
232         loghandle = cathandle->u.chd.chd_current_log;
233         if (loghandle) {
234                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
235                 down_write(&loghandle->lgh_lock);
236                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
237                         up_read(&cathandle->lgh_lock);
238                         RETURN(loghandle);
239                 } else {
240                         lid = NULL;
241                         up_write(&loghandle->lgh_lock);
242                 }
243         }
244         if (!create) {
245                 if (loghandle)
246                         down_write(&loghandle->lgh_lock);
247                 up_read(&cathandle->lgh_lock);
248                 RETURN(loghandle);
249         }
250         up_read(&cathandle->lgh_lock);
251
252         /* time to create new log */
253
254         /* first, we have to make sure the state hasn't changed */
255         down_write(&cathandle->lgh_lock);
256         loghandle = cathandle->u.chd.chd_current_log;
257         if (loghandle) {
258                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
259                 down_write(&loghandle->lgh_lock);
260                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
261                         up_write(&cathandle->lgh_lock);
262                         RETURN(loghandle);
263                 } else {
264                         lid = NULL;
265                         up_write(&loghandle->lgh_lock);
266                 }
267         }
268
269         CDEBUG(D_INODE, "creating new log\n");
270         loghandle = llog_cat_new_log(cathandle, lid);
271         if (!IS_ERR(loghandle))
272                 down_write(&loghandle->lgh_lock);
273         up_write(&cathandle->lgh_lock);
274         RETURN(loghandle);
275 }
276
277 /* Add a single record to the recovery log(s) using a catalog
278  * Returns as llog_write_record
279  *
280  * Assumes caller has already pushed us into the kernel context.
281  */
282 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
283                      struct llog_cookie *reccookie, void *buf)
284 {
285         struct llog_handle *loghandle;
286         int rc;
287         ENTRY;
288
289         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
290         loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1);
291         if (IS_ERR(loghandle))
292                 RETURN(PTR_ERR(loghandle));
293         /* loghandle is already locked by llog_cat_current_log() for us */
294         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
295         up_write(&loghandle->lgh_lock);
296         if (rc == -ENOSPC) {
297                 /* to create a new plain log */
298                 loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1);
299                 if (IS_ERR(loghandle))
300                         RETURN(PTR_ERR(loghandle));
301                 rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
302                 up_write(&loghandle->lgh_lock);
303         }
304
305         RETURN(rc);
306 }
307 EXPORT_SYMBOL(llog_cat_add_rec);
308
309 /* For each cookie in the cookie array, we clear the log in-use bit and either:
310  * - the log is empty, so mark it free in the catalog header and delete it
311  * - the log is not empty, just write out the log header
312  *
313  * The cookies may be in different log files, so we need to get new logs
314  * each time.
315  *
316  * Assumes caller has already pushed us into the kernel context.
317  */
318 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
319                         struct llog_cookie *cookies)
320 {
321         int i, index, rc = 0;
322         ENTRY;
323
324         down_write(&cathandle->lgh_lock);
325         for (i = 0; i < count; i++, cookies++) {
326                 struct llog_handle *loghandle;
327                 struct llog_logid *lgl = &cookies->lgc_lgl;
328
329                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
330                 if (rc) {
331                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
332                         break;
333                 }
334
335                 down_write(&loghandle->lgh_lock);
336                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
337                 up_write(&loghandle->lgh_lock);
338
339                 if (rc == 1) {          /* log has been destroyed */
340                         index = loghandle->u.phd.phd_cookie.lgc_index;
341                         if (cathandle->u.chd.chd_current_log == loghandle)
342                                 cathandle->u.chd.chd_current_log = NULL;
343                         llog_free_handle(loghandle);
344
345                         LASSERT(index);
346                         llog_cat_set_first_idx(cathandle, index);
347                         rc = llog_cancel_rec(cathandle, index);
348                         if (rc == 0)
349                                 CDEBUG(D_RPCTRACE,"cancel plain log at index %u"
350                                        " of catalog "LPX64"\n",
351                                        index, cathandle->lgh_id.lgl_oid);
352                 }
353         }
354         up_write(&cathandle->lgh_lock);
355
356         RETURN(rc);
357 }
358 EXPORT_SYMBOL(llog_cat_cancel_records);
359
360 int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
361                         void *data)
362 {
363         struct llog_process_data *d = data;
364         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
365         struct llog_handle *llh;
366         int rc;
367
368         ENTRY;
369         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
370                 CERROR("invalid record in catalog\n");
371                 RETURN(-EINVAL);
372         }
373         CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "
374                LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
375                rec->lrh_index, cat_llh->lgh_id.lgl_oid);
376
377         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
378         if (rc) {
379                 CERROR("Cannot find handle for log "LPX64"\n",
380                        lir->lid_id.lgl_oid);
381                 RETURN(rc);
382         }
383
384         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
385         RETURN(rc);
386 }
387
388 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
389 {
390         struct llog_process_data d;
391         struct llog_process_cat_data cd;
392         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
393         int rc;
394         ENTRY;
395
396         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
397         d.lpd_data = data;
398         d.lpd_cb = cb;
399
400         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
401                 CWARN("catlog "LPX64" crosses index zero\n",
402                       cat_llh->lgh_id.lgl_oid);
403
404                 cd.lpcd_first_idx = llh->llh_cat_idx;
405                 cd.lpcd_last_idx = 0;
406                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
407                 if (rc != 0)
408                         RETURN(rc);
409
410                 cd.lpcd_first_idx = 0;
411                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
412                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
413         } else {
414                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
415         }
416
417         RETURN(rc);
418 }
419 EXPORT_SYMBOL(llog_cat_process);
420
421 #ifdef __KERNEL__
422 int llog_cat_process_thread(void *data)
423 {
424         struct llog_process_cat_args *args = data;
425         struct llog_ctxt *ctxt = args->lpca_ctxt;
426         struct llog_handle *llh = NULL;
427         void  *cb = args->lpca_cb;
428         struct llog_logid logid;
429         int rc;
430         ENTRY;
431
432         cfs_daemonize_ctxt("ll_log_process");
433
434         logid = *(struct llog_logid *)(args->lpca_arg);
435         rc = llog_create(ctxt, &llh, &logid, NULL);
436         if (rc) {
437                 CERROR("llog_create() failed %d\n", rc);
438                 GOTO(out, rc);
439         }
440         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
441         if (rc) {
442                 CERROR("llog_init_handle failed %d\n", rc);
443                 GOTO(release_llh, rc);
444         }
445
446         if (cb) {
447                 rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
448                 if (rc != LLOG_PROC_BREAK)
449                         CERROR("llog_cat_process() failed %d\n", rc);
450         } else {
451                 CWARN("No callback function for recovery\n");
452         }
453
454         /* 
455          * Make sure that all cached data is sent. 
456          */
457         llog_sync(ctxt, NULL);
458         EXIT;
459 release_llh:
460         rc = llog_cat_put(llh);
461         if (rc)
462                 CERROR("llog_cat_put() failed %d\n", rc);
463 out:
464         llog_ctxt_put(ctxt);
465         OBD_FREE(args, sizeof(*args));
466         return rc;
467 }
468 EXPORT_SYMBOL(llog_cat_process_thread);
469 #endif
470
471 static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh,
472                                        struct llog_rec_hdr *rec, void *data)
473 {
474         struct llog_process_data *d = data;
475         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
476         struct llog_handle *llh;
477         int rc;
478
479         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
480                 CERROR("invalid record in catalog\n");
481                 RETURN(-EINVAL);
482         }
483         CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "
484                LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
485                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
486
487         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
488         if (rc) {
489                 CERROR("Cannot find handle for log "LPX64"\n",
490                        lir->lid_id.lgl_oid);
491                 RETURN(rc);
492         }
493
494         rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
495         RETURN(rc);
496 }
497
498 int llog_cat_reverse_process(struct llog_handle *cat_llh,
499                              llog_cb_t cb, void *data)
500 {
501         struct llog_process_data d;
502         struct llog_process_cat_data cd;
503         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
504         int rc;
505         ENTRY;
506
507         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
508         d.lpd_data = data;
509         d.lpd_cb = cb;
510
511         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
512                 CWARN("catalog "LPX64" crosses index zero\n",
513                       cat_llh->lgh_id.lgl_oid);
514
515                 cd.lpcd_first_idx = 0;
516                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
517                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
518                                           &d, &cd);
519                 if (rc != 0)
520                         RETURN(rc);
521
522                 cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
523                 cd.lpcd_last_idx = 0;
524                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
525                                           &d, &cd);
526         } else {
527                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
528                                           &d, NULL);
529         }
530
531         RETURN(rc);
532 }
533 EXPORT_SYMBOL(llog_cat_reverse_process);
534
535 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
536 {
537         struct llog_log_hdr *llh = cathandle->lgh_hdr;
538         int i, bitmap_size, idx;
539         ENTRY;
540
541         bitmap_size = LLOG_BITMAP_SIZE(llh);
542         if (llh->llh_cat_idx == (index - 1)) {
543                 idx = llh->llh_cat_idx + 1;
544                 llh->llh_cat_idx = idx;
545                 if (idx == cathandle->lgh_last_idx)
546                         goto out;
547                 for (i = (index + 1) % bitmap_size;
548                      i != cathandle->lgh_last_idx;
549                      i = (i + 1) % bitmap_size) {
550                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
551                                 idx = llh->llh_cat_idx + 1;
552                                 llh->llh_cat_idx = idx;
553                         } else if (i == 0) {
554                                 llh->llh_cat_idx = 0;
555                         } else {
556                                 break;
557                         }
558                 }
559 out:
560                 CDEBUG(D_RPCTRACE, "set catlog "LPX64" first idx %u\n",
561                        cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
562         }
563
564         RETURN(0);
565 }
566
567 #if 0
568 /* Assumes caller has already pushed us into the kernel context. */
569 int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
570 {
571         struct llog_log_hdr *llh;
572         loff_t offset = 0;
573         int rc = 0;
574         ENTRY;
575
576         LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
577
578         down(&cathandle->lgh_lock);
579         llh = cathandle->lgh_hdr;
580
581         if (i_size_read(cathandle->lgh_file->f_dentry->d_inode) == 0) {
582                 llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);
583
584 write_hdr:
585                 rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
586                                    &offset);
587                 if (rc != LLOG_CHUNK_SIZE) {
588                         CERROR("error writing catalog header: rc %d\n", rc);
589                         OBD_FREE(llh, sizeof(*llh));
590                         if (rc >= 0)
591                                 rc = -ENOSPC;
592                 } else
593                         rc = 0;
594         } else {
595                 rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
596                                   &offset);
597                 if (rc != LLOG_CHUNK_SIZE) {
598                         CERROR("error reading catalog header: rc %d\n", rc);
599                         /* Can we do much else if the header is bad? */
600                         goto write_hdr;
601                 } else
602                         rc = 0;
603         }
604
605         cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
606         up(&cathandle->lgh_lock);
607         RETURN(rc);
608 }
609 EXPORT_SYMBOL(llog_cat_init);
610
611 #endif