1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28 * (which need to get journal_lock, may block if journal full).
30 * Invariant: Call filter_start_transno() before any journal ops to avoid the
31 * same deadlock problem. We can (and want) to get rid of the
32 * transno sem in favour of the dir/inode i_sem to avoid single
33 * threaded operation on the OST.
36 #define DEBUG_SUBSYSTEM S_FILTER
38 #include <linux/config.h>
39 #include <linux/module.h>
41 #include <linux/dcache.h>
42 #include <linux/init.h>
43 #include <linux/version.h>
44 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
45 # include <linux/mount.h>
46 # include <linux/buffer_head.h>
49 #include <linux/obd_class.h>
50 #include <linux/lustre_dlm.h>
51 #include <linux/lustre_fsfilt.h>
52 #include <linux/lprocfs_status.h>
53 #include <linux/lustre_log.h>
54 #include <linux/lustre_commit_confd.h>
56 #include "filter_internal.h"
59 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
61 [S_IFREG >> S_SHIFT] "R",
62 [S_IFDIR >> S_SHIFT] "D",
63 [S_IFCHR >> S_SHIFT] "C",
64 [S_IFBLK >> S_SHIFT] "B",
65 [S_IFIFO >> S_SHIFT] "F",
66 [S_IFSOCK >> S_SHIFT] "S",
67 [S_IFLNK >> S_SHIFT] "L"
70 static inline const char *obd_mode_to_type(int mode)
72 return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
75 static void filter_ffd_addref(void *ffdp)
77 struct filter_file_data *ffd = ffdp;
79 atomic_inc(&ffd->ffd_refcount);
80 CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
81 atomic_read(&ffd->ffd_refcount));
84 static struct filter_file_data *filter_ffd_new(void)
86 struct filter_file_data *ffd;
88 OBD_ALLOC(ffd, sizeof *ffd);
90 CERROR("out of memory\n");
94 atomic_set(&ffd->ffd_refcount, 2);
96 INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
97 class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
102 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
104 struct filter_file_data *ffd = NULL;
106 LASSERT(handle != NULL);
107 ffd = class_handle2object(handle->cookie);
109 LASSERT(ffd->ffd_file->private_data == ffd);
113 static void filter_ffd_put(struct filter_file_data *ffd)
115 CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
116 atomic_read(&ffd->ffd_refcount) - 1);
117 LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
118 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
119 if (atomic_dec_and_test(&ffd->ffd_refcount)) {
120 LASSERT(list_empty(&ffd->ffd_handle.h_link));
121 OBD_FREE(ffd, sizeof *ffd);
125 static void filter_ffd_destroy(struct filter_file_data *ffd)
127 class_handle_unhash(&ffd->ffd_handle);
131 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
132 void *cb_data, int error)
134 obd_transno_commit_cb(obd, transno, error);
137 static int filter_client_log_cancel(struct lustre_handle *conn,
138 struct lov_stripe_md *lsm, int count,
139 struct llog_cookie *cookies, int flags)
141 struct obd_device *obd = class_conn2obd(conn);
142 struct llog_commit_data *llcd;
143 struct filter_obd *filter = &obd->u.filter;
147 if (count == 0 || cookies == NULL) {
148 down(&filter->fo_sem);
149 if (filter->fo_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
152 llcd = filter->fo_llcd;
156 down(&filter->fo_sem);
157 llcd = filter->fo_llcd;
161 CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n",
162 cookies->lgc_lgl.lgl_oid,
163 cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
164 GOTO(out, rc = -ENOMEM);
166 llcd->llcd_import = filter->fo_mdc_imp;
167 filter->fo_llcd = llcd;
170 memcpy(llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
172 llcd->llcd_cookiebytes += sizeof(*cookies);
176 if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
177 flags & OBD_LLOG_FL_SENDNOW)) {
178 filter->fo_llcd = NULL;
187 /* When this (destroy) operation is committed, return the cancel cookie */
188 static void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
189 void *cb_data, int error)
191 filter_client_log_cancel(&obd->u.filter.fo_mdc_conn, NULL, 1,
192 cb_data, OBD_LLOG_FL_SENDNOW);
193 OBD_FREE(cb_data, sizeof(struct llog_cookie));
196 /* Assumes caller has already pushed us into the kernel context. */
197 int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
200 struct filter_obd *filter = &exp->exp_obd->u.filter;
201 struct filter_export_data *fed = &exp->exp_filter_data;
202 struct filter_client_data *fcd = fed->fed_fcd;
207 /* Propagate error code. */
211 if (!exp->exp_obd->obd_replayable)
214 /* we don't allocate new transnos for replayed requests */
215 if (oti != NULL && oti->oti_transno == 0) {
216 spin_lock(&filter->fo_translock);
217 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1;
218 filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
219 spin_unlock(&filter->fo_translock);
220 oti->oti_transno = last_rcvd;
221 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
222 fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
224 /* could get xid from oti, if it's ever needed */
225 fcd->fcd_last_xid = 0;
227 off = fed->fed_lr_off;
228 fsfilt_set_last_rcvd(exp->exp_obd, last_rcvd, oti->oti_handle,
229 filter_commit_cb, NULL);
230 written = fsfilt_write_record(exp->exp_obd,
231 filter->fo_rcvd_filp, fcd,
233 CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
234 "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid,
235 fed->fed_lr_idx, written);
237 if (written == sizeof(*fcd))
239 CERROR("error writing to %s: rc = %d\n", LAST_RCVD,
249 void f_dput(struct dentry *dentry)
251 /* Can't go inside filter_ddelete because it can block */
252 CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
253 dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
254 LASSERT(atomic_read(&dentry->d_count) > 0);
259 /* Not racy w.r.t. others, because we are the only user of this dentry */
260 static void filter_drelease(struct dentry *dentry)
262 if (dentry->d_fsdata)
263 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
266 struct dentry_operations filter_dops = {
267 d_release: filter_drelease,
270 /* Add client data to the FILTER. We use a bitmap to locate a free space
271 * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
272 * Otherwise, we have just read the data from the last_rcvd file and
273 * we know its offset. */
274 static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
275 struct filter_export_data *fed, int cl_idx)
277 unsigned long *bitmap = filter->fo_last_rcvd_slots;
278 int new_client = (cl_idx == -1);
281 LASSERT(bitmap != NULL);
283 /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
284 if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
287 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
288 * there's no need for extra complication here
291 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
293 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
294 CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
297 if (test_and_set_bit(cl_idx, bitmap)) {
298 CERROR("FILTER client %d: found bit is set in bitmap\n",
300 cl_idx = find_next_zero_bit(bitmap,
301 FILTER_LR_MAX_CLIENTS,
306 if (test_and_set_bit(cl_idx, bitmap)) {
307 CERROR("FILTER client %d: bit already set in bitmap!\n",
313 fed->fed_lr_idx = cl_idx;
314 fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
315 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
317 CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
318 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
321 struct obd_run_ctxt saved;
322 loff_t off = fed->fed_lr_off;
326 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
327 fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
329 push_ctxt(&saved, &filter->fo_ctxt, NULL);
330 /* Transaction needed to fix bug 1403 */
331 handle = fsfilt_start(obd,
332 filter->fo_rcvd_filp->f_dentry->d_inode,
333 FSFILT_OP_SETATTR, NULL);
334 if (IS_ERR(handle)) {
335 written = PTR_ERR(handle);
336 CERROR("unable to start transaction: rc %d\n",
339 written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
341 sizeof(*fed->fed_fcd),
344 filter->fo_rcvd_filp->f_dentry->d_inode,
347 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
349 if (written != sizeof(*fed->fed_fcd)) {
350 CERROR("error writing %s client idx %u: rc %d\n",
351 LAST_RCVD, fed->fed_lr_idx, written);
360 static int filter_client_free(struct obd_export *exp, int flags)
362 struct filter_export_data *fed = &exp->exp_filter_data;
363 struct filter_obd *filter = &exp->exp_obd->u.filter;
364 struct obd_device *obd = exp->exp_obd;
365 struct filter_client_data zero_fcd;
366 struct obd_run_ctxt saved;
371 if (fed->fed_fcd == NULL)
374 if (flags & OBD_OPT_FAILOVER)
377 /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
378 if (strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID") == 0)
381 LASSERT(filter->fo_last_rcvd_slots != NULL);
383 off = fed->fed_lr_off;
385 CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
386 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
388 if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
389 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
394 memset(&zero_fcd, 0, sizeof zero_fcd);
395 push_ctxt(&saved, &filter->fo_ctxt, NULL);
396 written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
397 &zero_fcd, sizeof(zero_fcd), &off);
399 /* XXX: this write gets lost sometimes, unless this sync is here. */
401 file_fsync(filter->fo_rcvd_filp,
402 filter->fo_rcvd_filp->f_dentry, 1);
403 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
405 if (written != sizeof(zero_fcd)) {
406 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
407 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
411 "zeroed disconnecting client %s at idx %u (%llu)\n",
412 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
416 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
421 static int filter_free_server_data(struct filter_obd *filter)
423 OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
424 filter->fo_fsd = NULL;
425 OBD_FREE(filter->fo_last_rcvd_slots,
426 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
427 filter->fo_last_rcvd_slots = NULL;
431 /* assumes caller is already in kernel ctxt */
432 int filter_update_server_data(struct obd_device *obd,
433 struct file *filp, struct filter_server_data *fsd)
439 CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
440 CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
441 le64_to_cpu(fsd->fsd_last_objid));
442 CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
443 le64_to_cpu(fsd->fsd_last_transno));
444 CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
445 le64_to_cpu(fsd->fsd_mount_count));
447 rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off);
448 if (rc == sizeof(*fsd))
451 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n", rc);
457 /* assumes caller has already in kernel ctxt */
458 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
459 __u64 init_lastobjid)
461 struct filter_obd *filter = &obd->u.filter;
462 struct filter_server_data *fsd;
463 struct filter_client_data *fcd = NULL;
464 struct inode *inode = filp->f_dentry->d_inode;
465 unsigned long last_rcvd_size = inode->i_size;
466 __u64 mount_count = 0;
471 /* ensure padding in the struct is the correct size */
472 LASSERT (offsetof(struct filter_server_data, fsd_padding) +
473 sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
474 LASSERT (offsetof(struct filter_client_data, fcd_padding) +
475 sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
477 OBD_ALLOC(fsd, sizeof(*fsd));
480 filter->fo_fsd = fsd;
482 OBD_ALLOC(filter->fo_last_rcvd_slots,
483 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
484 if (filter->fo_last_rcvd_slots == NULL) {
485 OBD_FREE(fsd, sizeof(*fsd));
489 if (last_rcvd_size == 0) {
490 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
492 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
493 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
494 fsd->fsd_last_transno = 0;
495 mount_count = fsd->fsd_mount_count = 0;
496 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
497 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
498 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
499 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
500 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
502 int retval = fsfilt_read_record(obd, filp, fsd,
504 if (retval != sizeof(*fsd)) {
505 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
507 GOTO(err_fsd, rc = -EIO);
509 mount_count = le64_to_cpu(fsd->fsd_mount_count);
510 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
511 fsd->fsd_last_objid =
512 cpu_to_le64(le64_to_cpu(fsd->fsd_last_objid) +
516 if (fsd->fsd_feature_incompat) {
517 CERROR("unsupported feature %x\n",
518 le32_to_cpu(fsd->fsd_feature_incompat));
519 GOTO(err_fsd, rc = -EINVAL);
521 if (fsd->fsd_feature_rocompat) {
522 CERROR("read-only feature %x\n",
523 le32_to_cpu(fsd->fsd_feature_rocompat));
524 /* Do something like remount filesystem read-only */
525 GOTO(err_fsd, rc = -EINVAL);
528 CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
529 obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
530 CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
531 obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
532 CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
533 obd->obd_name, mount_count);
534 CDEBUG(D_INODE, "%s: server data size: %u\n",
535 obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
536 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
537 obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
538 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
539 obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
540 CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
541 obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
543 if (!obd->obd_replayable) {
544 CWARN("%s: recovery support OFF\n", obd->obd_name);
548 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
553 OBD_ALLOC(fcd, sizeof(*fcd));
555 GOTO(err_fsd, rc = -ENOMEM);
558 /* Don't assume off is incremented properly, in case
559 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
561 off = le32_to_cpu(fsd->fsd_client_start) +
562 cl_idx * le16_to_cpu(fsd->fsd_client_size);
563 rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
564 if (rc != sizeof(*fcd)) {
565 CERROR("error reading FILTER %s offset %d: rc = %d\n",
566 LAST_RCVD, cl_idx, rc);
567 if (rc > 0) /* XXX fatal error or just abort reading? */
572 if (fcd->fcd_uuid[0] == '\0') {
573 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
578 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
580 /* These exports are cleaned up by filter_disconnect(), so they
581 * need to be set up like real exports as filter_connect() does.
583 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
584 if (mount_age < FILTER_MOUNT_RECOV) {
585 struct obd_export *exp = class_new_export(obd);
586 struct filter_export_data *fed;
587 CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
588 " srv lr: "LPU64" mnt: "LPU64" last mount: "
589 LPU64"\n", fcd->fcd_uuid, cl_idx,
590 last_rcvd, le64_to_cpu(fsd->fsd_last_transno),
591 le64_to_cpu(fcd->fcd_mount_count), mount_count);
593 /* XXX this rc is ignored */
597 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
598 sizeof exp->exp_client_uuid.uuid);
599 fed = &exp->exp_filter_data;
601 filter_client_add(obd, filter, fed, cl_idx);
602 /* create helper if export init gets more complex */
603 INIT_LIST_HEAD(&fed->fed_open_head);
604 spin_lock_init(&fed->fed_lock);
607 obd->obd_recoverable_clients++;
608 class_export_put(exp);
611 "discarded client %d UUID '%s' count "LPU64"\n",
612 cl_idx, fcd->fcd_uuid,
613 le64_to_cpu(fcd->fcd_mount_count));
616 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
619 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
620 filter->fo_fsd->fsd_last_transno=cpu_to_le64(last_rcvd);
622 obd->obd_last_committed =
623 le64_to_cpu(filter->fo_fsd->fsd_last_transno);
625 if (obd->obd_recoverable_clients) {
626 CERROR("RECOVERY: %d recoverable clients, last_rcvd "
627 LPU64"\n", obd->obd_recoverable_clients,
628 le64_to_cpu(filter->fo_fsd->fsd_last_transno));
629 obd->obd_next_recovery_transno =
630 obd->obd_last_committed + 1;
631 obd->obd_recovering = 1;
637 OBD_FREE(fcd, sizeof(*fcd));
640 fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
642 /* save it, so mount count and last_transno is current */
643 rc = filter_update_server_data(obd, filp, filter->fo_fsd);
648 filter_free_server_data(filter);
652 /* setup the object store with correct subdirectories */
653 static int filter_prep(struct obd_device *obd)
655 struct obd_run_ctxt saved;
656 struct filter_obd *filter = &obd->u.filter;
657 struct dentry *dentry, *O_dentry;
664 push_ctxt(&saved, &filter->fo_ctxt, NULL);
665 dentry = simple_mkdir(current->fs->pwd, "O", 0700);
666 CDEBUG(D_INODE, "got/created O: %p\n", dentry);
667 if (IS_ERR(dentry)) {
668 rc = PTR_ERR(dentry);
669 CERROR("cannot open/create O: rc = %d\n", rc);
672 filter->fo_dentry_O = dentry;
675 * Create directories and/or get dentries for each object type.
676 * This saves us from having to do multiple lookups for each one.
678 O_dentry = filter->fo_dentry_O;
679 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
680 char *name = obd_type_by_mode[mode];
683 filter->fo_dentry_O_mode[mode] = NULL;
686 dentry = simple_mkdir(O_dentry, name, 0700);
687 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
688 if (IS_ERR(dentry)) {
689 rc = PTR_ERR(dentry);
690 CERROR("cannot create O/%s: rc = %d\n", name, rc);
691 GOTO(err_O_mode, rc);
693 filter->fo_dentry_O_mode[mode] = dentry;
696 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
697 if (!file || IS_ERR(file)) {
699 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
701 GOTO(err_O_mode, rc);
704 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
705 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
706 file->f_dentry->d_inode->i_mode);
707 GOTO(err_filp, rc = -ENOENT);
710 rc = fsfilt_journal_data(obd, file);
712 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
715 /* steal operations */
716 inode = file->f_dentry->d_inode;
717 filter->fo_fop = file->f_op;
718 filter->fo_iop = inode->i_op;
719 filter->fo_aops = inode->i_mapping->a_ops;
720 #ifdef I_SKIP_PDFLUSH
722 * we need this to protect from deadlock
723 * pdflush vs. lustre_fwrite()
725 inode->i_flags |= I_SKIP_PDFLUSH;
728 rc = filter_init_server_data(obd, file, FILTER_INIT_OBJID);
730 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
731 GOTO(err_client, rc);
733 filter->fo_rcvd_filp = file;
735 if (filter->fo_subdir_count) {
736 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
737 OBD_ALLOC(filter->fo_dentry_O_sub,
738 filter->fo_subdir_count * sizeof(dentry));
739 if (!filter->fo_dentry_O_sub)
740 GOTO(err_client, rc = -ENOMEM);
742 for (i = 0; i < filter->fo_subdir_count; i++) {
744 snprintf(dir, sizeof(dir), "d%u", i);
746 dentry = simple_mkdir(O_dentry, dir, 0700);
747 CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
748 if (IS_ERR(dentry)) {
749 rc = PTR_ERR(dentry);
750 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
753 filter->fo_dentry_O_sub[i] = dentry;
758 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
764 struct dentry *dentry = filter->fo_dentry_O_sub[i];
767 filter->fo_dentry_O_sub[i] = NULL;
770 OBD_FREE(filter->fo_dentry_O_sub,
771 filter->fo_subdir_count * sizeof(dentry));
773 class_disconnect_exports(obd, 0);
775 if (filp_close(file, 0))
776 CERROR("can't close %s after error\n", LAST_RCVD);
777 filter->fo_rcvd_filp = NULL;
780 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
783 filter->fo_dentry_O_mode[mode] = NULL;
786 f_dput(filter->fo_dentry_O);
787 filter->fo_dentry_O = NULL;
791 /* cleanup the filter: write last used object id to status file */
792 static void filter_post(struct obd_device *obd)
794 struct obd_run_ctxt saved;
795 struct filter_obd *filter = &obd->u.filter;
799 /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
800 * best to start a transaction with h_sync, because we removed this
803 push_ctxt(&saved, &filter->fo_ctxt, NULL);
804 rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
807 CERROR("error writing lastobjid: rc = %ld\n", rc);
810 if (filter->fo_rcvd_filp) {
811 rc = file_fsync(filter->fo_rcvd_filp,
812 filter->fo_rcvd_filp->f_dentry, 1);
813 filp_close(filter->fo_rcvd_filp, 0);
814 filter->fo_rcvd_filp = NULL;
816 CERROR("error closing %s: rc = %ld\n", LAST_RCVD, rc);
819 if (filter->fo_subdir_count) {
821 for (i = 0; i < filter->fo_subdir_count; i++) {
822 struct dentry *dentry = filter->fo_dentry_O_sub[i];
824 filter->fo_dentry_O_sub[i] = NULL;
826 OBD_FREE(filter->fo_dentry_O_sub,
827 filter->fo_subdir_count *
828 sizeof(*filter->fo_dentry_O_sub));
830 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
831 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
834 filter->fo_dentry_O_mode[mode] = NULL;
837 f_dput(filter->fo_dentry_O);
838 filter_free_server_data(filter);
839 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
842 __u64 filter_next_id(struct filter_obd *filter)
845 LASSERT(filter->fo_fsd != NULL);
847 spin_lock(&filter->fo_objidlock);
848 id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
849 filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
850 spin_unlock(&filter->fo_objidlock);
855 /* direct cut-n-paste of mds_blocking_ast() */
856 static int filter_blocking_ast(struct ldlm_lock *lock,
857 struct ldlm_lock_desc *desc,
858 void *data, int flag)
863 if (flag == LDLM_CB_CANCELING) {
864 /* Don't need to do anything here. */
868 /* XXX layering violation! -phil */
869 l_lock(&lock->l_resource->lr_namespace->ns_lock);
870 /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
871 * such that mds_blocking_ast is called just before l_i_p takes the
872 * ns_lock, then by the time we get the lock, we might not be the
873 * correct blocking function anymore. So check, and return early, if
875 if (lock->l_blocking_ast != filter_blocking_ast) {
876 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
880 lock->l_flags |= LDLM_FL_CBPENDING;
881 do_ast = (!lock->l_readers && !lock->l_writers);
882 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
885 struct lustre_handle lockh;
888 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
889 ldlm_lock2handle(lock, &lockh);
890 rc = ldlm_cli_cancel(&lockh);
892 CERROR("ldlm_cli_cancel: %d\n", rc);
894 LDLM_DEBUG(lock, "Lock still has references, will be "
900 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
901 ldlm_mode_t lock_mode,struct lustre_handle *lockh)
903 struct ldlm_res_id res_id = { .name = {0} };
907 res_id.name[0] = de->d_inode->i_ino;
908 res_id.name[1] = de->d_inode->i_generation;
909 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
910 res_id, LDLM_PLAIN, NULL, 0, lock_mode,
911 &flags, ldlm_completion_ast,
912 filter_blocking_ast, NULL, lockh);
914 RETURN(rc == ELDLM_OK ? 0 : -ENOLCK); /* XXX translate ldlm code */
917 /* We never dget the object parent, so DON'T dput it either */
918 static void filter_parent_unlock(struct dentry *dparent,
919 struct lustre_handle *lockh,
920 ldlm_mode_t lock_mode)
922 ldlm_lock_decref(lockh, lock_mode);
925 /* We never dget the object parent, so DON'T dput it either */
926 struct dentry *filter_parent(struct obd_device *obd, obd_mode mode,
929 struct filter_obd *filter = &obd->u.filter;
931 LASSERT(S_ISREG(mode)); /* only regular files for now */
932 if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
933 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
935 return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
938 /* We never dget the object parent, so DON'T dput it either */
939 struct dentry *filter_parent_lock(struct obd_device *obd, obd_mode mode,
940 obd_id objid, ldlm_mode_t lock_mode,
941 struct lustre_handle *lockh)
943 unsigned long now = jiffies;
944 struct dentry *de = filter_parent(obd, mode, objid);
950 rc = filter_lock_dentry(obd, de, lock_mode, lockh);
951 if (time_after(jiffies, now + 15 * HZ))
952 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
953 return rc ? ERR_PTR(rc) : de;
956 /* How to get files, dentries, inodes from object id's.
958 * If dir_dentry is passed, the caller has already locked the parent
959 * appropriately for this operation (normally a write lock). If
960 * dir_dentry is NULL, we do a read lock while we do the lookup to
961 * avoid races with create/destroy and such changing the directory
962 * internal to the filesystem code. */
963 struct dentry *filter_fid2dentry(struct obd_device *obd,
964 struct dentry *dir_dentry,
965 obd_mode mode, obd_id id)
967 struct lustre_handle lockh;
968 struct dentry *dparent = dir_dentry;
969 struct dentry *dchild;
975 CERROR("fatal: invalid object id 0\n");
977 RETURN(ERR_PTR(-ESTALE));
980 len = sprintf(name, LPU64, id);
981 if (dir_dentry == NULL) {
982 dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
986 CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
987 dparent->d_name.len, dparent->d_name.name, name);
988 dchild = ll_lookup_one_len(name, dparent, len);
989 if (dir_dentry == NULL)
990 filter_parent_unlock(dparent, &lockh, LCK_PR);
991 if (IS_ERR(dchild)) {
992 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
996 CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
997 name, dchild, atomic_read(&dchild->d_count));
999 LASSERT(atomic_read(&dchild->d_count) > 0);
1004 static struct file *filter_obj_open(struct obd_export *export,
1005 struct obd_trans_info *oti,
1006 __u64 id, __u32 type, int parent_mode,
1007 struct lustre_handle *parent_lockh)
1009 struct obd_device *obd = export->exp_obd;
1010 struct filter_obd *filter = &obd->u.filter;
1011 struct dentry *dchild = NULL, *dparent = NULL;
1012 struct filter_export_data *fed = &export->exp_filter_data;
1013 struct filter_dentry_data *fdd = NULL;
1014 struct filter_file_data *ffd = NULL;
1015 struct obd_run_ctxt saved;
1018 int len, cleanup_phase = 0;
1021 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1024 CERROR("fatal: invalid obdo "LPU64"\n", id);
1025 GOTO(cleanup, file = ERR_PTR(-ESTALE));
1028 if (!(type & S_IFMT)) {
1029 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
1030 __FUNCTION__, id, type);
1031 GOTO(cleanup, file = ERR_PTR(-EINVAL));
1034 ffd = filter_ffd_new();
1036 CERROR("obdfilter: out of memory\n");
1037 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1042 /* We preallocate this to avoid blocking while holding fo_fddlock */
1043 OBD_ALLOC(fdd, sizeof *fdd);
1045 CERROR("obdfilter: out of memory\n");
1046 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1051 dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1052 if (IS_ERR(dparent))
1053 GOTO(cleanup, file = (void *)dparent);
1057 len = snprintf(name, sizeof(name), LPU64, id);
1058 dchild = ll_lookup_one_len(name, dparent, len);
1060 GOTO(cleanup, file = (void *)dchild);
1064 if (dchild->d_inode == NULL) {
1065 CERROR("opening non-existent object %s - O_CREAT?\n", name);
1066 /* dput(dchild); call filter_create_internal here */
1067 file = ERR_PTR(-ENOENT);
1068 GOTO(cleanup, file);
1071 /* dentry_open does a dput(dchild) and mntput(mnt) on error */
1072 mntget(filter->fo_vfsmnt);
1073 file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1075 dchild = NULL; /* prevent a double dput in step 4 */
1076 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1077 GOTO(cleanup, file);
1080 spin_lock(&filter->fo_fddlock);
1081 if (dchild->d_fsdata) {
1082 spin_unlock(&filter->fo_fddlock);
1083 OBD_FREE(fdd, sizeof *fdd);
1084 fdd = dchild->d_fsdata;
1085 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1086 /* should only happen during client recovery */
1087 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1088 CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1089 atomic_inc(&fdd->fdd_open_count);
1091 atomic_set(&fdd->fdd_open_count, 1);
1092 fdd->fdd_magic = FILTER_DENTRY_MAGIC;
1094 fdd->fdd_objid = id;
1095 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1096 dchild->d_fsdata = fdd;
1097 spin_unlock(&filter->fo_fddlock);
1100 ffd->ffd_file = file;
1101 LASSERT(file->private_data == NULL);
1102 file->private_data = ffd;
1105 dchild->d_op = &filter_dops;
1107 LASSERT(dchild->d_op == &filter_dops);
1109 spin_lock(&fed->fed_lock);
1110 list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1111 spin_unlock(&fed->fed_lock);
1113 CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1115 switch (cleanup_phase) {
1121 filter_parent_unlock(dparent, parent_lockh,parent_mode);
1124 OBD_FREE(fdd, sizeof *fdd);
1127 filter_ffd_destroy(ffd);
1128 filter_ffd_put(ffd);
1130 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1135 /* Caller must hold LCK_PW on parent and push us into kernel context.
1136 * Caller is also required to ensure that dchild->d_inode exists. */
1137 static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
1138 struct dentry *dparent,
1139 struct dentry *dchild)
1141 struct inode *inode = dchild->d_inode;
1145 if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1146 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1147 dchild->d_name.len, dchild->d_name.name,
1148 inode->i_nlink, atomic_read(&inode->i_count));
1153 /* Tell the clients that the object is gone now and that they should
1154 * throw away any cached pages. We don't need to wait until they're
1155 * done, so just decref the lock right away and let ldlm_completion_ast
1156 * clean up when it's all over. */
1157 ldlm_cli_enqueue(..., LCK_PW, AST_INTENT_DESTROY, &lockh);
1158 ldlm_lock_decref(&lockh, LCK_PW);
1162 struct lustre_handle lockh;
1164 struct ldlm_res_id res_id = { .name = { objid } };
1166 /* This part is a wee bit iffy: we really only want to bust the
1167 * locks on our stripe, so that we don't end up bouncing
1168 * [0->EOF] locks around on each of the OSTs as the rest of the
1169 * destroys get processed. Because we're only talking to
1170 * the local LDLM, though, we should only end up locking the
1171 * whole of our stripe. When bug 1425 (take all locks on OST
1172 * for stripe 0) is fixed, this code should be revisited. */
1173 struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
1175 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1176 res_id, LDLM_EXTENT, &extent,
1177 sizeof(extent), LCK_PW, &flags,
1178 ldlm_completion_ast, filter_blocking_ast,
1180 /* We only care about the side-effects, just drop the lock. */
1181 ldlm_lock_decref(&lockh, LCK_PW);
1184 rc = vfs_unlink(dparent->d_inode, dchild);
1187 CERROR("error unlinking objid %*s: rc %d\n",
1188 dchild->d_name.len, dchild->d_name.name, rc);
1193 /* If closing because we are failing this device, then
1194 don't do the unlink on close.
1196 static int filter_close_internal(struct obd_export *exp,
1197 struct filter_file_data *ffd,
1198 struct obd_trans_info *oti, int flags)
1200 struct obd_device *obd = exp->exp_obd;
1201 struct filter_obd *filter = &obd->u.filter;
1202 struct file *filp = ffd->ffd_file;
1203 struct dentry *dchild = dget(filp->f_dentry);
1204 struct filter_dentry_data *fdd = dchild->d_fsdata;
1205 struct lustre_handle parent_lockh;
1206 int rc, rc2, cleanup_phase = 0;
1207 struct dentry *dparent = NULL;
1208 struct obd_run_ctxt saved;
1209 int nested_trans = (current->journal_info != NULL);
1212 LASSERT(filp->private_data == ffd);
1213 LASSERT(fdd != NULL);
1214 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1216 rc = filp_close(filp, 0);
1218 if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1219 (fdd->fdd_flags & FILTER_FLAG_DESTROY) &&
1220 !(flags & OBD_OPT_FAILOVER)) {
1223 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1226 LASSERT(fdd->fdd_objid > 0);
1227 dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1228 LCK_PW, &parent_lockh);
1229 if (IS_ERR(dparent))
1230 GOTO(cleanup, rc = PTR_ERR(dparent));
1233 handle = fsfilt_start(obd, dparent->d_inode,
1234 FSFILT_OP_UNLINK_LOG, oti);
1236 GOTO(cleanup, rc = PTR_ERR(handle));
1239 if (oti->oti_handle == NULL)
1240 oti->oti_handle = handle;
1242 LASSERT(oti->oti_handle == handle);
1245 #ifdef ENABLE_ORPHANS
1246 /* Remove orphan unlink record from log */
1247 llog_cancel_records(filter->fo_catalog, 1, &fdd->fdd_cookie);
1249 /* XXX unlink from PENDING directory now too */
1250 rc2 = filter_destroy_internal(obd, fdd->fdd_objid, dparent,
1254 rc = filter_finish_transno(exp, oti, rc);
1255 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1257 CERROR("error on commit, err = %d\n", rc2);
1261 if (nested_trans == 0) {
1262 LASSERT(current->journal_info == NULL);
1264 oti->oti_handle = NULL;
1269 switch(cleanup_phase) {
1271 if (rc || oti == NULL) {
1272 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1274 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1275 sizeof(parent_lockh));
1276 oti->oti_ack_locks[0].mode = LCK_PW;
1279 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1282 filter_ffd_destroy(ffd);
1285 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1292 /* mount the file system (secretly) */
1293 int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1296 struct obd_ioctl_data* data = buf;
1297 struct filter_obd *filter = &obd->u.filter;
1298 struct vfsmount *mnt;
1302 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1305 obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1306 if (IS_ERR(obd->obd_fsops))
1307 RETURN(PTR_ERR(obd->obd_fsops));
1309 mnt = do_kern_mount(data->ioc_inlbuf2, MS_NOATIME | MS_NODIRATIME,
1310 data->ioc_inlbuf1, option);
1315 if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1316 if (*data->ioc_inlbuf3 == 'f') {
1317 obd->obd_replayable = 1;
1318 obd_sync_filter = 1;
1319 CERROR("%s: configured for recovery and sync write\n",
1322 if (*data->ioc_inlbuf3 != 'n') {
1323 CERROR("unrecognised flag '%c'\n",
1324 *data->ioc_inlbuf3);
1329 if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
1330 if (*data->ioc_inlbuf4 == '/') {
1331 CERROR("filter namespace mount: %s\n",
1333 filter->fo_nspath = strdup(data->ioc_inlbuf4);
1335 CERROR("namespace mount must be absolute path: '%s'\n",
1340 filter->fo_vfsmnt = mnt;
1341 filter->fo_sb = mnt->mnt_sb;
1342 filter->fo_fstype = mnt->mnt_sb->s_type->name;
1343 CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1345 OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1346 filter->fo_ctxt.pwdmnt = mnt;
1347 filter->fo_ctxt.pwd = mnt->mnt_root;
1348 filter->fo_ctxt.fs = get_ds();
1350 rc = filter_prep(obd);
1352 GOTO(err_mntput, rc);
1354 spin_lock_init(&filter->fo_translock);
1355 spin_lock_init(&filter->fo_fddlock);
1356 spin_lock_init(&filter->fo_objidlock);
1357 INIT_LIST_HEAD(&filter->fo_export_list);
1359 ptlrpc_init_client(MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1360 "filter_mdc", &filter->fo_mdc_client);
1361 sema_init(&filter->fo_sem, 1);
1363 obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1364 LDLM_NAMESPACE_SERVER);
1365 if (obd->obd_namespace == NULL)
1366 GOTO(err_post, rc = -ENOMEM);
1368 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1369 "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1371 /* Create a non-replaying connection for recovery logging, so that
1372 * we don't create a client entry for this local connection, and do
1373 * not log or assign transaction numbers for logging operations. */
1374 #ifdef ENABLE_ORPHANS
1375 filter->fo_catalog = filter_get_catalog(obd);
1376 if (IS_ERR(filter->fo_catalog))
1377 GOTO(err_post, rc = PTR_ERR(filter->fo_catalog));
1390 fsfilt_put_ops(obd->obd_fsops);
1394 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1396 struct obd_ioctl_data* data = buf;
1397 char *option = NULL;
1399 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1400 /* bug 1577: implement async-delete for 2.5 */
1401 if (!strcmp(data->ioc_inlbuf2, "ext3"))
1402 option = "asyncdel";
1405 return filter_common_setup(obd, len, buf, option);
1408 static int filter_cleanup(struct obd_device *obd, int flags)
1410 struct filter_obd *filter = &obd->u.filter;
1413 if (flags & OBD_OPT_FAILOVER)
1414 CERROR("%s: shutting down for failover; client state will"
1415 " be preserved.\n", obd->obd_name);
1417 if (!list_empty(&obd->obd_exports)) {
1418 CERROR("%s: still has clients!\n", obd->obd_name);
1419 class_disconnect_exports(obd, flags);
1420 if (!list_empty(&obd->obd_exports)) {
1421 CERROR("still has exports after forced cleanup?\n");
1426 #ifdef ENABLE_ORPHANS
1427 filter_put_catalog(filter->fo_catalog);
1430 ldlm_namespace_free(obd->obd_namespace);
1432 if (filter->fo_sb == NULL)
1437 shrink_dcache_parent(filter->fo_sb->s_root);
1440 if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1)
1441 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1442 atomic_read(&filter->fo_vfsmnt->mnt_count));
1445 mntput(filter->fo_vfsmnt);
1446 //destroy_buffers(filter->fo_sb->s_dev);
1447 filter->fo_sb = NULL;
1448 fsfilt_put_ops(obd->obd_fsops);
1454 static int filter_attach(struct obd_device *obd, obd_count len, void *data)
1456 struct lprocfs_static_vars lvars;
1459 lprocfs_init_vars(filter, &lvars);
1460 rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1464 rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1468 /* Init obdfilter private stats here */
1469 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1470 LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1471 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1472 LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
1476 static int filter_detach(struct obd_device *dev)
1478 lprocfs_free_obd_stats(dev);
1479 return lprocfs_obd_detach(dev);
1482 /* nearly identical to mds_connect */
1483 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1484 struct obd_uuid *cluuid)
1486 struct obd_export *exp;
1487 struct filter_export_data *fed;
1488 struct filter_client_data *fcd;
1489 struct filter_obd *filter = &obd->u.filter;
1493 if (conn == NULL || obd == NULL || cluuid == NULL)
1496 rc = class_connect(conn, obd, cluuid);
1499 exp = class_conn2export(conn);
1500 LASSERT(exp != NULL);
1502 fed = &exp->exp_filter_data;
1503 class_export_put(exp);
1505 INIT_LIST_HEAD(&fed->fed_open_head);
1506 spin_lock_init(&fed->fed_lock);
1508 if (!obd->obd_replayable)
1511 OBD_ALLOC(fcd, sizeof(*fcd));
1513 CERROR("filter: out of memory for client data\n");
1514 GOTO(out_export, rc = -ENOMEM);
1517 memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1519 fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1521 rc = filter_client_add(obd, filter, fed, -1);
1528 OBD_FREE(fcd, sizeof(*fcd));
1530 class_disconnect(conn, 0);
1535 static void filter_destroy_export(struct obd_export *exp)
1537 struct filter_export_data *fed = &exp->exp_filter_data;
1540 spin_lock(&fed->fed_lock);
1541 while (!list_empty(&fed->fed_open_head)) {
1542 struct filter_file_data *ffd;
1544 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1546 list_del(&ffd->ffd_export_list);
1547 spin_unlock(&fed->fed_lock);
1549 CDEBUG(D_INFO, "force close file %*s (hdl %p:"LPX64") on "
1550 "disconnect\n", ffd->ffd_file->f_dentry->d_name.len,
1551 ffd->ffd_file->f_dentry->d_name.name,
1552 ffd, ffd->ffd_handle.h_cookie);
1554 filter_close_internal(exp, ffd, NULL, exp->exp_flags);
1555 spin_lock(&fed->fed_lock);
1557 spin_unlock(&fed->fed_lock);
1559 if (exp->exp_obd->obd_replayable)
1560 filter_client_free(exp, exp->exp_flags);
1564 /* also incredibly similar to mds_disconnect */
1565 static int filter_disconnect(struct lustre_handle *conn, int flags)
1567 struct obd_export *exp = class_conn2export(conn);
1568 unsigned long irqflags;
1573 ldlm_cancel_locks_for_export(exp);
1575 spin_lock_irqsave(&exp->exp_lock, irqflags);
1576 exp->exp_flags = flags;
1577 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1579 rc = class_disconnect(conn, flags);
1581 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1582 class_export_put(exp);
1583 /* XXX cleanup preallocated inodes */
1587 struct dentry *__filter_oa2dentry(struct obd_device *obd,
1588 struct obdo *oa, const char *what)
1590 struct dentry *dchild = NULL;
1592 if (oa->o_valid & OBD_MD_FLHANDLE) {
1593 struct lustre_handle *ost_handle = obdo_handle(oa);
1594 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1597 struct filter_dentry_data *fdd;
1598 dchild = dget(ffd->ffd_file->f_dentry);
1599 fdd = dchild->d_fsdata;
1600 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1601 filter_ffd_put(ffd);
1603 CDEBUG(D_INODE,"%s got child objid %*s: %p, count %d\n",
1604 what, dchild->d_name.len, dchild->d_name.name,
1605 dchild, atomic_read(&dchild->d_count));
1610 dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
1612 if (IS_ERR(dchild)) {
1613 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1617 if (!dchild->d_inode) {
1618 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1620 RETURN(ERR_PTR(-ENOENT));
1626 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1627 struct lov_stripe_md *md)
1629 struct dentry *dentry = NULL;
1630 struct obd_device *obd;
1634 obd = class_conn2obd(conn);
1636 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1640 dentry = filter_oa2dentry(obd, oa);
1642 RETURN(PTR_ERR(dentry));
1644 /* Limit the valid bits in the return data to what we actually use */
1645 oa->o_valid = OBD_MD_FLID;
1646 obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1652 /* this is called from filter_truncate() until we have filter_punch() */
1653 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1654 struct lov_stripe_md *md, struct obd_trans_info *oti)
1656 struct obd_run_ctxt saved;
1657 struct obd_export *exp;
1658 struct filter_obd *filter;
1659 struct dentry *dentry;
1665 LASSERT(oti != NULL);
1666 exp = class_conn2export(conn);
1668 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1672 dentry = filter_oa2dentry(exp->exp_obd, oa);
1674 GOTO(out_exp, rc = PTR_ERR(dentry));
1676 filter = &exp->exp_obd->u.filter;
1678 iattr_from_obdo(&iattr, oa, oa->o_valid);
1680 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1683 /* XXX this could be a rwsem instead, if filter_preprw played along */
1684 if (iattr.ia_valid & ATTR_SIZE)
1685 down(&dentry->d_inode->i_sem);
1687 handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
1690 GOTO(out_unlock, rc = PTR_ERR(handle));
1692 rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
1693 rc = filter_finish_transno(exp, oti, rc);
1694 rc2 = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
1696 CERROR("error on commit, err = %d\n", rc2);
1701 if (iattr.ia_valid & ATTR_SIZE)
1702 up(&dentry->d_inode->i_sem);
1704 oa->o_valid = OBD_MD_FLID;
1705 obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
1709 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1713 class_export_put(exp);
1717 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1718 struct lov_stripe_md *ea, struct obd_trans_info *oti,
1719 struct obd_client_handle *och)
1721 struct obd_export *exp;
1722 struct lustre_handle *handle;
1723 struct filter_file_data *ffd;
1725 struct lustre_handle parent_lockh;
1729 exp = class_conn2export(conn);
1731 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1735 filp = filter_obj_open(exp, oti, oa->o_id, oa->o_mode,
1736 LCK_PR, &parent_lockh);
1738 GOTO(out, rc = PTR_ERR(filp));
1740 oa->o_valid = OBD_MD_FLID;
1741 obdo_from_inode(oa, filp->f_dentry->d_inode, FILTER_VALID_FLAGS);
1743 ffd = filp->private_data;
1744 handle = obdo_handle(oa);
1745 handle->cookie = ffd->ffd_handle.h_cookie;
1746 oa->o_valid |= OBD_MD_FLHANDLE;
1749 class_export_put(exp);
1751 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1752 sizeof(parent_lockh));
1753 oti->oti_ack_locks[0].mode = LCK_PR;
1758 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1759 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1761 struct obd_export *exp;
1762 struct filter_file_data *ffd;
1763 struct filter_export_data *fed;
1767 exp = class_conn2export(conn);
1769 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1773 if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1774 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1775 GOTO(out, rc = -EINVAL);
1778 ffd = filter_handle2ffd(obdo_handle(oa));
1780 CERROR("bad handle ("LPX64") for close\n",
1781 obdo_handle(oa)->cookie);
1782 GOTO(out, rc = -ESTALE);
1785 fed = &exp->exp_filter_data;
1786 spin_lock(&fed->fed_lock);
1787 list_del(&ffd->ffd_export_list);
1788 spin_unlock(&fed->fed_lock);
1790 oa->o_valid = OBD_MD_FLID;
1791 obdo_from_inode(oa,ffd->ffd_file->f_dentry->d_inode,FILTER_VALID_FLAGS);
1793 rc = filter_close_internal(exp, ffd, oti, 0);
1794 filter_ffd_put(ffd);
1797 class_export_put(exp);
1801 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1802 struct lov_stripe_md **ea, struct obd_trans_info *oti)
1804 struct obd_export *exp;
1805 struct obd_device *obd;
1806 struct filter_obd *filter;
1807 struct obd_run_ctxt saved;
1808 struct lustre_handle parent_lockh;
1809 struct dentry *dparent;
1810 struct ll_fid mds_fid = { .id = 0 };
1811 struct dentry *dchild = NULL;
1813 int err, rc, cleanup_phase;
1816 exp = class_conn2export(conn);
1818 CDEBUG(D_IOCTL,"invalid client cookie "LPX64"\n", conn->cookie);
1823 filter = &obd->u.filter;
1824 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1826 oa->o_id = filter_next_id(filter);
1829 dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1831 if (IS_ERR(dparent))
1832 GOTO(cleanup, rc = PTR_ERR(dparent));
1835 dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1837 GOTO(cleanup, rc = PTR_ERR(dchild));
1838 if (dchild->d_inode) {
1839 /* This would only happen if lastobjid was bad on disk */
1840 CERROR("Serious error: objid %*s already exists; is this "
1841 "filesystem corrupt? I will try to work around it.\n",
1842 dchild->d_name.len, dchild->d_name.name);
1844 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1849 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE_LOG, oti);
1851 GOTO(cleanup, rc = PTR_ERR(handle));
1853 rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
1855 CERROR("create failed rc = %d\n", rc);
1856 } else if (oa->o_valid & (OBD_MD_FLCTIME|OBD_MD_FLMTIME|OBD_MD_FLSIZE)){
1859 iattr_from_obdo(&attr, oa, oa->o_valid);
1860 rc = fsfilt_setattr(obd, dchild, handle, &attr, 1);
1862 CERROR("create setattr failed rc = %d\n", rc);
1864 rc = filter_finish_transno(exp, oti, rc);
1865 err = filter_update_server_data(obd, filter->fo_rcvd_filp,
1868 CERROR("unable to write lastobjid but file created\n");
1870 /* Set flags for fields we have set in the inode struct */
1871 if (!rc && mds_fid.id && (oa->o_valid & OBD_MD_FLCOOKIE)) {
1872 err = filter_log_op_create(obd->u.filter.fo_catalog, &mds_fid,
1873 dchild->d_inode->i_ino,
1874 dchild->d_inode->i_generation,
1875 oti->oti_logcookies);
1877 CERROR("error logging create record: rc %d\n", err);
1878 oa->o_valid = OBD_MD_FLID;
1880 oa->o_valid = OBD_MD_FLID | OBD_MD_FLCOOKIE;
1883 oa->o_valid = OBD_MD_FLID;
1885 err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1887 CERROR("error on commit, err = %d\n", err);
1895 /* Set flags for fields we have set in the inode struct */
1896 obdo_from_inode(oa, dchild->d_inode, FILTER_VALID_FLAGS);
1900 switch(cleanup_phase) {
1903 case 1: /* locked parent dentry */
1904 if (rc || oti == NULL) {
1905 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1907 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1908 sizeof(parent_lockh));
1909 oti->oti_ack_locks[0].mode = LCK_PW;
1912 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1913 class_export_put(exp);
1916 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1923 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1924 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1926 struct obd_export *exp;
1927 struct obd_device *obd;
1928 struct filter_obd *filter;
1929 struct dentry *dchild = NULL, *dparent = NULL;
1930 struct filter_dentry_data *fdd;
1931 struct obd_run_ctxt saved;
1932 void *handle = NULL;
1933 struct lustre_handle parent_lockh;
1934 struct llog_cookie *fcc = NULL;
1935 int rc, rc2, cleanup_phase = 0;
1938 exp = class_conn2export(conn);
1940 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
1945 filter = &obd->u.filter;
1947 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1948 dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1949 LCK_PW, &parent_lockh);
1950 if (IS_ERR(dparent))
1951 GOTO(cleanup, rc = PTR_ERR(dparent));
1954 dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1956 GOTO(cleanup, rc = -ENOENT);
1959 if (dchild->d_inode == NULL) {
1960 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
1961 GOTO(cleanup, rc = -ENOENT);
1963 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK_LOG, oti);
1965 GOTO(cleanup, rc = PTR_ERR(handle));
1968 fdd = dchild->d_fsdata;
1970 /* Our MDC connection is established by the MDS to us */
1971 if ((oa->o_valid & OBD_MD_FLCOOKIE) && filter->fo_mdc_imp != NULL) {
1972 OBD_ALLOC(fcc, sizeof(*fcc));
1974 memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
1977 if (fdd != NULL && atomic_read(&fdd->fdd_open_count)) {
1978 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1979 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1980 fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1982 #ifdef ENABLE_ORPHANS
1983 filter_log_op_orphan(filter->fo_catalog, oa->o_id,
1984 oa->o_generation,&fdd->fdd_cookie);
1987 "defer destroy of %dx open objid "LPU64"\n",
1988 atomic_read(&fdd->fdd_open_count), oa->o_id);
1991 "repeat destroy of %dx open objid "LPU64"\n",
1992 atomic_read(&fdd->fdd_open_count), oa->o_id);
1994 GOTO(cleanup, rc = 0);
1997 rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
2000 switch(cleanup_phase) {
2003 fsfilt_set_last_rcvd(obd, 0, oti->oti_handle,
2004 filter_cancel_cookies_cb, fcc);
2005 rc = filter_finish_transno(exp, oti, rc);
2006 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
2008 CERROR("error on commit, err = %d\n", rc2);
2015 if (rc || oti == NULL) {
2016 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
2018 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
2019 sizeof(parent_lockh));
2020 oti->oti_ack_locks[0].mode = LCK_PW;
2023 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
2024 class_export_put(exp);
2027 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2034 /* NB start and end are used for punch, but not truncate */
2035 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
2036 struct lov_stripe_md *lsm,
2037 obd_off start, obd_off end,
2038 struct obd_trans_info *oti)
2043 if (end != OBD_OBJECT_EOF)
2044 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2047 CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
2048 "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2050 error = filter_setattr(conn, oa, NULL, oti);
2054 static int filter_syncfs(struct obd_export *exp)
2058 RETURN(fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb));
2061 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2062 unsigned long max_age)
2065 RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2068 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2069 void *key, __u32 *vallen, void *val)
2071 struct obd_device *obd;
2074 obd = class_conn2obd(conn);
2076 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2081 if (keylen == strlen("blocksize") &&
2082 memcmp(key, "blocksize", keylen) == 0) {
2083 __u32 *blocksize = val;
2084 *vallen = sizeof(*blocksize);
2085 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2089 if (keylen == strlen("blocksize_bits") &&
2090 memcmp(key, "blocksize_bits", keylen) == 0) {
2091 __u32 *blocksize_bits = val;
2092 *vallen = sizeof(*blocksize_bits);
2093 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2097 CDEBUG(D_IOCTL, "invalid key\n");
2101 static int filter_set_info(struct lustre_handle *conn, __u32 keylen,
2102 void *key, __u32 vallen, void *val)
2104 struct obd_device *obd;
2105 struct obd_export *exp;
2106 struct obd_import *imp;
2109 obd = class_conn2obd(conn);
2111 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2116 if (keylen < strlen("mds_conn") ||
2117 memcmp(key, "mds_conn", keylen) != 0)
2120 CERROR("Received MDS connection ("LPX64")\n", conn->cookie);
2121 memcpy(&obd->u.filter.fo_mdc_conn, conn, sizeof(*conn));
2123 imp = obd->u.filter.fo_mdc_imp = class_new_import();
2125 exp = class_conn2export(conn);
2126 imp->imp_connection = ptlrpc_connection_addref(exp->exp_connection);
2127 class_export_put(exp);
2129 imp->imp_client = &obd->u.filter.fo_mdc_client;
2130 imp->imp_remote_handle = *conn;
2132 imp->imp_dlm_fake = 1; /* XXX rename imp_dlm_fake to something else */
2133 imp->imp_level = LUSTRE_CONN_FULL;
2134 class_import_put(imp);
2139 int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2140 int len, void *karg, void *uarg)
2142 struct obd_device *obd = class_conn2obd(conn);
2145 case OBD_IOC_ABORT_RECOVERY:
2146 CERROR("aborting recovery for device %s\n", obd->obd_name);
2147 target_abort_recovery(obd);
2156 static struct obd_ops filter_obd_ops = {
2157 o_owner: THIS_MODULE,
2158 o_attach: filter_attach,
2159 o_detach: filter_detach,
2160 o_get_info: filter_get_info,
2161 o_set_info: filter_set_info,
2162 o_setup: filter_setup,
2163 o_cleanup: filter_cleanup,
2164 o_connect: filter_connect,
2165 o_disconnect: filter_disconnect,
2166 o_statfs: filter_statfs,
2167 o_syncfs: filter_syncfs,
2168 o_getattr: filter_getattr,
2169 o_create: filter_create,
2170 o_setattr: filter_setattr,
2171 o_destroy: filter_destroy,
2172 o_open: filter_open,
2173 o_close: filter_close,
2175 o_punch: filter_truncate,
2176 o_preprw: filter_preprw,
2177 o_commitrw: filter_commitrw,
2178 o_log_cancel: filter_log_cancel,
2179 o_destroy_export: filter_destroy_export,
2180 o_iocontrol: filter_iocontrol,
2183 static struct obd_ops filter_sanobd_ops = {
2184 o_owner: THIS_MODULE,
2185 o_attach: filter_attach,
2186 o_detach: filter_detach,
2187 o_get_info: filter_get_info,
2188 o_set_info: filter_set_info,
2189 o_setup: filter_san_setup,
2190 o_cleanup: filter_cleanup,
2191 o_connect: filter_connect,
2192 o_disconnect: filter_disconnect,
2193 o_statfs: filter_statfs,
2194 o_getattr: filter_getattr,
2195 o_create: filter_create,
2196 o_setattr: filter_setattr,
2197 o_destroy: filter_destroy,
2198 o_open: filter_open,
2199 o_close: filter_close,
2201 o_punch: filter_truncate,
2202 o_preprw: filter_preprw,
2203 o_commitrw: filter_commitrw,
2204 o_log_cancel: filter_log_cancel,
2205 o_san_preprw: filter_san_preprw,
2206 o_destroy_export: filter_destroy_export,
2207 o_iocontrol: filter_iocontrol,
2210 static int __init obdfilter_init(void)
2212 struct lprocfs_static_vars lvars;
2215 printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2217 lprocfs_init_vars(filter, &lvars);
2219 rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2220 OBD_FILTER_DEVICENAME);
2224 rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2225 OBD_FILTER_SAN_DEVICENAME);
2227 class_unregister_type(OBD_FILTER_DEVICENAME);
2231 static void __exit obdfilter_exit(void)
2233 class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2234 class_unregister_type(OBD_FILTER_DEVICENAME);
2237 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2238 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2239 MODULE_LICENSE("GPL");
2241 module_init(obdfilter_init);
2242 module_exit(obdfilter_exit);