Whamcloud - gitweb
LU-7988 hsm: run HSM coordinator once per second at most
[fs/lustre-release.git] / lustre / obdclass / llog_test.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/llog_test.c
33  *
34  * Author: Phil Schwan <phil@clusterfs.com>
35  * Author: Mikhail Pershin <mike.pershin@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/module.h>
41 #include <linux/init.h>
42
43 #include <obd_class.h>
44 #include <lustre_fid.h>
45 #include <lustre_log.h>
46
47 /* This is slightly more than the number of records that can fit into a
48  * single llog file, because the llog_log_header takes up some of the
49  * space in the first block that cannot be used for the bitmap. */
50 #define LLOG_TEST_RECNUM  (LLOG_MIN_CHUNK_SIZE * 8)
51
52 static int llog_test_rand;
53 static struct obd_uuid uuid = { .uuid = "test_uuid" };
54 static struct llog_logid cat_logid;
55
56 struct llog_mini_rec {
57         struct llog_rec_hdr     lmr_hdr;
58         struct llog_rec_tail    lmr_tail;
59 } __attribute__((packed));
60
61 static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
62 {
63         int i;
64         int last_idx = 0;
65         int active_recs = 0;
66
67         for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
68                 if (ext2_test_bit(i, LLOG_HDR_BITMAP(llh->lgh_hdr))) {
69                         last_idx = i;
70                         active_recs++;
71                 }
72         }
73
74         /* check the llog is sane at first, llh_count and lgh_last_idx*/
75         if (llh->lgh_hdr->llh_count != active_recs) {
76                 CERROR("%s: handle->count is %d, but there are %d recs found\n",
77                        test, llh->lgh_hdr->llh_count, active_recs);
78                 RETURN(-ERANGE);
79         }
80
81         if (llh->lgh_last_idx != LLOG_HDR_TAIL(llh->lgh_hdr)->lrt_index ||
82             (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_CAT) &&
83              llh->lgh_last_idx < last_idx)) {
84                 CERROR("%s: lgh_last_idx is %d (%d in the header), last found %d\n",
85                        test, llh->lgh_last_idx,
86                        LLOG_HDR_TAIL(llh->lgh_hdr)->lrt_index, last_idx);
87                 RETURN(-ERANGE);
88         }
89
90         /* finally checks against expected value from the caller */
91         if (active_recs != num_recs) {
92                 CERROR("%s: expected %d active recs after write, found %d\n",
93                        test, num_recs, active_recs);
94                 RETURN(-ERANGE);
95         }
96
97         RETURN(0);
98 }
99
100 /* Test named-log create/open, close */
101 static int llog_test_1(const struct lu_env *env,
102                        struct obd_device *obd, char *name)
103 {
104         struct llog_handle      *llh;
105         struct llog_ctxt        *ctxt;
106         int rc;
107         int rc2;
108
109         ENTRY;
110
111         CWARN("1a: create a log with name: %s\n", name);
112         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
113         LASSERT(ctxt);
114
115         rc = llog_open_create(env, ctxt, &llh, NULL, name);
116         if (rc) {
117                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
118                 GOTO(out, rc);
119         }
120         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, &uuid);
121         if (rc) {
122                 CERROR("1a: can't init llog handle: %d\n", rc);
123                 GOTO(out_close, rc);
124         }
125
126         rc = verify_handle("1", llh, 1);
127
128         CWARN("1b: close newly-created log\n");
129 out_close:
130         rc2 = llog_close(env, llh);
131         if (rc2) {
132                 CERROR("1b: close log %s failed: %d\n", name, rc2);
133                 if (rc == 0)
134                         rc = rc2;
135         }
136 out:
137         llog_ctxt_put(ctxt);
138         RETURN(rc);
139 }
140
141 static int test_2_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
142                             struct llog_rec_hdr *rec, void *data)
143 {
144         return LLOG_DEL_RECORD;
145 }
146
147 /* Test named-log reopen; returns opened log on success */
148 static int llog_test_2(const struct lu_env *env, struct obd_device *obd,
149                        char *name, struct llog_handle **llh)
150 {
151         struct llog_ctxt        *ctxt;
152         struct llog_handle      *lgh;
153         struct llog_logid        logid;
154         int                      rc;
155         struct llog_mini_rec     lmr;
156
157         ENTRY;
158
159         CWARN("2a: re-open a log with name: %s\n", name);
160         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
161         LASSERT(ctxt);
162
163         rc = llog_open(env, ctxt, llh, NULL, name, LLOG_OPEN_EXISTS);
164         if (rc) {
165                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
166                 GOTO(out_put, rc);
167         }
168
169         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &uuid);
170         if (rc) {
171                 CERROR("2a: can't init llog handle: %d\n", rc);
172                 GOTO(out_close_llh, rc);
173         }
174
175         rc = verify_handle("2", *llh, 1);
176         if (rc)
177                 GOTO(out_close_llh, rc);
178
179         CWARN("2b: create a log without specified NAME & LOGID\n");
180         rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
181         if (rc) {
182                 CERROR("2b: create log failed\n");
183                 GOTO(out_close_llh, rc);
184         }
185         rc = llog_init_handle(env, lgh, LLOG_F_IS_PLAIN, &uuid);
186         if (rc) {
187                 CERROR("2b: can't init llog handle: %d\n", rc);
188                 GOTO(out_close, rc);
189         }
190
191         logid = lgh->lgh_id;
192
193         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
194         lmr.lmr_hdr.lrh_type = 0xf02f02;
195
196         /* Check llog header values are correct after record add/cancel */
197         CWARN("2b: write 1 llog records, check llh_count\n");
198         rc = llog_write(env, lgh, &lmr.lmr_hdr, LLOG_NEXT_IDX);
199         if (rc < 0)
200                 GOTO(out_close, rc);
201
202         /* in-memory values after record addition */
203         rc = verify_handle("2b", lgh, 2);
204         if (rc < 0)
205                 GOTO(out_close, rc);
206
207         /* re-open llog to read on-disk values */
208         llog_close(env, lgh);
209
210         CWARN("2c: re-open the log by LOGID and verify llh_count\n");
211         rc = llog_open(env, ctxt, &lgh, &logid, NULL, LLOG_OPEN_EXISTS);
212         if (rc < 0) {
213                 CERROR("2c: re-open log by LOGID failed\n");
214                 GOTO(out_close_llh, rc);
215         }
216
217         rc = llog_init_handle(env, lgh, LLOG_F_IS_PLAIN, &uuid);
218         if (rc < 0) {
219                 CERROR("2c: can't init llog handle: %d\n", rc);
220                 GOTO(out_close, rc);
221         }
222
223         /* check values just read from disk */
224         rc = verify_handle("2c", lgh, 2);
225         if (rc < 0)
226                 GOTO(out_close, rc);
227
228         rc = llog_process(env, lgh, test_2_cancel_cb, NULL, NULL);
229         if (rc < 0)
230                 GOTO(out_close, rc);
231
232         /* in-memory values */
233         rc = verify_handle("2c", lgh, 1);
234         if (rc < 0)
235                 GOTO(out_close, rc);
236
237         /* re-open llog to get on-disk values */
238         llog_close(env, lgh);
239
240         rc = llog_open(env, ctxt, &lgh, &logid, NULL, LLOG_OPEN_EXISTS);
241         if (rc) {
242                 CERROR("2c: re-open log by LOGID failed\n");
243                 GOTO(out_close_llh, rc);
244         }
245
246         rc = llog_init_handle(env, lgh, LLOG_F_IS_PLAIN, &uuid);
247         if (rc) {
248                 CERROR("2c: can't init llog handle: %d\n", rc);
249                 GOTO(out_close, rc);
250         }
251
252         /* on-disk values after llog re-open */
253         rc = verify_handle("2c", lgh, 1);
254         if (rc < 0)
255                 GOTO(out_close, rc);
256
257         CWARN("2d: destroy this log\n");
258         rc = llog_destroy(env, lgh);
259         if (rc)
260                 CERROR("2d: destroy log failed\n");
261 out_close:
262         llog_close(env, lgh);
263 out_close_llh:
264         if (rc)
265                 llog_close(env, *llh);
266 out_put:
267         llog_ctxt_put(ctxt);
268
269         RETURN(rc);
270 }
271
272 static int test_3_rec_num;
273 static off_t test_3_rec_off;
274 static int test_3_paddings;
275 static int test_3_start_idx;
276
277 /*
278  * Test 3 callback.
279  * - check lgh_cur_offset correctness
280  * - check record index consistency
281  * - modify each record in-place
282  * - add new record during *last_idx processing
283  */
284 static int test3_check_n_add_cb(const struct lu_env *env,
285                                 struct llog_handle *lgh,
286                                 struct llog_rec_hdr *rec, void *data)
287 {
288         struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec;
289         int *last_rec = data;
290         unsigned cur_idx = test_3_start_idx + test_3_rec_num;
291         int rc;
292
293         if (lgh->lgh_hdr->llh_flags & LLOG_F_IS_FIXSIZE) {
294                 LASSERT(lgh->lgh_hdr->llh_size > 0);
295                 if (lgh->lgh_cur_offset != lgh->lgh_hdr->llh_hdr.lrh_len +
296                                         (cur_idx - 1) * lgh->lgh_hdr->llh_size)
297                         CERROR("Wrong record offset in cur_off: %llu, should be %u\n",
298                                lgh->lgh_cur_offset,
299                                lgh->lgh_hdr->llh_hdr.lrh_len +
300                                (cur_idx - 1) * lgh->lgh_hdr->llh_size);
301         } else {
302                 size_t chunk_size = lgh->lgh_hdr->llh_hdr.lrh_len;
303
304                 /* For variable size records the start offset is unknown, trust
305                  * the first value and check others are consistent with it. */
306                 if (test_3_rec_off == 0)
307                         test_3_rec_off = lgh->lgh_cur_offset;
308
309                 if (lgh->lgh_cur_offset != test_3_rec_off) {
310                         __u64 tmp = lgh->lgh_cur_offset;
311
312                         /* there can be padding record */
313                         if ((do_div(tmp, chunk_size) == 0) &&
314                             (lgh->lgh_cur_offset - test_3_rec_off <
315                              rec->lrh_len + LLOG_MIN_REC_SIZE)) {
316                                 test_3_rec_off = lgh->lgh_cur_offset;
317                                 test_3_paddings++;
318                         } else {
319                                 CERROR("Wrong record offset in cur_off: %llu"
320                                        ", should be %lld (rec len %u)\n",
321                                        lgh->lgh_cur_offset,
322                                        (long long)test_3_rec_off,
323                                        rec->lrh_len);
324                         }
325                 }
326                 test_3_rec_off += rec->lrh_len;
327         }
328
329         cur_idx += test_3_paddings;
330         if (cur_idx != rec->lrh_index)
331                 CERROR("Record with wrong index was read: %u, expected %u\n",
332                        rec->lrh_index, cur_idx);
333
334         /* modify all records in place */
335         lgr->lgr_gen.conn_cnt = rec->lrh_index;
336         rc = llog_write(env, lgh, rec, rec->lrh_index);
337         if (rc < 0)
338                 CERROR("cb_test_3: cannot modify record while processing\n");
339
340         /* Add new record to the llog at *last_rec position one by one to
341          * check that last block is re-read during processing */
342         if (cur_idx == *last_rec || cur_idx == (*last_rec + 1)) {
343                 rc = llog_write(env, lgh, rec, LLOG_NEXT_IDX);
344                 if (rc < 0)
345                         CERROR("cb_test_3: cannot add new record while "
346                                "processing\n");
347         }
348         test_3_rec_num++;
349
350         return rc;
351 }
352
353 /* Check in-place modifications were done for all records*/
354 static int test3_check_cb(const struct lu_env *env, struct llog_handle *lgh,
355                           struct llog_rec_hdr *rec, void *data)
356 {
357         struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec;
358
359         if (lgr->lgr_gen.conn_cnt != rec->lrh_index) {
360                 CERROR("cb_test_3: record %u is not modified\n",
361                        rec->lrh_index);
362                 return -EINVAL;
363         }
364         test_3_rec_num++;
365         return 0;
366 }
367
368 static int llog_test3_process(const struct lu_env *env,
369                               struct llog_handle *lgh,
370                               llog_cb_t cb, int start)
371 {
372         struct llog_process_cat_data cd;
373         int last_idx; /* new record will be injected here */
374         int rc = 0;
375
376         CWARN("test3: processing records from index %d to the end\n",
377               start);
378         cd.lpcd_first_idx = start - 1;
379         cd.lpcd_last_idx = 0;
380         test_3_rec_num = test_3_paddings = 0;
381         last_idx = lgh->lgh_last_idx;
382         rc = llog_process(env, lgh, cb, &last_idx, &cd);
383         if (rc < 0)
384                 return rc;
385         CWARN("test3: total %u records processed with %u paddings\n",
386               test_3_rec_num, test_3_paddings);
387         return test_3_rec_num;
388 }
389
390 /* Test plain llog functionality */
391 static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
392                        struct llog_handle *llh)
393 {
394         char buf[128];
395         struct llog_rec_hdr *hdr = (void *)buf;
396         int rc, i;
397         int num_recs = 1; /* 1 for the header */
398         int expected;
399
400         ENTRY;
401
402         hdr->lrh_len = sizeof(struct llog_gen_rec);
403         hdr->lrh_type = LLOG_GEN_REC;
404         llh->lgh_hdr->llh_size = sizeof(struct llog_gen_rec);
405         llh->lgh_hdr->llh_flags |= LLOG_F_IS_FIXSIZE;
406
407         /* Fill the llog with 64-bytes records, use 1023 records,
408          * so last chunk will be partially full. Don't change this
409          * value until record size is changed.
410          */
411         CWARN("3a: write 1023 fixed-size llog records\n");
412         for (i = 0; i < 1023; i++) {
413                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
414                 if (rc < 0) {
415                         CERROR("3a: write 1023 records failed at #%d: %d\n",
416                                i + 1, rc);
417                         RETURN(rc);
418                 }
419                 num_recs++;
420         }
421
422         rc = verify_handle("3a", llh, num_recs);
423         if (rc)
424                 RETURN(rc);
425
426         /*
427          * Test fixed-size records processing:
428          * - search the needed index
429          * - go through all records from that index
430          * - check all indices are growing monotonically and exist
431          * - modify each record
432          *
433          * NB: test3_check_n_add adds two new records while processing
434          * after last record. There were 1023 records created so the last chunk
435          * misses exactly one record. Therefore one of new records will be
436          * the last in the current chunk and second causes the new chunk to be
437          * created.
438          */
439         test_3_rec_off = 0;
440         test_3_start_idx = 501;
441         expected = 525;
442         rc = llog_test3_process(env, llh, test3_check_n_add_cb,
443                                 test_3_start_idx);
444         if (rc < 0)
445                 RETURN(rc);
446
447         /* extra record is created during llog_process() */
448         if (rc != expected) {
449                 CERROR("3a: process total %d records but expect %d\n",
450                        rc, expected);
451                 RETURN(-ERANGE);
452         }
453
454         num_recs += 2;
455
456         /* test modification in place */
457         rc = llog_test3_process(env, llh, test3_check_cb, test_3_start_idx);
458         if (rc < 0)
459                 RETURN(rc);
460
461         if (rc != expected) {
462                 CERROR("3a: process total %d records but expect %d\n",
463                        rc, expected);
464                 RETURN(-ERANGE);
465         }
466
467         CWARN("3b: write 566 variable size llog records\n");
468
469         /* Drop llh_size to 0 to mark llog as variable-size and write
470          * header to make this change permanent. */
471         llh->lgh_hdr->llh_flags &= ~LLOG_F_IS_FIXSIZE;
472         llog_write(env, llh, &llh->lgh_hdr->llh_hdr, LLOG_HEADER_IDX);
473
474         hdr->lrh_type = OBD_CFG_REC;
475
476         /* there are 1025 64-bytes records in llog already,
477          * the last chunk contains single record, i.e. 64 bytes.
478          * Each pair of variable size records is 200 bytes, so
479          * we will have the following distribution per chunks:
480          * block 1: 64 + 80(80/120) + 80 + 48(pad) = 81 iterations
481          * block 2: 80(120/80) + 120 + 72(pad) = 81 itereations
482          * block 3: 80(80/120) + 80 + 112(pad) = 81 iterations
483          * -- the same as block 2 again and so on.
484          * block 7: 80(80/120) = 80 iterations and 192 bytes remain
485          * Total 6 * 81 + 80 = 566 itereations.
486          * Callback will add another 120 bytes in the end of the last chunk
487          * and another 120 bytes will cause padding (72 bytes) plus 120
488          * bytes in the new block.
489          */
490         for (i = 0; i < 566; i++) {
491                 if ((i % 2) == 0)
492                         hdr->lrh_len = 80;
493                 else
494                         hdr->lrh_len = 120;
495
496                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
497                 if (rc < 0) {
498                         CERROR("3b: write 566 records failed at #%d: %d\n",
499                                i + 1, rc);
500                         RETURN(rc);
501                 }
502                 num_recs++;
503         }
504
505         rc = verify_handle("3b", llh, num_recs);
506         if (rc)
507                 RETURN(rc);
508
509         test_3_start_idx = 1026;
510         expected = 568;
511         rc = llog_test3_process(env, llh, test3_check_n_add_cb,
512                                 test_3_start_idx);
513         if (rc < 0)
514                 RETURN(rc);
515
516         if (rc != expected) {
517                 CERROR("3b: process total %d records but expect %d\n",
518                        rc, expected);
519                 RETURN(-ERANGE);
520         }
521
522         num_recs += 2;
523
524         /* test modification in place */
525         rc = llog_test3_process(env, llh, test3_check_cb, test_3_start_idx);
526         if (rc < 0)
527                 RETURN(rc);
528
529         if (rc != expected) {
530                 CERROR("3b: process total %d records but expect %d\n",
531                        rc, expected);
532                 RETURN(-ERANGE);
533         }
534
535         CWARN("3c: write records with variable size until BITMAP_SIZE, "
536               "return -ENOSPC\n");
537         while (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
538                 if ((num_recs % 2) == 0)
539                         hdr->lrh_len = 80;
540                 else
541                         hdr->lrh_len = 128;
542
543                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
544                 if (rc == -ENOSPC) {
545                         break;
546                 } else if (rc < 0) {
547                         CERROR("3c: write recs failed at #%d: %d\n",
548                                num_recs, rc);
549                         RETURN(rc);
550                 }
551                 num_recs++;
552         }
553
554         if (rc != -ENOSPC) {
555                 CWARN("3c: write record more than BITMAP size!\n");
556                 RETURN(-EINVAL);
557         }
558         CWARN("3c: wrote %d more records before end of llog is reached\n",
559               num_recs);
560
561         rc = verify_handle("3c", llh, num_recs);
562
563         RETURN(rc);
564 }
565
566 /* Test catalogue additions */
567 static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
568 {
569         struct llog_handle      *cath;
570         char                     name[10];
571         int                      rc, rc2, i, buflen;
572         struct llog_mini_rec     lmr;
573         struct llog_cookie       cookie;
574         struct llog_ctxt        *ctxt;
575         int                      num_recs = 0;
576         char                    *buf;
577         struct llog_rec_hdr     *rec;
578
579         ENTRY;
580
581         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
582         LASSERT(ctxt);
583
584         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
585         lmr.lmr_hdr.lrh_type = 0xf00f00;
586
587         sprintf(name, "%x", llog_test_rand + 1);
588         CWARN("4a: create a catalog log with name: %s\n", name);
589         rc = llog_open_create(env, ctxt, &cath, NULL, name);
590         if (rc) {
591                 CERROR("4a: llog_create with name %s failed: %d\n", name, rc);
592                 GOTO(ctxt_release, rc);
593         }
594         rc = llog_init_handle(env, cath, LLOG_F_IS_CAT, &uuid);
595         if (rc) {
596                 CERROR("4a: can't init llog handle: %d\n", rc);
597                 GOTO(out, rc);
598         }
599
600         num_recs++;
601         cat_logid = cath->lgh_id;
602
603         CWARN("4b: write 1 record into the catalog\n");
604         rc = llog_cat_add(env, cath, &lmr.lmr_hdr, &cookie);
605         if (rc != 1) {
606                 CERROR("4b: write 1 catalog record failed at: %d\n", rc);
607                 GOTO(out, rc);
608         }
609         num_recs++;
610         rc = verify_handle("4b", cath, 2);
611         if (rc)
612                 GOTO(out, rc);
613
614         rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs);
615         if (rc)
616                 GOTO(out, rc);
617
618         CWARN("4c: cancel 1 log record\n");
619         rc = llog_cat_cancel_records(env, cath, 1, &cookie);
620         if (rc) {
621                 CERROR("4c: cancel 1 catalog based record failed: %d\n", rc);
622                 GOTO(out, rc);
623         }
624         num_recs--;
625
626         rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs);
627         if (rc)
628                 GOTO(out, rc);
629
630         CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM);
631         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
632                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
633                 if (rc) {
634                         CERROR("4d: write %d records failed at #%d: %d\n",
635                                LLOG_TEST_RECNUM, i + 1, rc);
636                         GOTO(out, rc);
637                 }
638                 num_recs++;
639         }
640
641         /* make sure new plain llog appears */
642         rc = verify_handle("4d", cath, 3);
643         if (rc)
644                 GOTO(out, rc);
645
646         CWARN("4e: add 5 large records, one record per block\n");
647         buflen = LLOG_MIN_CHUNK_SIZE;
648         OBD_ALLOC(buf, buflen);
649         if (buf == NULL)
650                 GOTO(out, rc = -ENOMEM);
651         for (i = 0; i < 5; i++) {
652                 rec = (void *)buf;
653                 rec->lrh_len = buflen;
654                 rec->lrh_type = OBD_CFG_REC;
655                 rc = llog_cat_add(env, cath, rec, NULL);
656                 if (rc) {
657                         CERROR("4e: write 5 records failed at #%d: %d\n",
658                                i + 1, rc);
659                         GOTO(out_free, rc);
660                 }
661                 num_recs++;
662         }
663 out_free:
664         OBD_FREE(buf, buflen);
665 out:
666         CWARN("4f: put newly-created catalog\n");
667         rc2 = llog_cat_close(env, cath);
668         if (rc2) {
669                 CERROR("4: close log %s failed: %d\n", name, rc2);
670                 if (rc == 0)
671                         rc = rc2;
672         }
673 ctxt_release:
674         llog_ctxt_put(ctxt);
675         RETURN(rc);
676 }
677
678 static int cat_counter;
679
680 static int cat_print_cb(const struct lu_env *env, struct llog_handle *llh,
681                         struct llog_rec_hdr *rec, void *data)
682 {
683         struct llog_logid_rec   *lir = (struct llog_logid_rec *)rec;
684         struct lu_fid            fid = {0};
685
686         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
687                 CERROR("invalid record in catalog\n");
688                 RETURN(-EINVAL);
689         }
690
691         logid_to_fid(&lir->lid_id, &fid);
692
693         CWARN("seeing record at index %d - "DFID" in log "DFID"\n",
694               rec->lrh_index, PFID(&fid),
695               PFID(lu_object_fid(&llh->lgh_obj->do_lu)));
696
697         cat_counter++;
698
699         RETURN(0);
700 }
701
702 static int plain_counter;
703
704 static int plain_print_cb(const struct lu_env *env, struct llog_handle *llh,
705                           struct llog_rec_hdr *rec, void *data)
706 {
707         struct lu_fid fid = {0};
708
709         if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
710                 CERROR("log is not plain\n");
711                 RETURN(-EINVAL);
712         }
713
714         logid_to_fid(&llh->lgh_id, &fid);
715
716         CDEBUG(D_INFO, "seeing record at index %d in log "DFID"\n",
717                rec->lrh_index, PFID(&fid));
718
719         plain_counter++;
720
721         RETURN(0);
722 }
723
724 static int cancel_count;
725
726 static int llog_cancel_rec_cb(const struct lu_env *env,
727                               struct llog_handle *llh,
728                               struct llog_rec_hdr *rec, void *data)
729 {
730         struct llog_cookie cookie;
731
732         if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
733                 CERROR("log is not plain\n");
734                 RETURN(-EINVAL);
735         }
736
737         cookie.lgc_lgl = llh->lgh_id;
738         cookie.lgc_index = rec->lrh_index;
739
740         llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1, &cookie);
741         cancel_count++;
742         if (cancel_count == LLOG_TEST_RECNUM)
743                 RETURN(-LLOG_EEMPTY);
744         RETURN(0);
745 }
746
747 /* Test log and catalogue processing */
748 static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
749 {
750         struct llog_handle      *llh = NULL;
751         char                     name[10];
752         int                      rc, rc2;
753         struct llog_mini_rec     lmr;
754         struct llog_ctxt        *ctxt;
755
756         ENTRY;
757
758         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
759         LASSERT(ctxt);
760
761         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
762         lmr.lmr_hdr.lrh_type = 0xf00f00;
763
764         CWARN("5a: re-open catalog by id\n");
765         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
766         if (rc) {
767                 CERROR("5a: llog_create with logid failed: %d\n", rc);
768                 GOTO(out_put, rc);
769         }
770
771         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
772         if (rc) {
773                 CERROR("5a: can't init llog handle: %d\n", rc);
774                 GOTO(out, rc);
775         }
776
777         CWARN("5b: print the catalog entries.. we expect 2\n");
778         cat_counter = 0;
779         rc = llog_process(env, llh, cat_print_cb, "test 5", NULL);
780         if (rc) {
781                 CERROR("5b: process with cat_print_cb failed: %d\n", rc);
782                 GOTO(out, rc);
783         }
784         if (cat_counter != 2) {
785                 CERROR("5b: %d entries in catalog\n", cat_counter);
786                 GOTO(out, rc = -EINVAL);
787         }
788
789         CWARN("5c: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
790         cancel_count = 0;
791         rc = llog_cat_process(env, llh, llog_cancel_rec_cb, "foobar", 0, 0);
792         if (rc != -LLOG_EEMPTY) {
793                 CERROR("5c: process with llog_cancel_rec_cb failed: %d\n", rc);
794                 GOTO(out, rc);
795         }
796
797         CWARN("5c: print the catalog entries.. we expect 1\n");
798         cat_counter = 0;
799         rc = llog_process(env, llh, cat_print_cb, "test 5", NULL);
800         if (rc) {
801                 CERROR("5c: process with cat_print_cb failed: %d\n", rc);
802                 GOTO(out, rc);
803         }
804         if (cat_counter != 1) {
805                 CERROR("5c: %d entries in catalog\n", cat_counter);
806                 GOTO(out, rc = -EINVAL);
807         }
808
809         CWARN("5d: add 1 record to the log with many canceled empty pages\n");
810         rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
811         if (rc) {
812                 CERROR("5d: add record to the log with many canceled empty "
813                        "pages failed\n");
814                 GOTO(out, rc);
815         }
816
817         CWARN("5e: print plain log entries.. expect 6\n");
818         plain_counter = 0;
819         rc = llog_cat_process(env, llh, plain_print_cb, "foobar", 0, 0);
820         if (rc) {
821                 CERROR("5e: process with plain_print_cb failed: %d\n", rc);
822                 GOTO(out, rc);
823         }
824         if (plain_counter != 6) {
825                 CERROR("5e: found %d records\n", plain_counter);
826                 GOTO(out, rc = -EINVAL);
827         }
828
829         CWARN("5f: print plain log entries reversely.. expect 6\n");
830         plain_counter = 0;
831         rc = llog_cat_reverse_process(env, llh, plain_print_cb, "foobar");
832         if (rc) {
833                 CERROR("5f: reversely process with plain_print_cb failed: "
834                        "%d\n", rc);
835                 GOTO(out, rc);
836         }
837         if (plain_counter != 6) {
838                 CERROR("5f: found %d records\n", plain_counter);
839                 GOTO(out, rc = -EINVAL);
840         }
841
842 out:
843         CWARN("5g: close re-opened catalog\n");
844         rc2 = llog_cat_close(env, llh);
845         if (rc2) {
846                 CERROR("5g: close log %s failed: %d\n", name, rc2);
847                 if (rc == 0)
848                         rc = rc2;
849         }
850 out_put:
851         llog_ctxt_put(ctxt);
852
853         RETURN(rc);
854 }
855
856 /* Test client api; open log by name and process */
857 static int llog_test_6(const struct lu_env *env, struct obd_device *obd,
858                        char *name)
859 {
860         struct obd_device       *mgc_obd;
861         struct llog_ctxt        *ctxt;
862         struct obd_uuid         *mgs_uuid;
863         struct obd_export       *exp;
864         struct obd_uuid          uuid = { "LLOG_TEST6_UUID" };
865         struct llog_handle      *llh = NULL;
866         struct llog_ctxt        *nctxt;
867         int                      rc, rc2;
868
869         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
870         LASSERT(ctxt);
871         mgs_uuid = &ctxt->loc_exp->exp_obd->obd_uuid;
872
873         CWARN("6a: re-open log %s using client API\n", name);
874         mgc_obd = class_find_client_obd(mgs_uuid, LUSTRE_MGC_NAME, NULL);
875         if (mgc_obd == NULL) {
876                 CERROR("6a: no MGC devices connected to %s found.\n",
877                        mgs_uuid->uuid);
878                 GOTO(ctxt_release, rc = -ENOENT);
879         }
880
881         rc = obd_connect(NULL, &exp, mgc_obd, &uuid,
882                          NULL /* obd_connect_data */, NULL);
883         if (rc != -EALREADY) {
884                 CERROR("6a: connect on connected MGC (%s) failed to return"
885                        " -EALREADY\n", mgc_obd->obd_name);
886                 if (rc == 0)
887                         obd_disconnect(exp);
888                 GOTO(ctxt_release, rc = -EINVAL);
889         }
890
891         nctxt = llog_get_context(mgc_obd, LLOG_CONFIG_REPL_CTXT);
892         rc = llog_open(env, nctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
893         if (rc) {
894                 CERROR("6a: llog_open failed %d\n", rc);
895                 GOTO(nctxt_put, rc);
896         }
897
898         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
899         if (rc) {
900                 CERROR("6a: llog_init_handle failed %d\n", rc);
901                 GOTO(parse_out, rc);
902         }
903
904         plain_counter = 1; /* llog header is first record */
905         CWARN("6b: process log %s using client API\n", name);
906         rc = llog_process(env, llh, plain_print_cb, NULL, NULL);
907         if (rc)
908                 CERROR("6b: llog_process failed %d\n", rc);
909         CWARN("6b: processed %d records\n", plain_counter);
910
911         rc = verify_handle("6b", llh, plain_counter);
912         if (rc)
913                 GOTO(parse_out, rc);
914
915         plain_counter = 1; /* llog header is first record */
916         CWARN("6c: process log %s reversely using client API\n", name);
917         rc = llog_reverse_process(env, llh, plain_print_cb, NULL, NULL);
918         if (rc)
919                 CERROR("6c: llog_reverse_process failed %d\n", rc);
920         CWARN("6c: processed %d records\n", plain_counter);
921
922         rc = verify_handle("6c", llh, plain_counter);
923         if (rc)
924                 GOTO(parse_out, rc);
925
926 parse_out:
927         rc2 = llog_close(env, llh);
928         if (rc2) {
929                 CERROR("6: llog_close failed: rc = %d\n", rc2);
930                 if (rc == 0)
931                         rc = rc2;
932         }
933 nctxt_put:
934         llog_ctxt_put(nctxt);
935 ctxt_release:
936         llog_ctxt_put(ctxt);
937         RETURN(rc);
938 }
939
940 static union {
941         struct llog_rec_hdr             lrh;   /* common header */
942         struct llog_logid_rec           llr;   /* LLOG_LOGID_MAGIC */
943         struct llog_unlink64_rec        lur;   /* MDS_UNLINK64_REC */
944         struct llog_setattr64_rec       lsr64; /* MDS_SETATTR64_REC */
945         struct llog_setattr64_rec_v2    lsr64_v2; /* MDS_SETATTR64_REC */
946         struct llog_size_change_rec     lscr;  /* OST_SZ_REC */
947         struct llog_changelog_rec       lcr;   /* CHANGELOG_REC */
948         struct llog_changelog_user_rec  lcur;  /* CHANGELOG_USER_REC */
949         struct llog_gen_rec             lgr;   /* LLOG_GEN_REC */
950 } llog_records;
951
952 static int test_7_print_cb(const struct lu_env *env, struct llog_handle *llh,
953                            struct llog_rec_hdr *rec, void *data)
954 {
955         struct lu_fid fid = {0};
956
957         logid_to_fid(&llh->lgh_id, &fid);
958
959         CDEBUG(D_OTHER, "record type %#x at index %d in log "DFID"\n",
960                rec->lrh_type, rec->lrh_index, PFID(&fid));
961
962         plain_counter++;
963         return 0;
964 }
965
966 static int test_7_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
967                             struct llog_rec_hdr *rec, void *data)
968 {
969         plain_counter++;
970         /* test LLOG_DEL_RECORD is working */
971         return LLOG_DEL_RECORD;
972 }
973
974 static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
975 {
976         struct llog_handle      *llh;
977         int                      rc = 0, i, process_count;
978         int                      num_recs = 0;
979
980         ENTRY;
981
982         rc = llog_open_create(env, ctxt, &llh, NULL, NULL);
983         if (rc) {
984                 CERROR("7_sub: create log failed\n");
985                 RETURN(rc);
986         }
987
988         rc = llog_init_handle(env, llh,
989                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
990                               &uuid);
991         if (rc) {
992                 CERROR("7_sub: can't init llog handle: %d\n", rc);
993                 GOTO(out_close, rc);
994         }
995         for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
996                 rc = llog_write(env, llh, &llog_records.lrh, LLOG_NEXT_IDX);
997                 if (rc == -ENOSPC) {
998                         break;
999                 } else if (rc < 0) {
1000                         CERROR("7_sub: write recs failed at #%d: %d\n",
1001                                i + 1, rc);
1002                         GOTO(out_close, rc);
1003                 }
1004                 num_recs++;
1005         }
1006         if (rc != -ENOSPC) {
1007                 CWARN("7_sub: write record more than BITMAP size!\n");
1008                 GOTO(out_close, rc = -EINVAL);
1009         }
1010
1011         rc = verify_handle("7_sub", llh, num_recs + 1);
1012         if (rc) {
1013                 CERROR("7_sub: verify handle failed: %d\n", rc);
1014                 GOTO(out_close, rc);
1015         }
1016         if (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1)
1017                 CWARN("7_sub: records are not aligned, written %d from %u\n",
1018                       num_recs, LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
1019
1020         plain_counter = 0;
1021         rc = llog_process(env, llh, test_7_print_cb, "test 7", NULL);
1022         if (rc) {
1023                 CERROR("7_sub: llog process failed: %d\n", rc);
1024                 GOTO(out_close, rc);
1025         }
1026         process_count = plain_counter;
1027         if (process_count != num_recs) {
1028                 CERROR("7_sub: processed %d records from %d total\n",
1029                        process_count, num_recs);
1030                 GOTO(out_close, rc = -EINVAL);
1031         }
1032
1033         plain_counter = 0;
1034         rc = llog_reverse_process(env, llh, test_7_cancel_cb, "test 7", NULL);
1035         if (rc && rc != LLOG_DEL_PLAIN) {
1036                 CERROR("7_sub: reverse llog process failed: %d\n", rc);
1037                 GOTO(out_close, rc);
1038         }
1039         if (process_count != plain_counter) {
1040                 CERROR("7_sub: Reverse/direct processing found different"
1041                        "number of records: %d/%d\n",
1042                        plain_counter, process_count);
1043                 GOTO(out_close, rc = -EINVAL);
1044         }
1045         if (llog_exist(llh)) {
1046                 CERROR("7_sub: llog exists but should be zapped\n");
1047                 GOTO(out_close, rc = -EEXIST);
1048         }
1049
1050         rc = verify_handle("7_sub", llh, 1);
1051 out_close:
1052         if (rc)
1053                 llog_destroy(env, llh);
1054         llog_close(env, llh);
1055         RETURN(rc);
1056 }
1057
1058 /* Test all llog records writing and processing */
1059 static int llog_test_7(const struct lu_env *env, struct obd_device *obd)
1060 {
1061         struct llog_ctxt        *ctxt;
1062         int                      rc;
1063
1064         ENTRY;
1065
1066         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1067
1068         CWARN("7a: test llog_logid_rec\n");
1069         llog_records.llr.lid_hdr.lrh_len = sizeof(llog_records.llr);
1070         llog_records.llr.lid_tail.lrt_len = sizeof(llog_records.llr);
1071         llog_records.llr.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
1072
1073         rc = llog_test_7_sub(env, ctxt);
1074         if (rc) {
1075                 CERROR("7a: llog_logid_rec test failed\n");
1076                 GOTO(out, rc);
1077         }
1078
1079         CWARN("7b: test llog_unlink64_rec\n");
1080         llog_records.lur.lur_hdr.lrh_len = sizeof(llog_records.lur);
1081         llog_records.lur.lur_tail.lrt_len = sizeof(llog_records.lur);
1082         llog_records.lur.lur_hdr.lrh_type = MDS_UNLINK64_REC;
1083
1084         rc = llog_test_7_sub(env, ctxt);
1085         if (rc) {
1086                 CERROR("7b: llog_unlink_rec test failed\n");
1087                 GOTO(out, rc);
1088         }
1089
1090         CWARN("7c: test llog_setattr64_rec\n");
1091         llog_records.lsr64.lsr_hdr.lrh_len = sizeof(llog_records.lsr64);
1092         llog_records.lsr64.lsr_tail.lrt_len = sizeof(llog_records.lsr64);
1093         llog_records.lsr64.lsr_hdr.lrh_type = MDS_SETATTR64_REC;
1094
1095         rc = llog_test_7_sub(env, ctxt);
1096         if (rc) {
1097                 CERROR("7c: llog_setattr64_rec test failed\n");
1098                 GOTO(out, rc);
1099         }
1100
1101         CWARN("7d: test llog_size_change_rec\n");
1102         llog_records.lscr.lsc_hdr.lrh_len = sizeof(llog_records.lscr);
1103         llog_records.lscr.lsc_tail.lrt_len = sizeof(llog_records.lscr);
1104         llog_records.lscr.lsc_hdr.lrh_type = OST_SZ_REC;
1105
1106         rc = llog_test_7_sub(env, ctxt);
1107         if (rc) {
1108                 CERROR("7d: llog_size_change_rec test failed\n");
1109                 GOTO(out, rc);
1110         }
1111
1112         CWARN("7e: test llog_changelog_rec\n");
1113         /* Direct access to cr_do_not_use: peculiar case for this test */
1114         llog_records.lcr.cr_hdr.lrh_len = sizeof(llog_records.lcr);
1115         llog_records.lcr.cr_do_not_use.lrt_len = sizeof(llog_records.lcr);
1116         llog_records.lcr.cr_hdr.lrh_type = CHANGELOG_REC;
1117
1118         rc = llog_test_7_sub(env, ctxt);
1119         if (rc) {
1120                 CERROR("7e: llog_changelog_rec test failed\n");
1121                 GOTO(out, rc);
1122         }
1123
1124         CWARN("7f: test llog_changelog_user_rec\n");
1125         llog_records.lcur.cur_hdr.lrh_len = sizeof(llog_records.lcur);
1126         llog_records.lcur.cur_tail.lrt_len = sizeof(llog_records.lcur);
1127         llog_records.lcur.cur_hdr.lrh_type = CHANGELOG_USER_REC;
1128
1129         rc = llog_test_7_sub(env, ctxt);
1130         if (rc) {
1131                 CERROR("7f: llog_changelog_user_rec test failed\n");
1132                 GOTO(out, rc);
1133         }
1134
1135         CWARN("7g: test llog_gen_rec\n");
1136         llog_records.lgr.lgr_hdr.lrh_len = sizeof(llog_records.lgr);
1137         llog_records.lgr.lgr_tail.lrt_len = sizeof(llog_records.lgr);
1138         llog_records.lgr.lgr_hdr.lrh_type = LLOG_GEN_REC;
1139
1140         rc = llog_test_7_sub(env, ctxt);
1141         if (rc) {
1142                 CERROR("7g: llog_size_change_rec test failed\n");
1143                 GOTO(out, rc);
1144         }
1145
1146         CWARN("7h: test llog_setattr64_rec_v2\n");
1147         llog_records.lsr64.lsr_hdr.lrh_len = sizeof(llog_records.lsr64_v2);
1148         llog_records.lsr64.lsr_tail.lrt_len = sizeof(llog_records.lsr64_v2);
1149         llog_records.lsr64.lsr_hdr.lrh_type = MDS_SETATTR64_REC;
1150
1151         rc = llog_test_7_sub(env, ctxt);
1152         if (rc) {
1153                 CERROR("7h: llog_setattr64_rec_v2 test failed\n");
1154                 GOTO(out, rc);
1155         }
1156 out:
1157         llog_ctxt_put(ctxt);
1158         RETURN(rc);
1159 }
1160
1161 static int llog_truncate(const struct lu_env *env, struct dt_object *o)
1162 {
1163         struct lu_attr           la;
1164         struct thandle          *th;
1165         struct dt_device        *d;
1166         int                      rc;
1167         ENTRY;
1168
1169         LASSERT(o);
1170         d = lu2dt_dev(o->do_lu.lo_dev);
1171         LASSERT(d);
1172
1173         rc = dt_attr_get(env, o, &la);
1174         if (rc)
1175                 RETURN(rc);
1176
1177         CDEBUG(D_OTHER, "original size %llu\n", la.la_size);
1178         rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec);
1179         if (la.la_size < rc) {
1180                 CERROR("too small llog: %llu\n", la.la_size);
1181                 RETURN(0);
1182         }
1183
1184         /* drop 2 records */
1185         la.la_size = la.la_size - (sizeof(struct llog_mini_rec) * 2);
1186         la.la_valid = LA_SIZE;
1187
1188         th = dt_trans_create(env, d);
1189         if (IS_ERR(th))
1190                 RETURN(PTR_ERR(th));
1191
1192         rc = dt_declare_attr_set(env, o, &la, th);
1193         if (rc)
1194                 GOTO(stop, rc);
1195
1196         rc = dt_declare_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
1197
1198         rc = dt_trans_start_local(env, d, th);
1199         if (rc)
1200                 GOTO(stop, rc);
1201
1202         rc = dt_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
1203         if (rc)
1204                 GOTO(stop, rc);
1205
1206         rc = dt_attr_set(env, o, &la, th);
1207         if (rc)
1208                 GOTO(stop, rc);
1209
1210 stop:
1211         dt_trans_stop(env, d, th);
1212
1213         RETURN(rc);
1214 }
1215
1216 static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
1217                           struct llog_rec_hdr *rec, void *data)
1218 {
1219         plain_counter++;
1220         return 0;
1221 }
1222
1223 static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
1224 {
1225         struct llog_handle      *llh = NULL;
1226         char                     name[10];
1227         int                      rc, rc2, i;
1228         int                      orig_counter;
1229         struct llog_mini_rec     lmr;
1230         struct llog_ctxt        *ctxt;
1231         struct dt_object        *obj = NULL;
1232
1233         ENTRY;
1234
1235         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1236         LASSERT(ctxt);
1237
1238         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
1239         lmr.lmr_hdr.lrh_type = 0xf00f00;
1240
1241         CWARN("8a: fill the first plain llog\n");
1242         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1243         if (rc) {
1244                 CERROR("8a: llog_create with logid failed: %d\n", rc);
1245                 GOTO(out_put, rc);
1246         }
1247
1248         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1249         if (rc) {
1250                 CERROR("8a: can't init llog handle: %d\n", rc);
1251                 GOTO(out, rc);
1252         }
1253
1254         plain_counter = 0;
1255         rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0);
1256         if (rc != 0) {
1257                 CERROR("5a: process with test_8_cb failed: %d\n", rc);
1258                 GOTO(out, rc);
1259         }
1260         orig_counter = plain_counter;
1261
1262         for (i = 0; i < 100; i++) {
1263                 rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
1264                 if (rc) {
1265                         CERROR("5a: add record failed\n");
1266                         GOTO(out, rc);
1267                 }
1268         }
1269
1270         /* grab the current plain llog, we'll corrupt it later */
1271         obj = llh->u.chd.chd_current_log->lgh_obj;
1272         LASSERT(obj);
1273         lu_object_get(&obj->do_lu);
1274         CWARN("8a: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu)));
1275
1276         rc2 = llog_cat_close(env, llh);
1277         if (rc2) {
1278                 CERROR("8a: close log %s failed: %d\n", name, rc2);
1279                 if (rc == 0)
1280                         rc = rc2;
1281                 GOTO(out_put, rc);
1282         }
1283
1284         CWARN("8b: fill the second plain llog\n");
1285         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1286         if (rc) {
1287                 CERROR("8b: llog_create with logid failed: %d\n", rc);
1288                 GOTO(out_put, rc);
1289         }
1290
1291         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1292         if (rc) {
1293                 CERROR("8b: can't init llog handle: %d\n", rc);
1294                 GOTO(out, rc);
1295         }
1296
1297         for (i = 0; i < 100; i++) {
1298                 rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
1299                 if (rc) {
1300                         CERROR("8b: add record failed\n");
1301                         GOTO(out, rc);
1302                 }
1303         }
1304         CWARN("8b: second llog "DFID"\n",
1305                 PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
1306
1307         rc2 = llog_cat_close(env, llh);
1308         if (rc2) {
1309                 CERROR("8b: close log %s failed: %d\n", name, rc2);
1310                 if (rc == 0)
1311                         rc = rc2;
1312                 GOTO(out_put, rc);
1313         }
1314
1315         CWARN("8c: drop two records from the first plain llog\n");
1316         llog_truncate(env, obj);
1317
1318         CWARN("8d: count survived records\n");
1319         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1320         if (rc) {
1321                 CERROR("8d: llog_create with logid failed: %d\n", rc);
1322                 GOTO(out_put, rc);
1323         }
1324
1325         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1326         if (rc) {
1327                 CERROR("8d: can't init llog handle: %d\n", rc);
1328                 GOTO(out, rc);
1329         }
1330
1331         plain_counter = 0;
1332         rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0);
1333         if (rc != 0) {
1334                 CERROR("8d: process with test_8_cb failed: %d\n", rc);
1335                 GOTO(out, rc);
1336         }
1337
1338         if (orig_counter + 200 - 2 != plain_counter) {
1339                 CERROR("found %d records (expected %d)\n", plain_counter,
1340                        orig_counter + 200 - 2);
1341                 rc = -EIO;
1342         }
1343
1344 out:
1345         CWARN("8d: close re-opened catalog\n");
1346         rc2 = llog_cat_close(env, llh);
1347         if (rc2) {
1348                 CERROR("8d: close log %s failed: %d\n", name, rc2);
1349                 if (rc == 0)
1350                         rc = rc2;
1351         }
1352 out_put:
1353         llog_ctxt_put(ctxt);
1354
1355         if (obj != NULL)
1356                 dt_object_put(env, obj);
1357
1358         RETURN(rc);
1359 }
1360
1361 static int llog_test_9_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
1362 {
1363         struct llog_handle      *llh;
1364         struct lu_fid            fid;
1365         int                      rc = 0;
1366
1367         ENTRY;
1368
1369         rc = llog_open_create(env, ctxt, &llh, NULL, NULL);
1370         if (rc != 0) {
1371                 CERROR("9_sub: create log failed\n");
1372                 RETURN(rc);
1373         }
1374
1375         rc = llog_init_handle(env, llh,
1376                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
1377                               &uuid);
1378         if (rc != 0) {
1379                 CERROR("9_sub: can't init llog handle: %d\n", rc);
1380                 GOTO(out_close, rc);
1381         }
1382
1383         logid_to_fid(&llh->lgh_id, &fid);
1384         fid_to_logid(&fid, &llog_records.llr.lid_id);
1385         rc = llog_write(env, llh, &llog_records.lrh, LLOG_NEXT_IDX);
1386         if (rc < 0) {
1387                 CERROR("9_sub: write recs failed at #1: %d\n", rc);
1388                 GOTO(out_close, rc);
1389         }
1390         CWARN("9_sub: record type %x in log "DFID_NOBRACE"\n",
1391               llog_records.lrh.lrh_type, PFID(&fid));
1392 out_close:
1393         llog_close(env, llh);
1394         RETURN(rc);
1395 }
1396
1397 /* Prepare different types of llog records for llog_reader test*/
1398 static int llog_test_9(const struct lu_env *env, struct obd_device *obd)
1399 {
1400         struct llog_ctxt        *ctxt;
1401         int                      rc;
1402
1403         ENTRY;
1404
1405         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1406
1407         CWARN("9a: test llog_logid_rec\n");
1408         llog_records.llr.lid_hdr.lrh_len = sizeof(llog_records.llr);
1409         llog_records.llr.lid_tail.lrt_len = sizeof(llog_records.llr);
1410         llog_records.llr.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
1411
1412         rc = llog_test_9_sub(env, ctxt);
1413         if (rc != 0) {
1414                 CERROR("9a: llog_logid_rec test failed\n");
1415                 GOTO(out, rc);
1416         }
1417
1418         CWARN("9b: test llog_obd_cfg_rec\n");
1419         llog_records.lscr.lsc_hdr.lrh_len = sizeof(llog_records.lscr);
1420         llog_records.lscr.lsc_tail.lrt_len = sizeof(llog_records.lscr);
1421         llog_records.lscr.lsc_hdr.lrh_type = OBD_CFG_REC;
1422
1423         rc = llog_test_9_sub(env, ctxt);
1424         if (rc != 0) {
1425                 CERROR("9b: llog_obd_cfg_rec test failed\n");
1426                 GOTO(out, rc);
1427         }
1428
1429         CWARN("9c: test llog_changelog_rec\n");
1430         /* Direct access to cr_do_not_use: peculiar case for this test */
1431         llog_records.lcr.cr_hdr.lrh_len = sizeof(llog_records.lcr);
1432         llog_records.lcr.cr_do_not_use.lrt_len = sizeof(llog_records.lcr);
1433         llog_records.lcr.cr_hdr.lrh_type = CHANGELOG_REC;
1434
1435         rc = llog_test_9_sub(env, ctxt);
1436         if (rc != 0) {
1437                 CERROR("9c: llog_changelog_rec test failed\n");
1438                 GOTO(out, rc);
1439         }
1440
1441         CWARN("9d: test llog_changelog_user_rec\n");
1442         llog_records.lcur.cur_hdr.lrh_len = sizeof(llog_records.lcur);
1443         llog_records.lcur.cur_tail.lrt_len = sizeof(llog_records.lcur);
1444         llog_records.lcur.cur_hdr.lrh_type = CHANGELOG_USER_REC;
1445
1446         rc = llog_test_9_sub(env, ctxt);
1447         if (rc != 0) {
1448                 CERROR("9d: llog_changelog_user_rec test failed\n");
1449                 GOTO(out, rc);
1450         }
1451
1452 out:
1453         llog_ctxt_put(ctxt);
1454         RETURN(rc);
1455 }
1456
1457 /* test catalog wrap around */
1458 static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
1459 {
1460         struct llog_handle      *cath;
1461         char                     name[10];
1462         int                      rc, rc2, i, enospc, eok;
1463         struct llog_mini_rec     lmr;
1464         struct llog_ctxt        *ctxt;
1465         struct lu_attr           la;
1466         __u64                    cat_max_size;
1467         struct dt_device        *dt;
1468
1469         ENTRY;
1470
1471         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1472         LASSERT(ctxt);
1473
1474         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
1475         lmr.lmr_hdr.lrh_type = 0xf00f00;
1476
1477         snprintf(name, sizeof(name), "%x", llog_test_rand + 2);
1478         CWARN("10a: create a catalog log with name: %s\n", name);
1479         rc = llog_open_create(env, ctxt, &cath, NULL, name);
1480         if (rc) {
1481                 CERROR("10a: llog_create with name %s failed: %d\n", name, rc);
1482                 GOTO(ctxt_release, rc);
1483         }
1484         rc = llog_init_handle(env, cath, LLOG_F_IS_CAT, &uuid);
1485         if (rc) {
1486                 CERROR("10a: can't init llog handle: %d\n", rc);
1487                 GOTO(out, rc);
1488         }
1489
1490         cat_logid = cath->lgh_id;
1491         dt = lu2dt_dev(cath->lgh_obj->do_lu.lo_dev);
1492
1493         /* sync device to commit all recent LLOG changes to disk and avoid
1494          * to consume a huge space with delayed journal commit callbacks
1495          * particularly on low memory nodes or VMs */
1496         rc = dt_sync(env, dt);
1497         if (rc) {
1498                 CERROR("10c: sync failed: %d\n", rc);
1499                 GOTO(out, rc);
1500         }
1501
1502         /* force catalog wrap for 5th plain LLOG */
1503         cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS;
1504         cfs_fail_val = 4;
1505
1506         CWARN("10b: write %d log records\n", LLOG_TEST_RECNUM);
1507         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
1508                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1509                 if (rc) {
1510                         CERROR("10b: write %d records failed at #%d: %d\n",
1511                                LLOG_TEST_RECNUM, i + 1, rc);
1512                         GOTO(out, rc);
1513                 }
1514         }
1515
1516         /* make sure 2 new plain llog appears in catalog (+1 with hdr) */
1517         rc = verify_handle("10b", cath, 3);
1518         if (rc)
1519                 GOTO(out, rc);
1520
1521         /* sync device to commit all recent LLOG changes to disk and avoid
1522          * to consume a huge space with delayed journal commit callbacks
1523          * particularly on low memory nodes or VMs */
1524         rc = dt_sync(env, dt);
1525         if (rc) {
1526                 CERROR("10b: sync failed: %d\n", rc);
1527                 GOTO(out, rc);
1528         }
1529
1530         CWARN("10c: write %d more log records\n", 2 * LLOG_TEST_RECNUM);
1531         for (i = 0; i < 2 * LLOG_TEST_RECNUM; i++) {
1532                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1533                 if (rc) {
1534                         CERROR("10c: write %d records failed at #%d: %d\n",
1535                                2*LLOG_TEST_RECNUM, i + 1, rc);
1536                         GOTO(out, rc);
1537                 }
1538         }
1539
1540         /* make sure 2 new plain llog appears in catalog (+1 with hdr) */
1541         rc = verify_handle("10c", cath, 5);
1542         if (rc)
1543                 GOTO(out, rc);
1544
1545         /* sync device to commit all recent LLOG changes to disk and avoid
1546          * to consume a huge space with delayed journal commit callbacks
1547          * particularly on low memory nodes or VMs */
1548         rc = dt_sync(env, dt);
1549         if (rc) {
1550                 CERROR("10c: sync failed: %d\n", rc);
1551                 GOTO(out, rc);
1552         }
1553
1554         /* fill last allocated plain LLOG and reach -ENOSPC condition
1555          * because no slot available in Catalog */
1556         enospc = 0;
1557         eok = 0;
1558         CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM);
1559         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
1560                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1561                 if (rc && rc != -ENOSPC) {
1562                         CERROR("10c: write %d records failed at #%d: %d\n",
1563                                LLOG_TEST_RECNUM, i + 1, rc);
1564                         GOTO(out, rc);
1565                 }
1566                 /* after last added plain LLOG has filled up, all new
1567                  * records add should fail with -ENOSPC */
1568                 if (rc == -ENOSPC) {
1569                         enospc++;
1570                 } else {
1571                         enospc = 0;
1572                         eok++;
1573                 }
1574         }
1575
1576         if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
1577                 CERROR("10c: all last records adds should have failed with"
1578                        " -ENOSPC\n");
1579                 GOTO(out, rc = -EINVAL);
1580         }
1581
1582         CWARN("10c: wrote %d records then %d failed with ENOSPC\n", eok,
1583               enospc);
1584
1585         /* make sure no new record in Catalog */
1586         rc = verify_handle("10c", cath, 5);
1587         if (rc)
1588                 GOTO(out, rc);
1589
1590         /* Catalog should have reached its max size for test */
1591         rc = dt_attr_get(env, cath->lgh_obj, &la);
1592         if (rc) {
1593                 CERROR("10c: failed to get catalog attrs: %d\n", rc);
1594                 GOTO(out, rc);
1595         }
1596         cat_max_size = la.la_size;
1597
1598         /* cancel all 1st plain llog records to empty it, this will also cause
1599          * its catalog entry to be freed for next forced wrap in 10e */
1600         CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1601         cancel_count = 0;
1602         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1603         if (rc != -LLOG_EEMPTY) {
1604                 CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc);
1605                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1606                  * not reached */
1607                 if (rc == 0)
1608                         rc = -ERANGE;
1609                 GOTO(out, rc);
1610         }
1611
1612         CWARN("10d: print the catalog entries.. we expect 3\n");
1613         cat_counter = 0;
1614         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1615         if (rc) {
1616                 CERROR("10d: process with cat_print_cb failed: %d\n", rc);
1617                 GOTO(out, rc);
1618         }
1619         if (cat_counter != 3) {
1620                 CERROR("10d: %d entries in catalog\n", cat_counter);
1621                 GOTO(out, rc = -EINVAL);
1622         }
1623
1624         /* verify one down in catalog (+1 with hdr) */
1625         rc = verify_handle("10d", cath, 4);
1626         if (rc)
1627                 GOTO(out, rc);
1628
1629         /* sync device to commit all recent LLOG changes to disk and avoid
1630          * to consume a huge space with delayed journal commit callbacks
1631          * particularly on low memory nodes or VMs */
1632         rc = dt_sync(env, dt);
1633         if (rc) {
1634                 CERROR("10d: sync failed: %d\n", rc);
1635                 GOTO(out, rc);
1636         }
1637
1638         enospc = 0;
1639         eok = 0;
1640         CWARN("10e: write %d more log records\n", LLOG_TEST_RECNUM);
1641         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
1642                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1643                 if (rc && rc != -ENOSPC) {
1644                         CERROR("10e: write %d records failed at #%d: %d\n",
1645                                LLOG_TEST_RECNUM, i + 1, rc);
1646                         GOTO(out, rc);
1647                 }
1648                 /* after last added plain LLOG has filled up, all new
1649                  * records add should fail with -ENOSPC */
1650                 if (rc == -ENOSPC) {
1651                         enospc++;
1652                 } else {
1653                         enospc = 0;
1654                         eok++;
1655                 }
1656         }
1657
1658         if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
1659                 CERROR("10e: all last records adds should have failed with"
1660                        " -ENOSPC\n");
1661                 GOTO(out, rc = -EINVAL);
1662         }
1663
1664         CWARN("10e: wrote %d records then %d failed with ENOSPC\n", eok,
1665               enospc);
1666
1667         CWARN("10e: print the catalog entries.. we expect 4\n");
1668         cat_counter = 0;
1669         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1670         if (rc) {
1671                 CERROR("10d: process with cat_print_cb failed: %d\n", rc);
1672                 GOTO(out, rc);
1673         }
1674         if (cat_counter != 4) {
1675                 CERROR("10d: %d entries in catalog\n", cat_counter);
1676                 GOTO(out, rc = -EINVAL);
1677         }
1678
1679         /* make sure 1 new plain llog appears in catalog (+1 with hdr) */
1680         rc = verify_handle("10e", cath, 5);
1681         if (rc)
1682                 GOTO(out, rc);
1683
1684         /* verify catalog has wrap around */
1685         if (cath->lgh_last_idx > cath->lgh_hdr->llh_cat_idx) {
1686                 CERROR("10e: catalog failed to wrap around\n");
1687                 GOTO(out, rc = -EINVAL);
1688         }
1689
1690         rc = dt_attr_get(env, cath->lgh_obj, &la);
1691         if (rc) {
1692                 CERROR("10e: failed to get catalog attrs: %d\n", rc);
1693                 GOTO(out, rc);
1694         }
1695
1696         if (la.la_size != cat_max_size) {
1697                 CERROR("10e: catalog size has changed after it has wrap around,"
1698                        " current size = %llu, expected size = %llu\n",
1699                        la.la_size, cat_max_size);
1700                 GOTO(out, rc = -EINVAL);
1701         }
1702         CWARN("10e: catalog successfully wrap around, last_idx %d, first %d\n",
1703               cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
1704
1705         /* sync device to commit all recent LLOG changes to disk and avoid
1706          * to consume a huge space with delayed journal commit callbacks
1707          * particularly on low memory nodes or VMs */
1708         rc = dt_sync(env, dt);
1709         if (rc) {
1710                 CERROR("10e: sync failed: %d\n", rc);
1711                 GOTO(out, rc);
1712         }
1713
1714         /* cancel more records to free one more slot in Catalog
1715          * see if it is re-allocated when adding more records */
1716         CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1717         cancel_count = 0;
1718         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1719         if (rc != -LLOG_EEMPTY) {
1720                 CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc);
1721                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1722                  * not reached */
1723                 if (rc == 0)
1724                         rc = -ERANGE;
1725                 GOTO(out, rc);
1726         }
1727
1728         CWARN("10f: print the catalog entries.. we expect 3\n");
1729         cat_counter = 0;
1730         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1731         if (rc) {
1732                 CERROR("10f: process with cat_print_cb failed: %d\n", rc);
1733                 GOTO(out, rc);
1734         }
1735         if (cat_counter != 3) {
1736                 CERROR("10f: %d entries in catalog\n", cat_counter);
1737                 GOTO(out, rc = -EINVAL);
1738         }
1739
1740         /* verify one down in catalog (+1 with hdr) */
1741         rc = verify_handle("10f", cath, 4);
1742         if (rc)
1743                 GOTO(out, rc);
1744
1745         /* sync device to commit all recent LLOG changes to disk and avoid
1746          * to consume a huge space with delayed journal commit callbacks
1747          * particularly on low memory nodes or VMs */
1748         rc = dt_sync(env, dt);
1749         if (rc) {
1750                 CERROR("10f: sync failed: %d\n", rc);
1751                 GOTO(out, rc);
1752         }
1753
1754         enospc = 0;
1755         eok = 0;
1756         CWARN("10f: write %d more log records\n", LLOG_TEST_RECNUM);
1757         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
1758                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
1759                 if (rc && rc != -ENOSPC) {
1760                         CERROR("10f: write %d records failed at #%d: %d\n",
1761                                LLOG_TEST_RECNUM, i + 1, rc);
1762                         GOTO(out, rc);
1763                 }
1764                 /* after last added plain LLOG has filled up, all new
1765                  * records add should fail with -ENOSPC */
1766                 if (rc == -ENOSPC) {
1767                         enospc++;
1768                 } else {
1769                         enospc = 0;
1770                         eok++;
1771                 }
1772         }
1773
1774         if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
1775                 CERROR("10f: all last records adds should have failed with"
1776                        " -ENOSPC\n");
1777                 GOTO(out, rc = -EINVAL);
1778         }
1779
1780         CWARN("10f: wrote %d records then %d failed with ENOSPC\n", eok,
1781               enospc);
1782
1783         /* make sure 1 new plain llog appears in catalog (+1 with hdr) */
1784         rc = verify_handle("10f", cath, 5);
1785         if (rc)
1786                 GOTO(out, rc);
1787
1788         /* verify lgh_last_idx = llh_cat_idx = 2 now */
1789         if (cath->lgh_last_idx != cath->lgh_hdr->llh_cat_idx ||
1790             cath->lgh_last_idx != 2) {
1791                 CERROR("10f: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 2\n",
1792                        cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
1793                 GOTO(out, rc = -EINVAL);
1794         }
1795
1796         rc = dt_attr_get(env, cath->lgh_obj, &la);
1797         if (rc) {
1798                 CERROR("10f: failed to get catalog attrs: %d\n", rc);
1799                 GOTO(out, rc);
1800         }
1801
1802         if (la.la_size != cat_max_size) {
1803                 CERROR("10f: catalog size has changed after it has wrap around,"
1804                        " current size = %llu, expected size = %llu\n",
1805                        la.la_size, cat_max_size);
1806                 GOTO(out, rc = -EINVAL);
1807         }
1808
1809         /* sync device to commit all recent LLOG changes to disk and avoid
1810          * to consume a huge space with delayed journal commit callbacks
1811          * particularly on low memory nodes or VMs */
1812         rc = dt_sync(env, dt);
1813         if (rc) {
1814                 CERROR("10f: sync failed: %d\n", rc);
1815                 GOTO(out, rc);
1816         }
1817
1818         /* will llh_cat_idx also successfully wrap ? */
1819
1820         /* cancel all records in the plain LLOGs referenced by 2 last indexes in
1821          * Catalog */
1822
1823         /* cancel more records to free one more slot in Catalog */
1824         CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1825         cancel_count = 0;
1826         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1827         if (rc != -LLOG_EEMPTY) {
1828                 CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
1829                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1830                  * not reached */
1831                 if (rc == 0)
1832                         rc = -ERANGE;
1833                 GOTO(out, rc);
1834         }
1835
1836         CWARN("10g: print the catalog entries.. we expect 3\n");
1837         cat_counter = 0;
1838         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1839         if (rc) {
1840                 CERROR("10g: process with cat_print_cb failed: %d\n", rc);
1841                 GOTO(out, rc);
1842         }
1843         if (cat_counter != 3) {
1844                 CERROR("10g: %d entries in catalog\n", cat_counter);
1845                 GOTO(out, rc = -EINVAL);
1846         }
1847
1848         /* verify one down in catalog (+1 with hdr) */
1849         rc = verify_handle("10g", cath, 4);
1850         if (rc)
1851                 GOTO(out, rc);
1852
1853         /* sync device to commit all recent LLOG changes to disk and avoid
1854          * to consume a huge space with delayed journal commit callbacks
1855          * particularly on low memory nodes or VMs */
1856         rc = dt_sync(env, dt);
1857         if (rc) {
1858                 CERROR("10g: sync failed: %d\n", rc);
1859                 GOTO(out, rc);
1860         }
1861
1862         /* cancel more records to free one more slot in Catalog */
1863         CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1864         cancel_count = 0;
1865         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1866         if (rc != -LLOG_EEMPTY) {
1867                 CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
1868                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1869                  * not reached */
1870                 if (rc == 0)
1871                         rc = -ERANGE;
1872                 GOTO(out, rc);
1873         }
1874
1875         CWARN("10g: print the catalog entries.. we expect 2\n");
1876         cat_counter = 0;
1877         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1878         if (rc) {
1879                 CERROR("10g: process with cat_print_cb failed: %d\n", rc);
1880                 GOTO(out, rc);
1881         }
1882         if (cat_counter != 2) {
1883                 CERROR("10g: %d entries in catalog\n", cat_counter);
1884                 GOTO(out, rc = -EINVAL);
1885         }
1886
1887         /* verify one down in catalog (+1 with hdr) */
1888         rc = verify_handle("10g", cath, 3);
1889         if (rc)
1890                 GOTO(out, rc);
1891
1892         /* verify lgh_last_idx = 2 and llh_cat_idx = 0 now */
1893         if (cath->lgh_hdr->llh_cat_idx != 0 ||
1894             cath->lgh_last_idx != 2) {
1895                 CERROR("10g: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 0\n",
1896                        cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
1897                 GOTO(out, rc = -EINVAL);
1898         }
1899
1900         /* sync device to commit all recent LLOG changes to disk and avoid
1901          * to consume a huge space with delayed journal commit callbacks
1902          * particularly on low memory nodes or VMs */
1903         rc = dt_sync(env, dt);
1904         if (rc) {
1905                 CERROR("10g: sync failed: %d\n", rc);
1906                 GOTO(out, rc);
1907         }
1908
1909         /* cancel more records to free one more slot in Catalog */
1910         CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
1911         cancel_count = 0;
1912         rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
1913         if (rc != -LLOG_EEMPTY) {
1914                 CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
1915                 /* need to indicate error if for any reason LLOG_TEST_RECNUM is
1916                  * not reached */
1917                 if (rc == 0)
1918                         rc = -ERANGE;
1919                 GOTO(out, rc);
1920         }
1921
1922         CWARN("10g: print the catalog entries.. we expect 1\n");
1923         cat_counter = 0;
1924         rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
1925         if (rc) {
1926                 CERROR("10g: process with cat_print_cb failed: %d\n", rc);
1927                 GOTO(out, rc);
1928         }
1929         if (cat_counter != 1) {
1930                 CERROR("10g: %d entries in catalog\n", cat_counter);
1931                 GOTO(out, rc = -EINVAL);
1932         }
1933
1934         /* verify one down in catalog (+1 with hdr) */
1935         rc = verify_handle("10g", cath, 2);
1936         if (rc)
1937                 GOTO(out, rc);
1938
1939         /* verify lgh_last_idx = 2 and llh_cat_idx = 1 now */
1940         if (cath->lgh_hdr->llh_cat_idx != 1 ||
1941             cath->lgh_last_idx != 2) {
1942                 CERROR("10g: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 1\n",
1943                        cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
1944                 GOTO(out, rc = -EINVAL);
1945         }
1946
1947         CWARN("10g: llh_cat_idx has also successfully wrapped!\n");
1948
1949 out:
1950         cfs_fail_loc = 0;
1951         cfs_fail_val = 0;
1952
1953         CWARN("10: put newly-created catalog\n");
1954         rc2 = llog_cat_close(env, cath);
1955         if (rc2) {
1956                 CERROR("10: close log %s failed: %d\n", name, rc2);
1957                 if (rc == 0)
1958                         rc = rc2;
1959         }
1960 ctxt_release:
1961         llog_ctxt_put(ctxt);
1962         RETURN(rc);
1963 }
1964
1965 /* -------------------------------------------------------------------------
1966  * Tests above, boring obd functions below
1967  * ------------------------------------------------------------------------- */
1968 static int llog_run_tests(const struct lu_env *env, struct obd_device *obd)
1969 {
1970         struct llog_handle      *llh = NULL;
1971         struct llog_ctxt        *ctxt;
1972         int                      rc, err;
1973         char                     name[10];
1974
1975         ENTRY;
1976         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1977         LASSERT(ctxt);
1978
1979         sprintf(name, "%x", llog_test_rand);
1980
1981         rc = llog_test_1(env, obd, name);
1982         if (rc)
1983                 GOTO(cleanup_ctxt, rc);
1984
1985         rc = llog_test_2(env, obd, name, &llh);
1986         if (rc)
1987                 GOTO(cleanup_ctxt, rc);
1988
1989         rc = llog_test_3(env, obd, llh);
1990         if (rc)
1991                 GOTO(cleanup, rc);
1992
1993         rc = llog_test_4(env, obd);
1994         if (rc)
1995                 GOTO(cleanup, rc);
1996
1997         rc = llog_test_5(env, obd);
1998         if (rc)
1999                 GOTO(cleanup, rc);
2000
2001         rc = llog_test_6(env, obd, name);
2002         if (rc)
2003                 GOTO(cleanup, rc);
2004
2005         rc = llog_test_7(env, obd);
2006         if (rc)
2007                 GOTO(cleanup, rc);
2008
2009         rc = llog_test_8(env, obd);
2010         if (rc)
2011                 GOTO(cleanup, rc);
2012
2013         rc = llog_test_9(env, obd);
2014         if (rc != 0)
2015                 GOTO(cleanup, rc);
2016
2017         rc = llog_test_10(env, obd);
2018         if (rc)
2019                 GOTO(cleanup, rc);
2020
2021 cleanup:
2022         err = llog_destroy(env, llh);
2023         if (err)
2024                 CERROR("cleanup: llog_destroy failed: %d\n", err);
2025         llog_close(env, llh);
2026         if (rc == 0)
2027                 rc = err;
2028 cleanup_ctxt:
2029         llog_ctxt_put(ctxt);
2030         return rc;
2031 }
2032
2033 static int llog_test_cleanup(struct obd_device *obd)
2034 {
2035         struct obd_device       *tgt;
2036         struct lu_env            env;
2037         int                      rc;
2038
2039         ENTRY;
2040
2041         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
2042         if (rc)
2043                 RETURN(rc);
2044
2045         tgt = obd->obd_lvfs_ctxt.dt->dd_lu_dev.ld_obd;
2046         rc = llog_cleanup(&env, llog_get_context(tgt, LLOG_TEST_ORIG_CTXT));
2047         if (rc)
2048                 CERROR("failed to llog_test_llog_finish: %d\n", rc);
2049         lu_env_fini(&env);
2050         RETURN(rc);
2051 }
2052
2053 static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2054 {
2055         struct obd_device       *tgt;
2056         struct llog_ctxt        *ctxt;
2057         struct dt_object        *o;
2058         struct lu_env            env;
2059         struct lu_context        test_session;
2060         int                      rc;
2061
2062         ENTRY;
2063
2064         if (lcfg->lcfg_bufcount < 2) {
2065                 CERROR("requires a TARGET OBD name\n");
2066                 RETURN(-EINVAL);
2067         }
2068
2069         if (lcfg->lcfg_buflens[1] < 1) {
2070                 CERROR("requires a TARGET OBD name\n");
2071                 RETURN(-EINVAL);
2072         }
2073
2074         /* disk obd */
2075         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2076         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2077                 CERROR("target device not attached or not set up (%s)\n",
2078                        lustre_cfg_string(lcfg, 1));
2079                 RETURN(-EINVAL);
2080         }
2081
2082         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
2083         if (rc)
2084                 RETURN(rc);
2085
2086         rc = lu_context_init(&test_session, LCT_SERVER_SESSION);
2087         if (rc)
2088                 GOTO(cleanup_env, rc);
2089         test_session.lc_thread = (struct ptlrpc_thread *)current;
2090         lu_context_enter(&test_session);
2091         env.le_ses = &test_session;
2092
2093         CWARN("Setup llog-test device over %s device\n",
2094               lustre_cfg_string(lcfg, 1));
2095
2096         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
2097         obd->obd_lvfs_ctxt.dt = lu2dt_dev(tgt->obd_lu_dev);
2098
2099         rc = llog_setup(&env, tgt, &tgt->obd_olg, LLOG_TEST_ORIG_CTXT, tgt,
2100                         &llog_osd_ops);
2101         if (rc)
2102                 GOTO(cleanup_session, rc);
2103
2104         /* use MGS llog dir for tests */
2105         ctxt = llog_get_context(tgt, LLOG_CONFIG_ORIG_CTXT);
2106         LASSERT(ctxt);
2107         o = ctxt->loc_dir;
2108         llog_ctxt_put(ctxt);
2109
2110         ctxt = llog_get_context(tgt, LLOG_TEST_ORIG_CTXT);
2111         LASSERT(ctxt);
2112         ctxt->loc_dir = o;
2113         llog_ctxt_put(ctxt);
2114
2115         llog_test_rand = cfs_rand();
2116
2117         rc = llog_run_tests(&env, tgt);
2118         if (rc)
2119                 llog_test_cleanup(obd);
2120 cleanup_session:
2121         lu_context_exit(&test_session);
2122         lu_context_fini(&test_session);
2123 cleanup_env:
2124         lu_env_fini(&env);
2125         RETURN(rc);
2126 }
2127
2128 static struct obd_ops llog_obd_ops = {
2129         .o_owner       = THIS_MODULE,
2130         .o_setup       = llog_test_setup,
2131         .o_cleanup     = llog_test_cleanup,
2132 };
2133
2134 static int __init llog_test_init(void)
2135 {
2136         return class_register_type(&llog_obd_ops, NULL, true, NULL,
2137                                    "llog_test", NULL);
2138 }
2139
2140 static void __exit llog_test_exit(void)
2141 {
2142         class_unregister_type("llog_test");
2143 }
2144
2145 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2146 MODULE_DESCRIPTION("Lustre Log test module");
2147 MODULE_VERSION(LUSTRE_VERSION_STRING);
2148 MODULE_LICENSE("GPL");
2149
2150 module_init(llog_test_init);
2151 module_exit(llog_test_exit);