Whamcloud - gitweb
b=3825
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lvfs.h>
42 #include <linux/lustre_fsfilt.h>
43 #include <linux/lustre_log.h>
44
45 #ifdef __KERNEL__
46
47 static int llog_lvfs_pad(struct llog_ctxt *ctxt, struct l_file *file,
48                          int len, int index)
49 {
50         struct llog_rec_hdr rec;
51         struct llog_rec_tail tail;
52         int rc;
53         ENTRY;
54
55         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
56
57         tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
58         tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
59         rec.lrh_type = 0;
60
61         rc = llog_fsfilt_write_record(ctxt, file, &rec, sizeof(rec),
62                                       &file->f_pos, 0);
63         if (rc) {
64                 CERROR("error writing padding record: rc %d\n", rc);
65                 goto out;
66         }
67
68         file->f_pos += len - sizeof(rec) - sizeof(tail);
69         rc = llog_fsfilt_write_record(ctxt, file, &tail, sizeof(tail),
70                                       &file->f_pos, 0);
71         if (rc) {
72                 CERROR("error writing padding record: rc %d\n", rc);
73                 goto out;
74         }
75
76  out:
77         RETURN(rc);
78 }
79
80 static int llog_lvfs_write_blob(struct llog_ctxt *ctxt, struct l_file *file,
81                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
82 {
83         int rc;
84         struct llog_rec_tail end;
85         loff_t saved_off = file->f_pos;
86         int buflen = le32_to_cpu(rec->lrh_len);
87
88         ENTRY;
89         file->f_pos = off;
90
91         if (!buf) {
92                 rc = llog_fsfilt_write_record(ctxt, file, rec, buflen,
93                                               &file->f_pos, 0);
94                 if (rc) {
95                         CERROR("error writing log record: rc %d\n", rc);
96                         goto out;
97                 }
98                 GOTO(out, rc = 0);
99         }
100
101         /* the buf case */
102         rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
103         rc = llog_fsfilt_write_record(ctxt, file, rec, sizeof(*rec),
104                                       &file->f_pos, 0);
105         if (rc) {
106                 CERROR("error writing log hdr: rc %d\n", rc);
107                 goto out;
108         }
109
110         rc = llog_fsfilt_write_record(ctxt, file, buf, buflen,
111                                       &file->f_pos, 0);
112         if (rc) {
113                 CERROR("error writing log buffer: rc %d\n", rc);
114                 goto out;
115         }
116
117         end.lrt_len = rec->lrh_len;
118         end.lrt_index = rec->lrh_index;
119         rc = llog_fsfilt_write_record(ctxt, file, &end, sizeof(end),
120                                       &file->f_pos, 0);
121         if (rc) {
122                 CERROR("error writing log tail: rc %d\n", rc);
123                 goto out;
124         }
125
126         rc = 0;
127  out:
128         if (saved_off > file->f_pos)
129                 file->f_pos = saved_off;
130         LASSERT(rc <= 0);
131         RETURN(rc);
132 }
133
134 static int llog_lvfs_read_blob(struct llog_ctxt *ctxt, struct l_file *file,
135                                void *buf, int size, loff_t off)
136 {
137         loff_t offset = off;
138         int rc;
139         ENTRY;
140
141         rc = llog_fsfilt_read_record(ctxt, file, buf, size, &offset);
142         if (rc) {
143                 CERROR("error reading log record: rc %d\n", rc);
144                 RETURN(rc);
145         }
146         RETURN(0);
147 }
148
149 static int llog_lvfs_read_header(struct llog_handle *handle)
150 {
151         struct llog_ctxt *ctxt = handle->lgh_ctxt;
152         int rc;
153         ENTRY;
154
155         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
156         LASSERT(ctxt != NULL);
157
158         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
159                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
160                 RETURN(LLOG_EEMPTY);
161         }
162
163         rc = llog_lvfs_read_blob(ctxt, handle->lgh_file, handle->lgh_hdr,
164                                  LLOG_CHUNK_SIZE, 0);
165         if (rc)
166                 CERROR("error reading log header\n");
167
168         handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
169         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
170
171         RETURN(rc);
172 }
173
174 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
175 /* appends if idx == -1, otherwise overwrites record idx. */
176 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
177                                struct llog_rec_hdr *rec,
178                                struct llog_cookie *reccookie,
179                                int cookiecount,
180                                void *buf, int idx)
181 {
182         struct llog_log_hdr *llh;
183         int reclen = le32_to_cpu(rec->lrh_len), index, rc;
184         struct llog_rec_tail *lrt;
185         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
186         struct file *file;
187         loff_t offset;
188         size_t left;
189         ENTRY;
190
191         llh = loghandle->lgh_hdr;
192         file = loghandle->lgh_file;
193
194         /* record length should not bigger than LLOG_CHUNK_SIZE */
195         if (buf)
196                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
197                       - sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
198         else
199                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
200         if (rc)
201                 RETURN(rc);
202
203         if (idx != -1) {
204                 loff_t saved_offset;
205
206                 /* no header: only allowed to insert record 1 */
207                 if (idx > 1 && !file->f_dentry->d_inode->i_size) {
208                         CERROR("idx != -1 in empty log\n");
209                         LBUG();
210                 }
211
212                 if (idx && llh->llh_size && llh->llh_size != reclen)
213                         RETURN(-EINVAL);
214
215                 rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
216                 /* we are done if we only write the header or on error */
217                 if (rc || idx == 0)
218                         RETURN(rc);
219
220                 saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
221                 rc = llog_lvfs_write_blob(ctxt, file, rec, buf, saved_offset);
222                 if (rc == 0 && reccookie) {
223                         reccookie->lgc_lgl = loghandle->lgh_id;
224                         reccookie->lgc_index = idx;
225                         rc = 1;
226                 }
227                 RETURN(rc);
228         }
229
230         /* Make sure that records don't cross a chunk boundary, so we can
231          * process them page-at-a-time if needed.  If it will cross a chunk
232          * boundary, write in a fake (but referenced) entry to pad the chunk.
233          *
234          * We know that llog_current_log() will return a loghandle that is
235          * big enough to hold reclen, so all we care about is padding here.
236          */
237         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
238         if (buf)
239                 reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
240                          sizeof(struct llog_rec_tail);
241
242         /* NOTE: padding is a record, but no bit is set */
243         if (left != 0 && left != reclen &&
244             left < (reclen + LLOG_MIN_REC_SIZE)) {
245                 loghandle->lgh_last_idx++;
246                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
247                 if (rc)
248                         RETURN(rc);
249                 /* if it's the last idx in log file, then return -ENOSPC */
250                 if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
251                         RETURN(-ENOSPC);
252         }
253
254         loghandle->lgh_last_idx++;
255         index = loghandle->lgh_last_idx;
256         rec->lrh_index = cpu_to_le32(index);
257         if (buf == NULL) {
258                 lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
259                 lrt->lrt_len = rec->lrh_len;
260                 lrt->lrt_index = rec->lrh_index;
261         }
262         if (ext2_set_bit(index, llh->llh_bitmap)) {
263                 CERROR("argh, index %u already set in log bitmap?\n", index);
264                 LBUG(); /* should never happen */
265         }
266         llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
267         llh->llh_tail.lrt_index = cpu_to_le32(index);
268
269         offset = 0;
270         rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
271         if (rc)
272                 RETURN(rc);
273
274         CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n",
275                loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len),
276                file->f_pos);
277
278         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
279         if (rc)
280                 RETURN(rc);
281
282         if (rc == 0 && reccookie) {
283                 if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
284                         LASSERT(EQ_LOGID(reccookie->lgc_lgl, loghandle->lgh_id));
285                         LASSERT(reccookie->lgc_index == index);        
286                 } else {
287                         reccookie->lgc_lgl = loghandle->lgh_id;
288                         reccookie->lgc_index = index;
289                         llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
290                 }
291
292                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
293                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
294                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
295                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
296                 else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
297                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
298                 else
299                         reccookie->lgc_subsys = -1;
300                 rc = 1;
301         }
302         if (rc == 0 && (le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC ||
303             le32_to_cpu(rec->lrh_type) == SMFS_UPDATE_REC))
304                 rc = 1;
305
306         RETURN(rc);
307 }
308
309 /* We can skip reading at least as many log blocks as the number of
310 * minimum sized log records we are skipping.  If it turns out
311 * that we are not far enough along the log (because the
312 * actual records are larger than minimum size) we just skip
313 * some more records. */
314
315 static void llog_skip_over(__u64 *off, int curr, int goal)
316 {
317         if (goal <= curr)
318                 return;
319         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
320                 ~(LLOG_CHUNK_SIZE - 1);
321 }
322
323 /* sets:
324  *  - curr_offset to the furthest point read in the log file
325  *  - curr_idx to the log index preceeding curr_offset
326  * returns -EIO/-EINVAL on error
327  */
328 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
329                                 int next_idx, __u64 *curr_offset, void *buf,
330                                 int len)
331 {
332         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
333         ENTRY;
334
335         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
336                 RETURN(-EINVAL);
337
338         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
339                next_idx, *curr_idx, *curr_offset);
340
341         while (*curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
342                 struct llog_rec_hdr *rec;
343                 struct llog_rec_tail *tail;
344                 loff_t ppos;
345                 int nbytes, rc;
346
347                 llog_skip_over(curr_offset, *curr_idx, next_idx);
348
349                 ppos = *curr_offset;
350                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
351                                              buf, len, &ppos);
352
353                 if (rc) {
354                         CERROR("Cant read llog block at log id "LPU64
355                                "/%u offset "LPU64"\n",
356                                loghandle->lgh_id.lgl_oid,
357                                loghandle->lgh_id.lgl_ogen,
358                                *curr_offset);
359                         RETURN(rc);
360                 }
361
362                 nbytes = ppos - *curr_offset;
363                 *curr_offset = ppos;
364
365                 if (nbytes == 0) /* end of file, nothing to do */
366                         RETURN(0);
367
368                 if (nbytes < sizeof(*tail)) {
369                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
370                                LPU64"\n", loghandle->lgh_id.lgl_oid,
371                                loghandle->lgh_id.lgl_ogen, *curr_offset);
372                         RETURN(-EINVAL);
373                 }
374
375                 tail = buf + nbytes - sizeof(struct llog_rec_tail);
376                 *curr_idx = le32_to_cpu(tail->lrt_index);
377
378                 /* this shouldn't happen */
379                 if (tail->lrt_index == 0) {
380                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
381                                LPU64"\n", loghandle->lgh_id.lgl_oid,
382                                loghandle->lgh_id.lgl_ogen, *curr_offset);
383                         RETURN(-EINVAL);
384                 }
385                 if (le32_to_cpu(tail->lrt_index) < next_idx)
386                         continue;
387
388                 /* sanity check that the start of the new buffer is no farther
389                  * than the record that we wanted.  This shouldn't happen. */
390                 rec = buf;
391                 if (le32_to_cpu(rec->lrh_index) > next_idx) {
392                         CERROR("missed desired record? %u > %u\n",
393                                le32_to_cpu(rec->lrh_index), next_idx);
394                         RETURN(-ENOENT);
395                 }
396                 RETURN(0);
397         }
398         RETURN(-EIO);
399 }
400
401 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
402                                 int prev_idx, void *buf, int len)
403 {
404         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
405         __u64 curr_offset;
406         int rc;
407         ENTRY;
408
409         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
410                 RETURN(-EINVAL);
411
412         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
413
414         curr_offset = LLOG_CHUNK_SIZE;
415         llog_skip_over(&curr_offset, 0, prev_idx);
416
417         while (curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
418                 struct llog_rec_hdr *rec;
419                 struct llog_rec_tail *tail;
420                 loff_t ppos;
421
422                 ppos = curr_offset;
423                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
424                                              buf, len, &ppos);
425
426                 if (rc) {
427                         CERROR("Cant read llog block at log id "LPU64
428                                "/%u offset "LPU64"\n",
429                                loghandle->lgh_id.lgl_oid,
430                                loghandle->lgh_id.lgl_ogen,
431                                curr_offset);
432                         RETURN(rc);
433                 }
434
435                 /* put number of bytes read into rc to make code simpler */
436                 rc = ppos - curr_offset;
437                 curr_offset = ppos;
438
439                 if (rc == 0) /* end of file, nothing to do */
440                         RETURN(0);
441
442                 if (rc < sizeof(*tail)) {
443                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
444                                LPU64"\n", loghandle->lgh_id.lgl_oid,
445                                loghandle->lgh_id.lgl_ogen, curr_offset);
446                         RETURN(-EINVAL);
447                 }
448
449                 tail = buf + rc - sizeof(struct llog_rec_tail);
450
451                 /* this shouldn't happen */
452                 if (tail->lrt_index == 0) {
453                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
454                                LPU64"\n", loghandle->lgh_id.lgl_oid,
455                                loghandle->lgh_id.lgl_ogen, curr_offset);
456                         RETURN(-EINVAL);
457                 }
458                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
459                         continue;
460
461                 /* sanity check that the start of the new buffer is no farther
462                  * than the record that we wanted.  This shouldn't happen. */
463                 rec = buf;
464                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
465                         CERROR("missed desired record? %u > %u\n",
466                                le32_to_cpu(rec->lrh_index), prev_idx);
467                         RETURN(-ENOENT);
468                 }
469                 RETURN(0);
470         }
471         RETURN(-EIO);
472 }
473
474 static struct file *llog_filp_open(char *name, int flags, int mode)
475 {
476         char *logname;
477         struct file *filp;
478         int len;
479
480         OBD_ALLOC(logname, PATH_MAX);
481         if (logname == NULL)
482                 return ERR_PTR(-ENOMEM);
483
484         len = snprintf(logname, PATH_MAX, "LOGS/%s", name);
485         if (len >= PATH_MAX - 1) {
486                 filp = ERR_PTR(-ENAMETOOLONG);
487         } else {
488                 filp = l_filp_open(logname, flags, mode);
489                 if (IS_ERR(filp)) {
490                         CERROR("logfile %s(%s): %ld\n",
491                                flags & O_CREAT ? "create" : "open", logname,
492                                PTR_ERR(filp));
493                 }
494         }
495
496         OBD_FREE(logname, PATH_MAX);
497         return filp;
498 }
499
500 /* creates object for the case when we have no obd (smfs). */
501 static struct file *
502 llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
503 {
504         struct file *filp;
505         int rc = 0;
506         ENTRY;
507
508         LASSERT(lgh_id != NULL);
509         if (lgh_id->lgl_oid) {
510                 struct dentry *dchild;
511                 char fidname[LL_FID_NAMELEN];
512                 int fidlen = 0;
513
514                 down(&ctxt->loc_objects_dir->d_inode->i_sem);
515                 fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
516                 dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
517                 if (IS_ERR(dchild)) {
518                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
519                         RETURN((struct file *)dchild);
520                 }
521                 if (dchild->d_inode == NULL) {
522                         struct dentry_params dp;
523                         struct inode *inode;
524
525                         dchild->d_fsdata = (void *) &dp;
526                         dp.p_ptr = NULL;
527                         dp.p_inum = lgh_id->lgl_oid;
528                         rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
529                                            dchild, S_IFREG, NULL);
530                         if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
531                                 dchild->d_fsdata = NULL;
532                         if (rc) {
533                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
534                                 dput(dchild);
535                                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
536                                 RETURN(ERR_PTR(rc));
537                         }
538                         inode = dchild->d_inode;
539                         LASSERT(inode->i_ino == lgh_id->lgl_oid);
540                         inode->i_generation = lgh_id->lgl_ogen;
541                         CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
542                                inode->i_ino, inode->i_generation);
543                         mark_inode_dirty(inode);
544                 }
545
546                 mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
547                 filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
548                                     O_RDWR | O_LARGEFILE);
549                 if (IS_ERR(filp)) {
550                         dput(dchild);
551                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
552                         RETURN(filp);
553                 }
554                 if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
555                         CERROR("%s is not a regular file!: mode = %o\n", fidname,
556                                filp->f_dentry->d_inode->i_mode);
557                         filp_close(filp, 0);
558                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
559                         RETURN(ERR_PTR(-ENOENT));
560                 }
561
562                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
563                 RETURN(filp);
564
565         } else {
566                 unsigned int tmpname = ll_insecure_random_int();
567                 char fidname[LL_FID_NAMELEN];
568                 struct dentry *new_child, *parent;
569                 void *handle;
570                 int err, namelen;
571
572                 sprintf(fidname, "OBJECTS/%u", tmpname);
573                 filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
574                 if (IS_ERR(filp)) {
575                         rc = PTR_ERR(filp);
576                         if (rc == -EEXIST) {
577                                 CERROR("impossible object name collision %u\n",
578                                         tmpname);
579                                 LBUG();
580                         }
581                         CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
582                         RETURN(filp);
583                 }
584
585                 namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
586                                      filp->f_dentry->d_inode->i_generation);
587                 parent = filp->f_dentry->d_parent;
588                 down(&parent->d_inode->i_sem);
589                 new_child = lookup_one_len(fidname, parent, namelen);
590                 if (IS_ERR(new_child)) {
591                         CERROR("getting neg dentry for obj rename: %d\n", rc);
592                         GOTO(out_close, rc = PTR_ERR(new_child));
593                 }
594                 if (new_child->d_inode != NULL) {
595                         CERROR("impossible non-negative obj dentry %lu:%u!\n",
596                                 filp->f_dentry->d_inode->i_ino,
597                                 filp->f_dentry->d_inode->i_generation);
598                         LBUG();
599                 }
600
601                 handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
602                 if (IS_ERR(handle))
603                         GOTO(out_dput, rc = PTR_ERR(handle));
604
605                 lock_kernel();
606                 rc = vfs_rename(parent->d_inode, filp->f_dentry,
607                                 parent->d_inode, new_child);
608                 unlock_kernel();
609                 if (rc)
610                         CERROR("error renaming new object %lu:%u: rc %d\n",
611                                 filp->f_dentry->d_inode->i_ino,
612                                 filp->f_dentry->d_inode->i_generation, rc);
613
614                 err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
615                 if (!rc)
616                         rc = err;
617
618         out_dput:
619                 dput(new_child);
620         out_close:
621                 up(&parent->d_inode->i_sem);
622                 if (rc) {
623                         filp_close(filp, 0);
624                         filp = ERR_PTR(rc);
625                 } else {
626                         /* FIXME: is this group 1 is correct? */
627                         lgh_id->lgl_ogr = 1;
628                         lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
629                         lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
630                 }
631                 RETURN(filp);
632         }
633 }
634
635 /* creates object for generic case (obd exists) */
636 static struct file *
637 llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
638 {
639         struct file *filp;
640         struct dentry *dchild;
641         struct obd_device *obd;
642         struct obdo *oa = NULL;
643         int open_flags = O_RDWR | O_LARGEFILE;
644         int rc = 0;
645         ENTRY;
646
647         obd = ctxt->loc_exp->exp_obd;
648         LASSERT(obd != NULL);
649
650         if (lgh_id->lgl_oid) {
651                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
652                                              lgh_id->lgl_ogen, lgh_id->lgl_ogr);
653                 if (IS_ERR(dchild) == -ENOENT) {
654                         OBD_ALLOC(oa, sizeof(*oa));
655                         if (!oa)
656                                 RETURN(ERR_PTR(-ENOMEM));
657
658                         oa->o_id = lgh_id->lgl_oid;
659                         oa->o_generation = lgh_id->lgl_ogen;
660                         oa->o_gr = lgh_id->lgl_ogr;
661                         oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
662                         rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
663                         if (rc) {
664                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
665                                 GOTO(out_free_oa, rc);
666                         }
667                         CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
668                                lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
669
670                         dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
671                                                      lgh_id->lgl_ogen, lgh_id->lgl_ogr);
672                 } else if (IS_ERR(dchild)) {
673                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
674                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
675                         RETURN((struct file *)dchild);
676                 }
677
678                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
679                 if (IS_ERR(filp)) {
680                         l_dput(dchild);
681                         rc = PTR_ERR(filp);
682                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
683                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
684                 }
685                 GOTO(out_free_oa, rc);
686         } else {
687                 /* this is important to work here over obd_create() as it manages 
688                   groups and we need it. Yet another reason is that mds_obd_create()
689                  is fully the same as old version of this function and this helps
690                  us to avoid code duplicating and layering violating. */
691                 OBD_ALLOC(oa, sizeof(*oa));
692                 if (!oa)
693                         RETURN(ERR_PTR(-ENOMEM));
694                 
695                 oa->o_gr = FILTER_GROUP_LLOG;
696                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
697                 rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
698                 if (rc)
699                         GOTO(out_free_oa, rc);
700
701                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
702                                              oa->o_generation, oa->o_gr);
703                 if (IS_ERR(dchild))
704                         GOTO(out_free_oa, rc = PTR_ERR(dchild));
705
706                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
707                                      open_flags);
708                 if (IS_ERR(filp)) {
709                         l_dput(dchild);
710                         GOTO(out_free_oa, rc = PTR_ERR(filp));
711                 }
712
713                 /* group 1 is not longer valid, we use the group which is set 
714                 by obd_create()->mds_obd_create(). */
715                 lgh_id->lgl_ogr = oa->o_gr;
716                 lgh_id->lgl_oid = oa->o_id;
717                 lgh_id->lgl_ogen = oa->o_generation;
718         }
719
720 out_free_oa:
721         if (rc)
722                 filp = ERR_PTR(rc);
723         if (oa)
724                 OBD_FREE(oa, sizeof(*oa));
725         RETURN(filp);
726 }
727
728 static struct file *
729 llog_object_create(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
730 {
731         if (ctxt->loc_alone)
732                 return llog_object_create_alone(ctxt, lgh_id);
733         else
734                 return llog_object_create_generic(ctxt, lgh_id);
735 }
736
737 static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
738                                 struct dentry *dentry)
739 {
740         struct dentry *new_child;
741         char fidname[LL_FID_NAMELEN];
742         void *handle;
743         int namelen, rc = 0, err;
744         ENTRY;
745         
746         namelen = ll_fid2str(fidname, logid.lgl_oid, logid.lgl_ogen);
747         down(&ctxt->loc_objects_dir->d_inode->i_sem);
748         new_child = lookup_one_len(fidname, ctxt->loc_objects_dir, namelen);
749         if (IS_ERR(new_child)) {
750                 CERROR("getting neg dentry for obj rename: %d\n", rc);
751                 GOTO(out, rc = PTR_ERR(new_child));
752         }
753         if (new_child->d_inode == dentry->d_inode)
754                 GOTO(out_dput, rc);
755         if (new_child->d_inode != NULL) {
756                 CERROR("impossible non-negative obj dentry "LPX64":%u!\n",
757                        logid.lgl_oid, logid.lgl_ogen);
758                 LBUG();
759         }
760         handle = llog_fsfilt_start(ctxt, ctxt->loc_objects_dir->d_inode,
761                                    FSFILT_OP_LINK, NULL);
762         if (IS_ERR(handle))
763                 GOTO(out_dput, rc = PTR_ERR(handle));
764         
765         lock_kernel();
766         rc = vfs_link(dentry, ctxt->loc_objects_dir->d_inode, new_child);
767         unlock_kernel();
768         if (rc)
769                 CERROR("error link new object "LPX64":%u: rc %d\n",
770                        logid.lgl_oid, logid.lgl_ogen, rc);
771         err = llog_fsfilt_commit(ctxt, ctxt->loc_objects_dir->d_inode, handle, 0);
772 out_dput:
773         l_dput(new_child);
774 out:
775         up(&ctxt->loc_objects_dir->d_inode->i_sem);
776         RETURN(rc);
777 }
778
779 static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
780                           struct llog_logid *logid, char *name, int flags)
781 {
782         struct llog_handle *handle;
783         struct lvfs_run_ctxt saved;
784         int rc = 0;
785         int open_flags = O_RDWR | O_LARGEFILE;
786         ENTRY;
787
788         if (flags & OBD_LLOG_FL_CREATE)
789                 open_flags |= O_CREAT;
790
791         handle = llog_alloc_handle();
792         if (handle == NULL)
793                 RETURN(-ENOMEM);
794         *res = handle;
795         
796         LASSERT(ctxt);
797         if (ctxt->loc_lvfs_ctxt)
798                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
799         
800         if (logid != NULL) {
801                 handle->lgh_file = llog_object_create(ctxt, logid);
802                 if (IS_ERR(handle->lgh_file)) {
803                         CERROR("cannot create/open llog object "LPX64":%x "
804                                "error = %ld", logid->lgl_oid, logid->lgl_ogen,
805                                PTR_ERR(handle->lgh_file));
806                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
807                 }
808                 handle->lgh_id = *logid;
809
810         } else if (name) {
811                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
812                 if (IS_ERR(handle->lgh_file)) {
813                         CERROR("cannot open %s file, error = %ld\n", 
814                                name, PTR_ERR(handle->lgh_file));
815                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
816                 }
817                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_logs_dir);
818                 
819                 handle->lgh_id.lgl_ogr = 1;
820                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
821                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
822                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
823                 if (rc)
824                         GOTO(cleanup, rc);
825
826         } else {
827                 handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
828                 if (IS_ERR(handle->lgh_file)) {
829                         CERROR("cannot create llog object, error = %ld\n", 
830                                PTR_ERR(handle->lgh_file));
831                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
832                 }
833         }
834
835         handle->lgh_ctxt = ctxt;
836 finish:
837         if (ctxt->loc_lvfs_ctxt)
838                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
839         RETURN(rc);
840 cleanup:
841         llog_free_handle(handle);
842         goto finish;
843 }
844
845 static int llog_lvfs_close(struct llog_handle *handle)
846 {
847         int rc;
848         ENTRY;
849
850         rc = filp_close(handle->lgh_file, 0);
851         if (rc)
852                 CERROR("error closing log: rc %d\n", rc);
853         RETURN(rc);
854 }
855
856 static int llog_lvfs_destroy(struct llog_handle *loghandle)
857 {
858         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
859         struct lvfs_run_ctxt saved;
860         struct dentry *fdentry;
861         struct inode *parent_inode;
862         char fidname[LL_FID_NAMELEN];
863         void *handle;
864         int rc = -EINVAL, err, namelen;
865         ENTRY;
866         
867         if (ctxt->loc_lvfs_ctxt)
868                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
869         
870         fdentry = loghandle->lgh_file->f_dentry;
871         parent_inode = fdentry->d_parent->d_inode;
872         
873         if (!strcmp(fdentry->d_parent->d_name.name, "LOGS")) {
874                 LASSERT(parent_inode == ctxt->loc_logs_dir->d_inode);
875                 
876                 namelen = ll_fid2str(fidname, fdentry->d_inode->i_ino,
877                                      fdentry->d_inode->i_generation);
878                 dget(fdentry);
879                 rc = llog_lvfs_close(loghandle);
880                 if (rc) {
881                         dput(fdentry);
882                         GOTO(out, rc);
883                 }
884                 
885                 handle = llog_fsfilt_start(ctxt, parent_inode,
886                                            FSFILT_OP_UNLINK, NULL);
887                 if (IS_ERR(handle)) {
888                         dput(fdentry);
889                         GOTO(out, rc = PTR_ERR(handle));
890                 }
891                 
892                 down(&parent_inode->i_sem);
893                 rc = vfs_unlink(parent_inode, fdentry);
894                 up(&parent_inode->i_sem);
895                 dput(fdentry);
896                 
897                 if (!rc) {
898                         down(&ctxt->loc_objects_dir->d_inode->i_sem);
899                         fdentry = lookup_one_len(fidname, ctxt->loc_objects_dir,
900                                                  namelen);
901                         if (fdentry == NULL || fdentry->d_inode == NULL) {
902                                 CERROR("destroy non_existent object %s\n", fidname);
903                                 GOTO(out_err, rc = IS_ERR(fdentry) ?
904                                      PTR_ERR(fdentry) : -ENOENT);
905                         }
906                         rc = vfs_unlink(ctxt->loc_objects_dir->d_inode, fdentry);
907                         l_dput(fdentry);
908 out_err:
909                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
910                 }
911                 err = llog_fsfilt_commit(ctxt, parent_inode, handle, 0);
912                 if (err && !rc)
913                         err = rc;
914                 
915                 GOTO(out, rc);
916         }
917         if (ctxt->loc_alone) {
918                 if (!strcmp(fdentry->d_parent->d_name.name, "OBJECTS")) {
919                         LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
920                         
921                         dget(fdentry);
922                         rc = llog_lvfs_close(loghandle);
923                         if (rc == 0) {
924                                 down(&parent_inode->i_sem);
925                                 rc = vfs_unlink(parent_inode, fdentry);
926                                 up(&parent_inode->i_sem);
927                         }
928                         dput(fdentry);
929                 }
930         } else {
931                 struct obdo *oa = NULL;
932  
933                 OBD_ALLOC(oa, sizeof(*oa));
934                 if (!oa)
935                         GOTO(out, rc = -ENOMEM);
936                 
937                 oa->o_id = loghandle->lgh_id.lgl_oid;
938                 oa->o_gr = loghandle->lgh_id.lgl_ogr;
939                 oa->o_generation = loghandle->lgh_id.lgl_ogen;
940                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
941                 
942                 rc = llog_lvfs_close(loghandle);
943                 if (rc)
944                         GOTO(out_free_oa, rc);
945                 
946                 rc = obd_destroy(loghandle->lgh_ctxt->loc_exp, oa, NULL, NULL);
947 out_free_oa:
948                 OBD_FREE(oa, sizeof(*oa));
949         }
950 out:
951         if (ctxt->loc_lvfs_ctxt)
952                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
953         RETURN(rc);
954 }
955
956 /* reads the catalog list */
957 int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
958                       struct fsfilt_operations *fsops, char *name,
959                       int count, struct llog_catid *idarray)
960 {
961         struct lvfs_run_ctxt saved;
962         struct l_file *file;
963         int size = sizeof(*idarray) * count;
964         loff_t off = 0;
965         int rc;
966
967         LASSERT(count);
968
969         if (ctxt)
970                 push_ctxt(&saved, ctxt, NULL);
971         file = l_filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
972         if (!file || IS_ERR(file)) {
973                 rc = PTR_ERR(file);
974                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
975                        name, rc);
976                 GOTO(out, rc);
977         }
978
979         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
980                 CERROR("%s is not a regular file!: mode = %o\n", name,
981                        file->f_dentry->d_inode->i_mode);
982                 GOTO(out, rc = -ENOENT);
983         }
984
985         rc = fsops->fs_read_record(file, idarray, size, &off);
986         if (rc) {
987                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
988                        name, rc);
989                 GOTO(out, rc);
990         }
991
992  out:
993         if (file && !IS_ERR(file))
994                 rc = filp_close(file, 0);
995         if (ctxt)
996                 pop_ctxt(&saved, ctxt, NULL);
997         RETURN(rc);
998 }
999 EXPORT_SYMBOL(llog_get_cat_list);
1000
1001 /* writes the cat list */
1002 int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
1003                       struct fsfilt_operations *fsops, char *name,
1004                       int count, struct llog_catid *idarray)
1005 {
1006         struct lvfs_run_ctxt saved;
1007         struct l_file *file;
1008         int size = sizeof(*idarray) * count;
1009         loff_t off = 0;
1010         int rc;
1011
1012         LASSERT(count);
1013
1014         if (ctxt)
1015                 push_ctxt(&saved, ctxt, NULL);
1016         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
1017         if (!file || IS_ERR(file)) {
1018                 rc = PTR_ERR(file);
1019                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
1020                        name, rc);
1021                 GOTO(out, rc);
1022         }
1023
1024         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
1025                 CERROR("%s is not a regular file!: mode = %o\n", name,
1026                        file->f_dentry->d_inode->i_mode);
1027                 GOTO(out, rc = -ENOENT);
1028         }
1029
1030         rc = fsops->fs_write_record(file, idarray, size, &off, 1);
1031         if (rc) {
1032                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
1033                        name, rc);
1034                 GOTO(out, rc);
1035         }
1036
1037  out:
1038         if (file && !IS_ERR(file))
1039                 rc = filp_close(file, 0);
1040         if (ctxt)
1041                 pop_ctxt(&saved, ctxt, NULL);
1042         RETURN(rc);
1043 }
1044 EXPORT_SYMBOL(llog_put_cat_list);
1045
1046 struct llog_operations llog_lvfs_ops = {
1047         lop_open:        llog_lvfs_open,
1048         lop_destroy:     llog_lvfs_destroy,
1049         lop_close:       llog_lvfs_close,
1050         lop_read_header: llog_lvfs_read_header,
1051         lop_write_rec:   llog_lvfs_write_rec,
1052         lop_next_block:  llog_lvfs_next_block,
1053         lop_prev_block:  llog_lvfs_prev_block,
1054 };
1055 EXPORT_SYMBOL(llog_lvfs_ops);
1056
1057 #else /* !__KERNEL__ */
1058
/* Non-kernel build: reading a log header is not implemented; calling this
 * trips LBUG().  The return value only satisfies the signature. */
static int llog_lvfs_read_header(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1064
/* Non-kernel build: writing a log record is not implemented; calling this
 * trips LBUG().  The return value only satisfies the signature. */
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie, int cookiecount,
                               void *buf, int idx)
{
        int rc = 0;

        LBUG();
        return rc;
}
1073
/* Non-kernel build: opening a log is not implemented; calling this trips
 * LBUG().  The return value only satisfies the signature. */
static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
                          struct llog_logid *logid, char *name, int flags)
{
        int rc = 0;

        LBUG();
        return rc;
}
1080
/* Non-kernel build: closing a log is not implemented; calling this trips
 * LBUG().  The return value only satisfies the signature. */
static int llog_lvfs_close(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1086
/* Non-kernel build: destroying a log is not implemented; calling this
 * trips LBUG().  The return value only satisfies the signature. */
static int llog_lvfs_destroy(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1092
/* Non-kernel build: reading the catalog list is not implemented; calling
 * this trips LBUG().  The return value only satisfies the signature. */
int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops, char *name,
                      int count, struct llog_catid *idarray)
{
        int rc = 0;

        LBUG();
        return rc;
}
1100
/* Non-kernel build: writing the catalog list is not implemented; calling
 * this trips LBUG().  The return value only satisfies the signature. */
int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops, char *name,
                      int count, struct llog_catid *idarray)
{
        int rc = 0;

        LBUG();
        return rc;
}
1108
/* Non-kernel build: fetching the previous log block is not implemented;
 * calling this trips LBUG().  The return value only satisfies the
 * signature. */
int llog_lvfs_prev_block(struct llog_handle *loghandle,
                         int prev_idx, void *buf, int len)
{
        int rc = 0;

        LBUG();
        return rc;
}
1115
1116 int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
1117                          int next_idx, __u64 *offset, void *buf, int len)
1118 {
1119         LBUG();
1120         return 0;
1121 }
1122
1123 struct llog_operations llog_lvfs_ops = {
1124         lop_open:        llog_lvfs_open,
1125         lop_destroy:     llog_lvfs_destroy,
1126         lop_close:       llog_lvfs_close,
1127         lop_read_header: llog_lvfs_read_header,
1128         lop_write_rec:   llog_lvfs_write_rec,
1129         lop_next_block:  llog_lvfs_next_block,
1130         lop_prev_block:  llog_lvfs_prev_block,
1131 };
1132 #endif