Whamcloud - gitweb
LU-1302 llog: pass lu_env to the llog callback
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/obdclass/llog.c
35  *
36  * OST<->MDS recovery logging infrastructure.
37  * Invariants in implementation:
38  * - we do not share logs among different OST<->MDS connections, so that
39  *   if an OST or MDS fails it need only look at log(s) relevant to itself
40  *
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
43  * Author: Mikhail Pershin <tappro@whamcloud.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_LOG
47
48 #ifndef __KERNEL__
49 #include <liblustre.h>
50 #endif
51
52 #include <obd_class.h>
53 #include <lustre_log.h>
54 #include "llog_internal.h"
55
56 /* Allocate a new log or catalog handle */
57 struct llog_handle *llog_alloc_handle(void)
58 {
59         struct llog_handle *loghandle;
60         ENTRY;
61
62         OBD_ALLOC_PTR(loghandle);
63         if (loghandle == NULL)
64                 RETURN(ERR_PTR(-ENOMEM));
65
66         cfs_init_rwsem(&loghandle->lgh_lock);
67         cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
68         CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
69
70         RETURN(loghandle);
71 }
72 EXPORT_SYMBOL(llog_alloc_handle);
73
74
75 void llog_free_handle(struct llog_handle *loghandle)
76 {
77         if (!loghandle)
78                 return;
79
80         if (!loghandle->lgh_hdr)
81                 goto out;
82         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
83                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
84         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
85                 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
86         LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
87         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
88
89 out:
90         OBD_FREE_PTR(loghandle);
91 }
92 EXPORT_SYMBOL(llog_free_handle);
93
94 /* returns negative on error; 0 if success; 1 if success & log destroyed */
95 int llog_cancel_rec(struct llog_handle *loghandle, int index)
96 {
97         struct llog_log_hdr *llh = loghandle->lgh_hdr;
98         int rc = 0;
99         ENTRY;
100
101         CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
102                index, loghandle->lgh_id.lgl_oid);
103
104         if (index == 0) {
105                 CERROR("Can't cancel index 0 which is header\n");
106                 RETURN(-EINVAL);
107         }
108
109         cfs_spin_lock(&loghandle->lgh_hdr_lock);
110         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
111                 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
112                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
113                 RETURN(-ENOENT);
114         }
115
116         llh->llh_count--;
117
118         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
119             (llh->llh_count == 1) &&
120             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
121                 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
122                 rc = llog_destroy(loghandle);
123                 if (rc < 0) {
124                         CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64
125                                "#%08x: rc = %d\n",
126                                loghandle->lgh_ctxt->loc_obd->obd_name,
127                                loghandle->lgh_id.lgl_oid,
128                                loghandle->lgh_id.lgl_oseq,
129                                loghandle->lgh_id.lgl_ogen, rc);
130                         GOTO(out_err, rc);
131                 }
132                 RETURN(1);
133         }
134         cfs_spin_unlock(&loghandle->lgh_hdr_lock);
135
136         rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
137         if (rc < 0) {
138                 CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
139                        "#%08x: rc = %d\n",
140                        loghandle->lgh_ctxt->loc_obd->obd_name,
141                        loghandle->lgh_id.lgl_oid,
142                        loghandle->lgh_id.lgl_oseq,
143                        loghandle->lgh_id.lgl_ogen, rc);
144                 GOTO(out_err, rc);
145         }
146         RETURN(0);
147 out_err:
148         cfs_spin_lock(&loghandle->lgh_hdr_lock);
149         ext2_set_bit(index, llh->llh_bitmap);
150         llh->llh_count++;
151         cfs_spin_unlock(&loghandle->lgh_hdr_lock);
152         return rc;
153 }
154 EXPORT_SYMBOL(llog_cancel_rec);
155
156 int llog_init_handle(struct llog_handle *handle, int flags,
157                      struct obd_uuid *uuid)
158 {
159         int rc;
160         struct llog_log_hdr *llh;
161         ENTRY;
162         LASSERT(handle->lgh_hdr == NULL);
163
164         OBD_ALLOC_PTR(llh);
165         if (llh == NULL)
166                 RETURN(-ENOMEM);
167         handle->lgh_hdr = llh;
168         /* first assign flags to use llog_client_ops */
169         llh->llh_flags = flags;
170         rc = llog_read_header(handle);
171         if (rc == 0) {
172                 flags = llh->llh_flags;
173                 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
174                         CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
175                                (char *)llh->llh_tgtuuid.uuid);
176                         rc = -EEXIST;
177                 }
178                 GOTO(out, rc);
179         } else if (rc != LLOG_EEMPTY || !flags) {
180                 /* set a pesudo flag for initialization */
181                 flags = LLOG_F_IS_CAT;
182                 GOTO(out, rc);
183         }
184         rc = 0;
185
186         handle->lgh_last_idx = 0; /* header is record with index 0 */
187         llh->llh_count = 1;         /* for the header record */
188         llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
189         llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
190         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
191         llh->llh_timestamp = cfs_time_current_sec();
192         if (uuid)
193                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
194         llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
195         ext2_set_bit(0, llh->llh_bitmap);
196
197 out:
198         if (flags & LLOG_F_IS_CAT) {
199                 LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
200                 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
201                 llh->llh_size = sizeof(struct llog_logid_rec);
202         } else if (!(flags & LLOG_F_IS_PLAIN)) {
203                 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
204                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
205                 rc = -EINVAL;
206         }
207
208         if (rc) {
209                 OBD_FREE_PTR(llh);
210                 handle->lgh_hdr = NULL;
211         }
212         RETURN(rc);
213 }
214 EXPORT_SYMBOL(llog_init_handle);
215
216 int llog_close(struct llog_handle *loghandle)
217 {
218         struct llog_operations *lop;
219         int rc;
220         ENTRY;
221
222         rc = llog_handle2ops(loghandle, &lop);
223         if (rc)
224                 GOTO(out, rc);
225         if (lop->lop_close == NULL)
226                 GOTO(out, -EOPNOTSUPP);
227         rc = lop->lop_close(loghandle);
228  out:
229         llog_free_handle(loghandle);
230         RETURN(rc);
231 }
232 EXPORT_SYMBOL(llog_close);
233
234 static int llog_process_thread(void *arg)
235 {
236         struct llog_process_info        *lpi = arg;
237         struct llog_handle              *loghandle = lpi->lpi_loghandle;
238         struct llog_log_hdr             *llh = loghandle->lgh_hdr;
239         struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
240         char                            *buf;
241         __u64                            cur_offset = LLOG_CHUNK_SIZE;
242         __u64                            last_offset;
243         int                              rc = 0, index = 1, last_index;
244         int                              saved_index = 0;
245         int                              last_called_index = 0;
246
247         ENTRY;
248
249         LASSERT(llh);
250
251         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
252         if (!buf) {
253                 lpi->lpi_rc = -ENOMEM;
254                 RETURN(0);
255         }
256
257         if (cd != NULL) {
258                 last_called_index = cd->lpcd_first_idx;
259                 index = cd->lpcd_first_idx + 1;
260         }
261         if (cd != NULL && cd->lpcd_last_idx)
262                 last_index = cd->lpcd_last_idx;
263         else
264                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
265
266         while (rc == 0) {
267                 struct llog_rec_hdr *rec;
268
269                 /* skip records not set in bitmap */
270                 while (index <= last_index &&
271                        !ext2_test_bit(index, llh->llh_bitmap))
272                         ++index;
273
274                 LASSERT(index <= last_index + 1);
275                 if (index == last_index + 1)
276                         break;
277 repeat:
278                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
279                        index, last_index);
280
281                 /* get the buf with our target record; avoid old garbage */
282                 memset(buf, 0, LLOG_CHUNK_SIZE);
283                 last_offset = cur_offset;
284                 rc = llog_next_block(loghandle, &saved_index, index,
285                                      &cur_offset, buf, LLOG_CHUNK_SIZE);
286                 if (rc)
287                         GOTO(out, rc);
288
289                 /* NB: when rec->lrh_len is accessed it is already swabbed
290                  * since it is used at the "end" of the loop and the rec
291                  * swabbing is done at the beginning of the loop. */
292                 for (rec = (struct llog_rec_hdr *)buf;
293                      (char *)rec < buf + LLOG_CHUNK_SIZE;
294                      rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
295
296                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
297                                rec, rec->lrh_type);
298
299                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
300                                 lustre_swab_llog_rec(rec);
301
302                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
303                                rec->lrh_type, rec->lrh_index);
304
305                         if (rec->lrh_index == 0) {
306                                 /* probably another rec just got added? */
307                                 if (index <= loghandle->lgh_last_idx)
308                                         GOTO(repeat, rc = 0);
309                                 GOTO(out, rc = 0); /* no more records */
310                         }
311                         if (rec->lrh_len == 0 ||
312                             rec->lrh_len > LLOG_CHUNK_SIZE) {
313                                 CWARN("invalid length %d in llog record for "
314                                       "index %d/%d\n", rec->lrh_len,
315                                       rec->lrh_index, index);
316                                 GOTO(out, rc = -EINVAL);
317                         }
318
319                         if (rec->lrh_index < index) {
320                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
321                                        rec->lrh_index);
322                                 continue;
323                         }
324
325                         CDEBUG(D_OTHER,
326                                "lrh_index: %d lrh_len: %d (%d remains)\n",
327                                rec->lrh_index, rec->lrh_len,
328                                (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
329
330                         loghandle->lgh_cur_idx = rec->lrh_index;
331                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
332                                                     last_offset;
333
334                         /* if set, process the callback on this record */
335                         if (ext2_test_bit(index, llh->llh_bitmap)) {
336                                 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
337                                                  lpi->lpi_cbdata);
338                                 last_called_index = index;
339                                 if (rc == LLOG_PROC_BREAK) {
340                                         GOTO(out, rc);
341                                 } else if (rc == LLOG_DEL_RECORD) {
342                                         llog_cancel_rec(loghandle,
343                                                         rec->lrh_index);
344                                         rc = 0;
345                                 }
346                                 if (rc)
347                                         GOTO(out, rc);
348                         } else {
349                                 CDEBUG(D_OTHER, "Skipped index %d\n", index);
350                         }
351
352                         /* next record, still in buffer? */
353                         ++index;
354                         if (index > last_index)
355                                 GOTO(out, rc = 0);
356                 }
357         }
358
359 out:
360         if (cd != NULL)
361                 cd->lpcd_last_idx = last_called_index;
362
363         OBD_FREE(buf, LLOG_CHUNK_SIZE);
364         lpi->lpi_rc = rc;
365         return 0;
366 }
367
368 #ifdef __KERNEL__
369 static int llog_process_thread_daemonize(void *arg)
370 {
371         struct llog_process_info        *lpi = arg;
372         struct lu_env                    env;
373         int                              rc;
374
375         cfs_daemonize_ctxt("llog_process_thread");
376
377         /* client env has no keys, tags is just 0 */
378         rc = lu_env_init(&env, LCT_LOCAL);
379         if (rc)
380                 goto out;
381         lpi->lpi_env = &env;
382
383         rc = llog_process_thread(arg);
384
385         lu_env_fini(&env);
386 out:
387         cfs_complete(&lpi->lpi_completion);
388         return rc;
389 }
390 #endif
391
392 int llog_process_or_fork(const struct lu_env *env,
393                          struct llog_handle *loghandle,
394                          llog_cb_t cb, void *data, void *catdata, bool fork)
395 {
396         struct llog_process_info *lpi;
397         int                      rc;
398
399         ENTRY;
400
401         OBD_ALLOC_PTR(lpi);
402         if (lpi == NULL) {
403                 CERROR("cannot alloc pointer\n");
404                 RETURN(-ENOMEM);
405         }
406         lpi->lpi_loghandle = loghandle;
407         lpi->lpi_cb        = cb;
408         lpi->lpi_cbdata    = data;
409         lpi->lpi_catdata   = catdata;
410
411 #ifdef __KERNEL__
412         if (fork) {
413                 /* The new thread can't use parent env,
414                  * init the new one in llog_process_thread_daemonize. */
415                 lpi->lpi_env = NULL;
416                 cfs_init_completion(&lpi->lpi_completion);
417                 rc = cfs_create_thread(llog_process_thread_daemonize, lpi,
418                                        CFS_DAEMON_FLAGS);
419                 if (rc < 0) {
420                         CERROR("%s: cannot start thread: rc = %d\n",
421                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
422                         OBD_FREE_PTR(lpi);
423                         RETURN(rc);
424                 }
425                 cfs_wait_for_completion(&lpi->lpi_completion);
426         } else {
427                 lpi->lpi_env = env;
428                 llog_process_thread(lpi);
429         }
430 #else
431         lpi->lpi_env = env;
432         llog_process_thread(lpi);
433 #endif
434         rc = lpi->lpi_rc;
435         OBD_FREE_PTR(lpi);
436         RETURN(rc);
437 }
438
439 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
440                  llog_cb_t cb, void *data, void *catdata)
441 {
442         return llog_process_or_fork(env, loghandle, cb, data, catdata, false);
443 }
444 EXPORT_SYMBOL(llog_process);
445
446 inline int llog_get_size(struct llog_handle *loghandle)
447 {
448         if (loghandle && loghandle->lgh_hdr)
449                 return loghandle->lgh_hdr->llh_count;
450         return 0;
451 }
452 EXPORT_SYMBOL(llog_get_size);
453
454 int llog_reverse_process(const struct lu_env *env,
455                          struct llog_handle *loghandle, llog_cb_t cb,
456                          void *data, void *catdata)
457 {
458         struct llog_log_hdr *llh = loghandle->lgh_hdr;
459         struct llog_process_cat_data *cd = catdata;
460         void *buf;
461         int rc = 0, first_index = 1, index, idx;
462         ENTRY;
463
464         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
465         if (!buf)
466                 RETURN(-ENOMEM);
467
468         if (cd != NULL)
469                 first_index = cd->lpcd_first_idx + 1;
470         if (cd != NULL && cd->lpcd_last_idx)
471                 index = cd->lpcd_last_idx;
472         else
473                 index = LLOG_BITMAP_BYTES * 8 - 1;
474
475         while (rc == 0) {
476                 struct llog_rec_hdr *rec;
477                 struct llog_rec_tail *tail;
478
479                 /* skip records not set in bitmap */
480                 while (index >= first_index &&
481                        !ext2_test_bit(index, llh->llh_bitmap))
482                         --index;
483
484                 LASSERT(index >= first_index - 1);
485                 if (index == first_index - 1)
486                         break;
487
488                 /* get the buf with our target record; avoid old garbage */
489                 memset(buf, 0, LLOG_CHUNK_SIZE);
490                 rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
491                 if (rc)
492                         GOTO(out, rc);
493
494                 rec = buf;
495                 idx = rec->lrh_index;
496                 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
497                 while (idx < index) {
498                         rec = (void *)rec + rec->lrh_len;
499                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
500                                 lustre_swab_llog_rec(rec);
501                         idx ++;
502                 }
503                 LASSERT(idx == index);
504                 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
505
506                 /* process records in buffer, starting where we found one */
507                 while ((void *)tail > buf) {
508                         if (tail->lrt_index == 0)
509                                 GOTO(out, rc = 0); /* no more records */
510
511                         /* if set, process the callback on this record */
512                         if (ext2_test_bit(index, llh->llh_bitmap)) {
513                                 rec = (void *)tail - tail->lrt_len +
514                                       sizeof(*tail);
515
516                                 rc = cb(env, loghandle, rec, data);
517                                 if (rc == LLOG_PROC_BREAK) {
518                                         GOTO(out, rc);
519                                 } else if (rc == LLOG_DEL_RECORD) {
520                                         llog_cancel_rec(loghandle,
521                                                         tail->lrt_index);
522                                         rc = 0;
523                                 }
524                                 if (rc)
525                                         GOTO(out, rc);
526                         }
527
528                         /* previous record, still in buffer? */
529                         --index;
530                         if (index < first_index)
531                                 GOTO(out, rc = 0);
532                         tail = (void *)tail - tail->lrt_len;
533                 }
534         }
535
536 out:
537         if (buf)
538                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
539         RETURN(rc);
540 }
541 EXPORT_SYMBOL(llog_reverse_process);