Whamcloud - gitweb
LU-502 don't allow to kill service threads by OOM killer.
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_LOG
47
48 #ifndef EXPORT_SYMTAB
49 #define EXPORT_SYMTAB
50 #endif
51
52 #ifndef __KERNEL__
53 #include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_log.h>
58 #include <libcfs/list.h>
59 #include "llog_internal.h"
60
61 /* Allocate a new log or catalog handle */
62 struct llog_handle *llog_alloc_handle(void)
63 {
64         struct llog_handle *loghandle;
65         ENTRY;
66
67         OBD_ALLOC(loghandle, sizeof(*loghandle));
68         if (loghandle == NULL)
69                 RETURN(ERR_PTR(-ENOMEM));
70
71         cfs_init_rwsem(&loghandle->lgh_lock);
72
73         RETURN(loghandle);
74 }
75 EXPORT_SYMBOL(llog_alloc_handle);
76
77
78 void llog_free_handle(struct llog_handle *loghandle)
79 {
80         if (!loghandle)
81                 return;
82
83         if (!loghandle->lgh_hdr)
84                 goto out;
85         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
86                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
87         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
88                 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
89         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
90
91  out:
92         OBD_FREE(loghandle, sizeof(*loghandle));
93 }
94 EXPORT_SYMBOL(llog_free_handle);
95
96 /* returns negative on error; 0 if success; 1 if success & log destroyed */
97 int llog_cancel_rec(struct llog_handle *loghandle, int index)
98 {
99         struct llog_log_hdr *llh = loghandle->lgh_hdr;
100         int rc = 0;
101         ENTRY;
102
103         CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
104                index, loghandle->lgh_id.lgl_oid);
105
106         if (index == 0) {
107                 CERROR("Can't cancel index 0 which is header\n");
108                 RETURN(-EINVAL);
109         }
110
111         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
112                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
113                 RETURN(-ENOENT);
114         }
115
116         llh->llh_count--;
117
118         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
119             (llh->llh_count == 1) &&
120             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
121                 rc = llog_destroy(loghandle);
122                 if (rc) {
123                         CERROR("Failure destroying log after last cancel: %d\n",
124                                rc);
125                         ext2_set_bit(index, llh->llh_bitmap);
126                         llh->llh_count++;
127                 } else {
128                         rc = 1;
129                 }
130                 RETURN(rc);
131         }
132
133         rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
134         if (rc) {
135                 CERROR("Failure re-writing header %d\n", rc);
136                 ext2_set_bit(index, llh->llh_bitmap);
137                 llh->llh_count++;
138         }
139         RETURN(rc);
140 }
141 EXPORT_SYMBOL(llog_cancel_rec);
142
143 int llog_init_handle(struct llog_handle *handle, int flags,
144                      struct obd_uuid *uuid)
145 {
146         int rc;
147         struct llog_log_hdr *llh;
148         ENTRY;
149         LASSERT(handle->lgh_hdr == NULL);
150
151         OBD_ALLOC(llh, sizeof(*llh));
152         if (llh == NULL)
153                 RETURN(-ENOMEM);
154         handle->lgh_hdr = llh;
155         /* first assign flags to use llog_client_ops */
156         llh->llh_flags = flags;
157         rc = llog_read_header(handle);
158         if (rc == 0) {
159                 flags = llh->llh_flags;
160                 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
161                         CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
162                                (char *)llh->llh_tgtuuid.uuid);
163                         rc = -EEXIST;
164                 }
165                 GOTO(out, rc);
166         } else if (rc != LLOG_EEMPTY || !flags) {
167                 /* set a pesudo flag for initialization */
168                 flags = LLOG_F_IS_CAT;
169                 GOTO(out, rc);
170         }
171         rc = 0;
172
173         handle->lgh_last_idx = 0; /* header is record with index 0 */
174         llh->llh_count = 1;         /* for the header record */
175         llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
176         llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
177         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
178         llh->llh_timestamp = cfs_time_current_sec();
179         if (uuid)
180                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
181         llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
182         ext2_set_bit(0, llh->llh_bitmap);
183
184 out:
185         if (flags & LLOG_F_IS_CAT) {
186                 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
187                 llh->llh_size = sizeof(struct llog_logid_rec);
188         } else if (flags & LLOG_F_IS_PLAIN) {
189                 CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
190         } else {
191                 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
192                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
193                 LBUG();
194         }
195
196         if (rc) {
197                 OBD_FREE(llh, sizeof(*llh));
198                 handle->lgh_hdr = NULL;
199         }
200         RETURN(rc);
201 }
202 EXPORT_SYMBOL(llog_init_handle);
203
204 int llog_close(struct llog_handle *loghandle)
205 {
206         struct llog_operations *lop;
207         int rc;
208         ENTRY;
209
210         rc = llog_handle2ops(loghandle, &lop);
211         if (rc)
212                 GOTO(out, rc);
213         if (lop->lop_close == NULL)
214                 GOTO(out, -EOPNOTSUPP);
215         rc = lop->lop_close(loghandle);
216  out:
217         llog_free_handle(loghandle);
218         RETURN(rc);
219 }
220 EXPORT_SYMBOL(llog_close);
221
222 static int llog_process_thread(void *arg)
223 {
224         struct llog_process_info     *lpi = (struct llog_process_info *)arg;
225         struct llog_handle           *loghandle = lpi->lpi_loghandle;
226         struct llog_log_hdr          *llh = loghandle->lgh_hdr;
227         struct llog_process_cat_data *cd  = lpi->lpi_catdata;
228         char                         *buf;
229         __u64                         cur_offset = LLOG_CHUNK_SIZE;
230         __u64                         last_offset;
231         int                           rc = 0, index = 1, last_index;
232         int                           saved_index = 0, last_called_index = 0;
233
234         LASSERT(llh);
235
236         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
237         if (!buf) {
238                 lpi->lpi_rc = -ENOMEM;
239 #ifdef __KERNEL__
240                 cfs_complete(&lpi->lpi_completion);
241 #endif
242                 return 0;
243         }
244
245         if (!(lpi->lpi_flags & LLOG_FLAG_NODEAMON))
246                 cfs_daemonize_ctxt("llog_process_thread");
247
248         if (cd != NULL) {
249                 last_called_index = cd->lpcd_first_idx;
250                 index = cd->lpcd_first_idx + 1;
251         }
252         if (cd != NULL && cd->lpcd_last_idx)
253                 last_index = cd->lpcd_last_idx;
254         else
255                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
256
257         while (rc == 0) {
258                 struct llog_rec_hdr *rec;
259
260                 /* skip records not set in bitmap */
261                 while (index <= last_index &&
262                        !ext2_test_bit(index, llh->llh_bitmap))
263                         ++index;
264
265                 LASSERT(index <= last_index + 1);
266                 if (index == last_index + 1)
267                         break;
268
269                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
270                        index, last_index);
271
272                 /* get the buf with our target record; avoid old garbage */
273                 memset(buf, 0, LLOG_CHUNK_SIZE);
274                 last_offset = cur_offset;
275                 rc = llog_next_block(loghandle, &saved_index, index,
276                                      &cur_offset, buf, LLOG_CHUNK_SIZE);
277                 if (rc)
278                         GOTO(out, rc);
279
280                 /* NB: when rec->lrh_len is accessed it is already swabbed
281                  * since it is used at the "end" of the loop and the rec
282                  * swabbing is done at the beginning of the loop. */
283                 for (rec = (struct llog_rec_hdr *)buf;
284                      (char *)rec < buf + LLOG_CHUNK_SIZE;
285                      rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
286
287                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
288                                rec, rec->lrh_type);
289
290                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
291                                 lustre_swab_llog_rec(rec, NULL);
292
293                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
294                                rec->lrh_type, rec->lrh_index);
295
296                         if (rec->lrh_index == 0)
297                                 GOTO(out, 0); /* no more records */
298
299                         if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
300                                 CWARN("invalid length %d in llog record for "
301                                       "index %d/%d\n", rec->lrh_len,
302                                       rec->lrh_index, index);
303                                 GOTO(out, rc = -EINVAL);
304                         }
305
306                         if (rec->lrh_index < index) {
307                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
308                                        rec->lrh_index);
309                                 continue;
310                         }
311
312                         CDEBUG(D_OTHER,
313                                "lrh_index: %d lrh_len: %d (%d remains)\n",
314                                rec->lrh_index, rec->lrh_len,
315                                (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
316
317                         loghandle->lgh_cur_idx = rec->lrh_index;
318                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
319                                                     last_offset;
320
321                         /* if set, process the callback on this record */
322                         if (ext2_test_bit(index, llh->llh_bitmap)) {
323                                 rc = lpi->lpi_cb(loghandle, rec,
324                                                  lpi->lpi_cbdata);
325                                 last_called_index = index;
326                                 if (rc == LLOG_PROC_BREAK) {
327                                         GOTO(out, rc);
328                                 } else if (rc == LLOG_DEL_RECORD) {
329                                         llog_cancel_rec(loghandle,
330                                                         rec->lrh_index);
331                                         rc = 0;
332                                 }
333                                 if (rc)
334                                         GOTO(out, rc);
335                         } else {
336                                 CDEBUG(D_OTHER, "Skipped index %d\n", index);
337                         }
338
339                         /* next record, still in buffer? */
340                         ++index;
341                         if (index > last_index)
342                                 GOTO(out, rc = 0);
343                 }
344         }
345
346  out:
347         if (cd != NULL)
348                 cd->lpcd_last_idx = last_called_index;
349         if (buf)
350                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
351         lpi->lpi_rc = rc;
352 #ifdef __KERNEL__
353         cfs_complete(&lpi->lpi_completion);
354 #endif
355         return 0;
356 }
357
358 int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
359                        void *data, void *catdata, int flags)
360 {
361         struct llog_process_info *lpi;
362         int                      rc;
363         ENTRY;
364
365         OBD_ALLOC_PTR(lpi);
366         if (lpi == NULL) {
367                 CERROR("cannot alloc pointer\n");
368                 RETURN(-ENOMEM);
369         }
370         lpi->lpi_loghandle = loghandle;
371         lpi->lpi_cb        = cb;
372         lpi->lpi_cbdata    = data;
373         lpi->lpi_catdata   = catdata;
374         lpi->lpi_flags     = flags;
375
376 #ifdef __KERNEL__
377         cfs_init_completion(&lpi->lpi_completion);
378         rc = cfs_create_thread(llog_process_thread, lpi, CFS_DAEMON_FLAGS);
379         if (rc < 0) {
380                 CERROR("cannot start thread: %d\n", rc);
381                 OBD_FREE_PTR(lpi);
382                 RETURN(rc);
383         }
384         cfs_wait_for_completion(&lpi->lpi_completion);
385 #else
386         llog_process_thread(lpi);
387 #endif
388         rc = lpi->lpi_rc;
389         OBD_FREE_PTR(lpi);
390         RETURN(rc);
391 }
392 EXPORT_SYMBOL(llog_process_flags);
393
394 int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
395                  void *data, void *catdata)
396 {
397         return llog_process_flags(loghandle, cb, data, catdata, 0);
398 }
399 EXPORT_SYMBOL(llog_process);
400
401 inline int llog_get_size(struct llog_handle *loghandle)
402 {
403         if (loghandle && loghandle->lgh_hdr)
404                 return loghandle->lgh_hdr->llh_count;
405         return 0;
406 }
407 EXPORT_SYMBOL(llog_get_size);
408
409 int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
410                          void *data, void *catdata)
411 {
412         struct llog_log_hdr *llh = loghandle->lgh_hdr;
413         struct llog_process_cat_data *cd = catdata;
414         void *buf;
415         int rc = 0, first_index = 1, index, idx;
416         ENTRY;
417
418         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
419         if (!buf)
420                 RETURN(-ENOMEM);
421
422         if (cd != NULL)
423                 first_index = cd->lpcd_first_idx + 1;
424         if (cd != NULL && cd->lpcd_last_idx)
425                 index = cd->lpcd_last_idx;
426         else
427                 index = LLOG_BITMAP_BYTES * 8 - 1;
428
429         while (rc == 0) {
430                 struct llog_rec_hdr *rec;
431                 struct llog_rec_tail *tail;
432
433                 /* skip records not set in bitmap */
434                 while (index >= first_index &&
435                        !ext2_test_bit(index, llh->llh_bitmap))
436                         --index;
437
438                 LASSERT(index >= first_index - 1);
439                 if (index == first_index - 1)
440                         break;
441
442                 /* get the buf with our target record; avoid old garbage */
443                 memset(buf, 0, LLOG_CHUNK_SIZE);
444                 rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
445                 if (rc)
446                         GOTO(out, rc);
447
448                 rec = buf;
449                 idx = le32_to_cpu(rec->lrh_index);
450                 if (idx < index)
451                         CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
452                 while (idx < index) {
453                         rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
454                         idx ++;
455                 }
456                 tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
457
458                 /* process records in buffer, starting where we found one */
459                 while ((void *)tail > buf) {
460                         rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
461                                 sizeof(*tail);
462
463                         if (rec->lrh_index == 0)
464                                 GOTO(out, 0); /* no more records */
465
466                         /* if set, process the callback on this record */
467                         if (ext2_test_bit(index, llh->llh_bitmap)) {
468                                 rc = cb(loghandle, rec, data);
469                                 if (rc == LLOG_PROC_BREAK) {
470                                         GOTO(out, rc);
471                                 }
472                                 if (rc)
473                                         GOTO(out, rc);
474                         }
475
476                         /* previous record, still in buffer? */
477                         --index;
478                         if (index < first_index)
479                                 GOTO(out, rc = 0);
480                         tail = (void *)rec - sizeof(*tail);
481                 }
482         }
483
484 out:
485         if (buf)
486                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
487         RETURN(rc);
488 }
489 EXPORT_SYMBOL(llog_reverse_process);