1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28 * (which need to get journal_lock, may block if journal full).
30 * Invariant: Call filter_start_transno() before any journal ops to avoid the
31 * same deadlock problem. We can (and want) to get rid of the
32 * transno sem in favour of the dir/inode i_sem to avoid single
33 * threaded operation on the OST.
37 #define DEBUG_SUBSYSTEM S_FILTER
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
/* Slab caches used by this file: filter_open_cache backs the per-open
 * struct filter_file_data allocated in filter_obj_open(), and
 * filter_dentry_cache backs the per-dentry data hung off dentry->d_fsdata. */
53 static kmem_cache_t *filter_open_cache;
54 static kmem_cache_t *filter_dentry_cache;
/* Set of 64-bit request/byte counters; one instance exists per CPU in
 * xprocfs_iostats[].
 * NOTE(review): only some members are visible in this listing; the
 * DECLARE_XPROCFS_SUM_STAT() users reference further st_* fields. */
56 /* should be generic per-obd stats... */
57 struct xprocfs_io_stat {
62 __u64 st_getattr_reqs;
63 __u64 st_setattr_reqs;
65 __u64 st_destroy_reqs;
/* Counter storage, one slot per possible CPU (NR_CPUS), and the /proc
 * directory entry that xprocfs_init() creates and xprocfs_fini() removes. */
72 static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
73 static struct proc_dir_entry *xprocfs_dir;
/* Add `count` to counter `field` in the xprocfs_iostats[] slot selected by
 * smp_processor_id(), i.e. the slot of the CPU running the caller. */
75 #define XPROCFS_BUMP_MYCPU_IOSTAT(field, count) \
77 xprocfs_iostats[smp_processor_id()].field += (count); \
/* Generate a function named xprocfs_sum_<field>() taking no arguments.
 * The visible body adds up <field> over xprocfs_iostats[0..smp_num_cpus-1]
 * into `stat`. */
80 #define DECLARE_XPROCFS_SUM_STAT(field) \
82 xprocfs_sum_##field (void) \
87 for (i = 0; i < smp_num_cpus; i++) \
88 stat += xprocfs_iostats[i].field; \
/* Instantiate one summing function per counter.  Each generated
 * xprocfs_sum_st_*() is registered as a /proc read callback by
 * xprocfs_init(). */
92 DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
93 DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
94 DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
95 DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
96 DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
97 DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
98 DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
99 DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
100 DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
101 DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
102 DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
103 DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
/* /proc read handler shared by all stat files.
 * `data` is the entry's private pointer; it is cast back to a
 * `long long (*)(void)` function, which is called and whose result is
 * formatted as a decimal line into `page` (bounded by `count`).
 * NOTE(review): handling of `off`, `start` and `eof` is not visible in this
 * listing. */
106 xprocfs_rd_stat (char *page, char **start, off_t off, int count,
107 int *eof, void *data)
109 long long (*fn)(void) = (long long(*)(void))data;
116 len = snprintf (page, count, "%Ld\n", fn());
/* Create a read-only regular file `name` under xprocfs_dir and wire it to
 * xprocfs_rd_stat().  `fn` is the summing function to expose.
 * NOTE(review): the failure test guarding the CERROR and the assignment that
 * stores `fn` in the entry are not visible in this listing. */
123 xprocfs_add_stat(char *name, long long (*fn)(void))
125 struct proc_dir_entry *entry;
127 entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
129 CERROR ("Can't add procfs stat %s\n", name);
/* read-only file: a read callback, no write callback */
134 entry->read_proc = xprocfs_rd_stat;
135 entry->write_proc = NULL;
/* Create the stats directory in /proc and register one file per counter.
 * On proc_mkdir() failure an error is logged.
 * NOTE(review): `dirname` is formatted from `name`, but the visible
 * proc_mkdir() call passes the literal "sys/obdfilter" instead; confirm
 * whether `dirname` was meant to be used. */
139 xprocfs_init (char *name)
143 snprintf (dirname, sizeof (dirname), "sys/%s", name);
145 xprocfs_dir = proc_mkdir ("sys/obdfilter", NULL);
146 if (xprocfs_dir == NULL) {
147 CERROR ("Can't make dir\n");
/* one /proc file per counter; keep in sync with xprocfs_fini() */
151 xprocfs_add_stat ("read_bytes", xprocfs_sum_st_read_bytes);
152 xprocfs_add_stat ("read_reqs", xprocfs_sum_st_read_reqs);
153 xprocfs_add_stat ("write_bytes", xprocfs_sum_st_write_bytes);
154 xprocfs_add_stat ("write_reqs", xprocfs_sum_st_write_reqs);
155 xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
156 xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
157 xprocfs_add_stat ("create_reqs", xprocfs_sum_st_create_reqs);
158 xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
159 xprocfs_add_stat ("statfs_reqs", xprocfs_sum_st_statfs_reqs);
160 xprocfs_add_stat ("open_reqs", xprocfs_sum_st_open_reqs);
161 xprocfs_add_stat ("close_reqs", xprocfs_sum_st_close_reqs);
162 xprocfs_add_stat ("punch_reqs", xprocfs_sum_st_punch_reqs);
/* Undo xprocfs_init(): remove every stat file, then the directory itself
 * (looked up by its own name under its parent).  The xprocfs_dir == NULL
 * test covers the case where the directory was never created. */
165 void xprocfs_fini (void)
167 if (xprocfs_dir == NULL)
/* same twelve names that xprocfs_init() registers */
170 remove_proc_entry ("read_bytes", xprocfs_dir);
171 remove_proc_entry ("read_reqs", xprocfs_dir);
172 remove_proc_entry ("write_bytes", xprocfs_dir);
173 remove_proc_entry ("write_reqs", xprocfs_dir);
174 remove_proc_entry ("getattr_reqs", xprocfs_dir);
175 remove_proc_entry ("setattr_reqs", xprocfs_dir);
176 remove_proc_entry ("create_reqs", xprocfs_dir);
177 remove_proc_entry ("destroy_reqs", xprocfs_dir);
178 remove_proc_entry ("statfs_reqs", xprocfs_dir);
179 remove_proc_entry ("open_reqs", xprocfs_dir);
180 remove_proc_entry ("close_reqs", xprocfs_dir);
181 remove_proc_entry ("punch_reqs", xprocfs_dir);
183 remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
/* Table mapping a file type, encoded as (mode & S_IFMT) >> S_SHIFT, to a
 * one-letter name.  The letter is used as the per-type subdirectory name
 * under "O/" (see filter_prep() and filter_id()).  Indices without an
 * initializer stay NULL.
 * The "[index] value" form without '=' is the old GNU designated-initializer
 * spelling. */
188 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
190 [S_IFREG >> S_SHIFT] "R",
191 [S_IFDIR >> S_SHIFT] "D",
192 [S_IFCHR >> S_SHIFT] "C",
193 [S_IFBLK >> S_SHIFT] "B",
194 [S_IFIFO >> S_SHIFT] "F",
195 [S_IFSOCK >> S_SHIFT] "S",
196 [S_IFLNK >> S_SHIFT] "L"
/* Return the one-letter type name for `mode` by indexing obd_type_by_mode[]
 * with the file-type bits.  The result is whatever the table holds for that
 * index, which is NULL for types that have no initializer. */
199 static inline const char *obd_mode_to_type(int mode)
201 return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
/* Callback handed to fsfilt_set_last_rcvd() by filter_finish_transno().
 * When no error is reported and `last_rcvd` is newer than what the device
 * has recorded, obd_last_committed is advanced to it; it is never moved
 * backwards here. */
204 static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd,
207 CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
209 if (!error && last_rcvd > obd->obd_last_committed)
210 obd->obd_last_committed = last_rcvd;
/* Take the device-wide transno semaphore for the export's obd.  The matching
 * up() is in filter_finish_transno(), so every call here must be paired with
 * a call to that function. */
213 void filter_start_transno(struct obd_export *export)
215 struct obd_device * obd = export->exp_obd;
218 down(&obd->u.filter.fo_transno_sem);
/* Allocate the next transaction number, record it in the client's
 * last_rcvd slot on disk, and release the semaphore taken by
 * filter_start_transno().
 *
 *   export - export of the client the operation belongs to
 *   handle - journal handle passed on to fsfilt_set_last_rcvd()
 *   oti    - receives the new transno in oti_transno; oti_xid is read from it
 *   rc     - result of the operation so far
 *
 * The client's record lives at
 * FILTER_LR_CLIENT_START + fed_lr_off * FILTER_LR_CLIENT_SIZE in the
 * last_rcvd file.  A short write is turned into -EIO.
 * NOTE(review): several control-flow lines are not visible in this listing
 * (the early exits for an incoming error and for replayed requests, and the
 * condition choosing between oti_xid and 0). */
221 /* Assumes caller has already pushed us into the kernel context. */
222 int filter_finish_transno(struct obd_export *export, void *handle,
223 struct obd_trans_info *oti, int rc)
226 struct obd_device *obd = export->exp_obd;
227 struct filter_obd *filter = &obd->u.filter;
228 struct filter_export_data *fed = &export->exp_filter_data;
229 struct filter_client_data *fcd = fed->fed_fcd;
233 /* Propagate error code. */
237 /* we don't allocate new transnos for replayed requests */
239 /* perhaps if transno already set? or should level be in oti? */
240 if (req->rq_level == LUSTRE_CONN_RECOVD)
244 off = FILTER_LR_CLIENT_START + fed->fed_lr_off * FILTER_LR_CLIENT_SIZE;
/* the new transno is the pre-incremented server-wide last_rcvd */
246 last_rcvd = ++filter->fo_fsd->fsd_last_rcvd;
248 oti->oti_transno = last_rcvd;
/* on-disk client record fields are stored little-endian */
249 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
250 fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
252 /* get this from oti */
255 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
258 fcd->fcd_last_xid = 0;
260 fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_last_rcvd_cb);
261 written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
263 CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
264 LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_off, written);
266 if (written == sizeof(*fcd))
/* NOTE(review): this message prints rc, while `written` is the value that
 * was just compared - confirm which one is intended. */
268 CERROR("error writing to last_rcvd file: rc = %d\n", rc);
270 GOTO(out, rc = -EIO);
/* pairs with the down() in filter_start_transno() */
277 up(&filter->fo_transno_sem);
/* Format the object's path relative to the filter root, "O/<type>/<id>",
 * into `buf` and return sprintf()'s result.
 * The write is unbounded: `buf` must be large enough for the prefix plus a
 * 64-bit decimal id and the terminating NUL. */
281 /* write the pathname into the string */
282 static int filter_id(char *buf, obd_id id, obd_mode mode)
284 return sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
/* Drop a reference on `dentry`, with a debug trace and a sanity check that
 * the reference count is still positive.  The trace prints the count as it
 * will be after the put (current count minus one).
 * NOTE(review): the put call itself is not visible in this listing. */
287 static inline void f_dput(struct dentry *dentry)
289 /* Can't go inside filter_ddelete because it can block */
290 CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
291 dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
292 LASSERT(atomic_read(&dentry->d_count) > 0);
/* d_release hook (installed through filter_dops): hand the per-dentry data
 * attached to d_fsdata, if any, back to filter_dentry_cache. */
297 /* Not racy w.r.t. others, because we are the only user of this dentry */
298 static void filter_drelease(struct dentry *dentry)
300 if (dentry->d_fsdata)
301 kmem_cache_free(filter_dentry_cache, dentry->d_fsdata);
/* Dentry operations installed on object dentries by filter_obj_open(); only
 * the release hook is set in the visible initializer. */
304 struct dentry_operations filter_dops = {
305 .d_release = filter_drelease,
/* Name of the recovery file and the bitmap tracking which client slots in it
 * are in use.  Bit N set means slot N is occupied.
 * NOTE(review): FILTER_LR_MAX_CLIENTS is a bit count, but
 * FILTER_LR_MAX_CLIENT_WORDS divides it by sizeof(unsigned long) (bytes per
 * long) rather than bits per long, so the array is 8x larger than the bitmap
 * needs.  This over-allocates but is not unsafe.
 * NOTE(review): the bitmap is a single file-scope object even though
 * filter_client_add() takes a per-device `filter` - confirm sharing across
 * devices is intended. */
308 #define LAST_RCVD "last_rcvd"
311 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
312 #define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
313 #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
315 static unsigned long filter_last_rcvd_slots[FILTER_LR_MAX_CLIENT_WORDS];
/* filter_client_add(): claim a slot for a client in the last_rcvd file.
 *
 *   filter - device whose last_rcvd file and run context are used
 *   fed    - per-export data; fed_lr_off is set to the chosen slot and
 *            fed->fed_fcd is the record written to disk
 *   cl_off - slot index already known from disk, or -1 for a new client
 *
 * Visible behaviour: a new client takes the first zero bit of
 * filter_last_rcvd_slots.  Running out of slots is logged, and if
 * test_and_set_bit() finds the bit already taken the search continues with
 * find_next_zero_bit().  A caller-supplied cl_off whose bit is already set is
 * logged as an error.  The record is written at
 * FILTER_LR_CLIENT_START + cl_off * FILTER_LR_CLIENT_SIZE with the filter's
 * context pushed around the write.
 * NOTE(review): the conditions selecting the new-client paths, the return
 * statements and the handling of a short write are not visible in this
 * listing. */
317 /* Add client data to the FILTER. We use a bitmap to locate a free space
318 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
319 * Otherwise, we have just read the data from the last_rcvd file and
320 * we know its offset.
322 int filter_client_add(struct filter_obd *filter,
323 struct filter_export_data *fed, int cl_off)
325 int new_client = (cl_off == -1);
327 /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
328 * there's no need for extra complication here
331 cl_off = find_first_zero_bit(filter_last_rcvd_slots,
332 FILTER_LR_MAX_CLIENTS);
334 if (cl_off >= FILTER_LR_MAX_CLIENTS) {
335 CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
338 if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
339 CERROR("FILTER client %d: found bit is set in bitmap\n",
341 cl_off = find_next_zero_bit(filter_last_rcvd_slots,
342 FILTER_LR_MAX_CLIENTS,
347 if (test_and_set_bit(cl_off, filter_last_rcvd_slots)) {
348 CERROR("FILTER client %d: bit already set in bitmap!\n",
354 CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
355 cl_off, fed->fed_fcd->fcd_uuid);
357 fed->fed_lr_off = cl_off;
360 struct obd_run_ctxt saved;
361 loff_t off = FILTER_LR_CLIENT_START +
362 (cl_off * FILTER_LR_CLIENT_SIZE);
365 push_ctxt(&saved, &filter->fo_ctxt, NULL);
366 written = lustre_fwrite(filter->fo_rcvd_filp,
367 (char *)fed->fed_fcd,
368 sizeof(*fed->fed_fcd), &off);
369 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
371 if (written != sizeof(*fed->fed_fcd)) {
376 CDEBUG(D_INFO, "wrote client fcd at off %u (len %u)\n",
377 FILTER_LR_CLIENT_START + (cl_off*FILTER_LR_CLIENT_SIZE),
378 (unsigned int)sizeof(*fed->fed_fcd));
/* Release a client's slot in the last_rcvd file when its export goes away.
 *
 * Steps visible here: compute the slot's file offset from fed_lr_off, clear
 * the slot's bit in filter_last_rcvd_slots (logging if it was already clear),
 * overwrite the on-disk record with zeroes, force the data out with a sync
 * whose form depends on the kernel version, and free fed->fed_fcd.  The file
 * I/O is done with the filter's context pushed.
 * NOTE(review): any early-exit guards and the return value are not visible
 * in this listing. */
383 int filter_client_free(struct obd_export *exp)
385 struct filter_export_data *fed = &exp->exp_filter_data;
386 struct filter_obd *filter = &exp->exp_obd->u.filter;
387 struct filter_client_data zero_fcd;
388 struct obd_run_ctxt saved;
395 off = FILTER_LR_CLIENT_START + (fed->fed_lr_off*FILTER_LR_CLIENT_SIZE);
397 CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
398 fed->fed_lr_off, off, fed->fed_fcd->fcd_uuid);
400 if (!test_and_clear_bit(fed->fed_lr_off, filter_last_rcvd_slots)) {
401 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
/* an all-zero record marks the slot as unused; the loader skips records
 * whose uuid starts with NUL */
406 memset(&zero_fcd, 0, sizeof zero_fcd);
407 push_ctxt(&saved, &filter->fo_ctxt, NULL);
408 written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
409 sizeof(zero_fcd), &off);
411 /* XXX: this write gets lost sometimes, unless this sync is here. */
412 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
413 fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
415 file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
417 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
419 if (written != sizeof(zero_fcd)) {
420 CERROR("error zeroing out client %s off %d in %s: %d\n",
421 fed->fed_fcd->fcd_uuid, fed->fed_lr_off, LAST_RCVD,
425 "zeroed disconnecting client %s at off %d ("LPX64")\n",
426 fed->fed_fcd->fcd_uuid, fed->fed_lr_off, off);
429 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
/* Convert the three 64-bit fields of a server-data record from on-disk
 * little-endian to CPU byte order.  The conversion is done in place, so it
 * must be applied exactly once to a freshly read record. */
434 static void filter_unpack_fsd(struct filter_server_data *fsd)
436 fsd->fsd_last_objid = le64_to_cpu(fsd->fsd_last_objid);
437 fsd->fsd_last_rcvd = le64_to_cpu(fsd->fsd_last_rcvd);
438 fsd->fsd_mount_count = le64_to_cpu(fsd->fsd_mount_count);
/* Build the on-disk image of the server data in `disk_fsd` from the
 * in-memory `fsd`.  The destination is zeroed first so anything not set
 * below is written as zero, the uuid is copied verbatim, and the three
 * 64-bit fields are converted to little-endian.  `fsd` is not modified. */
441 static void filter_pack_fsd(struct filter_server_data *disk_fsd,
442 struct filter_server_data *fsd)
444 memset(disk_fsd, 0, sizeof(*disk_fsd));
445 memcpy(disk_fsd->fsd_uuid, fsd->fsd_uuid, sizeof(fsd->fsd_uuid));
446 disk_fsd->fsd_last_objid = cpu_to_le64(fsd->fsd_last_objid);
447 disk_fsd->fsd_last_rcvd = cpu_to_le64(fsd->fsd_last_rcvd);
448 disk_fsd->fsd_mount_count = cpu_to_le64(fsd->fsd_mount_count);
/* Free the in-memory server data and clear the pointer so later users see
 * NULL rather than a dangling pointer.
 * NOTE(review): the return statement is not visible in this listing. */
451 static int filter_free_server_data(struct filter_obd *filter)
453 OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
454 filter->fo_fsd = NULL;
/* Write the server data header to `filp`.
 * The in-memory `fsd` is logged, packed into a little-endian copy with
 * filter_pack_fsd(), and written with lustre_fwrite(); a write of any length
 * other than sizeof(disk_fsd) is reported through CDEBUG.
 * NOTE(review): the initial value of `off` and the return statements are not
 * visible in this listing. */
460 /* assumes caller is already in kernel ctxt */
461 static int filter_update_server_data(struct file *filp,
462 struct filter_server_data *fsd)
464 struct filter_server_data disk_fsd;
468 CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
469 CDEBUG(D_INODE, "server last_objid: "LPU64"\n", fsd->fsd_last_objid);
470 CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n", fsd->fsd_last_rcvd);
471 CDEBUG(D_INODE, "server last_mount: "LPU64"\n", fsd->fsd_mount_count);
/* write a packed copy so the in-memory fsd stays in CPU byte order */
473 filter_pack_fsd(&disk_fsd, fsd);
474 rc = lustre_fwrite(filp, (char *)&disk_fsd,
475 sizeof(disk_fsd), &off);
476 if (rc != sizeof(disk_fsd)) {
477 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
/* Load (or create) the server state kept in the last_rcvd file and rebuild
 * per-client state from it.
 *
 *   obd            - device being set up; obd->u.filter.fo_fsd is populated
 *   filp           - open last_rcvd file
 *   init_lastobjid - initial last object id used when the file is empty
 *
 * Visible behaviour:
 *  - asserts that the padding members make the server and client records
 *    exactly FILTER_LR_SERVER_SIZE / FILTER_LR_CLIENT_SIZE bytes;
 *  - an empty file gets a fresh header (uuid from obd, counters zeroed),
 *    otherwise the header is read and converted to CPU byte order;
 *  - client records are read starting at FILTER_LR_CLIENT_START; records
 *    whose uuid starts with NUL are skipped;
 *  - a client whose mount count is within FILTER_MOUNT_RECOV of the server's
 *    gets an export and its slot re-registered via filter_client_add(), and
 *    fo_recoverable_clients is bumped; others are reported as discarded;
 *  - the largest client last_rcvd is folded into fsd_last_rcvd, which then
 *    seeds obd_last_committed; with recoverable clients present the device
 *    is flagged OBD_RECOVERING;
 *  - the mount count is incremented and the header written back.
 * The error path frees the server data again.
 * NOTE(review): many control-flow lines (else branches, loop increment,
 * labels, returns) are not visible in this listing, so the exact pairing of
 * the messages and branches above should be confirmed against the full
 * file. */
484 /* assumes caller is already in kernel ctxt */
485 static int filter_init_server_data(struct obd_device *obd,
487 __u64 init_lastobjid)
489 struct filter_obd *filter = &obd->u.filter;
490 struct filter_server_data *fsd;
491 struct filter_client_data *fcd = NULL;
492 struct inode *inode = filp->f_dentry->d_inode;
493 unsigned long last_rcvd_size = inode->i_size;
498 /* ensure padding in the struct is the correct size */
499 LASSERT (offsetof(struct filter_server_data, fsd_padding) +
500 sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
501 LASSERT (offsetof(struct filter_client_data, fcd_padding) +
502 sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
504 OBD_ALLOC(fsd, sizeof(*fsd));
507 filter->fo_fsd = fsd;
509 if (last_rcvd_size == 0) {
510 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
512 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
513 fsd->fsd_last_objid = init_lastobjid;
514 fsd->fsd_last_rcvd = 0;
515 fsd->fsd_mount_count = 0;
518 ssize_t retval = lustre_fread(filp, (char *)fsd,
521 if (retval != sizeof(*fsd)) {
522 CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
523 GOTO(out, rc = -EIO);
525 filter_unpack_fsd(fsd);
528 CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
529 obd->obd_name, fsd->fsd_last_objid);
530 CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
531 obd->obd_name, fsd->fsd_last_rcvd);
532 CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
533 obd->obd_name, fsd->fsd_mount_count);
536 * When we do a clean FILTER shutdown, we save the last_rcvd into
537 * the header. If we find clients with higher last_rcvd values
538 * then those clients may need recovery done.
540 /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
541 for (off = FILTER_LR_CLIENT_START, cl_off = 0; off < last_rcvd_size;
547 OBD_ALLOC(fcd, sizeof(*fcd));
549 GOTO(err_fsd, rc = -ENOMEM);
552 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
553 if (rc != sizeof(*fcd)) {
554 CERROR("error reading FILTER %s offset %d: rc = %d\n",
555 LAST_RCVD, cl_off, rc);
556 if (rc > 0) /* XXX fatal error or just abort reading? */
561 if (fcd->fcd_uuid[0] == '\0') {
562 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
567 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
569 /* These exports are cleaned up by filter_disconnect(), so they
570 * need to be set up like real exports as filter_connect() does.
572 mount_age = fsd->fsd_mount_count -
573 le64_to_cpu(fcd->fcd_mount_count);
574 if (mount_age < FILTER_MOUNT_RECOV) {
575 CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
576 "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
577 "\n", fcd->fcd_uuid, cl_off,
578 last_rcvd, fsd->fsd_last_rcvd,
579 le64_to_cpu(fcd->fcd_mount_count),
580 fsd->fsd_mount_count);
582 /* disabled until OST recovery is actually working */
583 struct obd_export *exp = class_new_export(obd);
584 struct filter_export_data *fed;
591 fed = &exp->exp_filter_data;
593 filter_client_add(filter, fed, cl_off);
594 /* create helper if export init gets more complex */
595 INIT_LIST_HEAD(&fed->fed_open_head);
596 spin_lock_init(&fed->fed_lock);
599 filter->fo_recoverable_clients++;
603 "discarded client %d, UUID '%s', count %Ld\n",
604 cl_off, fcd->fcd_uuid,
605 (long long)le64_to_cpu(fcd->fcd_mount_count));
608 CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
609 cl_off, (unsigned long long)last_rcvd);
/* keep the server's last_rcvd at least as new as any client's */
611 if (last_rcvd > filter->fo_fsd->fsd_last_rcvd)
612 filter->fo_fsd->fsd_last_rcvd = last_rcvd;
615 obd->obd_last_committed = filter->fo_fsd->fsd_last_rcvd;
616 if (filter->fo_recoverable_clients) {
617 CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
618 filter->fo_recoverable_clients,
619 filter->fo_fsd->fsd_last_rcvd);
620 filter->fo_next_recovery_transno = obd->obd_last_committed + 1;
621 obd->obd_flags |= OBD_RECOVERING;
625 OBD_FREE(fcd, sizeof(*fcd));
627 fsd->fsd_mount_count++;
629 /* save it,so mount count and last_recvd is current */
630 rc = filter_update_server_data(filp, filter->fo_fsd);
636 filter_free_server_data(filter);
/* Prepare the on-disk layout of a freshly mounted filter device.
 *
 * With the filter's context pushed, this:
 *  - gets or creates the top-level directory "O";
 *  - gets or creates one subdirectory per named entry of obd_type_by_mode[]
 *    and caches the dentries in fo_dentry_O_mode[];
 *  - opens (creating if needed) the last_rcvd file, checks that it is a
 *    regular file and asks the backing filesystem to journal its data;
 *  - records the file, inode and address-space operations of that file in
 *    the filter ("steal operations");
 *  - loads server/client state through filter_init_server_data().
 * The tail of the function is the error unwind: disconnect exports, close
 * the file, drop the cached dentries.
 * NOTE(review): labels, several condition lines and the return are not
 * visible in this listing. */
640 /* setup the object store with correct subdirectories */
641 static int filter_prep(struct obd_device *obd)
643 struct obd_run_ctxt saved;
644 struct filter_obd *filter = &obd->u.filter;
645 struct dentry *dentry;
651 push_ctxt(&saved, &filter->fo_ctxt, NULL);
652 dentry = simple_mkdir(current->fs->pwd, "O", 0700);
653 CDEBUG(D_INODE, "got/created O: %p\n", dentry);
654 if (IS_ERR(dentry)) {
655 rc = PTR_ERR(dentry);
656 CERROR("cannot open/create O: rc = %d\n", rc);
659 filter->fo_dentry_O = dentry;
662 * Create directories and/or get dentries for each object type.
663 * This saves us from having to do multiple lookups for each one.
665 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
666 char *name = obd_type_by_mode[mode];
669 filter->fo_dentry_O_mode[mode] = NULL;
672 dentry = simple_mkdir(filter->fo_dentry_O, name, 0700);
673 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
674 if (IS_ERR(dentry)) {
675 rc = PTR_ERR(dentry);
676 CERROR("cannot create O/%s: rc = %d\n", name, rc);
677 GOTO(out_O_mode, rc);
679 filter->fo_dentry_O_mode[mode] = dentry;
/* relative path: resolved against the context pushed above */
682 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
683 if ( !file || IS_ERR(file) ) {
685 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
687 GOTO(out_O_mode, rc);
690 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
691 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
692 file->f_dentry->d_inode->i_mode);
693 GOTO(err_filp, rc = -ENOENT);
696 rc = fsfilt_journal_data(obd, file);
698 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
701 /* steal operations */
702 inode = file->f_dentry->d_inode;
703 filter->fo_fop = file->f_op;
704 filter->fo_iop = inode->i_op;
705 filter->fo_aops = inode->i_mapping->a_ops;
707 rc = filter_init_server_data(obd, file, INIT_OBJID);
709 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
710 GOTO(err_client, rc);
712 filter->fo_rcvd_filp = file;
716 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/* error unwind starts here */
721 class_disconnect_all(obd);
723 if (filp_close(file, 0))
724 CERROR("can't close %s after error\n", LAST_RCVD);
725 filter->fo_rcvd_filp = NULL;
728 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
731 filter->fo_dentry_O_mode[mode] = NULL;
734 f_dput(filter->fo_dentry_O);
735 filter->fo_dentry_O = NULL;
/* Tear down what filter_prep() set up.
 * With the filter's context pushed: write the server data header one last
 * time, free the in-memory copy, sync and close the last_rcvd file, then
 * drop the cached per-type dentries and the "O" dentry.
 * NOTE(review): in the visible order, filter_update_server_data() is handed
 * fo_rcvd_filp before the `if (filter->fo_rcvd_filp)` test further down -
 * confirm the file pointer cannot be NULL at that point.
 * NOTE(review): the conditions guarding the two CERROR calls are not visible
 * in this listing. */
739 /* cleanup the filter: write last used object id to status file */
740 static void filter_post(struct obd_device *obd)
742 struct obd_run_ctxt saved;
743 struct filter_obd *filter = &obd->u.filter;
747 /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
748 * best to start a transaction with h_sync, because we removed this
751 push_ctxt(&saved, &filter->fo_ctxt, NULL);
752 rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
754 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
755 filter_free_server_data(filter);
758 if (filter->fo_rcvd_filp) {
759 /* broken sync at umount bug workaround */
760 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
761 rc = fsync_dev(filter->fo_rcvd_filp->f_dentry->d_inode->i_rdev);
763 rc = file_fsync(filter->fo_rcvd_filp,
764 filter->fo_rcvd_filp->f_dentry, 1);
766 filp_close(filter->fo_rcvd_filp, 0);
767 filter->fo_rcvd_filp = NULL;
769 CERROR("last_rcvd file won't closek rc = %ld\n", rc);
772 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
773 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
776 filter->fo_dentry_O_mode[mode] = NULL;
779 f_dput(filter->fo_dentry_O);
780 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/* Allocate the next object id: pre-increment fsd_last_objid while holding
 * fo_objidlock and hand the new value to the caller.  Requires that the
 * server data has already been loaded (asserted). */
784 static __u64 filter_next_id(struct obd_device *obd)
787 LASSERT(obd->u.filter.fo_fsd != NULL);
789 spin_lock(&obd->u.filter.fo_objidlock);
790 id = ++obd->u.filter.fo_fsd->fsd_last_objid;
791 spin_unlock(&obd->u.filter.fo_objidlock);
/* Look up the dentry for object `id` inside directory `dparent`.
 *
 * The object's file name is the decimal id.  Errors are returned as
 * ERR_PTR values: -ENXIO when the superblock is not set up, -ESTALE for an
 * invalid id (the message names id 0).  A lookup failure is logged.  On
 * success the dentry holds a reference (asserted) that the caller must drop.
 * NOTE(review): `lockit` presumably selects whether the parent's i_sem is
 * taken around the lookup; the conditions guarding down()/up() are not
 * visible in this listing. */
796 /* how to get files, dentries, inodes from object id's */
797 /* parent i_sem is already held if needed for exclusivity */
798 static struct dentry *filter_fid2dentry(struct obd_device *obd,
799 struct dentry *dparent,
800 __u64 id, int lockit)
802 struct super_block *sb = obd->u.filter.fo_sb;
803 struct dentry *dchild;
808 if (!sb || !sb->s_dev) {
809 CERROR("fatal: device not initialized.\n");
810 RETURN(ERR_PTR(-ENXIO));
814 CERROR("fatal: invalid object id 0\n");
816 RETURN(ERR_PTR(-ESTALE));
819 len = sprintf(name, LPU64, id);
820 CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
821 dparent->d_name.len, dparent->d_name.name, name);
823 down(&dparent->d_inode->i_sem);
824 dchild = lookup_one_len(name, dparent, len);
826 up(&dparent->d_inode->i_sem);
827 if (IS_ERR(dchild)) {
828 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
832 CDEBUG(D_INODE, "got child obj O/%*s/%s: %p, count = %d\n",
833 dparent->d_name.len, dparent->d_name.name, name, dchild,
834 atomic_read(&dchild->d_count));
836 LASSERT(atomic_read(&dchild->d_count) > 0);
/* Return the cached directory dentry (set up by filter_prep()) that holds
 * objects of the given file type.  Only S_IFREG is accepted at present; any
 * other type trips the assertion. */
841 static inline struct dentry *filter_parent(struct obd_device *obd,
844 struct filter_obd *filter = &obd->u.filter;
846 LASSERT((mode & S_IFMT) == S_IFREG); /* only regular files for now */
847 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
/* Open object `id` of file type `type` on behalf of `export`.
 *
 * Returns the opened struct file, or an ERR_PTR: -ENXIO (device not set
 * up), -ESTALE (invalid id), -EINVAL (no type bits in `type`), -ENOMEM, or
 * the filp_open() error.
 *
 * On success:
 *  - a struct filter_file_data is allocated, given a random server cookie,
 *    linked to the file through file->private_data and added to the
 *    export's open list under fed_lock;
 *  - the dentry's d_fsdata holds per-dentry data whose fdd_open_count counts
 *    the opens of this object.  A preallocated fdd is installed when the
 *    dentry has none, otherwise the preallocation is freed and the existing
 *    count is incremented.
 * The tail of the function is the error unwind for the two allocations.
 * NOTE(review): labels, the else branch markers and the return are not
 * visible in this listing. */
850 static struct file *filter_obj_open(struct obd_export *export,
851 __u64 id, __u32 type)
853 struct filter_obd *filter = &export->exp_obd->u.filter;
854 struct super_block *sb = filter->fo_sb;
855 struct dentry *dentry;
856 struct filter_export_data *fed = &export->exp_filter_data;
857 struct filter_dentry_data *fdd;
858 struct filter_file_data *ffd;
859 struct obd_run_ctxt saved;
864 if (!sb || !sb->s_dev) {
865 CERROR("fatal: device not initialized.\n");
866 RETURN(ERR_PTR(-ENXIO));
870 CERROR("fatal: invalid obdo "LPU64"\n", id);
871 RETURN(ERR_PTR(-ESTALE));
874 if (!(type & S_IFMT)) {
875 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
876 __FUNCTION__, id, type);
877 RETURN(ERR_PTR(-EINVAL));
880 PORTAL_SLAB_ALLOC(ffd, filter_open_cache, sizeof(*ffd));
882 CERROR("obdfilter: out of memory\n");
883 RETURN(ERR_PTR(-ENOMEM));
886 /* We preallocate this to avoid blocking while holding fo_fddlock */
887 fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL);
889 CERROR("obdfilter: out of memory\n");
890 GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
/* open by relative path "O/<type>/<id>" inside the filter's context */
893 filter_id(name, id, type);
894 push_ctxt(&saved, &filter->fo_ctxt, NULL);
895 file = filp_open(name, O_RDWR | O_LARGEFILE, 0 /* type? */);
896 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
899 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
903 dentry = file->f_dentry;
904 spin_lock(&filter->fo_fddlock);
905 if (dentry->d_fsdata) {
/* NOTE(review): the lock is dropped before fdd_open_count is bumped; the
 * count itself is atomic - confirm nothing else relies on the lock here. */
906 spin_unlock(&filter->fo_fddlock);
907 kmem_cache_free(filter_dentry_cache, fdd);
908 fdd = dentry->d_fsdata;
909 LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
910 /* should only happen during client recovery */
911 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
912 CDEBUG(D_INODE,"opening destroyed object "LPX64"\n",id);
913 atomic_inc(&fdd->fdd_open_count);
915 atomic_set(&fdd->fdd_open_count, 1);
917 /* If this is racy, then we can use {cmp}xchg and atomic_add */
918 dentry->d_fsdata = fdd;
919 spin_unlock(&filter->fo_fddlock);
/* the random cookie is what filter_handle2ffd() later compares against the
 * client's handle */
922 get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
923 ffd->ffd_file = file;
924 file->private_data = ffd;
927 dentry->d_op = &filter_dops;
929 LASSERT(dentry->d_op == &filter_dops);
931 spin_lock(&fed->fed_lock);
932 list_add(&ffd->ffd_export_list, &fed->fed_open_head);
933 spin_unlock(&fed->fed_lock);
935 CDEBUG(D_INODE, "opened objid "LPX64": rc = %p\n", id, file);
/* error unwind: release the preallocations */
941 kmem_cache_free(filter_dentry_cache, fdd);
/* poison the cookie so a stale handle cannot match a freed ffd */
943 ffd->ffd_servercookie = DEAD_HANDLE_MAGIC;
944 PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
/* Unlink `object_dentry` from directory `dir_dentry`.
 * Before unlinking, an object whose inode has a link count or reference
 * count other than 1 is reported with CERROR; the visible code still goes
 * on to call vfs_unlink().  An unlink failure is logged as well.
 * NOTE(review): the condition guarding the second CERROR and the return are
 * not visible in this listing. */
948 /* Caller must hold i_sem on dir_dentry->d_inode */
949 /* Caller must push us into kernel context */
950 static int filter_destroy_internal(struct obd_device *obd,
951 struct dentry *dir_dentry,
952 struct dentry *object_dentry)
954 struct inode *inode = object_dentry->d_inode;
958 if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
959 CERROR("destroying objid %*s nlink = %d, count = %d\n",
960 object_dentry->d_name.len,
961 object_dentry->d_name.name,
962 inode->i_nlink, atomic_read(&inode->i_count));
965 rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
968 CERROR("error unlinking objid %*s: rc %d\n",
969 object_dentry->d_name.len,
970 object_dentry->d_name.name, rc);
/* Close one open handle (`ffd`) of an object and, if this was the last open
 * of an object flagged FILTER_FLAG_DESTROY, unlink the object now.
 *
 * An extra dentry reference is taken up front with dget() so the dentry
 * stays valid after filp_close(); it is dropped with f_dput() at the end,
 * after which `ffd` is returned to filter_open_cache.
 *
 * Deferred destroy path, in the order visible here: parent directory i_sem,
 * push filter context, filter_start_transno(), fsfilt_start().  This follows
 * the locking invariants stated at the top of the file (i_sem and transno
 * before journal operations).
 * NOTE(review): labels, some condition lines and the return are not visible
 * in this listing. */
975 static int filter_close_internal(struct obd_export *export,
976 struct filter_file_data *ffd,
977 struct obd_trans_info *oti)
979 struct obd_device *obd = export->exp_obd;
980 struct filter_obd *filter = &obd->u.filter;
981 struct file *filp = ffd->ffd_file;
982 struct dentry *object_dentry = dget(filp->f_dentry);
983 struct filter_dentry_data *fdd = object_dentry->d_fsdata;
987 LASSERT(filp->private_data == ffd);
990 rc = filp_close(filp, 0);
992 if (atomic_dec_and_test(&fdd->fdd_open_count) &&
993 fdd->fdd_flags & FILTER_FLAG_DESTROY) {
994 struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
995 struct obd_run_ctxt saved;
998 down(&dir_dentry->d_inode->i_sem);
999 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1000 filter_start_transno(export);
1001 handle = fsfilt_start(obd, dir_dentry->d_inode,
/* filter_finish_transno() is still called when the transaction could not be
 * started: it is what releases the semaphore taken by filter_start_transno() */
1003 if (IS_ERR(handle)) {
1004 rc = filter_finish_transno(export, handle, oti,
1008 /* XXX unlink from PENDING directory now too */
1009 rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
1012 rc = filter_finish_transno(export, handle, oti, rc);
1013 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1015 CERROR("error on commit, err = %d\n", rc2);
/* unwind in reverse order of acquisition */
1020 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1021 up(&dir_dentry->d_inode->i_sem);
1024 f_dput(object_dentry);
1025 PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
/* Device setup: mount the backing filesystem and initialise the filter.
 *
 *   buf - struct obd_ioctl_data; ioc_inlbuf1 is passed to do_kern_mount() as
 *         the device name and ioc_inlbuf2 as the filesystem type (also used
 *         to look up the fsfilt operations).  Both must be non-NULL, else
 *         -EINVAL.
 *
 * After mounting, the function marks the device replayable, fills in the
 * filter's run context (pwd/pwdmnt at the mount root), runs filter_prep(),
 * initialises the locks and export list, creates the LDLM namespace and the
 * LDLM callback client.  The tail is the error unwind (free fstype string,
 * drop the mount, release the fsfilt ops).
 * NOTE(review): the check of do_kern_mount()'s result and of the strdup()
 * result are not visible in this listing; the string is later released with
 * kfree(), so strdup() is presumably a kernel-side helper - confirm. */
1031 /* mount the file system (secretly) */
1032 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1034 struct obd_ioctl_data* data = buf;
1035 struct filter_obd *filter;
1036 struct vfsmount *mnt;
1040 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1041 RETURN(rc = -EINVAL);
1043 obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1044 if (IS_ERR(obd->obd_fsops))
1045 RETURN(rc = PTR_ERR(obd->obd_fsops));
1047 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
1052 obd->obd_flags |= OBD_REPLAYABLE;
1054 filter = &obd->u.filter;;
1055 init_MUTEX(&filter->fo_transno_sem);
1056 filter->fo_vfsmnt = mnt;
1057 filter->fo_fstype = strdup(data->ioc_inlbuf2);
1058 filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
1059 CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
/* context used by push_ctxt(): file operations run relative to the root of
 * the private mount */
1061 OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1062 filter->fo_ctxt.pwdmnt = mnt;
1063 filter->fo_ctxt.pwd = mnt->mnt_root;
1064 filter->fo_ctxt.fs = get_ds();
1066 rc = filter_prep(obd);
1068 GOTO(err_kfree, rc);
1070 spin_lock_init(&filter->fo_fddlock);
1071 spin_lock_init(&filter->fo_objidlock);
1072 INIT_LIST_HEAD(&filter->fo_export_list);
1074 obd->obd_namespace =
1075 ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
1076 if (!obd->obd_namespace)
1077 GOTO(err_post, rc = -ENOMEM);
1079 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1080 "filter_ldlm_cb_client", &obd->obd_ldlm_client);
/* error unwind starts here */
1087 kfree(filter->fo_fstype);
1089 mntput(filter->fo_vfsmnt);
1093 fsfilt_put_ops(obd->obd_fsops);
/* Device teardown, the counterpart of filter_setup().
 * Any remaining exports are forcibly disconnected (with an error logged, and
 * a second one if exports still remain).  Then the LDLM namespace is freed,
 * the dcache below the filesystem root is shrunk, the private mount is
 * dropped, fo_sb is cleared and the fstype string and fsfilt ops released.
 * NOTE(review): the call that undoes filter_prep() and the return statements
 * are not visible in this listing. */
1098 static int filter_cleanup(struct obd_device *obd)
1100 struct super_block *sb;
1103 if (!list_empty(&obd->obd_exports)) {
1104 CERROR("still has clients!\n");
1105 class_disconnect_all(obd);
1106 if (!list_empty(&obd->obd_exports)) {
1107 CERROR("still has exports after forced cleanup?\n");
1112 ldlm_namespace_free(obd->obd_namespace);
1114 sb = obd->u.filter.fo_sb;
1115 if (!obd->u.filter.fo_sb)
1120 shrink_dcache_parent(sb->s_root);
1122 mntput(obd->u.filter.fo_vfsmnt);
1123 obd->u.filter.fo_sb = 0;
1124 kfree(obd->u.filter.fo_fstype);
1125 fsfilt_put_ops(obd->obd_fsops);
/* Attach hook: fetch this module's lprocfs variable table and register the
 * per-device entries for `dev`.  Returns lprocfs_obd_attach()'s result.
 * `len` and `data` are unused in the visible code. */
1132 int filter_attach(struct obd_device *dev, obd_count len, void *data)
1134 struct lprocfs_static_vars lvars;
1136 lprocfs_init_vars(&lvars);
1137 return lprocfs_obd_attach(dev, lvars.obd_vars);
/* Detach hook: remove the lprocfs entries registered by filter_attach() and
 * return lprocfs_obd_detach()'s result. */
1140 int filter_detach(struct obd_device *dev)
1142 return lprocfs_obd_detach(dev);
/* Connect a client to the filter device.
 *
 * After the generic class_connect(), a struct filter_client_data is
 * allocated for the new export, stamped with the client's uuid and the
 * server's current mount count (little-endian, as stored on disk).  The
 * export's open-file list and lock are initialised and a new last_rcvd slot
 * is claimed with filter_client_add(..., -1).  On failure the client data is
 * freed and the connection undone with class_disconnect().
 * NOTE(review): the uuid copy reads sizeof(fcd->fcd_uuid) bytes starting at
 * `cluuid` (a struct obd_uuid *) - confirm the two sizes agree.
 * NOTE(review): the assignment linking `fcd` into the export data, the
 * error-check lines and the returns are not visible in this listing;
 * `recovd` and `recover` are unused in the visible code. */
1145 /* nearly identical to mds_connect */
1146 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1147 struct obd_uuid *cluuid, struct recovd_obd *recovd,
1148 ptlrpc_recovery_cb_t recover)
1150 struct obd_export *exp;
1151 struct filter_export_data *fed;
1152 struct filter_client_data *fcd;
1153 struct filter_obd *filter = &obd->u.filter;
1158 if (!conn || !obd || !cluuid)
1161 rc = class_connect(conn, obd, cluuid);
1164 exp = class_conn2export(conn);
1166 fed = &exp->exp_filter_data;
1168 OBD_ALLOC(fcd, sizeof(*fcd));
1170 CERROR("filter: out of memory for client data\n");
1171 GOTO(out_export, rc = -ENOMEM);
1174 memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1176 fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1178 INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
1179 spin_lock_init(&exp->exp_filter_data.fed_lock);
/* -1: new client, let filter_client_add() pick a free slot */
1181 rc = filter_client_add(filter, fed, -1);
1188 OBD_FREE(fcd, sizeof(*fcd));
1190 class_disconnect(conn);
/* Disconnect a client from the filter device.
 *
 * Every file the export still has open is force-closed: entries are taken
 * off fed_open_head one at a time under fed_lock, and the lock is dropped
 * around filter_close_internal() and re-taken before the list is examined
 * again.  Afterwards the export's DLM locks are cancelled, its last_rcvd
 * slot is released with filter_client_free(), and the generic
 * class_disconnect() runs.
 * NOTE(review): validation of `exp` and the return are not visible in this
 * listing. */
1195 /* also incredibly similar to mds_disconnect */
1196 static int filter_disconnect(struct lustre_handle *conn)
1198 struct obd_export *exp = class_conn2export(conn);
1199 struct filter_export_data *fed;
1204 fed = &exp->exp_filter_data;
1205 spin_lock(&fed->fed_lock);
1206 while (!list_empty(&fed->fed_open_head)) {
1207 struct filter_file_data *ffd;
1209 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1211 list_del(&ffd->ffd_export_list);
1212 spin_unlock(&fed->fed_lock);
1214 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1215 ffd->ffd_file->f_dentry->d_name.len,
1216 ffd->ffd_file->f_dentry->d_name.name,
1217 ffd, ffd->ffd_servercookie);
/* no oti: there is no client request to report a transno back to */
1219 filter_close_internal(exp, ffd, NULL);
1220 spin_lock(&fed->fed_lock);
1222 spin_unlock(&fed->fed_lock);
1224 ldlm_cancel_locks_for_export(exp);
1225 filter_client_free(exp);
1227 rc = class_disconnect(conn);
1229 /* XXX cleanup preallocated inodes */
/* Fill `oa` from `inode` for the attributes selected by `valid`.
 * The file-type bits that `oa` carried on entry are saved in `type` before
 * obdo_from_inode() runs, and the type bits it copies from the inode are
 * then masked out of o_mode.  For character and block devices the device
 * number is converted with kdev_t_to_nr() and OBD_MD_FLRDEV is set in
 * o_valid.
 * NOTE(review): the lines that put `type` back into o_mode and store `rdev`
 * are not visible in this listing. */
1233 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1235 int type = oa->o_mode & S_IFMT;
1238 CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPX64" valid 0x%08x\n",
1239 inode->i_ino, inode, oa->o_id, valid);
1240 /* Don't copy the inode number in place of the object ID */
1241 obdo_from_inode(oa, inode, valid);
1242 oa->o_mode &= ~S_IFMT;
1245 if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1246 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1248 oa->o_valid |= OBD_MD_FLRDEV;
/* Translate a client-supplied open handle into the server's
 * struct filter_file_data.
 * handle->addr is treated as the object's address, so it is validated in
 * three steps before being trusted: non-NULL handle and address, the address
 * must be a live object of filter_open_cache, and the stored server cookie
 * must equal handle->cookie.
 * NOTE(review): what is returned when a check fails is not visible in this
 * listing (ffd is initialised to NULL). */
1254 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
1256 struct filter_file_data *ffd = NULL;
1259 if (!handle || !handle->addr)
1262 ffd = (struct filter_file_data *)(unsigned long)(handle->addr);
1263 if (!kmem_cache_validate(filter_open_cache, (void *)ffd))
1266 if (ffd->ffd_servercookie != handle->cookie)
1269 LASSERT(ffd->ffd_file->private_data == ffd);
/* Find the dentry for the object described by `oa`.
 *
 * If `oa` carries an open handle (OBD_MD_FLHANDLE), the dentry of the
 * already-open file is used and an extra reference taken with dget().
 * Otherwise the object is looked up by id in the directory for its type via
 * filter_fid2dentry().  `what` is a label used only in error messages.
 * Returns an ERR_PTR on failure: -EINVAL for an unknown connection, -ENOENT
 * when the dentry has no inode.
 * NOTE(review): the handling of an invalid handle, the remaining arguments
 * of the lookup call (including how `locked` is used) and the success return
 * are not visible in this listing. */
1273 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1274 struct obdo *oa, int locked,char *what)
1276 struct dentry *dentry = NULL;
1278 if (oa->o_valid & OBD_MD_FLHANDLE) {
1279 struct lustre_handle *ost_handle = obdo_handle(oa);
1280 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1283 dentry = dget(ffd->ffd_file->f_dentry);
1287 struct obd_device *obd = class_conn2obd(conn);
1289 CERROR("invalid client "LPX64"\n", conn->addr);
1290 RETURN(ERR_PTR(-EINVAL));
1292 dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode),
1296 if (IS_ERR(dentry)) {
1297 CERROR("%s error looking up object: "LPX64"\n", what, oa->o_id);
1301 if (!dentry->d_inode) {
1302 CERROR("%s on non-existent object: "LPX64"\n", what, oa->o_id);
1305 RETURN(ERR_PTR(-ENOENT));
/*
 * Three-argument wrapper around __filter_oa2dentry().
 * NOTE(review): the continuation line that supplies the last argument is
 * not visible in this listing; presumably it passes the caller name used
 * as the "what" prefix - confirm against the full source.
 */
1311 #define filter_oa2dentry(conn, oa, locked) __filter_oa2dentry(conn, oa, locked,\
/*
 * filter_getattr - return the attributes of an object to the caller.
 *
 * @conn: client connection
 * @oa:   identifies the object on entry; filled from the inode on success
 * @md:   not referenced in the visible lines
 *
 * Bumps the getattr request counter, looks the object up with
 * filter_oa2dentry() and copies the inode attributes selected by
 * oa->o_valid into oa through filter_from_inode().  A lookup failure is
 * returned as PTR_ERR(dentry).
 *
 * NOTE(review): the error test, dentry release and final return are not
 * visible in this listing.
 */
1314 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1315 struct lov_stripe_md *md)
1317 struct dentry *dentry = NULL;
1321 XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
1323 dentry = filter_oa2dentry(conn, oa, 1);
1325 RETURN(PTR_ERR(dentry));
1327 filter_from_inode(oa, dentry->d_inode, oa->o_valid);
1333 /* this is called from filter_truncate() until we have filter_punch() */
/*
 * filter_setattr - apply the attributes in an obdo to the backing inode.
 *
 * @conn: client connection
 * @oa:   object id plus the attributes to set (selected by o_valid)
 * @md:   not referenced in the visible lines
 * @oti:  transaction info handed to filter_finish_transno()
 *
 * Visible sequence:
 *  - look the object up with filter_oa2dentry(); a failure is returned as
 *    PTR_ERR(dentry);
 *  - build an iattr from oa and force the file-type bits of ia_mode to
 *    S_IFREG;
 *  - switch to the filter run context with push_ctxt();
 *  - if the size is being changed, take the inode i_sem;
 *  - call filter_start_transno() and then fsfilt_start(); if the latter
 *    fails the transno is finished with that error and control goes to
 *    out_unlock;
 *  - call the inode setattr operation when one is provided, or
 *    inode_setattr();
 *  - finish the transno, commit the handle and log a commit error;
 *  - for a size change, report blocks, ctime and mtime back in oa;
 *  - restore the run context with pop_ctxt().
 *
 * NOTE(review): declarations, labels, the release of i_sem and of the
 * dentry, and the return are not visible in this listing.
 */
1334 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1335 struct lov_stripe_md *md, struct obd_trans_info *oti)
1337 struct obd_run_ctxt saved;
1338 struct obd_export *export = class_conn2export(conn);
1339 struct obd_device *obd = class_conn2obd(conn);
1340 struct filter_obd *filter = &obd->u.filter;
1341 struct dentry *dentry;
1343 struct inode *inode;
1348 XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
1350 dentry = filter_oa2dentry(conn, oa, 0);
1353 RETURN(PTR_ERR(dentry));
1355 iattr_from_obdo(&iattr, oa, oa->o_valid);
1356 iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1357 inode = dentry->d_inode;
1359 push_ctxt(&saved, &filter->fo_ctxt, NULL);
/* i_sem is taken before the transno and the journal handle are started,
 * matching the ordering invariant stated in the file header */
1361 if (iattr.ia_valid & ATTR_SIZE)
1362 down(&inode->i_sem);
1364 filter_start_transno(export);
1365 handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1366 if (IS_ERR(handle)) {
1367 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1368 GOTO(out_unlock, rc);
1371 if (inode->i_op->setattr)
1372 rc = inode->i_op->setattr(dentry, &iattr);
1374 rc = inode_setattr(inode, &iattr);
1375 rc = filter_finish_transno(export, handle, oti, rc);
1376 rc2 = fsfilt_commit(obd, dentry->d_inode, handle);
1378 CERROR("error on commit, err = %d\n", rc2);
1383 if (iattr.ia_valid & ATTR_SIZE) {
1385 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1386 obdo_from_inode(oa, inode, oa->o_valid);
1391 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/*
 * filter_open - open an object on behalf of a client.
 *
 * @conn: client connection
 * @oa:   object id and mode on entry; attributes and open handle on return
 * @ea:   not referenced in the visible lines
 * @oti:  not referenced in the visible lines
 *
 * The object is opened with filter_obj_open(); a failure is propagated as
 * PTR_ERR(filp) through the out label.  On success the inode attributes
 * are copied into oa, and the open handle stored inside oa is filled with
 * the address of the per-open data (filp->private_data) and its server
 * cookie.  OBD_MD_FLHANDLE is then set in o_valid so the handle is marked
 * as present.
 *
 * NOTE(review): the export validity test, the IS_ERR test on filp and the
 * return are not visible in this listing.
 */
1397 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1398 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1400 struct obd_export *export;
1401 struct lustre_handle *handle;
1402 struct filter_file_data *ffd;
1407 export = class_conn2export(conn);
1409 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1413 XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
1415 filp = filter_obj_open(export, oa->o_id, oa->o_mode);
1417 GOTO(out, rc = PTR_ERR(filp));
1419 filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1421 ffd = filp->private_data;
1422 handle = obdo_handle(oa);
1423 handle->addr = (__u64)(unsigned long)ffd;
1424 handle->cookie = ffd->ffd_servercookie;
1425 oa->o_valid |= OBD_MD_FLHANDLE;
/*
 * filter_close - close an object previously opened through filter_open().
 *
 * @conn: client connection
 * @oa:   must carry the open handle (OBD_MD_FLHANDLE)
 * @ea:   not referenced in the visible lines
 * @oti:  transaction info passed on to filter_close_internal()
 *
 * A request without a handle is logged ("no handle for close"), and a
 * handle that filter_handle2ffd() cannot resolve is logged with its address
 * and cookie.  For a valid handle the per-open data is removed from the
 * export's open list under fed_lock and then closed with
 * filter_close_internal(), whose result is stored in rc.
 *
 * NOTE(review): the error returns for the rejected cases and the final
 * return are not visible in this listing.
 */
1431 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1432 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1434 struct obd_export *exp;
1435 struct filter_file_data *ffd;
1436 struct filter_export_data *fed;
1440 exp = class_conn2export(conn);
1442 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1446 XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
1448 if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1449 CERROR("no handle for close of objid "LPX64"\n", oa->o_id);
1453 ffd = filter_handle2ffd(obdo_handle(oa));
1455 struct lustre_handle *handle = obdo_handle(oa);
1456 CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n",
1457 handle->addr, handle->cookie);
1461 fed = &exp->exp_filter_data;
1462 spin_lock(&fed->fed_lock);
1463 list_del(&ffd->ffd_export_list);
1464 spin_unlock(&fed->fed_lock);
1466 rc = filter_close_internal(exp, ffd, oti);
1469 } /* filter_close */
/*
 * filter_create - create a new object and report its id and attributes.
 *
 * @conn: client connection
 * @oa:   o_mode selects the parent directory and the create mode; o_id and
 *        the attribute fields are filled in on return
 * @ea:   not referenced in the visible lines
 * @oti:  transaction info handed to filter_finish_transno()
 *
 * Visible sequence:
 *  - a new object id is taken from filter_next_id();
 *  - the filter run context is pushed and the parent directory i_sem is
 *    taken before the name is looked up with filter_fid2dentry();
 *  - a lookup error is returned through the out label, and an already
 *    existing object is logged and reported as -EEXIST;
 *  - filter_start_transno() is called before fsfilt_start(); a failed
 *    start finishes the transno with that error;
 *  - vfs_create() creates the file and its result is fed to
 *    filter_finish_transno();
 *  - filter_update_server_data() is called and a failure is logged as
 *    "unable to write lastobjid but file created";
 *  - the handle is committed, then oa is filled from the new inode;
 *  - i_sem is released and the run context popped.
 *
 * NOTE(review): declarations, several conditions, the labels and the
 * return are not visible in this listing.
 */
1471 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1472 struct lov_stripe_md **ea, struct obd_trans_info *oti)
1474 struct obd_export *export = class_conn2export(conn);
1475 struct obd_device *obd = class_conn2obd(conn);
1476 struct filter_obd *filter = &obd->u.filter;
1477 struct obd_run_ctxt saved;
1478 struct dentry *dir_dentry;
1486 CERROR("invalid client "LPX64"\n", conn->addr);
1490 XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
1492 oa->o_id = filter_next_id(obd);
1494 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1495 dir_dentry = filter_parent(obd, oa->o_mode);
1496 down(&dir_dentry->d_inode->i_sem);
1497 new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
1499 GOTO(out, rc = PTR_ERR(new));
1502 /* This would only happen if lastobjid was bad on disk */
1503 CERROR("objid O/%*s/"LPU64" already exists\n",
1504 dir_dentry->d_name.len, dir_dentry->d_name.name,
1507 GOTO(out, rc = -EEXIST);
1510 filter_start_transno(export);
1511 handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
1512 if (IS_ERR(handle)) {
1513 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1516 rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
1518 CERROR("create failed rc = %d\n", rc);
1520 rc = filter_finish_transno(export, handle, oti, rc);
1521 err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1523 CERROR("unable to write lastobjid but file created\n");
1527 err = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1529 CERROR("error on commit, err = %d\n", err);
1537 /* Set flags for fields we have set in the inode struct */
1538 oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1539 OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1540 filter_from_inode(oa, new->d_inode, oa->o_valid);
1546 up(&dir_dentry->d_inode->i_sem);
1547 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/*
 * filter_destroy - remove an object, or defer removal while it is open.
 *
 * @conn: client connection
 * @oa:   identifies the object (o_id, o_mode, optional handle)
 * @ea:   not referenced in the visible lines
 * @oti:  transaction info handed to filter_finish_transno()
 *
 * Visible sequence:
 *  - the parent directory i_sem is taken, then the object is looked up
 *    with filter_oa2dentry();
 *  - the run context is pushed, filter_start_transno() is called and an
 *    unlink handle is started with fsfilt_start();
 *  - if the dentry has per-dentry data with a non-zero open count, the
 *    FILTER_FLAG_DESTROY flag is set on first request (a message is logged
 *    for both the first and a repeated request) and the function goes to
 *    out_commit with rc = 0 instead of unlinking;
 *  - otherwise filter_destroy_internal() performs the removal;
 *  - the transno is finished, the handle committed, the run context
 *    popped, the dentry released and i_sem dropped.
 *
 * NOTE(review): a failed lookup is always reported as -ENOENT, although
 * __filter_oa2dentry() can also return ERR_PTR(-EINVAL); confirm that
 * hiding the original error code is intended.
 * NOTE(review): declarations, labels and the return are not visible in
 * this listing.
 */
1551 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1552 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1554 struct obd_export *export = class_conn2export(conn);
1555 struct obd_device *obd = class_conn2obd(conn);
1556 struct filter_obd *filter = &obd->u.filter;
1557 struct dentry *dir_dentry, *object_dentry;
1558 struct filter_dentry_data *fdd;
1559 struct obd_run_ctxt saved;
1565 CERROR("invalid client "LPX64"\n", conn->addr);
1569 XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
1571 CDEBUG(D_INODE, "destroying objid "LPX64"\n", oa->o_id);
1573 dir_dentry = filter_parent(obd, oa->o_mode);
1574 down(&dir_dentry->d_inode->i_sem);
1576 object_dentry = filter_oa2dentry(conn, oa, 0);
1577 if (IS_ERR(object_dentry))
1578 GOTO(out, rc = -ENOENT);
1580 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1581 filter_start_transno(export);
1582 handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
1583 if (IS_ERR(handle)) {
1584 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1588 fdd = object_dentry->d_fsdata;
1589 if (fdd && atomic_read(&fdd->fdd_open_count)) {
1590 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1591 fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1592 /* XXX put into PENDING directory in case of crash */
1594 "defer destroy of %dx open objid "LPX64"\n",
1595 atomic_read(&fdd->fdd_open_count), oa->o_id);
1598 "repeat destroy of %dx open objid "LPX64"\n",
1599 atomic_read(&fdd->fdd_open_count), oa->o_id);
1600 GOTO(out_commit, rc = 0);
1603 rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
1606 /* XXX save last_rcvd on disk */
1607 rc = filter_finish_transno(export, handle, oti, rc);
1608 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1610 CERROR("error on commit, err = %d\n", rc2);
1615 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1616 f_dput(object_dentry);
1620 up(&dir_dentry->d_inode->i_sem);
1624 /* NB start and end are used for punch, but not truncate */
/*
 * filter_truncate - o_punch entry point, implemented as a truncate.
 *
 * @conn:  client connection
 * @oa:    object and attributes, passed unchanged to filter_setattr()
 * @lsm:   not referenced in the visible lines
 * @start: only used in the debug message
 * @end:   anything other than OBD_OBJECT_EOF triggers a "PUNCH not
 *         supported" error message
 * @oti:   transaction info passed to filter_setattr()
 *
 * Bumps the punch request counter and delegates to filter_setattr(); the
 * result is stored in error.
 *
 * NOTE(review): whether the unsupported-punch case returns early is not
 * visible in this listing, nor is the final return.
 */
1625 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1626 struct lov_stripe_md *lsm,
1627 obd_off start, obd_off end,
1628 struct obd_trans_info *oti)
1633 XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
1635 if (end != OBD_OBJECT_EOF)
1636 CERROR("PUNCH not supported, only truncate works\n");
1638 CDEBUG(D_INODE, "calling truncate for object "LPX64", valid = %x, "
1639 "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1641 error = filter_setattr(conn, oa, NULL, oti);
/*
 * lustre_put_page - drop the page-cache reference held on a page.
 *
 * NOTE(review): one statement ahead of page_cache_release() is missing
 * from this listing; comments elsewhere in the file say pages are
 * kunmapped in lustre_put_page(), so it is presumably a kunmap() - confirm
 * against the full source.
 */
1645 static inline void lustre_put_page(struct page *page)
1648 page_cache_release(page);
/*
 * lustre_get_page_read - fetch the page backing a remote buffer for reading.
 *
 * @inode: inode whose mapping is read
 * @rnb:   remote buffer; its offset selects the page index
 *
 * The page is obtained with read_cache_page(), using the mapping's own
 * readpage operation as the filler.  A page that is returned but is not
 * up to date, or that has its error flag set, is logged and turned into
 * -EIO through the err_page label; lustre_put_page() is the visible
 * cleanup call.
 *
 * NOTE(review): the statements between the IS_ERR test and the checks, the
 * label and the return are not visible in this listing.
 */
1652 static struct page *
1653 lustre_get_page_read(struct inode *inode, struct niobuf_remote *rnb)
1655 unsigned long index = rnb->offset >> PAGE_SHIFT;
1656 struct address_space *mapping = inode->i_mapping;
1660 page = read_cache_page(mapping, index,
1661 (filler_t*)mapping->a_ops->readpage, NULL);
1662 if (!IS_ERR(page)) {
1665 if (!PageUptodate(page)) {
1666 CERROR("page index %lu not uptodate\n", index);
1667 GOTO(err_page, rc = -EIO);
1669 if (PageError(page)) {
1670 CERROR("page index %lu has error\n", index);
1671 GOTO(err_page, rc = -EIO);
1677 lustre_put_page(page);
/*
 * lustre_get_page_write - get a locked page-cache page prepared for a
 * whole-page write.
 *
 * @inode: inode whose mapping is written
 * @index: page index within the mapping
 *
 * The page comes from grab_cache_page() and is then passed to the mapping
 * prepare_write operation for the range 0..PAGE_SIZE.  A prepare_write
 * failure is logged and leaves through err_unlock with that code; a page
 * with its error flag set leaves through err_unlock with -EIO.
 * lustre_put_page() is the visible cleanup call.
 *
 * NOTE(review): grab_cache_page() conventionally reports failure with NULL
 * rather than an ERR_PTR value; confirm that the IS_ERR() test is the
 * intended check.
 * NOTE(review): the error-path unlock, the labels and the return are not
 * visible in this listing.
 */
1681 static struct page *
1682 lustre_get_page_write(struct inode *inode, unsigned long index)
1684 struct address_space *mapping = inode->i_mapping;
1688 page = grab_cache_page(mapping, index); /* locked page */
1690 if (!IS_ERR(page)) {
1692 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
1693 * a no-op for most filesystems, because we write the whole
1694 * page. For partial-page I/O this will read in the page.
1696 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
1698 CERROR("page index %lu, rc = %d\n", index, rc);
1701 GOTO(err_unlock, rc);
1703 /* XXX not sure if we need this if we are overwriting page */
1704 if (PageError(page)) {
1705 CERROR("error on page index %lu, rc = %d\n", index, rc);
1707 GOTO(err_unlock, rc = -EIO);
1714 lustre_put_page(page);
1718 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
/*
 * waitfor_one_page - wait for a page to become unlocked.
 *
 * Defined here only under the kernel-version guard on the preceding line
 * (kernels newer than 2.5.0); it wraps wait_on_page_locked().
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
1719 int waitfor_one_page(struct page *page)
1721 wait_on_page_locked(page);
1726 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1727 /* We should only change the file mtime (and not the ctime, like
1728 * update_inode_times() in generic_file_write()) when we only change data.
/*
 * inode_update_time - stamp an inode with the current time.
 *
 * @inode:     inode to update
 * @ctime_too: when non-zero the change time is considered as well
 *
 * The first test covers the case where mtime (and ctime, if requested)
 * already equal the current time.  Otherwise mtime is set to now, ctime is
 * set to now, and the inode is marked dirty with mark_inode_dirty_sync().
 *
 * NOTE(review): the statement executed when the times already match, and
 * the condition guarding the ctime assignment, are not visible in this
 * listing; presumably an early return and a test of ctime_too - confirm.
 */
1730 static inline void inode_update_time(struct inode *inode, int ctime_too)
1732 time_t now = CURRENT_TIME;
1733 if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
1735 inode->i_mtime = now;
1737 inode->i_ctime = now;
1738 mark_inode_dirty_sync(inode);
/*
 * lustre_commit_write - finish a write to the page held in a local buffer.
 *
 * @lnb: local buffer; page, offset and len describe the written range
 *
 * The byte range inside the page is derived from lnb->offset and lnb->len
 * and asserted not to run past PAGE_SIZE.  The mapping commit_write
 * operation is called for that range; for an inode that is IS_SYNC the
 * function then waits on the page with waitfor_one_page().  Afterwards the
 * page is marked referenced and released with lustre_put_page().
 *
 * NOTE(review): the declaration of err, any page unlock and the return are
 * not visible in this listing.
 */
1742 static int lustre_commit_write(struct niobuf_local *lnb)
1744 struct page *page = lnb->page;
1745 unsigned from = lnb->offset & ~PAGE_MASK;
1746 unsigned to = from + lnb->len;
1747 struct inode *inode = page->mapping->host;
1750 LASSERT(to <= PAGE_SIZE);
1751 err = page->mapping->a_ops->commit_write(NULL, page, from, to);
1752 if (!err && IS_SYNC(inode))
1753 err = waitfor_one_page(page);
1754 //SetPageUptodate(page); // the client commit_write will do this
1756 SetPageReferenced(page);
1758 lustre_put_page(page);
/*
 * filter_get_page_write - get a page to receive incoming write data.
 *
 * @inode:    inode being written
 * @rnb:      remote buffer; its offset selects the page index and the
 *            start of the prepared range
 * @lnb:      local buffer; N_LOCAL_TEMP_PAGE is set in its flags when a
 *            temporary page is used
 * @pglocked: its use is not visible in this listing
 *
 * Two ways of obtaining the page-cache page are visible:
 * grab_cache_page_nowait() and grab_cache_page().  In the temporary-page
 * branch a free page is allocated with __get_free_pages(), filled with the
 * byte 0xBA, given the target index and flagged as N_LOCAL_TEMP_PAGE so it
 * can be recognised later.  In the other branch the mapping prepare_write
 * operation is called starting at the in-page offset of rnb; a failure or a
 * page with its error flag set leaves through err_unlock, and
 * lustre_put_page() is the visible cleanup call.
 *
 * NOTE(review): the conditions that choose between the two grab calls and
 * that enter the temporary-page branch, the labels and the return are not
 * visible in this listing.
 */
1762 struct page *filter_get_page_write(struct inode *inode,
1763 struct niobuf_remote *rnb,
1764 struct niobuf_local *lnb, int *pglocked)
1766 unsigned long index = rnb->offset >> PAGE_SHIFT;
1767 struct address_space *mapping = inode->i_mapping;
1772 //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
1774 page = grab_cache_page_nowait(mapping, index); /* locked page */
1776 page = grab_cache_page(mapping, index); /* locked page */
1779 /* This page is currently locked, so get a temporary page instead. */
1780 /* XXX I believe this is a very dangerous thing to do - consider if
1781 * we had multiple writers for the same file (definitely the case
1782 * if we are using this codepath). If writer A locks the page,
1783 * writer B writes to a copy (as here), writer A drops the page
1784 * lock, and writer C grabs the lock before B does, then B will
1785 * later overwrite the data from C, even if C had LDLM locked
1786 * and initiated the write after B did.
1790 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
1791 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
1793 CERROR("no memory for a temp page\n");
1795 GOTO(err, rc = -ENOMEM);
1798 memset((void *)addr, 0xBA, PAGE_SIZE);
1799 page = virt_to_page(addr);
1801 page->index = index;
1802 lnb->flags |= N_LOCAL_TEMP_PAGE;
1803 } else if (!IS_ERR(page)) {
1807 rc = mapping->a_ops->prepare_write(NULL, page,
1808 rnb->offset % PAGE_SIZE,
1811 CERROR("page index %lu, rc = %d\n", index, rc);
1814 GOTO(err_unlock, rc);
1816 /* XXX not sure if we need this if we are overwriting page */
1817 if (PageError(page)) {
1818 CERROR("error on page index %lu, rc = %d\n", index, rc);
1820 GOTO(err_unlock, rc = -EIO);
1827 lustre_put_page(page);
1833 * We need to balance prepare_write() calls with commit_write() calls.
1834 * If the page has been prepared, but we have no data for it, we don't
1835 * want to overwrite valid data on disk, but we still need to zero out
1836 * data for space which was newly allocated. Like part of what happens
1837 * in __block_prepare_write() for newly allocated blocks.
1839 * XXX currently __block_prepare_write() creates buffers for all the
1840 * pages, and the filesystems mark these buffers as BH_New if they
1841 * were newly allocated from disk. We use the BH_New flag similarly.
/*
 * filter_commit_write - commit a prepared page, with extra handling on
 * kernels older than 2.5.0.
 *
 * @lnb: local buffer whose page was prepared for writing
 * @err: error code from the caller; it is reported in the debug message
 *
 * In the version-guarded section the function walks the buffer heads
 * attached to the page in steps of the buffer size and contains a memset()
 * that zeroes one block of lnb->addr.  In all cases it ends by returning
 * lustre_commit_write(lnb).
 *
 * NOTE(review): the conditions guarding the error message and the memset
 * are not visible in this listing; the comment ahead of the function
 * suggests only newly allocated buffers are zeroed - confirm against the
 * full source.
 */
1843 static int filter_commit_write(struct niobuf_local *lnb, int err)
1845 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1847 unsigned block_start, block_end;
1848 struct buffer_head *bh, *head = lnb->page->buffers;
1849 unsigned blocksize = head->b_size;
1851 /* debugging: just seeing if this ever happens */
1852 CERROR("called filter_commit_write for ino %lu:%lu on err %d\n",
1853 lnb->page->mapping->host->i_ino, lnb->page->index, err);
1855 /* Currently one buffer per page, but in the future... */
1856 for (bh = head, block_start = 0; bh != head || !block_start;
1857 block_start = block_end, bh = bh->b_this_page) {
1858 block_end = block_start + blocksize;
1860 memset(lnb->addr + block_start, 0, blocksize);
1864 return lustre_commit_write(lnb);
/*
 * filter_preprw - first half of a bulk read or write: look up the objects
 * and pin one page per buffer.
 *
 * @cmd:          OBD_BRW_WRITE selects the write path, otherwise read
 * @conn:         client connection
 * @objcount:     number of entries in obj (asserted to be below 16)
 * @obj:          per-object descriptors (id and buffer count)
 * @niocount:     total number of buffers in nb and res
 * @nb:           remote buffer descriptors (offset, len)
 * @res:          local buffer array, zeroed here and filled per buffer
 * @desc_private: receives the handle from fsfilt_brw_start() for writes
 * @oti:          transaction info used on the write error path
 *
 * Visible sequence:
 *  - bump the read or write request counter and zero the res array;
 *  - allocate one fsfilt_objinfo per object, push the filter run context
 *    and take the S_IFREG parent directory;
 *  - first loop: look each object up with filter_fid2dentry(), remember
 *    the dentry and buffer count, fail with -ENOENT for an object without
 *    an inode, and emit a debug message for I/O to an unopened object;
 *  - for writes: call filter_start_transno() and fsfilt_brw_start();
 *  - reserve kmap slots with obd_kmap_get();
 *  - second loop: for every buffer store a dentry in the local buffer, get
 *    the page with filter_get_page_write() or lustre_get_page_read(), bump
 *    the byte counters and record address, offset and length;
 *  - free the objinfo array, clear current->journal_info and pop the run
 *    context.
 * The cleanup code at the end walks the local buffers already set up in
 * reverse, committing or releasing each page and dropping its dentry, then
 * releases the kmap slots.  It also drops the dentries recorded in the
 * objinfo array and, for writes, finishes the transno and commits the
 * handle.
 *
 * NOTE(review): labels, several conditions and the return are not visible
 * in this listing, so the exact control flow between the cleanup sections
 * cannot be confirmed from here.
 */
1867 static int filter_preprw(int cmd, struct lustre_handle *conn,
1868 int objcount, struct obd_ioobj *obj,
1869 int niocount, struct niobuf_remote *nb,
1870 struct niobuf_local *res, void **desc_private,
1871 struct obd_trans_info *oti)
1873 struct obd_run_ctxt saved;
1874 struct obd_export *export;
1875 struct obd_device *obd;
1876 struct obd_ioobj *o;
1877 struct niobuf_remote *rnb = nb;
1878 struct niobuf_local *lnb = res;
1879 struct dentry *dir_dentry;
1880 struct fsfilt_objinfo *fso;
1886 if ((cmd & OBD_BRW_WRITE) != 0)
1887 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
1889 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
1891 memset(res, 0, niocount * sizeof(*res));
1893 export = class_conn2export(conn);
1894 obd = class_conn2obd(conn);
1896 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
1900 LASSERT(objcount < 16); // theoretically we support multi-obj BRW
1902 OBD_ALLOC(fso, objcount * sizeof(*fso));
1906 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
1907 dir_dentry = filter_parent(obd, S_IFREG);
1909 for (i = 0, o = obj; i < objcount; i++, o++) {
1910 struct filter_dentry_data *fdd;
1911 struct dentry *dentry;
1913 LASSERT(o->ioo_bufcnt);
1915 dentry = filter_fid2dentry(obd, dir_dentry, o->ioo_id, 0);
1918 GOTO(out_objinfo, rc = PTR_ERR(dentry));
1920 fso[i].fso_dentry = dentry;
1921 fso[i].fso_bufcnt = o->ioo_bufcnt;
1923 if (!dentry->d_inode) {
1924 CERROR("trying to BRW to non-existent file "LPU64"\n",
1926 GOTO(out_objinfo, rc = -ENOENT);
1929 fdd = dentry->d_fsdata;
1930 if (!fdd || !atomic_read(&fdd->fdd_open_count))
1931 CDEBUG(D_PAGE, "I/O to unopened object "LPX64"\n",
1935 if (cmd & OBD_BRW_WRITE) {
1936 #warning "FIXME: we need to get inode->i_sem for each object here"
1937 /* Even worse, we need to get locks on mulitple inodes (in
1938 * order) or use the DLM to do the locking for us (and use
1939 * the same locking in filter_setattr() for truncate. The
1940 * handling gets very ugly when dealing with locked pages.
1941 * It may be easier to just get rid of the locked page code
1942 * (which has problems of its own) and either discover we do
1943 * not need it anymore (i.e. it was a symptom of another bug)
1944 * or ensure we get the page locks in an appropriate order.
1946 /* Danger, Will Robinson! You are taking a lock here and also
1947 * starting a transaction and releasing/finishing then in
1948 * filter_commitrw(), so you must call fsfilt_commit() and
1949 * finish_transno() if an error occurs in this function.
1951 filter_start_transno(export);
1952 *desc_private = fsfilt_brw_start(obd, objcount, fso,
1954 if (IS_ERR(*desc_private))
1955 GOTO(out_objinfo, rc = PTR_ERR(*desc_private));
1958 obd_kmap_get(niocount, 1);
1960 for (i = 0, o = obj; i < objcount; i++, o++) {
1961 struct dentry *dentry;
1962 struct inode *inode;
1965 dentry = fso[i].fso_dentry;
1966 inode = dentry->d_inode;
1968 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
1972 lnb->dentry = dentry;
1974 lnb->dentry = dget(dentry);
1976 if (cmd & OBD_BRW_WRITE) {
1977 page = filter_get_page_write(inode, rnb, lnb,
1980 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_bytes,
1983 page = lustre_get_page_read(inode, rnb);
1985 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_bytes,
1992 GOTO(out_pages, rc);
1995 lnb->addr = page_address(page);
1996 lnb->offset = rnb->offset;
1998 lnb->len = rnb->len;
2004 OBD_FREE(fso, objcount * sizeof(*fso));
2005 current->journal_info = NULL;
2006 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2010 while (lnb-- > res) {
2011 CERROR("%d error cleanup on brw\n", rc);
2012 if (cmd & OBD_BRW_WRITE)
2013 filter_commit_write(lnb, rc);
2015 lustre_put_page(lnb->page);
2016 f_dput(lnb->dentry);
2018 obd_kmap_put(niocount);
2019 goto out_err; /* dropped the dentry refs already (one per page) */
2022 for (i = 0; i < objcount && fso[i].fso_dentry; i++)
2023 f_dput(fso[i].fso_dentry);
2025 if (cmd & OBD_BRW_WRITE) {
2026 filter_finish_transno(export, *desc_private, oti, rc);
2027 fsfilt_commit(obd, dir_dentry->d_inode, *desc_private);
/*
 * filter_write_locked_page - copy data held in a temporary page into the
 * real page-cache page and commit it.
 *
 * @lnb: local buffer whose page is the temporary copy
 *
 * The real page is obtained with lustre_get_page_write() for the same
 * inode and index.  If that fails the error is logged and
 * lustre_commit_write() is still called on lnb.  Otherwise a full
 * PAGE_SIZE is copied from the temporary page to the real one, the
 * temporary page is released with lustre_put_page(), and the write is
 * committed with lustre_commit_write(); a commit error is logged.
 *
 * NOTE(review): the statement that makes lnb refer to the real page before
 * the final commit, and the return, are not visible in this listing -
 * confirm against the full source.
 */
2032 static int filter_write_locked_page(struct niobuf_local *lnb)
2038 lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2039 if (IS_ERR(lpage)) {
2040 /* It is highly unlikely that we would ever get an error here.
2041 * The page we want to get was previously locked, so it had to
2042 * have already allocated the space, and we were just writing
2043 * over the same data, so there would be no hole in the file.
2045 * XXX: possibility of a race with truncate could exist, need
2046 * to check that. There are no guarantees w.r.t.
2047 * write order even on a local filesystem, although the
2048 * normal response would be to return the number of bytes
2049 * successfully written and leave the rest to the app.
2051 rc = PTR_ERR(lpage);
2052 CERROR("error getting locked page index %ld: rc = %d\n",
2053 lnb->page->index, rc);
2055 lustre_commit_write(lnb);
2059 /* lpage is kmapped in lustre_get_page_write() above and kunmapped in
2060 * lustre_commit_write() below, lnb->page was kmapped previously in
2061 * filter_get_page_write() and kunmapped in lustre_put_page() below.
2063 memcpy(page_address(lpage), page_address(lnb->page), PAGE_SIZE);
2064 lustre_put_page(lnb->page);
2067 rc = lustre_commit_write(lnb);
2069 CERROR("error committing locked page %ld: rc = %d\n",
2070 lnb->page->index, rc);
/*
 * filter_sync - sync the filter backing filesystem.
 *
 * Returns the result of fsfilt_sync() for the superblock stored in the
 * filter device (fo_sb).
 */
2075 static int filter_sync(struct obd_device *obd)
2077 RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
/*
 * filter_commitrw - second half of a bulk read or write: commit or release
 * the pages pinned by filter_preprw().
 *
 * @cmd:          OBD_BRW_WRITE selects the write path, otherwise read
 * @conn:         client connection
 * @objcount:     number of entries in obj
 * @obj:          per-object descriptors (buffer counts)
 * @niocount:     total number of local buffers (not referenced in the
 *                visible lines)
 * @res:          local buffers filled by filter_preprw()
 * @desc_private: handle from filter_preprw(), installed as
 *                current->journal_info for the duration of the call
 * @oti:          transaction info handed to filter_finish_transno()
 *
 * Visible sequence:
 *  - push the filter run context, assert that no journal handle is
 *    installed and install desc_private;
 *  - first pass: per object, update the inode times for writes; per
 *    buffer, test for N_LOCAL_TEMP_PAGE, commit with filter_commit_write()
 *    for writes, release with lustre_put_page(), and drop the dentry;
 *  - second pass, run only while found_locked is positive: call
 *    filter_write_locked_page() for buffers flagged N_LOCAL_TEMP_PAGE and
 *    drop their dentries;
 *  - for writes: finish the transno and commit the handle; the
 *    obd_sync_filter setting is then tested;
 *  - assert that journal_info is clear again and pop the run context.
 *
 * NOTE(review): how found_locked is updated, the bodies of several
 * branches and the return are not visible in this listing.
 */
2080 static int filter_commitrw(int cmd, struct lustre_handle *conn,
2081 int objcount, struct obd_ioobj *obj,
2082 int niocount, struct niobuf_local *res,
2083 void *desc_private, struct obd_trans_info *oti)
2085 struct obd_run_ctxt saved;
2086 struct obd_ioobj *o;
2087 struct niobuf_local *lnb;
2088 struct obd_export *export = class_conn2export(conn);
2089 struct obd_device *obd = class_conn2obd(conn);
2090 int found_locked = 0;
2095 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2097 LASSERT(!current->journal_info);
2098 current->journal_info = desc_private;
2100 for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2103 if (cmd & OBD_BRW_WRITE)
2104 inode_update_time(lnb->dentry->d_inode, 1);
2105 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2106 if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2111 if (cmd & OBD_BRW_WRITE) {
2112 int err = filter_commit_write(lnb, 0);
2117 lustre_put_page(lnb->page);
2120 f_dput(lnb->dentry);
2124 for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2127 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2129 if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2132 err = filter_write_locked_page(lnb);
2136 f_dput(lnb->dentry);
2141 if (cmd & OBD_BRW_WRITE) {
2143 struct dentry *dir_dentry = filter_parent(obd, S_IFREG);
2145 rc = filter_finish_transno(export, desc_private, oti, rc);
2146 err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
2149 if (obd_sync_filter) {
2150 /* this can fail with ENOMEM, what should we do then? */
2153 /* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
2156 LASSERT(!current->journal_info);
2158 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/*
 * filter_brw - local bulk read or write of a page array on one object.
 *
 * @cmd:     OBD_BRW_WRITE copies caller pages into the object, otherwise
 *           object data is copied into the caller pages
 * @conn:    client connection
 * @lsm:     supplies the object id
 * @oa_bufs: number of entries in pga
 * @pga:     caller pages with offset and byte count
 * @set:     not referenced in the visible lines
 * @oti:     transaction info passed to filter_preprw()
 *
 * Two arrays of oa_bufs entries (local and remote buffer descriptors) are
 * allocated; if either allocation fails the function leaves through out
 * with -ENOMEM.  The remote descriptors are filled from pga, a single
 * obd_ioobj is built for a regular file with oa_bufs buffers, and
 * filter_preprw() pins the pages.  Each caller page is then kmapped and
 * pga[i].count bytes are copied at the in-page offset in the direction
 * selected by cmd.  filter_commitrw() completes the operation and both
 * arrays are freed.
 *
 * NOTE(review): the check of the filter_preprw() result, the kunmap of the
 * caller pages, any NULL guards around the frees and the return are not
 * visible in this listing.
 */
2162 static int filter_brw(int cmd, struct lustre_handle *conn,
2163 struct lov_stripe_md *lsm, obd_count oa_bufs,
2164 struct brw_page *pga, struct obd_brw_set *set,
2165 struct obd_trans_info *oti)
2167 struct obd_ioobj ioo;
2168 struct niobuf_local *lnb;
2169 struct niobuf_remote *rnb;
2175 OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2176 OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2178 if (lnb == NULL || rnb == NULL)
2179 GOTO(out, ret = -ENOMEM);
2181 for (i = 0; i < oa_bufs; i++) {
2182 rnb[i].offset = pga[i].off;
2183 rnb[i].len = pga[i].count;
2186 ioo.ioo_id = lsm->lsm_object_id;
2188 ioo.ioo_type = S_IFREG;
2189 ioo.ioo_bufcnt = oa_bufs;
2191 ret = filter_preprw(cmd, conn, 1, &ioo, oa_bufs, rnb, lnb,
2192 &desc_private, oti);
2196 for (i = 0; i < oa_bufs; i++) {
2197 void *virt = kmap(pga[i].pg);
2198 obd_off off = pga[i].off & ~PAGE_MASK;
2200 if (cmd & OBD_BRW_WRITE)
2201 memcpy(lnb[i].addr + off, virt + off, pga[i].count);
2203 memcpy(virt + off, lnb[i].addr + off, pga[i].count);
2208 ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private,
2213 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2215 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
/*
 * filter_statfs - report filesystem statistics for the filter device.
 *
 * @conn: client connection, resolved to the obd device
 * @osfs: destination for the statistics
 *
 * Bumps the statfs request counter and returns the result of
 * fsfilt_statfs() for the superblock stored in the filter device.
 *
 * NOTE(review): no validity check on the resolved device is visible in
 * this listing before it is dereferenced.
 */
2219 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2221 struct obd_device *obd;
2224 obd = class_conn2obd(conn);
2226 XPROCFS_BUMP_MYCPU_IOSTAT (st_statfs_reqs, 1);
2228 RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
/*
 * filter_get_info - answer a key/value query about the filter device.
 *
 * @conn:   client connection
 * @keylen: length of key in bytes
 * @key:    key bytes, compared with memcmp() so no terminator is needed
 * @vallen: set to sizeof(long) for the keys handled here
 * @val:    receives the value itself, cast into the pointer; no memory is
 *          allocated by the visible lines
 *
 * Keys handled in the visible lines: "blocksize" (superblock s_blocksize)
 * and "blocksize_bits" (superblock s_blocksize_bits).  Any other key is
 * reported with an "invalid key" debug message.
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
2231 static int filter_get_info(struct lustre_handle *conn, obd_count keylen,
2232 void *key, obd_count *vallen, void **val)
2234 struct obd_device *obd;
2237 obd = class_conn2obd(conn);
2239 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2243 if ( keylen == strlen("blocksize") &&
2244 memcmp(key, "blocksize", keylen) == 0 ) {
2245 *vallen = sizeof(long);
2246 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize;
2250 if ( keylen == strlen("blocksize_bits") &&
2251 memcmp(key, "blocksize_bits", keylen) == 0 ){
2252 *vallen = sizeof(long);
2253 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize_bits;
2257 CDEBUG(D_IOCTL, "invalid key\n");
/*
 * filter_copy_data - copy the contents of one object to another, one page
 * at a time.
 *
 * @dst_conn: connection used for the writes
 * @dst:      destination object; size and blocks are updated at the end
 * @src_conn: connection used for the reads
 * @src:      source object; o_size bounds the copy
 * @count:    not referenced in the visible lines
 * @offset:   not referenced in the visible lines
 * @oti:      transaction info passed to the write obd_brw() call
 *
 * Two zeroed stripe descriptors are set up with the source and destination
 * object ids and a single page is allocated as the bounce buffer.  The
 * loop runs once per page of src->o_size: each pass creates a brw set,
 * reads the page from the source with obd_brw(OBD_BRW_READ), frees the
 * set, then repeats the same steps with OBD_BRW_WRITE and the
 * OBD_BRW_CREATE flag for the destination.  After the loop the size and
 * block count of src are copied to dst and flagged valid.
 *
 * NOTE(review): in the visible lines pg.off is derived from page->index
 * before page->index is assigned the loop index for that pass; confirm the
 * intended order against the full source.
 * NOTE(review): error checks, the loop increment, the page release and the
 * return are not visible in this listing.
 */
2261 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2262 struct lustre_handle *src_conn, struct obdo *src,
2263 obd_size count, obd_off offset, struct obd_trans_info *oti)
2266 struct lov_stripe_md srcmd, dstmd;
2267 unsigned long index = 0;
2270 memset(&srcmd, 0, sizeof(srcmd));
2271 memset(&dstmd, 0, sizeof(dstmd));
2272 srcmd.lsm_object_id = src->o_id;
2273 dstmd.lsm_object_id = dst->o_id;
2276 CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2277 ", dst: ino "LPU64"\n",
2278 src->o_id, src->o_blocks, src->o_size, dst->o_id);
2279 page = alloc_page(GFP_USER);
2283 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2284 while (TryLockPage(page))
2285 ___wait_on_page(page);
2287 wait_on_page_locked(page);
2290 /* XXX with brw vector I/O, we could batch up reads and writes here,
2291 * all we need to do is allocate multiple pages to handle the I/Os
2292 * and arrays to handle the request parameters.
2294 while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2296 struct obd_brw_set *set;
2298 set = obd_brw_set_new();
2306 pg.count = PAGE_SIZE;
2307 pg.off = (page->index) << PAGE_SHIFT;
2310 page->index = index;
2311 set->brw_callback = ll_brw_sync_wait;
2312 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
2313 obd_brw_set_free(set);
2319 set = obd_brw_set_new();
2325 pg.flag = OBD_BRW_CREATE;
2326 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2328 set->brw_callback = ll_brw_sync_wait;
2329 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
2330 obd_brw_set_free(set);
2332 /* XXX should handle dst->o_size, dst->o_blocks here */
2338 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2342 dst->o_size = src->o_size;
2343 dst->o_blocks = src->o_blocks;
2344 dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
/*
 * Method table for the filter device; it is passed to
 * class_register_type() in obdfilter_init().  Each o_* slot is bound to the
 * filter_* function of the same purpose; o_punch is served by
 * filter_truncate().
 *
 * NOTE(review): in this listing o_commitrw has no trailing comma although
 * further initializers follow it; presumably a preprocessor conditional
 * that is not visible here separates them - confirm against the full
 * source.
 */
2351 static struct obd_ops filter_obd_ops = {
2352 o_owner: THIS_MODULE,
2353 o_attach: filter_attach,
2354 o_detach: filter_detach,
2355 o_get_info: filter_get_info,
2356 o_setup: filter_setup,
2357 o_cleanup: filter_cleanup,
2358 o_connect: filter_connect,
2359 o_disconnect: filter_disconnect,
2360 o_statfs: filter_statfs,
2361 o_getattr: filter_getattr,
2362 o_create: filter_create,
2363 o_setattr: filter_setattr,
2364 o_destroy: filter_destroy,
2365 o_open: filter_open,
2366 o_close: filter_close,
2368 o_punch: filter_truncate,
2369 o_preprw: filter_preprw,
2370 o_commitrw: filter_commitrw
2372 o_preallocate: filter_preallocate_inodes,
2373 o_migrate: filter_migrate,
2374 o_copy: filter_copy_data,
2375 o_iterate: filter_iterate
/*
 * obdfilter_init - module load entry point.
 *
 * Prints a banner, creates the two slab caches used by this file
 * (per-open file data and per-dentry data), initialises the xprocfs
 * statistics and the lprocfs variables, and registers the filter device
 * type with class_register_type().  If the dentry cache cannot be created
 * the already created open cache is destroyed.
 *
 * NOTE(review): the result of class_register_type() is returned directly,
 * so the visible lines contain no cleanup of the two caches when
 * registration fails - confirm whether that is handled elsewhere.
 * NOTE(review): the trailing arguments of the cache creation calls and the
 * error returns are not visible in this listing.
 */
2380 static int __init obdfilter_init(void)
2382 struct lprocfs_static_vars lvars;
2384 printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2385 filter_open_cache = kmem_cache_create("ll_filter_fdata",
2386 sizeof(struct filter_file_data),
2388 if (!filter_open_cache)
2391 filter_dentry_cache = kmem_cache_create("ll_filter_dentry",
2392 sizeof(struct filter_dentry_data),
2394 if (!filter_dentry_cache) {
2395 kmem_cache_destroy(filter_open_cache);
2399 xprocfs_init ("filter");
2401 lprocfs_init_vars(&lvars);
2402 return class_register_type(&filter_obd_ops, lvars.module_vars,
2403 OBD_FILTER_DEVICENAME);
/*
 * obdfilter_exit - module unload entry point.
 *
 * Unregisters the filter device type and destroys both slab caches.  A
 * non-zero result from kmem_cache_destroy() is logged as an error for the
 * cache concerned.
 *
 * NOTE(review): some lines of the function may be missing from this
 * listing; only the visible statements are described.
 */
2406 static void __exit obdfilter_exit(void)
2408 class_unregister_type(OBD_FILTER_DEVICENAME);
2409 if (kmem_cache_destroy(filter_dentry_cache))
2410 CERROR("couldn't free obdfilter dentry cache\n");
2411 if (kmem_cache_destroy(filter_open_cache))
2412 CERROR("couldn't free obdfilter open cache\n");
/*
 * Module metadata (author, description, licence) and registration of
 * obdfilter_init() and obdfilter_exit() as the load and unload entry
 * points.
 */
2416 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2417 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2418 MODULE_LICENSE("GPL");
2420 module_init(obdfilter_init);
2421 module_exit(obdfilter_exit);