Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_FILTER
38
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
42 #include <linux/fs.h>
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
54 #endif
55
/* Indices for the filter's read/write byte statistics (presumably lprocfs
 * counters, judging by the names).  LPROC_FILTER_LAST is the number of
 * indices, not an index itself. */
enum {
        LPROC_FILTER_READ_BYTES,
        LPROC_FILTER_WRITE_BYTES,
        LPROC_FILTER_LAST
};
61
#define S_SHIFT 12

/* Name of the per-type object directory for each file type, indexed by
 * (mode & S_IFMT) >> S_SHIFT.  Slots that do not correspond to a file type
 * are NULL. */
static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
        [0]                     = NULL,
        [S_IFREG >> S_SHIFT]    = "R",
        [S_IFDIR >> S_SHIFT]    = "D",
        [S_IFCHR >> S_SHIFT]    = "C",
        [S_IFBLK >> S_SHIFT]    = "B",
        [S_IFIFO >> S_SHIFT]    = "F",
        [S_IFSOCK >> S_SHIFT]   = "S",
        [S_IFLNK >> S_SHIFT]    = "L"
};

/* Translate the file-type bits of an inode mode into the matching directory
 * name from obd_type_by_mode.
 *
 * Returns NULL when the type bits do not name a known file type.  This
 * includes the case where all type bits are set: (S_IFMT >> S_SHIFT) is one
 * past the last table slot, so that index must never be dereferenced.
 */
static inline const char *obd_mode_to_type(int mode)
{
        unsigned int idx = (mode & S_IFMT) >> S_SHIFT;

        if (idx >= sizeof(obd_type_by_mode) / sizeof(obd_type_by_mode[0]))
                return NULL;

        return obd_type_by_mode[idx];
}
78
79 static void filter_ffd_addref(void *ffdp)
80 {
81         struct filter_file_data *ffd = ffdp;
82
83         atomic_inc(&ffd->ffd_refcount);
84         CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
85                atomic_read(&ffd->ffd_refcount));
86 }
87
88 static struct filter_file_data *filter_ffd_new(void)
89 {
90         struct filter_file_data *ffd;
91
92         OBD_ALLOC(ffd, sizeof *ffd);
93         if (ffd == NULL) {
94                 CERROR("out of memory\n");
95                 return NULL;
96         }
97
98         atomic_set(&ffd->ffd_refcount, 2);
99
100         INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
101         class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
102
103         return ffd;
104 }
105
106 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
107 {
108         struct filter_file_data *ffd = NULL;
109         ENTRY;
110         LASSERT(handle != NULL);
111         ffd = class_handle2object(handle->cookie);
112         if (ffd != NULL)
113                 LASSERT(ffd->ffd_file->private_data == ffd);
114         RETURN(ffd);
115 }
116
117 static void filter_ffd_put(struct filter_file_data *ffd)
118 {
119         CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
120                atomic_read(&ffd->ffd_refcount) - 1);
121         LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
122                 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
123         if (atomic_dec_and_test(&ffd->ffd_refcount)) {
124                 LASSERT(list_empty(&ffd->ffd_handle.h_link));
125                 OBD_FREE(ffd, sizeof *ffd);
126         }
127 }
128
129 static void filter_ffd_destroy(struct filter_file_data *ffd)
130 {
131         class_handle_unhash(&ffd->ffd_handle);
132         filter_ffd_put(ffd);
133 }
134
135 static void filter_commit_cb(struct obd_device *obd, __u64 transno, int error)
136 {
137         obd_transno_commit_cb(obd, transno, error);
138 }
139 /* Assumes caller has already pushed us into the kernel context. */
140 int filter_finish_transno(struct obd_export *export, void *handle,
141                           struct obd_trans_info *oti, int rc)
142 {
143         __u64 last_rcvd;
144         struct obd_device *obd = export->exp_obd;
145         struct filter_obd *filter = &obd->u.filter;
146         struct filter_export_data *fed = &export->exp_filter_data;
147         struct filter_client_data *fcd = fed->fed_fcd;
148         loff_t off;
149         ssize_t written;
150
151         /* Propagate error code. */
152         if (rc)
153                 RETURN(rc);
154
155         if (!obd->obd_replayable)
156                 RETURN(rc);
157
158         /* we don't allocate new transnos for replayed requests */
159         if (oti && oti->oti_transno == 0) {
160                 spin_lock(&filter->fo_translock);
161                 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd) + 1;
162                 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
163                 spin_unlock(&filter->fo_translock);
164                 oti->oti_transno = last_rcvd;
165                 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
166                 fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
167
168                 /* could get xid from oti, if it's ever needed */
169                 fcd->fcd_last_xid = 0;
170
171                 off = fed->fed_lr_off;
172                 fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
173                 written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, 
174                                         sizeof(*fcd), &off);
175                 CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
176                        "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid, 
177                        fed->fed_lr_idx, written);
178
179                 if (written == sizeof(*fcd))
180                         RETURN(0);
181                 CERROR("error writing to last_rcvd file: rc = %d\n", 
182                        (int)written);
183                 if (written >= 0)
184                         RETURN(-EIO);
185
186                 RETURN(written);
187         }                 
188
189         RETURN(0);
190 }
191
192 static inline void f_dput(struct dentry *dentry)
193 {
194         /* Can't go inside filter_ddelete because it can block */
195         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
196                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
197         LASSERT(atomic_read(&dentry->d_count) > 0);
198
199         dput(dentry);
200 }
201
202 /* Not racy w.r.t. others, because we are the only user of this dentry */
203 static void filter_drelease(struct dentry *dentry)
204 {
205         if (dentry->d_fsdata)
206                 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
207 }
208
209 struct dentry_operations filter_dops = {
210         .d_release = filter_drelease,
211 };
212
/* Name of the file holding the server and per-client recovery records. */
#define LAST_RCVD "last_rcvd"
/* Initial last-object-id value passed to filter_init_server_data(). */
#define INIT_OBJID 2

/* The client-slot bitmap has one bit per client.  This limit is arbitrary,
 * but for now the bitmap fits in 1 page (32k clients with 4kB pages). */
#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
/* Number of unsigned longs needed to hold FILTER_LR_MAX_CLIENTS bits.
 * Divide by the number of BITS per long, not bytes, so that the bitmap
 * really occupies one page instead of eight. */
#define FILTER_LR_MAX_CLIENT_WORDS \
        (FILTER_LR_MAX_CLIENTS / (8 * sizeof(unsigned long)))
219
220 /* Add client data to the FILTER.  We use a bitmap to locate a free space
221  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
222  * Otherwise, we have just read the data from the last_rcvd file and
223  * we know its offset.
224  */
225 int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
226                       struct filter_export_data *fed, int cl_idx)
227 {
228         unsigned long *bitmap = filter->fo_last_rcvd_slots;
229         int new_client = (cl_idx == -1);
230
231         LASSERT(bitmap != NULL);
232
233         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
234         if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
235                 RETURN(0);
236
237         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
238          * there's no need for extra complication here
239          */
240         if (new_client) {
241                 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
242         repeat:
243                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
244                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
245                         return -ENOMEM;
246                 }
247                 if (test_and_set_bit(cl_idx, bitmap)) {
248                         CERROR("FILTER client %d: found bit is set in bitmap\n",
249                                cl_idx);
250                         cl_idx = find_next_zero_bit(bitmap,
251                                                     FILTER_LR_MAX_CLIENTS,
252                                                     cl_idx);
253                         goto repeat;
254                 }
255         } else {
256                 if (test_and_set_bit(cl_idx, bitmap)) {
257                         CERROR("FILTER client %d: bit already set in bitmap!\n",
258                                cl_idx);
259                         LBUG();
260                 }
261         }
262
263         fed->fed_lr_idx = cl_idx;
264         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
265                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
266
267         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
268                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
269
270         if (new_client) {
271                 struct obd_run_ctxt saved;
272                 loff_t off = fed->fed_lr_off;
273                 ssize_t written;
274                 void *handle;
275
276                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
277                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
278
279                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
280                 /* Transaction eeded to fix for bug 1403 */
281                 handle = fsfilt_start(obd,
282                                       filter->fo_rcvd_filp->f_dentry->d_inode,
283                                       FSFILT_OP_SETATTR);
284                 if (IS_ERR(handle)) {
285                         written = PTR_ERR(handle);
286                         CERROR("unable to start transaction: rc %d\n",
287                                (int)written);
288                 } else {
289                         written = lustre_fwrite(filter->fo_rcvd_filp,
290                                                 (char *)fed->fed_fcd,
291                                                 sizeof(*fed->fed_fcd), &off);
292                         fsfilt_commit(obd,
293                                       filter->fo_rcvd_filp->f_dentry->d_inode,
294                                       handle, 0);
295                 }
296                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
297
298                 if (written != sizeof(*fed->fed_fcd)) {
299                         if (written < 0)
300                                 RETURN(written);
301                         RETURN(-EIO);
302                 }
303         }
304         return 0;
305 }
306
307 int filter_client_free(struct obd_export *exp, int failover)
308 {
309         struct filter_export_data *fed = &exp->exp_filter_data;
310         struct filter_obd *filter = &exp->exp_obd->u.filter;
311         struct filter_client_data zero_fcd;
312         struct obd_run_ctxt saved;
313         int written;
314         loff_t off;
315         ENTRY;
316
317         if (!fed->fed_fcd)
318                 RETURN(0);
319
320         if (failover != 0)
321                 GOTO(free, 0);
322
323         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
324         if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
325                 GOTO(free, 0);
326
327         LASSERT(filter->fo_last_rcvd_slots != NULL);
328
329         off = fed->fed_lr_off;
330
331         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
332                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
333
334         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
335                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
336                        fed->fed_lr_idx);
337                 LBUG();
338         }
339
340         memset(&zero_fcd, 0, sizeof zero_fcd);
341         push_ctxt(&saved, &filter->fo_ctxt, NULL);
342         written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
343                                 sizeof(zero_fcd), &off);
344
345         /* XXX: this write gets lost sometimes, unless this sync is here. */
346         if (written > 0)
347                 file_fsync(filter->fo_rcvd_filp,
348                            filter->fo_rcvd_filp->f_dentry, 1);
349         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
350
351         if (written != sizeof(zero_fcd)) {
352                 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
353                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
354                        LAST_RCVD, written);
355         } else {
356                 CDEBUG(D_INFO,
357                        "zeroed disconnecting client %s at idx %u (%llu)\n",
358                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
359         }
360
361 free:
362         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
363
364         RETURN(0);
365 }
366
367 static int filter_free_server_data(struct filter_obd *filter)
368 {
369         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
370         filter->fo_fsd = NULL;
371         OBD_FREE(filter->fo_last_rcvd_slots,
372                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
373         filter->fo_last_rcvd_slots = NULL;
374         return 0;
375 }
376
377
378 /* assumes caller is already in kernel ctxt */
379 static int filter_update_server_data(struct file *filp,
380                                      struct filter_server_data *fsd)
381 {
382         loff_t off = 0;
383         int rc;
384
385         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
386         CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
387                le64_to_cpu(fsd->fsd_last_objid));
388         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
389                le64_to_cpu(fsd->fsd_last_rcvd));
390         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
391                le64_to_cpu(fsd->fsd_mount_count));
392
393         rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
394         if (rc != sizeof(*fsd)) {
395                 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
396                        rc);
397                 RETURN(-EIO);
398         }
399         RETURN(0);
400 }
401
402 /* assumes caller has already in kernel ctxt */
403 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
404                                    __u64 init_lastobjid)
405 {
406         struct filter_obd *filter = &obd->u.filter;
407         struct filter_server_data *fsd;
408         struct filter_client_data *fcd = NULL;
409         struct inode *inode = filp->f_dentry->d_inode;
410         unsigned long last_rcvd_size = inode->i_size;
411         __u64 mount_count = 0;
412         int cl_idx;
413         loff_t off = 0;
414         int rc;
415
416         /* ensure padding in the struct is the correct size */
417         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
418                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
419         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
420                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
421
422         OBD_ALLOC(fsd, sizeof(*fsd));
423         if (!fsd)
424                 RETURN(-ENOMEM);
425         filter->fo_fsd = fsd;
426
427         OBD_ALLOC(filter->fo_last_rcvd_slots,
428                   FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
429         if (filter->fo_last_rcvd_slots == NULL) {
430                 OBD_FREE(fsd, sizeof(*fsd));
431                 RETURN(-ENOMEM);
432         }
433
434         if (last_rcvd_size == 0) {
435                 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
436
437                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
438                 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
439                 fsd->fsd_last_rcvd = 0;
440                 mount_count = fsd->fsd_mount_count = 0;
441                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
442                 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
443                 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
444                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
445                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
446         } else {
447                 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
448                                               &off);
449                 if (retval != sizeof(*fsd)) {
450                         CDEBUG(D_INODE,"OBD filter: error reading %s\n",
451                                LAST_RCVD);
452                         GOTO(err_fsd, rc = -EIO);
453                 }
454                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
455                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
456         }
457
458         if (fsd->fsd_feature_incompat) {
459                 CERROR("unsupported feature %x\n",
460                        le32_to_cpu(fsd->fsd_feature_incompat));
461                 GOTO(err_fsd, rc = -EINVAL);
462         }
463         if (fsd->fsd_feature_rocompat) {
464                 CERROR("read-only feature %x\n",
465                        le32_to_cpu(fsd->fsd_feature_rocompat));
466                 /* Do something like remount filesystem read-only */
467                 GOTO(err_fsd, rc = -EINVAL);
468         }
469
470         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
471                obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
472         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
473                obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
474         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
475                obd->obd_name, mount_count);
476         CDEBUG(D_INODE, "%s: server data size: %u\n",
477                obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
478         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
479                obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
480         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
481                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
482         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
483                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
484
485         /*
486          * When we do a clean FILTER shutdown, we save the last_rcvd into
487          * the header.  If we find clients with higher last_rcvd values
488          * then those clients may need recovery done.
489          */
490         if (!obd->obd_replayable) {
491                 CERROR("%s: recovery support OFF\n", obd->obd_name);
492                 GOTO(out, rc = 0);
493         }
494
495         for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
496                 __u64 last_rcvd;
497                 int mount_age;
498
499                 if (!fcd) {
500                         OBD_ALLOC(fcd, sizeof(*fcd));
501                         if (!fcd)
502                                 GOTO(err_fsd, rc = -ENOMEM);
503                 }
504
505                 /* Don't assume off is incremented properly, in case
506                  * sizeof(fsd) isn't the same as fsd->fsd_client_size.
507                  */
508                 off = le32_to_cpu(fsd->fsd_client_start) +
509                         cl_idx * le16_to_cpu(fsd->fsd_client_size);
510                 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
511                 if (rc != sizeof(*fcd)) {
512                         CERROR("error reading FILTER %s offset %d: rc = %d\n",
513                                LAST_RCVD, cl_idx, rc);
514                         if (rc > 0) /* XXX fatal error or just abort reading? */
515                                 rc = -EIO;
516                         break;
517                 }
518
519                 if (fcd->fcd_uuid[0] == '\0') {
520                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
521                                cl_idx);
522                         continue;
523                 }
524
525                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
526
527                 /* These exports are cleaned up by filter_disconnect(), so they
528                  * need to be set up like real exports as filter_connect() does.
529                  */
530                 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
531                 if (mount_age < FILTER_MOUNT_RECOV) {
532                         struct obd_export *exp = class_new_export(obd);
533                         struct filter_export_data *fed;
534                         CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
535                                " srv lr: "LPU64" mnt: "LPU64" last mount: "
536                                LPU64"\n", fcd->fcd_uuid, cl_idx,
537                                last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
538                                le64_to_cpu(fcd->fcd_mount_count), mount_count);
539                         if (exp == NULL) {
540                                 /* XXX this rc is ignored  */
541                                 rc = -ENOMEM;
542                                 break;
543                         }
544                         memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
545                                sizeof exp->exp_client_uuid.uuid);
546                         fed = &exp->exp_filter_data;
547                         fed->fed_fcd = fcd;
548                         filter_client_add(obd, filter, fed, cl_idx);
549                         /* create helper if export init gets more complex */
550                         INIT_LIST_HEAD(&fed->fed_open_head);
551                         spin_lock_init(&fed->fed_lock);
552
553                         fcd = NULL;
554                         obd->obd_recoverable_clients++;
555                         class_export_put(exp);
556                 } else {
557                         CDEBUG(D_INFO,
558                                "discarded client %d UUID '%s' count "LPU64"\n",
559                                cl_idx, fcd->fcd_uuid,
560                                le64_to_cpu(fcd->fcd_mount_count));
561                 }
562
563                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
564                        cl_idx, last_rcvd);
565
566                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
567                         filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
568
569                 obd->obd_last_committed =
570                         le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
571                 if (obd->obd_recoverable_clients) {
572                         CERROR("RECOVERY: %d recoverable clients, last_rcvd "
573                                LPU64"\n", obd->obd_recoverable_clients,
574                                le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
575                         obd->obd_next_recovery_transno =
576                                 obd->obd_last_committed + 1;
577                         obd->obd_recovering = 1;
578                 }
579
580         }
581
582         if (fcd)
583                 OBD_FREE(fcd, sizeof(*fcd));
584
585 out:
586         fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
587
588         /* save it,so mount count and last_recvd is current */
589         rc = filter_update_server_data(filp, filter->fo_fsd);
590
591         RETURN(rc);
592
593 err_fsd:
594         filter_free_server_data(filter);
595         RETURN(rc);
596 }
597
598 /* setup the object store with correct subdirectories */
599 static int filter_prep(struct obd_device *obd)
600 {
601         struct obd_run_ctxt saved;
602         struct filter_obd *filter = &obd->u.filter;
603         struct dentry *dentry, *O_dentry;
604         struct file *file;
605         struct inode *inode;
606         int i;
607         int rc = 0;
608         int mode = 0;
609
610         push_ctxt(&saved, &filter->fo_ctxt, NULL);
611         dentry = simple_mkdir(current->fs->pwd, "O", 0700);
612         CDEBUG(D_INODE, "got/created O: %p\n", dentry);
613         if (IS_ERR(dentry)) {
614                 rc = PTR_ERR(dentry);
615                 CERROR("cannot open/create O: rc = %d\n", rc);
616                 GOTO(out, rc);
617         }
618         filter->fo_dentry_O = dentry;
619
620         /*
621          * Create directories and/or get dentries for each object type.
622          * This saves us from having to do multiple lookups for each one.
623          */
624         O_dentry = filter->fo_dentry_O;
625         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
626                 char *name = obd_type_by_mode[mode];
627
628                 if (!name) {
629                         filter->fo_dentry_O_mode[mode] = NULL;
630                         continue;
631                 }
632                 dentry = simple_mkdir(O_dentry, name, 0700);
633                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
634                 if (IS_ERR(dentry)) {
635                         rc = PTR_ERR(dentry);
636                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
637                         GOTO(err_O_mode, rc);
638                 }
639                 filter->fo_dentry_O_mode[mode] = dentry;
640         }
641
642         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
643         if (!file || IS_ERR(file)) {
644                 rc = PTR_ERR(file);
645                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
646                        LAST_RCVD, rc);
647                 GOTO(err_O_mode, rc);
648         }
649
650         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
651                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
652                        file->f_dentry->d_inode->i_mode);
653                 GOTO(err_filp, rc = -ENOENT);
654         }
655
656         rc = fsfilt_journal_data(obd, file);
657         if (rc) {
658                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
659                 GOTO(err_filp, rc);
660         }
661         /* steal operations */
662         inode = file->f_dentry->d_inode;
663         filter->fo_fop = file->f_op;
664         filter->fo_iop = inode->i_op;
665         filter->fo_aops = inode->i_mapping->a_ops;
666
667         rc = filter_init_server_data(obd, file, INIT_OBJID);
668         if (rc) {
669                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
670                 GOTO(err_client, rc);
671         }
672         filter->fo_rcvd_filp = file;
673
674         if (filter->fo_subdir_count) {
675                 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
676                 OBD_ALLOC(filter->fo_dentry_O_sub,
677                           filter->fo_subdir_count * sizeof(dentry));
678                 if (!filter->fo_dentry_O_sub)
679                         GOTO(err_client, rc = -ENOMEM);
680
681                 for (i = 0; i < filter->fo_subdir_count; i++) {
682                         char dir[20];
683                         snprintf(dir, sizeof(dir), "d%u", i);
684
685                         dentry = simple_mkdir(O_dentry, dir, 0700);
686                         CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
687                         if (IS_ERR(dentry)) {
688                                 rc = PTR_ERR(dentry);
689                                 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
690                                 GOTO(err_O_sub, rc);
691                         }
692                         filter->fo_dentry_O_sub[i] = dentry;
693                 }
694         }
695         rc = 0;
696  out:
697         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
698
699         return(rc);
700
701 err_O_sub:
702         while (i-- > 0) {
703                 struct dentry *dentry = filter->fo_dentry_O_sub[i];
704                 if (dentry) {
705                         f_dput(dentry);
706                         filter->fo_dentry_O_sub[i] = NULL;
707                 }
708         }
709         OBD_FREE(filter->fo_dentry_O_sub,
710                  filter->fo_subdir_count * sizeof(dentry));
711 err_client:
712         class_disconnect_exports(obd, 0);
713 err_filp:
714         if (filp_close(file, 0))
715                 CERROR("can't close %s after error\n", LAST_RCVD);
716         filter->fo_rcvd_filp = NULL;
717 err_O_mode:
718         while (mode-- > 0) {
719                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
720                 if (dentry) {
721                         f_dput(dentry);
722                         filter->fo_dentry_O_mode[mode] = NULL;
723                 }
724         }
725         f_dput(filter->fo_dentry_O);
726         filter->fo_dentry_O = NULL;
727         goto out;
728 }
729
730 /* cleanup the filter: write last used object id to status file */
731 static void filter_post(struct obd_device *obd)
732 {
733         struct obd_run_ctxt saved;
734         struct filter_obd *filter = &obd->u.filter;
735         long rc;
736         int mode;
737
738         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
739          * best to start a transaction with h_sync, because we removed this
740          * from lastobjid */
741
742         push_ctxt(&saved, &filter->fo_ctxt, NULL);
743         rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
744         if (rc)
745                 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
746
747
748         if (filter->fo_rcvd_filp) {
749                 rc = file_fsync(filter->fo_rcvd_filp,
750                                 filter->fo_rcvd_filp->f_dentry, 1);
751                 filp_close(filter->fo_rcvd_filp, 0);
752                 filter->fo_rcvd_filp = NULL;
753                 if (rc)
754                         CERROR("last_rcvd file won't closed rc = %ld\n", rc);
755         }
756
757         if (filter->fo_subdir_count) {
758                 int i;
759                 for (i = 0; i < filter->fo_subdir_count; i++) {
760                         struct dentry *dentry = filter->fo_dentry_O_sub[i];
761                         f_dput(dentry);
762                         filter->fo_dentry_O_sub[i] = NULL;
763                 }
764                 OBD_FREE(filter->fo_dentry_O_sub,
765                          filter->fo_subdir_count *
766                          sizeof(*filter->fo_dentry_O_sub));
767         }
768         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
769                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
770                 if (dentry) {
771                         f_dput(dentry);
772                         filter->fo_dentry_O_mode[mode] = NULL;
773                 }
774         }
775         f_dput(filter->fo_dentry_O);
776         filter_free_server_data(filter);
777         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
778 }
779
780
781 static __u64 filter_next_id(struct filter_obd *filter)
782 {
783         obd_id id;
784         LASSERT(filter->fo_fsd != NULL);
785
786         spin_lock(&filter->fo_objidlock);
787         id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
788         filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
789         spin_unlock(&filter->fo_objidlock);
790
791         return id;
792 }
793
794 /* direct cut-n-paste of mds_blocking_ast() */
795 int filter_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
796                      void *data, int flag)
797 {
798         int do_ast;
799         ENTRY;
800
801         if (flag == LDLM_CB_CANCELING) {
802                 /* Don't need to do anything here. */
803                 RETURN(0);
804         }
805
806         /* XXX layering violation!  -phil */
807         l_lock(&lock->l_resource->lr_namespace->ns_lock);
808         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
809          * such that mds_blocking_ast is called just before l_i_p takes the
810          * ns_lock, then by the time we get the lock, we might not be the
811          * correct blocking function anymore.  So check, and return early, if
812          * so. */
813         if (lock->l_blocking_ast != filter_blocking_ast) {
814                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
815                 RETURN(0);
816         }
817
818         lock->l_flags |= LDLM_FL_CBPENDING;
819         do_ast = (!lock->l_readers && !lock->l_writers);
820         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
821
822         if (do_ast) {
823                 struct lustre_handle lockh;
824                 int rc;
825
826                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
827                 ldlm_lock2handle(lock, &lockh);
828                 rc = ldlm_cli_cancel(&lockh);
829                 if (rc < 0)
830                         CERROR("ldlm_cli_cancel: %d\n", rc);
831         } else {
832                 LDLM_DEBUG(lock, "Lock still has references, will be "
833                            "cancelled later");
834         }
835         RETURN(0);
836 }
837
838 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
839                               ldlm_mode_t lock_mode,struct lustre_handle *lockh)
840 {
841         struct ldlm_res_id res_id = { .name = {0} };
842         int flags = 0, rc;
843         ENTRY;
844
845         res_id.name[0] = de->d_inode->i_ino;
846         res_id.name[1] = de->d_inode->i_generation;
847         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
848                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
849                               &flags, ldlm_completion_ast,
850                               filter_blocking_ast, NULL, lockh);
851
852         RETURN(rc == ELDLM_OK ? 0 : -ENOLCK);  /* XXX translate ldlm code */
853 }
854
855 static void filter_parent_unlock(struct dentry *dparent,
856                                  struct lustre_handle *lockh,
857                                  ldlm_mode_t lock_mode)
858 {
859         ldlm_lock_decref(lockh, lock_mode);
860 }
861
862 /* We never dget the object parent, so DON'T dput it either */
863 static inline struct dentry *filter_parent(struct obd_device *obd,
864                                            obd_mode mode, obd_id objid)
865 {
866         struct filter_obd *filter = &obd->u.filter;
867
868         LASSERT(S_ISREG(mode));   /* only regular files for now */
869         if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
870                 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
871
872         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
873 }
874
875 /* We never dget the object parent, so DON'T dput it either */
876 static inline struct dentry *filter_parent_lock(struct obd_device *obd,
877                                                 obd_mode mode, obd_id objid,
878                                                 ldlm_mode_t lock_mode,
879                                                 struct lustre_handle *lockh)
880 {
881         unsigned long now = jiffies;
882         struct dentry *de = filter_parent(obd, mode, objid);
883         int rc;
884
885         if (IS_ERR(de))
886                 return de;
887
888         rc = filter_lock_dentry(obd, de, lock_mode, lockh);
889         if (time_after(jiffies, now + 15*HZ))
890                 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
891         return rc ? ERR_PTR(rc) : de;
892 }
893
894 /* How to get files, dentries, inodes from object id's.
895  *
896  * If dir_dentry is passed, the caller has already locked the parent
897  * appropriately for this operation (normally a write lock).  If
898  * dir_dentry is NULL, we do a read lock while we do the lookup to
899  * avoid races with create/destroy and such changing the directory
900  * internal to the filesystem code.
901  */
902 static struct dentry *filter_fid2dentry(struct obd_device *obd,
903                                         struct dentry *dir_dentry,
904                                         obd_mode mode, obd_id id)
905 {
906         struct super_block *sb = obd->u.filter.fo_sb;
907         struct lustre_handle lockh;
908         struct dentry *dparent = dir_dentry;
909         struct dentry *dchild;
910         char name[32];
911         int len;
912         ENTRY;
913
914         if (!sb || !sb->s_dev) {
915                 CERROR("device not initialized.\n");
916                 RETURN(ERR_PTR(-ENXIO));
917         }
918
919         if (id == 0) {
920                 CERROR("fatal: invalid object id 0\n");
921                 LBUG();
922                 RETURN(ERR_PTR(-ESTALE));
923         }
924
925         len = sprintf(name, LPU64, id);
926         if (!dir_dentry) {
927                 dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
928                 if (IS_ERR(dparent))
929                         RETURN(dparent);
930         }
931         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
932                dparent->d_name.len, dparent->d_name.name, name);
933         dchild = ll_lookup_one_len(name, dparent, len);
934         if (!dir_dentry)
935                 filter_parent_unlock(dparent, &lockh, LCK_PR);
936         if (IS_ERR(dchild)) {
937                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
938                 RETURN(dchild);
939         }
940
941         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
942                name, dchild, atomic_read(&dchild->d_count));
943
944         LASSERT(atomic_read(&dchild->d_count) > 0);
945
946         RETURN(dchild);
947 }
948
949 static struct file *filter_obj_open(struct obd_export *export,
950                                     __u64 id, __u32 type,
951                                     ldlm_mode_t parent_mode,
952                                     struct lustre_handle *parent_lockh)
953 {
954         struct obd_device *obd = export->exp_obd;
955         struct filter_obd *filter = &obd->u.filter;
956         struct super_block *sb = filter->fo_sb;
957         struct dentry *dchild = NULL, *dparent = NULL;
958         struct filter_export_data *fed = &export->exp_filter_data;
959         struct filter_dentry_data *fdd = NULL;
960         struct filter_file_data *ffd = NULL;
961         struct obd_run_ctxt saved;
962         char name[24];
963         struct file *file;
964         int len, cleanup_phase = 0;
965         ENTRY;
966
967         push_ctxt(&saved, &filter->fo_ctxt, NULL);
968
969         if (!sb || !sb->s_dev) {
970                 CERROR("fatal: device not initialized.\n");
971                 GOTO(cleanup, file = ERR_PTR(-ENXIO));
972         }
973
974         if (!id) {
975                 CERROR("fatal: invalid obdo "LPU64"\n", id);
976                 GOTO(cleanup, file = ERR_PTR(-ESTALE));
977         }
978
979         if (!(type & S_IFMT)) {
980                 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
981                        __FUNCTION__, id, type);
982                 GOTO(cleanup, file = ERR_PTR(-EINVAL));
983         }
984
985         ffd = filter_ffd_new();
986         if (ffd == NULL) {
987                 CERROR("obdfilter: out of memory\n");
988                 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
989         }
990
991         cleanup_phase = 1;
992
993         /* We preallocate this to avoid blocking while holding fo_fddlock */
994         OBD_ALLOC(fdd, sizeof *fdd);
995         if (fdd == NULL) {
996                 CERROR("obdfilter: out of memory\n");
997                 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
998         }
999
1000         cleanup_phase = 2;
1001
1002         dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1003         if (IS_ERR(dparent))
1004                 GOTO(cleanup, file = (void *)dparent);
1005
1006         cleanup_phase = 3;
1007
1008         len = snprintf(name, sizeof(name), LPU64, id);
1009         dchild = ll_lookup_one_len(name, dparent, len);
1010         if (IS_ERR(dchild))
1011                 GOTO(cleanup, file = (void *)dchild);
1012
1013         cleanup_phase = 4;
1014
1015         if (dchild->d_inode == NULL) {
1016                 CERROR("opening non-existent object %s - O_CREAT?\n", name);
1017                 file = ERR_PTR(-ENOENT);
1018                 GOTO(cleanup, file);
1019         }
1020
1021         /* dentry_open does a dput(dchild) and mntput(mnt) on error */
1022         mntget(filter->fo_vfsmnt);
1023         file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1024         if (IS_ERR(file)) {
1025                 dchild = NULL; /* prevent a double dput in step 4 */
1026                 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1027                 GOTO(cleanup, file);
1028         }
1029
1030         spin_lock(&filter->fo_fddlock);
1031         if (dchild->d_fsdata) {
1032                 spin_unlock(&filter->fo_fddlock);
1033                 OBD_FREE(fdd, sizeof *fdd);
1034                 fdd = dchild->d_fsdata;
1035                 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1036                 /* should only happen during client recovery */
1037                 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1038                         CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1039                 atomic_inc(&fdd->fdd_open_count);
1040         } else {
1041                 atomic_set(&fdd->fdd_open_count, 1);
1042                 fdd->fdd_magic = FILTER_DENTRY_MAGIC;
1043                 fdd->fdd_flags = 0;
1044                 fdd->fdd_objid = id;
1045                 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1046                 dchild->d_fsdata = fdd;
1047                 spin_unlock(&filter->fo_fddlock);
1048         }
1049
1050         ffd->ffd_file = file;
1051         LASSERT(file->private_data == NULL);
1052         file->private_data = ffd;
1053
1054         if (!dchild->d_op)
1055                 dchild->d_op = &filter_dops;
1056         else
1057                 LASSERT(dchild->d_op == &filter_dops);
1058
1059         spin_lock(&fed->fed_lock);
1060         list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1061         spin_unlock(&fed->fed_lock);
1062
1063         CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1064 cleanup:
1065         switch (cleanup_phase) {
1066         case 4:
1067                 if (IS_ERR(file))
1068                         f_dput(dchild);
1069         case 3:
1070                 if (IS_ERR(file))
1071                         filter_parent_unlock(dparent, parent_lockh,parent_mode);
1072         case 2:
1073                 if (IS_ERR(file))
1074                         OBD_FREE(fdd, sizeof *fdd);
1075         case 1:
1076                 if (IS_ERR(file))
1077                         filter_ffd_destroy(ffd);
1078                 filter_ffd_put(ffd);
1079         case 0:
1080                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1081         }
1082         RETURN(file);
1083 }
1084
1085 /* Caller must hold LCK_PW on parent and push us into kernel context.
1086  * Caller is also required to ensure that dchild->d_inode exists.
1087  */
1088 static int filter_destroy_internal(struct obd_device *obd,
1089                                    struct dentry *dparent,
1090                                    struct dentry *dchild)
1091 {
1092         struct inode *inode = dchild->d_inode;
1093         int rc;
1094         ENTRY;
1095
1096         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1097                 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1098                        dchild->d_name.len, dchild->d_name.name,
1099                        inode->i_nlink, atomic_read(&inode->i_count));
1100         }
1101
1102         rc = vfs_unlink(dparent->d_inode, dchild);
1103
1104         if (rc)
1105                 CERROR("error unlinking objid %*s: rc %d\n",
1106                        dchild->d_name.len, dchild->d_name.name, rc);
1107
1108         RETURN(rc);
1109 }
1110
1111 /* If closing because we are failing this device, then
1112    don't do the unlink on close.
1113 */
1114 static int filter_close_internal(struct obd_export *exp,
1115                                  struct filter_file_data *ffd,
1116                                  struct obd_trans_info *oti,
1117                                  int failover)
1118 {
1119         struct obd_device *obd = exp->exp_obd;
1120         struct filter_obd *filter = &obd->u.filter;
1121         struct file *filp = ffd->ffd_file;
1122         struct dentry *dchild = dget(filp->f_dentry);
1123         struct filter_dentry_data *fdd = dchild->d_fsdata;
1124         struct lustre_handle parent_lockh;
1125         int rc, rc2, cleanup_phase = 0;
1126         struct dentry *dparent = NULL;
1127         struct obd_run_ctxt saved;
1128         ENTRY;
1129
1130         LASSERT(filp->private_data == ffd);
1131         LASSERT(fdd);
1132         LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1133
1134         rc = filp_close(filp, 0);
1135
1136         if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1137             fdd->fdd_flags & FILTER_FLAG_DESTROY && !failover) {
1138                 void *handle;
1139
1140                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1141                 cleanup_phase = 1;
1142
1143                 LASSERT(fdd->fdd_objid > 0);
1144                 dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1145                                              LCK_PW, &parent_lockh);
1146                 if (IS_ERR(dparent))
1147                         GOTO(cleanup, rc = PTR_ERR(dparent));
1148                 cleanup_phase = 2;
1149
1150                 handle = fsfilt_start(obd, dparent->d_inode,
1151                                       FSFILT_OP_UNLINK);
1152                 if (IS_ERR(handle))
1153                         GOTO(cleanup, rc = PTR_ERR(handle));
1154
1155                 /* XXX unlink from PENDING directory now too */
1156                 rc2 = filter_destroy_internal(obd, dparent, dchild);
1157                 if (rc2 && !rc)
1158                         rc = rc2;
1159                 rc = filter_finish_transno(exp, handle, oti, rc);
1160                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1161                 if (rc2) {
1162                         CERROR("error on commit, err = %d\n", rc2);
1163                         if (!rc)
1164                                 rc = rc2;
1165                 }
1166         }
1167
1168 cleanup:
1169         switch(cleanup_phase) {
1170         case 2:
1171                 if (rc || oti == NULL) {
1172                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1173                 } else {
1174                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1175                                sizeof(parent_lockh));
1176                         oti->oti_ack_locks[0].mode = LCK_PW;
1177                 }
1178         case 1:
1179                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1180         case 0:
1181                 f_dput(dchild);
1182                 filter_ffd_destroy(ffd);
1183                 break;
1184         default:
1185                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1186                 LBUG();
1187         }
1188
1189         RETURN(rc);
1190 }
1191
1192 /* obd methods */
1193 /* mount the file system (secretly) */
1194 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1195                                char *option)
1196 {
1197         struct obd_ioctl_data* data = buf;
1198         struct filter_obd *filter = &obd->u.filter;
1199
1200         struct vfsmount *mnt;
1201         int rc = 0;
1202         ENTRY;
1203
1204         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1205                 RETURN(-EINVAL);
1206
1207         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1208         if (IS_ERR(obd->obd_fsops))
1209                 RETURN(PTR_ERR(obd->obd_fsops));
1210
1211         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
1212         rc = PTR_ERR(mnt);
1213         if (IS_ERR(mnt))
1214                 GOTO(err_ops, rc);
1215
1216         if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1217                 if (*data->ioc_inlbuf3 == 'f') {
1218                         obd->obd_replayable = 1;
1219                         obd_sync_filter = 1;
1220                         CERROR("%s: configured for recovery and sync write\n",
1221                                obd->obd_name);
1222                 } else {
1223                         if (*data->ioc_inlbuf3 != 'n') {
1224                                 CERROR("unrecognised flag '%c'\n",
1225                                        *data->ioc_inlbuf3);
1226                         }
1227                 }
1228         }
1229
1230         if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
1231                 if (*data->ioc_inlbuf4 == '/') {
1232                         CERROR("filter namespace mount: %s\n",
1233                                data->ioc_inlbuf4);
1234                         filter->fo_nspath = strdup(data->ioc_inlbuf4);
1235                 } else {
1236                         CERROR("namespace mount must be absolute path: '%s'\n",
1237                                data->ioc_inlbuf4);
1238                 }
1239         }
1240
1241         filter->fo_vfsmnt = mnt;
1242         filter->fo_sb = mnt->mnt_sb;
1243         filter->fo_fstype = mnt->mnt_sb->s_type->name;
1244         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1245
1246         OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1247         filter->fo_ctxt.pwdmnt = mnt;
1248         filter->fo_ctxt.pwd = mnt->mnt_root;
1249         filter->fo_ctxt.fs = get_ds();
1250
1251         rc = filter_prep(obd);
1252         if (rc)
1253                 GOTO(err_mntput, rc);
1254
1255         spin_lock_init(&filter->fo_translock);
1256         spin_lock_init(&filter->fo_fddlock);
1257         spin_lock_init(&filter->fo_objidlock);
1258         INIT_LIST_HEAD(&filter->fo_export_list);
1259
1260         obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1261                                                 LDLM_NAMESPACE_SERVER);
1262         if (!obd->obd_namespace)
1263                 GOTO(err_post, rc = -ENOMEM);
1264
1265         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1266                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1267
1268         RETURN(0);
1269
1270 err_post:
1271         filter_post(obd);
1272 err_mntput:
1273         unlock_kernel();
1274         mntput(mnt);
1275         filter->fo_sb = 0;
1276         lock_kernel();
1277 err_ops:
1278         fsfilt_put_ops(obd->obd_fsops);
1279         return rc;
1280 }
1281
1282 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1283 {
1284         struct obd_ioctl_data* data = buf;
1285         char *option = NULL;
1286
1287         if (!strcmp(data->ioc_inlbuf2, "ext3"))
1288                 option = "asyncdel";
1289
1290         return filter_common_setup(obd, len, buf, option);
1291 }
1292
1293 /* sanobd setup methods - use a specific mount option */
1294 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1295 {
1296         struct obd_ioctl_data* data = buf;
1297         char *option = NULL;
1298
1299         if (!data->ioc_inlbuf2)
1300                 RETURN(-EINVAL);
1301
1302         /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1303         if (!strcmp(data->ioc_inlbuf2, "extN"))
1304                 option = "data=writeback";
1305         else if (!strcmp(data->ioc_inlbuf2, "ext3"))
1306                 option = "data=writeback,asyncdel";
1307         else
1308                 LBUG(); /* just a reminder */
1309
1310         return filter_common_setup(obd, len, buf, option);
1311 }
1312
1313 static int filter_cleanup(struct obd_device *obd, int force, int failover)
1314 {
1315         struct super_block *sb;
1316         ENTRY;
1317
1318         if (failover)
1319                 CERROR("%s: shutting down for failover; client state will"
1320                        " be preserved.\n", obd->obd_name);
1321
1322         if (!list_empty(&obd->obd_exports)) {
1323                 CERROR("%s: still has clients!\n", obd->obd_name);
1324                 class_disconnect_exports(obd, failover);
1325                 if (!list_empty(&obd->obd_exports)) {
1326                         CERROR("still has exports after forced cleanup?\n");
1327                         RETURN(-EBUSY);
1328                 }
1329         }
1330
1331         ldlm_namespace_free(obd->obd_namespace);
1332
1333         sb = obd->u.filter.fo_sb;
1334         if (!sb)
1335                 RETURN(0);
1336
1337         filter_post(obd);
1338
1339         shrink_dcache_parent(sb->s_root);
1340         unlock_kernel();
1341
1342         if (atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count) > 1){
1343                 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1344                        atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count));
1345         }
1346
1347         mntput(obd->u.filter.fo_vfsmnt);
1348         obd->u.filter.fo_sb = 0;
1349 /*        destroy_buffers(obd->u.filter.fo_sb->s_dev);*/
1350
1351         fsfilt_put_ops(obd->obd_fsops);
1352         lock_kernel();
1353
1354         RETURN(0);
1355 }
1356
1357 int filter_attach(struct obd_device *obd, obd_count len, void *data)
1358 {
1359         struct lprocfs_static_vars lvars;
1360         int rc;
1361
1362         lprocfs_init_vars(&lvars);
1363         rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1364         if (rc != 0)
1365                 return rc;
1366
1367         rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1368         if (rc != 0)
1369                 return rc;
1370
1371         /* Init obdfilter private stats here */
1372         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1373                              LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1374         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1375                              LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
1376         return rc;
1377 }
1378
/* Detach-time hook: free the lprocfs stats and unregister the device from
 * lprocfs.  Returns the result of lprocfs_obd_detach(). */
int filter_detach(struct obd_device *dev)
{
        int rc;

        lprocfs_free_obd_stats(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
1384
1385 /* nearly identical to mds_connect */
1386 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1387                           struct obd_uuid *cluuid)
1388 {
1389         struct obd_export *exp;
1390         struct filter_export_data *fed;
1391         struct filter_client_data *fcd;
1392         struct filter_obd *filter = &obd->u.filter;
1393         int rc;
1394
1395         ENTRY;
1396
1397         if (!conn || !obd || !cluuid)
1398                 RETURN(-EINVAL);
1399
1400         rc = class_connect(conn, obd, cluuid);
1401         if (rc)
1402                 RETURN(rc);
1403         exp = class_conn2export(conn);
1404         LASSERT(exp);
1405
1406         fed = &exp->exp_filter_data;
1407         class_export_put(exp);
1408
1409         INIT_LIST_HEAD(&fed->fed_open_head);
1410         spin_lock_init(&fed->fed_lock);
1411
1412         if (!obd->obd_replayable)
1413                 RETURN(0);
1414
1415         OBD_ALLOC(fcd, sizeof(*fcd));
1416         if (!fcd) {
1417                 CERROR("filter: out of memory for client data\n");
1418                 GOTO(out_export, rc = -ENOMEM);
1419         }
1420
1421         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1422         fed->fed_fcd = fcd;
1423         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1424
1425         rc = filter_client_add(obd, filter, fed, -1);
1426         if (rc)
1427                 GOTO(out_fcd, rc);
1428
1429         RETURN(rc);
1430
1431 out_fcd:
1432         OBD_FREE(fcd, sizeof(*fcd));
1433 out_export:
1434         class_disconnect(conn, 0);
1435
1436         RETURN(rc);
1437 }
1438
1439 static void filter_destroy_export(struct obd_export *exp)
1440 {
1441         struct filter_export_data *fed = &exp->exp_filter_data;
1442
1443         ENTRY;
1444         spin_lock(&fed->fed_lock);
1445         while (!list_empty(&fed->fed_open_head)) {
1446                 struct filter_file_data *ffd;
1447
1448                 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1449                                  ffd_export_list);
1450                 list_del(&ffd->ffd_export_list);
1451                 spin_unlock(&fed->fed_lock);
1452
1453                 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1454                        ffd->ffd_file->f_dentry->d_name.len,
1455                        ffd->ffd_file->f_dentry->d_name.name,
1456                        ffd, ffd->ffd_handle.h_cookie);
1457
1458                 filter_close_internal(exp, ffd, NULL, exp->exp_failover);
1459                 spin_lock(&fed->fed_lock);
1460         }
1461         spin_unlock(&fed->fed_lock);
1462
1463         if (exp->exp_obd->obd_replayable)
1464                 filter_client_free(exp, exp->exp_failover);
1465         EXIT;
1466 }
1467
1468 /* also incredibly similar to mds_disconnect */
1469 static int filter_disconnect(struct lustre_handle *conn, int failover)
1470 {
1471         struct obd_export *exp = class_conn2export(conn);
1472         int rc;
1473         unsigned long flags;
1474         ENTRY;
1475
1476         LASSERT(exp);
1477         ldlm_cancel_locks_for_export(exp);
1478
1479         spin_lock_irqsave(&exp->exp_lock, flags);
1480         exp->exp_failover = failover;
1481         spin_unlock_irqrestore(&exp->exp_lock, flags);
1482
1483         rc = class_disconnect(conn, failover);
1484
1485         fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1486         class_export_put(exp);
1487         /* XXX cleanup preallocated inodes */
1488         RETURN(rc);
1489 }
1490
1491 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1492 {
1493         int type = oa->o_mode & S_IFMT;
1494         ENTRY;
1495
1496         CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1497                inode->i_ino, inode, oa->o_id, valid);
1498         /* Don't copy the inode number in place of the object ID */
1499         obdo_from_inode(oa, inode, valid);
1500         oa->o_mode &= ~S_IFMT;
1501         oa->o_mode |= type;
1502
1503         if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1504                 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1505                 oa->o_rdev = rdev;
1506                 oa->o_valid |= OBD_MD_FLRDEV;
1507         }
1508
1509         EXIT;
1510 }
1511
1512 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1513                                          struct obdo *oa, char *what)
1514 {
1515         struct dentry *dchild = NULL;
1516
1517         if (oa->o_valid & OBD_MD_FLHANDLE) {
1518                 struct lustre_handle *ost_handle = obdo_handle(oa);
1519                 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1520
1521                 if (ffd != NULL) {
1522                         struct filter_dentry_data *fdd;
1523                         dchild = dget(ffd->ffd_file->f_dentry);
1524                         fdd = dchild->d_fsdata;
1525                         LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1526                         filter_ffd_put(ffd);
1527
1528                         CDEBUG(D_INODE,
1529                                "got child objid %*s: %p, count = %d\n",
1530                                dchild->d_name.len, dchild->d_name.name,
1531                                dchild, atomic_read(&dchild->d_count));
1532                 }
1533         }
1534
1535         if (!dchild) {
1536                 struct obd_device *obd = class_conn2obd(conn);
1537
1538                 if (!obd) {
1539                         CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1540                         RETURN(ERR_PTR(-EINVAL));
1541                 }
1542                 dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
1543         }
1544
1545         if (IS_ERR(dchild)) {
1546                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1547                 RETURN(dchild);
1548         }
1549
1550         if (!dchild->d_inode) {
1551                 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1552                 f_dput(dchild);
1553                 RETURN(ERR_PTR(-ENOENT));
1554         }
1555
1556         return dchild;
1557 }
1558
1559 #define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__)
1560
1561 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1562                           struct lov_stripe_md *md)
1563 {
1564         struct dentry *dentry = NULL;
1565         int rc = 0;
1566         ENTRY;
1567
1568         dentry = filter_oa2dentry(conn, oa);
1569         if (IS_ERR(dentry))
1570                 RETURN(PTR_ERR(dentry));
1571
1572         filter_from_inode(oa, dentry->d_inode, oa->o_valid);
1573
1574         f_dput(dentry);
1575         RETURN(rc);
1576 }
1577
1578 /* this is called from filter_truncate() until we have filter_punch() */
1579 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1580                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1581 {
1582         struct obd_run_ctxt saved;
1583         struct obd_export *export = class_conn2export(conn);
1584         struct obd_device *obd = class_conn2obd(conn);
1585         struct filter_obd *filter = &obd->u.filter;
1586         struct dentry *dentry;
1587         struct iattr iattr;
1588         struct inode *inode;
1589         void * handle;
1590         int rc, rc2;
1591         ENTRY;
1592
1593         dentry = filter_oa2dentry(conn, oa);
1594
1595         if (IS_ERR(dentry))
1596                 GOTO(out_exp, rc = PTR_ERR(dentry));
1597
1598         iattr_from_obdo(&iattr, oa, oa->o_valid);
1599         iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1600         inode = dentry->d_inode;
1601
1602         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1603         lock_kernel();
1604         if (iattr.ia_valid & ATTR_SIZE)
1605                 down(&inode->i_sem);
1606
1607         handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1608         if (IS_ERR(handle))
1609                 GOTO(out_unlock, rc = PTR_ERR(handle));
1610
1611         rc = fsfilt_setattr(obd, dentry, handle, &iattr, 1);
1612         rc = filter_finish_transno(export, handle, oti, rc);
1613         rc2 = fsfilt_commit(obd, dentry->d_inode, handle, 0);
1614         if (rc2) {
1615                 CERROR("error on commit, err = %d\n", rc2);
1616                 if (!rc)
1617                         rc = rc2;
1618         }
1619
1620         if (iattr.ia_valid & ATTR_SIZE) {
1621                 up(&inode->i_sem);
1622                 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1623                 obdo_from_inode(oa, inode, oa->o_valid);
1624         }
1625
1626 out_unlock:
1627         unlock_kernel();
1628         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1629
1630         f_dput(dentry);
1631  out_exp:
1632         class_export_put(export);
1633         RETURN(rc);
1634 }
1635
1636 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1637                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
1638                        struct obd_client_handle *och)
1639 {
1640         struct obd_export *export = NULL;
1641         struct lustre_handle *handle;
1642         struct filter_file_data *ffd;
1643         struct file *filp;
1644         struct lustre_handle parent_lockh;
1645         int rc = 0;
1646         ENTRY;
1647
1648         export = class_conn2export(conn);
1649         if (!export) {
1650                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1651                        conn->cookie);
1652                 GOTO(out, rc = -EINVAL);
1653         }
1654
1655         filp = filter_obj_open(export, oa->o_id, oa->o_mode,
1656                                LCK_PR, &parent_lockh);
1657         if (IS_ERR(filp))
1658                 GOTO(out, rc = PTR_ERR(filp));
1659
1660         filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1661
1662         ffd = filp->private_data;
1663         handle = obdo_handle(oa);
1664         handle->cookie = ffd->ffd_handle.h_cookie;
1665         oa->o_valid |= OBD_MD_FLHANDLE;
1666
1667 out:
1668         class_export_put(export);
1669         if (!rc) {
1670                 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1671                        sizeof(parent_lockh));
1672                 oti->oti_ack_locks[0].mode = LCK_PR;
1673         }
1674         RETURN(rc);
1675 }
1676
1677 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1678                         struct lov_stripe_md *ea, struct obd_trans_info *oti)
1679 {
1680         struct obd_export *exp = class_conn2export(conn);
1681         struct filter_file_data *ffd;
1682         struct filter_export_data *fed;
1683         int rc;
1684         ENTRY;
1685
1686         if (!exp) {
1687                 CDEBUG(D_IOCTL, "invalid client cookie"LPX64"\n", conn->cookie);
1688                 GOTO(out, rc = -EINVAL);
1689         }
1690
1691         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1692                 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1693                 GOTO(out, rc = -EINVAL);
1694         }
1695
1696         ffd = filter_handle2ffd(obdo_handle(oa));
1697         if (ffd == NULL) {
1698                 CERROR("bad handle ("LPX64") for close\n",
1699                        obdo_handle(oa)->cookie);
1700                 GOTO(out, rc = -ESTALE);
1701         }
1702
1703         fed = &exp->exp_filter_data;
1704         spin_lock(&fed->fed_lock);
1705         list_del(&ffd->ffd_export_list);
1706         spin_unlock(&fed->fed_lock);
1707
1708         rc = filter_close_internal(exp, ffd, oti, 0);
1709         filter_ffd_put(ffd);
1710         GOTO(out, rc);
1711  out:
1712         class_export_put(exp);
1713         return rc;
1714 }
1715
1716 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1717                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
1718 {
1719         struct obd_export *exp;
1720         struct obd_device *obd = class_conn2obd(conn);
1721         struct filter_obd *filter = &obd->u.filter;
1722         struct obd_run_ctxt saved;
1723         struct lustre_handle parent_lockh;
1724         struct dentry *dparent;
1725         struct dentry *dchild = NULL;
1726         struct iattr;
1727         void *handle;
1728         int err, rc, cleanup_phase;
1729         ENTRY;
1730
1731         if (!obd) {
1732                 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1733                 RETURN(-EINVAL);
1734         }
1735
1736         exp = class_conn2export(conn);
1737
1738         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1739  retry:
1740         oa->o_id = filter_next_id(filter);
1741
1742         cleanup_phase = 0;
1743         dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1744                                      &parent_lockh);
1745         if (IS_ERR(dparent))
1746                 GOTO(cleanup, rc = PTR_ERR(dparent));
1747         cleanup_phase = 1;
1748
1749         dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1750         if (IS_ERR(dchild))
1751                 GOTO(cleanup, rc = PTR_ERR(dchild));
1752         if (dchild->d_inode) {
1753                 /* This would only happen if lastobjid was bad on disk */
1754                 CERROR("Serious error: objid %*s already exists; is this "
1755                        "filesystem corrupt?  I will try to work around it.\n",
1756                        dchild->d_name.len, dchild->d_name.name);
1757                 f_dput(dchild);
1758                 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1759                 goto retry;
1760         }
1761
1762         cleanup_phase = 2;
1763         handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE);
1764         if (IS_ERR(handle))
1765                 GOTO(cleanup, rc = PTR_ERR(handle));
1766
1767         rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
1768         if (rc)
1769                 CERROR("create failed rc = %d\n", rc);
1770
1771         rc = filter_finish_transno(exp, handle, oti, rc);
1772         err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1773         if (err) {
1774                 CERROR("unable to write lastobjid but file created\n");
1775                 if (!rc)
1776                         rc = err;
1777         }
1778         err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1779         if (err) {
1780                 CERROR("error on commit, err = %d\n", err);
1781                 if (!rc)
1782                         rc = err;
1783         }
1784
1785         if (rc)
1786                 GOTO(cleanup, rc);
1787
1788         /* Set flags for fields we have set in the inode struct */
1789         oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1790                  OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1791         filter_from_inode(oa, dchild->d_inode, oa->o_valid);
1792
1793         EXIT;
1794 cleanup:
1795         switch(cleanup_phase) {
1796         case 2:
1797                 f_dput(dchild);
1798         case 1: /* locked parent dentry */
1799                 if (rc || oti == NULL) {
1800                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1801                 } else {
1802                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1803                                sizeof(parent_lockh));
1804                         oti->oti_ack_locks[0].mode = LCK_PW;
1805                 }
1806         case 0:
1807                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1808                 class_export_put(exp);
1809                 break;
1810         default:
1811                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1812                 LBUG();
1813         }
1814
1815         RETURN(rc);
1816 }
1817
1818 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1819                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
1820 {
1821         struct obd_export *exp;
1822         struct obd_device *obd = class_conn2obd(conn);
1823         struct filter_obd *filter = &obd->u.filter;
1824         struct dentry *dparent, *dchild = NULL;
1825         struct filter_dentry_data *fdd;
1826         struct obd_run_ctxt saved;
1827         void *handle = NULL;
1828         struct lustre_handle parent_lockh;
1829         int rc, rc2, cleanup_phase = 0;
1830         ENTRY;
1831
1832         if (!obd) {
1833                 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1834                 RETURN(-EINVAL);
1835         }
1836
1837         exp = class_conn2export(conn);
1838
1839         CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1840
1841         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1842         dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1843                                      LCK_PW, &parent_lockh);
1844         if (IS_ERR(dparent))
1845                 GOTO(cleanup, rc = PTR_ERR(dparent));
1846         cleanup_phase = 1;
1847
1848         dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1849         if (IS_ERR(dchild))
1850                 GOTO(cleanup, rc = -ENOENT);
1851         cleanup_phase = 2;
1852
1853         if (!dchild->d_inode) {
1854                 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
1855                 GOTO(cleanup, rc = -ENOENT);
1856         }
1857
1858         handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK);
1859         if (IS_ERR(handle))
1860                 GOTO(cleanup, rc = PTR_ERR(handle));
1861         cleanup_phase = 3;
1862
1863         fdd = dchild->d_fsdata;
1864         if (fdd && atomic_read(&fdd->fdd_open_count)) {
1865                 LASSERT(fdd->fdd_magic = FILTER_DENTRY_MAGIC);
1866                 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1867                         fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1868                         /* XXX put into PENDING directory in case of crash */
1869                         CDEBUG(D_INODE,
1870                                "defer destroy of %dx open objid "LPU64"\n",
1871                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1872                 } else
1873                         CDEBUG(D_INODE,
1874                                "repeat destroy of %dx open objid "LPU64"\n",
1875                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1876                 GOTO(cleanup, rc = 0);
1877         }
1878
1879         rc = filter_destroy_internal(obd, dparent, dchild);
1880
1881 cleanup:
1882         switch(cleanup_phase) {
1883         case 3:
1884                 rc = filter_finish_transno(exp, handle, oti, rc);
1885                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1886                 if (rc2) {
1887                         CERROR("error on commit, err = %d\n", rc2);
1888                         if (!rc)
1889                                 rc = rc2;
1890                 }
1891         case 2:
1892                 f_dput(dchild);
1893         case 1:
1894                 if (rc || oti == NULL) {
1895                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1896                 } else {
1897                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1898                                sizeof(parent_lockh));
1899                         oti->oti_ack_locks[0].mode = LCK_PW;
1900                 }
1901         case 0:
1902                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1903                 class_export_put(exp);
1904                 break;
1905         default:
1906                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1907                 LBUG();
1908         }
1909
1910         RETURN(rc);
1911 }
1912
1913 /* NB start and end are used for punch, but not truncate */
1914 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1915                            struct lov_stripe_md *lsm,
1916                            obd_off start, obd_off end,
1917                            struct obd_trans_info *oti)
1918 {
1919         int error;
1920         ENTRY;
1921
1922         if (end != OBD_OBJECT_EOF)
1923                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
1924                        end);
1925
1926         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
1927                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1928         oa->o_size = start;
1929         error = filter_setattr(conn, oa, NULL, oti);
1930         RETURN(error);
1931 }
1932
/* Thin wrapper that hands @page to page_cache_release(). */
1933 static inline void lustre_put_page(struct page *page)
1934 {
1935         page_cache_release(page);
1936 }
1937
1938 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
1939 {
1940         struct address_space *mapping = inode->i_mapping;
1941         struct page *page;
1942         unsigned long index = lnb->offset >> PAGE_SHIFT;
1943         int rc;
1944
1945         page = grab_cache_page(mapping, index); /* locked page */
1946         if (IS_ERR(page))
1947                 return lnb->rc = PTR_ERR(page);
1948
1949         lnb->page = page;
1950
1951         if (inode->i_size < lnb->offset + lnb->len - 1)
1952                 lnb->rc = inode->i_size - lnb->offset;
1953         else
1954                 lnb->rc = lnb->len;
1955
1956         if (PageUptodate(page)) {
1957                 unlock_page(page);
1958                 return 0;
1959         }
1960
1961         rc = mapping->a_ops->readpage(NULL, page);
1962         if (rc < 0) {
1963                 CERROR("page index %lu, rc = %d\n", index, rc);
1964                 lnb->page = NULL;
1965                 lustre_put_page(page);
1966                 return lnb->rc = rc;
1967         }
1968
1969         return 0;
1970 }
1971
1972 static int filter_finish_page_read(struct niobuf_local *lnb)
1973 {
1974         if (lnb->page == NULL)
1975                 return 0;
1976
1977         if (PageUptodate(lnb->page))
1978                 return 0;
1979
1980         wait_on_page(lnb->page);
1981         if (!PageUptodate(lnb->page)) {
1982                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
1983                        lnb->page->index, lnb->offset);
1984                 GOTO(err_page, lnb->rc = -EIO);
1985         }
1986         if (PageError(lnb->page)) {
1987                 CERROR("page index %lu/offset "LPX64" has error\n",
1988                        lnb->page->index, lnb->offset);
1989                 GOTO(err_page, lnb->rc = -EIO);
1990         }
1991
1992         return 0;
1993
1994 err_page:
1995         lustre_put_page(lnb->page);
1996         lnb->page = NULL;
1997         return lnb->rc;
1998 }
1999
2000 static struct page *lustre_get_page_write(struct inode *inode,
2001                                           unsigned long index)
2002 {
2003         struct address_space *mapping = inode->i_mapping;
2004         struct page *page;
2005         int rc;
2006
2007         page = grab_cache_page(mapping, index); /* locked page */
2008
2009         if (!IS_ERR(page)) {
2010                 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
2011                  * a no-op for most filesystems, because we write the whole
2012                  * page.  For partial-page I/O this will read in the page.
2013                  */
2014                 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
2015                 if (rc) {
2016                         CERROR("page index %lu, rc = %d\n", index, rc);
2017                         if (rc != -ENOSPC)
2018                                 LBUG();
2019                         GOTO(err_unlock, rc);
2020                 }
2021                 /* XXX not sure if we need this if we are overwriting page */
2022                 if (PageError(page)) {
2023                         CERROR("error on page index %lu, rc = %d\n", index, rc);
2024                         LBUG();
2025                         GOTO(err_unlock, rc = -EIO);
2026                 }
2027         }
2028         return page;
2029
2030 err_unlock:
2031         unlock_page(page);
2032         lustre_put_page(page);
2033         return ERR_PTR(rc);
2034 }
2035
/* Only compiled for kernels newer than 2.5.0: waits on @page with
 * wait_on_page_locked() and always returns 0. */
2036 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2037 int waitfor_one_page(struct page *page)
2038 {
2039         wait_on_page_locked(page);
2040         return 0;
2041 }
2042 #endif
2043
2044 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2045 /* We should only change the file mtime (and not the ctime, like
2046  * update_inode_times() in generic_file_write()) when we only change data.
2047  */
2048 static inline void inode_update_time(struct inode *inode, int ctime_too)
2049 {
2050         time_t now = CURRENT_TIME;
2051         if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
2052                 return;
2053         inode->i_mtime = now;
2054         if (ctime_too)
2055                 inode->i_ctime = now;
2056         mark_inode_dirty_sync(inode);
2057 }
2058 #endif
2059
2060 static int lustre_commit_write(struct niobuf_local *lnb)
2061 {
2062         struct page *page = lnb->page;
2063         unsigned from = lnb->offset & ~PAGE_MASK;
2064         unsigned to = from + lnb->len;
2065         struct inode *inode = page->mapping->host;
2066         int err;
2067
2068         LASSERT(to <= PAGE_SIZE);
2069         err = page->mapping->a_ops->commit_write(NULL, page, from, to);
2070         if (!err && IS_SYNC(inode))
2071                 err = waitfor_one_page(page);
2072         //SetPageUptodate(page); // the client commit_write will do this
2073
2074         SetPageReferenced(page);
2075         unlock_page(page);
2076         lustre_put_page(page);
2077         return err;
2078 }
2079
2080 int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
2081                           int *pglocked)
2082 {
2083         unsigned long index = lnb->offset >> PAGE_SHIFT;
2084         struct address_space *mapping = inode->i_mapping;
2085         struct page *page;
2086         int rc;
2087
2088         //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
2089         if (*pglocked)
2090                 page = grab_cache_page_nowait(mapping, index); /* locked page */
2091         else
2092                 page = grab_cache_page(mapping, index); /* locked page */
2093
2094
2095         /* This page is currently locked, so get a temporary page instead. */
2096         if (!page) {
2097                 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
2098                 page = alloc_pages(GFP_KERNEL, 0); /* locked page */
2099                 if (!page) {
2100                         CERROR("no memory for a temp page\n");
2101                         GOTO(err, rc = -ENOMEM);
2102                 }
2103                 page->index = index;
2104                 lnb->page = page;
2105                 lnb->flags |= N_LOCAL_TEMP_PAGE;
2106         } else if (!IS_ERR(page)) {
2107                 (*pglocked)++;
2108
2109                 rc = mapping->a_ops->prepare_write(NULL, page,
2110                                                    lnb->offset & ~PAGE_MASK,
2111                                                    lnb->len);
2112                 if (rc) {
2113                         if (rc != -ENOSPC)
2114                                 CERROR("page index %lu, rc = %d\n", index, rc);
2115                         GOTO(err_unlock, rc);
2116                 }
2117                 /* XXX not sure if we need this if we are overwriting page */
2118                 if (PageError(page)) {
2119                         CERROR("error on page index %lu, rc = %d\n", index, rc);
2120                         LBUG();
2121                         GOTO(err_unlock, rc = -EIO);
2122                 }
2123                 lnb->page = page;
2124         }
2125
2126         return 0;
2127
2128 err_unlock:
2129         unlock_page(page);
2130         lustre_put_page(page);
2131 err:
2132         return lnb->rc = rc;
2133 }
2134
2135 /*
2136  * We need to balance prepare_write() calls with commit_write() calls.
2137  * If the page has been prepared, but we have no data for it, we don't
2138  * want to overwrite valid data on disk, but we still need to zero out
2139  * data for space which was newly allocated.  Like part of what happens
2140  * in __block_prepare_write() for newly allocated blocks.
2141  *
2142  * XXX currently __block_prepare_write() creates buffers for all the
2143  *     pages, and the filesystems mark these buffers as BH_New if they
2144  *     were newly allocated from disk. We use the BH_New flag similarly.
2145  */
2146 static int filter_commit_write(struct niobuf_local *lnb, int err)
2147 {
2148 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2149         if (err) {
2150                 unsigned block_start, block_end;
2151                 struct buffer_head *bh, *head = lnb->page->buffers;
2152                 unsigned blocksize = head->b_size;
2153
2154                 /* debugging: just seeing if this ever happens */
2155                 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
2156                        "called for ino %lu:%lu on err %d\n",
2157                        lnb->page->mapping->host->i_ino, lnb->page->index, err);
2158
2159                 /* Currently one buffer per page, but in the future... */
2160                 for (bh = head, block_start = 0; bh != head || !block_start;
2161                      block_start = block_end, bh = bh->b_this_page) {
2162                         block_end = block_start + blocksize;
2163                         if (buffer_new(bh)) {
2164                                 memset(kmap(lnb->page) + block_start, 0,
2165                                        blocksize);
2166                                 kunmap(lnb->page);
2167                         }
2168                 }
2169         }
2170 #endif
2171         return lustre_commit_write(lnb);
2172 }
2173
2174 static int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
2175                          int objcount, struct obd_ioobj *obj,
2176                          int niocount, struct niobuf_remote *nb,
2177                          struct niobuf_local *res, void **desc_private,
2178                          struct obd_trans_info *oti)
2179 {
2180         struct obd_run_ctxt saved;
2181         struct obd_device *obd;
2182         struct obd_ioobj *o;
2183         struct niobuf_remote *rnb;
2184         struct niobuf_local *lnb;
2185         struct fsfilt_objinfo *fso;
2186         struct dentry *dentry;
2187         struct inode *inode;
2188         int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
2189         unsigned long now = jiffies;
2190         ENTRY;
2191
2192         memset(res, 0, niocount * sizeof(*res));
2193
2194         obd = exp->exp_obd;
2195         if (obd == NULL)
2196                 RETURN(-EINVAL);
2197
2198         // theoretically we support multi-obj BRW RPCs, but until then...
2199         LASSERT(objcount == 1);
2200
2201         OBD_ALLOC(fso, objcount * sizeof(*fso));
2202         if (!fso)
2203                 RETURN(-ENOMEM);
2204
2205         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2206
2207         for (i = 0, o = obj; i < objcount; i++, o++) {
2208                 struct filter_dentry_data *fdd;
2209
2210                 LASSERT(o->ioo_bufcnt);
2211
2212                 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
2213
2214                 if (IS_ERR(dentry))
2215                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
2216
2217                 fso[i].fso_dentry = dentry;
2218                 fso[i].fso_bufcnt = o->ioo_bufcnt;
2219
2220                 if (!dentry->d_inode) {
2221                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2222                                o->ioo_id);
2223                         f_dput(dentry);
2224                         GOTO(out_objinfo, rc = -ENOENT);
2225                 }
2226
2227                 /* If we ever start to support mutli-object BRW RPCs, we will
2228                  * need to get locks on mulitple inodes (in order) or use the
2229                  * DLM to do the locking for us (and use the same locking in
2230                  * filter_setattr() for truncate).  That isn't all, because
2231                  * there still exists the possibility of a truncate starting
2232                  * a new transaction while holding the ext3 rwsem = write
2233                  * while some writes (which have started their transactions
2234                  * here) blocking on the ext3 rwsem = read => lock inversion.
2235                  *
2236                  * The handling gets very ugly when dealing with locked pages.
2237                  * It may be easier to just get rid of the locked page code
2238                  * (which has problems of its own) and either discover we do
2239                  * not need it anymore (i.e. it was a symptom of another bug)
2240                  * or ensure we get the page locks in an appropriate order.
2241                  */
2242                 if (cmd & OBD_BRW_WRITE)
2243                         down(&dentry->d_inode->i_sem);
2244                 fdd = dentry->d_fsdata;
2245                 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2246                         CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2247                                o->ioo_id);
2248         }
2249
2250         if (time_after(jiffies, now + 15*HZ))
2251                 CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
2252
2253         if (cmd & OBD_BRW_WRITE) {
2254                 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2255                                                  niocount, nb);
2256                 if (IS_ERR(*desc_private)) {
2257                         rc = PTR_ERR(*desc_private);
2258                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2259                                "error starting transaction: rc = %d\n", rc);
2260                         *desc_private = NULL;
2261                         GOTO(out_objinfo, rc);
2262                 }
2263         }
2264
2265         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
2266                 dentry = fso[i].fso_dentry;
2267                 inode = dentry->d_inode;
2268
2269                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2270                         if (j == 0)
2271                                 lnb->dentry = dentry;
2272                         else
2273                                 lnb->dentry = dget(dentry);
2274
2275                         lnb->offset = rnb->offset;
2276                         lnb->len    = rnb->len;
2277                         lnb->flags  = rnb->flags;
2278                         lnb->start  = jiffies;
2279
2280                         if (cmd & OBD_BRW_WRITE) {
2281                                 rc = filter_get_page_write(inode,lnb,&pglocked);
2282                                 if (rc)
2283                                         up(&dentry->d_inode->i_sem);
2284                         } else if (inode->i_size <= rnb->offset) {
2285                                 /* If there's no more data, abort early.
2286                                  * lnb->page == NULL and lnb->rc == 0, so it's
2287                                  * easy to detect later. */
2288                                 f_dput(dentry);
2289                                 lnb->dentry = NULL;
2290                                 break;
2291                         } else {
2292                                 rc = filter_start_page_read(inode, lnb);
2293                         }
2294
2295                         if (rc) {
2296                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2297                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
2298                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
2299                                        dentry, rc);
2300                                 f_dput(dentry);
2301                                 GOTO(out_pages, rc);
2302                         }
2303
2304                         tot_bytes += lnb->len;
2305
2306                         if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
2307                                 /* Likewise with a partial read */
2308                                 break;
2309                         }
2310                 }
2311         }
2312
2313         if (time_after(jiffies, now + 15*HZ))
2314                 CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
2315
2316         if (cmd & OBD_BRW_READ) {
2317                 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
2318                                     tot_bytes);
2319                 while (lnb-- > res) {
2320                         rc = filter_finish_page_read(lnb);
2321                         if (rc) {
2322                                 CERROR("error page %u@"LPU64" %u %p: rc %d\n",
2323                                        lnb->len, lnb->offset, lnb - res,
2324                                        lnb->dentry, rc);
2325                                 f_dput(lnb->dentry);
2326                                 GOTO(out_pages, rc);
2327                         }
2328                 }
2329         } else
2330                 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
2331                                     tot_bytes);
2332
2333         if (time_after(jiffies, now + 15*HZ))
2334                 CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
2335
2336         EXIT;
2337 out:
2338         OBD_FREE(fso, objcount * sizeof(*fso));
2339         current->journal_info = NULL;
2340         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2341         return rc;
2342
2343 out_pages:
2344         while (lnb-- > res) {
2345                 if (cmd & OBD_BRW_WRITE) {
2346                         filter_commit_write(lnb, rc);
2347                         up(&lnb->dentry->d_inode->i_sem);
2348                 } else {
2349                         lustre_put_page(lnb->page);
2350                 }
2351                 f_dput(lnb->dentry);
2352         }
2353         if (cmd & OBD_BRW_WRITE) {
2354                 filter_finish_transno(exp, *desc_private, oti, rc);
2355                 fsfilt_commit(obd,
2356                               filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2357                               *desc_private, 0);
2358         }
2359         goto out; /* dropped the dentry refs already (one per page) */
2360
2361 out_objinfo:
2362         for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
2363                 if (cmd & OBD_BRW_WRITE)
2364                         up(&fso[i].fso_dentry->d_inode->i_sem);
2365                 f_dput(fso[i].fso_dentry);
2366         }
2367         goto out;
2368 }
2369
2370 static int filter_write_locked_page(struct niobuf_local *lnb)
2371 {
2372         struct page *lpage;
2373         void        *lpage_addr;
2374         void        *lnb_addr;
2375         int rc;
2376         ENTRY;
2377
2378         lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2379         if (IS_ERR(lpage)) {
2380                 /* It is highly unlikely that we would ever get an error here.
2381                  * The page we want to get was previously locked, so it had to
2382                  * have already allocated the space, and we were just writing
2383                  * over the same data, so there would be no hole in the file.
2384                  *
2385                  * XXX: possibility of a race with truncate could exist, need
2386                  *      to check that.  There are no guarantees w.r.t.
2387                  *      write order even on a local filesystem, although the
2388                  *      normal response would be to return the number of bytes
2389                  *      successfully written and leave the rest to the app.
2390                  */
2391                 rc = PTR_ERR(lpage);
2392                 CERROR("error getting locked page index %ld: rc = %d\n",
2393                        lnb->page->index, rc);
2394                 LBUG();
2395                 lustre_commit_write(lnb);
2396                 RETURN(rc);
2397         }
2398
2399         /* 2 kmaps == vanishingly small deadlock opportunity */
2400         lpage_addr = kmap(lpage);
2401         lnb_addr = kmap(lnb->page);
2402
2403         memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
2404
2405         kunmap(lnb->page);
2406         kunmap(lpage);
2407
2408         lustre_put_page(lnb->page);
2409
2410         lnb->page = lpage;
2411         rc = lustre_commit_write(lnb);
2412         if (rc)
2413                 CERROR("error committing locked page %ld: rc = %d\n",
2414                        lnb->page->index, rc);
2415
2416         RETURN(rc);
2417 }
2418
2419 static int filter_syncfs(struct obd_export *exp)
2420 {
2421         struct obd_device *obd = exp->exp_obd;
2422         ENTRY;
2423
2424         RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
2425 }
2426
2427 static int filter_commitrw(int cmd, struct obd_export *exp,
2428                            int objcount, struct obd_ioobj *obj,
2429                            int niocount, struct niobuf_local *res,
2430                            void *desc_private, struct obd_trans_info *oti)
2431 {
2432         struct obd_run_ctxt saved;
2433         struct obd_ioobj *o;
2434         struct niobuf_local *lnb;
2435         struct obd_device *obd = exp->exp_obd;
2436         int found_locked = 0, rc = 0, i;
2437         unsigned long now = jiffies;  /* DEBUGGING OST TIMEOUTS */
2438         ENTRY;
2439
2440         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2441
2442         LASSERT(!current->journal_info);
2443         current->journal_info = desc_private;
2444
2445         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2446                 int j;
2447
2448                 if (cmd & OBD_BRW_WRITE) {
2449                         inode_update_time(lnb->dentry->d_inode, 1);
2450                         up(&lnb->dentry->d_inode->i_sem);
2451                 }
2452                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2453                         if (lnb->page == NULL) {
2454                                 continue;
2455                         }
2456
2457                         if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2458                                 found_locked++;
2459                                 continue;
2460                         }
2461
2462                         if (time_after(jiffies, lnb->start + 15*HZ))
2463                                 CERROR("slow commitrw %lus\n",
2464                                        (jiffies - lnb->start) / HZ);
2465
2466                         if (cmd & OBD_BRW_WRITE) {
2467                                 int err = filter_commit_write(lnb, 0);
2468
2469                                 if (!rc)
2470                                         rc = err;
2471                         } else {
2472                                 lustre_put_page(lnb->page);
2473                         }
2474
2475                         f_dput(lnb->dentry);
2476                         if (time_after(jiffies, lnb->start + 15*HZ))
2477                                 CERROR("slow commit_write %lus\n",
2478                                        (jiffies - lnb->start) / HZ);
2479                 }
2480         }
2481
2482         for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2483              i++, o++) {
2484                 int j;
2485                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2486                         int err;
2487                         if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2488                                 continue;
2489
2490                         if (time_after(jiffies, lnb->start + 15*HZ))
2491                                 CERROR("slow commitrw locked %lus\n",
2492                                        (jiffies - lnb->start) / HZ);
2493
2494                         err = filter_write_locked_page(lnb);
2495                         if (!rc)
2496                                 rc = err;
2497                         f_dput(lnb->dentry);
2498                         found_locked--;
2499
2500                         if (time_after(jiffies, lnb->start + 15*HZ))
2501                                 CERROR("slow commit_write locked %lus\n",
2502                                        (jiffies - lnb->start) / HZ);
2503                 }
2504         }
2505
2506         if (cmd & OBD_BRW_WRITE) {
2507                 /* We just want any dentry for the commit, for now */
2508                 struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
2509                 int err;
2510
2511                 rc = filter_finish_transno(exp, desc_private, oti, rc);
2512                 err = fsfilt_commit(obd, dparent->d_inode, desc_private,
2513                                     obd_sync_filter);
2514                 if (err)
2515                         rc = err;
2516                 if (obd_sync_filter)
2517                         LASSERT(oti->oti_transno <= obd->obd_last_committed);
2518
2519                 if (time_after(jiffies, now + 15*HZ))
2520                         CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
2521         }
2522
2523         LASSERT(!current->journal_info);
2524
2525         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2526         RETURN(rc);
2527 }
2528
2529 static int filter_brw(int cmd, struct lustre_handle *conn,
2530                       struct lov_stripe_md *lsm, obd_count oa_bufs,
2531                       struct brw_page *pga, struct obd_trans_info *oti)
2532 {
2533         struct obd_export *export = class_conn2export(conn);
2534         struct obd_ioobj        ioo;
2535         struct niobuf_local     *lnb;
2536         struct niobuf_remote    *rnb;
2537         obd_count               i;
2538         void                    *desc_private;
2539         int                     ret = 0;
2540         ENTRY;
2541
2542         if (export == NULL)
2543                 RETURN(-EINVAL);
2544
2545         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2546         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2547
2548         if (lnb == NULL || rnb == NULL)
2549                 GOTO(out, ret = -ENOMEM);
2550
2551         for (i = 0; i < oa_bufs; i++) {
2552                 rnb[i].offset = pga[i].off;
2553                 rnb[i].len = pga[i].count;
2554         }
2555
2556         ioo.ioo_id = lsm->lsm_object_id;
2557         ioo.ioo_gr = 0;
2558         ioo.ioo_type = S_IFREG;
2559         ioo.ioo_bufcnt = oa_bufs;
2560
2561         ret = filter_preprw(cmd, export, NULL, 1, &ioo, oa_bufs, rnb, lnb,
2562                             &desc_private, oti);
2563         if (ret != 0)
2564                 GOTO(out, ret);
2565
2566         for (i = 0; i < oa_bufs; i++) {
2567                 void *virt = kmap(pga[i].pg);
2568                 obd_off off = pga[i].off & ~PAGE_MASK;
2569                 void *addr = kmap(lnb[i].page);
2570
2571                 /* 2 kmaps == vanishingly small deadlock opportunity */
2572
2573                 if (cmd & OBD_BRW_WRITE)
2574                         memcpy(addr + off, virt + off, pga[i].count);
2575                 else
2576                         memcpy(virt + off, addr + off, pga[i].count);
2577
2578                 kunmap(addr);
2579                 kunmap(virt);
2580         }
2581
2582         ret = filter_commitrw(cmd, export, 1, &ioo, oa_bufs, lnb, desc_private,
2583                               oti);
2584
2585 out:
2586         if (lnb)
2587                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2588         if (rnb)
2589                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2590         class_export_put(export);
2591         RETURN(ret);
2592 }
2593
2594 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2595                              int objcount, struct obd_ioobj *obj,
2596                              int niocount, struct niobuf_remote *nb)
2597 {
2598         struct obd_device *obd;
2599         struct obd_ioobj *o = obj;
2600         struct niobuf_remote *rnb = nb;
2601         int rc = 0;
2602         int i;
2603         ENTRY;
2604
2605         obd = class_conn2obd(conn);
2606         if (!obd) {
2607                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2608                        conn->cookie);
2609                 RETURN(-EINVAL);
2610         }
2611
2612         for (i = 0; i < objcount; i++, o++) {
2613                 struct dentry *dentry;
2614                 struct inode *inode;
2615                 int (*fs_bmap)(struct address_space *, long);
2616                 int j;
2617
2618                 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
2619                 if (IS_ERR(dentry))
2620                         GOTO(out, rc = PTR_ERR(dentry));
2621                 inode = dentry->d_inode;
2622                 if (!inode) {
2623                         CERROR("trying to BRW to non-existent file "LPU64"\n",
2624                                o->ioo_id);
2625                         f_dput(dentry);
2626                         GOTO(out, rc = -ENOENT);
2627                 }
2628                 fs_bmap = inode->i_mapping->a_ops->bmap;
2629
2630                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2631                         long block;
2632
2633                         block = rnb->offset >> inode->i_blkbits;
2634
2635                         if (cmd == OBD_BRW_READ) {
2636                                 block = fs_bmap(inode->i_mapping, block);
2637                         } else {
2638                                 loff_t newsize = rnb->offset + rnb->len;
2639                                 /* fs_prep_san_write will also update inode
2640                                  * size for us:
2641                                  * (1) new alloced block
2642                                  * (2) existed block but size extented
2643                                  */
2644                                 /* FIXME We could call fs_prep_san_write()
2645                                  * only once for all the blocks allocation.
2646                                  * Now call it once for each block, for
2647                                  * simplicity. And if error happens, we
2648                                  * probably need to release previous alloced
2649                                  * block */
2650                                 rc = fs_prep_san_write(obd, inode, &block,
2651                                                        1, newsize);
2652                                 if (rc)
2653                                         break;
2654                         }
2655
2656                         rnb->offset = block;
2657                 }
2658                 f_dput(dentry);
2659         }
2660 out:
2661         RETURN(rc);
2662 }
2663
2664 static int filter_statfs(struct obd_export *exp, struct obd_statfs *osfs)
2665 {
2666         struct obd_device *obd = exp->exp_obd;
2667         ENTRY;
2668
2669         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2670 }
2671
2672 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2673                            void *key, __u32 *vallen, void *val)
2674 {
2675         struct obd_device *obd;
2676         ENTRY;
2677
2678         obd = class_conn2obd(conn);
2679         if (!obd) {
2680                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2681                        conn->cookie);
2682                 RETURN(-EINVAL);
2683         }
2684
2685         if (keylen == strlen("blocksize") &&
2686             memcmp(key, "blocksize", keylen) == 0) {
2687                 __u32 *blocksize = val;
2688                 *vallen = sizeof(*blocksize);
2689                 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2690                 RETURN(0);
2691         }
2692
2693         if (keylen == strlen("blocksize_bits") &&
2694             memcmp(key, "blocksize_bits", keylen) == 0) {
2695                 __u32 *blocksize_bits = val;
2696                 *vallen = sizeof(*blocksize_bits);
2697                 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2698                 RETURN(0);
2699         }
2700
2701         CDEBUG(D_IOCTL, "invalid key\n");
2702         RETURN(-EINVAL);
2703 }
2704
2705 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2706                   struct lustre_handle *src_conn, struct obdo *src,
2707                   obd_size count, obd_off offset, struct obd_trans_info *oti)
2708 {
2709         struct page *page;
2710         struct lov_stripe_md srcmd, dstmd;
2711         unsigned long index = 0;
2712         int err = 0;
2713
2714         LBUG(); /* THIS CODE IS NOT CORRECT -phil */
2715
2716         memset(&srcmd, 0, sizeof(srcmd));
2717         memset(&dstmd, 0, sizeof(dstmd));
2718         srcmd.lsm_object_id = src->o_id;
2719         dstmd.lsm_object_id = dst->o_id;
2720
2721         ENTRY;
2722         CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2723                ", dst: ino "LPU64"\n",
2724                src->o_id, src->o_blocks, src->o_size, dst->o_id);
2725         page = alloc_page(GFP_USER);
2726         if (page == NULL)
2727                 RETURN(-ENOMEM);
2728
2729         wait_on_page(page);
2730
2731         /* XXX with brw vector I/O, we could batch up reads and writes here,
2732          *     all we need to do is allocate multiple pages to handle the I/Os
2733          *     and arrays to handle the request parameters.
2734          */
2735         while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2736                 struct brw_page pg;
2737
2738                 pg.pg = page;
2739                 pg.count = PAGE_SIZE;
2740                 pg.off = (page->index) << PAGE_SHIFT;
2741                 pg.flag = 0;
2742
2743                 page->index = index;
2744                 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, NULL);
2745                 if (err) {
2746                         EXIT;
2747                         break;
2748                 }
2749
2750                 pg.flag = OBD_BRW_CREATE;
2751                 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2752
2753                 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, oti);
2754
2755                 /* XXX should handle dst->o_size, dst->o_blocks here */
2756                 if (err) {
2757                         EXIT;
2758                         break;
2759                 }
2760
2761                 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2762
2763                 index++;
2764         }
2765         dst->o_size = src->o_size;
2766         dst->o_blocks = src->o_blocks;
2767         dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
2768         unlock_page(page);
2769         __free_page(page);
2770
2771         RETURN(err);
2772 }
2773
2774 int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2775                   int len, void *karg, void *uarg)
2776 {
2777         struct obd_device *obd = class_conn2obd(conn);
2778
2779         switch (cmd) {
2780         case OBD_IOC_ABORT_RECOVERY:
2781                 CERROR("aborting recovery for device %s\n", obd->obd_name);
2782                 target_abort_recovery(obd);
2783                 RETURN(0);
2784
2785         default:
2786                 RETURN(-EINVAL);
2787         }
2788         RETURN(0);
2789 }
2790
2791
2792 static struct obd_ops filter_obd_ops = {
2793         o_owner:          THIS_MODULE,
2794         o_attach:         filter_attach,
2795         o_detach:         filter_detach,
2796         o_get_info:       filter_get_info,
2797         o_setup:          filter_setup,
2798         o_cleanup:        filter_cleanup,
2799         o_connect:        filter_connect,
2800         o_disconnect:     filter_disconnect,
2801         o_statfs:         filter_statfs,
2802         o_syncfs:         filter_syncfs,
2803         o_getattr:        filter_getattr,
2804         o_create:         filter_create,
2805         o_setattr:        filter_setattr,
2806         o_destroy:        filter_destroy,
2807         o_open:           filter_open,
2808         o_close:          filter_close,
2809         o_brw:            filter_brw,
2810         o_punch:          filter_truncate,
2811         o_preprw:         filter_preprw,
2812         o_commitrw:       filter_commitrw,
2813         o_destroy_export: filter_destroy_export,
2814         o_iocontrol:      filter_iocontrol,
2815 #if 0
2816         o_san_preprw:  filter_san_preprw,
2817         o_preallocate: filter_preallocate_inodes,
2818         o_migrate:     filter_migrate,
2819         o_copy:        filter_copy_data,
2820         o_iterate:     filter_iterate
2821 #endif
2822 };
2823
2824 static struct obd_ops filter_sanobd_ops = {
2825         o_owner:          THIS_MODULE,
2826         o_attach:         filter_attach,
2827         o_detach:         filter_detach,
2828         o_get_info:       filter_get_info,
2829         o_setup:          filter_san_setup,
2830         o_cleanup:        filter_cleanup,
2831         o_connect:        filter_connect,
2832         o_disconnect:     filter_disconnect,
2833         o_statfs:         filter_statfs,
2834         o_getattr:        filter_getattr,
2835         o_create:         filter_create,
2836         o_setattr:        filter_setattr,
2837         o_destroy:        filter_destroy,
2838         o_open:           filter_open,
2839         o_close:          filter_close,
2840         o_brw:            filter_brw,
2841         o_punch:          filter_truncate,
2842         o_preprw:         filter_preprw,
2843         o_commitrw:       filter_commitrw,
2844         o_san_preprw:     filter_san_preprw,
2845         o_destroy_export: filter_destroy_export,
2846         o_iocontrol:      filter_iocontrol,
2847 #if 0
2848         o_preallocate:  filter_preallocate_inodes,
2849         o_migrate:      filter_migrate,
2850         o_copy:         filter_copy_data,
2851         o_iterate:      filter_iterate
2852 #endif
2853 };
2854
2855
2856 static int __init obdfilter_init(void)
2857 {
2858         struct lprocfs_static_vars lvars;
2859         int rc;
2860
2861         printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2862
2863         lprocfs_init_vars(&lvars);
2864
2865         rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2866                                  OBD_FILTER_DEVICENAME);
2867         if (rc)
2868                 return rc;
2869
2870         rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2871                                  OBD_FILTER_SAN_DEVICENAME);
2872         if (rc)
2873                 class_unregister_type(OBD_FILTER_DEVICENAME);
2874         return rc;
2875 }
2876
2877 static void __exit obdfilter_exit(void)
2878 {
2879         class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2880         class_unregister_type(OBD_FILTER_DEVICENAME);
2881 }
2882
2883 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2884 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2885 MODULE_LICENSE("GPL");
2886
2887 module_init(obdfilter_init);
2888 module_exit(obdfilter_exit);