Whamcloud - gitweb
LU-7243 misc: update Intel copyright messages 2015
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog_cat.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  *
40  * Invariants in implementation:
41  * - we do not share logs among different OST<->MDS connections, so that
42  *   if an OST or MDS fails it need only look at log(s) relevant to itself
43  *
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
46  * Author: Mikhail Pershin <mike.pershin@intel.com>
47  */
48
49 #define DEBUG_SUBSYSTEM S_LOG
50
51
52 #include <obd_class.h>
53
54 #include "llog_internal.h"
55
56 /* Create a new log handle and add it to the open list.
57  * This log handle will be closed when all of the records in it are removed.
58  *
59  * Assumes caller has already pushed us into the kernel context and is locking.
60  */
61 static int llog_cat_new_log(const struct lu_env *env,
62                             struct llog_handle *cathandle,
63                             struct llog_handle *loghandle,
64                             struct thandle *th)
65 {
66         struct llog_thread_info *lgi = llog_info(env);
67         struct llog_logid_rec   *rec = &lgi->lgi_logid;
68         struct thandle *handle = NULL;
69         struct dt_device *dt = NULL;
70         struct llog_log_hdr     *llh = cathandle->lgh_hdr;
71         int                      rc, index;
72
73         ENTRY;
74
75         index = (cathandle->lgh_last_idx + 1) %
76                 (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ? (cfs_fail_val + 1) :
77                                                 LLOG_HDR_BITMAP_SIZE(llh));
78
79         /* check that new llog index will not overlap with the first one.
80          * - llh_cat_idx is the index just before the first/oldest still in-use
81          *      index in catalog
82          * - lgh_last_idx is the last/newest used index in catalog
83          *
84          * When catalog is not wrapped yet then lgh_last_idx is always larger
85          * than llh_cat_idx. After the wrap around lgh_last_idx re-starts
86          * from 0 and llh_cat_idx becomes the upper limit for it
87          *
88          * Check if catalog has already wrapped around or not by comparing
89          * last_idx and cat_idx */
90         if ((index == llh->llh_cat_idx + 1 && llh->llh_count > 1) ||
91             (index == 0 && llh->llh_cat_idx == 0)) {
92                 CWARN("%s: there are no more free slots in catalog\n",
93                       loghandle->lgh_ctxt->loc_obd->obd_name);
94                 RETURN(-ENOSPC);
95         }
96
97         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
98                 RETURN(-ENOSPC);
99
100         if (loghandle->lgh_hdr != NULL) {
101                 /* If llog object is remote and creation is failed, lgh_hdr
102                  * might be left over here, free it first */
103                 LASSERT(!llog_exist(loghandle));
104                 OBD_FREE_PTR(loghandle->lgh_hdr);
105                 loghandle->lgh_hdr = NULL;
106         }
107
108         if (th == NULL) {
109                 dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
110
111                 handle = dt_trans_create(env, dt);
112                 if (IS_ERR(handle))
113                         RETURN(PTR_ERR(handle));
114
115                 /* Create update llog object synchronously, which
116                  * happens during inialization process see
117                  * lod_sub_prep_llog(), to make sure the update
118                  * llog object is created before corss-MDT writing
119                  * updates into the llog object */
120                 if (cathandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
121                         handle->th_sync = 1;
122
123                 handle->th_wait_submit = 1;
124
125                 rc = llog_declare_create(env, loghandle, handle);
126                 if (rc != 0)
127                         GOTO(out, rc);
128
129                 rec->lid_hdr.lrh_len = sizeof(*rec);
130                 rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
131                 rec->lid_id = loghandle->lgh_id;
132                 rc = llog_declare_write_rec(env, cathandle, &rec->lid_hdr, -1,
133                                             handle);
134                 if (rc != 0)
135                         GOTO(out, rc);
136
137                 rc = dt_trans_start_local(env, dt, handle);
138                 if (rc != 0)
139                         GOTO(out, rc);
140
141                 th = handle;
142         }
143
144         rc = llog_create(env, loghandle, th);
145         /* if llog is already created, no need to initialize it */
146         if (rc == -EEXIST) {
147                 GOTO(out, rc = 0);
148         } else if (rc != 0) {
149                 CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
150                        loghandle->lgh_ctxt->loc_obd->obd_name, rc);
151                 GOTO(out, rc);
152         }
153
154         rc = llog_init_handle(env, loghandle,
155                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
156                               &cathandle->lgh_hdr->llh_tgtuuid);
157         if (rc < 0)
158                 GOTO(out, rc);
159
160         /* build the record for this log in the catalog */
161         rec->lid_hdr.lrh_len = sizeof(*rec);
162         rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
163         rec->lid_id = loghandle->lgh_id;
164
165         /* append the new record into catalog. The new index will be
166          * assigned to the record and updated in rec header */
167         rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
168                             &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
169         if (rc < 0)
170                 GOTO(out_destroy, rc);
171
172         CDEBUG(D_OTHER, "new plain log "DOSTID":%x for index %u of catalog"
173                DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi),
174                loghandle->lgh_id.lgl_ogen, rec->lid_hdr.lrh_index,
175                POSTID(&cathandle->lgh_id.lgl_oi));
176
177         loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
178 out:
179         if (handle != NULL)
180                 dt_trans_stop(env, dt, handle);
181
182         RETURN(0);
183
184 out_destroy:
185         /* to signal llog_cat_close() it shouldn't try to destroy the llog,
186          * we want to destroy it in this transaction, otherwise the object
187          * becomes an orphan */
188         loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
189         /* this is to mimic full log, so another llog_cat_current_log()
190          * can skip it and ask for another onet */
191         loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) + 1;
192         llog_trans_destroy(env, loghandle, th);
193         RETURN(rc);
194 }
195
196 /* Open an existent log handle and add it to the open list.
197  * This log handle will be closed when all of the records in it are removed.
198  *
199  * Assumes caller has already pushed us into the kernel context and is locking.
200  * We return a lock on the handle to ensure nobody yanks it from us.
201  *
202  * This takes extra reference on llog_handle via llog_handle_get() and require
203  * this reference to be put by caller using llog_handle_put()
204  */
205 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
206                        struct llog_handle **res, struct llog_logid *logid)
207 {
208         struct llog_handle      *loghandle;
209         enum llog_flag           fmt;
210         int                      rc = 0;
211
212         ENTRY;
213
214         if (cathandle == NULL)
215                 RETURN(-EBADF);
216
217         fmt = cathandle->lgh_hdr->llh_flags & LLOG_F_EXT_MASK;
218         down_write(&cathandle->lgh_lock);
219         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
220                             u.phd.phd_entry) {
221                 struct llog_logid *cgl = &loghandle->lgh_id;
222
223                 if (ostid_id(&cgl->lgl_oi) == ostid_id(&logid->lgl_oi) &&
224                     ostid_seq(&cgl->lgl_oi) == ostid_seq(&logid->lgl_oi)) {
225                         if (cgl->lgl_ogen != logid->lgl_ogen) {
226                                 CERROR("%s: log "DOSTID" generation %x != %x\n",
227                                        loghandle->lgh_ctxt->loc_obd->obd_name,
228                                        POSTID(&logid->lgl_oi), cgl->lgl_ogen,
229                                        logid->lgl_ogen);
230                                 continue;
231                         }
232                         loghandle->u.phd.phd_cat_handle = cathandle;
233                         up_write(&cathandle->lgh_lock);
234                         GOTO(out, rc = 0);
235                 }
236         }
237         up_write(&cathandle->lgh_lock);
238
239         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
240                        LLOG_OPEN_EXISTS);
241         if (rc < 0) {
242                 CERROR("%s: error opening log id "DOSTID":%x: rc = %d\n",
243                        cathandle->lgh_ctxt->loc_obd->obd_name,
244                        POSTID(&logid->lgl_oi), logid->lgl_ogen, rc);
245                 RETURN(rc);
246         }
247
248         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
249         if (rc < 0) {
250                 llog_close(env, loghandle);
251                 loghandle = NULL;
252                 RETURN(rc);
253         }
254
255         down_write(&cathandle->lgh_lock);
256         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
257         up_write(&cathandle->lgh_lock);
258
259         loghandle->u.phd.phd_cat_handle = cathandle;
260         loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
261         loghandle->u.phd.phd_cookie.lgc_index =
262                                 loghandle->lgh_hdr->llh_cat_idx;
263         EXIT;
264 out:
265         llog_handle_get(loghandle);
266         *res = loghandle;
267         return 0;
268 }
269
270 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
271 {
272         struct llog_handle      *loghandle, *n;
273         int                      rc;
274
275         ENTRY;
276
277         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
278                                  u.phd.phd_entry) {
279                 struct llog_log_hdr     *llh = loghandle->lgh_hdr;
280                 int                      index;
281
282                 /* unlink open-not-created llogs */
283                 list_del_init(&loghandle->u.phd.phd_entry);
284                 llh = loghandle->lgh_hdr;
285                 if (loghandle->lgh_obj != NULL && llh != NULL &&
286                     (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
287                     (llh->llh_count == 1)) {
288                         rc = llog_destroy(env, loghandle);
289                         if (rc)
290                                 CERROR("%s: failure destroying log during "
291                                        "cleanup: rc = %d\n",
292                                        loghandle->lgh_ctxt->loc_obd->obd_name,
293                                        rc);
294
295                         index = loghandle->u.phd.phd_cookie.lgc_index;
296                         llog_cat_cleanup(env, cathandle, NULL, index);
297                 }
298                 llog_close(env, loghandle);
299         }
300         /* if handle was stored in ctxt, remove it too */
301         if (cathandle->lgh_ctxt->loc_handle == cathandle)
302                 cathandle->lgh_ctxt->loc_handle = NULL;
303         rc = llog_close(env, cathandle);
304         RETURN(rc);
305 }
306 EXPORT_SYMBOL(llog_cat_close);
307
/**
 * lockdep subclass markers for nested struct llog_handle::lgh_lock locking.
 *
 * They are passed to down_read_nested()/down_write_nested() so that taking
 * a plain llog's lgh_lock while holding the catalog's lgh_lock is annotated
 * as a nested acquisition.
 */
enum {
        LLOGH_CAT = 0,  /* lgh_lock of a catalog handle */
        LLOGH_LOG = 1   /* lgh_lock of a plain llog handle */
};
315
316 /** Return the currently active log handle.  If the current log handle doesn't
317  * have enough space left for the current record, start a new one.
318  *
319  * If reclen is 0, we only want to know what the currently active log is,
320  * otherwise we get a lock on this log so nobody can steal our space.
321  *
322  * Assumes caller has already pushed us into the kernel context and is locking.
323  *
324  * NOTE: loghandle is write-locked upon successful return
325  */
326 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
327                                                 struct thandle *th)
328 {
329         struct llog_handle *loghandle = NULL;
330         ENTRY;
331
332
333         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) {
334                 down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
335                 GOTO(next, loghandle);
336         }
337
338         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
339         loghandle = cathandle->u.chd.chd_current_log;
340         if (loghandle) {
341                 struct llog_log_hdr *llh;
342
343                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
344                 llh = loghandle->lgh_hdr;
345                 if (llh == NULL || !llog_is_full(loghandle)) {
346                         up_read(&cathandle->lgh_lock);
347                         RETURN(loghandle);
348                 } else {
349                         up_write(&loghandle->lgh_lock);
350                 }
351         }
352         up_read(&cathandle->lgh_lock);
353
354         /* time to use next log */
355
356         /* first, we have to make sure the state hasn't changed */
357         down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
358         loghandle = cathandle->u.chd.chd_current_log;
359         if (loghandle) {
360                 struct llog_log_hdr *llh;
361
362                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
363                 llh = loghandle->lgh_hdr;
364                 LASSERT(llh);
365                 if (!llog_is_full(loghandle)) {
366                         up_write(&cathandle->lgh_lock);
367                         RETURN(loghandle);
368                 } else {
369                         up_write(&loghandle->lgh_lock);
370                 }
371         }
372
373 next:
374         CDEBUG(D_INODE, "use next log\n");
375
376         loghandle = cathandle->u.chd.chd_next_log;
377         cathandle->u.chd.chd_current_log = loghandle;
378         cathandle->u.chd.chd_next_log = NULL;
379         down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
380         up_write(&cathandle->lgh_lock);
381         LASSERT(loghandle);
382         RETURN(loghandle);
383 }
384
385 /* Add a single record to the recovery log(s) using a catalog
386  * Returns as llog_write_record
387  *
388  * Assumes caller has already pushed us into the kernel context.
389  */
390 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
391                      struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
392                      struct thandle *th)
393 {
394         struct llog_handle *loghandle;
395         int rc, retried = 0;
396         ENTRY;
397
398         LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size);
399
400 retry:
401         loghandle = llog_cat_current_log(cathandle, th);
402         LASSERT(!IS_ERR(loghandle));
403
404         /* loghandle is already locked by llog_cat_current_log() for us */
405         if (!llog_exist(loghandle)) {
406                 rc = llog_cat_new_log(env, cathandle, loghandle, th);
407                 if (rc < 0) {
408                         up_write(&loghandle->lgh_lock);
409                         /* nobody should be trying to use this llog */
410                         down_write(&cathandle->lgh_lock);
411                         if (cathandle->u.chd.chd_current_log == loghandle)
412                                 cathandle->u.chd.chd_current_log = NULL;
413                         up_write(&cathandle->lgh_lock);
414                         RETURN(rc);
415                 }
416         }
417         /* now let's try to add the record */
418         rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th);
419         if (rc < 0) {
420                 CDEBUG_LIMIT(rc == -ENOSPC ? D_HA : D_ERROR,
421                              "llog_write_rec %d: lh=%p\n", rc, loghandle);
422                 /* -ENOSPC is returned if no empty records left
423                  * and when it's lack of space on the stogage.
424                  * there is no point to try again if it's the second
425                  * case. many callers (like llog test) expect ENOSPC,
426                  * so we preserve this error code, but look for the
427                  * actual cause here */
428                 if (rc == -ENOSPC && llog_is_full(loghandle))
429                         rc = -ENOBUFS;
430         }
431         up_write(&loghandle->lgh_lock);
432
433         if (rc == -ENOBUFS) {
434                 if (retried++ == 0)
435                         GOTO(retry, rc);
436                 CERROR("%s: error on 2nd llog: rc = %d\n",
437                        cathandle->lgh_ctxt->loc_obd->obd_name, rc);
438         }
439
440         RETURN(rc);
441 }
442 EXPORT_SYMBOL(llog_cat_add_rec);
443
444 int llog_cat_declare_add_rec(const struct lu_env *env,
445                              struct llog_handle *cathandle,
446                              struct llog_rec_hdr *rec, struct thandle *th)
447 {
448         struct llog_thread_info *lgi = llog_info(env);
449         struct llog_logid_rec   *lirec = &lgi->lgi_logid;
450         struct llog_handle      *loghandle, *next;
451         int                      rc = 0;
452
453         ENTRY;
454
455         if (cathandle->u.chd.chd_current_log == NULL) {
456                 /* declare new plain llog */
457                 down_write(&cathandle->lgh_lock);
458                 if (cathandle->u.chd.chd_current_log == NULL) {
459                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
460                                        NULL, NULL, LLOG_OPEN_NEW);
461                         if (rc == 0) {
462                                 cathandle->u.chd.chd_current_log = loghandle;
463                                 list_add_tail(&loghandle->u.phd.phd_entry,
464                                               &cathandle->u.chd.chd_head);
465                         }
466                 }
467                 up_write(&cathandle->lgh_lock);
468         } else if (cathandle->u.chd.chd_next_log == NULL) {
469                 /* declare next plain llog */
470                 down_write(&cathandle->lgh_lock);
471                 if (cathandle->u.chd.chd_next_log == NULL) {
472                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
473                                        NULL, NULL, LLOG_OPEN_NEW);
474                         if (rc == 0) {
475                                 cathandle->u.chd.chd_next_log = loghandle;
476                                 list_add_tail(&loghandle->u.phd.phd_entry,
477                                               &cathandle->u.chd.chd_head);
478                         }
479                 }
480                 up_write(&cathandle->lgh_lock);
481         }
482         if (rc)
483                 GOTO(out, rc);
484
485         lirec->lid_hdr.lrh_len = sizeof(*lirec);
486
487         if (!llog_exist(cathandle->u.chd.chd_current_log)) {
488                 if (dt_object_remote(cathandle->lgh_obj)) {
489                         /* If it is remote cat-llog here, let's create the
490                          * remote llog object synchronously, so other threads
491                          * can use it correctly. */
492                         rc = llog_cat_new_log(env, cathandle,
493                                         cathandle->u.chd.chd_current_log, NULL);
494                 } else {
495                         rc = llog_declare_create(env,
496                                         cathandle->u.chd.chd_current_log, th);
497                         if (rc)
498                                 GOTO(out, rc);
499                         llog_declare_write_rec(env, cathandle,
500                                                &lirec->lid_hdr, -1, th);
501                 }
502         }
503         /* declare records in the llogs */
504         rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
505                                     rec, -1, th);
506         if (rc)
507                 GOTO(out, rc);
508
509         next = cathandle->u.chd.chd_next_log;
510         if (next) {
511                 if (!llog_exist(next)) {
512                         if (dt_object_remote(cathandle->lgh_obj)) {
513                                 /* If it is remote cat-llog here, let's create
514                                  * the remote remote llog object synchronously,
515                                  * so other threads can use it correctly. */
516                                 rc = llog_cat_new_log(env, cathandle, next,
517                                                       NULL);
518                         } else {
519                                 rc = llog_declare_create(env, next, th);
520                                 llog_declare_write_rec(env, cathandle,
521                                                 &lirec->lid_hdr, -1, th);
522                         }
523                 }
524                 /* XXX: we hope for declarations made for existing llog
525                  *      this might be not correct with some backends
526                  *      where declarations are expected against specific
527                  *      object like ZFS with full debugging enabled */
528                 /*llog_declare_write_rec(env, next, rec, -1, th);*/
529         }
530 out:
531         RETURN(rc);
532 }
533 EXPORT_SYMBOL(llog_cat_declare_add_rec);
534
535 int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
536                  struct llog_rec_hdr *rec, struct llog_cookie *reccookie)
537 {
538         struct llog_ctxt        *ctxt;
539         struct dt_device        *dt;
540         struct thandle          *th = NULL;
541         int                      rc;
542
543         ctxt = cathandle->lgh_ctxt;
544         LASSERT(ctxt);
545         LASSERT(ctxt->loc_exp);
546
547         LASSERT(cathandle->lgh_obj != NULL);
548         dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
549
550         th = dt_trans_create(env, dt);
551         if (IS_ERR(th))
552                 RETURN(PTR_ERR(th));
553
554         rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
555         if (rc)
556                 GOTO(out_trans, rc);
557
558         rc = dt_trans_start_local(env, dt, th);
559         if (rc)
560                 GOTO(out_trans, rc);
561         rc = llog_cat_add_rec(env, cathandle, rec, reccookie, th);
562 out_trans:
563         dt_trans_stop(env, dt, th);
564         RETURN(rc);
565 }
566 EXPORT_SYMBOL(llog_cat_add);
567
568 /* For each cookie in the cookie array, we clear the log in-use bit and either:
569  * - the log is empty, so mark it free in the catalog header and delete it
570  * - the log is not empty, just write out the log header
571  *
572  * The cookies may be in different log files, so we need to get new logs
573  * each time.
574  *
575  * Assumes caller has already pushed us into the kernel context.
576  */
577 int llog_cat_cancel_records(const struct lu_env *env,
578                             struct llog_handle *cathandle, int count,
579                             struct llog_cookie *cookies)
580 {
581         int i, index, rc = 0, failed = 0;
582
583         ENTRY;
584
585         for (i = 0; i < count; i++, cookies++) {
586                 struct llog_handle      *loghandle;
587                 struct llog_logid       *lgl = &cookies->lgc_lgl;
588                 int                      lrc;
589
590                 rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
591                 if (rc) {
592                         CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
593                                cathandle->lgh_ctxt->loc_obd->obd_name,
594                                POSTID(&lgl->lgl_oi), rc);
595                         failed++;
596                         continue;
597                 }
598
599                 lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
600                 if (lrc == LLOG_DEL_PLAIN) { /* log has been destroyed */
601                         index = loghandle->u.phd.phd_cookie.lgc_index;
602                         rc = llog_cat_cleanup(env, cathandle, loghandle,
603                                               index);
604                 } else if (lrc == -ENOENT) {
605                         if (rc == 0) /* ENOENT shouldn't rewrite any error */
606                                 rc = lrc;
607                 } else if (lrc < 0) {
608                         failed++;
609                         rc = lrc;
610                 }
611                 llog_handle_put(loghandle);
612         }
613         if (rc)
614                 CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
615                        cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
616                        rc);
617
618         RETURN(rc);
619 }
620 EXPORT_SYMBOL(llog_cat_cancel_records);
621
622 static int llog_cat_process_cb(const struct lu_env *env,
623                                struct llog_handle *cat_llh,
624                                struct llog_rec_hdr *rec, void *data)
625 {
626         struct llog_process_data *d = data;
627         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
628         struct llog_handle *llh;
629         struct llog_log_hdr *hdr;
630         int rc;
631
632         ENTRY;
633         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
634                 CERROR("invalid record in catalog\n");
635                 RETURN(-EINVAL);
636         }
637         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
638                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
639                rec->lrh_index, POSTID(&cat_llh->lgh_id.lgl_oi));
640
641         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
642         if (rc) {
643                 CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
644                        cat_llh->lgh_ctxt->loc_obd->obd_name,
645                        POSTID(&lir->lid_id.lgl_oi), rc);
646                 if (rc == -ENOENT || rc == -ESTALE) {
647                         /* After a server crash, a stub of index
648                          * record in catlog could be kept, because
649                          * plain log destroy + catlog index record
650                          * deletion are not atomic. So we end up with
651                          * an index but no actual record. Destroy the
652                          * index and move on. */
653                         rc = llog_cat_cleanup(env, cat_llh, NULL,
654                                               rec->lrh_index);
655                 }
656
657                 RETURN(rc);
658         }
659
660         /* clean old empty llogs, do not consider current llog in use */
661         /* ignore remote (lgh_obj=NULL) llogs */
662         hdr = llh->lgh_hdr;
663         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
664             hdr->llh_count == 1 && cat_llh->lgh_obj != NULL &&
665             llh != cat_llh->u.chd.chd_current_log) {
666                 rc = llog_destroy(env, llh);
667                 if (rc)
668                         CERROR("%s: fail to destroy empty log: rc = %d\n",
669                                llh->lgh_ctxt->loc_obd->obd_name, rc);
670                 GOTO(out, rc = LLOG_DEL_PLAIN);
671         }
672
673         if (rec->lrh_index < d->lpd_startcat) {
674                 /* Skip processing of the logs until startcat */
675                 rc = 0;
676         } else if (d->lpd_startidx > 0) {
677                 struct llog_process_cat_data cd;
678
679                 cd.lpcd_first_idx = d->lpd_startidx;
680                 cd.lpcd_last_idx = 0;
681                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
682                                           &cd, false);
683                 /* Continue processing the next log from idx 0 */
684                 d->lpd_startidx = 0;
685         } else {
686                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
687                                           NULL, false);
688         }
689
690 out:
691         /* The empty plain log was destroyed while processing */
692         if (rc == LLOG_DEL_PLAIN)
693                 rc = llog_cat_cleanup(env, cat_llh, llh,
694                                       llh->u.phd.phd_cookie.lgc_index);
695         llog_handle_put(llh);
696
697         RETURN(rc);
698 }
699
700 int llog_cat_process_or_fork(const struct lu_env *env,
701                              struct llog_handle *cat_llh,
702                              llog_cb_t cb, void *data, int startcat,
703                              int startidx, bool fork)
704 {
705         struct llog_process_data d;
706         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
707         int rc;
708         ENTRY;
709
710         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
711         d.lpd_data = data;
712         d.lpd_cb = cb;
713         d.lpd_startcat = startcat;
714         d.lpd_startidx = startidx;
715
716         if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
717             llh->llh_count > 1) {
718                 struct llog_process_cat_data cd;
719
720                 CWARN("catlog "DOSTID" crosses index zero\n",
721                       POSTID(&cat_llh->lgh_id.lgl_oi));
722
723                 cd.lpcd_first_idx = llh->llh_cat_idx;
724                 cd.lpcd_last_idx = 0;
725                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
726                                           &d, &cd, fork);
727                 if (rc != 0)
728                         RETURN(rc);
729
730                 cd.lpcd_first_idx = 0;
731                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
732                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
733                                           &d, &cd, fork);
734         } else {
735                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
736                                           &d, NULL, fork);
737         }
738
739         RETURN(rc);
740 }
741
742 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
743                      llog_cb_t cb, void *data, int startcat, int startidx)
744 {
745         return llog_cat_process_or_fork(env, cat_llh, cb, data, startcat,
746                                         startidx, false);
747 }
748 EXPORT_SYMBOL(llog_cat_process);
749
750 static int llog_cat_reverse_process_cb(const struct lu_env *env,
751                                        struct llog_handle *cat_llh,
752                                        struct llog_rec_hdr *rec, void *data)
753 {
754         struct llog_process_data *d = data;
755         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
756         struct llog_handle *llh;
757         struct llog_log_hdr *hdr;
758         int rc;
759
760         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
761                 CERROR("invalid record in catalog\n");
762                 RETURN(-EINVAL);
763         }
764         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
765                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
766                le32_to_cpu(rec->lrh_index), POSTID(&cat_llh->lgh_id.lgl_oi));
767
768         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
769         if (rc) {
770                 CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
771                        cat_llh->lgh_ctxt->loc_obd->obd_name,
772                        POSTID(&lir->lid_id.lgl_oi), rc);
773                 if (rc == -ENOENT || rc == -ESTALE) {
774                         /* After a server crash, a stub of index
775                          * record in catlog could be kept, because
776                          * plain log destroy + catlog index record
777                          * deletion are not atomic. So we end up with
778                          * an index but no actual record. Destroy the
779                          * index and move on. */
780                         rc = llog_cat_cleanup(env, cat_llh, NULL,
781                                               rec->lrh_index);
782                 }
783
784                 RETURN(rc);
785         }
786
787         /* clean old empty llogs, do not consider current llog in use */
788         hdr = llh->lgh_hdr;
789         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
790             hdr->llh_count == 1 &&
791             llh != cat_llh->u.chd.chd_current_log) {
792                 rc = llog_destroy(env, llh);
793                 if (rc)
794                         CERROR("%s: fail to destroy empty log: rc = %d\n",
795                                llh->lgh_ctxt->loc_obd->obd_name, rc);
796                 GOTO(out, rc = LLOG_DEL_PLAIN);
797         }
798
799         rc = llog_reverse_process(env, llh, d->lpd_cb, d->lpd_data, NULL);
800
801 out:
802         /* The empty plain was destroyed while processing */
803         if (rc == LLOG_DEL_PLAIN)
804                 rc = llog_cat_cleanup(env, cat_llh, llh,
805                                       llh->u.phd.phd_cookie.lgc_index);
806
807         llog_handle_put(llh);
808         RETURN(rc);
809 }
810
811 int llog_cat_reverse_process(const struct lu_env *env,
812                              struct llog_handle *cat_llh,
813                              llog_cb_t cb, void *data)
814 {
815         struct llog_process_data d;
816         struct llog_process_cat_data cd;
817         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
818         int rc;
819         ENTRY;
820
821         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
822         d.lpd_data = data;
823         d.lpd_cb = cb;
824
825         if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
826             llh->llh_count > 1) {
827                 CWARN("catalog "DOSTID" crosses index zero\n",
828                       POSTID(&cat_llh->lgh_id.lgl_oi));
829
830                 cd.lpcd_first_idx = 0;
831                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
832                 rc = llog_reverse_process(env, cat_llh,
833                                           llog_cat_reverse_process_cb,
834                                           &d, &cd);
835                 if (rc != 0)
836                         RETURN(rc);
837
838                 cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
839                 cd.lpcd_last_idx = 0;
840                 rc = llog_reverse_process(env, cat_llh,
841                                           llog_cat_reverse_process_cb,
842                                           &d, &cd);
843         } else {
844                 rc = llog_reverse_process(env, cat_llh,
845                                           llog_cat_reverse_process_cb,
846                                           &d, NULL);
847         }
848
849         RETURN(rc);
850 }
851 EXPORT_SYMBOL(llog_cat_reverse_process);
852
853 static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
854 {
855         struct llog_log_hdr *llh = cathandle->lgh_hdr;
856         int bitmap_size;
857
858         ENTRY;
859
860         bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
861         /*
862          * The llh_cat_idx equals to the first used index minus 1
863          * so if we canceled the first index then llh_cat_idx
864          * must be renewed.
865          */
866         if (llh->llh_cat_idx == (idx - 1)) {
867                 llh->llh_cat_idx = idx;
868
869                 while (idx != cathandle->lgh_last_idx) {
870                         idx = (idx + 1) % bitmap_size;
871                         if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
872                                 /* update llh_cat_idx for each unset bit,
873                                  * expecting the next one is set */
874                                 llh->llh_cat_idx = idx;
875                         } else if (idx == 0) {
876                                 /* skip header bit */
877                                 llh->llh_cat_idx = 0;
878                                 continue;
879                         } else {
880                                 /* the first index is found */
881                                 break;
882                         }
883                 }
884
885                 CDEBUG(D_RPCTRACE, "Set catlog "DOSTID" first idx %u,"
886                        " (last_idx %u)\n", POSTID(&cathandle->lgh_id.lgl_oi),
887                        llh->llh_cat_idx, cathandle->lgh_last_idx);
888         }
889
890         RETURN(0);
891 }
892
893 /* Cleanup deleted plain llog traces from catalog */
894 int llog_cat_cleanup(const struct lu_env *env, struct llog_handle *cathandle,
895                      struct llog_handle *loghandle, int index)
896 {
897         int rc;
898
899         LASSERT(index);
900         if (loghandle != NULL) {
901                 /* remove destroyed llog from catalog list and
902                  * chd_current_log variable */
903                 down_write(&cathandle->lgh_lock);
904                 if (cathandle->u.chd.chd_current_log == loghandle)
905                         cathandle->u.chd.chd_current_log = NULL;
906                 list_del_init(&loghandle->u.phd.phd_entry);
907                 up_write(&cathandle->lgh_lock);
908                 LASSERT(index == loghandle->u.phd.phd_cookie.lgc_index);
909                 /* llog was opened and keep in a list, close it now */
910                 llog_close(env, loghandle);
911         }
912
913         /* do not attempt to cleanup on-disk llog if on client side */
914         if (cathandle->lgh_obj == NULL)
915                 return 0;
916
917         /* remove plain llog entry from catalog by index */
918         llog_cat_set_first_idx(cathandle, index);
919         rc = llog_cancel_rec(env, cathandle, index);
920         if (rc == 0)
921                 CDEBUG(D_HA, "cancel plain log at index"
922                        " %u of catalog "DOSTID"\n",
923                        index, POSTID(&cathandle->lgh_id.lgl_oi));
924         return rc;
925 }
926
927 /* helper to initialize catalog llog and process it to cancel */
928 int llog_cat_init_and_process(const struct lu_env *env,
929                               struct llog_handle *llh)
930 {
931         int rc;
932
933         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, NULL);
934         if (rc)
935                 RETURN(rc);
936
937         RETURN(0);
938 }
939 EXPORT_SYMBOL(llog_cat_init_and_process);