Whamcloud - gitweb
e2a55dd985abfdf70f5c72fe330b0e2f9880980f
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define DEBUG_SUBSYSTEM S_FILTER
37
38 #include <linux/config.h>
39 #include <linux/module.h>
40 #include <linux/fs.h>
41 #include <linux/dcache.h>
42 #include <linux/init.h>
43 #include <linux/version.h>
44 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
45 # include <linux/mount.h>
46 # include <linux/buffer_head.h>
47 #endif
48
49 #include <linux/obd_class.h>
50 #include <linux/obd_lov.h>
51 #include <linux/lustre_dlm.h>
52 #include <linux/lustre_fsfilt.h>
53 #include <linux/lprocfs_status.h>
54 #include <linux/lustre_log.h>
55 #include <linux/lustre_commit_confd.h>
56 #include <portals/list.h>
57
58 #include "filter_internal.h"
59
60 static struct lvfs_callback_ops filter_lvfs_ops;
61
62 static int filter_destroy(struct obd_export *exp, struct obdo *oa,
63                           struct lov_stripe_md *ea, struct obd_trans_info *);
64
65 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
66                              void *cb_data, int error)
67 {
68         obd_transno_commit_cb(obd, transno, error);
69 }
70
71 /* Assumes caller has already pushed us into the kernel context. */
72 int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
73                           int rc)
74 {
75         struct filter_obd *filter = &exp->exp_obd->u.filter;
76         struct filter_export_data *fed = &exp->exp_filter_data;
77         struct filter_client_data *fcd = fed->fed_fcd;
78         __u64 last_rcvd;
79         loff_t off;
80         int err, log_pri = D_HA;
81
82         /* Propagate error code. */
83         if (rc)
84                 RETURN(rc);
85
86         if (!exp->exp_obd->obd_replayable || oti == NULL)
87                 RETURN(rc);
88
89         /* we don't allocate new transnos for replayed requests */
90         if (oti->oti_transno == 0) {
91                 spin_lock(&filter->fo_translock);
92                 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1;
93                 filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
94                 spin_unlock(&filter->fo_translock);
95                 oti->oti_transno = last_rcvd;
96         } else {
97                 spin_lock(&filter->fo_translock);
98                 last_rcvd = oti->oti_transno;
99                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
100                         filter->fo_fsd->fsd_last_transno =
101                                 cpu_to_le64(last_rcvd);
102                 spin_unlock(&filter->fo_translock);
103         }
104         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
105
106         /* could get xid from oti, if it's ever needed */
107         fcd->fcd_last_xid = 0;
108
109         off = fed->fed_lr_off;
110
111         fsfilt_add_journal_cb(exp->exp_obd, filter->fo_sb, last_rcvd,
112                               oti->oti_handle, filter_commit_cb, NULL);
113
114         err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd,
115                                   sizeof(*fcd), &off, 0);
116         if (err) {
117                 log_pri = D_ERROR;
118                 if (rc == 0)
119                         rc = err;
120         }
121
122         CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n",
123                last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err);
124
125         RETURN(rc);
126 }
127
128 void f_dput(struct dentry *dentry)
129 {
130         /* Can't go inside filter_ddelete because it can block */
131         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
132                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
133         LASSERT(atomic_read(&dentry->d_count) > 0);
134
135         dput(dentry);
136 }
137
138 /* Add client data to the FILTER.  We use a bitmap to locate a free space
139  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
140  * Otherwise, we have just read the data from the last_rcvd file and
141  * we know its offset. */
142 static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
143                              struct filter_export_data *fed, int cl_idx)
144 {
145         unsigned long *bitmap = filter->fo_last_rcvd_slots;
146         int new_client = (cl_idx == -1);
147         ENTRY;
148
149         LASSERT(bitmap != NULL);
150
151         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
152         if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid))
153                 RETURN(0);
154
155         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
156          * there's no need for extra complication here
157          */
158         if (new_client) {
159                 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
160         repeat:
161                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
162                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
163                         RETURN(-ENOMEM);
164                 }
165                 if (test_and_set_bit(cl_idx, bitmap)) {
166                         CERROR("FILTER client %d: found bit is set in bitmap\n",
167                                cl_idx);
168                         cl_idx = find_next_zero_bit(bitmap,
169                                                     FILTER_LR_MAX_CLIENTS,
170                                                     cl_idx);
171                         goto repeat;
172                 }
173         } else {
174                 if (test_and_set_bit(cl_idx, bitmap)) {
175                         CERROR("FILTER client %d: bit already set in bitmap!\n",
176                                cl_idx);
177                         LBUG();
178                 }
179         }
180
181         fed->fed_lr_idx = cl_idx;
182         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
183                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
184
185         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
186                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
187
188         if (new_client) {
189                 struct lvfs_run_ctxt saved;
190                 loff_t off = fed->fed_lr_off;
191                 int err;
192                 void *handle;
193
194                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
195                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
196
197                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
198                 /* Transaction needed to fix bug 1403 */
199                 handle = fsfilt_start(obd,
200                                       filter->fo_rcvd_filp->f_dentry->d_inode,
201                                       FSFILT_OP_SETATTR, NULL);
202                 if (IS_ERR(handle)) {
203                         err = PTR_ERR(handle);
204                         CERROR("unable to start transaction: rc %d\n", err);
205                 } else {
206                         err = fsfilt_write_record(obd, filter->fo_rcvd_filp,
207                                                   fed->fed_fcd,
208                                                   sizeof(*fed->fed_fcd),
209                                                   &off, 1);
210                         fsfilt_commit(obd,
211                                       filter->fo_rcvd_filp->f_dentry->d_inode,
212                                       handle, 1);
213                 }
214                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
215
216                 if (err) {
217                         CERROR("error writing %s client idx %u: rc %d\n",
218                                LAST_RCVD, fed->fed_lr_idx, err);
219                         RETURN(err);
220                 }
221         }
222         RETURN(0);
223 }
224
225 static int filter_client_free(struct obd_export *exp, int flags)
226 {
227         struct filter_export_data *fed = &exp->exp_filter_data;
228         struct filter_obd *filter = &exp->exp_obd->u.filter;
229         struct obd_device *obd = exp->exp_obd;
230         struct filter_client_data zero_fcd;
231         struct lvfs_run_ctxt saved;
232         int rc;
233         loff_t off;
234         ENTRY;
235
236         if (fed->fed_fcd == NULL)
237                 RETURN(0);
238
239         if (flags & OBD_OPT_FAILOVER)
240                 GOTO(free, 0);
241
242         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
243         if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0)
244                 GOTO(free, 0);
245
246         LASSERT(filter->fo_last_rcvd_slots != NULL);
247
248         off = fed->fed_lr_off;
249
250         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
251                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
252
253         /* Clear the bit _after_ zeroing out the client so we don't
254            race with filter_client_add and zero out new clients.*/
255         if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
256                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
257                        fed->fed_lr_idx);
258                 LBUG();
259         }
260
261         memset(&zero_fcd, 0, sizeof zero_fcd);
262         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
263         rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
264                                  sizeof(zero_fcd), &off, 1);
265         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
266
267         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
268                "zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n",
269                fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
270                LAST_RCVD, rc);
271
272         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
273                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
274                        fed->fed_lr_idx);
275                 LBUG();
276         }
277
278 free:
279         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
280
281         RETURN(0);
282 }
283
284 static int filter_free_server_data(struct filter_obd *filter)
285 {
286         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
287         filter->fo_fsd = NULL;
288         OBD_FREE(filter->fo_last_rcvd_slots,
289                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
290         filter->fo_last_rcvd_slots = NULL;
291         return 0;
292 }
293
294 /* assumes caller is already in kernel ctxt */
295 int filter_update_server_data(struct obd_device *obd, struct file *filp,
296                               struct filter_server_data *fsd, int force_sync)
297 {
298         loff_t off = 0;
299         int rc;
300         ENTRY;
301
302         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
303         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
304                le64_to_cpu(fsd->fsd_last_transno));
305         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
306                le64_to_cpu(fsd->fsd_mount_count));
307
308         rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off,force_sync);
309         if (rc)
310                 CERROR("error writing filter_server_data: rc = %d\n", rc);
311
312         RETURN(rc);
313 }
314
315 int filter_update_last_objid(struct obd_device *obd, obd_gr group,
316                              int force_sync)
317 {
318         struct filter_obd *filter = &obd->u.filter;
319         __u64 tmp;
320         loff_t off = 0;
321         int rc;
322         ENTRY;
323
324         if (filter->fo_last_objid_files[group] == NULL) {
325                 CERROR("Object group "LPU64" not fully setup; not updating "
326                        "last_objid\n", group);
327                 RETURN(0);
328         }
329
330         CDEBUG(D_INODE, "server last_objid for group "LPU64": "LPU64"\n",
331                group, filter->fo_last_objids[group]);
332
333         tmp = cpu_to_le64(filter->fo_last_objids[group]);
334         rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
335                                  &tmp, sizeof(tmp), &off, force_sync);
336         if (rc)
337                 CERROR("error writing group "LPU64" last objid: rc = %d\n",
338                        group, rc);
339         RETURN(rc);
340 }
341
342 /* assumes caller has already in kernel ctxt */
343 static int filter_init_server_data(struct obd_device *obd, struct file * filp)
344 {
345         struct filter_obd *filter = &obd->u.filter;
346         struct filter_server_data *fsd;
347         struct filter_client_data *fcd = NULL;
348         struct inode *inode = filp->f_dentry->d_inode;
349         unsigned long last_rcvd_size = inode->i_size;
350         __u64 mount_count;
351         int cl_idx;
352         loff_t off = 0;
353         int rc;
354
355         /* ensure padding in the struct is the correct size */
356         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
357                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
358         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
359                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
360
361         OBD_ALLOC(fsd, sizeof(*fsd));
362         if (!fsd)
363                 RETURN(-ENOMEM);
364         filter->fo_fsd = fsd;
365
366         OBD_ALLOC(filter->fo_last_rcvd_slots,
367                   FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
368         if (filter->fo_last_rcvd_slots == NULL) {
369                 OBD_FREE(fsd, sizeof(*fsd));
370                 RETURN(-ENOMEM);
371         }
372
373         if (last_rcvd_size == 0) {
374                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
375
376                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
377                 fsd->fsd_last_transno = 0;
378                 mount_count = fsd->fsd_mount_count = 0;
379                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
380                 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
381                 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
382                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
383                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
384         } else {
385                 rc = fsfilt_read_record(obd, filp, fsd, sizeof(*fsd), &off);
386                 if (rc) {
387                         CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
388                                LAST_RCVD, rc);
389                         GOTO(err_fsd, rc);
390                 }
391                 if (strcmp(fsd->fsd_uuid, obd->obd_uuid.uuid) != 0) {
392                         CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
393                                obd->obd_uuid.uuid, fsd->fsd_uuid);
394                         GOTO(err_fsd, rc = -EINVAL);
395                 }
396                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
397                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
398         }
399
400         if (fsd->fsd_feature_incompat & ~cpu_to_le32(FILTER_INCOMPAT_SUPP)) {
401                 CERROR("unsupported feature %x\n",
402                        le32_to_cpu(fsd->fsd_feature_incompat) &
403                        ~FILTER_INCOMPAT_SUPP);
404                 GOTO(err_fsd, rc = -EINVAL);
405         }
406         if (fsd->fsd_feature_rocompat & ~cpu_to_le32(FILTER_ROCOMPAT_SUPP)) {
407                 CERROR("read-only feature %x\n",
408                        le32_to_cpu(fsd->fsd_feature_rocompat) &
409                        ~FILTER_ROCOMPAT_SUPP);
410                 /* Do something like remount filesystem read-only */
411                 GOTO(err_fsd, rc = -EINVAL);
412         }
413
414         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
415                obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
416         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
417                obd->obd_name, mount_count + 1);
418         CDEBUG(D_INODE, "%s: server data size: %u\n",
419                obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
420         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
421                obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
422         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
423                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
424         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
425                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
426         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
427                last_rcvd_size <= le32_to_cpu(fsd->fsd_client_start) ? 0 :
428                (last_rcvd_size - le32_to_cpu(fsd->fsd_client_start)) /
429                 le16_to_cpu(fsd->fsd_client_size));
430
431         if (!obd->obd_replayable) {
432                 CWARN("%s: recovery support OFF\n", obd->obd_name);
433                 GOTO(out, rc = 0);
434         }
435
436         for (cl_idx = 0, off = le32_to_cpu(fsd->fsd_client_start);
437              off < last_rcvd_size; cl_idx++) {
438                 __u64 last_rcvd;
439                 struct obd_export *exp;
440                 struct filter_export_data *fed;
441
442                 if (!fcd) {
443                         OBD_ALLOC(fcd, sizeof(*fcd));
444                         if (!fcd)
445                                 GOTO(err_client, rc = -ENOMEM);
446                 }
447
448                 /* Don't assume off is incremented properly by
449                  * fsfilt_read_record(), in case sizeof(*fcd)
450                  * isn't the same as fsd->fsd_client_size.  */
451                 off = le32_to_cpu(fsd->fsd_client_start) +
452                         cl_idx * le16_to_cpu(fsd->fsd_client_size);
453                 rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
454                 if (rc) {
455                         CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
456                                LAST_RCVD, cl_idx, off, rc);
457                         break; /* read error shouldn't cause startup to fail */
458                 }
459
460                 if (fcd->fcd_uuid[0] == '\0') {
461                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
462                                cl_idx);
463                         continue;
464                 }
465
466                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
467
468                 /* These exports are cleaned up by filter_disconnect(), so they
469                  * need to be set up like real exports as filter_connect() does.
470                  */
471                 exp = class_new_export(obd);
472                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
473                        " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
474                        last_rcvd, le64_to_cpu(fsd->fsd_last_transno));
475                 if (exp == NULL)
476                         GOTO(err_client, rc = -ENOMEM);
477
478                 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
479                        sizeof exp->exp_client_uuid.uuid);
480                 fed = &exp->exp_filter_data;
481                 fed->fed_fcd = fcd;
482                 filter_client_add(obd, filter, fed, cl_idx);
483                 /* create helper if export init gets more complex */
484                 spin_lock_init(&fed->fed_lock);
485
486                 fcd = NULL;
487                 obd->obd_recoverable_clients++;
488                 class_export_put(exp);
489
490                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
491                        cl_idx, last_rcvd);
492
493                 if (last_rcvd > le64_to_cpu(fsd->fsd_last_transno))
494                         fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
495
496         }
497
498         obd->obd_last_committed = le64_to_cpu(fsd->fsd_last_transno);
499
500         if (obd->obd_recoverable_clients) {
501                 CWARN("RECOVERY: %d recoverable clients, last_rcvd "
502                       LPU64"\n", obd->obd_recoverable_clients,
503                       le64_to_cpu(fsd->fsd_last_transno));
504                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
505                 obd->obd_recovering = 1;
506         }
507
508         if (fcd)
509                 OBD_FREE(fcd, sizeof(*fcd));
510
511 out:
512         filter->fo_mount_count = mount_count + 1;
513         fsd->fsd_mount_count = cpu_to_le64(filter->fo_mount_count);
514
515         /* save it, so mount count and last_transno is current */
516         rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
517
518         RETURN(rc);
519
520 err_client:
521         class_disconnect_exports(obd, 0);
522 err_fsd:
523         filter_free_server_data(filter);
524         RETURN(rc);
525 }
526
527 static int filter_cleanup_groups(struct obd_device *obd)
528 {
529         struct filter_obd *filter = &obd->u.filter;
530         struct dentry *dentry;
531         int i, k;
532         ENTRY;
533
534         for (i = 0; i < filter->fo_group_count; i++) {
535                 if (filter->fo_subdirs != NULL) {
536                         for (k = 0; k < filter->fo_subdir_count; k++) {
537                                 dentry = filter->fo_subdirs[i].dentry[k];
538                                 if (dentry == NULL)
539                                         continue;
540                                 f_dput(dentry);
541                                 filter->fo_subdirs[i].dentry[k] = NULL;
542                         }
543                 }
544                 if (filter->fo_last_objid_files[i] != NULL) {
545                         filp_close(filter->fo_last_objid_files[i], 0);
546                         filter->fo_last_objid_files[i] = NULL;
547                 }
548                 if (filter->fo_groups[i] != NULL) {
549                         dput(filter->fo_groups[i]);
550                         filter->fo_groups[i] = NULL;
551                 }
552         }
553         if (filter->fo_subdirs != NULL)
554                 OBD_FREE(filter->fo_subdirs,
555                          filter->fo_group_count * sizeof(*filter->fo_subdirs));
556         if (filter->fo_groups != NULL)
557                 OBD_FREE(filter->fo_groups,
558                          filter->fo_group_count * sizeof(*filter->fo_groups));
559         if (filter->fo_last_objids != NULL)
560                 OBD_FREE(filter->fo_last_objids,
561                          filter->fo_group_count * sizeof(__u64));
562         if (filter->fo_last_objid_files != NULL)
563                 OBD_FREE(filter->fo_last_objid_files,
564                          filter->fo_group_count * sizeof(struct file *));
565         f_dput(filter->fo_dentry_O);
566         RETURN(0);
567 }
568
569 static int filter_read_group_internal(struct obd_device *obd, int group,
570                                       int create)
571 {
572         struct filter_obd *filter = &obd->u.filter;
573         __u64 *new_objids = NULL;
574         struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs;
575         struct dentry **new_groups = NULL;
576         struct file **new_files = NULL;
577         struct dentry *dentry;
578         struct file *filp;
579         int old_count = filter->fo_group_count, rc, stage = 0, i;
580         char name[25];
581         __u64 last_objid;
582         loff_t off = 0;
583
584         snprintf(name, 24, "%d", group);
585         name[24] = '\0';
586
587         if (!create) {
588                 dentry = ll_lookup_one_len(name, filter->fo_dentry_O,
589                                            strlen(name));
590                 if (IS_ERR(dentry)) {
591                         CERROR("Cannot lookup expected object group %d: %ld\n",
592                                group, PTR_ERR(dentry));
593                         RETURN(PTR_ERR(dentry));
594                 }
595         } else {
596                 dentry = simple_mkdir(filter->fo_dentry_O, name, 0700, 1);
597                 if (IS_ERR(dentry)) {
598                         CERROR("cannot lookup/create O/%s: rc = %ld\n", name,
599                                PTR_ERR(dentry));
600                         RETURN(PTR_ERR(dentry));
601                 }
602         }
603         stage = 1;
604
605         snprintf(name, 24, "O/%d/LAST_ID", group);
606         name[24] = '\0';
607         filp = filp_open(name, O_CREAT | O_RDWR, 0700);
608         if (IS_ERR(filp)) {
609                 CERROR("cannot create %s: rc = %ld\n", name, PTR_ERR(filp));
610                 GOTO(cleanup, rc = PTR_ERR(filp));
611         }
612         stage = 2;
613
614         rc = fsfilt_read_record(obd, filp, &last_objid, sizeof(__u64), &off);
615         if (rc) {
616                 CDEBUG(D_INODE, "error reading %s: rc %d\n", name, rc);
617                 GOTO(cleanup, rc);
618         }
619
620         if (filter->fo_subdir_count) {
621                 OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs));
622                 if (tmp_subdirs == NULL)
623                         GOTO(cleanup, rc = -ENOMEM);
624                 stage = 3;
625
626                 for (i = 0; i < filter->fo_subdir_count; i++) {
627                         char dir[20];
628                         snprintf(dir, sizeof(dir), "d%u", i);
629
630                         tmp_subdirs->dentry[i] =
631                                 simple_mkdir(dentry, dir, 0700, 1);
632                         if (IS_ERR(tmp_subdirs->dentry[i])) {
633                                 rc = PTR_ERR(tmp_subdirs->dentry[i]);
634                                 CERROR("can't lookup/create O/%d/%s: rc = %d\n",
635                                        group, dir, rc);
636                                 GOTO(cleanup, rc);
637                         }
638                         CDEBUG(D_INODE, "got/created O/%d/%s: %p\n", group, dir,
639                                tmp_subdirs->dentry[i]);
640                 }
641         }
642
643         /* 'group' is an index; we need an array of length 'group + 1' */
644         if (group + 1 > old_count) {
645                 int len = group + 1;
646                 OBD_ALLOC(new_objids, len * sizeof(*new_objids));
647                 OBD_ALLOC(new_subdirs, len * sizeof(*new_subdirs));
648                 OBD_ALLOC(new_groups, len * sizeof(*new_groups));
649                 OBD_ALLOC(new_files, len * sizeof(*new_files));
650                 stage = 4;
651                 if (new_objids == NULL || new_subdirs == NULL ||
652                     new_groups == NULL || new_files == NULL)
653                         GOTO(cleanup, rc = -ENOMEM);
654
655                 memcpy(new_objids, filter->fo_last_objids,
656                        old_count * sizeof(*new_objids));
657                 memcpy(new_subdirs, filter->fo_subdirs,
658                        old_count * sizeof(*new_subdirs));
659                 memcpy(new_groups, filter->fo_groups,
660                        old_count * sizeof(*new_groups));
661                 memcpy(new_files, filter->fo_last_objid_files,
662                        old_count * sizeof(*new_files));
663
664                 if (old_count) {
665                         OBD_FREE(filter->fo_last_objids,
666                                  old_count * sizeof(*new_objids));
667                         OBD_FREE(filter->fo_subdirs,
668                                  old_count * sizeof(*new_subdirs));
669                         OBD_FREE(filter->fo_groups,
670                                  old_count * sizeof(*new_groups));
671                         OBD_FREE(filter->fo_last_objid_files,
672                                  old_count * sizeof(*new_files));
673                 }
674                 filter->fo_last_objids = new_objids;
675                 filter->fo_subdirs = new_subdirs;
676                 filter->fo_groups = new_groups;
677                 filter->fo_last_objid_files = new_files;
678                 filter->fo_group_count = len;
679         }
680
681         filter->fo_groups[group] = dentry;
682         filter->fo_last_objid_files[group] = filp;
683         if (filter->fo_subdir_count) {
684                 filter->fo_subdirs[group] = *tmp_subdirs;
685                 OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
686         }
687
688         if (filp->f_dentry->d_inode->i_size == 0) {
689                 filter->fo_last_objids[group] = FILTER_INIT_OBJID;
690                 RETURN(0);
691         }
692
693         filter->fo_last_objids[group] = le64_to_cpu(last_objid);
694         CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
695                obd->obd_name, group, last_objid);
696         RETURN(0);
697  cleanup:
698         switch (stage) {
699         case 4:
700                 if (new_objids != NULL)
701                         OBD_FREE(new_objids, group * sizeof(*new_objids));
702                 if (new_subdirs != NULL)
703                         OBD_FREE(new_subdirs, group * sizeof(*new_subdirs));
704                 if (new_groups != NULL)
705                         OBD_FREE(new_groups, group * sizeof(*new_groups));
706                 if (new_files != NULL)
707                         OBD_FREE(new_files, group * sizeof(*new_files));
708         case 3:
709                 if (filter->fo_subdir_count) {
710                         for (i = 0; i < filter->fo_subdir_count; i++) {
711                                 if (tmp_subdirs->dentry[i] != NULL)
712                                         dput(tmp_subdirs->dentry[i]);
713                         }
714                         OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
715                 }
716         case 2:
717                 filp_close(filp, 0);
718         case 1:
719                 dput(dentry);
720         }
721         RETURN(rc);
722 }
723
724 static int filter_read_groups(struct obd_device *obd, int last_group,
725                               int create)
726 {
727         struct filter_obd *filter = &obd->u.filter;
728         int old_count = filter->fo_group_count, group = old_count, rc = 0;
729
730         for (group = old_count; group <= last_group; group++) {
731                 if (group == 0)
732                         continue; /* no group zero */
733
734                 rc = filter_read_group_internal(obd, group, create);
735                 if (rc != 0)
736                         break;
737         }
738         return rc;
739 }
740
741 static int filter_prep_groups(struct obd_device *obd)
742 {
743         struct filter_obd *filter = &obd->u.filter;
744         struct dentry *dentry, *O_dentry;
745         int rc = 0, cleanup_phase = 0;
746         ENTRY;
747
748         O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
749         CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
750         if (IS_ERR(O_dentry)) {
751                 rc = PTR_ERR(O_dentry);
752                 CERROR("cannot open/create O: rc = %d\n", rc);
753                 GOTO(cleanup, rc);
754         }
755         filter->fo_dentry_O = O_dentry;
756         cleanup_phase = 1; /* O_dentry */
757
758         /* Lookup "R" to tell if we're on an old OST FS and need to convert
759          * from O/R/<dir>/<objid> to O/0/<dir>/<objid>.  This can be removed
760          * some time post 1.0 when all old-style OSTs have converted along
761          * with the init_objid hack. */
762         dentry = ll_lookup_one_len("R", O_dentry, 1);
763         if (IS_ERR(dentry))
764                 GOTO(cleanup, rc = PTR_ERR(dentry));
765         if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
766                 struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1);
767                 ENTRY;
768
769                 CWARN("converting OST to new object layout\n");
770                 if (IS_ERR(O0_dentry)) {
771                         rc = PTR_ERR(O0_dentry);
772                         CERROR("error looking up O/0: rc %d\n", rc);
773                         GOTO(cleanup_R, rc);
774                 }
775
776                 if (O0_dentry->d_inode) {
777                         CERROR("Both O/R and O/0 exist. Fix manually.\n");
778                         GOTO(cleanup_O0, rc = -EEXIST);
779                 }
780
781                 down(&O_dentry->d_inode->i_sem);
782                 rc = vfs_rename(O_dentry->d_inode, dentry,
783                                 O_dentry->d_inode, O0_dentry);
784                 up(&O_dentry->d_inode->i_sem);
785
786                 if (rc) {
787                         CERROR("error renaming O/R to O/0: rc %d\n", rc);
788                         GOTO(cleanup_O0, rc);
789                 }
790                 filter->fo_fsd->fsd_feature_incompat |=
791                         cpu_to_le32(FILTER_INCOMPAT_GROUPS);
792                 rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
793                                                filter->fo_fsd, 1);
794                 GOTO(cleanup_O0, rc);
795
796         cleanup_O0:
797                 f_dput(O0_dentry);
798         cleanup_R:
799                 f_dput(dentry);
800                 if (rc)
801                         GOTO(cleanup, rc);
802         } else {
803                 f_dput(dentry);
804         }
805
806         cleanup_phase = 2; /* groups */
807
808         /* Group 0 is no longer a legal group, to catch uninitialized IDs */
809 #define FILTER_MIN_GROUPS 3
810         rc = filter_read_groups(obd, FILTER_MIN_GROUPS, 1);
811         if (rc)
812                 GOTO(cleanup, rc);
813
814         RETURN(0);
815
816  cleanup:
817         switch (cleanup_phase) {
818         case 2:
819                 filter_cleanup_groups(obd);
820         case 1:
821                 f_dput(filter->fo_dentry_O);
822                 filter->fo_dentry_O = NULL;
823         default:
824                 break;
825         }
826         return rc;
827 }
828
829 /* setup the object store with correct subdirectories */
830 static int filter_prep(struct obd_device *obd)
831 {
832         struct lvfs_run_ctxt saved;
833         struct filter_obd *filter = &obd->u.filter;
834         struct file *file;
835         struct inode *inode;
836         int rc = 0;
837         ENTRY;
838
839         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
840         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
841         if (!file || IS_ERR(file)) {
842                 rc = PTR_ERR(file);
843                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
844                        LAST_RCVD, rc);
845                 GOTO(out, rc);
846         }
847
848         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
849                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
850                        file->f_dentry->d_inode->i_mode);
851                 GOTO(err_filp, rc = -ENOENT);
852         }
853
854         /* steal operations */
855         inode = file->f_dentry->d_inode;
856         filter->fo_fop = file->f_op;
857         filter->fo_iop = inode->i_op;
858         filter->fo_aops = inode->i_mapping->a_ops;
859
860         rc = filter_init_server_data(obd, file);
861         if (rc) {
862                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
863                 GOTO(err_filp, rc);
864         }
865         filter->fo_rcvd_filp = file;
866
867         rc = filter_prep_groups(obd);
868         if (rc)
869                 GOTO(err_server_data, rc);
870
871  out:
872         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
873
874         return(rc);
875
876  err_server_data:
877         //class_disconnect_exports(obd, 0);
878         filter_free_server_data(filter);
879  err_filp:
880         if (filp_close(file, 0))
881                 CERROR("can't close %s after error\n", LAST_RCVD);
882         filter->fo_rcvd_filp = NULL;
883         goto out;
884 }
885
886 /* cleanup the filter: write last used object id to status file */
887 static void filter_post(struct obd_device *obd)
888 {
889         struct lvfs_run_ctxt saved;
890         struct filter_obd *filter = &obd->u.filter;
891         int rc, i;
892
893         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
894          * best to start a transaction with h_sync, because we removed this
895          * from lastobjid */
896
897         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
898         rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
899                                        filter->fo_fsd, 0);
900         if (rc)
901                 CERROR("error writing server data: rc = %d\n", rc);
902
903         for (i = 1; i < filter->fo_group_count; i++) {
904                 rc = filter_update_last_objid(obd, i,
905                                              (i == filter->fo_group_count - 1));
906                 if (rc)
907                         CERROR("error writing group %d lastobjid: rc = %d\n",
908                                i, rc);
909         }
910
911         rc = filp_close(filter->fo_rcvd_filp, 0);
912         filter->fo_rcvd_filp = NULL;
913         if (rc)
914                 CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
915
916         filter_cleanup_groups(obd);
917         filter_free_server_data(filter);
918         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
919 }
920
921 static void filter_set_last_id(struct filter_obd *filter, int group, obd_id id)
922 {
923         LASSERT(filter->fo_fsd != NULL);
924         LASSERT(group > 0);
925         LASSERT(group < filter->fo_group_count);
926
927         spin_lock(&filter->fo_objidlock);
928         filter->fo_last_objids[group] = id;
929         spin_unlock(&filter->fo_objidlock);
930 }
931
932 __u64 filter_last_id(struct filter_obd *filter, int group)
933 {
934         obd_id id;
935         LASSERT(filter->fo_fsd != NULL);
936         LASSERT(group > 0);
937         LASSERT(group < filter->fo_group_count);
938
939         spin_lock(&filter->fo_objidlock);
940         id = filter->fo_last_objids[group];
941         spin_unlock(&filter->fo_objidlock);
942
943         return id;
944 }
945
946 /* direct cut-n-paste of mds_blocking_ast() */
947 static int filter_blocking_ast(struct ldlm_lock *lock,
948                                struct ldlm_lock_desc *desc,
949                                void *data, int flag)
950 {
951         int do_ast;
952         ENTRY;
953
954         if (flag == LDLM_CB_CANCELING) {
955                 /* Don't need to do anything here. */
956                 RETURN(0);
957         }
958
959         /* XXX layering violation!  -phil */
960         l_lock(&lock->l_resource->lr_namespace->ns_lock);
961         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
962          * such that filter_blocking_ast is called just before l_i_p takes the
963          * ns_lock, then by the time we get the lock, we might not be the
964          * correct blocking function anymore.  So check, and return early, if
965          * so. */
966         if (lock->l_blocking_ast != filter_blocking_ast) {
967                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
968                 RETURN(0);
969         }
970
971         lock->l_flags |= LDLM_FL_CBPENDING;
972         do_ast = (!lock->l_readers && !lock->l_writers);
973         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
974
975         if (do_ast) {
976                 struct lustre_handle lockh;
977                 int rc;
978
979                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
980                 ldlm_lock2handle(lock, &lockh);
981                 rc = ldlm_cli_cancel(&lockh);
982                 if (rc < 0)
983                         CERROR("ldlm_cli_cancel: %d\n", rc);
984         } else {
985                 LDLM_DEBUG(lock, "Lock still has references, will be "
986                            "cancelled later");
987         }
988         RETURN(0);
989 }
990
991 static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
992 {
993         down(&dparent->d_inode->i_sem);
994         return 0;
995 }
996
997 /* We never dget the object parent, so DON'T dput it either */
998 static void filter_parent_unlock(struct dentry *dparent)
999 {
1000         up(&dparent->d_inode->i_sem);
1001 }
1002
1003 /* We never dget the object parent, so DON'T dput it either */
1004 struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
1005 {
1006         struct filter_obd *filter = &obd->u.filter;
1007         LASSERT(group < filter->fo_group_count);
1008         LASSERT(group > 0);
1009
1010         if (filter->fo_subdir_count == 0)
1011                 return filter->fo_groups[group];
1012
1013         return filter->fo_subdirs[group].dentry[objid & (filter->fo_subdir_count - 1)];
1014 }
1015
1016 /* We never dget the object parent, so DON'T dput it either */
1017 struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
1018                                   obd_id objid)
1019 {
1020         unsigned long now = jiffies;
1021         struct dentry *dparent = filter_parent(obd, group, objid);
1022         int rc;
1023
1024         if (IS_ERR(dparent))
1025                 return dparent;
1026
1027         LASSERT(dparent);
1028         LASSERT(dparent->d_inode);
1029
1030         rc = filter_lock_dentry(obd, dparent);
1031         if (time_after(jiffies, now + 15 * HZ))
1032                 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
1033         return rc ? ERR_PTR(rc) : dparent;
1034 }
1035
1036 /* How to get files, dentries, inodes from object id's.
1037  *
1038  * If dir_dentry is passed, the caller has already locked the parent
1039  * appropriately for this operation (normally a write lock).  If
1040  * dir_dentry is NULL, we do a read lock while we do the lookup to
1041  * avoid races with create/destroy and such changing the directory
1042  * internal to the filesystem code. */
1043 struct dentry *filter_fid2dentry(struct obd_device *obd,
1044                                  struct dentry *dir_dentry,
1045                                  obd_gr group, obd_id id)
1046 {
1047         struct dentry *dparent = dir_dentry;
1048         struct dentry *dchild;
1049         char name[32];
1050         int len;
1051         ENTRY;
1052
1053         if (id == 0) {
1054                 CERROR("fatal: invalid object id 0\n");
1055                 RETURN(ERR_PTR(-ESTALE));
1056         }
1057
1058         len = sprintf(name, LPU64, id);
1059         if (dir_dentry == NULL) {
1060                 dparent = filter_parent_lock(obd, group, id);
1061                 if (IS_ERR(dparent))
1062                         RETURN(dparent);
1063         }
1064         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
1065                dparent->d_name.len, dparent->d_name.name, name);
1066         dchild = /*ll_*/lookup_one_len(name, dparent, len);
1067         if (dir_dentry == NULL)
1068                 filter_parent_unlock(dparent);
1069         if (IS_ERR(dchild)) {
1070                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
1071                 RETURN(dchild);
1072         }
1073
1074         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
1075                name, dchild, atomic_read(&dchild->d_count));
1076
1077         LASSERT(atomic_read(&dchild->d_count) > 0);
1078
1079         RETURN(dchild);
1080 }
1081
1082 static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
1083                                   obd_id group)
1084 {
1085         struct lustre_handle lockh;
1086         int flags = LDLM_AST_DISCARD_DATA, rc;
1087         struct ldlm_res_id res_id = { .name = { objid, 0, group, 0 } };
1088         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1089
1090         ENTRY;
1091         /* Tell the clients that the object is gone now and that they should
1092          * throw away any cached pages. */
1093         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
1094                               LDLM_EXTENT, &policy, LCK_PW,
1095                               &flags, filter_blocking_ast, ldlm_completion_ast,
1096                               NULL, NULL, NULL, 0, NULL, &lockh);
1097
1098         /* We only care about the side-effects, just drop the lock. */
1099         if (rc == ELDLM_OK)
1100                 ldlm_lock_decref(&lockh, LCK_PW);
1101
1102         RETURN(rc);
1103 }
1104
1105 /* Caller must hold LCK_PW on parent and push us into kernel context.
1106  * Caller is also required to ensure that dchild->d_inode exists. */
1107 static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
1108                                    struct dentry *dparent,
1109                                    struct dentry *dchild)
1110 {
1111         struct inode *inode = dchild->d_inode;
1112         int rc;
1113         ENTRY;
1114
1115         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1116                 CERROR("destroying objid %*s nlink = %lu, count = %d\n",
1117                        dchild->d_name.len, dchild->d_name.name,
1118                        (unsigned long)inode->i_nlink,
1119                        atomic_read(&inode->i_count));
1120         }
1121
1122         rc = vfs_unlink(dparent->d_inode, dchild);
1123
1124         if (rc)
1125                 CERROR("error unlinking objid %*s: rc %d\n",
1126                        dchild->d_name.len, dchild->d_name.name, rc);
1127
1128         RETURN(rc);
1129 }
1130
1131 static int filter_intent_policy(struct ldlm_namespace *ns,
1132                                 struct ldlm_lock **lockp, void *req_cookie,
1133                                 ldlm_mode_t mode, int flags, void *data)
1134 {
1135         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1136         struct ptlrpc_request *req = req_cookie;
1137         struct ldlm_lock *lock = *lockp, *l = NULL;
1138         struct ldlm_resource *res = lock->l_resource;
1139         ldlm_processing_policy policy;
1140         struct ost_lvb *res_lvb, *reply_lvb;
1141         struct list_head *tmp;
1142         ldlm_error_t err;
1143         int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply),
1144                                             sizeof(struct ost_lvb) };
1145         ENTRY;
1146
1147         policy = ldlm_get_processing_policy(res);
1148         LASSERT(policy != NULL);
1149         LASSERT(req != NULL);
1150
1151         rc = lustre_pack_reply(req, 2, repsize, NULL);
1152         if (rc)
1153                 RETURN(req->rq_status = rc);
1154
1155         reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
1156         LASSERT(reply_lvb != NULL);
1157
1158         //fixup_handle_for_resent_req(req, lock, &lockh);
1159
1160         /* If we grant any lock at all, it will be a whole-file read lock.
1161          * Call the extent policy function to see if our request can be
1162          * granted, or is blocked. */
1163         lock->l_policy_data.l_extent.start = 0;
1164         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
1165         lock->l_req_mode = LCK_PR;
1166
1167         l_lock(&res->lr_namespace->ns_lock);
1168
1169         res->lr_tmp = &rpc_list;
1170         rc = policy(lock, &tmpflags, 0, &err);
1171         res->lr_tmp = NULL;
1172
1173         /* FIXME: we should change the policy function slightly, to not make
1174          * this list at all, since we just turn around and free it */
1175         while (!list_empty(&rpc_list)) {
1176                 struct ldlm_ast_work *w =
1177                         list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
1178                 list_del(&w->w_list);
1179                 LDLM_LOCK_PUT(w->w_lock);
1180                 OBD_FREE(w, sizeof(*w));
1181         }
1182
1183         if (rc == LDLM_ITER_CONTINUE) {
1184                 /* The lock met with no resistance; we're finished. */
1185                 l_unlock(&res->lr_namespace->ns_lock);
1186                 RETURN(ELDLM_LOCK_REPLACED);
1187         }
1188
1189         /* Do not grant any lock, but instead send GL callbacks.  The extent
1190          * policy nicely created a list of all PW locks for us.  We will choose
1191          * the highest of those which are larger than the size in the LVB, if
1192          * any, and perform a glimpse callback. */
1193         down(&res->lr_lvb_sem);
1194         res_lvb = res->lr_lvb_data;
1195         LASSERT(res_lvb != NULL);
1196         reply_lvb->lvb_size = res_lvb->lvb_size;
1197         reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
1198         up(&res->lr_lvb_sem);
1199
1200         list_for_each(tmp, &res->lr_granted) {
1201                 struct ldlm_lock *tmplock =
1202                         list_entry(tmp, struct ldlm_lock, l_res_link);
1203
1204                 if (tmplock->l_granted_mode == LCK_PR)
1205                         continue;
1206
1207                 if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
1208                         continue;
1209
1210                 if (l == NULL) {
1211                         l = LDLM_LOCK_GET(tmplock);
1212                         continue;
1213                 }
1214
1215                 if (l->l_policy_data.l_extent.start >
1216                     tmplock->l_policy_data.l_extent.start)
1217                         continue;
1218
1219                 LDLM_LOCK_PUT(l);
1220                 l = LDLM_LOCK_GET(tmplock);
1221         }
1222         l_unlock(&res->lr_namespace->ns_lock);
1223
1224         /* There were no PW locks beyond the size in the LVB; finished. */
1225         if (l == NULL)
1226                 RETURN(ELDLM_LOCK_ABORTED);
1227
1228         LASSERT(l->l_glimpse_ast != NULL);
1229         rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
1230         if (rc != 0 && res->lr_namespace->ns_lvbo &&
1231             res->lr_namespace->ns_lvbo->lvbo_update) {
1232                 res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
1233         }
1234
1235         down(&res->lr_lvb_sem);
1236         reply_lvb->lvb_size = res_lvb->lvb_size;
1237         reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
1238         up(&res->lr_lvb_sem);
1239
1240         LDLM_LOCK_PUT(l);
1241
1242         RETURN(ELDLM_LOCK_ABORTED);
1243 }
1244
1245 static int filter_post_fs_cleanup(struct obd_device *obd)
1246 {
1247         int rc = 0;
1248
1249         rc = fsfilt_post_cleanup(obd);
1250
1251         RETURN(rc);
1252 }
1253
1254 static int filter_group_set_kml_flags(struct obd_device *obd, int group)
1255 {
1256         struct filter_obd *filter = &obd->u.filter;
1257         int rc = 0, i = 0;
1258         ENTRY;        
1259         
1260         /* zero group is not longer valid. */
1261         if (group== 0)
1262                 RETURN(rc); 
1263         for (i = 0; i < filter->fo_subdir_count; i++) {
1264                 struct dentry *dentry;
1265                 dentry = (filter->fo_subdirs + group)->dentry[i];
1266                 rc = fsfilt_set_kml_flags(obd, dentry->d_inode);
1267                 if (rc)
1268                         RETURN(rc);
1269         }
1270         RETURN(rc);
1271 }
1272 static int filter_post_fs_setup(struct obd_device *obd)
1273 {
1274         struct filter_obd *filter = &obd->u.filter;
1275         int rc = 0, j = 0;
1276         struct llog_ctxt *ctxt = NULL;
1277
1278         rc = fsfilt_post_setup(obd);
1279         if (rc)
1280                 RETURN(rc);
1281         
1282         for (j = 0; j < filter->fo_group_count; j++) {
1283                 rc = filter_group_set_kml_flags(obd, j);
1284                 if (rc)
1285                         return rc;
1286         } 
1287
1288         fsfilt_get_reint_log_ctxt(obd, filter->fo_sb, &ctxt);
1289         if (ctxt) {
1290                 ctxt->loc_obd = obd;
1291                 ctxt->loc_idx = LLOG_REINT_ORIG_CTXT;
1292                 obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT] = ctxt;
1293         }
1294         fsfilt_set_ost_flags(obd, filter->fo_sb);
1295         return rc;
1296 }
1297
1298 /* mount the file system (secretly) */
1299 int filter_common_setup(struct obd_device *obd, obd_count len,
1300                         void *buf, char *option)
1301 {
1302         struct lustre_cfg* lcfg = buf;
1303         struct filter_obd *filter = &obd->u.filter;
1304         struct vfsmount *mnt;
1305         char name[32] = "CATLIST";
1306         int rc = 0;
1307         ENTRY;
1308
1309         dev_clear_rdonly(2);
1310
1311         if (!lcfg->lcfg_inlbuf1 || !lcfg->lcfg_inlbuf2)
1312                 RETURN(-EINVAL);
1313
1314         obd->obd_fsops = fsfilt_get_ops(lcfg->lcfg_inlbuf2);
1315         if (IS_ERR(obd->obd_fsops))
1316                 RETURN(PTR_ERR(obd->obd_fsops));
1317
1318         mnt = do_kern_mount(lcfg->lcfg_inlbuf2, MS_NOATIME | MS_NODIRATIME,
1319                             lcfg->lcfg_inlbuf1, option);
1320         rc = PTR_ERR(mnt);
1321         if (IS_ERR(mnt))
1322                 GOTO(err_ops, rc);
1323
1324         if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3) {
1325                 if (*lcfg->lcfg_inlbuf3 == 'f') {
1326                         obd->obd_replayable = 1;
1327                         obd_sync_filter = 1;
1328                         CWARN("%s: recovery enabled\n", obd->obd_name);
1329                 } else {
1330                         if (*lcfg->lcfg_inlbuf3 != 'n') {
1331                                 CERROR("unrecognised flag '%c'\n",
1332                                        *lcfg->lcfg_inlbuf3);
1333                         }
1334                         // XXX Robert? Why do we get errors here
1335                         // GOTO(err_mntput, rc = -EINVAL);
1336                 }
1337         }
1338
1339         filter->fo_vfsmnt = mnt;
1340         filter->fo_sb = mnt->mnt_sb;
1341         filter->fo_fstype = mnt->mnt_sb->s_type->name;
1342         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1343
1344         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1345         obd->obd_lvfs_ctxt.pwdmnt = mnt;
1346         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
1347         obd->obd_lvfs_ctxt.fs = get_ds();
1348         obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
1349
1350         rc = fsfilt_setup(obd, mnt->mnt_sb);
1351         if (rc)
1352                 GOTO(err_mntput, rc);
1353
1354         rc = filter_prep(obd);
1355         if (rc)
1356                 GOTO(err_mntput, rc);
1357
1358
1359         filter->fo_destroy_in_progress = 0;
1360         sema_init(&filter->fo_create_lock, 1);
1361
1362         spin_lock_init(&filter->fo_translock);
1363         spin_lock_init(&filter->fo_objidlock);
1364         INIT_LIST_HEAD(&filter->fo_export_list);
1365         sema_init(&filter->fo_alloc_lock, 1);
1366         spin_lock_init(&filter->fo_r_pages.oh_lock);
1367         spin_lock_init(&filter->fo_w_pages.oh_lock);
1368         spin_lock_init(&filter->fo_r_discont_pages.oh_lock);
1369         spin_lock_init(&filter->fo_w_discont_pages.oh_lock);
1370         spin_lock_init(&filter->fo_r_discont_blocks.oh_lock);
1371         spin_lock_init(&filter->fo_w_discont_blocks.oh_lock);
1372         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
1373
1374         INIT_LIST_HEAD(&filter->fo_llog_list);
1375         spin_lock_init(&filter->fo_llog_list_lock);
1376
1377         obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1378                                                 LDLM_NAMESPACE_SERVER);
1379         if (obd->obd_namespace == NULL)
1380                 GOTO(err_post, rc = -ENOMEM);
1381         obd->obd_namespace->ns_lvbp = obd;
1382         obd->obd_namespace->ns_lvbo = &filter_lvbo;
1383         ldlm_register_intent(obd->obd_namespace, filter_intent_policy);
1384
1385         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1386                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1387
1388         rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 1, name);
1389         if (rc) {
1390                 CERROR("failed to setup llogging subsystems\n");
1391                 GOTO(err_post, rc);
1392         }
1393         RETURN(0);
1394
1395 err_post:
1396         filter_post(obd);
1397 err_mntput:
1398         unlock_kernel();
1399         mntput(mnt);
1400         filter->fo_sb = 0;
1401         lock_kernel();
1402 err_ops:
1403         fsfilt_put_ops(obd->obd_fsops);
1404         return rc;
1405 }
1406
1407 static int filter_attach(struct obd_device *obd, obd_count len, void *data)
1408 {
1409         struct lprocfs_static_vars lvars;
1410         int rc;
1411
1412         lprocfs_init_vars(filter, &lvars);
1413         rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1414         if (rc != 0)
1415                 return rc;
1416
1417         rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1418         if (rc != 0)
1419                 return rc;
1420
1421         /* Init obdfilter private stats here */
1422         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1423                              LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1424         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1425                              LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
1426
1427         return lproc_filter_attach_seqstat(obd);
1428 }
1429
/*
 * Detach handler: free the obd statistics, then detach the lprocfs
 * entries.  Returns the result of lprocfs_obd_detach().
 */
static int filter_detach(struct obd_device *dev)
{
        int rc;

        lprocfs_free_obd_stats(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
1435
1436 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1437 {
1438         struct lustre_cfg* lcfg = buf;
1439         int rc;
1440         ENTRY;
1441         /* all mount options including errors=remount-ro and asyncdel are passed
1442          * using 4th lcfg param. And it is good, finally we have got rid of
1443          * hardcoded fs types in the code. */
1444         rc = filter_common_setup(obd, len, buf, lcfg->lcfg_inlbuf4);
1445         if (rc)
1446                 RETURN(rc);
1447         rc = filter_post_fs_setup(obd);
1448         RETURN(rc);
1449 }
1450
1451 static int filter_cleanup(struct obd_device *obd, int flags)
1452 {
1453         struct filter_obd *filter = &obd->u.filter;
1454         ENTRY;
1455
1456         if (flags & OBD_OPT_FAILOVER)
1457                 CERROR("%s: shutting down for failover; client state will"
1458                        " be preserved.\n", obd->obd_name);
1459
1460         if (!list_empty(&obd->obd_exports)) {
1461                 CERROR("%s: still has clients!\n", obd->obd_name);
1462                 class_disconnect_exports(obd, flags);
1463                 if (!list_empty(&obd->obd_exports)) {
1464                         CERROR("still has exports after forced cleanup?\n");
1465                         RETURN(-EBUSY);
1466                 }
1467         }
1468
1469         ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
1470
1471         if (filter->fo_sb == NULL)
1472                 RETURN(0);
1473
1474         filter_post_fs_cleanup(obd);
1475         filter_post(obd);
1476
1477         shrink_dcache_parent(filter->fo_sb->s_root);
1478         filter->fo_sb = 0;
1479
1480         if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1)
1481                 CERROR("%s: mount point %p busy, mnt_count: %d\n",
1482                        obd->obd_name, filter->fo_vfsmnt,
1483                        atomic_read(&filter->fo_vfsmnt->mnt_count));
1484
1485         unlock_kernel();
1486         mntput(filter->fo_vfsmnt);
1487         //destroy_buffers(filter->fo_sb->s_dev);
1488         filter->fo_sb = NULL;
1489         fsfilt_put_ops(obd->obd_fsops);
1490         lock_kernel();
1491
1492         dev_clear_rdonly(2);
1493
1494         RETURN(0);
1495 }
1496
1497 /* nearly identical to mds_connect */
1498 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1499                           struct obd_uuid *cluuid)
1500 {
1501         struct obd_export *exp;
1502         struct filter_export_data *fed;
1503         struct filter_client_data *fcd = NULL;
1504         struct filter_obd *filter = &obd->u.filter;
1505         int rc;
1506         ENTRY;
1507
1508         if (conn == NULL || obd == NULL || cluuid == NULL)
1509                 RETURN(-EINVAL);
1510
1511         rc = class_connect(conn, obd, cluuid);
1512         if (rc)
1513                 RETURN(rc);
1514         exp = class_conn2export(conn);
1515         LASSERT(exp != NULL);
1516
1517         fed = &exp->exp_filter_data;
1518
1519         spin_lock_init(&fed->fed_lock);
1520
1521         if (!obd->obd_replayable)
1522                 GOTO(cleanup, rc = 0);
1523
1524         OBD_ALLOC(fcd, sizeof(*fcd));
1525         if (!fcd) {
1526                 CERROR("filter: out of memory for client data\n");
1527                 GOTO(cleanup, rc = -ENOMEM);
1528         }
1529
1530         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1531         fed->fed_fcd = fcd;
1532
1533         rc = filter_client_add(obd, filter, fed, -1);
1534
1535 cleanup:
1536         if (rc) {
1537                 if (fcd)
1538                         OBD_FREE(fcd, sizeof(*fcd));
1539                 class_disconnect(exp, 0);
1540         } else {
1541                 class_export_put(exp);
1542         }
1543         return rc;
1544 }
1545
1546 static int filter_precleanup(struct obd_device *obd, int flags)
1547 {
1548         struct filter_group_llog *log;
1549         struct filter_obd *filter;
1550         int rc = 0;
1551         ENTRY;
1552
1553         filter = &obd->u.filter;
1554
1555         spin_lock(&filter->fo_llog_list_lock);
1556         while (!list_empty(&filter->fo_llog_list)) {
1557                 log = list_entry(filter->fo_llog_list.next,
1558                                  struct filter_group_llog, list);
1559                 list_del(&log->list);
1560                 spin_unlock(&filter->fo_llog_list_lock);
1561
1562                 rc = obd_llog_finish(obd, log->llogs, 0);
1563                 if (rc)
1564                         CERROR("failed to cleanup llogging subsystem for %u\n",
1565                                 log->group);
1566                 OBD_FREE(log->llogs, sizeof(*(log->llogs)));
1567                 OBD_FREE(log, sizeof(*log));
1568                 spin_lock(&filter->fo_llog_list_lock);
1569         }
1570         spin_unlock(&filter->fo_llog_list_lock);
1571
1572         rc = obd_llog_finish(obd, &obd->obd_llogs, 0);
1573         if (rc)
1574                 CERROR("failed to cleanup llogging subsystem\n");
1575
1576         RETURN(rc);
1577 }
1578
1579 /* Do extra sanity checks for grant accounting.  We do this at connect,
1580  * disconnect, and statfs RPC time, so it shouldn't be too bad.  We can
1581  * always get rid of it or turn it off when we know accounting is good. */
1582 static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
1583 {
1584         struct filter_export_data *fed;
1585         struct obd_export *exp;
1586         obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
1587         obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
1588         obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
1589
1590         if (list_empty(&obd->obd_exports))
1591                 return;
1592
1593         spin_lock(&obd->obd_osfs_lock);
1594         spin_lock(&obd->obd_dev_lock);
1595         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1596                 fed = &exp->exp_filter_data;
1597                 LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
1598                          "cli %s/%p %lu+%lu > "LPU64"\n",
1599                          exp->exp_client_uuid.uuid, exp,
1600                          fed->fed_grant, fed->fed_pending, maxsize);
1601                 LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
1602                          exp->exp_client_uuid.uuid, exp,fed->fed_dirty,maxsize);
1603                 CDEBUG(D_CACHE,"%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
1604                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
1605                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
1606                 tot_granted += fed->fed_grant + fed->fed_pending;
1607                 tot_pending += fed->fed_pending;
1608                 tot_dirty += fed->fed_dirty;
1609         }
1610         fo_tot_granted = obd->u.filter.fo_tot_granted;
1611         fo_tot_pending = obd->u.filter.fo_tot_pending;
1612         fo_tot_dirty = obd->u.filter.fo_tot_dirty;
1613         spin_unlock(&obd->obd_dev_lock);
1614         spin_unlock(&obd->obd_osfs_lock);
1615
1616         /* Do these assertions outside the spinlocks so we don't kill system */
1617         if (tot_granted != fo_tot_granted)
1618                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
1619                        func, tot_granted, fo_tot_granted);
1620         if (tot_pending != fo_tot_pending)
1621                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
1622                        func, tot_pending, fo_tot_pending);
1623         if (tot_dirty != fo_tot_dirty)
1624                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
1625                        func, tot_dirty, fo_tot_dirty);
1626         if (tot_pending > tot_granted)
1627                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
1628                        func, tot_pending, tot_granted);
1629         if (tot_granted > maxsize)
1630                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
1631                        func, tot_granted, maxsize);
1632         if (tot_dirty > maxsize)
1633                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
1634                        func, tot_dirty, maxsize);
1635 }
1636
1637 /* Remove this client from the grant accounting totals.  We also remove
1638  * the export from the obd device under the osfs and dev locks to ensure
1639  * that the filter_grant_sanity_check() calculations are always valid.
1640  * The client should do something similar when it invalidates its import. */
1641 static void filter_grant_discard(struct obd_export *exp)
1642 {
1643         struct obd_device *obd = exp->exp_obd;
1644         struct filter_obd *filter = &obd->u.filter;
1645         struct filter_export_data *fed = &exp->exp_filter_data;
1646
1647         spin_lock(&obd->obd_osfs_lock);
1648         spin_lock(&exp->exp_obd->obd_dev_lock);
1649         list_del_init(&exp->exp_obd_chain);
1650         spin_unlock(&exp->exp_obd->obd_dev_lock);
1651
1652         CDEBUG(D_CACHE, "%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
1653                obd->obd_name, exp->exp_client_uuid.uuid, exp,
1654                fed->fed_dirty, fed->fed_pending, fed->fed_grant);
1655
1656         LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
1657                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
1658                  obd->obd_name, filter->fo_tot_granted,
1659                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
1660         filter->fo_tot_granted -= fed->fed_grant;
1661         LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
1662                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
1663                  obd->obd_name, filter->fo_tot_pending,
1664                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
1665         LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
1666                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
1667                  obd->obd_name, filter->fo_tot_dirty,
1668                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
1669         filter->fo_tot_dirty -= fed->fed_dirty;
1670         fed->fed_dirty = 0;
1671         fed->fed_grant = 0;
1672
1673         spin_unlock(&obd->obd_osfs_lock);
1674 }
1675
1676 static int filter_destroy_export(struct obd_export *exp)
1677 {
1678         ENTRY;
1679
1680         if (exp->exp_filter_data.fed_pending)
1681                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
1682                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1683                        exp, exp->exp_filter_data.fed_pending);
1684
1685         target_destroy_export(exp);
1686
1687         if (exp->exp_obd->obd_replayable)
1688                 filter_client_free(exp, exp->exp_flags);
1689
1690         filter_grant_discard(exp);
1691         if (!(exp->exp_flags & OBD_OPT_FORCE))
1692                 filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
1693
1694         RETURN(0);
1695 }
1696
1697 static void filter_sync_llogs(struct obd_export *dexp)
1698 {
1699         struct filter_group_llog *fglog, *nlog;
1700         struct obd_device *obd = dexp->exp_obd;
1701         struct filter_obd *filter;
1702         int worked = 0, group;
1703         struct llog_ctxt *ctxt;
1704
1705         filter = &obd->u.filter;
1706
1707         /* we can't sync log holding spinlock. also, we do not want to get
1708          * into livelock. so we do following: loop over MDS's exports in
1709          * group order and skip already synced llogs -bzzz */
1710         do {
1711                 /* look for group with min. number, but > worked */
1712                 fglog = NULL;
1713                 group = 1 << 30;
1714                 spin_lock(&filter->fo_llog_list_lock);
1715                 list_for_each_entry(nlog, &filter->fo_llog_list, list) {
1716                        
1717                         if (nlog->group <= worked) {
1718                                 /* this group is already synced */
1719                                 continue;
1720                         }
1721         
1722                         if (group < nlog->group) {
1723                                 /* we have group with smaller number to sync */
1724                                 continue;
1725                         }
1726
1727                         /* store current minimal group */
1728                         fglog = nlog;
1729                         group = nlog->group;
1730                 }
1731                 spin_lock(&filter->fo_llog_list_lock);
1732
1733                 if (fglog) {
1734                         worked = fglog->group;
1735                         ctxt = llog_get_context(fglog->llogs,
1736                                                 LLOG_UNLINK_REPL_CTXT);
1737                         llog_sync(ctxt, dexp);
1738                 }
1739         } while (fglog != NULL);
1740 }
1741
1742 /* also incredibly similar to mds_disconnect */
1743 static int filter_disconnect(struct obd_export *exp, int flags)
1744 {
1745         struct obd_device *obd = exp->exp_obd;
1746         unsigned long irqflags;
1747         int rc;
1748         ENTRY;
1749
1750         LASSERT(exp);
1751         class_export_get(exp);
1752
1753         spin_lock_irqsave(&exp->exp_lock, irqflags);
1754         exp->exp_flags = flags;
1755         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1756
1757         if (!(flags & OBD_OPT_FORCE))
1758                 filter_grant_sanity_check(obd, __FUNCTION__);
1759         filter_grant_discard(exp);
1760
1761         /* Disconnect early so that clients can't keep using export */
1762         rc = class_disconnect(exp, flags);
1763
1764         ldlm_cancel_locks_for_export(exp);
1765
1766         fsfilt_sync(obd, obd->u.filter.fo_sb);
1767
1768         /* flush any remaining cancel messages out to the target */
1769         filter_sync_llogs(exp);
1770
1771         class_export_put(exp);
1772         RETURN(rc);
1773 }
1774
1775 struct dentry *__filter_oa2dentry(struct obd_device *obd,
1776                                   struct obdo *oa, const char *what)
1777 {
1778         struct dentry *dchild = NULL;
1779         obd_gr group = 0;
1780
1781         if (oa->o_valid & OBD_MD_FLGROUP)
1782                 group = oa->o_gr;
1783
1784         dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
1785
1786         if (IS_ERR(dchild)) {
1787                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1788                 RETURN(dchild);
1789         }
1790
1791         if (dchild->d_inode == NULL) {
1792                 CERROR("%s: %s on non-existent object: "LPU64"\n",
1793                        obd->obd_name, what, oa->o_id);
1794                 f_dput(dchild);
1795                 RETURN(ERR_PTR(-ENOENT));
1796         }
1797
1798         return dchild;
1799 }
1800
1801 static int filter_getattr(struct obd_export *exp, struct obdo *oa,
1802                           struct lov_stripe_md *md)
1803 {
1804         struct dentry *dentry = NULL;
1805         struct obd_device *obd;
1806         int rc = 0;
1807         ENTRY;
1808
1809         obd = class_exp2obd(exp);
1810         if (obd == NULL) {
1811                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1812                        exp->exp_handle.h_cookie);
1813                 RETURN(-EINVAL);
1814         }
1815
1816         dentry = filter_oa2dentry(obd, oa);
1817         if (IS_ERR(dentry))
1818                 RETURN(PTR_ERR(dentry));
1819
1820         /* Limit the valid bits in the return data to what we actually use */
1821         oa->o_valid = OBD_MD_FLID;
1822         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1823
1824         f_dput(dentry);
1825         RETURN(rc);
1826 }
1827
1828 /* this is called from filter_truncate() until we have filter_punch() */
1829 static int filter_setattr(struct obd_export *exp, struct obdo *oa,
1830                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1831 {
1832         struct lvfs_run_ctxt saved;
1833         struct filter_obd *filter;
1834         struct dentry *dentry;
1835         struct iattr iattr;
1836         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
1837         struct ldlm_resource *res;
1838         void *handle;
1839         int rc, rc2;
1840         ENTRY;
1841
1842         LASSERT(oti != NULL);
1843
1844         dentry = filter_oa2dentry(exp->exp_obd, oa);
1845         if (IS_ERR(dentry))
1846                 RETURN(PTR_ERR(dentry));
1847
1848         filter = &exp->exp_obd->u.filter;
1849
1850         iattr_from_obdo(&iattr, oa, oa->o_valid);
1851
1852         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
1853         lock_kernel();
1854
1855         if (iattr.ia_valid & ATTR_SIZE)
1856                 down(&dentry->d_inode->i_sem);
1857         handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
1858                               oti);
1859         if (IS_ERR(handle))
1860                 GOTO(out_unlock, rc = PTR_ERR(handle));
1861
1862         /* XXX this could be a rwsem instead, if filter_preprw played along */
1863         if (iattr.ia_valid & ATTR_ATTR_FLAG)
1864                 rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
1865                                       EXT3_IOC_SETFLAGS,
1866                                       (long)&iattr.ia_attr_flags);
1867         else
1868                 rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
1869         rc = filter_finish_transno(exp, oti, rc);
1870         rc2 = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
1871         if (rc2) {
1872                 CERROR("error on commit, err = %d\n", rc2);
1873                 if (!rc)
1874                         rc = rc2;
1875         }
1876
1877         if (iattr.ia_valid & ATTR_SIZE) {
1878                 res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
1879                                         res_id, LDLM_EXTENT, 0);
1880                 if (res == NULL) {
1881                         CERROR("!!! resource_get failed for object "LPU64" -- "
1882                                "filter_setattr with no lock?\n", oa->o_id);
1883                 } else {
1884                         if (res->lr_namespace->ns_lvbo &&
1885                             res->lr_namespace->ns_lvbo->lvbo_update) {
1886                                 rc = res->lr_namespace->ns_lvbo->lvbo_update
1887                                         (res, NULL, 0, 0);
1888                         }
1889                         ldlm_resource_putref(res);
1890                 }
1891         }
1892
1893         oa->o_valid = OBD_MD_FLID;
1894         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1895
1896 out_unlock:
1897         if (iattr.ia_valid & ATTR_SIZE)
1898                 up(&dentry->d_inode->i_sem);
1899         unlock_kernel();
1900         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
1901
1902         f_dput(dentry);
1903         RETURN(rc);
1904 }
1905
1906 /* XXX identical to osc_unpackmd */
1907 static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
1908                            struct lov_mds_md *lmm, int lmm_bytes)
1909 {
1910         int lsm_size;
1911         ENTRY;
1912
1913         if (lmm != NULL) {
1914                 if (lmm_bytes < sizeof (*lmm)) {
1915                         CERROR("lov_mds_md too small: %d, need %d\n",
1916                                lmm_bytes, (int)sizeof(*lmm));
1917                         RETURN(-EINVAL);
1918                 }
1919                 /* XXX LOV_MAGIC etc check? */
1920
1921                 if (lmm->lmm_object_id == cpu_to_le64(0)) {
1922                         CERROR("lov_mds_md: zero lmm_object_id\n");
1923                         RETURN(-EINVAL);
1924                 }
1925         }
1926
1927         lsm_size = lov_stripe_md_size(1);
1928         if (lsmp == NULL)
1929                 RETURN(lsm_size);
1930
1931         if (*lsmp != NULL && lmm == NULL) {
1932                 OBD_FREE(*lsmp, lsm_size);
1933                 *lsmp = NULL;
1934                 RETURN(0);
1935         }
1936
1937         if (*lsmp == NULL) {
1938                 OBD_ALLOC(*lsmp, lsm_size);
1939                 if (*lsmp == NULL)
1940                         RETURN(-ENOMEM);
1941
1942                 loi_init((*lsmp)->lsm_oinfo);
1943         }
1944
1945         if (lmm != NULL) {
1946                 /* XXX zero *lsmp? */
1947                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
1948                 LASSERT((*lsmp)->lsm_object_id);
1949         }
1950
1951         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
1952
1953         RETURN(lsm_size);
1954 }
1955
1956 static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
1957                                       struct filter_obd *filter)
1958 {
1959         struct obdo doa; /* XXX obdo on stack */
1960         __u64 last, id;
1961         ENTRY;
1962         LASSERT(oa);
1963
1964         memset(&doa, 0, sizeof(doa));
1965         if (oa->o_valid & OBD_MD_FLGROUP) {
1966                 doa.o_valid |= OBD_MD_FLGROUP;
1967                 doa.o_gr = oa->o_gr;
1968         } else {
1969                 doa.o_gr = 0;
1970         }
1971         doa.o_mode = S_IFREG;
1972         doa.o_gr = oa->o_gr;
1973         doa.o_valid = oa->o_valid & OBD_MD_FLGROUP;
1974
1975         filter->fo_destroy_in_progress = 1;
1976         down(&filter->fo_create_lock);
1977         if (!filter->fo_destroy_in_progress) {
1978                 CERROR("%s: destroy_in_progress already cleared\n",
1979                         exp->exp_obd->obd_name);
1980                 up(&filter->fo_create_lock);
1981                 EXIT;
1982                 return;
1983         }
1984
1985         last = filter_last_id(filter, doa.o_gr);
1986         CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
1987                exp->exp_obd->obd_name, oa->o_id + 1, last);
1988         for (id = oa->o_id + 1; id <= last; id++) {
1989                 doa.o_id = id;
1990                 filter_destroy(exp, &doa, NULL, NULL);
1991         }
1992
1993         CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
1994                exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
1995
1996         spin_lock(&filter->fo_objidlock);
1997         filter->fo_last_objids[doa.o_gr] = oa->o_id;
1998         spin_unlock(&filter->fo_objidlock);
1999
2000         filter->fo_destroy_in_progress = 0;
2001         up(&filter->fo_create_lock);
2002
2003         EXIT;
2004 }
2005
2006 /* returns a negative error or a nonnegative number of files to create */
2007 static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
2008                                    obd_gr group)
2009 {
2010         struct obd_device *obd = exp->exp_obd;
2011         struct filter_obd *filter = &obd->u.filter;
2012         int diff, rc;
2013         ENTRY;
2014
2015         diff = oa->o_id - filter_last_id(filter, oa->o_gr);
2016         CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
2017                filter_last_id(filter, oa->o_gr), diff);
2018
2019         /* delete orphans request */
2020         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2021             (oa->o_flags & OBD_FL_DELORPHAN)) {
2022                 if (diff >= 0)
2023                         RETURN(diff);
2024                 if (-diff > 10000) { /* XXX make this smarter */
2025                         CERROR("ignoring bogus orphan destroy request: obdid "
2026                                LPU64" last_id "LPU64"\n",
2027                                oa->o_id, filter_last_id(filter, oa->o_gr));
2028                         RETURN(-EINVAL);
2029                 }
2030                 filter_destroy_precreated(exp, oa, filter);
2031                 rc = filter_update_last_objid(obd, group, 0);
2032                 if (rc)
2033                         CERROR("unable to write lastobjid, but orphans"
2034                                "were deleted\n");
2035                 RETURN(0);
2036         } else {
2037                 /* only precreate if group == 0 and o_id is specfied */
2038                 if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
2039                     (group != 0 || oa->o_id == 0))
2040                         RETURN(1);
2041
2042                 LASSERT(diff >= 0);
2043                 RETURN(diff);
2044         }
2045 }
2046
2047 /* We rely on the fact that only one thread will be creating files in a given
2048  * group at a time, which is why we don't need an atomic filter_get_new_id.
2049  * Even if we had that atomic function, the following race would exist:
2050  *
2051  * thread 1: gets id x from filter_next_id
2052  * thread 2: gets id (x + 1) from filter_next_id
2053  * thread 2: creates object (x + 1)
2054  * thread 1: tries to create object x, gets -ENOSPC
2055  */
2056 static int filter_precreate(struct obd_device *obd, struct obdo *oa,
2057                             obd_gr group, int *num)
2058 {
2059         struct dentry *dchild = NULL, *dparent = NULL;
2060         struct filter_obd *filter;
2061         int err = 0, rc = 0, recreate_obj = 0, i;
2062         __u64 next_id;
2063         void *handle = NULL;
2064         ENTRY;
2065
2066         filter = &obd->u.filter;
2067
2068         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2069             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
2070                 recreate_obj = 1;
2071         }
2072
2073         CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
2074
2075         down(&filter->fo_create_lock);
2076
2077         for (i = 0; i < *num && err == 0; i++) {
2078                 int cleanup_phase = 0;
2079
2080                 if (filter->fo_destroy_in_progress) {
2081                         CWARN("%s: precreate aborted by destroy\n",
2082                               obd->obd_name);
2083                         break;
2084                 }
2085
2086                 if (recreate_obj) {
2087                         __u64 last_id;
2088                         next_id = oa->o_id;
2089                         last_id = filter_last_id(filter, group);
2090                         if (next_id > last_id) {
2091                                 CERROR("Error: Trying to recreate obj greater"
2092                                        "than last id "LPD64" > "LPD64"\n",
2093                                        next_id, last_id);
2094                                 GOTO(cleanup, rc = -EINVAL);
2095                         }
2096                 } else {
2097                         next_id = filter_last_id(filter, group) + 1;
2098                 }
2099
2100                 CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
2101
2102                 dparent = filter_parent_lock(obd, group, next_id);
2103                 if (IS_ERR(dparent))
2104                         GOTO(cleanup, rc = PTR_ERR(dparent));
2105                 cleanup_phase = 1;
2106
2107                 dchild = filter_fid2dentry(obd, dparent, group, next_id);
2108                 if (IS_ERR(dchild))
2109                         GOTO(cleanup, rc = PTR_ERR(dchild));
2110                 cleanup_phase = 2;
2111
2112                 if (dchild->d_inode != NULL) {
2113                         /* This would only happen if lastobjid was bad on disk*/
2114                         /* Could also happen if recreating missing obj but
2115                          * already exists
2116                          */
2117                         if (recreate_obj) {
2118                                 CERROR("%s: Serious error: recreating obj %*s "
2119                                        "but obj already exists \n",
2120                                        obd->obd_name, dchild->d_name.len,
2121                                        dchild->d_name.name);
2122                                 LBUG();
2123                         } else {
2124                                 CERROR("%s: Serious error: objid %*s already "
2125                                        "exists; is this filesystem corrupt?\n",
2126                                        obd->obd_name, dchild->d_name.len,
2127                                        dchild->d_name.name);
2128                                 LBUG();
2129                         }
2130                         GOTO(cleanup, rc = -EEXIST);
2131                 }
2132
2133                 handle = fsfilt_start_log(obd, dparent->d_inode,
2134                                           FSFILT_OP_CREATE, NULL, 1);
2135                 if (IS_ERR(handle))
2136                         GOTO(cleanup, rc = PTR_ERR(handle));
2137                 cleanup_phase = 3;
2138
2139                 rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
2140                 if (rc) {
2141                         CERROR("create failed rc = %d\n", rc);
2142                         GOTO(cleanup, rc);
2143                 }
2144
2145                 if (!recreate_obj) {
2146                         filter_set_last_id(filter, group, next_id);
2147                         err = filter_update_last_objid(obd, group, 0);
2148                         if (err)
2149                                 CERROR("unable to write lastobjid "
2150                                        "but file created\n");
2151                 }
2152
2153         cleanup:
2154                 switch(cleanup_phase) {
2155                 case 3:
2156                         err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2157                         if (err) {
2158                                 CERROR("error on commit, err = %d\n", err);
2159                                 if (!rc)
2160                                         rc = err;
2161                         }
2162                 case 2:
2163                         f_dput(dchild);
2164                 case 1:
2165                         filter_parent_unlock(dparent);
2166                 case 0:
2167                         break;
2168                 }
2169
2170                 if (rc)
2171                         break;
2172         }
2173         *num = i;
2174
2175         up(&filter->fo_create_lock);
2176
2177         CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
2178                obd->obd_name, group, filter->fo_last_objids[group]);
2179
2180         CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
2181                obd->obd_name, i);
2182         RETURN(rc);
2183 }
2184
2185 static int filter_create(struct obd_export *exp, struct obdo *oa,
2186                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
2187 {
2188         struct obd_device *obd = NULL;
2189         struct filter_obd *filter;
2190         struct lvfs_run_ctxt saved;
2191         struct lov_stripe_md *lsm = NULL;
2192         struct filter_export_data *fed;
2193         int group = oa->o_gr;
2194         char str[PTL_NALFMT_SIZE];
2195         int rc = 0, diff;
2196         ENTRY;
2197
2198         if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
2199                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
2200                                 exp->exp_connection->c_peer.peer_nid, str);
2201                 CERROR("!!! nid "LPX64"/%s sent invalid object group %d\n",
2202                        exp->exp_connection->c_peer.peer_nid, str, group);
2203                 RETURN(-EINVAL);
2204         }
2205
2206         obd = exp->exp_obd;
2207         fed = &exp->exp_filter_data;
2208         filter = &obd->u.filter;
2209
2210         if (fed->fed_group != group && !(oa->o_valid & OBD_MD_REINT)) {
2211                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
2212                                 exp->exp_connection->c_peer.peer_nid, str);
2213                 CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
2214                        "earlier; now it's trying to use group %d!  This could "
2215                        "be a bug in the MDS.  Tell CFS.\n",
2216                        exp->exp_connection->c_peer.peer_nid, str,
2217                        fed->fed_group, group);
2218                 RETURN(-ENOTUNIQ);
2219         }
2220
2221         CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n",
2222                group, oa->o_id);
2223         if (ea != NULL) {
2224                 lsm = *ea;
2225                 if (lsm == NULL) {
2226                         rc = obd_alloc_memmd(exp, &lsm);
2227                         if (rc < 0)
2228                                 RETURN(rc);
2229                 }
2230         }
2231
2232         obd = exp->exp_obd;
2233         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2234
2235         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
2236             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
2237                 if (oa->o_id > filter_last_id(&obd->u.filter, group)) {
2238                         CERROR("recreate objid "LPU64" > last id "LPU64"\n",
2239                                oa->o_id, filter_last_id(&obd->u.filter, group));
2240                         rc = -EINVAL;
2241                 } else {
2242                         diff = 1;
2243                         rc = filter_precreate(obd, oa, group, &diff);
2244                 }
2245         } else {
2246                 diff = filter_should_precreate(exp, oa, group);
2247                 if (diff > 0) {
2248                         oa->o_id = filter_last_id(&obd->u.filter, group);
2249                         rc = filter_precreate(obd, oa, group, &diff);
2250                         oa->o_id += diff;
2251                         oa->o_valid = OBD_MD_FLID;
2252                 }
2253         }
2254
2255         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2256         if (rc && ea != NULL && *ea != lsm) {
2257                 obd_free_memmd(exp, &lsm);
2258         } else if (rc == 0 && ea != NULL) {
2259                 /* XXX LOV STACKING: the lsm that is passed to us from
2260                  * LOV does not have valid lsm_oinfo data structs, so
2261                  * don't go touching that.  This needs to be fixed in a
2262                  * big way. */
2263                 lsm->lsm_object_id = oa->o_id;
2264                 lsm->lsm_object_gr = oa->o_gr;
2265                 *ea = lsm;
2266         }
2267
2268         RETURN(rc);
2269 }
2270
2271 static int filter_destroy(struct obd_export *exp, struct obdo *oa,
2272                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
2273 {
2274         struct obd_device *obd;
2275         struct filter_obd *filter;
2276         struct dentry *dchild = NULL, *dparent = NULL;
2277         struct lvfs_run_ctxt saved;
2278         void *handle = NULL;
2279         struct llog_cookie *fcc = NULL;
2280         int rc, rc2, cleanup_phase = 0, have_prepared = 0;
2281         ENTRY;
2282
2283         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2284
2285         obd = exp->exp_obd;
2286         filter = &obd->u.filter;
2287
2288         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2289
2290  acquire_locks:
2291         dparent = filter_parent_lock(obd, oa->o_gr, oa->o_id);
2292         if (IS_ERR(dparent))
2293                 GOTO(cleanup, rc = PTR_ERR(dparent));
2294         cleanup_phase = 1;
2295
2296         dchild = filter_fid2dentry(obd, dparent, oa->o_gr, oa->o_id);
2297         if (IS_ERR(dchild))
2298                 GOTO(cleanup, rc = -ENOENT);
2299         cleanup_phase = 2;
2300
2301         if (dchild->d_inode == NULL) {
2302                 CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n",
2303                        oa->o_id);
2304                 GOTO(cleanup, rc = -ENOENT);
2305         }
2306
2307         if (!have_prepared) {
2308                 /* If we're really going to destroy the object, get ready
2309                  * by getting the clients to discard their cached data.
2310                  *
2311                  * We have to drop the parent lock, because
2312                  * filter_prepare_destroy will acquire a PW on the object, and
2313                  * we don't want to deadlock with an incoming write to the
2314                  * object, which has the extent PW and then wants to get the
2315                  * parent dentry to do the lookup.
2316                  *
2317                  * We dput the child because it's not worth the extra
2318                  * complication of condition the above code to skip it on the
2319                  * second time through. */
2320                 f_dput(dchild);
2321                 filter_parent_unlock(dparent);
2322
2323                 filter_prepare_destroy(obd, oa->o_id, oa->o_gr);
2324                 have_prepared = 1;
2325                 goto acquire_locks;
2326         }
2327
2328         handle = fsfilt_start_log(obd, dparent->d_inode, FSFILT_OP_UNLINK, oti, 1);
2329         if (IS_ERR(handle))
2330                 GOTO(cleanup, rc = PTR_ERR(handle));
2331
2332         cleanup_phase = 3;
2333
2334         /* Our MDC connection is established by the MDS to us */
2335         if (oa->o_valid & OBD_MD_FLCOOKIE) {
2336                 OBD_ALLOC(fcc, sizeof(*fcc));
2337                 if (fcc != NULL)
2338                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
2339         }
2340
2341         rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
2342
2343 cleanup:
2344         switch(cleanup_phase) {
2345         case 3:
2346                 if (fcc != NULL) {
2347                         if (oti != NULL)
2348                                 fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
2349                                                       oti->oti_handle,
2350                                                       filter_cancel_cookies_cb,
2351                                                       fcc);
2352                         else
2353                                 fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
2354                                                       handle,
2355                                                       filter_cancel_cookies_cb,
2356                                                       fcc);
2357                 }
2358                 rc = filter_finish_transno(exp, oti, rc);
2359                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2360                 if (rc2) {
2361                         CERROR("error on commit, err = %d\n", rc2);
2362                         if (!rc)
2363                                 rc = rc2;
2364                 }
2365         case 2:
2366                 f_dput(dchild);
2367         case 1:
2368                 filter_parent_unlock(dparent);
2369         case 0:
2370                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2371                 break;
2372         default:
2373                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2374                 LBUG();
2375         }
2376
2377         RETURN(rc);
2378 }
2379
2380 /* NB start and end are used for punch, but not truncate */
2381 static int filter_truncate(struct obd_export *exp, struct obdo *oa,
2382                            struct lov_stripe_md *lsm,
2383                            obd_off start, obd_off end,
2384                            struct obd_trans_info *oti)
2385 {
2386         int error;
2387         ENTRY;
2388
2389         if (end != OBD_OBJECT_EOF)
2390                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2391                        end);
2392
2393         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
2394                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2395         oa->o_size = start;
2396         error = filter_setattr(exp, oa, NULL, oti);
2397         RETURN(error);
2398 }
2399
2400 static int filter_sync(struct obd_export *exp, struct obdo *oa,
2401                        struct lov_stripe_md *lsm, obd_off start, obd_off end)
2402 {
2403         struct obd_device *obd = exp->exp_obd;
2404         struct lvfs_run_ctxt saved;
2405         struct filter_obd *filter;
2406         struct dentry *dentry;
2407         struct llog_ctxt *ctxt;
2408         int rc, rc2;
2409         ENTRY;
2410
2411         filter = &obd->u.filter;
2412
2413         /* an objid of zero is taken to mean "sync whole filesystem" */
2414         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
2415                 rc = fsfilt_sync(obd, filter->fo_sb);
2416                 /* flush any remaining cancel messages out to the target */
2417                 ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_REPL_CTXT);
2418                 llog_sync(ctxt, exp);
2419                 RETURN(rc);
2420         }
2421
2422         dentry = filter_oa2dentry(obd, oa);
2423         if (IS_ERR(dentry))
2424                 RETURN(PTR_ERR(dentry));
2425
2426         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2427
2428         down(&dentry->d_inode->i_sem);
2429         rc = filemap_fdatasync(dentry->d_inode->i_mapping);
2430         if (rc == 0) {
2431                 /* just any file to grab fsync method - "file" arg unused */
2432                 struct file *file = filter->fo_rcvd_filp;
2433
2434                 if (file->f_op && file->f_op->fsync)
2435                         rc = file->f_op->fsync(NULL, dentry, 1);
2436
2437                 rc2 = filemap_fdatawait(dentry->d_inode->i_mapping);
2438                 if (!rc)
2439                         rc = rc2;
2440         }
2441         up(&dentry->d_inode->i_sem);
2442
2443         oa->o_valid = OBD_MD_FLID;
2444         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
2445
2446         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
2447
2448         f_dput(dentry);
2449         RETURN(rc);
2450 }
2451
2452 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2453                          unsigned long max_age)
2454 {
2455         struct filter_obd *filter = &obd->u.filter;
2456         int blockbits = filter->fo_sb->s_blocksize_bits;
2457         int rc;
2458         ENTRY;
2459
2460         /* at least try to account for cached pages.  its still racey and
2461          * might be under-reporting if clients haven't announced their
2462          * caches with brw recently */
2463         spin_lock(&obd->obd_osfs_lock);
2464         rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
2465         memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
2466         spin_unlock(&obd->obd_osfs_lock);
2467
2468         CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
2469                " pending "LPU64" free "LPU64" avail "LPU64"\n",
2470                filter->fo_tot_dirty, filter->fo_tot_granted,
2471                filter->fo_tot_pending,
2472                osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
2473
2474         filter_grant_sanity_check(obd, __FUNCTION__);
2475
2476         osfs->os_bavail -= min(osfs->os_bavail,
2477                                (filter->fo_tot_dirty + filter->fo_tot_pending +
2478                                 osfs->os_bsize -1) >> blockbits);
2479
2480         RETURN(rc);
2481 }
2482
2483 static int filter_get_info(struct obd_export *exp, __u32 keylen,
2484                            void *key, __u32 *vallen, void *val)
2485 {
2486         struct filter_export_data *fed = &exp->exp_filter_data;
2487         struct obd_device *obd;
2488         ENTRY;
2489
2490         obd = class_exp2obd(exp);
2491         if (obd == NULL) {
2492                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2493                        exp->exp_handle.h_cookie);
2494                 RETURN(-EINVAL);
2495         }
2496
2497         if (keylen == strlen("blocksize") &&
2498             memcmp(key, "blocksize", keylen) == 0) {
2499                 __u32 *blocksize = val;
2500                 *vallen = sizeof(*blocksize);
2501                 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2502                 RETURN(0);
2503         }
2504
2505         if (keylen == strlen("blocksize_bits") &&
2506             memcmp(key, "blocksize_bits", keylen) == 0) {
2507                 __u32 *blocksize_bits = val;
2508                 *vallen = sizeof(*blocksize_bits);
2509                 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2510                 RETURN(0);
2511         }
2512
2513         if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
2514                 obd_id *last_id = val;
2515                 *last_id = filter_last_id(&obd->u.filter, fed->fed_group);
2516                 RETURN(0);
2517         }
2518         if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) {
2519                 /*Get log_context handle*/
2520                 unsigned long *llh_handle = val;
2521                 *vallen = sizeof(unsigned long);
2522                 *llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT];
2523                 RETURN(0);
2524         }
2525         if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) {
2526                 /*Get log_context handle*/
2527                 unsigned long *sb = val;
2528                 *vallen = sizeof(unsigned long);
2529                 *sb = (unsigned long)obd->u.filter.fo_sb;
2530                 RETURN(0);
2531         }
2532
2533         CDEBUG(D_IOCTL, "invalid key\n");
2534         RETURN(-EINVAL);
2535 }
2536
2537 struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
2538 {
2539         struct filter_group_llog *fglog, *nlog;
2540         char name[32] = "CATLIST";
2541         struct filter_obd *filter;
2542         struct list_head *cur;
2543         int rc;
2544
2545         filter = &obd->u.filter;
2546
2547         spin_lock(&filter->fo_llog_list_lock);
2548         list_for_each(cur, &filter->fo_llog_list) {
2549                 fglog = list_entry(cur, struct filter_group_llog, list);
2550                 if (fglog->group == group) {
2551                         spin_unlock(&filter->fo_llog_list_lock);
2552                         RETURN(fglog->llogs);
2553                 }
2554         }
2555         spin_unlock(&filter->fo_llog_list_lock);
2556
2557         OBD_ALLOC(fglog, sizeof(*fglog));
2558         if (fglog == NULL)
2559                 RETURN(NULL);
2560         fglog->group = group;
2561
2562         OBD_ALLOC(fglog->llogs, sizeof(struct obd_llogs));
2563         if (fglog->llogs == NULL) {
2564                 OBD_FREE(fglog, sizeof(*fglog));
2565                 RETURN(NULL);
2566         }
2567
2568         spin_lock(&filter->fo_llog_list_lock);
2569         list_for_each(cur, &filter->fo_llog_list) {
2570                 nlog = list_entry(cur, struct filter_group_llog, list);
2571                 LASSERT(nlog->group != group);
2572         }
2573         list_add(&fglog->list, &filter->fo_llog_list);
2574         spin_unlock(&filter->fo_llog_list_lock);
2575
2576         rc = obd_llog_cat_initialize(obd, fglog->llogs, 1, name);
2577         if (rc) {
2578                 OBD_FREE(fglog->llogs, sizeof(*(fglog->llogs)));
2579                 OBD_FREE(fglog, sizeof(*fglog));
2580                 RETURN(NULL);
2581         }
2582
2583         CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n", obd->obd_name,
2584                 fglog->llogs, group);
2585
2586         RETURN(fglog->llogs);
2587 }
2588
2589 static int filter_set_info(struct obd_export *exp, __u32 keylen,
2590                            void *key, __u32 vallen, void *val)
2591 {
2592         struct lvfs_run_ctxt saved;
2593         struct filter_export_data *fed = &exp->exp_filter_data;
2594         struct obd_device *obd;
2595         struct lustre_handle conn;
2596         struct obd_llogs *llog;
2597         struct llog_ctxt *ctxt;
2598         __u32 group;
2599         int rc = 0;
2600         ENTRY;
2601
2602         conn.cookie = exp->exp_handle.h_cookie;
2603
2604         obd = exp->exp_obd;
2605         if (obd == NULL) {
2606                 CDEBUG(D_IOCTL, "invalid exp %p cookie "LPX64"\n",
2607                        exp, conn.cookie);
2608                 RETURN(-EINVAL);
2609         }
2610
2611         if (keylen < strlen("mds_conn") ||
2612             memcmp(key, "mds_conn", keylen) != 0)
2613                 RETURN(-EINVAL);
2614
2615         group = *((__u32 *)val);
2616         if (fed->fed_group != 0 && fed->fed_group != group) {
2617                 char str[PTL_NALFMT_SIZE];
2618                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
2619                                 exp->exp_connection->c_peer.peer_nid, str);
2620                 CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
2621                        "earlier; now it's trying to use group %d!  This could "
2622                        "be a bug in the MDS.  Tell CFS.\n",
2623                        exp->exp_connection->c_peer.peer_nid, str,
2624                        fed->fed_group, group);
2625                 RETURN(-EPROTO);
2626         }
2627         fed->fed_group = group;
2628         CWARN("Received MDS connection ("LPX64"); group %d\n", conn.cookie,
2629               group);
2630
2631         LASSERT(rc == 0);
2632
2633         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2634         rc = filter_read_groups(obd, group, 1);
2635         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2636         if (rc != 0) {
2637                 CERROR("can't read group %u\n", group);
2638                 RETURN(rc);
2639         }
2640         rc = filter_group_set_kml_flags(obd, group);
2641          if (rc != 0) {
2642                 CERROR("can't set kml flags %u\n", group);
2643                 RETURN(rc);
2644         }
2645         llog = filter_grab_llog_for_group(obd, group);
2646         LASSERT(llog != NULL);
2647
2648         ctxt = llog_get_context(llog, LLOG_UNLINK_REPL_CTXT);
2649         LASSERT(ctxt != NULL);
2650         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
2651         RETURN(rc);
2652 }
2653
2654 int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
2655                      int len, void *karg, void *uarg)
2656 {
2657         struct obd_device *obd = exp->exp_obd;
2658         struct obd_ioctl_data *data = karg;
2659         int rc = 0;
2660
2661         switch (cmd) {
2662         case OBD_IOC_ABORT_RECOVERY:
2663                 CERROR("aborting recovery for device %s\n", obd->obd_name);
2664                 target_abort_recovery(obd);
2665                 RETURN(0);
2666
2667         case OBD_IOC_SET_READONLY: {
2668                 void *handle;
2669                 struct super_block *sb = obd->u.filter.fo_sb;
2670                 struct inode *inode = sb->s_root->d_inode;
2671                 BDEVNAME_DECLARE_STORAGE(tmp);
2672                 CERROR("setting device %s read-only\n",
2673                        ll_bdevname(sb, tmp));
2674
2675                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
2676                 LASSERT(handle);
2677                 (void)fsfilt_commit(obd, inode, handle, 1);
2678
2679                 dev_set_rdonly(ll_sbdev(obd->u.filter.fo_sb), 2);
2680                 RETURN(0);
2681         }
2682
2683         case OBD_IOC_CATLOGLIST: {
2684                 rc = llog_catalog_list(obd, 1, data);
2685                 RETURN(rc);
2686         }
2687
2688         case OBD_IOC_LLOG_CANCEL:
2689         case OBD_IOC_LLOG_REMOVE:
2690         case OBD_IOC_LLOG_INFO:
2691         case OBD_IOC_LLOG_PRINT: {
2692                 /* FIXME to be finished */
2693                 RETURN(-EOPNOTSUPP);
2694 /*
2695                 struct llog_ctxt *ctxt = NULL;
2696
2697                 push_ctxt(&saved, &ctxt->loc_ctxt, NULL);
2698                 rc = llog_ioctl(ctxt, cmd, data);
2699                 pop_ctxt(&saved, &ctxt->loc_ctxt, NULL);
2700
2701                 RETURN(rc);
2702 */
2703         }
2704
2705
2706         default:
2707                 RETURN(-EINVAL);
2708         }
2709         RETURN(0);
2710 }
2711
2712 static struct llog_operations filter_unlink_repl_logops;
2713 static struct llog_operations filter_size_orig_logops = {
2714         lop_setup: llog_obd_origin_setup,
2715         lop_cleanup: llog_catalog_cleanup,
2716         lop_add: llog_catalog_add,
2717 };
2718
2719 static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
2720                             struct obd_device *tgt, int count,
2721                             struct llog_catid *catid)
2722 {
2723         struct llog_ctxt *ctxt;
2724         int rc;
2725         ENTRY;
2726
2727         filter_unlink_repl_logops = llog_client_ops;
2728         filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel;
2729         filter_unlink_repl_logops.lop_connect = llog_repl_connect;
2730         filter_unlink_repl_logops.lop_sync = llog_obd_repl_sync;
2731
2732         rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
2733                         &filter_unlink_repl_logops);
2734         if (rc)
2735                 RETURN(rc);
2736         /* FIXME - assign unlink_cb for filter's recovery */
2737         ctxt = llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT);
2738         ctxt->llog_proc_cb = filter_recov_log_unlink_cb;
2739
2740         /* FIXME - count should be 1 to setup size log */
2741         rc = obd_llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, 
2742                             &catid->lci_logid, &filter_size_orig_logops);
2743         RETURN(rc);
2744 }
2745
2746 static int filter_llog_finish(struct obd_device *obd,
2747                               struct obd_llogs *llogs, int count)
2748 {
2749         int rc;
2750         ENTRY;
2751
2752         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT));
2753         if (rc)
2754                 RETURN(rc);
2755
2756         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_ORIG_CTXT));
2757         RETURN(rc);
2758 }
2759
2760 static int filter_llog_connect(struct obd_device *obd,
2761                                struct llogd_conn_body *body) 
2762 {
2763         struct llog_ctxt *ctxt;
2764         struct obd_llogs *llog;
2765         int rc;
2766         ENTRY;
2767
2768         CDEBUG(D_OTHER, "handle connect for %s: %u/%u/%u\n", obd->obd_name,
2769                (unsigned) body->lgdc_logid.lgl_ogr,
2770                (unsigned) body->lgdc_logid.lgl_oid,
2771                (unsigned) body->lgdc_logid.lgl_ogen);
2772         llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr);
2773         LASSERT(llog != NULL);
2774         ctxt = llog_get_context(llog, body->lgdc_ctxt_idx);
2775         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
2776                           &body->lgdc_gen, NULL);
2777         if (rc != 0)
2778                 CERROR("failed to connect\n");
2779
2780         RETURN(rc);
2781 }
2782
2783 static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2784                                              void *data)
2785 {
2786         return filter_fid2dentry(data, NULL, gr, id);
2787 }
2788
2789 static struct lvfs_callback_ops filter_lvfs_ops = {
2790         l_fid2dentry:     filter_lvfs_fid2dentry,
2791 };
2792
2793 static struct obd_ops filter_obd_ops = {
2794         .o_owner          = THIS_MODULE,
2795         .o_attach         = filter_attach,
2796         .o_detach         = filter_detach,
2797         .o_get_info       = filter_get_info,
2798         .o_set_info       = filter_set_info,
2799         .o_setup          = filter_setup,
2800         .o_precleanup     = filter_precleanup,
2801         .o_cleanup        = filter_cleanup,
2802         .o_connect        = filter_connect,
2803         .o_disconnect     = filter_disconnect,
2804         .o_statfs         = filter_statfs,
2805         .o_getattr        = filter_getattr,
2806         .o_unpackmd       = filter_unpackmd,
2807         .o_create         = filter_create,
2808         .o_setattr        = filter_setattr,
2809         .o_destroy        = filter_destroy,
2810         .o_brw            = filter_brw,
2811         .o_punch          = filter_truncate,
2812         .o_sync           = filter_sync,
2813         .o_preprw         = filter_preprw,
2814         .o_commitrw       = filter_commitrw,
2815         .o_write_extents  = filter_write_extents,
2816         .o_destroy_export = filter_destroy_export,
2817         .o_llog_init      = filter_llog_init,
2818         .o_llog_finish    = filter_llog_finish,
2819         .o_llog_connect   = filter_llog_connect,
2820         .o_iocontrol      = filter_iocontrol,
2821 };
2822
2823 static struct obd_ops filter_sanobd_ops = {
2824         .o_owner          = THIS_MODULE,
2825         .o_attach         = filter_attach,
2826         .o_detach         = filter_detach,
2827         .o_get_info       = filter_get_info,
2828         .o_set_info       = filter_set_info,
2829         .o_setup          = filter_san_setup,
2830         .o_precleanup     = filter_precleanup,
2831         .o_cleanup        = filter_cleanup,
2832         .o_connect        = filter_connect,
2833         .o_disconnect     = filter_disconnect,
2834         .o_statfs         = filter_statfs,
2835         .o_getattr        = filter_getattr,
2836         .o_unpackmd       = filter_unpackmd,
2837         .o_create         = filter_create,
2838         .o_setattr        = filter_setattr,
2839         .o_destroy        = filter_destroy,
2840         .o_brw            = filter_brw,
2841         .o_punch          = filter_truncate,
2842         .o_sync           = filter_sync,
2843         .o_preprw         = filter_preprw,
2844         .o_commitrw       = filter_commitrw,
2845         .o_write_extents  = filter_write_extents,
2846         .o_san_preprw     = filter_san_preprw,
2847         .o_destroy_export = filter_destroy_export,
2848         .o_llog_init      = filter_llog_init,
2849         .o_llog_finish    = filter_llog_finish,
2850         .o_llog_connect   = filter_llog_connect,
2851         .o_iocontrol      = filter_iocontrol,
2852 };
2853
2854 static int __init obdfilter_init(void)
2855 {
2856         struct lprocfs_static_vars lvars;
2857         int rc;
2858
2859         printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n");
2860
2861         lprocfs_init_vars(filter, &lvars);
2862
2863         rc = class_register_type(&filter_obd_ops, NULL, lvars.module_vars,
2864                                  OBD_FILTER_DEVICENAME);
2865         if (rc)
2866                 return rc;
2867
2868         rc = class_register_type(&filter_sanobd_ops, NULL, lvars.module_vars,
2869                                  OBD_FILTER_SAN_DEVICENAME);
2870         if (rc)
2871                 class_unregister_type(OBD_FILTER_DEVICENAME);
2872         return rc;
2873 }
2874
2875 static void __exit obdfilter_exit(void)
2876 {
2877         class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2878         class_unregister_type(OBD_FILTER_DEVICENAME);
2879 }
2880
2881 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2882 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2883 MODULE_LICENSE("GPL");
2884
2885 module_init(obdfilter_init);
2886 module_exit(obdfilter_exit);