Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
/*
 * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
 *            (which need to get journal_lock, and may block if the journal
 *            is full).
 *
 * Invariant: Call filter_start_transno() before any journal ops to avoid the
 *            same deadlock problem.  We can (and want to) get rid of the
 *            transno sem in favour of the dir/inode i_sem to avoid
 *            single-threaded operation on the OST.
 */
35
36 #define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_FILTER
38
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
42 #include <linux/fs.h>
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
54 #endif
55
56 static kmem_cache_t *filter_open_cache;
57 static kmem_cache_t *filter_dentry_cache;
58
/* should be generic per-obd stats... */
/*
 * One set of I/O and request counters.  An instance is kept per CPU in
 * xprocfs_iostats[] and updated through XPROCFS_BUMP_MYCPU_IOSTAT(); on
 * pre-2.5 kernels the per-CPU values are summed by the
 * xprocfs_sum_<field>() helpers and exported through procfs.
 */
struct xprocfs_io_stat {
        __u64    st_read_bytes;
        __u64    st_read_reqs;
        __u64    st_write_bytes;
        __u64    st_write_reqs;
        __u64    st_getattr_reqs;
        __u64    st_setattr_reqs;
        __u64    st_create_reqs;
        __u64    st_destroy_reqs;
        __u64    st_statfs_reqs;
        __u64    st_syncfs_reqs;
        __u64    st_open_reqs;
        __u64    st_close_reqs;
        __u64    st_punch_reqs;
};
75
/* One counter set per possible CPU, indexed by smp_processor_id(). */
static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
/* procfs directory that holds the stat entries; NULL while not set up. */
static struct proc_dir_entry *xprocfs_dir;

/*
 * Add 'count' to counter 'field' in the slot belonging to the CPU this
 * code is currently running on.
 */
#define XPROCFS_BUMP_MYCPU_IOSTAT(field, count)                 \
do {                                                            \
        xprocfs_iostats[smp_processor_id()].field += (count);   \
} while (0)
83
84 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
85 #define DECLARE_XPROCFS_SUM_STAT(field)                 \
86 static long long                                        \
87 xprocfs_sum_##field (void)                              \
88 {                                                       \
89         long long stat = 0;                             \
90         int       i;                                    \
91                                                         \
92         for (i = 0; i < smp_num_cpus; i++)              \
93                 stat += xprocfs_iostats[i].field;       \
94         return (stat);                                  \
95 }
96
97 DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
98 DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
99 DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
100 DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
101 DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
102 DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
103 DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
104 DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
105 DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
106 DECLARE_XPROCFS_SUM_STAT (st_syncfs_reqs)
107 DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
108 DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
109 DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
110 #endif
111
/*
 * procfs read handler for a single statistic.
 *
 * 'data' carries a pointer to a "long long fn(void)" that produces the
 * value.  The whole value is emitted on the first read (off == 0); any
 * read at a non-zero offset returns 0 bytes.  *eof is always set.
 *
 * Returns the snprintf() result for the first read, 0 otherwise.
 */
static int
xprocfs_rd_stat (char *page, char **start, off_t off, int count,
                 int  *eof, void *data)
{
        long long (*sum_fn)(void) = (long long(*)(void))data;
        int         nbytes = 0;

        *eof = 1;
        if (off == 0) {
                nbytes = snprintf (page, count, "%Ld\n", sum_fn());
                *start = page;
        }

        return (nbytes);
}
127
128
129 static void
130 xprocfs_add_stat(char *name, long long (*fn)(void))
131 {
132         struct proc_dir_entry *entry;
133
134         entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
135         if (entry == NULL) {
136                 CERROR ("Can't add procfs stat %s\n", name);
137                 return;
138         }
139
140         entry->data = fn;
141         entry->read_proc = xprocfs_rd_stat;
142         entry->write_proc = NULL;
143 }
144
145 static void
146 xprocfs_init (char *name)
147 {
148         char  dirname[64];
149
150         snprintf (dirname, sizeof (dirname), "sys/%s", name);
151
152         xprocfs_dir = proc_mkdir ("sys/obdfilter", NULL);
153         if (xprocfs_dir == NULL) {
154                 CERROR ("Can't make dir\n");
155                 return;
156         }
157
158 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
159         xprocfs_add_stat ("read_bytes",   xprocfs_sum_st_read_bytes);
160         xprocfs_add_stat ("read_reqs",    xprocfs_sum_st_read_reqs);
161         xprocfs_add_stat ("write_bytes",  xprocfs_sum_st_write_bytes);
162         xprocfs_add_stat ("write_reqs",   xprocfs_sum_st_write_reqs);
163         xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
164         xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
165         xprocfs_add_stat ("create_reqs",  xprocfs_sum_st_create_reqs);
166         xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
167         xprocfs_add_stat ("statfs_reqs",  xprocfs_sum_st_statfs_reqs);
168         xprocfs_add_stat ("syncfs_reqs",  xprocfs_sum_st_syncfs_reqs);
169         xprocfs_add_stat ("open_reqs",    xprocfs_sum_st_open_reqs);
170         xprocfs_add_stat ("close_reqs",   xprocfs_sum_st_close_reqs);
171         xprocfs_add_stat ("punch_reqs",   xprocfs_sum_st_punch_reqs);
172 #endif
173 }
174
175 void xprocfs_fini (void)
176 {
177         if (xprocfs_dir == NULL)
178                 return;
179
180         remove_proc_entry ("read_bytes",   xprocfs_dir);
181         remove_proc_entry ("read_reqs",    xprocfs_dir);
182         remove_proc_entry ("write_bytes",  xprocfs_dir);
183         remove_proc_entry ("write_reqs",   xprocfs_dir);
184         remove_proc_entry ("getattr_reqs", xprocfs_dir);
185         remove_proc_entry ("setattr_reqs", xprocfs_dir);
186         remove_proc_entry ("create_reqs",  xprocfs_dir);
187         remove_proc_entry ("destroy_reqs", xprocfs_dir);
188         remove_proc_entry ("statfs_reqs",  xprocfs_dir);
189         remove_proc_entry ("syncfs_reqs",  xprocfs_dir);
190         remove_proc_entry ("open_reqs",    xprocfs_dir);
191         remove_proc_entry ("close_reqs",   xprocfs_dir);
192         remove_proc_entry ("punch_reqs",   xprocfs_dir);
193
194         remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
195         xprocfs_dir = NULL;
196 }
197
198 #define S_SHIFT 12
199 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
200         [0]                     NULL,
201         [S_IFREG >> S_SHIFT]    "R",
202         [S_IFDIR >> S_SHIFT]    "D",
203         [S_IFCHR >> S_SHIFT]    "C",
204         [S_IFBLK >> S_SHIFT]    "B",
205         [S_IFIFO >> S_SHIFT]    "F",
206         [S_IFSOCK >> S_SHIFT]   "S",
207         [S_IFLNK >> S_SHIFT]    "L"
208 };
209
210 static inline const char *obd_mode_to_type(int mode)
211 {
212         return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
213 }
214
215 static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd,
216                                 int error)
217 {
218         CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
219                last_rcvd, error);
220         if (!error && last_rcvd > obd->obd_last_committed)
221                 obd->obd_last_committed = last_rcvd;
222 }
223
/*
 * Begin a transaction-number critical section.  When built with
 * FILTER_TRANSNO_SEM this takes the filter's fo_transno_sem (released
 * again by filter_finish_transno()); otherwise it is a no-op.
 */
void filter_start_transno(struct obd_export *export)
{
#ifdef FILTER_TRANSNO_SEM
        struct filter_obd *filter = &export->exp_obd->u.filter;
        ENTRY;

        down(&filter->fo_transno_sem);
#endif
}
233
234 /* Assumes caller has already pushed us into the kernel context. */
235 int filter_finish_transno(struct obd_export *export, void *handle,
236                           struct obd_trans_info *oti, int rc)
237 {
238         __u64 last_rcvd;
239         struct obd_device *obd = export->exp_obd;
240         struct filter_obd *filter = &obd->u.filter;
241         struct filter_export_data *fed = &export->exp_filter_data;
242         struct filter_client_data *fcd = fed->fed_fcd;
243         loff_t off;
244         ssize_t written;
245
246         /* Propagate error code. */
247         if (rc) {
248 #ifdef FILTER_TRANSNO_SEM
249                 up(&filter->fo_transno_sem);
250 #endif
251                 RETURN(rc);
252         }
253
254         if (!(obd->obd_flags & OBD_REPLAYABLE)) {
255                 RETURN(0);
256         }
257
258         /* we don't allocate new transnos for replayed requests */
259 #if 0
260         /* perhaps if transno already set? or should level be in oti? */
261         if (req->rq_level == LUSTRE_CONN_RECOVD)
262                 GOTO(out, rc = 0);
263 #endif
264
265         off = fed->fed_lr_off;
266
267 #ifndef FILTER_TRANSNO_SEM
268         spin_lock(&filter->fo_translock);
269 #endif
270         last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
271         filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
272 #ifndef FILTER_TRANSNO_SEM
273         spin_unlock(&filter->fo_translock);
274 #endif
275         if (oti)
276                 oti->oti_transno = last_rcvd;
277         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
278         fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
279
280         /* get this from oti */
281 #if 0
282         if (oti)
283                 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
284         else
285 #else
286         fcd->fcd_last_xid = 0;
287 #endif
288         fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_last_rcvd_cb);
289         written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
290                                 &off);
291         CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
292                LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
293
294 #ifdef FILTER_TRANSNO_SEM
295         up(&filter->fo_transno_sem);
296 #endif
297         if (written == sizeof(*fcd))
298                 RETURN(0);
299         CERROR("error writing to last_rcvd file: rc = %d\n", written);
300         if (written >= 0)
301                 RETURN(-EIO);
302
303         RETURN(written);
304 }
305
306 /* write the pathname into the string */
307 static char *filter_id(char *buf, struct filter_obd *filter, obd_id id,
308                      obd_mode mode)
309 {
310         if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
311                 sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
312         else
313                 sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode),
314                        (int)id & (filter->fo_subdir_count - 1), id);
315
316         return buf;
317 }
318
/*
 * dput() wrapper that first logs the dentry name, its address and the
 * reference count it will have after the put, and asserts that the
 * dentry still holds at least one reference.
 */
static inline void f_dput(struct dentry *dentry)
{
        /* Can't go inside filter_ddelete because it can block */
        CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
               dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
        LASSERT(atomic_read(&dentry->d_count) > 0);

        dput(dentry);
}
328
329 /* Not racy w.r.t. others, because we are the only user of this dentry */
330 static void filter_drelease(struct dentry *dentry)
331 {
332         if (dentry->d_fsdata)
333                 kmem_cache_free(filter_dentry_cache, dentry->d_fsdata);
334 }
335
336 struct dentry_operations filter_dops = {
337         .d_release = filter_drelease,
338 };
339
/* Name of the file holding the server and per-client recovery records. */
#define LAST_RCVD "last_rcvd"
/* Initial last-objid value passed to filter_init_server_data(). */
#define INIT_OBJID 2

/* This limit is arbitrary, but for now we fit it in 1 page (32k clients
 * with 4k pages): one bit per client slot. */
#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
/* Number of unsigned longs needed to hold FILTER_LR_MAX_CLIENTS bits.
 * The divisor is the number of BITS per word; dividing by the byte size
 * alone made the bitmap eight times larger than the one page intended. */
#define FILTER_LR_MAX_CLIENT_WORDS \
        (FILTER_LR_MAX_CLIENTS / (8 * sizeof(unsigned long)))
346
347 /* Add client data to the FILTER.  We use a bitmap to locate a free space
348  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
349  * Otherwise, we have just read the data from the last_rcvd file and
350  * we know its offset.
351  */
352 int filter_client_add(struct filter_obd *filter,
353                       struct filter_export_data *fed, int cl_idx)
354 {
355         int new_client = (cl_idx == -1);
356
357         LASSERT(filter->fo_last_rcvd_slots != NULL);
358
359         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
360          * there's no need for extra complication here
361          */
362         if (new_client) {
363                 cl_idx = find_first_zero_bit(filter->fo_last_rcvd_slots,
364                                              FILTER_LR_MAX_CLIENTS);
365         repeat:
366                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
367                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
368                         return -ENOMEM;
369                 }
370                 if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
371                         CERROR("FILTER client %d: found bit is set in bitmap\n",
372                                cl_idx);
373                         cl_idx = find_next_zero_bit(filter->fo_last_rcvd_slots,
374                                                     FILTER_LR_MAX_CLIENTS,
375                                                     cl_idx);
376                         goto repeat;
377                 }
378         } else {
379                 if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
380                         CERROR("FILTER client %d: bit already set in bitmap!\n",
381                                cl_idx);
382                         LBUG();
383                 }
384         }
385
386         fed->fed_lr_idx = cl_idx;
387         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
388                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
389
390         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
391                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
392
393         if (new_client) {
394                 struct obd_run_ctxt saved;
395                 loff_t off = fed->fed_lr_off;
396                 ssize_t written;
397
398                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
399                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
400
401                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
402                 written = lustre_fwrite(filter->fo_rcvd_filp,
403                                                 (char *)fed->fed_fcd,
404                                                 sizeof(*fed->fed_fcd), &off);
405                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
406
407                 if (written != sizeof(*fed->fed_fcd)) {
408                         if (written < 0)
409                                 RETURN(written);
410                         RETURN(-EIO);
411                 }
412         }
413         return 0;
414 }
415
416 int filter_client_free(struct obd_export *exp)
417 {
418         struct filter_export_data *fed = &exp->exp_filter_data;
419         struct filter_obd *filter = &exp->exp_obd->u.filter;
420         struct filter_client_data zero_fcd;
421         struct obd_run_ctxt saved;
422         int written;
423         loff_t off;
424
425         if (!fed->fed_fcd)
426                 RETURN(0);
427
428         LASSERT(filter->fo_last_rcvd_slots != NULL);
429
430         off = fed->fed_lr_off;
431
432         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
433                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
434
435         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
436                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
437                        fed->fed_lr_idx);
438                 LBUG();
439         }
440
441         memset(&zero_fcd, 0, sizeof zero_fcd);
442         push_ctxt(&saved, &filter->fo_ctxt, NULL);
443         written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
444                                 sizeof(zero_fcd), &off);
445
446         /* XXX: this write gets lost sometimes, unless this sync is here. */
447         file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
448         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
449
450         if (written != sizeof(zero_fcd)) {
451                 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
452                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
453                        LAST_RCVD, written);
454         } else {
455                 CDEBUG(D_INFO,
456                        "zeroed disconnecting client %s at idx %u (%llu)\n",
457                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
458         }
459
460         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
461
462         return 0;
463 }
464
465 static int filter_free_server_data(struct filter_obd *filter)
466 {
467         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
468         filter->fo_fsd = NULL;
469         OBD_FREE(filter->fo_last_rcvd_slots,
470                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
471         filter->fo_last_rcvd_slots = NULL;
472         return 0;
473 }
474
475
476 /* assumes caller is already in kernel ctxt */
477 static int filter_update_server_data(struct file *filp,
478                                      struct filter_server_data *fsd)
479 {
480         loff_t off = 0;
481         int rc;
482
483         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
484         CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
485                le64_to_cpu(fsd->fsd_last_objid));
486         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
487                le64_to_cpu(fsd->fsd_last_rcvd));
488         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
489                le64_to_cpu(fsd->fsd_mount_count));
490
491         rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
492         if (rc != sizeof(*fsd)) {
493                 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
494                        rc);
495                 RETURN(-EIO);
496         }
497         RETURN(0);
498 }
499
500 /* assumes caller has already in kernel ctxt */
501 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
502                                    __u64 init_lastobjid)
503 {
504         struct filter_obd *filter = &obd->u.filter;
505         struct filter_server_data *fsd;
506         struct filter_client_data *fcd = NULL;
507         struct inode *inode = filp->f_dentry->d_inode;
508         unsigned long last_rcvd_size = inode->i_size;
509         __u64 mount_count;
510         int cl_idx;
511         loff_t off = 0;
512         int rc;
513
514         /* ensure padding in the struct is the correct size */
515         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
516                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
517         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
518                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
519
520         OBD_ALLOC(fsd, sizeof(*fsd));
521         if (!fsd)
522                 RETURN(-ENOMEM);
523         filter->fo_fsd = fsd;
524
525         OBD_ALLOC(filter->fo_last_rcvd_slots, 
526                   FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
527         if (filter->fo_last_rcvd_slots == NULL) {
528                 OBD_FREE(fsd, sizeof(*fsd));
529                 RETURN(-ENOMEM);
530         }
531
532         if (last_rcvd_size == 0) {
533                 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
534
535                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
536                 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
537                 fsd->fsd_last_rcvd = 0;
538                 mount_count = fsd->fsd_mount_count = 0;
539                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
540                 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
541                 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
542                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
543                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
544         } else {
545                 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
546                                               &off);
547                 if (retval != sizeof(*fsd)) {
548                         CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
549                         GOTO(out, rc = -EIO);
550                 }
551                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
552                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
553         }
554
555         if (fsd->fsd_feature_incompat) {
556                 CERROR("unsupported feature %x\n",
557                        le32_to_cpu(fsd->fsd_feature_incompat));
558                 RETURN(-EINVAL);
559         }
560         if (fsd->fsd_feature_rocompat) {
561                 CERROR("read-only feature %x\n",
562                        le32_to_cpu(fsd->fsd_feature_rocompat));
563                 /* Do something like remount filesystem read-only */
564                 RETURN(-EINVAL);
565         }
566
567         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
568                obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
569         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
570                obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
571         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
572                obd->obd_name, mount_count);
573         CDEBUG(D_INODE, "%s: server data size: %u\n",
574                obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
575         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
576                obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
577         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
578                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
579         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
580                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
581
582         /*
583          * When we do a clean FILTER shutdown, we save the last_rcvd into
584          * the header.  If we find clients with higher last_rcvd values
585          * then those clients may need recovery done.
586          */
587         if (obd->obd_flags & OBD_REPLAYABLE) {
588                 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
589                         __u64 last_rcvd;
590                         int mount_age;
591
592                         if (!fcd) {
593                                 OBD_ALLOC(fcd, sizeof(*fcd));
594                                 if (!fcd)
595                                         GOTO(err_fsd, rc = -ENOMEM);
596                         }
597
598                         /* Don't assume off is incremented properly, in case
599                          * sizeof(fsd) isn't the same as fsd->fsd_client_size.
600                          */
601                         off = le32_to_cpu(fsd->fsd_client_start) +
602                                 cl_idx * le16_to_cpu(fsd->fsd_client_size);
603                         rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
604                         if (rc != sizeof(*fcd)) {
605                                 CERROR("error reading FILTER %s offset %d: rc = %d\n",
606                                        LAST_RCVD, cl_idx, rc);
607                                 if (rc > 0) /* XXX fatal error or just abort reading? */
608                                         rc = -EIO;
609                                 break;
610                         }
611
612                         if (fcd->fcd_uuid[0] == '\0') {
613                                 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
614                                        cl_idx);
615                                 continue;
616                         }
617
618                         last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
619
620                         /* These exports are cleaned up by filter_disconnect(), so they
621                          * need to be set up like real exports as filter_connect() does.
622                          */
623                         mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
624                         if (mount_age < FILTER_MOUNT_RECOV) {
625                                 struct obd_export *exp = class_new_export(obd);
626                                 struct filter_export_data *fed;
627                                 CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
628                                        " srv lr: "LPU64" mnt: "LPU64" last mount: "
629                                        LPU64"\n", fcd->fcd_uuid, cl_idx,
630                                        last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
631                                        le64_to_cpu(fcd->fcd_mount_count), mount_count);
632                                 /* disabled until OST recovery is actually working */
633
634                                 if (!exp) {
635                                         rc = -ENOMEM;
636                                         break;
637                                 }
638                                 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
639                                        sizeof exp->exp_client_uuid.uuid);
640                                 fed = &exp->exp_filter_data;
641                                 fed->fed_fcd = fcd;
642                                 filter_client_add(filter, fed, cl_idx);
643                                 /* create helper if export init gets more complex */
644                                 INIT_LIST_HEAD(&fed->fed_open_head);
645                                 spin_lock_init(&fed->fed_lock);
646
647                                 fcd = NULL;
648                                 obd->obd_recoverable_clients++;
649                         } else {
650                                 CDEBUG(D_INFO,
651                                        "discarded client %d UUID '%s' count "LPU64"\n",
652                                        cl_idx, fcd->fcd_uuid,
653                                        le64_to_cpu(fcd->fcd_mount_count));
654                         }
655
656                         CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
657                                cl_idx, last_rcvd);
658
659                         if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
660                                 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
661                 }
662
663                 obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
664                 if (obd->obd_recoverable_clients) {
665                         CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
666                                obd->obd_recoverable_clients,
667                                le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
668                         obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
669                         obd->obd_flags |= OBD_RECOVERING;
670                 }
671
672                 if (fcd)
673                         OBD_FREE(fcd, sizeof(*fcd));
674
675         } else {
676                 CERROR("%s: recovery support OFF\n", obd->obd_name);
677         }
678
679         fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
680
681         /* save it,so mount count and last_recvd is current */
682         rc = filter_update_server_data(filp, filter->fo_fsd);
683
684 out:
685         RETURN(rc);
686
687 err_fsd:
688         filter_free_server_data(filter);
689         RETURN(rc);
690 }
691
692 /* setup the object store with correct subdirectories */
693 static int filter_prep(struct obd_device *obd)
694 {
695         struct obd_run_ctxt saved;
696         struct filter_obd *filter = &obd->u.filter;
697         struct dentry *dentry, *O_dentry;
698         struct file *file;
699         struct inode *inode;
700         int i;
701         int rc = 0;
702         int mode = 0;
703
704         push_ctxt(&saved, &filter->fo_ctxt, NULL);
705         dentry = simple_mkdir(current->fs->pwd, "O", 0700);
706         CDEBUG(D_INODE, "got/created O: %p\n", dentry);
707         if (IS_ERR(dentry)) {
708                 rc = PTR_ERR(dentry);
709                 CERROR("cannot open/create O: rc = %d\n", rc);
710                 GOTO(out, rc);
711         }
712         filter->fo_dentry_O = dentry;
713
714         /*
715          * Create directories and/or get dentries for each object type.
716          * This saves us from having to do multiple lookups for each one.
717          */
718         O_dentry = filter->fo_dentry_O;
719         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
720                 char *name = obd_type_by_mode[mode];
721
722                 if (!name) {
723                         filter->fo_dentry_O_mode[mode] = NULL;
724                         continue;
725                 }
726                 dentry = simple_mkdir(O_dentry, name, 0700);
727                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
728                 if (IS_ERR(dentry)) {
729                         rc = PTR_ERR(dentry);
730                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
731                         GOTO(err_O_mode, rc);
732                 }
733                 filter->fo_dentry_O_mode[mode] = dentry;
734         }
735
736         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
737         if (!file || IS_ERR(file)) {
738                 rc = PTR_ERR(file);
739                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
740                        LAST_RCVD, rc);
741                 GOTO(err_O_mode, rc);
742         }
743
744         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
745                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
746                        file->f_dentry->d_inode->i_mode);
747                 GOTO(err_filp, rc = -ENOENT);
748         }
749
750         rc = fsfilt_journal_data(obd, file);
751         if (rc) {
752                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
753                 GOTO(err_filp, rc);
754         }
755         /* steal operations */
756         inode = file->f_dentry->d_inode;
757         filter->fo_fop = file->f_op;
758         filter->fo_iop = inode->i_op;
759         filter->fo_aops = inode->i_mapping->a_ops;
760
761         rc = filter_init_server_data(obd, file, INIT_OBJID);
762         if (rc) {
763                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
764                 GOTO(err_client, rc);
765         }
766         filter->fo_rcvd_filp = file;
767
768         if (filter->fo_subdir_count) {
769                 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
770                 OBD_ALLOC(filter->fo_dentry_O_sub,
771                           FILTER_SUBDIR_COUNT * sizeof(dentry));
772                 if (!filter->fo_dentry_O_sub)
773                         GOTO(err_client, rc = -ENOMEM);
774
775                 for (i = 0; i < filter->fo_subdir_count; i++) {
776                         char dir[20];
777                         snprintf(dir, sizeof(dir), "d%u", i);
778
779                         dentry = simple_mkdir(O_dentry, dir, 0700);
780                         CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
781                         if (IS_ERR(dentry)) {
782                                 rc = PTR_ERR(dentry);
783                                 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
784                                 GOTO(err_O_sub, rc);
785                         }
786                         filter->fo_dentry_O_sub[i] = dentry;
787                 }
788         }
789         rc = 0;
790  out:
791         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
792
793         return(rc);
794
795 err_O_sub:
796         while (i-- > 0) {
797                 struct dentry *dentry = filter->fo_dentry_O_sub[i];
798                 if (dentry) {
799                         f_dput(dentry);
800                         filter->fo_dentry_O_sub[i] = NULL;
801                 }
802         }
803         OBD_FREE(filter->fo_dentry_O_sub,
804                  filter->fo_subdir_count * sizeof(dentry));
805 err_client:
806         class_disconnect_all(obd);
807 err_filp:
808         if (filp_close(file, 0))
809                 CERROR("can't close %s after error\n", LAST_RCVD);
810         filter->fo_rcvd_filp = NULL;
811 err_O_mode:
812         while (mode-- > 0) {
813                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
814                 if (dentry) {
815                         f_dput(dentry);
816                         filter->fo_dentry_O_mode[mode] = NULL;
817                 }
818         }
819         f_dput(filter->fo_dentry_O);
820         filter->fo_dentry_O = NULL;
821         goto out;
822 }
823
824 /* cleanup the filter: write last used object id to status file */
825 static void filter_post(struct obd_device *obd)
826 {
827         struct obd_run_ctxt saved;
828         struct filter_obd *filter = &obd->u.filter;
829         long rc;
830         int mode;
831
832         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
833          * best to start a transaction with h_sync, because we removed this
834          * from lastobjid */
835
836         push_ctxt(&saved, &filter->fo_ctxt, NULL);
837         rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
838         if (rc)
839                 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
840
841
842         if (filter->fo_rcvd_filp) {
843                 rc = file_fsync(filter->fo_rcvd_filp,
844                                 filter->fo_rcvd_filp->f_dentry, 1);
845                 filp_close(filter->fo_rcvd_filp, 0);
846                 filter->fo_rcvd_filp = NULL;
847                 if (rc)
848                         CERROR("last_rcvd file won't closed rc = %ld\n", rc);
849         }
850
851         if (filter->fo_subdir_count) {
852                 int i;
853                 for (i = 0; i < filter->fo_subdir_count; i++) {
854                         struct dentry *dentry = filter->fo_dentry_O_sub[i];
855                         f_dput(dentry);
856                         filter->fo_dentry_O_sub[i] = NULL;
857                 }
858                 OBD_FREE(filter->fo_dentry_O_sub,
859                          filter->fo_subdir_count *
860                          sizeof(*filter->fo_dentry_O_sub));
861         }
862         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
863                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
864                 if (dentry) {
865                         f_dput(dentry);
866                         filter->fo_dentry_O_mode[mode] = NULL;
867                 }
868         }
869         f_dput(filter->fo_dentry_O);
870         filter_free_server_data(filter);
871         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
872 }
873
874
875 static __u64 filter_next_id(struct obd_device *obd)
876 {
877         obd_id id;
878         LASSERT(obd->u.filter.fo_fsd != NULL);
879
880         spin_lock(&obd->u.filter.fo_objidlock);
881         id = le64_to_cpu(obd->u.filter.fo_fsd->fsd_last_objid);
882         obd->u.filter.fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
883         spin_unlock(&obd->u.filter.fo_objidlock);
884
885         return id;
886 }
887
888 /* how to get files, dentries, inodes from object id's */
889 /* parent i_sem is already held if needed for exclusivity */
890 static struct dentry *filter_fid2dentry(struct obd_device *obd,
891                                         struct dentry *dparent,
892                                         __u64 id, int lockit)
893 {
894         struct super_block *sb = obd->u.filter.fo_sb;
895         struct dentry *dchild;
896         char name[32];
897         int len;
898         ENTRY;
899
900         if (!sb || !sb->s_dev) {
901                 CERROR("fatal: device not initialized.\n");
902                 RETURN(ERR_PTR(-ENXIO));
903         }
904
905         if (id == 0) {
906                 CERROR("fatal: invalid object id 0\n");
907                 LBUG();
908                 RETURN(ERR_PTR(-ESTALE));
909         }
910
911         len = sprintf(name, LPU64, id);
912         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
913                dparent->d_name.len, dparent->d_name.name, name);
914         if (lockit)
915                 down(&dparent->d_inode->i_sem);
916         dchild = lookup_one_len(name, dparent, len);
917         if (lockit)
918                 up(&dparent->d_inode->i_sem);
919         if (IS_ERR(dchild)) {
920                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
921                 RETURN(dchild);
922         }
923
924         CDEBUG(D_INODE, "got child obj O/%*s/%s: %p, count = %d\n",
925                dparent->d_name.len, dparent->d_name.name, name, dchild,
926                atomic_read(&dchild->d_count));
927
928         LASSERT(atomic_read(&dchild->d_count) > 0);
929
930         RETURN(dchild);
931 }
932
933 static inline struct dentry *filter_parent(struct obd_device *obd,
934                                            obd_mode mode, obd_id objid)
935 {
936         struct filter_obd *filter = &obd->u.filter;
937
938         LASSERT((mode & S_IFMT) == S_IFREG);   /* only regular files for now */
939         if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
940                 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
941
942         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
943 }
944
945 static struct file *filter_obj_open(struct obd_export *export,
946                                     __u64 id, __u32 type)
947 {
948         struct filter_obd *filter = &export->exp_obd->u.filter;
949         struct super_block *sb = filter->fo_sb;
950         struct dentry *dentry;
951         struct filter_export_data *fed = &export->exp_filter_data;
952         struct filter_dentry_data *fdd;
953         struct filter_file_data *ffd;
954         struct obd_run_ctxt saved;
955         char name[24];
956         struct file *file;
957         ENTRY;
958
959         if (!sb || !sb->s_dev) {
960                 CERROR("fatal: device not initialized.\n");
961                 RETURN(ERR_PTR(-ENXIO));
962         }
963
964         if (!id) {
965                 CERROR("fatal: invalid obdo "LPU64"\n", id);
966                 RETURN(ERR_PTR(-ESTALE));
967         }
968
969         if (!(type & S_IFMT)) {
970                 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
971                        __FUNCTION__, id, type);
972                 RETURN(ERR_PTR(-EINVAL));
973         }
974
975         PORTAL_SLAB_ALLOC(ffd, filter_open_cache, sizeof(*ffd));
976         if (!ffd) {
977                 CERROR("obdfilter: out of memory\n");
978                 RETURN(ERR_PTR(-ENOMEM));
979         }
980
981         /* We preallocate this to avoid blocking while holding fo_fddlock */
982         fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL);
983         if (!fdd) {
984                 CERROR("obdfilter: out of memory\n");
985                 GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
986         }
987
988         push_ctxt(&saved, &filter->fo_ctxt, NULL);
989         file = filp_open(filter_id(name, filter, id, type),
990                          O_RDWR | O_LARGEFILE, type);
991         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
992
993         if (IS_ERR(file)) {
994                 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
995                 GOTO(out_fdd, file);
996         }
997
998         dentry = file->f_dentry;
999         spin_lock(&filter->fo_fddlock);
1000         if (dentry->d_fsdata) {
1001                 spin_unlock(&filter->fo_fddlock);
1002                 kmem_cache_free(filter_dentry_cache, fdd);
1003                 fdd = dentry->d_fsdata;
1004                 LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
1005                 /* should only happen during client recovery */
1006                 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1007                         CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1008                 atomic_inc(&fdd->fdd_open_count);
1009         } else {
1010                 atomic_set(&fdd->fdd_open_count, 1);
1011                 fdd->fdd_flags = 0;
1012                 fdd->fdd_objid = id;
1013                 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1014                 dentry->d_fsdata = fdd;
1015                 spin_unlock(&filter->fo_fddlock);
1016         }
1017
1018         get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
1019         ffd->ffd_file = file;
1020         LASSERT(file->private_data == NULL);
1021         file->private_data = ffd;
1022
1023         if (!dentry->d_op)
1024                 dentry->d_op = &filter_dops;
1025         else
1026                 LASSERT(dentry->d_op == &filter_dops);
1027
1028         spin_lock(&fed->fed_lock);
1029         list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1030         spin_unlock(&fed->fed_lock);
1031
1032         CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1033         EXIT;
1034 out:
1035         return file;
1036
1037 out_fdd:
1038         kmem_cache_free(filter_dentry_cache, fdd);
1039 out_ffd:
1040         ffd->ffd_servercookie = DEAD_HANDLE_MAGIC;
1041         PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
1042         goto out;
1043 }
1044
1045 /* Caller must hold i_sem on dir_dentry->d_inode */
1046 /* Caller must push us into kernel context */
1047 static int filter_destroy_internal(struct obd_device *obd,
1048                                    struct dentry *dir_dentry,
1049                                    struct dentry *object_dentry)
1050 {
1051         struct inode *inode = object_dentry->d_inode;
1052         int rc;
1053         ENTRY;
1054
1055         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1056                 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1057                        object_dentry->d_name.len,
1058                        object_dentry->d_name.name,
1059                        inode->i_nlink, atomic_read(&inode->i_count));
1060         }
1061
1062         rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
1063
1064         if (rc)
1065                 CERROR("error unlinking objid %*s: rc %d\n",
1066                        object_dentry->d_name.len,
1067                        object_dentry->d_name.name, rc);
1068
1069         RETURN(rc);
1070 }
1071
1072 static int filter_close_internal(struct obd_export *export,
1073                                  struct filter_file_data *ffd,
1074                                  struct obd_trans_info *oti)
1075 {
1076         struct obd_device *obd = export->exp_obd;
1077         struct filter_obd *filter = &obd->u.filter;
1078         struct file *filp = ffd->ffd_file;
1079         struct dentry *object_dentry = dget(filp->f_dentry);
1080         struct filter_dentry_data *fdd = object_dentry->d_fsdata;
1081         int rc, rc2;
1082         ENTRY;
1083
1084         LASSERT(filp->private_data == ffd);
1085         LASSERT(fdd);
1086
1087         rc = filp_close(filp, 0);
1088
1089         if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1090             fdd->fdd_flags & FILTER_FLAG_DESTROY) {
1091                 struct dentry *dir_dentry = filter_parent(obd, S_IFREG, fdd->fdd_objid);
1092                 struct obd_run_ctxt saved;
1093                 void *handle;
1094
1095                 down(&dir_dentry->d_inode->i_sem);
1096                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1097                 filter_start_transno(export);
1098                 handle = fsfilt_start(obd, dir_dentry->d_inode,
1099                                       FSFILT_OP_UNLINK);
1100                 if (IS_ERR(handle)) {
1101                         rc = filter_finish_transno(export, handle, oti,
1102                                                    PTR_ERR(handle));
1103                         GOTO(out, rc);
1104                 }
1105                 /* XXX unlink from PENDING directory now too */
1106                 rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
1107                 if (rc2 && !rc)
1108                         rc = rc2;
1109                 rc = filter_finish_transno(export, handle, oti, rc);
1110                 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1111                 if (rc2) {
1112                         CERROR("error on commit, err = %d\n", rc2);
1113                         if (!rc)
1114                                 rc = rc2;
1115                 }
1116         out:
1117                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1118                 up(&dir_dentry->d_inode->i_sem);
1119         }
1120
1121         f_dput(object_dentry);
1122         PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
1123
1124         RETURN(rc);
1125 }
1126
1127 /* obd methods */
1128 /* mount the file system (secretly) */
1129 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1130                                char *option)
1131 {
1132         struct obd_ioctl_data* data = buf;
1133         struct filter_obd *filter;
1134         struct vfsmount *mnt;
1135         int rc = 0;
1136         ENTRY;
1137
1138         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1139                 RETURN(-EINVAL);
1140
1141         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1142         if (IS_ERR(obd->obd_fsops))
1143                 RETURN(PTR_ERR(obd->obd_fsops));
1144
1145         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
1146         rc = PTR_ERR(mnt);
1147         if (IS_ERR(mnt)) {
1148                 CERROR("mount of %s as type %s failed: rc %d\n",
1149                        data->ioc_inlbuf2, data->ioc_inlbuf1, rc);
1150                 GOTO(err_ops, rc);
1151         }
1152
1153 #if OST_RECOVERY
1154         obd->obd_flags |= OBD_REPLAYABLE;
1155 #endif
1156
1157         filter = &obd->u.filter;;
1158         filter->fo_vfsmnt = mnt;
1159         filter->fo_fstype = strdup(data->ioc_inlbuf2);
1160         filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
1161         CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1162
1163         OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1164         filter->fo_ctxt.pwdmnt = mnt;
1165         filter->fo_ctxt.pwd = mnt->mnt_root;
1166         filter->fo_ctxt.fs = get_ds();
1167
1168         rc = filter_prep(obd);
1169         if (rc)
1170                 GOTO(err_kfree, rc);
1171
1172 #ifdef FILTER_TRANSNO_SEM
1173         init_MUTEX(&filter->fo_transno_sem);
1174 #else
1175         spin_lock_init(&filter->fo_translock);
1176 #endif
1177         spin_lock_init(&filter->fo_fddlock);
1178         spin_lock_init(&filter->fo_objidlock);
1179         INIT_LIST_HEAD(&filter->fo_export_list);
1180
1181         obd->obd_namespace =
1182                 ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
1183         if (!obd->obd_namespace)
1184                 GOTO(err_post, rc = -ENOMEM);
1185
1186         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1187                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1188
1189         RETURN(0);
1190
1191 err_post:
1192         filter_post(obd);
1193 err_kfree:
1194         kfree(filter->fo_fstype);
1195         unlock_kernel();
1196         mntput(filter->fo_vfsmnt);
1197         filter->fo_sb = 0;
1198         lock_kernel();
1199 err_ops:
1200         fsfilt_put_ops(obd->obd_fsops);
1201         return rc;
1202 }
1203
1204 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1205 {
1206         return filter_common_setup(obd, len, buf, NULL);
1207 }
1208
1209 /* sanobd setup methods - use a specific mount option */
1210 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1211 {
1212         struct obd_ioctl_data* data = buf;
1213         char *option = NULL;
1214
1215         if (!data->ioc_inlbuf2)
1216                 RETURN(-EINVAL);
1217
1218         /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1219         if (!strcmp(data->ioc_inlbuf2, "extN") ||
1220             !strcmp(data->ioc_inlbuf2, "ext3"))
1221                 option = "data=writeback";
1222         else
1223                 LBUG(); /* just a reminder */
1224
1225         return filter_common_setup(obd, len, buf, option);
1226 }
1227
1228 static int filter_cleanup(struct obd_device *obd)
1229 {
1230         struct super_block *sb;
1231         ENTRY;
1232
1233         if (!list_empty(&obd->obd_exports)) {
1234                 CERROR("still has clients!\n");
1235                 class_disconnect_all(obd);
1236                 if (!list_empty(&obd->obd_exports)) {
1237                         CERROR("still has exports after forced cleanup?\n");
1238                         RETURN(-EBUSY);
1239                 }
1240         }
1241
1242         ldlm_namespace_free(obd->obd_namespace);
1243
1244         sb = obd->u.filter.fo_sb;
1245         if (!obd->u.filter.fo_sb)
1246                 RETURN(0);
1247
1248         filter_post(obd);
1249
1250         shrink_dcache_parent(sb->s_root);
1251         unlock_kernel();
1252         mntput(obd->u.filter.fo_vfsmnt);
1253         obd->u.filter.fo_sb = 0;
1254         kfree(obd->u.filter.fo_fstype);
1255         fsfilt_put_ops(obd->obd_fsops);
1256
1257         lock_kernel();
1258
1259         RETURN(0);
1260 }
1261
1262 int filter_attach(struct obd_device *dev, obd_count len, void *data)
1263 {
1264         struct lprocfs_static_vars lvars;
1265
1266         lprocfs_init_vars(&lvars);
1267         return lprocfs_obd_attach(dev, lvars.obd_vars);
1268 }
1269
/* Detach method: undo filter_attach() through lprocfs_obd_detach(). */
int filter_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1274
1275 /* nearly identical to mds_connect */
1276 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1277                           struct obd_uuid *cluuid, struct recovd_obd *recovd,
1278                           ptlrpc_recovery_cb_t recover)
1279 {
1280         struct obd_export *exp;
1281         struct filter_export_data *fed;
1282         struct filter_client_data *fcd;
1283         struct filter_obd *filter = &obd->u.filter;
1284         int rc;
1285
1286         ENTRY;
1287
1288         if (!conn || !obd || !cluuid)
1289                 RETURN(-EINVAL);
1290
1291         rc = class_connect(conn, obd, cluuid);
1292         if (rc)
1293                 RETURN(rc);
1294         exp = class_conn2export(conn);
1295         LASSERT(exp);
1296         fed = &exp->exp_filter_data;
1297
1298         OBD_ALLOC(fcd, sizeof(*fcd));
1299         if (!fcd) {
1300                 CERROR("filter: out of memory for client data\n");
1301                 GOTO(out_export, rc = -ENOMEM);
1302         }
1303
1304         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1305         fed->fed_fcd = fcd;
1306         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1307
1308         INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
1309         spin_lock_init(&exp->exp_filter_data.fed_lock);
1310
1311         if (obd->obd_flags & OBD_REPLAYABLE) {
1312                 rc = filter_client_add(filter, fed, -1);
1313                 if (rc)
1314                         GOTO(out_fcd, rc);
1315         }
1316
1317         RETURN(rc);
1318
1319 out_fcd:
1320         OBD_FREE(fcd, sizeof(*fcd));
1321 out_export:
1322         class_disconnect(conn);
1323
1324         RETURN(rc);
1325 }
1326
1327 /* also incredibly similar to mds_disconnect */
1328 static int filter_disconnect(struct lustre_handle *conn)
1329 {
1330         struct obd_export *exp = class_conn2export(conn);
1331         struct filter_export_data *fed;
1332         int rc;
1333         ENTRY;
1334
1335         LASSERT(exp);
1336         fed = &exp->exp_filter_data;
1337         spin_lock(&fed->fed_lock);
1338         while (!list_empty(&fed->fed_open_head)) {
1339                 struct filter_file_data *ffd;
1340
1341                 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1342                                  ffd_export_list);
1343                 list_del(&ffd->ffd_export_list);
1344                 spin_unlock(&fed->fed_lock);
1345
1346                 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1347                        ffd->ffd_file->f_dentry->d_name.len,
1348                        ffd->ffd_file->f_dentry->d_name.name,
1349                        ffd, ffd->ffd_servercookie);
1350
1351                 filter_close_internal(exp, ffd, NULL);
1352                 spin_lock(&fed->fed_lock);
1353         }
1354         spin_unlock(&fed->fed_lock);
1355
1356         ldlm_cancel_locks_for_export(exp);
1357
1358         if (exp->exp_obd->obd_flags & OBD_REPLAYABLE) 
1359                 filter_client_free(exp);
1360
1361         rc = class_disconnect(conn);
1362
1363         /* XXX cleanup preallocated inodes */
1364         RETURN(rc);
1365 }
1366
1367 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1368 {
1369         int type = oa->o_mode & S_IFMT;
1370         ENTRY;
1371
1372         CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1373                inode->i_ino, inode, oa->o_id, valid);
1374         /* Don't copy the inode number in place of the object ID */
1375         obdo_from_inode(oa, inode, valid);
1376         oa->o_mode &= ~S_IFMT;
1377         oa->o_mode |= type;
1378
1379         if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1380                 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1381                 oa->o_rdev = rdev;
1382                 oa->o_valid |= OBD_MD_FLRDEV;
1383         }
1384
1385         EXIT;
1386 }
1387
1388 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
1389 {
1390         struct filter_file_data *ffd = NULL;
1391         ENTRY;
1392
1393         if (!handle || !handle->addr)
1394                 RETURN(NULL);
1395
1396         ffd = (struct filter_file_data *)(unsigned long)(handle->addr);
1397         if (!kmem_cache_validate(filter_open_cache, (void *)ffd))
1398                 RETURN(NULL);
1399
1400         if (ffd->ffd_servercookie != handle->cookie)
1401                 RETURN(NULL);
1402
1403         LASSERT(ffd->ffd_file->private_data == ffd);
1404         RETURN(ffd);
1405 }
1406
1407 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1408                                          struct obdo *oa, int locked,char *what)
1409 {
1410         struct dentry *dentry = NULL;
1411
1412         if (oa->o_valid & OBD_MD_FLHANDLE) {
1413                 struct lustre_handle *ost_handle = obdo_handle(oa);
1414                 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1415
1416                 if (ffd)
1417                         dentry = dget(ffd->ffd_file->f_dentry);
1418         }
1419
1420         if (!dentry) {
1421                 struct obd_device *obd = class_conn2obd(conn);
1422                 if (!obd) {
1423                         CERROR("invalid client "LPX64"\n", conn->addr);
1424                         RETURN(ERR_PTR(-EINVAL));
1425                 }
1426                 dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode,
1427                                                               oa->o_id),
1428                                            oa->o_id, locked);
1429         }
1430
1431         if (IS_ERR(dentry)) {
1432                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1433                 RETURN(dentry);
1434         }
1435
1436         if (!dentry->d_inode) {
1437                 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1438                 f_dput(dentry);
1439                 LBUG();
1440                 RETURN(ERR_PTR(-ENOENT));
1441         }
1442
1443         return dentry;
1444 }
1445
1446 #define filter_oa2dentry(conn, oa, locked) __filter_oa2dentry(conn, oa, locked,\
1447                                                               __FUNCTION__)
1448
1449 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1450                           struct lov_stripe_md *md)
1451 {
1452         struct dentry *dentry = NULL;
1453         int rc = 0;
1454         ENTRY;
1455
1456         XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
1457
1458         dentry = filter_oa2dentry(conn, oa, 1);
1459         if (IS_ERR(dentry))
1460                 RETURN(PTR_ERR(dentry));
1461
1462         filter_from_inode(oa, dentry->d_inode, oa->o_valid);
1463
1464         f_dput(dentry);
1465         RETURN(rc);
1466 }
1467
1468 /* this is called from filter_truncate() until we have filter_punch() */
1469 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1470                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1471 {
1472         struct obd_run_ctxt saved;
1473         struct obd_export *export = class_conn2export(conn);
1474         struct obd_device *obd = class_conn2obd(conn);
1475         struct filter_obd *filter = &obd->u.filter;
1476         struct dentry *dentry;
1477         struct iattr iattr;
1478         struct inode *inode;
1479         void * handle;
1480         int rc, rc2;
1481         ENTRY;
1482
1483         XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
1484
1485         dentry = filter_oa2dentry(conn, oa, 0);
1486
1487         if (IS_ERR(dentry))
1488                 RETURN(PTR_ERR(dentry));
1489
1490         iattr_from_obdo(&iattr, oa, oa->o_valid);
1491         iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1492         inode = dentry->d_inode;
1493
1494         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1495         lock_kernel();
1496         if (iattr.ia_valid & ATTR_SIZE)
1497                 down(&inode->i_sem);
1498
1499         filter_start_transno(export);
1500         handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1501         if (IS_ERR(handle)) {
1502                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1503                 GOTO(out_unlock, rc);
1504         }
1505
1506         if (inode->i_op->setattr)
1507                 rc = inode->i_op->setattr(dentry, &iattr);
1508         else
1509                 rc = inode_setattr(inode, &iattr);
1510         rc = filter_finish_transno(export, handle, oti, rc);
1511         rc2 = fsfilt_commit(obd, dentry->d_inode, handle);
1512         if (rc2) {
1513                 CERROR("error on commit, err = %d\n", rc2);
1514                 if (!rc)
1515                         rc = rc2;
1516         }
1517
1518         if (iattr.ia_valid & ATTR_SIZE) {
1519                 up(&inode->i_sem);
1520                 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1521                 obdo_from_inode(oa, inode, oa->o_valid);
1522         }
1523
1524 out_unlock:
1525         unlock_kernel();
1526         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1527
1528         f_dput(dentry);
1529         RETURN(rc);
1530 }
1531
1532 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1533                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
1534 {
1535         struct obd_export *export;
1536         struct lustre_handle *handle;
1537         struct filter_file_data *ffd;
1538         struct file *filp;
1539         int rc = 0;
1540         ENTRY;
1541
1542         export = class_conn2export(conn);
1543         if (!export) {
1544                 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1545                 RETURN(-EINVAL);
1546         }
1547
1548         XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
1549
1550         filp = filter_obj_open(export, oa->o_id, oa->o_mode);
1551         if (IS_ERR(filp))
1552                 GOTO(out, rc = PTR_ERR(filp));
1553
1554         filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1555
1556         ffd = filp->private_data;
1557         handle = obdo_handle(oa);
1558         handle->addr = (__u64)(unsigned long)ffd;
1559         handle->cookie = ffd->ffd_servercookie;
1560         oa->o_valid |= OBD_MD_FLHANDLE;
1561         EXIT;
1562 out:
1563         return rc;
1564 } /* filter_open */
1565
1566 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1567                         struct lov_stripe_md *ea, struct obd_trans_info *oti)
1568 {
1569         struct obd_export *exp;
1570         struct filter_file_data *ffd;
1571         struct filter_export_data *fed;
1572         int rc;
1573         ENTRY;
1574
1575         exp = class_conn2export(conn);
1576         if (!exp) {
1577                 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1578                 RETURN(-EINVAL);
1579         }
1580
1581         XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
1582
1583         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1584                 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1585                 RETURN(-EINVAL);
1586         }
1587
1588         ffd = filter_handle2ffd(obdo_handle(oa));
1589         if (!ffd) {
1590                 struct lustre_handle *handle = obdo_handle(oa);
1591                 CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n",
1592                        handle->addr, handle->cookie);
1593                 RETURN(-ESTALE);
1594         }
1595
1596         fed = &exp->exp_filter_data;
1597         spin_lock(&fed->fed_lock);
1598         list_del(&ffd->ffd_export_list);
1599         spin_unlock(&fed->fed_lock);
1600
1601         rc = filter_close_internal(exp, ffd, oti);
1602
1603         RETURN(rc);
1604 } /* filter_close */
1605
1606 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1607                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
1608 {
1609         struct obd_export *export = class_conn2export(conn);
1610         struct obd_device *obd = class_conn2obd(conn);
1611         struct filter_obd *filter = &obd->u.filter;
1612         struct obd_run_ctxt saved;
1613         struct dentry *dir_dentry;
1614         struct dentry *new;
1615         struct iattr;
1616         void *handle;
1617         int err, rc;
1618         ENTRY;
1619
1620         if (!obd) {
1621                 CERROR("invalid client "LPX64"\n", conn->addr);
1622                 return -EINVAL;
1623         }
1624
1625         XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
1626
1627         oa->o_id = filter_next_id(obd);
1628
1629         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1630         dir_dentry = filter_parent(obd, S_IFREG, oa->o_id);
1631         down(&dir_dentry->d_inode->i_sem);
1632         new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
1633         if (IS_ERR(new))
1634                 GOTO(out, rc = PTR_ERR(new));
1635
1636         if (new->d_inode) {
1637                 char buf[32];
1638
1639                 /* This would only happen if lastobjid was bad on disk */
1640                 CERROR("objid %s already exists\n",
1641                        filter_id(buf, filter, S_IFREG, oa->o_id));
1642                 LBUG();
1643                 GOTO(out, rc = -EEXIST);
1644         }
1645
1646         filter_start_transno(export);
1647         handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
1648         if (IS_ERR(handle)) {
1649                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1650                 GOTO(out_put, rc);
1651         }
1652         rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
1653         if (rc)
1654                 CERROR("create failed rc = %d\n", rc);
1655
1656         rc = filter_finish_transno(export, handle, oti, rc);
1657         err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1658         if (err) {
1659                 CERROR("unable to write lastobjid but file created\n");
1660                 if (!rc)
1661                         rc = err;
1662         }
1663         err = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1664         if (err) {
1665                 CERROR("error on commit, err = %d\n", err);
1666                 if (!rc)
1667                         rc = err;
1668         }
1669
1670         if (rc)
1671                 GOTO(out_put, rc);
1672
1673         /* Set flags for fields we have set in the inode struct */
1674         oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1675                  OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1676         filter_from_inode(oa, new->d_inode, oa->o_valid);
1677
1678         EXIT;
1679 out_put:
1680         f_dput(new);
1681 out:
1682         up(&dir_dentry->d_inode->i_sem);
1683         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1684         return rc;
1685 }
1686
1687 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1688                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
1689 {
1690         struct obd_export *export = class_conn2export(conn);
1691         struct obd_device *obd = class_conn2obd(conn);
1692         struct filter_obd *filter = &obd->u.filter;
1693         struct dentry *dir_dentry, *object_dentry;
1694         struct filter_dentry_data *fdd;
1695         struct obd_run_ctxt saved;
1696         void *handle;
1697         int rc, rc2;
1698         ENTRY;
1699
1700         if (!obd) {
1701                 CERROR("invalid client "LPX64"\n", conn->addr);
1702                 RETURN(-EINVAL);
1703         }
1704
1705         XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
1706
1707         CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1708
1709         dir_dentry = filter_parent(obd, oa->o_mode, oa->o_id);
1710         down(&dir_dentry->d_inode->i_sem);
1711
1712         object_dentry = filter_oa2dentry(conn, oa, 0);
1713         if (IS_ERR(object_dentry))
1714                 GOTO(out, rc = -ENOENT);
1715
1716         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1717         filter_start_transno(export);
1718         handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
1719         if (IS_ERR(handle)) {
1720                 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1721                 GOTO(out_ctxt, rc);
1722         }
1723
1724         fdd = object_dentry->d_fsdata;
1725         if (fdd && atomic_read(&fdd->fdd_open_count)) {
1726                 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1727                         fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1728                         /* XXX put into PENDING directory in case of crash */
1729                         CDEBUG(D_INODE,
1730                                "defer destroy of %dx open objid "LPU64"\n",
1731                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1732                 } else
1733                         CDEBUG(D_INODE,
1734                                "repeat destroy of %dx open objid "LPU64"\n",
1735                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1736                 GOTO(out_commit, rc = 0);
1737         }
1738
1739         rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
1740
1741 out_commit:
1742         /* XXX save last_rcvd on disk */
1743         rc = filter_finish_transno(export, handle, oti, rc);
1744         rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1745         if (rc2) {
1746                 CERROR("error on commit, err = %d\n", rc2);
1747                 if (!rc)
1748                         rc = rc2;
1749         }
1750 out_ctxt:
1751         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1752         f_dput(object_dentry);
1753
1754         EXIT;
1755 out:
1756         up(&dir_dentry->d_inode->i_sem);
1757         return rc;
1758 }
1759
1760 /* NB start and end are used for punch, but not truncate */
1761 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1762                            struct lov_stripe_md *lsm,
1763                            obd_off start, obd_off end,
1764                            struct obd_trans_info *oti)
1765 {
1766         int error;
1767         ENTRY;
1768
1769         XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
1770
1771         if (end != OBD_OBJECT_EOF)
1772                 CERROR("PUNCH not supported, only truncate works\n");
1773
1774         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
1775                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1776         oa->o_size = start;
1777         error = filter_setattr(conn, oa, NULL, oti);
1778         RETURN(error);
1779 }
1780
/* Undo the kmap() of @page and drop one page-cache reference on it. */
static inline void lustre_put_page(struct page *page)
{
        kunmap(page);
        page_cache_release(page);
}
1786
1787
1788 static struct page *
1789 lustre_get_page_read(struct inode *inode, struct niobuf_local *lnb)
1790 {
1791         unsigned long index = lnb->offset >> PAGE_SHIFT;
1792         struct address_space *mapping = inode->i_mapping;
1793         struct page *page;
1794         int rc;
1795
1796         page = read_cache_page(mapping, index,
1797                                (filler_t*)mapping->a_ops->readpage, NULL);
1798         if (!IS_ERR(page)) {
1799                 wait_on_page(page);
1800                 lnb->addr = kmap(page);
1801                 lnb->page = page;
1802                 if (!PageUptodate(page)) {
1803                         CERROR("page index %lu not uptodate\n", index);
1804                         GOTO(err_page, rc = -EIO);
1805                 }
1806                 if (PageError(page)) {
1807                         CERROR("page index %lu has error\n", index);
1808                         GOTO(err_page, rc = -EIO);
1809                 }
1810         }
1811         return page;
1812
1813 err_page:
1814         lustre_put_page(page);
1815         return ERR_PTR(rc);
1816 }
1817
1818 static struct page *
1819 lustre_get_page_write(struct inode *inode, unsigned long index)
1820 {
1821         struct address_space *mapping = inode->i_mapping;
1822         struct page *page;
1823         int rc;
1824
1825         page = grab_cache_page(mapping, index); /* locked page */
1826
1827         if (!IS_ERR(page)) {
1828                 kmap(page);
1829                 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
1830                  * a no-op for most filesystems, because we write the whole
1831                  * page.  For partial-page I/O this will read in the page.
1832                  */
1833                 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
1834                 if (rc) {
1835                         CERROR("page index %lu, rc = %d\n", index, rc);
1836                         if (rc != -ENOSPC)
1837                                 LBUG();
1838                         GOTO(err_unlock, rc);
1839                 }
1840                 /* XXX not sure if we need this if we are overwriting page */
1841                 if (PageError(page)) {
1842                         CERROR("error on page index %lu, rc = %d\n", index, rc);
1843                         LBUG();
1844                         GOTO(err_unlock, rc = -EIO);
1845                 }
1846         }
1847         return page;
1848
1849 err_unlock:
1850         unlock_page(page);
1851         lustre_put_page(page);
1852         return ERR_PTR(rc);
1853 }
1854
1855 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1856 int waitfor_one_page(struct page *page)
1857 {
1858         wait_on_page_locked(page);
1859         return 0;
1860 }
1861 #endif
1862
1863 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1864 /* We should only change the file mtime (and not the ctime, like
1865  * update_inode_times() in generic_file_write()) when we only change data.
1866  */
1867 static inline void inode_update_time(struct inode *inode, int ctime_too)
1868 {
1869         time_t now = CURRENT_TIME;
1870         if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
1871                 return;
1872         inode->i_mtime = now;
1873         if (ctime_too)
1874                 inode->i_ctime = now;
1875         mark_inode_dirty_sync(inode);
1876 }
1877 #endif
1878
1879 static int lustre_commit_write(struct niobuf_local *lnb)
1880 {
1881         struct page *page = lnb->page;
1882         unsigned from = lnb->offset & ~PAGE_MASK;
1883         unsigned to = from + lnb->len;
1884         struct inode *inode = page->mapping->host;
1885         int err;
1886
1887         LASSERT(to <= PAGE_SIZE);
1888         err = page->mapping->a_ops->commit_write(NULL, page, from, to);
1889         if (!err && IS_SYNC(inode))
1890                 err = waitfor_one_page(page);
1891         //SetPageUptodate(page); // the client commit_write will do this
1892
1893         SetPageReferenced(page);
1894         unlock_page(page);
1895         lustre_put_page(page);
1896         return err;
1897 }
1898
1899 struct page *filter_get_page_write(struct inode *inode,
1900                                    struct niobuf_local *lnb, int *pglocked)
1901 {
1902         unsigned long index = lnb->offset >> PAGE_SHIFT;
1903         struct address_space *mapping = inode->i_mapping;
1904         struct page *page;
1905         int rc;
1906
1907         //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
1908         if (*pglocked)
1909                 page = grab_cache_page_nowait(mapping, index); /* locked page */
1910         else
1911                 page = grab_cache_page(mapping, index); /* locked page */
1912
1913
1914         /* This page is currently locked, so get a temporary page instead. */
1915         /* XXX I believe this is a very dangerous thing to do - consider if
1916          *     we had multiple writers for the same file (definitely the case
1917          *     if we are using this codepath).  If writer A locks the page,
1918          *     writer B writes to a copy (as here), writer A drops the page
1919          *     lock, and writer C grabs the lock before B does, then B will
1920          *     later overwrite the data from C, even if C had LDLM locked
1921          *     and initiated the write after B did.
1922          */
1923         if (!page) {
1924                 unsigned long addr;
1925                 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
1926                 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
1927                 if (!addr) {
1928                         CERROR("no memory for a temp page\n");
1929                         GOTO(err, rc = -ENOMEM);
1930                 }
1931                 POISON((void *)addr, 0xBA, PAGE_SIZE);
1932                 page = virt_to_page(addr);
1933                 kmap(page);
1934                 page->index = index;
1935                 lnb->addr = (void *)addr;
1936                 lnb->page = page;
1937                 lnb->flags |= N_LOCAL_TEMP_PAGE;
1938         } else if (!IS_ERR(page)) {
1939                 (*pglocked)++;
1940                 kmap(page);
1941
1942                 rc = mapping->a_ops->prepare_write(NULL, page,
1943                                                    lnb->offset & ~PAGE_MASK,
1944                                                    lnb->len);
1945                 if (rc) {
1946                         if (rc != -ENOSPC)
1947                                 CERROR("page index %lu, rc = %d\n", index, rc);
1948                         GOTO(err_unlock, rc);
1949                 }
1950                 /* XXX not sure if we need this if we are overwriting page */
1951                 if (PageError(page)) {
1952                         CERROR("error on page index %lu, rc = %d\n", index, rc);
1953                         LBUG();
1954                         GOTO(err_unlock, rc = -EIO);
1955                 }
1956                 lnb->addr = page_address(page);
1957                 lnb->page = page;
1958         }
1959
1960         return page;
1961
1962 err_unlock:
1963         unlock_page(page);
1964         lustre_put_page(page);
1965 err:
1966         return ERR_PTR(rc);
1967 }
1968
1969 /*
1970  * We need to balance prepare_write() calls with commit_write() calls.
1971  * If the page has been prepared, but we have no data for it, we don't
1972  * want to overwrite valid data on disk, but we still need to zero out
1973  * data for space which was newly allocated.  Like part of what happens
1974  * in __block_prepare_write() for newly allocated blocks.
1975  *
1976  * XXX currently __block_prepare_write() creates buffers for all the
1977  *     pages, and the filesystems mark these buffers as BH_New if they
1978  *     were newly allocated from disk. We use the BH_New flag similarly.
1979  */
1980 static int filter_commit_write(struct niobuf_local *lnb, int err)
1981 {
1982 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1983         if (err) {
1984                 unsigned block_start, block_end;
1985                 struct buffer_head *bh, *head = lnb->page->buffers;
1986                 unsigned blocksize = head->b_size;
1987
1988                 /* debugging: just seeing if this ever happens */
1989                 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
1990                        "called for ino %lu:%lu on err %d\n",
1991                        lnb->page->mapping->host->i_ino, lnb->page->index, err);
1992
1993                 /* Currently one buffer per page, but in the future... */
1994                 for (bh = head, block_start = 0; bh != head || !block_start;
1995                      block_start = block_end, bh = bh->b_this_page) {
1996                         block_end = block_start + blocksize;
1997                         if (buffer_new(bh))
1998                                 memset(lnb->addr + block_start, 0, blocksize);
1999                 }
2000         }
2001 #endif
2002         return lustre_commit_write(lnb);
2003 }
2004
2005 static int filter_preprw(int cmd, struct lustre_handle *conn,
2006                          int objcount, struct obd_ioobj *obj,
2007                          int niocount, struct niobuf_remote *nb,
2008                          struct niobuf_local *res, void **desc_private,
2009                          struct obd_trans_info *oti)
2010 {
2011         struct obd_run_ctxt saved;
2012         struct obd_export *export;
2013         struct obd_device *obd;
2014         struct obd_ioobj *o;
2015         struct niobuf_remote *rnb = nb;
2016         struct niobuf_local *lnb = res;
2017         struct fsfilt_objinfo *fso;
2018         int pglocked = 0;
2019         int rc = 0;
2020         int i;
2021         ENTRY;
2022
2023         if ((cmd & OBD_BRW_WRITE) != 0)
2024                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2025         else
2026                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2027
2028         memset(res, 0, niocount * sizeof(*res));
2029
2030         export = class_conn2export(conn);
2031         obd = class_conn2obd(conn);
2032         if (!obd) {
2033                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2034                 RETURN(-EINVAL);
2035         }
2036
2037         LASSERT(objcount < 16); // theoretically we support multi-obj BRW
2038
2039         OBD_ALLOC(fso, objcount * sizeof(*fso));
2040         if (!fso)
2041                 RETURN(-ENOMEM);
2042
2043         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2044
2045         for (i = 0, o = obj; i < objcount; i++, o++) {
2046                 struct filter_dentry_data *fdd;
2047                 struct dentry *dentry;
2048
2049                 LASSERT(o->ioo_bufcnt);
2050
2051                 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2052                                                               o->ioo_id),
2053                                            o->ioo_id, 0);
2054
2055                 if (IS_ERR(dentry))
2056                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
2057
2058                 fso[i].fso_dentry = dentry;
2059                 fso[i].fso_bufcnt = o->ioo_bufcnt;
2060
2061                 if (!dentry->d_inode) {
2062                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2063                                o->ioo_id);
2064                         f_dput(dentry);
2065                         GOTO(out_objinfo, rc = -ENOENT);
2066                 }
2067
2068                 fdd = dentry->d_fsdata;
2069                 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2070                         CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2071                                o->ioo_id);
2072         }
2073
2074         if (cmd & OBD_BRW_WRITE) {
2075 #warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
2076                 /* Even worse, we need to get locks on mulitple inodes (in
2077                  * order) or use the DLM to do the locking for us (and use
2078                  * the same locking in filter_setattr() for truncate.  The
2079                  * handling gets very ugly when dealing with locked pages.
2080                  * It may be easier to just get rid of the locked page code
2081                  * (which has problems of its own) and either discover we do
2082                  * not need it anymore (i.e. it was a symptom of another bug)
2083                  * or ensure we get the page locks in an appropriate order.
2084                  */
2085                 /* Danger, Will Robinson! You are taking a lock here and also
2086                  * starting a transaction and releasing/finishing then in
2087                  * filter_commitrw(), so you must call fsfilt_commit() and
2088                  * finish_transno() if an error occurs in this function.
2089                  */
2090                 filter_start_transno(export);
2091                 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2092                                                  niocount, nb);
2093                 if (IS_ERR(*desc_private)) {
2094                         rc = PTR_ERR(*desc_private);
2095                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2096                                "error starting transaction: rc = %d\n", rc);
2097                         *desc_private = NULL;
2098                         GOTO(out_objinfo, rc);
2099                 }
2100         }
2101
2102         obd_kmap_get(niocount, 1);
2103
2104         for (i = 0, o = obj; i < objcount; i++, o++) {
2105                 struct dentry *dentry;
2106                 struct inode *inode;
2107                 int j;
2108
2109                 dentry = fso[i].fso_dentry;
2110                 inode = dentry->d_inode;
2111
2112                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2113                         struct page *page;
2114
2115                         if (j == 0)
2116                                 lnb->dentry = dentry;
2117                         else
2118                                 lnb->dentry = dget(dentry);
2119
2120                         /* lnb->offset is aligned, while rnb->offset isn't,
2121                          * and we need to copy the fields to lnb anyways.
2122                          */
2123                         memcpy(lnb, rnb, sizeof(*rnb));
2124                         if (cmd & OBD_BRW_WRITE) {
2125                                 page = filter_get_page_write(inode, lnb,
2126                                                              &pglocked);
2127
2128                                 XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes,
2129                                                           lnb->len);
2130                         } else {
2131                                 page = lustre_get_page_read(inode, lnb);
2132
2133                                 XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes,
2134                                                           lnb->len);
2135                         }
2136
2137                         if (IS_ERR(page)) {
2138                                 rc = PTR_ERR(page);
2139                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2140                                        "error on page @"LPU64"%u/%u: rc = %d\n",
2141                                        lnb->offset, j, o->ioo_bufcnt, rc);
2142                                 f_dput(dentry);
2143                                 GOTO(out_pages, rc);
2144                         }
2145                 }
2146         }
2147
2148         EXIT;
2149 out:
2150         OBD_FREE(fso, objcount * sizeof(*fso));
2151         current->journal_info = NULL;
2152         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2153         return rc;
2154
2155 out_pages:
2156         while (lnb-- > res) {
2157                 if (cmd & OBD_BRW_WRITE)
2158                         filter_commit_write(lnb, rc);
2159                 else
2160                         lustre_put_page(lnb->page);
2161                 f_dput(lnb->dentry);
2162         }
2163         obd_kmap_put(niocount);
2164         if (cmd & OBD_BRW_WRITE) {
2165                 filter_finish_transno(export, *desc_private, oti, rc);
2166                 fsfilt_commit(obd,
2167                               filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2168                               *desc_private);
2169         }
2170         goto out; /* dropped the dentry refs already (one per page) */
2171
2172 out_objinfo:
2173         for (i = 0; i < objcount && fso[i].fso_dentry; i++)
2174                 f_dput(fso[i].fso_dentry);
2175         goto out;
2176 }
2177
2178 static int filter_write_locked_page(struct niobuf_local *lnb)
2179 {
2180         struct page *lpage;
2181         int rc;
2182         ENTRY;
2183
2184         lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2185         if (IS_ERR(lpage)) {
2186                 /* It is highly unlikely that we would ever get an error here.
2187                  * The page we want to get was previously locked, so it had to
2188                  * have already allocated the space, and we were just writing
2189                  * over the same data, so there would be no hole in the file.
2190                  *
2191                  * XXX: possibility of a race with truncate could exist, need
2192                  *      to check that.  There are no guarantees w.r.t.
2193                  *      write order even on a local filesystem, although the
2194                  *      normal response would be to return the number of bytes
2195                  *      successfully written and leave the rest to the app.
2196                  */
2197                 rc = PTR_ERR(lpage);
2198                 CERROR("error getting locked page index %ld: rc = %d\n",
2199                        lnb->page->index, rc);
2200                 LBUG();
2201                 lustre_commit_write(lnb);
2202                 RETURN(rc);
2203         }
2204
2205         /* lpage is kmapped in lustre_get_page_write() above and kunmapped in
2206          * lustre_commit_write() below, lnb->page was kmapped previously in
2207          * filter_get_page_write() and kunmapped in lustre_put_page() below.
2208          */
2209         memcpy(page_address(lpage), page_address(lnb->page), PAGE_SIZE);
2210         lustre_put_page(lnb->page);
2211
2212         lnb->page = lpage;
2213         rc = lustre_commit_write(lnb);
2214         if (rc)
2215                 CERROR("error committing locked page %ld: rc = %d\n",
2216                        lnb->page->index, rc);
2217
2218         RETURN(rc);
2219 }
2220
2221 static int filter_syncfs(struct lustre_handle *conn)
2222 {
2223         struct obd_device *obd;
2224         ENTRY;
2225
2226         obd = class_conn2obd(conn);
2227
2228         XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1);
2229
2230         RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
2231 }
2232
2233 static int filter_commitrw(int cmd, struct lustre_handle *conn,
2234                            int objcount, struct obd_ioobj *obj,
2235                            int niocount, struct niobuf_local *res,
2236                            void *desc_private, struct obd_trans_info *oti)
2237 {
2238         struct obd_run_ctxt saved;
2239         struct obd_ioobj *o;
2240         struct niobuf_local *lnb;
2241         struct obd_export *export = class_conn2export(conn);
2242         struct obd_device *obd = class_conn2obd(conn);
2243         int found_locked = 0;
2244         int rc = 0;
2245         int i;
2246         ENTRY;
2247
2248         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2249
2250         LASSERT(!current->journal_info);
2251         current->journal_info = desc_private;
2252
2253         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2254                 int j;
2255
2256                 if (cmd & OBD_BRW_WRITE)
2257                         inode_update_time(lnb->dentry->d_inode, 1);
2258                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2259                         if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2260                                 found_locked++;
2261                                 continue;
2262                         }
2263
2264                         if (cmd & OBD_BRW_WRITE) {
2265                                 int err = filter_commit_write(lnb, 0);
2266
2267                                 if (!rc)
2268                                         rc = err;
2269                         } else
2270                                 lustre_put_page(lnb->page);
2271
2272                         obd_kmap_put(1);
2273                         f_dput(lnb->dentry);
2274                 }
2275         }
2276
2277         for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2278                         i++, o++) {
2279                 int j;
2280                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2281                         int err;
2282                         if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2283                                 continue;
2284
2285                         err = filter_write_locked_page(lnb);
2286                         obd_kmap_put(1);
2287                         if (!rc)
2288                                 rc = err;
2289                         f_dput(lnb->dentry);
2290                         found_locked--;
2291                 }
2292         }
2293
2294         if (cmd & OBD_BRW_WRITE) {
2295                 /* We just want any dentry for the commit, for now */
2296                 struct dentry *dir_dentry = filter_parent(obd, S_IFREG, 0);
2297                 int err;
2298
2299                 rc = filter_finish_transno(export, desc_private, oti, rc);
2300                 err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
2301                 if (err)
2302                         rc = err;
2303                 if (obd_sync_filter) {
2304                         /* this can fail with ENOMEM, what should we do then? */
2305                         filter_syncfs(conn);
2306                 }
2307                 /* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
2308         }
2309
2310         LASSERT(!current->journal_info);
2311
2312         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2313         RETURN(rc);
2314 }
2315
2316 static int filter_brw(int cmd, struct lustre_handle *conn,
2317                       struct lov_stripe_md *lsm, obd_count oa_bufs,
2318                       struct brw_page *pga, struct obd_brw_set *set,
2319                       struct obd_trans_info *oti)
2320 {
2321         struct obd_ioobj        ioo;
2322         struct niobuf_local     *lnb;
2323         struct niobuf_remote    *rnb;
2324         obd_count               i;
2325         void                    *desc_private;
2326         int                     ret = 0;
2327         ENTRY;
2328
2329         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2330         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2331
2332         if (lnb == NULL || rnb == NULL)
2333                 GOTO(out, ret = -ENOMEM);
2334
2335         for (i = 0; i < oa_bufs; i++) {
2336                 rnb[i].offset = pga[i].off;
2337                 rnb[i].len = pga[i].count;
2338         }
2339
2340         ioo.ioo_id = lsm->lsm_object_id;
2341         ioo.ioo_gr = 0;
2342         ioo.ioo_type = S_IFREG;
2343         ioo.ioo_bufcnt = oa_bufs;
2344
2345         ret = filter_preprw(cmd, conn, 1, &ioo, oa_bufs, rnb, lnb,
2346                             &desc_private, oti);
2347         if (ret != 0)
2348                 GOTO(out, ret);
2349
2350         for (i = 0; i < oa_bufs; i++) {
2351                 void *virt = kmap(pga[i].pg);
2352                 obd_off off = pga[i].off & ~PAGE_MASK;
2353
2354                 if (cmd & OBD_BRW_WRITE)
2355                         memcpy(lnb[i].addr + off, virt + off, pga[i].count);
2356                 else
2357                         memcpy(virt + off, lnb[i].addr + off, pga[i].count);
2358
2359                 kunmap(virt);
2360         }
2361
2362         ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private,
2363                               oti);
2364
2365 out:
2366         if (lnb)
2367                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2368         if (rnb)
2369                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2370         RETURN(ret);
2371 }
2372
2373 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2374                              int objcount, struct obd_ioobj *obj,
2375                              int niocount, struct niobuf_remote *nb)
2376 {
2377         struct obd_device *obd;
2378         struct obd_ioobj *o = obj;
2379         struct niobuf_remote *rnb = nb;
2380         int rc = 0;
2381         int i;
2382         ENTRY;
2383
2384         if ((cmd & OBD_BRW_WRITE) != 0)
2385                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2386         else
2387                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2388
2389         obd = class_conn2obd(conn);
2390         if (!obd) {
2391                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2392                 RETURN(-EINVAL);
2393         }
2394
2395         for (i = 0; i < objcount; i++, o++) {
2396                 struct dentry *dentry;
2397                 struct inode *inode;
2398                 int j;
2399
2400                 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2401                                                               o->ioo_id),
2402                                            o->ioo_id, 0);
2403                 if (IS_ERR(dentry))
2404                         GOTO(out, rc = PTR_ERR(dentry));
2405                 inode = dentry->d_inode;
2406                 if (!inode) {
2407                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2408                                o->ioo_id);
2409                         f_dput(dentry);
2410                         GOTO(out, rc = -ENOENT);
2411                 }
2412
2413                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2414                         long block;
2415
2416                         block = rnb->offset >> PAGE_SHIFT;
2417
2418                         if (cmd == OBD_BRW_READ) {
2419                                 block = inode->i_mapping->a_ops->bmap(
2420                                                 inode->i_mapping, block);
2421                         } else {
2422                                 loff_t newsize = rnb->offset + rnb->len;
2423                                 /* fs_prep_san_write will also update inode
2424                                  * size for us:
2425                                  * (1) new alloced block
2426                                  * (2) existed block but size extented
2427                                  */
2428                                 /* FIXME We could call fs_prep_san_write()
2429                                  * only once for all the blocks allocation.
2430                                  * Now call it once for each block, for
2431                                  * simplicity. And if error happens, we
2432                                  * probably need to release previous alloced
2433                                  * block */
2434                                 rc = fs_prep_san_write(obd, inode, &block,
2435                                                        1, newsize);
2436                                 if (rc)
2437                                         break;
2438                         }
2439
2440                         rnb->offset = block;
2441                 }
2442                 f_dput(dentry);
2443         }
2444 out:
2445         RETURN(rc);
2446 }
2447
2448 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2449 {
2450         struct obd_device *obd;
2451         ENTRY;
2452
2453         obd = class_conn2obd(conn);
2454
2455         XPROCFS_BUMP_MYCPU_IOSTAT (st_statfs_reqs, 1);
2456
2457         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2458 }
2459
2460 static int filter_get_info(struct lustre_handle *conn, obd_count keylen,
2461                            void *key, obd_count *vallen, void **val)
2462 {
2463         struct obd_device *obd;
2464         ENTRY;
2465
2466         obd = class_conn2obd(conn);
2467         if (!obd) {
2468                 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2469                 RETURN(-EINVAL);
2470         }
2471
2472         if ( keylen == strlen("blocksize") &&
2473              memcmp(key, "blocksize", keylen) == 0 ) {
2474                 *vallen = sizeof(long);
2475                 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize;
2476                 RETURN(0);
2477         }
2478
2479         if ( keylen == strlen("blocksize_bits") &&
2480              memcmp(key, "blocksize_bits", keylen) == 0 ){
2481                 *vallen = sizeof(long);
2482                 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize_bits;
2483                 RETURN(0);
2484         }
2485
2486         CDEBUG(D_IOCTL, "invalid key\n");
2487         RETURN(-EINVAL);
2488 }
2489
2490 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2491                   struct lustre_handle *src_conn, struct obdo *src,
2492                   obd_size count, obd_off offset, struct obd_trans_info *oti)
2493 {
2494         struct page *page;
2495         struct lov_stripe_md srcmd, dstmd;
2496         unsigned long index = 0;
2497         int err = 0;
2498
2499         memset(&srcmd, 0, sizeof(srcmd));
2500         memset(&dstmd, 0, sizeof(dstmd));
2501         srcmd.lsm_object_id = src->o_id;
2502         dstmd.lsm_object_id = dst->o_id;
2503
2504         ENTRY;
2505         CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2506                ", dst: ino "LPU64"\n",
2507                src->o_id, src->o_blocks, src->o_size, dst->o_id);
2508         page = alloc_page(GFP_USER);
2509         if (page == NULL)
2510                 RETURN(-ENOMEM);
2511
2512 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2513         while (TryLockPage(page))
2514                 ___wait_on_page(page);
2515 #else
2516         wait_on_page_locked(page);
2517 #endif
2518
2519         /* XXX with brw vector I/O, we could batch up reads and writes here,
2520          *     all we need to do is allocate multiple pages to handle the I/Os
2521          *     and arrays to handle the request parameters.
2522          */
2523         while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2524                 struct brw_page pg;
2525                 struct obd_brw_set *set;
2526
2527                 set = obd_brw_set_new();
2528                 if (set == NULL) {
2529                         err = -ENOMEM;
2530                         EXIT;
2531                         break;
2532                 }
2533
2534                 pg.pg = page;
2535                 pg.count = PAGE_SIZE;
2536                 pg.off = (page->index) << PAGE_SHIFT;
2537                 pg.flag = 0;
2538
2539                 page->index = index;
2540                 set->brw_callback = ll_brw_sync_wait;
2541                 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
2542                 obd_brw_set_free(set);
2543                 if (err) {
2544                         EXIT;
2545                         break;
2546                 }
2547
2548                 set = obd_brw_set_new();
2549                 if (set == NULL) {
2550                         err = -ENOMEM;
2551                         EXIT;
2552                         break;
2553                 }
2554                 pg.flag = OBD_BRW_CREATE;
2555                 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2556
2557                 set->brw_callback = ll_brw_sync_wait;
2558                 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
2559                 obd_brw_set_free(set);
2560
2561                 /* XXX should handle dst->o_size, dst->o_blocks here */
2562                 if (err) {
2563                         EXIT;
2564                         break;
2565                 }
2566
2567                 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2568
2569                 index++;
2570         }
2571         dst->o_size = src->o_size;
2572         dst->o_blocks = src->o_blocks;
2573         dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
2574         unlock_page(page);
2575         __free_page(page);
2576
2577         RETURN(err);
2578 }
2579
2580 static struct obd_ops filter_obd_ops = {
2581         o_owner:        THIS_MODULE,
2582         o_attach:       filter_attach,
2583         o_detach:       filter_detach,
2584         o_get_info:     filter_get_info,
2585         o_setup:        filter_setup,
2586         o_cleanup:      filter_cleanup,
2587         o_connect:      filter_connect,
2588         o_disconnect:   filter_disconnect,
2589         o_statfs:       filter_statfs,
2590         o_syncfs:       filter_syncfs,
2591         o_getattr:      filter_getattr,
2592         o_create:       filter_create,
2593         o_setattr:      filter_setattr,
2594         o_destroy:      filter_destroy,
2595         o_open:         filter_open,
2596         o_close:        filter_close,
2597         o_brw:          filter_brw,
2598         o_punch:        filter_truncate,
2599         o_preprw:       filter_preprw,
2600         o_commitrw:     filter_commitrw
2601 #if 0
2602         o_san_preprw:  filter_san_preprw,
2603         o_preallocate: filter_preallocate_inodes,
2604         o_migrate:     filter_migrate,
2605         o_copy:        filter_copy_data,
2606         o_iterate:     filter_iterate
2607 #endif
2608 };
2609
2610 static struct obd_ops filter_sanobd_ops = {
2611         o_owner:        THIS_MODULE,
2612         o_attach:       filter_attach,
2613         o_detach:       filter_detach,
2614         o_get_info:     filter_get_info,
2615         o_setup:        filter_san_setup,
2616         o_cleanup:      filter_cleanup,
2617         o_connect:      filter_connect,
2618         o_disconnect:   filter_disconnect,
2619         o_statfs:       filter_statfs,
2620         o_getattr:      filter_getattr,
2621         o_create:       filter_create,
2622         o_setattr:      filter_setattr,
2623         o_destroy:      filter_destroy,
2624         o_open:         filter_open,
2625         o_close:        filter_close,
2626         o_brw:          filter_brw,
2627         o_punch:        filter_truncate,
2628         o_preprw:       filter_preprw,
2629         o_commitrw:     filter_commitrw,
2630         o_san_preprw:   filter_san_preprw,
2631 #if 0
2632         o_preallocate:  filter_preallocate_inodes,
2633         o_migrate:      filter_migrate,
2634         o_copy:         filter_copy_data,
2635         o_iterate:      filter_iterate
2636 #endif
2637 };
2638
2639
2640 static int __init obdfilter_init(void)
2641 {
2642         struct lprocfs_static_vars lvars;
2643         int rc;
2644
2645         printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2646         filter_open_cache = kmem_cache_create("ll_filter_fdata",
2647                                               sizeof(struct filter_file_data),
2648                                               0, 0, NULL, NULL);
2649         if (!filter_open_cache)
2650                 RETURN(-ENOMEM);
2651
2652         filter_dentry_cache = kmem_cache_create("ll_filter_dentry",
2653                                         sizeof(struct filter_dentry_data),
2654                                         0, 0, NULL, NULL);
2655         if (!filter_dentry_cache) {
2656                 rc = -ENOMEM;
2657                 goto err1;
2658         }
2659
2660         xprocfs_init ("filter");
2661
2662         lprocfs_init_vars(&lvars);
2663
2664         rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2665                                  OBD_FILTER_DEVICENAME);
2666         if (rc)
2667                 goto err2;
2668
2669         rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2670                                  OBD_FILTER_SAN_DEVICENAME);
2671         if (rc)
2672                 goto err3;
2673
2674         return 0;
2675 err3:
2676         class_unregister_type(OBD_FILTER_DEVICENAME);
2677 err2:
2678         kmem_cache_destroy(filter_dentry_cache);
2679 err1:
2680         kmem_cache_destroy(filter_open_cache);
2681         return rc;
2682 }
2683
2684 static void __exit obdfilter_exit(void)
2685 {
2686         class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2687         class_unregister_type(OBD_FILTER_DEVICENAME);
2688         if (kmem_cache_destroy(filter_dentry_cache))
2689                 CERROR("couldn't free obdfilter dentry cache\n");
2690         if (kmem_cache_destroy(filter_open_cache))
2691                 CERROR("couldn't free obdfilter open cache\n");
2692         xprocfs_fini ();
2693 }
2694
2695 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2696 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2697 MODULE_LICENSE("GPL");
2698
2699 module_init(obdfilter_init);
2700 module_exit(obdfilter_exit);