Whamcloud - gitweb
- many gcc4 compilation fixes (warnings)
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lvfs.h>
42 #include <linux/lustre_fsfilt.h>
43 #include <linux/lustre_log.h>
44
45 #ifdef __KERNEL__
46
47 static int llog_lvfs_pad(struct llog_ctxt *ctxt, struct l_file *file,
48                          int len, int index)
49 {
50         struct llog_rec_hdr rec;
51         struct llog_rec_tail tail;
52         int rc;
53         ENTRY;
54
55         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
56
57         tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
58         tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
59         rec.lrh_type = 0;
60
61         rc = llog_fsfilt_write_record(ctxt, file, &rec, sizeof(rec),
62                                       &file->f_pos, 0);
63         if (rc) {
64                 CERROR("error writing padding record: rc %d\n", rc);
65                 goto out;
66         }
67
68         file->f_pos += len - sizeof(rec) - sizeof(tail);
69         rc = llog_fsfilt_write_record(ctxt, file, &tail, sizeof(tail),
70                                       &file->f_pos, 0);
71         if (rc) {
72                 CERROR("error writing padding record: rc %d\n", rc);
73                 goto out;
74         }
75
76  out:
77         RETURN(rc);
78 }
79
80 static int llog_lvfs_write_blob(struct llog_ctxt *ctxt, struct l_file *file,
81                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
82 {
83         int rc;
84         struct llog_rec_tail end;
85         loff_t saved_off = file->f_pos;
86         int buflen = le32_to_cpu(rec->lrh_len);
87
88         ENTRY;
89         file->f_pos = off;
90
91         if (!buf) {
92                 rc = llog_fsfilt_write_record(ctxt, file, rec, buflen,
93                                               &file->f_pos, 0);
94                 if (rc) {
95                         CERROR("error writing log record: rc %d\n", rc);
96                         goto out;
97                 }
98                 GOTO(out, rc = 0);
99         }
100
101         /* the buf case */
102         rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
103         rc = llog_fsfilt_write_record(ctxt, file, rec, sizeof(*rec),
104                                       &file->f_pos, 0);
105         if (rc) {
106                 CERROR("error writing log hdr: rc %d\n", rc);
107                 goto out;
108         }
109
110         rc = llog_fsfilt_write_record(ctxt, file, buf, buflen,
111                                       &file->f_pos, 0);
112         if (rc) {
113                 CERROR("error writing log buffer: rc %d\n", rc);
114                 goto out;
115         }
116
117         end.lrt_len = rec->lrh_len;
118         end.lrt_index = rec->lrh_index;
119         rc = llog_fsfilt_write_record(ctxt, file, &end, sizeof(end),
120                                       &file->f_pos, 0);
121         if (rc) {
122                 CERROR("error writing log tail: rc %d\n", rc);
123                 goto out;
124         }
125
126         rc = 0;
127  out:
128         if (saved_off > file->f_pos)
129                 file->f_pos = saved_off;
130         LASSERT(rc <= 0);
131         RETURN(rc);
132 }
133
134 static int llog_lvfs_read_blob(struct llog_ctxt *ctxt, struct l_file *file,
135                                void *buf, int size, loff_t off)
136 {
137         loff_t offset = off;
138         int rc;
139         ENTRY;
140
141         rc = llog_fsfilt_read_record(ctxt, file, buf, size, &offset);
142         if (rc) {
143                 CERROR("error reading log record: rc %d\n", rc);
144                 RETURN(rc);
145         }
146         RETURN(0);
147 }
148
149 static int llog_lvfs_read_header(struct llog_handle *handle)
150 {
151         struct llog_ctxt *ctxt = handle->lgh_ctxt;
152         int rc;
153         ENTRY;
154
155         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
156         LASSERT(ctxt != NULL);
157
158         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
159                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
160                 RETURN(LLOG_EEMPTY);
161         }
162
163         rc = llog_lvfs_read_blob(ctxt, handle->lgh_file, handle->lgh_hdr,
164                                  LLOG_CHUNK_SIZE, 0);
165         if (rc)
166                 CERROR("error reading log header\n");
167
168         handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
169         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
170
171         RETURN(rc);
172 }
173
174 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
175 /* appends if idx == -1, otherwise overwrites record idx. */
176 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
177                                struct llog_rec_hdr *rec,
178                                struct llog_cookie *reccookie,
179                                int cookiecount,
180                                void *buf, int idx)
181 {
182         struct llog_log_hdr *llh;
183         int reclen = le32_to_cpu(rec->lrh_len), index, rc;
184         struct llog_rec_tail *lrt;
185         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
186         struct file *file;
187         loff_t offset;
188         size_t left;
189         ENTRY;
190
191         llh = loghandle->lgh_hdr;
192         file = loghandle->lgh_file;
193
194         /* record length should not bigger than LLOG_CHUNK_SIZE */
195         if (buf)
196                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
197                       - sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
198         else
199                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
200         if (rc)
201                 RETURN(rc);
202
203         if (idx != -1) {
204                 loff_t saved_offset;
205
206                 /* no header: only allowed to insert record 1 */
207                 if (idx > 1 && !file->f_dentry->d_inode->i_size) {
208                         CERROR("idx != -1 in empty log\n");
209                         LBUG();
210                 }
211
212                 if (idx && llh->llh_size && llh->llh_size != reclen)
213                         RETURN(-EINVAL);
214
215                 rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
216                 /* we are done if we only write the header or on error */
217                 if (rc || idx == 0)
218                         RETURN(rc);
219
220                 saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
221                 rc = llog_lvfs_write_blob(ctxt, file, rec, buf, saved_offset);
222                 if (rc == 0 && reccookie) {
223                         reccookie->lgc_lgl = loghandle->lgh_id;
224                         reccookie->lgc_index = idx;
225                         rc = 1;
226                 }
227                 RETURN(rc);
228         }
229
230         /* Make sure that records don't cross a chunk boundary, so we can
231          * process them page-at-a-time if needed.  If it will cross a chunk
232          * boundary, write in a fake (but referenced) entry to pad the chunk.
233          *
234          * We know that llog_current_log() will return a loghandle that is
235          * big enough to hold reclen, so all we care about is padding here.
236          */
237         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
238         if (buf)
239                 reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
240                          sizeof(struct llog_rec_tail);
241
242         /* NOTE: padding is a record, but no bit is set */
243         if (left != 0 && left != reclen &&
244             left < (reclen + LLOG_MIN_REC_SIZE)) {
245                 loghandle->lgh_last_idx++;
246                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
247                 if (rc)
248                         RETURN(rc);
249                 /* if it's the last idx in log file, then return -ENOSPC */
250                 if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
251                         RETURN(-ENOSPC);
252         }
253
254         loghandle->lgh_last_idx++;
255         index = loghandle->lgh_last_idx;
256         LASSERT(index < LLOG_BITMAP_SIZE(llh));
257         rec->lrh_index = cpu_to_le32(index);
258         if (buf == NULL) {
259                 lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
260                 lrt->lrt_len = rec->lrh_len;
261                 lrt->lrt_index = rec->lrh_index;
262         }
263         if (ext2_set_bit(index, llh->llh_bitmap)) {
264                 CERROR("argh, index %u already set in log bitmap?\n", index);
265                 LBUG(); /* should never happen */
266         }
267         llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
268         llh->llh_tail.lrt_index = cpu_to_le32(index);
269
270         offset = 0;
271         rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
272         if (rc)
273                 RETURN(rc);
274
275         CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n",
276                loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len),
277                file->f_pos);
278
279         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
280         if (rc)
281                 RETURN(rc);
282
283         if (rc == 0 && reccookie) {
284                 if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
285                         LASSERTF(EQ_LOGID(reccookie->lgc_lgl,loghandle->lgh_id),
286                                  "lgc_lgl.oid/gr "LPU64"/"LPU64" lgh_id.oid/gr"
287                                  LPU64"/"LPU64"\n",
288                                  reccookie->lgc_lgl.lgl_oid,
289                                  reccookie->lgc_lgl.lgl_ogr,
290                                  loghandle->lgh_id.lgl_oid,
291                                  loghandle->lgh_id.lgl_oid);
292                         LASSERTF(reccookie->lgc_index == index,
293                                  "lgc_index %u != index %u\n",
294                                  reccookie->lgc_index, index);
295                 } else {
296                         reccookie->lgc_lgl = loghandle->lgh_id;
297                         reccookie->lgc_index = index;
298                         llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
299                 }
300
301                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
302                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
303                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
304                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
305                 else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
306                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
307                 else
308                         reccookie->lgc_subsys = -1;
309                 rc = 1;
310         }
311         if (rc == 0 && (le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC ||
312             le32_to_cpu(rec->lrh_type) == SMFS_UPDATE_REC))
313                 rc = 1;
314
315         RETURN(rc);
316 }
317
318 /* We can skip reading at least as many log blocks as the number of
319 * minimum sized log records we are skipping.  If it turns out
320 * that we are not far enough along the log (because the
321 * actual records are larger than minimum size) we just skip
322 * some more records. */
323
324 static void llog_skip_over(__u64 *off, int curr, int goal)
325 {
326         if (goal <= curr)
327                 return;
328         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
329                 ~(LLOG_CHUNK_SIZE - 1);
330 }
331
332 /* sets:
333  *  - curr_offset to the furthest point read in the log file
334  *  - curr_idx to the log index preceeding curr_offset
335  * returns -EIO/-EINVAL on error
336  */
337 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
338                                 int next_idx, __u64 *curr_offset, void *buf,
339                                 int len)
340 {
341         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
342         ENTRY;
343
344         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
345                 RETURN(-EINVAL);
346
347         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
348                next_idx, *curr_idx, *curr_offset);
349
350         while (*curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
351                 struct llog_rec_hdr *rec;
352                 struct llog_rec_tail *tail;
353                 loff_t ppos;
354                 int nbytes, rc;
355
356                 llog_skip_over(curr_offset, *curr_idx, next_idx);
357
358                 ppos = *curr_offset;
359                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
360                                              buf, len, &ppos);
361
362                 if (rc) {
363                         CERROR("Cant read llog block at log id "LPU64
364                                "/%u offset "LPU64"\n",
365                                loghandle->lgh_id.lgl_oid,
366                                loghandle->lgh_id.lgl_ogen,
367                                *curr_offset);
368                         RETURN(rc);
369                 }
370
371                 nbytes = ppos - *curr_offset;
372                 *curr_offset = ppos;
373
374                 if (nbytes == 0) /* end of file, nothing to do */
375                         RETURN(0);
376
377                 if (nbytes < sizeof(*tail)) {
378                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
379                                LPU64"\n", loghandle->lgh_id.lgl_oid,
380                                loghandle->lgh_id.lgl_ogen, *curr_offset);
381                         RETURN(-EINVAL);
382                 }
383
384                 tail = buf + nbytes - sizeof(struct llog_rec_tail);
385                 *curr_idx = le32_to_cpu(tail->lrt_index);
386
387                 /* this shouldn't happen */
388                 if (tail->lrt_index == 0) {
389                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
390                                LPU64"\n", loghandle->lgh_id.lgl_oid,
391                                loghandle->lgh_id.lgl_ogen, *curr_offset);
392                         RETURN(-EINVAL);
393                 }
394                 if (le32_to_cpu(tail->lrt_index) < next_idx)
395                         continue;
396
397                 /* sanity check that the start of the new buffer is no farther
398                  * than the record that we wanted.  This shouldn't happen. */
399                 rec = buf;
400                 if (le32_to_cpu(rec->lrh_index) > next_idx) {
401                         CERROR("missed desired record? %u > %u\n",
402                                le32_to_cpu(rec->lrh_index), next_idx);
403                         RETURN(-ENOENT);
404                 }
405                 RETURN(0);
406         }
407         RETURN(-EIO);
408 }
409
410 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
411                                 int prev_idx, void *buf, int len)
412 {
413         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
414         __u64 curr_offset;
415         int rc;
416         ENTRY;
417
418         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
419                 RETURN(-EINVAL);
420
421         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
422
423         curr_offset = LLOG_CHUNK_SIZE;
424         llog_skip_over(&curr_offset, 0, prev_idx);
425
426         while (curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
427                 struct llog_rec_hdr *rec;
428                 struct llog_rec_tail *tail;
429                 loff_t ppos;
430
431                 ppos = curr_offset;
432                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
433                                              buf, len, &ppos);
434
435                 if (rc) {
436                         CERROR("Cant read llog block at log id "LPU64
437                                "/%u offset "LPU64"\n",
438                                loghandle->lgh_id.lgl_oid,
439                                loghandle->lgh_id.lgl_ogen,
440                                curr_offset);
441                         RETURN(rc);
442                 }
443
444                 /* put number of bytes read into rc to make code simpler */
445                 rc = ppos - curr_offset;
446                 curr_offset = ppos;
447
448                 if (rc == 0) /* end of file, nothing to do */
449                         RETURN(0);
450
451                 if (rc < sizeof(*tail)) {
452                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
453                                LPU64"\n", loghandle->lgh_id.lgl_oid,
454                                loghandle->lgh_id.lgl_ogen, curr_offset);
455                         RETURN(-EINVAL);
456                 }
457
458                 tail = buf + rc - sizeof(struct llog_rec_tail);
459
460                 /* this shouldn't happen */
461                 if (tail->lrt_index == 0) {
462                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
463                                LPU64"\n", loghandle->lgh_id.lgl_oid,
464                                loghandle->lgh_id.lgl_ogen, curr_offset);
465                         RETURN(-EINVAL);
466                 }
467                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
468                         continue;
469
470                 /* sanity check that the start of the new buffer is no farther
471                  * than the record that we wanted.  This shouldn't happen. */
472                 rec = buf;
473                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
474                         CERROR("missed desired record? %u > %u\n",
475                                le32_to_cpu(rec->lrh_index), prev_idx);
476                         RETURN(-ENOENT);
477                 }
478                 RETURN(0);
479         }
480         RETURN(-EIO);
481 }
482
483 static struct file *llog_filp_open(char *name, int flags, int mode)
484 {
485         char *logname;
486         struct file *filp;
487         int len;
488
489         OBD_ALLOC(logname, PATH_MAX);
490         if (logname == NULL)
491                 return ERR_PTR(-ENOMEM);
492
493         len = snprintf(logname, PATH_MAX, "LOGS/%s", name);
494         if (len >= PATH_MAX - 1) {
495                 filp = ERR_PTR(-ENAMETOOLONG);
496         } else {
497                 filp = l_filp_open(logname, flags, mode);
498                 if (IS_ERR(filp)) {
499                         CERROR("logfile %s(%s): %ld\n",
500                                flags & O_CREAT ? "create" : "open", logname,
501                                PTR_ERR(filp));
502                 }
503         }
504
505         OBD_FREE(logname, PATH_MAX);
506         return filp;
507 }
508
509 /* creates object for the case when we have no obd (smfs). */
510 static struct file *
511 llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
512 {
513         struct file *filp;
514         int rc = 0;
515         ENTRY;
516
517         LASSERT(lgh_id != NULL);
518         if (lgh_id->lgl_oid) {
519                 struct dentry *dchild;
520                 char id_name[LL_ID_NAMELEN];
521                 int id_len = 0;
522
523                 down(&ctxt->loc_objects_dir->d_inode->i_sem);
524                 id_len = ll_id2str(id_name, lgh_id->lgl_oid, 
525                                        lgh_id->lgl_ogen);
526                 
527                 dchild = lookup_one_len(id_name, ctxt->loc_objects_dir, 
528                                         id_len);
529                 if (IS_ERR(dchild)) {
530                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
531                         RETURN((struct file *)dchild);
532                 }
533                 if (dchild->d_inode == NULL) {
534                         struct dentry_params dp;
535                         struct inode *inode;
536
537                         dchild->d_fsdata = (void *) &dp;
538                         dp.p_ptr = NULL;
539                         dp.p_inum = lgh_id->lgl_oid;
540                         rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
541                                            dchild, S_IFREG, NULL);
542                         if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
543                                 dchild->d_fsdata = NULL;
544                         if (rc) {
545                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
546                                 dput(dchild);
547                                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
548                                 RETURN(ERR_PTR(rc));
549                         }
550                         inode = dchild->d_inode;
551                         LASSERT(inode->i_ino == lgh_id->lgl_oid);
552                         inode->i_generation = lgh_id->lgl_ogen;
553                         CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
554                                inode->i_ino, inode->i_generation);
555                         mark_inode_dirty(inode);
556                 }
557
558                 mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
559                 filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
560                                     O_RDWR | O_LARGEFILE);
561                 if (IS_ERR(filp)) {
562                         dput(dchild);
563                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
564                         RETURN(filp);
565                 }
566                 if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
567                         CERROR("%s is not a regular file!: mode = %o\n", 
568                                id_name, filp->f_dentry->d_inode->i_mode);
569                         filp_close(filp, 0);
570                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
571                         RETURN(ERR_PTR(-ENOENT));
572                 }
573
574                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
575                 RETURN(filp);
576
577         } else {
578                 unsigned int tmpname = ll_insecure_random_int();
579                 char id_name[LL_ID_NAMELEN];
580                 struct dentry *new_child, *parent;
581                 int err, id_len;
582                 void *handle;
583
584                 sprintf(id_name, "OBJECTS/%u", tmpname);
585                 filp = filp_open(id_name, O_CREAT | O_EXCL, 0644);
586                 if (IS_ERR(filp)) {
587                         rc = PTR_ERR(filp);
588                         if (rc == -EEXIST) {
589                                 CERROR("impossible object name collision %u\n",
590                                         tmpname);
591                                 LBUG();
592                         }
593                         CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
594                         RETURN(filp);
595                 }
596
597                 id_len = ll_id2str(id_name, filp->f_dentry->d_inode->i_ino,
598                                        filp->f_dentry->d_inode->i_generation);
599                 parent = filp->f_dentry->d_parent;
600                 down(&parent->d_inode->i_sem);
601                 new_child = lookup_one_len(id_name, parent, id_len);
602                 if (IS_ERR(new_child)) {
603                         CERROR("getting neg dentry for obj rename: %d\n", rc);
604                         GOTO(out_close, rc = PTR_ERR(new_child));
605                 }
606                 if (new_child->d_inode != NULL) {
607                         CERROR("impossible non-negative obj dentry %lu:%u!\n",
608                                 filp->f_dentry->d_inode->i_ino,
609                                 filp->f_dentry->d_inode->i_generation);
610                         LBUG();
611                 }
612
613                 handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
614                 if (IS_ERR(handle))
615                         GOTO(out_dput, rc = PTR_ERR(handle));
616
617                 lock_kernel();
618                 rc = vfs_rename(parent->d_inode, filp->f_dentry,
619                                 parent->d_inode, new_child);
620                 unlock_kernel();
621                 if (rc)
622                         CERROR("error renaming new object %lu:%u: rc %d\n",
623                                 filp->f_dentry->d_inode->i_ino,
624                                 filp->f_dentry->d_inode->i_generation, rc);
625
626                 err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
627                 if (!rc)
628                         rc = err;
629
630         out_dput:
631                 dput(new_child);
632         out_close:
633                 up(&parent->d_inode->i_sem);
634                 if (rc) {
635                         filp_close(filp, 0);
636                         filp = ERR_PTR(rc);
637                 } else {
638                         /* FIXME: is this group 1 is correct? */
639                         lgh_id->lgl_ogr = 1;
640                         lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
641                         lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
642                 }
643                 RETURN(filp);
644         }
645 }
646
647 /* creates object for generic case (obd exists) */
648 static struct file *
649 llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
650 {
651         struct file *filp = NULL;
652         struct dentry *dchild;
653         struct obd_device *obd;
654         struct obdo *oa = NULL;
655         int open_flags = O_RDWR | O_LARGEFILE;
656         int rc = 0;
657         ENTRY;
658
659         obd = ctxt->loc_exp->exp_obd;
660         LASSERT(obd != NULL);
661
662         if (lgh_id->lgl_oid) {
663                 dchild = obd_lvfs_id2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
664                                             lgh_id->lgl_ogen, lgh_id->lgl_ogr);
665                 if (IS_ERR(dchild) == -ENOENT) {
666                         OBD_ALLOC(oa, sizeof(*oa));
667                         if (!oa)
668                                 RETURN(ERR_PTR(-ENOMEM));
669
670                         oa->o_id = lgh_id->lgl_oid;
671                         oa->o_generation = lgh_id->lgl_ogen;
672                         oa->o_gr = lgh_id->lgl_ogr;
673                         oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
674                         rc = obd_create(ctxt->loc_exp, oa, NULL, 0, NULL, NULL);
675                         if (rc) {
676                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
677                                 GOTO(out_free_oa, rc);
678                         }
679                         CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
680                                lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
681
682                         dchild = obd_lvfs_id2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
683                                                     lgh_id->lgl_ogen, lgh_id->lgl_ogr);
684                 } else if (IS_ERR(dchild)) {
685                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
686                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
687                         RETURN((struct file *)dchild);
688                 }
689
690                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
691                 if (IS_ERR(filp)) {
692                         l_dput(dchild);
693                         rc = PTR_ERR(filp);
694                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
695                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
696                 }
697                 GOTO(out_free_oa, rc);
698         } else {
699                 /* this is important to work here over obd_create() as it manages 
700                   groups and we need it. Yet another reason is that mds_obd_create()
701                  is fully the same as old version of this function and this helps
702                  us to avoid code duplicating and layering violating. */
703                 OBD_ALLOC(oa, sizeof(*oa));
704                 if (!oa)
705                         RETURN(ERR_PTR(-ENOMEM));
706
707                 oa->o_gr = FILTER_GROUP_LLOG;
708                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
709                 rc = obd_create(ctxt->loc_exp, oa, NULL, 0, NULL, NULL);
710                 if (rc)
711                         GOTO(out_free_oa, rc);
712
713                 dchild = obd_lvfs_id2dentry(ctxt->loc_exp, oa->o_id,
714                                             oa->o_generation, oa->o_gr);
715                 if (IS_ERR(dchild))
716                         GOTO(out_free_oa, rc = PTR_ERR(dchild));
717
718                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
719                                      open_flags);
720                 if (IS_ERR(filp)) {
721                         l_dput(dchild);
722                         GOTO(out_free_oa, rc = PTR_ERR(filp));
723                 }
724
725                 /* group 1 is not longer valid, we use the group which is set 
726                 by obd_create()->mds_obd_create(). */
727                 lgh_id->lgl_ogr = oa->o_gr;
728                 lgh_id->lgl_oid = oa->o_id;
729                 lgh_id->lgl_ogen = oa->o_generation;
730         }
731
732 out_free_oa:
733         if (rc)
734                 filp = ERR_PTR(rc);
735         if (oa)
736                 OBD_FREE(oa, sizeof(*oa));
737         RETURN(filp);
738 }
739
740 static struct file *
741 llog_object_create(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
742 {
743         if (ctxt->loc_alone)
744                 return llog_object_create_alone(ctxt, lgh_id);
745         else
746                 return llog_object_create_generic(ctxt, lgh_id);
747 }
748
749 static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
750                                 struct dentry *dentry)
751 {
752         struct dentry *new_child;
753         char id_name[LL_ID_NAMELEN];
754         void *handle;
755         int id_len, rc = 0, err;
756         ENTRY;
757         
758         id_len = ll_id2str(id_name, logid.lgl_oid, logid.lgl_ogen);
759         down(&ctxt->loc_objects_dir->d_inode->i_sem);
760         new_child = lookup_one_len(id_name, ctxt->loc_objects_dir, id_len);
761         if (IS_ERR(new_child)) {
762                 CERROR("getting neg dentry for obj rename: %d\n", rc);
763                 GOTO(out, rc = PTR_ERR(new_child));
764         }
765         if (new_child->d_inode == dentry->d_inode)
766                 GOTO(out_dput, rc);
767         if (new_child->d_inode != NULL) {
768                 CERROR("impossible non-negative obj dentry "LPX64":%u!\n",
769                        logid.lgl_oid, logid.lgl_ogen);
770                 LBUG();
771         }
772         handle = llog_fsfilt_start(ctxt, ctxt->loc_objects_dir->d_inode,
773                                    FSFILT_OP_LINK, NULL);
774         if (IS_ERR(handle))
775                 GOTO(out_dput, rc = PTR_ERR(handle));
776         
777         lock_kernel();
778         rc = vfs_link(dentry, ctxt->loc_objects_dir->d_inode, new_child);
779         unlock_kernel();
780         if (rc) {
781                 CERROR("error link new object "LPX64":%08x: rc %d\n",
782                        logid.lgl_oid, logid.lgl_ogen, rc);
783                 /* it doesn't make much sense to get -EEXIST here */
784                 LASSERTF(rc != -EEXIST, "bug 3490: dentry: %p "
785                          "dir->d_ionode %p new_child: %p  \n",
786                          dentry, ctxt->loc_objects_dir->d_inode, new_child);
787         }
788         err = llog_fsfilt_commit(ctxt, ctxt->loc_objects_dir->d_inode, handle, 0);
789 out_dput:
790         l_dput(new_child);
791 out:
792         up(&ctxt->loc_objects_dir->d_inode->i_sem);
793         RETURN(rc);
794 }
795
796 static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
797                           struct llog_logid *logid, char *name, int flags)
798 {
799         struct llog_handle *handle;
800         struct lvfs_run_ctxt saved;
801         int rc = 0;
802         int open_flags = O_RDWR | O_LARGEFILE;
803         ENTRY;
804
805         if (flags & OBD_LLOG_FL_CREATE)
806                 open_flags |= O_CREAT;
807
808         handle = llog_alloc_handle();
809         if (handle == NULL)
810                 RETURN(-ENOMEM);
811         *res = handle;
812         
813         LASSERT(ctxt);
814         if (ctxt->loc_lvfs_ctxt)
815                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
816         
817         if (logid != NULL) {
818                 handle->lgh_file = llog_object_create(ctxt, logid);
819                 if (IS_ERR(handle->lgh_file)) {
820                         CERROR("cannot create/open llog object "LPX64":%x "
821                                "error = %ld", logid->lgl_oid, logid->lgl_ogen,
822                                PTR_ERR(handle->lgh_file));
823                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
824                 }
825                 handle->lgh_id = *logid;
826
827         } else if (name) {
828                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
829                 if (IS_ERR(handle->lgh_file)) {
830                         CERROR("cannot open %s file, error = %ld\n", 
831                                name, PTR_ERR(handle->lgh_file));
832                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
833                 }
834                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_logs_dir);
835                 
836                 handle->lgh_id.lgl_ogr = 1;
837                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
838                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
839                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
840                 if (rc)
841                         GOTO(cleanup, rc);
842
843         } else {
844                 handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
845                 if (IS_ERR(handle->lgh_file)) {
846                         CERROR("cannot create llog object, error = %ld\n", 
847                                PTR_ERR(handle->lgh_file));
848                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
849                 }
850         }
851
852         handle->lgh_ctxt = ctxt;
853 finish:
854         if (ctxt->loc_lvfs_ctxt)
855                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
856         RETURN(rc);
857 cleanup:
858         llog_free_handle(handle);
859         goto finish;
860 }
861
862 static int llog_lvfs_close(struct llog_handle *handle)
863 {
864         int rc;
865         ENTRY;
866
867         rc = filp_close(handle->lgh_file, 0);
868         if (rc)
869                 CERROR("error closing log: rc %d\n", rc);
870         RETURN(rc);
871 }
872
873 static int llog_lvfs_destroy(struct llog_handle *loghandle)
874 {
875         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
876         struct lvfs_run_ctxt saved;
877         struct dentry *fdentry;
878         struct inode *parent_inode;
879         char id_name[LL_ID_NAMELEN];
880         void *handle;
881         int rc = -EINVAL, err, id_len;
882         ENTRY;
883         
884         if (ctxt->loc_lvfs_ctxt)
885                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
886         
887         fdentry = loghandle->lgh_file->f_dentry;
888         parent_inode = fdentry->d_parent->d_inode;
889         
890         if (!strcmp((char *)fdentry->d_parent->d_name.name, "LOGS")) {
891                 LASSERT(parent_inode == ctxt->loc_logs_dir->d_inode);
892                 
893                 id_len = ll_id2str(id_name, fdentry->d_inode->i_ino,
894                                    fdentry->d_inode->i_generation);
895                 dget(fdentry);
896                 rc = llog_lvfs_close(loghandle);
897                 if (rc) {
898                         dput(fdentry);
899                         GOTO(out, rc);
900                 }
901                 
902                 handle = llog_fsfilt_start(ctxt, parent_inode,
903                                            FSFILT_OP_UNLINK, NULL);
904                 if (IS_ERR(handle)) {
905                         dput(fdentry);
906                         GOTO(out, rc = PTR_ERR(handle));
907                 }
908                 
909                 down(&parent_inode->i_sem);
910                 rc = vfs_unlink(parent_inode, fdentry);
911                 up(&parent_inode->i_sem);
912                 dput(fdentry);
913                 
914                 if (!rc) {
915                         down(&ctxt->loc_objects_dir->d_inode->i_sem);
916                         fdentry = lookup_one_len(id_name, ctxt->loc_objects_dir,
917                                                  id_len);
918                         if (fdentry == NULL || fdentry->d_inode == NULL) {
919                                 CERROR("destroy non_existent object %s\n", 
920                                        id_name);
921                                 GOTO(out_err, rc = IS_ERR(fdentry) ?
922                                      PTR_ERR(fdentry) : -ENOENT);
923                         }
924                         rc = vfs_unlink(ctxt->loc_objects_dir->d_inode, fdentry);
925                         l_dput(fdentry);
926 out_err:
927                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
928                 }
929                 err = llog_fsfilt_commit(ctxt, parent_inode, handle, 0);
930                 if (err && !rc)
931                         err = rc;
932                 
933                 GOTO(out, rc);
934         }
935         if (ctxt->loc_alone) {
936                 if (!strcmp((char *)fdentry->d_parent->d_name.name, "OBJECTS")) {
937                         LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
938                         
939                         dget(fdentry);
940                         rc = llog_lvfs_close(loghandle);
941                         if (rc == 0) {
942                                 down(&parent_inode->i_sem);
943                                 rc = vfs_unlink(parent_inode, fdentry);
944                                 up(&parent_inode->i_sem);
945                         }
946                         dput(fdentry);
947                 }
948         } else {
949                 struct obdo *oa = NULL;
950  
951                 OBD_ALLOC(oa, sizeof(*oa));
952                 if (!oa)
953                         GOTO(out, rc = -ENOMEM);
954                 
955                 oa->o_id = loghandle->lgh_id.lgl_oid;
956                 oa->o_gr = loghandle->lgh_id.lgl_ogr;
957                 oa->o_generation = loghandle->lgh_id.lgl_ogen;
958                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
959                 
960                 rc = llog_lvfs_close(loghandle);
961                 if (rc)
962                         GOTO(out_free_oa, rc);
963                 
964                 rc = obd_destroy(loghandle->lgh_ctxt->loc_exp, oa, NULL, NULL);
965 out_free_oa:
966                 OBD_FREE(oa, sizeof(*oa));
967         }
968 out:
969         if (ctxt->loc_lvfs_ctxt)
970                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
971         RETURN(rc);
972 }
973
974 /* reads the catalog list */
975 int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
976                       struct fsfilt_operations *fsops, const char *name,
977                       int count, struct llog_catid *idarray)
978 {
979         struct lvfs_run_ctxt saved;
980         struct l_file *file;
981         int size = sizeof(*idarray) * count;
982         loff_t off = 0;
983         int rc;
984
985         LASSERT(count);
986
987         if (ctxt)
988                 push_ctxt(&saved, ctxt, NULL);
989         file = l_filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
990         if (!file || IS_ERR(file)) {
991                 rc = PTR_ERR(file);
992                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
993                        name, rc);
994                 GOTO(out, rc);
995         }
996
997         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
998                 CERROR("%s is not a regular file!: mode = %o\n", name,
999                        file->f_dentry->d_inode->i_mode);
1000                 GOTO(out, rc = -ENOENT);
1001         }
1002
1003         rc = fsops->fs_read_record(file, idarray, size, &off);
1004         if (rc) {
1005                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
1006                        name, rc);
1007                 GOTO(out, rc);
1008         }
1009
1010  out:
1011         if (file && !IS_ERR(file))
1012                 rc = filp_close(file, 0);
1013         if (ctxt)
1014                 pop_ctxt(&saved, ctxt, NULL);
1015         RETURN(rc);
1016 }
1017 EXPORT_SYMBOL(llog_get_cat_list);
1018
1019 /* writes the cat list */
1020 int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
1021                       struct fsfilt_operations *fsops, const char *name,
1022                       int count, struct llog_catid *idarray)
1023 {
1024         struct lvfs_run_ctxt saved;
1025         struct l_file *file;
1026         int size = sizeof(*idarray) * count;
1027         loff_t off = 0;
1028         int rc;
1029
1030         LASSERT(count);
1031
1032         if (ctxt)
1033                 push_ctxt(&saved, ctxt, NULL);
1034         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
1035         if (!file || IS_ERR(file)) {
1036                 rc = PTR_ERR(file);
1037                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
1038                        name, rc);
1039                 GOTO(out, rc);
1040         }
1041
1042         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
1043                 CERROR("%s is not a regular file!: mode = %o\n", name,
1044                        file->f_dentry->d_inode->i_mode);
1045                 GOTO(out, rc = -ENOENT);
1046         }
1047
1048         rc = fsops->fs_write_record(file, idarray, size, &off, 1);
1049         if (rc) {
1050                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
1051                        name, rc);
1052                 GOTO(out, rc);
1053         }
1054
1055  out:
1056         if (file && !IS_ERR(file))
1057                 rc = filp_close(file, 0);
1058         if (ctxt)
1059                 pop_ctxt(&saved, ctxt, NULL);
1060         RETURN(rc);
1061 }
1062 EXPORT_SYMBOL(llog_put_cat_list);
1063
1064 struct llog_operations llog_lvfs_ops = {
1065         lop_open:        llog_lvfs_open,
1066         lop_destroy:     llog_lvfs_destroy,
1067         lop_close:       llog_lvfs_close,
1068         lop_read_header: llog_lvfs_read_header,
1069         lop_write_rec:   llog_lvfs_write_rec,
1070         lop_next_block:  llog_lvfs_next_block,
1071         lop_prev_block:  llog_lvfs_prev_block,
1072 };
1073 EXPORT_SYMBOL(llog_lvfs_ops);
1074
1075 #else /* !__KERNEL__ */
1076
/*
 * Non-kernel (!__KERNEL__) placeholder for reading an llog header: it only
 * calls LBUG().  The 0 is returned solely if LBUG() comes back.
 */
static int llog_lvfs_read_header(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
1082
/*
 * Non-kernel (!__KERNEL__) placeholder for writing an llog record: all
 * arguments are ignored and LBUG() is called.  The 0 is returned solely if
 * LBUG() comes back.
 */
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie, int cookiecount,
                               void *buf, int idx)
{
        LBUG();
        return 0;
}
1091
/*
 * Non-kernel (!__KERNEL__) placeholder for opening an llog: all arguments
 * are ignored, *res is not set, and LBUG() is called.  The 0 is returned
 * solely if LBUG() comes back.
 */
static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
                          struct llog_logid *logid, char *name, int flags)
{
        LBUG();
        return 0;
}
1098
/*
 * Non-kernel (!__KERNEL__) placeholder for closing an llog: it only calls
 * LBUG().  The 0 is returned solely if LBUG() comes back.
 */
static int llog_lvfs_close(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
1104
/*
 * Non-kernel (!__KERNEL__) placeholder for destroying an llog: it only
 * calls LBUG().  The 0 is returned solely if LBUG() comes back.
 */
static int llog_lvfs_destroy(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
1110
/*
 * Non-kernel (!__KERNEL__) placeholder for reading the catalog list:
 * @idarray is not filled in and LBUG() is called.  The 0 is returned solely
 * if LBUG() comes back.
 */
int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops, const char *name,
                      int count, struct llog_catid *idarray)
{
        LBUG();
        return 0;
}
1118
/*
 * Non-kernel (!__KERNEL__) placeholder for writing the catalog list:
 * nothing is written and LBUG() is called.  The 0 is returned solely if
 * LBUG() comes back.
 */
int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops, const char *name,
                      int count, struct llog_catid *idarray)
{
        LBUG();
        return 0;
}
1126
/*
 * Non-kernel (!__KERNEL__) placeholder for reading the previous llog
 * block: @buf is not filled in and LBUG() is called.  The 0 is returned
 * solely if LBUG() comes back.
 */
int llog_lvfs_prev_block(struct llog_handle *loghandle,
                         int prev_idx, void *buf, int len)
{
        LBUG();
        return 0;
}
1133
/*
 * Non-kernel (!__KERNEL__) placeholder for reading the next llog block:
 * @curr_idx, @offset and @buf are not updated and LBUG() is called.  The 0
 * is returned solely if LBUG() comes back.
 */
int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                         int next_idx, __u64 *offset, void *buf, int len)
{
        LBUG();
        return 0;
}
1140
1141 struct llog_operations llog_lvfs_ops = {
1142         lop_open:        llog_lvfs_open,
1143         lop_destroy:     llog_lvfs_destroy,
1144         lop_close:       llog_lvfs_close,
1145         lop_read_header: llog_lvfs_read_header,
1146         lop_write_rec:   llog_lvfs_write_rec,
1147         lop_next_block:  llog_lvfs_next_block,
1148         lop_prev_block:  llog_lvfs_prev_block,
1149 };
1150 #endif