Whamcloud - gitweb
e55817805cee0ef5dd06db303a2ba217a96abd01
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45  * Author: Mikhail Pershin <tappro@whamcloud.com>
46  */
47
48 #define DEBUG_SUBSYSTEM S_LOG
49
50 #include <linux/kthread.h>
51 #include <obd_class.h>
52 #include <lustre_log.h>
53 #include "llog_internal.h"
54
55 /*
56  * Allocate a new log or catalog handle
57  * Used inside llog_open().
58  */
59 static struct llog_handle *llog_alloc_handle(void)
60 {
61         struct llog_handle *loghandle;
62
63         OBD_ALLOC_PTR(loghandle);
64         if (loghandle == NULL)
65                 return NULL;
66
67         init_rwsem(&loghandle->lgh_lock);
68         mutex_init(&loghandle->lgh_hdr_mutex);
69         INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
70         atomic_set(&loghandle->lgh_refcount, 1);
71
72         return loghandle;
73 }
74
75 /*
76  * Free llog handle and header data if exists. Used in llog_close() only
77  */
78 static void llog_free_handle(struct llog_handle *loghandle)
79 {
80         LASSERT(loghandle != NULL);
81
82         /* failed llog_init_handle */
83         if (loghandle->lgh_hdr == NULL)
84                 goto out;
85
86         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
87                 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
88         else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
89                 LASSERT(list_empty(&loghandle->u.chd.chd_head));
90         OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
91 out:
92         OBD_FREE_PTR(loghandle);
93 }
94
95 void llog_handle_get(struct llog_handle *loghandle)
96 {
97         atomic_inc(&loghandle->lgh_refcount);
98 }
99
100 void llog_handle_put(struct llog_handle *loghandle)
101 {
102         LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
103         if (atomic_dec_and_test(&loghandle->lgh_refcount))
104                 llog_free_handle(loghandle);
105 }
106
107 static int llog_declare_destroy(const struct lu_env *env,
108                                 struct llog_handle *handle,
109                                 struct thandle *th)
110 {
111         struct llog_operations *lop;
112         int rc;
113
114         ENTRY;
115
116         rc = llog_handle2ops(handle, &lop);
117         if (rc)
118                 RETURN(rc);
119         if (lop->lop_declare_destroy == NULL)
120                 RETURN(-EOPNOTSUPP);
121
122         rc = lop->lop_declare_destroy(env, handle, th);
123
124         RETURN(rc);
125 }
126
127 int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
128                        struct thandle *th)
129 {
130         struct llog_operations  *lop;
131         int rc;
132         ENTRY;
133
134         rc = llog_handle2ops(handle, &lop);
135         if (rc < 0)
136                 RETURN(rc);
137         if (lop->lop_destroy == NULL)
138                 RETURN(-EOPNOTSUPP);
139
140         LASSERT(handle->lgh_obj != NULL);
141         if (!dt_object_exists(handle->lgh_obj))
142                 RETURN(0);
143
144         rc = lop->lop_destroy(env, handle, th);
145
146         RETURN(rc);
147 }
148
149 int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
150 {
151         struct llog_operations  *lop;
152         struct dt_device        *dt;
153         struct thandle          *th;
154         int rc;
155
156         ENTRY;
157
158         rc = llog_handle2ops(handle, &lop);
159         if (rc < 0)
160                 RETURN(rc);
161         if (lop->lop_destroy == NULL)
162                 RETURN(-EOPNOTSUPP);
163
164         if (handle->lgh_obj == NULL) {
165                 /* if lgh_obj == NULL, then it is from client side destroy */
166                 rc = lop->lop_destroy(env, handle, NULL);
167                 RETURN(rc);
168         }
169
170         if (!dt_object_exists(handle->lgh_obj))
171                 RETURN(0);
172
173         dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
174
175         th = dt_trans_create(env, dt);
176         if (IS_ERR(th))
177                 RETURN(PTR_ERR(th));
178
179         rc = llog_declare_destroy(env, handle, th);
180         if (rc != 0)
181                 GOTO(out_trans, rc);
182
183         rc = dt_trans_start_local(env, dt, th);
184         if (rc < 0)
185                 GOTO(out_trans, rc);
186
187         rc = lop->lop_destroy(env, handle, th);
188
189 out_trans:
190         dt_trans_stop(env, dt, th);
191
192         RETURN(rc);
193 }
194 EXPORT_SYMBOL(llog_destroy);
195
196 /* returns negative on error; 0 if success; 1 if success & log destroyed */
197 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
198                     int index)
199 {
200         struct llog_thread_info *lgi = llog_info(env);
201         struct dt_device        *dt;
202         struct llog_log_hdr     *llh = loghandle->lgh_hdr;
203         struct thandle          *th;
204         int                      rc;
205         int rc1;
206         bool subtract_count = false;
207
208         ENTRY;
209
210         CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n", index,
211                POSTID(&loghandle->lgh_id.lgl_oi));
212
213         if (index == 0) {
214                 CERROR("Can't cancel index 0 which is header\n");
215                 RETURN(-EINVAL);
216         }
217
218         LASSERT(loghandle != NULL);
219         LASSERT(loghandle->lgh_ctxt != NULL);
220         LASSERT(loghandle->lgh_obj != NULL);
221
222         dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
223
224         th = dt_trans_create(env, dt);
225         if (IS_ERR(th))
226                 RETURN(PTR_ERR(th));
227
228         rc = llog_declare_write_rec(env, loghandle, &llh->llh_hdr, index, th);
229         if (rc < 0)
230                 GOTO(out_trans, rc);
231
232         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY))
233                 rc = llog_declare_destroy(env, loghandle, th);
234
235         th->th_wait_submit = 1;
236         rc = dt_trans_start_local(env, dt, th);
237         if (rc < 0)
238                 GOTO(out_trans, rc);
239
240         down_write(&loghandle->lgh_lock);
241         /* clear bitmap */
242         mutex_lock(&loghandle->lgh_hdr_mutex);
243         if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
244                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
245                 GOTO(out_unlock, rc);
246         }
247
248         loghandle->lgh_hdr->llh_count--;
249         subtract_count = true;
250         /* Pass this index to llog_osd_write_rec(), which will use the index
251          * to only update the necesary bitmap. */
252         lgi->lgi_cookie.lgc_index = index;
253         /* update header */
254         rc = llog_write_rec(env, loghandle, &llh->llh_hdr, &lgi->lgi_cookie,
255                             LLOG_HEADER_IDX, th);
256         if (rc != 0)
257                 GOTO(out_unlock, rc);
258
259         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
260             (llh->llh_count == 1) &&
261             (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
262                 rc = llog_trans_destroy(env, loghandle, th);
263                 if (rc < 0) {
264                         /* Sigh, can not destroy the final plain llog, but
265                          * the bitmap has been clearly, so the record can not
266                          * be accessed anymore, let's return 0 for now, and
267                          * the orphan will be handled by LFSCK. */
268                         CERROR("%s: can't destroy empty llog #"DOSTID
269                                "#%08x: rc = %d\n",
270                                loghandle->lgh_ctxt->loc_obd->obd_name,
271                                POSTID(&loghandle->lgh_id.lgl_oi),
272                                loghandle->lgh_id.lgl_ogen, rc);
273                         GOTO(out_unlock, rc);
274                 }
275                 rc = LLOG_DEL_PLAIN;
276         }
277
278 out_unlock:
279         mutex_unlock(&loghandle->lgh_hdr_mutex);
280         up_write(&loghandle->lgh_lock);
281 out_trans:
282         rc1 = dt_trans_stop(env, dt, th);
283         if (rc == 0)
284                 rc = rc1;
285         if (rc < 0 && subtract_count) {
286                 mutex_lock(&loghandle->lgh_hdr_mutex);
287                 loghandle->lgh_hdr->llh_count++;
288                 ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
289                 mutex_unlock(&loghandle->lgh_hdr_mutex);
290         }
291         RETURN(rc);
292 }
293
294 static int llog_read_header(const struct lu_env *env,
295                             struct llog_handle *handle,
296                             struct obd_uuid *uuid)
297 {
298         struct llog_operations *lop;
299         int rc;
300
301         rc = llog_handle2ops(handle, &lop);
302         if (rc)
303                 RETURN(rc);
304
305         if (lop->lop_read_header == NULL)
306                 RETURN(-EOPNOTSUPP);
307
308         rc = lop->lop_read_header(env, handle);
309         if (rc == LLOG_EEMPTY) {
310                 struct llog_log_hdr *llh = handle->lgh_hdr;
311
312                 /* lrh_len should be initialized in llog_init_handle */
313                 handle->lgh_last_idx = 0; /* header is record with index 0 */
314                 llh->llh_count = 1;         /* for the header record */
315                 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
316                 LASSERT(handle->lgh_ctxt->loc_chunk_size >=
317                                                 LLOG_MIN_CHUNK_SIZE);
318                 llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size;
319                 llh->llh_hdr.lrh_index = 0;
320                 llh->llh_timestamp = cfs_time_current_sec();
321                 if (uuid)
322                         memcpy(&llh->llh_tgtuuid, uuid,
323                                sizeof(llh->llh_tgtuuid));
324                 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
325                 ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
326                 LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
327                 LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
328                 rc = 0;
329         }
330         return rc;
331 }
332
333 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
334                      int flags, struct obd_uuid *uuid)
335 {
336         struct llog_log_hdr     *llh;
337         enum llog_flag           fmt = flags & LLOG_F_EXT_MASK;
338         int                      rc;
339         int                     chunk_size = handle->lgh_ctxt->loc_chunk_size;
340         ENTRY;
341
342         LASSERT(handle->lgh_hdr == NULL);
343
344         LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE);
345         OBD_ALLOC_LARGE(llh, chunk_size);
346         if (llh == NULL)
347                 RETURN(-ENOMEM);
348
349         handle->lgh_hdr = llh;
350         handle->lgh_hdr_size = chunk_size;
351         /* first assign flags to use llog_client_ops */
352         llh->llh_flags = flags;
353         rc = llog_read_header(env, handle, uuid);
354         if (rc == 0) {
355                 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
356                               flags & LLOG_F_IS_CAT) ||
357                              (llh->llh_flags & LLOG_F_IS_CAT &&
358                               flags & LLOG_F_IS_PLAIN))) {
359                         CERROR("%s: llog type is %s but initializing %s\n",
360                                handle->lgh_ctxt->loc_obd->obd_name,
361                                llh->llh_flags & LLOG_F_IS_CAT ?
362                                "catalog" : "plain",
363                                flags & LLOG_F_IS_CAT ? "catalog" : "plain");
364                         GOTO(out, rc = -EINVAL);
365                 } else if (llh->llh_flags &
366                            (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
367                         /*
368                          * it is possible to open llog without specifying llog
369                          * type so it is taken from llh_flags
370                          */
371                         flags = llh->llh_flags;
372                 } else {
373                         /* for some reason the llh_flags has no type set */
374                         CERROR("llog type is not specified!\n");
375                         GOTO(out, rc = -EINVAL);
376                 }
377                 if (unlikely(uuid &&
378                              !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
379                         CERROR("%s: llog uuid mismatch: %s/%s\n",
380                                handle->lgh_ctxt->loc_obd->obd_name,
381                                (char *)uuid->uuid,
382                                (char *)llh->llh_tgtuuid.uuid);
383                         GOTO(out, rc = -EEXIST);
384                 }
385         }
386         if (flags & LLOG_F_IS_CAT) {
387                 LASSERT(list_empty(&handle->u.chd.chd_head));
388                 INIT_LIST_HEAD(&handle->u.chd.chd_head);
389                 llh->llh_size = sizeof(struct llog_logid_rec);
390                 llh->llh_flags |= LLOG_F_IS_FIXSIZE;
391         } else if (!(flags & LLOG_F_IS_PLAIN)) {
392                 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
393                        handle->lgh_ctxt->loc_obd->obd_name,
394                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
395                 rc = -EINVAL;
396         }
397         llh->llh_flags |= fmt;
398 out:
399         if (rc) {
400                 OBD_FREE_LARGE(llh, chunk_size);
401                 handle->lgh_hdr = NULL;
402         }
403         RETURN(rc);
404 }
405 EXPORT_SYMBOL(llog_init_handle);
406
407 static int llog_process_thread(void *arg)
408 {
409         struct llog_process_info        *lpi = arg;
410         struct llog_handle              *loghandle = lpi->lpi_loghandle;
411         struct llog_log_hdr             *llh = loghandle->lgh_hdr;
412         struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
413         char                            *buf;
414         size_t                           chunk_size;
415         __u64                            cur_offset;
416         int                              rc = 0, index = 1, last_index;
417         int                              saved_index = 0;
418         int                              last_called_index = 0;
419
420         ENTRY;
421
422         if (llh == NULL)
423                 RETURN(-EINVAL);
424
425         cur_offset = chunk_size = llh->llh_hdr.lrh_len;
426         /* expect chunk_size to be power of two */
427         LASSERT(is_power_of_2(chunk_size));
428
429         OBD_ALLOC_LARGE(buf, chunk_size);
430         if (buf == NULL) {
431                 lpi->lpi_rc = -ENOMEM;
432                 RETURN(0);
433         }
434
435         if (cd != NULL) {
436                 last_called_index = cd->lpcd_first_idx;
437                 index = cd->lpcd_first_idx + 1;
438         }
439         if (cd != NULL && cd->lpcd_last_idx)
440                 last_index = cd->lpcd_last_idx;
441         else
442                 last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
443
444         while (rc == 0) {
445                 struct llog_rec_hdr *rec;
446                 off_t chunk_offset;
447                 unsigned int buf_offset = 0;
448                 bool partial_chunk;
449
450                 /* skip records not set in bitmap */
451                 while (index <= last_index &&
452                        !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
453                         ++index;
454
455                 /* There are no indices prior the last_index */
456                 if (index > last_index)
457                         break;
458
459                 CDEBUG(D_OTHER, "index: %d last_index %d\n", index,
460                        last_index);
461
462 repeat:
463                 /* get the buf with our target record; avoid old garbage */
464                 memset(buf, 0, chunk_size);
465                 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
466                                      index, &cur_offset, buf, chunk_size);
467                 if (rc != 0)
468                         GOTO(out, rc);
469
470                 /* NB: after llog_next_block() call the cur_offset is the
471                  * offset of the next block after read one.
472                  * The absolute offset of the current chunk is calculated
473                  * from cur_offset value and stored in chunk_offset variable.
474                  */
475                 if (cur_offset % chunk_size != 0) {
476                         partial_chunk = true;
477                         chunk_offset = cur_offset & ~(chunk_size - 1);
478                 } else {
479                         partial_chunk = false;
480                         chunk_offset = cur_offset - chunk_size;
481                 }
482
483                 /* NB: when rec->lrh_len is accessed it is already swabbed
484                  * since it is used at the "end" of the loop and the rec
485                  * swabbing is done at the beginning of the loop. */
486                 for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
487                      (char *)rec < buf + chunk_size;
488                      rec = llog_rec_hdr_next(rec)) {
489
490                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
491                                rec, rec->lrh_type);
492
493                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
494                                 lustre_swab_llog_rec(rec);
495
496                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
497                                rec->lrh_type, rec->lrh_index);
498
499                         /* for partial chunk the end of it is zeroed, check
500                          * for index 0 to distinguish it. */
501                         if (partial_chunk && rec->lrh_index == 0) {
502                                 /* concurrent llog_add() might add new records
503                                  * while llog_processing, check this is not
504                                  * the case and re-read the current chunk
505                                  * otherwise. */
506                                 if (index > loghandle->lgh_last_idx)
507                                         GOTO(out, rc = 0);
508                                 CDEBUG(D_OTHER, "Re-read last llog buffer for "
509                                        "new records, index %u, last %u\n",
510                                        index, loghandle->lgh_last_idx);
511                                 /* save offset inside buffer for the re-read */
512                                 buf_offset = (char *)rec - (char *)buf;
513                                 cur_offset = chunk_offset;
514                                 goto repeat;
515                         }
516
517                         if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
518                                 CWARN("invalid length %d in llog record for "
519                                       "index %d/%d\n", rec->lrh_len,
520                                       rec->lrh_index, index);
521                                 GOTO(out, rc = -EINVAL);
522                         }
523
524                         if (rec->lrh_index < index) {
525                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
526                                        rec->lrh_index);
527                                 continue;
528                         }
529
530                         if (rec->lrh_index != index) {
531                                 CERROR("%s: Invalid record: index %u but "
532                                        "expected %u\n",
533                                        loghandle->lgh_ctxt->loc_obd->obd_name,
534                                        rec->lrh_index, index);
535                                 GOTO(out, rc = -ERANGE);
536                         }
537
538                         CDEBUG(D_OTHER,
539                                "lrh_index: %d lrh_len: %d (%d remains)\n",
540                                rec->lrh_index, rec->lrh_len,
541                                (int)(buf + chunk_size - (char *)rec));
542
543                         loghandle->lgh_cur_idx = rec->lrh_index;
544                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
545                                                     chunk_offset;
546
547                         /* if set, process the callback on this record */
548                         if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
549                                 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
550                                                  lpi->lpi_cbdata);
551                                 last_called_index = index;
552                                 if (rc == LLOG_PROC_BREAK) {
553                                         GOTO(out, rc);
554                                 } else if (rc == LLOG_DEL_RECORD) {
555                                         rc = llog_cancel_rec(lpi->lpi_env,
556                                                              loghandle,
557                                                              rec->lrh_index);
558                                 }
559                                 if (rc)
560                                         GOTO(out, rc);
561                         }
562                         /* exit if the last index is reached */
563                         if (index >= last_index)
564                                 GOTO(out, rc = 0);
565                         ++index;
566                 }
567         }
568
569 out:
570         if (cd != NULL)
571                 cd->lpcd_last_idx = last_called_index;
572
573         if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
574                 /* something bad happened to the processing of a local
575                  * llog file, probably I/O error or the log got corrupted..
576                  * to be able to finally release the log we discard any
577                  * remaining bits in the header */
578                 CERROR("Local llog found corrupted\n");
579                 while (index <= last_index) {
580                         if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh)) != 0)
581                                 llog_cancel_rec(lpi->lpi_env, loghandle, index);
582                         index++;
583                 }
584                 rc = 0;
585         }
586
587         OBD_FREE_LARGE(buf, chunk_size);
588         lpi->lpi_rc = rc;
589         return 0;
590 }
591
592 static int llog_process_thread_daemonize(void *arg)
593 {
594         struct llog_process_info        *lpi = arg;
595         struct lu_env                    env;
596         int                              rc;
597
598         unshare_fs_struct();
599
600         /* client env has no keys, tags is just 0 */
601         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
602         if (rc)
603                 goto out;
604         lpi->lpi_env = &env;
605
606         rc = llog_process_thread(arg);
607
608         lu_env_fini(&env);
609 out:
610         complete(&lpi->lpi_completion);
611         return rc;
612 }
613
614 int llog_process_or_fork(const struct lu_env *env,
615                          struct llog_handle *loghandle,
616                          llog_cb_t cb, void *data, void *catdata, bool fork)
617 {
618         struct llog_process_info *lpi;
619         int                      rc;
620
621         ENTRY;
622
623         OBD_ALLOC_PTR(lpi);
624         if (lpi == NULL) {
625                 CERROR("cannot alloc pointer\n");
626                 RETURN(-ENOMEM);
627         }
628         lpi->lpi_loghandle = loghandle;
629         lpi->lpi_cb        = cb;
630         lpi->lpi_cbdata    = data;
631         lpi->lpi_catdata   = catdata;
632
633         if (fork) {
634                 struct task_struct *task;
635
636                 /* The new thread can't use parent env,
637                  * init the new one in llog_process_thread_daemonize. */
638                 lpi->lpi_env = NULL;
639                 init_completion(&lpi->lpi_completion);
640                 task = kthread_run(llog_process_thread_daemonize, lpi,
641                                    "llog_process_thread");
642                 if (IS_ERR(task)) {
643                         rc = PTR_ERR(task);
644                         CERROR("%s: cannot start thread: rc = %d\n",
645                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
646                         GOTO(out_lpi, rc);
647                 }
648                 wait_for_completion(&lpi->lpi_completion);
649         } else {
650                 lpi->lpi_env = env;
651                 llog_process_thread(lpi);
652         }
653         rc = lpi->lpi_rc;
654
655 out_lpi:
656         OBD_FREE_PTR(lpi);
657         RETURN(rc);
658 }
659 EXPORT_SYMBOL(llog_process_or_fork);
660
661 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
662                  llog_cb_t cb, void *data, void *catdata)
663 {
664         int rc;
665         rc = llog_process_or_fork(env, loghandle, cb, data, catdata, true);
666         return rc == LLOG_DEL_PLAIN ? 0 : rc;
667 }
668 EXPORT_SYMBOL(llog_process);
669
670 int llog_reverse_process(const struct lu_env *env,
671                          struct llog_handle *loghandle, llog_cb_t cb,
672                          void *data, void *catdata)
673 {
674         struct llog_log_hdr *llh = loghandle->lgh_hdr;
675         struct llog_process_cat_data *cd = catdata;
676         void *buf;
677         int rc = 0, first_index = 1, index, idx;
678         __u32   chunk_size = llh->llh_hdr.lrh_len;
679         ENTRY;
680
681         OBD_ALLOC_LARGE(buf, chunk_size);
682         if (buf == NULL)
683                 RETURN(-ENOMEM);
684
685         if (cd != NULL)
686                 first_index = cd->lpcd_first_idx + 1;
687         if (cd != NULL && cd->lpcd_last_idx)
688                 index = cd->lpcd_last_idx;
689         else
690                 index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
691
692         while (rc == 0) {
693                 struct llog_rec_hdr *rec;
694                 struct llog_rec_tail *tail;
695
696                 /* skip records not set in bitmap */
697                 while (index >= first_index &&
698                        !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
699                         --index;
700
701                 LASSERT(index >= first_index - 1);
702                 if (index == first_index - 1)
703                         break;
704
705                 /* get the buf with our target record; avoid old garbage */
706                 memset(buf, 0, chunk_size);
707                 rc = llog_prev_block(env, loghandle, index, buf, chunk_size);
708                 if (rc)
709                         GOTO(out, rc);
710
711                 rec = buf;
712                 idx = rec->lrh_index;
713                 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
714                 while (idx < index) {
715                         rec = (void *)rec + rec->lrh_len;
716                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
717                                 lustre_swab_llog_rec(rec);
718                         idx ++;
719                 }
720                 LASSERT(idx == index);
721                 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
722
723                 /* process records in buffer, starting where we found one */
724                 while ((void *)tail > buf) {
725                         if (tail->lrt_index == 0)
726                                 GOTO(out, rc = 0); /* no more records */
727
728                         /* if set, process the callback on this record */
729                         if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
730                                 rec = (void *)tail - tail->lrt_len +
731                                       sizeof(*tail);
732
733                                 rc = cb(env, loghandle, rec, data);
734                                 if (rc == LLOG_PROC_BREAK) {
735                                         GOTO(out, rc);
736                                 } else if (rc == LLOG_DEL_RECORD) {
737                                         rc = llog_cancel_rec(env, loghandle,
738                                                              tail->lrt_index);
739                                 }
740                                 if (rc)
741                                         GOTO(out, rc);
742                         }
743
744                         /* previous record, still in buffer? */
745                         --index;
746                         if (index < first_index)
747                                 GOTO(out, rc = 0);
748                         tail = (void *)tail - tail->lrt_len;
749                 }
750         }
751
752 out:
753         if (buf != NULL)
754                 OBD_FREE_LARGE(buf, chunk_size);
755         RETURN(rc);
756 }
757 EXPORT_SYMBOL(llog_reverse_process);
758
759 /**
760  * new llog API
761  *
762  * API functions:
763  *      llog_open - open llog, may not exist
764  *      llog_exist - check if llog exists
765  *      llog_close - close opened llog, pair for open, frees llog_handle
766  *      llog_declare_create - declare llog creation
767  *      llog_create - create new llog on disk, need transaction handle
768  *      llog_declare_write_rec - declaration of llog write
769  *      llog_write_rec - write llog record on disk, need transaction handle
770  *      llog_declare_add - declare llog catalog record addition
771  *      llog_add - add llog record in catalog, need transaction handle
772  */
773 int llog_exist(struct llog_handle *loghandle)
774 {
775         struct llog_operations  *lop;
776         int                      rc;
777
778         ENTRY;
779
780         rc = llog_handle2ops(loghandle, &lop);
781         if (rc)
782                 RETURN(rc);
783         if (lop->lop_exist == NULL)
784                 RETURN(-EOPNOTSUPP);
785
786         rc = lop->lop_exist(loghandle);
787         RETURN(rc);
788 }
789 EXPORT_SYMBOL(llog_exist);
790
791 int llog_declare_create(const struct lu_env *env,
792                         struct llog_handle *loghandle, struct thandle *th)
793 {
794         struct llog_operations  *lop;
795         int                      raised, rc;
796
797         ENTRY;
798
799         rc = llog_handle2ops(loghandle, &lop);
800         if (rc)
801                 RETURN(rc);
802         if (lop->lop_declare_create == NULL)
803                 RETURN(-EOPNOTSUPP);
804
805         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
806         if (!raised)
807                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
808         rc = lop->lop_declare_create(env, loghandle, th);
809         if (!raised)
810                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
811         RETURN(rc);
812 }
813
814 int llog_create(const struct lu_env *env, struct llog_handle *handle,
815                 struct thandle *th)
816 {
817         struct llog_operations  *lop;
818         int                      raised, rc;
819
820         ENTRY;
821
822         rc = llog_handle2ops(handle, &lop);
823         if (rc)
824                 RETURN(rc);
825         if (lop->lop_create == NULL)
826                 RETURN(-EOPNOTSUPP);
827
828         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
829         if (!raised)
830                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
831         rc = lop->lop_create(env, handle, th);
832         if (!raised)
833                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
834         RETURN(rc);
835 }
836
837 int llog_declare_write_rec(const struct lu_env *env,
838                            struct llog_handle *handle,
839                            struct llog_rec_hdr *rec, int idx,
840                            struct thandle *th)
841 {
842         struct llog_operations  *lop;
843         int                      raised, rc;
844
845         ENTRY;
846
847         rc = llog_handle2ops(handle, &lop);
848         if (rc)
849                 RETURN(rc);
850         LASSERT(lop);
851         if (lop->lop_declare_write_rec == NULL)
852                 RETURN(-EOPNOTSUPP);
853
854         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
855         if (!raised)
856                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
857         rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
858         if (!raised)
859                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
860         RETURN(rc);
861 }
862
863 int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
864                    struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
865                    int idx, struct thandle *th)
866 {
867         struct llog_operations  *lop;
868         int                      raised, rc, buflen;
869
870         ENTRY;
871
872         rc = llog_handle2ops(handle, &lop);
873         if (rc)
874                 RETURN(rc);
875
876         LASSERT(lop);
877         if (lop->lop_write_rec == NULL)
878                 RETURN(-EOPNOTSUPP);
879
880         buflen = rec->lrh_len;
881         LASSERT(cfs_size_round(buflen) == buflen);
882
883         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
884         if (!raised)
885                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
886         rc = lop->lop_write_rec(env, handle, rec, logcookies, idx, th);
887         if (!raised)
888                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
889         RETURN(rc);
890 }
891
892 int llog_add(const struct lu_env *env, struct llog_handle *lgh,
893              struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
894              struct thandle *th)
895 {
896         int raised, rc;
897
898         ENTRY;
899
900         if (lgh->lgh_logops->lop_add == NULL)
901                 RETURN(-EOPNOTSUPP);
902
903         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
904         if (!raised)
905                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
906         rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, th);
907         if (!raised)
908                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
909         RETURN(rc);
910 }
911 EXPORT_SYMBOL(llog_add);
912
913 int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
914                      struct llog_rec_hdr *rec, struct thandle *th)
915 {
916         int raised, rc;
917
918         ENTRY;
919
920         if (lgh->lgh_logops->lop_declare_add == NULL)
921                 RETURN(-EOPNOTSUPP);
922
923         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
924         if (!raised)
925                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
926         rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
927         if (!raised)
928                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
929         RETURN(rc);
930 }
931 EXPORT_SYMBOL(llog_declare_add);
932
933 /**
934  * Helper function to open llog or create it if doesn't exist.
935  * It hides all transaction handling from caller.
936  */
937 int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
938                      struct llog_handle **res, struct llog_logid *logid,
939                      char *name)
940 {
941         struct dt_device        *d;
942         struct thandle          *th;
943         int                      rc;
944
945         ENTRY;
946
947         rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
948         if (rc)
949                 RETURN(rc);
950
951         if (llog_exist(*res))
952                 RETURN(0);
953
954         LASSERT((*res)->lgh_obj != NULL);
955
956         d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
957
958         th = dt_trans_create(env, d);
959         if (IS_ERR(th))
960                 GOTO(out, rc = PTR_ERR(th));
961
962         /* Create update llog object synchronously, which
963          * happens during inialization process see
964          * lod_sub_prep_llog(), to make sure the update
965          * llog object is created before corss-MDT writing
966          * updates into the llog object */
967         if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
968                 th->th_sync = 1;
969
970         th->th_wait_submit = 1;
971         rc = llog_declare_create(env, *res, th);
972         if (rc == 0) {
973                 rc = dt_trans_start_local(env, d, th);
974                 if (rc == 0)
975                         rc = llog_create(env, *res, th);
976         }
977         dt_trans_stop(env, d, th);
978 out:
979         if (rc)
980                 llog_close(env, *res);
981         RETURN(rc);
982 }
983 EXPORT_SYMBOL(llog_open_create);
984
985 /**
986  * Helper function to delete existent llog.
987  */
988 int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
989                struct llog_logid *logid, char *name)
990 {
991         struct llog_handle      *handle;
992         int                      rc = 0, rc2;
993
994         ENTRY;
995
996         /* nothing to erase */
997         if (name == NULL && logid == NULL)
998                 RETURN(0);
999
1000         rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
1001         if (rc < 0)
1002                 RETURN(rc);
1003
1004         rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
1005         if (rc == 0)
1006                 rc = llog_destroy(env, handle);
1007
1008         rc2 = llog_close(env, handle);
1009         if (rc == 0)
1010                 rc = rc2;
1011         RETURN(rc);
1012 }
1013 EXPORT_SYMBOL(llog_erase);
1014
1015 /*
1016  * Helper function for write record in llog.
1017  * It hides all transaction handling from caller.
1018  * Valid only with local llog.
1019  */
1020 int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
1021                struct llog_rec_hdr *rec, int idx)
1022 {
1023         struct dt_device        *dt;
1024         struct thandle          *th;
1025         int                      rc;
1026
1027         ENTRY;
1028
1029         LASSERT(loghandle);
1030         LASSERT(loghandle->lgh_ctxt);
1031         LASSERT(loghandle->lgh_obj != NULL);
1032
1033         dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
1034
1035         th = dt_trans_create(env, dt);
1036         if (IS_ERR(th))
1037                 RETURN(PTR_ERR(th));
1038
1039         rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
1040         if (rc)
1041                 GOTO(out_trans, rc);
1042
1043         th->th_wait_submit = 1;
1044         rc = dt_trans_start_local(env, dt, th);
1045         if (rc)
1046                 GOTO(out_trans, rc);
1047
1048         down_write(&loghandle->lgh_lock);
1049         rc = llog_write_rec(env, loghandle, rec, NULL, idx, th);
1050         up_write(&loghandle->lgh_lock);
1051 out_trans:
1052         dt_trans_stop(env, dt, th);
1053         RETURN(rc);
1054 }
1055 EXPORT_SYMBOL(llog_write);
1056
1057 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
1058               struct llog_handle **lgh, struct llog_logid *logid,
1059               char *name, enum llog_open_param open_param)
1060 {
1061         int      raised;
1062         int      rc;
1063
1064         ENTRY;
1065
1066         LASSERT(ctxt);
1067         LASSERT(ctxt->loc_logops);
1068
1069         if (ctxt->loc_logops->lop_open == NULL) {
1070                 *lgh = NULL;
1071                 RETURN(-EOPNOTSUPP);
1072         }
1073
1074         *lgh = llog_alloc_handle();
1075         if (*lgh == NULL)
1076                 RETURN(-ENOMEM);
1077         (*lgh)->lgh_ctxt = ctxt;
1078         (*lgh)->lgh_logops = ctxt->loc_logops;
1079
1080         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
1081         if (!raised)
1082                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
1083         rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
1084         if (!raised)
1085                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
1086         if (rc) {
1087                 llog_free_handle(*lgh);
1088                 *lgh = NULL;
1089         }
1090         RETURN(rc);
1091 }
1092 EXPORT_SYMBOL(llog_open);
1093
1094 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
1095 {
1096         struct llog_operations  *lop;
1097         int                      rc;
1098
1099         ENTRY;
1100
1101         rc = llog_handle2ops(loghandle, &lop);
1102         if (rc)
1103                 GOTO(out, rc);
1104         if (lop->lop_close == NULL)
1105                 GOTO(out, rc = -EOPNOTSUPP);
1106         rc = lop->lop_close(env, loghandle);
1107 out:
1108         llog_handle_put(loghandle);
1109         RETURN(rc);
1110 }
1111 EXPORT_SYMBOL(llog_close);
1112
1113 /**
1114  * Helper function to get the llog size in records. It is used by MGS
1115  * mostly to check that config llog exists and contains data.
1116  *
1117  * \param[in] env       execution environment
1118  * \param[in] ctxt      llog context
1119  * \param[in] name      llog name
1120  *
1121  * \retval              true if there are records in llog besides a header
1122  * \retval              false on error or llog without records
1123  */
1124 int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt,
1125                   char *name)
1126 {
1127         struct llog_handle      *llh;
1128         int                      rc = 0;
1129
1130         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1131         if (rc < 0) {
1132                 if (likely(rc == -ENOENT))
1133                         rc = 0;
1134                 GOTO(out, rc);
1135         }
1136
1137         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1138         if (rc)
1139                 GOTO(out_close, rc);
1140         rc = llog_get_size(llh);
1141
1142 out_close:
1143         llog_close(env, llh);
1144 out:
1145         /* The header is record 1, the llog is still considered as empty
1146          * if there is only header */
1147         return (rc <= 1);
1148 }
1149 EXPORT_SYMBOL(llog_is_empty);
1150
1151 int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
1152                       struct llog_rec_hdr *rec, void *data)
1153 {
1154         struct llog_handle      *copy_llh = data;
1155
1156         /* Append all records */
1157         return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
1158 }
1159
1160 /* backup plain llog */
1161 int llog_backup(const struct lu_env *env, struct obd_device *obd,
1162                 struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
1163                 char *name, char *backup)
1164 {
1165         struct llog_handle      *llh, *bllh;
1166         int                      rc;
1167
1168         ENTRY;
1169
1170         /* open original log */
1171         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1172         if (rc < 0) {
1173                 /* the -ENOENT case is also reported to the caller
1174                  * but silently so it should handle that if needed.
1175                  */
1176                 if (rc != -ENOENT)
1177                         CERROR("%s: failed to open log %s: rc = %d\n",
1178                                obd->obd_name, name, rc);
1179                 RETURN(rc);
1180         }
1181
1182         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1183         if (rc)
1184                 GOTO(out_close, rc);
1185
1186         /* Make sure there's no old backup log */
1187         rc = llog_erase(env, bctxt, NULL, backup);
1188         if (rc < 0 && rc != -ENOENT)
1189                 GOTO(out_close, rc);
1190
1191         /* open backup log */
1192         rc = llog_open_create(env, bctxt, &bllh, NULL, backup);
1193         if (rc) {
1194                 CERROR("%s: failed to open backup logfile %s: rc = %d\n",
1195                        obd->obd_name, backup, rc);
1196                 GOTO(out_close, rc);
1197         }
1198
1199         /* check that backup llog is not the same object as original one */
1200         if (llh->lgh_obj == bllh->lgh_obj) {
1201                 CERROR("%s: backup llog %s to itself (%s), objects %p/%p\n",
1202                        obd->obd_name, name, backup, llh->lgh_obj,
1203                        bllh->lgh_obj);
1204                 GOTO(out_backup, rc = -EEXIST);
1205         }
1206
1207         rc = llog_init_handle(env, bllh, LLOG_F_IS_PLAIN, NULL);
1208         if (rc)
1209                 GOTO(out_backup, rc);
1210
1211         /* Copy log record by record */
1212         rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
1213                                   NULL, false);
1214         if (rc)
1215                 CERROR("%s: failed to backup log %s: rc = %d\n",
1216                        obd->obd_name, name, rc);
1217 out_backup:
1218         llog_close(env, bllh);
1219 out_close:
1220         llog_close(env, llh);
1221         RETURN(rc);
1222 }
1223 EXPORT_SYMBOL(llog_backup);