Whamcloud - gitweb
b=3550
[fs/lustre-release.git] / lustre / lvfs / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lustre_log.h>
42 #include <portals/list.h>
43
44 /* Create a new log handle and add it to the open list.
45  * This log handle will be closed when all of the records in it are removed.
46  *
47  * Assumes caller has already pushed us into the kernel context and is locking.
48  */
49 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
50                                             struct llog_cookie *logcookie)
51 {
52         struct llog_handle *loghandle;
53         struct llog_log_hdr *llh;
54         struct llog_logid_rec rec;
55         int rc, index, bitmap_size;
56         ENTRY;
57
58         llh = cathandle->lgh_hdr;
59         bitmap_size = sizeof(llh->llh_bitmap) * 8;
60
61         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
62
63         /* maximum number of available slots in catalog is bitmap_size - 2 */
64         if (llh->llh_cat_idx == cpu_to_le32(index)) {
65                 CERROR("no free catalog slots for log...\n");
66                 RETURN(ERR_PTR(-ENOSPC));
67         } else {
68                 if (index == 0)
69                         index = 1;
70                 if (ext2_set_bit(index, llh->llh_bitmap)) {
71                         CERROR("argh, index %u already set in log bitmap?\n",
72                                index);
73                         LBUG(); /* should never happen */
74                 }
75                 cathandle->lgh_last_idx = index;
76                 llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
77                 llh->llh_tail.lrt_index = cpu_to_le32(index);
78         }
79
80         if (logcookie && llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
81                 rc = llog_open(cathandle->lgh_ctxt, &loghandle,
82                                &logcookie->lgc_lgl, NULL, OBD_LLOG_FL_CREATE);
83         else
84                 rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL,
85                                OBD_LLOG_FL_CREATE);
86         if (rc) {
87                 CERROR("cannot create new log, error = %d\n", rc);
88                 RETURN(ERR_PTR(rc));
89         }
90
91         rc = llog_init_handle(loghandle,
92                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
93                               &cathandle->lgh_hdr->llh_tgtuuid);
94         if (rc)
95                 GOTO(out_destroy, rc);
96
97         CDEBUG(D_HA, "new recovery log "LPX64":%x for index %u of catalog "
98                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
99                index, cathandle->lgh_id.lgl_oid);
100         /* build the record for this log in the catalog */
101         rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
102         rec.lid_hdr.lrh_index = cpu_to_le32(index);
103         rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
104         rec.lid_id = loghandle->lgh_id;
105         rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
106         rec.lid_tail.lrt_index = cpu_to_le32(index);
107
108         /* update the catalog: header and record */
109         rc = llog_write_rec(cathandle, &rec.lid_hdr,
110                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
111         if (rc < 0)
112                 GOTO(out_destroy, rc);
113
114         loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
115         cathandle->u.chd.chd_current_log = loghandle;
116         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
117         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
118
119  out_destroy:
120         if (rc < 0) 
121                 llog_destroy(loghandle);
122         else if (logcookie) {
123                 if (llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
124                         LASSERT(EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl));
125                 else
126                         llog_cookie_set_flags(logcookie, LLOG_COOKIE_REPLAY_NEW);
127         }
128
129         RETURN(loghandle);
130 }
131
132 /* Open an existent log handle and add it to the open list.
133  * This log handle will be closed when all of the records in it are removed.
134  *
135  * Assumes caller has already pushed us into the kernel context and is locking.
136  * We return a lock on the handle to ensure nobody yanks it from us.
137  */
138 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
139                               struct llog_logid *logid)
140 {
141         struct llog_handle *loghandle;
142         int rc = 0;
143         ENTRY;
144
145         if (cathandle == NULL)
146                 RETURN(-EBADF);
147
148         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
149                             u.phd.phd_entry) {
150                 struct llog_logid *cgl = &loghandle->lgh_id;
151                 if (cgl->lgl_oid == logid->lgl_oid) {
152                         if (cgl->lgl_ogen != logid->lgl_ogen) {
153                                 CERROR("log "LPX64" generation %x != %x\n",
154                                        logid->lgl_oid, cgl->lgl_ogen,
155                                        logid->lgl_ogen);
156                                 continue;
157                         }
158                         loghandle->u.phd.phd_cat_handle = cathandle;
159                         GOTO(out, rc = 0);
160                 }
161         }
162
163         rc = llog_open(cathandle->lgh_ctxt, &loghandle, logid, NULL, 0);
164         if (rc) {
165                 CERROR("error opening log id "LPX64":%x: rc %d\n",
166                        logid->lgl_oid, logid->lgl_ogen, rc);
167         } else {
168                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
169                 if (!rc) {
170                         list_add(&loghandle->u.phd.phd_entry,
171                                  &cathandle->u.chd.chd_head);
172                 }
173         }
174         if (!rc) {
175                 loghandle->u.phd.phd_cat_handle = cathandle;
176                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
177                 loghandle->u.phd.phd_cookie.lgc_index =
178                         le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
179         }
180
181 out:
182         *res = loghandle;
183         RETURN(rc);
184 }
185 EXPORT_SYMBOL(llog_cat_id2handle);
186
187 int llog_cat_put(struct llog_handle *cathandle)
188 {
189         struct llog_handle *loghandle, *n;
190         int rc;
191         ENTRY;
192
193         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
194                                  u.phd.phd_entry) {
195                 int err = llog_close(loghandle);
196                 if (err)
197                         CERROR("error closing loghandle\n");
198         }
199         rc = llog_close(cathandle);
200         RETURN(rc);
201 }
202 EXPORT_SYMBOL(llog_cat_put);
203
204 /* Return the currently active log handle.  If the current log handle doesn't
205  * have enough space left for the current record, start a new one.
206  *
207  * If reclen is 0, we only want to know what the currently active log is,
208  * otherwise we get a lock on this log so nobody can steal our space.
209  *
210  * Assumes caller has already pushed us into the kernel context and is locking.
211  *
212  * NOTE: loghandle is write-locked upon successful return
213  */
214 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
215                                                 int create,
216                                                 struct llog_cookie *logcookie,
217                                                 struct rw_semaphore **lock)
218 {
219         struct llog_handle *loghandle = NULL;
220         ENTRY;
221
222         down_read(&cathandle->lgh_lock);
223         loghandle = cathandle->u.chd.chd_current_log;
224         if (loghandle) {
225                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
226                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1 &&
227                     (!logcookie ||
228                      !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
229                      EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
230                         down_write(&loghandle->lgh_lock);
231                         up_read(&cathandle->lgh_lock);
232                         RETURN(loghandle);
233                 }
234         }
235
236         LASSERT(!logcookie ||
237                 !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
238                 llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW);
239
240         if (!create) {
241                 if (loghandle)
242                         down_write(&loghandle->lgh_lock);
243                 up_read(&cathandle->lgh_lock);
244                 RETURN(loghandle);
245         }
246         up_read(&cathandle->lgh_lock);
247
248         /* time to create new log */
249
250         /* first, we have to make sure the state hasn't changed */
251         down_write(&cathandle->lgh_lock);
252         loghandle = cathandle->u.chd.chd_current_log;
253         if (loghandle) {
254                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
255                 if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1 &&
256                     (!logcookie ||
257                      !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
258                      EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
259                         down_write(&loghandle->lgh_lock);
260                         up_write(&cathandle->lgh_lock);
261                         RETURN(loghandle);
262                 }
263         }
264
265         CDEBUG(D_INODE, "creating new log\n");
266         loghandle = llog_cat_new_log(cathandle, logcookie);
267         if (!IS_ERR(loghandle)) {
268                 down_write(&loghandle->lgh_lock);
269                 if (lock != NULL)
270                         *lock = &loghandle->lgh_lock;
271         }
272
273         up_write(&cathandle->lgh_lock);
274         RETURN(loghandle);
275 }
276
277 /* Add a single record to the recovery log(s) using a catalog
278  * Returns as llog_write_record
279  *
280  * Assumes caller has already pushed us into the kernel context.
281  */
282 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
283                      struct llog_cookie *reccookie, void *buf,
284                      struct rw_semaphore **lock, int *lock_count)
285 {
286         struct llog_handle *loghandle;
287         int rc;
288         ENTRY;
289
290         LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
291         loghandle = llog_cat_current_log(cathandle, 1, reccookie, lock);
292         if (IS_ERR(loghandle))
293                 RETURN(PTR_ERR(loghandle));
294         /* loghandle is already locked by llog_cat_current_log() for us */
295         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
296         if (!lock || *lock == NULL) {
297                 up_write(&loghandle->lgh_lock);
298         } else {
299                 LASSERT(lock_count != NULL);
300                 *lock_count += 1;
301         }
302
303         RETURN(rc);
304 }
305 EXPORT_SYMBOL(llog_cat_add_rec);
306
307 /* For each cookie in the cookie array, we clear the log in-use bit and either:
308  * - the log is empty, so mark it free in the catalog header and delete it
309  * - the log is not empty, just write out the log header
310  *
311  * The cookies may be in different log files, so we need to get new logs
312  * each time.
313  *
314  * Assumes caller has already pushed us into the kernel context.
315  */
316 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
317                             struct llog_cookie *cookies)
318 {
319         int i, index, rc = 0;
320         ENTRY;
321
322         down_write(&cathandle->lgh_lock);
323         for (i = 0; i < count; i++, cookies++) {
324                 struct llog_handle *loghandle;
325                 struct llog_logid *lgl = &cookies->lgc_lgl;
326
327                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
328                 if (rc) {
329                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
330                         break;
331                 }
332
333                 down_write(&loghandle->lgh_lock);
334                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
335                 up_write(&loghandle->lgh_lock);
336
337                 if (rc == 1) {          /* log has been destroyed */
338                         index = loghandle->u.phd.phd_cookie.lgc_index;
339                         if (cathandle->u.chd.chd_current_log == loghandle)
340                                 cathandle->u.chd.chd_current_log = NULL;
341                         llog_free_handle(loghandle);
342
343                         LASSERT(index);
344                         llog_cat_set_first_idx(cathandle, index);
345                         rc = llog_cancel_rec(cathandle, index);
346                         if (rc == 0)
347                                 CDEBUG(D_HA, "cancel plain log at index %u "
348                                        "of catalog "LPX64"\n",
349                                        index, cathandle->lgh_id.lgl_oid);
350                 }
351         }
352         up_write(&cathandle->lgh_lock);
353
354         RETURN(rc);
355 }
356 EXPORT_SYMBOL(llog_cat_cancel_records);
357
358 static int llog_cat_process_cb(struct llog_handle *cat_llh, 
359                                struct llog_rec_hdr *rec, void *data)
360 {
361         struct llog_process_data *d = data;
362         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
363         struct llog_handle *llh;
364         int rc;
365
366         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
367                 CERROR("invalid record in catalog\n");
368                 RETURN(-EINVAL);
369         }
370         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
371                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
372                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
373
374         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
375         if (rc) {
376                 CERROR("Cannot find handle for log "LPX64"\n",
377                        lir->lid_id.lgl_oid);
378                 RETURN(rc);
379         }
380
381         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
382         RETURN(rc);
383 }
384
385 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
386 {
387         struct llog_process_data d;
388         struct llog_process_cat_data cd;
389         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
390         int rc;
391         ENTRY;
392
393         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
394         d.lpd_data = data;
395         d.lpd_cb = cb;
396
397         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
398                 CWARN("catalog "LPX64" crosses index zero\n",
399                       cat_llh->lgh_id.lgl_oid);
400
401                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
402                 cd.last_idx = 0;
403                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
404                 if (rc != 0)
405                         RETURN(rc);
406
407                 cd.first_idx = 0;
408                 cd.last_idx = cat_llh->lgh_last_idx;
409                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
410         } else {
411                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
412         }
413
414         RETURN(rc);
415 }
416 EXPORT_SYMBOL(llog_cat_process);
417
418 static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, 
419                                        struct llog_rec_hdr *rec, void *data)
420 {
421         struct llog_process_data *d = data;
422         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
423         struct llog_handle *llh;
424         int rc;
425
426         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
427                 CERROR("invalid record in catalog\n");
428                 RETURN(-EINVAL);
429         }
430         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
431                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
432                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
433
434         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
435         if (rc) {
436                 CERROR("Cannot find handle for log "LPX64"\n",
437                        lir->lid_id.lgl_oid);
438                 RETURN(rc);
439         }
440
441         rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
442         RETURN(rc);
443 }
444
445 int llog_cat_reverse_process(struct llog_handle *cat_llh,
446                              llog_cb_t cb, void *data)
447 {
448         struct llog_process_data d;
449         struct llog_process_cat_data cd;
450         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
451         int rc;
452         ENTRY;
453
454         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
455         d.lpd_data = data;
456         d.lpd_cb = cb;
457
458         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
459                 CWARN("catalog "LPX64" crosses index zero\n",
460                       cat_llh->lgh_id.lgl_oid);
461
462                 cd.first_idx = 0;
463                 cd.last_idx = cat_llh->lgh_last_idx;
464                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
465                                           &d, &cd);
466                 if (rc != 0)
467                         RETURN(rc);
468
469                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
470                 cd.last_idx = 0;
471                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
472                                           &d, &cd);
473         } else {
474                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
475                                           &d, NULL);
476         }
477
478         RETURN(rc);
479 }
480 EXPORT_SYMBOL(llog_cat_reverse_process);
481
482 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
483 {
484         struct llog_log_hdr *llh = cathandle->lgh_hdr;
485         int i, bitmap_size, idx;
486         ENTRY;
487
488         bitmap_size = sizeof(llh->llh_bitmap) * 8;
489         if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
490                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
491                 llh->llh_cat_idx = cpu_to_le32(idx);
492                 if (idx == cathandle->lgh_last_idx)
493                         goto out;
494                 for (i = (index + 1) % bitmap_size;
495                      i != cathandle->lgh_last_idx;
496                      i = (i + 1) % bitmap_size) {
497                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
498                                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
499                                 llh->llh_cat_idx = cpu_to_le32(idx);
500                         } else if (i == 0) {
501                                 llh->llh_cat_idx = 0;
502                         } else {
503                                 break;
504                         }
505                 }
506 out:
507                 CDEBUG(D_HA, "set catalog "LPX64" first idx %u\n",
508                        cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
509         }
510
511         RETURN(0);
512 }
513 EXPORT_SYMBOL(llog_cat_set_first_idx);
514
515 int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
516                      void *buf, struct llog_cookie *logcookies, 
517                      int numcookies, void *data, 
518                      struct rw_semaphore **lock, int *lock_count)
519 {
520         struct llog_handle *cathandle;
521         int rc;
522         ENTRY;
523         
524         cathandle = ctxt->loc_handle;
525         LASSERT(cathandle != NULL);
526         
527         rc = llog_cat_add_rec(cathandle, rec, logcookies, buf, lock, lock_count);
528         if (rc != 1)
529                 CERROR("write one catalog record failed: %d\n", rc);
530         RETURN(rc);
531 }
532 EXPORT_SYMBOL(llog_catalog_add);
533
534 int llog_catalog_cancel(struct llog_ctxt *ctxt, int count,
535                         struct llog_cookie *cookies, int flags, void *data)
536 {
537         struct llog_handle *cathandle;
538         int rc;
539         ENTRY;
540
541         if (cookies == NULL || count == 0)
542                 RETURN(-EINVAL);
543         cathandle = ctxt->loc_handle;
544         LASSERT(cathandle != NULL);
545         rc = llog_cat_cancel_records(cathandle, count, cookies);
546         RETURN(rc);
547 }
548 EXPORT_SYMBOL(llog_catalog_cancel);
549
550 int llog_catalog_setup(struct llog_ctxt **res, char *name,
551                        struct obd_export *exp, 
552                        struct lvfs_run_ctxt *lvfs_ctxt,
553                        struct fsfilt_operations *fsops,
554                        struct dentry *logs_de, 
555                        struct dentry *objects_de)
556 {
557         struct llog_ctxt *ctxt;
558         struct llog_catid catid;
559         struct llog_handle *handle;
560         int rc;
561         
562         ENTRY;
563
564         OBD_ALLOC(ctxt, sizeof(*ctxt));
565         if (!ctxt)
566                 RETURN(-ENOMEM);
567
568         *res = ctxt;
569
570         /* marking this ctxt alone. */
571         ctxt->loc_alone = 1;
572         ctxt->loc_fsops = fsops;
573         ctxt->loc_lvfs_ctxt = lvfs_ctxt;
574         ctxt->loc_exp = exp;
575         ctxt->loc_logs_dir = logs_de;
576         ctxt->loc_objects_dir = objects_de;
577         ctxt->loc_logops = &llog_lvfs_ops; 
578         ctxt->loc_logops->lop_add = llog_catalog_add;
579         ctxt->loc_logops->lop_cancel = llog_catalog_cancel;
580
581         memset(&catid, 0, sizeof(struct llog_catid));
582         rc = llog_get_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
583         if (rc) {
584                 CERROR("error llog_get_cat_list rc: %d\n", rc);
585                 RETURN(rc);
586         }
587         if (catid.lci_logid.lgl_oid)
588                 rc = llog_open(ctxt, &handle, &catid.lci_logid, NULL,
589                                OBD_LLOG_FL_CREATE);
590         else {
591                 rc = llog_open(ctxt, &handle, NULL, NULL, OBD_LLOG_FL_CREATE);
592                 if (!rc)
593                         catid.lci_logid = handle->lgh_id;
594         }
595         if (rc)
596                 GOTO(out, rc);
597
598         ctxt->loc_handle = handle;
599         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
600         if (rc)
601                 GOTO(out, rc);
602
603         rc = llog_put_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
604         if (rc)
605                 CERROR("error llog_get_cat_list rc: %d\n", rc);
606 out:
607         if (ctxt && rc)
608                 OBD_FREE(ctxt, sizeof(*ctxt));
609         RETURN(rc);
610 }
611 EXPORT_SYMBOL(llog_catalog_setup);
612
613 int llog_catalog_cleanup(struct llog_ctxt *ctxt)
614 {
615         struct llog_handle *cathandle;
616         ENTRY;
617
618         if (!ctxt)
619                 return 0;
620
621         cathandle = ctxt->loc_handle;
622         if (cathandle)
623                 llog_cat_put(ctxt->loc_handle);
624  
625 //        OBD_FREE(ctxt, sizeof(*ctxt));
626         return 0;
627 }
628 EXPORT_SYMBOL(llog_catalog_cleanup);
629
630 int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle)
631 {
632         struct llog_handle *loghandle;
633         struct llog_logid *lgl = &cookie->lgc_lgl;
634         int rc;
635
636         down_read(&handle->lgh_lock);
637         rc = llog_cat_id2handle(handle, &loghandle, lgl);
638         if (rc)
639                 GOTO(out, rc);
640         if (2 * loghandle->lgh_hdr->llh_cat_idx <=
641             handle->lgh_last_idx + handle->lgh_hdr->llh_cat_idx + 1)
642                 rc = 1;
643         else
644                 rc = 0;
645 out:
646         up_read(&handle->lgh_lock);
647         RETURN(rc);
648 }
649 EXPORT_SYMBOL(llog_cat_half_bottom);