Whamcloud - gitweb
70256c6bd42473dbb17e08fcb5400468f746d434
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define DEBUG_SUBSYSTEM S_FILTER
37
38 #include <linux/config.h>
39 #include <linux/module.h>
40 #include <linux/fs.h>
41 #include <linux/dcache.h>
42 #include <linux/init.h>
43 #include <linux/version.h>
44 #include <linux/sched.h>
45 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
46 # include <linux/mount.h>
47 # include <linux/buffer_head.h>
48 # include <linux/security.h>
49 #endif
50
51 #include <linux/obd_class.h>
52 #include <linux/obd_lov.h>
53 #include <linux/lustre_dlm.h>
54 #include <linux/lustre_fsfilt.h>
55 #include <linux/lprocfs_status.h>
56 #include <linux/lustre_log.h>
57 #include <linux/lustre_ver.h>
58 #include <linux/lustre_commit_confd.h>
59 #include <libcfs/list.h>
60 #include <linux/lustre_disk.h>
61 #include <linux/lustre_quota.h>
62 #include <linux/quotaops.h>
63
64 #include "filter_internal.h"
65
66 static struct lvfs_callback_ops filter_lvfs_ops;
67
68 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
69                              void *cb_data, int error)
70 {
71         obd_transno_commit_cb(obd, transno, error);
72 }
73
74 /* Assumes caller has already pushed us into the kernel context. */
75 int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
76                           int rc)
77 {
78         struct filter_obd *filter = &exp->exp_obd->u.filter;
79         struct filter_export_data *fed = &exp->exp_filter_data;
80         struct filter_client_data *fcd = fed->fed_fcd;
81         __u64 last_rcvd;
82         loff_t off;
83         int err, log_pri = D_HA;
84
85         /* Propagate error code. */
86         if (rc)
87                 RETURN(rc);
88
89         if (!exp->exp_obd->obd_replayable || oti == NULL)
90                 RETURN(rc);
91
92         /* we don't allocate new transnos for replayed requests */
93         if (oti->oti_transno == 0) {
94                 spin_lock(&filter->fo_translock);
95                 last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
96                 filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
97                 spin_unlock(&filter->fo_translock);
98                 oti->oti_transno = last_rcvd;
99         } else {
100                 spin_lock(&filter->fo_translock);
101                 last_rcvd = oti->oti_transno;
102                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
103                         filter->fo_fsd->lsd_last_transno =
104                                 cpu_to_le64(last_rcvd);
105                 spin_unlock(&filter->fo_translock);
106         }
107         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
108
109         /* could get xid from oti, if it's ever needed */
110         fcd->fcd_last_xid = 0;
111
112         off = fed->fed_lr_off;
113         if (off <= 0) {
114                 CERROR("%s: client idx %d is %lld\n", exp->exp_obd->obd_name,
115                        fed->fed_lr_idx, fed->fed_lr_off);
116                 err = -EINVAL;
117         } else {
118                 fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle,
119                                       filter_commit_cb, NULL);
120                 err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
121                                           fcd, sizeof(*fcd), &off, 0);
122         }
123         if (err) {
124                 log_pri = D_ERROR;
125                 if (rc == 0)
126                         rc = err;
127         }
128
129         CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n",
130                last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err);
131
132         RETURN(rc);
133 }
134
135 void f_dput(struct dentry *dentry)
136 {
137         /* Can't go inside filter_ddelete because it can block */
138         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
139                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
140         LASSERT(atomic_read(&dentry->d_count) > 0);
141
142         dput(dentry);
143 }
144
145 /* Add client data to the FILTER.  We use a bitmap to locate a free space
146  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
147  * Otherwise, we have just read the data from the last_rcvd file and
148  * we know its offset. */
149 static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
150                              struct filter_export_data *fed, int cl_idx)
151 {
152         unsigned long *bitmap = filter->fo_last_rcvd_slots;
153         int new_client = (cl_idx == -1);
154         ENTRY;
155
156         LASSERT(bitmap != NULL);
157         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
158
159         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
160         if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid))
161                 RETURN(0);
162
163         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
164          * there's no need for extra complication here
165          */
166         if (new_client) {
167                 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
168         repeat:
169                 if (cl_idx >= LR_MAX_CLIENTS) {
170                         CERROR("no client slots - fix LR_MAX_CLIENTS\n");
171                         RETURN(-EOVERFLOW);
172                 }
173                 if (test_and_set_bit(cl_idx, bitmap)) {
174                         cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
175                                                     cl_idx);
176                         goto repeat;
177                 }
178         } else {
179                 if (test_and_set_bit(cl_idx, bitmap)) {
180                         CERROR("FILTER client %d: bit already set in bitmap!\n",
181                                cl_idx);
182                         LBUG();
183                 }
184         }
185
186         fed->fed_lr_idx = cl_idx;
187         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) +
188                 cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size);
189         LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
190
191         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
192                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
193
194         if (new_client) {
195                 struct lvfs_run_ctxt saved;
196                 loff_t off = fed->fed_lr_off;
197                 int err;
198                 void *handle;
199
200                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
201                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
202
203                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
204                 /* Transaction needed to fix bug 1403 */
205                 handle = fsfilt_start(obd,
206                                       filter->fo_rcvd_filp->f_dentry->d_inode,
207                                       FSFILT_OP_SETATTR, NULL);
208                 if (IS_ERR(handle)) {
209                         err = PTR_ERR(handle);
210                         CERROR("unable to start transaction: rc %d\n", err);
211                 } else {
212                         err = fsfilt_write_record(obd, filter->fo_rcvd_filp,
213                                                   fed->fed_fcd,
214                                                   sizeof(*fed->fed_fcd),
215                                                   &off, 1);
216                         fsfilt_commit(obd,
217                                       filter->fo_rcvd_filp->f_dentry->d_inode,
218                                       handle, 1);
219                 }
220                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
221
222                 if (err) {
223                         CERROR("error writing %s client idx %u: rc %d\n",
224                                LAST_RCVD, fed->fed_lr_idx, err);
225                         RETURN(err);
226                 }
227         }
228         RETURN(0);
229 }
230
231 static int filter_client_free(struct obd_export *exp)
232 {
233         struct filter_export_data *fed = &exp->exp_filter_data;
234         struct filter_obd *filter = &exp->exp_obd->u.filter;
235         struct obd_device *obd = exp->exp_obd;
236         struct filter_client_data zero_fcd;
237         struct lvfs_run_ctxt saved;
238         int rc;
239         loff_t off;
240         ENTRY;
241
242         if (fed->fed_fcd == NULL)
243                 RETURN(0);
244
245         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
246         if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0)
247                 GOTO(free, 0);
248
249         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
250                fed->fed_lr_idx, off, fed->fed_fcd->fcd_uuid);
251
252         LASSERT(filter->fo_last_rcvd_slots != NULL);
253
254         off = fed->fed_lr_off;
255
256         /* Don't clear fed_lr_idx here as it is likely also unset.  At worst
257          * we leak a client slot that will be cleaned on the next recovery. */
258         if (off <= 0) {
259                 CERROR("%s: client idx %d has med_off %lld\n",
260                        obd->obd_name, fed->fed_lr_idx, off);
261                 GOTO(free, rc = -EINVAL);
262         }
263
264         /* Clear the bit _after_ zeroing out the client so we don't
265            race with filter_client_add and zero out new clients.*/
266         if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
267                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
268                        fed->fed_lr_idx);
269                 LBUG();
270         }
271
272         if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
273                 memset(&zero_fcd, 0, sizeof zero_fcd);
274                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
275                 rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
276                                          sizeof(zero_fcd), &off, 0);
277
278                 if (rc == 0)
279                         /* update server's transno */
280                         filter_update_server_data(obd, filter->fo_rcvd_filp,
281                                                   filter->fo_fsd, 1);
282                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
283
284                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
285                        "zeroing out client %s at idx %u (%llu) in %s rc %d\n",
286                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
287                        LAST_RCVD, rc);
288         }
289
290         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
291                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
292                        fed->fed_lr_idx);
293                 LBUG();
294         }
295
296         EXIT;
297 free:
298         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
299         fed->fed_fcd = NULL;
300
301         return 0;
302 }
303
304 static int filter_free_server_data(struct filter_obd *filter)
305 {
306         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
307         filter->fo_fsd = NULL;
308         OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
309         filter->fo_last_rcvd_slots = NULL;
310         return 0;
311 }
312
313 /* assumes caller is already in kernel ctxt */
314 int filter_update_server_data(struct obd_device *obd, struct file *filp,
315                               struct lr_server_data *fsd, int force_sync)
316 {
317         loff_t off = 0;
318         int rc;
319         ENTRY;
320
321         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->lsd_uuid);
322         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
323                le64_to_cpu(fsd->lsd_last_transno));
324         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
325                le64_to_cpu(fsd->lsd_mount_count));
326
327         rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off,force_sync);
328         if (rc)
329                 CERROR("error writing lr_server_data: rc = %d\n", rc);
330
331         RETURN(rc);
332 }
333
334 int filter_update_last_objid(struct obd_device *obd, obd_gr group,
335                              int force_sync)
336 {
337         struct filter_obd *filter = &obd->u.filter;
338         __u64 tmp;
339         loff_t off = 0;
340         int rc;
341         ENTRY;
342
343         if (filter->fo_last_objid_files[group] == NULL) {
344                 CERROR("Object group "LPU64" not fully setup; not updating "
345                        "last_objid\n", group);
346                 RETURN(-EINVAL);
347         }
348
349         CDEBUG(D_INODE, "%s: server last_objid for group "LPU64": "LPU64"\n",
350                obd->obd_name, group, filter->fo_last_objids[group]);
351
352         tmp = cpu_to_le64(filter->fo_last_objids[group]);
353         rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
354                                  &tmp, sizeof(tmp), &off, force_sync);
355         if (rc)
356                 CERROR("error writing group "LPU64" last objid: rc = %d\n",
357                        group, rc);
358         RETURN(rc);
359 }
360
361 /* assumes caller has already in kernel ctxt */
362 static int filter_init_server_data(struct obd_device *obd, struct file * filp)
363 {
364         struct filter_obd *filter = &obd->u.filter;
365         struct lr_server_data *fsd;
366         struct filter_client_data *fcd = NULL;
367         struct inode *inode = filp->f_dentry->d_inode;
368         unsigned long last_rcvd_size = inode->i_size;
369         __u64 mount_count;
370         int cl_idx;
371         loff_t off = 0;
372         int rc;
373
374         /* ensure padding in the struct is the correct size */
375         CLASSERT (offsetof(struct lr_server_data, lsd_padding) +
376                  sizeof(fsd->lsd_padding) == LR_SERVER_SIZE);
377         CLASSERT (offsetof(struct filter_client_data, fcd_padding) +
378                  sizeof(fcd->fcd_padding) == LR_CLIENT_SIZE);
379
380         OBD_ALLOC(fsd, sizeof(*fsd));
381         if (!fsd)
382                 RETURN(-ENOMEM);
383         filter->fo_fsd = fsd;
384
385         OBD_ALLOC(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
386         if (filter->fo_last_rcvd_slots == NULL) {
387                 OBD_FREE(fsd, sizeof(*fsd));
388                 RETURN(-ENOMEM);
389         }
390
391         if (last_rcvd_size == 0) {
392                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
393
394                 memcpy(fsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->lsd_uuid));
395                 fsd->lsd_last_transno = 0;
396                 mount_count = fsd->lsd_mount_count = 0;
397                 fsd->lsd_server_size = cpu_to_le32(LR_SERVER_SIZE);
398                 fsd->lsd_client_start = cpu_to_le32(LR_CLIENT_START);
399                 fsd->lsd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
400                 fsd->lsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
401                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
402                 fsd->lsd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_OST);
403         } else {
404                 rc = fsfilt_read_record(obd, filp, fsd, sizeof(*fsd), &off);
405                 if (rc) {
406                         CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
407                                LAST_RCVD, rc);
408                         GOTO(err_fsd, rc);
409                 }
410                 if (strcmp(fsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
411                         CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
412                                obd->obd_uuid.uuid, fsd->lsd_uuid);
413                         GOTO(err_fsd, rc = -EINVAL);
414                 }
415                 mount_count = le64_to_cpu(fsd->lsd_mount_count);
416                 filter->fo_subdir_count = le16_to_cpu(fsd->lsd_subdir_count);
417         }
418
419         if (fsd->lsd_feature_incompat & ~cpu_to_le32(FILTER_INCOMPAT_SUPP)) {
420                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
421                        obd->obd_name, le32_to_cpu(fsd->lsd_feature_incompat) &
422                        ~FILTER_INCOMPAT_SUPP);
423                 GOTO(err_fsd, rc = -EINVAL);
424         }
425         if (fsd->lsd_feature_rocompat & ~cpu_to_le32(FILTER_ROCOMPAT_SUPP)) {
426                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
427                        obd->obd_name, le32_to_cpu(fsd->lsd_feature_rocompat) &
428                        ~FILTER_ROCOMPAT_SUPP);
429                 /* Do something like remount filesystem read-only */
430                 GOTO(err_fsd, rc = -EINVAL);
431         }
432
433         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
434                obd->obd_name, le64_to_cpu(fsd->lsd_last_transno));
435         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
436                obd->obd_name, mount_count + 1);
437         CDEBUG(D_INODE, "%s: server data size: %u\n",
438                obd->obd_name, le32_to_cpu(fsd->lsd_server_size));
439         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
440                obd->obd_name, le32_to_cpu(fsd->lsd_client_start));
441         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
442                obd->obd_name, le32_to_cpu(fsd->lsd_client_size));
443         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
444                obd->obd_name, le16_to_cpu(fsd->lsd_subdir_count));
445         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
446                last_rcvd_size <= le32_to_cpu(fsd->lsd_client_start) ? 0 :
447                (last_rcvd_size - le32_to_cpu(fsd->lsd_client_start)) /
448                 le16_to_cpu(fsd->lsd_client_size));
449
450         if (!obd->obd_replayable) {
451                 CWARN("%s: recovery support OFF\n", obd->obd_name);
452                 GOTO(out, rc = 0);
453         }
454
455         for (cl_idx = 0, off = le32_to_cpu(fsd->lsd_client_start);
456              off < last_rcvd_size; cl_idx++) {
457                 __u64 last_rcvd;
458                 struct obd_export *exp;
459                 struct filter_export_data *fed;
460
461                 if (!fcd) {
462                         OBD_ALLOC(fcd, sizeof(*fcd));
463                         if (!fcd)
464                                 GOTO(err_client, rc = -ENOMEM);
465                 }
466
467                 /* Don't assume off is incremented properly by
468                  * fsfilt_read_record(), in case sizeof(*fcd)
469                  * isn't the same as fsd->lsd_client_size.  */
470                 off = le32_to_cpu(fsd->lsd_client_start) +
471                         cl_idx * le16_to_cpu(fsd->lsd_client_size);
472                 rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
473                 if (rc) {
474                         CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
475                                LAST_RCVD, cl_idx, off, rc);
476                         break; /* read error shouldn't cause startup to fail */
477                 }
478
479                 if (fcd->fcd_uuid[0] == '\0') {
480                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
481                                cl_idx);
482                         continue;
483                 }
484
485                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
486
487                 /* These exports are cleaned up by filter_disconnect(), so they
488                  * need to be set up like real exports as filter_connect() does.
489                  */
490                 exp = class_new_export(obd);
491                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
492                        " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
493                        last_rcvd, le64_to_cpu(fsd->lsd_last_transno));
494                 if (exp == NULL)
495                         GOTO(err_client, rc = -ENOMEM);
496
497                 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
498                        sizeof exp->exp_client_uuid.uuid);
499                 fed = &exp->exp_filter_data;
500                 fed->fed_fcd = fcd;
501                 rc = filter_client_add(obd, filter, fed, cl_idx);
502                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
503
504                 /* create helper if export init gets more complex */
505                 spin_lock_init(&fed->fed_lock);
506
507                 fcd = NULL;
508                 exp->exp_replay_needed = 1;
509                 obd->obd_recoverable_clients++;
510                 obd->obd_max_recoverable_clients++;
511                 class_export_put(exp);
512
513                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
514                        cl_idx, last_rcvd);
515
516                 if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno))
517                         fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
518
519         }
520
521         if (fcd)
522                 OBD_FREE(fcd, sizeof(*fcd));
523
524         obd->obd_last_committed = le64_to_cpu(fsd->lsd_last_transno);
525
526         if (obd->obd_recoverable_clients) {
527                 CWARN("RECOVERY: service %s, %d recoverable clients, "
528                       "last_rcvd "LPU64"\n", obd->obd_name,
529                       obd->obd_recoverable_clients,
530                       le64_to_cpu(fsd->lsd_last_transno));
531                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
532                 obd->obd_recovering = 1;
533                 obd->obd_recovery_start = CURRENT_SECONDS;
534                 /* Only used for lprocfs_status */
535                 obd->obd_recovery_end = obd->obd_recovery_start +
536                         OBD_RECOVERY_TIMEOUT / HZ;
537         }
538
539 out:
540         filter->fo_mount_count = mount_count + 1;
541         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
542
543         /* save it, so mount count and last_transno is current */
544         rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
545         if (rc)
546                 GOTO(err_client, rc);
547
548         RETURN(0);
549
550 err_client:
551         class_disconnect_exports(obd);
552 err_fsd:
553         filter_free_server_data(filter);
554         RETURN(rc);
555 }
556
557 static int filter_cleanup_groups(struct obd_device *obd)
558 {
559         struct filter_obd *filter = &obd->u.filter;
560         struct file *filp;
561         struct dentry *dentry;
562         int i;
563         ENTRY;
564
565         if (filter->fo_dentry_O_groups != NULL) {
566                 for (i = 0; i < FILTER_GROUPS; i++) {
567                         dentry = filter->fo_dentry_O_groups[i];
568                         if (dentry != NULL)
569                                 f_dput(dentry);
570                 }
571                 OBD_FREE(filter->fo_dentry_O_groups,
572                          FILTER_GROUPS * sizeof(*filter->fo_dentry_O_groups));
573                 filter->fo_dentry_O_groups = NULL;
574         }
575         if (filter->fo_last_objid_files != NULL) {
576                 for (i = 0; i < FILTER_GROUPS; i++) {
577                         filp = filter->fo_last_objid_files[i];
578                         if (filp != NULL)
579                                 filp_close(filp, 0);
580                 }
581                 OBD_FREE(filter->fo_last_objid_files,
582                          FILTER_GROUPS * sizeof(*filter->fo_last_objid_files));
583                 filter->fo_last_objid_files = NULL;
584         }
585         if (filter->fo_dentry_O_sub != NULL) {
586                 for (i = 0; i < filter->fo_subdir_count; i++) {
587                         dentry = filter->fo_dentry_O_sub[i];
588                         if (dentry != NULL)
589                                 f_dput(dentry);
590                 }
591                 OBD_FREE(filter->fo_dentry_O_sub,
592                          filter->fo_subdir_count *
593                          sizeof(*filter->fo_dentry_O_sub));
594                 filter->fo_dentry_O_sub = NULL;
595         }
596         if (filter->fo_last_objids != NULL) {
597                 OBD_FREE(filter->fo_last_objids,
598                          FILTER_GROUPS * sizeof(*filter->fo_last_objids));
599                 filter->fo_last_objids = NULL;
600         }
601         if (filter->fo_dentry_O != NULL) {
602                 f_dput(filter->fo_dentry_O);
603                 filter->fo_dentry_O = NULL;
604         }
605         RETURN(0);
606 }
607
608 /* FIXME: object groups */
609 static int filter_prep_groups(struct obd_device *obd)
610 {
611         struct filter_obd *filter = &obd->u.filter;
612         struct dentry *dentry, *O_dentry;
613         struct file *filp;
614         int i, rc = 0, cleanup_phase = 0;
615         ENTRY;
616
617         O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
618         CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
619         if (IS_ERR(O_dentry)) {
620                 rc = PTR_ERR(O_dentry);
621                 CERROR("cannot open/create O: rc = %d\n", rc);
622                 GOTO(cleanup, rc);
623         }
624         filter->fo_dentry_O = O_dentry;
625         cleanup_phase = 1; /* O_dentry */
626
627         /* Lookup "R" to tell if we're on an old OST FS and need to convert
628          * from O/R/<dir>/<objid> to O/0/<dir>/<objid>.  This can be removed
629          * some time post 1.0 when all old-style OSTs have converted along
630          * with the init_objid hack. */
631         dentry = ll_lookup_one_len("R", O_dentry, 1);
632         if (IS_ERR(dentry))
633                 GOTO(cleanup, rc = PTR_ERR(dentry));
634         if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
635                 struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1);
636                 ENTRY;
637
638                 CWARN("converting OST to new object layout\n");
639                 if (IS_ERR(O0_dentry)) {
640                         rc = PTR_ERR(O0_dentry);
641                         CERROR("error looking up O/0: rc %d\n", rc);
642                         GOTO(cleanup_R, rc);
643                 }
644
645                 if (O0_dentry->d_inode) {
646                         CERROR("Both O/R and O/0 exist. Fix manually.\n");
647                         GOTO(cleanup_O0, rc = -EEXIST);
648                 }
649
650                 down(&O_dentry->d_inode->i_sem);
651                 rc = vfs_rename(O_dentry->d_inode, dentry,
652                                 O_dentry->d_inode, O0_dentry);
653                 up(&O_dentry->d_inode->i_sem);
654
655                 if (rc) {
656                         CERROR("error renaming O/R to O/0: rc %d\n", rc);
657                         GOTO(cleanup_O0, rc);
658                 }
659                 filter->fo_fsd->lsd_feature_incompat |=
660                         cpu_to_le32(OBD_INCOMPAT_GROUPS);
661                 rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
662                                                filter->fo_fsd, 1);
663                 GOTO(cleanup_O0, rc);
664
665         cleanup_O0:
666                 f_dput(O0_dentry);
667         cleanup_R:
668                 f_dput(dentry);
669                 if (rc)
670                         GOTO(cleanup, rc);
671         } else {
672                 f_dput(dentry);
673         }
674
675         OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64));
676         if (filter->fo_last_objids == NULL)
677                 GOTO(cleanup, rc = -ENOMEM);
678         cleanup_phase = 2; /* groups */
679
680         OBD_ALLOC(filter->fo_dentry_O_groups, FILTER_GROUPS * sizeof(dentry));
681         if (filter->fo_dentry_O_groups == NULL)
682                 GOTO(cleanup, rc = -ENOMEM);
683         OBD_ALLOC(filter->fo_last_objid_files, FILTER_GROUPS * sizeof(filp));
684         if (filter->fo_last_objid_files == NULL)
685                 GOTO(cleanup, rc = -ENOMEM);
686
687         for (i = 0; i < FILTER_GROUPS; i++) {
688                 char name[25];
689                 loff_t off = 0;
690
691                 sprintf(name, "%d", i);
692                 dentry = simple_mkdir(O_dentry, name, 0700, 1);
693                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
694                 if (IS_ERR(dentry)) {
695                         rc = PTR_ERR(dentry);
696                         CERROR("cannot lookup/create O/%s: rc = %d\n",
697                                name, rc);
698                         GOTO(cleanup, rc);
699                 }
700                 filter->fo_dentry_O_groups[i] = dentry;
701
702                 sprintf(name, "O/%d/LAST_ID", i);
703                 filp = filp_open(name, O_CREAT | O_RDWR, 0700);
704                 if (IS_ERR(filp)) {
705                         rc = PTR_ERR(filp);
706                         CERROR("cannot create %s: rc = %d\n", name, rc);
707                         GOTO(cleanup, rc);
708                 }
709                 filter->fo_last_objid_files[i] = filp;
710
711                 if (filp->f_dentry->d_inode->i_size == 0) {
712                         filter->fo_last_objids[i] = FILTER_INIT_OBJID;
713                         rc = filter_update_last_objid(obd, i, 1);
714                         if (rc)
715                                 GOTO(cleanup, rc);
716                         continue;
717                 }
718
719                 rc = fsfilt_read_record(obd, filp, &filter->fo_last_objids[i],
720                                         sizeof(__u64), &off);
721                 if (rc) {
722                         CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
723                                name, rc);
724                         GOTO(cleanup, rc);
725                 }
726                 filter->fo_last_objids[i] =
727                         le64_to_cpu(filter->fo_last_objids[i]);
728                 CDEBUG(D_HA, "%s: server last_objid group %d: "LPU64"\n",
729                        obd->obd_name, i, filter->fo_last_objids[i]);
730         }
731
732         if (filter->fo_subdir_count) {
733                 O_dentry = filter->fo_dentry_O_groups[0];
734                 OBD_ALLOC(filter->fo_dentry_O_sub,
735                           filter->fo_subdir_count * sizeof(dentry));
736                 if (filter->fo_dentry_O_sub == NULL)
737                         GOTO(cleanup, rc = -ENOMEM);
738
739                 for (i = 0; i < filter->fo_subdir_count; i++) {
740                         char dir[20];
741                         snprintf(dir, sizeof(dir), "d%u", i);
742
743                         dentry = simple_mkdir(O_dentry, dir, 0700, 1);
744                         CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
745                         if (IS_ERR(dentry)) {
746                                 rc = PTR_ERR(dentry);
747                                 CERROR("can't lookup/create O/0/%s: rc = %d\n",
748                                        dir, rc);
749                                 GOTO(cleanup, rc);
750                         }
751                         filter->fo_dentry_O_sub[i] = dentry;
752                 }
753         }
754         RETURN(0);
755
756  cleanup:
757         filter_cleanup_groups(obd);
758         return rc;
759 }
760
761 /* setup the object store with correct subdirectories */
762 static int filter_prep(struct obd_device *obd)
763 {
764         struct lvfs_run_ctxt saved;
765         struct filter_obd *filter = &obd->u.filter;
766         struct file *file;
767         struct inode *inode;
768         int rc = 0;
769         ENTRY;
770
771         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
772         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
773         if (!file || IS_ERR(file)) {
774                 rc = PTR_ERR(file);
775                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
776                        LAST_RCVD, rc);
777                 GOTO(out, rc);
778         }
779         filter->fo_rcvd_filp = file;
780         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
781                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
782                        file->f_dentry->d_inode->i_mode);
783                 GOTO(err_filp, rc = -ENOENT);
784         }
785
786         inode = file->f_dentry->d_parent->d_inode;
787         /* We use i_op->unlink directly in filter_vfs_unlink() */
788         if (!inode->i_op || !inode->i_op->create || !inode->i_op->unlink) {
789                 CERROR("%s: filesystem does not support create/unlink ops\n",
790                        obd->obd_name);
791                 GOTO(err_filp, rc = -EOPNOTSUPP);
792         }
793
794         rc = filter_init_server_data(obd, file);
795         if (rc) {
796                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
797                 GOTO(err_filp, rc);
798         }
799         /* open and create health check io file*/
800         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
801         if (IS_ERR(file)) {
802                 rc = PTR_ERR(file);
803                 CERROR("OBD filter: cannot open/create %s rc = %d\n",
804                         HEALTH_CHECK, rc);
805                 GOTO(err_filp, rc);
806         }
807         filter->fo_health_check_filp = file;
808         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
809                 CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK,
810                        file->f_dentry->d_inode->i_mode);
811                 GOTO(err_health_check, rc = -ENOENT);
812         }
813         rc = lvfs_check_io_health(obd, file);
814         if (rc)
815                 GOTO(err_health_check, rc);
816
817         rc = filter_prep_groups(obd);
818         if (rc)
819                 GOTO(err_server_data, rc);
820  out:
821         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
822
823         return(rc);
824
825  err_server_data:
826         //class_disconnect_exports(obd, 0);
827         filter_free_server_data(filter);
828  err_health_check:
829         if (filp_close(filter->fo_health_check_filp, 0))
830                 CERROR("can't close %s after error\n", HEALTH_CHECK);
831         filter->fo_health_check_filp = NULL;
832  err_filp:
833         if (filp_close(filter->fo_rcvd_filp, 0))
834                 CERROR("can't close %s after error\n", LAST_RCVD);
835         filter->fo_rcvd_filp = NULL;
836         goto out;
837 }
838
839 /* cleanup the filter: write last used object id to status file */
840 static void filter_post(struct obd_device *obd)
841 {
842         struct lvfs_run_ctxt saved;
843         struct filter_obd *filter = &obd->u.filter;
844         int rc, i;
845
846         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
847          * best to start a transaction with h_sync, because we removed this
848          * from lastobjid */
849
850         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
851         rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
852                                        filter->fo_fsd, 0);
853         if (rc)
854                 CERROR("error writing server data: rc = %d\n", rc);
855
856         for (i = 0; i < FILTER_GROUPS; i++) {
857                 rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1));
858                 if (rc)
859                         CERROR("error writing group %d lastobjid: rc = %d\n",
860                                i, rc);
861         }
862
863         rc = filp_close(filter->fo_rcvd_filp, 0);
864         filter->fo_rcvd_filp = NULL;
865         if (rc)
866                 CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
867
868         rc = filp_close(filter->fo_health_check_filp, 0);
869         filter->fo_health_check_filp = NULL;
870         if (rc)
871                 CERROR("error closing %s: rc = %d\n", HEALTH_CHECK, rc);
872
873         filter_cleanup_groups(obd);
874         filter_free_server_data(filter);
875         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
876 }
877
878 static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa,
879                                obd_id id)
880 {
881         obd_gr group = 0;
882         LASSERT(filter->fo_fsd != NULL);
883
884         if (oa != NULL) {
885                 LASSERT(oa->o_gr <= FILTER_GROUPS);
886                 group = oa->o_gr;
887         }
888
889         spin_lock(&filter->fo_objidlock);
890         filter->fo_last_objids[group] = id;
891         spin_unlock(&filter->fo_objidlock);
892 }
893
894 __u64 filter_last_id(struct filter_obd *filter, struct obdo *oa)
895 {
896         obd_id id;
897         obd_gr group = 0;
898         LASSERT(filter->fo_fsd != NULL);
899
900         if (oa != NULL) {
901                 LASSERT(oa->o_gr <= FILTER_GROUPS);
902                 group = oa->o_gr;
903         }
904
905         /* FIXME: object groups */
906         spin_lock(&filter->fo_objidlock);
907         id = filter->fo_last_objids[group];
908         spin_unlock(&filter->fo_objidlock);
909
910         return id;
911 }
912
913 static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
914 {
915         down(&dparent->d_inode->i_sem);
916         return 0;
917 }
918
919 /* We never dget the object parent, so DON'T dput it either */
920 struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
921 {
922         struct filter_obd *filter = &obd->u.filter;
923         LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */
924
925         if (group > 0 || filter->fo_subdir_count == 0)
926                 return filter->fo_dentry_O_groups[group];
927
928         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
929 }
930
931 /* We never dget the object parent, so DON'T dput it either */
932 struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
933                                   obd_id objid)
934 {
935         unsigned long now = jiffies;
936         struct dentry *dparent = filter_parent(obd, group, objid);
937         int rc;
938
939         if (IS_ERR(dparent))
940                 return dparent;
941
942         rc = filter_lock_dentry(obd, dparent);
943         fsfilt_check_slow(now, obd_timeout, "parent lock");
944         return rc ? ERR_PTR(rc) : dparent;
945 }
946
947 /* We never dget the object parent, so DON'T dput it either */
948 static void filter_parent_unlock(struct dentry *dparent)
949 {
950         up(&dparent->d_inode->i_sem);
951 }
952
953 /* How to get files, dentries, inodes from object id's.
954  *
955  * If dir_dentry is passed, the caller has already locked the parent
956  * appropriately for this operation (normally a write lock).  If
957  * dir_dentry is NULL, we do a read lock while we do the lookup to
958  * avoid races with create/destroy and such changing the directory
959  * internal to the filesystem code. */
960 struct dentry *filter_fid2dentry(struct obd_device *obd,
961                                  struct dentry *dir_dentry,
962                                  obd_gr group, obd_id id)
963 {
964         struct dentry *dparent = dir_dentry;
965         struct dentry *dchild;
966         char name[32];
967         int len;
968         ENTRY;
969
970         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
971                 RETURN(ERR_PTR(-ENOENT));
972
973         if (id == 0) {
974                 CERROR("fatal: invalid object id 0\n");
975                 RETURN(ERR_PTR(-ESTALE));
976         }
977
978         len = sprintf(name, LPU64, id);
979         if (dir_dentry == NULL) {
980                 dparent = filter_parent_lock(obd, group, id);
981                 if (IS_ERR(dparent)) {
982                         CERROR("%s: error getting object "LPU64":"LPU64
983                                " parent: rc %ld\n", obd->obd_name,
984                                id, group, PTR_ERR(dparent));
985                         RETURN(dparent);
986                 }
987         }
988         CDEBUG(D_INODE, "looking up object O/%.*s/%s\n",
989                dparent->d_name.len, dparent->d_name.name, name);
990         dchild = /*ll_*/lookup_one_len(name, dparent, len);
991         if (dir_dentry == NULL)
992                 filter_parent_unlock(dparent);
993         if (IS_ERR(dchild)) {
994                 CERROR("%s: child lookup error %ld\n", obd->obd_name,
995                        PTR_ERR(dchild));
996                 RETURN(dchild);
997         }
998
999         if (dchild->d_inode != NULL && is_bad_inode(dchild->d_inode)) {
1000                 CERROR("%s: got bad object "LPU64" inode %lu\n",
1001                        obd->obd_name, id, dchild->d_inode->i_ino);
1002                 f_dput(dchild);
1003                 RETURN(ERR_PTR(-ENOENT));
1004         }
1005
1006         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
1007                name, dchild, atomic_read(&dchild->d_count));
1008
1009         LASSERT(atomic_read(&dchild->d_count) > 0);
1010
1011         RETURN(dchild);
1012 }
1013
1014 static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
1015 {
1016         struct lustre_handle lockh;
1017         int flags = LDLM_AST_DISCARD_DATA, rc;
1018         struct ldlm_res_id res_id = { .name = { objid } };
1019         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1020
1021         ENTRY;
1022         /* Tell the clients that the object is gone now and that they should
1023          * throw away any cached pages. */
1024         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
1025                               LDLM_EXTENT, &policy, LCK_PW,
1026                               &flags, ldlm_blocking_ast, ldlm_completion_ast,
1027                               NULL, NULL, NULL, 0, NULL, &lockh);
1028
1029         /* We only care about the side-effects, just drop the lock. */
1030         if (rc == ELDLM_OK)
1031                 ldlm_lock_decref(&lockh, LCK_PW);
1032
1033         RETURN(rc);
1034 }
1035
1036 /* This is vfs_unlink() without down(i_sem).  If we call regular vfs_unlink()
1037  * we have 2.6 lock ordering issues with filter_commitrw_write() as it takes
1038  * i_sem before starting a handle, while filter_destroy() + vfs_unlink do the
1039  * reverse.  Caller must take i_sem before starting the transaction and we
1040  * drop it here before the inode is removed from the dentry.  bug 4180/6984 */
1041 int filter_vfs_unlink(struct inode *dir, struct dentry *dentry)
1042 {
1043         int rc;
1044         ENTRY;
1045
1046         /* don't need dir->i_zombie for 2.4, it is for rename/unlink of dir
1047          * itself we already hold dir->i_sem for child create/unlink ops */
1048         LASSERT(down_trylock(&dir->i_sem) != 0);
1049         LASSERT(down_trylock(&dentry->d_inode->i_sem) != 0);
1050
1051         /* may_delete() */
1052         if (!dentry->d_inode || dentry->d_parent->d_inode != dir)
1053                 GOTO(out, rc = -ENOENT);
1054
1055         rc = ll_permission(dir, MAY_WRITE | MAY_EXEC, NULL);
1056         if (rc)
1057                 GOTO(out, rc);
1058
1059         if (IS_APPEND(dir))
1060                 GOTO(out, rc = -EPERM);
1061
1062         /* check_sticky() */
1063         if ((dentry->d_inode->i_uid != current->fsuid && !capable(CAP_FOWNER))||
1064             IS_APPEND(dentry->d_inode) || IS_IMMUTABLE(dentry->d_inode))
1065                 GOTO(out, rc = -EPERM);
1066
1067         /* NOTE: This might need to go outside i_sem, though it isn't clear if
1068          *       that was done because of journal_start (which is already done
1069          *       here) or some other ordering issue. */
1070         DQUOT_INIT(dir);
1071
1072 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1073         rc = security_inode_unlink(dir, dentry);
1074         if (rc)
1075                 GOTO(out, rc);
1076 #endif
1077
1078         rc = dir->i_op->unlink(dir, dentry);
1079 out:
1080         /* need to drop i_sem before we lose inode reference */
1081         up(&dentry->d_inode->i_sem);
1082         if (rc == 0)
1083                 d_delete(dentry);
1084
1085         RETURN(rc);
1086 }
1087
1088 /* Caller must hold LCK_PW on parent and push us into kernel context.
1089  * Caller must hold child i_sem, we drop it always.
1090  * Caller is also required to ensure that dchild->d_inode exists. */
1091 static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
1092                                    struct dentry *dparent,
1093                                    struct dentry *dchild)
1094 {
1095         struct inode *inode = dchild->d_inode;
1096         int rc;
1097
1098         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1099                 CERROR("destroying objid %.*s ino %lu nlink %lu count %d\n",
1100                        dchild->d_name.len, dchild->d_name.name, inode->i_ino,
1101                        (unsigned long)inode->i_nlink,
1102                        atomic_read(&inode->i_count));
1103         }
1104
1105         rc = filter_vfs_unlink(dparent->d_inode, dchild);
1106         if (rc)
1107                 CERROR("error unlinking objid %.*s: rc %d\n",
1108                        dchild->d_name.len, dchild->d_name.name, rc);
1109         return(rc);
1110 }
1111
1112 static int filter_intent_policy(struct ldlm_namespace *ns,
1113                                 struct ldlm_lock **lockp, void *req_cookie,
1114                                 ldlm_mode_t mode, int flags, void *data)
1115 {
1116         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1117         struct ptlrpc_request *req = req_cookie;
1118         struct ldlm_lock *lock = *lockp, *l = NULL;
1119         struct ldlm_resource *res = lock->l_resource;
1120         ldlm_processing_policy policy;
1121         struct ost_lvb *res_lvb, *reply_lvb;
1122         struct ldlm_reply *rep;
1123         struct list_head *tmp;
1124         ldlm_error_t err;
1125         int tmpflags = 0, rc, repsize[2] = {sizeof(*rep), sizeof(*reply_lvb)};
1126         int only_liblustre = 0;
1127         ENTRY;
1128
1129         policy = ldlm_get_processing_policy(res);
1130         LASSERT(policy != NULL);
1131         LASSERT(req != NULL);
1132
1133         rc = lustre_pack_reply(req, 2, repsize, NULL);
1134         if (rc)
1135                 RETURN(req->rq_status = rc);
1136
1137         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
1138         LASSERT(rep != NULL);
1139
1140         reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
1141         LASSERT(reply_lvb != NULL);
1142
1143         //fixup_handle_for_resent_req(req, lock, &lockh);
1144
1145         /* If we grant any lock at all, it will be a whole-file read lock.
1146          * Call the extent policy function to see if our request can be
1147          * granted, or is blocked. */
1148         lock->l_policy_data.l_extent.start = 0;
1149         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
1150         lock->l_req_mode = LCK_PR;
1151
1152         LASSERT(ns == res->lr_namespace);
1153         l_lock(&ns->ns_lock);
1154
1155         res->lr_tmp = &rpc_list;
1156         rc = policy(lock, &tmpflags, 0, &err);
1157         res->lr_tmp = NULL;
1158
1159         /* FIXME: we should change the policy function slightly, to not make
1160          * this list at all, since we just turn around and free it */
1161         while (!list_empty(&rpc_list)) {
1162                 struct ldlm_ast_work *w =
1163                         list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
1164                 list_del(&w->w_list);
1165                 LDLM_LOCK_PUT(w->w_lock);
1166                 OBD_FREE(w, sizeof(*w));
1167         }
1168
1169         /* The lock met with no resistance; we're finished. */
1170         if (rc == LDLM_ITER_CONTINUE) {
1171                 l_unlock(&ns->ns_lock);
1172                 /*
1173                  * do not grant locks to the liblustre clients: they cannot
1174                  * handle ASTs robustly.
1175                  */
1176                 if (lock->l_export->exp_libclient) {
1177                         ldlm_resource_unlink_lock(lock);
1178                         RETURN(ELDLM_LOCK_ABORTED);
1179                 }
1180                 RETURN(ELDLM_LOCK_REPLACED);
1181         }
1182
1183         /* Do not grant any lock, but instead send GL callbacks.  The extent
1184          * policy nicely created a list of all PW locks for us.  We will choose
1185          * the highest of those which are larger than the size in the LVB, if
1186          * any, and perform a glimpse callback. */
1187         down(&res->lr_lvb_sem);
1188         res_lvb = res->lr_lvb_data;
1189         LASSERT(res_lvb != NULL);
1190         *reply_lvb = *res_lvb;
1191         up(&res->lr_lvb_sem);
1192
1193         list_for_each(tmp, &res->lr_granted) {
1194                 struct ldlm_lock *tmplock =
1195                         list_entry(tmp, struct ldlm_lock, l_res_link);
1196
1197                 if (tmplock->l_granted_mode == LCK_PR)
1198                         continue;
1199                 /*
1200                  * ->ns_lock guarantees that no new locks are granted, and,
1201                  * therefore, that res->lr_lvb_data cannot increase beyond the
1202                  * end of already granted lock. As a result, it is safe to
1203                  * check against "stale" reply_lvb->lvb_size value without
1204                  * res->lr_lvb_sem.
1205                  */
1206                 if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
1207                         continue;
1208
1209                 /* Don't send glimpse ASTs to liblustre clients.  They aren't
1210                  * listening for them, and they do entirely synchronous I/O
1211                  * anyways. */
1212                 if (tmplock->l_export == NULL ||
1213                     tmplock->l_export->exp_libclient == 1) {
1214                         only_liblustre = 1;
1215                         continue;
1216                 }
1217
1218                 if (l == NULL) {
1219                         l = LDLM_LOCK_GET(tmplock);
1220                         continue;
1221                 }
1222
1223                 if (l->l_policy_data.l_extent.start >
1224                     tmplock->l_policy_data.l_extent.start)
1225                         continue;
1226
1227                 LDLM_LOCK_PUT(l);
1228                 l = LDLM_LOCK_GET(tmplock);
1229         }
1230         l_unlock(&ns->ns_lock);
1231
1232         /* There were no PW locks beyond the size in the LVB; finished. */
1233         if (l == NULL) {
1234                 if (only_liblustre) {
1235                         /* If we discovered a liblustre client with a PW lock,
1236                          * however, the LVB may be out of date!  The LVB is
1237                          * updated only on glimpse (which we don't do for
1238                          * liblustre clients) and cancel (which the client
1239                          * obviously has not yet done).  So if it has written
1240                          * data but kept the lock, the LVB is stale and needs
1241                          * to be updated from disk.
1242                          *
1243                          * Of course, this will all disappear when we switch to
1244                          * taking liblustre locks on the OST. */
1245                         if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
1246                                 ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
1247                 }
1248                 RETURN(ELDLM_LOCK_ABORTED);
1249         }
1250
1251         /*
1252          * This check is for lock taken in filter_prepare_destroy() that does
1253          * not have l_glimpse_ast set. So the logic is: if there is a lock
1254          * with no l_glimpse_ast set, this object is being destroyed already.
1255          *
1256          * Hence, if you are grabbing DLM locks on the server, always set
1257          * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
1258          */
1259         if (l->l_glimpse_ast == NULL) {
1260                 /* We are racing with unlink(); just return -ENOENT */
1261                 rep->lock_policy_res1 = -ENOENT;
1262                 goto out;
1263         }
1264
1265         LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
1266         rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
1267         /* Update the LVB from disk if the AST failed (this is a legal race) */
1268         /*
1269          * XXX nikita: situation when ldlm_server_glimpse_ast() failed before
1270          * sending ast is not handled. This can result in lost client writes.
1271          */
1272         if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
1273                 ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
1274
1275         down(&res->lr_lvb_sem);
1276         *reply_lvb = *res_lvb;
1277         up(&res->lr_lvb_sem);
1278
1279  out:
1280         LDLM_LOCK_PUT(l);
1281
1282         RETURN(ELDLM_LOCK_ABORTED);
1283 }
1284
/*
 * per-obd_device iobuf pool.
 *
 * To avoid memory deadlocks in low-memory setups, the amount of dynamic
 * allocation in the write path has to be minimized (see bug 5137).
 *
 * Pages, niobuf_local's and niobuf_remote's are pre-allocated and attached to
 * OST threads (see ost_thread_{init,done}()).
 *
 * The "iobuf's" used by the filter cannot be attached to an OST thread,
 * however, because at the OST layer there are (potentially) multiple
 * obd_devices whose type is not known at the time of OST thread creation.
 *
 * Instead, an array of iobuf's is attached to struct filter_obd
 * (->fo_iobuf_pool field). This array has size OST_MAX_THREADS, so that each
 * OST thread uses its very own iobuf.
 *
 * Functions below
 *
 *     filter_iobuf_pool_init()
 *
 *     filter_iobuf_pool_done()
 *
 *     filter_iobuf_get()
 *
 * operate on this array. They are "generic" in the sense that they don't
 * depend on the actual type of iobuf's (the latter depending on the Linux
 * kernel version).
 */
1313
1314 /*
1315  * destroy pool created by filter_iobuf_pool_init
1316  */
1317 static void filter_iobuf_pool_done(struct filter_obd *filter)
1318 {
1319         struct filter_iobuf **pool;
1320         int i;
1321
1322         ENTRY;
1323
1324         pool = filter->fo_iobuf_pool;
1325         if (pool != NULL) {
1326                 for (i = 0; i < filter->fo_iobuf_count; ++ i) {
1327                         if (pool[i] != NULL)
1328                                 filter_free_iobuf(pool[i]);
1329                 }
1330                 OBD_FREE(pool, filter->fo_iobuf_count * sizeof pool[0]);
1331                 filter->fo_iobuf_pool = NULL;
1332         }
1333         EXIT;
1334 }
1335
1336 /*
1337  * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write().
1338  */
1339 static int filter_iobuf_pool_init(struct filter_obd *filter)
1340 {
1341         void **pool;
1342
1343         ENTRY;
1344
1345         OBD_ALLOC_GFP(filter->fo_iobuf_pool, OST_MAX_THREADS * sizeof(*pool),
1346                       GFP_KERNEL);
1347         if (filter->fo_iobuf_pool == NULL)
1348                 RETURN(-ENOMEM);
1349
1350         filter->fo_iobuf_count = OST_MAX_THREADS;
1351
1352         RETURN(0);
1353 }
1354
1355 /* Return iobuf allocated for @thread_id.  We don't know in advance how
1356  * many threads there will be so we allocate a large empty array and only
1357  * fill in those slots that are actually in use.
1358  * If we haven't allocated a pool entry for this thread before, do so now. */
1359 void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti)
1360 {
1361         int thread_id                    = oti ? oti->oti_thread_id : -1;
1362         struct filter_iobuf  *pool       = NULL;
1363         struct filter_iobuf **pool_place = NULL;
1364
1365         if (thread_id >= 0) {
1366                 LASSERT(thread_id < filter->fo_iobuf_count);
1367                 pool = *(pool_place = &filter->fo_iobuf_pool[thread_id]);
1368         }
1369
1370         if (unlikely(pool == NULL)) {
1371                 pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE,
1372                                           PTLRPC_MAX_BRW_PAGES);
1373                 if (pool_place != NULL)
1374                         *pool_place = pool;
1375         }
1376
1377         return pool;
1378 }
1379
1380 /* mount the file system (secretly).  lustre_cfg parameters are:
1381  * 1 = device
1382  * 2 = fstype
1383  * 3 = flags: failover=f, failout=n
1384  * 4 = mount options
1385  */
1386 int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1387                         void *option)
1388 {
1389         struct lustre_cfg* lcfg = buf;
1390         struct filter_obd *filter = &obd->u.filter;
1391         struct vfsmount *mnt;
1392         struct lustre_mount_info *lmi;
1393         struct obd_uuid uuid;
1394         __u8 *uuid_ptr;
1395         char *str, *label;
1396         char ns_name[48];
1397         int rc;
1398         ENTRY;
1399
1400         if (lcfg->lcfg_bufcount < 3 ||
1401             LUSTRE_CFG_BUFLEN(lcfg, 1) < 1 ||
1402             LUSTRE_CFG_BUFLEN(lcfg, 2) < 1)
1403                 RETURN(-EINVAL);
1404
1405         lmi = server_get_mount(obd->obd_name);
1406         if (lmi) {
1407                 /* We already mounted in lustre_fill_super.
1408                    lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
1409                 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
1410                 mnt = lmi->lmi_mnt;
1411                 obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
1412         } else {
1413                 /* old path - used by lctl */
1414                 CERROR("Using old MDS mount method\n");
1415                 mnt = do_kern_mount(lustre_cfg_string(lcfg, 2),
1416                                     MS_NOATIME|MS_NODIRATIME,
1417                                     lustre_cfg_string(lcfg, 1), option);    
1418                 if (IS_ERR(mnt)) {
1419                         rc = PTR_ERR(mnt);
1420                         LCONSOLE_ERROR("Can't mount disk %s (%d)\n",
1421                                        lustre_cfg_string(lcfg, 1), rc);
1422                         RETURN(rc);
1423                 }
1424
1425                 obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
1426         }
1427         if (IS_ERR(obd->obd_fsops))
1428                 GOTO(err_mntput, rc = PTR_ERR(obd->obd_fsops));
1429
1430         rc = filter_iobuf_pool_init(filter);
1431         if (rc != 0)
1432                 GOTO(err_ops, rc);
1433
1434         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1435
1436         /* failover is the default */
1437         obd->obd_replayable = 1;
1438         obd_sync_filter = 1;
1439
1440         if (lcfg->lcfg_bufcount > 3 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
1441                 str = lustre_cfg_string(lcfg, 3);
1442                 if (strchr(str, 'n')) {
1443                         CWARN("%s: recovery disabled\n", obd->obd_name);
1444                         obd->obd_replayable = 0;
1445                         obd_sync_filter = 0;
1446                 }
1447         }
1448
1449         filter->fo_vfsmnt = mnt;
1450         obd->u.obt.obt_sb = mnt->mnt_sb;
1451         filter->fo_fstype = mnt->mnt_sb->s_type->name;
1452         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1453
1454         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1455         obd->obd_lvfs_ctxt.pwdmnt = mnt;
1456         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
1457         obd->obd_lvfs_ctxt.fs = get_ds();
1458         obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
1459
1460         rc = filter_prep(obd);
1461         if (rc)
1462                 GOTO(err_ops, rc);
1463
1464         filter->fo_destroy_in_progress = 0;
1465         sema_init(&filter->fo_create_lock, 1);
1466         spin_lock_init(&filter->fo_translock);
1467         spin_lock_init(&filter->fo_objidlock);
1468         spin_lock_init(&filter->fo_stats_lock);
1469         INIT_LIST_HEAD(&filter->fo_export_list);
1470         sema_init(&filter->fo_alloc_lock, 1);
1471         spin_lock_init(&filter->fo_r_pages.oh_lock);
1472         spin_lock_init(&filter->fo_w_pages.oh_lock);
1473         spin_lock_init(&filter->fo_read_rpc_hist.oh_lock);
1474         spin_lock_init(&filter->fo_write_rpc_hist.oh_lock);
1475         spin_lock_init(&filter->fo_r_io_time.oh_lock);
1476         spin_lock_init(&filter->fo_w_io_time.oh_lock);
1477         spin_lock_init(&filter->fo_r_discont_pages.oh_lock);
1478         spin_lock_init(&filter->fo_w_discont_pages.oh_lock);
1479         spin_lock_init(&filter->fo_r_discont_blocks.oh_lock);
1480         spin_lock_init(&filter->fo_w_discont_blocks.oh_lock);
1481         spin_lock_init(&filter->fo_r_disk_iosize.oh_lock);
1482         spin_lock_init(&filter->fo_w_disk_iosize.oh_lock);
1483         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
1484
1485         sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
1486         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1487         if (obd->obd_namespace == NULL)
1488                 GOTO(err_post, rc = -ENOMEM);
1489         obd->obd_namespace->ns_lvbp = obd;
1490         obd->obd_namespace->ns_lvbo = &filter_lvbo;
1491         ldlm_register_intent(obd->obd_namespace, filter_intent_policy);
1492
1493         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1494                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1495
1496         rc = llog_cat_initialize(obd, 1);
1497         if (rc) {
1498                 CERROR("failed to setup llogging subsystems\n");
1499                 GOTO(err_post, rc);
1500         }
1501
1502         rc = lquota_setup(quota_interface, obd, lcfg);
1503         if (rc)
1504                 GOTO(err_post, rc);
1505
1506         uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
1507         if (uuid_ptr != NULL) {
1508                 class_uuid_unparse(uuid_ptr, &uuid);
1509                 str = uuid.uuid;
1510         } else {
1511                 str = "no UUID";
1512         }
1513         label = fsfilt_label(obd, obd->u.obt.obt_sb);
1514
1515         if (obd->obd_recovering) {
1516                 LCONSOLE_WARN("OST %s now serving %s (%s%s%s), but will be in "
1517                               "recovery until %d %s reconnect, or if no clients"
1518                               " reconnect for %d:%.02d; during that time new "
1519                               "clients will not be allowed to connect. "
1520                               "Recovery progress can be monitored by watching "
1521                               "/proc/fs/lustre/obdfilter/%s/recovery_status.\n",
1522                               obd->obd_name, lustre_cfg_string(lcfg, 1),
1523                               label ?: "", label ? "/" : "", str,
1524                               obd->obd_recoverable_clients,
1525                               (obd->obd_recoverable_clients == 1)
1526                               ? "client" : "clients",
1527                               (int)(OBD_RECOVERY_TIMEOUT / HZ) / 60,
1528                               (int)(OBD_RECOVERY_TIMEOUT / HZ) % 60,
1529                               obd->obd_name);
1530         } else {
1531                 LCONSOLE_INFO("OST %s now serving %s (%s%s%s) with recovery "
1532                               "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
1533                               label ?: "", label ? "/" : "", str,
1534                               obd->obd_replayable ? "enabled" : "disabled");
1535         }
1536
1537         RETURN(0);
1538
1539 err_post:
1540         filter_post(obd);
1541 err_ops:
1542         fsfilt_put_ops(obd->obd_fsops);
1543         filter_iobuf_pool_done(filter);
1544 err_mntput:
1545         if (lmi) {
1546                 server_put_mount(obd->obd_name, mnt);
1547         } else {
1548                 /* old method */
1549                 unlock_kernel();
1550                 mntput(mnt);
1551                 lock_kernel();
1552         }
1553         obd->u.obt.obt_sb = 0;
1554         return rc;
1555 }
1556
1557 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1558 {
1559         struct lprocfs_static_vars lvars;
1560         struct lustre_cfg* lcfg = buf;
1561         unsigned long page;
1562         int rc;
1563
1564         CLASSERT(offsetof(struct obd_device, u.obt) ==
1565                  offsetof(struct obd_device, u.filter.fo_obt));
1566
1567         if (!LUSTRE_CFG_BUFLEN(lcfg, 1) || !LUSTRE_CFG_BUFLEN(lcfg, 2))
1568                 RETURN(-EINVAL);
1569
1570         /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */
1571         page = get_zeroed_page(GFP_KERNEL);
1572         if (!page)
1573                 RETURN(-ENOMEM);
1574
1575         memcpy((void *)page, lustre_cfg_buf(lcfg, 4),
1576                LUSTRE_CFG_BUFLEN(lcfg, 4));
1577         rc = filter_common_setup(obd, len, buf, (void *)page);
1578         free_page(page);
1579
1580         lprocfs_init_vars(filter, &lvars);
1581         if (rc == 0 && lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
1582             lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) {
1583                 /* Init obdfilter private stats here */
1584                 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1585                                      LPROCFS_CNTR_AVGMINMAX,
1586                                      "read_bytes", "bytes");
1587                 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1588                                      LPROCFS_CNTR_AVGMINMAX,
1589                                      "write_bytes", "bytes");
1590
1591                 lproc_filter_attach_seqstat(obd);
1592         }
1593
1594         ping_evictor_start();
1595
1596         return rc;
1597 }
1598
1599 static struct llog_operations filter_mds_ost_repl_logops /* initialized below*/;
1600 static struct llog_operations filter_size_orig_logops = {
1601         lop_setup: llog_obd_origin_setup,
1602         lop_cleanup: llog_obd_origin_cleanup,
1603         lop_add: llog_obd_origin_add
1604 };
1605
1606 static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
1607                             int count, struct llog_catid *catid)
1608 {
1609         struct llog_ctxt *ctxt;
1610         int rc;
1611         ENTRY;
1612
1613         filter_mds_ost_repl_logops = llog_client_ops;
1614         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
1615         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
1616         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
1617
1618         rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
1619                         &filter_mds_ost_repl_logops);
1620         if (rc)
1621                 RETURN(rc);
1622
1623         /* FIXME - assign unlink_cb for filter's recovery */
1624         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
1625         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
1626
1627         rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
1628                         &filter_size_orig_logops);
1629         RETURN(rc);
1630 }
1631
1632 static int filter_llog_finish(struct obd_device *obd, int count)
1633 {
1634         struct llog_ctxt *ctxt;
1635         int rc = 0, rc2 = 0;
1636         ENTRY;
1637
1638         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
1639         if (ctxt)
1640                 rc = llog_cleanup(ctxt);
1641
1642         ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT);
1643         if (ctxt)
1644                 rc2 = llog_cleanup(ctxt);
1645         if (!rc)
1646                 rc = rc2;
1647
1648         RETURN(rc);
1649 }
1650
1651 static int filter_precleanup(struct obd_device *obd, int stage)
1652 {
1653         int rc = 0;
1654         ENTRY;
1655
1656         switch(stage) {
1657         case OBD_CLEANUP_EXPORTS:
1658                 target_cleanup_recovery(obd);
1659                 break;
1660         case OBD_CLEANUP_SELF_EXP:
1661                 rc = filter_llog_finish(obd, 0);
1662         }
1663         RETURN(rc);
1664 }
1665
1666 static int filter_cleanup(struct obd_device *obd)
1667 {
1668         struct filter_obd *filter = &obd->u.filter;
1669         lvfs_sbdev_type save_dev;
1670         int must_relock = 0, must_put = 0;
1671         ENTRY;
1672
1673         if (obd->obd_fail)
1674                 CERROR("%s: shutting down for failover; client state will"
1675                        " be preserved.\n", obd->obd_name);
1676
1677         if (!list_empty(&obd->obd_exports)) {
1678                 CERROR("%s: still has clients!\n", obd->obd_name);
1679                 class_disconnect_exports(obd);
1680                 if (!list_empty(&obd->obd_exports)) {
1681                         CERROR("still has exports after forced cleanup?\n");
1682                         RETURN(-EBUSY);
1683                 }
1684         }
1685
1686         ping_evictor_stop();
1687
1688         lquota_cleanup(quota_interface, obd);
1689
1690         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
1691
1692         if (obd->u.obt.obt_sb == NULL)
1693                 RETURN(0);
1694         save_dev = lvfs_sbdev(obd->u.obt.obt_sb);
1695
1696         lprocfs_free_obd_stats(obd);
1697         lprocfs_obd_cleanup(obd);
1698
1699         filter_post(obd);
1700
1701         shrink_dcache_parent(obd->u.obt.obt_sb->s_root);
1702
1703         LL_DQUOT_OFF(obd->u.obt.obt_sb);
1704
1705         must_put = server_put_mount(obd->obd_name, filter->fo_vfsmnt);
1706         /* must_put is for old method (l_p_m returns non-0 on err) */
1707
1708         /* We can only unlock kernel if we are in the context of sys_ioctl,
1709            otherwise we never called lock_kernel */
1710         if (ll_kernel_locked()) {
1711                 unlock_kernel();
1712                 must_relock++;
1713         }
1714         
1715         if (must_put) 
1716                 /* In case we didn't mount with lustre_get_mount -- old method*/
1717                 mntput(filter->fo_vfsmnt);
1718         obd->u.obt.obt_sb = NULL;
1719
1720         lvfs_clear_rdonly(save_dev);
1721
1722         if (must_relock)
1723                 lock_kernel();
1724
1725         fsfilt_put_ops(obd->obd_fsops);
1726
1727         filter_iobuf_pool_done(filter);
1728
1729         LCONSOLE_INFO("OST %s has stopped.\n", obd->obd_name);
1730
1731         RETURN(0);
1732 }
1733
1734 static int filter_connect_internal(struct obd_export *exp,
1735                                    struct obd_connect_data *data)
1736 {
1737         if (data != NULL) {
1738                 CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64
1739                        " ocd_version: %x ocd_grant: %d\n",
1740                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1741                        data->ocd_connect_flags, data->ocd_version,
1742                        data->ocd_grant);
1743
1744                 data->ocd_connect_flags &= OST_CONNECT_SUPPORTED;
1745                 exp->exp_connect_flags = data->ocd_connect_flags;
1746                 data->ocd_version = LUSTRE_VERSION_CODE;
1747
1748                 if (exp->exp_connect_flags & OBD_CONNECT_GRANT) {
1749                         obd_size left, want;
1750
1751                         spin_lock(&exp->exp_obd->obd_osfs_lock);
1752                         left = filter_grant_space_left(exp);
1753                         want = data->ocd_grant;
1754                         data->ocd_grant = filter_grant(exp, 0, want, left);
1755                         spin_unlock(&exp->exp_obd->obd_osfs_lock);
1756
1757                         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %d want: "
1758                                "%lld left: %lld\n", exp->exp_obd->obd_name,
1759                                exp->exp_client_uuid.uuid, exp,
1760                                data->ocd_grant, want, left);
1761                 }
1762         }
1763
1764         RETURN(0);
1765 }
1766
1767 static int filter_reconnect(struct obd_export *exp, struct obd_device *obd,
1768                             struct obd_uuid *cluuid,
1769                             struct obd_connect_data *data)
1770 {
1771         int rc;
1772         ENTRY;
1773
1774         if (exp == NULL || obd == NULL || cluuid == NULL)
1775                 RETURN(-EINVAL);
1776
1777         rc = filter_connect_internal(exp, data);
1778
1779         RETURN(rc);
1780 }
1781
1782 /* nearly identical to mds_connect */
1783 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1784                           struct obd_uuid *cluuid, struct obd_connect_data *data)
1785 {
1786         struct obd_export *exp;
1787         struct filter_export_data *fed;
1788         struct filter_client_data *fcd = NULL;
1789         struct filter_obd *filter = &obd->u.filter;
1790         int rc;
1791         ENTRY;
1792
1793         if (conn == NULL || obd == NULL || cluuid == NULL)
1794                 RETURN(-EINVAL);
1795
1796         rc = class_connect(conn, obd, cluuid);
1797         if (rc)
1798                 RETURN(rc);
1799         exp = class_conn2export(conn);
1800         LASSERT(exp != NULL);
1801
1802         fed = &exp->exp_filter_data;
1803
1804         spin_lock_init(&fed->fed_lock);
1805
1806         rc = filter_connect_internal(exp, data);
1807         if (rc)
1808                 GOTO(cleanup, rc);
1809
1810         if (!obd->obd_replayable)
1811                 GOTO(cleanup, rc = 0);
1812
1813         OBD_ALLOC(fcd, sizeof(*fcd));
1814         if (!fcd) {
1815                 CERROR("filter: out of memory for client data\n");
1816                 GOTO(cleanup, rc = -ENOMEM);
1817         }
1818
1819         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1820         fed->fed_fcd = fcd;
1821
1822         rc = filter_client_add(obd, filter, fed, -1);
1823
1824         GOTO(cleanup, rc);
1825
1826 cleanup:
1827         if (rc) {
1828                 if (fcd) {
1829                         OBD_FREE(fcd, sizeof(*fcd));
1830                         fed->fed_fcd = NULL;
1831                 }
1832                 class_disconnect(exp);
1833         } else {
1834                 class_export_put(exp);
1835         }
1836
1837         RETURN(rc);
1838 }
1839
1840 /* Do extra sanity checks for grant accounting.  We do this at connect,
1841  * disconnect, and statfs RPC time, so it shouldn't be too bad.  We can
1842  * always get rid of it or turn it off when we know accounting is good. */
1843 static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
1844 {
1845         struct filter_export_data *fed;
1846         struct obd_export *exp;
1847         obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
1848         obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
1849         obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
1850
1851         if (list_empty(&obd->obd_exports))
1852                 return;
1853
1854         spin_lock(&obd->obd_osfs_lock);
1855         spin_lock(&obd->obd_dev_lock);
1856         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1857                 int error = 0;
1858                 fed = &exp->exp_filter_data;
1859                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
1860                     fed->fed_dirty < 0)
1861                         error = 1;
1862                 if (maxsize > 0) { /* we may not have done a statfs yet */
1863                         LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
1864                                  "%s: cli %s/%p %ld+%ld > "LPU64"\n", func,
1865                                  exp->exp_client_uuid.uuid, exp,
1866                                  fed->fed_grant, fed->fed_pending, maxsize);
1867                         LASSERTF(fed->fed_dirty <= maxsize,
1868                                  "%s: cli %s/%p %ld > "LPU64"\n", func,
1869                                  exp->exp_client_uuid.uuid, exp,
1870                                  fed->fed_dirty, maxsize);
1871                 }
1872                 if (error)
1873                         CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
1874                                obd->obd_name, exp->exp_client_uuid.uuid, exp,
1875                                fed->fed_dirty, fed->fed_pending,fed->fed_grant);
1876                 else
1877                         CDEBUG(D_CACHE, "%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
1878                                obd->obd_name, exp->exp_client_uuid.uuid, exp,
1879                                fed->fed_dirty, fed->fed_pending,fed->fed_grant);
1880                 tot_granted += fed->fed_grant + fed->fed_pending;
1881                 tot_pending += fed->fed_pending;
1882                 tot_dirty += fed->fed_dirty;
1883         }
1884         fo_tot_granted = obd->u.filter.fo_tot_granted;
1885         fo_tot_pending = obd->u.filter.fo_tot_pending;
1886         fo_tot_dirty = obd->u.filter.fo_tot_dirty;
1887         spin_unlock(&obd->obd_dev_lock);
1888         spin_unlock(&obd->obd_osfs_lock);
1889
1890         /* Do these assertions outside the spinlocks so we don't kill system */
1891         if (tot_granted != fo_tot_granted)
1892                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
1893                        func, tot_granted, fo_tot_granted);
1894         if (tot_pending != fo_tot_pending)
1895                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
1896                        func, tot_pending, fo_tot_pending);
1897         if (tot_dirty != fo_tot_dirty)
1898                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
1899                        func, tot_dirty, fo_tot_dirty);
1900         if (tot_pending > tot_granted)
1901                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
1902                        func, tot_pending, tot_granted);
1903         if (tot_granted > maxsize)
1904                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
1905                        func, tot_granted, maxsize);
1906         if (tot_dirty > maxsize)
1907                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
1908                        func, tot_dirty, maxsize);
1909 }
1910
1911 /* Remove this client from the grant accounting totals.  We also remove
1912  * the export from the obd device under the osfs and dev locks to ensure
1913  * that the filter_grant_sanity_check() calculations are always valid.
1914  * The client should do something similar when it invalidates its import. */
1915 static void filter_grant_discard(struct obd_export *exp)
1916 {
1917         struct obd_device *obd = exp->exp_obd;
1918         struct filter_obd *filter = &obd->u.filter;
1919         struct filter_export_data *fed = &exp->exp_filter_data;
1920
1921         spin_lock(&obd->obd_osfs_lock);
1922         spin_lock(&obd->obd_dev_lock);
1923         list_del_init(&exp->exp_obd_chain);
1924         spin_unlock(&obd->obd_dev_lock);
1925
1926         LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
1927                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
1928                  obd->obd_name, filter->fo_tot_granted,
1929                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
1930         filter->fo_tot_granted -= fed->fed_grant;
1931         LASSERTF(filter->fo_tot_pending >= fed->fed_pending,
1932                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
1933                  obd->obd_name, filter->fo_tot_pending,
1934                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
1935         LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
1936                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
1937                  obd->obd_name, filter->fo_tot_dirty,
1938                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
1939         filter->fo_tot_dirty -= fed->fed_dirty;
1940         fed->fed_dirty = 0;
1941         fed->fed_grant = 0;
1942
1943         spin_unlock(&obd->obd_osfs_lock);
1944 }
1945
1946 static int filter_destroy_export(struct obd_export *exp)
1947 {
1948         ENTRY;
1949
1950         if (exp->exp_filter_data.fed_pending)
1951                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
1952                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1953                        exp, exp->exp_filter_data.fed_pending);
1954
1955         target_destroy_export(exp);
1956
1957         if (exp->exp_obd->obd_replayable)
1958                 filter_client_free(exp);
1959         else
1960                 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb);
1961
1962         filter_grant_discard(exp);
1963
1964         if (!(exp->exp_flags & OBD_OPT_FORCE))
1965                 filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
1966
1967         RETURN(0);
1968 }
1969
1970 /* also incredibly similar to mds_disconnect */
1971 static int filter_disconnect(struct obd_export *exp)
1972 {
1973         struct obd_device *obd = exp->exp_obd;
1974         struct llog_ctxt *ctxt;
1975         int rc, err;
1976         ENTRY;
1977
1978         LASSERT(exp);
1979         class_export_get(exp);
1980
1981         if (!(exp->exp_flags & OBD_OPT_FORCE))
1982                 filter_grant_sanity_check(obd, __FUNCTION__);
1983         filter_grant_discard(exp);
1984
1985         /* Disconnect early so that clients can't keep using export */
1986         rc = class_disconnect(exp);
1987         ldlm_cancel_locks_for_export(exp);
1988
1989         fsfilt_sync(obd, obd->u.obt.obt_sb);
1990
1991         /* flush any remaining cancel messages out to the target */
1992         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
1993         err = llog_sync(ctxt, exp);
1994         if (err)
1995                 CERROR("error flushing logs to MDS: rc %d\n", err);
1996
1997         class_export_put(exp);
1998         RETURN(rc);
1999 }
2000
2001 struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
2002                                   const char *what, int quiet)
2003 {
2004         struct dentry *dchild = NULL;
2005         obd_gr group = 0;
2006
2007         if (oa->o_valid & OBD_MD_FLGROUP)
2008                 group = oa->o_gr;
2009
2010         dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
2011
2012         if (IS_ERR(dchild)) {
2013                 CERROR("%s error looking up object: "LPU64"\n",
2014                        what, oa->o_id);
2015                 RETURN(dchild);
2016         }
2017
2018         if (dchild->d_inode == NULL) {
2019                 if (!quiet)
2020                         CERROR("%s: %s on non-existent object: "LPU64"\n",
2021                                obd->obd_name, what, oa->o_id);
2022                 f_dput(dchild);
2023                 RETURN(ERR_PTR(-ENOENT));
2024         }
2025
2026         return dchild;
2027 }
2028
2029 static int filter_getattr(struct obd_export *exp, struct obdo *oa,
2030                           struct lov_stripe_md *md)
2031 {
2032         struct dentry *dentry = NULL;
2033         struct obd_device *obd;
2034         int rc = 0;
2035         ENTRY;
2036
2037         obd = class_exp2obd(exp);
2038         if (obd == NULL) {
2039                 CDEBUG(D_IOCTL, "invalid client export %p\n", exp);
2040                 RETURN(-EINVAL);
2041         }
2042
2043         dentry = filter_oa2dentry(obd, oa);
2044         if (IS_ERR(dentry))
2045                 RETURN(PTR_ERR(dentry));
2046
2047         /* Limit the valid bits in the return data to what we actually use */
2048         oa->o_valid = OBD_MD_FLID;
2049         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
2050
2051         f_dput(dentry);
2052         RETURN(rc);
2053 }
2054
2055 /* this should be enabled/disabled in condition to enabled/disabled large
2056  * inodes (fast EAs) in backing store FS. */
2057 int filter_update_fidea(struct obd_export *exp, struct inode *inode,
2058                         void *handle, struct obdo *oa)
2059 {
2060         struct obd_device *obd = exp->exp_obd;
2061         int rc = 0;
2062         ENTRY;
2063
2064         if (oa->o_valid & OBD_MD_FLFID) {
2065                 struct filter_fid ff;
2066                 obd_gr group = 0;
2067
2068                 if (oa->o_valid & OBD_MD_FLGROUP)
2069                         group = oa->o_gr;
2070
2071                 /* packing fid and converting it to LE for storing into EA.
2072                  * Here ->o_stripe_idx should be filled by LOV and rest of
2073                  * fields - by client. */
2074                 ff.ff_fid.id = cpu_to_le64(oa->o_fid);
2075                 ff.ff_fid.f_type = cpu_to_le32(oa->o_stripe_idx);
2076                 ff.ff_fid.generation = cpu_to_le32(oa->o_generation);
2077                 ff.ff_objid = cpu_to_le64(oa->o_id);
2078                 ff.ff_group = cpu_to_le64(group);
2079
2080                 CDEBUG(D_INODE, "storing filter fid EA ("LPU64"/%u/%u"
2081                        LPU64"/"LPU64")\n", oa->o_fid, oa->o_stripe_idx,
2082                        oa->o_generation, oa->o_id, group);
2083
2084                 rc = fsfilt_set_md(obd, inode, handle, &ff, sizeof(ff));
2085                 if (rc)
2086                         CERROR("store fid in object failed! rc: %d\n", rc);
2087         } else {
2088                 CDEBUG(D_HA, "OSS object without fid info!\n");
2089         }
2090
2091         RETURN(rc);
2092 }
2093
2094 /* this is called from filter_truncate() until we have filter_punch() */
2095 int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
2096                             struct obdo *oa, struct obd_trans_info *oti)
2097 {
2098         unsigned int orig_ids[MAXQUOTAS] = {0, 0};
2099         struct llog_cookie *fcc = NULL;
2100         struct filter_obd *filter;
2101         int rc, err, locked = 0;
2102         unsigned int ia_valid;
2103         struct inode *inode;
2104         struct iattr iattr;
2105         void *handle;
2106         ENTRY;
2107
2108         LASSERT(dentry != NULL);
2109         LASSERT(!IS_ERR(dentry));
2110
2111         inode = dentry->d_inode;
2112         LASSERT(inode != NULL);
2113
2114         filter = &exp->exp_obd->u.filter;
2115         iattr_from_obdo(&iattr, oa, oa->o_valid);
2116         ia_valid = iattr.ia_valid;
2117
2118         if (oa->o_valid & OBD_MD_FLCOOKIE) {
2119                 OBD_ALLOC(fcc, sizeof(*fcc));
2120                 if (fcc != NULL)
2121                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
2122         }
2123
2124         if (ia_valid & ATTR_SIZE || ia_valid & (ATTR_UID | ATTR_GID)) {
2125                 down(&inode->i_sem);
2126                 locked = 1;
2127         }
2128
2129         /* If the inode still has SUID+SGID bits set (see filter_precreate())
2130          * then we will accept the UID+GID sent by the client during write for
2131          * initializing the ownership of this inode.  We only allow this to
2132          * happen once so clear these bits in setattr. In 2.6 kernels it is
2133          * possible to get ATTR_UID and ATTR_GID separately, so we only clear
2134          * the flags that are actually being set. */
2135         if (ia_valid & (ATTR_UID | ATTR_GID)) {
2136                 CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
2137                        (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);
2138
2139                 if ((inode->i_mode & S_ISUID) && (ia_valid & ATTR_UID)) {
2140                         if (!(ia_valid & ATTR_MODE)) {
2141                                 iattr.ia_mode = inode->i_mode;
2142                                 iattr.ia_valid |= ATTR_MODE;
2143                         }
2144                         iattr.ia_mode &= ~S_ISUID;
2145                 }
2146                 if ((inode->i_mode & S_ISGID) && (ia_valid & ATTR_GID)) {
2147                         if (!(iattr.ia_valid & ATTR_MODE)) {
2148                                 iattr.ia_mode = inode->i_mode;
2149                                 iattr.ia_valid |= ATTR_MODE;
2150                         }
2151                         iattr.ia_mode &= ~S_ISGID;
2152                 }
2153
2154                 orig_ids[USRQUOTA] = inode->i_uid;
2155                 orig_ids[GRPQUOTA] = inode->i_gid;
2156                 handle = fsfilt_start_log(exp->exp_obd, inode,
2157                                           FSFILT_OP_SETATTR, oti, 1);
2158
2159                 /* update inode EA only once when inode is suid bit marked. As
2160                  * on 2.6.x UID and GID may be set separately, we check here
2161                  * only one of them to avoid double setting. */
2162                 if (inode->i_mode & S_ISUID)
2163                         filter_update_fidea(exp, inode, handle, oa);
2164         } else {
2165                 handle = fsfilt_start(exp->exp_obd, inode,
2166                                       FSFILT_OP_SETATTR, oti);
2167         }
2168
2169         if (IS_ERR(handle))
2170                 GOTO(out_unlock, rc = PTR_ERR(handle));
2171
2172         if (oa->o_valid & OBD_MD_FLFLAGS) {
2173                 rc = fsfilt_iocontrol(exp->exp_obd, inode, NULL,
2174                                       EXT3_IOC_SETFLAGS, (long)&oa->o_flags);
2175         } else {
2176                 rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
2177                 if (fcc != NULL)
2178                         /* set cancel cookie callback function */
2179                         fsfilt_add_journal_cb(exp->exp_obd, 0, oti ?
2180                                               oti->oti_handle : handle,
2181                                               filter_cancel_cookies_cb,
2182                                               fcc);
2183         }
2184
2185         if (locked) {
2186                 up(&inode->i_sem);
2187                 locked = 0;
2188         }
2189
2190         rc = filter_finish_transno(exp, oti, rc);
2191         
2192         err = fsfilt_commit(exp->exp_obd, inode, handle, 0);
2193         if (err) {
2194                 CERROR("error on commit, err = %d\n", err);
2195                 if (!rc)
2196                         rc = err;
2197         }
2198         EXIT;
2199 out_unlock:
2200         if (locked)
2201                 up(&inode->i_sem);
2202
2203         /* trigger quota release */
2204         if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) {
2205                 unsigned int cur_ids[MAXQUOTAS] = {oa->o_uid, oa->o_gid};
2206                 int rc2 = lquota_adjust(quota_interface, exp->exp_obd, cur_ids,
2207                                         orig_ids, rc, FSFILT_OP_SETATTR);
2208                 CDEBUG(rc2 ? D_ERROR : D_QUOTA, 
2209                        "filter adjust qunit. (rc:%d)\n", rc2);
2210         }
2211         return rc;
2212 }
2213
2214 /* this is called from filter_truncate() until we have filter_punch() */
2215 int filter_setattr(struct obd_export *exp, struct obdo *oa,
2216                    struct lov_stripe_md *md, struct obd_trans_info *oti)
2217 {
2218         struct ldlm_res_id res_id = { .name = { oa->o_id } };
2219         struct ldlm_valblock_ops *ns_lvbo;
2220         struct lvfs_run_ctxt saved;
2221         struct filter_obd *filter;
2222         struct ldlm_resource *res;
2223         struct dentry *dentry;
2224         int rc;
2225         ENTRY;
2226
2227         dentry = __filter_oa2dentry(exp->exp_obd, oa,
2228                                     __FUNCTION__, 1);
2229         if (IS_ERR(dentry))
2230                 RETURN(PTR_ERR(dentry));
2231                         
2232         filter = &exp->exp_obd->u.filter;
2233         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2234         lock_kernel();
2235
2236         /* setting objects attributes (including owner/group) */
2237         rc = filter_setattr_internal(exp, dentry, oa, oti);
2238         if (rc)
2239                 GOTO(out_unlock, rc);
2240
2241         res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
2242                                 res_id, LDLM_EXTENT, 0);
2243         
2244         if (res != NULL) {
2245                 ns_lvbo = res->lr_namespace->ns_lvbo;
2246                 if (ns_lvbo && ns_lvbo->lvbo_update)
2247                         rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
2248                 ldlm_resource_putref(res);
2249         }
2250
2251         oa->o_valid = OBD_MD_FLID;
2252         
2253         /* Quota release need uid/gid info */
2254         obdo_from_inode(oa, dentry->d_inode,
2255                         FILTER_VALID_FLAGS | OBD_MD_FLUID | OBD_MD_FLGID);
2256
2257         EXIT;
2258 out_unlock:
2259         unlock_kernel();
2260         f_dput(dentry);
2261         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2262         return rc;
2263 }
2264
2265 /* XXX identical to osc_unpackmd */
/*
 * Validate an optional packed lov_mds_md and allocate, fill or free the
 * single-stripe in-memory lov_stripe_md behind *lsmp.
 *
 * exp:       not referenced in this function.
 * lsmp:      NULL to only query the in-memory size; otherwise in/out pointer.
 * lmm:       optional packed descriptor; when given it must be at least
 *            sizeof(*lmm) bytes long and carry a non-zero lmm_object_id.
 * lmm_bytes: size of the buffer behind lmm.
 *
 * Returns lov_stripe_md_size(1) on success (also for the size-only query),
 * 0 after freeing *lsmp (lmm == NULL while *lsmp is set), -EINVAL for a
 * rejected lmm, or -ENOMEM if the allocation fails.
 */
2266 static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2267                            struct lov_mds_md *lmm, int lmm_bytes)
2268 {
2269         int lsm_size;
2270         ENTRY;
2271
             /* sanity-check the packed descriptor before touching *lsmp */
2272         if (lmm != NULL) {
2273                 if (lmm_bytes < sizeof (*lmm)) {
2274                         CERROR("lov_mds_md too small: %d, need %d\n",
2275                                lmm_bytes, (int)sizeof(*lmm));
2276                         RETURN(-EINVAL);
2277                 }
2278                 /* XXX LOV_MAGIC etc check? */
2279
2280                 if (lmm->lmm_object_id == cpu_to_le64(0)) {
2281                         CERROR("lov_mds_md: zero lmm_object_id\n");
2282                         RETURN(-EINVAL);
2283                 }
2284         }
2285
             /* size of an lsm with exactly one stripe */
2286         lsm_size = lov_stripe_md_size(1);
2287         if (lsmp == NULL)
2288                 RETURN(lsm_size);
2289
             /* an existing lsm with nothing to unpack is a request to free it */
2290         if (*lsmp != NULL && lmm == NULL) {
2291                 OBD_FREE(*lsmp, lsm_size);
2292                 *lsmp = NULL;
2293                 RETURN(0);
2294         }
2295
2296         if (*lsmp == NULL) {
2297                 OBD_ALLOC(*lsmp, lsm_size);
2298                 if (*lsmp == NULL)
2299                         RETURN(-ENOMEM);
2300
2301                 loi_init((*lsmp)->lsm_oinfo);
2302         }
2303
2304         if (lmm != NULL) {
2305                 /* XXX zero *lsmp? */
2306                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
2307                 LASSERT((*lsmp)->lsm_object_id);
2308         }
2309
2310         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
2311
2312         RETURN(lsm_size);
2313 }
2314
/*
 * Destroy every object with an id in (oa->o_id, filter_last_id()] and then
 * record oa->o_id as the last object id of the group.
 *
 * exp:    export passed through to filter_destroy(); also used for the
 *         device name in log messages.
 * oa:     must not be NULL; o_id is the last id to keep, and o_gr is used
 *         as the group when OBD_MD_FLGROUP is set in o_valid (else group 0).
 * filter: filter state whose fo_create_lock, fo_objidlock,
 *         fo_destroy_in_progress and fo_last_objids[] are used.
 *
 * fo_destroy_in_progress is raised before fo_create_lock is taken;
 * filter_precreate() in this file tests that flag inside its creation loop
 * and stops when it is set.
 *
 * NOTE(review): the return value of each filter_destroy() call is ignored.
 */
2315 static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
2316                                       struct filter_obd *filter)
2317 {
2318         struct obdo doa; /* XXX obdo on stack */
2319         __u64 last, id;
2320         ENTRY;
2321         LASSERT(oa);
2322
             /* build a scratch obdo describing the objects to destroy */
2323         memset(&doa, 0, sizeof(doa));
2324         if (oa->o_valid & OBD_MD_FLGROUP) {
2325                 doa.o_valid |= OBD_MD_FLGROUP;
2326                 doa.o_gr = oa->o_gr;
2327         } else {
2328                 doa.o_gr = 0;
2329         }
2330         doa.o_mode = S_IFREG;
2331
2332         filter->fo_destroy_in_progress = 1;
2333         down(&filter->fo_create_lock);
             /* the flag was cleared while we waited for the lock: nothing to do */
2334         if (!filter->fo_destroy_in_progress) {
2335                 CERROR("%s: destroy_in_progress already cleared\n",
2336                         exp->exp_obd->obd_name);
2337                 up(&filter->fo_create_lock);
2338                 EXIT;
2339                 return;
2340         }
2341
2342         last = filter_last_id(filter, &doa);
2343         CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
2344                exp->exp_obd->obd_name, oa->o_id + 1, last);
2345         for (id = oa->o_id + 1; id <= last; id++) {
2346                 doa.o_id = id;
2347                 filter_destroy(exp, &doa, NULL, NULL, NULL);
2348         }
2349
2350         CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
2351                exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
2352
             /* in-memory last id only; the caller in this file writes it out */
2353         spin_lock(&filter->fo_objidlock);
2354         filter->fo_last_objids[doa.o_gr] = oa->o_id;
2355         spin_unlock(&filter->fo_objidlock);
2356
2357         filter->fo_destroy_in_progress = 0;
2358         up(&filter->fo_create_lock);
2359
2360         EXIT;
2361 }
2362
2363 /* returns a negative error or a nonnegative number of files to create */
/*
 * Decide how many objects a create request should precreate, and handle the
 * "delete orphans" variant of the request.
 *
 * exp:   export; exp->exp_obd supplies the device and its filter state.
 * oa:    request attributes; o_id is compared with filter_last_id().
 * group: object group, used for filter_update_last_objid() and for the
 *        group/o_id shortcut in the non-orphan branch.
 *
 * With OBD_FL_DELORPHAN set in o_flags (and OBD_MD_FLFLAGS valid):
 *   - diff >= 0: returns diff;
 *   - -diff > OST_MAX_PRECREATE: logs and returns -EINVAL;
 *   - otherwise destroys the objects above oa->o_id, tries to write the
 *     last object id (a failure is only logged) and returns 0.
 * Otherwise returns 1 for the group/o_id shortcut, else asserts diff >= 0
 * and returns diff.
 *
 * NOTE(review): the non-orphan branch tests oa->o_valid against
 * OBD_FL_DELORPHAN, while the branch above tests that flag in oa->o_flags;
 * this looks like a mixed-up field -- confirm against the flag definitions.
 * NOTE(review): the "unable to write lastobjid" message concatenates
 * "orphans" and "were deleted" without a space.
 * NOTE(review): diff is an int computed from the id difference; presumably
 * the ids are 64-bit, so large gaps would be truncated -- confirm.
 */
2364 static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
2365                                    obd_gr group)
2366 {
2367         struct obd_device *obd = exp->exp_obd;
2368         struct filter_obd *filter = &obd->u.filter;
2369         int diff, rc;
2370         ENTRY;
2371
2372         diff = oa->o_id - filter_last_id(filter, oa);
2373         CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
2374                filter_last_id(filter, oa), diff);
2375
2376         /* delete orphans request */
2377         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2378             (oa->o_flags & OBD_FL_DELORPHAN)) {
2379                 if (diff >= 0)
2380                         RETURN(diff);
2381                 if (-diff > OST_MAX_PRECREATE) {
2382                         CERROR("%s: ignoring bogus orphan destroy request: "
2383                                "obdid "LPU64" last_id "LPU64"\n", obd->obd_name,
2384                                oa->o_id, filter_last_id(filter, oa));
2385                         RETURN(-EINVAL);
2386                 }
2387                 filter_destroy_precreated(exp, oa, filter);
2388                 rc = filter_update_last_objid(obd, group, 0);
2389                 if (rc)
2390                         CERROR("%s: unable to write lastobjid, but orphans"
2391                                "were deleted\n", obd->obd_name);
2392                 RETURN(0);
2393         } else {
2394                 /* only precreate if group == 0 and o_id is specified */
2395                 if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
2396                     (group != 0 || oa->o_id == 0))
2397                         RETURN(1);
2398
2399                 LASSERTF(diff >= 0,"%s: "LPU64" - "LPU64" = %d\n",obd->obd_name,
2400                          oa->o_id, filter_last_id(filter, oa), diff);
2401                 RETURN(diff);
2402         }
2403 }
2404
/*
 * Fill *osfs with the device's filesystem statistics, adjusted for space the
 * filter has already promised.
 *
 * obd:     device; obd->u.obt.obt_sb is the backing super block.
 * osfs:    output; overwritten with a copy of obd->obd_osfs and then
 *          adjusted (os_bavail, os_state).
 * max_age: passed unchanged to fsfilt_statfs().
 *
 * Returns the result of fsfilt_statfs().  The copy into *osfs and the
 * adjustments below are performed regardless of that result.
 */
2405 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2406                          unsigned long max_age)
2407 {
2408         struct filter_obd *filter = &obd->u.filter;
2409         int blockbits = obd->u.obt.obt_sb->s_blocksize_bits;
2410         int rc;
2411         ENTRY;
2412
2413         /* at least try to account for cached pages.  it's still racy and
2414          * might be under-reporting if clients haven't announced their
2415          * caches with brw recently */
2416         spin_lock(&obd->obd_osfs_lock);
2417         rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
2418         memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
2419         spin_unlock(&obd->obd_osfs_lock);
2420
2421         CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
2422                " pending "LPU64" free "LPU64" avail "LPU64"\n",
2423                filter->fo_tot_dirty, filter->fo_tot_granted,
2424                filter->fo_tot_pending,
2425                osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
2426
2427         filter_grant_sanity_check(obd, __FUNCTION__);
2428
             /* subtract the llog reservation plus dirty and pending bytes
              * (rounded up to whole blocks); min() keeps os_bavail from
              * going below zero */
2429         osfs->os_bavail -= min(osfs->os_bavail, GRANT_FOR_LLOG(obd) +
2430                                ((filter->fo_tot_dirty + filter->fo_tot_pending +
2431                                  osfs->os_bsize - 1) >> blockbits));
2432
2433         /* set EROFS to state field if FS is mounted as RDONLY. The goal is to
2434          * stop creating files on MDS if OST is not good shape to create
2435          * objects.*/
2436         osfs->os_state = (filter->fo_obt.obt_sb->s_flags & MS_RDONLY) ? 
2437                 EROFS : 0;
2438         RETURN(rc);
2439 }
2440
2441 /* We rely on the fact that only one thread will be creating files in a given
2442  * group at a time, which is why we don't need an atomic filter_get_new_id.
2443  * Even if we had that atomic function, the following race would exist:
2444  *
2445  * thread 1: gets id x from filter_next_id
2446  * thread 2: gets id (x + 1) from filter_next_id
2447  * thread 2: creates object (x + 1)
2448  * thread 1: tries to create object x, gets -ENOSPC
2449  */
/*
 * Create up to *num new objects in @group while holding fo_create_lock, or
 * re-create the single object oa->o_id when OBD_FL_RECREATE_OBJS is set in
 * oa->o_flags (with OBD_MD_FLFLAGS valid).
 *
 * obd:   device whose filter state (obd->u.filter) is used.
 * oa:    supplies o_valid/o_flags and, for recreate, the object id; it is
 *        also passed to filter_last_id()/filter_set_last_id().
 * group: object group passed to the parent-lock, lookup and last-objid
 *        helpers.
 * num:   in: number of objects wanted; out: final value of the loop counter
 *        (set to 0 directly on the out-of-space path).
 *
 * Returns 0 or a negative errno: -ENOMEM, -ENOSPC, -EINVAL, -EEXIST, or
 * whatever the statfs/lock/lookup/transaction/create helpers return.
 *
 * NOTE(review): when the loop stops on the time limit, the break happens
 * before i++, so *num appears to be one less than the number of objects
 * actually created -- confirm whether any caller relies on *num.
 * NOTE(review): a filter_update_last_objid() failure is stored only in err,
 * which case 3 of the cleanup switch then overwrites with the
 * fsfilt_commit() result; rc is not updated for it -- confirm intended.
 * NOTE(review): the "recreate obj greater" message concatenates "greater"
 * and "than" without a space.
 */
2450 static int filter_precreate(struct obd_device *obd, struct obdo *oa,
2451                             obd_gr group, int *num)
2452 {
2453         struct dentry *dchild = NULL, *dparent = NULL;
2454         struct filter_obd *filter;
2455         struct obd_statfs *osfs;
2456         int err = 0, rc = 0, recreate_obj = 0, i;
             /* stop creating once a third of obd_timeout has elapsed */
2457         unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3;
2458         __u64 next_id;
2459         void *handle = NULL;
2460         ENTRY;
2461
2462         filter = &obd->u.filter;
2463
2464         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2465             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
2466                 recreate_obj = 1;
2467         } else {
                     /* normal precreate: refuse when fewer than 1/1024 of the
                      * blocks are reported available */
2468                 OBD_ALLOC(osfs, sizeof(*osfs));
2469                 if (osfs == NULL)
2470                         RETURN(-ENOMEM);
2471                 rc = filter_statfs(obd, osfs, jiffies - HZ);
2472                 if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
2473                         CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
2474                               osfs->os_bavail<<filter->fo_obt.obt_sb->s_blocksize_bits);
2475                         *num=0;
2476                         rc = -ENOSPC;
2477                 }
2478                 OBD_FREE(osfs, sizeof(*osfs));
2479                 if (rc) {
2480                         RETURN(rc);
2481                 }
2482         }
2483
2484         CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
2485
2486         down(&filter->fo_create_lock);
2487
2488         for (i = 0; i < *num && err == 0; i++) {
                     /* how far this iteration got; drives the switch at cleanup: */
2489                 int cleanup_phase = 0;
2490
2491                 if (filter->fo_destroy_in_progress) {
2492                         CWARN("%s: precreate aborted by destroy\n",
2493                               obd->obd_name);
2494                         break;
2495                 }
2496
2497                 if (recreate_obj) {
2498                         __u64 last_id;
2499                         next_id = oa->o_id;
2500                         last_id = filter_last_id(filter, oa);
2501                         if (next_id > last_id) {
2502                                 CERROR("Error: Trying to recreate obj greater"
2503                                        "than last id "LPD64" > "LPD64"\n",
2504                                        next_id, last_id);
2505                                 GOTO(cleanup, rc = -EINVAL);
2506                         }
2507                 } else
2508                         next_id = filter_last_id(filter, oa) + 1;
2509
2510                 CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
2511
2512                 dparent = filter_parent_lock(obd, group, next_id);
2513                 if (IS_ERR(dparent))
2514                         GOTO(cleanup, rc = PTR_ERR(dparent));
2515                 cleanup_phase = 1;
2516
2517                 dchild = filter_fid2dentry(obd, dparent, group, next_id);
2518                 if (IS_ERR(dchild))
2519                         GOTO(cleanup, rc = PTR_ERR(dchild));
2520                 cleanup_phase = 2;
2521
2522                 if (dchild->d_inode != NULL) {
2523                         /* This would only happen if lastobjid was bad on disk*/
2524                         /* Could also happen if recreating missing obj but
2525                          * already exists
2526                          */
2527                         if (recreate_obj) {
2528                                 CERROR("%s: recreating existing object %.*s?\n",
2529                                        obd->obd_name, dchild->d_name.len,
2530                                        dchild->d_name.name);
2531                         } else {
2532                                 CERROR("%s: Serious error: objid %.*s already "
2533                                        "exists; is this filesystem corrupt?\n",
2534                                        obd->obd_name, dchild->d_name.len,
2535                                        dchild->d_name.name);
2536                                 LBUG();
2537                         }
2538                         GOTO(cleanup, rc = -EEXIST);
2539                 }
2540
2541                 handle = fsfilt_start_log(obd, dparent->d_inode,
2542                                           FSFILT_OP_CREATE, NULL, 1);
2543                 if (IS_ERR(handle))
2544                         GOTO(cleanup, rc = PTR_ERR(handle));
2545                 cleanup_phase = 3;
2546
2547                 rc = ll_vfs_create(dparent->d_inode, dchild,
2548                                    S_IFREG |  S_ISUID | S_ISGID | 0666, NULL);
2549                 if (rc) {
2550                         CERROR("create failed rc = %d\n", rc);
2551                         GOTO(cleanup, rc);
2552                 }
2553
                     /* a recreated object does not advance the last id */
2554                 if (!recreate_obj) {
2555                         filter_set_last_id(filter, oa, next_id);
2556                         err = filter_update_last_objid(obd, group, 0);
2557                         if (err)
2558                                 CERROR("unable to write lastobjid "
2559                                        "but file created\n");
2560                 }
2561
                     /* per-iteration unwind; each case deliberately falls
                      * through to the ones below it */
2562         cleanup:
2563                 switch(cleanup_phase) {
2564                 case 3:
2565                         err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2566                         if (err) {
2567                                 CERROR("error on commit, err = %d\n", err);
2568                                 if (!rc)
2569                                         rc = err;
2570                         }
2571                 case 2:
2572                         f_dput(dchild);
2573                 case 1:
2574                         filter_parent_unlock(dparent);
2575                 case 0:
2576                         break;
2577                 }
2578
2579                 if (rc)
2580                         break;
2581                 if (time_after(jiffies, enough_time)) {
2582                         CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n",
2583                                obd->obd_name, *num, i);
2584                         break;
2585                 }
2586         }
2587         *num = i;
2588
2589         up(&filter->fo_create_lock);
2590
2591         CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
2592                obd->obd_name, group, filter->fo_last_objids[group]);
2593
2594         CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
2595                obd->obd_name, i);
2596         RETURN(rc);
2597 }
2598
/*
 * Create entry point wired into the obd_ops tables below (.o_create):
 * recreate one object, handle an orphan-delete request, or precreate
 * objects up to oa->o_id.
 *
 * exp: export; exp->exp_obd is the device operated on.
 * oa:  in/out attributes.  After a precreate, o_id is set to the current
 *      last id and o_valid is reset to OBD_MD_FLID.
 * ea:  optional in/out stripe md.  If ea != NULL and *ea == NULL one is
 *      allocated with obd_alloc_memmd(); on success it receives oa->o_id
 *      and is stored in *ea, on failure a locally allocated one is freed.
 * oti: not referenced in this function.
 *
 * Returns 0 or a negative errno.
 *
 * NOTE(review): a negative value returned by filter_should_precreate() is
 * only stored in diff; rc stays 0 in that case, so the error is not
 * returned to the caller -- confirm whether that is intended.
 */
2599 static int filter_create(struct obd_export *exp, struct obdo *oa,
2600                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
2601 {
2602         struct obd_device *obd = NULL;
2603         struct lvfs_run_ctxt saved;
2604         struct lov_stripe_md *lsm = NULL;
2605         obd_gr group = 0;
2606         int rc = 0, diff;
2607         ENTRY;
2608
2609         if (oa->o_valid & OBD_MD_FLGROUP)
2610                 group = oa->o_gr;
2611
2612         CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
2613                group, oa->o_id);
2614         if (ea != NULL) {
2615                 lsm = *ea;
2616                 if (lsm == NULL) {
2617                         rc = obd_alloc_memmd(exp, &lsm);
2618                         if (rc < 0)
2619                                 RETURN(rc);
2620                 }
2621         }
2622
2623         obd = exp->exp_obd;
2624         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2625
2626         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2627             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
                     /* recreate exactly one object, which must not lie
                      * beyond the current last id */
2628                 if (oa->o_id > filter_last_id(&obd->u.filter, oa)) {
2629                         CERROR("recreate objid "LPU64" > last id "LPU64"\n",
2630                                oa->o_id, filter_last_id(&obd->u.filter, oa));
2631                         rc = -EINVAL;
2632                 } else {
2633                         diff = 1;
2634                         rc = filter_precreate(obd, oa, group, &diff);
2635                 }
2636         } else {
2637                 diff = filter_should_precreate(exp, oa, group);
2638                 if (diff > 0) {
2639                         oa->o_id = filter_last_id(&obd->u.filter, oa);
2640                         rc = filter_precreate(obd, oa, group, &diff);
                             /* report the last id now on record back to the caller */
2641                         oa->o_id = filter_last_id(&obd->u.filter, oa);
2642                         oa->o_valid = OBD_MD_FLID;
2643                 }
2644         }
2645
2646         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
             /* *ea != lsm means the lsm was allocated above, not supplied */
2647         if (rc && ea != NULL && *ea != lsm) {
2648                 obd_free_memmd(exp, &lsm);
2649         } else if (rc == 0 && ea != NULL) {
2650                 /* XXX LOV STACKING: the lsm that is passed to us from
2651                  * LOV does not have valid lsm_oinfo data structs, so
2652                  * don't go touching that.  This needs to be fixed in a
2653                  * big way. */
2654                 lsm->lsm_object_id = oa->o_id;
2655                 *ea = lsm;
2656         }
2657
2658         RETURN(rc);
2659 }
2660
/*
 * Destroy the object named by oa->o_id (in group oa->o_gr when
 * OBD_MD_FLGROUP is valid, else group 0).
 *
 * Sequence visible here:
 *   1. look the object up; if it has no inode, cancel the llog cookie (when
 *      OBD_MD_FLCOOKIE is set) and fail with -ENOENT;
 *   2. copy the llog cookie into a freshly allocated buffer (fcc);
 *   3. truncate the object to size 0 in its own transaction, under i_sem;
 *   4. lock the parent, start the unlink transaction, copy uid/gid into oa
 *      and call filter_destroy_internal();
 *   5. unwind according to cleanup_phase, then call lquota_adjust().
 *
 * exp:    export; exp->exp_obd is the device.
 * oa:     object to destroy; o_uid/o_gid are filled in on the path that
 *         reaches the unlink transaction.
 * md:     not referenced in this function.
 * oti:    passed to fsfilt_start_log() and filter_finish_transno(); its
 *         oti_handle is preferred for the journal callback when non-NULL.
 * md_exp: not referenced in this function.
 *
 * Returns 0 or a negative errno.
 *
 * NOTE(review): fcc allocated in step 2 is handed to
 * fsfilt_add_journal_cb() only in cleanup case 4.  The error paths that
 * jump to cleanup after the allocation with cleanup_phase 2 or 3 show no
 * release of it here -- looks like a leak unless it is freed elsewhere.
 * NOTE(review): qcids[] is taken from oa->o_uid/o_gid even on paths where
 * obdo_from_inode() was never reached -- confirm what callers place there.
 */
2661 int filter_destroy(struct obd_export *exp, struct obdo *oa,
2662                    struct lov_stripe_md *md, struct obd_trans_info *oti,
2663                    struct obd_export *md_exp)
2664 {
2665         unsigned int qcids[MAXQUOTAS] = {0, 0};
2666         struct obd_device *obd;
2667         struct filter_obd *filter;
2668         struct dentry *dchild = NULL, *dparent;
2669         struct lvfs_run_ctxt saved;
2670         void *handle = NULL;
2671         struct llog_cookie *fcc = NULL;
2672         int rc, rc2, cleanup_phase = 0;
2673         obd_gr group = 0;
2674         struct iattr iattr;
2675         ENTRY;
2676
2677         if (oa->o_valid & OBD_MD_FLGROUP)
2678                 group = oa->o_gr;
2679
2680         obd = exp->exp_obd;
2681         filter = &obd->u.filter;
2682
2683         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2684         cleanup_phase = 1;
2685
2686         dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
2687         if (IS_ERR(dchild))
2688                 GOTO(cleanup, rc = PTR_ERR(dchild));
2689         cleanup_phase = 2;
2690
2691         if (dchild->d_inode == NULL) {
2692                 CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n",
2693                        oa->o_id);
2694                 /* If object already gone, cancel cookie right now */
2695                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
                             /* here fcc points into oa; it is not a separate
                              * allocation */
2696                         fcc = obdo_logcookie(oa);
2697                         llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1),
2698                                     NULL, 1, fcc, 0);
2699                 }
2700                 GOTO(cleanup, rc = -ENOENT);
2701         }
2702
2703         filter_prepare_destroy(obd, oa->o_id);
2704
2705         /* Our MDC connection is established by the MDS to us */
2706         if (oa->o_valid & OBD_MD_FLCOOKIE) {
                     /* private copy of the cookie; an allocation failure
                      * leaves fcc NULL and the destroy continues */
2707                 OBD_ALLOC(fcc, sizeof(*fcc));
2708                 if (fcc != NULL)
2709                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
2710         }
2711
2712         /* we're gonna truncate it first in order to avoid possible deadlock:
2713          *      P1                      P2
2714          * open transaction     open transaction
2715          * down(i_zombie)       down(i_zombie)
2716          *                      restart transaction
2717          * (see BUG 4180) -bzzz
2718          */
2719         down(&dchild->d_inode->i_sem);
2720         handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR,
2721                                   NULL, 1);
2722         if (IS_ERR(handle)) {
2723                 up(&dchild->d_inode->i_sem);
2724                 GOTO(cleanup, rc = PTR_ERR(handle));
2725         }
2726
2727         iattr.ia_valid = ATTR_SIZE;
2728         iattr.ia_size = 0;
2729         rc = fsfilt_setattr(obd, dchild, handle, &iattr, 1);
             /* the truncate transaction is committed even if setattr failed */
2730         rc2 = fsfilt_commit(obd, dchild->d_inode, handle, 0);
2731         up(&dchild->d_inode->i_sem);
2732         if (rc)
2733                 GOTO(cleanup, rc);
2734         if (rc2)
2735                 GOTO(cleanup, rc = rc2);
2736
2737         /* We don't actually need to lock the parent until we are unlinking
2738          * here, and not while truncating above.  That avoids holding the
2739          * parent lock for a long time during truncate, which can block other
2740          * threads from doing anything to objects in that directory. bug 7171 */
2741         dparent = filter_parent_lock(obd, group, oa->o_id);
2742         if (IS_ERR(dparent))
2743                 GOTO(cleanup, rc = PTR_ERR(dparent));
2744         cleanup_phase = 3; /* filter_parent_unlock */
2745
2746         down(&dchild->d_inode->i_sem);
2747         handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_UNLINK,oti,1);
2748         if (IS_ERR(handle)) {
2749                 up(&dchild->d_inode->i_sem);
2750                 GOTO(cleanup, rc = PTR_ERR(handle));
2751         }
2752         cleanup_phase = 4; /* fsfilt_commit */
2753
2754         /* Quota release need uid/gid of inode */
2755         obdo_from_inode(oa, dchild->d_inode, OBD_MD_FLUID|OBD_MD_FLGID);
2756
2757         /* this drops dchild->d_inode->i_sem unconditionally */
2758         rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
2759
2760         EXIT;
             /* unwind in reverse order of acquisition; cases fall through */
2761 cleanup:
2762         switch(cleanup_phase) {
2763         case 4:
2764                 if (fcc != NULL) {
2765                         fsfilt_add_journal_cb(obd, 0,
2766                                               oti ? oti->oti_handle : handle,
2767                                               filter_cancel_cookies_cb, fcc);
2768                 }
2769                 rc = filter_finish_transno(exp, oti, rc);
2770                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2771                 if (rc2) {
2772                         CERROR("error on commit, err = %d\n", rc2);
2773                         if (!rc)
2774                                 rc = rc2;
2775                 }
2776         case 3:
2777                 filter_parent_unlock(dparent);
2778         case 2:
2779                 f_dput(dchild);
2780         case 1:
2781                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2782                 break;
2783         default:
2784                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2785                 LBUG();
2786         }
2787
2788         /* trigger quota release */
2789         qcids[USRQUOTA] = oa->o_uid;
2790         qcids[GRPQUOTA] = oa->o_gid;
2791         rc2 = lquota_adjust(quota_interface, obd, qcids, NULL, rc,
2792                             FSFILT_OP_UNLINK); 
2793         CDEBUG(rc2 ? D_ERROR : D_QUOTA, 
2794                "filter adjust qunit! (rc:%d)\n", rc2);
2795         return rc;
2796 }
2797
2798 /* NB start and end are used for punch, but not truncate */
/*
 * Truncate an object to @start bytes by setting oa->o_size and delegating
 * to filter_setattr().  Wired into the obd_ops tables below as .o_punch.
 *
 * exp:   export passed on to filter_setattr().
 * oa:    object attributes; o_size is overwritten with @start.
 * lsm:   not referenced in this function.
 * start: new object size.
 * end:   must be OBD_OBJECT_EOF; any other value (a real punch) is
 *        rejected.
 * oti:   passed on to filter_setattr().
 *
 * Returns -EFAULT for an unsupported punch, otherwise the result of
 * filter_setattr().
 */
2799 static int filter_truncate(struct obd_export *exp, struct obdo *oa,
2800                            struct lov_stripe_md *lsm, obd_off start,
2801                            obd_off end, struct obd_trans_info *oti)
2802 {
2803         int rc;
2804         ENTRY;
2805
2806         if (end != OBD_OBJECT_EOF) {
2807                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2808                        end);
2809                 RETURN(-EFAULT);
2810         }
2811
2812         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = "LPX64
2813                ", o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2814         
2815         oa->o_size = start;
2816         rc = filter_setattr(exp, oa, NULL, oti);
2817         RETURN(rc);
2818 }
2819
/*
 * Flush either the whole backing filesystem or a single object to disk.
 *
 * exp:   export; exp->exp_obd is the device.
 * oa:    NULL, or without OBD_MD_FLID in o_valid, selects a whole-filesystem
 *        sync followed by llog_sync() on the LLOG_MDS_OST_REPL_CTXT context.
 *        Otherwise it names the object to sync and is refreshed from the
 *        inode afterwards (o_valid is reset to OBD_MD_FLID first).
 * lsm, start, end: not referenced in this function.
 *
 * Returns the result of fsfilt_sync() in the whole-filesystem case; for a
 * single object, the first error from filemap_fdatawrite(), the fsync
 * method or filemap_fdatawait(), or PTR_ERR() of a failed lookup.
 *
 * NOTE(review): dentry->d_inode is dereferenced without a NULL check here;
 * presumably filter_oa2dentry() never returns a negative dentry -- confirm.
 */
2820 static int filter_sync(struct obd_export *exp, struct obdo *oa,
2821                        struct lov_stripe_md *lsm, obd_off start, obd_off end)
2822 {
2823         struct lvfs_run_ctxt saved;
2824         struct filter_obd *filter;
2825         struct dentry *dentry;
2826         struct llog_ctxt *ctxt;
2827         int rc, rc2;
2828         ENTRY;
2829
2830         filter = &exp->exp_obd->u.filter;
2831
2832         /* an objid of zero is taken to mean "sync whole filesystem" */
2833         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
2834                 rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
2835                 /* flush any remaining cancel messages out to the target */
2836                 ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT);
2837                 llog_sync(ctxt, exp);
2838                 RETURN(rc);
2839         }
2840
2841         dentry = filter_oa2dentry(exp->exp_obd, oa);
2842         if (IS_ERR(dentry))
2843                 RETURN(PTR_ERR(dentry));
2844
2845         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2846
2847         down(&dentry->d_inode->i_sem);
2848         rc = filemap_fdatawrite(dentry->d_inode->i_mapping);
2849         if (rc == 0) {
2850                 /* just any file to grab fsync method - "file" arg unused */
2851                 struct file *file = filter->fo_rcvd_filp;
2852
2853                 if (file->f_op && file->f_op->fsync)
2854                         rc = file->f_op->fsync(NULL, dentry, 1);
2855
                     /* the wait runs even if fsync failed; the first error wins */
2856                 rc2 = filemap_fdatawait(dentry->d_inode->i_mapping);
2857                 if (!rc)
2858                         rc = rc2;
2859         }
2860         up(&dentry->d_inode->i_sem);
2861
2862         oa->o_valid = OBD_MD_FLID;
2863         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
2864
2865         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2866
2867         f_dput(dentry);
2868         RETURN(rc);
2869 }
2870
/*
 * Answer a key/value query about the device.
 *
 * exp:    export; resolved to its device with class_exp2obd().
 * keylen: length of @key in bytes.
 * key:    "blocksize", "blocksize_bits", or a key starting with "last_id".
 * vallen: out: set to the value size for the two blocksize keys.
 * val:    out buffer; written as __u32 for the blocksize keys and as
 *         obd_id for "last_id".
 *
 * Returns 0 on success, -EINVAL for an unresolvable export or unknown key.
 *
 * NOTE(review): the "last_id" branch does not set *vallen, accepts any key
 * that merely begins with "last_id", and passes 0 as the second argument of
 * filter_last_id() (an obdo pointer at the other call sites in this file)
 * -- confirm each of these is intended.
 */
2871 static int filter_get_info(struct obd_export *exp, __u32 keylen,
2872                            void *key, __u32 *vallen, void *val)
2873 {
2874         struct obd_device *obd;
2875         ENTRY;
2876
2877         obd = class_exp2obd(exp);
2878         if (obd == NULL) {
2879                 CDEBUG(D_IOCTL, "invalid client export %p\n", exp);
2880                 RETURN(-EINVAL);
2881         }
2882
2883         if (keylen == strlen("blocksize") &&
2884             memcmp(key, "blocksize", keylen) == 0) {
2885                 __u32 *blocksize = val;
2886                 *vallen = sizeof(*blocksize);
2887                 *blocksize = obd->u.obt.obt_sb->s_blocksize;
2888                 RETURN(0);
2889         }
2890
2891         if (keylen == strlen("blocksize_bits") &&
2892             memcmp(key, "blocksize_bits", keylen) == 0) {
2893                 __u32 *blocksize_bits = val;
2894                 *vallen = sizeof(*blocksize_bits);
2895                 *blocksize_bits = obd->u.obt.obt_sb->s_blocksize_bits;
2896                 RETURN(0);
2897         }
2898
2899         if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
2900                 obd_id *last_id = val;
2901                 /* FIXME: object groups */
2902                 *last_id = filter_last_id(&obd->u.filter, 0);
2903                 RETURN(0);
2904         }
2905         CDEBUG(D_IOCTL, "invalid key\n");
2906         RETURN(-EINVAL);
2907 }
2908
/*
 * Handle a key/value "set" request.  The only key accepted is KEY_MDS_CONN:
 * it records the export's handle cookie in fo_mdc_conn, passes the export's
 * reverse import to llog_receptor_accept() on the LLOG_MDS_OST_REPL_CTXT
 * context, and calls lquota_setinfo().
 *
 * exp:    export the request arrived on.
 * keylen: length of @key in bytes.
 * key:    request key.
 * vallen, val: not referenced in this function.
 *
 * Returns -EINVAL for a missing device or any other key, otherwise the
 * result of llog_receptor_accept().
 *
 * NOTE(review): only keylen >= strlen(KEY_MDS_CONN) is checked, yet
 * memcmp() compares keylen bytes, which may be more than the constant
 * holds -- confirm callers always pass the exact length.
 */
2909 static int filter_set_info(struct obd_export *exp, __u32 keylen,
2910                            void *key, __u32 vallen, void *val)
2911 {
2912         struct obd_device *obd;
2913         struct llog_ctxt *ctxt;
2914         int rc = 0;
2915         ENTRY;
2916
2917         obd = exp->exp_obd;
2918         if (obd == NULL) {
2919                 CDEBUG(D_IOCTL, "invalid export %p\n", exp);
2920                 RETURN(-EINVAL);
2921         }
2922
2923         if (keylen < strlen(KEY_MDS_CONN) ||
2924             memcmp(key, KEY_MDS_CONN, keylen) != 0)
2925                 RETURN(-EINVAL);
2926
2927         CWARN("%s: received MDS connection from %s\n", obd->obd_name,
2928               obd_export_nid2str(exp));
2929         obd->u.filter.fo_mdc_conn.cookie = exp->exp_handle.h_cookie;
2930
2931         /* setup llog imports */
2932         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
2933         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
2934         
2935         lquota_setinfo(quota_interface, exp, obd);
2936
2937         RETURN(rc);
2938 }
2939
/*
 * ioctl dispatcher for the filter device.
 *
 * cmd:  one of the OBD_IOC_* commands handled below.
 * exp:  export; exp->exp_obd is the device.
 * len:  not referenced in this function.
 * karg: ioctl argument, used as struct obd_ioctl_data for
 *       OBD_IOC_CATLOGLIST only.
 * uarg: not referenced in this function.
 *
 * Commands:
 *   OBD_IOC_ABORT_RECOVERY  calls target_abort_recovery(); returns 0.
 *   OBD_IOC_SYNC            returns the result of fsfilt_sync().
 *   OBD_IOC_SET_READONLY    starts and commits a transaction on the root
 *                           inode, syncs, then calls lvfs_set_rdonly();
 *                           returns 0.
 *   OBD_IOC_CATLOGLIST      returns the result of llog_catalog_list().
 *   OBD_IOC_LLOG_*          return -EOPNOTSUPP (unfinished).
 *   anything else           returns -EINVAL.
 *
 * NOTE(review): in OBD_IOC_SET_READONLY the rc from fsfilt_commit() is
 * overwritten by fsfilt_sync(), whose rc is in turn discarded by RETURN(0);
 * a failed fsfilt_start() is also skipped silently -- confirm that errors
 * are meant to be ignored before the device is made read-only.
 * NOTE(review): every switch arm returns, so the final RETURN(0) looks
 * unreachable.
 */
2940 int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
2941                      int len, void *karg, void *uarg)
2942 {
2943         struct obd_device *obd = exp->exp_obd;
2944         struct obd_ioctl_data *data = karg;
2945         int rc = 0;
2946
2947         switch (cmd) {
2948         case OBD_IOC_ABORT_RECOVERY: {
2949                 CERROR("aborting recovery for device %s\n", obd->obd_name);
2950                 target_abort_recovery(obd);
2951                 RETURN(0);
2952         }
2953
2954         case OBD_IOC_SYNC: {
2955                 CDEBUG(D_HA, "syncing ost %s\n", obd->obd_name);
2956                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
2957                 RETURN(rc);
2958         }
2959
2960         case OBD_IOC_SET_READONLY: {
2961                 void *handle;
2962                 struct super_block *sb = obd->u.obt.obt_sb;
2963                 struct inode *inode = sb->s_root->d_inode;
2964                 BDEVNAME_DECLARE_STORAGE(tmp);
2965                 CERROR("*** setting device %s read-only ***\n",
2966                        ll_bdevname(sb, tmp));
2967
2968                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
2969                 if (!IS_ERR(handle))
2970                         rc = fsfilt_commit(obd, inode, handle, 1);
2971
2972                 CDEBUG(D_HA, "syncing ost %s\n", obd->obd_name);
2973                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
2974
2975                 lvfs_set_rdonly(lvfs_sbdev(obd->u.obt.obt_sb));
2976                 RETURN(0);
2977         }
2978
2979         case OBD_IOC_CATLOGLIST: {
2980                 rc = llog_catalog_list(obd, 1, data);
2981                 RETURN(rc);
2982         }
2983
2984         case OBD_IOC_LLOG_CANCEL:
2985         case OBD_IOC_LLOG_REMOVE:
2986         case OBD_IOC_LLOG_INFO:
2987         case OBD_IOC_LLOG_PRINT: {
2988                 /* FIXME to be finished */
2989                 RETURN(-EOPNOTSUPP);
2990 /*
2991                 struct llog_ctxt *ctxt = NULL;
2992
2993                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
2994                 rc = llog_ioctl(ctxt, cmd, data);
2995                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
2996
2997                 RETURN(rc);
2998 */
2999         }
3000
3001
3002         default:
3003                 RETURN(-EINVAL);
3004         }
3005         RETURN(0);
3006 }
3007
/*
 * Report device health: 0 when healthy, 1 when unhealthy.
 *
 * The device is reported unhealthy when the backing super block has
 * MS_RDONLY set, or when lvfs_check_io_health() on fo_health_check_filp
 * returns non-zero.  fo_health_check_filp must already be set (asserted).
 */
3008 static int filter_health_check(struct obd_device *obd)
3009 {
3010         struct filter_obd *filter = &obd->u.filter;
3011         int rc = 0;
3012
3013         /*
3014          * health_check to return 0 on healthy
3015          * and 1 on unhealthy.
3016          */
3017         if (obd->u.obt.obt_sb->s_flags & MS_RDONLY)
3018                 rc = 1;
3019
3020         LASSERT(filter->fo_health_check_filp != NULL);
             /* !! folds any non-zero result into 1 before OR-ing it in */
3021         rc |= !!lvfs_check_io_health(obd, filter->fo_health_check_filp);
3022
3023         return rc;
3024 }
3025
/*
 * Adapter used as the l_fid2dentry callback in filter_lvfs_ops: forwards to
 * filter_fid2dentry() with no parent dentry.  @data is passed as the first
 * argument (the obd_device at the other call sites in this file); @gen is
 * not used.
 */
3026 static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
3027                                              void *data)
3028 {
3029         return filter_fid2dentry(data, NULL, gr, id);
3030 }
3031
/* lvfs callback table for the filter; only the fid-to-dentry lookup is set.
 * Uses the older "field:" initializer spelling, unlike the ".field ="
 * tables below. */
3032 static struct lvfs_callback_ops filter_lvfs_ops = {
3033         l_fid2dentry:     filter_lvfs_fid2dentry,
3034 };
3035
/*
 * Method table for the regular filter device.  Compared with
 * filter_sanobd_ops below it uses filter_setup for .o_setup, provides
 * .o_health_check, and has no .o_san_preprw.
 */
3036 static struct obd_ops filter_obd_ops = {
3037         .o_owner          = THIS_MODULE,
3038         .o_get_info       = filter_get_info,
3039         .o_set_info       = filter_set_info,
3040         .o_setup          = filter_setup,
3041         .o_precleanup     = filter_precleanup,
3042         .o_cleanup        = filter_cleanup,
3043         .o_connect        = filter_connect,
3044         .o_reconnect      = filter_reconnect,
3045         .o_disconnect     = filter_disconnect,
3046         .o_statfs         = filter_statfs,
3047         .o_getattr        = filter_getattr,
3048         .o_unpackmd       = filter_unpackmd,
3049         .o_create         = filter_create,
3050         .o_setattr        = filter_setattr,
3051         .o_destroy        = filter_destroy,
3052         .o_brw            = filter_brw,
3053         .o_punch          = filter_truncate,
3054         .o_sync           = filter_sync,
3055         .o_preprw         = filter_preprw,
3056         .o_commitrw       = filter_commitrw,
3057         .o_destroy_export = filter_destroy_export,
3058         .o_llog_init      = filter_llog_init,
3059         .o_llog_finish    = filter_llog_finish,
3060         .o_iocontrol      = filter_iocontrol,
3061         .o_health_check   = filter_health_check,
3062 };
3063
3064 static struct obd_ops filter_sanobd_ops = {
3065         .o_owner          = THIS_MODULE,
3066         .o_get_info       = filter_get_info,
3067         .o_set_info       = filter_set_info,
3068         .o_setup          = filter_san_setup,
3069         .o_precleanup     = filter_precleanup,
3070         .o_cleanup        = filter_cleanup,
3071         .o_connect        = filter_connect,
3072         .o_reconnect      = filter_reconnect,
3073         .o_disconnect     = filter_disconnect,
3074         .o_statfs         = filter_statfs,
3075         .o_getattr        = filter_getattr,
3076         .o_unpackmd       = filter_unpackmd,
3077         .o_create         = filter_create,
3078         .o_setattr        = filter_setattr,
3079         .o_destroy        = filter_destroy,
3080         .o_brw            = filter_brw,
3081         .o_punch          = filter_truncate,
3082         .o_sync           = filter_sync,
3083         .o_preprw         = filter_preprw,
3084         .o_commitrw       = filter_commitrw,
3085         .o_san_preprw     = filter_san_preprw,
3086         .o_destroy_export = filter_destroy_export,
3087         .o_llog_init      = filter_llog_init,
3088         .o_llog_finish    = filter_llog_finish,
3089         .o_iocontrol      = filter_iocontrol,
3090 };
3091
3092 quota_interface_t *quota_interface;
3093 extern quota_interface_t filter_quota_interface;
3094
3095 static int __init obdfilter_init(void)
3096 {
3097         struct lprocfs_static_vars lvars;
3098         int rc;
3099
3100         printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n");
3101
3102         lprocfs_init_vars(filter, &lvars);
3103
3104         OBD_ALLOC(obdfilter_created_scratchpad,
3105                   OBDFILTER_CREATED_SCRATCHPAD_ENTRIES *
3106                   sizeof(*obdfilter_created_scratchpad));
3107         if (obdfilter_created_scratchpad == NULL)
3108                 return -ENOMEM;
3109
3110         quota_interface = PORTAL_SYMBOL_GET(filter_quota_interface);
3111         init_obd_quota_ops(quota_interface, &filter_obd_ops);
3112         init_obd_quota_ops(quota_interface, &filter_sanobd_ops);
3113
3114         rc = class_register_type(&filter_obd_ops, lvars.module_vars,
3115                                  LUSTRE_OST_NAME);
3116         if (rc)
3117                 GOTO(out, rc);
3118
3119         rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
3120                                  LUSTRE_OSTSAN_NAME);
3121         if (rc) {
3122                 class_unregister_type(LUSTRE_OST_NAME);
3123 out:
3124                 if (quota_interface)
3125                         PORTAL_SYMBOL_PUT(filter_quota_interface);
3126                         
3127                 OBD_FREE(obdfilter_created_scratchpad,
3128                          OBDFILTER_CREATED_SCRATCHPAD_ENTRIES *
3129                          sizeof(*obdfilter_created_scratchpad));
3130         } 
3131
3132         return rc;
3133 }
3134
3135 static void __exit obdfilter_exit(void)
3136 {
3137         if (quota_interface)
3138                 PORTAL_SYMBOL_PUT(filter_quota_interface);
3139
3140         class_unregister_type(LUSTRE_OSTSAN_NAME);
3141         class_unregister_type(LUSTRE_OST_NAME);
3142         
3143         OBD_FREE(obdfilter_created_scratchpad,
3144                  OBDFILTER_CREATED_SCRATCHPAD_ENTRIES *
3145                  sizeof(*obdfilter_created_scratchpad));
3146 }
3147
3148 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3149 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
3150 MODULE_LICENSE("GPL");
3151
3152 module_init(obdfilter_init);
3153 module_exit(obdfilter_exit);