Whamcloud - gitweb
LU-15644 llog: don't report warning in no error case
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/llog_cat.c
32  *
33  * OST<->MDS recovery logging infrastructure.
34  *
35  * Invariants in implementation:
36  * - we do not share logs among different OST<->MDS connections, so that
37  *   if an OST or MDS fails it need only look at log(s) relevant to itself
38  *
39  * Author: Andreas Dilger <adilger@clusterfs.com>
40  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.pershin@intel.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOG
45
46
47 #include <obd_class.h>
48
49 #include "llog_internal.h"
50
51
52 /**
53  * lockdep markers for nested struct llog_handle::lgh_lock locking.
54  */
55 enum {
56         LLOGH_CAT,
57         LLOGH_LOG,
58 };
59
60 /* Create a new log handle and add it to the open list.
61  * This log handle will be closed when all of the records in it are removed.
62  *
63  * Assumes caller has already pushed us into the kernel context and is locking.
64  */
65 static int llog_cat_new_log(const struct lu_env *env,
66                             struct llog_handle *cathandle,
67                             struct llog_handle *loghandle,
68                             struct thandle *th)
69 {
70         struct llog_thread_info *lgi = llog_info(env);
71         struct llog_logid_rec   *rec = &lgi->lgi_logid;
72         struct thandle *handle = NULL;
73         struct dt_device *dt = NULL;
74         struct llog_log_hdr     *llh = cathandle->lgh_hdr;
75         int                      rc, index;
76
77         ENTRY;
78
79         index = (cathandle->lgh_last_idx + 1) % (llog_max_idx(llh) + 1);
80
81         /* check that new llog index will not overlap with the first one.
82          * - llh_cat_idx is the index just before the first/oldest still in-use
83          *      index in catalog
84          * - lgh_last_idx is the last/newest used index in catalog
85          *
86          * When catalog is not wrapped yet then lgh_last_idx is always larger
87          * than llh_cat_idx. After the wrap around lgh_last_idx re-starts
88          * from 0 and llh_cat_idx becomes the upper limit for it
89          *
90          * Check if catalog has already wrapped around or not by comparing
91          * last_idx and cat_idx */
92         if ((index == llh->llh_cat_idx + 1 && llh->llh_count > 1) ||
93             (index == 0 && llh->llh_cat_idx == 0)) {
94                 if (cathandle->lgh_name == NULL) {
95                         CWARN("%s: there are no more free slots in catalog "DFID"\n",
96                               loghandle2name(loghandle),
97                               PLOGID(&cathandle->lgh_id));
98                 } else {
99                         CWARN("%s: there are no more free slots in catalog %s\n",
100                               loghandle2name(loghandle), cathandle->lgh_name);
101                 }
102                 RETURN(-ENOSPC);
103         }
104
105         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
106                 RETURN(-ENOSPC);
107
108         if (loghandle->lgh_hdr != NULL) {
109                 /* If llog object is remote and creation is failed, lgh_hdr
110                  * might be left over here, free it first */
111                 LASSERT(!llog_exist(loghandle));
112                 OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
113                 loghandle->lgh_hdr = NULL;
114         }
115
116         if (th == NULL) {
117                 dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
118
119                 handle = dt_trans_create(env, dt);
120                 if (IS_ERR(handle))
121                         RETURN(PTR_ERR(handle));
122
123                 /* Create update llog object synchronously, which
124                  * happens during inialization process see
125                  * lod_sub_prep_llog(), to make sure the update
126                  * llog object is created before corss-MDT writing
127                  * updates into the llog object */
128                 if (cathandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
129                         handle->th_sync = 1;
130
131                 handle->th_wait_submit = 1;
132
133                 rc = llog_declare_create(env, loghandle, handle);
134                 if (rc != 0)
135                         GOTO(out, rc);
136
137                 rec->lid_hdr.lrh_len = sizeof(*rec);
138                 rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
139                 rec->lid_id = loghandle->lgh_id;
140                 rc = llog_declare_write_rec(env, cathandle, &rec->lid_hdr, -1,
141                                             handle);
142                 if (rc != 0)
143                         GOTO(out, rc);
144
145                 rc = dt_trans_start_local(env, dt, handle);
146                 if (rc != 0)
147                         GOTO(out, rc);
148
149                 th = handle;
150         }
151
152         rc = llog_create(env, loghandle, th);
153         /* if llog is already created, no need to initialize it */
154         if (rc == -EEXIST) {
155                 GOTO(out, rc = 0);
156         } else if (rc != 0) {
157                 CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
158                        loghandle2name(loghandle), rc);
159                 GOTO(out, rc);
160         }
161
162         rc = llog_init_handle(env, loghandle,
163                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
164                               &cathandle->lgh_hdr->llh_tgtuuid);
165         if (rc < 0)
166                 GOTO(out, rc);
167
168         /* build the record for this log in the catalog */
169         rec->lid_hdr.lrh_len = sizeof(*rec);
170         rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
171         rec->lid_id = loghandle->lgh_id;
172
173         /* append the new record into catalog. The new index will be
174          * assigned to the record and updated in rec header */
175         rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
176                             &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
177         if (rc < 0)
178                 GOTO(out_destroy, rc);
179
180         CDEBUG(D_OTHER, "new plain log "DFID".%u of catalog "DFID"\n",
181                PLOGID(&loghandle->lgh_id), rec->lid_hdr.lrh_index,
182                PLOGID(&cathandle->lgh_id));
183
184         loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
185
186         /* limit max size of plain llog so that space can be
187          * released sooner, especially on small filesystems */
188         /* 2MB for the cases when free space hasn't been learned yet */
189         loghandle->lgh_max_size = 2 << 20;
190         dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
191         rc = dt_statfs(env, dt, &lgi->lgi_statfs);
192         if (rc == 0 && lgi->lgi_statfs.os_bfree > 0) {
193                 __u64 freespace = (lgi->lgi_statfs.os_bfree *
194                                   lgi->lgi_statfs.os_bsize) >> 6;
195                 if (freespace < loghandle->lgh_max_size)
196                         loghandle->lgh_max_size = freespace;
197                 /* shouldn't be > 128MB in any case?
198                  * it's 256K records of 512 bytes each */
199                 if (freespace > (128 << 20))
200                         loghandle->lgh_max_size = 128 << 20;
201         }
202         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PLAIN_RECORDS) ||
203                      CFS_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK))) {
204                 // limit the numer of plain records for test
205                 loghandle->lgh_max_size = loghandle->lgh_hdr_size +
206                        cfs_fail_val * 64;
207         }
208
209         rc = 0;
210
211 out:
212         if (handle != NULL) {
213                 handle->th_result = rc >= 0 ? 0 : rc;
214                 dt_trans_stop(env, dt, handle);
215         }
216         RETURN(rc);
217
218 out_destroy:
219         /* to signal llog_cat_close() it shouldn't try to destroy the llog,
220          * we want to destroy it in this transaction, otherwise the object
221          * becomes an orphan */
222         loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
223         /* this is to mimic full log, so another llog_cat_current_log()
224          * can skip it and ask for another onet */
225         loghandle->lgh_last_idx = llog_max_idx(loghandle->lgh_hdr) + 1;
226         llog_trans_destroy(env, loghandle, th);
227         if (handle != NULL)
228                 dt_trans_stop(env, dt, handle);
229         RETURN(rc);
230 }
231
232 static int llog_cat_refresh(const struct lu_env *env,
233                             struct llog_handle *cathandle)
234 {
235         struct llog_handle *loghandle;
236         int rc;
237
238         down_write(&cathandle->lgh_lock);
239         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
240                             u.phd.phd_entry) {
241                 if (!llog_exist(loghandle))
242                         continue;
243
244                 down_write(&loghandle->lgh_lock);
245                 rc = llog_read_header(env, loghandle, NULL);
246                 up_write(&loghandle->lgh_lock);
247                 if (rc)
248                         goto unlock;
249         }
250
251         rc = llog_read_header(env, cathandle, NULL);
252 unlock:
253         up_write(&cathandle->lgh_lock);
254
255         return rc;
256 }
257
258 /*
259  * prepare current/next log for catalog.
260  *
261  * if \a *ploghandle is NULL, open it, and declare create, NB, if \a
262  * *ploghandle is remote, create it synchronously here, see comments
263  * below.
264  *
265  * \a cathandle->lgh_lock is down_read-ed, it gets down_write-ed if \a
266  * *ploghandle has to be opened.
267  */
268 static int llog_cat_prep_log(const struct lu_env *env,
269                              struct llog_handle *cathandle,
270                              struct llog_handle **ploghandle,
271                              struct thandle *th)
272 {
273         int rc;
274         int sem_upgraded;
275
276 start:
277         rc = 0;
278         sem_upgraded = 0;
279         if (IS_ERR_OR_NULL(*ploghandle)) {
280                 up_read(&cathandle->lgh_lock);
281                 down_write(&cathandle->lgh_lock);
282                 sem_upgraded = 1;
283                 if (IS_ERR_OR_NULL(*ploghandle)) {
284                         struct llog_handle *loghandle;
285
286                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
287                                        NULL, NULL, LLOG_OPEN_NEW);
288                         if (!rc) {
289                                 *ploghandle = loghandle;
290                                 list_add_tail(&loghandle->u.phd.phd_entry,
291                                               &cathandle->u.chd.chd_head);
292                         }
293                 }
294                 if (rc)
295                         GOTO(out, rc);
296         }
297
298         rc = llog_exist(*ploghandle);
299         if (rc < 0)
300                 GOTO(out, rc);
301         if (rc)
302                 GOTO(out, rc = 0);
303
304         if (dt_object_remote(cathandle->lgh_obj)) {
305                 down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG);
306                 if (!llog_exist(*ploghandle)) {
307                         /* For remote operation, if we put the llog object
308                          * creation in the current transaction, then the
309                          * llog object will not be created on the remote
310                          * target until the transaction stop, if other
311                          * operations start before the transaction stop,
312                          * and use the same llog object, will be dependent
313                          * on the success of this transaction. So let's
314                          * create the llog object synchronously here to
315                          * remove the dependency. */
316                         rc = llog_cat_new_log(env, cathandle, *ploghandle,
317                                               NULL);
318                         if (rc == -ESTALE) {
319                                 up_write(&(*ploghandle)->lgh_lock);
320                                 if (sem_upgraded)
321                                         up_write(&cathandle->lgh_lock);
322                                 else
323                                         up_read(&cathandle->lgh_lock);
324
325                                 rc = llog_cat_refresh(env, cathandle);
326                                 down_read_nested(&cathandle->lgh_lock,
327                                                  LLOGH_CAT);
328                                 if (rc)
329                                         return rc;
330                                 /* *ploghandle might become NULL, restart */
331                                 goto start;
332                         }
333                 }
334                 up_write(&(*ploghandle)->lgh_lock);
335         } else {
336                 struct llog_thread_info *lgi = llog_info(env);
337                 struct llog_logid_rec *lirec = &lgi->lgi_logid;
338
339                 rc = llog_declare_create(env, *ploghandle, th);
340                 if (rc)
341                         GOTO(out, rc);
342
343                 lirec->lid_hdr.lrh_len = sizeof(*lirec);
344                 rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
345                                             th);
346         }
347
348 out:
349         if (sem_upgraded) {
350                 up_write(&cathandle->lgh_lock);
351                 down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
352                 if (rc == 0)
353                         goto start;
354         }
355         return rc;
356 }
357
358 /* Open an existent log handle and add it to the open list.
359  * This log handle will be closed when all of the records in it are removed.
360  *
361  * Assumes caller has already pushed us into the kernel context and is locking.
362  * We return a lock on the handle to ensure nobody yanks it from us.
363  *
364  * This takes extra reference on llog_handle via llog_handle_get() and require
365  * this reference to be put by caller using llog_handle_put()
366  */
367 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
368                        struct llog_handle **res, struct llog_logid *logid)
369 {
370         struct llog_handle      *loghandle;
371         enum llog_flag           fmt;
372         int                      rc = 0;
373
374         ENTRY;
375
376         if (cathandle == NULL)
377                 RETURN(-EBADF);
378
379         fmt = cathandle->lgh_hdr->llh_flags & LLOG_F_EXT_MASK;
380         down_write(&cathandle->lgh_lock);
381         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
382                             u.phd.phd_entry) {
383                 struct llog_logid *cgl = &loghandle->lgh_id;
384
385                 if (ostid_id(&cgl->lgl_oi) == ostid_id(&logid->lgl_oi) &&
386                     ostid_seq(&cgl->lgl_oi) == ostid_seq(&logid->lgl_oi)) {
387                         *res = llog_handle_get(loghandle);
388                         if (!*res) {
389                                 CERROR("%s: log "DFID" refcount is zero!\n",
390                                        loghandle2name(loghandle),
391                                        PLOGID(logid));
392                                 continue;
393                         }
394                         loghandle->u.phd.phd_cat_handle = cathandle;
395                         up_write(&cathandle->lgh_lock);
396                         RETURN(rc);
397                 }
398         }
399         up_write(&cathandle->lgh_lock);
400
401         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
402                        LLOG_OPEN_EXISTS);
403         if (rc < 0) {
404                 CERROR("%s: error opening log id "DFID": rc = %d\n",
405                        loghandle2name(cathandle), PLOGID(logid), rc);
406                 RETURN(rc);
407         }
408
409         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN |
410                               LLOG_F_ZAP_WHEN_EMPTY | fmt, NULL);
411         if (rc < 0) {
412                 llog_close(env, loghandle);
413                 *res = NULL;
414                 RETURN(rc);
415         }
416
417         *res = llog_handle_get(loghandle);
418         LASSERT(*res);
419         down_write(&cathandle->lgh_lock);
420         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
421         up_write(&cathandle->lgh_lock);
422
423         loghandle->u.phd.phd_cat_handle = cathandle;
424         loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
425         loghandle->u.phd.phd_cookie.lgc_index =
426                                 loghandle->lgh_hdr->llh_cat_idx;
427         RETURN(0);
428 }
429
430 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
431 {
432         struct llog_handle      *loghandle, *n;
433         int                      rc;
434
435         ENTRY;
436
437         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
438                                  u.phd.phd_entry) {
439                 struct llog_log_hdr     *llh = loghandle->lgh_hdr;
440                 int                      index;
441
442                 /* unlink open-not-created llogs */
443                 list_del_init(&loghandle->u.phd.phd_entry);
444                 llh = loghandle->lgh_hdr;
445                 if (loghandle->lgh_obj != NULL && llh != NULL &&
446                     (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
447                     (llh->llh_count == 1)) {
448                         rc = llog_destroy(env, loghandle);
449                         if (rc)
450                                 CERROR("%s: failure destroying log during "
451                                        "cleanup: rc = %d\n",
452                                        loghandle2name(loghandle), rc);
453
454                         index = loghandle->u.phd.phd_cookie.lgc_index;
455                         llog_cat_cleanup(env, cathandle, NULL, index);
456                 }
457                 llog_close(env, loghandle);
458         }
459         /* if handle was stored in ctxt, remove it too */
460         if (cathandle->lgh_ctxt->loc_handle == cathandle)
461                 cathandle->lgh_ctxt->loc_handle = NULL;
462         rc = llog_close(env, cathandle);
463         RETURN(rc);
464 }
465 EXPORT_SYMBOL(llog_cat_close);
466
467 /** Return the currently active log handle.  If the current log handle doesn't
468  * have enough space left for the current record, start a new one.
469  *
470  * If reclen is 0, we only want to know what the currently active log is,
471  * otherwise we get a lock on this log so nobody can steal our space.
472  *
473  * Assumes caller has already pushed us into the kernel context and is locking.
474  *
475  * NOTE: loghandle is write-locked upon successful return
476  */
477 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
478                                                 struct thandle *th)
479 {
480         struct llog_handle *loghandle = NULL;
481         ENTRY;
482
483
484         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) {
485                 down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
486                 GOTO(next, loghandle);
487         }
488
489         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
490         loghandle = cathandle->u.chd.chd_current_log;
491         if (loghandle) {
492                 struct llog_log_hdr *llh;
493
494                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
495                 llh = loghandle->lgh_hdr;
496                 if (llh == NULL || !llog_is_full(loghandle)) {
497                         up_read(&cathandle->lgh_lock);
498                         RETURN(loghandle);
499                 } else {
500                         up_write(&loghandle->lgh_lock);
501                 }
502         }
503         up_read(&cathandle->lgh_lock);
504
505         /* time to use next log */
506
507         /* first, we have to make sure the state hasn't changed */
508         down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
509         loghandle = cathandle->u.chd.chd_current_log;
510         if (loghandle) {
511                 struct llog_log_hdr *llh;
512
513                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
514                 llh = loghandle->lgh_hdr;
515                 if (llh == NULL || !llog_is_full(loghandle))
516                         GOTO(out_unlock, loghandle);
517                 else
518                         up_write(&loghandle->lgh_lock);
519         }
520
521 next:
522         /* Sigh, the chd_next_log and chd_current_log is initialized
523          * in declare phase, and we do not serialize the catlog
524          * accessing, so it might be possible the llog creation
525          * thread (see llog_cat_declare_add_rec()) did not create
526          * llog successfully, then the following thread might
527          * meet this situation. */
528         if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) {
529                 CERROR("%s: next log does not exist!\n",
530                        loghandle2name(cathandle));
531                 loghandle = ERR_PTR(-EIO);
532                 if (cathandle->u.chd.chd_next_log == NULL) {
533                         /* Store the error in chd_next_log, so
534                          * the following process can get correct
535                          * failure value */
536                         cathandle->u.chd.chd_next_log = loghandle;
537                 }
538                 GOTO(out_unlock, loghandle);
539         }
540
541         CDEBUG(D_INODE, "use next log\n");
542
543         loghandle = cathandle->u.chd.chd_next_log;
544         cathandle->u.chd.chd_current_log = loghandle;
545         cathandle->u.chd.chd_next_log = NULL;
546         down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
547
548 out_unlock:
549         up_write(&cathandle->lgh_lock);
550         LASSERT(loghandle);
551         RETURN(loghandle);
552 }
553
554 /* Add a single record to the recovery log(s) using a catalog
555  * Returns as llog_write_record
556  *
557  * Assumes caller has already pushed us into the kernel context.
558  */
559 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
560                      struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
561                      struct thandle *th)
562 {
563         struct llog_handle *loghandle;
564         int rc, retried = 0;
565         ENTRY;
566
567         LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size);
568
569 retry:
570         loghandle = llog_cat_current_log(cathandle, th);
571         if (IS_ERR(loghandle))
572                 RETURN(PTR_ERR(loghandle));
573
574         /* loghandle is already locked by llog_cat_current_log() for us */
575         if (!llog_exist(loghandle)) {
576                 rc = llog_cat_new_log(env, cathandle, loghandle, th);
577                 if (rc < 0) {
578                         up_write(&loghandle->lgh_lock);
579                         /* nobody should be trying to use this llog */
580                         down_write(&cathandle->lgh_lock);
581                         if (cathandle->u.chd.chd_current_log == loghandle)
582                                 cathandle->u.chd.chd_current_log = NULL;
583                         up_write(&cathandle->lgh_lock);
584                         RETURN(rc);
585                 }
586         }
587         /* now let's try to add the record */
588         rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th);
589         if (rc < 0) {
590                 CDEBUG_LIMIT(rc == -ENOSPC ? D_HA : D_ERROR,
591                              "llog_write_rec %d: lh=%p\n", rc, loghandle);
592                 /* -ENOSPC is returned if no empty records left
593                  * and when it's lack of space on the stogage.
594                  * there is no point to try again if it's the second
595                  * case. many callers (like llog test) expect ENOSPC,
596                  * so we preserve this error code, but look for the
597                  * actual cause here */
598                 if (rc == -ENOSPC && llog_is_full(loghandle))
599                         rc = -ENOBUFS;
600         }
601         up_write(&loghandle->lgh_lock);
602
603         if (rc == -ENOBUFS) {
604                 if (retried++ == 0)
605                         GOTO(retry, rc);
606                 CERROR("%s: error on 2nd llog: rc = %d\n",
607                        loghandle2name(cathandle), rc);
608         }
609
610         RETURN(rc);
611 }
612 EXPORT_SYMBOL(llog_cat_add_rec);
613
614 int llog_cat_declare_add_rec(const struct lu_env *env,
615                              struct llog_handle *cathandle,
616                              struct llog_rec_hdr *rec, struct thandle *th)
617 {
618         int rc;
619
620         ENTRY;
621
622 start:
623         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
624         rc = llog_cat_prep_log(env, cathandle,
625                                &cathandle->u.chd.chd_current_log, th);
626         if (rc)
627                 GOTO(unlock, rc);
628
629         rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log,
630                                th);
631         if (rc)
632                 GOTO(unlock, rc);
633
634         rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
635                                     rec, -1, th);
636         if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) {
637                 up_read(&cathandle->lgh_lock);
638                 rc = llog_cat_refresh(env, cathandle);
639                 if (rc)
640                         RETURN(rc);
641                 goto start;
642         }
643
644 #if 0
645         /*
646          * XXX: we hope for declarations made for existing llog this might be
647          * not correct with some backends where declarations are expected
648          * against specific object like ZFS with full debugging enabled.
649          */
650         rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1,
651                                     th);
652 #endif
653 unlock:
654         up_read(&cathandle->lgh_lock);
655         RETURN(rc);
656 }
657 EXPORT_SYMBOL(llog_cat_declare_add_rec);
658
659 int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
660                  struct llog_rec_hdr *rec, struct llog_cookie *reccookie)
661 {
662         struct llog_ctxt        *ctxt;
663         struct dt_device        *dt;
664         struct thandle          *th = NULL;
665         int                      rc;
666
667         ctxt = cathandle->lgh_ctxt;
668         LASSERT(ctxt);
669         LASSERT(ctxt->loc_exp);
670
671         LASSERT(cathandle->lgh_obj != NULL);
672         dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
673
674         th = dt_trans_create(env, dt);
675         if (IS_ERR(th))
676                 RETURN(PTR_ERR(th));
677
678         rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
679         if (rc)
680                 GOTO(out_trans, rc);
681
682         rc = dt_trans_start_local(env, dt, th);
683         if (rc)
684                 GOTO(out_trans, rc);
685         rc = llog_cat_add_rec(env, cathandle, rec, reccookie, th);
686 out_trans:
687         dt_trans_stop(env, dt, th);
688         RETURN(rc);
689 }
690 EXPORT_SYMBOL(llog_cat_add);
691
692 int llog_cat_cancel_arr_rec(const struct lu_env *env,
693                             struct llog_handle *cathandle,
694                             struct llog_logid *lgl, int count, int *index)
695 {
696         struct llog_handle *loghandle;
697         int  rc;
698
699         ENTRY;
700         rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
701         if (rc) {
702                 CDEBUG(D_HA, "%s: can't find llog handle for "DFID": rc = %d\n",
703                        loghandle2name(cathandle), PLOGID(lgl), rc);
704                 RETURN(rc);
705         }
706
707         if ((cathandle->lgh_ctxt->loc_flags &
708              LLOG_CTXT_FLAG_NORMAL_FID) && !llog_exist(loghandle)) {
709                 /* For update log, some of loghandles of cathandle
710                  * might not exist because remote llog creation might
711                  * be failed, so let's skip the record cancellation
712                  * for these non-exist llogs.
713                  */
714                 rc = -ENOENT;
715                 CDEBUG(D_HA, "%s: llog "DFID" does not exist: rc = %d\n",
716                        loghandle2name(cathandle), PLOGID(lgl), rc);
717                 llog_handle_put(env, loghandle);
718                 RETURN(rc);
719         }
720
721         rc = llog_cancel_arr_rec(env, loghandle, count, index);
722         if (rc == LLOG_DEL_PLAIN) { /* log has been destroyed */
723                 int cat_index;
724
725                 cat_index = loghandle->u.phd.phd_cookie.lgc_index;
726                 rc = llog_cat_cleanup(env, cathandle, loghandle, cat_index);
727                 if (rc)
728                         CDEBUG(D_HA,
729                                "%s: fail to cancel catalog record: rc = %d\n",
730                                loghandle2name(cathandle), rc);
731                 rc = 0;
732
733         }
734         llog_handle_put(env, loghandle);
735         if (rc && rc != -ENOENT && rc != -ESTALE && rc != -EIO)
736                 CWARN("%s: fail to cancel %d records in "DFID": rc = %d\n",
737                       loghandle2name(cathandle), count, PLOGID(lgl), rc);
738         RETURN(rc);
739 }
740 EXPORT_SYMBOL(llog_cat_cancel_arr_rec);
741
742 /* For each cookie in the cookie array, we clear the log in-use bit and either:
743  * - the log is empty, so mark it free in the catalog header and delete it
744  * - the log is not empty, just write out the log header
745  *
746  * The cookies may be in different log files, so we need to get new logs
747  * each time.
748  *
749  * Assumes caller has already pushed us into the kernel context.
750  */
751 int llog_cat_cancel_records(const struct lu_env *env,
752                             struct llog_handle *cathandle, int count,
753                             struct llog_cookie *cookies)
754 {
755         int i, rc = 0;
756
757         ENTRY;
758
759         for (i = 0; i < count; i++, cookies++) {
760                 int lrc;
761
762                 lrc = llog_cat_cancel_arr_rec(env, cathandle, &cookies->lgc_lgl,
763                                               1, &cookies->lgc_index);
764                 if (lrc && !rc)
765                         rc = lrc;
766         }
767
768         RETURN(rc);
769 }
770 EXPORT_SYMBOL(llog_cat_cancel_records);
771
772 static int llog_cat_process_common(const struct lu_env *env,
773                                    struct llog_handle *cat_llh,
774                                    struct llog_rec_hdr *rec,
775                                    struct llog_handle **llhp)
776 {
777         struct llog_logid_rec *lir = container_of(rec, typeof(*lir), lid_hdr);
778         struct llog_log_hdr *hdr;
779         int rc;
780
781         ENTRY;
782         if (rec->lrh_type != le32_to_cpu(LLOG_LOGID_MAGIC)) {
783                 rc = -EINVAL;
784                 CWARN("%s: invalid record in catalog "DFID": rc = %d\n",
785                       loghandle2name(cat_llh), PLOGID(&cat_llh->lgh_id), rc);
786                 RETURN(rc);
787         }
788         CDEBUG(D_HA, "processing log "DFID" at index %u of catalog "DFID"\n",
789                PLOGID(&lir->lid_id), le32_to_cpu(rec->lrh_index),
790                PLOGID(&cat_llh->lgh_id));
791
792         rc = llog_cat_id2handle(env, cat_llh, llhp, &lir->lid_id);
793         if (rc) {
794                 /* After a server crash, a stub of index record in catlog could
795                  * be kept, because plain log destroy + catlog index record
796                  * deletion are not atomic. So we end up with an index but no
797                  * actual record. Destroy the index and move on. */
798                 if (rc == -ENOENT || rc == -ESTALE)
799                         rc = LLOG_DEL_RECORD;
800                 else if (rc)
801                         CWARN("%s: can't find llog handle "DFID": rc = %d\n",
802                               loghandle2name(cat_llh), PLOGID(&lir->lid_id),
803                               rc);
804
805                 RETURN(rc);
806         }
807
808         /* clean old empty llogs, do not consider current llog in use */
809         /* ignore remote (lgh_obj == NULL) llogs */
810         hdr = (*llhp)->lgh_hdr;
811         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
812             hdr->llh_count == 1 && cat_llh->lgh_obj != NULL &&
813             *llhp != cat_llh->u.chd.chd_current_log &&
814             *llhp != cat_llh->u.chd.chd_next_log) {
815                 rc = llog_destroy(env, *llhp);
816                 if (rc)
817                         CWARN("%s: can't destroy empty log "DFID": rc = %d\n",
818                               loghandle2name((*llhp)), PLOGID(&lir->lid_id),
819                               rc);
820                 rc = LLOG_DEL_PLAIN;
821         }
822
823         RETURN(rc);
824 }
825
826 static int llog_cat_process_cb(const struct lu_env *env,
827                                struct llog_handle *cat_llh,
828                                struct llog_rec_hdr *rec, void *data)
829 {
830         struct llog_process_data *d = data;
831         struct llog_handle *llh = NULL;
832         int rc;
833
834         ENTRY;
835
836         /* Skip processing of the logs until startcat */
837         if (rec->lrh_index < d->lpd_startcat)
838                 RETURN(0);
839
840         rc = llog_cat_process_common(env, cat_llh, rec, &llh);
841         if (rc)
842                 GOTO(out, rc);
843
844         if (d->lpd_startidx > 0) {
845                 struct llog_process_cat_data cd = {
846                         .lpcd_first_idx = 0,
847                         .lpcd_last_idx = 0,
848                         .lpcd_read_mode = LLOG_READ_MODE_NORMAL,
849                 };
850
851                 /* startidx is always associated with a catalog index */
852                 if (d->lpd_startcat == rec->lrh_index)
853                         cd.lpcd_first_idx = d->lpd_startidx;
854
855                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
856                                           &cd, false);
857                 /* Continue processing the next log from idx 0 */
858                 d->lpd_startidx = 0;
859         } else {
860                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
861                                           NULL, false);
862         }
863         if (rc == -ENOENT && (cat_llh->lgh_hdr->llh_flags & LLOG_F_RM_ON_ERR)) {
864                 /*
865                  * plain llog is reported corrupted, so better to just remove
866                  * it if the caller is fine with that.
867                  */
868                 CERROR("%s: remove corrupted/missing llog "DFID"\n",
869                        loghandle2name(cat_llh), PLOGID(&llh->lgh_id));
870                 rc = LLOG_DEL_PLAIN;
871         }
872
873 out:
874         /* The empty plain log was destroyed while processing */
875         if (rc == LLOG_DEL_PLAIN || rc == LLOG_DEL_RECORD)
876                 /* clear wrong catalog entry */
877                 rc = llog_cat_cleanup(env, cat_llh, llh, rec->lrh_index);
878         else if (rc == LLOG_SKIP_PLAIN)
879                 /* processing callback ask to skip the llog -> continue */
880                 rc = 0;
881
882         if (llh)
883                 llog_handle_put(env, llh);
884
885         RETURN(rc);
886 }
887
888 int llog_cat_process_or_fork(const struct lu_env *env,
889                              struct llog_handle *cat_llh, llog_cb_t cat_cb,
890                              llog_cb_t cb, void *data, int startcat,
891                              int startidx, bool fork)
892 {
893         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
894         struct llog_process_data d;
895         struct llog_process_cat_data cd;
896         int rc;
897
898         ENTRY;
899
900         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
901         d.lpd_data = data;
902         d.lpd_cb = cb;
903
904         /* default: start from the oldest record */
905         d.lpd_startidx = 0;
906         d.lpd_startcat = llh->llh_cat_idx + 1;
907         cd.lpcd_first_idx = llh->llh_cat_idx;
908         cd.lpcd_last_idx = 0;
909         cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
910
911         if (startcat > 0 && startcat <= llog_max_idx(llh)) {
912                 /* start from a custom catalog/llog plain indexes*/
913                 d.lpd_startidx = startidx;
914                 d.lpd_startcat = startcat;
915                 cd.lpcd_first_idx = startcat - 1;
916         } else if (startcat != 0) {
917                 CWARN("%s: startcat %d out of range for catlog "DFID"\n",
918                       loghandle2name(cat_llh), startcat,
919                       PLOGID(&cat_llh->lgh_id));
920                 RETURN(-EINVAL);
921         }
922
923         startcat = d.lpd_startcat;
924
925         /* if startcat <= lgh_last_idx, we only need to process the first part
926          * of the catalog (from startcat).
927          */
928         if (llog_cat_is_wrapped(cat_llh) && startcat > cat_llh->lgh_last_idx) {
929                 int cat_idx_origin = llh->llh_cat_idx;
930
931                 CWARN("%s: catlog "DFID" crosses index zero\n",
932                       loghandle2name(cat_llh),
933                       PLOGID(&cat_llh->lgh_id));
934
935                 /* processing the catalog part at the end */
936                 rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork);
937                 if (rc)
938                         RETURN(rc);
939
940                 /* Reset the startcat because it has already reached catalog
941                  * bottom.
942                  * lgh_last_idx value could be increased during processing. So
943                  * we process the remaining of catalog entries to be sure.
944                  */
945                 d.lpd_startcat = 1;
946                 d.lpd_startidx = 0;
947                 cd.lpcd_first_idx = 0;
948                 cd.lpcd_last_idx = max(cat_idx_origin, cat_llh->lgh_last_idx);
949         } else if (llog_cat_is_wrapped(cat_llh)) {
950                 /* only process 1st part -> stop before reaching 2sd part */
951                 cd.lpcd_last_idx = llh->llh_cat_idx;
952         }
953
954         /* processing the catalog part at the begining */
955         rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork);
956
957         RETURN(rc);
958 }
959 EXPORT_SYMBOL(llog_cat_process_or_fork);
960
961 /**
962  * Process catalog records with a callback
963  *
964  * \note
965  * If "starcat = 0", this is the default processing. "startidx" argument is
966  * ignored and processing begin from the oldest record.
967  * If "startcat > 0", this is a custom starting point. Processing begin with
968  * the llog plain defined in the catalog record at index "startcat". The first
969  * llog plain record to process is at index "startidx + 1".
970  *
971  * \param env           Lustre environnement
972  * \param cat_llh       Catalog llog handler
973  * \param cb            Callback executed for each records (in llog plain files)
974  * \param data          Callback data argument
975  * \param startcat      Catalog index of the llog plain to start with.
976  * \param startidx      Index of the llog plain to start processing. The first
977  *                      record to process is at startidx + 1.
978  *
979  * \retval 0 processing successfully completed
980  * \retval LLOG_PROC_BREAK processing was stopped by the callback.
981  * \retval -errno on error.
982  */
983 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
984                      llog_cb_t cb, void *data, int startcat, int startidx)
985 {
986         return llog_cat_process_or_fork(env, cat_llh, llog_cat_process_cb,
987                                         cb, data, startcat, startidx, false);
988 }
989 EXPORT_SYMBOL(llog_cat_process);
990
991 static int llog_cat_size_cb(const struct lu_env *env,
992                              struct llog_handle *cat_llh,
993                              struct llog_rec_hdr *rec, void *data)
994 {
995         struct llog_process_data *d = data;
996         struct llog_handle *llh = NULL;
997         __u64 *cum_size = d->lpd_data;
998         __u64 size;
999         int rc;
1000
1001         ENTRY;
1002         rc = llog_cat_process_common(env, cat_llh, rec, &llh);
1003
1004         if (rc == LLOG_DEL_PLAIN) {
1005                 /* empty log was deleted, don't count it */
1006                 rc = llog_cat_cleanup(env, cat_llh, llh,
1007                                       llh->u.phd.phd_cookie.lgc_index);
1008         } else if (rc == LLOG_DEL_RECORD) {
1009                 /* clear wrong catalog entry */
1010                 rc = llog_cat_cleanup(env, cat_llh, NULL, rec->lrh_index);
1011         } else {
1012                 size = llog_size(env, llh);
1013                 *cum_size += size;
1014
1015                 CDEBUG(D_INFO, "Add llog entry "DFID" size=%llu, tot=%llu\n",
1016                        PLOGID(&llh->lgh_id), size, *cum_size);
1017         }
1018
1019         if (llh != NULL)
1020                 llog_handle_put(env, llh);
1021
1022         RETURN(0);
1023 }
1024
1025 __u64 llog_cat_size(const struct lu_env *env, struct llog_handle *cat_llh)
1026 {
1027         __u64 size = llog_size(env, cat_llh);
1028
1029         llog_cat_process_or_fork(env, cat_llh, llog_cat_size_cb,
1030                                  NULL, &size, 0, 0, false);
1031
1032         return size;
1033 }
1034 EXPORT_SYMBOL(llog_cat_size);
1035
1036 /* currently returns the number of "free" entries in catalog,
1037  * ie the available entries for a new plain LLOG file creation,
1038  * even if catalog has wrapped
1039  */
1040 __u32 llog_cat_free_space(struct llog_handle *cat_llh)
1041 {
1042         /* simulate almost full Catalog */
1043         if (CFS_FAIL_CHECK(OBD_FAIL_CAT_FREE_RECORDS))
1044                 return cfs_fail_val;
1045
1046         if (cat_llh->lgh_hdr->llh_count == 1)
1047                 return llog_max_idx(cat_llh->lgh_hdr);
1048
1049         if (cat_llh->lgh_last_idx > cat_llh->lgh_hdr->llh_cat_idx)
1050                 return llog_max_idx(cat_llh->lgh_hdr) +
1051                        cat_llh->lgh_hdr->llh_cat_idx - cat_llh->lgh_last_idx;
1052
1053         /* catalog is presently wrapped */
1054         return cat_llh->lgh_hdr->llh_cat_idx - cat_llh->lgh_last_idx;
1055 }
1056 EXPORT_SYMBOL(llog_cat_free_space);
1057
1058 static int llog_cat_reverse_process_cb(const struct lu_env *env,
1059                                        struct llog_handle *cat_llh,
1060                                        struct llog_rec_hdr *rec, void *data)
1061 {
1062         struct llog_process_data *d = data;
1063         struct llog_handle *llh;
1064         int rc;
1065
1066         ENTRY;
1067         rc = llog_cat_process_common(env, cat_llh, rec, &llh);
1068
1069         /* The empty plain log was destroyed while processing */
1070         if (rc == LLOG_DEL_PLAIN) {
1071                 rc = llog_cat_cleanup(env, cat_llh, llh,
1072                                       llh->u.phd.phd_cookie.lgc_index);
1073         } else if (rc == LLOG_DEL_RECORD) {
1074                 /* clear wrong catalog entry */
1075                 rc = llog_cat_cleanup(env, cat_llh, NULL, rec->lrh_index);
1076         } else if (rc == LLOG_SKIP_PLAIN) {
1077                 /* processing callback ask to skip the llog -> continue */
1078                 rc = 0;
1079         }
1080         if (rc)
1081                 RETURN(rc);
1082
1083         rc = llog_reverse_process(env, llh, d->lpd_cb, d->lpd_data, NULL);
1084
1085         /* The empty plain was destroyed while processing */
1086         if (rc == LLOG_DEL_PLAIN)
1087                 rc = llog_cat_cleanup(env, cat_llh, llh,
1088                                       llh->u.phd.phd_cookie.lgc_index);
1089
1090         llog_handle_put(env, llh);
1091         RETURN(rc);
1092 }
1093
1094 int llog_cat_reverse_process(const struct lu_env *env,
1095                              struct llog_handle *cat_llh,
1096                              llog_cb_t cb, void *data)
1097 {
1098         struct llog_process_data d;
1099         struct llog_process_cat_data cd;
1100         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
1101         int rc;
1102         ENTRY;
1103
1104         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
1105         cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
1106         d.lpd_data = data;
1107         d.lpd_cb = cb;
1108
1109         if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
1110             llh->llh_count > 1) {
1111                 CWARN("%s: catalog "DFID" crosses index zero\n",
1112                       loghandle2name(cat_llh),
1113                       PLOGID(&cat_llh->lgh_id));
1114
1115                 cd.lpcd_first_idx = 0;
1116                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
1117                 rc = llog_reverse_process(env, cat_llh,
1118                                           llog_cat_reverse_process_cb,
1119                                           &d, &cd);
1120                 if (rc != 0)
1121                         RETURN(rc);
1122
1123                 cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
1124                 cd.lpcd_last_idx = 0;
1125                 rc = llog_reverse_process(env, cat_llh,
1126                                           llog_cat_reverse_process_cb,
1127                                           &d, &cd);
1128         } else {
1129                 rc = llog_reverse_process(env, cat_llh,
1130                                           llog_cat_reverse_process_cb,
1131                                           &d, NULL);
1132         }
1133
1134         RETURN(rc);
1135 }
1136 EXPORT_SYMBOL(llog_cat_reverse_process);
1137
1138 static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
1139 {
1140         struct llog_log_hdr *llh = cathandle->lgh_hdr;
1141         int idx_nbr;
1142
1143         ENTRY;
1144
1145         idx_nbr = llog_max_idx(llh) + 1;
1146         /*
1147          * The llh_cat_idx equals to the first used index minus 1
1148          * so if we canceled the first index then llh_cat_idx
1149          * must be renewed.
1150          */
1151         if (llh->llh_cat_idx == (idx - 1)) {
1152                 llh->llh_cat_idx = idx;
1153
1154                 while (idx != cathandle->lgh_last_idx) {
1155                         idx = (idx + 1) % idx_nbr;
1156                         if (!test_bit_le(idx, LLOG_HDR_BITMAP(llh))) {
1157                                 /* update llh_cat_idx for each unset bit,
1158                                  * expecting the next one is set */
1159                                 llh->llh_cat_idx = idx;
1160                         } else if (idx == 0) {
1161                                 /* skip header bit */
1162                                 llh->llh_cat_idx = 0;
1163                                 continue;
1164                         } else {
1165                                 /* the first index is found */
1166                                 break;
1167                         }
1168                 }
1169
1170                 CDEBUG(D_HA, "catlog "DFID" first idx %u, last_idx %u\n",
1171                        PLOGID(&cathandle->lgh_id), llh->llh_cat_idx,
1172                        cathandle->lgh_last_idx);
1173         }
1174
1175         RETURN(0);
1176 }
1177
1178 /* Cleanup deleted plain llog traces from catalog */
1179 int llog_cat_cleanup(const struct lu_env *env, struct llog_handle *cathandle,
1180                      struct llog_handle *loghandle, int index)
1181 {
1182         int rc;
1183
1184         LASSERT(index);
1185         if (loghandle != NULL) {
1186                 /* remove destroyed llog from catalog list and
1187                  * chd_current_log variable */
1188                 down_write(&cathandle->lgh_lock);
1189                 if (cathandle->u.chd.chd_current_log == loghandle)
1190                         cathandle->u.chd.chd_current_log = NULL;
1191                 list_del_init(&loghandle->u.phd.phd_entry);
1192                 up_write(&cathandle->lgh_lock);
1193                 LASSERT(index == loghandle->u.phd.phd_cookie.lgc_index ||
1194                         loghandle->u.phd.phd_cookie.lgc_index == 0);
1195                 /* llog was opened and keep in a list, close it now */
1196                 llog_close(env, loghandle);
1197         }
1198
1199         /* do not attempt to cleanup on-disk llog if on client side */
1200         if (cathandle->lgh_obj == NULL)
1201                 return 0;
1202
1203         /* remove plain llog entry from catalog by index */
1204         llog_cat_set_first_idx(cathandle, index);
1205         rc = llog_cancel_rec(env, cathandle, index);
1206         if (!rc && loghandle)
1207                 CDEBUG(D_HA,
1208                        "cancel plain log "DFID" at index %u of catalog "DFID"\n",
1209                        PLOGID(&loghandle->lgh_id), index,
1210                        PLOGID(&cathandle->lgh_id));
1211         return rc;
1212 }
1213
1214 /* retain log in catalog, and zap it if log is empty */
1215 int llog_cat_retain_cb(const struct lu_env *env, struct llog_handle *cat,
1216                        struct llog_rec_hdr *rec, void *data)
1217 {
1218         struct llog_handle *log = NULL;
1219         int rc;
1220
1221         rc = llog_cat_process_common(env, cat, rec, &log);
1222
1223         /* The empty plain log was destroyed while processing */
1224         if (rc == LLOG_DEL_PLAIN || rc == LLOG_DEL_RECORD)
1225                 /* clear wrong catalog entry */
1226                 rc = llog_cat_cleanup(env, cat, log, rec->lrh_index);
1227         else if (!rc)
1228                 llog_retain(env, log);
1229
1230         if (log)
1231                 llog_handle_put(env, log);
1232
1233         return rc;
1234 }
1235 EXPORT_SYMBOL(llog_cat_retain_cb);
1236
1237