1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28 * (which need to get journal_lock, may block if journal full).
30 * Invariant: Call filter_start_transno() before any journal ops to avoid the
31 * same deadlock problem. We can (and want to) get rid of the
32 * transno sem in favour of the dir/inode i_sem to avoid single
33 * threaded operation on the OST.
37 #define DEBUG_SUBSYSTEM S_FILTER
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
/* Slab caches used in this file: filter_open_cache backs the per-open
 * struct filter_file_data allocated in filter_obj_open(), and
 * filter_dentry_cache backs the struct filter_dentry_data that is hung
 * off dentry->d_fsdata. */
56 static kmem_cache_t *filter_open_cache;
57 static kmem_cache_t *filter_dentry_cache;
59 /* should be generic per-obd stats... */
/* Request counters exported through procfs.  xprocfs_iostats holds one
 * struct per CPU (NR_CPUS slots) and XPROCFS_BUMP_MYCPU_IOSTAT updates
 * only the slot of the CPU it runs on.
 * NOTE(review): most members and the closing brace of the struct are not
 * visible in this extract. */
60 struct xprocfs_io_stat {
65 __u64 st_getattr_reqs;
66 __u64 st_setattr_reqs;
68 __u64 st_destroy_reqs;
76 static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
77 static struct proc_dir_entry *xprocfs_dir;
/* Add `count` to member `field` of the calling CPU's xprocfs_iostats slot.
 * NOTE(review): the wrapper lines of the macro body are not visible in
 * this extract. */
79 #define XPROCFS_BUMP_MYCPU_IOSTAT(field, count) \
81 xprocfs_iostats[smp_processor_id()].field += (count); \
84 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* Generates xprocfs_sum_<field>(), which adds up <field> over the
 * xprocfs_iostats slots 0 .. smp_num_cpus-1.  Built only for kernels older
 * than 2.5.0; one summing function is instantiated per counter below.
 * NOTE(review): the return type, locals and return statement of the
 * generated function are not visible in this extract. */
85 #define DECLARE_XPROCFS_SUM_STAT(field) \
87 xprocfs_sum_##field (void) \
92 for (i = 0; i < smp_num_cpus; i++) \
93 stat += xprocfs_iostats[i].field; \
97 DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
98 DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
99 DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
100 DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
101 DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
102 DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
103 DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
104 DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
105 DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
106 DECLARE_XPROCFS_SUM_STAT (st_syncfs_reqs)
107 DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
108 DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
109 DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
/* procfs read handler for one statistic.  `data` carries a pointer to a
 * counter function returning long long; its current value is formatted as
 * a decimal number plus newline into `page`, bounded by `count`.
 * NOTE(review): the return type, the locals and any handling of
 * start/off/eof are not visible in this extract. */
113 xprocfs_rd_stat (char *page, char **start, off_t off, int count,
114 int *eof, void *data)
116 long long (*fn)(void) = (long long(*)(void))data;
123 len = snprintf (page, count, "%Ld\n", fn());
/* Register one statistic file under xprocfs_dir.
 *
 * name: file name of the procfs entry
 * fn:   counter function whose value the file reports
 *
 * The entry is created as a world-readable regular file that is served by
 * xprocfs_rd_stat and has no write handler.  A creation failure is logged
 * with CERROR.
 * NOTE(review): the return type, the test guarding the CERROR and the line
 * that attaches `fn` to the entry are not visible in this extract. */
130 xprocfs_add_stat(char *name, long long (*fn)(void))
132 struct proc_dir_entry *entry;
134 entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
136 CERROR ("Can't add procfs stat %s\n", name);
141 entry->read_proc = xprocfs_rd_stat;
142 entry->write_proc = NULL;
/* Create the procfs statistics directory and, on kernels older than 2.5.0,
 * one file per request counter backed by the matching xprocfs_sum_*()
 * function.  A failure to create the directory is logged with CERROR.
 * NOTE(review): `dirname` is formatted from `name`, yet the visible
 * proc_mkdir() call passes the literal "sys/obdfilter"; unless a line
 * missing from this extract uses `dirname`, the `name` argument has no
 * effect on the directory that is created -- confirm.
 * NOTE(review): the return type, the declaration of `dirname` and the
 * closing #endif are not visible in this extract. */
146 xprocfs_init (char *name)
150 snprintf (dirname, sizeof (dirname), "sys/%s", name);
152 xprocfs_dir = proc_mkdir ("sys/obdfilter", NULL);
153 if (xprocfs_dir == NULL) {
154 CERROR ("Can't make dir\n");
158 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
159 xprocfs_add_stat ("read_bytes", xprocfs_sum_st_read_bytes);
160 xprocfs_add_stat ("read_reqs", xprocfs_sum_st_read_reqs);
161 xprocfs_add_stat ("write_bytes", xprocfs_sum_st_write_bytes);
162 xprocfs_add_stat ("write_reqs", xprocfs_sum_st_write_reqs);
163 xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
164 xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
165 xprocfs_add_stat ("create_reqs", xprocfs_sum_st_create_reqs);
166 xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
167 xprocfs_add_stat ("statfs_reqs", xprocfs_sum_st_statfs_reqs);
168 xprocfs_add_stat ("syncfs_reqs", xprocfs_sum_st_syncfs_reqs);
169 xprocfs_add_stat ("open_reqs", xprocfs_sum_st_open_reqs);
170 xprocfs_add_stat ("close_reqs", xprocfs_sum_st_close_reqs);
171 xprocfs_add_stat ("punch_reqs", xprocfs_sum_st_punch_reqs);
/* Tear down what xprocfs_init() created: remove every statistic file from
 * xprocfs_dir and then remove the directory itself from its parent.  The
 * function tests for xprocfs_dir == NULL first.
 * NOTE(review): the statement executed when xprocfs_dir is NULL
 * (presumably an early return) is not visible in this extract. */
175 void xprocfs_fini (void)
177 if (xprocfs_dir == NULL)
180 remove_proc_entry ("read_bytes", xprocfs_dir);
181 remove_proc_entry ("read_reqs", xprocfs_dir);
182 remove_proc_entry ("write_bytes", xprocfs_dir);
183 remove_proc_entry ("write_reqs", xprocfs_dir);
184 remove_proc_entry ("getattr_reqs", xprocfs_dir);
185 remove_proc_entry ("setattr_reqs", xprocfs_dir);
186 remove_proc_entry ("create_reqs", xprocfs_dir);
187 remove_proc_entry ("destroy_reqs", xprocfs_dir);
188 remove_proc_entry ("statfs_reqs", xprocfs_dir);
189 remove_proc_entry ("syncfs_reqs", xprocfs_dir);
190 remove_proc_entry ("open_reqs", xprocfs_dir);
191 remove_proc_entry ("close_reqs", xprocfs_dir);
192 remove_proc_entry ("punch_reqs", xprocfs_dir);
194 remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
/* One-letter directory name for each file type, indexed by
 * (mode & S_IFMT) >> S_SHIFT.  filter_prep() creates one "O/<letter>"
 * directory per non-empty entry and filter_id() uses the letter when it
 * builds an object path.  The initialisers use the old GNU
 * "[index] value" designator form.
 * NOTE(review): one line of the initialiser and the closing brace are not
 * visible in this extract. */
199 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
201 [S_IFREG >> S_SHIFT] "R",
202 [S_IFDIR >> S_SHIFT] "D",
203 [S_IFCHR >> S_SHIFT] "C",
204 [S_IFBLK >> S_SHIFT] "B",
205 [S_IFIFO >> S_SHIFT] "F",
206 [S_IFSOCK >> S_SHIFT] "S",
207 [S_IFLNK >> S_SHIFT] "L"
/* Return the obd_type_by_mode[] entry selected by the file-type bits of
 * `mode`; bits outside S_IFMT are ignored. */
210 static inline const char *obd_mode_to_type(int mode)
212 return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
/* Callback that filter_finish_transno() hands to fsfilt_set_last_rcvd().
 * When no error is reported and `last_rcvd` is newer than
 * obd->obd_last_committed, the latter is advanced to `last_rcvd`; it is
 * never moved backwards.
 * NOTE(review): presumably invoked once the transaction has committed --
 * confirm against fsfilt.  The trailing parameter (used as `error`) and
 * the rest of the CDEBUG argument list are not visible in this extract. */
215 static void filter_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd,
218 CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
220 if (!error && last_rcvd > obd->obd_last_committed)
221 obd->obd_last_committed = last_rcvd;
/* Begin the transaction-number critical section for `export`'s device.
 * When FILTER_TRANSNO_SEM is defined this takes fo_transno_sem, which
 * filter_finish_transno() releases; no other work is visible, so without
 * that define the function appears to do nothing. */
224 void filter_start_transno(struct obd_export *export)
226 #ifdef FILTER_TRANSNO_SEM
227 struct obd_device * obd = export->exp_obd;
230 down(&obd->u.filter.fo_transno_sem);
234 /* Assumes caller has already pushed us into the kernel context. */
/* Allocate the next transaction number for a request and record it in the
 * client's slot of the last_rcvd file.
 *
 * export: export of the client that issued the request
 * handle: journal handle passed on to fsfilt_set_last_rcvd()
 * oti:    receives the transaction number in oti_transno
 * rc:     result of the operation so far (see "Propagate error code")
 *
 * The number handed out is the value of fsd_last_rcvd before it is
 * incremented.  The update is done under fo_translock unless
 * FILTER_TRANSNO_SEM is defined, in which case fo_transno_sem (taken by
 * filter_start_transno()) is released here instead.
 * NOTE(review): the early-return bodies, several locals (req, off,
 * last_rcvd, written) and the return statements are not visible in this
 * extract. */
235 int filter_finish_transno(struct obd_export *export, void *handle,
236 struct obd_trans_info *oti, int rc)
239 struct obd_device *obd = export->exp_obd;
240 struct filter_obd *filter = &obd->u.filter;
241 struct filter_export_data *fed = &export->exp_filter_data;
242 struct filter_client_data *fcd = fed->fed_fcd;
246 /* Propagate error code. */
248 #ifdef FILTER_TRANSNO_SEM
249 up(&filter->fo_transno_sem);
254 if (!(obd->obd_flags & OBD_REPLAYABLE)) {
258 /* we don't allocate new transnos for replayed requests */
260 /* perhaps if transno already set? or should level be in oti? */
261 if (req->rq_level == LUSTRE_CONN_RECOVD)
/* Offset of this client's record inside the last_rcvd file. */
265 off = fed->fed_lr_off;
267 #ifndef FILTER_TRANSNO_SEM
268 spin_lock(&filter->fo_translock);
/* fsd_last_rcvd is kept little-endian in memory, hence the conversions. */
270 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
271 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
272 #ifndef FILTER_TRANSNO_SEM
273 spin_unlock(&filter->fo_translock);
276 oti->oti_transno = last_rcvd;
277 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
/* Copied without conversion from the server header field. */
278 fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
280 /* get this from oti */
283 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
286 fcd->fcd_last_xid = 0;
288 fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_last_rcvd_cb);
289 written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
291 CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
292 LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
294 #ifdef FILTER_TRANSNO_SEM
295 up(&filter->fo_transno_sem);
297 if (written == sizeof(*fcd))
/* NOTE(review): `written` is printed with LPSZ above but with %d here;
 * one of the two format specifiers does not match its type -- confirm. */
299 CERROR("error writing to last_rcvd file: rc = %d\n", written);
306 /* write the pathname into the string */
/* Format the path of object `id` into `buf`.
 *
 * Objects that are not regular files, or any object when fo_subdir_count
 * is 0, get "O/<type>/<id>".  Regular files otherwise get
 * "O/<type>/d<N>/<id>" with N = id & (fo_subdir_count - 1), which matches
 * the subdirectory filter_parent() selects.  <type> comes from
 * obd_mode_to_type(mode).
 * NOTE(review): sprintf() is unbounded, so `buf` must be large enough for
 * the longest path; the mask only spreads ids over all subdirectories if
 * fo_subdir_count is a power of two -- confirm.  The last parameter (used
 * as `mode`) and the return statement are not visible in this extract;
 * filter_obj_open() passes the result straight to filp_open(). */
307 static char *filter_id(char *buf, struct filter_obd *filter, obd_id id,
310 if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
311 sprintf(buf, "O/%s/"LPU64, obd_mode_to_type(mode), id);
313 sprintf(buf, "O/%s/d%d/"LPU64, obd_mode_to_type(mode),
314 (int)id & (filter->fo_subdir_count - 1), id);
/* Drop a dentry reference with debugging around it: log the dentry's name
 * and the count it will have after the drop, and assert that the count is
 * still positive beforehand.
 * NOTE(review): the call that actually releases the reference (presumably
 * dput()) is not visible in this extract. */
319 static inline void f_dput(struct dentry *dentry)
321 /* Can't go inside filter_ddelete because it can block */
322 CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
323 dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
324 LASSERT(atomic_read(&dentry->d_count) > 0);
329 /* Not racy w.r.t. others, because we are the only user of this dentry */
/* d_release hook (installed through filter_dops): return the per-dentry
 * data in d_fsdata, if any, to filter_dentry_cache. */
330 static void filter_drelease(struct dentry *dentry)
332 if (dentry->d_fsdata)
333 kmem_cache_free(filter_dentry_cache, dentry->d_fsdata);
/* Dentry operations that filter_obj_open() installs on object dentries;
 * only d_release is set, so d_fsdata is freed along with the dentry. */
336 struct dentry_operations filter_dops = {
337 .d_release = filter_drelease,
/* Name of the file, relative to the filter's working directory, that
 * holds the server header and the per-client records. */
340 #define LAST_RCVD "last_rcvd"
343 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
/* FILTER_LR_MAX_CLIENTS is the number of bits in the slot bitmap
 * fo_last_rcvd_slots; "32k" holds when PAGE_SIZE is 4096.
 * NOTE(review): FILTER_LR_MAX_CLIENT_WORDS divides the bit count by
 * sizeof(unsigned long) (bytes per word, not bits per word), and the
 * bitmap is allocated as WORDS * sizeof(unsigned long) bytes, i.e.
 * FILTER_LR_MAX_CLIENTS bytes.  That is 8 pages rather than the 1 page the
 * comment above describes -- larger than needed, not too small. */
344 #define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
345 #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
/* NOTE(review): this extract has lost lines inside filter_client_add(),
 * including the terminators of the two block comments below and the
 * branches that test new_client, so read the visible statements as an
 * outline only.  What is visible: a free slot is searched for in
 * fo_last_rcvd_slots with find_first_zero_bit()/find_next_zero_bit() and
 * claimed with test_and_set_bit(); fed_lr_idx and fed_lr_off are derived
 * from the slot index and the fsd_client_start/fsd_client_size header
 * fields; and the client record fed->fed_fcd is written to the last_rcvd
 * file at that offset between push_ctxt() and pop_ctxt(), with the number
 * of bytes written compared against sizeof(*fed->fed_fcd). */
347 /* Add client data to the FILTER. We use a bitmap to locate a free space
348 * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
349 * Otherwise, we have just read the data from the last_rcvd file and
350 * we know its offset.
352 int filter_client_add(struct filter_obd *filter,
353 struct filter_export_data *fed, int cl_idx)
355 int new_client = (cl_idx == -1);
357 LASSERT(filter->fo_last_rcvd_slots != NULL);
359 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
360 * there's no need for extra complication here
363 cl_idx = find_first_zero_bit(filter->fo_last_rcvd_slots,
364 FILTER_LR_MAX_CLIENTS);
366 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
367 CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
370 if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
371 CERROR("FILTER client %d: found bit is set in bitmap\n",
373 cl_idx = find_next_zero_bit(filter->fo_last_rcvd_slots,
374 FILTER_LR_MAX_CLIENTS,
379 if (test_and_set_bit(cl_idx, filter->fo_last_rcvd_slots)) {
380 CERROR("FILTER client %d: bit already set in bitmap!\n",
386 fed->fed_lr_idx = cl_idx;
387 fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
388 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
390 CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
391 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
394 struct obd_run_ctxt saved;
395 loff_t off = fed->fed_lr_off;
398 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
399 fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
401 push_ctxt(&saved, &filter->fo_ctxt, NULL);
402 written = lustre_fwrite(filter->fo_rcvd_filp,
403 (char *)fed->fed_fcd,
404 sizeof(*fed->fed_fcd), &off);
405 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
407 if (written != sizeof(*fed->fed_fcd)) {
/* Release the last_rcvd slot of a disconnecting client.
 *
 * The client's bit in fo_last_rcvd_slots is cleared (an already-clear bit
 * is reported with CERROR), its record in the last_rcvd file is
 * overwritten with zeroes and the file is synced, all inside the filter's
 * run context.  fed->fed_fcd is freed afterwards.
 * NOTE(review): the early-exit tests, the locals `off` and `written`, and
 * the return statements are not visible in this extract. */
416 int filter_client_free(struct obd_export *exp)
418 struct filter_export_data *fed = &exp->exp_filter_data;
419 struct filter_obd *filter = &exp->exp_obd->u.filter;
420 struct filter_client_data zero_fcd;
421 struct obd_run_ctxt saved;
428 LASSERT(filter->fo_last_rcvd_slots != NULL);
430 off = fed->fed_lr_off;
/* NOTE(review): fed_lr_off is printed with %lld here but with %llu in the
 * other messages of this function -- confirm which one matches its type. */
432 CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
433 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
435 if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
436 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
/* An all-zero record is what filter_init_server_data() treats as an empty
 * slot (it tests fcd_uuid[0] == '\0'). */
441 memset(&zero_fcd, 0, sizeof zero_fcd);
442 push_ctxt(&saved, &filter->fo_ctxt, NULL);
443 written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
444 sizeof(zero_fcd), &off);
446 /* XXX: this write gets lost sometimes, unless this sync is here. */
447 file_fsync(filter->fo_rcvd_filp, filter->fo_rcvd_filp->f_dentry, 1);
448 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
450 if (written != sizeof(zero_fcd)) {
451 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
452 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
456 "zeroed disconnecting client %s at idx %u (%llu)\n",
457 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
460 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
/* Free the in-memory server header (fo_fsd) and the client-slot bitmap
 * (fo_last_rcvd_slots) and reset both pointers to NULL.  The bitmap size
 * matches the one used for its allocation in filter_init_server_data().
 * NOTE(review): the return statement is not visible in this extract. */
465 static int filter_free_server_data(struct filter_obd *filter)
467 OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
468 filter->fo_fsd = NULL;
469 OBD_FREE(filter->fo_last_rcvd_slots,
470 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
471 filter->fo_last_rcvd_slots = NULL;
476 /* assumes caller is already in kernel ctxt */
/* Write the server header `fsd` to the last_rcvd file `filp` after
 * logging its uuid, last object id, last transaction number and mount
 * count.  A short write is reported, but only at CDEBUG(D_INODE) level.
 * NOTE(review): the locals `off` and `rc` and the return statements are
 * not visible in this extract; `off` is presumably 0 (start of file) --
 * confirm. */
477 static int filter_update_server_data(struct file *filp,
478 struct filter_server_data *fsd)
483 CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
484 CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
485 le64_to_cpu(fsd->fsd_last_objid));
486 CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
487 le64_to_cpu(fsd->fsd_last_rcvd));
488 CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
489 le64_to_cpu(fsd->fsd_mount_count));
491 rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
492 if (rc != sizeof(*fsd)) {
493 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
500 /* assumes caller is already in kernel ctxt */
/* Load (or create) the server header of the last_rcvd file `filp` and
 * rebuild per-client state from the client records that follow it.
 *
 * obd:            device being set up
 * filp:           open last_rcvd file
 * init_lastobjid: last object id stored when the file is empty
 *
 * An empty file gets a fresh header (uuid, sizes, FILTER_SUBDIR_COUNT);
 * otherwise the header is read and fo_subdir_count is taken from it.  For
 * a replayable device every client record is then read: zeroed slots are
 * skipped, clients whose mount count is recent enough (mount_age <
 * FILTER_MOUNT_RECOV) get an export and a slot through
 * filter_client_add(), and fsd_last_rcvd is raised to the highest
 * last_rcvd seen.  Finally the mount count is incremented and the header
 * is written back with filter_update_server_data().
 * NOTE(review): many lines (locals, braces, else branches, labels and
 * return statements, some comment delimiters) are not visible in this
 * extract. */
501 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
502 __u64 init_lastobjid)
504 struct filter_obd *filter = &obd->u.filter;
505 struct filter_server_data *fsd;
506 struct filter_client_data *fcd = NULL;
507 struct inode *inode = filp->f_dentry->d_inode;
508 unsigned long last_rcvd_size = inode->i_size;
514 /* ensure padding in the struct is the correct size */
515 LASSERT (offsetof(struct filter_server_data, fsd_padding) +
516 sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
517 LASSERT (offsetof(struct filter_client_data, fcd_padding) +
518 sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
520 OBD_ALLOC(fsd, sizeof(*fsd));
523 filter->fo_fsd = fsd;
525 OBD_ALLOC(filter->fo_last_rcvd_slots,
526 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
527 if (filter->fo_last_rcvd_slots == NULL) {
528 OBD_FREE(fsd, sizeof(*fsd));
532 if (last_rcvd_size == 0) {
533 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
535 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
536 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
537 fsd->fsd_last_rcvd = 0;
538 mount_count = fsd->fsd_mount_count = 0;
539 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
540 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
541 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
542 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
543 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
545 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
547 if (retval != sizeof(*fsd)) {
548 CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
549 GOTO(out, rc = -EIO);
551 mount_count = le64_to_cpu(fsd->fsd_mount_count);
552 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
555 if (fsd->fsd_feature_incompat) {
556 CERROR("unsupported feature %x\n",
557 le32_to_cpu(fsd->fsd_feature_incompat));
560 if (fsd->fsd_feature_rocompat) {
561 CERROR("read-only feature %x\n",
562 le32_to_cpu(fsd->fsd_feature_rocompat));
563 /* Do something like remount filesystem read-only */
567 CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
568 obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
569 CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
570 obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
571 CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
572 obd->obd_name, mount_count);
573 CDEBUG(D_INODE, "%s: server data size: %u\n",
574 obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
575 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
576 obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
/* NOTE(review): fsd_client_size is written with cpu_to_le16() above and
 * read with le16_to_cpu() elsewhere in this file, but this debug message
 * converts it with le32_to_cpu(); on a big-endian host the printed value
 * would be wrong -- confirm and use le16_to_cpu(). */
577 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
578 obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
579 CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
580 obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
583 * When we do a clean FILTER shutdown, we save the last_rcvd into
584 * the header. If we find clients with higher last_rcvd values
585 * then those clients may need recovery done.
587 if (obd->obd_flags & OBD_REPLAYABLE) {
588 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
593 OBD_ALLOC(fcd, sizeof(*fcd));
595 GOTO(err_fsd, rc = -ENOMEM);
598 /* Don't assume off is incremented properly, in case
599 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
601 off = le32_to_cpu(fsd->fsd_client_start) +
602 cl_idx * le16_to_cpu(fsd->fsd_client_size);
603 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
604 if (rc != sizeof(*fcd)) {
605 CERROR("error reading FILTER %s offset %d: rc = %d\n",
606 LAST_RCVD, cl_idx, rc);
607 if (rc > 0) /* XXX fatal error or just abort reading? */
612 if (fcd->fcd_uuid[0] == '\0') {
613 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
618 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
620 /* These exports are cleaned up by filter_disconnect(), so they
621 * need to be set up like real exports as filter_connect() does.
623 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
624 if (mount_age < FILTER_MOUNT_RECOV) {
625 struct obd_export *exp = class_new_export(obd);
626 struct filter_export_data *fed;
627 CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
628 " srv lr: "LPU64" mnt: "LPU64" last mount: "
629 LPU64"\n", fcd->fcd_uuid, cl_idx,
630 last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
631 le64_to_cpu(fcd->fcd_mount_count), mount_count);
632 /* disabled until OST recovery is actually working */
638 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
639 sizeof exp->exp_client_uuid.uuid);
640 fed = &exp->exp_filter_data;
642 filter_client_add(filter, fed, cl_idx);
643 /* create helper if export init gets more complex */
644 INIT_LIST_HEAD(&fed->fed_open_head);
645 spin_lock_init(&fed->fed_lock);
648 obd->obd_recoverable_clients++;
651 "discarded client %d UUID '%s' count "LPU64"\n",
652 cl_idx, fcd->fcd_uuid,
653 le64_to_cpu(fcd->fcd_mount_count));
656 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
659 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
660 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
663 obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
664 if (obd->obd_recoverable_clients) {
665 CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
666 obd->obd_recoverable_clients,
667 le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
668 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
669 obd->obd_flags |= OBD_RECOVERING;
673 OBD_FREE(fcd, sizeof(*fcd));
676 CERROR("%s: recovery support OFF\n", obd->obd_name);
679 fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
681 /* save it,so mount count and last_recvd is current */
682 rc = filter_update_server_data(filp, filter->fo_fsd);
688 filter_free_server_data(filter);
692 /* setup the object store with correct subdirectories */
/* Prepare the on-disk layout inside the filter's run context: create or
 * look up "O", one "O/<type>" directory per obd_type_by_mode[] entry, the
 * last_rcvd file (which must be a regular file), and, when
 * fo_subdir_count is non-zero, the "O/R/d<i>" subdirectories.  The file,
 * inode and address-space operations of last_rcvd are saved in the filter
 * and the server data is loaded with filter_init_server_data().  The
 * statements after the pop_ctxt() call are the error-unwind path, which
 * releases everything acquired so far.
 * NOTE(review): locals, labels, braces and return statements are not
 * visible in this extract. */
693 static int filter_prep(struct obd_device *obd)
695 struct obd_run_ctxt saved;
696 struct filter_obd *filter = &obd->u.filter;
697 struct dentry *dentry, *O_dentry;
704 push_ctxt(&saved, &filter->fo_ctxt, NULL);
705 dentry = simple_mkdir(current->fs->pwd, "O", 0700);
706 CDEBUG(D_INODE, "got/created O: %p\n", dentry);
707 if (IS_ERR(dentry)) {
708 rc = PTR_ERR(dentry);
709 CERROR("cannot open/create O: rc = %d\n", rc);
712 filter->fo_dentry_O = dentry;
715 * Create directories and/or get dentries for each object type.
716 * This saves us from having to do multiple lookups for each one.
718 O_dentry = filter->fo_dentry_O;
719 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
720 char *name = obd_type_by_mode[mode];
723 filter->fo_dentry_O_mode[mode] = NULL;
726 dentry = simple_mkdir(O_dentry, name, 0700);
727 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
728 if (IS_ERR(dentry)) {
729 rc = PTR_ERR(dentry);
730 CERROR("cannot create O/%s: rc = %d\n", name, rc);
731 GOTO(err_O_mode, rc);
733 filter->fo_dentry_O_mode[mode] = dentry;
736 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
737 if (!file || IS_ERR(file)) {
739 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
741 GOTO(err_O_mode, rc);
744 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
745 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
746 file->f_dentry->d_inode->i_mode);
747 GOTO(err_filp, rc = -ENOENT);
750 rc = fsfilt_journal_data(obd, file);
752 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
755 /* steal operations */
756 inode = file->f_dentry->d_inode;
757 filter->fo_fop = file->f_op;
758 filter->fo_iop = inode->i_op;
759 filter->fo_aops = inode->i_mapping->a_ops;
761 rc = filter_init_server_data(obd, file, INIT_OBJID);
763 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
764 GOTO(err_client, rc);
766 filter->fo_rcvd_filp = file;
768 if (filter->fo_subdir_count) {
769 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
/* NOTE(review): the array is sized with the compile-time
 * FILTER_SUBDIR_COUNT, but the loop below, the OBD_FREE in the unwind
 * path and the OBD_FREE in filter_post() all use filter->fo_subdir_count,
 * which filter_init_server_data() may have read from disk.  If the
 * on-disk count is larger, the loop writes past the allocation; if it
 * differs at all, the free size does not match the allocation size --
 * confirm and size the allocation from fo_subdir_count. */
770 OBD_ALLOC(filter->fo_dentry_O_sub,
771 FILTER_SUBDIR_COUNT * sizeof(dentry));
772 if (!filter->fo_dentry_O_sub)
773 GOTO(err_client, rc = -ENOMEM);
775 for (i = 0; i < filter->fo_subdir_count; i++) {
777 snprintf(dir, sizeof(dir), "d%u", i);
779 dentry = simple_mkdir(O_dentry, dir, 0700);
780 CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
781 if (IS_ERR(dentry)) {
782 rc = PTR_ERR(dentry);
783 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
786 filter->fo_dentry_O_sub[i] = dentry;
791 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
797 struct dentry *dentry = filter->fo_dentry_O_sub[i];
800 filter->fo_dentry_O_sub[i] = NULL;
803 OBD_FREE(filter->fo_dentry_O_sub,
804 filter->fo_subdir_count * sizeof(dentry));
806 class_disconnect_all(obd);
808 if (filp_close(file, 0))
809 CERROR("can't close %s after error\n", LAST_RCVD);
810 filter->fo_rcvd_filp = NULL;
813 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
816 filter->fo_dentry_O_mode[mode] = NULL;
819 f_dput(filter->fo_dentry_O);
820 filter->fo_dentry_O = NULL;
824 /* cleanup the filter: write last used object id to status file */
/* Inside the filter's run context: write the server header back with
 * filter_update_server_data(), sync and close the last_rcvd file, drop
 * the dentries cached by filter_prep() (O/R/d<i>, O/<type>, O) and free
 * the server data with filter_free_server_data().
 * NOTE(review): filter->fo_rcvd_filp is passed to
 * filter_update_server_data() before the `if (filter->fo_rcvd_filp)`
 * test below, so that test cannot protect the earlier use -- confirm
 * whether the pointer can be NULL here.
 * NOTE(review): the subdirectory array is freed with fo_subdir_count
 * elements, whereas filter_prep() allocates FILTER_SUBDIR_COUNT elements
 * -- confirm the two always agree.
 * NOTE(review): locals, braces and the terminator of the XXX comment are
 * not visible in this extract. */
825 static void filter_post(struct obd_device *obd)
827 struct obd_run_ctxt saved;
828 struct filter_obd *filter = &obd->u.filter;
832 /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
833 * best to start a transaction with h_sync, because we removed this
836 push_ctxt(&saved, &filter->fo_ctxt, NULL);
837 rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
839 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
842 if (filter->fo_rcvd_filp) {
843 rc = file_fsync(filter->fo_rcvd_filp,
844 filter->fo_rcvd_filp->f_dentry, 1);
845 filp_close(filter->fo_rcvd_filp, 0);
846 filter->fo_rcvd_filp = NULL;
848 CERROR("last_rcvd file won't closed rc = %ld\n", rc);
851 if (filter->fo_subdir_count) {
853 for (i = 0; i < filter->fo_subdir_count; i++) {
854 struct dentry *dentry = filter->fo_dentry_O_sub[i];
856 filter->fo_dentry_O_sub[i] = NULL;
858 OBD_FREE(filter->fo_dentry_O_sub,
859 filter->fo_subdir_count *
860 sizeof(*filter->fo_dentry_O_sub));
862 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
863 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
866 filter->fo_dentry_O_mode[mode] = NULL;
869 f_dput(filter->fo_dentry_O);
870 filter_free_server_data(filter);
871 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/* Advance the device's last-object-id counter by one under
 * fo_objidlock.  The counter is stored little-endian in fo_fsd, which
 * must already be loaded.
 * NOTE(review): the declaration of `id` and the return statement are not
 * visible in this extract, so whether the old or the incremented value is
 * returned cannot be told from here. */
875 static __u64 filter_next_id(struct obd_device *obd)
878 LASSERT(obd->u.filter.fo_fsd != NULL);
880 spin_lock(&obd->u.filter.fo_objidlock);
881 id = le64_to_cpu(obd->u.filter.fo_fsd->fsd_last_objid);
882 obd->u.filter.fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
883 spin_unlock(&obd->u.filter.fo_objidlock);
888 /* how to get files, dentries, inodes from object id's */
889 /* parent i_sem is already held if needed for exclusivity */
/* Look up the dentry of object `id` in directory `dparent`.
 *
 * The object name is the decimal form of `id`.  Returns
 * ERR_PTR(-ENXIO) when the filter's superblock is not set up and
 * ERR_PTR(-ESTALE) for the invalid-id case; a lookup failure is logged.
 * The visible code takes and releases dparent's i_sem around
 * lookup_one_len().
 * NOTE(review): the conditions using `lockit` and testing `id`, the
 * locals `name` and `len`, and the final return are not visible in this
 * extract.
 * NOTE(review): the debug messages use "%*s" with d_name.len, which sets
 * a minimum field width rather than a maximum length; "%.*s" is probably
 * what was meant -- confirm. */
890 static struct dentry *filter_fid2dentry(struct obd_device *obd,
891 struct dentry *dparent,
892 __u64 id, int lockit)
894 struct super_block *sb = obd->u.filter.fo_sb;
895 struct dentry *dchild;
900 if (!sb || !sb->s_dev) {
901 CERROR("fatal: device not initialized.\n");
902 RETURN(ERR_PTR(-ENXIO));
906 CERROR("fatal: invalid object id 0\n");
908 RETURN(ERR_PTR(-ESTALE));
911 len = sprintf(name, LPU64, id);
912 CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
913 dparent->d_name.len, dparent->d_name.name, name);
915 down(&dparent->d_inode->i_sem);
916 dchild = lookup_one_len(name, dparent, len);
918 up(&dparent->d_inode->i_sem);
919 if (IS_ERR(dchild)) {
920 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
924 CDEBUG(D_INODE, "got child obj O/%*s/%s: %p, count = %d\n",
925 dparent->d_name.len, dparent->d_name.name, name, dchild,
926 atomic_read(&dchild->d_count));
928 LASSERT(atomic_read(&dchild->d_count) > 0);
/* Return the cached directory dentry that holds object `objid` of type
 * `mode`.  Only regular files are accepted (asserted).  With no
 * subdirectories configured the per-type directory is returned;
 * otherwise the subdirectory is picked with
 * objid & (fo_subdir_count - 1), the same rule filter_id() uses. */
933 static inline struct dentry *filter_parent(struct obd_device *obd,
934 obd_mode mode, obd_id objid)
936 struct filter_obd *filter = &obd->u.filter;
938 LASSERT((mode & S_IFMT) == S_IFREG); /* only regular files for now */
939 if ((mode & S_IFMT) != S_IFREG || filter->fo_subdir_count == 0)
940 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
942 return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
/* Open object `id` of type `type` on behalf of `export`.
 *
 * Returns ERR_PTR(-ENXIO) when the superblock is not set up,
 * ERR_PTR(-ESTALE) for an invalid id, ERR_PTR(-EINVAL) when `type` has no
 * file-type bits and ERR_PTR(-ENOMEM) when an allocation fails.
 *
 * On success the file is opened by path (see filter_id()) inside the
 * filter's run context.  The dentry either reuses its existing
 * filter_dentry_data, whose open count is incremented, or receives the
 * preallocated one with an open count of 1.  A filter_file_data with a
 * random server cookie is stored in file->private_data and linked onto
 * the export's fed_open_head list.  The trailing statements are the
 * error unwind, which frees the preallocated structures.
 * NOTE(review): the locals `file` and `name`, several conditions, braces,
 * labels and the return statements are not visible in this extract. */
945 static struct file *filter_obj_open(struct obd_export *export,
946 __u64 id, __u32 type)
948 struct filter_obd *filter = &export->exp_obd->u.filter;
949 struct super_block *sb = filter->fo_sb;
950 struct dentry *dentry;
951 struct filter_export_data *fed = &export->exp_filter_data;
952 struct filter_dentry_data *fdd;
953 struct filter_file_data *ffd;
954 struct obd_run_ctxt saved;
959 if (!sb || !sb->s_dev) {
960 CERROR("fatal: device not initialized.\n");
961 RETURN(ERR_PTR(-ENXIO));
965 CERROR("fatal: invalid obdo "LPU64"\n", id);
966 RETURN(ERR_PTR(-ESTALE));
969 if (!(type & S_IFMT)) {
970 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
971 __FUNCTION__, id, type);
972 RETURN(ERR_PTR(-EINVAL));
975 PORTAL_SLAB_ALLOC(ffd, filter_open_cache, sizeof(*ffd));
977 CERROR("obdfilter: out of memory\n");
978 RETURN(ERR_PTR(-ENOMEM));
981 /* We preallocate this to avoid blocking while holding fo_fddlock */
982 fdd = kmem_cache_alloc(filter_dentry_cache, SLAB_KERNEL);
984 CERROR("obdfilter: out of memory\n");
985 GOTO(out_ffd, file = ERR_PTR(-ENOMEM));
988 push_ctxt(&saved, &filter->fo_ctxt, NULL);
989 file = filp_open(filter_id(name, filter, id, type),
990 O_RDWR | O_LARGEFILE, type);
991 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
994 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
998 dentry = file->f_dentry;
999 spin_lock(&filter->fo_fddlock);
1000 if (dentry->d_fsdata) {
/* Another open already attached per-dentry data: discard the
 * preallocated one and share the existing structure. */
1001 spin_unlock(&filter->fo_fddlock);
1002 kmem_cache_free(filter_dentry_cache, fdd);
1003 fdd = dentry->d_fsdata;
1004 LASSERT(kmem_cache_validate(filter_dentry_cache, fdd));
1005 /* should only happen during client recovery */
1006 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1007 CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1008 atomic_inc(&fdd->fdd_open_count);
1010 atomic_set(&fdd->fdd_open_count, 1);
1012 fdd->fdd_objid = id;
1013 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1014 dentry->d_fsdata = fdd;
1015 spin_unlock(&filter->fo_fddlock);
1018 get_random_bytes(&ffd->ffd_servercookie, sizeof(ffd->ffd_servercookie));
1019 ffd->ffd_file = file;
1020 LASSERT(file->private_data == NULL);
1021 file->private_data = ffd;
1024 dentry->d_op = &filter_dops;
1026 LASSERT(dentry->d_op == &filter_dops);
1028 spin_lock(&fed->fed_lock);
1029 list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1030 spin_unlock(&fed->fed_lock);
1032 CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1038 kmem_cache_free(filter_dentry_cache, fdd);
1040 ffd->ffd_servercookie = DEAD_HANDLE_MAGIC;
1041 PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
1045 /* Caller must hold i_sem on dir_dentry->d_inode */
1046 /* Caller must push us into kernel context */
/* Unlink `object_dentry` from directory `dir_dentry` with vfs_unlink().
 * An object whose inode does not have exactly one link and one reference
 * is reported with CERROR before the unlink, and an unlink failure is
 * reported as well.
 * NOTE(review): the declaration of `rc`, the test guarding the second
 * CERROR and the return statement are not visible in this extract. */
1047 static int filter_destroy_internal(struct obd_device *obd,
1048 struct dentry *dir_dentry,
1049 struct dentry *object_dentry)
1051 struct inode *inode = object_dentry->d_inode;
1055 if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1056 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1057 object_dentry->d_name.len,
1058 object_dentry->d_name.name,
1059 inode->i_nlink, atomic_read(&inode->i_count));
1062 rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
1065 CERROR("error unlinking objid %*s: rc %d\n",
1066 object_dentry->d_name.len,
1067 object_dentry->d_name.name, rc);
/* Close one open file handle `ffd` of `export`.
 *
 * An extra reference on the file's dentry is taken up front so that it
 * stays usable after filp_close().  When this was the last open of the
 * object and the object is flagged FILTER_FLAG_DESTROY, the object is
 * unlinked: under the parent directory's i_sem and inside the filter's
 * run context a transaction is started, filter_destroy_internal() is
 * called, the transaction number is recorded with
 * filter_finish_transno() and the transaction is committed.  Finally the
 * dentry reference is dropped and `ffd` is returned to
 * filter_open_cache.
 * NOTE(review): the locals rc, rc2 and handle, the remaining arguments of
 * fsfilt_start()/filter_finish_transno(), the else branch, labels and the
 * return statement are not visible in this extract.
 * NOTE(review): in the IS_ERR(handle) branch the error pointer itself is
 * passed on to filter_finish_transno() as the handle -- confirm that the
 * callee tolerates it. */
1072 static int filter_close_internal(struct obd_export *export,
1073 struct filter_file_data *ffd,
1074 struct obd_trans_info *oti)
1076 struct obd_device *obd = export->exp_obd;
1077 struct filter_obd *filter = &obd->u.filter;
1078 struct file *filp = ffd->ffd_file;
1079 struct dentry *object_dentry = dget(filp->f_dentry);
1080 struct filter_dentry_data *fdd = object_dentry->d_fsdata;
1084 LASSERT(filp->private_data == ffd);
1087 rc = filp_close(filp, 0);
1089 if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1090 fdd->fdd_flags & FILTER_FLAG_DESTROY) {
1091 struct dentry *dir_dentry = filter_parent(obd, S_IFREG, fdd->fdd_objid);
1092 struct obd_run_ctxt saved;
1095 down(&dir_dentry->d_inode->i_sem);
1096 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1097 filter_start_transno(export);
1098 handle = fsfilt_start(obd, dir_dentry->d_inode,
1100 if (IS_ERR(handle)) {
1101 rc = filter_finish_transno(export, handle, oti,
1105 /* XXX unlink from PENDING directory now too */
1106 rc2 = filter_destroy_internal(obd, dir_dentry, object_dentry);
1109 rc = filter_finish_transno(export, handle, oti, rc);
1110 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1112 CERROR("error on commit, err = %d\n", rc2);
1117 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1118 up(&dir_dentry->d_inode->i_sem);
1121 f_dput(object_dentry);
1122 PORTAL_SLAB_FREE(ffd, filter_open_cache, sizeof(*ffd));
1128 /* mount the file system (secretly) */
/* Shared setup for filter_setup() and filter_san_setup().
 *
 * `buf` is a struct obd_ioctl_data; both ioc_inlbuf1 and ioc_inlbuf2 must
 * be set.  ioc_inlbuf2 is used as the filesystem type (fsfilt_get_ops(),
 * first argument of do_kern_mount(), saved in fo_fstype) and ioc_inlbuf1
 * as the third argument of do_kern_mount(); `option` is passed as the
 * mount data.
 *
 * After mounting, the filter's run context is pointed at the mount root,
 * the store is prepared with filter_prep(), the locks and the export list
 * are initialised, and an LDLM server namespace plus callback client are
 * created.  The trailing statements are the error unwind (free fstype,
 * mntput, put fsfilt ops).
 * NOTE(review): the last parameter (used as `option`), the error tests,
 * labels and return statements are not visible in this extract.
 * NOTE(review): the result of strdup() is stored in fo_fstype without a
 * visible NULL check -- confirm. */
1129 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1132 struct obd_ioctl_data* data = buf;
1133 struct filter_obd *filter;
1134 struct vfsmount *mnt;
1138 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1141 obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1142 if (IS_ERR(obd->obd_fsops))
1143 RETURN(PTR_ERR(obd->obd_fsops));
1145 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
/* NOTE(review): ioc_inlbuf2 is the filesystem type everywhere else in
 * this function, so this message prints the type where it says "mount of"
 * and the other buffer where it says "as type"; the two arguments look
 * swapped -- confirm. */
1148 CERROR("mount of %s as type %s failed: rc %d\n",
1149 data->ioc_inlbuf2, data->ioc_inlbuf1, rc);
1154 obd->obd_flags |= OBD_REPLAYABLE;
1157 filter = &obd->u.filter;;
1158 filter->fo_vfsmnt = mnt;
1159 filter->fo_fstype = strdup(data->ioc_inlbuf2);
1160 filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
1161 CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1163 OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1164 filter->fo_ctxt.pwdmnt = mnt;
1165 filter->fo_ctxt.pwd = mnt->mnt_root;
1166 filter->fo_ctxt.fs = get_ds();
1168 rc = filter_prep(obd);
1170 GOTO(err_kfree, rc);
1172 #ifdef FILTER_TRANSNO_SEM
1173 init_MUTEX(&filter->fo_transno_sem);
1175 spin_lock_init(&filter->fo_translock);
1177 spin_lock_init(&filter->fo_fddlock);
1178 spin_lock_init(&filter->fo_objidlock);
1179 INIT_LIST_HEAD(&filter->fo_export_list);
1181 obd->obd_namespace =
1182 ldlm_namespace_new("filter-tgt", LDLM_NAMESPACE_SERVER);
1183 if (!obd->obd_namespace)
1184 GOTO(err_post, rc = -ENOMEM);
1186 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1187 "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1194 kfree(filter->fo_fstype);
1196 mntput(filter->fo_vfsmnt);
1200 fsfilt_put_ops(obd->obd_fsops);
/* Standard setup entry point: filter_common_setup() with no extra mount
 * option. */
1204 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1206 return filter_common_setup(obd, len, buf, NULL);
1209 /* sanobd setup methods - use a specific mount option */
/* Setup entry point for the SAN variant.  ioc_inlbuf2 (the filesystem
 * type) must be set; for "extN" and "ext3" the mount option
 * "data=writeback" is passed to filter_common_setup().  Any other type
 * reaches the LBUG() reminder.
 * NOTE(review): the statement executed when ioc_inlbuf2 is missing and
 * the `else` in front of LBUG() are not visible in this extract. */
1210 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1212 struct obd_ioctl_data* data = buf;
1213 char *option = NULL;
1215 if (!data->ioc_inlbuf2)
1218 /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1219 if (!strcmp(data->ioc_inlbuf2, "extN") ||
1220 !strcmp(data->ioc_inlbuf2, "ext3"))
1221 option = "data=writeback";
1223 LBUG(); /* just a reminder */
1225 return filter_common_setup(obd, len, buf, option);
/* Cleanup method: release what the setup path acquired for this device.
 *
 * If exports are still attached they are force-disconnected with
 * class_disconnect_all(); what happens when some survive even that (second
 * CERROR) is on lines not shown here.  Afterwards the LDLM namespace is
 * freed, the dcache below the superblock root is shrunk, and the vfsmount
 * reference, the fs-type string and the fsfilt ops are released. */
1228 static int filter_cleanup(struct obd_device *obd)
1230 struct super_block *sb;
1233 if (!list_empty(&obd->obd_exports)) {
1234 CERROR("still has clients!\n");
1235 class_disconnect_all(obd);
1236 if (!list_empty(&obd->obd_exports)) {
1237 CERROR("still has exports after forced cleanup?\n");
1242 ldlm_namespace_free(obd->obd_namespace);
1244 sb = obd->u.filter.fo_sb;
/* No superblock recorded; the action taken for that case is not visible. */
1245 if (!obd->u.filter.fo_sb)
1250 shrink_dcache_parent(sb->s_root);
1252 mntput(obd->u.filter.fo_vfsmnt);
/* Clear fo_sb so a later call sees the device as not mounted. */
1253 obd->u.filter.fo_sb = 0;
1254 kfree(obd->u.filter.fo_fstype);
1255 fsfilt_put_ops(obd->obd_fsops);
/* Attach method: fills a local lprocfs_static_vars via lprocfs_init_vars()
 * and registers its obd_vars table for dev with lprocfs_obd_attach(),
 * returning that call's result.  len and data are not used in the lines
 * shown. */
1262 int filter_attach(struct obd_device *dev, obd_count len, void *data)
1264 struct lprocfs_static_vars lvars;
1266 lprocfs_init_vars(&lvars);
1267 return lprocfs_obd_attach(dev, lvars.obd_vars);
/* Detach method: counterpart of filter_attach(); returns the result of
 * lprocfs_obd_detach() for dev. */
1270 int filter_detach(struct obd_device *dev)
1272 return lprocfs_obd_detach(dev);
/* Connect a client, identified by cluuid, to this filter device.
 *
 * Visible flow: class_connect() establishes the connection, the export is
 * looked up from conn, a filter_client_data record is allocated and stamped
 * with the client UUID and the current fsd_mount_count (stored little
 * endian), the per-export open-file list and its spinlock are initialised
 * and, for OBD_REPLAYABLE devices, filter_client_add() is called.
 * recovd and recover are not referenced in the lines shown.
 *
 * The trailing OBD_FREE()/class_disconnect() calls look like the error
 * unwind path; their labels and the return statements are not visible.
 *
 * NOTE(review): filter is derived from obd in its initialiser, i.e. before
 * the !obd test below is evaluated. */
1275 /* nearly identical to mds_connect */
1276 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1277 struct obd_uuid *cluuid, struct recovd_obd *recovd,
1278 ptlrpc_recovery_cb_t recover)
1280 struct obd_export *exp;
1281 struct filter_export_data *fed;
1282 struct filter_client_data *fcd;
1283 struct filter_obd *filter = &obd->u.filter;
1288 if (!conn || !obd || !cluuid)
1291 rc = class_connect(conn, obd, cluuid);
1294 exp = class_conn2export(conn);
1296 fed = &exp->exp_filter_data;
1298 OBD_ALLOC(fcd, sizeof(*fcd));
1300 CERROR("filter: out of memory for client data\n");
1301 GOTO(out_export, rc = -ENOMEM);
/* Copies sizeof(fcd->fcd_uuid) bytes starting at the obd_uuid structure. */
1304 memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1306 fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1308 INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
1309 spin_lock_init(&exp->exp_filter_data.fed_lock);
1311 if (obd->obd_flags & OBD_REPLAYABLE) {
1312 rc = filter_client_add(filter, fed, -1);
1320 OBD_FREE(fcd, sizeof(*fcd));
1322 class_disconnect(conn);
/* Disconnect a client from the filter device.
 *
 * Every file still on the export's fed_open_head list is unlinked from the
 * list and force-closed with filter_close_internal().  Then the export's
 * LDLM locks are cancelled, the client record is released for
 * OBD_REPLAYABLE devices, and class_disconnect() tears down the connection.
 *
 * NOTE(review): the CERROR format uses "%*s" with d_name.len; in printf
 * style formats "*" before "s" is a minimum field width, not a length
 * bound - "%.*s" is probably what was meant. */
1327 /* also incredibly similar to mds_disconnect */
1328 static int filter_disconnect(struct lustre_handle *conn)
1330 struct obd_export *exp = class_conn2export(conn);
1331 struct filter_export_data *fed;
1336 fed = &exp->exp_filter_data;
1337 spin_lock(&fed->fed_lock);
1338 while (!list_empty(&fed->fed_open_head)) {
1339 struct filter_file_data *ffd;
1341 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1343 list_del(&ffd->ffd_export_list);
/* fed_lock is dropped around the log message and the close call and taken
 * again before the list is re-tested. */
1344 spin_unlock(&fed->fed_lock);
1346 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1347 ffd->ffd_file->f_dentry->d_name.len,
1348 ffd->ffd_file->f_dentry->d_name.name,
1349 ffd, ffd->ffd_servercookie);
1351 filter_close_internal(exp, ffd, NULL);
1352 spin_lock(&fed->fed_lock);
1354 spin_unlock(&fed->fed_lock);
1356 ldlm_cancel_locks_for_export(exp);
1358 if (exp->exp_obd->obd_flags & OBD_REPLAYABLE)
1359 filter_client_free(exp);
1361 rc = class_disconnect(conn);
1363 /* XXX cleanup preallocated inodes */
/* Fill the obdo oa from inode for the attributes selected by valid.
 *
 * The copy itself is done by obdo_from_inode().  The file-type bits found
 * in oa->o_mode on entry are saved in type and the type bits are cleared
 * from o_mode after the copy; the line that uses the saved type again is
 * not visible here (presumably it is OR-ed back in - confirm).  For
 * character and block device inodes OBD_MD_FLRDEV is added to o_valid; the
 * line that stores rdev is likewise not shown. */
1367 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1369 int type = oa->o_mode & S_IFMT;
1372 CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1373 inode->i_ino, inode, oa->o_id, valid);
1374 /* Don't copy the inode number in place of the object ID */
1375 obdo_from_inode(oa, inode, valid);
1376 oa->o_mode &= ~S_IFMT;
1379 if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1380 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1382 oa->o_valid |= OBD_MD_FLRDEV;
/* Translate a client-supplied open handle into its filter_file_data.
 *
 * handle->addr is used directly as the pointer value.  Before it is trusted
 * the visible code checks that handle and handle->addr are non-zero, that
 * the pointer is an object of filter_open_cache (kmem_cache_validate()),
 * and that the stored ffd_servercookie equals handle->cookie.  The value
 * returned when a check fails is on lines not shown.  Finally the
 * file's private_data back pointer is asserted to refer to the same ffd. */
1388 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
1390 struct filter_file_data *ffd = NULL;
1393 if (!handle || !handle->addr)
1396 ffd = (struct filter_file_data *)(unsigned long)(handle->addr);
1397 if (!kmem_cache_validate(filter_open_cache, (void *)ffd))
1400 if (ffd->ffd_servercookie != handle->cookie)
1403 LASSERT(ffd->ffd_file->private_data == ffd);
/* Resolve the object described by oa to a dentry.
 *
 * When oa carries an open handle (OBD_MD_FLHANDLE set in o_valid) the
 * dentry of the open file is used and an extra reference is taken with
 * dget().  The other visible path looks the object up with
 * filter_fid2dentry() below the directory returned by filter_parent();
 * the remaining lookup arguments, including how locked is passed on, are
 * on a line not shown.
 *
 * what is a label used only as a prefix in the error messages.
 * The "invalid client" message is followed by ERR_PTR(-EINVAL) and a
 * dentry without an inode yields ERR_PTR(-ENOENT); the return for a failed
 * lookup and for success is not visible in this excerpt. */
1407 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1408 struct obdo *oa, int locked,char *what)
1410 struct dentry *dentry = NULL;
1412 if (oa->o_valid & OBD_MD_FLHANDLE) {
1413 struct lustre_handle *ost_handle = obdo_handle(oa);
1414 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1417 dentry = dget(ffd->ffd_file->f_dentry);
1421 struct obd_device *obd = class_conn2obd(conn);
1423 CERROR("invalid client "LPX64"\n", conn->addr);
1424 RETURN(ERR_PTR(-EINVAL));
1426 dentry = filter_fid2dentry(obd, filter_parent(obd, oa->o_mode,
1431 if (IS_ERR(dentry)) {
1432 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1436 if (!dentry->d_inode) {
1437 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1440 RETURN(ERR_PTR(-ENOENT));
/* Three-argument convenience form of __filter_oa2dentry().  The fourth
 * argument (the label printed in error messages) is supplied on the
 * continuation line, which is not visible in this excerpt. */
1446 #define filter_oa2dentry(conn, oa, locked) __filter_oa2dentry(conn, oa, locked,\
/* getattr method: counts the request in st_getattr_reqs, resolves oa to a
 * dentry (locked = 1), returns PTR_ERR(dentry) on the visible error path
 * and otherwise copies the inode attributes selected by oa->o_valid into
 * oa with filter_from_inode().  md is not referenced in the lines shown. */
1449 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1450 struct lov_stripe_md *md)
1452 struct dentry *dentry = NULL;
1456 XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
1458 dentry = filter_oa2dentry(conn, oa, 1);
1460 RETURN(PTR_ERR(dentry));
1462 filter_from_inode(oa, dentry->d_inode, oa->o_valid);
/* setattr method (also the implementation behind filter_truncate()).
 *
 * Visible flow:
 *   - resolve oa to a dentry (locked = 0) and convert oa to an iattr,
 *     forcing the file type in ia_mode to S_IFREG;
 *   - switch to the filter run context with push_ctxt();
 *   - take inode->i_sem when the size is being changed (ATTR_SIZE);
 *   - filter_start_transno(), then fsfilt_start() for FSFILT_OP_SETATTR,
 *     in the order required by the invariants in the file header;
 *   - apply the attributes through i_op->setattr when the inode provides
 *     one, otherwise through inode_setattr();
 *   - filter_finish_transno() and fsfilt_commit();
 *   - for size changes refresh blocks/ctime/mtime in oa from the inode;
 *   - restore the previous context with pop_ctxt().
 * The matching up() of i_sem and the labels/returns are on lines not
 * shown.  md is not referenced in the lines shown. */
1468 /* this is called from filter_truncate() until we have filter_punch() */
1469 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1470 struct lov_stripe_md *md, struct obd_trans_info *oti)
1472 struct obd_run_ctxt saved;
1473 struct obd_export *export = class_conn2export(conn);
1474 struct obd_device *obd = class_conn2obd(conn);
1475 struct filter_obd *filter = &obd->u.filter;
1476 struct dentry *dentry;
1478 struct inode *inode;
1483 XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
1485 dentry = filter_oa2dentry(conn, oa, 0);
1488 RETURN(PTR_ERR(dentry));
1490 iattr_from_obdo(&iattr, oa, oa->o_valid);
1491 iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1492 inode = dentry->d_inode;
1494 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1496 if (iattr.ia_valid & ATTR_SIZE)
1497 down(&inode->i_sem);
1499 filter_start_transno(export);
1500 handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1501 if (IS_ERR(handle)) {
/* The transno started above is finished with the start error as status. */
1502 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1503 GOTO(out_unlock, rc);
1506 if (inode->i_op->setattr)
1507 rc = inode->i_op->setattr(dentry, &iattr);
1509 rc = inode_setattr(inode, &iattr);
1510 rc = filter_finish_transno(export, handle, oti, rc);
1511 rc2 = fsfilt_commit(obd, dentry->d_inode, handle);
1513 CERROR("error on commit, err = %d\n", rc2);
1518 if (iattr.ia_valid & ATTR_SIZE) {
1520 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1521 obdo_from_inode(oa, inode, oa->o_valid);
1526 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/* open method: opens object oa->o_id for the connecting export.
 *
 * The export is looked up from conn, the request is counted in
 * st_open_reqs and the object is opened with filter_obj_open().  On
 * success oa is refreshed from the inode and the open handle embedded in
 * oa is filled in: addr carries the filter_file_data pointer taken from
 * filp->private_data and cookie its ffd_servercookie; OBD_MD_FLHANDLE is
 * set so later calls can find the handle (see filter_handle2ffd()).
 * ea and oti are not referenced in the lines shown. */
1532 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1533 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1535 struct obd_export *export;
1536 struct lustre_handle *handle;
1537 struct filter_file_data *ffd;
1542 export = class_conn2export(conn);
1544 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1548 XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
1550 filp = filter_obj_open(export, oa->o_id, oa->o_mode);
1552 GOTO(out, rc = PTR_ERR(filp));
1554 filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1556 ffd = filp->private_data;
1557 handle = obdo_handle(oa);
1558 handle->addr = (__u64)(unsigned long)ffd;
1559 handle->cookie = ffd->ffd_servercookie;
1560 oa->o_valid |= OBD_MD_FLHANDLE;
/* close method: closes the open file referenced by the handle in oa.
 *
 * The request is counted in st_close_reqs.  oa must carry a handle
 * (OBD_MD_FLHANDLE); the handle is validated with filter_handle2ffd() and
 * a message with the offending addr/cookie is logged when that fails.  The
 * file data is then removed from the export's open list under fed_lock and
 * closed with filter_close_internal().  The return values of the error
 * branches are on lines not shown.  ea is not referenced in the lines
 * shown. */
1566 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1567 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1569 struct obd_export *exp;
1570 struct filter_file_data *ffd;
1571 struct filter_export_data *fed;
1575 exp = class_conn2export(conn);
1577 CDEBUG(D_IOCTL, "fatal: invalid client "LPX64"\n", conn->addr);
1581 XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
1583 if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1584 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1588 ffd = filter_handle2ffd(obdo_handle(oa));
1590 struct lustre_handle *handle = obdo_handle(oa);
1591 CERROR("bad handle ("LPX64") or cookie ("LPX64") for close\n",
1592 handle->addr, handle->cookie);
1596 fed = &exp->exp_filter_data;
1597 spin_lock(&fed->fed_lock);
1598 list_del(&ffd->ffd_export_list);
1599 spin_unlock(&fed->fed_lock);
1601 rc = filter_close_internal(exp, ffd, oti);
1604 } /* filter_close */
/* create method: allocates a new object id and creates its backing file.
 *
 * Visible flow:
 *   - count the request in st_create_reqs and take the next id from
 *     filter_next_id(), storing it in oa->o_id;
 *   - enter the filter run context, find the parent directory for regular
 *     objects and take its i_sem before any journal operation;
 *   - look the new name up with filter_fid2dentry(); an already existing
 *     object is reported and fails with -EEXIST;
 *   - filter_start_transno(), fsfilt_start(FSFILT_OP_CREATE), vfs_create();
 *   - filter_finish_transno(), write the server data with
 *     filter_update_server_data(), fsfilt_commit();
 *   - report id, block size, block count and the three timestamps back in
 *     oa via filter_from_inode();
 *   - release i_sem and restore the context.
 * ea is not referenced in the lines shown; declarations of new, handle,
 * buf, rc and err are on lines not shown. */
1606 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1607 struct lov_stripe_md **ea, struct obd_trans_info *oti)
1609 struct obd_export *export = class_conn2export(conn);
1610 struct obd_device *obd = class_conn2obd(conn);
1611 struct filter_obd *filter = &obd->u.filter;
1612 struct obd_run_ctxt saved;
1613 struct dentry *dir_dentry;
1621 CERROR("invalid client "LPX64"\n", conn->addr);
1625 XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
1627 oa->o_id = filter_next_id(obd);
1629 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1630 dir_dentry = filter_parent(obd, S_IFREG, oa->o_id);
1631 down(&dir_dentry->d_inode->i_sem);
1632 new = filter_fid2dentry(obd, dir_dentry, oa->o_id, 0);
1634 GOTO(out, rc = PTR_ERR(new));
1639 /* This would only happen if lastobjid was bad on disk */
1640 CERROR("objid %s already exists\n",
1641 filter_id(buf, filter, S_IFREG, oa->o_id));
1643 GOTO(out, rc = -EEXIST);
1646 filter_start_transno(export);
1647 handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_CREATE);
1648 if (IS_ERR(handle)) {
1649 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
/* The parent lookup above used S_IFREG; the create itself uses o_mode. */
1652 rc = vfs_create(dir_dentry->d_inode, new, oa->o_mode);
1654 CERROR("create failed rc = %d\n", rc);
1656 rc = filter_finish_transno(export, handle, oti, rc);
1657 err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1659 CERROR("unable to write lastobjid but file created\n");
1663 err = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1665 CERROR("error on commit, err = %d\n", err);
1673 /* Set flags for fields we have set in the inode struct */
1674 oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1675 OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1676 filter_from_inode(oa, new->d_inode, oa->o_valid);
1682 up(&dir_dentry->d_inode->i_sem);
1683 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
/* destroy method: removes the object named by oa, or defers the removal
 * while the object is still open.
 *
 * Visible flow:
 *   - count the request in st_destroy_reqs;
 *   - find the parent directory and take its i_sem;
 *   - resolve the object dentry (locked = 0);
 *   - enter the filter run context, filter_start_transno() and
 *     fsfilt_start(FSFILT_OP_UNLINK);
 *   - if the dentry has filter_dentry_data with a non-zero open count, set
 *     FILTER_FLAG_DESTROY the first time (later calls only log a repeat)
 *     and go straight to the commit with rc = 0;
 *   - otherwise unlink through filter_destroy_internal();
 *   - filter_finish_transno(), fsfilt_commit(), restore the context, drop
 *     the dentry and release i_sem.
 * ea is not referenced in the lines shown.
 *
 * NOTE(review): every failure of the dentry lookup is reported to the
 * caller as -ENOENT; the code in the error pointer is discarded. */
1687 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1688 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1690 struct obd_export *export = class_conn2export(conn);
1691 struct obd_device *obd = class_conn2obd(conn);
1692 struct filter_obd *filter = &obd->u.filter;
1693 struct dentry *dir_dentry, *object_dentry;
1694 struct filter_dentry_data *fdd;
1695 struct obd_run_ctxt saved;
1701 CERROR("invalid client "LPX64"\n", conn->addr);
1705 XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
1707 CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1709 dir_dentry = filter_parent(obd, oa->o_mode, oa->o_id);
1710 down(&dir_dentry->d_inode->i_sem);
1712 object_dentry = filter_oa2dentry(conn, oa, 0);
1713 if (IS_ERR(object_dentry))
1714 GOTO(out, rc = -ENOENT);
1716 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1717 filter_start_transno(export);
1718 handle = fsfilt_start(obd, dir_dentry->d_inode, FSFILT_OP_UNLINK);
1719 if (IS_ERR(handle)) {
1720 rc = filter_finish_transno(export, handle, oti,PTR_ERR(handle));
1724 fdd = object_dentry->d_fsdata;
1725 if (fdd && atomic_read(&fdd->fdd_open_count)) {
1726 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1727 fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1728 /* XXX put into PENDING directory in case of crash */
1730 "defer destroy of %dx open objid "LPU64"\n",
1731 atomic_read(&fdd->fdd_open_count), oa->o_id);
1734 "repeat destroy of %dx open objid "LPU64"\n",
1735 atomic_read(&fdd->fdd_open_count), oa->o_id);
1736 GOTO(out_commit, rc = 0);
1739 rc = filter_destroy_internal(obd, dir_dentry, object_dentry);
1742 /* XXX save last_rcvd on disk */
1743 rc = filter_finish_transno(export, handle, oti, rc);
1744 rc2 = fsfilt_commit(obd, dir_dentry->d_inode, handle);
1746 CERROR("error on commit, err = %d\n", rc2);
1751 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1752 f_dput(object_dentry);
1756 up(&dir_dentry->d_inode->i_sem);
/* punch method (installed as o_punch): only whole-file truncation is
 * implemented, by delegating to filter_setattr().
 *
 * The request is counted in st_punch_reqs.  An end other than
 * OBD_OBJECT_EOF triggers the "PUNCH not supported" message; whether the
 * call is then abandoned is on a line not shown.  The debug message prints
 * start under the label "o_size".  lsm is not passed on: filter_setattr()
 * receives NULL in its place. */
1760 /* NB start and end are used for punch, but not truncate */
1761 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1762 struct lov_stripe_md *lsm,
1763 obd_off start, obd_off end,
1764 struct obd_trans_info *oti)
1769 XPROCFS_BUMP_MYCPU_IOSTAT (st_punch_reqs, 1);
1771 if (end != OBD_OBJECT_EOF)
1772 CERROR("PUNCH not supported, only truncate works\n");
1774 CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
1775 "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1777 error = filter_setattr(conn, oa, NULL, oti);
/* Release a page obtained by one of the lustre_get_page_*() helpers: the
 * visible statement drops the page cache reference with
 * page_cache_release().  A comment in filter_write_locked_page() says the
 * page is also kunmapped here; that statement is on a line not shown. */
1781 static inline void lustre_put_page(struct page *page)
1784 page_cache_release(page);
/* Get the page cache page backing lnb->offset of inode for reading.
 *
 * The page index is lnb->offset >> PAGE_SHIFT.  The page is read through
 * read_cache_page() using the mapping's own readpage operation as filler.
 * On success it is mapped with kmap() and the address stored in lnb->addr.
 * A page that is not up to date, or that has its error flag set, is
 * rejected with -EIO and released with lustre_put_page().  How rc is
 * turned into the returned pointer is on lines not shown. */
1788 static struct page *
1789 lustre_get_page_read(struct inode *inode, struct niobuf_local *lnb)
1791 unsigned long index = lnb->offset >> PAGE_SHIFT;
1792 struct address_space *mapping = inode->i_mapping;
1796 page = read_cache_page(mapping, index,
1797 (filler_t*)mapping->a_ops->readpage, NULL);
1798 if (!IS_ERR(page)) {
1800 lnb->addr = kmap(page);
1802 if (!PageUptodate(page)) {
1803 CERROR("page index %lu not uptodate\n", index);
1804 GOTO(err_page, rc = -EIO);
1806 if (PageError(page)) {
1807 CERROR("page index %lu has error\n", index);
1808 GOTO(err_page, rc = -EIO);
1814 lustre_put_page(page);
/* Get the page cache page with the given index of inode, locked and
 * prepared for a full-page write.
 *
 * grab_cache_page() returns the page locked (it may wait for the lock).
 * prepare_write() is then called for the range 0..PAGE_SIZE.  A failing
 * prepare_write() or a page with its error flag set (-EIO) sends control
 * to the err_unlock label; the statements at that label other than the
 * final lustre_put_page() are on lines not shown. */
1818 static struct page *
1819 lustre_get_page_write(struct inode *inode, unsigned long index)
1821 struct address_space *mapping = inode->i_mapping;
1825 page = grab_cache_page(mapping, index); /* locked page */
1827 if (!IS_ERR(page)) {
1829 /* Note: Called with "0" and "PAGE_SIZE" this is essentially
1830 * a no-op for most filesystems, because we write the whole
1831 * page. For partial-page I/O this will read in the page.
1833 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
1835 CERROR("page index %lu, rc = %d\n", index, rc);
1838 GOTO(err_unlock, rc);
1840 /* XXX not sure if we need this if we are overwriting page */
1841 if (PageError(page)) {
1842 CERROR("error on page index %lu, rc = %d\n", index, rc);
1844 GOTO(err_unlock, rc = -EIO);
1851 lustre_put_page(page);
/* Compatibility helper, compiled only for kernels newer than 2.5.0: waits
 * with wait_on_page_locked() until the page is unlocked.  The value
 * returned to the caller (lustre_commit_write() stores it as an error
 * code) is on a line not shown. */
1855 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1856 int waitfor_one_page(struct page *page)
1858 wait_on_page_locked(page);
/* Compatibility helper, compiled only for kernels older than 2.5.0.
 * Sets i_mtime (and i_ctime; the ctime_too test guarding that assignment
 * is on a line not shown) to the current time and marks the inode dirty.
 * Nothing is changed when the timestamps concerned already equal the
 * current time. */
1863 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1864 /* We should only change the file mtime (and not the ctime, like
1865 * update_inode_times() in generic_file_write()) when we only change data.
1867 static inline void inode_update_time(struct inode *inode, int ctime_too)
1869 time_t now = CURRENT_TIME;
1870 if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
1872 inode->i_mtime = now;
1874 inode->i_ctime = now;
1875 mark_inode_dirty_sync(inode);
// Commit the write described by lnb to its page cache page.
//
// The byte range inside the page is taken from lnb: from is the offset
// within the page, to is from + lnb->len, asserted not to exceed PAGE_SIZE.
// The mapping commit_write operation is called for that range; for
// IS_SYNC inodes the function additionally waits on the page with
// waitfor_one_page().  The page is then marked referenced and released
// with lustre_put_page().  The return statement is on a line not shown.
1879 static int lustre_commit_write(struct niobuf_local *lnb)
1881 struct page *page = lnb->page;
1882 unsigned from = lnb->offset & ~PAGE_MASK;
1883 unsigned to = from + lnb->len;
1884 struct inode *inode = page->mapping->host;
1887 LASSERT(to <= PAGE_SIZE);
1888 err = page->mapping->a_ops->commit_write(NULL, page, from, to);
1889 if (!err && IS_SYNC(inode))
1890 err = waitfor_one_page(page);
1891 //SetPageUptodate(page); // the client commit_write will do this
1893 SetPageReferenced(page);
1895 lustre_put_page(page);
// Get a page to receive write data for lnb->offset of inode.
//
// Two page sources are visible: the page cache page for the index
// (grab_cache_page_nowait() and grab_cache_page(); which one is used under
// which condition is on lines not shown), and a freshly allocated
// temporary page.  The temporary page is filled with the poison byte 0xBA,
// given the target index, recorded in lnb->addr and flagged in lnb->flags
// with N_LOCAL_TEMP_PAGE; filter_commitrw() later writes such pages
// through filter_write_locked_page().
// For a page cache page prepare_write() is called starting at the in-page
// offset of lnb->offset and lnb->addr is set to page_address(page).
// pglocked is not referenced in the lines shown.
1899 struct page *filter_get_page_write(struct inode *inode,
1900 struct niobuf_local *lnb, int *pglocked)
1902 unsigned long index = lnb->offset >> PAGE_SHIFT;
1903 struct address_space *mapping = inode->i_mapping;
1907 //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
1909 page = grab_cache_page_nowait(mapping, index); /* locked page */
1911 page = grab_cache_page(mapping, index); /* locked page */
1914 /* This page is currently locked, so get a temporary page instead. */
1915 /* XXX I believe this is a very dangerous thing to do - consider if
1916 * we had multiple writers for the same file (definitely the case
1917 * if we are using this codepath). If writer A locks the page,
1918 * writer B writes to a copy (as here), writer A drops the page
1919 * lock, and writer C grabs the lock before B does, then B will
1920 * later overwrite the data from C, even if C had LDLM locked
1921 * and initiated the write after B did.
1925 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
1926 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
1928 CERROR("no memory for a temp page\n");
1929 GOTO(err, rc = -ENOMEM);
1931 POISON((void *)addr, 0xBA, PAGE_SIZE);
1932 page = virt_to_page(addr);
1934 page->index = index;
1935 lnb->addr = (void *)addr;
1937 lnb->flags |= N_LOCAL_TEMP_PAGE;
1938 } else if (!IS_ERR(page)) {
1942 rc = mapping->a_ops->prepare_write(NULL, page,
1943 lnb->offset & ~PAGE_MASK,
1947 CERROR("page index %lu, rc = %d\n", index, rc);
1948 GOTO(err_unlock, rc);
1950 /* XXX not sure if we need this if we are overwriting page */
1951 if (PageError(page)) {
1952 CERROR("error on page index %lu, rc = %d\n", index, rc);
1954 GOTO(err_unlock, rc = -EIO);
1956 lnb->addr = page_address(page);
1964 lustre_put_page(page);
/* Commit, or back out, a page that was prepared for writing.
 *
 * err is the status of the operation so far.  Under the pre-2.5.0 guard
 * the function logs the call, walks the buffer heads attached to the page
 * and clears block-sized pieces of lnb->addr with memset(); the condition
 * selecting which blocks are cleared (see the BH_New remark in the comment
 * below) and the test on err are on lines not shown.  In the visible tail
 * the page is committed with lustre_commit_write(). */
1970 * We need to balance prepare_write() calls with commit_write() calls.
1971 * If the page has been prepared, but we have no data for it, we don't
1972 * want to overwrite valid data on disk, but we still need to zero out
1973 * data for space which was newly allocated. Like part of what happens
1974 * in __block_prepare_write() for newly allocated blocks.
1976 * XXX currently __block_prepare_write() creates buffers for all the
1977 * pages, and the filesystems mark these buffers as BH_New if they
1978 * were newly allocated from disk. We use the BH_New flag similarly.
1980 static int filter_commit_write(struct niobuf_local *lnb, int err)
1982 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1984 unsigned block_start, block_end;
1985 struct buffer_head *bh, *head = lnb->page->buffers;
1986 unsigned blocksize = head->b_size;
1988 /* debugging: just seeing if this ever happens */
1989 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
1990 "called for ino %lu:%lu on err %d\n",
1991 lnb->page->mapping->host->i_ino, lnb->page->index, err);
1993 /* Currently one buffer per page, but in the future... */
1994 for (bh = head, block_start = 0; bh != head || !block_start;
1995 block_start = block_end, bh = bh->b_this_page) {
1996 block_end = block_start + blocksize;
1998 memset(lnb->addr + block_start, 0, blocksize);
2002 return lustre_commit_write(lnb);
/* preprw method: first half of a bulk read or write.  It resolves the
 * objects, starts the transaction for writes and obtains one local page
 * per remote buffer, filling the niobuf_local array res.  The second half
 * is filter_commitrw().
 *
 * Visible flow:
 *   - count the request (st_write_reqs or st_read_reqs) and zero res;
 *   - look up export and device, allocate one fsfilt_objinfo per object
 *     and enter the filter run context;
 *   - first loop: resolve every object to a dentry and record it together
 *     with its buffer count in fso[]; a negative dentry fails with -ENOENT;
 *   - for writes call filter_start_transno() and fsfilt_brw_start(); the
 *     transaction handle is handed back through *desc_private;
 *   - second loop: for each buffer copy the remote descriptor into the
 *     local one and get its page with filter_get_page_write() or
 *     lustre_get_page_read();
 *   - common exit: free fso, clear current->journal_info, restore context.
 * The out_pages path walks lnb backwards releasing every page obtained so
 * far and, for writes, finishes the transno. */
2005 static int filter_preprw(int cmd, struct lustre_handle *conn,
2006 int objcount, struct obd_ioobj *obj,
2007 int niocount, struct niobuf_remote *nb,
2008 struct niobuf_local *res, void **desc_private,
2009 struct obd_trans_info *oti)
2011 struct obd_run_ctxt saved;
2012 struct obd_export *export;
2013 struct obd_device *obd;
2014 struct obd_ioobj *o;
2015 struct niobuf_remote *rnb = nb;
2016 struct niobuf_local *lnb = res;
2017 struct fsfilt_objinfo *fso;
2023 if ((cmd & OBD_BRW_WRITE) != 0)
2024 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2026 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2028 memset(res, 0, niocount * sizeof(*res));
2030 export = class_conn2export(conn);
2031 obd = class_conn2obd(conn);
2033 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2037 LASSERT(objcount < 16); // theoretically we support multi-obj BRW
2039 OBD_ALLOC(fso, objcount * sizeof(*fso));
2043 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2045 for (i = 0, o = obj; i < objcount; i++, o++) {
2046 struct filter_dentry_data *fdd;
2047 struct dentry *dentry;
2049 LASSERT(o->ioo_bufcnt);
2051 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2056 GOTO(out_objinfo, rc = PTR_ERR(dentry));
2058 fso[i].fso_dentry = dentry;
2059 fso[i].fso_bufcnt = o->ioo_bufcnt;
2061 if (!dentry->d_inode) {
2062 CERROR("trying to BRW to non-existent file "LPU64"\n",
2065 GOTO(out_objinfo, rc = -ENOENT);
2068 fdd = dentry->d_fsdata;
/* I/O to an object nobody has open is only logged, not refused. */
2069 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2070 CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2074 if (cmd & OBD_BRW_WRITE) {
2075 #warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
2076 /* Even worse, we need to get locks on multiple inodes (in
2077 * order) or use the DLM to do the locking for us (and use
2078 * the same locking in filter_setattr() for truncate. The
2079 * handling gets very ugly when dealing with locked pages.
2080 * It may be easier to just get rid of the locked page code
2081 * (which has problems of its own) and either discover we do
2082 * not need it anymore (i.e. it was a symptom of another bug)
2083 * or ensure we get the page locks in an appropriate order.
2085 /* Danger, Will Robinson! You are taking a lock here and also
2086 * starting a transaction and releasing/finishing then in
2087 * filter_commitrw(), so you must call fsfilt_commit() and
2088 * finish_transno() if an error occurs in this function.
2090 filter_start_transno(export);
2091 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2093 if (IS_ERR(*desc_private)) {
2094 rc = PTR_ERR(*desc_private);
2095 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2096 "error starting transaction: rc = %d\n", rc);
2097 *desc_private = NULL;
2098 GOTO(out_objinfo, rc);
2102 obd_kmap_get(niocount, 1);
2104 for (i = 0, o = obj; i < objcount; i++, o++) {
2105 struct dentry *dentry;
2106 struct inode *inode;
2109 dentry = fso[i].fso_dentry;
2110 inode = dentry->d_inode;
2112 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2116 lnb->dentry = dentry;
2118 lnb->dentry = dget(dentry);
2120 /* lnb->offset is aligned, while rnb->offset isn't,
2121 * and we need to copy the fields to lnb anyways.
2123 memcpy(lnb, rnb, sizeof(*rnb));
2124 if (cmd & OBD_BRW_WRITE) {
2125 page = filter_get_page_write(inode, lnb,
2128 XPROCFS_BUMP_MYCPU_IOSTAT(st_write_bytes,
2131 page = lustre_get_page_read(inode, lnb);
2133 XPROCFS_BUMP_MYCPU_IOSTAT(st_read_bytes,
2139 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2140 "error on page @"LPU64"%u/%u: rc = %d\n",
2141 lnb->offset, j, o->ioo_bufcnt, rc);
2143 GOTO(out_pages, rc);
2150 OBD_FREE(fso, objcount * sizeof(*fso));
2151 current->journal_info = NULL;
2152 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2156 while (lnb-- > res) {
2157 if (cmd & OBD_BRW_WRITE)
2158 filter_commit_write(lnb, rc);
2160 lustre_put_page(lnb->page);
2161 f_dput(lnb->dentry);
2163 obd_kmap_put(niocount);
2164 if (cmd & OBD_BRW_WRITE) {
2165 filter_finish_transno(export, *desc_private, oti, rc);
2167 filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2170 goto out; /* dropped the dentry refs already (one per page) */
/* Unwind for failures in the first loop: the scan stops at the first
 * unset fso_dentry (fso is expected to be zeroed by OBD_ALLOC - confirm). */
2173 for (i = 0; i < objcount && fso[i].fso_dentry; i++)
2174 f_dput(fso[i].fso_dentry);
/* Write back a temporary page (one flagged N_LOCAL_TEMP_PAGE by
 * filter_get_page_write()) into the real page cache page.
 *
 * The real page for lnb->page->index is obtained, locked and prepared,
 * with lustre_get_page_write().  On success the full PAGE_SIZE contents of
 * the temporary page are copied into it, the temporary page is released
 * and the write is committed with lustre_commit_write().  The statement
 * that makes lnb refer to the real page before the commit is on a line
 * not shown. */
2178 static int filter_write_locked_page(struct niobuf_local *lnb)
2184 lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2185 if (IS_ERR(lpage)) {
2186 /* It is highly unlikely that we would ever get an error here.
2187 * The page we want to get was previously locked, so it had to
2188 * have already allocated the space, and we were just writing
2189 * over the same data, so there would be no hole in the file.
2191 * XXX: possibility of a race with truncate could exist, need
2192 * to check that. There are no guarantees w.r.t.
2193 * write order even on a local filesystem, although the
2194 * normal response would be to return the number of bytes
2195 * successfully written and leave the rest to the app.
2197 rc = PTR_ERR(lpage);
2198 CERROR("error getting locked page index %ld: rc = %d\n",
2199 lnb->page->index, rc);
2201 lustre_commit_write(lnb);
2205 /* lpage is kmapped in lustre_get_page_write() above and kunmapped in
2206 * lustre_commit_write() below, lnb->page was kmapped previously in
2207 * filter_get_page_write() and kunmapped in lustre_put_page() below.
2209 memcpy(page_address(lpage), page_address(lnb->page), PAGE_SIZE);
2210 lustre_put_page(lnb->page);
2213 rc = lustre_commit_write(lnb);
2215 CERROR("error committing locked page %ld: rc = %d\n",
2216 lnb->page->index, rc);
// syncfs method: counts the request in st_syncfs_reqs and returns the
// result of fsfilt_sync() for the superblock of the filter device that
// conn belongs to.
2221 static int filter_syncfs(struct lustre_handle *conn)
2223 struct obd_device *obd;
2226 obd = class_conn2obd(conn);
2228 XPROCFS_BUMP_MYCPU_IOSTAT (st_syncfs_reqs, 1);
2230 RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
// commitrw method: second half of a bulk read or write, completing what
// filter_preprw() started.  desc_private is the transaction handle that
// filter_preprw() handed out.
//
// Visible flow:
//   - enter the filter run context and install desc_private as
//     current->journal_info (asserted to be unset before);
//   - first pass over all buffers: for writes update the inode times, then
//     commit every ordinary page with filter_commit_write(); for reads the
//     page is released; buffers flagged N_LOCAL_TEMP_PAGE are handled in
//     the second pass (the skip and the found_locked counting are on lines
//     not shown); the dentry reference of each buffer is dropped;
//   - second pass, run only while found_locked is positive: write the
//     temporary pages with filter_write_locked_page();
//   - for writes finish the transno and commit the transaction, and call
//     filter_syncfs() when obd_sync_filter is set;
//   - restore the context.
2233 static int filter_commitrw(int cmd, struct lustre_handle *conn,
2234 int objcount, struct obd_ioobj *obj,
2235 int niocount, struct niobuf_local *res,
2236 void *desc_private, struct obd_trans_info *oti)
2238 struct obd_run_ctxt saved;
2239 struct obd_ioobj *o;
2240 struct niobuf_local *lnb;
2241 struct obd_export *export = class_conn2export(conn);
2242 struct obd_device *obd = class_conn2obd(conn);
2243 int found_locked = 0;
2248 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2250 LASSERT(!current->journal_info);
2251 current->journal_info = desc_private;
2253 for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2256 if (cmd & OBD_BRW_WRITE)
2257 inode_update_time(lnb->dentry->d_inode, 1);
2258 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2259 if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2264 if (cmd & OBD_BRW_WRITE) {
2265 int err = filter_commit_write(lnb, 0);
2270 lustre_put_page(lnb->page);
2273 f_dput(lnb->dentry);
2277 for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2280 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2282 if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2285 err = filter_write_locked_page(lnb);
2289 f_dput(lnb->dentry);
2294 if (cmd & OBD_BRW_WRITE) {
2295 /* We just want any dentry for the commit, for now */
2296 struct dentry *dir_dentry = filter_parent(obd, S_IFREG, 0);
2299 rc = filter_finish_transno(export, desc_private, oti, rc);
2300 err = fsfilt_commit(obd, dir_dentry->d_inode, desc_private);
2303 if (obd_sync_filter) {
2304 /* this can fail with ENOMEM, what should we do then? */
2305 filter_syncfs(conn);
2307 /* XXX <adilger> LASSERT(last_rcvd == last_committed)*/
2310 LASSERT(!current->journal_info);
2312 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/* Local bulk read/write of oa_bufs pages against a single object.
 *
 * This is a convenience wrapper around filter_preprw() and
 * filter_commitrw(): it builds one obd_ioobj for lsm->lsm_object_id (type
 * S_IFREG, oa_bufs buffers) and one niobuf_remote per entry of pga
 * (offset and length), prepares the local pages, copies pga[i].count bytes
 * between each caller page and the matching local page at the in-page
 * offset of pga[i].off - towards the local page for writes, towards the
 * caller page for reads - and finally commits.
 * Both temporary arrays are freed before returning; -ENOMEM is reported
 * when either allocation fails.  set is not referenced in the lines
 * shown. */
2316 static int filter_brw(int cmd, struct lustre_handle *conn,
2317 struct lov_stripe_md *lsm, obd_count oa_bufs,
2318 struct brw_page *pga, struct obd_brw_set *set,
2319 struct obd_trans_info *oti)
2321 struct obd_ioobj ioo;
2322 struct niobuf_local *lnb;
2323 struct niobuf_remote *rnb;
2329 OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2330 OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2332 if (lnb == NULL || rnb == NULL)
2333 GOTO(out, ret = -ENOMEM);
2335 for (i = 0; i < oa_bufs; i++) {
2336 rnb[i].offset = pga[i].off;
2337 rnb[i].len = pga[i].count;
2340 ioo.ioo_id = lsm->lsm_object_id;
2342 ioo.ioo_type = S_IFREG;
2343 ioo.ioo_bufcnt = oa_bufs;
2345 ret = filter_preprw(cmd, conn, 1, &ioo, oa_bufs, rnb, lnb,
2346 &desc_private, oti);
2350 for (i = 0; i < oa_bufs; i++) {
2351 void *virt = kmap(pga[i].pg);
2352 obd_off off = pga[i].off & ~PAGE_MASK;
2354 if (cmd & OBD_BRW_WRITE)
2355 memcpy(lnb[i].addr + off, virt + off, pga[i].count);
2357 memcpy(virt + off, lnb[i].addr + off, pga[i].count);
2362 ret = filter_commitrw(cmd, conn, 1, &ioo, oa_bufs, lnb, desc_private,
2367 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2369 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
/* SAN variant of preprw: instead of preparing pages it translates every
 * remote buffer into a block number and returns that number to the caller
 * in rnb->offset.
 *
 * For each object the dentry is resolved (a negative dentry fails with
 * -ENOENT).  For each buffer the starting value is rnb->offset >>
 * PAGE_SHIFT.  When cmd equals OBD_BRW_READ it is mapped through the bmap
 * operation of the mapping; in the other branch fs_prep_san_write() is
 * called, with newsize computed as rnb->offset + rnb->len. */
2373 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2374 int objcount, struct obd_ioobj *obj,
2375 int niocount, struct niobuf_remote *nb)
2377 struct obd_device *obd;
2378 struct obd_ioobj *o = obj;
2379 struct niobuf_remote *rnb = nb;
2384 if ((cmd & OBD_BRW_WRITE) != 0)
2385 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
2387 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
2389 obd = class_conn2obd(conn);
2391 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2395 for (i = 0; i < objcount; i++, o++) {
2396 struct dentry *dentry;
2397 struct inode *inode;
2400 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
2404 GOTO(out, rc = PTR_ERR(dentry));
2405 inode = dentry->d_inode;
2407 CERROR("trying to BRW to non-existent file "LPU64"\n",
2410 GOTO(out, rc = -ENOENT);
2413 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2416 block = rnb->offset >> PAGE_SHIFT;
2418 if (cmd == OBD_BRW_READ) {
2419 block = inode->i_mapping->a_ops->bmap(
2420 inode->i_mapping, block);
2422 loff_t newsize = rnb->offset + rnb->len;
2423 /* fs_prep_san_write will also update inode
2425 * (1) new alloced block
2426 * (2) existed block but size extented
2428 /* FIXME We could call fs_prep_san_write()
2429 * only once for all the blocks allocation.
2430 * Now call it once for each block, for
2431 * simplicity. And if error happens, we
2432 * probably need to release previous alloced
2434 rc = fs_prep_san_write(obd, inode, &block,
2440 rnb->offset = block;
// statfs method: counts the request in st_statfs_reqs and returns the
// result of fsfilt_statfs(), which fills osfs for the superblock of the
// filter device that conn belongs to.
2448 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2450 struct obd_device *obd;
2453 obd = class_conn2obd(conn);
2455 XPROCFS_BUMP_MYCPU_IOSTAT (st_statfs_reqs, 1);
2457 RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
// get_info method: answers two keys about the backing filesystem.
//
// key is compared by length and content (it need not be NUL terminated):
//   "blocksize"      - s_blocksize of the superblock
//   "blocksize_bits" - s_blocksize_bits of the superblock
// In both cases *vallen is set to sizeof(long) and the number itself, not
// a pointer to it, is stored in *val through a cast.  Any other key is
// logged as invalid; the return values are on lines not shown.
2460 static int filter_get_info(struct lustre_handle *conn, obd_count keylen,
2461 void *key, obd_count *vallen, void **val)
2463 struct obd_device *obd;
2466 obd = class_conn2obd(conn);
2468 CDEBUG(D_IOCTL, "invalid client "LPX64"\n", conn->addr);
2472 if ( keylen == strlen("blocksize") &&
2473 memcmp(key, "blocksize", keylen) == 0 ) {
2474 *vallen = sizeof(long);
2475 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize;
2479 if ( keylen == strlen("blocksize_bits") &&
2480 memcmp(key, "blocksize_bits", keylen) == 0 ){
2481 *vallen = sizeof(long);
2482 *val = (void *)(long)obd->u.filter.fo_sb->s_blocksize_bits;
2486 CDEBUG(D_IOCTL, "invalid key\n");
// Copy the data of object src (reached through src_conn) into object dst
// (reached through dst_conn), one page at a time.
//
// A single page is allocated and locked and used as bounce buffer.  The
// loop runs over all pages needed for src->o_size (rounded up); each round
// reads the page from the source with obd_brw(OBD_BRW_READ) and writes it
// to the destination with obd_brw(OBD_BRW_WRITE) and the OBD_BRW_CREATE
// flag, each through its own obd_brw_set with ll_brw_sync_wait as callback.
// Afterwards size and block count of src are copied into dst and flagged
// valid.  count and offset are not referenced in the lines shown.
//
// NOTE(review): pg.off is computed from page->index on the line before
// page->index is assigned the current loop index - confirm that the offset
// used for the transfer is the intended one.
2490 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2491 struct lustre_handle *src_conn, struct obdo *src,
2492 obd_size count, obd_off offset, struct obd_trans_info *oti)
2495 struct lov_stripe_md srcmd, dstmd;
2496 unsigned long index = 0;
2499 memset(&srcmd, 0, sizeof(srcmd));
2500 memset(&dstmd, 0, sizeof(dstmd));
2501 srcmd.lsm_object_id = src->o_id;
2502 dstmd.lsm_object_id = dst->o_id;
2505 CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2506 ", dst: ino "LPU64"\n",
2507 src->o_id, src->o_blocks, src->o_size, dst->o_id);
2508 page = alloc_page(GFP_USER);
2512 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2513 while (TryLockPage(page))
2514 ___wait_on_page(page);
2516 wait_on_page_locked(page);
2519 /* XXX with brw vector I/O, we could batch up reads and writes here,
2520 * all we need to do is allocate multiple pages to handle the I/Os
2521 * and arrays to handle the request parameters.
2523 while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2525 struct obd_brw_set *set;
2527 set = obd_brw_set_new();
2535 pg.count = PAGE_SIZE;
2536 pg.off = (page->index) << PAGE_SHIFT;
2539 page->index = index;
2540 set->brw_callback = ll_brw_sync_wait;
2541 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
2542 obd_brw_set_free(set);
2548 set = obd_brw_set_new();
2554 pg.flag = OBD_BRW_CREATE;
2555 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2557 set->brw_callback = ll_brw_sync_wait;
2558 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
2559 obd_brw_set_free(set);
2561 /* XXX should handle dst->o_size, dst->o_blocks here */
2567 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2571 dst->o_size = src->o_size;
2572 dst->o_blocks = src->o_blocks;
2573 dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
/* Method table of the regular filter device, registered under
 * OBD_FILTER_DEVICENAME in obdfilter_init().  It uses filter_setup() as
 * setup method and provides o_syncfs.
 *
 * NOTE(review): the o_commitrw entry has no trailing comma, so the entries
 * after it are presumably excluded by a preprocessor guard on lines not
 * shown in this excerpt - confirm before relying on them. */
2580 static struct obd_ops filter_obd_ops = {
2581 o_owner: THIS_MODULE,
2582 o_attach: filter_attach,
2583 o_detach: filter_detach,
2584 o_get_info: filter_get_info,
2585 o_setup: filter_setup,
2586 o_cleanup: filter_cleanup,
2587 o_connect: filter_connect,
2588 o_disconnect: filter_disconnect,
2589 o_statfs: filter_statfs,
2590 o_syncfs: filter_syncfs,
2591 o_getattr: filter_getattr,
2592 o_create: filter_create,
2593 o_setattr: filter_setattr,
2594 o_destroy: filter_destroy,
2595 o_open: filter_open,
2596 o_close: filter_close,
2598 o_punch: filter_truncate,
2599 o_preprw: filter_preprw,
2600 o_commitrw: filter_commitrw
2602 o_san_preprw: filter_san_preprw,
2603 o_preallocate: filter_preallocate_inodes,
2604 o_migrate: filter_migrate,
2605 o_copy: filter_copy_data,
2606 o_iterate: filter_iterate
/*
 * Method table for the SAN variant of the filter OBD type.  obdfilter_init()
 * registers this table under OBD_FILTER_SAN_DEVICENAME.
 *
 * In the lines visible here it differs from filter_obd_ops in three ways:
 * o_setup points at filter_san_setup, there is no o_syncfs entry, and
 * o_san_preprw follows o_commitrw as a regular comma-separated entry.
 */
2610 static struct obd_ops filter_sanobd_ops = {
2611 o_owner: THIS_MODULE,
2612 o_attach: filter_attach,
2613 o_detach: filter_detach,
2614 o_get_info: filter_get_info,
2615 o_setup: filter_san_setup,
2616 o_cleanup: filter_cleanup,
2617 o_connect: filter_connect,
2618 o_disconnect: filter_disconnect,
2619 o_statfs: filter_statfs,
2620 o_getattr: filter_getattr,
2621 o_create: filter_create,
2622 o_setattr: filter_setattr,
2623 o_destroy: filter_destroy,
2624 o_open: filter_open,
2625 o_close: filter_close,
2627 o_punch: filter_truncate,
2628 o_preprw: filter_preprw,
2629 o_commitrw: filter_commitrw,
2630 o_san_preprw: filter_san_preprw,
/*
 * NOTE(review): a line is missing from this view just before the entries
 * below; it may be a preprocessor guard like the one suspected in
 * filter_obd_ops -- confirm whether these four methods are compiled in.
 */
2632 o_preallocate: filter_preallocate_inodes,
2633 o_migrate: filter_migrate,
2634 o_copy: filter_copy_data,
2635 o_iterate: filter_iterate
/*
 * Module load hook (installed via module_init() at the end of the file).
 *
 * Creates the two kmem caches used by this driver, initialises the
 * "filter" xprocfs entry and the lprocfs variables, then registers both
 * OBD types (regular and SAN).  The calls at the end of the function undo
 * the earlier steps in reverse order.
 *
 * NOTE(review): the return statements, the error labels and the
 * declaration of rc are not visible in this view -- confirm the exact
 * unwind order against the full file.
 */
2640 static int __init obdfilter_init(void)
2642 struct lprocfs_static_vars lvars;
/* Announce the driver in the kernel log. */
2645 printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
/* Cache for struct filter_file_data objects. */
2646 filter_open_cache = kmem_cache_create("ll_filter_fdata",
2647 sizeof(struct filter_file_data),
2649 if (!filter_open_cache)
/* Cache for struct filter_dentry_data objects. */
2652 filter_dentry_cache = kmem_cache_create("ll_filter_dentry",
2653 sizeof(struct filter_dentry_data),
2655 if (!filter_dentry_cache) {
/*
 * NOTE(review): no matching xprocfs teardown is visible on the failure
 * path below; lines are elided from this view, so confirm whether one
 * exists.
 */
2660 xprocfs_init ("filter");
2662 lprocfs_init_vars(&lvars);
/* Register the regular filter type first ... */
2664 rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2665 OBD_FILTER_DEVICENAME);
/* ... then the SAN variant, sharing the same lprocfs module vars. */
2669 rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2670 OBD_FILTER_SAN_DEVICENAME);
/* Failure unwind: release in reverse order of acquisition. */
2676 class_unregister_type(OBD_FILTER_DEVICENAME);
2678 kmem_cache_destroy(filter_dentry_cache);
2680 kmem_cache_destroy(filter_open_cache);
/*
 * Module unload hook (installed via module_exit() at the end of the file).
 *
 * Unregisters both OBD types in the reverse of the order obdfilter_init()
 * registered them, then destroys the two kmem caches.  A non-zero return
 * from kmem_cache_destroy() is only logged through CERROR; unload
 * continues regardless.
 */
2684 static void __exit obdfilter_exit(void)
2686 class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2687 class_unregister_type(OBD_FILTER_DEVICENAME);
2688 if (kmem_cache_destroy(filter_dentry_cache))
2689 CERROR("couldn't free obdfilter dentry cache\n");
2690 if (kmem_cache_destroy(filter_open_cache))
2691 CERROR("couldn't free obdfilter open cache\n");
/* Module metadata reported by the kernel module tools. */
2695 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2696 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2697 MODULE_LICENSE("GPL");
/* Hook the load/unload entry points defined above into the module loader. */
2699 module_init(obdfilter_init);
2700 module_exit(obdfilter_exit);