Whamcloud - gitweb
Landing b_recovery
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_class.h>
42 #include <linux/lustre_log.h>
43 #include <portals/list.h>
44
45 /* Create a new log handle and add it to the open list.
46  * This log handle will be closed when all of the records in it are removed.
47  *
48  * Assumes caller has already pushed us into the kernel context and is locking.
49  */
50 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
51 {
52         struct llog_handle *loghandle;
53         struct llog_log_hdr *llh;
54         struct llog_logid_rec rec;
55         int rc, index, bitmap_size;
56         ENTRY;
57
58         llh = cathandle->lgh_hdr;
59         bitmap_size = sizeof(llh->llh_bitmap) * 8;
60
61         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
62
63         /* maximum number of available slots in catlog is bitmap_size - 2 */
64         if (llh->llh_cat_idx == cpu_to_le32(index)) {
65                 CERROR("no free catalog slots for log...\n");
66                 RETURN(ERR_PTR(-ENOSPC));
67         } else {
68                 if (index == 0)
69                         index = 1;
70                 if (ext2_set_bit(index, llh->llh_bitmap)) {
71                         CERROR("argh, index %u already set in log bitmap?\n",
72                                index);
73                         LBUG(); /* should never happen */
74                 }
75                 cathandle->lgh_last_idx = index;
76                 llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
77                 llh->llh_tail.lrt_index = cpu_to_le32(index);
78         }
79
80         rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
81         if (rc)
82                 RETURN(ERR_PTR(rc));
83
84         rc = llog_init_handle(loghandle,
85                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
86                               &cathandle->lgh_hdr->llh_tgtuuid);
87         if (rc)
88                 GOTO(out_destroy, rc);
89
90         CDEBUG(D_HA, "new recovery log "LPX64":%x for index %u of catalog "
91                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
92                index, cathandle->lgh_id.lgl_oid);
93         /* build the record for this log in the catalog */
94         rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
95         rec.lid_hdr.lrh_index = cpu_to_le32(index);
96         rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
97         rec.lid_id = loghandle->lgh_id;
98         rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
99         rec.lid_tail.lrt_index = cpu_to_le32(index);
100
101         /* update the catalog: header and record */
102         rc = llog_write_rec(cathandle, &rec.lid_hdr,
103                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
104         if (rc < 0) {
105                 GOTO(out_destroy, rc);
106         }
107
108         loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
109         cathandle->u.chd.chd_current_log = loghandle;
110         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
111         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
112
113  out_destroy:
114         if (rc < 0)
115                 llog_destroy(loghandle);
116
117         RETURN(loghandle);
118 }
119 EXPORT_SYMBOL(llog_cat_new_log);
120
121 /* Open an existent log handle and add it to the open list.
122  * This log handle will be closed when all of the records in it are removed.
123  *
124  * Assumes caller has already pushed us into the kernel context and is locking.
125  * We return a lock on the handle to ensure nobody yanks it from us.
126  */
127 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
128                        struct llog_logid *logid)
129 {
130         struct llog_handle *loghandle;
131         int rc = 0;
132         ENTRY;
133
134         if (cathandle == NULL)
135                 RETURN(-EBADF);
136
137         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
138                             u.phd.phd_entry) {
139                 struct llog_logid *cgl = &loghandle->lgh_id;
140                 if (cgl->lgl_oid == logid->lgl_oid) {
141                         if (cgl->lgl_ogen != logid->lgl_ogen) {
142                                 CERROR("log "LPX64" generation %x != %x\n",
143                                        logid->lgl_oid, cgl->lgl_ogen,
144                                        logid->lgl_ogen);
145                                 continue;
146                         }
147                         loghandle->u.phd.phd_cat_handle = cathandle;
148                         GOTO(out, rc = 0);
149                 }
150         }
151
152         rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
153         if (rc) {
154                 CERROR("error opening log id "LPX64":%x: rc %d\n",
155                        logid->lgl_oid, logid->lgl_ogen, rc);
156         } else {
157                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
158                 if (!rc) {
159                         list_add(&loghandle->u.phd.phd_entry,
160                                  &cathandle->u.chd.chd_head);
161                 }
162         }
163         if (!rc) {
164                 loghandle->u.phd.phd_cat_handle = cathandle;
165                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
166                 loghandle->u.phd.phd_cookie.lgc_index =
167                         le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
168         }
169
170 out:
171         *res = loghandle;
172         RETURN(rc);
173 }
174
175 int llog_cat_put(struct llog_handle *cathandle)
176 {
177         struct llog_handle *loghandle, *n;
178         int rc;
179         ENTRY;
180
181         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
182                                  u.phd.phd_entry) {
183                 int err = llog_close(loghandle);
184                 if (err)
185                         CERROR("error closing loghandle\n");
186         }
187         rc = llog_close(cathandle);
188         RETURN(rc);
189 }
190 EXPORT_SYMBOL(llog_cat_put);
191
192 /* Return the currently active log handle.  If the current log handle doesn't
193  * have enough space left for the current record, start a new one.
194  *
195  * If reclen is 0, we only want to know what the currently active log is,
196  * otherwise we get a lock on this log so nobody can steal our space.
197  *
198  * Assumes caller has already pushed us into the kernel context and is locking.
199  *
200  * NOTE: loghandle is write-locked upon successful return
201  */
202 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
203                                                 int create)
204 {
205         struct llog_handle *loghandle = NULL;
206         ENTRY;
207
208         down_read(&cathandle->lgh_lock);
209         loghandle = cathandle->u.chd.chd_current_log;
210         if (loghandle) {
211                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
212                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
213                         down_write(&loghandle->lgh_lock);
214                         up_read(&cathandle->lgh_lock);
215                         RETURN(loghandle);
216                 }
217         }
218         if (!create) {
219                 if (loghandle)
220                         down_write(&loghandle->lgh_lock);
221                 up_read(&cathandle->lgh_lock);
222                 RETURN(loghandle);
223         }
224         up_read(&cathandle->lgh_lock);
225
226         /* time to create new log */
227
228         /* first, we have to make sure the state hasn't changed */
229         down_write(&cathandle->lgh_lock);
230         loghandle = cathandle->u.chd.chd_current_log;
231         if (loghandle) {
232                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
233                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
234                         down_write(&loghandle->lgh_lock);
235                         up_write(&cathandle->lgh_lock);
236                         RETURN(loghandle);
237                 }
238         }
239
240         CDEBUG(D_INODE, "creating new log\n");
241         loghandle = llog_cat_new_log(cathandle);
242         if (!IS_ERR(loghandle))
243                 down_write(&loghandle->lgh_lock);
244         up_write(&cathandle->lgh_lock);
245         RETURN(loghandle);
246 }
247
248 /* Add a single record to the recovery log(s) using a catalog
249  * Returns as llog_write_record
250  *
251  * Assumes caller has already pushed us into the kernel context.
252  */
253 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
254                      struct llog_cookie *reccookie, void *buf)
255 {
256         struct llog_handle *loghandle;
257         int rc;
258         ENTRY;
259
260         LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
261         loghandle = llog_cat_current_log(cathandle, 1);
262         if (IS_ERR(loghandle))
263                 RETURN(PTR_ERR(loghandle));
264         /* loghandle is already locked by llog_cat_current_log() for us */
265         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
266         up_write(&loghandle->lgh_lock);
267
268         RETURN(rc);
269 }
270 EXPORT_SYMBOL(llog_cat_add_rec);
271
272 /* For each cookie in the cookie array, we clear the log in-use bit and either:
273  * - the log is empty, so mark it free in the catalog header and delete it
274  * - the log is not empty, just write out the log header
275  *
276  * The cookies may be in different log files, so we need to get new logs
277  * each time.
278  *
279  * Assumes caller has already pushed us into the kernel context.
280  */
281 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
282                         struct llog_cookie *cookies)
283 {
284         int i, index, rc = 0;
285         ENTRY;
286
287         down_write(&cathandle->lgh_lock);
288         for (i = 0; i < count; i++, cookies++) {
289                 struct llog_handle *loghandle;
290                 struct llog_logid *lgl = &cookies->lgc_lgl;
291
292                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
293                 if (rc) {
294                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
295                         break;
296                 }
297
298                 down_write(&loghandle->lgh_lock);
299                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
300                 up_write(&loghandle->lgh_lock);
301
302                 if (rc == 1) {          /* log has been destroyed */
303                         index = loghandle->u.phd.phd_cookie.lgc_index;
304                         if (cathandle->u.chd.chd_current_log == loghandle)
305                                 cathandle->u.chd.chd_current_log = NULL;
306                         llog_free_handle(loghandle);
307
308                         LASSERT(index);
309                         llog_cat_set_first_idx(cathandle, index);
310                         rc = llog_cancel_rec(cathandle, index);
311                         if (rc == 0)
312                                 CDEBUG(D_HA, "cancel plain log at index %u "
313                                        "of catalog "LPX64"\n",
314                                        index, cathandle->lgh_id.lgl_oid);
315                 }
316         }
317         up_write(&cathandle->lgh_lock);
318
319         RETURN(rc);
320 }
321 EXPORT_SYMBOL(llog_cat_cancel_records);
322
323 int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
324                         void *data)
325 {
326         struct llog_process_data *d = data;
327         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
328         struct llog_handle *llh;
329         int rc;
330
331         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
332                 CERROR("invalid record in catalog\n");
333                 RETURN(-EINVAL);
334         }
335         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
336                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
337                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
338
339         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
340         if (rc) {
341                 CERROR("Cannot find handle for log "LPX64"\n",
342                        lir->lid_id.lgl_oid);
343                 RETURN(rc);
344         }
345
346         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
347         RETURN(rc);
348 }
349
350 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
351 {
352         struct llog_process_data d;
353         struct llog_process_cat_data cd;
354         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
355         int rc;
356         ENTRY;
357
358         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
359         d.lpd_data = data;
360         d.lpd_cb = cb;
361
362         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
363                 CWARN("catlog "LPX64" crosses index zero\n",
364                       cat_llh->lgh_id.lgl_oid);
365
366                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
367                 cd.last_idx = 0;
368                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
369                 if (rc != 0)
370                         RETURN(rc);
371
372                 cd.first_idx = 0;
373                 cd.last_idx = cat_llh->lgh_last_idx;
374                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
375         } else {
376                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
377         }
378
379         RETURN(rc);
380 }
381 EXPORT_SYMBOL(llog_cat_process);
382
383 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
384 {
385         struct llog_log_hdr *llh = cathandle->lgh_hdr;
386         int i, bitmap_size, idx;
387         ENTRY;
388
389         bitmap_size = sizeof(llh->llh_bitmap) * 8;
390         if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
391                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
392                 llh->llh_cat_idx = cpu_to_le32(idx);
393                 if (idx == cathandle->lgh_last_idx)
394                         goto out;
395                 for (i = (index + 1) % bitmap_size;
396                      i != cathandle->lgh_last_idx;
397                      i = (i + 1) % bitmap_size) {
398                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
399                                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
400                                 llh->llh_cat_idx = cpu_to_le32(idx);
401                         } else if (i == 0) {
402                                 llh->llh_cat_idx = 0;
403                         } else {
404                                 break;
405                         }
406                 }
407 out:
408                 CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
409                        cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
410         }
411
412         RETURN(0);
413 }
414
#if 0
/* Disabled code, kept for reference.
 *
 * Initialise a catalog handle's header: write a fresh header when the
 * backing file is empty, otherwise read the existing one (and rewrite it
 * if it cannot be read in full).
 *
 * Returns 0 on success, a negative errno from the header I/O, or -ENOSPC
 * when the header was only partially written.
 *
 * Assumes caller has already pushed us into the kernel context.
 *
 * NOTE(review): this uses down()/up() on lgh_lock and the lgh_file and
 * lgh_tgtuuid fields, none of which the live code in this file uses in
 * that form - re-check against the current structures before enabling.
 */
int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
{
        struct llog_log_hdr *llh;
        loff_t offset = 0;
        int rc = 0;
        ENTRY;

        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);

        down(&cathandle->lgh_lock);
        llh = cathandle->lgh_hdr;

        if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
                llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);

write_hdr:
                /* also reached after a failed read, which was handed
                 * &offset; the header always lives at the start */
                offset = 0;
                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                   &offset);
                if (rc != LLOG_CHUNK_SIZE) {
                        CERROR("error writing catalog header: rc %d\n", rc);
                        /* llh is borrowed from cathandle->lgh_hdr and is
                         * still referenced below, so it must not be freed
                         * here */
                        if (rc >= 0)
                                rc = -ENOSPC;
                } else
                        rc = 0;
        } else {
                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                  &offset);
                if (rc != LLOG_CHUNK_SIZE) {
                        CERROR("error reading catalog header: rc %d\n", rc);
                        /* Can we do much else if the header is bad? */
                        goto write_hdr;
                } else
                        rc = 0;
        }

        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
        up(&cathandle->lgh_lock);
        RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_init);

#endif