Whamcloud - gitweb
LU-6846 llog: create remote llog synchronously
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog_cat.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  *
40  * Invariants in implementation:
41  * - we do not share logs among different OST<->MDS connections, so that
42  *   if an OST or MDS fails it need only look at log(s) relevant to itself
43  *
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
46  * Author: Mikhail Pershin <mike.pershin@intel.com>
47  */
48
49 #define DEBUG_SUBSYSTEM S_LOG
50
51
52 #include <obd_class.h>
53
54 #include "llog_internal.h"
55
56 /* Create a new log handle and add it to the open list.
57  * This log handle will be closed when all of the records in it are removed.
58  *
59  * Assumes caller has already pushed us into the kernel context and is locking.
60  */
61 static int llog_cat_new_log(const struct lu_env *env,
62                             struct llog_handle *cathandle,
63                             struct llog_handle *loghandle,
64                             struct thandle *th)
65 {
66         struct llog_thread_info *lgi = llog_info(env);
67         struct llog_logid_rec   *rec = &lgi->lgi_logid;
68         int                      rc;
69         struct thandle *handle = NULL;
70         struct dt_device *dt = NULL;
71
72         ENTRY;
73
74         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
75                 RETURN(-ENOSPC);
76
77         if (loghandle->lgh_hdr != NULL) {
78                 /* If llog object is remote and creation is failed, lgh_hdr
79                  * might be left over here, free it first */
80                 LASSERT(!llog_exist(loghandle));
81                 OBD_FREE_PTR(loghandle->lgh_hdr);
82                 loghandle->lgh_hdr = NULL;
83         }
84
85         if (th == NULL) {
86                 dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
87
88                 handle = dt_trans_create(env, dt);
89                 if (IS_ERR(handle))
90                         RETURN(PTR_ERR(handle));
91
92                 /* Create update llog object synchronously, which
93                  * happens during inialization process see
94                  * lod_sub_prep_llog(), to make sure the update
95                  * llog object is created before corss-MDT writing
96                  * updates into the llog object */
97                 if (cathandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
98                         handle->th_sync = 1;
99
100                 handle->th_wait_submit = 1;
101
102                 rc = llog_declare_create(env, loghandle, handle);
103                 if (rc != 0)
104                         GOTO(out, rc);
105
106                 rec->lid_hdr.lrh_len = sizeof(*rec);
107                 rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
108                 rec->lid_id = loghandle->lgh_id;
109                 rc = llog_declare_write_rec(env, cathandle, &rec->lid_hdr, -1,
110                                             handle);
111                 if (rc != 0)
112                         GOTO(out, rc);
113
114                 rc = dt_trans_start_local(env, dt, handle);
115                 if (rc != 0)
116                         GOTO(out, rc);
117
118                 th = handle;
119         }
120
121         rc = llog_create(env, loghandle, th);
122         /* if llog is already created, no need to initialize it */
123         if (rc == -EEXIST) {
124                 GOTO(out, rc = 0);
125         } else if (rc != 0) {
126                 CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
127                        loghandle->lgh_ctxt->loc_obd->obd_name, rc);
128                 GOTO(out, rc);
129         }
130
131         rc = llog_init_handle(env, loghandle,
132                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
133                               &cathandle->lgh_hdr->llh_tgtuuid);
134         if (rc < 0)
135                 GOTO(out, rc);
136
137         /* build the record for this log in the catalog */
138         rec->lid_hdr.lrh_len = sizeof(*rec);
139         rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
140         rec->lid_id = loghandle->lgh_id;
141
142         /* append the new record into catalog. The new index will be
143          * assigned to the record and updated in rec header */
144         rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
145                             &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
146         if (rc < 0)
147                 GOTO(out, rc);
148
149         CDEBUG(D_OTHER, "new recovery log "DOSTID":%x for index %u of catalog"
150                DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi),
151                loghandle->lgh_id.lgl_ogen, rec->lid_hdr.lrh_index,
152                POSTID(&cathandle->lgh_id.lgl_oi));
153
154         loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
155 out:
156         if (handle != NULL)
157                 dt_trans_stop(env, dt, handle);
158
159         RETURN(0);
160 }
161
162 /* Open an existent log handle and add it to the open list.
163  * This log handle will be closed when all of the records in it are removed.
164  *
165  * Assumes caller has already pushed us into the kernel context and is locking.
166  * We return a lock on the handle to ensure nobody yanks it from us.
167  *
168  * This takes extra reference on llog_handle via llog_handle_get() and require
169  * this reference to be put by caller using llog_handle_put()
170  */
171 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
172                        struct llog_handle **res, struct llog_logid *logid)
173 {
174         struct llog_handle      *loghandle;
175         enum llog_flag           fmt;
176         int                      rc = 0;
177
178         ENTRY;
179
180         if (cathandle == NULL)
181                 RETURN(-EBADF);
182
183         fmt = cathandle->lgh_hdr->llh_flags & LLOG_F_EXT_MASK;
184         down_write(&cathandle->lgh_lock);
185         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
186                             u.phd.phd_entry) {
187                 struct llog_logid *cgl = &loghandle->lgh_id;
188
189                 if (ostid_id(&cgl->lgl_oi) == ostid_id(&logid->lgl_oi) &&
190                     ostid_seq(&cgl->lgl_oi) == ostid_seq(&logid->lgl_oi)) {
191                         if (cgl->lgl_ogen != logid->lgl_ogen) {
192                                 CERROR("%s: log "DOSTID" generation %x != %x\n",
193                                        loghandle->lgh_ctxt->loc_obd->obd_name,
194                                        POSTID(&logid->lgl_oi), cgl->lgl_ogen,
195                                        logid->lgl_ogen);
196                                 continue;
197                         }
198                         loghandle->u.phd.phd_cat_handle = cathandle;
199                         up_write(&cathandle->lgh_lock);
200                         GOTO(out, rc = 0);
201                 }
202         }
203         up_write(&cathandle->lgh_lock);
204
205         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
206                        LLOG_OPEN_EXISTS);
207         if (rc < 0) {
208                 CERROR("%s: error opening log id "DOSTID":%x: rc = %d\n",
209                        cathandle->lgh_ctxt->loc_obd->obd_name,
210                        POSTID(&logid->lgl_oi), logid->lgl_ogen, rc);
211                 RETURN(rc);
212         }
213
214         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
215         if (rc < 0) {
216                 llog_close(env, loghandle);
217                 loghandle = NULL;
218                 RETURN(rc);
219         }
220
221         down_write(&cathandle->lgh_lock);
222         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
223         up_write(&cathandle->lgh_lock);
224
225         loghandle->u.phd.phd_cat_handle = cathandle;
226         loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
227         loghandle->u.phd.phd_cookie.lgc_index =
228                                 loghandle->lgh_hdr->llh_cat_idx;
229         EXIT;
230 out:
231         llog_handle_get(loghandle);
232         *res = loghandle;
233         return 0;
234 }
235
236 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
237 {
238         struct llog_handle      *loghandle, *n;
239         int                      rc;
240
241         ENTRY;
242
243         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
244                                  u.phd.phd_entry) {
245                 struct llog_log_hdr     *llh = loghandle->lgh_hdr;
246                 int                      index;
247
248                 /* unlink open-not-created llogs */
249                 list_del_init(&loghandle->u.phd.phd_entry);
250                 llh = loghandle->lgh_hdr;
251                 if (loghandle->lgh_obj != NULL && llh != NULL &&
252                     (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
253                     (llh->llh_count == 1)) {
254                         rc = llog_destroy(env, loghandle);
255                         if (rc)
256                                 CERROR("%s: failure destroying log during "
257                                        "cleanup: rc = %d\n",
258                                        loghandle->lgh_ctxt->loc_obd->obd_name,
259                                        rc);
260
261                         index = loghandle->u.phd.phd_cookie.lgc_index;
262                         llog_cat_cleanup(env, cathandle, NULL, index);
263                 }
264                 llog_close(env, loghandle);
265         }
266         /* if handle was stored in ctxt, remove it too */
267         if (cathandle->lgh_ctxt->loc_handle == cathandle)
268                 cathandle->lgh_ctxt->loc_handle = NULL;
269         rc = llog_close(env, cathandle);
270         RETURN(rc);
271 }
272 EXPORT_SYMBOL(llog_cat_close);
273
/**
 * lockdep subclass markers for nested struct llog_handle::lgh_lock locking:
 * LLOGH_CAT is passed when locking a catalog handle, LLOGH_LOG when locking
 * a plain llog handle.
 */
enum {
        LLOGH_CAT = 0,
        LLOGH_LOG = 1
};
281
282 /** Return the currently active log handle.  If the current log handle doesn't
283  * have enough space left for the current record, start a new one.
284  *
285  * If reclen is 0, we only want to know what the currently active log is,
286  * otherwise we get a lock on this log so nobody can steal our space.
287  *
288  * Assumes caller has already pushed us into the kernel context and is locking.
289  *
290  * NOTE: loghandle is write-locked upon successful return
291  */
292 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
293                                                 struct thandle *th)
294 {
295         struct llog_handle *loghandle = NULL;
296         ENTRY;
297
298         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
299         loghandle = cathandle->u.chd.chd_current_log;
300         if (loghandle) {
301                 struct llog_log_hdr *llh;
302
303                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
304                 llh = loghandle->lgh_hdr;
305                 if (llh == NULL ||
306                     loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) {
307                         up_read(&cathandle->lgh_lock);
308                         RETURN(loghandle);
309                 } else {
310                         up_write(&loghandle->lgh_lock);
311                 }
312         }
313         up_read(&cathandle->lgh_lock);
314
315         /* time to use next log */
316
317         /* first, we have to make sure the state hasn't changed */
318         down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
319         loghandle = cathandle->u.chd.chd_current_log;
320         if (loghandle) {
321                 struct llog_log_hdr *llh;
322
323                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
324                 llh = loghandle->lgh_hdr;
325                 LASSERT(llh);
326                 if (loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) {
327                         up_write(&cathandle->lgh_lock);
328                         RETURN(loghandle);
329                 } else {
330                         up_write(&loghandle->lgh_lock);
331                 }
332         }
333
334         CDEBUG(D_INODE, "use next log\n");
335
336         loghandle = cathandle->u.chd.chd_next_log;
337         cathandle->u.chd.chd_current_log = loghandle;
338         cathandle->u.chd.chd_next_log = NULL;
339         down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
340         up_write(&cathandle->lgh_lock);
341         LASSERT(loghandle);
342         RETURN(loghandle);
343 }
344
345 /* Add a single record to the recovery log(s) using a catalog
346  * Returns as llog_write_record
347  *
348  * Assumes caller has already pushed us into the kernel context.
349  */
350 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
351                      struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
352                      struct thandle *th)
353 {
354         struct llog_handle *loghandle;
355         int rc;
356         ENTRY;
357
358         LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size);
359         loghandle = llog_cat_current_log(cathandle, th);
360         LASSERT(!IS_ERR(loghandle));
361
362         /* loghandle is already locked by llog_cat_current_log() for us */
363         if (!llog_exist(loghandle)) {
364                 rc = llog_cat_new_log(env, cathandle, loghandle, th);
365                 if (rc < 0) {
366                         up_write(&loghandle->lgh_lock);
367                         RETURN(rc);
368                 }
369         }
370         /* now let's try to add the record */
371         rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th);
372         if (rc < 0)
373                 CDEBUG_LIMIT(rc == -ENOSPC ? D_HA : D_ERROR,
374                              "llog_write_rec %d: lh=%p\n", rc, loghandle);
375         up_write(&loghandle->lgh_lock);
376         if (rc == -ENOSPC) {
377                 /* try to use next log */
378                 loghandle = llog_cat_current_log(cathandle, th);
379                 LASSERT(!IS_ERR(loghandle));
380                 /* new llog can be created concurrently */
381                 if (!llog_exist(loghandle)) {
382                         rc = llog_cat_new_log(env, cathandle, loghandle, th);
383                         if (rc < 0) {
384                                 up_write(&loghandle->lgh_lock);
385                                 RETURN(rc);
386                         }
387                 }
388                 /* now let's try to add the record */
389                 rc = llog_write_rec(env, loghandle, rec, reccookie,
390                                     LLOG_NEXT_IDX, th);
391                 if (rc < 0)
392                         CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle);
393                 up_write(&loghandle->lgh_lock);
394         }
395
396         RETURN(rc);
397 }
398 EXPORT_SYMBOL(llog_cat_add_rec);
399
400 int llog_cat_declare_add_rec(const struct lu_env *env,
401                              struct llog_handle *cathandle,
402                              struct llog_rec_hdr *rec, struct thandle *th)
403 {
404         struct llog_thread_info *lgi = llog_info(env);
405         struct llog_logid_rec   *lirec = &lgi->lgi_logid;
406         struct llog_handle      *loghandle, *next;
407         int                      rc = 0;
408
409         ENTRY;
410
411         if (cathandle->u.chd.chd_current_log == NULL) {
412                 /* declare new plain llog */
413                 down_write(&cathandle->lgh_lock);
414                 if (cathandle->u.chd.chd_current_log == NULL) {
415                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
416                                        NULL, NULL, LLOG_OPEN_NEW);
417                         if (rc == 0) {
418                                 cathandle->u.chd.chd_current_log = loghandle;
419                                 list_add_tail(&loghandle->u.phd.phd_entry,
420                                               &cathandle->u.chd.chd_head);
421                         }
422                 }
423                 up_write(&cathandle->lgh_lock);
424         } else if (cathandle->u.chd.chd_next_log == NULL) {
425                 /* declare next plain llog */
426                 down_write(&cathandle->lgh_lock);
427                 if (cathandle->u.chd.chd_next_log == NULL) {
428                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
429                                        NULL, NULL, LLOG_OPEN_NEW);
430                         if (rc == 0) {
431                                 cathandle->u.chd.chd_next_log = loghandle;
432                                 list_add_tail(&loghandle->u.phd.phd_entry,
433                                               &cathandle->u.chd.chd_head);
434                         }
435                 }
436                 up_write(&cathandle->lgh_lock);
437         }
438         if (rc)
439                 GOTO(out, rc);
440
441         lirec->lid_hdr.lrh_len = sizeof(*lirec);
442
443         if (!llog_exist(cathandle->u.chd.chd_current_log)) {
444                 if (dt_object_remote(cathandle->lgh_obj)) {
445                         /* If it is remote cat-llog here, let's create the
446                          * remote llog object synchronously, so other threads
447                          * can use it correctly. */
448                         rc = llog_cat_new_log(env, cathandle,
449                                         cathandle->u.chd.chd_current_log, NULL);
450                 } else {
451                         rc = llog_declare_create(env,
452                                         cathandle->u.chd.chd_current_log, th);
453                         if (rc)
454                                 GOTO(out, rc);
455                         llog_declare_write_rec(env, cathandle,
456                                                &lirec->lid_hdr, -1, th);
457                 }
458         }
459         /* declare records in the llogs */
460         rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
461                                     rec, -1, th);
462         if (rc)
463                 GOTO(out, rc);
464
465         next = cathandle->u.chd.chd_next_log;
466         if (next) {
467                 if (!llog_exist(next)) {
468                         if (dt_object_remote(cathandle->lgh_obj)) {
469                                 /* If it is remote cat-llog here, let's create
470                                  * the remote remote llog object synchronously,
471                                  * so other threads can use it correctly. */
472                                 rc = llog_cat_new_log(env, cathandle, next,
473                                                       NULL);
474                         } else {
475                                 rc = llog_declare_create(env, next, th);
476                                 llog_declare_write_rec(env, cathandle,
477                                                 &lirec->lid_hdr, -1, th);
478                         }
479                 }
480                 /* XXX: we hope for declarations made for existing llog
481                  *      this might be not correct with some backends
482                  *      where declarations are expected against specific
483                  *      object like ZFS with full debugging enabled */
484                 /*llog_declare_write_rec(env, next, rec, -1, th);*/
485         }
486 out:
487         RETURN(rc);
488 }
489 EXPORT_SYMBOL(llog_cat_declare_add_rec);
490
491 int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
492                  struct llog_rec_hdr *rec, struct llog_cookie *reccookie)
493 {
494         struct llog_ctxt        *ctxt;
495         struct dt_device        *dt;
496         struct thandle          *th = NULL;
497         int                      rc;
498
499         ctxt = cathandle->lgh_ctxt;
500         LASSERT(ctxt);
501         LASSERT(ctxt->loc_exp);
502
503         LASSERT(cathandle->lgh_obj != NULL);
504         dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
505
506         th = dt_trans_create(env, dt);
507         if (IS_ERR(th))
508                 RETURN(PTR_ERR(th));
509
510         rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
511         if (rc)
512                 GOTO(out_trans, rc);
513
514         rc = dt_trans_start_local(env, dt, th);
515         if (rc)
516                 GOTO(out_trans, rc);
517         rc = llog_cat_add_rec(env, cathandle, rec, reccookie, th);
518 out_trans:
519         dt_trans_stop(env, dt, th);
520         RETURN(rc);
521 }
522 EXPORT_SYMBOL(llog_cat_add);
523
524 /* For each cookie in the cookie array, we clear the log in-use bit and either:
525  * - the log is empty, so mark it free in the catalog header and delete it
526  * - the log is not empty, just write out the log header
527  *
528  * The cookies may be in different log files, so we need to get new logs
529  * each time.
530  *
531  * Assumes caller has already pushed us into the kernel context.
532  */
533 int llog_cat_cancel_records(const struct lu_env *env,
534                             struct llog_handle *cathandle, int count,
535                             struct llog_cookie *cookies)
536 {
537         int i, index, rc = 0, failed = 0;
538
539         ENTRY;
540
541         for (i = 0; i < count; i++, cookies++) {
542                 struct llog_handle      *loghandle;
543                 struct llog_logid       *lgl = &cookies->lgc_lgl;
544                 int                      lrc;
545
546                 rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
547                 if (rc) {
548                         CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
549                                cathandle->lgh_ctxt->loc_obd->obd_name,
550                                POSTID(&lgl->lgl_oi), rc);
551                         failed++;
552                         continue;
553                 }
554
555                 lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
556                 if (lrc == LLOG_DEL_PLAIN) { /* log has been destroyed */
557                         index = loghandle->u.phd.phd_cookie.lgc_index;
558                         rc = llog_cat_cleanup(env, cathandle, loghandle,
559                                               index);
560                 } else if (lrc == -ENOENT) {
561                         if (rc == 0) /* ENOENT shouldn't rewrite any error */
562                                 rc = lrc;
563                 } else if (lrc < 0) {
564                         failed++;
565                         rc = lrc;
566                 }
567                 llog_handle_put(loghandle);
568         }
569         if (rc)
570                 CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
571                        cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
572                        rc);
573
574         RETURN(rc);
575 }
576 EXPORT_SYMBOL(llog_cat_cancel_records);
577
578 static int llog_cat_process_cb(const struct lu_env *env,
579                                struct llog_handle *cat_llh,
580                                struct llog_rec_hdr *rec, void *data)
581 {
582         struct llog_process_data *d = data;
583         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
584         struct llog_handle *llh;
585         struct llog_log_hdr *hdr;
586         int rc;
587
588         ENTRY;
589         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
590                 CERROR("invalid record in catalog\n");
591                 RETURN(-EINVAL);
592         }
593         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
594                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
595                rec->lrh_index, POSTID(&cat_llh->lgh_id.lgl_oi));
596
597         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
598         if (rc) {
599                 CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
600                        cat_llh->lgh_ctxt->loc_obd->obd_name,
601                        POSTID(&lir->lid_id.lgl_oi), rc);
602                 if (rc == -ENOENT || rc == -ESTALE) {
603                         /* After a server crash, a stub of index
604                          * record in catlog could be kept, because
605                          * plain log destroy + catlog index record
606                          * deletion are not atomic. So we end up with
607                          * an index but no actual record. Destroy the
608                          * index and move on. */
609                         rc = llog_cat_cleanup(env, cat_llh, NULL,
610                                               rec->lrh_index);
611                 }
612
613                 RETURN(rc);
614         }
615
616         /* clean old empty llogs, do not consider current llog in use */
617         /* ignore remote (lgh_obj=NULL) llogs */
618         hdr = llh->lgh_hdr;
619         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
620             hdr->llh_count == 1 && cat_llh->lgh_obj != NULL &&
621             llh != cat_llh->u.chd.chd_current_log) {
622                 rc = llog_destroy(env, llh);
623                 if (rc)
624                         CERROR("%s: fail to destroy empty log: rc = %d\n",
625                                llh->lgh_ctxt->loc_obd->obd_name, rc);
626                 GOTO(out, rc = LLOG_DEL_PLAIN);
627         }
628
629         if (rec->lrh_index < d->lpd_startcat) {
630                 /* Skip processing of the logs until startcat */
631                 rc = 0;
632         } else if (d->lpd_startidx > 0) {
633                 struct llog_process_cat_data cd;
634
635                 cd.lpcd_first_idx = d->lpd_startidx;
636                 cd.lpcd_last_idx = 0;
637                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
638                                           &cd, false);
639                 /* Continue processing the next log from idx 0 */
640                 d->lpd_startidx = 0;
641         } else {
642                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
643                                           NULL, false);
644         }
645
646 out:
647         /* The empty plain log was destroyed while processing */
648         if (rc == LLOG_DEL_PLAIN)
649                 rc = llog_cat_cleanup(env, cat_llh, llh,
650                                       llh->u.phd.phd_cookie.lgc_index);
651         llog_handle_put(llh);
652
653         RETURN(rc);
654 }
655
656 int llog_cat_process_or_fork(const struct lu_env *env,
657                              struct llog_handle *cat_llh,
658                              llog_cb_t cb, void *data, int startcat,
659                              int startidx, bool fork)
660 {
661         struct llog_process_data d;
662         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
663         int rc;
664         ENTRY;
665
666         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
667         d.lpd_data = data;
668         d.lpd_cb = cb;
669         d.lpd_startcat = startcat;
670         d.lpd_startidx = startidx;
671
672         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
673                 struct llog_process_cat_data cd;
674
675                 CWARN("catlog "DOSTID" crosses index zero\n",
676                       POSTID(&cat_llh->lgh_id.lgl_oi));
677
678                 cd.lpcd_first_idx = llh->llh_cat_idx;
679                 cd.lpcd_last_idx = 0;
680                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
681                                           &d, &cd, fork);
682                 if (rc != 0)
683                         RETURN(rc);
684
685                 cd.lpcd_first_idx = 0;
686                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
687                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
688                                           &d, &cd, fork);
689         } else {
690                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
691                                           &d, NULL, fork);
692         }
693
694         RETURN(rc);
695 }
696
697 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
698                      llog_cb_t cb, void *data, int startcat, int startidx)
699 {
700         return llog_cat_process_or_fork(env, cat_llh, cb, data, startcat,
701                                         startidx, false);
702 }
703 EXPORT_SYMBOL(llog_cat_process);
704
705 static int llog_cat_reverse_process_cb(const struct lu_env *env,
706                                        struct llog_handle *cat_llh,
707                                        struct llog_rec_hdr *rec, void *data)
708 {
709         struct llog_process_data *d = data;
710         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
711         struct llog_handle *llh;
712         struct llog_log_hdr *hdr;
713         int rc;
714
715         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
716                 CERROR("invalid record in catalog\n");
717                 RETURN(-EINVAL);
718         }
719         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
720                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
721                le32_to_cpu(rec->lrh_index), POSTID(&cat_llh->lgh_id.lgl_oi));
722
723         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
724         if (rc) {
725                 CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
726                        cat_llh->lgh_ctxt->loc_obd->obd_name,
727                        POSTID(&lir->lid_id.lgl_oi), rc);
728                 if (rc == -ENOENT || rc == -ESTALE) {
729                         /* After a server crash, a stub of index
730                          * record in catlog could be kept, because
731                          * plain log destroy + catlog index record
732                          * deletion are not atomic. So we end up with
733                          * an index but no actual record. Destroy the
734                          * index and move on. */
735                         rc = llog_cat_cleanup(env, cat_llh, NULL,
736                                               rec->lrh_index);
737                 }
738
739                 RETURN(rc);
740         }
741
742         /* clean old empty llogs, do not consider current llog in use */
743         hdr = llh->lgh_hdr;
744         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
745             hdr->llh_count == 1 &&
746             llh != cat_llh->u.chd.chd_current_log) {
747                 rc = llog_destroy(env, llh);
748                 if (rc)
749                         CERROR("%s: fail to destroy empty log: rc = %d\n",
750                                llh->lgh_ctxt->loc_obd->obd_name, rc);
751                 GOTO(out, rc = LLOG_DEL_PLAIN);
752         }
753
754         rc = llog_reverse_process(env, llh, d->lpd_cb, d->lpd_data, NULL);
755
756 out:
757         /* The empty plain was destroyed while processing */
758         if (rc == LLOG_DEL_PLAIN)
759                 rc = llog_cat_cleanup(env, cat_llh, llh,
760                                       llh->u.phd.phd_cookie.lgc_index);
761
762         llog_handle_put(llh);
763         RETURN(rc);
764 }
765
766 int llog_cat_reverse_process(const struct lu_env *env,
767                              struct llog_handle *cat_llh,
768                              llog_cb_t cb, void *data)
769 {
770         struct llog_process_data d;
771         struct llog_process_cat_data cd;
772         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
773         int rc;
774         ENTRY;
775
776         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
777         d.lpd_data = data;
778         d.lpd_cb = cb;
779
780         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
781                 CWARN("catalog "DOSTID" crosses index zero\n",
782                       POSTID(&cat_llh->lgh_id.lgl_oi));
783
784                 cd.lpcd_first_idx = 0;
785                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
786                 rc = llog_reverse_process(env, cat_llh,
787                                           llog_cat_reverse_process_cb,
788                                           &d, &cd);
789                 if (rc != 0)
790                         RETURN(rc);
791
792                 cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
793                 cd.lpcd_last_idx = 0;
794                 rc = llog_reverse_process(env, cat_llh,
795                                           llog_cat_reverse_process_cb,
796                                           &d, &cd);
797         } else {
798                 rc = llog_reverse_process(env, cat_llh,
799                                           llog_cat_reverse_process_cb,
800                                           &d, NULL);
801         }
802
803         RETURN(rc);
804 }
805 EXPORT_SYMBOL(llog_cat_reverse_process);
806
807 static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
808 {
809         struct llog_log_hdr *llh = cathandle->lgh_hdr;
810         int bitmap_size;
811
812         ENTRY;
813
814         bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
815         /*
816          * The llh_cat_idx equals to the first used index minus 1
817          * so if we canceled the first index then llh_cat_idx
818          * must be renewed.
819          */
820         if (llh->llh_cat_idx == (idx - 1)) {
821                 llh->llh_cat_idx = idx;
822
823                 while (idx != cathandle->lgh_last_idx) {
824                         idx = (idx + 1) % bitmap_size;
825                         if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
826                                 /* update llh_cat_idx for each unset bit,
827                                  * expecting the next one is set */
828                                 llh->llh_cat_idx = idx;
829                         } else if (idx == 0) {
830                                 /* skip header bit */
831                                 llh->llh_cat_idx = 0;
832                                 continue;
833                         } else {
834                                 /* the first index is found */
835                                 break;
836                         }
837                 }
838
839                 CDEBUG(D_RPCTRACE, "Set catlog "DOSTID" first idx %u,"
840                        " (last_idx %u)\n", POSTID(&cathandle->lgh_id.lgl_oi),
841                        llh->llh_cat_idx, cathandle->lgh_last_idx);
842         }
843
844         RETURN(0);
845 }
846
847 /* Cleanup deleted plain llog traces from catalog */
848 int llog_cat_cleanup(const struct lu_env *env, struct llog_handle *cathandle,
849                      struct llog_handle *loghandle, int index)
850 {
851         int rc;
852
853         LASSERT(index);
854         if (loghandle != NULL) {
855                 /* remove destroyed llog from catalog list and
856                  * chd_current_log variable */
857                 down_write(&cathandle->lgh_lock);
858                 if (cathandle->u.chd.chd_current_log == loghandle)
859                         cathandle->u.chd.chd_current_log = NULL;
860                 list_del_init(&loghandle->u.phd.phd_entry);
861                 up_write(&cathandle->lgh_lock);
862                 LASSERT(index == loghandle->u.phd.phd_cookie.lgc_index);
863                 /* llog was opened and keep in a list, close it now */
864                 llog_close(env, loghandle);
865         }
866
867         /* do not attempt to cleanup on-disk llog if on client side */
868         if (cathandle->lgh_obj == NULL)
869                 return 0;
870
871         /* remove plain llog entry from catalog by index */
872         llog_cat_set_first_idx(cathandle, index);
873         rc = llog_cancel_rec(env, cathandle, index);
874         if (rc == 0)
875                 CDEBUG(D_HA, "cancel plain log at index"
876                        " %u of catalog "DOSTID"\n",
877                        index, POSTID(&cathandle->lgh_id.lgl_oi));
878         return rc;
879 }
880
881 /* helper to initialize catalog llog and process it to cancel */
882 int llog_cat_init_and_process(const struct lu_env *env,
883                               struct llog_handle *llh)
884 {
885         int rc;
886
887         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, NULL);
888         if (rc)
889                 RETURN(rc);
890
891         RETURN(0);
892 }
893 EXPORT_SYMBOL(llog_cat_init_and_process);