Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_FILTER
38
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
42 #include <linux/fs.h>
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
54 #endif
55
/*
 * Indices of the filter read/write counters.  LPROC_FILTER_LAST is one past
 * the final counter index, i.e. the number of counters.
 */
enum {
        LPROC_FILTER_READS,             /* == 0 */
        LPROC_FILTER_READ_BYTES,        /* == 1 */
        LPROC_FILTER_WRITES,            /* == 2 */
        LPROC_FILTER_WRITE_BYTES,       /* == 3 */
        LPROC_FILTER_LAST               /* == LPROC_FILTER_WRITE_BYTES + 1 */
};
63
64 /* should be generic per-obd stats... */
65 struct xprocfs_io_stat {
66         __u64    st_read_bytes;
67         __u64    st_read_reqs;
68         __u64    st_write_bytes;
69         __u64    st_write_reqs;
70         __u64    st_getattr_reqs;
71         __u64    st_setattr_reqs;
72         __u64    st_create_reqs;
73         __u64    st_destroy_reqs;
74         __u64    st_statfs_reqs;
75         __u64    st_syncfs_reqs;
76         __u64    st_open_reqs;
77         __u64    st_close_reqs;
78         __u64    st_punch_reqs;
79 };
80
81 static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
82 static struct proc_dir_entry *xprocfs_dir;
83
84 #define XPROCFS_BUMP_MYCPU_IOSTAT(field, count)                 \
85 do {                                                            \
86         xprocfs_iostats[smp_processor_id()].field += (count);   \
87 } while (0)
88
89 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
90 #define DECLARE_XPROCFS_SUM_STAT(field)                 \
91 static long long                                        \
92 xprocfs_sum_##field (void)                              \
93 {                                                       \
94         long long stat = 0;                             \
95         int       i;                                    \
96                                                         \
97         for (i = 0; i < smp_num_cpus; i++)              \
98                 stat += xprocfs_iostats[i].field;       \
99         return (stat);                                  \
100 }
101
102 DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
103 DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
104 DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
105 DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
106 DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
107 DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
108 DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
109 DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
110 DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
111 DECLARE_XPROCFS_SUM_STAT (st_syncfs_reqs)
112 DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
113 DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
114 DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
115 #endif
116
/*
 * procfs read handler for a single counter file.
 *
 * 'data' is the summing function registered by xprocfs_add_stat().  The
 * value is printed as one decimal line into 'page'.  *eof is always set;
 * any read at a non-zero offset returns 0 bytes without calling the
 * summing function.
 *
 * Returns the value returned by snprintf() for offset 0, otherwise 0.
 */
static int
xprocfs_rd_stat (char *page, char **start, off_t off, int count,
                 int  *eof, void *data)
{
        /* The whole value is delivered by the first read. */
        *eof = 1;

        if (off == 0) {
                long long (*sum_fn)(void) = (long long(*)(void))data;
                int nbytes = snprintf (page, count, "%Ld\n", sum_fn());

                *start = page;
                return (nbytes);
        }

        return (0);
}
132
133
134 static void
135 xprocfs_add_stat(char *name, long long (*fn)(void))
136 {
137         struct proc_dir_entry *entry;
138
139         entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
140         if (entry == NULL) {
141                 CERROR ("Can't add procfs stat %s\n", name);
142                 return;
143         }
144
145         entry->data = fn;
146         entry->read_proc = xprocfs_rd_stat;
147         entry->write_proc = NULL;
148 }
149
150 static void
151 xprocfs_init (char *name)
152 {
153         char  dirname[64];
154
155         snprintf (dirname, sizeof (dirname), "sys/%s", name);
156
157         xprocfs_dir = proc_mkdir (dirname, NULL);
158         if (xprocfs_dir == NULL) {
159                 CERROR ("Can't make procfs dir %s\n", dirname);
160                 return;
161         }
162
163 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
164         xprocfs_add_stat ("read_bytes",   xprocfs_sum_st_read_bytes);
165         xprocfs_add_stat ("read_reqs",    xprocfs_sum_st_read_reqs);
166         xprocfs_add_stat ("write_bytes",  xprocfs_sum_st_write_bytes);
167         xprocfs_add_stat ("write_reqs",   xprocfs_sum_st_write_reqs);
168         xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
169         xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
170         xprocfs_add_stat ("create_reqs",  xprocfs_sum_st_create_reqs);
171         xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
172         xprocfs_add_stat ("statfs_reqs",  xprocfs_sum_st_statfs_reqs);
173         xprocfs_add_stat ("syncfs_reqs",  xprocfs_sum_st_syncfs_reqs);
174         xprocfs_add_stat ("open_reqs",    xprocfs_sum_st_open_reqs);
175         xprocfs_add_stat ("close_reqs",   xprocfs_sum_st_close_reqs);
176         xprocfs_add_stat ("punch_reqs",   xprocfs_sum_st_punch_reqs);
177 #endif
178 }
179
180 void xprocfs_fini (void)
181 {
182         if (xprocfs_dir == NULL)
183                 return;
184
185         remove_proc_entry ("read_bytes",   xprocfs_dir);
186         remove_proc_entry ("read_reqs",    xprocfs_dir);
187         remove_proc_entry ("write_bytes",  xprocfs_dir);
188         remove_proc_entry ("write_reqs",   xprocfs_dir);
189         remove_proc_entry ("getattr_reqs", xprocfs_dir);
190         remove_proc_entry ("setattr_reqs", xprocfs_dir);
191         remove_proc_entry ("create_reqs",  xprocfs_dir);
192         remove_proc_entry ("destroy_reqs", xprocfs_dir);
193         remove_proc_entry ("statfs_reqs",  xprocfs_dir);
194         remove_proc_entry ("syncfs_reqs",  xprocfs_dir);
195         remove_proc_entry ("open_reqs",    xprocfs_dir);
196         remove_proc_entry ("close_reqs",   xprocfs_dir);
197         remove_proc_entry ("punch_reqs",   xprocfs_dir);
198
199         remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
200         xprocfs_dir = NULL;
201 }
202
#define S_SHIFT 12
/*
 * Directory-name letter for each file type, indexed by the file-type bits
 * of a mode shifted down by S_SHIFT.  Types without an entry are NULL.
 *
 * (mode & S_IFMT) >> S_SHIFT can take every value from 0 up to and
 * including S_IFMT >> S_SHIFT, so the table needs one more slot than
 * S_IFMT >> S_SHIFT; otherwise a mode with all type bits set indexes one
 * element past the end.
 */
static char *obd_type_by_mode[(S_IFMT >> S_SHIFT) + 1] = {
        [0]                     = NULL,
        [S_IFREG >> S_SHIFT]    = "R",
        [S_IFDIR >> S_SHIFT]    = "D",
        [S_IFCHR >> S_SHIFT]    = "C",
        [S_IFBLK >> S_SHIFT]    = "B",
        [S_IFIFO >> S_SHIFT]    = "F",
        [S_IFSOCK >> S_SHIFT]   = "S",
        [S_IFLNK >> S_SHIFT]    = "L"
};

/*
 * Return the type letter for 'mode', or NULL when the file-type bits do not
 * name one of the types listed in obd_type_by_mode[].
 */
static inline const char *obd_mode_to_type(int mode)
{
        return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
}
219
220 static void filter_ffd_addref(void *ffdp)
221 {
222         struct filter_file_data *ffd = ffdp;
223
224         atomic_inc(&ffd->ffd_refcount);
225         CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
226                atomic_read(&ffd->ffd_refcount));
227 }
228
229 static struct filter_file_data *filter_ffd_new(void)
230 {
231         struct filter_file_data *ffd;
232
233         OBD_ALLOC(ffd, sizeof *ffd);
234         if (ffd == NULL) {
235                 CERROR("out of memory\n");
236                 return NULL;
237         }
238
239         atomic_set(&ffd->ffd_refcount, 2);
240
241         INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
242         class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
243
244         return ffd;
245 }
246
247 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
248 {
249         struct filter_file_data *ffd = NULL;
250         ENTRY;
251         LASSERT(handle != NULL);
252         ffd = class_handle2object(handle->cookie);
253         if (ffd != NULL)
254                 LASSERT(ffd->ffd_file->private_data == ffd);
255         RETURN(ffd);
256 }
257
258 static void filter_ffd_put(struct filter_file_data *ffd)
259 {
260         CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
261                atomic_read(&ffd->ffd_refcount) - 1);
262         LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
263                 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
264         if (atomic_dec_and_test(&ffd->ffd_refcount)) {
265                 LASSERT(list_empty(&ffd->ffd_handle.h_link));
266                 OBD_FREE(ffd, sizeof *ffd);
267         }
268 }
269
270 static void filter_ffd_destroy(struct filter_file_data *ffd)
271 {
272         class_handle_unhash(&ffd->ffd_handle);
273         filter_ffd_put(ffd);
274 }
275
276 static void filter_commit_cb(struct obd_device *obd, __u64 transno, int error)
277 {
278         obd_transno_commit_cb(obd, transno, error);
279 }
280 /* Assumes caller has already pushed us into the kernel context. */
281 int filter_finish_transno(struct obd_export *export, void *handle,
282                           struct obd_trans_info *oti, int rc)
283 {
284         __u64 last_rcvd;
285         struct obd_device *obd = export->exp_obd;
286         struct filter_obd *filter = &obd->u.filter;
287         struct filter_export_data *fed = &export->exp_filter_data;
288         struct filter_client_data *fcd = fed->fed_fcd;
289         loff_t off;
290         ssize_t written;
291
292         /* Propagate error code. */
293         if (rc)
294                 RETURN(rc);
295
296         if (!obd->obd_replayable)
297                 RETURN(rc);
298
299         /* we don't allocate new transnos for replayed requests */
300 #if 0
301         /* perhaps if transno already set? or should level be in oti? */
302         if (req->rq_level == LUSTRE_CONN_RECOVD)
303                 GOTO(out, rc = 0);
304 #endif
305
306         off = fed->fed_lr_off;
307
308         spin_lock(&filter->fo_translock);
309         last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
310         filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
311         spin_unlock(&filter->fo_translock);
312         if (oti)
313                 oti->oti_transno = last_rcvd;
314         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
315         fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
316
317         /* get this from oti */
318 #if 0
319         if (oti)
320                 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
321         else
322 #else
323         fcd->fcd_last_xid = 0;
324 #endif
325         fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
326         written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
327                                 &off);
328         CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
329                LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
330
331         if (written == sizeof(*fcd))
332                 RETURN(0);
333         CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
334         if (written >= 0)
335                 RETURN(-EIO);
336
337         RETURN(written);
338 }
339
340 /* write the pathname into the string */
341 static char *filter_id(char *buf, struct filter_obd *filter, obd_id id,
342                        obd_mode mode)
343 {
344         if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
345                 sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
346         else
347                 sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode),
348                        (int)id & (filter->fo_subdir_count - 1), id);
349
350         return buf;
351 }
352
353 static inline void f_dput(struct dentry *dentry)
354 {
355         /* Can't go inside filter_ddelete because it can block */
356         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
357                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
358         LASSERT(atomic_read(&dentry->d_count) > 0);
359
360         dput(dentry);
361 }
362
363 /* Not racy w.r.t. others, because we are the only user of this dentry */
364 static void filter_drelease(struct dentry *dentry)
365 {
366         if (dentry->d_fsdata)
367                 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
368 }
369
370 struct dentry_operations filter_dops = {
371         .d_release = filter_drelease,
372 };
373
374 #define LAST_RCVD "last_rcvd"
375 #define INIT_OBJID 2
376
/* This limit is arbitrary, but for now the client slot bitmap fits in
 * 1 page: one bit per client, PAGE_SIZE * 8 bits (32k clients with 4kB
 * pages). */
#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
/* Number of unsigned longs needed to hold FILTER_LR_MAX_CLIENTS bits.
 * Divide the bit count by the bits (not bytes) per word, so that
 * FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long) is exactly one page
 * rather than eight. */
#define FILTER_LR_MAX_CLIENT_WORDS \
        (FILTER_LR_MAX_CLIENTS / (8 * sizeof(unsigned long)))
380
381 /* Add client data to the FILTER.  We use a bitmap to locate a free space
382  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
383  * Otherwise, we have just read the data from the last_rcvd file and
384  * we know its offset.
385  */
386 int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
387                       struct filter_export_data *fed, int cl_idx)
388 {
389         unsigned long *bitmap = filter->fo_last_rcvd_slots;
390         int new_client = (cl_idx == -1);
391
392         LASSERT(bitmap != NULL);
393
394         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
395         if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
396                 RETURN(0);
397
398         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
399          * there's no need for extra complication here
400          */
401         if (new_client) {
402                 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
403         repeat:
404                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
405                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
406                         return -ENOMEM;
407                 }
408                 if (test_and_set_bit(cl_idx, bitmap)) {
409                         CERROR("FILTER client %d: found bit is set in bitmap\n",
410                                cl_idx);
411                         cl_idx = find_next_zero_bit(bitmap,
412                                                     FILTER_LR_MAX_CLIENTS,
413                                                     cl_idx);
414                         goto repeat;
415                 }
416         } else {
417                 if (test_and_set_bit(cl_idx, bitmap)) {
418                         CERROR("FILTER client %d: bit already set in bitmap!\n",
419                                cl_idx);
420                         LBUG();
421                 }
422         }
423
424         fed->fed_lr_idx = cl_idx;
425         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
426                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
427
428         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
429                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
430
431         if (new_client) {
432                 struct obd_run_ctxt saved;
433                 loff_t off = fed->fed_lr_off;
434                 ssize_t written;
435                 void *handle;
436
437                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
438                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
439
440                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
441                 /* Transaction eeded to fix for bug 1403 */
442                 handle = fsfilt_start(obd,
443                                       filter->fo_rcvd_filp->f_dentry->d_inode,
444                                       FSFILT_OP_SETATTR);
445                 if (IS_ERR(handle)) {
446                         written = PTR_ERR(handle);
447                         CERROR("unable to start transaction: rc %d\n",
448                                (int)written);
449                 } else {
450                         written = lustre_fwrite(filter->fo_rcvd_filp,
451                                                 (char *)fed->fed_fcd,
452                                                 sizeof(*fed->fed_fcd), &off);
453                         fsfilt_commit(obd,
454                                       filter->fo_rcvd_filp->f_dentry->d_inode,
455                                       handle, 0);
456                 }
457                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
458
459                 if (written != sizeof(*fed->fed_fcd)) {
460                         if (written < 0)
461                                 RETURN(written);
462                         RETURN(-EIO);
463                 }
464         }
465         return 0;
466 }
467
468 int filter_client_free(struct obd_export *exp, int failover)
469 {
470         struct filter_export_data *fed = &exp->exp_filter_data;
471         struct filter_obd *filter = &exp->exp_obd->u.filter;
472         struct filter_client_data zero_fcd;
473         struct obd_run_ctxt saved;
474         int written;
475         loff_t off;
476
477         if (!fed->fed_fcd)
478                 RETURN(0);
479
480         if (failover != 0) {
481                 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
482                 RETURN(0);
483         }
484
485         LASSERT(filter->fo_last_rcvd_slots != NULL);
486
487         off = fed->fed_lr_off;
488
489         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
490                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
491
492         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
493                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
494                        fed->fed_lr_idx);
495                 LBUG();
496         }
497
498         memset(&zero_fcd, 0, sizeof zero_fcd);
499         push_ctxt(&saved, &filter->fo_ctxt, NULL);
500         written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
501                                 sizeof(zero_fcd), &off);
502
503         /* XXX: this write gets lost sometimes, unless this sync is here. */
504         if (written > 0)
505                 file_fsync(filter->fo_rcvd_filp,
506                            filter->fo_rcvd_filp->f_dentry, 1);
507         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
508
509         if (written != sizeof(zero_fcd)) {
510                 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
511                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
512                        LAST_RCVD, written);
513         } else {
514                 CDEBUG(D_INFO,
515                        "zeroed disconnecting client %s at idx %u (%llu)\n",
516                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
517         }
518
519         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
520
521         return 0;
522 }
523
524 static int filter_free_server_data(struct filter_obd *filter)
525 {
526         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
527         filter->fo_fsd = NULL;
528         OBD_FREE(filter->fo_last_rcvd_slots,
529                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
530         filter->fo_last_rcvd_slots = NULL;
531         return 0;
532 }
533
534
535 /* assumes caller is already in kernel ctxt */
536 static int filter_update_server_data(struct file *filp,
537                                      struct filter_server_data *fsd)
538 {
539         loff_t off = 0;
540         int rc;
541
542         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
543         CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
544                le64_to_cpu(fsd->fsd_last_objid));
545         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
546                le64_to_cpu(fsd->fsd_last_rcvd));
547         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
548                le64_to_cpu(fsd->fsd_mount_count));
549
550         rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
551         if (rc != sizeof(*fsd)) {
552                 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
553                        rc);
554                 RETURN(-EIO);
555         }
556         RETURN(0);
557 }
558
559 /* assumes caller has already in kernel ctxt */
560 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
561                                    __u64 init_lastobjid)
562 {
563         struct filter_obd *filter = &obd->u.filter;
564         struct filter_server_data *fsd;
565         struct filter_client_data *fcd = NULL;
566         struct inode *inode = filp->f_dentry->d_inode;
567         unsigned long last_rcvd_size = inode->i_size;
568         __u64 mount_count = 0;
569         int cl_idx;
570         loff_t off = 0;
571         int rc;
572
573         /* ensure padding in the struct is the correct size */
574         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
575                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
576         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
577                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
578
579         OBD_ALLOC(fsd, sizeof(*fsd));
580         if (!fsd)
581                 RETURN(-ENOMEM);
582         filter->fo_fsd = fsd;
583
584         OBD_ALLOC(filter->fo_last_rcvd_slots,
585                   FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
586         if (filter->fo_last_rcvd_slots == NULL) {
587                 OBD_FREE(fsd, sizeof(*fsd));
588                 RETURN(-ENOMEM);
589         }
590
591         if (last_rcvd_size == 0) {
592                 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
593
594                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
595                 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
596                 fsd->fsd_last_rcvd = 0;
597                 mount_count = fsd->fsd_mount_count = 0;
598                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
599                 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
600                 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
601                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
602                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
603         } else {
604                 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
605                                               &off);
606                 if (retval != sizeof(*fsd)) {
607                         CDEBUG(D_INODE,"OBD filter: error reading %s\n",
608                                LAST_RCVD);
609                         GOTO(err_fsd, rc = -EIO);
610                 }
611                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
612                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
613         }
614
615         if (fsd->fsd_feature_incompat) {
616                 CERROR("unsupported feature %x\n",
617                        le32_to_cpu(fsd->fsd_feature_incompat));
618                 GOTO(err_fsd, rc = -EINVAL);
619         }
620         if (fsd->fsd_feature_rocompat) {
621                 CERROR("read-only feature %x\n",
622                        le32_to_cpu(fsd->fsd_feature_rocompat));
623                 /* Do something like remount filesystem read-only */
624                 GOTO(err_fsd, rc = -EINVAL);
625         }
626
627         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
628                obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
629         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
630                obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
631         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
632                obd->obd_name, mount_count);
633         CDEBUG(D_INODE, "%s: server data size: %u\n",
634                obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
635         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
636                obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
637         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
638                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
639         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
640                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
641
642         /*
643          * When we do a clean FILTER shutdown, we save the last_rcvd into
644          * the header.  If we find clients with higher last_rcvd values
645          * then those clients may need recovery done.
646          */
647         if (!obd->obd_replayable) {
648                 CERROR("%s: recovery support OFF\n", obd->obd_name);
649                 GOTO(out, rc = 0);
650         }
651
652         for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
653                 __u64 last_rcvd;
654                 int mount_age;
655
656                 if (!fcd) {
657                         OBD_ALLOC(fcd, sizeof(*fcd));
658                         if (!fcd)
659                                 GOTO(err_fsd, rc = -ENOMEM);
660                 }
661
662                 /* Don't assume off is incremented properly, in case
663                  * sizeof(fsd) isn't the same as fsd->fsd_client_size.
664                  */
665                 off = le32_to_cpu(fsd->fsd_client_start) +
666                         cl_idx * le16_to_cpu(fsd->fsd_client_size);
667                 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
668                 if (rc != sizeof(*fcd)) {
669                         CERROR("error reading FILTER %s offset %d: rc = %d\n",
670                                LAST_RCVD, cl_idx, rc);
671                         if (rc > 0) /* XXX fatal error or just abort reading? */
672                                 rc = -EIO;
673                         break;
674                 }
675
676                 if (fcd->fcd_uuid[0] == '\0') {
677                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
678                                cl_idx);
679                         continue;
680                 }
681
682                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
683
684                 /* These exports are cleaned up by filter_disconnect(), so they
685                  * need to be set up like real exports as filter_connect() does.
686                  */
687                 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
688                 if (mount_age < FILTER_MOUNT_RECOV) {
689                         struct obd_export *exp = class_new_export(obd);
690                         struct filter_export_data *fed;
691                         CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
692                                " srv lr: "LPU64" mnt: "LPU64" last mount: "
693                                LPU64"\n", fcd->fcd_uuid, cl_idx,
694                                last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
695                                le64_to_cpu(fcd->fcd_mount_count), mount_count);
696                         if (exp == NULL) {
697                                 /* XXX this rc is ignored  */
698                                 rc = -ENOMEM;
699                                 break;
700                         }
701                         memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
702                                sizeof exp->exp_client_uuid.uuid);
703                         fed = &exp->exp_filter_data;
704                         fed->fed_fcd = fcd;
705                         filter_client_add(obd, filter, fed, cl_idx);
706                         /* create helper if export init gets more complex */
707                         INIT_LIST_HEAD(&fed->fed_open_head);
708                         spin_lock_init(&fed->fed_lock);
709
710                         fcd = NULL;
711                         obd->obd_recoverable_clients++;
712                         class_export_put(exp);
713                 } else {
714                         CDEBUG(D_INFO,
715                                "discarded client %d UUID '%s' count "LPU64"\n",
716                                cl_idx, fcd->fcd_uuid,
717                                le64_to_cpu(fcd->fcd_mount_count));
718                 }
719
720                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
721                        cl_idx, last_rcvd);
722
723                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
724                         filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
725
726                 obd->obd_last_committed =
727                         le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
728                 if (obd->obd_recoverable_clients) {
729                         CERROR("RECOVERY: %d recoverable clients, last_rcvd "
730                                LPU64"\n", obd->obd_recoverable_clients,
731                                le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
732                         obd->obd_next_recovery_transno =
733                                 obd->obd_last_committed + 1;
734                         obd->obd_recovering = 1;
735                 }
736
737         }
738
739         if (fcd)
740                 OBD_FREE(fcd, sizeof(*fcd));
741
742 out:
743         fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
744
745         /* save it,so mount count and last_recvd is current */
746         rc = filter_update_server_data(filp, filter->fo_fsd);
747
748         RETURN(rc);
749
750 err_fsd:
751         filter_free_server_data(filter);
752         RETURN(rc);
753 }
754
755 /* setup the object store with correct subdirectories */
756 static int filter_prep(struct obd_device *obd)
757 {
758         struct obd_run_ctxt saved;
759         struct filter_obd *filter = &obd->u.filter;
760         struct dentry *dentry, *O_dentry;
761         struct file *file;
762         struct inode *inode;
763         int i;
764         int rc = 0;
765         int mode = 0;
766
767         push_ctxt(&saved, &filter->fo_ctxt, NULL);
768         dentry = simple_mkdir(current->fs->pwd, "O", 0700);
769         CDEBUG(D_INODE, "got/created O: %p\n", dentry);
770         if (IS_ERR(dentry)) {
771                 rc = PTR_ERR(dentry);
772                 CERROR("cannot open/create O: rc = %d\n", rc);
773                 GOTO(out, rc);
774         }
775         filter->fo_dentry_O = dentry;
776
777         /*
778          * Create directories and/or get dentries for each object type.
779          * This saves us from having to do multiple lookups for each one.
780          */
781         O_dentry = filter->fo_dentry_O;
782         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
783                 char *name = obd_type_by_mode[mode];
784
785                 if (!name) {
786                         filter->fo_dentry_O_mode[mode] = NULL;
787                         continue;
788                 }
789                 dentry = simple_mkdir(O_dentry, name, 0700);
790                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
791                 if (IS_ERR(dentry)) {
792                         rc = PTR_ERR(dentry);
793                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
794                         GOTO(err_O_mode, rc);
795                 }
796                 filter->fo_dentry_O_mode[mode] = dentry;
797         }
798
799         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
800         if (!file || IS_ERR(file)) {
801                 rc = PTR_ERR(file);
802                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
803                        LAST_RCVD, rc);
804                 GOTO(err_O_mode, rc);
805         }
806
807         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
808                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
809                        file->f_dentry->d_inode->i_mode);
810                 GOTO(err_filp, rc = -ENOENT);
811         }
812
813         rc = fsfilt_journal_data(obd, file);
814         if (rc) {
815                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
816                 GOTO(err_filp, rc);
817         }
818         /* steal operations */
819         inode = file->f_dentry->d_inode;
820         filter->fo_fop = file->f_op;
821         filter->fo_iop = inode->i_op;
822         filter->fo_aops = inode->i_mapping->a_ops;
823
824         rc = filter_init_server_data(obd, file, INIT_OBJID);
825         if (rc) {
826                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
827                 GOTO(err_client, rc);
828         }
829         filter->fo_rcvd_filp = file;
830
831         if (filter->fo_subdir_count) {
832                 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
833                 OBD_ALLOC(filter->fo_dentry_O_sub,
834                           filter->fo_subdir_count * sizeof(dentry));
835                 if (!filter->fo_dentry_O_sub)
836                         GOTO(err_client, rc = -ENOMEM);
837
838                 for (i = 0; i < filter->fo_subdir_count; i++) {
839                         char dir[20];
840                         snprintf(dir, sizeof(dir), "d%u", i);
841
842                         dentry = simple_mkdir(O_dentry, dir, 0700);
843                         CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
844                         if (IS_ERR(dentry)) {
845                                 rc = PTR_ERR(dentry);
846                                 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
847                                 GOTO(err_O_sub, rc);
848                         }
849                         filter->fo_dentry_O_sub[i] = dentry;
850                 }
851         }
852         rc = 0;
853  out:
854         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
855
856         return(rc);
857
858 err_O_sub:
859         while (i-- > 0) {
860                 struct dentry *dentry = filter->fo_dentry_O_sub[i];
861                 if (dentry) {
862                         f_dput(dentry);
863                         filter->fo_dentry_O_sub[i] = NULL;
864                 }
865         }
866         OBD_FREE(filter->fo_dentry_O_sub,
867                  filter->fo_subdir_count * sizeof(dentry));
868 err_client:
869         class_disconnect_exports(obd, 0);
870 err_filp:
871         if (filp_close(file, 0))
872                 CERROR("can't close %s after error\n", LAST_RCVD);
873         filter->fo_rcvd_filp = NULL;
874 err_O_mode:
875         while (mode-- > 0) {
876                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
877                 if (dentry) {
878                         f_dput(dentry);
879                         filter->fo_dentry_O_mode[mode] = NULL;
880                 }
881         }
882         f_dput(filter->fo_dentry_O);
883         filter->fo_dentry_O = NULL;
884         goto out;
885 }
886
887 /* cleanup the filter: write last used object id to status file */
888 static void filter_post(struct obd_device *obd)
889 {
890         struct obd_run_ctxt saved;
891         struct filter_obd *filter = &obd->u.filter;
892         long rc;
893         int mode;
894
895         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
896          * best to start a transaction with h_sync, because we removed this
897          * from lastobjid */
898
899         push_ctxt(&saved, &filter->fo_ctxt, NULL);
900         rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
901         if (rc)
902                 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
903
904
905         if (filter->fo_rcvd_filp) {
906                 rc = file_fsync(filter->fo_rcvd_filp,
907                                 filter->fo_rcvd_filp->f_dentry, 1);
908                 filp_close(filter->fo_rcvd_filp, 0);
909                 filter->fo_rcvd_filp = NULL;
910                 if (rc)
911                         CERROR("last_rcvd file won't closed rc = %ld\n", rc);
912         }
913
914         if (filter->fo_subdir_count) {
915                 int i;
916                 for (i = 0; i < filter->fo_subdir_count; i++) {
917                         struct dentry *dentry = filter->fo_dentry_O_sub[i];
918                         f_dput(dentry);
919                         filter->fo_dentry_O_sub[i] = NULL;
920                 }
921                 OBD_FREE(filter->fo_dentry_O_sub,
922                          filter->fo_subdir_count *
923                          sizeof(*filter->fo_dentry_O_sub));
924         }
925         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
926                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
927                 if (dentry) {
928                         f_dput(dentry);
929                         filter->fo_dentry_O_mode[mode] = NULL;
930                 }
931         }
932         f_dput(filter->fo_dentry_O);
933         filter_free_server_data(filter);
934         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
935 }
936
937
938 static __u64 filter_next_id(struct obd_device *obd)
939 {
940         obd_id id;
941         LASSERT(obd->u.filter.fo_fsd != NULL);
942
943         spin_lock(&obd->u.filter.fo_objidlock);
944         id = le64_to_cpu(obd->u.filter.fo_fsd->fsd_last_objid);
945         obd->u.filter.fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
946         spin_unlock(&obd->u.filter.fo_objidlock);
947
948         return id;
949 }
950
951 /* how to get files, dentries, inodes from object id's */
952 /* parent i_sem is already held if needed for exclusivity */
953 static struct dentry *filter_fid2dentry(struct obd_device *obd,
954                                         struct dentry *dparent,
955                                         __u64 id, int lockit)
956 {
957         struct super_block *sb = obd->u.filter.fo_sb;
958         struct dentry *dchild;
959         char name[32];
960         int len;
961         ENTRY;
962
963         if (!sb || !sb->s_dev) {
964                 CERROR("fatal: device not initialized.\n");
965                 RETURN(ERR_PTR(-ENXIO));
966         }
967
968         if (id == 0) {
969                 CERROR("fatal: invalid object id 0\n");
970                 LBUG();
971                 RETURN(ERR_PTR(-ESTALE));
972         }
973
974         len = sprintf(name, LPU64, id);
975         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
976                dparent->d_name.len, dparent->d_name.name, name);
977         if (lockit)
978                 down(&dparent->d_inode->i_sem);
979         dchild = lookup_one_len(name, dparent, len);
980         if (lockit)
981                 up(&dparent->d_inode->i_sem);
982         if (IS_ERR(dchild)) {
983                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
984                 RETURN(dchild);
985         }
986
987         CDEBUG(D_INODE, "got child obj O/%*s/%s: %p, count = %d\n",
988                dparent->d_name.len, dparent->d_name.name, name, dchild,
989                atomic_read(&dchild->d_count));
990
991         LASSERT(atomic_read(&dchild->d_count) > 0);
992
993         RETURN(dchild);
994 }
995
996 /* direct cut-n-paste of mds_blocking_ast() */
997 int filter_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
998                      void *data, int flag)
999 {
1000         int do_ast;
1001         ENTRY;
1002
1003         if (flag == LDLM_CB_CANCELING) {
1004                 /* Don't need to do anything here. */
1005                 RETURN(0);
1006         }
1007
1008         /* XXX layering violation!  -phil */
1009         l_lock(&lock->l_resource->lr_namespace->ns_lock);
1010         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
1011          * such that mds_blocking_ast is called just before l_i_p takes the
1012          * ns_lock, then by the time we get the lock, we might not be the
1013          * correct blocking function anymore.  So check, and return early, if
1014          * so. */
1015         if (lock->l_blocking_ast != filter_blocking_ast) {
1016                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
1017                 RETURN(0);
1018         }
1019
1020         lock->l_flags |= LDLM_FL_CBPENDING;
1021         do_ast = (!lock->l_readers && !lock->l_writers);
1022         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
1023
1024         if (do_ast) {
1025                 struct lustre_handle lockh;
1026                 int rc;
1027
1028                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
1029                 ldlm_lock2handle(lock, &lockh);
1030                 rc = ldlm_cli_cancel(&lockh);
1031                 if (rc < 0)
1032                         CERROR("ldlm_cli_cancel: %d\n", rc);
1033         } else {
1034                 LDLM_DEBUG(lock, "Lock still has references, will be "
1035                            "cancelled later");
1036         }
1037         RETURN(0);
1038 }
1039
1040 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
1041                               int lock_mode, struct lustre_handle *lockh)
1042 {
1043         struct ldlm_res_id res_id = { .name = {0} };
1044         int flags = 0, rc;
1045         ENTRY;
1046
1047         res_id.name[0] = de->d_inode->i_ino;
1048         res_id.name[1] = de->d_inode->i_generation;
1049         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1050                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
1051                               &flags, ldlm_completion_ast,
1052                               filter_blocking_ast, NULL, lockh);
1053
1054         RETURN(rc == ELDLM_OK ? 0 : -ENOLCK);  /* XXX translate ldlm code */
1055 }
1056
1057 static inline struct dentry *filter_parent(struct obd_device *obd,
1058                                            obd_mode mode, obd_id objid)
1059 {
1060         struct filter_obd *filter = &obd->u.filter;
1061
1062         LASSERT(S_ISREG(mode));   /* only regular files for now */
1063         if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
1064                 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
1065
1066         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
1067 }
1068
1069 static inline struct dentry *filter_parent_lock(struct obd_device *obd,
1070                                                 obd_mode mode, obd_id objid,
1071                                                 int lock_mode,
1072                                                 struct lustre_handle *lockh)
1073 {
1074         struct dentry *de = filter_parent(obd, mode, objid);
1075         int rc;
1076
1077         if (IS_ERR(de))
1078                 return de;
1079
1080         rc = filter_lock_dentry(obd, de, lock_mode, lockh);
1081         return rc ? ERR_PTR(rc) : de;
1082 }
1083
1084 static struct file *filter_obj_open(struct obd_export *export,
1085                                     __u64 id, __u32 type, int parent_mode,
1086                                     struct lustre_handle *parent_lockh)
1087 {
1088         struct obd_device *obd = export->exp_obd;
1089         struct filter_obd *filter = &obd->u.filter;
1090         struct super_block *sb = filter->fo_sb;
1091         struct dentry *dchild = NULL,  *parent;
1092         struct filter_export_data *fed = &export->exp_filter_data;
1093         struct filter_dentry_data *fdd = NULL;
1094         struct filter_file_data *ffd = NULL;
1095         struct obd_run_ctxt saved;
1096         char name[24];
1097         struct file *file;
1098         int len, cleanup_phase = 0;
1099         ENTRY;
1100
1101         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1102
1103         if (!sb || !sb->s_dev) {
1104                 CERROR("fatal: device not initialized.\n");
1105                 GOTO(cleanup, file = ERR_PTR(-ENXIO));
1106         }
1107
1108         if (!id) {
1109                 CERROR("fatal: invalid obdo "LPU64"\n", id);
1110                 GOTO(cleanup, file = ERR_PTR(-ESTALE));
1111         }
1112
1113         if (!(type & S_IFMT)) {
1114                 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
1115                        __FUNCTION__, id, type);
1116                 GOTO(cleanup, file = ERR_PTR(-EINVAL));
1117         }
1118
1119         ffd = filter_ffd_new();
1120         if (ffd == NULL) {
1121                 CERROR("obdfilter: out of memory\n");
1122                 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1123         }
1124
1125         cleanup_phase = 1;
1126
1127         /* We preallocate this to avoid blocking while holding fo_fddlock */
1128         OBD_ALLOC(fdd, sizeof *fdd);
1129         if (fdd == NULL) {
1130                 CERROR("obdfilter: out of memory\n");
1131                 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1132         }
1133
1134         cleanup_phase = 2;
1135
1136         parent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1137         if (IS_ERR(parent))
1138                 GOTO(cleanup, file = (void *)parent);
1139
1140         cleanup_phase = 3;
1141
1142         len = snprintf(name, sizeof(name), LPU64, id);
1143         dchild = lookup_one_len(name, parent, len);
1144         if (IS_ERR(dchild))
1145                 GOTO(cleanup, file = (void *)dchild);
1146         LASSERT(dchild->d_inode);
1147
1148         cleanup_phase = 4;
1149
1150         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
1151         mntget(filter->fo_vfsmnt);
1152         file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1153         if (IS_ERR(file)) {
1154                 dchild = NULL; /* prevent a double dput in step 4 */
1155                 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1156                 GOTO(cleanup, file);
1157         }
1158
1159         spin_lock(&filter->fo_fddlock);
1160         if (dchild->d_fsdata) {
1161                 spin_unlock(&filter->fo_fddlock);
1162                 OBD_FREE(fdd, sizeof *fdd);
1163                 fdd = dchild->d_fsdata;
1164                 /* should only happen during client recovery */
1165                 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1166                         CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1167                 atomic_inc(&fdd->fdd_open_count);
1168         } else {
1169                 atomic_set(&fdd->fdd_open_count, 1);
1170                 fdd->fdd_flags = 0;
1171                 fdd->fdd_objid = id;
1172                 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1173                 dchild->d_fsdata = fdd;
1174                 spin_unlock(&filter->fo_fddlock);
1175         }
1176
1177         ffd->ffd_file = file;
1178         LASSERT(file->private_data == NULL);
1179         file->private_data = ffd;
1180
1181         if (!dchild->d_op)
1182                 dchild->d_op = &filter_dops;
1183         else
1184                 LASSERT(dchild->d_op == &filter_dops);
1185
1186         spin_lock(&fed->fed_lock);
1187         list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1188         spin_unlock(&fed->fed_lock);
1189
1190         CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1191 cleanup:
1192         switch (cleanup_phase) {
1193         case 4:
1194                 if (IS_ERR(file))
1195                         l_dput(dchild);
1196         case 3:
1197                 if (IS_ERR(file))
1198                         ldlm_lock_decref(parent_lockh, parent_mode);
1199         case 2:
1200                 if (IS_ERR(file))
1201                         OBD_FREE(fdd, sizeof *fdd);
1202         case 1:
1203                 if (IS_ERR(file))
1204                         filter_ffd_destroy(ffd);
1205                 filter_ffd_put(ffd);
1206         case 0:
1207                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1208         }
1209         RETURN(file);
1210 }
1211
1212 /* Caller must hold i_sem on dir_dentry->d_inode */
1213 /* Caller must push us into kernel context */
1214 static int filter_destroy_internal(struct obd_device *obd,
1215                                    struct dentry *dir_dentry,
1216                                    struct dentry *object_dentry)
1217 {
1218         struct inode *inode = object_dentry->d_inode;
1219         int rc;
1220         ENTRY;
1221
1222         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1223                 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1224                        object_dentry->d_name.len,
1225                        object_dentry->d_name.name,
1226                        inode->i_nlink, atomic_read(&inode->i_count));
1227         }
1228
1229         rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
1230
1231         if (rc)
1232                 CERROR("error unlinking objid %*s: rc %d\n",
1233                        object_dentry->d_name.len,
1234                        object_dentry->d_name.name, rc);
1235
1236         RETURN(rc);
1237 }
1238
1239 /* If closing because we are failing this device, then
1240    don't do the unlink on close.
1241 */
1242 static int filter_close_internal(struct obd_export *export,
1243                                  struct filter_file_data *ffd,
1244                                  struct obd_trans_info *oti,
1245                                  int failover)
1246 {
1247         struct obd_device *obd = export->exp_obd;
1248         struct filter_obd *filter = &obd->u.filter;
1249         struct file *filp = ffd->ffd_file;
1250         struct dentry *object_dentry = dget(filp->f_dentry);
1251         struct filter_dentry_data *fdd = object_dentry->d_fsdata;
1252         struct lustre_handle parent_lockh;
1253         int rc, rc2, cleanup_phase = 0;
1254         struct dentry *dir_dentry;
1255         struct obd_run_ctxt saved;
1256         ENTRY;
1257
1258         LASSERT(filp->private_data == ffd);
1259         LASSERT(fdd);
1260
1261         rc = filp_close(filp, 0);
1262
1263         if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1264             fdd->fdd_flags & FILTER_FLAG_DESTROY && !failover) {
1265                 void *handle;
1266
1267                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1268                 cleanup_phase = 1;
1269
1270                 dir_dentry = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1271                                                 LCK_PW, &parent_lockh);
1272                 if (IS_ERR(dir_dentry))
1273                         GOTO(cleanup, rc = PTR_ERR(dir_dentry));
1274                 cleanup_phase = 2;
1275
1276                 handle = fsfilt_start(obd, dir_dentry->d_inode,
1277                                       FSFILT_OP_UNLINK);
1278                 if (IS_ERR(handle))
1279                         GOTO(cleanup, rc = PTR_ERR(handle));
1280
1281                 /* XXX unlink from PENDING directory now too */
1282                 rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
1283                 if (rc2 && !rc)
1284                         rc = rc2;
1285                 rc = filter_finish_transno(export, handle, oti, rc);
1286                 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
1287                 if (rc2) {
1288                         CERROR("error on commit, err = %d\n", rc2);
1289                         if (!rc)
1290                                 rc = rc2;
1291                 }
1292         }
1293
1294 cleanup:
1295         switch(cleanup_phase) {
1296         case 2:
1297                 if (rc || oti == NULL) {
1298                         ldlm_lock_decref(&parent_lockh, LCK_PW);
1299                 } else {
1300                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1301                                sizeof(parent_lockh));
1302                         oti->oti_ack_locks[0].mode = LCK_PW;
1303                 }
1304         case 1:
1305                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1306         case 0:
1307                 f_dput(object_dentry);
1308                 filter_ffd_destroy(ffd);
1309                 break;
1310         default:
1311                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1312                 LBUG();
1313         }
1314
1315         RETURN(rc);
1316 }
1317
1318 /* obd methods */
1319 /* mount the file system (secretly) */
1320 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1321                                char *option)
1322 {
1323         struct obd_ioctl_data* data = buf;
1324         struct filter_obd *filter;
1325         struct vfsmount *mnt;
1326         int rc = 0;
1327         ENTRY;
1328
1329         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1330                 RETURN(-EINVAL);
1331
1332         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1333         if (IS_ERR(obd->obd_fsops))
1334                 RETURN(PTR_ERR(obd->obd_fsops));
1335
1336         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
1337         rc = PTR_ERR(mnt);
1338         if (IS_ERR(mnt))
1339                 GOTO(err_ops, rc);
1340
1341         if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1342                 if (*data->ioc_inlbuf3 == 'f') {
1343                         obd->obd_replayable = 1;
1344                         obd_sync_filter = 1;
1345                         CERROR("%s: configured for recovery and sync write\n",
1346                                obd->obd_name);
1347                 } else {
1348                         CERROR("unrecognised flag '%c'\n",
1349                                *data->ioc_inlbuf3);
1350                 }
1351         }
1352
1353         filter = &obd->u.filter;
1354         filter->fo_vfsmnt = mnt;
1355         filter->fo_fstype = strdup(data->ioc_inlbuf2);
1356         filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
1357         CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1358
1359         OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1360         filter->fo_ctxt.pwdmnt = mnt;
1361         filter->fo_ctxt.pwd = mnt->mnt_root;
1362         filter->fo_ctxt.fs = get_ds();
1363
1364         rc = filter_prep(obd);
1365         if (rc)
1366                 GOTO(err_kfree, rc);
1367
1368         spin_lock_init(&filter->fo_translock);
1369         spin_lock_init(&filter->fo_fddlock);
1370         spin_lock_init(&filter->fo_objidlock);
1371         INIT_LIST_HEAD(&filter->fo_export_list);
1372
1373         obd->obd_namespace =
1374                 ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
1375         if (!obd->obd_namespace)
1376                 GOTO(err_post, rc = -ENOMEM);
1377
1378         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1379                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1380
1381         RETURN(0);
1382
1383 err_post:
1384         filter_post(obd);
1385 err_kfree:
1386         kfree(filter->fo_fstype);
1387         unlock_kernel();
1388         mntput(filter->fo_vfsmnt);
1389         filter->fo_sb = 0;
1390         lock_kernel();
1391 err_ops:
1392         fsfilt_put_ops(obd->obd_fsops);
1393         return rc;
1394 }
1395
1396 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1397 {
1398         struct obd_ioctl_data* data = buf;
1399         char *option = NULL;
1400
1401         if (!strcmp(data->ioc_inlbuf2, "ext3"))
1402                 option = "asyncdel";
1403
1404         return filter_common_setup(obd, len, buf, option);
1405 }
1406
1407 /* sanobd setup methods - use a specific mount option */
1408 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1409 {
1410         struct obd_ioctl_data* data = buf;
1411         char *option = NULL;
1412
1413         if (!data->ioc_inlbuf2)
1414                 RETURN(-EINVAL);
1415
1416         /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1417         if (!strcmp(data->ioc_inlbuf2, "extN"))
1418                 option = "data=writeback";
1419         else if (!strcmp(data->ioc_inlbuf2, "ext3"))
1420                 option = "data=writeback,asyncdel";
1421         else
1422                 LBUG(); /* just a reminder */
1423
1424         return filter_common_setup(obd, len, buf, option);
1425 }
1426
1427 static int filter_cleanup(struct obd_device *obd, int force, int failover)
1428 {
1429         struct super_block *sb;
1430         ENTRY;
1431
1432         if (failover)
1433                 CERROR("%s: shutting down for failover; client state will"
1434                        " be preserved.\n", obd->obd_name);
1435
1436         if (!list_empty(&obd->obd_exports)) {
1437                 CERROR("%s: still has clients!\n", obd->obd_name);
1438                 class_disconnect_exports(obd, failover);
1439                 if (!list_empty(&obd->obd_exports)) {
1440                         CERROR("still has exports after forced cleanup?\n");
1441                         RETURN(-EBUSY);
1442                 }
1443         }
1444
1445         ldlm_namespace_free(obd->obd_namespace);
1446
1447         sb = obd->u.filter.fo_sb;
1448         if (!obd->u.filter.fo_sb)
1449                 RETURN(0);
1450
1451         filter_post(obd);
1452
1453         shrink_dcache_parent(sb->s_root);
1454         unlock_kernel();
1455
1456         if (atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count) > 1){
1457                 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1458                        atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count));
1459         }
1460
1461         mntput(obd->u.filter.fo_vfsmnt);
1462         obd->u.filter.fo_sb = 0;
1463 /*        destroy_buffers(obd->u.filter.fo_sb->s_dev);*/
1464
1465         kfree(obd->u.filter.fo_fstype);
1466         fsfilt_put_ops(obd->obd_fsops);
1467
1468         lock_kernel();
1469
1470         RETURN(0);
1471 }
1472
1473 int filter_attach(struct obd_device *dev, obd_count len, void *data)
1474 {
1475         struct lprocfs_static_vars lvars;
1476         struct lprocfs_counters* cntrs;
1477         int rc;
1478
1479         lprocfs_init_vars(&lvars);
1480         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
1481         if (rc != 0)
1482                 return rc;
1483
1484         rc = lprocfs_alloc_obd_counters(dev, LPROC_FILTER_LAST);
1485         if (rc != 0)
1486                 return rc;
1487
1488         /* Init obdfilter private counters here */
1489         cntrs = dev->counters;
1490         LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_READS],
1491                              0, NULL, "read", "reqs");
1492         LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_READ_BYTES],
1493                              LPROCFS_CNTR_AVGMINMAX,
1494                              NULL, "read_bytes", "bytes");
1495         LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_WRITES],
1496                              0, NULL, "write", "reqs");
1497
1498         LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_WRITE_BYTES],
1499                              LPROCFS_CNTR_AVGMINMAX,
1500                              NULL, "write_bytes", "bytes");
1501         return rc;
1502 }
1503
/*
 * Detach method: free the device's counters, then remove its procfs
 * registration.  Returns the result of lprocfs_obd_detach().
 */
int filter_detach(struct obd_device *dev)
{
        int rc;

        lprocfs_free_obd_counters(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
1509
1510 /* nearly identical to mds_connect */
1511 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1512                           struct obd_uuid *cluuid)
1513 {
1514         struct obd_export *exp;
1515         struct filter_export_data *fed;
1516         struct filter_client_data *fcd;
1517         struct filter_obd *filter = &obd->u.filter;
1518         int rc;
1519
1520         ENTRY;
1521
1522         if (!conn || !obd || !cluuid)
1523                 RETURN(-EINVAL);
1524
1525         rc = class_connect(conn, obd, cluuid);
1526         if (rc)
1527                 RETURN(rc);
1528         exp = class_conn2export(conn);
1529         LASSERT(exp);
1530
1531         fed = &exp->exp_filter_data;
1532         class_export_put(exp);
1533
1534         INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
1535         spin_lock_init(&exp->exp_filter_data.fed_lock);
1536
1537         if (!obd->obd_replayable)
1538                 RETURN(0);
1539
1540         OBD_ALLOC(fcd, sizeof(*fcd));
1541         if (!fcd) {
1542                 CERROR("filter: out of memory for client data\n");
1543                 GOTO(out_export, rc = -ENOMEM);
1544         }
1545
1546         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1547         fed->fed_fcd = fcd;
1548         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1549
1550         rc = filter_client_add(obd, filter, fed, -1);
1551         if (rc)
1552                 GOTO(out_fcd, rc);
1553
1554         RETURN(rc);
1555
1556 out_fcd:
1557         OBD_FREE(fcd, sizeof(*fcd));
1558 out_export:
1559         class_disconnect(conn, 0);
1560
1561         RETURN(rc);
1562 }
1563
1564 static void filter_destroy_export(struct obd_export *exp)
1565 {
1566         struct filter_export_data *fed = &exp->exp_filter_data;
1567
1568         ENTRY;
1569         spin_lock(&fed->fed_lock);
1570         while (!list_empty(&fed->fed_open_head)) {
1571                 struct filter_file_data *ffd;
1572
1573                 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1574                                  ffd_export_list);
1575                 list_del(&ffd->ffd_export_list);
1576                 spin_unlock(&fed->fed_lock);
1577
1578                 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1579                        ffd->ffd_file->f_dentry->d_name.len,
1580                        ffd->ffd_file->f_dentry->d_name.name,
1581                        ffd, ffd->ffd_handle.h_cookie);
1582
1583                 filter_close_internal(exp, ffd, NULL, exp->exp_failover);
1584                 spin_lock(&fed->fed_lock);
1585         }
1586         spin_unlock(&fed->fed_lock);
1587
1588         if (exp->exp_obd->obd_replayable)
1589                 filter_client_free(exp, exp->exp_failover);
1590         EXIT;
1591 }
1592
1593 /* also incredibly similar to mds_disconnect */
1594 static int filter_disconnect(struct lustre_handle *conn, int failover)
1595 {
1596         struct obd_export *exp = class_conn2export(conn);
1597         int rc;
1598         unsigned long flags;
1599         ENTRY;
1600
1601         LASSERT(exp);
1602         ldlm_cancel_locks_for_export(exp);
1603
1604         spin_lock_irqsave(&exp->exp_lock, flags);
1605         exp->exp_failover = failover;
1606         spin_unlock_irqrestore(&exp->exp_lock, flags);
1607
1608         rc = class_disconnect(conn, failover);
1609
1610         fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1611         class_export_put(exp);
1612         /* XXX cleanup preallocated inodes */
1613         RETURN(rc);
1614 }
1615
1616 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1617 {
1618         int type = oa->o_mode & S_IFMT;
1619         ENTRY;
1620
1621         CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1622                inode->i_ino, inode, oa->o_id, valid);
1623         /* Don't copy the inode number in place of the object ID */
1624         obdo_from_inode(oa, inode, valid);
1625         oa->o_mode &= ~S_IFMT;
1626         oa->o_mode |= type;
1627
1628         if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1629                 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1630                 oa->o_rdev = rdev;
1631                 oa->o_valid |= OBD_MD_FLRDEV;
1632         }
1633
1634         EXIT;
1635 }
1636
1637 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1638                                          struct obdo *oa, int locked,char *what)
1639 {
1640         struct dentry *dentry = NULL;
1641
1642         if (oa->o_valid & OBD_MD_FLHANDLE) {
1643                 struct lustre_handle *ost_handle = obdo_handle(oa);
1644                 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1645
1646                 if (ffd != NULL) {
1647                         dentry = dget(ffd->ffd_file->f_dentry);
1648                         filter_ffd_put(ffd);
1649                 }
1650         }
1651
1652         if (!dentry) {
1653                 struct obd_device *obd = class_conn2obd(conn);
1654                 if (!obd) {
1655                         CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1656                         RETURN(ERR_PTR(-EINVAL));
1657                 }
1658                 dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode,
1659                                                               oa->o_id),
1660                                            oa->o_id, locked);
1661         }
1662
1663         if (IS_ERR(dentry)) {
1664                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1665                 RETURN(dentry);
1666         }
1667
1668         if (!dentry->d_inode) {
1669                 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1670                 f_dput(dentry);
1671                 RETURN(ERR_PTR(-ENOENT));
1672         }
1673
1674         return dentry;
1675 }
1676
1677 #define filter_oa2dentry(conn, oa, locked) __filter_oa2dentry(conn, oa, locked,\
1678                                                               __FUNCTION__)
1679
1680 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1681                           struct lov_stripe_md *md)
1682 {
1683         struct dentry *dentry = NULL;
1684         int rc = 0;
1685         ENTRY;
1686
1687         XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
1688
1689         dentry = filter_oa2dentry(conn, oa, 1);
1690         if (IS_ERR(dentry))
1691                 RETURN(PTR_ERR(dentry));
1692
1693         filter_from_inode(oa, dentry->d_inode, oa->o_valid);
1694
1695         f_dput(dentry);
1696         RETURN(rc);
1697 }
1698
1699 /* this is called from filter_truncate() until we have filter_punch() */
1700 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1701                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1702 {
1703         struct obd_run_ctxt saved;
1704         struct obd_export *export = class_conn2export(conn);
1705         struct obd_device *obd = class_conn2obd(conn);
1706         struct filter_obd *filter = &obd->u.filter;
1707         struct dentry *dentry;
1708         struct iattr iattr;
1709         struct inode *inode;
1710         void * handle;
1711         int rc, rc2;
1712         ENTRY;
1713
1714         XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
1715
1716         dentry = filter_oa2dentry(conn, oa, 0);
1717
1718         if (IS_ERR(dentry))
1719                 GOTO(out_exp, rc = PTR_ERR(dentry));
1720
1721         iattr_from_obdo(&iattr, oa, oa->o_valid);
1722         iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1723         inode = dentry->d_inode;
1724
1725         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1726         lock_kernel();
1727         if (iattr.ia_valid & ATTR_SIZE)
1728                 down(&inode->i_sem);
1729
1730         handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1731         if (IS_ERR(handle))
1732                 GOTO(out_unlock, rc = PTR_ERR(handle));
1733
1734         if (inode->i_op->setattr)
1735                 rc = inode->i_op->setattr(dentry, &iattr);
1736         else
1737                 rc = inode_setattr(inode, &iattr);
1738         rc = filter_finish_transno(export, handle, oti, rc);
1739         rc2 = fsfilt_commit(obd, dentry->d_inode, handle, 0);
1740         if (rc2) {
1741                 CERROR("error on commit, err = %d\n", rc2);
1742                 if (!rc)
1743                         rc = rc2;
1744         }
1745
1746         if (iattr.ia_valid & ATTR_SIZE) {
1747                 up(&inode->i_sem);
1748                 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1749                 obdo_from_inode(oa, inode, oa->o_valid);
1750         }
1751
1752 out_unlock:
1753         unlock_kernel();
1754         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1755
1756         f_dput(dentry);
1757  out_exp:
1758         class_export_put(export);
1759         RETURN(rc);
1760 }
1761
1762 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1763                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
1764                        struct obd_client_handle *och)
1765 {
1766         struct obd_export *export;
1767         struct lustre_handle *handle;
1768         struct filter_file_data *ffd;
1769         struct file *filp;
1770         struct lustre_handle parent_lockh;
1771         int rc = 0;
1772         ENTRY;
1773
1774         export = class_conn2export(conn);
1775         if (!export) {
1776                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1777                        conn->cookie);
1778                 GOTO(out, rc = -EINVAL);
1779         }
1780
1781         XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
1782
1783         filp = filter_obj_open(export, oa->o_id, oa->o_mode,
1784                                LCK_PR, &parent_lockh);
1785         if (IS_ERR(filp))
1786                 GOTO(out, rc = PTR_ERR(filp));
1787
1788         filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1789
1790         ffd = filp->private_data;
1791         handle = obdo_handle(oa);
1792         handle->cookie = ffd->ffd_handle.h_cookie;
1793         oa->o_valid |= OBD_MD_FLHANDLE;
1794
1795 out:
1796         class_export_put(export);
1797         if (!rc) {
1798                 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1799                        sizeof(parent_lockh));
1800                 oti->oti_ack_locks[0].mode = LCK_PR;
1801         }
1802         RETURN(rc);
1803 }
1804
1805 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1806                         struct lov_stripe_md *ea, struct obd_trans_info *oti)
1807 {
1808         struct obd_export *exp = class_conn2export(conn);
1809         struct filter_file_data *ffd;
1810         struct filter_export_data *fed;
1811         int rc;
1812         ENTRY;
1813
1814         if (!exp) {
1815                 CDEBUG(D_IOCTL, "invalid client cookie"LPX64"\n", conn->cookie);
1816                 GOTO(out, rc = -EINVAL);
1817         }
1818
1819         XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
1820
1821         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1822                 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1823                 GOTO(out, rc = -EINVAL);
1824         }
1825
1826         ffd = filter_handle2ffd(obdo_handle(oa));
1827         if (ffd == NULL) {
1828                 CERROR("bad handle ("LPX64") for close\n",
1829                        obdo_handle(oa)->cookie);
1830                 GOTO(out, rc = -ESTALE);
1831         }
1832
1833         fed = &exp->exp_filter_data;
1834         spin_lock(&fed->fed_lock);
1835         list_del(&ffd->ffd_export_list);
1836         spin_unlock(&fed->fed_lock);
1837
1838         rc = filter_close_internal(exp, ffd, oti, 0);
1839         filter_ffd_put(ffd);
1840         GOTO(out, rc);
1841  out:
1842         class_export_put(exp);
1843         return rc;
1844 }
1845
1846 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1847                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
1848 {
1849         struct obd_export *export;
1850         struct obd_device *obd = class_conn2obd(conn);
1851         struct filter_obd *filter = &obd->u.filter;
1852         struct obd_run_ctxt saved;
1853         struct dentry *dir_dentry;
1854         struct lustre_handle parent_lockh;
1855         struct dentry *new = NULL;
1856         struct iattr;
1857         void *handle;
1858         int err, rc, cleanup_phase;
1859         ENTRY;
1860
1861         if (!obd) {
1862                 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1863                 RETURN(-EINVAL);
1864         }
1865
1866         export = class_conn2export(conn);
1867         XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
1868
1869         oa->o_id = filter_next_id(obd);
1870
1871         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1872  retry:
1873         cleanup_phase = 0;
1874         dir_dentry = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1875                                         &parent_lockh);
1876         if (IS_ERR(dir_dentry))
1877                 GOTO(cleanup, rc = PTR_ERR(dir_dentry));
1878         cleanup_phase = 1;
1879
1880         new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
1881         if (IS_ERR(new))
1882                 GOTO(cleanup, rc = PTR_ERR(new));
1883         if (new->d_inode) {
1884                 char buf[32];
1885
1886                 /* This would only happen if lastobjid was bad on disk */
1887                 CERROR("Serious error: objid %s already exists; is this "
1888                        "filesystem corrupt?  I will try to work around it.\n",
1889                        filter_id(buf, filter, oa->o_id, oa->o_mode));
1890                 f_dput(new);
1891                 ldlm_lock_decref(&parent_lockh, LCK_PW);
1892                 oa->o_id = filter_next_id(obd);
1893                 goto retry;
1894         }
1895
1896         cleanup_phase = 2;
1897         handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
1898         if (IS_ERR(handle))
1899                 GOTO(cleanup, rc = PTR_ERR(handle));
1900
1901         rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
1902         if (rc)
1903                 CERROR("create failed rc = %d\n", rc);
1904
1905         rc = filter_finish_transno(export, handle, oti, rc);
1906         err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1907         if (err) {
1908                 CERROR("unable to write lastobjid but file created\n");
1909                 if (!rc)
1910                         rc = err;
1911         }
1912         err = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
1913         if (err) {
1914                 CERROR("error on commit, err = %d\n", err);
1915                 if (!rc)
1916                         rc = err;
1917         }
1918
1919         if (rc)
1920                 GOTO(cleanup, rc);
1921
1922         /* Set flags for fields we have set in the inode struct */
1923         oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1924                  OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1925         filter_from_inode(oa, new->d_inode, oa->o_valid);
1926
1927         EXIT;
1928 cleanup:
1929         switch(cleanup_phase) {
1930         case 2:
1931                 f_dput(new);
1932         case 1: /* locked parent dentry */
1933                 if (rc || oti == NULL) {
1934                         ldlm_lock_decref(&parent_lockh, LCK_PW);
1935                 } else {
1936                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1937                                sizeof(parent_lockh));
1938                         oti->oti_ack_locks[0].mode = LCK_PW;
1939                 }
1940         case 0:
1941                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1942                 class_export_put(export);
1943                 break;
1944         default:
1945                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1946                 LBUG();
1947         }
1948
1949         RETURN(rc);
1950 }
1951
1952 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1953                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
1954 {
1955         struct obd_export *export;
1956         struct obd_device *obd = class_conn2obd(conn);
1957         struct filter_obd *filter = &obd->u.filter;
1958         struct dentry *dir_dentry, *object_dentry = NULL;
1959         struct filter_dentry_data *fdd;
1960         struct obd_run_ctxt saved;
1961         void *handle = NULL;
1962         struct lustre_handle parent_lockh;
1963         int rc, rc2, cleanup_phase = 0;
1964         ENTRY;
1965
1966         if (!obd) {
1967                 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1968                 RETURN(-EINVAL);
1969         }
1970
1971         export = class_conn2export(conn);
1972         XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
1973
1974         CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1975
1976         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1977         dir_dentry = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1978                                         LCK_PW, &parent_lockh);
1979         if (IS_ERR(dir_dentry))
1980                 GOTO(cleanup, rc = PTR_ERR(dir_dentry));
1981         cleanup_phase = 1;
1982
1983         object_dentry = filter_oa2dentry(conn, oa, 0);
1984         if (IS_ERR(object_dentry))
1985                 GOTO(cleanup, rc = -ENOENT);
1986         cleanup_phase = 2;
1987
1988         handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
1989         if (IS_ERR(handle))
1990                 GOTO(cleanup, rc = PTR_ERR(handle));
1991         cleanup_phase = 3;
1992
1993         fdd = object_dentry->d_fsdata;
1994         if (fdd && atomic_read(&fdd->fdd_open_count)) {
1995                 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1996                         fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1997                         /* XXX put into PENDING directory in case of crash */
1998                         CDEBUG(D_INODE,
1999                                "defer destroy of %dx open objid "LPU64"\n",
2000                                atomic_read(&fdd->fdd_open_count), oa->o_id);
2001                 } else
2002                         CDEBUG(D_INODE,
2003                                "repeat destroy of %dx open objid "LPU64"\n",
2004                                atomic_read(&fdd->fdd_open_count), oa->o_id);
2005                 GOTO(cleanup, rc = 0);
2006         }
2007
2008         rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
2009
2010 cleanup:
2011         switch(cleanup_phase) {
2012         case 3:
2013                 rc = filter_finish_transno(export, handle, oti, rc);
2014                 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
2015                 if (rc2) {
2016                         CERROR("error on commit, err = %d\n", rc2);
2017                         if (!rc)
2018                                 rc = rc2;
2019                 }
2020         case 2:
2021                 f_dput(object_dentry);
2022         case 1:
2023                 if (rc || oti == NULL) {
2024                         ldlm_lock_decref(&parent_lockh, LCK_PW);
2025                 } else {
2026                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
2027                                sizeof(parent_lockh));
2028                         oti->oti_ack_locks[0].mode = LCK_PW;
2029                 }
2030         case 0:
2031                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
2032                 class_export_put(export);
2033                 break;
2034         default:
2035                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2036                 LBUG();
2037         }
2038
2039         RETURN(rc);
2040 }
2041
2042 /* NB start and end are used for punch, but not truncate */
2043 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
2044                            struct lov_stripe_md *lsm,
2045                            obd_off start, obd_off end,
2046                            struct obd_trans_info *oti)
2047 {
2048         int error;
2049         ENTRY;
2050
2051         XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
2052
2053         if (end != OBD_OBJECT_EOF)
2054                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2055                        end);
2056
2057         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
2058                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2059         oa->o_size = start;
2060         error = filter_setattr(conn, oa, NULL, oti);
2061         RETURN(error);
2062 }
2063
/* Drop one page-cache reference on @page. */
static inline void lustre_put_page(struct page *page)
{
        page_cache_release(page);
}
2068
2069 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
2070 {
2071         struct address_space *mapping = inode->i_mapping;
2072         struct page *page;
2073         unsigned long index = lnb->offset >> PAGE_SHIFT;
2074         int rc;
2075
2076         page = grab_cache_page(mapping, index); /* locked page */
2077         if (IS_ERR(page))
2078                 return lnb->rc = PTR_ERR(page);
2079
2080         lnb->page = page;
2081
2082         if (inode->i_size < lnb->offset + lnb->len - 1)
2083                 lnb->rc = inode->i_size - lnb->offset;
2084         else
2085                 lnb->rc = lnb->len;
2086
2087         if (PageUptodate(page)) {
2088                 unlock_page(page);
2089                 return 0;
2090         }
2091
2092         rc = mapping->a_ops->readpage(NULL, page);
2093         if (rc < 0) {
2094                 CERROR("page index %lu, rc = %d\n", index, rc);
2095                 lnb->page = NULL;
2096                 lustre_put_page(page);
2097                 return lnb->rc = rc;
2098         }
2099
2100         return 0;
2101 }
2102
2103 static int filter_finish_page_read(struct niobuf_local *lnb)
2104 {
2105         if (lnb->page == NULL)
2106                 return 0;
2107
2108         if (PageUptodate(lnb->page))
2109                 return 0;
2110
2111         wait_on_page(lnb->page);
2112         if (!PageUptodate(lnb->page)) {
2113                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
2114                        lnb->page->index, lnb->offset);
2115                 GOTO(err_page, lnb->rc = -EIO);
2116         }
2117         if (PageError(lnb->page)) {
2118                 CERROR("page index %lu/offset "LPX64" has error\n",
2119                        lnb->page->index, lnb->offset);
2120                 GOTO(err_page, lnb->rc = -EIO);
2121         }
2122
2123         return 0;
2124
2125 err_page:
2126         lustre_put_page(lnb->page);
2127         lnb->page = NULL;
2128         return lnb->rc;
2129 }
2130
2131 static struct page *lustre_get_page_write(struct inode *inode,
2132                                           unsigned long index)
2133 {
2134         struct address_space *mapping = inode->i_mapping;
2135         struct page *page;
2136         int rc;
2137
2138         page = grab_cache_page(mapping, index); /* locked page */
2139
2140         if (!IS_ERR(page)) {
2141                 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
2142                  * a no-op for most filesystems, because we write the whole
2143                  * page.  For partial-page I/O this will read in the page.
2144                  */
2145                 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
2146                 if (rc) {
2147                         CERROR("page index %lu, rc = %d\n", index, rc);
2148                         if (rc != -ENOSPC)
2149                                 LBUG();
2150                         GOTO(err_unlock, rc);
2151                 }
2152                 /* XXX not sure if we need this if we are overwriting page */
2153                 if (PageError(page)) {
2154                         CERROR("error on page index %lu, rc = %d\n", index, rc);
2155                         LBUG();
2156                         GOTO(err_unlock, rc = -EIO);
2157                 }
2158         }
2159         return page;
2160
2161 err_unlock:
2162         unlock_page(page);
2163         lustre_put_page(page);
2164         return ERR_PTR(rc);
2165 }
2166
2167 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2168 int waitfor_one_page(struct page *page)
2169 {
2170         wait_on_page_locked(page);
2171         return 0;
2172 }
2173 #endif
2174
2175 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2176 /* We should only change the file mtime (and not the ctime, like
2177  * update_inode_times() in generic_file_write()) when we only change data.
2178  */
2179 static inline void inode_update_time(struct inode *inode, int ctime_too)
2180 {
2181         time_t now = CURRENT_TIME;
2182         if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
2183                 return;
2184         inode->i_mtime = now;
2185         if (ctime_too)
2186                 inode->i_ctime = now;
2187         mark_inode_dirty_sync(inode);
2188 }
2189 #endif
2190
2191 static int lustre_commit_write(struct niobuf_local *lnb)
2192 {
2193         struct page *page = lnb->page;
2194         unsigned from = lnb->offset & ~PAGE_MASK;
2195         unsigned to = from + lnb->len;
2196         struct inode *inode = page->mapping->host;
2197         int err;
2198
2199         LASSERT(to <= PAGE_SIZE);
2200         err = page->mapping->a_ops->commit_write(NULL, page, from, to);
2201         if (!err && IS_SYNC(inode))
2202                 waitfor_one_page(page);
2203         //SetPageUptodate(page); // the client commit_write will do this
2204
2205         SetPageReferenced(page);
2206         unlock_page(page);
2207         lustre_put_page(page);
2208         return err;
2209 }
2210
2211 int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
2212                           int *pglocked)
2213 {
2214         unsigned long index = lnb->offset >> PAGE_SHIFT;
2215         struct address_space *mapping = inode->i_mapping;
2216         struct page *page;
2217         int rc;
2218
2219         //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
2220         if (*pglocked)
2221                 page = grab_cache_page_nowait(mapping, index); /* locked page */
2222         else
2223                 page = grab_cache_page(mapping, index); /* locked page */
2224
2225
2226         /* This page is currently locked, so get a temporary page instead. */
2227         if (!page) {
2228                 unsigned long addr;
2229                 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
2230                 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
2231                 if (!addr) {
2232                         CERROR("no memory for a temp page\n");
2233                         GOTO(err, rc = -ENOMEM);
2234                 }
2235                 POISON((void *)addr, 0xBA, PAGE_SIZE);
2236                 page = virt_to_page(addr);
2237                 page->index = index;
2238                 lnb->page = page;
2239                 lnb->flags |= N_LOCAL_TEMP_PAGE;
2240         } else if (!IS_ERR(page)) {
2241                 (*pglocked)++;
2242
2243                 rc = mapping->a_ops->prepare_write(NULL, page,
2244                                                    lnb->offset & ~PAGE_MASK,
2245                                                    lnb->len);
2246                 if (rc) {
2247                         if (rc != -ENOSPC)
2248                                 CERROR("page index %lu, rc = %d\n", index, rc);
2249                         GOTO(err_unlock, rc);
2250                 }
2251                 /* XXX not sure if we need this if we are overwriting page */
2252                 if (PageError(page)) {
2253                         CERROR("error on page index %lu, rc = %d\n", index, rc);
2254                         LBUG();
2255                         GOTO(err_unlock, rc = -EIO);
2256                 }
2257                 lnb->page = page;
2258         }
2259
2260         return 0;
2261
2262 err_unlock:
2263         unlock_page(page);
2264         lustre_put_page(page);
2265 err:
2266         return lnb->rc = rc;
2267 }
2268
2269 /*
2270  * We need to balance prepare_write() calls with commit_write() calls.
2271  * If the page has been prepared, but we have no data for it, we don't
2272  * want to overwrite valid data on disk, but we still need to zero out
2273  * data for space which was newly allocated.  Like part of what happens
2274  * in __block_prepare_write() for newly allocated blocks.
2275  *
2276  * XXX currently __block_prepare_write() creates buffers for all the
2277  *     pages, and the filesystems mark these buffers as BH_New if they
2278  *     were newly allocated from disk. We use the BH_New flag similarly.
2279  */
2280 static int filter_commit_write(struct niobuf_local *lnb, int err)
2281 {
2282 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2283         if (err) {
2284                 unsigned block_start, block_end;
2285                 struct buffer_head *bh, *head = lnb->page->buffers;
2286                 unsigned blocksize = head->b_size;
2287
2288                 /* debugging: just seeing if this ever happens */
2289                 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
2290                        "called for ino %lu:%lu on err %d\n",
2291                        lnb->page->mapping->host->i_ino, lnb->page->index, err);
2292
2293                 /* Currently one buffer per page, but in the future... */
2294                 for (bh = head, block_start = 0; bh != head || !block_start;
2295                      block_start = block_end, bh = bh->b_this_page) {
2296                         block_end = block_start + blocksize;
2297                         if (buffer_new(bh)) {
2298                                 memset(kmap(lnb->page) + block_start, 0,
2299                                        blocksize);
2300                                 kunmap(lnb->page);
2301                         }
2302                 }
2303         }
2304 #endif
2305         return lustre_commit_write(lnb);
2306 }
2307
2308 static int filter_preprw(int cmd, struct obd_export *export,
2309                          int objcount, struct obd_ioobj *obj,
2310                          int niocount, struct niobuf_remote *nb,
2311                          struct niobuf_local *res, void **desc_private,
2312                          struct obd_trans_info *oti)
2313 {
2314         struct obd_run_ctxt saved;
2315         struct obd_device *obd;
2316         struct obd_ioobj *o;
2317         struct niobuf_remote *rnb;
2318         struct niobuf_local *lnb;
2319         struct fsfilt_objinfo *fso;
2320         struct dentry *dentry;
2321         struct inode *inode;
2322         struct lprocfs_counters *cntrs;
2323         int pglocked = 0, rc = 0, i, j;
2324
2325         ENTRY;
2326
2327         if ((cmd & OBD_BRW_WRITE) != 0)
2328                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2329         else
2330                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2331
2332         memset(res, 0, niocount * sizeof(*res));
2333
2334         obd = export->exp_obd;
2335         if (obd == NULL)
2336                 RETURN(-EINVAL);
2337
2338         cntrs = obd->counters;
2339         if ((cmd & OBD_BRW_WRITE) != 0)
2340                 LPROCFS_COUNTER_INCBY1(&cntrs->cntr[LPROC_FILTER_WRITES]);
2341         else
2342                 LPROCFS_COUNTER_INCBY1(&cntrs->cntr[LPROC_FILTER_READS]);
2343
2344         // theoretically we support multi-obj BRW RPCs, but until then...
2345         LASSERT(objcount == 1);
2346
2347         OBD_ALLOC(fso, objcount * sizeof(*fso));
2348         if (!fso)
2349                 RETURN(-ENOMEM);
2350
2351         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2352
2353         for (i = 0, o = obj; i < objcount; i++, o++) {
2354                 struct filter_dentry_data *fdd;
2355
2356                 LASSERT(o->ioo_bufcnt);
2357
2358                 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2359                                                               o->ioo_id),
2360                                            o->ioo_id, 0);
2361
2362                 if (IS_ERR(dentry))
2363                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
2364
2365                 fso[i].fso_dentry = dentry;
2366                 fso[i].fso_bufcnt = o->ioo_bufcnt;
2367
2368                 if (!dentry->d_inode) {
2369                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2370                                o->ioo_id);
2371                         GOTO(out_objinfo, rc = -ENOENT);
2372                 }
2373
2374                 /* If we ever start to support multi-object BRW RPCs, we will
2375                  * need to get locks on multiple inodes (in order) or use the
2376                  * DLM to do the locking for us (and use the same locking in
2377                  * filter_setattr() for truncate).  That isn't all, because
2378                  * there still exists the possibility of a truncate starting
2379                  * a new transaction while holding the ext3 rwsem = write
2380                  * while some writes (which have started their transactions
2381                  * here) blocking on the ext3 rwsem = read => lock inversion.
2382                  *
2383                  * The handling gets very ugly when dealing with locked pages.
2384                  * It may be easier to just get rid of the locked page code
2385                  * (which has problems of its own) and either discover we do
2386                  * not need it anymore (i.e. it was a symptom of another bug)
2387                  * or ensure we get the page locks in an appropriate order.
2388                  */
2389                 if (cmd & OBD_BRW_WRITE)
2390                         down(&dentry->d_inode->i_sem);
2391                 fdd = dentry->d_fsdata;
2392                 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2393                         CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2394                                o->ioo_id);
2395         }
2396
2397         if (cmd & OBD_BRW_WRITE) {
2398                 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2399                                                  niocount, nb);
2400                 if (IS_ERR(*desc_private)) {
2401                         rc = PTR_ERR(*desc_private);
2402                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2403                                "error starting transaction: rc = %d\n", rc);
2404                         *desc_private = NULL;
2405                         GOTO(out_objinfo, rc);
2406                 }
2407         }
2408
2409         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
2410                 dentry = fso[i].fso_dentry;
2411                 inode = dentry->d_inode;
2412
2413                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2414                         if (j == 0)
2415                                 lnb->dentry = dentry;
2416                         else
2417                                 lnb->dentry = dget(dentry);
2418
2419                         lnb->offset = rnb->offset;
2420                         lnb->len    = rnb->len;
2421                         lnb->flags  = rnb->flags;
2422
2423                         if (cmd & OBD_BRW_WRITE) {
2424                                 rc = filter_get_page_write(inode,lnb,&pglocked);
2425
2426                                 XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes,
2427                                                           lnb->len);
2428                                 LPROCFS_COUNTER_INCR(&cntrs->cntr[LPROC_FILTER_WRITE_BYTES], lnb->len);
2429                         } else if (inode->i_size <= rnb->offset) {
2430                                 /* If there's no more data, abort early.
2431                                  * lnb->page == NULL and lnb->rc == 0, so it's
2432                                  * easy to detect later. */
2433                                 f_dput(lnb->dentry);
2434                                 lnb->dentry = NULL;
2435                                 break;
2436                         } else {
2437                                 rc = filter_start_page_read(inode, lnb);
2438
2439                                 XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes,
2440                                                           lnb->len);
2441                                 LPROCFS_COUNTER_INCR(&cntrs->cntr[LPROC_FILTER_READ_BYTES], lnb->len);
2442                         }
2443
2444                         if (rc) {
2445                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2446                                        "error on page @"LPU64"%u/%u: rc = %d\n",
2447                                        lnb->offset, j, o->ioo_bufcnt, rc);
2448                                 f_dput(dentry);
2449                                 GOTO(out_pages, rc);
2450                         }
2451
2452                         if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
2453                                 /* Likewise with a partial read */
2454                                 break;
2455                         }
2456                 }
2457         }
2458
2459         while ((cmd & OBD_BRW_READ) && lnb-- > res) {
2460                 rc = filter_finish_page_read(lnb);
2461                 if (rc) {
2462                         CERROR("error on page %u@"LPU64": rc = %d\n",
2463                                lnb->len, lnb->offset, rc);
2464                         f_dput(lnb->dentry);
2465                         GOTO(out_pages, rc);
2466                 }
2467         }
2468         EXIT;
2469 out:
2470         OBD_FREE(fso, objcount * sizeof(*fso));
2471         current->journal_info = NULL;
2472         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2473         return rc;
2474
2475 out_pages:
2476         while (lnb-- > res) {
2477                 if (cmd & OBD_BRW_WRITE) {
2478                         filter_commit_write(lnb, rc);
2479                         up(&lnb->dentry->d_inode->i_sem);
2480                 } else {
2481                         lustre_put_page(lnb->page);
2482                 }
2483                 f_dput(lnb->dentry);
2484         }
2485         if (cmd & OBD_BRW_WRITE) {
2486                 filter_finish_transno(export, *desc_private, oti, rc);
2487                 fsfilt_commit(obd,
2488                               filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2489                               *desc_private, 0);
2490         }
2491         goto out; /* dropped the dentry refs already (one per page) */
2492
2493 out_objinfo:
2494         for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
2495                 if (cmd & OBD_BRW_WRITE)
2496                         up(&fso[i].fso_dentry->d_inode->i_sem);
2497                 f_dput(fso[i].fso_dentry);
2498         }
2499         goto out;
2500 }
2501
2502 static int filter_write_locked_page(struct niobuf_local *lnb)
2503 {
2504         struct page *lpage;
2505         void        *lpage_addr;
2506         void        *lnb_addr;
2507         int rc;
2508         ENTRY;
2509
2510         lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2511         if (IS_ERR(lpage)) {
2512                 /* It is highly unlikely that we would ever get an error here.
2513                  * The page we want to get was previously locked, so it had to
2514                  * have already allocated the space, and we were just writing
2515                  * over the same data, so there would be no hole in the file.
2516                  *
2517                  * XXX: possibility of a race with truncate could exist, need
2518                  *      to check that.  There are no guarantees w.r.t.
2519                  *      write order even on a local filesystem, although the
2520                  *      normal response would be to return the number of bytes
2521                  *      successfully written and leave the rest to the app.
2522                  */
2523                 rc = PTR_ERR(lpage);
2524                 CERROR("error getting locked page index %ld: rc = %d\n",
2525                        lnb->page->index, rc);
2526                 LBUG();
2527                 lustre_commit_write(lnb);
2528                 RETURN(rc);
2529         }
2530
2531         /* 2 kmaps == vanishingly small deadlock opportunity */
2532         lpage_addr = kmap(lpage);
2533         lnb_addr = kmap(lnb->page);
2534
2535         memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
2536
2537         kunmap(lnb->page);
2538         kunmap(lpage);
2539
2540         lustre_put_page(lnb->page);
2541
2542         lnb->page = lpage;
2543         rc = lustre_commit_write(lnb);
2544         if (rc)
2545                 CERROR("error committing locked page %ld: rc = %d\n",
2546                        lnb->page->index, rc);
2547
2548         RETURN(rc);
2549 }
2550
2551 static int filter_syncfs(struct obd_export *exp)
2552 {
2553         struct obd_device *obd = exp->exp_obd;
2554         ENTRY;
2555
2556         XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1);
2557
2558         RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
2559 }
2560
2561 static int filter_commitrw(int cmd, struct obd_export *export,
2562                            int objcount, struct obd_ioobj *obj,
2563                            int niocount, struct niobuf_local *res,
2564                            void *desc_private, struct obd_trans_info *oti)
2565 {
2566         struct obd_run_ctxt saved;
2567         struct obd_ioobj *o;
2568         struct niobuf_local *lnb;
2569         struct obd_device *obd = export->exp_obd;
2570         int found_locked = 0, rc = 0, i;
2571         ENTRY;
2572
2573         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2574
2575         LASSERT(!current->journal_info);
2576         current->journal_info = desc_private;
2577
2578         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2579                 int j;
2580
2581                 if (cmd & OBD_BRW_WRITE) {
2582                         inode_update_time(lnb->dentry->d_inode, 1);
2583                         up(&lnb->dentry->d_inode->i_sem);
2584                 }
2585                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2586                         if (lnb->page == NULL) {
2587                                 continue;
2588                         }
2589                         if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2590                                 found_locked++;
2591                                 continue;
2592                         }
2593
2594                         if (cmd & OBD_BRW_WRITE) {
2595                                 int err = filter_commit_write(lnb, 0);
2596
2597                                 if (!rc)
2598                                         rc = err;
2599                         } else {
2600                                 lustre_put_page(lnb->page);
2601                         }
2602
2603                         f_dput(lnb->dentry);
2604                 }
2605         }
2606
2607         for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2608              i++, o++) {
2609                 int j;
2610                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2611                         int err;
2612                         if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2613                                 continue;
2614
2615                         err = filter_write_locked_page(lnb);
2616                         if (!rc)
2617                                 rc = err;
2618                         f_dput(lnb->dentry);
2619                         found_locked--;
2620                 }
2621         }
2622
2623         if (cmd & OBD_BRW_WRITE) {
2624                 /* We just want any dentry for the commit, for now */
2625                 struct dentry *dir_dentry = filter_parent(obd, S_IFREG, 0);
2626                 int err;
2627
2628                 rc = filter_finish_transno(export, desc_private, oti, rc);
2629                 err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private,
2630                                     obd_sync_filter);
2631                 if (err)
2632                         rc = err;
2633                 if (obd_sync_filter)
2634                         LASSERT(oti->oti_transno <= obd->obd_last_committed);
2635
2636         }
2637
2638         LASSERT(!current->journal_info);
2639
2640         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2641         RETURN(rc);
2642 }
2643
2644 static int filter_brw(int cmd, struct lustre_handle *conn,
2645                       struct lov_stripe_md *lsm, obd_count oa_bufs,
2646                       struct brw_page *pga, struct obd_trans_info *oti)
2647 {
2648         struct obd_export *export = class_conn2export(conn);
2649         struct obd_ioobj        ioo;
2650         struct niobuf_local     *lnb;
2651         struct niobuf_remote    *rnb;
2652         obd_count               i;
2653         void                    *desc_private;
2654         int                     ret = 0;
2655         ENTRY;
2656
2657         if (export == NULL)
2658                 RETURN(-EINVAL);
2659
2660         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2661         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2662
2663         if (lnb == NULL || rnb == NULL)
2664                 GOTO(out, ret = -ENOMEM);
2665
2666         for (i = 0; i < oa_bufs; i++) {
2667                 rnb[i].offset = pga[i].off;
2668                 rnb[i].len = pga[i].count;
2669         }
2670
2671         ioo.ioo_id = lsm->lsm_object_id;
2672         ioo.ioo_gr = 0;
2673         ioo.ioo_type = S_IFREG;
2674         ioo.ioo_bufcnt = oa_bufs;
2675
2676         ret = filter_preprw(cmd, export, 1, &ioo, oa_bufs, rnb, lnb,
2677                             &desc_private, oti);
2678         if (ret != 0)
2679                 GOTO(out, ret);
2680
2681         for (i = 0; i < oa_bufs; i++) {
2682                 void *virt = kmap(pga[i].pg);
2683                 obd_off off = pga[i].off & ~PAGE_MASK;
2684                 void *addr = kmap(lnb[i].page);
2685
2686                 /* 2 kmaps == vanishingly small deadlock opportunity */
2687
2688                 if (cmd & OBD_BRW_WRITE)
2689                         memcpy(addr + off, virt + off, pga[i].count);
2690                 else
2691                         memcpy(virt + off, addr + off, pga[i].count);
2692
2693                 kunmap(addr);
2694                 kunmap(virt);
2695         }
2696
2697         ret = filter_commitrw(cmd, export, 1, &ioo, oa_bufs, lnb, desc_private,
2698                               oti);
2699
2700 out:
2701         if (lnb)
2702                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2703         if (rnb)
2704                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2705         class_export_put(export);
2706         RETURN(ret);
2707 }
2708
2709 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2710                              int objcount, struct obd_ioobj *obj,
2711                              int niocount, struct niobuf_remote *nb)
2712 {
2713         struct obd_device *obd;
2714         struct obd_ioobj *o = obj;
2715         struct niobuf_remote *rnb = nb;
2716         int rc = 0;
2717         int i;
2718         ENTRY;
2719
2720         if ((cmd & OBD_BRW_WRITE) != 0)
2721                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2722         else
2723                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2724
2725         obd = class_conn2obd(conn);
2726         if (!obd) {
2727                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2728                        conn->cookie);
2729                 RETURN(-EINVAL);
2730         }
2731
2732         for (i = 0; i < objcount; i++, o++) {
2733                 struct dentry *dentry;
2734                 struct inode *inode;
2735                 int (*fs_bmap)(struct address_space *, long);
2736                 int j;
2737
2738                 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2739                                                               o->ioo_id),
2740                                            o->ioo_id, 0);
2741                 if (IS_ERR(dentry))
2742                         GOTO(out, rc = PTR_ERR(dentry));
2743                 inode = dentry->d_inode;
2744                 if (!inode) {
2745                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2746                                o->ioo_id);
2747                         f_dput(dentry);
2748                         GOTO(out, rc = -ENOENT);
2749                 }
2750                 fs_bmap = inode->i_mapping->a_ops->bmap;
2751
2752                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2753                         long block;
2754
2755                         block = rnb->offset >> inode->i_blkbits;
2756
2757                         if (cmd == OBD_BRW_READ) {
2758                                 block = fs_bmap(inode->i_mapping, block);
2759                         } else {
2760                                 loff_t newsize = rnb->offset + rnb->len;
2761                                 /* fs_prep_san_write will also update inode
2762                                  * size for us:
2763                                  * (1) new alloced block
2764                                  * (2) existed block but size extented
2765                                  */
2766                                 /* FIXME We could call fs_prep_san_write()
2767                                  * only once for all the blocks allocation.
2768                                  * Now call it once for each block, for
2769                                  * simplicity. And if error happens, we
2770                                  * probably need to release previous alloced
2771                                  * block */
2772                                 rc = fs_prep_san_write(obd, inode, &block,
2773                                                        1, newsize);
2774                                 if (rc)
2775                                         break;
2776                         }
2777
2778                         rnb->offset = block;
2779                 }
2780                 f_dput(dentry);
2781         }
2782 out:
2783         RETURN(rc);
2784 }
2785
2786 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2787 {
2788         struct obd_device *obd;
2789         ENTRY;
2790
2791         obd = class_conn2obd(conn);
2792
2793         XPROCFS_BUMP_MYCPU_IOSTAT (st_statfs_reqs, 1);
2794
2795         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2796 }
2797
2798 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2799                            void *key, __u32 *vallen, void *val)
2800 {
2801         struct obd_device *obd;
2802         ENTRY;
2803
2804         obd = class_conn2obd(conn);
2805         if (!obd) {
2806                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2807                        conn->cookie);
2808                 RETURN(-EINVAL);
2809         }
2810
2811         if (keylen == strlen("blocksize") &&
2812             memcmp(key, "blocksize", keylen) == 0) {
2813                 __u32 *blocksize = val;
2814                 *vallen = sizeof(*blocksize);
2815                 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2816                 RETURN(0);
2817         }
2818
2819         if (keylen == strlen("blocksize_bits") &&
2820             memcmp(key, "blocksize_bits", keylen) == 0) {
2821                 __u32 *blocksize_bits = val;
2822                 *vallen = sizeof(*blocksize_bits);
2823                 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2824                 RETURN(0);
2825         }
2826
2827         CDEBUG(D_IOCTL, "invalid key\n");
2828         RETURN(-EINVAL);
2829 }
2830
2831 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2832                   struct lustre_handle *src_conn, struct obdo *src,
2833                   obd_size count, obd_off offset, struct obd_trans_info *oti)
2834 {
2835         struct page *page;
2836         struct lov_stripe_md srcmd, dstmd;
2837         unsigned long index = 0;
2838         int err = 0;
2839
2840         LBUG(); /* THIS CODE IS NOT CORRECT -phil */
2841
2842         memset(&srcmd, 0, sizeof(srcmd));
2843         memset(&dstmd, 0, sizeof(dstmd));
2844         srcmd.lsm_object_id = src->o_id;
2845         dstmd.lsm_object_id = dst->o_id;
2846
2847         ENTRY;
2848         CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2849                ", dst: ino "LPU64"\n",
2850                src->o_id, src->o_blocks, src->o_size, dst->o_id);
2851         page = alloc_page(GFP_USER);
2852         if (page == NULL)
2853                 RETURN(-ENOMEM);
2854
2855         wait_on_page(page);
2856
2857         /* XXX with brw vector I/O, we could batch up reads and writes here,
2858          *     all we need to do is allocate multiple pages to handle the I/Os
2859          *     and arrays to handle the request parameters.
2860          */
2861         while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2862                 struct brw_page pg;
2863
2864                 pg.pg = page;
2865                 pg.count = PAGE_SIZE;
2866                 pg.off = (page->index) << PAGE_SHIFT;
2867                 pg.flag = 0;
2868
2869                 page->index = index;
2870                 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, NULL);
2871                 if (err) {
2872                         EXIT;
2873                         break;
2874                 }
2875
2876                 pg.flag = OBD_BRW_CREATE;
2877                 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2878
2879                 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, oti);
2880
2881                 /* XXX should handle dst->o_size, dst->o_blocks here */
2882                 if (err) {
2883                         EXIT;
2884                         break;
2885                 }
2886
2887                 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2888
2889                 index++;
2890         }
2891         dst->o_size = src->o_size;
2892         dst->o_blocks = src->o_blocks;
2893         dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
2894         unlock_page(page);
2895         __free_page(page);
2896
2897         RETURN(err);
2898 }
2899
2900 static struct obd_ops filter_obd_ops = {
2901         o_owner:          THIS_MODULE,
2902         o_attach:         filter_attach,
2903         o_detach:         filter_detach,
2904         o_get_info:       filter_get_info,
2905         o_setup:          filter_setup,
2906         o_cleanup:        filter_cleanup,
2907         o_connect:        filter_connect,
2908         o_disconnect:     filter_disconnect,
2909         o_statfs:         filter_statfs,
2910         o_syncfs:         filter_syncfs,
2911         o_getattr:        filter_getattr,
2912         o_create:         filter_create,
2913         o_setattr:        filter_setattr,
2914         o_destroy:        filter_destroy,
2915         o_open:           filter_open,
2916         o_close:          filter_close,
2917         o_brw:            filter_brw,
2918         o_punch:          filter_truncate,
2919         o_preprw:         filter_preprw,
2920         o_commitrw:       filter_commitrw,
2921         o_destroy_export: filter_destroy_export,
2922 #if 0
2923         o_san_preprw:  filter_san_preprw,
2924         o_preallocate: filter_preallocate_inodes,
2925         o_migrate:     filter_migrate,
2926         o_copy:        filter_copy_data,
2927         o_iterate:     filter_iterate
2928 #endif
2929 };
2930
2931 static struct obd_ops filter_sanobd_ops = {
2932         o_owner:          THIS_MODULE,
2933         o_attach:         filter_attach,
2934         o_detach:         filter_detach,
2935         o_get_info:       filter_get_info,
2936         o_setup:          filter_san_setup,
2937         o_cleanup:        filter_cleanup,
2938         o_connect:        filter_connect,
2939         o_disconnect:     filter_disconnect,
2940         o_statfs:         filter_statfs,
2941         o_getattr:        filter_getattr,
2942         o_create:         filter_create,
2943         o_setattr:        filter_setattr,
2944         o_destroy:        filter_destroy,
2945         o_open:           filter_open,
2946         o_close:          filter_close,
2947         o_brw:            filter_brw,
2948         o_punch:          filter_truncate,
2949         o_preprw:         filter_preprw,
2950         o_commitrw:       filter_commitrw,
2951         o_san_preprw:     filter_san_preprw,
2952         o_destroy_export: filter_destroy_export
2953 #if 0
2954         o_preallocate:  filter_preallocate_inodes,
2955         o_migrate:      filter_migrate,
2956         o_copy:         filter_copy_data,
2957         o_iterate:      filter_iterate
2958 #endif
2959 };
2960
2961
2962 static int __init obdfilter_init(void)
2963 {
2964         struct lprocfs_static_vars lvars;
2965         int rc;
2966
2967         printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2968
2969         xprocfs_init ("filter");
2970         lprocfs_init_vars(&lvars);
2971
2972         rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2973                                  OBD_FILTER_DEVICENAME);
2974         if (rc)
2975                 return rc;
2976
2977         rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2978                                  OBD_FILTER_SAN_DEVICENAME);
2979         if (rc)
2980                 class_unregister_type(OBD_FILTER_DEVICENAME);
2981         return rc;
2982 }
2983
2984 static void __exit obdfilter_exit(void)
2985 {
2986         class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2987         class_unregister_type(OBD_FILTER_DEVICENAME);
2988         xprocfs_fini ();
2989 }
2990
2991 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2992 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2993 MODULE_LICENSE("GPL");
2994
2995 module_init(obdfilter_init);
2996 module_exit(obdfilter_exit);