Whamcloud - gitweb
LU-7898 osd: remove unnecessary declarations
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32 /*
33  * lustre/obdclass/llog_osd.c
34  *
35  * Low level llog routines on top of OSD API
36  *
37  * This file provides set of methods for llog operations on top of
38  * dt_device. It contains all supported llog_operations interfaces and
39  * supplemental functions.
40  *
41  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
42  * Author: Mikhail Pershin <mike.pershin@intel.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_LOG
46
47 #include <dt_object.h>
48 #include <llog_swab.h>
49 #include <lustre_fid.h>
50 #include <obd.h>
51 #include <obd_class.h>
52
53 #include "llog_internal.h"
54 #include "local_storage.h"
55
56 /**
57  * Implementation of the llog_operations::lop_declare_create
58  *
59  * This function is a wrapper over local_storage API function
60  * local_object_declare_create().
61  *
62  * \param[in] env       execution environment
63  * \param[in] los       local_storage for bottom storage device
64  * \param[in] o         dt_object to create
65  * \param[in] th        current transaction handle
66  *
67  * \retval              0 on successful declaration of the new object
68  * \retval              negative error if declaration was failed
69  */
70 static int llog_osd_declare_new_object(const struct lu_env *env,
71                                        struct local_oid_storage *los,
72                                        struct dt_object *o,
73                                        struct thandle *th)
74 {
75         struct llog_thread_info *lgi = llog_info(env);
76
77         lgi->lgi_attr.la_valid = LA_MODE;
78         lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
79         lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
80
81         return local_object_declare_create(env, los, o, &lgi->lgi_attr,
82                                            &lgi->lgi_dof, th);
83 }
84
85 /**
86  * Implementation of the llog_operations::lop_create
87  *
88  * This function is a wrapper over local_storage API function
89  * local_object_create().
90  *
91  * \param[in] env       execution environment
92  * \param[in] los       local_storage for bottom storage device
93  * \param[in] o         dt_object to create
94  * \param[in] th        current transaction handle
95  *
96  * \retval              0 on successful creation of the new object
97  * \retval              negative error if creation was failed
98  */
99 static int llog_osd_create_new_object(const struct lu_env *env,
100                                       struct local_oid_storage *los,
101                                       struct dt_object *o,
102                                       struct thandle *th)
103 {
104         struct llog_thread_info *lgi = llog_info(env);
105
106         lgi->lgi_attr.la_valid = LA_MODE;
107         lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
108         lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
109
110         return local_object_create(env, los, o, &lgi->lgi_attr,
111                                    &lgi->lgi_dof, th);
112 }
113
114 /**
115  * Implementation of the llog_operations::lop_exist
116  *
117  * This function checks that llog exists on storage.
118  *
119  * \param[in] handle    llog handle of the current llog
120  *
121  * \retval              true if llog object exists and is not just destroyed
122  * \retval              false if llog doesn't exist or just destroyed
123  */
124 static int llog_osd_exist(struct llog_handle *handle)
125 {
126         LASSERT(handle->lgh_obj);
127         return dt_object_exists(handle->lgh_obj) &&
128                 !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
129 }
130
131 static void *rec_tail(struct llog_rec_hdr *rec)
132 {
133         return (void *)((char *)rec + rec->lrh_len -
134                         sizeof(struct llog_rec_tail));
135 }
136
137 /**
138  * Write a padding record to the llog
139  *
140  * This function writes a padding record to the end of llog. That may
141  * be needed if llog contains records of variable size, e.g. config logs
142  * or changelogs.
143  * The padding record just aligns llog to the llog chunk_size boundary if
144  * the current record doesn't fit in the remaining space.
145  *
146  * It allocates full length to avoid two separate writes for header and tail.
147  * Such 2-steps scheme needs extra protection and complex error handling.
148  *
149  * \param[in]     env   execution environment
150  * \param[in]     o     dt_object to create
151  * \param[in,out] off   pointer to the padding start offset
152  * \param[in]     len   padding length
153  * \param[in]     index index of the padding record in a llog
154  * \param[in]     th    current transaction handle
155  *
156  * \retval              0 on successful padding write
157  * \retval              negative error if write failed
158  */
159 static int llog_osd_pad(const struct lu_env *env, struct dt_object *o,
160                         loff_t *off, int len, int index, struct thandle *th)
161 {
162         struct llog_thread_info *lgi = llog_info(env);
163         struct llog_rec_hdr     *rec;
164         struct llog_rec_tail    *tail;
165         int                      rc;
166
167         ENTRY;
168
169         LASSERT(th);
170         LASSERT(off);
171         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
172
173         OBD_ALLOC(rec, len);
174         if (rec == NULL)
175                 RETURN(-ENOMEM);
176
177         rec->lrh_len = len;
178         rec->lrh_index = index;
179         rec->lrh_type = LLOG_PAD_MAGIC;
180
181         tail = rec_tail(rec);
182         tail->lrt_len = len;
183         tail->lrt_index = index;
184
185         lgi->lgi_buf.lb_buf = rec;
186         lgi->lgi_buf.lb_len = len;
187         rc = dt_record_write(env, o, &lgi->lgi_buf, off, th);
188         if (rc)
189                 CERROR("%s: error writing padding record: rc = %d\n",
190                        o->do_lu.lo_dev->ld_obd->obd_name, rc);
191
192         OBD_FREE(rec, len);
193         RETURN(rc);
194 }
195
196 /**
197  * Implementation of the llog_operations::lop_read_header
198  *
199  * This function reads the current llog header from the bottom storage
200  * device.
201  *
202  * \param[in] env       execution environment
203  * \param[in] handle    llog handle of the current llog
204  *
205  * \retval              0 on successful header read
206  * \retval              negative error if read failed
207  */
208 static int llog_osd_read_header(const struct lu_env *env,
209                                 struct llog_handle *handle)
210 {
211         struct llog_rec_hdr     *llh_hdr;
212         struct dt_object        *o;
213         struct llog_thread_info *lgi;
214         enum llog_flag           flags;
215         int                      rc;
216
217         ENTRY;
218
219         o = handle->lgh_obj;
220         LASSERT(o);
221
222         lgi = llog_info(env);
223
224         rc = dt_attr_get(env, o, &lgi->lgi_attr);
225         if (rc)
226                 RETURN(rc);
227
228         LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
229
230         if (lgi->lgi_attr.la_size == 0) {
231                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
232                 RETURN(LLOG_EEMPTY);
233         }
234
235         flags = handle->lgh_hdr->llh_flags;
236
237         lgi->lgi_off = 0;
238         lgi->lgi_buf.lb_buf = handle->lgh_hdr;
239         lgi->lgi_buf.lb_len = handle->lgh_hdr_size;
240         rc = dt_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
241         llh_hdr = &handle->lgh_hdr->llh_hdr;
242         if (rc < sizeof(*llh_hdr) || rc < llh_hdr->lrh_len) {
243                 CERROR("%s: error reading "DFID" log header size %d: rc = %d\n",
244                        o->do_lu.lo_dev->ld_obd->obd_name,
245                        PFID(lu_object_fid(&o->do_lu)), rc < 0 ? 0 : rc,
246                        -EFAULT);
247
248                 if (rc >= 0)
249                         rc = -EFAULT;
250
251                 RETURN(rc);
252         }
253
254         if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
255                 lustre_swab_llog_hdr(handle->lgh_hdr);
256
257         if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
258                 CERROR("%s: bad log %s "DFID" header magic: %#x "
259                        "(expected %#x)\n", o->do_lu.lo_dev->ld_obd->obd_name,
260                        handle->lgh_name ? handle->lgh_name : "",
261                        PFID(lu_object_fid(&o->do_lu)),
262                        llh_hdr->lrh_type, LLOG_HDR_MAGIC);
263                 RETURN(-EIO);
264         } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
265                    llh_hdr->lrh_len > handle->lgh_hdr_size) {
266                 CERROR("%s: incorrectly sized log %s "DFID" header: "
267                        "%#x (expected at least %#x)\n"
268                        "you may need to re-run lconf --write_conf.\n",
269                        o->do_lu.lo_dev->ld_obd->obd_name,
270                        handle->lgh_name ? handle->lgh_name : "",
271                        PFID(lu_object_fid(&o->do_lu)),
272                        llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
273                 RETURN(-EIO);
274         } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index >
275                    LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) ||
276                    LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len !=
277                         llh_hdr->lrh_len) {
278                 CERROR("%s: incorrectly sized log %s "DFID" tailer: "
279                        "%#x : rc = %d\n",
280                        o->do_lu.lo_dev->ld_obd->obd_name,
281                        handle->lgh_name ? handle->lgh_name : "",
282                        PFID(lu_object_fid(&o->do_lu)),
283                        LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO);
284                 RETURN(-EIO);
285         }
286
287         handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
288         handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
289         handle->lgh_write_offset = lgi->lgi_attr.la_size;
290
291         RETURN(0);
292 }
293
294 /**
295  * Implementation of the llog_operations::lop_declare_write
296  *
297  * This function declares the new record write.
298  *
299  * \param[in] env       execution environment
300  * \param[in] loghandle llog handle of the current llog
301  * \param[in] rec       llog record header. This is a real header of the full
302  *                      llog record to write. This is the beginning of buffer
303  *                      to write, the length of buffer is stored in
304  *                      \a rec::lrh_len
305  * \param[in] idx       index of the llog record. If \a idx == -1 then this is
306  *                      append case, otherwise \a idx is the index of record
307  *                      to modify
308  * \param[in] th        current transaction handle
309  *
310  * \retval              0 on successful declaration
311  * \retval              negative error if declaration failed
312  */
313 static int llog_osd_declare_write_rec(const struct lu_env *env,
314                                       struct llog_handle *loghandle,
315                                       struct llog_rec_hdr *rec,
316                                       int idx, struct thandle *th)
317 {
318         struct llog_thread_info *lgi = llog_info(env);
319         __u32                   chunk_size;
320         struct dt_object        *o;
321         int                      rc;
322
323         ENTRY;
324
325         LASSERT(env);
326         LASSERT(th);
327         LASSERT(loghandle);
328         LASSERT(rec);
329         LASSERT(rec->lrh_len <= loghandle->lgh_ctxt->loc_chunk_size);
330
331         o = loghandle->lgh_obj;
332         LASSERT(o);
333
334         chunk_size = loghandle->lgh_ctxt->loc_chunk_size;
335         lgi->lgi_buf.lb_len = chunk_size;
336         lgi->lgi_buf.lb_buf = NULL;
337         /* each time we update header */
338         rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0,
339                                      th);
340         if (rc || idx == 0) /* if error or just header */
341                 RETURN(rc);
342
343         /**
344          * the pad record can be inserted so take into account double
345          * record size
346          */
347         lgi->lgi_buf.lb_len = chunk_size * 2;
348         lgi->lgi_buf.lb_buf = NULL;
349         /* XXX: implement declared window or multi-chunks approach */
350         rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th);
351
352         RETURN(rc);
353 }
354
355 /**
356  * Implementation of the llog_operations::lop_write
357  *
358  * This function writes the new record in the llog or modify the existed one.
359  *
360  * \param[in]  env              execution environment
361  * \param[in]  loghandle        llog handle of the current llog
362  * \param[in]  rec              llog record header. This is a real header of
363  *                              the full llog record to write. This is
364  *                              the beginning of buffer to write, the length
365  *                              of buffer is stored in \a rec::lrh_len
366  * \param[out] reccookie        pointer to the cookie to return back if needed.
367  *                              It is used for further cancel of this llog
368  *                              record.
369  * \param[in]  idx              index of the llog record. If \a idx == -1 then
370  *                              this is append case, otherwise \a idx is
371  *                              the index of record to modify
372  * \param[in]  th               current transaction handle
373  *
374  * \retval                      0 on successful write && \a reccookie == NULL
375  *                              1 on successful write && \a reccookie != NULL
376  * \retval                      negative error if write failed
377  */
378 static int llog_osd_write_rec(const struct lu_env *env,
379                               struct llog_handle *loghandle,
380                               struct llog_rec_hdr *rec,
381                               struct llog_cookie *reccookie,
382                               int idx, struct thandle *th)
383 {
384         struct llog_thread_info *lgi = llog_info(env);
385         struct llog_log_hdr     *llh;
386         int                      reclen = rec->lrh_len;
387         int                      index, rc;
388         struct llog_rec_tail    *lrt;
389         struct dt_object        *o;
390         __u32                   chunk_size;
391         size_t                   left;
392         __u32                   orig_last_idx;
393         __u64                   orig_write_offset;
394         ENTRY;
395
396         llh = loghandle->lgh_hdr;
397         o = loghandle->lgh_obj;
398
399         chunk_size = llh->llh_hdr.lrh_len;
400         CDEBUG(D_OTHER, "new record %x to "DFID"\n",
401                rec->lrh_type, PFID(lu_object_fid(&o->do_lu)));
402
403         if (!llog_osd_exist(loghandle))
404                 RETURN(-ENOENT);
405
406         /* record length should not bigger than  */
407         if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
408                 RETURN(-E2BIG);
409
410         /* sanity check for fixed-records llog */
411         if (idx != LLOG_HEADER_IDX && (llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
412                 LASSERT(llh->llh_size != 0);
413                 LASSERT(llh->llh_size == reclen);
414         }
415
416         rc = dt_attr_get(env, o, &lgi->lgi_attr);
417         if (rc)
418                 RETURN(rc);
419
420         /**
421          * The modification case.
422          * If idx set then the record with that index must be modified.
423          * There are three cases possible:
424          * 1) the common case is the llog header update (idx == 0)
425          * 2) the llog record modification during llog process.
426          *    This is indicated by the \a loghandle::lgh_cur_idx > 0.
427          *    In that case the \a loghandle::lgh_cur_offset
428          * 3) otherwise this is assumed that llog consist of records of
429          *    fixed size, i.e. catalog. The llog header must has llh_size
430          *    field equal to record size. The record offset is calculated
431          *    just by /a idx value
432          *
433          * During modification we don't need extra header update because
434          * the bitmap and record count are not changed. The record header
435          * and tail remains the same too.
436          */
437         if (idx != LLOG_NEXT_IDX) {
438                 /* llog can be empty only when first record is being written */
439                 LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0));
440
441                 if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
442                         CERROR("%s: modify unset record %u\n",
443                                o->do_lu.lo_dev->ld_obd->obd_name, idx);
444                         RETURN(-ENOENT);
445                 }
446
447                 if (idx != rec->lrh_index) {
448                         CERROR("%s: modify index mismatch %d %u\n",
449                                o->do_lu.lo_dev->ld_obd->obd_name, idx,
450                                rec->lrh_index);
451                         RETURN(-EFAULT);
452                 }
453
454                 if (idx == LLOG_HEADER_IDX) {
455                         /* llog header update */
456                         __u32   *bitmap = LLOG_HDR_BITMAP(llh);
457
458                         lgi->lgi_off = 0;
459
460                         /* If it does not indicate the bitmap index
461                          * (reccookie == NULL), then it means update
462                          * the whole update header. Otherwise only
463                          * update header and bits needs to be updated,
464                          * and in DNE cases, it will signaficantly
465                          * shrink the RPC size.
466                          * see distribute_txn_cancel_records()*/
467                         if (reccookie == NULL) {
468                                 lgi->lgi_buf.lb_len = reclen;
469                                 lgi->lgi_buf.lb_buf = rec;
470                                 rc = dt_record_write(env, o, &lgi->lgi_buf,
471                                                      &lgi->lgi_off, th);
472                                 RETURN(rc);
473                         }
474
475                         /* update the header */
476                         lgi->lgi_buf.lb_len = llh->llh_bitmap_offset;
477                         lgi->lgi_buf.lb_buf = llh;
478                         rc = dt_record_write(env, o, &lgi->lgi_buf,
479                                              &lgi->lgi_off, th);
480                         if (rc != 0)
481                                 RETURN(rc);
482
483                         /* update the bitmap */
484                         index = reccookie->lgc_index;
485                         lgi->lgi_off = llh->llh_bitmap_offset +
486                                       (index / (sizeof(*bitmap) * 8)) *
487                                                         sizeof(*bitmap);
488                         lgi->lgi_buf.lb_len = sizeof(*bitmap);
489                         lgi->lgi_buf.lb_buf =
490                                         &bitmap[index/(sizeof(*bitmap)*8)];
491                         rc = dt_record_write(env, o, &lgi->lgi_buf,
492                                              &lgi->lgi_off, th);
493
494                         RETURN(rc);
495                 } else if (loghandle->lgh_cur_idx > 0) {
496                         /**
497                          * The lgh_cur_offset can be used only if index is
498                          * the same.
499                          */
500                         if (idx != loghandle->lgh_cur_idx) {
501                                 CERROR("%s: modify index mismatch %d %d\n",
502                                        o->do_lu.lo_dev->ld_obd->obd_name, idx,
503                                        loghandle->lgh_cur_idx);
504                                 RETURN(-EFAULT);
505                         }
506
507                         lgi->lgi_off = loghandle->lgh_cur_offset;
508                         CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, "
509                                "len:%u offset %llu\n",
510                                POSTID(&loghandle->lgh_id.lgl_oi), idx,
511                                rec->lrh_len, (long long)lgi->lgi_off);
512                 } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
513                         lgi->lgi_off = llh->llh_hdr.lrh_len +
514                                        (idx - 1) * reclen;
515                 } else {
516                         /* This can be result of lgh_cur_idx is not set during
517                          * llog processing or llh_size is not set to proper
518                          * record size for fixed records llog. Therefore it is
519                          * impossible to get record offset. */
520                         CERROR("%s: can't get record offset, idx:%d, "
521                                "len:%u.\n", o->do_lu.lo_dev->ld_obd->obd_name,
522                                idx, rec->lrh_len);
523                         RETURN(-EFAULT);
524                 }
525
526                 /* update only data, header and tail remain the same */
527                 lgi->lgi_off += sizeof(struct llog_rec_hdr);
528                 lgi->lgi_buf.lb_len = REC_DATA_LEN(rec);
529                 lgi->lgi_buf.lb_buf = REC_DATA(rec);
530                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
531                 if (rc == 0 && reccookie) {
532                         reccookie->lgc_lgl = loghandle->lgh_id;
533                         reccookie->lgc_index = idx;
534                         rc = 1;
535                 }
536                 RETURN(rc);
537         }
538
539         /**
540          * The append case.
541          * The most common case of using llog. The new index is assigned to
542          * the new record, new bit is set in llog bitmap and llog count is
543          * incremented.
544          *
545          * Make sure that records don't cross a chunk boundary, so we can
546          * process them page-at-a-time if needed.  If it will cross a chunk
547          * boundary, write in a fake (but referenced) entry to pad the chunk.
548          */
549
550
551         /* simulate ENOSPC when new plain llog is being added to the
552          * catalog */
553         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2) &&
554             llh->llh_flags & LLOG_F_IS_CAT)
555                 RETURN(-ENOSPC);
556
557         LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
558         orig_last_idx = loghandle->lgh_last_idx;
559         orig_write_offset = loghandle->lgh_write_offset;
560         lgi->lgi_off = lgi->lgi_attr.la_size;
561         left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
562         /* NOTE: padding is a record, but no bit is set */
563         if (left != 0 && left != reclen &&
564             left < (reclen + LLOG_MIN_REC_SIZE)) {
565                 index = loghandle->lgh_last_idx + 1;
566                 rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
567                 if (rc)
568                         RETURN(rc);
569
570                 if (dt_object_remote(o))
571                         loghandle->lgh_write_offset = lgi->lgi_off;
572
573                 loghandle->lgh_last_idx++; /* for pad rec */
574         }
575         /* if it's the last idx in log file, then return -ENOSPC
576          * or wrap around if a catalog */
577         if (llog_is_full(loghandle) ||
578             unlikely(llh->llh_flags & LLOG_F_IS_CAT &&
579                      OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
580                      loghandle->lgh_last_idx >= cfs_fail_val)) {
581                 if (llh->llh_flags & LLOG_F_IS_CAT)
582                         loghandle->lgh_last_idx = 0;
583                 else
584                         RETURN(-ENOSPC);
585         }
586
587         /* increment the last_idx along with llh_tail index, they should
588          * be equal for a llog lifetime */
589         loghandle->lgh_last_idx++;
590         index = loghandle->lgh_last_idx;
591         LLOG_HDR_TAIL(llh)->lrt_index = index;
592         /**
593          * NB: the caller should make sure only 1 process access
594          * the lgh_last_idx, e.g. append should be exclusive.
595          * Otherwise it might hit the assert.
596          */
597         LASSERT(index < LLOG_HDR_BITMAP_SIZE(llh));
598         rec->lrh_index = index;
599         lrt = rec_tail(rec);
600         lrt->lrt_len = rec->lrh_len;
601         lrt->lrt_index = rec->lrh_index;
602
603         /* the lgh_hdr_mutex protects llog header data from concurrent
604          * update/cancel, the llh_count and llh_bitmap are protected */
605         mutex_lock(&loghandle->lgh_hdr_mutex);
606         if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) {
607                 CERROR("%s: index %u already set in log bitmap\n",
608                        o->do_lu.lo_dev->ld_obd->obd_name, index);
609                 mutex_unlock(&loghandle->lgh_hdr_mutex);
610                 LBUG(); /* should never happen */
611         }
612         llh->llh_count++;
613
614         if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
615                 /* Update the minimum size of the llog record */
616                 if (llh->llh_size == 0)
617                         llh->llh_size = reclen;
618                 else if (reclen < llh->llh_size)
619                         llh->llh_size = reclen;
620         }
621
622         if (lgi->lgi_attr.la_size == 0) {
623                 lgi->lgi_off = 0;
624                 lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
625                 lgi->lgi_buf.lb_buf = &llh->llh_hdr;
626                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
627                 if (rc != 0)
628                         GOTO(out_unlock, rc);
629         } else {
630                 __u32   *bitmap = LLOG_HDR_BITMAP(llh);
631
632                 /* Note: If this is not initialization (size == 0), then do not
633                  * write the whole header (8k bytes), only update header/tail
634                  * and bits needs to be updated. Because this update might be
635                  * part of cross-MDT operation, which needs to write these
636                  * updates into the update log(32KB limit) and also pack inside
637                  * the RPC (1MB limit), if we write 8K for each operation, which
638                  * will cost a lot space, and keep us adding more updates to one
639                  * update log.*/
640                 lgi->lgi_off = 0;
641                 lgi->lgi_buf.lb_len = llh->llh_bitmap_offset;
642                 lgi->lgi_buf.lb_buf = &llh->llh_hdr;
643                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
644                 if (rc != 0)
645                         GOTO(out_unlock, rc);
646
647                 lgi->lgi_off = llh->llh_bitmap_offset +
648                               (index / (sizeof(*bitmap) * 8)) * sizeof(*bitmap);
649                 lgi->lgi_buf.lb_len = sizeof(*bitmap);
650                 lgi->lgi_buf.lb_buf = &bitmap[index/(sizeof(*bitmap)*8)];
651                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
652                 if (rc != 0)
653                         GOTO(out_unlock, rc);
654
655                 lgi->lgi_off =  (unsigned long)LLOG_HDR_TAIL(llh) -
656                                 (unsigned long)llh;
657                 lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
658                 lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh);
659                 rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
660                 if (rc != 0)
661                         GOTO(out_unlock, rc);
662         }
663
664 out_unlock:
665         /* unlock here for remote object */
666         mutex_unlock(&loghandle->lgh_hdr_mutex);
667         if (rc)
668                 GOTO(out, rc);
669
670         /* computed index can be used to determine offset for fixed-size
671          * records. This also allows to handle Catalog wrap around case */
672         if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
673                 lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
674         } else if (dt_object_remote(o)) {
675                 lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset,
676                                      lgi->lgi_off);
677         } else {
678                 rc = dt_attr_get(env, o, &lgi->lgi_attr);
679                 if (rc)
680                         GOTO(out, rc);
681
682                 LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
683                 lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size,
684                                      lgi->lgi_off);
685         }
686
687         lgi->lgi_buf.lb_len = reclen;
688         lgi->lgi_buf.lb_buf = rec;
689         rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
690         if (rc < 0)
691                 GOTO(out, rc);
692
693         if (dt_object_remote(o))
694                 loghandle->lgh_write_offset = lgi->lgi_off;
695
696         CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n",
697                PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
698                lgi->lgi_off);
699         if (reccookie != NULL) {
700                 reccookie->lgc_lgl = loghandle->lgh_id;
701                 reccookie->lgc_index = index;
702                 if ((rec->lrh_type == MDS_UNLINK_REC) ||
703                     (rec->lrh_type == MDS_SETATTR64_REC))
704                         reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
705                 else if (rec->lrh_type == OST_SZ_REC)
706                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
707                 else
708                         reccookie->lgc_subsys = -1;
709                 rc = 1;
710         }
711         RETURN(rc);
712 out:
713         /* cleanup llog for error case */
714         mutex_lock(&loghandle->lgh_hdr_mutex);
715         ext2_clear_bit(index, LLOG_HDR_BITMAP(llh));
716         llh->llh_count--;
717         mutex_unlock(&loghandle->lgh_hdr_mutex);
718
719         /* restore llog last_idx */
720         if (dt_object_remote(o)) {
721                 loghandle->lgh_last_idx = orig_last_idx;
722                 loghandle->lgh_write_offset = orig_write_offset;
723         } else if (--loghandle->lgh_last_idx == 0 &&
724             (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
725                 /* catalog had just wrap-around case */
726                 loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
727         }
728
729         LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
730
731         RETURN(rc);
732 }
733
734 /**
735  * We can skip reading at least as many log blocks as the number of
736  * minimum sized log records we are skipping.  If it turns out
737  * that we are not far enough along the log (because the
738  * actual records are larger than minimum size) we just skip
739  * some more records.
740  *
741  * Note: in llog_process_thread, it will use bitmap offset as
742  * the index to locate the record, which also includs some pad
743  * records, whose record size is very small, and it also does not
744  * consider pad record when recording minimum record size (otherwise
745  * min_record size might be too small), so in some rare cases,
746  * it might skip too much record for @goal, see llog_osd_next_block().
747  *
748  * When force_mini_rec is true, it means we have to use LLOG_MIN_REC_SIZE
749  * as the min record size to skip over, usually because in the previous
750  * try, it skip too much record, see loog_osd_next(prev)_block().
751  */
752 static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off,
753                                   int curr, int goal, __u32 chunk_size,
754                                   bool force_mini_rec)
755 {
756         struct llog_log_hdr *llh = lgh->lgh_hdr;
757
758         /* Goal should not bigger than the record count */
759         if (goal > lgh->lgh_last_idx)
760                 goal = lgh->lgh_last_idx;
761
762         if (goal > curr) {
763                 if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
764                         *off = chunk_size + (goal - 1) * llh->llh_size;
765                 } else {
766                         __u64 min_rec_size = LLOG_MIN_REC_SIZE;
767
768                         if (llh->llh_size > 0 && !force_mini_rec)
769                                 min_rec_size = llh->llh_size;
770
771                         *off = *off + (goal - curr - 1) * min_rec_size;
772                 }
773         }
774         /* always align with lower chunk boundary*/
775         *off &= ~(chunk_size - 1);
776 }
777
778 /**
779  * Remove optional fields that the client doesn't expect.
780  * This is typically in order to ensure compatibility with older clients.
781  * It is assumed that since we exclusively remove fields, the block will be
782  * big enough to handle the remapped records. It is also assumed that records
783  * of a block have the same format (i.e.: the same features enabled).
784  *
785  * \param[in,out]    hdr        Header of the block of records to remap.
786  * \param[in,out]    last_hdr   Last header, don't read past this point.
787  * \param[in]        flags      Flags describing the fields to keep.
788  */
789 static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
790                                      struct llog_rec_hdr *last_hdr,
791                                      enum changelog_rec_flags flags)
792 {
793         if (hdr->lrh_type != CHANGELOG_REC)
794                 return;
795
796         do {
797                 struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1);
798
799                 changelog_remap_rec(rec, rec->cr_flags & flags);
800                 hdr = llog_rec_hdr_next(hdr);
801         } while ((char *)hdr <= (char *)last_hdr);
802 }
803
804 /**
805  * Implementation of the llog_operations::lop_next_block
806  *
807  * This function finds the the next llog block to return which contains
808  * record with required index. It is main part of llog processing.
809  *
810  * \param[in]     env           execution environment
811  * \param[in]     loghandle     llog handle of the current llog
812  * \param[in,out] cur_idx       index preceeding cur_offset
813  * \param[in]     next_idx      target index to find
814  * \param[in,out] cur_offset    furtherst point read in the file
815  * \param[in]     buf           pointer to data buffer to fill
816  * \param[in]     len           required len to read, it is
817  *                              usually llog chunk_size.
818  *
819  * \retval                      0 on successful buffer read
820  * \retval                      negative value on error
821  */
822 static int llog_osd_next_block(const struct lu_env *env,
823                                struct llog_handle *loghandle, int *cur_idx,
824                                int next_idx, __u64 *cur_offset, void *buf,
825                                int len)
826 {
827         struct llog_thread_info *lgi = llog_info(env);
828         struct dt_object        *o;
829         struct dt_device        *dt;
830         int                      rc;
831         __u32                   chunk_size;
832         int last_idx = *cur_idx;
833         __u64 last_offset = *cur_offset;
834         bool force_mini_rec = false;
835
836         ENTRY;
837
838         LASSERT(env);
839         LASSERT(lgi);
840
841         chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
842         if (len == 0 || len & (chunk_size - 1))
843                 RETURN(-EINVAL);
844
845         LASSERT(loghandle);
846         LASSERT(loghandle->lgh_ctxt);
847
848         o = loghandle->lgh_obj;
849         LASSERT(o);
850         LASSERT(dt_object_exists(o));
851         dt = lu2dt_dev(o->do_lu.lo_dev);
852         LASSERT(dt);
853
854         rc = dt_attr_get(env, o, &lgi->lgi_attr);
855         if (rc)
856                 GOTO(out, rc);
857
858         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off"
859                "%llu), size %llu\n", next_idx, *cur_idx,
860                *cur_offset, lgi->lgi_attr.la_size);
861
862         while (*cur_offset < lgi->lgi_attr.la_size) {
863                 struct llog_rec_hdr     *rec, *last_rec;
864                 struct llog_rec_tail    *tail;
865
866                 llog_skip_over(loghandle, cur_offset, *cur_idx,
867                                next_idx, chunk_size, force_mini_rec);
868
869                 /* read up to next llog chunk_size block */
870                 lgi->lgi_buf.lb_len = chunk_size -
871                                       (*cur_offset & (chunk_size - 1));
872                 lgi->lgi_buf.lb_buf = buf;
873
874                 rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
875                 if (rc < 0) {
876                         if (rc == -EBADR && !force_mini_rec)
877                                 goto retry;
878
879                         CERROR("%s: can't read llog block from log "DFID
880                                " offset %llu: rc = %d\n",
881                                o->do_lu.lo_dev->ld_obd->obd_name,
882                                PFID(lu_object_fid(&o->do_lu)), *cur_offset,
883                                rc);
884                         GOTO(out, rc);
885                 }
886
887                 if (rc < len) {
888                         /* signal the end of the valid buffer to
889                          * llog_process */
890                         memset(buf + rc, 0, len - rc);
891                 }
892
893                 if (rc == 0) { /* end of file, nothing to do */
894                         if (!force_mini_rec)
895                                 goto retry;
896                         GOTO(out, rc);
897                 }
898
899                 if (rc < sizeof(*tail)) {
900                         if (!force_mini_rec)
901                                 goto retry;
902
903                         CERROR("%s: invalid llog block at log id "DOSTID"/%u "
904                                "offset %llu\n",
905                                o->do_lu.lo_dev->ld_obd->obd_name,
906                                POSTID(&loghandle->lgh_id.lgl_oi),
907                                loghandle->lgh_id.lgl_ogen, *cur_offset);
908                         GOTO(out, rc = -EINVAL);
909                 }
910
911                 rec = buf;
912                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
913                         lustre_swab_llog_rec(rec);
914
915                 tail = (struct llog_rec_tail *)((char *)buf + rc -
916                                                 sizeof(struct llog_rec_tail));
917                 /* get the last record in block */
918                 last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
919                                                    tail->lrt_len);
920
921                 if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
922                         lustre_swab_llog_rec(last_rec);
923
924                 if (last_rec->lrh_index != tail->lrt_index) {
925                         CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
926                                "offset %llu last_rec idx %u tail idx %u\n",
927                                o->do_lu.lo_dev->ld_obd->obd_name,
928                                POSTID(&loghandle->lgh_id.lgl_oi),
929                                loghandle->lgh_id.lgl_ogen, *cur_offset,
930                                last_rec->lrh_index, tail->lrt_index);
931                         GOTO(out, rc = -EINVAL);
932                 }
933
934                 *cur_idx = tail->lrt_index;
935
936                 /* this shouldn't happen */
937                 if (tail->lrt_index == 0) {
938                         CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
939                                "offset %llu bytes %d\n",
940                                o->do_lu.lo_dev->ld_obd->obd_name,
941                                POSTID(&loghandle->lgh_id.lgl_oi),
942                                loghandle->lgh_id.lgl_ogen, *cur_offset, rc);
943                         GOTO(out, rc = -EINVAL);
944                 }
945                 if (tail->lrt_index < next_idx) {
946                         last_idx = *cur_idx;
947                         last_offset = *cur_offset;
948                         continue;
949                 }
950
951                 /* sanity check that the start of the new buffer is no farther
952                  * than the record that we wanted.  This shouldn't happen. */
953                 if (rec->lrh_index > next_idx) {
954                         if (!force_mini_rec && next_idx > last_idx)
955                                 goto retry;
956
957                         CERROR("%s: missed desired record? %u > %u\n",
958                                o->do_lu.lo_dev->ld_obd->obd_name,
959                                rec->lrh_index, next_idx);
960                         GOTO(out, rc = -ENOENT);
961                 }
962
963                 /* Trim unsupported extensions for compat w/ older clients */
964                 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
965                         changelog_block_trim_ext(rec, last_rec,
966                                                  CLF_VERSION | CLF_RENAME);
967
968                 GOTO(out, rc = 0);
969
970 retry:
971                 /* Note: because there are some pad records in the
972                  * llog, so llog_skip_over() might skip too much
973                  * records, let's try skip again with minimum record */
974                 force_mini_rec = true;
975                 *cur_offset = last_offset;
976                 *cur_idx = last_idx;
977         }
978         GOTO(out, rc = -EIO);
979 out:
980         return rc;
981 }
982
983 /**
984  * Implementation of the llog_operations::lop_prev_block
985  *
986  * This function finds the llog block to return which contains
987  * record with required index but in reverse order - from end of llog
988  * to the beginning.
989  * It is main part of reverse llog processing.
990  *
991  * \param[in] env       execution environment
992  * \param[in] loghandle llog handle of the current llog
993  * \param[in] prev_idx  target index to find
994  * \param[in] buf       pointer to data buffer to fill
995  * \param[in] len       required len to read, it is llog_chunk_size usually.
996  *
997  * \retval              0 on successful buffer read
998  * \retval              negative value on error
999  */
1000 static int llog_osd_prev_block(const struct lu_env *env,
1001                                struct llog_handle *loghandle,
1002                                int prev_idx, void *buf, int len)
1003 {
1004         struct llog_thread_info *lgi = llog_info(env);
1005         struct dt_object        *o;
1006         struct dt_device        *dt;
1007         loff_t                   cur_offset;
1008         __u32                   chunk_size;
1009         int                      rc;
1010
1011         ENTRY;
1012
1013         chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
1014         if (len == 0 || len & (chunk_size - 1))
1015                 RETURN(-EINVAL);
1016
1017         CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
1018
1019         LASSERT(loghandle);
1020         LASSERT(loghandle->lgh_ctxt);
1021
1022         o = loghandle->lgh_obj;
1023         LASSERT(o);
1024         LASSERT(dt_object_exists(o));
1025         dt = lu2dt_dev(o->do_lu.lo_dev);
1026         LASSERT(dt);
1027
1028         /* Let's only use mini record size for previous block read
1029          * for now XXX */
1030         cur_offset = chunk_size;
1031         llog_skip_over(loghandle, &cur_offset, 0, prev_idx,
1032                        chunk_size, true);
1033
1034         rc = dt_attr_get(env, o, &lgi->lgi_attr);
1035         if (rc)
1036                 GOTO(out, rc);
1037
1038         while (cur_offset < lgi->lgi_attr.la_size) {
1039                 struct llog_rec_hdr     *rec, *last_rec;
1040                 struct llog_rec_tail    *tail;
1041
1042                 lgi->lgi_buf.lb_len = len;
1043                 lgi->lgi_buf.lb_buf = buf;
1044                 rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
1045                 if (rc < 0) {
1046                         CERROR("%s: can't read llog block from log "DFID
1047                                " offset %llu: rc = %d\n",
1048                                o->do_lu.lo_dev->ld_obd->obd_name,
1049                                PFID(lu_object_fid(&o->do_lu)), cur_offset, rc);
1050                         GOTO(out, rc);
1051                 }
1052
1053                 if (rc == 0) /* end of file, nothing to do */
1054                         GOTO(out, rc);
1055
1056                 if (rc < sizeof(*tail)) {
1057                         CERROR("%s: invalid llog block at log id "DOSTID"/%u "
1058                                "offset %llu\n",
1059                                o->do_lu.lo_dev->ld_obd->obd_name,
1060                                POSTID(&loghandle->lgh_id.lgl_oi),
1061                                loghandle->lgh_id.lgl_ogen, cur_offset);
1062                         GOTO(out, rc = -EINVAL);
1063                 }
1064
1065                 rec = buf;
1066                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
1067                         lustre_swab_llog_rec(rec);
1068
1069                 tail = (struct llog_rec_tail *)((char *)buf + rc -
1070                                                 sizeof(struct llog_rec_tail));
1071                 /* get the last record in block */
1072                 last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
1073                                                    le32_to_cpu(tail->lrt_len));
1074
1075                 if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
1076                         lustre_swab_llog_rec(last_rec);
1077                 LASSERT(last_rec->lrh_index == tail->lrt_index);
1078
1079                 /* this shouldn't happen */
1080                 if (tail->lrt_index == 0) {
1081                         CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
1082                                "offset %llu\n",
1083                                o->do_lu.lo_dev->ld_obd->obd_name,
1084                                POSTID(&loghandle->lgh_id.lgl_oi),
1085                                loghandle->lgh_id.lgl_ogen, cur_offset);
1086                         GOTO(out, rc = -EINVAL);
1087                 }
1088                 if (tail->lrt_index < prev_idx)
1089                         continue;
1090
1091                 /* sanity check that the start of the new buffer is no farther
1092                  * than the record that we wanted.  This shouldn't happen. */
1093                 if (rec->lrh_index > prev_idx) {
1094                         CERROR("%s: missed desired record? %u > %u\n",
1095                                o->do_lu.lo_dev->ld_obd->obd_name,
1096                                rec->lrh_index, prev_idx);
1097                         GOTO(out, rc = -ENOENT);
1098                 }
1099
1100                 /* Trim unsupported extensions for compat w/ older clients */
1101                 if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
1102                         changelog_block_trim_ext(rec, last_rec,
1103                                                  CLF_VERSION | CLF_RENAME);
1104
1105                 GOTO(out, rc = 0);
1106         }
1107         GOTO(out, rc = -EIO);
1108 out:
1109         return rc;
1110 }
1111
1112 /**
1113  * This is helper function to get llog directory object. It is used by named
1114  * llog operations to find/insert/delete llog entry from llog directory.
1115  *
1116  * \param[in] env       execution environment
1117  * \param[in] ctxt      llog context
1118  *
1119  * \retval              dt_object of llog directory
1120  * \retval              ERR_PTR of negative value on error
1121  */
1122 static struct dt_object *llog_osd_dir_get(const struct lu_env *env,
1123                                           struct llog_ctxt *ctxt)
1124 {
1125         struct dt_device        *dt;
1126         struct dt_thread_info   *dti = dt_info(env);
1127         struct dt_object        *dir;
1128         int                      rc;
1129
1130         dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
1131         if (ctxt->loc_dir == NULL) {
1132                 rc = dt_root_get(env, dt, &dti->dti_fid);
1133                 if (rc)
1134                         return ERR_PTR(rc);
1135                 dir = dt_locate(env, dt, &dti->dti_fid);
1136
1137                 if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) {
1138                         lu_object_put(env, &dir->do_lu);
1139                         return ERR_PTR(-ENOTDIR);
1140                 }
1141         } else {
1142                 lu_object_get(&ctxt->loc_dir->do_lu);
1143                 dir = ctxt->loc_dir;
1144         }
1145
1146         return dir;
1147 }
1148
1149 /**
1150  * Implementation of the llog_operations::lop_open
1151  *
1152  * This function opens the llog by its logid or by name, it may open also
1153  * non existent llog and assing then new id to it.
1154  * The llog_open/llog_close pair works similar to lu_object_find/put,
1155  * the object may not exist prior open. The result of open is just dt_object
1156  * in the llog header.
1157  *
1158  * \param[in] env               execution environment
1159  * \param[in] handle            llog handle of the current llog
1160  * \param[in] logid             logid of llog to open (nameless llog)
1161  * \param[in] name              name of llog to open (named llog)
1162  * \param[in] open_param
1163  *                              LLOG_OPEN_NEW - new llog, may not exist
1164  *                              LLOG_OPEN_EXIST - old llog, must exist
1165  *
1166  * \retval                      0 on successful open, llog_handle::lgh_obj
1167  *                              contains the dt_object of the llog.
1168  * \retval                      negative value on error
1169  */
1170 static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
1171                          struct llog_logid *logid, char *name,
1172                          enum llog_open_param open_param)
1173 {
1174         struct llog_thread_info         *lgi = llog_info(env);
1175         struct llog_ctxt                *ctxt = handle->lgh_ctxt;
1176         struct dt_object                *o;
1177         struct dt_device                *dt;
1178         struct ls_device                *ls;
1179         struct local_oid_storage        *los = NULL;
1180         int                              rc = 0;
1181         bool new_id = false;
1182
1183         ENTRY;
1184
1185         LASSERT(env);
1186         LASSERT(ctxt);
1187         LASSERT(ctxt->loc_exp);
1188         LASSERT(ctxt->loc_exp->exp_obd);
1189         dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
1190         LASSERT(dt);
1191         if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1192                 struct lu_object_conf conf = { 0 };
1193                 if (logid != NULL) {
1194                         logid_to_fid(logid, &lgi->lgi_fid);
1195                 } else {
1196                         /* If logid == NULL, then it means the caller needs
1197                          * to allocate new FID (llog_cat_declare_add_rec()). */
1198                         rc = obd_fid_alloc(env, ctxt->loc_exp,
1199                                            &lgi->lgi_fid, NULL);
1200                         if (rc < 0)
1201                                 RETURN(rc);
1202                         rc = 0;
1203                         conf.loc_flags = LOC_F_NEW;
1204                 }
1205
1206                 o = dt_locate_at(env, dt, &lgi->lgi_fid,
1207                                  dt->dd_lu_dev.ld_site->ls_top_dev, &conf);
1208                 if (IS_ERR(o))
1209                         RETURN(PTR_ERR(o));
1210
1211                 goto after_open;
1212         }
1213
1214         ls = ls_device_get(dt);
1215         if (IS_ERR(ls))
1216                 RETURN(PTR_ERR(ls));
1217
1218         mutex_lock(&ls->ls_los_mutex);
1219         los = dt_los_find(ls, name != NULL ? FID_SEQ_LLOG_NAME : FID_SEQ_LLOG);
1220         mutex_unlock(&ls->ls_los_mutex);
1221         LASSERT(los);
1222         ls_device_put(env, ls);
1223
1224         LASSERT(handle);
1225
1226         if (logid != NULL) {
1227                 logid_to_fid(logid, &lgi->lgi_fid);
1228         } else if (name) {
1229                 struct dt_object *llog_dir;
1230
1231                 llog_dir = llog_osd_dir_get(env, ctxt);
1232                 if (IS_ERR(llog_dir))
1233                         GOTO(out, rc = PTR_ERR(llog_dir));
1234                 dt_read_lock(env, llog_dir, 0);
1235                 rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid);
1236                 dt_read_unlock(env, llog_dir);
1237                 lu_object_put(env, &llog_dir->do_lu);
1238                 if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
1239                         /* generate fid for new llog */
1240                         rc = local_object_fid_generate(env, los,
1241                                                        &lgi->lgi_fid);
1242                         new_id = true;
1243                 }
1244                 if (rc < 0)
1245                         GOTO(out, rc);
1246                 OBD_ALLOC(handle->lgh_name, strlen(name) + 1);
1247                 if (handle->lgh_name)
1248                         strcpy(handle->lgh_name, name);
1249                 else
1250                         GOTO(out, rc = -ENOMEM);
1251         } else {
1252                 LASSERTF(open_param & LLOG_OPEN_NEW, "%#x\n", open_param);
1253                 /* generate fid for new llog */
1254 generate:
1255                 rc = local_object_fid_generate(env, los, &lgi->lgi_fid);
1256                 if (rc < 0)
1257                         GOTO(out, rc);
1258                 new_id = true;
1259         }
1260
1261         o = ls_locate(env, ls, &lgi->lgi_fid, NULL);
1262         if (IS_ERR(o))
1263                 GOTO(out_name, rc = PTR_ERR(o));
1264
1265         if (dt_object_exists(o) && new_id) {
1266                 /* llog exists with just generated ID, e.g. some old llog file
1267                  * still is in use or is orphan, drop a warn and skip it. */
1268                 CDEBUG(D_INFO, "%s: llog exists with the same FID: "DFID
1269                        ", skipping\n",
1270                        o->do_lu.lo_dev->ld_obd->obd_name,
1271                        PFID(lu_object_fid(&o->do_lu)));
1272                 lu_object_put(env, &o->do_lu);
1273                 /* just skip this llog ID, we shouldn't delete it because we
1274                  * don't know exactly what is its purpose and state. */
1275                 goto generate;
1276         }
1277
1278 after_open:
1279         /* No new llog is expected but doesn't exist */
1280         if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
1281                 GOTO(out_put, rc = -ENOENT);
1282
1283         fid_to_logid(&lgi->lgi_fid, &handle->lgh_id);
1284         handle->lgh_obj = o;
1285         handle->private_data = los;
1286         LASSERT(handle->lgh_ctxt);
1287
1288         RETURN(rc);
1289
1290 out_put:
1291         lu_object_put(env, &o->do_lu);
1292 out_name:
1293         if (handle->lgh_name != NULL)
1294                 OBD_FREE(handle->lgh_name, strlen(name) + 1);
1295 out:
1296         if (los != NULL)
1297                 dt_los_put(los);
1298         RETURN(rc);
1299 }
1300
1301 /**
1302  * Get dir for regular fid log object
1303  *
1304  * Get directory for regular fid log object, and these regular fid log
1305  * object will be inserted under this directory, to satisfy the FS
1306  * consistency check, e2fsck etc.
1307  *
1308  * \param [in] env      execution environment
1309  * \param [in] dto      llog object
1310  *
1311  * \retval              pointer to the directory if it is found.
1312  * \retval              ERR_PTR(negative errno) if it fails.
1313  */
1314 struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env,
1315                                                struct dt_object *dto)
1316 {
1317         struct llog_thread_info *lgi = llog_info(env);
1318         struct seq_server_site *ss = dto->do_lu.lo_dev->ld_site->ld_seq_site;
1319         struct lu_seq_range     *range = &lgi->lgi_range;
1320         struct lu_fid           *dir_fid = &lgi->lgi_fid;
1321         struct dt_object        *dir;
1322         int                     rc;
1323         ENTRY;
1324
1325         fld_range_set_any(range);
1326         LASSERT(ss != NULL);
1327         rc = ss->ss_server_fld->lsf_seq_lookup(env, ss->ss_server_fld,
1328                                    fid_seq(lu_object_fid(&dto->do_lu)), range);
1329         if (rc < 0)
1330                 RETURN(ERR_PTR(rc));
1331
1332         lu_update_log_dir_fid(dir_fid, range->lsr_index);
1333         dir = dt_locate(env, lu2dt_dev(dto->do_lu.lo_dev), dir_fid);
1334         if (IS_ERR(dir))
1335                 RETURN(dir);
1336
1337         if (!dt_try_as_dir(env, dir)) {
1338                 lu_object_put(env, &dir->do_lu);
1339                 RETURN(ERR_PTR(-ENOTDIR));
1340         }
1341
1342         RETURN(dir);
1343 }
1344
1345 /**
1346  * Add llog object with regular FID to name entry
1347  *
1348  * Add llog object with regular FID to name space, and each llog
1349  * object on each MDT will be /update_log_dir/[seq:oid:ver],
1350  * so to satisfy the namespace consistency check, e2fsck etc.
1351  *
1352  * \param [in] env      execution environment
1353  * \param [in] dto      llog object
1354  * \param [in] th       thandle
1355  * \param [in] declare  if it is declare or execution
1356  *
1357  * \retval              0 if insertion succeeds.
1358  * \retval              negative errno if insertion fails.
1359  */
1360 static int
1361 llog_osd_regular_fid_add_name_entry(const struct lu_env *env,
1362                                     struct dt_object *dto,
1363                                     struct thandle *th, bool declare)
1364 {
1365         struct llog_thread_info *lgi = llog_info(env);
1366         const struct lu_fid     *fid = lu_object_fid(&dto->do_lu);
1367         struct dt_insert_rec    *rec = &lgi->lgi_dt_rec;
1368         struct dt_object        *dir;
1369         char                    *name = lgi->lgi_name;
1370         int                     rc;
1371         ENTRY;
1372
1373         if (!fid_is_norm(fid))
1374                 RETURN(0);
1375
1376         dir = llog_osd_get_regular_fid_dir(env, dto);
1377         if (IS_ERR(dir))
1378                 RETURN(PTR_ERR(dir));
1379
1380         rec->rec_fid = fid;
1381         rec->rec_type = S_IFREG;
1382         snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
1383         dt_write_lock(env, dir, 0);
1384         if (declare) {
1385                 rc = dt_declare_insert(env, dir, (struct dt_rec *)rec,
1386                                (struct dt_key *)name, th);
1387         } else {
1388                 rc = dt_insert(env, dir, (struct dt_rec *)rec,
1389                                (struct dt_key *)name, th, 1);
1390         }
1391         dt_write_unlock(env, dir);
1392
1393         lu_object_put(env, &dir->do_lu);
1394         RETURN(rc);
1395 }
1396
1397
1398 /**
1399  * Implementation of the llog_operations::lop_declare_create
1400  *
1401  * This function declares the llog create. It declares also name insert
1402  * into llog directory in case of named llog.
1403  *
1404  * \param[in] env       execution environment
1405  * \param[in] res       llog handle of the current llog
1406  * \param[in] th        current transaction handle
1407  *
1408  * \retval              0 on successful create declaration
1409  * \retval              negative value on error
1410  */
1411 static int llog_osd_declare_create(const struct lu_env *env,
1412                                    struct llog_handle *res, struct thandle *th)
1413 {
1414         struct llog_thread_info         *lgi = llog_info(env);
1415         struct dt_insert_rec            *rec = &lgi->lgi_dt_rec;
1416         struct local_oid_storage        *los;
1417         struct dt_object                *o;
1418         int                              rc;
1419
1420         ENTRY;
1421
1422         LASSERT(res->lgh_obj);
1423         LASSERT(th);
1424
1425         /* object can be created by another thread */
1426         o = res->lgh_obj;
1427         if (dt_object_exists(o))
1428                 RETURN(0);
1429
1430         if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1431                 struct llog_thread_info *lgi = llog_info(env);
1432
1433                 lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE;
1434                 lgi->lgi_attr.la_size = 0;
1435                 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1436                 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
1437
1438                 rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
1439                                        &lgi->lgi_dof, th);
1440                 if (rc < 0)
1441                         RETURN(rc);
1442
1443
1444                 rc = llog_osd_regular_fid_add_name_entry(env, o, th, true);
1445
1446                 RETURN(rc);
1447         }
1448         los = res->private_data;
1449         LASSERT(los);
1450
1451         rc = llog_osd_declare_new_object(env, los, o, th);
1452         if (rc)
1453                 RETURN(rc);
1454
1455         /* do not declare header initialization here as it's declared
1456          * in llog_osd_declare_write_rec() which is always called */
1457
1458         if (res->lgh_name) {
1459                 struct dt_object *llog_dir;
1460
1461                 llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
1462                 if (IS_ERR(llog_dir))
1463                         RETURN(PTR_ERR(llog_dir));
1464                 logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
1465                 rec->rec_fid = &lgi->lgi_fid;
1466                 rec->rec_type = S_IFREG;
1467                 rc = dt_declare_insert(env, llog_dir,
1468                                        (struct dt_rec *)rec,
1469                                        (struct dt_key *)res->lgh_name, th);
1470                 lu_object_put(env, &llog_dir->do_lu);
1471                 if (rc)
1472                         CERROR("%s: can't declare named llog %s: rc = %d\n",
1473                                o->do_lu.lo_dev->ld_obd->obd_name,
1474                                res->lgh_name, rc);
1475         }
1476         RETURN(rc);
1477 }
1478
1479 /**
1480  * Implementation of the llog_operations::lop_create
1481  *
1482  * This function creates the llog according with llog_handle::lgh_obj
1483  * and llog_handle::lgh_name.
1484  *
1485  * \param[in] env       execution environment
1486  * \param[in] res       llog handle of the current llog
1487  * \param[in] th        current transaction handle
1488  *
1489  * \retval              0 on successful create
1490  * \retval              negative value on error
1491  */
1492 static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
1493                            struct thandle *th)
1494 {
1495         struct llog_thread_info *lgi = llog_info(env);
1496         struct dt_insert_rec    *rec = &lgi->lgi_dt_rec;
1497         struct local_oid_storage *los;
1498         struct dt_object        *o;
1499         int                      rc = 0;
1500
1501         ENTRY;
1502
1503         LASSERT(env);
1504         o = res->lgh_obj;
1505         LASSERT(o);
1506
1507         /* llog can be already created */
1508         if (dt_object_exists(o))
1509                 RETURN(-EEXIST);
1510
1511         if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1512                 struct llog_thread_info *lgi = llog_info(env);
1513
1514                 lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE | LA_TYPE;
1515                 lgi->lgi_attr.la_size = 0;
1516                 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1517                 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
1518
1519                 dt_write_lock(env, o, 0);
1520                 rc = dt_create(env, o, &lgi->lgi_attr, NULL,
1521                                &lgi->lgi_dof, th);
1522                 dt_write_unlock(env, o);
1523                 if (rc < 0)
1524                         RETURN(rc);
1525
1526                 rc = llog_osd_regular_fid_add_name_entry(env, o, th, false);
1527
1528                 RETURN(rc);
1529         }
1530
1531         los = res->private_data;
1532         LASSERT(los);
1533
1534         dt_write_lock(env, o, 0);
1535         if (!dt_object_exists(o))
1536                 rc = llog_osd_create_new_object(env, los, o, th);
1537         else
1538                 rc = -EEXIST;
1539
1540         dt_write_unlock(env, o);
1541         if (rc)
1542                 RETURN(rc);
1543
1544         if (res->lgh_name) {
1545                 struct dt_object *llog_dir;
1546
1547                 llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
1548                 if (IS_ERR(llog_dir))
1549                         RETURN(PTR_ERR(llog_dir));
1550
1551                 logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
1552                 rec->rec_fid = &lgi->lgi_fid;
1553                 rec->rec_type = S_IFREG;
1554                 dt_read_lock(env, llog_dir, 0);
1555                 rc = dt_insert(env, llog_dir, (struct dt_rec *)rec,
1556                                (struct dt_key *)res->lgh_name,
1557                                th, 1);
1558                 dt_read_unlock(env, llog_dir);
1559                 lu_object_put(env, &llog_dir->do_lu);
1560                 if (rc)
1561                         CERROR("%s: can't create named llog %s: rc = %d\n",
1562                                o->do_lu.lo_dev->ld_obd->obd_name,
1563                                res->lgh_name, rc);
1564         }
1565         RETURN(rc);
1566 }
1567
1568 /**
1569  * Implementation of the llog_operations::lop_close
1570  *
1571  * This function closes the llog. It just put llog object and referenced
1572  * local storage.
1573  *
1574  * \param[in] env       execution environment
1575  * \param[in] handle    llog handle of the current llog
1576  *
1577  * \retval              0 on successful llog close
1578  * \retval              negative value on error
1579  */
1580 static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
1581 {
1582         struct local_oid_storage        *los;
1583         int                              rc = 0;
1584
1585         ENTRY;
1586
1587         LASSERT(handle->lgh_obj);
1588
1589         if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1590                 /* Remove the object from the cache, otherwise it may
1591                  * hold LOD being released during cleanup process */
1592                 lu_object_put_nocache(env, &handle->lgh_obj->do_lu);
1593                 LASSERT(handle->private_data == NULL);
1594                 RETURN(rc);
1595         } else {
1596                 lu_object_put(env, &handle->lgh_obj->do_lu);
1597         }
1598         los = handle->private_data;
1599         LASSERT(los);
1600         dt_los_put(los);
1601
1602         if (handle->lgh_name)
1603                 OBD_FREE(handle->lgh_name, strlen(handle->lgh_name) + 1);
1604
1605         RETURN(rc);
1606 }
1607
1608 /**
1609  * delete llog object name entry
1610  *
1611  * Delete llog object (with regular FID) from name space (under
1612  * update_log_dir).
1613  *
1614  * \param [in] env      execution environment
1615  * \param [in] dto      llog object
1616  * \param [in] th       thandle
1617  * \param [in] declare  if it is declare or execution
1618  *
1619  * \retval              0 if deletion succeeds.
1620  * \retval              negative errno if deletion fails.
1621  */
1622 static int
1623 llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
1624                                     struct dt_object *dto,
1625                                     struct thandle *th, bool declare)
1626 {
1627         struct llog_thread_info *lgi = llog_info(env);
1628         const struct lu_fid     *fid = lu_object_fid(&dto->do_lu);
1629         struct dt_object        *dir;
1630         char                    *name = lgi->lgi_name;
1631         int                     rc;
1632         ENTRY;
1633
1634         if (!fid_is_norm(fid))
1635                 RETURN(0);
1636
1637         dir = llog_osd_get_regular_fid_dir(env, dto);
1638         if (IS_ERR(dir))
1639                 RETURN(PTR_ERR(dir));
1640
1641         snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
1642         dt_write_lock(env, dir, 0);
1643         if (declare) {
1644                 rc = dt_declare_delete(env, dir, (struct dt_key *)name,
1645                                        th);
1646         } else {
1647                 rc = dt_delete(env, dir, (struct dt_key *)name, th);
1648         }
1649         dt_write_unlock(env, dir);
1650
1651         lu_object_put(env, &dir->do_lu);
1652         RETURN(rc);
1653 }
1654
1655 /**
1656  * Implementation of the llog_operations::lop_declare_destroy
1657  *
1658  * This function declare destroys the llog and deletes also entry in the
1659  * llog directory in case of named llog. Llog should be opened prior that.
1660  *
1661  * \param[in] env               execution environment
1662  * \param[in] loghandle llog handle of the current llog
1663  *
1664  * \retval              0 on successful destroy
1665  * \retval              negative value on error
1666  */
1667 static int llog_osd_declare_destroy(const struct lu_env *env,
1668                                     struct llog_handle *loghandle,
1669                                     struct thandle *th)
1670 {
1671         struct llog_ctxt        *ctxt;
1672         struct dt_object        *o, *llog_dir = NULL;
1673         int                      rc;
1674
1675         ENTRY;
1676
1677         ctxt = loghandle->lgh_ctxt;
1678         LASSERT(ctxt);
1679
1680         o = loghandle->lgh_obj;
1681         LASSERT(o);
1682
1683         if (loghandle->lgh_name) {
1684                 llog_dir = llog_osd_dir_get(env, ctxt);
1685                 if (IS_ERR(llog_dir))
1686                         RETURN(PTR_ERR(llog_dir));
1687
1688                 rc = dt_declare_delete(env, llog_dir,
1689                                        (struct dt_key *)loghandle->lgh_name,
1690                                        th);
1691                 if (rc < 0)
1692                         GOTO(out_put, rc);
1693         }
1694
1695         rc = dt_declare_ref_del(env, o, th);
1696         if (rc < 0)
1697                 GOTO(out_put, rc);
1698
1699         rc = dt_declare_destroy(env, o, th);
1700         if (rc < 0)
1701                 GOTO(out_put, rc);
1702
1703         if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1704                 rc = llog_osd_regular_fid_del_name_entry(env, o, th, true);
1705                 if (rc < 0)
1706                         GOTO(out_put, rc);
1707         }
1708
1709 out_put:
1710         if (!(IS_ERR_OR_NULL(llog_dir)))
1711                 lu_object_put(env, &llog_dir->do_lu);
1712
1713         RETURN(rc);
1714 }
1715
1716
1717 /**
1718  * Implementation of the llog_operations::lop_destroy
1719  *
1720  * This function destroys the llog and deletes also entry in the
1721  * llog directory in case of named llog. Llog should be opened prior that.
1722  * Destroy method is not part of external transaction and does everything
1723  * inside.
1724  *
1725  * \param[in] env               execution environment
1726  * \param[in] loghandle llog handle of the current llog
1727  *
1728  * \retval              0 on successful destroy
1729  * \retval              negative value on error
1730  */
1731 static int llog_osd_destroy(const struct lu_env *env,
1732                             struct llog_handle *loghandle, struct thandle *th)
1733 {
1734         struct llog_ctxt        *ctxt;
1735         struct dt_object        *o, *llog_dir = NULL;
1736         int                      rc;
1737
1738         ENTRY;
1739
1740         ctxt = loghandle->lgh_ctxt;
1741         LASSERT(ctxt != NULL);
1742
1743         o = loghandle->lgh_obj;
1744         LASSERT(o != NULL);
1745
1746         dt_write_lock(env, o, 0);
1747         if (!dt_object_exists(o))
1748                 GOTO(out_unlock, rc = 0);
1749
1750         if (loghandle->lgh_name) {
1751                 llog_dir = llog_osd_dir_get(env, ctxt);
1752                 if (IS_ERR(llog_dir))
1753                         GOTO(out_unlock, rc = PTR_ERR(llog_dir));
1754
1755                 dt_read_lock(env, llog_dir, 0);
1756                 rc = dt_delete(env, llog_dir,
1757                                (struct dt_key *)loghandle->lgh_name,
1758                                th);
1759                 dt_read_unlock(env, llog_dir);
1760                 if (rc) {
1761                         CERROR("%s: can't remove llog %s: rc = %d\n",
1762                                o->do_lu.lo_dev->ld_obd->obd_name,
1763                                loghandle->lgh_name, rc);
1764                         GOTO(out_unlock, rc);
1765                 }
1766         }
1767
1768         dt_ref_del(env, o, th);
1769         rc = dt_destroy(env, o, th);
1770         if (rc < 0)
1771                 GOTO(out_unlock, rc);
1772
1773         if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
1774                 rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
1775                 if (rc < 0)
1776                         GOTO(out_unlock, rc);
1777         }
1778
1779 out_unlock:
1780         dt_write_unlock(env, o);
1781         if (!(IS_ERR_OR_NULL(llog_dir)))
1782                 lu_object_put(env, &llog_dir->do_lu);
1783         RETURN(rc);
1784 }
1785
1786 /**
1787  * Implementation of the llog_operations::lop_setup
1788  *
1789  * This function setup the llog on local storage.
1790  *
1791  * \param[in] env       execution environment
1792  * \param[in] obd       obd device the llog belongs to
1793  * \param[in] olg       the llog group, it is always zero group now.
1794  * \param[in] ctxt_idx  the llog index, it defines the purpose of this llog.
1795  *                      Every new llog type have to use own index.
1796  * \param[in] disk_obd  the storage obd, where llog is stored.
1797  *
1798  * \retval              0 on successful llog setup
1799  * \retval              negative value on error
1800  */
1801 static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd,
1802                           struct obd_llog_group *olg, int ctxt_idx,
1803                           struct obd_device *disk_obd)
1804 {
1805         struct llog_thread_info         *lgi = llog_info(env);
1806         struct llog_ctxt                *ctxt;
1807         int                              rc = 0;
1808         ENTRY;
1809
1810         LASSERT(obd);
1811         LASSERT(olg->olg_ctxts[ctxt_idx]);
1812
1813         ctxt = llog_ctxt_get(olg->olg_ctxts[ctxt_idx]);
1814         LASSERT(ctxt);
1815
1816         if (disk_obd == NULL)
1817                 GOTO(out, rc = 0);
1818
1819         /* initialize data allowing to generate new fids,
1820          * literally we need a sequece */
1821         lgi->lgi_fid.f_seq = FID_SEQ_LLOG;
1822         lgi->lgi_fid.f_oid = 1;
1823         lgi->lgi_fid.f_ver = 0;
1824         rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
1825                                     &lgi->lgi_fid,
1826                                     &ctxt->loc_los_nameless);
1827         if (rc != 0)
1828                 GOTO(out, rc);
1829
1830         lgi->lgi_fid.f_seq = FID_SEQ_LLOG_NAME;
1831         lgi->lgi_fid.f_oid = 1;
1832         lgi->lgi_fid.f_ver = 0;
1833         rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
1834                                     &lgi->lgi_fid,
1835                                     &ctxt->loc_los_named);
1836         if (rc != 0) {
1837                 local_oid_storage_fini(env, ctxt->loc_los_nameless);
1838                 ctxt->loc_los_nameless = NULL;
1839         }
1840
1841         GOTO(out, rc);
1842
1843 out:
1844         llog_ctxt_put(ctxt);
1845         return rc;
1846 }
1847
1848 /**
1849  * Implementation of the llog_operations::lop_cleanup
1850  *
1851  * This function cleanups the llog on local storage.
1852  *
1853  * \param[in] env       execution environment
1854  * \param[in] ctxt      the llog context
1855  *
1856  * \retval              0
1857  */
1858 static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
1859 {
1860         if (ctxt->loc_los_nameless != NULL) {
1861                 local_oid_storage_fini(env, ctxt->loc_los_nameless);
1862                 ctxt->loc_los_nameless = NULL;
1863         }
1864
1865         if (ctxt->loc_los_named != NULL) {
1866                 local_oid_storage_fini(env, ctxt->loc_los_named);
1867                 ctxt->loc_los_named = NULL;
1868         }
1869
1870         return 0;
1871 }
1872
1873 struct llog_operations llog_osd_ops = {
1874         .lop_next_block         = llog_osd_next_block,
1875         .lop_prev_block         = llog_osd_prev_block,
1876         .lop_read_header        = llog_osd_read_header,
1877         .lop_declare_destroy    = llog_osd_declare_destroy,
1878         .lop_destroy            = llog_osd_destroy,
1879         .lop_setup              = llog_osd_setup,
1880         .lop_cleanup            = llog_osd_cleanup,
1881         .lop_open               = llog_osd_open,
1882         .lop_exist              = llog_osd_exist,
1883         .lop_declare_create     = llog_osd_declare_create,
1884         .lop_create             = llog_osd_create,
1885         .lop_declare_write_rec  = llog_osd_declare_write_rec,
1886         .lop_write_rec          = llog_osd_write_rec,
1887         .lop_close              = llog_osd_close,
1888 };
1889 EXPORT_SYMBOL(llog_osd_ops);
1890
1891 struct llog_operations llog_common_cat_ops = {
1892         .lop_next_block         = llog_osd_next_block,
1893         .lop_prev_block         = llog_osd_prev_block,
1894         .lop_read_header        = llog_osd_read_header,
1895         .lop_declare_destroy    = llog_osd_declare_destroy,
1896         .lop_destroy            = llog_osd_destroy,
1897         .lop_setup              = llog_osd_setup,
1898         .lop_cleanup            = llog_osd_cleanup,
1899         .lop_open               = llog_osd_open,
1900         .lop_exist              = llog_osd_exist,
1901         .lop_declare_create     = llog_osd_declare_create,
1902         .lop_create             = llog_osd_create,
1903         .lop_declare_write_rec  = llog_osd_declare_write_rec,
1904         .lop_write_rec          = llog_osd_write_rec,
1905         .lop_close              = llog_osd_close,
1906         .lop_add                = llog_cat_add_rec,
1907         .lop_declare_add        = llog_cat_declare_add_rec,
1908 };
1909 EXPORT_SYMBOL(llog_common_cat_ops);
1910
1911 /**
1912  * Read the special file which contains the list of llog catalogs IDs
1913  *
1914  * This function reads the CATALOGS file which contains the array of llog
1915  * catalogs IDs. The main purpose of this file is to store OSP llogs indexed
1916  * by OST/MDT number.
1917  *
1918  * \param[in]  env              execution environment
1919  * \param[in]  d                corresponding storage device
1920  * \param[in]  idx              position to start from, usually OST/MDT index
1921  * \param[in]  count            how many catalog IDs to read
1922  * \param[out] idarray          the buffer for the data. If it is NULL then
1923  *                              function returns just number of catalog IDs
1924  *                              in the file.
1925  * \param[in]  fid              LLOG_CATALOGS_OID for CATALOG object
1926  *
1927  * \retval                      0 on successful read of catalog IDs
1928  * \retval                      negative value on error
1929  * \retval                      positive value which is number of records in
1930  *                              the file if \a idarray is NULL
1931  */
1932 int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
1933                           int idx, int count, struct llog_catid *idarray,
1934                           const struct lu_fid *fid)
1935 {
1936         struct llog_thread_info *lgi = llog_info(env);
1937         struct dt_object        *o = NULL;
1938         struct thandle          *th;
1939         int                      rc, size;
1940
1941         ENTRY;
1942
1943         LASSERT(d);
1944
1945         size = sizeof(*idarray) * count;
1946         lgi->lgi_off = idx *  sizeof(*idarray);
1947
1948         lgi->lgi_fid = *fid;
1949         o = dt_locate(env, d, &lgi->lgi_fid);
1950         if (IS_ERR(o))
1951                 RETURN(PTR_ERR(o));
1952
1953         if (!dt_object_exists(o)) {
1954                 th = dt_trans_create(env, d);
1955                 if (IS_ERR(th))
1956                         GOTO(out, rc = PTR_ERR(th));
1957
1958                 lgi->lgi_attr.la_valid = LA_MODE;
1959                 lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1960                 lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
1961
1962                 th->th_wait_submit = 1;
1963                 /* Make the llog object creation synchronization, so
1964                  * it will be reliable to the reference, especially
1965                  * for remote reference */
1966                 th->th_sync = 1;
1967
1968                 rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
1969                                        &lgi->lgi_dof, th);
1970                 if (rc)
1971                         GOTO(out_trans, rc);
1972
1973                 rc = dt_trans_start_local(env, d, th);
1974                 if (rc)
1975                         GOTO(out_trans, rc);
1976
1977                 dt_write_lock(env, o, 0);
1978                 if (!dt_object_exists(o))
1979                         rc = dt_create(env, o, &lgi->lgi_attr, NULL,
1980                                        &lgi->lgi_dof, th);
1981                 dt_write_unlock(env, o);
1982 out_trans:
1983                 dt_trans_stop(env, d, th);
1984                 if (rc)
1985                         GOTO(out, rc);
1986         }
1987
1988         rc = dt_attr_get(env, o, &lgi->lgi_attr);
1989         if (rc)
1990                 GOTO(out, rc);
1991
1992         if (!S_ISREG(lgi->lgi_attr.la_mode)) {
1993                 CERROR("%s: CATALOGS is not a regular file!: mode = %o\n",
1994                        o->do_lu.lo_dev->ld_obd->obd_name,
1995                        lgi->lgi_attr.la_mode);
1996                 GOTO(out, rc = -ENOENT);
1997         }
1998
1999         CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
2000                (int)lgi->lgi_attr.la_size, size);
2001
2002         /* return just number of llogs */
2003         if (idarray == NULL) {
2004                 rc = lgi->lgi_attr.la_size / sizeof(*idarray);
2005                 GOTO(out, rc);
2006         }
2007
2008         /* read for new ost index or for empty file */
2009         memset(idarray, 0, size);
2010         if (lgi->lgi_attr.la_size <= lgi->lgi_off)
2011                 GOTO(out, rc = 0);
2012         if (lgi->lgi_attr.la_size < lgi->lgi_off + size)
2013                 size = lgi->lgi_attr.la_size - lgi->lgi_off;
2014
2015         lgi->lgi_buf.lb_buf = idarray;
2016         lgi->lgi_buf.lb_len = size;
2017         rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
2018         /* -EFAULT means the llog is a sparse file. This is not an error
2019          * after arbitrary OST index is supported. */
2020         if (rc < 0 && rc != -EFAULT) {
2021                 CERROR("%s: error reading CATALOGS: rc = %d\n",
2022                        o->do_lu.lo_dev->ld_obd->obd_name,  rc);
2023                 GOTO(out, rc);
2024         }
2025
2026         EXIT;
2027 out:
2028         lu_object_put(env, &o->do_lu);
2029         RETURN(rc);
2030 }
2031 EXPORT_SYMBOL(llog_osd_get_cat_list);
2032
2033 /**
2034  * Write the special file which contains the list of llog catalogs IDs
2035  *
2036  * This function writes the CATALOG file which contains the array of llog
2037  * catalogs IDs. It is used mostly to store OSP llogs indexed by OST/MDT
2038  * number.
2039  *
2040  * \param[in]  env      execution environment
2041  * \param[in]  d        corresponding storage device
2042  * \param[in]  idx      position to start from, usually OST/MDT index
2043  * \param[in]  count    how many catalog IDs to write
2044  * \param[out] idarray  the buffer with the data to write.
2045  * \param[in]  fid      LLOG_CATALOGS_OID for CATALOG object
2046  *
2047  * \retval              0 on successful write of catalog IDs
2048  * \retval              negative value on error
2049  */
2050 int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
2051                           int idx, int count, struct llog_catid *idarray,
2052                           const struct lu_fid *fid)
2053 {
2054         struct llog_thread_info *lgi = llog_info(env);
2055         struct dt_object        *o = NULL;
2056         struct thandle          *th;
2057         int                      rc, size;
2058
2059         if (count == 0)
2060                 RETURN(0);
2061
2062         LASSERT(d);
2063
2064         size = sizeof(*idarray) * count;
2065         lgi->lgi_off = idx * sizeof(*idarray);
2066         lgi->lgi_fid = *fid;
2067
2068         o = dt_locate(env, d, &lgi->lgi_fid);
2069         if (IS_ERR(o))
2070                 RETURN(PTR_ERR(o));
2071
2072         if (!dt_object_exists(o))
2073                 GOTO(out, rc = -ENOENT);
2074
2075         rc = dt_attr_get(env, o, &lgi->lgi_attr);
2076         if (rc)
2077                 GOTO(out, rc);
2078
2079         if (!S_ISREG(lgi->lgi_attr.la_mode)) {
2080                 CERROR("%s: CATALOGS is not a regular file!: mode = %o\n",
2081                        o->do_lu.lo_dev->ld_obd->obd_name,
2082                        lgi->lgi_attr.la_mode);
2083                 GOTO(out, rc = -ENOENT);
2084         }
2085
2086         th = dt_trans_create(env, d);
2087         if (IS_ERR(th))
2088                 GOTO(out, rc = PTR_ERR(th));
2089
2090         lgi->lgi_buf.lb_len = size;
2091         lgi->lgi_buf.lb_buf = idarray;
2092         rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
2093         if (rc)
2094                 GOTO(out_trans, rc);
2095
2096         /* For update log, this happens during initialization,
2097          * see lod_sub_prep_llog(), and we need make sure catlog
2098          * file ID is written to catlist file(committed) before
2099          * cross-MDT operation write update records to catlog FILE,
2100          * otherwise, during failover these update records might
2101          * missing */
2102         if (fid_is_update_log(fid))
2103                 th->th_sync = 1;
2104
2105         rc = dt_trans_start_local(env, d, th);
2106         if (rc)
2107                 GOTO(out_trans, rc);
2108
2109         th->th_wait_submit = 1;
2110
2111         rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
2112         if (rc)
2113                 CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n",
2114                        idx, rc);
2115 out_trans:
2116         dt_trans_stop(env, d, th);
2117 out:
2118         lu_object_put(env, &o->do_lu);
2119         RETURN(rc);
2120 }
2121 EXPORT_SYMBOL(llog_osd_put_cat_list);