Whamcloud - gitweb
LU-6846 llog: combine cancel rec and destroy
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45  * Author: Mikhail Pershin <tappro@whamcloud.com>
46  */
47
48 #define DEBUG_SUBSYSTEM S_LOG
49
50 #include <linux/kthread.h>
51 #include <obd_class.h>
52 #include <lustre_log.h>
53 #include "llog_internal.h"
54
55 /*
56  * Allocate a new log or catalog handle
57  * Used inside llog_open().
58  */
59 static struct llog_handle *llog_alloc_handle(void)
60 {
61         struct llog_handle *loghandle;
62
63         OBD_ALLOC_PTR(loghandle);
64         if (loghandle == NULL)
65                 return NULL;
66
67         init_rwsem(&loghandle->lgh_lock);
68         mutex_init(&loghandle->lgh_hdr_mutex);
69         INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
70         atomic_set(&loghandle->lgh_refcount, 1);
71
72         return loghandle;
73 }
74
75 /*
76  * Free llog handle and header data if exists. Used in llog_close() only
77  */
78 static void llog_free_handle(struct llog_handle *loghandle)
79 {
80         LASSERT(loghandle != NULL);
81
82         /* failed llog_init_handle */
83         if (loghandle->lgh_hdr == NULL)
84                 goto out;
85
86         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
87                 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
88         else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
89                 LASSERT(list_empty(&loghandle->u.chd.chd_head));
90         OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
91 out:
92         OBD_FREE_PTR(loghandle);
93 }
94
95 void llog_handle_get(struct llog_handle *loghandle)
96 {
97         atomic_inc(&loghandle->lgh_refcount);
98 }
99
100 void llog_handle_put(struct llog_handle *loghandle)
101 {
102         LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
103         if (atomic_dec_and_test(&loghandle->lgh_refcount))
104                 llog_free_handle(loghandle);
105 }
106
107 static int llog_declare_destroy(const struct lu_env *env,
108                                 struct llog_handle *handle,
109                                 struct thandle *th)
110 {
111         struct llog_operations *lop;
112         int rc;
113
114         ENTRY;
115
116         rc = llog_handle2ops(handle, &lop);
117         if (rc)
118                 RETURN(rc);
119         if (lop->lop_declare_destroy == NULL)
120                 RETURN(-EOPNOTSUPP);
121
122         rc = lop->lop_declare_destroy(env, handle, th);
123
124         RETURN(rc);
125 }
126
127 int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
128                        struct thandle *th)
129 {
130         struct llog_operations  *lop;
131         int rc;
132         ENTRY;
133
134         rc = llog_handle2ops(handle, &lop);
135         if (rc < 0)
136                 RETURN(rc);
137         if (lop->lop_destroy == NULL)
138                 RETURN(-EOPNOTSUPP);
139
140         LASSERT(handle->lgh_obj != NULL);
141         if (!dt_object_exists(handle->lgh_obj))
142                 RETURN(0);
143
144         rc = lop->lop_destroy(env, handle, th);
145
146         RETURN(rc);
147 }
148
149 int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
150 {
151         struct llog_operations  *lop;
152         struct dt_device        *dt;
153         struct thandle          *th;
154         int rc;
155
156         ENTRY;
157
158         rc = llog_handle2ops(handle, &lop);
159         if (rc < 0)
160                 RETURN(rc);
161         if (lop->lop_destroy == NULL)
162                 RETURN(-EOPNOTSUPP);
163
164         if (handle->lgh_obj == NULL) {
165                 /* if lgh_obj == NULL, then it is from client side destroy */
166                 rc = lop->lop_destroy(env, handle, NULL);
167                 RETURN(rc);
168         }
169
170         if (!dt_object_exists(handle->lgh_obj))
171                 RETURN(0);
172
173         dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
174
175         th = dt_trans_create(env, dt);
176         if (IS_ERR(th))
177                 RETURN(PTR_ERR(th));
178
179         rc = llog_declare_destroy(env, handle, th);
180         if (rc != 0)
181                 GOTO(out_trans, rc);
182
183         rc = dt_trans_start_local(env, dt, th);
184         if (rc < 0)
185                 GOTO(out_trans, rc);
186
187         rc = lop->lop_destroy(env, handle, th);
188
189 out_trans:
190         dt_trans_stop(env, dt, th);
191
192         RETURN(rc);
193 }
194 EXPORT_SYMBOL(llog_destroy);
195
196 /* returns negative on error; 0 if success; 1 if success & log destroyed */
197 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
198                     int index)
199 {
200         struct dt_device        *dt;
201         struct llog_log_hdr     *llh = loghandle->lgh_hdr;
202         struct thandle          *th;
203         int                      rc;
204
205         ENTRY;
206
207         CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n", index,
208                POSTID(&loghandle->lgh_id.lgl_oi));
209
210         if (index == 0) {
211                 CERROR("Can't cancel index 0 which is header\n");
212                 RETURN(-EINVAL);
213         }
214
215         LASSERT(loghandle != NULL);
216         LASSERT(loghandle->lgh_ctxt != NULL);
217         LASSERT(loghandle->lgh_obj != NULL);
218
219         dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
220
221         th = dt_trans_create(env, dt);
222         if (IS_ERR(th))
223                 RETURN(PTR_ERR(th));
224
225         rc = llog_declare_write_rec(env, loghandle, &llh->llh_hdr, index, th);
226         if (rc < 0)
227                 GOTO(out_trans, rc);
228
229         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY))
230                 rc = llog_declare_destroy(env, loghandle, th);
231
232         th->th_wait_submit = 1;
233         rc = dt_trans_start_local(env, dt, th);
234         if (rc < 0)
235                 GOTO(out_trans, rc);
236
237         down_write(&loghandle->lgh_lock);
238         /* clear bitmap */
239         mutex_lock(&loghandle->lgh_hdr_mutex);
240         if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
241                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
242                 GOTO(out_unlock, rc);
243         }
244         /* update header */
245         rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL,
246                             LLOG_HEADER_IDX, th);
247         if (rc == 0)
248                 loghandle->lgh_hdr->llh_count--;
249         else
250                 ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
251
252         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
253             (llh->llh_count == 1) &&
254             (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
255                 rc = llog_trans_destroy(env, loghandle, th);
256                 if (rc < 0) {
257                         /* Sigh, can not destroy the final plain llog, but
258                          * the bitmap has been clearly, so the record can not
259                          * be accessed anymore, let's return 0 for now, and
260                          * the orphan will be handled by LFSCK. */
261                         CERROR("%s: can't destroy empty llog #"DOSTID
262                                "#%08x: rc = %d\n",
263                                loghandle->lgh_ctxt->loc_obd->obd_name,
264                                POSTID(&loghandle->lgh_id.lgl_oi),
265                                loghandle->lgh_id.lgl_ogen, rc);
266                         GOTO(out_unlock, rc = 0);
267                 }
268                 rc = LLOG_DEL_PLAIN;
269         }
270
271 out_unlock:
272         mutex_unlock(&loghandle->lgh_hdr_mutex);
273         up_write(&loghandle->lgh_lock);
274 out_trans:
275         dt_trans_stop(env, dt, th);
276         RETURN(rc);
277 }
278
279 static int llog_read_header(const struct lu_env *env,
280                             struct llog_handle *handle,
281                             struct obd_uuid *uuid)
282 {
283         struct llog_operations *lop;
284         int rc;
285
286         rc = llog_handle2ops(handle, &lop);
287         if (rc)
288                 RETURN(rc);
289
290         if (lop->lop_read_header == NULL)
291                 RETURN(-EOPNOTSUPP);
292
293         rc = lop->lop_read_header(env, handle);
294         if (rc == LLOG_EEMPTY) {
295                 struct llog_log_hdr *llh = handle->lgh_hdr;
296
297                 /* lrh_len should be initialized in llog_init_handle */
298                 handle->lgh_last_idx = 0; /* header is record with index 0 */
299                 llh->llh_count = 1;         /* for the header record */
300                 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
301                 LASSERT(handle->lgh_ctxt->loc_chunk_size >=
302                                                 LLOG_MIN_CHUNK_SIZE);
303                 llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size;
304                 llh->llh_hdr.lrh_index = 0;
305                 llh->llh_timestamp = cfs_time_current_sec();
306                 if (uuid)
307                         memcpy(&llh->llh_tgtuuid, uuid,
308                                sizeof(llh->llh_tgtuuid));
309                 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
310                 ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
311                 LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
312                 LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
313                 rc = 0;
314         }
315         return rc;
316 }
317
318 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
319                      int flags, struct obd_uuid *uuid)
320 {
321         struct llog_log_hdr     *llh;
322         enum llog_flag           fmt = flags & LLOG_F_EXT_MASK;
323         int                      rc;
324         int                     chunk_size = handle->lgh_ctxt->loc_chunk_size;
325         ENTRY;
326
327         LASSERT(handle->lgh_hdr == NULL);
328
329         LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE);
330         OBD_ALLOC_LARGE(llh, chunk_size);
331         if (llh == NULL)
332                 RETURN(-ENOMEM);
333
334         handle->lgh_hdr = llh;
335         handle->lgh_hdr_size = chunk_size;
336         /* first assign flags to use llog_client_ops */
337         llh->llh_flags = flags;
338         rc = llog_read_header(env, handle, uuid);
339         if (rc == 0) {
340                 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
341                               flags & LLOG_F_IS_CAT) ||
342                              (llh->llh_flags & LLOG_F_IS_CAT &&
343                               flags & LLOG_F_IS_PLAIN))) {
344                         CERROR("%s: llog type is %s but initializing %s\n",
345                                handle->lgh_ctxt->loc_obd->obd_name,
346                                llh->llh_flags & LLOG_F_IS_CAT ?
347                                "catalog" : "plain",
348                                flags & LLOG_F_IS_CAT ? "catalog" : "plain");
349                         GOTO(out, rc = -EINVAL);
350                 } else if (llh->llh_flags &
351                            (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
352                         /*
353                          * it is possible to open llog without specifying llog
354                          * type so it is taken from llh_flags
355                          */
356                         flags = llh->llh_flags;
357                 } else {
358                         /* for some reason the llh_flags has no type set */
359                         CERROR("llog type is not specified!\n");
360                         GOTO(out, rc = -EINVAL);
361                 }
362                 if (unlikely(uuid &&
363                              !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
364                         CERROR("%s: llog uuid mismatch: %s/%s\n",
365                                handle->lgh_ctxt->loc_obd->obd_name,
366                                (char *)uuid->uuid,
367                                (char *)llh->llh_tgtuuid.uuid);
368                         GOTO(out, rc = -EEXIST);
369                 }
370         }
371         if (flags & LLOG_F_IS_CAT) {
372                 LASSERT(list_empty(&handle->u.chd.chd_head));
373                 INIT_LIST_HEAD(&handle->u.chd.chd_head);
374                 llh->llh_size = sizeof(struct llog_logid_rec);
375         } else if (!(flags & LLOG_F_IS_PLAIN)) {
376                 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
377                        handle->lgh_ctxt->loc_obd->obd_name,
378                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
379                 rc = -EINVAL;
380         }
381         llh->llh_flags |= fmt;
382 out:
383         if (rc) {
384                 OBD_FREE_LARGE(llh, chunk_size);
385                 handle->lgh_hdr = NULL;
386         }
387         RETURN(rc);
388 }
389 EXPORT_SYMBOL(llog_init_handle);
390
391 static int llog_process_thread(void *arg)
392 {
393         struct llog_process_info        *lpi = arg;
394         struct llog_handle              *loghandle = lpi->lpi_loghandle;
395         struct llog_log_hdr             *llh = loghandle->lgh_hdr;
396         struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
397         char                            *buf;
398         size_t                           chunk_size;
399         __u64                            cur_offset;
400         int                              rc = 0, index = 1, last_index;
401         int                              saved_index = 0;
402         int                              last_called_index = 0;
403
404         ENTRY;
405
406         if (llh == NULL)
407                 RETURN(-EINVAL);
408
409         cur_offset = chunk_size = llh->llh_hdr.lrh_len;
410         /* expect chunk_size to be power of two */
411         LASSERT(is_power_of_2(chunk_size));
412
413         OBD_ALLOC_LARGE(buf, chunk_size);
414         if (buf == NULL) {
415                 lpi->lpi_rc = -ENOMEM;
416                 RETURN(0);
417         }
418
419         if (cd != NULL) {
420                 last_called_index = cd->lpcd_first_idx;
421                 index = cd->lpcd_first_idx + 1;
422         }
423         if (cd != NULL && cd->lpcd_last_idx)
424                 last_index = cd->lpcd_last_idx;
425         else
426                 last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
427
428         while (rc == 0) {
429                 struct llog_rec_hdr *rec;
430                 off_t chunk_offset;
431                 unsigned int buf_offset = 0;
432                 bool partial_chunk;
433
434                 /* skip records not set in bitmap */
435                 while (index <= last_index &&
436                        !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
437                         ++index;
438
439                 /* There are no indices prior the last_index */
440                 if (index > last_index)
441                         break;
442
443                 CDEBUG(D_OTHER, "index: %d last_index %d\n", index,
444                        last_index);
445
446 repeat:
447                 /* get the buf with our target record; avoid old garbage */
448                 memset(buf, 0, chunk_size);
449                 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
450                                      index, &cur_offset, buf, chunk_size);
451                 if (rc != 0)
452                         GOTO(out, rc);
453
454                 /* NB: after llog_next_block() call the cur_offset is the
455                  * offset of the next block after read one.
456                  * The absolute offset of the current chunk is calculated
457                  * from cur_offset value and stored in chunk_offset variable.
458                  */
459                 if (cur_offset % chunk_size != 0) {
460                         partial_chunk = true;
461                         chunk_offset = cur_offset & ~(chunk_size - 1);
462                 } else {
463                         partial_chunk = false;
464                         chunk_offset = cur_offset - chunk_size;
465                 }
466
467                 /* NB: when rec->lrh_len is accessed it is already swabbed
468                  * since it is used at the "end" of the loop and the rec
469                  * swabbing is done at the beginning of the loop. */
470                 for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
471                      (char *)rec < buf + chunk_size;
472                      rec = llog_rec_hdr_next(rec)) {
473
474                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
475                                rec, rec->lrh_type);
476
477                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
478                                 lustre_swab_llog_rec(rec);
479
480                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
481                                rec->lrh_type, rec->lrh_index);
482
483                         /* for partial chunk the end of it is zeroed, check
484                          * for index 0 to distinguish it. */
485                         if (partial_chunk && rec->lrh_index == 0) {
486                                 /* concurrent llog_add() might add new records
487                                  * while llog_processing, check this is not
488                                  * the case and re-read the current chunk
489                                  * otherwise. */
490                                 if (index > loghandle->lgh_last_idx)
491                                         GOTO(out, rc = 0);
492                                 CDEBUG(D_OTHER, "Re-read last llog buffer for "
493                                        "new records, index %u, last %u\n",
494                                        index, loghandle->lgh_last_idx);
495                                 /* save offset inside buffer for the re-read */
496                                 buf_offset = (char *)rec - (char *)buf;
497                                 cur_offset = chunk_offset;
498                                 goto repeat;
499                         }
500
501                         if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
502                                 CWARN("invalid length %d in llog record for "
503                                       "index %d/%d\n", rec->lrh_len,
504                                       rec->lrh_index, index);
505                                 GOTO(out, rc = -EINVAL);
506                         }
507
508                         if (rec->lrh_index < index) {
509                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
510                                        rec->lrh_index);
511                                 continue;
512                         }
513
514                         if (rec->lrh_index != index) {
515                                 CERROR("%s: Invalid record: index %u but "
516                                        "expected %u\n",
517                                        loghandle->lgh_ctxt->loc_obd->obd_name,
518                                        rec->lrh_index, index);
519                                 GOTO(out, rc = -ERANGE);
520                         }
521
522                         CDEBUG(D_OTHER,
523                                "lrh_index: %d lrh_len: %d (%d remains)\n",
524                                rec->lrh_index, rec->lrh_len,
525                                (int)(buf + chunk_size - (char *)rec));
526
527                         loghandle->lgh_cur_idx = rec->lrh_index;
528                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
529                                                     chunk_offset;
530
531                         /* if set, process the callback on this record */
532                         if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
533                                 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
534                                                  lpi->lpi_cbdata);
535                                 last_called_index = index;
536                                 if (rc == LLOG_PROC_BREAK) {
537                                         GOTO(out, rc);
538                                 } else if (rc == LLOG_DEL_RECORD) {
539                                         rc = llog_cancel_rec(lpi->lpi_env,
540                                                              loghandle,
541                                                              rec->lrh_index);
542                                 }
543                                 if (rc)
544                                         GOTO(out, rc);
545                         }
546                         /* exit if the last index is reached */
547                         if (index >= last_index)
548                                 GOTO(out, rc = 0);
549                         ++index;
550                 }
551         }
552
553 out:
554         if (cd != NULL)
555                 cd->lpcd_last_idx = last_called_index;
556
557         if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
558                 /* something bad happened to the processing of a local
559                  * llog file, probably I/O error or the log got corrupted..
560                  * to be able to finally release the log we discard any
561                  * remaining bits in the header */
562                 CERROR("Local llog found corrupted\n");
563                 while (index <= last_index) {
564                         if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh)) != 0)
565                                 llog_cancel_rec(lpi->lpi_env, loghandle, index);
566                         index++;
567                 }
568                 rc = 0;
569         }
570
571         OBD_FREE_LARGE(buf, chunk_size);
572         lpi->lpi_rc = rc;
573         return 0;
574 }
575
576 static int llog_process_thread_daemonize(void *arg)
577 {
578         struct llog_process_info        *lpi = arg;
579         struct lu_env                    env;
580         int                              rc;
581
582         unshare_fs_struct();
583
584         /* client env has no keys, tags is just 0 */
585         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
586         if (rc)
587                 goto out;
588         lpi->lpi_env = &env;
589
590         rc = llog_process_thread(arg);
591
592         lu_env_fini(&env);
593 out:
594         complete(&lpi->lpi_completion);
595         return rc;
596 }
597
598 int llog_process_or_fork(const struct lu_env *env,
599                          struct llog_handle *loghandle,
600                          llog_cb_t cb, void *data, void *catdata, bool fork)
601 {
602         struct llog_process_info *lpi;
603         int                      rc;
604
605         ENTRY;
606
607         OBD_ALLOC_PTR(lpi);
608         if (lpi == NULL) {
609                 CERROR("cannot alloc pointer\n");
610                 RETURN(-ENOMEM);
611         }
612         lpi->lpi_loghandle = loghandle;
613         lpi->lpi_cb        = cb;
614         lpi->lpi_cbdata    = data;
615         lpi->lpi_catdata   = catdata;
616
617         if (fork) {
618                 struct task_struct *task;
619
620                 /* The new thread can't use parent env,
621                  * init the new one in llog_process_thread_daemonize. */
622                 lpi->lpi_env = NULL;
623                 init_completion(&lpi->lpi_completion);
624                 task = kthread_run(llog_process_thread_daemonize, lpi,
625                                    "llog_process_thread");
626                 if (IS_ERR(task)) {
627                         rc = PTR_ERR(task);
628                         CERROR("%s: cannot start thread: rc = %d\n",
629                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
630                         GOTO(out_lpi, rc);
631                 }
632                 wait_for_completion(&lpi->lpi_completion);
633         } else {
634                 lpi->lpi_env = env;
635                 llog_process_thread(lpi);
636         }
637         rc = lpi->lpi_rc;
638
639 out_lpi:
640         OBD_FREE_PTR(lpi);
641         RETURN(rc);
642 }
643 EXPORT_SYMBOL(llog_process_or_fork);
644
645 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
646                  llog_cb_t cb, void *data, void *catdata)
647 {
648         int rc;
649         rc = llog_process_or_fork(env, loghandle, cb, data, catdata, true);
650         return rc == LLOG_DEL_PLAIN ? 0 : rc;
651 }
652 EXPORT_SYMBOL(llog_process);
653
654 int llog_reverse_process(const struct lu_env *env,
655                          struct llog_handle *loghandle, llog_cb_t cb,
656                          void *data, void *catdata)
657 {
658         struct llog_log_hdr *llh = loghandle->lgh_hdr;
659         struct llog_process_cat_data *cd = catdata;
660         void *buf;
661         int rc = 0, first_index = 1, index, idx;
662         __u32   chunk_size = llh->llh_hdr.lrh_len;
663         ENTRY;
664
665         OBD_ALLOC_LARGE(buf, chunk_size);
666         if (buf == NULL)
667                 RETURN(-ENOMEM);
668
669         if (cd != NULL)
670                 first_index = cd->lpcd_first_idx + 1;
671         if (cd != NULL && cd->lpcd_last_idx)
672                 index = cd->lpcd_last_idx;
673         else
674                 index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
675
676         while (rc == 0) {
677                 struct llog_rec_hdr *rec;
678                 struct llog_rec_tail *tail;
679
680                 /* skip records not set in bitmap */
681                 while (index >= first_index &&
682                        !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
683                         --index;
684
685                 LASSERT(index >= first_index - 1);
686                 if (index == first_index - 1)
687                         break;
688
689                 /* get the buf with our target record; avoid old garbage */
690                 memset(buf, 0, chunk_size);
691                 rc = llog_prev_block(env, loghandle, index, buf, chunk_size);
692                 if (rc)
693                         GOTO(out, rc);
694
695                 rec = buf;
696                 idx = rec->lrh_index;
697                 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
698                 while (idx < index) {
699                         rec = (void *)rec + rec->lrh_len;
700                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
701                                 lustre_swab_llog_rec(rec);
702                         idx ++;
703                 }
704                 LASSERT(idx == index);
705                 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
706
707                 /* process records in buffer, starting where we found one */
708                 while ((void *)tail > buf) {
709                         if (tail->lrt_index == 0)
710                                 GOTO(out, rc = 0); /* no more records */
711
712                         /* if set, process the callback on this record */
713                         if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
714                                 rec = (void *)tail - tail->lrt_len +
715                                       sizeof(*tail);
716
717                                 rc = cb(env, loghandle, rec, data);
718                                 if (rc == LLOG_PROC_BREAK) {
719                                         GOTO(out, rc);
720                                 } else if (rc == LLOG_DEL_RECORD) {
721                                         rc = llog_cancel_rec(env, loghandle,
722                                                              tail->lrt_index);
723                                 }
724                                 if (rc)
725                                         GOTO(out, rc);
726                         }
727
728                         /* previous record, still in buffer? */
729                         --index;
730                         if (index < first_index)
731                                 GOTO(out, rc = 0);
732                         tail = (void *)tail - tail->lrt_len;
733                 }
734         }
735
736 out:
737         if (buf != NULL)
738                 OBD_FREE_LARGE(buf, chunk_size);
739         RETURN(rc);
740 }
741 EXPORT_SYMBOL(llog_reverse_process);
742
743 /**
744  * new llog API
745  *
746  * API functions:
747  *      llog_open - open llog, may not exist
748  *      llog_exist - check if llog exists
749  *      llog_close - close opened llog, pair for open, frees llog_handle
750  *      llog_declare_create - declare llog creation
751  *      llog_create - create new llog on disk, need transaction handle
752  *      llog_declare_write_rec - declaration of llog write
753  *      llog_write_rec - write llog record on disk, need transaction handle
754  *      llog_declare_add - declare llog catalog record addition
755  *      llog_add - add llog record in catalog, need transaction handle
756  */
757 int llog_exist(struct llog_handle *loghandle)
758 {
759         struct llog_operations  *lop;
760         int                      rc;
761
762         ENTRY;
763
764         rc = llog_handle2ops(loghandle, &lop);
765         if (rc)
766                 RETURN(rc);
767         if (lop->lop_exist == NULL)
768                 RETURN(-EOPNOTSUPP);
769
770         rc = lop->lop_exist(loghandle);
771         RETURN(rc);
772 }
773 EXPORT_SYMBOL(llog_exist);
774
775 int llog_declare_create(const struct lu_env *env,
776                         struct llog_handle *loghandle, struct thandle *th)
777 {
778         struct llog_operations  *lop;
779         int                      raised, rc;
780
781         ENTRY;
782
783         rc = llog_handle2ops(loghandle, &lop);
784         if (rc)
785                 RETURN(rc);
786         if (lop->lop_declare_create == NULL)
787                 RETURN(-EOPNOTSUPP);
788
789         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
790         if (!raised)
791                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
792         rc = lop->lop_declare_create(env, loghandle, th);
793         if (!raised)
794                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
795         RETURN(rc);
796 }
797
798 int llog_create(const struct lu_env *env, struct llog_handle *handle,
799                 struct thandle *th)
800 {
801         struct llog_operations  *lop;
802         int                      raised, rc;
803
804         ENTRY;
805
806         rc = llog_handle2ops(handle, &lop);
807         if (rc)
808                 RETURN(rc);
809         if (lop->lop_create == NULL)
810                 RETURN(-EOPNOTSUPP);
811
812         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
813         if (!raised)
814                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
815         rc = lop->lop_create(env, handle, th);
816         if (!raised)
817                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
818         RETURN(rc);
819 }
820
821 int llog_declare_write_rec(const struct lu_env *env,
822                            struct llog_handle *handle,
823                            struct llog_rec_hdr *rec, int idx,
824                            struct thandle *th)
825 {
826         struct llog_operations  *lop;
827         int                      raised, rc;
828
829         ENTRY;
830
831         rc = llog_handle2ops(handle, &lop);
832         if (rc)
833                 RETURN(rc);
834         LASSERT(lop);
835         if (lop->lop_declare_write_rec == NULL)
836                 RETURN(-EOPNOTSUPP);
837
838         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
839         if (!raised)
840                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
841         rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
842         if (!raised)
843                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
844         RETURN(rc);
845 }
846
847 int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
848                    struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
849                    int idx, struct thandle *th)
850 {
851         struct llog_operations  *lop;
852         int                      raised, rc, buflen;
853
854         ENTRY;
855
856         rc = llog_handle2ops(handle, &lop);
857         if (rc)
858                 RETURN(rc);
859
860         LASSERT(lop);
861         if (lop->lop_write_rec == NULL)
862                 RETURN(-EOPNOTSUPP);
863
864         buflen = rec->lrh_len;
865         LASSERT(cfs_size_round(buflen) == buflen);
866
867         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
868         if (!raised)
869                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
870         rc = lop->lop_write_rec(env, handle, rec, logcookies, idx, th);
871         if (!raised)
872                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
873         RETURN(rc);
874 }
875
876 int llog_add(const struct lu_env *env, struct llog_handle *lgh,
877              struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
878              struct thandle *th)
879 {
880         int raised, rc;
881
882         ENTRY;
883
884         if (lgh->lgh_logops->lop_add == NULL)
885                 RETURN(-EOPNOTSUPP);
886
887         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
888         if (!raised)
889                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
890         rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, th);
891         if (!raised)
892                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
893         RETURN(rc);
894 }
895 EXPORT_SYMBOL(llog_add);
896
897 int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
898                      struct llog_rec_hdr *rec, struct thandle *th)
899 {
900         int raised, rc;
901
902         ENTRY;
903
904         if (lgh->lgh_logops->lop_declare_add == NULL)
905                 RETURN(-EOPNOTSUPP);
906
907         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
908         if (!raised)
909                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
910         rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
911         if (!raised)
912                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
913         RETURN(rc);
914 }
915 EXPORT_SYMBOL(llog_declare_add);
916
917 /**
918  * Helper function to open llog or create it if doesn't exist.
919  * It hides all transaction handling from caller.
920  */
921 int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
922                      struct llog_handle **res, struct llog_logid *logid,
923                      char *name)
924 {
925         struct dt_device        *d;
926         struct thandle          *th;
927         int                      rc;
928
929         ENTRY;
930
931         rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
932         if (rc)
933                 RETURN(rc);
934
935         if (llog_exist(*res))
936                 RETURN(0);
937
938         LASSERT((*res)->lgh_obj != NULL);
939
940         d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
941
942         th = dt_trans_create(env, d);
943         if (IS_ERR(th))
944                 GOTO(out, rc = PTR_ERR(th));
945
946         /* Create update llog object synchronously, which
947          * happens during inialization process see
948          * lod_sub_prep_llog(), to make sure the update
949          * llog object is created before corss-MDT writing
950          * updates into the llog object */
951         if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
952                 th->th_sync = 1;
953
954         th->th_wait_submit = 1;
955         rc = llog_declare_create(env, *res, th);
956         if (rc == 0) {
957                 rc = dt_trans_start_local(env, d, th);
958                 if (rc == 0)
959                         rc = llog_create(env, *res, th);
960         }
961         dt_trans_stop(env, d, th);
962 out:
963         if (rc)
964                 llog_close(env, *res);
965         RETURN(rc);
966 }
967 EXPORT_SYMBOL(llog_open_create);
968
969 /**
970  * Helper function to delete existent llog.
971  */
972 int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
973                struct llog_logid *logid, char *name)
974 {
975         struct llog_handle      *handle;
976         int                      rc = 0, rc2;
977
978         ENTRY;
979
980         /* nothing to erase */
981         if (name == NULL && logid == NULL)
982                 RETURN(0);
983
984         rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
985         if (rc < 0)
986                 RETURN(rc);
987
988         rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
989         if (rc == 0)
990                 rc = llog_destroy(env, handle);
991
992         rc2 = llog_close(env, handle);
993         if (rc == 0)
994                 rc = rc2;
995         RETURN(rc);
996 }
997 EXPORT_SYMBOL(llog_erase);
998
999 /*
1000  * Helper function for write record in llog.
1001  * It hides all transaction handling from caller.
1002  * Valid only with local llog.
1003  */
1004 int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
1005                struct llog_rec_hdr *rec, int idx)
1006 {
1007         struct dt_device        *dt;
1008         struct thandle          *th;
1009         int                      rc;
1010
1011         ENTRY;
1012
1013         LASSERT(loghandle);
1014         LASSERT(loghandle->lgh_ctxt);
1015         LASSERT(loghandle->lgh_obj != NULL);
1016
1017         dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
1018
1019         th = dt_trans_create(env, dt);
1020         if (IS_ERR(th))
1021                 RETURN(PTR_ERR(th));
1022
1023         rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
1024         if (rc)
1025                 GOTO(out_trans, rc);
1026
1027         th->th_wait_submit = 1;
1028         rc = dt_trans_start_local(env, dt, th);
1029         if (rc)
1030                 GOTO(out_trans, rc);
1031
1032         down_write(&loghandle->lgh_lock);
1033         rc = llog_write_rec(env, loghandle, rec, NULL, idx, th);
1034         up_write(&loghandle->lgh_lock);
1035 out_trans:
1036         dt_trans_stop(env, dt, th);
1037         RETURN(rc);
1038 }
1039 EXPORT_SYMBOL(llog_write);
1040
1041 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
1042               struct llog_handle **lgh, struct llog_logid *logid,
1043               char *name, enum llog_open_param open_param)
1044 {
1045         int      raised;
1046         int      rc;
1047
1048         ENTRY;
1049
1050         LASSERT(ctxt);
1051         LASSERT(ctxt->loc_logops);
1052
1053         if (ctxt->loc_logops->lop_open == NULL) {
1054                 *lgh = NULL;
1055                 RETURN(-EOPNOTSUPP);
1056         }
1057
1058         *lgh = llog_alloc_handle();
1059         if (*lgh == NULL)
1060                 RETURN(-ENOMEM);
1061         (*lgh)->lgh_ctxt = ctxt;
1062         (*lgh)->lgh_logops = ctxt->loc_logops;
1063
1064         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
1065         if (!raised)
1066                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
1067         rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
1068         if (!raised)
1069                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
1070         if (rc) {
1071                 llog_free_handle(*lgh);
1072                 *lgh = NULL;
1073         }
1074         RETURN(rc);
1075 }
1076 EXPORT_SYMBOL(llog_open);
1077
1078 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
1079 {
1080         struct llog_operations  *lop;
1081         int                      rc;
1082
1083         ENTRY;
1084
1085         rc = llog_handle2ops(loghandle, &lop);
1086         if (rc)
1087                 GOTO(out, rc);
1088         if (lop->lop_close == NULL)
1089                 GOTO(out, rc = -EOPNOTSUPP);
1090         rc = lop->lop_close(env, loghandle);
1091 out:
1092         llog_handle_put(loghandle);
1093         RETURN(rc);
1094 }
1095 EXPORT_SYMBOL(llog_close);
1096
1097 /**
1098  * Helper function to get the llog size in records. It is used by MGS
1099  * mostly to check that config llog exists and contains data.
1100  *
1101  * \param[in] env       execution environment
1102  * \param[in] ctxt      llog context
1103  * \param[in] name      llog name
1104  *
1105  * \retval              true if there are records in llog besides a header
1106  * \retval              false on error or llog without records
1107  */
1108 int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt,
1109                   char *name)
1110 {
1111         struct llog_handle      *llh;
1112         int                      rc = 0;
1113
1114         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1115         if (rc < 0) {
1116                 if (likely(rc == -ENOENT))
1117                         rc = 0;
1118                 GOTO(out, rc);
1119         }
1120
1121         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1122         if (rc)
1123                 GOTO(out_close, rc);
1124         rc = llog_get_size(llh);
1125
1126 out_close:
1127         llog_close(env, llh);
1128 out:
1129         /* The header is record 1, the llog is still considered as empty
1130          * if there is only header */
1131         return (rc <= 1);
1132 }
1133 EXPORT_SYMBOL(llog_is_empty);
1134
1135 int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
1136                       struct llog_rec_hdr *rec, void *data)
1137 {
1138         struct llog_handle      *copy_llh = data;
1139
1140         /* Append all records */
1141         return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
1142 }
1143
1144 /* backup plain llog */
1145 int llog_backup(const struct lu_env *env, struct obd_device *obd,
1146                 struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
1147                 char *name, char *backup)
1148 {
1149         struct llog_handle      *llh, *bllh;
1150         int                      rc;
1151
1152         ENTRY;
1153
1154         /* open original log */
1155         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1156         if (rc < 0) {
1157                 /* the -ENOENT case is also reported to the caller
1158                  * but silently so it should handle that if needed.
1159                  */
1160                 if (rc != -ENOENT)
1161                         CERROR("%s: failed to open log %s: rc = %d\n",
1162                                obd->obd_name, name, rc);
1163                 RETURN(rc);
1164         }
1165
1166         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1167         if (rc)
1168                 GOTO(out_close, rc);
1169
1170         /* Make sure there's no old backup log */
1171         rc = llog_erase(env, bctxt, NULL, backup);
1172         if (rc < 0 && rc != -ENOENT)
1173                 GOTO(out_close, rc);
1174
1175         /* open backup log */
1176         rc = llog_open_create(env, bctxt, &bllh, NULL, backup);
1177         if (rc) {
1178                 CERROR("%s: failed to open backup logfile %s: rc = %d\n",
1179                        obd->obd_name, backup, rc);
1180                 GOTO(out_close, rc);
1181         }
1182
1183         /* check that backup llog is not the same object as original one */
1184         if (llh->lgh_obj == bllh->lgh_obj) {
1185                 CERROR("%s: backup llog %s to itself (%s), objects %p/%p\n",
1186                        obd->obd_name, name, backup, llh->lgh_obj,
1187                        bllh->lgh_obj);
1188                 GOTO(out_backup, rc = -EEXIST);
1189         }
1190
1191         rc = llog_init_handle(env, bllh, LLOG_F_IS_PLAIN, NULL);
1192         if (rc)
1193                 GOTO(out_backup, rc);
1194
1195         /* Copy log record by record */
1196         rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
1197                                   NULL, false);
1198         if (rc)
1199                 CERROR("%s: failed to backup log %s: rc = %d\n",
1200                        obd->obd_name, name, rc);
1201 out_backup:
1202         llog_close(env, bllh);
1203 out_close:
1204         llog_close(env, llh);
1205         RETURN(rc);
1206 }
1207 EXPORT_SYMBOL(llog_backup);