1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28 * (which need to get journal_lock, may block if journal full).
30 * Invariant: Call filter_start_transno() before any journal ops to avoid the
31 * same deadlock problem. We can (and want to) get rid of the
32 * transno sem in favour of the dir/inode i_sem to avoid single
33 * threaded operation on the OST.
36 #define DEBUG_SUBSYSTEM S_FILTER
38 #include <linux/config.h>
39 #include <linux/module.h>
41 #include <linux/dcache.h>
42 #include <linux/init.h>
43 #include <linux/version.h>
44 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
45 # include <linux/mount.h>
46 # include <linux/buffer_head.h>
49 #include <linux/obd_class.h>
50 #include <linux/lustre_dlm.h>
51 #include <linux/lustre_fsfilt.h>
52 #include <linux/lprocfs_status.h>
53 #include <linux/lustre_log.h>
54 #include <linux/lustre_commit_confd.h>
56 #include "filter_internal.h"
59 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
61 [S_IFREG >> S_SHIFT] "R",
62 [S_IFDIR >> S_SHIFT] "D",
63 [S_IFCHR >> S_SHIFT] "C",
64 [S_IFBLK >> S_SHIFT] "B",
65 [S_IFIFO >> S_SHIFT] "F",
66 [S_IFSOCK >> S_SHIFT] "S",
67 [S_IFLNK >> S_SHIFT] "L"
70 static inline const char *obd_mode_to_type(int mode)
72 return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
75 static void filter_ffd_addref(void *ffdp)
77 struct filter_file_data *ffd = ffdp;
79 atomic_inc(&ffd->ffd_refcount);
80 CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
81 atomic_read(&ffd->ffd_refcount));
84 static struct filter_file_data *filter_ffd_new(void)
86 struct filter_file_data *ffd;
88 OBD_ALLOC(ffd, sizeof *ffd);
90 CERROR("out of memory\n");
94 atomic_set(&ffd->ffd_refcount, 2);
96 INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
97 class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
102 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
104 struct filter_file_data *ffd = NULL;
106 LASSERT(handle != NULL);
107 ffd = class_handle2object(handle->cookie);
109 LASSERT(ffd->ffd_file->private_data == ffd);
113 static void filter_ffd_put(struct filter_file_data *ffd)
115 CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
116 atomic_read(&ffd->ffd_refcount) - 1);
117 LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
118 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
119 if (atomic_dec_and_test(&ffd->ffd_refcount)) {
120 LASSERT(list_empty(&ffd->ffd_handle.h_link));
121 OBD_FREE(ffd, sizeof *ffd);
/*
 * Remove the handle embedded in @ffd from the handle hash with
 * class_handle_unhash().
 */
125 static void filter_ffd_destroy(struct filter_file_data *ffd)
127 class_handle_unhash(&ffd->ffd_handle);
131 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
132 void *cb_data, int error)
134 obd_transno_commit_cb(obd, transno, error);
/*
 * Queue an llog cancel cookie on the pending llog_commit_data of the filter
 * (filter->fo_llcd); fo_sem is taken with down() before fo_llcd is read.
 *
 * With nothing to add (count == 0 or cookies == NULL) the code checks whether
 * an llcd is pending and OBD_LLOG_FL_SENDNOW is set before picking it up.
 * Otherwise a cookie is copied to llcd_cookies + llcd_cookiebytes and
 * llcd_cookiebytes grows by sizeof(*cookies); failing to get an llcd is
 * logged and yields -ENOMEM.  fo_llcd is reset to NULL once fewer than
 * sizeof(*cookies) bytes of PAGE_SIZE remain or SENDNOW was requested.
 *
 * NOTE(review): lsm is not referenced in the statements below, and the
 * byte count grows by a single cookie regardless of count -- confirm.
 */
137 static int filter_client_log_cancel(struct lustre_handle *conn,
138 struct lov_stripe_md *lsm, int count,
139 struct llog_cookie *cookies, int flags)
141 struct obd_device *obd = class_conn2obd(conn);
142 struct llog_commit_data *llcd;
143 struct filter_obd *filter = &obd->u.filter;
147 if (count == 0 || cookies == NULL) {
148 down(&filter->fo_sem);
149 if (filter->fo_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
152 llcd = filter->fo_llcd;
156 down(&filter->fo_sem);
157 llcd = filter->fo_llcd;
161 CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n",
162 cookies->lgc_lgl.lgl_oid,
163 cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
164 GOTO(out, rc = -ENOMEM);
166 llcd->llcd_import = filter->fo_mdc_imp;
167 filter->fo_llcd = llcd;
170 memcpy(llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
172 llcd->llcd_cookiebytes += sizeof(*cookies);
176 if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
177 flags & OBD_LLOG_FL_SENDNOW)) {
178 filter->fo_llcd = NULL;
187 /* When this (destroy) operation is committed, return the cancel cookie */
188 static void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
189 void *cb_data, int error)
191 filter_client_log_cancel(&obd->u.filter.fo_mdc_conn, NULL, 1,
192 cb_data, OBD_LLOG_FL_SENDNOW);
193 OBD_FREE(cb_data, sizeof(struct llog_cookie));
196 /* Assumes caller has already pushed us into the kernel context. */
/*
 * Record the transaction number of a finished operation for the client of
 * exp in the LAST_RCVD file.
 *
 * For a request that is not a replay (oti != NULL and oti->oti_transno == 0)
 * a new transno is taken as fsd_last_transno + 1 under fo_translock and
 * stored both in the server data and in oti.  The client record (fcd) gets
 * the new last_rcvd, the current mount count and a last_xid of 0, then
 * fsfilt_set_last_rcvd() registers filter_commit_cb for oti->oti_handle and
 * the record is written to fo_rcvd_filp with fsfilt_write_record().  The
 * number of bytes written is compared with sizeof(*fcd) and a CERROR is
 * emitted on the error path.
 *
 * NOTE(review): oti is NULL-checked before the transno is allocated but
 * oti->oti_handle is used further down -- confirm oti cannot be NULL there.
 */
197 int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
200 struct filter_obd *filter = &exp->exp_obd->u.filter;
201 struct filter_export_data *fed = &exp->exp_filter_data;
202 struct filter_client_data *fcd = fed->fed_fcd;
207 /* Propagate error code. */
211 if (!exp->exp_obd->obd_replayable)
214 /* we don't allocate new transnos for replayed requests */
215 if (oti != NULL && oti->oti_transno == 0) {
216 spin_lock(&filter->fo_translock);
217 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1;
218 filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
219 spin_unlock(&filter->fo_translock);
220 oti->oti_transno = last_rcvd;
221 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
222 fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
224 /* could get xid from oti, if it's ever needed */
225 fcd->fcd_last_xid = 0;
227 off = fed->fed_lr_off;
228 fsfilt_set_last_rcvd(exp->exp_obd, last_rcvd, oti->oti_handle,
229 filter_commit_cb, NULL);
230 written = fsfilt_write_record(exp->exp_obd,
231 filter->fo_rcvd_filp, (char *)fcd,
233 CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
234 "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid,
235 fed->fed_lr_idx, written);
237 if (written == sizeof(*fcd))
239 CERROR("error writing to %s: rc = %d\n", LAST_RCVD,
249 void f_dput(struct dentry *dentry)
251 /* Can't go inside filter_ddelete because it can block */
252 CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
253 dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
254 LASSERT(atomic_read(&dentry->d_count) > 0);
259 /* Not racy w.r.t. others, because we are the only user of this dentry */
260 static void filter_drelease(struct dentry *dentry)
262 if (dentry->d_fsdata)
263 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
266 struct dentry_operations filter_dops = {
267 d_release: filter_drelease,
270 /* Add client data to the FILTER. We use a bitmap to locate a free space
271 * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
272 * Otherwise, we have just read the data from the last_rcvd file and
273 * we know its offset. */
/*
 * obd, filter: device that owns the slot bitmap filter->fo_last_rcvd_slots.
 * fed:         export data whose fed_fcd holds the client record.
 * cl_idx:      slot index, or -1 to have a free slot searched.
 *
 * The slot is claimed with test_and_set_bit() and stored in fed_lr_idx;
 * fed_lr_off becomes fsd_client_start + cl_idx * fsd_client_size.  The write
 * path below pushes into fo_ctxt, starts an FSFILT_OP_SETATTR transaction on
 * the last_rcvd inode and writes sizeof(*fed->fed_fcd) bytes with
 * fsfilt_write_record(); a byte count that differs is reported with CERROR.
 * A client whose uuid equals OBD_CLASS_UUID is special-cased up front.
 */
274 static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
275 struct filter_export_data *fed, int cl_idx)
277 unsigned long *bitmap = filter->fo_last_rcvd_slots;
278 int new_client = (cl_idx == -1);
281 LASSERT(bitmap != NULL);
283 /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
284 if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
287 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
288 * there's no need for extra complication here
291 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
293 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
294 CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
297 if (test_and_set_bit(cl_idx, bitmap)) {
298 CERROR("FILTER client %d: found bit is set in bitmap\n",
300 cl_idx = find_next_zero_bit(bitmap,
301 FILTER_LR_MAX_CLIENTS,
306 if (test_and_set_bit(cl_idx, bitmap)) {
307 CERROR("FILTER client %d: bit already set in bitmap!\n",
313 fed->fed_lr_idx = cl_idx;
314 fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
315 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
317 CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
318 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
321 struct obd_run_ctxt saved;
322 loff_t off = fed->fed_lr_off;
326 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
327 fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
329 push_ctxt(&saved, &filter->fo_ctxt, NULL);
330 /* Transaction needed to fix bug 1403 */
331 handle = fsfilt_start(obd,
332 filter->fo_rcvd_filp->f_dentry->d_inode,
333 FSFILT_OP_SETATTR, NULL);
334 if (IS_ERR(handle)) {
335 written = PTR_ERR(handle);
336 CERROR("unable to start transaction: rc %d\n",
339 written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
340 (char *)fed->fed_fcd,
341 sizeof(*fed->fed_fcd), &off);
343 filter->fo_rcvd_filp->f_dentry->d_inode,
346 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
348 if (written != sizeof(*fed->fed_fcd)) {
349 CERROR("error writing %s client idx %u: rc %d\n",
350 LAST_RCVD, fed->fed_lr_idx, written);
/*
 * Release the last_rcvd slot that belongs to the client of exp.
 *
 * Checked before the slot is touched: fed_fcd == NULL, OBD_OPT_FAILOVER set
 * in flags, and a uuid equal to OBD_CLASS_UUID.  The bit fed_lr_idx is then
 * cleared in fo_last_rcvd_slots (CERROR when it was already clear), a zeroed
 * filter_client_data is written with fsfilt_write_record() inside fo_ctxt and
 * the file is synced with file_fsync().  A byte count other than
 * sizeof(zero_fcd) is reported with CERROR.  fed_fcd is freed at the end.
 */
359 static int filter_client_free(struct obd_export *exp, int flags)
361 struct filter_export_data *fed = &exp->exp_filter_data;
362 struct filter_obd *filter = &exp->exp_obd->u.filter;
363 struct obd_device *obd = exp->exp_obd;
364 struct filter_client_data zero_fcd;
365 struct obd_run_ctxt saved;
370 if (fed->fed_fcd == NULL)
373 if (flags & OBD_OPT_FAILOVER)
376 /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
377 if (strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID") == 0)
380 LASSERT(filter->fo_last_rcvd_slots != NULL);
382 off = fed->fed_lr_off;
384 CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
385 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
387 if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
388 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
393 memset(&zero_fcd, 0, sizeof zero_fcd);
394 push_ctxt(&saved, &filter->fo_ctxt, NULL);
395 written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
396 (char *)&zero_fcd, sizeof(zero_fcd),
399 /* XXX: this write gets lost sometimes, unless this sync is here. */
401 file_fsync(filter->fo_rcvd_filp,
402 filter->fo_rcvd_filp->f_dentry, 1);
403 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
405 if (written != sizeof(zero_fcd)) {
406 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
407 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
411 "zeroed disconnecting client %s at idx %u (%llu)\n",
412 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
416 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
421 static int filter_free_server_data(struct filter_obd *filter)
423 OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
424 filter->fo_fsd = NULL;
425 OBD_FREE(filter->fo_last_rcvd_slots,
426 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
427 filter->fo_last_rcvd_slots = NULL;
431 /* assumes caller is already in kernel ctxt */
/*
 * Write fsd to filp with fsfilt_write_record(), after dumping its uuid,
 * last_objid, last_transno and mount_count at D_INODE.  The result is
 * compared with sizeof(*fsd); the remaining path logs the failure at
 * D_INODE together with rc.
 */
432 int filter_update_server_data(struct obd_device *obd,
433 struct file *filp, struct filter_server_data *fsd)
439 CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
440 CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
441 le64_to_cpu(fsd->fsd_last_objid));
442 CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
443 le64_to_cpu(fsd->fsd_last_transno));
444 CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
445 le64_to_cpu(fsd->fsd_mount_count));
447 rc = fsfilt_write_record(obd, filp, (char *)fsd, sizeof(*fsd), &off);
448 if (rc == sizeof(*fsd))
451 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n", rc);
457 /* assumes caller is already in kernel ctxt */
/*
 * Load (or create) the server data stored in filp (LAST_RCVD) and rebuild
 * the per-client state from the client records in the same file.
 *
 * - Allocates filter->fo_fsd and the fo_last_rcvd_slots bitmap.
 * - Empty file: fills a fresh fsd from obd->obd_uuid, init_lastobjid and the
 *   FILTER_LR_* and FILTER_SUBDIR_COUNT constants.
 * - Non-empty file: reads the fsd (-EIO when the read size is wrong) and
 *   takes mount_count and fo_subdir_count from it.
 * - A set fsd_feature_incompat or fsd_feature_rocompat yields -EINVAL.
 * - Without obd_replayable a warning says recovery support is off.
 * - Client records are read at fsd_client_start + cl_idx * fsd_client_size;
 *   a record with an empty uuid is treated as a zeroed slot.  A client whose
 *   mount age is below FILTER_MOUNT_RECOV gets an export from
 *   class_new_export(), is registered with filter_client_add() and counted
 *   in obd_recoverable_clients; other clients are logged as discarded.
 * - fsd_last_transno is raised to the largest fcd_last_rcvd seen and copied
 *   to obd_last_committed; with recoverable clients obd_recovering is set
 *   and obd_next_recovery_transno is obd_last_committed + 1.
 * - fsd_mount_count is stored as mount_count + 1 and the fsd is written back
 *   with filter_update_server_data().
 */
458 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
459 __u64 init_lastobjid)
461 struct filter_obd *filter = &obd->u.filter;
462 struct filter_server_data *fsd;
463 struct filter_client_data *fcd = NULL;
464 struct inode *inode = filp->f_dentry->d_inode;
465 unsigned long last_rcvd_size = inode->i_size;
466 __u64 mount_count = 0;
471 /* ensure padding in the struct is the correct size */
472 LASSERT (offsetof(struct filter_server_data, fsd_padding) +
473 sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
474 LASSERT (offsetof(struct filter_client_data, fcd_padding) +
475 sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
477 OBD_ALLOC(fsd, sizeof(*fsd));
480 filter->fo_fsd = fsd;
482 OBD_ALLOC(filter->fo_last_rcvd_slots,
483 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
484 if (filter->fo_last_rcvd_slots == NULL) {
485 OBD_FREE(fsd, sizeof(*fsd));
489 if (last_rcvd_size == 0) {
490 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
492 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
493 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
494 fsd->fsd_last_transno = 0;
495 mount_count = fsd->fsd_mount_count = 0;
496 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
497 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
498 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
499 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
500 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
502 int retval = fsfilt_read_record(obd, filp, (char *)fsd,
504 if (retval != sizeof(*fsd)) {
505 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
507 GOTO(err_fsd, rc = -EIO);
509 mount_count = le64_to_cpu(fsd->fsd_mount_count);
510 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
511 fsd->fsd_last_objid =
512 cpu_to_le64(le64_to_cpu(fsd->fsd_last_objid) +
516 if (fsd->fsd_feature_incompat) {
517 CERROR("unsupported feature %x\n",
518 le32_to_cpu(fsd->fsd_feature_incompat));
519 GOTO(err_fsd, rc = -EINVAL);
521 if (fsd->fsd_feature_rocompat) {
522 CERROR("read-only feature %x\n",
523 le32_to_cpu(fsd->fsd_feature_rocompat));
524 /* Do something like remount filesystem read-only */
525 GOTO(err_fsd, rc = -EINVAL);
528 CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
529 obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
530 CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
531 obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
532 CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
533 obd->obd_name, mount_count);
534 CDEBUG(D_INODE, "%s: server data size: %u\n",
535 obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
536 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
537 obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
538 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
539 obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
540 CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
541 obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
543 if (!obd->obd_replayable) {
544 CWARN("%s: recovery support OFF\n", obd->obd_name);
548 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
553 OBD_ALLOC(fcd, sizeof(*fcd));
555 GOTO(err_fsd, rc = -ENOMEM);
558 /* Don't assume off is incremented properly, in case
559 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
561 off = le32_to_cpu(fsd->fsd_client_start) +
562 cl_idx * le16_to_cpu(fsd->fsd_client_size);
563 rc = fsfilt_read_record(obd, filp, (char *)fcd, sizeof(*fcd),
565 if (rc != sizeof(*fcd)) {
566 CERROR("error reading FILTER %s offset %d: rc = %d\n",
567 LAST_RCVD, cl_idx, rc);
568 if (rc > 0) /* XXX fatal error or just abort reading? */
573 if (fcd->fcd_uuid[0] == '\0') {
574 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
579 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
581 /* These exports are cleaned up by filter_disconnect(), so they
582 * need to be set up like real exports as filter_connect() does.
584 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
585 if (mount_age < FILTER_MOUNT_RECOV) {
586 struct obd_export *exp = class_new_export(obd);
587 struct filter_export_data *fed;
588 CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
589 " srv lr: "LPU64" mnt: "LPU64" last mount: "
590 LPU64"\n", fcd->fcd_uuid, cl_idx,
591 last_rcvd, le64_to_cpu(fsd->fsd_last_transno),
592 le64_to_cpu(fcd->fcd_mount_count), mount_count);
594 /* XXX this rc is ignored */
598 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
599 sizeof exp->exp_client_uuid.uuid);
600 fed = &exp->exp_filter_data;
602 filter_client_add(obd, filter, fed, cl_idx);
603 /* create helper if export init gets more complex */
604 INIT_LIST_HEAD(&fed->fed_open_head);
605 spin_lock_init(&fed->fed_lock);
608 obd->obd_recoverable_clients++;
609 class_export_put(exp);
612 "discarded client %d UUID '%s' count "LPU64"\n",
613 cl_idx, fcd->fcd_uuid,
614 le64_to_cpu(fcd->fcd_mount_count));
617 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
620 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
621 filter->fo_fsd->fsd_last_transno=cpu_to_le64(last_rcvd);
623 obd->obd_last_committed =
624 le64_to_cpu(filter->fo_fsd->fsd_last_transno);
626 if (obd->obd_recoverable_clients) {
627 CERROR("RECOVERY: %d recoverable clients, last_rcvd "
628 LPU64"\n", obd->obd_recoverable_clients,
629 le64_to_cpu(filter->fo_fsd->fsd_last_transno));
630 obd->obd_next_recovery_transno =
631 obd->obd_last_committed + 1;
632 obd->obd_recovering = 1;
638 OBD_FREE(fcd, sizeof(*fcd));
641 fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
643 /* save it, so mount count and last_transno is current */
644 rc = filter_update_server_data(obd, filp, filter->fo_fsd);
649 filter_free_server_data(filter);
653 /* setup the object store with correct subdirectories */
/*
 * Inside fo_ctxt this builds, in order:
 *   - the O directory under current->fs->pwd, kept in fo_dentry_O;
 *   - one directory under O per name in obd_type_by_mode[], kept in
 *     fo_dentry_O_mode[];
 *   - the LAST_RCVD file, which has to be a regular file (-ENOENT if not);
 *     its f_op, i_op and a_ops are saved in fo_fop, fo_iop and fo_aops and
 *     its contents are loaded through filter_init_server_data();
 *   - when fo_subdir_count is non-zero, directories named d<i> under the
 *     S_IFREG type directory, kept in fo_dentry_O_sub[].
 * The statements after the pop_ctxt() call release these resources again,
 * last acquired first, for the error paths.
 */
654 static int filter_prep(struct obd_device *obd)
656 struct obd_run_ctxt saved;
657 struct filter_obd *filter = &obd->u.filter;
658 struct dentry *dentry, *O_dentry;
665 push_ctxt(&saved, &filter->fo_ctxt, NULL);
666 dentry = simple_mkdir(current->fs->pwd, "O", 0700);
667 CDEBUG(D_INODE, "got/created O: %p\n", dentry);
668 if (IS_ERR(dentry)) {
669 rc = PTR_ERR(dentry);
670 CERROR("cannot open/create O: rc = %d\n", rc);
673 filter->fo_dentry_O = dentry;
676 * Create directories and/or get dentries for each object type.
677 * This saves us from having to do multiple lookups for each one.
679 O_dentry = filter->fo_dentry_O;
680 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
681 char *name = obd_type_by_mode[mode];
684 filter->fo_dentry_O_mode[mode] = NULL;
687 dentry = simple_mkdir(O_dentry, name, 0700);
688 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
689 if (IS_ERR(dentry)) {
690 rc = PTR_ERR(dentry);
691 CERROR("cannot create O/%s: rc = %d\n", name, rc);
692 GOTO(err_O_mode, rc);
694 filter->fo_dentry_O_mode[mode] = dentry;
697 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
698 if (!file || IS_ERR(file)) {
700 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
702 GOTO(err_O_mode, rc);
705 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
706 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
707 file->f_dentry->d_inode->i_mode);
708 GOTO(err_filp, rc = -ENOENT);
711 rc = fsfilt_journal_data(obd, file);
713 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
716 /* steal operations */
717 inode = file->f_dentry->d_inode;
718 filter->fo_fop = file->f_op;
719 filter->fo_iop = inode->i_op;
720 filter->fo_aops = inode->i_mapping->a_ops;
721 #ifdef I_SKIP_PDFLUSH
723 * we need this to protect from deadlock
724 * pdflush vs. lustre_fwrite()
726 inode->i_flags |= I_SKIP_PDFLUSH;
729 rc = filter_init_server_data(obd, file, FILTER_INIT_OBJID);
731 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
732 GOTO(err_client, rc);
734 filter->fo_rcvd_filp = file;
736 if (filter->fo_subdir_count) {
737 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
738 OBD_ALLOC(filter->fo_dentry_O_sub,
739 filter->fo_subdir_count * sizeof(dentry));
740 if (!filter->fo_dentry_O_sub)
741 GOTO(err_client, rc = -ENOMEM);
743 for (i = 0; i < filter->fo_subdir_count; i++) {
745 snprintf(dir, sizeof(dir), "d%u", i);
747 dentry = simple_mkdir(O_dentry, dir, 0700);
748 CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
749 if (IS_ERR(dentry)) {
750 rc = PTR_ERR(dentry);
751 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
754 filter->fo_dentry_O_sub[i] = dentry;
759 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
765 struct dentry *dentry = filter->fo_dentry_O_sub[i];
768 filter->fo_dentry_O_sub[i] = NULL;
771 OBD_FREE(filter->fo_dentry_O_sub,
772 filter->fo_subdir_count * sizeof(dentry));
774 class_disconnect_exports(obd, 0);
776 if (filp_close(file, 0))
777 CERROR("can't close %s after error\n", LAST_RCVD);
778 filter->fo_rcvd_filp = NULL;
781 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
784 filter->fo_dentry_O_mode[mode] = NULL;
787 f_dput(filter->fo_dentry_O);
788 filter->fo_dentry_O = NULL;
792 /* cleanup the filter: write last used object id to status file */
/*
 * Counterpart of filter_prep().  Inside fo_ctxt it calls
 * filter_update_server_data() on fo_rcvd_filp, fsyncs and closes that file,
 * clears the fo_dentry_O_sub[] and fo_dentry_O_mode[] slots, frees the
 * fo_dentry_O_sub array, drops fo_dentry_O with f_dput() and releases the
 * server data with filter_free_server_data().  Failures of the write and of
 * the sync/close step are only logged.
 */
793 static void filter_post(struct obd_device *obd)
795 struct obd_run_ctxt saved;
796 struct filter_obd *filter = &obd->u.filter;
800 /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
801 * best to start a transaction with h_sync, because we removed this
804 push_ctxt(&saved, &filter->fo_ctxt, NULL);
805 rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
808 CERROR("error writing lastobjid: rc = %ld\n", rc);
811 if (filter->fo_rcvd_filp) {
812 rc = file_fsync(filter->fo_rcvd_filp,
813 filter->fo_rcvd_filp->f_dentry, 1);
814 filp_close(filter->fo_rcvd_filp, 0);
815 filter->fo_rcvd_filp = NULL;
817 CERROR("error closing %s: rc = %ld\n", LAST_RCVD, rc);
820 if (filter->fo_subdir_count) {
822 for (i = 0; i < filter->fo_subdir_count; i++) {
823 struct dentry *dentry = filter->fo_dentry_O_sub[i];
825 filter->fo_dentry_O_sub[i] = NULL;
827 OBD_FREE(filter->fo_dentry_O_sub,
828 filter->fo_subdir_count *
829 sizeof(*filter->fo_dentry_O_sub));
831 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
832 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
835 filter->fo_dentry_O_mode[mode] = NULL;
838 f_dput(filter->fo_dentry_O);
839 filter_free_server_data(filter);
840 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/*
 * Advance the object id counter kept in the server data: under
 * fo_objidlock, fsd_last_objid (stored little-endian) is read into id and
 * written back as id + 1.  filter->fo_fsd must not be NULL.
 */
843 __u64 filter_next_id(struct filter_obd *filter)
846 LASSERT(filter->fo_fsd != NULL);
848 spin_lock(&filter->fo_objidlock);
849 id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
850 filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
851 spin_unlock(&filter->fo_objidlock);
856 /* direct cut-n-paste of mds_blocking_ast() */
/*
 * LDLM blocking callback used for the locks this file enqueues.
 *
 * LDLM_CB_CANCELING needs no work.  Otherwise the namespace lock is taken;
 * if l_blocking_ast of the lock is no longer this function the namespace
 * lock is dropped again without flagging the lock.  Else the lock is flagged
 * LDLM_FL_CBPENDING and do_ast records whether it has neither readers nor
 * writers.  The tail either cancels the lock through ldlm_cli_cancel()
 * (logging a failure) or just logs that references remain.
 */
857 static int filter_blocking_ast(struct ldlm_lock *lock,
858 struct ldlm_lock_desc *desc,
859 void *data, int flag)
864 if (flag == LDLM_CB_CANCELING) {
865 /* Don't need to do anything here. */
869 /* XXX layering violation! -phil */
870 l_lock(&lock->l_resource->lr_namespace->ns_lock);
871 /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
872 * such that mds_blocking_ast is called just before l_i_p takes the
873 * ns_lock, then by the time we get the lock, we might not be the
874 * correct blocking function anymore. So check, and return early, if
876 if (lock->l_blocking_ast != filter_blocking_ast) {
877 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
881 lock->l_flags |= LDLM_FL_CBPENDING;
882 do_ast = (!lock->l_readers && !lock->l_writers);
883 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
886 struct lustre_handle lockh;
889 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
890 ldlm_lock2handle(lock, &lockh);
891 rc = ldlm_cli_cancel(&lockh);
893 CERROR("ldlm_cli_cancel: %d\n", rc);
895 LDLM_DEBUG(lock, "Lock still has references, will be "
/*
 * Enqueue an LDLM_PLAIN lock of lock_mode in obd->obd_namespace on the
 * resource named by the inode number and generation of de, using
 * ldlm_completion_ast and filter_blocking_ast as callbacks; the handle is
 * returned in lockh.
 * Returns 0 when ldlm_cli_enqueue() reports ELDLM_OK and -ENOLCK otherwise.
 */
901 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
902 ldlm_mode_t lock_mode,struct lustre_handle *lockh)
904 struct ldlm_res_id res_id = { .name = {0} };
908 res_id.name[0] = de->d_inode->i_ino;
909 res_id.name[1] = de->d_inode->i_generation;
910 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
911 res_id, LDLM_PLAIN, NULL, 0, lock_mode,
912 &flags, ldlm_completion_ast,
913 filter_blocking_ast, NULL, lockh);
915 RETURN(rc == ELDLM_OK ? 0 : -ENOLCK); /* XXX translate ldlm code */
918 /* We never dget the object parent, so DON'T dput it either */
919 static void filter_parent_unlock(struct dentry *dparent,
920 struct lustre_handle *lockh,
921 ldlm_mode_t lock_mode)
923 ldlm_lock_decref(lockh, lock_mode);
926 /* We never dget the object parent, so DON'T dput it either */
927 struct dentry *filter_parent(struct obd_device *obd, obd_mode mode,
930 struct filter_obd *filter = &obd->u.filter;
932 LASSERT(S_ISREG(mode)); /* only regular files for now */
933 if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
934 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
936 return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
939 /* We never dget the object parent, so DON'T dput it either */
940 struct dentry *filter_parent_lock(struct obd_device *obd, obd_mode mode,
941 obd_id objid, ldlm_mode_t lock_mode,
942 struct lustre_handle *lockh)
944 unsigned long now = jiffies;
945 struct dentry *de = filter_parent(obd, mode, objid);
951 rc = filter_lock_dentry(obd, de, lock_mode, lockh);
952 if (time_after(jiffies, now + 15 * HZ))
953 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
954 return rc ? ERR_PTR(rc) : de;
957 /* How to get files, dentries, inodes from object id's.
959 * If dir_dentry is passed, the caller has already locked the parent
960 * appropriately for this operation (normally a write lock). If
961 * dir_dentry is NULL, we do a read lock while we do the lookup to
962 * avoid races with create/destroy and such changing the directory
963 * internal to the filesystem code. */
/*
 * The object is looked up by the decimal text of id (LPU64 format) with
 * ll_lookup_one_len().  With dir_dentry == NULL the parent comes from
 * filter_parent_lock() in LCK_PR mode and is unlocked right after the
 * lookup.  The invalid-id path logs an error and returns ERR_PTR(-ESTALE);
 * a failed child lookup is logged with its error code.
 *
 * NOTE(review): the name is formatted with an unbounded sprintf() here,
 * while filter_obj_open() uses snprintf(name, sizeof(name), ...) for the
 * same job -- consider aligning the two.  The %*s in the lookup CDEBUG sets
 * a minimum field width, not a maximum length; %.*s is probably what was
 * meant -- confirm.
 */
964 struct dentry *filter_fid2dentry(struct obd_device *obd,
965 struct dentry *dir_dentry,
966 obd_mode mode, obd_id id)
968 struct lustre_handle lockh;
969 struct dentry *dparent = dir_dentry;
970 struct dentry *dchild;
976 CERROR("fatal: invalid object id 0\n");
978 RETURN(ERR_PTR(-ESTALE));
981 len = sprintf(name, LPU64, id);
982 if (dir_dentry == NULL) {
983 dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
987 CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
988 dparent->d_name.len, dparent->d_name.name, name);
989 dchild = ll_lookup_one_len(name, dparent, len);
990 if (dir_dentry == NULL)
991 filter_parent_unlock(dparent, &lockh, LCK_PR);
992 if (IS_ERR(dchild)) {
993 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
997 CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
998 name, dchild, atomic_read(&dchild->d_count));
1000 LASSERT(atomic_read(&dchild->d_count) > 0);
/*
 * Open object id of the given type on behalf of export; the result is kept
 * in file, which is either the opened struct file or an ERR_PTR.
 *
 * Steps, all inside fo_ctxt: validate id and type, allocate a
 * filter_file_data (ffd) and a spare filter_dentry_data (fdd), lock the
 * parent in parent_mode into parent_lockh, look the object up by its decimal
 * name and open it O_RDWR | O_LARGEFILE on fo_vfsmnt.  Under fo_fddlock the
 * dentry either keeps its existing d_fsdata (spare fdd freed, open count
 * incremented) or adopts the new fdd with an open count of 1 and the object
 * id.  The ffd is tied to the file through private_data, filter_dops is
 * installed on the dentry and the ffd is added to fed_open_head under
 * fed_lock.  The switch on cleanup_phase releases the parent lock, the fdd
 * and the ffd as required.
 */
1005 static struct file *filter_obj_open(struct obd_export *export,
1006 struct obd_trans_info *oti,
1007 __u64 id, __u32 type, int parent_mode,
1008 struct lustre_handle *parent_lockh)
1010 struct obd_device *obd = export->exp_obd;
1011 struct filter_obd *filter = &obd->u.filter;
1012 struct dentry *dchild = NULL, *dparent = NULL;
1013 struct filter_export_data *fed = &export->exp_filter_data;
1014 struct filter_dentry_data *fdd = NULL;
1015 struct filter_file_data *ffd = NULL;
1016 struct obd_run_ctxt saved;
1019 int len, cleanup_phase = 0;
1022 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1025 CERROR("fatal: invalid obdo "LPU64"\n", id);
1026 GOTO(cleanup, file = ERR_PTR(-ESTALE));
1029 if (!(type & S_IFMT)) {
1030 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
1031 __FUNCTION__, id, type);
1032 GOTO(cleanup, file = ERR_PTR(-EINVAL));
1035 ffd = filter_ffd_new();
1037 CERROR("obdfilter: out of memory\n");
1038 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1043 /* We preallocate this to avoid blocking while holding fo_fddlock */
1044 OBD_ALLOC(fdd, sizeof *fdd);
1046 CERROR("obdfilter: out of memory\n");
1047 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1052 dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1053 if (IS_ERR(dparent))
1054 GOTO(cleanup, file = (void *)dparent);
1058 len = snprintf(name, sizeof(name), LPU64, id);
1059 dchild = ll_lookup_one_len(name, dparent, len);
1061 GOTO(cleanup, file = (void *)dchild);
1065 if (dchild->d_inode == NULL) {
1066 CERROR("opening non-existent object %s - O_CREAT?\n", name);
1067 /* dput(dchild); call filter_create_internal here */
1068 file = ERR_PTR(-ENOENT);
1069 GOTO(cleanup, file);
1072 /* dentry_open does a dput(dchild) and mntput(mnt) on error */
1073 mntget(filter->fo_vfsmnt);
1074 file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1076 dchild = NULL; /* prevent a double dput in step 4 */
1077 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1078 GOTO(cleanup, file);
1081 spin_lock(&filter->fo_fddlock);
1082 if (dchild->d_fsdata) {
1083 spin_unlock(&filter->fo_fddlock);
1084 OBD_FREE(fdd, sizeof *fdd);
1085 fdd = dchild->d_fsdata;
1086 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1087 /* should only happen during client recovery */
1088 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1089 CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1090 atomic_inc(&fdd->fdd_open_count);
1092 atomic_set(&fdd->fdd_open_count, 1);
1093 fdd->fdd_magic = FILTER_DENTRY_MAGIC;
1095 fdd->fdd_objid = id;
1096 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1097 dchild->d_fsdata = fdd;
1098 spin_unlock(&filter->fo_fddlock);
1101 ffd->ffd_file = file;
1102 LASSERT(file->private_data == NULL);
1103 file->private_data = ffd;
1106 dchild->d_op = &filter_dops;
1108 LASSERT(dchild->d_op == &filter_dops);
1110 spin_lock(&fed->fed_lock);
1111 list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1112 spin_unlock(&fed->fed_lock);
1114 CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1116 switch (cleanup_phase) {
1122 filter_parent_unlock(dparent, parent_lockh,parent_mode);
1125 OBD_FREE(fdd, sizeof *fdd);
1128 filter_ffd_destroy(ffd);
1129 filter_ffd_put(ffd);
1131 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1136 /* Caller must hold LCK_PW on parent and push us into kernel context.
1137 * Caller is also required to ensure that dchild->d_inode exists. */
/*
 * Unlinks dchild from dparent with vfs_unlink() and logs a CERROR naming
 * the object on the failure path.  An inode whose i_nlink or i_count is not
 * 1 is reported before the unlink.  The extent-lock section enqueues a
 * LCK_PW lock over [0, OBD_OBJECT_EOF] on the resource named by objid and
 * drops it again at once, purely for its side effects on clients.
 *
 * NOTE(review): the ldlm_cli_enqueue(..., LCK_PW, ...) statement below is
 * not valid C as written (literal ellipsis), so it is presumably compiled
 * out -- confirm.  The %*s specifiers set a minimum field width rather than
 * a maximum length; %.*s is probably what was meant.
 */
1138 static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
1139 struct dentry *dparent,
1140 struct dentry *dchild)
1142 struct inode *inode = dchild->d_inode;
1146 if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1147 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1148 dchild->d_name.len, dchild->d_name.name,
1149 inode->i_nlink, atomic_read(&inode->i_count));
1154 /* Tell the clients that the object is gone now and that they should
1155 * throw away any cached pages. We don't need to wait until they're
1156 * done, so just decref the lock right away and let ldlm_completion_ast
1157 * clean up when it's all over. */
1158 ldlm_cli_enqueue(..., LCK_PW, AST_INTENT_DESTROY, &lockh);
1159 ldlm_lock_decref(&lockh, LCK_PW);
1163 struct lustre_handle lockh;
1165 struct ldlm_res_id res_id = { .name = { objid } };
1167 /* This part is a wee bit iffy: we really only want to bust the
1168 * locks on our stripe, so that we don't end up bouncing
1169 * [0->EOF] locks around on each of the OSTs as the rest of the
1170 * destroys get processed. Because we're only talking to
1171 * the local LDLM, though, we should only end up locking the
1172 * whole of our stripe. When bug 1425 (take all locks on OST
1173 * for stripe 0) is fixed, this code should be revisited. */
1174 struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
1176 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1177 res_id, LDLM_EXTENT, &extent,
1178 sizeof(extent), LCK_PW, &flags,
1179 ldlm_completion_ast, filter_blocking_ast,
1181 /* We only care about the side-effects, just drop the lock. */
1182 ldlm_lock_decref(&lockh, LCK_PW);
1185 rc = vfs_unlink(dparent->d_inode, dchild);
1188 CERROR("error unlinking objid %*s: rc %d\n",
1189 dchild->d_name.len, dchild->d_name.name, rc);
1194 /* If closing because we are failing this device, then
1195 don't do the unlink on close.
/* Close one open file of an export and, when it was the last opener of an
 * object flagged for destruction, destroy that object.
 *
 * exp:   export owning the handle; exp->exp_obd supplies the filter device
 * ffd:   per-open data; ffd->ffd_file is closed and ffd is destroyed
 * oti:   transaction info passed to fsfilt_start() and
 *        filter_finish_transno(); filter_destroy_export() passes NULL and
 *        the cleanup code tests for oti == NULL
 * flags: the deferred destroy is skipped when OBD_OPT_FAILOVER is set
 *
 * The deferred destroy runs only if fdd_open_count drops to zero,
 * FILTER_FLAG_DESTROY is set in fdd_flags and OBD_OPT_FAILOVER is clear.
 *
 * NOTE(review): oti->oti_handle is read in the destroy path; any NULL guard
 * around it, the declaration of handle, the cleanup labels and the return
 * statement are not visible in this listing -- confirm in the full file. */
1197 static int filter_close_internal(struct obd_export *exp,
1198 struct filter_file_data *ffd,
1199 struct obd_trans_info *oti, int flags)
1201 struct obd_device *obd = exp->exp_obd;
1202 struct filter_obd *filter = &obd->u.filter;
1203 struct file *filp = ffd->ffd_file;
/* Extra dentry reference taken here so dchild stays usable after
 * filp_close() below. */
1204 struct dentry *dchild = dget(filp->f_dentry);
1205 struct filter_dentry_data *fdd = dchild->d_fsdata;
1206 struct lustre_handle parent_lockh;
1207 int rc, rc2, cleanup_phase = 0;
1208 struct dentry *dparent = NULL;
1209 struct obd_run_ctxt saved;
1212 LASSERT(filp->private_data == ffd);
1213 LASSERT(fdd != NULL);
1214 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1216 rc = filp_close(filp, 0);
/* Last close of an object marked for destroy, and not a failover close. */
1218 if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1219 (fdd->fdd_flags & FILTER_FLAG_DESTROY) &&
1220 !(flags & OBD_OPT_FAILOVER)) {
1223 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1226 LASSERT(fdd->fdd_objid > 0);
/* Parent lock is taken before the transaction is started. */
1227 dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1228 LCK_PW, &parent_lockh);
1229 if (IS_ERR(dparent))
1230 GOTO(cleanup, rc = PTR_ERR(dparent));
1233 handle = fsfilt_start(obd, dparent->d_inode,
1234 FSFILT_OP_UNLINK_LOG, oti);
1236 GOTO(cleanup, rc = PTR_ERR(handle));
/* Record the transaction handle in oti, or check that it matches the one
 * already stored there. */
1239 if (oti->oti_handle == NULL)
1240 oti->oti_handle = handle;
1242 LASSERT(oti->oti_handle == handle);
1245 #ifdef ENABLE_ORPHANS
1246 /* Remove orphan unlink record from log */
1247 llog_cancel_records(filter->fo_catalog, 1, &fdd->fdd_cookie);
1249 /* XXX unlink from PENDING directory now too */
1250 rc2 = filter_destroy_internal(obd, fdd->fdd_objid, dparent,
/* rc is replaced by the result of filter_finish_transno(). */
1254 rc = filter_finish_transno(exp, oti, rc);
1255 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1257 CERROR("error on commit, err = %d\n", rc2);
/* Staged cleanup keyed on cleanup_phase (case labels not visible here).
 * With rc != 0 or oti == NULL the parent lock is released; the copy of
 * parent_lockh into oti->oti_ack_locks[0] presumably sits in the else
 * branch, whose "else" line is missing from this listing. */
1264 switch(cleanup_phase) {
1266 if (rc || oti == NULL) {
1267 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1269 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1270 sizeof(parent_lockh));
1271 oti->oti_ack_locks[0].mode = LCK_PW;
1274 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1277 filter_ffd_destroy(ffd);
1280 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1287 /* mount the file system (secretly) */
/* Shared setup for the filter device: mounts the backing filesystem and
 * initialises the filter_obd state in obd->u.filter.
 *
 * buf is interpreted as a struct obd_ioctl_data:
 *   ioc_inlbuf1  third argument of do_kern_mount(); must be non-NULL
 *   ioc_inlbuf2  filesystem type name, given to fsfilt_get_ops() and as the
 *                first argument of do_kern_mount(); must be non-NULL
 *   ioc_inlbuf3  optional flag (used when ioc_inllen3 > 0): 'f' sets
 *                obd->obd_replayable and obd_sync_filter; the "unrecognised
 *                flag" error comes from a comparison against 'n'
 *   ioc_inlbuf4  optional namespace path (used when ioc_inllen4 > 0); it is
 *                copied into fo_nspath only when it starts with '/'
 * option is forwarded to do_kern_mount() as its last argument.
 *
 * Failures visible here: PTR_ERR of fsfilt_get_ops(), the rc of
 * filter_prep(), -ENOMEM when ldlm_namespace_new() returns NULL and, under
 * ENABLE_ORPHANS, PTR_ERR of filter_get_catalog().
 *
 * NOTE(review): the last parameter line (option), the declaration of rc,
 * the handling of a failed do_kern_mount(), the error labels and the return
 * statements are not visible in this listing. */
1288 int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1291 struct obd_ioctl_data* data = buf;
1292 struct filter_obd *filter = &obd->u.filter;
1293 struct vfsmount *mnt;
1297 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1300 obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1301 if (IS_ERR(obd->obd_fsops))
1302 RETURN(PTR_ERR(obd->obd_fsops));
1304 mnt = do_kern_mount(data->ioc_inlbuf2, MS_NOATIME | MS_NODIRATIME,
1305 data->ioc_inlbuf1, option);
1310 if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1311 if (*data->ioc_inlbuf3 == 'f') {
1312 obd->obd_replayable = 1;
1313 obd_sync_filter = 1;
1314 CERROR("%s: configured for recovery and sync write\n",
1317 if (*data->ioc_inlbuf3 != 'n') {
1318 CERROR("unrecognised flag '%c'\n",
1319 *data->ioc_inlbuf3);
1324 if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
1325 if (*data->ioc_inlbuf4 == '/') {
1326 CERROR("filter namespace mount: %s\n",
1328 filter->fo_nspath = strdup(data->ioc_inlbuf4);
1330 CERROR("namespace mount must be absolute path: '%s'\n",
/* Remember the mount and derive superblock and fs type name from it. */
1335 filter->fo_vfsmnt = mnt;
1336 filter->fo_sb = mnt->mnt_sb;
1337 filter->fo_fstype = mnt->mnt_sb->s_type->name;
1338 CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
/* fo_ctxt is the context other functions in this file hand to
 * push_ctxt()/pop_ctxt(); it is rooted at the new mount. */
1340 OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1341 filter->fo_ctxt.pwdmnt = mnt;
1342 filter->fo_ctxt.pwd = mnt->mnt_root;
1343 filter->fo_ctxt.fs = get_ds();
1345 rc = filter_prep(obd);
1347 GOTO(err_mntput, rc);
1349 spin_lock_init(&filter->fo_translock);
1350 spin_lock_init(&filter->fo_fddlock);
1351 spin_lock_init(&filter->fo_objidlock);
1352 INIT_LIST_HEAD(&filter->fo_export_list);
1354 ptlrpc_init_client(MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1355 "filter_mdc", &filter->fo_mdc_client);
1356 sema_init(&filter->fo_sem, 1);
1358 obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1359 LDLM_NAMESPACE_SERVER);
1360 if (obd->obd_namespace == NULL)
1361 GOTO(err_post, rc = -ENOMEM);
1363 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1364 "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1366 /* Create a non-replaying connection for recovery logging, so that
1367 * we don't create a client entry for this local connection, and do
1368 * not log or assign transaction numbers for logging operations. */
1369 #ifdef ENABLE_ORPHANS
1370 filter->fo_catalog = filter_get_catalog(obd);
1371 if (IS_ERR(filter->fo_catalog))
1372 GOTO(err_post, rc = PTR_ERR(filter->fo_catalog));
/* Presumably part of the error unwinding reached through the err_* labels
 * (labels not visible here): releases the ops taken by fsfilt_get_ops(). */
1385 fsfilt_put_ops(obd->obd_fsops);
/* o_setup method of filter_obd_ops: picks the mount option string and
 * delegates everything else to filter_common_setup().
 *
 * On kernels older than 2.5.0 the option is "asyncdel" when the filesystem
 * type in ioc_inlbuf2 is "ext3"; otherwise option stays NULL.
 *
 * NOTE(review): data->ioc_inlbuf2 is handed to strcmp() here before
 * filter_common_setup() performs its NULL check on ioc_inlbuf2 -- confirm
 * callers never pass a NULL ioc_inlbuf2 on pre-2.5 kernels. */
1389 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1391 struct obd_ioctl_data* data = buf;
1392 char *option = NULL;
1394 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1395 /* bug 1577: implement async-delete for 2.5 */
1396 if (!strcmp(data->ioc_inlbuf2, "ext3"))
1397 option = "asyncdel";
1400 return filter_common_setup(obd, len, buf, option);
/* o_cleanup method: tears down the state created during setup.
 *
 * obd:   filter device being cleaned up
 * flags: OBD_OPT_FAILOVER only changes logging here; flags is also passed
 *        on to class_disconnect_exports()
 *
 * Visible order: force-disconnect remaining exports, release the catalog
 * (ENABLE_ORPHANS only), free the LDLM namespace, shrink the dcache under
 * the superblock root, warn if the mount is still referenced, drop the
 * mount, clear fo_sb and release the fsfilt operations.
 *
 * NOTE(review): the actions taken when exports remain after the forced
 * disconnect and when fo_sb is NULL, plus the return statements, are not
 * visible in this listing. */
1403 static int filter_cleanup(struct obd_device *obd, int flags)
1405 struct filter_obd *filter = &obd->u.filter;
1408 if (flags & OBD_OPT_FAILOVER)
1409 CERROR("%s: shutting down for failover; client state will"
1410 " be preserved.\n", obd->obd_name);
1412 if (!list_empty(&obd->obd_exports)) {
1413 CERROR("%s: still has clients!\n", obd->obd_name);
1414 class_disconnect_exports(obd, flags);
1415 if (!list_empty(&obd->obd_exports)) {
1416 CERROR("still has exports after forced cleanup?\n");
1421 #ifdef ENABLE_ORPHANS
1422 filter_put_catalog(filter->fo_catalog);
1425 ldlm_namespace_free(obd->obd_namespace);
1427 if (filter->fo_sb == NULL)
1432 shrink_dcache_parent(filter->fo_sb->s_root);
/* A count above 1 means something besides this device still holds the
 * mount; this is only reported, mntput() is called regardless. */
1435 if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1)
1436 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1437 atomic_read(&filter->fo_vfsmnt->mnt_count));
1440 mntput(filter->fo_vfsmnt);
1441 //destroy_buffers(filter->fo_sb->s_dev);
1442 filter->fo_sb = NULL;
1443 fsfilt_put_ops(obd->obd_fsops);
/* o_attach method: registers the device's lprocfs entries and statistics.
 *
 * Attaches lvars.obd_vars via lprocfs_obd_attach(), allocates
 * LPROC_FILTER_LAST stats slots and initialises the "read_bytes" and
 * "write_bytes" counters (both LPROCFS_CNTR_AVGMINMAX, unit "bytes").
 * len and data are not used in the visible lines.
 *
 * NOTE(review): the declaration of rc, the checks on rc after each call and
 * the return statement are not visible in this listing. */
1449 static int filter_attach(struct obd_device *obd, obd_count len, void *data)
1451 struct lprocfs_static_vars lvars;
1454 lprocfs_init_vars(filter, &lvars);
1455 rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1459 rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1463 /* Init obdfilter private stats here */
1464 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1465 LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1466 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1467 LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
/* o_detach method: frees the device statistics, then detaches the lprocfs
 * entries and returns the result of lprocfs_obd_detach(). */
1471 static int filter_detach(struct obd_device *dev)
1473 lprocfs_free_obd_stats(dev);
1474 return lprocfs_obd_detach(dev);
1477 /* nearly identical to mds_connect */
/* o_connect method: creates an export for a client and, on a replayable
 * device, allocates and registers per-client data.
 *
 * conn:   connection handle filled in by class_connect()
 * obd:    filter device being connected to
 * cluuid: client UUID; sizeof(fcd->fcd_uuid) bytes are copied from it
 *
 * On allocation failure rc is -ENOMEM and control goes to out_export; the
 * visible error path frees fcd and calls class_disconnect(conn, 0).
 *
 * NOTE(review): fed points into the export and is still used after
 * class_export_put(exp); presumably the connection itself keeps the export
 * alive -- confirm.  The results of the NULL-argument check, of the
 * class_connect() check and of the !obd_replayable test, the line that
 * stores fcd in the export data and the labels are not visible here. */
1478 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1479 struct obd_uuid *cluuid)
1481 struct obd_export *exp;
1482 struct filter_export_data *fed;
1483 struct filter_client_data *fcd;
1484 struct filter_obd *filter = &obd->u.filter;
1488 if (conn == NULL || obd == NULL || cluuid == NULL)
1491 rc = class_connect(conn, obd, cluuid);
1494 exp = class_conn2export(conn);
1495 LASSERT(exp != NULL);
1497 fed = &exp->exp_filter_data;
1498 class_export_put(exp);
1500 INIT_LIST_HEAD(&fed->fed_open_head);
1501 spin_lock_init(&fed->fed_lock);
1503 if (!obd->obd_replayable)
1506 OBD_ALLOC(fcd, sizeof(*fcd));
1508 CERROR("filter: out of memory for client data\n");
1509 GOTO(out_export, rc = -ENOMEM);
1512 memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
/* Mount count is stored in little-endian form. */
1514 fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1516 rc = filter_client_add(obd, filter, fed, -1);
1523 OBD_FREE(fcd, sizeof(*fcd));
1525 class_disconnect(conn, 0);
/* o_destroy_export method: force-closes every file still on the export's
 * open list, then frees the client data on a replayable device.
 *
 * Each entry is unlinked from fed_open_head while fed_lock is held; the lock
 * is released before filter_close_internal() is called and retaken before
 * the list is examined again.  filter_close_internal() is called with a
 * NULL oti and with exp->exp_flags as its flags. */
1530 static void filter_destroy_export(struct obd_export *exp)
1532 struct filter_export_data *fed = &exp->exp_filter_data;
1535 spin_lock(&fed->fed_lock);
1536 while (!list_empty(&fed->fed_open_head)) {
1537 struct filter_file_data *ffd;
1539 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1541 list_del(&ffd->ffd_export_list);
1542 spin_unlock(&fed->fed_lock);
1544 CDEBUG(D_INFO, "force close file %*s (hdl %p:"LPX64") on "
1545 "disconnect\n", ffd->ffd_file->f_dentry->d_name.len,
1546 ffd->ffd_file->f_dentry->d_name.name,
1547 ffd, ffd->ffd_handle.h_cookie);
1549 filter_close_internal(exp, ffd, NULL, exp->exp_flags);
1550 spin_lock(&fed->fed_lock);
1552 spin_unlock(&fed->fed_lock);
1554 if (exp->exp_obd->obd_replayable)
1555 filter_client_free(exp, exp->exp_flags);
1559 /* also incredibly similar to mds_disconnect */
/* o_disconnect method: cancels the export's locks, records the disconnect
 * flags in exp->exp_flags under exp_lock (IRQ-saving), disconnects the
 * handle, syncs the backing filesystem and drops the export reference taken
 * by class_conn2export().
 *
 * NOTE(review): exp is used right after class_conn2export(); any NULL check,
 * the declaration of rc and the return statement are not visible in this
 * listing. */
1560 static int filter_disconnect(struct lustre_handle *conn, int flags)
1562 struct obd_export *exp = class_conn2export(conn);
1563 unsigned long irqflags;
1568 ldlm_cancel_locks_for_export(exp);
1570 spin_lock_irqsave(&exp->exp_lock, irqflags);
1571 exp->exp_flags = flags;
1572 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1574 rc = class_disconnect(conn, flags);
1576 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1577 class_export_put(exp);
1578 /* XXX cleanup preallocated inodes */
/* Resolve the object described by oa to a referenced dentry.
 *
 * obd:  filter device used for the id-based lookup
 * oa:   object attributes; o_valid, the embedded handle, o_mode and o_id
 *       are read
 * what: label included in the log messages
 *
 * When OBD_MD_FLHANDLE is set the dentry is taken (with dget()) from the
 * open file behind the handle.  filter_fid2dentry() provides the lookup by
 * o_mode and o_id.  A lookup error is logged; a dentry without an inode is
 * logged and ERR_PTR(-ENOENT) is returned.
 *
 * NOTE(review): the test on ffd, the condition under which the id-based
 * lookup runs, the value returned for a lookup error and the success return
 * are not visible in this listing. */
1582 struct dentry *__filter_oa2dentry(struct obd_device *obd,
1583 struct obdo *oa, const char *what)
1585 struct dentry *dchild = NULL;
1587 if (oa->o_valid & OBD_MD_FLHANDLE) {
1588 struct lustre_handle *ost_handle = obdo_handle(oa);
1589 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1592 struct filter_dentry_data *fdd;
1593 dchild = dget(ffd->ffd_file->f_dentry);
1594 fdd = dchild->d_fsdata;
1595 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
/* The ffd reference is dropped once the dentry reference is held. */
1596 filter_ffd_put(ffd);
1598 CDEBUG(D_INODE,"%s got child objid %*s: %p, count %d\n",
1599 what, dchild->d_name.len, dchild->d_name.name,
1600 dchild, atomic_read(&dchild->d_count));
1605 dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
1607 if (IS_ERR(dchild)) {
1608 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1612 if (!dchild->d_inode) {
1613 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1615 RETURN(ERR_PTR(-ENOENT));
/* o_getattr method: fills oa with the attributes of the object's inode.
 *
 * conn: connection handle, resolved to the device with class_conn2obd()
 * oa:   identifies the object on input; on output o_valid is reset to
 *       OBD_MD_FLID and obdo_from_inode() adds the FILTER_VALID_FLAGS fields
 * md:   not used in the visible lines
 *
 * Returns PTR_ERR(dentry) when the object lookup fails.
 *
 * NOTE(review): the conditions around the "invalid client cookie" message
 * and the PTR_ERR return, the release of the dentry and the final return
 * are not visible in this listing. */
1621 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1622 struct lov_stripe_md *md)
1624 struct dentry *dentry = NULL;
1625 struct obd_device *obd;
1629 obd = class_conn2obd(conn);
1631 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1635 dentry = filter_oa2dentry(obd, oa);
1637 RETURN(PTR_ERR(dentry));
1639 /* Limit the valid bits in the return data to what we actually use */
1640 oa->o_valid = OBD_MD_FLID;
1641 obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1647 /* this is called from filter_truncate() until we have filter_punch() */
/* o_setattr method: applies the attributes in oa to the object's inode
 * inside an FSFILT_OP_SETATTR transaction.
 *
 * conn: connection handle, resolved to an export
 * oa:   attributes to apply (converted with iattr_from_obdo() using
 *       oa->o_valid); refreshed from the inode afterwards
 * md:   not used in the visible lines
 * oti:  transaction info; must be non-NULL (LASSERT)
 *
 * The inode's i_sem is held across the transaction only when ATTR_SIZE is
 * being changed.  The result of fsfilt_setattr() is passed through
 * filter_finish_transno(), whose return value becomes rc; a commit error is
 * logged.
 *
 * NOTE(review): the declarations of iattr, handle, rc and rc2, the NULL/
 * IS_ERR tests, the labels, the dentry release and the return statement are
 * not visible in this listing. */
1648 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1649 struct lov_stripe_md *md, struct obd_trans_info *oti)
1651 struct obd_run_ctxt saved;
1652 struct obd_export *exp;
1653 struct filter_obd *filter;
1654 struct dentry *dentry;
1660 LASSERT(oti != NULL);
1661 exp = class_conn2export(conn);
1663 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1667 dentry = filter_oa2dentry(exp->exp_obd, oa);
1669 GOTO(out_exp, rc = PTR_ERR(dentry));
1671 filter = &exp->exp_obd->u.filter;
1673 iattr_from_obdo(&iattr, oa, oa->o_valid);
1675 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1678 /* XXX this could be a rwsem instead, if filter_preprw played along */
1679 if (iattr.ia_valid & ATTR_SIZE)
1680 down(&dentry->d_inode->i_sem);
1682 handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
1685 GOTO(out_unlock, rc = PTR_ERR(handle));
1687 rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
1688 rc = filter_finish_transno(exp, oti, rc);
1689 rc2 = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
1691 CERROR("error on commit, err = %d\n", rc2);
/* Mirrors the conditional down() above. */
1696 if (iattr.ia_valid & ATTR_SIZE)
1697 up(&dentry->d_inode->i_sem);
/* Report the resulting inode attributes back to the caller. */
1699 oa->o_valid = OBD_MD_FLID;
1700 obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1704 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1708 class_export_put(exp);
/* o_open method: opens the object named by oa->o_id and returns its
 * attributes and an open handle in oa.
 *
 * conn: connection handle, resolved to an export
 * oa:   o_id and o_mode select the object; on success o_valid is rebuilt
 *       from the inode and OBD_MD_FLHANDLE is added, with the handle cookie
 *       copied from ffd->ffd_handle.h_cookie
 * oti:  passed to filter_obj_open(); receives the parent lock handle in
 *       oti_ack_locks[0] with mode LCK_PR
 * ea, och: not used in the visible lines
 *
 * NOTE(review): the declarations of filp and rc, the NULL/IS_ERR tests, the
 * "out" label, the condition around the ack-lock copy and the return
 * statement are not visible in this listing. */
1712 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1713 struct lov_stripe_md *ea, struct obd_trans_info *oti,
1714 struct obd_client_handle *och)
1716 struct obd_export *exp;
1717 struct lustre_handle *handle;
1718 struct filter_file_data *ffd;
1720 struct lustre_handle parent_lockh;
1724 exp = class_conn2export(conn);
1726 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1730 filp = filter_obj_open(exp, oti, oa->o_id, oa->o_mode,
1731 LCK_PR, &parent_lockh);
1733 GOTO(out, rc = PTR_ERR(filp));
1735 oa->o_valid = OBD_MD_FLID;
1736 obdo_from_inode(oa, filp->f_dentry->d_inode, FILTER_VALID_FLAGS);
/* Hand the per-open cookie back to the caller inside oa. */
1738 ffd = filp->private_data;
1739 handle = obdo_handle(oa);
1740 handle->cookie = ffd->ffd_handle.h_cookie;
1741 oa->o_valid |= OBD_MD_FLHANDLE;
1744 class_export_put(exp);
1746 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1747 sizeof(parent_lockh));
1748 oti->oti_ack_locks[0].mode = LCK_PR;
/* o_close method: closes the open file identified by the handle in oa.
 *
 * conn: connection handle, resolved to an export
 * oa:   must carry OBD_MD_FLHANDLE, otherwise rc is -EINVAL; a handle that
 *       does not resolve yields -ESTALE; on the normal path oa is refreshed
 *       from the inode before the file is closed
 * ea:   not used in the visible lines
 * oti:  passed on to filter_close_internal()
 *
 * The ffd is removed from the export's open list under fed_lock, closed
 * with filter_close_internal(exp, ffd, oti, 0) and then released with
 * filter_ffd_put().
 *
 * NOTE(review): the declaration of rc, the NULL tests, the "out" label and
 * the return statement are not visible in this listing. */
1753 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1754 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1756 struct obd_export *exp;
1757 struct filter_file_data *ffd;
1758 struct filter_export_data *fed;
1762 exp = class_conn2export(conn);
1764 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1768 if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1769 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1770 GOTO(out, rc = -EINVAL);
1773 ffd = filter_handle2ffd(obdo_handle(oa));
1775 CERROR("bad handle ("LPX64") for close\n",
1776 obdo_handle(oa)->cookie);
1777 GOTO(out, rc = -ESTALE);
1780 fed = &exp->exp_filter_data;
1781 spin_lock(&fed->fed_lock);
1782 list_del(&ffd->ffd_export_list);
1783 spin_unlock(&fed->fed_lock);
/* Attributes are captured before the file is closed. */
1785 oa->o_valid = OBD_MD_FLID;
1786 obdo_from_inode(oa,ffd->ffd_file->f_dentry->d_inode,FILTER_VALID_FLAGS);
1788 rc = filter_close_internal(exp, ffd, oti, 0);
1789 filter_ffd_put(ffd);
1792 class_export_put(exp);
/* o_create method: allocates a new object id and creates the object in the
 * backing filesystem inside an FSFILT_OP_CREATE_LOG transaction.
 *
 * conn: connection handle, resolved to an export
 * oa:   o_id is overwritten with filter_next_id(); o_mode is the creation
 *       mode; ctime/mtime/size are applied with fsfilt_setattr() when the
 *       matching o_valid bits are set; o_valid is rewritten on output
 * ea:   not used in the visible lines
 * oti:  transaction info; oti_logcookies is handed to
 *       filter_log_op_create(); on success receives the PW parent lock in
 *       oti_ack_locks[0]
 *
 * If the chosen id already exists, an error is logged and the parent lock
 * is released (what happens next is not visible in this listing).
 *
 * NOTE(review): mds_fid.id is initialised to 0 and no visible line assigns
 * it, so the create-record logging guarded by mds_fid.id looks unreachable
 * as shown -- confirm in the full file.
 * NOTE(review): oti->oti_logcookies is read although the cleanup code tests
 * oti == NULL -- confirm callers always pass a non-NULL oti when
 * OBD_MD_FLCOOKIE is set.
 * The declarations of handle and attr, the assignment of obd, several
 * if/else lines, the labels, the case labels and the return statement are
 * not visible in this listing. */
1796 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1797 struct lov_stripe_md **ea, struct obd_trans_info *oti)
1799 struct obd_export *exp;
1800 struct obd_device *obd;
1801 struct filter_obd *filter;
1802 struct obd_run_ctxt saved;
1803 struct lustre_handle parent_lockh;
1804 struct dentry *dparent;
1805 struct ll_fid mds_fid = { .id = 0 };
1806 struct dentry *dchild = NULL;
1808 int err, rc, cleanup_phase;
1811 exp = class_conn2export(conn);
1813 CDEBUG(D_IOCTL,"invalid client cookie "LPX64"\n", conn->cookie);
1818 filter = &obd->u.filter;
1819 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1821 oa->o_id = filter_next_id(filter);
1824 dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1826 if (IS_ERR(dparent))
1827 GOTO(cleanup, rc = PTR_ERR(dparent));
1830 dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1832 GOTO(cleanup, rc = PTR_ERR(dchild));
1833 if (dchild->d_inode) {
1834 /* This would only happen if lastobjid was bad on disk */
1835 CERROR("Serious error: objid %*s already exists; is this "
1836 "filesystem corrupt? I will try to work around it.\n",
1837 dchild->d_name.len, dchild->d_name.name);
1839 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1844 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE_LOG, oti);
1846 GOTO(cleanup, rc = PTR_ERR(handle));
1848 rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
1850 CERROR("create failed rc = %d\n", rc);
1851 } else if (oa->o_valid & (OBD_MD_FLCTIME|OBD_MD_FLMTIME|OBD_MD_FLSIZE)){
1854 iattr_from_obdo(&attr, oa, oa->o_valid);
1855 rc = fsfilt_setattr(obd, dchild, handle, &attr, 1);
1857 CERROR("create setattr failed rc = %d\n", rc);
/* rc is replaced by the result of filter_finish_transno(). */
1859 rc = filter_finish_transno(exp, oti, rc);
1860 err = filter_update_server_data(obd, filter->fo_rcvd_filp,
1863 CERROR("unable to write lastobjid but file created\n");
1865 /* Set flags for fields we have set in the inode struct */
1866 if (!rc && mds_fid.id && (oa->o_valid & OBD_MD_FLCOOKIE)) {
1867 err = filter_log_op_create(obd->u.filter.fo_catalog, &mds_fid,
1868 dchild->d_inode->i_ino,
1869 dchild->d_inode->i_generation,
1870 oti->oti_logcookies);
1872 CERROR("error logging create record: rc %d\n", err);
1873 oa->o_valid = OBD_MD_FLID;
1875 oa->o_valid = OBD_MD_FLID | OBD_MD_FLCOOKIE;
1878 oa->o_valid = OBD_MD_FLID;
1880 err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1882 CERROR("error on commit, err = %d\n", err);
1890 /* Set flags for fields we have set in the inode struct */
1891 obdo_from_inode(oa, dchild->d_inode, FILTER_VALID_FLAGS);
/* Staged cleanup keyed on cleanup_phase. */
1895 switch(cleanup_phase) {
1898 case 1: /* locked parent dentry */
1899 if (rc || oti == NULL) {
1900 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1902 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1903 sizeof(parent_lockh));
1904 oti->oti_ack_locks[0].mode = LCK_PW;
1907 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1908 class_export_put(exp);
1911 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/* o_destroy method: destroys the object named by oa->o_id, or defers the
 * destroy while the object is still open.
 *
 * conn: connection handle, resolved to an export
 * oa:   o_id names the object; o_mode is used for the parent lock; with
 *       OBD_MD_FLCOOKIE set and an MDC import present, the log cookie from
 *       obdo_logcookie(oa) is copied into a freshly allocated fcc
 * ea:   not used in the visible lines
 * oti:  transaction info; on success receives the PW parent lock in
 *       oti_ack_locks[0]
 *
 * A missing inode is reported and rc is -ENOENT.  If the dentry data shows
 * a non-zero open count, FILTER_FLAG_DESTROY is set (once) and rc is 0
 * without unlinking; filter_close_internal() acts on that flag at last
 * close.  Otherwise filter_destroy_internal() performs the unlink.
 *
 * NOTE(review): the parent lock is taken with oa->o_mode while the child
 * lookup hard-codes S_IFREG -- confirm the two always agree.
 * NOTE(review): the cleanup code reads oti->oti_handle although a later
 * branch tests oti == NULL -- confirm oti cannot be NULL whenever fcc is
 * used.
 * The assignment of obd, several NULL/IS_ERR tests, the CDEBUG call heads,
 * the labels, the case labels and the return statement are not visible in
 * this listing. */
1918 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1919 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1921 struct obd_export *exp;
1922 struct obd_device *obd;
1923 struct filter_obd *filter;
1924 struct dentry *dchild = NULL, *dparent = NULL;
1925 struct filter_dentry_data *fdd;
1926 struct obd_run_ctxt saved;
1927 void *handle = NULL;
1928 struct lustre_handle parent_lockh;
1929 struct llog_cookie *fcc = NULL;
1930 int rc, rc2, cleanup_phase = 0;
1933 exp = class_conn2export(conn);
1935 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1940 filter = &obd->u.filter;
1942 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1943 dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1944 LCK_PW, &parent_lockh);
1945 if (IS_ERR(dparent))
1946 GOTO(cleanup, rc = PTR_ERR(dparent));
1949 dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1951 GOTO(cleanup, rc = -ENOENT);
1954 if (dchild->d_inode == NULL) {
1955 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
1956 GOTO(cleanup, rc = -ENOENT);
1958 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK_LOG, oti);
1960 GOTO(cleanup, rc = PTR_ERR(handle));
1963 fdd = dchild->d_fsdata;
1965 /* Our MDC connection is established by the MDS to us */
1966 if ((oa->o_valid & OBD_MD_FLCOOKIE) && filter->fo_mdc_imp != NULL) {
1967 OBD_ALLOC(fcc, sizeof(*fcc));
1969 memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
/* Object still open: mark it for destroy-on-last-close instead of
 * unlinking now. */
1972 if (fdd != NULL && atomic_read(&fdd->fdd_open_count)) {
1973 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1974 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1975 fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1977 #ifdef ENABLE_ORPHANS
1978 filter_log_op_orphan(filter->fo_catalog, oa->o_id,
1979 oa->o_generation,&fdd->fdd_cookie);
1982 "defer destroy of %dx open objid "LPU64"\n",
1983 atomic_read(&fdd->fdd_open_count), oa->o_id);
1986 "repeat destroy of %dx open objid "LPU64"\n",
1987 atomic_read(&fdd->fdd_open_count), oa->o_id);
1989 GOTO(cleanup, rc = 0);
1992 rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
/* Staged cleanup keyed on cleanup_phase. */
1995 switch(cleanup_phase) {
/* Registers filter_cancel_cookies_cb with fcc as its data on the
 * transaction handle stored in oti. */
1998 fsfilt_set_last_rcvd(obd, 0, oti->oti_handle,
1999 filter_cancel_cookies_cb, fcc);
2000 rc = filter_finish_transno(exp, oti, rc);
2001 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2003 CERROR("error on commit, err = %d\n", rc2);
2010 if (rc || oti == NULL) {
2011 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
2013 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
2014 sizeof(parent_lockh));
2015 oti->oti_ack_locks[0].mode = LCK_PW;
2018 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
2019 class_export_put(exp);
2022 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2029 /* NB start and end are used for punch, but not truncate */
/* o_punch method: implemented as a truncate through filter_setattr().
 *
 * conn, oa, oti: forwarded to filter_setattr(); md is passed as NULL
 * lsm:           not used in the visible lines
 * start:         logged as the new o_size
 * end:           anything other than OBD_OBJECT_EOF is reported with CERROR
 *                because a real punch is not supported
 *
 * NOTE(review): the line that stores start into oa before the setattr call,
 * the declaration of error and the return statement are not visible in this
 * listing -- confirm in the full file. */
2030 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
2031 struct lov_stripe_md *lsm,
2032 obd_off start, obd_off end,
2033 struct obd_trans_info *oti)
2038 if (end != OBD_OBJECT_EOF)
2039 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2042 CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
2043 "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2045 error = filter_setattr(conn, oa, NULL, oti);
/* o_syncfs method: returns the result of fsfilt_sync() on the export's
 * device and that device's backing superblock (fo_sb). */
2049 static int filter_syncfs(struct obd_export *exp)
2053 RETURN(fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb));
/* o_statfs method: fills osfs through fsfilt_statfs() for the device's
 * backing superblock and returns its result.  max_age is not used in the
 * visible lines. */
2056 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2057 unsigned long max_age)
2060 RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
/* o_get_info method: answers key/value queries about the backing
 * filesystem.
 *
 * conn:   connection handle, resolved to the device with class_conn2obd()
 * keylen: length of key; it must equal the length of a known key name
 * key:    "blocksize" or "blocksize_bits" (compared with memcmp())
 * vallen: set to sizeof(__u32) for a recognised key
 * val:    receives s_blocksize or s_blocksize_bits of fo_sb as a __u32
 *
 * An unrecognised key is logged with CDEBUG.
 *
 * NOTE(review): the NULL test on obd and all return statements are not
 * visible in this listing. */
2063 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2064 void *key, __u32 *vallen, void *val)
2066 struct obd_device *obd;
2069 obd = class_conn2obd(conn);
2071 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2076 if (keylen == strlen("blocksize") &&
2077 memcmp(key, "blocksize", keylen) == 0) {
2078 __u32 *blocksize = val;
2079 *vallen = sizeof(*blocksize);
2080 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2084 if (keylen == strlen("blocksize_bits") &&
2085 memcmp(key, "blocksize_bits", keylen) == 0) {
2086 __u32 *blocksize_bits = val;
2087 *vallen = sizeof(*blocksize_bits);
2088 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2092 CDEBUG(D_IOCTL, "invalid key\n");
/* o_set_info method: handles the "mds_conn" key by recording the calling
 * connection as the MDS connection and building an import for it.
 *
 * conn:   connection handle; copied into fo_mdc_conn and into the import's
 *         imp_remote_handle
 * keylen: length of key
 * key:    compared against "mds_conn"
 * vallen, val: not used in the visible lines
 *
 * The new import from class_new_import() is stored in fo_mdc_imp, given the
 * export's connection (with an added reference), the fo_mdc_client client,
 * imp_dlm_fake = 1 and imp_level = LUSTRE_CONN_FULL; class_import_put() is
 * then called on it.
 *
 * NOTE(review): the key test uses keylen < strlen("mds_conn"), so a key
 * longer than the literal makes memcmp() read past the end of "mds_conn";
 * filter_get_info() uses an exact-length comparison, which looks like what
 * is intended here too -- confirm and fix in the full file.
 * NOTE(review): no NULL check on the class_new_import() result is visible
 * before imp is dereferenced -- confirm.
 * The NULL test on obd and the return statements are not visible in this
 * listing. */
2096 static int filter_set_info(struct lustre_handle *conn, __u32 keylen,
2097 void *key, __u32 vallen, void *val)
2099 struct obd_device *obd;
2100 struct obd_export *exp;
2101 struct obd_import *imp;
2104 obd = class_conn2obd(conn);
2106 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2111 if (keylen < strlen("mds_conn") ||
2112 memcmp(key, "mds_conn", keylen) != 0)
2115 CERROR("Received MDS connection ("LPX64")\n", conn->cookie);
2116 memcpy(&obd->u.filter.fo_mdc_conn, conn, sizeof(*conn));
2118 imp = obd->u.filter.fo_mdc_imp = class_new_import();
/* The export reference is only needed long enough to take a reference on
 * its connection. */
2120 exp = class_conn2export(conn);
2121 imp->imp_connection = ptlrpc_connection_addref(exp->exp_connection);
2122 class_export_put(exp);
2124 imp->imp_client = &obd->u.filter.fo_mdc_client;
2125 imp->imp_remote_handle = *conn;
2127 imp->imp_dlm_fake = 1; /* XXX rename imp_dlm_fake to something else */
2128 imp->imp_level = LUSTRE_CONN_FULL;
2129 class_import_put(imp);
/* o_iocontrol method: device ioctl dispatcher.
 *
 * The only command visible here is OBD_IOC_ABORT_RECOVERY, which logs the
 * device name and calls target_abort_recovery(obd).  len, karg and uarg are
 * not used in the visible lines.
 *
 * NOTE(review): obd->obd_name is read with no visible NULL check on the
 * class_conn2obd() result, unlike filter_get_info() and filter_set_info(),
 * which log an invalid cookie -- confirm.  The switch statement head, other
 * cases and the return statements are not visible in this listing. */
2134 int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2135 int len, void *karg, void *uarg)
2137 struct obd_device *obd = class_conn2obd(conn);
2140 case OBD_IOC_ABORT_RECOVERY:
2141 CERROR("aborting recovery for device %s\n", obd->obd_name);
2142 target_abort_recovery(obd);
/* Method table for the standard filter device type; obdfilter_init()
 * registers it under OBD_FILTER_DEVICENAME.  Written with the old GNU
 * "field: value" initializer syntax.  o_punch is served by
 * filter_truncate(). */
2151 static struct obd_ops filter_obd_ops = {
2152 o_owner: THIS_MODULE,
2153 o_attach: filter_attach,
2154 o_detach: filter_detach,
2155 o_get_info: filter_get_info,
2156 o_set_info: filter_set_info,
2157 o_setup: filter_setup,
2158 o_cleanup: filter_cleanup,
2159 o_connect: filter_connect,
2160 o_disconnect: filter_disconnect,
2161 o_statfs: filter_statfs,
2162 o_syncfs: filter_syncfs,
2163 o_getattr: filter_getattr,
2164 o_create: filter_create,
2165 o_setattr: filter_setattr,
2166 o_destroy: filter_destroy,
2167 o_open: filter_open,
2168 o_close: filter_close,
2170 o_punch: filter_truncate,
2171 o_preprw: filter_preprw,
2172 o_commitrw: filter_commitrw,
2173 o_log_cancel: filter_log_cancel,
2174 o_destroy_export: filter_destroy_export,
2175 o_iocontrol: filter_iocontrol,
/* Method table for the SAN variant of the filter device type;
 * obdfilter_init() registers it under OBD_FILTER_SAN_DEVICENAME.  It
 * differs from filter_obd_ops in using filter_san_setup for o_setup and in
 * adding o_san_preprw.
 *
 * NOTE(review): no o_syncfs entry is visible here although filter_obd_ops
 * has one -- confirm whether that omission is intentional. */
2178 static struct obd_ops filter_sanobd_ops = {
2179 o_owner: THIS_MODULE,
2180 o_attach: filter_attach,
2181 o_detach: filter_detach,
2182 o_get_info: filter_get_info,
2183 o_set_info: filter_set_info,
2184 o_setup: filter_san_setup,
2185 o_cleanup: filter_cleanup,
2186 o_connect: filter_connect,
2187 o_disconnect: filter_disconnect,
2188 o_statfs: filter_statfs,
2189 o_getattr: filter_getattr,
2190 o_create: filter_create,
2191 o_setattr: filter_setattr,
2192 o_destroy: filter_destroy,
2193 o_open: filter_open,
2194 o_close: filter_close,
2196 o_punch: filter_truncate,
2197 o_preprw: filter_preprw,
2198 o_commitrw: filter_commitrw,
2199 o_log_cancel: filter_log_cancel,
2200 o_san_preprw: filter_san_preprw,
2201 o_destroy_export: filter_destroy_export,
2202 o_iocontrol: filter_iocontrol,
/* Module init: prints a banner and registers both filter device types
 * (standard, then SAN) with the same lprocfs module variables.  The
 * standard type is unregistered again in the path that follows the second
 * registration.
 *
 * NOTE(review): the declaration of rc, the tests on rc after each
 * registration and the return statement are not visible in this listing;
 * presumably the unregister call runs only when the SAN registration
 * fails -- confirm. */
2205 static int __init obdfilter_init(void)
2207 struct lprocfs_static_vars lvars;
2210 printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2212 lprocfs_init_vars(filter, &lvars);
2214 rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2215 OBD_FILTER_DEVICENAME);
2219 rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2220 OBD_FILTER_SAN_DEVICENAME);
2222 class_unregister_type(OBD_FILTER_DEVICENAME);
/* Module exit: unregisters both device types, in the reverse of the order
 * in which obdfilter_init() registers them. */
2226 static void __exit obdfilter_exit(void)
2228 class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2229 class_unregister_type(OBD_FILTER_DEVICENAME);
/* Module metadata and the hooks that bind obdfilter_init() and
 * obdfilter_exit() to module load and unload. */
2232 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2233 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2234 MODULE_LICENSE("GPL");
2236 module_init(obdfilter_init);
2237 module_exit(obdfilter_exit);