Whamcloud - gitweb
LU-7117 osp: set ptlrpc_request::rq_allow_replay properly
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog_cat.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  *
40  * Invariants in implementation:
41  * - we do not share logs among different OST<->MDS connections, so that
42  *   if an OST or MDS fails it need only look at log(s) relevant to itself
43  *
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
46  * Author: Mikhail Pershin <mike.pershin@intel.com>
47  */
48
49 #define DEBUG_SUBSYSTEM S_LOG
50
51
52 #include <obd_class.h>
53
54 #include "llog_internal.h"
55
56 /* Create a new log handle and add it to the open list.
57  * This log handle will be closed when all of the records in it are removed.
58  *
59  * Assumes caller has already pushed us into the kernel context and is locking.
60  */
61 static int llog_cat_new_log(const struct lu_env *env,
62                             struct llog_handle *cathandle,
63                             struct llog_handle *loghandle,
64                             struct thandle *th)
65 {
66         struct llog_thread_info *lgi = llog_info(env);
67         struct llog_logid_rec   *rec = &lgi->lgi_logid;
68         struct thandle *handle = NULL;
69         struct dt_device *dt = NULL;
70         struct llog_log_hdr     *llh = cathandle->lgh_hdr;
71         int                      rc, index;
72
73         ENTRY;
74
75         index = (cathandle->lgh_last_idx + 1) %
76                 (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ? (cfs_fail_val + 1) :
77                                                 LLOG_HDR_BITMAP_SIZE(llh));
78
79         /* check that new llog index will not overlap with the first one.
80          * - llh_cat_idx is the index just before the first/oldest still in-use
81          *      index in catalog
82          * - lgh_last_idx is the last/newest used index in catalog
83          *
84          * When catalog is not wrapped yet then lgh_last_idx is always larger
85          * than llh_cat_idx. After the wrap around lgh_last_idx re-starts
86          * from 0 and llh_cat_idx becomes the upper limit for it
87          *
88          * Check if catalog has already wrapped around or not by comparing
89          * last_idx and cat_idx */
90         if ((index == llh->llh_cat_idx + 1 && llh->llh_count > 1) ||
91             (index == 0 && llh->llh_cat_idx == 0)) {
92                 CWARN("%s: there are no more free slots in catalog\n",
93                       loghandle->lgh_ctxt->loc_obd->obd_name);
94                 RETURN(-ENOSPC);
95         }
96
97         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
98                 RETURN(-ENOSPC);
99
100         if (loghandle->lgh_hdr != NULL) {
101                 /* If llog object is remote and creation is failed, lgh_hdr
102                  * might be left over here, free it first */
103                 LASSERT(!llog_exist(loghandle));
104                 OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
105                 loghandle->lgh_hdr = NULL;
106         }
107
108         if (th == NULL) {
109                 dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
110
111                 handle = dt_trans_create(env, dt);
112                 if (IS_ERR(handle))
113                         RETURN(PTR_ERR(handle));
114
115                 /* Create update llog object synchronously, which
116                  * happens during inialization process see
117                  * lod_sub_prep_llog(), to make sure the update
118                  * llog object is created before corss-MDT writing
119                  * updates into the llog object */
120                 if (cathandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
121                         handle->th_sync = 1;
122
123                 handle->th_wait_submit = 1;
124
125                 rc = llog_declare_create(env, loghandle, handle);
126                 if (rc != 0)
127                         GOTO(out, rc);
128
129                 rec->lid_hdr.lrh_len = sizeof(*rec);
130                 rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
131                 rec->lid_id = loghandle->lgh_id;
132                 rc = llog_declare_write_rec(env, cathandle, &rec->lid_hdr, -1,
133                                             handle);
134                 if (rc != 0)
135                         GOTO(out, rc);
136
137                 rc = dt_trans_start_local(env, dt, handle);
138                 if (rc != 0)
139                         GOTO(out, rc);
140
141                 th = handle;
142         }
143
144         rc = llog_create(env, loghandle, th);
145         /* if llog is already created, no need to initialize it */
146         if (rc == -EEXIST) {
147                 GOTO(out, rc = 0);
148         } else if (rc != 0) {
149                 CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
150                        loghandle->lgh_ctxt->loc_obd->obd_name, rc);
151                 GOTO(out, rc);
152         }
153
154         rc = llog_init_handle(env, loghandle,
155                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
156                               &cathandle->lgh_hdr->llh_tgtuuid);
157         if (rc < 0)
158                 GOTO(out, rc);
159
160         /* build the record for this log in the catalog */
161         rec->lid_hdr.lrh_len = sizeof(*rec);
162         rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
163         rec->lid_id = loghandle->lgh_id;
164
165         /* append the new record into catalog. The new index will be
166          * assigned to the record and updated in rec header */
167         rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
168                             &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
169         if (rc < 0)
170                 GOTO(out_destroy, rc);
171
172         CDEBUG(D_OTHER, "new plain log "DOSTID":%x for index %u of catalog"
173                DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi),
174                loghandle->lgh_id.lgl_ogen, rec->lid_hdr.lrh_index,
175                POSTID(&cathandle->lgh_id.lgl_oi));
176
177         loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
178 out:
179         if (handle != NULL) {
180                 handle->th_result = rc >= 0 ? 0 : rc;
181                 dt_trans_stop(env, dt, handle);
182         }
183         RETURN(rc);
184
185 out_destroy:
186         /* to signal llog_cat_close() it shouldn't try to destroy the llog,
187          * we want to destroy it in this transaction, otherwise the object
188          * becomes an orphan */
189         loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
190         /* this is to mimic full log, so another llog_cat_current_log()
191          * can skip it and ask for another onet */
192         loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) + 1;
193         llog_trans_destroy(env, loghandle, th);
194         RETURN(rc);
195 }
196
197 /* Open an existent log handle and add it to the open list.
198  * This log handle will be closed when all of the records in it are removed.
199  *
200  * Assumes caller has already pushed us into the kernel context and is locking.
201  * We return a lock on the handle to ensure nobody yanks it from us.
202  *
203  * This takes extra reference on llog_handle via llog_handle_get() and require
204  * this reference to be put by caller using llog_handle_put()
205  */
206 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
207                        struct llog_handle **res, struct llog_logid *logid)
208 {
209         struct llog_handle      *loghandle;
210         enum llog_flag           fmt;
211         int                      rc = 0;
212
213         ENTRY;
214
215         if (cathandle == NULL)
216                 RETURN(-EBADF);
217
218         fmt = cathandle->lgh_hdr->llh_flags & LLOG_F_EXT_MASK;
219         down_write(&cathandle->lgh_lock);
220         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
221                             u.phd.phd_entry) {
222                 struct llog_logid *cgl = &loghandle->lgh_id;
223
224                 if (ostid_id(&cgl->lgl_oi) == ostid_id(&logid->lgl_oi) &&
225                     ostid_seq(&cgl->lgl_oi) == ostid_seq(&logid->lgl_oi)) {
226                         if (cgl->lgl_ogen != logid->lgl_ogen) {
227                                 CERROR("%s: log "DOSTID" generation %x != %x\n",
228                                        loghandle->lgh_ctxt->loc_obd->obd_name,
229                                        POSTID(&logid->lgl_oi), cgl->lgl_ogen,
230                                        logid->lgl_ogen);
231                                 continue;
232                         }
233                         loghandle->u.phd.phd_cat_handle = cathandle;
234                         up_write(&cathandle->lgh_lock);
235                         GOTO(out, rc = 0);
236                 }
237         }
238         up_write(&cathandle->lgh_lock);
239
240         rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
241                        LLOG_OPEN_EXISTS);
242         if (rc < 0) {
243                 CERROR("%s: error opening log id "DOSTID":%x: rc = %d\n",
244                        cathandle->lgh_ctxt->loc_obd->obd_name,
245                        POSTID(&logid->lgl_oi), logid->lgl_ogen, rc);
246                 RETURN(rc);
247         }
248
249         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
250         if (rc < 0) {
251                 llog_close(env, loghandle);
252                 loghandle = NULL;
253                 RETURN(rc);
254         }
255
256         down_write(&cathandle->lgh_lock);
257         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
258         up_write(&cathandle->lgh_lock);
259
260         loghandle->u.phd.phd_cat_handle = cathandle;
261         loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
262         loghandle->u.phd.phd_cookie.lgc_index =
263                                 loghandle->lgh_hdr->llh_cat_idx;
264         EXIT;
265 out:
266         llog_handle_get(loghandle);
267         *res = loghandle;
268         return 0;
269 }
270
271 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
272 {
273         struct llog_handle      *loghandle, *n;
274         int                      rc;
275
276         ENTRY;
277
278         list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
279                                  u.phd.phd_entry) {
280                 struct llog_log_hdr     *llh = loghandle->lgh_hdr;
281                 int                      index;
282
283                 /* unlink open-not-created llogs */
284                 list_del_init(&loghandle->u.phd.phd_entry);
285                 llh = loghandle->lgh_hdr;
286                 if (loghandle->lgh_obj != NULL && llh != NULL &&
287                     (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
288                     (llh->llh_count == 1)) {
289                         rc = llog_destroy(env, loghandle);
290                         if (rc)
291                                 CERROR("%s: failure destroying log during "
292                                        "cleanup: rc = %d\n",
293                                        loghandle->lgh_ctxt->loc_obd->obd_name,
294                                        rc);
295
296                         index = loghandle->u.phd.phd_cookie.lgc_index;
297                         llog_cat_cleanup(env, cathandle, NULL, index);
298                 }
299                 llog_close(env, loghandle);
300         }
301         /* if handle was stored in ctxt, remove it too */
302         if (cathandle->lgh_ctxt->loc_handle == cathandle)
303                 cathandle->lgh_ctxt->loc_handle = NULL;
304         rc = llog_close(env, cathandle);
305         RETURN(rc);
306 }
307 EXPORT_SYMBOL(llog_cat_close);
308
/**
 * Lockdep subclasses for nested struct llog_handle::lgh_lock locking:
 * the catalog lock is taken first, a plain llog lock is nested inside it.
 */
enum {
        LLOGH_CAT = 0,  /* lgh_lock of a catalog handle */
        LLOGH_LOG = 1   /* lgh_lock of a plain llog handle */
};
316
317 /** Return the currently active log handle.  If the current log handle doesn't
318  * have enough space left for the current record, start a new one.
319  *
320  * If reclen is 0, we only want to know what the currently active log is,
321  * otherwise we get a lock on this log so nobody can steal our space.
322  *
323  * Assumes caller has already pushed us into the kernel context and is locking.
324  *
325  * NOTE: loghandle is write-locked upon successful return
326  */
327 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
328                                                 struct thandle *th)
329 {
330         struct llog_handle *loghandle = NULL;
331         ENTRY;
332
333
334         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) {
335                 down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
336                 GOTO(next, loghandle);
337         }
338
339         down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
340         loghandle = cathandle->u.chd.chd_current_log;
341         if (loghandle) {
342                 struct llog_log_hdr *llh;
343
344                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
345                 llh = loghandle->lgh_hdr;
346                 if (llh == NULL || !llog_is_full(loghandle)) {
347                         up_read(&cathandle->lgh_lock);
348                         RETURN(loghandle);
349                 } else {
350                         up_write(&loghandle->lgh_lock);
351                 }
352         }
353         up_read(&cathandle->lgh_lock);
354
355         /* time to use next log */
356
357         /* first, we have to make sure the state hasn't changed */
358         down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
359         loghandle = cathandle->u.chd.chd_current_log;
360         if (loghandle) {
361                 struct llog_log_hdr *llh;
362
363                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
364                 llh = loghandle->lgh_hdr;
365                 LASSERT(llh);
366                 if (!llog_is_full(loghandle)) {
367                         up_write(&cathandle->lgh_lock);
368                         RETURN(loghandle);
369                 } else {
370                         up_write(&loghandle->lgh_lock);
371                 }
372         }
373
374 next:
375         CDEBUG(D_INODE, "use next log\n");
376
377         loghandle = cathandle->u.chd.chd_next_log;
378         cathandle->u.chd.chd_current_log = loghandle;
379         cathandle->u.chd.chd_next_log = NULL;
380         down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
381         up_write(&cathandle->lgh_lock);
382         LASSERT(loghandle);
383         RETURN(loghandle);
384 }
385
386 static int llog_cat_update_header(const struct lu_env *env,
387                            struct llog_handle *cathandle)
388 {
389         struct llog_handle *loghandle;
390         int rc;
391         ENTRY;
392
393         /* refresh llog */
394         down_write(&cathandle->lgh_lock);
395         if (!cathandle->lgh_stale) {
396                 up_write(&cathandle->lgh_lock);
397                 RETURN(0);
398         }
399         list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
400                             u.phd.phd_entry) {
401                 if (!llog_exist(loghandle))
402                         continue;
403
404                 rc = llog_read_header(env, loghandle, NULL);
405                 if (rc != 0) {
406                         up_write(&cathandle->lgh_lock);
407                         GOTO(out, rc);
408                 }
409         }
410         rc = llog_read_header(env, cathandle, NULL);
411         if (rc == 0)
412                 cathandle->lgh_stale = 0;
413         up_write(&cathandle->lgh_lock);
414         if (rc != 0)
415                 GOTO(out, rc);
416 out:
417         RETURN(rc);
418 }
419
420 /* Add a single record to the recovery log(s) using a catalog
421  * Returns as llog_write_record
422  *
423  * Assumes caller has already pushed us into the kernel context.
424  */
425 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
426                      struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
427                      struct thandle *th)
428 {
429         struct llog_handle *loghandle;
430         int rc, retried = 0;
431         ENTRY;
432
433         LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size);
434
435 retry:
436         loghandle = llog_cat_current_log(cathandle, th);
437         LASSERT(!IS_ERR(loghandle));
438
439         /* loghandle is already locked by llog_cat_current_log() for us */
440         if (!llog_exist(loghandle)) {
441                 rc = llog_cat_new_log(env, cathandle, loghandle, th);
442                 if (rc < 0) {
443                         up_write(&loghandle->lgh_lock);
444                         /* nobody should be trying to use this llog */
445                         down_write(&cathandle->lgh_lock);
446                         if (cathandle->u.chd.chd_current_log == loghandle)
447                                 cathandle->u.chd.chd_current_log = NULL;
448                         up_write(&cathandle->lgh_lock);
449                         RETURN(rc);
450                 }
451         }
452         /* now let's try to add the record */
453         rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th);
454         if (rc < 0) {
455                 CDEBUG_LIMIT(rc == -ENOSPC ? D_HA : D_ERROR,
456                              "llog_write_rec %d: lh=%p\n", rc, loghandle);
457                 /* -ENOSPC is returned if no empty records left
458                  * and when it's lack of space on the stogage.
459                  * there is no point to try again if it's the second
460                  * case. many callers (like llog test) expect ENOSPC,
461                  * so we preserve this error code, but look for the
462                  * actual cause here */
463                 if (rc == -ENOSPC && llog_is_full(loghandle))
464                         rc = -ENOBUFS;
465         }
466         up_write(&loghandle->lgh_lock);
467
468         if (rc == -ENOBUFS) {
469                 if (retried++ == 0)
470                         GOTO(retry, rc);
471                 CERROR("%s: error on 2nd llog: rc = %d\n",
472                        cathandle->lgh_ctxt->loc_obd->obd_name, rc);
473         }
474
475         RETURN(rc);
476 }
477 EXPORT_SYMBOL(llog_cat_add_rec);
478
/* Declare, against transaction \a th, the addition of \a rec through the
 * catalog \a cathandle.
 *
 * Steps, in order:
 * - make sure the catalog has a current plain llog handle and, if it
 *   already has one, a next plain llog handle (both opened with
 *   LLOG_OPEN_NEW and linked into the catalog list);
 * - if the current llog does not exist yet: for a remote catalog object
 *   create it right away through llog_cat_new_log() with its own
 *   transaction, otherwise declare its creation and the catalog record in
 *   \a th;
 * - declare the write of \a rec into the current llog, refreshing the
 *   headers once via llog_cat_update_header() when -ESTALE is returned;
 * - repeat the create-or-declare step for the next llog, if any.
 *
 * \retval 0 or positive value from the last declare call on success
 * \retval -EIO if -ESTALE is seen while the catalog is already marked stale
 * \retval other negative errno from the open/declare/create steps
 */
int llog_cat_declare_add_rec(const struct lu_env *env,
                             struct llog_handle *cathandle,
                             struct llog_rec_hdr *rec, struct thandle *th)
{
        struct llog_thread_info *lgi = llog_info(env);
        struct llog_logid_rec   *lirec = &lgi->lgi_logid;
        struct llog_handle      *loghandle, *next;
        int                      rc = 0;

        ENTRY;

        /* Each pointer is tested once without the lock and re-tested under
         * the catalog write lock before a new handle is opened. */
        if (cathandle->u.chd.chd_current_log == NULL) {
                /* declare new plain llog */
                down_write(&cathandle->lgh_lock);
                if (cathandle->u.chd.chd_current_log == NULL) {
                        rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
                                       NULL, NULL, LLOG_OPEN_NEW);
                        if (rc == 0) {
                                cathandle->u.chd.chd_current_log = loghandle;
                                list_add_tail(&loghandle->u.phd.phd_entry,
                                              &cathandle->u.chd.chd_head);
                        }
                }
                up_write(&cathandle->lgh_lock);
        } else if (cathandle->u.chd.chd_next_log == NULL) {
                /* declare next plain llog */
                down_write(&cathandle->lgh_lock);
                if (cathandle->u.chd.chd_next_log == NULL) {
                        rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
                                       NULL, NULL, LLOG_OPEN_NEW);
                        if (rc == 0) {
                                cathandle->u.chd.chd_next_log = loghandle;
                                list_add_tail(&loghandle->u.phd.phd_entry,
                                              &cathandle->u.chd.chd_head);
                        }
                }
                up_write(&cathandle->lgh_lock);
        }
        if (rc)
                GOTO(out, rc);

        /* only the length of the catalog logid record is filled in here
         * for the declarations below */
        lirec->lid_hdr.lrh_len = sizeof(*lirec);

        if (!llog_exist(cathandle->u.chd.chd_current_log)) {
                if (dt_object_remote(cathandle->lgh_obj)) {
                        /* For remote operation, if we put the llog object
                         * creation in the current transaction, then the
                         * llog object will not be created on the remote
                         * target until the transaction stop, if other
                         * operations start before the transaction stop,
                         * and use the same llog object, will be dependent
                         * on the success of this transaction. So let's
                         * create the llog object synchronously here to
                         * remove the dependency. */
create_again:
                        down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
                        loghandle = cathandle->u.chd.chd_current_log;
                        down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                        /* still stale after a refresh attempt: give up */
                        if (cathandle->lgh_stale) {
                                up_write(&loghandle->lgh_lock);
                                up_read(&cathandle->lgh_lock);
                                GOTO(out, rc = -EIO);
                        }
                        /* re-check under the llog lock; th == NULL makes
                         * llog_cat_new_log() use its own transaction */
                        if (!llog_exist(loghandle)) {
                                rc = llog_cat_new_log(env, cathandle, loghandle,
                                                      NULL);
                                if (rc == -ESTALE)
                                        cathandle->lgh_stale = 1;
                        }
                        up_write(&loghandle->lgh_lock);
                        up_read(&cathandle->lgh_lock);
                        if (rc == -ESTALE) {
                                /* refresh the headers, then try once more */
                                rc = llog_cat_update_header(env, cathandle);
                                if (rc != 0)
                                        GOTO(out, rc);
                                goto create_again;
                        } else if (rc < 0) {
                                GOTO(out, rc);
                        }
                } else {
                        rc = llog_declare_create(env,
                                        cathandle->u.chd.chd_current_log, th);
                        if (rc)
                                GOTO(out, rc);
                        /* NOTE(review): the result of this declaration is
                         * not checked - confirm this is intentional */
                        llog_declare_write_rec(env, cathandle,
                                               &lirec->lid_hdr, -1, th);
                }
        }

write_again:
        /* declare records in the llogs */
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
                                    rec, -1, th);
        if (rc == -ESTALE) {
                down_write(&cathandle->lgh_lock);
                /* already marked stale: do not retry, report -EIO */
                if (cathandle->lgh_stale) {
                        up_write(&cathandle->lgh_lock);
                        GOTO(out, rc = -EIO);
                }

                /* mark stale, refresh the headers and declare again */
                cathandle->lgh_stale = 1;
                up_write(&cathandle->lgh_lock);
                rc = llog_cat_update_header(env, cathandle);
                if (rc != 0)
                        GOTO(out, rc);
                goto write_again;
        } else if (rc < 0) {
                GOTO(out, rc);
        }

        /* same create-or-declare handling for the pre-opened next llog */
        next = cathandle->u.chd.chd_next_log;
        if (next) {
                if (!llog_exist(next)) {
                        if (dt_object_remote(cathandle->lgh_obj)) {
                                /* For remote operation, if we put the llog
                                 * object creation in the current transaction,
                                 * then the llog object will not be created on
                                 * the remote target until the transaction stop,
                                 * if other operations start before the
                                 * transaction stop, and use the same llog
                                 * object, will be dependent on the success of
                                 * this transaction. So let's create the llog
                                 * object synchronously here to remove the
                                 * dependency. */
                                down_read_nested(&cathandle->lgh_lock,
                                                 LLOGH_CAT);
                                /* re-read the pointer under the lock */
                                next = cathandle->u.chd.chd_next_log;
                                down_write_nested(&next->lgh_lock, LLOGH_LOG);
                                if (!llog_exist(next))
                                        rc = llog_cat_new_log(env, cathandle,
                                                              next, NULL);
                                up_write(&next->lgh_lock);
                                up_read(&cathandle->lgh_lock);
                                if (rc < 0)
                                        GOTO(out, rc);
                        } else {
                                /* NOTE(review): rc of the create declaration
                                 * is returned as is, and the result of the
                                 * write declaration is not checked - confirm
                                 * this is intentional */
                                rc = llog_declare_create(env, next, th);
                                llog_declare_write_rec(env, cathandle,
                                                &lirec->lid_hdr, -1, th);
                        }
                }
                /* XXX: we hope for declarations made for existing llog
                 *      this might be not correct with some backends
                 *      where declarations are expected against specific
                 *      object like ZFS with full debugging enabled */
                /*llog_declare_write_rec(env, next, rec, -1, th);*/
        }
out:
        RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_declare_add_rec);
630
631 int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
632                  struct llog_rec_hdr *rec, struct llog_cookie *reccookie)
633 {
634         struct llog_ctxt        *ctxt;
635         struct dt_device        *dt;
636         struct thandle          *th = NULL;
637         int                      rc;
638
639         ctxt = cathandle->lgh_ctxt;
640         LASSERT(ctxt);
641         LASSERT(ctxt->loc_exp);
642
643         LASSERT(cathandle->lgh_obj != NULL);
644         dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
645
646         th = dt_trans_create(env, dt);
647         if (IS_ERR(th))
648                 RETURN(PTR_ERR(th));
649
650         rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
651         if (rc)
652                 GOTO(out_trans, rc);
653
654         rc = dt_trans_start_local(env, dt, th);
655         if (rc)
656                 GOTO(out_trans, rc);
657         rc = llog_cat_add_rec(env, cathandle, rec, reccookie, th);
658 out_trans:
659         dt_trans_stop(env, dt, th);
660         RETURN(rc);
661 }
662 EXPORT_SYMBOL(llog_cat_add);
663
664 /* For each cookie in the cookie array, we clear the log in-use bit and either:
665  * - the log is empty, so mark it free in the catalog header and delete it
666  * - the log is not empty, just write out the log header
667  *
668  * The cookies may be in different log files, so we need to get new logs
669  * each time.
670  *
671  * Assumes caller has already pushed us into the kernel context.
672  */
673 int llog_cat_cancel_records(const struct lu_env *env,
674                             struct llog_handle *cathandle, int count,
675                             struct llog_cookie *cookies)
676 {
677         int i, index, rc = 0, failed = 0;
678
679         ENTRY;
680
681         for (i = 0; i < count; i++, cookies++) {
682                 struct llog_handle      *loghandle;
683                 struct llog_logid       *lgl = &cookies->lgc_lgl;
684                 int                      lrc;
685
686                 rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
687                 if (rc) {
688                         CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
689                                cathandle->lgh_ctxt->loc_obd->obd_name,
690                                POSTID(&lgl->lgl_oi), rc);
691                         failed++;
692                         continue;
693                 }
694
695                 lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
696                 if (lrc == LLOG_DEL_PLAIN) { /* log has been destroyed */
697                         index = loghandle->u.phd.phd_cookie.lgc_index;
698                         rc = llog_cat_cleanup(env, cathandle, loghandle,
699                                               index);
700                 } else if (lrc == -ENOENT) {
701                         if (rc == 0) /* ENOENT shouldn't rewrite any error */
702                                 rc = lrc;
703                 } else if (lrc < 0) {
704                         failed++;
705                         rc = lrc;
706                 }
707                 llog_handle_put(loghandle);
708         }
709         if (rc)
710                 CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
711                        cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
712                        rc);
713
714         RETURN(rc);
715 }
716 EXPORT_SYMBOL(llog_cat_cancel_records);
717
718 static int llog_cat_process_cb(const struct lu_env *env,
719                                struct llog_handle *cat_llh,
720                                struct llog_rec_hdr *rec, void *data)
721 {
722         struct llog_process_data *d = data;
723         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
724         struct llog_handle *llh;
725         struct llog_log_hdr *hdr;
726         int rc;
727
728         ENTRY;
729         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
730                 CERROR("invalid record in catalog\n");
731                 RETURN(-EINVAL);
732         }
733         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
734                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
735                rec->lrh_index, POSTID(&cat_llh->lgh_id.lgl_oi));
736
737         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
738         if (rc) {
739                 CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
740                        cat_llh->lgh_ctxt->loc_obd->obd_name,
741                        POSTID(&lir->lid_id.lgl_oi), rc);
742                 if (rc == -ENOENT || rc == -ESTALE) {
743                         /* After a server crash, a stub of index
744                          * record in catlog could be kept, because
745                          * plain log destroy + catlog index record
746                          * deletion are not atomic. So we end up with
747                          * an index but no actual record. Destroy the
748                          * index and move on. */
749                         rc = llog_cat_cleanup(env, cat_llh, NULL,
750                                               rec->lrh_index);
751                 }
752
753                 RETURN(rc);
754         }
755
756         /* clean old empty llogs, do not consider current llog in use */
757         /* ignore remote (lgh_obj=NULL) llogs */
758         hdr = llh->lgh_hdr;
759         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
760             hdr->llh_count == 1 && cat_llh->lgh_obj != NULL &&
761             llh != cat_llh->u.chd.chd_current_log) {
762                 rc = llog_destroy(env, llh);
763                 if (rc)
764                         CERROR("%s: fail to destroy empty log: rc = %d\n",
765                                llh->lgh_ctxt->loc_obd->obd_name, rc);
766                 GOTO(out, rc = LLOG_DEL_PLAIN);
767         }
768
769         if (rec->lrh_index < d->lpd_startcat) {
770                 /* Skip processing of the logs until startcat */
771                 rc = 0;
772         } else if (d->lpd_startidx > 0) {
773                 struct llog_process_cat_data cd;
774
775                 cd.lpcd_first_idx = d->lpd_startidx;
776                 cd.lpcd_last_idx = 0;
777                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
778                                           &cd, false);
779                 /* Continue processing the next log from idx 0 */
780                 d->lpd_startidx = 0;
781         } else {
782                 rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
783                                           NULL, false);
784         }
785
786 out:
787         /* The empty plain log was destroyed while processing */
788         if (rc == LLOG_DEL_PLAIN)
789                 rc = llog_cat_cleanup(env, cat_llh, llh,
790                                       llh->u.phd.phd_cookie.lgc_index);
791         llog_handle_put(llh);
792
793         RETURN(rc);
794 }
795
796 int llog_cat_process_or_fork(const struct lu_env *env,
797                              struct llog_handle *cat_llh, llog_cb_t cat_cb,
798                              llog_cb_t cb, void *data, int startcat,
799                              int startidx, bool fork)
800 {
801         struct llog_process_data d;
802         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
803         int rc;
804         ENTRY;
805
806         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
807         d.lpd_data = data;
808         d.lpd_cb = cb;
809         d.lpd_startcat = startcat;
810         d.lpd_startidx = startidx;
811
812         if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
813             llh->llh_count > 1) {
814                 struct llog_process_cat_data cd;
815
816                 CWARN("catlog "DOSTID" crosses index zero\n",
817                       POSTID(&cat_llh->lgh_id.lgl_oi));
818
819                 cd.lpcd_first_idx = llh->llh_cat_idx;
820                 cd.lpcd_last_idx = 0;
821                 rc = llog_process_or_fork(env, cat_llh, cat_cb,
822                                           &d, &cd, fork);
823                 if (rc != 0)
824                         RETURN(rc);
825
826                 cd.lpcd_first_idx = 0;
827                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
828                 rc = llog_process_or_fork(env, cat_llh, cat_cb,
829                                           &d, &cd, fork);
830         } else {
831                 rc = llog_process_or_fork(env, cat_llh, cat_cb,
832                                           &d, NULL, fork);
833         }
834
835         RETURN(rc);
836 }
837
838 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
839                      llog_cb_t cb, void *data, int startcat, int startidx)
840 {
841         return llog_cat_process_or_fork(env, cat_llh, llog_cat_process_cb,
842                                         cb, data, startcat, startidx, false);
843 }
844 EXPORT_SYMBOL(llog_cat_process);
845
846 static int llog_cat_size_cb(const struct lu_env *env,
847                              struct llog_handle *cat_llh,
848                              struct llog_rec_hdr *rec, void *data)
849 {
850         struct llog_process_data *d = data;
851         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
852         struct llog_handle *llh;
853         int rc;
854         __u64 *cum_size = d->lpd_data;
855         __u64 size;
856
857         ENTRY;
858         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
859                 CERROR("%s: invalid record in catalog, rc = %d\n",
860                        cat_llh->lgh_ctxt->loc_obd->obd_name, -EINVAL);
861                 RETURN(-EINVAL);
862         }
863         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
864                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
865                rec->lrh_index, POSTID(&cat_llh->lgh_id.lgl_oi));
866
867         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
868         if (rc) {
869                 CWARN("%s: cannot find handle for llog "DOSTID": rc = %d\n",
870                       cat_llh->lgh_ctxt->loc_obd->obd_name,
871                       POSTID(&lir->lid_id.lgl_oi), rc);
872                 RETURN(0);
873         }
874         size = llog_size(env, llh);
875         *cum_size += size;
876
877         CDEBUG(D_INFO, "Add llog entry "DOSTID" size "LPU64"\n",
878                POSTID(&llh->lgh_id.lgl_oi), size);
879
880         llog_handle_put(llh);
881
882         RETURN(0);
883
884 }
885
886 __u64 llog_cat_size(const struct lu_env *env, struct llog_handle *cat_llh)
887 {
888         __u64 size = llog_size(env, cat_llh);
889
890         llog_cat_process_or_fork(env, cat_llh, llog_cat_size_cb,
891                                  NULL, &size, 0, 0, false);
892
893         return size;
894 }
895 EXPORT_SYMBOL(llog_cat_size);
896
897 static int llog_cat_reverse_process_cb(const struct lu_env *env,
898                                        struct llog_handle *cat_llh,
899                                        struct llog_rec_hdr *rec, void *data)
900 {
901         struct llog_process_data *d = data;
902         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
903         struct llog_handle *llh;
904         struct llog_log_hdr *hdr;
905         int rc;
906
907         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
908                 CERROR("invalid record in catalog\n");
909                 RETURN(-EINVAL);
910         }
911         CDEBUG(D_HA, "processing log "DOSTID":%x at index %u of catalog "
912                DOSTID"\n", POSTID(&lir->lid_id.lgl_oi), lir->lid_id.lgl_ogen,
913                le32_to_cpu(rec->lrh_index), POSTID(&cat_llh->lgh_id.lgl_oi));
914
915         rc = llog_cat_id2handle(env, cat_llh, &llh, &lir->lid_id);
916         if (rc) {
917                 CERROR("%s: cannot find handle for llog "DOSTID": %d\n",
918                        cat_llh->lgh_ctxt->loc_obd->obd_name,
919                        POSTID(&lir->lid_id.lgl_oi), rc);
920                 if (rc == -ENOENT || rc == -ESTALE) {
921                         /* After a server crash, a stub of index
922                          * record in catlog could be kept, because
923                          * plain log destroy + catlog index record
924                          * deletion are not atomic. So we end up with
925                          * an index but no actual record. Destroy the
926                          * index and move on. */
927                         rc = llog_cat_cleanup(env, cat_llh, NULL,
928                                               rec->lrh_index);
929                 }
930
931                 RETURN(rc);
932         }
933
934         /* clean old empty llogs, do not consider current llog in use */
935         hdr = llh->lgh_hdr;
936         if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
937             hdr->llh_count == 1 &&
938             llh != cat_llh->u.chd.chd_current_log) {
939                 rc = llog_destroy(env, llh);
940                 if (rc)
941                         CERROR("%s: fail to destroy empty log: rc = %d\n",
942                                llh->lgh_ctxt->loc_obd->obd_name, rc);
943                 GOTO(out, rc = LLOG_DEL_PLAIN);
944         }
945
946         rc = llog_reverse_process(env, llh, d->lpd_cb, d->lpd_data, NULL);
947
948 out:
949         /* The empty plain was destroyed while processing */
950         if (rc == LLOG_DEL_PLAIN)
951                 rc = llog_cat_cleanup(env, cat_llh, llh,
952                                       llh->u.phd.phd_cookie.lgc_index);
953
954         llog_handle_put(llh);
955         RETURN(rc);
956 }
957
958 int llog_cat_reverse_process(const struct lu_env *env,
959                              struct llog_handle *cat_llh,
960                              llog_cb_t cb, void *data)
961 {
962         struct llog_process_data d;
963         struct llog_process_cat_data cd;
964         struct llog_log_hdr *llh = cat_llh->lgh_hdr;
965         int rc;
966         ENTRY;
967
968         LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
969         d.lpd_data = data;
970         d.lpd_cb = cb;
971
972         if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
973             llh->llh_count > 1) {
974                 CWARN("catalog "DOSTID" crosses index zero\n",
975                       POSTID(&cat_llh->lgh_id.lgl_oi));
976
977                 cd.lpcd_first_idx = 0;
978                 cd.lpcd_last_idx = cat_llh->lgh_last_idx;
979                 rc = llog_reverse_process(env, cat_llh,
980                                           llog_cat_reverse_process_cb,
981                                           &d, &cd);
982                 if (rc != 0)
983                         RETURN(rc);
984
985                 cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
986                 cd.lpcd_last_idx = 0;
987                 rc = llog_reverse_process(env, cat_llh,
988                                           llog_cat_reverse_process_cb,
989                                           &d, &cd);
990         } else {
991                 rc = llog_reverse_process(env, cat_llh,
992                                           llog_cat_reverse_process_cb,
993                                           &d, NULL);
994         }
995
996         RETURN(rc);
997 }
998 EXPORT_SYMBOL(llog_cat_reverse_process);
999
1000 static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
1001 {
1002         struct llog_log_hdr *llh = cathandle->lgh_hdr;
1003         int bitmap_size;
1004
1005         ENTRY;
1006
1007         bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
1008         /*
1009          * The llh_cat_idx equals to the first used index minus 1
1010          * so if we canceled the first index then llh_cat_idx
1011          * must be renewed.
1012          */
1013         if (llh->llh_cat_idx == (idx - 1)) {
1014                 llh->llh_cat_idx = idx;
1015
1016                 while (idx != cathandle->lgh_last_idx) {
1017                         idx = (idx + 1) % bitmap_size;
1018                         if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
1019                                 /* update llh_cat_idx for each unset bit,
1020                                  * expecting the next one is set */
1021                                 llh->llh_cat_idx = idx;
1022                         } else if (idx == 0) {
1023                                 /* skip header bit */
1024                                 llh->llh_cat_idx = 0;
1025                                 continue;
1026                         } else {
1027                                 /* the first index is found */
1028                                 break;
1029                         }
1030                 }
1031
1032                 CDEBUG(D_RPCTRACE, "Set catlog "DOSTID" first idx %u,"
1033                        " (last_idx %u)\n", POSTID(&cathandle->lgh_id.lgl_oi),
1034                        llh->llh_cat_idx, cathandle->lgh_last_idx);
1035         }
1036
1037         RETURN(0);
1038 }
1039
1040 /* Cleanup deleted plain llog traces from catalog */
1041 int llog_cat_cleanup(const struct lu_env *env, struct llog_handle *cathandle,
1042                      struct llog_handle *loghandle, int index)
1043 {
1044         int rc;
1045
1046         LASSERT(index);
1047         if (loghandle != NULL) {
1048                 /* remove destroyed llog from catalog list and
1049                  * chd_current_log variable */
1050                 down_write(&cathandle->lgh_lock);
1051                 if (cathandle->u.chd.chd_current_log == loghandle)
1052                         cathandle->u.chd.chd_current_log = NULL;
1053                 list_del_init(&loghandle->u.phd.phd_entry);
1054                 up_write(&cathandle->lgh_lock);
1055                 LASSERT(index == loghandle->u.phd.phd_cookie.lgc_index);
1056                 /* llog was opened and keep in a list, close it now */
1057                 llog_close(env, loghandle);
1058         }
1059
1060         /* do not attempt to cleanup on-disk llog if on client side */
1061         if (cathandle->lgh_obj == NULL)
1062                 return 0;
1063
1064         /* remove plain llog entry from catalog by index */
1065         llog_cat_set_first_idx(cathandle, index);
1066         rc = llog_cancel_rec(env, cathandle, index);
1067         if (rc == 0)
1068                 CDEBUG(D_HA, "cancel plain log at index"
1069                        " %u of catalog "DOSTID"\n",
1070                        index, POSTID(&cathandle->lgh_id.lgl_oi));
1071         return rc;
1072 }