Whamcloud - gitweb
Allow GSS password to be passed to the test-framework in $GSS_PASS.
[fs/lustre-release.git] / lustre / lvfs / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lustre_log.h>
42 #include <libcfs/list.h>
43
44 /* Create a new log handle and add it to the open list.
45  * This log handle will be closed when all of the records in it are removed.
46  *
47  * Assumes caller has already pushed us into the kernel context and is locking.
48  */
49 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
50                                             struct llog_cookie *logcookie)
51 {
52         struct llog_handle *loghandle;
53         struct llog_log_hdr *llh;
54         struct llog_logid_rec rec;
55         int rc, index, bitmap_size;
56         ENTRY;
57
58         llh = cathandle->lgh_hdr;
59         bitmap_size = LLOG_BITMAP_SIZE(llh);
60
61         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
62
63         /* maximum number of available slots in catalog is bitmap_size - 2 */
64         if (llh->llh_cat_idx == cpu_to_le32(index)) {
65                 CERROR("no free catalog slots for log...\n");
66                 RETURN(ERR_PTR(-ENOSPC));
67         } else {
68                 if (index == 0)
69                         index = 1;
70                 if (ext2_set_bit(index, llh->llh_bitmap)) {
71                         CERROR("argh, index %u already set in log bitmap?\n",
72                                index);
73                         LBUG(); /* should never happen */
74                 }
75                 cathandle->lgh_last_idx = index;
76                 llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
77                 llh->llh_tail.lrt_index = cpu_to_le32(index);
78         }
79
80         if (logcookie && llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
81                 rc = llog_open(cathandle->lgh_ctxt, &loghandle,
82                                &logcookie->lgc_lgl, NULL, OBD_LLOG_FL_CREATE);
83         else
84                 rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL,
85                                OBD_LLOG_FL_CREATE);
86         if (rc) {
87                 CERROR("cannot create new log, error = %d\n", rc);
88                 RETURN(ERR_PTR(rc));
89         }
90
91         rc = llog_init_handle(loghandle,
92                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
93                               &cathandle->lgh_hdr->llh_tgtuuid);
94         if (rc)
95                 GOTO(out_destroy, rc);
96
97         CDEBUG(D_HA, "new recovery log "LPX64":%x for index %u of catalog "
98                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
99                index, cathandle->lgh_id.lgl_oid);
100         /* build the record for this log in the catalog */
101         rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
102         rec.lid_hdr.lrh_index = cpu_to_le32(index);
103         rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
104         rec.lid_id = loghandle->lgh_id;
105         rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
106         rec.lid_tail.lrt_index = cpu_to_le32(index);
107
108         /* update the catalog: header and record */
109         rc = llog_write_rec(cathandle, &rec.lid_hdr,
110                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
111         if (rc < 0)
112                 GOTO(out_destroy, rc);
113
114         loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
115         cathandle->u.chd.chd_current_log = loghandle;
116         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
117         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
118
119  out_destroy:
120         if (rc < 0) 
121                 llog_destroy(loghandle);
122         else if (logcookie) {
123                 if (llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
124                         LASSERT(EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl));
125                 else
126                         llog_cookie_set_flags(logcookie, LLOG_COOKIE_REPLAY_NEW);
127         }
128
129         RETURN(loghandle);
130 }
131
132 /* Open an existent log handle and add it to the open list.
133  * This log handle will be closed when all of the records in it are removed.
134  *
135  * Assumes caller has already pushed us into the kernel context and is locking.
136  * We return a lock on the handle to ensure nobody yanks it from us.
137  */
138 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
139                               struct llog_logid *logid)
140 {
141         struct llog_handle *loghandle;
142         int rc = 0;
143         ENTRY;
144
145         if (cathandle == NULL)
146                 RETURN(-EBADF);
147
148         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
149                             u.phd.phd_entry) {
150                 struct llog_logid *cgl = &loghandle->lgh_id;
151                 if (cgl->lgl_oid == logid->lgl_oid) {
152                         if (cgl->lgl_ogen != logid->lgl_ogen) {
153                                 CERROR("log "LPX64" generation %x != %x\n",
154                                        logid->lgl_oid, cgl->lgl_ogen,
155                                        logid->lgl_ogen);
156                                 continue;
157                         }
158                         loghandle->u.phd.phd_cat_handle = cathandle;
159                         GOTO(out, rc = 0);
160                 }
161         }
162
163         rc = llog_open(cathandle->lgh_ctxt, &loghandle, logid, NULL, 0);
164         if (rc) {
165                 CERROR("error opening log id "LPX64":%x: rc %d\n",
166                        logid->lgl_oid, logid->lgl_ogen, rc);
167         } else {
168                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
169                 if (!rc) {
170                         list_add(&loghandle->u.phd.phd_entry,
171                                  &cathandle->u.chd.chd_head);
172                 }
173         }
174         if (!rc) {
175                 loghandle->u.phd.phd_cat_handle = cathandle;
176                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
177                 loghandle->u.phd.phd_cookie.lgc_index =
178                         le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
179         }
180
181 out:
182         *res = loghandle;
183         RETURN(rc);
184 }
185 EXPORT_SYMBOL(llog_cat_id2handle);
186
187 int llog_cat_put(struct llog_handle *cathandle)
188 {
189         struct llog_handle *loghandle, *n;
190         int rc;
191         ENTRY;
192
193         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
194                                  u.phd.phd_entry) {
195                 int err = llog_close(loghandle);
196                 if (err)
197                         CERROR("error closing loghandle\n");
198         }
199         rc = llog_close(cathandle);
200         RETURN(rc);
201 }
202 EXPORT_SYMBOL(llog_cat_put);
203
204 /* Return the currently active log handle.  If the current log handle doesn't
205  * have enough space left for the current record, start a new one.
206  *
207  * If reclen is 0, we only want to know what the currently active log is,
208  * otherwise we get a lock on this log so nobody can steal our space.
209  *
210  * Assumes caller has already pushed us into the kernel context and is locking.
211  *
212  * NOTE: loghandle is write-locked upon successful return
213  */
214 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
215                                                 int create,
216                                                 struct llog_cookie *logcookie,
217                                                 struct rw_semaphore **lock)
218 {
219         struct llog_handle *loghandle = NULL;
220         ENTRY;
221
222         down_read(&cathandle->lgh_lock);
223         loghandle = cathandle->u.chd.chd_current_log;
224         if (loghandle) {
225                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
226                 down_write(&loghandle->lgh_lock);
227                 if (loghandle->lgh_last_idx < (LLOG_BITMAP_SIZE(llh) - 1) &&
228                     (!logcookie ||
229                      !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
230                      EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
231                         up_read(&cathandle->lgh_lock);
232                         RETURN(loghandle);
233                 } else {
234                         up_write(&loghandle->lgh_lock);
235                 }
236         }
237
238         LASSERT(!logcookie ||
239                 !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
240                 llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW);
241
242         if (!create) {
243                 if (loghandle)
244                         down_write(&loghandle->lgh_lock);
245                 up_read(&cathandle->lgh_lock);
246                 RETURN(loghandle);
247         }
248         up_read(&cathandle->lgh_lock);
249
250         /* time to create new log */
251
252         /* first, we have to make sure the state hasn't changed */
253         down_write(&cathandle->lgh_lock);
254         loghandle = cathandle->u.chd.chd_current_log;
255         if (loghandle) {
256                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
257                 down_write(&loghandle->lgh_lock);
258                 if (loghandle->lgh_last_idx < (LLOG_BITMAP_SIZE(llh) - 1) &&
259                     (!logcookie ||
260                      !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
261                      EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
262                         up_write(&cathandle->lgh_lock);
263                         RETURN(loghandle);
264                 } else {
265                         up_write(&loghandle->lgh_lock);
266                 }
267         }
268
269         CDEBUG(D_INODE, "creating new log\n");
270         loghandle = llog_cat_new_log(cathandle, logcookie);
271         if (!IS_ERR(loghandle)) {
272                 down_write(&loghandle->lgh_lock);
273                 if (lock != NULL)
274                         *lock = &loghandle->lgh_lock;
275         }
276
277         up_write(&cathandle->lgh_lock);
278         RETURN(loghandle);
279 }
280
281 /* Add a single record to the recovery log(s) using a catalog
282  * Returns as llog_write_record
283  *
284  * Assumes caller has already pushed us into the kernel context.
285  */
286 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
287                      struct llog_cookie *reccookie, void *buf,
288                      struct rw_semaphore **lock, int *lock_count)
289 {
290         struct llog_handle *loghandle;
291         int rc;
292         ENTRY;
293
294         LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
295         loghandle = llog_cat_current_log(cathandle, 1, reccookie, lock);
296         if (IS_ERR(loghandle))
297                 RETURN(PTR_ERR(loghandle));
298         /* loghandle is already locked by llog_cat_current_log() for us */
299         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
300         if (!lock || *lock == NULL) {
301                 up_write(&loghandle->lgh_lock);
302         } else {
303                 LASSERT(lock_count != NULL);
304                 *lock_count += 1;
305         }
306
307         RETURN(rc);
308 }
309 EXPORT_SYMBOL(llog_cat_add_rec);
310
311 /* For each cookie in the cookie array, we clear the log in-use bit and either:
312  * - the log is empty, so mark it free in the catalog header and delete it
313  * - the log is not empty, just write out the log header
314  *
315  * The cookies may be in different log files, so we need to get new logs
316  * each time.
317  *
318  * Assumes caller has already pushed us into the kernel context.
319  */
320 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
321                             struct llog_cookie *cookies)
322 {
323         int i, index, rc = 0;
324         ENTRY;
325
326         down_write(&cathandle->lgh_lock);
327         for (i = 0; i < count; i++, cookies++) {
328                 struct llog_handle *loghandle;
329                 struct llog_logid *lgl = &cookies->lgc_lgl;
330
331                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
332                 if (rc) {
333                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
334                         break;
335                 }
336
337                 down_write(&loghandle->lgh_lock);
338                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
339                 up_write(&loghandle->lgh_lock);
340
341                 if (rc == 1) {          /* log has been destroyed */
342                         index = loghandle->u.phd.phd_cookie.lgc_index;
343                         if (cathandle->u.chd.chd_current_log == loghandle)
344                                 cathandle->u.chd.chd_current_log = NULL;
345                         llog_free_handle(loghandle);
346
347                         LASSERT(index);
348                         llog_cat_set_first_idx(cathandle, index);
349                         rc = llog_cancel_rec(cathandle, index);
350                         if (rc == 0)
351                                 CDEBUG(D_HA, "cancel plain log at index %u "
352                                        "of catalog "LPX64"\n",
353                                        index, cathandle->lgh_id.lgl_oid);
354                 }
355         }
356         up_write(&cathandle->lgh_lock);
357
358         RETURN(rc);
359 }
360 EXPORT_SYMBOL(llog_cat_cancel_records);
361
362 static int llog_cat_process_cb(struct llog_handle *cat_llh, 
363                                struct llog_rec_hdr *rec, void *data)
364 {
365         struct llog_process_data *d = data;
366         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
367         struct llog_handle *llh;
368         int rc;
369
370         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
371                 CERROR("invalid record in catalog\n");
372                 RETURN(-EINVAL);
373         }
374         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
375                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
376                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
377
378         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
379         if (rc) {
380                 CERROR("Cannot find handle for log "LPX64"\n",
381                        lir->lid_id.lgl_oid);
382                 RETURN(rc);
383         }
384
385         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
386         RETURN(rc);
387 }
388
389 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
390 {
391         struct llog_process_data d;
392         struct llog_process_cat_data cd;
393         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
394         int rc;
395         ENTRY;
396
397         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
398         d.lpd_data = data;
399         d.lpd_cb = cb;
400
401         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
402                 CWARN("catalog "LPX64" crosses index zero\n",
403                       cat_llh->lgh_id.lgl_oid);
404
405                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
406                 cd.last_idx = 0;
407                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
408                 if (rc != 0)
409                         RETURN(rc);
410
411                 cd.first_idx = 0;
412                 cd.last_idx = cat_llh->lgh_last_idx;
413                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
414         } else {
415                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
416         }
417
418         RETURN(rc);
419 }
420 EXPORT_SYMBOL(llog_cat_process);
421
422 static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, 
423                                        struct llog_rec_hdr *rec, void *data)
424 {
425         struct llog_process_data *d = data;
426         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
427         struct llog_handle *llh;
428         int rc;
429
430         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
431                 CERROR("invalid record in catalog\n");
432                 RETURN(-EINVAL);
433         }
434         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
435                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
436                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
437
438         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
439         if (rc) {
440                 CERROR("Cannot find handle for log "LPX64"\n",
441                        lir->lid_id.lgl_oid);
442                 RETURN(rc);
443         }
444
445         rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
446         RETURN(rc);
447 }
448
449 int llog_cat_reverse_process(struct llog_handle *cat_llh,
450                              llog_cb_t cb, void *data)
451 {
452         struct llog_process_data d;
453         struct llog_process_cat_data cd;
454         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
455         int rc;
456         ENTRY;
457
458         LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
459         d.lpd_data = data;
460         d.lpd_cb = cb;
461
462         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
463                 CWARN("catalog "LPX64" crosses index zero\n",
464                       cat_llh->lgh_id.lgl_oid);
465
466                 cd.first_idx = 0;
467                 cd.last_idx = cat_llh->lgh_last_idx;
468                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
469                                           &d, &cd);
470                 if (rc != 0)
471                         RETURN(rc);
472
473                 cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
474                 cd.last_idx = 0;
475                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
476                                           &d, &cd);
477         } else {
478                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
479                                           &d, NULL);
480         }
481
482         RETURN(rc);
483 }
484 EXPORT_SYMBOL(llog_cat_reverse_process);
485
486 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
487 {
488         struct llog_log_hdr *llh = cathandle->lgh_hdr;
489         int i, bitmap_size, idx;
490         ENTRY;
491
492         bitmap_size = LLOG_BITMAP_SIZE(llh);
493         if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
494                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
495                 llh->llh_cat_idx = cpu_to_le32(idx);
496                 if (idx == cathandle->lgh_last_idx)
497                         goto out;
498                 for (i = (index + 1) % bitmap_size;
499                      i != cathandle->lgh_last_idx;
500                      i = (i + 1) % bitmap_size) {
501                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
502                                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
503                                 llh->llh_cat_idx = cpu_to_le32(idx);
504                         } else if (i == 0) {
505                                 llh->llh_cat_idx = 0;
506                         } else {
507                                 break;
508                         }
509                 }
510 out:
511                 CDEBUG(D_HA, "set catalog "LPX64" first idx %u\n",
512                        cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
513         }
514
515         RETURN(0);
516 }
517 EXPORT_SYMBOL(llog_cat_set_first_idx);
518
519 int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
520                      void *buf, struct llog_cookie *logcookies, 
521                      int numcookies, void *data, 
522                      struct rw_semaphore **lock, int *lock_count)
523 {
524         struct llog_handle *cathandle;
525         int rc;
526         ENTRY;
527         
528         cathandle = ctxt->loc_handle;
529         LASSERT(cathandle != NULL);
530         
531         rc = llog_cat_add_rec(cathandle, rec, logcookies, buf, lock, lock_count);
532         if (rc != 1)
533                 CERROR("write one catalog record failed: %d\n", rc);
534         RETURN(rc);
535 }
536 EXPORT_SYMBOL(llog_catalog_add);
537
538 int llog_catalog_cancel(struct llog_ctxt *ctxt, int count,
539                         struct llog_cookie *cookies, int flags, void *data)
540 {
541         struct llog_handle *cathandle;
542         int rc;
543         ENTRY;
544
545         if (cookies == NULL || count == 0)
546                 RETURN(-EINVAL);
547         cathandle = ctxt->loc_handle;
548         LASSERT(cathandle != NULL);
549         rc = llog_cat_cancel_records(cathandle, count, cookies);
550         RETURN(rc);
551 }
552 EXPORT_SYMBOL(llog_catalog_cancel);
553
554 int llog_catalog_setup(struct llog_ctxt **res, char *name,
555                        struct obd_export *exp, 
556                        struct lvfs_run_ctxt *lvfs_ctxt,
557                        struct fsfilt_operations *fsops,
558                        struct dentry *logs_de, 
559                        struct dentry *objects_de)
560 {
561         struct llog_ctxt *ctxt;
562         struct llog_catid catid;
563         struct llog_handle *handle;
564         int rc;
565         
566         ENTRY;
567
568         OBD_ALLOC(ctxt, sizeof(*ctxt));
569         if (!ctxt)
570                 RETURN(-ENOMEM);
571
572         *res = ctxt;
573
574         /* marking this ctxt alone. */
575         ctxt->loc_alone = 1;
576         ctxt->loc_fsops = fsops;
577         ctxt->loc_lvfs_ctxt = lvfs_ctxt;
578         ctxt->loc_exp = exp;
579         ctxt->loc_logs_dir = logs_de;
580         ctxt->loc_objects_dir = objects_de;
581         ctxt->loc_logops = &llog_lvfs_ops; 
582         ctxt->loc_logops->lop_add = llog_catalog_add;
583         ctxt->loc_logops->lop_cancel = llog_catalog_cancel;
584
585         memset(&catid, 0, sizeof(struct llog_catid));
586         rc = llog_get_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
587         if (rc) {
588                 CERROR("error llog_get_cat_list rc: %d\n", rc);
589                 RETURN(rc);
590         }
591         if (catid.lci_logid.lgl_oid)
592                 rc = llog_open(ctxt, &handle, &catid.lci_logid, NULL,
593                                OBD_LLOG_FL_CREATE);
594         else {
595                 rc = llog_open(ctxt, &handle, NULL, NULL, OBD_LLOG_FL_CREATE);
596                 if (!rc)
597                         catid.lci_logid = handle->lgh_id;
598         }
599         if (rc)
600                 GOTO(out, rc);
601
602         ctxt->loc_handle = handle;
603         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
604         if (rc)
605                 GOTO(out, rc);
606
607         rc = llog_put_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
608         if (rc)
609                 CERROR("error llog_get_cat_list rc: %d\n", rc);
610 out:
611         if (ctxt && rc)
612                 OBD_FREE(ctxt, sizeof(*ctxt));
613         RETURN(rc);
614 }
615 EXPORT_SYMBOL(llog_catalog_setup);
616
617 int llog_catalog_cleanup(struct llog_ctxt *ctxt)
618 {
619         struct llog_handle *cathandle;
620         ENTRY;
621
622         if (!ctxt)
623                 return 0;
624
625         cathandle = ctxt->loc_handle;
626         if (cathandle)
627                 llog_cat_put(ctxt->loc_handle);
628
629         return 0;
630 }
631 EXPORT_SYMBOL(llog_catalog_cleanup);
632
633 int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle)
634 {
635         struct llog_handle *loghandle;
636         struct llog_logid *lgl = &cookie->lgc_lgl;
637         int rc;
638
639         down_read(&handle->lgh_lock);
640         rc = llog_cat_id2handle(handle, &loghandle, lgl);
641         if (rc)
642                 GOTO(out, rc);
643         if (2 * loghandle->lgh_hdr->llh_cat_idx <=
644             handle->lgh_last_idx + handle->lgh_hdr->llh_cat_idx + 1)
645                 rc = 1;
646         else
647                 rc = 0;
648 out:
649         up_read(&handle->lgh_lock);
650         RETURN(rc);
651 }
652 EXPORT_SYMBOL(llog_cat_half_bottom);