Whamcloud - gitweb
c05774fb4ce68cc441a27130fe191758421ec9fc
[fs/lustre-release.git] / lustre / lvfs / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lustre_log.h>
42 #include <portals/list.h>
43
44 /* Create a new log handle and add it to the open list.
45  * This log handle will be closed when all of the records in it are removed.
46  *
47  * Assumes caller has already pushed us into the kernel context and is locking.
48  */
49 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
50 {
51         struct llog_handle *loghandle;
52         struct llog_log_hdr *llh;
53         struct llog_logid_rec rec;
54         int rc, index, bitmap_size;
55         ENTRY;
56
57         llh = cathandle->lgh_hdr;
58         bitmap_size = sizeof(llh->llh_bitmap) * 8;
59
60         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
61
62         /* maximum number of available slots in catalog is bitmap_size - 2 */
63         if (llh->llh_cat_idx == cpu_to_le32(index)) {
64                 CERROR("no free catalog slots for log...\n");
65                 RETURN(ERR_PTR(-ENOSPC));
66         } else {
67                 if (index == 0)
68                         index = 1;
69                 if (ext2_set_bit(index, llh->llh_bitmap)) {
70                         CERROR("argh, index %u already set in log bitmap?\n",
71                                index);
72                         LBUG(); /* should never happen */
73                 }
74                 cathandle->lgh_last_idx = index;
75                 llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
76                 llh->llh_tail.lrt_index = cpu_to_le32(index);
77         }
78
79         rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
80         if (rc) {
81                 CERROR("cannot create new log, error = %d\n", rc);
82                 RETURN(ERR_PTR(rc));
83         }
84
85         rc = llog_init_handle(loghandle,
86                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
87                               &cathandle->lgh_hdr->llh_tgtuuid);
88         if (rc)
89                 GOTO(out_destroy, rc);
90
91         CDEBUG(D_HA, "new recovery log "LPX64":%x for index %u of catalog "
92                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
93                index, cathandle->lgh_id.lgl_oid);
94         /* build the record for this log in the catalog */
95         rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
96         rec.lid_hdr.lrh_index = cpu_to_le32(index);
97         rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
98         rec.lid_id = loghandle->lgh_id;
99         rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
100         rec.lid_tail.lrt_index = cpu_to_le32(index);
101
102         /* update the catalog: header and record */
103         rc = llog_write_rec(cathandle, &rec.lid_hdr,
104                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
105         if (rc < 0)
106                 GOTO(out_destroy, rc);
107
108         loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
109         cathandle->u.chd.chd_current_log = loghandle;
110         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
111         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
112
113  out_destroy:
114         if (rc < 0)
115                 llog_destroy(loghandle);
116
117         RETURN(loghandle);
118 }
119
120 /* Open an existent log handle and add it to the open list.
121  * This log handle will be closed when all of the records in it are removed.
122  *
123  * Assumes caller has already pushed us into the kernel context and is locking.
124  * We return a lock on the handle to ensure nobody yanks it from us.
125  */
126 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
127                               struct llog_logid *logid)
128 {
129         struct llog_handle *loghandle;
130         int rc = 0;
131         ENTRY;
132
133         if (cathandle == NULL)
134                 RETURN(-EBADF);
135
136         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
137                             u.phd.phd_entry) {
138                 struct llog_logid *cgl = &loghandle->lgh_id;
139                 if (cgl->lgl_oid == logid->lgl_oid) {
140                         if (cgl->lgl_ogen != logid->lgl_ogen) {
141                                 CERROR("log "LPX64" generation %x != %x\n",
142                                        logid->lgl_oid, cgl->lgl_ogen,
143                                        logid->lgl_ogen);
144                                 continue;
145                         }
146                         loghandle->u.phd.phd_cat_handle = cathandle;
147                         GOTO(out, rc = 0);
148                 }
149         }
150
151         rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
152         if (rc) {
153                 CERROR("error opening log id "LPX64":%x: rc %d\n",
154                        logid->lgl_oid, logid->lgl_ogen, rc);
155         } else {
156                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
157                 if (!rc) {
158                         list_add(&loghandle->u.phd.phd_entry,
159                                  &cathandle->u.chd.chd_head);
160                 }
161         }
162         if (!rc) {
163                 loghandle->u.phd.phd_cat_handle = cathandle;
164                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
165                 loghandle->u.phd.phd_cookie.lgc_index =
166                         le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
167         }
168
169 out:
170         *res = loghandle;
171         RETURN(rc);
172 }
173 EXPORT_SYMBOL(llog_cat_id2handle);
174
175 int llog_cat_put(struct llog_handle *cathandle)
176 {
177         struct llog_handle *loghandle, *n;
178         int rc;
179         ENTRY;
180
181         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
182                                  u.phd.phd_entry) {
183                 int err = llog_close(loghandle);
184                 if (err)
185                         CERROR("error closing loghandle\n");
186         }
187         rc = llog_close(cathandle);
188         RETURN(rc);
189 }
190 EXPORT_SYMBOL(llog_cat_put);
191
192 /* Return the currently active log handle.  If the current log handle doesn't
193  * have enough space left for the current record, start a new one.
194  *
195  * If reclen is 0, we only want to know what the currently active log is,
196  * otherwise we get a lock on this log so nobody can steal our space.
197  *
198  * Assumes caller has already pushed us into the kernel context and is locking.
199  *
200  * NOTE: loghandle is write-locked upon successful return
201  */
202 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
203                                                 int create)
204 {
205         struct llog_handle *loghandle = NULL;
206         ENTRY;
207
208         down_read(&cathandle->lgh_lock);
209         loghandle = cathandle->u.chd.chd_current_log;
210         if (loghandle) {
211                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
212                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
213                         down_write(&loghandle->lgh_lock);
214                         up_read(&cathandle->lgh_lock);
215                         RETURN(loghandle);
216                 }
217         }
218         if (!create) {
219                 if (loghandle)
220                         down_write(&loghandle->lgh_lock);
221                 up_read(&cathandle->lgh_lock);
222                 RETURN(loghandle);
223         }
224         up_read(&cathandle->lgh_lock);
225
226         /* time to create new log */
227
228         /* first, we have to make sure the state hasn't changed */
229         down_write(&cathandle->lgh_lock);
230         loghandle = cathandle->u.chd.chd_current_log;
231         if (loghandle) {
232                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
233                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
234                         down_write(&loghandle->lgh_lock);
235                         up_write(&cathandle->lgh_lock);
236                         RETURN(loghandle);
237                 }
238         }
239
240         CDEBUG(D_INODE, "creating new log\n");
241         loghandle = llog_cat_new_log(cathandle);
242         if (!IS_ERR(loghandle))
243                 down_write(&loghandle->lgh_lock);
244         up_write(&cathandle->lgh_lock);
245         RETURN(loghandle);
246 }
247
248 /* Add a single record to the recovery log(s) using a catalog
249  * Returns as llog_write_record
250  *
251  * Assumes caller has already pushed us into the kernel context.
252  */
253 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
254                      struct llog_cookie *reccookie, void *buf)
255 {
256         struct llog_handle *loghandle;
257         int rc;
258         ENTRY;
259
260         LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
261         loghandle = llog_cat_current_log(cathandle, 1);
262         if (IS_ERR(loghandle))
263                 RETURN(PTR_ERR(loghandle));
264         /* loghandle is already locked by llog_cat_current_log() for us */
265         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
266         up_write(&loghandle->lgh_lock);
267
268         RETURN(rc);
269 }
270 EXPORT_SYMBOL(llog_cat_add_rec);
271
272 /* For each cookie in the cookie array, we clear the log in-use bit and either:
273  * - the log is empty, so mark it free in the catalog header and delete it
274  * - the log is not empty, just write out the log header
275  *
276  * The cookies may be in different log files, so we need to get new logs
277  * each time.
278  *
279  * Assumes caller has already pushed us into the kernel context.
280  */
281 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
282                             struct llog_cookie *cookies)
283 {
284         int i, index, rc = 0;
285         ENTRY;
286
287         down_write(&cathandle->lgh_lock);
288         for (i = 0; i < count; i++, cookies++) {
289                 struct llog_handle *loghandle;
290                 struct llog_logid *lgl = &cookies->lgc_lgl;
291
292                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
293                 if (rc) {
294                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
295                         break;
296                 }
297
298                 down_write(&loghandle->lgh_lock);
299                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
300                 up_write(&loghandle->lgh_lock);
301
302                 if (rc == 1) {          /* log has been destroyed */
303                         index = loghandle->u.phd.phd_cookie.lgc_index;
304                         if (cathandle->u.chd.chd_current_log == loghandle)
305                                 cathandle->u.chd.chd_current_log = NULL;
306                         llog_free_handle(loghandle);
307
308                         LASSERT(index);
309                         llog_cat_set_first_idx(cathandle, index);
310                         rc = llog_cancel_rec(cathandle, index);
311                         if (rc == 0)
312                                 CDEBUG(D_HA, "cancel plain log at index %u "
313                                        "of catalog "LPX64"\n",
314                                        index, cathandle->lgh_id.lgl_oid);
315                 }
316         }
317         up_write(&cathandle->lgh_lock);
318
319         RETURN(rc);
320 }
321 EXPORT_SYMBOL(llog_cat_cancel_records);
322
323 static int llog_cat_process_cb(struct llog_handle *cat_llh, 
324                                struct llog_rec_hdr *rec, void *data)
325 {
326         struct llog_process_data *d = data;
327         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
328         struct llog_handle *llh;
329         int rc;
330
331         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
332                 CERROR("invalid record in catalog\n");
333                 RETURN(-EINVAL);
334         }
335         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
336                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
337                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
338
339         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
340         if (rc) {
341                 CERROR("Cannot find handle for log "LPX64"\n",
342                        lir->lid_id.lgl_oid);
343                 RETURN(rc);
344         }
345
346         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
347         RETURN(rc);
348 }
349
350 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
351 {
352         struct llog_process_data d;
353         struct llog_process_cat_data cd;
354         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
355         int rc;
356         ENTRY;
357
358         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
359         d.lpd_data = data;
360         d.lpd_cb = cb;
361
362         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
363                 CWARN("catalog "LPX64" crosses index zero\n",
364                       cat_llh->lgh_id.lgl_oid);
365
366                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
367                 cd.last_idx = 0;
368                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
369                 if (rc != 0)
370                         RETURN(rc);
371
372                 cd.first_idx = 0;
373                 cd.last_idx = cat_llh->lgh_last_idx;
374                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
375         } else {
376                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
377         }
378
379         RETURN(rc);
380 }
381 EXPORT_SYMBOL(llog_cat_process);
382
383 static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, 
384                                        struct llog_rec_hdr *rec, void *data)
385 {
386         struct llog_process_data *d = data;
387         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
388         struct llog_handle *llh;
389         int rc;
390
391         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
392                 CERROR("invalid record in catalog\n");
393                 RETURN(-EINVAL);
394         }
395         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
396                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
397                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
398
399         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
400         if (rc) {
401                 CERROR("Cannot find handle for log "LPX64"\n",
402                        lir->lid_id.lgl_oid);
403                 RETURN(rc);
404         }
405
406         rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
407         RETURN(rc);
408 }
409
410 int llog_cat_reverse_process(struct llog_handle *cat_llh,
411                              llog_cb_t cb, void *data)
412 {
413         struct llog_process_data d;
414         struct llog_process_cat_data cd;
415         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
416         int rc;
417         ENTRY;
418
419         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
420         d.lpd_data = data;
421         d.lpd_cb = cb;
422
423         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
424                 CWARN("catalog "LPX64" crosses index zero\n",
425                       cat_llh->lgh_id.lgl_oid);
426
427                 cd.first_idx = 0;
428                 cd.last_idx = cat_llh->lgh_last_idx;
429                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
430                                           &d, &cd);
431                 if (rc != 0)
432                         RETURN(rc);
433
434                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
435                 cd.last_idx = 0;
436                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
437                                           &d, &cd);
438         } else {
439                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
440                                           &d, NULL);
441         }
442
443         RETURN(rc);
444 }
445 EXPORT_SYMBOL(llog_cat_reverse_process);
446
447 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
448 {
449         struct llog_log_hdr *llh = cathandle->lgh_hdr;
450         int i, bitmap_size, idx;
451         ENTRY;
452
453         bitmap_size = sizeof(llh->llh_bitmap) * 8;
454         if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
455                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
456                 llh->llh_cat_idx = cpu_to_le32(idx);
457                 if (idx == cathandle->lgh_last_idx)
458                         goto out;
459                 for (i = (index + 1) % bitmap_size;
460                      i != cathandle->lgh_last_idx;
461                      i = (i + 1) % bitmap_size) {
462                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
463                                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
464                                 llh->llh_cat_idx = cpu_to_le32(idx);
465                         } else if (i == 0) {
466                                 llh->llh_cat_idx = 0;
467                         } else {
468                                 break;
469                         }
470                 }
471 out:
472                 CDEBUG(D_HA, "set catalog "LPX64" first idx %u\n",
473                        cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
474         }
475
476         RETURN(0);
477 }
478 EXPORT_SYMBOL(llog_cat_set_first_idx);
479
480 int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
481                      void *buf, struct llog_cookie *logcookies, 
482                      int numcookies, void *data)
483 {
484         struct llog_handle *cathandle;
485         int rc;
486         ENTRY;
487         
488         cathandle = ctxt->loc_handle;
489         LASSERT(cathandle != NULL);
490         
491         rc = llog_cat_add_rec(cathandle, rec, logcookies, buf);
492         if (rc != 1)
493                 CERROR("write one catalog record failed: %d\n", rc);
494         RETURN(rc);
495 }
496 EXPORT_SYMBOL(llog_catalog_add);
497
498 int llog_catalog_cancel(struct llog_ctxt *ctxt, int count,
499                         struct llog_cookie *cookies, int flags, void *data)
500 {
501         struct llog_handle *cathandle;
502         int rc;
503         ENTRY;
504
505         if (cookies == NULL || count == 0)
506                 RETURN(-EINVAL);
507         cathandle = ctxt->loc_handle;
508         LASSERT(cathandle != NULL);
509         rc = llog_cat_cancel_records(cathandle, count, cookies);
510         RETURN(rc);
511 }
512 EXPORT_SYMBOL(llog_catalog_cancel);
513
514 int llog_catalog_setup(struct llog_ctxt **res, char *name,
515                        struct obd_export *exp, 
516                        struct lvfs_run_ctxt *lvfs_ctxt,
517                        struct fsfilt_operations *fsops,
518                        struct dentry *logs_de, 
519                        struct dentry *objects_de)
520 {
521         struct llog_ctxt *ctxt;
522         struct llog_catid catid;
523         struct llog_handle *handle;
524         int rc;
525         
526         ENTRY;
527
528         OBD_ALLOC(ctxt, sizeof(*ctxt));
529         if (!ctxt)
530                 RETURN(-ENOMEM);
531
532         *res = ctxt;
533
534         /* marking this ctxt alone. */
535         ctxt->loc_alone = 1;
536         ctxt->loc_fsops = fsops;
537         ctxt->loc_lvfs_ctxt = lvfs_ctxt;
538         ctxt->loc_exp = exp;
539         ctxt->loc_logs_dir = logs_de;
540         ctxt->loc_objects_dir = objects_de;
541         ctxt->loc_logops = &llog_lvfs_ops; 
542         ctxt->loc_logops->lop_add = llog_catalog_add;
543         ctxt->loc_logops->lop_cancel = llog_catalog_cancel;
544
545         memset(&catid, 0, sizeof(struct llog_catid));
546         rc = llog_get_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
547         if (rc) {
548                 CERROR("error llog_get_cat_list rc: %d\n", rc);
549                 RETURN(rc);
550         }
551         if (catid.lci_logid.lgl_oid)
552                 rc = llog_create(ctxt, &handle, &catid.lci_logid, 0);
553         else {
554                 rc = llog_create(ctxt, &handle, NULL, NULL);
555                 if (!rc)
556                         catid.lci_logid = handle->lgh_id;
557         }
558         if (rc)
559                 GOTO(out, rc);
560
561         ctxt->loc_handle = handle;
562         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
563         if (rc)
564                 GOTO(out, rc);
565
566         rc = llog_put_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
567         if (rc)
568                 CERROR("error llog_get_cat_list rc: %d\n", rc);
569 out:
570         if (ctxt && rc)
571                 OBD_FREE(ctxt, sizeof(*ctxt));
572         RETURN(rc);
573 }
574 EXPORT_SYMBOL(llog_catalog_setup);
575
576 int llog_catalog_cleanup(struct llog_ctxt *ctxt)
577 {
578         struct llog_handle *cathandle, *n, *loghandle;
579         struct llog_log_hdr *llh;
580         int rc, index;
581         ENTRY;
582                                                                                                                              
583         if (!ctxt)
584                 return 0;
585
586         cathandle = ctxt->loc_handle;
587         if (cathandle) {
588                 list_for_each_entry_safe(loghandle, n,
589                                          &cathandle->u.chd.chd_head,
590                                          u.phd.phd_entry) {
591                         llh = loghandle->lgh_hdr;
592                         if ((le32_to_cpu(llh->llh_flags) &
593                                 LLOG_F_ZAP_WHEN_EMPTY) &&
594                             (le32_to_cpu(llh->llh_count) == 1)) {
595                                 rc = llog_destroy(loghandle);
596                                 if (rc)
597                                         CERROR("failure destroying log during "
598                                                "cleanup: %d\n", rc);
599                                 LASSERT(rc == 0);
600                                 index = loghandle->u.phd.phd_cookie.lgc_index;
601                                 llog_free_handle(loghandle);
602                                                                                                                              
603                                 LASSERT(index);
604                                 llog_cat_set_first_idx(cathandle, index);
605                                 rc = llog_cancel_rec(cathandle, index);
606                                 if (rc == 0)
607                                         CDEBUG(D_HA, "cancel plain log at index"
608                                                " %u of catalog "LPX64"\n",
609                                                index,cathandle->lgh_id.lgl_oid);
610                         }
611                 }
612                 llog_cat_put(ctxt->loc_handle);
613         }
614         return 0;
615 }
616 EXPORT_SYMBOL(llog_catalog_cleanup);
617
618 int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle)
619 {
620         struct llog_handle *loghandle;
621         struct llog_logid *lgl = &cookie->lgc_lgl;
622         int rc;
623                                                                                                                              
624         down_read(&handle->lgh_lock);
625         rc = llog_cat_id2handle(handle, &loghandle, lgl);
626         if (rc)
627                 GOTO(out, rc);
628         if (2 * loghandle->lgh_hdr->llh_cat_idx <=
629             handle->lgh_last_idx + handle->lgh_hdr->llh_cat_idx + 1)
630                 rc = 1;
631         else
632                 rc = 0;
633 out:
634         up_read(&handle->lgh_lock);
635         RETURN(rc);
636 }
637 EXPORT_SYMBOL(llog_cat_half_bottom);