Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_FILTER
38
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
42 #include <linux/fs.h>
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
54 #endif
55
56 static kmem_cache_t *filter_open_cache;
57 static kmem_cache_t *filter_dentry_cache;
58
59 /* should be generic per-obd stats... */
60 struct xprocfs_io_stat {
61         __u64    st_read_bytes;
62         __u64    st_read_reqs;
63         __u64    st_write_bytes;
64         __u64    st_write_reqs;
65         __u64    st_getattr_reqs;
66         __u64    st_setattr_reqs;
67         __u64    st_create_reqs;
68         __u64    st_destroy_reqs;
69         __u64    st_statfs_reqs;
70         __u64    st_syncfs_reqs;
71         __u64    st_open_reqs;
72         __u64    st_close_reqs;
73         __u64    st_punch_reqs;
74 };
75
76 static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
77 static struct proc_dir_entry *xprocfs_dir;
78
79 #define XPROCFS_BUMP_MYCPU_IOSTAT(field, count)                 \
80 do {                                                            \
81         xprocfs_iostats[smp_processor_id()].field += (count);   \
82 } while (0)
83
84 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
85 #define DECLARE_XPROCFS_SUM_STAT(field)                 \
86 static long long                                        \
87 xprocfs_sum_##field (void)                              \
88 {                                                       \
89         long long stat = 0;                             \
90         int       i;                                    \
91                                                         \
92         for (i = 0; i < smp_num_cpus; i++)              \
93                 stat += xprocfs_iostats[i].field;       \
94         return (stat);                                  \
95 }
96
97 DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
98 DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
99 DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
100 DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
101 DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
102 DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
103 DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
104 DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
105 DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
106 DECLARE_XPROCFS_SUM_STAT (st_syncfs_reqs)
107 DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
108 DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
109 DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
110 #endif
111
/*
 * procfs read handler for one summed counter.
 *
 * 'data' carries the summing function installed by xprocfs_add_stat().
 * The whole value is produced by the read at offset 0; any later offset
 * yields 0 bytes.  *eof is always set.  Returns the snprintf() result.
 */
static int
xprocfs_rd_stat (char *page, char **start, off_t off, int count,
                 int  *eof, void *data)
{
        long long (*sum_fn)(void) = (long long (*)(void))data;
        int         nbytes = 0;

        *eof = 1;
        if (off == 0) {
                nbytes = snprintf(page, count, "%Ld\n", sum_fn());
                *start = page;
        }

        return nbytes;
}
127
128
129 static void
130 xprocfs_add_stat(char *name, long long (*fn)(void))
131 {
132         struct proc_dir_entry *entry;
133
134         entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
135         if (entry == NULL) {
136                 CERROR ("Can't add procfs stat %s\n", name);
137                 return;
138         }
139
140         entry->data = fn;
141         entry->read_proc = xprocfs_rd_stat;
142         entry->write_proc = NULL;
143 }
144
145 static void
146 xprocfs_init (char *name)
147 {
148         char  dirname[64];
149
150         snprintf (dirname, sizeof (dirname), "sys/%s", name);
151
152         xprocfs_dir = proc_mkdir ("sys/obdfilter", NULL);
153         if (xprocfs_dir == NULL) {
154                 CERROR ("Can't make dir\n");
155                 return;
156         }
157
158 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
159         xprocfs_add_stat ("read_bytes",   xprocfs_sum_st_read_bytes);
160         xprocfs_add_stat ("read_reqs",    xprocfs_sum_st_read_reqs);
161         xprocfs_add_stat ("write_bytes",  xprocfs_sum_st_write_bytes);
162         xprocfs_add_stat ("write_reqs",   xprocfs_sum_st_write_reqs);
163         xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
164         xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
165         xprocfs_add_stat ("create_reqs",  xprocfs_sum_st_create_reqs);
166         xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
167         xprocfs_add_stat ("statfs_reqs",  xprocfs_sum_st_statfs_reqs);
168         xprocfs_add_stat ("syncfs_reqs",  xprocfs_sum_st_syncfs_reqs);
169         xprocfs_add_stat ("open_reqs",    xprocfs_sum_st_open_reqs);
170         xprocfs_add_stat ("close_reqs",   xprocfs_sum_st_close_reqs);
171         xprocfs_add_stat ("punch_reqs",   xprocfs_sum_st_punch_reqs);
172 #endif
173 }
174
175 void xprocfs_fini (void)
176 {
177         if (xprocfs_dir == NULL)
178                 return;
179
180         remove_proc_entry ("read_bytes",   xprocfs_dir);
181         remove_proc_entry ("read_reqs",    xprocfs_dir);
182         remove_proc_entry ("write_bytes",  xprocfs_dir);
183         remove_proc_entry ("write_reqs",   xprocfs_dir);
184         remove_proc_entry ("getattr_reqs", xprocfs_dir);
185         remove_proc_entry ("setattr_reqs", xprocfs_dir);
186         remove_proc_entry ("create_reqs",  xprocfs_dir);
187         remove_proc_entry ("destroy_reqs", xprocfs_dir);
188         remove_proc_entry ("statfs_reqs",  xprocfs_dir);
189         remove_proc_entry ("syncfs_reqs",  xprocfs_dir);
190         remove_proc_entry ("open_reqs",    xprocfs_dir);
191         remove_proc_entry ("close_reqs",   xprocfs_dir);
192         remove_proc_entry ("punch_reqs",   xprocfs_dir);
193
194         remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
195         xprocfs_dir = NULL;
196 }
197
#define S_SHIFT 12
/*
 * Directory-name letter for each file type, indexed by the type bits of a
 * mode shifted down by S_SHIFT.  Unlisted slots are NULL.
 *
 * The table has (S_IFMT >> S_SHIFT) + 1 entries so that every value
 * obd_mode_to_type() can compute -- including a mode with all type bits
 * set -- stays inside the array.
 */
static char *obd_type_by_mode[(S_IFMT >> S_SHIFT) + 1] = {
        [0]                     = NULL,
        [S_IFREG >> S_SHIFT]    = "R",
        [S_IFDIR >> S_SHIFT]    = "D",
        [S_IFCHR >> S_SHIFT]    = "C",
        [S_IFBLK >> S_SHIFT]    = "B",
        [S_IFIFO >> S_SHIFT]    = "F",
        [S_IFSOCK >> S_SHIFT]   = "S",
        [S_IFLNK >> S_SHIFT]    = "L"
};

/*
 * Map a file mode to its type letter ("R", "D", ...).  Returns NULL when the
 * type bits do not name a known file type.
 */
static inline const char *obd_mode_to_type(int mode)
{
        return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
}
214
215 static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd,
216                                 int error)
217 {
218         CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
219                last_rcvd, error);
220         if (!error && last_rcvd > obd->obd_last_committed)
221                 obd->obd_last_committed = last_rcvd;
222 }
223
/*
 * Begin a transno-protected section.  Only does anything when built with
 * FILTER_TRANSNO_SEM, in which case it takes the device's transno
 * semaphore; filter_finish_transno() is the function that releases it.
 */
void filter_start_transno(struct obd_export *export)
{
#ifdef FILTER_TRANSNO_SEM
        struct filter_obd *filter = &export->exp_obd->u.filter;
        ENTRY;

        down(&filter->fo_transno_sem);
#endif
}
233
234 /* Assumes caller has already pushed us into the kernel context. */
235 int filter_finish_transno(struct obd_export *export, void *handle,
236                           struct obd_trans_info *oti, int rc)
237 {
238         __u64 last_rcvd;
239         struct obd_device *obd = export->exp_obd;
240         struct filter_obd *filter = &obd->u.filter;
241         struct filter_export_data *fed = &export->exp_filter_data;
242         struct filter_client_data *fcd = fed->fed_fcd;
243         loff_t off;
244         ssize_t written;
245
246         /* Propagate error code. */
247         if (rc) {
248 #ifdef FILTER_TRANSNO_SEM
249                 up(&filter->fo_transno_sem);
250 #endif
251                 RETURN(rc);
252         }
253
254         if (!(obd->obd_flags & OBD_REPLAYABLE)) {
255                 RETURN(0);
256         }
257
258         /* we don't allocate new transnos for replayed requests */
259 #if 0
260         /* perhaps if transno already set? or should level be in oti? */
261         if (req->rq_level == LUSTRE_CONN_RECOVD)
262                 GOTO(out, rc = 0);
263 #endif
264
265         off = fed->fed_lr_off;
266
267 #ifndef FILTER_TRANSNO_SEM
268         spin_lock(&filter->fo_translock);
269 #endif
270         last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
271         filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
272 #ifndef FILTER_TRANSNO_SEM
273         spin_unlock(&filter->fo_translock);
274 #endif
275         if (oti)
276                 oti->oti_transno = last_rcvd;
277         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
278         fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
279
280         /* get this from oti */
281 #if 0
282         if (oti)
283                 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
284         else
285 #else
286         fcd->fcd_last_xid = 0;
287 #endif
288         fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_last_rcvd_cb);
289         written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
290                                 &off);
291         CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
292                LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
293
294 #ifdef FILTER_TRANSNO_SEM
295         up(&filter->fo_transno_sem);
296 #endif
297         if (written == sizeof(*fcd))
298                 RETURN(0);
299         CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
300         if (written >= 0)
301                 RETURN(-EIO);
302
303         RETURN(written);
304 }
305
306 /* write the pathname into the string */
307 static char *filter_id(char *buf, struct filter_obd *filter, obd_id id,
308                      obd_mode mode)
309 {
310         if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
311                 sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
312         else
313                 sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode),
314                        (int)id & (filter->fo_subdir_count - 1), id);
315
316         return buf;
317 }
318
319 static inline void f_dput(struct dentry *dentry)
320 {
321         /* Can't go inside filter_ddelete because it can block */
322         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
323                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
324         LASSERT(atomic_read(&dentry->d_count) > 0);
325
326         dput(dentry);
327 }
328
329 /* Not racy w.r.t. others, because we are the only user of this dentry */
330 static void filter_drelease(struct dentry *dentry)
331 {
332         if (dentry->d_fsdata)
333                 kmem_cache_free(filter_dentry_cache, dentry->d_fsdata);
334 }
335
336 struct dentry_operations filter_dops = {
337         .d_release = filter_drelease,
338 };
339
/* Name of the recovery file and the first object id on a fresh store. */
#define LAST_RCVD "last_rcvd"
#define INIT_OBJID 2

/*
 * The client slot bitmap has one bit per client.  The limit is arbitrary,
 * but for now the bitmap fits in 1 page (32k clients with 4k pages).
 *
 * FILTER_LR_MAX_CLIENT_WORDS is the number of unsigned longs needed to hold
 * FILTER_LR_MAX_CLIENTS bits, i.e. bits / (8 * bytes-per-long).  It used to
 * divide by bytes-per-long only, which sized the bitmap at 8 pages.
 */
#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
#define FILTER_LR_MAX_CLIENT_WORDS \
        (FILTER_LR_MAX_CLIENTS / (8 * sizeof(unsigned long)))
346
347 /* Add client data to the FILTER.  We use a bitmap to locate a free space
348  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
349  * Otherwise, we have just read the data from the last_rcvd file and
350  * we know its offset.
351  */
352 int filter_client_add(struct filter_obd *filter,
353                       struct filter_export_data *fed, int cl_idx)
354 {
355         int new_client = (cl_idx == -1);
356
357         LASSERT(filter->fo_last_rcvd_slots != NULL);
358
359         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
360          * there's no need for extra complication here
361          */
362         if (new_client) {
363                 cl_idx = find_first_zero_bit(filter->fo_last_rcvd_slots,
364                                              FILTER_LR_MAX_CLIENTS);
365         repeat:
366                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
367                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
368                         return -ENOMEM;
369                 }
370                 if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
371                         CERROR("FILTER client %d: found bit is set in bitmap\n",
372                                cl_idx);
373                         cl_idx = find_next_zero_bit(filter->fo_last_rcvd_slots,
374                                                     FILTER_LR_MAX_CLIENTS,
375                                                     cl_idx);
376                         goto repeat;
377                 }
378         } else {
379                 if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
380                         CERROR("FILTER client %d: bit already set in bitmap!\n",
381                                cl_idx);
382                         LBUG();
383                 }
384         }
385
386         fed->fed_lr_idx = cl_idx;
387         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
388                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
389
390         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
391                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
392
393         if (new_client) {
394                 struct obd_run_ctxt saved;
395                 loff_t off = fed->fed_lr_off;
396                 ssize_t written;
397
398                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
399                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
400
401                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
402                 written = lustre_fwrite(filter->fo_rcvd_filp,
403                                                 (char *)fed->fed_fcd,
404                                                 sizeof(*fed->fed_fcd), &off);
405                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
406
407                 if (written != sizeof(*fed->fed_fcd)) {
408                         if (written < 0)
409                                 RETURN(written);
410                         RETURN(-EIO);
411                 }
412         }
413         return 0;
414 }
415
416 int filter_client_free(struct obd_export *exp)
417 {
418         struct filter_export_data *fed = &exp->exp_filter_data;
419         struct filter_obd *filter = &exp->exp_obd->u.filter;
420         struct filter_client_data zero_fcd;
421         struct obd_run_ctxt saved;
422         int written;
423         loff_t off;
424
425         if (!fed->fed_fcd)
426                 RETURN(0);
427
428         LASSERT(filter->fo_last_rcvd_slots != NULL);
429
430         off = fed->fed_lr_off;
431
432         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
433                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
434
435         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
436                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
437                        fed->fed_lr_idx);
438                 LBUG();
439         }
440
441         memset(&zero_fcd, 0, sizeof zero_fcd);
442         push_ctxt(&saved, &filter->fo_ctxt, NULL);
443         written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
444                                 sizeof(zero_fcd), &off);
445
446         /* XXX: this write gets lost sometimes, unless this sync is here. */
447         file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
448         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
449
450         if (written != sizeof(zero_fcd)) {
451                 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
452                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
453                        LAST_RCVD, written);
454         } else {
455                 CDEBUG(D_INFO,
456                        "zeroed disconnecting client %s at idx %u (%llu)\n",
457                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
458         }
459
460         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
461
462         return 0;
463 }
464
465 static int filter_free_server_data(struct filter_obd *filter)
466 {
467         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
468         filter->fo_fsd = NULL;
469         OBD_FREE(filter->fo_last_rcvd_slots,
470                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
471         filter->fo_last_rcvd_slots = NULL;
472         return 0;
473 }
474
475
476 /* assumes caller is already in kernel ctxt */
477 static int filter_update_server_data(struct file *filp,
478                                      struct filter_server_data *fsd)
479 {
480         loff_t off = 0;
481         int rc;
482
483         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
484         CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
485                le64_to_cpu(fsd->fsd_last_objid));
486         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
487                le64_to_cpu(fsd->fsd_last_rcvd));
488         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
489                le64_to_cpu(fsd->fsd_mount_count));
490
491         rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
492         if (rc != sizeof(*fsd)) {
493                 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
494                        rc);
495                 RETURN(-EIO);
496         }
497         RETURN(0);
498 }
499
500 /* assumes caller has already in kernel ctxt */
501 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
502                                    __u64 init_lastobjid)
503 {
504         struct filter_obd *filter = &obd->u.filter;
505         struct filter_server_data *fsd;
506         struct filter_client_data *fcd = NULL;
507         struct inode *inode = filp->f_dentry->d_inode;
508         unsigned long last_rcvd_size = inode->i_size;
509         __u64 mount_count = 0;
510         int cl_idx;
511         loff_t off = 0;
512         int rc;
513
514         /* ensure padding in the struct is the correct size */
515         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
516                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
517         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
518                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
519
520         OBD_ALLOC(fsd, sizeof(*fsd));
521         if (!fsd)
522                 RETURN(-ENOMEM);
523         filter->fo_fsd = fsd;
524
525         OBD_ALLOC(filter->fo_last_rcvd_slots, 
526                   FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
527         if (filter->fo_last_rcvd_slots == NULL) {
528                 OBD_FREE(fsd, sizeof(*fsd));
529                 RETURN(-ENOMEM);
530         }
531
532         if (last_rcvd_size == 0) {
533                 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
534
535                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
536                 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
537                 fsd->fsd_last_rcvd = 0;
538                 mount_count = fsd->fsd_mount_count = 0;
539                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
540                 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
541                 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
542                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
543                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
544         } else {
545                 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
546                                               &off);
547                 if (retval != sizeof(*fsd)) {
548                         CDEBUG(D_INODE,"OBD filter: error reading %s\n",
549                                LAST_RCVD);
550                         GOTO(err_fsd, rc = -EIO);
551                 }
552                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
553                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
554         }
555
556         if (fsd->fsd_feature_incompat) {
557                 CERROR("unsupported feature %x\n",
558                        le32_to_cpu(fsd->fsd_feature_incompat));
559                 GOTO(err_fsd, rc = -EINVAL);
560         }
561         if (fsd->fsd_feature_rocompat) {
562                 CERROR("read-only feature %x\n",
563                        le32_to_cpu(fsd->fsd_feature_rocompat));
564                 /* Do something like remount filesystem read-only */
565                 GOTO(err_fsd, rc = -EINVAL);
566         }
567
568         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
569                obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
570         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
571                obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
572         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
573                obd->obd_name, mount_count);
574         CDEBUG(D_INODE, "%s: server data size: %u\n",
575                obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
576         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
577                obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
578         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
579                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
580         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
581                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
582
583         /*
584          * When we do a clean FILTER shutdown, we save the last_rcvd into
585          * the header.  If we find clients with higher last_rcvd values
586          * then those clients may need recovery done.
587          */
588         if (!(obd->obd_flags & OBD_REPLAYABLE)) {
589                 CERROR("%s: recovery support OFF\n", obd->obd_name);
590                 GOTO(out, rc = 0);
591         }
592
593         for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
594                 __u64 last_rcvd;
595                 int mount_age;
596
597                 if (!fcd) {
598                         OBD_ALLOC(fcd, sizeof(*fcd));
599                         if (!fcd)
600                                 GOTO(err_fsd, rc = -ENOMEM);
601                 }
602
603                 /* Don't assume off is incremented properly, in case
604                  * sizeof(fsd) isn't the same as fsd->fsd_client_size.
605                  */
606                 off = le32_to_cpu(fsd->fsd_client_start) +
607                         cl_idx * le16_to_cpu(fsd->fsd_client_size);
608                 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
609                 if (rc != sizeof(*fcd)) {
610                         CERROR("error reading FILTER %s offset %d: rc = %d\n",
611                                LAST_RCVD, cl_idx, rc);
612                         if (rc > 0) /* XXX fatal error or just abort reading? */
613                                 rc = -EIO;
614                         break;
615                 }
616
617                 if (fcd->fcd_uuid[0] == '\0') {
618                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
619                                cl_idx);
620                         continue;
621                 }
622
623                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
624
625                 /* These exports are cleaned up by filter_disconnect(), so they
626                  * need to be set up like real exports as filter_connect() does.
627                  */
628                 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
629                 if (mount_age < FILTER_MOUNT_RECOV) {
630                         struct obd_export *exp = class_new_export(obd);
631                         struct filter_export_data *fed;
632                         CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
633                                " srv lr: "LPU64" mnt: "LPU64" last mount: "
634                                LPU64"\n", fcd->fcd_uuid, cl_idx,
635                                last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
636                                le64_to_cpu(fcd->fcd_mount_count), mount_count);
637                         /* disabled until OST recovery is actually working */
638
639                         if (!exp) {
640                                 rc = -ENOMEM;
641                                 break;
642                         }
643                         memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
644                                sizeof exp->exp_client_uuid.uuid);
645                         fed = &exp->exp_filter_data;
646                         fed->fed_fcd = fcd;
647                         filter_client_add(filter, fed, cl_idx);
648                         /* create helper if export init gets more complex */
649                         INIT_LIST_HEAD(&fed->fed_open_head);
650                         spin_lock_init(&fed->fed_lock);
651
652                         fcd = NULL;
653                         obd->obd_recoverable_clients++;
654                 } else {
655                         CDEBUG(D_INFO,
656                                "discarded client %d UUID '%s' count "LPU64"\n",
657                                cl_idx, fcd->fcd_uuid,
658                                le64_to_cpu(fcd->fcd_mount_count));
659                 }
660
661                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
662                        cl_idx, last_rcvd);
663
664                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
665                         filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
666
667                 obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
668                 if (obd->obd_recoverable_clients) {
669                         CERROR("RECOVERY: %d recoverable clients, last_rcvd "
670                                LPU64"\n", obd->obd_recoverable_clients,
671                                le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
672                         obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
673                         obd->obd_flags |= OBD_RECOVERING;
674                 }
675
676                 if (fcd)
677                         OBD_FREE(fcd, sizeof(*fcd));
678
679         }
680
681 out:
682         fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
683
684         /* save it,so mount count and last_recvd is current */
685         rc = filter_update_server_data(filp, filter->fo_fsd);
686
687         RETURN(rc);
688
689 err_fsd:
690         filter_free_server_data(filter);
691         RETURN(rc);
692 }
693
694 /* setup the object store with correct subdirectories */
695 static int filter_prep(struct obd_device *obd)
696 {
697         struct obd_run_ctxt saved;
698         struct filter_obd *filter = &obd->u.filter;
699         struct dentry *dentry, *O_dentry;
700         struct file *file;
701         struct inode *inode;
702         int i;
703         int rc = 0;
704         int mode = 0;
705
706         push_ctxt(&saved, &filter->fo_ctxt, NULL);
707         dentry = simple_mkdir(current->fs->pwd, "O", 0700);
708         CDEBUG(D_INODE, "got/created O: %p\n", dentry);
709         if (IS_ERR(dentry)) {
710                 rc = PTR_ERR(dentry);
711                 CERROR("cannot open/create O: rc = %d\n", rc);
712                 GOTO(out, rc);
713         }
714         filter->fo_dentry_O = dentry;
715
716         /*
717          * Create directories and/or get dentries for each object type.
718          * This saves us from having to do multiple lookups for each one.
719          */
720         O_dentry = filter->fo_dentry_O;
721         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
722                 char *name = obd_type_by_mode[mode];
723
724                 if (!name) {
725                         filter->fo_dentry_O_mode[mode] = NULL;
726                         continue;
727                 }
728                 dentry = simple_mkdir(O_dentry, name, 0700);
729                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
730                 if (IS_ERR(dentry)) {
731                         rc = PTR_ERR(dentry);
732                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
733                         GOTO(err_O_mode, rc);
734                 }
735                 filter->fo_dentry_O_mode[mode] = dentry;
736         }
737
738         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
739         if (!file || IS_ERR(file)) {
740                 rc = PTR_ERR(file);
741                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
742                        LAST_RCVD, rc);
743                 GOTO(err_O_mode, rc);
744         }
745
746         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
747                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
748                        file->f_dentry->d_inode->i_mode);
749                 GOTO(err_filp, rc = -ENOENT);
750         }
751
752         rc = fsfilt_journal_data(obd, file);
753         if (rc) {
754                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
755                 GOTO(err_filp, rc);
756         }
757         /* steal operations */
758         inode = file->f_dentry->d_inode;
759         filter->fo_fop = file->f_op;
760         filter->fo_iop = inode->i_op;
761         filter->fo_aops = inode->i_mapping->a_ops;
762
763         rc = filter_init_server_data(obd, file, INIT_OBJID);
764         if (rc) {
765                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
766                 GOTO(err_client, rc);
767         }
768         filter->fo_rcvd_filp = file;
769
770         if (filter->fo_subdir_count) {
771                 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
772                 OBD_ALLOC(filter->fo_dentry_O_sub,
773                           filter->fo_subdir_count * sizeof(dentry));
774                 if (!filter->fo_dentry_O_sub)
775                         GOTO(err_client, rc = -ENOMEM);
776
777                 for (i = 0; i < filter->fo_subdir_count; i++) {
778                         char dir[20];
779                         snprintf(dir, sizeof(dir), "d%u", i);
780
781                         dentry = simple_mkdir(O_dentry, dir, 0700);
782                         CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
783                         if (IS_ERR(dentry)) {
784                                 rc = PTR_ERR(dentry);
785                                 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
786                                 GOTO(err_O_sub, rc);
787                         }
788                         filter->fo_dentry_O_sub[i] = dentry;
789                 }
790         }
791         rc = 0;
792  out:
793         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
794
795         return(rc);
796
797 err_O_sub:
798         while (i-- > 0) {
799                 struct dentry *dentry = filter->fo_dentry_O_sub[i];
800                 if (dentry) {
801                         f_dput(dentry);
802                         filter->fo_dentry_O_sub[i] = NULL;
803                 }
804         }
805         OBD_FREE(filter->fo_dentry_O_sub,
806                  filter->fo_subdir_count * sizeof(dentry));
807 err_client:
808         class_disconnect_all(obd);
809 err_filp:
810         if (filp_close(file, 0))
811                 CERROR("can't close %s after error\n", LAST_RCVD);
812         filter->fo_rcvd_filp = NULL;
813 err_O_mode:
814         while (mode-- > 0) {
815                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
816                 if (dentry) {
817                         f_dput(dentry);
818                         filter->fo_dentry_O_mode[mode] = NULL;
819                 }
820         }
821         f_dput(filter->fo_dentry_O);
822         filter->fo_dentry_O = NULL;
823         goto out;
824 }
825
826 /* cleanup the filter: write last used object id to status file */
827 static void filter_post(struct obd_device *obd)
828 {
829         struct obd_run_ctxt saved;
830         struct filter_obd *filter = &obd->u.filter;
831         long rc;
832         int mode;
833
834         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
835          * best to start a transaction with h_sync, because we removed this
836          * from lastobjid */
837
838         push_ctxt(&saved, &filter->fo_ctxt, NULL);
839         rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
840         if (rc)
841                 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
842
843
844         if (filter->fo_rcvd_filp) {
845                 rc = file_fsync(filter->fo_rcvd_filp,
846                                 filter->fo_rcvd_filp->f_dentry, 1);
847                 filp_close(filter->fo_rcvd_filp, 0);
848                 filter->fo_rcvd_filp = NULL;
849                 if (rc)
850                         CERROR("last_rcvd file won't closed rc = %ld\n", rc);
851         }
852
853         if (filter->fo_subdir_count) {
854                 int i;
855                 for (i = 0; i < filter->fo_subdir_count; i++) {
856                         struct dentry *dentry = filter->fo_dentry_O_sub[i];
857                         f_dput(dentry);
858                         filter->fo_dentry_O_sub[i] = NULL;
859                 }
860                 OBD_FREE(filter->fo_dentry_O_sub,
861                          filter->fo_subdir_count *
862                          sizeof(*filter->fo_dentry_O_sub));
863         }
864         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
865                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
866                 if (dentry) {
867                         f_dput(dentry);
868                         filter->fo_dentry_O_mode[mode] = NULL;
869                 }
870         }
871         f_dput(filter->fo_dentry_O);
872         filter_free_server_data(filter);
873         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
874 }
875
876
877 static __u64 filter_next_id(struct obd_device *obd)
878 {
879         obd_id id;
880         LASSERT(obd->u.filter.fo_fsd != NULL);
881
882         spin_lock(&obd->u.filter.fo_objidlock);
883         id = le64_to_cpu(obd->u.filter.fo_fsd->fsd_last_objid);
884         obd->u.filter.fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
885         spin_unlock(&obd->u.filter.fo_objidlock);
886
887         return id;
888 }
889
890 /* how to get files, dentries, inodes from object id's */
891 /* parent i_sem is already held if needed for exclusivity */
892 static struct dentry *filter_fid2dentry(struct obd_device *obd,
893                                         struct dentry *dparent,
894                                         __u64 id, int lockit)
895 {
896         struct super_block *sb = obd->u.filter.fo_sb;
897         struct dentry *dchild;
898         char name[32];
899         int len;
900         ENTRY;
901
902         if (!sb || !sb->s_dev) {
903                 CERROR("fatal: device not initialized.\n");
904                 RETURN(ERR_PTR(-ENXIO));
905         }
906
907         if (id == 0) {
908                 CERROR("fatal: invalid object id 0\n");
909                 LBUG();
910                 RETURN(ERR_PTR(-ESTALE));
911         }
912
913         len = sprintf(name, LPU64, id);
914         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
915                dparent->d_name.len, dparent->d_name.name, name);
916         if (lockit)
917                 down(&dparent->d_inode->i_sem);
918         dchild = lookup_one_len(name, dparent, len);
919         if (lockit)
920                 up(&dparent->d_inode->i_sem);
921         if (IS_ERR(dchild)) {
922                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
923                 RETURN(dchild);
924         }
925
926         CDEBUG(D_INODE, "got child obj O/%*s/%s: %p, count = %d\n",
927                dparent->d_name.len, dparent->d_name.name, name, dchild,
928                atomic_read(&dchild->d_count));
929
930         LASSERT(atomic_read(&dchild->d_count) > 0);
931
932         RETURN(dchild);
933 }
934
935 static inline struct dentry *filter_parent(struct obd_device *obd,
936                                            obd_mode mode, obd_id objid)
937 {
938         struct filter_obd *filter = &obd->u.filter;
939
940         LASSERT((mode & S_IFMT) == S_IFREG);   /* only regular files for now */
941         if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
942                 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
943
944         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
945 }
946
947 static struct file *filter_obj_open(struct obd_export *export,
948                                     __u64 id, __u32 type)
949 {
950         struct filter_obd *filter = &export->exp_obd->u.filter;
951         struct super_block *sb = filter->fo_sb;
952         struct dentry *dentry;
953         struct filter_export_data *fed = &export->exp_filter_data;
954         struct filter_dentry_data *fdd;
955         struct filter_file_data *ffd;
956         struct obd_run_ctxt saved;
957         char name[24];
958         struct file *file;
959         ENTRY;
960
961         if (!sb || !sb->s_dev) {
962                 CERROR("fatal: device not initialized.\n");
963                 RETURN(ERR_PTR(-ENXIO));
964         }
965
966         if (!id) {
967                 CERROR("fatal: invalid obdo "LPU64"\n", id);
968                 RETURN(ERR_PTR(-ESTALE));
969         }
970
971         if (!(type & S_IFMT)) {
972                 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
973                        __FUNCTION__, id, type);
974                 RETURN(ERR_PTR(-EINVAL));
975         }
976
977         PORTAL_SLAB_ALLOC(ffd, filter_open_cache, sizeof(*ffd));
978         if (!ffd) {
979                 CERROR("obdfilter: out of memory\n");
980                 RETURN(ERR_PTR(-ENOMEM));
981         }
982
983         /* We preallocate this to avoid blocking while holding fo_fddlock */
984         fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL);
985         if (!fdd) {
986                 CERROR("obdfilter: out of memory\n");
987                 GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
988         }
989
990         push_ctxt(&saved, &filter->fo_ctxt, NULL);
991         file = filp_open(filter_id(name, filter, id, type),
992                          O_RDWR | O_LARGEFILE, type);
993         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
994
995         if (IS_ERR(file)) {
996                 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
997                 GOTO(out_fdd, file);
998         }
999
1000         dentry = file->f_dentry;
1001         spin_lock(&filter->fo_fddlock);
1002         if (dentry->d_fsdata) {
1003                 spin_unlock(&filter->fo_fddlock);
1004                 kmem_cache_free(filter_dentry_cache, fdd);
1005                 fdd = dentry->d_fsdata;
1006                 LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
1007                 /* should only happen during client recovery */
1008                 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1009                         CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1010                 atomic_inc(&fdd->fdd_open_count);
1011         } else {
1012                 atomic_set(&fdd->fdd_open_count, 1);
1013                 fdd->fdd_flags = 0;
1014                 fdd->fdd_objid = id;
1015                 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1016                 dentry->d_fsdata = fdd;
1017                 spin_unlock(&filter->fo_fddlock);
1018         }
1019
1020         get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
1021         ffd->ffd_file = file;
1022         LASSERT(file->private_data == NULL);
1023         file->private_data = ffd;
1024
1025         if (!dentry->d_op)
1026                 dentry->d_op = &filter_dops;
1027         else
1028                 LASSERT(dentry->d_op == &filter_dops);
1029
1030         spin_lock(&fed->fed_lock);
1031         list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1032         spin_unlock(&fed->fed_lock);
1033
1034         CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1035         EXIT;
1036 out:
1037         return file;
1038
1039 out_fdd:
1040         kmem_cache_free(filter_dentry_cache, fdd);
1041 out_ffd:
1042         ffd->ffd_servercookie = DEAD_HANDLE_MAGIC;
1043         PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
1044         goto out;
1045 }
1046
1047 /* Caller must hold i_sem on dir_dentry->d_inode */
1048 /* Caller must push us into kernel context */
1049 static int filter_destroy_internal(struct obd_device *obd,
1050                                    struct dentry *dir_dentry,
1051                                    struct dentry *object_dentry)
1052 {
1053         struct inode *inode = object_dentry->d_inode;
1054         int rc;
1055         ENTRY;
1056
1057         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1058                 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1059                        object_dentry->d_name.len,
1060                        object_dentry->d_name.name,
1061                        inode->i_nlink, atomic_read(&inode->i_count));
1062         }
1063
1064         rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
1065
1066         if (rc)
1067                 CERROR("error unlinking objid %*s: rc %d\n",
1068                        object_dentry->d_name.len,
1069                        object_dentry->d_name.name, rc);
1070
1071         RETURN(rc);
1072 }
1073
1074 static int filter_close_internal(struct obd_export *export,
1075                                  struct filter_file_data *ffd,
1076                                  struct obd_trans_info *oti)
1077 {
1078         struct obd_device *obd = export->exp_obd;
1079         struct filter_obd *filter = &obd->u.filter;
1080         struct file *filp = ffd->ffd_file;
1081         struct dentry *object_dentry = dget(filp->f_dentry);
1082         struct filter_dentry_data *fdd = object_dentry->d_fsdata;
1083         int rc, rc2;
1084         ENTRY;
1085
1086         LASSERT(filp->private_data == ffd);
1087         LASSERT(fdd);
1088
1089         rc = filp_close(filp, 0);
1090
1091         if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1092             fdd->fdd_flags & FILTER_FLAG_DESTROY) {
1093                 struct dentry *dir_dentry = filter_parent(obd, S_IFREG, fdd->fdd_objid);
1094                 struct obd_run_ctxt saved;
1095                 void *handle;
1096
1097                 down(&dir_dentry->d_inode->i_sem);
1098                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1099                 filter_start_transno(export);
1100                 handle = fsfilt_start(obd, dir_dentry->d_inode,
1101                                       FSFILT_OP_UNLINK);
1102                 if (IS_ERR(handle)) {
1103                         rc = filter_finish_transno(export, handle, oti,
1104                                                    PTR_ERR(handle));
1105                         GOTO(out, rc);
1106                 }
1107                 /* XXX unlink from PENDING directory now too */
1108                 rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
1109                 if (rc2 && !rc)
1110                         rc = rc2;
1111                 rc = filter_finish_transno(export, handle, oti, rc);
1112                 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1113                 if (rc2) {
1114                         CERROR("error on commit, err = %d\n", rc2);
1115                         if (!rc)
1116                                 rc = rc2;
1117                 }
1118         out:
1119                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1120                 up(&dir_dentry->d_inode->i_sem);
1121         }
1122
1123         f_dput(object_dentry);
1124         PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
1125
1126         RETURN(rc);
1127 }
1128
1129 /* obd methods */
1130 /* mount the file system (secretly) */
1131 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1132                                char *option)
1133 {
1134         struct obd_ioctl_data* data = buf;
1135         struct filter_obd *filter;
1136         struct vfsmount *mnt;
1137         int rc = 0;
1138         ENTRY;
1139
1140         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1141                 RETURN(-EINVAL);
1142
1143         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1144         if (IS_ERR(obd->obd_fsops))
1145                 RETURN(PTR_ERR(obd->obd_fsops));
1146
1147         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
1148         rc = PTR_ERR(mnt);
1149         if (IS_ERR(mnt))
1150                 GOTO(err_ops, rc);
1151
1152 #if OST_RECOVERY
1153         obd->obd_flags |= OBD_REPLAYABLE;
1154 #endif
1155
1156         filter = &obd->u.filter;
1157         filter->fo_vfsmnt = mnt;
1158         filter->fo_fstype = strdup(data->ioc_inlbuf2);
1159         filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
1160         CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1161
1162         OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1163         filter->fo_ctxt.pwdmnt = mnt;
1164         filter->fo_ctxt.pwd = mnt->mnt_root;
1165         filter->fo_ctxt.fs = get_ds();
1166
1167         rc = filter_prep(obd);
1168         if (rc)
1169                 GOTO(err_kfree, rc);
1170
1171 #ifdef FILTER_TRANSNO_SEM
1172         init_MUTEX(&filter->fo_transno_sem);
1173 #else
1174         spin_lock_init(&filter->fo_translock);
1175 #endif
1176         spin_lock_init(&filter->fo_fddlock);
1177         spin_lock_init(&filter->fo_objidlock);
1178         INIT_LIST_HEAD(&filter->fo_export_list);
1179
1180         obd->obd_namespace =
1181                 ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
1182         if (!obd->obd_namespace)
1183                 GOTO(err_post, rc = -ENOMEM);
1184
1185         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1186                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1187
1188         RETURN(0);
1189
1190 err_post:
1191         filter_post(obd);
1192 err_kfree:
1193         kfree(filter->fo_fstype);
1194         unlock_kernel();
1195         mntput(filter->fo_vfsmnt);
1196         filter->fo_sb = 0;
1197         lock_kernel();
1198 err_ops:
1199         fsfilt_put_ops(obd->obd_fsops);
1200         return rc;
1201 }
1202
1203 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1204 {
1205         return filter_common_setup(obd, len, buf, NULL);
1206 }
1207
1208 /* sanobd setup methods - use a specific mount option */
1209 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1210 {
1211         struct obd_ioctl_data* data = buf;
1212         char *option = NULL;
1213
1214         if (!data->ioc_inlbuf2)
1215                 RETURN(-EINVAL);
1216
1217         /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1218         if (!strcmp(data->ioc_inlbuf2, "extN") ||
1219             !strcmp(data->ioc_inlbuf2, "ext3"))
1220                 option = "data=writeback";
1221         else
1222                 LBUG(); /* just a reminder */
1223
1224         return filter_common_setup(obd, len, buf, option);
1225 }
1226
1227 static int filter_cleanup(struct obd_device *obd)
1228 {
1229         struct super_block *sb;
1230         ENTRY;
1231
1232         if (!list_empty(&obd->obd_exports)) {
1233                 CERROR("still has clients!\n");
1234                 class_disconnect_all(obd);
1235                 if (!list_empty(&obd->obd_exports)) {
1236                         CERROR("still has exports after forced cleanup?\n");
1237                         RETURN(-EBUSY);
1238                 }
1239         }
1240
1241         ldlm_namespace_free(obd->obd_namespace);
1242
1243         sb = obd->u.filter.fo_sb;
1244         if (!obd->u.filter.fo_sb)
1245                 RETURN(0);
1246
1247         filter_post(obd);
1248
1249         shrink_dcache_parent(sb->s_root);
1250         unlock_kernel();
1251         mntput(obd->u.filter.fo_vfsmnt);
1252         obd->u.filter.fo_sb = 0;
1253         kfree(obd->u.filter.fo_fstype);
1254         fsfilt_put_ops(obd->obd_fsops);
1255
1256         lock_kernel();
1257
1258         RETURN(0);
1259 }
1260
1261 int filter_attach(struct obd_device *dev, obd_count len, void *data)
1262 {
1263         struct lprocfs_static_vars lvars;
1264
1265         lprocfs_init_vars(&lvars);
1266         return lprocfs_obd_attach(dev, lvars.obd_vars);
1267 }
1268
/* Detach: undo filter_attach() by handing the device to
 * lprocfs_obd_detach(). */
int filter_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1273
1274 /* nearly identical to mds_connect */
1275 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1276                           struct obd_uuid *cluuid, struct recovd_obd *recovd,
1277                           ptlrpc_recovery_cb_t recover)
1278 {
1279         struct obd_export *exp;
1280         struct filter_export_data *fed;
1281         struct filter_client_data *fcd;
1282         struct filter_obd *filter = &obd->u.filter;
1283         int rc;
1284
1285         ENTRY;
1286
1287         if (!conn || !obd || !cluuid)
1288                 RETURN(-EINVAL);
1289
1290         rc = class_connect(conn, obd, cluuid);
1291         if (rc)
1292                 RETURN(rc);
1293         exp = class_conn2export(conn);
1294         LASSERT(exp);
1295
1296         fed = &exp->exp_filter_data;
1297
1298         INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
1299         spin_lock_init(&exp->exp_filter_data.fed_lock);
1300
1301         if (!(obd->obd_flags & OBD_REPLAYABLE))
1302                 RETURN(0);
1303
1304         OBD_ALLOC(fcd, sizeof(*fcd));
1305         if (!fcd) {
1306                 CERROR("filter: out of memory for client data\n");
1307                 GOTO(out_export, rc = -ENOMEM);
1308         }
1309
1310         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1311         fed->fed_fcd = fcd;
1312         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1313
1314         rc = filter_client_add(filter, fed, -1);
1315         if (rc)
1316                 GOTO(out_fcd, rc);
1317
1318         RETURN(rc);
1319
1320 out_fcd:
1321         OBD_FREE(fcd, sizeof(*fcd));
1322 out_export:
1323         class_disconnect(conn);
1324
1325         RETURN(rc);
1326 }
1327
1328 /* also incredibly similar to mds_disconnect */
1329 static int filter_disconnect(struct lustre_handle *conn)
1330 {
1331         struct obd_export *exp = class_conn2export(conn);
1332         struct filter_export_data *fed;
1333         int rc;
1334         ENTRY;
1335
1336         LASSERT(exp);
1337         fed = &exp->exp_filter_data;
1338         spin_lock(&fed->fed_lock);
1339         while (!list_empty(&fed->fed_open_head)) {
1340                 struct filter_file_data *ffd;
1341
1342                 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1343                                  ffd_export_list);
1344                 list_del(&ffd->ffd_export_list);
1345                 spin_unlock(&fed->fed_lock);
1346
1347                 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1348                        ffd->ffd_file->f_dentry->d_name.len,
1349                        ffd->ffd_file->f_dentry->d_name.name,
1350                        ffd, ffd->ffd_servercookie);
1351
1352                 filter_close_internal(exp, ffd, NULL);
1353                 spin_lock(&fed->fed_lock);
1354         }
1355         spin_unlock(&fed->fed_lock);
1356
1357         ldlm_cancel_locks_for_export(exp);
1358
1359         if (exp->exp_obd->obd_flags & OBD_REPLAYABLE)
1360                 filter_client_free(exp);
1361
1362         rc = class_disconnect(conn);
1363
1364         /* XXX cleanup preallocated inodes */
1365         RETURN(rc);
1366 }
1367
1368 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1369 {
1370         int type = oa->o_mode & S_IFMT;
1371         ENTRY;
1372
1373         CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1374                inode->i_ino, inode, oa->o_id, valid);
1375         /* Don't copy the inode number in place of the object ID */
1376         obdo_from_inode(oa, inode, valid);
1377         oa->o_mode &= ~S_IFMT;
1378         oa->o_mode |= type;
1379
1380         if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1381                 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1382                 oa->o_rdev = rdev;
1383                 oa->o_valid |= OBD_MD_FLRDEV;
1384         }
1385
1386         EXIT;
1387 }
1388
1389 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
1390 {
1391         struct filter_file_data *ffd = NULL;
1392         ENTRY;
1393
1394         if (!handle || !handle->addr)
1395                 RETURN(NULL);
1396
1397         ffd = (struct filter_file_data *)(unsigned long)(handle->addr);
1398         if (!kmem_cache_validate(filter_open_cache, (void *)ffd))
1399                 RETURN(NULL);
1400
1401         if (ffd->ffd_servercookie != handle->cookie)
1402                 RETURN(NULL);
1403
1404         LASSERT(ffd->ffd_file->private_data == ffd);
1405         RETURN(ffd);
1406 }
1407
1408 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1409                                          struct obdo *oa, int locked,char *what)
1410 {
1411         struct dentry *dentry = NULL;
1412
1413         if (oa->o_valid & OBD_MD_FLHANDLE) {
1414                 struct lustre_handle *ost_handle = obdo_handle(oa);
1415                 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1416
1417                 if (ffd)
1418                         dentry = dget(ffd->ffd_file->f_dentry);
1419         }
1420
1421         if (!dentry) {
1422                 struct obd_device *obd = class_conn2obd(conn);
1423                 if (!obd) {
1424                         CERROR("invalid client "LPX64"\n", conn->addr);
1425                         RETURN(ERR_PTR(-EINVAL));
1426                 }
1427                 dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode,
1428                                                               oa->o_id),
1429                                            oa->o_id, locked);
1430         }
1431
1432         if (IS_ERR(dentry)) {
1433                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1434                 RETURN(dentry);
1435         }
1436
1437         if (!dentry->d_inode) {
1438                 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1439                 f_dput(dentry);
1440                 LBUG();
1441                 RETURN(ERR_PTR(-ENOENT));
1442         }
1443
1444         return dentry;
1445 }
1446
1447 #define filter_oa2dentry(conn, oa, locked) __filter_oa2dentry(conn, oa, locked,\
1448                                                               __FUNCTION__)
1449
1450 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1451                           struct lov_stripe_md *md)
1452 {
1453         struct dentry *dentry = NULL;
1454         int rc = 0;
1455         ENTRY;
1456
1457         XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
1458
1459         dentry = filter_oa2dentry(conn, oa, 1);
1460         if (IS_ERR(dentry))
1461                 RETURN(PTR_ERR(dentry));
1462
1463         filter_from_inode(oa, dentry->d_inode, oa->o_valid);
1464
1465         f_dput(dentry);
1466         RETURN(rc);
1467 }
1468
1469 /* this is called from filter_truncate() until we have filter_punch() */
1470 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1471                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1472 {
1473         struct obd_run_ctxt saved;
1474         struct obd_export *export = class_conn2export(conn);
1475         struct obd_device *obd = class_conn2obd(conn);
1476         struct filter_obd *filter = &obd->u.filter;
1477         struct dentry *dentry;
1478         struct iattr iattr;
1479         struct inode *inode;
1480         void * handle;
1481         int rc, rc2;
1482         ENTRY;
1483
1484         XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
1485
1486         dentry = filter_oa2dentry(conn, oa, 0);
1487
1488         if (IS_ERR(dentry))
1489                 RETURN(PTR_ERR(dentry));
1490
1491         iattr_from_obdo(&iattr, oa, oa->o_valid);
1492         iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1493         inode = dentry->d_inode;
1494
1495         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1496         lock_kernel();
1497         if (iattr.ia_valid & ATTR_SIZE)
1498                 down(&inode->i_sem);
1499
1500         filter_start_transno(export);
1501         handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1502         if (IS_ERR(handle)) {
1503                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1504                 GOTO(out_unlock, rc);
1505         }
1506
1507         if (inode->i_op->setattr)
1508                 rc = inode->i_op->setattr(dentry, &iattr);
1509         else
1510                 rc = inode_setattr(inode, &iattr);
1511         rc = filter_finish_transno(export, handle, oti, rc);
1512         rc2 = fsfilt_commit(obd, dentry->d_inode, handle);
1513         if (rc2) {
1514                 CERROR("error on commit, err = %d\n", rc2);
1515                 if (!rc)
1516                         rc = rc2;
1517         }
1518
1519         if (iattr.ia_valid & ATTR_SIZE) {
1520                 up(&inode->i_sem);
1521                 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1522                 obdo_from_inode(oa, inode, oa->o_valid);
1523         }
1524
1525 out_unlock:
1526         unlock_kernel();
1527         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1528
1529         f_dput(dentry);
1530         RETURN(rc);
1531 }
1532
1533 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1534                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
1535 {
1536         struct obd_export *export;
1537         struct lustre_handle *handle;
1538         struct filter_file_data *ffd;
1539         struct file *filp;
1540         int rc = 0;
1541         ENTRY;
1542
1543         export = class_conn2export(conn);
1544         if (!export) {
1545                 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1546                 RETURN(-EINVAL);
1547         }
1548
1549         XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
1550
1551         filp = filter_obj_open(export, oa->o_id, oa->o_mode);
1552         if (IS_ERR(filp))
1553                 GOTO(out, rc = PTR_ERR(filp));
1554
1555         filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1556
1557         ffd = filp->private_data;
1558         handle = obdo_handle(oa);
1559         handle->addr = (__u64)(unsigned long)ffd;
1560         handle->cookie = ffd->ffd_servercookie;
1561         oa->o_valid |= OBD_MD_FLHANDLE;
1562         EXIT;
1563 out:
1564         return rc;
1565 } /* filter_open */
1566
1567 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1568                         struct lov_stripe_md *ea, struct obd_trans_info *oti)
1569 {
1570         struct obd_export *exp;
1571         struct filter_file_data *ffd;
1572         struct filter_export_data *fed;
1573         int rc;
1574         ENTRY;
1575
1576         exp = class_conn2export(conn);
1577         if (!exp) {
1578                 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1579                 RETURN(-EINVAL);
1580         }
1581
1582         XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
1583
1584         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1585                 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1586                 RETURN(-EINVAL);
1587         }
1588
1589         ffd = filter_handle2ffd(obdo_handle(oa));
1590         if (!ffd) {
1591                 struct lustre_handle *handle = obdo_handle(oa);
1592                 CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n",
1593                        handle->addr, handle->cookie);
1594                 RETURN(-ESTALE);
1595         }
1596
1597         fed = &exp->exp_filter_data;
1598         spin_lock(&fed->fed_lock);
1599         list_del(&ffd->ffd_export_list);
1600         spin_unlock(&fed->fed_lock);
1601
1602         rc = filter_close_internal(exp, ffd, oti);
1603
1604         RETURN(rc);
1605 } /* filter_close */
1606
1607 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1608                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
1609 {
1610         struct obd_export *export = class_conn2export(conn);
1611         struct obd_device *obd = class_conn2obd(conn);
1612         struct filter_obd *filter = &obd->u.filter;
1613         struct obd_run_ctxt saved;
1614         struct dentry *dir_dentry;
1615         struct dentry *new;
1616         struct iattr;
1617         void *handle;
1618         int err, rc;
1619         ENTRY;
1620
1621         if (!obd) {
1622                 CERROR("invalid client "LPX64"\n", conn->addr);
1623                 return -EINVAL;
1624         }
1625
1626         XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
1627
1628         oa->o_id = filter_next_id(obd);
1629
1630         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1631         dir_dentry = filter_parent(obd, S_IFREG, oa->o_id);
1632         down(&dir_dentry->d_inode->i_sem);
1633         new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
1634         if (IS_ERR(new))
1635                 GOTO(out, rc = PTR_ERR(new));
1636
1637         if (new->d_inode) {
1638                 char buf[32];
1639
1640                 /* This would only happen if lastobjid was bad on disk */
1641                 CERROR("objid %s already exists\n",
1642                        filter_id(buf, filter, oa->o_mode, oa->o_id));
1643                 LBUG();
1644                 GOTO(out, rc = -EEXIST);
1645         }
1646
1647         filter_start_transno(export);
1648         handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
1649         if (IS_ERR(handle)) {
1650                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1651                 GOTO(out_put, rc);
1652         }
1653         rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
1654         if (rc)
1655                 CERROR("create failed rc = %d\n", rc);
1656
1657         rc = filter_finish_transno(export, handle, oti, rc);
1658         err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1659         if (err) {
1660                 CERROR("unable to write lastobjid but file created\n");
1661                 if (!rc)
1662                         rc = err;
1663         }
1664         err = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1665         if (err) {
1666                 CERROR("error on commit, err = %d\n", err);
1667                 if (!rc)
1668                         rc = err;
1669         }
1670
1671         if (rc)
1672                 GOTO(out_put, rc);
1673
1674         /* Set flags for fields we have set in the inode struct */
1675         oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1676                  OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1677         filter_from_inode(oa, new->d_inode, oa->o_valid);
1678
1679         EXIT;
1680 out_put:
1681         f_dput(new);
1682 out:
1683         up(&dir_dentry->d_inode->i_sem);
1684         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1685         return rc;
1686 }
1687
1688 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1689                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
1690 {
1691         struct obd_export *export = class_conn2export(conn);
1692         struct obd_device *obd = class_conn2obd(conn);
1693         struct filter_obd *filter = &obd->u.filter;
1694         struct dentry *dir_dentry, *object_dentry;
1695         struct filter_dentry_data *fdd;
1696         struct obd_run_ctxt saved;
1697         void *handle;
1698         int rc, rc2;
1699         ENTRY;
1700
1701         if (!obd) {
1702                 CERROR("invalid client "LPX64"\n", conn->addr);
1703                 RETURN(-EINVAL);
1704         }
1705
1706         XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
1707
1708         CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1709
1710         dir_dentry = filter_parent(obd, oa->o_mode, oa->o_id);
1711         down(&dir_dentry->d_inode->i_sem);
1712
1713         object_dentry = filter_oa2dentry(conn, oa, 0);
1714         if (IS_ERR(object_dentry))
1715                 GOTO(out, rc = -ENOENT);
1716
1717         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1718         filter_start_transno(export);
1719         handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
1720         if (IS_ERR(handle)) {
1721                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1722                 GOTO(out_ctxt, rc);
1723         }
1724
1725         fdd = object_dentry->d_fsdata;
1726         if (fdd && atomic_read(&fdd->fdd_open_count)) {
1727                 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1728                         fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1729                         /* XXX put into PENDING directory in case of crash */
1730                         CDEBUG(D_INODE,
1731                                "defer destroy of %dx open objid "LPU64"\n",
1732                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1733                 } else
1734                         CDEBUG(D_INODE,
1735                                "repeat destroy of %dx open objid "LPU64"\n",
1736                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1737                 GOTO(out_commit, rc = 0);
1738         }
1739
1740         rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
1741
1742 out_commit:
1743         /* XXX save last_rcvd on disk */
1744         rc = filter_finish_transno(export, handle, oti, rc);
1745         rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1746         if (rc2) {
1747                 CERROR("error on commit, err = %d\n", rc2);
1748                 if (!rc)
1749                         rc = rc2;
1750         }
1751 out_ctxt:
1752         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1753         f_dput(object_dentry);
1754
1755         EXIT;
1756 out:
1757         up(&dir_dentry->d_inode->i_sem);
1758         return rc;
1759 }
1760
1761 /* NB start and end are used for punch, but not truncate */
1762 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1763                            struct lov_stripe_md *lsm,
1764                            obd_off start, obd_off end,
1765                            struct obd_trans_info *oti)
1766 {
1767         int error;
1768         ENTRY;
1769
1770         XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
1771
1772         if (end != OBD_OBJECT_EOF)
1773                 CERROR("PUNCH not supported, only truncate works\n");
1774
1775         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
1776                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1777         oa->o_size = start;
1778         error = filter_setattr(conn, oa, NULL, oti);
1779         RETURN(error);
1780 }
1781
/*
 * Undo one kmap() and drop one page-cache reference on "page".
 * Counterpart of the lustre_get_page_*() helpers in this file.
 */
static inline void lustre_put_page(struct page *page)
{
        kunmap(page);
        page_cache_release(page);
}
1787
1788
1789 static struct page *
1790 lustre_get_page_read(struct inode *inode, struct niobuf_local *lnb)
1791 {
1792         unsigned long index = lnb->offset >> PAGE_SHIFT;
1793         struct address_space *mapping = inode->i_mapping;
1794         struct page *page;
1795         int rc;
1796
1797         page = read_cache_page(mapping, index,
1798                                (filler_t*)mapping->a_ops->readpage, NULL);
1799         if (!IS_ERR(page)) {
1800                 wait_on_page(page);
1801                 lnb->addr = kmap(page);
1802                 lnb->page = page;
1803                 if (!PageUptodate(page)) {
1804                         CERROR("page index %lu not uptodate\n", index);
1805                         GOTO(err_page, rc = -EIO);
1806                 }
1807                 if (PageError(page)) {
1808                         CERROR("page index %lu has error\n", index);
1809                         GOTO(err_page, rc = -EIO);
1810                 }
1811         }
1812         return page;
1813
1814 err_page:
1815         lustre_put_page(page);
1816         return ERR_PTR(rc);
1817 }
1818
1819 static struct page *
1820 lustre_get_page_write(struct inode *inode, unsigned long index)
1821 {
1822         struct address_space *mapping = inode->i_mapping;
1823         struct page *page;
1824         int rc;
1825
1826         page = grab_cache_page(mapping, index); /* locked page */
1827
1828         if (!IS_ERR(page)) {
1829                 kmap(page);
1830                 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
1831                  * a no-op for most filesystems, because we write the whole
1832                  * page.  For partial-page I/O this will read in the page.
1833                  */
1834                 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
1835                 if (rc) {
1836                         CERROR("page index %lu, rc = %d\n", index, rc);
1837                         if (rc != -ENOSPC)
1838                                 LBUG();
1839                         GOTO(err_unlock, rc);
1840                 }
1841                 /* XXX not sure if we need this if we are overwriting page */
1842                 if (PageError(page)) {
1843                         CERROR("error on page index %lu, rc = %d\n", index, rc);
1844                         LBUG();
1845                         GOTO(err_unlock, rc = -EIO);
1846                 }
1847         }
1848         return page;
1849
1850 err_unlock:
1851         unlock_page(page);
1852         lustre_put_page(page);
1853         return ERR_PTR(rc);
1854 }
1855
1856 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1857 int waitfor_one_page(struct page *page)
1858 {
1859         wait_on_page_locked(page);
1860         return 0;
1861 }
1862 #endif
1863
1864 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1865 /* We should only change the file mtime (and not the ctime, like
1866  * update_inode_times() in generic_file_write()) when we only change data.
1867  */
1868 static inline void inode_update_time(struct inode *inode, int ctime_too)
1869 {
1870         time_t now = CURRENT_TIME;
1871         if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
1872                 return;
1873         inode->i_mtime = now;
1874         if (ctime_too)
1875                 inode->i_ctime = now;
1876         mark_inode_dirty_sync(inode);
1877 }
1878 #endif
1879
1880 static int lustre_commit_write(struct niobuf_local *lnb)
1881 {
1882         struct page *page = lnb->page;
1883         unsigned from = lnb->offset & ~PAGE_MASK;
1884         unsigned to = from + lnb->len;
1885         struct inode *inode = page->mapping->host;
1886         int err;
1887
1888         LASSERT(to <= PAGE_SIZE);
1889         err = page->mapping->a_ops->commit_write(NULL, page, from, to);
1890         if (!err && IS_SYNC(inode))
1891                 err = waitfor_one_page(page);
1892         //SetPageUptodate(page); // the client commit_write will do this
1893
1894         SetPageReferenced(page);
1895         unlock_page(page);
1896         lustre_put_page(page);
1897         return err;
1898 }
1899
1900 struct page *filter_get_page_write(struct inode *inode,
1901                                    struct niobuf_local *lnb, int *pglocked)
1902 {
1903         unsigned long index = lnb->offset >> PAGE_SHIFT;
1904         struct address_space *mapping = inode->i_mapping;
1905         struct page *page;
1906         int rc;
1907
1908         //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
1909         if (*pglocked)
1910                 page = grab_cache_page_nowait(mapping, index); /* locked page */
1911         else
1912                 page = grab_cache_page(mapping, index); /* locked page */
1913
1914
1915         /* This page is currently locked, so get a temporary page instead. */
1916         if (!page) {
1917                 unsigned long addr;
1918                 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
1919                 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
1920                 if (!addr) {
1921                         CERROR("no memory for a temp page\n");
1922                         GOTO(err, rc = -ENOMEM);
1923                 }
1924                 POISON((void *)addr, 0xBA, PAGE_SIZE);
1925                 page = virt_to_page(addr);
1926                 kmap(page);
1927                 page->index = index;
1928                 lnb->addr = (void *)addr;
1929                 lnb->page = page;
1930                 lnb->flags |= N_LOCAL_TEMP_PAGE;
1931         } else if (!IS_ERR(page)) {
1932                 (*pglocked)++;
1933                 kmap(page);
1934
1935                 rc = mapping->a_ops->prepare_write(NULL, page,
1936                                                    lnb->offset & ~PAGE_MASK,
1937                                                    lnb->len);
1938                 if (rc) {
1939                         if (rc != -ENOSPC)
1940                                 CERROR("page index %lu, rc = %d\n", index, rc);
1941                         GOTO(err_unlock, rc);
1942                 }
1943                 /* XXX not sure if we need this if we are overwriting page */
1944                 if (PageError(page)) {
1945                         CERROR("error on page index %lu, rc = %d\n", index, rc);
1946                         LBUG();
1947                         GOTO(err_unlock, rc = -EIO);
1948                 }
1949                 lnb->addr = page_address(page);
1950                 lnb->page = page;
1951         }
1952
1953         return page;
1954
1955 err_unlock:
1956         unlock_page(page);
1957         lustre_put_page(page);
1958 err:
1959         return ERR_PTR(rc);
1960 }
1961
1962 /*
1963  * We need to balance prepare_write() calls with commit_write() calls.
1964  * If the page has been prepared, but we have no data for it, we don't
1965  * want to overwrite valid data on disk, but we still need to zero out
1966  * data for space which was newly allocated.  Like part of what happens
1967  * in __block_prepare_write() for newly allocated blocks.
1968  *
1969  * XXX currently __block_prepare_write() creates buffers for all the
1970  *     pages, and the filesystems mark these buffers as BH_New if they
1971  *     were newly allocated from disk. We use the BH_New flag similarly.
1972  */
1973 static int filter_commit_write(struct niobuf_local *lnb, int err)
1974 {
1975 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1976         if (err) {
1977                 unsigned block_start, block_end;
1978                 struct buffer_head *bh, *head = lnb->page->buffers;
1979                 unsigned blocksize = head->b_size;
1980
1981                 /* debugging: just seeing if this ever happens */
1982                 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
1983                        "called for ino %lu:%lu on err %d\n",
1984                        lnb->page->mapping->host->i_ino, lnb->page->index, err);
1985
1986                 /* Currently one buffer per page, but in the future... */
1987                 for (bh = head, block_start = 0; bh != head || !block_start;
1988                      block_start = block_end, bh = bh->b_this_page) {
1989                         block_end = block_start + blocksize;
1990                         if (buffer_new(bh))
1991                                 memset(lnb->addr + block_start, 0, blocksize);
1992                 }
1993         }
1994 #endif
1995         return lustre_commit_write(lnb);
1996 }
1997
1998 static int filter_preprw(int cmd, struct lustre_handle *conn,
1999                          int objcount, struct obd_ioobj *obj,
2000                          int niocount, struct niobuf_remote *nb,
2001                          struct niobuf_local *res, void **desc_private,
2002                          struct obd_trans_info *oti)
2003 {
2004         struct obd_run_ctxt saved;
2005         struct obd_export *export;
2006         struct obd_device *obd;
2007         struct obd_ioobj *o;
2008         struct niobuf_remote *rnb = nb;
2009         struct niobuf_local *lnb = res;
2010         struct fsfilt_objinfo *fso;
2011         int pglocked = 0;
2012         int rc = 0;
2013         int i;
2014         ENTRY;
2015
2016         if ((cmd & OBD_BRW_WRITE) != 0)
2017                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2018         else
2019                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2020
2021         memset(res, 0, niocount * sizeof(*res));
2022
2023         export = class_conn2export(conn);
2024         obd = class_conn2obd(conn);
2025         if (!obd) {
2026                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2027                 RETURN(-EINVAL);
2028         }
2029
2030         LASSERT(objcount < 16); // theoretically we support multi-obj BRW
2031
2032         OBD_ALLOC(fso, objcount * sizeof(*fso));
2033         if (!fso)
2034                 RETURN(-ENOMEM);
2035
2036         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2037
2038         for (i = 0, o = obj; i < objcount; i++, o++) {
2039                 struct filter_dentry_data *fdd;
2040                 struct dentry *dentry;
2041
2042                 LASSERT(o->ioo_bufcnt);
2043
2044                 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2045                                                               o->ioo_id),
2046                                            o->ioo_id, 0);
2047
2048                 if (IS_ERR(dentry)) 
2049                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
2050
2051                 fso[i].fso_dentry = dentry;
2052                 fso[i].fso_bufcnt = o->ioo_bufcnt;
2053
2054                 if (!dentry->d_inode) {
2055                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2056                                o->ioo_id);
2057                         f_dput(dentry);
2058                         GOTO(out_objinfo, rc = -ENOENT);
2059                 }
2060
2061                 fdd = dentry->d_fsdata;
2062                 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2063                         CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2064                                o->ioo_id);
2065         }
2066
2067         if (cmd & OBD_BRW_WRITE) {
2068 #warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
2069                 /* Even worse, we need to get locks on mulitple inodes (in
2070                  * order) or use the DLM to do the locking for us (and use
2071                  * the same locking in filter_setattr() for truncate.  The
2072                  * handling gets very ugly when dealing with locked pages.
2073                  * It may be easier to just get rid of the locked page code
2074                  * (which has problems of its own) and either discover we do
2075                  * not need it anymore (i.e. it was a symptom of another bug)
2076                  * or ensure we get the page locks in an appropriate order.
2077                  */
2078                 /* Danger, Will Robinson! You are taking a lock here and also
2079                  * starting a transaction and releasing/finishing then in
2080                  * filter_commitrw(), so you must call fsfilt_commit() and
2081                  * finish_transno() if an error occurs in this function.
2082                  */
2083                 filter_start_transno(export);
2084                 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2085                                                  niocount, nb);
2086                 if (IS_ERR(*desc_private)) {
2087                         rc = PTR_ERR(*desc_private);
2088                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2089                                "error starting transaction: rc = %d\n", rc);
2090                         *desc_private = NULL;
2091                         GOTO(out_objinfo, rc);
2092                 }
2093         }
2094
2095         obd_kmap_get(niocount, 1);
2096
2097         for (i = 0, o = obj; i < objcount; i++, o++) {
2098                 struct dentry *dentry;
2099                 struct inode *inode;
2100                 int j;
2101
2102                 dentry = fso[i].fso_dentry;
2103                 inode = dentry->d_inode;
2104
2105                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2106                         struct page *page;
2107
2108                         if (j == 0)
2109                                 lnb->dentry = dentry;
2110                         else
2111                                 lnb->dentry = dget(dentry);
2112
2113                         /* lnb->offset is aligned, while rnb->offset isn't,
2114                          * and we need to copy the fields to lnb anyways.
2115                          */
2116                         memcpy(lnb, rnb, sizeof(*rnb));
2117                         if (cmd & OBD_BRW_WRITE) {
2118                                 page = filter_get_page_write(inode, lnb,
2119                                                              &pglocked);
2120
2121                                 XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes,
2122                                                           lnb->len);
2123                         } else {
2124                                 page = lustre_get_page_read(inode, lnb);
2125
2126                                 XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes,
2127                                                           lnb->len);
2128                         }
2129
2130                         if (IS_ERR(page)) {
2131                                 rc = PTR_ERR(page);
2132                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2133                                        "error on page @"LPU64"%u/%u: rc = %d\n",
2134                                        lnb->offset, j, o->ioo_bufcnt, rc);
2135                                 f_dput(dentry);
2136                                 GOTO(out_pages, rc);
2137                         }
2138                 }
2139         }
2140
2141         EXIT;
2142 out:
2143         OBD_FREE(fso, objcount * sizeof(*fso));
2144         current->journal_info = NULL;
2145         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2146         return rc;
2147
2148 out_pages:
2149         while (lnb-- > res) {
2150                 if (cmd & OBD_BRW_WRITE)
2151                         filter_commit_write(lnb, rc);
2152                 else
2153                         lustre_put_page(lnb->page);
2154                 f_dput(lnb->dentry);
2155         }
2156         obd_kmap_put(niocount);
2157         if (cmd & OBD_BRW_WRITE) {
2158                 filter_finish_transno(export, *desc_private, oti, rc);
2159                 fsfilt_commit(obd,
2160                               filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2161                               *desc_private);
2162         }
2163         goto out; /* dropped the dentry refs already (one per page) */
2164
2165 out_objinfo:
2166         for (i = 0; i < objcount && fso[i].fso_dentry; i++)
2167                 f_dput(fso[i].fso_dentry);
2168         goto out;
2169 }
2170
2171 static int filter_write_locked_page(struct niobuf_local *lnb)
2172 {
2173         struct page *lpage;
2174         int rc;
2175         ENTRY;
2176
2177         lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2178         if (IS_ERR(lpage)) {
2179                 /* It is highly unlikely that we would ever get an error here.
2180                  * The page we want to get was previously locked, so it had to
2181                  * have already allocated the space, and we were just writing
2182                  * over the same data, so there would be no hole in the file.
2183                  *
2184                  * XXX: possibility of a race with truncate could exist, need
2185                  *      to check that.  There are no guarantees w.r.t.
2186                  *      write order even on a local filesystem, although the
2187                  *      normal response would be to return the number of bytes
2188                  *      successfully written and leave the rest to the app.
2189                  */
2190                 rc = PTR_ERR(lpage);
2191                 CERROR("error getting locked page index %ld: rc = %d\n",
2192                        lnb->page->index, rc);
2193                 LBUG();
2194                 lustre_commit_write(lnb);
2195                 RETURN(rc);
2196         }
2197
2198         /* lpage is kmapped in lustre_get_page_write() above and kunmapped in
2199          * lustre_commit_write() below, lnb->page was kmapped previously in
2200          * filter_get_page_write() and kunmapped in lustre_put_page() below.
2201          */
2202         memcpy(page_address(lpage), page_address(lnb->page), PAGE_SIZE);
2203         lustre_put_page(lnb->page);
2204
2205         lnb->page = lpage;
2206         rc = lustre_commit_write(lnb);
2207         if (rc)
2208                 CERROR("error committing locked page %ld: rc = %d\n",
2209                        lnb->page->index, rc);
2210
2211         RETURN(rc);
2212 }
2213
2214 static int filter_syncfs(struct lustre_handle *conn)
2215 {
2216         struct obd_device *obd;
2217         ENTRY;
2218
2219         obd = class_conn2obd(conn);
2220
2221         XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1);
2222
2223         RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
2224 }
2225
2226 static int filter_commitrw(int cmd, struct lustre_handle *conn,
2227                            int objcount, struct obd_ioobj *obj,
2228                            int niocount, struct niobuf_local *res,
2229                            void *desc_private, struct obd_trans_info *oti)
2230 {
2231         struct obd_run_ctxt saved;
2232         struct obd_ioobj *o;
2233         struct niobuf_local *lnb;
2234         struct obd_export *export = class_conn2export(conn);
2235         struct obd_device *obd = class_conn2obd(conn);
2236         int found_locked = 0;
2237         int rc = 0;
2238         int i;
2239         ENTRY;
2240
2241         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2242
2243         LASSERT(!current->journal_info);
2244         current->journal_info = desc_private;
2245
2246         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2247                 int j;
2248
2249                 if (cmd & OBD_BRW_WRITE)
2250                         inode_update_time(lnb->dentry->d_inode, 1);
2251                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2252                         if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2253                                 found_locked++;
2254                                 continue;
2255                         }
2256
2257                         if (cmd & OBD_BRW_WRITE) {
2258                                 int err = filter_commit_write(lnb, 0);
2259
2260                                 if (!rc)
2261                                         rc = err;
2262                         } else
2263                                 lustre_put_page(lnb->page);
2264
2265                         obd_kmap_put(1);
2266                         f_dput(lnb->dentry);
2267                 }
2268         }
2269
2270         for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2271                         i++, o++) {
2272                 int j;
2273                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2274                         int err;
2275                         if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2276                                 continue;
2277
2278                         err = filter_write_locked_page(lnb);
2279                         obd_kmap_put(1);
2280                         if (!rc)
2281                                 rc = err;
2282                         f_dput(lnb->dentry);
2283                         found_locked--;
2284                 }
2285         }
2286
2287         if (cmd & OBD_BRW_WRITE) {
2288                 /* We just want any dentry for the commit, for now */
2289                 struct dentry *dir_dentry = filter_parent(obd, S_IFREG, 0);
2290                 int err;
2291
2292                 rc = filter_finish_transno(export, desc_private, oti, rc);
2293                 err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
2294                 if (err)
2295                         rc = err;
2296                 if (obd_sync_filter) {
2297                         /* this can fail with ENOMEM, what should we do then? */
2298                         filter_syncfs(conn);
2299                 }
2300                 /* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
2301         }
2302
2303         LASSERT(!current->journal_info);
2304
2305         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2306         RETURN(rc);
2307 }
2308
2309 static int filter_brw(int cmd, struct lustre_handle *conn,
2310                       struct lov_stripe_md *lsm, obd_count oa_bufs,
2311                       struct brw_page *pga, struct obd_brw_set *set,
2312                       struct obd_trans_info *oti)
2313 {
2314         struct obd_ioobj        ioo;
2315         struct niobuf_local     *lnb;
2316         struct niobuf_remote    *rnb;
2317         obd_count               i;
2318         void                    *desc_private;
2319         int                     ret = 0;
2320         ENTRY;
2321
2322         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2323         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2324
2325         if (lnb == NULL || rnb == NULL)
2326                 GOTO(out, ret = -ENOMEM);
2327
2328         for (i = 0; i < oa_bufs; i++) {
2329                 rnb[i].offset = pga[i].off;
2330                 rnb[i].len = pga[i].count;
2331         }
2332
2333         ioo.ioo_id = lsm->lsm_object_id;
2334         ioo.ioo_gr = 0;
2335         ioo.ioo_type = S_IFREG;
2336         ioo.ioo_bufcnt = oa_bufs;
2337
2338         ret = filter_preprw(cmd, conn, 1, &ioo, oa_bufs, rnb, lnb,
2339                             &desc_private, oti);
2340         if (ret != 0)
2341                 GOTO(out, ret);
2342
2343         for (i = 0; i < oa_bufs; i++) {
2344                 void *virt = kmap(pga[i].pg);
2345                 obd_off off = pga[i].off & ~PAGE_MASK;
2346
2347                 if (cmd & OBD_BRW_WRITE)
2348                         memcpy(lnb[i].addr + off, virt + off, pga[i].count);
2349                 else
2350                         memcpy(virt + off, lnb[i].addr + off, pga[i].count);
2351
2352                 kunmap(virt);
2353         }
2354
2355         ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private,
2356                               oti);
2357
2358 out:
2359         if (lnb)
2360                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2361         if (rnb)
2362                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2363         RETURN(ret);
2364 }
2365
2366 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2367                              int objcount, struct obd_ioobj *obj,
2368                              int niocount, struct niobuf_remote *nb)
2369 {
2370         struct obd_device *obd;
2371         struct obd_ioobj *o = obj;
2372         struct niobuf_remote *rnb = nb;
2373         int rc = 0;
2374         int i;
2375         ENTRY;
2376
2377         if ((cmd & OBD_BRW_WRITE) != 0)
2378                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2379         else
2380                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2381
2382         obd = class_conn2obd(conn);
2383         if (!obd) {
2384                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2385                 RETURN(-EINVAL);
2386         }
2387
2388         for (i = 0; i < objcount; i++, o++) {
2389                 struct dentry *dentry;
2390                 struct inode *inode;
2391                 int (*fs_bmap)(struct address_space *, long);
2392                 int j;
2393
2394                 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2395                                                               o->ioo_id),
2396                                            o->ioo_id, 0);
2397                 if (IS_ERR(dentry))
2398                         GOTO(out, rc = PTR_ERR(dentry));
2399                 inode = dentry->d_inode;
2400                 if (!inode) {
2401                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2402                                o->ioo_id);
2403                         f_dput(dentry);
2404                         GOTO(out, rc = -ENOENT);
2405                 }
2406                 fs_bmap = inode->i_mapping->a_ops->bmap;
2407
2408                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2409                         long block;
2410
2411                         block = rnb->offset >> inode->i_blkbits;
2412
2413                         if (cmd == OBD_BRW_READ) {
2414                                 block = fs_bmap(inode->i_mapping, block);
2415                         } else {
2416                                 loff_t newsize = rnb->offset + rnb->len;
2417                                 /* fs_prep_san_write will also update inode
2418                                  * size for us:
2419                                  * (1) new alloced block
2420                                  * (2) existed block but size extented
2421                                  */
2422                                 /* FIXME We could call fs_prep_san_write()
2423                                  * only once for all the blocks allocation.
2424                                  * Now call it once for each block, for
2425                                  * simplicity. And if error happens, we
2426                                  * probably need to release previous alloced
2427                                  * block */
2428                                 rc = fs_prep_san_write(obd, inode, &block,
2429                                                        1, newsize);
2430                                 if (rc)
2431                                         break;
2432                         }
2433
2434                         rnb->offset = block;
2435                 }
2436                 f_dput(dentry);
2437         }
2438 out:
2439         RETURN(rc);
2440 }
2441
2442 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2443 {
2444         struct obd_device *obd;
2445         ENTRY;
2446
2447         obd = class_conn2obd(conn);
2448
2449         XPROCFS_BUMP_MYCPU_IOSTAT (st_statfs_reqs, 1);
2450
2451         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2452 }
2453
2454 static int filter_get_info(struct lustre_handle *conn, obd_count keylen,
2455                            void *key, obd_count *vallen, void **val)
2456 {
2457         struct obd_device *obd;
2458         ENTRY;
2459
2460         obd = class_conn2obd(conn);
2461         if (!obd) {
2462                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2463                 RETURN(-EINVAL);
2464         }
2465
2466         if ( keylen == strlen("blocksize") &&
2467              memcmp(key, "blocksize", keylen) == 0 ) {
2468                 *vallen = sizeof(long);
2469                 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize;
2470                 RETURN(0);
2471         }
2472
2473         if ( keylen == strlen("blocksize_bits") &&
2474              memcmp(key, "blocksize_bits", keylen) == 0 ){
2475                 *vallen = sizeof(long);
2476                 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize_bits;
2477                 RETURN(0);
2478         }
2479
2480         CDEBUG(D_IOCTL, "invalid key\n");
2481         RETURN(-EINVAL);
2482 }
2483
2484 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2485                   struct lustre_handle *src_conn, struct obdo *src,
2486                   obd_size count, obd_off offset, struct obd_trans_info *oti)
2487 {
2488         struct page *page;
2489         struct lov_stripe_md srcmd, dstmd;
2490         unsigned long index = 0;
2491         int err = 0;
2492
2493         LBUG(); /* THIS CODE IS NOT CORRECT -phil */
2494
2495         memset(&srcmd, 0, sizeof(srcmd));
2496         memset(&dstmd, 0, sizeof(dstmd));
2497         srcmd.lsm_object_id = src->o_id;
2498         dstmd.lsm_object_id = dst->o_id;
2499
2500         ENTRY;
2501         CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2502                ", dst: ino "LPU64"\n",
2503                src->o_id, src->o_blocks, src->o_size, dst->o_id);
2504         page = alloc_page(GFP_USER);
2505         if (page == NULL)
2506                 RETURN(-ENOMEM);
2507
2508 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2509         while (TryLockPage(page))
2510                 ___wait_on_page(page);
2511 #else
2512         wait_on_page_locked(page);
2513 #endif
2514
2515         /* XXX with brw vector I/O, we could batch up reads and writes here,
2516          *     all we need to do is allocate multiple pages to handle the I/Os
2517          *     and arrays to handle the request parameters.
2518          */
2519         while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2520                 struct brw_page pg;
2521                 struct obd_brw_set *set;
2522
2523                 set = obd_brw_set_new();
2524                 if (set == NULL) {
2525                         err = -ENOMEM;
2526                         EXIT;
2527                         break;
2528                 }
2529
2530                 pg.pg = page;
2531                 pg.count = PAGE_SIZE;
2532                 pg.off = (page->index) << PAGE_SHIFT;
2533                 pg.flag = 0;
2534
2535                 page->index = index;
2536                 set->brw_callback = ll_brw_sync_wait;
2537                 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
2538                 obd_brw_set_decref(set);
2539                 if (err) {
2540                         EXIT;
2541                         break;
2542                 }
2543
2544                 set = obd_brw_set_new();
2545                 if (set == NULL) {
2546                         err = -ENOMEM;
2547                         EXIT;
2548                         break;
2549                 }
2550                 pg.flag = OBD_BRW_CREATE;
2551                 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2552
2553                 set->brw_callback = ll_brw_sync_wait;
2554                 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
2555                 obd_brw_set_decref(set);
2556
2557                 /* XXX should handle dst->o_size, dst->o_blocks here */
2558                 if (err) {
2559                         EXIT;
2560                         break;
2561                 }
2562
2563                 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2564
2565                 index++;
2566         }
2567         dst->o_size = src->o_size;
2568         dst->o_blocks = src->o_blocks;
2569         dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
2570         unlock_page(page);
2571         __free_page(page);
2572
2573         RETURN(err);
2574 }
2575
2576 static struct obd_ops filter_obd_ops = {
2577         o_owner:        THIS_MODULE,
2578         o_attach:       filter_attach,
2579         o_detach:       filter_detach,
2580         o_get_info:     filter_get_info,
2581         o_setup:        filter_setup,
2582         o_cleanup:      filter_cleanup,
2583         o_connect:      filter_connect,
2584         o_disconnect:   filter_disconnect,
2585         o_statfs:       filter_statfs,
2586         o_syncfs:       filter_syncfs,
2587         o_getattr:      filter_getattr,
2588         o_create:       filter_create,
2589         o_setattr:      filter_setattr,
2590         o_destroy:      filter_destroy,
2591         o_open:         filter_open,
2592         o_close:        filter_close,
2593         o_brw:          filter_brw,
2594         o_punch:        filter_truncate,
2595         o_preprw:       filter_preprw,
2596         o_commitrw:     filter_commitrw
2597 #if 0
2598         o_san_preprw:  filter_san_preprw,
2599         o_preallocate: filter_preallocate_inodes,
2600         o_migrate:     filter_migrate,
2601         o_copy:        filter_copy_data,
2602         o_iterate:     filter_iterate
2603 #endif
2604 };
2605
2606 static struct obd_ops filter_sanobd_ops = {
2607         o_owner:        THIS_MODULE,
2608         o_attach:       filter_attach,
2609         o_detach:       filter_detach,
2610         o_get_info:     filter_get_info,
2611         o_setup:        filter_san_setup,
2612         o_cleanup:      filter_cleanup,
2613         o_connect:      filter_connect,
2614         o_disconnect:   filter_disconnect,
2615         o_statfs:       filter_statfs,
2616         o_getattr:      filter_getattr,
2617         o_create:       filter_create,
2618         o_setattr:      filter_setattr,
2619         o_destroy:      filter_destroy,
2620         o_open:         filter_open,
2621         o_close:        filter_close,
2622         o_brw:          filter_brw,
2623         o_punch:        filter_truncate,
2624         o_preprw:       filter_preprw,
2625         o_commitrw:     filter_commitrw,
2626         o_san_preprw:   filter_san_preprw,
2627 #if 0
2628         o_preallocate:  filter_preallocate_inodes,
2629         o_migrate:      filter_migrate,
2630         o_copy:         filter_copy_data,
2631         o_iterate:      filter_iterate
2632 #endif
2633 };
2634
2635
2636 static int __init obdfilter_init(void)
2637 {
2638         struct lprocfs_static_vars lvars;
2639         int rc;
2640
2641         printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2642         filter_open_cache = kmem_cache_create("ll_filter_fdata",
2643                                               sizeof(struct filter_file_data),
2644                                               0, 0, NULL, NULL);
2645         if (!filter_open_cache)
2646                 RETURN(-ENOMEM);
2647
2648         filter_dentry_cache = kmem_cache_create("ll_filter_dentry",
2649                                         sizeof(struct filter_dentry_data),
2650                                         0, 0, NULL, NULL);
2651         if (!filter_dentry_cache) {
2652                 rc = -ENOMEM;
2653                 goto err1;
2654         }
2655
2656         xprocfs_init ("filter");
2657
2658         lprocfs_init_vars(&lvars);
2659
2660         rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2661                                  OBD_FILTER_DEVICENAME);
2662         if (rc)
2663                 goto err2;
2664
2665         rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2666                                  OBD_FILTER_SAN_DEVICENAME);
2667         if (rc)
2668                 goto err3;
2669
2670         return 0;
2671 err3:
2672         class_unregister_type(OBD_FILTER_DEVICENAME);
2673 err2:
2674         kmem_cache_destroy(filter_dentry_cache);
2675 err1:
2676         kmem_cache_destroy(filter_open_cache);
2677         return rc;
2678 }
2679
2680 static void __exit obdfilter_exit(void)
2681 {
2682         class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2683         class_unregister_type(OBD_FILTER_DEVICENAME);
2684         if (kmem_cache_destroy(filter_dentry_cache))
2685                 CERROR("couldn't free obdfilter dentry cache\n");
2686         if (kmem_cache_destroy(filter_open_cache))
2687                 CERROR("couldn't free obdfilter open cache\n");
2688         xprocfs_fini ();
2689 }
2690
2691 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2692 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2693 MODULE_LICENSE("GPL");
2694
2695 module_init(obdfilter_init);
2696 module_exit(obdfilter_exit);