Whamcloud - gitweb
LU-2109 llog: introduce llog handle refcounter
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog_cat.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  *
40  * Invariants in implementation:
41  * - we do not share logs among different OST<->MDS connections, so that
42  *   if an OST or MDS fails it need only look at log(s) relevant to itself
43  *
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
46  * Author: Mikhail Pershin <mike.pershin@intel.com>
47  */
48
49 #define DEBUG_SUBSYSTEM S_LOG
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #endif
54
55 #include <obd_class.h>
56
57 #include "llog_internal.h"
58
59 /* Create a new log handle and add it to the open list.
60  * This log handle will be closed when all of the records in it are removed.
61  *
62  * Assumes caller has already pushed us into the kernel context and is locking.
63  */
64 static int llog_cat_new_log(const struct lu_env *env,
65                             struct llog_handle *cathandle,
66                             struct llog_handle *loghandle,
67                             struct thandle *th)
68 {
69
70         struct llog_log_hdr *llh;
71         struct llog_logid_rec rec = { { 0 }, };
72         int rc, index, bitmap_size;
73         ENTRY;
74
75         llh = cathandle->lgh_hdr;
76         bitmap_size = LLOG_BITMAP_SIZE(llh);
77
78         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
79
80         /* maximum number of available slots in catlog is bitmap_size - 2 */
81         if (llh->llh_cat_idx == index) {
82                 CERROR("no free catalog slots for log...\n");
83                 RETURN(-ENOSPC);
84         }
85
86         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
87                 RETURN(-ENOSPC);
88
89         rc = llog_create(env, loghandle, th);
90         /* if llog is already created, no need to initialize it */
91         if (rc == -EEXIST) {
92                 RETURN(0);
93         } else if (rc != 0) {
94                 CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
95                        loghandle->lgh_ctxt->loc_obd->obd_name, rc);
96                 RETURN(rc);
97         }
98
99         rc = llog_init_handle(env, loghandle,
100                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
101                               &cathandle->lgh_hdr->llh_tgtuuid);
102         if (rc)
103                 GOTO(out_destroy, rc);
104
105         if (index == 0)
106                 index = 1;
107
108         spin_lock(&loghandle->lgh_hdr_lock);
109         llh->llh_count++;
110         if (ext2_set_bit(index, llh->llh_bitmap)) {
111                 CERROR("argh, index %u already set in log bitmap?\n",
112                        index);
113                 spin_unlock(&loghandle->lgh_hdr_lock);
114                 LBUG(); /* should never happen */
115         }
116         spin_unlock(&loghandle->lgh_hdr_lock);
117
118         cathandle->lgh_last_idx = index;
119         llh->llh_tail.lrt_index = index;
120
121         CDEBUG(D_RPCTRACE,"new recovery log "LPX64":%x for index %u of catalog "
122                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
123                index, cathandle->lgh_id.lgl_oid);
124         /* build the record for this log in the catalog */
125         rec.lid_hdr.lrh_len = sizeof(rec);
126         rec.lid_hdr.lrh_index = index;
127         rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
128         rec.lid_id = loghandle->lgh_id;
129         rec.lid_tail.lrt_len = sizeof(rec);
130         rec.lid_tail.lrt_index = index;
131
132         /* update the catalog: header and record */
133         rc = llog_write_rec(env, cathandle, &rec.lid_hdr,
134                             &loghandle->u.phd.phd_cookie, 1, NULL, index, th);
135         if (rc < 0)
136                 GOTO(out_destroy, rc);
137
138         loghandle->lgh_hdr->llh_cat_idx = index;
139         RETURN(0);
140 out_destroy:
141         llog_destroy(env, loghandle);
142         RETURN(rc);
143 }
144
145 /* Open an existent log handle and add it to the open list.
146  * This log handle will be closed when all of the records in it are removed.
147  *
148  * Assumes caller has already pushed us into the kernel context and is locking.
149  * We return a lock on the handle to ensure nobody yanks it from us.
150  *
151  * This takes extra reference on llog_handle via llog_handle_get() and require
152  * this reference to be put by caller using llog_handle_put()
153  */
154 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
155                        struct llog_handle **res, struct llog_logid *logid)
156 {
157         struct llog_handle      *loghandle;
158         int                      rc = 0;
159
160         ENTRY;
161
162         if (cathandle == NULL)
163                 RETURN(-EBADF);
164
165         down_write(&cathandle->lgh_lock);
166         cfs_list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
167                                 u.phd.phd_entry) {
168                 struct llog_logid *cgl = &loghandle->lgh_id;
169
170                 if (cgl->lgl_oid == logid->lgl_oid) {
171                         if (cgl->lgl_ogen != logid->lgl_ogen) {
172                                 CERROR("%s: log "LPX64" generation %x != %x\n",
173                                        loghandle->lgh_ctxt->loc_obd->obd_name,
174                                        logid->lgl_oid, cgl->lgl_ogen,
175                                        logid->lgl_ogen);
176                                 continue;
177                         }
178                         loghandle->u.phd.phd_cat_handle = cathandle;
179                         up_write(&cathandle->lgh_lock);
180                         GOTO(out, rc = 0);
181                 }
182         }
183         up_write(&cathandle->lgh_lock);
184
185         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
186                        LLOG_OPEN_EXISTS);
187         if (rc < 0) {
188                 CERROR("%s: error opening log id "LPX64":%x: rc = %d\n",
189                        cathandle->lgh_ctxt->loc_obd->obd_name,
190                        logid->lgl_oid, logid->lgl_ogen, rc);
191                 RETURN(rc);
192         }
193
194         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
195         if (rc < 0) {
196                 llog_close(env, loghandle);
197                 RETURN(rc);
198         }
199
200         down_write(&cathandle->lgh_lock);
201         cfs_list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
202         up_write(&cathandle->lgh_lock);
203
204         loghandle->u.phd.phd_cat_handle = cathandle;
205         loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
206         loghandle->u.phd.phd_cookie.lgc_index =
207                                 loghandle->lgh_hdr->llh_cat_idx;
208         EXIT;
209 out:
210         llog_handle_get(loghandle);
211         *res = loghandle;
212         return 0;
213 }
214
215 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
216 {
217         struct llog_handle      *loghandle, *n;
218         int                      rc;
219
220         ENTRY;
221
222         cfs_list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
223                                      u.phd.phd_entry) {
224                 struct llog_log_hdr     *llh = loghandle->lgh_hdr;
225                 int                      index;
226
227                 /* unlink open-not-created llogs */
228                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
229                 llh = loghandle->lgh_hdr;
230                 if (loghandle->lgh_obj != NULL && llh != NULL &&
231                     (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
232                     (llh->llh_count == 1)) {
233                         rc = llog_destroy(env, loghandle);
234                         if (rc)
235                                 CERROR("%s: failure destroying log during "
236                                        "cleanup: rc = %d\n",
237                                        loghandle->lgh_ctxt->loc_obd->obd_name,
238                                        rc);
239
240                         index = loghandle->u.phd.phd_cookie.lgc_index;
241                         llog_cat_cleanup(env, cathandle, NULL, index);
242                 }
243                 llog_close(env, loghandle);
244         }
245         /* if handle was stored in ctxt, remove it too */
246         if (cathandle->lgh_ctxt->loc_handle == cathandle)
247                 cathandle->lgh_ctxt->loc_handle = NULL;
248         rc = llog_close(env, cathandle);
249         RETURN(rc);
250 }
251 EXPORT_SYMBOL(llog_cat_close);
252
253 /**
254  * lockdep markers for nested struct llog_handle::lgh_lock locking.
255  */
256 enum {
257         LLOGH_CAT,
258         LLOGH_LOG
259 };
260
261 /** Return the currently active log handle.  If the current log handle doesn't
262  * have enough space left for the current record, start a new one.
263  *
264  * If reclen is 0, we only want to know what the currently active log is,
265  * otherwise we get a lock on this log so nobody can steal our space.
266  *
267  * Assumes caller has already pushed us into the kernel context and is locking.
268  *
269  * NOTE: loghandle is write-locked upon successful return
270  */
271 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
272                                                 struct thandle *th)
273 {
274         struct llog_handle *loghandle = NULL;
275         ENTRY;
276
277         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
278         loghandle = cathandle->u.chd.chd_current_log;
279         if (loghandle) {
280                 struct llog_log_hdr *llh;
281
282                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
283                 llh = loghandle->lgh_hdr;
284                 if (llh == NULL ||
285                     loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
286                         up_read(&cathandle->lgh_lock);
287                         RETURN(loghandle);
288                 } else {
289                         up_write(&loghandle->lgh_lock);
290                 }
291         }
292         up_read(&cathandle->lgh_lock);
293
294         /* time to use next log */
295
296         /* first, we have to make sure the state hasn't changed */
297         down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
298         loghandle = cathandle->u.chd.chd_current_log;
299         if (loghandle) {
300                 struct llog_log_hdr *llh;
301
302                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
303                 llh = loghandle->lgh_hdr;
304                 LASSERT(llh);
305                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
306                         up_write(&cathandle->lgh_lock);
307                         RETURN(loghandle);
308                 } else {
309                         up_write(&loghandle->lgh_lock);
310                 }
311         }
312
313         CDEBUG(D_INODE, "use next log\n");
314
315         loghandle = cathandle->u.chd.chd_next_log;
316         cathandle->u.chd.chd_current_log = loghandle;
317         cathandle->u.chd.chd_next_log = NULL;
318         down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
319         up_write(&cathandle->lgh_lock);
320         LASSERT(loghandle);
321         RETURN(loghandle);
322 }
323
324 /* Add a single record to the recovery log(s) using a catalog
325  * Returns as llog_write_record
326  *
327  * Assumes caller has already pushed us into the kernel context.
328  */
329 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
330                      struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
331                      void *buf, struct thandle *th)
332 {
333         struct llog_handle *loghandle;
334         int rc;
335         ENTRY;
336
337         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
338         loghandle = llog_cat_current_log(cathandle, th);
339         LASSERT(!IS_ERR(loghandle));
340
341         /* loghandle is already locked by llog_cat_current_log() for us */
342         if (!llog_exist(loghandle)) {
343                 rc = llog_cat_new_log(env, cathandle, loghandle, th);
344                 if (rc < 0) {
345                         up_write(&loghandle->lgh_lock);
346                         RETURN(rc);
347                 }
348         }
349         /* now let's try to add the record */
350         rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf, -1, th);
351         if (rc < 0)
352                 CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle);
353         up_write(&loghandle->lgh_lock);
354         if (rc == -ENOSPC) {
355                 /* try to use next log */
356                 loghandle = llog_cat_current_log(cathandle, th);
357                 LASSERT(!IS_ERR(loghandle));
358                 /* new llog can be created concurrently */
359                 if (!llog_exist(loghandle)) {
360                         rc = llog_cat_new_log(env, cathandle, loghandle, th);
361                         if (rc < 0) {
362                                 up_write(&loghandle->lgh_lock);
363                                 RETURN(rc);
364                         }
365                 }
366                 /* now let's try to add the record */
367                 rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf,
368                                     -1, th);
369                 if (rc < 0)
370                         CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle);
371                 up_write(&loghandle->lgh_lock);
372         }
373
374         RETURN(rc);
375 }
376 EXPORT_SYMBOL(llog_cat_add_rec);
377
378 int llog_cat_declare_add_rec(const struct lu_env *env,
379                              struct llog_handle *cathandle,
380                              struct llog_rec_hdr *rec, struct thandle *th)
381 {
382         struct llog_handle      *loghandle, *next;
383         int                      rc = 0;
384
385         ENTRY;
386
387         if (cathandle->u.chd.chd_current_log == NULL) {
388                 /* declare new plain llog */
389                 down_write(&cathandle->lgh_lock);
390                 if (cathandle->u.chd.chd_current_log == NULL) {
391                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
392                                        NULL, NULL, LLOG_OPEN_NEW);
393                         if (rc == 0) {
394                                 cathandle->u.chd.chd_current_log = loghandle;
395                                 cfs_list_add_tail(&loghandle->u.phd.phd_entry,
396                                                   &cathandle->u.chd.chd_head);
397                         }
398                 }
399                 up_write(&cathandle->lgh_lock);
400         } else if (cathandle->u.chd.chd_next_log == NULL) {
401                 /* declare next plain llog */
402                 down_write(&cathandle->lgh_lock);
403                 if (cathandle->u.chd.chd_next_log == NULL) {
404                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
405                                        NULL, NULL, LLOG_OPEN_NEW);
406                         if (rc == 0) {
407                                 cathandle->u.chd.chd_next_log = loghandle;
408                                 cfs_list_add_tail(&loghandle->u.phd.phd_entry,
409                                                   &cathandle->u.chd.chd_head);
410                         }
411                 }
412                 up_write(&cathandle->lgh_lock);
413         }
414         if (rc)
415                 GOTO(out, rc);
416
417         if (!llog_exist(cathandle->u.chd.chd_current_log)) {
418                 rc = llog_declare_create(env, cathandle->u.chd.chd_current_log,
419                                          th);
420                 if (rc)
421                         GOTO(out, rc);
422                 llog_declare_write_rec(env, cathandle, NULL, -1, th);
423         }
424         /* declare records in the llogs */
425         rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
426                                     rec, -1, th);
427         if (rc)
428                 GOTO(out, rc);
429
430         next = cathandle->u.chd.chd_next_log;
431         if (next) {
432                 if (!llog_exist(next)) {
433                         rc = llog_declare_create(env, next, th);
434                         llog_declare_write_rec(env, cathandle, NULL, -1, th);
435                 }
436                 llog_declare_write_rec(env, next, rec, -1, th);
437         }
438 out:
439         RETURN(rc);
440 }
441 EXPORT_SYMBOL(llog_cat_declare_add_rec);
442
443 int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
444                  struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
445                  void *buf)
446 {
447         struct llog_ctxt        *ctxt;
448         struct dt_device        *dt;
449         struct thandle          *th = NULL;
450         int                      rc;
451
452         ctxt = cathandle->lgh_ctxt;
453         LASSERT(ctxt);
454         LASSERT(ctxt->loc_exp);
455
456         if (cathandle->lgh_obj != NULL) {
457                 dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
458                 LASSERT(dt);
459
460                 th = dt_trans_create(env, dt);
461                 if (IS_ERR(th))
462                         RETURN(PTR_ERR(th));
463
464                 rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
465                 if (rc)
466                         GOTO(out_trans, rc);
467
468                 rc = dt_trans_start_local(env, dt, th);
469                 if (rc)
470                         GOTO(out_trans, rc);
471                 rc = llog_cat_add_rec(env, cathandle, rec, reccookie, buf, th);
472 out_trans:
473                 dt_trans_stop(env, dt, th);
474         } else { /* lvfs compat code */
475                 LASSERT(cathandle->lgh_file != NULL);
476                 rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
477                 if (rc == 0)
478                         rc = llog_cat_add_rec(env, cathandle, rec, reccookie,
479                                               buf, th);
480         }
481         RETURN(rc);
482 }
483 EXPORT_SYMBOL(llog_cat_add);
484
485 /* For each cookie in the cookie array, we clear the log in-use bit and either:
486  * - the log is empty, so mark it free in the catalog header and delete it
487  * - the log is not empty, just write out the log header
488  *
489  * The cookies may be in different log files, so we need to get new logs
490  * each time.
491  *
492  * Assumes caller has already pushed us into the kernel context.
493  */
494 int llog_cat_cancel_records(const struct lu_env *env,
495                             struct llog_handle *cathandle, int count,
496                             struct llog_cookie *cookies)
497 {
498         int i, index, rc = 0, failed = 0;
499
500         ENTRY;
501
502         for (i = 0; i < count; i++, cookies++) {
503                 struct llog_handle      *loghandle;
504                 struct llog_logid       *lgl = &cookies->lgc_lgl;
505                 int                      lrc;
506
507                 rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
508                 if (rc) {
509                         CERROR("%s: cannot find handle for llog "LPX64": %d\n",
510                                cathandle->lgh_ctxt->loc_obd->obd_name,
511                                lgl->lgl_oid, rc);
512                         failed++;
513                         continue;
514                 }
515
516                 lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
517                 if (lrc == 1) {          /* log has been destroyed */
518                         index = loghandle->u.phd.phd_cookie.lgc_index;
519                         rc = llog_cat_cleanup(env, cathandle, loghandle,
520                                               index);
521                 } else if (lrc == -ENOENT) {
522                         if (rc == 0) /* ENOENT shouldn't rewrite any error */
523                                 rc = lrc;
524                 } else if (lrc < 0) {
525                         failed++;
526                         rc = lrc;
527                 }
528                 llog_handle_put(loghandle);
529         }
530         if (rc)
531                 CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
532                        cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
533                        rc);
534
535         RETURN(rc);
536 }
537 EXPORT_SYMBOL(llog_cat_cancel_records);
538
539 int llog_cat_process_cb(const struct lu_env *env, struct llog_handle *cat_llh,
540                         struct llog_rec_hdr *rec, void *data)
541 {
542         struct llog_process_data *d = data;
543         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
544         struct llog_handle *llh;
545         int rc;
546
547         ENTRY;
548         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
549                 CERROR("invalid record in catalog\n");
550                 RETURN(-EINVAL);
551         }
552         CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "
553                LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
554                rec->lrh_index, cat_llh->lgh_id.lgl_oid);
555
556         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
557         if (rc) {
558                 CERROR("%s: cannot find handle for llog "LPX64": %d\n",
559                        cat_llh->lgh_ctxt->loc_obd->obd_name,
560                        lir->lid_id.lgl_oid, rc);
561                 RETURN(rc);
562         }
563
564         if (rec->lrh_index < d->lpd_startcat)
565                 /* Skip processing of the logs until startcat */
566                 RETURN(0);
567
568         if (d->lpd_startidx > 0) {
569                 struct llog_process_cat_data cd;
570
571                 cd.lpcd_first_idx = d->lpd_startidx;
572                 cd.lpcd_last_idx = 0;
573                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
574                                           &cd, false);
575                 /* Continue processing the next log from idx 0 */
576                 d->lpd_startidx = 0;
577         } else {
578                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
579                                           NULL, false);
580         }
581         llog_handle_put(llh);
582
583         RETURN(rc);
584 }
585
586 int llog_cat_process_or_fork(const struct lu_env *env,
587                              struct llog_handle *cat_llh,
588                              llog_cb_t cb, void *data, int startcat,
589                              int startidx, bool fork)
590 {
591         struct llog_process_data d;
592         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
593         int rc;
594         ENTRY;
595
596         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
597         d.lpd_data = data;
598         d.lpd_cb = cb;
599         d.lpd_startcat = startcat;
600         d.lpd_startidx = startidx;
601
602         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
603                 struct llog_process_cat_data cd;
604
605                 CWARN("catlog "LPX64" crosses index zero\n",
606                       cat_llh->lgh_id.lgl_oid);
607
608                 cd.lpcd_first_idx = llh->llh_cat_idx;
609                 cd.lpcd_last_idx = 0;
610                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
611                                           &d, &cd, fork);
612                 if (rc != 0)
613                         RETURN(rc);
614
615                 cd.lpcd_first_idx = 0;
616                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
617                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
618                                           &d, &cd, fork);
619         } else {
620                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
621                                           &d, NULL, fork);
622         }
623
624         RETURN(rc);
625 }
626 EXPORT_SYMBOL(llog_cat_process_or_fork);
627
628 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
629                      llog_cb_t cb, void *data, int startcat, int startidx)
630 {
631         return llog_cat_process_or_fork(env, cat_llh, cb, data, startcat,
632                                         startidx, false);
633 }
634 EXPORT_SYMBOL(llog_cat_process);
635
636 #ifdef __KERNEL__
637 int llog_cat_process_thread(void *data)
638 {
639         struct llog_process_cat_args *args = data;
640         struct llog_ctxt *ctxt = args->lpca_ctxt;
641         struct llog_handle *llh = NULL;
642         llog_cb_t cb = args->lpca_cb;
643         struct llog_thread_info *lgi;
644         struct lu_env            env;
645         int rc;
646         ENTRY;
647
648         cfs_daemonize_ctxt("ll_log_process");
649
650         rc = lu_env_init(&env, LCT_LOCAL);
651         if (rc)
652                 GOTO(out, rc);
653         lgi = llog_info(&env);
654         LASSERT(lgi);
655
656         lgi->lgi_logid = *(struct llog_logid *)(args->lpca_arg);
657         rc = llog_open(&env, ctxt, &llh, &lgi->lgi_logid, NULL,
658                        LLOG_OPEN_EXISTS);
659         if (rc) {
660                 CERROR("%s: cannot open llog "LPX64":%x: rc = %d\n",
661                        ctxt->loc_obd->obd_name, lgi->lgi_logid.lgl_oid,
662                        lgi->lgi_logid.lgl_ogen, rc);
663                 GOTO(out_env, rc);
664         }
665         rc = llog_init_handle(&env, llh, LLOG_F_IS_CAT, NULL);
666         if (rc) {
667                 CERROR("%s: llog_init_handle failed: rc = %d\n",
668                        llh->lgh_ctxt->loc_obd->obd_name, rc);
669                 GOTO(release_llh, rc);
670         }
671
672         if (cb) {
673                 rc = llog_cat_process(&env, llh, cb, NULL, 0, 0);
674                 if (rc != LLOG_PROC_BREAK && rc != 0)
675                         CERROR("%s: llog_cat_process() failed: rc = %d\n",
676                                llh->lgh_ctxt->loc_obd->obd_name, rc);
677                 cb(&env, llh, NULL, NULL);
678         } else {
679                 CWARN("No callback function for recovery\n");
680         }
681
682         /*
683          * Make sure that all cached data is sent.
684          */
685         llog_sync(ctxt, NULL, 0);
686         GOTO(release_llh, rc);
687 release_llh:
688         rc = llog_cat_close(&env, llh);
689         if (rc)
690                 CERROR("%s: llog_cat_close() failed: rc = %d\n",
691                        llh->lgh_ctxt->loc_obd->obd_name, rc);
692 out_env:
693         lu_env_fini(&env);
694 out:
695         llog_ctxt_put(ctxt);
696         OBD_FREE_PTR(args);
697         return rc;
698 }
699 EXPORT_SYMBOL(llog_cat_process_thread);
700 #endif
701
702 static int llog_cat_reverse_process_cb(const struct lu_env *env,
703                                        struct llog_handle *cat_llh,
704                                        struct llog_rec_hdr *rec, void *data)
705 {
706         struct llog_process_data *d = data;
707         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
708         struct llog_handle *llh;
709         int rc;
710
711         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
712                 CERROR("invalid record in catalog\n");
713                 RETURN(-EINVAL);
714         }
715         CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "
716                LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
717                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
718
719         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
720         if (rc) {
721                 CERROR("%s: cannot find handle for llog "LPX64": %d\n",
722                        cat_llh->lgh_ctxt->loc_obd->obd_name,
723                        lir->lid_id.lgl_oid, rc);
724                 RETURN(rc);
725         }
726
727         rc = llog_reverse_process(env, llh, d->lpd_cb, d->lpd_data, NULL);
728         llog_handle_put(llh);
729         RETURN(rc);
730 }
731
732 int llog_cat_reverse_process(const struct lu_env *env,
733                              struct llog_handle *cat_llh,
734                              llog_cb_t cb, void *data)
735 {
736         struct llog_process_data d;
737         struct llog_process_cat_data cd;
738         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
739         int rc;
740         ENTRY;
741
742         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
743         d.lpd_data = data;
744         d.lpd_cb = cb;
745
746         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
747                 CWARN("catalog "LPX64" crosses index zero\n",
748                       cat_llh->lgh_id.lgl_oid);
749
750                 cd.lpcd_first_idx = 0;
751                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
752                 rc = llog_reverse_process(env, cat_llh,
753                                           llog_cat_reverse_process_cb,
754                                           &d, &cd);
755                 if (rc != 0)
756                         RETURN(rc);
757
758                 cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
759                 cd.lpcd_last_idx = 0;
760                 rc = llog_reverse_process(env, cat_llh,
761                                           llog_cat_reverse_process_cb,
762                                           &d, &cd);
763         } else {
764                 rc = llog_reverse_process(env, cat_llh,
765                                           llog_cat_reverse_process_cb,
766                                           &d, NULL);
767         }
768
769         RETURN(rc);
770 }
771 EXPORT_SYMBOL(llog_cat_reverse_process);
772
773 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
774 {
775         struct llog_log_hdr *llh = cathandle->lgh_hdr;
776         int i, bitmap_size, idx;
777         ENTRY;
778
779         bitmap_size = LLOG_BITMAP_SIZE(llh);
780         if (llh->llh_cat_idx == (index - 1)) {
781                 idx = llh->llh_cat_idx + 1;
782                 llh->llh_cat_idx = idx;
783                 if (idx == cathandle->lgh_last_idx)
784                         goto out;
785                 for (i = (index + 1) % bitmap_size;
786                      i != cathandle->lgh_last_idx;
787                      i = (i + 1) % bitmap_size) {
788                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
789                                 idx = llh->llh_cat_idx + 1;
790                                 llh->llh_cat_idx = idx;
791                         } else if (i == 0) {
792                                 llh->llh_cat_idx = 0;
793                         } else {
794                                 break;
795                         }
796                 }
797 out:
798                 CDEBUG(D_RPCTRACE, "set catlog "LPX64" first idx %u\n",
799                        cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
800         }
801
802         RETURN(0);
803 }
804
805 /* Cleanup deleted plain llog traces from catalog */
806 int llog_cat_cleanup(const struct lu_env *env, struct llog_handle *cathandle,
807                      struct llog_handle *loghandle, int index)
808 {
809         int rc;
810
811         LASSERT(index);
812         if (loghandle != NULL) {
813                 /* remove destroyed llog from catalog list and
814                  * chd_current_log variable */
815                 down_write(&cathandle->lgh_lock);
816                 if (cathandle->u.chd.chd_current_log == loghandle)
817                         cathandle->u.chd.chd_current_log = NULL;
818                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
819                 up_write(&cathandle->lgh_lock);
820                 LASSERT(index == loghandle->u.phd.phd_cookie.lgc_index);
821                 /* llog was opened and keep in a list, close it now */
822                 llog_close(env, loghandle);
823         }
824         /* remove plain llog entry from catalog by index */
825         llog_cat_set_first_idx(cathandle, index);
826         rc = llog_cancel_rec(env, cathandle, index);
827         if (rc == 0)
828                 CDEBUG(D_HA, "cancel plain log at index"
829                        " %u of catalog "LPX64"\n",
830                        index, cathandle->lgh_id.lgl_oid);
831         return rc;
832 }
833
834 int cat_cancel_cb(const struct lu_env *env, struct llog_handle *cathandle,
835                   struct llog_rec_hdr *rec, void *data)
836 {
837         struct llog_logid_rec   *lir = (struct llog_logid_rec *)rec;
838         struct llog_handle      *loghandle;
839         struct llog_log_hdr     *llh;
840         int                      rc;
841
842         ENTRY;
843
844         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
845                 CERROR("%s: invalid record in catalog\n",
846                        loghandle->lgh_ctxt->loc_obd->obd_name);
847                 RETURN(-EINVAL);
848         }
849         CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "
850                LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
851                rec->lrh_index, cathandle->lgh_id.lgl_oid);
852
853         rc = llog_cat_id2handle(env, cathandle, &loghandle, &lir->lid_id);
854         if (rc) {
855                 CERROR("%s: cannot find handle for llog "LPX64": %d\n",
856                        cathandle->lgh_ctxt->loc_obd->obd_name,
857                        lir->lid_id.lgl_oid, rc);
858                 if (rc == -ENOENT || rc == -ESTALE) {
859                         /* remove index from catalog */
860                         llog_cat_cleanup(env, cathandle, NULL, rec->lrh_index);
861                 }
862                 RETURN(rc);
863         }
864
865         llh = loghandle->lgh_hdr;
866         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
867             (llh->llh_count == 1)) {
868                 rc = llog_destroy(env, loghandle);
869                 if (rc)
870                         CERROR("%s: fail to destroy empty log: rc = %d\n",
871                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
872
873                 llog_cat_cleanup(env, cathandle, loghandle,
874                                  loghandle->u.phd.phd_cookie.lgc_index);
875         }
876         llog_handle_put(loghandle);
877
878         RETURN(rc);
879 }
880 EXPORT_SYMBOL(cat_cancel_cb);
881
882 /* helper to initialize catalog llog and process it to cancel */
883 int llog_cat_init_and_process(const struct lu_env *env,
884                               struct llog_handle *llh)
885 {
886         int rc;
887
888         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, NULL);
889         if (rc)
890                 RETURN(rc);
891
892         rc = llog_process_or_fork(env, llh, cat_cancel_cb, NULL, NULL, false);
893         if (rc)
894                 CERROR("%s: llog_process() with cat_cancel_cb failed: rc = "
895                        "%d\n", llh->lgh_ctxt->loc_obd->obd_name, rc);
896         RETURN(0);
897 }
898 EXPORT_SYMBOL(llog_cat_init_and_process);
899