Whamcloud - gitweb
LU-6556 obdclass: re-allow catalog to wrap around
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014 Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32 /*
33  * lustre/obdclass/llog_osd.c
34  *
35  * Low level llog routines on top of OSD API
36  *
37  * This file provides set of methods for llog operations on top of
38  * dt_device. It contains all supported llog_operations interfaces and
39  * supplemental functions.
40  *
41  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
42  * Author: Mikhail Pershin <mike.pershin@intel.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_LOG
46
47 #include <obd.h>
48 #include <obd_class.h>
49 #include <lustre_fid.h>
50 #include <dt_object.h>
51
52 #include "llog_internal.h"
53 #include "local_storage.h"
54
55 /**
56  * Implementation of the llog_operations::lop_declare_create
57  *
58  * This function is a wrapper over local_storage API function
59  * local_object_declare_create().
60  *
61  * \param[in] env       execution environment
62  * \param[in] los       local_storage for bottom storage device
63  * \param[in] o         dt_object to create
64  * \param[in] th        current transaction handle
65  *
66  * \retval              0 on successful declaration of the new object
67  * \retval              negative error if declaration was failed
68  */
69 static int llog_osd_declare_new_object(const struct lu_env *env,
70                                        struct local_oid_storage *los,
71                                        struct dt_object *o,
72                                        struct thandle *th)
73 {
74         struct llog_thread_info *lgi = llog_info(env);
75
76         lgi->lgi_attr.la_valid = LA_MODE;
77         lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
78         lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
79
80         return local_object_declare_create(env, los, o, &lgi->lgi_attr,
81                                            &lgi->lgi_dof, th);
82 }
83
84 /**
85  * Implementation of the llog_operations::lop_create
86  *
87  * This function is a wrapper over local_storage API function
88  * local_object_create().
89  *
90  * \param[in] env       execution environment
91  * \param[in] los       local_storage for bottom storage device
92  * \param[in] o         dt_object to create
93  * \param[in] th        current transaction handle
94  *
95  * \retval              0 on successful creation of the new object
96  * \retval              negative error if creation was failed
97  */
98 static int llog_osd_create_new_object(const struct lu_env *env,
99                                       struct local_oid_storage *los,
100                                       struct dt_object *o,
101                                       struct thandle *th)
102 {
103         struct llog_thread_info *lgi = llog_info(env);
104
105         lgi->lgi_attr.la_valid = LA_MODE;
106         lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
107         lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
108
109         return local_object_create(env, los, o, &lgi->lgi_attr,
110                                    &lgi->lgi_dof, th);
111 }
112
113 /**
114  * Write a padding record to the llog
115  *
116  * This function writes a padding record to the end of llog. That may
117  * be needed if llog contains records of variable size, e.g. config logs
118  * or changelogs.
119  * The padding record just aligns llog to the llog chunk_size boundary if
120  * the current record doesn't fit in the remaining space.
121  *
122  * It allocates full length to avoid two separate writes for header and tail.
123  * Such 2-steps scheme needs extra protection and complex error handling.
124  *
125  * \param[in]     env   execution environment
126  * \param[in]     o     dt_object to create
127  * \param[in,out] off   pointer to the padding start offset
128  * \param[in]     len   padding length
129  * \param[in]     index index of the padding record in a llog
130  * \param[in]     th    current transaction handle
131  *
132  * \retval              0 on successful padding write
133  * \retval              negative error if write failed
134  */
135 static int llog_osd_pad(const struct lu_env *env, struct dt_object *o,
136                         loff_t *off, int len, int index, struct thandle *th)
137 {
138         struct llog_thread_info *lgi = llog_info(env);
139         struct llog_rec_hdr     *rec;
140         struct llog_rec_tail    *tail;
141         int                      rc;
142
143         ENTRY;
144
145         LASSERT(th);
146         LASSERT(off);
147         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
148
149         OBD_ALLOC(rec, len);
150         if (rec == NULL)
151                 RETURN(-ENOMEM);
152
153         rec->lrh_len = len;
154         rec->lrh_index = index;
155         rec->lrh_type = LLOG_PAD_MAGIC;
156
157         tail = rec_tail(rec);
158         tail->lrt_len = len;
159         tail->lrt_index = index;
160
161         lgi->lgi_buf.lb_buf = rec;
162         lgi->lgi_buf.lb_len = len;
163         rc = dt_record_write(env, o, &lgi->lgi_buf, off, th);
164         if (rc)
165                 CERROR("%s: error writing padding record: rc = %d\n",
166                        o->do_lu.lo_dev->ld_obd->obd_name, rc);
167
168         OBD_FREE(rec, len);
169         RETURN(rc);
170 }
171
172 /**
173  * Implementation of the llog_operations::lop_read_header
174  *
175  * This function reads the current llog header from the bottom storage
176  * device.
177  *
178  * \param[in] env       execution environment
179  * \param[in] handle    llog handle of the current llog
180  *
181  * \retval              0 on successful header read
182  * \retval              negative error if read failed
183  */
184 static int llog_osd_read_header(const struct lu_env *env,
185                                 struct llog_handle *handle)
186 {
187         struct llog_rec_hdr     *llh_hdr;
188         struct dt_object        *o;
189         struct llog_thread_info *lgi;
190         enum llog_flag           flags;
191         int                      rc;
192
193         ENTRY;
194
195         o = handle->lgh_obj;
196         LASSERT(o);
197
198         lgi = llog_info(env);
199
200         rc = dt_attr_get(env, o, &lgi->lgi_attr);
201         if (rc)
202                 RETURN(rc);
203
204         LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
205
206         if (lgi->lgi_attr.la_size == 0) {
207                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
208                 RETURN(LLOG_EEMPTY);
209         }
210
211         flags = handle->lgh_hdr->llh_flags;
212
213         lgi->lgi_off = 0;
214         lgi->lgi_buf.lb_buf = handle->lgh_hdr;
215         lgi->lgi_buf.lb_len = handle->lgh_hdr_size;
216         rc = dt_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
217         llh_hdr = &handle->lgh_hdr->llh_hdr;
218         if (rc < sizeof(*llh_hdr) || rc < llh_hdr->lrh_len) {
219                 CERROR("%s: error reading "DFID" log header size %d: rc = %d\n",
220                        o->do_lu.lo_dev->ld_obd->obd_name,
221                        PFID(lu_object_fid(&o->do_lu)), rc < 0 ? 0 : rc,
222                        -EFAULT);
223
224                 if (rc >= 0)
225                         rc = -EFAULT;
226
227                 RETURN(rc);
228         }
229
230         if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
231                 lustre_swab_llog_hdr(handle->lgh_hdr);
232
233         if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
234                 CERROR("%s: bad log %s "DFID" header magic: %#x "
235                        "(expected %#x)\n", o->do_lu.lo_dev->ld_obd->obd_name,
236                        handle->lgh_name ? handle->lgh_name : "",
237                        PFID(lu_object_fid(&o->do_lu)),
238                        llh_hdr->lrh_type, LLOG_HDR_MAGIC);
239                 RETURN(-EIO);
240         } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
241                    llh_hdr->lrh_len > handle->lgh_hdr_size) {
242                 CERROR("%s: incorrectly sized log %s "DFID" header: "
243                        "%#x (expected at least %#x)\n"
244                        "you may need to re-run lconf --write_conf.\n",
245                        o->do_lu.lo_dev->ld_obd->obd_name,
246                        handle->lgh_name ? handle->lgh_name : "",
247                        PFID(lu_object_fid(&o->do_lu)),
248                        llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
249                 RETURN(-EIO);
250         } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index >
251                    LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) ||
252                    LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len !=
253                         llh_hdr->lrh_len) {
254                 CERROR("%s: incorrectly sized log %s "DFID" tailer: "
255                        "%#x : rc = %d\n",
256                        o->do_lu.lo_dev->ld_obd->obd_name,
257                        handle->lgh_name ? handle->lgh_name : "",
258                        PFID(lu_object_fid(&o->do_lu)),
259                        LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO);
260                 RETURN(-EIO);
261         }
262
263         handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
264         handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
265
266         RETURN(0);
267 }
268
269 /**
270  * Implementation of the llog_operations::lop_declare_write
271  *
272  * This function declares the new record write.
273  *
274  * \param[in] env       execution environment
275  * \param[in] loghandle llog handle of the current llog
276  * \param[in] rec       llog record header. This is a real header of the full
277  *                      llog record to write. This is the beginning of buffer
278  *                      to write, the length of buffer is stored in
279  *                      \a rec::lrh_len
280  * \param[in] idx       index of the llog record. If \a idx == -1 then this is
281  *                      append case, otherwise \a idx is the index of record
282  *                      to modify
283  * \param[in] th        current transaction handle
284  *
285  * \retval              0 on successful declaration
286  * \retval              negative error if declaration failed
287  */
288 static int llog_osd_declare_write_rec(const struct lu_env *env,
289                                       struct llog_handle *loghandle,
290                                       struct llog_rec_hdr *rec,
291                                       int idx, struct thandle *th)
292 {
293         struct llog_thread_info *lgi = llog_info(env);
294         __u32                   chunk_size;
295         struct dt_object        *o;
296         int                      rc;
297
298         ENTRY;
299
300         LASSERT(env);
301         LASSERT(th);
302         LASSERT(loghandle);
303         LASSERT(rec);
304         LASSERT(rec->lrh_len <= loghandle->lgh_ctxt->loc_chunk_size);
305
306         o = loghandle->lgh_obj;
307         LASSERT(o);
308
309         chunk_size = loghandle->lgh_ctxt->loc_chunk_size;
310         lgi->lgi_buf.lb_len = chunk_size;
311         lgi->lgi_buf.lb_buf = NULL;
312         /* each time we update header */
313         rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0,
314                                      th);
315         if (rc || idx == 0) /* if error or just header */
316                 RETURN(rc);
317
318         /**
319          * the pad record can be inserted so take into account double
320          * record size
321          */
322         lgi->lgi_buf.lb_len = chunk_size * 2;
323         lgi->lgi_buf.lb_buf = NULL;
324         /* XXX: implement declared window or multi-chunks approach */
325         rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th);
326
327         RETURN(rc);
328 }
329
330 /**
331  * Implementation of the llog_operations::lop_write
332  *
333  * This function writes a new record to the llog or modifies an existing one.
334  *
335  * \param[in]  env              execution environment
336  * \param[in]  loghandle        llog handle of the current llog
337  * \param[in]  rec              llog record header. This is a real header of
338  *                              the full llog record to write. This is
339  *                              the beginning of buffer to write, the length
340  *                              of buffer is stored in \a rec::lrh_len
341  * \param[out] reccookie        pointer to the cookie to return back if needed.
342  *                              It is used for further cancel of this llog
343  *                              record.
344  * \param[in]  idx              index of the llog record. If \a idx == -1 then
345  *                              this is append case, otherwise \a idx is
346  *                              the index of record to modify
347  * \param[in]  th               current transaction handle
348  *
349  * \retval                      0 on successful write && \a reccookie == NULL
350  *                              1 on successful write && \a reccookie != NULL
     *                              (a header update, \a idx == LLOG_HEADER_IDX,
     *                              returns the result of dt_record_write()
     *                              whether or not \a reccookie is set)
351  * \retval                      negative error if write failed
     * \retval                      -E2BIG if the record is longer than the llog
     *                              header length
     * \retval                      -ENOENT if the record to modify is not set in
     *                              the llog bitmap
     * \retval                      -EFAULT if \a idx does not match the record or
     *                              the record offset can't be determined
     * \retval                      -ENOSPC if a llog which is not a catalog has
     *                              no free index left
352  */
353 static int llog_osd_write_rec(const struct lu_env *env,
354                               struct llog_handle *loghandle,
355                               struct llog_rec_hdr *rec,
356                               struct llog_cookie *reccookie,
357                               int idx, struct thandle *th)
358 {
359         struct llog_thread_info *lgi = llog_info(env);
360         struct llog_log_hdr     *llh;
361         int                      reclen = rec->lrh_len;
362         int                      index, rc;
363         struct llog_rec_tail    *lrt;
364         struct dt_object        *o;
365         __u32                   chunk_size;
366         size_t                   left;
367
368         ENTRY;
369
370         LASSERT(env);
371         llh = loghandle->lgh_hdr;
372         LASSERT(llh);
373         o = loghandle->lgh_obj;
374         LASSERT(o);
375         LASSERT(th);
376
377         chunk_size = llh->llh_hdr.lrh_len; /* header length is used as the chunk size */
378         CDEBUG(D_OTHER, "new record %x to "DFID"\n",
379                rec->lrh_type, PFID(lu_object_fid(&o->do_lu)));
380
381         /* record length should not be bigger than the llog header length */
382         if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
383                 RETURN(-E2BIG);
384
385         /* sanity check for fixed-records llog */
386         if (idx != LLOG_HEADER_IDX && (llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
387                 LASSERT(llh->llh_size != 0);
388                 LASSERT(llh->llh_size == reclen);
389         }
390
391         rc = dt_attr_get(env, o, &lgi->lgi_attr);
392         if (rc)
393                 RETURN(rc);
394
395         /**
396          * The modification case.
397          * If idx set then the record with that index must be modified.
398          * There are three cases possible:
399          * 1) the common case is the llog header update (idx == 0)
400          * 2) the llog record modification during llog process.
401          *    This is indicated by the \a loghandle::lgh_cur_idx > 0.
402          *    In that case the \a loghandle::lgh_cur_offset is used as
            *    the offset of the record, provided that \a idx is equal to
            *    \a loghandle::lgh_cur_idx
403          * 3) otherwise this is assumed that llog consist of records of
404          *    fixed size, i.e. catalog. The llog header must has llh_size
405          *    field equal to record size. The record offset is calculated
406          *    just by \a idx value
407          *
408          * During modification we don't need extra header update because
409          * the bitmap and record count are not changed. The record header
410          * and tail remains the same too.
411          */
412         if (idx != LLOG_NEXT_IDX) {
413                 /* llog can be empty only when first record is being written */
414                 LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0));
415
416                 if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
417                         CERROR("%s: modify unset record %u\n",
418                                o->do_lu.lo_dev->ld_obd->obd_name, idx);
419                         RETURN(-ENOENT);
420                 }
421
422                 if (idx != rec->lrh_index) {
423                         CERROR("%s: modify index mismatch %d %u\n",
424                                o->do_lu.lo_dev->ld_obd->obd_name, idx,
425                                rec->lrh_index);
426                         RETURN(-EFAULT);
427                 }
428
429                 if (idx == LLOG_HEADER_IDX) {
430                         /* llog header update */
431                         __u32   *bitmap = LLOG_HDR_BITMAP(llh);
432
433                         lgi->lgi_off = 0;
434
435                         /* If it does not indicate the bitmap index
436                          * (reccookie == NULL), then it means update
437                          * the whole update header. Otherwise only
438                          * update header and bits needs to be updated,
439                          * and in DNE cases, it will significantly
440                          * shrink the RPC size.
441                          * see distribute_txn_cancel_records()*/
442                         if (reccookie == NULL) {
443                                 lgi->lgi_buf.lb_len = reclen;
444                                 lgi->lgi_buf.lb_buf = rec;
445                                 rc = dt_record_write(env, o, &lgi->lgi_buf,
446                                                      &lgi->lgi_off, th);
447                                 RETURN(rc);
448                         }
449
450                         /* update the header, i.e. everything in front of
                            * the bitmap */
451                         lgi->lgi_buf.lb_len = llh->llh_bitmap_offset;
452                         lgi->lgi_buf.lb_buf = llh;
453                         rc = dt_record_write(env, o, &lgi->lgi_buf,
454                                              &lgi->lgi_off, th);
455                         if (rc != 0)
456                                 RETURN(rc);
457
458                         /* update the bitmap: only the bitmap word which
                            * holds the bit of reccookie->lgc_index is written */
459                         index = reccookie->lgc_index;
460                         lgi->lgi_off = llh->llh_bitmap_offset +
461                                       (index / (sizeof(*bitmap) * 8)) *
462                                                         sizeof(*bitmap);
463                         lgi->lgi_buf.lb_len = sizeof(*bitmap);
464                         lgi->lgi_buf.lb_buf =
465                                         &bitmap[index/(sizeof(*bitmap)*8)];
466                         rc = dt_record_write(env, o, &lgi->lgi_buf,
467                                              &lgi->lgi_off, th);
468
469                         RETURN(rc);
470                 } else if (loghandle->lgh_cur_idx > 0) {
471                         /**
472                          * The lgh_cur_offset can be used only if index is
473                          * the same.
474                          */
475                         if (idx != loghandle->lgh_cur_idx) {
476                                 CERROR("%s: modify index mismatch %d %d\n",
477                                        o->do_lu.lo_dev->ld_obd->obd_name, idx,
478                                        loghandle->lgh_cur_idx);
479                                 RETURN(-EFAULT);
480                         }
481
482                         lgi->lgi_off = loghandle->lgh_cur_offset;
483                         CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, "
484                                "len:%u offset %llu\n",
485                                POSTID(&loghandle->lgh_id.lgl_oi), idx,
486                                rec->lrh_len, (long long)lgi->lgi_off);
487                 } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                            /* fixed-size records follow the header back to
                             * back, the first record has index 1 */
488                         lgi->lgi_off = llh->llh_hdr.lrh_len +
489                                        (idx - 1) * reclen;
490                 } else {
491                         /* This can be result of lgh_cur_idx is not set during
492                          * llog processing or llh_size is not set to proper
493                          * record size for fixed records llog. Therefore it is
494                          * impossible to get record offset. */
495                         CERROR("%s: can't get record offset, idx:%d, "
496                                "len:%u.\n", o->do_lu.lo_dev->ld_obd->obd_name,
497                                idx, rec->lrh_len);
498                         RETURN(-EFAULT);
499                 }
500
501                 /* update only data, header and tail remain the same */
502                 lgi->lgi_off += sizeof(struct llog_rec_hdr);
503                 lgi->lgi_buf.lb_len = REC_DATA_LEN(rec);
504                 lgi->lgi_buf.lb_buf = REC_DATA(rec);
505                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
506                 if (rc == 0 && reccookie) {
507                         reccookie->lgc_lgl = loghandle->lgh_id;
508                         reccookie->lgc_index = idx;
509                         rc = 1;
510                 }
511                 RETURN(rc);
512         }
513
514         /**
515          * The append case.
516          * The most common case of using llog. The new index is assigned to
517          * the new record, new bit is set in llog bitmap and llog count is
518          * incremented.
519          *
520          * Make sure that records don't cross a chunk boundary, so we can
521          * process them page-at-a-time if needed.  If it will cross a chunk
522          * boundary, write in a fake (but referenced) entry to pad the chunk.
523          */
524         LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
525         lgi->lgi_off = lgi->lgi_attr.la_size;
526         left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
527         /* NOTE: padding is a record, but no bit is set.
            * NOTE(review): left is always within [1, chunk_size] here, so
            * the "left != 0" test below can never fail. */
528         if (left != 0 && left != reclen &&
529             left < (reclen + LLOG_MIN_REC_SIZE)) {
530                 index = loghandle->lgh_last_idx + 1;
531                 rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
532                 if (rc)
533                         RETURN(rc);
534                 loghandle->lgh_last_idx++; /* for pad rec */
535         }
536         /* if it's the last idx in log file, then return -ENOSPC
537          * or wrap around if a catalog */
538         if ((loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
539             unlikely(llh->llh_flags & LLOG_F_IS_CAT &&
540                      OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
541                      loghandle->lgh_last_idx >= cfs_fail_val)) {
542                 if (llh->llh_flags & LLOG_F_IS_CAT)
543                         loghandle->lgh_last_idx = 0;
544                 else
545                         RETURN(-ENOSPC);
546         }
547
548         /* increment the last_idx along with llh_tail index, they should
549          * be equal for a llog lifetime */
550         loghandle->lgh_last_idx++;
551         index = loghandle->lgh_last_idx;
552         LLOG_HDR_TAIL(llh)->lrt_index = index;
553         /**
554          * NB: the caller should make sure only 1 process access
555          * the lgh_last_idx, e.g. append should be exclusive.
556          * Otherwise it might hit the assert.
557          */
558         LASSERT(index < LLOG_HDR_BITMAP_SIZE(llh));
559         rec->lrh_index = index;
560         lrt = rec_tail(rec);
561         lrt->lrt_len = rec->lrh_len;
562         lrt->lrt_index = rec->lrh_index;
563
564         /* the lgh_hdr_mutex protects llog header data from concurrent
565          * update/cancel, the llh_count and llh_bitmap are protected */
566         mutex_lock(&loghandle->lgh_hdr_mutex);
567         if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) {
568                 CERROR("%s: index %u already set in log bitmap\n",
569                        o->do_lu.lo_dev->ld_obd->obd_name, index);
570                 mutex_unlock(&loghandle->lgh_hdr_mutex);
571                 LBUG(); /* should never happen */
572         }
573         llh->llh_count++;
574
575         if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
576                 /* Update the minimum size of the llog record */
577                 if (llh->llh_size == 0)
578                         llh->llh_size = reclen;
579                 else if (reclen < llh->llh_size)
580                         llh->llh_size = reclen;
581         }
582
583         if (lgi->lgi_attr.la_size == 0) { /* the object was empty: write the whole header */
584                 lgi->lgi_off = 0;
585                 lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
586                 lgi->lgi_buf.lb_buf = &llh->llh_hdr;
587                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
588                 if (rc != 0)
589                         GOTO(out_unlock, rc);
590         } else {
591                 __u32   *bitmap = LLOG_HDR_BITMAP(llh);
592
593                 /* Note: If this is not initialization (size == 0), then do not
594                  * write the whole header (8k bytes), only update header/tail
595                  * and bits needs to be updated. Because this update might be
596                  * part of cross-MDT operation, which needs to write these
597                  * updates into the update log(32KB limit) and also pack inside
598                  * the RPC (1MB limit), if we write 8K for each operation, which
599                  * will cost a lot space, and keep us adding more updates to one
600                  * update log.*/
                    /* 1) the part of the header in front of the bitmap */
601                 lgi->lgi_off = 0;
602                 lgi->lgi_buf.lb_len = llh->llh_bitmap_offset;
603                 lgi->lgi_buf.lb_buf = &llh->llh_hdr;
604                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
605                 if (rc != 0)
606                         GOTO(out_unlock, rc);
607
                    /* 2) the bitmap word which holds the bit of the new index */
608                 lgi->lgi_off = llh->llh_bitmap_offset +
609                               (index / (sizeof(*bitmap) * 8)) * sizeof(*bitmap);
610                 lgi->lgi_buf.lb_len = sizeof(*bitmap);
611                 lgi->lgi_buf.lb_buf = &bitmap[index/(sizeof(*bitmap)*8)];
612                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
613                 if (rc != 0)
614                         GOTO(out_unlock, rc);
615
                    /* 3) the header tail, which carries the new last index */
616                 lgi->lgi_off =  (unsigned long)LLOG_HDR_TAIL(llh) -
617                                 (unsigned long)llh;
618                 lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
619                 lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh);
620                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
621                 if (rc != 0)
622                         GOTO(out_unlock, rc);
623         }
624
625 out_unlock:
626         /* unlock here for remote object */
627         mutex_unlock(&loghandle->lgh_hdr_mutex);
628         if (rc)
629                 GOTO(out, rc);
630
631         /* computed index can be used to determine offset for fixed-size
632          * records. This also allows to handle Catalog wrap around case */
633         if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
634                 lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
635         } else {
636                 rc = dt_attr_get(env, o, &lgi->lgi_attr);
637                 if (rc)
638                         GOTO(out, rc);
639
640                 LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
                    /* write at the current object size or at lgi_off,
                     * whichever is bigger */
641                 lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size,
642                                      lgi->lgi_off);
643         }
644
645         lgi->lgi_buf.lb_len = reclen;
646         lgi->lgi_buf.lb_buf = rec;
647         rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
648         if (rc < 0)
649                 GOTO(out, rc);
650
651         CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
652                POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
653                lgi->lgi_off);
654         if (reccookie != NULL) {
                    /* fill the cookie, the subsystem depends on the record type */
655                 reccookie->lgc_lgl = loghandle->lgh_id;
656                 reccookie->lgc_index = index;
657                 if ((rec->lrh_type == MDS_UNLINK_REC) ||
658                     (rec->lrh_type == MDS_SETATTR64_REC))
659                         reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
660                 else if (rec->lrh_type == OST_SZ_REC)
661                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
662                 else
663                         reccookie->lgc_subsys = -1;
664                 rc = 1;
665         }
666         RETURN(rc);
667 out:
668         /* cleanup llog for error case: undo the bitmap bit and the record
            * count which were updated for the new record */
669         mutex_lock(&loghandle->lgh_hdr_mutex);
670         ext2_clear_bit(index, LLOG_HDR_BITMAP(llh));
671         llh->llh_count--;
672         mutex_unlock(&loghandle->lgh_hdr_mutex);
673
674         /* restore llog last_idx */
675         if (--loghandle->lgh_last_idx == 0 &&
676             (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
677                 /* catalog had just wrap-around case */
678                 loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
679         }
680         LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
681
682         RETURN(rc);
683 }
684
685 /**
686  * We can skip reading at least as many log blocks as the number of
687  * minimum sized log records we are skipping.  If it turns out
688  * that we are not far enough along the log (because the
689  * actual records are larger than minimum size) we just skip
690  * some more records.
691  *
692  * Note: in llog_process_thread, it will use bitmap offset as
693  * the index to locate the record, which also includs some pad
694  * records, whose record size is very small, and it also does not
695  * consider pad record when recording minimum record size (otherwise
696  * min_record size might be too small), so in some rare cases,
697  * it might skip too much record for @goal, see llog_osd_next_block().
698  *
699  * When force_mini_rec is true, it means we have to use LLOG_MIN_REC_SIZE
700  * as the min record size to skip over, usually because in the previous
701  * try, it skip too much record, see loog_osd_next(prev)_block().
702  */
703 static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off,
704                                   int curr, int goal, __u32 chunk_size,
705                                   bool force_mini_rec)
706 {
707         struct llog_log_hdr *llh = lgh->lgh_hdr;
708
709         /* Goal should not bigger than the record count */
710         if (goal > lgh->lgh_last_idx)
711                 goal = lgh->lgh_last_idx;
712
713         if (goal > curr) {
714                 if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
715                         *off = chunk_size + (goal - 1) * llh->llh_size;
716                 } else {
717                         __u64 min_rec_size = LLOG_MIN_REC_SIZE;
718
719                         if (llh->llh_size > 0 && !force_mini_rec)
720                                 min_rec_size = llh->llh_size;
721
722                         *off = *off + (goal - curr - 1) * min_rec_size;
723                 }
724         }
725         /* always align with lower chunk boundary*/
726         *off &= ~(chunk_size - 1);
727 }
728
729 /**
730  * Remove optional fields that the client doesn't expect.
731  * This is typically in order to ensure compatibility with older clients.
732  * It is assumed that since we exclusively remove fields, the block will be
733  * big enough to handle the remapped records. It is also assumed that records
734  * of a block have the same format (i.e.: the same features enabled).
735  *
736  * \param[in,out]    hdr        Header of the block of records to remap.
737  * \param[in,out]    last_hdr   Last header, don't read past this point.
738  * \param[in]        flags      Flags describing the fields to keep.
739  */
740 static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
741                                      struct llog_rec_hdr *last_hdr,
742                                      enum changelog_rec_flags flags)
743 {
744         if (hdr->lrh_type != CHANGELOG_REC)
745                 return;
746
747         do {
748                 struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1);
749
750                 changelog_remap_rec(rec, rec->cr_flags & flags);
751                 hdr = llog_rec_hdr_next(hdr);
752         } while ((char *)hdr <= (char *)last_hdr);
753 }
754
755 /**
756  * Implementation of the llog_operations::lop_next_block
757  *
758  * This function finds the the next llog block to return which contains
759  * record with required index. It is main part of llog processing.
760  *
761  * \param[in]     env           execution environment
762  * \param[in]     loghandle     llog handle of the current llog
763  * \param[in,out] cur_idx       index preceeding cur_offset
764  * \param[in]     next_idx      target index to find
765  * \param[in,out] cur_offset    furtherst point read in the file
766  * \param[in]     buf           pointer to data buffer to fill
767  * \param[in]     len           required len to read, it is
768  *                              usually llog chunk_size.
769  *
770  * \retval                      0 on successful buffer read
771  * \retval                      negative value on error
772  */
773 static int llog_osd_next_block(const struct lu_env *env,
774                                struct llog_handle *loghandle, int *cur_idx,
775                                int next_idx, __u64 *cur_offset, void *buf,
776                                int len)
777 {
778         struct llog_thread_info *lgi = llog_info(env);
779         struct dt_object        *o;
780         struct dt_device        *dt;
781         int                      rc;
782         __u32                   chunk_size;
783         int last_idx = *cur_idx;
784         __u64 last_offset = *cur_offset;
785         bool force_mini_rec = false;
786
787         ENTRY;
788
789         LASSERT(env);
790         LASSERT(lgi);
791
792         chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
793         if (len == 0 || len & (chunk_size - 1))
794                 RETURN(-EINVAL);
795
796         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
797                next_idx, *cur_idx, *cur_offset);
798
799         LASSERT(loghandle);
800         LASSERT(loghandle->lgh_ctxt);
801
802         o = loghandle->lgh_obj;
803         LASSERT(o);
804         LASSERT(dt_object_exists(o));
805         dt = lu2dt_dev(o->do_lu.lo_dev);
806         LASSERT(dt);
807
808         rc = dt_attr_get(env, o, &lgi->lgi_attr);
809         if (rc)
810                 GOTO(out, rc);
811
812         while (*cur_offset < lgi->lgi_attr.la_size) {
813                 struct llog_rec_hdr     *rec, *last_rec;
814                 struct llog_rec_tail    *tail;
815
816                 llog_skip_over(loghandle, cur_offset, *cur_idx,
817                                next_idx, chunk_size, force_mini_rec);
818
819                 /* read up to next llog chunk_size block */
820                 lgi->lgi_buf.lb_len = chunk_size -
821                                       (*cur_offset & (chunk_size - 1));
822                 lgi->lgi_buf.lb_buf = buf;
823
824                 rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
825                 if (rc < 0) {
826                         if (rc == -EBADR && !force_mini_rec)
827                                 goto retry;
828
829                         CERROR("%s: can't read llog block from log "DFID
830                                " offset "LPU64": rc = %d\n",
831                                o->do_lu.lo_dev->ld_obd->obd_name,
832                                PFID(lu_object_fid(&o->do_lu)), *cur_offset,
833                                rc);
834                         GOTO(out, rc);
835                 }
836
837                 if (rc < len) {
838                         /* signal the end of the valid buffer to
839                          * llog_process */
840                         memset(buf + rc, 0, len - rc);
841                 }
842
843                 if (rc == 0) { /* end of file, nothing to do */
844                         if (!force_mini_rec)
845                                 goto retry;
846                         GOTO(out, rc);
847                 }
848
849                 if (rc < sizeof(*tail)) {
850                         if (!force_mini_rec)
851                                 goto retry;
852
853                         CERROR("%s: invalid llog block at log id "DOSTID"/%u "
854                                "offset "LPU64"\n",
855                                o->do_lu.lo_dev->ld_obd->obd_name,
856                                POSTID(&loghandle->lgh_id.lgl_oi),
857                                loghandle->lgh_id.lgl_ogen, *cur_offset);
858                         GOTO(out, rc = -EINVAL);
859                 }
860
861                 rec = buf;
862                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
863                         lustre_swab_llog_rec(rec);
864
865                 tail = (struct llog_rec_tail *)((char *)buf + rc -
866                                                 sizeof(struct llog_rec_tail));
867                 /* get the last record in block */
868                 last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
869                                                    tail->lrt_len);
870
871                 if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
872                         lustre_swab_llog_rec(last_rec);
873                 LASSERT(last_rec->lrh_index == tail->lrt_index);
874
875                 *cur_idx = tail->lrt_index;
876
877                 /* this shouldn't happen */
878                 if (tail->lrt_index == 0) {
879                         CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
880                                "offset "LPU64" bytes %d\n",
881                                o->do_lu.lo_dev->ld_obd->obd_name,
882                                POSTID(&loghandle->lgh_id.lgl_oi),
883                                loghandle->lgh_id.lgl_ogen, *cur_offset, rc);
884                         GOTO(out, rc = -EINVAL);
885                 }
886                 if (tail->lrt_index < next_idx) {
887                         last_idx = *cur_idx;
888                         last_offset = *cur_offset;
889                         continue;
890                 }
891
892                 /* sanity check that the start of the new buffer is no farther
893                  * than the record that we wanted.  This shouldn't happen. */
894                 if (rec->lrh_index > next_idx) {
895                         if (!force_mini_rec && next_idx > last_idx)
896                                 goto retry;
897
898                         CERROR("%s: missed desired record? %u > %u\n",
899                                o->do_lu.lo_dev->ld_obd->obd_name,
900                                rec->lrh_index, next_idx);
901                         GOTO(out, rc = -ENOENT);
902                 }
903
904                 /* Trim unsupported extensions for compat w/ older clients */
905                 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
906                         changelog_block_trim_ext(rec, last_rec,
907                                                  CLF_VERSION | CLF_RENAME);
908
909                 GOTO(out, rc = 0);
910
911 retry:
912                 /* Note: because there are some pad records in the
913                  * llog, so llog_skip_over() might skip too much
914                  * records, let's try skip again with minimum record */
915                 force_mini_rec = true;
916                 *cur_offset = last_offset;
917                 *cur_idx = last_idx;
918         }
919         GOTO(out, rc = -EIO);
920 out:
921         return rc;
922 }
923
924 /**
925  * Implementation of the llog_operations::lop_prev_block
926  *
927  * This function finds the llog block to return which contains
928  * record with required index but in reverse order - from end of llog
929  * to the beginning.
930  * It is main part of reverse llog processing.
931  *
932  * \param[in] env       execution environment
933  * \param[in] loghandle llog handle of the current llog
934  * \param[in] prev_idx  target index to find
935  * \param[in] buf       pointer to data buffer to fill
936  * \param[in] len       required len to read, it is llog_chunk_size usually.
937  *
938  * \retval              0 on successful buffer read
939  * \retval              negative value on error
940  */
941 static int llog_osd_prev_block(const struct lu_env *env,
942                                struct llog_handle *loghandle,
943                                int prev_idx, void *buf, int len)
944 {
945         struct llog_thread_info *lgi = llog_info(env);
946         struct dt_object        *o;
947         struct dt_device        *dt;
948         loff_t                   cur_offset;
949         __u32                   chunk_size;
950         int                      rc;
951
952         ENTRY;
953
954         chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
955         if (len == 0 || len & (chunk_size - 1))
956                 RETURN(-EINVAL);
957
958         CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
959
960         LASSERT(loghandle);
961         LASSERT(loghandle->lgh_ctxt);
962
963         o = loghandle->lgh_obj;
964         LASSERT(o);
965         LASSERT(dt_object_exists(o));
966         dt = lu2dt_dev(o->do_lu.lo_dev);
967         LASSERT(dt);
968
969         /* Let's only use mini record size for previous block read
970          * for now XXX */
971         cur_offset = chunk_size;
972         llog_skip_over(loghandle, &cur_offset, 0, prev_idx,
973                        chunk_size, true);
974
975         rc = dt_attr_get(env, o, &lgi->lgi_attr);
976         if (rc)
977                 GOTO(out, rc);
978
979         while (cur_offset < lgi->lgi_attr.la_size) {
980                 struct llog_rec_hdr     *rec, *last_rec;
981                 struct llog_rec_tail    *tail;
982
983                 lgi->lgi_buf.lb_len = len;
984                 lgi->lgi_buf.lb_buf = buf;
985                 rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
986                 if (rc < 0) {
987                         CERROR("%s: can't read llog block from log "DFID
988                                " offset "LPU64": rc = %d\n",
989                                o->do_lu.lo_dev->ld_obd->obd_name,
990                                PFID(lu_object_fid(&o->do_lu)), cur_offset, rc);
991                         GOTO(out, rc);
992                 }
993
994                 if (rc == 0) /* end of file, nothing to do */
995                         GOTO(out, rc);
996
997                 if (rc < sizeof(*tail)) {
998                         CERROR("%s: invalid llog block at log id "DOSTID"/%u "
999                                "offset "LPU64"\n",
1000                                o->do_lu.lo_dev->ld_obd->obd_name,
1001                                POSTID(&loghandle->lgh_id.lgl_oi),
1002                                loghandle->lgh_id.lgl_ogen, cur_offset);
1003                         GOTO(out, rc = -EINVAL);
1004                 }
1005
1006                 rec = buf;
1007                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
1008                         lustre_swab_llog_rec(rec);
1009
1010                 tail = (struct llog_rec_tail *)((char *)buf + rc -
1011                                                 sizeof(struct llog_rec_tail));
1012                 /* get the last record in block */
1013                 last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
1014                                                    le32_to_cpu(tail->lrt_len));
1015
1016                 if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
1017                         lustre_swab_llog_rec(last_rec);
1018                 LASSERT(last_rec->lrh_index == tail->lrt_index);
1019
1020                 /* this shouldn't happen */
1021                 if (tail->lrt_index == 0) {
1022                         CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
1023                                "offset "LPU64"\n",
1024                                o->do_lu.lo_dev->ld_obd->obd_name,
1025                                POSTID(&loghandle->lgh_id.lgl_oi),
1026                                loghandle->lgh_id.lgl_ogen, cur_offset);
1027                         GOTO(out, rc = -EINVAL);
1028                 }
1029                 if (tail->lrt_index < prev_idx)
1030                         continue;
1031
1032                 /* sanity check that the start of the new buffer is no farther
1033                  * than the record that we wanted.  This shouldn't happen. */
1034                 if (rec->lrh_index > prev_idx) {
1035                         CERROR("%s: missed desired record? %u > %u\n",
1036                                o->do_lu.lo_dev->ld_obd->obd_name,
1037                                rec->lrh_index, prev_idx);
1038                         GOTO(out, rc = -ENOENT);
1039                 }
1040
1041                 /* Trim unsupported extensions for compat w/ older clients */
1042                 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
1043                         changelog_block_trim_ext(rec, last_rec,
1044                                                  CLF_VERSION | CLF_RENAME);
1045
1046                 GOTO(out, rc = 0);
1047         }
1048         GOTO(out, rc = -EIO);
1049 out:
1050         return rc;
1051 }
1052
1053 /**
1054  * This is helper function to get llog directory object. It is used by named
1055  * llog operations to find/insert/delete llog entry from llog directory.
1056  *
1057  * \param[in] env       execution environment
1058  * \param[in] ctxt      llog context
1059  *
1060  * \retval              dt_object of llog directory
1061  * \retval              ERR_PTR of negative value on error
1062  */
1063 static struct dt_object *llog_osd_dir_get(const struct lu_env *env,
1064                                           struct llog_ctxt *ctxt)
1065 {
1066         struct dt_device        *dt;
1067         struct dt_thread_info   *dti = dt_info(env);
1068         struct dt_object        *dir;
1069         int                      rc;
1070
1071         dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
1072         if (ctxt->loc_dir == NULL) {
1073                 rc = dt_root_get(env, dt, &dti->dti_fid);
1074                 if (rc)
1075                         return ERR_PTR(rc);
1076                 dir = dt_locate(env, dt, &dti->dti_fid);
1077
1078                 if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) {
1079                         lu_object_put(env, &dir->do_lu);
1080                         return ERR_PTR(-ENOTDIR);
1081                 }
1082         } else {
1083                 lu_object_get(&ctxt->loc_dir->do_lu);
1084                 dir = ctxt->loc_dir;
1085         }
1086
1087         return dir;
1088 }
1089
1090 /**
1091  * Implementation of the llog_operations::lop_open
1092  *
1093  * This function opens the llog by its logid or by name, it may open also
1094  * non existent llog and assing then new id to it.
1095  * The llog_open/llog_close pair works similar to lu_object_find/put,
1096  * the object may not exist prior open. The result of open is just dt_object
1097  * in the llog header.
1098  *
1099  * \param[in] env               execution environment
1100  * \param[in] handle            llog handle of the current llog
1101  * \param[in] logid             logid of llog to open (nameless llog)
1102  * \param[in] name              name of llog to open (named llog)
1103  * \param[in] open_param
1104  *                              LLOG_OPEN_NEW - new llog, may not exist
1105  *                              LLOG_OPEN_EXIST - old llog, must exist
1106  *
1107  * \retval                      0 on successful open, llog_handle::lgh_obj
1108  *                              contains the dt_object of the llog.
1109  * \retval                      negative value on error
1110  */
1111 static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
1112                          struct llog_logid *logid, char *name,
1113                          enum llog_open_param open_param)
1114 {
1115         struct llog_thread_info         *lgi = llog_info(env);
1116         struct llog_ctxt                *ctxt = handle->lgh_ctxt;
1117         struct dt_object                *o;
1118         struct dt_device                *dt;
1119         struct ls_device                *ls;
1120         struct local_oid_storage        *los = NULL;
1121         int                              rc = 0;
1122
1123         ENTRY;
1124
1125         LASSERT(env);
1126         LASSERT(ctxt);
1127         LASSERT(ctxt->loc_exp);
1128         LASSERT(ctxt->loc_exp->exp_obd);
1129         dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
1130         LASSERT(dt);
1131         if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1132                 struct lu_object_conf conf = { 0 };
1133                 if (logid != NULL) {
1134                         logid_to_fid(logid, &lgi->lgi_fid);
1135                 } else {
1136                         /* If logid == NULL, then it means the caller needs
1137                          * to allocate new FID (llog_cat_declare_add_rec()). */
1138                         rc = obd_fid_alloc(env, ctxt->loc_exp,
1139                                            &lgi->lgi_fid, NULL);
1140                         if (rc < 0)
1141                                 RETURN(rc);
1142                         rc = 0;
1143                         conf.loc_flags = LOC_F_NEW;
1144                 }
1145
1146                 o = dt_locate_at(env, dt, &lgi->lgi_fid,
1147                                  dt->dd_lu_dev.ld_site->ls_top_dev, &conf);
1148                 if (IS_ERR(o))
1149                         RETURN(PTR_ERR(o));
1150
1151                 goto after_open;
1152         }
1153
1154         ls = ls_device_get(dt);
1155         if (IS_ERR(ls))
1156                 RETURN(PTR_ERR(ls));
1157
1158         mutex_lock(&ls->ls_los_mutex);
1159         los = dt_los_find(ls, name != NULL ? FID_SEQ_LLOG_NAME : FID_SEQ_LLOG);
1160         mutex_unlock(&ls->ls_los_mutex);
1161         LASSERT(los);
1162         ls_device_put(env, ls);
1163
1164         LASSERT(handle);
1165
1166         if (logid != NULL) {
1167                 logid_to_fid(logid, &lgi->lgi_fid);
1168         } else if (name) {
1169                 struct dt_object *llog_dir;
1170
1171                 llog_dir = llog_osd_dir_get(env, ctxt);
1172                 if (IS_ERR(llog_dir))
1173                         GOTO(out, rc = PTR_ERR(llog_dir));
1174                 dt_read_lock(env, llog_dir, 0);
1175                 rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid);
1176                 dt_read_unlock(env, llog_dir);
1177                 lu_object_put(env, &llog_dir->do_lu);
1178                 if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
1179                         /* generate fid for new llog */
1180                         rc = local_object_fid_generate(env, los,
1181                                                        &lgi->lgi_fid);
1182                 }
1183                 if (rc < 0)
1184                         GOTO(out, rc);
1185                 OBD_ALLOC(handle->lgh_name, strlen(name) + 1);
1186                 if (handle->lgh_name)
1187                         strcpy(handle->lgh_name, name);
1188                 else
1189                         GOTO(out, rc = -ENOMEM);
1190         } else {
1191                 LASSERTF(open_param & LLOG_OPEN_NEW, "%#x\n", open_param);
1192                 /* generate fid for new llog */
1193                 rc = local_object_fid_generate(env, los, &lgi->lgi_fid);
1194                 if (rc < 0)
1195                         GOTO(out, rc);
1196         }
1197
1198         o = ls_locate(env, ls, &lgi->lgi_fid, NULL);
1199         if (IS_ERR(o))
1200                 GOTO(out_name, rc = PTR_ERR(o));
1201
1202 after_open:
1203         /* No new llog is expected but doesn't exist */
1204         if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
1205                 GOTO(out_put, rc = -ENOENT);
1206
1207         fid_to_logid(&lgi->lgi_fid, &handle->lgh_id);
1208         handle->lgh_obj = o;
1209         handle->private_data = los;
1210         LASSERT(handle->lgh_ctxt);
1211
1212         RETURN(rc);
1213
1214 out_put:
1215         lu_object_put(env, &o->do_lu);
1216 out_name:
1217         if (handle->lgh_name != NULL)
1218                 OBD_FREE(handle->lgh_name, strlen(name) + 1);
1219 out:
1220         if (los != NULL)
1221                 dt_los_put(los);
1222         RETURN(rc);
1223 }
1224
1225 /**
1226  * Implementation of the llog_operations::lop_exist
1227  *
1228  * This function checks that llog exists on storage.
1229  *
1230  * \param[in] handle    llog handle of the current llog
1231  *
1232  * \retval              true if llog object exists and is not just destroyed
1233  * \retval              false if llog doesn't exist or just destroyed
1234  */
1235 static int llog_osd_exist(struct llog_handle *handle)
1236 {
1237         LASSERT(handle->lgh_obj);
1238         return (dt_object_exists(handle->lgh_obj) &&
1239                 !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header));
1240 }
1241
1242 /**
1243  * Get dir for regular fid log object
1244  *
1245  * Get directory for regular fid log object, and these regular fid log
1246  * object will be inserted under this directory, to satisfy the FS
1247  * consistency check, e2fsck etc.
1248  *
1249  * \param [in] env      execution environment
1250  * \param [in] dto      llog object
1251  *
1252  * \retval              pointer to the directory if it is found.
1253  * \retval              ERR_PTR(negative errno) if it fails.
1254  */
1255 struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env,
1256                                                struct dt_object *dto)
1257 {
1258         struct llog_thread_info *lgi = llog_info(env);
1259         struct seq_server_site *ss = dto->do_lu.lo_dev->ld_site->ld_seq_site;
1260         struct lu_seq_range     *range = &lgi->lgi_range;
1261         struct lu_fid           *dir_fid = &lgi->lgi_fid;
1262         struct dt_object        *dir;
1263         int                     rc;
1264         ENTRY;
1265
1266         fld_range_set_any(range);
1267         LASSERT(ss != NULL);
1268         rc = ss->ss_server_fld->lsf_seq_lookup(env, ss->ss_server_fld,
1269                                    fid_seq(lu_object_fid(&dto->do_lu)), range);
1270         if (rc < 0)
1271                 RETURN(ERR_PTR(rc));
1272
1273         lu_update_log_dir_fid(dir_fid, range->lsr_index);
1274         dir = dt_locate(env, lu2dt_dev(dto->do_lu.lo_dev), dir_fid);
1275         if (IS_ERR(dir))
1276                 RETURN(dir);
1277
1278         if (!dt_try_as_dir(env, dir)) {
1279                 lu_object_put(env, &dir->do_lu);
1280                 RETURN(ERR_PTR(-ENOTDIR));
1281         }
1282
1283         RETURN(dir);
1284 }
1285
1286 /**
1287  * Add llog object with regular FID to name entry
1288  *
1289  * Add llog object with regular FID to name space, and each llog
1290  * object on each MDT will be /update_log_dir/[seq:oid:ver],
1291  * so to satisfy the namespace consistency check, e2fsck etc.
1292  *
1293  * \param [in] env      execution environment
1294  * \param [in] dto      llog object
1295  * \param [in] th       thandle
1296  * \param [in] declare  if it is declare or execution
1297  *
1298  * \retval              0 if insertion succeeds.
1299  * \retval              negative errno if insertion fails.
1300  */
1301 static int
1302 llog_osd_regular_fid_add_name_entry(const struct lu_env *env,
1303                                     struct dt_object *dto,
1304                                     struct thandle *th, bool declare)
1305 {
1306         struct llog_thread_info *lgi = llog_info(env);
1307         const struct lu_fid     *fid = lu_object_fid(&dto->do_lu);
1308         struct dt_insert_rec    *rec = &lgi->lgi_dt_rec;
1309         struct dt_object        *dir;
1310         char                    *name = lgi->lgi_name;
1311         int                     rc;
1312         ENTRY;
1313
1314         if (!fid_is_norm(fid))
1315                 RETURN(0);
1316
1317         dir = llog_osd_get_regular_fid_dir(env, dto);
1318         if (IS_ERR(dir))
1319                 RETURN(PTR_ERR(dir));
1320
1321         rec->rec_fid = fid;
1322         rec->rec_type = S_IFREG;
1323         snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
1324         dt_write_lock(env, dir, 0);
1325         if (declare) {
1326                 rc = dt_declare_insert(env, dir, (struct dt_rec *)rec,
1327                                (struct dt_key *)name, th);
1328         } else {
1329                 rc = dt_insert(env, dir, (struct dt_rec *)rec,
1330                                (struct dt_key *)name, th, 1);
1331         }
1332         dt_write_unlock(env, dir);
1333
1334         lu_object_put(env, &dir->do_lu);
1335         RETURN(rc);
1336 }
1337
1338
1339 /**
1340  * Implementation of the llog_operations::lop_declare_create
1341  *
1342  * This function declares the llog create. It declares also name insert
1343  * into llog directory in case of named llog.
1344  *
1345  * \param[in] env       execution environment
1346  * \param[in] res       llog handle of the current llog
1347  * \param[in] th        current transaction handle
1348  *
1349  * \retval              0 on successful create declaration
1350  * \retval              negative value on error
1351  */
1352 static int llog_osd_declare_create(const struct lu_env *env,
1353                                    struct llog_handle *res, struct thandle *th)
1354 {
1355         struct llog_thread_info         *lgi = llog_info(env);
1356         struct dt_insert_rec            *rec = &lgi->lgi_dt_rec;
1357         struct local_oid_storage        *los;
1358         struct dt_object                *o;
1359         int                              rc;
1360
1361         ENTRY;
1362
1363         LASSERT(res->lgh_obj);
1364         LASSERT(th);
1365
1366         /* object can be created by another thread */
1367         o = res->lgh_obj;
1368         if (dt_object_exists(o))
1369                 RETURN(0);
1370
1371         if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1372                 struct llog_thread_info *lgi = llog_info(env);
1373
1374                 lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE;
1375                 lgi->lgi_attr.la_size = 0;
1376                 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1377                 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
1378
1379                 rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
1380                                        &lgi->lgi_dof, th);
1381                 if (rc < 0)
1382                         RETURN(rc);
1383
1384
1385                 rc = llog_osd_regular_fid_add_name_entry(env, o, th, true);
1386
1387                 RETURN(rc);
1388         }
1389         los = res->private_data;
1390         LASSERT(los);
1391
1392         rc = llog_osd_declare_new_object(env, los, o, th);
1393         if (rc)
1394                 RETURN(rc);
1395
1396         /* do not declare header initialization here as it's declared
1397          * in llog_osd_declare_write_rec() which is always called */
1398
1399         if (res->lgh_name) {
1400                 struct dt_object *llog_dir;
1401
1402                 llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
1403                 if (IS_ERR(llog_dir))
1404                         RETURN(PTR_ERR(llog_dir));
1405                 logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
1406                 rec->rec_fid = &lgi->lgi_fid;
1407                 rec->rec_type = S_IFREG;
1408                 rc = dt_declare_insert(env, llog_dir,
1409                                        (struct dt_rec *)rec,
1410                                        (struct dt_key *)res->lgh_name, th);
1411                 lu_object_put(env, &llog_dir->do_lu);
1412                 if (rc)
1413                         CERROR("%s: can't declare named llog %s: rc = %d\n",
1414                                o->do_lu.lo_dev->ld_obd->obd_name,
1415                                res->lgh_name, rc);
1416         }
1417         RETURN(rc);
1418 }
1419
1420 /**
1421  * Implementation of the llog_operations::lop_create
1422  *
1423  * This function creates the llog according with llog_handle::lgh_obj
1424  * and llog_handle::lgh_name.
1425  *
1426  * \param[in] env       execution environment
1427  * \param[in] res       llog handle of the current llog
1428  * \param[in] th        current transaction handle
1429  *
1430  * \retval              0 on successful create
1431  * \retval              negative value on error
1432  */
1433 static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
1434                            struct thandle *th)
1435 {
1436         struct llog_thread_info *lgi = llog_info(env);
1437         struct dt_insert_rec    *rec = &lgi->lgi_dt_rec;
1438         struct local_oid_storage *los;
1439         struct dt_object        *o;
1440         int                      rc = 0;
1441
1442         ENTRY;
1443
1444         LASSERT(env);
1445         o = res->lgh_obj;
1446         LASSERT(o);
1447
1448         /* llog can be already created */
1449         if (dt_object_exists(o))
1450                 RETURN(-EEXIST);
1451
1452         if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1453                 struct llog_thread_info *lgi = llog_info(env);
1454
1455                 lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE | LA_TYPE;
1456                 lgi->lgi_attr.la_size = 0;
1457                 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1458                 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
1459
1460                 dt_write_lock(env, o, 0);
1461                 rc = dt_create(env, o, &lgi->lgi_attr, NULL,
1462                                &lgi->lgi_dof, th);
1463                 dt_write_unlock(env, o);
1464                 if (rc < 0)
1465                         RETURN(rc);
1466
1467                 rc = llog_osd_regular_fid_add_name_entry(env, o, th, false);
1468
1469                 RETURN(rc);
1470         }
1471
1472         los = res->private_data;
1473         LASSERT(los);
1474
1475         dt_write_lock(env, o, 0);
1476         if (!dt_object_exists(o))
1477                 rc = llog_osd_create_new_object(env, los, o, th);
1478         else
1479                 rc = -EEXIST;
1480
1481         dt_write_unlock(env, o);
1482         if (rc)
1483                 RETURN(rc);
1484
1485         if (res->lgh_name) {
1486                 struct dt_object *llog_dir;
1487
1488                 llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
1489                 if (IS_ERR(llog_dir))
1490                         RETURN(PTR_ERR(llog_dir));
1491
1492                 logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
1493                 rec->rec_fid = &lgi->lgi_fid;
1494                 rec->rec_type = S_IFREG;
1495                 dt_read_lock(env, llog_dir, 0);
1496                 rc = dt_insert(env, llog_dir, (struct dt_rec *)rec,
1497                                (struct dt_key *)res->lgh_name,
1498                                th, 1);
1499                 dt_read_unlock(env, llog_dir);
1500                 lu_object_put(env, &llog_dir->do_lu);
1501                 if (rc)
1502                         CERROR("%s: can't create named llog %s: rc = %d\n",
1503                                o->do_lu.lo_dev->ld_obd->obd_name,
1504                                res->lgh_name, rc);
1505         }
1506         RETURN(rc);
1507 }
1508
1509 /**
1510  * Implementation of the llog_operations::lop_close
1511  *
1512  * This function closes the llog. It just put llog object and referenced
1513  * local storage.
1514  *
1515  * \param[in] env       execution environment
1516  * \param[in] handle    llog handle of the current llog
1517  *
1518  * \retval              0 on successful llog close
1519  * \retval              negative value on error
1520  */
1521 static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
1522 {
1523         struct local_oid_storage        *los;
1524         int                              rc = 0;
1525
1526         ENTRY;
1527
1528         LASSERT(handle->lgh_obj);
1529
1530         if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1531                 /* Remove the object from the cache, otherwise it may
1532                  * hold LOD being released during cleanup process */
1533                 lu_object_put_nocache(env, &handle->lgh_obj->do_lu);
1534                 LASSERT(handle->private_data == NULL);
1535                 RETURN(rc);
1536         } else {
1537                 lu_object_put(env, &handle->lgh_obj->do_lu);
1538         }
1539         los = handle->private_data;
1540         LASSERT(los);
1541         dt_los_put(los);
1542
1543         if (handle->lgh_name)
1544                 OBD_FREE(handle->lgh_name, strlen(handle->lgh_name) + 1);
1545
1546         RETURN(rc);
1547 }
1548
1549 /**
1550  * delete llog object name entry
1551  *
1552  * Delete llog object (with regular FID) from name space (under
1553  * update_log_dir).
1554  *
1555  * \param [in] env      execution environment
1556  * \param [in] dto      llog object
1557  * \param [in] th       thandle
1558  * \param [in] declare  if it is declare or execution
1559  *
1560  * \retval              0 if deletion succeeds.
1561  * \retval              negative errno if deletion fails.
1562  */
1563 static int
1564 llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
1565                                     struct dt_object *dto,
1566                                     struct thandle *th, bool declare)
1567 {
1568         struct llog_thread_info *lgi = llog_info(env);
1569         const struct lu_fid     *fid = lu_object_fid(&dto->do_lu);
1570         struct dt_object        *dir;
1571         char                    *name = lgi->lgi_name;
1572         int                     rc;
1573         ENTRY;
1574
1575         if (!fid_is_norm(fid))
1576                 RETURN(0);
1577
1578         dir = llog_osd_get_regular_fid_dir(env, dto);
1579         if (IS_ERR(dir))
1580                 RETURN(PTR_ERR(dir));
1581
1582         snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
1583         dt_write_lock(env, dir, 0);
1584         if (declare) {
1585                 rc = dt_declare_delete(env, dir, (struct dt_key *)name,
1586                                        th);
1587         } else {
1588                 rc = dt_delete(env, dir, (struct dt_key *)name, th);
1589         }
1590         dt_write_unlock(env, dir);
1591
1592         lu_object_put(env, &dir->do_lu);
1593         RETURN(rc);
1594 }
1595
1596 /**
1597  * Implementation of the llog_operations::lop_declare_destroy
1598  *
1599  * This function declare destroys the llog and deletes also entry in the
1600  * llog directory in case of named llog. Llog should be opened prior that.
1601  *
1602  * \param[in] env               execution environment
1603  * \param[in] loghandle llog handle of the current llog
1604  *
1605  * \retval              0 on successful destroy
1606  * \retval              negative value on error
1607  */
1608 static int llog_osd_declare_destroy(const struct lu_env *env,
1609                                     struct llog_handle *loghandle,
1610                                     struct thandle *th)
1611 {
1612         struct llog_ctxt        *ctxt;
1613         struct dt_object        *o, *llog_dir = NULL;
1614         int                      rc;
1615
1616         ENTRY;
1617
1618         ctxt = loghandle->lgh_ctxt;
1619         LASSERT(ctxt);
1620
1621         o = loghandle->lgh_obj;
1622         LASSERT(o);
1623
1624         if (loghandle->lgh_name) {
1625                 llog_dir = llog_osd_dir_get(env, ctxt);
1626                 if (IS_ERR(llog_dir))
1627                         RETURN(PTR_ERR(llog_dir));
1628
1629                 rc = dt_declare_delete(env, llog_dir,
1630                                        (struct dt_key *)loghandle->lgh_name,
1631                                        th);
1632                 if (rc < 0)
1633                         GOTO(out_put, rc);
1634         }
1635
1636         rc = dt_declare_ref_del(env, o, th);
1637         if (rc < 0)
1638                 GOTO(out_put, rc);
1639
1640         rc = dt_declare_destroy(env, o, th);
1641         if (rc < 0)
1642                 GOTO(out_put, rc);
1643
1644         if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1645                 rc = llog_osd_regular_fid_del_name_entry(env, o, th, true);
1646                 if (rc < 0)
1647                         GOTO(out_put, rc);
1648         }
1649
1650 out_put:
1651         if (!(IS_ERR_OR_NULL(llog_dir)))
1652                 lu_object_put(env, &llog_dir->do_lu);
1653
1654         RETURN(rc);
1655 }
1656
1657
1658 /**
1659  * Implementation of the llog_operations::lop_destroy
1660  *
1661  * This function destroys the llog and deletes also entry in the
1662  * llog directory in case of named llog. Llog should be opened prior that.
1663  * Destroy method is not part of external transaction and does everything
1664  * inside.
1665  *
1666  * \param[in] env               execution environment
1667  * \param[in] loghandle llog handle of the current llog
1668  *
1669  * \retval              0 on successful destroy
1670  * \retval              negative value on error
1671  */
1672 static int llog_osd_destroy(const struct lu_env *env,
1673                             struct llog_handle *loghandle, struct thandle *th)
1674 {
1675         struct llog_ctxt        *ctxt;
1676         struct dt_object        *o, *llog_dir = NULL;
1677         int                      rc;
1678
1679         ENTRY;
1680
1681         ctxt = loghandle->lgh_ctxt;
1682         LASSERT(ctxt != NULL);
1683
1684         o = loghandle->lgh_obj;
1685         LASSERT(o != NULL);
1686
1687         dt_write_lock(env, o, 0);
1688         if (!dt_object_exists(o))
1689                 GOTO(out_unlock, rc = 0);
1690
1691         if (loghandle->lgh_name) {
1692                 llog_dir = llog_osd_dir_get(env, ctxt);
1693                 if (IS_ERR(llog_dir))
1694                         GOTO(out_unlock, rc = PTR_ERR(llog_dir));
1695
1696                 dt_read_lock(env, llog_dir, 0);
1697                 rc = dt_delete(env, llog_dir,
1698                                (struct dt_key *)loghandle->lgh_name,
1699                                th);
1700                 dt_read_unlock(env, llog_dir);
1701                 if (rc) {
1702                         CERROR("%s: can't remove llog %s: rc = %d\n",
1703                                o->do_lu.lo_dev->ld_obd->obd_name,
1704                                loghandle->lgh_name, rc);
1705                         GOTO(out_unlock, rc);
1706                 }
1707         }
1708
1709         dt_ref_del(env, o, th);
1710         rc = dt_destroy(env, o, th);
1711         if (rc < 0)
1712                 GOTO(out_unlock, rc);
1713
1714         if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1715                 rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
1716                 if (rc < 0)
1717                         GOTO(out_unlock, rc);
1718         }
1719
1720 out_unlock:
1721         dt_write_unlock(env, o);
1722         if (!(IS_ERR_OR_NULL(llog_dir)))
1723                 lu_object_put(env, &llog_dir->do_lu);
1724         RETURN(rc);
1725 }
1726
1727 /**
1728  * Implementation of the llog_operations::lop_setup
1729  *
1730  * This function setup the llog on local storage.
1731  *
1732  * \param[in] env       execution environment
1733  * \param[in] obd       obd device the llog belongs to
1734  * \param[in] olg       the llog group, it is always zero group now.
1735  * \param[in] ctxt_idx  the llog index, it defines the purpose of this llog.
1736  *                      Every new llog type have to use own index.
1737  * \param[in] disk_obd  the storage obd, where llog is stored.
1738  *
1739  * \retval              0 on successful llog setup
1740  * \retval              negative value on error
1741  */
1742 static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd,
1743                           struct obd_llog_group *olg, int ctxt_idx,
1744                           struct obd_device *disk_obd)
1745 {
1746         struct llog_thread_info         *lgi = llog_info(env);
1747         struct llog_ctxt                *ctxt;
1748         int                              rc = 0;
1749         ENTRY;
1750
1751         LASSERT(obd);
1752         LASSERT(olg->olg_ctxts[ctxt_idx]);
1753
1754         ctxt = llog_ctxt_get(olg->olg_ctxts[ctxt_idx]);
1755         LASSERT(ctxt);
1756
1757         if (disk_obd == NULL)
1758                 GOTO(out, rc = 0);
1759
1760         /* initialize data allowing to generate new fids,
1761          * literally we need a sequece */
1762         lgi->lgi_fid.f_seq = FID_SEQ_LLOG;
1763         lgi->lgi_fid.f_oid = 1;
1764         lgi->lgi_fid.f_ver = 0;
1765         rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
1766                                     &lgi->lgi_fid,
1767                                     &ctxt->loc_los_nameless);
1768         if (rc != 0)
1769                 GOTO(out, rc);
1770
1771         lgi->lgi_fid.f_seq = FID_SEQ_LLOG_NAME;
1772         lgi->lgi_fid.f_oid = 1;
1773         lgi->lgi_fid.f_ver = 0;
1774         rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
1775                                     &lgi->lgi_fid,
1776                                     &ctxt->loc_los_named);
1777         if (rc != 0) {
1778                 local_oid_storage_fini(env, ctxt->loc_los_nameless);
1779                 ctxt->loc_los_nameless = NULL;
1780         }
1781
1782         GOTO(out, rc);
1783
1784 out:
1785         llog_ctxt_put(ctxt);
1786         return rc;
1787 }
1788
1789 /**
1790  * Implementation of the llog_operations::lop_cleanup
1791  *
1792  * This function cleanups the llog on local storage.
1793  *
1794  * \param[in] env       execution environment
1795  * \param[in] ctxt      the llog context
1796  *
1797  * \retval              0
1798  */
1799 static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
1800 {
1801         if (ctxt->loc_los_nameless != NULL) {
1802                 local_oid_storage_fini(env, ctxt->loc_los_nameless);
1803                 ctxt->loc_los_nameless = NULL;
1804         }
1805
1806         if (ctxt->loc_los_named != NULL) {
1807                 local_oid_storage_fini(env, ctxt->loc_los_named);
1808                 ctxt->loc_los_named = NULL;
1809         }
1810
1811         return 0;
1812 }
1813
1814 struct llog_operations llog_osd_ops = {
1815         .lop_next_block         = llog_osd_next_block,
1816         .lop_prev_block         = llog_osd_prev_block,
1817         .lop_read_header        = llog_osd_read_header,
1818         .lop_declare_destroy    = llog_osd_declare_destroy,
1819         .lop_destroy            = llog_osd_destroy,
1820         .lop_setup              = llog_osd_setup,
1821         .lop_cleanup            = llog_osd_cleanup,
1822         .lop_open               = llog_osd_open,
1823         .lop_exist              = llog_osd_exist,
1824         .lop_declare_create     = llog_osd_declare_create,
1825         .lop_create             = llog_osd_create,
1826         .lop_declare_write_rec  = llog_osd_declare_write_rec,
1827         .lop_write_rec          = llog_osd_write_rec,
1828         .lop_close              = llog_osd_close,
1829 };
1830 EXPORT_SYMBOL(llog_osd_ops);
1831
1832 struct llog_operations llog_common_cat_ops = {
1833         .lop_next_block         = llog_osd_next_block,
1834         .lop_prev_block         = llog_osd_prev_block,
1835         .lop_read_header        = llog_osd_read_header,
1836         .lop_declare_destroy    = llog_osd_declare_destroy,
1837         .lop_destroy            = llog_osd_destroy,
1838         .lop_setup              = llog_osd_setup,
1839         .lop_cleanup            = llog_osd_cleanup,
1840         .lop_open               = llog_osd_open,
1841         .lop_exist              = llog_osd_exist,
1842         .lop_declare_create     = llog_osd_declare_create,
1843         .lop_create             = llog_osd_create,
1844         .lop_declare_write_rec  = llog_osd_declare_write_rec,
1845         .lop_write_rec          = llog_osd_write_rec,
1846         .lop_close              = llog_osd_close,
1847         .lop_add                = llog_cat_add_rec,
1848         .lop_declare_add        = llog_cat_declare_add_rec,
1849 };
1850 EXPORT_SYMBOL(llog_common_cat_ops);
1851
1852 /**
1853  * Read the special file which contains the list of llog catalogs IDs
1854  *
1855  * This function reads the CATALOGS file which contains the array of llog
1856  * catalogs IDs. The main purpose of this file is to store OSP llogs indexed
1857  * by OST/MDT number.
1858  *
1859  * \param[in]  env              execution environment
1860  * \param[in]  d                corresponding storage device
1861  * \param[in]  idx              position to start from, usually OST/MDT index
1862  * \param[in]  count            how many catalog IDs to read
1863  * \param[out] idarray          the buffer for the data. If it is NULL then
1864  *                              function returns just number of catalog IDs
1865  *                              in the file.
1866  * \param[in]  fid              LLOG_CATALOGS_OID for CATALOG object
1867  *
1868  * \retval                      0 on successful read of catalog IDs
1869  * \retval                      negative value on error
1870  * \retval                      positive value which is number of records in
1871  *                              the file if \a idarray is NULL
1872  */
1873 int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
1874                           int idx, int count, struct llog_catid *idarray,
1875                           const struct lu_fid *fid)
1876 {
1877         struct llog_thread_info *lgi = llog_info(env);
1878         struct dt_object        *o = NULL;
1879         struct thandle          *th;
1880         int                      rc, size;
1881
1882         ENTRY;
1883
1884         LASSERT(d);
1885
1886         size = sizeof(*idarray) * count;
1887         lgi->lgi_off = idx *  sizeof(*idarray);
1888
1889         lgi->lgi_fid = *fid;
1890         o = dt_locate(env, d, &lgi->lgi_fid);
1891         if (IS_ERR(o))
1892                 RETURN(PTR_ERR(o));
1893
1894         if (!dt_object_exists(o)) {
1895                 th = dt_trans_create(env, d);
1896                 if (IS_ERR(th))
1897                         GOTO(out, rc = PTR_ERR(th));
1898
1899                 lgi->lgi_attr.la_valid = LA_MODE;
1900                 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1901                 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
1902
1903                 th->th_wait_submit = 1;
1904                 /* Make the llog object creation synchronization, so
1905                  * it will be reliable to the reference, especially
1906                  * for remote reference */
1907                 th->th_sync = 1;
1908
1909                 rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
1910                                        &lgi->lgi_dof, th);
1911                 if (rc)
1912                         GOTO(out_trans, rc);
1913
1914                 rc = dt_trans_start_local(env, d, th);
1915                 if (rc)
1916                         GOTO(out_trans, rc);
1917
1918                 dt_write_lock(env, o, 0);
1919                 if (!dt_object_exists(o))
1920                         rc = dt_create(env, o, &lgi->lgi_attr, NULL,
1921                                        &lgi->lgi_dof, th);
1922                 dt_write_unlock(env, o);
1923 out_trans:
1924                 dt_trans_stop(env, d, th);
1925                 if (rc)
1926                         GOTO(out, rc);
1927         }
1928
1929         rc = dt_attr_get(env, o, &lgi->lgi_attr);
1930         if (rc)
1931                 GOTO(out, rc);
1932
1933         if (!S_ISREG(lgi->lgi_attr.la_mode)) {
1934                 CERROR("%s: CATALOGS is not a regular file!: mode = %o\n",
1935                        o->do_lu.lo_dev->ld_obd->obd_name,
1936                        lgi->lgi_attr.la_mode);
1937                 GOTO(out, rc = -ENOENT);
1938         }
1939
1940         CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
1941                (int)lgi->lgi_attr.la_size, size);
1942
1943         /* return just number of llogs */
1944         if (idarray == NULL) {
1945                 rc = lgi->lgi_attr.la_size / sizeof(*idarray);
1946                 GOTO(out, rc);
1947         }
1948
1949         /* read for new ost index or for empty file */
1950         memset(idarray, 0, size);
1951         if (lgi->lgi_attr.la_size <= lgi->lgi_off)
1952                 GOTO(out, rc = 0);
1953         if (lgi->lgi_attr.la_size < lgi->lgi_off + size)
1954                 size = lgi->lgi_attr.la_size - lgi->lgi_off;
1955
1956         lgi->lgi_buf.lb_buf = idarray;
1957         lgi->lgi_buf.lb_len = size;
1958         rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
1959         /* -EFAULT means the llog is a sparse file. This is not an error
1960          * after arbitrary OST index is supported. */
1961         if (rc < 0 && rc != -EFAULT) {
1962                 CERROR("%s: error reading CATALOGS: rc = %d\n",
1963                        o->do_lu.lo_dev->ld_obd->obd_name,  rc);
1964                 GOTO(out, rc);
1965         }
1966
1967         EXIT;
1968 out:
1969         lu_object_put(env, &o->do_lu);
1970         RETURN(rc);
1971 }
1972 EXPORT_SYMBOL(llog_osd_get_cat_list);
1973
1974 /**
1975  * Write the special file which contains the list of llog catalogs IDs
1976  *
1977  * This function writes the CATALOG file which contains the array of llog
1978  * catalogs IDs. It is used mostly to store OSP llogs indexed by OST/MDT
1979  * number.
1980  *
1981  * \param[in]  env      execution environment
1982  * \param[in]  d        corresponding storage device
1983  * \param[in]  idx      position to start from, usually OST/MDT index
1984  * \param[in]  count    how many catalog IDs to write
1985  * \param[out] idarray  the buffer with the data to write.
1986  * \param[in]  fid      LLOG_CATALOGS_OID for CATALOG object
1987  *
1988  * \retval              0 on successful write of catalog IDs
1989  * \retval              negative value on error
1990  */
1991 int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
1992                           int idx, int count, struct llog_catid *idarray,
1993                           const struct lu_fid *fid)
1994 {
1995         struct llog_thread_info *lgi = llog_info(env);
1996         struct dt_object        *o = NULL;
1997         struct thandle          *th;
1998         int                      rc, size;
1999
2000         if (count == 0)
2001                 RETURN(0);
2002
2003         LASSERT(d);
2004
2005         size = sizeof(*idarray) * count;
2006         lgi->lgi_off = idx * sizeof(*idarray);
2007         lgi->lgi_fid = *fid;
2008
2009         o = dt_locate(env, d, &lgi->lgi_fid);
2010         if (IS_ERR(o))
2011                 RETURN(PTR_ERR(o));
2012
2013         if (!dt_object_exists(o))
2014                 GOTO(out, rc = -ENOENT);
2015
2016         rc = dt_attr_get(env, o, &lgi->lgi_attr);
2017         if (rc)
2018                 GOTO(out, rc);
2019
2020         if (!S_ISREG(lgi->lgi_attr.la_mode)) {
2021                 CERROR("%s: CATALOGS is not a regular file!: mode = %o\n",
2022                        o->do_lu.lo_dev->ld_obd->obd_name,
2023                        lgi->lgi_attr.la_mode);
2024                 GOTO(out, rc = -ENOENT);
2025         }
2026
2027         th = dt_trans_create(env, d);
2028         if (IS_ERR(th))
2029                 GOTO(out, rc = PTR_ERR(th));
2030
2031         lgi->lgi_buf.lb_len = size;
2032         lgi->lgi_buf.lb_buf = idarray;
2033         rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
2034         if (rc)
2035                 GOTO(out_trans, rc);
2036
2037         /* For update log, this happens during initialization,
2038          * see lod_sub_prep_llog(), and we need make sure catlog
2039          * file ID is written to catlist file(committed) before
2040          * cross-MDT operation write update records to catlog FILE,
2041          * otherwise, during failover these update records might
2042          * missing */
2043         if (fid_is_update_log(fid))
2044                 th->th_sync = 1;
2045
2046         rc = dt_trans_start_local(env, d, th);
2047         if (rc)
2048                 GOTO(out_trans, rc);
2049
2050         th->th_wait_submit = 1;
2051
2052         rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
2053         if (rc)
2054                 CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n",
2055                        idx, rc);
2056 out_trans:
2057         dt_trans_stop(env, d, th);
2058 out:
2059         lu_object_put(env, &o->do_lu);
2060         RETURN(rc);
2061 }
2062 EXPORT_SYMBOL(llog_osd_put_cat_list);