Whamcloud - gitweb
LU-8888 clio: remove unused members from struct cl_thread_info
[fs/lustre-release.git] / lustre / obdclass / llog_test.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/llog_test.c
33  *
34  * Author: Phil Schwan <phil@clusterfs.com>
35  * Author: Mikhail Pershin <mike.pershin@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/module.h>
41 #include <linux/init.h>
42
43 #include <obd_class.h>
44 #include <lustre_fid.h>
45 #include <lustre_log.h>
46
47 /* This is slightly more than the number of records that can fit into a
48  * single llog file, because the llog_log_header takes up some of the
49  * space in the first block that cannot be used for the bitmap. */
50 #define LLOG_TEST_RECNUM  (LLOG_MIN_CHUNK_SIZE * 8)
51
52 static int llog_test_rand;
53 static struct obd_uuid uuid = { .uuid = "test_uuid" };
54 static struct llog_logid cat_logid;
55
56 struct llog_mini_rec {
57         struct llog_rec_hdr     lmr_hdr;
58         struct llog_rec_tail    lmr_tail;
59 } __attribute__((packed));
60
61 static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
62 {
63         int i;
64         int last_idx = 0;
65         int active_recs = 0;
66
67         for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
68                 if (ext2_test_bit(i, LLOG_HDR_BITMAP(llh->lgh_hdr))) {
69                         last_idx = i;
70                         active_recs++;
71                 }
72         }
73
74         /* check the llog is sane at first, llh_count and lgh_last_idx*/
75         if (llh->lgh_hdr->llh_count != active_recs) {
76                 CERROR("%s: handle->count is %d, but there are %d recs found\n",
77                        test, llh->lgh_hdr->llh_count, active_recs);
78                 RETURN(-ERANGE);
79         }
80
81         if (llh->lgh_last_idx != LLOG_HDR_TAIL(llh->lgh_hdr)->lrt_index ||
82             (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_CAT) &&
83              llh->lgh_last_idx < last_idx)) {
84                 CERROR("%s: lgh_last_idx is %d (%d in the header), last found %d\n",
85                        test, llh->lgh_last_idx,
86                        LLOG_HDR_TAIL(llh->lgh_hdr)->lrt_index, last_idx);
87                 RETURN(-ERANGE);
88         }
89
90         /* finally checks against expected value from the caller */
91         if (active_recs != num_recs) {
92                 CERROR("%s: expected %d active recs after write, found %d\n",
93                        test, num_recs, active_recs);
94                 RETURN(-ERANGE);
95         }
96
97         RETURN(0);
98 }
99
100 /* Test named-log create/open, close */
101 static int llog_test_1(const struct lu_env *env,
102                        struct obd_device *obd, char *name)
103 {
104         struct llog_handle      *llh;
105         struct llog_ctxt        *ctxt;
106         int rc;
107         int rc2;
108
109         ENTRY;
110
111         CWARN("1a: create a log with name: %s\n", name);
112         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
113         LASSERT(ctxt);
114
115         rc = llog_open_create(env, ctxt, &llh, NULL, name);
116         if (rc) {
117                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
118                 GOTO(out, rc);
119         }
120         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, &uuid);
121         if (rc) {
122                 CERROR("1a: can't init llog handle: %d\n", rc);
123                 GOTO(out_close, rc);
124         }
125
126         rc = verify_handle("1", llh, 1);
127
128         CWARN("1b: close newly-created log\n");
129 out_close:
130         rc2 = llog_close(env, llh);
131         if (rc2) {
132                 CERROR("1b: close log %s failed: %d\n", name, rc2);
133                 if (rc == 0)
134                         rc = rc2;
135         }
136 out:
137         llog_ctxt_put(ctxt);
138         RETURN(rc);
139 }
140
141 static int test_2_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
142                             struct llog_rec_hdr *rec, void *data)
143 {
144         return LLOG_DEL_RECORD;
145 }
146
147 /* Test named-log reopen; returns opened log on success */
148 static int llog_test_2(const struct lu_env *env, struct obd_device *obd,
149                        char *name, struct llog_handle **llh)
150 {
151         struct llog_ctxt        *ctxt;
152         struct llog_handle      *lgh;
153         struct llog_logid        logid;
154         int                      rc;
155         struct llog_mini_rec     lmr;
156
157         ENTRY;
158
159         CWARN("2a: re-open a log with name: %s\n", name);
160         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
161         LASSERT(ctxt);
162
163         rc = llog_open(env, ctxt, llh, NULL, name, LLOG_OPEN_EXISTS);
164         if (rc) {
165                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
166                 GOTO(out_put, rc);
167         }
168
169         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &uuid);
170         if (rc) {
171                 CERROR("2a: can't init llog handle: %d\n", rc);
172                 GOTO(out_close_llh, rc);
173         }
174
175         rc = verify_handle("2", *llh, 1);
176         if (rc)
177                 GOTO(out_close_llh, rc);
178
179         CWARN("2b: create a log without specified NAME & LOGID\n");
180         rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
181         if (rc) {
182                 CERROR("2b: create log failed\n");
183                 GOTO(out_close_llh, rc);
184         }
185         rc = llog_init_handle(env, lgh, LLOG_F_IS_PLAIN, &uuid);
186         if (rc) {
187                 CERROR("2b: can't init llog handle: %d\n", rc);
188                 GOTO(out_close, rc);
189         }
190
191         logid = lgh->lgh_id;
192
193         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
194         lmr.lmr_hdr.lrh_type = 0xf02f02;
195
196         /* Check llog header values are correct after record add/cancel */
197         CWARN("2b: write 1 llog records, check llh_count\n");
198         rc = llog_write(env, lgh, &lmr.lmr_hdr, LLOG_NEXT_IDX);
199         if (rc < 0)
200                 GOTO(out_close, rc);
201
202         /* in-memory values after record addition */
203         rc = verify_handle("2b", lgh, 2);
204         if (rc < 0)
205                 GOTO(out_close, rc);
206
207         /* re-open llog to read on-disk values */
208         llog_close(env, lgh);
209
210         CWARN("2c: re-open the log by LOGID and verify llh_count\n");
211         rc = llog_open(env, ctxt, &lgh, &logid, NULL, LLOG_OPEN_EXISTS);
212         if (rc < 0) {
213                 CERROR("2c: re-open log by LOGID failed\n");
214                 GOTO(out_close_llh, rc);
215         }
216
217         rc = llog_init_handle(env, lgh, LLOG_F_IS_PLAIN, &uuid);
218         if (rc < 0) {
219                 CERROR("2c: can't init llog handle: %d\n", rc);
220                 GOTO(out_close, rc);
221         }
222
223         /* check values just read from disk */
224         rc = verify_handle("2c", lgh, 2);
225         if (rc < 0)
226                 GOTO(out_close, rc);
227
228         rc = llog_process(env, lgh, test_2_cancel_cb, NULL, NULL);
229         if (rc < 0)
230                 GOTO(out_close, rc);
231
232         /* in-memory values */
233         rc = verify_handle("2c", lgh, 1);
234         if (rc < 0)
235                 GOTO(out_close, rc);
236
237         /* re-open llog to get on-disk values */
238         llog_close(env, lgh);
239
240         rc = llog_open(env, ctxt, &lgh, &logid, NULL, LLOG_OPEN_EXISTS);
241         if (rc) {
242                 CERROR("2c: re-open log by LOGID failed\n");
243                 GOTO(out_close_llh, rc);
244         }
245
246         rc = llog_init_handle(env, lgh, LLOG_F_IS_PLAIN, &uuid);
247         if (rc) {
248                 CERROR("2c: can't init llog handle: %d\n", rc);
249                 GOTO(out_close, rc);
250         }
251
252         /* on-disk values after llog re-open */
253         rc = verify_handle("2c", lgh, 1);
254         if (rc < 0)
255                 GOTO(out_close, rc);
256
257         CWARN("2d: destroy this log\n");
258         rc = llog_destroy(env, lgh);
259         if (rc)
260                 CERROR("2d: destroy log failed\n");
261 out_close:
262         llog_close(env, lgh);
263 out_close_llh:
264         if (rc)
265                 llog_close(env, *llh);
266 out_put:
267         llog_ctxt_put(ctxt);
268
269         RETURN(rc);
270 }
271
272 static int test_3_rec_num;
273 static off_t test_3_rec_off;
274 static int test_3_paddings;
275 static int test_3_start_idx;
276
277 /*
278  * Test 3 callback.
279  * - check lgh_cur_offset correctness
280  * - check record index consistency
281  * - modify each record in-place
282  * - add new record during *last_idx processing
283  */
284 static int test3_check_n_add_cb(const struct lu_env *env,
285                                 struct llog_handle *lgh,
286                                 struct llog_rec_hdr *rec, void *data)
287 {
288         struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec;
289         int *last_rec = data;
290         unsigned cur_idx = test_3_start_idx + test_3_rec_num;
291         int rc;
292
293         if (lgh->lgh_hdr->llh_flags & LLOG_F_IS_FIXSIZE) {
294                 LASSERT(lgh->lgh_hdr->llh_size > 0);
295                 if (lgh->lgh_cur_offset != lgh->lgh_hdr->llh_hdr.lrh_len +
296                                         (cur_idx - 1) * lgh->lgh_hdr->llh_size)
297                         CERROR("Wrong record offset in cur_off: %llu, should be %u\n",
298                                lgh->lgh_cur_offset,
299                                lgh->lgh_hdr->llh_hdr.lrh_len +
300                                (cur_idx - 1) * lgh->lgh_hdr->llh_size);
301         } else {
302                 size_t chunk_size = lgh->lgh_hdr->llh_hdr.lrh_len;
303
304                 /* For variable size records the start offset is unknown, trust
305                  * the first value and check others are consistent with it. */
306                 if (test_3_rec_off == 0)
307                         test_3_rec_off = lgh->lgh_cur_offset;
308
309                 if (lgh->lgh_cur_offset != test_3_rec_off) {
310                         __u64 tmp = lgh->lgh_cur_offset;
311
312                         /* there can be padding record */
313                         if ((do_div(tmp, chunk_size) == 0) &&
314                             (lgh->lgh_cur_offset - test_3_rec_off <
315                              rec->lrh_len + LLOG_MIN_REC_SIZE)) {
316                                 test_3_rec_off = lgh->lgh_cur_offset;
317                                 test_3_paddings++;
318                         } else {
319                                 CERROR("Wrong record offset in cur_off: %llu"
320                                        ", should be %lld (rec len %u)\n",
321                                        lgh->lgh_cur_offset,
322                                        (long long)test_3_rec_off,
323                                        rec->lrh_len);
324                         }
325                 }
326                 test_3_rec_off += rec->lrh_len;
327         }
328
329         cur_idx += test_3_paddings;
330         if (cur_idx != rec->lrh_index)
331                 CERROR("Record with wrong index was read: %u, expected %u\n",
332                        rec->lrh_index, cur_idx);
333
334         /* modify all records in place */
335         lgr->lgr_gen.conn_cnt = rec->lrh_index;
336         rc = llog_write(env, lgh, rec, rec->lrh_index);
337         if (rc < 0)
338                 CERROR("cb_test_3: cannot modify record while processing\n");
339
340         /* Add new record to the llog at *last_rec position one by one to
341          * check that last block is re-read during processing */
342         if (cur_idx == *last_rec || cur_idx == (*last_rec + 1)) {
343                 rc = llog_write(env, lgh, rec, LLOG_NEXT_IDX);
344                 if (rc < 0)
345                         CERROR("cb_test_3: cannot add new record while "
346                                "processing\n");
347         }
348         test_3_rec_num++;
349
350         return rc;
351 }
352
353 /* Check in-place modifications were done for all records*/
354 static int test3_check_cb(const struct lu_env *env, struct llog_handle *lgh,
355                           struct llog_rec_hdr *rec, void *data)
356 {
357         struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec;
358
359         if (lgr->lgr_gen.conn_cnt != rec->lrh_index) {
360                 CERROR("cb_test_3: record %u is not modified\n",
361                        rec->lrh_index);
362                 return -EINVAL;
363         }
364         test_3_rec_num++;
365         return 0;
366 }
367
368 static int llog_test3_process(const struct lu_env *env,
369                               struct llog_handle *lgh,
370                               llog_cb_t cb, int start)
371 {
372         struct llog_process_cat_data cd;
373         int last_idx; /* new record will be injected here */
374         int rc = 0;
375
376         CWARN("test3: processing records from index %d to the end\n",
377               start);
378         cd.lpcd_first_idx = start - 1;
379         cd.lpcd_last_idx = 0;
380         test_3_rec_num = test_3_paddings = 0;
381         last_idx = lgh->lgh_last_idx;
382         rc = llog_process(env, lgh, cb, &last_idx, &cd);
383         if (rc < 0)
384                 return rc;
385         CWARN("test3: total %u records processed with %u paddings\n",
386               test_3_rec_num, test_3_paddings);
387         return test_3_rec_num;
388 }
389
390 /* Test plain llog functionality */
391 static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
392                        struct llog_handle *llh)
393 {
394         char buf[128];
395         struct llog_rec_hdr *hdr = (void *)buf;
396         int rc, i;
397         int num_recs = 1; /* 1 for the header */
398         int expected;
399
400         ENTRY;
401
402         hdr->lrh_len = sizeof(struct llog_gen_rec);
403         hdr->lrh_type = LLOG_GEN_REC;
404         llh->lgh_hdr->llh_size = sizeof(struct llog_gen_rec);
405         llh->lgh_hdr->llh_flags |= LLOG_F_IS_FIXSIZE;
406
407         /* Fill the llog with 64-bytes records, use 1023 records,
408          * so last chunk will be partially full. Don't change this
409          * value until record size is changed.
410          */
411         CWARN("3a: write 1023 fixed-size llog records\n");
412         for (i = 0; i < 1023; i++) {
413                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
414                 if (rc < 0) {
415                         CERROR("3a: write 1023 records failed at #%d: %d\n",
416                                i + 1, rc);
417                         RETURN(rc);
418                 }
419                 num_recs++;
420         }
421
422         rc = verify_handle("3a", llh, num_recs);
423         if (rc)
424                 RETURN(rc);
425
426         /*
427          * Test fixed-size records processing:
428          * - search the needed index
429          * - go through all records from that index
430          * - check all indices are growing monotonically and exist
431          * - modify each record
432          *
433          * NB: test3_check_n_add adds two new records while processing
434          * after last record. There were 1023 records created so the last chunk
435          * misses exactly one record. Therefore one of new records will be
436          * the last in the current chunk and second causes the new chunk to be
437          * created.
438          */
439         test_3_rec_off = 0;
440         test_3_start_idx = 501;
441         expected = 525;
442         rc = llog_test3_process(env, llh, test3_check_n_add_cb,
443                                 test_3_start_idx);
444         if (rc < 0)
445                 RETURN(rc);
446
447         /* extra record is created during llog_process() */
448         if (rc != expected) {
449                 CERROR("3a: process total %d records but expect %d\n",
450                        rc, expected);
451                 RETURN(-ERANGE);
452         }
453
454         num_recs += 2;
455
456         /* test modification in place */
457         rc = llog_test3_process(env, llh, test3_check_cb, test_3_start_idx);
458         if (rc < 0)
459                 RETURN(rc);
460
461         if (rc != expected) {
462                 CERROR("3a: process total %d records but expect %d\n",
463                        rc, expected);
464                 RETURN(-ERANGE);
465         }
466
467         CWARN("3b: write 566 variable size llog records\n");
468
469         /* Drop llh_size to 0 to mark llog as variable-size and write
470          * header to make this change permanent. */
471         llh->lgh_hdr->llh_flags &= ~LLOG_F_IS_FIXSIZE;
472         llog_write(env, llh, &llh->lgh_hdr->llh_hdr, LLOG_HEADER_IDX);
473
474         hdr->lrh_type = OBD_CFG_REC;
475
476         /* there are 1025 64-bytes records in llog already,
477          * the last chunk contains single record, i.e. 64 bytes.
478          * Each pair of variable size records is 200 bytes, so
479          * we will have the following distribution per chunks:
480          * block 1: 64 + 80(80/120) + 80 + 48(pad) = 81 iterations
481          * block 2: 80(120/80) + 120 + 72(pad) = 81 itereations
482          * block 3: 80(80/120) + 80 + 112(pad) = 81 iterations
483          * -- the same as block 2 again and so on.
484          * block 7: 80(80/120) = 80 iterations and 192 bytes remain
485          * Total 6 * 81 + 80 = 566 itereations.
486          * Callback will add another 120 bytes in the end of the last chunk
487          * and another 120 bytes will cause padding (72 bytes) plus 120
488          * bytes in the new block.
489          */
490         for (i = 0; i < 566; i++) {
491                 if ((i % 2) == 0)
492                         hdr->lrh_len = 80;
493                 else
494                         hdr->lrh_len = 120;
495
496                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
497                 if (rc < 0) {
498                         CERROR("3b: write 566 records failed at #%d: %d\n",
499                                i + 1, rc);
500                         RETURN(rc);
501                 }
502                 num_recs++;
503         }
504
505         rc = verify_handle("3b", llh, num_recs);
506         if (rc)
507                 RETURN(rc);
508
509         test_3_start_idx = 1026;
510         expected = 568;
511         rc = llog_test3_process(env, llh, test3_check_n_add_cb,
512                                 test_3_start_idx);
513         if (rc < 0)
514                 RETURN(rc);
515
516         if (rc != expected) {
517                 CERROR("3b: process total %d records but expect %d\n",
518                        rc, expected);
519                 RETURN(-ERANGE);
520         }
521
522         num_recs += 2;
523
524         /* test modification in place */
525         rc = llog_test3_process(env, llh, test3_check_cb, test_3_start_idx);
526         if (rc < 0)
527                 RETURN(rc);
528
529         if (rc != expected) {
530                 CERROR("3b: process total %d records but expect %d\n",
531                        rc, expected);
532                 RETURN(-ERANGE);
533         }
534
535         CWARN("3c: write records with variable size until BITMAP_SIZE, "
536               "return -ENOSPC\n");
537         while (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
538                 if ((num_recs % 2) == 0)
539                         hdr->lrh_len = 80;
540                 else
541                         hdr->lrh_len = 128;
542
543                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
544                 if (rc == -ENOSPC) {
545                         break;
546                 } else if (rc < 0) {
547                         CERROR("3c: write recs failed at #%d: %d\n",
548                                num_recs, rc);
549                         RETURN(rc);
550                 }
551                 num_recs++;
552         }
553
554         if (rc != -ENOSPC) {
555                 CWARN("3c: write record more than BITMAP size!\n");
556                 RETURN(-EINVAL);
557         }
558         CWARN("3c: wrote %d more records before end of llog is reached\n",
559               num_recs);
560
561         rc = verify_handle("3c", llh, num_recs);
562
563         RETURN(rc);
564 }
565
566 /* Test catalogue additions */
567 static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
568 {
569         struct llog_handle      *cath;
570         char                     name[10];
571         int                      rc, rc2, i, buflen;
572         struct llog_mini_rec     lmr;
573         struct llog_cookie       cookie;
574         struct llog_ctxt        *ctxt;
575         int                      num_recs = 0;
576         char                    *buf;
577         struct llog_rec_hdr     *rec;
578
579         ENTRY;
580
581         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
582         LASSERT(ctxt);
583
584         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
585         lmr.lmr_hdr.lrh_type = 0xf00f00;
586
587         sprintf(name, "%x", llog_test_rand + 1);
588         CWARN("4a: create a catalog log with name: %s\n", name);
589         rc = llog_open_create(env, ctxt, &cath, NULL, name);
590         if (rc) {
591                 CERROR("4a: llog_create with name %s failed: %d\n", name, rc);
592                 GOTO(ctxt_release, rc);
593         }
594         rc = llog_init_handle(env, cath, LLOG_F_IS_CAT, &uuid);
595         if (rc) {
596                 CERROR("4a: can't init llog handle: %d\n", rc);
597                 GOTO(out, rc);
598         }
599
600         num_recs++;
601         cat_logid = cath->lgh_id;
602
603         CWARN("4b: write 1 record into the catalog\n");
604         rc = llog_cat_add(env, cath, &lmr.lmr_hdr, &cookie);
605         if (rc != 1) {
606                 CERROR("4b: write 1 catalog record failed at: %d\n", rc);
607                 GOTO(out, rc);
608         }
609         num_recs++;
610         rc = verify_handle("4b", cath, 2);
611         if (rc)
612                 GOTO(out, rc);
613
614         rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs);
615         if (rc)
616                 GOTO(out, rc);
617
618         CWARN("4c: cancel 1 log record\n");
619         rc = llog_cat_cancel_records(env, cath, 1, &cookie);
620         if (rc) {
621                 CERROR("4c: cancel 1 catalog based record failed: %d\n", rc);
622                 GOTO(out, rc);
623         }
624         num_recs--;
625
626         rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs);
627         if (rc)
628                 GOTO(out, rc);
629
630         CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM);
631         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
632                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
633                 if (rc) {
634                         CERROR("4d: write %d records failed at #%d: %d\n",
635                                LLOG_TEST_RECNUM, i + 1, rc);
636                         GOTO(out, rc);
637                 }
638                 num_recs++;
639         }
640
641         /* make sure new plain llog appears */
642         rc = verify_handle("4d", cath, 3);
643         if (rc)
644                 GOTO(out, rc);
645
646         CWARN("4e: add 5 large records, one record per block\n");
647         buflen = LLOG_MIN_CHUNK_SIZE;
648         OBD_ALLOC(buf, buflen);
649         if (buf == NULL)
650                 GOTO(out, rc = -ENOMEM);
651         for (i = 0; i < 5; i++) {
652                 rec = (void *)buf;
653                 rec->lrh_len = buflen;
654                 rec->lrh_type = OBD_CFG_REC;
655                 rc = llog_cat_add(env, cath, rec, NULL);
656                 if (rc) {
657                         CERROR("4e: write 5 records failed at #%d: %d\n",
658                                i + 1, rc);
659                         GOTO(out_free, rc);
660                 }
661                 num_recs++;
662         }
663 out_free:
664         OBD_FREE(buf, buflen);
665 out:
666         CWARN("4f: put newly-created catalog\n");
667         rc2 = llog_cat_close(env, cath);
668         if (rc2) {
669                 CERROR("4: close log %s failed: %d\n", name, rc2);
670                 if (rc == 0)
671                         rc = rc2;
672         }
673 ctxt_release:
674         llog_ctxt_put(ctxt);
675         RETURN(rc);
676 }
677
678 static int cat_counter;
679
680 static int cat_print_cb(const struct lu_env *env, struct llog_handle *llh,
681                         struct llog_rec_hdr *rec, void *data)
682 {
683         struct llog_logid_rec   *lir = (struct llog_logid_rec *)rec;
684         struct lu_fid            fid = {0};
685
686         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
687                 CERROR("invalid record in catalog\n");
688                 RETURN(-EINVAL);
689         }
690
691         logid_to_fid(&lir->lid_id, &fid);
692
693         CWARN("seeing record at index %d - "DFID" in log "DFID"\n",
694               rec->lrh_index, PFID(&fid),
695               PFID(lu_object_fid(&llh->lgh_obj->do_lu)));
696
697         cat_counter++;
698
699         RETURN(0);
700 }
701
702 static int plain_counter;
703
704 static int plain_print_cb(const struct lu_env *env, struct llog_handle *llh,
705                           struct llog_rec_hdr *rec, void *data)
706 {
707         struct lu_fid fid = {0};
708
709         if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
710                 CERROR("log is not plain\n");
711                 RETURN(-EINVAL);
712         }
713
714         logid_to_fid(&llh->lgh_id, &fid);
715
716         CDEBUG(D_INFO, "seeing record at index %d in log "DFID"\n",
717                rec->lrh_index, PFID(&fid));
718
719         plain_counter++;
720
721         RETURN(0);
722 }
723
724 static int cancel_count;
725
726 static int llog_cancel_rec_cb(const struct lu_env *env,
727                               struct llog_handle *llh,
728                               struct llog_rec_hdr *rec, void *data)
729 {
730         struct llog_cookie cookie;
731
732         if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
733                 CERROR("log is not plain\n");
734                 RETURN(-EINVAL);
735         }
736
737         cookie.lgc_lgl = llh->lgh_id;
738         cookie.lgc_index = rec->lrh_index;
739
740         llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1, &cookie);
741         cancel_count++;
742         if (cancel_count == LLOG_TEST_RECNUM)
743                 RETURN(-LLOG_EEMPTY);
744         RETURN(0);
745 }
746
747 /* Test log and catalogue processing */
748 static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
749 {
750         struct llog_handle      *llh = NULL;
751         char                     name[10];
752         int                      rc, rc2;
753         struct llog_mini_rec     lmr;
754         struct llog_ctxt        *ctxt;
755
756         ENTRY;
757
758         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
759         LASSERT(ctxt);
760
761         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
762         lmr.lmr_hdr.lrh_type = 0xf00f00;
763
764         CWARN("5a: re-open catalog by id\n");
765         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
766         if (rc) {
767                 CERROR("5a: llog_create with logid failed: %d\n", rc);
768                 GOTO(out_put, rc);
769         }
770
771         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
772         if (rc) {
773                 CERROR("5a: can't init llog handle: %d\n", rc);
774                 GOTO(out, rc);
775         }
776
777         CWARN("5b: print the catalog entries.. we expect 2\n");
778         cat_counter = 0;
779         rc = llog_process(env, llh, cat_print_cb, "test 5", NULL);
780         if (rc) {
781                 CERROR("5b: process with cat_print_cb failed: %d\n", rc);
782                 GOTO(out, rc);
783         }
784         if (cat_counter != 2) {
785                 CERROR("5b: %d entries in catalog\n", cat_counter);
786                 GOTO(out, rc = -EINVAL);
787         }
788
789         CWARN("5c: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
790         cancel_count = 0;
791         rc = llog_cat_process(env, llh, llog_cancel_rec_cb, "foobar", 0, 0);
792         if (rc != -LLOG_EEMPTY) {
793                 CERROR("5c: process with llog_cancel_rec_cb failed: %d\n", rc);
794                 GOTO(out, rc);
795         }
796
797         CWARN("5c: print the catalog entries.. we expect 1\n");
798         cat_counter = 0;
799         rc = llog_process(env, llh, cat_print_cb, "test 5", NULL);
800         if (rc) {
801                 CERROR("5c: process with cat_print_cb failed: %d\n", rc);
802                 GOTO(out, rc);
803         }
804         if (cat_counter != 1) {
805                 CERROR("5c: %d entries in catalog\n", cat_counter);
806                 GOTO(out, rc = -EINVAL);
807         }
808
809         CWARN("5d: add 1 record to the log with many canceled empty pages\n");
810         rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
811         if (rc) {
812                 CERROR("5d: add record to the log with many canceled empty "
813                        "pages failed\n");
814                 GOTO(out, rc);
815         }
816
817         CWARN("5e: print plain log entries.. expect 6\n");
818         plain_counter = 0;
819         rc = llog_cat_process(env, llh, plain_print_cb, "foobar", 0, 0);
820         if (rc) {
821                 CERROR("5e: process with plain_print_cb failed: %d\n", rc);
822                 GOTO(out, rc);
823         }
824         if (plain_counter != 6) {
825                 CERROR("5e: found %d records\n", plain_counter);
826                 GOTO(out, rc = -EINVAL);
827         }
828
829         CWARN("5f: print plain log entries reversely.. expect 6\n");
830         plain_counter = 0;
831         rc = llog_cat_reverse_process(env, llh, plain_print_cb, "foobar");
832         if (rc) {
833                 CERROR("5f: reversely process with plain_print_cb failed: "
834                        "%d\n", rc);
835                 GOTO(out, rc);
836         }
837         if (plain_counter != 6) {
838                 CERROR("5f: found %d records\n", plain_counter);
839                 GOTO(out, rc = -EINVAL);
840         }
841
842 out:
843         CWARN("5g: close re-opened catalog\n");
844         rc2 = llog_cat_close(env, llh);
845         if (rc2) {
846                 CERROR("5g: close log %s failed: %d\n", name, rc2);
847                 if (rc == 0)
848                         rc = rc2;
849         }
850 out_put:
851         llog_ctxt_put(ctxt);
852
853         RETURN(rc);
854 }
855
856 /* Test client api; open log by name and process */
857 static int llog_test_6(const struct lu_env *env, struct obd_device *obd,
858                        char *name)
859 {
860         struct obd_device       *mgc_obd;
861         struct llog_ctxt        *ctxt;
862         struct obd_uuid         *mgs_uuid;
863         struct obd_export       *exp;
864         struct obd_uuid          uuid = { "LLOG_TEST6_UUID" };
865         struct llog_handle      *llh = NULL;
866         struct llog_ctxt        *nctxt;
867         int                      rc, rc2;
868
869         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
870         LASSERT(ctxt);
871         mgs_uuid = &ctxt->loc_exp->exp_obd->obd_uuid;
872
873         CWARN("6a: re-open log %s using client API\n", name);
874         mgc_obd = class_find_client_obd(mgs_uuid, LUSTRE_MGC_NAME, NULL);
875         if (mgc_obd == NULL) {
876                 CERROR("6a: no MGC devices connected to %s found.\n",
877                        mgs_uuid->uuid);
878                 GOTO(ctxt_release, rc = -ENOENT);
879         }
880
881         rc = obd_connect(NULL, &exp, mgc_obd, &uuid,
882                          NULL /* obd_connect_data */, NULL);
883         if (rc != -EALREADY) {
884                 CERROR("6a: connect on connected MGC (%s) failed to return"
885                        " -EALREADY\n", mgc_obd->obd_name);
886                 if (rc == 0)
887                         obd_disconnect(exp);
888                 GOTO(ctxt_release, rc = -EINVAL);
889         }
890
891         nctxt = llog_get_context(mgc_obd, LLOG_CONFIG_REPL_CTXT);
892         rc = llog_open(env, nctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
893         if (rc) {
894                 CERROR("6a: llog_open failed %d\n", rc);
895                 GOTO(nctxt_put, rc);
896         }
897
898         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
899         if (rc) {
900                 CERROR("6a: llog_init_handle failed %d\n", rc);
901                 GOTO(parse_out, rc);
902         }
903
904         plain_counter = 1; /* llog header is first record */
905         CWARN("6b: process log %s using client API\n", name);
906         rc = llog_process(env, llh, plain_print_cb, NULL, NULL);
907         if (rc)
908                 CERROR("6b: llog_process failed %d\n", rc);
909         CWARN("6b: processed %d records\n", plain_counter);
910
911         rc = verify_handle("6b", llh, plain_counter);
912         if (rc)
913                 GOTO(parse_out, rc);
914
915         plain_counter = 1; /* llog header is first record */
916         CWARN("6c: process log %s reversely using client API\n", name);
917         rc = llog_reverse_process(env, llh, plain_print_cb, NULL, NULL);
918         if (rc)
919                 CERROR("6c: llog_reverse_process failed %d\n", rc);
920         CWARN("6c: processed %d records\n", plain_counter);
921
922         rc = verify_handle("6c", llh, plain_counter);
923         if (rc)
924                 GOTO(parse_out, rc);
925
926 parse_out:
927         rc2 = llog_close(env, llh);
928         if (rc2) {
929                 CERROR("6: llog_close failed: rc = %d\n", rc2);
930                 if (rc == 0)
931                         rc = rc2;
932         }
933 nctxt_put:
934         llog_ctxt_put(nctxt);
935 ctxt_release:
936         llog_ctxt_put(ctxt);
937         RETURN(rc);
938 }
939
940 static union {
941         struct llog_rec_hdr             lrh;   /* common header */
942         struct llog_logid_rec           llr;   /* LLOG_LOGID_MAGIC */
943         struct llog_unlink64_rec        lur;   /* MDS_UNLINK64_REC */
944         struct llog_setattr64_rec       lsr64; /* MDS_SETATTR64_REC */
945         struct llog_size_change_rec     lscr;  /* OST_SZ_REC */
946         struct llog_changelog_rec       lcr;   /* CHANGELOG_REC */
947         struct llog_changelog_user_rec  lcur;  /* CHANGELOG_USER_REC */
948         struct llog_gen_rec             lgr;   /* LLOG_GEN_REC */
949 } llog_records;
950
951 static int test_7_print_cb(const struct lu_env *env, struct llog_handle *llh,
952                            struct llog_rec_hdr *rec, void *data)
953 {
954         struct lu_fid fid = {0};
955
956         logid_to_fid(&llh->lgh_id, &fid);
957
958         CDEBUG(D_OTHER, "record type %#x at index %d in log "DFID"\n",
959                rec->lrh_type, rec->lrh_index, PFID(&fid));
960
961         plain_counter++;
962         return 0;
963 }
964
965 static int test_7_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
966                             struct llog_rec_hdr *rec, void *data)
967 {
968         plain_counter++;
969         /* test LLOG_DEL_RECORD is working */
970         return LLOG_DEL_RECORD;
971 }
972
973 static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
974 {
975         struct llog_handle      *llh;
976         int                      rc = 0, i, process_count;
977         int                      num_recs = 0;
978
979         ENTRY;
980
981         rc = llog_open_create(env, ctxt, &llh, NULL, NULL);
982         if (rc) {
983                 CERROR("7_sub: create log failed\n");
984                 RETURN(rc);
985         }
986
987         rc = llog_init_handle(env, llh,
988                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
989                               &uuid);
990         if (rc) {
991                 CERROR("7_sub: can't init llog handle: %d\n", rc);
992                 GOTO(out_close, rc);
993         }
994         for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
995                 rc = llog_write(env, llh, &llog_records.lrh, LLOG_NEXT_IDX);
996                 if (rc == -ENOSPC) {
997                         break;
998                 } else if (rc < 0) {
999                         CERROR("7_sub: write recs failed at #%d: %d\n",
1000                                i + 1, rc);
1001                         GOTO(out_close, rc);
1002                 }
1003                 num_recs++;
1004         }
1005         if (rc != -ENOSPC) {
1006                 CWARN("7_sub: write record more than BITMAP size!\n");
1007                 GOTO(out_close, rc = -EINVAL);
1008         }
1009
1010         rc = verify_handle("7_sub", llh, num_recs + 1);
1011         if (rc) {
1012                 CERROR("7_sub: verify handle failed: %d\n", rc);
1013                 GOTO(out_close, rc);
1014         }
1015         if (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1)
1016                 CWARN("7_sub: records are not aligned, written %d from %u\n",
1017                       num_recs, LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
1018
1019         plain_counter = 0;
1020         rc = llog_process(env, llh, test_7_print_cb, "test 7", NULL);
1021         if (rc) {
1022                 CERROR("7_sub: llog process failed: %d\n", rc);
1023                 GOTO(out_close, rc);
1024         }
1025         process_count = plain_counter;
1026         if (process_count != num_recs) {
1027                 CERROR("7_sub: processed %d records from %d total\n",
1028                        process_count, num_recs);
1029                 GOTO(out_close, rc = -EINVAL);
1030         }
1031
1032         plain_counter = 0;
1033         rc = llog_reverse_process(env, llh, test_7_cancel_cb, "test 7", NULL);
1034         if (rc && rc != LLOG_DEL_PLAIN) {
1035                 CERROR("7_sub: reverse llog process failed: %d\n", rc);
1036                 GOTO(out_close, rc);
1037         }
1038         if (process_count != plain_counter) {
1039                 CERROR("7_sub: Reverse/direct processing found different"
1040                        "number of records: %d/%d\n",
1041                        plain_counter, process_count);
1042                 GOTO(out_close, rc = -EINVAL);
1043         }
1044         if (llog_exist(llh)) {
1045                 CERROR("7_sub: llog exists but should be zapped\n");
1046                 GOTO(out_close, rc = -EEXIST);
1047         }
1048
1049         rc = verify_handle("7_sub", llh, 1);
1050 out_close:
1051         if (rc)
1052                 llog_destroy(env, llh);
1053         llog_close(env, llh);
1054         RETURN(rc);
1055 }
1056
1057 /* Test all llog records writing and processing */
1058 static int llog_test_7(const struct lu_env *env, struct obd_device *obd)
1059 {
1060         struct llog_ctxt        *ctxt;
1061         int                      rc;
1062
1063         ENTRY;
1064
1065         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1066
1067         CWARN("7a: test llog_logid_rec\n");
1068         llog_records.llr.lid_hdr.lrh_len = sizeof(llog_records.llr);
1069         llog_records.llr.lid_tail.lrt_len = sizeof(llog_records.llr);
1070         llog_records.llr.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
1071
1072         rc = llog_test_7_sub(env, ctxt);
1073         if (rc) {
1074                 CERROR("7a: llog_logid_rec test failed\n");
1075                 GOTO(out, rc);
1076         }
1077
1078         CWARN("7b: test llog_unlink64_rec\n");
1079         llog_records.lur.lur_hdr.lrh_len = sizeof(llog_records.lur);
1080         llog_records.lur.lur_tail.lrt_len = sizeof(llog_records.lur);
1081         llog_records.lur.lur_hdr.lrh_type = MDS_UNLINK64_REC;
1082
1083         rc = llog_test_7_sub(env, ctxt);
1084         if (rc) {
1085                 CERROR("7b: llog_unlink_rec test failed\n");
1086                 GOTO(out, rc);
1087         }
1088
1089         CWARN("7c: test llog_setattr64_rec\n");
1090         llog_records.lsr64.lsr_hdr.lrh_len = sizeof(llog_records.lsr64);
1091         llog_records.lsr64.lsr_tail.lrt_len = sizeof(llog_records.lsr64);
1092         llog_records.lsr64.lsr_hdr.lrh_type = MDS_SETATTR64_REC;
1093
1094         rc = llog_test_7_sub(env, ctxt);
1095         if (rc) {
1096                 CERROR("7c: llog_setattr64_rec test failed\n");
1097                 GOTO(out, rc);
1098         }
1099
1100         CWARN("7d: test llog_size_change_rec\n");
1101         llog_records.lscr.lsc_hdr.lrh_len = sizeof(llog_records.lscr);
1102         llog_records.lscr.lsc_tail.lrt_len = sizeof(llog_records.lscr);
1103         llog_records.lscr.lsc_hdr.lrh_type = OST_SZ_REC;
1104
1105         rc = llog_test_7_sub(env, ctxt);
1106         if (rc) {
1107                 CERROR("7d: llog_size_change_rec test failed\n");
1108                 GOTO(out, rc);
1109         }
1110
1111         CWARN("7e: test llog_changelog_rec\n");
1112         /* Direct access to cr_do_not_use: peculiar case for this test */
1113         llog_records.lcr.cr_hdr.lrh_len = sizeof(llog_records.lcr);
1114         llog_records.lcr.cr_do_not_use.lrt_len = sizeof(llog_records.lcr);
1115         llog_records.lcr.cr_hdr.lrh_type = CHANGELOG_REC;
1116
1117         rc = llog_test_7_sub(env, ctxt);
1118         if (rc) {
1119                 CERROR("7e: llog_changelog_rec test failed\n");
1120                 GOTO(out, rc);
1121         }
1122
1123         CWARN("7f: test llog_changelog_user_rec\n");
1124         llog_records.lcur.cur_hdr.lrh_len = sizeof(llog_records.lcur);
1125         llog_records.lcur.cur_tail.lrt_len = sizeof(llog_records.lcur);
1126         llog_records.lcur.cur_hdr.lrh_type = CHANGELOG_USER_REC;
1127
1128         rc = llog_test_7_sub(env, ctxt);
1129         if (rc) {
1130                 CERROR("7f: llog_changelog_user_rec test failed\n");
1131                 GOTO(out, rc);
1132         }
1133
1134         CWARN("7g: test llog_gen_rec\n");
1135         llog_records.lgr.lgr_hdr.lrh_len = sizeof(llog_records.lgr);
1136         llog_records.lgr.lgr_tail.lrt_len = sizeof(llog_records.lgr);
1137         llog_records.lgr.lgr_hdr.lrh_type = LLOG_GEN_REC;
1138
1139         rc = llog_test_7_sub(env, ctxt);
1140         if (rc) {
1141                 CERROR("7g: llog_size_change_rec test failed\n");
1142                 GOTO(out, rc);
1143         }
1144 out:
1145         llog_ctxt_put(ctxt);
1146         RETURN(rc);
1147 }
1148
1149 static int llog_truncate(const struct lu_env *env, struct dt_object *o)
1150 {
1151         struct lu_attr           la;
1152         struct thandle          *th;
1153         struct dt_device        *d;
1154         int                      rc;
1155         ENTRY;
1156
1157         LASSERT(o);
1158         d = lu2dt_dev(o->do_lu.lo_dev);
1159         LASSERT(d);
1160
1161         rc = dt_attr_get(env, o, &la);
1162         if (rc)
1163                 RETURN(rc);
1164
1165         CDEBUG(D_OTHER, "original size %llu\n", la.la_size);
1166         rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec);
1167         if (la.la_size < rc) {
1168                 CERROR("too small llog: %llu\n", la.la_size);
1169                 RETURN(0);
1170         }
1171
1172         /* drop 2 records */
1173         la.la_size = la.la_size - (sizeof(struct llog_mini_rec) * 2);
1174         la.la_valid = LA_SIZE;
1175
1176         th = dt_trans_create(env, d);
1177         if (IS_ERR(th))
1178                 RETURN(PTR_ERR(th));
1179
1180         rc = dt_declare_attr_set(env, o, &la, th);
1181         if (rc)
1182                 GOTO(stop, rc);
1183
1184         rc = dt_declare_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
1185
1186         rc = dt_trans_start_local(env, d, th);
1187         if (rc)
1188                 GOTO(stop, rc);
1189
1190         rc = dt_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
1191         if (rc)
1192                 GOTO(stop, rc);
1193
1194         rc = dt_attr_set(env, o, &la, th);
1195         if (rc)
1196                 GOTO(stop, rc);
1197
1198 stop:
1199         dt_trans_stop(env, d, th);
1200
1201         RETURN(rc);
1202 }
1203
1204 static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
1205                           struct llog_rec_hdr *rec, void *data)
1206 {
1207         plain_counter++;
1208         return 0;
1209 }
1210
1211 static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
1212 {
1213         struct llog_handle      *llh = NULL;
1214         char                     name[10];
1215         int                      rc, rc2, i;
1216         int                      orig_counter;
1217         struct llog_mini_rec     lmr;
1218         struct llog_ctxt        *ctxt;
1219         struct dt_object        *obj = NULL;
1220
1221         ENTRY;
1222
1223         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1224         LASSERT(ctxt);
1225
1226         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
1227         lmr.lmr_hdr.lrh_type = 0xf00f00;
1228
1229         CWARN("8a: fill the first plain llog\n");
1230         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1231         if (rc) {
1232                 CERROR("8a: llog_create with logid failed: %d\n", rc);
1233                 GOTO(out_put, rc);
1234         }
1235
1236         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1237         if (rc) {
1238                 CERROR("8a: can't init llog handle: %d\n", rc);
1239                 GOTO(out, rc);
1240         }
1241
1242         plain_counter = 0;
1243         rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0);
1244         if (rc != 0) {
1245                 CERROR("5a: process with test_8_cb failed: %d\n", rc);
1246                 GOTO(out, rc);
1247         }
1248         orig_counter = plain_counter;
1249
1250         for (i = 0; i < 100; i++) {
1251                 rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
1252                 if (rc) {
1253                         CERROR("5a: add record failed\n");
1254                         GOTO(out, rc);
1255                 }
1256         }
1257
1258         /* grab the current plain llog, we'll corrupt it later */
1259         obj = llh->u.chd.chd_current_log->lgh_obj;
1260         LASSERT(obj);
1261         lu_object_get(&obj->do_lu);
1262         CWARN("8a: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu)));
1263
1264         rc2 = llog_cat_close(env, llh);
1265         if (rc2) {
1266                 CERROR("8a: close log %s failed: %d\n", name, rc2);
1267                 if (rc == 0)
1268                         rc = rc2;
1269                 GOTO(out_put, rc);
1270         }
1271
1272         CWARN("8b: fill the second plain llog\n");
1273         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1274         if (rc) {
1275                 CERROR("8b: llog_create with logid failed: %d\n", rc);
1276                 GOTO(out_put, rc);
1277         }
1278
1279         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1280         if (rc) {
1281                 CERROR("8b: can't init llog handle: %d\n", rc);
1282                 GOTO(out, rc);
1283         }
1284
1285         for (i = 0; i < 100; i++) {
1286                 rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
1287                 if (rc) {
1288                         CERROR("8b: add record failed\n");
1289                         GOTO(out, rc);
1290                 }
1291         }
1292         CWARN("8b: second llog "DFID"\n",
1293                 PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
1294
1295         rc2 = llog_cat_close(env, llh);
1296         if (rc2) {
1297                 CERROR("8b: close log %s failed: %d\n", name, rc2);
1298                 if (rc == 0)
1299                         rc = rc2;
1300                 GOTO(out_put, rc);
1301         }
1302
1303         CWARN("8c: drop two records from the first plain llog\n");
1304         llog_truncate(env, obj);
1305
1306         CWARN("8d: count survived records\n");
1307         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1308         if (rc) {
1309                 CERROR("8d: llog_create with logid failed: %d\n", rc);
1310                 GOTO(out_put, rc);
1311         }
1312
1313         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1314         if (rc) {
1315                 CERROR("8d: can't init llog handle: %d\n", rc);
1316                 GOTO(out, rc);
1317         }
1318
1319         plain_counter = 0;
1320         rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0);
1321         if (rc != 0) {
1322                 CERROR("8d: process with test_8_cb failed: %d\n", rc);
1323                 GOTO(out, rc);
1324         }
1325
1326         if (orig_counter + 200 - 2 != plain_counter) {
1327                 CERROR("found %d records (expected %d)\n", plain_counter,
1328                        orig_counter + 200 - 2);
1329                 rc = -EIO;
1330         }
1331
1332 out:
1333         CWARN("8d: close re-opened catalog\n");
1334         rc2 = llog_cat_close(env, llh);
1335         if (rc2) {
1336                 CERROR("8d: close log %s failed: %d\n", name, rc2);
1337                 if (rc == 0)
1338                         rc = rc2;
1339         }
1340 out_put:
1341         llog_ctxt_put(ctxt);
1342
1343         if (obj != NULL)
1344                 lu_object_put(env, &obj->do_lu);
1345
1346         RETURN(rc);
1347 }
1348
1349 static int llog_test_9_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
1350 {
1351         struct llog_handle      *llh;
1352         struct lu_fid            fid;
1353         int                      rc = 0;
1354
1355         ENTRY;
1356
1357         rc = llog_open_create(env, ctxt, &llh, NULL, NULL);
1358         if (rc != 0) {
1359                 CERROR("9_sub: create log failed\n");
1360                 RETURN(rc);
1361         }
1362
1363         rc = llog_init_handle(env, llh,
1364                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
1365                               &uuid);
1366         if (rc != 0) {
1367                 CERROR("9_sub: can't init llog handle: %d\n", rc);
1368                 GOTO(out_close, rc);
1369         }
1370
1371         logid_to_fid(&llh->lgh_id, &fid);
1372         fid_to_logid(&fid, &llog_records.llr.lid_id);
1373         rc = llog_write(env, llh, &llog_records.lrh, LLOG_NEXT_IDX);
1374         if (rc < 0) {
1375                 CERROR("9_sub: write recs failed at #1: %d\n", rc);
1376                 GOTO(out_close, rc);
1377         }
1378         CWARN("9_sub: record type %x in log "DFID_NOBRACE"\n",
1379               llog_records.lrh.lrh_type, PFID(&fid));
1380 out_close:
1381         llog_close(env, llh);
1382         RETURN(rc);
1383 }
1384
1385 /* Prepare different types of llog records for llog_reader test*/
1386 static int llog_test_9(const struct lu_env *env, struct obd_device *obd)
1387 {
1388         struct llog_ctxt        *ctxt;
1389         int                      rc;
1390
1391         ENTRY;
1392
1393         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1394
1395         CWARN("9a: test llog_logid_rec\n");
1396         llog_records.llr.lid_hdr.lrh_len = sizeof(llog_records.llr);
1397         llog_records.llr.lid_tail.lrt_len = sizeof(llog_records.llr);
1398         llog_records.llr.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
1399
1400         rc = llog_test_9_sub(env, ctxt);
1401         if (rc != 0) {
1402                 CERROR("9a: llog_logid_rec test failed\n");
1403                 GOTO(out, rc);
1404         }
1405
1406         CWARN("9b: test llog_obd_cfg_rec\n");
1407         llog_records.lscr.lsc_hdr.lrh_len = sizeof(llog_records.lscr);
1408         llog_records.lscr.lsc_tail.lrt_len = sizeof(llog_records.lscr);
1409         llog_records.lscr.lsc_hdr.lrh_type = OBD_CFG_REC;
1410
1411         rc = llog_test_9_sub(env, ctxt);
1412         if (rc != 0) {
1413                 CERROR("9b: llog_obd_cfg_rec test failed\n");
1414                 GOTO(out, rc);
1415         }
1416
1417         CWARN("9c: test llog_changelog_rec\n");
1418         /* Direct access to cr_do_not_use: peculiar case for this test */
1419         llog_records.lcr.cr_hdr.lrh_len = sizeof(llog_records.lcr);
1420         llog_records.lcr.cr_do_not_use.lrt_len = sizeof(llog_records.lcr);
1421         llog_records.lcr.cr_hdr.lrh_type = CHANGELOG_REC;
1422
1423         rc = llog_test_9_sub(env, ctxt);
1424         if (rc != 0) {
1425                 CERROR("9c: llog_changelog_rec test failed\n");
1426                 GOTO(out, rc);
1427         }
1428
1429         CWARN("9d: test llog_changelog_user_rec\n");
1430         llog_records.lcur.cur_hdr.lrh_len = sizeof(llog_records.lcur);
1431         llog_records.lcur.cur_tail.lrt_len = sizeof(llog_records.lcur);
1432         llog_records.lcur.cur_hdr.lrh_type = CHANGELOG_USER_REC;
1433
1434         rc = llog_test_9_sub(env, ctxt);
1435         if (rc != 0) {
1436                 CERROR("9d: llog_changelog_user_rec test failed\n");
1437                 GOTO(out, rc);
1438         }
1439
1440 out:
1441         llog_ctxt_put(ctxt);
1442         RETURN(rc);
1443 }
1444
1445 /* test catalog wrap around */
1446 static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
1447 {
1448         struct llog_handle      *cath;
1449         char                     name[10];
1450         int                      rc, rc2, i, enospc, eok;
1451         struct llog_mini_rec     lmr;
1452         struct llog_ctxt        *ctxt;
1453         struct lu_attr           la;
1454         __u64                    cat_max_size;
1455         struct dt_device        *dt;
1456
1457         ENTRY;
1458
1459         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1460         LASSERT(ctxt);
1461
1462         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
1463         lmr.lmr_hdr.lrh_type = 0xf00f00;
1464
1465         snprintf(name, sizeof(name), "%x", llog_test_rand + 2);
1466         CWARN("10a: create a catalog log with name: %s\n", name);
1467         rc = llog_open_create(env, ctxt, &cath, NULL, name);
1468         if (rc) {
1469                 CERROR("10a: llog_create with name %s failed: %d\n", name, rc);
1470                 GOTO(ctxt_release, rc);
1471         }
1472         rc = llog_init_handle(env, cath, LLOG_F_IS_CAT, &uuid);
1473         if (rc) {
1474                 CERROR("10a: can't init llog handle: %d\n", rc);
1475                 GOTO(out, rc);
1476         }
1477
1478         cat_logid = cath->lgh_id;
1479         dt = lu2dt_dev(cath->lgh_obj->do_lu.lo_dev);
1480
1481         /* sync device to commit all recent LLOG changes to disk and avoid
1482          * to consume a huge space with delayed journal commit callbacks
1483          * particularly on low memory nodes or VMs */
1484         rc = dt_sync(env, dt);
1485         if (rc) {
1486                 CERROR("10c: sync failed: %d\n", rc);
1487                 GOTO(out, rc);
1488         }
1489
1490         /* force catalog wrap for 5th plain LLOG */
1491         cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS;
1492         cfs_fail_val = 4;
1493
1494         CWARN("10b: write %d log records\n", LLOG_TEST_RECNUM);
1495         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
1496                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1497                 if (rc) {
1498                         CERROR("10b: write %d records failed at #%d: %d\n",
1499                                LLOG_TEST_RECNUM, i + 1, rc);
1500                         GOTO(out, rc);
1501                 }
1502         }
1503
1504         /* make sure 2 new plain llog appears in catalog (+1 with hdr) */
1505         rc = verify_handle("10b", cath, 3);
1506         if (rc)
1507                 GOTO(out, rc);
1508
1509         /* sync device to commit all recent LLOG changes to disk and avoid
1510          * to consume a huge space with delayed journal commit callbacks
1511          * particularly on low memory nodes or VMs */
1512         rc = dt_sync(env, dt);
1513         if (rc) {
1514                 CERROR("10b: sync failed: %d\n", rc);
1515                 GOTO(out, rc);
1516         }
1517
1518         CWARN("10c: write %d more log records\n", 2 * LLOG_TEST_RECNUM);
1519         for (i = 0; i < 2 * LLOG_TEST_RECNUM; i++) {
1520                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1521                 if (rc) {
1522                         CERROR("10c: write %d records failed at #%d: %d\n",
1523                                2*LLOG_TEST_RECNUM, i + 1, rc);
1524                         GOTO(out, rc);
1525                 }
1526         }
1527
1528         /* make sure 2 new plain llog appears in catalog (+1 with hdr) */
1529         rc = verify_handle("10c", cath, 5);
1530         if (rc)
1531                 GOTO(out, rc);
1532
1533         /* sync device to commit all recent LLOG changes to disk and avoid
1534          * to consume a huge space with delayed journal commit callbacks
1535          * particularly on low memory nodes or VMs */
1536         rc = dt_sync(env, dt);
1537         if (rc) {
1538                 CERROR("10c: sync failed: %d\n", rc);
1539                 GOTO(out, rc);
1540         }
1541
1542         /* fill last allocated plain LLOG and reach -ENOSPC condition
1543          * because no slot available in Catalog */
1544         enospc = 0;
1545         eok = 0;
1546         CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM);
1547         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
1548                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1549                 if (rc && rc != -ENOSPC) {
1550                         CERROR("10c: write %d records failed at #%d: %d\n",
1551                                LLOG_TEST_RECNUM, i + 1, rc);
1552                         GOTO(out, rc);
1553                 }
1554                 /* after last added plain LLOG has filled up, all new
1555                  * records add should fail with -ENOSPC */
1556                 if (rc == -ENOSPC) {
1557                         enospc++;
1558                 } else {
1559                         enospc = 0;
1560                         eok++;
1561                 }
1562         }
1563
1564         if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
1565                 CERROR("10c: all last records adds should have failed with"
1566                        " -ENOSPC\n");
1567                 GOTO(out, rc = -EINVAL);
1568         }
1569
1570         CWARN("10c: wrote %d records then %d failed with ENOSPC\n", eok,
1571               enospc);
1572
1573         /* make sure no new record in Catalog */
1574         rc = verify_handle("10c", cath, 5);
1575         if (rc)
1576                 GOTO(out, rc);
1577
1578         /* Catalog should have reached its max size for test */
1579         rc = dt_attr_get(env, cath->lgh_obj, &la);
1580         if (rc) {
1581                 CERROR("10c: failed to get catalog attrs: %d\n", rc);
1582                 GOTO(out, rc);
1583         }
1584         cat_max_size = la.la_size;
1585
1586         /* cancel all 1st plain llog records to empty it, this will also cause
1587          * its catalog entry to be freed for next forced wrap in 10e */
1588         CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1589         cancel_count = 0;
1590         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1591         if (rc != -LLOG_EEMPTY) {
1592                 CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc);
1593                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1594                  * not reached */
1595                 if (rc == 0)
1596                         rc = -ERANGE;
1597                 GOTO(out, rc);
1598         }
1599
1600         CWARN("10d: print the catalog entries.. we expect 3\n");
1601         cat_counter = 0;
1602         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1603         if (rc) {
1604                 CERROR("10d: process with cat_print_cb failed: %d\n", rc);
1605                 GOTO(out, rc);
1606         }
1607         if (cat_counter != 3) {
1608                 CERROR("10d: %d entries in catalog\n", cat_counter);
1609                 GOTO(out, rc = -EINVAL);
1610         }
1611
1612         /* verify one down in catalog (+1 with hdr) */
1613         rc = verify_handle("10d", cath, 4);
1614         if (rc)
1615                 GOTO(out, rc);
1616
1617         /* sync device to commit all recent LLOG changes to disk and avoid
1618          * to consume a huge space with delayed journal commit callbacks
1619          * particularly on low memory nodes or VMs */
1620         rc = dt_sync(env, dt);
1621         if (rc) {
1622                 CERROR("10d: sync failed: %d\n", rc);
1623                 GOTO(out, rc);
1624         }
1625
1626         enospc = 0;
1627         eok = 0;
1628         CWARN("10e: write %d more log records\n", LLOG_TEST_RECNUM);
1629         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
1630                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1631                 if (rc && rc != -ENOSPC) {
1632                         CERROR("10e: write %d records failed at #%d: %d\n",
1633                                LLOG_TEST_RECNUM, i + 1, rc);
1634                         GOTO(out, rc);
1635                 }
1636                 /* after last added plain LLOG has filled up, all new
1637                  * records add should fail with -ENOSPC */
1638                 if (rc == -ENOSPC) {
1639                         enospc++;
1640                 } else {
1641                         enospc = 0;
1642                         eok++;
1643                 }
1644         }
1645
1646         if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
1647                 CERROR("10e: all last records adds should have failed with"
1648                        " -ENOSPC\n");
1649                 GOTO(out, rc = -EINVAL);
1650         }
1651
1652         CWARN("10e: wrote %d records then %d failed with ENOSPC\n", eok,
1653               enospc);
1654
1655         CWARN("10e: print the catalog entries.. we expect 4\n");
1656         cat_counter = 0;
1657         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1658         if (rc) {
1659                 CERROR("10d: process with cat_print_cb failed: %d\n", rc);
1660                 GOTO(out, rc);
1661         }
1662         if (cat_counter != 4) {
1663                 CERROR("10d: %d entries in catalog\n", cat_counter);
1664                 GOTO(out, rc = -EINVAL);
1665         }
1666
1667         /* make sure 1 new plain llog appears in catalog (+1 with hdr) */
1668         rc = verify_handle("10e", cath, 5);
1669         if (rc)
1670                 GOTO(out, rc);
1671
1672         /* verify catalog has wrap around */
1673         if (cath->lgh_last_idx > cath->lgh_hdr->llh_cat_idx) {
1674                 CERROR("10e: catalog failed to wrap around\n");
1675                 GOTO(out, rc = -EINVAL);
1676         }
1677
1678         rc = dt_attr_get(env, cath->lgh_obj, &la);
1679         if (rc) {
1680                 CERROR("10e: failed to get catalog attrs: %d\n", rc);
1681                 GOTO(out, rc);
1682         }
1683
1684         if (la.la_size != cat_max_size) {
1685                 CERROR("10e: catalog size has changed after it has wrap around,"
1686                        " current size = %llu, expected size = %llu\n",
1687                        la.la_size, cat_max_size);
1688                 GOTO(out, rc = -EINVAL);
1689         }
1690         CWARN("10e: catalog successfully wrap around, last_idx %d, first %d\n",
1691               cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
1692
1693         /* sync device to commit all recent LLOG changes to disk and avoid
1694          * to consume a huge space with delayed journal commit callbacks
1695          * particularly on low memory nodes or VMs */
1696         rc = dt_sync(env, dt);
1697         if (rc) {
1698                 CERROR("10e: sync failed: %d\n", rc);
1699                 GOTO(out, rc);
1700         }
1701
1702         /* cancel more records to free one more slot in Catalog
1703          * see if it is re-allocated when adding more records */
1704         CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1705         cancel_count = 0;
1706         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1707         if (rc != -LLOG_EEMPTY) {
1708                 CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc);
1709                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1710                  * not reached */
1711                 if (rc == 0)
1712                         rc = -ERANGE;
1713                 GOTO(out, rc);
1714         }
1715
1716         CWARN("10f: print the catalog entries.. we expect 3\n");
1717         cat_counter = 0;
1718         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1719         if (rc) {
1720                 CERROR("10f: process with cat_print_cb failed: %d\n", rc);
1721                 GOTO(out, rc);
1722         }
1723         if (cat_counter != 3) {
1724                 CERROR("10f: %d entries in catalog\n", cat_counter);
1725                 GOTO(out, rc = -EINVAL);
1726         }
1727
1728         /* verify one down in catalog (+1 with hdr) */
1729         rc = verify_handle("10f", cath, 4);
1730         if (rc)
1731                 GOTO(out, rc);
1732
1733         /* sync device to commit all recent LLOG changes to disk and avoid
1734          * to consume a huge space with delayed journal commit callbacks
1735          * particularly on low memory nodes or VMs */
1736         rc = dt_sync(env, dt);
1737         if (rc) {
1738                 CERROR("10f: sync failed: %d\n", rc);
1739                 GOTO(out, rc);
1740         }
1741
1742         enospc = 0;
1743         eok = 0;
1744         CWARN("10f: write %d more log records\n", LLOG_TEST_RECNUM);
1745         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
1746                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1747                 if (rc && rc != -ENOSPC) {
1748                         CERROR("10f: write %d records failed at #%d: %d\n",
1749                                LLOG_TEST_RECNUM, i + 1, rc);
1750                         GOTO(out, rc);
1751                 }
1752                 /* after last added plain LLOG has filled up, all new
1753                  * records add should fail with -ENOSPC */
1754                 if (rc == -ENOSPC) {
1755                         enospc++;
1756                 } else {
1757                         enospc = 0;
1758                         eok++;
1759                 }
1760         }
1761
1762         if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
1763                 CERROR("10f: all last records adds should have failed with"
1764                        " -ENOSPC\n");
1765                 GOTO(out, rc = -EINVAL);
1766         }
1767
1768         CWARN("10f: wrote %d records then %d failed with ENOSPC\n", eok,
1769               enospc);
1770
1771         /* make sure 1 new plain llog appears in catalog (+1 with hdr) */
1772         rc = verify_handle("10f", cath, 5);
1773         if (rc)
1774                 GOTO(out, rc);
1775
1776         /* verify lgh_last_idx = llh_cat_idx = 2 now */
1777         if (cath->lgh_last_idx != cath->lgh_hdr->llh_cat_idx ||
1778             cath->lgh_last_idx != 2) {
1779                 CERROR("10f: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 2\n",
1780                        cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
1781                 GOTO(out, rc = -EINVAL);
1782         }
1783
1784         rc = dt_attr_get(env, cath->lgh_obj, &la);
1785         if (rc) {
1786                 CERROR("10f: failed to get catalog attrs: %d\n", rc);
1787                 GOTO(out, rc);
1788         }
1789
1790         if (la.la_size != cat_max_size) {
1791                 CERROR("10f: catalog size has changed after it has wrap around,"
1792                        " current size = %llu, expected size = %llu\n",
1793                        la.la_size, cat_max_size);
1794                 GOTO(out, rc = -EINVAL);
1795         }
1796
1797         /* sync device to commit all recent LLOG changes to disk and avoid
1798          * to consume a huge space with delayed journal commit callbacks
1799          * particularly on low memory nodes or VMs */
1800         rc = dt_sync(env, dt);
1801         if (rc) {
1802                 CERROR("10f: sync failed: %d\n", rc);
1803                 GOTO(out, rc);
1804         }
1805
1806         /* will llh_cat_idx also successfully wrap ? */
1807
1808         /* cancel all records in the plain LLOGs referenced by 2 last indexes in
1809          * Catalog */
1810
1811         /* cancel more records to free one more slot in Catalog */
1812         CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1813         cancel_count = 0;
1814         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1815         if (rc != -LLOG_EEMPTY) {
1816                 CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
1817                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1818                  * not reached */
1819                 if (rc == 0)
1820                         rc = -ERANGE;
1821                 GOTO(out, rc);
1822         }
1823
1824         CWARN("10g: print the catalog entries.. we expect 3\n");
1825         cat_counter = 0;
1826         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1827         if (rc) {
1828                 CERROR("10g: process with cat_print_cb failed: %d\n", rc);
1829                 GOTO(out, rc);
1830         }
1831         if (cat_counter != 3) {
1832                 CERROR("10g: %d entries in catalog\n", cat_counter);
1833                 GOTO(out, rc = -EINVAL);
1834         }
1835
1836         /* verify one down in catalog (+1 with hdr) */
1837         rc = verify_handle("10g", cath, 4);
1838         if (rc)
1839                 GOTO(out, rc);
1840
1841         /* sync device to commit all recent LLOG changes to disk and avoid
1842          * to consume a huge space with delayed journal commit callbacks
1843          * particularly on low memory nodes or VMs */
1844         rc = dt_sync(env, dt);
1845         if (rc) {
1846                 CERROR("10g: sync failed: %d\n", rc);
1847                 GOTO(out, rc);
1848         }
1849
1850         /* cancel more records to free one more slot in Catalog */
1851         CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1852         cancel_count = 0;
1853         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1854         if (rc != -LLOG_EEMPTY) {
1855                 CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
1856                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1857                  * not reached */
1858                 if (rc == 0)
1859                         rc = -ERANGE;
1860                 GOTO(out, rc);
1861         }
1862
1863         CWARN("10g: print the catalog entries.. we expect 2\n");
1864         cat_counter = 0;
1865         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1866         if (rc) {
1867                 CERROR("10g: process with cat_print_cb failed: %d\n", rc);
1868                 GOTO(out, rc);
1869         }
1870         if (cat_counter != 2) {
1871                 CERROR("10g: %d entries in catalog\n", cat_counter);
1872                 GOTO(out, rc = -EINVAL);
1873         }
1874
1875         /* verify one down in catalog (+1 with hdr) */
1876         rc = verify_handle("10g", cath, 3);
1877         if (rc)
1878                 GOTO(out, rc);
1879
1880         /* verify lgh_last_idx = 2 and llh_cat_idx = 0 now */
1881         if (cath->lgh_hdr->llh_cat_idx != 0 ||
1882             cath->lgh_last_idx != 2) {
1883                 CERROR("10g: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 0\n",
1884                        cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
1885                 GOTO(out, rc = -EINVAL);
1886         }
1887
1888         /* sync device to commit all recent LLOG changes to disk and avoid
1889          * to consume a huge space with delayed journal commit callbacks
1890          * particularly on low memory nodes or VMs */
1891         rc = dt_sync(env, dt);
1892         if (rc) {
1893                 CERROR("10g: sync failed: %d\n", rc);
1894                 GOTO(out, rc);
1895         }
1896
1897         /* cancel more records to free one more slot in Catalog */
1898         CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1899         cancel_count = 0;
1900         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1901         if (rc != -LLOG_EEMPTY) {
1902                 CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
1903                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1904                  * not reached */
1905                 if (rc == 0)
1906                         rc = -ERANGE;
1907                 GOTO(out, rc);
1908         }
1909
1910         CWARN("10g: print the catalog entries.. we expect 1\n");
1911         cat_counter = 0;
1912         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1913         if (rc) {
1914                 CERROR("10g: process with cat_print_cb failed: %d\n", rc);
1915                 GOTO(out, rc);
1916         }
1917         if (cat_counter != 1) {
1918                 CERROR("10g: %d entries in catalog\n", cat_counter);
1919                 GOTO(out, rc = -EINVAL);
1920         }
1921
1922         /* verify one down in catalog (+1 with hdr) */
1923         rc = verify_handle("10g", cath, 2);
1924         if (rc)
1925                 GOTO(out, rc);
1926
1927         /* verify lgh_last_idx = 2 and llh_cat_idx = 1 now */
1928         if (cath->lgh_hdr->llh_cat_idx != 1 ||
1929             cath->lgh_last_idx != 2) {
1930                 CERROR("10g: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 1\n",
1931                        cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
1932                 GOTO(out, rc = -EINVAL);
1933         }
1934
1935         CWARN("10g: llh_cat_idx has also successfully wrapped!\n");
1936
1937 out:
1938         cfs_fail_loc = 0;
1939         cfs_fail_val = 0;
1940
1941         CWARN("10: put newly-created catalog\n");
1942         rc2 = llog_cat_close(env, cath);
1943         if (rc2) {
1944                 CERROR("10: close log %s failed: %d\n", name, rc2);
1945                 if (rc == 0)
1946                         rc = rc2;
1947         }
1948 ctxt_release:
1949         llog_ctxt_put(ctxt);
1950         RETURN(rc);
1951 }
1952
1953 /* -------------------------------------------------------------------------
1954  * Tests above, boring obd functions below
1955  * ------------------------------------------------------------------------- */
1956 static int llog_run_tests(const struct lu_env *env, struct obd_device *obd)
1957 {
1958         struct llog_handle      *llh = NULL;
1959         struct llog_ctxt        *ctxt;
1960         int                      rc, err;
1961         char                     name[10];
1962
1963         ENTRY;
1964         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1965         LASSERT(ctxt);
1966
1967         sprintf(name, "%x", llog_test_rand);
1968
1969         rc = llog_test_1(env, obd, name);
1970         if (rc)
1971                 GOTO(cleanup_ctxt, rc);
1972
1973         rc = llog_test_2(env, obd, name, &llh);
1974         if (rc)
1975                 GOTO(cleanup_ctxt, rc);
1976
1977         rc = llog_test_3(env, obd, llh);
1978         if (rc)
1979                 GOTO(cleanup, rc);
1980
1981         rc = llog_test_4(env, obd);
1982         if (rc)
1983                 GOTO(cleanup, rc);
1984
1985         rc = llog_test_5(env, obd);
1986         if (rc)
1987                 GOTO(cleanup, rc);
1988
1989         rc = llog_test_6(env, obd, name);
1990         if (rc)
1991                 GOTO(cleanup, rc);
1992
1993         rc = llog_test_7(env, obd);
1994         if (rc)
1995                 GOTO(cleanup, rc);
1996
1997         rc = llog_test_8(env, obd);
1998         if (rc)
1999                 GOTO(cleanup, rc);
2000
2001         rc = llog_test_9(env, obd);
2002         if (rc != 0)
2003                 GOTO(cleanup, rc);
2004
2005         rc = llog_test_10(env, obd);
2006         if (rc)
2007                 GOTO(cleanup, rc);
2008
2009 cleanup:
2010         err = llog_destroy(env, llh);
2011         if (err)
2012                 CERROR("cleanup: llog_destroy failed: %d\n", err);
2013         llog_close(env, llh);
2014         if (rc == 0)
2015                 rc = err;
2016 cleanup_ctxt:
2017         llog_ctxt_put(ctxt);
2018         return rc;
2019 }
2020
2021 static int llog_test_cleanup(struct obd_device *obd)
2022 {
2023         struct obd_device       *tgt;
2024         struct lu_env            env;
2025         int                      rc;
2026
2027         ENTRY;
2028
2029         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
2030         if (rc)
2031                 RETURN(rc);
2032
2033         tgt = obd->obd_lvfs_ctxt.dt->dd_lu_dev.ld_obd;
2034         rc = llog_cleanup(&env, llog_get_context(tgt, LLOG_TEST_ORIG_CTXT));
2035         if (rc)
2036                 CERROR("failed to llog_test_llog_finish: %d\n", rc);
2037         lu_env_fini(&env);
2038         RETURN(rc);
2039 }
2040
2041 static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2042 {
2043         struct obd_device       *tgt;
2044         struct llog_ctxt        *ctxt;
2045         struct dt_object        *o;
2046         struct lu_env            env;
2047         struct lu_context        test_session;
2048         int                      rc;
2049
2050         ENTRY;
2051
2052         if (lcfg->lcfg_bufcount < 2) {
2053                 CERROR("requires a TARGET OBD name\n");
2054                 RETURN(-EINVAL);
2055         }
2056
2057         if (lcfg->lcfg_buflens[1] < 1) {
2058                 CERROR("requires a TARGET OBD name\n");
2059                 RETURN(-EINVAL);
2060         }
2061
2062         /* disk obd */
2063         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2064         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2065                 CERROR("target device not attached or not set up (%s)\n",
2066                        lustre_cfg_string(lcfg, 1));
2067                 RETURN(-EINVAL);
2068         }
2069
2070         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
2071         if (rc)
2072                 RETURN(rc);
2073
2074         rc = lu_context_init(&test_session, LCT_SERVER_SESSION);
2075         if (rc)
2076                 GOTO(cleanup_env, rc);
2077         test_session.lc_thread = (struct ptlrpc_thread *)current;
2078         lu_context_enter(&test_session);
2079         env.le_ses = &test_session;
2080
2081         CWARN("Setup llog-test device over %s device\n",
2082               lustre_cfg_string(lcfg, 1));
2083
2084         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
2085         obd->obd_lvfs_ctxt.dt = lu2dt_dev(tgt->obd_lu_dev);
2086
2087         rc = llog_setup(&env, tgt, &tgt->obd_olg, LLOG_TEST_ORIG_CTXT, tgt,
2088                         &llog_osd_ops);
2089         if (rc)
2090                 GOTO(cleanup_session, rc);
2091
2092         /* use MGS llog dir for tests */
2093         ctxt = llog_get_context(tgt, LLOG_CONFIG_ORIG_CTXT);
2094         LASSERT(ctxt);
2095         o = ctxt->loc_dir;
2096         llog_ctxt_put(ctxt);
2097
2098         ctxt = llog_get_context(tgt, LLOG_TEST_ORIG_CTXT);
2099         LASSERT(ctxt);
2100         ctxt->loc_dir = o;
2101         llog_ctxt_put(ctxt);
2102
2103         llog_test_rand = cfs_rand();
2104
2105         rc = llog_run_tests(&env, tgt);
2106         if (rc)
2107                 llog_test_cleanup(obd);
2108 cleanup_session:
2109         lu_context_exit(&test_session);
2110         lu_context_fini(&test_session);
2111 cleanup_env:
2112         lu_env_fini(&env);
2113         RETURN(rc);
2114 }
2115
2116 static struct obd_ops llog_obd_ops = {
2117         .o_owner       = THIS_MODULE,
2118         .o_setup       = llog_test_setup,
2119         .o_cleanup     = llog_test_cleanup,
2120 };
2121
2122 static int __init llog_test_init(void)
2123 {
2124         return class_register_type(&llog_obd_ops, NULL, true, NULL,
2125                                    "llog_test", NULL);
2126 }
2127
2128 static void __exit llog_test_exit(void)
2129 {
2130         class_unregister_type("llog_test");
2131 }
2132
2133 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2134 MODULE_DESCRIPTION("Lustre Log test module");
2135 MODULE_VERSION(LUSTRE_VERSION_STRING);
2136 MODULE_LICENSE("GPL");
2137
2138 module_init(llog_test_init);
2139 module_exit(llog_test_exit);