Whamcloud - gitweb
LU-1818 quota: en/disable quota enforcement via conf_param
[fs/lustre-release.git] / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/obdclass/llog.c
35  *
36  * OST<->MDS recovery logging infrastructure.
37  * Invariants in implementation:
38  * - we do not share logs among different OST<->MDS connections, so that
39  *   if an OST or MDS fails it need only look at log(s) relevant to itself
40  *
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
43  * Author: Mikhail Pershin <tappro@whamcloud.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_LOG
47
48 #ifndef __KERNEL__
49 #include <liblustre.h>
50 #endif
51
52 #include <obd_class.h>
53 #include <lustre_log.h>
54 #include "llog_internal.h"
55
56 /* Allocate a new log or catalog handle */
57 struct llog_handle *llog_alloc_handle(void)
58 {
59         struct llog_handle *loghandle;
60         ENTRY;
61
62         OBD_ALLOC_PTR(loghandle);
63         if (loghandle == NULL)
64                 RETURN(ERR_PTR(-ENOMEM));
65
66         cfs_init_rwsem(&loghandle->lgh_lock);
67         cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
68         CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
69
70         RETURN(loghandle);
71 }
72 EXPORT_SYMBOL(llog_alloc_handle);
73
74
75 void llog_free_handle(struct llog_handle *loghandle)
76 {
77         if (!loghandle)
78                 return;
79
80         if (!loghandle->lgh_hdr)
81                 goto out;
82         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
83                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
84         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
85                 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
86         LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
87         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
88
89 out:
90         OBD_FREE_PTR(loghandle);
91 }
92 EXPORT_SYMBOL(llog_free_handle);
93
94 /* returns negative on error; 0 if success; 1 if success & log destroyed */
95 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
96                     int index)
97 {
98         struct llog_log_hdr *llh = loghandle->lgh_hdr;
99         int rc = 0;
100         ENTRY;
101
102         CDEBUG(D_RPCTRACE, "Canceling %d in log "LPX64"\n",
103                index, loghandle->lgh_id.lgl_oid);
104
105         if (index == 0) {
106                 CERROR("Can't cancel index 0 which is header\n");
107                 RETURN(-EINVAL);
108         }
109
110         cfs_spin_lock(&loghandle->lgh_hdr_lock);
111         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
112                 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
113                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
114                 RETURN(-ENOENT);
115         }
116
117         llh->llh_count--;
118
119         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
120             (llh->llh_count == 1) &&
121             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
122                 cfs_spin_unlock(&loghandle->lgh_hdr_lock);
123                 rc = llog_destroy(env, loghandle);
124                 if (rc < 0) {
125                         CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64
126                                "#%08x: rc = %d\n",
127                                loghandle->lgh_ctxt->loc_obd->obd_name,
128                                loghandle->lgh_id.lgl_oid,
129                                loghandle->lgh_id.lgl_oseq,
130                                loghandle->lgh_id.lgl_ogen, rc);
131                         GOTO(out_err, rc);
132                 }
133                 RETURN(1);
134         }
135         cfs_spin_unlock(&loghandle->lgh_hdr_lock);
136
137         rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
138         if (rc < 0) {
139                 CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
140                        "#%08x: rc = %d\n",
141                        loghandle->lgh_ctxt->loc_obd->obd_name,
142                        loghandle->lgh_id.lgl_oid,
143                        loghandle->lgh_id.lgl_oseq,
144                        loghandle->lgh_id.lgl_ogen, rc);
145                 GOTO(out_err, rc);
146         }
147         RETURN(0);
148 out_err:
149         cfs_spin_lock(&loghandle->lgh_hdr_lock);
150         ext2_set_bit(index, llh->llh_bitmap);
151         llh->llh_count++;
152         cfs_spin_unlock(&loghandle->lgh_hdr_lock);
153         return rc;
154 }
155 EXPORT_SYMBOL(llog_cancel_rec);
156
157 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
158                      int flags, struct obd_uuid *uuid)
159 {
160         int rc;
161         struct llog_log_hdr *llh;
162         ENTRY;
163         LASSERT(handle->lgh_hdr == NULL);
164
165         OBD_ALLOC_PTR(llh);
166         if (llh == NULL)
167                 RETURN(-ENOMEM);
168         handle->lgh_hdr = llh;
169         /* first assign flags to use llog_client_ops */
170         llh->llh_flags = flags;
171         rc = llog_read_header(env, handle);
172         if (rc == 0) {
173                 flags = llh->llh_flags;
174                 if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
175                         CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
176                                (char *)llh->llh_tgtuuid.uuid);
177                         rc = -EEXIST;
178                 }
179                 GOTO(out, rc);
180         } else if (rc != LLOG_EEMPTY || !flags) {
181                 /* set a pesudo flag for initialization */
182                 flags = LLOG_F_IS_CAT;
183                 GOTO(out, rc);
184         }
185         rc = 0;
186
187         handle->lgh_last_idx = 0; /* header is record with index 0 */
188         llh->llh_count = 1;         /* for the header record */
189         llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
190         llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
191         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
192         llh->llh_timestamp = cfs_time_current_sec();
193         if (uuid)
194                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
195         llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
196         ext2_set_bit(0, llh->llh_bitmap);
197
198 out:
199         if (flags & LLOG_F_IS_CAT) {
200                 LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
201                 CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
202                 llh->llh_size = sizeof(struct llog_logid_rec);
203         } else if (!(flags & LLOG_F_IS_PLAIN)) {
204                 CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
205                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
206                 rc = -EINVAL;
207         }
208
209         if (rc) {
210                 OBD_FREE_PTR(llh);
211                 handle->lgh_hdr = NULL;
212         }
213         RETURN(rc);
214 }
215 EXPORT_SYMBOL(llog_init_handle);
216
217 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
218 {
219         struct llog_operations *lop;
220         int rc;
221         ENTRY;
222
223         rc = llog_handle2ops(loghandle, &lop);
224         if (rc)
225                 GOTO(out, rc);
226         if (lop->lop_close == NULL)
227                 GOTO(out, -EOPNOTSUPP);
228         rc = lop->lop_close(env, loghandle);
229  out:
230         llog_free_handle(loghandle);
231         RETURN(rc);
232 }
233 EXPORT_SYMBOL(llog_close);
234
235 static int llog_process_thread(void *arg)
236 {
237         struct llog_process_info        *lpi = arg;
238         struct llog_handle              *loghandle = lpi->lpi_loghandle;
239         struct llog_log_hdr             *llh = loghandle->lgh_hdr;
240         struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
241         char                            *buf;
242         __u64                            cur_offset = LLOG_CHUNK_SIZE;
243         __u64                            last_offset;
244         int                              rc = 0, index = 1, last_index;
245         int                              saved_index = 0;
246         int                              last_called_index = 0;
247
248         ENTRY;
249
250         LASSERT(llh);
251
252         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
253         if (!buf) {
254                 lpi->lpi_rc = -ENOMEM;
255                 RETURN(0);
256         }
257
258         if (cd != NULL) {
259                 last_called_index = cd->lpcd_first_idx;
260                 index = cd->lpcd_first_idx + 1;
261         }
262         if (cd != NULL && cd->lpcd_last_idx)
263                 last_index = cd->lpcd_last_idx;
264         else
265                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
266
267         while (rc == 0) {
268                 struct llog_rec_hdr *rec;
269
270                 /* skip records not set in bitmap */
271                 while (index <= last_index &&
272                        !ext2_test_bit(index, llh->llh_bitmap))
273                         ++index;
274
275                 LASSERT(index <= last_index + 1);
276                 if (index == last_index + 1)
277                         break;
278 repeat:
279                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
280                        index, last_index);
281
282                 /* get the buf with our target record; avoid old garbage */
283                 memset(buf, 0, LLOG_CHUNK_SIZE);
284                 last_offset = cur_offset;
285                 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
286                                      index, &cur_offset, buf, LLOG_CHUNK_SIZE);
287                 if (rc)
288                         GOTO(out, rc);
289
290                 /* NB: when rec->lrh_len is accessed it is already swabbed
291                  * since it is used at the "end" of the loop and the rec
292                  * swabbing is done at the beginning of the loop. */
293                 for (rec = (struct llog_rec_hdr *)buf;
294                      (char *)rec < buf + LLOG_CHUNK_SIZE;
295                      rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
296
297                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
298                                rec, rec->lrh_type);
299
300                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
301                                 lustre_swab_llog_rec(rec);
302
303                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
304                                rec->lrh_type, rec->lrh_index);
305
306                         if (rec->lrh_index == 0) {
307                                 /* probably another rec just got added? */
308                                 if (index <= loghandle->lgh_last_idx)
309                                         GOTO(repeat, rc = 0);
310                                 GOTO(out, rc = 0); /* no more records */
311                         }
312                         if (rec->lrh_len == 0 ||
313                             rec->lrh_len > LLOG_CHUNK_SIZE) {
314                                 CWARN("invalid length %d in llog record for "
315                                       "index %d/%d\n", rec->lrh_len,
316                                       rec->lrh_index, index);
317                                 GOTO(out, rc = -EINVAL);
318                         }
319
320                         if (rec->lrh_index < index) {
321                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
322                                        rec->lrh_index);
323                                 continue;
324                         }
325
326                         CDEBUG(D_OTHER,
327                                "lrh_index: %d lrh_len: %d (%d remains)\n",
328                                rec->lrh_index, rec->lrh_len,
329                                (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
330
331                         loghandle->lgh_cur_idx = rec->lrh_index;
332                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
333                                                     last_offset;
334
335                         /* if set, process the callback on this record */
336                         if (ext2_test_bit(index, llh->llh_bitmap)) {
337                                 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
338                                                  lpi->lpi_cbdata);
339                                 last_called_index = index;
340                                 if (rc == LLOG_PROC_BREAK) {
341                                         GOTO(out, rc);
342                                 } else if (rc == LLOG_DEL_RECORD) {
343                                         llog_cancel_rec(lpi->lpi_env,
344                                                         loghandle,
345                                                         rec->lrh_index);
346                                         rc = 0;
347                                 }
348                                 if (rc)
349                                         GOTO(out, rc);
350                         } else {
351                                 CDEBUG(D_OTHER, "Skipped index %d\n", index);
352                         }
353
354                         /* next record, still in buffer? */
355                         ++index;
356                         if (index > last_index)
357                                 GOTO(out, rc = 0);
358                 }
359         }
360
361 out:
362         if (cd != NULL)
363                 cd->lpcd_last_idx = last_called_index;
364
365         OBD_FREE(buf, LLOG_CHUNK_SIZE);
366         lpi->lpi_rc = rc;
367         return 0;
368 }
369
#ifdef __KERNEL__
/*
 * Thread entry point used when a log is processed in a separate thread.
 *
 * Sets up a private lu_env for the new thread, runs llog_process_thread()
 * with it and signals lpi_completion when done.
 *
 * The processing result is passed back through lpi->lpi_rc.  That field
 * must therefore also be set when the environment cannot be initialised
 * and llog_process_thread() never runs, otherwise the failure would not
 * be visible in lpi_rc at all.
 */
static int llog_process_thread_daemonize(void *arg)
{
        struct llog_process_info        *lpi = arg;
        struct lu_env                    env;
        int                              rc;

        cfs_daemonize_ctxt("llog_process_thread");

        /* client env has no keys, tags is just 0 */
        rc = lu_env_init(&env, LCT_LOCAL);
        if (rc) {
                /* record the failure before the completion is signalled */
                lpi->lpi_rc = rc;
                goto out;
        }
        lpi->lpi_env = &env;

        rc = llog_process_thread(arg);

        lu_env_fini(&env);
out:
        cfs_complete(&lpi->lpi_completion);
        return rc;
}
#endif
393
394 int llog_process_or_fork(const struct lu_env *env,
395                          struct llog_handle *loghandle,
396                          llog_cb_t cb, void *data, void *catdata, bool fork)
397 {
398         struct llog_process_info *lpi;
399         int                      rc;
400
401         ENTRY;
402
403         OBD_ALLOC_PTR(lpi);
404         if (lpi == NULL) {
405                 CERROR("cannot alloc pointer\n");
406                 RETURN(-ENOMEM);
407         }
408         lpi->lpi_loghandle = loghandle;
409         lpi->lpi_cb        = cb;
410         lpi->lpi_cbdata    = data;
411         lpi->lpi_catdata   = catdata;
412
413 #ifdef __KERNEL__
414         if (fork) {
415                 /* The new thread can't use parent env,
416                  * init the new one in llog_process_thread_daemonize. */
417                 lpi->lpi_env = NULL;
418                 cfs_init_completion(&lpi->lpi_completion);
419                 rc = cfs_create_thread(llog_process_thread_daemonize, lpi,
420                                        CFS_DAEMON_FLAGS);
421                 if (rc < 0) {
422                         CERROR("%s: cannot start thread: rc = %d\n",
423                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
424                         OBD_FREE_PTR(lpi);
425                         RETURN(rc);
426                 }
427                 cfs_wait_for_completion(&lpi->lpi_completion);
428         } else {
429                 lpi->lpi_env = env;
430                 llog_process_thread(lpi);
431         }
432 #else
433         lpi->lpi_env = env;
434         llog_process_thread(lpi);
435 #endif
436         rc = lpi->lpi_rc;
437         OBD_FREE_PTR(lpi);
438         RETURN(rc);
439 }
440
441 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
442                  llog_cb_t cb, void *data, void *catdata)
443 {
444         return llog_process_or_fork(env, loghandle, cb, data, catdata, false);
445 }
446 EXPORT_SYMBOL(llog_process);
447
448 inline int llog_get_size(struct llog_handle *loghandle)
449 {
450         if (loghandle && loghandle->lgh_hdr)
451                 return loghandle->lgh_hdr->llh_count;
452         return 0;
453 }
454 EXPORT_SYMBOL(llog_get_size);
455
456 int llog_reverse_process(const struct lu_env *env,
457                          struct llog_handle *loghandle, llog_cb_t cb,
458                          void *data, void *catdata)
459 {
460         struct llog_log_hdr *llh = loghandle->lgh_hdr;
461         struct llog_process_cat_data *cd = catdata;
462         void *buf;
463         int rc = 0, first_index = 1, index, idx;
464         ENTRY;
465
466         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
467         if (!buf)
468                 RETURN(-ENOMEM);
469
470         if (cd != NULL)
471                 first_index = cd->lpcd_first_idx + 1;
472         if (cd != NULL && cd->lpcd_last_idx)
473                 index = cd->lpcd_last_idx;
474         else
475                 index = LLOG_BITMAP_BYTES * 8 - 1;
476
477         while (rc == 0) {
478                 struct llog_rec_hdr *rec;
479                 struct llog_rec_tail *tail;
480
481                 /* skip records not set in bitmap */
482                 while (index >= first_index &&
483                        !ext2_test_bit(index, llh->llh_bitmap))
484                         --index;
485
486                 LASSERT(index >= first_index - 1);
487                 if (index == first_index - 1)
488                         break;
489
490                 /* get the buf with our target record; avoid old garbage */
491                 memset(buf, 0, LLOG_CHUNK_SIZE);
492                 rc = llog_prev_block(env, loghandle, index, buf,
493                                      LLOG_CHUNK_SIZE);
494                 if (rc)
495                         GOTO(out, rc);
496
497                 rec = buf;
498                 idx = rec->lrh_index;
499                 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
500                 while (idx < index) {
501                         rec = (void *)rec + rec->lrh_len;
502                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
503                                 lustre_swab_llog_rec(rec);
504                         idx ++;
505                 }
506                 LASSERT(idx == index);
507                 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
508
509                 /* process records in buffer, starting where we found one */
510                 while ((void *)tail > buf) {
511                         if (tail->lrt_index == 0)
512                                 GOTO(out, rc = 0); /* no more records */
513
514                         /* if set, process the callback on this record */
515                         if (ext2_test_bit(index, llh->llh_bitmap)) {
516                                 rec = (void *)tail - tail->lrt_len +
517                                       sizeof(*tail);
518
519                                 rc = cb(env, loghandle, rec, data);
520                                 if (rc == LLOG_PROC_BREAK) {
521                                         GOTO(out, rc);
522                                 } else if (rc == LLOG_DEL_RECORD) {
523                                         llog_cancel_rec(env, loghandle,
524                                                         tail->lrt_index);
525                                         rc = 0;
526                                 }
527                                 if (rc)
528                                         GOTO(out, rc);
529                         }
530
531                         /* previous record, still in buffer? */
532                         --index;
533                         if (index < first_index)
534                                 GOTO(out, rc = 0);
535                         tail = (void *)tail - tail->lrt_len;
536                 }
537         }
538
539 out:
540         if (buf)
541                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
542         RETURN(rc);
543 }
544 EXPORT_SYMBOL(llog_reverse_process);