Whamcloud - gitweb
6b70cf18c856e7fbf676df9591fe4bd36e5596d9
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lvfs.h>
42 #include <linux/lustre_fsfilt.h>
43 #include <linux/lustre_log.h>
44
45 #ifdef __KERNEL__
46
47 static int llog_lvfs_pad(struct llog_ctxt *ctxt, struct l_file *file,
48                          int len, int index)
49 {
50         struct llog_rec_hdr rec;
51         struct llog_rec_tail tail;
52         int rc;
53         ENTRY;
54
55         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
56
57         tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
58         tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
59         rec.lrh_type = 0;
60
61         rc = llog_fsfilt_write_record(ctxt, file, &rec, sizeof(rec),
62                                       &file->f_pos, 0);
63         if (rc) {
64                 CERROR("error writing padding record: rc %d\n", rc);
65                 goto out;
66         }
67
68         file->f_pos += len - sizeof(rec) - sizeof(tail);
69         rc = llog_fsfilt_write_record(ctxt, file, &tail, sizeof(tail),
70                                       &file->f_pos, 0);
71         if (rc) {
72                 CERROR("error writing padding record: rc %d\n", rc);
73                 goto out;
74         }
75
76  out:
77         RETURN(rc);
78 }
79
80 static int llog_lvfs_write_blob(struct llog_ctxt *ctxt, struct l_file *file,
81                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
82 {
83         int rc;
84         struct llog_rec_tail end;
85         loff_t saved_off = file->f_pos;
86         int buflen = le32_to_cpu(rec->lrh_len);
87
88         ENTRY;
89         file->f_pos = off;
90
91         if (!buf) {
92                 rc = llog_fsfilt_write_record(ctxt, file, rec, buflen,
93                                               &file->f_pos, 0);
94                 if (rc) {
95                         CERROR("error writing log record: rc %d\n", rc);
96                         goto out;
97                 }
98                 GOTO(out, rc = 0);
99         }
100
101         /* the buf case */
102         rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
103         rc = llog_fsfilt_write_record(ctxt, file, rec, sizeof(*rec),
104                                       &file->f_pos, 0);
105         if (rc) {
106                 CERROR("error writing log hdr: rc %d\n", rc);
107                 goto out;
108         }
109
110         rc = llog_fsfilt_write_record(ctxt, file, buf, buflen,
111                                       &file->f_pos, 0);
112         if (rc) {
113                 CERROR("error writing log buffer: rc %d\n", rc);
114                 goto out;
115         }
116
117         end.lrt_len = rec->lrh_len;
118         end.lrt_index = rec->lrh_index;
119         rc = llog_fsfilt_write_record(ctxt, file, &end, sizeof(end),
120                                       &file->f_pos, 0);
121         if (rc) {
122                 CERROR("error writing log tail: rc %d\n", rc);
123                 goto out;
124         }
125
126         rc = 0;
127  out:
128         if (saved_off > file->f_pos)
129                 file->f_pos = saved_off;
130         LASSERT(rc <= 0);
131         RETURN(rc);
132 }
133
134 static int llog_lvfs_read_blob(struct llog_ctxt *ctxt, struct l_file *file,
135                                void *buf, int size, loff_t off)
136 {
137         loff_t offset = off;
138         int rc;
139         ENTRY;
140
141         rc = llog_fsfilt_read_record(ctxt, file, buf, size, &offset);
142         if (rc) {
143                 CERROR("error reading log record: rc %d\n", rc);
144                 RETURN(rc);
145         }
146         RETURN(0);
147 }
148
149 static int llog_lvfs_read_header(struct llog_handle *handle)
150 {
151         struct llog_ctxt *ctxt = handle->lgh_ctxt;
152         int rc;
153         ENTRY;
154
155         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
156         LASSERT(ctxt != NULL);
157
158         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
159                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
160                 RETURN(LLOG_EEMPTY);
161         }
162
163         rc = llog_lvfs_read_blob(ctxt, handle->lgh_file, handle->lgh_hdr,
164                                  LLOG_CHUNK_SIZE, 0);
165         if (rc)
166                 CERROR("error reading log header\n");
167
168         handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
169         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
170
171         RETURN(rc);
172 }
173
174 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
175 /* appends if idx == -1, otherwise overwrites record idx. */
176 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
177                                struct llog_rec_hdr *rec,
178                                struct llog_cookie *reccookie,
179                                int cookiecount,
180                                void *buf, int idx)
181 {
182         struct llog_log_hdr *llh;
183         int reclen = le32_to_cpu(rec->lrh_len), index, rc;
184         struct llog_rec_tail *lrt;
185         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
186         struct file *file;
187         loff_t offset;
188         size_t left;
189         ENTRY;
190
191         llh = loghandle->lgh_hdr;
192         file = loghandle->lgh_file;
193
194         /* record length should not bigger than LLOG_CHUNK_SIZE */
195         if (buf)
196                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
197                       - sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
198         else
199                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
200         if (rc)
201                 RETURN(rc);
202
203         if (idx != -1) {
204                 loff_t saved_offset;
205
206                 /* no header: only allowed to insert record 1 */
207                 if (idx != 1 && !file->f_dentry->d_inode->i_size) {
208                         CERROR("idx != -1 in empty log\n");
209                         LBUG();
210                 }
211
212                 if (idx && llh->llh_size && llh->llh_size != reclen)
213                         RETURN(-EINVAL);
214
215                 rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
216                 /* we are done if we only write the header or on error */
217                 if (rc || idx == 0)
218                         RETURN(rc);
219
220                 saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
221                 rc = llog_lvfs_write_blob(ctxt, file, rec, buf, saved_offset);
222                 if (rc == 0 && reccookie) {
223                         reccookie->lgc_lgl = loghandle->lgh_id;
224                         reccookie->lgc_index = idx;
225                         rc = 1;
226                 }
227                 RETURN(rc);
228         }
229
230         /* Make sure that records don't cross a chunk boundary, so we can
231          * process them page-at-a-time if needed.  If it will cross a chunk
232          * boundary, write in a fake (but referenced) entry to pad the chunk.
233          *
234          * We know that llog_current_log() will return a loghandle that is
235          * big enough to hold reclen, so all we care about is padding here.
236          */
237         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
238         if (buf)
239                 reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
240                          sizeof(struct llog_rec_tail);
241
242         /* NOTE: padding is a record, but no bit is set */
243         if (left != 0 && left != reclen &&
244             left < (reclen + LLOG_MIN_REC_SIZE)) {
245                 loghandle->lgh_last_idx++;
246                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
247                 if (rc)
248                         RETURN(rc);
249         }
250
251         loghandle->lgh_last_idx++;
252         index = loghandle->lgh_last_idx;
253         rec->lrh_index = cpu_to_le32(index);
254         if (buf == NULL) {
255                 lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
256                 lrt->lrt_len = rec->lrh_len;
257                 lrt->lrt_index = rec->lrh_index;
258         }
259         if (ext2_set_bit(index, llh->llh_bitmap)) {
260                 CERROR("argh, index %u already set in log bitmap?\n", index);
261                 LBUG(); /* should never happen */
262         }
263         llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
264         llh->llh_tail.lrt_index = cpu_to_le32(index);
265
266         offset = 0;
267         rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
268         if (rc)
269                 RETURN(rc);
270
271         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
272         if (rc)
273                 RETURN(rc);
274
275         CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
276                loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len));
277         if (rc == 0 && reccookie) {
278                 reccookie->lgc_lgl = loghandle->lgh_id;
279                 reccookie->lgc_index = index;
280                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
281                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
282                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
283                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
284                 else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
285                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
286                 else
287                         reccookie->lgc_subsys = -1;
288                 rc = 1;
289         }
290         if (rc == 0 && (le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC ||
291             le32_to_cpu(rec->lrh_type) == SMFS_UPDATE_REC))
292                 rc = 1;
293
294         RETURN(rc);
295 }
296
297 /* We can skip reading at least as many log blocks as the number of
298 * minimum sized log records we are skipping.  If it turns out
299 * that we are not far enough along the log (because the
300 * actual records are larger than minimum size) we just skip
301 * some more records. */
302
303 static void llog_skip_over(__u64 *off, int curr, int goal)
304 {
305         if (goal <= curr)
306                 return;
307         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
308                 ~(LLOG_CHUNK_SIZE - 1);
309 }
310
311 /* sets:
312  *  - curr_offset to the furthest point read in the log file
313  *  - curr_idx to the log index preceeding curr_offset
314  * returns -EIO/-EINVAL on error
315  */
316 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
317                                 int next_idx, __u64 *curr_offset, void *buf,
318                                 int len)
319 {
320         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
321         int rc;
322         ENTRY;
323
324         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
325                 RETURN(-EINVAL);
326
327         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
328                next_idx, *curr_idx, *curr_offset);
329
330         while (*curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
331                 struct llog_rec_hdr *rec;
332                 struct llog_rec_tail *tail;
333                 loff_t ppos;
334
335                 llog_skip_over(curr_offset, *curr_idx, next_idx);
336
337                 ppos = *curr_offset;
338                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
339                                              buf, len, &ppos);
340
341                 if (rc) {
342                         CERROR("Cant read llog block at log id "LPU64
343                                "/%u offset "LPU64"\n",
344                                loghandle->lgh_id.lgl_oid,
345                                loghandle->lgh_id.lgl_ogen,
346                                *curr_offset);
347                         RETURN(rc);
348                 }
349
350                 /* put number of bytes read into rc to make code simpler */
351                 rc = ppos - *curr_offset;
352                 *curr_offset = ppos;
353
354                 if (rc == 0) /* end of file, nothing to do */
355                         RETURN(0);
356
357                 if (rc < sizeof(*tail)) {
358                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
359                                LPU64"\n", loghandle->lgh_id.lgl_oid,
360                                loghandle->lgh_id.lgl_ogen, *curr_offset);
361                         RETURN(-EINVAL);
362                 }
363
364                 tail = buf + rc - sizeof(struct llog_rec_tail);
365                 *curr_idx = le32_to_cpu(tail->lrt_index);
366
367                 /* this shouldn't happen */
368                 if (tail->lrt_index == 0) {
369                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
370                                LPU64"\n", loghandle->lgh_id.lgl_oid,
371                                loghandle->lgh_id.lgl_ogen, *curr_offset);
372                         RETURN(-EINVAL);
373                 }
374                 if (le32_to_cpu(tail->lrt_index) < next_idx)
375                         continue;
376
377                 /* sanity check that the start of the new buffer is no farther
378                  * than the record that we wanted.  This shouldn't happen. */
379                 rec = buf;
380                 if (le32_to_cpu(rec->lrh_index) > next_idx) {
381                         CERROR("missed desired record? %u > %u\n",
382                                le32_to_cpu(rec->lrh_index), next_idx);
383                         RETURN(-ENOENT);
384                 }
385                 RETURN(0);
386         }
387         RETURN(-EIO);
388 }
389
390 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
391                                 int prev_idx, void *buf, int len)
392 {
393         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
394         __u64 curr_offset;
395         int rc;
396         ENTRY;
397
398         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
399                 RETURN(-EINVAL);
400
401         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
402
403         curr_offset = LLOG_CHUNK_SIZE;
404         llog_skip_over(&curr_offset, 0, prev_idx);
405
406         while (curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
407                 struct llog_rec_hdr *rec;
408                 struct llog_rec_tail *tail;
409                 loff_t ppos;
410
411                 ppos = curr_offset;
412                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
413                                              buf, len, &ppos);
414
415                 if (rc) {
416                         CERROR("Cant read llog block at log id "LPU64
417                                "/%u offset "LPU64"\n",
418                                loghandle->lgh_id.lgl_oid,
419                                loghandle->lgh_id.lgl_ogen,
420                                curr_offset);
421                         RETURN(rc);
422                 }
423
424                 /* put number of bytes read into rc to make code simpler */
425                 rc = ppos - curr_offset;
426                 curr_offset = ppos;
427
428                 if (rc == 0) /* end of file, nothing to do */
429                         RETURN(0);
430
431                 if (rc < sizeof(*tail)) {
432                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
433                                LPU64"\n", loghandle->lgh_id.lgl_oid,
434                                loghandle->lgh_id.lgl_ogen, curr_offset);
435                         RETURN(-EINVAL);
436                 }
437
438                 tail = buf + rc - sizeof(struct llog_rec_tail);
439
440                 /* this shouldn't happen */
441                 if (tail->lrt_index == 0) {
442                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
443                                LPU64"\n", loghandle->lgh_id.lgl_oid,
444                                loghandle->lgh_id.lgl_ogen, curr_offset);
445                         RETURN(-EINVAL);
446                 }
447                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
448                         continue;
449
450                 /* sanity check that the start of the new buffer is no farther
451                  * than the record that we wanted.  This shouldn't happen. */
452                 rec = buf;
453                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
454                         CERROR("missed desired record? %u > %u\n",
455                                le32_to_cpu(rec->lrh_index), prev_idx);
456                         RETURN(-ENOENT);
457                 }
458                 RETURN(0);
459         }
460         RETURN(-EIO);
461 }
462
463 static struct file *llog_filp_open(char *name, int flags, int mode)
464 {
465         char *logname;
466         struct file *filp;
467         int len;
468
469         OBD_ALLOC(logname, PATH_MAX);
470         if (logname == NULL)
471                 return ERR_PTR(-ENOMEM);
472
473         len = snprintf(logname, PATH_MAX, "LOGS/%s", name);
474         if (len >= PATH_MAX - 1) {
475                 filp = ERR_PTR(-ENAMETOOLONG);
476         } else {
477                 filp = l_filp_open(logname, flags, mode);
478                 if (IS_ERR(filp)) {
479                         CERROR("logfile creation %s: %ld\n", logname,
480                                PTR_ERR(filp));
481                 }
482         }
483
484         OBD_FREE(logname, PATH_MAX);
485         return filp;
486 }
487
488 /* creates object for the case when we have no obd (smfs). */
489 static struct file *
490 llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
491 {
492         unsigned int tmpname = ll_insecure_random_int();
493         char fidname[LL_FID_NAMELEN];
494         struct file *filp;
495         struct dentry *new_child, *parent;
496         void *handle;
497         int rc = 0, err, namelen;
498         ENTRY;
499
500         sprintf(fidname, "OBJECTS/%u", tmpname);
501         filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
502         if (IS_ERR(filp)) {
503                 rc = PTR_ERR(filp);
504                 if (rc == -EEXIST) {
505                         CERROR("impossible object name collision %u\n",
506                                tmpname);
507                         LBUG();
508                 }
509                 CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
510                 RETURN(filp);
511         }
512
513         namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
514                              filp->f_dentry->d_inode->i_generation);
515         parent = filp->f_dentry->d_parent;
516         down(&parent->d_inode->i_sem);
517         new_child = lookup_one_len(fidname, parent, namelen);
518         if (IS_ERR(new_child)) {
519                 CERROR("getting neg dentry for obj rename: %d\n", rc);
520                 GOTO(out_close, rc = PTR_ERR(new_child));
521         }
522         if (new_child->d_inode != NULL) {
523                 CERROR("impossible non-negative obj dentry %lu:%u!\n",
524                        filp->f_dentry->d_inode->i_ino,
525                        filp->f_dentry->d_inode->i_generation);
526                 LBUG();
527         }
528
529         handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
530         if (IS_ERR(handle))
531                 GOTO(out_dput, rc = PTR_ERR(handle));
532
533         lock_kernel();
534         rc = vfs_rename(parent->d_inode, filp->f_dentry,
535                         parent->d_inode, new_child);
536         unlock_kernel();
537         if (rc)
538                 CERROR("error renaming new object %lu:%u: rc %d\n",
539                        filp->f_dentry->d_inode->i_ino,
540                        filp->f_dentry->d_inode->i_generation, rc);
541
542         err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
543         if (!rc)
544                 rc = err;
545
546 out_dput:
547         dput(new_child);
548 out_close:
549         up(&parent->d_inode->i_sem);
550         if (rc) {
551                 filp_close(filp, 0);
552                 filp = (struct file *)rc;
553         } else {
554                 /* FIXME: is this group 1 is correct? */
555                 lgh_id->lgl_ogr = 1;
556                 lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
557                 lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
558         }
559
560         RETURN(filp);
561 }
562
563 /* creates object for generic case (obd exists) */
564 static struct file *
565 llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
566 {
567         int rc = 0;
568         struct file *filp;
569         struct dentry *dchild;
570         struct obd_device *obd;
571         struct obdo *oa = NULL;
572         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
573         ENTRY;
574         
575         obd = ctxt->loc_exp->exp_obd;
576
577         /* this is important to work here over obd_create() as it manages 
578            groups and we need it. Yet another reason is that mds_obd_create()
579            is fully the same as old version of this function and this helps
580            us to avoid code duplicating and layering violating. */
581         OBD_ALLOC(oa, sizeof(*oa));
582         if (!oa)
583                 RETURN(ERR_PTR(-ENOMEM));
584                 
585         oa->o_gr = FILTER_GROUP_LLOG;
586         oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
587         rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
588         if (rc)
589                 GOTO(out_free_oa, rc);
590
591         dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
592                                      oa->o_generation, oa->o_gr);
593         if (IS_ERR(dchild))
594                 GOTO(out_free_oa, rc = PTR_ERR(dchild));
595                 
596         filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
597                              open_flags);
598         if (IS_ERR(filp)) {
599                 l_dput(dchild);
600                 GOTO(out_free_oa, rc = PTR_ERR(filp));
601         }
602
603         /* group 1 is not longer valid, we use the group which is set 
604            by obd_create()->mds_obd_create(). */
605         lgh_id->lgl_ogr = oa->o_gr;
606         lgh_id->lgl_oid = oa->o_id;
607         lgh_id->lgl_ogen = oa->o_generation;
608         OBD_FREE(oa, sizeof(*oa));
609         RETURN(filp);
610         
611 out_free_oa:
612         OBD_FREE(oa, sizeof(*oa));
613         RETURN(ERR_PTR(rc));
614 }
615
616 static struct file *
617 llog_object_create(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
618 {
619         if (ctxt->loc_alone)
620                 return llog_object_create_alone(ctxt, lgh_id);
621         else
622                 return llog_object_create_generic(ctxt, lgh_id);
623 }
624
625 static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
626                                 struct dentry *dentry)
627 {
628         struct dentry *new_child;
629         char fidname[LL_FID_NAMELEN];
630         void *handle;
631         int namelen, rc = 0, err;
632         ENTRY;
633         
634         namelen = ll_fid2str(fidname, logid.lgl_oid, logid.lgl_ogen);
635         down(&ctxt->loc_objects_dir->d_inode->i_sem);
636         new_child = lookup_one_len(fidname, ctxt->loc_objects_dir, namelen);
637         if (IS_ERR(new_child)) {
638                 CERROR("getting neg dentry for obj rename: %d\n", rc);
639                 GOTO(out, rc = PTR_ERR(new_child));
640         }
641         if (new_child->d_inode == dentry->d_inode)
642                 GOTO(out_dput, rc);
643         if (new_child->d_inode != NULL) {
644                 CERROR("impossible non-negative obj dentry "LPX64":%u!\n",
645                        logid.lgl_oid, logid.lgl_ogen);
646                 LBUG();
647         }
648         handle = llog_fsfilt_start(ctxt, ctxt->loc_objects_dir->d_inode,
649                                    FSFILT_OP_LINK, NULL);
650         if (IS_ERR(handle))
651                 GOTO(out_dput, rc = PTR_ERR(handle));
652         
653         lock_kernel();
654         rc = vfs_link(dentry, ctxt->loc_objects_dir->d_inode, new_child);
655         unlock_kernel();
656         if (rc)
657                 CERROR("error link new object "LPX64":%u: rc %d\n",
658                        logid.lgl_oid, logid.lgl_ogen, rc);
659         err = llog_fsfilt_commit(ctxt, ctxt->loc_objects_dir->d_inode, handle, 0);
660 out_dput:
661         l_dput(new_child);
662 out:
663         up(&ctxt->loc_objects_dir->d_inode->i_sem);
664         RETURN(rc);
665 }
666
667 static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
668                             struct llog_logid *logid, char *name)
669 {
670         struct llog_handle *handle;
671         struct lvfs_run_ctxt saved;
672         int rc = 0;
673         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
674         ENTRY;
675         
676         handle = llog_alloc_handle();
677         if (handle == NULL)
678                 RETURN(-ENOMEM);
679         *res = handle;
680         
681         LASSERT(ctxt);
682         if (ctxt->loc_lvfs_ctxt)
683                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
684         
685         if (logid != NULL) {
686                 char logname[LL_FID_NAMELEN + 10] = "OBJECTS/";
687                 char fidname[LL_FID_NAMELEN];
688                 ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen);
689                 strcat(logname, fidname);
690                 
691                 handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644);
692                 if (IS_ERR(handle->lgh_file)) {
693                         CERROR("cannot open %s file, error = %ld\n", 
694                                logname, PTR_ERR(handle->lgh_file));
695                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
696                 }
697                 if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) {
698                         CERROR("%s is not a regular file!: mode = %o\n", logname,
699                                handle->lgh_file->f_dentry->d_inode->i_mode);
700                         GOTO(cleanup, rc = -ENOENT);
701                 }
702                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
703                 handle->lgh_id = *logid;
704         } else if (name) {
705                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
706                 if (IS_ERR(handle->lgh_file)) {
707                         CERROR("cannot open %s file, error = %ld\n", 
708                                name, PTR_ERR(handle->lgh_file));
709                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
710                 }
711                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_logs_dir);
712                 
713                 handle->lgh_id.lgl_ogr = 1;
714                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
715                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
716                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
717                 if (rc)
718                         GOTO(cleanup, rc);
719         } else {
720                 handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
721                 if (IS_ERR(handle->lgh_file)) {
722                         CERROR("cannot create llog object, error = %ld\n", 
723                                PTR_ERR(handle->lgh_file));
724                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
725                 }
726         }
727
728         handle->lgh_ctxt = ctxt;
729 finish:
730         if (ctxt->loc_lvfs_ctxt)
731                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
732         RETURN(rc);
733 cleanup:
734         llog_free_handle(handle);
735         goto finish;
736 }
737
738 static int llog_lvfs_close(struct llog_handle *handle)
739 {
740         int rc;
741         ENTRY;
742
743         rc = filp_close(handle->lgh_file, 0);
744         if (rc)
745                 CERROR("error closing log: rc %d\n", rc);
746         RETURN(rc);
747 }
748
749 static int llog_lvfs_destroy(struct llog_handle *loghandle)
750 {
751         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
752         struct lvfs_run_ctxt saved;
753         struct dentry *fdentry;
754         struct inode *parent_inode;
755         char fidname[LL_FID_NAMELEN];
756         void *handle;
757         int rc = -EINVAL, err, namelen;
758         ENTRY;
759         
760         if (ctxt->loc_lvfs_ctxt)
761                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
762         
763         fdentry = loghandle->lgh_file->f_dentry;
764         parent_inode = fdentry->d_parent->d_inode;
765         
766         if (!strcmp(fdentry->d_parent->d_name.name, "LOGS")) {
767                 LASSERT(parent_inode == ctxt->loc_logs_dir->d_inode);
768                 
769                 namelen = ll_fid2str(fidname, fdentry->d_inode->i_ino,
770                                      fdentry->d_inode->i_generation);
771                 dget(fdentry);
772                 rc = llog_lvfs_close(loghandle);
773                 if (rc) {
774                         dput(fdentry);
775                         GOTO(out, rc);
776                 }
777                 
778                 handle = llog_fsfilt_start(ctxt, parent_inode,
779                                            FSFILT_OP_UNLINK, NULL);
780                 if (IS_ERR(handle)) {
781                         dput(fdentry);
782                         GOTO(out, rc = PTR_ERR(handle));
783                 }
784                 
785                 down(&parent_inode->i_sem);
786                 rc = vfs_unlink(parent_inode, fdentry);
787                 up(&parent_inode->i_sem);
788                 dput(fdentry);
789                 
790                 if (!rc) {
791                         down(&ctxt->loc_objects_dir->d_inode->i_sem);
792                         fdentry = lookup_one_len(fidname, ctxt->loc_objects_dir,
793                                                  namelen);
794                         if (fdentry == NULL || fdentry->d_inode == NULL) {
795                                 CERROR("destroy non_existent object %s\n", fidname);
796                                 GOTO(out_err, rc = IS_ERR(fdentry) ?
797                                      PTR_ERR(fdentry) : -ENOENT);
798                         }
799                         rc = vfs_unlink(ctxt->loc_objects_dir->d_inode, fdentry);
800                         l_dput(fdentry);
801 out_err:
802                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
803                 }
804                 err = llog_fsfilt_commit(ctxt, parent_inode, handle, 0);
805                 if (err && !rc)
806                         err = rc;
807                 
808                 GOTO(out, rc);
809         }
810         if (ctxt->loc_alone) {
811                 if (!strcmp(fdentry->d_parent->d_name.name, "OBJECTS")) {
812                         LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
813                         
814                         dget(fdentry);
815                         rc = llog_lvfs_close(loghandle);
816                         if (rc == 0) {
817                                 down(&parent_inode->i_sem);
818                                 rc = vfs_unlink(parent_inode, fdentry);
819                                 up(&parent_inode->i_sem);
820                         }
821                         dput(fdentry);
822                 }
823         } else {
824                 struct obdo *oa = NULL;
825  
826                 OBD_ALLOC(oa, sizeof(*oa));
827                 if (!oa)
828                         GOTO(out, rc = -ENOMEM);
829                 
830                 oa->o_id = loghandle->lgh_id.lgl_oid;
831                 oa->o_gr = loghandle->lgh_id.lgl_ogr;
832                 oa->o_generation = loghandle->lgh_id.lgl_ogen;
833                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
834                 
835                 rc = llog_lvfs_close(loghandle);
836                 if (rc)
837                         GOTO(out_free_oa, rc);
838                 
839                 rc = obd_destroy(loghandle->lgh_ctxt->loc_exp, oa, NULL, NULL);
840 out_free_oa:
841                 OBD_FREE(oa, sizeof(*oa));
842         }
843 out:
844         if (ctxt->loc_lvfs_ctxt)
845                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
846         RETURN(rc);
847 }
848
849 /* reads the catalog list */
850 int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
851                       struct fsfilt_operations *fsops, char *name,
852                       int count, struct llog_catid *idarray)
853 {
854         struct lvfs_run_ctxt saved;
855         struct l_file *file;
856         int size = sizeof(*idarray) * count;
857         loff_t off = 0;
858         int rc;
859
860         LASSERT(count);
861
862         if (ctxt)
863                 push_ctxt(&saved, ctxt, NULL);
864         file = l_filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
865         if (!file || IS_ERR(file)) {
866                 rc = PTR_ERR(file);
867                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
868                        name, rc);
869                 GOTO(out, rc);
870         }
871
872         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
873                 CERROR("%s is not a regular file!: mode = %o\n", name,
874                        file->f_dentry->d_inode->i_mode);
875                 GOTO(out, rc = -ENOENT);
876         }
877
878         rc = fsops->fs_read_record(file, idarray, size, &off);
879         if (rc) {
880                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
881                        name, rc);
882                 GOTO(out, rc);
883         }
884
885  out:
886         if (file && !IS_ERR(file))
887                 rc = filp_close(file, 0);
888         if (ctxt)
889                 pop_ctxt(&saved, ctxt, NULL);
890         RETURN(rc);
891 }
892 EXPORT_SYMBOL(llog_get_cat_list);
893
894 /* writes the cat list */
895 int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
896                       struct fsfilt_operations *fsops, char *name,
897                       int count, struct llog_catid *idarray)
898 {
899         struct lvfs_run_ctxt saved;
900         struct l_file *file;
901         int size = sizeof(*idarray) * count;
902         loff_t off = 0;
903         int rc;
904
905         LASSERT(count);
906
907         if (ctxt)
908                 push_ctxt(&saved, ctxt, NULL);
909         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
910         if (!file || IS_ERR(file)) {
911                 rc = PTR_ERR(file);
912                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
913                        name, rc);
914                 GOTO(out, rc);
915         }
916
917         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
918                 CERROR("%s is not a regular file!: mode = %o\n", name,
919                        file->f_dentry->d_inode->i_mode);
920                 GOTO(out, rc = -ENOENT);
921         }
922
923         rc = fsops->fs_write_record(file, idarray, size, &off, 1);
924         if (rc) {
925                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
926                        name, rc);
927                 GOTO(out, rc);
928         }
929
930  out:
931         if (file && !IS_ERR(file))
932                 rc = filp_close(file, 0);
933         if (ctxt)
934                 pop_ctxt(&saved, ctxt, NULL);
935         RETURN(rc);
936 }
937 EXPORT_SYMBOL(llog_put_cat_list);
938
939 struct llog_operations llog_lvfs_ops = {
940         lop_create:      llog_lvfs_create,
941         lop_destroy:     llog_lvfs_destroy,
942         lop_close:       llog_lvfs_close,
943         lop_read_header: llog_lvfs_read_header,
944         lop_write_rec:   llog_lvfs_write_rec,
945         lop_next_block:  llog_lvfs_next_block,
946         lop_prev_block:  llog_lvfs_prev_block,
947 };
948 EXPORT_SYMBOL(llog_lvfs_ops);
949
950 #else /* !__KERNEL__ */
951
/*
 * Non-kernel build: reading a log header is not implemented.
 * Calls LBUG(); the 0 result only matters if LBUG() returns.
 */
static int llog_lvfs_read_header(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
957
/*
 * Non-kernel build: writing a log record is not implemented.
 * Calls LBUG(); the 0 result only matters if LBUG() returns.
 */
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie,
                               int cookiecount,
                               void *buf,
                               int idx)
{
        LBUG();

        return 0;
}
966
/*
 * Non-kernel build: creating a log is not implemented.
 * Calls LBUG(); the 0 result only matters if LBUG() returns.
 */
static int llog_lvfs_create(struct llog_ctxt *ctxt,
                            struct llog_handle **res,
                            struct llog_logid *logid,
                            char *name)
{
        LBUG();

        return 0;
}
973
/*
 * Non-kernel build: closing a log is not implemented.
 * Calls LBUG(); the 0 result only matters if LBUG() returns.
 */
static int llog_lvfs_close(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
979
/*
 * Non-kernel build: destroying a log is not implemented.
 * Calls LBUG(); the 0 result only matters if LBUG() returns.
 */
static int llog_lvfs_destroy(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
985
/*
 * Non-kernel build: reading the catalog list is not implemented.
 * Calls LBUG(); the 0 result only matters if LBUG() returns.
 */
int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops,
                      char *name,
                      int count,
                      struct llog_catid *idarray)
{
        LBUG();

        return 0;
}
993
/*
 * Non-kernel build: writing the catalog list is not implemented.
 * Calls LBUG(); the 0 result only matters if LBUG() returns.
 */
int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops,
                      char *name,
                      int count,
                      struct llog_catid *idarray)
{
        LBUG();

        return 0;
}
1001
/*
 * Non-kernel build: reading the previous log block is not implemented.
 * Calls LBUG(); the 0 result only matters if LBUG() returns.
 */
int llog_lvfs_prev_block(struct llog_handle *loghandle,
                         int prev_idx,
                         void *buf,
                         int len)
{
        LBUG();

        return 0;
}
1008
1009 int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
1010                          int next_idx, __u64 *offset, void *buf, int len)
1011 {
1012         LBUG();
1013         return 0;
1014 }
1015
1016 struct llog_operations llog_lvfs_ops = {
1017         lop_create:      llog_lvfs_create,
1018         lop_destroy:     llog_lvfs_destroy,
1019         lop_close:       llog_lvfs_close,
1020         lop_read_header: llog_lvfs_read_header,
1021         lop_write_rec:   llog_lvfs_write_rec,
1022         lop_next_block:  llog_lvfs_next_block,
1023         lop_prev_block:  llog_lvfs_prev_block,
1024 };
1025 #endif