Whamcloud - gitweb
- unland b_fid to HEAD
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lvfs.h>
42 #include <linux/lustre_fsfilt.h>
43 #include <linux/lustre_log.h>
44
45 #ifdef __KERNEL__
46
47 static int llog_lvfs_pad(struct llog_ctxt *ctxt, struct l_file *file,
48                          int len, int index)
49 {
50         struct llog_rec_hdr rec;
51         struct llog_rec_tail tail;
52         int rc;
53         ENTRY;
54
55         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
56
57         tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
58         tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
59         rec.lrh_type = 0;
60
61         rc = llog_fsfilt_write_record(ctxt, file, &rec, sizeof(rec),
62                                       &file->f_pos, 0);
63         if (rc) {
64                 CERROR("error writing padding record: rc %d\n", rc);
65                 goto out;
66         }
67
68         file->f_pos += len - sizeof(rec) - sizeof(tail);
69         rc = llog_fsfilt_write_record(ctxt, file, &tail, sizeof(tail),
70                                       &file->f_pos, 0);
71         if (rc) {
72                 CERROR("error writing padding record: rc %d\n", rc);
73                 goto out;
74         }
75
76  out:
77         RETURN(rc);
78 }
79
80 static int llog_lvfs_write_blob(struct llog_ctxt *ctxt, struct l_file *file,
81                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
82 {
83         int rc;
84         struct llog_rec_tail end;
85         loff_t saved_off = file->f_pos;
86         int buflen = le32_to_cpu(rec->lrh_len);
87
88         ENTRY;
89         file->f_pos = off;
90
91         if (!buf) {
92                 rc = llog_fsfilt_write_record(ctxt, file, rec, buflen,
93                                               &file->f_pos, 0);
94                 if (rc) {
95                         CERROR("error writing log record: rc %d\n", rc);
96                         goto out;
97                 }
98                 GOTO(out, rc = 0);
99         }
100
101         /* the buf case */
102         rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
103         rc = llog_fsfilt_write_record(ctxt, file, rec, sizeof(*rec),
104                                       &file->f_pos, 0);
105         if (rc) {
106                 CERROR("error writing log hdr: rc %d\n", rc);
107                 goto out;
108         }
109
110         rc = llog_fsfilt_write_record(ctxt, file, buf, buflen,
111                                       &file->f_pos, 0);
112         if (rc) {
113                 CERROR("error writing log buffer: rc %d\n", rc);
114                 goto out;
115         }
116
117         end.lrt_len = rec->lrh_len;
118         end.lrt_index = rec->lrh_index;
119         rc = llog_fsfilt_write_record(ctxt, file, &end, sizeof(end),
120                                       &file->f_pos, 0);
121         if (rc) {
122                 CERROR("error writing log tail: rc %d\n", rc);
123                 goto out;
124         }
125
126         rc = 0;
127  out:
128         if (saved_off > file->f_pos)
129                 file->f_pos = saved_off;
130         LASSERT(rc <= 0);
131         RETURN(rc);
132 }
133
134 static int llog_lvfs_read_blob(struct llog_ctxt *ctxt, struct l_file *file,
135                                void *buf, int size, loff_t off)
136 {
137         loff_t offset = off;
138         int rc;
139         ENTRY;
140
141         rc = llog_fsfilt_read_record(ctxt, file, buf, size, &offset);
142         if (rc) {
143                 CERROR("error reading log record: rc %d\n", rc);
144                 RETURN(rc);
145         }
146         RETURN(0);
147 }
148
149 static int llog_lvfs_read_header(struct llog_handle *handle)
150 {
151         struct llog_ctxt *ctxt = handle->lgh_ctxt;
152         int rc;
153         ENTRY;
154
155         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
156         LASSERT(ctxt != NULL);
157
158         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
159                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
160                 RETURN(LLOG_EEMPTY);
161         }
162
163         rc = llog_lvfs_read_blob(ctxt, handle->lgh_file, handle->lgh_hdr,
164                                  LLOG_CHUNK_SIZE, 0);
165         if (rc)
166                 CERROR("error reading log header\n");
167
168         handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
169         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
170
171         RETURN(rc);
172 }
173
174 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
175 /* appends if idx == -1, otherwise overwrites record idx. */
176 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
177                                struct llog_rec_hdr *rec,
178                                struct llog_cookie *reccookie,
179                                int cookiecount,
180                                void *buf, int idx)
181 {
182         struct llog_log_hdr *llh;
183         int reclen = le32_to_cpu(rec->lrh_len), index, rc;
184         struct llog_rec_tail *lrt;
185         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
186         struct file *file;
187         loff_t offset;
188         size_t left;
189         ENTRY;
190
191         llh = loghandle->lgh_hdr;
192         file = loghandle->lgh_file;
193
194         /* record length should not bigger than LLOG_CHUNK_SIZE */
195         if (buf)
196                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
197                       - sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
198         else
199                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
200         if (rc)
201                 RETURN(rc);
202
203         if (idx != -1) {
204                 loff_t saved_offset;
205
206                 /* no header: only allowed to insert record 1 */
207                 if (idx > 1 && !file->f_dentry->d_inode->i_size) {
208                         CERROR("idx != -1 in empty log\n");
209                         LBUG();
210                 }
211
212                 if (idx && llh->llh_size && llh->llh_size != reclen)
213                         RETURN(-EINVAL);
214
215                 rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
216                 /* we are done if we only write the header or on error */
217                 if (rc || idx == 0)
218                         RETURN(rc);
219
220                 saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
221                 rc = llog_lvfs_write_blob(ctxt, file, rec, buf, saved_offset);
222                 if (rc == 0 && reccookie) {
223                         reccookie->lgc_lgl = loghandle->lgh_id;
224                         reccookie->lgc_index = idx;
225                         rc = 1;
226                 }
227                 RETURN(rc);
228         }
229
230         /* Make sure that records don't cross a chunk boundary, so we can
231          * process them page-at-a-time if needed.  If it will cross a chunk
232          * boundary, write in a fake (but referenced) entry to pad the chunk.
233          *
234          * We know that llog_current_log() will return a loghandle that is
235          * big enough to hold reclen, so all we care about is padding here.
236          */
237         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
238         if (buf)
239                 reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
240                          sizeof(struct llog_rec_tail);
241
242         /* NOTE: padding is a record, but no bit is set */
243         if (left != 0 && left != reclen &&
244             left < (reclen + LLOG_MIN_REC_SIZE)) {
245                 loghandle->lgh_last_idx++;
246                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
247                 if (rc)
248                         RETURN(rc);
249                 /* if it's the last idx in log file, then return -ENOSPC */
250                 if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
251                         RETURN(-ENOSPC);
252         }
253
254         loghandle->lgh_last_idx++;
255         index = loghandle->lgh_last_idx;
256         LASSERT(index < LLOG_BITMAP_SIZE(llh));
257         rec->lrh_index = cpu_to_le32(index);
258         if (buf == NULL) {
259                 lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
260                 lrt->lrt_len = rec->lrh_len;
261                 lrt->lrt_index = rec->lrh_index;
262         }
263         if (ext2_set_bit(index, llh->llh_bitmap)) {
264                 CERROR("argh, index %u already set in log bitmap?\n", index);
265                 LBUG(); /* should never happen */
266         }
267         llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
268         llh->llh_tail.lrt_index = cpu_to_le32(index);
269
270         offset = 0;
271         rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
272         if (rc)
273                 RETURN(rc);
274
275         CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n",
276                loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len),
277                file->f_pos);
278
279         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
280         if (rc)
281                 RETURN(rc);
282
283         if (rc == 0 && reccookie) {
284                 if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
285                         LASSERTF(EQ_LOGID(reccookie->lgc_lgl,loghandle->lgh_id),
286                                  "lgc_lgl.oid/gr "LPU64"/"LPU64" lgh_id.oid/gr"
287                                  LPU64"/"LPU64"\n",
288                                  reccookie->lgc_lgl.lgl_oid,
289                                  reccookie->lgc_lgl.lgl_ogr,
290                                  loghandle->lgh_id.lgl_oid,
291                                  loghandle->lgh_id.lgl_oid);
292                         LASSERTF(reccookie->lgc_index == index,
293                                  "lgc_index %u != index %u\n",
294                                  reccookie->lgc_index, index);
295                 } else {
296                         reccookie->lgc_lgl = loghandle->lgh_id;
297                         reccookie->lgc_index = index;
298                         llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
299                 }
300
301                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
302                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
303                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
304                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
305                 else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
306                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
307                 else
308                         reccookie->lgc_subsys = -1;
309                 rc = 1;
310         }
311         if (rc == 0 && (le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC ||
312             le32_to_cpu(rec->lrh_type) == SMFS_UPDATE_REC))
313                 rc = 1;
314
315         RETURN(rc);
316 }
317
318 /* We can skip reading at least as many log blocks as the number of
319 * minimum sized log records we are skipping.  If it turns out
320 * that we are not far enough along the log (because the
321 * actual records are larger than minimum size) we just skip
322 * some more records. */
323
324 static void llog_skip_over(__u64 *off, int curr, int goal)
325 {
326         if (goal <= curr)
327                 return;
328         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
329                 ~(LLOG_CHUNK_SIZE - 1);
330 }
331
332 /* sets:
333  *  - curr_offset to the furthest point read in the log file
334  *  - curr_idx to the log index preceeding curr_offset
335  * returns -EIO/-EINVAL on error
336  */
337 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
338                                 int next_idx, __u64 *curr_offset, void *buf,
339                                 int len)
340 {
341         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
342         ENTRY;
343
344         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
345                 RETURN(-EINVAL);
346
347         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
348                next_idx, *curr_idx, *curr_offset);
349
350         while (*curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
351                 struct llog_rec_hdr *rec;
352                 struct llog_rec_tail *tail;
353                 loff_t ppos;
354                 int nbytes, rc;
355
356                 llog_skip_over(curr_offset, *curr_idx, next_idx);
357
358                 ppos = *curr_offset;
359                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
360                                              buf, len, &ppos);
361
362                 if (rc) {
363                         CERROR("Cant read llog block at log id "LPU64
364                                "/%u offset "LPU64"\n",
365                                loghandle->lgh_id.lgl_oid,
366                                loghandle->lgh_id.lgl_ogen,
367                                *curr_offset);
368                         RETURN(rc);
369                 }
370
371                 nbytes = ppos - *curr_offset;
372                 *curr_offset = ppos;
373
374                 if (nbytes == 0) /* end of file, nothing to do */
375                         RETURN(0);
376
377                 if (nbytes < sizeof(*tail)) {
378                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
379                                LPU64"\n", loghandle->lgh_id.lgl_oid,
380                                loghandle->lgh_id.lgl_ogen, *curr_offset);
381                         RETURN(-EINVAL);
382                 }
383
384                 tail = buf + nbytes - sizeof(struct llog_rec_tail);
385                 *curr_idx = le32_to_cpu(tail->lrt_index);
386
387                 /* this shouldn't happen */
388                 if (tail->lrt_index == 0) {
389                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
390                                LPU64"\n", loghandle->lgh_id.lgl_oid,
391                                loghandle->lgh_id.lgl_ogen, *curr_offset);
392                         RETURN(-EINVAL);
393                 }
394                 if (le32_to_cpu(tail->lrt_index) < next_idx)
395                         continue;
396
397                 /* sanity check that the start of the new buffer is no farther
398                  * than the record that we wanted.  This shouldn't happen. */
399                 rec = buf;
400                 if (le32_to_cpu(rec->lrh_index) > next_idx) {
401                         CERROR("missed desired record? %u > %u\n",
402                                le32_to_cpu(rec->lrh_index), next_idx);
403                         RETURN(-ENOENT);
404                 }
405                 RETURN(0);
406         }
407         RETURN(-EIO);
408 }
409
410 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
411                                 int prev_idx, void *buf, int len)
412 {
413         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
414         __u64 curr_offset;
415         int rc;
416         ENTRY;
417
418         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
419                 RETURN(-EINVAL);
420
421         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
422
423         curr_offset = LLOG_CHUNK_SIZE;
424         llog_skip_over(&curr_offset, 0, prev_idx);
425
426         while (curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
427                 struct llog_rec_hdr *rec;
428                 struct llog_rec_tail *tail;
429                 loff_t ppos;
430
431                 ppos = curr_offset;
432                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
433                                              buf, len, &ppos);
434
435                 if (rc) {
436                         CERROR("Cant read llog block at log id "LPU64
437                                "/%u offset "LPU64"\n",
438                                loghandle->lgh_id.lgl_oid,
439                                loghandle->lgh_id.lgl_ogen,
440                                curr_offset);
441                         RETURN(rc);
442                 }
443
444                 /* put number of bytes read into rc to make code simpler */
445                 rc = ppos - curr_offset;
446                 curr_offset = ppos;
447
448                 if (rc == 0) /* end of file, nothing to do */
449                         RETURN(0);
450
451                 if (rc < sizeof(*tail)) {
452                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
453                                LPU64"\n", loghandle->lgh_id.lgl_oid,
454                                loghandle->lgh_id.lgl_ogen, curr_offset);
455                         RETURN(-EINVAL);
456                 }
457
458                 tail = buf + rc - sizeof(struct llog_rec_tail);
459
460                 /* this shouldn't happen */
461                 if (tail->lrt_index == 0) {
462                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
463                                LPU64"\n", loghandle->lgh_id.lgl_oid,
464                                loghandle->lgh_id.lgl_ogen, curr_offset);
465                         RETURN(-EINVAL);
466                 }
467                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
468                         continue;
469
470                 /* sanity check that the start of the new buffer is no farther
471                  * than the record that we wanted.  This shouldn't happen. */
472                 rec = buf;
473                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
474                         CERROR("missed desired record? %u > %u\n",
475                                le32_to_cpu(rec->lrh_index), prev_idx);
476                         RETURN(-ENOENT);
477                 }
478                 RETURN(0);
479         }
480         RETURN(-EIO);
481 }
482
483 static struct file *llog_filp_open(char *name, int flags, int mode)
484 {
485         char *logname;
486         struct file *filp;
487         int len;
488
489         OBD_ALLOC(logname, PATH_MAX);
490         if (logname == NULL)
491                 return ERR_PTR(-ENOMEM);
492
493         len = snprintf(logname, PATH_MAX, "LOGS/%s", name);
494         if (len >= PATH_MAX - 1) {
495                 filp = ERR_PTR(-ENAMETOOLONG);
496         } else {
497                 filp = l_filp_open(logname, flags, mode);
498                 if (IS_ERR(filp)) {
499                         CERROR("logfile %s(%s): %ld\n",
500                                flags & O_CREAT ? "create" : "open", logname,
501                                PTR_ERR(filp));
502                 }
503         }
504
505         OBD_FREE(logname, PATH_MAX);
506         return filp;
507 }
508
509 /* creates object for the case when we have no obd (smfs). */
510 static struct file *
511 llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
512 {
513         struct file *filp;
514         int rc = 0;
515         ENTRY;
516
517         LASSERT(lgh_id != NULL);
518         if (lgh_id->lgl_oid) {
519                 struct dentry *dchild;
520                 char fidname[LL_FID_NAMELEN];
521                 int fidlen = 0;
522
523                 down(&ctxt->loc_objects_dir->d_inode->i_sem);
524                 fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
525                 dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
526                 if (IS_ERR(dchild)) {
527                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
528                         RETURN((struct file *)dchild);
529                 }
530                 if (dchild->d_inode == NULL) {
531                         struct dentry_params dp;
532                         struct inode *inode;
533
534                         dchild->d_fsdata = (void *) &dp;
535                         dp.p_ptr = NULL;
536                         dp.p_inum = lgh_id->lgl_oid;
537                         rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
538                                            dchild, S_IFREG, NULL);
539                         if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
540                                 dchild->d_fsdata = NULL;
541                         if (rc) {
542                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
543                                 dput(dchild);
544                                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
545                                 RETURN(ERR_PTR(rc));
546                         }
547                         inode = dchild->d_inode;
548                         LASSERT(inode->i_ino == lgh_id->lgl_oid);
549                         inode->i_generation = lgh_id->lgl_ogen;
550                         CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
551                                inode->i_ino, inode->i_generation);
552                         mark_inode_dirty(inode);
553                 }
554
555                 mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
556                 filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
557                                     O_RDWR | O_LARGEFILE);
558                 if (IS_ERR(filp)) {
559                         dput(dchild);
560                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
561                         RETURN(filp);
562                 }
563                 if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
564                         CERROR("%s is not a regular file!: mode = %o\n", fidname,
565                                filp->f_dentry->d_inode->i_mode);
566                         filp_close(filp, 0);
567                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
568                         RETURN(ERR_PTR(-ENOENT));
569                 }
570
571                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
572                 RETURN(filp);
573
574         } else {
575                 unsigned int tmpname = ll_insecure_random_int();
576                 char fidname[LL_FID_NAMELEN];
577                 struct dentry *new_child, *parent;
578                 void *handle;
579                 int err, namelen;
580
581                 sprintf(fidname, "OBJECTS/%u", tmpname);
582                 filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
583                 if (IS_ERR(filp)) {
584                         rc = PTR_ERR(filp);
585                         if (rc == -EEXIST) {
586                                 CERROR("impossible object name collision %u\n",
587                                         tmpname);
588                                 LBUG();
589                         }
590                         CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
591                         RETURN(filp);
592                 }
593
594                 namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
595                                      filp->f_dentry->d_inode->i_generation);
596                 parent = filp->f_dentry->d_parent;
597                 down(&parent->d_inode->i_sem);
598                 new_child = lookup_one_len(fidname, parent, namelen);
599                 if (IS_ERR(new_child)) {
600                         CERROR("getting neg dentry for obj rename: %d\n", rc);
601                         GOTO(out_close, rc = PTR_ERR(new_child));
602                 }
603                 if (new_child->d_inode != NULL) {
604                         CERROR("impossible non-negative obj dentry %lu:%u!\n",
605                                 filp->f_dentry->d_inode->i_ino,
606                                 filp->f_dentry->d_inode->i_generation);
607                         LBUG();
608                 }
609
610                 handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
611                 if (IS_ERR(handle))
612                         GOTO(out_dput, rc = PTR_ERR(handle));
613
614                 lock_kernel();
615                 rc = vfs_rename(parent->d_inode, filp->f_dentry,
616                                 parent->d_inode, new_child);
617                 unlock_kernel();
618                 if (rc)
619                         CERROR("error renaming new object %lu:%u: rc %d\n",
620                                 filp->f_dentry->d_inode->i_ino,
621                                 filp->f_dentry->d_inode->i_generation, rc);
622
623                 err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
624                 if (!rc)
625                         rc = err;
626
627         out_dput:
628                 dput(new_child);
629         out_close:
630                 up(&parent->d_inode->i_sem);
631                 if (rc) {
632                         filp_close(filp, 0);
633                         filp = ERR_PTR(rc);
634                 } else {
635                         /* FIXME: is this group 1 is correct? */
636                         lgh_id->lgl_ogr = 1;
637                         lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
638                         lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
639                 }
640                 RETURN(filp);
641         }
642 }
643
644 /* creates object for generic case (obd exists) */
645 static struct file *
646 llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
647 {
648         struct file *filp = NULL;
649         struct dentry *dchild;
650         struct obd_device *obd;
651         struct obdo *oa = NULL;
652         int open_flags = O_RDWR | O_LARGEFILE;
653         int rc = 0;
654         ENTRY;
655
656         obd = ctxt->loc_exp->exp_obd;
657         LASSERT(obd != NULL);
658
659         if (lgh_id->lgl_oid) {
660                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
661                                              lgh_id->lgl_ogen, lgh_id->lgl_ogr);
662                 if (IS_ERR(dchild) == -ENOENT) {
663                         OBD_ALLOC(oa, sizeof(*oa));
664                         if (!oa)
665                                 RETURN(ERR_PTR(-ENOMEM));
666
667                         oa->o_id = lgh_id->lgl_oid;
668                         oa->o_generation = lgh_id->lgl_ogen;
669                         oa->o_gr = lgh_id->lgl_ogr;
670                         oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
671                         rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
672                         if (rc) {
673                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
674                                 GOTO(out_free_oa, rc);
675                         }
676                         CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
677                                lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
678
679                         dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
680                                                      lgh_id->lgl_ogen, lgh_id->lgl_ogr);
681                 } else if (IS_ERR(dchild)) {
682                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
683                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
684                         RETURN((struct file *)dchild);
685                 }
686
687                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
688                 if (IS_ERR(filp)) {
689                         l_dput(dchild);
690                         rc = PTR_ERR(filp);
691                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
692                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
693                 }
694                 GOTO(out_free_oa, rc);
695         } else {
696                 /* this is important to work here over obd_create() as it manages 
697                   groups and we need it. Yet another reason is that mds_obd_create()
698                  is fully the same as old version of this function and this helps
699                  us to avoid code duplicating and layering violating. */
700                 OBD_ALLOC(oa, sizeof(*oa));
701                 if (!oa)
702                         RETURN(ERR_PTR(-ENOMEM));
703
704                 oa->o_gr = FILTER_GROUP_LLOG;
705                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
706                 rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
707                 if (rc)
708                         GOTO(out_free_oa, rc);
709
710                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
711                                              oa->o_generation, oa->o_gr);
712                 if (IS_ERR(dchild))
713                         GOTO(out_free_oa, rc = PTR_ERR(dchild));
714
715                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
716                                      open_flags);
717                 if (IS_ERR(filp)) {
718                         l_dput(dchild);
719                         GOTO(out_free_oa, rc = PTR_ERR(filp));
720                 }
721
722                 /* group 1 is not longer valid, we use the group which is set 
723                 by obd_create()->mds_obd_create(). */
724                 lgh_id->lgl_ogr = oa->o_gr;
725                 lgh_id->lgl_oid = oa->o_id;
726                 lgh_id->lgl_ogen = oa->o_generation;
727         }
728
729 out_free_oa:
730         if (rc)
731                 filp = ERR_PTR(rc);
732         if (oa)
733                 OBD_FREE(oa, sizeof(*oa));
734         RETURN(filp);
735 }
736
737 static struct file *
738 llog_object_create(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
739 {
740         if (ctxt->loc_alone)
741                 return llog_object_create_alone(ctxt, lgh_id);
742         else
743                 return llog_object_create_generic(ctxt, lgh_id);
744 }
745
746 static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
747                                 struct dentry *dentry)
748 {
749         struct dentry *new_child;
750         char fidname[LL_FID_NAMELEN];
751         void *handle;
752         int namelen, rc = 0, err;
753         ENTRY;
754         
755         namelen = ll_fid2str(fidname, logid.lgl_oid, logid.lgl_ogen);
756         down(&ctxt->loc_objects_dir->d_inode->i_sem);
757         new_child = lookup_one_len(fidname, ctxt->loc_objects_dir, namelen);
758         if (IS_ERR(new_child)) {
759                 CERROR("getting neg dentry for obj rename: %d\n", rc);
760                 GOTO(out, rc = PTR_ERR(new_child));
761         }
762         if (new_child->d_inode == dentry->d_inode)
763                 GOTO(out_dput, rc);
764         if (new_child->d_inode != NULL) {
765                 CERROR("impossible non-negative obj dentry "LPX64":%u!\n",
766                        logid.lgl_oid, logid.lgl_ogen);
767                 LBUG();
768         }
769         handle = llog_fsfilt_start(ctxt, ctxt->loc_objects_dir->d_inode,
770                                    FSFILT_OP_LINK, NULL);
771         if (IS_ERR(handle))
772                 GOTO(out_dput, rc = PTR_ERR(handle));
773         
774         lock_kernel();
775         rc = vfs_link(dentry, ctxt->loc_objects_dir->d_inode, new_child);
776         unlock_kernel();
777         if (rc) {
778                 CERROR("error link new object "LPX64":%08x: rc %d\n",
779                        logid.lgl_oid, logid.lgl_ogen, rc);
780                 /* it doesn't make much sense to get -EEXIST here */
781                 LASSERTF(rc != -EEXIST, "bug 3490: dentry: %p "
782                          "dir->d_ionode %p new_child: %p  \n",
783                          dentry, ctxt->loc_objects_dir->d_inode, new_child);
784         }
785         err = llog_fsfilt_commit(ctxt, ctxt->loc_objects_dir->d_inode, handle, 0);
786 out_dput:
787         l_dput(new_child);
788 out:
789         up(&ctxt->loc_objects_dir->d_inode->i_sem);
790         RETURN(rc);
791 }
792
793 static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
794                           struct llog_logid *logid, char *name, int flags)
795 {
796         struct llog_handle *handle;
797         struct lvfs_run_ctxt saved;
798         int rc = 0;
799         int open_flags = O_RDWR | O_LARGEFILE;
800         ENTRY;
801
802         if (flags & OBD_LLOG_FL_CREATE)
803                 open_flags |= O_CREAT;
804
805         handle = llog_alloc_handle();
806         if (handle == NULL)
807                 RETURN(-ENOMEM);
808         *res = handle;
809         
810         LASSERT(ctxt);
811         if (ctxt->loc_lvfs_ctxt)
812                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
813         
814         if (logid != NULL) {
815                 handle->lgh_file = llog_object_create(ctxt, logid);
816                 if (IS_ERR(handle->lgh_file)) {
817                         CERROR("cannot create/open llog object "LPX64":%x "
818                                "error = %ld", logid->lgl_oid, logid->lgl_ogen,
819                                PTR_ERR(handle->lgh_file));
820                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
821                 }
822                 handle->lgh_id = *logid;
823
824         } else if (name) {
825                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
826                 if (IS_ERR(handle->lgh_file)) {
827                         CERROR("cannot open %s file, error = %ld\n", 
828                                name, PTR_ERR(handle->lgh_file));
829                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
830                 }
831                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_logs_dir);
832                 
833                 handle->lgh_id.lgl_ogr = 1;
834                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
835                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
836                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
837                 if (rc)
838                         GOTO(cleanup, rc);
839
840         } else {
841                 handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
842                 if (IS_ERR(handle->lgh_file)) {
843                         CERROR("cannot create llog object, error = %ld\n", 
844                                PTR_ERR(handle->lgh_file));
845                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
846                 }
847         }
848
849         handle->lgh_ctxt = ctxt;
850 finish:
851         if (ctxt->loc_lvfs_ctxt)
852                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
853         RETURN(rc);
854 cleanup:
855         llog_free_handle(handle);
856         goto finish;
857 }
858
859 static int llog_lvfs_close(struct llog_handle *handle)
860 {
861         int rc;
862         ENTRY;
863
864         rc = filp_close(handle->lgh_file, 0);
865         if (rc)
866                 CERROR("error closing log: rc %d\n", rc);
867         RETURN(rc);
868 }
869
870 static int llog_lvfs_destroy(struct llog_handle *loghandle)
871 {
872         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
873         struct lvfs_run_ctxt saved;
874         struct dentry *fdentry;
875         struct inode *parent_inode;
876         char fidname[LL_FID_NAMELEN];
877         void *handle;
878         int rc = -EINVAL, err, namelen;
879         ENTRY;
880         
881         if (ctxt->loc_lvfs_ctxt)
882                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
883         
884         fdentry = loghandle->lgh_file->f_dentry;
885         parent_inode = fdentry->d_parent->d_inode;
886         
887         if (!strcmp(fdentry->d_parent->d_name.name, "LOGS")) {
888                 LASSERT(parent_inode == ctxt->loc_logs_dir->d_inode);
889                 
890                 namelen = ll_fid2str(fidname, fdentry->d_inode->i_ino,
891                                      fdentry->d_inode->i_generation);
892                 dget(fdentry);
893                 rc = llog_lvfs_close(loghandle);
894                 if (rc) {
895                         dput(fdentry);
896                         GOTO(out, rc);
897                 }
898                 
899                 handle = llog_fsfilt_start(ctxt, parent_inode,
900                                            FSFILT_OP_UNLINK, NULL);
901                 if (IS_ERR(handle)) {
902                         dput(fdentry);
903                         GOTO(out, rc = PTR_ERR(handle));
904                 }
905                 
906                 down(&parent_inode->i_sem);
907                 rc = vfs_unlink(parent_inode, fdentry);
908                 up(&parent_inode->i_sem);
909                 dput(fdentry);
910                 
911                 if (!rc) {
912                         down(&ctxt->loc_objects_dir->d_inode->i_sem);
913                         fdentry = lookup_one_len(fidname, ctxt->loc_objects_dir,
914                                                  namelen);
915                         if (fdentry == NULL || fdentry->d_inode == NULL) {
916                                 CERROR("destroy non_existent object %s\n", fidname);
917                                 GOTO(out_err, rc = IS_ERR(fdentry) ?
918                                      PTR_ERR(fdentry) : -ENOENT);
919                         }
920                         rc = vfs_unlink(ctxt->loc_objects_dir->d_inode, fdentry);
921                         l_dput(fdentry);
922 out_err:
923                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
924                 }
925                 err = llog_fsfilt_commit(ctxt, parent_inode, handle, 0);
926                 if (err && !rc)
927                         err = rc;
928                 
929                 GOTO(out, rc);
930         }
931         if (ctxt->loc_alone) {
932                 if (!strcmp(fdentry->d_parent->d_name.name, "OBJECTS")) {
933                         LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
934                         
935                         dget(fdentry);
936                         rc = llog_lvfs_close(loghandle);
937                         if (rc == 0) {
938                                 down(&parent_inode->i_sem);
939                                 rc = vfs_unlink(parent_inode, fdentry);
940                                 up(&parent_inode->i_sem);
941                         }
942                         dput(fdentry);
943                 }
944         } else {
945                 struct obdo *oa = NULL;
946  
947                 OBD_ALLOC(oa, sizeof(*oa));
948                 if (!oa)
949                         GOTO(out, rc = -ENOMEM);
950                 
951                 oa->o_id = loghandle->lgh_id.lgl_oid;
952                 oa->o_gr = loghandle->lgh_id.lgl_ogr;
953                 oa->o_generation = loghandle->lgh_id.lgl_ogen;
954                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
955                 
956                 rc = llog_lvfs_close(loghandle);
957                 if (rc)
958                         GOTO(out_free_oa, rc);
959                 
960                 rc = obd_destroy(loghandle->lgh_ctxt->loc_exp, oa, NULL, NULL);
961 out_free_oa:
962                 OBD_FREE(oa, sizeof(*oa));
963         }
964 out:
965         if (ctxt->loc_lvfs_ctxt)
966                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
967         RETURN(rc);
968 }
969
970 /* reads the catalog list */
971 int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
972                       struct fsfilt_operations *fsops, const char *name,
973                       int count, struct llog_catid *idarray)
974 {
975         struct lvfs_run_ctxt saved;
976         struct l_file *file;
977         int size = sizeof(*idarray) * count;
978         loff_t off = 0;
979         int rc;
980
981         LASSERT(count);
982
983         if (ctxt)
984                 push_ctxt(&saved, ctxt, NULL);
985         file = l_filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
986         if (!file || IS_ERR(file)) {
987                 rc = PTR_ERR(file);
988                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
989                        name, rc);
990                 GOTO(out, rc);
991         }
992
993         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
994                 CERROR("%s is not a regular file!: mode = %o\n", name,
995                        file->f_dentry->d_inode->i_mode);
996                 GOTO(out, rc = -ENOENT);
997         }
998
999         rc = fsops->fs_read_record(file, idarray, size, &off);
1000         if (rc) {
1001                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
1002                        name, rc);
1003                 GOTO(out, rc);
1004         }
1005
1006  out:
1007         if (file && !IS_ERR(file))
1008                 rc = filp_close(file, 0);
1009         if (ctxt)
1010                 pop_ctxt(&saved, ctxt, NULL);
1011         RETURN(rc);
1012 }
1013 EXPORT_SYMBOL(llog_get_cat_list);
1014
1015 /* writes the cat list */
1016 int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
1017                       struct fsfilt_operations *fsops, const char *name,
1018                       int count, struct llog_catid *idarray)
1019 {
1020         struct lvfs_run_ctxt saved;
1021         struct l_file *file;
1022         int size = sizeof(*idarray) * count;
1023         loff_t off = 0;
1024         int rc;
1025
1026         LASSERT(count);
1027
1028         if (ctxt)
1029                 push_ctxt(&saved, ctxt, NULL);
1030         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
1031         if (!file || IS_ERR(file)) {
1032                 rc = PTR_ERR(file);
1033                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
1034                        name, rc);
1035                 GOTO(out, rc);
1036         }
1037
1038         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
1039                 CERROR("%s is not a regular file!: mode = %o\n", name,
1040                        file->f_dentry->d_inode->i_mode);
1041                 GOTO(out, rc = -ENOENT);
1042         }
1043
1044         rc = fsops->fs_write_record(file, idarray, size, &off, 1);
1045         if (rc) {
1046                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
1047                        name, rc);
1048                 GOTO(out, rc);
1049         }
1050
1051  out:
1052         if (file && !IS_ERR(file))
1053                 rc = filp_close(file, 0);
1054         if (ctxt)
1055                 pop_ctxt(&saved, ctxt, NULL);
1056         RETURN(rc);
1057 }
1058 EXPORT_SYMBOL(llog_put_cat_list);
1059
1060 struct llog_operations llog_lvfs_ops = {
1061         lop_open:        llog_lvfs_open,
1062         lop_destroy:     llog_lvfs_destroy,
1063         lop_close:       llog_lvfs_close,
1064         lop_read_header: llog_lvfs_read_header,
1065         lop_write_rec:   llog_lvfs_write_rec,
1066         lop_next_block:  llog_lvfs_next_block,
1067         lop_prev_block:  llog_lvfs_prev_block,
1068 };
1069 EXPORT_SYMBOL(llog_lvfs_ops);
1070
1071 #else /* !__KERNEL__ */
1072
/* !__KERNEL__ stub: no implementation here; hits LBUG() and yields 0. */
static int llog_lvfs_read_header(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1078
/* !__KERNEL__ stub: no implementation here; hits LBUG() and yields 0. */
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie, int cookiecount,
                               void *buf, int idx)
{
        int rc = 0;

        LBUG();
        return rc;
}
1087
/* !__KERNEL__ stub: no implementation here; hits LBUG() and yields 0. */
static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
                          struct llog_logid *logid, char *name, int flags)
{
        int rc = 0;

        LBUG();
        return rc;
}
1094
/* !__KERNEL__ stub: no implementation here; hits LBUG() and yields 0. */
static int llog_lvfs_close(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1100
/* !__KERNEL__ stub: no implementation here; hits LBUG() and yields 0. */
static int llog_lvfs_destroy(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1106
/* !__KERNEL__ stub: the catalog list is not read here; LBUG(), then 0. */
int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops, const char *name,
                      int count, struct llog_catid *idarray)
{
        int rc = 0;

        LBUG();
        return rc;
}
1114
/* !__KERNEL__ stub: the catalog list is not written here; LBUG(), then 0. */
int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops, const char *name,
                      int count, struct llog_catid *idarray)
{
        int rc = 0;

        LBUG();
        return rc;
}
1122
/* !__KERNEL__ stub: no implementation here; hits LBUG() and yields 0. */
int llog_lvfs_prev_block(struct llog_handle *loghandle,
                         int prev_idx, void *buf, int len)
{
        int rc = 0;

        LBUG();
        return rc;
}
1129
1130 int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
1131                          int next_idx, __u64 *offset, void *buf, int len)
1132 {
1133         LBUG();
1134         return 0;
1135 }
1136
1137 struct llog_operations llog_lvfs_ops = {
1138         lop_open:        llog_lvfs_open,
1139         lop_destroy:     llog_lvfs_destroy,
1140         lop_close:       llog_lvfs_close,
1141         lop_read_header: llog_lvfs_read_header,
1142         lop_write_rec:   llog_lvfs_write_rec,
1143         lop_next_block:  llog_lvfs_next_block,
1144         lop_prev_block:  llog_lvfs_prev_block,
1145 };
1146 #endif