4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
32 * lustre/obdclass/llog_osd.c
34 * Low level llog routines on top of OSD API
36 * This file provides a set of methods for llog operations on top of
37 * dt_device. It contains all supported llog_operations interfaces and
38 * supplemental functions.
40 * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
41 * Author: Mikhail Pershin <mike.pershin@intel.com>
44 #define DEBUG_SUBSYSTEM S_LOG
46 #include <linux/delay.h>
48 #include <dt_object.h>
49 #include <llog_swab.h>
50 #include <lustre_fid.h>
52 #include <obd_class.h>
54 #include "llog_internal.h"
55 #include "local_storage.h"
58 * Implementation of the llog_operations::lop_declare_create
60 * This function is a wrapper over local_storage API function
61 * local_object_declare_create().
63 * \param[in] env execution environment
64 * \param[in] los local_storage for bottom storage device
65 * \param[in] o dt_object to create
66 * \param[in] th current transaction handle
68 * \retval 0 on successful declaration of the new object
69 * \retval negative error if declaration failed
/*
 * Prepares the scratch attributes held in the llog_thread_info returned by
 * llog_info(env) and hands them to local_object_declare_create():
 *   - lgi_attr: only LA_MODE is valid, mode is S_IFREG | S_IRUGO | S_IWUSR;
 *   - lgi_dof:  the object format that dt_mode_to_dft() gives for S_IFREG.
 *
 * NOTE(review): the embedded line numbers jump 72 -> 76 and stop at 82, so
 * the remaining parameters, the opening brace and the tail of the final
 * call are not visible in this extract.
 */
71 static int llog_osd_declare_new_object(const struct lu_env *env,
72 struct local_oid_storage *los,
76 struct llog_thread_info *lgi = llog_info(env);
78 lgi->lgi_attr.la_valid = LA_MODE;
79 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
80 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
82 return local_object_declare_create(env, los, o, &lgi->lgi_attr,
87 * Implementation of the llog_operations::lop_create
89 * This function is a wrapper over local_storage API function
90 * local_object_create().
92 * \param[in] env execution environment
93 * \param[in] los local_storage for bottom storage device
94 * \param[in] o dt_object to create
95 * \param[in] th current transaction handle
97 * \retval 0 on successful creation of the new object
98 * \retval negative error if creation failed
/*
 * Fills the same scratch attributes as the declare step (LA_MODE with
 * S_IFREG | S_IRUGO | S_IWUSR in lgi_attr, the S_IFREG format in lgi_dof)
 * and passes them to local_object_create().
 *
 * NOTE(review): the embedded line numbers jump 101 -> 105 and stop at 111,
 * so the remaining parameters, the opening brace and the tail of the final
 * call are not visible in this extract.
 */
100 static int llog_osd_create_new_object(const struct lu_env *env,
101 struct local_oid_storage *los,
105 struct llog_thread_info *lgi = llog_info(env);
107 lgi->lgi_attr.la_valid = LA_MODE;
108 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
109 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
111 return local_object_create(env, los, o, &lgi->lgi_attr,
116 * Implementation of the llog_operations::lop_exist
118 * This function checks that llog exists on storage.
120 * \param[in] handle llog handle of the current llog
122 * \retval true if llog object exists and is not just destroyed
123 * \retval false if llog doesn't exist or just destroyed
/*
 * Asserts that the handle has an object attached (lgh_obj), then returns
 * non-zero only when dt_object_exists() reports the object as existing and
 * the handle is not flagged lgh_destroyed.
 */
125 static int llog_osd_exist(struct llog_handle *handle)
127 LASSERT(handle->lgh_obj);
128 return dt_object_exists(handle->lgh_obj) && !handle->lgh_destroyed;
/*
 * Return a pointer to the tail of \a rec, i.e. the address
 * sizeof(struct llog_rec_tail) bytes before the end of the rec->lrh_len
 * bytes that start at \a rec. The result is only meaningful once
 * rec->lrh_len has been set.
 */
131 static void *rec_tail(struct llog_rec_hdr *rec)
133 return (void *)((char *)rec + rec->lrh_len -
134 sizeof(struct llog_rec_tail));
138 * Write a padding record to the llog
140 * This function writes a padding record to the end of llog. That may
141 * be needed if llog contains records of variable size, e.g. config logs
143 * The padding record just aligns llog to the llog chunk_size boundary if
144 * the current record doesn't fit in the remaining space.
146 * It allocates full length to avoid two separate writes for header and tail.
147 * Such 2-steps scheme needs extra protection and complex error handling.
149 * \param[in] env execution environment
150 * \param[in] o dt_object to write the padding record to
151 * \param[in,out] off pointer to the padding start offset
152 * \param[in] len padding length
153 * \param[in] index index of the padding record in a llog
154 * \param[in] th current transaction handle
156 * \retval 0 on successful padding write
157 * \retval negative error if write failed
/*
 * Builds one padding record and writes it with a single dt_record_write().
 *
 * Visible steps: \a len is asserted to be at least LLOG_MIN_REC_SIZE and a
 * multiple of 8; the record header gets \a index and type LLOG_PAD_MAGIC;
 * the tail located by rec_tail() gets the same index; then \a len bytes
 * starting at rec are written to \a o at *off. The CERROR below reports a
 * failed write (presumably guarded by a check of rc that is not visible).
 *
 * NOTE(review): the embedded line numbers jump 164 -> 171 -> 178 and skip
 * 180, 182 and 188, so the declaration of rc, the code that obtains rec,
 * the length assignments and the error check are missing from this extract.
 */
159 static int llog_osd_pad(const struct lu_env *env, struct dt_object *o,
160 loff_t *off, int len, int index, struct thandle *th)
162 struct llog_thread_info *lgi = llog_info(env);
163 struct llog_rec_hdr *rec;
164 struct llog_rec_tail *tail;
171 LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
178 rec->lrh_index = index;
179 rec->lrh_type = LLOG_PAD_MAGIC;
181 tail = rec_tail(rec);
183 tail->lrt_index = index;
185 lgi->lgi_buf.lb_buf = rec;
186 lgi->lgi_buf.lb_len = len;
187 rc = dt_record_write(env, o, &lgi->lgi_buf, off, th);
189 CERROR("%s: error writing padding record: rc = %d\n",
190 o->do_lu.lo_dev->ld_obd->obd_name, rc);
197 * Implementation of the llog_operations::lop_read_header
199 * This function reads the current llog header from the bottom storage
202 * \param[in] env execution environment
203 * \param[in] handle llog handle of the current llog
205 * \retval 0 on successful header read
206 * \retval negative error if read failed
/*
 * Reads the llog header into handle->lgh_hdr while holding the object's
 * read lock (dt_read_lock .. dt_read_unlock) and validates it.
 *
 * Visible flow:
 *   - dt_attr_get() fetches the attributes; the GOTO after it maps a
 *     failure to -EIO. LA_SIZE must be valid.
 *   - a zero-size object is reported as LLOG_EEMPTY without reading.
 *   - dt_read() reads lgh_hdr_size bytes into lgh_hdr; a read error is
 *     logged and mapped to -EIO.
 *   - a read shorter than the record header or LLOG_MIN_CHUNK_SIZE is
 *     treated as a non-initialized llog: the caller's llh_flags are put
 *     back and LLOG_EEMPTY is returned.
 *   - the header is byte-swapped when LLOG_REC_HDR_NEEDS_SWABBING says so.
 *   - wrong magic, an lrh_len outside
 *     [LLOG_MIN_CHUNK_SIZE, lgh_hdr_size], or an inconsistent header tail
 *     each fail with -EINVAL.
 *   - on success the caller's LLOG_F_EXT_MASK bits are OR-ed back into
 *     llh_flags and lgh_last_idx is taken from the header tail index.
 *
 * NOTE(review): several lines (the declarations of o and rc, the error
 * tests after dt_attr_get()/dt_read(), closing braces and the unlock label)
 * are missing from this extract, as the gaps in the embedded line numbers
 * show.
 */
208 static int llog_osd_read_header(const struct lu_env *env,
209 struct llog_handle *handle)
211 struct llog_rec_hdr *llh_hdr;
213 struct llog_thread_info *lgi;
214 enum llog_flag flags;
222 lgi = llog_info(env);
224 dt_read_lock(env, o, 0);
226 rc = dt_attr_get(env, o, &lgi->lgi_attr);
228 GOTO(unlock, rc = -EIO);
230 LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
232 if (lgi->lgi_attr.la_size == 0) {
233 CDEBUG(D_HA, "not reading header from 0-byte log\n");
234 GOTO(unlock, rc = LLOG_EEMPTY);
/* remember the caller's flags: the read below overwrites lgh_hdr in place */
237 flags = handle->lgh_hdr->llh_flags;
240 lgi->lgi_buf.lb_buf = handle->lgh_hdr;
241 lgi->lgi_buf.lb_len = handle->lgh_hdr_size;
242 rc = dt_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
243 llh_hdr = &handle->lgh_hdr->llh_hdr;
245 CERROR("%s: can't read llog "DFID" header: rc = %d\n",
246 o->do_lu.lo_dev->ld_obd->obd_name,
247 PFID(lu_object_fid(&o->do_lu)), rc);
248 GOTO(unlock, rc = -EIO);
250 if (rc < sizeof(*llh_hdr) || rc < LLOG_MIN_CHUNK_SIZE) {
251 /* consider short header as non-initialized llog */
252 CERROR("%s: llog "DFID" header too small: rc = %d\n",
253 o->do_lu.lo_dev->ld_obd->obd_name,
254 PFID(lu_object_fid(&o->do_lu)), rc);
255 /* caller flags to be initialized */
256 handle->lgh_hdr->llh_flags = flags;
257 GOTO(unlock, rc = LLOG_EEMPTY);
260 if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
261 lustre_swab_llog_hdr(handle->lgh_hdr);
263 if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
264 CERROR("%s: bad log %s "DFID" header magic: %#x "
265 "(expected %#x)\n", o->do_lu.lo_dev->ld_obd->obd_name,
266 handle->lgh_name ? handle->lgh_name : "",
267 PFID(lu_object_fid(&o->do_lu)),
268 llh_hdr->lrh_type, LLOG_HDR_MAGIC);
269 GOTO(unlock, rc = -EINVAL);
270 } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
271 llh_hdr->lrh_len > handle->lgh_hdr_size) {
272 CERROR("%s: incorrectly sized log %s "DFID" header: "
273 "%#x (expected at least %#x)\n"
274 "you may need to re-run lconf --write_conf.\n",
275 o->do_lu.lo_dev->ld_obd->obd_name,
276 handle->lgh_name ? handle->lgh_name : "",
277 PFID(lu_object_fid(&o->do_lu)),
278 llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
279 GOTO(unlock, rc = -EINVAL);
280 } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index >
281 LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) ||
282 LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len !=
284 CERROR("%s: incorrectly sized log %s "DFID" tailer: "
286 o->do_lu.lo_dev->ld_obd->obd_name,
287 handle->lgh_name ? handle->lgh_name : "",
288 PFID(lu_object_fid(&o->do_lu)),
289 LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO);
290 GOTO(unlock, rc = -EINVAL);
/* keep only the extension bits of the caller's flags on top of the on-disk ones */
293 handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
294 handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
298 dt_read_unlock(env, o);
303 * Implementation of the llog_operations::lop_declare_write
305 * This function declares the new record write.
307 * \param[in] env execution environment
308 * \param[in] loghandle llog handle of the current llog
309 * \param[in] rec llog record header. This is a real header of the full
310 * llog record to write. This is the beginning of buffer
311 * to write, the length of buffer is stored in
313 * \param[in] idx index of the llog record. If \a idx == -1 then this is
314 * append case, otherwise \a idx is the index of record
316 * \param[in] th current transaction handle
318 * \retval 0 on successful declaration
319 * \retval negative error if declaration failed
/*
 * Declares the writes that a later record write may perform.
 *
 * Two declarations are visible:
 *   1. chunk_size bytes (loc_chunk_size of the llog context) at position 0,
 *      because the header is updated on every write;
 *   2. unless step 1 failed or \a idx is 0 (header-only update), twice
 *      rec->lrh_len bytes with position argument -1, to leave room for a
 *      possible pad record plus the record itself.
 *
 * rec->lrh_len is asserted not to exceed the context chunk size.
 *
 * NOTE(review): declarations of o, chunk_size and rc, the return statements
 * and the braces are on lines missing from this extract.
 */
321 static int llog_osd_declare_write_rec(const struct lu_env *env,
322 struct llog_handle *loghandle,
323 struct llog_rec_hdr *rec,
324 int idx, struct thandle *th)
326 struct llog_thread_info *lgi = llog_info(env);
337 LASSERT(rec->lrh_len <= loghandle->lgh_ctxt->loc_chunk_size);
339 o = loghandle->lgh_obj;
342 chunk_size = loghandle->lgh_ctxt->loc_chunk_size;
343 lgi->lgi_buf.lb_len = chunk_size;
344 lgi->lgi_buf.lb_buf = NULL;
345 /* each time we update header */
346 rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0, th);
347 if (rc || idx == 0) /* if error or just header */
351 * the pad record can be inserted so take into account double
352 * record size: pad and the actual record into a new block
354 lgi->lgi_buf.lb_len = rec->lrh_len * 2;
355 lgi->lgi_buf.lb_buf = NULL;
356 /* XXX: implement declared window or multi-chunks approach */
357 rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th);
363 * Implementation of the llog_operations::lop_write
365 * This function writes a new record to the llog or modifies an existing one.
367 * \param[in] env execution environment
368 * \param[in] loghandle llog handle of the current llog
369 * \param[in] rec llog record header. This is a real header of
370 * the full llog record to write. This is
371 * the beginning of buffer to write, the length
372 * of buffer is stored in \a rec::lrh_len
373 * \param[in,out] reccookie pointer to the cookie to return back if needed.
374 * It is used for further cancel of this llog
376 * \param[in] idx index of the llog record. If \a idx == -1 then
377 * this is append case, otherwise \a idx is
378 * the index of record to modify
379 * \param[in] th current transaction handle
381 * \retval 0 on successful write && \a reccookie == NULL
382 * 1 on successful write && \a reccookie != NULL
383 * \retval negative error if write failed
/*
 * Reading guide for the body below (only what the visible lines show).
 *
 * Common checks: the llog must exist (llog_osd_exist), reclen must not
 * exceed the header's lrh_len (used as chunk_size), a LLOG_F_IS_FIXSIZE
 * llog must have llh_size == reclen for non-header writes, and a stale
 * object is rejected for non-header writes. dt_attr_get() then loads the
 * current attributes into lgi_attr.
 *
 * Modification path (idx != LLOG_NEXT_IDX): the bit for idx must already be
 * set in the header bitmap and idx must equal rec->lrh_index. Then
 *   - idx == LLOG_HEADER_IDX: with reccookie == NULL the whole record is
 *     written; otherwise the first llh_bitmap_offset bytes of the header
 *     plus the single bitmap word that holds reccookie->lgc_index;
 *   - LLOG_F_IS_FIXSIZE: the offset is derived from lrh_len (the rest of
 *     the expression is not visible);
 *   - reccookie with lgc_index > 0: lgc_index must equal idx and
 *     lgc_offset is used;
 *   - otherwise the offset cannot be determined and an error is logged.
 *   For record modification only the data part (REC_DATA / REC_DATA_LEN)
 *   is rewritten, after skipping sizeof(struct llog_rec_hdr).
 *
 * Append path: starts at la_size; signals "full" through lgh_last_idx when
 * lgh_max_size is reached; pads to the chunk boundary with llog_osd_pad()
 * when the record would not fit; handles llog_is_full(); then, holding
 * lgh_last_sem for write, bumps lgh_last_idx and the header tail index,
 * fills the record tail, sets the bitmap bit under lgh_hdr_mutex, tracks
 * the minimum record size in llh_size for non-fixed llogs, and under
 * dt_write_lock writes either the full header (empty llog) or
 * header + one bitmap word + header tail, followed by the record itself.
 * On success reccookie, if given, is filled in. The final section clears
 * the bitmap bit and restores lgh_last_idx for the error case.
 *
 * NOTE(review): many lines (local declarations, return statements, error
 * checks, labels, braces, and some comment delimiters) are missing from
 * this extract, as the gaps in the embedded line numbers show; the exact
 * control flow between the visible statements cannot be confirmed here.
 */
385 static int llog_osd_write_rec(const struct lu_env *env,
386 struct llog_handle *loghandle,
387 struct llog_rec_hdr *rec,
388 struct llog_cookie *reccookie,
389 int idx, struct thandle *th)
391 struct llog_thread_info *lgi = llog_info(env);
392 struct llog_log_hdr *llh;
393 int reclen = rec->lrh_len;
395 struct llog_rec_tail *lrt;
403 llh = loghandle->lgh_hdr;
404 o = loghandle->lgh_obj;
/* the header's own record length doubles as the llog chunk size */
406 chunk_size = llh->llh_hdr.lrh_len;
407 CDEBUG(D_OTHER, "new record %x to "DFID"\n",
408 rec->lrh_type, PFID(lu_object_fid(&o->do_lu)));
410 if (!llog_osd_exist(loghandle))
413 /* record length must not exceed the llog chunk size (llh_hdr.lrh_len) */
414 if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
417 /* sanity check for fixed-records llog */
418 if (idx != LLOG_HEADER_IDX && (llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
419 LASSERT(llh->llh_size != 0);
420 LASSERT(llh->llh_size == reclen);
423 /* return error if osp object is stale */
424 if (idx != LLOG_HEADER_IDX && dt_object_stale(o))
426 rc = dt_attr_get(env, o, &lgi->lgi_attr);
431 * The modification case.
432 * If idx set then the record with that index must be modified.
433 * There are three cases possible:
434 * 1) the common case is the llog header update (idx == 0)
435 * 2) the llog record modification during llog process.
436 * This is indicated by the \a loghandle::lgh_cur_idx > 0.
437 * In that case the \a loghandle::lgh_cur_offset
438 * 3) otherwise this is assumed that llog consist of records of
439 * fixed size, i.e. catalog. The llog header must has llh_size
440 * field equal to record size. The record offset is calculated
441 * just by /a idx value
443 * During modification we don't need extra header update because
444 * the bitmap and record count are not changed. The record header
445 * and tail remains the same too.
447 if (idx != LLOG_NEXT_IDX) {
448 /* llog can be empty only when first record is being written */
449 LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0));
451 if (!test_bit_le(idx, LLOG_HDR_BITMAP(llh))) {
452 CERROR("%s: modify unset record %u\n",
453 o->do_lu.lo_dev->ld_obd->obd_name, idx);
457 if (idx != rec->lrh_index) {
458 CERROR("%s: modify index mismatch %d %u\n",
459 o->do_lu.lo_dev->ld_obd->obd_name, idx,
464 if (idx == LLOG_HEADER_IDX) {
465 /* llog header update */
466 __u32 *bitmap = LLOG_HDR_BITMAP(llh);
470 /* If it does not indicate the bitmap index
471 * (reccookie == NULL), then it means update
472 * the whole update header. Otherwise only
473 * update header and bits needs to be updated,
474 * and in DNE cases, it will signaficantly
475 * shrink the RPC size.
476 * see distribute_txn_cancel_records()*/
477 if (reccookie == NULL) {
478 lgi->lgi_buf.lb_len = reclen;
479 lgi->lgi_buf.lb_buf = rec;
480 rc = dt_record_write(env, o, &lgi->lgi_buf,
485 /* update the header */
486 lgi->lgi_buf.lb_len = llh->llh_bitmap_offset;
487 lgi->lgi_buf.lb_buf = llh;
488 rc = dt_record_write(env, o, &lgi->lgi_buf,
493 /* update the bitmap */
494 index = reccookie->lgc_index;
495 lgi->lgi_off = llh->llh_bitmap_offset +
496 (index / (sizeof(*bitmap) * 8)) *
498 lgi->lgi_buf.lb_len = sizeof(*bitmap);
499 lgi->lgi_buf.lb_buf =
500 &bitmap[index/(sizeof(*bitmap)*8)];
501 rc = dt_record_write(env, o, &lgi->lgi_buf,
505 } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
506 lgi->lgi_off = llh->llh_hdr.lrh_len +
508 } else if (reccookie != NULL && reccookie->lgc_index > 0) {
510 * The lgc_offset can be used only if index is
513 if (idx != reccookie->lgc_index) {
514 CERROR("%s: modify index mismatch %d %d\n",
515 o->do_lu.lo_dev->ld_obd->obd_name, idx,
516 reccookie->lgc_index);
520 lgi->lgi_off = reccookie->lgc_offset;
521 CDEBUG(D_OTHER, "modify record "DFID": idx:%u, "
522 "len:%u offset %llu\n",
523 PLOGID(&loghandle->lgh_id), idx,
524 rec->lrh_len, (long long)lgi->lgi_off);
526 /* This can be result of lgh_cur_idx is not set during
527 * llog processing or llh_size is not set to proper
528 * record size for fixed records llog. Therefore it is
529 * impossible to get record offset. */
530 CERROR("%s: can't get record offset, idx:%d, "
531 "len:%u.\n", o->do_lu.lo_dev->ld_obd->obd_name,
536 /* update only data, header and tail remain the same */
537 lgi->lgi_off += sizeof(struct llog_rec_hdr);
538 lgi->lgi_buf.lb_len = REC_DATA_LEN(rec);
539 lgi->lgi_buf.lb_buf = REC_DATA(rec);
540 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
541 if (rc == 0 && reccookie) {
542 reccookie->lgc_lgl = loghandle->lgh_id;
543 reccookie->lgc_index = idx;
551 * The most common case of using llog. The new index is assigned to
552 * the new record, new bit is set in llog bitmap and llog count is
555 * Make sure that records don't cross a chunk boundary, so we can
556 * process them page-at-a-time if needed. If it will cross a chunk
557 * boundary, write in a fake (but referenced) entry to pad the chunk.
561 /* simulate ENOSPC when new plain llog is being added to the
563 if (CFS_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2) &&
564 llh->llh_flags & LLOG_F_IS_CAT)
567 LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
568 orig_last_idx = loghandle->lgh_last_idx;
569 lgi->lgi_off = lgi->lgi_attr.la_size;
571 if (loghandle->lgh_max_size > 0 &&
572 lgi->lgi_off >= loghandle->lgh_max_size) {
573 CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u "
574 DFID"\n", (unsigned)lgi->lgi_off,
575 loghandle->lgh_max_size, (int)loghandle->lgh_last_idx,
576 PLOGID(&loghandle->lgh_id));
577 /* this is to signal that this llog is full */
578 loghandle->lgh_last_idx = llog_max_idx(llh);
582 left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
583 /* NOTE: padding is a record, but no bit is set */
584 if (left != 0 && left != reclen &&
585 left < (reclen + LLOG_MIN_REC_SIZE)) {
586 index = loghandle->lgh_last_idx + 1;
587 rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
591 loghandle->lgh_last_idx++; /* for pad rec */
594 /* if it's the last idx in log file, then return -ENOSPC
595 * or wrap around if a catalog */
596 if (llog_is_full(loghandle)) {
597 if (llh->llh_flags & LLOG_F_IS_CAT)
598 loghandle->lgh_last_idx = 0;
603 down_write(&loghandle->lgh_last_sem);
604 /* increment the last_idx along with llh_tail index, they should
605 * be equal for a llog lifetime */
606 if (CFS_FAIL_CHECK(OBD_FAIL_LLOG_ADD_GAP) && --cfs_fail_val == 0)
607 loghandle->lgh_last_idx++;
608 loghandle->lgh_last_idx++;
609 index = loghandle->lgh_last_idx;
610 LLOG_HDR_TAIL(llh)->lrt_index = index;
612 * NB: the caller should make sure only 1 process access
613 * the lgh_last_idx, e.g. append should be exclusive.
614 * Otherwise it might hit the assert.
616 LASSERT(index < LLOG_HDR_BITMAP_SIZE(llh));
617 rec->lrh_index = index;
619 lrt->lrt_len = rec->lrh_len;
620 lrt->lrt_index = rec->lrh_index;
622 /* the lgh_hdr_mutex protects llog header data from concurrent
623 * update/cancel, the llh_count and llh_bitmap are protected */
624 mutex_lock(&loghandle->lgh_hdr_mutex);
625 if (__test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh))) {
626 CERROR("%s: index %u already set in llog bitmap "DFID"\n",
627 o->do_lu.lo_dev->ld_obd->obd_name, index,
628 PFID(lu_object_fid(&o->do_lu)));
629 mutex_unlock(&loghandle->lgh_hdr_mutex);
630 LBUG(); /* should never happen */
634 if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
635 /* Update the minimum size of the llog record */
636 if (llh->llh_size == 0)
637 llh->llh_size = reclen;
638 else if (reclen < llh->llh_size)
639 llh->llh_size = reclen;
643 * readers (e.g. llog_osd_read_header()) must not find
644 * llog updated partially (bitmap/counter claims record,
645 * but a record hasn't been added yet) as this results
648 dt_write_lock(env, o, 0);
650 if (lgi->lgi_attr.la_size == 0) {
652 lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
653 lgi->lgi_buf.lb_buf = &llh->llh_hdr;
654 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
656 GOTO(out_unlock, rc);
658 __u32 *bitmap = LLOG_HDR_BITMAP(llh);
660 /* Note: If this is not initialization (size == 0), then do not
661 * write the whole header (8k bytes), only update header/tail
662 * and bits needs to be updated. Because this update might be
663 * part of cross-MDT operation, which needs to write these
664 * updates into the update log(32KB limit) and also pack inside
665 * the RPC (1MB limit), if we write 8K for each operation, which
666 * will cost a lot space, and keep us adding more updates to one
669 lgi->lgi_buf.lb_len = llh->llh_bitmap_offset;
670 lgi->lgi_buf.lb_buf = &llh->llh_hdr;
671 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
673 GOTO(out_unlock, rc);
675 lgi->lgi_off = llh->llh_bitmap_offset +
676 (index / (sizeof(*bitmap) * 8)) * sizeof(*bitmap);
677 lgi->lgi_buf.lb_len = sizeof(*bitmap);
678 lgi->lgi_buf.lb_buf = &bitmap[index/(sizeof(*bitmap)*8)];
679 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
681 GOTO(out_unlock, rc);
683 lgi->lgi_off = (unsigned long)LLOG_HDR_TAIL(llh) -
685 lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
686 lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh);
687 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
689 GOTO(out_unlock, rc);
691 if (CFS_FAIL_PRECHECK(OBD_FAIL_LLOG_PAUSE_AFTER_PAD) && pad) {
692 /* a window for concurrent llog reader, see LU-12577 */
693 CFS_FAIL_TIMEOUT(OBD_FAIL_LLOG_PAUSE_AFTER_PAD,
698 /* unlock here for remote object */
699 mutex_unlock(&loghandle->lgh_hdr_mutex);
701 dt_write_unlock(env, o);
705 if (CFS_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
706 cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id &
708 CFS_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT);
709 msleep(1 * MSEC_PER_SEC);
711 /* computed index can be used to determine offset for fixed-size
712 * records. This also allows to handle Catalog wrap around case */
713 if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
714 lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
716 rc = dt_attr_get(env, o, &lgi->lgi_attr);
718 dt_write_unlock(env, o);
722 LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
723 lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size,
727 lgi->lgi_buf.lb_len = reclen;
728 lgi->lgi_buf.lb_buf = rec;
729 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
731 dt_write_unlock(env, o);
735 up_write(&loghandle->lgh_last_sem);
737 CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n",
738 PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
740 if (reccookie != NULL) {
741 reccookie->lgc_lgl = loghandle->lgh_id;
742 reccookie->lgc_index = index;
743 if ((rec->lrh_type == MDS_UNLINK_REC) ||
744 (rec->lrh_type == MDS_SETATTR64_REC))
745 reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
746 else if (rec->lrh_type == OST_SZ_REC)
747 reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
749 reccookie->lgc_subsys = -1;
754 /* cleanup llog for error case */
755 mutex_lock(&loghandle->lgh_hdr_mutex);
756 clear_bit_le(index, LLOG_HDR_BITMAP(llh));
758 mutex_unlock(&loghandle->lgh_hdr_mutex);
760 /* restore llog last_idx */
761 if (dt_object_remote(o)) {
762 loghandle->lgh_last_idx = orig_last_idx;
763 } else if (--loghandle->lgh_last_idx == 0 &&
764 (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
765 /* catalog had just wrap-around case */
766 loghandle->lgh_last_idx = llog_max_idx(llh);
769 LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
770 up_write(&loghandle->lgh_last_sem);
776 * We can skip reading at least as many log blocks as the number of
777 * minimum sized log records we are skipping. If it turns out
778 * that we are not far enough along the log (because the
779 * actual records are larger than minimum size) we just skip
782 * Note: in llog_process_thread, it will use bitmap offset as
783 * the index to locate the record, which also includes some pad
784 * records, whose record size is very small, and it also does not
785 * consider pad record when recording minimum record size (otherwise
786 * min_record size might be too small), so in some rare cases,
787 * it might skip too many records for @goal, see llog_osd_next_block().
789 * When force_mini_rec is true, it means we have to use LLOG_MIN_REC_SIZE
790 * as the min record size to skip over, usually because in the previous
791 * try, it skipped too many records, see llog_osd_next(prev)_block().
/*
 * Moves *off forward so that reading can resume closer to record \a goal.
 *
 * Visible logic:
 *   - \a goal is clamped to lgh->lgh_last_idx;
 *   - for a LLOG_F_IS_FIXSIZE llog the offset is computed directly as
 *     chunk_size + (goal - 1) * llh_size;
 *   - otherwise *off advances by (goal - curr - 1) records of the minimum
 *     size, which is llh_size when it is non-zero and force_mini_rec is
 *     false, and LLOG_MIN_REC_SIZE in all other cases;
 *   - finally *off is masked with ~(chunk_size - 1), i.e. rounded down to a
 *     chunk boundary when chunk_size is a power of two.
 *
 * NOTE(review): original lines 795-796, 802-803, 806 and 813-814 are
 * missing from this extract, so the last parameter declaration and any
 * condition guarding the offset computation are not visible here.
 */
793 static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off,
794 int curr, int goal, __u32 chunk_size,
797 struct llog_log_hdr *llh = lgh->lgh_hdr;
799 /* Goal should not be bigger than the last record index */
800 if (goal > lgh->lgh_last_idx)
801 goal = lgh->lgh_last_idx;
804 if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
805 *off = chunk_size + (goal - 1) * llh->llh_size;
807 __u64 min_rec_size = LLOG_MIN_REC_SIZE;
809 if (llh->llh_size > 0 && !force_mini_rec)
810 min_rec_size = llh->llh_size;
812 *off = *off + (goal - curr - 1) * min_rec_size;
815 /* always align with lower chunk boundary*/
816 *off &= ~(chunk_size - 1);
820 * Remap a record to the desired format as specified by the crf flags.
821 * The record must be big enough to contain the final remapped version.
822 * Superfluous extension fields are removed and missing ones are added
823 * and zeroed. The flags of the record are updated accordingly to what
824 * the calling llog layer can support. The only influence user land has is
825 * to store the NID in large NID format. The user land end user will
826 * receive all fields that are supported by the kernel.
828 * The jobid and rename extensions will be added to a record, to match the
829 * format an application expects, typically. In this case, the newly added
830 * fields will be zeroed.
831 * The Jobid field can be removed, to guarantee compatibility with older
832 * clients that don't expect this field in the records they process.
834 * The following assumptions are being made:
835 * - CLF_RENAME will not be removed
836 * - CLF_JOBID will not be added without CLF_RENAME being added too
837 * - CLF_EXTRA_FLAGS will not be added without CLF_JOBID being added too
839 * @rec: The record to remap.
840 * @crf_wanted: Flags describing the desired extensions.
841 * @cref_want: Flags describing the desired extra extensions.
/*
 * Rewrites \a rec in place so that its extension layout matches
 * \a crf_wanted / \a cref_want.
 *
 * Phases visible below, in order:
 *   1. mask the wanted flags with CLF_SUPPORTED and CLFE_SUPPORTED;
 *   2. test whether the record already has the wanted layout (the action
 *      taken is on lines missing from this extract - presumably an early
 *      return);
 *   3. memmove() the variable-length name to its final offset first;
 *   4. compute the destination of every extension with
 *      changelog_rec_offset(), each time masking out the flags of the
 *      extensions that are placed after it;
 *   5. memmove() each extension that is both present and wanted, converting
 *      the NID between struct lnet_nid and the nid4 form when the
 *      CLFE_NID_BE setting differs;
 *   6. memset() to zero each extension that is wanted but was not present;
 *   7. store the new cr_flags (keeping the CLF_FLAGMASK bits) and, when
 *      extra flags are present, update cr_extra_flags.
 *
 * NOTE(review): declarations of ef_mov, jid_mov and rnm_mov, several
 * closing braces/else branches and the tails of a few expressions are
 * missing from this extract, as the gaps in the embedded line numbers show.
 */
843 static void changelog_remap_rec(struct changelog_rec *rec,
844 enum changelog_rec_flags crf_wanted,
845 enum changelog_rec_extra_flags cref_want)
847 char *xattr_mov = NULL;
848 char *omd_mov = NULL;
849 char *nid_mov = NULL;
850 char *uidgid_mov = NULL;
854 enum changelog_rec_extra_flags cref = CLFE_INVALID;
/* drop any requested bits outside CLF_SUPPORTED / CLFE_SUPPORTED */
856 crf_wanted = (enum changelog_rec_flags)
857 (crf_wanted & CLF_SUPPORTED);
858 cref_want = (enum changelog_rec_extra_flags)
859 (cref_want & CLFE_SUPPORTED);
/* NOTE(review): checks for "already in the wanted format"; the statement it guards is not visible */
861 if ((rec->cr_flags & CLF_SUPPORTED) == crf_wanted) {
862 if (!(rec->cr_flags & CLF_EXTRA_FLAGS) ||
863 (rec->cr_flags & CLF_EXTRA_FLAGS &&
864 (changelog_rec_extra_flags(rec)->cr_extra_flags &
870 /* First move the variable-length name field */
871 memmove((char *)rec + changelog_rec_offset(crf_wanted, cref_want),
872 changelog_rec_name(rec), rec->cr_namelen);
874 /* Locations of extensions in the remapped record */
875 if (rec->cr_flags & CLF_EXTRA_FLAGS) {
876 xattr_mov = (char *)rec +
877 changelog_rec_offset(
878 (enum changelog_rec_flags)
879 (crf_wanted & CLF_SUPPORTED),
880 (enum changelog_rec_extra_flags)
881 (cref_want & ~CLFE_XATTR));
882 omd_mov = (char *)rec +
883 changelog_rec_offset(
884 (enum changelog_rec_flags)
885 (crf_wanted & CLF_SUPPORTED),
886 (enum changelog_rec_extra_flags)
887 (cref_want & ~(CLFE_OPEN | CLFE_XATTR)));
888 nid_mov = (char *)rec +
889 changelog_rec_offset(
890 (enum changelog_rec_flags)
891 (crf_wanted & CLF_SUPPORTED),
892 (enum changelog_rec_extra_flags)
894 ~(CLFE_NID | CLFE_OPEN | CLFE_XATTR)));
895 uidgid_mov = (char *)rec +
896 changelog_rec_offset(
897 (enum changelog_rec_flags)
898 (crf_wanted & CLF_SUPPORTED),
899 (enum changelog_rec_extra_flags)
900 (cref_want & ~(CLFE_UIDGID |
/* extra flags currently stored in the record; read before anything is moved */
904 cref = (enum changelog_rec_extra_flags)
905 changelog_rec_extra_flags(rec)->cr_extra_flags;
908 ef_mov = (char *)rec +
909 changelog_rec_offset(
910 (enum changelog_rec_flags)
911 (crf_wanted & ~CLF_EXTRA_FLAGS), CLFE_INVALID);
912 jid_mov = (char *)rec +
913 changelog_rec_offset((enum changelog_rec_flags)(crf_wanted &
914 ~(CLF_EXTRA_FLAGS | CLF_JOBID)),
916 rnm_mov = (char *)rec +
917 changelog_rec_offset((enum changelog_rec_flags)(crf_wanted &
923 /* Move the extension fields to the desired positions */
924 if ((crf_wanted & CLF_EXTRA_FLAGS) &&
925 (rec->cr_flags & CLF_EXTRA_FLAGS)) {
926 if ((cref_want & CLFE_XATTR) && (cref & CLFE_XATTR))
927 memmove(xattr_mov, changelog_rec_xattr(rec),
928 sizeof(struct changelog_ext_xattr));
930 if ((cref_want & CLFE_OPEN) && (cref & CLFE_OPEN))
931 memmove(omd_mov, changelog_rec_openmode(rec),
932 sizeof(struct changelog_ext_openmode));
934 if ((cref_want & CLFE_NID) && (cref & CLFE_NID)) {
935 struct changelog_ext_nid *cen = changelog_rec_nid(rec);
/* stored and wanted NID formats differ: convert between struct lnet_nid and nid4 */
937 if ((cref_want & CLFE_NID_BE) != (cref & CLFE_NID_BE)) {
938 struct lnet_nid *nid;
940 if (!(cref_want & CLFE_NID_BE)) {
941 nid = (struct lnet_nid *)cen;
942 if (nid_is_nid4(nid)) {
943 struct changelog_ext_nid *mov;
945 mov = (struct changelog_ext_nid *)nid_mov;
946 mov->cr_nid = lnet_nid_to_nid4(nid);
947 cref &= ~CLFE_NID_BE;
953 nid = (struct lnet_nid *)nid_mov;
954 lnet_nid4_to_nid(cen->cr_nid, nid);
956 changelog_rec_extra_flags(rec)->cr_extra_flags =
959 memmove(nid_mov, cen, sizeof(*cen));
963 if ((cref_want & CLFE_UIDGID) && (cref & CLFE_UIDGID))
964 memmove(uidgid_mov, changelog_rec_uidgid(rec),
965 sizeof(struct changelog_ext_uidgid));
967 memmove(ef_mov, changelog_rec_extra_flags(rec),
968 sizeof(struct changelog_ext_extra_flags));
971 if ((crf_wanted & CLF_JOBID) && (rec->cr_flags & CLF_JOBID))
972 memmove(jid_mov, changelog_rec_jobid(rec),
973 sizeof(struct changelog_ext_jobid));
975 if ((crf_wanted & CLF_RENAME) && (rec->cr_flags & CLF_RENAME))
976 memmove(rnm_mov, changelog_rec_rename(rec),
977 sizeof(struct changelog_ext_rename));
979 /* Clear newly added fields */
980 if (xattr_mov && (cref_want & CLFE_XATTR) &&
981 !(cref & CLFE_XATTR))
982 memset(xattr_mov, 0, sizeof(struct changelog_ext_xattr));
984 if (omd_mov && (cref_want & CLFE_OPEN) &&
986 memset(omd_mov, 0, sizeof(struct changelog_ext_openmode));
988 if (nid_mov && (cref_want & CLFE_NID) &&
990 memset(nid_mov, 0, sizeof(struct changelog_ext_nid));
992 if (uidgid_mov && (cref_want & CLFE_UIDGID) &&
993 !(cref & CLFE_UIDGID))
994 memset(uidgid_mov, 0, sizeof(struct changelog_ext_uidgid));
996 if ((crf_wanted & CLF_EXTRA_FLAGS) &&
997 !(rec->cr_flags & CLF_EXTRA_FLAGS))
998 memset(ef_mov, 0, sizeof(struct changelog_ext_extra_flags));
1000 if ((crf_wanted & CLF_JOBID) && !(rec->cr_flags & CLF_JOBID))
1001 memset(jid_mov, 0, sizeof(struct changelog_ext_jobid));
1003 if ((crf_wanted & CLF_RENAME) && !(rec->cr_flags & CLF_RENAME))
1004 memset(rnm_mov, 0, sizeof(struct changelog_ext_rename));
1006 /* Update the record's flags accordingly */
1007 rec->cr_flags = (rec->cr_flags & CLF_FLAGMASK) | crf_wanted;
1008 if (rec->cr_flags & CLF_EXTRA_FLAGS)
1009 changelog_rec_extra_flags(rec)->cr_extra_flags =
1010 changelog_rec_extra_flags(rec)->cr_extra_flags |
1015 * Remove optional fields that the client doesn't expect.
1016 * This is typically in order to ensure compatibility with older clients.
1017 * It is assumed that since we exclusively remove fields, the block will be
1018 * big enough to handle the remapped records. It is also assumed that records
1019 * of a block have the same format (i.e.: the same features enabled).
1021 * \param[in,out] hdr Header of the block of records to remap.
1022 * \param[in,out] last_hdr Last header, don't read past this point.
1023 * \param[in] loghandle llog handle; the LLOG_F_EXT_* bits in its header
1024 * llh_flags describe the fields and extra fields to keep.
/*
 * Derives the set of fields to keep from loghandle->lgh_hdr->llh_flags and
 * remaps every record of the block accordingly.
 *
 * Visible logic:
 *   - start from CLF_SUPPORTED / CLFE_SUPPORTED and clear the bit of each
 *     extension whose LLOG_F_EXT_* flag is not set in llh_flags (without
 *     LLOG_F_EXT_X_NID both CLFE_NID and CLFE_NID_BE are cleared);
 *   - test for "nothing to trim" and for a first record that is not a
 *     CHANGELOG_REC (the statements these tests guard are not visible,
 *     presumably early returns);
 *   - walk the records from \a hdr while hdr <= \a last_hdr: the extra
 *     flags to keep are the record's own cr_extra_flags masked (the mask
 *     operand is not visible), and changelog_remap_rec() is called with
 *     the record's cr_flags masked by the kept flags;
 *   - a record with lrh_len == 0 is treated as corruption: a warning is
 *     printed and hdr is set to the record after \a last_hdr so the walk
 *     cannot loop forever.
 *
 * NOTE(review): the loop opener, several braces and statements, and the
 * ends of two comments are missing from this extract, as the gaps in the
 * embedded line numbers show.
 */
1026 static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
1027 struct llog_rec_hdr *last_hdr,
1028 struct llog_handle *loghandle)
1030 enum changelog_rec_flags flags = CLF_SUPPORTED;
1031 enum changelog_rec_extra_flags extra_flags = CLFE_SUPPORTED;
1033 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_XATTR))
1034 extra_flags &= ~CLFE_XATTR;
1035 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_OMODE))
1036 extra_flags &= ~CLFE_OPEN;
1037 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_NID))
1038 extra_flags &= ~(CLFE_NID | CLFE_NID_BE);
1039 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_NID_BE)) {
1040 if (extra_flags & CLFE_NID_BE) {
1041 /* The large nid won't be understood */
1042 extra_flags &= ~CLFE_NID_BE;
1045 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID))
1046 extra_flags &= ~CLFE_UIDGID;
1047 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS))
1048 flags &= ~CLF_EXTRA_FLAGS;
1049 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
1050 flags &= ~CLF_JOBID;
/* every supported field is kept: nothing to trim */
1052 if (flags == CLF_SUPPORTED && extra_flags == CLFE_SUPPORTED)
/* only changelog records carry these extensions */
1055 if (hdr->lrh_type != CHANGELOG_REC)
1059 struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1);
1060 enum changelog_rec_extra_flags xflag = CLFE_INVALID;
1062 if (flags & CLF_EXTRA_FLAGS &&
1063 rec->cr_flags & CLF_EXTRA_FLAGS) {
1064 xflag = changelog_rec_extra_flags(rec)->cr_extra_flags &
1068 if (unlikely(hdr->lrh_len == 0)) {
1069 /* It is corruption case, we cannot know the next rec,
1070 * jump to the last one directly to avoid dead loop. */
1071 LCONSOLE(D_WARNING, "Hit invalid llog record: "
1072 "idx %u, type %u, id %u\n",
1073 hdr->lrh_index, hdr->lrh_type, hdr->lrh_id);
1074 hdr = llog_rec_hdr_next(last_hdr);
1075 if (unlikely(hdr == last_hdr))
1076 LCONSOLE(D_WARNING, "The last record crashed: "
1077 "idx %u, type %u, id %u\n",
1078 hdr->lrh_index, hdr->lrh_type,
1083 /* Fill up the changelog record with everything the kernel
1086 changelog_remap_rec(rec, rec->cr_flags & flags, xflag);
1087 hdr = llog_rec_hdr_next(hdr);
1088 /* Yield CPU to avoid soft-lockup if there are too many records
1091 } while ((char *)hdr <= (char *)last_hdr);
1095 * Implementation of the llog_operations::lop_next_block
 * This function finds the next llog block to return which contains
1098 * record with required index. It is main part of llog processing.
1100 * \param[in] env execution environment
1101 * \param[in] loghandle llog handle of the current llog
 * \param[in,out] cur_idx index preceding cur_offset
 * \param[in] next_idx target index to find
 * \param[in,out] cur_offset furthest point read in the file
1105 * \param[in] buf pointer to data buffer to fill
1106 * \param[in] len required len to read, it is
1107 * usually llog chunk_size.
1109 * \retval 0 on successful buffer read
1110 * \retval negative value on error
1112 static int llog_osd_next_block(const struct lu_env *env,
1113 struct llog_handle *loghandle, int *cur_idx,
1114 int next_idx, __u64 *cur_offset, void *buf,
// Forward scan: starting at *cur_offset, the llog object is read into buf in
// pieces that never cross a chunk_size boundary, until a block is read whose
// tail index is not below next_idx. The object is read-locked from
// dt_read_lock() down to the dt_read_unlock() at the end.
1117 struct llog_thread_info *lgi = llog_info(env);
1118 struct dt_object *o;
1119 struct dt_device *dt;
// Snapshot of the caller's position; it is restored further down when the
// scan is retried. With next_idx == 0 the minimum-record mode is on from
// the start.
1122 int last_idx = *cur_idx;
1123 __u64 last_offset = *cur_offset;
1124 bool force_mini_rec = !next_idx;
// The chunk size is the record length stored in the llog header; len must be
// a non-zero multiple of it (mask test - assumes chunk_size is a power of
// two, TODO confirm).
1131 chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
1132 if (len == 0 || len & (chunk_size - 1))
1136 LASSERT(loghandle->lgh_ctxt);
// Fault-injection hook: it only triggers when cfs_fail_val equals the low
// 32 bits of this handle's address.
1138 if (CFS_FAIL_PRECHECK(OBD_FAIL_MDS_CHANGELOG_DEL) &&
1139 cfs_fail_val == ((unsigned long)loghandle & 0xFFFFFFFF)) {
1140 CFS_RACE(OBD_FAIL_MDS_CHANGELOG_DEL);
1141 msleep(MSEC_PER_SEC >> 2);
1144 o = loghandle->lgh_obj;
1146 dt_read_lock(env, o, 0);
1147 if (!llog_osd_exist(loghandle))
1148 GOTO(out, rc = -ESTALE); //object was destroyed
1150 dt = lu2dt_dev(o->do_lu.lo_dev);
// The object size fetched here bounds the scan loop below.
1153 rc = dt_attr_get(env, o, &lgi->lgi_attr);
1158 "looking for log index %u (cur idx %u off %llu), size %llu\n",
1160 *cur_offset, lgi->lgi_attr.la_size);
1162 while (*cur_offset < lgi->lgi_attr.la_size) {
1163 struct llog_rec_hdr *rec, *last_rec;
1164 struct llog_rec_tail *tail;
// llog_skip_over() may move *cur_offset forward towards next_idx before the
// read; force_mini_rec selects its minimum-record mode.
1166 llog_skip_over(loghandle, cur_offset, *cur_idx,
1167 next_idx, chunk_size, force_mini_rec);
1169 /* read up to next llog chunk_size block */
1170 lgi->lgi_buf.lb_len = chunk_size -
1171 (*cur_offset & (chunk_size - 1));
1172 lgi->lgi_buf.lb_buf = buf;
// dt_read() is handed cur_offset itself, so the caller's offset is what gets
// updated by the read.
1174 rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
1177 /* no goal is valid case */
1180 if (!force_mini_rec)
1183 CERROR("%s: can't read llog block from log "DFID
1184 " offset %llu: rc = %d\n",
1185 o->do_lu.lo_dev->ld_obd->obd_name,
1186 PFID(lu_object_fid(&o->do_lu)), *cur_offset,
1192 /* signal the end of the valid buffer to
1194 memset(buf + rc, 0, len - rc);
1197 if (rc == 0) { /* end of file, nothing to do */
1198 if (!force_mini_rec)
// A read shorter than a record tail cannot hold a valid block.
1203 if (rc < sizeof(*tail)) {
1204 if (!force_mini_rec)
1207 CERROR("%s: invalid llog block at log id "DFID" offset %llu\n",
1208 o->do_lu.lo_dev->ld_obd->obd_name,
1209 PLOGID(&loghandle->lgh_id), *cur_offset);
1210 GOTO(out, rc = -EINVAL);
// Swab the first record when its header says it needs it.
1214 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
1215 lustre_swab_llog_rec(rec);
// The block tail is the last sizeof(struct llog_rec_tail) bytes just read.
1216 tail = (struct llog_rec_tail *)((char *)buf + rc -
1217 sizeof(struct llog_rec_tail));
1219 /* caller handles bad records if any */
1220 if (llog_verify_record(loghandle, rec))
1223 /* get the last record in block */
1224 last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
1227 if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
1228 lustre_swab_llog_rec(last_rec);
// The header index of the last record must agree with the tail index;
// a mismatch is reported and fails the call with -EINVAL.
1230 if (last_rec->lrh_index != tail->lrt_index) {
1231 CERROR("%s: invalid llog tail at log id "DFID" offset %llu last_rec idx %u tail idx %u lrt len %u read_size %d\n",
1232 o->do_lu.lo_dev->ld_obd->obd_name,
1233 PLOGID(&loghandle->lgh_id), *cur_offset,
1234 last_rec->lrh_index, tail->lrt_index,
1236 GOTO(out, rc = -EINVAL);
// Report the tail index of the block just read back through cur_idx.
1239 *cur_idx = tail->lrt_index;
1241 /* this shouldn't happen */
1242 if (tail->lrt_index == 0) {
1243 CERROR("%s: invalid llog tail at log id "DFID"offset %llu bytes %d\n",
1244 o->do_lu.lo_dev->ld_obd->obd_name,
1245 PLOGID(&loghandle->lgh_id), *cur_offset, rc);
1246 GOTO(out, rc = -EINVAL);
// Block ends below the wanted index: remember this position as the point
// a later retry rewinds to.
1248 if (tail->lrt_index < next_idx) {
1249 last_idx = *cur_idx;
1250 last_offset = *cur_offset;
1254 /* sanity check that the start of the new buffer is no farther
1255 * than the record that we wanted. This shouldn't happen. */
1256 if (next_idx && rec->lrh_index > next_idx) {
1257 if (!force_mini_rec && next_idx > last_idx)
1260 CERROR("%s: missed desired record? %u > %u\n",
1261 o->do_lu.lo_dev->ld_obd->obd_name,
1262 rec->lrh_index, next_idx);
1263 GOTO(out, rc = -ENOENT);
1266 /* Trim unsupported extensions for compat w/ older clients */
1267 changelog_block_trim_ext(rec, last_rec, loghandle);
1272 /* Note: because there are some pad records in the
1273 * llog, so llog_skip_over() might skip too much
1274 * records, let's try skip again with minimum record */
// Retry state: minimum-record mode on, position rewound to the last one saved.
1275 force_mini_rec = true;
1276 *cur_offset = last_offset;
1277 *cur_idx = last_idx;
1279 /* being here means we reach end of llog but didn't find needed idx
1280 * normally could happen while processing remote llog, return -EBADR
1281 * to indicate access beyond end of file like dt_read() does and to
1282 * distinguish this situation from real IO or network issues.
1284 GOTO(out, rc = -EBADR);
1286 dt_read_unlock(env, o);
1291 * Implementation of the llog_operations::lop_prev_block
1293 * This function finds the llog block to return which contains
1294 * record with required index but in reverse order - from end of llog
1296 * It is main part of reverse llog processing.
1298 * \param[in] env execution environment
1299 * \param[in] loghandle llog handle of the current llog
1300 * \param[in] prev_idx target index to find
1301 * \param[in] buf pointer to data buffer to fill
1302 * \param[in] len required len to read, it is llog_chunk_size usually.
1304 * \retval 0 on successful buffer read
1305 * \retval negative value on error
1307 static int llog_osd_prev_block(const struct lu_env *env,
1308 struct llog_handle *loghandle,
1309 int prev_idx, void *buf, int len)
1311 struct llog_thread_info *lgi = llog_info(env);
1312 struct dt_object *o;
1313 struct dt_device *dt;
1320 chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
1321 if (len == 0 || len & (chunk_size - 1))
1324 CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
1327 LASSERT(loghandle->lgh_ctxt);
1329 o = loghandle->lgh_obj;
1331 dt_read_lock(env, o, 0);
1332 if (!llog_osd_exist(loghandle))
1333 GOTO(out, rc = -ESTALE);
1335 dt = lu2dt_dev(o->do_lu.lo_dev);
1338 /* Let's only use mini record size for previous block read
1340 cur_offset = chunk_size;
1341 llog_skip_over(loghandle, &cur_offset, 0, prev_idx,
1344 rc = dt_attr_get(env, o, &lgi->lgi_attr);
1348 while (cur_offset < lgi->lgi_attr.la_size) {
1349 struct llog_rec_hdr *rec, *last_rec;
1350 struct llog_rec_tail *tail;
1352 lgi->lgi_buf.lb_len = len;
1353 lgi->lgi_buf.lb_buf = buf;
1354 rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
1356 CERROR("%s: can't read llog block from log "DFID
1357 " offset %llu: rc = %d\n",
1358 o->do_lu.lo_dev->ld_obd->obd_name,
1359 PFID(lu_object_fid(&o->do_lu)), cur_offset, rc);
1363 if (rc == 0) /* end of file, nothing to do */
1366 if (rc < sizeof(*tail)) {
1367 CERROR("%s: invalid llog block at log id "DFID" offset %llu\n",
1368 o->do_lu.lo_dev->ld_obd->obd_name,
1369 PLOGID(&loghandle->lgh_id), cur_offset);
1370 GOTO(out, rc = -EINVAL);
1374 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
1375 lustre_swab_llog_rec(rec);
1377 tail = (struct llog_rec_tail *)((char *)buf + rc -
1378 sizeof(struct llog_rec_tail));
1379 /* get the last record in block */
1380 last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
1381 le32_to_cpu(tail->lrt_len));
1383 if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
1384 lustre_swab_llog_rec(last_rec);
1385 LASSERT(last_rec->lrh_index == tail->lrt_index);
1387 /* this shouldn't happen */
1388 if (tail->lrt_index == 0) {
1389 CERROR("%s: invalid llog tail at log id "DFID" offset %llu\n",
1390 o->do_lu.lo_dev->ld_obd->obd_name,
1391 PLOGID(&loghandle->lgh_id), cur_offset);
1392 GOTO(out, rc = -EINVAL);
1394 if (tail->lrt_index < prev_idx)
1397 /* sanity check that the start of the new buffer is no farther
1398 * than the record that we wanted. This shouldn't happen. */
1399 if (rec->lrh_index > prev_idx) {
1400 CERROR("%s: missed desired record? %u > %u\n",
1401 o->do_lu.lo_dev->ld_obd->obd_name,
1402 rec->lrh_index, prev_idx);
1403 GOTO(out, rc = -ENOENT);
1406 /* Trim unsupported extensions for compat w/ older clients */
1407 changelog_block_trim_ext(rec, last_rec, loghandle);
1411 GOTO(out, rc = -EIO);
1413 dt_read_unlock(env, o);
1418 * This is helper function to get llog directory object. It is used by named
1419 * llog operations to find/insert/delete llog entry from llog directory.
1421 * \param[in] env execution environment
1422 * \param[in] ctxt llog context
1424 * \retval dt_object of llog directory
1425 * \retval ERR_PTR of negative value on error
1427 static struct dt_object *llog_osd_dir_get(const struct lu_env *env,
1428 struct llog_ctxt *ctxt)
1430 struct dt_device *dt;
1431 struct dt_thread_info *dti = dt_info(env);
1432 struct dt_object *dir;
1435 dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
1436 if (ctxt->loc_dir == NULL) {
1437 rc = dt_root_get(env, dt, &dti->dti_fid);
1440 dir = dt_locate(env, dt, &dti->dti_fid);
1442 if (!IS_ERR(dir) && !dt_try_as_dir(env, dir, false)) {
1443 dt_object_put(env, dir);
1444 return ERR_PTR(-ENOTDIR);
1447 lu_object_get(&ctxt->loc_dir->do_lu);
1448 dir = ctxt->loc_dir;
1455 * Implementation of the llog_operations::lop_open
1457 * This function opens the llog by its logid or by name, it may open also
1458 * non existent llog and assing then new id to it.
1459 * The llog_open/llog_close pair works similar to lu_object_find/put,
1460 * the object may not exist prior open. The result of open is just dt_object
1461 * in the llog header.
1463 * \param[in] env execution environment
1464 * \param[in] handle llog handle of the current llog
1465 * \param[in] logid logid of llog to open (nameless llog)
1466 * \param[in] name name of llog to open (named llog)
1467 * \param[in] open_param
1468 * LLOG_OPEN_NEW - new llog, may not exist
1469 * LLOG_OPEN_EXIST - old llog, must exist
1471 * \retval 0 on successful open, llog_handle::lgh_obj
1472 * contains the dt_object of the llog.
1473 * \retval negative value on error
1475 static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
1476 struct llog_logid *logid, char *name,
1477 enum llog_open_param open_param)
1479 struct llog_thread_info *lgi = llog_info(env);
1480 struct llog_ctxt *ctxt = handle->lgh_ctxt;
1481 struct dt_object *o;
1482 struct dt_device *dt;
1483 struct ls_device *ls;
1484 struct local_oid_storage *los = NULL;
1486 bool new_id = false;
1492 LASSERT(ctxt->loc_exp);
1493 LASSERT(ctxt->loc_exp->exp_obd);
1494 dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
1496 if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1497 struct lu_object_conf conf = { 0 };
1498 if (logid != NULL) {
1499 logid_to_fid(logid, &lgi->lgi_fid);
1501 /* If logid == NULL, then it means the caller needs
1502 * to allocate new FID (llog_cat_declare_add_rec()). */
1503 rc = dt_fid_alloc(env, dt, &lgi->lgi_fid, NULL, NULL);
1507 conf.loc_flags = LOC_F_NEW;
1510 o = dt_locate_at(env, dt, &lgi->lgi_fid,
1511 dt->dd_lu_dev.ld_site->ls_top_dev, &conf);
1518 ls = ls_device_find_or_init(dt);
1520 RETURN(PTR_ERR(ls));
1522 mutex_lock(&ls->ls_los_mutex);
1523 los = dt_los_find(ls, name != NULL ? FID_SEQ_LLOG_NAME : FID_SEQ_LLOG);
1524 mutex_unlock(&ls->ls_los_mutex);
1526 ls_device_put(env, ls);
1530 if (logid != NULL) {
1531 logid_to_fid(logid, &lgi->lgi_fid);
1533 struct dt_object *llog_dir;
1535 llog_dir = llog_osd_dir_get(env, ctxt);
1536 if (IS_ERR(llog_dir))
1537 GOTO(out, rc = PTR_ERR(llog_dir));
1538 dt_read_lock(env, llog_dir, 0);
1539 rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid);
1540 dt_read_unlock(env, llog_dir);
1541 dt_object_put(env, llog_dir);
1542 if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
1543 /* generate fid for new llog */
1544 rc = local_object_fid_generate(env, los,
1550 OBD_ALLOC(handle->lgh_name, strlen(name) + 1);
1551 if (handle->lgh_name)
1552 strcpy(handle->lgh_name, name);
1554 GOTO(out, rc = -ENOMEM);
1556 LASSERTF(open_param & LLOG_OPEN_NEW, "%#x\n", open_param);
1557 /* generate fid for new llog */
1559 rc = local_object_fid_generate(env, los, &lgi->lgi_fid);
1564 if (CFS_FAIL_PRECHECK(OBD_FAIL_MDS_LLOG_UMOUNT_RACE) &&
1565 cfs_fail_val == 1) {
1567 CFS_RACE(OBD_FAIL_MDS_LLOG_UMOUNT_RACE);
1568 msleep(MSEC_PER_SEC);
1570 o = ls_locate(env, ls, &lgi->lgi_fid, NULL);
1572 GOTO(out_name, rc = PTR_ERR(o));
1574 if (dt_object_exists(o) && new_id) {
1575 /* llog exists with just generated ID, e.g. some old llog file
1576 * still is in use or is orphan, drop a warn and skip it. */
1577 CDEBUG(D_INFO, "%s: llog exists with the same FID: "DFID
1579 o->do_lu.lo_dev->ld_obd->obd_name,
1580 PFID(lu_object_fid(&o->do_lu)));
1581 dt_object_put(env, o);
1582 /* just skip this llog ID, we shouldn't delete it because we
1583 * don't know exactly what is its purpose and state. */
1588 /* No new llog is expected but doesn't exist */
1589 if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) {
1590 CDEBUG(D_INFO, "%s: llog FID: "DFID" obj %p doesn`t exist\n",
1591 o->do_lu.lo_dev->ld_obd->obd_name,
1592 PFID(lu_object_fid(&o->do_lu)), o);
1593 GOTO(out_put, rc = -ENOENT);
1595 fid_to_logid(&lgi->lgi_fid, &handle->lgh_id);
1596 handle->lgh_obj = o;
1597 handle->private_data = los;
1598 LASSERT(handle->lgh_ctxt);
1603 dt_object_put(env, o);
1605 OBD_FREE(handle->lgh_name, strlen(name) + 1);
1613 * Get dir for regular fid log object
1615 * Get directory for regular fid log object, and these regular fid log
1616 * object will be inserted under this directory, to satisfy the FS
1617 * consistency check, e2fsck etc.
1619 * \param [in] env execution environment
1620 * \param [in] dto llog object
1622 * \retval pointer to the directory if it is found.
1623 * \retval ERR_PTR(negative errno) if it fails.
1625 static struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env,
1626 struct dt_object *dto)
1628 struct llog_thread_info *lgi = llog_info(env);
1629 struct seq_server_site *ss = dto->do_lu.lo_dev->ld_site->ld_seq_site;
1630 struct lu_seq_range *range = &lgi->lgi_range;
1631 struct lu_fid *dir_fid = &lgi->lgi_fid;
1632 struct dt_object *dir;
1636 fld_range_set_any(range);
1637 LASSERT(ss != NULL);
1638 rc = ss->ss_server_fld->lsf_seq_lookup(env, ss->ss_server_fld,
1639 fid_seq(lu_object_fid(&dto->do_lu)), range);
1641 RETURN(ERR_PTR(rc));
1643 lu_update_log_dir_fid(dir_fid, range->lsr_index);
1644 dir = dt_locate(env, lu2dt_dev(dto->do_lu.lo_dev), dir_fid);
1648 if (!dt_try_as_dir(env, dir, false)) {
1649 dt_object_put(env, dir);
1650 RETURN(ERR_PTR(-ENOTDIR));
1657 * Add llog object with regular FID to name entry
1659 * Add llog object with regular FID to name space, and each llog
1660 * object on each MDT will be /update_log_dir/[seq:oid:ver],
1661 * so to satisfy the namespace consistency check, e2fsck etc.
1663 * \param [in] env execution environment
1664 * \param [in] dto llog object
1665 * \param [in] th thandle
1666 * \param [in] declare if it is declare or execution
1668 * \retval 0 if insertion succeeds.
1669 * \retval negative errno if insertion fails.
1672 llog_osd_regular_fid_add_name_entry(const struct lu_env *env,
1673 struct dt_object *dto,
1674 struct thandle *th, bool declare)
1676 struct llog_thread_info *lgi = llog_info(env);
1677 const struct lu_fid *fid = lu_object_fid(&dto->do_lu);
1678 struct dt_insert_rec *rec = &lgi->lgi_dt_rec;
1679 struct dt_object *dir;
1680 char *name = lgi->lgi_name;
1684 if (!fid_is_norm(fid))
1687 dir = llog_osd_get_regular_fid_dir(env, dto);
1689 RETURN(PTR_ERR(dir));
1692 rec->rec_type = S_IFREG;
1693 snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
1694 dt_write_lock(env, dir, 0);
1696 rc = dt_declare_insert(env, dir, (struct dt_rec *)rec,
1697 (struct dt_key *)name, th);
1699 rc = dt_insert(env, dir, (struct dt_rec *)rec,
1700 (struct dt_key *)name, th);
1702 dt_write_unlock(env, dir);
1704 dt_object_put(env, dir);
1710 * Implementation of the llog_operations::lop_declare_create
1712 * This function declares the llog create. It declares also name insert
1713 * into llog directory in case of named llog.
1715 * \param[in] env execution environment
1716 * \param[in] res llog handle of the current llog
1717 * \param[in] th current transaction handle
1719 * \retval 0 on successful create declaration
1720 * \retval negative value on error
1722 static int llog_osd_declare_create(const struct lu_env *env,
1723 struct llog_handle *res, struct thandle *th)
1725 struct llog_thread_info *lgi = llog_info(env);
1726 struct dt_insert_rec *rec = &lgi->lgi_dt_rec;
1727 struct local_oid_storage *los;
1728 struct dt_object *o;
1733 LASSERT(res->lgh_obj);
1736 /* object can be created by another thread */
1738 if (dt_object_exists(o))
1741 if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1742 struct llog_thread_info *lgi = llog_info(env);
1744 lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE;
1745 lgi->lgi_attr.la_size = 0;
1746 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1747 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
1749 rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
1755 rc = llog_osd_regular_fid_add_name_entry(env, o, th, true);
1759 los = res->private_data;
1762 rc = llog_osd_declare_new_object(env, los, o, th);
1766 /* do not declare header initialization here as it's declared
1767 * in llog_osd_declare_write_rec() which is always called */
1769 if (res->lgh_name) {
1770 struct dt_object *llog_dir;
1772 llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
1773 if (IS_ERR(llog_dir))
1774 RETURN(PTR_ERR(llog_dir));
1775 logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
1776 rec->rec_fid = &lgi->lgi_fid;
1777 rec->rec_type = S_IFREG;
1778 rc = dt_declare_insert(env, llog_dir,
1779 (struct dt_rec *)rec,
1780 (struct dt_key *)res->lgh_name, th);
1781 dt_object_put(env, llog_dir);
1783 CERROR("%s: can't declare named llog %s: rc = %d\n",
1784 o->do_lu.lo_dev->ld_obd->obd_name,
1791 * Implementation of the llog_operations::lop_create
1793 * This function creates the llog according with llog_handle::lgh_obj
1794 * and llog_handle::lgh_name.
1796 * \param[in] env execution environment
1797 * \param[in] res llog handle of the current llog
1798 * \param[in] th current transaction handle
1800 * \retval 0 on successful create
1801 * \retval negative value on error
1803 static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
1806 struct llog_thread_info *lgi = llog_info(env);
1807 struct dt_insert_rec *rec = &lgi->lgi_dt_rec;
1808 struct local_oid_storage *los;
1809 struct dt_object *o;
1818 /* llog can be already created */
1819 if (dt_object_exists(o))
1822 if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1823 struct llog_thread_info *lgi = llog_info(env);
1825 lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE | LA_TYPE;
1826 lgi->lgi_attr.la_size = 0;
1827 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1828 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
1830 dt_write_lock(env, o, 0);
1831 rc = dt_create(env, o, &lgi->lgi_attr, NULL,
1833 dt_write_unlock(env, o);
1837 rc = llog_osd_regular_fid_add_name_entry(env, o, th, false);
1842 los = res->private_data;
1845 dt_write_lock(env, o, 0);
1846 if (!dt_object_exists(o))
1847 rc = llog_osd_create_new_object(env, los, o, th);
1851 dt_write_unlock(env, o);
1855 if (res->lgh_name) {
1856 struct dt_object *llog_dir;
1858 llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
1859 if (IS_ERR(llog_dir))
1860 RETURN(PTR_ERR(llog_dir));
1862 logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
1863 rec->rec_fid = &lgi->lgi_fid;
1864 rec->rec_type = S_IFREG;
1865 dt_read_lock(env, llog_dir, 0);
1866 rc = dt_insert(env, llog_dir, (struct dt_rec *)rec,
1867 (struct dt_key *)res->lgh_name, th);
1868 dt_read_unlock(env, llog_dir);
1869 dt_object_put(env, llog_dir);
1871 CERROR("%s: can't create named llog %s: rc = %d\n",
1872 o->do_lu.lo_dev->ld_obd->obd_name,
1879 * Implementation of the llog_operations::lop_close
1881 * This function closes the llog. It just put llog object and referenced
1884 * \param[in] env execution environment
1885 * \param[in] handle llog handle of the current llog
1887 * \retval 0 on successful llog close
1888 * \retval negative value on error
1890 static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
// Releases what the handle holds. The llog object must be set
// (asserted below); how it is released depends on the context flags.
1892 struct local_oid_storage *los;
1897 LASSERT(handle->lgh_obj);
// Normal-FID contexts: the object is put with the "nocache" variant and the
// handle is expected to carry no private_data.
1899 if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1900 /* Remove the object from the cache, otherwise it may
1901 * hold LOD being released during cleanup process */
1902 dt_object_put_nocache(env, handle->lgh_obj);
1903 LASSERT(handle->private_data == NULL);
// Other contexts: plain put of the object, then private_data is picked up
// as the local_oid_storage of this llog.
1906 dt_object_put(env, handle->lgh_obj);
1908 los = handle->private_data;
// The name buffer is freed with the size of the string it holds plus its NUL.
1912 OBD_FREE(handle->lgh_name, strlen(handle->lgh_name) + 1);
1918 * delete llog object name entry
1920 * Delete llog object (with regular FID) from name space (under
1923 * \param [in] env execution environment
1924 * \param [in] dto llog object
1925 * \param [in] th thandle
1926 * \param [in] declare if it is declare or execution
1928 * \retval 0 if deletion succeeds.
1929 * \retval negative errno if deletion fails.
1932 llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
1933 struct dt_object *dto,
1934 struct thandle *th, bool declare)
1936 struct llog_thread_info *lgi = llog_info(env);
1937 const struct lu_fid *fid = lu_object_fid(&dto->do_lu);
1938 struct dt_object *dir;
1939 char *name = lgi->lgi_name;
1943 if (!fid_is_norm(fid))
1946 dir = llog_osd_get_regular_fid_dir(env, dto);
1948 RETURN(PTR_ERR(dir));
1950 snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
1951 dt_write_lock(env, dir, 0);
1953 rc = dt_declare_delete(env, dir, (struct dt_key *)name,
1956 rc = dt_delete(env, dir, (struct dt_key *)name, th);
1958 dt_write_unlock(env, dir);
1960 dt_object_put(env, dir);
1965 * Implementation of the llog_operations::lop_declare_destroy
1967 * This function declare destroys the llog and deletes also entry in the
1968 * llog directory in case of named llog. Llog should be opened prior that.
1970 * \param[in] env execution environment
1971 * \param[in] loghandle llog handle of the current llog
1973 * \retval 0 on successful destroy
1974 * \retval negative value on error
1976 static int llog_osd_declare_destroy(const struct lu_env *env,
1977 struct llog_handle *loghandle,
1980 struct llog_ctxt *ctxt;
1981 struct dt_object *o, *llog_dir = NULL;
1986 ctxt = loghandle->lgh_ctxt;
1989 o = loghandle->lgh_obj;
1992 if (loghandle->lgh_name) {
1993 llog_dir = llog_osd_dir_get(env, ctxt);
1994 if (IS_ERR(llog_dir))
1995 RETURN(PTR_ERR(llog_dir));
1997 rc = dt_declare_delete(env, llog_dir,
1998 (struct dt_key *)loghandle->lgh_name,
2004 rc = dt_declare_ref_del(env, o, th);
2008 rc = dt_declare_destroy(env, o, th);
2012 if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
2013 rc = llog_osd_regular_fid_del_name_entry(env, o, th, true);
2019 if (!(IS_ERR_OR_NULL(llog_dir)))
2020 dt_object_put(env, llog_dir);
2027 * Implementation of the llog_operations::lop_destroy
2029 * This function destroys the llog and deletes also entry in the
2030 * llog directory in case of named llog. Llog should be opened prior that.
2031 * Destroy method is not part of external transaction and does everything
2034 * \param[in] env execution environment
2035 * \param[in] loghandle llog handle of the current llog
2037 * \retval 0 on successful destroy
2038 * \retval negative value on error
2040 static int llog_osd_destroy(const struct lu_env *env,
2041 struct llog_handle *loghandle, struct thandle *th)
2043 struct llog_ctxt *ctxt;
2044 struct dt_object *o, *llog_dir = NULL;
2049 ctxt = loghandle->lgh_ctxt;
2050 LASSERT(ctxt != NULL);
2052 o = loghandle->lgh_obj;
2055 dt_write_lock(env, o, 0);
2056 if (!llog_osd_exist(loghandle))
2057 GOTO(out_unlock, rc = 0);
2059 if (loghandle->lgh_name) {
2060 llog_dir = llog_osd_dir_get(env, ctxt);
2061 if (IS_ERR(llog_dir))
2062 GOTO(out_unlock, rc = PTR_ERR(llog_dir));
2064 dt_read_lock(env, llog_dir, 0);
2065 rc = dt_delete(env, llog_dir,
2066 (struct dt_key *)loghandle->lgh_name,
2068 dt_read_unlock(env, llog_dir);
2070 CERROR("%s: can't remove llog %s: rc = %d\n",
2071 o->do_lu.lo_dev->ld_obd->obd_name,
2072 loghandle->lgh_name, rc);
2073 GOTO(out_unlock, rc);
2077 dt_ref_del(env, o, th);
2078 rc = dt_destroy(env, o, th);
2080 GOTO(out_unlock, rc);
2082 loghandle->lgh_destroyed = true;
2083 if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
2084 rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
2086 GOTO(out_unlock, rc);
2090 dt_write_unlock(env, o);
2091 if (!(IS_ERR_OR_NULL(llog_dir)))
2092 dt_object_put(env, llog_dir);
2097 * Implementation of the llog_operations::lop_setup
2099 * This function setup the llog on local storage.
2101 * \param[in] env execution environment
2102 * \param[in] obd obd device the llog belongs to
2103 * \param[in] olg the llog group, it is always zero group now.
2104 * \param[in] ctxt_idx the llog index, it defines the purpose of this llog.
2105 * Every new llog type have to use own index.
2106 * \param[in] disk_obd the storage obd, where llog is stored.
2108 * \retval 0 on successful llog setup
2109 * \retval negative value on error
2111 static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd,
2112 struct obd_llog_group *olg, int ctxt_idx,
2113 struct obd_device *disk_obd)
2115 struct llog_thread_info *lgi = llog_info(env);
2116 struct llog_ctxt *ctxt;
2121 LASSERT(olg->olg_ctxts[ctxt_idx]);
2123 ctxt = llog_ctxt_get(olg->olg_ctxts[ctxt_idx]);
2126 if (disk_obd == NULL)
2129 /* initialize data allowing to generate new fids,
2130 * literally we need a sequece */
2131 lgi->lgi_fid.f_seq = FID_SEQ_LLOG;
2132 lgi->lgi_fid.f_oid = 1;
2133 lgi->lgi_fid.f_ver = 0;
2134 rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
2136 &ctxt->loc_los_nameless);
2140 lgi->lgi_fid.f_seq = FID_SEQ_LLOG_NAME;
2141 lgi->lgi_fid.f_oid = 1;
2142 lgi->lgi_fid.f_ver = 0;
2143 rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
2145 &ctxt->loc_los_named);
2147 local_oid_storage_fini(env, ctxt->loc_los_nameless);
2148 ctxt->loc_los_nameless = NULL;
2154 llog_ctxt_put(ctxt);
2159 * Implementation of the llog_operations::lop_cleanup
2161 * This function cleanups the llog on local storage.
2163 * \param[in] env execution environment
2164 * \param[in] ctxt the llog context
2168 static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
2170 if (ctxt->loc_los_nameless != NULL) {
2171 local_oid_storage_fini(env, ctxt->loc_los_nameless);
2172 ctxt->loc_los_nameless = NULL;
2175 if (ctxt->loc_los_named != NULL) {
2176 local_oid_storage_fini(env, ctxt->loc_los_named);
2177 ctxt->loc_los_named = NULL;
2183 const struct llog_operations llog_osd_ops = {
2184 .lop_next_block = llog_osd_next_block,
2185 .lop_prev_block = llog_osd_prev_block,
2186 .lop_read_header = llog_osd_read_header,
2187 .lop_declare_destroy = llog_osd_declare_destroy,
2188 .lop_destroy = llog_osd_destroy,
2189 .lop_setup = llog_osd_setup,
2190 .lop_cleanup = llog_osd_cleanup,
2191 .lop_open = llog_osd_open,
2192 .lop_exist = llog_osd_exist,
2193 .lop_declare_create = llog_osd_declare_create,
2194 .lop_create = llog_osd_create,
2195 .lop_declare_write_rec = llog_osd_declare_write_rec,
2196 .lop_write_rec = llog_osd_write_rec,
2197 .lop_close = llog_osd_close,
2199 EXPORT_SYMBOL(llog_osd_ops);
2201 const struct llog_operations llog_common_cat_ops = {
2202 .lop_next_block = llog_osd_next_block,
2203 .lop_prev_block = llog_osd_prev_block,
2204 .lop_read_header = llog_osd_read_header,
2205 .lop_declare_destroy = llog_osd_declare_destroy,
2206 .lop_destroy = llog_osd_destroy,
2207 .lop_setup = llog_osd_setup,
2208 .lop_cleanup = llog_osd_cleanup,
2209 .lop_open = llog_osd_open,
2210 .lop_exist = llog_osd_exist,
2211 .lop_declare_create = llog_osd_declare_create,
2212 .lop_create = llog_osd_create,
2213 .lop_declare_write_rec = llog_osd_declare_write_rec,
2214 .lop_write_rec = llog_osd_write_rec,
2215 .lop_close = llog_osd_close,
2216 .lop_add = llog_cat_add_rec,
2217 .lop_declare_add = llog_cat_declare_add_rec,
2219 EXPORT_SYMBOL(llog_common_cat_ops);
2222 * Read the special file which contains the list of llog catalogs IDs
2224 * This function reads the CATALOGS file which contains the array of llog
2225 * catalogs IDs. The main purpose of this file is to store OSP llogs indexed
2226 * by OST/MDT number.
2228 * \param[in] env execution environment
2229 * \param[in] d corresponding storage device
2230 * \param[in] idx position to start from, usually OST/MDT index
2231 * \param[in] count how many catalog IDs to read
2232 * \param[out] idarray the buffer for the data. If it is NULL then
2233 * function returns just number of catalog IDs
2235 * \param[in] fid LLOG_CATALOGS_OID for CATALOG object
2237 * \retval 0 on successful read of catalog IDs
2238 * \retval negative value on error
2239 * \retval positive value which is number of records in
2240 * the file if \a idarray is NULL
2242 int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
2243 int idx, int count, struct llog_catid *idarray,
2244 const struct lu_fid *fid)
// Reads up to count catalog IDs, starting at slot idx, from the object
// identified by fid on device d. The object is created first when it does
// not exist yet.
2246 struct llog_thread_info *lgi = llog_info(env);
2247 struct dt_object *o = NULL;
// The file is treated as an array of struct llog_catid: byte size of the
// request and byte offset of slot idx.
2255 size = sizeof(*idarray) * count;
2256 lgi->lgi_off = idx * sizeof(*idarray);
2258 lgi->lgi_fid = *fid;
2259 o = dt_locate(env, d, &lgi->lgi_fid);
// Missing object: create it as a regular file inside a local transaction
// of its own.
2263 if (!dt_object_exists(o)) {
2264 th = dt_trans_create(env, d);
2266 GOTO(out, rc = PTR_ERR(th));
2268 lgi->lgi_attr.la_valid = LA_MODE | LA_TYPE;
2269 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
2270 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
2272 th->th_wait_submit = 1;
2273 /* Make the llog object creation synchronization, so
2274 * it will be reliable to the reference, especially
2275 * for remote reference */
2278 rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
2281 GOTO(out_trans, rc);
2283 rc = dt_trans_start_local(env, d, th);
2285 GOTO(out_trans, rc);
// Existence is checked again under the write lock before creating.
2287 dt_write_lock(env, o, 0);
2288 if (!dt_object_exists(o))
2289 rc = dt_create(env, o, &lgi->lgi_attr, NULL,
2291 dt_write_unlock(env, o);
2293 dt_trans_stop(env, d, th);
// The attributes give the file mode (must be a regular file) and its size.
2298 rc = dt_attr_get(env, o, &lgi->lgi_attr);
2302 if (!S_ISREG(lgi->lgi_attr.la_mode)) {
2303 CERROR("%s: CATALOGS is not a regular file!: mode = %o\n",
2304 o->do_lu.lo_dev->ld_obd->obd_name,
2305 lgi->lgi_attr.la_mode);
2306 GOTO(out, rc = -ENOENT);
2309 CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
2310 (int)lgi->lgi_attr.la_size, size);
2312 /* return just number of llogs */
2313 if (idarray == NULL) {
2314 rc = lgi->lgi_attr.la_size / sizeof(*idarray);
2318 /* read for new ost index or for empty file */
// The caller's array is zeroed first, so slots beyond the end of the file
// stay zero; the read size is clamped to what the file holds past the offset.
2319 memset(idarray, 0, size);
2320 if (lgi->lgi_attr.la_size <= lgi->lgi_off)
2322 if (lgi->lgi_attr.la_size < lgi->lgi_off + size)
2323 size = lgi->lgi_attr.la_size - lgi->lgi_off;
2325 lgi->lgi_buf.lb_buf = idarray;
2326 lgi->lgi_buf.lb_len = size;
2327 rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
2328 /* -EFAULT means the llog is a sparse file. This is not an error
2329 * after arbitrary OST index is supported. */
// Only failures other than -EFAULT are logged as read errors here.
2330 if (rc < 0 && rc != -EFAULT) {
2331 CERROR("%s: error reading CATALOGS: rc = %d\n",
2332 o->do_lu.lo_dev->ld_obd->obd_name, rc);
2338 dt_object_put(env, o);
2341 EXPORT_SYMBOL(llog_osd_get_cat_list);
2344 * Write the special file which contains the list of llog catalogs IDs
2346 * This function writes the CATALOG file which contains the array of llog
2347 * catalogs IDs. It is used mostly to store OSP llogs indexed by OST/MDT
2350 * \param[in] env execution environment
2351 * \param[in] d corresponding storage device
2352 * \param[in] idx position to start from, usually OST/MDT index
2353 * \param[in] count how many catalog IDs to write
2354 * \param[out] idarray the buffer with the data to write.
2355 * \param[in] fid LLOG_CATALOGS_OID for CATALOG object
2357 * \retval 0 on successful write of catalog IDs
2358 * \retval negative value on error
2360 int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
2361 int idx, int count, struct llog_catid *idarray,
2362 const struct lu_fid *fid)
2364 struct llog_thread_info *lgi = llog_info(env);
2365 struct dt_object *o = NULL;
2374 size = sizeof(*idarray) * count;
2375 lgi->lgi_off = idx * sizeof(*idarray);
2376 lgi->lgi_fid = *fid;
2378 o = dt_locate(env, d, &lgi->lgi_fid);
2382 if (!dt_object_exists(o))
2383 GOTO(out, rc = -ENOENT);
2385 rc = dt_attr_get(env, o, &lgi->lgi_attr);
2389 if (!S_ISREG(lgi->lgi_attr.la_mode)) {
2390 CERROR("%s: CATALOGS is not a regular file!: mode = %o\n",
2391 o->do_lu.lo_dev->ld_obd->obd_name,
2392 lgi->lgi_attr.la_mode);
2393 GOTO(out, rc = -ENOENT);
2396 th = dt_trans_create(env, d);
2398 GOTO(out, rc = PTR_ERR(th));
2400 lgi->lgi_buf.lb_len = size;
2401 lgi->lgi_buf.lb_buf = idarray;
2402 rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
2404 GOTO(out_trans, rc);
2406 /* For update log, this happens during initialization,
2407 * see lod_sub_prep_llog(), and we need make sure catlog
2408 * file ID is written to catlist file(committed) before
2409 * cross-MDT operation write update records to catlog FILE,
2410 * otherwise, during failover these update records might
2412 if (fid_is_update_log(fid))
2415 rc = dt_trans_start_local(env, d, th);
2417 GOTO(out_trans, rc);
2419 th->th_wait_submit = 1;
2421 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
2423 CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n",
2426 dt_trans_stop(env, d, th);
2428 dt_object_put(env, o);
2431 EXPORT_SYMBOL(llog_osd_put_cat_list);