Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define DEBUG_SUBSYSTEM S_FILTER
37
38 #include <linux/config.h>
39 #include <linux/module.h>
40 #include <linux/fs.h>
41 #include <linux/dcache.h>
42 #include <linux/init.h>
43 #include <linux/version.h>
44 #include <linux/sched.h>
45 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
46 # include <linux/mount.h>
47 # include <linux/buffer_head.h>
48 #endif
49
50 #include <linux/obd_class.h>
51 #include <linux/obd_lov.h>
52 #include <linux/lustre_dlm.h>
53 #include <linux/lustre_fsfilt.h>
54 #include <linux/lprocfs_status.h>
55 #include <linux/lustre_log.h>
56 #include <linux/lustre_ver.h>
57 #include <linux/lustre_commit_confd.h>
58 #include <libcfs/list.h>
59 #include <linux/lustre_disk.h>
60 #include <linux/lustre_quota.h>
61
62 #include "filter_internal.h"
63
64 static struct lvfs_callback_ops filter_lvfs_ops;
65
/* Journal commit callback handed to fsfilt_add_journal_cb() by
 * filter_finish_transno().  It simply forwards the transaction number and
 * the error status to obd_transno_commit_cb(); cb_data is not used. */
static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                             void *cb_data, int error)
{
        obd_transno_commit_cb(obd, transno, error);
}
71
72 /* Assumes caller has already pushed us into the kernel context. */
73 int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
74                           int rc)
75 {
76         struct filter_obd *filter = &exp->exp_obd->u.filter;
77         struct filter_export_data *fed = &exp->exp_filter_data;
78         struct filter_client_data *fcd = fed->fed_fcd;
79         __u64 last_rcvd;
80         loff_t off;
81         int err, log_pri = D_HA;
82
83         /* Propagate error code. */
84         if (rc)
85                 RETURN(rc);
86
87         if (!exp->exp_obd->obd_replayable || oti == NULL)
88                 RETURN(rc);
89
90         /* we don't allocate new transnos for replayed requests */
91         if (oti->oti_transno == 0) {
92                 spin_lock(&filter->fo_translock);
93                 last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
94                 filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
95                 spin_unlock(&filter->fo_translock);
96                 oti->oti_transno = last_rcvd;
97         } else {
98                 spin_lock(&filter->fo_translock);
99                 last_rcvd = oti->oti_transno;
100                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
101                         filter->fo_fsd->lsd_last_transno =
102                                 cpu_to_le64(last_rcvd);
103                 spin_unlock(&filter->fo_translock);
104         }
105         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
106
107         /* could get xid from oti, if it's ever needed */
108         fcd->fcd_last_xid = 0;
109
110         off = fed->fed_lr_off;
111         if (off <= 0) {
112                 CERROR("%s: client idx %d is %lld\n", exp->exp_obd->obd_name,
113                        fed->fed_lr_idx, fed->fed_lr_off);
114                 err = -EINVAL;
115         } else {
116                 fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle,
117                                       filter_commit_cb, NULL);
118                 err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
119                                           fcd, sizeof(*fcd), &off, 0);
120         }
121         if (err) {
122                 log_pri = D_ERROR;
123                 if (rc == 0)
124                         rc = err;
125         }
126
127         CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n",
128                last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err);
129
130         RETURN(rc);
131 }
132
/* Drop one reference on a dentry through dput().
 * Before the put it logs the dentry name, its address and the reference
 * count it will have afterwards, and asserts that the current count is
 * positive. */
void f_dput(struct dentry *dentry)
{
        /* Can't go inside filter_ddelete because it can block */
        CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
               dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
        LASSERT(atomic_read(&dentry->d_count) > 0);

        dput(dentry);
}
142
143 /* Add client data to the FILTER.  We use a bitmap to locate a free space
144  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
145  * Otherwise, we have just read the data from the last_rcvd file and
146  * we know its offset. */
147 static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
148                              struct filter_export_data *fed, int cl_idx)
149 {
150         unsigned long *bitmap = filter->fo_last_rcvd_slots;
151         int new_client = (cl_idx == -1);
152         ENTRY;
153
154         LASSERT(bitmap != NULL);
155         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
156
157         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
158         if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid))
159                 RETURN(0);
160
161         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
162          * there's no need for extra complication here
163          */
164         if (new_client) {
165                 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
166         repeat:
167                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
168                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
169                         RETURN(-EOVERFLOW);
170                 }
171                 if (test_and_set_bit(cl_idx, bitmap)) {
172                         cl_idx = find_next_zero_bit(bitmap,
173                                                     FILTER_LR_MAX_CLIENTS,
174                                                     cl_idx);
175                         goto repeat;
176                 }
177         } else {
178                 if (test_and_set_bit(cl_idx, bitmap)) {
179                         CERROR("FILTER client %d: bit already set in bitmap!\n",
180                                cl_idx);
181                         LBUG();
182                 }
183         }
184
185         fed->fed_lr_idx = cl_idx;
186         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) +
187                 cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size);
188         LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
189
190         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
191                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
192
193         if (new_client) {
194                 struct lvfs_run_ctxt saved;
195                 loff_t off = fed->fed_lr_off;
196                 int err;
197                 void *handle;
198
199                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
200                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
201
202                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
203                 /* Transaction needed to fix bug 1403 */
204                 handle = fsfilt_start(obd,
205                                       filter->fo_rcvd_filp->f_dentry->d_inode,
206                                       FSFILT_OP_SETATTR, NULL);
207                 if (IS_ERR(handle)) {
208                         err = PTR_ERR(handle);
209                         CERROR("unable to start transaction: rc %d\n", err);
210                 } else {
211                         err = fsfilt_write_record(obd, filter->fo_rcvd_filp,
212                                                   fed->fed_fcd,
213                                                   sizeof(*fed->fed_fcd),
214                                                   &off, 1);
215                         fsfilt_commit(obd,
216                                       filter->fo_rcvd_filp->f_dentry->d_inode,
217                                       handle, 1);
218                 }
219                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
220
221                 if (err) {
222                         CERROR("error writing %s client idx %u: rc %d\n",
223                                LAST_RCVD, fed->fed_lr_idx, err);
224                         RETURN(err);
225                 }
226         }
227         RETURN(0);
228 }
229
230 static int filter_client_free(struct obd_export *exp)
231 {
232         struct filter_export_data *fed = &exp->exp_filter_data;
233         struct filter_obd *filter = &exp->exp_obd->u.filter;
234         struct obd_device *obd = exp->exp_obd;
235         struct filter_client_data zero_fcd;
236         struct lvfs_run_ctxt saved;
237         int rc;
238         loff_t off;
239         ENTRY;
240
241         if (fed->fed_fcd == NULL)
242                 RETURN(0);
243
244         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
245         if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0)
246                 GOTO(free, 0);
247
248         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
249                fed->fed_lr_idx, off, fed->fed_fcd->fcd_uuid);
250
251         LASSERT(filter->fo_last_rcvd_slots != NULL);
252
253         off = fed->fed_lr_off;
254
255         /* Don't clear fed_lr_idx here as it is likely also unset.  At worst
256          * we leak a client slot that will be cleaned on the next recovery. */
257         if (off <= 0) {
258                 CERROR("%s: client idx %d has med_off %lld\n",
259                        obd->obd_name, fed->fed_lr_idx, off);
260                 GOTO(free, rc = -EINVAL);
261         }
262
263         /* Clear the bit _after_ zeroing out the client so we don't
264            race with filter_client_add and zero out new clients.*/
265         if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
266                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
267                        fed->fed_lr_idx);
268                 LBUG();
269         }
270
271         if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
272                 memset(&zero_fcd, 0, sizeof zero_fcd);
273                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
274                 rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
275                                          sizeof(zero_fcd), &off, 0);
276
277                 if (rc == 0)
278                         /* update server's transno */
279                         filter_update_server_data(obd, filter->fo_rcvd_filp,
280                                                   filter->fo_fsd, 1);
281                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
282
283                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
284                        "zeroing out client %s at idx %u (%llu) in %s rc %d\n",
285                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
286                        LAST_RCVD, rc);
287         }
288
289         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
290                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
291                        fed->fed_lr_idx);
292                 LBUG();
293         }
294
295         EXIT;
296 free:
297         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
298         fed->fed_fcd = NULL;
299
300         return 0;
301 }
302
/* Free the in-memory server data (fo_fsd) and the client slot bitmap
 * (fo_last_rcvd_slots) and reset both pointers to NULL.
 * The bitmap size is FILTER_LR_MAX_CLIENTS / 8 bytes, the same size
 * filter_init_server_data() allocates.  Always returns 0. */
static int filter_free_server_data(struct filter_obd *filter)
{
        OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
        filter->fo_fsd = NULL;
        OBD_FREE(filter->fo_last_rcvd_slots, FILTER_LR_MAX_CLIENTS / 8);
        filter->fo_last_rcvd_slots = NULL;
        return 0;
}
311
312 /* assumes caller is already in kernel ctxt */
313 int filter_update_server_data(struct obd_device *obd, struct file *filp,
314                               struct lr_server_data *fsd, int force_sync)
315 {
316         loff_t off = 0;
317         int rc;
318         ENTRY;
319
320         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->lsd_uuid);
321         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
322                le64_to_cpu(fsd->lsd_last_transno));
323         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
324                le64_to_cpu(fsd->lsd_mount_count));
325
326         rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off,force_sync);
327         if (rc)
328                 CERROR("error writing lr_server_data: rc = %d\n", rc);
329
330         RETURN(rc);
331 }
332
333 int filter_update_last_objid(struct obd_device *obd, obd_gr group,
334                              int force_sync)
335 {
336         struct filter_obd *filter = &obd->u.filter;
337         __u64 tmp;
338         loff_t off = 0;
339         int rc;
340         ENTRY;
341
342         CDEBUG(D_INODE, "%s: server last_objid for group "LPU64": "LPU64"\n",
343                obd->obd_name, group, filter->fo_last_objids[group]);
344
345         tmp = cpu_to_le64(filter->fo_last_objids[group]);
346         rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
347                                  &tmp, sizeof(tmp), &off, force_sync);
348         if (rc)
349                 CERROR("error writing group "LPU64" last objid: rc = %d\n",
350                        group, rc);
351         RETURN(rc);
352 }
353
354 /* assumes caller has already in kernel ctxt */
355 static int filter_init_server_data(struct obd_device *obd, struct file * filp)
356 {
357         struct filter_obd *filter = &obd->u.filter;
358         struct lr_server_data *fsd;
359         struct filter_client_data *fcd = NULL;
360         struct inode *inode = filp->f_dentry->d_inode;
361         unsigned long last_rcvd_size = inode->i_size;
362         __u64 mount_count;
363         int cl_idx;
364         loff_t off = 0;
365         int rc;
366
367         /* ensure padding in the struct is the correct size */
368         LASSERT (offsetof(struct lr_server_data, lsd_padding) +
369                  sizeof(fsd->lsd_padding) == FILTER_LR_SERVER_SIZE);
370         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
371                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
372         LASSERT(FILTER_LR_CLIENT_SIZE == LR_CLIENT_SIZE);
373         LASSERT(FILTER_LR_CLIENT_START == LR_CLIENT_START);
374
375         OBD_ALLOC(fsd, sizeof(*fsd));
376         if (!fsd)
377                 RETURN(-ENOMEM);
378         filter->fo_fsd = fsd;
379
380         OBD_ALLOC(filter->fo_last_rcvd_slots, FILTER_LR_MAX_CLIENTS / 8);
381         if (filter->fo_last_rcvd_slots == NULL) {
382                 OBD_FREE(fsd, sizeof(*fsd));
383                 RETURN(-ENOMEM);
384         }
385
386         if (last_rcvd_size == 0) {
387                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
388
389                 memcpy(fsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->lsd_uuid));
390                 fsd->lsd_last_transno = 0;
391                 mount_count = fsd->lsd_mount_count = 0;
392                 fsd->lsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
393                 fsd->lsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
394                 fsd->lsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
395                 fsd->lsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
396                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
397         } else {
398                 rc = fsfilt_read_record(obd, filp, fsd, sizeof(*fsd), &off);
399                 if (rc) {
400                         CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
401                                LAST_RCVD, rc);
402                         GOTO(err_fsd, rc);
403                 }
404                 if (strcmp(fsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
405                         CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
406                                obd->obd_uuid.uuid, fsd->lsd_uuid);
407                         GOTO(err_fsd, rc = -EINVAL);
408                 }
409                 mount_count = le64_to_cpu(fsd->lsd_mount_count);
410                 filter->fo_subdir_count = le16_to_cpu(fsd->lsd_subdir_count);
411         }
412
413         if (fsd->lsd_feature_incompat & ~cpu_to_le32(FILTER_INCOMPAT_SUPP)) {
414                 CERROR("unsupported feature %x\n",
415                        le32_to_cpu(fsd->lsd_feature_incompat) &
416                        ~FILTER_INCOMPAT_SUPP);
417                 GOTO(err_fsd, rc = -EINVAL);
418         }
419         if (fsd->lsd_feature_rocompat & ~cpu_to_le32(FILTER_ROCOMPAT_SUPP)) {
420                 CERROR("read-only feature %x\n",
421                        le32_to_cpu(fsd->lsd_feature_rocompat) &
422                        ~FILTER_ROCOMPAT_SUPP);
423                 /* Do something like remount filesystem read-only */
424                 GOTO(err_fsd, rc = -EINVAL);
425         }
426
427         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
428                obd->obd_name, le64_to_cpu(fsd->lsd_last_transno));
429         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
430                obd->obd_name, mount_count + 1);
431         CDEBUG(D_INODE, "%s: server data size: %u\n",
432                obd->obd_name, le32_to_cpu(fsd->lsd_server_size));
433         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
434                obd->obd_name, le32_to_cpu(fsd->lsd_client_start));
435         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
436                obd->obd_name, le32_to_cpu(fsd->lsd_client_size));
437         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
438                obd->obd_name, le16_to_cpu(fsd->lsd_subdir_count));
439         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
440                last_rcvd_size <= le32_to_cpu(fsd->lsd_client_start) ? 0 :
441                (last_rcvd_size - le32_to_cpu(fsd->lsd_client_start)) /
442                 le16_to_cpu(fsd->lsd_client_size));
443
444         if (!obd->obd_replayable) {
445                 CWARN("%s: recovery support OFF\n", obd->obd_name);
446                 GOTO(out, rc = 0);
447         }
448
449         for (cl_idx = 0, off = le32_to_cpu(fsd->lsd_client_start);
450              off < last_rcvd_size; cl_idx++) {
451                 __u64 last_rcvd;
452                 struct obd_export *exp;
453                 struct filter_export_data *fed;
454
455                 if (!fcd) {
456                         OBD_ALLOC(fcd, sizeof(*fcd));
457                         if (!fcd)
458                                 GOTO(err_client, rc = -ENOMEM);
459                 }
460
461                 /* Don't assume off is incremented properly by
462                  * fsfilt_read_record(), in case sizeof(*fcd)
463                  * isn't the same as fsd->lsd_client_size.  */
464                 off = le32_to_cpu(fsd->lsd_client_start) +
465                         cl_idx * le16_to_cpu(fsd->lsd_client_size);
466                 rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
467                 if (rc) {
468                         CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
469                                LAST_RCVD, cl_idx, off, rc);
470                         break; /* read error shouldn't cause startup to fail */
471                 }
472
473                 if (fcd->fcd_uuid[0] == '\0') {
474                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
475                                cl_idx);
476                         continue;
477                 }
478
479                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
480
481                 /* These exports are cleaned up by filter_disconnect(), so they
482                  * need to be set up like real exports as filter_connect() does.
483                  */
484                 exp = class_new_export(obd);
485                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
486                        " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
487                        last_rcvd, le64_to_cpu(fsd->lsd_last_transno));
488                 if (exp == NULL)
489                         GOTO(err_client, rc = -ENOMEM);
490
491                 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
492                        sizeof exp->exp_client_uuid.uuid);
493                 fed = &exp->exp_filter_data;
494                 fed->fed_fcd = fcd;
495                 rc = filter_client_add(obd, filter, fed, cl_idx);
496                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
497
498                 /* create helper if export init gets more complex */
499                 spin_lock_init(&fed->fed_lock);
500
501                 fcd = NULL;
502                 exp->exp_replay_needed = 1;
503                 obd->obd_recoverable_clients++;
504                 obd->obd_max_recoverable_clients++;
505                 class_export_put(exp);
506
507                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
508                        cl_idx, last_rcvd);
509
510                 if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno))
511                         fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
512
513         }
514
515         if (fcd)
516                 OBD_FREE(fcd, sizeof(*fcd));
517
518         obd->obd_last_committed = le64_to_cpu(fsd->lsd_last_transno);
519
520         if (obd->obd_recoverable_clients) {
521                 CWARN("RECOVERY: service %s, %d recoverable clients, "
522                       "last_rcvd "LPU64"\n", obd->obd_name,
523                       obd->obd_recoverable_clients,
524                       le64_to_cpu(fsd->lsd_last_transno));
525                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
526                 obd->obd_recovering = 1;
527                 obd->obd_recovery_start = CURRENT_SECONDS;
528                 /* Only used for lprocfs_status */
529                 obd->obd_recovery_end = obd->obd_recovery_start +
530                         OBD_RECOVERY_TIMEOUT / HZ;
531         }
532
533 out:
534         filter->fo_mount_count = mount_count + 1;
535         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
536
537         /* save it, so mount count and last_transno is current */
538         rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
539         if (rc)
540                 GOTO(err_client, rc);
541
542         RETURN(0);
543
544 err_client:
545         class_disconnect_exports(obd);
546 err_fsd:
547         filter_free_server_data(filter);
548         RETURN(rc);
549 }
550
551 static int filter_cleanup_groups(struct obd_device *obd)
552 {
553         struct filter_obd *filter = &obd->u.filter;
554         struct file *filp;
555         struct dentry *dentry;
556         int i;
557         ENTRY;
558
559         if (filter->fo_dentry_O_groups != NULL) {
560                 for (i = 0; i < FILTER_GROUPS; i++) {
561                         dentry = filter->fo_dentry_O_groups[i];
562                         if (dentry != NULL)
563                                 f_dput(dentry);
564                 }
565                 OBD_FREE(filter->fo_dentry_O_groups,
566                          FILTER_GROUPS * sizeof(*filter->fo_dentry_O_groups));
567                 filter->fo_dentry_O_groups = NULL;
568         }
569         if (filter->fo_last_objid_files != NULL) {
570                 for (i = 0; i < FILTER_GROUPS; i++) {
571                         filp = filter->fo_last_objid_files[i];
572                         if (filp != NULL)
573                                 filp_close(filp, 0);
574                 }
575                 OBD_FREE(filter->fo_last_objid_files,
576                          FILTER_GROUPS * sizeof(*filter->fo_last_objid_files));
577                 filter->fo_last_objid_files = NULL;
578         }
579         if (filter->fo_dentry_O_sub != NULL) {
580                 for (i = 0; i < filter->fo_subdir_count; i++) {
581                         dentry = filter->fo_dentry_O_sub[i];
582                         if (dentry != NULL)
583                                 f_dput(dentry);
584                 }
585                 OBD_FREE(filter->fo_dentry_O_sub,
586                          filter->fo_subdir_count *
587                          sizeof(*filter->fo_dentry_O_sub));
588                 filter->fo_dentry_O_sub = NULL;
589         }
590         if (filter->fo_last_objids != NULL) {
591                 OBD_FREE(filter->fo_last_objids,
592                          FILTER_GROUPS * sizeof(*filter->fo_last_objids));
593                 filter->fo_last_objids = NULL;
594         }
595         if (filter->fo_dentry_O != NULL) {
596                 f_dput(filter->fo_dentry_O);
597                 filter->fo_dentry_O = NULL;
598         }
599         RETURN(0);
600 }
601
602 /* FIXME: object groups */
603 static int filter_prep_groups(struct obd_device *obd)
604 {
605         struct filter_obd *filter = &obd->u.filter;
606         struct dentry *dentry, *O_dentry;
607         struct file *filp;
608         int i, rc = 0, cleanup_phase = 0;
609         ENTRY;
610
611         O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
612         CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
613         if (IS_ERR(O_dentry)) {
614                 rc = PTR_ERR(O_dentry);
615                 CERROR("cannot open/create O: rc = %d\n", rc);
616                 GOTO(cleanup, rc);
617         }
618         filter->fo_dentry_O = O_dentry;
619         cleanup_phase = 1; /* O_dentry */
620
621         /* Lookup "R" to tell if we're on an old OST FS and need to convert
622          * from O/R/<dir>/<objid> to O/0/<dir>/<objid>.  This can be removed
623          * some time post 1.0 when all old-style OSTs have converted along
624          * with the init_objid hack. */
625         dentry = ll_lookup_one_len("R", O_dentry, 1);
626         if (IS_ERR(dentry))
627                 GOTO(cleanup, rc = PTR_ERR(dentry));
628         if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
629                 struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1);
630                 ENTRY;
631
632                 CWARN("converting OST to new object layout\n");
633                 if (IS_ERR(O0_dentry)) {
634                         rc = PTR_ERR(O0_dentry);
635                         CERROR("error looking up O/0: rc %d\n", rc);
636                         GOTO(cleanup_R, rc);
637                 }
638
639                 if (O0_dentry->d_inode) {
640                         CERROR("Both O/R and O/0 exist. Fix manually.\n");
641                         GOTO(cleanup_O0, rc = -EEXIST);
642                 }
643
644                 down(&O_dentry->d_inode->i_sem);
645                 rc = vfs_rename(O_dentry->d_inode, dentry,
646                                 O_dentry->d_inode, O0_dentry);
647                 up(&O_dentry->d_inode->i_sem);
648
649                 if (rc) {
650                         CERROR("error renaming O/R to O/0: rc %d\n", rc);
651                         GOTO(cleanup_O0, rc);
652                 }
653                 filter->fo_fsd->lsd_feature_incompat |=
654                         cpu_to_le32(FILTER_INCOMPAT_GROUPS);
655                 rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
656                                                filter->fo_fsd, 1);
657                 GOTO(cleanup_O0, rc);
658
659         cleanup_O0:
660                 f_dput(O0_dentry);
661         cleanup_R:
662                 f_dput(dentry);
663                 if (rc)
664                         GOTO(cleanup, rc);
665         } else {
666                 f_dput(dentry);
667         }
668
669         OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64));
670         if (filter->fo_last_objids == NULL)
671                 GOTO(cleanup, rc = -ENOMEM);
672         cleanup_phase = 2; /* groups */
673
674         OBD_ALLOC(filter->fo_dentry_O_groups, FILTER_GROUPS * sizeof(dentry));
675         if (filter->fo_dentry_O_groups == NULL)
676                 GOTO(cleanup, rc = -ENOMEM);
677         OBD_ALLOC(filter->fo_last_objid_files, FILTER_GROUPS * sizeof(filp));
678         if (filter->fo_last_objid_files == NULL)
679                 GOTO(cleanup, rc = -ENOMEM);
680
681         for (i = 0; i < FILTER_GROUPS; i++) {
682                 char name[25];
683                 loff_t off = 0;
684
685                 sprintf(name, "%d", i);
686                 dentry = simple_mkdir(O_dentry, name, 0700, 1);
687                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
688                 if (IS_ERR(dentry)) {
689                         rc = PTR_ERR(dentry);
690                         CERROR("cannot lookup/create O/%s: rc = %d\n",
691                                name, rc);
692                         GOTO(cleanup, rc);
693                 }
694                 filter->fo_dentry_O_groups[i] = dentry;
695
696                 sprintf(name, "O/%d/LAST_ID", i);
697                 filp = filp_open(name, O_CREAT | O_RDWR, 0700);
698                 if (IS_ERR(filp)) {
699                         rc = PTR_ERR(filp);
700                         CERROR("cannot create %s: rc = %d\n", name, rc);
701                         GOTO(cleanup, rc);
702                 }
703                 filter->fo_last_objid_files[i] = filp;
704
705                 if (filp->f_dentry->d_inode->i_size == 0) {
706                         if (i == 0 && filter->fo_fsd->lsd_unused != 0) {
707                                 /* OST conversion, remove sometime post 1.0 */
708                                 filter->fo_last_objids[0] =
709                                         le64_to_cpu(filter->fo_fsd->lsd_unused);
710                                 CWARN("saving old objid "LPU64" to LAST_ID\n",
711                                       filter->fo_last_objids[0]);
712                         } else {
713                                 filter->fo_last_objids[i] = FILTER_INIT_OBJID;
714                         }
715                         rc = filter_update_last_objid(obd, i, 1);
716                         if (rc)
717                                 GOTO(cleanup, rc);
718                         continue;
719                 }
720
721                 rc = fsfilt_read_record(obd, filp, &filter->fo_last_objids[i],
722                                         sizeof(__u64), &off);
723                 if (rc) {
724                         CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
725                                name, rc);
726                         GOTO(cleanup, rc);
727                 }
728                 filter->fo_last_objids[i] =
729                         le64_to_cpu(filter->fo_last_objids[i]);
730                 CDEBUG(D_HA, "%s: server last_objid group %d: "LPU64"\n",
731                        obd->obd_name, i, filter->fo_last_objids[i]);
732         }
733
734         if (filter->fo_subdir_count) {
735                 O_dentry = filter->fo_dentry_O_groups[0];
736                 OBD_ALLOC(filter->fo_dentry_O_sub,
737                           filter->fo_subdir_count * sizeof(dentry));
738                 if (filter->fo_dentry_O_sub == NULL)
739                         GOTO(cleanup, rc = -ENOMEM);
740
741                 for (i = 0; i < filter->fo_subdir_count; i++) {
742                         char dir[20];
743                         snprintf(dir, sizeof(dir), "d%u", i);
744
745                         dentry = simple_mkdir(O_dentry, dir, 0700, 1);
746                         CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
747                         if (IS_ERR(dentry)) {
748                                 rc = PTR_ERR(dentry);
749                                 CERROR("can't lookup/create O/0/%s: rc = %d\n",
750                                        dir, rc);
751                                 GOTO(cleanup, rc);
752                         }
753                         filter->fo_dentry_O_sub[i] = dentry;
754                 }
755         }
756         RETURN(0);
757
758  cleanup:
759         filter_cleanup_groups(obd);
760         return rc;
761 }
762
763 /* setup the object store with correct subdirectories */
764 static int filter_prep(struct obd_device *obd)
765 {
766         struct lvfs_run_ctxt saved;
767         struct filter_obd *filter = &obd->u.filter;
768         struct file *file;
769         struct inode *inode;
770         int rc = 0;
771         ENTRY;
772
773         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
774         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
775         if (!file || IS_ERR(file)) {
776                 rc = PTR_ERR(file);
777                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
778                        LAST_RCVD, rc);
779                 GOTO(out, rc);
780         }
781         filter->fo_rcvd_filp = file;
782         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
783                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
784                        file->f_dentry->d_inode->i_mode);
785                 GOTO(err_filp, rc = -ENOENT);
786         }
787
788         /* steal operations */
789         inode = file->f_dentry->d_inode;
790         filter->fo_fop = file->f_op;
791         filter->fo_iop = inode->i_op;
792         filter->fo_aops = inode->i_mapping->a_ops;
793
794         rc = filter_init_server_data(obd, file);
795         if (rc) {
796                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
797                 GOTO(err_filp, rc);
798         }
799         /* open and create health check io file*/
800         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
801         if (IS_ERR(file)) {
802                 rc = PTR_ERR(file);
803                 CERROR("OBD filter: cannot open/create %s rc = %d\n", 
804                         HEALTH_CHECK, rc);
805                 GOTO(err_filp, rc);
806         }
807         filter->fo_health_check_filp = file;
808         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
809                 CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK,
810                        file->f_dentry->d_inode->i_mode);
811                 GOTO(err_health_check, rc = -ENOENT);
812         }
813         rc = lvfs_check_io_health(obd, file);
814         if (rc)
815                 GOTO(err_health_check, rc);
816
817         rc = filter_prep_groups(obd);
818         if (rc)
819                 GOTO(err_server_data, rc);
820  out:
821         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
822
823         return(rc);
824
825  err_server_data:
826         //class_disconnect_exports(obd, 0);
827         filter_free_server_data(filter);
828  err_health_check:
829         if (filp_close(filter->fo_health_check_filp, 0))
830                 CERROR("can't close %s after error\n", HEALTH_CHECK);
831         filter->fo_health_check_filp = NULL;
832  err_filp:
833         if (filp_close(filter->fo_rcvd_filp, 0))
834                 CERROR("can't close %s after error\n", LAST_RCVD);
835         filter->fo_rcvd_filp = NULL;
836         goto out;
837 }
838
839 /* cleanup the filter: write last used object id to status file */
840 static void filter_post(struct obd_device *obd)
841 {
842         struct lvfs_run_ctxt saved;
843         struct filter_obd *filter = &obd->u.filter;
844         int rc, i;
845
846         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
847          * best to start a transaction with h_sync, because we removed this
848          * from lastobjid */
849
850         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
851         rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
852                                        filter->fo_fsd, 0);
853         if (rc)
854                 CERROR("error writing server data: rc = %d\n", rc);
855
856         for (i = 0; i < FILTER_GROUPS; i++) {
857                 rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1));
858                 if (rc)
859                         CERROR("error writing group %d lastobjid: rc = %d\n",
860                                i, rc);
861         }
862
863         rc = filp_close(filter->fo_rcvd_filp, 0);
864         filter->fo_rcvd_filp = NULL;
865         if (rc)
866                 CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
867
868         rc = filp_close(filter->fo_health_check_filp, 0);
869         filter->fo_health_check_filp = NULL;
870         if (rc)
871                 CERROR("error closing %s: rc = %d\n", HEALTH_CHECK, rc);
872
873         filter_cleanup_groups(obd);
874         filter_free_server_data(filter);
875         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
876 }
877
878 static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa,
879                                obd_id id)
880 {
881         obd_gr group = 0;
882         LASSERT(filter->fo_fsd != NULL);
883
884         if (oa != NULL) {
885                 LASSERT(oa->o_gr <= FILTER_GROUPS);
886                 group = oa->o_gr;
887         }
888
889         spin_lock(&filter->fo_objidlock);
890         filter->fo_last_objids[group] = id;
891         spin_unlock(&filter->fo_objidlock);
892 }
893
894 __u64 filter_last_id(struct filter_obd *filter, struct obdo *oa)
895 {
896         obd_id id;
897         obd_gr group = 0;
898         LASSERT(filter->fo_fsd != NULL);
899
900         if (oa != NULL) {
901                 LASSERT(oa->o_gr <= FILTER_GROUPS);
902                 group = oa->o_gr;
903         }
904
905         /* FIXME: object groups */
906         spin_lock(&filter->fo_objidlock);
907         id = filter->fo_last_objids[group];
908         spin_unlock(&filter->fo_objidlock);
909
910         return id;
911 }
912
913 static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
914 {
915         down(&dparent->d_inode->i_sem);
916         return 0;
917 }
918
919 /* We never dget the object parent, so DON'T dput it either */
920 static void filter_parent_unlock(struct dentry *dparent)
921 {
922         up(&dparent->d_inode->i_sem);
923 }
924
925 /* We never dget the object parent, so DON'T dput it either */
926 struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
927 {
928         struct filter_obd *filter = &obd->u.filter;
929         LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */
930
931         if (group > 0 || filter->fo_subdir_count == 0)
932                 return filter->fo_dentry_O_groups[group];
933
934         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
935 }
936
937 /* We never dget the object parent, so DON'T dput it either */
938 struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
939                                   obd_id objid)
940 {
941         unsigned long now = jiffies;
942         struct dentry *dparent = filter_parent(obd, group, objid);
943         int rc;
944
945         if (IS_ERR(dparent))
946                 return dparent;
947
948         rc = filter_lock_dentry(obd, dparent);
949         fsfilt_check_slow(now, obd_timeout, "parent lock");
950         return rc ? ERR_PTR(rc) : dparent;
951 }
952
953 /* How to get files, dentries, inodes from object id's.
954  *
955  * If dir_dentry is passed, the caller has already locked the parent
956  * appropriately for this operation (normally a write lock).  If
957  * dir_dentry is NULL, we do a read lock while we do the lookup to
958  * avoid races with create/destroy and such changing the directory
959  * internal to the filesystem code. */
960 struct dentry *filter_fid2dentry(struct obd_device *obd,
961                                  struct dentry *dir_dentry,
962                                  obd_gr group, obd_id id)
963 {
964         struct dentry *dparent = dir_dentry;
965         struct dentry *dchild;
966         char name[32];
967         int len;
968         ENTRY;
969
970         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) {
971                 CERROR("test case OBD_FAIL_OST_ENOENT\n");
972                 RETURN(ERR_PTR(-ENOENT));
973         }
974
975         if (id == 0) {
976                 CERROR("fatal: invalid object id 0\n");
977                 RETURN(ERR_PTR(-ESTALE));
978         }
979
980         len = sprintf(name, LPU64, id);
981         if (dir_dentry == NULL) {
982                 dparent = filter_parent_lock(obd, group, id);
983                 if (IS_ERR(dparent)) {
984                         CERROR("%s: error getting object "LPU64":"LPU64
985                                " parent: rc %ld\n", obd->obd_name,
986                                id, group, PTR_ERR(dparent));
987                         RETURN(dparent);
988                 }
989         }
990         CDEBUG(D_INODE, "looking up object O/%.*s/%s\n",
991                dparent->d_name.len, dparent->d_name.name, name);
992         dchild = /*ll_*/lookup_one_len(name, dparent, len);
993         if (dir_dentry == NULL)
994                 filter_parent_unlock(dparent);
995         if (IS_ERR(dchild)) {
996                 CERROR("%s: child lookup error %ld\n", obd->obd_name,
997                        PTR_ERR(dchild));
998                 RETURN(dchild);
999         }
1000
1001         if (dchild->d_inode != NULL && is_bad_inode(dchild->d_inode)) {
1002                 CERROR("%s: got bad object "LPU64" inode %lu\n",
1003                        obd->obd_name, id, dchild->d_inode->i_ino);
1004                 f_dput(dchild);
1005                 RETURN(ERR_PTR(-ENOENT));
1006         }
1007
1008         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
1009                name, dchild, atomic_read(&dchild->d_count));
1010
1011         LASSERT(atomic_read(&dchild->d_count) > 0);
1012
1013         RETURN(dchild);
1014 }
1015
1016 static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
1017 {
1018         struct lustre_handle lockh;
1019         int flags = LDLM_AST_DISCARD_DATA, rc;
1020         struct ldlm_res_id res_id = { .name = { objid } };
1021         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1022
1023         ENTRY;
1024         /* Tell the clients that the object is gone now and that they should
1025          * throw away any cached pages. */
1026         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
1027                               LDLM_EXTENT, &policy, LCK_PW,
1028                               &flags, ldlm_blocking_ast, ldlm_completion_ast,
1029                               NULL, NULL, NULL, 0, NULL, &lockh);
1030
1031         /* We only care about the side-effects, just drop the lock. */
1032         if (rc == ELDLM_OK)
1033                 ldlm_lock_decref(&lockh, LCK_PW);
1034
1035         RETURN(rc);
1036 }
1037
1038 /* Caller must hold LCK_PW on parent and push us into kernel context.
1039  * Caller is also required to ensure that dchild->d_inode exists. */
1040 static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
1041                                    struct dentry *dparent,
1042                                    struct dentry *dchild)
1043 {
1044         struct inode *inode = dchild->d_inode;
1045         int rc;
1046         ENTRY;
1047
1048         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1049                 CERROR("destroying objid %.*s ino %lu nlink %lu count %d\n",
1050                        dchild->d_name.len, dchild->d_name.name, inode->i_ino,
1051                        (unsigned long)inode->i_nlink,
1052                        atomic_read(&inode->i_count));
1053         }
1054
1055         rc = vfs_unlink(dparent->d_inode, dchild);
1056         if (rc)
1057                 CERROR("error unlinking objid %.*s: rc %d\n",
1058                        dchild->d_name.len, dchild->d_name.name, rc);
1059         RETURN(rc);
1060 }
1061
1062 static int filter_intent_policy(struct ldlm_namespace *ns,
1063                                 struct ldlm_lock **lockp, void *req_cookie,
1064                                 ldlm_mode_t mode, int flags, void *data)
1065 {
1066         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1067         struct ptlrpc_request *req = req_cookie;
1068         struct ldlm_lock *lock = *lockp, *l = NULL;
1069         struct ldlm_resource *res = lock->l_resource;
1070         ldlm_processing_policy policy;
1071         struct ost_lvb *res_lvb, *reply_lvb;
1072         struct ldlm_reply *rep;
1073         struct list_head *tmp;
1074         ldlm_error_t err;
1075         int tmpflags = 0, rc, repsize[2] = {sizeof(*rep), sizeof(*reply_lvb)};
1076         int only_liblustre = 0;
1077         ENTRY;
1078
1079         policy = ldlm_get_processing_policy(res);
1080         LASSERT(policy != NULL);
1081         LASSERT(req != NULL);
1082
1083         rc = lustre_pack_reply(req, 2, repsize, NULL);
1084         if (rc)
1085                 RETURN(req->rq_status = rc);
1086
1087         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
1088         LASSERT(rep != NULL);
1089
1090         reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
1091         LASSERT(reply_lvb != NULL);
1092
1093         //fixup_handle_for_resent_req(req, lock, &lockh);
1094
1095         /* If we grant any lock at all, it will be a whole-file read lock.
1096          * Call the extent policy function to see if our request can be
1097          * granted, or is blocked. */
1098         lock->l_policy_data.l_extent.start = 0;
1099         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
1100         lock->l_req_mode = LCK_PR;
1101
1102         LASSERT(ns == res->lr_namespace);
1103         l_lock(&ns->ns_lock);
1104
1105         res->lr_tmp = &rpc_list;
1106         rc = policy(lock, &tmpflags, 0, &err);
1107         res->lr_tmp = NULL;
1108
1109         /* FIXME: we should change the policy function slightly, to not make
1110          * this list at all, since we just turn around and free it */
1111         while (!list_empty(&rpc_list)) {
1112                 struct ldlm_ast_work *w =
1113                         list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
1114                 list_del(&w->w_list);
1115                 LDLM_LOCK_PUT(w->w_lock);
1116                 OBD_FREE(w, sizeof(*w));
1117         }
1118
1119         /* The lock met with no resistance; we're finished. */
1120         if (rc == LDLM_ITER_CONTINUE) {
1121                 l_unlock(&ns->ns_lock);
1122                 /*
1123                  * do not grant locks to the liblustre clients: they cannot
1124                  * handle ASTs robustly.
1125                  */
1126                 if (lock->l_export->exp_libclient) {
1127                         ldlm_resource_unlink_lock(lock);
1128                         RETURN(ELDLM_LOCK_ABORTED);
1129                 }
1130                 RETURN(ELDLM_LOCK_REPLACED);
1131         }
1132
1133         /* Do not grant any lock, but instead send GL callbacks.  The extent
1134          * policy nicely created a list of all PW locks for us.  We will choose
1135          * the highest of those which are larger than the size in the LVB, if
1136          * any, and perform a glimpse callback. */
1137         down(&res->lr_lvb_sem);
1138         res_lvb = res->lr_lvb_data;
1139         LASSERT(res_lvb != NULL);
1140         *reply_lvb = *res_lvb;
1141         up(&res->lr_lvb_sem);
1142
1143         list_for_each(tmp, &res->lr_granted) {
1144                 struct ldlm_lock *tmplock =
1145                         list_entry(tmp, struct ldlm_lock, l_res_link);
1146
1147                 if (tmplock->l_granted_mode == LCK_PR)
1148                         continue;
1149                 /*
1150                  * ->ns_lock guarantees that no new locks are granted, and,
1151                  * therefore, that res->lr_lvb_data cannot increase beyond the
1152                  * end of already granted lock. As a result, it is safe to
1153                  * check against "stale" reply_lvb->lvb_size value without
1154                  * res->lr_lvb_sem.
1155                  */
1156                 if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
1157                         continue;
1158
1159                 /* Don't send glimpse ASTs to liblustre clients.  They aren't
1160                  * listening for them, and they do entirely synchronous I/O
1161                  * anyways. */
1162                 if (tmplock->l_export == NULL ||
1163                     tmplock->l_export->exp_libclient == 1) {
1164                         only_liblustre = 1;
1165                         continue;
1166                 }
1167
1168                 if (l == NULL) {
1169                         l = LDLM_LOCK_GET(tmplock);
1170                         continue;
1171                 }
1172
1173                 if (l->l_policy_data.l_extent.start >
1174                     tmplock->l_policy_data.l_extent.start)
1175                         continue;
1176
1177                 LDLM_LOCK_PUT(l);
1178                 l = LDLM_LOCK_GET(tmplock);
1179         }
1180         l_unlock(&ns->ns_lock);
1181
1182         /* There were no PW locks beyond the size in the LVB; finished. */
1183         if (l == NULL) {
1184                 if (only_liblustre) {
1185                         /* If we discovered a liblustre client with a PW lock,
1186                          * however, the LVB may be out of date!  The LVB is
1187                          * updated only on glimpse (which we don't do for
1188                          * liblustre clients) and cancel (which the client
1189                          * obviously has not yet done).  So if it has written
1190                          * data but kept the lock, the LVB is stale and needs
1191                          * to be updated from disk.
1192                          *
1193                          * Of course, this will all disappear when we switch to
1194                          * taking liblustre locks on the OST. */
1195                         if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
1196                                 ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
1197                 }
1198                 RETURN(ELDLM_LOCK_ABORTED);
1199         }
1200         /*
1201          * This check is for lock taken in filter_prepare_destroy() that does
1202          * not have l_glimpse_ast set. So the logic is: if there is a lock
1203          * with no l_glimpse_ast set, this object is being destroyed already.
1204          *
1205          * Hence, if you are grabbing DLM locks on the server, always set
1206          * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
1207          */
1208         if (l->l_glimpse_ast == NULL) {
1209                 /* We are racing with unlink(); just return -ENOENT */
1210                 rep->lock_policy_res1 = -ENOENT;
1211                 goto out;
1212         }
1213
1214         LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
1215         rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
1216         /* Update the LVB from disk if the AST failed (this is a legal race) */
1217         /*
1218          * XXX nikita: situation when ldlm_server_glimpse_ast() failed before
1219          * sending ast is not handled. This can result in lost client writes.
1220          */
1221         if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
1222                 ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
1223
1224         down(&res->lr_lvb_sem);
1225         *reply_lvb = *res_lvb;
1226         up(&res->lr_lvb_sem);
1227
1228  out:
1229         LDLM_LOCK_PUT(l);
1230
1231         RETURN(ELDLM_LOCK_ABORTED);
1232 }
1233
/*
 * per-obd_device iobuf pool.
 *
 * To avoid memory deadlocks in low-memory setups, the number of dynamic
 * allocations in the write path has to be minimized (see bug 5137).
 *
 * Pages, niobuf_local's and niobuf_remote's are pre-allocated and attached to
 * OST threads (see ost_thread_{init,done}()).
 *
 * The "iobuf's" used by the filter cannot be attached to an OST thread,
 * however, because at the OST layer there are only (potentially) multiple
 * obd_devices whose type is unknown at the time of OST thread creation.
 *
 * Instead, an array of iobuf's is attached to struct filter_obd (the
 * ->fo_iobuf_pool field). This array has size OST_NUM_THREADS, so that each
 * OST thread uses its very own iobuf.
 *
 * The functions below
 *
 *     filter_iobuf_pool_init()
 *
 *     filter_iobuf_pool_done()
 *
 *     filter_iobuf_get()
 *
 * operate on this array. They are "generic" in the sense that they don't
 * depend on the actual type of the iobuf's (the latter depending on the Linux
 * kernel version).
 */
1262
1263 /*
1264  * destroy pool created by filter_iobuf_pool_init
1265  */
1266 static void filter_iobuf_pool_done(struct filter_obd *filter)
1267 {
1268         void **pool;
1269         int i;
1270
1271         ENTRY;
1272
1273         pool = filter->fo_iobuf_pool;
1274         if (pool != NULL) {
1275                 for (i = 0; i < OST_NUM_THREADS; ++ i) {
1276                         if (pool[i] != NULL)
1277                                 filter_free_iobuf(pool[i]);
1278                 }
1279                 OBD_FREE(pool, OST_NUM_THREADS * sizeof pool[0]);
1280                 filter->fo_iobuf_pool = NULL;
1281         }
1282         EXIT;
1283 }
1284
1285 /*
1286  * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write().
1287  */
1288 static int filter_iobuf_pool_init(struct filter_obd *filter, int count)
1289 {
1290         void **pool;
1291         int i;
1292         int result = 0;
1293
1294         ENTRY;
1295
1296         LASSERT(count <= OST_NUM_THREADS);
1297
1298         OBD_ALLOC_GFP(pool, OST_NUM_THREADS * sizeof pool[0], GFP_KERNEL);
1299         if (pool == NULL)
1300                 RETURN(-ENOMEM);
1301
1302         filter->fo_iobuf_pool = pool;
1303         filter->fo_iobuf_count = count;
1304         for (i = 0; i < count; ++ i) {
1305                 /*
1306                  * allocate kiobuf to be used by i-th OST thread.
1307                  */
1308                 result = filter_alloc_iobuf(filter, OBD_BRW_WRITE,
1309                                             PTLRPC_MAX_BRW_PAGES,
1310                                             &pool[i]);
1311                 if (result != 0) {
1312                         filter_iobuf_pool_done(filter);
1313                         break;
1314                 }
1315         }
1316         RETURN(result);
1317 }
1318
1319 /*
1320  * return iobuf preallocated by filter_iobuf_pool_init() for @thread.
1321  */
1322 void *filter_iobuf_get(struct ptlrpc_thread *thread, struct filter_obd *filter)
1323 {
1324         void *kio;
1325
1326         LASSERT(thread->t_id < filter->fo_iobuf_count);
1327         kio = filter->fo_iobuf_pool[thread->t_id];
1328         LASSERT(kio != NULL);
1329         return kio;
1330 }
1331
1332 /* mount the file system (secretly).  lustre_cfg parameters are:
1333  * 1 = device
1334  * 2 = fstype
1335  * 3 = flags: failover=f, failout=n
1336  * 4 = mount options
1337  */
1338 int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1339                         void *option)
1340 {
1341         struct lustre_cfg* lcfg = buf;
1342         struct filter_obd *filter = &obd->u.filter;
1343         struct vfsmount *mnt;
1344         struct lustre_mount_info *lmi;
1345         char *str;
1346         char ns_name[48];
1347         int rc;
1348         ENTRY;
1349
1350         if (lcfg->lcfg_bufcount < 3 ||
1351             LUSTRE_CFG_BUFLEN(lcfg, 1) < 1 ||
1352             LUSTRE_CFG_BUFLEN(lcfg, 2) < 1)
1353                 RETURN(-EINVAL);
1354
1355         lmi = server_get_mount(obd->obd_name);
1356         if (lmi) {
1357                 /* We already mounted in lustre_fill_super.
1358                    lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
1359                 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
1360                 mnt = lmi->lmi_mnt;
1361                 obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
1362         } else {
1363                 /* old path - used by lctl */
1364                 CERROR("Using old MDS mount method\n");
1365                 mnt = do_kern_mount(lustre_cfg_string(lcfg, 2),
1366                                     MS_NOATIME|MS_NODIRATIME,
1367                                     lustre_cfg_string(lcfg, 1), option);    
1368                 if (IS_ERR(mnt)) {
1369                         rc = PTR_ERR(mnt);
1370                         LCONSOLE_ERROR("Can't mount disk %s (%d)\n",
1371                                        lustre_cfg_string(lcfg, 1), rc);
1372                         RETURN(rc);
1373                 }
1374
1375                 obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
1376         }
1377         if (IS_ERR(obd->obd_fsops))
1378                 GOTO(err_mntput, rc = PTR_ERR(obd->obd_fsops));
1379
1380         rc = filter_iobuf_pool_init(filter, OST_NUM_THREADS);
1381         if (rc != 0)
1382                 GOTO(err_ops, rc);
1383
1384         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1385
1386         /* failover is the default */
1387         obd->obd_replayable = 1;
1388         obd_sync_filter = 1;
1389
1390         if (lcfg->lcfg_bufcount > 3 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
1391                 str = lustre_cfg_string(lcfg, 3);
1392                 if (strchr(str, 'n')) {
1393                         CWARN("%s: recovery disabled\n", obd->obd_name);
1394                         obd->obd_replayable = 0;
1395                         obd_sync_filter = 0;
1396                 }
1397         }
1398
1399         filter->fo_vfsmnt = mnt;
1400         filter->fo_sb = mnt->mnt_sb;
1401         filter->fo_fstype = mnt->mnt_sb->s_type->name;
1402         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1403
1404         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1405         obd->obd_lvfs_ctxt.pwdmnt = mnt;
1406         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
1407         obd->obd_lvfs_ctxt.fs = get_ds();
1408         obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
1409
1410         rc = filter_prep(obd);
1411         if (rc)
1412                 GOTO(err_ops, rc);
1413
1414         filter->fo_destroy_in_progress = 0;
1415         sema_init(&filter->fo_create_lock, 1);
1416
1417         spin_lock_init(&filter->fo_translock);
1418         spin_lock_init(&filter->fo_objidlock);
1419         spin_lock_init(&filter->fo_stats_lock);
1420         INIT_LIST_HEAD(&filter->fo_export_list);
1421         sema_init(&filter->fo_alloc_lock, 1);
1422         spin_lock_init(&filter->fo_r_pages.oh_lock);
1423         spin_lock_init(&filter->fo_w_pages.oh_lock);
1424         spin_lock_init(&filter->fo_read_rpc_hist.oh_lock);
1425         spin_lock_init(&filter->fo_write_rpc_hist.oh_lock);
1426         spin_lock_init(&filter->fo_r_io_time.oh_lock);
1427         spin_lock_init(&filter->fo_w_io_time.oh_lock);
1428         spin_lock_init(&filter->fo_r_discont_pages.oh_lock);
1429         spin_lock_init(&filter->fo_w_discont_pages.oh_lock);
1430         spin_lock_init(&filter->fo_r_discont_blocks.oh_lock);
1431         spin_lock_init(&filter->fo_w_discont_blocks.oh_lock);
1432         spin_lock_init(&filter->fo_r_disk_iosize.oh_lock);
1433         spin_lock_init(&filter->fo_w_disk_iosize.oh_lock);
1434         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
1435
1436         sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
1437         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1438         if (obd->obd_namespace == NULL)
1439                 GOTO(err_post, rc = -ENOMEM);
1440         obd->obd_namespace->ns_lvbp = obd;
1441         obd->obd_namespace->ns_lvbo = &filter_lvbo;
1442         ldlm_register_intent(obd->obd_namespace, filter_intent_policy);
1443
1444         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1445                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1446
1447         rc = llog_cat_initialize(obd, 1);
1448         if (rc) {
1449                 CERROR("failed to setup llogging subsystems\n");
1450                 GOTO(err_post, rc);
1451         }
1452
1453         rc = filter_quota_setup(filter);
1454         if (rc) {
1455                 GOTO(err_post, rc);
1456         }
1457
1458         if (obd->obd_recovering) {
1459                 LCONSOLE_WARN("OST %s now serving %s, but will be in recovery "
1460                               "until %d %s reconnect, or if no clients "
1461                               "reconnect for %d:%.02d; during that time new "
1462                               "clients will not be allowed to connect. "
1463                               "Recovery progress can be monitored by watching "
1464                               "/proc/fs/lustre/obdfilter/%s/recovery_status.\n",
1465                               obd->obd_name,
1466                               lustre_cfg_string(lcfg, 1),
1467                               obd->obd_recoverable_clients,
1468                               (obd->obd_recoverable_clients == 1)
1469                               ? "client" : "clients",
1470                               (int)(OBD_RECOVERY_TIMEOUT / HZ) / 60,
1471                               (int)(OBD_RECOVERY_TIMEOUT / HZ) % 60,
1472                               obd->obd_name);
1473         } else {
1474                 LCONSOLE_INFO("OST %s now serving %s with recovery %s.\n",
1475                               obd->obd_name,
1476                               lustre_cfg_string(lcfg, 1),
1477                               obd->obd_replayable ? "enabled" : "disabled");
1478         }
1479
1480         RETURN(0);
1481
1482 err_post:
1483         filter_post(obd);
1484 err_ops:
1485         fsfilt_put_ops(obd->obd_fsops);
1486         filter_iobuf_pool_done(filter);
1487 err_mntput:
1488         if (lmi) {
1489                 server_put_mount(obd->obd_name, mnt);
1490         } else {
1491                 /* old method */
1492                 unlock_kernel();
1493                 mntput(mnt);
1494                 lock_kernel();
1495         }
1496         filter->fo_sb = 0;
1497         return rc;
1498 }
1499
1500 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1501 {
1502         struct lprocfs_static_vars lvars;
1503         struct lustre_cfg* lcfg = buf;
1504         unsigned long page;
1505         int rc;
1506
1507         if (!LUSTRE_CFG_BUFLEN(lcfg, 1) || !LUSTRE_CFG_BUFLEN(lcfg, 2))
1508                 RETURN(-EINVAL);
1509
1510         /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */
1511         page = get_zeroed_page(GFP_KERNEL);
1512         if (!page)
1513                 RETURN(-ENOMEM);
1514
1515         memcpy((void *)page, lustre_cfg_buf(lcfg, 4),
1516                LUSTRE_CFG_BUFLEN(lcfg, 4));
1517         rc = filter_common_setup(obd, len, buf, (void *)page);
1518         free_page(page);
1519
1520         lprocfs_init_vars(filter, &lvars);
1521         if (rc == 0 && lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
1522             lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) {
1523                 /* Init obdfilter private stats here */
1524                 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1525                                      LPROCFS_CNTR_AVGMINMAX,
1526                                      "read_bytes", "bytes");
1527                 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1528                                      LPROCFS_CNTR_AVGMINMAX,
1529                                      "write_bytes", "bytes");
1530
1531                 lproc_filter_attach_seqstat(obd);
1532         }
1533
1534         ping_evictor_start();
1535
1536         return rc;
1537 }
1538
1539 static struct llog_operations filter_mds_ost_repl_logops /* initialized below*/;
1540 static struct llog_operations filter_size_orig_logops = {
1541         lop_setup: llog_obd_origin_setup,
1542         lop_cleanup: llog_obd_origin_cleanup,
1543         lop_add: llog_obd_origin_add
1544 };
1545
1546 static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
1547                             int count, struct llog_catid *catid)
1548 {
1549         struct llog_ctxt *ctxt;
1550         int rc;
1551         ENTRY;
1552
1553         filter_mds_ost_repl_logops = llog_client_ops;
1554         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
1555         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
1556         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
1557
1558         rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
1559                         &filter_mds_ost_repl_logops);
1560         if (rc)
1561                 RETURN(rc);
1562
1563         /* FIXME - assign unlink_cb for filter's recovery */
1564         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
1565         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
1566
1567         rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
1568                         &filter_size_orig_logops);
1569         RETURN(rc);
1570 }
1571
1572 static int filter_llog_finish(struct obd_device *obd, int count)
1573 {
1574         struct llog_ctxt *ctxt;
1575         int rc = 0, rc2 = 0;
1576         ENTRY;
1577
1578         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
1579         if (ctxt)
1580                 rc = llog_cleanup(ctxt);
1581
1582         ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT);
1583         if (ctxt)
1584                 rc2 = llog_cleanup(ctxt);
1585         if (!rc)
1586                 rc = rc2;
1587
1588         RETURN(rc);
1589 }
1590
1591 static int filter_precleanup(struct obd_device *obd, int stage)
1592 {
1593         int rc = 0;
1594         ENTRY;
1595
1596         switch(stage) {
1597         case OBD_CLEANUP_EXPORTS:
1598                 target_cleanup_recovery(obd);
1599                 break;
1600         case OBD_CLEANUP_SELF_EXP:
1601                 rc = filter_llog_finish(obd, 0);
1602         }
1603         RETURN(rc);
1604 }
1605
1606 static int filter_cleanup(struct obd_device *obd)
1607 {
1608         struct filter_obd *filter = &obd->u.filter;
1609         lvfs_sbdev_type save_dev;
1610         int must_relock = 0, must_put = 0;
1611         ENTRY;
1612
1613         if (obd->obd_fail)
1614                 CERROR("%s: shutting down for failover; client state will"
1615                        " be preserved.\n", obd->obd_name);
1616
1617         if (!list_empty(&obd->obd_exports)) {
1618                 CERROR("%s: still has clients!\n", obd->obd_name);
1619                 class_disconnect_exports(obd);
1620                 if (!list_empty(&obd->obd_exports)) {
1621                         CERROR("still has exports after forced cleanup?\n");
1622                         RETURN(-EBUSY);
1623                 }
1624         }
1625
1626         ping_evictor_stop();
1627
1628         filter_quota_cleanup(filter);
1629
1630         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
1631
1632         if (filter->fo_sb == NULL)
1633                 RETURN(0);
1634         save_dev = lvfs_sbdev(filter->fo_sb);
1635
1636         lprocfs_free_obd_stats(obd);
1637         lprocfs_obd_cleanup(obd);
1638
1639         filter_post(obd);
1640
1641         shrink_dcache_parent(filter->fo_sb->s_root);
1642
1643         LL_DQUOT_OFF(filter->fo_sb);
1644
1645         must_put = server_put_mount(obd->obd_name, filter->fo_vfsmnt);
1646         /* must_put is for old method (l_p_m returns non-0 on err) */
1647
1648         /* We can only unlock kernel if we are in the context of sys_ioctl,
1649            otherwise we never called lock_kernel */
1650         if (kernel_locked()) {
1651                 unlock_kernel();
1652                 must_relock++;
1653         }
1654         
1655         if (must_put) 
1656                 /* In case we didn't mount with lustre_get_mount -- old method*/
1657                 mntput(filter->fo_vfsmnt);
1658         
1659         filter->fo_sb = NULL;
1660
1661         lvfs_clear_rdonly(save_dev);
1662
1663         if (must_relock)
1664                 lock_kernel();
1665
1666         fsfilt_put_ops(obd->obd_fsops);
1667
1668         filter_iobuf_pool_done(filter);
1669
1670         LCONSOLE_INFO("OST %s has stopped.\n", obd->obd_name);
1671
1672         RETURN(0);
1673 }
1674
1675 static int filter_connect_internal(struct obd_export *exp,
1676                                    struct obd_connect_data *data)
1677 {
1678         if (data != NULL) {
1679                 CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64
1680                        " ocd_version: %x ocd_grant: %d\n",
1681                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1682                        data->ocd_connect_flags, data->ocd_version,
1683                        data->ocd_grant);
1684
1685                 data->ocd_connect_flags &= OST_CONNECT_SUPPORTED;
1686                 exp->exp_connect_flags = data->ocd_connect_flags;
1687                 data->ocd_version = LUSTRE_VERSION_CODE;
1688
1689                 if (exp->exp_connect_flags & OBD_CONNECT_GRANT) {
1690                         obd_size left, want;
1691
1692                         spin_lock(&exp->exp_obd->obd_osfs_lock);
1693                         left = filter_grant_space_left(exp);
1694                         want = data->ocd_grant;
1695                         data->ocd_grant = filter_grant(exp, 0, want, left);
1696                         spin_unlock(&exp->exp_obd->obd_osfs_lock);
1697
1698                         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %d want: "
1699                                "%lld left: %lld\n", exp->exp_obd->obd_name,
1700                                exp->exp_client_uuid.uuid, exp,
1701                                data->ocd_grant, want, left);
1702                 }
1703         }
1704
1705         RETURN(0);
1706 }
1707
1708 static int filter_reconnect(struct obd_export *exp, struct obd_device *obd,
1709                             struct obd_uuid *cluuid,
1710                             struct obd_connect_data *data)
1711 {
1712         int rc;
1713         ENTRY;
1714
1715         if (exp == NULL || obd == NULL || cluuid == NULL)
1716                 RETURN(-EINVAL);
1717
1718         rc = filter_connect_internal(exp, data);
1719
1720         RETURN(rc);
1721 }
1722
1723 /* nearly identical to mds_connect */
1724 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1725                           struct obd_uuid *cluuid,struct obd_connect_data *data)
1726 {
1727         struct obd_export *exp;
1728         struct filter_export_data *fed;
1729         struct filter_client_data *fcd = NULL;
1730         struct filter_obd *filter = &obd->u.filter;
1731         int rc;
1732         ENTRY;
1733
1734         if (conn == NULL || obd == NULL || cluuid == NULL)
1735                 RETURN(-EINVAL);
1736
1737         rc = class_connect(conn, obd, cluuid);
1738         if (rc)
1739                 RETURN(rc);
1740         exp = class_conn2export(conn);
1741         LASSERT(exp != NULL);
1742
1743         fed = &exp->exp_filter_data;
1744         spin_lock_init(&fed->fed_lock);
1745
1746         if (!obd->obd_replayable)
1747                 GOTO(cleanup, rc = 0);
1748
1749         OBD_ALLOC(fcd, sizeof(*fcd));
1750         if (!fcd) {
1751                 CERROR("filter: out of memory for client data\n");
1752                 GOTO(cleanup, rc = -ENOMEM);
1753         }
1754
1755         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1756         fed->fed_fcd = fcd;
1757
1758         rc = filter_client_add(obd, filter, fed, -1);
1759         if (!rc)
1760                 filter_connect_internal(exp, data);
1761
1762         GOTO(cleanup, rc);
1763
1764 cleanup:
1765         if (rc) {
1766                 if (fcd) {
1767                         OBD_FREE(fcd, sizeof(*fcd));
1768                         fed->fed_fcd = NULL;
1769                 }
1770                 class_disconnect(exp);
1771         } else {
1772                 class_export_put(exp);
1773         }
1774
1775         RETURN(rc);
1776 }
1777
1778 /* Do extra sanity checks for grant accounting.  We do this at connect,
1779  * disconnect, and statfs RPC time, so it shouldn't be too bad.  We can
1780  * always get rid of it or turn it off when we know accounting is good. */
1781 static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
1782 {
1783         struct filter_export_data *fed;
1784         struct obd_export *exp;
1785         obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
1786         obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
1787         obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
1788
1789         if (list_empty(&obd->obd_exports))
1790                 return;
1791
1792         spin_lock(&obd->obd_osfs_lock);
1793         spin_lock(&obd->obd_dev_lock);
1794         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1795                 int error = 0;
1796                 fed = &exp->exp_filter_data;
1797                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
1798                     fed->fed_dirty < 0)
1799                         error = 1;
1800                 if (maxsize > 0) { /* we may not have done a statfs yet */
1801                         LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
1802                                  "%s: cli %s/%p %ld+%ld > "LPU64"\n", func,
1803                                  exp->exp_client_uuid.uuid, exp,
1804                                  fed->fed_grant, fed->fed_pending, maxsize);
1805                         LASSERTF(fed->fed_dirty <= maxsize,
1806                                  "%s: cli %s/%p %ld > "LPU64"\n", func,
1807                                  exp->exp_client_uuid.uuid, exp,
1808                                  fed->fed_dirty, maxsize);
1809                 }
1810                 if (error)
1811                         CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
1812                                obd->obd_name, exp->exp_client_uuid.uuid, exp,
1813                                fed->fed_dirty, fed->fed_pending,fed->fed_grant);
1814                 else
1815                         CDEBUG(D_CACHE, "%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
1816                                obd->obd_name, exp->exp_client_uuid.uuid, exp,
1817                                fed->fed_dirty, fed->fed_pending,fed->fed_grant);
1818                 tot_granted += fed->fed_grant + fed->fed_pending;
1819                 tot_pending += fed->fed_pending;
1820                 tot_dirty += fed->fed_dirty;
1821         }
1822         fo_tot_granted = obd->u.filter.fo_tot_granted;
1823         fo_tot_pending = obd->u.filter.fo_tot_pending;
1824         fo_tot_dirty = obd->u.filter.fo_tot_dirty;
1825         spin_unlock(&obd->obd_dev_lock);
1826         spin_unlock(&obd->obd_osfs_lock);
1827
1828         /* Do these assertions outside the spinlocks so we don't kill system */
1829         if (tot_granted != fo_tot_granted)
1830                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
1831                        func, tot_granted, fo_tot_granted);
1832         if (tot_pending != fo_tot_pending)
1833                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
1834                        func, tot_pending, fo_tot_pending);
1835         if (tot_dirty != fo_tot_dirty)
1836                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
1837                        func, tot_dirty, fo_tot_dirty);
1838         if (tot_pending > tot_granted)
1839                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
1840                        func, tot_pending, tot_granted);
1841         if (tot_granted > maxsize)
1842                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
1843                        func, tot_granted, maxsize);
1844         if (tot_dirty > maxsize)
1845                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
1846                        func, tot_dirty, maxsize);
1847 }
1848
1849 /* Remove this client from the grant accounting totals.  We also remove
1850  * the export from the obd device under the osfs and dev locks to ensure
1851  * that the filter_grant_sanity_check() calculations are always valid.
1852  * The client should do something similar when it invalidates its import. */
1853 static void filter_grant_discard(struct obd_export *exp)
1854 {
1855         struct obd_device *obd = exp->exp_obd;
1856         struct filter_obd *filter = &obd->u.filter;
1857         struct filter_export_data *fed = &exp->exp_filter_data;
1858
1859         spin_lock(&obd->obd_osfs_lock);
1860         spin_lock(&obd->obd_dev_lock);
1861         list_del_init(&exp->exp_obd_chain);
1862         spin_unlock(&obd->obd_dev_lock);
1863
1864         LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
1865                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
1866                  obd->obd_name, filter->fo_tot_granted,
1867                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
1868         filter->fo_tot_granted -= fed->fed_grant;
1869         LASSERTF(filter->fo_tot_pending >= fed->fed_pending,
1870                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
1871                  obd->obd_name, filter->fo_tot_pending,
1872                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
1873         LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
1874                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
1875                  obd->obd_name, filter->fo_tot_dirty,
1876                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
1877         filter->fo_tot_dirty -= fed->fed_dirty;
1878         fed->fed_dirty = 0;
1879         fed->fed_grant = 0;
1880
1881         spin_unlock(&obd->obd_osfs_lock);
1882 }
1883
1884 static int filter_destroy_export(struct obd_export *exp)
1885 {
1886         ENTRY;
1887
1888         if (exp->exp_filter_data.fed_pending)
1889                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
1890                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1891                        exp, exp->exp_filter_data.fed_pending);
1892
1893         target_destroy_export(exp);
1894
1895         if (exp->exp_obd->obd_replayable)
1896                 filter_client_free(exp);
1897
1898         filter_grant_discard(exp);
1899
1900         if (!(exp->exp_flags & OBD_OPT_FORCE))
1901                 filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
1902
1903         RETURN(0);
1904 }
1905
1906 /* also incredibly similar to mds_disconnect */
1907 static int filter_disconnect(struct obd_export *exp)
1908 {
1909         struct obd_device *obd = exp->exp_obd;
1910         struct llog_ctxt *ctxt;
1911         int rc, err;
1912         ENTRY;
1913
1914         LASSERT(exp);
1915         class_export_get(exp);
1916
1917         if (!(exp->exp_flags & OBD_OPT_FORCE))
1918                 filter_grant_sanity_check(obd, __FUNCTION__);
1919         filter_grant_discard(exp);
1920
1921         /* Disconnect early so that clients can't keep using export */
1922         rc = class_disconnect(exp);
1923         ldlm_cancel_locks_for_export(exp);
1924
1925         fsfilt_sync(obd, obd->u.filter.fo_sb);
1926
1927         /* flush any remaining cancel messages out to the target */
1928         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
1929         err = llog_sync(ctxt, exp);
1930         if (err)
1931                 CERROR("error flushing logs to MDS: rc %d\n", err);
1932
1933         class_export_put(exp);
1934         RETURN(rc);
1935 }
1936
1937 struct dentry *__filter_oa2dentry(struct obd_device *obd,
1938                                   struct obdo *oa, const char *what, int quiet)
1939 {
1940         struct dentry *dchild = NULL;
1941         obd_gr group = 0;
1942
1943         if (oa->o_valid & OBD_MD_FLGROUP)
1944                 group = oa->o_gr;
1945
1946         dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
1947
1948         if (IS_ERR(dchild)) {
1949                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1950                 RETURN(dchild);
1951         }
1952
1953         if (dchild->d_inode == NULL) {
1954                 if (!quiet)
1955                         CERROR("%s: %s on non-existent object: "LPU64"\n",
1956                                obd->obd_name, what, oa->o_id);
1957                 f_dput(dchild);
1958                 RETURN(ERR_PTR(-ENOENT));
1959         }
1960
1961         return dchild;
1962 }
1963
1964 static int filter_getattr(struct obd_export *exp, struct obdo *oa,
1965                           struct lov_stripe_md *md)
1966 {
1967         struct dentry *dentry = NULL;
1968         struct obd_device *obd;
1969         int rc = 0;
1970         ENTRY;
1971
1972         obd = class_exp2obd(exp);
1973         if (obd == NULL) {
1974                 CDEBUG(D_IOCTL, "invalid client export %p\n", exp);
1975                 RETURN(-EINVAL);
1976         }
1977
1978         dentry = filter_oa2dentry(obd, oa);
1979         if (IS_ERR(dentry))
1980                 RETURN(PTR_ERR(dentry));
1981
1982         /* Limit the valid bits in the return data to what we actually use */
1983         oa->o_valid = OBD_MD_FLID;
1984         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1985
1986         f_dput(dentry);
1987         RETURN(rc);
1988 }
1989
1990 /* this is called from filter_truncate() until we have filter_punch() */
1991 int filter_setattr(struct obd_export *exp, struct obdo *oa,
1992                    struct lov_stripe_md *md, struct obd_trans_info *oti)
1993 {
1994         struct obd_device *obd;
1995         struct lvfs_run_ctxt saved;
1996         struct filter_obd *filter;
1997         struct dentry *dentry;
1998         struct iattr iattr;
1999         uid_t orig_uid = 0;
2000         gid_t orig_gid = 0;
2001         struct ldlm_res_id res_id = { .name = { oa->o_id } };
2002         struct ldlm_resource *res;
2003         void *handle;
2004         struct llog_cookie *fcc = NULL;
2005         int rc, rc2;
2006         ENTRY;
2007
2008         dentry = __filter_oa2dentry(exp->exp_obd, oa, __FUNCTION__, 1);
2009         if (IS_ERR(dentry))
2010                 RETURN(PTR_ERR(dentry));
2011
2012         obd = exp->exp_obd;
2013         filter = &obd->u.filter;
2014
2015         iattr_from_obdo(&iattr, oa, oa->o_valid);
2016
2017         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2018         lock_kernel();
2019
2020         if (oa->o_valid & OBD_MD_FLCOOKIE) {
2021                 OBD_ALLOC(fcc, sizeof(*fcc));
2022                 if (fcc != NULL)
2023                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
2024         }
2025
2026         if (iattr.ia_valid & ATTR_SIZE)
2027                 down(&dentry->d_inode->i_sem);
2028
2029         if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
2030                 orig_uid = dentry->d_inode->i_uid;
2031                 orig_gid = dentry->d_inode->i_gid;
2032                 handle = fsfilt_start_log(exp->exp_obd, dentry->d_inode,
2033                                           FSFILT_OP_SETATTR, oti, 1);
2034         } else {
2035                 handle = fsfilt_start(exp->exp_obd, dentry->d_inode,
2036                                       FSFILT_OP_SETATTR, oti);
2037         }
2038
2039         if (IS_ERR(handle))
2040                 GOTO(out_unlock, rc = PTR_ERR(handle));
2041
2042         if (iattr.ia_valid & ATTR_ATTR_FLAG) {
2043                 rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
2044                                       EXT3_IOC_SETFLAGS,
2045                                       (long)&iattr.ia_attr_flags);
2046         } else {
2047                 rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
2048                 if (fcc != NULL)
2049                         /* set cancel cookie callback function */
2050                         fsfilt_add_journal_cb(obd, 0, oti ?
2051                                               oti->oti_handle : handle,
2052                                               filter_cancel_cookies_cb,
2053                                               fcc);
2054         }
2055
2056         rc = filter_finish_transno(exp, oti, rc);
2057         rc2 = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
2058         if (rc2) {
2059                 CERROR("error on commit, err = %d\n", rc2);
2060                 if (!rc)
2061                         rc = rc2;
2062         }
2063
2064         res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
2065                                 res_id, LDLM_EXTENT, 0);
2066         if (res != NULL) {
2067                 if (res->lr_namespace->ns_lvbo &&
2068                     res->lr_namespace->ns_lvbo->lvbo_update)
2069                         rc = res->lr_namespace->ns_lvbo->lvbo_update(res, NULL,
2070                                                                      0, 0);
2071                 ldlm_resource_putref(res);
2072         } else if (iattr.ia_valid & ATTR_SIZE) {
2073                 CERROR("!!! resource_get failed for object "LPU64" -- "
2074                        "filter_setattr with no lock?\n", oa->o_id);
2075         }
2076
2077         oa->o_valid = OBD_MD_FLID;
2078         /* Quota release need uid/gid info */
2079         obdo_from_inode(oa, dentry->d_inode,
2080                         FILTER_VALID_FLAGS | OBD_MD_FLUID | OBD_MD_FLGID);
2081
2082 out_unlock:
2083         if (iattr.ia_valid & ATTR_SIZE)
2084                 up(&dentry->d_inode->i_sem);
2085         unlock_kernel();
2086         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2087
2088         f_dput(dentry);
2089
2090         /* trigger quota release */
2091         if (rc == 0 && iattr.ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) {
2092                 rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
2093                                          oa->o_uid, oa->o_gid, 1);
2094                 if (rc2)
2095                         CERROR("error filter adjust qunit! (rc:%d)\n", rc2);
2096                 /* after owner changed, release quota for the original owner */
2097                 rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
2098                                          orig_uid, orig_gid, 1);
2099                 if (rc2)
2100                         CERROR("error filter adjust qunit! (rc:%d)\n", rc2);
2101         }
2102         RETURN(rc);
2103 }
2104
2105 /* XXX identical to osc_unpackmd */
2106 static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2107                            struct lov_mds_md *lmm, int lmm_bytes)
2108 {
2109         int lsm_size;
2110         ENTRY;
2111
2112         if (lmm != NULL) {
2113                 if (lmm_bytes < sizeof (*lmm)) {
2114                         CERROR("lov_mds_md too small: %d, need %d\n",
2115                                lmm_bytes, (int)sizeof(*lmm));
2116                         RETURN(-EINVAL);
2117                 }
2118                 /* XXX LOV_MAGIC etc check? */
2119
2120                 if (lmm->lmm_object_id == cpu_to_le64(0)) {
2121                         CERROR("lov_mds_md: zero lmm_object_id\n");
2122                         RETURN(-EINVAL);
2123                 }
2124         }
2125
2126         lsm_size = lov_stripe_md_size(1);
2127         if (lsmp == NULL)
2128                 RETURN(lsm_size);
2129
2130         if (*lsmp != NULL && lmm == NULL) {
2131                 OBD_FREE(*lsmp, lsm_size);
2132                 *lsmp = NULL;
2133                 RETURN(0);
2134         }
2135
2136         if (*lsmp == NULL) {
2137                 OBD_ALLOC(*lsmp, lsm_size);
2138                 if (*lsmp == NULL)
2139                         RETURN(-ENOMEM);
2140
2141                 loi_init((*lsmp)->lsm_oinfo);
2142         }
2143
2144         if (lmm != NULL) {
2145                 /* XXX zero *lsmp? */
2146                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
2147                 LASSERT((*lsmp)->lsm_object_id);
2148         }
2149
2150         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
2151
2152         RETURN(lsm_size);
2153 }
2154
2155 static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
2156                                       struct filter_obd *filter)
2157 {
2158         struct obdo doa; /* XXX obdo on stack */
2159         __u64 last, id;
2160         ENTRY;
2161         LASSERT(oa);
2162
2163         memset(&doa, 0, sizeof(doa));
2164         if (oa->o_valid & OBD_MD_FLGROUP) {
2165                 doa.o_valid |= OBD_MD_FLGROUP;
2166                 doa.o_gr = oa->o_gr;
2167         } else {
2168                 doa.o_gr = 0;
2169         }
2170         doa.o_mode = S_IFREG;
2171
2172         filter->fo_destroy_in_progress = 1;
2173         down(&filter->fo_create_lock);
2174         if (!filter->fo_destroy_in_progress) {
2175                 CERROR("%s: destroy_in_progress already cleared\n",
2176                         exp->exp_obd->obd_name);
2177                 up(&filter->fo_create_lock);
2178                 EXIT;
2179                 return;
2180         }
2181
2182         last = filter_last_id(filter, &doa);
2183         CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
2184                exp->exp_obd->obd_name, oa->o_id + 1, last);
2185         for (id = oa->o_id + 1; id <= last; id++) {
2186                 doa.o_id = id;
2187                 filter_destroy(exp, &doa, NULL, NULL);
2188         }
2189
2190         CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
2191                exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
2192
2193         spin_lock(&filter->fo_objidlock);
2194         filter->fo_last_objids[doa.o_gr] = oa->o_id;
2195         spin_unlock(&filter->fo_objidlock);
2196
2197         filter->fo_destroy_in_progress = 0;
2198         up(&filter->fo_create_lock);
2199
2200         EXIT;
2201 }
2202
2203 /* returns a negative error or a nonnegative number of files to create */
2204 static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
2205                                    obd_gr group)
2206 {
2207         struct obd_device *obd = exp->exp_obd;
2208         struct filter_obd *filter = &obd->u.filter;
2209         int diff, rc;
2210         ENTRY;
2211
2212         diff = oa->o_id - filter_last_id(filter, oa);
2213         CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
2214                filter_last_id(filter, oa), diff);
2215
2216         /* delete orphans request */
2217         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2218             (oa->o_flags & OBD_FL_DELORPHAN)) {
2219                 if (diff >= 0)
2220                         RETURN(diff);
2221                 if (-diff > OST_MAX_PRECREATE) {
2222                         CERROR("%s: ignoring bogus orphan destroy request: "
2223                                "obdid "LPU64" last_id "LPU64"\n", obd->obd_name,
2224                                oa->o_id, filter_last_id(filter, oa));
2225                         RETURN(-EINVAL);
2226                 }
2227                 filter_destroy_precreated(exp, oa, filter);
2228                 rc = filter_update_last_objid(obd, group, 0);
2229                 if (rc)
2230                         CERROR("%s: unable to write lastobjid, but orphans"
2231                                "were deleted\n", obd->obd_name);
2232                 RETURN(0);
2233         } else {
2234                 /* only precreate if group == 0 and o_id is specfied */
2235                 if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
2236                     (group != 0 || oa->o_id == 0))
2237                         RETURN(1);
2238
2239                 LASSERTF(diff >= 0,"%s: "LPU64" - "LPU64" = %d\n",obd->obd_name,
2240                          oa->o_id, filter_last_id(filter, oa), diff);
2241                 RETURN(diff);
2242         }
2243 }
2244
2245 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2246                          unsigned long max_age)
2247 {
2248         struct filter_obd *filter = &obd->u.filter;
2249         int blockbits = filter->fo_sb->s_blocksize_bits;
2250         int rc;
2251         ENTRY;
2252
2253         /* at least try to account for cached pages.  its still racey and
2254          * might be under-reporting if clients haven't announced their
2255          * caches with brw recently */
2256         spin_lock(&obd->obd_osfs_lock);
2257         rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
2258         memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
2259         spin_unlock(&obd->obd_osfs_lock);
2260
2261         CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
2262                " pending "LPU64" free "LPU64" avail "LPU64"\n",
2263                filter->fo_tot_dirty, filter->fo_tot_granted,
2264                filter->fo_tot_pending,
2265                osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
2266
2267         filter_grant_sanity_check(obd, __FUNCTION__);
2268
2269         osfs->os_bavail -= min(osfs->os_bavail, GRANT_FOR_LLOG(obd) +
2270                                ((filter->fo_tot_dirty + filter->fo_tot_pending +
2271                                  osfs->os_bsize - 1) >> blockbits));
2272         RETURN(rc);
2273 }
2274
2275 /* We rely on the fact that only one thread will be creating files in a given
2276  * group at a time, which is why we don't need an atomic filter_get_new_id.
2277  * Even if we had that atomic function, the following race would exist:
2278  *
2279  * thread 1: gets id x from filter_next_id
2280  * thread 2: gets id (x + 1) from filter_next_id
2281  * thread 2: creates object (x + 1)
2282  * thread 1: tries to create object x, gets -ENOSPC
2283  */
2284 static int filter_precreate(struct obd_device *obd, struct obdo *oa,
2285                             obd_gr group, int *num)
2286 {
2287         struct dentry *dchild = NULL, *dparent = NULL;
2288         struct filter_obd *filter;
2289         struct obd_statfs *osfs;
2290         int err = 0, rc = 0, recreate_obj = 0, i;
2291         unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3;
2292         __u64 next_id;
2293         void *handle = NULL;
2294         ENTRY;
2295
2296         filter = &obd->u.filter;
2297
2298         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2299             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
2300                 recreate_obj = 1;
2301         } else {
2302                 OBD_ALLOC(osfs, sizeof(*osfs));
2303                 if (osfs == NULL)
2304                         RETURN(-ENOMEM);
2305                 rc = filter_statfs(obd, osfs, jiffies - HZ);
2306                 if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
2307                         CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
2308                               osfs->os_bavail<<filter->fo_sb->s_blocksize_bits);
2309                         *num=0;
2310                         rc = -ENOSPC;
2311                 }
2312                 OBD_FREE(osfs, sizeof(*osfs));
2313                 if (rc) {
2314                         RETURN(rc);
2315                 }
2316         }
2317
2318         CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
2319
2320         down(&filter->fo_create_lock);
2321
2322         for (i = 0; i < *num && err == 0; i++) {
2323                 int cleanup_phase = 0;
2324
2325                 if (filter->fo_destroy_in_progress) {
2326                         CWARN("%s: precreate aborted by destroy\n",
2327                               obd->obd_name);
2328                         break;
2329                 }
2330
2331                 if (recreate_obj) {
2332                         __u64 last_id;
2333                         next_id = oa->o_id;
2334                         last_id = filter_last_id(filter, oa);
2335                         if (next_id > last_id) {
2336                                 CERROR("Error: Trying to recreate obj greater"
2337                                        "than last id "LPD64" > "LPD64"\n",
2338                                        next_id, last_id);
2339                                 GOTO(cleanup, rc = -EINVAL);
2340                         }
2341                 } else
2342                         next_id = filter_last_id(filter, oa) + 1;
2343
2344                 CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
2345
2346                 dparent = filter_parent_lock(obd, group, next_id);
2347                 if (IS_ERR(dparent))
2348                         GOTO(cleanup, rc = PTR_ERR(dparent));
2349                 cleanup_phase = 1;
2350
2351                 dchild = filter_fid2dentry(obd, dparent, group, next_id);
2352                 if (IS_ERR(dchild))
2353                         GOTO(cleanup, rc = PTR_ERR(dchild));
2354                 cleanup_phase = 2;
2355
2356                 if (dchild->d_inode != NULL) {
2357                         /* This would only happen if lastobjid was bad on disk*/
2358                         /* Could also happen if recreating missing obj but
2359                          * already exists
2360                          */
2361                         if (recreate_obj) {
2362                                 CERROR("%s: recreating existing object %.*s?\n",
2363                                        obd->obd_name, dchild->d_name.len,
2364                                        dchild->d_name.name);
2365                         } else {
2366                                 CERROR("%s: Serious error: objid %.*s already "
2367                                        "exists; is this filesystem corrupt?\n",
2368                                        obd->obd_name, dchild->d_name.len,
2369                                        dchild->d_name.name);
2370                                 LBUG();
2371                         }
2372                         GOTO(cleanup, rc = -EEXIST);
2373                 }
2374
2375                 handle = fsfilt_start_log(obd, dparent->d_inode,
2376                                           FSFILT_OP_CREATE, NULL, 1);
2377                 if (IS_ERR(handle))
2378                         GOTO(cleanup, rc = PTR_ERR(handle));
2379                 cleanup_phase = 3;
2380
2381                 rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG | 0666, NULL);
2382                 if (rc) {
2383                         CERROR("create failed rc = %d\n", rc);
2384                         GOTO(cleanup, rc);
2385                 }
2386
2387                 if (!recreate_obj) {
2388                         filter_set_last_id(filter, oa, next_id);
2389                         err = filter_update_last_objid(obd, group, 0);
2390                         if (err)
2391                                 CERROR("unable to write lastobjid "
2392                                        "but file created\n");
2393                 }
2394
2395         cleanup:
2396                 switch(cleanup_phase) {
2397                 case 3:
2398                         err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2399                         if (err) {
2400                                 CERROR("error on commit, err = %d\n", err);
2401                                 if (!rc)
2402                                         rc = err;
2403                         }
2404                 case 2:
2405                         f_dput(dchild);
2406                 case 1:
2407                         filter_parent_unlock(dparent);
2408                 case 0:
2409                         break;
2410                 }
2411
2412                 if (rc)
2413                         break;
2414                 if (time_after(jiffies, enough_time)) {
2415                         CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n",
2416                                obd->obd_name, *num, i);
2417                         break;
2418                 }
2419         }
2420         *num = i;
2421
2422         up(&filter->fo_create_lock);
2423
2424         CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
2425                obd->obd_name, group, filter->fo_last_objids[group]);
2426
2427         CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
2428                obd->obd_name, i);
2429         RETURN(rc);
2430 }
2431
2432 static int filter_create(struct obd_export *exp, struct obdo *oa,
2433                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
2434 {
2435         struct obd_device *obd = NULL;
2436         struct lvfs_run_ctxt saved;
2437         struct lov_stripe_md *lsm = NULL;
2438         obd_gr group = 0;
2439         int rc = 0, diff;
2440         ENTRY;
2441
2442         if (oa->o_valid & OBD_MD_FLGROUP)
2443                 group = oa->o_gr;
2444
2445         CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
2446                group, oa->o_id);
2447         if (ea != NULL) {
2448                 lsm = *ea;
2449                 if (lsm == NULL) {
2450                         rc = obd_alloc_memmd(exp, &lsm);
2451                         if (rc < 0)
2452                                 RETURN(rc);
2453                 }
2454         }
2455
2456         obd = exp->exp_obd;
2457         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2458
2459         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2460             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
2461                 if (oa->o_id > filter_last_id(&obd->u.filter, oa)) {
2462                         CERROR("recreate objid "LPU64" > last id "LPU64"\n",
2463                                oa->o_id, filter_last_id(&obd->u.filter, oa));
2464                         rc = -EINVAL;
2465                 } else {
2466                         diff = 1;
2467                         rc = filter_precreate(obd, oa, group, &diff);
2468                 }
2469         } else {
2470                 diff = filter_should_precreate(exp, oa, group);
2471                 if (diff > 0) {
2472                         oa->o_id = filter_last_id(&obd->u.filter, oa);
2473                         rc = filter_precreate(obd, oa, group, &diff);
2474                         oa->o_id = filter_last_id(&obd->u.filter, oa);
2475                         oa->o_valid = OBD_MD_FLID;
2476                 }
2477         }
2478
2479         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2480         if (rc && ea != NULL && *ea != lsm) {
2481                 obd_free_memmd(exp, &lsm);
2482         } else if (rc == 0 && ea != NULL) {
2483                 /* XXX LOV STACKING: the lsm that is passed to us from
2484                  * LOV does not have valid lsm_oinfo data structs, so
2485                  * don't go touching that.  This needs to be fixed in a
2486                  * big way. */
2487                 lsm->lsm_object_id = oa->o_id;
2488                 *ea = lsm;
2489         }
2490
2491         RETURN(rc);
2492 }
2493
2494 int filter_destroy(struct obd_export *exp, struct obdo *oa,
2495                    struct lov_stripe_md *md, struct obd_trans_info *oti)
2496 {
2497         struct obd_device *obd;
2498         struct filter_obd *filter;
2499         struct dentry *dchild = NULL, *dparent = NULL;
2500         struct lvfs_run_ctxt saved;
2501         void *handle = NULL;
2502         struct llog_cookie *fcc = NULL;
2503         int rc, rc2, cleanup_phase = 0, have_prepared = 0;
2504         obd_gr group = 0;
2505         ENTRY;
2506
2507         if (oa->o_valid & OBD_MD_FLGROUP)
2508                 group = oa->o_gr;
2509
2510         obd = exp->exp_obd;
2511         filter = &obd->u.filter;
2512
2513         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2514
2515  acquire_locks:
2516         dparent = filter_parent_lock(obd, group, oa->o_id);
2517         if (IS_ERR(dparent))
2518                 GOTO(cleanup, rc = PTR_ERR(dparent));
2519         cleanup_phase = 1;
2520
2521         dchild = filter_fid2dentry(obd, dparent, group, oa->o_id);
2522         if (IS_ERR(dchild))
2523                 GOTO(cleanup, rc = PTR_ERR(dchild));
2524         cleanup_phase = 2;
2525
2526         if (dchild->d_inode == NULL) {
2527                 CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n",
2528                        oa->o_id);
2529                 /* If object already gone, cancel cookie right now */
2530                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
2531                         fcc = obdo_logcookie(oa);
2532                         llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1),
2533                                     NULL, 1, fcc, 0);
2534                 }
2535                 GOTO(cleanup, rc = -ENOENT);
2536         }
2537
2538         if (!have_prepared) {
2539                 /* If we're really going to destroy the object, get ready
2540                  * by getting the clients to discard their cached data.
2541                  *
2542                  * We have to drop the parent lock, because
2543                  * filter_prepare_destroy will acquire a PW on the object, and
2544                  * we don't want to deadlock with an incoming write to the
2545                  * object, which has the extent PW and then wants to get the
2546                  * parent dentry to do the lookup.
2547                  *
2548                  * We dput the child because it's not worth the extra
2549                  * complication of condition the above code to skip it on the
2550                  * second time through. */
2551                 f_dput(dchild);
2552                 filter_parent_unlock(dparent);
2553
2554                 filter_prepare_destroy(obd, oa->o_id);
2555                 have_prepared = 1;
2556                 goto acquire_locks;
2557         }
2558
2559         handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_UNLINK,oti,1);
2560         if (IS_ERR(handle))
2561                 GOTO(cleanup, rc = PTR_ERR(handle));
2562         cleanup_phase = 3;
2563
2564         /* Our MDC connection is established by the MDS to us */
2565         if (oa->o_valid & OBD_MD_FLCOOKIE) {
2566                 OBD_ALLOC(fcc, sizeof(*fcc));
2567                 if (fcc != NULL)
2568                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
2569         }
2570
2571         /* Quota release need uid/gid of inode */
2572         obdo_from_inode(oa, dchild->d_inode, OBD_MD_FLUID|OBD_MD_FLGID);
2573         rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
2574
2575 cleanup:
2576         switch(cleanup_phase) {
2577         case 3:
2578                 if (fcc != NULL) {
2579                         fsfilt_add_journal_cb(obd, 0,
2580                                               oti ? oti->oti_handle : handle,
2581                                               filter_cancel_cookies_cb, fcc);
2582                 }
2583                 rc = filter_finish_transno(exp, oti, rc);
2584                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2585                 if (rc2) {
2586                         CERROR("error on commit, err = %d\n", rc2);
2587                         if (!rc)
2588                                 rc = rc2;
2589                 }
2590         case 2:
2591                 f_dput(dchild);
2592         case 1:
2593                 filter_parent_unlock(dparent);
2594         case 0:
2595                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2596                 break;
2597         default:
2598                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2599                 LBUG();
2600         }
2601
2602         /* trigger quota release */
2603         if (rc == 0) {
2604                 rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
2605                                          oa->o_uid, oa->o_gid, 1);
2606                 if (rc2)
2607                         CERROR("error filter adjust qunit! (rc:%d)\n", rc2);
2608         }
2609
2610         RETURN(rc);
2611 }
2612
2613 /* NB start and end are used for punch, but not truncate */
2614 static int filter_truncate(struct obd_export *exp, struct obdo *oa,
2615                            struct lov_stripe_md *lsm,
2616                            obd_off start, obd_off end,
2617                            struct obd_trans_info *oti)
2618 {
2619         int error;
2620         ENTRY;
2621
2622         if (end != OBD_OBJECT_EOF) {
2623                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2624                        end);
2625                 RETURN(-EFAULT);
2626         }
2627
2628         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = "LPX64
2629                ", o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2630         oa->o_size = start;
2631         error = filter_setattr(exp, oa, NULL, oti);
2632         RETURN(error);
2633 }
2634
2635 static int filter_sync(struct obd_export *exp, struct obdo *oa,
2636                        struct lov_stripe_md *lsm, obd_off start, obd_off end)
2637 {
2638         struct lvfs_run_ctxt saved;
2639         struct filter_obd *filter;
2640         struct dentry *dentry;
2641         struct llog_ctxt *ctxt;
2642         int rc, rc2;
2643         ENTRY;
2644
2645         filter = &exp->exp_obd->u.filter;
2646
2647         /* an objid of zero is taken to mean "sync whole filesystem" */
2648         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
2649                 rc = fsfilt_sync(exp->exp_obd, filter->fo_sb);
2650                 /* flush any remaining cancel messages out to the target */
2651                 ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT);
2652                 llog_sync(ctxt, exp);
2653                 RETURN(rc);
2654         }
2655
2656         dentry = filter_oa2dentry(exp->exp_obd, oa);
2657         if (IS_ERR(dentry))
2658                 RETURN(PTR_ERR(dentry));
2659
2660         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2661
2662         down(&dentry->d_inode->i_sem);
2663         rc = filemap_fdatawrite(dentry->d_inode->i_mapping);
2664         if (rc == 0) {
2665                 /* just any file to grab fsync method - "file" arg unused */
2666                 struct file *file = filter->fo_rcvd_filp;
2667
2668                 if (file->f_op && file->f_op->fsync)
2669                         rc = file->f_op->fsync(NULL, dentry, 1);
2670
2671                 rc2 = filemap_fdatawait(dentry->d_inode->i_mapping);
2672                 if (!rc)
2673                         rc = rc2;
2674         }
2675         up(&dentry->d_inode->i_sem);
2676
2677         oa->o_valid = OBD_MD_FLID;
2678         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
2679
2680         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2681
2682         f_dput(dentry);
2683         RETURN(rc);
2684 }
2685
2686 static int filter_get_info(struct obd_export *exp, __u32 keylen,
2687                            void *key, __u32 *vallen, void *val)
2688 {
2689         struct obd_device *obd;
2690         ENTRY;
2691
2692         obd = class_exp2obd(exp);
2693         if (obd == NULL) {
2694                 CDEBUG(D_IOCTL, "invalid client export %p\n", exp);
2695                 RETURN(-EINVAL);
2696         }
2697
2698         if (keylen == strlen("blocksize") &&
2699             memcmp(key, "blocksize", keylen) == 0) {
2700                 __u32 *blocksize = val;
2701                 *vallen = sizeof(*blocksize);
2702                 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2703                 RETURN(0);
2704         }
2705
2706         if (keylen == strlen("blocksize_bits") &&
2707             memcmp(key, "blocksize_bits", keylen) == 0) {
2708                 __u32 *blocksize_bits = val;
2709                 *vallen = sizeof(*blocksize_bits);
2710                 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2711                 RETURN(0);
2712         }
2713
2714         if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
2715                 obd_id *last_id = val;
2716                 /* FIXME: object groups */
2717                 *last_id = filter_last_id(&obd->u.filter, 0);
2718                 RETURN(0);
2719         }
2720         CDEBUG(D_IOCTL, "invalid key\n");
2721         RETURN(-EINVAL);
2722 }
2723
2724 static int filter_set_info(struct obd_export *exp, __u32 keylen,
2725                            void *key, __u32 vallen, void *val)
2726 {
2727         struct obd_device *obd;
2728         struct llog_ctxt *ctxt;
2729         int rc = 0;
2730         ENTRY;
2731
2732         obd = exp->exp_obd;
2733         if (obd == NULL) {
2734                 CDEBUG(D_IOCTL, "invalid export %p\n", exp);
2735                 RETURN(-EINVAL);
2736         }
2737
2738         if (keylen < strlen("mds_conn") ||
2739             memcmp(key, "mds_conn", keylen) != 0)
2740                 RETURN(-EINVAL);
2741
2742         CWARN("%s: received MDS connection from %s\n", obd->obd_name,
2743               obd_export_nid2str(exp));
2744         obd->u.filter.fo_mdc_conn.cookie = exp->exp_handle.h_cookie;
2745
2746         /* setup llog imports */
2747         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
2748         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
2749
2750         filter_quota_set_info(exp, obd);
2751
2752         RETURN(rc);
2753 }
2754
2755 int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
2756                      int len, void *karg, void *uarg)
2757 {
2758         struct obd_device *obd = exp->exp_obd;
2759         struct obd_ioctl_data *data = karg;
2760         int rc = 0;
2761
2762         switch (cmd) {
2763         case OBD_IOC_ABORT_RECOVERY: {
2764                 CERROR("aborting recovery for device %s\n", obd->obd_name);
2765                 target_abort_recovery(obd);
2766                 RETURN(0);
2767         }
2768
2769         case OBD_IOC_SYNC: {
2770                 CDEBUG(D_HA, "syncing ost %s\n", obd->obd_name);
2771                 rc = fsfilt_sync(obd, obd->u.filter.fo_sb);
2772                 RETURN(rc);
2773         }
2774
2775         case OBD_IOC_SET_READONLY: {
2776                 void *handle;
2777                 struct super_block *sb = obd->u.filter.fo_sb;
2778                 struct inode *inode = sb->s_root->d_inode;
2779                 BDEVNAME_DECLARE_STORAGE(tmp);
2780                 CERROR("*** setting device %s read-only ***\n",
2781                        ll_bdevname(sb, tmp));
2782
2783                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
2784                 if (!IS_ERR(handle))
2785                         rc = fsfilt_commit(obd, inode, handle, 1);
2786
2787                 CDEBUG(D_HA, "syncing ost %s\n", obd->obd_name);
2788                 rc = fsfilt_sync(obd, obd->u.filter.fo_sb);
2789
2790                 lvfs_set_rdonly(lvfs_sbdev(obd->u.filter.fo_sb));
2791                 RETURN(0);
2792         }
2793
2794         case OBD_IOC_CATLOGLIST: {
2795                 rc = llog_catalog_list(obd, 1, data);
2796                 RETURN(rc);
2797         }
2798
2799         case OBD_IOC_LLOG_CANCEL:
2800         case OBD_IOC_LLOG_REMOVE:
2801         case OBD_IOC_LLOG_INFO:
2802         case OBD_IOC_LLOG_PRINT: {
2803                 /* FIXME to be finished */
2804                 RETURN(-EOPNOTSUPP);
2805 /*
2806                 struct llog_ctxt *ctxt = NULL;
2807
2808                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
2809                 rc = llog_ioctl(ctxt, cmd, data);
2810                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
2811
2812                 RETURN(rc);
2813 */
2814         }
2815
2816
2817         default:
2818                 RETURN(-EINVAL);
2819         }
2820         RETURN(0);
2821 }
2822
2823 static int filter_health_check(struct obd_device *obd)
2824 {
2825         struct filter_obd *filter = &obd->u.filter;
2826         int rc = 0;
2827
2828         /*
2829          * health_check to return 0 on healthy
2830          * and 1 on unhealthy.
2831          */
2832         if (filter->fo_sb->s_flags & MS_RDONLY)
2833                 rc = 1;
2834
2835         LASSERT(filter->fo_health_check_filp != NULL);
2836         rc |= !!lvfs_check_io_health(obd, filter->fo_health_check_filp);
2837
2838         return rc;
2839 }
2840
2841 static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2842                                              void *data)
2843 {
2844         return filter_fid2dentry(data, NULL, gr, id);
2845 }
2846
2847 static struct lvfs_callback_ops filter_lvfs_ops = {
2848         l_fid2dentry:     filter_lvfs_fid2dentry,
2849 };
2850
2851 static struct obd_ops filter_obd_ops = {
2852         .o_owner          = THIS_MODULE,
2853         .o_get_info       = filter_get_info,
2854         .o_set_info       = filter_set_info,
2855         .o_setup          = filter_setup,
2856         .o_precleanup     = filter_precleanup,
2857         .o_cleanup        = filter_cleanup,
2858         .o_connect        = filter_connect,
2859         .o_reconnect      = filter_reconnect,
2860         .o_disconnect     = filter_disconnect,
2861         .o_statfs         = filter_statfs,
2862         .o_getattr        = filter_getattr,
2863         .o_unpackmd       = filter_unpackmd,
2864         .o_create         = filter_create,
2865         .o_setattr        = filter_setattr,
2866         .o_destroy        = filter_destroy,
2867         .o_brw            = filter_brw,
2868         .o_punch          = filter_truncate,
2869         .o_sync           = filter_sync,
2870         .o_preprw         = filter_preprw,
2871         .o_commitrw       = filter_commitrw,
2872         .o_destroy_export = filter_destroy_export,
2873         .o_llog_init      = filter_llog_init,
2874         .o_llog_finish    = filter_llog_finish,
2875         .o_iocontrol      = filter_iocontrol,
2876         .o_quotacheck     = filter_quotacheck,
2877         .o_quotactl       = filter_quotactl,
2878         .o_health_check   = filter_health_check,
2879 };
2880
2881 static struct obd_ops filter_sanobd_ops = {
2882         .o_owner          = THIS_MODULE,
2883         .o_get_info       = filter_get_info,
2884         .o_set_info       = filter_set_info,
2885         .o_setup          = filter_san_setup,
2886         .o_precleanup     = filter_precleanup,
2887         .o_cleanup        = filter_cleanup,
2888         .o_connect        = filter_connect,
2889         .o_reconnect      = filter_reconnect,
2890         .o_disconnect     = filter_disconnect,
2891         .o_statfs         = filter_statfs,
2892         .o_getattr        = filter_getattr,
2893         .o_unpackmd       = filter_unpackmd,
2894         .o_create         = filter_create,
2895         .o_setattr        = filter_setattr,
2896         .o_destroy        = filter_destroy,
2897         .o_brw            = filter_brw,
2898         .o_punch          = filter_truncate,
2899         .o_sync           = filter_sync,
2900         .o_preprw         = filter_preprw,
2901         .o_commitrw       = filter_commitrw,
2902         .o_san_preprw     = filter_san_preprw,
2903         .o_destroy_export = filter_destroy_export,
2904         .o_llog_init      = filter_llog_init,
2905         .o_llog_finish    = filter_llog_finish,
2906         .o_iocontrol      = filter_iocontrol,
2907 };
2908
2909 static int __init obdfilter_init(void)
2910 {
2911         struct lprocfs_static_vars lvars;
2912         int rc;
2913
2914         printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n");
2915
2916         lprocfs_init_vars(filter, &lvars);
2917
2918         OBD_ALLOC(obdfilter_created_scratchpad,
2919                   OBDFILTER_CREATED_SCRATCHPAD_ENTRIES *
2920                   sizeof(*obdfilter_created_scratchpad));
2921         if (obdfilter_created_scratchpad == NULL)
2922                 return -ENOMEM;
2923
2924         rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2925                                  LUSTRE_OST_NAME);
2926         if (rc)
2927                 GOTO(out, rc);
2928
2929         rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2930                                  LUSTRE_OSTSAN_NAME);
2931         if (rc) {
2932                 class_unregister_type(LUSTRE_OST_NAME);
2933 out:
2934                 OBD_FREE(obdfilter_created_scratchpad,
2935                          OBDFILTER_CREATED_SCRATCHPAD_ENTRIES *
2936                          sizeof(*obdfilter_created_scratchpad));
2937         }
2938         return rc;
2939 }
2940
2941 static void __exit obdfilter_exit(void)
2942 {
2943         class_unregister_type(LUSTRE_OSTSAN_NAME);
2944         class_unregister_type(LUSTRE_OST_NAME);
2945         OBD_FREE(obdfilter_created_scratchpad,
2946                  OBDFILTER_CREATED_SCRATCHPAD_ENTRIES *
2947                  sizeof(*obdfilter_created_scratchpad));
2948 }
2949
/* Module metadata: author, description and license strings. */
2950 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2951 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2952 MODULE_LICENSE("GPL");
2953
/* Register obdfilter_init() / obdfilter_exit() as the module's init and
 * exit functions. */
2954 module_init(obdfilter_init);
2955 module_exit(obdfilter_exit);