Whamcloud - gitweb
Write out MDS last_rcvd when first created, don't wait for a client connect.
[fs/lustre-release.git] / lustre / obdfilter / filter.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 /*
27  * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28  *            (which need to get journal_lock, may block if journal full).
29  *
30  * Invariant: Call filter_start_transno() before any journal ops to avoid the
31  *            same deadlock problem.  We can (and want) to get rid of the
32  *            transno sem in favour of the dir/inode i_sem to avoid single
33  *            threaded operation on the OST.
34  */
35
36 #define DEBUG_SUBSYSTEM S_FILTER
37
38 #include <linux/config.h>
39 #include <linux/module.h>
40 #include <linux/fs.h>
41 #include <linux/dcache.h>
42 #include <linux/init.h>
43 #include <linux/version.h>
44 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
45 # include <linux/mount.h>
46 # include <linux/buffer_head.h>
47 #endif
48
49 #include <linux/obd_class.h>
50 #include <linux/lustre_dlm.h>
51 #include <linux/lustre_fsfilt.h>
52 #include <linux/lprocfs_status.h>
53 #include <linux/lustre_log.h>
54 #include <linux/lustre_commit_confd.h>
55
56 #include "filter_internal.h"
57
58 #define S_SHIFT 12
59 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
60         [0]                     NULL,
61         [S_IFREG >> S_SHIFT]    "R",
62         [S_IFDIR >> S_SHIFT]    "D",
63         [S_IFCHR >> S_SHIFT]    "C",
64         [S_IFBLK >> S_SHIFT]    "B",
65         [S_IFIFO >> S_SHIFT]    "F",
66         [S_IFSOCK >> S_SHIFT]   "S",
67         [S_IFLNK >> S_SHIFT]    "L"
68 };
69
70 static inline const char *obd_mode_to_type(int mode)
71 {
72         return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
73 }
74
75 static void filter_ffd_addref(void *ffdp)
76 {
77         struct filter_file_data *ffd = ffdp;
78
79         atomic_inc(&ffd->ffd_refcount);
80         CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
81                atomic_read(&ffd->ffd_refcount));
82 }
83
84 static struct filter_file_data *filter_ffd_new(void)
85 {
86         struct filter_file_data *ffd;
87
88         OBD_ALLOC(ffd, sizeof *ffd);
89         if (ffd == NULL) {
90                 CERROR("out of memory\n");
91                 return NULL;
92         }
93
94         atomic_set(&ffd->ffd_refcount, 2);
95
96         INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
97         class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
98
99         return ffd;
100 }
101
102 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
103 {
104         struct filter_file_data *ffd = NULL;
105         ENTRY;
106         LASSERT(handle != NULL);
107         ffd = class_handle2object(handle->cookie);
108         if (ffd != NULL)
109                 LASSERT(ffd->ffd_file->private_data == ffd);
110         RETURN(ffd);
111 }
112
113 static void filter_ffd_put(struct filter_file_data *ffd)
114 {
115         CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
116                atomic_read(&ffd->ffd_refcount) - 1);
117         LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
118                 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
119         if (atomic_dec_and_test(&ffd->ffd_refcount)) {
120                 LASSERT(list_empty(&ffd->ffd_handle.h_link));
121                 OBD_FREE(ffd, sizeof *ffd);
122         }
123 }
124
125 static void filter_ffd_destroy(struct filter_file_data *ffd)
126 {
127         class_handle_unhash(&ffd->ffd_handle);
128         filter_ffd_put(ffd);
129 }
130
131 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
132                              void *cb_data, int error)
133 {
134         obd_transno_commit_cb(obd, transno, error);
135 }
136
137 static int filter_client_log_cancel(struct lustre_handle *conn,
138                                     struct lov_stripe_md *lsm, int count,
139                                     struct llog_cookie *cookies, int flags)
140 {
141         struct obd_device *obd = class_conn2obd(conn);
142         struct llog_commit_data *llcd;
143         struct filter_obd *filter = &obd->u.filter;
144         int rc = 0;
145         ENTRY;
146
147         if (count == 0 || cookies == NULL) {
148                 down(&filter->fo_sem);
149                 if (filter->fo_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
150                         GOTO(out, rc);
151
152                 llcd = filter->fo_llcd;
153                 GOTO(send_now, rc);
154         }
155
156         down(&filter->fo_sem);
157         llcd = filter->fo_llcd;
158         if (llcd == NULL) {
159                 llcd = llcd_grab();
160                 if (llcd == NULL) {
161                         CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n",
162                                cookies->lgc_lgl.lgl_oid,
163                                cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
164                         GOTO(out, rc = -ENOMEM);
165                 }
166                 llcd->llcd_import = filter->fo_mdc_imp;
167                 filter->fo_llcd = llcd;
168         }
169
170         memcpy(llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
171                sizeof(*cookies));
172         llcd->llcd_cookiebytes += sizeof(*cookies);
173
174         GOTO(send_now, rc);
175 send_now:
176         if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
177              flags & OBD_LLOG_FL_SENDNOW)) {
178                 filter->fo_llcd = NULL;
179                 llcd_send(llcd);
180         }
181 out:
182         up(&filter->fo_sem);
183
184         return rc;
185 }
186
187 /* When this (destroy) operation is committed, return the cancel cookie */
188 static void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
189                                      void *cb_data, int error)
190 {
191         filter_client_log_cancel(&obd->u.filter.fo_mdc_conn, NULL, 1,
192                                  cb_data, OBD_LLOG_FL_SENDNOW);
193         OBD_FREE(cb_data, sizeof(struct llog_cookie));
194 }
195
196 /* Assumes caller has already pushed us into the kernel context. */
197 int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
198                           int rc)
199 {
200         struct filter_obd *filter = &exp->exp_obd->u.filter;
201         struct filter_export_data *fed = &exp->exp_filter_data;
202         struct filter_client_data *fcd = fed->fed_fcd;
203         __u64 last_rcvd;
204         loff_t off;
205         ssize_t written;
206
207         /* Propagate error code. */
208         if (rc)
209                 RETURN(rc);
210
211         if (!exp->exp_obd->obd_replayable)
212                 RETURN(rc);
213
214         /* we don't allocate new transnos for replayed requests */
215         if (oti != NULL && oti->oti_transno == 0) {
216                 spin_lock(&filter->fo_translock);
217                 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1;
218                 filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
219                 spin_unlock(&filter->fo_translock);
220                 oti->oti_transno = last_rcvd;
221                 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
222                 fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
223
224                 /* could get xid from oti, if it's ever needed */
225                 fcd->fcd_last_xid = 0;
226
227                 off = fed->fed_lr_off;
228                 fsfilt_set_last_rcvd(exp->exp_obd, last_rcvd, oti->oti_handle,
229                                      filter_commit_cb, NULL);
230                 written = fsfilt_write_record(exp->exp_obd,
231                                               filter->fo_rcvd_filp, fcd,
232                                               sizeof(*fcd), &off);
233                 CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
234                        "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid,
235                        fed->fed_lr_idx, written);
236
237                 if (written == sizeof(*fcd))
238                         RETURN(0);
239                 CERROR("error writing to %s: rc = %d\n", LAST_RCVD,
240                        (int)written);
241                 if (written >= 0)
242                         RETURN(-ENOSPC);
243                 RETURN(written);
244         }
245
246         RETURN(0);
247 }
248
249 void f_dput(struct dentry *dentry)
250 {
251         /* Can't go inside filter_ddelete because it can block */
252         CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
253                dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
254         LASSERT(atomic_read(&dentry->d_count) > 0);
255
256         dput(dentry);
257 }
258
259 /* Not racy w.r.t. others, because we are the only user of this dentry */
260 static void filter_drelease(struct dentry *dentry)
261 {
262         if (dentry->d_fsdata)
263                 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
264 }
265
266 struct dentry_operations filter_dops = {
267         d_release: filter_drelease,
268 };
269
270 /* Add client data to the FILTER.  We use a bitmap to locate a free space
271  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
272  * Otherwise, we have just read the data from the last_rcvd file and
273  * we know its offset. */
274 static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
275                              struct filter_export_data *fed, int cl_idx)
276 {
277         unsigned long *bitmap = filter->fo_last_rcvd_slots;
278         int new_client = (cl_idx == -1);
279         ENTRY;
280
281         LASSERT(bitmap != NULL);
282
283         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
284         if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
285                 RETURN(0);
286
287         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
288          * there's no need for extra complication here
289          */
290         if (new_client) {
291                 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
292         repeat:
293                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
294                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
295                         RETURN(-ENOMEM);
296                 }
297                 if (test_and_set_bit(cl_idx, bitmap)) {
298                         CERROR("FILTER client %d: found bit is set in bitmap\n",
299                                cl_idx);
300                         cl_idx = find_next_zero_bit(bitmap,
301                                                     FILTER_LR_MAX_CLIENTS,
302                                                     cl_idx);
303                         goto repeat;
304                 }
305         } else {
306                 if (test_and_set_bit(cl_idx, bitmap)) {
307                         CERROR("FILTER client %d: bit already set in bitmap!\n",
308                                cl_idx);
309                         LBUG();
310                 }
311         }
312
313         fed->fed_lr_idx = cl_idx;
314         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
315                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
316
317         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
318                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
319
320         if (new_client) {
321                 struct obd_run_ctxt saved;
322                 loff_t off = fed->fed_lr_off;
323                 int written;
324                 void *handle;
325
326                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
327                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
328
329                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
330                 /* Transaction needed to fix bug 1403 */
331                 handle = fsfilt_start(obd,
332                                       filter->fo_rcvd_filp->f_dentry->d_inode,
333                                       FSFILT_OP_SETATTR, NULL);
334                 if (IS_ERR(handle)) {
335                         written = PTR_ERR(handle);
336                         CERROR("unable to start transaction: rc %d\n",
337                                (int)written);
338                 } else {
339                         written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
340                                                       fed->fed_fcd,
341                                                       sizeof(*fed->fed_fcd),
342                                                       &off);
343                         fsfilt_commit(obd,
344                                       filter->fo_rcvd_filp->f_dentry->d_inode,
345                                       handle, 0);
346                 }
347                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
348
349                 if (written != sizeof(*fed->fed_fcd)) {
350                         CERROR("error writing %s client idx %u: rc %d\n",
351                                LAST_RCVD, fed->fed_lr_idx, written);
352                         if (written < 0)
353                                 RETURN(written);
354                         RETURN(-ENOSPC);
355                 }
356         }
357         RETURN(0);
358 }
359
360 static int filter_client_free(struct obd_export *exp, int flags)
361 {
362         struct filter_export_data *fed = &exp->exp_filter_data;
363         struct filter_obd *filter = &exp->exp_obd->u.filter;
364         struct obd_device *obd = exp->exp_obd;
365         struct filter_client_data zero_fcd;
366         struct obd_run_ctxt saved;
367         int written;
368         loff_t off;
369         ENTRY;
370
371         if (fed->fed_fcd == NULL)
372                 RETURN(0);
373
374         if (flags & OBD_OPT_FAILOVER)
375                 GOTO(free, 0);
376
377         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
378         if (strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID") == 0)
379                 GOTO(free, 0);
380
381         LASSERT(filter->fo_last_rcvd_slots != NULL);
382
383         off = fed->fed_lr_off;
384
385         CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
386                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
387
388         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
389                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
390                        fed->fed_lr_idx);
391                 LBUG();
392         }
393
394         memset(&zero_fcd, 0, sizeof zero_fcd);
395         push_ctxt(&saved, &filter->fo_ctxt, NULL);
396         written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
397                                       &zero_fcd, sizeof(zero_fcd), &off);
398
399         /* XXX: this write gets lost sometimes, unless this sync is here. */
400         if (written > 0)
401                 file_fsync(filter->fo_rcvd_filp,
402                            filter->fo_rcvd_filp->f_dentry, 1);
403         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
404
405         if (written != sizeof(zero_fcd)) {
406                 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
407                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
408                        LAST_RCVD, written);
409         } else {
410                 CDEBUG(D_INFO,
411                        "zeroed disconnecting client %s at idx %u (%llu)\n",
412                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
413         }
414
415 free:
416         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
417
418         RETURN(0);
419 }
420
421 static int filter_free_server_data(struct filter_obd *filter)
422 {
423         OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
424         filter->fo_fsd = NULL;
425         OBD_FREE(filter->fo_last_rcvd_slots,
426                  FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
427         filter->fo_last_rcvd_slots = NULL;
428         return 0;
429 }
430
431 /* assumes caller is already in kernel ctxt */
432 int filter_update_server_data(struct obd_device *obd,
433                               struct file *filp, struct filter_server_data *fsd)
434 {
435         loff_t off = 0;
436         int rc;
437         ENTRY;
438
439         CDEBUG(D_INODE, "server uuid      : %s\n", fsd->fsd_uuid);
440         CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
441                le64_to_cpu(fsd->fsd_last_objid));
442         CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
443                le64_to_cpu(fsd->fsd_last_transno));
444         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
445                le64_to_cpu(fsd->fsd_mount_count));
446
447         rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off);
448         if (rc == sizeof(*fsd))
449                 RETURN(0);
450
451         CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n", rc);
452         if (rc >= 0)
453                 RETURN(-ENOSPC);
454         RETURN(rc);
455 }
456
457 /* assumes caller has already in kernel ctxt */
458 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
459                                    __u64 init_lastobjid)
460 {
461         struct filter_obd *filter = &obd->u.filter;
462         struct filter_server_data *fsd;
463         struct filter_client_data *fcd = NULL;
464         struct inode *inode = filp->f_dentry->d_inode;
465         unsigned long last_rcvd_size = inode->i_size;
466         __u64 mount_count = 0;
467         int cl_idx;
468         loff_t off = 0;
469         int rc;
470
471         /* ensure padding in the struct is the correct size */
472         LASSERT (offsetof(struct filter_server_data, fsd_padding) +
473                  sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
474         LASSERT (offsetof(struct filter_client_data, fcd_padding) +
475                  sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
476
477         OBD_ALLOC(fsd, sizeof(*fsd));
478         if (!fsd)
479                 RETURN(-ENOMEM);
480         filter->fo_fsd = fsd;
481
482         OBD_ALLOC(filter->fo_last_rcvd_slots,
483                   FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
484         if (filter->fo_last_rcvd_slots == NULL) {
485                 OBD_FREE(fsd, sizeof(*fsd));
486                 RETURN(-ENOMEM);
487         }
488
489         if (last_rcvd_size == 0) {
490                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
491
492                 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
493                 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
494                 fsd->fsd_last_transno = 0;
495                 mount_count = fsd->fsd_mount_count = 0;
496                 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
497                 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
498                 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
499                 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
500                 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
501         } else {
502                 int retval = fsfilt_read_record(obd, filp, fsd,
503                                                 sizeof(*fsd), &off);
504                 if (retval != sizeof(*fsd)) {
505                         CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
506                                LAST_RCVD, retval);
507                         GOTO(err_fsd, rc = -EIO);
508                 }
509                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
510                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
511                 fsd->fsd_last_objid =
512                         cpu_to_le64(le64_to_cpu(fsd->fsd_last_objid) +
513                                     FILTER_SKIP_OBJID);
514         }
515
516         if (fsd->fsd_feature_incompat) {
517                 CERROR("unsupported feature %x\n",
518                        le32_to_cpu(fsd->fsd_feature_incompat));
519                 GOTO(err_fsd, rc = -EINVAL);
520         }
521         if (fsd->fsd_feature_rocompat) {
522                 CERROR("read-only feature %x\n",
523                        le32_to_cpu(fsd->fsd_feature_rocompat));
524                 /* Do something like remount filesystem read-only */
525                 GOTO(err_fsd, rc = -EINVAL);
526         }
527
528         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
529                obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
530         CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
531                obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
532         CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
533                obd->obd_name, mount_count);
534         CDEBUG(D_INODE, "%s: server data size: %u\n",
535                obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
536         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
537                obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
538         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
539                obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
540         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
541                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
542
543         if (!obd->obd_replayable) {
544                 CWARN("%s: recovery support OFF\n", obd->obd_name);
545                 GOTO(out, rc = 0);
546         }
547
548         for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
549                 __u64 last_rcvd;
550                 int mount_age;
551
552                 if (!fcd) {
553                         OBD_ALLOC(fcd, sizeof(*fcd));
554                         if (!fcd)
555                                 GOTO(err_fsd, rc = -ENOMEM);
556                 }
557
558                 /* Don't assume off is incremented properly, in case
559                  * sizeof(fsd) isn't the same as fsd->fsd_client_size.
560                  */
561                 off = le32_to_cpu(fsd->fsd_client_start) +
562                         cl_idx * le16_to_cpu(fsd->fsd_client_size);
563                 rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
564                 if (rc != sizeof(*fcd)) {
565                         CERROR("error reading FILTER %s offset %d: rc = %d\n",
566                                LAST_RCVD, cl_idx, rc);
567                         if (rc > 0) /* XXX fatal error or just abort reading? */
568                                 rc = -EIO;
569                         break;
570                 }
571
572                 if (fcd->fcd_uuid[0] == '\0') {
573                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
574                                cl_idx);
575                         continue;
576                 }
577
578                 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
579
580                 /* These exports are cleaned up by filter_disconnect(), so they
581                  * need to be set up like real exports as filter_connect() does.
582                  */
583                 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
584                 if (mount_age < FILTER_MOUNT_RECOV) {
585                         struct obd_export *exp = class_new_export(obd);
586                         struct filter_export_data *fed;
587                         CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
588                                " srv lr: "LPU64" mnt: "LPU64" last mount: "
589                                LPU64"\n", fcd->fcd_uuid, cl_idx,
590                                last_rcvd, le64_to_cpu(fsd->fsd_last_transno),
591                                le64_to_cpu(fcd->fcd_mount_count), mount_count);
592                         if (exp == NULL) {
593                                 /* XXX this rc is ignored  */
594                                 rc = -ENOMEM;
595                                 break;
596                         }
597                         memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
598                                sizeof exp->exp_client_uuid.uuid);
599                         fed = &exp->exp_filter_data;
600                         fed->fed_fcd = fcd;
601                         filter_client_add(obd, filter, fed, cl_idx);
602                         /* create helper if export init gets more complex */
603                         INIT_LIST_HEAD(&fed->fed_open_head);
604                         spin_lock_init(&fed->fed_lock);
605
606                         fcd = NULL;
607                         obd->obd_recoverable_clients++;
608                         class_export_put(exp);
609                 } else {
610                         CDEBUG(D_INFO,
611                                "discarded client %d UUID '%s' count "LPU64"\n",
612                                cl_idx, fcd->fcd_uuid,
613                                le64_to_cpu(fcd->fcd_mount_count));
614                 }
615
616                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
617                        cl_idx, last_rcvd);
618
619                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
620                         filter->fo_fsd->fsd_last_transno=cpu_to_le64(last_rcvd);
621
622                 obd->obd_last_committed =
623                         le64_to_cpu(filter->fo_fsd->fsd_last_transno);
624
625                 if (obd->obd_recoverable_clients) {
626                         CERROR("RECOVERY: %d recoverable clients, last_rcvd "
627                                LPU64"\n", obd->obd_recoverable_clients,
628                                le64_to_cpu(filter->fo_fsd->fsd_last_transno));
629                         obd->obd_next_recovery_transno =
630                                 obd->obd_last_committed + 1;
631                         obd->obd_recovering = 1;
632                 }
633
634         }
635
636         if (fcd)
637                 OBD_FREE(fcd, sizeof(*fcd));
638
639 out:
640         fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
641
642         /* save it, so mount count and last_transno is current */
643         rc = filter_update_server_data(obd, filp, filter->fo_fsd);
644
645         RETURN(rc);
646
647 err_fsd:
648         filter_free_server_data(filter);
649         RETURN(rc);
650 }
651
652 /* setup the object store with correct subdirectories */
653 static int filter_prep(struct obd_device *obd)
654 {
655         struct obd_run_ctxt saved;
656         struct filter_obd *filter = &obd->u.filter;
657         struct dentry *dentry, *O_dentry;
658         struct file *file;
659         struct inode *inode;
660         int i;
661         int rc = 0;
662         int mode = 0;
663
664         push_ctxt(&saved, &filter->fo_ctxt, NULL);
665         dentry = simple_mkdir(current->fs->pwd, "O", 0700);
666         CDEBUG(D_INODE, "got/created O: %p\n", dentry);
667         if (IS_ERR(dentry)) {
668                 rc = PTR_ERR(dentry);
669                 CERROR("cannot open/create O: rc = %d\n", rc);
670                 GOTO(out, rc);
671         }
672         filter->fo_dentry_O = dentry;
673
674         /*
675          * Create directories and/or get dentries for each object type.
676          * This saves us from having to do multiple lookups for each one.
677          */
678         O_dentry = filter->fo_dentry_O;
679         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
680                 char *name = obd_type_by_mode[mode];
681
682                 if (!name) {
683                         filter->fo_dentry_O_mode[mode] = NULL;
684                         continue;
685                 }
686                 dentry = simple_mkdir(O_dentry, name, 0700);
687                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
688                 if (IS_ERR(dentry)) {
689                         rc = PTR_ERR(dentry);
690                         CERROR("cannot create O/%s: rc = %d\n", name, rc);
691                         GOTO(err_O_mode, rc);
692                 }
693                 filter->fo_dentry_O_mode[mode] = dentry;
694         }
695
696         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
697         if (!file || IS_ERR(file)) {
698                 rc = PTR_ERR(file);
699                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
700                        LAST_RCVD, rc);
701                 GOTO(err_O_mode, rc);
702         }
703
704         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
705                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
706                        file->f_dentry->d_inode->i_mode);
707                 GOTO(err_filp, rc = -ENOENT);
708         }
709
710         rc = fsfilt_journal_data(obd, file);
711         if (rc) {
712                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
713                 GOTO(err_filp, rc);
714         }
715         /* steal operations */
716         inode = file->f_dentry->d_inode;
717         filter->fo_fop = file->f_op;
718         filter->fo_iop = inode->i_op;
719         filter->fo_aops = inode->i_mapping->a_ops;
720 #ifdef I_SKIP_PDFLUSH
721         /*
722          * we need this to protect from deadlock
723          * pdflush vs. lustre_fwrite()
724          */
725         inode->i_flags |= I_SKIP_PDFLUSH;
726 #endif
727
728         rc = filter_init_server_data(obd, file, FILTER_INIT_OBJID);
729         if (rc) {
730                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
731                 GOTO(err_client, rc);
732         }
733         filter->fo_rcvd_filp = file;
734
735         if (filter->fo_subdir_count) {
736                 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
737                 OBD_ALLOC(filter->fo_dentry_O_sub,
738                           filter->fo_subdir_count * sizeof(dentry));
739                 if (!filter->fo_dentry_O_sub)
740                         GOTO(err_client, rc = -ENOMEM);
741
742                 for (i = 0; i < filter->fo_subdir_count; i++) {
743                         char dir[20];
744                         snprintf(dir, sizeof(dir), "d%u", i);
745
746                         dentry = simple_mkdir(O_dentry, dir, 0700);
747                         CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
748                         if (IS_ERR(dentry)) {
749                                 rc = PTR_ERR(dentry);
750                                 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
751                                 GOTO(err_O_sub, rc);
752                         }
753                         filter->fo_dentry_O_sub[i] = dentry;
754                 }
755         }
756         rc = 0;
757  out:
758         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
759
760         return(rc);
761
762 err_O_sub:
763         while (i-- > 0) {
764                 struct dentry *dentry = filter->fo_dentry_O_sub[i];
765                 if (dentry) {
766                         f_dput(dentry);
767                         filter->fo_dentry_O_sub[i] = NULL;
768                 }
769         }
770         OBD_FREE(filter->fo_dentry_O_sub,
771                  filter->fo_subdir_count * sizeof(dentry));
772 err_client:
773         class_disconnect_exports(obd, 0);
774 err_filp:
775         if (filp_close(file, 0))
776                 CERROR("can't close %s after error\n", LAST_RCVD);
777         filter->fo_rcvd_filp = NULL;
778 err_O_mode:
779         while (mode-- > 0) {
780                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
781                 if (dentry) {
782                         f_dput(dentry);
783                         filter->fo_dentry_O_mode[mode] = NULL;
784                 }
785         }
786         f_dput(filter->fo_dentry_O);
787         filter->fo_dentry_O = NULL;
788         goto out;
789 }
790
791 /* cleanup the filter: write last used object id to status file */
792 static void filter_post(struct obd_device *obd)
793 {
794         struct obd_run_ctxt saved;
795         struct filter_obd *filter = &obd->u.filter;
796         long rc;
797         int mode;
798
799         /* XXX: filter_update_lastobjid used to call fsync_dev.  It might be
800          * best to start a transaction with h_sync, because we removed this
801          * from lastobjid */
802
803         push_ctxt(&saved, &filter->fo_ctxt, NULL);
804         rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
805                                        filter->fo_fsd);
806         if (rc)
807                 CERROR("error writing lastobjid: rc = %ld\n", rc);
808
809
810         if (filter->fo_rcvd_filp) {
811                 rc = file_fsync(filter->fo_rcvd_filp,
812                                 filter->fo_rcvd_filp->f_dentry, 1);
813                 filp_close(filter->fo_rcvd_filp, 0);
814                 filter->fo_rcvd_filp = NULL;
815                 if (rc)
816                         CERROR("error closing %s: rc = %ld\n", LAST_RCVD, rc);
817         }
818
819         if (filter->fo_subdir_count) {
820                 int i;
821                 for (i = 0; i < filter->fo_subdir_count; i++) {
822                         struct dentry *dentry = filter->fo_dentry_O_sub[i];
823                         f_dput(dentry);
824                         filter->fo_dentry_O_sub[i] = NULL;
825                 }
826                 OBD_FREE(filter->fo_dentry_O_sub,
827                          filter->fo_subdir_count *
828                          sizeof(*filter->fo_dentry_O_sub));
829         }
830         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
831                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
832                 if (dentry) {
833                         f_dput(dentry);
834                         filter->fo_dentry_O_mode[mode] = NULL;
835                 }
836         }
837         f_dput(filter->fo_dentry_O);
838         filter_free_server_data(filter);
839         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
840 }
841
842 __u64 filter_next_id(struct filter_obd *filter)
843 {
844         obd_id id;
845         LASSERT(filter->fo_fsd != NULL);
846
847         spin_lock(&filter->fo_objidlock);
848         id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
849         filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
850         spin_unlock(&filter->fo_objidlock);
851
852         return id;
853 }
854
855 /* direct cut-n-paste of mds_blocking_ast() */
856 static int filter_blocking_ast(struct ldlm_lock *lock,
857                                struct ldlm_lock_desc *desc,
858                                void *data, int flag)
859 {
860         int do_ast;
861         ENTRY;
862
863         if (flag == LDLM_CB_CANCELING) {
864                 /* Don't need to do anything here. */
865                 RETURN(0);
866         }
867
868         /* XXX layering violation!  -phil */
869         l_lock(&lock->l_resource->lr_namespace->ns_lock);
870         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
871          * such that mds_blocking_ast is called just before l_i_p takes the
872          * ns_lock, then by the time we get the lock, we might not be the
873          * correct blocking function anymore.  So check, and return early, if
874          * so. */
875         if (lock->l_blocking_ast != filter_blocking_ast) {
876                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
877                 RETURN(0);
878         }
879
880         lock->l_flags |= LDLM_FL_CBPENDING;
881         do_ast = (!lock->l_readers && !lock->l_writers);
882         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
883
884         if (do_ast) {
885                 struct lustre_handle lockh;
886                 int rc;
887
888                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
889                 ldlm_lock2handle(lock, &lockh);
890                 rc = ldlm_cli_cancel(&lockh);
891                 if (rc < 0)
892                         CERROR("ldlm_cli_cancel: %d\n", rc);
893         } else {
894                 LDLM_DEBUG(lock, "Lock still has references, will be "
895                            "cancelled later");
896         }
897         RETURN(0);
898 }
899
900 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
901                               ldlm_mode_t lock_mode,struct lustre_handle *lockh)
902 {
903         struct ldlm_res_id res_id = { .name = {0} };
904         int flags = 0, rc;
905         ENTRY;
906
907         res_id.name[0] = de->d_inode->i_ino;
908         res_id.name[1] = de->d_inode->i_generation;
909         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
910                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
911                               &flags, ldlm_completion_ast,
912                               filter_blocking_ast, NULL, lockh);
913
914         RETURN(rc == ELDLM_OK ? 0 : -ENOLCK);  /* XXX translate ldlm code */
915 }
916
917 /* We never dget the object parent, so DON'T dput it either */
918 static void filter_parent_unlock(struct dentry *dparent,
919                                  struct lustre_handle *lockh,
920                                  ldlm_mode_t lock_mode)
921 {
922         ldlm_lock_decref(lockh, lock_mode);
923 }
924
925 /* We never dget the object parent, so DON'T dput it either */
926 struct dentry *filter_parent(struct obd_device *obd, obd_mode mode,
927                              obd_id objid)
928 {
929         struct filter_obd *filter = &obd->u.filter;
930
931         LASSERT(S_ISREG(mode));   /* only regular files for now */
932         if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
933                 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
934
935         return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
936 }
937
938 /* We never dget the object parent, so DON'T dput it either */
939 struct dentry *filter_parent_lock(struct obd_device *obd, obd_mode mode,
940                                   obd_id objid, ldlm_mode_t lock_mode,
941                                   struct lustre_handle *lockh)
942 {
943         unsigned long now = jiffies;
944         struct dentry *de = filter_parent(obd, mode, objid);
945         int rc;
946
947         if (IS_ERR(de))
948                 return de;
949
950         rc = filter_lock_dentry(obd, de, lock_mode, lockh);
951         if (time_after(jiffies, now + 15 * HZ))
952                 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
953         return rc ? ERR_PTR(rc) : de;
954 }
955
956 /* How to get files, dentries, inodes from object id's.
957  *
958  * If dir_dentry is passed, the caller has already locked the parent
959  * appropriately for this operation (normally a write lock).  If
960  * dir_dentry is NULL, we do a read lock while we do the lookup to
961  * avoid races with create/destroy and such changing the directory
962  * internal to the filesystem code. */
963 struct dentry *filter_fid2dentry(struct obd_device *obd,
964                                  struct dentry *dir_dentry,
965                                  obd_mode mode, obd_id id)
966 {
967         struct lustre_handle lockh;
968         struct dentry *dparent = dir_dentry;
969         struct dentry *dchild;
970         char name[32];
971         int len;
972         ENTRY;
973
974         if (id == 0) {
975                 CERROR("fatal: invalid object id 0\n");
976                 LBUG();
977                 RETURN(ERR_PTR(-ESTALE));
978         }
979
980         len = sprintf(name, LPU64, id);
981         if (dir_dentry == NULL) {
982                 dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
983                 if (IS_ERR(dparent))
984                         RETURN(dparent);
985         }
986         CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
987                dparent->d_name.len, dparent->d_name.name, name);
988         dchild = ll_lookup_one_len(name, dparent, len);
989         if (dir_dentry == NULL)
990                 filter_parent_unlock(dparent, &lockh, LCK_PR);
991         if (IS_ERR(dchild)) {
992                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
993                 RETURN(dchild);
994         }
995
996         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
997                name, dchild, atomic_read(&dchild->d_count));
998
999         LASSERT(atomic_read(&dchild->d_count) > 0);
1000
1001         RETURN(dchild);
1002 }
1003
1004 static struct file *filter_obj_open(struct obd_export *export,
1005                                     struct obd_trans_info *oti,
1006                                     __u64 id, __u32 type, int parent_mode,
1007                                     struct lustre_handle *parent_lockh)
1008 {
1009         struct obd_device *obd = export->exp_obd;
1010         struct filter_obd *filter = &obd->u.filter;
1011         struct dentry *dchild = NULL, *dparent = NULL;
1012         struct filter_export_data *fed = &export->exp_filter_data;
1013         struct filter_dentry_data *fdd = NULL;
1014         struct filter_file_data *ffd = NULL;
1015         struct obd_run_ctxt saved;
1016         char name[24];
1017         struct file *file;
1018         int len, cleanup_phase = 0;
1019         ENTRY;
1020
1021         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1022
1023         if (!id) {
1024                 CERROR("fatal: invalid obdo "LPU64"\n", id);
1025                 GOTO(cleanup, file = ERR_PTR(-ESTALE));
1026         }
1027
1028         if (!(type & S_IFMT)) {
1029                 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
1030                        __FUNCTION__, id, type);
1031                 GOTO(cleanup, file = ERR_PTR(-EINVAL));
1032         }
1033
1034         ffd = filter_ffd_new();
1035         if (ffd == NULL) {
1036                 CERROR("obdfilter: out of memory\n");
1037                 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1038         }
1039
1040         cleanup_phase = 1;
1041
1042         /* We preallocate this to avoid blocking while holding fo_fddlock */
1043         OBD_ALLOC(fdd, sizeof *fdd);
1044         if (fdd == NULL) {
1045                 CERROR("obdfilter: out of memory\n");
1046                 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1047         }
1048
1049         cleanup_phase = 2;
1050
1051         dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1052         if (IS_ERR(dparent))
1053                 GOTO(cleanup, file = (void *)dparent);
1054
1055         cleanup_phase = 3;
1056
1057         len = snprintf(name, sizeof(name), LPU64, id);
1058         dchild = ll_lookup_one_len(name, dparent, len);
1059         if (IS_ERR(dchild))
1060                 GOTO(cleanup, file = (void *)dchild);
1061
1062         cleanup_phase = 4;
1063
1064         if (dchild->d_inode == NULL) {
1065                 CERROR("opening non-existent object %s - O_CREAT?\n", name);
1066                 /* dput(dchild); call filter_create_internal here */
1067                 file = ERR_PTR(-ENOENT);
1068                 GOTO(cleanup, file);
1069         }
1070
1071         /* dentry_open does a dput(dchild) and mntput(mnt) on error */
1072         mntget(filter->fo_vfsmnt);
1073         file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1074         if (IS_ERR(file)) {
1075                 dchild = NULL; /* prevent a double dput in step 4 */
1076                 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1077                 GOTO(cleanup, file);
1078         }
1079
1080         spin_lock(&filter->fo_fddlock);
1081         if (dchild->d_fsdata) {
1082                 spin_unlock(&filter->fo_fddlock);
1083                 OBD_FREE(fdd, sizeof *fdd);
1084                 fdd = dchild->d_fsdata;
1085                 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1086                 /* should only happen during client recovery */
1087                 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1088                         CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1089                 atomic_inc(&fdd->fdd_open_count);
1090         } else {
1091                 atomic_set(&fdd->fdd_open_count, 1);
1092                 fdd->fdd_magic = FILTER_DENTRY_MAGIC;
1093                 fdd->fdd_flags = 0;
1094                 fdd->fdd_objid = id;
1095                 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1096                 dchild->d_fsdata = fdd;
1097                 spin_unlock(&filter->fo_fddlock);
1098         }
1099
1100         ffd->ffd_file = file;
1101         LASSERT(file->private_data == NULL);
1102         file->private_data = ffd;
1103
1104         if (!dchild->d_op)
1105                 dchild->d_op = &filter_dops;
1106         else
1107                 LASSERT(dchild->d_op == &filter_dops);
1108
1109         spin_lock(&fed->fed_lock);
1110         list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1111         spin_unlock(&fed->fed_lock);
1112
1113         CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1114 cleanup:
1115         switch (cleanup_phase) {
1116         case 4:
1117                 if (IS_ERR(file))
1118                         f_dput(dchild);
1119         case 3:
1120                 if (IS_ERR(file))
1121                         filter_parent_unlock(dparent, parent_lockh,parent_mode);
1122         case 2:
1123                 if (IS_ERR(file))
1124                         OBD_FREE(fdd, sizeof *fdd);
1125         case 1:
1126                 if (IS_ERR(file))
1127                         filter_ffd_destroy(ffd);
1128                 filter_ffd_put(ffd);
1129         case 0:
1130                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1131         }
1132         RETURN(file);
1133 }
1134
1135 /* Caller must hold LCK_PW on parent and push us into kernel context.
1136  * Caller is also required to ensure that dchild->d_inode exists. */
1137 static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
1138                                    struct dentry *dparent,
1139                                    struct dentry *dchild)
1140 {
1141         struct inode *inode = dchild->d_inode;
1142         int rc;
1143         ENTRY;
1144
1145         if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1146                 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1147                        dchild->d_name.len, dchild->d_name.name,
1148                        inode->i_nlink, atomic_read(&inode->i_count));
1149         }
1150
1151         
1152 #if 0
1153         /* Tell the clients that the object is gone now and that they should
1154          * throw away any cached pages.  We don't need to wait until they're
1155          * done, so just decref the lock right away and let ldlm_completion_ast
1156          * clean up when it's all over. */
1157         ldlm_cli_enqueue(..., LCK_PW, AST_INTENT_DESTROY, &lockh);
1158         ldlm_lock_decref(&lockh, LCK_PW);
1159 #endif
1160
1161         if (0) {
1162                 struct lustre_handle lockh;
1163                 int flags = 0, rc;
1164                 struct ldlm_res_id res_id = { .name = { objid } };
1165
1166                 /* This part is a wee bit iffy: we really only want to bust the
1167                  * locks on our stripe, so that we don't end up bouncing
1168                  * [0->EOF] locks around on each of the OSTs as the rest of the
1169                  * destroys get processed.  Because we're only talking to
1170                  * the local LDLM, though, we should only end up locking the 
1171                  * whole of our stripe.  When bug 1425 (take all locks on OST
1172                  * for stripe 0) is fixed, this code should be revisited. */
1173                 struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
1174
1175                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1176                                       res_id, LDLM_EXTENT, &extent,
1177                                       sizeof(extent), LCK_PW, &flags,
1178                                       ldlm_completion_ast, filter_blocking_ast,
1179                                       NULL, &lockh);
1180                 /* We only care about the side-effects, just drop the lock. */
1181                 ldlm_lock_decref(&lockh, LCK_PW);
1182         }
1183
1184         rc = vfs_unlink(dparent->d_inode, dchild);
1185
1186         if (rc)
1187                 CERROR("error unlinking objid %*s: rc %d\n",
1188                        dchild->d_name.len, dchild->d_name.name, rc);
1189
1190         RETURN(rc);
1191 }
1192
1193 /* If closing because we are failing this device, then
1194    don't do the unlink on close.
1195 */
1196 static int filter_close_internal(struct obd_export *exp,
1197                                  struct filter_file_data *ffd,
1198                                  struct obd_trans_info *oti, int flags)
1199 {
1200         struct obd_device *obd = exp->exp_obd;
1201         struct filter_obd *filter = &obd->u.filter;
1202         struct file *filp = ffd->ffd_file;
1203         struct dentry *dchild = dget(filp->f_dentry);
1204         struct filter_dentry_data *fdd = dchild->d_fsdata;
1205         struct lustre_handle parent_lockh;
1206         int rc, rc2, cleanup_phase = 0;
1207         struct dentry *dparent = NULL;
1208         struct obd_run_ctxt saved;
1209         int nested_trans = (current->journal_info != NULL);
1210         ENTRY;
1211
1212         LASSERT(filp->private_data == ffd);
1213         LASSERT(fdd != NULL);
1214         LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1215
1216         rc = filp_close(filp, 0);
1217
1218         if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1219             (fdd->fdd_flags & FILTER_FLAG_DESTROY) &&
1220             !(flags & OBD_OPT_FAILOVER)) {
1221                 void *handle;
1222
1223                 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1224                 cleanup_phase = 1;
1225
1226                 LASSERT(fdd->fdd_objid > 0);
1227                 dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1228                                              LCK_PW, &parent_lockh);
1229                 if (IS_ERR(dparent))
1230                         GOTO(cleanup, rc = PTR_ERR(dparent));
1231                 cleanup_phase = 2;
1232
1233                 handle = fsfilt_start(obd, dparent->d_inode,
1234                                       FSFILT_OP_UNLINK_LOG, oti);
1235                 if (IS_ERR(handle))
1236                         GOTO(cleanup, rc = PTR_ERR(handle));
1237
1238                 if (oti != NULL) {
1239                         if (oti->oti_handle == NULL)
1240                                 oti->oti_handle = handle;
1241                         else
1242                                 LASSERT(oti->oti_handle == handle);
1243                 }
1244
1245 #ifdef ENABLE_ORPHANS
1246                 /* Remove orphan unlink record from log */
1247                 llog_cancel_records(filter->fo_catalog, 1, &fdd->fdd_cookie);
1248 #endif
1249                 /* XXX unlink from PENDING directory now too */
1250                 rc2 = filter_destroy_internal(obd, fdd->fdd_objid, dparent,
1251                                               dchild);
1252                 if (rc2 && !rc)
1253                         rc = rc2;
1254                 rc = filter_finish_transno(exp, oti, rc);
1255                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1256                 if (rc2) {
1257                         CERROR("error on commit, err = %d\n", rc2);
1258                         if (!rc)
1259                                 rc = rc2;
1260                 }
1261                 if (nested_trans == 0) {
1262                         LASSERT(current->journal_info == NULL);
1263                         if (oti != NULL)
1264                                 oti->oti_handle = NULL;
1265                 }
1266         }
1267
1268 cleanup:
1269         switch(cleanup_phase) {
1270         case 2:
1271                 if (rc || oti == NULL) {
1272                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1273                 } else {
1274                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1275                                sizeof(parent_lockh));
1276                         oti->oti_ack_locks[0].mode = LCK_PW;
1277                 }
1278         case 1:
1279                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1280         case 0:
1281                 f_dput(dchild);
1282                 filter_ffd_destroy(ffd);
1283                 break;
1284         default:
1285                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1286                 LBUG();
1287         }
1288
1289         RETURN(rc);
1290 }
1291
1292 /* mount the file system (secretly) */
1293 int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1294                         char *option)
1295 {
1296         struct obd_ioctl_data* data = buf;
1297         struct filter_obd *filter = &obd->u.filter;
1298         struct vfsmount *mnt;
1299         int rc = 0;
1300         ENTRY;
1301
1302         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1303                 RETURN(-EINVAL);
1304
1305         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1306         if (IS_ERR(obd->obd_fsops))
1307                 RETURN(PTR_ERR(obd->obd_fsops));
1308
1309         mnt = do_kern_mount(data->ioc_inlbuf2, MS_NOATIME | MS_NODIRATIME,
1310                             data->ioc_inlbuf1, option);
1311         rc = PTR_ERR(mnt);
1312         if (IS_ERR(mnt))
1313                 GOTO(err_ops, rc);
1314
1315         if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1316                 if (*data->ioc_inlbuf3 == 'f') {
1317                         obd->obd_replayable = 1;
1318                         obd_sync_filter = 1;
1319                         CERROR("%s: configured for recovery and sync write\n",
1320                                obd->obd_name);
1321                 } else {
1322                         if (*data->ioc_inlbuf3 != 'n') {
1323                                 CERROR("unrecognised flag '%c'\n",
1324                                        *data->ioc_inlbuf3);
1325                         }
1326                 }
1327         }
1328
1329         if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
1330                 if (*data->ioc_inlbuf4 == '/') {
1331                         CERROR("filter namespace mount: %s\n",
1332                                data->ioc_inlbuf4);
1333                         filter->fo_nspath = strdup(data->ioc_inlbuf4);
1334                 } else {
1335                         CERROR("namespace mount must be absolute path: '%s'\n",
1336                                data->ioc_inlbuf4);
1337                 }
1338         }
1339
1340         filter->fo_vfsmnt = mnt;
1341         filter->fo_sb = mnt->mnt_sb;
1342         filter->fo_fstype = mnt->mnt_sb->s_type->name;
1343         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1344
1345         OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1346         filter->fo_ctxt.pwdmnt = mnt;
1347         filter->fo_ctxt.pwd = mnt->mnt_root;
1348         filter->fo_ctxt.fs = get_ds();
1349
1350         rc = filter_prep(obd);
1351         if (rc)
1352                 GOTO(err_mntput, rc);
1353
1354         spin_lock_init(&filter->fo_translock);
1355         spin_lock_init(&filter->fo_fddlock);
1356         spin_lock_init(&filter->fo_objidlock);
1357         INIT_LIST_HEAD(&filter->fo_export_list);
1358
1359         ptlrpc_init_client(MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1360                            "filter_mdc", &filter->fo_mdc_client);
1361         sema_init(&filter->fo_sem, 1);
1362
1363         obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1364                                                 LDLM_NAMESPACE_SERVER);
1365         if (obd->obd_namespace == NULL)
1366                 GOTO(err_post, rc = -ENOMEM);
1367
1368         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1369                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1370
1371         /* Create a non-replaying connection for recovery logging, so that
1372          * we don't create a client entry for this local connection, and do
1373          * not log or assign transaction numbers for logging operations. */
1374 #ifdef ENABLE_ORPHANS
1375         filter->fo_catalog = filter_get_catalog(obd);
1376         if (IS_ERR(filter->fo_catalog))
1377                 GOTO(err_post, rc = PTR_ERR(filter->fo_catalog));
1378 #endif
1379
1380         RETURN(0);
1381
1382 err_post:
1383         filter_post(obd);
1384 err_mntput:
1385         unlock_kernel();
1386         mntput(mnt);
1387         filter->fo_sb = 0;
1388         lock_kernel();
1389 err_ops:
1390         fsfilt_put_ops(obd->obd_fsops);
1391         return rc;
1392 }
1393
1394 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1395 {
1396         struct obd_ioctl_data* data = buf;
1397         char *option = NULL;
1398
1399 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1400         /* bug 1577: implement async-delete for 2.5 */
1401         if (!strcmp(data->ioc_inlbuf2, "ext3"))
1402                 option = "asyncdel";
1403 #endif
1404
1405         return filter_common_setup(obd, len, buf, option);
1406 }
1407
1408 static int filter_cleanup(struct obd_device *obd, int flags)
1409 {
1410         struct filter_obd *filter = &obd->u.filter;
1411         ENTRY;
1412
1413         if (flags & OBD_OPT_FAILOVER)
1414                 CERROR("%s: shutting down for failover; client state will"
1415                        " be preserved.\n", obd->obd_name);
1416
1417         if (!list_empty(&obd->obd_exports)) {
1418                 CERROR("%s: still has clients!\n", obd->obd_name);
1419                 class_disconnect_exports(obd, flags);
1420                 if (!list_empty(&obd->obd_exports)) {
1421                         CERROR("still has exports after forced cleanup?\n");
1422                         RETURN(-EBUSY);
1423                 }
1424         }
1425
1426 #ifdef ENABLE_ORPHANS
1427         filter_put_catalog(filter->fo_catalog);
1428 #endif
1429
1430         ldlm_namespace_free(obd->obd_namespace);
1431
1432         if (filter->fo_sb == NULL)
1433                 RETURN(0);
1434
1435         filter_post(obd);
1436
1437         shrink_dcache_parent(filter->fo_sb->s_root);
1438         filter->fo_sb = 0;
1439
1440         if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1)
1441                 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1442                        atomic_read(&filter->fo_vfsmnt->mnt_count));
1443
1444         unlock_kernel();
1445         mntput(filter->fo_vfsmnt);
1446         //destroy_buffers(filter->fo_sb->s_dev);
1447         filter->fo_sb = NULL;
1448         fsfilt_put_ops(obd->obd_fsops);
1449         lock_kernel();
1450
1451         RETURN(0);
1452 }
1453
1454 static int filter_attach(struct obd_device *obd, obd_count len, void *data)
1455 {
1456         struct lprocfs_static_vars lvars;
1457         int rc;
1458
1459         lprocfs_init_vars(filter, &lvars);
1460         rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1461         if (rc != 0)
1462                 return rc;
1463
1464         rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1465         if (rc != 0)
1466                 return rc;
1467
1468         /* Init obdfilter private stats here */
1469         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1470                              LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1471         lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1472                              LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
1473         return rc;
1474 }
1475
1476 static int filter_detach(struct obd_device *dev)
1477 {
1478         lprocfs_free_obd_stats(dev);
1479         return lprocfs_obd_detach(dev);
1480 }
1481
1482 /* nearly identical to mds_connect */
1483 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1484                           struct obd_uuid *cluuid)
1485 {
1486         struct obd_export *exp;
1487         struct filter_export_data *fed;
1488         struct filter_client_data *fcd;
1489         struct filter_obd *filter = &obd->u.filter;
1490         int rc;
1491         ENTRY;
1492
1493         if (conn == NULL || obd == NULL || cluuid == NULL)
1494                 RETURN(-EINVAL);
1495
1496         rc = class_connect(conn, obd, cluuid);
1497         if (rc)
1498                 RETURN(rc);
1499         exp = class_conn2export(conn);
1500         LASSERT(exp != NULL);
1501
1502         fed = &exp->exp_filter_data;
1503         class_export_put(exp);
1504
1505         INIT_LIST_HEAD(&fed->fed_open_head);
1506         spin_lock_init(&fed->fed_lock);
1507
1508         if (!obd->obd_replayable)
1509                 RETURN(0);
1510
1511         OBD_ALLOC(fcd, sizeof(*fcd));
1512         if (!fcd) {
1513                 CERROR("filter: out of memory for client data\n");
1514                 GOTO(out_export, rc = -ENOMEM);
1515         }
1516
1517         memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1518         fed->fed_fcd = fcd;
1519         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1520
1521         rc = filter_client_add(obd, filter, fed, -1);
1522         if (rc)
1523                 GOTO(out_fcd, rc);
1524
1525         RETURN(rc);
1526
1527 out_fcd:
1528         OBD_FREE(fcd, sizeof(*fcd));
1529 out_export:
1530         class_disconnect(conn, 0);
1531
1532         RETURN(rc);
1533 }
1534
1535 static void filter_destroy_export(struct obd_export *exp)
1536 {
1537         struct filter_export_data *fed = &exp->exp_filter_data;
1538
1539         ENTRY;
1540         spin_lock(&fed->fed_lock);
1541         while (!list_empty(&fed->fed_open_head)) {
1542                 struct filter_file_data *ffd;
1543
1544                 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1545                                  ffd_export_list);
1546                 list_del(&ffd->ffd_export_list);
1547                 spin_unlock(&fed->fed_lock);
1548
1549                 CDEBUG(D_INFO, "force close file %*s (hdl %p:"LPX64") on "
1550                        "disconnect\n", ffd->ffd_file->f_dentry->d_name.len,
1551                        ffd->ffd_file->f_dentry->d_name.name,
1552                        ffd, ffd->ffd_handle.h_cookie);
1553
1554                 filter_close_internal(exp, ffd, NULL, exp->exp_flags);
1555                 spin_lock(&fed->fed_lock);
1556         }
1557         spin_unlock(&fed->fed_lock);
1558
1559         if (exp->exp_obd->obd_replayable)
1560                 filter_client_free(exp, exp->exp_flags);
1561         EXIT;
1562 }
1563
1564 /* also incredibly similar to mds_disconnect */
1565 static int filter_disconnect(struct lustre_handle *conn, int flags)
1566 {
1567         struct obd_export *exp = class_conn2export(conn);
1568         unsigned long irqflags;
1569         int rc;
1570         ENTRY;
1571
1572         LASSERT(exp);
1573         ldlm_cancel_locks_for_export(exp);
1574
1575         spin_lock_irqsave(&exp->exp_lock, irqflags);
1576         exp->exp_flags = flags;
1577         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1578
1579         rc = class_disconnect(conn, flags);
1580
1581         fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1582         class_export_put(exp);
1583         /* XXX cleanup preallocated inodes */
1584         RETURN(rc);
1585 }
1586
1587 struct dentry *__filter_oa2dentry(struct obd_device *obd,
1588                                   struct obdo *oa, const char *what)
1589 {
1590         struct dentry *dchild = NULL;
1591
1592         if (oa->o_valid & OBD_MD_FLHANDLE) {
1593                 struct lustre_handle *ost_handle = obdo_handle(oa);
1594                 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1595
1596                 if (ffd != NULL) {
1597                         struct filter_dentry_data *fdd;
1598                         dchild = dget(ffd->ffd_file->f_dentry);
1599                         fdd = dchild->d_fsdata;
1600                         LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1601                         filter_ffd_put(ffd);
1602
1603                         CDEBUG(D_INODE,"%s got child objid %*s: %p, count %d\n",
1604                                what, dchild->d_name.len, dchild->d_name.name,
1605                                dchild, atomic_read(&dchild->d_count));
1606                 }
1607         }
1608
1609         if (!dchild)
1610                 dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
1611
1612         if (IS_ERR(dchild)) {
1613                 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1614                 RETURN(dchild);
1615         }
1616
1617         if (!dchild->d_inode) {
1618                 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1619                 f_dput(dchild);
1620                 RETURN(ERR_PTR(-ENOENT));
1621         }
1622
1623         return dchild;
1624 }
1625
1626 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1627                           struct lov_stripe_md *md)
1628 {
1629         struct dentry *dentry = NULL;
1630         struct obd_device *obd;
1631         int rc = 0;
1632         ENTRY;
1633
1634         obd = class_conn2obd(conn);
1635         if (obd == NULL) {
1636                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1637                 RETURN(-EINVAL);
1638         }
1639
1640         dentry = filter_oa2dentry(obd, oa);
1641         if (IS_ERR(dentry))
1642                 RETURN(PTR_ERR(dentry));
1643
1644         /* Limit the valid bits in the return data to what we actually use */
1645         oa->o_valid = OBD_MD_FLID;
1646         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1647
1648         f_dput(dentry);
1649         RETURN(rc);
1650 }
1651
1652 /* this is called from filter_truncate() until we have filter_punch() */
1653 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1654                           struct lov_stripe_md *md, struct obd_trans_info *oti)
1655 {
1656         struct obd_run_ctxt saved;
1657         struct obd_export *exp;
1658         struct filter_obd *filter;
1659         struct dentry *dentry;
1660         struct iattr iattr;
1661         void *handle;
1662         int rc, rc2;
1663         ENTRY;
1664
1665         LASSERT(oti != NULL);
1666         exp = class_conn2export(conn);
1667         if (!exp) {
1668                 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1669                 RETURN(-EINVAL);
1670         }
1671
1672         dentry = filter_oa2dentry(exp->exp_obd, oa);
1673         if (IS_ERR(dentry))
1674                 GOTO(out_exp, rc = PTR_ERR(dentry));
1675
1676         filter = &exp->exp_obd->u.filter;
1677
1678         iattr_from_obdo(&iattr, oa, oa->o_valid);
1679
1680         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1681         lock_kernel();
1682
1683         /* XXX this could be a rwsem instead, if filter_preprw played along */
1684         if (iattr.ia_valid & ATTR_SIZE)
1685                 down(&dentry->d_inode->i_sem);
1686
1687         handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
1688                               oti);
1689         if (IS_ERR(handle))
1690                 GOTO(out_unlock, rc = PTR_ERR(handle));
1691
1692         rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
1693         rc = filter_finish_transno(exp, oti, rc);
1694         rc2 = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
1695         if (rc2) {
1696                 CERROR("error on commit, err = %d\n", rc2);
1697                 if (!rc)
1698                         rc = rc2;
1699         }
1700
1701         if (iattr.ia_valid & ATTR_SIZE)
1702                 up(&dentry->d_inode->i_sem);
1703
1704         oa->o_valid = OBD_MD_FLID;
1705         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1706
1707 out_unlock:
1708         unlock_kernel();
1709         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1710
1711         f_dput(dentry);
1712  out_exp:
1713         class_export_put(exp);
1714         RETURN(rc);
1715 }
1716
1717 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1718                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
1719                        struct obd_client_handle *och)
1720 {
1721         struct obd_export *exp;
1722         struct lustre_handle *handle;
1723         struct filter_file_data *ffd;
1724         struct file *filp;
1725         struct lustre_handle parent_lockh;
1726         int rc = 0;
1727         ENTRY;
1728
1729         exp = class_conn2export(conn);
1730         if (exp == NULL) {
1731                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1732                 RETURN(-EINVAL);
1733         }
1734
1735         filp = filter_obj_open(exp, oti, oa->o_id, oa->o_mode,
1736                                LCK_PR, &parent_lockh);
1737         if (IS_ERR(filp))
1738                 GOTO(out, rc = PTR_ERR(filp));
1739
1740         oa->o_valid = OBD_MD_FLID;
1741         obdo_from_inode(oa, filp->f_dentry->d_inode, FILTER_VALID_FLAGS);
1742
1743         ffd = filp->private_data;
1744         handle = obdo_handle(oa);
1745         handle->cookie = ffd->ffd_handle.h_cookie;
1746         oa->o_valid |= OBD_MD_FLHANDLE;
1747
1748 out:
1749         class_export_put(exp);
1750         if (!rc) {
1751                 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1752                        sizeof(parent_lockh));
1753                 oti->oti_ack_locks[0].mode = LCK_PR;
1754         }
1755         RETURN(rc);
1756 }
1757
1758 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1759                         struct lov_stripe_md *ea, struct obd_trans_info *oti)
1760 {
1761         struct obd_export *exp;
1762         struct filter_file_data *ffd;
1763         struct filter_export_data *fed;
1764         int rc;
1765         ENTRY;
1766
1767         exp = class_conn2export(conn);
1768         if (exp == NULL) {
1769                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1770                 RETURN(-EINVAL);
1771         }
1772
1773         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1774                 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1775                 GOTO(out, rc = -EINVAL);
1776         }
1777
1778         ffd = filter_handle2ffd(obdo_handle(oa));
1779         if (ffd == NULL) {
1780                 CERROR("bad handle ("LPX64") for close\n",
1781                        obdo_handle(oa)->cookie);
1782                 GOTO(out, rc = -ESTALE);
1783         }
1784
1785         fed = &exp->exp_filter_data;
1786         spin_lock(&fed->fed_lock);
1787         list_del(&ffd->ffd_export_list);
1788         spin_unlock(&fed->fed_lock);
1789
1790         oa->o_valid = OBD_MD_FLID;
1791         obdo_from_inode(oa,ffd->ffd_file->f_dentry->d_inode,FILTER_VALID_FLAGS);
1792
1793         rc = filter_close_internal(exp, ffd, oti, 0);
1794         filter_ffd_put(ffd);
1795         GOTO(out, rc);
1796  out:
1797         class_export_put(exp);
1798         return rc;
1799 }
1800
1801 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1802                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
1803 {
1804         struct obd_export *exp;
1805         struct obd_device *obd;
1806         struct filter_obd *filter;
1807         struct obd_run_ctxt saved;
1808         struct lustre_handle parent_lockh;
1809         struct dentry *dparent;
1810         struct ll_fid mds_fid = { .id = 0 };
1811         struct dentry *dchild = NULL;
1812         void *handle;
1813         int err, rc, cleanup_phase;
1814         ENTRY;
1815
1816         exp = class_conn2export(conn);
1817         if (exp == NULL) {
1818                 CDEBUG(D_IOCTL,"invalid client cookie "LPX64"\n", conn->cookie);
1819                 RETURN(-EINVAL);
1820         }
1821
1822         obd = exp->exp_obd;
1823         filter = &obd->u.filter;
1824         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1825  retry:
1826         oa->o_id = filter_next_id(filter);
1827
1828         cleanup_phase = 0;
1829         dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1830                                      &parent_lockh);
1831         if (IS_ERR(dparent))
1832                 GOTO(cleanup, rc = PTR_ERR(dparent));
1833         cleanup_phase = 1;
1834
1835         dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1836         if (IS_ERR(dchild))
1837                 GOTO(cleanup, rc = PTR_ERR(dchild));
1838         if (dchild->d_inode) {
1839                 /* This would only happen if lastobjid was bad on disk */
1840                 CERROR("Serious error: objid %*s already exists; is this "
1841                        "filesystem corrupt?  I will try to work around it.\n",
1842                        dchild->d_name.len, dchild->d_name.name);
1843                 f_dput(dchild);
1844                 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1845                 goto retry;
1846         }
1847
1848         cleanup_phase = 2;
1849         handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE_LOG, oti);
1850         if (IS_ERR(handle))
1851                 GOTO(cleanup, rc = PTR_ERR(handle));
1852
1853         rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
1854         if (rc) {
1855                 CERROR("create failed rc = %d\n", rc);
1856         } else if (oa->o_valid & (OBD_MD_FLCTIME|OBD_MD_FLMTIME|OBD_MD_FLSIZE)){
1857                 struct iattr attr;
1858
1859                 iattr_from_obdo(&attr, oa, oa->o_valid);
1860                 rc = fsfilt_setattr(obd, dchild, handle, &attr, 1);
1861                 if (rc)
1862                         CERROR("create setattr failed rc = %d\n", rc);
1863         }
1864         rc = filter_finish_transno(exp, oti, rc);
1865         err = filter_update_server_data(obd, filter->fo_rcvd_filp,
1866                                         filter->fo_fsd);
1867         if (err)
1868                 CERROR("unable to write lastobjid but file created\n");
1869
1870         /* Set flags for fields we have set in the inode struct */
1871         if (!rc && mds_fid.id && (oa->o_valid & OBD_MD_FLCOOKIE)) {
1872                 err = filter_log_op_create(obd->u.filter.fo_catalog, &mds_fid,
1873                                            dchild->d_inode->i_ino,
1874                                            dchild->d_inode->i_generation,
1875                                            oti->oti_logcookies);
1876                 if (err) {
1877                         CERROR("error logging create record: rc %d\n", err);
1878                         oa->o_valid = OBD_MD_FLID;
1879                 } else {
1880                         oa->o_valid = OBD_MD_FLID | OBD_MD_FLCOOKIE;
1881                 }
1882         } else
1883                 oa->o_valid = OBD_MD_FLID;
1884
1885         err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1886         if (err) {
1887                 CERROR("error on commit, err = %d\n", err);
1888                 if (!rc)
1889                         rc = err;
1890         }
1891
1892         if (rc)
1893                 GOTO(cleanup, rc);
1894
1895         /* Set flags for fields we have set in the inode struct */
1896         obdo_from_inode(oa, dchild->d_inode, FILTER_VALID_FLAGS);
1897
1898         EXIT;
1899 cleanup:
1900         switch(cleanup_phase) {
1901         case 2:
1902                 f_dput(dchild);
1903         case 1: /* locked parent dentry */
1904                 if (rc || oti == NULL) {
1905                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1906                 } else {
1907                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1908                                sizeof(parent_lockh));
1909                         oti->oti_ack_locks[0].mode = LCK_PW;
1910                 }
1911         case 0:
1912                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1913                 class_export_put(exp);
1914                 break;
1915         default:
1916                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1917                 LBUG();
1918         }
1919
1920         RETURN(rc);
1921 }
1922
1923 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1924                           struct lov_stripe_md *ea, struct obd_trans_info *oti)
1925 {
1926         struct obd_export *exp;
1927         struct obd_device *obd;
1928         struct filter_obd *filter;
1929         struct dentry *dchild = NULL, *dparent = NULL;
1930         struct filter_dentry_data *fdd;
1931         struct obd_run_ctxt saved;
1932         void *handle = NULL;
1933         struct lustre_handle parent_lockh;
1934         struct llog_cookie *fcc = NULL;
1935         int rc, rc2, cleanup_phase = 0;
1936         ENTRY;
1937
1938         exp = class_conn2export(conn);
1939         if (exp == NULL) {
1940                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1941                 RETURN(-EINVAL);
1942         }
1943
1944         obd = exp->exp_obd;
1945         filter = &obd->u.filter;
1946
1947         push_ctxt(&saved, &filter->fo_ctxt, NULL);
1948         dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1949                                      LCK_PW, &parent_lockh);
1950         if (IS_ERR(dparent))
1951                 GOTO(cleanup, rc = PTR_ERR(dparent));
1952         cleanup_phase = 1;
1953
1954         dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1955         if (IS_ERR(dchild))
1956                 GOTO(cleanup, rc = -ENOENT);
1957         cleanup_phase = 2;
1958
1959         if (dchild->d_inode == NULL) {
1960                 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
1961                 GOTO(cleanup, rc = -ENOENT);
1962         }
1963         handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK_LOG, oti);
1964         if (IS_ERR(handle))
1965                 GOTO(cleanup, rc = PTR_ERR(handle));
1966         cleanup_phase = 3;
1967
1968         fdd = dchild->d_fsdata;
1969
1970         /* Our MDC connection is established by the MDS to us */
1971         if ((oa->o_valid & OBD_MD_FLCOOKIE) && filter->fo_mdc_imp != NULL) {
1972                 OBD_ALLOC(fcc, sizeof(*fcc));
1973                 if (fcc != NULL)
1974                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
1975         }
1976
1977         if (fdd != NULL && atomic_read(&fdd->fdd_open_count)) {
1978                 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1979                 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1980                         fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1981
1982 #ifdef ENABLE_ORPHANS
1983                         filter_log_op_orphan(filter->fo_catalog, oa->o_id,
1984                                              oa->o_generation,&fdd->fdd_cookie);
1985 #endif
1986                         CDEBUG(D_INODE,
1987                                "defer destroy of %dx open objid "LPU64"\n",
1988                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1989                 } else {
1990                         CDEBUG(D_INODE,
1991                                "repeat destroy of %dx open objid "LPU64"\n",
1992                                atomic_read(&fdd->fdd_open_count), oa->o_id);
1993                 }
1994                 GOTO(cleanup, rc = 0);
1995         }
1996
1997         rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
1998
1999 cleanup:
2000         switch(cleanup_phase) {
2001         case 3:
2002                 if (fcc != NULL)
2003                         fsfilt_set_last_rcvd(obd, 0, oti->oti_handle,
2004                                              filter_cancel_cookies_cb, fcc);
2005                 rc = filter_finish_transno(exp, oti, rc);
2006                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2007                 if (rc2) {
2008                         CERROR("error on commit, err = %d\n", rc2);
2009                         if (!rc)
2010                                 rc = rc2;
2011                 }
2012         case 2:
2013                 f_dput(dchild);
2014         case 1:
2015                 if (rc || oti == NULL) {
2016                         filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
2017                 } else {
2018                         memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
2019                                sizeof(parent_lockh));
2020                         oti->oti_ack_locks[0].mode = LCK_PW;
2021                 }
2022         case 0:
2023                 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
2024                 class_export_put(exp);
2025                 break;
2026         default:
2027                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2028                 LBUG();
2029         }
2030
2031         RETURN(rc);
2032 }
2033
2034 /* NB start and end are used for punch, but not truncate */
2035 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
2036                            struct lov_stripe_md *lsm,
2037                            obd_off start, obd_off end,
2038                            struct obd_trans_info *oti)
2039 {
2040         int error;
2041         ENTRY;
2042
2043         if (end != OBD_OBJECT_EOF)
2044                 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2045                        end);
2046
2047         CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
2048                "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2049         oa->o_size = start;
2050         error = filter_setattr(conn, oa, NULL, oti);
2051         RETURN(error);
2052 }
2053
2054 static int filter_syncfs(struct obd_export *exp)
2055 {
2056         ENTRY;
2057
2058         RETURN(fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb));
2059 }
2060
2061 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2062                          unsigned long max_age)
2063 {
2064         ENTRY;
2065         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2066 }
2067
2068 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2069                            void *key, __u32 *vallen, void *val)
2070 {
2071         struct obd_device *obd;
2072         ENTRY;
2073
2074         obd = class_conn2obd(conn);
2075         if (obd == NULL) {
2076                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2077                        conn->cookie);
2078                 RETURN(-EINVAL);
2079         }
2080
2081         if (keylen == strlen("blocksize") &&
2082             memcmp(key, "blocksize", keylen) == 0) {
2083                 __u32 *blocksize = val;
2084                 *vallen = sizeof(*blocksize);
2085                 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2086                 RETURN(0);
2087         }
2088
2089         if (keylen == strlen("blocksize_bits") &&
2090             memcmp(key, "blocksize_bits", keylen) == 0) {
2091                 __u32 *blocksize_bits = val;
2092                 *vallen = sizeof(*blocksize_bits);
2093                 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2094                 RETURN(0);
2095         }
2096
2097         CDEBUG(D_IOCTL, "invalid key\n");
2098         RETURN(-EINVAL);
2099 }
2100
2101 static int filter_set_info(struct lustre_handle *conn, __u32 keylen,
2102                            void *key, __u32 vallen, void *val)
2103 {
2104         struct obd_device *obd;
2105         struct obd_export *exp;
2106         struct obd_import *imp;
2107         ENTRY;
2108
2109         obd = class_conn2obd(conn);
2110         if (obd == NULL) {
2111                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2112                        conn->cookie);
2113                 RETURN(-EINVAL);
2114         }
2115
2116         if (keylen < strlen("mds_conn") ||
2117             memcmp(key, "mds_conn", keylen) != 0)
2118                 RETURN(-EINVAL);
2119
2120         CERROR("Received MDS connection ("LPX64")\n", conn->cookie);
2121         memcpy(&obd->u.filter.fo_mdc_conn, conn, sizeof(*conn));
2122
2123         imp = obd->u.filter.fo_mdc_imp = class_new_import();
2124
2125         exp = class_conn2export(conn);
2126         imp->imp_connection = ptlrpc_connection_addref(exp->exp_connection);
2127         class_export_put(exp);
2128
2129         imp->imp_client = &obd->u.filter.fo_mdc_client;
2130         imp->imp_remote_handle = *conn;
2131         imp->imp_obd = obd;
2132         imp->imp_dlm_fake = 1; /* XXX rename imp_dlm_fake to something else */
2133         imp->imp_level = LUSTRE_CONN_FULL;
2134         class_import_put(imp);
2135
2136         RETURN(0);
2137 }
2138
2139 int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2140                      int len, void *karg, void *uarg)
2141 {
2142         struct obd_device *obd = class_conn2obd(conn);
2143
2144         switch (cmd) {
2145         case OBD_IOC_ABORT_RECOVERY:
2146                 CERROR("aborting recovery for device %s\n", obd->obd_name);
2147                 target_abort_recovery(obd);
2148                 RETURN(0);
2149
2150         default:
2151                 RETURN(-EINVAL);
2152         }
2153         RETURN(0);
2154 }
2155
2156 static struct obd_ops filter_obd_ops = {
2157         o_owner:          THIS_MODULE,
2158         o_attach:         filter_attach,
2159         o_detach:         filter_detach,
2160         o_get_info:       filter_get_info,
2161         o_set_info:       filter_set_info,
2162         o_setup:          filter_setup,
2163         o_cleanup:        filter_cleanup,
2164         o_connect:        filter_connect,
2165         o_disconnect:     filter_disconnect,
2166         o_statfs:         filter_statfs,
2167         o_syncfs:         filter_syncfs,
2168         o_getattr:        filter_getattr,
2169         o_create:         filter_create,
2170         o_setattr:        filter_setattr,
2171         o_destroy:        filter_destroy,
2172         o_open:           filter_open,
2173         o_close:          filter_close,
2174         o_brw:            filter_brw,
2175         o_punch:          filter_truncate,
2176         o_preprw:         filter_preprw,
2177         o_commitrw:       filter_commitrw,
2178         o_log_cancel:     filter_log_cancel,
2179         o_destroy_export: filter_destroy_export,
2180         o_iocontrol:      filter_iocontrol,
2181 };
2182
2183 static struct obd_ops filter_sanobd_ops = {
2184         o_owner:          THIS_MODULE,
2185         o_attach:         filter_attach,
2186         o_detach:         filter_detach,
2187         o_get_info:       filter_get_info,
2188         o_set_info:       filter_set_info,
2189         o_setup:          filter_san_setup,
2190         o_cleanup:        filter_cleanup,
2191         o_connect:        filter_connect,
2192         o_disconnect:     filter_disconnect,
2193         o_statfs:         filter_statfs,
2194         o_getattr:        filter_getattr,
2195         o_create:         filter_create,
2196         o_setattr:        filter_setattr,
2197         o_destroy:        filter_destroy,
2198         o_open:           filter_open,
2199         o_close:          filter_close,
2200         o_brw:            filter_brw,
2201         o_punch:          filter_truncate,
2202         o_preprw:         filter_preprw,
2203         o_commitrw:       filter_commitrw,
2204         o_log_cancel:     filter_log_cancel,
2205         o_san_preprw:     filter_san_preprw,
2206         o_destroy_export: filter_destroy_export,
2207         o_iocontrol:      filter_iocontrol,
2208 };
2209
2210 static int __init obdfilter_init(void)
2211 {
2212         struct lprocfs_static_vars lvars;
2213         int rc;
2214
2215         printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2216
2217         lprocfs_init_vars(filter, &lvars);
2218
2219         rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2220                                  OBD_FILTER_DEVICENAME);
2221         if (rc)
2222                 return rc;
2223
2224         rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2225                                  OBD_FILTER_SAN_DEVICENAME);
2226         if (rc)
2227                 class_unregister_type(OBD_FILTER_DEVICENAME);
2228         return rc;
2229 }
2230
2231 static void __exit obdfilter_exit(void)
2232 {
2233         class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2234         class_unregister_type(OBD_FILTER_DEVICENAME);
2235 }
2236
2237 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2238 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2239 MODULE_LICENSE("GPL");
2240
2241 module_init(obdfilter_init);
2242 module_exit(obdfilter_exit);