lustre/obdclass/llog_cat.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
 *   Author: Andreas Dilger <adilger@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 * OST<->MDS recovery logging infrastructure.
 *
 * Invariants in implementation:
 * - we do not share logs among different OST<->MDS connections, so that
 *   if an OST or MDS fails it need only look at log(s) relevant to itself
 */

#define DEBUG_SUBSYSTEM S_LOG

#ifndef EXPORT_SYMTAB
#define EXPORT_SYMTAB
#endif

#ifdef __KERNEL__
#include <linux/fs.h>
#else
#include <liblustre.h>
#endif

#include <linux/obd_class.h>
#include <linux/lustre_log.h>
#include <portals/list.h>

/* Create a new log handle and add it to the open list.
 * This log handle will be closed when all of the records in it are removed.
 *
 * Assumes caller has already pushed us into the kernel context and is locking.
 */
static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
{
        struct llog_handle *loghandle;
        struct llog_log_hdr *llh;
        struct llog_logid_rec rec;
        int rc, index, bitmap_size, i;
        ENTRY;

        rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
        if (rc)
                RETURN(ERR_PTR(rc));

        rc = llog_init_handle(loghandle,
                              LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
                              &cathandle->lgh_hdr->llh_tgtuuid);
        if (rc)
                GOTO(out_destroy, rc);

        /* Find first free entry */
        llh = cathandle->lgh_hdr;
        bitmap_size = sizeof(llh->llh_bitmap) * 8;
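        /* The scan starts at llh_count (used as a hint for the next free
         * slot) and wraps modulo the bitmap size, so every catalog slot is
         * examined at most once. */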
        for (i = 0, index = le32_to_cpu(llh->llh_count); i < bitmap_size;
             i++, index++) {
                index %= bitmap_size;
                if (ext2_set_bit(index, llh->llh_bitmap)) {
                        /* XXX This should trigger log clean up or similar */
                        CERROR("catalog index %d is still in use\n", index);
                } else {
                        cathandle->lgh_last_idx = index;
                        llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
                        break;
                }
        }
        if (i == bitmap_size) {
                CERROR("no free catalog slots for log...\n");
                GOTO(out_destroy, rc = -ENOSPC);
        }
        CWARN("new recovery log "LPX64":%x for index %u of catalog "LPX64"\n",
              loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen, index,
              cathandle->lgh_id.lgl_oid);
        /* build the record for this log in the catalog */
        rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
        rec.lid_hdr.lrh_index = cpu_to_le32(index);
        rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
        rec.lid_id = loghandle->lgh_id;
        rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
        rec.lid_tail.lrt_index = cpu_to_le32(index);

        /* update the catalog: header and record */
        rc = llog_write_rec(cathandle, &rec.lid_hdr,
                            &loghandle->u.phd.phd_cookie, 1, NULL, index);
        if (rc < 0) {
                GOTO(out_destroy, rc);
        }

        loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
        cathandle->u.chd.chd_current_log = loghandle;
        LASSERT(list_empty(&loghandle->u.phd.phd_entry));
        list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);

 out_destroy:
        if (rc < 0) {
                llog_destroy(loghandle);
                RETURN(ERR_PTR(rc));
        }

        RETURN(loghandle);
}
EXPORT_SYMBOL(llog_cat_new_log);

/* Assumes caller has already pushed us into the kernel context and is locking.
 * We return a lock on the handle to ensure nobody yanks it from us.
 */
int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
                       struct llog_logid *logid)
{
        struct llog_handle *loghandle;
        int rc = 0;
        ENTRY;

        if (cathandle == NULL)
                RETURN(-EBADF);

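        /* First check whether the plain log is already open and cached on the
         * catalog's list of open log handles. */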
        list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
                            u.phd.phd_entry) {
                struct llog_logid *cgl = &loghandle->lgh_id;
                if (cgl->lgl_oid == logid->lgl_oid) {
                        if (cgl->lgl_ogen != logid->lgl_ogen) {
                                CERROR("log "LPX64" generation %x != %x\n",
                                       logid->lgl_oid, cgl->lgl_ogen,
                                       logid->lgl_ogen);
                                continue;
                        }
                        loghandle->u.phd.phd_cat_handle = cathandle;
                        cathandle->u.chd.chd_current_log = loghandle;
                        GOTO(out, rc = 0);
                }
        }

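        /* Not cached: open the plain log by its logid and add it to the
         * catalog's open list. */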
        rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
        if (rc) {
                CERROR("error opening log id "LPX64":%x: rc %d\n",
                       logid->lgl_oid, logid->lgl_ogen, rc);
        } else {
                rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
                if (!rc) {
                        list_add(&loghandle->u.phd.phd_entry,
                                 &cathandle->u.chd.chd_head);
                        cathandle->u.chd.chd_current_log = loghandle;
                }
        }
        if (!rc) {
                loghandle->u.phd.phd_cat_handle = cathandle;
                loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
                loghandle->u.phd.phd_cookie.lgc_index =
                        le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
        }

out:
        *res = loghandle;
        RETURN(rc);
}

int llog_cat_put(struct llog_handle *cathandle)
{
        struct llog_handle *loghandle, *n;
        int rc;
        ENTRY;

        list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
                                 u.phd.phd_entry) {
                int err = llog_close(loghandle);
                if (err)
                        CERROR("error closing loghandle: rc %d\n", err);
        }
        rc = llog_close(cathandle);
        RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_put);

/* Return the currently active log handle.  If the current log handle doesn't
 * have enough space left for the current record, start a new one.
 *
 * If 'create' is zero, we only want to know what the currently active log is;
 * otherwise a new plain log is created once the current one has no free slots.
 *
 * Assumes caller has already pushed us into the kernel context and is locking.
 *
 * NOTE: loghandle is write-locked upon successful return
 */
static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                                                int create)
{
        struct llog_handle *loghandle = NULL;
        ENTRY;

        down_read(&cathandle->lgh_lock);
        loghandle = cathandle->u.chd.chd_current_log;
        if (loghandle) {
                struct llog_log_hdr *llh = loghandle->lgh_hdr;
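                /* If the current log still has free slots, take its write
                 * lock before dropping the catalog's read lock so it cannot
                 * be switched out from under us. */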
                if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) {
                        down_write(&loghandle->lgh_lock);
                        up_read(&cathandle->lgh_lock);
                        RETURN(loghandle);
                }
        }
        if (!create) {
                if (loghandle)
                        down_write(&loghandle->lgh_lock);
                up_read(&cathandle->lgh_lock);
                RETURN(loghandle);
        }
        up_read(&cathandle->lgh_lock);

        /* time to create new log */

        /* first, we have to make sure the state hasn't changed */
        down_write(&cathandle->lgh_lock);
        loghandle = cathandle->u.chd.chd_current_log;
        if (loghandle) {
                struct llog_log_hdr *llh = loghandle->lgh_hdr;
                if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) {
                        down_write(&loghandle->lgh_lock);
                        up_write(&cathandle->lgh_lock);
                        RETURN(loghandle);
                }
        }

        CDEBUG(D_INODE, "creating new log\n");
        loghandle = llog_cat_new_log(cathandle);
        if (!IS_ERR(loghandle))
                down_write(&loghandle->lgh_lock);
        up_write(&cathandle->lgh_lock);
        RETURN(loghandle);
}

/* Add a single record to the recovery log(s) using a catalog
 * Returns the same values as llog_write_rec()
 *
 * Assumes caller has already pushed us into the kernel context.
 */
int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
                     struct llog_cookie *reccookie, void *buf)
{
        struct llog_handle *loghandle;
        int rc;
        ENTRY;

        LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
        loghandle = llog_cat_current_log(cathandle, 1);
        if (IS_ERR(loghandle))
                RETURN(PTR_ERR(loghandle));
        /* loghandle is already locked by llog_cat_current_log() for us */
        rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
        up_write(&loghandle->lgh_lock);
        RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_add_rec);

/* For each cookie in the cookie array, we clear the log in-use bit and either:
 * - the log is empty, so mark it free in the catalog header and delete it
 * - the log is not empty, just write out the log header
 *
 * The cookies may be in different log files, so we need to get new logs
 * each time.
 *
 * Assumes caller has already pushed us into the kernel context.
 */
int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
                            struct llog_cookie *cookies)
{
        int i, index, rc = 0;
        ENTRY;

        down_write(&cathandle->lgh_lock);
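        /* Hold the catalog write lock across the whole batch so the catalog's
         * open-log list and current-log pointer stay stable while records are
         * cancelled. */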
        for (i = 0; i < count; i++, cookies++) {
                struct llog_handle *loghandle;
                struct llog_logid *lgl = &cookies->lgc_lgl;

                rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
                if (rc) {
                        CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
                        break;
                }

                down_write(&loghandle->lgh_lock);
                rc = llog_cancel_rec(loghandle, cookies->lgc_index);
                up_write(&loghandle->lgh_lock);

                if (rc == 1) {          /* log has been destroyed */
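                        /* The now-empty plain log was zapped by
                         * llog_cancel_rec() (LLOG_F_ZAP_WHEN_EMPTY), so drop
                         * its handle and cancel its entry in the catalog. */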
                        index = loghandle->u.phd.phd_cookie.lgc_index;
                        if (cathandle->u.chd.chd_current_log == loghandle)
                                cathandle->u.chd.chd_current_log = NULL;
                        llog_free_handle(loghandle);

                        LASSERT(index);
                        rc = llog_cancel_rec(cathandle, index);
                }
        }
        up_write(&cathandle->lgh_lock);

        RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_cancel_records);

int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
                        void *data)
{
        struct llog_process_data *d = data;
        struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
        struct llog_handle *llh;
        int rc;
        ENTRY;

        if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
                CERROR("invalid record in catalog\n");
                RETURN(-EINVAL);
        }
        CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
              lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
              le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);

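        /* Open the plain log this catalog record points at and run the
         * caller's callback over each of its records. */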
        rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
        if (rc) {
                CERROR("Cannot find handle for log "LPX64"\n",
                       lir->lid_id.lgl_oid);
                RETURN(rc);
        }

        rc = llog_process(llh, d->lpd_cb, d->lpd_data);
        RETURN(rc);
}

int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
{
        struct llog_process_data d;
        int rc;
        ENTRY;
        d.lpd_data = data;
        d.lpd_cb = cb;

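        /* Walk the catalog itself; llog_cat_process_cb() is invoked for each
         * LLOG_LOGID_MAGIC record and descends into the plain log it names. */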
        rc = llog_process(cat_llh, llog_cat_process_cb, &d);
        RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_process);


#if 0
/* Assumes caller has already pushed us into the kernel context. */
int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
{
        struct llog_log_hdr *llh;
        loff_t offset = 0;
        int rc = 0;
        ENTRY;

        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);

        down(&cathandle->lgh_lock);
        llh = cathandle->lgh_hdr;

        if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
                llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);

write_hdr:
                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                   &offset);
                if (rc != LLOG_CHUNK_SIZE) {
                        CERROR("error writing catalog header: rc %d\n", rc);
                        OBD_FREE(llh, sizeof(*llh));
                        if (rc >= 0)
                                rc = -ENOSPC;
                } else
                        rc = 0;
        } else {
                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                  &offset);
                if (rc != LLOG_CHUNK_SIZE) {
                        CERROR("error reading catalog header: rc %d\n", rc);
                        /* Can we do much else if the header is bad? */
                        goto write_hdr;
                } else
                        rc = 0;
        }

        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
        up(&cathandle->lgh_lock);
        RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_init);

#endif