Whamcloud - gitweb
Land b_smallfix onto HEAD (20040512_1806)
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lvfs.h>
42 #include <linux/lustre_fsfilt.h>
43 #include <linux/lustre_log.h>
44
45 #ifdef __KERNEL__
46
47 static int llog_lvfs_pad(struct llog_ctxt *ctxt, struct l_file *file,
48                          int len, int index)
49 {
50         struct llog_rec_hdr rec;
51         struct llog_rec_tail tail;
52         int rc;
53         ENTRY;
54
55         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
56
57         tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
58         tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
59         rec.lrh_type = 0;
60
61         rc = llog_fsfilt_write_record(ctxt, file, &rec, sizeof(rec),
62                                       &file->f_pos, 0);
63         if (rc) {
64                 CERROR("error writing padding record: rc %d\n", rc);
65                 goto out;
66         }
67
68         file->f_pos += len - sizeof(rec) - sizeof(tail);
69         rc = llog_fsfilt_write_record(ctxt, file, &tail, sizeof(tail),
70                                       &file->f_pos, 0);
71         if (rc) {
72                 CERROR("error writing padding record: rc %d\n", rc);
73                 goto out;
74         }
75
76  out:
77         RETURN(rc);
78 }
79
80 static int llog_lvfs_write_blob(struct llog_ctxt *ctxt, struct l_file *file,
81                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
82 {
83         int rc;
84         struct llog_rec_tail end;
85         loff_t saved_off = file->f_pos;
86         int buflen = le32_to_cpu(rec->lrh_len);
87
88         ENTRY;
89         file->f_pos = off;
90
91         if (!buf) {
92                 rc = llog_fsfilt_write_record(ctxt, file, rec, buflen,
93                                               &file->f_pos, 0);
94                 if (rc) {
95                         CERROR("error writing log record: rc %d\n", rc);
96                         goto out;
97                 }
98                 GOTO(out, rc = 0);
99         }
100
101         /* the buf case */
102         rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
103         rc = llog_fsfilt_write_record(ctxt, file, rec, sizeof(*rec),
104                                       &file->f_pos, 0);
105         if (rc) {
106                 CERROR("error writing log hdr: rc %d\n", rc);
107                 goto out;
108         }
109
110         rc = llog_fsfilt_write_record(ctxt, file, buf, buflen,
111                                       &file->f_pos, 0);
112         if (rc) {
113                 CERROR("error writing log buffer: rc %d\n", rc);
114                 goto out;
115         }
116
117         end.lrt_len = rec->lrh_len;
118         end.lrt_index = rec->lrh_index;
119         rc = llog_fsfilt_write_record(ctxt, file, &end, sizeof(end),
120                                       &file->f_pos, 0);
121         if (rc) {
122                 CERROR("error writing log tail: rc %d\n", rc);
123                 goto out;
124         }
125
126         rc = 0;
127  out:
128         if (saved_off > file->f_pos)
129                 file->f_pos = saved_off;
130         LASSERT(rc <= 0);
131         RETURN(rc);
132 }
133
134 static int llog_lvfs_read_blob(struct llog_ctxt *ctxt, struct l_file *file,
135                                void *buf, int size, loff_t off)
136 {
137         loff_t offset = off;
138         int rc;
139         ENTRY;
140
141         rc = llog_fsfilt_read_record(ctxt, file, buf, size, &offset);
142         if (rc) {
143                 CERROR("error reading log record: rc %d\n", rc);
144                 RETURN(rc);
145         }
146         RETURN(0);
147 }
148
149 static int llog_lvfs_read_header(struct llog_handle *handle)
150 {
151         struct llog_ctxt *ctxt = handle->lgh_ctxt;
152         int rc;
153         ENTRY;
154
155         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
156         LASSERT(ctxt != NULL);
157
158         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
159                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
160                 RETURN(LLOG_EEMPTY);
161         }
162
163         rc = llog_lvfs_read_blob(ctxt, handle->lgh_file, handle->lgh_hdr,
164                                  LLOG_CHUNK_SIZE, 0);
165         if (rc)
166                 CERROR("error reading log header\n");
167
168         handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
169         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
170
171         RETURN(rc);
172 }
173
174 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
175 /* appends if idx == -1, otherwise overwrites record idx. */
176 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
177                                struct llog_rec_hdr *rec,
178                                struct llog_cookie *reccookie,
179                                int cookiecount,
180                                void *buf, int idx)
181 {
182         struct llog_log_hdr *llh;
183         int reclen = le32_to_cpu(rec->lrh_len), index, rc;
184         struct llog_rec_tail *lrt;
185         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
186         struct file *file;
187         loff_t offset;
188         size_t left;
189         ENTRY;
190
191         llh = loghandle->lgh_hdr;
192         file = loghandle->lgh_file;
193
194         /* record length should not bigger than LLOG_CHUNK_SIZE */
195         if (buf)
196                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
197                       - sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
198         else
199                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
200         if (rc)
201                 RETURN(rc);
202
203         if (idx != -1) {
204                 loff_t saved_offset;
205
206                 /* no header: only allowed to insert record 1 */
207                 if (idx != 1 && !file->f_dentry->d_inode->i_size) {
208                         CERROR("idx != -1 in empty log\n");
209                         LBUG();
210                 }
211
212                 if (idx && llh->llh_size && llh->llh_size != reclen)
213                         RETURN(-EINVAL);
214
215                 rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
216                 /* we are done if we only write the header or on error */
217                 if (rc || idx == 0)
218                         RETURN(rc);
219
220                 saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
221                 rc = llog_lvfs_write_blob(ctxt, file, rec, buf, saved_offset);
222                 if (rc == 0 && reccookie) {
223                         reccookie->lgc_lgl = loghandle->lgh_id;
224                         reccookie->lgc_index = idx;
225                         rc = 1;
226                 }
227                 RETURN(rc);
228         }
229
230         /* Make sure that records don't cross a chunk boundary, so we can
231          * process them page-at-a-time if needed.  If it will cross a chunk
232          * boundary, write in a fake (but referenced) entry to pad the chunk.
233          *
234          * We know that llog_current_log() will return a loghandle that is
235          * big enough to hold reclen, so all we care about is padding here.
236          */
237         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
238         if (buf)
239                 reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
240                          sizeof(struct llog_rec_tail);
241
242         /* NOTE: padding is a record, but no bit is set */
243         if (left != 0 && left != reclen &&
244             left < (reclen + LLOG_MIN_REC_SIZE)) {
245                 loghandle->lgh_last_idx++;
246                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
247                 if (rc)
248                         RETURN(rc);
249         }
250
251         loghandle->lgh_last_idx++;
252         index = loghandle->lgh_last_idx;
253         rec->lrh_index = cpu_to_le32(index);
254         if (buf == NULL) {
255                 lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
256                 lrt->lrt_len = rec->lrh_len;
257                 lrt->lrt_index = rec->lrh_index;
258         }
259         if (ext2_set_bit(index, llh->llh_bitmap)) {
260                 CERROR("argh, index %u already set in log bitmap?\n", index);
261                 LBUG(); /* should never happen */
262         }
263         llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
264         llh->llh_tail.lrt_index = cpu_to_le32(index);
265
266         offset = 0;
267         rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
268         if (rc)
269                 RETURN(rc);
270
271         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
272         if (rc)
273                 RETURN(rc);
274
275         CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
276                loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len));
277         if (rc == 0 && reccookie) {
278                 reccookie->lgc_lgl = loghandle->lgh_id;
279                 reccookie->lgc_index = index;
280                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
281                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
282                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
283                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
284                 else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
285                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
286                 else
287                         reccookie->lgc_subsys = -1;
288                 rc = 1;
289         }
290         if (rc == 0 && (le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC ||
291             le32_to_cpu(rec->lrh_type) == SMFS_UPDATE_REC))
292                 rc = 1;
293
294         RETURN(rc);
295 }
296
297 /* We can skip reading at least as many log blocks as the number of
298 * minimum sized log records we are skipping.  If it turns out
299 * that we are not far enough along the log (because the
300 * actual records are larger than minimum size) we just skip
301 * some more records. */
302
303 static void llog_skip_over(__u64 *off, int curr, int goal)
304 {
305         if (goal <= curr)
306                 return;
307         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
308                 ~(LLOG_CHUNK_SIZE - 1);
309 }
310
311 /* sets:
312  *  - curr_offset to the furthest point read in the log file
313  *  - curr_idx to the log index preceeding curr_offset
314  * returns -EIO/-EINVAL on error
315  */
316 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
317                                 int next_idx, __u64 *curr_offset, void *buf,
318                                 int len)
319 {
320         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
321         int rc;
322         ENTRY;
323
324         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
325                 RETURN(-EINVAL);
326
327         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
328                next_idx, *curr_idx, *curr_offset);
329
330         while (*curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
331                 struct llog_rec_hdr *rec;
332                 struct llog_rec_tail *tail;
333                 loff_t ppos;
334
335                 llog_skip_over(curr_offset, *curr_idx, next_idx);
336
337                 ppos = *curr_offset;
338                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
339                                              buf, len, &ppos);
340
341                 if (rc) {
342                         CERROR("Cant read llog block at log id "LPU64
343                                "/%u offset "LPU64"\n",
344                                loghandle->lgh_id.lgl_oid,
345                                loghandle->lgh_id.lgl_ogen,
346                                *curr_offset);
347                         RETURN(rc);
348                 }
349
350                 /* put number of bytes read into rc to make code simpler */
351                 rc = ppos - *curr_offset;
352                 *curr_offset = ppos;
353
354                 if (rc == 0) /* end of file, nothing to do */
355                         RETURN(0);
356
357                 if (rc < sizeof(*tail)) {
358                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
359                                LPU64"\n", loghandle->lgh_id.lgl_oid,
360                                loghandle->lgh_id.lgl_ogen, *curr_offset);
361                         RETURN(-EINVAL);
362                 }
363
364                 tail = buf + rc - sizeof(struct llog_rec_tail);
365                 *curr_idx = le32_to_cpu(tail->lrt_index);
366
367                 /* this shouldn't happen */
368                 if (tail->lrt_index == 0) {
369                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
370                                LPU64"\n", loghandle->lgh_id.lgl_oid,
371                                loghandle->lgh_id.lgl_ogen, *curr_offset);
372                         RETURN(-EINVAL);
373                 }
374                 if (le32_to_cpu(tail->lrt_index) < next_idx)
375                         continue;
376
377                 /* sanity check that the start of the new buffer is no farther
378                  * than the record that we wanted.  This shouldn't happen. */
379                 rec = buf;
380                 if (le32_to_cpu(rec->lrh_index) > next_idx) {
381                         CERROR("missed desired record? %u > %u\n",
382                                le32_to_cpu(rec->lrh_index), next_idx);
383                         RETURN(-ENOENT);
384                 }
385                 RETURN(0);
386         }
387         RETURN(-EIO);
388 }
389
390 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
391                                 int prev_idx, void *buf, int len)
392 {
393         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
394         __u64 curr_offset;
395         int rc;
396         ENTRY;
397
398         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
399                 RETURN(-EINVAL);
400
401         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
402
403         curr_offset = LLOG_CHUNK_SIZE;
404         llog_skip_over(&curr_offset, 0, prev_idx);
405
406         while (curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
407                 struct llog_rec_hdr *rec;
408                 struct llog_rec_tail *tail;
409                 loff_t ppos;
410
411                 ppos = curr_offset;
412                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
413                                              buf, len, &ppos);
414
415                 if (rc) {
416                         CERROR("Cant read llog block at log id "LPU64
417                                "/%u offset "LPU64"\n",
418                                loghandle->lgh_id.lgl_oid,
419                                loghandle->lgh_id.lgl_ogen,
420                                curr_offset);
421                         RETURN(rc);
422                 }
423
424                 /* put number of bytes read into rc to make code simpler */
425                 rc = ppos - curr_offset;
426                 curr_offset = ppos;
427
428                 if (rc == 0) /* end of file, nothing to do */
429                         RETURN(0);
430
431                 if (rc < sizeof(*tail)) {
432                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
433                                LPU64"\n", loghandle->lgh_id.lgl_oid,
434                                loghandle->lgh_id.lgl_ogen, curr_offset);
435                         RETURN(-EINVAL);
436                 }
437
438                 tail = buf + rc - sizeof(struct llog_rec_tail);
439
440                 /* this shouldn't happen */
441                 if (tail->lrt_index == 0) {
442                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
443                                LPU64"\n", loghandle->lgh_id.lgl_oid,
444                                loghandle->lgh_id.lgl_ogen, curr_offset);
445                         RETURN(-EINVAL);
446                 }
447                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
448                         continue;
449
450                 /* sanity check that the start of the new buffer is no farther
451                  * than the record that we wanted.  This shouldn't happen. */
452                 rec = buf;
453                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
454                         CERROR("missed desired record? %u > %u\n",
455                                le32_to_cpu(rec->lrh_index), prev_idx);
456                         RETURN(-ENOENT);
457                 }
458                 RETURN(0);
459         }
460         RETURN(-EIO);
461 }
462
463 static struct file *llog_filp_open(char *name, int flags, int mode)
464 {
465         char *logname;
466         struct file *filp;
467         int len;
468
469         OBD_ALLOC(logname, PATH_MAX);
470         if (logname == NULL)
471                 return ERR_PTR(-ENOMEM);
472
473         len = snprintf(logname, PATH_MAX, "LOGS/%s", name);
474         if (len >= PATH_MAX - 1) {
475                 filp = ERR_PTR(-ENAMETOOLONG);
476         } else {
477                 filp = l_filp_open(logname, flags, mode);
478                 if (IS_ERR(filp)) {
479                         CERROR("logfile creation %s: %ld\n", logname,
480                                PTR_ERR(filp));
481                 }
482         }
483
484         OBD_FREE(logname, PATH_MAX);
485         return filp;
486 }
487
488 static struct file *llog_object_create(struct llog_ctxt *ctxt)
489 {
490         unsigned int tmpname = ll_insecure_random_int();
491         char fidname[LL_FID_NAMELEN];
492         struct file *filp;
493         struct dentry *new_child, *parent;
494         void *handle;
495         int rc = 0, err, namelen;
496         ENTRY;
497
498         sprintf(fidname, "OBJECTS/%u", tmpname);
499         filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
500         if (IS_ERR(filp)) {
501                 rc = PTR_ERR(filp);
502                 if (rc == -EEXIST) {
503                         CERROR("impossible object name collision %u\n",
504                                tmpname);
505                         LBUG();
506                 }
507                 CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
508                 RETURN(filp);
509         }
510
511         namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
512                              filp->f_dentry->d_inode->i_generation);
513         parent = filp->f_dentry->d_parent;
514         down(&parent->d_inode->i_sem);
515         new_child = lookup_one_len(fidname, parent, namelen);
516         if (IS_ERR(new_child)) {
517                 CERROR("getting neg dentry for obj rename: %d\n", rc);
518                 GOTO(out_close, rc = PTR_ERR(new_child));
519         }
520         if (new_child->d_inode != NULL) {
521                 CERROR("impossible non-negative obj dentry %lu:%u!\n",
522                        filp->f_dentry->d_inode->i_ino,
523                        filp->f_dentry->d_inode->i_generation);
524                 LBUG();
525         }
526
527         handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
528         if (IS_ERR(handle))
529                 GOTO(out_dput, rc = PTR_ERR(handle));
530
531         lock_kernel();
532         rc = vfs_rename(parent->d_inode, filp->f_dentry,
533                         parent->d_inode, new_child);
534         unlock_kernel();
535         if (rc)
536                 CERROR("error renaming new object %lu:%u: rc %d\n",
537                        filp->f_dentry->d_inode->i_ino,
538                        filp->f_dentry->d_inode->i_generation, rc);
539
540         err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
541         if (!rc)
542                 rc = err;
543 out_dput:
544         dput(new_child);
545 out_close:
546         up(&parent->d_inode->i_sem);
547         if (rc) {
548                 filp_close(filp, 0);
549                 filp = (struct file *)rc;
550         }
551
552         RETURN(filp);
553 }
554
555 static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
556                                 struct dentry *dentry)
557 {
558         struct dentry *new_child;
559         char fidname[LL_FID_NAMELEN];
560         void *handle;
561         int namelen, rc = 0, err;
562         ENTRY;
563
564         namelen = ll_fid2str(fidname, logid.lgl_oid, logid.lgl_ogen);
565         down(&ctxt->loc_objects_dir->d_inode->i_sem);
566         new_child = lookup_one_len(fidname, ctxt->loc_objects_dir, namelen);
567         if (IS_ERR(new_child)) {
568                 CERROR("getting neg dentry for obj rename: %d\n", rc);
569                 GOTO(out, rc = PTR_ERR(new_child));
570         }
571         if (new_child->d_inode == dentry->d_inode)
572                 GOTO(out_dput, rc);
573         if (new_child->d_inode != NULL) {
574                 CERROR("impossible non-negative obj dentry "LPX64":%u!\n",
575                        logid.lgl_oid, logid.lgl_ogen);
576                 LBUG();
577         }
578         handle = llog_fsfilt_start(ctxt, ctxt->loc_objects_dir->d_inode,
579                                    FSFILT_OP_LINK, NULL);
580         if (IS_ERR(handle))
581                 GOTO(out_dput, rc = PTR_ERR(handle));
582
583         lock_kernel();
584         rc = vfs_link(dentry, ctxt->loc_objects_dir->d_inode, new_child);
585         unlock_kernel();
586         if (rc)
587                 CERROR("error link new object "LPX64":%u: rc %d\n",
588                        logid.lgl_oid, logid.lgl_ogen, rc);
589         err = llog_fsfilt_commit(ctxt, ctxt->loc_objects_dir->d_inode, handle, 0);
590 out_dput:
591         l_dput(new_child);
592 out:
593         up(&ctxt->loc_objects_dir->d_inode->i_sem);
594         RETURN(rc);
595 }
596
597 /* This is a callback from the llog_* functions.
598  * Assumes caller has already pushed us into the kernel context. */
599 static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
600                             struct llog_logid *logid, char *name)
601 {
602         struct llog_handle *handle;
603         struct lvfs_run_ctxt saved;
604         int rc = 0;
605         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
606         ENTRY;
607
608         handle = llog_alloc_handle();
609         if (handle == NULL)
610                 RETURN(-ENOMEM);
611         *res = handle;
612
613         LASSERT(ctxt);
614         if (ctxt->loc_lvfs_ctxt)
615                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
616
617         if (logid != NULL) {
618                 char logname[LL_FID_NAMELEN + 10] = "OBJECTS/";
619                 char fidname[LL_FID_NAMELEN];
620                 ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen);
621                 strcat(logname, fidname);
622
623                 handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644);
624                 if (IS_ERR(handle->lgh_file)) {
625                         CERROR("cannot open %s file\n", logname);
626                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
627                 }
628                 if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) {
629                         CERROR("%s is not a regular file!: mode = %o\n", logname,
630                                handle->lgh_file->f_dentry->d_inode->i_mode);
631                         GOTO(cleanup, rc = -ENOENT);
632                 }
633                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
634                 handle->lgh_id = *logid;
635         } else if (name) {
636                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
637                 if (IS_ERR(handle->lgh_file))
638                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
639                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_logs_dir);
640
641                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
642                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
643                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
644                 if (rc)
645                         GOTO(cleanup, rc);
646         } else {
647                 handle->lgh_file = llog_object_create(ctxt);
648                 if (IS_ERR(handle->lgh_file))
649                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
650                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
651                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
652                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
653         }
654
655         handle->lgh_id.lgl_ogr = 1;
656         handle->lgh_ctxt = ctxt;
657  finish:
658         if (ctxt->loc_lvfs_ctxt)
659                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
660         RETURN(rc);
661 cleanup:
662         llog_free_handle(handle);
663         goto finish;
664 }
665
666 static int llog_lvfs_close(struct llog_handle *handle)
667 {
668         int rc;
669         ENTRY;
670
671         rc = filp_close(handle->lgh_file, 0);
672         if (rc)
673                 CERROR("error closing log: rc %d\n", rc);
674         RETURN(rc);
675 }
676
677 static int llog_lvfs_destroy(struct llog_handle *loghandle)
678 {
679         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
680         struct lvfs_run_ctxt saved;
681         struct dentry *fdentry;
682         struct inode *parent_inode;
683         char fidname[LL_FID_NAMELEN];
684         void *handle;
685         int rc = -EINVAL, err, namelen;
686         ENTRY;
687
688         if (ctxt->loc_lvfs_ctxt)
689                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
690
691         fdentry = loghandle->lgh_file->f_dentry;
692         parent_inode = fdentry->d_parent->d_inode;
693
694         if (!strcmp(fdentry->d_parent->d_name.name, "LOGS")) {
695                 LASSERT(parent_inode == ctxt->loc_logs_dir->d_inode);
696
697                 namelen = ll_fid2str(fidname, fdentry->d_inode->i_ino,
698                                      fdentry->d_inode->i_generation);
699                 dget(fdentry);
700                 rc = llog_lvfs_close(loghandle);
701                 if (rc) {
702                         dput(fdentry);
703                         GOTO(out, rc);
704                 }
705
706                 handle = llog_fsfilt_start(ctxt, parent_inode,
707                                            FSFILT_OP_UNLINK, NULL);
708                 if (IS_ERR(handle)) {
709                         dput(fdentry);
710                         GOTO(out, rc = PTR_ERR(handle));
711                 }
712
713                 down(&parent_inode->i_sem);
714                 rc = vfs_unlink(parent_inode, fdentry);
715                 up(&parent_inode->i_sem);
716                 dput(fdentry);
717
718                 if (!rc) {
719                         down(&ctxt->loc_objects_dir->d_inode->i_sem);
720                         fdentry = lookup_one_len(fidname, ctxt->loc_objects_dir,
721                                                  namelen);
722                         if (fdentry == NULL || fdentry->d_inode == NULL) {
723                                 CERROR("destroy non_existent object %s\n", fidname);
724                                 GOTO(out_err, rc = IS_ERR(fdentry) ?
725                                      PTR_ERR(fdentry) : -ENOENT);
726                         }
727                         rc = vfs_unlink(ctxt->loc_objects_dir->d_inode, fdentry);
728                         l_dput(fdentry);
729 out_err:
730                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
731                 }
732                 err = llog_fsfilt_commit(ctxt, parent_inode, handle, 0);
733                 if (err && !rc)
734                         err = rc;
735
736                 GOTO(out, rc);
737         }
738
739         if (!strcmp(fdentry->d_parent->d_name.name, "OBJECTS")) {
740                 LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
741
742                 dget(fdentry);
743                 rc = llog_lvfs_close(loghandle);
744                 if (rc == 0) {
745                         down(&parent_inode->i_sem);
746                         rc = vfs_unlink(parent_inode, fdentry);
747                         up(&parent_inode->i_sem);
748                 }
749                 dput(fdentry);
750         }
751 out:
752         if (ctxt->loc_lvfs_ctxt)
753                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
754         RETURN(rc);
755 }
756
757 /* reads the catalog list */
758 int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
759                       struct fsfilt_operations *fsops, char *name,
760                       int count, struct llog_catid *idarray)
761 {
762         struct lvfs_run_ctxt saved;
763         struct l_file *file;
764         int size = sizeof(*idarray) * count;
765         loff_t off = 0;
766         int rc;
767
768         LASSERT(count);
769
770         if (ctxt)
771                 push_ctxt(&saved, ctxt, NULL);
772         file = l_filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
773         if (!file || IS_ERR(file)) {
774                 rc = PTR_ERR(file);
775                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
776                        name, rc);
777                 GOTO(out, rc);
778         }
779
780         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
781                 CERROR("%s is not a regular file!: mode = %o\n", name,
782                        file->f_dentry->d_inode->i_mode);
783                 GOTO(out, rc = -ENOENT);
784         }
785
786         rc = fsops->fs_read_record(file, idarray, size, &off);
787         if (rc) {
788                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
789                        name, rc);
790                 GOTO(out, rc);
791         }
792
793  out:
794         if (file && !IS_ERR(file))
795                 rc = filp_close(file, 0);
796         if (ctxt)
797                 pop_ctxt(&saved, ctxt, NULL);
798         RETURN(rc);
799 }
800 EXPORT_SYMBOL(llog_get_cat_list);
801
802 /* writes the cat list */
803 int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
804                       struct fsfilt_operations *fsops, char *name,
805                       int count, struct llog_catid *idarray)
806 {
807         struct lvfs_run_ctxt saved;
808         struct l_file *file;
809         int size = sizeof(*idarray) * count;
810         loff_t off = 0;
811         int rc;
812
813         LASSERT(count);
814
815         if (ctxt)
816                 push_ctxt(&saved, ctxt, NULL);
817         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
818         if (!file || IS_ERR(file)) {
819                 rc = PTR_ERR(file);
820                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
821                        name, rc);
822                 GOTO(out, rc);
823         }
824
825         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
826                 CERROR("%s is not a regular file!: mode = %o\n", name,
827                        file->f_dentry->d_inode->i_mode);
828                 GOTO(out, rc = -ENOENT);
829         }
830
831         rc = fsops->fs_write_record(file, idarray, size, &off, 1);
832         if (rc) {
833                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
834                        name, rc);
835                 GOTO(out, rc);
836         }
837
838  out:
839         if (file && !IS_ERR(file))
840                 rc = filp_close(file, 0);
841         if (ctxt)
842                 pop_ctxt(&saved, ctxt, NULL);
843         RETURN(rc);
844 }
845 EXPORT_SYMBOL(llog_put_cat_list);
846
847 struct llog_operations llog_lvfs_ops = {
848         lop_create:      llog_lvfs_create,
849         lop_destroy:     llog_lvfs_destroy,
850         lop_close:       llog_lvfs_close,
851         lop_read_header: llog_lvfs_read_header,
852         lop_write_rec:   llog_lvfs_write_rec,
853         lop_next_block:  llog_lvfs_next_block,
854         lop_prev_block:  llog_lvfs_prev_block,
855 };
856 EXPORT_SYMBOL(llog_lvfs_ops);
857
858 #else /* !__KERNEL__ */
859
/*
 * Non-kernel build stand-in for reading a log header: there is no
 * implementation here, so it calls LBUG() and then returns 0.
 */
static int llog_lvfs_read_header(struct llog_handle *handle)
{
        LBUG();         /* not implemented outside the kernel build */
        return 0;
}
865
/*
 * Non-kernel build stand-in for writing a log record: every argument is
 * ignored, LBUG() is called and 0 is returned.
 */
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie,
                               int cookiecount, void *buf, int idx)
{
        LBUG();         /* not implemented outside the kernel build */
        return 0;
}
874
/*
 * Non-kernel build stand-in for creating/opening a log: *res is never
 * written; the function calls LBUG() and returns 0.
 */
static int llog_lvfs_create(struct llog_ctxt *ctxt,
                            struct llog_handle **res,
                            struct llog_logid *logid, char *name)
{
        LBUG();         /* not implemented outside the kernel build */
        return 0;
}
881
/*
 * Non-kernel build stand-in for closing a log handle: calls LBUG() and
 * returns 0 without touching @handle.
 */
static int llog_lvfs_close(struct llog_handle *handle)
{
        LBUG();         /* not implemented outside the kernel build */
        return 0;
}
887
/*
 * Non-kernel build stand-in for destroying a log: calls LBUG() and
 * returns 0 without touching @handle.
 */
static int llog_lvfs_destroy(struct llog_handle *handle)
{
        LBUG();         /* not implemented outside the kernel build */
        return 0;
}
893
/*
 * Non-kernel build stand-in for reading the catalog list: @idarray is
 * left unmodified; the function calls LBUG() and returns 0.
 */
int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops,
                      char *name, int count,
                      struct llog_catid *idarray)
{
        LBUG();         /* not implemented outside the kernel build */
        return 0;
}
901
/*
 * Non-kernel build stand-in for writing the catalog list: nothing is
 * written; the function calls LBUG() and returns 0.
 */
int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops,
                      char *name, int count,
                      struct llog_catid *idarray)
{
        LBUG();         /* not implemented outside the kernel build */
        return 0;
}
909
/*
 * Non-kernel build stand-in for fetching the previous log block: @buf is
 * left unmodified; the function calls LBUG() and returns 0.
 */
int llog_lvfs_prev_block(struct llog_handle *loghandle, int prev_idx,
                         void *buf, int len)
{
        LBUG();         /* not implemented outside the kernel build */
        return 0;
}
916
917 int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
918                          int next_idx, __u64 *offset, void *buf, int len)
919 {
920         LBUG();
921         return 0;
922 }
923
924 struct llog_operations llog_lvfs_ops = {
925         lop_create:      llog_lvfs_create,
926         lop_destroy:     llog_lvfs_destroy,
927         lop_close:       llog_lvfs_close,
928         lop_read_header: llog_lvfs_read_header,
929         lop_write_rec:   llog_lvfs_write_rec,
930         lop_next_block:  llog_lvfs_next_block,
931         lop_prev_block:  llog_lvfs_prev_block,
932 };
933 #endif