Whamcloud - gitweb
a078bb515ae5bf5554ea3422fb7ca84e357ca409
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define DEBUG_SUBSYSTEM S_FILTER
37
38 #include <linux/config.h>
39 #include <linux/module.h>
40 #include <linux/fs.h>
41 #include <linux/dcache.h>
42 #include <linux/init.h>
43 #include <linux/version.h>
44 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
45 # include <linux/mount.h>
46 # include <linux/buffer_head.h>
47 #endif
48
49 #include <linux/obd_class.h>
50 #include <linux/obd_lov.h>
51 #include <linux/obd_ost.h>
52 #include <linux/lustre_dlm.h>
53 #include <linux/lustre_fsfilt.h>
54 #include <linux/lprocfs_status.h>
55 #include <linux/lustre_log.h>
56 #include <linux/lustre_commit_confd.h>
57 #include <portals/list.h>
58
59 #include <linux/lustre_smfs.h>
60 #include "filter_internal.h"
61
/* Tentative definition of the lvfs callback table used by this file.
 * NOTE(review): presumably initialized further down in this file - confirm. */
static struct lvfs_callback_ops filter_lvfs_ops;

/* Forward declaration so that code appearing before the definition of
 * filter_destroy() can reference it. */
static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *ea, struct obd_trans_info *);
66
/* Journal commit callback registered by filter_finish_transno().
 * Forwards the committed @transno and the commit status @error to
 * obd_transno_commit_cb(); @cb_data is not used. */
static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                             void *cb_data, int error)
{
        obd_transno_commit_cb(obd, transno, error);
}
72
73 /* Assumes caller has already pushed us into the kernel context. */
74 int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
75                           int rc)
76 {
77         struct filter_obd *filter = &exp->exp_obd->u.filter;
78         struct filter_export_data *fed = &exp->exp_filter_data;
79         struct filter_client_data *fcd = fed->fed_fcd;
80         __u64 last_rcvd;
81         loff_t off;
82         int err, log_pri = D_HA;
83
84         /* Propagate error code. */
85         if (rc)
86                 RETURN(rc);
87
88         if (!exp->exp_obd->obd_replayable || oti == NULL)
89                 RETURN(rc);
90
91         /* we don't allocate new transnos for replayed requests */
92         if (oti->oti_transno == 0) {
93                 spin_lock(&filter->fo_translock);
94                 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1;
95                 filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
96                 spin_unlock(&filter->fo_translock);
97                 oti->oti_transno = last_rcvd;
98         } else {
99                 spin_lock(&filter->fo_translock);
100                 last_rcvd = oti->oti_transno;
101                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
102                         filter->fo_fsd->fsd_last_transno =
103                                 cpu_to_le64(last_rcvd);
104                 spin_unlock(&filter->fo_translock);
105         }
106         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
107
108         /* could get xid from oti, if it's ever needed */
109         fcd->fcd_last_xid = 0;
110
111         off = fed->fed_lr_off;
112
113         fsfilt_add_journal_cb(exp->exp_obd, filter->fo_sb, last_rcvd,
114                               oti->oti_handle, filter_commit_cb, NULL);
115
116         err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd,
117                                   sizeof(*fcd), &off, 0);
118         if (err) {
119                 log_pri = D_ERROR;
120                 if (rc == 0)
121                         rc = err;
122         }
123
124         CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n",
125                last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err);
126
127         RETURN(rc);
128 }
129
/* Drop one reference on @dentry via dput(), logging the name, pointer and
 * the reference count that will remain, and asserting that a reference is
 * actually held before it is dropped. */
void f_dput(struct dentry *dentry)
{
        /* Can't go inside filter_ddelete because it can block */
        CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
               dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
        /* dropping a reference we do not hold would be a refcount bug */
        LASSERT(atomic_read(&dentry->d_count) > 0);

        dput(dentry);
}
139
140 /* Add client data to the FILTER.  We use a bitmap to locate a free space
141  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
142  * Otherwise, we have just read the data from the last_rcvd file and
143  * we know its offset. */
144 static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
145                              struct filter_export_data *fed, int cl_idx)
146 {
147         unsigned long *bitmap = filter->fo_last_rcvd_slots;
148         int new_client = (cl_idx == -1);
149         ENTRY;
150
151         LASSERT(bitmap != NULL);
152
153         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
154         if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid))
155                 RETURN(0);
156
157         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
158          * there's no need for extra complication here
159          */
160         if (new_client) {
161                 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
162         repeat:
163                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
164                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
165                         RETURN(-ENOMEM);
166                 }
167                 if (test_and_set_bit(cl_idx, bitmap)) {
168                         CERROR("FILTER client %d: found bit is set in bitmap\n",
169                                cl_idx);
170                         cl_idx = find_next_zero_bit(bitmap,
171                                                     FILTER_LR_MAX_CLIENTS,
172                                                     cl_idx);
173                         goto repeat;
174                 }
175         } else {
176                 if (test_and_set_bit(cl_idx, bitmap)) {
177                         CERROR("FILTER client %d: bit already set in bitmap!\n",
178                                cl_idx);
179                         LBUG();
180                 }
181         }
182
183         fed->fed_lr_idx = cl_idx;
184         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
185                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
186
187         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
188                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
189
190         if (new_client) {
191                 struct lvfs_run_ctxt saved;
192                 loff_t off = fed->fed_lr_off;
193                 int err;
194                 void *handle;
195
196                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
197                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
198
199                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
200                 /* Transaction needed to fix bug 1403 */
201                 handle = fsfilt_start(obd,
202                                       filter->fo_rcvd_filp->f_dentry->d_inode,
203                                       FSFILT_OP_SETATTR, NULL);
204                 if (IS_ERR(handle)) {
205                         err = PTR_ERR(handle);
206                         CERROR("unable to start transaction: rc %d\n", err);
207                 } else {
208                         err = fsfilt_write_record(obd, filter->fo_rcvd_filp,
209                                                   fed->fed_fcd,
210                                                   sizeof(*fed->fed_fcd),
211                                                   &off, 1);
212                         fsfilt_commit(obd, filter->fo_sb,
213                                       filter->fo_rcvd_filp->f_dentry->d_inode,
214                                       handle, 1);
215                 }
216                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
217
218                 if (err) {
219                         CERROR("error writing %s client idx %u: rc %d\n",
220                                LAST_RCVD, fed->fed_lr_idx, err);
221                         RETURN(err);
222                 }
223         }
224         RETURN(0);
225 }
226
227 static int filter_client_free(struct obd_export *exp, int flags)
228 {
229         struct filter_export_data *fed = &exp->exp_filter_data;
230         struct filter_obd *filter = &exp->exp_obd->u.filter;
231         struct obd_device *obd = exp->exp_obd;
232         struct filter_client_data zero_fcd;
233         struct lvfs_run_ctxt saved;
234         int rc;
235         loff_t off;
236         ENTRY;
237
238         if (fed->fed_fcd == NULL)
239                 RETURN(0);
240
241         if (flags & OBD_OPT_FAILOVER)
242                 GOTO(free, 0);
243
244         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
245         if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0)
246                 GOTO(free, 0);
247
248         LASSERT(filter->fo_last_rcvd_slots != NULL);
249
250         off = fed->fed_lr_off;
251
252         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
253                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
254
255         /* Clear the bit _after_ zeroing out the client so we don't
256            race with filter_client_add and zero out new clients.*/
257         if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
258                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
259                        fed->fed_lr_idx);
260                 LBUG();
261         }
262
263         memset(&zero_fcd, 0, sizeof zero_fcd);
264         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
265         rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
266                                  sizeof(zero_fcd), &off, 1);
267         if (rc == 0)
268                 /* update server's transno */
269                 filter_update_server_data(obd, filter->fo_rcvd_filp,
270                                           filter->fo_fsd, 1);
271         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
272
273         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
274                "zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n",
275                fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
276                LAST_RCVD, rc);
277
278         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
279                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
280                        fed->fed_lr_idx);
281                 LBUG();
282         }
283
284 free:
285         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
286
287         RETURN(0);
288 }
289
/* Free the in-memory server data (fo_fsd) and the last_rcvd slot bitmap
 * (fo_last_rcvd_slots) of @filter and reset both pointers to NULL.
 * The bitmap is freed with the size FILTER_LR_MAX_CLIENT_WORDS longs,
 * matching its allocation in filter_init_server_data().
 * Always returns 0. */
static int filter_free_server_data(struct filter_obd *filter)
{
        OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
        filter->fo_fsd = NULL;
        OBD_FREE(filter->fo_last_rcvd_slots,
                 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
        filter->fo_last_rcvd_slots = NULL;
        return 0;
}
299
300 /* assumes caller is already in kernel ctxt */
301 int filter_update_server_data(struct obd_device *obd, struct file *filp,
302                               struct filter_server_data *fsd, int force_sync)
303 {
304         loff_t off = 0;
305         int rc;
306         ENTRY;
307
308         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
309         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
310                le64_to_cpu(fsd->fsd_last_transno));
311         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
312                le64_to_cpu(fsd->fsd_mount_count));
313
314         rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off,force_sync);
315         if (rc)
316                 CERROR("error writing filter_server_data: rc = %d\n", rc);
317
318         RETURN(rc);
319 }
320
321 int filter_update_last_objid(struct obd_device *obd, obd_gr group,
322                              int force_sync)
323 {
324         struct filter_obd *filter = &obd->u.filter;
325         __u64 tmp;
326         loff_t off = 0;
327         int rc;
328         ENTRY;
329
330         if (filter->fo_last_objid_files[group] == NULL) {
331                 CERROR("Object group "LPU64" not fully setup; not updating "
332                        "last_objid\n", group);
333                 RETURN(0);
334         }
335
336         CDEBUG(D_INODE, "server last_objid for group "LPU64": "LPU64"\n",
337                group, filter->fo_last_objids[group]);
338
339         tmp = cpu_to_le64(filter->fo_last_objids[group]);
340         rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
341                                  &tmp, sizeof(tmp), &off, force_sync);
342         if (rc)
343                 CERROR("error writing group "LPU64" last objid: rc = %d\n",
344                        group, rc);
345         RETURN(rc);
346 }
347
348 /* assumes caller has already in kernel ctxt */
349 static int filter_init_server_data(struct obd_device *obd, struct file * filp)
350 {
351         struct filter_obd *filter = &obd->u.filter;
352         struct filter_server_data *fsd;
353         struct filter_client_data *fcd = NULL;
354         struct inode *inode = filp->f_dentry->d_inode;
355         unsigned long last_rcvd_size = inode->i_size;
356         __u64 mount_count;
357         int cl_idx;
358         loff_t off = 0;
359         int rc;
360
361         /* ensure padding in the struct is the correct size */
362         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
363                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
364         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
365                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
366
367         OBD_ALLOC(fsd, sizeof(*fsd));
368         if (!fsd)
369                 RETURN(-ENOMEM);
370         filter->fo_fsd = fsd;
371
372         OBD_ALLOC(filter->fo_last_rcvd_slots,
373                   FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
374         if (filter->fo_last_rcvd_slots == NULL) {
375                 OBD_FREE(fsd, sizeof(*fsd));
376                 RETURN(-ENOMEM);
377         }
378
379         if (last_rcvd_size == 0) {
380                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
381
382                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
383                 fsd->fsd_last_transno = 0;
384                 mount_count = fsd->fsd_mount_count = 0;
385                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
386                 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
387                 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
388                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
389                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
390         } else {
391                 rc = fsfilt_read_record(obd, filp, fsd, sizeof(*fsd), &off);
392                 if (rc) {
393                         CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
394                                LAST_RCVD, rc);
395                         GOTO(err_fsd, rc);
396                 }
397                 if (strcmp(fsd->fsd_uuid, obd->obd_uuid.uuid) != 0) {
398                         CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
399                                obd->obd_uuid.uuid, fsd->fsd_uuid);
400                         GOTO(err_fsd, rc = -EINVAL);
401                 }
402                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
403                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
404         }
405
406         if (fsd->fsd_feature_incompat & ~cpu_to_le32(FILTER_INCOMPAT_SUPP)) {
407                 CERROR("unsupported feature %x\n",
408                        le32_to_cpu(fsd->fsd_feature_incompat) &
409                        ~FILTER_INCOMPAT_SUPP);
410                 GOTO(err_fsd, rc = -EINVAL);
411         }
412         if (fsd->fsd_feature_rocompat & ~cpu_to_le32(FILTER_ROCOMPAT_SUPP)) {
413                 CERROR("read-only feature %x\n",
414                        le32_to_cpu(fsd->fsd_feature_rocompat) &
415                        ~FILTER_ROCOMPAT_SUPP);
416                 /* Do something like remount filesystem read-only */
417                 GOTO(err_fsd, rc = -EINVAL);
418         }
419
420         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
421                obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
422         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
423                obd->obd_name, mount_count + 1);
424         CDEBUG(D_INODE, "%s: server data size: %u\n",
425                obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
426         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
427                obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
428         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
429                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
430         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
431                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
432         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
433                last_rcvd_size <= le32_to_cpu(fsd->fsd_client_start) ? 0 :
434                (last_rcvd_size - le32_to_cpu(fsd->fsd_client_start)) /
435                 le16_to_cpu(fsd->fsd_client_size));
436
437         if (!obd->obd_replayable) {
438                 CWARN("%s: recovery support OFF\n", obd->obd_name);
439                 GOTO(out, rc = 0);
440         }
441
442         for (cl_idx = 0, off = le32_to_cpu(fsd->fsd_client_start);
443              off < last_rcvd_size; cl_idx++) {
444                 __u64 last_rcvd;
445                 struct obd_export *exp;
446                 struct filter_export_data *fed;
447
448                 if (!fcd) {
449                         OBD_ALLOC(fcd, sizeof(*fcd));
450                         if (!fcd)
451                                 GOTO(err_client, rc = -ENOMEM);
452                 }
453
454                 /* Don't assume off is incremented properly by
455                  * fsfilt_read_record(), in case sizeof(*fcd)
456                  * isn't the same as fsd->fsd_client_size.  */
457                 off = le32_to_cpu(fsd->fsd_client_start) +
458                         cl_idx * le16_to_cpu(fsd->fsd_client_size);
459                 rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
460                 if (rc) {
461                         CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
462                                LAST_RCVD, cl_idx, off, rc);
463                         break; /* read error shouldn't cause startup to fail */
464                 }
465
466                 if (fcd->fcd_uuid[0] == '\0') {
467                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
468                                cl_idx);
469                         continue;
470                 }
471
472                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
473
474                 /* These exports are cleaned up by filter_disconnect(), so they
475                  * need to be set up like real exports as filter_connect() does.
476                  */
477                 exp = class_new_export(obd);
478                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
479                        " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
480                        last_rcvd, le64_to_cpu(fsd->fsd_last_transno));
481                 if (exp == NULL)
482                         GOTO(err_client, rc = -ENOMEM);
483
484                 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
485                        sizeof exp->exp_client_uuid.uuid);
486                 fed = &exp->exp_filter_data;
487                 fed->fed_fcd = fcd;
488                 filter_client_add(obd, filter, fed, cl_idx);
489                 /* create helper if export init gets more complex */
490                 spin_lock_init(&fed->fed_lock);
491
492                 fcd = NULL;
493                 exp->exp_replay_needed = 1;
494                 obd->obd_recoverable_clients++;
495                 obd->obd_max_recoverable_clients++;
496                 class_export_put(exp);
497
498                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
499                        cl_idx, last_rcvd);
500
501                 if (last_rcvd > le64_to_cpu(fsd->fsd_last_transno))
502                         fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
503
504         }
505
506         obd->obd_last_committed = le64_to_cpu(fsd->fsd_last_transno);
507
508         if (obd->obd_recoverable_clients) {
509                 CWARN("RECOVERY: %d recoverable clients, last_rcvd "
510                       LPU64"\n", obd->obd_recoverable_clients,
511                       le64_to_cpu(fsd->fsd_last_transno));
512                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
513                 target_start_recovery_thread(obd, ost_handle);
514         }
515
516         if (fcd)
517                 OBD_FREE(fcd, sizeof(*fcd));
518
519 out:
520         filter->fo_mount_count = mount_count + 1;
521         fsd->fsd_mount_count = cpu_to_le64(filter->fo_mount_count);
522
523         /* save it, so mount count and last_transno is current */
524         rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
525
526         RETURN(rc);
527
528 err_client:
529         class_disconnect_exports(obd, 0);
530 err_fsd:
531         filter_free_server_data(filter);
532         RETURN(rc);
533 }
534
535 static int filter_cleanup_groups(struct obd_device *obd)
536 {
537         struct filter_obd *filter = &obd->u.filter;
538         struct dentry *dentry;
539         int i, k;
540         ENTRY;
541
542         for (i = 0; i < filter->fo_group_count; i++) {
543                 if (filter->fo_subdirs != NULL) {
544                         for (k = 0; k < filter->fo_subdir_count; k++) {
545                                 dentry = filter->fo_subdirs[i].dentry[k];
546                                 if (dentry == NULL)
547                                         continue;
548                                 f_dput(dentry);
549                                 filter->fo_subdirs[i].dentry[k] = NULL;
550                         }
551                 }
552                 if (filter->fo_last_objid_files[i] != NULL) {
553                         filp_close(filter->fo_last_objid_files[i], 0);
554                         filter->fo_last_objid_files[i] = NULL;
555                 }
556                 if (filter->fo_groups[i] != NULL) {
557                         dput(filter->fo_groups[i]);
558                         filter->fo_groups[i] = NULL;
559                 }
560         }
561         if (filter->fo_subdirs != NULL)
562                 OBD_FREE(filter->fo_subdirs,
563                          filter->fo_group_count * sizeof(*filter->fo_subdirs));
564         if (filter->fo_groups != NULL)
565                 OBD_FREE(filter->fo_groups,
566                          filter->fo_group_count * sizeof(*filter->fo_groups));
567         if (filter->fo_last_objids != NULL)
568                 OBD_FREE(filter->fo_last_objids,
569                          filter->fo_group_count * sizeof(__u64));
570         if (filter->fo_last_objid_files != NULL)
571                 OBD_FREE(filter->fo_last_objid_files,
572                          filter->fo_group_count * sizeof(struct file *));
573         f_dput(filter->fo_dentry_O);
574         RETURN(0);
575 }
576
577 static int filter_read_group_internal(struct obd_device *obd, int group,
578                                       int create)
579 {
580         struct filter_obd *filter = &obd->u.filter;
581         __u64 *new_objids = NULL;
582         struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs = NULL;
583         struct dentry **new_groups = NULL;
584         struct file **new_files = NULL;
585         struct dentry *dentry;
586         struct file *filp;
587         int old_count = filter->fo_group_count, rc, stage = 0, i;
588         char name[25];
589         __u64 last_objid;
590         loff_t off = 0;
591
592         snprintf(name, 24, "%d", group);
593         name[24] = '\0';
594
595         if (!create) {
596                 dentry = ll_lookup_one_len(name, filter->fo_dentry_O,
597                                            strlen(name));
598                 if (IS_ERR(dentry)) {
599                         CERROR("Cannot lookup expected object group %d: %ld\n",
600                                group, PTR_ERR(dentry));
601                         RETURN(PTR_ERR(dentry));
602                 }
603         } else {
604                 dentry = simple_mkdir(filter->fo_dentry_O, name, 0700, 1);
605                 if (IS_ERR(dentry)) {
606                         CERROR("cannot lookup/create O/%s: rc = %ld\n", name,
607                                PTR_ERR(dentry));
608                         RETURN(PTR_ERR(dentry));
609                 }
610         }
611         stage = 1;
612
613         snprintf(name, 24, "O/%d/LAST_ID", group);
614         name[24] = '\0';
615         filp = filp_open(name, O_CREAT | O_RDWR, 0700);
616         if (IS_ERR(filp)) {
617                 CERROR("cannot create %s: rc = %ld\n", name, PTR_ERR(filp));
618                 GOTO(cleanup, rc = PTR_ERR(filp));
619         }
620         stage = 2;
621
622         rc = fsfilt_read_record(obd, filp, &last_objid, sizeof(__u64), &off);
623         if (rc) {
624                 CDEBUG(D_INODE, "error reading %s: rc %d\n", name, rc);
625                 GOTO(cleanup, rc);
626         }
627
628         if (filter->fo_subdir_count) {
629                 OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs));
630                 if (tmp_subdirs == NULL)
631                         GOTO(cleanup, rc = -ENOMEM);
632                 stage = 3;
633
634                 for (i = 0; i < filter->fo_subdir_count; i++) {
635                         char dir[20];
636                         snprintf(dir, sizeof(dir), "d%u", i);
637
638                         tmp_subdirs->dentry[i] =
639                                 simple_mkdir(dentry, dir, 0700, 1);
640                         if (IS_ERR(tmp_subdirs->dentry[i])) {
641                                 rc = PTR_ERR(tmp_subdirs->dentry[i]);
642                                 CERROR("can't lookup/create O/%d/%s: rc = %d\n",
643                                        group, dir, rc);
644                                 GOTO(cleanup, rc);
645                         }
646                         CDEBUG(D_INODE, "got/created O/%d/%s: %p\n", group, dir,
647                                tmp_subdirs->dentry[i]);
648                 }
649         }
650
651         /* 'group' is an index; we need an array of length 'group + 1' */
652         if (group + 1 > old_count) {
653                 int len = group + 1;
654                 OBD_ALLOC(new_objids, len * sizeof(*new_objids));
655                 OBD_ALLOC(new_subdirs, len * sizeof(*new_subdirs));
656                 OBD_ALLOC(new_groups, len * sizeof(*new_groups));
657                 OBD_ALLOC(new_files, len * sizeof(*new_files));
658                 stage = 4;
659                 if (new_objids == NULL || new_subdirs == NULL ||
660                     new_groups == NULL || new_files == NULL)
661                         GOTO(cleanup, rc = -ENOMEM);
662
663                 memcpy(new_objids, filter->fo_last_objids,
664                        old_count * sizeof(*new_objids));
665                 memcpy(new_subdirs, filter->fo_subdirs,
666                        old_count * sizeof(*new_subdirs));
667                 memcpy(new_groups, filter->fo_groups,
668                        old_count * sizeof(*new_groups));
669                 memcpy(new_files, filter->fo_last_objid_files,
670                        old_count * sizeof(*new_files));
671
672                 if (old_count) {
673                         OBD_FREE(filter->fo_last_objids,
674                                  old_count * sizeof(*new_objids));
675                         OBD_FREE(filter->fo_subdirs,
676                                  old_count * sizeof(*new_subdirs));
677                         OBD_FREE(filter->fo_groups,
678                                  old_count * sizeof(*new_groups));
679                         OBD_FREE(filter->fo_last_objid_files,
680                                  old_count * sizeof(*new_files));
681                 }
682                 filter->fo_last_objids = new_objids;
683                 filter->fo_subdirs = new_subdirs;
684                 filter->fo_groups = new_groups;
685                 filter->fo_last_objid_files = new_files;
686                 filter->fo_group_count = len;
687         }
688
689         filter->fo_groups[group] = dentry;
690         filter->fo_last_objid_files[group] = filp;
691         if (filter->fo_subdir_count) {
692                 filter->fo_subdirs[group] = *tmp_subdirs;
693                 OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
694         }
695
696         if (filp->f_dentry->d_inode->i_size == 0) {
697                 filter->fo_last_objids[group] = FILTER_INIT_OBJID;
698                 RETURN(0);
699         }
700
701         filter->fo_last_objids[group] = le64_to_cpu(last_objid);
702         CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
703                obd->obd_name, group, last_objid);
704         RETURN(0);
705  cleanup:
706         switch (stage) {
707         case 4:
708                 if (new_objids != NULL)
709                         OBD_FREE(new_objids, group * sizeof(*new_objids));
710                 if (new_subdirs != NULL)
711                         OBD_FREE(new_subdirs, group * sizeof(*new_subdirs));
712                 if (new_groups != NULL)
713                         OBD_FREE(new_groups, group * sizeof(*new_groups));
714                 if (new_files != NULL)
715                         OBD_FREE(new_files, group * sizeof(*new_files));
716         case 3:
717                 if (filter->fo_subdir_count) {
718                         for (i = 0; i < filter->fo_subdir_count; i++) {
719                                 if (tmp_subdirs->dentry[i] != NULL)
720                                         dput(tmp_subdirs->dentry[i]);
721                         }
722                         OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
723                 }
724         case 2:
725                 filp_close(filp, 0);
726         case 1:
727                 dput(dentry);
728         }
729         RETURN(rc);
730 }
731
732 static int filter_read_groups(struct obd_device *obd, int last_group,
733                               int create)
734 {
735         struct filter_obd *filter = &obd->u.filter;
736         int old_count = filter->fo_group_count, group = old_count, rc = 0;
737
738         for (group = old_count; group <= last_group; group++) {
739                 if (group == 0)
740                         continue; /* no group zero */
741
742                 rc = filter_read_group_internal(obd, group, create);
743                 if (rc != 0)
744                         break;
745         }
746         return rc;
747 }
748
749 static int filter_prep_groups(struct obd_device *obd)
750 {
751         struct filter_obd *filter = &obd->u.filter;
752         struct dentry *dentry, *O_dentry;
753         int rc = 0, cleanup_phase = 0;
754         ENTRY;
755
756         O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
757         CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
758         if (IS_ERR(O_dentry)) {
759                 rc = PTR_ERR(O_dentry);
760                 CERROR("cannot open/create O: rc = %d\n", rc);
761                 GOTO(cleanup, rc);
762         }
763         filter->fo_dentry_O = O_dentry;
764         cleanup_phase = 1; /* O_dentry */
765
766         /* Lookup "R" to tell if we're on an old OST FS and need to convert
767          * from O/R/<dir>/<objid> to O/0/<dir>/<objid>.  This can be removed
768          * some time post 1.0 when all old-style OSTs have converted along
769          * with the init_objid hack. */
770         dentry = ll_lookup_one_len("R", O_dentry, 1);
771         if (IS_ERR(dentry))
772                 GOTO(cleanup, rc = PTR_ERR(dentry));
773         if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
774                 struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1);
775                 ENTRY;
776
777                 CWARN("converting OST to new object layout\n");
778                 if (IS_ERR(O0_dentry)) {
779                         rc = PTR_ERR(O0_dentry);
780                         CERROR("error looking up O/0: rc %d\n", rc);
781                         GOTO(cleanup_R, rc);
782                 }
783
784                 if (O0_dentry->d_inode) {
785                         CERROR("Both O/R and O/0 exist. Fix manually.\n");
786                         GOTO(cleanup_O0, rc = -EEXIST);
787                 }
788
789                 down(&O_dentry->d_inode->i_sem);
790                 rc = vfs_rename(O_dentry->d_inode, dentry,
791                                 O_dentry->d_inode, O0_dentry);
792                 up(&O_dentry->d_inode->i_sem);
793
794                 if (rc) {
795                         CERROR("error renaming O/R to O/0: rc %d\n", rc);
796                         GOTO(cleanup_O0, rc);
797                 }
798                 filter->fo_fsd->fsd_feature_incompat |=
799                         cpu_to_le32(FILTER_INCOMPAT_GROUPS);
800                 rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
801                                                filter->fo_fsd, 1);
802                 GOTO(cleanup_O0, rc);
803
804         cleanup_O0:
805                 f_dput(O0_dentry);
806         cleanup_R:
807                 f_dput(dentry);
808                 if (rc)
809                         GOTO(cleanup, rc);
810         } else {
811                 f_dput(dentry);
812         }
813
814         cleanup_phase = 2; /* groups */
815
816         /* Group 0 is no longer a legal group, to catch uninitialized IDs */
817 #define FILTER_MIN_GROUPS 3
818         rc = filter_read_groups(obd, FILTER_MIN_GROUPS, 1);
819         if (rc)
820                 GOTO(cleanup, rc);
821
822         RETURN(0);
823
824  cleanup:
825         switch (cleanup_phase) {
826         case 2:
827                 filter_cleanup_groups(obd);
828         case 1:
829                 f_dput(filter->fo_dentry_O);
830                 filter->fo_dentry_O = NULL;
831         default:
832                 break;
833         }
834         return rc;
835 }
836
837 /* setup the object store with correct subdirectories */
838 static int filter_prep(struct obd_device *obd)
839 {
840         struct lvfs_run_ctxt saved;
841         struct filter_obd *filter = &obd->u.filter;
842         struct file *file;
843         struct inode *inode;
844         int rc = 0;
845         ENTRY;
846
847         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
848         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
849         if (!file || IS_ERR(file)) {
850                 rc = PTR_ERR(file);
851                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
852                        LAST_RCVD, rc);
853                 GOTO(out, rc);
854         }
855
856         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
857                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
858                        file->f_dentry->d_inode->i_mode);
859                 GOTO(err_filp, rc = -ENOENT);
860         }
861
862         /* steal operations */
863         inode = file->f_dentry->d_inode;
864         filter->fo_fop = file->f_op;
865         filter->fo_iop = inode->i_op;
866         filter->fo_aops = inode->i_mapping->a_ops;
867
868         rc = filter_init_server_data(obd, file);
869         if (rc) {
870                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
871                 GOTO(err_filp, rc);
872         }
873         filter->fo_rcvd_filp = file;
874
875         rc = filter_prep_groups(obd);
876         if (rc)
877                 GOTO(err_server_data, rc);
878
879  out:
880         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
881
882         return(rc);
883
884  err_server_data:
885         //class_disconnect_exports(obd, 0);
886         filter_free_server_data(filter);
887  err_filp:
888         if (filp_close(file, 0))
889                 CERROR("can't close %s after error\n", LAST_RCVD);
890         filter->fo_rcvd_filp = NULL;
891         goto out;
892 }
893
894 /* cleanup the filter: write last used object id to status file */
895 static void filter_post(struct obd_device *obd)
896 {
897         struct lvfs_run_ctxt saved;
898         struct filter_obd *filter = &obd->u.filter;
899         int rc, i;
900
901         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
902          * best to start a transaction with h_sync, because we removed this
903          * from lastobjid */
904
905         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
906         rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
907                                        filter->fo_fsd, 0);
908         if (rc)
909                 CERROR("error writing server data: rc = %d\n", rc);
910
911         for (i = 1; i < filter->fo_group_count; i++) {
912                 rc = filter_update_last_objid(obd, i,
913                                              (i == filter->fo_group_count - 1));
914                 if (rc)
915                         CERROR("error writing group %d lastobjid: rc = %d\n",
916                                i, rc);
917         }
918
919         rc = filp_close(filter->fo_rcvd_filp, 0);
920         filter->fo_rcvd_filp = NULL;
921         if (rc)
922                 CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
923
924         filter_cleanup_groups(obd);
925         filter_free_server_data(filter);
926         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
927 }
928
929 static void filter_set_last_id(struct filter_obd *filter, int group, obd_id id)
930 {
931         LASSERT(filter->fo_fsd != NULL);
932         LASSERT(group > 0);
933         LASSERT(group < filter->fo_group_count);
934
935         spin_lock(&filter->fo_objidlock);
936         filter->fo_last_objids[group] = id;
937         spin_unlock(&filter->fo_objidlock);
938 }
939
940 __u64 filter_last_id(struct filter_obd *filter, int group)
941 {
942         obd_id id;
943         LASSERT(filter->fo_fsd != NULL);
944         LASSERT(group > 0);
945         LASSERT(group < filter->fo_group_count);
946
947         spin_lock(&filter->fo_objidlock);
948         id = filter->fo_last_objids[group];
949         spin_unlock(&filter->fo_objidlock);
950
951         return id;
952 }
953
954 /* direct cut-n-paste of mds_blocking_ast() */
955 static int filter_blocking_ast(struct ldlm_lock *lock,
956                                struct ldlm_lock_desc *desc,
957                                void *data, int flag)
958 {
959         int do_ast;
960         ENTRY;
961
962         if (flag == LDLM_CB_CANCELING) {
963                 /* Don't need to do anything here. */
964                 RETURN(0);
965         }
966
967         /* XXX layering violation!  -phil */
968         l_lock(&lock->l_resource->lr_namespace->ns_lock);
969         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
970          * such that filter_blocking_ast is called just before l_i_p takes the
971          * ns_lock, then by the time we get the lock, we might not be the
972          * correct blocking function anymore.  So check, and return early, if
973          * so. */
974         if (lock->l_blocking_ast != filter_blocking_ast) {
975                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
976                 RETURN(0);
977         }
978
979         lock->l_flags |= LDLM_FL_CBPENDING;
980         do_ast = (!lock->l_readers && !lock->l_writers);
981         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
982
983         if (do_ast) {
984                 struct lustre_handle lockh;
985                 int rc;
986
987                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
988                 ldlm_lock2handle(lock, &lockh);
989                 rc = ldlm_cli_cancel(&lockh);
990                 if (rc < 0)
991                         CERROR("ldlm_cli_cancel: %d\n", rc);
992         } else {
993                 LDLM_DEBUG(lock, "Lock still has references, will be "
994                            "cancelled later");
995         }
996         RETURN(0);
997 }
998
999 extern void *lock_dir(struct inode *dir, struct qstr *name);
1000 extern void unlock_dir(struct inode *dir, void *lock);
1001
1002 static void * filter_lock_dentry(struct obd_device *obd,
1003                                  struct dentry *dparent, obd_id id)
1004 {
1005 #ifdef S_PDIROPS
1006         struct qstr qstr;
1007         char name[32];
1008         qstr.name = name;
1009         qstr.len = sprintf(name, LPU64, id);
1010         return lock_dir(dparent->d_inode, &qstr);
1011 #else
1012         down(&dparent->d_inode->i_sem);
1013 #endif
1014         return 0;
1015 }
1016
1017 /* We never dget the object parent, so DON'T dput it either */
1018 static void filter_parent_unlock(struct dentry *dparent, void *lock)
1019 {
1020 #ifdef S_PDIROPS
1021         LASSERT(lock != NULL);
1022         unlock_dir(dparent->d_inode, lock);
1023 #else
1024         up(&dparent->d_inode->i_sem);
1025 #endif
1026 }
1027
1028 /* We never dget the object parent, so DON'T dput it either */
1029 struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
1030 {
1031         struct filter_obd *filter = &obd->u.filter;
1032         LASSERT(group < filter->fo_group_count);
1033         LASSERT(group > 0);
1034
1035         if (filter->fo_subdir_count == 0)
1036                 return filter->fo_groups[group];
1037
1038         return filter->fo_subdirs[group].dentry[objid & (filter->fo_subdir_count - 1)];
1039 }
1040
1041 /* We never dget the object parent, so DON'T dput it either */
1042 struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
1043                                   obd_id objid, void **lock)
1044 {
1045         unsigned long now = jiffies;
1046         struct dentry *dparent = filter_parent(obd, group, objid);
1047
1048         if (IS_ERR(dparent))
1049                 return dparent;
1050
1051         LASSERT(dparent);
1052         LASSERT(dparent->d_inode);
1053
1054         *lock = filter_lock_dentry(obd, dparent, objid);
1055         if (time_after(jiffies, now + 15 * HZ))
1056                 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
1057         return dparent;
1058 }
1059
1060 /* How to get files, dentries, inodes from object id's.
1061  *
1062  * If dir_dentry is passed, the caller has already locked the parent
1063  * appropriately for this operation (normally a write lock).  If
1064  * dir_dentry is NULL, we do a read lock while we do the lookup to
1065  * avoid races with create/destroy and such changing the directory
1066  * internal to the filesystem code. */
1067 struct dentry *filter_fid2dentry(struct obd_device *obd,
1068                                  struct dentry *dir_dentry,
1069                                  obd_gr group, obd_id id)
1070 {
1071         struct dentry *dparent = dir_dentry;
1072         struct dentry *dchild;
1073         void *lock = NULL;
1074         char name[32];
1075         int len;
1076         ENTRY;
1077
1078         if (id == 0) {
1079                 CERROR("fatal: invalid object id 0\n");
1080                 RETURN(ERR_PTR(-ESTALE));
1081         }
1082
1083         len = sprintf(name, LPU64, id);
1084         if (dir_dentry == NULL) {
1085                 dparent = filter_parent_lock(obd, group, id, &lock);
1086                 if (IS_ERR(dparent))
1087                         RETURN(dparent);
1088         }
1089         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
1090                dparent->d_name.len, dparent->d_name.name, name);
1091         dchild = /*ll_*/lookup_one_len(name, dparent, len);
1092         if (dir_dentry == NULL)
1093                 filter_parent_unlock(dparent, lock);
1094         if (IS_ERR(dchild)) {
1095                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
1096                 RETURN(dchild);
1097         }
1098
1099         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
1100                name, dchild, atomic_read(&dchild->d_count));
1101
1102         LASSERT(atomic_read(&dchild->d_count) > 0);
1103
1104         RETURN(dchild);
1105 }
1106
1107 static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
1108                                   obd_id group)
1109 {
1110         struct lustre_handle lockh;
1111         int flags = LDLM_AST_DISCARD_DATA, rc;
1112         struct ldlm_res_id res_id = { .name = { objid, 0, group, 0 } };
1113         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1114
1115         ENTRY;
1116         /* Tell the clients that the object is gone now and that they should
1117          * throw away any cached pages. */
1118         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
1119                               LDLM_EXTENT, &policy, LCK_PW,
1120                               &flags, filter_blocking_ast, ldlm_completion_ast,
1121                               NULL, NULL, NULL, 0, NULL, &lockh);
1122
1123         /* We only care about the side-effects, just drop the lock. */
1124         if (rc == ELDLM_OK)
1125                 ldlm_lock_decref(&lockh, LCK_PW);
1126
1127         RETURN(rc);
1128 }
1129
1130 /* Caller must hold LCK_PW on parent and push us into kernel context.
1131  * Caller is also required to ensure that dchild->d_inode exists. */
1132 static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
1133                                    struct dentry *dparent,
1134                                    struct dentry *dchild)
1135 {
1136         struct inode *inode = dchild->d_inode;
1137         int rc;
1138         ENTRY;
1139
1140         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1141                 CERROR("destroying objid %*s nlink = %lu, count = %d\n",
1142                        dchild->d_name.len, dchild->d_name.name,
1143                        (unsigned long)inode->i_nlink,
1144                        atomic_read(&inode->i_count));
1145         }
1146
1147         rc = vfs_unlink(dparent->d_inode, dchild);
1148
1149         if (rc)
1150                 CERROR("error unlinking objid %*s: rc %d\n",
1151                        dchild->d_name.len, dchild->d_name.name, rc);
1152
1153         RETURN(rc);
1154 }
1155
1156 static int filter_intent_policy(struct ldlm_namespace *ns,
1157                                 struct ldlm_lock **lockp, void *req_cookie,
1158                                 ldlm_mode_t mode, int flags, void *data)
1159 {
1160         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1161         struct ptlrpc_request *req = req_cookie;
1162         struct ldlm_lock *lock = *lockp, *l = NULL;
1163         struct ldlm_resource *res = lock->l_resource;
1164         ldlm_processing_policy policy;
1165         struct ost_lvb *res_lvb, *reply_lvb;
1166         struct list_head *tmp;
1167         ldlm_error_t err;
1168         int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply),
1169                                             sizeof(struct ost_lvb) };
1170         ENTRY;
1171
1172         policy = ldlm_get_processing_policy(res);
1173         LASSERT(policy != NULL);
1174         LASSERT(req != NULL);
1175
1176         rc = lustre_pack_reply(req, 2, repsize, NULL);
1177         if (rc)
1178                 RETURN(req->rq_status = rc);
1179
1180         reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
1181         LASSERT(reply_lvb != NULL);
1182
1183         //fixup_handle_for_resent_req(req, lock, &lockh);
1184
1185         /* If we grant any lock at all, it will be a whole-file read lock.
1186          * Call the extent policy function to see if our request can be
1187          * granted, or is blocked. */
1188         lock->l_policy_data.l_extent.start = 0;
1189         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
1190         lock->l_req_mode = LCK_PR;
1191
1192         l_lock(&res->lr_namespace->ns_lock);
1193
1194         res->lr_tmp = &rpc_list;
1195         rc = policy(lock, &tmpflags, 0, &err);
1196         res->lr_tmp = NULL;
1197
1198         /* FIXME: we should change the policy function slightly, to not make
1199          * this list at all, since we just turn around and free it */
1200         while (!list_empty(&rpc_list)) {
1201                 struct ldlm_ast_work *w =
1202                         list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
1203                 list_del(&w->w_list);
1204                 LDLM_LOCK_PUT(w->w_lock);
1205                 OBD_FREE(w, sizeof(*w));
1206         }
1207
1208         if (rc == LDLM_ITER_CONTINUE) {
1209                 /* The lock met with no resistance; we're finished. */
1210                 l_unlock(&res->lr_namespace->ns_lock);
1211                 RETURN(ELDLM_LOCK_REPLACED);
1212         }
1213
1214         /* Do not grant any lock, but instead send GL callbacks.  The extent
1215          * policy nicely created a list of all PW locks for us.  We will choose
1216          * the highest of those which are larger than the size in the LVB, if
1217          * any, and perform a glimpse callback. */
1218         down(&res->lr_lvb_sem);
1219         res_lvb = res->lr_lvb_data;
1220         LASSERT(res_lvb != NULL);
1221         reply_lvb->lvb_size = res_lvb->lvb_size;
1222         reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
1223         up(&res->lr_lvb_sem);
1224
1225         list_for_each(tmp, &res->lr_granted) {
1226                 struct ldlm_lock *tmplock =
1227                         list_entry(tmp, struct ldlm_lock, l_res_link);
1228
1229                 if (tmplock->l_granted_mode == LCK_PR)
1230                         continue;
1231
1232                 if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
1233                         continue;
1234
1235                 if (l == NULL) {
1236                         l = LDLM_LOCK_GET(tmplock);
1237                         continue;
1238                 }
1239
1240                 if (l->l_policy_data.l_extent.start >
1241                     tmplock->l_policy_data.l_extent.start)
1242                         continue;
1243
1244                 LDLM_LOCK_PUT(l);
1245                 l = LDLM_LOCK_GET(tmplock);
1246         }
1247         l_unlock(&res->lr_namespace->ns_lock);
1248
1249         /* There were no PW locks beyond the size in the LVB; finished. */
1250         if (l == NULL)
1251                 RETURN(ELDLM_LOCK_ABORTED);
1252
1253         LASSERT(l->l_glimpse_ast != NULL);
1254         rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
1255         if (rc != 0 && res->lr_namespace->ns_lvbo &&
1256             res->lr_namespace->ns_lvbo->lvbo_update) {
1257                 res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
1258         }
1259
1260         down(&res->lr_lvb_sem);
1261         reply_lvb->lvb_size = res_lvb->lvb_size;
1262         reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
1263         up(&res->lr_lvb_sem);
1264
1265         LDLM_LOCK_PUT(l);
1266
1267         RETURN(ELDLM_LOCK_ABORTED);
1268 }
1269
/* Thin wrapper: returns the result of fsfilt_post_cleanup() for @obd. */
static int filter_post_fs_cleanup(struct obd_device *obd)
{
        int rc = fsfilt_post_cleanup(obd);

        RETURN(rc);
}
1278
1279 static int filter_group_set_fs_flags(struct obd_device *obd, int group)
1280 {
1281         struct filter_obd *filter = &obd->u.filter;
1282         int rc = 0, i = 0;
1283         ENTRY;        
1284         
1285         /* zero group is not longer valid. */
1286         if (group== 0)
1287                 RETURN(rc); 
1288         for (i = 0; i < filter->fo_subdir_count; i++) {
1289                 struct dentry *dentry;
1290                 dentry = (filter->fo_subdirs + group)->dentry[i];
1291                 rc = fsfilt_set_fs_flags(obd, dentry->d_inode, 
1292                                          SM_DO_REC | SM_DO_COW);
1293                 if (rc)
1294                         RETURN(rc);
1295         }
1296         RETURN(rc);
1297 }
1298 static int filter_post_fs_setup(struct obd_device *obd)
1299 {
1300         struct filter_obd *filter = &obd->u.filter;
1301         int rc = 0, j = 0;
1302         struct llog_ctxt *ctxt = NULL;
1303
1304         rc = fsfilt_post_setup(obd);
1305         if (rc)
1306                 RETURN(rc);
1307         
1308         for (j = 0; j < filter->fo_group_count; j++) {
1309                 rc = filter_group_set_fs_flags(obd, j);
1310                 if (rc)
1311                         return rc;
1312         } 
1313
1314         fsfilt_get_reint_log_ctxt(obd, filter->fo_sb, &ctxt);
1315         if (ctxt) {
1316                 ctxt->loc_obd = obd;
1317                 ctxt->loc_idx = LLOG_REINT_ORIG_CTXT;
1318                 obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT] = ctxt;
1319         }
1320         fsfilt_set_ost_flags(obd, filter->fo_sb);
1321         return rc;
1322 }
1323
1324 /* mount the file system (secretly) */
1325 int filter_common_setup(struct obd_device *obd, obd_count len,
1326                         void *buf, char *option)
1327 {
1328         struct lustre_cfg* lcfg = buf;
1329         struct filter_obd *filter = &obd->u.filter;
1330         struct vfsmount *mnt;
1331         char name[32] = "CATLIST";
1332         int rc = 0;
1333         ENTRY;
1334
1335         dev_clear_rdonly(2);
1336
1337         if (!lcfg->lcfg_inlbuf1 || !lcfg->lcfg_inlbuf2)
1338                 RETURN(-EINVAL);
1339
1340         obd->obd_fsops = fsfilt_get_ops(lcfg->lcfg_inlbuf2);
1341         if (IS_ERR(obd->obd_fsops))
1342                 RETURN(PTR_ERR(obd->obd_fsops));
1343
1344         mnt = do_kern_mount(lcfg->lcfg_inlbuf2, MS_NOATIME | MS_NODIRATIME,
1345                             lcfg->lcfg_inlbuf1, option);
1346         rc = PTR_ERR(mnt);
1347         if (IS_ERR(mnt))
1348                 GOTO(err_ops, rc);
1349
1350         if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3) {
1351                 if (*lcfg->lcfg_inlbuf3 == 'f') {
1352                         obd->obd_replayable = 1;
1353                         obd_sync_filter = 1;
1354                         CWARN("%s: recovery enabled\n", obd->obd_name);
1355                 } else {
1356                         if (*lcfg->lcfg_inlbuf3 != 'n') {
1357                                 CERROR("unrecognised flag '%c'\n",
1358                                        *lcfg->lcfg_inlbuf3);
1359                         }
1360                         // XXX Robert? Why do we get errors here
1361                         // GOTO(err_mntput, rc = -EINVAL);
1362                 }
1363         }
1364
1365         filter->fo_vfsmnt = mnt;
1366         filter->fo_sb = mnt->mnt_sb;
1367         filter->fo_fstype = mnt->mnt_sb->s_type->name;
1368         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1369
1370         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1371         obd->obd_lvfs_ctxt.pwdmnt = mnt;
1372         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
1373         obd->obd_lvfs_ctxt.fs = get_ds();
1374         obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
1375
1376         rc = fsfilt_setup(obd, mnt->mnt_sb);
1377         if (rc)
1378                 GOTO(err_mntput, rc);
1379
1380         rc = filter_prep(obd);
1381         if (rc)
1382                 GOTO(err_mntput, rc);
1383
1384
1385         filter->fo_destroy_in_progress = 0;
1386         sema_init(&filter->fo_create_lock, 1);
1387
1388         spin_lock_init(&filter->fo_translock);
1389         spin_lock_init(&filter->fo_objidlock);
1390         INIT_LIST_HEAD(&filter->fo_export_list);
1391         sema_init(&filter->fo_alloc_lock, 1);
1392         spin_lock_init(&filter->fo_r_pages.oh_lock);
1393         spin_lock_init(&filter->fo_w_pages.oh_lock);
1394         spin_lock_init(&filter->fo_r_discont_pages.oh_lock);
1395         spin_lock_init(&filter->fo_w_discont_pages.oh_lock);
1396         spin_lock_init(&filter->fo_r_discont_blocks.oh_lock);
1397         spin_lock_init(&filter->fo_w_discont_blocks.oh_lock);
1398         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
1399
1400         INIT_LIST_HEAD(&filter->fo_llog_list);
1401         spin_lock_init(&filter->fo_llog_list_lock);
1402
1403         obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1404                                                 LDLM_NAMESPACE_SERVER);
1405         if (obd->obd_namespace == NULL)
1406                 GOTO(err_post, rc = -ENOMEM);
1407         obd->obd_namespace->ns_lvbp = obd;
1408         obd->obd_namespace->ns_lvbo = &filter_lvbo;
1409         ldlm_register_intent(obd->obd_namespace, filter_intent_policy);
1410
1411         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1412                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1413
1414         rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 1, name);
1415         if (rc) {
1416                 CERROR("failed to setup llogging subsystems\n");
1417                 GOTO(err_post, rc);
1418         }
1419         RETURN(0);
1420
1421 err_post:
1422         filter_post(obd);
1423 err_mntput:
1424         unlock_kernel();
1425         mntput(mnt);
1426         filter->fo_sb = 0;
1427         lock_kernel();
1428 err_ops:
1429         fsfilt_put_ops(obd->obd_fsops);
1430         return rc;
1431 }
1432
1433 static int filter_attach(struct obd_device *obd, obd_count len, void *data)
1434 {
1435         struct lprocfs_static_vars lvars;
1436         int rc;
1437
1438         lprocfs_init_vars(filter, &lvars);
1439         rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1440         if (rc != 0)
1441                 return rc;
1442
1443         rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1444         if (rc != 0)
1445                 return rc;
1446
1447         /* Init obdfilter private stats here */
1448         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1449                              LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1450         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1451                              LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
1452
1453         return lproc_filter_attach_seqstat(obd);
1454 }
1455
/* Free the obd's stats, then detach its procfs entries.  Returns the result
 * of lprocfs_obd_detach(). */
static int filter_detach(struct obd_device *dev)
{
        int rc;

        lprocfs_free_obd_stats(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
1461
1462 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1463 {
1464         struct lustre_cfg* lcfg = buf;
1465         int rc;
1466         ENTRY;
1467         /* all mount options including errors=remount-ro and asyncdel are passed
1468          * using 4th lcfg param. And it is good, finally we have got rid of
1469          * hardcoded fs types in the code. */
1470         rc = filter_common_setup(obd, len, buf, lcfg->lcfg_inlbuf4);
1471         if (rc)
1472                 RETURN(rc);
1473         rc = filter_post_fs_setup(obd);
1474         RETURN(rc);
1475 }
1476
1477 static int filter_cleanup(struct obd_device *obd, int flags)
1478 {
1479         struct filter_obd *filter = &obd->u.filter;
1480         ENTRY;
1481
1482         if (flags & OBD_OPT_FAILOVER)
1483                 CERROR("%s: shutting down for failover; client state will"
1484                        " be preserved.\n", obd->obd_name);
1485
1486         if (!list_empty(&obd->obd_exports)) {
1487                 CERROR("%s: still has clients!\n", obd->obd_name);
1488                 class_disconnect_exports(obd, flags);
1489                 if (!list_empty(&obd->obd_exports)) {
1490                         CERROR("still has exports after forced cleanup?\n");
1491                         RETURN(-EBUSY);
1492                 }
1493         }
1494
1495         ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
1496
1497         if (filter->fo_sb == NULL)
1498                 RETURN(0);
1499
1500         filter_post_fs_cleanup(obd);
1501         filter_post(obd);
1502
1503         shrink_dcache_parent(filter->fo_sb->s_root);
1504         filter->fo_sb = 0;
1505
1506         if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1)
1507                 CERROR("%s: mount point %p busy, mnt_count: %d\n",
1508                        obd->obd_name, filter->fo_vfsmnt,
1509                        atomic_read(&filter->fo_vfsmnt->mnt_count));
1510
1511         unlock_kernel();
1512         mntput(filter->fo_vfsmnt);
1513         //destroy_buffers(filter->fo_sb->s_dev);
1514         filter->fo_sb = NULL;
1515         fsfilt_put_ops(obd->obd_fsops);
1516         lock_kernel();
1517
1518         dev_clear_rdonly(2);
1519
1520         RETURN(0);
1521 }
1522
1523 /* nearly identical to mds_connect */
1524 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1525                           struct obd_uuid *cluuid)
1526 {
1527         struct obd_export *exp;
1528         struct filter_export_data *fed;
1529         struct filter_client_data *fcd = NULL;
1530         struct filter_obd *filter = &obd->u.filter;
1531         int rc;
1532         ENTRY;
1533
1534         if (conn == NULL || obd == NULL || cluuid == NULL)
1535                 RETURN(-EINVAL);
1536
1537         rc = class_connect(conn, obd, cluuid);
1538         if (rc)
1539                 RETURN(rc);
1540         exp = class_conn2export(conn);
1541         LASSERT(exp != NULL);
1542
1543         fed = &exp->exp_filter_data;
1544
1545         spin_lock_init(&fed->fed_lock);
1546
1547         if (!obd->obd_replayable)
1548                 GOTO(cleanup, rc = 0);
1549
1550         OBD_ALLOC(fcd, sizeof(*fcd));
1551         if (!fcd) {
1552                 CERROR("filter: out of memory for client data\n");
1553                 GOTO(cleanup, rc = -ENOMEM);
1554         }
1555
1556         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1557         fed->fed_fcd = fcd;
1558
1559         rc = filter_client_add(obd, filter, fed, -1);
1560
1561 cleanup:
1562         if (rc) {
1563                 if (fcd)
1564                         OBD_FREE(fcd, sizeof(*fcd));
1565                 class_disconnect(exp, 0);
1566         } else {
1567                 class_export_put(exp);
1568         }
1569         return rc;
1570 }
1571
1572 static int filter_precleanup(struct obd_device *obd, int flags)
1573 {
1574         struct filter_group_llog *log;
1575         struct filter_obd *filter;
1576         int rc = 0;
1577         ENTRY;
1578
1579         filter = &obd->u.filter;
1580
1581         spin_lock(&filter->fo_llog_list_lock);
1582         while (!list_empty(&filter->fo_llog_list)) {
1583                 log = list_entry(filter->fo_llog_list.next,
1584                                  struct filter_group_llog, list);
1585                 list_del(&log->list);
1586                 spin_unlock(&filter->fo_llog_list_lock);
1587
1588                 rc = obd_llog_finish(obd, log->llogs, 0);
1589                 if (rc)
1590                         CERROR("failed to cleanup llogging subsystem for %u\n",
1591                                 log->group);
1592                 OBD_FREE(log->llogs, sizeof(*(log->llogs)));
1593                 OBD_FREE(log, sizeof(*log));
1594                 spin_lock(&filter->fo_llog_list_lock);
1595         }
1596         spin_unlock(&filter->fo_llog_list_lock);
1597
1598         rc = obd_llog_finish(obd, &obd->obd_llogs, 0);
1599         if (rc)
1600                 CERROR("failed to cleanup llogging subsystem\n");
1601
1602         RETURN(rc);
1603 }
1604
1605 /* Do extra sanity checks for grant accounting.  We do this at connect,
1606  * disconnect, and statfs RPC time, so it shouldn't be too bad.  We can
1607  * always get rid of it or turn it off when we know accounting is good. */
1608 static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
1609 {
1610         struct filter_export_data *fed;
1611         struct obd_export *exp;
1612         obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
1613         obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
1614         obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
1615
1616         if (list_empty(&obd->obd_exports))
1617                 return;
1618
1619         spin_lock(&obd->obd_osfs_lock);
1620         spin_lock(&obd->obd_dev_lock);
1621         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1622                 fed = &exp->exp_filter_data;
1623                 LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
1624                          "cli %s/%p %lu+%lu > "LPU64"\n",
1625                          exp->exp_client_uuid.uuid, exp,
1626                          fed->fed_grant, fed->fed_pending, maxsize);
1627                 LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
1628                          exp->exp_client_uuid.uuid, exp,fed->fed_dirty,maxsize);
1629                 CDEBUG(D_CACHE,"%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
1630                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
1631                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
1632                 tot_granted += fed->fed_grant + fed->fed_pending;
1633                 tot_pending += fed->fed_pending;
1634                 tot_dirty += fed->fed_dirty;
1635         }
1636         fo_tot_granted = obd->u.filter.fo_tot_granted;
1637         fo_tot_pending = obd->u.filter.fo_tot_pending;
1638         fo_tot_dirty = obd->u.filter.fo_tot_dirty;
1639         spin_unlock(&obd->obd_dev_lock);
1640         spin_unlock(&obd->obd_osfs_lock);
1641
1642         /* Do these assertions outside the spinlocks so we don't kill system */
1643         if (tot_granted != fo_tot_granted)
1644                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
1645                        func, tot_granted, fo_tot_granted);
1646         if (tot_pending != fo_tot_pending)
1647                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
1648                        func, tot_pending, fo_tot_pending);
1649         if (tot_dirty != fo_tot_dirty)
1650                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
1651                        func, tot_dirty, fo_tot_dirty);
1652         if (tot_pending > tot_granted)
1653                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
1654                        func, tot_pending, tot_granted);
1655         if (tot_granted > maxsize)
1656                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
1657                        func, tot_granted, maxsize);
1658         if (tot_dirty > maxsize)
1659                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
1660                        func, tot_dirty, maxsize);
1661 }
1662
1663 /* Remove this client from the grant accounting totals.  We also remove
1664  * the export from the obd device under the osfs and dev locks to ensure
1665  * that the filter_grant_sanity_check() calculations are always valid.
1666  * The client should do something similar when it invalidates its import. */
1667 static void filter_grant_discard(struct obd_export *exp)
1668 {
1669         struct obd_device *obd = exp->exp_obd;
1670         struct filter_obd *filter = &obd->u.filter;
1671         struct filter_export_data *fed = &exp->exp_filter_data;
1672
1673         spin_lock(&obd->obd_osfs_lock);
1674         spin_lock(&exp->exp_obd->obd_dev_lock);
1675         list_del_init(&exp->exp_obd_chain);
1676         spin_unlock(&exp->exp_obd->obd_dev_lock);
1677
1678         CDEBUG(D_CACHE, "%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
1679                obd->obd_name, exp->exp_client_uuid.uuid, exp,
1680                fed->fed_dirty, fed->fed_pending, fed->fed_grant);
1681
1682         LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
1683                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
1684                  obd->obd_name, filter->fo_tot_granted,
1685                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
1686         filter->fo_tot_granted -= fed->fed_grant;
1687         LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
1688                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
1689                  obd->obd_name, filter->fo_tot_pending,
1690                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
1691         LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
1692                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
1693                  obd->obd_name, filter->fo_tot_dirty,
1694                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
1695         filter->fo_tot_dirty -= fed->fed_dirty;
1696         fed->fed_dirty = 0;
1697         fed->fed_grant = 0;
1698
1699         spin_unlock(&obd->obd_osfs_lock);
1700 }
1701
1702 static int filter_destroy_export(struct obd_export *exp)
1703 {
1704         ENTRY;
1705
1706         if (exp->exp_filter_data.fed_pending)
1707                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
1708                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1709                        exp, exp->exp_filter_data.fed_pending);
1710
1711         target_destroy_export(exp);
1712
1713         if (exp->exp_obd->obd_replayable)
1714                 filter_client_free(exp, exp->exp_flags);
1715
1716         filter_grant_discard(exp);
1717         if (!(exp->exp_flags & OBD_OPT_FORCE))
1718                 filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
1719
1720         RETURN(0);
1721 }
1722
1723 static void filter_sync_llogs(struct obd_export *dexp)
1724 {
1725         struct filter_group_llog *fglog, *nlog;
1726         struct obd_device *obd = dexp->exp_obd;
1727         struct filter_obd *filter;
1728         int worked = 0, group;
1729         struct llog_ctxt *ctxt;
1730
1731         filter = &obd->u.filter;
1732
1733         /* we can't sync log holding spinlock. also, we do not want to get
1734          * into livelock. so we do following: loop over MDS's exports in
1735          * group order and skip already synced llogs -bzzz */
1736         do {
1737                 /* look for group with min. number, but > worked */
1738                 fglog = NULL;
1739                 group = 1 << 30;
1740                 spin_lock(&filter->fo_llog_list_lock);
1741                 list_for_each_entry(nlog, &filter->fo_llog_list, list) {
1742                        
1743                         if (nlog->group <= worked) {
1744                                 /* this group is already synced */
1745                                 continue;
1746                         }
1747         
1748                         if (group < nlog->group) {
1749                                 /* we have group with smaller number to sync */
1750                                 continue;
1751                         }
1752
1753                         /* store current minimal group */
1754                         fglog = nlog;
1755                         group = nlog->group;
1756                 }
1757                 spin_unlock(&filter->fo_llog_list_lock);
1758
1759                 if (fglog) {
1760                         worked = fglog->group;
1761                         ctxt = llog_get_context(fglog->llogs,
1762                                                 LLOG_UNLINK_REPL_CTXT);
1763                         llog_sync(ctxt, dexp);
1764                 }
1765         } while (fglog != NULL);
1766 }
1767
1768 /* also incredibly similar to mds_disconnect */
1769 static int filter_disconnect(struct obd_export *exp, int flags)
1770 {
1771         struct obd_device *obd = exp->exp_obd;
1772         unsigned long irqflags;
1773         int rc;
1774         ENTRY;
1775
1776         LASSERT(exp);
1777         class_export_get(exp);
1778
1779         spin_lock_irqsave(&exp->exp_lock, irqflags);
1780         exp->exp_flags = flags;
1781         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1782
1783         if (!(flags & OBD_OPT_FORCE))
1784                 filter_grant_sanity_check(obd, __FUNCTION__);
1785         filter_grant_discard(exp);
1786
1787         /* Disconnect early so that clients can't keep using export */
1788         rc = class_disconnect(exp, flags);
1789
1790         ldlm_cancel_locks_for_export(exp);
1791
1792         fsfilt_sync(obd, obd->u.filter.fo_sb);
1793
1794         /* flush any remaining cancel messages out to the target */
1795         filter_sync_llogs(exp);
1796
1797         class_export_put(exp);
1798         RETURN(rc);
1799 }
1800
1801 struct dentry *__filter_oa2dentry(struct obd_device *obd,
1802                                   struct obdo *oa, const char *what)
1803 {
1804         struct dentry *dchild = NULL;
1805         obd_gr group = 0;
1806
1807         if (oa->o_valid & OBD_MD_FLGROUP)
1808                 group = oa->o_gr;
1809
1810         dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
1811
1812         if (IS_ERR(dchild)) {
1813                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1814                 RETURN(dchild);
1815         }
1816
1817         if (dchild->d_inode == NULL) {
1818                 CERROR("%s: %s on non-existent object: "LPU64"\n",
1819                        obd->obd_name, what, oa->o_id);
1820                 f_dput(dchild);
1821                 RETURN(ERR_PTR(-ENOENT));
1822         }
1823
1824         return dchild;
1825 }
1826
1827 static int filter_getattr(struct obd_export *exp, struct obdo *oa,
1828                           struct lov_stripe_md *md)
1829 {
1830         struct dentry *dentry = NULL;
1831         struct obd_device *obd;
1832         int rc = 0;
1833         ENTRY;
1834
1835         obd = class_exp2obd(exp);
1836         if (obd == NULL) {
1837                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1838                        exp->exp_handle.h_cookie);
1839                 RETURN(-EINVAL);
1840         }
1841
1842         dentry = filter_oa2dentry(obd, oa);
1843         if (IS_ERR(dentry))
1844                 RETURN(PTR_ERR(dentry));
1845
1846         /* Limit the valid bits in the return data to what we actually use */
1847         oa->o_valid = OBD_MD_FLID;
1848         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1849
1850         f_dput(dentry);
1851         RETURN(rc);
1852 }
1853
1854 /* this is called from filter_truncate() until we have filter_punch() */
1855 static int filter_setattr(struct obd_export *exp, struct obdo *oa,
1856                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1857 {
1858         struct lvfs_run_ctxt saved;
1859         struct filter_obd *filter;
1860         struct dentry *dentry;
1861         struct iattr iattr;
1862         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
1863         struct ldlm_resource *res;
1864         void *handle;
1865         int rc, rc2;
1866         ENTRY;
1867
1868         LASSERT(oti != NULL);
1869
1870         dentry = filter_oa2dentry(exp->exp_obd, oa);
1871         if (IS_ERR(dentry))
1872                 RETURN(PTR_ERR(dentry));
1873
1874         filter = &exp->exp_obd->u.filter;
1875
1876         iattr_from_obdo(&iattr, oa, oa->o_valid);
1877
1878         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
1879         lock_kernel();
1880
1881         if (iattr.ia_valid & ATTR_SIZE)
1882                 down(&dentry->d_inode->i_sem);
1883         handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
1884                               oti);
1885         if (IS_ERR(handle))
1886                 GOTO(out_unlock, rc = PTR_ERR(handle));
1887
1888         /* XXX this could be a rwsem instead, if filter_preprw played along */
1889         if (iattr.ia_valid & ATTR_ATTR_FLAG)
1890                 rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
1891                                       EXT3_IOC_SETFLAGS,
1892                                       (long)&iattr.ia_attr_flags);
1893         else
1894                 rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
1895         rc = filter_finish_transno(exp, oti, rc);
1896         rc2 = fsfilt_commit(exp->exp_obd, filter->fo_sb, dentry->d_inode, handle, 0);
1897         if (rc2) {
1898                 CERROR("error on commit, err = %d\n", rc2);
1899                 if (!rc)
1900                         rc = rc2;
1901         }
1902
1903         if (iattr.ia_valid & ATTR_SIZE) {
1904                 res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
1905                                         res_id, LDLM_EXTENT, 0);
1906                 if (res == NULL) {
1907                         CERROR("!!! resource_get failed for object "LPU64" -- "
1908                                "filter_setattr with no lock?\n", oa->o_id);
1909                 } else {
1910                         if (res->lr_namespace->ns_lvbo &&
1911                             res->lr_namespace->ns_lvbo->lvbo_update) {
1912                                 rc = res->lr_namespace->ns_lvbo->lvbo_update
1913                                         (res, NULL, 0, 0);
1914                         }
1915                         ldlm_resource_putref(res);
1916                 }
1917         }
1918
1919         oa->o_valid = OBD_MD_FLID;
1920         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1921
1922 out_unlock:
1923         if (iattr.ia_valid & ATTR_SIZE)
1924                 up(&dentry->d_inode->i_sem);
1925         unlock_kernel();
1926         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
1927
1928         f_dput(dentry);
1929         RETURN(rc);
1930 }
1931
1932 /* XXX identical to osc_unpackmd */
1933 static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
1934                            struct lov_mds_md *lmm, int lmm_bytes)
1935 {
1936         int lsm_size;
1937         ENTRY;
1938
1939         if (lmm != NULL) {
1940                 if (lmm_bytes < sizeof (*lmm)) {
1941                         CERROR("lov_mds_md too small: %d, need %d\n",
1942                                lmm_bytes, (int)sizeof(*lmm));
1943                         RETURN(-EINVAL);
1944                 }
1945                 /* XXX LOV_MAGIC etc check? */
1946
1947                 if (lmm->lmm_object_id == cpu_to_le64(0)) {
1948                         CERROR("lov_mds_md: zero lmm_object_id\n");
1949                         RETURN(-EINVAL);
1950                 }
1951         }
1952
1953         lsm_size = lov_stripe_md_size(1);
1954         if (lsmp == NULL)
1955                 RETURN(lsm_size);
1956
1957         if (*lsmp != NULL && lmm == NULL) {
1958                 OBD_FREE(*lsmp, lsm_size);
1959                 *lsmp = NULL;
1960                 RETURN(0);
1961         }
1962
1963         if (*lsmp == NULL) {
1964                 OBD_ALLOC(*lsmp, lsm_size);
1965                 if (*lsmp == NULL)
1966                         RETURN(-ENOMEM);
1967
1968                 loi_init((*lsmp)->lsm_oinfo);
1969         }
1970
1971         if (lmm != NULL) {
1972                 /* XXX zero *lsmp? */
1973                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
1974                 LASSERT((*lsmp)->lsm_object_id);
1975         }
1976
1977         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
1978
1979         RETURN(lsm_size);
1980 }
1981
1982 static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
1983                                       struct filter_obd *filter)
1984 {
1985         struct obdo doa; /* XXX obdo on stack */
1986         __u64 last, id;
1987         ENTRY;
1988         LASSERT(oa);
1989
1990         memset(&doa, 0, sizeof(doa));
1991         if (oa->o_valid & OBD_MD_FLGROUP) {
1992                 doa.o_valid |= OBD_MD_FLGROUP;
1993                 doa.o_gr = oa->o_gr;
1994         } else {
1995                 doa.o_gr = 0;
1996         }
1997         doa.o_mode = S_IFREG;
1998         doa.o_gr = oa->o_gr;
1999         doa.o_valid = oa->o_valid & OBD_MD_FLGROUP;
2000
2001         filter->fo_destroy_in_progress = 1;
2002         down(&filter->fo_create_lock);
2003         if (!filter->fo_destroy_in_progress) {
2004                 CERROR("%s: destroy_in_progress already cleared\n",
2005                         exp->exp_obd->obd_name);
2006                 up(&filter->fo_create_lock);
2007                 EXIT;
2008                 return;
2009         }
2010
2011         last = filter_last_id(filter, doa.o_gr);
2012         CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
2013                exp->exp_obd->obd_name, oa->o_id + 1, last);
2014         for (id = oa->o_id + 1; id <= last; id++) {
2015                 doa.o_id = id;
2016                 filter_destroy(exp, &doa, NULL, NULL);
2017         }
2018
2019         CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
2020                exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
2021
2022         spin_lock(&filter->fo_objidlock);
2023         filter->fo_last_objids[doa.o_gr] = oa->o_id;
2024         spin_unlock(&filter->fo_objidlock);
2025
2026         filter->fo_destroy_in_progress = 0;
2027         up(&filter->fo_create_lock);
2028
2029         EXIT;
2030 }
2031
2032 /* returns a negative error or a nonnegative number of files to create */
2033 static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
2034                                    obd_gr group)
2035 {
2036         struct obd_device *obd = exp->exp_obd;
2037         struct filter_obd *filter = &obd->u.filter;
2038         int diff, rc;
2039         ENTRY;
2040
2041         diff = oa->o_id - filter_last_id(filter, oa->o_gr);
2042         CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
2043                filter_last_id(filter, oa->o_gr), diff);
2044
2045         /* delete orphans request */
2046         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2047             (oa->o_flags & OBD_FL_DELORPHAN)) {
2048                 if (diff >= 0)
2049                         RETURN(diff);
2050                 if (-diff > 10000) { /* XXX make this smarter */
2051                         CERROR("ignoring bogus orphan destroy request: obdid "
2052                                LPU64" last_id "LPU64"\n",
2053                                oa->o_id, filter_last_id(filter, oa->o_gr));
2054                         RETURN(-EINVAL);
2055                 }
2056                 filter_destroy_precreated(exp, oa, filter);
2057                 rc = filter_update_last_objid(obd, group, 0);
2058                 if (rc)
2059                         CERROR("unable to write lastobjid, but orphans"
2060                                "were deleted\n");
2061                 RETURN(0);
2062         } else {
2063                 /* only precreate if group == 0 and o_id is specfied */
2064                 if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
2065                     (/*group != 0 ||*/ oa->o_id == 0))
2066                         RETURN(1);
2067
2068                 LASSERT(diff >= 0);
2069                 RETURN(diff);
2070         }
2071 }
2072 static int filter_precreate_rec(struct obd_device *obd, struct dentry *dentry, 
2073                                 int *number, struct obdo *oa)
2074 {
2075         int rc;
2076         ENTRY;       
2077          
2078         rc = fsfilt_precreate_rec(obd, dentry, number, oa);
2079   
2080         RETURN(rc);
2081 }
2082
2083 /* We rely on the fact that only one thread will be creating files in a given
2084  * group at a time, which is why we don't need an atomic filter_get_new_id.
2085  * Even if we had that atomic function, the following race would exist:
2086  *
2087  * thread 1: gets id x from filter_next_id
2088  * thread 2: gets id (x + 1) from filter_next_id
2089  * thread 2: creates object (x + 1)
2090  * thread 1: tries to create object x, gets -ENOSPC
2091  */
2092 static int filter_precreate(struct obd_device *obd, struct obdo *oa,
2093                             obd_gr group, int *num)
2094 {
2095         struct dentry *dchild = NULL, *dparent = NULL;
2096         struct filter_obd *filter;
2097         int err = 0, rc = 0, recreate_obj = 0, i;
2098         __u64 next_id;
2099         void *handle = NULL;
2100         void *lock = NULL;
2101         ENTRY;
2102
2103         filter = &obd->u.filter;
2104
2105         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2106             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
2107                 recreate_obj = 1;
2108         }
2109
2110         CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
2111
2112         down(&filter->fo_create_lock);
2113
2114         for (i = 0; i < *num && err == 0; i++) {
2115                 int cleanup_phase = 0;
2116
2117                 if (filter->fo_destroy_in_progress) {
2118                         CWARN("%s: precreate aborted by destroy\n",
2119                               obd->obd_name);
2120                         break;
2121                 }
2122
2123                 if (recreate_obj) {
2124                         __u64 last_id;
2125                         next_id = oa->o_id;
2126                         last_id = filter_last_id(filter, group);
2127                         if (next_id > last_id) {
2128                                 CERROR("Error: Trying to recreate obj greater"
2129                                        "than last id "LPD64" > "LPD64"\n",
2130                                        next_id, last_id);
2131                                 GOTO(cleanup, rc = -EINVAL);
2132                         }
2133                 } else {
2134                         next_id = filter_last_id(filter, group) + 1;
2135                 }
2136
2137                 CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
2138
2139                 dparent = filter_parent_lock(obd, group, next_id, &lock);
2140                 if (IS_ERR(dparent))
2141                         GOTO(cleanup, rc = PTR_ERR(dparent));
2142                 cleanup_phase = 1;
2143
2144                 /*only do precreate rec record. so clean kml flags here*/
2145                 fsfilt_clear_fs_flags(obd, dparent->d_inode, 
2146                                       SM_DO_REC);
2147                 
2148                 dchild = filter_fid2dentry(obd, dparent, group, next_id);
2149                 if (IS_ERR(dchild))
2150                         GOTO(cleanup, rc = PTR_ERR(dchild));
2151                 cleanup_phase = 2;
2152
2153                 if (dchild->d_inode != NULL) {
2154                         /* This would only happen if lastobjid was bad on disk*/
2155                         /* Could also happen if recreating missing obj but
2156                          * already exists
2157                          */
2158                         if (recreate_obj) {
2159                                 CERROR("%s: Serious error: recreating obj %*s "
2160                                        "but obj already exists \n",
2161                                        obd->obd_name, dchild->d_name.len,
2162                                        dchild->d_name.name);
2163                                 LBUG();
2164                         } else {
2165                                 CERROR("%s: Serious error: objid %*s already "
2166                                        "exists; is this filesystem corrupt?\n",
2167                                        obd->obd_name, dchild->d_name.len,
2168                                        dchild->d_name.name);
2169                                 LBUG();
2170                         }
2171                         GOTO(cleanup, rc = -EEXIST);
2172                 }
2173
2174                 handle = fsfilt_start_log(obd, dparent->d_inode,
2175                                           FSFILT_OP_CREATE, NULL, 1);
2176                 if (IS_ERR(handle))
2177                         GOTO(cleanup, rc = PTR_ERR(handle));
2178                 cleanup_phase = 3;
2179
2180                 rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
2181                 if (rc) {
2182                         CERROR("create failed rc = %d\n", rc);
2183                         GOTO(cleanup, rc);
2184                 }
2185
2186                 if (!recreate_obj) {
2187                         filter_set_last_id(filter, group, next_id);
2188                         err = filter_update_last_objid(obd, group, 0);
2189                         if (err)
2190                                 CERROR("unable to write lastobjid "
2191                                        "but file created\n");
2192                 }
2193                 fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
2194         
2195         cleanup:
2196                 switch(cleanup_phase) {
2197                 case 3:
2198                         err = fsfilt_commit(obd, filter->fo_sb, dparent->d_inode, handle, 0);
2199                         if (err) {
2200                                 CERROR("error on commit, err = %d\n", err);
2201                                 if (!rc)
2202                                         rc = err;
2203                         }
2204                 case 2:
2205                         f_dput(dchild);
2206                 case 1:
2207                         filter_parent_unlock(dparent, lock);
2208                 case 0:
2209                         break;
2210                 }
2211
2212                 if (rc)
2213                         break;
2214         }
2215
2216         *num = i;
2217
2218         /* check if we have an error after ll_vfs_create(). It is possible that
2219          * there will be say -ENOSPC and we will leak it. */
2220         if (rc == 0)
2221                 rc = filter_precreate_rec(obd, dparent, num, oa);
2222         
2223         up(&filter->fo_create_lock);
2224         
2225         CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
2226                obd->obd_name, group, filter->fo_last_objids[group]);
2227
2228         CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
2229                obd->obd_name, i);
2230         
2231         RETURN(rc);
2232 }
2233
2234 static int filter_create(struct obd_export *exp, struct obdo *oa,
2235                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
2236 {
2237         struct obd_device *obd = NULL;
2238         struct filter_obd *filter;
2239         struct lvfs_run_ctxt saved;
2240         struct lov_stripe_md *lsm = NULL;
2241         struct filter_export_data *fed;
2242         char str[PTL_NALFMT_SIZE];
2243         int group = oa->o_gr, rc = 0, diff, recreate_objs = 0;
2244         ENTRY;
2245
2246         if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
2247                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
2248                                 exp->exp_connection->c_peer.peer_nid, str);
2249                 CERROR("!!! nid "LPX64"/%s sent invalid object group %d\n",
2250                        exp->exp_connection->c_peer.peer_nid, str, group);
2251                 RETURN(-EINVAL);
2252         }
2253
2254         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2255             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
2256                 recreate_objs = 1;
2257         }
2258
2259         obd = exp->exp_obd;
2260         fed = &exp->exp_filter_data;
2261         filter = &obd->u.filter;
2262
2263         if (fed->fed_group != group && !recreate_objs &&
2264             !(oa->o_valid & OBD_MD_REINT)) {
2265                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
2266                                 exp->exp_connection->c_peer.peer_nid, str);
2267                 CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
2268                        "earlier; now it's trying to use group %d!  This could "
2269                        "be a bug in the MDS.  Tell CFS.\n",
2270                        exp->exp_connection->c_peer.peer_nid, str,
2271                        fed->fed_group, group);
2272                 RETURN(-ENOTUNIQ);
2273         }
2274
2275         CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n",
2276                group, oa->o_id);
2277         if (ea != NULL) {
2278                 lsm = *ea;
2279                 if (lsm == NULL) {
2280                         rc = obd_alloc_memmd(exp, &lsm);
2281                         if (rc < 0)
2282                                 RETURN(rc);
2283                 }
2284         }
2285
2286         obd = exp->exp_obd;
2287         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2288
2289         if (oa->o_valid & OBD_MD_REINT) {
2290                 int num = *((int*)oa->o_inline);  
2291                 rc = filter_precreate(obd, oa, oa->o_gr, &num);
2292         } else if (recreate_objs) {
2293                 if (oa->o_id > filter_last_id(&obd->u.filter, group)) {
2294                         CERROR("recreate objid "LPU64" > last id "LPU64"\n",
2295                                oa->o_id, filter_last_id(&obd->u.filter, group));
2296                         rc = -EINVAL;
2297                 } else {
2298                         diff = 1;
2299                         rc = filter_precreate(obd, oa, group, &diff);
2300                 }
2301         } else {
2302                 diff = filter_should_precreate(exp, oa, group);
2303                 if (diff > 0) {
2304                         oa->o_id = filter_last_id(&obd->u.filter, group);
2305                         rc = filter_precreate(obd, oa, group, &diff);
2306                         oa->o_id += diff;
2307                         oa->o_valid = OBD_MD_FLID;
2308                 }
2309         }
2310
2311         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2312         if (rc && ea != NULL && *ea != lsm) {
2313                 obd_free_memmd(exp, &lsm);
2314         } else if (rc == 0 && ea != NULL) {
2315                 /* XXX LOV STACKING: the lsm that is passed to us from
2316                  * LOV does not have valid lsm_oinfo data structs, so
2317                  * don't go touching that.  This needs to be fixed in a
2318                  * big way. */
2319                 lsm->lsm_object_id = oa->o_id;
2320                 lsm->lsm_object_gr = oa->o_gr;
2321                 *ea = lsm;
2322         }
2323
2324         RETURN(rc);
2325 }
2326
2327 static int filter_destroy(struct obd_export *exp, struct obdo *oa,
2328                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
2329 {
2330         struct obd_device *obd;
2331         struct filter_obd *filter;
2332         struct dentry *dchild = NULL, *dparent = NULL;
2333         struct lvfs_run_ctxt saved;
2334         void *handle = NULL;
2335         struct llog_cookie *fcc = NULL;
2336         int rc, rc2, cleanup_phase = 0, have_prepared = 0;
2337         void *lock = NULL;
2338         ENTRY;
2339
2340         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2341
2342         obd = exp->exp_obd;
2343         filter = &obd->u.filter;
2344
2345         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2346
2347  acquire_locks:
2348         dparent = filter_parent_lock(obd, oa->o_gr, oa->o_id, &lock);
2349         if (IS_ERR(dparent))
2350                 GOTO(cleanup, rc = PTR_ERR(dparent));
2351         cleanup_phase = 1;
2352
2353         dchild = filter_fid2dentry(obd, dparent, oa->o_gr, oa->o_id);
2354         if (IS_ERR(dchild))
2355                 GOTO(cleanup, rc = -ENOENT);
2356         cleanup_phase = 2;
2357
2358         if (dchild->d_inode == NULL) {
2359                 CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n",
2360                        oa->o_id);
2361                 GOTO(cleanup, rc = -ENOENT);
2362         }
2363
2364         if (!have_prepared) {
2365                 /* If we're really going to destroy the object, get ready
2366                  * by getting the clients to discard their cached data.
2367                  *
2368                  * We have to drop the parent lock, because
2369                  * filter_prepare_destroy will acquire a PW on the object, and
2370                  * we don't want to deadlock with an incoming write to the
2371                  * object, which has the extent PW and then wants to get the
2372                  * parent dentry to do the lookup.
2373                  *
2374                  * We dput the child because it's not worth the extra
2375                  * complication of condition the above code to skip it on the
2376                  * second time through. */
2377                 f_dput(dchild);
2378                 filter_parent_unlock(dparent, lock);
2379
2380                 filter_prepare_destroy(obd, oa->o_id, oa->o_gr);
2381                 have_prepared = 1;
2382                 goto acquire_locks;
2383         }
2384
2385         handle = fsfilt_start_log(obd, dparent->d_inode, FSFILT_OP_UNLINK, oti, 1);
2386         if (IS_ERR(handle))
2387                 GOTO(cleanup, rc = PTR_ERR(handle));
2388
2389         cleanup_phase = 3;
2390
2391         /* Our MDC connection is established by the MDS to us */
2392         if (oa->o_valid & OBD_MD_FLCOOKIE) {
2393                 OBD_ALLOC(fcc, sizeof(*fcc));
2394                 if (fcc != NULL)
2395                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
2396         }
2397
2398         rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
2399
2400 cleanup:
2401         switch(cleanup_phase) {
2402         case 3:
2403                 if (fcc != NULL) {
2404                         if (oti != NULL)
2405                                 fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
2406                                                       oti->oti_handle,
2407                                                       filter_cancel_cookies_cb,
2408                                                       fcc);
2409                         else
2410                                 fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
2411                                                       handle,
2412                                                       filter_cancel_cookies_cb,
2413                                                       fcc);
2414                 }
2415                 rc = filter_finish_transno(exp, oti, rc);
2416                 rc2 = fsfilt_commit(obd, filter->fo_sb, dparent->d_inode, 
2417                                     handle, 0);
2418                 if (rc2) {
2419                         CERROR("error on commit, err = %d\n", rc2);
2420                         if (!rc)
2421                                 rc = rc2;
2422                 }
2423         case 2:
2424                 f_dput(dchild);
2425         case 1:
2426                 filter_parent_unlock(dparent, lock);
2427         case 0:
2428                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2429                 break;
2430         default:
2431                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2432                 LBUG();
2433         }
2434
2435         RETURN(rc);
2436 }
2437
2438 /* NB start and end are used for punch, but not truncate */
2439 static int filter_truncate(struct obd_export *exp, struct obdo *oa,
2440                            struct lov_stripe_md *lsm,
2441                            obd_off start, obd_off end,
2442                            struct obd_trans_info *oti)
2443 {
2444         int error;
2445         ENTRY;
2446
2447         if (end != OBD_OBJECT_EOF)
2448                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2449                        end);
2450
2451         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
2452                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2453         oa->o_size = start;
2454         error = filter_setattr(exp, oa, NULL, oti);
2455         RETURN(error);
2456 }
2457
2458 static int filter_sync(struct obd_export *exp, struct obdo *oa,
2459                        struct lov_stripe_md *lsm, obd_off start, obd_off end)
2460 {
2461         struct obd_device *obd = exp->exp_obd;
2462         struct lvfs_run_ctxt saved;
2463         struct filter_obd *filter;
2464         struct dentry *dentry;
2465         struct llog_ctxt *ctxt;
2466         int rc, rc2;
2467         ENTRY;
2468
2469         filter = &obd->u.filter;
2470
2471         /* an objid of zero is taken to mean "sync whole filesystem" */
2472         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
2473                 rc = fsfilt_sync(obd, filter->fo_sb);
2474                 /* flush any remaining cancel messages out to the target */
2475                 ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_REPL_CTXT);
2476                 llog_sync(ctxt, exp);
2477                 RETURN(rc);
2478         }
2479
2480         dentry = filter_oa2dentry(obd, oa);
2481         if (IS_ERR(dentry))
2482                 RETURN(PTR_ERR(dentry));
2483
2484         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2485
2486         down(&dentry->d_inode->i_sem);
2487         rc = filemap_fdatasync(dentry->d_inode->i_mapping);
2488         if (rc == 0) {
2489                 /* just any file to grab fsync method - "file" arg unused */
2490                 struct file *file = filter->fo_rcvd_filp;
2491
2492                 if (file->f_op && file->f_op->fsync)
2493                         rc = file->f_op->fsync(NULL, dentry, 1);
2494
2495                 rc2 = filemap_fdatawait(dentry->d_inode->i_mapping);
2496                 if (!rc)
2497                         rc = rc2;
2498         }
2499         up(&dentry->d_inode->i_sem);
2500
2501         oa->o_valid = OBD_MD_FLID;
2502         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
2503
2504         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2505
2506         f_dput(dentry);
2507         RETURN(rc);
2508 }
2509
2510 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2511                          unsigned long max_age)
2512 {
2513         struct filter_obd *filter = &obd->u.filter;
2514         int blockbits = filter->fo_sb->s_blocksize_bits;
2515         int rc;
2516         ENTRY;
2517
2518         /* at least try to account for cached pages.  its still racey and
2519          * might be under-reporting if clients haven't announced their
2520          * caches with brw recently */
2521         spin_lock(&obd->obd_osfs_lock);
2522         rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
2523         memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
2524         spin_unlock(&obd->obd_osfs_lock);
2525
2526         CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
2527                " pending "LPU64" free "LPU64" avail "LPU64"\n",
2528                filter->fo_tot_dirty, filter->fo_tot_granted,
2529                filter->fo_tot_pending,
2530                osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
2531
2532         filter_grant_sanity_check(obd, __FUNCTION__);
2533
2534         osfs->os_bavail -= min(osfs->os_bavail,
2535                                (filter->fo_tot_dirty + filter->fo_tot_pending +
2536                                 osfs->os_bsize -1) >> blockbits);
2537
2538         RETURN(rc);
2539 }
2540
2541 static int filter_get_info(struct obd_export *exp, __u32 keylen,
2542                            void *key, __u32 *vallen, void *val)
2543 {
2544         struct filter_export_data *fed = &exp->exp_filter_data;
2545         struct obd_device *obd;
2546         ENTRY;
2547
2548         obd = class_exp2obd(exp);
2549         if (obd == NULL) {
2550                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2551                        exp->exp_handle.h_cookie);
2552                 RETURN(-EINVAL);
2553         }
2554
2555         if (keylen == strlen("blocksize") &&
2556             memcmp(key, "blocksize", keylen) == 0) {
2557                 __u32 *blocksize = val;
2558                 *vallen = sizeof(*blocksize);
2559                 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2560                 RETURN(0);
2561         }
2562
2563         if (keylen == strlen("blocksize_bits") &&
2564             memcmp(key, "blocksize_bits", keylen) == 0) {
2565                 __u32 *blocksize_bits = val;
2566                 *vallen = sizeof(*blocksize_bits);
2567                 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2568                 RETURN(0);
2569         }
2570
2571         if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
2572                 obd_id *last_id = val;
2573                 *last_id = filter_last_id(&obd->u.filter, fed->fed_group);
2574                 RETURN(0);
2575         }
2576         if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) {
2577                 /*Get log_context handle*/
2578                 unsigned long *llh_handle = val;
2579                 *vallen = sizeof(unsigned long);
2580                 *llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT];
2581                 RETURN(0);
2582         }
2583         if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) {
2584                 /*Get log_context handle*/
2585                 unsigned long *sb = val;
2586                 *vallen = sizeof(unsigned long);
2587                 *sb = (unsigned long)obd->u.filter.fo_sb;
2588                 RETURN(0);
2589         }
2590
2591         CDEBUG(D_IOCTL, "invalid key\n");
2592         RETURN(-EINVAL);
2593 }
2594
2595 struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
2596 {
2597         struct filter_group_llog *fglog, *nlog;
2598         char name[32] = "CATLIST";
2599         struct filter_obd *filter;
2600         struct list_head *cur;
2601         int rc;
2602
2603         filter = &obd->u.filter;
2604
2605         spin_lock(&filter->fo_llog_list_lock);
2606         list_for_each(cur, &filter->fo_llog_list) {
2607                 fglog = list_entry(cur, struct filter_group_llog, list);
2608                 if (fglog->group == group) {
2609                         spin_unlock(&filter->fo_llog_list_lock);
2610                         RETURN(fglog->llogs);
2611                 }
2612         }
2613         spin_unlock(&filter->fo_llog_list_lock);
2614
2615         OBD_ALLOC(fglog, sizeof(*fglog));
2616         if (fglog == NULL)
2617                 RETURN(NULL);
2618         fglog->group = group;
2619
2620         OBD_ALLOC(fglog->llogs, sizeof(struct obd_llogs));
2621         if (fglog->llogs == NULL) {
2622                 OBD_FREE(fglog, sizeof(*fglog));
2623                 RETURN(NULL);
2624         }
2625
2626         spin_lock(&filter->fo_llog_list_lock);
2627         list_for_each(cur, &filter->fo_llog_list) {
2628                 nlog = list_entry(cur, struct filter_group_llog, list);
2629                 LASSERT(nlog->group != group);
2630         }
2631         list_add(&fglog->list, &filter->fo_llog_list);
2632         spin_unlock(&filter->fo_llog_list_lock);
2633
2634         rc = obd_llog_cat_initialize(obd, fglog->llogs, 1, name);
2635         if (rc) {
2636                 OBD_FREE(fglog->llogs, sizeof(*(fglog->llogs)));
2637                 OBD_FREE(fglog, sizeof(*fglog));
2638                 RETURN(NULL);
2639         }
2640
2641         CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n", obd->obd_name,
2642                 fglog->llogs, group);
2643
2644         RETURN(fglog->llogs);
2645 }
2646
2647 static int filter_set_info(struct obd_export *exp, __u32 keylen,
2648                            void *key, __u32 vallen, void *val)
2649 {
2650         struct lvfs_run_ctxt saved;
2651         struct filter_export_data *fed = &exp->exp_filter_data;
2652         struct obd_device *obd;
2653         struct lustre_handle conn;
2654         struct obd_llogs *llog;
2655         struct llog_ctxt *ctxt;
2656         __u32 group;
2657         int rc = 0;
2658         ENTRY;
2659
2660         conn.cookie = exp->exp_handle.h_cookie;
2661
2662         obd = exp->exp_obd;
2663         if (obd == NULL) {
2664                 CDEBUG(D_IOCTL, "invalid exp %p cookie "LPX64"\n",
2665                        exp, conn.cookie);
2666                 RETURN(-EINVAL);
2667         }
2668
2669         if (keylen < strlen("mds_conn") ||
2670             memcmp(key, "mds_conn", keylen) != 0)
2671                 RETURN(-EINVAL);
2672
2673         group = *((__u32 *)val);
2674         if (fed->fed_group != 0 && fed->fed_group != group) {
2675                 char str[PTL_NALFMT_SIZE];
2676                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
2677                                 exp->exp_connection->c_peer.peer_nid, str);
2678                 CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
2679                        "earlier; now it's trying to use group %d!  This could "
2680                        "be a bug in the MDS.  Tell CFS.\n",
2681                        exp->exp_connection->c_peer.peer_nid, str,
2682                        fed->fed_group, group);
2683                 RETURN(-EPROTO);
2684         }
2685         fed->fed_group = group;
2686         CWARN("Received MDS connection ("LPX64"); group %d\n", conn.cookie,
2687               group);
2688
2689         LASSERT(rc == 0);
2690
2691         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2692         rc = filter_read_groups(obd, group, 1);
2693         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2694         if (rc != 0) {
2695                 CERROR("can't read group %u\n", group);
2696                 RETURN(rc);
2697         }
2698         rc = filter_group_set_fs_flags(obd, group);
2699          if (rc != 0) {
2700                 CERROR("can't set kml flags %u\n", group);
2701                 RETURN(rc);
2702         }
2703         llog = filter_grab_llog_for_group(obd, group);
2704         LASSERT(llog != NULL);
2705
2706         ctxt = llog_get_context(llog, LLOG_UNLINK_REPL_CTXT);
2707         LASSERT(ctxt != NULL);
2708         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
2709         RETURN(rc);
2710 }
2711
2712 int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
2713                      int len, void *karg, void *uarg)
2714 {
2715         struct obd_device *obd = exp->exp_obd;
2716         struct obd_ioctl_data *data = karg;
2717         int rc = 0;
2718
2719         switch (cmd) {
2720         case OBD_IOC_ABORT_RECOVERY:
2721                 target_stop_recovery_thread(obd);
2722                 RETURN(0);
2723
2724         case OBD_IOC_SET_READONLY: {
2725                 void *handle;
2726                 struct super_block *sb = obd->u.filter.fo_sb;
2727                 struct inode *inode = sb->s_root->d_inode;
2728                 BDEVNAME_DECLARE_STORAGE(tmp);
2729                 CERROR("setting device %s read-only\n",
2730                        ll_bdevname(sb, tmp));
2731
2732                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
2733                 LASSERT(handle);
2734                 (void)fsfilt_commit(obd, sb, inode, handle, 1);
2735
2736                 dev_set_rdonly(ll_sbdev(obd->u.filter.fo_sb), 2);
2737                 RETURN(0);
2738         }
2739
2740         case OBD_IOC_CATLOGLIST: {
2741                 rc = llog_catalog_list(obd, 1, data);
2742                 RETURN(rc);
2743         }
2744
2745         case OBD_IOC_SNAP_ADD: {
2746                 char *name = data->ioc_inlbuf1;
2747                 if (name) {
2748                         rc = fsfilt_set_snap_item(obd, obd->u.filter.fo_sb, name);
2749                 }
2750                 RETURN(rc);
2751         }
2752         case OBD_IOC_LLOG_CANCEL:
2753         case OBD_IOC_LLOG_REMOVE:
2754         case OBD_IOC_LLOG_INFO:
2755         case OBD_IOC_LLOG_PRINT: {
2756                 /* FIXME to be finished */
2757                 RETURN(-EOPNOTSUPP);
2758 /*
2759                 struct llog_ctxt *ctxt = NULL;
2760
2761                 push_ctxt(&saved, &ctxt->loc_ctxt, NULL);
2762                 rc = llog_ioctl(ctxt, cmd, data);
2763                 pop_ctxt(&saved, &ctxt->loc_ctxt, NULL);
2764
2765                 RETURN(rc);
2766 */
2767         }
2768
2769
2770         default:
2771                 RETURN(-EINVAL);
2772         }
2773         RETURN(0);
2774 }
2775
2776 static struct llog_operations filter_unlink_repl_logops;
2777 static struct llog_operations filter_size_orig_logops = {
2778         lop_setup: llog_obd_origin_setup,
2779         lop_cleanup: llog_catalog_cleanup,
2780         lop_add: llog_catalog_add,
2781 };
2782
2783 static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
2784                             struct obd_device *tgt, int count,
2785                             struct llog_catid *catid)
2786 {
2787         struct llog_ctxt *ctxt;
2788         int rc;
2789         ENTRY;
2790
2791         filter_unlink_repl_logops = llog_client_ops;
2792         filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel;
2793         filter_unlink_repl_logops.lop_connect = llog_repl_connect;
2794         filter_unlink_repl_logops.lop_sync = llog_obd_repl_sync;
2795
2796         rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
2797                         &filter_unlink_repl_logops);
2798         if (rc)
2799                 RETURN(rc);
2800         /* FIXME - assign unlink_cb for filter's recovery */
2801         ctxt = llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT);
2802         ctxt->llog_proc_cb = filter_recov_log_unlink_cb;
2803
2804         /* FIXME - count should be 1 to setup size log */
2805         rc = obd_llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, 
2806                             &catid->lci_logid, &filter_size_orig_logops);
2807         RETURN(rc);
2808 }
2809
2810 static int filter_llog_finish(struct obd_device *obd,
2811                               struct obd_llogs *llogs, int count)
2812 {
2813         int rc;
2814         ENTRY;
2815
2816         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT));
2817         if (rc)
2818                 RETURN(rc);
2819
2820         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_ORIG_CTXT));
2821         RETURN(rc);
2822 }
2823
2824 static int filter_llog_connect(struct obd_device *obd,
2825                                struct llogd_conn_body *body) 
2826 {
2827         struct llog_ctxt *ctxt;
2828         struct obd_llogs *llog;
2829         int rc;
2830         ENTRY;
2831
2832         CDEBUG(D_OTHER, "handle connect for %s: %u/%u/%u\n", obd->obd_name,
2833                (unsigned) body->lgdc_logid.lgl_ogr,
2834                (unsigned) body->lgdc_logid.lgl_oid,
2835                (unsigned) body->lgdc_logid.lgl_ogen);
2836         llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr);
2837         LASSERT(llog != NULL);
2838         ctxt = llog_get_context(llog, body->lgdc_ctxt_idx);
2839         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
2840                           &body->lgdc_gen, NULL);
2841         if (rc != 0)
2842                 CERROR("failed to connect\n");
2843
2844         RETURN(rc);
2845 }
2846
2847 static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2848                                              void *data)
2849 {
2850         return filter_fid2dentry(data, NULL, gr, id);
2851 }
2852
2853 static struct lvfs_callback_ops filter_lvfs_ops = {
2854         l_fid2dentry:     filter_lvfs_fid2dentry,
2855 };
2856
2857 static struct obd_ops filter_obd_ops = {
2858         .o_owner          = THIS_MODULE,
2859         .o_attach         = filter_attach,
2860         .o_detach         = filter_detach,
2861         .o_get_info       = filter_get_info,
2862         .o_set_info       = filter_set_info,
2863         .o_setup          = filter_setup,
2864         .o_precleanup     = filter_precleanup,
2865         .o_cleanup        = filter_cleanup,
2866         .o_connect        = filter_connect,
2867         .o_disconnect     = filter_disconnect,
2868         .o_statfs         = filter_statfs,
2869         .o_getattr        = filter_getattr,
2870         .o_unpackmd       = filter_unpackmd,
2871         .o_create         = filter_create,
2872         .o_setattr        = filter_setattr,
2873         .o_destroy        = filter_destroy,
2874         .o_brw            = filter_brw,
2875         .o_punch          = filter_truncate,
2876         .o_sync           = filter_sync,
2877         .o_preprw         = filter_preprw,
2878         .o_commitrw       = filter_commitrw,
2879         .o_do_cow         = filter_do_cow,
2880         .o_write_extents  = filter_write_extents,
2881         .o_destroy_export = filter_destroy_export,
2882         .o_llog_init      = filter_llog_init,
2883         .o_llog_finish    = filter_llog_finish,
2884         .o_llog_connect   = filter_llog_connect,
2885         .o_iocontrol      = filter_iocontrol,
2886 };
2887
2888 static struct obd_ops filter_sanobd_ops = {
2889         .o_owner          = THIS_MODULE,
2890         .o_attach         = filter_attach,
2891         .o_detach         = filter_detach,
2892         .o_get_info       = filter_get_info,
2893         .o_set_info       = filter_set_info,
2894         .o_setup          = filter_san_setup,
2895         .o_precleanup     = filter_precleanup,
2896         .o_cleanup        = filter_cleanup,
2897         .o_connect        = filter_connect,
2898         .o_disconnect     = filter_disconnect,
2899         .o_statfs         = filter_statfs,
2900         .o_getattr        = filter_getattr,
2901         .o_unpackmd       = filter_unpackmd,
2902         .o_create         = filter_create,
2903         .o_setattr        = filter_setattr,
2904         .o_destroy        = filter_destroy,
2905         .o_brw            = filter_brw,
2906         .o_punch          = filter_truncate,
2907         .o_sync           = filter_sync,
2908         .o_preprw         = filter_preprw,
2909         .o_commitrw       = filter_commitrw,
2910         .o_do_cow         = filter_do_cow,
2911         .o_write_extents  = filter_write_extents,
2912         .o_san_preprw     = filter_san_preprw,
2913         .o_destroy_export = filter_destroy_export,
2914         .o_llog_init      = filter_llog_init,
2915         .o_llog_finish    = filter_llog_finish,
2916         .o_llog_connect   = filter_llog_connect,
2917         .o_iocontrol      = filter_iocontrol,
2918 };
2919
2920 static int __init obdfilter_init(void)
2921 {
2922         struct lprocfs_static_vars lvars;
2923         int rc;
2924
2925         printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n");
2926
2927         lprocfs_init_vars(filter, &lvars);
2928
2929         rc = class_register_type(&filter_obd_ops, NULL, lvars.module_vars,
2930                                  OBD_FILTER_DEVICENAME);
2931         if (rc)
2932                 return rc;
2933
2934         rc = class_register_type(&filter_sanobd_ops, NULL, lvars.module_vars,
2935                                  OBD_FILTER_SAN_DEVICENAME);
2936         if (rc)
2937                 class_unregister_type(OBD_FILTER_DEVICENAME);
2938         return rc;
2939 }
2940
/*
 * Module exit point: unregister both filter device types, in the reverse
 * order of their registration in obdfilter_init().
 */
static void __exit obdfilter_exit(void)
{
        class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
        class_unregister_type(OBD_FILTER_DEVICENAME);
}
2946
/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Filtering OBD driver");
MODULE_LICENSE("GPL");

/* Hook the init/exit routines into module load and unload. */
module_init(obdfilter_init);
module_exit(obdfilter_exit);