Whamcloud - gitweb
b=3550
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lvfs.h>
42 #include <linux/lustre_fsfilt.h>
43 #include <linux/lustre_log.h>
44
45 #ifdef __KERNEL__
46
47 static int llog_lvfs_pad(struct llog_ctxt *ctxt, struct l_file *file,
48                          int len, int index)
49 {
50         struct llog_rec_hdr rec;
51         struct llog_rec_tail tail;
52         int rc;
53         ENTRY;
54
55         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
56
57         tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
58         tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
59         rec.lrh_type = 0;
60
61         rc = llog_fsfilt_write_record(ctxt, file, &rec, sizeof(rec),
62                                       &file->f_pos, 0);
63         if (rc) {
64                 CERROR("error writing padding record: rc %d\n", rc);
65                 goto out;
66         }
67
68         file->f_pos += len - sizeof(rec) - sizeof(tail);
69         rc = llog_fsfilt_write_record(ctxt, file, &tail, sizeof(tail),
70                                       &file->f_pos, 0);
71         if (rc) {
72                 CERROR("error writing padding record: rc %d\n", rc);
73                 goto out;
74         }
75
76  out:
77         RETURN(rc);
78 }
79
80 static int llog_lvfs_write_blob(struct llog_ctxt *ctxt, struct l_file *file,
81                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
82 {
83         int rc;
84         struct llog_rec_tail end;
85         loff_t saved_off = file->f_pos;
86         int buflen = le32_to_cpu(rec->lrh_len);
87
88         ENTRY;
89         file->f_pos = off;
90
91         if (!buf) {
92                 rc = llog_fsfilt_write_record(ctxt, file, rec, buflen,
93                                               &file->f_pos, 0);
94                 if (rc) {
95                         CERROR("error writing log record: rc %d\n", rc);
96                         goto out;
97                 }
98                 GOTO(out, rc = 0);
99         }
100
101         /* the buf case */
102         rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
103         rc = llog_fsfilt_write_record(ctxt, file, rec, sizeof(*rec),
104                                       &file->f_pos, 0);
105         if (rc) {
106                 CERROR("error writing log hdr: rc %d\n", rc);
107                 goto out;
108         }
109
110         rc = llog_fsfilt_write_record(ctxt, file, buf, buflen,
111                                       &file->f_pos, 0);
112         if (rc) {
113                 CERROR("error writing log buffer: rc %d\n", rc);
114                 goto out;
115         }
116
117         end.lrt_len = rec->lrh_len;
118         end.lrt_index = rec->lrh_index;
119         rc = llog_fsfilt_write_record(ctxt, file, &end, sizeof(end),
120                                       &file->f_pos, 0);
121         if (rc) {
122                 CERROR("error writing log tail: rc %d\n", rc);
123                 goto out;
124         }
125
126         rc = 0;
127  out:
128         if (saved_off > file->f_pos)
129                 file->f_pos = saved_off;
130         LASSERT(rc <= 0);
131         RETURN(rc);
132 }
133
134 static int llog_lvfs_read_blob(struct llog_ctxt *ctxt, struct l_file *file,
135                                void *buf, int size, loff_t off)
136 {
137         loff_t offset = off;
138         int rc;
139         ENTRY;
140
141         rc = llog_fsfilt_read_record(ctxt, file, buf, size, &offset);
142         if (rc) {
143                 CERROR("error reading log record: rc %d\n", rc);
144                 RETURN(rc);
145         }
146         RETURN(0);
147 }
148
149 static int llog_lvfs_read_header(struct llog_handle *handle)
150 {
151         struct llog_ctxt *ctxt = handle->lgh_ctxt;
152         int rc;
153         ENTRY;
154
155         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
156         LASSERT(ctxt != NULL);
157
158         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
159                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
160                 RETURN(LLOG_EEMPTY);
161         }
162
163         rc = llog_lvfs_read_blob(ctxt, handle->lgh_file, handle->lgh_hdr,
164                                  LLOG_CHUNK_SIZE, 0);
165         if (rc)
166                 CERROR("error reading log header\n");
167
168         handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
169         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
170
171         RETURN(rc);
172 }
173
174 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
175 /* appends if idx == -1, otherwise overwrites record idx. */
176 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
177                                struct llog_rec_hdr *rec,
178                                struct llog_cookie *reccookie,
179                                int cookiecount,
180                                void *buf, int idx)
181 {
182         struct llog_log_hdr *llh;
183         int reclen = le32_to_cpu(rec->lrh_len), index, rc;
184         struct llog_rec_tail *lrt;
185         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
186         struct file *file;
187         loff_t offset;
188         size_t left;
189         ENTRY;
190
191         llh = loghandle->lgh_hdr;
192         file = loghandle->lgh_file;
193
194         /* record length should not bigger than LLOG_CHUNK_SIZE */
195         if (buf)
196                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
197                       - sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
198         else
199                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
200         if (rc)
201                 RETURN(rc);
202
203         if (idx != -1) {
204                 loff_t saved_offset;
205
206                 /* no header: only allowed to insert record 1 */
207                 if (idx > 1 && !file->f_dentry->d_inode->i_size) {
208                         CERROR("idx != -1 in empty log\n");
209                         LBUG();
210                 }
211
212                 if (idx && llh->llh_size && llh->llh_size != reclen)
213                         RETURN(-EINVAL);
214
215                 rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
216                 /* we are done if we only write the header or on error */
217                 if (rc || idx == 0)
218                         RETURN(rc);
219
220                 saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
221                 rc = llog_lvfs_write_blob(ctxt, file, rec, buf, saved_offset);
222                 if (rc == 0 && reccookie) {
223                         reccookie->lgc_lgl = loghandle->lgh_id;
224                         reccookie->lgc_index = idx;
225                         rc = 1;
226                 }
227                 RETURN(rc);
228         }
229
230         /* Make sure that records don't cross a chunk boundary, so we can
231          * process them page-at-a-time if needed.  If it will cross a chunk
232          * boundary, write in a fake (but referenced) entry to pad the chunk.
233          *
234          * We know that llog_current_log() will return a loghandle that is
235          * big enough to hold reclen, so all we care about is padding here.
236          */
237         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
238         if (buf)
239                 reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
240                          sizeof(struct llog_rec_tail);
241
242         /* NOTE: padding is a record, but no bit is set */
243         if (left != 0 && left != reclen &&
244             left < (reclen + LLOG_MIN_REC_SIZE)) {
245                 loghandle->lgh_last_idx++;
246                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
247                 if (rc)
248                         RETURN(rc);
249         }
250
251         loghandle->lgh_last_idx++;
252         index = loghandle->lgh_last_idx;
253         rec->lrh_index = cpu_to_le32(index);
254         if (buf == NULL) {
255                 lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
256                 lrt->lrt_len = rec->lrh_len;
257                 lrt->lrt_index = rec->lrh_index;
258         }
259         if (ext2_set_bit(index, llh->llh_bitmap)) {
260                 CERROR("argh, index %u already set in log bitmap?\n", index);
261                 LBUG(); /* should never happen */
262         }
263         llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
264         llh->llh_tail.lrt_index = cpu_to_le32(index);
265
266         offset = 0;
267         rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
268         if (rc)
269                 RETURN(rc);
270
271         CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n",
272                loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len),
273                file->f_pos);
274
275         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
276         if (rc)
277                 RETURN(rc);
278
279         if (rc == 0 && reccookie) {
280                 if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
281                         LASSERT(EQ_LOGID(reccookie->lgc_lgl, loghandle->lgh_id));
282                         LASSERT(reccookie->lgc_index == index);        
283                 } else {
284                         reccookie->lgc_lgl = loghandle->lgh_id;
285                         reccookie->lgc_index = index;
286                         llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
287                 }
288
289                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
290                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
291                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
292                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
293                 else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
294                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
295                 else
296                         reccookie->lgc_subsys = -1;
297                 rc = 1;
298         }
299         if (rc == 0 && (le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC ||
300             le32_to_cpu(rec->lrh_type) == SMFS_UPDATE_REC))
301                 rc = 1;
302
303         RETURN(rc);
304 }
305
306 /* We can skip reading at least as many log blocks as the number of
307 * minimum sized log records we are skipping.  If it turns out
308 * that we are not far enough along the log (because the
309 * actual records are larger than minimum size) we just skip
310 * some more records. */
311
312 static void llog_skip_over(__u64 *off, int curr, int goal)
313 {
314         if (goal <= curr)
315                 return;
316         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
317                 ~(LLOG_CHUNK_SIZE - 1);
318 }
319
320 /* sets:
321  *  - curr_offset to the furthest point read in the log file
322  *  - curr_idx to the log index preceeding curr_offset
323  * returns -EIO/-EINVAL on error
324  */
325 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
326                                 int next_idx, __u64 *curr_offset, void *buf,
327                                 int len)
328 {
329         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
330         ENTRY;
331
332         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
333                 RETURN(-EINVAL);
334
335         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
336                next_idx, *curr_idx, *curr_offset);
337
338         while (*curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
339                 struct llog_rec_hdr *rec;
340                 struct llog_rec_tail *tail;
341                 loff_t ppos;
342                 int nbytes, rc;
343
344                 llog_skip_over(curr_offset, *curr_idx, next_idx);
345
346                 ppos = *curr_offset;
347                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
348                                              buf, len, &ppos);
349
350                 if (rc) {
351                         CERROR("Cant read llog block at log id "LPU64
352                                "/%u offset "LPU64"\n",
353                                loghandle->lgh_id.lgl_oid,
354                                loghandle->lgh_id.lgl_ogen,
355                                *curr_offset);
356                         RETURN(rc);
357                 }
358
359                 nbytes = ppos - *curr_offset;
360                 *curr_offset = ppos;
361
362                 if (nbytes == 0) /* end of file, nothing to do */
363                         RETURN(0);
364
365                 if (nbytes < sizeof(*tail)) {
366                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
367                                LPU64"\n", loghandle->lgh_id.lgl_oid,
368                                loghandle->lgh_id.lgl_ogen, *curr_offset);
369                         RETURN(-EINVAL);
370                 }
371
372                 tail = buf + nbytes - sizeof(struct llog_rec_tail);
373                 *curr_idx = le32_to_cpu(tail->lrt_index);
374
375                 /* this shouldn't happen */
376                 if (tail->lrt_index == 0) {
377                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
378                                LPU64"\n", loghandle->lgh_id.lgl_oid,
379                                loghandle->lgh_id.lgl_ogen, *curr_offset);
380                         RETURN(-EINVAL);
381                 }
382                 if (le32_to_cpu(tail->lrt_index) < next_idx)
383                         continue;
384
385                 /* sanity check that the start of the new buffer is no farther
386                  * than the record that we wanted.  This shouldn't happen. */
387                 rec = buf;
388                 if (le32_to_cpu(rec->lrh_index) > next_idx) {
389                         CERROR("missed desired record? %u > %u\n",
390                                le32_to_cpu(rec->lrh_index), next_idx);
391                         RETURN(-ENOENT);
392                 }
393                 RETURN(0);
394         }
395         RETURN(-EIO);
396 }
397
398 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
399                                 int prev_idx, void *buf, int len)
400 {
401         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
402         __u64 curr_offset;
403         int rc;
404         ENTRY;
405
406         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
407                 RETURN(-EINVAL);
408
409         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
410
411         curr_offset = LLOG_CHUNK_SIZE;
412         llog_skip_over(&curr_offset, 0, prev_idx);
413
414         while (curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
415                 struct llog_rec_hdr *rec;
416                 struct llog_rec_tail *tail;
417                 loff_t ppos;
418
419                 ppos = curr_offset;
420                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
421                                              buf, len, &ppos);
422
423                 if (rc) {
424                         CERROR("Cant read llog block at log id "LPU64
425                                "/%u offset "LPU64"\n",
426                                loghandle->lgh_id.lgl_oid,
427                                loghandle->lgh_id.lgl_ogen,
428                                curr_offset);
429                         RETURN(rc);
430                 }
431
432                 /* put number of bytes read into rc to make code simpler */
433                 rc = ppos - curr_offset;
434                 curr_offset = ppos;
435
436                 if (rc == 0) /* end of file, nothing to do */
437                         RETURN(0);
438
439                 if (rc < sizeof(*tail)) {
440                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
441                                LPU64"\n", loghandle->lgh_id.lgl_oid,
442                                loghandle->lgh_id.lgl_ogen, curr_offset);
443                         RETURN(-EINVAL);
444                 }
445
446                 tail = buf + rc - sizeof(struct llog_rec_tail);
447
448                 /* this shouldn't happen */
449                 if (tail->lrt_index == 0) {
450                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
451                                LPU64"\n", loghandle->lgh_id.lgl_oid,
452                                loghandle->lgh_id.lgl_ogen, curr_offset);
453                         RETURN(-EINVAL);
454                 }
455                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
456                         continue;
457
458                 /* sanity check that the start of the new buffer is no farther
459                  * than the record that we wanted.  This shouldn't happen. */
460                 rec = buf;
461                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
462                         CERROR("missed desired record? %u > %u\n",
463                                le32_to_cpu(rec->lrh_index), prev_idx);
464                         RETURN(-ENOENT);
465                 }
466                 RETURN(0);
467         }
468         RETURN(-EIO);
469 }
470
471 static struct file *llog_filp_open(char *name, int flags, int mode)
472 {
473         char *logname;
474         struct file *filp;
475         int len;
476
477         OBD_ALLOC(logname, PATH_MAX);
478         if (logname == NULL)
479                 return ERR_PTR(-ENOMEM);
480
481         len = snprintf(logname, PATH_MAX, "LOGS/%s", name);
482         if (len >= PATH_MAX - 1) {
483                 filp = ERR_PTR(-ENAMETOOLONG);
484         } else {
485                 filp = l_filp_open(logname, flags, mode);
486                 if (IS_ERR(filp)) {
487                         CERROR("logfile %s(%s): %ld\n",
488                                flags & O_CREAT ? "create" : "open", logname,
489                                PTR_ERR(filp));
490                 }
491         }
492
493         OBD_FREE(logname, PATH_MAX);
494         return filp;
495 }
496
497 /* creates object for the case when we have no obd (smfs). */
498 static struct file *
499 llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
500 {
501         struct file *filp;
502         int rc = 0;
503         ENTRY;
504
505         LASSERT(lgh_id != NULL);
506         if (lgh_id->lgl_oid) {
507                 struct dentry *dchild;
508                 char fidname[LL_FID_NAMELEN];
509                 int fidlen = 0;
510
511                 down(&ctxt->loc_objects_dir->d_inode->i_sem);
512                 fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
513                 dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
514                 if (IS_ERR(dchild)) {
515                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
516                         RETURN((struct file *)dchild);
517                 }
518                 if (dchild->d_inode == NULL) {
519                         struct dentry_params dp;
520                         struct inode *inode;
521
522                         dchild->d_fsdata = (void *) &dp;
523                         dp.p_ptr = NULL;
524                         dp.p_inum = lgh_id->lgl_oid;
525                         rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
526                                            dchild, S_IFREG, NULL);
527                         if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
528                                 dchild->d_fsdata = NULL;
529                         if (rc) {
530                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
531                                 dput(dchild);
532                                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
533                                 RETURN(ERR_PTR(rc));
534                         }
535                         inode = dchild->d_inode;
536                         LASSERT(inode->i_ino == lgh_id->lgl_oid);
537                         inode->i_generation = lgh_id->lgl_ogen;
538                         CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
539                                inode->i_ino, inode->i_generation);
540                         mark_inode_dirty(inode);
541                 }
542
543                 mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
544                 filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
545                                     O_RDWR | O_LARGEFILE);
546                 if (IS_ERR(filp)) {
547                         dput(dchild);
548                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
549                         RETURN(filp);
550                 }
551                 if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
552                         CERROR("%s is not a regular file!: mode = %o\n", fidname,
553                                filp->f_dentry->d_inode->i_mode);
554                         filp_close(filp, 0);
555                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
556                         RETURN(ERR_PTR(-ENOENT));
557                 }
558
559                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
560                 RETURN(filp);
561
562         } else {
563                 unsigned int tmpname = ll_insecure_random_int();
564                 char fidname[LL_FID_NAMELEN];
565                 struct dentry *new_child, *parent;
566                 void *handle;
567                 int err, namelen;
568
569                 sprintf(fidname, "OBJECTS/%u", tmpname);
570                 filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
571                 if (IS_ERR(filp)) {
572                         rc = PTR_ERR(filp);
573                         if (rc == -EEXIST) {
574                                 CERROR("impossible object name collision %u\n",
575                                         tmpname);
576                                 LBUG();
577                         }
578                         CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
579                         RETURN(filp);
580                 }
581
582                 namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
583                                      filp->f_dentry->d_inode->i_generation);
584                 parent = filp->f_dentry->d_parent;
585                 down(&parent->d_inode->i_sem);
586                 new_child = lookup_one_len(fidname, parent, namelen);
587                 if (IS_ERR(new_child)) {
588                         CERROR("getting neg dentry for obj rename: %d\n", rc);
589                         GOTO(out_close, rc = PTR_ERR(new_child));
590                 }
591                 if (new_child->d_inode != NULL) {
592                         CERROR("impossible non-negative obj dentry %lu:%u!\n",
593                                 filp->f_dentry->d_inode->i_ino,
594                                 filp->f_dentry->d_inode->i_generation);
595                         LBUG();
596                 }
597
598                 handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
599                 if (IS_ERR(handle))
600                         GOTO(out_dput, rc = PTR_ERR(handle));
601
602                 lock_kernel();
603                 rc = vfs_rename(parent->d_inode, filp->f_dentry,
604                                 parent->d_inode, new_child);
605                 unlock_kernel();
606                 if (rc)
607                         CERROR("error renaming new object %lu:%u: rc %d\n",
608                                 filp->f_dentry->d_inode->i_ino,
609                                 filp->f_dentry->d_inode->i_generation, rc);
610
611                 err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
612                 if (!rc)
613                         rc = err;
614
615         out_dput:
616                 dput(new_child);
617         out_close:
618                 up(&parent->d_inode->i_sem);
619                 if (rc) {
620                         filp_close(filp, 0);
621                         filp = ERR_PTR(rc);
622                 } else {
623                         /* FIXME: is this group 1 is correct? */
624                         lgh_id->lgl_ogr = 1;
625                         lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
626                         lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
627                 }
628                 RETURN(filp);
629         }
630 }
631
632 /* creates object for generic case (obd exists) */
633 static struct file *
634 llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
635 {
636         struct file *filp;
637         struct dentry *dchild;
638         struct obd_device *obd;
639         struct obdo *oa = NULL;
640         int open_flags = O_RDWR | O_LARGEFILE;
641         int rc = 0;
642         ENTRY;
643
644         obd = ctxt->loc_exp->exp_obd;
645         LASSERT(obd != NULL);
646
647         if (lgh_id->lgl_oid) {
648                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
649                                              lgh_id->lgl_ogen, lgh_id->lgl_ogr);
650                 if (IS_ERR(dchild) == -ENOENT) {
651                         OBD_ALLOC(oa, sizeof(*oa));
652                         if (!oa)
653                                 RETURN(ERR_PTR(-ENOMEM));
654
655                         oa->o_id = lgh_id->lgl_oid;
656                         oa->o_generation = lgh_id->lgl_ogen;
657                         oa->o_gr = lgh_id->lgl_ogr;
658                         oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
659                         rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
660                         if (rc) {
661                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
662                                 GOTO(out_free_oa, rc);
663                         }
664                         CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
665                                lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
666
667                         dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
668                                                      lgh_id->lgl_ogen, lgh_id->lgl_ogr);
669                 } else if (IS_ERR(dchild)) {
670                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
671                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
672                         RETURN((struct file *)dchild);
673                 }
674
675                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
676                 if (IS_ERR(filp)) {
677                         l_dput(dchild);
678                         rc = PTR_ERR(filp);
679                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
680                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
681                 }
682                 GOTO(out_free_oa, rc);
683         } else {
684                 /* this is important to work here over obd_create() as it manages 
685                   groups and we need it. Yet another reason is that mds_obd_create()
686                  is fully the same as old version of this function and this helps
687                  us to avoid code duplicating and layering violating. */
688                 OBD_ALLOC(oa, sizeof(*oa));
689                 if (!oa)
690                         RETURN(ERR_PTR(-ENOMEM));
691                 
692                 oa->o_gr = FILTER_GROUP_LLOG;
693                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
694                 rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
695                 if (rc)
696                         GOTO(out_free_oa, rc);
697
698                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
699                                              oa->o_generation, oa->o_gr);
700                 if (IS_ERR(dchild))
701                         GOTO(out_free_oa, rc = PTR_ERR(dchild));
702
703                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
704                                      open_flags);
705                 if (IS_ERR(filp)) {
706                         l_dput(dchild);
707                         GOTO(out_free_oa, rc = PTR_ERR(filp));
708                 }
709
710                 /* group 1 is not longer valid, we use the group which is set 
711                 by obd_create()->mds_obd_create(). */
712                 lgh_id->lgl_ogr = oa->o_gr;
713                 lgh_id->lgl_oid = oa->o_id;
714                 lgh_id->lgl_ogen = oa->o_generation;
715         }
716
717 out_free_oa:
718         if (rc)
719                 filp = ERR_PTR(rc);
720         if (oa)
721                 OBD_FREE(oa, sizeof(*oa));
722         RETURN(filp);
723 }
724
725 static struct file *
726 llog_object_create(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
727 {
728         if (ctxt->loc_alone)
729                 return llog_object_create_alone(ctxt, lgh_id);
730         else
731                 return llog_object_create_generic(ctxt, lgh_id);
732 }
733
734 static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
735                                 struct dentry *dentry)
736 {
737         struct dentry *new_child;
738         char fidname[LL_FID_NAMELEN];
739         void *handle;
740         int namelen, rc = 0, err;
741         ENTRY;
742         
743         namelen = ll_fid2str(fidname, logid.lgl_oid, logid.lgl_ogen);
744         down(&ctxt->loc_objects_dir->d_inode->i_sem);
745         new_child = lookup_one_len(fidname, ctxt->loc_objects_dir, namelen);
746         if (IS_ERR(new_child)) {
747                 CERROR("getting neg dentry for obj rename: %d\n", rc);
748                 GOTO(out, rc = PTR_ERR(new_child));
749         }
750         if (new_child->d_inode == dentry->d_inode)
751                 GOTO(out_dput, rc);
752         if (new_child->d_inode != NULL) {
753                 CERROR("impossible non-negative obj dentry "LPX64":%u!\n",
754                        logid.lgl_oid, logid.lgl_ogen);
755                 LBUG();
756         }
757         handle = llog_fsfilt_start(ctxt, ctxt->loc_objects_dir->d_inode,
758                                    FSFILT_OP_LINK, NULL);
759         if (IS_ERR(handle))
760                 GOTO(out_dput, rc = PTR_ERR(handle));
761         
762         lock_kernel();
763         rc = vfs_link(dentry, ctxt->loc_objects_dir->d_inode, new_child);
764         unlock_kernel();
765         if (rc)
766                 CERROR("error link new object "LPX64":%u: rc %d\n",
767                        logid.lgl_oid, logid.lgl_ogen, rc);
768         err = llog_fsfilt_commit(ctxt, ctxt->loc_objects_dir->d_inode, handle, 0);
769 out_dput:
770         l_dput(new_child);
771 out:
772         up(&ctxt->loc_objects_dir->d_inode->i_sem);
773         RETURN(rc);
774 }
775
776 static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
777                           struct llog_logid *logid, char *name, int flags)
778 {
779         struct llog_handle *handle;
780         struct lvfs_run_ctxt saved;
781         int rc = 0;
782         int open_flags = O_RDWR | O_LARGEFILE;
783         ENTRY;
784
785         if (flags & OBD_LLOG_FL_CREATE)
786                 open_flags |= O_CREAT;
787
788         handle = llog_alloc_handle();
789         if (handle == NULL)
790                 RETURN(-ENOMEM);
791         *res = handle;
792         
793         LASSERT(ctxt);
794         if (ctxt->loc_lvfs_ctxt)
795                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
796         
797         if (logid != NULL) {
798                 handle->lgh_file = llog_object_create(ctxt, logid);
799                 if (IS_ERR(handle->lgh_file)) {
800                         CERROR("cannot create/open llog object "LPX64":%x "
801                                "error = %ld", logid->lgl_oid, logid->lgl_ogen,
802                                PTR_ERR(handle->lgh_file));
803                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
804                 }
805                 handle->lgh_id = *logid;
806
807         } else if (name) {
808                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
809                 if (IS_ERR(handle->lgh_file)) {
810                         CERROR("cannot open %s file, error = %ld\n", 
811                                name, PTR_ERR(handle->lgh_file));
812                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
813                 }
814                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_logs_dir);
815                 
816                 handle->lgh_id.lgl_ogr = 1;
817                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
818                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
819                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
820                 if (rc)
821                         GOTO(cleanup, rc);
822
823         } else {
824                 handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
825                 if (IS_ERR(handle->lgh_file)) {
826                         CERROR("cannot create llog object, error = %ld\n", 
827                                PTR_ERR(handle->lgh_file));
828                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
829                 }
830         }
831
832         handle->lgh_ctxt = ctxt;
833 finish:
834         if (ctxt->loc_lvfs_ctxt)
835                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
836         RETURN(rc);
837 cleanup:
838         llog_free_handle(handle);
839         goto finish;
840 }
841
842 static int llog_lvfs_close(struct llog_handle *handle)
843 {
844         int rc;
845         ENTRY;
846
847         rc = filp_close(handle->lgh_file, 0);
848         if (rc)
849                 CERROR("error closing log: rc %d\n", rc);
850         RETURN(rc);
851 }
852
853 static int llog_lvfs_destroy(struct llog_handle *loghandle)
854 {
855         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
856         struct lvfs_run_ctxt saved;
857         struct dentry *fdentry;
858         struct inode *parent_inode;
859         char fidname[LL_FID_NAMELEN];
860         void *handle;
861         int rc = -EINVAL, err, namelen;
862         ENTRY;
863         
864         if (ctxt->loc_lvfs_ctxt)
865                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
866         
867         fdentry = loghandle->lgh_file->f_dentry;
868         parent_inode = fdentry->d_parent->d_inode;
869         
870         if (!strcmp(fdentry->d_parent->d_name.name, "LOGS")) {
871                 LASSERT(parent_inode == ctxt->loc_logs_dir->d_inode);
872                 
873                 namelen = ll_fid2str(fidname, fdentry->d_inode->i_ino,
874                                      fdentry->d_inode->i_generation);
875                 dget(fdentry);
876                 rc = llog_lvfs_close(loghandle);
877                 if (rc) {
878                         dput(fdentry);
879                         GOTO(out, rc);
880                 }
881                 
882                 handle = llog_fsfilt_start(ctxt, parent_inode,
883                                            FSFILT_OP_UNLINK, NULL);
884                 if (IS_ERR(handle)) {
885                         dput(fdentry);
886                         GOTO(out, rc = PTR_ERR(handle));
887                 }
888                 
889                 down(&parent_inode->i_sem);
890                 rc = vfs_unlink(parent_inode, fdentry);
891                 up(&parent_inode->i_sem);
892                 dput(fdentry);
893                 
894                 if (!rc) {
895                         down(&ctxt->loc_objects_dir->d_inode->i_sem);
896                         fdentry = lookup_one_len(fidname, ctxt->loc_objects_dir,
897                                                  namelen);
898                         if (fdentry == NULL || fdentry->d_inode == NULL) {
899                                 CERROR("destroy non_existent object %s\n", fidname);
900                                 GOTO(out_err, rc = IS_ERR(fdentry) ?
901                                      PTR_ERR(fdentry) : -ENOENT);
902                         }
903                         rc = vfs_unlink(ctxt->loc_objects_dir->d_inode, fdentry);
904                         l_dput(fdentry);
905 out_err:
906                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
907                 }
908                 err = llog_fsfilt_commit(ctxt, parent_inode, handle, 0);
909                 if (err && !rc)
910                         err = rc;
911                 
912                 GOTO(out, rc);
913         }
914         if (ctxt->loc_alone) {
915                 if (!strcmp(fdentry->d_parent->d_name.name, "OBJECTS")) {
916                         LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
917                         
918                         dget(fdentry);
919                         rc = llog_lvfs_close(loghandle);
920                         if (rc == 0) {
921                                 down(&parent_inode->i_sem);
922                                 rc = vfs_unlink(parent_inode, fdentry);
923                                 up(&parent_inode->i_sem);
924                         }
925                         dput(fdentry);
926                 }
927         } else {
928                 struct obdo *oa = NULL;
929  
930                 OBD_ALLOC(oa, sizeof(*oa));
931                 if (!oa)
932                         GOTO(out, rc = -ENOMEM);
933                 
934                 oa->o_id = loghandle->lgh_id.lgl_oid;
935                 oa->o_gr = loghandle->lgh_id.lgl_ogr;
936                 oa->o_generation = loghandle->lgh_id.lgl_ogen;
937                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
938                 
939                 rc = llog_lvfs_close(loghandle);
940                 if (rc)
941                         GOTO(out_free_oa, rc);
942                 
943                 rc = obd_destroy(loghandle->lgh_ctxt->loc_exp, oa, NULL, NULL);
944 out_free_oa:
945                 OBD_FREE(oa, sizeof(*oa));
946         }
947 out:
948         if (ctxt->loc_lvfs_ctxt)
949                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
950         RETURN(rc);
951 }
952
953 /* reads the catalog list */
954 int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
955                       struct fsfilt_operations *fsops, char *name,
956                       int count, struct llog_catid *idarray)
957 {
958         struct lvfs_run_ctxt saved;
959         struct l_file *file;
960         int size = sizeof(*idarray) * count;
961         loff_t off = 0;
962         int rc;
963
964         LASSERT(count);
965
966         if (ctxt)
967                 push_ctxt(&saved, ctxt, NULL);
968         file = l_filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
969         if (!file || IS_ERR(file)) {
970                 rc = PTR_ERR(file);
971                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
972                        name, rc);
973                 GOTO(out, rc);
974         }
975
976         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
977                 CERROR("%s is not a regular file!: mode = %o\n", name,
978                        file->f_dentry->d_inode->i_mode);
979                 GOTO(out, rc = -ENOENT);
980         }
981
982         rc = fsops->fs_read_record(file, idarray, size, &off);
983         if (rc) {
984                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
985                        name, rc);
986                 GOTO(out, rc);
987         }
988
989  out:
990         if (file && !IS_ERR(file))
991                 rc = filp_close(file, 0);
992         if (ctxt)
993                 pop_ctxt(&saved, ctxt, NULL);
994         RETURN(rc);
995 }
996 EXPORT_SYMBOL(llog_get_cat_list);
997
998 /* writes the cat list */
999 int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
1000                       struct fsfilt_operations *fsops, char *name,
1001                       int count, struct llog_catid *idarray)
1002 {
1003         struct lvfs_run_ctxt saved;
1004         struct l_file *file;
1005         int size = sizeof(*idarray) * count;
1006         loff_t off = 0;
1007         int rc;
1008
1009         LASSERT(count);
1010
1011         if (ctxt)
1012                 push_ctxt(&saved, ctxt, NULL);
1013         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
1014         if (!file || IS_ERR(file)) {
1015                 rc = PTR_ERR(file);
1016                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
1017                        name, rc);
1018                 GOTO(out, rc);
1019         }
1020
1021         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
1022                 CERROR("%s is not a regular file!: mode = %o\n", name,
1023                        file->f_dentry->d_inode->i_mode);
1024                 GOTO(out, rc = -ENOENT);
1025         }
1026
1027         rc = fsops->fs_write_record(file, idarray, size, &off, 1);
1028         if (rc) {
1029                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
1030                        name, rc);
1031                 GOTO(out, rc);
1032         }
1033
1034  out:
1035         if (file && !IS_ERR(file))
1036                 rc = filp_close(file, 0);
1037         if (ctxt)
1038                 pop_ctxt(&saved, ctxt, NULL);
1039         RETURN(rc);
1040 }
1041 EXPORT_SYMBOL(llog_put_cat_list);
1042
1043 struct llog_operations llog_lvfs_ops = {
1044         lop_open:        llog_lvfs_open,
1045         lop_destroy:     llog_lvfs_destroy,
1046         lop_close:       llog_lvfs_close,
1047         lop_read_header: llog_lvfs_read_header,
1048         lop_write_rec:   llog_lvfs_write_rec,
1049         lop_next_block:  llog_lvfs_next_block,
1050         lop_prev_block:  llog_lvfs_prev_block,
1051 };
1052 EXPORT_SYMBOL(llog_lvfs_ops);
1053
1054 #else /* !__KERNEL__ */
1055
/*
 * Non-kernel build placeholder for reading a log header: it only trips
 * LBUG() and, should that return, reports 0.
 */
static int llog_lvfs_read_header(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1061
/*
 * Non-kernel build placeholder for writing a log record: it only trips
 * LBUG() and, should that return, reports 0.  No argument is examined.
 */
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie, int cookiecount,
                               void *buf, int idx)
{
        int rc = 0;

        LBUG();
        return rc;
}
1070
/*
 * Non-kernel build placeholder for opening a log: it only trips LBUG() and,
 * should that return, reports 0.  *res is never written.
 */
static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
                          struct llog_logid *logid, char *name, int flags)
{
        int rc = 0;

        LBUG();
        return rc;
}
1077
/*
 * Non-kernel build placeholder for closing a log: it only trips LBUG() and,
 * should that return, reports 0.
 */
static int llog_lvfs_close(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1083
/*
 * Non-kernel build placeholder for destroying a log: it only trips LBUG()
 * and, should that return, reports 0.
 */
static int llog_lvfs_destroy(struct llog_handle *handle)
{
        int rc = 0;

        LBUG();
        return rc;
}
1089
/*
 * Non-kernel build placeholder for reading the catalog list: it only trips
 * LBUG() and, should that return, reports 0.  @idarray is left untouched.
 */
int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops, char *name,
                      int count, struct llog_catid *idarray)
{
        int rc = 0;

        LBUG();
        return rc;
}
1097
/*
 * Non-kernel build placeholder for writing the catalog list: it only trips
 * LBUG() and, should that return, reports 0.  Nothing is written.
 */
int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
                      struct fsfilt_operations *fsops, char *name,
                      int count, struct llog_catid *idarray)
{
        int rc = 0;

        LBUG();
        return rc;
}
1105
/*
 * Non-kernel build placeholder for fetching the previous log block: it only
 * trips LBUG() and, should that return, reports 0.  @buf is left untouched.
 */
int llog_lvfs_prev_block(struct llog_handle *loghandle,
                         int prev_idx, void *buf, int len)
{
        int rc = 0;

        LBUG();
        return rc;
}
1112
1113 int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
1114                          int next_idx, __u64 *offset, void *buf, int len)
1115 {
1116         LBUG();
1117         return 0;
1118 }
1119
1120 struct llog_operations llog_lvfs_ops = {
1121         lop_open:        llog_lvfs_open,
1122         lop_destroy:     llog_lvfs_destroy,
1123         lop_close:       llog_lvfs_close,
1124         lop_read_header: llog_lvfs_read_header,
1125         lop_write_rec:   llog_lvfs_write_rec,
1126         lop_next_block:  llog_lvfs_next_block,
1127         lop_prev_block:  llog_lvfs_prev_block,
1128 };
1129 #endif