1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28 * (which need to get journal_lock, may block if journal full).
30 * Invariant: Call filter_start_transno() before any journal ops to avoid the
31 * same deadlock problem. We can (and want) to get rid of the
32 * transno sem in favour of the dir/inode i_sem to avoid single
33 * threaded operation on the OST.
37 #define DEBUG_SUBSYSTEM S_FILTER
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
/* Symbolic constants for the read/write request and byte counters; the names
 * suggest lprocfs counter indices (TODO confirm against their users).
 * LPROC_FILTER_LAST is one past LPROC_FILTER_WRITE_BYTES, i.e. 4.
 * NOTE(review): the line that opens this list and its closing brace are not
 * visible in this excerpt. */
57 LPROC_FILTER_READS = 0,
58 LPROC_FILTER_READ_BYTES = 1,
59 LPROC_FILTER_WRITES = 2,
60 LPROC_FILTER_WRITE_BYTES = 3,
61 LPROC_FILTER_LAST = LPROC_FILTER_WRITE_BYTES +1
/* I/O statistics record; one instance per CPU lives in xprocfs_iostats[].
 * Only the getattr/setattr/destroy request counters are visible here; the
 * DECLARE_XPROCFS_SUM_STAT() instantiations later in the file reference
 * further members (st_read_bytes, st_write_reqs, ...) whose declarations are
 * elided from this excerpt. */
64 /* should be generic per-obd stats... */
65 struct xprocfs_io_stat {
70 __u64 st_getattr_reqs;
71 __u64 st_setattr_reqs;
73 __u64 st_destroy_reqs;
/* One statistics slot per possible CPU (NR_CPUS); a slot is selected with
 * smp_processor_id() by XPROCFS_BUMP_MYCPU_IOSTAT(). */
81 static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
/* procfs directory created by xprocfs_init() with proc_mkdir(); NULL until
 * then (static storage) or when proc_mkdir() failed. */
82 static struct proc_dir_entry *xprocfs_dir;
/* Add (count) to member `field` of the calling CPU's xprocfs_iostats slot.
 * NOTE(review): the macro's other continuation lines are elided here. */
84 #define XPROCFS_BUMP_MYCPU_IOSTAT(field, count) \
86 xprocfs_iostats[smp_processor_id()].field += (count); \
/* Kernels older than 2.5.0 only: DECLARE_XPROCFS_SUM_STAT(field) expands to a
 * function xprocfs_sum_FIELD(void) that adds up `field` over the
 * xprocfs_iostats slots of CPUs 0 .. smp_num_cpus-1.  The return type and
 * the accumulator declaration are on elided lines; xprocfs_add_stat() takes
 * these functions as long long (*)(void).  One summing function is
 * instantiated per statistic below. */
89 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
90 #define DECLARE_XPROCFS_SUM_STAT(field) \
92 xprocfs_sum_##field (void) \
97 for (i = 0; i < smp_num_cpus; i++) \
98 stat += xprocfs_iostats[i].field; \
102 DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
103 DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
104 DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
105 DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
106 DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
107 DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
108 DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
109 DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
110 DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
111 DECLARE_XPROCFS_SUM_STAT (st_syncfs_reqs)
112 DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
113 DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
114 DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
/* procfs read handler for one statistic.  `data` carries the summing
 * function (cast back to long long (*)(void)); its result is formatted as a
 * decimal number plus newline into `page`, limited to `count` bytes.
 * NOTE(review): the handling of start/off/eof and the return statement are
 * not visible in this excerpt. */
118 xprocfs_rd_stat (char *page, char **start, off_t off, int count,
119 int *eof, void *data)
121 long long (*fn)(void) = (long long(*)(void))data;
128 len = snprintf (page, count, "%Ld\n", fn());
/* Create a regular file `name`, readable by user/group/other, under
 * xprocfs_dir.  Reads are served by xprocfs_rd_stat(); there is no write
 * handler.  A CERROR is logged when the entry cannot be added (the failing
 * condition itself is on an elided line).
 * NOTE(review): where `fn` is attached to the entry is not visible here;
 * xprocfs_rd_stat() expects it in its `data` argument. */
135 xprocfs_add_stat(char *name, long long (*fn)(void))
137 struct proc_dir_entry *entry;
139 entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
141 CERROR ("Can't add procfs stat %s\n", name);
146 entry->read_proc = xprocfs_rd_stat;
147 entry->write_proc = NULL;
/* Create the procfs directory "sys/NAME" (NAME being the `name` argument)
 * and, on kernels older than 2.5.0, populate it with one file per summed
 * statistic.  A CERROR is logged if the directory cannot be created; what
 * happens after the log is on an elided line. */
151 xprocfs_init (char *name)
155 snprintf (dirname, sizeof (dirname), "sys/%s", name);
157 xprocfs_dir = proc_mkdir (dirname, NULL);
158 if (xprocfs_dir == NULL) {
159 CERROR ("Can't make procfs dir %s\n", dirname);
163 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
164 xprocfs_add_stat ("read_bytes", xprocfs_sum_st_read_bytes);
165 xprocfs_add_stat ("read_reqs", xprocfs_sum_st_read_reqs);
166 xprocfs_add_stat ("write_bytes", xprocfs_sum_st_write_bytes);
167 xprocfs_add_stat ("write_reqs", xprocfs_sum_st_write_reqs);
168 xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
169 xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
170 xprocfs_add_stat ("create_reqs", xprocfs_sum_st_create_reqs);
171 xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
172 xprocfs_add_stat ("statfs_reqs", xprocfs_sum_st_statfs_reqs);
173 xprocfs_add_stat ("syncfs_reqs", xprocfs_sum_st_syncfs_reqs);
174 xprocfs_add_stat ("open_reqs", xprocfs_sum_st_open_reqs);
175 xprocfs_add_stat ("close_reqs", xprocfs_sum_st_close_reqs);
176 xprocfs_add_stat ("punch_reqs", xprocfs_sum_st_punch_reqs);
/* Undo xprocfs_init(): remove the thirteen statistic files and then the
 * directory itself from its parent.  The NULL test on xprocfs_dir at the top
 * has its action on an elided line (presumably an early return). */
180 void xprocfs_fini (void)
182 if (xprocfs_dir == NULL)
185 remove_proc_entry ("read_bytes", xprocfs_dir);
186 remove_proc_entry ("read_reqs", xprocfs_dir);
187 remove_proc_entry ("write_bytes", xprocfs_dir);
188 remove_proc_entry ("write_reqs", xprocfs_dir);
189 remove_proc_entry ("getattr_reqs", xprocfs_dir);
190 remove_proc_entry ("setattr_reqs", xprocfs_dir);
191 remove_proc_entry ("create_reqs", xprocfs_dir);
192 remove_proc_entry ("destroy_reqs", xprocfs_dir);
193 remove_proc_entry ("statfs_reqs", xprocfs_dir);
194 remove_proc_entry ("syncfs_reqs", xprocfs_dir);
195 remove_proc_entry ("open_reqs", xprocfs_dir);
196 remove_proc_entry ("close_reqs", xprocfs_dir);
197 remove_proc_entry ("punch_reqs", xprocfs_dir);
199 remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
/* One-letter object type names indexed by (mode & S_IFMT) >> S_SHIFT.
 * The initializers use the old GCC "[index] value" designator form (no '=').
 * As far as the visible initializers go, indices that are not listed stay
 * NULL because the array has static storage. */
204 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
206 [S_IFREG >> S_SHIFT] "R",
207 [S_IFDIR >> S_SHIFT] "D",
208 [S_IFCHR >> S_SHIFT] "C",
209 [S_IFBLK >> S_SHIFT] "B",
210 [S_IFIFO >> S_SHIFT] "F",
211 [S_IFSOCK >> S_SHIFT] "S",
212 [S_IFLNK >> S_SHIFT] "L"
/* Return the obd_type_by_mode[] entry for the file-type bits of `mode`,
 * i.e. the one-letter type name (possibly NULL for an unlisted type). */
215 static inline const char *obd_mode_to_type(int mode)
217 return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
/* Take one reference on a filter_file_data and log the new count.  The
 * argument is an untyped pointer because this function is handed to
 * class_handle_hash() as the addref callback (see filter_ffd_new()). */
220 static void filter_ffd_addref(void *ffdp)
222 struct filter_file_data *ffd = ffdp;
224 atomic_inc(&ffd->ffd_refcount);
225 CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
226 atomic_read(&ffd->ffd_refcount));
/* Allocate a filter_file_data, start its refcount at 2 and register its
 * handle with class_handle_hash(), using filter_ffd_addref() as the addref
 * callback.  "out of memory" is logged on allocation failure; the failure
 * test and the return statements are on elided lines.
 * NOTE(review): presumably one reference belongs to the handle hash and one
 * to the caller -- confirm. */
229 static struct filter_file_data *filter_ffd_new(void)
231 struct filter_file_data *ffd;
233 OBD_ALLOC(ffd, sizeof *ffd);
235 CERROR("out of memory\n");
239 atomic_set(&ffd->ffd_refcount, 2);
241 INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
242 class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
/* Look up the filter_file_data registered under handle->cookie.
 * `handle` must not be NULL (asserted), and the object found must be the
 * private_data of its own ffd_file (asserted).  Any NULL check on the lookup
 * result and the return statement are on elided lines. */
247 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
249 struct filter_file_data *ffd = NULL;
251 LASSERT(handle != NULL);
252 ffd = class_handle2object(handle->cookie);
254 LASSERT(ffd->ffd_file->private_data == ffd);
/* Drop one reference.  The count is first checked to be positive and below
 * 0x5a5a (presumably a freed-memory poison pattern -- confirm).  When the
 * decrement reaches zero the handle must already be off its list, and the
 * structure is freed. */
258 static void filter_ffd_put(struct filter_file_data *ffd)
260 CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
261 atomic_read(&ffd->ffd_refcount) - 1);
262 LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
263 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
264 if (atomic_dec_and_test(&ffd->ffd_refcount)) {
265 LASSERT(list_empty(&ffd->ffd_handle.h_link));
266 OBD_FREE(ffd, sizeof *ffd);
/* Remove the ffd's handle from the handle hash via class_handle_unhash().
 * NOTE(review): any statements that follow are elided from this excerpt. */
270 static void filter_ffd_destroy(struct filter_file_data *ffd)
272 class_handle_unhash(&ffd->ffd_handle);
/* Commit callback passed to fsfilt_set_last_rcvd() by
 * filter_finish_transno(); forwards to obd_transno_commit_cb(). */
276 static void filter_commit_cb(struct obd_device *obd, __u64 transno, int error)
278 obd_transno_commit_cb(obd, transno, error);
/* Assign a transaction number to a request and record it in the client's
 * data block of the last_rcvd file.
 *
 *   export: export the request arrived on (gives device and client data)
 *   handle: filesystem transaction handle, passed to fsfilt_set_last_rcvd()
 *   oti:    receives the transno in oti_transno; supplies oti_xid
 *   rc:     result so far ("Propagate error code" -- the action is elided)
 *
 * Visible checks before any work: device not replayable, and replayed
 * request (rq_level == LUSTRE_CONN_RECOVD); their actions are elided.
 * Then fsd_last_rcvd is read and incremented under fo_translock, the value
 * read becomes the transno, and the client data block is written to the
 * last_rcvd file (presumably at fed_lr_off, which is loaded into `off`).
 * A write of unexpected length is logged; the return value is elided. */
280 /* Assumes caller has already pushed us into the kernel context. */
281 int filter_finish_transno(struct obd_export *export, void *handle,
282 struct obd_trans_info *oti, int rc)
285 struct obd_device *obd = export->exp_obd;
286 struct filter_obd *filter = &obd->u.filter;
287 struct filter_export_data *fed = &export->exp_filter_data;
288 struct filter_client_data *fcd = fed->fed_fcd;
292 /* Propagate error code. */
296 if (!obd->obd_replayable)
299 /* we don't allocate new transnos for replayed requests */
301 /* perhaps if transno already set? or should level be in oti? */
302 if (req->rq_level == LUSTRE_CONN_RECOVD)
306 off = fed->fed_lr_off;
308 spin_lock(&filter->fo_translock);
309 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
310 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
311 spin_unlock(&filter->fo_translock);
313 oti->oti_transno = last_rcvd;
314 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
315 fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
317 /* get this from oti */
320 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
323 fcd->fcd_last_xid = 0;
325 fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
326 written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
328 CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
329 LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
331 if (written == sizeof(*fcd))
333 CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
/* Format the path of object `id` into buf:
 *   O/TYPE/ID      for non-regular files, or when fo_subdir_count is 0
 *   O/TYPE/dN/ID   otherwise, with N = id & (fo_subdir_count - 1)
 * TYPE is the letter from obd_mode_to_type().  The mask only spreads ids
 * evenly if fo_subdir_count is a power of two (TODO confirm).
 * NOTE(review): sprintf() is unbounded, so buf must be large enough; the
 * rest of the parameter list and the return statement are elided. */
340 /* write the pathname into the string */
341 static char *filter_id(char *buf, struct filter_obd *filter, obd_id id,
344 if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
345 sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
347 sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode),
348 (int)id & (filter->fo_subdir_count - 1), id);
/* Debug-logging wrapper used when dropping a dentry reference: logs the
 * dentry name and what the count will be after the drop, and asserts that
 * the count is currently positive.  The put call itself is on an elided
 * line. */
353 static inline void f_dput(struct dentry *dentry)
355 /* Can't go inside filter_ddelete because it can block */
356 CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
357 dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
358 LASSERT(atomic_read(&dentry->d_count) > 0);
/* d_release hook (installed through filter_dops): free the
 * filter_dentry_data attached to d_fsdata, if there is one. */
363 /* Not racy w.r.t. others, because we are the only user of this dentry */
364 static void filter_drelease(struct dentry *dentry)
366 if (dentry->d_fsdata)
367 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
/* Dentry operations that filter_obj_open() installs on object dentries;
 * d_release is the only hook set among the visible lines. */
370 struct dentry_operations filter_dops = {
371 .d_release = filter_drelease,
/* Name of the recovery file that filter_prep() opens or creates. */
374 #define LAST_RCVD "last_rcvd"
377 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
378 #define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
/* NOTE(review): this divides a bit count by sizeof(unsigned long), i.e. by
 * bytes per long rather than bits per long.  The slot bitmap is allocated
 * as WORDS * sizeof(unsigned long) bytes, which therefore comes to
 * FILTER_LR_MAX_CLIENTS bytes -- eight times what FILTER_LR_MAX_CLIENTS
 * bits need.  Over-allocation only, but it does not match the "1 page"
 * remark above. */
379 #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
/* Parameters, as used in the visible lines:
 *   obd, filter: device and its filter state (slot bitmap, last_rcvd file)
 *   fed:         per-export data; fed_lr_idx and fed_lr_off are filled in
 *   cl_idx:      slot index read from last_rcvd, or -1 for a new client
 * The slot's file offset is fsd_client_start + cl_idx * fsd_client_size.
 * The client data is then written to that offset inside a filesystem
 * transaction (presumably only for a new client -- the guarding condition
 * is elided).  Several branches and all return statements are on elided
 * lines. */
381 /* Add client data to the FILTER. We use a bitmap to locate a free space
382 * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
383 * Otherwise, we have just read the data from the last_rcvd file and
384 * we know its offset.
386 int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
387 struct filter_export_data *fed, int cl_idx)
389 unsigned long *bitmap = filter->fo_last_rcvd_slots;
390 int new_client = (cl_idx == -1);
392 LASSERT(bitmap != NULL);
394 /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
395 if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
398 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
399 * there's no need for extra complication here
402 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
404 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
405 CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
408 if (test_and_set_bit(cl_idx, bitmap)) {
409 CERROR("FILTER client %d: found bit is set in bitmap\n",
411 cl_idx = find_next_zero_bit(bitmap,
412 FILTER_LR_MAX_CLIENTS,
417 if (test_and_set_bit(cl_idx, bitmap)) {
418 CERROR("FILTER client %d: bit already set in bitmap!\n",
424 fed->fed_lr_idx = cl_idx;
425 fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
426 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
428 CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
429 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
432 struct obd_run_ctxt saved;
433 loff_t off = fed->fed_lr_off;
437 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
438 fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
440 push_ctxt(&saved, &filter->fo_ctxt, NULL);
441 /* Transaction needed to fix for bug 1403 */
442 handle = fsfilt_start(obd,
443 filter->fo_rcvd_filp->f_dentry->d_inode,
445 if (IS_ERR(handle)) {
446 written = PTR_ERR(handle);
447 CERROR("unable to start transaction: rc %d\n",
450 written = lustre_fwrite(filter->fo_rcvd_filp,
451 (char *)fed->fed_fcd,
452 sizeof(*fed->fed_fcd), &off);
454 filter->fo_rcvd_filp->f_dentry->d_inode,
457 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
459 if (written != sizeof(*fed->fed_fcd)) {
/* Release an export's last_rcvd slot: clear its bit in the slot bitmap,
 * overwrite the slot in the last_rcvd file with zeroes, fsync the file and
 * free fed_fcd.
 * NOTE(review): an earlier path (condition elided) frees fed_fcd without
 * touching the file; how `failover` selects between the two paths is not
 * visible in this excerpt. */
468 int filter_client_free(struct obd_export *exp, int failover)
470 struct filter_export_data *fed = &exp->exp_filter_data;
471 struct filter_obd *filter = &exp->exp_obd->u.filter;
472 struct filter_client_data zero_fcd;
473 struct obd_run_ctxt saved;
481 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
485 LASSERT(filter->fo_last_rcvd_slots != NULL);
487 off = fed->fed_lr_off;
489 CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
490 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
492 if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
493 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
498 memset(&zero_fcd, 0, sizeof zero_fcd);
499 push_ctxt(&saved, &filter->fo_ctxt, NULL);
500 written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
501 sizeof(zero_fcd), &off);
503 /* XXX: this write gets lost sometimes, unless this sync is here. */
505 file_fsync(filter->fo_rcvd_filp,
506 filter->fo_rcvd_filp->f_dentry, 1);
507 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
509 if (written != sizeof(zero_fcd)) {
510 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
511 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
515 "zeroed disconnecting client %s at idx %u (%llu)\n",
516 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
519 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
/* Free the in-memory server data and the client slot bitmap, and clear both
 * pointers.  The bitmap size expression is the same one used for its
 * allocation in filter_init_server_data().  The return is elided. */
524 static int filter_free_server_data(struct filter_obd *filter)
526 OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
527 filter->fo_fsd = NULL;
528 OBD_FREE(filter->fo_last_rcvd_slots,
529 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
530 filter->fo_last_rcvd_slots = NULL;
/* Write *fsd to the last_rcvd file `filp` after logging its uuid,
 * last_objid, last_rcvd and mount count.  A write of any length other than
 * sizeof(*fsd) is reported through CDEBUG.
 * NOTE(review): the declaration and initial value of `off`, and the return
 * statements, are on elided lines. */
535 /* assumes caller is already in kernel ctxt */
536 static int filter_update_server_data(struct file *filp,
537 struct filter_server_data *fsd)
542 CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
543 CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
544 le64_to_cpu(fsd->fsd_last_objid));
545 CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
546 le64_to_cpu(fsd->fsd_last_rcvd));
547 CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
548 le64_to_cpu(fsd->fsd_mount_count));
550 rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
551 if (rc != sizeof(*fsd)) {
552 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
/* Load the server data from the last_rcvd file `filp`, or initialise it when
 * the file is empty, then scan the per-client slots.
 *
 *  - Allocates filter->fo_fsd and the client slot bitmap.
 *  - Empty file: fills in the uuid, init_lastobjid and the layout constants.
 *    Otherwise: reads the header and takes the mount count and subdir count
 *    from it.
 *  - Unknown incompat or rocompat feature bits fail with -EINVAL.
 *  - For each non-zeroed client slot whose mount age is below
 *    FILTER_MOUNT_RECOV an export is created and the slot registered with
 *    filter_client_add(); other clients are logged as discarded.
 *  - fsd_last_rcvd is raised to the highest client last_rcvd seen;
 *    obd_last_committed follows it and, when recoverable clients exist,
 *    the device is put into recovery.
 *  - Finally the mount count is incremented and the header written back.
 * Many lines (declarations, branch bodies, labels, returns) are elided. */
559 /* assumes caller is already in kernel ctxt */
560 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
561 __u64 init_lastobjid)
563 struct filter_obd *filter = &obd->u.filter;
564 struct filter_server_data *fsd;
565 struct filter_client_data *fcd = NULL;
566 struct inode *inode = filp->f_dentry->d_inode;
567 unsigned long last_rcvd_size = inode->i_size;
568 __u64 mount_count = 0;
573 /* ensure padding in the struct is the correct size */
574 LASSERT (offsetof(struct filter_server_data, fsd_padding) +
575 sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
576 LASSERT (offsetof(struct filter_client_data, fcd_padding) +
577 sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
579 OBD_ALLOC(fsd, sizeof(*fsd));
582 filter->fo_fsd = fsd;
584 OBD_ALLOC(filter->fo_last_rcvd_slots,
585 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
586 if (filter->fo_last_rcvd_slots == NULL) {
587 OBD_FREE(fsd, sizeof(*fsd));
591 if (last_rcvd_size == 0) {
592 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
594 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
595 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
596 fsd->fsd_last_rcvd = 0;
597 mount_count = fsd->fsd_mount_count = 0;
598 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
599 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
600 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
601 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
602 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
604 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
606 if (retval != sizeof(*fsd)) {
607 CDEBUG(D_INODE,"OBD filter: error reading %s\n",
609 GOTO(err_fsd, rc = -EIO);
611 mount_count = le64_to_cpu(fsd->fsd_mount_count);
612 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
615 if (fsd->fsd_feature_incompat) {
616 CERROR("unsupported feature %x\n",
617 le32_to_cpu(fsd->fsd_feature_incompat));
618 GOTO(err_fsd, rc = -EINVAL);
620 if (fsd->fsd_feature_rocompat) {
621 CERROR("read-only feature %x\n",
622 le32_to_cpu(fsd->fsd_feature_rocompat));
623 /* Do something like remount filesystem read-only */
624 GOTO(err_fsd, rc = -EINVAL);
627 CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
628 obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
629 CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
630 obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
631 CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
632 obd->obd_name, mount_count);
633 CDEBUG(D_INODE, "%s: server data size: %u\n",
634 obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
635 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
636 obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
/* NOTE(review): fsd_client_size is handled as a 16-bit little-endian field
 * everywhere else in this function (cpu_to_le16 / le16_to_cpu); le32_to_cpu
 * here looks inconsistent and would presumably misprint the value on
 * big-endian hosts. */
637 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
638 obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
639 CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
640 obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
643 * When we do a clean FILTER shutdown, we save the last_rcvd into
644 * the header. If we find clients with higher last_rcvd values
645 * then those clients may need recovery done.
647 if (!obd->obd_replayable) {
648 CERROR("%s: recovery support OFF\n", obd->obd_name);
652 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
657 OBD_ALLOC(fcd, sizeof(*fcd));
659 GOTO(err_fsd, rc = -ENOMEM);
662 /* Don't assume off is incremented properly, in case
663 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
665 off = le32_to_cpu(fsd->fsd_client_start) +
666 cl_idx * le16_to_cpu(fsd->fsd_client_size);
667 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
668 if (rc != sizeof(*fcd)) {
669 CERROR("error reading FILTER %s offset %d: rc = %d\n",
670 LAST_RCVD, cl_idx, rc);
671 if (rc > 0) /* XXX fatal error or just abort reading? */
676 if (fcd->fcd_uuid[0] == '\0') {
677 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
682 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
684 /* These exports are cleaned up by filter_disconnect(), so they
685 * need to be set up like real exports as filter_connect() does.
687 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
688 if (mount_age < FILTER_MOUNT_RECOV) {
689 struct obd_export *exp = class_new_export(obd);
690 struct filter_export_data *fed;
691 CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
692 " srv lr: "LPU64" mnt: "LPU64" last mount: "
693 LPU64"\n", fcd->fcd_uuid, cl_idx,
694 last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
695 le64_to_cpu(fcd->fcd_mount_count), mount_count);
697 /* XXX this rc is ignored */
701 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
702 sizeof exp->exp_client_uuid.uuid);
703 fed = &exp->exp_filter_data;
705 filter_client_add(obd, filter, fed, cl_idx);
706 /* create helper if export init gets more complex */
707 INIT_LIST_HEAD(&fed->fed_open_head);
708 spin_lock_init(&fed->fed_lock);
711 obd->obd_recoverable_clients++;
712 class_export_put(exp);
715 "discarded client %d UUID '%s' count "LPU64"\n",
716 cl_idx, fcd->fcd_uuid,
717 le64_to_cpu(fcd->fcd_mount_count));
720 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
723 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
724 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
726 obd->obd_last_committed =
727 le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
728 if (obd->obd_recoverable_clients) {
729 CERROR("RECOVERY: %d recoverable clients, last_rcvd "
730 LPU64"\n", obd->obd_recoverable_clients,
731 le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
732 obd->obd_next_recovery_transno =
733 obd->obd_last_committed + 1;
734 obd->obd_recovering = 1;
740 OBD_FREE(fcd, sizeof(*fcd));
743 fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
745 /* save it, so mount count and last_rcvd are current */
746 rc = filter_update_server_data(filp, filter->fo_fsd);
751 filter_free_server_data(filter);
/* Set up the on-disk object store, between push_ctxt() and pop_ctxt() on the
 * filter's run context:
 *  1. get/create the top-level "O" directory;
 *  2. get/create one directory under "O" per entry of obd_type_by_mode[]
 *     (entries without a name are presumably skipped -- the test is elided);
 *  3. open/create the last_rcvd file, check that it is a regular file and
 *     call fsfilt_journal_data() on it;
 *  4. remember that file's file, inode and address-space operations;
 *  5. load the server data via filter_init_server_data();
 *  6. if fo_subdir_count is non-zero, get/create "d0", "d1", ... under the
 *     regular-file directory and cache their dentries.
 * The tail of the function releases these resources in reverse order on
 * failure; the labels themselves are on elided lines. */
755 /* setup the object store with correct subdirectories */
756 static int filter_prep(struct obd_device *obd)
758 struct obd_run_ctxt saved;
759 struct filter_obd *filter = &obd->u.filter;
760 struct dentry *dentry, *O_dentry;
767 push_ctxt(&saved, &filter->fo_ctxt, NULL);
768 dentry = simple_mkdir(current->fs->pwd, "O", 0700);
769 CDEBUG(D_INODE, "got/created O: %p\n", dentry);
770 if (IS_ERR(dentry)) {
771 rc = PTR_ERR(dentry);
772 CERROR("cannot open/create O: rc = %d\n", rc);
775 filter->fo_dentry_O = dentry;
778 * Create directories and/or get dentries for each object type.
779 * This saves us from having to do multiple lookups for each one.
781 O_dentry = filter->fo_dentry_O;
782 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
783 char *name = obd_type_by_mode[mode];
786 filter->fo_dentry_O_mode[mode] = NULL;
789 dentry = simple_mkdir(O_dentry, name, 0700);
790 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
791 if (IS_ERR(dentry)) {
792 rc = PTR_ERR(dentry);
793 CERROR("cannot create O/%s: rc = %d\n", name, rc);
794 GOTO(err_O_mode, rc);
796 filter->fo_dentry_O_mode[mode] = dentry;
799 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
800 if (!file || IS_ERR(file)) {
802 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
804 GOTO(err_O_mode, rc);
807 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
808 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
809 file->f_dentry->d_inode->i_mode);
810 GOTO(err_filp, rc = -ENOENT);
813 rc = fsfilt_journal_data(obd, file);
815 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
818 /* steal operations */
819 inode = file->f_dentry->d_inode;
820 filter->fo_fop = file->f_op;
821 filter->fo_iop = inode->i_op;
822 filter->fo_aops = inode->i_mapping->a_ops;
824 rc = filter_init_server_data(obd, file, INIT_OBJID);
826 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
827 GOTO(err_client, rc);
829 filter->fo_rcvd_filp = file;
831 if (filter->fo_subdir_count) {
832 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
833 OBD_ALLOC(filter->fo_dentry_O_sub,
834 filter->fo_subdir_count * sizeof(dentry));
835 if (!filter->fo_dentry_O_sub)
836 GOTO(err_client, rc = -ENOMEM);
838 for (i = 0; i < filter->fo_subdir_count; i++) {
840 snprintf(dir, sizeof(dir), "d%u", i);
842 dentry = simple_mkdir(O_dentry, dir, 0700);
843 CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
844 if (IS_ERR(dentry)) {
845 rc = PTR_ERR(dentry);
846 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
849 filter->fo_dentry_O_sub[i] = dentry;
854 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
860 struct dentry *dentry = filter->fo_dentry_O_sub[i];
863 filter->fo_dentry_O_sub[i] = NULL;
866 OBD_FREE(filter->fo_dentry_O_sub,
867 filter->fo_subdir_count * sizeof(dentry));
869 class_disconnect_exports(obd, 0);
871 if (filp_close(file, 0))
872 CERROR("can't close %s after error\n", LAST_RCVD);
873 filter->fo_rcvd_filp = NULL;
876 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
879 filter->fo_dentry_O_mode[mode] = NULL;
882 f_dput(filter->fo_dentry_O);
883 filter->fo_dentry_O = NULL;
/* Shutdown counterpart of filter_prep(), run inside the filter's context:
 * write the server data back to last_rcvd, fsync and close that file, clear
 * the cached per-subdirectory and per-type dentry pointers (the puts for
 * those are on elided lines), put the "O" dentry and free the server data.
 * NOTE(review): fo_rcvd_filp is handed to filter_update_server_data()
 * before the NULL test that guards the fsync/close further down. */
887 /* cleanup the filter: write last used object id to status file */
888 static void filter_post(struct obd_device *obd)
890 struct obd_run_ctxt saved;
891 struct filter_obd *filter = &obd->u.filter;
895 /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
896 * best to start a transaction with h_sync, because we removed this
899 push_ctxt(&saved, &filter->fo_ctxt, NULL);
900 rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
902 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
905 if (filter->fo_rcvd_filp) {
906 rc = file_fsync(filter->fo_rcvd_filp,
907 filter->fo_rcvd_filp->f_dentry, 1);
908 filp_close(filter->fo_rcvd_filp, 0);
909 filter->fo_rcvd_filp = NULL;
911 CERROR("last_rcvd file won't closed rc = %ld\n", rc);
914 if (filter->fo_subdir_count) {
916 for (i = 0; i < filter->fo_subdir_count; i++) {
917 struct dentry *dentry = filter->fo_dentry_O_sub[i];
919 filter->fo_dentry_O_sub[i] = NULL;
921 OBD_FREE(filter->fo_dentry_O_sub,
922 filter->fo_subdir_count *
923 sizeof(*filter->fo_dentry_O_sub));
925 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
926 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
929 filter->fo_dentry_O_mode[mode] = NULL;
932 f_dput(filter->fo_dentry_O);
933 filter_free_server_data(filter);
934 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/* Advance the object id counter: under fo_objidlock, read fsd_last_objid
 * and store that value plus one back.  fo_fsd must already be set up
 * (asserted).  Which value is returned is on an elided line. */
938 static __u64 filter_next_id(struct obd_device *obd)
941 LASSERT(obd->u.filter.fo_fsd != NULL);
943 spin_lock(&obd->u.filter.fo_objidlock);
944 id = le64_to_cpu(obd->u.filter.fo_fsd->fsd_last_objid);
945 obd->u.filter.fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
946 spin_unlock(&obd->u.filter.fo_objidlock);
/* Look up the child of `dparent` whose name is the decimal object id.
 * Returns ERR_PTR(-ENXIO) when the superblock is not set up and
 * ERR_PTR(-ESTALE) for the invalid id 0 (that test is elided).  A lookup
 * error is logged; on success the child's reference count is logged and
 * asserted to be positive.
 * NOTE(review): presumably `lockit` decides whether dparent's i_sem is taken
 * around the lookup -- the conditions are on elided lines. */
951 /* how to get files, dentries, inodes from object id's */
952 /* parent i_sem is already held if needed for exclusivity */
953 static struct dentry *filter_fid2dentry(struct obd_device *obd,
954 struct dentry *dparent,
955 __u64 id, int lockit)
957 struct super_block *sb = obd->u.filter.fo_sb;
958 struct dentry *dchild;
963 if (!sb || !sb->s_dev) {
964 CERROR("fatal: device not initialized.\n");
965 RETURN(ERR_PTR(-ENXIO));
969 CERROR("fatal: invalid object id 0\n");
971 RETURN(ERR_PTR(-ESTALE));
974 len = sprintf(name, LPU64, id);
975 CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
976 dparent->d_name.len, dparent->d_name.name, name);
978 down(&dparent->d_inode->i_sem);
979 dchild = lookup_one_len(name, dparent, len);
981 up(&dparent->d_inode->i_sem);
982 if (IS_ERR(dchild)) {
983 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
987 CDEBUG(D_INODE, "got child obj O/%*s/%s: %p, count = %d\n",
988 dparent->d_name.len, dparent->d_name.name, name, dchild,
989 atomic_read(&dchild->d_count));
991 LASSERT(atomic_read(&dchild->d_count) > 0);
/* Blocking AST for the locks taken by filter_lock_dentry().
 *  - flag == LDLM_CB_CANCELING: nothing to do.
 *  - Otherwise, under the namespace lock: give up if the lock's blocking
 *    callback is no longer this function; else mark the lock
 *    LDLM_FL_CBPENDING and note whether it has neither readers nor writers.
 *    In that case it is cancelled with ldlm_cli_cancel(), and a cancel
 *    error is logged.
 * desc and data are not used in the visible lines; the return statements
 * are elided. */
996 /* direct cut-n-paste of mds_blocking_ast() */
997 int filter_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
998 void *data, int flag)
1003 if (flag == LDLM_CB_CANCELING) {
1004 /* Don't need to do anything here. */
1008 /* XXX layering violation! -phil */
1009 l_lock(&lock->l_resource->lr_namespace->ns_lock);
1010 /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
1011 * such that mds_blocking_ast is called just before l_i_p takes the
1012 * ns_lock, then by the time we get the lock, we might not be the
1013 * correct blocking function anymore. So check, and return early, if
1015 if (lock->l_blocking_ast != filter_blocking_ast) {
1016 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
1020 lock->l_flags |= LDLM_FL_CBPENDING;
1021 do_ast = (!lock->l_readers && !lock->l_writers);
1022 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
1025 struct lustre_handle lockh;
1028 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
1029 ldlm_lock2handle(lock, &lockh);
1030 rc = ldlm_cli_cancel(&lockh);
1032 CERROR("ldlm_cli_cancel: %d\n", rc);
1034 LDLM_DEBUG(lock, "Lock still has references, will be "
/* Enqueue an LDLM_PLAIN lock of mode `lock_mode` in the device's namespace.
 * The resource is named by the dentry's inode number (name[0]) and inode
 * generation (name[1]); the lock handle is stored in *lockh.
 * Returns 0 when the enqueue yields ELDLM_OK and -ENOLCK otherwise. */
1040 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
1041 int lock_mode, struct lustre_handle *lockh)
1043 struct ldlm_res_id res_id = { .name = {0} };
1047 res_id.name[0] = de->d_inode->i_ino;
1048 res_id.name[1] = de->d_inode->i_generation;
1049 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1050 res_id, LDLM_PLAIN, NULL, 0, lock_mode,
1051 &flags, ldlm_completion_ast,
1052 filter_blocking_ast, NULL, lockh);
1054 RETURN(rc == ELDLM_OK ? 0 : -ENOLCK); /* XXX translate ldlm code */
/* Return the cached directory dentry that holds object `objid`: the per-type
 * directory when fo_subdir_count is 0, otherwise the subdirectory selected
 * by objid & (fo_subdir_count - 1).
 * NOTE(review): while the LASSERT is active, the !S_ISREG() arm of the test
 * below cannot be taken. */
1057 static inline struct dentry *filter_parent(struct obd_device *obd,
1058 obd_mode mode, obd_id objid)
1060 struct filter_obd *filter = &obd->u.filter;
1062 LASSERT(S_ISREG(mode)); /* only regular files for now */
1063 if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
1064 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
1066 return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
/* filter_parent() followed by filter_lock_dentry() on the result.
 * Returns the parent dentry, or ERR_PTR(rc) when taking the lock fails.
 * One parameter line and some statements are elided from this excerpt. */
1069 static inline struct dentry *filter_parent_lock(struct obd_device *obd,
1070 obd_mode mode, obd_id objid,
1072 struct lustre_handle *lockh)
1074 struct dentry *de = filter_parent(obd, mode, objid);
1080 rc = filter_lock_dentry(obd, de, lock_mode, lockh);
1081 return rc ? ERR_PTR(rc) : de;
/* Open object `id` of type `type` on behalf of `export`.
 *  - checks that the device is set up, that the id is valid and that `type`
 *    carries file-type bits (errors -ENXIO, -ESTALE, -EINVAL);
 *  - allocates the per-open filter_file_data and, ahead of time, a
 *    filter_dentry_data (-ENOMEM on failure);
 *  - locks the parent directory in `parent_mode` (handle in *parent_lockh),
 *    looks the object up by its decimal id and opens it
 *    O_RDWR | O_LARGEFILE;
 *  - under fo_fddlock, either reuses the dentry's existing d_fsdata and
 *    bumps its open count, or installs the preallocated one with count 1;
 *  - links file and ffd through private_data, installs filter_dops and adds
 *    the ffd to the export's open list.
 * The result is the opened file or an ERR_PTR.  The switch on cleanup_phase
 * at the end releases what was acquired; its case labels and the return are
 * on elided lines. */
1084 static struct file *filter_obj_open(struct obd_export *export,
1085 __u64 id, __u32 type, int parent_mode,
1086 struct lustre_handle *parent_lockh)
1088 struct obd_device *obd = export->exp_obd;
1089 struct filter_obd *filter = &obd->u.filter;
1090 struct super_block *sb = filter->fo_sb;
1091 struct dentry *dchild = NULL, *parent;
1092 struct filter_export_data *fed = &export->exp_filter_data;
1093 struct filter_dentry_data *fdd = NULL;
1094 struct filter_file_data *ffd = NULL;
1095 struct obd_run_ctxt saved;
1098 int len, cleanup_phase = 0;
1101 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1103 if (!sb || !sb->s_dev) {
1104 CERROR("fatal: device not initialized.\n");
1105 GOTO(cleanup, file = ERR_PTR(-ENXIO));
1109 CERROR("fatal: invalid obdo "LPU64"\n", id);
1110 GOTO(cleanup, file = ERR_PTR(-ESTALE));
1113 if (!(type & S_IFMT)) {
1114 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
1115 __FUNCTION__, id, type);
1116 GOTO(cleanup, file = ERR_PTR(-EINVAL));
1119 ffd = filter_ffd_new();
1121 CERROR("obdfilter: out of memory\n");
1122 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1127 /* We preallocate this to avoid blocking while holding fo_fddlock */
1128 OBD_ALLOC(fdd, sizeof *fdd);
1130 CERROR("obdfilter: out of memory\n");
1131 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1136 parent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1138 GOTO(cleanup, file = (void *)parent);
1142 len = snprintf(name, sizeof(name), LPU64, id);
1143 dchild = lookup_one_len(name, parent, len);
1145 GOTO(cleanup, file = (void *)dchild);
1146 LASSERT(dchild->d_inode);
1150 /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
1151 mntget(filter->fo_vfsmnt);
1152 file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1154 dchild = NULL; /* prevent a double dput in step 4 */
1155 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1156 GOTO(cleanup, file);
1159 spin_lock(&filter->fo_fddlock);
1160 if (dchild->d_fsdata) {
1161 spin_unlock(&filter->fo_fddlock);
1162 OBD_FREE(fdd, sizeof *fdd);
1163 fdd = dchild->d_fsdata;
1164 /* should only happen during client recovery */
1165 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1166 CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1167 atomic_inc(&fdd->fdd_open_count);
1169 atomic_set(&fdd->fdd_open_count, 1);
1171 fdd->fdd_objid = id;
1172 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1173 dchild->d_fsdata = fdd;
1174 spin_unlock(&filter->fo_fddlock);
1177 ffd->ffd_file = file;
1178 LASSERT(file->private_data == NULL);
1179 file->private_data = ffd;
1182 dchild->d_op = &filter_dops;
1184 LASSERT(dchild->d_op == &filter_dops);
1186 spin_lock(&fed->fed_lock);
1187 list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1188 spin_unlock(&fed->fed_lock);
1190 CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1192 switch (cleanup_phase) {
1198 ldlm_lock_decref(parent_lockh, parent_mode);
1201 OBD_FREE(fdd, sizeof *fdd);
1204 filter_ffd_destroy(ffd);
1205 filter_ffd_put(ffd);
1207 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/* Unlink `object_dentry` from the directory `dir_dentry` with vfs_unlink().
 * An inode whose link count or reference count is not exactly 1 is logged
 * first (what follows that log is not visible); a failing unlink is logged
 * with its error code.  The return statement is elided. */
1212 /* Caller must hold i_sem on dir_dentry->d_inode */
1213 /* Caller must push us into kernel context */
1214 static int filter_destroy_internal(struct obd_device *obd,
1215 struct dentry *dir_dentry,
1216 struct dentry *object_dentry)
1218 struct inode *inode = object_dentry->d_inode;
1222 if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1223 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1224 object_dentry->d_name.len,
1225 object_dentry->d_name.name,
1226 inode->i_nlink, atomic_read(&inode->i_count));
1229 rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
1232 CERROR("error unlinking objid %*s: rc %d\n",
1233 object_dentry->d_name.len,
1234 object_dentry->d_name.name, rc);
/*
 * filter_close_internal - close one open file handle of an export.
 *
 * export:   export that owns the handle
 * ffd:      per-open file data; ffd->ffd_file is the struct file to close
 * oti:      transaction info, may be NULL (then the parent lock is dropped
 *           instead of being handed over for the reply ack)
 * failover: referenced in the deferred-destroy test as "!failover"
 *
 * Takes an extra reference on the file's dentry, closes the file with
 * filp_close(), and decrements the dentry's open count.  When that count
 * reaches zero, FILTER_FLAG_DESTROY is set and this is not a failover close,
 * the object is unlinked: the parent is locked LCK_PW, a transaction is
 * started, filter_destroy_internal() is called, and the transaction is
 * finished and committed.
 */
1239 /* If closing because we are failing this device, then
1240 don't do the unlink on close.
1242 static int filter_close_internal(struct obd_export *export,
1243 struct filter_file_data *ffd,
1244 struct obd_trans_info *oti,
1247 struct obd_device *obd = export->exp_obd;
1248 struct filter_obd *filter = &obd->u.filter;
1249 struct file *filp = ffd->ffd_file;
1250 struct dentry *object_dentry = dget(filp->f_dentry);
1251 struct filter_dentry_data *fdd = object_dentry->d_fsdata;
1252 struct lustre_handle parent_lockh;
1253 int rc, rc2, cleanup_phase = 0;
1254 struct dentry *dir_dentry;
1255 struct obd_run_ctxt saved;
1258 LASSERT(filp->private_data == ffd);
1261 rc = filp_close(filp, 0);
1263 if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1264 fdd->fdd_flags & FILTER_FLAG_DESTROY && !failover) {
1267 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1270 dir_dentry = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1271 LCK_PW, &parent_lockh);
1272 if (IS_ERR(dir_dentry))
1273 GOTO(cleanup, rc = PTR_ERR(dir_dentry));
1276 handle = fsfilt_start(obd, dir_dentry->d_inode,
1279 GOTO(cleanup, rc = PTR_ERR(handle));
1281 /* XXX unlink from PENDING directory now too */
1282 rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
1285 rc = filter_finish_transno(export, handle, oti, rc);
1286 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
1288 CERROR("error on commit, err = %d\n", rc2);
/*
 * Cleanup: on error or without an oti the parent lock is released; otherwise
 * it is copied into oti->oti_ack_locks[0] in LCK_PW mode.  After that the
 * filter context is popped, the dentry reference taken at entry is dropped
 * and the ffd is destroyed.
 */
1295 switch(cleanup_phase) {
1297 if (rc || oti == NULL) {
1298 ldlm_lock_decref(&parent_lockh, LCK_PW);
1300 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1301 sizeof(parent_lockh));
1302 oti->oti_ack_locks[0].mode = LCK_PW;
1305 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1307 f_dput(object_dentry);
1308 filter_ffd_destroy(ffd);
1311 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/*
 * filter_common_setup - shared setup for the filter and SAN filter devices.
 *
 * obd:    device being set up
 * len:    length of buf (not used by the visible statements)
 * buf:    struct obd_ioctl_data; ioc_inlbuf1 is passed to do_kern_mount() as
 *         the device name, ioc_inlbuf2 as the filesystem type, and ioc_inlbuf3
 *         is an optional flag string
 * option: mount option string handed to do_kern_mount()
 *
 * Looks up the fsfilt operations for the filesystem type, mounts the
 * filesystem, records the mount, filesystem type and superblock in the
 * filter_obd, prepares the run context, calls filter_prep(), initialises the
 * filter spinlocks and export list, creates the "filter-tgt" DLM namespace
 * and initialises the LDLM callback client.  The trailing statements are the
 * error unwinding: free the fstype string, drop the mount, release the
 * fsfilt operations.
 */
1319 /* mount the file system (secretly) */
1320 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1323 struct obd_ioctl_data* data = buf;
1324 struct filter_obd *filter;
1325 struct vfsmount *mnt;
/* Both the device name and the filesystem type must be supplied. */
1329 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1332 obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1333 if (IS_ERR(obd->obd_fsops))
1334 RETURN(PTR_ERR(obd->obd_fsops));
1336 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
/*
 * Optional third buffer: a leading 'f' marks the device replayable and turns
 * on obd_sync_filter; another CERROR reports an unrecognised flag.
 */
1341 if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1342 if (*data->ioc_inlbuf3 == 'f') {
1343 obd->obd_replayable = 1;
1344 obd_sync_filter = 1;
1345 CERROR("%s: configured for recovery and sync write\n",
1348 CERROR("unrecognised flag '%c'\n",
1349 *data->ioc_inlbuf3);
1353 filter = &obd->u.filter;
1354 filter->fo_vfsmnt = mnt;
1355 filter->fo_fstype = strdup(data->ioc_inlbuf2);
1356 filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
1357 CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
/* Run context used by push_ctxt()/pop_ctxt(): root of the new mount. */
1359 OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1360 filter->fo_ctxt.pwdmnt = mnt;
1361 filter->fo_ctxt.pwd = mnt->mnt_root;
1362 filter->fo_ctxt.fs = get_ds();
1364 rc = filter_prep(obd);
1366 GOTO(err_kfree, rc);
/* NOTE(review): the locks are initialised after filter_prep(); confirm that
 * filter_prep() does not rely on them. */
1368 spin_lock_init(&filter->fo_translock);
1369 spin_lock_init(&filter->fo_fddlock);
1370 spin_lock_init(&filter->fo_objidlock);
1371 INIT_LIST_HEAD(&filter->fo_export_list);
1373 obd->obd_namespace =
1374 ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
1375 if (!obd->obd_namespace)
1376 GOTO(err_post, rc = -ENOMEM);
1378 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1379 "filter_ldlm_cb_client", &obd->obd_ldlm_client);
/* Error unwinding, in reverse order of setup. */
1386 kfree(filter->fo_fstype);
1388 mntput(filter->fo_vfsmnt);
1392 fsfilt_put_ops(obd->obd_fsops);
/*
 * filter_setup - setup entry point for the plain filter device.
 *
 * Selects the "asyncdel" mount option when the filesystem type in
 * ioc_inlbuf2 is "ext3" (no option otherwise) and delegates everything else
 * to filter_common_setup(), whose result is returned.
 *
 * NOTE(review): ioc_inlbuf2 is handed to strcmp() without a NULL check in
 * this function; filter_common_setup() only validates it afterwards.
 */
1396 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1398 struct obd_ioctl_data* data = buf;
1399 char *option = NULL;
1401 if (!strcmp(data->ioc_inlbuf2, "ext3"))
1402 option = "asyncdel";
1404 return filter_common_setup(obd, len, buf, option);
/*
 * filter_san_setup - setup entry point for the SAN variant of the filter.
 *
 * Checks that a filesystem type was supplied in ioc_inlbuf2, then picks the
 * mount option: "data=writeback" for "extN" and "data=writeback,asyncdel"
 * for "ext3".  An LBUG() follows the two recognised types as a reminder.
 * The rest of the work is done by filter_common_setup(), whose result is
 * returned.
 */
1407 /* sanobd setup methods - use a specific mount option */
1408 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1410 struct obd_ioctl_data* data = buf;
1411 char *option = NULL;
1413 if (!data->ioc_inlbuf2)
1416 /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1417 if (!strcmp(data->ioc_inlbuf2, "extN"))
1418 option = "data=writeback";
1419 else if (!strcmp(data->ioc_inlbuf2, "ext3"))
1420 option = "data=writeback,asyncdel";
1422 LBUG(); /* just a reminder */
1424 return filter_common_setup(obd, len, buf, option);
/*
 * filter_cleanup - tear down a filter device.
 *
 * obd:      device to shut down
 * force:    not used by the visible statements
 * failover: passed on to class_disconnect_exports()
 *
 * Logs a message for a failover shutdown.  If exports are still attached
 * they are disconnected, and an error is logged if any remain afterwards.
 * The DLM namespace is then freed, the dcache below the superblock root is
 * shrunk, a busy mount (mnt_count > 1) is reported, the mount is dropped,
 * fo_sb is cleared, and the fstype string and fsfilt operations are released.
 */
1427 static int filter_cleanup(struct obd_device *obd, int force, int failover)
1429 struct super_block *sb;
1433 CERROR("%s: shutting down for failover; client state will"
1434 " be preserved.\n", obd->obd_name);
1436 if (!list_empty(&obd->obd_exports)) {
1437 CERROR("%s: still has clients!\n", obd->obd_name);
1438 class_disconnect_exports(obd, failover);
1439 if (!list_empty(&obd->obd_exports)) {
1440 CERROR("still has exports after forced cleanup?\n");
1445 ldlm_namespace_free(obd->obd_namespace);
1447 sb = obd->u.filter.fo_sb;
1448 if (!obd->u.filter.fo_sb)
1453 shrink_dcache_parent(sb->s_root);
/* More than one reference on the mount means something still holds it. */
1456 if (atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count) > 1){
1457 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1458 atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count));
1461 mntput(obd->u.filter.fo_vfsmnt);
1462 obd->u.filter.fo_sb = 0;
1463 /* destroy_buffers(obd->u.filter.fo_sb->s_dev);*/
1465 kfree(obd->u.filter.fo_fstype);
1466 fsfilt_put_ops(obd->obd_fsops);
/*
 * filter_attach - register the device's lprocfs entries and counters.
 *
 * dev:  device being attached
 * len, data: not used by the visible statements
 *
 * Attaches the lprocfs variables obtained from lprocfs_init_vars(),
 * allocates LPROC_FILTER_LAST per-device counters and initialises the four
 * filter counters: read and write request counts, and read_bytes and
 * write_bytes with LPROCFS_CNTR_AVGMINMAX.
 */
1473 int filter_attach(struct obd_device *dev, obd_count len, void *data)
1475 struct lprocfs_static_vars lvars;
1476 struct lprocfs_counters* cntrs;
1479 lprocfs_init_vars(&lvars);
1480 rc = lprocfs_obd_attach(dev, lvars.obd_vars);
1484 rc = lprocfs_alloc_obd_counters(dev, LPROC_FILTER_LAST);
1488 /* Init obdfilter private counters here */
1489 cntrs = dev->counters;
1490 LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_READS],
1491 0, NULL, "read", "reqs");
1492 LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_READ_BYTES],
1493 LPROCFS_CNTR_AVGMINMAX,
1494 NULL, "read_bytes", "bytes");
1495 LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_WRITES],
1496 0, NULL, "write", "reqs");
1498 LPROCFS_COUNTER_INIT(&cntrs->cntr[LPROC_FILTER_WRITE_BYTES],
1499 LPROCFS_CNTR_AVGMINMAX,
1500 NULL, "write_bytes", "bytes");
/*
 * filter_detach - counterpart of filter_attach(): frees the per-device
 * lprocfs counters and returns the result of lprocfs_obd_detach().
 */
1504 int filter_detach(struct obd_device *dev)
1506 lprocfs_free_obd_counters(dev);
1507 return lprocfs_obd_detach(dev);
/*
 * filter_connect - connect a client to the filter device.
 *
 * conn:   handle filled in by class_connect()
 * obd:    filter device
 * cluuid: UUID of the connecting client
 *
 * All three arguments are checked for NULL.  After class_connect() the export
 * is looked up, its open-file list and lock are initialised and, when the
 * device is replayable, a filter_client_data record is allocated, filled
 * with the client UUID and the current mount count, and registered with
 * filter_client_add().  The trailing statements are the error unwinding:
 * free the client data and disconnect again.
 *
 * NOTE(review): class_export_put(exp) is called before
 * exp->exp_filter_data is initialised below -- confirm another reference
 * keeps the export alive.
 */
1510 /* nearly identical to mds_connect */
1511 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1512 struct obd_uuid *cluuid)
1514 struct obd_export *exp;
1515 struct filter_export_data *fed;
1516 struct filter_client_data *fcd;
1517 struct filter_obd *filter = &obd->u.filter;
1522 if (!conn || !obd || !cluuid)
1525 rc = class_connect(conn, obd, cluuid);
1528 exp = class_conn2export(conn);
1531 fed = &exp->exp_filter_data;
1532 class_export_put(exp);
1534 INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
1535 spin_lock_init(&exp->exp_filter_data.fed_lock);
1537 if (!obd->obd_replayable)
1540 OBD_ALLOC(fcd, sizeof(*fcd));
1542 CERROR("filter: out of memory for client data\n");
1543 GOTO(out_export, rc = -ENOMEM);
1546 memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1548 fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1550 rc = filter_client_add(obd, filter, fed, -1);
1557 OBD_FREE(fcd, sizeof(*fcd));
1559 class_disconnect(conn, 0);
/*
 * filter_destroy_export - release filter state attached to an export.
 *
 * Force-closes every file still on the export's open list: each entry is
 * unlinked from the list under fed_lock, the lock is dropped while the close
 * is logged and filter_close_internal() runs (with a NULL oti and the
 * export's failover flag), and the lock is retaken before the list is
 * examined again.  For a replayable device the client record is then
 * released with filter_client_free().
 */
1564 static void filter_destroy_export(struct obd_export *exp)
1566 struct filter_export_data *fed = &exp->exp_filter_data;
1569 spin_lock(&fed->fed_lock);
1570 while (!list_empty(&fed->fed_open_head)) {
1571 struct filter_file_data *ffd;
1573 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1575 list_del(&ffd->ffd_export_list);
/* fed_lock is not held across the close. */
1576 spin_unlock(&fed->fed_lock);
1578 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1579 ffd->ffd_file->f_dentry->d_name.len,
1580 ffd->ffd_file->f_dentry->d_name.name,
1581 ffd, ffd->ffd_handle.h_cookie);
1583 filter_close_internal(exp, ffd, NULL, exp->exp_failover);
1584 spin_lock(&fed->fed_lock);
1586 spin_unlock(&fed->fed_lock);
1588 if (exp->exp_obd->obd_replayable)
1589 filter_client_free(exp, exp->exp_failover);
/*
 * filter_disconnect - disconnect a client from the filter device.
 *
 * conn:     connection handle of the client
 * failover: stored in the export and passed to class_disconnect()
 *
 * Cancels the DLM locks held for the export, records the failover flag under
 * exp_lock with interrupts saved, disconnects, syncs the backing filesystem
 * with fsfilt_sync() and drops the export reference obtained from
 * class_conn2export().
 */
1593 /* also incredibly similar to mds_disconnect */
1594 static int filter_disconnect(struct lustre_handle *conn, int failover)
1596 struct obd_export *exp = class_conn2export(conn);
1598 unsigned long flags;
1602 ldlm_cancel_locks_for_export(exp);
1604 spin_lock_irqsave(&exp->exp_lock, flags);
1605 exp->exp_failover = failover;
1606 spin_unlock_irqrestore(&exp->exp_lock, flags);
1608 rc = class_disconnect(conn, failover);
1610 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1611 class_export_put(exp);
1612 /* XXX cleanup preallocated inodes */
/*
 * filter_from_inode - fill an obdo from an inode.
 *
 * oa:    destination obdo; the file type bits of o_mode are saved on entry
 * inode: source inode
 * valid: mask of fields to copy, passed to obdo_from_inode()
 *
 * After the copy the S_IFMT bits are cleared from oa->o_mode.  For character
 * and block devices the device number is converted with kdev_t_to_nr() and
 * OBD_MD_FLRDEV is added to oa->o_valid.
 * NOTE(review): the saved type is presumably restored into o_mode -- confirm.
 */
1616 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1618 int type = oa->o_mode & S_IFMT;
1621 CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1622 inode->i_ino, inode, oa->o_id, valid);
1623 /* Don't copy the inode number in place of the object ID */
1624 obdo_from_inode(oa, inode, valid);
1625 oa->o_mode &= ~S_IFMT;
1628 if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1629 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1631 oa->o_valid |= OBD_MD_FLRDEV;
/*
 * __filter_oa2dentry - find the dentry of the object described by an obdo.
 *
 * conn:   client connection, used to find the device for a lookup by id
 * oa:     object description; o_valid, the handle, o_mode and o_id are read
 * locked: forwarded by the filter_oa2dentry() wrapper
 * what:   label that prefixes the error messages
 *
 * When the obdo carries an open handle (OBD_MD_FLHANDLE), the handle is
 * resolved to its filter_file_data and a new reference on that file's dentry
 * is taken.  The visible statements also look the object up with
 * filter_fid2dentry() below its parent directory.  A bad connection yields
 * ERR_PTR(-EINVAL) and a dentry without an inode yields ERR_PTR(-ENOENT).
 */
1637 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1638 struct obdo *oa, int locked,char *what)
1640 struct dentry *dentry = NULL;
1642 if (oa->o_valid & OBD_MD_FLHANDLE) {
1643 struct lustre_handle *ost_handle = obdo_handle(oa);
1644 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1647 dentry = dget(ffd->ffd_file->f_dentry);
1648 filter_ffd_put(ffd);
1653 struct obd_device *obd = class_conn2obd(conn);
1655 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1656 RETURN(ERR_PTR(-EINVAL));
1658 dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode,
1663 if (IS_ERR(dentry)) {
1664 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1668 if (!dentry->d_inode) {
1669 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1671 RETURN(ERR_PTR(-ENOENT));
/*
 * filter_oa2dentry() is a wrapper macro around __filter_oa2dentry().
 *
 * filter_getattr - return the attributes of an object.
 *
 * conn: client connection
 * oa:   object to query; filled in from the inode using oa->o_valid as mask
 * md:   not used by the visible statements
 *
 * Bumps the getattr request statistic, resolves the dentry with
 * filter_oa2dentry(conn, oa, 1) and copies the inode attributes into oa.
 * A failed lookup is returned as PTR_ERR(dentry).
 */
1677 #define filter_oa2dentry(conn, oa, locked) __filter_oa2dentry(conn, oa, locked,\
1680 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1681 struct lov_stripe_md *md)
1683 struct dentry *dentry = NULL;
1687 XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
1689 dentry = filter_oa2dentry(conn, oa, 1);
1691 RETURN(PTR_ERR(dentry));
1693 filter_from_inode(oa, dentry->d_inode, oa->o_valid);
/*
 * filter_setattr - change the attributes of an object.
 *
 * conn: client connection
 * oa:   object and new attributes; oa->o_valid selects what is applied
 * md:   not used by the visible statements
 * oti:  transaction info passed to filter_finish_transno()
 *
 * Converts the obdo to an iattr (the file type is forced to S_IFREG), enters
 * the filter context, takes the inode's i_sem when the size changes, starts
 * a SETATTR transaction, and applies the change with the inode's own setattr
 * method or inode_setattr().  The transaction is then finished and committed.
 * After a size change oa is refreshed with the block count, ctime and mtime
 * of the inode.  Finally the context is popped and the export reference is
 * dropped.
 */
1699 /* this is called from filter_truncate() until we have filter_punch() */
1700 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1701 struct lov_stripe_md *md, struct obd_trans_info *oti)
1703 struct obd_run_ctxt saved;
1704 struct obd_export *export = class_conn2export(conn);
1705 struct obd_device *obd = class_conn2obd(conn);
1706 struct filter_obd *filter = &obd->u.filter;
1707 struct dentry *dentry;
1709 struct inode *inode;
1714 XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
1716 dentry = filter_oa2dentry(conn, oa, 0);
1719 GOTO(out_exp, rc = PTR_ERR(dentry));
1721 iattr_from_obdo(&iattr, oa, oa->o_valid);
1722 iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1723 inode = dentry->d_inode;
1725 push_ctxt(&saved, &filter->fo_ctxt, NULL);
/* i_sem is taken before the transaction starts (see the file header
 * invariant about taking i_sem before journal operations). */
1727 if (iattr.ia_valid & ATTR_SIZE)
1728 down(&inode->i_sem);
1730 handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1732 GOTO(out_unlock, rc = PTR_ERR(handle));
1734 if (inode->i_op->setattr)
1735 rc = inode->i_op->setattr(dentry, &iattr);
1737 rc = inode_setattr(inode, &iattr);
1738 rc = filter_finish_transno(export, handle, oti, rc);
1739 rc2 = fsfilt_commit(obd, dentry->d_inode, handle, 0);
1741 CERROR("error on commit, err = %d\n", rc2);
1746 if (iattr.ia_valid & ATTR_SIZE) {
1748 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1749 obdo_from_inode(oa, inode, oa->o_valid);
1754 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1758 class_export_put(export);
/*
 * filter_open - open an object on behalf of a client.
 *
 * conn: client connection
 * oa:   object to open; filled with inode attributes and the open handle
 * ea:   not used by the visible statements
 * oti:  receives the parent lock in oti_ack_locks[0] (mode LCK_PR)
 * och:  not used by the visible statements
 *
 * Looks up the export (-EINVAL for a bad cookie), opens the object with
 * filter_obj_open() under an LCK_PR parent lock, copies the inode attributes
 * into oa and stores the cookie of the new filter_file_data in oa's handle,
 * setting OBD_MD_FLHANDLE.  The export reference is dropped before return.
 */
1762 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1763 struct lov_stripe_md *ea, struct obd_trans_info *oti,
1764 struct obd_client_handle *och)
1766 struct obd_export *export;
1767 struct lustre_handle *handle;
1768 struct filter_file_data *ffd;
1770 struct lustre_handle parent_lockh;
1774 export = class_conn2export(conn);
1776 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1778 GOTO(out, rc = -EINVAL);
1781 XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
1783 filp = filter_obj_open(export, oa->o_id, oa->o_mode,
1784 LCK_PR, &parent_lockh);
1786 GOTO(out, rc = PTR_ERR(filp));
1788 filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
/* Hand the open handle back to the client inside the obdo. */
1790 ffd = filp->private_data;
1791 handle = obdo_handle(oa);
1792 handle->cookie = ffd->ffd_handle.h_cookie;
1793 oa->o_valid |= OBD_MD_FLHANDLE;
1796 class_export_put(export);
1798 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1799 sizeof(parent_lockh));
1800 oti->oti_ack_locks[0].mode = LCK_PR;
/*
 * filter_close - close an object previously opened with filter_open().
 *
 * conn: client connection
 * oa:   must carry the open handle (OBD_MD_FLHANDLE)
 * ea:   not used by the visible statements
 * oti:  transaction info forwarded to filter_close_internal()
 *
 * Error codes visible here: -EINVAL for a bad client cookie or a missing
 * handle, -ESTALE for a handle that does not resolve.  Otherwise the
 * filter_file_data is removed from the export's open list under fed_lock and
 * closed with filter_close_internal() (failover = 0); the handle reference
 * and the export reference are then dropped.
 */
1805 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1806 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1808 struct obd_export *exp = class_conn2export(conn);
1809 struct filter_file_data *ffd;
1810 struct filter_export_data *fed;
1815 CDEBUG(D_IOCTL, "invalid client cookie"LPX64"\n", conn->cookie);
1816 GOTO(out, rc = -EINVAL);
1819 XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
1821 if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1822 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1823 GOTO(out, rc = -EINVAL);
1826 ffd = filter_handle2ffd(obdo_handle(oa));
1828 CERROR("bad handle ("LPX64") for close\n",
1829 obdo_handle(oa)->cookie);
1830 GOTO(out, rc = -ESTALE);
1833 fed = &exp->exp_filter_data;
1834 spin_lock(&fed->fed_lock);
1835 list_del(&ffd->ffd_export_list);
1836 spin_unlock(&fed->fed_lock);
1838 rc = filter_close_internal(exp, ffd, oti, 0);
1839 filter_ffd_put(ffd);
1842 class_export_put(exp);
/*
 * filter_create - create a new object.
 *
 * conn: client connection
 * oa:   in: o_mode for the new file; out: the new o_id and the attributes
 *       listed in o_valid
 * ea:   not used by the visible statements
 * oti:  transaction info; receives the parent lock on success
 *
 * Allocates the next object id, enters the filter context, locks the parent
 * directory LCK_PW and looks up the dentry for the new id.  If an object
 * with that id already exists, a "Serious error" is logged, the lock is
 * released and another id is taken.  The file is then created inside a
 * CREATE transaction, the server data is updated with
 * filter_update_server_data(), the transaction is committed and oa is filled
 * from the new inode.
 *
 * Cleanup: on error or without an oti the parent lock is released,
 * otherwise it is copied to oti->oti_ack_locks[0] (LCK_PW); then the context
 * is popped and the export reference dropped.
 */
1846 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1847 struct lov_stripe_md **ea, struct obd_trans_info *oti)
1849 struct obd_export *export;
1850 struct obd_device *obd = class_conn2obd(conn);
1851 struct filter_obd *filter = &obd->u.filter;
1852 struct obd_run_ctxt saved;
1853 struct dentry *dir_dentry;
1854 struct lustre_handle parent_lockh;
1855 struct dentry *new = NULL;
1858 int err, rc, cleanup_phase;
1862 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1866 export = class_conn2export(conn);
1867 XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
1869 oa->o_id = filter_next_id(obd);
1871 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1874 dir_dentry = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1876 if (IS_ERR(dir_dentry))
1877 GOTO(cleanup, rc = PTR_ERR(dir_dentry));
1880 new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
1882 GOTO(cleanup, rc = PTR_ERR(new));
1886 /* This would only happen if lastobjid was bad on disk */
1887 CERROR("Serious error: objid %s already exists; is this "
1888 "filesystem corrupt? I will try to work around it.\n",
1889 filter_id(buf, filter, oa->o_id, oa->o_mode));
1891 ldlm_lock_decref(&parent_lockh, LCK_PW);
1892 oa->o_id = filter_next_id(obd);
1897 handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
1899 GOTO(cleanup, rc = PTR_ERR(handle));
1901 rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
1903 CERROR("create failed rc = %d\n", rc);
1905 rc = filter_finish_transno(export, handle, oti, rc);
1906 err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1908 CERROR("unable to write lastobjid but file created\n");
1912 err = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
1914 CERROR("error on commit, err = %d\n", err);
1922 /* Set flags for fields we have set in the inode struct */
1923 oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1924 OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1925 filter_from_inode(oa, new->d_inode, oa->o_valid);
1929 switch(cleanup_phase) {
1932 case 1: /* locked parent dentry */
1933 if (rc || oti == NULL) {
1934 ldlm_lock_decref(&parent_lockh, LCK_PW);
1936 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1937 sizeof(parent_lockh));
1938 oti->oti_ack_locks[0].mode = LCK_PW;
1941 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1942 class_export_put(export);
1945 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/*
 * filter_destroy - destroy an object, or defer that while it is still open.
 *
 * conn: client connection
 * oa:   object to destroy (o_id, o_mode)
 * ea:   not used by the visible statements
 * oti:  transaction info; receives the parent lock on success
 *
 * Enters the filter context, locks the parent directory LCK_PW, resolves the
 * object dentry and starts an UNLINK transaction.  If the object is
 * currently open (fdd_open_count non-zero), FILTER_FLAG_DESTROY is set on its
 * dentry data, and a debug message records whether this is a first or a
 * repeated destroy; the call then finishes with rc = 0 without unlinking.
 * Otherwise filter_destroy_internal() performs the unlink.
 *
 * Cleanup finishes and commits the transaction, drops the object dentry,
 * releases the parent lock or hands it over through oti->oti_ack_locks[0],
 * pops the context and drops the export reference.
 *
 * NOTE(review): a failed object lookup is always reported as -ENOENT,
 * whatever error the returned pointer carries.
 */
1952 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1953 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1955 struct obd_export *export;
1956 struct obd_device *obd = class_conn2obd(conn);
1957 struct filter_obd *filter = &obd->u.filter;
1958 struct dentry *dir_dentry, *object_dentry = NULL;
1959 struct filter_dentry_data *fdd;
1960 struct obd_run_ctxt saved;
1961 void *handle = NULL;
1962 struct lustre_handle parent_lockh;
1963 int rc, rc2, cleanup_phase = 0;
1967 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1971 export = class_conn2export(conn);
1972 XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
1974 CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1976 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1977 dir_dentry = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1978 LCK_PW, &parent_lockh);
1979 if (IS_ERR(dir_dentry))
1980 GOTO(cleanup, rc = PTR_ERR(dir_dentry));
1983 object_dentry = filter_oa2dentry(conn, oa, 0);
1984 if (IS_ERR(object_dentry))
1985 GOTO(cleanup, rc = -ENOENT);
1988 handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
1990 GOTO(cleanup, rc = PTR_ERR(handle));
/* An object that is still open is only flagged; the unlink is then done by
 * filter_close_internal() when the last opener closes it. */
1993 fdd = object_dentry->d_fsdata;
1994 if (fdd && atomic_read(&fdd->fdd_open_count)) {
1995 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1996 fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1997 /* XXX put into PENDING directory in case of crash */
1999 "defer destroy of %dx open objid "LPU64"\n",
2000 atomic_read(&fdd->fdd_open_count), oa->o_id);
2003 "repeat destroy of %dx open objid "LPU64"\n",
2004 atomic_read(&fdd->fdd_open_count), oa->o_id);
2005 GOTO(cleanup, rc = 0);
2008 rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
2011 switch(cleanup_phase) {
2013 rc = filter_finish_transno(export, handle, oti, rc);
2014 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle, 0);
2016 CERROR("error on commit, err = %d\n", rc2);
2021 f_dput(object_dentry);
2023 if (rc || oti == NULL) {
2024 ldlm_lock_decref(&parent_lockh, LCK_PW);
2026 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
2027 sizeof(parent_lockh));
2028 oti->oti_ack_locks[0].mode = LCK_PW;
2031 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
2032 class_export_put(export);
2035 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/*
 * filter_truncate - truncate an object.
 *
 * conn:       client connection
 * oa:         object and attributes, forwarded to filter_setattr()
 * lsm:        not used by the visible statements
 * start, end: requested range; an end other than OBD_OBJECT_EOF is logged as
 *             an unsupported punch
 * oti:        transaction info, forwarded to filter_setattr()
 *
 * The actual work is done by filter_setattr(conn, oa, NULL, oti); its result
 * is stored in "error".
 */
2042 /* NB start and end are used for punch, but not truncate */
2043 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
2044 struct lov_stripe_md *lsm,
2045 obd_off start, obd_off end,
2046 struct obd_trans_info *oti)
2051 XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
2053 if (end != OBD_OBJECT_EOF)
2054 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
2057 CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
2058 "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
2060 error = filter_setattr(conn, oa, NULL, oti);
/* lustre_put_page - drop one page cache reference on the given page. */
2064 static inline void lustre_put_page(struct page *page)
2066 page_cache_release(page);
/*
 * filter_start_page_read - begin reading the page that backs one niobuf.
 *
 * inode: object being read
 * lnb:   local niobuf; offset and len are read, rc is written
 *
 * Grabs the (locked) page cache page at index offset >> PAGE_SHIFT.  When
 * the file ends before the requested range does, lnb->rc is set to the
 * number of bytes between lnb->offset and the file size.  A page that is not
 * yet up to date is read with the mapping's readpage method; if that fails
 * the error is logged, the page reference is dropped and the error is
 * returned and stored in lnb->rc.
 */
2069 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
2071 struct address_space *mapping = inode->i_mapping;
2073 unsigned long index = lnb->offset >> PAGE_SHIFT;
2076 page = grab_cache_page(mapping, index); /* locked page */
2078 return lnb->rc = PTR_ERR(page);
/* Short read at end of file. */
2082 if (inode->i_size < lnb->offset + lnb->len - 1)
2083 lnb->rc = inode->i_size - lnb->offset;
2087 if (PageUptodate(page)) {
2092 rc = mapping->a_ops->readpage(NULL, page);
2094 CERROR("page index %lu, rc = %d\n", index, rc);
2096 lustre_put_page(page);
2097 return lnb->rc = rc;
/*
 * filter_finish_page_read - wait for a read started by
 * filter_start_page_read() to complete.
 *
 * lnb: local niobuf; lnb->page may be NULL, which is tested first
 *
 * A page that is already up to date needs no waiting.  Otherwise the
 * function waits on the page and fails with -EIO (also stored in lnb->rc)
 * if the page is still not up to date or has its error flag set; on that
 * path the page reference is dropped.
 */
2103 static int filter_finish_page_read(struct niobuf_local *lnb)
2105 if (lnb->page == NULL)
2108 if (PageUptodate(lnb->page))
2111 wait_on_page(lnb->page);
2112 if (!PageUptodate(lnb->page)) {
2113 CERROR("page index %lu/offset "LPX64" not uptodate\n",
2114 lnb->page->index, lnb->offset);
2115 GOTO(err_page, lnb->rc = -EIO);
2117 if (PageError(lnb->page)) {
2118 CERROR("page index %lu/offset "LPX64" has error\n",
2119 lnb->page->index, lnb->offset);
2120 GOTO(err_page, lnb->rc = -EIO);
2126 lustre_put_page(lnb->page);
/*
 * lustre_get_page_write - get a locked page cache page prepared for a
 * full-page write.
 *
 * inode: file that owns the page
 * index: page index within the file
 *
 * Grabs the page and calls the mapping's prepare_write method for the range
 * 0..PAGE_SIZE.  A prepare_write failure or a page with its error flag set
 * (-EIO) leads to the error path, which drops the page reference.
 */
2131 static struct page *lustre_get_page_write(struct inode *inode,
2132 unsigned long index)
2134 struct address_space *mapping = inode->i_mapping;
2138 page = grab_cache_page(mapping, index); /* locked page */
2140 if (!IS_ERR(page)) {
2141 /* Note: Called with "0" and "PAGE_SIZE" this is essentially
2142 * a no-op for most filesystems, because we write the whole
2143 * page. For partial-page I/O this will read in the page.
2145 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
2147 CERROR("page index %lu, rc = %d\n", index, rc);
2150 GOTO(err_unlock, rc);
2152 /* XXX not sure if we need this if we are overwriting page */
2153 if (PageError(page)) {
2154 CERROR("error on page index %lu, rc = %d\n", index, rc);
2156 GOTO(err_unlock, rc = -EIO);
2163 lustre_put_page(page);
/*
 * waitfor_one_page - compatibility helper compiled only for kernels newer
 * than 2.5.0; it waits for the page with wait_on_page_locked().
 */
2167 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2168 int waitfor_one_page(struct page *page)
2170 wait_on_page_locked(page);
/*
 * inode_update_time - compiled only for kernels older than 2.5.0.
 *
 * inode:     inode whose timestamps are refreshed
 * ctime_too: non-zero when i_ctime takes part as well as i_mtime
 *
 * Compares the timestamps with the current time first, so that an inode
 * whose times are already current is recognised; otherwise i_mtime (and
 * i_ctime) are set to now and the inode is marked dirty.
 */
2175 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2176 /* We should only change the file mtime (and not the ctime, like
2177 * update_inode_times() in generic_file_write()) when we only change data.
2179 static inline void inode_update_time(struct inode *inode, int ctime_too)
2181 time_t now = CURRENT_TIME;
2182 if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
2184 inode->i_mtime = now;
2186 inode->i_ctime = now;
2187 mark_inode_dirty_sync(inode);
// lustre_commit_write - commit the data written into one niobuf's page.
//
// lnb: local niobuf; page, offset and len are read
//
// The byte range inside the page is derived from lnb->offset and lnb->len
// and must not extend past PAGE_SIZE.  The mapping's commit_write method is
// called for that range; for a synchronous inode a successful commit is
// followed by waitfor_one_page().  The page is then marked referenced and
// the page reference is dropped.
2191 static int lustre_commit_write(struct niobuf_local *lnb)
2193 struct page *page = lnb->page;
2194 unsigned from = lnb->offset & ~PAGE_MASK;
2195 unsigned to = from + lnb->len;
2196 struct inode *inode = page->mapping->host;
2199 LASSERT(to <= PAGE_SIZE);
2200 err = page->mapping->a_ops->commit_write(NULL, page, from, to);
2201 if (!err && IS_SYNC(inode))
2202 waitfor_one_page(page);
2203 //SetPageUptodate(page); // the client commit_write will do this
2205 SetPageReferenced(page);
2207 lustre_put_page(page);
// filter_get_page_write - get the page that will receive one niobuf's data.
//
// inode: object being written
// lnb:   local niobuf; offset and flags are read, flags and rc are written
//
// The page cache page at index offset >> PAGE_SHIFT is requested with
// grab_cache_page_nowait() or grab_cache_page().  When the page is currently
// locked, a temporary page is allocated instead, poisoned with 0xBA, given
// the same index, and the niobuf is flagged N_LOCAL_TEMP_PAGE; allocation
// failure gives -ENOMEM.  A page cache page is prepared with the mapping's
// prepare_write method; a failure there, or an error flag on the page
// (-EIO), leads to the error path that drops the page reference.  The result
// is stored in lnb->rc and returned.
2211 int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
2214 unsigned long index = lnb->offset >> PAGE_SHIFT;
2215 struct address_space *mapping = inode->i_mapping;
2219 //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
2221 page = grab_cache_page_nowait(mapping, index); /* locked page */
2223 page = grab_cache_page(mapping, index); /* locked page */
2226 /* This page is currently locked, so get a temporary page instead. */
2229 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
2230 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
2232 CERROR("no memory for a temp page\n");
2233 GOTO(err, rc = -ENOMEM);
2235 POISON((void *)addr, 0xBA, PAGE_SIZE);
2236 page = virt_to_page(addr);
2237 page->index = index;
2239 lnb->flags |= N_LOCAL_TEMP_PAGE;
2240 } else if (!IS_ERR(page)) {
2243 rc = mapping->a_ops->prepare_write(NULL, page,
2244 lnb->offset & ~PAGE_MASK,
2248 CERROR("page index %lu, rc = %d\n", index, rc);
2249 GOTO(err_unlock, rc);
2251 /* XXX not sure if we need this if we are overwriting page */
2252 if (PageError(page)) {
2253 CERROR("error on page index %lu, rc = %d\n", index, rc);
2255 GOTO(err_unlock, rc = -EIO);
2264 lustre_put_page(page);
2266 return lnb->rc = rc;
/*
 * filter_commit_write - commit one prepared page, with special handling when
 * the write failed.
 *
 * lnb: local niobuf whose page was prepared for writing
 * err: status of the write so far; it is logged by the debug message below
 *
 * On kernels older than 2.5.0 the page's buffer heads are walked and the
 * part of the page that belongs to a newly allocated buffer (buffer_new())
 * is zero-filled.  The page is then committed with lustre_commit_write(),
 * whose result is returned.
 */
2270 * We need to balance prepare_write() calls with commit_write() calls.
2271 * If the page has been prepared, but we have no data for it, we don't
2272 * want to overwrite valid data on disk, but we still need to zero out
2273 * data for space which was newly allocated. Like part of what happens
2274 * in __block_prepare_write() for newly allocated blocks.
2276 * XXX currently __block_prepare_write() creates buffers for all the
2277 * pages, and the filesystems mark these buffers as BH_New if they
2278 * were newly allocated from disk. We use the BH_New flag similarly.
2280 static int filter_commit_write(struct niobuf_local *lnb, int err)
2282 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2284 unsigned block_start, block_end;
2285 struct buffer_head *bh, *head = lnb->page->buffers;
2286 unsigned blocksize = head->b_size;
2288 /* debugging: just seeing if this ever happens */
2289 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
2290 "called for ino %lu:%lu on err %d\n",
2291 lnb->page->mapping->host->i_ino, lnb->page->index, err);
2293 /* Currently one buffer per page, but in the future... */
2294 for (bh = head, block_start = 0; bh != head || !block_start;
2295 block_start = block_end, bh = bh->b_this_page) {
2296 block_end = block_start + blocksize;
2297 if (buffer_new(bh)) {
2298 memset(kmap(lnb->page) + block_start, 0,
2305 return lustre_commit_write(lnb);
/*
 * filter_preprw - prepare the local pages for a bulk read or write.
 *
 * cmd:          OBD_BRW_READ or OBD_BRW_WRITE flags
 * export:       export of the requesting client
 * objcount:     number of objects in obj; asserted to be exactly 1
 * obj:          per-object descriptors (id and buffer count)
 * niocount:     number of entries in nb and res
 * nb:           remote niobufs (offset, len, flags) supplied by the client
 * res:          local niobufs, zeroed here and filled in per page
 * desc_private: for writes, receives the handle from fsfilt_brw_start()
 * oti:          transaction info, used on the write error path
 *
 * Steps visible below: bump the request statistics; look up each object's
 * dentry (-ENOENT when it has no inode) and, for writes, take its i_sem;
 * start the write transaction; then for every niobuf either get a page for
 * writing, or start a page read unless the offset lies at or beyond the file
 * size.  For reads, the started page reads are then waited for in reverse
 * order.  The trailing statements are the exit and error paths: free the
 * objinfo array, clear current->journal_info and pop the context; undo the
 * pages prepared so far; release i_sem and the dentries.
 */
2308 static int filter_preprw(int cmd, struct obd_export *export,
2309 int objcount, struct obd_ioobj *obj,
2310 int niocount, struct niobuf_remote *nb,
2311 struct niobuf_local *res, void **desc_private,
2312 struct obd_trans_info *oti)
2314 struct obd_run_ctxt saved;
2315 struct obd_device *obd;
2316 struct obd_ioobj *o;
2317 struct niobuf_remote *rnb;
2318 struct niobuf_local *lnb;
2319 struct fsfilt_objinfo *fso;
2320 struct dentry *dentry;
2321 struct inode *inode;
2322 struct lprocfs_counters *cntrs;
2323 int pglocked = 0, rc = 0, i, j;
2327 if ((cmd & OBD_BRW_WRITE) != 0)
2328 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2330 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2332 memset(res, 0, niocount * sizeof(*res));
2334 obd = export->exp_obd;
2338 cntrs = obd->counters;
2339 if ((cmd & OBD_BRW_WRITE) != 0)
2340 LPROCFS_COUNTER_INCBY1(&cntrs->cntr[LPROC_FILTER_WRITES]);
2342 LPROCFS_COUNTER_INCBY1(&cntrs->cntr[LPROC_FILTER_READS]);
2344 // theoretically we support multi-obj BRW RPCs, but until then...
2345 LASSERT(objcount == 1);
2347 OBD_ALLOC(fso, objcount * sizeof(*fso));
2351 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/* Pass 1: resolve every object to a dentry and record it in fso[]. */
2353 for (i = 0, o = obj; i < objcount; i++, o++) {
2354 struct filter_dentry_data *fdd;
2356 LASSERT(o->ioo_bufcnt);
2358 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2363 GOTO(out_objinfo, rc = PTR_ERR(dentry));
2365 fso[i].fso_dentry = dentry;
2366 fso[i].fso_bufcnt = o->ioo_bufcnt;
2368 if (!dentry->d_inode) {
2369 CERROR("trying to BRW to non-existent file "LPU64"\n",
2371 GOTO(out_objinfo, rc = -ENOENT);
2374 /* If we ever start to support multi-object BRW RPCs, we will
2375 * need to get locks on multiple inodes (in order) or use the
2376 * DLM to do the locking for us (and use the same locking in
2377 * filter_setattr() for truncate). That isn't all, because
2378 * there still exists the possibility of a truncate starting
2379 * a new transaction while holding the ext3 rwsem = write
2380 * while some writes (which have started their transactions
2381 * here) blocking on the ext3 rwsem = read => lock inversion.
2383 * The handling gets very ugly when dealing with locked pages.
2384 * It may be easier to just get rid of the locked page code
2385 * (which has problems of its own) and either discover we do
2386 * not need it anymore (i.e. it was a symptom of another bug)
2387 * or ensure we get the page locks in an appropriate order.
2389 if (cmd & OBD_BRW_WRITE)
2390 down(&dentry->d_inode->i_sem);
2391 fdd = dentry->d_fsdata;
2392 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2393 CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2397 if (cmd & OBD_BRW_WRITE) {
2398 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2400 if (IS_ERR(*desc_private)) {
2401 rc = PTR_ERR(*desc_private);
2402 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2403 "error starting transaction: rc = %d\n", rc);
2404 *desc_private = NULL;
2405 GOTO(out_objinfo, rc);
2409 for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
2410 dentry = fso[i].fso_dentry;
2411 inode = dentry->d_inode;
2413 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2415 lnb->dentry = dentry;
2417 lnb->dentry = dget(dentry);
2419 lnb->offset = rnb->offset;
2420 lnb->len = rnb->len;
2421 lnb->flags = rnb->flags;
2423 if (cmd & OBD_BRW_WRITE) {
2424 rc = filter_get_page_write(inode,lnb,&pglocked);
2426 XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes,
2428 LPROCFS_COUNTER_INCR(&cntrs->cntr[LPROC_FILTER_WRITE_BYTES], lnb->len);
2429 } else if (inode->i_size <= rnb->offset) {
2430 /* If there's no more data, abort early.
2431 * lnb->page == NULL and lnb->rc == 0, so it's
2432 * easy to detect later. */
2433 f_dput(lnb->dentry);
2437 rc = filter_start_page_read(inode, lnb);
2439 XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes,
2441 LPROCFS_COUNTER_INCR(&cntrs->cntr[LPROC_FILTER_READ_BYTES], lnb->len);
2445 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2446 "error on page @"LPU64"%u/%u: rc = %d\n",
2447 lnb->offset, j, o->ioo_bufcnt, rc);
2449 GOTO(out_pages, rc);
2452 if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
2453 /* Likewise with a partial read */
/* Reads only: wait for the page reads started above, last page first. */
2459 while ((cmd & OBD_BRW_READ) && lnb-- > res) {
2460 rc = filter_finish_page_read(lnb);
2462 CERROR("error on page %u@"LPU64": rc = %d\n",
2463 lnb->len, lnb->offset, rc);
2464 f_dput(lnb->dentry);
2465 GOTO(out_pages, rc);
2470 OBD_FREE(fso, objcount * sizeof(*fso));
2471 current->journal_info = NULL;
2472 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/* Error path: walk back over the niobufs prepared so far. */
2476 while (lnb-- > res) {
2477 if (cmd & OBD_BRW_WRITE) {
2478 filter_commit_write(lnb, rc);
2479 up(&lnb->dentry->d_inode->i_sem);
2481 lustre_put_page(lnb->page);
2483 f_dput(lnb->dentry);
2485 if (cmd & OBD_BRW_WRITE) {
2486 filter_finish_transno(export, *desc_private, oti, rc);
2488 filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2491 goto out; /* dropped the dentry refs already (one per page) */
2494 for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
2495 if (cmd & OBD_BRW_WRITE)
2496 up(&fso[i].fso_dentry->d_inode->i_sem);
2497 f_dput(fso[i].fso_dentry);
/*
 * filter_write_locked_page - write out the data held in a temporary page.
 *
 * lnb: local niobuf whose page is the temporary page that was used because
 *      the page cache page was locked
 *
 * Obtains the real page cache page with lustre_get_page_write(), using the
 * index stored in the temporary page.  If that fails the error is logged and
 * lustre_commit_write() is called on the niobuf.  Otherwise both pages are
 * mapped, a full PAGE_SIZE is copied from the temporary page into the page
 * cache page, the reference on lnb->page is dropped and the write is
 * committed with lustre_commit_write(); a failure of that commit is logged.
 */
2502 static int filter_write_locked_page(struct niobuf_local *lnb)
2510 lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2511 if (IS_ERR(lpage)) {
2512 /* It is highly unlikely that we would ever get an error here.
2513 * The page we want to get was previously locked, so it had to
2514 * have already allocated the space, and we were just writing
2515 * over the same data, so there would be no hole in the file.
2517 * XXX: possibility of a race with truncate could exist, need
2518 * to check that. There are no guarantees w.r.t.
2519 * write order even on a local filesystem, although the
2520 * normal response would be to return the number of bytes
2521 * successfully written and leave the rest to the app.
2523 rc = PTR_ERR(lpage);
2524 CERROR("error getting locked page index %ld: rc = %d\n",
2525 lnb->page->index, rc);
2527 lustre_commit_write(lnb);
2531 /* 2 kmaps == vanishingly small deadlock opportunity */
2532 lpage_addr = kmap(lpage);
2533 lnb_addr = kmap(lnb->page);
2535 memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
2540 lustre_put_page(lnb->page);
2543 rc = lustre_commit_write(lnb);
2545 CERROR("error committing locked page %ld: rc = %d\n",
2546 lnb->page->index, rc);
/*
 * filter_syncfs - sync the filesystem that backs the export's device.
 *
 * Bumps the syncfs request statistic and returns the result of fsfilt_sync()
 * on the filter superblock.
 */
2551 static int filter_syncfs(struct obd_export *exp)
2553 struct obd_device *obd = exp->exp_obd;
2556 XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1);
2558 RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
/*
 * filter_commitrw - finish a bulk read or write prepared by filter_preprw().
 *
 * cmd:          OBD_BRW_READ or OBD_BRW_WRITE flags
 * export:       export of the requesting client
 * objcount:     number of entries in obj
 * obj:          per-object descriptors; ioo_bufcnt gives the pages per object
 * niocount:     not used by the visible statements
 * res:          local niobufs filled in by filter_preprw()
 * desc_private: transaction handle; installed as current->journal_info
 * oti:          transaction info for filter_finish_transno()
 *
 * First pass, per object: for writes the inode times are updated and i_sem
 * is released; each niobuf is then examined: an entry without a page and an
 * entry flagged N_LOCAL_TEMP_PAGE are tested for, written pages are
 * committed with filter_commit_write(), pages are released with
 * lustre_put_page(), and the dentry reference is dropped.
 * Second pass (only while found_locked > 0): niobufs flagged
 * N_LOCAL_TEMP_PAGE are written with filter_write_locked_page().
 * For writes the transaction is then finished and committed.
 */
2561 static int filter_commitrw(int cmd, struct obd_export *export,
2562 int objcount, struct obd_ioobj *obj,
2563 int niocount, struct niobuf_local *res,
2564 void *desc_private, struct obd_trans_info *oti)
2566 struct obd_run_ctxt saved;
2567 struct obd_ioobj *o;
2568 struct niobuf_local *lnb;
2569 struct obd_device *obd = export->exp_obd;
2570 int found_locked = 0, rc = 0, i;
2573 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/* Re-attach the transaction handle to this thread. */
2575 LASSERT(!current->journal_info);
2576 current->journal_info = desc_private;
2578 for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2581 if (cmd & OBD_BRW_WRITE) {
2582 inode_update_time(lnb->dentry->d_inode, 1);
2583 up(&lnb->dentry->d_inode->i_sem);
2585 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2586 if (lnb->page == NULL) {
2589 if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2594 if (cmd & OBD_BRW_WRITE) {
2595 int err = filter_commit_write(lnb, 0);
2600 lustre_put_page(lnb->page);
2603 f_dput(lnb->dentry);
/* Second pass over the niobufs, for temporary pages only. */
2607 for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2610 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2612 if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2615 err = filter_write_locked_page(lnb);
2618 f_dput(lnb->dentry);
2623 if (cmd & OBD_BRW_WRITE) {
2624 /* We just want any dentry for the commit, for now */
2625 struct dentry *dir_dentry = filter_parent(obd, S_IFREG, 0);
2628 rc = filter_finish_transno(export, desc_private, oti, rc);
2629 err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private,
2633 if (obd_sync_filter)
2634 LASSERT(oti->oti_transno <= obd->obd_last_committed);
2638 LASSERT(!current->journal_info);
2640 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/*
 * Perform a bulk read or write of oa_bufs pages against a single object.
 *
 * cmd     - tested against OBD_BRW_WRITE to choose the copy direction
 * conn    - connection handle, resolved to an export with
 *           class_conn2export(); the export is released with
 *           class_export_put() before the function ends
 * lsm     - supplies the object id (lsm->lsm_object_id)
 * oa_bufs - number of entries in pga[]
 * pga     - caller pages with their file offset (off) and byte count
 * oti     - transaction info forwarded to filter_preprw()
 *
 * The function builds one obd_ioobj plus matching remote/local niobuf
 * arrays, calls filter_preprw(), copies data between the caller's pages
 * and the prepared local pages, and then calls filter_commitrw().
 * Allocation failure of either array yields ret = -ENOMEM via GOTO(out).
 *
 * NOTE(review): lines are missing from this listing (declarations of i,
 * ret and desc_private, the else between the two memcpy calls, any
 * kunmap() calls, the out label and the return) -- confirm against the
 * complete source.
 */
2644 static int filter_brw(int cmd, struct lustre_handle *conn,
2645 struct lov_stripe_md *lsm, obd_count oa_bufs,
2646 struct brw_page *pga, struct obd_trans_info *oti)
2648 struct obd_export *export = class_conn2export(conn);
2649 struct obd_ioobj ioo;
2650 struct niobuf_local *lnb;
2651 struct niobuf_remote *rnb;
2660 OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2661 OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2663 if (lnb == NULL || rnb == NULL)
2664 GOTO(out, ret = -ENOMEM);
/* Describe each caller page as a remote niobuf (offset + length). */
2666 for (i = 0; i < oa_bufs; i++) {
2667 rnb[i].offset = pga[i].off;
2668 rnb[i].len = pga[i].count;
/* A single regular-file object owns all oa_bufs buffers. */
2671 ioo.ioo_id = lsm->lsm_object_id;
2673 ioo.ioo_type = S_IFREG;
2674 ioo.ioo_bufcnt = oa_bufs;
2676 ret = filter_preprw(cmd, export, 1, &ioo, oa_bufs, rnb, lnb,
2677 &desc_private, oti);
/*
 * Copy pga[i].count bytes at the in-page offset: for writes from the
 * caller's page (virt) into the local page (addr); the second memcpy
 * copies the opposite way, presumably under an else that this listing
 * has dropped.
 */
2681 for (i = 0; i < oa_bufs; i++) {
2682 void *virt = kmap(pga[i].pg);
2683 obd_off off = pga[i].off & ~PAGE_MASK;
2684 void *addr = kmap(lnb[i].page);
2686 /* 2 kmaps == vanishingly small deadlock opportunity */
2688 if (cmd & OBD_BRW_WRITE)
2689 memcpy(addr + off, virt + off, pga[i].count);
2691 memcpy(virt + off, addr + off, pga[i].count);
2697 ret = filter_commitrw(cmd, export, 1, &ioo, oa_bufs, lnb, desc_private,
/* Free sizes mirror the OBD_ALLOC sizes used above. */
2702 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2704 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2705 class_export_put(export);
/*
 * SAN variant of the pre-I/O step: translate each remote niobuf's file
 * offset into a block number and store that block back in rnb->offset.
 *
 * cmd          - a write (cmd & OBD_BRW_WRITE) bumps st_write_reqs, the
 *                other statistic bumped is st_read_reqs; cmd is later
 *                compared against OBD_BRW_READ to pick the mapping method
 * conn         - connection handle, resolved with class_conn2obd(); an
 *                unresolvable handle is logged as an invalid client cookie
 * objcount/obj - objects to process; obj[i].ioo_bufcnt consecutive
 *                entries of nb[] belong to object i
 * nb           - remote niobufs, updated in place
 *
 * For every object the dentry is looked up with filter_fid2dentry().  A
 * lookup error is propagated with PTR_ERR(), and -ENOENT is returned
 * together with the "non-existent file" message.  For each buffer the
 * starting block is offset >> inode->i_blkbits; reads map it through the
 * address space's bmap operation, while the other path computes
 * newsize = offset + len and calls fs_prep_san_write().
 *
 * NOTE(review): this listing has dropped lines (declarations, the error
 * conditions themselves, comment terminators, the out label and the
 * return), so the exact conditions are not visible -- confirm against the
 * complete source.
 */
2709 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2710 int objcount, struct obd_ioobj *obj,
2711 int niocount, struct niobuf_remote *nb)
2713 struct obd_device *obd;
2714 struct obd_ioobj *o = obj;
2715 struct niobuf_remote *rnb = nb;
2720 if ((cmd & OBD_BRW_WRITE) != 0)
2721 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2723 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2725 obd = class_conn2obd(conn);
2727 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2732 for (i = 0; i < objcount; i++, o++) {
2733 struct dentry *dentry;
2734 struct inode *inode;
2735 int (*fs_bmap)(struct address_space *, long);
2738 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2742 GOTO(out, rc = PTR_ERR(dentry));
2743 inode = dentry->d_inode;
2745 CERROR("trying to BRW to non-existent file "LPU64"\n",
2748 GOTO(out, rc = -ENOENT);
2750 fs_bmap = inode->i_mapping->a_ops->bmap;
2752 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2755 block = rnb->offset >> inode->i_blkbits;
2757 if (cmd == OBD_BRW_READ) {
2758 block = fs_bmap(inode->i_mapping, block);
2760 loff_t newsize = rnb->offset + rnb->len;
2761 /* fs_prep_san_write will also update inode
2763 * (1) new alloced block
2764 * (2) existed block but size extented
2766 /* FIXME We could call fs_prep_san_write()
2767 * only once for all the blocks allocation.
2768 * Now call it once for each block, for
2769 * simplicity. And if error happens, we
2770 * probably need to release previous alloced
2772 rc = fs_prep_san_write(obd, inode, &block,
2778 rnb->offset = block;
/*
 * Report filesystem statistics for the filter device behind conn.
 *
 * Resolves conn with class_conn2obd(), bumps the st_statfs_reqs per-CPU
 * I/O statistic by one, and returns the result of fsfilt_statfs() on the
 * filter's superblock, which fills in *osfs.
 *
 * NOTE(review): no check of the class_conn2obd() result is visible in this
 * listing; lines may have been dropped -- confirm against the full source.
 */
2786 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2788 struct obd_device *obd;
2791 obd = class_conn2obd(conn);
2793 XPROCFS_BUMP_MYCPU_IOSTAT (st_statfs_reqs, 1);
2795 RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
/*
 * Look up a named property of the filter device.
 *
 * conn       - connection handle, resolved with class_conn2obd(); an
 *              unresolvable handle is logged as an invalid client cookie
 * keylen/key - property name; a key matches only when keylen equals the
 *              name's strlen() and memcmp() over keylen bytes is zero
 * vallen     - out: set to the size of the value written
 * val        - out: receives the value as a __u32
 *
 * Recognised keys:
 *   "blocksize"      -> s_blocksize of the filter's superblock
 *   "blocksize_bits" -> s_blocksize_bits of the filter's superblock
 * Any other key is logged with "invalid key".
 *
 * NOTE(review): the return statements are not visible in this listing, so
 * the success and failure codes cannot be stated from here.
 */
2798 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2799 void *key, __u32 *vallen, void *val)
2801 struct obd_device *obd;
2804 obd = class_conn2obd(conn);
2806 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2811 if (keylen == strlen("blocksize") &&
2812 memcmp(key, "blocksize", keylen) == 0) {
2813 __u32 *blocksize = val;
2814 *vallen = sizeof(*blocksize);
2815 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2819 if (keylen == strlen("blocksize_bits") &&
2820 memcmp(key, "blocksize_bits", keylen) == 0) {
2821 __u32 *blocksize_bits = val;
2822 *vallen = sizeof(*blocksize_bits);
2823 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2827 CDEBUG(D_IOCTL, "invalid key\n");
/*
 * Copy an object's data page by page from src (via src_conn) to dst (via
 * dst_conn).
 *
 * The function calls LBUG() right after its declarations, with the
 * author's remark that the code is not correct, so the remainder should be
 * read as disabled/unfinished code.
 *
 * Visible behaviour after that point: two zeroed lov_stripe_md structures
 * are given the source and destination object ids; one page is allocated
 * with GFP_USER; a loop runs while index is below the number of pages
 * needed for src->o_size, issuing obd_brw(OBD_BRW_READ) on the source and
 * obd_brw(OBD_BRW_WRITE) on the destination with pg.flag set to
 * OBD_BRW_CREATE; finally dst->o_size and dst->o_blocks are copied from
 * src and OBD_MD_FLSIZE | OBD_MD_FLBLOCKS are set in dst->o_valid.
 *
 * count and offset are not referenced in the visible lines.
 *
 * NOTE(review): in the visible lines pg.off is computed from page->index
 * before page->index is assigned the current index, and the shift is done
 * on page->index without widening -- confirm against the complete source
 * whether this is one of the defects the LBUG() remark refers to.
 * NOTE(review): this listing has dropped lines (declarations, error
 * checks, the index increment, page release and the return).
 */
2831 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2832 struct lustre_handle *src_conn, struct obdo *src,
2833 obd_size count, obd_off offset, struct obd_trans_info *oti)
2836 struct lov_stripe_md srcmd, dstmd;
2837 unsigned long index = 0;
2840 LBUG(); /* THIS CODE IS NOT CORRECT -phil */
2842 memset(&srcmd, 0, sizeof(srcmd));
2843 memset(&dstmd, 0, sizeof(dstmd));
2844 srcmd.lsm_object_id = src->o_id;
2845 dstmd.lsm_object_id = dst->o_id;
2848 CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2849 ", dst: ino "LPU64"\n",
2850 src->o_id, src->o_blocks, src->o_size, dst->o_id);
2851 page = alloc_page(GFP_USER);
2857 /* XXX with brw vector I/O, we could batch up reads and writes here,
2858 * all we need to do is allocate multiple pages to handle the I/Os
2859 * and arrays to handle the request parameters.
2861 while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2865 pg.count = PAGE_SIZE;
2866 pg.off = (page->index) << PAGE_SHIFT;
2869 page->index = index;
2870 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, NULL);
2876 pg.flag = OBD_BRW_CREATE;
2877 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2879 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, oti);
2881 /* XXX should handle dst->o_size, dst->o_blocks here */
2887 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2891 dst->o_size = src->o_size;
2892 dst->o_blocks = src->o_blocks;
2893 dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
/*
 * Method table for the regular filter device type; obdfilter_init()
 * registers it under OBD_FILTER_DEVICENAME.
 *
 * The entries use the GNU "field: value" initializer form, so each line
 * binds one obd_ops slot to the filter_* function that implements it.
 *
 * NOTE(review): the embedded line numbers skip inside this table (after
 * o_close, after o_destroy_export and after o_iterate), so entries,
 * preprocessor guards and the closing brace may be missing from this
 * listing -- confirm against the complete source.
 */
2900 static struct obd_ops filter_obd_ops = {
2901 o_owner: THIS_MODULE,
2902 o_attach: filter_attach,
2903 o_detach: filter_detach,
2904 o_get_info: filter_get_info,
2905 o_setup: filter_setup,
2906 o_cleanup: filter_cleanup,
2907 o_connect: filter_connect,
2908 o_disconnect: filter_disconnect,
2909 o_statfs: filter_statfs,
2910 o_syncfs: filter_syncfs,
2911 o_getattr: filter_getattr,
2912 o_create: filter_create,
2913 o_setattr: filter_setattr,
2914 o_destroy: filter_destroy,
2915 o_open: filter_open,
2916 o_close: filter_close,
2918 o_punch: filter_truncate,
2919 o_preprw: filter_preprw,
2920 o_commitrw: filter_commitrw,
2921 o_destroy_export: filter_destroy_export,
2923 o_san_preprw: filter_san_preprw,
2924 o_preallocate: filter_preallocate_inodes,
2925 o_migrate: filter_migrate,
2926 o_copy: filter_copy_data,
2927 o_iterate: filter_iterate
/*
 * Method table for the SAN filter device type; obdfilter_init() registers
 * it under OBD_FILTER_SAN_DEVICENAME.
 *
 * Compared with the visible entries of filter_obd_ops, o_setup points at
 * filter_san_setup instead of filter_setup and no o_syncfs entry is
 * listed; the remaining visible slots use the same filter_* functions.
 *
 * NOTE(review): the embedded line numbers skip after o_close, after
 * o_destroy_export (which carries no trailing comma here) and after
 * o_iterate, so a preprocessor guard and the closing brace are probably
 * missing from this listing -- confirm against the complete source.
 */
2931 static struct obd_ops filter_sanobd_ops = {
2932 o_owner: THIS_MODULE,
2933 o_attach: filter_attach,
2934 o_detach: filter_detach,
2935 o_get_info: filter_get_info,
2936 o_setup: filter_san_setup,
2937 o_cleanup: filter_cleanup,
2938 o_connect: filter_connect,
2939 o_disconnect: filter_disconnect,
2940 o_statfs: filter_statfs,
2941 o_getattr: filter_getattr,
2942 o_create: filter_create,
2943 o_setattr: filter_setattr,
2944 o_destroy: filter_destroy,
2945 o_open: filter_open,
2946 o_close: filter_close,
2948 o_punch: filter_truncate,
2949 o_preprw: filter_preprw,
2950 o_commitrw: filter_commitrw,
2951 o_san_preprw: filter_san_preprw,
2952 o_destroy_export: filter_destroy_export
2954 o_preallocate: filter_preallocate_inodes,
2955 o_migrate: filter_migrate,
2956 o_copy: filter_copy_data,
2957 o_iterate: filter_iterate
/*
 * Module initialisation.
 *
 * Prints the driver banner, initialises the "filter" xprocfs statistics
 * and the lprocfs variables, then registers two OBD types with the same
 * module variables: filter_obd_ops as OBD_FILTER_DEVICENAME and
 * filter_sanobd_ops as OBD_FILTER_SAN_DEVICENAME.
 *
 * The trailing class_unregister_type(OBD_FILTER_DEVICENAME) presumably
 * undoes the first registration when the second one fails; the rc checks
 * and the return are not visible in this listing -- TODO confirm.
 */
2962 static int __init obdfilter_init(void)
2964 struct lprocfs_static_vars lvars;
2967 printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2969 xprocfs_init ("filter");
2970 lprocfs_init_vars(&lvars);
2972 rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2973 OBD_FILTER_DEVICENAME);
2977 rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2978 OBD_FILTER_SAN_DEVICENAME);
2980 class_unregister_type(OBD_FILTER_DEVICENAME);
/*
 * Module teardown: unregister the two OBD types in the reverse of the
 * order in which obdfilter_init() registers them (SAN type first, then the
 * regular filter type).
 */
2984 static void __exit obdfilter_exit(void)
2986 class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2987 class_unregister_type(OBD_FILTER_DEVICENAME);
/*
 * Module metadata (author, description, GPL licence) followed by the hooks
 * that bind obdfilter_init() and obdfilter_exit() as the module's load and
 * unload entry points.
 */
2991 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2992 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2993 MODULE_LICENSE("GPL");
2995 module_init(obdfilter_init);
2996 module_exit(obdfilter_exit);