Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_FILTER
38
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
42 #include <linux/fs.h>
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51
52
/* Slab caches owned by this file.  filter_dentry_cache is the cache that
 * filter_drelease() returns dentry->d_fsdata objects to; the users of
 * filter_open_cache are not visible in this part of the file. */
static kmem_cache_t *filter_open_cache;
static kmem_cache_t *filter_dentry_cache;
55
/* should be generic per-obd stats... */
/*
 * Set of 64-bit I/O counters.  One instance per CPU is kept in
 * xprocfs_iostats[] and the per-CPU values are summed when exported
 * through procfs.  Field names suggest byte totals and request counts per
 * operation type; the code that bumps them is outside this section.
 */
struct xprocfs_io_stat {
        __u64    st_read_bytes;
        __u64    st_read_reqs;
        __u64    st_write_bytes;
        __u64    st_write_reqs;
        __u64    st_getattr_reqs;
        __u64    st_setattr_reqs;
        __u64    st_create_reqs;
        __u64    st_destroy_reqs;
        __u64    st_statfs_reqs;
        __u64    st_open_reqs;
        __u64    st_close_reqs;
        __u64    st_punch_reqs;
};
71
/* Per-CPU counter sets, indexed by smp_processor_id(). */
static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
/* procfs directory holding the stat entries; NULL while not registered. */
static struct proc_dir_entry *xprocfs_dir;
74
/*
 * Add 'count' to counter 'field' in the slot of the CPU the caller is
 * currently running on.  No locking is performed here.
 */
#define XPROCFS_BUMP_MYCPU_IOSTAT(field, count)                 \
do {                                                            \
        xprocfs_iostats[smp_processor_id()].field += (count);   \
} while (0)
79
/*
 * Generate a static function xprocfs_sum_<field>() that returns the sum of
 * counter 'field' over the first smp_num_cpus entries of xprocfs_iostats[].
 * The generated functions have the signature expected by
 * xprocfs_add_stat().
 */
#define DECLARE_XPROCFS_SUM_STAT(field)                 \
static long long                                        \
xprocfs_sum_##field (void)                              \
{                                                       \
        long long stat = 0;                             \
        int       i;                                    \
                                                        \
        for (i = 0; i < smp_num_cpus; i++)              \
                stat += xprocfs_iostats[i].field;       \
        return (stat);                                  \
}
91
/* Instantiate one summing function per counter in struct xprocfs_io_stat. */
DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
104
105 static int
106 xprocfs_rd_stat (char *page, char **start, off_t off, int count,
107                  int  *eof, void *data)
108 {
109         long long (*fn)(void) = (long long(*)(void))data;
110         int         len;
111
112         *eof = 1;
113         if (off != 0)
114                 return (0);
115
116         len = snprintf (page, count, "%Ld\n", fn());
117         *start = page;
118         return (len);
119 }
120
121
122 static void
123 xprocfs_add_stat(char *name, long long (*fn)(void))
124 {
125         struct proc_dir_entry *entry;
126
127         entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
128         if (entry == NULL) {
129                 CERROR ("Can't add procfs stat %s\n", name);
130                 return;
131         }
132
133         entry->data = fn;
134         entry->read_proc = xprocfs_rd_stat;
135         entry->write_proc = NULL;
136 }
137
138 static void
139 xprocfs_init (char *name)
140 {
141         char  dirname[64];
142
143         snprintf (dirname, sizeof (dirname), "sys/%s", name);
144
145         xprocfs_dir = proc_mkdir ("sys/obdfilter", NULL);
146         if (xprocfs_dir == NULL) {
147                 CERROR ("Can't make dir\n");
148                 return;
149         }
150
151         xprocfs_add_stat ("read_bytes",   xprocfs_sum_st_read_bytes);
152         xprocfs_add_stat ("read_reqs",    xprocfs_sum_st_read_reqs);
153         xprocfs_add_stat ("write_bytes",  xprocfs_sum_st_write_bytes);
154         xprocfs_add_stat ("write_reqs",   xprocfs_sum_st_write_reqs);
155         xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
156         xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
157         xprocfs_add_stat ("create_reqs",  xprocfs_sum_st_create_reqs);
158         xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
159         xprocfs_add_stat ("statfs_reqs",  xprocfs_sum_st_statfs_reqs);
160         xprocfs_add_stat ("open_reqs",    xprocfs_sum_st_open_reqs);
161         xprocfs_add_stat ("close_reqs",   xprocfs_sum_st_close_reqs);
162         xprocfs_add_stat ("punch_reqs",   xprocfs_sum_st_punch_reqs);
163 }
164
165 void xprocfs_fini (void)
166 {
167         if (xprocfs_dir == NULL)
168                 return;
169
170         remove_proc_entry ("read_bytes",   xprocfs_dir);
171         remove_proc_entry ("read_reqs",    xprocfs_dir);
172         remove_proc_entry ("write_bytes",  xprocfs_dir);
173         remove_proc_entry ("write_reqs",   xprocfs_dir);
174         remove_proc_entry ("getattr_reqs", xprocfs_dir);
175         remove_proc_entry ("setattr_reqs", xprocfs_dir);
176         remove_proc_entry ("create_reqs",  xprocfs_dir);
177         remove_proc_entry ("destroy_reqs", xprocfs_dir);
178         remove_proc_entry ("statfs_reqs",  xprocfs_dir);
179         remove_proc_entry ("open_reqs",    xprocfs_dir);
180         remove_proc_entry ("close_reqs",   xprocfs_dir);
181         remove_proc_entry ("punch_reqs",   xprocfs_dir);
182
183         remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
184         xprocfs_dir = NULL;
185 }
186
#define S_SHIFT 12
/*
 * Maps the file-type bits of a mode, (mode & S_IFMT) >> S_SHIFT, to the
 * one-letter name of the per-type object directory under "O/".  Slots for
 * values that are not a known file type are NULL.
 *
 * The table has (S_IFMT >> S_SHIFT) + 1 entries so that every value the
 * index expression can take, including S_IFMT itself, is in bounds.
 */
static char *obd_type_by_mode[(S_IFMT >> S_SHIFT) + 1] = {
        [0]                     = NULL,
        [S_IFREG >> S_SHIFT]    = "R",
        [S_IFDIR >> S_SHIFT]    = "D",
        [S_IFCHR >> S_SHIFT]    = "C",
        [S_IFBLK >> S_SHIFT]    = "B",
        [S_IFIFO >> S_SHIFT]    = "F",
        [S_IFSOCK >> S_SHIFT]   = "S",
        [S_IFLNK >> S_SHIFT]    = "L"
};

/*
 * Return the directory letter for the file type encoded in 'mode', or
 * NULL if the type bits do not correspond to a known file type.
 * Permission bits in 'mode' are ignored.
 */
static inline const char *obd_mode_to_type(int mode)
{
        return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
}
203
204 static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd,
205                                 int error)
206 {
207         CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
208                last_rcvd, error);
209         if (!error && last_rcvd > obd->obd_last_committed)
210                 obd->obd_last_committed = last_rcvd;
211 }
212
213 void filter_start_transno(struct obd_export *export)
214 {
215         struct obd_device * obd = export->exp_obd;
216         ENTRY;
217
218         down(&obd->u.filter.fo_transno_sem);
219 }
220
221 /* Assumes caller has already pushed us into the kernel context. */
222 int filter_finish_transno(struct obd_export *export, void *handle,
223                           struct obd_trans_info *oti, int rc)
224 {
225         __u64 last_rcvd;
226         struct obd_device *obd = export->exp_obd;
227         struct filter_obd *filter = &obd->u.filter;
228         struct filter_export_data *fed = &export->exp_filter_data;
229         struct filter_client_data *fcd = fed->fed_fcd;
230         loff_t off;
231         ssize_t written;
232
233         /* Propagate error code. */
234         if (rc)
235                 GOTO(out, rc);
236
237         /* we don't allocate new transnos for replayed requests */
238 #if 0
239         /* perhaps if transno already set? or should level be in oti? */
240         if (req->rq_level == LUSTRE_CONN_RECOVD)
241                 GOTO(out, rc = 0);
242 #endif
243
244         off = FILTER_LR_CLIENT_START + fed->fed_lr_off * FILTER_LR_CLIENT_SIZE;
245
246         last_rcvd = ++filter->fo_fsd->fsd_last_rcvd;
247         if (oti)
248                 oti->oti_transno = last_rcvd;
249         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
250         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
251
252         /* get this from oti */
253 #if 0
254         if (oti)
255                 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
256         else
257 #else
258         fcd->fcd_last_xid = 0;
259 #endif
260         fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_last_rcvd_cb);
261         written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
262                                 &off);
263         CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
264                LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_off, written);
265
266         if (written == sizeof(*fcd))
267                 GOTO(out, rc = 0);
268         CERROR("error writing to last_rcvd file: rc = %d\n", rc);
269         if (written >= 0)
270                 GOTO(out, rc = -EIO);
271
272         rc = 0;
273
274         EXIT;
275  out:
276
277         up(&filter->fo_transno_sem);
278         return rc;
279 }
280
281 /* write the pathname into the string */
282 static int filter_id(char *buf, obd_id id, obd_mode mode)
283 {
284         return sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
285 }
286
287 static inline void f_dput(struct dentry *dentry)
288 {
289         /* Can't go inside filter_ddelete because it can block */
290         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
291                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
292         LASSERT(atomic_read(&dentry->d_count) > 0);
293
294         dput(dentry);
295 }
296
297 /* Not racy w.r.t. others, because we are the only user of this dentry */
298 static void filter_drelease(struct dentry *dentry)
299 {
300         if (dentry->d_fsdata)
301                 kmem_cache_free(filter_dentry_cache, dentry->d_fsdata);
302 }
303
/* Dentry operations for filter objects: only d_release is overridden, so
 * that d_fsdata is freed by filter_drelease(). */
struct dentry_operations filter_dops = {
        .d_release = filter_drelease,
};
307
/* Name of the recovery file, opened relative to the filter's root. */
#define LAST_RCVD "last_rcvd"
/* last_objid value stored when a new last_rcvd file is initialized. */
#define INIT_OBJID 2

/* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
/* Number of unsigned longs needed to hold FILTER_LR_MAX_CLIENTS bits
 * (bits divided by bits-per-long, not by bytes-per-long). */
#define FILTER_LR_MAX_CLIENT_WORDS \
        (FILTER_LR_MAX_CLIENTS / (8 * sizeof(unsigned long)))

/* Bitmap of client slots in use in the last_rcvd file, one bit per slot. */
static unsigned long filter_last_rcvd_slots[FILTER_LR_MAX_CLIENT_WORDS];
316
317 /* Add client data to the FILTER.  We use a bitmap to locate a free space
318  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
319  * Otherwise, we have just read the data from the last_rcvd file and
320  * we know its offset.
321  */
322 int filter_client_add(struct filter_obd *filter,
323                       struct filter_export_data *fed, int cl_off)
324 {
325         int new_client = (cl_off == -1);
326
327         /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
328          * there's no need for extra complication here
329          */
330         if (new_client) {
331                 cl_off = find_first_zero_bit(filter_last_rcvd_slots,
332                                              FILTER_LR_MAX_CLIENTS);
333         repeat:
334                 if (cl_off >= FILTER_LR_MAX_CLIENTS) {
335                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
336                         return -ENOMEM;
337                 }
338                 if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
339                         CERROR("FILTER client %d: found bit is set in bitmap\n",
340                                cl_off);
341                         cl_off = find_next_zero_bit(filter_last_rcvd_slots,
342                                                     FILTER_LR_MAX_CLIENTS,
343                                                     cl_off);
344                         goto repeat;
345                 }
346         } else {
347                 if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
348                         CERROR("FILTER client %d: bit already set in bitmap!\n",
349                                cl_off);
350                         LBUG();
351                 }
352         }
353
354         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
355                cl_off, fed->fed_fcd->fcd_uuid);
356
357         fed->fed_lr_off = cl_off;
358
359         if (new_client) {
360                 struct obd_run_ctxt saved;
361                 loff_t off = FILTER_LR_CLIENT_START +
362                         (cl_off * FILTER_LR_CLIENT_SIZE);
363                 ssize_t written;
364
365                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
366                 written = lustre_fwrite(filter->fo_rcvd_filp,
367                                                 (char *)fed->fed_fcd,
368                                                 sizeof(*fed->fed_fcd), &off);
369                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
370
371                 if (written != sizeof(*fed->fed_fcd)) {
372                         if (written < 0)
373                                 RETURN(written);
374                         RETURN(-EIO);
375                 }
376                 CDEBUG(D_INFO, "wrote client fcd at off %u (len %u)\n",
377                        FILTER_LR_CLIENT_START + (cl_off*FILTER_LR_CLIENT_SIZE),
378                        (unsigned int)sizeof(*fed->fed_fcd));
379         }
380         return 0;
381 }
382
383 int filter_client_free(struct obd_export *exp)
384 {
385         struct filter_export_data *fed = &exp->exp_filter_data;
386         struct filter_obd *filter = &exp->exp_obd->u.filter;
387         struct filter_client_data zero_fcd;
388         struct obd_run_ctxt saved;
389         int written;
390         loff_t off;
391
392         if (!fed->fed_fcd)
393                 RETURN(0);
394
395         off = FILTER_LR_CLIENT_START + (fed->fed_lr_off*FILTER_LR_CLIENT_SIZE);
396
397         CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
398                fed->fed_lr_off, off, fed->fed_fcd->fcd_uuid);
399
400         if (!test_and_clear_bit(fed->fed_lr_off, filter_last_rcvd_slots)) {
401                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
402                        fed->fed_lr_off);
403                 LBUG();
404         }
405
406         memset(&zero_fcd, 0, sizeof zero_fcd);
407         push_ctxt(&saved, &filter->fo_ctxt, NULL);
408         written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
409                                 sizeof(zero_fcd), &off);
410
411         /* XXX: this write gets lost sometimes, unless this sync is here. */
412 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
413         fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
414 #else
415         file_fsync(filter->fo_rcvd_filp,  filter->fo_rcvd_filp->f_dentry, 1);
416 #endif
417         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
418
419         if (written != sizeof(zero_fcd)) {
420                 CERROR("error zeroing out client %s off %d in %s: %d\n",
421                        fed->fed_fcd->fcd_uuid, fed->fed_lr_off, LAST_RCVD,
422                        written);
423         } else {
424                 CDEBUG(D_INFO,
425                        "zeroed disconnecting client %s at off %d ("LPX64")\n",
426                        fed->fed_fcd->fcd_uuid, fed->fed_lr_off, off);
427         }
428
429         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
430
431         return 0;
432 }
433
434 static void filter_unpack_fsd(struct filter_server_data *fsd)
435 {
436         fsd->fsd_last_objid = le64_to_cpu(fsd->fsd_last_objid);
437         fsd->fsd_last_rcvd = le64_to_cpu(fsd->fsd_last_rcvd);
438         fsd->fsd_mount_count = le64_to_cpu(fsd->fsd_mount_count);
439 }
440
441 static void filter_pack_fsd(struct filter_server_data *disk_fsd,
442                             struct filter_server_data *fsd)
443 {
444         memset(disk_fsd, 0, sizeof(*disk_fsd));
445         memcpy(disk_fsd->fsd_uuid, fsd->fsd_uuid, sizeof(fsd->fsd_uuid));
446         disk_fsd->fsd_last_objid = cpu_to_le64(fsd->fsd_last_objid);
447         disk_fsd->fsd_last_rcvd = cpu_to_le64(fsd->fsd_last_rcvd);
448         disk_fsd->fsd_mount_count = cpu_to_le64(fsd->fsd_mount_count);
449 }
450
451 static int filter_free_server_data(struct filter_obd *filter)
452 {
453         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
454         filter->fo_fsd = NULL;
455
456         return 0;
457 }
458
459
460 /* assumes caller has already in kernel ctxt */
461 static int filter_update_server_data(struct file *filp,
462                                      struct filter_server_data *fsd)
463 {
464         struct filter_server_data disk_fsd;
465         loff_t off = 0;
466         int rc;
467
468         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
469         CDEBUG(D_INODE, "server last_objid: "LPU64"\n", fsd->fsd_last_objid);
470         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n", fsd->fsd_last_rcvd);
471         CDEBUG(D_INODE, "server last_mount: "LPU64"\n", fsd->fsd_mount_count);
472
473         filter_pack_fsd(&disk_fsd, fsd);
474         rc = lustre_fwrite(filp, (char *)&disk_fsd,
475                            sizeof(disk_fsd), &off);
476         if (rc != sizeof(disk_fsd)) {
477                 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
478                        rc);
479                 RETURN(-EIO);
480         }
481         RETURN(0);
482 }
483
484 /* assumes caller has already in kernel ctxt */
485 static int filter_init_server_data(struct obd_device *obd,
486                                    struct file * filp,
487                                    __u64 init_lastobjid)
488 {
489         struct filter_obd *filter = &obd->u.filter;
490         struct filter_server_data *fsd;
491         struct filter_client_data *fcd = NULL;
492         struct inode *inode = filp->f_dentry->d_inode;
493         unsigned long last_rcvd_size = inode->i_size;
494         int cl_off;
495         loff_t off = 0;
496         int rc;
497
498         /* ensure padding in the struct is the correct size */
499         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
500                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
501         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
502                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
503
504         OBD_ALLOC(fsd, sizeof(*fsd));
505         if (!fsd)
506                 RETURN(-ENOMEM);
507         filter->fo_fsd = fsd;
508
509         if (last_rcvd_size == 0) {
510                 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
511
512                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
513                 fsd->fsd_last_objid = init_lastobjid;
514                 fsd->fsd_last_rcvd = 0;
515                 fsd->fsd_mount_count = 0;
516
517         } else {
518                 ssize_t  retval = lustre_fread(filp, (char *)fsd,
519                                               sizeof(*fsd),
520                                               &off);
521                 if (retval != sizeof(*fsd)) {
522                         CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
523                         GOTO(out, rc = -EIO);
524                 }
525                 filter_unpack_fsd(fsd);
526         }
527
528         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
529                obd->obd_name, fsd->fsd_last_objid);
530         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
531                obd->obd_name, fsd->fsd_last_rcvd);
532         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
533                obd->obd_name, fsd->fsd_mount_count);
534
535         /*
536          * When we do a clean FILTER shutdown, we save the last_rcvd into
537          * the header.  If we find clients with higher last_rcvd values
538          * then those clients may need recovery done.
539          */
540         /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
541        for (off = FILTER_LR_CLIENT_START, cl_off = 0; off < last_rcvd_size;
542             cl_off++) {
543                 __u64 last_rcvd;
544                 int mount_age;
545
546                 if (!fcd) {
547                         OBD_ALLOC(fcd, sizeof(*fcd));
548                         if (!fcd)
549                                 GOTO(err_fsd, rc = -ENOMEM);
550                 }
551
552                 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
553                 if (rc != sizeof(*fcd)) {
554                         CERROR("error reading FILTER %s offset %d: rc = %d\n",
555                                LAST_RCVD, cl_off, rc);
556                         if (rc > 0) /* XXX fatal error or just abort reading? */
557                                 rc = -EIO;
558                         break;
559                 }
560
561                 if (fcd->fcd_uuid[0] == '\0') {
562                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
563                                cl_off);
564                         continue;
565                 }
566
567                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
568
569                 /* These exports are cleaned up by filter_disconnect(), so they
570                  * need to be set up like real exports as filter_connect() does.
571                  */
572                 mount_age = fsd->fsd_mount_count -
573                         le64_to_cpu(fcd->fcd_mount_count);
574                 if (mount_age < FILTER_MOUNT_RECOV) {
575                         CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
576                                "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
577                                "\n", fcd->fcd_uuid, cl_off,
578                                last_rcvd, fsd->fsd_last_rcvd,
579                                le64_to_cpu(fcd->fcd_mount_count),
580                                fsd->fsd_mount_count);
581 #if 0
582                         /* disabled until OST recovery is actually working */
583                         struct obd_export *exp = class_new_export(obd);
584                         struct filter_export_data *fed;
585
586                         if (!exp) {
587                                 rc = -ENOMEM;
588                                 break;
589                         }
590
591                         fed = &exp->exp_filter_data;
592                         fed->fed_fcd = fcd;
593                         filter_client_add(filter, fed, cl_off);
594                         /* create helper if export init gets more complex */
595                         INIT_LIST_HEAD(&fed->fed_open_head);
596                         spin_lock_init(&fed->fed_lock);
597
598                         fcd = NULL;
599                         filter->fo_recoverable_clients++;
600 #endif
601                 } else {
602                         CDEBUG(D_INFO,
603                                "discarded client %d, UUID '%s', count %Ld\n",
604                                cl_off, fcd->fcd_uuid,
605                                (long long)le64_to_cpu(fcd->fcd_mount_count));
606                 }
607
608                 CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
609                        cl_off, (unsigned long long)last_rcvd);
610
611                 if (last_rcvd > filter->fo_fsd->fsd_last_rcvd)
612                         filter->fo_fsd->fsd_last_rcvd = last_rcvd;
613         }
614
615         obd->obd_last_committed = filter->fo_fsd->fsd_last_rcvd;
616         if (filter->fo_recoverable_clients) {
617                 CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
618                        filter->fo_recoverable_clients,
619                        filter->fo_fsd->fsd_last_rcvd);
620                 filter->fo_next_recovery_transno = obd->obd_last_committed + 1;
621                 obd->obd_flags |= OBD_RECOVERING;
622         }
623
624         if (fcd)
625                 OBD_FREE(fcd, sizeof(*fcd));
626
627         fsd->fsd_mount_count++;
628
629         /* save it,so mount count and last_recvd is current */
630         rc = filter_update_server_data(filp, filter->fo_fsd);
631
632 out:
633         RETURN(rc);
634
635 err_fsd:
636         filter_free_server_data(filter);
637         RETURN(rc);
638 }
639
640 /* setup the object store with correct subdirectories */
641 static int filter_prep(struct obd_device *obd)
642 {
643         struct obd_run_ctxt saved;
644         struct filter_obd *filter = &obd->u.filter;
645         struct dentry *dentry;
646         struct file *file;
647         struct inode *inode;
648         int rc = 0;
649         int mode = 0;
650
651         push_ctxt(&saved, &filter->fo_ctxt, NULL);
652         dentry = simple_mkdir(current->fs->pwd, "O", 0700);
653         CDEBUG(D_INODE, "got/created O: %p\n", dentry);
654         if (IS_ERR(dentry)) {
655                 rc = PTR_ERR(dentry);
656                 CERROR("cannot open/create O: rc = %d\n", rc);
657                 GOTO(out, rc);
658         }
659         filter->fo_dentry_O = dentry;
660
661         /*
662          * Create directories and/or get dentries for each object type.
663          * This saves us from having to do multiple lookups for each one.
664          */
665         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
666                 char *name = obd_type_by_mode[mode];
667
668                 if (!name) {
669                         filter->fo_dentry_O_mode[mode] = NULL;
670                         continue;
671                 }
672                 dentry = simple_mkdir(filter->fo_dentry_O, name, 0700);
673                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
674                 if (IS_ERR(dentry)) {
675                         rc = PTR_ERR(dentry);
676                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
677                         GOTO(out_O_mode, rc);
678                 }
679                 filter->fo_dentry_O_mode[mode] = dentry;
680         }
681
682         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
683         if ( !file || IS_ERR(file) ) {
684                 rc = PTR_ERR(file);
685                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
686                        LAST_RCVD, rc);
687                 GOTO(out_O_mode, rc);
688         }
689
690         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
691                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
692                        file->f_dentry->d_inode->i_mode);
693                 GOTO(err_filp, rc = -ENOENT);
694         }
695
696         rc = fsfilt_journal_data(obd, file);
697         if (rc) {
698                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
699                 GOTO(err_filp, rc);
700         }
701         /* steal operations */
702         inode = file->f_dentry->d_inode;
703         filter->fo_fop = file->f_op;
704         filter->fo_iop = inode->i_op;
705         filter->fo_aops = inode->i_mapping->a_ops;
706
707         rc = filter_init_server_data(obd, file, INIT_OBJID);
708         if (rc) {
709                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
710                 GOTO(err_client, rc);
711         }
712         filter->fo_rcvd_filp = file;
713
714         rc = 0;
715  out:
716         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
717
718         return(rc);
719
720 err_client:
721         class_disconnect_all(obd);
722 err_filp:
723         if (filp_close(file, 0))
724                 CERROR("can't close %s after error\n", LAST_RCVD);
725         filter->fo_rcvd_filp = NULL;
726  out_O_mode:
727         while (mode-- > 0) {
728                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
729                 if (dentry) {
730                         f_dput(dentry);
731                         filter->fo_dentry_O_mode[mode] = NULL;
732                 }
733         }
734         f_dput(filter->fo_dentry_O);
735         filter->fo_dentry_O = NULL;
736         goto out;
737 }
738
739 /* cleanup the filter: write last used object id to status file */
740 static void filter_post(struct obd_device *obd)
741 {
742         struct obd_run_ctxt saved;
743         struct filter_obd *filter = &obd->u.filter;
744         long rc;
745         int mode;
746
747         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
748          * best to start a transaction with h_sync, because we removed this
749          * from lastobjid */
750
751         push_ctxt(&saved, &filter->fo_ctxt, NULL);
752         rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
753         if (rc)
754                 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
755         filter_free_server_data(filter);
756
757
758         if (filter->fo_rcvd_filp) {
759                 /* broken sync at umount bug workaround  */
760 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
761                 rc = fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
762 #else
763                 rc = file_fsync(filter->fo_rcvd_filp,
764                                 filter->fo_rcvd_filp->f_dentry, 1);
765 #endif
766                 filp_close(filter->fo_rcvd_filp, 0);
767                 filter->fo_rcvd_filp = NULL;
768                 if (rc)
769                         CERROR("last_rcvd file won't closek rc = %ld\n", rc);
770         }
771
772         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
773                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
774                 if (dentry) {
775                         f_dput(dentry);
776                         filter->fo_dentry_O_mode[mode] = NULL;
777                 }
778         }
779         f_dput(filter->fo_dentry_O);
780         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
781 }
782
783
784 static __u64 filter_next_id(struct obd_device *obd)
785 {
786         obd_id id;
787         LASSERT(obd->u.filter.fo_fsd != NULL);
788
789         spin_lock(&obd->u.filter.fo_objidlock);
790         id = ++obd->u.filter.fo_fsd->fsd_last_objid;
791         spin_unlock(&obd->u.filter.fo_objidlock);
792
793         return id;
794 }
795
796 /* how to get files, dentries, inodes from object id's */
797 /* parent i_sem is already held if needed for exclusivity */
798 static struct dentry *filter_fid2dentry(struct obd_device *obd,
799                                         struct dentry *dparent,
800                                         __u64 id, int lockit)
801 {
802         struct super_block *sb = obd->u.filter.fo_sb;
803         struct dentry *dchild;
804         char name[32];
805         int len;
806         ENTRY;
807
808         if (!sb || !sb->s_dev) {
809                 CERROR("fatal: device not initialized.\n");
810                 RETURN(ERR_PTR(-ENXIO));
811         }
812
813         if (id == 0) {
814                 CERROR("fatal: invalid object id 0\n");
815                 LBUG();
816                 RETURN(ERR_PTR(-ESTALE));
817         }
818
819         len = sprintf(name, LPU64, id);
820         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
821                dparent->d_name.len, dparent->d_name.name, name);
822         if (lockit)
823                 down(&dparent->d_inode->i_sem);
824         dchild = lookup_one_len(name, dparent, len);
825         if (lockit)
826                 up(&dparent->d_inode->i_sem);
827         if (IS_ERR(dchild)) {
828                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
829                 RETURN(dchild);
830         }
831
832         CDEBUG(D_INODE, "got child obj O/%*s/%s: %p, count = %d\n",
833                dparent->d_name.len, dparent->d_name.name, name, dchild,
834                atomic_read(&dchild->d_count));
835
836         LASSERT(atomic_read(&dchild->d_count) > 0);
837
838         RETURN(dchild);
839 }
840
841 static inline struct dentry *filter_parent(struct obd_device *obd,
842                                            obd_mode mode)
843 {
844         struct filter_obd *filter = &obd->u.filter;
845
846         LASSERT((mode & S_IFMT) == S_IFREG);   /* only regular files for now */
847         return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
848 }
849
850 static struct file *filter_obj_open(struct obd_export *export,
851                                     __u64 id, __u32 type)
852 {
853         struct filter_obd *filter = &export->exp_obd->u.filter;
854         struct super_block *sb = filter->fo_sb;
855         struct dentry *dentry;
856         struct filter_export_data *fed = &export->exp_filter_data;
857         struct filter_dentry_data *fdd;
858         struct filter_file_data *ffd;
859         struct obd_run_ctxt saved;
860         char name[24];
861         struct file *file;
862         ENTRY;
863
864         if (!sb || !sb->s_dev) {
865                 CERROR("fatal: device not initialized.\n");
866                 RETURN(ERR_PTR(-ENXIO));
867         }
868
869         if (!id) {
870                 CERROR("fatal: invalid obdo "LPU64"\n", id);
871                 RETURN(ERR_PTR(-ESTALE));
872         }
873
874         if (!(type & S_IFMT)) {
875                 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
876                        __FUNCTION__, id, type);
877                 RETURN(ERR_PTR(-EINVAL));
878         }
879
880         PORTAL_SLAB_ALLOC(ffd, filter_open_cache, sizeof(*ffd));
881         if (!ffd) {
882                 CERROR("obdfilter: out of memory\n");
883                 RETURN(ERR_PTR(-ENOMEM));
884         }
885
886         /* We preallocate this to avoid blocking while holding fo_fddlock */
887         fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL);
888         if (!fdd) {
889                 CERROR("obdfilter: out of memory\n");
890                 GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
891         }
892
893         filter_id(name, id, type);
894         push_ctxt(&saved, &filter->fo_ctxt, NULL);
895         file = filp_open(name, O_RDWR | O_LARGEFILE, 0 /* type? */);
896         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
897
898         if (IS_ERR(file)) {
899                 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
900                 GOTO(out_fdd, file);
901         }
902
903         dentry = file->f_dentry;
904         spin_lock(&filter->fo_fddlock);
905         if (dentry->d_fsdata) {
906                 spin_unlock(&filter->fo_fddlock);
907                 kmem_cache_free(filter_dentry_cache, fdd);
908                 fdd = dentry->d_fsdata;
909                 LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
910                 /* should only happen during client recovery */
911                 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
912                         CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id);
913                 atomic_inc(&fdd->fdd_open_count);
914         } else {
915                 atomic_set(&fdd->fdd_open_count, 1);
916                 fdd->fdd_flags = 0;
917                 /* If this is racy, then we can use {cmp}xchg and atomic_add */
918                 dentry->d_fsdata = fdd;
919                 spin_unlock(&filter->fo_fddlock);
920         }
921
922         get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
923         ffd->ffd_file = file;
924         file->private_data = ffd;
925
926         if (!dentry->d_op)
927                 dentry->d_op = &filter_dops;
928         else
929                 LASSERT(dentry->d_op == &filter_dops);
930
931         spin_lock(&fed->fed_lock);
932         list_add(&ffd->ffd_export_list, &fed->fed_open_head);
933         spin_unlock(&fed->fed_lock);
934
935         CDEBUG(D_INODE, "opened objid "LPX64": rc = %p\n", id, file);
936         EXIT;
937 out:
938         return file;
939
940 out_fdd:
941         kmem_cache_free(filter_dentry_cache, fdd);
942 out_ffd:
943         ffd->ffd_servercookie = DEAD_HANDLE_MAGIC;
944         PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
945         goto out;
946 }
947
948 /* Caller must hold i_sem on dir_dentry->d_inode */
949 /* Caller must push us into kernel context */
950 static int filter_destroy_internal(struct obd_device *obd,
951                                    struct dentry *dir_dentry,
952                                    struct dentry *object_dentry)
953 {
954         struct inode *inode = object_dentry->d_inode;
955         int rc;
956         ENTRY;
957
958         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
959                 CERROR("destroying objid %*s nlink = %d, count = %d\n",
960                        object_dentry->d_name.len,
961                        object_dentry->d_name.name,
962                        inode->i_nlink, atomic_read(&inode->i_count));
963         }
964
965         rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
966
967         if (rc)
968                 CERROR("error unlinking objid %*s: rc %d\n",
969                        object_dentry->d_name.len,
970                        object_dentry->d_name.name, rc);
971
972         RETURN(rc);
973 }
974
975 static int filter_close_internal(struct obd_export *export,
976                                  struct filter_file_data *ffd,
977                                  struct obd_trans_info *oti)
978 {
979         struct obd_device *obd = export->exp_obd;
980         struct filter_obd *filter = &obd->u.filter;
981         struct file *filp = ffd->ffd_file;
982         struct dentry *object_dentry = dget(filp->f_dentry);
983         struct filter_dentry_data *fdd = object_dentry->d_fsdata;
984         int rc, rc2;
985         ENTRY;
986
987         LASSERT(filp->private_data == ffd);
988         LASSERT(fdd);
989
990         rc = filp_close(filp, 0);
991
992         if (atomic_dec_and_test(&fdd->fdd_open_count) &&
993             fdd->fdd_flags & FILTER_FLAG_DESTROY) {
994                 struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
995                 struct obd_run_ctxt saved;
996                 void *handle;
997
998                 down(&dir_dentry->d_inode->i_sem);
999                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1000                 filter_start_transno(export);
1001                 handle = fsfilt_start(obd, dir_dentry->d_inode,
1002                                       FSFILT_OP_UNLINK);
1003                 if (IS_ERR(handle)) {
1004                         rc = filter_finish_transno(export, handle, oti,
1005                                                    PTR_ERR(handle));
1006                         GOTO(out, rc);
1007                 }
1008                 /* XXX unlink from PENDING directory now too */
1009                 rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
1010                 if (rc2 && !rc)
1011                         rc = rc2;
1012                 rc = filter_finish_transno(export, handle, oti, rc);
1013                 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1014                 if (rc2) {
1015                         CERROR("error on commit, err = %d\n", rc2);
1016                         if (!rc)
1017                                 rc = rc2;
1018                 }
1019         out:
1020                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1021                 up(&dir_dentry->d_inode->i_sem);
1022         }
1023
1024         f_dput(object_dentry);
1025         PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
1026
1027         RETURN(rc);
1028 }
1029
1030 /* obd methods */
1031 /* mount the file system (secretly) */
1032 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1033 {
1034         struct obd_ioctl_data* data = buf;
1035         struct filter_obd *filter;
1036         struct vfsmount *mnt;
1037         int rc = 0;
1038         ENTRY;
1039
1040         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1041                 RETURN(rc = -EINVAL);
1042
1043         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1044         if (IS_ERR(obd->obd_fsops))
1045                 RETURN(rc = PTR_ERR(obd->obd_fsops));
1046
1047         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
1048         rc = PTR_ERR(mnt);
1049         if (IS_ERR(mnt))
1050                 GOTO(err_ops, rc);
1051
1052         obd->obd_flags |= OBD_REPLAYABLE;
1053
1054         filter = &obd->u.filter;;
1055         init_MUTEX(&filter->fo_transno_sem);
1056         filter->fo_vfsmnt = mnt;
1057         filter->fo_fstype = strdup(data->ioc_inlbuf2);
1058         filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
1059         CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1060
1061         OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1062         filter->fo_ctxt.pwdmnt = mnt;
1063         filter->fo_ctxt.pwd = mnt->mnt_root;
1064         filter->fo_ctxt.fs = get_ds();
1065
1066         rc = filter_prep(obd);
1067         if (rc)
1068                 GOTO(err_kfree, rc);
1069
1070         spin_lock_init(&filter->fo_fddlock);
1071         spin_lock_init(&filter->fo_objidlock);
1072         INIT_LIST_HEAD(&filter->fo_export_list);
1073
1074         obd->obd_namespace =
1075                 ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
1076         if (!obd->obd_namespace)
1077                 GOTO(err_post, rc = -ENOMEM);
1078
1079         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1080                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1081
1082         RETURN(0);
1083
1084 err_post:
1085         filter_post(obd);
1086 err_kfree:
1087         kfree(filter->fo_fstype);
1088         unlock_kernel();
1089         mntput(filter->fo_vfsmnt);
1090         filter->fo_sb = 0;
1091         lock_kernel();
1092 err_ops:
1093         fsfilt_put_ops(obd->obd_fsops);
1094         return rc;
1095 }
1096
1097
1098 static int filter_cleanup(struct obd_device *obd)
1099 {
1100         struct super_block *sb;
1101         ENTRY;
1102
1103         if (!list_empty(&obd->obd_exports)) {
1104                 CERROR("still has clients!\n");
1105                 class_disconnect_all(obd);
1106                 if (!list_empty(&obd->obd_exports)) {
1107                         CERROR("still has exports after forced cleanup?\n");
1108                         RETURN(-EBUSY);
1109                 }
1110         }
1111
1112         ldlm_namespace_free(obd->obd_namespace);
1113
1114         sb = obd->u.filter.fo_sb;
1115         if (!obd->u.filter.fo_sb)
1116                 RETURN(0);
1117
1118         filter_post(obd);
1119
1120         shrink_dcache_parent(sb->s_root);
1121         unlock_kernel();
1122         mntput(obd->u.filter.fo_vfsmnt);
1123         obd->u.filter.fo_sb = 0;
1124         kfree(obd->u.filter.fo_fstype);
1125         fsfilt_put_ops(obd->obd_fsops);
1126
1127         lock_kernel();
1128
1129         RETURN(0);
1130 }
1131
1132 int filter_attach(struct obd_device *dev, obd_count len, void *data)
1133 {
1134         struct lprocfs_static_vars lvars;
1135
1136         lprocfs_init_vars(&lvars);
1137         return lprocfs_obd_attach(dev, lvars.obd_vars);
1138 }
1139
/* Detach hook: undo filter_attach() by removing the lprocfs entries. */
int filter_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1144
1145 /* nearly identical to mds_connect */
1146 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1147                           struct obd_uuid *cluuid, struct recovd_obd *recovd,
1148                           ptlrpc_recovery_cb_t recover)
1149 {
1150         struct obd_export *exp;
1151         struct filter_export_data *fed;
1152         struct filter_client_data *fcd;
1153         struct filter_obd *filter = &obd->u.filter;
1154         int rc;
1155
1156         ENTRY;
1157
1158         if (!conn || !obd || !cluuid)
1159                 RETURN(-EINVAL);
1160
1161         rc = class_connect(conn, obd, cluuid);
1162         if (rc)
1163                 RETURN(rc);
1164         exp = class_conn2export(conn);
1165         LASSERT(exp);
1166         fed = &exp->exp_filter_data;
1167
1168         OBD_ALLOC(fcd, sizeof(*fcd));
1169         if (!fcd) {
1170                 CERROR("filter: out of memory for client data\n");
1171                 GOTO(out_export, rc = -ENOMEM);
1172         }
1173
1174         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1175         fed->fed_fcd = fcd;
1176         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1177
1178         INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
1179         spin_lock_init(&exp->exp_filter_data.fed_lock);
1180
1181         rc = filter_client_add(filter, fed, -1);
1182         if (rc)
1183                 GOTO(out_fcd, rc);
1184
1185         RETURN(rc);
1186
1187 out_fcd:
1188         OBD_FREE(fcd, sizeof(*fcd));
1189 out_export:
1190         class_disconnect(conn);
1191
1192         RETURN(rc);
1193 }
1194
1195 /* also incredibly similar to mds_disconnect */
1196 static int filter_disconnect(struct lustre_handle *conn)
1197 {
1198         struct obd_export *exp = class_conn2export(conn);
1199         struct filter_export_data *fed;
1200         int rc;
1201         ENTRY;
1202
1203         LASSERT(exp);
1204         fed = &exp->exp_filter_data;
1205         spin_lock(&fed->fed_lock);
1206         while (!list_empty(&fed->fed_open_head)) {
1207                 struct filter_file_data *ffd;
1208
1209                 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1210                                  ffd_export_list);
1211                 list_del(&ffd->ffd_export_list);
1212                 spin_unlock(&fed->fed_lock);
1213
1214                 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1215                        ffd->ffd_file->f_dentry->d_name.len,
1216                        ffd->ffd_file->f_dentry->d_name.name,
1217                        ffd, ffd->ffd_servercookie);
1218
1219                 filter_close_internal(exp, ffd, NULL);
1220                 spin_lock(&fed->fed_lock);
1221         }
1222         spin_unlock(&fed->fed_lock);
1223
1224         ldlm_cancel_locks_for_export(exp);
1225         filter_client_free(exp);
1226
1227         rc = class_disconnect(conn);
1228
1229         /* XXX cleanup preallocated inodes */
1230         RETURN(rc);
1231 }
1232
1233 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1234 {
1235         int type = oa->o_mode & S_IFMT;
1236         ENTRY;
1237
1238         CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPX64" valid 0x%08x\n",
1239                inode->i_ino, inode, oa->o_id, valid);
1240         /* Don't copy the inode number in place of the object ID */
1241         obdo_from_inode(oa, inode, valid);
1242         oa->o_mode &= ~S_IFMT;
1243         oa->o_mode |= type;
1244
1245         if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1246                 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1247                 oa->o_rdev = rdev;
1248                 oa->o_valid |= OBD_MD_FLRDEV;
1249         }
1250
1251         EXIT;
1252 }
1253
1254 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
1255 {
1256         struct filter_file_data *ffd = NULL;
1257         ENTRY;
1258
1259         if (!handle || !handle->addr)
1260                 RETURN(NULL);
1261
1262         ffd = (struct filter_file_data *)(unsigned long)(handle->addr);
1263         if (!kmem_cache_validate(filter_open_cache, (void *)ffd))
1264                 RETURN(NULL);
1265
1266         if (ffd->ffd_servercookie != handle->cookie)
1267                 RETURN(NULL);
1268
1269         LASSERT(ffd->ffd_file->private_data == ffd);
1270         RETURN(ffd);
1271 }
1272
1273 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1274                                          struct obdo *oa, int locked,char *what)
1275 {
1276         struct dentry *dentry = NULL;
1277
1278         if (oa->o_valid & OBD_MD_FLHANDLE) {
1279                 struct lustre_handle *ost_handle = obdo_handle(oa);
1280                 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1281
1282                 if (ffd)
1283                         dentry = dget(ffd->ffd_file->f_dentry);
1284         }
1285
1286         if (!dentry) {
1287                 struct obd_device *obd = class_conn2obd(conn);
1288                 if (!obd) {
1289                         CERROR("invalid client "LPX64"\n", conn->addr);
1290                         RETURN(ERR_PTR(-EINVAL));
1291                 }
1292                 dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
1293                                            oa->o_id, locked);
1294         }
1295
1296         if (IS_ERR(dentry)) {
1297                 CERROR("%s error looking up object: "LPX64"\n", what, oa->o_id);
1298                 RETURN(dentry);
1299         }
1300
1301         if (!dentry->d_inode) {
1302                 CERROR("%s on non-existent object: "LPX64"\n", what, oa->o_id);
1303                 f_dput(dentry);
1304                 LBUG();
1305                 RETURN(ERR_PTR(-ENOENT));
1306         }
1307
1308         return dentry;
1309 }
1310
/* Wrapper that passes the calling function's name as the "what" tag used
 * in __filter_oa2dentry() error messages. */
#define filter_oa2dentry(conn, oa, locked)                              \
        __filter_oa2dentry(conn, oa, locked, __FUNCTION__)
1313
1314 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1315                           struct lov_stripe_md *md)
1316 {
1317         struct dentry *dentry = NULL;
1318         int rc = 0;
1319         ENTRY;
1320
1321         XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
1322
1323         dentry = filter_oa2dentry(conn, oa, 1);
1324         if (IS_ERR(dentry))
1325                 RETURN(PTR_ERR(dentry));
1326
1327         filter_from_inode(oa, dentry->d_inode, oa->o_valid);
1328
1329         f_dput(dentry);
1330         RETURN(rc);
1331 }
1332
1333 /* this is called from filter_truncate() until we have filter_punch() */
1334 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1335                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1336 {
1337         struct obd_run_ctxt saved;
1338         struct obd_export *export = class_conn2export(conn);
1339         struct obd_device *obd = class_conn2obd(conn);
1340         struct filter_obd *filter = &obd->u.filter;
1341         struct dentry *dentry;
1342         struct iattr iattr;
1343         struct inode *inode;
1344         void * handle;
1345         int rc, rc2;
1346         ENTRY;
1347
1348         XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
1349
1350         dentry = filter_oa2dentry(conn, oa, 0);
1351
1352         if (IS_ERR(dentry))
1353                 RETURN(PTR_ERR(dentry));
1354
1355         iattr_from_obdo(&iattr, oa, oa->o_valid);
1356         iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1357         inode = dentry->d_inode;
1358
1359         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1360         lock_kernel();
1361         if (iattr.ia_valid & ATTR_SIZE)
1362                 down(&inode->i_sem);
1363
1364         filter_start_transno(export);
1365         handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1366         if (IS_ERR(handle)) {
1367                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1368                 GOTO(out_unlock, rc);
1369         }
1370
1371         if (inode->i_op->setattr)
1372                 rc = inode->i_op->setattr(dentry, &iattr);
1373         else
1374                 rc = inode_setattr(inode, &iattr);
1375         rc = filter_finish_transno(export, handle, oti, rc);
1376         rc2 = fsfilt_commit(obd, dentry->d_inode, handle);
1377         if (rc2) {
1378                 CERROR("error on commit, err = %d\n", rc2);
1379                 if (!rc)
1380                         rc = rc2;
1381         }
1382
1383         if (iattr.ia_valid & ATTR_SIZE) {
1384                 up(&inode->i_sem);
1385                 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1386                 obdo_from_inode(oa, inode, oa->o_valid);
1387         }
1388
1389 out_unlock:
1390         unlock_kernel();
1391         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1392
1393         f_dput(dentry);
1394         RETURN(rc);
1395 }
1396
1397 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1398                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
1399 {
1400         struct obd_export *export;
1401         struct lustre_handle *handle;
1402         struct filter_file_data *ffd;
1403         struct file *filp;
1404         int rc = 0;
1405         ENTRY;
1406
1407         export = class_conn2export(conn);
1408         if (!export) {
1409                 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1410                 RETURN(-EINVAL);
1411         }
1412
1413         XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
1414
1415         filp = filter_obj_open(export, oa->o_id, oa->o_mode);
1416         if (IS_ERR(filp))
1417                 GOTO(out, rc = PTR_ERR(filp));
1418
1419         filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1420
1421         ffd = filp->private_data;
1422         handle = obdo_handle(oa);
1423         handle->addr = (__u64)(unsigned long)ffd;
1424         handle->cookie = ffd->ffd_servercookie;
1425         oa->o_valid |= OBD_MD_FLHANDLE;
1426         EXIT;
1427 out:
1428         return rc;
1429 } /* filter_open */
1430
1431 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1432                         struct lov_stripe_md *ea, struct obd_trans_info *oti)
1433 {
1434         struct obd_export *exp;
1435         struct filter_file_data *ffd;
1436         struct filter_export_data *fed;
1437         int rc;
1438         ENTRY;
1439
1440         exp = class_conn2export(conn);
1441         if (!exp) {
1442                 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1443                 RETURN(-EINVAL);
1444         }
1445
1446         XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
1447
1448         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1449                 CERROR("no handle for close of objid "LPX64"\n", oa->o_id);
1450                 RETURN(-EINVAL);
1451         }
1452
1453         ffd = filter_handle2ffd(obdo_handle(oa));
1454         if (!ffd) {
1455                 struct lustre_handle *handle = obdo_handle(oa);
1456                 CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n",
1457                        handle->addr, handle->cookie);
1458                 RETURN(-ESTALE);
1459         }
1460
1461         fed = &exp->exp_filter_data;
1462         spin_lock(&fed->fed_lock);
1463         list_del(&ffd->ffd_export_list);
1464         spin_unlock(&fed->fed_lock);
1465
1466         rc = filter_close_internal(exp, ffd, oti);
1467
1468         RETURN(rc);
1469 } /* filter_close */
1470
1471 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1472                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
1473 {
1474         struct obd_export *export = class_conn2export(conn);
1475         struct obd_device *obd = class_conn2obd(conn);
1476         struct filter_obd *filter = &obd->u.filter;
1477         struct obd_run_ctxt saved;
1478         struct dentry *dir_dentry;
1479         struct dentry *new;
1480         struct iattr;
1481         void *handle;
1482         int err, rc;
1483         ENTRY;
1484
1485         if (!obd) {
1486                 CERROR("invalid client "LPX64"\n", conn->addr);
1487                 return -EINVAL;
1488         }
1489
1490         XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
1491
1492         oa->o_id = filter_next_id(obd);
1493
1494         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1495         dir_dentry = filter_parent(obd, oa->o_mode);
1496         down(&dir_dentry->d_inode->i_sem);
1497         new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
1498         if (IS_ERR(new))
1499                 GOTO(out, rc = PTR_ERR(new));
1500
1501         if (new->d_inode) {
1502                 /* This would only happen if lastobjid was bad on disk */
1503                 CERROR("objid O/%*s/"LPU64" already exists\n",
1504                        dir_dentry->d_name.len, dir_dentry->d_name.name,
1505                        oa->o_id);
1506                 LBUG();
1507                 GOTO(out, rc = -EEXIST);
1508         }
1509
1510         filter_start_transno(export);
1511         handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
1512         if (IS_ERR(handle)) {
1513                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1514                 GOTO(out_put, rc);
1515         }
1516         rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
1517         if (rc)
1518                 CERROR("create failed rc = %d\n", rc);
1519
1520         rc = filter_finish_transno(export, handle, oti, rc);
1521         err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1522         if (err) {
1523                 CERROR("unable to write lastobjid but file created\n");
1524                 if (!rc)
1525                         rc = err;
1526         }
1527         err = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1528         if (err) {
1529                 CERROR("error on commit, err = %d\n", err);
1530                 if (!rc)
1531                         rc = err;
1532         }
1533
1534         if (rc)
1535                 GOTO(out_put, rc);
1536
1537         /* Set flags for fields we have set in the inode struct */
1538         oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1539                  OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1540         filter_from_inode(oa, new->d_inode, oa->o_valid);
1541
1542         EXIT;
1543 out_put:
1544         f_dput(new);
1545 out:
1546         up(&dir_dentry->d_inode->i_sem);
1547         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1548         return rc;
1549 }
1550
1551 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1552                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
1553 {
1554         struct obd_export *export = class_conn2export(conn);
1555         struct obd_device *obd = class_conn2obd(conn);
1556         struct filter_obd *filter = &obd->u.filter;
1557         struct dentry *dir_dentry, *object_dentry;
1558         struct filter_dentry_data *fdd;
1559         struct obd_run_ctxt saved;
1560         void *handle;
1561         int rc, rc2;
1562         ENTRY;
1563
1564         if (!obd) {
1565                 CERROR("invalid client "LPX64"\n", conn->addr);
1566                 RETURN(-EINVAL);
1567         }
1568
1569         XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
1570
1571         CDEBUG(D_INODE, "destroying objid "LPX64"\n", oa->o_id);
1572
1573         dir_dentry = filter_parent(obd, oa->o_mode);
1574         down(&dir_dentry->d_inode->i_sem);
1575
1576         object_dentry = filter_oa2dentry(conn, oa, 0);
1577         if (IS_ERR(object_dentry))
1578                 GOTO(out, rc = -ENOENT);
1579
1580         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1581         filter_start_transno(export);
1582         handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
1583         if (IS_ERR(handle)) {
1584                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1585                 GOTO(out_ctxt, rc);
1586         }
1587
1588         fdd = object_dentry->d_fsdata;
1589         if (fdd && atomic_read(&fdd->fdd_open_count)) {
1590                 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1591                         fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1592                         /* XXX put into PENDING directory in case of crash */
1593                         CDEBUG(D_INODE,
1594                                "defer destroy of %dx open objid "LPX64"\n",
1595                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1596                 } else
1597                         CDEBUG(D_INODE,
1598                                "repeat destroy of %dx open objid "LPX64"\n",
1599                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1600                 GOTO(out_commit, rc = 0);
1601         }
1602
1603         rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
1604
1605 out_commit:
1606         /* XXX save last_rcvd on disk */
1607         rc = filter_finish_transno(export, handle, oti, rc);
1608         rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1609         if (rc2) {
1610                 CERROR("error on commit, err = %d\n", rc2);
1611                 if (!rc)
1612                         rc = rc2;
1613         }
1614 out_ctxt:
1615         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1616         f_dput(object_dentry);
1617
1618         EXIT;
1619 out:
1620         up(&dir_dentry->d_inode->i_sem);
1621         return rc;
1622 }
1623
1624 /* NB start and end are used for punch, but not truncate */
1625 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1626                            struct lov_stripe_md *lsm,
1627                            obd_off start, obd_off end,
1628                            struct obd_trans_info *oti)
1629 {
1630         int error;
1631         ENTRY;
1632
1633         XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
1634
1635         if (end != OBD_OBJECT_EOF)
1636                 CERROR("PUNCH not supported, only truncate works\n");
1637
1638         CDEBUG(D_INODE, "calling truncate for object "LPX64", valid = %x, "
1639                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1640         oa->o_size = start;
1641         error = filter_setattr(conn, oa, NULL, oti);
1642         RETURN(error);
1643 }
1644
/*
 * Release a page obtained from one of the lustre_get_page_*() helpers:
 * undo the kmap() and drop the page cache reference.
 */
static inline void lustre_put_page(struct page *page)
{
        kunmap(page);

        page_cache_release(page);
}
1650
1651
1652 static struct page *
1653 lustre_get_page_read(struct inode *inode, struct niobuf_remote *rnb)
1654 {
1655         unsigned long index = rnb->offset >> PAGE_SHIFT;
1656         struct address_space *mapping = inode->i_mapping;
1657         struct page *page;
1658         int rc;
1659
1660         page = read_cache_page(mapping, index,
1661                                (filler_t*)mapping->a_ops->readpage, NULL);
1662         if (!IS_ERR(page)) {
1663                 wait_on_page(page);
1664                 kmap(page);
1665                 if (!PageUptodate(page)) {
1666                         CERROR("page index %lu not uptodate\n", index);
1667                         GOTO(err_page, rc = -EIO);
1668                 }
1669                 if (PageError(page)) {
1670                         CERROR("page index %lu has error\n", index);
1671                         GOTO(err_page, rc = -EIO);
1672                 }
1673         }
1674         return page;
1675
1676 err_page:
1677         lustre_put_page(page);
1678         return ERR_PTR(rc);
1679 }
1680
1681 static struct page *
1682 lustre_get_page_write(struct inode *inode, unsigned long index)
1683 {
1684         struct address_space *mapping = inode->i_mapping;
1685         struct page *page;
1686         int rc;
1687
1688         page = grab_cache_page(mapping, index); /* locked page */
1689
1690         if (!IS_ERR(page)) {
1691                 kmap(page);
1692                 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
1693                  * a no-op for most filesystems, because we write the whole
1694                  * page.  For partial-page I/O this will read in the page.
1695                  */
1696                 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
1697                 if (rc) {
1698                         CERROR("page index %lu, rc = %d\n", index, rc);
1699                         if (rc != -ENOSPC)
1700                                 LBUG();
1701                         GOTO(err_unlock, rc);
1702                 }
1703                 /* XXX not sure if we need this if we are overwriting page */
1704                 if (PageError(page)) {
1705                         CERROR("error on page index %lu, rc = %d\n", index, rc);
1706                         LBUG();
1707                         GOTO(err_unlock, rc = -EIO);
1708                 }
1709         }
1710         return page;
1711
1712 err_unlock:
1713         unlock_page(page);
1714         lustre_put_page(page);
1715         return ERR_PTR(rc);
1716 }
1717
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
/*
 * Compiled only for kernels newer than 2.5.0: wait until "page" is no
 * longer locked.  Always returns 0.  Used by lustre_commit_write() for
 * synchronous inodes.
 */
int waitfor_one_page(struct page *page)
{
        wait_on_page_locked(page);
        return 0;
}
#endif
1725
1726 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1727 /* We should only change the file mtime (and not the ctime, like
1728  * update_inode_times() in generic_file_write()) when we only change data.
1729  */
1730 static inline void inode_update_time(struct inode *inode, int ctime_too)
1731 {
1732         time_t now = CURRENT_TIME;
1733         if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
1734                 return;
1735         inode->i_mtime = now;
1736         if (ctime_too)
1737                 inode->i_ctime = now;
1738         mark_inode_dirty_sync(inode);
1739 }
1740 #endif
1741
1742 static int lustre_commit_write(struct niobuf_local *lnb)
1743 {
1744         struct page *page = lnb->page;
1745         unsigned from = lnb->offset & ~PAGE_MASK;
1746         unsigned to = from + lnb->len;
1747         struct inode *inode = page->mapping->host;
1748         int err;
1749
1750         LASSERT(to <= PAGE_SIZE);
1751         err = page->mapping->a_ops->commit_write(NULL, page, from, to);
1752         if (!err && IS_SYNC(inode))
1753                 err = waitfor_one_page(page);
1754         //SetPageUptodate(page); // the client commit_write will do this
1755
1756         SetPageReferenced(page);
1757         unlock_page(page);
1758         lustre_put_page(page);
1759         return err;
1760 }
1761
1762 struct page *filter_get_page_write(struct inode *inode,
1763                                    struct niobuf_remote *rnb,
1764                                    struct niobuf_local *lnb, int *pglocked)
1765 {
1766         unsigned long index = rnb->offset >> PAGE_SHIFT;
1767         struct address_space *mapping = inode->i_mapping;
1768
1769         struct page *page;
1770         int rc;
1771
1772         //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
1773         if (*pglocked)
1774                 page = grab_cache_page_nowait(mapping, index); /* locked page */
1775         else
1776                 page = grab_cache_page(mapping, index); /* locked page */
1777
1778
1779         /* This page is currently locked, so get a temporary page instead. */
1780         /* XXX I believe this is a very dangerous thing to do - consider if
1781          *     we had multiple writers for the same file (definitely the case
1782          *     if we are using this codepath).  If writer A locks the page,
1783          *     writer B writes to a copy (as here), writer A drops the page
1784          *     lock, and writer C grabs the lock before B does, then B will
1785          *     later overwrite the data from C, even if C had LDLM locked
1786          *     and initiated the write after B did.
1787          */
1788         if (!page) {
1789                 unsigned long addr;
1790                 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
1791                 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
1792                 if (!addr) {
1793                         CERROR("no memory for a temp page\n");
1794                         LBUG();
1795                         GOTO(err, rc = -ENOMEM);
1796                 }
1797                 /* XXX debugging */
1798                 memset((void *)addr, 0xBA, PAGE_SIZE);
1799                 page = virt_to_page(addr);
1800                 kmap(page);
1801                 page->index = index;
1802                 lnb->flags |= N_LOCAL_TEMP_PAGE;
1803         } else if (!IS_ERR(page)) {
1804                 (*pglocked)++;
1805                 kmap(page);
1806
1807                 rc = mapping->a_ops->prepare_write(NULL, page,
1808                                                    rnb->offset % PAGE_SIZE,
1809                                                    rnb->len);
1810                 if (rc) {
1811                         CERROR("page index %lu, rc = %d\n", index, rc);
1812                         if (rc != -ENOSPC)
1813                                 LBUG();
1814                         GOTO(err_unlock, rc);
1815                 }
1816                 /* XXX not sure if we need this if we are overwriting page */
1817                 if (PageError(page)) {
1818                         CERROR("error on page index %lu, rc = %d\n", index, rc);
1819                         LBUG();
1820                         GOTO(err_unlock, rc = -EIO);
1821                 }
1822         }
1823         return page;
1824
1825 err_unlock:
1826         unlock_page(page);
1827         lustre_put_page(page);
1828 err:
1829         return ERR_PTR(rc);
1830 }
1831
1832 /*
1833  * We need to balance prepare_write() calls with commit_write() calls.
1834  * If the page has been prepared, but we have no data for it, we don't
1835  * want to overwrite valid data on disk, but we still need to zero out
1836  * data for space which was newly allocated.  Like part of what happens
1837  * in __block_prepare_write() for newly allocated blocks.
1838  *
1839  * XXX currently __block_prepare_write() creates buffers for all the
1840  *     pages, and the filesystems mark these buffers as BH_New if they
1841  *     were newly allocated from disk. We use the BH_New flag similarly.
1842  */
1843 static int filter_commit_write(struct niobuf_local *lnb, int err)
1844 {
1845 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1846         if (err) {
1847                 unsigned block_start, block_end;
1848                 struct buffer_head *bh, *head = lnb->page->buffers;
1849                 unsigned blocksize = head->b_size;
1850
1851                 /* debugging: just seeing if this ever happens */
1852                 CERROR("called filter_commit_write for ino %lu:%lu on err %d\n",
1853                        lnb->page->mapping->host->i_ino, lnb->page->index, err);
1854
1855                 /* Currently one buffer per page, but in the future... */
1856                 for (bh = head, block_start = 0; bh != head || !block_start;
1857                      block_start = block_end, bh = bh->b_this_page) {
1858                         block_end = block_start + blocksize;
1859                         if (buffer_new(bh))
1860                                 memset(lnb->addr + block_start, 0, blocksize);
1861                 }
1862         }
1863 #endif
1864         return lustre_commit_write(lnb);
1865 }
1866
1867 static int filter_preprw(int cmd, struct lustre_handle *conn,
1868                          int objcount, struct obd_ioobj *obj,
1869                          int niocount, struct niobuf_remote *nb,
1870                          struct niobuf_local *res, void **desc_private,
1871                          struct obd_trans_info *oti)
1872 {
1873         struct obd_run_ctxt saved;
1874         struct obd_export *export;
1875         struct obd_device *obd;
1876         struct obd_ioobj *o;
1877         struct niobuf_remote *rnb = nb;
1878         struct niobuf_local *lnb = res;
1879         struct dentry *dir_dentry;
1880         struct fsfilt_objinfo *fso;
1881         int pglocked = 0;
1882         int rc = 0;
1883         int i;
1884         ENTRY;
1885
1886         if ((cmd & OBD_BRW_WRITE) != 0)
1887                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
1888         else
1889                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
1890
1891         memset(res, 0, niocount * sizeof(*res));
1892
1893         export = class_conn2export(conn);
1894         obd = class_conn2obd(conn);
1895         if (!obd) {
1896                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
1897                 RETURN(-EINVAL);
1898         }
1899
1900         LASSERT(objcount < 16); // theoretically we support multi-obj BRW
1901
1902         OBD_ALLOC(fso, objcount * sizeof(*fso));
1903         if (!fso)
1904                 RETURN(-ENOMEM);
1905
1906         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
1907         dir_dentry = filter_parent(obd, S_IFREG);
1908
1909         for (i = 0, o = obj; i < objcount; i++, o++) {
1910                 struct filter_dentry_data *fdd;
1911                 struct dentry *dentry;
1912
1913                 LASSERT(o->ioo_bufcnt);
1914
1915                 dentry = filter_fid2dentry(obd, dir_dentry, o->ioo_id, 0);
1916
1917                 if (IS_ERR(dentry))
1918                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
1919
1920                 fso[i].fso_dentry = dentry;
1921                 fso[i].fso_bufcnt = o->ioo_bufcnt;
1922
1923                 if (!dentry->d_inode) {
1924                         CERROR("trying to BRW to non-existent file "LPU64"\n",
1925                                o->ioo_id);
1926                         GOTO(out_objinfo, rc = -ENOENT);
1927                 }
1928
1929                 fdd = dentry->d_fsdata;
1930                 if (!fdd || !atomic_read(&fdd->fdd_open_count))
1931                         CDEBUG(D_PAGE, "I/O to unopened object "LPX64"\n",
1932                                o->ioo_id);
1933         }
1934
1935         if (cmd & OBD_BRW_WRITE) {
1936 #warning "FIXME: we need to get inode->i_sem for each object here"
1937                 /* Even worse, we need to get locks on mulitple inodes (in
1938                  * order) or use the DLM to do the locking for us (and use
1939                  * the same locking in filter_setattr() for truncate.  The
1940                  * handling gets very ugly when dealing with locked pages.
1941                  * It may be easier to just get rid of the locked page code
1942                  * (which has problems of its own) and either discover we do
1943                  * not need it anymore (i.e. it was a symptom of another bug)
1944                  * or ensure we get the page locks in an appropriate order.
1945                  */
1946                 /* Danger, Will Robinson! You are taking a lock here and also
1947                  * starting a transaction and releasing/finishing then in
1948                  * filter_commitrw(), so you must call fsfilt_commit() and
1949                  * finish_transno() if an error occurs in this function.
1950                  */
1951                 filter_start_transno(export);
1952                 *desc_private = fsfilt_brw_start(obd, objcount, fso,
1953                                                  niocount, nb);
1954                 if (IS_ERR(*desc_private))
1955                         GOTO(out_objinfo, rc = PTR_ERR(*desc_private));
1956         }
1957
1958         obd_kmap_get(niocount, 1);
1959
1960         for (i = 0, o = obj; i < objcount; i++, o++) {
1961                 struct dentry *dentry;
1962                 struct inode *inode;
1963                 int j;
1964
1965                 dentry = fso[i].fso_dentry;
1966                 inode = dentry->d_inode;
1967
1968                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
1969                         struct page *page;
1970
1971                         if (j == 0)
1972                                 lnb->dentry = dentry;
1973                         else
1974                                 lnb->dentry = dget(dentry);
1975
1976                         if (cmd & OBD_BRW_WRITE) {
1977                                 page = filter_get_page_write(inode, rnb, lnb,
1978                                                              &pglocked);
1979
1980                                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_bytes,
1981                                                            rnb->len);
1982                         } else {
1983                                 page = lustre_get_page_read(inode, rnb);
1984
1985                                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_bytes,
1986                                                            rnb->len);
1987                         }
1988
1989                         if (IS_ERR(page)) {
1990                                 rc = PTR_ERR(page);
1991                                 f_dput(dentry);
1992                                 GOTO(out_pages, rc);
1993                         }
1994
1995                         lnb->addr = page_address(page);
1996                         lnb->offset = rnb->offset;
1997                         lnb->page = page;
1998                         lnb->len = rnb->len;
1999                 }
2000         }
2001
2002         EXIT;
2003 out:
2004         OBD_FREE(fso, objcount * sizeof(*fso));
2005         current->journal_info = NULL;
2006         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2007         return rc;
2008
2009 out_pages:
2010         while (lnb-- > res) {
2011                 CERROR("%d error cleanup on brw\n", rc);
2012                 if (cmd & OBD_BRW_WRITE)
2013                         filter_commit_write(lnb, rc);
2014                 else
2015                         lustre_put_page(lnb->page);
2016                 f_dput(lnb->dentry);
2017         }
2018         obd_kmap_put(niocount);
2019         goto out_err; /* dropped the dentry refs already (one per page) */
2020
2021 out_objinfo:
2022         for (i = 0; i < objcount && fso[i].fso_dentry; i++)
2023                 f_dput(fso[i].fso_dentry);
2024 out_err:
2025         if (cmd & OBD_BRW_WRITE) {
2026                 filter_finish_transno(export, *desc_private, oti, rc);
2027                 fsfilt_commit(obd, dir_dentry->d_inode, *desc_private);
2028         }
2029         goto out;
2030 }
2031
2032 static int filter_write_locked_page(struct niobuf_local *lnb)
2033 {
2034         struct page *lpage;
2035         int rc;
2036         ENTRY;
2037
2038         lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2039         if (IS_ERR(lpage)) {
2040                 /* It is highly unlikely that we would ever get an error here.
2041                  * The page we want to get was previously locked, so it had to
2042                  * have already allocated the space, and we were just writing
2043                  * over the same data, so there would be no hole in the file.
2044                  *
2045                  * XXX: possibility of a race with truncate could exist, need
2046                  *      to check that.  There are no guarantees w.r.t.
2047                  *      write order even on a local filesystem, although the
2048                  *      normal response would be to return the number of bytes
2049                  *      successfully written and leave the rest to the app.
2050                  */
2051                 rc = PTR_ERR(lpage);
2052                 CERROR("error getting locked page index %ld: rc = %d\n",
2053                        lnb->page->index, rc);
2054                 LBUG();
2055                 lustre_commit_write(lnb);
2056                 RETURN(rc);
2057         }
2058
2059         /* lpage is kmapped in lustre_get_page_write() above and kunmapped in
2060          * lustre_commit_write() below, lnb->page was kmapped previously in
2061          * filter_get_page_write() and kunmapped in lustre_put_page() below.
2062          */
2063         memcpy(page_address(lpage), page_address(lnb->page), PAGE_SIZE);
2064         lustre_put_page(lnb->page);
2065
2066         lnb->page = lpage;
2067         rc = lustre_commit_write(lnb);
2068         if (rc)
2069                 CERROR("error committing locked page %ld: rc = %d\n",
2070                        lnb->page->index, rc);
2071
2072         RETURN(rc);
2073 }
2074
2075 static int filter_sync(struct obd_device *obd)
2076 {
2077         RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
2078 }
2079
2080 static int filter_commitrw(int cmd, struct lustre_handle *conn,
2081                            int objcount, struct obd_ioobj *obj,
2082                            int niocount, struct niobuf_local *res,
2083                            void *desc_private, struct obd_trans_info *oti)
2084 {
2085         struct obd_run_ctxt saved;
2086         struct obd_ioobj *o;
2087         struct niobuf_local *lnb;
2088         struct obd_export *export = class_conn2export(conn);
2089         struct obd_device *obd = class_conn2obd(conn);
2090         int found_locked = 0;
2091         int rc = 0;
2092         int i;
2093         ENTRY;
2094
2095         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2096
2097         LASSERT(!current->journal_info);
2098         current->journal_info = desc_private;
2099
2100         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2101                 int j;
2102
2103                 if (cmd & OBD_BRW_WRITE)
2104                         inode_update_time(lnb->dentry->d_inode, 1);
2105                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2106                         if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2107                                 found_locked++;
2108                                 continue;
2109                         }
2110
2111                         if (cmd & OBD_BRW_WRITE) {
2112                                 int err = filter_commit_write(lnb, 0);
2113
2114                                 if (!rc)
2115                                         rc = err;
2116                         } else
2117                                 lustre_put_page(lnb->page);
2118
2119                         obd_kmap_put(1);
2120                         f_dput(lnb->dentry);
2121                 }
2122         }
2123
2124         for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2125                         i++, o++) {
2126                 int j;
2127                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2128                         int err;
2129                         if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2130                                 continue;
2131
2132                         err = filter_write_locked_page(lnb);
2133                         obd_kmap_put(1);
2134                         if (!rc)
2135                                 rc = err;
2136                         f_dput(lnb->dentry);
2137                         found_locked--;
2138                 }
2139         }
2140
2141         if (cmd & OBD_BRW_WRITE) {
2142                 int err;
2143                 struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
2144
2145                 rc = filter_finish_transno(export, desc_private, oti, rc);
2146                 err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
2147                 if (err)
2148                         rc = err;
2149                 if (obd_sync_filter) {
2150                         /* this can fail with ENOMEM, what should we do then? */
2151                         filter_sync(obd);
2152                 }
2153                 /* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
2154         }
2155
2156         LASSERT(!current->journal_info);
2157
2158         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2159         RETURN(rc);
2160 }
2161
2162 static int filter_brw(int cmd, struct lustre_handle *conn,
2163                       struct lov_stripe_md *lsm, obd_count oa_bufs,
2164                       struct brw_page *pga, struct obd_brw_set *set,
2165                       struct obd_trans_info *oti)
2166 {
2167         struct obd_ioobj        ioo;
2168         struct niobuf_local     *lnb;
2169         struct niobuf_remote    *rnb;
2170         obd_count               i;
2171         void                    *desc_private;
2172         int                     ret = 0;
2173         ENTRY;
2174
2175         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2176         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2177
2178         if (lnb == NULL || rnb == NULL)
2179                 GOTO(out, ret = -ENOMEM);
2180
2181         for (i = 0; i < oa_bufs; i++) {
2182                 rnb[i].offset = pga[i].off;
2183                 rnb[i].len = pga[i].count;
2184         }
2185
2186         ioo.ioo_id = lsm->lsm_object_id;
2187         ioo.ioo_gr = 0;
2188         ioo.ioo_type = S_IFREG;
2189         ioo.ioo_bufcnt = oa_bufs;
2190
2191         ret = filter_preprw(cmd, conn, 1, &ioo, oa_bufs, rnb, lnb,
2192                             &desc_private, oti);
2193         if (ret != 0)
2194                 GOTO(out, ret);
2195
2196         for (i = 0; i < oa_bufs; i++) {
2197                 void *virt = kmap(pga[i].pg);
2198                 obd_off off = pga[i].off & ~PAGE_MASK;
2199
2200                 if (cmd & OBD_BRW_WRITE)
2201                         memcpy(lnb[i].addr + off, virt + off, pga[i].count);
2202                 else
2203                         memcpy(virt + off, lnb[i].addr + off, pga[i].count);
2204
2205                 kunmap(virt);
2206         }
2207
2208         ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private,
2209                               oti);
2210
2211 out:
2212         if (lnb)
2213                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2214         if (rnb)
2215                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2216         RETURN(ret);
2217 }
2218
2219 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2220 {
2221         struct obd_device *obd;
2222         ENTRY;
2223
2224         obd = class_conn2obd(conn);
2225
2226         XPROCFS_BUMP_MYCPU_IOSTAT (st_statfs_reqs, 1);
2227
2228         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2229 }
2230
2231 static int filter_get_info(struct lustre_handle *conn, obd_count keylen,
2232                            void *key, obd_count *vallen, void **val)
2233 {
2234         struct obd_device *obd;
2235         ENTRY;
2236
2237         obd = class_conn2obd(conn);
2238         if (!obd) {
2239                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2240                 RETURN(-EINVAL);
2241         }
2242
2243         if ( keylen == strlen("blocksize") &&
2244              memcmp(key, "blocksize", keylen) == 0 ) {
2245                 *vallen = sizeof(long);
2246                 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize;
2247                 RETURN(0);
2248         }
2249
2250         if ( keylen == strlen("blocksize_bits") &&
2251              memcmp(key, "blocksize_bits", keylen) == 0 ){
2252                 *vallen = sizeof(long);
2253                 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize_bits;
2254                 RETURN(0);
2255         }
2256
2257         CDEBUG(D_IOCTL, "invalid key\n");
2258         RETURN(-EINVAL);
2259 }
2260
2261 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2262                   struct lustre_handle *src_conn, struct obdo *src,
2263                   obd_size count, obd_off offset, struct obd_trans_info *oti)
2264 {
2265         struct page *page;
2266         struct lov_stripe_md srcmd, dstmd;
2267         unsigned long index = 0;
2268         int err = 0;
2269
2270         memset(&srcmd, 0, sizeof(srcmd));
2271         memset(&dstmd, 0, sizeof(dstmd));
2272         srcmd.lsm_object_id = src->o_id;
2273         dstmd.lsm_object_id = dst->o_id;
2274
2275         ENTRY;
2276         CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2277                ", dst: ino "LPU64"\n",
2278                src->o_id, src->o_blocks, src->o_size, dst->o_id);
2279         page = alloc_page(GFP_USER);
2280         if (page == NULL)
2281                 RETURN(-ENOMEM);
2282
2283 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2284         while (TryLockPage(page))
2285                 ___wait_on_page(page);
2286 #else
2287         wait_on_page_locked(page);
2288 #endif
2289
2290         /* XXX with brw vector I/O, we could batch up reads and writes here,
2291          *     all we need to do is allocate multiple pages to handle the I/Os
2292          *     and arrays to handle the request parameters.
2293          */
2294         while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2295                 struct brw_page pg;
2296                 struct obd_brw_set *set;
2297
2298                 set = obd_brw_set_new();
2299                 if (set == NULL) {
2300                         err = -ENOMEM;
2301                         EXIT;
2302                         break;
2303                 }
2304
2305                 pg.pg = page;
2306                 pg.count = PAGE_SIZE;
2307                 pg.off = (page->index) << PAGE_SHIFT;
2308                 pg.flag = 0;
2309
2310                 page->index = index;
2311                 set->brw_callback = ll_brw_sync_wait;
2312                 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
2313                 obd_brw_set_free(set);
2314                 if (err) {
2315                         EXIT;
2316                         break;
2317                 }
2318
2319                 set = obd_brw_set_new();
2320                 if (set == NULL) {
2321                         err = -ENOMEM;
2322                         EXIT;
2323                         break;
2324                 }
2325                 pg.flag = OBD_BRW_CREATE;
2326                 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2327
2328                 set->brw_callback = ll_brw_sync_wait;
2329                 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
2330                 obd_brw_set_free(set);
2331
2332                 /* XXX should handle dst->o_size, dst->o_blocks here */
2333                 if (err) {
2334                         EXIT;
2335                         break;
2336                 }
2337
2338                 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2339
2340                 index++;
2341         }
2342         dst->o_size = src->o_size;
2343         dst->o_blocks = src->o_blocks;
2344         dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
2345         unlock_page(page);
2346         __free_page(page);
2347
2348         RETURN(err);
2349 }
2350
2351 static struct obd_ops filter_obd_ops = {
2352         o_owner:        THIS_MODULE,
2353         o_attach:       filter_attach,
2354         o_detach:       filter_detach,
2355         o_get_info:     filter_get_info,
2356         o_setup:        filter_setup,
2357         o_cleanup:      filter_cleanup,
2358         o_connect:      filter_connect,
2359         o_disconnect:   filter_disconnect,
2360         o_statfs:       filter_statfs,
2361         o_getattr:      filter_getattr,
2362         o_create:       filter_create,
2363         o_setattr:      filter_setattr,
2364         o_destroy:      filter_destroy,
2365         o_open:         filter_open,
2366         o_close:        filter_close,
2367         o_brw:          filter_brw,
2368         o_punch:        filter_truncate,
2369         o_preprw:       filter_preprw,
2370         o_commitrw:     filter_commitrw
2371 #if 0
2372         o_preallocate:  filter_preallocate_inodes,
2373         o_migrate:      filter_migrate,
2374         o_copy:         filter_copy_data,
2375         o_iterate:      filter_iterate
2376 #endif
2377 };
2378
2379
2380 static int __init obdfilter_init(void)
2381 {
2382         struct lprocfs_static_vars lvars;
2383
2384         printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2385         filter_open_cache = kmem_cache_create("ll_filter_fdata",
2386                                               sizeof(struct filter_file_data),
2387                                               0, 0, NULL, NULL);
2388         if (!filter_open_cache)
2389                 RETURN(-ENOMEM);
2390
2391         filter_dentry_cache = kmem_cache_create("ll_filter_dentry",
2392                                         sizeof(struct filter_dentry_data),
2393                                         0, 0, NULL, NULL);
2394         if (!filter_dentry_cache) {
2395                 kmem_cache_destroy(filter_open_cache);
2396                 RETURN(-ENOMEM);
2397         }
2398
2399         xprocfs_init ("filter");
2400
2401         lprocfs_init_vars(&lvars);
2402         return class_register_type(&filter_obd_ops, lvars.module_vars,
2403                                    OBD_FILTER_DEVICENAME);
2404 }
2405
/*
 * Module cleanup: unregister the filter device type, destroy both slab
 * caches (logging an error if either cannot be destroyed) and shut down
 * the xprocfs statistics.
 */
static void __exit obdfilter_exit(void)
{
        class_unregister_type(OBD_FILTER_DEVICENAME);
        if (kmem_cache_destroy(filter_dentry_cache))
                CERROR("couldn't free obdfilter dentry cache\n");
        if (kmem_cache_destroy(filter_open_cache))
                CERROR("couldn't free obdfilter open cache\n");
        xprocfs_fini ();
}
2415
/* Module metadata and the entry points run on module load and unload. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Filtering OBD driver");
MODULE_LICENSE("GPL");

module_init(obdfilter_init);
module_exit(obdfilter_exit);