Whamcloud - gitweb
LU-1302 llog: modify llog_write/llog_add to support OSD
[fs/lustre-release.git] / lustre / obdclass / llog_lvfs.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/obdclass/llog_lvfs.c
35  *
36  * OST<->MDS recovery logging infrastructure.
37  * Invariants in implementation:
38  * - we do not share logs among different OST<->MDS connections, so that
39  *   if an OST or MDS fails it need only look at log(s) relevant to itself
40  *
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOG
45
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_log.h>
53 #include <obd_ost.h>
54 #include <libcfs/list.h>
55 #include <lvfs.h>
56 #include <lustre_fsfilt.h>
57 #include <lustre_disk.h>
58 #include "llog_internal.h"
59
60 #if defined(__KERNEL__) && defined(LLOG_LVFS)
61
62 static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
63                                 int len, int index)
64 {
65         struct llog_rec_hdr rec = { 0 };
66         struct llog_rec_tail tail;
67         int rc;
68         ENTRY;
69
70         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
71
72         tail.lrt_len = rec.lrh_len = len;
73         tail.lrt_index = rec.lrh_index = index;
74         rec.lrh_type = LLOG_PAD_MAGIC;
75
76         rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
77         if (rc) {
78                 CERROR("error writing padding record: rc %d\n", rc);
79                 goto out;
80         }
81
82         file->f_pos += len - sizeof(rec) - sizeof(tail);
83         rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0);
84         if (rc) {
85                 CERROR("error writing padding record: rc %d\n", rc);
86                 goto out;
87         }
88
89  out:
90         RETURN(rc);
91 }
92
93 static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
94                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
95 {
96         int rc;
97         struct llog_rec_tail end;
98         loff_t saved_off = file->f_pos;
99         int buflen = rec->lrh_len;
100
101         ENTRY;
102
103         file->f_pos = off;
104
105         if (buflen == 0)
106                 CWARN("0-length record\n");
107
108         if (!buf) {
109                 rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
110                 if (rc) {
111                         CERROR("error writing log record: rc %d\n", rc);
112                         goto out;
113                 }
114                 GOTO(out, rc = 0);
115         }
116
117         /* the buf case */
118         rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
119         rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
120         if (rc) {
121                 CERROR("error writing log hdr: rc %d\n", rc);
122                 goto out;
123         }
124
125         rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
126         if (rc) {
127                 CERROR("error writing log buffer: rc %d\n", rc);
128                 goto out;
129         }
130
131         end.lrt_len = rec->lrh_len;
132         end.lrt_index = rec->lrh_index;
133         rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
134         if (rc) {
135                 CERROR("error writing log tail: rc %d\n", rc);
136                 goto out;
137         }
138
139         rc = 0;
140  out:
141         if (saved_off > file->f_pos)
142                 file->f_pos = saved_off;
143         LASSERT(rc <= 0);
144         RETURN(rc);
145 }
146
147 static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
148                                 void *buf, int size, loff_t off)
149 {
150         loff_t offset = off;
151         int rc;
152         ENTRY;
153
154         rc = fsfilt_read_record(obd, file, buf, size, &offset);
155         if (rc) {
156                 CERROR("error reading log record: rc %d\n", rc);
157                 RETURN(rc);
158         }
159         RETURN(0);
160 }
161
162 static int llog_lvfs_read_header(const struct lu_env *env,
163                                  struct llog_handle *handle)
164 {
165         struct obd_device *obd;
166         int rc;
167         ENTRY;
168
169         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
170
171         obd = handle->lgh_ctxt->loc_exp->exp_obd;
172
173         if (i_size_read(handle->lgh_file->f_dentry->d_inode) == 0) {
174                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
175                 RETURN(LLOG_EEMPTY);
176         }
177
178         rc = llog_lvfs_read_blob(obd, handle->lgh_file, handle->lgh_hdr,
179                                  LLOG_CHUNK_SIZE, 0);
180         if (rc) {
181                 CERROR("error reading log header from %.*s\n",
182                        handle->lgh_file->f_dentry->d_name.len,
183                        handle->lgh_file->f_dentry->d_name.name);
184         } else {
185                 struct llog_rec_hdr *llh_hdr = &handle->lgh_hdr->llh_hdr;
186
187                 if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
188                         lustre_swab_llog_hdr(handle->lgh_hdr);
189
190                 if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
191                         CERROR("bad log %.*s header magic: %#x (expected %#x)\n",
192                                handle->lgh_file->f_dentry->d_name.len,
193                                handle->lgh_file->f_dentry->d_name.name,
194                                llh_hdr->lrh_type, LLOG_HDR_MAGIC);
195                         rc = -EIO;
196                 } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
197                         CERROR("incorrectly sized log %.*s header: %#x "
198                                "(expected %#x)\n",
199                                handle->lgh_file->f_dentry->d_name.len,
200                                handle->lgh_file->f_dentry->d_name.name,
201                                llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
202                         CERROR("you may need to re-run lconf --write_conf.\n");
203                         rc = -EIO;
204                 }
205         }
206
207         handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
208         handle->lgh_file->f_pos = i_size_read(handle->lgh_file->f_dentry->d_inode);
209
210         RETURN(rc);
211 }
212
/*
 * Write one record into the log behind \a loghandle.
 *
 * idx == -1 appends the record at file->f_pos (padding the current chunk
 * first if the record would not fit); any other idx rewrites the log header
 * and then, unless idx == 0, overwrites record idx in place.
 *
 * \a rec and \a buf follow the llog_lvfs_write_blob() convention: with
 * buf == NULL, \a rec is a complete record of rec->lrh_len bytes; with
 * buf != NULL, rec->lrh_len is the payload size and header/tail are added.
 *
 * \a env, \a cookiecount and \a th are not used by this implementation.
 *
 * Returns:
 *   negative  on error: -E2BIG (record larger than a chunk), -EINVAL
 *             (length differs from the fixed llh_size), -EFAULT (index does
 *             not match lgh_cur_idx), -ENOSPC (bitmap exhausted), or the
 *             error of the underlying write;
 *   1         on success when \a reccookie was filled in, or -- append path
 *             only -- when the record type is LLOG_GEN_REC;
 *   0         on any other success.
 */
static int llog_lvfs_write_rec(const struct lu_env *env,
                               struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie, int cookiecount,
                               void *buf, int idx, struct thandle *th)
{
        struct llog_log_hdr *llh;
        int reclen = rec->lrh_len, index, rc;
        struct llog_rec_tail *lrt;
        struct obd_device *obd;
        struct file *file;
        size_t left;
        ENTRY;

        llh = loghandle->lgh_hdr;
        file = loghandle->lgh_file;
        obd = loghandle->lgh_ctxt->loc_exp->exp_obd;

        /* the record must not be bigger than LLOG_CHUNK_SIZE; in the buf
         * case lrh_len is payload only, so leave room for header + tail */
        if (buf)
                rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
                      sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
        else
                rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
        if (rc)
                RETURN(rc);

        if (buf)
                /* write_blob adds header and tail to lrh_len. */
                reclen = sizeof(*rec) + rec->lrh_len +
                         sizeof(struct llog_rec_tail);

        /* ---- modify path: overwrite an existing record (or the header) */
        if (idx != -1) {
                loff_t saved_offset;

                /* no header: only allowed to insert record 1 */
                if (idx != 1 && !i_size_read(file->f_dentry->d_inode)) {
                        CERROR("idx != -1 in empty log\n");
                        LBUG();
                }

                /* fixed-size logs only accept records of exactly llh_size */
                if (idx && llh->llh_size && llh->llh_size != rec->lrh_len)
                        RETURN(-EINVAL);

                /* both conditions below are only reported; the write still
                 * goes ahead */
                if (!ext2_test_bit(idx, llh->llh_bitmap))
                        CERROR("Modify unset record %u\n", idx);
                if (idx != rec->lrh_index)
                        CERROR("Index mismatch %d %u\n", idx, rec->lrh_index);

                /* the in-memory header is always rewritten at offset 0 */
                rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
                /* we are done if we only write the header or on error */
                if (rc || idx == 0)
                        RETURN(rc);

                if (buf) {
                        /* We assume that caller has set lgh_cur_* */
                        saved_offset = loghandle->lgh_cur_offset;
                        CDEBUG(D_OTHER,
                               "modify record "LPX64": idx:%d/%u/%d, len:%u "
                               "offset %llu\n",
                               loghandle->lgh_id.lgl_oid, idx, rec->lrh_index,
                               loghandle->lgh_cur_idx, rec->lrh_len,
                               (long long)(saved_offset - sizeof(*llh)));
                        if (rec->lrh_index != loghandle->lgh_cur_idx) {
                                CERROR("modify idx mismatch %u/%d\n",
                                       idx, loghandle->lgh_cur_idx);
                                RETURN(-EFAULT);
                        }
                } else {
                        /* Assumes constant lrh_len: record idx sits right
                         * after the header and (idx - 1) earlier records */
                        saved_offset = sizeof(*llh) + (idx - 1) * reclen;
                }

                rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
                if (rc == 0 && reccookie) {
                        reccookie->lgc_lgl = loghandle->lgh_id;
                        reccookie->lgc_index = idx;
                        rc = 1;
                }
                RETURN(rc);
        }

        /* ---- append path */

        /* Make sure that records don't cross a chunk boundary, so we can
         * process them page-at-a-time if needed.  If it will cross a chunk
         * boundary, write in a fake (but referenced) entry to pad the chunk.
         *
         * We know that llog_current_log() will return a loghandle that is
         * big enough to hold reclen, so all we care about is padding here.
         */
        left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));

        /* NOTE: padding is a record, but no bit is set */
        /* Pad when the record neither fills the remaining space exactly nor
         * leaves at least LLOG_MIN_REC_SIZE bytes for a later pad record.
         * NOTE(review): as computed above, left looks like it is never 0
         * (it is LLOG_CHUNK_SIZE when f_pos is chunk aligned), so the
         * "left != 0" test appears to be always true -- confirm. */
        if (left != 0 && left != reclen &&
            left < (reclen + LLOG_MIN_REC_SIZE)) {
                 index = loghandle->lgh_last_idx + 1;
                 rc = llog_lvfs_pad(obd, file, left, index);
                 if (rc)
                         RETURN(rc);
                 loghandle->lgh_last_idx++; /*for pad rec*/
         }
         /* if it's the last idx in log file, then return -ENOSPC */
         if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1)
                 RETURN(-ENOSPC);
        loghandle->lgh_last_idx++;
        index = loghandle->lgh_last_idx;
        LASSERT(index < LLOG_BITMAP_SIZE(llh));
        rec->lrh_index = index;
        if (buf == NULL) {
                /* complete record supplied by the caller: fill in its tail,
                 * located at the end of the lrh_len bytes */
                lrt = (struct llog_rec_tail *)
                        ((char *)rec + rec->lrh_len - sizeof(*lrt));
                lrt->lrt_len = rec->lrh_len;
                lrt->lrt_index = rec->lrh_index;
        }
        /*The caller should make sure only 1 process access the lgh_last_idx,
         *Otherwise it might hit the assert.*/
        LASSERT(index < LLOG_BITMAP_SIZE(llh));
        /* the bitmap bit and the record count are updated under
         * lgh_hdr_lock */
        cfs_spin_lock(&loghandle->lgh_hdr_lock);
        if (ext2_set_bit(index, llh->llh_bitmap)) {
                CERROR("argh, index %u already set in log bitmap?\n", index);
                cfs_spin_unlock(&loghandle->lgh_hdr_lock);
                LBUG(); /* should never happen */
        }
        llh->llh_count++;
        cfs_spin_unlock(&loghandle->lgh_hdr_lock);
        llh->llh_tail.lrt_index = index;

        /* NOTE(review): the index, bitmap and count changes above are not
         * undone if either write below fails. */
        rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
        if (rc)
                RETURN(rc);

        rc = llog_lvfs_write_blob(obd, file, rec, buf, file->f_pos);
        if (rc)
                RETURN(rc);

        CDEBUG(D_RPCTRACE, "added record "LPX64": idx: %u, %u \n",
               loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
        /* rc is 0 here: both non-zero results returned above */
        if (rc == 0 && reccookie) {
                reccookie->lgc_lgl = loghandle->lgh_id;
                reccookie->lgc_index = index;
                /* the cookie subsystem is derived from the record type */
                if ((rec->lrh_type == MDS_UNLINK_REC) ||
                    (rec->lrh_type == MDS_SETATTR64_REC))
                        reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
                else if (rec->lrh_type == OST_SZ_REC)
                        reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
                else
                        reccookie->lgc_subsys = -1;
                rc = 1;
        }
        if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
                rc = 1;

        RETURN(rc);
}
368
369 /* We can skip reading at least as many log blocks as the number of
370 * minimum sized log records we are skipping.  If it turns out
371 * that we are not far enough along the log (because the
372 * actual records are larger than minimum size) we just skip
373 * some more records. */
374
375 static void llog_skip_over(__u64 *off, int curr, int goal)
376 {
377         if (goal <= curr)
378                 return;
379         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
380                 ~(LLOG_CHUNK_SIZE - 1);
381 }
382
383
384 /* sets:
385  *  - cur_offset to the furthest point read in the log file
386  *  - cur_idx to the log index preceeding cur_offset
387  * returns -EIO/-EINVAL on error
388  */
389 static int llog_lvfs_next_block(const struct lu_env *env,
390                                 struct llog_handle *loghandle, int *cur_idx,
391                                 int next_idx, __u64 *cur_offset, void *buf,
392                                 int len)
393 {
394         int rc;
395         ENTRY;
396
397         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
398                 RETURN(-EINVAL);
399
400         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
401                next_idx, *cur_idx, *cur_offset);
402
403         while (*cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
404                 struct llog_rec_hdr *rec, *last_rec;
405                 struct llog_rec_tail *tail;
406                 loff_t ppos;
407                 int llen;
408
409                 llog_skip_over(cur_offset, *cur_idx, next_idx);
410
411                 /* read up to next LLOG_CHUNK_SIZE block */
412                 ppos = *cur_offset;
413                 llen = LLOG_CHUNK_SIZE - (*cur_offset & (LLOG_CHUNK_SIZE - 1));
414                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
415                                         loghandle->lgh_file, buf, llen,
416                                         cur_offset);
417                 if (rc < 0) {
418                         CERROR("Cant read llog block at log id "LPU64
419                                "/%u offset "LPU64"\n",
420                                loghandle->lgh_id.lgl_oid,
421                                loghandle->lgh_id.lgl_ogen,
422                                *cur_offset);
423                         RETURN(rc);
424                 }
425
426                 /* put number of bytes read into rc to make code simpler */
427                 rc = *cur_offset - ppos;
428                 if (rc < len) {
429                         /* signal the end of the valid buffer to llog_process */
430                         memset(buf + rc, 0, len - rc);
431                 }
432
433                 if (rc == 0) /* end of file, nothing to do */
434                         RETURN(0);
435
436                 if (rc < sizeof(*tail)) {
437                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
438                                LPU64"\n", loghandle->lgh_id.lgl_oid,
439                                loghandle->lgh_id.lgl_ogen, *cur_offset);
440                         RETURN(-EINVAL);
441                 }
442
443                 rec = buf;
444                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
445                         lustre_swab_llog_rec(rec);
446
447                 tail = (struct llog_rec_tail *)(buf + rc -
448                                                 sizeof(struct llog_rec_tail));
449
450                 /* get the last record in block */
451                 last_rec = (struct llog_rec_hdr *)(buf + rc -
452                                                    le32_to_cpu(tail->lrt_len));
453
454                 if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
455                         lustre_swab_llog_rec(last_rec);
456                 LASSERT(last_rec->lrh_index == tail->lrt_index);
457
458                 *cur_idx = tail->lrt_index;
459
460                 /* this shouldn't happen */
461                 if (tail->lrt_index == 0) {
462                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
463                                LPU64"\n", loghandle->lgh_id.lgl_oid,
464                                loghandle->lgh_id.lgl_ogen, *cur_offset);
465                         RETURN(-EINVAL);
466                 }
467                 if (tail->lrt_index < next_idx)
468                         continue;
469
470                 /* sanity check that the start of the new buffer is no farther
471                  * than the record that we wanted.  This shouldn't happen. */
472                 if (rec->lrh_index > next_idx) {
473                         CERROR("missed desired record? %u > %u\n",
474                                rec->lrh_index, next_idx);
475                         RETURN(-ENOENT);
476                 }
477                 RETURN(0);
478         }
479         RETURN(-EIO);
480 }
481
482 static int llog_lvfs_prev_block(const struct lu_env *env,
483                                 struct llog_handle *loghandle,
484                                 int prev_idx, void *buf, int len)
485 {
486         __u64 cur_offset;
487         int rc;
488         ENTRY;
489
490         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
491                 RETURN(-EINVAL);
492
493         CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
494
495         cur_offset = LLOG_CHUNK_SIZE;
496         llog_skip_over(&cur_offset, 0, prev_idx);
497
498         while (cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
499                 struct llog_rec_hdr *rec, *last_rec;
500                 struct llog_rec_tail *tail;
501                 loff_t ppos = cur_offset;
502
503                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
504                                         loghandle->lgh_file, buf, len,
505                                         &cur_offset);
506                 if (rc < 0) {
507                         CERROR("Cant read llog block at log id "LPU64
508                                "/%u offset "LPU64"\n",
509                                loghandle->lgh_id.lgl_oid,
510                                loghandle->lgh_id.lgl_ogen,
511                                cur_offset);
512                         RETURN(rc);
513                 }
514
515                 /* put number of bytes read into rc to make code simpler */
516                 rc = cur_offset - ppos;
517
518                 if (rc == 0) /* end of file, nothing to do */
519                         RETURN(0);
520
521                 if (rc < sizeof(*tail)) {
522                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
523                                LPU64"\n", loghandle->lgh_id.lgl_oid,
524                                loghandle->lgh_id.lgl_ogen, cur_offset);
525                         RETURN(-EINVAL);
526                 }
527
528                 rec = buf;
529                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
530                         lustre_swab_llog_rec(rec);
531
532                 tail = (struct llog_rec_tail *)(buf + rc -
533                                                 sizeof(struct llog_rec_tail));
534
535                 /* get the last record in block */
536                 last_rec = (struct llog_rec_hdr *)(buf + rc -
537                                                    le32_to_cpu(tail->lrt_len));
538
539                 if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
540                         lustre_swab_llog_rec(last_rec);
541                 LASSERT(last_rec->lrh_index == tail->lrt_index);
542
543                 /* this shouldn't happen */
544                 if (tail->lrt_index == 0) {
545                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
546                                LPU64"\n", loghandle->lgh_id.lgl_oid,
547                                loghandle->lgh_id.lgl_ogen, cur_offset);
548                         RETURN(-EINVAL);
549                 }
550                 if (tail->lrt_index < prev_idx)
551                         continue;
552
553                 /* sanity check that the start of the new buffer is no farther
554                  * than the record that we wanted.  This shouldn't happen. */
555                 if (rec->lrh_index > prev_idx) {
556                         CERROR("missed desired record? %u > %u\n",
557                                rec->lrh_index, prev_idx);
558                         RETURN(-ENOENT);
559                 }
560                 RETURN(0);
561         }
562         RETURN(-EIO);
563 }
564
565 static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
566 {
567         char *logname;
568         struct file *filp;
569         int len;
570
571         OBD_ALLOC(logname, PATH_MAX);
572         if (logname == NULL)
573                 return ERR_PTR(-ENOMEM);
574
575         len = snprintf(logname, PATH_MAX, "%s/%s", dir, name);
576         if (len >= PATH_MAX - 1) {
577                 filp = ERR_PTR(-ENAMETOOLONG);
578         } else {
579                 filp = l_filp_open(logname, flags, mode);
580                 if (IS_ERR(filp) && PTR_ERR(filp) != -ENOENT)
581                         CERROR("logfile creation %s: %ld\n", logname,
582                                PTR_ERR(filp));
583         }
584         OBD_FREE(logname, PATH_MAX);
585         return filp;
586 }
587
588 static int llog_lvfs_open(const struct lu_env *env,  struct llog_handle *handle,
589                           struct llog_logid *logid, char *name,
590                           enum llog_open_param open_param)
591 {
592         struct llog_ctxt        *ctxt = handle->lgh_ctxt;
593         struct l_dentry         *dchild = NULL;
594         struct obd_device       *obd;
595         int                      rc = 0;
596
597         ENTRY;
598
599         LASSERT(ctxt);
600         LASSERT(ctxt->loc_exp);
601         LASSERT(ctxt->loc_exp->exp_obd);
602         obd = ctxt->loc_exp->exp_obd;
603
604         LASSERT(handle);
605         if (logid != NULL) {
606                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
607                                              logid->lgl_ogen, logid->lgl_oseq);
608                 if (IS_ERR(dchild)) {
609                         rc = PTR_ERR(dchild);
610                         CERROR("%s: error looking up logfile #"LPX64"#"
611                                LPX64"#%08x: rc = %d\n",
612                                ctxt->loc_obd->obd_name, logid->lgl_oid,
613                                logid->lgl_oseq, logid->lgl_ogen, rc);
614                         GOTO(out, rc);
615                 }
616                 if (dchild->d_inode == NULL) {
617                         l_dput(dchild);
618                         rc = -ENOENT;
619                         CERROR("%s: nonexistent llog #"LPX64"#"LPX64"#%08x: "
620                                "rc = %d\n", ctxt->loc_obd->obd_name,
621                                logid->lgl_oid, logid->lgl_oseq,
622                                logid->lgl_ogen, rc);
623                         GOTO(out, rc);
624                 }
625                 /* l_dentry_open will call dput(dchild) if there is an error */
626                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
627                                                  O_RDWR | O_LARGEFILE);
628                 if (IS_ERR(handle->lgh_file)) {
629                         rc = PTR_ERR(handle->lgh_file);
630                         handle->lgh_file = NULL;
631                         CERROR("%s: error opening llog #"LPX64"#"LPX64"#%08x: "
632                                "rc = %d\n", ctxt->loc_obd->obd_name,
633                                logid->lgl_oid, logid->lgl_oseq,
634                                logid->lgl_ogen, rc);
635                         GOTO(out, rc);
636                 }
637
638                 handle->lgh_id = *logid;
639         } else if (name) {
640                 handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR, name,
641                                                   O_RDWR | O_LARGEFILE, 0644);
642                 if (IS_ERR(handle->lgh_file)) {
643                         rc = PTR_ERR(handle->lgh_file);
644                         handle->lgh_file = NULL;
645                         if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
646                                 OBD_ALLOC(handle->lgh_name, strlen(name) + 1);
647                                 if (handle->lgh_name)
648                                         strcpy(handle->lgh_name, name);
649                                 else
650                                         GOTO(out, rc = -ENOMEM);
651                                 rc = 0;
652                         } else {
653                                 GOTO(out, rc);
654                         }
655                 } else {
656                         handle->lgh_id.lgl_oseq = FID_SEQ_LLOG;
657                         handle->lgh_id.lgl_oid =
658                                 handle->lgh_file->f_dentry->d_inode->i_ino;
659                         handle->lgh_id.lgl_ogen =
660                                 handle->lgh_file->f_dentry->d_inode->i_generation;
661                 }
662         } else {
663                 LASSERTF(open_param == LLOG_OPEN_NEW, "%#x\n", open_param);
664                 handle->lgh_file = NULL;
665         }
666
667         /* No new llog is expected but doesn't exist */
668         if (open_param != LLOG_OPEN_NEW && handle->lgh_file == NULL)
669                 GOTO(out_name, rc = -ENOENT);
670
671         RETURN(0);
672 out_name:
673         if (handle->lgh_name != NULL)
674                 OBD_FREE(handle->lgh_name, strlen(name) + 1);
675 out:
676         RETURN(rc);
677 }
678
679 static int llog_lvfs_exist(struct llog_handle *handle)
680 {
681         return (handle->lgh_file != NULL);
682 }
683
684 /* This is a callback from the llog_* functions.
685  * Assumes caller has already pushed us into the kernel context. */
686 static int llog_lvfs_create(const struct lu_env *env,
687                             struct llog_handle *handle,
688                             struct thandle *th)
689 {
690         struct llog_ctxt        *ctxt = handle->lgh_ctxt;
691         struct obd_device       *obd;
692         struct l_dentry         *dchild = NULL;
693         struct obdo             *oa = NULL;
694         int                      rc = 0;
695         int                      open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
696
697         ENTRY;
698
699         LASSERT(ctxt);
700         LASSERT(ctxt->loc_exp);
701         obd = ctxt->loc_exp->exp_obd;
702         LASSERT(handle->lgh_file == NULL);
703
704         if (handle->lgh_name) {
705                 handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
706                                                   handle->lgh_name,
707                                                   open_flags, 0644);
708                 if (IS_ERR(handle->lgh_file))
709                         RETURN(PTR_ERR(handle->lgh_file));
710
711                 handle->lgh_id.lgl_oseq = FID_SEQ_LLOG;
712                 handle->lgh_id.lgl_oid =
713                         handle->lgh_file->f_dentry->d_inode->i_ino;
714                 handle->lgh_id.lgl_ogen =
715                         handle->lgh_file->f_dentry->d_inode->i_generation;
716         } else {
717                 OBDO_ALLOC(oa);
718                 if (oa == NULL)
719                         RETURN(-ENOMEM);
720
721                 oa->o_seq = FID_SEQ_LLOG;
722                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
723
724                 rc = obd_create(NULL, ctxt->loc_exp, oa, NULL, NULL);
725                 if (rc)
726                         GOTO(out, rc);
727
728                 /* FIXME: rationalize the misuse of o_generation in
729                  *        this API along with mds_obd_{create,destroy}.
730                  *        Hopefully it is only an internal API issue. */
731 #define o_generation o_parent_oid
732                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
733                                              oa->o_generation, oa->o_seq);
734                 if (IS_ERR(dchild))
735                         GOTO(out, rc = PTR_ERR(dchild));
736
737                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
738                                                  open_flags);
739                 if (IS_ERR(handle->lgh_file))
740                         GOTO(out, rc = PTR_ERR(handle->lgh_file));
741
742                 handle->lgh_id.lgl_oseq = oa->o_seq;
743                 handle->lgh_id.lgl_oid = oa->o_id;
744                 handle->lgh_id.lgl_ogen = oa->o_generation;
745 out:
746                 OBDO_FREE(oa);
747         }
748         RETURN(rc);
749 }
750
751 static int llog_lvfs_close(const struct lu_env *env,
752                            struct llog_handle *handle)
753 {
754         int rc;
755
756         ENTRY;
757
758         if (handle->lgh_file == NULL)
759                 RETURN(0);
760         rc = filp_close(handle->lgh_file, 0);
761         if (rc)
762                 CERROR("%s: error closing llog #"LPX64"#"LPX64"#%08x: "
763                        "rc = %d\n", handle->lgh_ctxt->loc_obd->obd_name,
764                        handle->lgh_id.lgl_oid, handle->lgh_id.lgl_oseq,
765                        handle->lgh_id.lgl_ogen, rc);
766         handle->lgh_file = NULL;
767         if (handle->lgh_name) {
768                 OBD_FREE(handle->lgh_name, strlen(handle->lgh_name) + 1);
769                 handle->lgh_name = NULL;
770         }
771         RETURN(rc);
772 }
773
774 static int llog_lvfs_destroy(const struct lu_env *env,
775                              struct llog_handle *handle)
776 {
777         struct dentry *fdentry;
778         struct obdo *oa;
779         struct obd_device *obd = handle->lgh_ctxt->loc_exp->exp_obd;
780         char *dir;
781         void *th;
782         struct inode *inode;
783         int rc, rc1;
784         ENTRY;
785
786         dir = MOUNT_CONFIGS_DIR;
787
788         LASSERT(handle->lgh_file);
789         fdentry = handle->lgh_file->f_dentry;
790         inode = fdentry->d_parent->d_inode;
791         if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) {
792                 struct lvfs_run_ctxt saved;
793                 struct vfsmount *mnt = mntget(handle->lgh_file->f_vfsmnt);
794
795                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
796                 dget(fdentry);
797                 rc = llog_lvfs_close(env, handle);
798                 if (rc == 0) {
799                         mutex_lock_nested(&inode->i_mutex, I_MUTEX_PARENT);
800                         rc = ll_vfs_unlink(inode, fdentry, mnt);
801                         mutex_unlock(&inode->i_mutex);
802                 }
803                 mntput(mnt);
804
805                 dput(fdentry);
806                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
807                 RETURN(rc);
808         }
809
810         OBDO_ALLOC(oa);
811         if (oa == NULL)
812                 RETURN(-ENOMEM);
813
814         oa->o_id = handle->lgh_id.lgl_oid;
815         oa->o_seq = handle->lgh_id.lgl_oseq;
816         oa->o_generation = handle->lgh_id.lgl_ogen;
817 #undef o_generation
818         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
819
820         rc = llog_lvfs_close(env, handle);
821         if (rc)
822                 GOTO(out, rc);
823
824         th = fsfilt_start_log(obd, inode, FSFILT_OP_UNLINK, NULL, 1);
825         if (IS_ERR(th)) {
826                 CERROR("fsfilt_start failed: %ld\n", PTR_ERR(th));
827                 GOTO(out, rc = PTR_ERR(th));
828         }
829
830         rc = obd_destroy(NULL, handle->lgh_ctxt->loc_exp, oa,
831                          NULL, NULL, NULL, NULL);
832
833         rc1 = fsfilt_commit(obd, inode, th, 0);
834         if (rc == 0 && rc1 != 0)
835                 rc = rc1;
836  out:
837         OBDO_FREE(oa);
838         RETURN(rc);
839 }
840
841 /* reads the catalog list */
842 int llog_get_cat_list(struct obd_device *disk_obd,
843                       char *name, int idx, int count, struct llog_catid *idarray)
844 {
845         struct lvfs_run_ctxt saved;
846         struct l_file *file;
847         int rc, rc1 = 0;
848         int size = sizeof(*idarray) * count;
849         loff_t off = idx *  sizeof(*idarray);
850         ENTRY;
851
852         if (!count)
853                 RETURN(0);
854
855         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
856         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
857         if (!file || IS_ERR(file)) {
858                 rc = PTR_ERR(file);
859                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
860                        name, rc);
861                 GOTO(out, rc);
862         }
863
864         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
865                 CERROR("%s is not a regular file!: mode = %o\n", name,
866                        file->f_dentry->d_inode->i_mode);
867                 GOTO(out, rc = -ENOENT);
868         }
869
870         CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
871                (int)i_size_read(file->f_dentry->d_inode), size);
872
873         /* read for new ost index or for empty file */
874         memset(idarray, 0, size);
875         if (i_size_read(file->f_dentry->d_inode) < off)
876                 GOTO(out, rc = 0);
877
878         rc = fsfilt_read_record(disk_obd, file, idarray, size, &off);
879         if (rc) {
880                 CERROR("OBD filter: error reading %s: rc %d\n", name, rc);
881                 GOTO(out, rc);
882         }
883
884         EXIT;
885  out:
886         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
887         if (file && !IS_ERR(file))
888                 rc1 = filp_close(file, 0);
889         if (rc == 0)
890                 rc = rc1;
891         return rc;
892 }
893 EXPORT_SYMBOL(llog_get_cat_list);
894
895 /* writes the cat list */
896 int llog_put_cat_list(struct obd_device *disk_obd,
897                       char *name, int idx, int count, struct llog_catid *idarray)
898 {
899         struct lvfs_run_ctxt saved;
900         struct l_file *file;
901         int rc, rc1 = 0;
902         int size = sizeof(*idarray) * count;
903         loff_t off = idx * sizeof(*idarray);
904
905         if (!count)
906                 GOTO(out1, rc = 0);
907
908         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
909         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
910         if (!file || IS_ERR(file)) {
911                 rc = PTR_ERR(file);
912                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
913                        name, rc);
914                 GOTO(out, rc);
915         }
916
917         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
918                 CERROR("%s is not a regular file!: mode = %o\n", name,
919                        file->f_dentry->d_inode->i_mode);
920                 GOTO(out, rc = -ENOENT);
921         }
922
923         rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1);
924         if (rc) {
925                 CDEBUG(D_INODE,"OBD filter: error writeing %s: rc %d\n",
926                        name, rc);
927                 GOTO(out, rc);
928         }
929
930 out:
931         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
932         if (file && !IS_ERR(file))
933                 rc1 = filp_close(file, 0);
934
935         if (rc == 0)
936                 rc = rc1;
937 out1:
938         RETURN(rc);
939 }
940 EXPORT_SYMBOL(llog_put_cat_list);
941
/*
 * lop_declare_create method of llog_lvfs_ops.
 *
 * Performs no work and ignores all of its arguments.
 *
 * Returns 0 unconditionally.
 */
static int llog_lvfs_declare_create(const struct lu_env *env,
                                    struct llog_handle *res,
                                    struct thandle *th)
{
        /* nothing is declared here */
        return 0;
}
948
/*
 * lop_declare_write_rec method of llog_lvfs_ops.
 *
 * Performs no work and ignores all of its arguments.
 *
 * Returns 0 unconditionally.
 */
static int llog_lvfs_declare_write_rec(const struct lu_env *env,
                                       struct llog_handle *loghandle,
                                       struct llog_rec_hdr *rec,
                                       int idx, struct thandle *th)
{
        /* nothing is declared here */
        return 0;
}
956
957 struct llog_operations llog_lvfs_ops = {
958         .lop_write_rec          = llog_lvfs_write_rec,
959         .lop_next_block         = llog_lvfs_next_block,
960         .lop_prev_block         = llog_lvfs_prev_block,
961         .lop_read_header        = llog_lvfs_read_header,
962         .lop_create             = llog_lvfs_create,
963         .lop_destroy            = llog_lvfs_destroy,
964         .lop_close              = llog_lvfs_close,
965         .lop_open               = llog_lvfs_open,
966         .lop_exist              = llog_lvfs_exist,
967         .lop_declare_create     = llog_lvfs_declare_create,
968         .lop_declare_write_rec  = llog_lvfs_declare_write_rec,
969 };
970 EXPORT_SYMBOL(llog_lvfs_ops);
971 #else /* !__KERNEL__ */
/*
 * Stub of llog_get_cat_list() for the !__KERNEL__ build: reading the
 * catalog list is not implemented in this branch, so any call trips
 * LBUG().  The arguments are ignored.
 */
int llog_get_cat_list(struct obd_device *disk_obd,
                      char *name, int idx, int count,
                      struct llog_catid *idarray)
{
        LBUG();

        /* fixed return value after LBUG() */
        return 0;
}
979
/*
 * Stub of llog_put_cat_list() for the !__KERNEL__ build: writing the
 * catalog list is not implemented in this branch, so any call trips
 * LBUG().  The arguments are ignored.
 */
int llog_put_cat_list(struct obd_device *disk_obd,
                      char *name, int idx, int count,
                      struct llog_catid *idarray)
{
        LBUG();

        /* fixed return value after LBUG() */
        return 0;
}
987
988 struct llog_operations llog_lvfs_ops = {};
989 #endif