Whamcloud - gitweb
back the mmap code out of b1_2
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_class.h>
42 #include <linux/lustre_log.h>
43 #include <portals/list.h>
44
45 /* Create a new log handle and add it to the open list.
46  * This log handle will be closed when all of the records in it are removed.
47  *
48  * Assumes caller has already pushed us into the kernel context and is locking.
49  */
50 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
51 {
52         struct llog_handle *loghandle;
53         struct llog_log_hdr *llh;
54         struct llog_logid_rec rec = { { 0 }, };
55         int rc, index, bitmap_size;
56         ENTRY;
57
58         llh = cathandle->lgh_hdr;
59         bitmap_size = LLOG_BITMAP_SIZE(llh);
60
61         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
62
63         /* maximum number of available slots in catlog is bitmap_size - 2 */
64         if (llh->llh_cat_idx == index) {
65                 CERROR("no free catalog slots for log...\n");
66                 RETURN(ERR_PTR(-ENOSPC));
67         } else {
68                 if (index == 0)
69                         index = 1;
70                 if (ext2_set_bit(index, llh->llh_bitmap)) {
71                         CERROR("argh, index %u already set in log bitmap?\n",
72                                index);
73                         LBUG(); /* should never happen */
74                 }
75                 cathandle->lgh_last_idx = index;
76                 llh->llh_count++;
77                 llh->llh_tail.lrt_index = index;
78         }
79
80         rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
81         if (rc)
82                 RETURN(ERR_PTR(rc));
83
84         rc = llog_init_handle(loghandle,
85                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
86                               &cathandle->lgh_hdr->llh_tgtuuid);
87         if (rc)
88                 GOTO(out_destroy, rc);
89
90         CDEBUG(D_HA, "new recovery log "LPX64":%x for index %u of catalog "
91                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
92                index, cathandle->lgh_id.lgl_oid);
93         /* build the record for this log in the catalog */
94         rec.lid_hdr.lrh_len = sizeof(rec);
95         rec.lid_hdr.lrh_index = index;
96         rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
97         rec.lid_id = loghandle->lgh_id;
98         rec.lid_tail.lrt_len = sizeof(rec);
99         rec.lid_tail.lrt_index = index;
100
101         /* update the catalog: header and record */
102         rc = llog_write_rec(cathandle, &rec.lid_hdr,
103                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
104         if (rc < 0) {
105                 GOTO(out_destroy, rc);
106         }
107
108         loghandle->lgh_hdr->llh_cat_idx = index;
109         cathandle->u.chd.chd_current_log = loghandle;
110         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
111         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
112
113  out_destroy:
114         if (rc < 0)
115                 llog_destroy(loghandle);
116
117         RETURN(loghandle);
118 }
119 EXPORT_SYMBOL(llog_cat_new_log);
120
121 /* Open an existent log handle and add it to the open list.
122  * This log handle will be closed when all of the records in it are removed.
123  *
124  * Assumes caller has already pushed us into the kernel context and is locking.
125  * We return a lock on the handle to ensure nobody yanks it from us.
126  */
127 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
128                        struct llog_logid *logid)
129 {
130         struct llog_handle *loghandle;
131         int rc = 0;
132         ENTRY;
133
134         if (cathandle == NULL)
135                 RETURN(-EBADF);
136
137         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
138                             u.phd.phd_entry) {
139                 struct llog_logid *cgl = &loghandle->lgh_id;
140                 if (cgl->lgl_oid == logid->lgl_oid) {
141                         if (cgl->lgl_ogen != logid->lgl_ogen) {
142                                 CERROR("log "LPX64" generation %x != %x\n",
143                                        logid->lgl_oid, cgl->lgl_ogen,
144                                        logid->lgl_ogen);
145                                 continue;
146                         }
147                         loghandle->u.phd.phd_cat_handle = cathandle;
148                         GOTO(out, rc = 0);
149                 }
150         }
151
152         rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
153         if (rc) {
154                 CERROR("error opening log id "LPX64":%x: rc %d\n",
155                        logid->lgl_oid, logid->lgl_ogen, rc);
156         } else {
157                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
158                 if (!rc) {
159                         list_add(&loghandle->u.phd.phd_entry,
160                                  &cathandle->u.chd.chd_head);
161                 }
162         }
163         if (!rc) {
164                 loghandle->u.phd.phd_cat_handle = cathandle;
165                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
166                 loghandle->u.phd.phd_cookie.lgc_index = 
167                         loghandle->lgh_hdr->llh_cat_idx;
168         }
169
170 out:
171         *res = loghandle;
172         RETURN(rc);
173 }
174
175 int llog_cat_put(struct llog_handle *cathandle)
176 {
177         struct llog_handle *loghandle, *n;
178         int rc;
179         ENTRY;
180
181         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
182                                  u.phd.phd_entry) {
183                 int err = llog_close(loghandle);
184                 if (err)
185                         CERROR("error closing loghandle\n");
186         }
187         rc = llog_close(cathandle);
188         RETURN(rc);
189 }
190 EXPORT_SYMBOL(llog_cat_put);
191
192 /* Return the currently active log handle.  If the current log handle doesn't
193  * have enough space left for the current record, start a new one.
194  *
195  * If reclen is 0, we only want to know what the currently active log is,
196  * otherwise we get a lock on this log so nobody can steal our space.
197  *
198  * Assumes caller has already pushed us into the kernel context and is locking.
199  *
200  * NOTE: loghandle is write-locked upon successful return
201  */
202 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
203                                                 int create)
204 {
205         struct llog_handle *loghandle = NULL;
206         ENTRY;
207
208         down_read(&cathandle->lgh_lock);
209         loghandle = cathandle->u.chd.chd_current_log;
210         if (loghandle) {
211                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
212                 down_write(&loghandle->lgh_lock);
213                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
214                         up_read(&cathandle->lgh_lock);
215                         RETURN(loghandle);
216                 } else {
217                         up_write(&loghandle->lgh_lock);
218                 }
219         }
220         if (!create) {
221                 if (loghandle)
222                         down_write(&loghandle->lgh_lock);
223                 up_read(&cathandle->lgh_lock);
224                 RETURN(loghandle);
225         }
226         up_read(&cathandle->lgh_lock);
227
228         /* time to create new log */
229
230         /* first, we have to make sure the state hasn't changed */
231         down_write(&cathandle->lgh_lock);
232         loghandle = cathandle->u.chd.chd_current_log;
233         if (loghandle) {
234                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
235                 down_write(&loghandle->lgh_lock);
236                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
237                         up_write(&cathandle->lgh_lock);
238                         RETURN(loghandle);
239                 } else {
240                         up_write(&loghandle->lgh_lock);
241                 }
242         }
243
244         CDEBUG(D_INODE, "creating new log\n");
245         loghandle = llog_cat_new_log(cathandle);
246         if (!IS_ERR(loghandle))
247                 down_write(&loghandle->lgh_lock);
248         up_write(&cathandle->lgh_lock);
249         RETURN(loghandle);
250 }
251
252 /* Add a single record to the recovery log(s) using a catalog
253  * Returns as llog_write_record
254  *
255  * Assumes caller has already pushed us into the kernel context.
256  */
257 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
258                      struct llog_cookie *reccookie, void *buf)
259 {
260         struct llog_handle *loghandle;
261         int rc;
262         ENTRY;
263
264         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
265         loghandle = llog_cat_current_log(cathandle, 1);
266         if (IS_ERR(loghandle))
267                 RETURN(PTR_ERR(loghandle));
268         /* loghandle is already locked by llog_cat_current_log() for us */
269         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
270         up_write(&loghandle->lgh_lock);
271         if (rc == -ENOSPC) {
272                 /* to create a new plain log */
273                 loghandle = llog_cat_current_log(cathandle, 1);
274                 if (IS_ERR(loghandle))
275                         RETURN(PTR_ERR(loghandle));
276                 rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
277                 up_write(&loghandle->lgh_lock);
278         }
279
280         RETURN(rc);
281 }
282 EXPORT_SYMBOL(llog_cat_add_rec);
283
284 /* For each cookie in the cookie array, we clear the log in-use bit and either:
285  * - the log is empty, so mark it free in the catalog header and delete it
286  * - the log is not empty, just write out the log header
287  *
288  * The cookies may be in different log files, so we need to get new logs
289  * each time.
290  *
291  * Assumes caller has already pushed us into the kernel context.
292  */
293 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
294                         struct llog_cookie *cookies)
295 {
296         int i, index, rc = 0;
297         ENTRY;
298
299         down_write(&cathandle->lgh_lock);
300         for (i = 0; i < count; i++, cookies++) {
301                 struct llog_handle *loghandle;
302                 struct llog_logid *lgl = &cookies->lgc_lgl;
303
304                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
305                 if (rc) {
306                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
307                         break;
308                 }
309
310                 down_write(&loghandle->lgh_lock);
311                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
312                 up_write(&loghandle->lgh_lock);
313
314                 if (rc == 1) {          /* log has been destroyed */
315                         index = loghandle->u.phd.phd_cookie.lgc_index;
316                         if (cathandle->u.chd.chd_current_log == loghandle)
317                                 cathandle->u.chd.chd_current_log = NULL;
318                         llog_free_handle(loghandle);
319
320                         LASSERT(index);
321                         llog_cat_set_first_idx(cathandle, index);
322                         rc = llog_cancel_rec(cathandle, index);
323                         if (rc == 0)
324                                 CDEBUG(D_HA, "cancel plain log at index %u "
325                                        "of catalog "LPX64"\n",
326                                        index, cathandle->lgh_id.lgl_oid);
327                 }
328         }
329         up_write(&cathandle->lgh_lock);
330
331         RETURN(rc);
332 }
333 EXPORT_SYMBOL(llog_cat_cancel_records);
334
335 int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
336                         void *data)
337 {
338         struct llog_process_data *d = data;
339         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
340         struct llog_handle *llh;
341         int rc;
342
343         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
344                 CERROR("invalid record in catalog\n");
345                 RETURN(-EINVAL);
346         }
347         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
348                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
349                rec->lrh_index, cat_llh->lgh_id.lgl_oid);
350
351         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
352         if (rc) {
353                 CERROR("Cannot find handle for log "LPX64"\n",
354                        lir->lid_id.lgl_oid);
355                 RETURN(rc);
356         }
357
358         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
359         RETURN(rc);
360 }
361
362 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
363 {
364         struct llog_process_data d;
365         struct llog_process_cat_data cd;
366         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
367         int rc;
368         ENTRY;
369
370         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
371         d.lpd_data = data;
372         d.lpd_cb = cb;
373
374         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
375                 CWARN("catlog "LPX64" crosses index zero\n",
376                       cat_llh->lgh_id.lgl_oid);
377
378                 cd.first_idx = llh->llh_cat_idx;
379                 cd.last_idx = 0;
380                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
381                 if (rc != 0)
382                         RETURN(rc);
383
384                 cd.first_idx = 0;
385                 cd.last_idx = cat_llh->lgh_last_idx;
386                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
387         } else {
388                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
389         }
390
391         RETURN(rc);
392 }
393 EXPORT_SYMBOL(llog_cat_process);
394
395 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
396 {
397         struct llog_log_hdr *llh = cathandle->lgh_hdr;
398         int i, bitmap_size, idx;
399         ENTRY;
400
401         bitmap_size = LLOG_BITMAP_SIZE(llh);
402         if (llh->llh_cat_idx == (index - 1)) {
403                 idx = llh->llh_cat_idx + 1;
404                 llh->llh_cat_idx = idx;
405                 if (idx == cathandle->lgh_last_idx)
406                         goto out;
407                 for (i = (index + 1) % bitmap_size;
408                      i != cathandle->lgh_last_idx;
409                      i = (i + 1) % bitmap_size) {
410                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
411                                 idx = llh->llh_cat_idx + 1;
412                                 llh->llh_cat_idx = idx;
413                         } else if (i == 0) {
414                                 llh->llh_cat_idx = 0;
415                         } else {
416                                 break;
417                         }
418                 }
419 out:
420                 CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
421                        cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
422         }
423
424         RETURN(0);
425 }
426
#if 0
/* NOTE: this function is compiled out.
 *
 * Initialise the on-disk catalog header for @cathandle: write a fresh
 * header if the backing file is empty, otherwise read the existing one
 * (rewriting it if the read comes back short).
 *
 * Returns 0 on success or a negative errno; a short write is reported as
 * -ENOSPC.  @tgtuuid is not used by this implementation.
 *
 * Assumes caller has already pushed us into the kernel context.
 */
int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
{
        struct llog_log_hdr *llh;
        loff_t offset = 0;
        int rc = 0;
        ENTRY;

        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);

        down(&cathandle->lgh_lock);
        llh = cathandle->lgh_hdr;

        if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
                llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);

write_hdr:
                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                   &offset);
                if (rc != LLOG_CHUNK_SIZE) {
                        CERROR("error writing catalog header: rc %d\n", rc);
                        OBD_FREE(llh, sizeof(*llh));
                        /* the header is gone: do not leave the handle
                         * pointing at (or into) freed memory */
                        cathandle->lgh_hdr = NULL;
                        if (rc >= 0)
                                rc = -ENOSPC;
                        GOTO(out, rc);
                }
                rc = 0;
        } else {
                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                  &offset);
                if (rc != LLOG_CHUNK_SIZE) {
                        CERROR("error reading catalog header: rc %d\n", rc);
                        /* Can we do much else if the header is bad? */
                        goto write_hdr;
                } else
                        rc = 0;
        }

        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
out:
        up(&cathandle->lgh_lock);
        RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_init);

#endif