Whamcloud - gitweb
d4fa3702acb385bcc8c7f1b7b471efb3202090c9
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_class.h>
42 #include <linux/lustre_log.h>
43 #include <portals/list.h>
44
45 /* Create a new log handle and add it to the open list.
46  * This log handle will be closed when all of the records in it are removed.
47  *
48  * Assumes caller has already pushed us into the kernel context and is locking.
49  */
50 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
51 {
52         struct llog_handle *loghandle;
53         struct llog_log_hdr *llh;
54         struct llog_logid_rec rec = { { 0 }, };
55         int rc, index, bitmap_size;
56         ENTRY;
57
58         llh = cathandle->lgh_hdr;
59         bitmap_size = sizeof(llh->llh_bitmap) * 8;
60
61         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
62
63         /* maximum number of available slots in catlog is bitmap_size - 2 */
64         if (llh->llh_cat_idx == index) {
65                 CERROR("no free catalog slots for log...\n");
66                 RETURN(ERR_PTR(-ENOSPC));
67         } else {
68                 if (index == 0)
69                         index = 1;
70                 if (ext2_set_bit(index, llh->llh_bitmap)) {
71                         CERROR("argh, index %u already set in log bitmap?\n",
72                                index);
73                         LBUG(); /* should never happen */
74                 }
75                 cathandle->lgh_last_idx = index;
76                 llh->llh_count++;
77                 llh->llh_tail.lrt_index = index;
78         }
79
80         rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
81         if (rc)
82                 RETURN(ERR_PTR(rc));
83
84         rc = llog_init_handle(loghandle,
85                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
86                               &cathandle->lgh_hdr->llh_tgtuuid);
87         if (rc)
88                 GOTO(out_destroy, rc);
89
90         CDEBUG(D_HA, "new recovery log "LPX64":%x for index %u of catalog "
91                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
92                index, cathandle->lgh_id.lgl_oid);
93         /* build the record for this log in the catalog */
94         rec.lid_hdr.lrh_len = sizeof(rec);
95         rec.lid_hdr.lrh_index = index;
96         rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
97         rec.lid_id = loghandle->lgh_id;
98         rec.lid_tail.lrt_len = sizeof(rec);
99         rec.lid_tail.lrt_index = index;
100
101         /* update the catalog: header and record */
102         rc = llog_write_rec(cathandle, &rec.lid_hdr,
103                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
104         if (rc < 0) {
105                 GOTO(out_destroy, rc);
106         }
107
108         loghandle->lgh_hdr->llh_cat_idx = index;
109         cathandle->u.chd.chd_current_log = loghandle;
110         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
111         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
112
113  out_destroy:
114         if (rc < 0)
115                 llog_destroy(loghandle);
116
117         RETURN(loghandle);
118 }
119 EXPORT_SYMBOL(llog_cat_new_log);
120
121 /* Open an existent log handle and add it to the open list.
122  * This log handle will be closed when all of the records in it are removed.
123  *
124  * Assumes caller has already pushed us into the kernel context and is locking.
125  * We return a lock on the handle to ensure nobody yanks it from us.
126  */
127 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
128                        struct llog_logid *logid)
129 {
130         struct llog_handle *loghandle;
131         int rc = 0;
132         ENTRY;
133
134         if (cathandle == NULL)
135                 RETURN(-EBADF);
136
137         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
138                             u.phd.phd_entry) {
139                 struct llog_logid *cgl = &loghandle->lgh_id;
140                 if (cgl->lgl_oid == logid->lgl_oid) {
141                         if (cgl->lgl_ogen != logid->lgl_ogen) {
142                                 CERROR("log "LPX64" generation %x != %x\n",
143                                        logid->lgl_oid, cgl->lgl_ogen,
144                                        logid->lgl_ogen);
145                                 continue;
146                         }
147                         loghandle->u.phd.phd_cat_handle = cathandle;
148                         GOTO(out, rc = 0);
149                 }
150         }
151
152         rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
153         if (rc) {
154                 CERROR("error opening log id "LPX64":%x: rc %d\n",
155                        logid->lgl_oid, logid->lgl_ogen, rc);
156         } else {
157                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
158                 if (!rc) {
159                         list_add(&loghandle->u.phd.phd_entry,
160                                  &cathandle->u.chd.chd_head);
161                 }
162         }
163         if (!rc) {
164                 loghandle->u.phd.phd_cat_handle = cathandle;
165                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
166                 loghandle->u.phd.phd_cookie.lgc_index = 
167                         loghandle->lgh_hdr->llh_cat_idx;
168         }
169
170 out:
171         *res = loghandle;
172         RETURN(rc);
173 }
174
175 int llog_cat_put(struct llog_handle *cathandle)
176 {
177         struct llog_handle *loghandle, *n;
178         int rc;
179         ENTRY;
180
181         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
182                                  u.phd.phd_entry) {
183                 int err = llog_close(loghandle);
184                 if (err)
185                         CERROR("error closing loghandle\n");
186         }
187         rc = llog_close(cathandle);
188         RETURN(rc);
189 }
190 EXPORT_SYMBOL(llog_cat_put);
191
192 /* Return the currently active log handle.  If the current log handle doesn't
193  * have enough space left for the current record, start a new one.
194  *
195  * If reclen is 0, we only want to know what the currently active log is,
196  * otherwise we get a lock on this log so nobody can steal our space.
197  *
198  * Assumes caller has already pushed us into the kernel context and is locking.
199  *
200  * NOTE: loghandle is write-locked upon successful return
201  */
202 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
203                                                 int create)
204 {
205         struct llog_handle *loghandle = NULL;
206         ENTRY;
207
208         down_read(&cathandle->lgh_lock);
209         loghandle = cathandle->u.chd.chd_current_log;
210         if (loghandle) {
211                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
212                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
213                         down_write(&loghandle->lgh_lock);
214                         up_read(&cathandle->lgh_lock);
215                         RETURN(loghandle);
216                 }
217         }
218         if (!create) {
219                 if (loghandle)
220                         down_write(&loghandle->lgh_lock);
221                 up_read(&cathandle->lgh_lock);
222                 RETURN(loghandle);
223         }
224         up_read(&cathandle->lgh_lock);
225
226         /* time to create new log */
227
228         /* first, we have to make sure the state hasn't changed */
229         down_write(&cathandle->lgh_lock);
230         loghandle = cathandle->u.chd.chd_current_log;
231         if (loghandle) {
232                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
233                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
234                         down_write(&loghandle->lgh_lock);
235                         up_write(&cathandle->lgh_lock);
236                         RETURN(loghandle);
237                 }
238         }
239
240         CDEBUG(D_INODE, "creating new log\n");
241         loghandle = llog_cat_new_log(cathandle);
242         if (!IS_ERR(loghandle))
243                 down_write(&loghandle->lgh_lock);
244         up_write(&cathandle->lgh_lock);
245         RETURN(loghandle);
246 }
247
248 /* Add a single record to the recovery log(s) using a catalog
249  * Returns as llog_write_record
250  *
251  * Assumes caller has already pushed us into the kernel context.
252  */
253 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
254                      struct llog_cookie *reccookie, void *buf)
255 {
256         struct llog_handle *loghandle;
257         int rc;
258         ENTRY;
259
260         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
261         loghandle = llog_cat_current_log(cathandle, 1);
262         if (IS_ERR(loghandle))
263                 RETURN(PTR_ERR(loghandle));
264         /* loghandle is already locked by llog_cat_current_log() for us */
265         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
266         up_write(&loghandle->lgh_lock);
267         if (rc == -ENOSPC) {
268                 /* to create a new plain log */
269                 loghandle = llog_cat_current_log(cathandle, 1);
270                 if (IS_ERR(loghandle))
271                         RETURN(PTR_ERR(loghandle));
272                 rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
273                 up_write(&loghandle->lgh_lock);
274         }
275
276         RETURN(rc);
277 }
278 EXPORT_SYMBOL(llog_cat_add_rec);
279
280 /* For each cookie in the cookie array, we clear the log in-use bit and either:
281  * - the log is empty, so mark it free in the catalog header and delete it
282  * - the log is not empty, just write out the log header
283  *
284  * The cookies may be in different log files, so we need to get new logs
285  * each time.
286  *
287  * Assumes caller has already pushed us into the kernel context.
288  */
289 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
290                         struct llog_cookie *cookies)
291 {
292         int i, index, rc = 0;
293         ENTRY;
294
295         down_write(&cathandle->lgh_lock);
296         for (i = 0; i < count; i++, cookies++) {
297                 struct llog_handle *loghandle;
298                 struct llog_logid *lgl = &cookies->lgc_lgl;
299
300                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
301                 if (rc) {
302                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
303                         break;
304                 }
305
306                 down_write(&loghandle->lgh_lock);
307                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
308                 up_write(&loghandle->lgh_lock);
309
310                 if (rc == 1) {          /* log has been destroyed */
311                         index = loghandle->u.phd.phd_cookie.lgc_index;
312                         if (cathandle->u.chd.chd_current_log == loghandle)
313                                 cathandle->u.chd.chd_current_log = NULL;
314                         llog_free_handle(loghandle);
315
316                         LASSERT(index);
317                         llog_cat_set_first_idx(cathandle, index);
318                         rc = llog_cancel_rec(cathandle, index);
319                         if (rc == 0)
320                                 CDEBUG(D_HA, "cancel plain log at index %u "
321                                        "of catalog "LPX64"\n",
322                                        index, cathandle->lgh_id.lgl_oid);
323                 }
324         }
325         up_write(&cathandle->lgh_lock);
326
327         RETURN(rc);
328 }
329 EXPORT_SYMBOL(llog_cat_cancel_records);
330
331 int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
332                         void *data)
333 {
334         struct llog_process_data *d = data;
335         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
336         struct llog_handle *llh;
337         int rc;
338
339         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
340                 CERROR("invalid record in catalog\n");
341                 RETURN(-EINVAL);
342         }
343         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
344                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
345                rec->lrh_index, cat_llh->lgh_id.lgl_oid);
346
347         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
348         if (rc) {
349                 CERROR("Cannot find handle for log "LPX64"\n",
350                        lir->lid_id.lgl_oid);
351                 RETURN(rc);
352         }
353
354         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
355         RETURN(rc);
356 }
357
358 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
359 {
360         struct llog_process_data d;
361         struct llog_process_cat_data cd;
362         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
363         int rc;
364         ENTRY;
365
366         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
367         d.lpd_data = data;
368         d.lpd_cb = cb;
369
370         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
371                 CWARN("catlog "LPX64" crosses index zero\n",
372                       cat_llh->lgh_id.lgl_oid);
373
374                 cd.first_idx = llh->llh_cat_idx;
375                 cd.last_idx = 0;
376                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
377                 if (rc != 0)
378                         RETURN(rc);
379
380                 cd.first_idx = 0;
381                 cd.last_idx = cat_llh->lgh_last_idx;
382                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
383         } else {
384                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
385         }
386
387         RETURN(rc);
388 }
389 EXPORT_SYMBOL(llog_cat_process);
390
391 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
392 {
393         struct llog_log_hdr *llh = cathandle->lgh_hdr;
394         int i, bitmap_size, idx;
395         ENTRY;
396
397         bitmap_size = sizeof(llh->llh_bitmap) * 8;
398         if (llh->llh_cat_idx == (index - 1)) {
399                 idx = llh->llh_cat_idx + 1;
400                 llh->llh_cat_idx = idx;
401                 if (idx == cathandle->lgh_last_idx)
402                         goto out;
403                 for (i = (index + 1) % bitmap_size;
404                      i != cathandle->lgh_last_idx;
405                      i = (i + 1) % bitmap_size) {
406                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
407                                 idx = llh->llh_cat_idx + 1;
408                                 llh->llh_cat_idx = idx;
409                         } else if (i == 0) {
410                                 llh->llh_cat_idx = 0;
411                         } else {
412                                 break;
413                         }
414                 }
415 out:
416                 CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
417                        cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
418         }
419
420         RETURN(0);
421 }
422
423 #if 0
424 /* Assumes caller has already pushed us into the kernel context. */
425 int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
426 {
427         struct llog_log_hdr *llh;
428         loff_t offset = 0;
429         int rc = 0;
430         ENTRY;
431
432         LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
433
434         down(&cathandle->lgh_lock);
435         llh = cathandle->lgh_hdr;
436
437         if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
438                 llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);
439
440 write_hdr:
441                 rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
442                                    &offset);
443                 if (rc != LLOG_CHUNK_SIZE) {
444                         CERROR("error writing catalog header: rc %d\n", rc);
445                         OBD_FREE(llh, sizeof(*llh));
446                         if (rc >= 0)
447                                 rc = -ENOSPC;
448                 } else
449                         rc = 0;
450         } else {
451                 rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
452                                   &offset);
453                 if (rc != LLOG_CHUNK_SIZE) {
454                         CERROR("error reading catalog header: rc %d\n", rc);
455                         /* Can we do much else if the header is bad? */
456                         goto write_hdr;
457                 } else
458                         rc = 0;
459         }
460
461         cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
462         up(&cathandle->lgh_lock);
463         RETURN(rc);
464 }
465 EXPORT_SYMBOL(llog_cat_init);
466
467 #endif