Whamcloud - gitweb
landing smfs.
[fs/lustre-release.git] / lustre / lvfs / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lustre_log.h>
42 #include <portals/list.h>
43
44 /* Create a new log handle and add it to the open list.
45  * This log handle will be closed when all of the records in it are removed.
46  *
47  * Assumes caller has already pushed us into the kernel context and is locking.
48  */
49 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
50 {
51         struct llog_handle *loghandle;
52         struct llog_log_hdr *llh;
53         struct llog_logid_rec rec;
54         int rc, index, bitmap_size;
55         ENTRY;
56
57         llh = cathandle->lgh_hdr;
58         bitmap_size = sizeof(llh->llh_bitmap) * 8;
59
60         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
61
62         /* maximum number of available slots in catalog is bitmap_size - 2 */
63         if (llh->llh_cat_idx == cpu_to_le32(index)) {
64                 CERROR("no free catalog slots for log...\n");
65                 RETURN(ERR_PTR(-ENOSPC));
66         } else {
67                 if (index == 0)
68                         index = 1;
69                 if (ext2_set_bit(index, llh->llh_bitmap)) {
70                         CERROR("argh, index %u already set in log bitmap?\n",
71                                index);
72                         LBUG(); /* should never happen */
73                 }
74                 cathandle->lgh_last_idx = index;
75                 llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
76                 llh->llh_tail.lrt_index = cpu_to_le32(index);
77         }
78
79         rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
80         if (rc)
81                 RETURN(ERR_PTR(rc));
82
83         rc = llog_init_handle(loghandle,
84                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
85                               &cathandle->lgh_hdr->llh_tgtuuid);
86         if (rc)
87                 GOTO(out_destroy, rc);
88
89         CDEBUG(D_HA, "new recovery log "LPX64":%x for index %u of catalog "
90                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
91                index, cathandle->lgh_id.lgl_oid);
92         /* build the record for this log in the catalog */
93         rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
94         rec.lid_hdr.lrh_index = cpu_to_le32(index);
95         rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
96         rec.lid_id = loghandle->lgh_id;
97         rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
98         rec.lid_tail.lrt_index = cpu_to_le32(index);
99
100         /* update the catalog: header and record */
101         rc = llog_write_rec(cathandle, &rec.lid_hdr,
102                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
103         if (rc < 0) {
104                 GOTO(out_destroy, rc);
105         }
106
107         loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
108         cathandle->u.chd.chd_current_log = loghandle;
109         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
110         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
111
112  out_destroy:
113         if (rc < 0)
114                 llog_destroy(loghandle);
115
116         RETURN(loghandle);
117 }
118
119 /* Open an existent log handle and add it to the open list.
120  * This log handle will be closed when all of the records in it are removed.
121  *
122  * Assumes caller has already pushed us into the kernel context and is locking.
123  * We return a lock on the handle to ensure nobody yanks it from us.
124  */
125 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
126                               struct llog_logid *logid)
127 {
128         struct llog_handle *loghandle;
129         int rc = 0;
130         ENTRY;
131
132         if (cathandle == NULL)
133                 RETURN(-EBADF);
134
135         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
136                             u.phd.phd_entry) {
137                 struct llog_logid *cgl = &loghandle->lgh_id;
138                 if (cgl->lgl_oid == logid->lgl_oid) {
139                         if (cgl->lgl_ogen != logid->lgl_ogen) {
140                                 CERROR("log "LPX64" generation %x != %x\n",
141                                        logid->lgl_oid, cgl->lgl_ogen,
142                                        logid->lgl_ogen);
143                                 continue;
144                         }
145                         loghandle->u.phd.phd_cat_handle = cathandle;
146                         GOTO(out, rc = 0);
147                 }
148         }
149
150         rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
151         if (rc) {
152                 CERROR("error opening log id "LPX64":%x: rc %d\n",
153                        logid->lgl_oid, logid->lgl_ogen, rc);
154         } else {
155                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
156                 if (!rc) {
157                         list_add(&loghandle->u.phd.phd_entry,
158                                  &cathandle->u.chd.chd_head);
159                 }
160         }
161         if (!rc) {
162                 loghandle->u.phd.phd_cat_handle = cathandle;
163                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
164                 loghandle->u.phd.phd_cookie.lgc_index =
165                         le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
166         }
167
168 out:
169         *res = loghandle;
170         RETURN(rc);
171 }
172 EXPORT_SYMBOL(llog_cat_id2handle);
173
174 int llog_cat_put(struct llog_handle *cathandle)
175 {
176         struct llog_handle *loghandle, *n;
177         int rc;
178         ENTRY;
179
180         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
181                                  u.phd.phd_entry) {
182                 int err = llog_close(loghandle);
183                 if (err)
184                         CERROR("error closing loghandle\n");
185         }
186         rc = llog_close(cathandle);
187         RETURN(rc);
188 }
189 EXPORT_SYMBOL(llog_cat_put);
190
191 /* Return the currently active log handle.  If the current log handle doesn't
192  * have enough space left for the current record, start a new one.
193  *
194  * If reclen is 0, we only want to know what the currently active log is,
195  * otherwise we get a lock on this log so nobody can steal our space.
196  *
197  * Assumes caller has already pushed us into the kernel context and is locking.
198  *
199  * NOTE: loghandle is write-locked upon successful return
200  */
201 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
202                                                 int create)
203 {
204         struct llog_handle *loghandle = NULL;
205         ENTRY;
206
207         down_read(&cathandle->lgh_lock);
208         loghandle = cathandle->u.chd.chd_current_log;
209         if (loghandle) {
210                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
211                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
212                         down_write(&loghandle->lgh_lock);
213                         up_read(&cathandle->lgh_lock);
214                         RETURN(loghandle);
215                 }
216         }
217         if (!create) {
218                 if (loghandle)
219                         down_write(&loghandle->lgh_lock);
220                 up_read(&cathandle->lgh_lock);
221                 RETURN(loghandle);
222         }
223         up_read(&cathandle->lgh_lock);
224
225         /* time to create new log */
226
227         /* first, we have to make sure the state hasn't changed */
228         down_write(&cathandle->lgh_lock);
229         loghandle = cathandle->u.chd.chd_current_log;
230         if (loghandle) {
231                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
232                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
233                         down_write(&loghandle->lgh_lock);
234                         up_write(&cathandle->lgh_lock);
235                         RETURN(loghandle);
236                 }
237         }
238
239         CDEBUG(D_INODE, "creating new log\n");
240         loghandle = llog_cat_new_log(cathandle);
241         if (!IS_ERR(loghandle))
242                 down_write(&loghandle->lgh_lock);
243         up_write(&cathandle->lgh_lock);
244         RETURN(loghandle);
245 }
246
247 /* Add a single record to the recovery log(s) using a catalog
248  * Returns as llog_write_record
249  *
250  * Assumes caller has already pushed us into the kernel context.
251  */
252 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
253                      struct llog_cookie *reccookie, void *buf)
254 {
255         struct llog_handle *loghandle;
256         int rc;
257         ENTRY;
258
259         LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
260         loghandle = llog_cat_current_log(cathandle, 1);
261         if (IS_ERR(loghandle))
262                 RETURN(PTR_ERR(loghandle));
263         /* loghandle is already locked by llog_cat_current_log() for us */
264         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
265         up_write(&loghandle->lgh_lock);
266
267         RETURN(rc);
268 }
269 EXPORT_SYMBOL(llog_cat_add_rec);
270
271 /* For each cookie in the cookie array, we clear the log in-use bit and either:
272  * - the log is empty, so mark it free in the catalog header and delete it
273  * - the log is not empty, just write out the log header
274  *
275  * The cookies may be in different log files, so we need to get new logs
276  * each time.
277  *
278  * Assumes caller has already pushed us into the kernel context.
279  */
280 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
281                             struct llog_cookie *cookies)
282 {
283         int i, index, rc = 0;
284         ENTRY;
285
286         down_write(&cathandle->lgh_lock);
287         for (i = 0; i < count; i++, cookies++) {
288                 struct llog_handle *loghandle;
289                 struct llog_logid *lgl = &cookies->lgc_lgl;
290
291                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
292                 if (rc) {
293                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
294                         break;
295                 }
296
297                 down_write(&loghandle->lgh_lock);
298                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
299                 up_write(&loghandle->lgh_lock);
300
301                 if (rc == 1) {          /* log has been destroyed */
302                         index = loghandle->u.phd.phd_cookie.lgc_index;
303                         if (cathandle->u.chd.chd_current_log == loghandle)
304                                 cathandle->u.chd.chd_current_log = NULL;
305                         llog_free_handle(loghandle);
306
307                         LASSERT(index);
308                         llog_cat_set_first_idx(cathandle, index);
309                         rc = llog_cancel_rec(cathandle, index);
310                         if (rc == 0)
311                                 CDEBUG(D_HA, "cancel plain log at index %u "
312                                        "of catalog "LPX64"\n",
313                                        index, cathandle->lgh_id.lgl_oid);
314                 }
315         }
316         up_write(&cathandle->lgh_lock);
317
318         RETURN(rc);
319 }
320 EXPORT_SYMBOL(llog_cat_cancel_records);
321
322 static int llog_cat_process_cb(struct llog_handle *cat_llh, 
323                                struct llog_rec_hdr *rec, void *data)
324 {
325         struct llog_process_data *d = data;
326         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
327         struct llog_handle *llh;
328         int rc;
329
330         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
331                 CERROR("invalid record in catalog\n");
332                 RETURN(-EINVAL);
333         }
334         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
335                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
336                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
337
338         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
339         if (rc) {
340                 CERROR("Cannot find handle for log "LPX64"\n",
341                        lir->lid_id.lgl_oid);
342                 RETURN(rc);
343         }
344
345         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
346         RETURN(rc);
347 }
348
349 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
350 {
351         struct llog_process_data d;
352         struct llog_process_cat_data cd;
353         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
354         int rc;
355         ENTRY;
356
357         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
358         d.lpd_data = data;
359         d.lpd_cb = cb;
360
361         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
362                 CWARN("catalog "LPX64" crosses index zero\n",
363                       cat_llh->lgh_id.lgl_oid);
364
365                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
366                 cd.last_idx = 0;
367                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
368                 if (rc != 0)
369                         RETURN(rc);
370
371                 cd.first_idx = 0;
372                 cd.last_idx = cat_llh->lgh_last_idx;
373                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
374         } else {
375                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
376         }
377
378         RETURN(rc);
379 }
380 EXPORT_SYMBOL(llog_cat_process);
381
382 static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, 
383                                        struct llog_rec_hdr *rec, void *data)
384 {
385         struct llog_process_data *d = data;
386         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
387         struct llog_handle *llh;
388         int rc;
389
390         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
391                 CERROR("invalid record in catalog\n");
392                 RETURN(-EINVAL);
393         }
394         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
395                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
396                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
397
398         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
399         if (rc) {
400                 CERROR("Cannot find handle for log "LPX64"\n",
401                        lir->lid_id.lgl_oid);
402                 RETURN(rc);
403         }
404
405         rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
406         RETURN(rc);
407 }
408
409 int llog_cat_reverse_process(struct llog_handle *cat_llh,
410                              llog_cb_t cb, void *data)
411 {
412         struct llog_process_data d;
413         struct llog_process_cat_data cd;
414         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
415         int rc;
416         ENTRY;
417
418         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
419         d.lpd_data = data;
420         d.lpd_cb = cb;
421
422         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
423                 CWARN("catalog "LPX64" crosses index zero\n",
424                       cat_llh->lgh_id.lgl_oid);
425
426                 cd.first_idx = 0;
427                 cd.last_idx = cat_llh->lgh_last_idx;
428                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
429                                           &d, &cd);
430                 if (rc != 0)
431                         RETURN(rc);
432
433                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
434                 cd.last_idx = 0;
435                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
436                                           &d, &cd);
437         } else {
438                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
439                                           &d, NULL);
440         }
441
442         RETURN(rc);
443 }
444 EXPORT_SYMBOL(llog_cat_reverse_process);
445
446 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
447 {
448         struct llog_log_hdr *llh = cathandle->lgh_hdr;
449         int i, bitmap_size, idx;
450         ENTRY;
451
452         bitmap_size = sizeof(llh->llh_bitmap) * 8;
453         if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
454                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
455                 llh->llh_cat_idx = cpu_to_le32(idx);
456                 if (idx == cathandle->lgh_last_idx)
457                         goto out;
458                 for (i = (index + 1) % bitmap_size;
459                      i != cathandle->lgh_last_idx;
460                      i = (i + 1) % bitmap_size) {
461                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
462                                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
463                                 llh->llh_cat_idx = cpu_to_le32(idx);
464                         } else if (i == 0) {
465                                 llh->llh_cat_idx = 0;
466                         } else {
467                                 break;
468                         }
469                 }
470 out:
471                 CDEBUG(D_HA, "set catalog "LPX64" first idx %u\n",
472                        cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
473         }
474
475         RETURN(0);
476 }
477 EXPORT_SYMBOL(llog_cat_set_first_idx);
478
479 int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
480                      void *buf, struct llog_cookie *logcookies, 
481                      int numcookies, void *data)
482 {
483         struct llog_handle *cathandle;
484         int rc;
485         ENTRY;
486         
487         cathandle = ctxt->loc_handle;
488         LASSERT(cathandle != NULL);
489         
490         rc = llog_cat_add_rec(cathandle, rec, logcookies, buf);
491         if (rc != 1)
492                 CERROR("write one catalog record failed: %d\n", rc);
493         RETURN(rc);
494 }
495 EXPORT_SYMBOL(llog_catalog_add);
496
497 int llog_catalog_cancel(struct llog_ctxt *ctxt, int count,
498                         struct llog_cookie *cookies, int flags, void *data)
499 {
500         struct llog_handle *cathandle;
501         int rc;
502         ENTRY;
503
504         if (cookies == NULL || count == 0)
505                 RETURN(-EINVAL);
506         cathandle = ctxt->loc_handle;
507         LASSERT(cathandle != NULL);
508         rc = llog_cat_cancel_records(cathandle, count, cookies);
509         RETURN(rc);
510 }
511 EXPORT_SYMBOL(llog_catalog_cancel);
512
513 int llog_catalog_setup(struct llog_ctxt **res, char *name,
514                        struct lvfs_run_ctxt *lvfs_ctxt,
515                        struct fsfilt_operations *fsops,
516                        struct dentry *logs_de, 
517                        struct dentry *objects_de)
518 {
519         struct llog_ctxt *ctxt;
520         struct llog_catid catid;
521         struct llog_handle *handle;
522         int rc;
523         
524         ENTRY;
525
526         OBD_ALLOC(ctxt, sizeof(*ctxt));
527         if (!ctxt)
528                 RETURN(-ENOMEM);
529
530         *res = ctxt;
531
532         ctxt->loc_fsops = fsops;
533         ctxt->loc_lvfs_ctxt = lvfs_ctxt;
534         ctxt->loc_logs_dir = logs_de;
535         ctxt->loc_objects_dir = objects_de;
536         ctxt->loc_logops = &llog_lvfs_ops; 
537         ctxt->loc_logops->lop_add = llog_catalog_add;
538         ctxt->loc_logops->lop_cancel = llog_catalog_cancel;
539
540         memset(&catid, 0, sizeof(struct llog_catid));
541         rc = llog_get_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
542         if (rc) {
543                 CERROR("error llog_get_cat_list rc: %d\n", rc);
544                 RETURN(rc);
545         }
546         if (catid.lci_logid.lgl_oid)
547                 rc = llog_create(ctxt, &handle, &catid.lci_logid, 0);
548         else {
549                 rc = llog_create(ctxt, &handle, NULL, NULL);
550                 if (!rc)
551                         catid.lci_logid = handle->lgh_id;
552         }
553         if (rc)
554                 GOTO(out, rc);
555
556         ctxt->loc_handle = handle;
557         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
558         if (rc)
559                 GOTO(out, rc);
560
561         rc = llog_put_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
562         if (rc)
563                 CERROR("error llog_get_cat_list rc: %d\n", rc);
564 out:
565         if (ctxt && rc)
566                 OBD_FREE(ctxt, sizeof(*ctxt));
567         RETURN(rc);
568 }
569 EXPORT_SYMBOL(llog_catalog_setup);
570
571 int llog_catalog_cleanup(struct llog_ctxt *ctxt)
572 {
573         struct llog_handle *cathandle, *n, *loghandle;
574         struct llog_log_hdr *llh;
575         int rc, index;
576         ENTRY;
577                                                                                                                              
578         if (!ctxt)
579                 return 0;
580
581         cathandle = ctxt->loc_handle;
582         if (cathandle) {
583                 list_for_each_entry_safe(loghandle, n,
584                                          &cathandle->u.chd.chd_head,
585                                          u.phd.phd_entry) {
586                         llh = loghandle->lgh_hdr;
587                         if ((le32_to_cpu(llh->llh_flags) &
588                                 LLOG_F_ZAP_WHEN_EMPTY) &&
589                             (le32_to_cpu(llh->llh_count) == 1)) {
590                                 rc = llog_destroy(loghandle);
591                                 if (rc)
592                                         CERROR("failure destroying log during "
593                                                "cleanup: %d\n", rc);
594                                 LASSERT(rc == 0);
595                                 index = loghandle->u.phd.phd_cookie.lgc_index;
596                                 llog_free_handle(loghandle);
597                                                                                                                              
598                                 LASSERT(index);
599                                 llog_cat_set_first_idx(cathandle, index);
600                                 rc = llog_cancel_rec(cathandle, index);
601                                 if (rc == 0)
602                                         CDEBUG(D_HA, "cancel plain log at index"
603                                                " %u of catalog "LPX64"\n",
604                                                index,cathandle->lgh_id.lgl_oid);
605                         }
606                 }
607                 llog_cat_put(ctxt->loc_handle);
608         }
609         OBD_FREE(ctxt, sizeof(*ctxt));
610         return 0;
611 }
612 EXPORT_SYMBOL(llog_catalog_cleanup);
613
614 int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle)
615 {
616         struct llog_handle *loghandle;
617         struct llog_logid *lgl = &cookie->lgc_lgl;
618         int rc;
619                                                                                                                              
620         down_read(&handle->lgh_lock);
621         rc = llog_cat_id2handle(handle, &loghandle, lgl);
622         if (rc)
623                 GOTO(out, rc);
624         if (2 * loghandle->lgh_hdr->llh_cat_idx <=
625             handle->lgh_last_idx + handle->lgh_hdr->llh_cat_idx + 1)
626                 rc = 1;
627         else
628                 rc = 0;
629 out:
630         up_read(&handle->lgh_lock);
631         RETURN(rc);
632 }
633 EXPORT_SYMBOL(llog_cat_half_bottom);