Whamcloud - gitweb
LU-7419 llog: lock new llog object creation
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog_cat.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  *
40  * Invariants in implementation:
41  * - we do not share logs among different OST<->MDS connections, so that
42  *   if an OST or MDS fails it need only look at log(s) relevant to itself
43  *
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
46  * Author: Mikhail Pershin <mike.pershin@intel.com>
47  */
48
49 #define DEBUG_SUBSYSTEM S_LOG
50
51
52 #include <obd_class.h>
53
54 #include "llog_internal.h"
55
56 /* Create a new log handle and add it to the open list.
57  * This log handle will be closed when all of the records in it are removed.
58  *
59  * Assumes caller has already pushed us into the kernel context and is locking.
60  */
61 static int llog_cat_new_log(const struct lu_env *env,
62                             struct llog_handle *cathandle,
63                             struct llog_handle *loghandle,
64                             struct thandle *th)
65 {
66         struct llog_thread_info *lgi = llog_info(env);
67         struct llog_logid_rec   *rec = &lgi->lgi_logid;
68         struct thandle *handle = NULL;
69         struct dt_device *dt = NULL;
70         struct llog_log_hdr     *llh = cathandle->lgh_hdr;
71         int                      rc, index;
72
73         ENTRY;
74
75         index = (cathandle->lgh_last_idx + 1) %
76                 (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ? (cfs_fail_val + 1) :
77                                                 LLOG_HDR_BITMAP_SIZE(llh));
78
79         /* check that new llog index will not overlap with the first one.
80          * - llh_cat_idx is the index just before the first/oldest still in-use
81          *      index in catalog
82          * - lgh_last_idx is the last/newest used index in catalog
83          *
84          * When catalog is not wrapped yet then lgh_last_idx is always larger
85          * than llh_cat_idx. After the wrap around lgh_last_idx re-starts
86          * from 0 and llh_cat_idx becomes the upper limit for it
87          *
88          * Check if catalog has already wrapped around or not by comparing
89          * last_idx and cat_idx */
90         if ((index == llh->llh_cat_idx + 1 && llh->llh_count > 1) ||
91             (index == 0 && llh->llh_cat_idx == 0)) {
92                 CWARN("%s: there are no more free slots in catalog\n",
93                       loghandle->lgh_ctxt->loc_obd->obd_name);
94                 RETURN(-ENOSPC);
95         }
96
97         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
98                 RETURN(-ENOSPC);
99
100         if (loghandle->lgh_hdr != NULL) {
101                 /* If llog object is remote and creation is failed, lgh_hdr
102                  * might be left over here, free it first */
103                 LASSERT(!llog_exist(loghandle));
104                 OBD_FREE_PTR(loghandle->lgh_hdr);
105                 loghandle->lgh_hdr = NULL;
106         }
107
108         if (th == NULL) {
109                 dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
110
111                 handle = dt_trans_create(env, dt);
112                 if (IS_ERR(handle))
113                         RETURN(PTR_ERR(handle));
114
115                 /* Create update llog object synchronously, which
116                  * happens during inialization process see
117                  * lod_sub_prep_llog(), to make sure the update
118                  * llog object is created before corss-MDT writing
119                  * updates into the llog object */
120                 if (cathandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
121                         handle->th_sync = 1;
122
123                 handle->th_wait_submit = 1;
124
125                 rc = llog_declare_create(env, loghandle, handle);
126                 if (rc != 0)
127                         GOTO(out, rc);
128
129                 rec->lid_hdr.lrh_len = sizeof(*rec);
130                 rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
131                 rec->lid_id = loghandle->lgh_id;
132                 rc = llog_declare_write_rec(env, cathandle, &rec->lid_hdr, -1,
133                                             handle);
134                 if (rc != 0)
135                         GOTO(out, rc);
136
137                 rc = dt_trans_start_local(env, dt, handle);
138                 if (rc != 0)
139                         GOTO(out, rc);
140
141                 th = handle;
142         }
143
144         rc = llog_create(env, loghandle, th);
145         /* if llog is already created, no need to initialize it */
146         if (rc == -EEXIST) {
147                 GOTO(out, rc = 0);
148         } else if (rc != 0) {
149                 CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
150                        loghandle->lgh_ctxt->loc_obd->obd_name, rc);
151                 GOTO(out, rc);
152         }
153
154         rc = llog_init_handle(env, loghandle,
155                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
156                               &cathandle->lgh_hdr->llh_tgtuuid);
157         if (rc < 0)
158                 GOTO(out, rc);
159
160         /* build the record for this log in the catalog */
161         rec->lid_hdr.lrh_len = sizeof(*rec);
162         rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
163         rec->lid_id = loghandle->lgh_id;
164
165         /* append the new record into catalog. The new index will be
166          * assigned to the record and updated in rec header */
167         rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
168                             &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
169         if (rc < 0)
170                 GOTO(out_destroy, rc);
171
172         CDEBUG(D_OTHER, "new plain log "DOSTID":%x for index %u of catalog"
173                DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi),
174                loghandle->lgh_id.lgl_ogen, rec->lid_hdr.lrh_index,
175                POSTID(&cathandle->lgh_id.lgl_oi));
176
177         loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
178 out:
179         if (handle != NULL)
180                 dt_trans_stop(env, dt, handle);
181
182         RETURN(0);
183
184 out_destroy:
185         /* to signal llog_cat_close() it shouldn't try to destroy the llog,
186          * we want to destroy it in this transaction, otherwise the object
187          * becomes an orphan */
188         loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
189         /* this is to mimic full log, so another llog_cat_current_log()
190          * can skip it and ask for another onet */
191         loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) + 1;
192         llog_trans_destroy(env, loghandle, th);
193         RETURN(rc);
194 }
195
196 /* Open an existent log handle and add it to the open list.
197  * This log handle will be closed when all of the records in it are removed.
198  *
199  * Assumes caller has already pushed us into the kernel context and is locking.
200  * We return a lock on the handle to ensure nobody yanks it from us.
201  *
202  * This takes extra reference on llog_handle via llog_handle_get() and require
203  * this reference to be put by caller using llog_handle_put()
204  */
205 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
206                        struct llog_handle **res, struct llog_logid *logid)
207 {
208         struct llog_handle      *loghandle;
209         enum llog_flag           fmt;
210         int                      rc = 0;
211
212         ENTRY;
213
214         if (cathandle == NULL)
215                 RETURN(-EBADF);
216
217         fmt = cathandle->lgh_hdr->llh_flags & LLOG_F_EXT_MASK;
218         down_write(&cathandle->lgh_lock);
219         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
220                             u.phd.phd_entry) {
221                 struct llog_logid *cgl = &loghandle->lgh_id;
222
223                 if (ostid_id(&cgl->lgl_oi) == ostid_id(&logid->lgl_oi) &&
224                     ostid_seq(&cgl->lgl_oi) == ostid_seq(&logid->lgl_oi)) {
225                         if (cgl->lgl_ogen != logid->lgl_ogen) {
226                                 CERROR("%s: log "DOSTID" generation %x != %x\n",
227                                        loghandle->lgh_ctxt->loc_obd->obd_name,
228                                        POSTID(&logid->lgl_oi), cgl->lgl_ogen,
229                                        logid->lgl_ogen);
230                                 continue;
231                         }
232                         loghandle->u.phd.phd_cat_handle = cathandle;
233                         up_write(&cathandle->lgh_lock);
234                         GOTO(out, rc = 0);
235                 }
236         }
237         up_write(&cathandle->lgh_lock);
238
239         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
240                        LLOG_OPEN_EXISTS);
241         if (rc < 0) {
242                 CERROR("%s: error opening log id "DOSTID":%x: rc = %d\n",
243                        cathandle->lgh_ctxt->loc_obd->obd_name,
244                        POSTID(&logid->lgl_oi), logid->lgl_ogen, rc);
245                 RETURN(rc);
246         }
247
248         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
249         if (rc < 0) {
250                 llog_close(env, loghandle);
251                 loghandle = NULL;
252                 RETURN(rc);
253         }
254
255         down_write(&cathandle->lgh_lock);
256         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
257         up_write(&cathandle->lgh_lock);
258
259         loghandle->u.phd.phd_cat_handle = cathandle;
260         loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
261         loghandle->u.phd.phd_cookie.lgc_index =
262                                 loghandle->lgh_hdr->llh_cat_idx;
263         EXIT;
264 out:
265         llog_handle_get(loghandle);
266         *res = loghandle;
267         return 0;
268 }
269
270 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
271 {
272         struct llog_handle      *loghandle, *n;
273         int                      rc;
274
275         ENTRY;
276
277         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
278                                  u.phd.phd_entry) {
279                 struct llog_log_hdr     *llh = loghandle->lgh_hdr;
280                 int                      index;
281
282                 /* unlink open-not-created llogs */
283                 list_del_init(&loghandle->u.phd.phd_entry);
284                 llh = loghandle->lgh_hdr;
285                 if (loghandle->lgh_obj != NULL && llh != NULL &&
286                     (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
287                     (llh->llh_count == 1)) {
288                         rc = llog_destroy(env, loghandle);
289                         if (rc)
290                                 CERROR("%s: failure destroying log during "
291                                        "cleanup: rc = %d\n",
292                                        loghandle->lgh_ctxt->loc_obd->obd_name,
293                                        rc);
294
295                         index = loghandle->u.phd.phd_cookie.lgc_index;
296                         llog_cat_cleanup(env, cathandle, NULL, index);
297                 }
298                 llog_close(env, loghandle);
299         }
300         /* if handle was stored in ctxt, remove it too */
301         if (cathandle->lgh_ctxt->loc_handle == cathandle)
302                 cathandle->lgh_ctxt->loc_handle = NULL;
303         rc = llog_close(env, cathandle);
304         RETURN(rc);
305 }
306 EXPORT_SYMBOL(llog_cat_close);
307
/**
 * Subclass values passed to down_read_nested()/down_write_nested() when
 * struct llog_handle::lgh_lock of a catalog and of a plain llog are nested.
 */
enum {
	LLOGH_CAT = 0,	/* lgh_lock of the catalog handle */
	LLOGH_LOG = 1	/* lgh_lock of a plain llog handle */
};
315
316 /** Return the currently active log handle.  If the current log handle doesn't
317  * have enough space left for the current record, start a new one.
318  *
319  * If reclen is 0, we only want to know what the currently active log is,
320  * otherwise we get a lock on this log so nobody can steal our space.
321  *
322  * Assumes caller has already pushed us into the kernel context and is locking.
323  *
324  * NOTE: loghandle is write-locked upon successful return
325  */
326 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
327                                                 struct thandle *th)
328 {
329         struct llog_handle *loghandle = NULL;
330         ENTRY;
331
332
333         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) {
334                 down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
335                 GOTO(next, loghandle);
336         }
337
338         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
339         loghandle = cathandle->u.chd.chd_current_log;
340         if (loghandle) {
341                 struct llog_log_hdr *llh;
342
343                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
344                 llh = loghandle->lgh_hdr;
345                 if (llh == NULL || !llog_is_full(loghandle)) {
346                         up_read(&cathandle->lgh_lock);
347                         RETURN(loghandle);
348                 } else {
349                         up_write(&loghandle->lgh_lock);
350                 }
351         }
352         up_read(&cathandle->lgh_lock);
353
354         /* time to use next log */
355
356         /* first, we have to make sure the state hasn't changed */
357         down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
358         loghandle = cathandle->u.chd.chd_current_log;
359         if (loghandle) {
360                 struct llog_log_hdr *llh;
361
362                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
363                 llh = loghandle->lgh_hdr;
364                 LASSERT(llh);
365                 if (!llog_is_full(loghandle)) {
366                         up_write(&cathandle->lgh_lock);
367                         RETURN(loghandle);
368                 } else {
369                         up_write(&loghandle->lgh_lock);
370                 }
371         }
372
373 next:
374         CDEBUG(D_INODE, "use next log\n");
375
376         loghandle = cathandle->u.chd.chd_next_log;
377         cathandle->u.chd.chd_current_log = loghandle;
378         cathandle->u.chd.chd_next_log = NULL;
379         down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
380         up_write(&cathandle->lgh_lock);
381         LASSERT(loghandle);
382         RETURN(loghandle);
383 }
384
385 /* Add a single record to the recovery log(s) using a catalog
386  * Returns as llog_write_record
387  *
388  * Assumes caller has already pushed us into the kernel context.
389  */
390 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
391                      struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
392                      struct thandle *th)
393 {
394         struct llog_handle *loghandle;
395         int rc, retried = 0;
396         ENTRY;
397
398         LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size);
399
400 retry:
401         loghandle = llog_cat_current_log(cathandle, th);
402         LASSERT(!IS_ERR(loghandle));
403
404         /* loghandle is already locked by llog_cat_current_log() for us */
405         if (!llog_exist(loghandle)) {
406                 rc = llog_cat_new_log(env, cathandle, loghandle, th);
407                 if (rc < 0) {
408                         up_write(&loghandle->lgh_lock);
409                         /* nobody should be trying to use this llog */
410                         down_write(&cathandle->lgh_lock);
411                         if (cathandle->u.chd.chd_current_log == loghandle)
412                                 cathandle->u.chd.chd_current_log = NULL;
413                         up_write(&cathandle->lgh_lock);
414                         RETURN(rc);
415                 }
416         }
417         /* now let's try to add the record */
418         rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th);
419         if (rc < 0) {
420                 CDEBUG_LIMIT(rc == -ENOSPC ? D_HA : D_ERROR,
421                              "llog_write_rec %d: lh=%p\n", rc, loghandle);
422                 /* -ENOSPC is returned if no empty records left
423                  * and when it's lack of space on the stogage.
424                  * there is no point to try again if it's the second
425                  * case. many callers (like llog test) expect ENOSPC,
426                  * so we preserve this error code, but look for the
427                  * actual cause here */
428                 if (rc == -ENOSPC && llog_is_full(loghandle))
429                         rc = -ENOBUFS;
430         }
431         up_write(&loghandle->lgh_lock);
432
433         if (rc == -ENOBUFS) {
434                 if (retried++ == 0)
435                         GOTO(retry, rc);
436                 CERROR("%s: error on 2nd llog: rc = %d\n",
437                        cathandle->lgh_ctxt->loc_obd->obd_name, rc);
438         }
439
440         RETURN(rc);
441 }
442 EXPORT_SYMBOL(llog_cat_add_rec);
443
444 int llog_cat_declare_add_rec(const struct lu_env *env,
445                              struct llog_handle *cathandle,
446                              struct llog_rec_hdr *rec, struct thandle *th)
447 {
448         struct llog_thread_info *lgi = llog_info(env);
449         struct llog_logid_rec   *lirec = &lgi->lgi_logid;
450         struct llog_handle      *loghandle, *next;
451         int                      rc = 0;
452
453         ENTRY;
454
455         if (cathandle->u.chd.chd_current_log == NULL) {
456                 /* declare new plain llog */
457                 down_write(&cathandle->lgh_lock);
458                 if (cathandle->u.chd.chd_current_log == NULL) {
459                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
460                                        NULL, NULL, LLOG_OPEN_NEW);
461                         if (rc == 0) {
462                                 cathandle->u.chd.chd_current_log = loghandle;
463                                 list_add_tail(&loghandle->u.phd.phd_entry,
464                                               &cathandle->u.chd.chd_head);
465                         }
466                 }
467                 up_write(&cathandle->lgh_lock);
468         } else if (cathandle->u.chd.chd_next_log == NULL) {
469                 /* declare next plain llog */
470                 down_write(&cathandle->lgh_lock);
471                 if (cathandle->u.chd.chd_next_log == NULL) {
472                         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
473                                        NULL, NULL, LLOG_OPEN_NEW);
474                         if (rc == 0) {
475                                 cathandle->u.chd.chd_next_log = loghandle;
476                                 list_add_tail(&loghandle->u.phd.phd_entry,
477                                               &cathandle->u.chd.chd_head);
478                         }
479                 }
480                 up_write(&cathandle->lgh_lock);
481         }
482         if (rc)
483                 GOTO(out, rc);
484
485         lirec->lid_hdr.lrh_len = sizeof(*lirec);
486
487         if (!llog_exist(cathandle->u.chd.chd_current_log)) {
488                 if (dt_object_remote(cathandle->lgh_obj)) {
489                         /* For remote operation, if we put the llog object
490                          * creation in the current transaction, then the
491                          * llog object will not be created on the remote
492                          * target until the transaction stop, if other
493                          * operations start before the transaction stop,
494                          * and use the same llog object, will be dependent
495                          * on the success of this transaction. So let's
496                          * create the llog object synchronously here to
497                          * remove the dependency. */
498                         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
499                         loghandle = cathandle->u.chd.chd_current_log;
500                         down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
501                         if (!llog_exist(loghandle))
502                                 rc = llog_cat_new_log(env, cathandle, loghandle,
503                                                       NULL);
504                         up_write(&loghandle->lgh_lock);
505                         up_read(&cathandle->lgh_lock);
506                         if (rc < 0)
507                                 GOTO(out, rc);
508
509                 } else {
510                         rc = llog_declare_create(env,
511                                         cathandle->u.chd.chd_current_log, th);
512                         if (rc)
513                                 GOTO(out, rc);
514                         llog_declare_write_rec(env, cathandle,
515                                                &lirec->lid_hdr, -1, th);
516                 }
517         }
518         /* declare records in the llogs */
519         rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
520                                     rec, -1, th);
521         if (rc)
522                 GOTO(out, rc);
523
524         next = cathandle->u.chd.chd_next_log;
525         if (next) {
526                 if (!llog_exist(next)) {
527                         if (dt_object_remote(cathandle->lgh_obj)) {
528                                 /* For remote operation, if we put the llog
529                                  * object creation in the current transaction,
530                                  * then the llog object will not be created on
531                                  * the remote target until the transaction stop,
532                                  * if other operations start before the
533                                  * transaction stop, and use the same llog
534                                  * object, will be dependent on the success of
535                                  * this transaction. So let's create the llog
536                                  * object synchronously here to remove the
537                                  * dependency. */
538                                 down_read_nested(&cathandle->lgh_lock,
539                                                  LLOGH_CAT);
540                                 next = cathandle->u.chd.chd_next_log;
541                                 down_write_nested(&next->lgh_lock, LLOGH_LOG);
542                                 if (!llog_exist(next))
543                                         rc = llog_cat_new_log(env, cathandle,
544                                                               next, NULL);
545                                 up_write(&next->lgh_lock);
546                                 up_read(&cathandle->lgh_lock);
547                                 if (rc < 0)
548                                         GOTO(out, rc);
549                         } else {
550                                 rc = llog_declare_create(env, next, th);
551                                 llog_declare_write_rec(env, cathandle,
552                                                 &lirec->lid_hdr, -1, th);
553                         }
554                 }
555                 /* XXX: we hope for declarations made for existing llog
556                  *      this might be not correct with some backends
557                  *      where declarations are expected against specific
558                  *      object like ZFS with full debugging enabled */
559                 /*llog_declare_write_rec(env, next, rec, -1, th);*/
560         }
561 out:
562         RETURN(rc);
563 }
564 EXPORT_SYMBOL(llog_cat_declare_add_rec);
565
566 int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
567                  struct llog_rec_hdr *rec, struct llog_cookie *reccookie)
568 {
569         struct llog_ctxt        *ctxt;
570         struct dt_device        *dt;
571         struct thandle          *th = NULL;
572         int                      rc;
573
574         ctxt = cathandle->lgh_ctxt;
575         LASSERT(ctxt);
576         LASSERT(ctxt->loc_exp);
577
578         LASSERT(cathandle->lgh_obj != NULL);
579         dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
580
581         th = dt_trans_create(env, dt);
582         if (IS_ERR(th))
583                 RETURN(PTR_ERR(th));
584
585         rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
586         if (rc)
587                 GOTO(out_trans, rc);
588
589         rc = dt_trans_start_local(env, dt, th);
590         if (rc)
591                 GOTO(out_trans, rc);
592         rc = llog_cat_add_rec(env, cathandle, rec, reccookie, th);
593 out_trans:
594         dt_trans_stop(env, dt, th);
595         RETURN(rc);
596 }
597 EXPORT_SYMBOL(llog_cat_add);
598
599 /* For each cookie in the cookie array, we clear the log in-use bit and either:
600  * - the log is empty, so mark it free in the catalog header and delete it
601  * - the log is not empty, just write out the log header
602  *
603  * The cookies may be in different log files, so we need to get new logs
604  * each time.
605  *
606  * Assumes caller has already pushed us into the kernel context.
607  */
608 int llog_cat_cancel_records(const struct lu_env *env,
609                             struct llog_handle *cathandle, int count,
610                             struct llog_cookie *cookies)
611 {
612         int i, index, rc = 0, failed = 0;
613
614         ENTRY;
615
616         for (i = 0; i < count; i++, cookies++) {
617                 struct llog_handle      *loghandle;
618                 struct llog_logid       *lgl = &cookies->lgc_lgl;
619                 int                      lrc;
620
621                 rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
622                 if (rc) {
623                         CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
624                                cathandle->lgh_ctxt->loc_obd->obd_name,
625                                POSTID(&lgl->lgl_oi), rc);
626                         failed++;
627                         continue;
628                 }
629
630                 lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
631                 if (lrc == LLOG_DEL_PLAIN) { /* log has been destroyed */
632                         index = loghandle->u.phd.phd_cookie.lgc_index;
633                         rc = llog_cat_cleanup(env, cathandle, loghandle,
634                                               index);
635                 } else if (lrc == -ENOENT) {
636                         if (rc == 0) /* ENOENT shouldn't rewrite any error */
637                                 rc = lrc;
638                 } else if (lrc < 0) {
639                         failed++;
640                         rc = lrc;
641                 }
642                 llog_handle_put(loghandle);
643         }
644         if (rc)
645                 CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
646                        cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
647                        rc);
648
649         RETURN(rc);
650 }
651 EXPORT_SYMBOL(llog_cat_cancel_records);
652
653 static int llog_cat_process_cb(const struct lu_env *env,
654                                struct llog_handle *cat_llh,
655                                struct llog_rec_hdr *rec, void *data)
656 {
657         struct llog_process_data *d = data;
658         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
659         struct llog_handle *llh;
660         struct llog_log_hdr *hdr;
661         int rc;
662
663         ENTRY;
664         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
665                 CERROR("invalid record in catalog\n");
666                 RETURN(-EINVAL);
667         }
668         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
669                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
670                rec->lrh_index, POSTID(&cat_llh->lgh_id.lgl_oi));
671
672         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
673         if (rc) {
674                 CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
675                        cat_llh->lgh_ctxt->loc_obd->obd_name,
676                        POSTID(&lir->lid_id.lgl_oi), rc);
677                 if (rc == -ENOENT || rc == -ESTALE) {
678                         /* After a server crash, a stub of index
679                          * record in catlog could be kept, because
680                          * plain log destroy + catlog index record
681                          * deletion are not atomic. So we end up with
682                          * an index but no actual record. Destroy the
683                          * index and move on. */
684                         rc = llog_cat_cleanup(env, cat_llh, NULL,
685                                               rec->lrh_index);
686                 }
687
688                 RETURN(rc);
689         }
690
691         /* clean old empty llogs, do not consider current llog in use */
692         /* ignore remote (lgh_obj=NULL) llogs */
693         hdr = llh->lgh_hdr;
694         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
695             hdr->llh_count == 1 && cat_llh->lgh_obj != NULL &&
696             llh != cat_llh->u.chd.chd_current_log) {
697                 rc = llog_destroy(env, llh);
698                 if (rc)
699                         CERROR("%s: fail to destroy empty log: rc = %d\n",
700                                llh->lgh_ctxt->loc_obd->obd_name, rc);
701                 GOTO(out, rc = LLOG_DEL_PLAIN);
702         }
703
704         if (rec->lrh_index < d->lpd_startcat) {
705                 /* Skip processing of the logs until startcat */
706                 rc = 0;
707         } else if (d->lpd_startidx > 0) {
708                 struct llog_process_cat_data cd;
709
710                 cd.lpcd_first_idx = d->lpd_startidx;
711                 cd.lpcd_last_idx = 0;
712                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
713                                           &cd, false);
714                 /* Continue processing the next log from idx 0 */
715                 d->lpd_startidx = 0;
716         } else {
717                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
718                                           NULL, false);
719         }
720
721 out:
722         /* The empty plain log was destroyed while processing */
723         if (rc == LLOG_DEL_PLAIN)
724                 rc = llog_cat_cleanup(env, cat_llh, llh,
725                                       llh->u.phd.phd_cookie.lgc_index);
726         llog_handle_put(llh);
727
728         RETURN(rc);
729 }
730
731 int llog_cat_process_or_fork(const struct lu_env *env,
732                              struct llog_handle *cat_llh,
733                              llog_cb_t cb, void *data, int startcat,
734                              int startidx, bool fork)
735 {
736         struct llog_process_data d;
737         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
738         int rc;
739         ENTRY;
740
741         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
742         d.lpd_data = data;
743         d.lpd_cb = cb;
744         d.lpd_startcat = startcat;
745         d.lpd_startidx = startidx;
746
747         if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
748             llh->llh_count > 1) {
749                 struct llog_process_cat_data cd;
750
751                 CWARN("catlog "DOSTID" crosses index zero\n",
752                       POSTID(&cat_llh->lgh_id.lgl_oi));
753
754                 cd.lpcd_first_idx = llh->llh_cat_idx;
755                 cd.lpcd_last_idx = 0;
756                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
757                                           &d, &cd, fork);
758                 if (rc != 0)
759                         RETURN(rc);
760
761                 cd.lpcd_first_idx = 0;
762                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
763                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
764                                           &d, &cd, fork);
765         } else {
766                 rc = llog_process_or_fork(env, cat_llh, llog_cat_process_cb,
767                                           &d, NULL, fork);
768         }
769
770         RETURN(rc);
771 }
772
773 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
774                      llog_cb_t cb, void *data, int startcat, int startidx)
775 {
776         return llog_cat_process_or_fork(env, cat_llh, cb, data, startcat,
777                                         startidx, false);
778 }
779 EXPORT_SYMBOL(llog_cat_process);
780
781 static int llog_cat_reverse_process_cb(const struct lu_env *env,
782                                        struct llog_handle *cat_llh,
783                                        struct llog_rec_hdr *rec, void *data)
784 {
785         struct llog_process_data *d = data;
786         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
787         struct llog_handle *llh;
788         struct llog_log_hdr *hdr;
789         int rc;
790
791         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
792                 CERROR("invalid record in catalog\n");
793                 RETURN(-EINVAL);
794         }
795         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
796                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
797                le32_to_cpu(rec->lrh_index), POSTID(&cat_llh->lgh_id.lgl_oi));
798
799         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
800         if (rc) {
801                 CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
802                        cat_llh->lgh_ctxt->loc_obd->obd_name,
803                        POSTID(&lir->lid_id.lgl_oi), rc);
804                 if (rc == -ENOENT || rc == -ESTALE) {
805                         /* After a server crash, a stub of index
806                          * record in catlog could be kept, because
807                          * plain log destroy + catlog index record
808                          * deletion are not atomic. So we end up with
809                          * an index but no actual record. Destroy the
810                          * index and move on. */
811                         rc = llog_cat_cleanup(env, cat_llh, NULL,
812                                               rec->lrh_index);
813                 }
814
815                 RETURN(rc);
816         }
817
818         /* clean old empty llogs, do not consider current llog in use */
819         hdr = llh->lgh_hdr;
820         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
821             hdr->llh_count == 1 &&
822             llh != cat_llh->u.chd.chd_current_log) {
823                 rc = llog_destroy(env, llh);
824                 if (rc)
825                         CERROR("%s: fail to destroy empty log: rc = %d\n",
826                                llh->lgh_ctxt->loc_obd->obd_name, rc);
827                 GOTO(out, rc = LLOG_DEL_PLAIN);
828         }
829
830         rc = llog_reverse_process(env, llh, d->lpd_cb, d->lpd_data, NULL);
831
832 out:
833         /* The empty plain was destroyed while processing */
834         if (rc == LLOG_DEL_PLAIN)
835                 rc = llog_cat_cleanup(env, cat_llh, llh,
836                                       llh->u.phd.phd_cookie.lgc_index);
837
838         llog_handle_put(llh);
839         RETURN(rc);
840 }
841
842 int llog_cat_reverse_process(const struct lu_env *env,
843                              struct llog_handle *cat_llh,
844                              llog_cb_t cb, void *data)
845 {
846         struct llog_process_data d;
847         struct llog_process_cat_data cd;
848         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
849         int rc;
850         ENTRY;
851
852         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
853         d.lpd_data = data;
854         d.lpd_cb = cb;
855
856         if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
857             llh->llh_count > 1) {
858                 CWARN("catalog "DOSTID" crosses index zero\n",
859                       POSTID(&cat_llh->lgh_id.lgl_oi));
860
861                 cd.lpcd_first_idx = 0;
862                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
863                 rc = llog_reverse_process(env, cat_llh,
864                                           llog_cat_reverse_process_cb,
865                                           &d, &cd);
866                 if (rc != 0)
867                         RETURN(rc);
868
869                 cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
870                 cd.lpcd_last_idx = 0;
871                 rc = llog_reverse_process(env, cat_llh,
872                                           llog_cat_reverse_process_cb,
873                                           &d, &cd);
874         } else {
875                 rc = llog_reverse_process(env, cat_llh,
876                                           llog_cat_reverse_process_cb,
877                                           &d, NULL);
878         }
879
880         RETURN(rc);
881 }
882 EXPORT_SYMBOL(llog_cat_reverse_process);
883
884 static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
885 {
886         struct llog_log_hdr *llh = cathandle->lgh_hdr;
887         int bitmap_size;
888
889         ENTRY;
890
891         bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
892         /*
893          * The llh_cat_idx equals to the first used index minus 1
894          * so if we canceled the first index then llh_cat_idx
895          * must be renewed.
896          */
897         if (llh->llh_cat_idx == (idx - 1)) {
898                 llh->llh_cat_idx = idx;
899
900                 while (idx != cathandle->lgh_last_idx) {
901                         idx = (idx + 1) % bitmap_size;
902                         if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
903                                 /* update llh_cat_idx for each unset bit,
904                                  * expecting the next one is set */
905                                 llh->llh_cat_idx = idx;
906                         } else if (idx == 0) {
907                                 /* skip header bit */
908                                 llh->llh_cat_idx = 0;
909                                 continue;
910                         } else {
911                                 /* the first index is found */
912                                 break;
913                         }
914                 }
915
916                 CDEBUG(D_RPCTRACE, "Set catlog "DOSTID" first idx %u,"
917                        " (last_idx %u)\n", POSTID(&cathandle->lgh_id.lgl_oi),
918                        llh->llh_cat_idx, cathandle->lgh_last_idx);
919         }
920
921         RETURN(0);
922 }
923
924 /* Cleanup deleted plain llog traces from catalog */
925 int llog_cat_cleanup(const struct lu_env *env, struct llog_handle *cathandle,
926                      struct llog_handle *loghandle, int index)
927 {
928         int rc;
929
930         LASSERT(index);
931         if (loghandle != NULL) {
932                 /* remove destroyed llog from catalog list and
933                  * chd_current_log variable */
934                 down_write(&cathandle->lgh_lock);
935                 if (cathandle->u.chd.chd_current_log == loghandle)
936                         cathandle->u.chd.chd_current_log = NULL;
937                 list_del_init(&loghandle->u.phd.phd_entry);
938                 up_write(&cathandle->lgh_lock);
939                 LASSERT(index == loghandle->u.phd.phd_cookie.lgc_index);
940                 /* llog was opened and keep in a list, close it now */
941                 llog_close(env, loghandle);
942         }
943
944         /* do not attempt to cleanup on-disk llog if on client side */
945         if (cathandle->lgh_obj == NULL)
946                 return 0;
947
948         /* remove plain llog entry from catalog by index */
949         llog_cat_set_first_idx(cathandle, index);
950         rc = llog_cancel_rec(env, cathandle, index);
951         if (rc == 0)
952                 CDEBUG(D_HA, "cancel plain log at index"
953                        " %u of catalog "DOSTID"\n",
954                        index, POSTID(&cathandle->lgh_id.lgl_oi));
955         return rc;
956 }
957
958 /* helper to initialize catalog llog and process it to cancel */
959 int llog_cat_init_and_process(const struct lu_env *env,
960                               struct llog_handle *llh)
961 {
962         int rc;
963
964         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, NULL);
965         if (rc)
966                 RETURN(rc);
967
968         RETURN(0);
969 }
970 EXPORT_SYMBOL(llog_cat_init_and_process);