Whamcloud - gitweb
259c83b9cd2a93ff1ed86f705e81aff6b260c723
[fs/lustre-release.git] / lustre / obdclass / llog_test.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog_test.c
37  *
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Mikhail Pershin <mike.pershin@intel.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/module.h>
45 #include <linux/init.h>
46
47 #include <obd_class.h>
48 #include <lustre_fid.h>
49 #include <lustre_log.h>
50
51 /* This is slightly more than the number of records that can fit into a
52  * single llog file, because the llog_log_header takes up some of the
53  * space in the first block that cannot be used for the bitmap. */
54 #define LLOG_TEST_RECNUM  (LLOG_MIN_CHUNK_SIZE * 8)
55
56 static int llog_test_rand;
57 static struct obd_uuid uuid = { .uuid = "test_uuid" };
58 static struct llog_logid cat_logid;
59
60 struct llog_mini_rec {
61         struct llog_rec_hdr     lmr_hdr;
62         struct llog_rec_tail    lmr_tail;
63 } __attribute__((packed));
64
65 static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
66 {
67         int i;
68         int last_idx = 0;
69         int active_recs = 0;
70
71         for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
72                 if (ext2_test_bit(i, LLOG_HDR_BITMAP(llh->lgh_hdr))) {
73                         last_idx = i;
74                         active_recs++;
75                 }
76         }
77
78         if (active_recs != num_recs) {
79                 CERROR("%s: expected %d active recs after write, found %d\n",
80                        test, num_recs, active_recs);
81                 RETURN(-ERANGE);
82         }
83
84         if (llh->lgh_hdr->llh_count != num_recs) {
85                 CERROR("%s: handle->count is %d, expected %d after write\n",
86                        test, llh->lgh_hdr->llh_count, num_recs);
87                 RETURN(-ERANGE);
88         }
89
90         if (llh->lgh_last_idx < last_idx) {
91                 CERROR("%s: handle->last_idx is %d, expected %d after write\n",
92                        test, llh->lgh_last_idx, last_idx);
93                 RETURN(-ERANGE);
94         }
95
96         RETURN(0);
97 }
98
99 /* Test named-log create/open, close */
100 static int llog_test_1(const struct lu_env *env,
101                        struct obd_device *obd, char *name)
102 {
103         struct llog_handle      *llh;
104         struct llog_ctxt        *ctxt;
105         int rc;
106         int rc2;
107
108         ENTRY;
109
110         CWARN("1a: create a log with name: %s\n", name);
111         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
112         LASSERT(ctxt);
113
114         rc = llog_open_create(env, ctxt, &llh, NULL, name);
115         if (rc) {
116                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
117                 GOTO(out, rc);
118         }
119         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, &uuid);
120         if (rc) {
121                 CERROR("1a: can't init llog handle: %d\n", rc);
122                 GOTO(out_close, rc);
123         }
124
125         rc = verify_handle("1", llh, 1);
126
127         CWARN("1b: close newly-created log\n");
128 out_close:
129         rc2 = llog_close(env, llh);
130         if (rc2) {
131                 CERROR("1b: close log %s failed: %d\n", name, rc2);
132                 if (rc == 0)
133                         rc = rc2;
134         }
135 out:
136         llog_ctxt_put(ctxt);
137         RETURN(rc);
138 }
139
140 /* Test named-log reopen; returns opened log on success */
141 static int llog_test_2(const struct lu_env *env, struct obd_device *obd,
142                        char *name, struct llog_handle **llh)
143 {
144         struct llog_ctxt        *ctxt;
145         struct llog_handle      *loghandle;
146         struct llog_logid        logid;
147         int                      rc;
148
149         ENTRY;
150
151         CWARN("2a: re-open a log with name: %s\n", name);
152         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
153         LASSERT(ctxt);
154
155         rc = llog_open(env, ctxt, llh, NULL, name, LLOG_OPEN_EXISTS);
156         if (rc) {
157                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
158                 GOTO(out_put, rc);
159         }
160
161         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &uuid);
162         if (rc) {
163                 CERROR("2a: can't init llog handle: %d\n", rc);
164                 GOTO(out_close_llh, rc);
165         }
166
167         rc = verify_handle("2", *llh, 1);
168         if (rc)
169                 GOTO(out_close_llh, rc);
170
171         /* XXX: there is known issue with tests 2b, MGS is not able to create
172          * anonymous llog, exit now to allow following tests run.
173          * It is fixed in upcoming llog over OSD code */
174         GOTO(out_put, rc);
175
176         CWARN("2b: create a log without specified NAME & LOGID\n");
177         rc = llog_open_create(env, ctxt, &loghandle, NULL, NULL);
178         if (rc) {
179                 CERROR("2b: create log failed\n");
180                 GOTO(out_close_llh, rc);
181         }
182         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, &uuid);
183         if (rc) {
184                 CERROR("2b: can't init llog handle: %d\n", rc);
185                 GOTO(out_close, rc);
186         }
187
188         logid = loghandle->lgh_id;
189         llog_close(env, loghandle);
190
191         CWARN("2c: re-open the log by LOGID\n");
192         rc = llog_open(env, ctxt, &loghandle, &logid, NULL, LLOG_OPEN_EXISTS);
193         if (rc) {
194                 CERROR("2c: re-open log by LOGID failed\n");
195                 GOTO(out_close_llh, rc);
196         }
197
198         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, &uuid);
199         if (rc) {
200                 CERROR("2c: can't init llog handle: %d\n", rc);
201                 GOTO(out_close, rc);
202         }
203
204         CWARN("2b: destroy this log\n");
205         rc = llog_destroy(env, loghandle);
206         if (rc)
207                 CERROR("2d: destroy log failed\n");
208 out_close:
209         llog_close(env, loghandle);
210 out_close_llh:
211         if (rc)
212                 llog_close(env, *llh);
213 out_put:
214         llog_ctxt_put(ctxt);
215
216         RETURN(rc);
217 }
218
219 static int records;
220 static off_t rec_offset;
221 static int paddings;
222 static int start_idx;
223
224 /*
225  * Test 3 callback.
226  * - check lgh_cur_offset correctness
227  * - check record index consistency
228  * - modify each record in-place
229  * - add new record during *last_idx processing
230  */
231 static int test3_check_n_add_cb(const struct lu_env *env,
232                                 struct llog_handle *lgh,
233                                 struct llog_rec_hdr *rec, void *data)
234 {
235         struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec;
236         int *last_rec = data;
237         int rc;
238
239         if (lgh->lgh_hdr->llh_size > 0) {
240                 if (lgh->lgh_cur_offset != lgh->lgh_hdr->llh_hdr.lrh_len +
241                                 (start_idx + records - 1) *
242                                 lgh->lgh_hdr->llh_size)
243                         CERROR("Wrong record offset in cur_off: "LPU64", should"
244                                " be %u\n", lgh->lgh_cur_offset,
245                                lgh->lgh_hdr->llh_hdr.lrh_len +
246                                (start_idx + records - 1) *
247                                lgh->lgh_hdr->llh_size);
248         } else {
249                 size_t chunk_size = lgh->lgh_hdr->llh_hdr.lrh_len;
250
251                 /* For variable size records the start offset is unknown, trust
252                  * the first value and check others are consistent with it. */
253                 if (rec_offset == 0)
254                         rec_offset = lgh->lgh_cur_offset;
255
256                 if (lgh->lgh_cur_offset != rec_offset) {
257                         /* there can be padding record */
258                         if ((lgh->lgh_cur_offset % chunk_size == 0) &&
259                             (lgh->lgh_cur_offset - rec_offset <
260                              rec->lrh_len + LLOG_MIN_REC_SIZE)) {
261                                 rec_offset = lgh->lgh_cur_offset;
262                                 paddings++;
263                         } else {
264                                 CERROR("Wrong record offset in cur_off: "LPU64
265                                        ", should be %lld (rec len %u)\n",
266                                        lgh->lgh_cur_offset,
267                                        (long long)rec_offset, rec->lrh_len);
268                         }
269                 }
270                 rec_offset += rec->lrh_len;
271         }
272
273         if ((start_idx + records + paddings) != rec->lrh_index)
274                 CERROR("Record with wrong index was read: %u, expected %u\n",
275                        rec->lrh_index, start_idx + records + paddings);
276
277         /* modify all records in place */
278         lgr->lgr_gen.conn_cnt = rec->lrh_index;
279         rc = llog_write(env, lgh, rec, rec->lrh_index);
280         if (rc < 0)
281                 CERROR("cb_test_3: cannot modify record while processing\n");
282
283         /* Add new record to the llog at *last_rec position one by one to
284          * check that last block is re-read during processing */
285         if ((start_idx + records + paddings) == *last_rec ||
286             (start_idx + records + paddings) == (*last_rec + 1)) {
287                 rc = llog_write(env, lgh, rec, LLOG_NEXT_IDX);
288                 if (rc < 0)
289                         CERROR("cb_test_3: cannot add new record while "
290                                "processing\n");
291         }
292         records++;
293
294         return rc;
295 }
296
297 /* Check in-place modifications were done for all records*/
298 static int test3_check_cb(const struct lu_env *env, struct llog_handle *lgh,
299                           struct llog_rec_hdr *rec, void *data)
300 {
301         struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec;
302
303         if (lgr->lgr_gen.conn_cnt != rec->lrh_index) {
304                 CERROR("cb_test_3: record %u is not modified\n",
305                        rec->lrh_index);
306                 return -EINVAL;
307         }
308         records++;
309         return 0;
310 }
311
312 static int llog_test3_process(const struct lu_env *env,
313                               struct llog_handle *lgh,
314                               llog_cb_t cb, int start)
315 {
316         struct llog_process_cat_data cd;
317         int last_idx; /* new record will be injected here */
318         int rc = 0;
319
320         CWARN("test3: processing records from index %d to the end\n",
321               start);
322         cd.lpcd_first_idx = start - 1;
323         cd.lpcd_last_idx = 0;
324         records = paddings = 0;
325         last_idx = lgh->lgh_last_idx;
326         rc = llog_process(env, lgh, cb, &last_idx, &cd);
327         if (rc < 0)
328                 return rc;
329         CWARN("test3: total %u records processed with %u paddings\n",
330               records, paddings);
331         return records;
332 }
333
334 /* Test plain llog functionality */
335 static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
336                        struct llog_handle *llh)
337 {
338         char buf[128];
339         struct llog_rec_hdr *hdr = (void *)buf;
340         int rc, i;
341         int num_recs = 1; /* 1 for the header */
342         int expected;
343
344         ENTRY;
345
346         hdr->lrh_len = sizeof(struct llog_gen_rec);
347         hdr->lrh_type = LLOG_GEN_REC;
348         llh->lgh_hdr->llh_size = sizeof(struct llog_gen_rec);
349
350         /* Fill the llog with 64-bytes records, use 1023 records,
351          * so last chunk will be partially full. Don't change this
352          * value until record size is changed.
353          */
354         CWARN("3a: write 1023 fixed-size llog records\n");
355         for (i = 0; i < 1023; i++) {
356                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
357                 if (rc < 0) {
358                         CERROR("3a: write 1023 records failed at #%d: %d\n",
359                                i + 1, rc);
360                         RETURN(rc);
361                 }
362                 num_recs++;
363         }
364
365         rc = verify_handle("3a", llh, num_recs);
366         if (rc)
367                 RETURN(rc);
368
369         /*
370          * Test fixed-size records processing:
371          * - search the needed index
372          * - go through all records from that index
373          * - check all indices are growing monotonically and exist
374          * - modify each record
375          *
376          * NB: test3_check_n_add adds two new records while processing
377          * after last record. There were 1023 records created so the last chunk
378          * misses exactly one record. Therefore one of new records will be
379          * the last in the current chunk and second causes the new chunk to be
380          * created.
381          */
382         rec_offset = 0;
383         start_idx = 501;
384         expected = 525;
385         rc = llog_test3_process(env, llh, test3_check_n_add_cb, start_idx);
386         if (rc < 0)
387                 RETURN(rc);
388
389         /* extra record is created during llog_process() */
390         if (rc != expected) {
391                 CERROR("3a: process total %d records but expect %d\n",
392                        rc, expected);
393                 RETURN(-ERANGE);
394         }
395
396         num_recs += 2;
397
398         /* test modification in place */
399         rc = llog_test3_process(env, llh, test3_check_cb, start_idx);
400         if (rc < 0)
401                 RETURN(rc);
402
403         if (rc != expected) {
404                 CERROR("3a: process total %d records but expect %d\n",
405                        rc, expected);
406                 RETURN(-ERANGE);
407         }
408
409         CWARN("3b: write 566 variable size llog records\n");
410
411         /* Drop llh_size to 0 to mark llog as variable-size and write
412          * header to make this change permanent. */
413         llh->lgh_hdr->llh_size = 0;
414         llog_write(env, llh, &llh->lgh_hdr->llh_hdr, LLOG_HEADER_IDX);
415
416         hdr->lrh_type = OBD_CFG_REC;
417
418         /* there are 1025 64-bytes records in llog already,
419          * the last chunk contains single record, i.e. 64 bytes.
420          * Each pair of variable size records is 200 bytes, so
421          * we will have the following distribution per chunks:
422          * block 1: 64 + 80(80/120) + 80 + 48(pad) = 81 iterations
423          * block 2: 80(120/80) + 120 + 72(pad) = 81 itereations
424          * block 3: 80(80/120) + 80 + 112(pad) = 81 iterations
425          * -- the same as block 2 again and so on.
426          * block 7: 80(80/120) = 80 iterations and 192 bytes remain
427          * Total 6 * 81 + 80 = 566 itereations.
428          * Callback will add another 120 bytes in the end of the last chunk
429          * and another 120 bytes will cause padding (72 bytes) plus 120
430          * bytes in the new block.
431          */
432         for (i = 0; i < 566; i++) {
433                 if ((i % 2) == 0)
434                         hdr->lrh_len = 80;
435                 else
436                         hdr->lrh_len = 120;
437
438                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
439                 if (rc < 0) {
440                         CERROR("3a: write 566 records failed at #%d: %d\n",
441                                i + 1, rc);
442                         RETURN(rc);
443                 }
444                 num_recs++;
445         }
446
447         rc = verify_handle("3b", llh, num_recs);
448         if (rc)
449                 RETURN(rc);
450
451         start_idx = 1026;
452         expected = 568;
453         rc = llog_test3_process(env, llh, test3_check_n_add_cb, start_idx);
454         if (rc < 0)
455                 RETURN(rc);
456
457         if (rc != expected) {
458                 CERROR("3b: process total %d records but expect %d\n",
459                        rc, expected);
460                 RETURN(-ERANGE);
461         }
462
463         num_recs += 2;
464
465         /* test modification in place */
466         rc = llog_test3_process(env, llh, test3_check_cb, start_idx);
467         if (rc < 0)
468                 RETURN(rc);
469
470         if (rc != expected) {
471                 CERROR("3b: process total %d records but expect %d\n",
472                        rc, expected);
473                 RETURN(-ERANGE);
474         }
475
476         CWARN("3c: write records with variable size until BITMAP_SIZE, "
477               "return -ENOSPC\n");
478         while (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
479                 if ((num_recs % 2) == 0)
480                         hdr->lrh_len = 80;
481                 else
482                         hdr->lrh_len = 128;
483
484                 rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
485                 if (rc == -ENOSPC) {
486                         break;
487                 } else if (rc < 0) {
488                         CERROR("3c: write recs failed at #%d: %d\n",
489                                num_recs, rc);
490                         RETURN(rc);
491                 }
492                 num_recs++;
493         }
494
495         if (rc != -ENOSPC) {
496                 CWARN("3c: write record more than BITMAP size!\n");
497                 RETURN(-EINVAL);
498         }
499         CWARN("3c: wrote %d more records before end of llog is reached\n",
500               num_recs);
501
502         rc = verify_handle("3d", llh, num_recs);
503
504         RETURN(rc);
505 }
506
507 /* Test catalogue additions */
508 static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
509 {
510         struct llog_handle      *cath;
511         char                     name[10];
512         int                      rc, rc2, i, buflen;
513         struct llog_mini_rec     lmr;
514         struct llog_cookie       cookie;
515         struct llog_ctxt        *ctxt;
516         int                      num_recs = 0;
517         char                    *buf;
518         struct llog_rec_hdr     *rec;
519
520         ENTRY;
521
522         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
523         LASSERT(ctxt);
524
525         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
526         lmr.lmr_hdr.lrh_type = 0xf00f00;
527
528         sprintf(name, "%x", llog_test_rand + 1);
529         CWARN("4a: create a catalog log with name: %s\n", name);
530         rc = llog_open_create(env, ctxt, &cath, NULL, name);
531         if (rc) {
532                 CERROR("4a: llog_create with name %s failed: %d\n", name, rc);
533                 GOTO(ctxt_release, rc);
534         }
535         rc = llog_init_handle(env, cath, LLOG_F_IS_CAT, &uuid);
536         if (rc) {
537                 CERROR("4a: can't init llog handle: %d\n", rc);
538                 GOTO(out, rc);
539         }
540
541         num_recs++;
542         cat_logid = cath->lgh_id;
543
544         CWARN("4b: write 1 record into the catalog\n");
545         rc = llog_cat_add(env, cath, &lmr.lmr_hdr, &cookie);
546         if (rc != 1) {
547                 CERROR("4b: write 1 catalog record failed at: %d\n", rc);
548                 GOTO(out, rc);
549         }
550         num_recs++;
551         rc = verify_handle("4b", cath, 2);
552         if (rc)
553                 GOTO(out, rc);
554
555         rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs);
556         if (rc)
557                 GOTO(out, rc);
558
559         CWARN("4c: cancel 1 log record\n");
560         rc = llog_cat_cancel_records(env, cath, 1, &cookie);
561         if (rc) {
562                 CERROR("4c: cancel 1 catalog based record failed: %d\n", rc);
563                 GOTO(out, rc);
564         }
565         num_recs--;
566
567         rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs);
568         if (rc)
569                 GOTO(out, rc);
570
571         CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM);
572         for (i = 0; i < LLOG_TEST_RECNUM; i++) {
573                 rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
574                 if (rc) {
575                         CERROR("4d: write %d records failed at #%d: %d\n",
576                                LLOG_TEST_RECNUM, i + 1, rc);
577                         GOTO(out, rc);
578                 }
579                 num_recs++;
580         }
581
582         /* make sure new plain llog appears */
583         rc = verify_handle("4d", cath, 3);
584         if (rc)
585                 GOTO(out, rc);
586
587         CWARN("4e: add 5 large records, one record per block\n");
588         buflen = LLOG_MIN_CHUNK_SIZE;
589         OBD_ALLOC(buf, buflen);
590         if (buf == NULL)
591                 GOTO(out, rc = -ENOMEM);
592         for (i = 0; i < 5; i++) {
593                 rec = (void *)buf;
594                 rec->lrh_len = buflen;
595                 rec->lrh_type = OBD_CFG_REC;
596                 rc = llog_cat_add(env, cath, rec, NULL);
597                 if (rc) {
598                         CERROR("4e: write 5 records failed at #%d: %d\n",
599                                i + 1, rc);
600                         GOTO(out_free, rc);
601                 }
602                 num_recs++;
603         }
604 out_free:
605         OBD_FREE(buf, buflen);
606 out:
607         CWARN("4f: put newly-created catalog\n");
608         rc2 = llog_cat_close(env, cath);
609         if (rc2) {
610                 CERROR("4: close log %s failed: %d\n", name, rc2);
611                 if (rc == 0)
612                         rc = rc2;
613         }
614 ctxt_release:
615         llog_ctxt_put(ctxt);
616         RETURN(rc);
617 }
618
619 static int cat_counter;
620
621 static int cat_print_cb(const struct lu_env *env, struct llog_handle *llh,
622                         struct llog_rec_hdr *rec, void *data)
623 {
624         struct llog_logid_rec   *lir = (struct llog_logid_rec *)rec;
625         struct lu_fid            fid = {0};
626
627         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
628                 CERROR("invalid record in catalog\n");
629                 RETURN(-EINVAL);
630         }
631
632         logid_to_fid(&lir->lid_id, &fid);
633
634         CWARN("seeing record at index %d - "DFID" in log "DFID"\n",
635               rec->lrh_index, PFID(&fid),
636               PFID(lu_object_fid(&llh->lgh_obj->do_lu)));
637
638         cat_counter++;
639
640         RETURN(0);
641 }
642
643 static int plain_counter;
644
645 static int plain_print_cb(const struct lu_env *env, struct llog_handle *llh,
646                           struct llog_rec_hdr *rec, void *data)
647 {
648         struct lu_fid fid = {0};
649
650         if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
651                 CERROR("log is not plain\n");
652                 RETURN(-EINVAL);
653         }
654
655         logid_to_fid(&llh->lgh_id, &fid);
656
657         CDEBUG(D_INFO, "seeing record at index %d in log "DFID"\n",
658                rec->lrh_index, PFID(&fid));
659
660         plain_counter++;
661
662         RETURN(0);
663 }
664
665 static int cancel_count;
666
667 static int llog_cancel_rec_cb(const struct lu_env *env,
668                               struct llog_handle *llh,
669                               struct llog_rec_hdr *rec, void *data)
670 {
671         struct llog_cookie cookie;
672
673         if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
674                 CERROR("log is not plain\n");
675                 RETURN(-EINVAL);
676         }
677
678         cookie.lgc_lgl = llh->lgh_id;
679         cookie.lgc_index = rec->lrh_index;
680
681         llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1, &cookie);
682         cancel_count++;
683         if (cancel_count == LLOG_TEST_RECNUM)
684                 RETURN(-LLOG_EEMPTY);
685         RETURN(0);
686 }
687
688 /* Test log and catalogue processing */
689 static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
690 {
691         struct llog_handle      *llh = NULL;
692         char                     name[10];
693         int                      rc, rc2;
694         struct llog_mini_rec     lmr;
695         struct llog_ctxt        *ctxt;
696
697         ENTRY;
698
699         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
700         LASSERT(ctxt);
701
702         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
703         lmr.lmr_hdr.lrh_type = 0xf00f00;
704
705         CWARN("5a: re-open catalog by id\n");
706         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
707         if (rc) {
708                 CERROR("5a: llog_create with logid failed: %d\n", rc);
709                 GOTO(out_put, rc);
710         }
711
712         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
713         if (rc) {
714                 CERROR("5a: can't init llog handle: %d\n", rc);
715                 GOTO(out, rc);
716         }
717
718         CWARN("5b: print the catalog entries.. we expect 2\n");
719         cat_counter = 0;
720         rc = llog_process(env, llh, cat_print_cb, "test 5", NULL);
721         if (rc) {
722                 CERROR("5b: process with cat_print_cb failed: %d\n", rc);
723                 GOTO(out, rc);
724         }
725         if (cat_counter != 2) {
726                 CERROR("5b: %d entries in catalog\n", cat_counter);
727                 GOTO(out, rc = -EINVAL);
728         }
729
730         CWARN("5c: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
731         cancel_count = 0;
732         rc = llog_cat_process(env, llh, llog_cancel_rec_cb, "foobar", 0, 0);
733         if (rc != -LLOG_EEMPTY) {
734                 CERROR("5c: process with llog_cancel_rec_cb failed: %d\n", rc);
735                 GOTO(out, rc);
736         }
737
738         CWARN("5c: print the catalog entries.. we expect 1\n");
739         cat_counter = 0;
740         rc = llog_process(env, llh, cat_print_cb, "test 5", NULL);
741         if (rc) {
742                 CERROR("5c: process with cat_print_cb failed: %d\n", rc);
743                 GOTO(out, rc);
744         }
745         if (cat_counter != 1) {
746                 CERROR("5c: %d entries in catalog\n", cat_counter);
747                 GOTO(out, rc = -EINVAL);
748         }
749
750         CWARN("5d: add 1 record to the log with many canceled empty pages\n");
751         rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
752         if (rc) {
753                 CERROR("5d: add record to the log with many canceled empty "
754                        "pages failed\n");
755                 GOTO(out, rc);
756         }
757
758         CWARN("5e: print plain log entries.. expect 6\n");
759         plain_counter = 0;
760         rc = llog_cat_process(env, llh, plain_print_cb, "foobar", 0, 0);
761         if (rc) {
762                 CERROR("5e: process with plain_print_cb failed: %d\n", rc);
763                 GOTO(out, rc);
764         }
765         if (plain_counter != 6) {
766                 CERROR("5e: found %d records\n", plain_counter);
767                 GOTO(out, rc = -EINVAL);
768         }
769
770         CWARN("5f: print plain log entries reversely.. expect 6\n");
771         plain_counter = 0;
772         rc = llog_cat_reverse_process(env, llh, plain_print_cb, "foobar");
773         if (rc) {
774                 CERROR("5f: reversely process with plain_print_cb failed: "
775                        "%d\n", rc);
776                 GOTO(out, rc);
777         }
778         if (plain_counter != 6) {
779                 CERROR("5f: found %d records\n", plain_counter);
780                 GOTO(out, rc = -EINVAL);
781         }
782
783 out:
784         CWARN("5g: close re-opened catalog\n");
785         rc2 = llog_cat_close(env, llh);
786         if (rc2) {
787                 CERROR("5g: close log %s failed: %d\n", name, rc2);
788                 if (rc == 0)
789                         rc = rc2;
790         }
791 out_put:
792         llog_ctxt_put(ctxt);
793
794         RETURN(rc);
795 }
796
797 /* Test client api; open log by name and process */
798 static int llog_test_6(const struct lu_env *env, struct obd_device *obd,
799                        char *name)
800 {
801         struct obd_device       *mgc_obd;
802         struct llog_ctxt        *ctxt;
803         struct obd_uuid         *mgs_uuid;
804         struct obd_export       *exp;
805         struct obd_uuid          uuid = { "LLOG_TEST6_UUID" };
806         struct llog_handle      *llh = NULL;
807         struct llog_ctxt        *nctxt;
808         int                      rc, rc2;
809
810         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
811         LASSERT(ctxt);
812         mgs_uuid = &ctxt->loc_exp->exp_obd->obd_uuid;
813
814         CWARN("6a: re-open log %s using client API\n", name);
815         mgc_obd = class_find_client_obd(mgs_uuid, LUSTRE_MGC_NAME, NULL);
816         if (mgc_obd == NULL) {
817                 CERROR("6a: no MGC devices connected to %s found.\n",
818                        mgs_uuid->uuid);
819                 GOTO(ctxt_release, rc = -ENOENT);
820         }
821
822         rc = obd_connect(NULL, &exp, mgc_obd, &uuid,
823                          NULL /* obd_connect_data */, NULL);
824         if (rc != -EALREADY) {
825                 CERROR("6a: connect on connected MGC (%s) failed to return"
826                        " -EALREADY\n", mgc_obd->obd_name);
827                 if (rc == 0)
828                         obd_disconnect(exp);
829                 GOTO(ctxt_release, rc = -EINVAL);
830         }
831
832         nctxt = llog_get_context(mgc_obd, LLOG_CONFIG_REPL_CTXT);
833         rc = llog_open(env, nctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
834         if (rc) {
835                 CERROR("6a: llog_open failed %d\n", rc);
836                 GOTO(nctxt_put, rc);
837         }
838
839         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
840         if (rc) {
841                 CERROR("6a: llog_init_handle failed %d\n", rc);
842                 GOTO(parse_out, rc);
843         }
844
845         plain_counter = 1; /* llog header is first record */
846         CWARN("6b: process log %s using client API\n", name);
847         rc = llog_process(env, llh, plain_print_cb, NULL, NULL);
848         if (rc)
849                 CERROR("6b: llog_process failed %d\n", rc);
850         CWARN("6b: processed %d records\n", plain_counter);
851
852         rc = verify_handle("6b", llh, plain_counter);
853         if (rc)
854                 GOTO(parse_out, rc);
855
856         plain_counter = 1; /* llog header is first record */
857         CWARN("6c: process log %s reversely using client API\n", name);
858         rc = llog_reverse_process(env, llh, plain_print_cb, NULL, NULL);
859         if (rc)
860                 CERROR("6c: llog_reverse_process failed %d\n", rc);
861         CWARN("6c: processed %d records\n", plain_counter);
862
863         rc = verify_handle("6c", llh, plain_counter);
864         if (rc)
865                 GOTO(parse_out, rc);
866
867 parse_out:
868         rc2 = llog_close(env, llh);
869         if (rc2) {
870                 CERROR("6: llog_close failed: rc = %d\n", rc2);
871                 if (rc == 0)
872                         rc = rc2;
873         }
874 nctxt_put:
875         llog_ctxt_put(nctxt);
876 ctxt_release:
877         llog_ctxt_put(ctxt);
878         RETURN(rc);
879 }
880
881 static union {
882         struct llog_rec_hdr             lrh;   /* common header */
883         struct llog_logid_rec           llr;   /* LLOG_LOGID_MAGIC */
884         struct llog_unlink64_rec        lur;   /* MDS_UNLINK64_REC */
885         struct llog_setattr64_rec       lsr64; /* MDS_SETATTR64_REC */
886         struct llog_size_change_rec     lscr;  /* OST_SZ_REC */
887         struct llog_changelog_rec       lcr;   /* CHANGELOG_REC */
888         struct llog_changelog_user_rec  lcur;  /* CHANGELOG_USER_REC */
889         struct llog_gen_rec             lgr;   /* LLOG_GEN_REC */
890 } llog_records;
891
892 static int test_7_print_cb(const struct lu_env *env, struct llog_handle *llh,
893                            struct llog_rec_hdr *rec, void *data)
894 {
895         struct lu_fid fid = {0};
896
897         logid_to_fid(&llh->lgh_id, &fid);
898
899         CDEBUG(D_OTHER, "record type %#x at index %d in log "DFID"\n",
900                rec->lrh_type, rec->lrh_index, PFID(&fid));
901
902         plain_counter++;
903         return 0;
904 }
905
906 static int test_7_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
907                             struct llog_rec_hdr *rec, void *data)
908 {
909         plain_counter++;
910         /* test LLOG_DEL_RECORD is working */
911         return LLOG_DEL_RECORD;
912 }
913
914 static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
915 {
916         struct llog_handle      *llh;
917         int                      rc = 0, i, process_count;
918         int                      num_recs = 0;
919
920         ENTRY;
921
922         rc = llog_open_create(env, ctxt, &llh, NULL, NULL);
923         if (rc) {
924                 CERROR("7_sub: create log failed\n");
925                 RETURN(rc);
926         }
927
928         rc = llog_init_handle(env, llh,
929                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
930                               &uuid);
931         if (rc) {
932                 CERROR("7_sub: can't init llog handle: %d\n", rc);
933                 GOTO(out_close, rc);
934         }
935         for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
936                 rc = llog_write(env, llh, &llog_records.lrh, LLOG_NEXT_IDX);
937                 if (rc == -ENOSPC) {
938                         break;
939                 } else if (rc < 0) {
940                         CERROR("7_sub: write recs failed at #%d: %d\n",
941                                i + 1, rc);
942                         GOTO(out_close, rc);
943                 }
944                 num_recs++;
945         }
946         if (rc != -ENOSPC) {
947                 CWARN("7_sub: write record more than BITMAP size!\n");
948                 GOTO(out_close, rc = -EINVAL);
949         }
950
951         rc = verify_handle("7_sub", llh, num_recs + 1);
952         if (rc) {
953                 CERROR("7_sub: verify handle failed: %d\n", rc);
954                 GOTO(out_close, rc);
955         }
956         if (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1)
957                 CWARN("7_sub: records are not aligned, written %d from %u\n",
958                       num_recs, LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
959
960         plain_counter = 0;
961         rc = llog_process(env, llh, test_7_print_cb, "test 7", NULL);
962         if (rc) {
963                 CERROR("7_sub: llog process failed: %d\n", rc);
964                 GOTO(out_close, rc);
965         }
966         process_count = plain_counter;
967         if (process_count != num_recs) {
968                 CERROR("7_sub: processed %d records from %d total\n",
969                        process_count, num_recs);
970                 GOTO(out_close, rc = -EINVAL);
971         }
972
973         plain_counter = 0;
974         rc = llog_reverse_process(env, llh, test_7_cancel_cb, "test 7", NULL);
975         if (rc && rc != LLOG_DEL_PLAIN) {
976                 CERROR("7_sub: reverse llog process failed: %d\n", rc);
977                 GOTO(out_close, rc);
978         }
979         if (process_count != plain_counter) {
980                 CERROR("7_sub: Reverse/direct processing found different"
981                        "number of records: %d/%d\n",
982                        plain_counter, process_count);
983                 GOTO(out_close, rc = -EINVAL);
984         }
985         if (llog_exist(llh)) {
986                 CERROR("7_sub: llog exists but should be zapped\n");
987                 GOTO(out_close, rc = -EEXIST);
988         }
989
990         rc = verify_handle("7_sub", llh, 1);
991 out_close:
992         if (rc)
993                 llog_destroy(env, llh);
994         llog_close(env, llh);
995         RETURN(rc);
996 }
997
998 /* Test all llog records writing and processing */
999 static int llog_test_7(const struct lu_env *env, struct obd_device *obd)
1000 {
1001         struct llog_ctxt        *ctxt;
1002         int                      rc;
1003
1004         ENTRY;
1005
1006         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1007
1008         CWARN("7a: test llog_logid_rec\n");
1009         llog_records.llr.lid_hdr.lrh_len = sizeof(llog_records.llr);
1010         llog_records.llr.lid_tail.lrt_len = sizeof(llog_records.llr);
1011         llog_records.llr.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
1012
1013         rc = llog_test_7_sub(env, ctxt);
1014         if (rc) {
1015                 CERROR("7a: llog_logid_rec test failed\n");
1016                 GOTO(out, rc);
1017         }
1018
1019         CWARN("7b: test llog_unlink64_rec\n");
1020         llog_records.lur.lur_hdr.lrh_len = sizeof(llog_records.lur);
1021         llog_records.lur.lur_tail.lrt_len = sizeof(llog_records.lur);
1022         llog_records.lur.lur_hdr.lrh_type = MDS_UNLINK64_REC;
1023
1024         rc = llog_test_7_sub(env, ctxt);
1025         if (rc) {
1026                 CERROR("7b: llog_unlink_rec test failed\n");
1027                 GOTO(out, rc);
1028         }
1029
1030         CWARN("7c: test llog_setattr64_rec\n");
1031         llog_records.lsr64.lsr_hdr.lrh_len = sizeof(llog_records.lsr64);
1032         llog_records.lsr64.lsr_tail.lrt_len = sizeof(llog_records.lsr64);
1033         llog_records.lsr64.lsr_hdr.lrh_type = MDS_SETATTR64_REC;
1034
1035         rc = llog_test_7_sub(env, ctxt);
1036         if (rc) {
1037                 CERROR("7c: llog_setattr64_rec test failed\n");
1038                 GOTO(out, rc);
1039         }
1040
1041         CWARN("7d: test llog_size_change_rec\n");
1042         llog_records.lscr.lsc_hdr.lrh_len = sizeof(llog_records.lscr);
1043         llog_records.lscr.lsc_tail.lrt_len = sizeof(llog_records.lscr);
1044         llog_records.lscr.lsc_hdr.lrh_type = OST_SZ_REC;
1045
1046         rc = llog_test_7_sub(env, ctxt);
1047         if (rc) {
1048                 CERROR("7d: llog_size_change_rec test failed\n");
1049                 GOTO(out, rc);
1050         }
1051
1052         CWARN("7e: test llog_changelog_rec\n");
1053         /* Direct access to cr_do_not_use: peculiar case for this test */
1054         llog_records.lcr.cr_hdr.lrh_len = sizeof(llog_records.lcr);
1055         llog_records.lcr.cr_do_not_use.lrt_len = sizeof(llog_records.lcr);
1056         llog_records.lcr.cr_hdr.lrh_type = CHANGELOG_REC;
1057
1058         rc = llog_test_7_sub(env, ctxt);
1059         if (rc) {
1060                 CERROR("7e: llog_changelog_rec test failed\n");
1061                 GOTO(out, rc);
1062         }
1063
1064         CWARN("7f: test llog_changelog_user_rec\n");
1065         llog_records.lcur.cur_hdr.lrh_len = sizeof(llog_records.lcur);
1066         llog_records.lcur.cur_tail.lrt_len = sizeof(llog_records.lcur);
1067         llog_records.lcur.cur_hdr.lrh_type = CHANGELOG_USER_REC;
1068
1069         rc = llog_test_7_sub(env, ctxt);
1070         if (rc) {
1071                 CERROR("7f: llog_changelog_user_rec test failed\n");
1072                 GOTO(out, rc);
1073         }
1074
1075         CWARN("7g: test llog_gen_rec\n");
1076         llog_records.lgr.lgr_hdr.lrh_len = sizeof(llog_records.lgr);
1077         llog_records.lgr.lgr_tail.lrt_len = sizeof(llog_records.lgr);
1078         llog_records.lgr.lgr_hdr.lrh_type = LLOG_GEN_REC;
1079
1080         rc = llog_test_7_sub(env, ctxt);
1081         if (rc) {
1082                 CERROR("7g: llog_size_change_rec test failed\n");
1083                 GOTO(out, rc);
1084         }
1085 out:
1086         llog_ctxt_put(ctxt);
1087         RETURN(rc);
1088 }
1089
1090 static int llog_truncate(const struct lu_env *env, struct dt_object *o)
1091 {
1092         struct lu_attr           la;
1093         struct thandle          *th;
1094         struct dt_device        *d;
1095         int                      rc;
1096         ENTRY;
1097
1098         LASSERT(o);
1099         d = lu2dt_dev(o->do_lu.lo_dev);
1100         LASSERT(d);
1101
1102         rc = dt_attr_get(env, o, &la);
1103         if (rc)
1104                 RETURN(rc);
1105
1106         CDEBUG(D_OTHER, "original size "LPU64"\n", la.la_size);
1107         rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec);
1108         if (la.la_size < rc) {
1109                 CERROR("too small llog: "LPU64"\n", la.la_size);
1110                 RETURN(0);
1111         }
1112
1113         /* drop 2 records */
1114         la.la_size = la.la_size - (sizeof(struct llog_mini_rec) * 2);
1115         la.la_valid = LA_SIZE;
1116
1117         th = dt_trans_create(env, d);
1118         if (IS_ERR(th))
1119                 RETURN(PTR_ERR(th));
1120
1121         rc = dt_declare_attr_set(env, o, &la, th);
1122         if (rc)
1123                 GOTO(stop, rc);
1124
1125         rc = dt_declare_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
1126
1127         rc = dt_trans_start_local(env, d, th);
1128         if (rc)
1129                 GOTO(stop, rc);
1130
1131         rc = dt_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
1132         if (rc)
1133                 GOTO(stop, rc);
1134
1135         rc = dt_attr_set(env, o, &la, th);
1136         if (rc)
1137                 GOTO(stop, rc);
1138
1139 stop:
1140         dt_trans_stop(env, d, th);
1141
1142         RETURN(rc);
1143 }
1144
1145 static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
1146                           struct llog_rec_hdr *rec, void *data)
1147 {
1148         plain_counter++;
1149         return 0;
1150 }
1151
1152 static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
1153 {
1154         struct llog_handle      *llh = NULL;
1155         char                     name[10];
1156         int                      rc, rc2, i;
1157         int                      orig_counter;
1158         struct llog_mini_rec     lmr;
1159         struct llog_ctxt        *ctxt;
1160         struct dt_object        *obj = NULL;
1161
1162         ENTRY;
1163
1164         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1165         LASSERT(ctxt);
1166
1167         lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
1168         lmr.lmr_hdr.lrh_type = 0xf00f00;
1169
1170         CWARN("8a: fill the first plain llog\n");
1171         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1172         if (rc) {
1173                 CERROR("8a: llog_create with logid failed: %d\n", rc);
1174                 GOTO(out_put, rc);
1175         }
1176
1177         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1178         if (rc) {
1179                 CERROR("8a: can't init llog handle: %d\n", rc);
1180                 GOTO(out, rc);
1181         }
1182
1183         plain_counter = 0;
1184         rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0);
1185         if (rc != 0) {
1186                 CERROR("5a: process with test_8_cb failed: %d\n", rc);
1187                 GOTO(out, rc);
1188         }
1189         orig_counter = plain_counter;
1190
1191         for (i = 0; i < 100; i++) {
1192                 rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
1193                 if (rc) {
1194                         CERROR("5a: add record failed\n");
1195                         GOTO(out, rc);
1196                 }
1197         }
1198
1199         /* grab the current plain llog, we'll corrupt it later */
1200         obj = llh->u.chd.chd_current_log->lgh_obj;
1201         LASSERT(obj);
1202         lu_object_get(&obj->do_lu);
1203         CWARN("8a: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu)));
1204
1205         rc2 = llog_cat_close(env, llh);
1206         if (rc2) {
1207                 CERROR("8a: close log %s failed: %d\n", name, rc2);
1208                 if (rc == 0)
1209                         rc = rc2;
1210                 GOTO(out_put, rc);
1211         }
1212
1213         CWARN("8b: fill the second plain llog\n");
1214         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1215         if (rc) {
1216                 CERROR("8b: llog_create with logid failed: %d\n", rc);
1217                 GOTO(out_put, rc);
1218         }
1219
1220         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1221         if (rc) {
1222                 CERROR("8b: can't init llog handle: %d\n", rc);
1223                 GOTO(out, rc);
1224         }
1225
1226         for (i = 0; i < 100; i++) {
1227                 rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
1228                 if (rc) {
1229                         CERROR("8b: add record failed\n");
1230                         GOTO(out, rc);
1231                 }
1232         }
1233         CWARN("8b: second llog "DFID"\n",
1234                 PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
1235
1236         rc2 = llog_cat_close(env, llh);
1237         if (rc2) {
1238                 CERROR("8b: close log %s failed: %d\n", name, rc2);
1239                 if (rc == 0)
1240                         rc = rc2;
1241                 GOTO(out_put, rc);
1242         }
1243
1244         CWARN("8c: drop two records from the first plain llog\n");
1245         llog_truncate(env, obj);
1246
1247         CWARN("8d: count survived records\n");
1248         rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
1249         if (rc) {
1250                 CERROR("8d: llog_create with logid failed: %d\n", rc);
1251                 GOTO(out_put, rc);
1252         }
1253
1254         rc = llog_init_handle(env, llh, LLOG_F_IS_CAT, &uuid);
1255         if (rc) {
1256                 CERROR("8d: can't init llog handle: %d\n", rc);
1257                 GOTO(out, rc);
1258         }
1259
1260         plain_counter = 0;
1261         rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0);
1262         if (rc != 0) {
1263                 CERROR("8d: process with test_8_cb failed: %d\n", rc);
1264                 GOTO(out, rc);
1265         }
1266
1267         if (orig_counter + 200 - 2 != plain_counter) {
1268                 CERROR("found %d records (expected %d)\n", plain_counter,
1269                        orig_counter + 200 - 2);
1270                 rc = -EIO;
1271         }
1272
1273 out:
1274         CWARN("8d: close re-opened catalog\n");
1275         rc2 = llog_cat_close(env, llh);
1276         if (rc2) {
1277                 CERROR("8d: close log %s failed: %d\n", name, rc2);
1278                 if (rc == 0)
1279                         rc = rc2;
1280         }
1281 out_put:
1282         llog_ctxt_put(ctxt);
1283
1284         if (obj != NULL)
1285                 lu_object_put(env, &obj->do_lu);
1286
1287         RETURN(rc);
1288 }
1289
1290 /* -------------------------------------------------------------------------
1291  * Tests above, boring obd functions below
1292  * ------------------------------------------------------------------------- */
1293 static int llog_run_tests(const struct lu_env *env, struct obd_device *obd)
1294 {
1295         struct llog_handle      *llh = NULL;
1296         struct llog_ctxt        *ctxt;
1297         int                      rc, err;
1298         char                     name[10];
1299
1300         ENTRY;
1301         ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
1302         LASSERT(ctxt);
1303
1304         sprintf(name, "%x", llog_test_rand);
1305
1306         rc = llog_test_1(env, obd, name);
1307         if (rc)
1308                 GOTO(cleanup_ctxt, rc);
1309
1310         rc = llog_test_2(env, obd, name, &llh);
1311         if (rc)
1312                 GOTO(cleanup_ctxt, rc);
1313
1314         rc = llog_test_3(env, obd, llh);
1315         if (rc)
1316                 GOTO(cleanup, rc);
1317
1318         rc = llog_test_4(env, obd);
1319         if (rc)
1320                 GOTO(cleanup, rc);
1321
1322         rc = llog_test_5(env, obd);
1323         if (rc)
1324                 GOTO(cleanup, rc);
1325
1326         rc = llog_test_6(env, obd, name);
1327         if (rc)
1328                 GOTO(cleanup, rc);
1329
1330         rc = llog_test_7(env, obd);
1331         if (rc)
1332                 GOTO(cleanup, rc);
1333
1334         rc = llog_test_8(env, obd);
1335         if (rc)
1336                 GOTO(cleanup, rc);
1337
1338 cleanup:
1339         err = llog_destroy(env, llh);
1340         if (err)
1341                 CERROR("cleanup: llog_destroy failed: %d\n", err);
1342         llog_close(env, llh);
1343         if (rc == 0)
1344                 rc = err;
1345 cleanup_ctxt:
1346         llog_ctxt_put(ctxt);
1347         return rc;
1348 }
1349
1350 static int llog_test_cleanup(struct obd_device *obd)
1351 {
1352         struct obd_device       *tgt;
1353         struct lu_env            env;
1354         int                      rc;
1355
1356         ENTRY;
1357
1358         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
1359         if (rc)
1360                 RETURN(rc);
1361
1362         tgt = obd->obd_lvfs_ctxt.dt->dd_lu_dev.ld_obd;
1363         rc = llog_cleanup(&env, llog_get_context(tgt, LLOG_TEST_ORIG_CTXT));
1364         if (rc)
1365                 CERROR("failed to llog_test_llog_finish: %d\n", rc);
1366         lu_env_fini(&env);
1367         RETURN(rc);
1368 }
1369
1370 static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1371 {
1372         struct obd_device       *tgt;
1373         struct llog_ctxt        *ctxt;
1374         struct dt_object        *o;
1375         struct lu_env            env;
1376         struct lu_context        test_session;
1377         int                      rc;
1378
1379         ENTRY;
1380
1381         if (lcfg->lcfg_bufcount < 2) {
1382                 CERROR("requires a TARGET OBD name\n");
1383                 RETURN(-EINVAL);
1384         }
1385
1386         if (lcfg->lcfg_buflens[1] < 1) {
1387                 CERROR("requires a TARGET OBD name\n");
1388                 RETURN(-EINVAL);
1389         }
1390
1391         /* disk obd */
1392         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1393         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1394                 CERROR("target device not attached or not set up (%s)\n",
1395                        lustre_cfg_string(lcfg, 1));
1396                 RETURN(-EINVAL);
1397         }
1398
1399         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
1400         if (rc)
1401                 RETURN(rc);
1402
1403         rc = lu_context_init(&test_session, LCT_SERVER_SESSION);
1404         if (rc)
1405                 GOTO(cleanup_env, rc);
1406         test_session.lc_thread = (struct ptlrpc_thread *)current;
1407         lu_context_enter(&test_session);
1408         env.le_ses = &test_session;
1409
1410         CWARN("Setup llog-test device over %s device\n",
1411               lustre_cfg_string(lcfg, 1));
1412
1413         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1414         obd->obd_lvfs_ctxt.dt = lu2dt_dev(tgt->obd_lu_dev);
1415
1416         rc = llog_setup(&env, tgt, &tgt->obd_olg, LLOG_TEST_ORIG_CTXT, tgt,
1417                         &llog_osd_ops);
1418         if (rc)
1419                 GOTO(cleanup_session, rc);
1420
1421         /* use MGS llog dir for tests */
1422         ctxt = llog_get_context(tgt, LLOG_CONFIG_ORIG_CTXT);
1423         LASSERT(ctxt);
1424         o = ctxt->loc_dir;
1425         llog_ctxt_put(ctxt);
1426
1427         ctxt = llog_get_context(tgt, LLOG_TEST_ORIG_CTXT);
1428         LASSERT(ctxt);
1429         ctxt->loc_dir = o;
1430         llog_ctxt_put(ctxt);
1431
1432         llog_test_rand = cfs_rand();
1433
1434         rc = llog_run_tests(&env, tgt);
1435         if (rc)
1436                 llog_test_cleanup(obd);
1437 cleanup_session:
1438         lu_context_exit(&test_session);
1439         lu_context_fini(&test_session);
1440 cleanup_env:
1441         lu_env_fini(&env);
1442         RETURN(rc);
1443 }
1444
1445 static struct obd_ops llog_obd_ops = {
1446         .o_owner       = THIS_MODULE,
1447         .o_setup       = llog_test_setup,
1448         .o_cleanup     = llog_test_cleanup,
1449 };
1450
1451 static int __init llog_test_init(void)
1452 {
1453         return class_register_type(&llog_obd_ops, NULL, true, NULL,
1454                                    "llog_test", NULL);
1455 }
1456
1457 static void __exit llog_test_exit(void)
1458 {
1459         class_unregister_type("llog_test");
1460 }
1461
1462 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1463 MODULE_DESCRIPTION("llog test module");
1464 MODULE_LICENSE("GPL");
1465
1466 module_init(llog_test_init);
1467 module_exit(llog_test_exit);