Whamcloud - gitweb
LU-911 obdapi: add env to few methods
[fs/lustre-release.git] / lustre / obdclass / llog_lvfs.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/obdclass/llog_lvfs.c
35  *
36  * OST<->MDS recovery logging infrastructure.
37  * Invariants in implementation:
38  * - we do not share logs among different OST<->MDS connections, so that
39  *   if an OST or MDS fails it need only look at log(s) relevant to itself
40  *
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOG
45
46 #ifndef EXPORT_SYMTAB
47 #define EXPORT_SYMTAB
48 #endif
49
50 #ifndef __KERNEL__
51 #include <liblustre.h>
52 #endif
53
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <lustre_log.h>
57 #include <obd_ost.h>
58 #include <libcfs/list.h>
59 #include <lvfs.h>
60 #include <lustre_fsfilt.h>
61 #include <lustre_disk.h>
62 #include "llog_internal.h"
63
64 #if defined(__KERNEL__) && defined(LLOG_LVFS)
65
66 static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
67                                 int len, int index)
68 {
69         struct llog_rec_hdr rec = { 0 };
70         struct llog_rec_tail tail;
71         int rc;
72         ENTRY;
73
74         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
75
76         tail.lrt_len = rec.lrh_len = len;
77         tail.lrt_index = rec.lrh_index = index;
78         rec.lrh_type = LLOG_PAD_MAGIC;
79
80         rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
81         if (rc) {
82                 CERROR("error writing padding record: rc %d\n", rc);
83                 goto out;
84         }
85
86         file->f_pos += len - sizeof(rec) - sizeof(tail);
87         rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0);
88         if (rc) {
89                 CERROR("error writing padding record: rc %d\n", rc);
90                 goto out;
91         }
92
93  out:
94         RETURN(rc);
95 }
96
97 static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
98                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
99 {
100         int rc;
101         struct llog_rec_tail end;
102         loff_t saved_off = file->f_pos;
103         int buflen = rec->lrh_len;
104
105         ENTRY;
106
107         file->f_pos = off;
108
109         if (buflen == 0)
110                 CWARN("0-length record\n");
111
112         if (!buf) {
113                 rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
114                 if (rc) {
115                         CERROR("error writing log record: rc %d\n", rc);
116                         goto out;
117                 }
118                 GOTO(out, rc = 0);
119         }
120
121         /* the buf case */
122         rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
123         rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
124         if (rc) {
125                 CERROR("error writing log hdr: rc %d\n", rc);
126                 goto out;
127         }
128
129         rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
130         if (rc) {
131                 CERROR("error writing log buffer: rc %d\n", rc);
132                 goto out;
133         }
134
135         end.lrt_len = rec->lrh_len;
136         end.lrt_index = rec->lrh_index;
137         rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
138         if (rc) {
139                 CERROR("error writing log tail: rc %d\n", rc);
140                 goto out;
141         }
142
143         rc = 0;
144  out:
145         if (saved_off > file->f_pos)
146                 file->f_pos = saved_off;
147         LASSERT(rc <= 0);
148         RETURN(rc);
149 }
150
151 static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
152                                 void *buf, int size, loff_t off)
153 {
154         loff_t offset = off;
155         int rc;
156         ENTRY;
157
158         rc = fsfilt_read_record(obd, file, buf, size, &offset);
159         if (rc) {
160                 CERROR("error reading log record: rc %d\n", rc);
161                 RETURN(rc);
162         }
163         RETURN(0);
164 }
165
166 static int llog_lvfs_read_header(struct llog_handle *handle)
167 {
168         struct obd_device *obd;
169         int rc;
170         ENTRY;
171
172         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
173
174         obd = handle->lgh_ctxt->loc_exp->exp_obd;
175
176         if (i_size_read(handle->lgh_file->f_dentry->d_inode) == 0) {
177                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
178                 RETURN(LLOG_EEMPTY);
179         }
180
181         rc = llog_lvfs_read_blob(obd, handle->lgh_file, handle->lgh_hdr,
182                                  LLOG_CHUNK_SIZE, 0);
183         if (rc) {
184                 CERROR("error reading log header from %.*s\n",
185                        handle->lgh_file->f_dentry->d_name.len,
186                        handle->lgh_file->f_dentry->d_name.name);
187         } else {
188                 struct llog_rec_hdr *llh_hdr = &handle->lgh_hdr->llh_hdr;
189
190                 if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
191                         lustre_swab_llog_hdr(handle->lgh_hdr);
192
193                 if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
194                         CERROR("bad log %.*s header magic: %#x (expected %#x)\n",
195                                handle->lgh_file->f_dentry->d_name.len,
196                                handle->lgh_file->f_dentry->d_name.name,
197                                llh_hdr->lrh_type, LLOG_HDR_MAGIC);
198                         rc = -EIO;
199                 } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
200                         CERROR("incorrectly sized log %.*s header: %#x "
201                                "(expected %#x)\n",
202                                handle->lgh_file->f_dentry->d_name.len,
203                                handle->lgh_file->f_dentry->d_name.name,
204                                llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
205                         CERROR("you may need to re-run lconf --write_conf.\n");
206                         rc = -EIO;
207                 }
208         }
209
210         handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
211         handle->lgh_file->f_pos = i_size_read(handle->lgh_file->f_dentry->d_inode);
212
213         RETURN(rc);
214 }
215
216 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
217 /* appends if idx == -1, otherwise overwrites record idx. */
218 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
219                                struct llog_rec_hdr *rec,
220                                struct llog_cookie *reccookie, int cookiecount,
221                                void *buf, int idx)
222 {
223         struct llog_log_hdr *llh;
224         int reclen = rec->lrh_len, index, rc;
225         struct llog_rec_tail *lrt;
226         struct obd_device *obd;
227         struct file *file;
228         size_t left;
229         ENTRY;
230
231         llh = loghandle->lgh_hdr;
232         file = loghandle->lgh_file;
233         obd = loghandle->lgh_ctxt->loc_exp->exp_obd;
234
235         /* record length should not bigger than LLOG_CHUNK_SIZE */
236         if (buf)
237                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
238                       sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
239         else
240                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
241         if (rc)
242                 RETURN(rc);
243
244         if (buf)
245                 /* write_blob adds header and tail to lrh_len. */
246                 reclen = sizeof(*rec) + rec->lrh_len +
247                          sizeof(struct llog_rec_tail);
248
249         if (idx != -1) {
250                 loff_t saved_offset;
251
252                 /* no header: only allowed to insert record 1 */
253                 if (idx != 1 && !i_size_read(file->f_dentry->d_inode)) {
254                         CERROR("idx != -1 in empty log\n");
255                         LBUG();
256                 }
257
258                 if (idx && llh->llh_size && llh->llh_size != rec->lrh_len)
259                         RETURN(-EINVAL);
260
261                 if (!ext2_test_bit(idx, llh->llh_bitmap))
262                         CERROR("Modify unset record %u\n", idx);
263                 if (idx != rec->lrh_index)
264                         CERROR("Index mismatch %d %u\n", idx, rec->lrh_index);
265
266                 rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
267                 /* we are done if we only write the header or on error */
268                 if (rc || idx == 0)
269                         RETURN(rc);
270
271                 /* Assumes constant lrh_len */
272                 saved_offset = sizeof(*llh) + (idx - 1) * reclen;
273
274                 if (buf) {
275                         struct llog_rec_hdr check;
276
277                         /* We assume that caller has set lgh_cur_* */
278                         saved_offset = loghandle->lgh_cur_offset;
279                         CDEBUG(D_OTHER,
280                                "modify record "LPX64": idx:%d/%u/%d, len:%u "
281                                "offset %llu\n",
282                                loghandle->lgh_id.lgl_oid, idx, rec->lrh_index,
283                                loghandle->lgh_cur_idx, rec->lrh_len,
284                                (long long)(saved_offset - sizeof(*llh)));
285                         if (rec->lrh_index != loghandle->lgh_cur_idx) {
286                                 CERROR("modify idx mismatch %u/%d\n",
287                                        idx, loghandle->lgh_cur_idx);
288                                 RETURN(-EFAULT);
289                         }
290 #if 1  /* FIXME remove this safety check at some point */
291                         /* Verify that the record we're modifying is the
292                            right one. */
293                         rc = llog_lvfs_read_blob(obd, file, &check,
294                                                  sizeof(check), saved_offset);
295                         if (check.lrh_index != idx || check.lrh_len != reclen) {
296                                 CERROR("Bad modify idx %u/%u size %u/%u (%d)\n",
297                                        idx, check.lrh_index, reclen,
298                                        check.lrh_len, rc);
299                                 RETURN(-EFAULT);
300                         }
301 #endif
302                 }
303
304                 rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
305                 if (rc == 0 && reccookie) {
306                         reccookie->lgc_lgl = loghandle->lgh_id;
307                         reccookie->lgc_index = idx;
308                         rc = 1;
309                 }
310                 RETURN(rc);
311         }
312
313         /* Make sure that records don't cross a chunk boundary, so we can
314          * process them page-at-a-time if needed.  If it will cross a chunk
315          * boundary, write in a fake (but referenced) entry to pad the chunk.
316          *
317          * We know that llog_current_log() will return a loghandle that is
318          * big enough to hold reclen, so all we care about is padding here.
319          */
320         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
321
322         /* NOTE: padding is a record, but no bit is set */
323         if (left != 0 && left != reclen &&
324             left < (reclen + LLOG_MIN_REC_SIZE)) {
325                  index = loghandle->lgh_last_idx + 1;
326                  rc = llog_lvfs_pad(obd, file, left, index);
327                  if (rc)
328                          RETURN(rc);
329                  loghandle->lgh_last_idx++; /*for pad rec*/
330          }
331          /* if it's the last idx in log file, then return -ENOSPC */
332          if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1)
333                  RETURN(-ENOSPC);
334         loghandle->lgh_last_idx++;
335         index = loghandle->lgh_last_idx;
336         LASSERT(index < LLOG_BITMAP_SIZE(llh));
337         rec->lrh_index = index;
338         if (buf == NULL) {
339                 lrt = (struct llog_rec_tail *)
340                         ((char *)rec + rec->lrh_len - sizeof(*lrt));
341                 lrt->lrt_len = rec->lrh_len;
342                 lrt->lrt_index = rec->lrh_index;
343         }
344         /*The caller should make sure only 1 process access the lgh_last_idx,
345          *Otherwise it might hit the assert.*/
346         LASSERT(index < LLOG_BITMAP_SIZE(llh));
347         if (ext2_set_bit(index, llh->llh_bitmap)) {
348                 CERROR("argh, index %u already set in log bitmap?\n", index);
349                 LBUG(); /* should never happen */
350         }
351         llh->llh_count++;
352         llh->llh_tail.lrt_index = index;
353
354         rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
355         if (rc)
356                 RETURN(rc);
357
358         rc = llog_lvfs_write_blob(obd, file, rec, buf, file->f_pos);
359         if (rc)
360                 RETURN(rc);
361
362         CDEBUG(D_RPCTRACE, "added record "LPX64": idx: %u, %u \n",
363                loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
364         if (rc == 0 && reccookie) {
365                 reccookie->lgc_lgl = loghandle->lgh_id;
366                 reccookie->lgc_index = index;
367                 if ((rec->lrh_type == MDS_UNLINK_REC) ||
368                     (rec->lrh_type == MDS_SETATTR_REC) ||
369                     (rec->lrh_type == MDS_SETATTR64_REC))
370                         reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
371                 else if (rec->lrh_type == OST_SZ_REC)
372                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
373                 else if (rec->lrh_type == OST_RAID1_REC)
374                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
375                 else
376                         reccookie->lgc_subsys = -1;
377                 rc = 1;
378         }
379         if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
380                 rc = 1;
381
382         RETURN(rc);
383 }
384
385 /* We can skip reading at least as many log blocks as the number of
386 * minimum sized log records we are skipping.  If it turns out
387 * that we are not far enough along the log (because the
388 * actual records are larger than minimum size) we just skip
389 * some more records. */
390
391 static void llog_skip_over(__u64 *off, int curr, int goal)
392 {
393         if (goal <= curr)
394                 return;
395         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
396                 ~(LLOG_CHUNK_SIZE - 1);
397 }
398
399
400 /* sets:
401  *  - cur_offset to the furthest point read in the log file
402  *  - cur_idx to the log index preceeding cur_offset
403  * returns -EIO/-EINVAL on error
404  */
405 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
406                                 int next_idx, __u64 *cur_offset, void *buf,
407                                 int len)
408 {
409         int rc;
410         ENTRY;
411
412         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
413                 RETURN(-EINVAL);
414
415         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
416                next_idx, *cur_idx, *cur_offset);
417
418         while (*cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
419                 struct llog_rec_hdr *rec;
420                 struct llog_rec_tail *tail;
421                 loff_t ppos;
422
423                 llog_skip_over(cur_offset, *cur_idx, next_idx);
424
425                 ppos = *cur_offset;
426                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
427                                         loghandle->lgh_file, buf, len,
428                                         &ppos);
429                 if (rc) {
430                         CERROR("Cant read llog block at log id "LPU64
431                                "/%u offset "LPU64"\n",
432                                loghandle->lgh_id.lgl_oid,
433                                loghandle->lgh_id.lgl_ogen,
434                                *cur_offset);
435                         RETURN(rc);
436                 }
437
438                 /* put number of bytes read into rc to make code simpler */
439                 rc = ppos - *cur_offset;
440                 *cur_offset = ppos;
441
442                 if (rc < len) {
443                         /* signal the end of the valid buffer to llog_process */
444                         memset(buf + rc, 0, len - rc);
445                 }
446
447                 if (rc == 0) /* end of file, nothing to do */
448                         RETURN(0);
449
450                 if (rc < sizeof(*tail)) {
451                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
452                                LPU64"\n", loghandle->lgh_id.lgl_oid,
453                                loghandle->lgh_id.lgl_ogen, *cur_offset);
454                         RETURN(-EINVAL);
455                 }
456
457                 rec = buf;
458                 tail = (struct llog_rec_tail *)((char *)buf + rc -
459                                                 sizeof(struct llog_rec_tail));
460
461                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) {
462                         lustre_swab_llog_rec(rec, tail);
463                 }
464
465                 *cur_idx = tail->lrt_index;
466
467                 /* this shouldn't happen */
468                 if (tail->lrt_index == 0) {
469                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
470                                LPU64"\n", loghandle->lgh_id.lgl_oid,
471                                loghandle->lgh_id.lgl_ogen, *cur_offset);
472                         RETURN(-EINVAL);
473                 }
474                 if (tail->lrt_index < next_idx)
475                         continue;
476
477                 /* sanity check that the start of the new buffer is no farther
478                  * than the record that we wanted.  This shouldn't happen. */
479                 if (rec->lrh_index > next_idx) {
480                         CERROR("missed desired record? %u > %u\n",
481                                rec->lrh_index, next_idx);
482                         RETURN(-ENOENT);
483                 }
484                 RETURN(0);
485         }
486         RETURN(-EIO);
487 }
488
489 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
490                                 int prev_idx, void *buf, int len)
491 {
492         __u64 cur_offset;
493         int rc;
494         ENTRY;
495
496         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
497                 RETURN(-EINVAL);
498
499         CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
500
501         cur_offset = LLOG_CHUNK_SIZE;
502         llog_skip_over(&cur_offset, 0, prev_idx);
503
504         while (cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
505                 struct llog_rec_hdr *rec;
506                 struct llog_rec_tail *tail;
507                 loff_t ppos;
508
509                 ppos = cur_offset;
510
511                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
512                                         loghandle->lgh_file, buf, len,
513                                         &ppos);
514                 if (rc) {
515                         CERROR("Cant read llog block at log id "LPU64
516                                "/%u offset "LPU64"\n",
517                                loghandle->lgh_id.lgl_oid,
518                                loghandle->lgh_id.lgl_ogen,
519                                cur_offset);
520                         RETURN(rc);
521                 }
522
523                 /* put number of bytes read into rc to make code simpler */
524                 rc = ppos - cur_offset;
525                 cur_offset = ppos;
526
527                 if (rc == 0) /* end of file, nothing to do */
528                         RETURN(0);
529
530                 if (rc < sizeof(*tail)) {
531                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
532                                LPU64"\n", loghandle->lgh_id.lgl_oid,
533                                loghandle->lgh_id.lgl_ogen, cur_offset);
534                         RETURN(-EINVAL);
535                 }
536
537                 tail = buf + rc - sizeof(struct llog_rec_tail);
538
539                 /* this shouldn't happen */
540                 if (tail->lrt_index == 0) {
541                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
542                                LPU64"\n", loghandle->lgh_id.lgl_oid,
543                                loghandle->lgh_id.lgl_ogen, cur_offset);
544                         RETURN(-EINVAL);
545                 }
546                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
547                         continue;
548
549                 /* sanity check that the start of the new buffer is no farther
550                  * than the record that we wanted.  This shouldn't happen. */
551                 rec = buf;
552                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
553                         CERROR("missed desired record? %u > %u\n",
554                                le32_to_cpu(rec->lrh_index), prev_idx);
555                         RETURN(-ENOENT);
556                 }
557                 RETURN(0);
558         }
559         RETURN(-EIO);
560 }
561
562 static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
563 {
564         char *logname;
565         struct file *filp;
566         int len;
567
568         OBD_ALLOC(logname, PATH_MAX);
569         if (logname == NULL)
570                 return ERR_PTR(-ENOMEM);
571
572         len = snprintf(logname, PATH_MAX, "%s/%s", dir, name);
573         if (len >= PATH_MAX - 1) {
574                 filp = ERR_PTR(-ENAMETOOLONG);
575         } else {
576                 filp = l_filp_open(logname, flags, mode);
577                 if (IS_ERR(filp))
578                         CERROR("logfile creation %s: %ld\n", logname,
579                                PTR_ERR(filp));
580         }
581         OBD_FREE(logname, PATH_MAX);
582         return filp;
583 }
584
585 /* This is a callback from the llog_* functions.
586  * Assumes caller has already pushed us into the kernel context. */
587 static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
588                             struct llog_logid *logid, char *name)
589 {
590         struct llog_handle *handle;
591         struct obd_device *obd;
592         struct l_dentry *dchild = NULL;
593         struct obdo *oa = NULL;
594         int rc = 0;
595         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
596         ENTRY;
597
598         handle = llog_alloc_handle();
599         if (handle == NULL)
600                 RETURN(-ENOMEM);
601         *res = handle;
602
603         LASSERT(ctxt);
604         LASSERT(ctxt->loc_exp);
605         obd = ctxt->loc_exp->exp_obd;
606
607         if (logid != NULL) {
608                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
609                                              logid->lgl_ogen, logid->lgl_oseq);
610
611                 if (IS_ERR(dchild)) {
612                         rc = PTR_ERR(dchild);
613                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
614                                logid->lgl_oid, logid->lgl_ogen, rc);
615                         GOTO(out, rc);
616                 }
617
618                 if (dchild->d_inode == NULL) {
619                         l_dput(dchild);
620                         rc = -ENOENT;
621                         CERROR("nonexistent log file "LPX64":"LPX64": rc %d\n",
622                                logid->lgl_oid, logid->lgl_oseq, rc);
623                         GOTO(out, rc);
624                 }
625
626                 /* l_dentry_open will call dput(dchild) if there is an error */
627                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
628                                                     O_RDWR | O_LARGEFILE);
629                 if (IS_ERR(handle->lgh_file)) {
630                         rc = PTR_ERR(handle->lgh_file);
631                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
632                                logid->lgl_oid, logid->lgl_ogen, rc);
633                         GOTO(out, rc);
634                 }
635
636                 /* assign the value of lgh_id for handle directly */
637                 handle->lgh_id = *logid;
638
639         } else if (name) {
640                 handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
641                                                   name, open_flags, 0644);
642                 if (IS_ERR(handle->lgh_file))
643                         GOTO(out, rc = PTR_ERR(handle->lgh_file));
644
645                 handle->lgh_id.lgl_oseq = 1;
646                 handle->lgh_id.lgl_oid =
647                         handle->lgh_file->f_dentry->d_inode->i_ino;
648                 handle->lgh_id.lgl_ogen =
649                         handle->lgh_file->f_dentry->d_inode->i_generation;
650         } else {
651                 OBDO_ALLOC(oa);
652                 if (oa == NULL)
653                         GOTO(out, rc = -ENOMEM);
654
655                 oa->o_seq = FID_SEQ_LLOG;
656                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
657
658                 rc = obd_create(NULL, ctxt->loc_exp, oa, NULL, NULL);
659                 if (rc)
660                         GOTO(out, rc);
661
662                 /* FIXME: rationalize the misuse of o_generation in
663                  *        this API along with mds_obd_{create,destroy}.
664                  *        Hopefully it is only an internal API issue. */
665 #define o_generation o_parent_oid
666                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
667                                              oa->o_generation, oa->o_seq);
668
669                 if (IS_ERR(dchild))
670                         GOTO(out, rc = PTR_ERR(dchild));
671
672                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
673                                                  open_flags);
674                 if (IS_ERR(handle->lgh_file))
675                         GOTO(out, rc = PTR_ERR(handle->lgh_file));
676
677                 handle->lgh_id.lgl_oseq = oa->o_seq;
678                 handle->lgh_id.lgl_oid = oa->o_id;
679                 handle->lgh_id.lgl_ogen = oa->o_generation;
680         }
681
682         handle->lgh_ctxt = ctxt;
683 out:
684         if (rc)
685                 llog_free_handle(handle);
686
687         if (oa)
688                 OBDO_FREE(oa);
689         RETURN(rc);
690 }
691
692 static int llog_lvfs_close(struct llog_handle *handle)
693 {
694         int rc;
695         ENTRY;
696
697         rc = filp_close(handle->lgh_file, 0);
698         if (rc)
699                 CERROR("error closing log: rc %d\n", rc);
700         RETURN(rc);
701 }
702
703 static int llog_lvfs_destroy(struct llog_handle *handle)
704 {
705         struct dentry *fdentry;
706         struct obdo *oa;
707         struct obd_device *obd = handle->lgh_ctxt->loc_exp->exp_obd;
708         char *dir;
709         void *th;
710         struct inode *inode;
711         int rc, rc1;
712         ENTRY;
713
714         dir = MOUNT_CONFIGS_DIR;
715
716         fdentry = handle->lgh_file->f_dentry;
717         inode = fdentry->d_parent->d_inode;
718         if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) {
719                 struct lvfs_run_ctxt saved;
720                 struct vfsmount *mnt = mntget(handle->lgh_file->f_vfsmnt);
721
722                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
723                 dget(fdentry);
724                 rc = llog_lvfs_close(handle);
725
726                 if (rc == 0) {
727                         LOCK_INODE_MUTEX_PARENT(inode);
728                         rc = ll_vfs_unlink(inode, fdentry, mnt);
729                         UNLOCK_INODE_MUTEX(inode);
730                 }
731                 mntput(mnt);
732
733                 dput(fdentry);
734                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
735                 RETURN(rc);
736         }
737
738         OBDO_ALLOC(oa);
739         if (oa == NULL)
740                 RETURN(-ENOMEM);
741
742         oa->o_id = handle->lgh_id.lgl_oid;
743         oa->o_seq = handle->lgh_id.lgl_oseq;
744         oa->o_generation = handle->lgh_id.lgl_ogen;
745 #undef o_generation
746         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
747
748         rc = llog_lvfs_close(handle);
749         if (rc)
750                 GOTO(out, rc);
751
752         th = fsfilt_start_log(obd, inode, FSFILT_OP_UNLINK, NULL, 1);
753         if (IS_ERR(th)) {
754                 CERROR("fsfilt_start failed: %ld\n", PTR_ERR(th));
755                 GOTO(out, rc = PTR_ERR(th));
756         }
757
758         rc = obd_destroy(NULL, handle->lgh_ctxt->loc_exp, oa,
759                          NULL, NULL, NULL, NULL);
760
761         rc1 = fsfilt_commit(obd, inode, th, 0);
762         if (rc == 0 && rc1 != 0)
763                 rc = rc1;
764  out:
765         OBDO_FREE(oa);
766         RETURN(rc);
767 }
768
769 /* reads the catalog list */
770 int llog_get_cat_list(struct obd_device *disk_obd,
771                       char *name, int idx, int count, struct llog_catid *idarray)
772 {
773         struct lvfs_run_ctxt saved;
774         struct l_file *file;
775         int rc, rc1 = 0;
776         int size = sizeof(*idarray) * count;
777         loff_t off = idx *  sizeof(*idarray);
778         ENTRY;
779
780         if (!count)
781                 RETURN(0);
782
783         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
784         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
785         if (!file || IS_ERR(file)) {
786                 rc = PTR_ERR(file);
787                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
788                        name, rc);
789                 GOTO(out, rc);
790         }
791
792         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
793                 CERROR("%s is not a regular file!: mode = %o\n", name,
794                        file->f_dentry->d_inode->i_mode);
795                 GOTO(out, rc = -ENOENT);
796         }
797
798         CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
799                (int)i_size_read(file->f_dentry->d_inode), size);
800
801         /* read for new ost index or for empty file */
802         memset(idarray, 0, size);
803         if (i_size_read(file->f_dentry->d_inode) < off)
804                 GOTO(out, rc = 0);
805
806         rc = fsfilt_read_record(disk_obd, file, idarray, size, &off);
807         if (rc) {
808                 CERROR("OBD filter: error reading %s: rc %d\n", name, rc);
809                 GOTO(out, rc);
810         }
811
812         EXIT;
813  out:
814         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
815         if (file && !IS_ERR(file))
816                 rc1 = filp_close(file, 0);
817         if (rc == 0)
818                 rc = rc1;
819         return rc;
820 }
821 EXPORT_SYMBOL(llog_get_cat_list);
822
823 /* writes the cat list */
824 int llog_put_cat_list(struct obd_device *disk_obd,
825                       char *name, int idx, int count, struct llog_catid *idarray)
826 {
827         struct lvfs_run_ctxt saved;
828         struct l_file *file;
829         int rc, rc1 = 0;
830         int size = sizeof(*idarray) * count;
831         loff_t off = idx * sizeof(*idarray);
832
833         if (!count)
834                 GOTO(out1, rc = 0);
835
836         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
837         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
838         if (!file || IS_ERR(file)) {
839                 rc = PTR_ERR(file);
840                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
841                        name, rc);
842                 GOTO(out, rc);
843         }
844
845         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
846                 CERROR("%s is not a regular file!: mode = %o\n", name,
847                        file->f_dentry->d_inode->i_mode);
848                 GOTO(out, rc = -ENOENT);
849         }
850
851         rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1);
852         if (rc) {
853                 CDEBUG(D_INODE,"OBD filter: error writeing %s: rc %d\n",
854                        name, rc);
855                 GOTO(out, rc);
856         }
857
858 out:
859         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
860         if (file && !IS_ERR(file))
861                 rc1 = filp_close(file, 0);
862
863         if (rc == 0)
864                 rc = rc1;
865 out1:
866         RETURN(rc);
867 }
868 EXPORT_SYMBOL(llog_put_cat_list);
869
870 struct llog_operations llog_lvfs_ops = {
871         lop_write_rec:   llog_lvfs_write_rec,
872         lop_next_block:  llog_lvfs_next_block,
873         lop_prev_block:  llog_lvfs_prev_block,
874         lop_read_header: llog_lvfs_read_header,
875         lop_create:      llog_lvfs_create,
876         lop_destroy:     llog_lvfs_destroy,
877         lop_close:       llog_lvfs_close,
878         //        lop_cancel: llog_lvfs_cancel,
879 };
880
881 EXPORT_SYMBOL(llog_lvfs_ops);
882
883 #else /* !__KERNEL__ */
884
/*
 * Non-kernel build placeholder for the read-header method: there is no
 * implementation here, so the call is flagged with LBUG().  Should LBUG()
 * return, 0 is reported.
 */
static int
llog_lvfs_read_header(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
890
/*
 * Non-kernel build placeholder for the write-record method: all arguments
 * are ignored and the call is flagged with LBUG().  Should LBUG() return,
 * 0 is reported.
 */
static int
llog_lvfs_write_rec(struct llog_handle *loghandle,
                    struct llog_rec_hdr *rec,
                    struct llog_cookie *reccookie,
                    int cookiecount,
                    void *buf,
                    int idx)
{
        LBUG();

        return 0;
}
899
900 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
901                                 int next_idx, __u64 *cur_offset, void *buf,
902                                 int len)
903 {
904         LBUG();
905         return 0;
906 }
907
/*
 * Non-kernel build placeholder for the previous-block method: all
 * arguments are ignored and the call is flagged with LBUG().  Should
 * LBUG() return, 0 is reported.
 */
static int
llog_lvfs_prev_block(struct llog_handle *loghandle,
                     int prev_idx,
                     void *buf,
                     int len)
{
        LBUG();

        return 0;
}
914
/*
 * Non-kernel build placeholder for the create method: all arguments are
 * ignored (*res is not written) and the call is flagged with LBUG().
 * Should LBUG() return, 0 is reported.
 */
static int
llog_lvfs_create(struct llog_ctxt *ctxt,
                 struct llog_handle **res,
                 struct llog_logid *logid,
                 char *name)
{
        LBUG();

        return 0;
}
921
/*
 * Non-kernel build placeholder for the close method: the handle is
 * ignored and the call is flagged with LBUG().  Should LBUG() return,
 * 0 is reported.
 */
static int
llog_lvfs_close(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
927
/*
 * Non-kernel build placeholder for the destroy method: the handle is
 * ignored and the call is flagged with LBUG().  Should LBUG() return,
 * 0 is reported.
 */
static int
llog_lvfs_destroy(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
933
/*
 * Non-kernel build placeholder for reading the catalog list: nothing is
 * read and @idarray is not touched; the call is flagged with LBUG().
 * Should LBUG() return, 0 is reported.
 */
int
llog_get_cat_list(struct obd_device *disk_obd,
                  char *name,
                  int idx,
                  int count,
                  struct llog_catid *idarray)
{
        LBUG();

        return 0;
}
940
/*
 * Non-kernel build placeholder for writing the catalog list: nothing is
 * written; the call is flagged with LBUG().  Should LBUG() return, 0 is
 * reported.
 */
int
llog_put_cat_list(struct obd_device *disk_obd,
                  char *name,
                  int idx,
                  int count,
                  struct llog_catid *idarray)
{
        LBUG();

        return 0;
}
947
948 struct llog_operations llog_lvfs_ops = {
949         lop_write_rec:   llog_lvfs_write_rec,
950         lop_next_block:  llog_lvfs_next_block,
951         lop_prev_block:  llog_lvfs_prev_block,
952         lop_read_header: llog_lvfs_read_header,
953         lop_create:      llog_lvfs_create,
954         lop_destroy:     llog_lvfs_destroy,
955         lop_close:       llog_lvfs_close,
956 //        lop_cancel:      llog_lvfs_cancel,
957 };
958 #endif