Whamcloud - gitweb
land b_ost_amd onto HEAD.
[fs/lustre-release.git] / lustre / lvfs / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lustre_log.h>
42 #include <portals/list.h>
43
44 /* Create a new log handle and add it to the open list.
45  * This log handle will be closed when all of the records in it are removed.
46  *
47  * Assumes caller has already pushed us into the kernel context and is locking.
48  */
49 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
50 {
51         struct llog_handle *loghandle;
52         struct llog_log_hdr *llh;
53         struct llog_logid_rec rec;
54         int rc, index, bitmap_size;
55         ENTRY;
56
57         llh = cathandle->lgh_hdr;
58         bitmap_size = sizeof(llh->llh_bitmap) * 8;
59
60         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
61
62         /* maximum number of available slots in catalog is bitmap_size - 2 */
63         if (llh->llh_cat_idx == cpu_to_le32(index)) {
64                 CERROR("no free catalog slots for log...\n");
65                 RETURN(ERR_PTR(-ENOSPC));
66         } else {
67                 if (index == 0)
68                         index = 1;
69                 if (ext2_set_bit(index, llh->llh_bitmap)) {
70                         CERROR("argh, index %u already set in log bitmap?\n",
71                                index);
72                         LBUG(); /* should never happen */
73                 }
74                 cathandle->lgh_last_idx = index;
75                 llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
76                 llh->llh_tail.lrt_index = cpu_to_le32(index);
77         }
78
79         rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL,
80                        OBD_LLOG_FL_CREATE);
81         if (rc) {
82                 CERROR("cannot create new log, error = %d\n", rc);
83                 RETURN(ERR_PTR(rc));
84         }
85
86         rc = llog_init_handle(loghandle,
87                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
88                               &cathandle->lgh_hdr->llh_tgtuuid);
89         if (rc)
90                 GOTO(out_destroy, rc);
91
92         CDEBUG(D_HA, "new recovery log "LPX64":%x for index %u of catalog "
93                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
94                index, cathandle->lgh_id.lgl_oid);
95         /* build the record for this log in the catalog */
96         rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
97         rec.lid_hdr.lrh_index = cpu_to_le32(index);
98         rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
99         rec.lid_id = loghandle->lgh_id;
100         rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
101         rec.lid_tail.lrt_index = cpu_to_le32(index);
102
103         /* update the catalog: header and record */
104         rc = llog_write_rec(cathandle, &rec.lid_hdr,
105                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
106         if (rc < 0)
107                 GOTO(out_destroy, rc);
108
109         loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
110         cathandle->u.chd.chd_current_log = loghandle;
111         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
112         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
113
114  out_destroy:
115         if (rc < 0)
116                 llog_destroy(loghandle);
117
118         RETURN(loghandle);
119 }
120
121 /* Open an existent log handle and add it to the open list.
122  * This log handle will be closed when all of the records in it are removed.
123  *
124  * Assumes caller has already pushed us into the kernel context and is locking.
125  * We return a lock on the handle to ensure nobody yanks it from us.
126  */
127 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
128                               struct llog_logid *logid)
129 {
130         struct llog_handle *loghandle;
131         int rc = 0;
132         ENTRY;
133
134         if (cathandle == NULL)
135                 RETURN(-EBADF);
136
137         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
138                             u.phd.phd_entry) {
139                 struct llog_logid *cgl = &loghandle->lgh_id;
140                 if (cgl->lgl_oid == logid->lgl_oid) {
141                         if (cgl->lgl_ogen != logid->lgl_ogen) {
142                                 CERROR("log "LPX64" generation %x != %x\n",
143                                        logid->lgl_oid, cgl->lgl_ogen,
144                                        logid->lgl_ogen);
145                                 continue;
146                         }
147                         loghandle->u.phd.phd_cat_handle = cathandle;
148                         GOTO(out, rc = 0);
149                 }
150         }
151
152         rc = llog_open(cathandle->lgh_ctxt, &loghandle, logid, NULL, 0);
153         if (rc) {
154                 CERROR("error opening log id "LPX64":%x: rc %d\n",
155                        logid->lgl_oid, logid->lgl_ogen, rc);
156         } else {
157                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
158                 if (!rc) {
159                         list_add(&loghandle->u.phd.phd_entry,
160                                  &cathandle->u.chd.chd_head);
161                 }
162         }
163         if (!rc) {
164                 loghandle->u.phd.phd_cat_handle = cathandle;
165                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
166                 loghandle->u.phd.phd_cookie.lgc_index =
167                         le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
168         }
169
170 out:
171         *res = loghandle;
172         RETURN(rc);
173 }
174 EXPORT_SYMBOL(llog_cat_id2handle);
175
176 int llog_cat_put(struct llog_handle *cathandle)
177 {
178         struct llog_handle *loghandle, *n;
179         int rc;
180         ENTRY;
181
182         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
183                                  u.phd.phd_entry) {
184                 int err = llog_close(loghandle);
185                 if (err)
186                         CERROR("error closing loghandle\n");
187         }
188         rc = llog_close(cathandle);
189         RETURN(rc);
190 }
191 EXPORT_SYMBOL(llog_cat_put);
192
193 /* Return the currently active log handle.  If the current log handle doesn't
194  * have enough space left for the current record, start a new one.
195  *
196  * If reclen is 0, we only want to know what the currently active log is,
197  * otherwise we get a lock on this log so nobody can steal our space.
198  *
199  * Assumes caller has already pushed us into the kernel context and is locking.
200  *
201  * NOTE: loghandle is write-locked upon successful return
202  */
203 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
204                                                 int create)
205 {
206         struct llog_handle *loghandle = NULL;
207         ENTRY;
208
209         down_read(&cathandle->lgh_lock);
210         loghandle = cathandle->u.chd.chd_current_log;
211         if (loghandle) {
212                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
213                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
214                         down_write(&loghandle->lgh_lock);
215                         up_read(&cathandle->lgh_lock);
216                         RETURN(loghandle);
217                 }
218         }
219         if (!create) {
220                 if (loghandle)
221                         down_write(&loghandle->lgh_lock);
222                 up_read(&cathandle->lgh_lock);
223                 RETURN(loghandle);
224         }
225         up_read(&cathandle->lgh_lock);
226
227         /* time to create new log */
228
229         /* first, we have to make sure the state hasn't changed */
230         down_write(&cathandle->lgh_lock);
231         loghandle = cathandle->u.chd.chd_current_log;
232         if (loghandle) {
233                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
234                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
235                         down_write(&loghandle->lgh_lock);
236                         up_write(&cathandle->lgh_lock);
237                         RETURN(loghandle);
238                 }
239         }
240
241         CDEBUG(D_INODE, "creating new log\n");
242         loghandle = llog_cat_new_log(cathandle);
243         if (!IS_ERR(loghandle))
244                 down_write(&loghandle->lgh_lock);
245         up_write(&cathandle->lgh_lock);
246         RETURN(loghandle);
247 }
248
249 /* Add a single record to the recovery log(s) using a catalog
250  * Returns as llog_write_record
251  *
252  * Assumes caller has already pushed us into the kernel context.
253  */
254 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
255                      struct llog_cookie *reccookie, void *buf)
256 {
257         struct llog_handle *loghandle;
258         int rc;
259         ENTRY;
260
261         LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
262         loghandle = llog_cat_current_log(cathandle, 1);
263         if (IS_ERR(loghandle))
264                 RETURN(PTR_ERR(loghandle));
265         /* loghandle is already locked by llog_cat_current_log() for us */
266         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
267         up_write(&loghandle->lgh_lock);
268
269         RETURN(rc);
270 }
271 EXPORT_SYMBOL(llog_cat_add_rec);
272
273 /* For each cookie in the cookie array, we clear the log in-use bit and either:
274  * - the log is empty, so mark it free in the catalog header and delete it
275  * - the log is not empty, just write out the log header
276  *
277  * The cookies may be in different log files, so we need to get new logs
278  * each time.
279  *
280  * Assumes caller has already pushed us into the kernel context.
281  */
282 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
283                             struct llog_cookie *cookies)
284 {
285         int i, index, rc = 0;
286         ENTRY;
287
288         down_write(&cathandle->lgh_lock);
289         for (i = 0; i < count; i++, cookies++) {
290                 struct llog_handle *loghandle;
291                 struct llog_logid *lgl = &cookies->lgc_lgl;
292
293                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
294                 if (rc) {
295                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
296                         break;
297                 }
298
299                 down_write(&loghandle->lgh_lock);
300                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
301                 up_write(&loghandle->lgh_lock);
302
303                 if (rc == 1) {          /* log has been destroyed */
304                         index = loghandle->u.phd.phd_cookie.lgc_index;
305                         if (cathandle->u.chd.chd_current_log == loghandle)
306                                 cathandle->u.chd.chd_current_log = NULL;
307                         llog_free_handle(loghandle);
308
309                         LASSERT(index);
310                         llog_cat_set_first_idx(cathandle, index);
311                         rc = llog_cancel_rec(cathandle, index);
312                         if (rc == 0)
313                                 CDEBUG(D_HA, "cancel plain log at index %u "
314                                        "of catalog "LPX64"\n",
315                                        index, cathandle->lgh_id.lgl_oid);
316                 }
317         }
318         up_write(&cathandle->lgh_lock);
319
320         RETURN(rc);
321 }
322 EXPORT_SYMBOL(llog_cat_cancel_records);
323
324 static int llog_cat_process_cb(struct llog_handle *cat_llh, 
325                                struct llog_rec_hdr *rec, void *data)
326 {
327         struct llog_process_data *d = data;
328         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
329         struct llog_handle *llh;
330         int rc;
331
332         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
333                 CERROR("invalid record in catalog\n");
334                 RETURN(-EINVAL);
335         }
336         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
337                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
338                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
339
340         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
341         if (rc) {
342                 CERROR("Cannot find handle for log "LPX64"\n",
343                        lir->lid_id.lgl_oid);
344                 RETURN(rc);
345         }
346
347         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
348         RETURN(rc);
349 }
350
351 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
352 {
353         struct llog_process_data d;
354         struct llog_process_cat_data cd;
355         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
356         int rc;
357         ENTRY;
358
359         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
360         d.lpd_data = data;
361         d.lpd_cb = cb;
362
363         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
364                 CWARN("catalog "LPX64" crosses index zero\n",
365                       cat_llh->lgh_id.lgl_oid);
366
367                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
368                 cd.last_idx = 0;
369                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
370                 if (rc != 0)
371                         RETURN(rc);
372
373                 cd.first_idx = 0;
374                 cd.last_idx = cat_llh->lgh_last_idx;
375                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
376         } else {
377                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
378         }
379
380         RETURN(rc);
381 }
382 EXPORT_SYMBOL(llog_cat_process);
383
384 static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, 
385                                        struct llog_rec_hdr *rec, void *data)
386 {
387         struct llog_process_data *d = data;
388         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
389         struct llog_handle *llh;
390         int rc;
391
392         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
393                 CERROR("invalid record in catalog\n");
394                 RETURN(-EINVAL);
395         }
396         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
397                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
398                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
399
400         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
401         if (rc) {
402                 CERROR("Cannot find handle for log "LPX64"\n",
403                        lir->lid_id.lgl_oid);
404                 RETURN(rc);
405         }
406
407         rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
408         RETURN(rc);
409 }
410
411 int llog_cat_reverse_process(struct llog_handle *cat_llh,
412                              llog_cb_t cb, void *data)
413 {
414         struct llog_process_data d;
415         struct llog_process_cat_data cd;
416         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
417         int rc;
418         ENTRY;
419
420         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
421         d.lpd_data = data;
422         d.lpd_cb = cb;
423
424         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
425                 CWARN("catalog "LPX64" crosses index zero\n",
426                       cat_llh->lgh_id.lgl_oid);
427
428                 cd.first_idx = 0;
429                 cd.last_idx = cat_llh->lgh_last_idx;
430                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
431                                           &d, &cd);
432                 if (rc != 0)
433                         RETURN(rc);
434
435                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
436                 cd.last_idx = 0;
437                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
438                                           &d, &cd);
439         } else {
440                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
441                                           &d, NULL);
442         }
443
444         RETURN(rc);
445 }
446 EXPORT_SYMBOL(llog_cat_reverse_process);
447
448 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
449 {
450         struct llog_log_hdr *llh = cathandle->lgh_hdr;
451         int i, bitmap_size, idx;
452         ENTRY;
453
454         bitmap_size = sizeof(llh->llh_bitmap) * 8;
455         if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
456                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
457                 llh->llh_cat_idx = cpu_to_le32(idx);
458                 if (idx == cathandle->lgh_last_idx)
459                         goto out;
460                 for (i = (index + 1) % bitmap_size;
461                      i != cathandle->lgh_last_idx;
462                      i = (i + 1) % bitmap_size) {
463                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
464                                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
465                                 llh->llh_cat_idx = cpu_to_le32(idx);
466                         } else if (i == 0) {
467                                 llh->llh_cat_idx = 0;
468                         } else {
469                                 break;
470                         }
471                 }
472 out:
473                 CDEBUG(D_HA, "set catalog "LPX64" first idx %u\n",
474                        cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
475         }
476
477         RETURN(0);
478 }
479 EXPORT_SYMBOL(llog_cat_set_first_idx);
480
481 int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
482                      void *buf, struct llog_cookie *logcookies, 
483                      int numcookies, void *data)
484 {
485         struct llog_handle *cathandle;
486         int rc;
487         ENTRY;
488         
489         cathandle = ctxt->loc_handle;
490         LASSERT(cathandle != NULL);
491         
492         rc = llog_cat_add_rec(cathandle, rec, logcookies, buf);
493         if (rc != 1)
494                 CERROR("write one catalog record failed: %d\n", rc);
495         RETURN(rc);
496 }
497 EXPORT_SYMBOL(llog_catalog_add);
498
499 int llog_catalog_cancel(struct llog_ctxt *ctxt, int count,
500                         struct llog_cookie *cookies, int flags, void *data)
501 {
502         struct llog_handle *cathandle;
503         int rc;
504         ENTRY;
505
506         if (cookies == NULL || count == 0)
507                 RETURN(-EINVAL);
508         cathandle = ctxt->loc_handle;
509         LASSERT(cathandle != NULL);
510         rc = llog_cat_cancel_records(cathandle, count, cookies);
511         RETURN(rc);
512 }
513 EXPORT_SYMBOL(llog_catalog_cancel);
514
515 int llog_catalog_setup(struct llog_ctxt **res, char *name,
516                        struct obd_export *exp, 
517                        struct lvfs_run_ctxt *lvfs_ctxt,
518                        struct fsfilt_operations *fsops,
519                        struct dentry *logs_de, 
520                        struct dentry *objects_de)
521 {
522         struct llog_ctxt *ctxt;
523         struct llog_catid catid;
524         struct llog_handle *handle;
525         int rc;
526         
527         ENTRY;
528
529         OBD_ALLOC(ctxt, sizeof(*ctxt));
530         if (!ctxt)
531                 RETURN(-ENOMEM);
532
533         *res = ctxt;
534
535         /* marking this ctxt alone. */
536         ctxt->loc_alone = 1;
537         ctxt->loc_fsops = fsops;
538         ctxt->loc_lvfs_ctxt = lvfs_ctxt;
539         ctxt->loc_exp = exp;
540         ctxt->loc_logs_dir = logs_de;
541         ctxt->loc_objects_dir = objects_de;
542         ctxt->loc_logops = &llog_lvfs_ops; 
543         ctxt->loc_logops->lop_add = llog_catalog_add;
544         ctxt->loc_logops->lop_cancel = llog_catalog_cancel;
545
546         memset(&catid, 0, sizeof(struct llog_catid));
547         rc = llog_get_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
548         if (rc) {
549                 CERROR("error llog_get_cat_list rc: %d\n", rc);
550                 RETURN(rc);
551         }
552         if (catid.lci_logid.lgl_oid)
553                 rc = llog_open(ctxt, &handle, &catid.lci_logid, NULL,
554                                OBD_LLOG_FL_CREATE);
555         else {
556                 rc = llog_open(ctxt, &handle, NULL, NULL, OBD_LLOG_FL_CREATE);
557                 if (!rc)
558                         catid.lci_logid = handle->lgh_id;
559         }
560         if (rc)
561                 GOTO(out, rc);
562
563         ctxt->loc_handle = handle;
564         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
565         if (rc)
566                 GOTO(out, rc);
567
568         rc = llog_put_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
569         if (rc)
570                 CERROR("error llog_get_cat_list rc: %d\n", rc);
571 out:
572         if (ctxt && rc)
573                 OBD_FREE(ctxt, sizeof(*ctxt));
574         RETURN(rc);
575 }
576 EXPORT_SYMBOL(llog_catalog_setup);
577
578 int llog_catalog_cleanup(struct llog_ctxt *ctxt)
579 {
580         struct llog_handle *cathandle, *n, *loghandle;
581         struct llog_log_hdr *llh;
582         int rc, index;
583         ENTRY;
584
585         if (!ctxt)
586                 return 0;
587
588         cathandle = ctxt->loc_handle;
589         if (cathandle) {
590                 list_for_each_entry_safe(loghandle, n,
591                                          &cathandle->u.chd.chd_head,
592                                          u.phd.phd_entry) {
593                         llh = loghandle->lgh_hdr;
594                         if ((le32_to_cpu(llh->llh_flags) &
595                                 LLOG_F_ZAP_WHEN_EMPTY) &&
596                             (le32_to_cpu(llh->llh_count) == 1)) {
597                                 rc = llog_destroy(loghandle);
598                                 if (rc)
599                                         CERROR("failure destroying log during "
600                                                "cleanup: %d\n", rc);
601                                 LASSERT(rc == 0);
602                                 index = loghandle->u.phd.phd_cookie.lgc_index;
603                                 llog_free_handle(loghandle);
604
605                                 LASSERT(index);
606                                 llog_cat_set_first_idx(cathandle, index);
607                                 rc = llog_cancel_rec(cathandle, index);
608                                 if (rc == 0)
609                                         CDEBUG(D_HA, "cancel plain log at index"
610                                                " %u of catalog "LPX64"\n",
611                                                index,cathandle->lgh_id.lgl_oid);
612                         }
613                 }
614                 llog_cat_put(ctxt->loc_handle);
615         }
616         return 0;
617 }
618 EXPORT_SYMBOL(llog_catalog_cleanup);
619
620 int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle)
621 {
622         struct llog_handle *loghandle;
623         struct llog_logid *lgl = &cookie->lgc_lgl;
624         int rc;
625
626         down_read(&handle->lgh_lock);
627         rc = llog_cat_id2handle(handle, &loghandle, lgl);
628         if (rc)
629                 GOTO(out, rc);
630         if (2 * loghandle->lgh_hdr->llh_cat_idx <=
631             handle->lgh_last_idx + handle->lgh_hdr->llh_cat_idx + 1)
632                 rc = 1;
633         else
634                 rc = 0;
635 out:
636         up_read(&handle->lgh_lock);
637         RETURN(rc);
638 }
639 EXPORT_SYMBOL(llog_cat_half_bottom);