1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28 * (which need to get journal_lock, may block if journal full).
30 * Invariant: Call filter_start_transno() before any journal ops to avoid the
31 * same deadlock problem. We can (and want) to get rid of the
32 * transno sem in favour of the dir/inode i_sem to avoid single
33 * threaded operation on the OST.
37 #define DEBUG_SUBSYSTEM S_FILTER
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
57 LPROC_FILTER_READ_BYTES = 0,
58 LPROC_FILTER_WRITE_BYTES = 1,
63 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
65 [S_IFREG >> S_SHIFT] "R",
66 [S_IFDIR >> S_SHIFT] "D",
67 [S_IFCHR >> S_SHIFT] "C",
68 [S_IFBLK >> S_SHIFT] "B",
69 [S_IFIFO >> S_SHIFT] "F",
70 [S_IFSOCK >> S_SHIFT] "S",
71 [S_IFLNK >> S_SHIFT] "L"
74 static inline const char *obd_mode_to_type(int mode)
76 return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
79 static void filter_ffd_addref(void *ffdp)
81 struct filter_file_data *ffd = ffdp;
83 atomic_inc(&ffd->ffd_refcount);
84 CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
85 atomic_read(&ffd->ffd_refcount));
88 static struct filter_file_data *filter_ffd_new(void)
90 struct filter_file_data *ffd;
92 OBD_ALLOC(ffd, sizeof *ffd);
94 CERROR("out of memory\n");
98 atomic_set(&ffd->ffd_refcount, 2);
100 INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
101 class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
106 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
108 struct filter_file_data *ffd = NULL;
110 LASSERT(handle != NULL);
111 ffd = class_handle2object(handle->cookie);
113 LASSERT(ffd->ffd_file->private_data == ffd);
117 static void filter_ffd_put(struct filter_file_data *ffd)
119 CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
120 atomic_read(&ffd->ffd_refcount) - 1);
121 LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
122 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
123 if (atomic_dec_and_test(&ffd->ffd_refcount)) {
124 LASSERT(list_empty(&ffd->ffd_handle.h_link));
125 OBD_FREE(ffd, sizeof *ffd);
129 static void filter_ffd_destroy(struct filter_file_data *ffd)
131 class_handle_unhash(&ffd->ffd_handle);
135 static void filter_commit_cb(struct obd_device *obd, __u64 transno, int error)
137 obd_transno_commit_cb(obd, transno, error);
139 /* Assumes caller has already pushed us into the kernel context. */
140 int filter_finish_transno(struct obd_export *export, void *handle,
141 struct obd_trans_info *oti, int rc)
144 struct obd_device *obd = export->exp_obd;
145 struct filter_obd *filter = &obd->u.filter;
146 struct filter_export_data *fed = &export->exp_filter_data;
147 struct filter_client_data *fcd = fed->fed_fcd;
151 /* Propagate error code. */
155 if (!obd->obd_replayable)
158 /* we don't allocate new transnos for replayed requests */
159 if (oti && oti->oti_transno == 0) {
160 spin_lock(&filter->fo_translock);
161 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd) + 1;
162 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
163 spin_unlock(&filter->fo_translock);
164 oti->oti_transno = last_rcvd;
165 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
166 fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
168 /* could get xid from oti, if it's ever needed */
169 fcd->fcd_last_xid = 0;
171 off = fed->fed_lr_off;
172 fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
173 written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd,
175 CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
176 "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid,
177 fed->fed_lr_idx, written);
179 if (written == sizeof(*fcd))
181 CERROR("error writing to last_rcvd file: rc = %d\n",
192 static inline void f_dput(struct dentry *dentry)
194 /* Can't go inside filter_ddelete because it can block */
195 CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
196 dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
197 LASSERT(atomic_read(&dentry->d_count) > 0);
202 /* Not racy w.r.t. others, because we are the only user of this dentry */
203 static void filter_drelease(struct dentry *dentry)
205 if (dentry->d_fsdata)
206 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
209 struct dentry_operations filter_dops = {
210 .d_release = filter_drelease,
213 #define LAST_RCVD "last_rcvd"
216 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
217 #define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
218 #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
220 /* Add client data to the FILTER. We use a bitmap to locate a free space
221 * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
222 * Otherwise, we have just read the data from the last_rcvd file and
223 * we know its offset.
225 int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
226 struct filter_export_data *fed, int cl_idx)
228 unsigned long *bitmap = filter->fo_last_rcvd_slots;
229 int new_client = (cl_idx == -1);
231 LASSERT(bitmap != NULL);
233 /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
234 if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
237 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
238 * there's no need for extra complication here
241 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
243 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
244 CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
247 if (test_and_set_bit(cl_idx, bitmap)) {
248 CERROR("FILTER client %d: found bit is set in bitmap\n",
250 cl_idx = find_next_zero_bit(bitmap,
251 FILTER_LR_MAX_CLIENTS,
256 if (test_and_set_bit(cl_idx, bitmap)) {
257 CERROR("FILTER client %d: bit already set in bitmap!\n",
263 fed->fed_lr_idx = cl_idx;
264 fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
265 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
267 CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
268 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
271 struct obd_run_ctxt saved;
272 loff_t off = fed->fed_lr_off;
276 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
277 fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
279 push_ctxt(&saved, &filter->fo_ctxt, NULL);
280 /* Transaction eeded to fix for bug 1403 */
281 handle = fsfilt_start(obd,
282 filter->fo_rcvd_filp->f_dentry->d_inode,
284 if (IS_ERR(handle)) {
285 written = PTR_ERR(handle);
286 CERROR("unable to start transaction: rc %d\n",
289 written = lustre_fwrite(filter->fo_rcvd_filp,
290 (char *)fed->fed_fcd,
291 sizeof(*fed->fed_fcd), &off);
293 filter->fo_rcvd_filp->f_dentry->d_inode,
296 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
298 if (written != sizeof(*fed->fed_fcd)) {
307 int filter_client_free(struct obd_export *exp, int failover)
309 struct filter_export_data *fed = &exp->exp_filter_data;
310 struct filter_obd *filter = &exp->exp_obd->u.filter;
311 struct filter_client_data zero_fcd;
312 struct obd_run_ctxt saved;
323 /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
324 if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
327 LASSERT(filter->fo_last_rcvd_slots != NULL);
329 off = fed->fed_lr_off;
331 CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
332 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
334 if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
335 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
340 memset(&zero_fcd, 0, sizeof zero_fcd);
341 push_ctxt(&saved, &filter->fo_ctxt, NULL);
342 written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
343 sizeof(zero_fcd), &off);
345 /* XXX: this write gets lost sometimes, unless this sync is here. */
347 file_fsync(filter->fo_rcvd_filp,
348 filter->fo_rcvd_filp->f_dentry, 1);
349 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
351 if (written != sizeof(zero_fcd)) {
352 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
353 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
357 "zeroed disconnecting client %s at idx %u (%llu)\n",
358 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
362 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
367 static int filter_free_server_data(struct filter_obd *filter)
369 OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
370 filter->fo_fsd = NULL;
371 OBD_FREE(filter->fo_last_rcvd_slots,
372 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
373 filter->fo_last_rcvd_slots = NULL;
378 /* assumes caller is already in kernel ctxt */
379 static int filter_update_server_data(struct file *filp,
380 struct filter_server_data *fsd)
385 CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
386 CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
387 le64_to_cpu(fsd->fsd_last_objid));
388 CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
389 le64_to_cpu(fsd->fsd_last_rcvd));
390 CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
391 le64_to_cpu(fsd->fsd_mount_count));
393 rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
394 if (rc != sizeof(*fsd)) {
395 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
402 /* assumes caller has already in kernel ctxt */
403 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
404 __u64 init_lastobjid)
406 struct filter_obd *filter = &obd->u.filter;
407 struct filter_server_data *fsd;
408 struct filter_client_data *fcd = NULL;
409 struct inode *inode = filp->f_dentry->d_inode;
410 unsigned long last_rcvd_size = inode->i_size;
411 __u64 mount_count = 0;
416 /* ensure padding in the struct is the correct size */
417 LASSERT (offsetof(struct filter_server_data, fsd_padding) +
418 sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
419 LASSERT (offsetof(struct filter_client_data, fcd_padding) +
420 sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
422 OBD_ALLOC(fsd, sizeof(*fsd));
425 filter->fo_fsd = fsd;
427 OBD_ALLOC(filter->fo_last_rcvd_slots,
428 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
429 if (filter->fo_last_rcvd_slots == NULL) {
430 OBD_FREE(fsd, sizeof(*fsd));
434 if (last_rcvd_size == 0) {
435 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
437 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
438 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
439 fsd->fsd_last_rcvd = 0;
440 mount_count = fsd->fsd_mount_count = 0;
441 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
442 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
443 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
444 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
445 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
447 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
449 if (retval != sizeof(*fsd)) {
450 CDEBUG(D_INODE,"OBD filter: error reading %s\n",
452 GOTO(err_fsd, rc = -EIO);
454 mount_count = le64_to_cpu(fsd->fsd_mount_count);
455 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
458 if (fsd->fsd_feature_incompat) {
459 CERROR("unsupported feature %x\n",
460 le32_to_cpu(fsd->fsd_feature_incompat));
461 GOTO(err_fsd, rc = -EINVAL);
463 if (fsd->fsd_feature_rocompat) {
464 CERROR("read-only feature %x\n",
465 le32_to_cpu(fsd->fsd_feature_rocompat));
466 /* Do something like remount filesystem read-only */
467 GOTO(err_fsd, rc = -EINVAL);
470 CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
471 obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
472 CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
473 obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
474 CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
475 obd->obd_name, mount_count);
476 CDEBUG(D_INODE, "%s: server data size: %u\n",
477 obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
478 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
479 obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
480 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
481 obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
482 CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
483 obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
486 * When we do a clean FILTER shutdown, we save the last_rcvd into
487 * the header. If we find clients with higher last_rcvd values
488 * then those clients may need recovery done.
490 if (!obd->obd_replayable) {
491 CERROR("%s: recovery support OFF\n", obd->obd_name);
495 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
500 OBD_ALLOC(fcd, sizeof(*fcd));
502 GOTO(err_fsd, rc = -ENOMEM);
505 /* Don't assume off is incremented properly, in case
506 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
508 off = le32_to_cpu(fsd->fsd_client_start) +
509 cl_idx * le16_to_cpu(fsd->fsd_client_size);
510 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
511 if (rc != sizeof(*fcd)) {
512 CERROR("error reading FILTER %s offset %d: rc = %d\n",
513 LAST_RCVD, cl_idx, rc);
514 if (rc > 0) /* XXX fatal error or just abort reading? */
519 if (fcd->fcd_uuid[0] == '\0') {
520 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
525 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
527 /* These exports are cleaned up by filter_disconnect(), so they
528 * need to be set up like real exports as filter_connect() does.
530 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
531 if (mount_age < FILTER_MOUNT_RECOV) {
532 struct obd_export *exp = class_new_export(obd);
533 struct filter_export_data *fed;
534 CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
535 " srv lr: "LPU64" mnt: "LPU64" last mount: "
536 LPU64"\n", fcd->fcd_uuid, cl_idx,
537 last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
538 le64_to_cpu(fcd->fcd_mount_count), mount_count);
540 /* XXX this rc is ignored */
544 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
545 sizeof exp->exp_client_uuid.uuid);
546 fed = &exp->exp_filter_data;
548 filter_client_add(obd, filter, fed, cl_idx);
549 /* create helper if export init gets more complex */
550 INIT_LIST_HEAD(&fed->fed_open_head);
551 spin_lock_init(&fed->fed_lock);
554 obd->obd_recoverable_clients++;
555 class_export_put(exp);
558 "discarded client %d UUID '%s' count "LPU64"\n",
559 cl_idx, fcd->fcd_uuid,
560 le64_to_cpu(fcd->fcd_mount_count));
563 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
566 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
567 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
569 obd->obd_last_committed =
570 le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
571 if (obd->obd_recoverable_clients) {
572 CERROR("RECOVERY: %d recoverable clients, last_rcvd "
573 LPU64"\n", obd->obd_recoverable_clients,
574 le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
575 obd->obd_next_recovery_transno =
576 obd->obd_last_committed + 1;
577 obd->obd_recovering = 1;
583 OBD_FREE(fcd, sizeof(*fcd));
586 fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
588 /* save it,so mount count and last_recvd is current */
589 rc = filter_update_server_data(filp, filter->fo_fsd);
594 filter_free_server_data(filter);
598 /* setup the object store with correct subdirectories */
599 static int filter_prep(struct obd_device *obd)
601 struct obd_run_ctxt saved;
602 struct filter_obd *filter = &obd->u.filter;
603 struct dentry *dentry, *O_dentry;
610 push_ctxt(&saved, &filter->fo_ctxt, NULL);
611 dentry = simple_mkdir(current->fs->pwd, "O", 0700);
612 CDEBUG(D_INODE, "got/created O: %p\n", dentry);
613 if (IS_ERR(dentry)) {
614 rc = PTR_ERR(dentry);
615 CERROR("cannot open/create O: rc = %d\n", rc);
618 filter->fo_dentry_O = dentry;
621 * Create directories and/or get dentries for each object type.
622 * This saves us from having to do multiple lookups for each one.
624 O_dentry = filter->fo_dentry_O;
625 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
626 char *name = obd_type_by_mode[mode];
629 filter->fo_dentry_O_mode[mode] = NULL;
632 dentry = simple_mkdir(O_dentry, name, 0700);
633 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
634 if (IS_ERR(dentry)) {
635 rc = PTR_ERR(dentry);
636 CERROR("cannot create O/%s: rc = %d\n", name, rc);
637 GOTO(err_O_mode, rc);
639 filter->fo_dentry_O_mode[mode] = dentry;
642 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
643 if (!file || IS_ERR(file)) {
645 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
647 GOTO(err_O_mode, rc);
650 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
651 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
652 file->f_dentry->d_inode->i_mode);
653 GOTO(err_filp, rc = -ENOENT);
656 rc = fsfilt_journal_data(obd, file);
658 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
661 /* steal operations */
662 inode = file->f_dentry->d_inode;
663 filter->fo_fop = file->f_op;
664 filter->fo_iop = inode->i_op;
665 filter->fo_aops = inode->i_mapping->a_ops;
667 rc = filter_init_server_data(obd, file, INIT_OBJID);
669 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
670 GOTO(err_client, rc);
672 filter->fo_rcvd_filp = file;
674 if (filter->fo_subdir_count) {
675 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
676 OBD_ALLOC(filter->fo_dentry_O_sub,
677 filter->fo_subdir_count * sizeof(dentry));
678 if (!filter->fo_dentry_O_sub)
679 GOTO(err_client, rc = -ENOMEM);
681 for (i = 0; i < filter->fo_subdir_count; i++) {
683 snprintf(dir, sizeof(dir), "d%u", i);
685 dentry = simple_mkdir(O_dentry, dir, 0700);
686 CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
687 if (IS_ERR(dentry)) {
688 rc = PTR_ERR(dentry);
689 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
692 filter->fo_dentry_O_sub[i] = dentry;
697 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
703 struct dentry *dentry = filter->fo_dentry_O_sub[i];
706 filter->fo_dentry_O_sub[i] = NULL;
709 OBD_FREE(filter->fo_dentry_O_sub,
710 filter->fo_subdir_count * sizeof(dentry));
712 class_disconnect_exports(obd, 0);
714 if (filp_close(file, 0))
715 CERROR("can't close %s after error\n", LAST_RCVD);
716 filter->fo_rcvd_filp = NULL;
719 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
722 filter->fo_dentry_O_mode[mode] = NULL;
725 f_dput(filter->fo_dentry_O);
726 filter->fo_dentry_O = NULL;
730 /* cleanup the filter: write last used object id to status file */
731 static void filter_post(struct obd_device *obd)
733 struct obd_run_ctxt saved;
734 struct filter_obd *filter = &obd->u.filter;
738 /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
739 * best to start a transaction with h_sync, because we removed this
742 push_ctxt(&saved, &filter->fo_ctxt, NULL);
743 rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
745 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
748 if (filter->fo_rcvd_filp) {
749 rc = file_fsync(filter->fo_rcvd_filp,
750 filter->fo_rcvd_filp->f_dentry, 1);
751 filp_close(filter->fo_rcvd_filp, 0);
752 filter->fo_rcvd_filp = NULL;
754 CERROR("last_rcvd file won't closed rc = %ld\n", rc);
757 if (filter->fo_subdir_count) {
759 for (i = 0; i < filter->fo_subdir_count; i++) {
760 struct dentry *dentry = filter->fo_dentry_O_sub[i];
762 filter->fo_dentry_O_sub[i] = NULL;
764 OBD_FREE(filter->fo_dentry_O_sub,
765 filter->fo_subdir_count *
766 sizeof(*filter->fo_dentry_O_sub));
768 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
769 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
772 filter->fo_dentry_O_mode[mode] = NULL;
775 f_dput(filter->fo_dentry_O);
776 filter_free_server_data(filter);
777 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
781 static __u64 filter_next_id(struct filter_obd *filter)
784 LASSERT(filter->fo_fsd != NULL);
786 spin_lock(&filter->fo_objidlock);
787 id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
788 filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
789 spin_unlock(&filter->fo_objidlock);
794 /* direct cut-n-paste of mds_blocking_ast() */
795 int filter_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
796 void *data, int flag)
801 if (flag == LDLM_CB_CANCELING) {
802 /* Don't need to do anything here. */
806 /* XXX layering violation! -phil */
807 l_lock(&lock->l_resource->lr_namespace->ns_lock);
808 /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
809 * such that mds_blocking_ast is called just before l_i_p takes the
810 * ns_lock, then by the time we get the lock, we might not be the
811 * correct blocking function anymore. So check, and return early, if
813 if (lock->l_blocking_ast != filter_blocking_ast) {
814 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
818 lock->l_flags |= LDLM_FL_CBPENDING;
819 do_ast = (!lock->l_readers && !lock->l_writers);
820 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
823 struct lustre_handle lockh;
826 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
827 ldlm_lock2handle(lock, &lockh);
828 rc = ldlm_cli_cancel(&lockh);
830 CERROR("ldlm_cli_cancel: %d\n", rc);
832 LDLM_DEBUG(lock, "Lock still has references, will be "
838 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
839 ldlm_mode_t lock_mode,struct lustre_handle *lockh)
841 struct ldlm_res_id res_id = { .name = {0} };
845 res_id.name[0] = de->d_inode->i_ino;
846 res_id.name[1] = de->d_inode->i_generation;
847 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
848 res_id, LDLM_PLAIN, NULL, 0, lock_mode,
849 &flags, ldlm_completion_ast,
850 filter_blocking_ast, NULL, lockh);
852 RETURN(rc == ELDLM_OK ? 0 : -ENOLCK); /* XXX translate ldlm code */
855 static void filter_parent_unlock(struct dentry *dparent,
856 struct lustre_handle *lockh,
857 ldlm_mode_t lock_mode)
859 ldlm_lock_decref(lockh, lock_mode);
862 /* We never dget the object parent, so DON'T dput it either */
863 static inline struct dentry *filter_parent(struct obd_device *obd,
864 obd_mode mode, obd_id objid)
866 struct filter_obd *filter = &obd->u.filter;
868 LASSERT(S_ISREG(mode)); /* only regular files for now */
869 if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
870 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
872 return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
875 /* We never dget the object parent, so DON'T dput it either */
876 static inline struct dentry *filter_parent_lock(struct obd_device *obd,
877 obd_mode mode, obd_id objid,
878 ldlm_mode_t lock_mode,
879 struct lustre_handle *lockh)
881 unsigned long now = jiffies;
882 struct dentry *de = filter_parent(obd, mode, objid);
888 rc = filter_lock_dentry(obd, de, lock_mode, lockh);
889 if (time_after(jiffies, now + 15*HZ))
890 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
891 return rc ? ERR_PTR(rc) : de;
894 /* How to get files, dentries, inodes from object id's.
896 * If dir_dentry is passed, the caller has already locked the parent
897 * appropriately for this operation (normally a write lock). If
898 * dir_dentry is NULL, we do a read lock while we do the lookup to
899 * avoid races with create/destroy and such changing the directory
900 * internal to the filesystem code.
902 static struct dentry *filter_fid2dentry(struct obd_device *obd,
903 struct dentry *dir_dentry,
904 obd_mode mode, obd_id id)
906 struct super_block *sb = obd->u.filter.fo_sb;
907 struct lustre_handle lockh;
908 struct dentry *dparent = dir_dentry;
909 struct dentry *dchild;
914 if (!sb || !sb->s_dev) {
915 CERROR("device not initialized.\n");
916 RETURN(ERR_PTR(-ENXIO));
920 CERROR("fatal: invalid object id 0\n");
922 RETURN(ERR_PTR(-ESTALE));
925 len = sprintf(name, LPU64, id);
927 dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
931 CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
932 dparent->d_name.len, dparent->d_name.name, name);
933 dchild = ll_lookup_one_len(name, dparent, len);
935 filter_parent_unlock(dparent, &lockh, LCK_PR);
936 if (IS_ERR(dchild)) {
937 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
941 CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
942 name, dchild, atomic_read(&dchild->d_count));
944 LASSERT(atomic_read(&dchild->d_count) > 0);
949 static struct file *filter_obj_open(struct obd_export *export,
950 __u64 id, __u32 type,
951 ldlm_mode_t parent_mode,
952 struct lustre_handle *parent_lockh)
954 struct obd_device *obd = export->exp_obd;
955 struct filter_obd *filter = &obd->u.filter;
956 struct super_block *sb = filter->fo_sb;
957 struct dentry *dchild = NULL, *dparent = NULL;
958 struct filter_export_data *fed = &export->exp_filter_data;
959 struct filter_dentry_data *fdd = NULL;
960 struct filter_file_data *ffd = NULL;
961 struct obd_run_ctxt saved;
964 int len, cleanup_phase = 0;
967 push_ctxt(&saved, &filter->fo_ctxt, NULL);
969 if (!sb || !sb->s_dev) {
970 CERROR("fatal: device not initialized.\n");
971 GOTO(cleanup, file = ERR_PTR(-ENXIO));
975 CERROR("fatal: invalid obdo "LPU64"\n", id);
976 GOTO(cleanup, file = ERR_PTR(-ESTALE));
979 if (!(type & S_IFMT)) {
980 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
981 __FUNCTION__, id, type);
982 GOTO(cleanup, file = ERR_PTR(-EINVAL));
985 ffd = filter_ffd_new();
987 CERROR("obdfilter: out of memory\n");
988 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
993 /* We preallocate this to avoid blocking while holding fo_fddlock */
994 OBD_ALLOC(fdd, sizeof *fdd);
996 CERROR("obdfilter: out of memory\n");
997 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1002 dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1003 if (IS_ERR(dparent))
1004 GOTO(cleanup, file = (void *)dparent);
1008 len = snprintf(name, sizeof(name), LPU64, id);
1009 dchild = ll_lookup_one_len(name, dparent, len);
1011 GOTO(cleanup, file = (void *)dchild);
1015 if (dchild->d_inode == NULL) {
1016 CERROR("opening non-existent object %s - O_CREAT?\n", name);
1017 file = ERR_PTR(-ENOENT);
1018 GOTO(cleanup, file);
1021 /* dentry_open does a dput(dchild) and mntput(mnt) on error */
1022 mntget(filter->fo_vfsmnt);
1023 file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1025 dchild = NULL; /* prevent a double dput in step 4 */
1026 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1027 GOTO(cleanup, file);
1030 spin_lock(&filter->fo_fddlock);
1031 if (dchild->d_fsdata) {
1032 spin_unlock(&filter->fo_fddlock);
1033 OBD_FREE(fdd, sizeof *fdd);
1034 fdd = dchild->d_fsdata;
1035 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1036 /* should only happen during client recovery */
1037 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1038 CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1039 atomic_inc(&fdd->fdd_open_count);
1041 atomic_set(&fdd->fdd_open_count, 1);
1042 fdd->fdd_magic = FILTER_DENTRY_MAGIC;
1044 fdd->fdd_objid = id;
1045 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1046 dchild->d_fsdata = fdd;
1047 spin_unlock(&filter->fo_fddlock);
1050 ffd->ffd_file = file;
1051 LASSERT(file->private_data == NULL);
1052 file->private_data = ffd;
1055 dchild->d_op = &filter_dops;
1057 LASSERT(dchild->d_op == &filter_dops);
1059 spin_lock(&fed->fed_lock);
1060 list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1061 spin_unlock(&fed->fed_lock);
1063 CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
1065 switch (cleanup_phase) {
1071 filter_parent_unlock(dparent, parent_lockh,parent_mode);
1074 OBD_FREE(fdd, sizeof *fdd);
1077 filter_ffd_destroy(ffd);
1078 filter_ffd_put(ffd);
1080 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1085 /* Caller must hold LCK_PW on parent and push us into kernel context.
1086 * Caller is also required to ensure that dchild->d_inode exists.
1088 static int filter_destroy_internal(struct obd_device *obd,
1089 struct dentry *dparent,
1090 struct dentry *dchild)
1092 struct inode *inode = dchild->d_inode;
1096 if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1097 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1098 dchild->d_name.len, dchild->d_name.name,
1099 inode->i_nlink, atomic_read(&inode->i_count));
1102 rc = vfs_unlink(dparent->d_inode, dchild);
1105 CERROR("error unlinking objid %*s: rc %d\n",
1106 dchild->d_name.len, dchild->d_name.name, rc);
1111 /* If closing because we are failing this device, then
1112 don't do the unlink on close.
1114 static int filter_close_internal(struct obd_export *exp,
1115 struct filter_file_data *ffd,
1116 struct obd_trans_info *oti,
1119 struct obd_device *obd = exp->exp_obd;
1120 struct filter_obd *filter = &obd->u.filter;
1121 struct file *filp = ffd->ffd_file;
1122 struct dentry *dchild = dget(filp->f_dentry);
1123 struct filter_dentry_data *fdd = dchild->d_fsdata;
1124 struct lustre_handle parent_lockh;
1125 int rc, rc2, cleanup_phase = 0;
1126 struct dentry *dparent = NULL;
1127 struct obd_run_ctxt saved;
1130 LASSERT(filp->private_data == ffd);
1132 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1134 rc = filp_close(filp, 0);
1136 if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1137 fdd->fdd_flags & FILTER_FLAG_DESTROY && !failover) {
1140 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1143 LASSERT(fdd->fdd_objid > 0);
1144 dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1145 LCK_PW, &parent_lockh);
1146 if (IS_ERR(dparent))
1147 GOTO(cleanup, rc = PTR_ERR(dparent));
1150 handle = fsfilt_start(obd, dparent->d_inode,
1153 GOTO(cleanup, rc = PTR_ERR(handle));
1155 /* XXX unlink from PENDING directory now too */
1156 rc2 = filter_destroy_internal(obd, dparent, dchild);
1159 rc = filter_finish_transno(exp, handle, oti, rc);
1160 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1162 CERROR("error on commit, err = %d\n", rc2);
1169 switch(cleanup_phase) {
1171 if (rc || oti == NULL) {
1172 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1174 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1175 sizeof(parent_lockh));
1176 oti->oti_ack_locks[0].mode = LCK_PW;
1179 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1182 filter_ffd_destroy(ffd);
1185 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1193 /* mount the file system (secretly) */
1194 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1197 struct obd_ioctl_data* data = buf;
1198 struct filter_obd *filter = &obd->u.filter;
1200 struct vfsmount *mnt;
1204 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1207 obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1208 if (IS_ERR(obd->obd_fsops))
1209 RETURN(PTR_ERR(obd->obd_fsops));
1211 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
1216 if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1217 if (*data->ioc_inlbuf3 == 'f') {
1218 obd->obd_replayable = 1;
1219 obd_sync_filter = 1;
1220 CERROR("%s: configured for recovery and sync write\n",
1223 if (*data->ioc_inlbuf3 != 'n') {
1224 CERROR("unrecognised flag '%c'\n",
1225 *data->ioc_inlbuf3);
1230 if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
1231 if (*data->ioc_inlbuf4 == '/') {
1232 CERROR("filter namespace mount: %s\n",
1234 filter->fo_nspath = strdup(data->ioc_inlbuf4);
1236 CERROR("namespace mount must be absolute path: '%s'\n",
1241 filter->fo_vfsmnt = mnt;
1242 filter->fo_sb = mnt->mnt_sb;
1243 filter->fo_fstype = mnt->mnt_sb->s_type->name;
1244 CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
1246 OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1247 filter->fo_ctxt.pwdmnt = mnt;
1248 filter->fo_ctxt.pwd = mnt->mnt_root;
1249 filter->fo_ctxt.fs = get_ds();
1251 rc = filter_prep(obd);
1253 GOTO(err_mntput, rc);
1255 spin_lock_init(&filter->fo_translock);
1256 spin_lock_init(&filter->fo_fddlock);
1257 spin_lock_init(&filter->fo_objidlock);
1258 INIT_LIST_HEAD(&filter->fo_export_list);
1260 obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1261 LDLM_NAMESPACE_SERVER);
1262 if (!obd->obd_namespace)
1263 GOTO(err_post, rc = -ENOMEM);
1265 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1266 "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1278 fsfilt_put_ops(obd->obd_fsops);
1282 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1284 struct obd_ioctl_data* data = buf;
1285 char *option = NULL;
1287 if (!strcmp(data->ioc_inlbuf2, "ext3"))
1288 option = "asyncdel";
1290 return filter_common_setup(obd, len, buf, option);
1293 /* sanobd setup methods - use a specific mount option */
1294 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1296 struct obd_ioctl_data* data = buf;
1297 char *option = NULL;
1299 if (!data->ioc_inlbuf2)
1302 /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1303 if (!strcmp(data->ioc_inlbuf2, "extN"))
1304 option = "data=writeback";
1305 else if (!strcmp(data->ioc_inlbuf2, "ext3"))
1306 option = "data=writeback,asyncdel";
1308 LBUG(); /* just a reminder */
1310 return filter_common_setup(obd, len, buf, option);
1313 static int filter_cleanup(struct obd_device *obd, int force, int failover)
1315 struct super_block *sb;
1319 CERROR("%s: shutting down for failover; client state will"
1320 " be preserved.\n", obd->obd_name);
1322 if (!list_empty(&obd->obd_exports)) {
1323 CERROR("%s: still has clients!\n", obd->obd_name);
1324 class_disconnect_exports(obd, failover);
1325 if (!list_empty(&obd->obd_exports)) {
1326 CERROR("still has exports after forced cleanup?\n");
1331 ldlm_namespace_free(obd->obd_namespace);
1333 sb = obd->u.filter.fo_sb;
1339 shrink_dcache_parent(sb->s_root);
1342 if (atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count) > 1){
1343 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1344 atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count));
1347 mntput(obd->u.filter.fo_vfsmnt);
1348 obd->u.filter.fo_sb = 0;
1349 /* destroy_buffers(obd->u.filter.fo_sb->s_dev);*/
1351 fsfilt_put_ops(obd->obd_fsops);
1357 int filter_attach(struct obd_device *obd, obd_count len, void *data)
1359 struct lprocfs_static_vars lvars;
1362 lprocfs_init_vars(&lvars);
1363 rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1367 rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1371 /* Init obdfilter private stats here */
1372 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1373 LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1374 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1375 LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
1379 int filter_detach(struct obd_device *dev)
1381 lprocfs_free_obd_stats(dev);
1382 return lprocfs_obd_detach(dev);
1385 /* nearly identical to mds_connect */
1386 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1387 struct obd_uuid *cluuid)
1389 struct obd_export *exp;
1390 struct filter_export_data *fed;
1391 struct filter_client_data *fcd;
1392 struct filter_obd *filter = &obd->u.filter;
1397 if (!conn || !obd || !cluuid)
1400 rc = class_connect(conn, obd, cluuid);
1403 exp = class_conn2export(conn);
1406 fed = &exp->exp_filter_data;
1407 class_export_put(exp);
1409 INIT_LIST_HEAD(&fed->fed_open_head);
1410 spin_lock_init(&fed->fed_lock);
1412 if (!obd->obd_replayable)
1415 OBD_ALLOC(fcd, sizeof(*fcd));
1417 CERROR("filter: out of memory for client data\n");
1418 GOTO(out_export, rc = -ENOMEM);
1421 memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1423 fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1425 rc = filter_client_add(obd, filter, fed, -1);
1432 OBD_FREE(fcd, sizeof(*fcd));
1434 class_disconnect(conn, 0);
1439 static void filter_destroy_export(struct obd_export *exp)
1441 struct filter_export_data *fed = &exp->exp_filter_data;
1444 spin_lock(&fed->fed_lock);
1445 while (!list_empty(&fed->fed_open_head)) {
1446 struct filter_file_data *ffd;
1448 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1450 list_del(&ffd->ffd_export_list);
1451 spin_unlock(&fed->fed_lock);
1453 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1454 ffd->ffd_file->f_dentry->d_name.len,
1455 ffd->ffd_file->f_dentry->d_name.name,
1456 ffd, ffd->ffd_handle.h_cookie);
1458 filter_close_internal(exp, ffd, NULL, exp->exp_failover);
1459 spin_lock(&fed->fed_lock);
1461 spin_unlock(&fed->fed_lock);
1463 if (exp->exp_obd->obd_replayable)
1464 filter_client_free(exp, exp->exp_failover);
1468 /* also incredibly similar to mds_disconnect */
1469 static int filter_disconnect(struct lustre_handle *conn, int failover)
1471 struct obd_export *exp = class_conn2export(conn);
1473 unsigned long flags;
1477 ldlm_cancel_locks_for_export(exp);
1479 spin_lock_irqsave(&exp->exp_lock, flags);
1480 exp->exp_failover = failover;
1481 spin_unlock_irqrestore(&exp->exp_lock, flags);
1483 rc = class_disconnect(conn, failover);
1485 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1486 class_export_put(exp);
1487 /* XXX cleanup preallocated inodes */
1491 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1493 int type = oa->o_mode & S_IFMT;
1496 CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1497 inode->i_ino, inode, oa->o_id, valid);
1498 /* Don't copy the inode number in place of the object ID */
1499 obdo_from_inode(oa, inode, valid);
1500 oa->o_mode &= ~S_IFMT;
1503 if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1504 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1506 oa->o_valid |= OBD_MD_FLRDEV;
1512 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1513 struct obdo *oa, char *what)
1515 struct dentry *dchild = NULL;
1517 if (oa->o_valid & OBD_MD_FLHANDLE) {
1518 struct lustre_handle *ost_handle = obdo_handle(oa);
1519 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1522 struct filter_dentry_data *fdd;
1523 dchild = dget(ffd->ffd_file->f_dentry);
1524 fdd = dchild->d_fsdata;
1525 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1526 filter_ffd_put(ffd);
1529 "got child objid %*s: %p, count = %d\n",
1530 dchild->d_name.len, dchild->d_name.name,
1531 dchild, atomic_read(&dchild->d_count));
1536 struct obd_device *obd = class_conn2obd(conn);
1539 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1540 RETURN(ERR_PTR(-EINVAL));
1542 dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
1545 if (IS_ERR(dchild)) {
1546 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
1550 if (!dchild->d_inode) {
1551 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1553 RETURN(ERR_PTR(-ENOENT));
1559 #define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__)
1561 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1562 struct lov_stripe_md *md)
1564 struct dentry *dentry = NULL;
1568 dentry = filter_oa2dentry(conn, oa);
1570 RETURN(PTR_ERR(dentry));
1572 filter_from_inode(oa, dentry->d_inode, oa->o_valid);
1578 /* this is called from filter_truncate() until we have filter_punch() */
1579 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1580 struct lov_stripe_md *md, struct obd_trans_info *oti)
1582 struct obd_run_ctxt saved;
1583 struct obd_export *export = class_conn2export(conn);
1584 struct obd_device *obd = class_conn2obd(conn);
1585 struct filter_obd *filter = &obd->u.filter;
1586 struct dentry *dentry;
1588 struct inode *inode;
1593 dentry = filter_oa2dentry(conn, oa);
1596 GOTO(out_exp, rc = PTR_ERR(dentry));
1598 iattr_from_obdo(&iattr, oa, oa->o_valid);
1599 iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1600 inode = dentry->d_inode;
1602 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1604 if (iattr.ia_valid & ATTR_SIZE)
1605 down(&inode->i_sem);
1607 handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1609 GOTO(out_unlock, rc = PTR_ERR(handle));
1611 rc = fsfilt_setattr(obd, dentry, handle, &iattr, 1);
1612 rc = filter_finish_transno(export, handle, oti, rc);
1613 rc2 = fsfilt_commit(obd, dentry->d_inode, handle, 0);
1615 CERROR("error on commit, err = %d\n", rc2);
1620 if (iattr.ia_valid & ATTR_SIZE) {
1622 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1623 obdo_from_inode(oa, inode, oa->o_valid);
1628 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1632 class_export_put(export);
1636 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1637 struct lov_stripe_md *ea, struct obd_trans_info *oti,
1638 struct obd_client_handle *och)
1640 struct obd_export *export = NULL;
1641 struct lustre_handle *handle;
1642 struct filter_file_data *ffd;
1644 struct lustre_handle parent_lockh;
1648 export = class_conn2export(conn);
1650 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1652 GOTO(out, rc = -EINVAL);
1655 filp = filter_obj_open(export, oa->o_id, oa->o_mode,
1656 LCK_PR, &parent_lockh);
1658 GOTO(out, rc = PTR_ERR(filp));
1660 filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
1662 ffd = filp->private_data;
1663 handle = obdo_handle(oa);
1664 handle->cookie = ffd->ffd_handle.h_cookie;
1665 oa->o_valid |= OBD_MD_FLHANDLE;
1668 class_export_put(export);
1670 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1671 sizeof(parent_lockh));
1672 oti->oti_ack_locks[0].mode = LCK_PR;
1677 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1678 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1680 struct obd_export *exp = class_conn2export(conn);
1681 struct filter_file_data *ffd;
1682 struct filter_export_data *fed;
1687 CDEBUG(D_IOCTL, "invalid client cookie"LPX64"\n", conn->cookie);
1688 GOTO(out, rc = -EINVAL);
1691 if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1692 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1693 GOTO(out, rc = -EINVAL);
1696 ffd = filter_handle2ffd(obdo_handle(oa));
1698 CERROR("bad handle ("LPX64") for close\n",
1699 obdo_handle(oa)->cookie);
1700 GOTO(out, rc = -ESTALE);
1703 fed = &exp->exp_filter_data;
1704 spin_lock(&fed->fed_lock);
1705 list_del(&ffd->ffd_export_list);
1706 spin_unlock(&fed->fed_lock);
1708 rc = filter_close_internal(exp, ffd, oti, 0);
1709 filter_ffd_put(ffd);
1712 class_export_put(exp);
1716 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1717 struct lov_stripe_md **ea, struct obd_trans_info *oti)
1719 struct obd_export *exp;
1720 struct obd_device *obd = class_conn2obd(conn);
1721 struct filter_obd *filter = &obd->u.filter;
1722 struct obd_run_ctxt saved;
1723 struct lustre_handle parent_lockh;
1724 struct dentry *dparent;
1725 struct dentry *dchild = NULL;
1728 int err, rc, cleanup_phase;
1732 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1736 exp = class_conn2export(conn);
1738 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1740 oa->o_id = filter_next_id(filter);
1743 dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1745 if (IS_ERR(dparent))
1746 GOTO(cleanup, rc = PTR_ERR(dparent));
1749 dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1751 GOTO(cleanup, rc = PTR_ERR(dchild));
1752 if (dchild->d_inode) {
1753 /* This would only happen if lastobjid was bad on disk */
1754 CERROR("Serious error: objid %*s already exists; is this "
1755 "filesystem corrupt? I will try to work around it.\n",
1756 dchild->d_name.len, dchild->d_name.name);
1758 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1763 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE);
1765 GOTO(cleanup, rc = PTR_ERR(handle));
1767 rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
1769 CERROR("create failed rc = %d\n", rc);
1771 rc = filter_finish_transno(exp, handle, oti, rc);
1772 err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1774 CERROR("unable to write lastobjid but file created\n");
1778 err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1780 CERROR("error on commit, err = %d\n", err);
1788 /* Set flags for fields we have set in the inode struct */
1789 oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1790 OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1791 filter_from_inode(oa, dchild->d_inode, oa->o_valid);
1795 switch(cleanup_phase) {
1798 case 1: /* locked parent dentry */
1799 if (rc || oti == NULL) {
1800 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1802 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1803 sizeof(parent_lockh));
1804 oti->oti_ack_locks[0].mode = LCK_PW;
1807 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1808 class_export_put(exp);
1811 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1818 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1819 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1821 struct obd_export *exp;
1822 struct obd_device *obd = class_conn2obd(conn);
1823 struct filter_obd *filter = &obd->u.filter;
1824 struct dentry *dparent, *dchild = NULL;
1825 struct filter_dentry_data *fdd;
1826 struct obd_run_ctxt saved;
1827 void *handle = NULL;
1828 struct lustre_handle parent_lockh;
1829 int rc, rc2, cleanup_phase = 0;
1833 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1837 exp = class_conn2export(conn);
1839 CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1841 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1842 dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1843 LCK_PW, &parent_lockh);
1844 if (IS_ERR(dparent))
1845 GOTO(cleanup, rc = PTR_ERR(dparent));
1848 dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1850 GOTO(cleanup, rc = -ENOENT);
1853 if (!dchild->d_inode) {
1854 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
1855 GOTO(cleanup, rc = -ENOENT);
1858 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK);
1860 GOTO(cleanup, rc = PTR_ERR(handle));
1863 fdd = dchild->d_fsdata;
1864 if (fdd && atomic_read(&fdd->fdd_open_count)) {
1865 LASSERT(fdd->fdd_magic = FILTER_DENTRY_MAGIC);
1866 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1867 fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1868 /* XXX put into PENDING directory in case of crash */
1870 "defer destroy of %dx open objid "LPU64"\n",
1871 atomic_read(&fdd->fdd_open_count), oa->o_id);
1874 "repeat destroy of %dx open objid "LPU64"\n",
1875 atomic_read(&fdd->fdd_open_count), oa->o_id);
1876 GOTO(cleanup, rc = 0);
1879 rc = filter_destroy_internal(obd, dparent, dchild);
1882 switch(cleanup_phase) {
1884 rc = filter_finish_transno(exp, handle, oti, rc);
1885 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1887 CERROR("error on commit, err = %d\n", rc2);
1894 if (rc || oti == NULL) {
1895 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1897 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1898 sizeof(parent_lockh));
1899 oti->oti_ack_locks[0].mode = LCK_PW;
1902 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1903 class_export_put(exp);
1906 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1913 /* NB start and end are used for punch, but not truncate */
1914 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1915 struct lov_stripe_md *lsm,
1916 obd_off start, obd_off end,
1917 struct obd_trans_info *oti)
1922 if (end != OBD_OBJECT_EOF)
1923 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
1926 CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
1927 "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1929 error = filter_setattr(conn, oa, NULL, oti);
1933 static inline void lustre_put_page(struct page *page)
1935 page_cache_release(page);
1938 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
1940 struct address_space *mapping = inode->i_mapping;
1942 unsigned long index = lnb->offset >> PAGE_SHIFT;
1945 page = grab_cache_page(mapping, index); /* locked page */
1947 return lnb->rc = PTR_ERR(page);
1951 if (inode->i_size < lnb->offset + lnb->len - 1)
1952 lnb->rc = inode->i_size - lnb->offset;
1956 if (PageUptodate(page)) {
1961 rc = mapping->a_ops->readpage(NULL, page);
1963 CERROR("page index %lu, rc = %d\n", index, rc);
1965 lustre_put_page(page);
1966 return lnb->rc = rc;
1972 static int filter_finish_page_read(struct niobuf_local *lnb)
1974 if (lnb->page == NULL)
1977 if (PageUptodate(lnb->page))
1980 wait_on_page(lnb->page);
1981 if (!PageUptodate(lnb->page)) {
1982 CERROR("page index %lu/offset "LPX64" not uptodate\n",
1983 lnb->page->index, lnb->offset);
1984 GOTO(err_page, lnb->rc = -EIO);
1986 if (PageError(lnb->page)) {
1987 CERROR("page index %lu/offset "LPX64" has error\n",
1988 lnb->page->index, lnb->offset);
1989 GOTO(err_page, lnb->rc = -EIO);
1995 lustre_put_page(lnb->page);
2000 static struct page *lustre_get_page_write(struct inode *inode,
2001 unsigned long index)
2003 struct address_space *mapping = inode->i_mapping;
2007 page = grab_cache_page(mapping, index); /* locked page */
2009 if (!IS_ERR(page)) {
2010 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
2011 * a no-op for most filesystems, because we write the whole
2012 * page. For partial-page I/O this will read in the page.
2014 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
2016 CERROR("page index %lu, rc = %d\n", index, rc);
2019 GOTO(err_unlock, rc);
2021 /* XXX not sure if we need this if we are overwriting page */
2022 if (PageError(page)) {
2023 CERROR("error on page index %lu, rc = %d\n", index, rc);
2025 GOTO(err_unlock, rc = -EIO);
2032 lustre_put_page(page);
2036 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2037 int waitfor_one_page(struct page *page)
2039 wait_on_page_locked(page);
2044 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2045 /* We should only change the file mtime (and not the ctime, like
2046 * update_inode_times() in generic_file_write()) when we only change data.
2048 static inline void inode_update_time(struct inode *inode, int ctime_too)
2050 time_t now = CURRENT_TIME;
2051 if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
2053 inode->i_mtime = now;
2055 inode->i_ctime = now;
2056 mark_inode_dirty_sync(inode);
2060 static int lustre_commit_write(struct niobuf_local *lnb)
2062 struct page *page = lnb->page;
2063 unsigned from = lnb->offset & ~PAGE_MASK;
2064 unsigned to = from + lnb->len;
2065 struct inode *inode = page->mapping->host;
2068 LASSERT(to <= PAGE_SIZE);
2069 err = page->mapping->a_ops->commit_write(NULL, page, from, to);
2070 if (!err && IS_SYNC(inode))
2071 err = waitfor_one_page(page);
2072 //SetPageUptodate(page); // the client commit_write will do this
2074 SetPageReferenced(page);
2076 lustre_put_page(page);
2080 int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
2083 unsigned long index = lnb->offset >> PAGE_SHIFT;
2084 struct address_space *mapping = inode->i_mapping;
2088 //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
2090 page = grab_cache_page_nowait(mapping, index); /* locked page */
2092 page = grab_cache_page(mapping, index); /* locked page */
2095 /* This page is currently locked, so get a temporary page instead. */
2097 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
2098 page = alloc_pages(GFP_KERNEL, 0); /* locked page */
2100 CERROR("no memory for a temp page\n");
2101 GOTO(err, rc = -ENOMEM);
2103 page->index = index;
2105 lnb->flags |= N_LOCAL_TEMP_PAGE;
2106 } else if (!IS_ERR(page)) {
2109 rc = mapping->a_ops->prepare_write(NULL, page,
2110 lnb->offset & ~PAGE_MASK,
2114 CERROR("page index %lu, rc = %d\n", index, rc);
2115 GOTO(err_unlock, rc);
2117 /* XXX not sure if we need this if we are overwriting page */
2118 if (PageError(page)) {
2119 CERROR("error on page index %lu, rc = %d\n", index, rc);
2121 GOTO(err_unlock, rc = -EIO);
2130 lustre_put_page(page);
2132 return lnb->rc = rc;
2136 * We need to balance prepare_write() calls with commit_write() calls.
2137 * If the page has been prepared, but we have no data for it, we don't
2138 * want to overwrite valid data on disk, but we still need to zero out
2139 * data for space which was newly allocated. Like part of what happens
2140 * in __block_prepare_write() for newly allocated blocks.
2142 * XXX currently __block_prepare_write() creates buffers for all the
2143 * pages, and the filesystems mark these buffers as BH_New if they
2144 * were newly allocated from disk. We use the BH_New flag similarly.
2146 static int filter_commit_write(struct niobuf_local *lnb, int err)
2148 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2150 unsigned block_start, block_end;
2151 struct buffer_head *bh, *head = lnb->page->buffers;
2152 unsigned blocksize = head->b_size;
2154 /* debugging: just seeing if this ever happens */
2155 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
2156 "called for ino %lu:%lu on err %d\n",
2157 lnb->page->mapping->host->i_ino, lnb->page->index, err);
2159 /* Currently one buffer per page, but in the future... */
2160 for (bh = head, block_start = 0; bh != head || !block_start;
2161 block_start = block_end, bh = bh->b_this_page) {
2162 block_end = block_start + blocksize;
2163 if (buffer_new(bh)) {
2164 memset(kmap(lnb->page) + block_start, 0,
2171 return lustre_commit_write(lnb);
2174 static int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
2175 int objcount, struct obd_ioobj *obj,
2176 int niocount, struct niobuf_remote *nb,
2177 struct niobuf_local *res, void **desc_private,
2178 struct obd_trans_info *oti)
2180 struct obd_run_ctxt saved;
2181 struct obd_device *obd;
2182 struct obd_ioobj *o;
2183 struct niobuf_remote *rnb;
2184 struct niobuf_local *lnb;
2185 struct fsfilt_objinfo *fso;
2186 struct dentry *dentry;
2187 struct inode *inode;
2188 int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
2189 unsigned long now = jiffies;
2192 memset(res, 0, niocount * sizeof(*res));
2198 // theoretically we support multi-obj BRW RPCs, but until then...
2199 LASSERT(objcount == 1);
2201 OBD_ALLOC(fso, objcount * sizeof(*fso));
2205 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2207 for (i = 0, o = obj; i < objcount; i++, o++) {
2208 struct filter_dentry_data *fdd;
2210 LASSERT(o->ioo_bufcnt);
2212 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
2215 GOTO(out_objinfo, rc = PTR_ERR(dentry));
2217 fso[i].fso_dentry = dentry;
2218 fso[i].fso_bufcnt = o->ioo_bufcnt;
2220 if (!dentry->d_inode) {
2221 CERROR("trying to BRW to non-existent file "LPU64"\n",
2224 GOTO(out_objinfo, rc = -ENOENT);
2227 /* If we ever start to support mutli-object BRW RPCs, we will
2228 * need to get locks on mulitple inodes (in order) or use the
2229 * DLM to do the locking for us (and use the same locking in
2230 * filter_setattr() for truncate). That isn't all, because
2231 * there still exists the possibility of a truncate starting
2232 * a new transaction while holding the ext3 rwsem = write
2233 * while some writes (which have started their transactions
2234 * here) blocking on the ext3 rwsem = read => lock inversion.
2236 * The handling gets very ugly when dealing with locked pages.
2237 * It may be easier to just get rid of the locked page code
2238 * (which has problems of its own) and either discover we do
2239 * not need it anymore (i.e. it was a symptom of another bug)
2240 * or ensure we get the page locks in an appropriate order.
2242 if (cmd & OBD_BRW_WRITE)
2243 down(&dentry->d_inode->i_sem);
2244 fdd = dentry->d_fsdata;
2245 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2246 CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2250 if (time_after(jiffies, now + 15*HZ))
2251 CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
2253 if (cmd & OBD_BRW_WRITE) {
2254 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2256 if (IS_ERR(*desc_private)) {
2257 rc = PTR_ERR(*desc_private);
2258 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2259 "error starting transaction: rc = %d\n", rc);
2260 *desc_private = NULL;
2261 GOTO(out_objinfo, rc);
2265 for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
2266 dentry = fso[i].fso_dentry;
2267 inode = dentry->d_inode;
2269 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2271 lnb->dentry = dentry;
2273 lnb->dentry = dget(dentry);
2275 lnb->offset = rnb->offset;
2276 lnb->len = rnb->len;
2277 lnb->flags = rnb->flags;
2278 lnb->start = jiffies;
2280 if (cmd & OBD_BRW_WRITE) {
2281 rc = filter_get_page_write(inode,lnb,&pglocked);
2283 up(&dentry->d_inode->i_sem);
2284 } else if (inode->i_size <= rnb->offset) {
2285 /* If there's no more data, abort early.
2286 * lnb->page == NULL and lnb->rc == 0, so it's
2287 * easy to detect later. */
2292 rc = filter_start_page_read(inode, lnb);
2296 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2297 "page err %u@"LPU64" %u/%u %p: rc %d\n",
2298 lnb->len, lnb->offset, j, o->ioo_bufcnt,
2301 GOTO(out_pages, rc);
2304 tot_bytes += lnb->len;
2306 if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
2307 /* Likewise with a partial read */
2313 if (time_after(jiffies, now + 15*HZ))
2314 CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
2316 if (cmd & OBD_BRW_READ) {
2317 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
2319 while (lnb-- > res) {
2320 rc = filter_finish_page_read(lnb);
2322 CERROR("error page %u@"LPU64" %u %p: rc %d\n",
2323 lnb->len, lnb->offset, lnb - res,
2325 f_dput(lnb->dentry);
2326 GOTO(out_pages, rc);
2330 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
2333 if (time_after(jiffies, now + 15*HZ))
2334 CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
2338 OBD_FREE(fso, objcount * sizeof(*fso));
2339 current->journal_info = NULL;
2340 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2344 while (lnb-- > res) {
2345 if (cmd & OBD_BRW_WRITE) {
2346 filter_commit_write(lnb, rc);
2347 up(&lnb->dentry->d_inode->i_sem);
2349 lustre_put_page(lnb->page);
2351 f_dput(lnb->dentry);
2353 if (cmd & OBD_BRW_WRITE) {
2354 filter_finish_transno(exp, *desc_private, oti, rc);
2356 filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2359 goto out; /* dropped the dentry refs already (one per page) */
2362 for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
2363 if (cmd & OBD_BRW_WRITE)
2364 up(&fso[i].fso_dentry->d_inode->i_sem);
2365 f_dput(fso[i].fso_dentry);
2370 static int filter_write_locked_page(struct niobuf_local *lnb)
2378 lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2379 if (IS_ERR(lpage)) {
2380 /* It is highly unlikely that we would ever get an error here.
2381 * The page we want to get was previously locked, so it had to
2382 * have already allocated the space, and we were just writing
2383 * over the same data, so there would be no hole in the file.
2385 * XXX: possibility of a race with truncate could exist, need
2386 * to check that. There are no guarantees w.r.t.
2387 * write order even on a local filesystem, although the
2388 * normal response would be to return the number of bytes
2389 * successfully written and leave the rest to the app.
2391 rc = PTR_ERR(lpage);
2392 CERROR("error getting locked page index %ld: rc = %d\n",
2393 lnb->page->index, rc);
2395 lustre_commit_write(lnb);
2399 /* 2 kmaps == vanishingly small deadlock opportunity */
2400 lpage_addr = kmap(lpage);
2401 lnb_addr = kmap(lnb->page);
2403 memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
2408 lustre_put_page(lnb->page);
2411 rc = lustre_commit_write(lnb);
2413 CERROR("error committing locked page %ld: rc = %d\n",
2414 lnb->page->index, rc);
2419 static int filter_syncfs(struct obd_export *exp)
2421 struct obd_device *obd = exp->exp_obd;
2424 RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
2427 static int filter_commitrw(int cmd, struct obd_export *exp,
2428 int objcount, struct obd_ioobj *obj,
2429 int niocount, struct niobuf_local *res,
2430 void *desc_private, struct obd_trans_info *oti)
2432 struct obd_run_ctxt saved;
2433 struct obd_ioobj *o;
2434 struct niobuf_local *lnb;
2435 struct obd_device *obd = exp->exp_obd;
2436 int found_locked = 0, rc = 0, i;
2437 unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
2440 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2442 LASSERT(!current->journal_info);
2443 current->journal_info = desc_private;
2445 for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2448 if (cmd & OBD_BRW_WRITE) {
2449 inode_update_time(lnb->dentry->d_inode, 1);
2450 up(&lnb->dentry->d_inode->i_sem);
2452 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2453 if (lnb->page == NULL) {
2457 if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2462 if (time_after(jiffies, lnb->start + 15*HZ))
2463 CERROR("slow commitrw %lus\n",
2464 (jiffies - lnb->start) / HZ);
2466 if (cmd & OBD_BRW_WRITE) {
2467 int err = filter_commit_write(lnb, 0);
2472 lustre_put_page(lnb->page);
2475 f_dput(lnb->dentry);
2476 if (time_after(jiffies, lnb->start + 15*HZ))
2477 CERROR("slow commit_write %lus\n",
2478 (jiffies - lnb->start) / HZ);
2482 for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2485 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2487 if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2490 if (time_after(jiffies, lnb->start + 15*HZ))
2491 CERROR("slow commitrw locked %lus\n",
2492 (jiffies - lnb->start) / HZ);
2494 err = filter_write_locked_page(lnb);
2497 f_dput(lnb->dentry);
2500 if (time_after(jiffies, lnb->start + 15*HZ))
2501 CERROR("slow commit_write locked %lus\n",
2502 (jiffies - lnb->start) / HZ);
2506 if (cmd & OBD_BRW_WRITE) {
2507 /* We just want any dentry for the commit, for now */
2508 struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
2511 rc = filter_finish_transno(exp, desc_private, oti, rc);
2512 err = fsfilt_commit(obd, dparent->d_inode, desc_private,
2516 if (obd_sync_filter)
2517 LASSERT(oti->oti_transno <= obd->obd_last_committed);
2519 if (time_after(jiffies, now + 15*HZ))
2520 CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
2523 LASSERT(!current->journal_info);
2525 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
2529 static int filter_brw(int cmd, struct lustre_handle *conn,
2530 struct lov_stripe_md *lsm, obd_count oa_bufs,
2531 struct brw_page *pga, struct obd_trans_info *oti)
2533 struct obd_export *export = class_conn2export(conn);
2534 struct obd_ioobj ioo;
2535 struct niobuf_local *lnb;
2536 struct niobuf_remote *rnb;
2545 OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2546 OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2548 if (lnb == NULL || rnb == NULL)
2549 GOTO(out, ret = -ENOMEM);
2551 for (i = 0; i < oa_bufs; i++) {
2552 rnb[i].offset = pga[i].off;
2553 rnb[i].len = pga[i].count;
2556 ioo.ioo_id = lsm->lsm_object_id;
2558 ioo.ioo_type = S_IFREG;
2559 ioo.ioo_bufcnt = oa_bufs;
2561 ret = filter_preprw(cmd, export, NULL, 1, &ioo, oa_bufs, rnb, lnb,
2562 &desc_private, oti);
2566 for (i = 0; i < oa_bufs; i++) {
2567 void *virt = kmap(pga[i].pg);
2568 obd_off off = pga[i].off & ~PAGE_MASK;
2569 void *addr = kmap(lnb[i].page);
2571 /* 2 kmaps == vanishingly small deadlock opportunity */
2573 if (cmd & OBD_BRW_WRITE)
2574 memcpy(addr + off, virt + off, pga[i].count);
2576 memcpy(virt + off, addr + off, pga[i].count);
2582 ret = filter_commitrw(cmd, export, 1, &ioo, oa_bufs, lnb, desc_private,
2587 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2589 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2590 class_export_put(export);
2594 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2595 int objcount, struct obd_ioobj *obj,
2596 int niocount, struct niobuf_remote *nb)
2598 struct obd_device *obd;
2599 struct obd_ioobj *o = obj;
2600 struct niobuf_remote *rnb = nb;
2605 obd = class_conn2obd(conn);
2607 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2612 for (i = 0; i < objcount; i++, o++) {
2613 struct dentry *dentry;
2614 struct inode *inode;
2615 int (*fs_bmap)(struct address_space *, long);
2618 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
2620 GOTO(out, rc = PTR_ERR(dentry));
2621 inode = dentry->d_inode;
2623 CERROR("trying to BRW to non-existent file "LPU64"\n",
2626 GOTO(out, rc = -ENOENT);
2628 fs_bmap = inode->i_mapping->a_ops->bmap;
2630 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2633 block = rnb->offset >> inode->i_blkbits;
2635 if (cmd == OBD_BRW_READ) {
2636 block = fs_bmap(inode->i_mapping, block);
2638 loff_t newsize = rnb->offset + rnb->len;
2639 /* fs_prep_san_write will also update inode
2641 * (1) new alloced block
2642 * (2) existed block but size extented
2644 /* FIXME We could call fs_prep_san_write()
2645 * only once for all the blocks allocation.
2646 * Now call it once for each block, for
2647 * simplicity. And if error happens, we
2648 * probably need to release previous alloced
2650 rc = fs_prep_san_write(obd, inode, &block,
2656 rnb->offset = block;
2664 static int filter_statfs(struct obd_export *exp, struct obd_statfs *osfs)
2666 struct obd_device *obd = exp->exp_obd;
2669 RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
2672 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2673 void *key, __u32 *vallen, void *val)
2675 struct obd_device *obd;
2678 obd = class_conn2obd(conn);
2680 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2685 if (keylen == strlen("blocksize") &&
2686 memcmp(key, "blocksize", keylen) == 0) {
2687 __u32 *blocksize = val;
2688 *vallen = sizeof(*blocksize);
2689 *blocksize = obd->u.filter.fo_sb->s_blocksize;
2693 if (keylen == strlen("blocksize_bits") &&
2694 memcmp(key, "blocksize_bits", keylen) == 0) {
2695 __u32 *blocksize_bits = val;
2696 *vallen = sizeof(*blocksize_bits);
2697 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2701 CDEBUG(D_IOCTL, "invalid key\n");
2705 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2706 struct lustre_handle *src_conn, struct obdo *src,
2707 obd_size count, obd_off offset, struct obd_trans_info *oti)
2710 struct lov_stripe_md srcmd, dstmd;
2711 unsigned long index = 0;
2714 LBUG(); /* THIS CODE IS NOT CORRECT -phil */
2716 memset(&srcmd, 0, sizeof(srcmd));
2717 memset(&dstmd, 0, sizeof(dstmd));
2718 srcmd.lsm_object_id = src->o_id;
2719 dstmd.lsm_object_id = dst->o_id;
2722 CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2723 ", dst: ino "LPU64"\n",
2724 src->o_id, src->o_blocks, src->o_size, dst->o_id);
2725 page = alloc_page(GFP_USER);
2731 /* XXX with brw vector I/O, we could batch up reads and writes here,
2732 * all we need to do is allocate multiple pages to handle the I/Os
2733 * and arrays to handle the request parameters.
2735 while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2739 pg.count = PAGE_SIZE;
2740 pg.off = (page->index) << PAGE_SHIFT;
2743 page->index = index;
2744 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, NULL);
2750 pg.flag = OBD_BRW_CREATE;
2751 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2753 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, oti);
2755 /* XXX should handle dst->o_size, dst->o_blocks here */
2761 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
2765 dst->o_size = src->o_size;
2766 dst->o_blocks = src->o_blocks;
2767 dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
2774 int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2775 int len, void *karg, void *uarg)
2777 struct obd_device *obd = class_conn2obd(conn);
2780 case OBD_IOC_ABORT_RECOVERY:
2781 CERROR("aborting recovery for device %s\n", obd->obd_name);
2782 target_abort_recovery(obd);
2792 static struct obd_ops filter_obd_ops = {
2793 o_owner: THIS_MODULE,
2794 o_attach: filter_attach,
2795 o_detach: filter_detach,
2796 o_get_info: filter_get_info,
2797 o_setup: filter_setup,
2798 o_cleanup: filter_cleanup,
2799 o_connect: filter_connect,
2800 o_disconnect: filter_disconnect,
2801 o_statfs: filter_statfs,
2802 o_syncfs: filter_syncfs,
2803 o_getattr: filter_getattr,
2804 o_create: filter_create,
2805 o_setattr: filter_setattr,
2806 o_destroy: filter_destroy,
2807 o_open: filter_open,
2808 o_close: filter_close,
2810 o_punch: filter_truncate,
2811 o_preprw: filter_preprw,
2812 o_commitrw: filter_commitrw,
2813 o_destroy_export: filter_destroy_export,
2814 o_iocontrol: filter_iocontrol,
2816 o_san_preprw: filter_san_preprw,
2817 o_preallocate: filter_preallocate_inodes,
2818 o_migrate: filter_migrate,
2819 o_copy: filter_copy_data,
2820 o_iterate: filter_iterate
2824 static struct obd_ops filter_sanobd_ops = {
2825 o_owner: THIS_MODULE,
2826 o_attach: filter_attach,
2827 o_detach: filter_detach,
2828 o_get_info: filter_get_info,
2829 o_setup: filter_san_setup,
2830 o_cleanup: filter_cleanup,
2831 o_connect: filter_connect,
2832 o_disconnect: filter_disconnect,
2833 o_statfs: filter_statfs,
2834 o_getattr: filter_getattr,
2835 o_create: filter_create,
2836 o_setattr: filter_setattr,
2837 o_destroy: filter_destroy,
2838 o_open: filter_open,
2839 o_close: filter_close,
2841 o_punch: filter_truncate,
2842 o_preprw: filter_preprw,
2843 o_commitrw: filter_commitrw,
2844 o_san_preprw: filter_san_preprw,
2845 o_destroy_export: filter_destroy_export,
2846 o_iocontrol: filter_iocontrol,
2848 o_preallocate: filter_preallocate_inodes,
2849 o_migrate: filter_migrate,
2850 o_copy: filter_copy_data,
2851 o_iterate: filter_iterate
2856 static int __init obdfilter_init(void)
2858 struct lprocfs_static_vars lvars;
2861 printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2863 lprocfs_init_vars(&lvars);
2865 rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2866 OBD_FILTER_DEVICENAME);
2870 rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2871 OBD_FILTER_SAN_DEVICENAME);
2873 class_unregister_type(OBD_FILTER_DEVICENAME);
2877 static void __exit obdfilter_exit(void)
2879 class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2880 class_unregister_type(OBD_FILTER_DEVICENAME);
2883 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2884 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2885 MODULE_LICENSE("GPL");
2887 module_init(obdfilter_init);
2888 module_exit(obdfilter_exit);