Whamcloud - gitweb
e6c223c6257bd59931a3eacbd713c601fe72442d
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_FILTER
38
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
42 #include <linux/fs.h>
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
54 #endif
55
/*
 * Counter slot numbers (presumably for the lprocfs read/write byte
 * statistics -- confirm against the code that registers them).
 * LPROC_FILTER_LAST is one past the highest slot, i.e. the slot count.
 */
enum {
        LPROC_FILTER_READ_BYTES  = 0,
        LPROC_FILTER_WRITE_BYTES = 1,
        LPROC_FILTER_LAST
};
61
62 #define S_SHIFT 12
63 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
64         [0]                     NULL,
65         [S_IFREG >> S_SHIFT]    "R",
66         [S_IFDIR >> S_SHIFT]    "D",
67         [S_IFCHR >> S_SHIFT]    "C",
68         [S_IFBLK >> S_SHIFT]    "B",
69         [S_IFIFO >> S_SHIFT]    "F",
70         [S_IFSOCK >> S_SHIFT]   "S",
71         [S_IFLNK >> S_SHIFT]    "L"
72 };
73
74 static inline const char *obd_mode_to_type(int mode)
75 {
76         return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
77 }
78
79 static void filter_ffd_addref(void *ffdp)
80 {
81         struct filter_file_data *ffd = ffdp;
82
83         atomic_inc(&ffd->ffd_refcount);
84         CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
85                atomic_read(&ffd->ffd_refcount));
86 }
87
88 static struct filter_file_data *filter_ffd_new(void)
89 {
90         struct filter_file_data *ffd;
91
92         OBD_ALLOC(ffd, sizeof *ffd);
93         if (ffd == NULL) {
94                 CERROR("out of memory\n");
95                 return NULL;
96         }
97
98         atomic_set(&ffd->ffd_refcount, 2);
99
100         INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
101         class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
102
103         return ffd;
104 }
105
106 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
107 {
108         struct filter_file_data *ffd = NULL;
109         ENTRY;
110         LASSERT(handle != NULL);
111         ffd = class_handle2object(handle->cookie);
112         if (ffd != NULL)
113                 LASSERT(ffd->ffd_file->private_data == ffd);
114         RETURN(ffd);
115 }
116
117 static void filter_ffd_put(struct filter_file_data *ffd)
118 {
119         CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
120                atomic_read(&ffd->ffd_refcount) - 1);
121         LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
122                 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
123         if (atomic_dec_and_test(&ffd->ffd_refcount)) {
124                 LASSERT(list_empty(&ffd->ffd_handle.h_link));
125                 OBD_FREE(ffd, sizeof *ffd);
126         }
127 }
128
129 static void filter_ffd_destroy(struct filter_file_data *ffd)
130 {
131         class_handle_unhash(&ffd->ffd_handle);
132         filter_ffd_put(ffd);
133 }
134
135 static void filter_commit_cb(struct obd_device *obd, __u64 transno, int error)
136 {
137         obd_transno_commit_cb(obd, transno, error);
138 }
139 /* Assumes caller has already pushed us into the kernel context. */
140 int filter_finish_transno(struct obd_export *export, void *handle,
141                           struct obd_trans_info *oti, int rc)
142 {
143         __u64 last_rcvd;
144         struct obd_device *obd = export->exp_obd;
145         struct filter_obd *filter = &obd->u.filter;
146         struct filter_export_data *fed = &export->exp_filter_data;
147         struct filter_client_data *fcd = fed->fed_fcd;
148         loff_t off;
149         ssize_t written;
150
151         /* Propagate error code. */
152         if (rc)
153                 RETURN(rc);
154
155         if (!obd->obd_replayable)
156                 RETURN(rc);
157
158         /* we don't allocate new transnos for replayed requests */
159 #if 0
160         /* perhaps if transno already set? or should level be in oti? */
161         if (req->rq_level == LUSTRE_CONN_RECOVD)
162                 GOTO(out, rc = 0);
163 #endif
164
165         off = fed->fed_lr_off;
166
167         spin_lock(&filter->fo_translock);
168         last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
169         filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
170         spin_unlock(&filter->fo_translock);
171         if (oti)
172                 oti->oti_transno = last_rcvd;
173         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
174         fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
175
176         /* get this from oti */
177 #if 0
178         if (oti)
179                 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
180         else
181 #else
182         fcd->fcd_last_xid = 0;
183 #endif
184         fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
185         written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
186                                 &off);
187         CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
188                LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
189
190         if (written == sizeof(*fcd))
191                 RETURN(0);
192         CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
193         if (written >= 0)
194                 RETURN(-EIO);
195
196         RETURN(written);
197 }
198
199 static inline void f_dput(struct dentry *dentry)
200 {
201         /* Can't go inside filter_ddelete because it can block */
202         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
203                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
204         LASSERT(atomic_read(&dentry->d_count) > 0);
205
206         dput(dentry);
207 }
208
209 /* Not racy w.r.t. others, because we are the only user of this dentry */
210 static void filter_drelease(struct dentry *dentry)
211 {
212         if (dentry->d_fsdata)
213                 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
214 }
215
216 struct dentry_operations filter_dops = {
217         .d_release = filter_drelease,
218 };
219
/* Name of the recovery file, opened relative to the filter's root. */
#define LAST_RCVD "last_rcvd"
/* Initial last-object-id stored when a fresh last_rcvd file is created. */
#define INIT_OBJID 2

/* This limit is arbitrary, but for now we fit it in 1 page (32k clients
 * with 4k pages): one bit per client slot. */
#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
/* Number of unsigned longs needed to hold FILTER_LR_MAX_CLIENTS bits.
 * The divisor is bits per long, not bytes per long; dividing by
 * sizeof(unsigned long) alone made the bitmap eight times too big. */
#define FILTER_LR_MAX_CLIENT_WORDS \
        (FILTER_LR_MAX_CLIENTS / (8 * sizeof(unsigned long)))
226
227 /* Add client data to the FILTER.  We use a bitmap to locate a free space
228  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
229  * Otherwise, we have just read the data from the last_rcvd file and
230  * we know its offset.
231  */
232 int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
233                       struct filter_export_data *fed, int cl_idx)
234 {
235         unsigned long *bitmap = filter->fo_last_rcvd_slots;
236         int new_client = (cl_idx == -1);
237
238         LASSERT(bitmap != NULL);
239
240         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
241         if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
242                 RETURN(0);
243
244         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
245          * there's no need for extra complication here
246          */
247         if (new_client) {
248                 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
249         repeat:
250                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
251                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
252                         return -ENOMEM;
253                 }
254                 if (test_and_set_bit(cl_idx, bitmap)) {
255                         CERROR("FILTER client %d: found bit is set in bitmap\n",
256                                cl_idx);
257                         cl_idx = find_next_zero_bit(bitmap,
258                                                     FILTER_LR_MAX_CLIENTS,
259                                                     cl_idx);
260                         goto repeat;
261                 }
262         } else {
263                 if (test_and_set_bit(cl_idx, bitmap)) {
264                         CERROR("FILTER client %d: bit already set in bitmap!\n",
265                                cl_idx);
266                         LBUG();
267                 }
268         }
269
270         fed->fed_lr_idx = cl_idx;
271         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
272                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
273
274         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
275                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
276
277         if (new_client) {
278                 struct obd_run_ctxt saved;
279                 loff_t off = fed->fed_lr_off;
280                 ssize_t written;
281                 void *handle;
282
283                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
284                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
285
286                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
287                 /* Transaction eeded to fix for bug 1403 */
288                 handle = fsfilt_start(obd,
289                                       filter->fo_rcvd_filp->f_dentry->d_inode,
290                                       FSFILT_OP_SETATTR);
291                 if (IS_ERR(handle)) {
292                         written = PTR_ERR(handle);
293                         CERROR("unable to start transaction: rc %d\n",
294                                (int)written);
295                 } else {
296                         written = lustre_fwrite(filter->fo_rcvd_filp,
297                                                 (char *)fed->fed_fcd,
298                                                 sizeof(*fed->fed_fcd), &off);
299                         fsfilt_commit(obd,
300                                       filter->fo_rcvd_filp->f_dentry->d_inode,
301                                       handle, 0);
302                 }
303                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
304
305                 if (written != sizeof(*fed->fed_fcd)) {
306                         if (written < 0)
307                                 RETURN(written);
308                         RETURN(-EIO);
309                 }
310         }
311         return 0;
312 }
313
314 int filter_client_free(struct obd_export *exp, int failover)
315 {
316         struct filter_export_data *fed = &exp->exp_filter_data;
317         struct filter_obd *filter = &exp->exp_obd->u.filter;
318         struct filter_client_data zero_fcd;
319         struct obd_run_ctxt saved;
320         int written;
321         loff_t off;
322
323         if (!fed->fed_fcd)
324                 RETURN(0);
325
326         if (failover != 0) {
327                 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
328                 RETURN(0);
329         }
330
331         LASSERT(filter->fo_last_rcvd_slots != NULL);
332
333         off = fed->fed_lr_off;
334
335         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
336                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
337
338         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
339                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
340                        fed->fed_lr_idx);
341                 LBUG();
342         }
343
344         memset(&zero_fcd, 0, sizeof zero_fcd);
345         push_ctxt(&saved, &filter->fo_ctxt, NULL);
346         written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
347                                 sizeof(zero_fcd), &off);
348
349         /* XXX: this write gets lost sometimes, unless this sync is here. */
350         if (written > 0)
351                 file_fsync(filter->fo_rcvd_filp,
352                            filter->fo_rcvd_filp->f_dentry, 1);
353         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
354
355         if (written != sizeof(zero_fcd)) {
356                 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
357                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
358                        LAST_RCVD, written);
359         } else {
360                 CDEBUG(D_INFO,
361                        "zeroed disconnecting client %s at idx %u (%llu)\n",
362                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
363         }
364
365         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
366
367         return 0;
368 }
369
370 static int filter_free_server_data(struct filter_obd *filter)
371 {
372         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
373         filter->fo_fsd = NULL;
374         OBD_FREE(filter->fo_last_rcvd_slots,
375                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
376         filter->fo_last_rcvd_slots = NULL;
377         return 0;
378 }
379
380
381 /* assumes caller is already in kernel ctxt */
382 static int filter_update_server_data(struct file *filp,
383                                      struct filter_server_data *fsd)
384 {
385         loff_t off = 0;
386         int rc;
387
388         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
389         CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
390                le64_to_cpu(fsd->fsd_last_objid));
391         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
392                le64_to_cpu(fsd->fsd_last_rcvd));
393         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
394                le64_to_cpu(fsd->fsd_mount_count));
395
396         rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
397         if (rc != sizeof(*fsd)) {
398                 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
399                        rc);
400                 RETURN(-EIO);
401         }
402         RETURN(0);
403 }
404
405 /* assumes caller has already in kernel ctxt */
406 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
407                                    __u64 init_lastobjid)
408 {
409         struct filter_obd *filter = &obd->u.filter;
410         struct filter_server_data *fsd;
411         struct filter_client_data *fcd = NULL;
412         struct inode *inode = filp->f_dentry->d_inode;
413         unsigned long last_rcvd_size = inode->i_size;
414         __u64 mount_count = 0;
415         int cl_idx;
416         loff_t off = 0;
417         int rc;
418
419         /* ensure padding in the struct is the correct size */
420         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
421                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
422         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
423                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
424
425         OBD_ALLOC(fsd, sizeof(*fsd));
426         if (!fsd)
427                 RETURN(-ENOMEM);
428         filter->fo_fsd = fsd;
429
430         OBD_ALLOC(filter->fo_last_rcvd_slots,
431                   FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
432         if (filter->fo_last_rcvd_slots == NULL) {
433                 OBD_FREE(fsd, sizeof(*fsd));
434                 RETURN(-ENOMEM);
435         }
436
437         if (last_rcvd_size == 0) {
438                 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
439
440                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
441                 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
442                 fsd->fsd_last_rcvd = 0;
443                 mount_count = fsd->fsd_mount_count = 0;
444                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
445                 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
446                 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
447                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
448                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
449         } else {
450                 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
451                                               &off);
452                 if (retval != sizeof(*fsd)) {
453                         CDEBUG(D_INODE,"OBD filter: error reading %s\n",
454                                LAST_RCVD);
455                         GOTO(err_fsd, rc = -EIO);
456                 }
457                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
458                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
459         }
460
461         if (fsd->fsd_feature_incompat) {
462                 CERROR("unsupported feature %x\n",
463                        le32_to_cpu(fsd->fsd_feature_incompat));
464                 GOTO(err_fsd, rc = -EINVAL);
465         }
466         if (fsd->fsd_feature_rocompat) {
467                 CERROR("read-only feature %x\n",
468                        le32_to_cpu(fsd->fsd_feature_rocompat));
469                 /* Do something like remount filesystem read-only */
470                 GOTO(err_fsd, rc = -EINVAL);
471         }
472
473         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
474                obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
475         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
476                obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
477         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
478                obd->obd_name, mount_count);
479         CDEBUG(D_INODE, "%s: server data size: %u\n",
480                obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
481         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
482                obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
483         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
484                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
485         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
486                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
487
488         /*
489          * When we do a clean FILTER shutdown, we save the last_rcvd into
490          * the header.  If we find clients with higher last_rcvd values
491          * then those clients may need recovery done.
492          */
493         if (!obd->obd_replayable) {
494                 CERROR("%s: recovery support OFF\n", obd->obd_name);
495                 GOTO(out, rc = 0);
496         }
497
498         for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
499                 __u64 last_rcvd;
500                 int mount_age;
501
502                 if (!fcd) {
503                         OBD_ALLOC(fcd, sizeof(*fcd));
504                         if (!fcd)
505                                 GOTO(err_fsd, rc = -ENOMEM);
506                 }
507
508                 /* Don't assume off is incremented properly, in case
509                  * sizeof(fsd) isn't the same as fsd->fsd_client_size.
510                  */
511                 off = le32_to_cpu(fsd->fsd_client_start) +
512                         cl_idx * le16_to_cpu(fsd->fsd_client_size);
513                 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
514                 if (rc != sizeof(*fcd)) {
515                         CERROR("error reading FILTER %s offset %d: rc = %d\n",
516                                LAST_RCVD, cl_idx, rc);
517                         if (rc > 0) /* XXX fatal error or just abort reading? */
518                                 rc = -EIO;
519                         break;
520                 }
521
522                 if (fcd->fcd_uuid[0] == '\0') {
523                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
524                                cl_idx);
525                         continue;
526                 }
527
528                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
529
530                 /* These exports are cleaned up by filter_disconnect(), so they
531                  * need to be set up like real exports as filter_connect() does.
532                  */
533                 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
534                 if (mount_age < FILTER_MOUNT_RECOV) {
535                         struct obd_export *exp = class_new_export(obd);
536                         struct filter_export_data *fed;
537                         CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
538                                " srv lr: "LPU64" mnt: "LPU64" last mount: "
539                                LPU64"\n", fcd->fcd_uuid, cl_idx,
540                                last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
541                                le64_to_cpu(fcd->fcd_mount_count), mount_count);
542                         if (exp == NULL) {
543                                 /* XXX this rc is ignored  */
544                                 rc = -ENOMEM;
545                                 break;
546                         }
547                         memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
548                                sizeof exp->exp_client_uuid.uuid);
549                         fed = &exp->exp_filter_data;
550                         fed->fed_fcd = fcd;
551                         filter_client_add(obd, filter, fed, cl_idx);
552                         /* create helper if export init gets more complex */
553                         INIT_LIST_HEAD(&fed->fed_open_head);
554                         spin_lock_init(&fed->fed_lock);
555
556                         fcd = NULL;
557                         obd->obd_recoverable_clients++;
558                         class_export_put(exp);
559                 } else {
560                         CDEBUG(D_INFO,
561                                "discarded client %d UUID '%s' count "LPU64"\n",
562                                cl_idx, fcd->fcd_uuid,
563                                le64_to_cpu(fcd->fcd_mount_count));
564                 }
565
566                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
567                        cl_idx, last_rcvd);
568
569                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
570                         filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
571
572                 obd->obd_last_committed =
573                         le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
574                 if (obd->obd_recoverable_clients) {
575                         CERROR("RECOVERY: %d recoverable clients, last_rcvd "
576                                LPU64"\n", obd->obd_recoverable_clients,
577                                le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
578                         obd->obd_next_recovery_transno =
579                                 obd->obd_last_committed + 1;
580                         obd->obd_recovering = 1;
581                 }
582
583         }
584
585         if (fcd)
586                 OBD_FREE(fcd, sizeof(*fcd));
587
588 out:
589         fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
590
591         /* save it,so mount count and last_recvd is current */
592         rc = filter_update_server_data(filp, filter->fo_fsd);
593
594         RETURN(rc);
595
596 err_fsd:
597         filter_free_server_data(filter);
598         RETURN(rc);
599 }
600
601 /* setup the object store with correct subdirectories */
602 static int filter_prep(struct obd_device *obd)
603 {
604         struct obd_run_ctxt saved;
605         struct filter_obd *filter = &obd->u.filter;
606         struct dentry *dentry, *O_dentry;
607         struct file *file;
608         struct inode *inode;
609         int i;
610         int rc = 0;
611         int mode = 0;
612
613         push_ctxt(&saved, &filter->fo_ctxt, NULL);
614         dentry = simple_mkdir(current->fs->pwd, "O", 0700);
615         CDEBUG(D_INODE, "got/created O: %p\n", dentry);
616         if (IS_ERR(dentry)) {
617                 rc = PTR_ERR(dentry);
618                 CERROR("cannot open/create O: rc = %d\n", rc);
619                 GOTO(out, rc);
620         }
621         filter->fo_dentry_O = dentry;
622
623         /*
624          * Create directories and/or get dentries for each object type.
625          * This saves us from having to do multiple lookups for each one.
626          */
627         O_dentry = filter->fo_dentry_O;
628         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
629                 char *name = obd_type_by_mode[mode];
630
631                 if (!name) {
632                         filter->fo_dentry_O_mode[mode] = NULL;
633                         continue;
634                 }
635                 dentry = simple_mkdir(O_dentry, name, 0700);
636                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
637                 if (IS_ERR(dentry)) {
638                         rc = PTR_ERR(dentry);
639                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
640                         GOTO(err_O_mode, rc);
641                 }
642                 filter->fo_dentry_O_mode[mode] = dentry;
643         }
644
645         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
646         if (!file || IS_ERR(file)) {
647                 rc = PTR_ERR(file);
648                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
649                        LAST_RCVD, rc);
650                 GOTO(err_O_mode, rc);
651         }
652
653         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
654                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
655                        file->f_dentry->d_inode->i_mode);
656                 GOTO(err_filp, rc = -ENOENT);
657         }
658
659         rc = fsfilt_journal_data(obd, file);
660         if (rc) {
661                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
662                 GOTO(err_filp, rc);
663         }
664         /* steal operations */
665         inode = file->f_dentry->d_inode;
666         filter->fo_fop = file->f_op;
667         filter->fo_iop = inode->i_op;
668         filter->fo_aops = inode->i_mapping->a_ops;
669
670         rc = filter_init_server_data(obd, file, INIT_OBJID);
671         if (rc) {
672                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
673                 GOTO(err_client, rc);
674         }
675         filter->fo_rcvd_filp = file;
676
677         if (filter->fo_subdir_count) {
678                 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
679                 OBD_ALLOC(filter->fo_dentry_O_sub,
680                           filter->fo_subdir_count * sizeof(dentry));
681                 if (!filter->fo_dentry_O_sub)
682                         GOTO(err_client, rc = -ENOMEM);
683
684                 for (i = 0; i < filter->fo_subdir_count; i++) {
685                         char dir[20];
686                         snprintf(dir, sizeof(dir), "d%u", i);
687
688                         dentry = simple_mkdir(O_dentry, dir, 0700);
689                         CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
690                         if (IS_ERR(dentry)) {
691                                 rc = PTR_ERR(dentry);
692                                 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
693                                 GOTO(err_O_sub, rc);
694                         }
695                         filter->fo_dentry_O_sub[i] = dentry;
696                 }
697         }
698         rc = 0;
699  out:
700         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
701
702         return(rc);
703
704 err_O_sub:
705         while (i-- > 0) {
706                 struct dentry *dentry = filter->fo_dentry_O_sub[i];
707                 if (dentry) {
708                         f_dput(dentry);
709                         filter->fo_dentry_O_sub[i] = NULL;
710                 }
711         }
712         OBD_FREE(filter->fo_dentry_O_sub,
713                  filter->fo_subdir_count * sizeof(dentry));
714 err_client:
715         class_disconnect_exports(obd, 0);
716 err_filp:
717         if (filp_close(file, 0))
718                 CERROR("can't close %s after error\n", LAST_RCVD);
719         filter->fo_rcvd_filp = NULL;
720 err_O_mode:
721         while (mode-- > 0) {
722                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
723                 if (dentry) {
724                         f_dput(dentry);
725                         filter->fo_dentry_O_mode[mode] = NULL;
726                 }
727         }
728         f_dput(filter->fo_dentry_O);
729         filter->fo_dentry_O = NULL;
730         goto out;
731 }
732
733 /* cleanup the filter: write last used object id to status file */
734 static void filter_post(struct obd_device *obd)
735 {
736         struct obd_run_ctxt saved;
737         struct filter_obd *filter = &obd->u.filter;
738         long rc;
739         int mode;
740
741         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
742          * best to start a transaction with h_sync, because we removed this
743          * from lastobjid */
744
745         push_ctxt(&saved, &filter->fo_ctxt, NULL);
746         rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
747         if (rc)
748                 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
749
750
751         if (filter->fo_rcvd_filp) {
752                 rc = file_fsync(filter->fo_rcvd_filp,
753                                 filter->fo_rcvd_filp->f_dentry, 1);
754                 filp_close(filter->fo_rcvd_filp, 0);
755                 filter->fo_rcvd_filp = NULL;
756                 if (rc)
757                         CERROR("last_rcvd file won't closed rc = %ld\n", rc);
758         }
759
760         if (filter->fo_subdir_count) {
761                 int i;
762                 for (i = 0; i < filter->fo_subdir_count; i++) {
763                         struct dentry *dentry = filter->fo_dentry_O_sub[i];
764                         f_dput(dentry);
765                         filter->fo_dentry_O_sub[i] = NULL;
766                 }
767                 OBD_FREE(filter->fo_dentry_O_sub,
768                          filter->fo_subdir_count *
769                          sizeof(*filter->fo_dentry_O_sub));
770         }
771         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
772                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
773                 if (dentry) {
774                         f_dput(dentry);
775                         filter->fo_dentry_O_mode[mode] = NULL;
776                 }
777         }
778         f_dput(filter->fo_dentry_O);
779         filter_free_server_data(filter);
780         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
781 }
782
783
784 static __u64 filter_next_id(struct filter_obd *filter)
785 {
786         obd_id id;
787         LASSERT(filter->fo_fsd != NULL);
788
789         spin_lock(&filter->fo_objidlock);
790         id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
791         filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
792         spin_unlock(&filter->fo_objidlock);
793
794         return id;
795 }
796
797 /* direct cut-n-paste of mds_blocking_ast() */
798 int filter_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
799                      void *data, int flag)
800 {
801         int do_ast;
802         ENTRY;
803
804         if (flag == LDLM_CB_CANCELING) {
805                 /* Don't need to do anything here. */
806                 RETURN(0);
807         }
808
809         /* XXX layering violation!  -phil */
810         l_lock(&lock->l_resource->lr_namespace->ns_lock);
811         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
812          * such that mds_blocking_ast is called just before l_i_p takes the
813          * ns_lock, then by the time we get the lock, we might not be the
814          * correct blocking function anymore.  So check, and return early, if
815          * so. */
816         if (lock->l_blocking_ast != filter_blocking_ast) {
817                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
818                 RETURN(0);
819         }
820
821         lock->l_flags |= LDLM_FL_CBPENDING;
822         do_ast = (!lock->l_readers && !lock->l_writers);
823         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
824
825         if (do_ast) {
826                 struct lustre_handle lockh;
827                 int rc;
828
829                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
830                 ldlm_lock2handle(lock, &lockh);
831                 rc = ldlm_cli_cancel(&lockh);
832                 if (rc < 0)
833                         CERROR("ldlm_cli_cancel: %d\n", rc);
834         } else {
835                 LDLM_DEBUG(lock, "Lock still has references, will be "
836                            "cancelled later");
837         }
838         RETURN(0);
839 }
840
841 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
842                               ldlm_mode_t lock_mode,struct lustre_handle *lockh)
843 {
844         struct ldlm_res_id res_id = { .name = {0} };
845         int flags = 0, rc;
846         ENTRY;
847
848         res_id.name[0] = de->d_inode->i_ino;
849         res_id.name[1] = de->d_inode->i_generation;
850         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
851                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
852                               &flags, ldlm_completion_ast,
853                               filter_blocking_ast, NULL, lockh);
854
855         RETURN(rc == ELDLM_OK ? 0 : -ENOLCK);  /* XXX translate ldlm code */
856 }
857
858 static void filter_parent_unlock(struct dentry *dparent,
859                                  struct lustre_handle *lockh,
860                                  ldlm_mode_t lock_mode)
861 {
862         ldlm_lock_decref(lockh, lock_mode);
863 }
864
865 /* We never dget the object parent, so DON'T dput it either */
866 static inline struct dentry *filter_parent(struct obd_device *obd,
867                                            obd_mode mode, obd_id objid)
868 {
869         struct filter_obd *filter = &obd->u.filter;
870
871         LASSERT(S_ISREG(mode));   /* only regular files for now */
872         if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
873                 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
874
875         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
876 }
877
878 /* We never dget the object parent, so DON'T dput it either */
879 static inline struct dentry *filter_parent_lock(struct obd_device *obd,
880                                                 obd_mode mode, obd_id objid,
881                                                 ldlm_mode_t lock_mode,
882                                                 struct lustre_handle *lockh)
883 {
884         unsigned long now = jiffies;
885         struct dentry *de = filter_parent(obd, mode, objid);
886         int rc;
887
888         if (IS_ERR(de))
889                 return de;
890
891         rc = filter_lock_dentry(obd, de, lock_mode, lockh);
892         if (time_after(jiffies, now + 15*HZ))
893                 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
894         return rc ? ERR_PTR(rc) : de;
895 }
896
897 /* How to get files, dentries, inodes from object id's.
898  *
899  * If dir_dentry is passed, the caller has already locked the parent
900  * appropriately for this operation (normally a write lock).  If
901  * dir_dentry is NULL, we do a read lock while we do the lookup to
902  * avoid races with create/destroy and such changing the directory
903  * internal to the filesystem code.
904  */
905 static struct dentry *filter_fid2dentry(struct obd_device *obd,
906                                         struct dentry *dir_dentry,
907                                         obd_mode mode, obd_id id)
908 {
909         struct super_block *sb = obd->u.filter.fo_sb;
910         struct lustre_handle lockh;
911         struct dentry *dparent = dir_dentry;
912         struct dentry *dchild;
913         char name[32];
914         int len;
915         ENTRY;
916
917         if (!sb || !sb->s_dev) {
918                 CERROR("device not initialized.\n");
919                 RETURN(ERR_PTR(-ENXIO));
920         }
921
922         if (id == 0) {
923                 CERROR("fatal: invalid object id 0\n");
924                 LBUG();
925                 RETURN(ERR_PTR(-ESTALE));
926         }
927
928         len = sprintf(name, LPU64, id);
929         if (!dir_dentry) {
930                 dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
931                 if (IS_ERR(dparent))
932                         RETURN(dparent);
933         }
934         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
935                dparent->d_name.len, dparent->d_name.name, name);
936         dchild = ll_lookup_one_len(name, dparent, len);
937         if (!dir_dentry)
938                 filter_parent_unlock(dparent, &lockh, LCK_PR);
939         if (IS_ERR(dchild)) {
940                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
941                 RETURN(dchild);
942         }
943
944         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
945                name, dchild, atomic_read(&dchild->d_count));
946
947         LASSERT(atomic_read(&dchild->d_count) > 0);
948
949         RETURN(dchild);
950 }
951
952 static struct file *filter_obj_open(struct obd_export *export,
953                                     __u64 id, __u32 type,
954                                     ldlm_mode_t parent_mode,
955                                     struct lustre_handle *parent_lockh)
956 {
957         struct obd_device *obd = export->exp_obd;
958         struct filter_obd *filter = &obd->u.filter;
959         struct super_block *sb = filter->fo_sb;
960         struct dentry *dchild = NULL, *dparent = NULL;
961         struct filter_export_data *fed = &export->exp_filter_data;
962         struct filter_dentry_data *fdd = NULL;
963         struct filter_file_data *ffd = NULL;
964         struct obd_run_ctxt saved;
965         char name[24];
966         struct file *file;
967         int len, cleanup_phase = 0;
968         ENTRY;
969
970         push_ctxt(&saved, &filter->fo_ctxt, NULL);
971
972         if (!sb || !sb->s_dev) {
973                 CERROR("fatal: device not initialized.\n");
974                 GOTO(cleanup, file = ERR_PTR(-ENXIO));
975         }
976
977         if (!id) {
978                 CERROR("fatal: invalid obdo "LPU64"\n", id);
979                 GOTO(cleanup, file = ERR_PTR(-ESTALE));
980         }
981
982         if (!(type & S_IFMT)) {
983                 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
984                        __FUNCTION__, id, type);
985                 GOTO(cleanup, file = ERR_PTR(-EINVAL));
986         }
987
988         ffd = filter_ffd_new();
989         if (ffd == NULL) {
990                 CERROR("obdfilter: out of memory\n");
991                 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
992         }
993
994         cleanup_phase = 1;
995
996         /* We preallocate this to avoid blocking while holding fo_fddlock */
997         OBD_ALLOC(fdd, sizeof *fdd);
998         if (fdd == NULL) {
999                 CERROR("obdfilter: out of memory\n");
1000                 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1001         }
1002
1003         cleanup_phase = 2;
1004
1005         dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1006         if (IS_ERR(dparent))
1007                 GOTO(cleanup, file = (void *)dparent);
1008
1009         cleanup_phase = 3;
1010
1011         len = snprintf(name, sizeof(name), LPU64, id);
1012         dchild = ll_lookup_one_len(name, dparent, len);
1013         if (IS_ERR(dchild))
1014                 GOTO(cleanup, file = (void *)dchild);
1015
1016         cleanup_phase = 4;
1017
1018         if (dchild->d_inode == NULL) {
1019                 CERROR("opening non-existent object %s - O_CREAT?\n", name);
1020                 file = ERR_PTR(-ENOENT);
1021                 GOTO(cleanup, file);
1022         }
1023
1024         /* dentry_open does a dput(dchild) and mntput(mnt) on error */
1025         mntget(filter->fo_vfsmnt);
1026         file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1027         if (IS_ERR(file)) {
1028                 dchild = NULL; /* prevent a double dput in step 4 */
1029                 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1030                 GOTO(cleanup, file);
1031         }
1032
1033         spin_lock(&filter->fo_fddlock);
1034         if (dchild->d_fsdata) {
1035                 spin_unlock(&filter->fo_fddlock);
1036                 OBD_FREE(fdd, sizeof *fdd);
1037                 fdd = dchild->d_fsdata;
1038                 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1039                 /* should only happen during client recovery */
1040                 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1041                         CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1042                 atomic_inc(&fdd->fdd_open_count);
1043         } else {
1044                 atomic_set(&fdd->fdd_open_count, 1);
1045                 fdd->fdd_magic = FILTER_DENTRY_MAGIC;
1046                 fdd->fdd_flags = 0;
1047                 fdd->fdd_objid = id;
1048                 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1049                 dchild->d_fsdata = fdd;
1050                 spin_unlock(&filter->fo_fddlock);
1051         }
1052
1053         ffd->ffd_file = file;
1054         LASSERT(file->private_data == NULL);
1055         file->private_data = ffd;
1056
1057         if (!dchild->d_op)
1058                 dchild->d_op = &filter_dops;
1059         else
1060                 LASSERT(dchild->d_op == &filter_dops);
1061
1062         spin_lock(&fed->fed_lock);
1063         list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1064         spin_unlock(&fed->fed_lock);
1065
1066         CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1067 cleanup:
1068         switch (cleanup_phase) {
1069         case 4:
1070                 if (IS_ERR(file))
1071                         f_dput(dchild);
1072         case 3:
1073                 if (IS_ERR(file))
1074                         filter_parent_unlock(dparent, parent_lockh,parent_mode);
1075         case 2:
1076                 if (IS_ERR(file))
1077                         OBD_FREE(fdd, sizeof *fdd);
1078         case 1:
1079                 if (IS_ERR(file))
1080                         filter_ffd_destroy(ffd);
1081                 filter_ffd_put(ffd);
1082         case 0:
1083                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1084         }
1085         RETURN(file);
1086 }
1087
1088 /* Caller must hold LCK_PW on parent and push us into kernel context.
1089  * Caller is also required to ensure that dchild->d_inode exists.
1090  */
1091 static int filter_destroy_internal(struct obd_device *obd,
1092                                    struct dentry *dparent,
1093                                    struct dentry *dchild)
1094 {
1095         struct inode *inode = dchild->d_inode;
1096         int rc;
1097         ENTRY;
1098
1099         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1100                 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1101                        dchild->d_name.len, dchild->d_name.name,
1102                        inode->i_nlink, atomic_read(&inode->i_count));
1103         }
1104
1105         rc = vfs_unlink(dparent->d_inode, dchild);
1106
1107         if (rc)
1108                 CERROR("error unlinking objid %*s: rc %d\n",
1109                        dchild->d_name.len, dchild->d_name.name, rc);
1110
1111         RETURN(rc);
1112 }
1113
1114 /* If closing because we are failing this device, then
1115    don't do the unlink on close.
1116 */
1117 static int filter_close_internal(struct obd_export *exp,
1118                                  struct filter_file_data *ffd,
1119                                  struct obd_trans_info *oti,
1120                                  int failover)
1121 {
1122         struct obd_device *obd = exp->exp_obd;
1123         struct filter_obd *filter = &obd->u.filter;
1124         struct file *filp = ffd->ffd_file;
1125         struct dentry *dchild = dget(filp->f_dentry);
1126         struct filter_dentry_data *fdd = dchild->d_fsdata;
1127         struct lustre_handle parent_lockh;
1128         int rc, rc2, cleanup_phase = 0;
1129         struct dentry *dparent;
1130         struct obd_run_ctxt saved;
1131         ENTRY;
1132
1133         LASSERT(filp->private_data == ffd);
1134         LASSERT(fdd);
1135         LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1136
1137         rc = filp_close(filp, 0);
1138
1139         if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1140             fdd->fdd_flags & FILTER_FLAG_DESTROY && !failover) {
1141                 void *handle;
1142
1143                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1144                 cleanup_phase = 1;
1145
1146                 LASSERT(fdd->fdd_objid > 0);
1147                 dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1148                                              LCK_PW, &parent_lockh);
1149                 if (IS_ERR(dparent))
1150                         GOTO(cleanup, rc = PTR_ERR(dparent));
1151                 cleanup_phase = 2;
1152
1153                 handle = fsfilt_start(obd, dparent->d_inode,
1154                                       FSFILT_OP_UNLINK);
1155                 if (IS_ERR(handle))
1156                         GOTO(cleanup, rc = PTR_ERR(handle));
1157
1158                 /* XXX unlink from PENDING directory now too */
1159                 rc2 = filter_destroy_internal(obd, dparent, dchild);
1160                 if (rc2 && !rc)
1161                         rc = rc2;
1162                 rc = filter_finish_transno(exp, handle, oti, rc);
1163                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1164                 if (rc2) {
1165                         CERROR("error on commit, err = %d\n", rc2);
1166                         if (!rc)
1167                                 rc = rc2;
1168                 }
1169         }
1170
1171 cleanup:
1172         switch(cleanup_phase) {
1173         case 2:
1174                 if (rc || oti == NULL) {
1175                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1176                 } else {
1177                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1178                                sizeof(parent_lockh));
1179                         oti->oti_ack_locks[0].mode = LCK_PW;
1180                 }
1181         case 1:
1182                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1183         case 0:
1184                 f_dput(dchild);
1185                 filter_ffd_destroy(ffd);
1186                 break;
1187         default:
1188                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1189                 LBUG();
1190         }
1191
1192         RETURN(rc);
1193 }
1194
1195 /* obd methods */
1196 /* mount the file system (secretly) */
1197 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1198                                char *option)
1199 {
1200         struct obd_ioctl_data* data = buf;
1201         struct filter_obd *filter = &obd->u.filter;
1202
1203         struct vfsmount *mnt;
1204         int rc = 0;
1205         ENTRY;
1206
1207         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1208                 RETURN(-EINVAL);
1209
1210         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1211         if (IS_ERR(obd->obd_fsops))
1212                 RETURN(PTR_ERR(obd->obd_fsops));
1213
1214         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
1215         rc = PTR_ERR(mnt);
1216         if (IS_ERR(mnt))
1217                 GOTO(err_ops, rc);
1218
1219         if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1220                 if (*data->ioc_inlbuf3 == 'f') {
1221                         obd->obd_replayable = 1;
1222                         obd_sync_filter = 1;
1223                         CERROR("%s: configured for recovery and sync write\n",
1224                                obd->obd_name);
1225                 } else {
1226                         if (*data->ioc_inlbuf3 != 'n') {
1227                                 CERROR("unrecognised flag '%c'\n",
1228                                        *data->ioc_inlbuf3);
1229                         }
1230                 }
1231         }
1232
1233         if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
1234                 if (*data->ioc_inlbuf4 == '/') {
1235                         CERROR("filter namespace mount: %s\n",
1236                                data->ioc_inlbuf4);
1237                         filter->fo_nspath = strdup(data->ioc_inlbuf4);
1238                 } else {
1239                         CERROR("namespace mount must be absolute path: '%s'\n",
1240                                data->ioc_inlbuf4);
1241                 }
1242         }
1243
1244         filter->fo_vfsmnt = mnt;
1245         filter->fo_sb = mnt->mnt_sb;
1246         filter->fo_fstype = mnt->mnt_sb->s_type->name;
1247         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1248
1249         OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1250         filter->fo_ctxt.pwdmnt = mnt;
1251         filter->fo_ctxt.pwd = mnt->mnt_root;
1252         filter->fo_ctxt.fs = get_ds();
1253
1254         rc = filter_prep(obd);
1255         if (rc)
1256                 GOTO(err_mntput, rc);
1257
1258         spin_lock_init(&filter->fo_translock);
1259         spin_lock_init(&filter->fo_fddlock);
1260         spin_lock_init(&filter->fo_objidlock);
1261         INIT_LIST_HEAD(&filter->fo_export_list);
1262
1263         obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1264                                                 LDLM_NAMESPACE_SERVER);
1265         if (!obd->obd_namespace)
1266                 GOTO(err_post, rc = -ENOMEM);
1267
1268         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1269                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1270
1271         RETURN(0);
1272
1273 err_post:
1274         filter_post(obd);
1275 err_mntput:
1276         unlock_kernel();
1277         mntput(mnt);
1278         filter->fo_sb = 0;
1279         lock_kernel();
1280 err_ops:
1281         fsfilt_put_ops(obd->obd_fsops);
1282         return rc;
1283 }
1284
1285 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1286 {
1287         struct obd_ioctl_data* data = buf;
1288         char *option = NULL;
1289
1290         if (!strcmp(data->ioc_inlbuf2, "ext3"))
1291                 option = "asyncdel";
1292
1293         return filter_common_setup(obd, len, buf, option);
1294 }
1295
1296 /* sanobd setup methods - use a specific mount option */
1297 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1298 {
1299         struct obd_ioctl_data* data = buf;
1300         char *option = NULL;
1301
1302         if (!data->ioc_inlbuf2)
1303                 RETURN(-EINVAL);
1304
1305         /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1306         if (!strcmp(data->ioc_inlbuf2, "extN"))
1307                 option = "data=writeback";
1308         else if (!strcmp(data->ioc_inlbuf2, "ext3"))
1309                 option = "data=writeback,asyncdel";
1310         else
1311                 LBUG(); /* just a reminder */
1312
1313         return filter_common_setup(obd, len, buf, option);
1314 }
1315
1316 static int filter_cleanup(struct obd_device *obd, int force, int failover)
1317 {
1318         struct super_block *sb;
1319         ENTRY;
1320
1321         if (failover)
1322                 CERROR("%s: shutting down for failover; client state will"
1323                        " be preserved.\n", obd->obd_name);
1324
1325         if (!list_empty(&obd->obd_exports)) {
1326                 CERROR("%s: still has clients!\n", obd->obd_name);
1327                 class_disconnect_exports(obd, failover);
1328                 if (!list_empty(&obd->obd_exports)) {
1329                         CERROR("still has exports after forced cleanup?\n");
1330                         RETURN(-EBUSY);
1331                 }
1332         }
1333
1334         ldlm_namespace_free(obd->obd_namespace);
1335
1336         sb = obd->u.filter.fo_sb;
1337         if (!sb)
1338                 RETURN(0);
1339
1340         filter_post(obd);
1341
1342         shrink_dcache_parent(sb->s_root);
1343         unlock_kernel();
1344
1345         if (atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count) > 1){
1346                 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1347                        atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count));
1348         }
1349
1350         mntput(obd->u.filter.fo_vfsmnt);
1351         obd->u.filter.fo_sb = 0;
1352 /*        destroy_buffers(obd->u.filter.fo_sb->s_dev);*/
1353
1354         fsfilt_put_ops(obd->obd_fsops);
1355         lock_kernel();
1356
1357         RETURN(0);
1358 }
1359
1360 int filter_attach(struct obd_device *obd, obd_count len, void *data)
1361 {
1362         struct lprocfs_static_vars lvars;
1363         int rc;
1364
1365         lprocfs_init_vars(&lvars);
1366         rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1367         if (rc != 0)
1368                 return rc;
1369
1370         rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1371         if (rc != 0)
1372                 return rc;
1373
1374         /* Init obdfilter private stats here */
1375         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1376                              LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1377         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1378                              LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
1379         return rc;
1380 }
1381
/* Undo filter_attach(): free the stats block, then remove the lprocfs
 * registration.  Returns the lprocfs_obd_detach() result. */
int filter_detach(struct obd_device *dev)
{
        int rc;

        lprocfs_free_obd_stats(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
1387
1388 /* nearly identical to mds_connect */
1389 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1390                           struct obd_uuid *cluuid)
1391 {
1392         struct obd_export *exp;
1393         struct filter_export_data *fed;
1394         struct filter_client_data *fcd;
1395         struct filter_obd *filter = &obd->u.filter;
1396         int rc;
1397
1398         ENTRY;
1399
1400         if (!conn || !obd || !cluuid)
1401                 RETURN(-EINVAL);
1402
1403         rc = class_connect(conn, obd, cluuid);
1404         if (rc)
1405                 RETURN(rc);
1406         exp = class_conn2export(conn);
1407         LASSERT(exp);
1408
1409         fed = &exp->exp_filter_data;
1410         class_export_put(exp);
1411
1412         INIT_LIST_HEAD(&fed->fed_open_head);
1413         spin_lock_init(&fed->fed_lock);
1414
1415         if (!obd->obd_replayable)
1416                 RETURN(0);
1417
1418         OBD_ALLOC(fcd, sizeof(*fcd));
1419         if (!fcd) {
1420                 CERROR("filter: out of memory for client data\n");
1421                 GOTO(out_export, rc = -ENOMEM);
1422         }
1423
1424         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1425         fed->fed_fcd = fcd;
1426         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1427
1428         rc = filter_client_add(obd, filter, fed, -1);
1429         if (rc)
1430                 GOTO(out_fcd, rc);
1431
1432         RETURN(rc);
1433
1434 out_fcd:
1435         OBD_FREE(fcd, sizeof(*fcd));
1436 out_export:
1437         class_disconnect(conn, 0);
1438
1439         RETURN(rc);
1440 }
1441
1442 static void filter_destroy_export(struct obd_export *exp)
1443 {
1444         struct filter_export_data *fed = &exp->exp_filter_data;
1445
1446         ENTRY;
1447         spin_lock(&fed->fed_lock);
1448         while (!list_empty(&fed->fed_open_head)) {
1449                 struct filter_file_data *ffd;
1450
1451                 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1452                                  ffd_export_list);
1453                 list_del(&ffd->ffd_export_list);
1454                 spin_unlock(&fed->fed_lock);
1455
1456                 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1457                        ffd->ffd_file->f_dentry->d_name.len,
1458                        ffd->ffd_file->f_dentry->d_name.name,
1459                        ffd, ffd->ffd_handle.h_cookie);
1460
1461                 filter_close_internal(exp, ffd, NULL, exp->exp_failover);
1462                 spin_lock(&fed->fed_lock);
1463         }
1464         spin_unlock(&fed->fed_lock);
1465
1466         if (exp->exp_obd->obd_replayable)
1467                 filter_client_free(exp, exp->exp_failover);
1468         EXIT;
1469 }
1470
1471 /* also incredibly similar to mds_disconnect */
1472 static int filter_disconnect(struct lustre_handle *conn, int failover)
1473 {
1474         struct obd_export *exp = class_conn2export(conn);
1475         int rc;
1476         unsigned long flags;
1477         ENTRY;
1478
1479         LASSERT(exp);
1480         ldlm_cancel_locks_for_export(exp);
1481
1482         spin_lock_irqsave(&exp->exp_lock, flags);
1483         exp->exp_failover = failover;
1484         spin_unlock_irqrestore(&exp->exp_lock, flags);
1485
1486         rc = class_disconnect(conn, failover);
1487
1488         fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1489         class_export_put(exp);
1490         /* XXX cleanup preallocated inodes */
1491         RETURN(rc);
1492 }
1493
1494 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1495 {
1496         int type = oa->o_mode & S_IFMT;
1497         ENTRY;
1498
1499         CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1500                inode->i_ino, inode, oa->o_id, valid);
1501         /* Don't copy the inode number in place of the object ID */
1502         obdo_from_inode(oa, inode, valid);
1503         oa->o_mode &= ~S_IFMT;
1504         oa->o_mode |= type;
1505
1506         if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1507                 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1508                 oa->o_rdev = rdev;
1509                 oa->o_valid |= OBD_MD_FLRDEV;
1510         }
1511
1512         EXIT;
1513 }
1514
1515 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1516                                          struct obdo *oa, char *what)
1517 {
1518         struct dentry *dchild = NULL;
1519
1520         if (oa->o_valid & OBD_MD_FLHANDLE) {
1521                 struct lustre_handle *ost_handle = obdo_handle(oa);
1522                 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1523
1524                 if (ffd != NULL) {
1525                         struct filter_dentry_data *fdd;
1526                         dchild = dget(ffd->ffd_file->f_dentry);
1527                         fdd = dchild->d_fsdata;
1528                         LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1529                         filter_ffd_put(ffd);
1530
1531                         CDEBUG(D_INODE,
1532                                "got child objid %*s: %p, count = %d\n",
1533                                dchild->d_name.len, dchild->d_name.name,
1534                                dchild, atomic_read(&dchild->d_count));
1535                 }
1536         }
1537
1538         if (!dchild) {
1539                 struct obd_device *obd = class_conn2obd(conn);
1540
1541                 if (!obd) {
1542                         CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1543                         RETURN(ERR_PTR(-EINVAL));
1544                 }
1545                 dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
1546         }
1547
1548         if (IS_ERR(dchild)) {
1549                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1550                 RETURN(dchild);
1551         }
1552
1553         if (!dchild->d_inode) {
1554                 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1555                 f_dput(dchild);
1556                 RETURN(ERR_PTR(-ENOENT));
1557         }
1558
1559         return dchild;
1560 }
1561
1562 #define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__)
1563
1564 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1565                           struct lov_stripe_md *md)
1566 {
1567         struct dentry *dentry = NULL;
1568         int rc = 0;
1569         ENTRY;
1570
1571         dentry = filter_oa2dentry(conn, oa);
1572         if (IS_ERR(dentry))
1573                 RETURN(PTR_ERR(dentry));
1574
1575         filter_from_inode(oa, dentry->d_inode, oa->o_valid);
1576
1577         f_dput(dentry);
1578         RETURN(rc);
1579 }
1580
1581 /* this is called from filter_truncate() until we have filter_punch() */
1582 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1583                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1584 {
1585         struct obd_run_ctxt saved;
1586         struct obd_export *export = class_conn2export(conn);
1587         struct obd_device *obd = class_conn2obd(conn);
1588         struct filter_obd *filter = &obd->u.filter;
1589         struct dentry *dentry;
1590         struct iattr iattr;
1591         struct inode *inode;
1592         void * handle;
1593         int rc, rc2;
1594         ENTRY;
1595
1596         dentry = filter_oa2dentry(conn, oa);
1597
1598         if (IS_ERR(dentry))
1599                 GOTO(out_exp, rc = PTR_ERR(dentry));
1600
1601         iattr_from_obdo(&iattr, oa, oa->o_valid);
1602         iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1603         inode = dentry->d_inode;
1604
1605         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1606         lock_kernel();
1607         if (iattr.ia_valid & ATTR_SIZE)
1608                 down(&inode->i_sem);
1609
1610         handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1611         if (IS_ERR(handle))
1612                 GOTO(out_unlock, rc = PTR_ERR(handle));
1613
1614         rc = fsfilt_setattr(obd, dentry, handle, &iattr, 1);
1615         rc = filter_finish_transno(export, handle, oti, rc);
1616         rc2 = fsfilt_commit(obd, dentry->d_inode, handle, 0);
1617         if (rc2) {
1618                 CERROR("error on commit, err = %d\n", rc2);
1619                 if (!rc)
1620                         rc = rc2;
1621         }
1622
1623         if (iattr.ia_valid & ATTR_SIZE) {
1624                 up(&inode->i_sem);
1625                 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1626                 obdo_from_inode(oa, inode, oa->o_valid);
1627         }
1628
1629 out_unlock:
1630         unlock_kernel();
1631         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1632
1633         f_dput(dentry);
1634  out_exp:
1635         class_export_put(export);
1636         RETURN(rc);
1637 }
1638
1639 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1640                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
1641                        struct obd_client_handle *och)
1642 {
1643         struct obd_export *export = NULL;
1644         struct lustre_handle *handle;
1645         struct filter_file_data *ffd;
1646         struct file *filp;
1647         struct lustre_handle parent_lockh;
1648         int rc = 0;
1649         ENTRY;
1650
1651         export = class_conn2export(conn);
1652         if (!export) {
1653                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1654                        conn->cookie);
1655                 GOTO(out, rc = -EINVAL);
1656         }
1657
1658         filp = filter_obj_open(export, oa->o_id, oa->o_mode,
1659                                LCK_PR, &parent_lockh);
1660         if (IS_ERR(filp))
1661                 GOTO(out, rc = PTR_ERR(filp));
1662
1663         filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1664
1665         ffd = filp->private_data;
1666         handle = obdo_handle(oa);
1667         handle->cookie = ffd->ffd_handle.h_cookie;
1668         oa->o_valid |= OBD_MD_FLHANDLE;
1669
1670 out:
1671         class_export_put(export);
1672         if (!rc) {
1673                 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1674                        sizeof(parent_lockh));
1675                 oti->oti_ack_locks[0].mode = LCK_PR;
1676         }
1677         RETURN(rc);
1678 }
1679
1680 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1681                         struct lov_stripe_md *ea, struct obd_trans_info *oti)
1682 {
1683         struct obd_export *exp = class_conn2export(conn);
1684         struct filter_file_data *ffd;
1685         struct filter_export_data *fed;
1686         int rc;
1687         ENTRY;
1688
1689         if (!exp) {
1690                 CDEBUG(D_IOCTL, "invalid client cookie"LPX64"\n", conn->cookie);
1691                 GOTO(out, rc = -EINVAL);
1692         }
1693
1694         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1695                 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1696                 GOTO(out, rc = -EINVAL);
1697         }
1698
1699         ffd = filter_handle2ffd(obdo_handle(oa));
1700         if (ffd == NULL) {
1701                 CERROR("bad handle ("LPX64") for close\n",
1702                        obdo_handle(oa)->cookie);
1703                 GOTO(out, rc = -ESTALE);
1704         }
1705
1706         fed = &exp->exp_filter_data;
1707         spin_lock(&fed->fed_lock);
1708         list_del(&ffd->ffd_export_list);
1709         spin_unlock(&fed->fed_lock);
1710
1711         rc = filter_close_internal(exp, ffd, oti, 0);
1712         filter_ffd_put(ffd);
1713         GOTO(out, rc);
1714  out:
1715         class_export_put(exp);
1716         return rc;
1717 }
1718
1719 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1720                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
1721 {
1722         struct obd_export *exp;
1723         struct obd_device *obd = class_conn2obd(conn);
1724         struct filter_obd *filter = &obd->u.filter;
1725         struct obd_run_ctxt saved;
1726         struct lustre_handle parent_lockh;
1727         struct dentry *dparent;
1728         struct dentry *dchild = NULL;
1729         struct iattr;
1730         void *handle;
1731         int err, rc, cleanup_phase;
1732         ENTRY;
1733
1734         if (!obd) {
1735                 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1736                 RETURN(-EINVAL);
1737         }
1738
1739         exp = class_conn2export(conn);
1740
1741         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1742  retry:
1743         oa->o_id = filter_next_id(filter);
1744
1745         cleanup_phase = 0;
1746         dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1747                                      &parent_lockh);
1748         if (IS_ERR(dparent))
1749                 GOTO(cleanup, rc = PTR_ERR(dparent));
1750         cleanup_phase = 1;
1751
1752         dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1753         if (IS_ERR(dchild))
1754                 GOTO(cleanup, rc = PTR_ERR(dchild));
1755         if (dchild->d_inode) {
1756                 /* This would only happen if lastobjid was bad on disk */
1757                 CERROR("Serious error: objid %*s already exists; is this "
1758                        "filesystem corrupt?  I will try to work around it.\n",
1759                        dchild->d_name.len, dchild->d_name.name);
1760                 f_dput(dchild);
1761                 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1762                 goto retry;
1763         }
1764
1765         cleanup_phase = 2;
1766         handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE);
1767         if (IS_ERR(handle))
1768                 GOTO(cleanup, rc = PTR_ERR(handle));
1769
1770         rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
1771         if (rc)
1772                 CERROR("create failed rc = %d\n", rc);
1773
1774         rc = filter_finish_transno(exp, handle, oti, rc);
1775         err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1776         if (err) {
1777                 CERROR("unable to write lastobjid but file created\n");
1778                 if (!rc)
1779                         rc = err;
1780         }
1781         err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1782         if (err) {
1783                 CERROR("error on commit, err = %d\n", err);
1784                 if (!rc)
1785                         rc = err;
1786         }
1787
1788         if (rc)
1789                 GOTO(cleanup, rc);
1790
1791         /* Set flags for fields we have set in the inode struct */
1792         oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1793                  OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1794         filter_from_inode(oa, dchild->d_inode, oa->o_valid);
1795
1796         EXIT;
1797 cleanup:
1798         switch(cleanup_phase) {
1799         case 2:
1800                 f_dput(dchild);
1801         case 1: /* locked parent dentry */
1802                 if (rc || oti == NULL) {
1803                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1804                 } else {
1805                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1806                                sizeof(parent_lockh));
1807                         oti->oti_ack_locks[0].mode = LCK_PW;
1808                 }
1809         case 0:
1810                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1811                 class_export_put(exp);
1812                 break;
1813         default:
1814                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1815                 LBUG();
1816         }
1817
1818         RETURN(rc);
1819 }
1820
1821 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1822                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
1823 {
1824         struct obd_export *exp;
1825         struct obd_device *obd = class_conn2obd(conn);
1826         struct filter_obd *filter = &obd->u.filter;
1827         struct dentry *dparent, *dchild = NULL;
1828         struct filter_dentry_data *fdd;
1829         struct obd_run_ctxt saved;
1830         void *handle = NULL;
1831         struct lustre_handle parent_lockh;
1832         int rc, rc2, cleanup_phase = 0;
1833         ENTRY;
1834
1835         if (!obd) {
1836                 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1837                 RETURN(-EINVAL);
1838         }
1839
1840         exp = class_conn2export(conn);
1841
1842         CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1843
1844         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1845         dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1846                                      LCK_PW, &parent_lockh);
1847         if (IS_ERR(dparent))
1848                 GOTO(cleanup, rc = PTR_ERR(dparent));
1849         cleanup_phase = 1;
1850
1851         dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1852         if (IS_ERR(dchild))
1853                 GOTO(cleanup, rc = -ENOENT);
1854         cleanup_phase = 2;
1855
1856         if (!dchild->d_inode) {
1857                 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
1858                 GOTO(cleanup, rc = -ENOENT);
1859         }
1860
1861         handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK);
1862         if (IS_ERR(handle))
1863                 GOTO(cleanup, rc = PTR_ERR(handle));
1864         cleanup_phase = 3;
1865
1866         fdd = dchild->d_fsdata;
1867         if (fdd && atomic_read(&fdd->fdd_open_count)) {
1868                 LASSERT(fdd->fdd_magic = FILTER_DENTRY_MAGIC);
1869                 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1870                         fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1871                         /* XXX put into PENDING directory in case of crash */
1872                         CDEBUG(D_INODE,
1873                                "defer destroy of %dx open objid "LPU64"\n",
1874                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1875                 } else
1876                         CDEBUG(D_INODE,
1877                                "repeat destroy of %dx open objid "LPU64"\n",
1878                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1879                 GOTO(cleanup, rc = 0);
1880         }
1881
1882         rc = filter_destroy_internal(obd, dparent, dchild);
1883
1884 cleanup:
1885         switch(cleanup_phase) {
1886         case 3:
1887                 rc = filter_finish_transno(exp, handle, oti, rc);
1888                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1889                 if (rc2) {
1890                         CERROR("error on commit, err = %d\n", rc2);
1891                         if (!rc)
1892                                 rc = rc2;
1893                 }
1894         case 2:
1895                 f_dput(dchild);
1896         case 1:
1897                 if (rc || oti == NULL) {
1898                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1899                 } else {
1900                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1901                                sizeof(parent_lockh));
1902                         oti->oti_ack_locks[0].mode = LCK_PW;
1903                 }
1904         case 0:
1905                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1906                 class_export_put(exp);
1907                 break;
1908         default:
1909                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1910                 LBUG();
1911         }
1912
1913         RETURN(rc);
1914 }
1915
1916 /* NB start and end are used for punch, but not truncate */
1917 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1918                            struct lov_stripe_md *lsm,
1919                            obd_off start, obd_off end,
1920                            struct obd_trans_info *oti)
1921 {
1922         int error;
1923         ENTRY;
1924
1925         if (end != OBD_OBJECT_EOF)
1926                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
1927                        end);
1928
1929         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
1930                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1931         oa->o_size = start;
1932         error = filter_setattr(conn, oa, NULL, oti);
1933         RETURN(error);
1934 }
1935
/* Release one page cache reference on @page (page_cache_release()). */
static inline void lustre_put_page(struct page *page)
{
        page_cache_release(page);
}
1940
1941 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
1942 {
1943         struct address_space *mapping = inode->i_mapping;
1944         struct page *page;
1945         unsigned long index = lnb->offset >> PAGE_SHIFT;
1946         int rc;
1947
1948         page = grab_cache_page(mapping, index); /* locked page */
1949         if (IS_ERR(page))
1950                 return lnb->rc = PTR_ERR(page);
1951
1952         lnb->page = page;
1953
1954         if (inode->i_size < lnb->offset + lnb->len - 1)
1955                 lnb->rc = inode->i_size - lnb->offset;
1956         else
1957                 lnb->rc = lnb->len;
1958
1959         if (PageUptodate(page)) {
1960                 unlock_page(page);
1961                 return 0;
1962         }
1963
1964         rc = mapping->a_ops->readpage(NULL, page);
1965         if (rc < 0) {
1966                 CERROR("page index %lu, rc = %d\n", index, rc);
1967                 lnb->page = NULL;
1968                 lustre_put_page(page);
1969                 return lnb->rc = rc;
1970         }
1971
1972         return 0;
1973 }
1974
1975 static int filter_finish_page_read(struct niobuf_local *lnb)
1976 {
1977         if (lnb->page == NULL)
1978                 return 0;
1979
1980         if (PageUptodate(lnb->page))
1981                 return 0;
1982
1983         wait_on_page(lnb->page);
1984         if (!PageUptodate(lnb->page)) {
1985                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
1986                        lnb->page->index, lnb->offset);
1987                 GOTO(err_page, lnb->rc = -EIO);
1988         }
1989         if (PageError(lnb->page)) {
1990                 CERROR("page index %lu/offset "LPX64" has error\n",
1991                        lnb->page->index, lnb->offset);
1992                 GOTO(err_page, lnb->rc = -EIO);
1993         }
1994
1995         return 0;
1996
1997 err_page:
1998         lustre_put_page(lnb->page);
1999         lnb->page = NULL;
2000         return lnb->rc;
2001 }
2002
2003 static struct page *lustre_get_page_write(struct inode *inode,
2004                                           unsigned long index)
2005 {
2006         struct address_space *mapping = inode->i_mapping;
2007         struct page *page;
2008         int rc;
2009
2010         page = grab_cache_page(mapping, index); /* locked page */
2011
2012         if (!IS_ERR(page)) {
2013                 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
2014                  * a no-op for most filesystems, because we write the whole
2015                  * page.  For partial-page I/O this will read in the page.
2016                  */
2017                 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
2018                 if (rc) {
2019                         CERROR("page index %lu, rc = %d\n", index, rc);
2020                         if (rc != -ENOSPC)
2021                                 LBUG();
2022                         GOTO(err_unlock, rc);
2023                 }
2024                 /* XXX not sure if we need this if we are overwriting page */
2025                 if (PageError(page)) {
2026                         CERROR("error on page index %lu, rc = %d\n", index, rc);
2027                         LBUG();
2028                         GOTO(err_unlock, rc = -EIO);
2029                 }
2030         }
2031         return page;
2032
2033 err_unlock:
2034         unlock_page(page);
2035         lustre_put_page(page);
2036         return ERR_PTR(rc);
2037 }
2038
2039 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2040 int waitfor_one_page(struct page *page)
2041 {
2042         wait_on_page_locked(page);
2043         return 0;
2044 }
2045 #endif
2046
2047 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2048 /* We should only change the file mtime (and not the ctime, like
2049  * update_inode_times() in generic_file_write()) when we only change data.
2050  */
2051 static inline void inode_update_time(struct inode *inode, int ctime_too)
2052 {
2053         time_t now = CURRENT_TIME;
2054         if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
2055                 return;
2056         inode->i_mtime = now;
2057         if (ctime_too)
2058                 inode->i_ctime = now;
2059         mark_inode_dirty_sync(inode);
2060 }
2061 #endif
2062
2063 static int lustre_commit_write(struct niobuf_local *lnb)
2064 {
2065         struct page *page = lnb->page;
2066         unsigned from = lnb->offset & ~PAGE_MASK;
2067         unsigned to = from + lnb->len;
2068         struct inode *inode = page->mapping->host;
2069         int err;
2070
2071         LASSERT(to <= PAGE_SIZE);
2072         err = page->mapping->a_ops->commit_write(NULL, page, from, to);
2073         if (!err && IS_SYNC(inode))
2074                 err = waitfor_one_page(page);
2075         //SetPageUptodate(page); // the client commit_write will do this
2076
2077         SetPageReferenced(page);
2078         unlock_page(page);
2079         lustre_put_page(page);
2080         return err;
2081 }
2082
2083 int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
2084                           int *pglocked)
2085 {
2086         unsigned long index = lnb->offset >> PAGE_SHIFT;
2087         struct address_space *mapping = inode->i_mapping;
2088         struct page *page;
2089         int rc;
2090
2091         //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
2092         if (*pglocked)
2093                 page = grab_cache_page_nowait(mapping, index); /* locked page */
2094         else
2095                 page = grab_cache_page(mapping, index); /* locked page */
2096
2097
2098         /* This page is currently locked, so get a temporary page instead. */
2099         if (!page) {
2100                 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
2101                 page = alloc_pages(GFP_KERNEL, 0); /* locked page */
2102                 if (!page) {
2103                         CERROR("no memory for a temp page\n");
2104                         GOTO(err, rc = -ENOMEM);
2105                 }
2106                 page->index = index;
2107                 lnb->page = page;
2108                 lnb->flags |= N_LOCAL_TEMP_PAGE;
2109         } else if (!IS_ERR(page)) {
2110                 (*pglocked)++;
2111
2112                 rc = mapping->a_ops->prepare_write(NULL, page,
2113                                                    lnb->offset & ~PAGE_MASK,
2114                                                    lnb->len);
2115                 if (rc) {
2116                         if (rc != -ENOSPC)
2117                                 CERROR("page index %lu, rc = %d\n", index, rc);
2118                         GOTO(err_unlock, rc);
2119                 }
2120                 /* XXX not sure if we need this if we are overwriting page */
2121                 if (PageError(page)) {
2122                         CERROR("error on page index %lu, rc = %d\n", index, rc);
2123                         LBUG();
2124                         GOTO(err_unlock, rc = -EIO);
2125                 }
2126                 lnb->page = page;
2127         }
2128
2129         return 0;
2130
2131 err_unlock:
2132         unlock_page(page);
2133         lustre_put_page(page);
2134 err:
2135         return lnb->rc = rc;
2136 }
2137
2138 /*
2139  * We need to balance prepare_write() calls with commit_write() calls.
2140  * If the page has been prepared, but we have no data for it, we don't
2141  * want to overwrite valid data on disk, but we still need to zero out
2142  * data for space which was newly allocated.  Like part of what happens
2143  * in __block_prepare_write() for newly allocated blocks.
2144  *
2145  * XXX currently __block_prepare_write() creates buffers for all the
2146  *     pages, and the filesystems mark these buffers as BH_New if they
2147  *     were newly allocated from disk. We use the BH_New flag similarly.
2148  */
2149 static int filter_commit_write(struct niobuf_local *lnb, int err)
2150 {
2151 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2152         if (err) {
2153                 unsigned block_start, block_end;
2154                 struct buffer_head *bh, *head = lnb->page->buffers;
2155                 unsigned blocksize = head->b_size;
2156
2157                 /* debugging: just seeing if this ever happens */
2158                 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
2159                        "called for ino %lu:%lu on err %d\n",
2160                        lnb->page->mapping->host->i_ino, lnb->page->index, err);
2161
2162                 /* Currently one buffer per page, but in the future... */
2163                 for (bh = head, block_start = 0; bh != head || !block_start;
2164                      block_start = block_end, bh = bh->b_this_page) {
2165                         block_end = block_start + blocksize;
2166                         if (buffer_new(bh)) {
2167                                 memset(kmap(lnb->page) + block_start, 0,
2168                                        blocksize);
2169                                 kunmap(lnb->page);
2170                         }
2171                 }
2172         }
2173 #endif
2174         return lustre_commit_write(lnb);
2175 }
2176
2177 static int filter_preprw(int cmd, struct obd_export *exp,
2178                          int objcount, struct obd_ioobj *obj,
2179                          int niocount, struct niobuf_remote *nb,
2180                          struct niobuf_local *res, void **desc_private,
2181                          struct obd_trans_info *oti)
2182 {
2183         struct obd_run_ctxt saved;
2184         struct obd_device *obd;
2185         struct obd_ioobj *o;
2186         struct niobuf_remote *rnb;
2187         struct niobuf_local *lnb;
2188         struct fsfilt_objinfo *fso;
2189         struct dentry *dentry;
2190         struct inode *inode;
2191         int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
2192         unsigned long now = jiffies;
2193         ENTRY;
2194
2195         memset(res, 0, niocount * sizeof(*res));
2196
2197         obd = exp->exp_obd;
2198         if (obd == NULL)
2199                 RETURN(-EINVAL);
2200
2201         // theoretically we support multi-obj BRW RPCs, but until then...
2202         LASSERT(objcount == 1);
2203
2204         OBD_ALLOC(fso, objcount * sizeof(*fso));
2205         if (!fso)
2206                 RETURN(-ENOMEM);
2207
2208         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2209
2210         for (i = 0, o = obj; i < objcount; i++, o++) {
2211                 struct filter_dentry_data *fdd;
2212
2213                 LASSERT(o->ioo_bufcnt);
2214
2215                 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
2216
2217                 if (IS_ERR(dentry))
2218                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
2219
2220                 fso[i].fso_dentry = dentry;
2221                 fso[i].fso_bufcnt = o->ioo_bufcnt;
2222
2223                 if (!dentry->d_inode) {
2224                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2225                                o->ioo_id);
2226                         f_dput(dentry);
2227                         GOTO(out_objinfo, rc = -ENOENT);
2228                 }
2229
2230                 /* If we ever start to support mutli-object BRW RPCs, we will
2231                  * need to get locks on mulitple inodes (in order) or use the
2232                  * DLM to do the locking for us (and use the same locking in
2233                  * filter_setattr() for truncate).  That isn't all, because
2234                  * there still exists the possibility of a truncate starting
2235                  * a new transaction while holding the ext3 rwsem = write
2236                  * while some writes (which have started their transactions
2237                  * here) blocking on the ext3 rwsem = read => lock inversion.
2238                  *
2239                  * The handling gets very ugly when dealing with locked pages.
2240                  * It may be easier to just get rid of the locked page code
2241                  * (which has problems of its own) and either discover we do
2242                  * not need it anymore (i.e. it was a symptom of another bug)
2243                  * or ensure we get the page locks in an appropriate order.
2244                  */
2245                 if (cmd & OBD_BRW_WRITE)
2246                         down(&dentry->d_inode->i_sem);
2247                 fdd = dentry->d_fsdata;
2248                 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2249                         CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2250                                o->ioo_id);
2251         }
2252
2253         if (time_after(jiffies, now + 15*HZ))
2254                 CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
2255
2256         if (cmd & OBD_BRW_WRITE) {
2257                 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2258                                                  niocount, nb);
2259                 if (IS_ERR(*desc_private)) {
2260                         rc = PTR_ERR(*desc_private);
2261                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2262                                "error starting transaction: rc = %d\n", rc);
2263                         *desc_private = NULL;
2264                         GOTO(out_objinfo, rc);
2265                 }
2266         }
2267
2268         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
2269                 dentry = fso[i].fso_dentry;
2270                 inode = dentry->d_inode;
2271
2272                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2273                         if (j == 0)
2274                                 lnb->dentry = dentry;
2275                         else
2276                                 lnb->dentry = dget(dentry);
2277
2278                         lnb->offset = rnb->offset;
2279                         lnb->len    = rnb->len;
2280                         lnb->flags  = rnb->flags;
2281                         lnb->start  = jiffies;
2282
2283                         if (cmd & OBD_BRW_WRITE) {
2284                                 rc = filter_get_page_write(inode,lnb,&pglocked);
2285                                 if (rc)
2286                                         up(&dentry->d_inode->i_sem);
2287                         } else if (inode->i_size <= rnb->offset) {
2288                                 /* If there's no more data, abort early.
2289                                  * lnb->page == NULL and lnb->rc == 0, so it's
2290                                  * easy to detect later. */
2291                                 f_dput(dentry);
2292                                 lnb->dentry = NULL;
2293                                 break;
2294                         } else {
2295                                 rc = filter_start_page_read(inode, lnb);
2296                         }
2297
2298                         if (rc) {
2299                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2300                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
2301                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
2302                                        dentry, rc);
2303                                 f_dput(dentry);
2304                                 GOTO(out_pages, rc);
2305                         }
2306
2307                         tot_bytes += lnb->len;
2308
2309                         if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
2310                                 /* Likewise with a partial read */
2311                                 break;
2312                         }
2313                 }
2314         }
2315
2316         if (time_after(jiffies, now + 15*HZ))
2317                 CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
2318
2319         if (cmd & OBD_BRW_READ) {
2320                 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
2321                                     tot_bytes);
2322                 while (lnb-- > res) {
2323                         rc = filter_finish_page_read(lnb);
2324                         if (rc) {
2325                                 CERROR("error page %u@"LPU64" %u %p: rc %d\n",
2326                                        lnb->len, lnb->offset, lnb - res,
2327                                        lnb->dentry, rc);
2328                                 f_dput(lnb->dentry);
2329                                 GOTO(out_pages, rc);
2330                         }
2331                 }
2332         } else
2333                 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
2334                                     tot_bytes);
2335
2336         if (time_after(jiffies, now + 15*HZ))
2337                 CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
2338
2339         EXIT;
2340 out:
2341         OBD_FREE(fso, objcount * sizeof(*fso));
2342         current->journal_info = NULL;
2343         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2344         return rc;
2345
2346 out_pages:
2347         while (lnb-- > res) {
2348                 if (cmd & OBD_BRW_WRITE) {
2349                         filter_commit_write(lnb, rc);
2350                         up(&lnb->dentry->d_inode->i_sem);
2351                 } else {
2352                         lustre_put_page(lnb->page);
2353                 }
2354                 f_dput(lnb->dentry);
2355         }
2356         if (cmd & OBD_BRW_WRITE) {
2357                 filter_finish_transno(exp, *desc_private, oti, rc);
2358                 fsfilt_commit(obd,
2359                               filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2360                               *desc_private, 0);
2361         }
2362         goto out; /* dropped the dentry refs already (one per page) */
2363
2364 out_objinfo:
2365         for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
2366                 if (cmd & OBD_BRW_WRITE)
2367                         up(&fso[i].fso_dentry->d_inode->i_sem);
2368                 f_dput(fso[i].fso_dentry);
2369         }
2370         goto out;
2371 }
2372
2373 static int filter_write_locked_page(struct niobuf_local *lnb)
2374 {
2375         struct page *lpage;
2376         void        *lpage_addr;
2377         void        *lnb_addr;
2378         int rc;
2379         ENTRY;
2380
2381         lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2382         if (IS_ERR(lpage)) {
2383                 /* It is highly unlikely that we would ever get an error here.
2384                  * The page we want to get was previously locked, so it had to
2385                  * have already allocated the space, and we were just writing
2386                  * over the same data, so there would be no hole in the file.
2387                  *
2388                  * XXX: possibility of a race with truncate could exist, need
2389                  *      to check that.  There are no guarantees w.r.t.
2390                  *      write order even on a local filesystem, although the
2391                  *      normal response would be to return the number of bytes
2392                  *      successfully written and leave the rest to the app.
2393                  */
2394                 rc = PTR_ERR(lpage);
2395                 CERROR("error getting locked page index %ld: rc = %d\n",
2396                        lnb->page->index, rc);
2397                 LBUG();
2398                 lustre_commit_write(lnb);
2399                 RETURN(rc);
2400         }
2401
2402         /* 2 kmaps == vanishingly small deadlock opportunity */
2403         lpage_addr = kmap(lpage);
2404         lnb_addr = kmap(lnb->page);
2405
2406         memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
2407
2408         kunmap(lnb->page);
2409         kunmap(lpage);
2410
2411         lustre_put_page(lnb->page);
2412
2413         lnb->page = lpage;
2414         rc = lustre_commit_write(lnb);
2415         if (rc)
2416                 CERROR("error committing locked page %ld: rc = %d\n",
2417                        lnb->page->index, rc);
2418
2419         RETURN(rc);
2420 }
2421
2422 static int filter_syncfs(struct obd_export *exp)
2423 {
2424         struct obd_device *obd = exp->exp_obd;
2425         ENTRY;
2426
2427         RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
2428 }
2429
2430 static int filter_commitrw(int cmd, struct obd_export *exp,
2431                            int objcount, struct obd_ioobj *obj,
2432                            int niocount, struct niobuf_local *res,
2433                            void *desc_private, struct obd_trans_info *oti)
2434 {
2435         struct obd_run_ctxt saved;
2436         struct obd_ioobj *o;
2437         struct niobuf_local *lnb;
2438         struct obd_device *obd = exp->exp_obd;
2439         int found_locked = 0, rc = 0, i;
2440         unsigned long now = jiffies;  /* DEBUGGING OST TIMEOUTS */
2441         ENTRY;
2442
2443         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2444
2445         LASSERT(!current->journal_info);
2446         current->journal_info = desc_private;
2447
2448         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2449                 int j;
2450
2451                 if (cmd & OBD_BRW_WRITE) {
2452                         inode_update_time(lnb->dentry->d_inode, 1);
2453                         up(&lnb->dentry->d_inode->i_sem);
2454                 }
2455                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2456                         if (lnb->page == NULL) {
2457                                 continue;
2458                         }
2459
2460                         if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2461                                 found_locked++;
2462                                 continue;
2463                         }
2464
2465                         if (time_after(jiffies, lnb->start + 15*HZ))
2466                                 CERROR("slow commitrw %lus\n",
2467                                        (jiffies - lnb->start) / HZ);
2468
2469                         if (cmd & OBD_BRW_WRITE) {
2470                                 int err = filter_commit_write(lnb, 0);
2471
2472                                 if (!rc)
2473                                         rc = err;
2474                         } else {
2475                                 lustre_put_page(lnb->page);
2476                         }
2477
2478                         f_dput(lnb->dentry);
2479                         if (time_after(jiffies, lnb->start + 15*HZ))
2480                                 CERROR("slow commit_write %lus\n",
2481                                        (jiffies - lnb->start) / HZ);
2482                 }
2483         }
2484
2485         for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2486              i++, o++) {
2487                 int j;
2488                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2489                         int err;
2490                         if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2491                                 continue;
2492
2493                         if (time_after(jiffies, lnb->start + 15*HZ))
2494                                 CERROR("slow commitrw locked %lus\n",
2495                                        (jiffies - lnb->start) / HZ);
2496
2497                         err = filter_write_locked_page(lnb);
2498                         if (!rc)
2499                                 rc = err;
2500                         f_dput(lnb->dentry);
2501                         found_locked--;
2502
2503                         if (time_after(jiffies, lnb->start + 15*HZ))
2504                                 CERROR("slow commit_write locked %lus\n",
2505                                        (jiffies - lnb->start) / HZ);
2506                 }
2507         }
2508
2509         if (cmd & OBD_BRW_WRITE) {
2510                 /* We just want any dentry for the commit, for now */
2511                 struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
2512                 int err;
2513
2514                 rc = filter_finish_transno(exp, desc_private, oti, rc);
2515                 err = fsfilt_commit(obd, dparent->d_inode, desc_private,
2516                                     obd_sync_filter);
2517                 if (err)
2518                         rc = err;
2519                 if (obd_sync_filter)
2520                         LASSERT(oti->oti_transno <= obd->obd_last_committed);
2521
2522                 if (time_after(jiffies, now + 15*HZ))
2523                         CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
2524         }
2525
2526         LASSERT(!current->journal_info);
2527
2528         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2529         RETURN(rc);
2530 }
2531
2532 static int filter_brw(int cmd, struct lustre_handle *conn,
2533                       struct lov_stripe_md *lsm, obd_count oa_bufs,
2534                       struct brw_page *pga, struct obd_trans_info *oti)
2535 {
2536         struct obd_export *export = class_conn2export(conn);
2537         struct obd_ioobj        ioo;
2538         struct niobuf_local     *lnb;
2539         struct niobuf_remote    *rnb;
2540         obd_count               i;
2541         void                    *desc_private;
2542         int                     ret = 0;
2543         ENTRY;
2544
2545         if (export == NULL)
2546                 RETURN(-EINVAL);
2547
2548         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2549         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2550
2551         if (lnb == NULL || rnb == NULL)
2552                 GOTO(out, ret = -ENOMEM);
2553
2554         for (i = 0; i < oa_bufs; i++) {
2555                 rnb[i].offset = pga[i].off;
2556                 rnb[i].len = pga[i].count;
2557         }
2558
2559         ioo.ioo_id = lsm->lsm_object_id;
2560         ioo.ioo_gr = 0;
2561         ioo.ioo_type = S_IFREG;
2562         ioo.ioo_bufcnt = oa_bufs;
2563
2564         ret = filter_preprw(cmd, export, 1, &ioo, oa_bufs, rnb, lnb,
2565                             &desc_private, oti);
2566         if (ret != 0)
2567                 GOTO(out, ret);
2568
2569         for (i = 0; i < oa_bufs; i++) {
2570                 void *virt = kmap(pga[i].pg);
2571                 obd_off off = pga[i].off & ~PAGE_MASK;
2572                 void *addr = kmap(lnb[i].page);
2573
2574                 /* 2 kmaps == vanishingly small deadlock opportunity */
2575
2576                 if (cmd & OBD_BRW_WRITE)
2577                         memcpy(addr + off, virt + off, pga[i].count);
2578                 else
2579                         memcpy(virt + off, addr + off, pga[i].count);
2580
2581                 kunmap(addr);
2582                 kunmap(virt);
2583         }
2584
2585         ret = filter_commitrw(cmd, export, 1, &ioo, oa_bufs, lnb, desc_private,
2586                               oti);
2587
2588 out:
2589         if (lnb)
2590                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2591         if (rnb)
2592                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2593         class_export_put(export);
2594         RETURN(ret);
2595 }
2596
2597 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2598                              int objcount, struct obd_ioobj *obj,
2599                              int niocount, struct niobuf_remote *nb)
2600 {
2601         struct obd_device *obd;
2602         struct obd_ioobj *o = obj;
2603         struct niobuf_remote *rnb = nb;
2604         int rc = 0;
2605         int i;
2606         ENTRY;
2607
2608         obd = class_conn2obd(conn);
2609         if (!obd) {
2610                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2611                        conn->cookie);
2612                 RETURN(-EINVAL);
2613         }
2614
2615         for (i = 0; i < objcount; i++, o++) {
2616                 struct dentry *dentry;
2617                 struct inode *inode;
2618                 int (*fs_bmap)(struct address_space *, long);
2619                 int j;
2620
2621                 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
2622                 if (IS_ERR(dentry))
2623                         GOTO(out, rc = PTR_ERR(dentry));
2624                 inode = dentry->d_inode;
2625                 if (!inode) {
2626                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2627                                o->ioo_id);
2628                         f_dput(dentry);
2629                         GOTO(out, rc = -ENOENT);
2630                 }
2631                 fs_bmap = inode->i_mapping->a_ops->bmap;
2632
2633                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2634                         long block;
2635
2636                         block = rnb->offset >> inode->i_blkbits;
2637
2638                         if (cmd == OBD_BRW_READ) {
2639                                 block = fs_bmap(inode->i_mapping, block);
2640                         } else {
2641                                 loff_t newsize = rnb->offset + rnb->len;
2642                                 /* fs_prep_san_write will also update inode
2643                                  * size for us:
2644                                  * (1) new alloced block
2645                                  * (2) existed block but size extented
2646                                  */
2647                                 /* FIXME We could call fs_prep_san_write()
2648                                  * only once for all the blocks allocation.
2649                                  * Now call it once for each block, for
2650                                  * simplicity. And if error happens, we
2651                                  * probably need to release previous alloced
2652                                  * block */
2653                                 rc = fs_prep_san_write(obd, inode, &block,
2654                                                        1, newsize);
2655                                 if (rc)
2656                                         break;
2657                         }
2658
2659                         rnb->offset = block;
2660                 }
2661                 f_dput(dentry);
2662         }
2663 out:
2664         RETURN(rc);
2665 }
2666
2667 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2668 {
2669         struct obd_device *obd;
2670         ENTRY;
2671
2672         obd = class_conn2obd(conn);
2673
2674         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2675 }
2676
2677 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2678                            void *key, __u32 *vallen, void *val)
2679 {
2680         struct obd_device *obd;
2681         ENTRY;
2682
2683         obd = class_conn2obd(conn);
2684         if (!obd) {
2685                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2686                        conn->cookie);
2687                 RETURN(-EINVAL);
2688         }
2689
2690         if (keylen == strlen("blocksize") &&
2691             memcmp(key, "blocksize", keylen) == 0) {
2692                 __u32 *blocksize = val;
2693                 *vallen = sizeof(*blocksize);
2694                 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2695                 RETURN(0);
2696         }
2697
2698         if (keylen == strlen("blocksize_bits") &&
2699             memcmp(key, "blocksize_bits", keylen) == 0) {
2700                 __u32 *blocksize_bits = val;
2701                 *vallen = sizeof(*blocksize_bits);
2702                 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2703                 RETURN(0);
2704         }
2705
2706         CDEBUG(D_IOCTL, "invalid key\n");
2707         RETURN(-EINVAL);
2708 }
2709
2710 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2711                   struct lustre_handle *src_conn, struct obdo *src,
2712                   obd_size count, obd_off offset, struct obd_trans_info *oti)
2713 {
2714         struct page *page;
2715         struct lov_stripe_md srcmd, dstmd;
2716         unsigned long index = 0;
2717         int err = 0;
2718
2719         LBUG(); /* THIS CODE IS NOT CORRECT -phil */
2720
2721         memset(&srcmd, 0, sizeof(srcmd));
2722         memset(&dstmd, 0, sizeof(dstmd));
2723         srcmd.lsm_object_id = src->o_id;
2724         dstmd.lsm_object_id = dst->o_id;
2725
2726         ENTRY;
2727         CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2728                ", dst: ino "LPU64"\n",
2729                src->o_id, src->o_blocks, src->o_size, dst->o_id);
2730         page = alloc_page(GFP_USER);
2731         if (page == NULL)
2732                 RETURN(-ENOMEM);
2733
2734         wait_on_page(page);
2735
2736         /* XXX with brw vector I/O, we could batch up reads and writes here,
2737          *     all we need to do is allocate multiple pages to handle the I/Os
2738          *     and arrays to handle the request parameters.
2739          */
2740         while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2741                 struct brw_page pg;
2742
2743                 pg.pg = page;
2744                 pg.count = PAGE_SIZE;
2745                 pg.off = (page->index) << PAGE_SHIFT;
2746                 pg.flag = 0;
2747
2748                 page->index = index;
2749                 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, NULL);
2750                 if (err) {
2751                         EXIT;
2752                         break;
2753                 }
2754
2755                 pg.flag = OBD_BRW_CREATE;
2756                 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2757
2758                 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, oti);
2759
2760                 /* XXX should handle dst->o_size, dst->o_blocks here */
2761                 if (err) {
2762                         EXIT;
2763                         break;
2764                 }
2765
2766                 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2767
2768                 index++;
2769         }
2770         dst->o_size = src->o_size;
2771         dst->o_blocks = src->o_blocks;
2772         dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
2773         unlock_page(page);
2774         __free_page(page);
2775
2776         RETURN(err);
2777 }
2778
2779 int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2780                   int len, void *karg, void *uarg)
2781 {
2782         struct obd_device *obd = class_conn2obd(conn);
2783
2784         switch (cmd) {
2785         case OBD_IOC_ABORT_RECOVERY:
2786                 CERROR("aborting recovery for device %s\n", obd->obd_name);
2787                 target_abort_recovery(obd);
2788                 RETURN(0);
2789
2790         default:
2791                 RETURN(-EINVAL);
2792         }
2793         RETURN(0);
2794 }
2795
2796
2797 static struct obd_ops filter_obd_ops = {
2798         o_owner:          THIS_MODULE,
2799         o_attach:         filter_attach,
2800         o_detach:         filter_detach,
2801         o_get_info:       filter_get_info,
2802         o_setup:          filter_setup,
2803         o_cleanup:        filter_cleanup,
2804         o_connect:        filter_connect,
2805         o_disconnect:     filter_disconnect,
2806         o_statfs:         filter_statfs,
2807         o_syncfs:         filter_syncfs,
2808         o_getattr:        filter_getattr,
2809         o_create:         filter_create,
2810         o_setattr:        filter_setattr,
2811         o_destroy:        filter_destroy,
2812         o_open:           filter_open,
2813         o_close:          filter_close,
2814         o_brw:            filter_brw,
2815         o_punch:          filter_truncate,
2816         o_preprw:         filter_preprw,
2817         o_commitrw:       filter_commitrw,
2818         o_destroy_export: filter_destroy_export,
2819         o_iocontrol:      filter_iocontrol,
2820 #if 0
2821         o_san_preprw:  filter_san_preprw,
2822         o_preallocate: filter_preallocate_inodes,
2823         o_migrate:     filter_migrate,
2824         o_copy:        filter_copy_data,
2825         o_iterate:     filter_iterate
2826 #endif
2827 };
2828
2829 static struct obd_ops filter_sanobd_ops = {
2830         o_owner:          THIS_MODULE,
2831         o_attach:         filter_attach,
2832         o_detach:         filter_detach,
2833         o_get_info:       filter_get_info,
2834         o_setup:          filter_san_setup,
2835         o_cleanup:        filter_cleanup,
2836         o_connect:        filter_connect,
2837         o_disconnect:     filter_disconnect,
2838         o_statfs:         filter_statfs,
2839         o_getattr:        filter_getattr,
2840         o_create:         filter_create,
2841         o_setattr:        filter_setattr,
2842         o_destroy:        filter_destroy,
2843         o_open:           filter_open,
2844         o_close:          filter_close,
2845         o_brw:            filter_brw,
2846         o_punch:          filter_truncate,
2847         o_preprw:         filter_preprw,
2848         o_commitrw:       filter_commitrw,
2849         o_san_preprw:     filter_san_preprw,
2850         o_destroy_export: filter_destroy_export,
2851         o_iocontrol:      filter_iocontrol,
2852 #if 0
2853         o_preallocate:  filter_preallocate_inodes,
2854         o_migrate:      filter_migrate,
2855         o_copy:         filter_copy_data,
2856         o_iterate:      filter_iterate
2857 #endif
2858 };
2859
2860
2861 static int __init obdfilter_init(void)
2862 {
2863         struct lprocfs_static_vars lvars;
2864         int rc;
2865
2866         printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2867
2868         lprocfs_init_vars(&lvars);
2869
2870         rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2871                                  OBD_FILTER_DEVICENAME);
2872         if (rc)
2873                 return rc;
2874
2875         rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2876                                  OBD_FILTER_SAN_DEVICENAME);
2877         if (rc)
2878                 class_unregister_type(OBD_FILTER_DEVICENAME);
2879         return rc;
2880 }
2881
2882 static void __exit obdfilter_exit(void)
2883 {
2884         class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2885         class_unregister_type(OBD_FILTER_DEVICENAME);
2886 }
2887
2888 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2889 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2890 MODULE_LICENSE("GPL");
2891
2892 module_init(obdfilter_init);
2893 module_exit(obdfilter_exit);