Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdclass / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  * OST<->MDS recovery logging infrastructure.
26  *
27  * Invariants in implementation:
28  * - we do not share logs among different OST<->MDS connections, so that
29  *   if an OST or MDS fails it need only look at log(s) relevant to itself
30  */
31
32 #define DEBUG_SUBSYSTEM S_LOG
33
34 #ifndef EXPORT_SYMTAB
35 #define EXPORT_SYMTAB
36 #endif
37
38 #ifndef __KERNEL__
39 #include <liblustre.h>
40 #endif
41
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <obd_ost.h>
46 #include <libcfs/list.h>
47 #include <lvfs.h>
48 #include <lustre_fsfilt.h>
49 #include <lustre_disk.h>
50 #include "llog_internal.h"
51
52 #if defined(__KERNEL__) && defined(LLOG_LVFS)
53
54 static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
55                                 int len, int index)
56 {
57         struct llog_rec_hdr rec = { 0 };
58         struct llog_rec_tail tail;
59         int rc;
60         ENTRY;
61
62         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
63
64         tail.lrt_len = rec.lrh_len = len;
65         tail.lrt_index = rec.lrh_index = index;
66         rec.lrh_type = LLOG_PAD_MAGIC;
67
68         rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
69         if (rc) {
70                 CERROR("error writing padding record: rc %d\n", rc);
71                 goto out;
72         }
73
74         file->f_pos += len - sizeof(rec) - sizeof(tail);
75         rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0);
76         if (rc) {
77                 CERROR("error writing padding record: rc %d\n", rc);
78                 goto out;
79         }
80
81  out:
82         RETURN(rc);
83 }
84
85 static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
86                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
87 {
88         int rc;
89         struct llog_rec_tail end;
90         loff_t saved_off = file->f_pos;
91         int buflen = rec->lrh_len;
92
93         ENTRY;
94
95         file->f_pos = off;
96
97         if (buflen == 0) 
98                 CWARN("0-length record\n");
99
100         if (!buf) {
101                 rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
102                 if (rc) {
103                         CERROR("error writing log record: rc %d\n", rc);
104                         goto out;
105                 }
106                 GOTO(out, rc = 0);
107         }
108
109         /* the buf case */
110         rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
111         rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
112         if (rc) {
113                 CERROR("error writing log hdr: rc %d\n", rc);
114                 goto out;
115         }
116
117         rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
118         if (rc) {
119                 CERROR("error writing log buffer: rc %d\n", rc);
120                 goto out;
121         }
122
123         end.lrt_len = rec->lrh_len;
124         end.lrt_index = rec->lrh_index;
125         rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
126         if (rc) {
127                 CERROR("error writing log tail: rc %d\n", rc);
128                 goto out;
129         }
130
131         rc = 0;
132  out:
133         if (saved_off > file->f_pos)
134                 file->f_pos = saved_off;
135         LASSERT(rc <= 0);
136         RETURN(rc);
137 }
138
139 static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
140                                 void *buf, int size, loff_t off)
141 {
142         loff_t offset = off;
143         int rc;
144         ENTRY;
145
146         rc = fsfilt_read_record(obd, file, buf, size, &offset);
147         if (rc) {
148                 CERROR("error reading log record: rc %d\n", rc);
149                 RETURN(rc);
150         }
151         RETURN(0);
152 }
153
154 static int llog_lvfs_read_header(struct llog_handle *handle)
155 {
156         struct obd_device *obd;
157         int rc;
158         ENTRY;
159
160         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
161
162         obd = handle->lgh_ctxt->loc_exp->exp_obd;
163
164         if (i_size_read(handle->lgh_file->f_dentry->d_inode) == 0) {
165                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
166                 RETURN(LLOG_EEMPTY);
167         }
168
169         rc = llog_lvfs_read_blob(obd, handle->lgh_file, handle->lgh_hdr,
170                                  LLOG_CHUNK_SIZE, 0);
171         if (rc) {
172                 CERROR("error reading log header from %.*s\n",
173                        handle->lgh_file->f_dentry->d_name.len,
174                        handle->lgh_file->f_dentry->d_name.name);
175         } else {
176                 struct llog_rec_hdr *llh_hdr = &handle->lgh_hdr->llh_hdr;
177
178                 if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
179                         lustre_swab_llog_hdr(handle->lgh_hdr);
180
181                 if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
182                         CERROR("bad log %.*s header magic: %#x (expected %#x)\n",
183                                handle->lgh_file->f_dentry->d_name.len,
184                                handle->lgh_file->f_dentry->d_name.name,
185                                llh_hdr->lrh_type, LLOG_HDR_MAGIC);
186                         rc = -EIO;
187                 } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
188                         CERROR("incorrectly sized log %.*s header: %#x "
189                                "(expected %#x)\n",
190                                handle->lgh_file->f_dentry->d_name.len,
191                                handle->lgh_file->f_dentry->d_name.name,
192                                llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
193                         CERROR("you may need to re-run lconf --write_conf.\n");
194                         rc = -EIO;
195                 }
196         }
197
198         handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
199         handle->lgh_file->f_pos = i_size_read(handle->lgh_file->f_dentry->d_inode);
200
201         RETURN(rc);
202 }
203
204 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
205 /* appends if idx == -1, otherwise overwrites record idx. */
206 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
207                                struct llog_rec_hdr *rec,
208                                struct llog_cookie *reccookie, int cookiecount,
209                                void *buf, int idx)
210 {
211         struct llog_log_hdr *llh;
212         int reclen = rec->lrh_len, index, rc;
213         struct llog_rec_tail *lrt;
214         struct obd_device *obd;
215         struct file *file;
216         size_t left;
217         ENTRY;
218
219         llh = loghandle->lgh_hdr;
220         file = loghandle->lgh_file;
221         obd = loghandle->lgh_ctxt->loc_exp->exp_obd;
222
223         /* record length should not bigger than LLOG_CHUNK_SIZE */
224         if (buf)
225                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
226                       sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
227         else
228                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
229         if (rc)
230                 RETURN(rc);
231
232         if (buf)
233                 /* write_blob adds header and tail to lrh_len. */ 
234                 reclen = sizeof(*rec) + rec->lrh_len + 
235                          sizeof(struct llog_rec_tail);
236
237         if (idx != -1) {
238                 loff_t saved_offset;
239
240                 /* no header: only allowed to insert record 1 */
241                 if (idx != 1 && !i_size_read(file->f_dentry->d_inode)) {
242                         CERROR("idx != -1 in empty log\n");
243                         LBUG();
244                 }
245
246                 if (idx && llh->llh_size && llh->llh_size != rec->lrh_len)
247                         RETURN(-EINVAL);
248
249                 if (!ext2_test_bit(idx, llh->llh_bitmap)) 
250                         CERROR("Modify unset record %u\n", idx);
251                 if (idx != rec->lrh_index)
252                         CERROR("Index mismatch %d %u\n", idx, rec->lrh_index);
253
254                 rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
255                 /* we are done if we only write the header or on error */
256                 if (rc || idx == 0)
257                         RETURN(rc);
258
259                 /* Assumes constant lrh_len */
260                 saved_offset = sizeof(*llh) + (idx - 1) * reclen;
261
262                 if (buf) {
263                         struct llog_rec_hdr check;
264
265                         /* We assume that caller has set lgh_cur_* */
266                         saved_offset = loghandle->lgh_cur_offset;
267                         CDEBUG(D_OTHER,
268                                "modify record "LPX64": idx:%d/%u/%d, len:%u "
269                                "offset %llu\n",
270                                loghandle->lgh_id.lgl_oid, idx, rec->lrh_index,
271                                loghandle->lgh_cur_idx, rec->lrh_len,
272                                (long long)(saved_offset - sizeof(*llh)));
273                         if (rec->lrh_index != loghandle->lgh_cur_idx) {
274                                 CERROR("modify idx mismatch %u/%d\n",
275                                        idx, loghandle->lgh_cur_idx);
276                                 RETURN(-EFAULT);
277                         }
278 #if 1  /* FIXME remove this safety check at some point */
279                         /* Verify that the record we're modifying is the 
280                            right one. */
281                         rc = llog_lvfs_read_blob(obd, file, &check,
282                                                  sizeof(check), saved_offset);
283                         if (check.lrh_index != idx || check.lrh_len != reclen) {
284                                 CERROR("Bad modify idx %u/%u size %u/%u (%d)\n",
285                                        idx, check.lrh_index, reclen, 
286                                        check.lrh_len, rc);
287                                 RETURN(-EFAULT);
288                         }
289 #endif
290                 }
291
292                 rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
293                 if (rc == 0 && reccookie) {
294                         reccookie->lgc_lgl = loghandle->lgh_id;
295                         reccookie->lgc_index = idx;
296                         rc = 1;
297                 }
298                 RETURN(rc);
299         }
300
301         /* Make sure that records don't cross a chunk boundary, so we can
302          * process them page-at-a-time if needed.  If it will cross a chunk
303          * boundary, write in a fake (but referenced) entry to pad the chunk.
304          *
305          * We know that llog_current_log() will return a loghandle that is
306          * big enough to hold reclen, so all we care about is padding here.
307          */
308         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
309
310         /* NOTE: padding is a record, but no bit is set */
311         if (left != 0 && left != reclen &&
312             left < (reclen + LLOG_MIN_REC_SIZE)) {
313                 loghandle->lgh_last_idx++;
314                 rc = llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx);
315                 if (rc)
316                         RETURN(rc);
317                 /* if it's the last idx in log file, then return -ENOSPC */
318                 if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
319                         RETURN(-ENOSPC);
320         }
321
322         loghandle->lgh_last_idx++;
323         index = loghandle->lgh_last_idx;
324         LASSERT(index < LLOG_BITMAP_SIZE(llh));
325         rec->lrh_index = index;
326         if (buf == NULL) {
327                 lrt = (struct llog_rec_tail *)
328                         ((char *)rec + rec->lrh_len - sizeof(*lrt));
329                 lrt->lrt_len = rec->lrh_len;
330                 lrt->lrt_index = rec->lrh_index;
331         }
332         if (ext2_set_bit(index, llh->llh_bitmap)) {
333                 CERROR("argh, index %u already set in log bitmap?\n", index);
334                 LBUG(); /* should never happen */
335         }
336         llh->llh_count++;
337         llh->llh_tail.lrt_index = index;
338
339         rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
340         if (rc)
341                 RETURN(rc);
342
343         rc = llog_lvfs_write_blob(obd, file, rec, buf, file->f_pos);
344         if (rc)
345                 RETURN(rc);
346
347         CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
348                loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
349         if (rc == 0 && reccookie) {
350                 reccookie->lgc_lgl = loghandle->lgh_id;
351                 reccookie->lgc_index = index;
352                 if ((rec->lrh_type == MDS_UNLINK_REC) || 
353                                 (rec->lrh_type == MDS_SETATTR_REC))
354                         reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
355                 else if (rec->lrh_type == OST_SZ_REC)
356                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
357                 else if (rec->lrh_type == OST_RAID1_REC)
358                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
359                 else
360                         reccookie->lgc_subsys = -1;
361                 rc = 1;
362         }
363         if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
364                 rc = 1;
365
366         RETURN(rc);
367 }
368
369 /* We can skip reading at least as many log blocks as the number of
370 * minimum sized log records we are skipping.  If it turns out
371 * that we are not far enough along the log (because the
372 * actual records are larger than minimum size) we just skip
373 * some more records. */
374
375 static void llog_skip_over(__u64 *off, int curr, int goal)
376 {
377         if (goal <= curr)
378                 return;
379         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
380                 ~(LLOG_CHUNK_SIZE - 1);
381 }
382
383
384 /* sets:
385  *  - cur_offset to the furthest point read in the log file
386  *  - cur_idx to the log index preceeding cur_offset
387  * returns -EIO/-EINVAL on error
388  */
389 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
390                                 int next_idx, __u64 *cur_offset, void *buf,
391                                 int len)
392 {
393         int rc;
394         ENTRY;
395
396         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
397                 RETURN(-EINVAL);
398
399         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
400                next_idx, *cur_idx, *cur_offset);
401
402         while (*cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
403                 struct llog_rec_hdr *rec;
404                 struct llog_rec_tail *tail;
405                 loff_t ppos;
406
407                 llog_skip_over(cur_offset, *cur_idx, next_idx);
408
409                 ppos = *cur_offset;
410                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
411                                         loghandle->lgh_file, buf, len,
412                                         &ppos);
413                 if (rc) {
414                         CERROR("Cant read llog block at log id "LPU64
415                                "/%u offset "LPU64"\n",
416                                loghandle->lgh_id.lgl_oid,
417                                loghandle->lgh_id.lgl_ogen,
418                                *cur_offset);
419                         RETURN(rc);
420                 }
421
422                 /* put number of bytes read into rc to make code simpler */
423                 rc = ppos - *cur_offset;
424                 *cur_offset = ppos;
425
426                 if (rc < len) {
427                         /* signal the end of the valid buffer to llog_process */
428                         memset(buf + rc, 0, len - rc);
429                 }
430
431                 if (rc == 0) /* end of file, nothing to do */
432                         RETURN(0);
433
434                 if (rc < sizeof(*tail)) {
435                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
436                                LPU64"\n", loghandle->lgh_id.lgl_oid,
437                                loghandle->lgh_id.lgl_ogen, *cur_offset);
438                         RETURN(-EINVAL);
439                 }
440
441                 rec = buf;
442                 tail = (struct llog_rec_tail *)((char *)buf + rc -
443                                                 sizeof(struct llog_rec_tail));
444
445                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) {
446                         lustre_swab_llog_rec(rec, tail);
447                 }
448
449                 *cur_idx = tail->lrt_index;
450
451                 /* this shouldn't happen */
452                 if (tail->lrt_index == 0) {
453                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
454                                LPU64"\n", loghandle->lgh_id.lgl_oid,
455                                loghandle->lgh_id.lgl_ogen, *cur_offset);
456                         RETURN(-EINVAL);
457                 }
458                 if (tail->lrt_index < next_idx)
459                         continue;
460
461                 /* sanity check that the start of the new buffer is no farther
462                  * than the record that we wanted.  This shouldn't happen. */
463                 if (rec->lrh_index > next_idx) {
464                         CERROR("missed desired record? %u > %u\n",
465                                rec->lrh_index, next_idx);
466                         RETURN(-ENOENT);
467                 }
468                 RETURN(0);
469         }
470         RETURN(-EIO);
471 }
472
473 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
474                                 int prev_idx, void *buf, int len)
475 {
476         __u64 cur_offset;
477         int rc;
478         ENTRY;
479
480         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
481                 RETURN(-EINVAL);
482
483         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
484
485         cur_offset = LLOG_CHUNK_SIZE;
486         llog_skip_over(&cur_offset, 0, prev_idx);
487
488         while (cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
489                 struct llog_rec_hdr *rec;
490                 struct llog_rec_tail *tail;
491                 loff_t ppos;
492
493                 ppos = cur_offset;
494
495                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
496                                         loghandle->lgh_file, buf, len,
497                                         &ppos);
498                 if (rc) {
499                         CERROR("Cant read llog block at log id "LPU64
500                                "/%u offset "LPU64"\n",
501                                loghandle->lgh_id.lgl_oid,
502                                loghandle->lgh_id.lgl_ogen,
503                                cur_offset);
504                         RETURN(rc);
505                 }
506
507                 /* put number of bytes read into rc to make code simpler */
508                 rc = ppos - cur_offset;
509                 cur_offset = ppos;
510
511                 if (rc == 0) /* end of file, nothing to do */
512                         RETURN(0);
513
514                 if (rc < sizeof(*tail)) {
515                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
516                                LPU64"\n", loghandle->lgh_id.lgl_oid,
517                                loghandle->lgh_id.lgl_ogen, cur_offset);
518                         RETURN(-EINVAL);
519                 }
520
521                 tail = buf + rc - sizeof(struct llog_rec_tail);
522
523                 /* this shouldn't happen */
524                 if (tail->lrt_index == 0) {
525                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
526                                LPU64"\n", loghandle->lgh_id.lgl_oid,
527                                loghandle->lgh_id.lgl_ogen, cur_offset);
528                         RETURN(-EINVAL);
529                 }
530                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
531                         continue;
532
533                 /* sanity check that the start of the new buffer is no farther
534                  * than the record that we wanted.  This shouldn't happen. */
535                 rec = buf;
536                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
537                         CERROR("missed desired record? %u > %u\n",
538                                le32_to_cpu(rec->lrh_index), prev_idx);
539                         RETURN(-ENOENT);
540                 }
541                 RETURN(0);
542         }
543         RETURN(-EIO);
544 }
545
546 static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
547 {
548         char *logname;
549         struct file *filp;
550         int len;
551
552         OBD_ALLOC(logname, PATH_MAX);
553         if (logname == NULL)
554                 return ERR_PTR(-ENOMEM);
555
556         len = snprintf(logname, PATH_MAX, "%s/%s", dir, name);
557         if (len >= PATH_MAX - 1) {
558                 filp = ERR_PTR(-ENAMETOOLONG);
559         } else {
560                 filp = l_filp_open(logname, flags, mode);
561                 if (IS_ERR(filp))
562                         CERROR("logfile creation %s: %ld\n", logname,
563                                PTR_ERR(filp));
564         }
565         OBD_FREE(logname, PATH_MAX);
566         return filp;
567 }
568
569 /* This is a callback from the llog_* functions.
570  * Assumes caller has already pushed us into the kernel context. */
571 static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
572                             struct llog_logid *logid, char *name)
573 {
574         struct llog_handle *handle;
575         struct obd_device *obd;
576         struct l_dentry *dchild = NULL;
577         struct obdo *oa = NULL;
578         int rc = 0, cleanup_phase = 1;
579         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
580         ENTRY;
581
582         handle = llog_alloc_handle();
583         if (handle == NULL)
584                 RETURN(-ENOMEM);
585         *res = handle;
586
587         LASSERT(ctxt);
588         LASSERT(ctxt->loc_exp);
589         obd = ctxt->loc_exp->exp_obd;
590
591         if (logid != NULL) {
592                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
593                                              logid->lgl_ogen, logid->lgl_ogr);
594
595                 if (IS_ERR(dchild)) {
596                         rc = PTR_ERR(dchild);
597                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
598                                logid->lgl_oid, logid->lgl_ogen, rc);
599                         GOTO(cleanup, rc);
600                 }
601
602                 cleanup_phase = 2;
603                 if (dchild->d_inode == NULL) {
604                         rc = -ENOENT;
605                         CERROR("nonexistent log file "LPX64":"LPX64": rc %d\n",
606                                logid->lgl_oid, logid->lgl_ogr, rc);
607                         GOTO(cleanup, rc);
608                 }
609
610                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
611                                                     O_RDWR | O_LARGEFILE);
612                 if (IS_ERR(handle->lgh_file)) {
613                         rc = PTR_ERR(handle->lgh_file);
614                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
615                                logid->lgl_oid, logid->lgl_ogen, rc);
616                         GOTO(cleanup, rc);
617                 }
618
619                 /* assign the value of lgh_id for handle directly */
620                 handle->lgh_id = *logid;
621
622         } else if (name) {
623                 /* COMPAT_146 */
624                 if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME) == 0) {
625                         handle->lgh_file = llog_filp_open(MDT_LOGS_DIR, name, 
626                                                           open_flags, 0644);
627                 } else {
628                         /* end COMPAT_146 */
629                         handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
630                                                           name, open_flags, 
631                                                           0644);
632                 }
633                 if (IS_ERR(handle->lgh_file))
634                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
635
636                 handle->lgh_id.lgl_ogr = 1;
637                 handle->lgh_id.lgl_oid =
638                         handle->lgh_file->f_dentry->d_inode->i_ino;
639                 handle->lgh_id.lgl_ogen =
640                         handle->lgh_file->f_dentry->d_inode->i_generation;
641         } else {
642                 OBDO_ALLOC(oa);
643                 if (oa == NULL)
644                         GOTO(cleanup, rc = -ENOMEM);
645
646                 oa->o_gr = FILTER_GROUP_LLOG;
647                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
648
649                 rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
650                 if (rc)
651                         GOTO(cleanup, rc);
652
653                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
654                                              oa->o_generation, oa->o_gr);
655
656                 if (IS_ERR(dchild))
657                         GOTO(cleanup, rc = PTR_ERR(dchild));
658                 cleanup_phase = 2;
659                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
660                                                  open_flags);
661                 if (IS_ERR(handle->lgh_file))
662                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
663
664                 handle->lgh_id.lgl_ogr = oa->o_gr;
665                 handle->lgh_id.lgl_oid = oa->o_id;
666                 handle->lgh_id.lgl_ogen = oa->o_generation;
667         }
668
669         handle->lgh_ctxt = ctxt;
670  finish:
671         if (oa)
672                 OBDO_FREE(oa);
673         RETURN(rc);
674 cleanup:
675         switch (cleanup_phase) {
676         case 2:
677                 l_dput(dchild);
678         case 1:
679                 llog_free_handle(handle);
680         }
681         goto finish;
682 }
683
684 static int llog_lvfs_close(struct llog_handle *handle)
685 {
686         int rc;
687         ENTRY;
688
689         rc = filp_close(handle->lgh_file, 0);
690         if (rc)
691                 CERROR("error closing log: rc %d\n", rc);
692         RETURN(rc);
693 }
694
695 static int llog_lvfs_destroy(struct llog_handle *handle)
696 {
697         struct dentry *fdentry;
698         struct obdo *oa;
699         struct obd_device *obd = handle->lgh_ctxt->loc_exp->exp_obd;
700         char *dir;
701         int rc;
702         ENTRY;
703
704         /* COMPAT_146 */
705         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME) == 0)
706                 dir = MDT_LOGS_DIR;
707         else
708                 /* end COMPAT_146 */
709                 dir = MOUNT_CONFIGS_DIR;
710
711         fdentry = handle->lgh_file->f_dentry;
712         if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) {
713                 struct inode *inode = fdentry->d_parent->d_inode;
714                 struct lvfs_run_ctxt saved;
715
716                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
717                 dget(fdentry);
718                 rc = llog_lvfs_close(handle);
719
720                 if (rc == 0) {
721                         LOCK_INODE_MUTEX(inode);
722                         rc = vfs_unlink(inode, fdentry);
723                         UNLOCK_INODE_MUTEX(inode);
724                 }
725
726                 dput(fdentry);
727                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
728                 RETURN(rc);
729         }
730
731         OBDO_ALLOC(oa);
732         if (oa == NULL)
733                 RETURN(-ENOMEM);
734
735         oa->o_id = handle->lgh_id.lgl_oid;
736         oa->o_gr = handle->lgh_id.lgl_ogr;
737         oa->o_generation = handle->lgh_id.lgl_ogen;
738         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
739
740         rc = llog_lvfs_close(handle);
741         if (rc)
742                 GOTO(out, rc);
743
744         rc = obd_destroy(handle->lgh_ctxt->loc_exp, oa, NULL, NULL, NULL);
745  out:
746         OBDO_FREE(oa);
747         RETURN(rc);
748 }
749
750 /* reads the catalog list */
751 int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
752                       char *name, int count, struct llog_catid *idarray)
753 {
754         struct lvfs_run_ctxt saved;
755         struct l_file *file;
756         int rc;
757         int size = sizeof(*idarray) * count;
758         loff_t off = 0;
759         ENTRY;
760
761         if (!count) 
762                 RETURN(0);
763
764         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
765         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
766         if (!file || IS_ERR(file)) {
767                 rc = PTR_ERR(file);
768                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
769                        name, rc);
770                 GOTO(out, rc);
771         }
772         
773         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
774                 CERROR("%s is not a regular file!: mode = %o\n", name,
775                        file->f_dentry->d_inode->i_mode);
776                 GOTO(out, rc = -ENOENT);
777         }
778
779         CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
780                (int)i_size_read(file->f_dentry->d_inode), size);
781
782         rc = fsfilt_read_record(disk_obd, file, idarray, size, &off);
783         if (rc) {
784                 CERROR("OBD filter: error reading %s: rc %d\n", name, rc);
785                 GOTO(out, rc);
786         }
787
788         EXIT;
789  out:
790         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
791         if (file && !IS_ERR(file))
792                 rc = filp_close(file, 0);
793         return rc;
794 }
795 EXPORT_SYMBOL(llog_get_cat_list);
796
797 /* writes the cat list */
798 int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
799                       char *name, int count, struct llog_catid *idarray)
800 {
801         struct lvfs_run_ctxt saved;
802         struct l_file *file;
803         int rc;
804         int size = sizeof(*idarray) * count;
805         loff_t off = 0;
806
807         if (!count) 
808                 return (0);
809
810         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
811         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
812         if (!file || IS_ERR(file)) {
813                 rc = PTR_ERR(file);
814                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
815                        name, rc);
816                 GOTO(out, rc);
817         }
818
819         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
820                 CERROR("%s is not a regular file!: mode = %o\n", name,
821                        file->f_dentry->d_inode->i_mode);
822                 GOTO(out, rc = -ENOENT);
823         }
824
825         rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1);
826         if (rc) {
827                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
828                        name, rc);
829                 GOTO(out, rc);
830         }
831
832  out:
833         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
834         if (file && !IS_ERR(file))
835                 rc = filp_close(file, 0);
836         RETURN(rc);
837 }
838
839 struct llog_operations llog_lvfs_ops = {
840         lop_write_rec:   llog_lvfs_write_rec,
841         lop_next_block:  llog_lvfs_next_block,
842         lop_prev_block:  llog_lvfs_prev_block,
843         lop_read_header: llog_lvfs_read_header,
844         lop_create:      llog_lvfs_create,
845         lop_destroy:     llog_lvfs_destroy,
846         lop_close:       llog_lvfs_close,
847         //        lop_cancel: llog_lvfs_cancel,
848 };
849
850 EXPORT_SYMBOL(llog_lvfs_ops);
851
#else /* !__KERNEL__ || !LLOG_LVFS */
853
/* Stub for builds without the lvfs llog backend: reading a log header is
 * not implemented here, so any call hits LBUG().  The 0 returned afterwards
 * only satisfies the prototype. */
static int llog_lvfs_read_header(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
859
/* Stub for builds without the lvfs llog backend: writing a record is not
 * implemented here, so any call hits LBUG().  All arguments are ignored. */
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie,
                               int cookiecount, void *buf, int idx)
{
        LBUG();
        return 0;
}
868
869 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
870                                 int next_idx, __u64 *cur_offset, void *buf,
871                                 int len)
872 {
873         LBUG();
874         return 0;
875 }
876
/* Stub for builds without the lvfs llog backend: fetching the previous
 * block is not implemented here, so any call hits LBUG().  All arguments
 * are ignored. */
static int llog_lvfs_prev_block(struct llog_handle *loghandle, int prev_idx,
                                void *buf, int len)
{
        LBUG();
        return 0;
}
883
/* Stub for builds without the lvfs llog backend: creating a log is not
 * implemented here, so any call hits LBUG().  *res is never written. */
static int llog_lvfs_create(struct llog_ctxt *ctxt,
                            struct llog_handle **res,
                            struct llog_logid *logid, char *name)
{
        LBUG();
        return 0;
}
890
/* Stub for builds without the lvfs llog backend: closing a log is not
 * implemented here, so any call hits LBUG().  The handle is left untouched. */
static int llog_lvfs_close(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
896
/* Stub for builds without the lvfs llog backend: destroying a log is not
 * implemented here, so any call hits LBUG().  The handle is left untouched. */
static int llog_lvfs_destroy(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
902
/* Stub for builds without the lvfs llog backend: the catalog list cannot be
 * read here, so any call hits LBUG().  idarray is never filled in. */
int llog_get_cat_list(struct obd_device *obd,
                      struct obd_device *disk_obd, char *name,
                      int count, struct llog_catid *idarray)
{
        LBUG();
        return 0;
}
909
/* Stub for builds without the lvfs llog backend: the catalog list cannot be
 * written here, so any call hits LBUG().  Nothing is stored. */
int llog_put_cat_list(struct obd_device *obd,
                      struct obd_device *disk_obd, char *name,
                      int count, struct llog_catid *idarray)
{
        LBUG();
        return 0;
}
916
917 struct llog_operations llog_lvfs_ops = {
918         lop_write_rec:   llog_lvfs_write_rec,
919         lop_next_block:  llog_lvfs_next_block,
920         lop_prev_block:  llog_lvfs_prev_block,
921         lop_read_header: llog_lvfs_read_header,
922         lop_create:      llog_lvfs_create,
923         lop_destroy:     llog_lvfs_destroy,
924         lop_close:       llog_lvfs_close,
925 //        lop_cancel:      llog_lvfs_cancel,
926 };
927 #endif