Whamcloud - gitweb
1d2101c1ac6ab7e6ea34df31b5cb79cb05c22c4c
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog_cat.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  *
40  * Invariants in implementation:
41  * - we do not share logs among different OST<->MDS connections, so that
42  *   if an OST or MDS fails it need only look at log(s) relevant to itself
43  *
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  */
46
47 #define DEBUG_SUBSYSTEM S_LOG
48
49 #ifndef EXPORT_SYMTAB
50 #define EXPORT_SYMTAB
51 #endif
52
53 #ifndef __KERNEL__
54 #include <liblustre.h>
55 #endif
56
57 #include <obd_class.h>
58 #include <lustre_log.h>
59 #include <libcfs/list.h>
60
61 /* Create a new log handle and add it to the open list.
62  * This log handle will be closed when all of the records in it are removed.
63  *
64  * Assumes caller has already pushed us into the kernel context and is locking.
65  */
66 static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
67                                             struct llog_logid *lid)
68 {
69         struct llog_handle *loghandle;
70         struct llog_log_hdr *llh;
71         struct llog_logid_rec rec = { { 0 }, };
72         int rc, index, bitmap_size;
73         ENTRY;
74
75         llh = cathandle->lgh_hdr;
76         bitmap_size = LLOG_BITMAP_SIZE(llh);
77
78         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
79
80         /* maximum number of available slots in catlog is bitmap_size - 2 */
81         if (llh->llh_cat_idx == index) {
82                 CERROR("no free catalog slots for log...\n");
83                 RETURN(ERR_PTR(-ENOSPC));
84         }
85
86         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
87                 RETURN(ERR_PTR(-ENOSPC));
88
89         if (lid != NULL && lid->lgl_oid == 0)
90                 lid = NULL;
91
92         rc = llog_create(cathandle->lgh_ctxt, &loghandle, lid, NULL);
93         if (rc)
94                 RETURN(ERR_PTR(rc));
95
96         rc = llog_init_handle(loghandle,
97                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
98                               &cathandle->lgh_hdr->llh_tgtuuid);
99         if (rc)
100                 GOTO(out_destroy, rc);
101
102         if (index == 0)
103                 index = 1;
104         if (ext2_set_bit(index, llh->llh_bitmap)) {
105                 CERROR("argh, index %u already set in log bitmap?\n",
106                        index);
107                 LBUG(); /* should never happen */
108         }
109         cathandle->lgh_last_idx = index;
110         llh->llh_count++;
111         llh->llh_tail.lrt_index = index;
112
113         CDEBUG(D_RPCTRACE,"new recovery log "LPX64":%x for index %u of catalog "
114                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
115                index, cathandle->lgh_id.lgl_oid);
116         /* build the record for this log in the catalog */
117         rec.lid_hdr.lrh_len = sizeof(rec);
118         rec.lid_hdr.lrh_index = index;
119         rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
120         rec.lid_id = loghandle->lgh_id;
121         rec.lid_tail.lrt_len = sizeof(rec);
122         rec.lid_tail.lrt_index = index;
123
124         /* update the catalog: header and record */
125         rc = llog_write_rec(cathandle, &rec.lid_hdr,
126                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
127         if (rc < 0) {
128                 GOTO(out_destroy, rc);
129         }
130
131         loghandle->lgh_hdr->llh_cat_idx = index;
132         cathandle->u.chd.chd_current_log = loghandle;
133         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
134         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
135
136 out_destroy:
137         if (rc < 0)
138                 llog_destroy(loghandle);
139
140         RETURN(loghandle);
141 }
142
143 /* Open an existent log handle and add it to the open list.
144  * This log handle will be closed when all of the records in it are removed.
145  *
146  * Assumes caller has already pushed us into the kernel context and is locking.
147  * We return a lock on the handle to ensure nobody yanks it from us.
148  */
149 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
150                        struct llog_logid *logid)
151 {
152         struct llog_handle *loghandle;
153         int rc = 0;
154         ENTRY;
155
156         if (cathandle == NULL)
157                 RETURN(-EBADF);
158
159         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
160                             u.phd.phd_entry) {
161                 struct llog_logid *cgl = &loghandle->lgh_id;
162                 if (cgl->lgl_oid == logid->lgl_oid) {
163                         if (cgl->lgl_ogen != logid->lgl_ogen) {
164                                 CERROR("log "LPX64" generation %x != %x\n",
165                                        logid->lgl_oid, cgl->lgl_ogen,
166                                        logid->lgl_ogen);
167                                 continue;
168                         }
169                         loghandle->u.phd.phd_cat_handle = cathandle;
170                         GOTO(out, rc = 0);
171                 }
172         }
173
174         rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
175         if (rc) {
176                 CERROR("error opening log id "LPX64":%x: rc %d\n",
177                        logid->lgl_oid, logid->lgl_ogen, rc);
178         } else {
179                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
180                 if (!rc) {
181                         list_add(&loghandle->u.phd.phd_entry,
182                                  &cathandle->u.chd.chd_head);
183                 }
184         }
185         if (!rc) {
186                 loghandle->u.phd.phd_cat_handle = cathandle;
187                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
188                 loghandle->u.phd.phd_cookie.lgc_index = 
189                         loghandle->lgh_hdr->llh_cat_idx;
190         }
191
192 out:
193         *res = loghandle;
194         RETURN(rc);
195 }
196
197 int llog_cat_put(struct llog_handle *cathandle)
198 {
199         struct llog_handle *loghandle, *n;
200         int rc;
201         ENTRY;
202
203         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
204                                  u.phd.phd_entry) {
205                 int err = llog_close(loghandle);
206                 if (err)
207                         CERROR("error closing loghandle\n");
208         }
209         rc = llog_close(cathandle);
210         RETURN(rc);
211 }
212 EXPORT_SYMBOL(llog_cat_put);
213
214 /**
215  * lockdep markers for nested struct llog_handle::lgh_lock locking.
216  */
217 enum {
218         LLOGH_CAT,
219         LLOGH_LOG
220 };
221
222 /** Return the currently active log handle.  If the current log handle doesn't
223  * have enough space left for the current record, start a new one.
224  *
225  * If reclen is 0, we only want to know what the currently active log is,
226  * otherwise we get a lock on this log so nobody can steal our space.
227  *
228  * Assumes caller has already pushed us into the kernel context and is locking.
229  *
230  * NOTE: loghandle is write-locked upon successful return
231  */
232 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
233                                                 struct llog_logid *lid,
234                                                 int create)
235 {
236         struct llog_handle *loghandle = NULL;
237         ENTRY;
238
239         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
240         loghandle = cathandle->u.chd.chd_current_log;
241         if (loghandle) {
242                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
243                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
244                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
245                         up_read(&cathandle->lgh_lock);
246                         RETURN(loghandle);
247                 } else {
248                         lid = NULL;
249                         up_write(&loghandle->lgh_lock);
250                 }
251         }
252         if (!create) {
253                 if (loghandle)
254                         down_write(&loghandle->lgh_lock);
255                 up_read(&cathandle->lgh_lock);
256                 RETURN(loghandle);
257         }
258         up_read(&cathandle->lgh_lock);
259
260         /* time to create new log */
261
262         /* first, we have to make sure the state hasn't changed */
263         down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
264         loghandle = cathandle->u.chd.chd_current_log;
265         if (loghandle) {
266                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
267                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
268                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
269                         up_write(&cathandle->lgh_lock);
270                         RETURN(loghandle);
271                 } else {
272                         lid = NULL;
273                         up_write(&loghandle->lgh_lock);
274                 }
275         }
276
277         CDEBUG(D_INODE, "creating new log\n");
278         loghandle = llog_cat_new_log(cathandle, lid);
279         if (!IS_ERR(loghandle))
280                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
281         up_write(&cathandle->lgh_lock);
282         RETURN(loghandle);
283 }
284
285 /* Add a single record to the recovery log(s) using a catalog
286  * Returns as llog_write_record
287  *
288  * Assumes caller has already pushed us into the kernel context.
289  */
290 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
291                      struct llog_cookie *reccookie, void *buf)
292 {
293         struct llog_handle *loghandle;
294         int rc;
295         ENTRY;
296
297         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
298         loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1);
299         if (IS_ERR(loghandle))
300                 RETURN(PTR_ERR(loghandle));
301         /* loghandle is already locked by llog_cat_current_log() for us */
302         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
303         up_write(&loghandle->lgh_lock);
304         if (rc == -ENOSPC) {
305                 /* to create a new plain log */
306                 loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1);
307                 if (IS_ERR(loghandle))
308                         RETURN(PTR_ERR(loghandle));
309                 rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
310                 up_write(&loghandle->lgh_lock);
311         }
312
313         RETURN(rc);
314 }
315 EXPORT_SYMBOL(llog_cat_add_rec);
316
317 /* For each cookie in the cookie array, we clear the log in-use bit and either:
318  * - the log is empty, so mark it free in the catalog header and delete it
319  * - the log is not empty, just write out the log header
320  *
321  * The cookies may be in different log files, so we need to get new logs
322  * each time.
323  *
324  * Assumes caller has already pushed us into the kernel context.
325  */
326 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
327                         struct llog_cookie *cookies)
328 {
329         int i, index, rc = 0;
330         ENTRY;
331
332         down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
333         for (i = 0; i < count; i++, cookies++) {
334                 struct llog_handle *loghandle;
335                 struct llog_logid *lgl = &cookies->lgc_lgl;
336
337                 rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
338                 if (rc) {
339                         CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
340                         break;
341                 }
342
343                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
344                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
345                 up_write(&loghandle->lgh_lock);
346
347                 if (rc == 1) {          /* log has been destroyed */
348                         index = loghandle->u.phd.phd_cookie.lgc_index;
349                         if (cathandle->u.chd.chd_current_log == loghandle)
350                                 cathandle->u.chd.chd_current_log = NULL;
351                         llog_free_handle(loghandle);
352
353                         LASSERT(index);
354                         llog_cat_set_first_idx(cathandle, index);
355                         rc = llog_cancel_rec(cathandle, index);
356                         if (rc == 0)
357                                 CDEBUG(D_RPCTRACE,"cancel plain log at index %u"
358                                        " of catalog "LPX64"\n",
359                                        index, cathandle->lgh_id.lgl_oid);
360                 }
361         }
362         up_write(&cathandle->lgh_lock);
363
364         RETURN(rc);
365 }
366 EXPORT_SYMBOL(llog_cat_cancel_records);
367
368 int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
369                         void *data)
370 {
371         struct llog_process_data *d = data;
372         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
373         struct llog_handle *llh;
374         int rc;
375
376         ENTRY;
377         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
378                 CERROR("invalid record in catalog\n");
379                 RETURN(-EINVAL);
380         }
381         CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "
382                LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
383                rec->lrh_index, cat_llh->lgh_id.lgl_oid);
384
385         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
386         if (rc) {
387                 CERROR("Cannot find handle for log "LPX64"\n",
388                        lir->lid_id.lgl_oid);
389                 RETURN(rc);
390         }
391
392         rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
393         RETURN(rc);
394 }
395
396 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
397 {
398         struct llog_process_data d;
399         struct llog_process_cat_data cd;
400         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
401         int rc;
402         ENTRY;
403
404         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
405         d.lpd_data = data;
406         d.lpd_cb = cb;
407
408         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
409                 CWARN("catlog "LPX64" crosses index zero\n",
410                       cat_llh->lgh_id.lgl_oid);
411
412                 cd.lpcd_first_idx = llh->llh_cat_idx;
413                 cd.lpcd_last_idx = 0;
414                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
415                 if (rc != 0)
416                         RETURN(rc);
417
418                 cd.lpcd_first_idx = 0;
419                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
420                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
421         } else {
422                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
423         }
424
425         RETURN(rc);
426 }
427 EXPORT_SYMBOL(llog_cat_process);
428
429 #ifdef __KERNEL__
430 int llog_cat_process_thread(void *data)
431 {
432         struct llog_process_cat_args *args = data;
433         struct llog_ctxt *ctxt = args->lpca_ctxt;
434         struct llog_handle *llh = NULL;
435         void  *cb = args->lpca_cb;
436         struct llog_logid logid;
437         int rc;
438         ENTRY;
439
440         cfs_daemonize_ctxt("ll_log_process");
441
442         logid = *(struct llog_logid *)(args->lpca_arg);
443         rc = llog_create(ctxt, &llh, &logid, NULL);
444         if (rc) {
445                 CERROR("llog_create() failed %d\n", rc);
446                 GOTO(out, rc);
447         }
448         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
449         if (rc) {
450                 CERROR("llog_init_handle failed %d\n", rc);
451                 GOTO(release_llh, rc);
452         }
453
454         if (cb) {
455                 rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
456                 if (rc != LLOG_PROC_BREAK)
457                         CERROR("llog_cat_process() failed %d\n", rc);
458         } else {
459                 CWARN("No callback function for recovery\n");
460         }
461
462         /* 
463          * Make sure that all cached data is sent. 
464          */
465         llog_sync(ctxt, NULL);
466         EXIT;
467 release_llh:
468         rc = llog_cat_put(llh);
469         if (rc)
470                 CERROR("llog_cat_put() failed %d\n", rc);
471 out:
472         llog_ctxt_put(ctxt);
473         OBD_FREE(args, sizeof(*args));
474         return rc;
475 }
476 EXPORT_SYMBOL(llog_cat_process_thread);
477 #endif
478
479 static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh,
480                                        struct llog_rec_hdr *rec, void *data)
481 {
482         struct llog_process_data *d = data;
483         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
484         struct llog_handle *llh;
485         int rc;
486
487         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
488                 CERROR("invalid record in catalog\n");
489                 RETURN(-EINVAL);
490         }
491         CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "
492                LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
493                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
494
495         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
496         if (rc) {
497                 CERROR("Cannot find handle for log "LPX64"\n",
498                        lir->lid_id.lgl_oid);
499                 RETURN(rc);
500         }
501
502         rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
503         RETURN(rc);
504 }
505
506 int llog_cat_reverse_process(struct llog_handle *cat_llh,
507                              llog_cb_t cb, void *data)
508 {
509         struct llog_process_data d;
510         struct llog_process_cat_data cd;
511         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
512         int rc;
513         ENTRY;
514
515         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
516         d.lpd_data = data;
517         d.lpd_cb = cb;
518
519         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
520                 CWARN("catalog "LPX64" crosses index zero\n",
521                       cat_llh->lgh_id.lgl_oid);
522
523                 cd.lpcd_first_idx = 0;
524                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
525                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
526                                           &d, &cd);
527                 if (rc != 0)
528                         RETURN(rc);
529
530                 cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
531                 cd.lpcd_last_idx = 0;
532                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
533                                           &d, &cd);
534         } else {
535                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
536                                           &d, NULL);
537         }
538
539         RETURN(rc);
540 }
541 EXPORT_SYMBOL(llog_cat_reverse_process);
542
543 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
544 {
545         struct llog_log_hdr *llh = cathandle->lgh_hdr;
546         int i, bitmap_size, idx;
547         ENTRY;
548
549         bitmap_size = LLOG_BITMAP_SIZE(llh);
550         if (llh->llh_cat_idx == (index - 1)) {
551                 idx = llh->llh_cat_idx + 1;
552                 llh->llh_cat_idx = idx;
553                 if (idx == cathandle->lgh_last_idx)
554                         goto out;
555                 for (i = (index + 1) % bitmap_size;
556                      i != cathandle->lgh_last_idx;
557                      i = (i + 1) % bitmap_size) {
558                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
559                                 idx = llh->llh_cat_idx + 1;
560                                 llh->llh_cat_idx = idx;
561                         } else if (i == 0) {
562                                 llh->llh_cat_idx = 0;
563                         } else {
564                                 break;
565                         }
566                 }
567 out:
568                 CDEBUG(D_RPCTRACE, "set catlog "LPX64" first idx %u\n",
569                        cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
570         }
571
572         RETURN(0);
573 }
574
575 #if 0
576 /* Assumes caller has already pushed us into the kernel context. */
577 int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
578 {
579         struct llog_log_hdr *llh;
580         loff_t offset = 0;
581         int rc = 0;
582         ENTRY;
583
584         LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
585
586         down(&cathandle->lgh_lock);
587         llh = cathandle->lgh_hdr;
588
589         if (i_size_read(cathandle->lgh_file->f_dentry->d_inode) == 0) {
590                 llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);
591
592 write_hdr:
593                 rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
594                                    &offset);
595                 if (rc != LLOG_CHUNK_SIZE) {
596                         CERROR("error writing catalog header: rc %d\n", rc);
597                         OBD_FREE(llh, sizeof(*llh));
598                         if (rc >= 0)
599                                 rc = -ENOSPC;
600                 } else
601                         rc = 0;
602         } else {
603                 rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
604                                   &offset);
605                 if (rc != LLOG_CHUNK_SIZE) {
606                         CERROR("error reading catalog header: rc %d\n", rc);
607                         /* Can we do much else if the header is bad? */
608                         goto write_hdr;
609                 } else
610                         rc = 0;
611         }
612
613         cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
614         up(&cathandle->lgh_lock);
615         RETURN(rc);
616 }
617 EXPORT_SYMBOL(llog_cat_init);
618
619 #endif