1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 * Invariant: Get O/R i_sem for lookup, if needed, before any journal ops
28 * (which need to get journal_lock, may block if journal full).
30 * Invariant: Call filter_start_transno() before any journal ops to avoid the
31 * same deadlock problem. We can (and want) to get rid of the
32 * transno sem in favour of the dir/inode i_sem to avoid single
33 * threaded operation on the OST.
37 #define DEBUG_SUBSYSTEM S_FILTER
39 #include <linux/config.h>
40 #include <linux/module.h>
41 #include <linux/pagemap.h> // XXX kill me soon
43 #include <linux/dcache.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/obd_filter.h>
47 #include <linux/init.h>
48 #include <linux/random.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lprocfs_status.h>
51 #include <linux/version.h>
52 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
53 #include <linux/mount.h>
// Enumerators numbering the read-bytes (0) and write-bytes (1) slots;
// presumably indices into the filter's lprocfs counters - TODO confirm.
// NOTE(review): the enclosing enum header and any further members fall on
// lines that are not visible in this view.
57 LPROC_FILTER_READ_BYTES = 0,
58 LPROC_FILTER_WRITE_BYTES = 1,
// Table mapping a file-type index, (mode & S_IFMT) >> S_SHIFT, to a
// one-letter name. filter_prep() uses these names for the per-type
// directories it creates under "O"; obd_mode_to_type() indexes it directly.
// The table has S_IFMT >> S_SHIFT entries.
// NOTE(review): the initializers use the obsolete GNU "[index] value" form
// (no '='); the C99 spelling is "[index] = value". One initializer line and
// the closing brace are not visible in this view.
63 static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
65 [S_IFREG >> S_SHIFT] "R",
66 [S_IFDIR >> S_SHIFT] "D",
67 [S_IFCHR >> S_SHIFT] "C",
68 [S_IFBLK >> S_SHIFT] "B",
69 [S_IFIFO >> S_SHIFT] "F",
70 [S_IFSOCK >> S_SHIFT] "S",
71 [S_IFLNK >> S_SHIFT] "L"
74 static inline const char *obd_mode_to_type(int mode)
76 return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
79 static void filter_ffd_addref(void *ffdp)
81 struct filter_file_data *ffd = ffdp;
83 atomic_inc(&ffd->ffd_refcount);
84 CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
85 atomic_read(&ffd->ffd_refcount));
// Allocate a filter_file_data, set its refcount to 2, initialise its handle
// list link and register the handle with class_handle_hash(), passing
// filter_ffd_addref as the addref callback.
// NOTE(review): the allocation-failure branch head, its return and the final
// return statement are on lines not visible in this view; the
// "out of memory" CERROR presumably runs only when OBD_ALLOC leaves ffd NULL.
88 static struct filter_file_data *filter_ffd_new(void)
90 struct filter_file_data *ffd;
92 OBD_ALLOC(ffd, sizeof *ffd);
94 CERROR("out of memory\n");
98 atomic_set(&ffd->ffd_refcount, 2);
100 INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
101 class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
// Resolve a lustre_handle to its filter_file_data by passing handle->cookie
// to class_handle2object(). Asserts that handle is non-NULL and that the
// ffd's file points back at the ffd through private_data.
// NOTE(review): whether the second LASSERT is guarded by a NULL check on ffd,
// and the return statement, are on lines not visible in this view.
106 static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
108 struct filter_file_data *ffd = NULL;
110 LASSERT(handle != NULL);
111 ffd = class_handle2object(handle->cookie);
113 LASSERT(ffd->ffd_file->private_data == ffd);
117 static void filter_ffd_put(struct filter_file_data *ffd)
119 CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
120 atomic_read(&ffd->ffd_refcount) - 1);
121 LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
122 atomic_read(&ffd->ffd_refcount) < 0x5a5a);
123 if (atomic_dec_and_test(&ffd->ffd_refcount)) {
124 LASSERT(list_empty(&ffd->ffd_handle.h_link));
125 OBD_FREE(ffd, sizeof *ffd);
// Remove the ffd's handle from the handle hash via class_handle_unhash().
// NOTE(review): at least one further body line is not visible in this view,
// so any additional step (e.g. dropping a reference) cannot be confirmed here.
129 static void filter_ffd_destroy(struct filter_file_data *ffd)
131 class_handle_unhash(&ffd->ffd_handle);
135 static void filter_commit_cb(struct obd_device *obd, __u64 transno, int error)
137 obd_transno_commit_cb(obd, transno, error);
// Record the outcome of a transaction for an export: read fsd_last_rcvd,
// store value + 1 back (under fo_translock), publish the value read in
// oti->oti_transno and in the client's fcd, then write the fcd to the
// last_rcvd file at the export's slot offset (fed_lr_off).
//   export - export whose filter_export_data / client data is updated
//   handle - transaction handle passed on to fsfilt_set_last_rcvd()
//   oti    - transaction info; receives oti_transno, supplies oti_xid
//   rc     - incoming result code (see "Propagate error code" below)
// NOTE(review): local declarations, several branch heads and the return
// statements are on lines not visible in this view, so the exact conditions
// guarding the statements below cannot be confirmed from here.
139 /* Assumes caller has already pushed us into the kernel context. */
140 int filter_finish_transno(struct obd_export *export, void *handle,
141 struct obd_trans_info *oti, int rc)
144 struct obd_device *obd = export->exp_obd;
145 struct filter_obd *filter = &obd->u.filter;
146 struct filter_export_data *fed = &export->exp_filter_data;
147 struct filter_client_data *fcd = fed->fed_fcd;
151 /* Propagate error code. */
155 if (!obd->obd_replayable)
158 /* we don't allocate new transnos for replayed requests */
160 /* perhaps if transno already set? or should level be in oti? */
// NOTE(review): "req" is not declared on any visible line; this test may sit
// inside a disabled preprocessor region - confirm.
161 if (req->rq_level == LUSTRE_CONN_RECOVD)
165 off = fed->fed_lr_off;
// fo_translock covers the read-modify-write of fsd_last_rcvd.
167 spin_lock(&filter->fo_translock);
168 last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
169 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
170 spin_unlock(&filter->fo_translock);
172 oti->oti_transno = last_rcvd;
173 fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
174 fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
176 /* get this from oti */
179 fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
182 fcd->fcd_last_xid = 0;
184 fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
185 written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
187 CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
188 LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
// A short write (anything other than sizeof(*fcd)) is reported as an error.
190 if (written == sizeof(*fcd))
192 CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
// Debug-logging wrapper around releasing a dentry reference: logs the
// dentry name and the count it will have after the put, and asserts the
// current d_count is positive.
// NOTE(review): the statement that actually drops the reference is on a
// line not visible in this view.
199 static inline void f_dput(struct dentry *dentry)
201 /* Can't go inside filter_ddelete because it can block */
202 CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
203 dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
204 LASSERT(atomic_read(&dentry->d_count) > 0);
209 /* Not racy w.r.t. others, because we are the only user of this dentry */
210 static void filter_drelease(struct dentry *dentry)
212 if (dentry->d_fsdata)
213 OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
// Dentry operations installed on object dentries by filter_obj_open()
// (dchild->d_op = &filter_dops); only d_release is set on the visible lines,
// pointing at filter_drelease.
// NOTE(review): the end of the initializer is not visible in this view.
216 struct dentry_operations filter_dops = {
217 .d_release = filter_drelease,
220 #define LAST_RCVD "last_rcvd"
/*
 * Client slot bitmap sizing.
 *
 * FILTER_LR_MAX_CLIENTS is the number of client slots (bits) tracked for the
 * last_rcvd file.  The limit is arbitrary, but it is chosen so that the
 * bitmap fits in exactly one page: PAGE_SIZE bytes * 8 bits per byte
 * (32k clients with 4k pages).
 *
 * FILTER_LR_MAX_CLIENT_WORDS is the number of unsigned longs needed to hold
 * that many bits.  It must divide by the number of BITS in a long, not the
 * number of bytes; dividing by sizeof(unsigned long) alone sized the bitmap
 * at eight pages instead of one.
 */
#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
#define FILTER_LR_MAX_CLIENT_WORDS \
        (FILTER_LR_MAX_CLIENTS / (8 * sizeof(unsigned long)))
227 /* Add client data to the FILTER. We use a bitmap to locate a free space
228 * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
229 * Otherwise, we have just read the data from the last_rcvd file and
230 * we know its offset.
// Parameters:
//   obd    - device, passed to fsfilt_start() for the slot write
//   filter - owns the slot bitmap (fo_last_rcvd_slots), the server data
//            header (fo_fsd) and the last_rcvd file (fo_rcvd_filp)
//   fed    - export data; fed_lr_idx and fed_lr_off are filled in here
//   cl_idx - slot index, or -1 to take the first free slot in the bitmap
// NOTE(review): branch heads, return statements, some declarations and the
// terminators of the multi-line comments are on lines not visible in this
// view; the control flow between the statements below is therefore partial.
232 int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
233 struct filter_export_data *fed, int cl_idx)
235 unsigned long *bitmap = filter->fo_last_rcvd_slots;
236 int new_client = (cl_idx == -1);
238 LASSERT(bitmap != NULL);
240 /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
241 if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
244 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
245 * there's no need for extra complication here
248 cl_idx = find_first_zero_bit(bitmap, FILTER_LR_MAX_CLIENTS);
250 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
251 CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
254 if (test_and_set_bit(cl_idx, bitmap)) {
255 CERROR("FILTER client %d: found bit is set in bitmap\n",
257 cl_idx = find_next_zero_bit(bitmap,
258 FILTER_LR_MAX_CLIENTS,
263 if (test_and_set_bit(cl_idx, bitmap)) {
264 CERROR("FILTER client %d: bit already set in bitmap!\n",
// Slot offset in last_rcvd = fsd_client_start + cl_idx * fsd_client_size.
270 fed->fed_lr_idx = cl_idx;
271 fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
272 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
274 CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
275 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
278 struct obd_run_ctxt saved;
279 loff_t off = fed->fed_lr_off;
283 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
284 fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
// The client record is written inside a transaction, in the filter's
// filesystem context (push_ctxt/pop_ctxt).
286 push_ctxt(&saved, &filter->fo_ctxt, NULL);
287 /* Transaction needed to fix bug 1403 */
288 handle = fsfilt_start(obd,
289 filter->fo_rcvd_filp->f_dentry->d_inode,
291 if (IS_ERR(handle)) {
292 written = PTR_ERR(handle);
293 CERROR("unable to start transaction: rc %d\n",
296 written = lustre_fwrite(filter->fo_rcvd_filp,
297 (char *)fed->fed_fcd,
298 sizeof(*fed->fed_fcd), &off);
300 filter->fo_rcvd_filp->f_dentry->d_inode,
303 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
// Anything other than a full-size write is treated as a failure.
305 if (written != sizeof(*fed->fed_fcd)) {
// Release an export's client slot: clear its bit in fo_last_rcvd_slots,
// overwrite its record in the last_rcvd file with zeroes (followed by an
// fsync), and free fed->fed_fcd.
//   exp      - export whose slot is released
//   failover - flag; the lines that consume it are not visible in this view
// NOTE(review): branch heads and return statements are on lines not visible
// here. In particular fed_fcd is freed on a visible line before later lines
// dereference it; presumably that first free sits on an early-exit path -
// confirm against the full source.
314 int filter_client_free(struct obd_export *exp, int failover)
316 struct filter_export_data *fed = &exp->exp_filter_data;
317 struct filter_obd *filter = &exp->exp_obd->u.filter;
318 struct filter_client_data zero_fcd;
319 struct obd_run_ctxt saved;
327 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
331 LASSERT(filter->fo_last_rcvd_slots != NULL);
333 off = fed->fed_lr_off;
335 CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
336 fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
338 if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
339 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
// Zero the on-disk record for this slot, in the filter's fs context.
344 memset(&zero_fcd, 0, sizeof zero_fcd);
345 push_ctxt(&saved, &filter->fo_ctxt, NULL);
346 written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
347 sizeof(zero_fcd), &off);
349 /* XXX: this write gets lost sometimes, unless this sync is here. */
351 file_fsync(filter->fo_rcvd_filp,
352 filter->fo_rcvd_filp->f_dentry, 1);
353 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
355 if (written != sizeof(zero_fcd)) {
356 CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
357 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
361 "zeroed disconnecting client %s at idx %u (%llu)\n",
362 fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
365 OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
// Free the in-memory server data header (fo_fsd) and the client slot bitmap
// (fo_last_rcvd_slots), clearing both pointers. The bitmap size matches the
// allocation in filter_init_server_data().
// NOTE(review): the return statement is on a line not visible in this view.
370 static int filter_free_server_data(struct filter_obd *filter)
372 OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
373 filter->fo_fsd = NULL;
374 OBD_FREE(filter->fo_last_rcvd_slots,
375 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
376 filter->fo_last_rcvd_slots = NULL;
// Log the server data header fields and write the whole header to filp
// with lustre_fwrite(); a write of any size other than sizeof(*fsd) is
// logged as an error.
// NOTE(review): the declarations of rc and off (and off's starting value),
// plus the return statements, are on lines not visible in this view.
381 /* assumes caller is already in kernel ctxt */
382 static int filter_update_server_data(struct file *filp,
383 struct filter_server_data *fsd)
388 CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
389 CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
390 le64_to_cpu(fsd->fsd_last_objid));
391 CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
392 le64_to_cpu(fsd->fsd_last_rcvd));
393 CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
394 le64_to_cpu(fsd->fsd_mount_count));
396 rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
397 if (rc != sizeof(*fsd)) {
398 CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
// Load the filter_server_data header from the last_rcvd file - or
// initialise a fresh one when the file is empty - then walk the per-client
// records, set up exports for clients that may need recovery, track the
// highest last_rcvd seen, bump the mount count and write the header back.
//   obd            - device being set up
//   filp           - open last_rcvd file
//   init_lastobjid - initial fsd_last_objid used when the file is empty
// On the err_fsd path the server data is released with
// filter_free_server_data().
// NOTE(review): many lines (declarations, branch heads, else/continue/break,
// labels, returns, comment terminators) are not visible in this view, so the
// nesting of the statements below is only partly recoverable from here.
405 /* assumes caller is already in kernel ctxt */
406 static int filter_init_server_data(struct obd_device *obd, struct file * filp,
407 __u64 init_lastobjid)
409 struct filter_obd *filter = &obd->u.filter;
410 struct filter_server_data *fsd;
411 struct filter_client_data *fcd = NULL;
412 struct inode *inode = filp->f_dentry->d_inode;
413 unsigned long last_rcvd_size = inode->i_size;
414 __u64 mount_count = 0;
419 /* ensure padding in the struct is the correct size */
420 LASSERT (offsetof(struct filter_server_data, fsd_padding) +
421 sizeof(fsd->fsd_padding) == FILTER_LR_SERVER_SIZE);
422 LASSERT (offsetof(struct filter_client_data, fcd_padding) +
423 sizeof(fcd->fcd_padding) == FILTER_LR_CLIENT_SIZE);
425 OBD_ALLOC(fsd, sizeof(*fsd));
428 filter->fo_fsd = fsd;
430 OBD_ALLOC(filter->fo_last_rcvd_slots,
431 FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
432 if (filter->fo_last_rcvd_slots == NULL) {
433 OBD_FREE(fsd, sizeof(*fsd));
// Empty file: build a new header from compile-time defaults.
437 if (last_rcvd_size == 0) {
438 CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
440 memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
441 fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
442 fsd->fsd_last_rcvd = 0;
443 mount_count = fsd->fsd_mount_count = 0;
444 fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
445 fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
446 fsd->fsd_client_size = cpu_to_le16(FILTER_LR_CLIENT_SIZE);
447 fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
448 filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
// Existing file: read the header and take mount/subdir counts from it.
450 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
452 if (retval != sizeof(*fsd)) {
453 CDEBUG(D_INODE,"OBD filter: error reading %s\n",
455 GOTO(err_fsd, rc = -EIO);
457 mount_count = le64_to_cpu(fsd->fsd_mount_count);
458 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
// Any incompat or rocompat feature bit causes setup to fail with -EINVAL.
461 if (fsd->fsd_feature_incompat) {
462 CERROR("unsupported feature %x\n",
463 le32_to_cpu(fsd->fsd_feature_incompat));
464 GOTO(err_fsd, rc = -EINVAL);
466 if (fsd->fsd_feature_rocompat) {
467 CERROR("read-only feature %x\n",
468 le32_to_cpu(fsd->fsd_feature_rocompat));
469 /* Do something like remount filesystem read-only */
470 GOTO(err_fsd, rc = -EINVAL);
473 CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
474 obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
475 CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
476 obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
477 CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
478 obd->obd_name, mount_count);
479 CDEBUG(D_INODE, "%s: server data size: %u\n",
480 obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
481 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
482 obd->obd_name, le32_to_cpu(fsd->fsd_client_start));
// NOTE(review): fsd_client_size is stored with cpu_to_le16() above and read
// with le16_to_cpu() in the client loop; le32_to_cpu() in this debug print
// looks inconsistent with that 16-bit usage - confirm the field width.
483 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
484 obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
485 CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
486 obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
489 * When we do a clean FILTER shutdown, we save the last_rcvd into
490 * the header. If we find clients with higher last_rcvd values
491 * then those clients may need recovery done.
493 if (!obd->obd_replayable) {
494 CERROR("%s: recovery support OFF\n", obd->obd_name);
// Scan client records until the computed offset reaches the file size.
498 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
503 OBD_ALLOC(fcd, sizeof(*fcd));
505 GOTO(err_fsd, rc = -ENOMEM);
508 /* Don't assume off is incremented properly, in case
509 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
511 off = le32_to_cpu(fsd->fsd_client_start) +
512 cl_idx * le16_to_cpu(fsd->fsd_client_size);
513 rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
514 if (rc != sizeof(*fcd)) {
515 CERROR("error reading FILTER %s offset %d: rc = %d\n",
516 LAST_RCVD, cl_idx, rc);
517 if (rc > 0) /* XXX fatal error or just abort reading? */
// A record whose UUID starts with NUL is an unused (zeroed) slot.
522 if (fcd->fcd_uuid[0] == '\0') {
523 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
528 last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
530 /* These exports are cleaned up by filter_disconnect(), so they
531 * need to be set up like real exports as filter_connect() does.
// mount_age = number of mounts since this client's record was last written;
// clients below FILTER_MOUNT_RECOV get an export created for recovery.
533 mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
534 if (mount_age < FILTER_MOUNT_RECOV) {
535 struct obd_export *exp = class_new_export(obd);
536 struct filter_export_data *fed;
537 CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
538 " srv lr: "LPU64" mnt: "LPU64" last mount: "
539 LPU64"\n", fcd->fcd_uuid, cl_idx,
540 last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
541 le64_to_cpu(fcd->fcd_mount_count), mount_count);
543 /* XXX this rc is ignored */
547 memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
548 sizeof exp->exp_client_uuid.uuid);
549 fed = &exp->exp_filter_data;
551 filter_client_add(obd, filter, fed, cl_idx);
552 /* create helper if export init gets more complex */
553 INIT_LIST_HEAD(&fed->fed_open_head);
554 spin_lock_init(&fed->fed_lock);
557 obd->obd_recoverable_clients++;
558 class_export_put(exp);
561 "discarded client %d UUID '%s' count "LPU64"\n",
562 cl_idx, fcd->fcd_uuid,
563 le64_to_cpu(fcd->fcd_mount_count));
566 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
// Keep the header's last_rcvd at the maximum seen across all clients.
569 if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
570 filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
572 obd->obd_last_committed =
573 le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
574 if (obd->obd_recoverable_clients) {
575 CERROR("RECOVERY: %d recoverable clients, last_rcvd "
576 LPU64"\n", obd->obd_recoverable_clients,
577 le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
578 obd->obd_next_recovery_transno =
579 obd->obd_last_committed + 1;
580 obd->obd_recovering = 1;
586 OBD_FREE(fcd, sizeof(*fcd));
589 fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
591 /* save it, so mount count and last_rcvd are current */
592 rc = filter_update_server_data(filp, filter->fo_fsd);
597 filter_free_server_data(filter);
// Runs in the filter's filesystem context (push_ctxt/pop_ctxt). Steps, in
// order: get/create the "O" directory; get/create one directory under it for
// each named entry of obd_type_by_mode[]; open/create the last_rcvd file and
// check it is a regular file; enable data journalling on it; save the file,
// inode and address-space operation tables; load the server data with
// filter_init_server_data(); and, when fo_subdir_count is non-zero,
// get/create the "d<N>" directories under O/R.
// The tail of the function is the error unwinding: it releases subdirectory
// dentries, disconnects exports, closes the file and drops the mode and "O"
// dentries.
// NOTE(review): declarations, labels, loop heads in the unwind code and the
// return statements are on lines not visible in this view.
601 /* setup the object store with correct subdirectories */
602 static int filter_prep(struct obd_device *obd)
604 struct obd_run_ctxt saved;
605 struct filter_obd *filter = &obd->u.filter;
606 struct dentry *dentry, *O_dentry;
613 push_ctxt(&saved, &filter->fo_ctxt, NULL);
614 dentry = simple_mkdir(current->fs->pwd, "O", 0700);
615 CDEBUG(D_INODE, "got/created O: %p\n", dentry);
616 if (IS_ERR(dentry)) {
617 rc = PTR_ERR(dentry);
618 CERROR("cannot open/create O: rc = %d\n", rc);
621 filter->fo_dentry_O = dentry;
624 * Create directories and/or get dentries for each object type.
625 * This saves us from having to do multiple lookups for each one.
627 O_dentry = filter->fo_dentry_O;
628 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
629 char *name = obd_type_by_mode[mode];
632 filter->fo_dentry_O_mode[mode] = NULL;
635 dentry = simple_mkdir(O_dentry, name, 0700);
636 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
637 if (IS_ERR(dentry)) {
638 rc = PTR_ERR(dentry);
639 CERROR("cannot create O/%s: rc = %d\n", name, rc);
640 GOTO(err_O_mode, rc);
642 filter->fo_dentry_O_mode[mode] = dentry;
645 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
646 if (!file || IS_ERR(file)) {
648 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
650 GOTO(err_O_mode, rc);
653 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
654 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
655 file->f_dentry->d_inode->i_mode);
656 GOTO(err_filp, rc = -ENOENT);
659 rc = fsfilt_journal_data(obd, file);
661 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
664 /* steal operations */
665 inode = file->f_dentry->d_inode;
666 filter->fo_fop = file->f_op;
667 filter->fo_iop = inode->i_op;
668 filter->fo_aops = inode->i_mapping->a_ops;
670 rc = filter_init_server_data(obd, file, INIT_OBJID);
672 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
673 GOTO(err_client, rc);
675 filter->fo_rcvd_filp = file;
// Subdirectory dentries hang off the regular-file ("R") directory.
// sizeof(dentry) is the size of one dentry pointer (dentry is declared as
// struct dentry * above), i.e. the array holds fo_subdir_count pointers.
677 if (filter->fo_subdir_count) {
678 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
679 OBD_ALLOC(filter->fo_dentry_O_sub,
680 filter->fo_subdir_count * sizeof(dentry));
681 if (!filter->fo_dentry_O_sub)
682 GOTO(err_client, rc = -ENOMEM);
684 for (i = 0; i < filter->fo_subdir_count; i++) {
686 snprintf(dir, sizeof(dir), "d%u", i);
688 dentry = simple_mkdir(O_dentry, dir, 0700);
689 CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
690 if (IS_ERR(dentry)) {
691 rc = PTR_ERR(dentry);
692 CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
695 filter->fo_dentry_O_sub[i] = dentry;
700 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
// Error unwinding starts here (labels not visible in this view).
706 struct dentry *dentry = filter->fo_dentry_O_sub[i];
709 filter->fo_dentry_O_sub[i] = NULL;
712 OBD_FREE(filter->fo_dentry_O_sub,
713 filter->fo_subdir_count * sizeof(dentry));
715 class_disconnect_exports(obd, 0);
717 if (filp_close(file, 0))
718 CERROR("can't close %s after error\n", LAST_RCVD);
719 filter->fo_rcvd_filp = NULL;
722 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
725 filter->fo_dentry_O_mode[mode] = NULL;
728 f_dput(filter->fo_dentry_O);
729 filter->fo_dentry_O = NULL;
// Teardown counterpart of filter_prep(), run in the filter's fs context:
// write the server data header back to last_rcvd, fsync and close that file,
// clear the subdirectory and per-mode dentry slots, free the subdirectory
// array, put the "O" dentry and free the in-memory server data.
// NOTE(review): declarations, some branch heads and the per-dentry release
// calls are on lines not visible in this view. Also note that
// filter_update_server_data() is handed fo_rcvd_filp before the visible
// NULL test on fo_rcvd_filp below - confirm whether it can be NULL here.
733 /* cleanup the filter: write last used object id to status file */
734 static void filter_post(struct obd_device *obd)
736 struct obd_run_ctxt saved;
737 struct filter_obd *filter = &obd->u.filter;
741 /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
742 * best to start a transaction with h_sync, because we removed this
745 push_ctxt(&saved, &filter->fo_ctxt, NULL);
746 rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
748 CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
751 if (filter->fo_rcvd_filp) {
752 rc = file_fsync(filter->fo_rcvd_filp,
753 filter->fo_rcvd_filp->f_dentry, 1);
754 filp_close(filter->fo_rcvd_filp, 0);
755 filter->fo_rcvd_filp = NULL;
757 CERROR("last_rcvd file won't closed rc = %ld\n", rc);
760 if (filter->fo_subdir_count) {
762 for (i = 0; i < filter->fo_subdir_count; i++) {
763 struct dentry *dentry = filter->fo_dentry_O_sub[i];
765 filter->fo_dentry_O_sub[i] = NULL;
767 OBD_FREE(filter->fo_dentry_O_sub,
768 filter->fo_subdir_count *
769 sizeof(*filter->fo_dentry_O_sub));
771 for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
772 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
775 filter->fo_dentry_O_mode[mode] = NULL;
778 f_dput(filter->fo_dentry_O);
779 filter_free_server_data(filter);
780 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
// Advance the object id counter: under fo_objidlock, read fsd_last_objid
// (little-endian on disk) and store value + 1 back. Requires fo_fsd to be
// set.
// NOTE(review): the declaration of id and the return statement are on lines
// not visible in this view, so which value is returned cannot be confirmed.
784 static __u64 filter_next_id(struct filter_obd *filter)
787 LASSERT(filter->fo_fsd != NULL);
789 spin_lock(&filter->fo_objidlock);
790 id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
791 filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
792 spin_unlock(&filter->fo_objidlock);
// Blocking AST installed by filter_lock_dentry(). For LDLM_CB_CANCELING
// nothing is done. Otherwise, under the namespace lock, the lock is marked
// LDLM_FL_CBPENDING (after re-checking that this function is still its
// blocking AST) and, if it has no readers or writers, it is cancelled via
// ldlm_cli_cancel().
//   lock - lock being called back; desc, data - unused on the visible lines
//   flag - callback reason
// NOTE(review): declarations, branch heads, comment terminators and return
// statements are on lines not visible in this view.
797 /* direct cut-n-paste of mds_blocking_ast() */
798 int filter_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
799 void *data, int flag)
804 if (flag == LDLM_CB_CANCELING) {
805 /* Don't need to do anything here. */
809 /* XXX layering violation! -phil */
810 l_lock(&lock->l_resource->lr_namespace->ns_lock);
811 /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
812 * such that mds_blocking_ast is called just before l_i_p takes the
813 * ns_lock, then by the time we get the lock, we might not be the
814 * correct blocking function anymore. So check, and return early, if
816 if (lock->l_blocking_ast != filter_blocking_ast) {
817 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
821 lock->l_flags |= LDLM_FL_CBPENDING;
822 do_ast = (!lock->l_readers && !lock->l_writers);
823 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
826 struct lustre_handle lockh;
829 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
830 ldlm_lock2handle(lock, &lockh);
831 rc = ldlm_cli_cancel(&lockh);
833 CERROR("ldlm_cli_cancel: %d\n", rc);
835 LDLM_DEBUG(lock, "Lock still has references, will be "
// Enqueue an LDLM_PLAIN lock in the device namespace on the resource named
// by the dentry inode's {i_ino, i_generation}, in mode lock_mode, with
// filter_blocking_ast as the blocking callback. The lock handle is returned
// through lockh.
// Returns 0 when the enqueue reports ELDLM_OK, otherwise -ENOLCK.
// NOTE(review): the declarations of rc and flags are on lines not visible
// in this view.
841 static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
842 ldlm_mode_t lock_mode,struct lustre_handle *lockh)
844 struct ldlm_res_id res_id = { .name = {0} };
848 res_id.name[0] = de->d_inode->i_ino;
849 res_id.name[1] = de->d_inode->i_generation;
850 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
851 res_id, LDLM_PLAIN, NULL, 0, lock_mode,
852 &flags, ldlm_completion_ast,
853 filter_blocking_ast, NULL, lockh);
855 RETURN(rc == ELDLM_OK ? 0 : -ENOLCK); /* XXX translate ldlm code */
858 static void filter_parent_unlock(struct dentry *dparent,
859 struct lustre_handle *lockh,
860 ldlm_mode_t lock_mode)
862 ldlm_lock_decref(lockh, lock_mode);
865 /* We never dget the object parent, so DON'T dput it either */
866 static inline struct dentry *filter_parent(struct obd_device *obd,
867 obd_mode mode, obd_id objid)
869 struct filter_obd *filter = &obd->u.filter;
871 LASSERT(S_ISREG(mode)); /* only regular files for now */
872 if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
873 return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
875 return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
// Find the parent directory for (mode, objid) with filter_parent() and lock
// it with filter_lock_dentry() in lock_mode, returning the handle in lockh.
// Logs an error if taking the lock took longer than 15 seconds.
// Returns the parent dentry, or ERR_PTR(rc) when locking failed.
// NOTE(review): several lines between the declarations and the lock call
// (including the declaration of rc) are not visible in this view.
878 /* We never dget the object parent, so DON'T dput it either */
879 static inline struct dentry *filter_parent_lock(struct obd_device *obd,
880 obd_mode mode, obd_id objid,
881 ldlm_mode_t lock_mode,
882 struct lustre_handle *lockh)
884 unsigned long now = jiffies;
885 struct dentry *de = filter_parent(obd, mode, objid);
891 rc = filter_lock_dentry(obd, de, lock_mode, lockh);
892 if (time_after(jiffies, now + 15*HZ))
893 CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
894 return rc ? ERR_PTR(rc) : de;
897 /* How to get files, dentries, inodes from object id's.
899 * If dir_dentry is passed, the caller has already locked the parent
900 * appropriately for this operation (normally a write lock). If
901 * dir_dentry is NULL, we do a read lock while we do the lookup to
902 * avoid races with create/destroy and such changing the directory
903 * internal to the filesystem code.
// Looks up the child dentry named after the decimal object id inside the
// parent directory. Returns ERR_PTR(-ENXIO) when the superblock is not set
// up and ERR_PTR(-ESTALE) on the "invalid object id 0" path; a lookup error
// is logged.
// NOTE(review): declarations (including the "name" buffer), branch heads and
// the final return are on lines not visible in this view. The id is
// formatted with an unbounded sprintf() here, whereas filter_obj_open() uses
// snprintf(name, sizeof(name), ...) for the same purpose - confirm the
// buffer is large enough for a 64-bit decimal value.
905 static struct dentry *filter_fid2dentry(struct obd_device *obd,
906 struct dentry *dir_dentry,
907 obd_mode mode, obd_id id)
909 struct super_block *sb = obd->u.filter.fo_sb;
910 struct lustre_handle lockh;
911 struct dentry *dparent = dir_dentry;
912 struct dentry *dchild;
917 if (!sb || !sb->s_dev) {
918 CERROR("device not initialized.\n");
919 RETURN(ERR_PTR(-ENXIO));
923 CERROR("fatal: invalid object id 0\n");
925 RETURN(ERR_PTR(-ESTALE));
928 len = sprintf(name, LPU64, id);
930 dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
934 CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
935 dparent->d_name.len, dparent->d_name.name, name);
936 dchild = ll_lookup_one_len(name, dparent, len);
938 filter_parent_unlock(dparent, &lockh, LCK_PR);
939 if (IS_ERR(dchild)) {
940 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
944 CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
945 name, dchild, atomic_read(&dchild->d_count));
947 LASSERT(atomic_read(&dchild->d_count) > 0);
// Open the object with the given id and type on behalf of an export.
// In the filter's fs context it: validates the superblock, id and type;
// allocates a filter_file_data (ffd) and a spare filter_dentry_data (fdd);
// locks the parent directory in parent_mode (handle returned through
// parent_lockh); looks up the child by its decimal id; opens it O_RDWR |
// O_LARGEFILE; attaches or reuses the per-dentry fdd under fo_fddlock;
// links file and ffd together; installs filter_dops on the dentry; and adds
// the ffd to the export's open list.
// Failures jump to the "cleanup" switch at the end, which unwinds according
// to cleanup_phase. The result is a struct file pointer or an ERR_PTR value.
// NOTE(review): declarations (file, name), branch heads, cleanup_phase
// updates, case labels and the return are on lines not visible in this view.
952 static struct file *filter_obj_open(struct obd_export *export,
953 __u64 id, __u32 type,
954 ldlm_mode_t parent_mode,
955 struct lustre_handle *parent_lockh)
957 struct obd_device *obd = export->exp_obd;
958 struct filter_obd *filter = &obd->u.filter;
959 struct super_block *sb = filter->fo_sb;
960 struct dentry *dchild = NULL, *dparent = NULL;
961 struct filter_export_data *fed = &export->exp_filter_data;
962 struct filter_dentry_data *fdd = NULL;
963 struct filter_file_data *ffd = NULL;
964 struct obd_run_ctxt saved;
967 int len, cleanup_phase = 0;
970 push_ctxt(&saved, &filter->fo_ctxt, NULL);
972 if (!sb || !sb->s_dev) {
973 CERROR("fatal: device not initialized.\n");
974 GOTO(cleanup, file = ERR_PTR(-ENXIO));
978 CERROR("fatal: invalid obdo "LPU64"\n", id);
979 GOTO(cleanup, file = ERR_PTR(-ESTALE));
982 if (!(type & S_IFMT)) {
983 CERROR("OBD %s, object "LPU64" has bad type: %o\n",
984 __FUNCTION__, id, type);
985 GOTO(cleanup, file = ERR_PTR(-EINVAL));
988 ffd = filter_ffd_new();
990 CERROR("obdfilter: out of memory\n");
991 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
996 /* We preallocate this to avoid blocking while holding fo_fddlock */
997 OBD_ALLOC(fdd, sizeof *fdd);
999 CERROR("obdfilter: out of memory\n");
1000 GOTO(cleanup, file = ERR_PTR(-ENOMEM));
1005 dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
1006 if (IS_ERR(dparent))
1007 GOTO(cleanup, file = (void *)dparent);
1011 len = snprintf(name, sizeof(name), LPU64, id);
1012 dchild = ll_lookup_one_len(name, dparent, len);
1014 GOTO(cleanup, file = (void *)dchild);
// A negative dentry (no inode) means the object does not exist.
1018 if (dchild->d_inode == NULL) {
1019 CERROR("opening non-existent object %s - O_CREAT?\n", name);
1020 file = ERR_PTR(-ENOENT);
1021 GOTO(cleanup, file);
1024 /* dentry_open does a dput(dchild) and mntput(mnt) on error */
1025 mntget(filter->fo_vfsmnt);
1026 file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
1028 dchild = NULL; /* prevent a double dput in step 4 */
1029 CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
1030 GOTO(cleanup, file);
// If the dentry already carries an fdd, the preallocated one is freed and
// the existing one's open count is bumped; otherwise the preallocated fdd
// is initialised and attached.
1033 spin_lock(&filter->fo_fddlock);
1034 if (dchild->d_fsdata) {
1035 spin_unlock(&filter->fo_fddlock);
1036 OBD_FREE(fdd, sizeof *fdd);
1037 fdd = dchild->d_fsdata;
1038 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1039 /* should only happen during client recovery */
1040 if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
1041 CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
1042 atomic_inc(&fdd->fdd_open_count);
1044 atomic_set(&fdd->fdd_open_count, 1);
1045 fdd->fdd_magic = FILTER_DENTRY_MAGIC;
1047 fdd->fdd_objid = id;
1048 /* If this is racy, then we can use {cmp}xchg and atomic_add */
1049 dchild->d_fsdata = fdd;
1050 spin_unlock(&filter->fo_fddlock);
1053 ffd->ffd_file = file;
1054 LASSERT(file->private_data == NULL);
1055 file->private_data = ffd;
1058 dchild->d_op = &filter_dops;
1060 LASSERT(dchild->d_op == &filter_dops);
1062 spin_lock(&fed->fed_lock);
1063 list_add(&ffd->ffd_export_list, &fed->fed_open_head);
1064 spin_unlock(&fed->fed_lock);
1066 CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
// Unwind by phase (case labels not visible in this view).
1068 switch (cleanup_phase) {
1074 filter_parent_unlock(dparent, parent_lockh,parent_mode);
1077 OBD_FREE(fdd, sizeof *fdd);
1080 filter_ffd_destroy(ffd);
1081 filter_ffd_put(ffd);
1083 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
// Unlink the object dchild from its parent directory with vfs_unlink().
// An inode whose link count or reference count is not exactly 1 is logged
// before the unlink; an unlink failure is logged with its rc.
// NOTE(review): the declaration of rc, the branch head guarding the second
// CERROR, the return and the terminator of the comment below are on lines
// not visible in this view.
1088 /* Caller must hold LCK_PW on parent and push us into kernel context.
1089 * Caller is also required to ensure that dchild->d_inode exists.
1091 static int filter_destroy_internal(struct obd_device *obd,
1092 struct dentry *dparent,
1093 struct dentry *dchild)
1095 struct inode *inode = dchild->d_inode;
1099 if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
1100 CERROR("destroying objid %*s nlink = %d, count = %d\n",
1101 dchild->d_name.len, dchild->d_name.name,
1102 inode->i_nlink, atomic_read(&inode->i_count));
1105 rc = vfs_unlink(dparent->d_inode, dchild);
1108 CERROR("error unlinking objid %*s: rc %d\n",
1109 dchild->d_name.len, dchild->d_name.name, rc);
// Close the file behind an ffd. After filp_close(), if this was the last
// open of the dentry (fdd_open_count reaches zero), the object is flagged
// FILTER_FLAG_DESTROY and this is not a failover close, the object is
// unlinked: the parent is locked LCK_PW, a transaction is started,
// filter_destroy_internal() runs, and the transaction is finished and
// committed. On success with a non-NULL oti the parent lock is handed to
// oti->oti_ack_locks[0] instead of being dropped here.
// NOTE(review): part of the signature (the failover parameter), the handle
// declaration, cleanup_phase updates, case labels and the return are on
// lines not visible in this view, as is the terminator of the comment below.
1114 /* If closing because we are failing this device, then
1115 don't do the unlink on close.
1117 static int filter_close_internal(struct obd_export *exp,
1118 struct filter_file_data *ffd,
1119 struct obd_trans_info *oti,
1122 struct obd_device *obd = exp->exp_obd;
1123 struct filter_obd *filter = &obd->u.filter;
1124 struct file *filp = ffd->ffd_file;
1125 struct dentry *dchild = dget(filp->f_dentry);
1126 struct filter_dentry_data *fdd = dchild->d_fsdata;
1127 struct lustre_handle parent_lockh;
1128 int rc, rc2, cleanup_phase = 0;
1129 struct dentry *dparent;
1130 struct obd_run_ctxt saved;
1133 LASSERT(filp->private_data == ffd);
1135 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1137 rc = filp_close(filp, 0);
1139 if (atomic_dec_and_test(&fdd->fdd_open_count) &&
1140 fdd->fdd_flags & FILTER_FLAG_DESTROY && !failover) {
1143 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1146 LASSERT(fdd->fdd_objid > 0);
1147 dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
1148 LCK_PW, &parent_lockh);
1149 if (IS_ERR(dparent))
1150 GOTO(cleanup, rc = PTR_ERR(dparent));
1153 handle = fsfilt_start(obd, dparent->d_inode,
1156 GOTO(cleanup, rc = PTR_ERR(handle));
1158 /* XXX unlink from PENDING directory now too */
1159 rc2 = filter_destroy_internal(obd, dparent, dchild);
1162 rc = filter_finish_transno(exp, handle, oti, rc);
1163 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1165 CERROR("error on commit, err = %d\n", rc2);
// Unwind by phase (case labels not visible in this view).
1172 switch(cleanup_phase) {
1174 if (rc || oti == NULL) {
1175 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1177 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1178 sizeof(parent_lockh));
1179 oti->oti_ack_locks[0].mode = LCK_PW;
1182 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1185 filter_ffd_destroy(ffd);
1188 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
// Shared setup for filter_setup() and filter_san_setup().
// buf is an obd_ioctl_data: ioc_inlbuf1 is passed to do_kern_mount() as the
// device name and ioc_inlbuf2 as the filesystem type (also used to look up
// the fsfilt operations); ioc_inlbuf3, when present, is a one-character
// flag ('f' turns on replayable/sync mode, anything other than 'n' is
// reported as unrecognised); ioc_inlbuf4, when present, must be an absolute
// path and is copied into fo_nspath. The last parameter (its name is on a
// line not visible here; it is referenced as "option") supplies the mount
// options.
// After mounting, the function records the mount in the filter, fills in the
// run context, calls filter_prep(), initialises the filter locks and export
// list, creates the "filter-tgt" LDLM namespace and initialises the LDLM
// callback client.
// NOTE(review): part of the signature, some declarations, branch heads, the
// error labels and the return statements are on lines not visible in this
// view.
1196 /* mount the file system (secretly) */
1197 static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
1200 struct obd_ioctl_data* data = buf;
1201 struct filter_obd *filter = &obd->u.filter;
1203 struct vfsmount *mnt;
1207 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1210 obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1211 if (IS_ERR(obd->obd_fsops))
1212 RETURN(PTR_ERR(obd->obd_fsops));
1214 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
1219 if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1220 if (*data->ioc_inlbuf3 == 'f') {
1221 obd->obd_replayable = 1;
1222 obd_sync_filter = 1;
1223 CERROR("%s: configured for recovery and sync write\n",
1226 if (*data->ioc_inlbuf3 != 'n') {
1227 CERROR("unrecognised flag '%c'\n",
1228 *data->ioc_inlbuf3);
1233 if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
1234 if (*data->ioc_inlbuf4 == '/') {
1235 CERROR("filter namespace mount: %s\n",
1237 filter->fo_nspath = strdup(data->ioc_inlbuf4);
1239 CERROR("namespace mount must be absolute path: '%s'\n",
1244 filter->fo_vfsmnt = mnt;
1245 filter->fo_sb = mnt->mnt_sb;
1246 filter->fo_fstype = mnt->mnt_sb->s_type->name;
1247 CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
// Run context used by push_ctxt()/pop_ctxt() throughout this file: the
// mount, its root dentry and the kernel data segment.
1249 OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
1250 filter->fo_ctxt.pwdmnt = mnt;
1251 filter->fo_ctxt.pwd = mnt->mnt_root;
1252 filter->fo_ctxt.fs = get_ds();
1254 rc = filter_prep(obd);
1256 GOTO(err_mntput, rc);
1258 spin_lock_init(&filter->fo_translock);
1259 spin_lock_init(&filter->fo_fddlock);
1260 spin_lock_init(&filter->fo_objidlock);
1261 INIT_LIST_HEAD(&filter->fo_export_list);
1263 obd->obd_namespace = ldlm_namespace_new("filter-tgt",
1264 LDLM_NAMESPACE_SERVER);
1265 if (!obd->obd_namespace)
1266 GOTO(err_post, rc = -ENOMEM);
1268 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1269 "filter_ldlm_cb_client", &obd->obd_ldlm_client);
1281 fsfilt_put_ops(obd->obd_fsops);
1285 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
1287 struct obd_ioctl_data* data = buf;
1288 char *option = NULL;
1290 if (!strcmp(data->ioc_inlbuf2, "ext3"))
1291 option = "asyncdel";
1293 return filter_common_setup(obd, len, buf, option);
/*
 * SAN variant of the setup method.
 *
 * buf is interpreted as a struct obd_ioctl_data.  A missing ioc_inlbuf2 is
 * detected first.  The mount option is then chosen from the filesystem type
 * name: "extN" gets "data=writeback", "ext3" gets "data=writeback,asyncdel".
 * NOTE(review): LBUG() is presumably the else branch for any other type;
 * the else line is not visible here.  The mount itself is delegated to
 * filter_common_setup().
 */
1296 /* sanobd setup methods - use a specific mount option */
1297 static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
1299 struct obd_ioctl_data* data = buf;
1300 char *option = NULL;
1302 if (!data->ioc_inlbuf2)
1305 /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
1306 if (!strcmp(data->ioc_inlbuf2, "extN"))
1307 option = "data=writeback";
1308 else if (!strcmp(data->ioc_inlbuf2, "ext3"))
1309 option = "data=writeback,asyncdel";
1311 LBUG(); /* just a reminder */
1313 return filter_common_setup(obd, len, buf, option);
/*
 * Shut the filter device down.
 *
 * Visible steps, in order: log a failover shutdown message, disconnect any
 * exports still attached (complaining if some survive that), free the LDLM
 * namespace, shrink the dcache below the superblock root, report a mount
 * count above one as a busy mount point, drop the mount reference, clear
 * fo_sb and release the fsfilt operations.
 * NOTE(review): the conditions guarding the first message and the returns
 * taken on the error branches are on lines not visible here.
 */
1316 static int filter_cleanup(struct obd_device *obd, int force, int failover)
1318 struct super_block *sb;
1322 CERROR("%s: shutting down for failover; client state will"
1323 " be preserved.\n", obd->obd_name);
/* Clients that never disconnected are forced off before tearing down. */
1325 if (!list_empty(&obd->obd_exports)) {
1326 CERROR("%s: still has clients!\n", obd->obd_name);
1327 class_disconnect_exports(obd, failover);
1328 if (!list_empty(&obd->obd_exports)) {
1329 CERROR("still has exports after forced cleanup?\n");
1334 ldlm_namespace_free(obd->obd_namespace);
1336 sb = obd->u.filter.fo_sb;
1342 shrink_dcache_parent(sb->s_root);
/* A count above one means somebody besides this device holds the mount. */
1345 if (atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count) > 1){
1346 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1347 atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count));
1350 mntput(obd->u.filter.fo_vfsmnt);
1351 obd->u.filter.fo_sb = 0;
1352 /* destroy_buffers(obd->u.filter.fo_sb->s_dev);*/
1354 fsfilt_put_ops(obd->obd_fsops);
/*
 * Attach-time initialisation of the procfs interface for the device.
 *
 * Registers the variables obtained from lprocfs_init_vars() through
 * lprocfs_obd_attach(), allocates LPROC_FILTER_LAST statistics slots and
 * initialises the two filter-private counters, read_bytes and write_bytes,
 * both in units of bytes with average/min/max tracking.
 * NOTE(review): the checks of rc after each call are not visible here.
 */
1360 int filter_attach(struct obd_device *obd, obd_count len, void *data)
1362 struct lprocfs_static_vars lvars;
1365 lprocfs_init_vars(&lvars);
1366 rc = lprocfs_obd_attach(obd, lvars.obd_vars);
1370 rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
1374 /* Init obdfilter private stats here */
1375 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
1376 LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
1377 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
1378 LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
/*
 * Counterpart of filter_attach(): frees the statistics of dev and then
 * returns whatever lprocfs_obd_detach() reports.
 */
1382 int filter_detach(struct obd_device *dev)
1384 lprocfs_free_obd_stats(dev);
1385 return lprocfs_obd_detach(dev);
/*
 * Connect a client to the filter device.
 *
 * class_connect() creates the export; its filter-private part gets an empty
 * open-file list and an initialised lock.  For a replayable device a
 * filter_client_data record is allocated, filled with the client UUID and
 * the mount count of the server data, and handed to filter_client_add().
 * The visible error path frees that record and disconnects again.
 * NOTE(review): how fcd is attached to fed, and the exact return values,
 * are on lines not visible here.
 */
1388 /* nearly identical to mds_connect */
1389 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
1390 struct obd_uuid *cluuid)
1392 struct obd_export *exp;
1393 struct filter_export_data *fed;
1394 struct filter_client_data *fcd;
1395 struct filter_obd *filter = &obd->u.filter;
1400 if (!conn || !obd || !cluuid)
1403 rc = class_connect(conn, obd, cluuid);
1406 exp = class_conn2export(conn);
/* NOTE(review): fed points into exp, and the reference obtained above is
 * dropped right away while fed is still used below; presumably the
 * connection itself keeps the export alive.  Confirm. */
1409 fed = &exp->exp_filter_data;
1410 class_export_put(exp);
1412 INIT_LIST_HEAD(&fed->fed_open_head);
1413 spin_lock_init(&fed->fed_lock);
/* Per-client recovery data is only kept for replayable devices. */
1415 if (!obd->obd_replayable)
1418 OBD_ALLOC(fcd, sizeof(*fcd));
1420 CERROR("filter: out of memory for client data\n");
1421 GOTO(out_export, rc = -ENOMEM);
1424 memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
1426 fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
1428 rc = filter_client_add(obd, filter, fed, -1);
1435 OBD_FREE(fcd, sizeof(*fcd));
1437 class_disconnect(conn, 0);
/*
 * Release everything the export still holds.
 *
 * Each file left on the open list of the export is unlinked from the list
 * under fed_lock and then force-closed with filter_close_internal().  The
 * lock is dropped around the close and re-taken before the list is examined
 * again.  For a replayable device the per-client data is freed afterwards.
 */
1442 static void filter_destroy_export(struct obd_export *exp)
1444 struct filter_export_data *fed = &exp->exp_filter_data;
1447 spin_lock(&fed->fed_lock);
1448 while (!list_empty(&fed->fed_open_head)) {
1449 struct filter_file_data *ffd;
1451 ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
1453 list_del(&ffd->ffd_export_list);
/* The close is done without holding the spinlock. */
1454 spin_unlock(&fed->fed_lock);
1456 CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
1457 ffd->ffd_file->f_dentry->d_name.len,
1458 ffd->ffd_file->f_dentry->d_name.name,
1459 ffd, ffd->ffd_handle.h_cookie);
1461 filter_close_internal(exp, ffd, NULL, exp->exp_failover);
1462 spin_lock(&fed->fed_lock);
1464 spin_unlock(&fed->fed_lock);
1466 if (exp->exp_obd->obd_replayable)
1467 filter_client_free(exp, exp->exp_failover);
/*
 * Disconnect a client.
 *
 * Cancels the DLM locks of the export, stores the failover flag in the
 * export under exp_lock (with interrupts saved), calls class_disconnect(),
 * syncs the backing filesystem and finally drops the export reference that
 * class_conn2export() returned.
 * NOTE(review): validation of exp and the return statement are on lines not
 * visible here.
 */
1471 /* also incredibly similar to mds_disconnect */
1472 static int filter_disconnect(struct lustre_handle *conn, int failover)
1474 struct obd_export *exp = class_conn2export(conn);
1476 unsigned long flags;
1480 ldlm_cancel_locks_for_export(exp);
1482 spin_lock_irqsave(&exp->exp_lock, flags);
1483 exp->exp_failover = failover;
1484 spin_unlock_irqrestore(&exp->exp_lock, flags);
1486 rc = class_disconnect(conn, failover);
1488 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
1489 class_export_put(exp);
1490 /* XXX cleanup preallocated inodes */
/*
 * Copy the attributes selected by valid from inode into oa.
 *
 * The file-type bits of oa->o_mode are saved in type before the copy and
 * cleared from o_mode after it.
 * NOTE(review): presumably the saved type is OR-ed back in on a line not
 * visible here.  For character and block devices the device number is
 * fetched and OBD_MD_FLRDEV is added to o_valid.
 */
1494 static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
1496 int type = oa->o_mode & S_IFMT;
1499 CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
1500 inode->i_ino, inode, oa->o_id, valid);
1501 /* Don't copy the inode number in place of the object ID */
1502 obdo_from_inode(oa, inode, valid);
1503 oa->o_mode &= ~S_IFMT;
1506 if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
1507 obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
1509 oa->o_valid |= OBD_MD_FLRDEV;
/*
 * Resolve the object described by oa to a dentry.
 *
 * When oa carries a file handle (OBD_MD_FLHANDLE) the dentry of that open
 * file is used, with an extra reference taken by dget().  Otherwise the
 * object is looked up by id through filter_fid2dentry().  Visible failure
 * results are ERR_PTR(-EINVAL) for a bad connection cookie and
 * ERR_PTR(-ENOENT) when the dentry has no inode.  what only labels the
 * error messages; the filter_oa2dentry() wrapper passes the name of the
 * calling function.
 */
1515 static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
1516 struct obdo *oa, char *what)
1518 struct dentry *dchild = NULL;
1520 if (oa->o_valid & OBD_MD_FLHANDLE) {
1521 struct lustre_handle *ost_handle = obdo_handle(oa);
1522 struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
1525 struct filter_dentry_data *fdd;
1526 dchild = dget(ffd->ffd_file->f_dentry);
1527 fdd = dchild->d_fsdata;
1528 LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
1529 filter_ffd_put(ffd);
1532 "got child objid %*s: %p, count = %d\n",
1533 dchild->d_name.len, dchild->d_name.name,
1534 dchild, atomic_read(&dchild->d_count));
1539 struct obd_device *obd = class_conn2obd(conn);
1542 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1543 RETURN(ERR_PTR(-EINVAL));
1545 dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
1548 if (IS_ERR(dchild)) {
1549 CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
/* A dentry without an inode means the object does not exist on disk. */
1553 if (!dchild->d_inode) {
1554 CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
1556 RETURN(ERR_PTR(-ENOENT));
1562 #define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__)
/*
 * Fetch the attributes of an object.
 *
 * The object is resolved with filter_oa2dentry(); a lookup failure is
 * returned as its PTR_ERR() value.  On success the fields selected by
 * oa->o_valid are copied from the inode into oa by filter_from_inode().
 * md is not used on any visible line.
 */
1564 static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
1565 struct lov_stripe_md *md)
1567 struct dentry *dentry = NULL;
1571 dentry = filter_oa2dentry(conn, oa);
1573 RETURN(PTR_ERR(dentry));
1575 filter_from_inode(oa, dentry->d_inode, oa->o_valid);
/*
 * Apply the attributes in oa to the object it names.
 *
 * The obdo is converted to an iattr whose file type is forced to S_IFREG.
 * For a size change the inode semaphore is taken before the transaction is
 * started, in line with the locking invariant stated at the top of the file.
 * The change is made with fsfilt_setattr(), the transaction number is
 * recorded with filter_finish_transno() and the transaction is committed.
 * After a size change oa is refreshed with the block count, ctime and mtime
 * of the inode.  The export reference is dropped on the way out.
 */
1581 /* this is called from filter_truncate() until we have filter_punch() */
1582 static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
1583 struct lov_stripe_md *md, struct obd_trans_info *oti)
1585 struct obd_run_ctxt saved;
1586 struct obd_export *export = class_conn2export(conn);
1587 struct obd_device *obd = class_conn2obd(conn);
1588 struct filter_obd *filter = &obd->u.filter;
1589 struct dentry *dentry;
1591 struct inode *inode;
1596 dentry = filter_oa2dentry(conn, oa);
1599 GOTO(out_exp, rc = PTR_ERR(dentry));
1601 iattr_from_obdo(&iattr, oa, oa->o_valid);
1602 iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
1603 inode = dentry->d_inode;
1605 push_ctxt(&saved, &filter->fo_ctxt, NULL);
/* Truncation needs i_sem, and it has to be taken before the journal op. */
1607 if (iattr.ia_valid & ATTR_SIZE)
1608 down(&inode->i_sem);
1610 handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
1612 GOTO(out_unlock, rc = PTR_ERR(handle));
1614 rc = fsfilt_setattr(obd, dentry, handle, &iattr, 1);
1615 rc = filter_finish_transno(export, handle, oti, rc);
1616 rc2 = fsfilt_commit(obd, dentry->d_inode, handle, 0);
1618 CERROR("error on commit, err = %d\n", rc2);
/* Report the post-truncate block count and times back to the caller. */
1623 if (iattr.ia_valid & ATTR_SIZE) {
1625 oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
1626 obdo_from_inode(oa, inode, oa->o_valid);
1631 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1635 class_export_put(export);
/*
 * Open the object named by oa->o_id on behalf of a client.
 *
 * The export is resolved from conn (a bad cookie yields -EINVAL), the file
 * is opened with filter_obj_open() holding the parent lock in LCK_PR mode,
 * and oa is filled from the inode.  The cookie of the per-open file data is
 * stored in the handle embedded in oa and OBD_MD_FLHANDLE is set so later
 * calls can find the open file again.  The parent lock handle is copied to
 * oti->oti_ack_locks[0].
 * NOTE(review): the condition under which that copy happens is on a line
 * not visible here.
 */
1639 static int filter_open(struct lustre_handle *conn, struct obdo *oa,
1640 struct lov_stripe_md *ea, struct obd_trans_info *oti,
1641 struct obd_client_handle *och)
1643 struct obd_export *export = NULL;
1644 struct lustre_handle *handle;
1645 struct filter_file_data *ffd;
1647 struct lustre_handle parent_lockh;
1651 export = class_conn2export(conn);
1653 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1655 GOTO(out, rc = -EINVAL);
1658 filp = filter_obj_open(export, oa->o_id, oa->o_mode,
1659 LCK_PR, &parent_lockh);
1661 GOTO(out, rc = PTR_ERR(filp));
1663 filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
/* Hand the open-file cookie back to the client inside the obdo. */
1665 ffd = filp->private_data;
1666 handle = obdo_handle(oa);
1667 handle->cookie = ffd->ffd_handle.h_cookie;
1668 oa->o_valid |= OBD_MD_FLHANDLE;
1671 class_export_put(export);
1673 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1674 sizeof(parent_lockh));
1675 oti->oti_ack_locks[0].mode = LCK_PR;
/*
 * Close an object previously opened with filter_open().
 *
 * Visible failure results: -EINVAL for a bad connection cookie or an obdo
 * without a handle, -ESTALE when the handle does not map to open-file data.
 * Otherwise the open-file data is unlinked from the list of the export
 * under fed_lock and closed with filter_close_internal(), passing 0 as the
 * failover argument.  The reference obtained by the handle lookup and the
 * export reference are dropped afterwards.
 */
1680 static int filter_close(struct lustre_handle *conn, struct obdo *oa,
1681 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1683 struct obd_export *exp = class_conn2export(conn);
1684 struct filter_file_data *ffd;
1685 struct filter_export_data *fed;
1690 CDEBUG(D_IOCTL, "invalid client cookie"LPX64"\n", conn->cookie);
1691 GOTO(out, rc = -EINVAL);
1694 if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
1695 CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
1696 GOTO(out, rc = -EINVAL);
1699 ffd = filter_handle2ffd(obdo_handle(oa));
1701 CERROR("bad handle ("LPX64") for close\n",
1702 obdo_handle(oa)->cookie);
1703 GOTO(out, rc = -ESTALE);
1706 fed = &exp->exp_filter_data;
1707 spin_lock(&fed->fed_lock);
1708 list_del(&ffd->ffd_export_list);
1709 spin_unlock(&fed->fed_lock);
1711 rc = filter_close_internal(exp, ffd, oti, 0);
1712 filter_ffd_put(ffd);
1715 class_export_put(exp);
/*
 * Create a new object and return its attributes in oa.
 *
 * A fresh object id is taken from filter_next_id(), the parent directory is
 * locked in LCK_PW mode and the child dentry is looked up.  A dentry that
 * already has an inode is reported as possible on-disk corruption and the
 * parent lock is released.
 * NOTE(review): presumably the code then retries with another id; the jump
 * is on a line not visible here.
 * The file is created with vfs_create() inside a create transaction, the
 * transaction number is recorded, the server data is rewritten and the
 * transaction is committed.  On the way out cleanup_phase selects how much
 * has to be undone; with a non-NULL oti and no error the parent lock is
 * handed over in oti->oti_ack_locks[0] instead of being released.
 */
1719 static int filter_create(struct lustre_handle *conn, struct obdo *oa,
1720 struct lov_stripe_md **ea, struct obd_trans_info *oti)
1722 struct obd_export *exp;
1723 struct obd_device *obd = class_conn2obd(conn);
1724 struct filter_obd *filter = &obd->u.filter;
1725 struct obd_run_ctxt saved;
1726 struct lustre_handle parent_lockh;
1727 struct dentry *dparent;
1728 struct dentry *dchild = NULL;
1731 int err, rc, cleanup_phase;
1735 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1739 exp = class_conn2export(conn);
1741 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1743 oa->o_id = filter_next_id(filter);
1746 dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
1748 if (IS_ERR(dparent))
1749 GOTO(cleanup, rc = PTR_ERR(dparent));
1752 dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1754 GOTO(cleanup, rc = PTR_ERR(dchild));
1755 if (dchild->d_inode) {
1756 /* This would only happen if lastobjid was bad on disk */
1757 CERROR("Serious error: objid %*s already exists; is this "
1758 "filesystem corrupt? I will try to work around it.\n",
1759 dchild->d_name.len, dchild->d_name.name);
1761 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1766 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE);
1768 GOTO(cleanup, rc = PTR_ERR(handle));
1770 rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
1772 CERROR("create failed rc = %d\n", rc);
1774 rc = filter_finish_transno(exp, handle, oti, rc);
/* Persist the updated server data inside the same transaction. */
1775 err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
1777 CERROR("unable to write lastobjid but file created\n");
1781 err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1783 CERROR("error on commit, err = %d\n", err);
1791 /* Set flags for fields we have set in the inode struct */
1792 oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
1793 OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
1794 filter_from_inode(oa, dchild->d_inode, oa->o_valid);
1798 switch(cleanup_phase) {
1801 case 1: /* locked parent dentry */
1802 if (rc || oti == NULL) {
1803 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1805 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1806 sizeof(parent_lockh));
1807 oti->oti_ack_locks[0].mode = LCK_PW;
1810 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1811 class_export_put(exp);
1814 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1821 static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
1822 struct lov_stripe_md *ea, struct obd_trans_info *oti)
1824 struct obd_export *exp;
1825 struct obd_device *obd = class_conn2obd(conn);
1826 struct filter_obd *filter = &obd->u.filter;
1827 struct dentry *dparent, *dchild = NULL;
1828 struct filter_dentry_data *fdd;
1829 struct obd_run_ctxt saved;
1830 void *handle = NULL;
1831 struct lustre_handle parent_lockh;
1832 int rc, rc2, cleanup_phase = 0;
1836 CERROR("invalid client cookie "LPX64"\n", conn->cookie);
1840 exp = class_conn2export(conn);
1842 CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
1844 push_ctxt(&saved, &filter->fo_ctxt, NULL);
1845 dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
1846 LCK_PW, &parent_lockh);
1847 if (IS_ERR(dparent))
1848 GOTO(cleanup, rc = PTR_ERR(dparent));
1851 dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
1853 GOTO(cleanup, rc = -ENOENT);
1856 if (!dchild->d_inode) {
1857 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
1858 GOTO(cleanup, rc = -ENOENT);
1861 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK);
1863 GOTO(cleanup, rc = PTR_ERR(handle));
1866 fdd = dchild->d_fsdata;
1867 if (fdd && atomic_read(&fdd->fdd_open_count)) {
1868 LASSERT(fdd->fdd_magic = FILTER_DENTRY_MAGIC);
1869 if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
1870 fdd->fdd_flags |= FILTER_FLAG_DESTROY;
1871 /* XXX put into PENDING directory in case of crash */
1873 "defer destroy of %dx open objid "LPU64"\n",
1874 atomic_read(&fdd->fdd_open_count), oa->o_id);
1877 "repeat destroy of %dx open objid "LPU64"\n",
1878 atomic_read(&fdd->fdd_open_count), oa->o_id);
1879 GOTO(cleanup, rc = 0);
1882 rc = filter_destroy_internal(obd, dparent, dchild);
1885 switch(cleanup_phase) {
1887 rc = filter_finish_transno(exp, handle, oti, rc);
1888 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
1890 CERROR("error on commit, err = %d\n", rc2);
1897 if (rc || oti == NULL) {
1898 filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
1900 memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
1901 sizeof(parent_lockh));
1902 oti->oti_ack_locks[0].mode = LCK_PW;
1905 pop_ctxt(&saved, &filter->fo_ctxt, NULL);
1906 class_export_put(exp);
1909 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/*
 * Truncate an object.
 *
 * Only whole-file truncation is implemented: an end value other than
 * OBD_OBJECT_EOF is merely reported as an unsupported punch.  The operation
 * itself is delegated to filter_setattr() with the caller's obdo; lsm is not
 * passed on.
 */
1916 /* NB start and end are used for punch, but not truncate */
1917 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
1918 struct lov_stripe_md *lsm,
1919 obd_off start, obd_off end,
1920 struct obd_trans_info *oti)
1925 if (end != OBD_OBJECT_EOF)
1926 CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
1929 CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
1930 "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
1932 error = filter_setattr(conn, oa, NULL, oti);
/* Drop one page-cache reference on page via page_cache_release(). */
1936 static inline void lustre_put_page(struct page *page)
1938 page_cache_release(page);
1941 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
1943 struct address_space *mapping = inode->i_mapping;
1945 unsigned long index = lnb->offset >> PAGE_SHIFT;
1948 page = grab_cache_page(mapping, index); /* locked page */
1950 return lnb->rc = PTR_ERR(page);
1954 if (inode->i_size < lnb->offset + lnb->len - 1)
1955 lnb->rc = inode->i_size - lnb->offset;
1959 if (PageUptodate(page)) {
1964 rc = mapping->a_ops->readpage(NULL, page);
1966 CERROR("page index %lu, rc = %d\n", index, rc);
1968 lustre_put_page(page);
1969 return lnb->rc = rc;
/*
 * Wait for a read started by filter_start_page_read() to complete.
 *
 * A buffer without a page and a page that is already up to date are handled
 * before any waiting.  Otherwise the function waits on the page and then
 * fails with -EIO, stored in lnb->rc, if the page is still not up to date
 * or carries the error flag.  The err_page path drops the page reference.
 * NOTE(review): the values returned on the early-exit and success paths are
 * on lines not visible here.
 */
1975 static int filter_finish_page_read(struct niobuf_local *lnb)
1977 if (lnb->page == NULL)
1980 if (PageUptodate(lnb->page))
1983 wait_on_page(lnb->page);
1984 if (!PageUptodate(lnb->page)) {
1985 CERROR("page index %lu/offset "LPX64" not uptodate\n",
1986 lnb->page->index, lnb->offset);
1987 GOTO(err_page, lnb->rc = -EIO);
1989 if (PageError(lnb->page)) {
1990 CERROR("page index %lu/offset "LPX64" has error\n",
1991 lnb->page->index, lnb->offset);
1992 GOTO(err_page, lnb->rc = -EIO);
1998 lustre_put_page(lnb->page);
2003 static struct page *lustre_get_page_write(struct inode *inode,
2004 unsigned long index)
2006 struct address_space *mapping = inode->i_mapping;
2010 page = grab_cache_page(mapping, index); /* locked page */
2012 if (!IS_ERR(page)) {
2013 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
2014 * a no-op for most filesystems, because we write the whole
2015 * page. For partial-page I/O this will read in the page.
2017 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
2019 CERROR("page index %lu, rc = %d\n", index, rc);
2022 GOTO(err_unlock, rc);
2024 /* XXX not sure if we need this if we are overwriting page */
2025 if (PageError(page)) {
2026 CERROR("error on page index %lu, rc = %d\n", index, rc);
2028 GOTO(err_unlock, rc = -EIO);
2035 lustre_put_page(page);
/*
 * Replacement for waitfor_one_page() on kernels newer than 2.5.0: waits
 * until page is no longer locked.
 * NOTE(review): the return value is on a line not visible here.
 */
2039 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2040 int waitfor_one_page(struct page *page)
2042 wait_on_page_locked(page);
/*
 * Helper for kernels older than 2.5.0: bring the inode mtime, and the ctime
 * as well when ctime_too is set, up to CURRENT_TIME and mark the inode
 * dirty.  Nothing is written when the timestamps already equal the current
 * time.
 * NOTE(review): presumably the ctime store is guarded by ctime_too on a
 * line not visible here.
 */
2047 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2048 /* We should only change the file mtime (and not the ctime, like
2049 * update_inode_times() in generic_file_write()) when we only change data.
2051 static inline void inode_update_time(struct inode *inode, int ctime_too)
2053 time_t now = CURRENT_TIME;
2054 if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
2056 inode->i_mtime = now;
2058 inode->i_ctime = now;
2059 mark_inode_dirty_sync(inode);
/*
 * Finish a write to the page held in lnb.
 *
 * The byte range inside the page is derived from lnb->offset and lnb->len
 * and must not extend past the end of the page.  The commit_write method of
 * the mapping is called for that range; for a synchronous inode the
 * function then waits for the page.  The page is marked referenced and the
 * page reference is dropped.
 * NOTE(review): unlocking of the page and the return statement are on lines
 * not visible here.
 */
2063 static int lustre_commit_write(struct niobuf_local *lnb)
2065 struct page *page = lnb->page;
/* Offset within the page and the exclusive end of the written range. */
2066 unsigned from = lnb->offset & ~PAGE_MASK;
2067 unsigned to = from + lnb->len;
2068 struct inode *inode = page->mapping->host;
2071 LASSERT(to <= PAGE_SIZE);
2072 err = page->mapping->a_ops->commit_write(NULL, page, from, to);
2073 if (!err && IS_SYNC(inode))
2074 err = waitfor_one_page(page);
2075 //SetPageUptodate(page); // the client commit_write will do this
2077 SetPageReferenced(page);
2079 lustre_put_page(page);
/*
 * Obtain the page that backs lnb->offset for writing.
 *
 * Two lookups are visible: a non-blocking grab_cache_page_nowait() and a
 * blocking grab_cache_page().
 * NOTE(review): the selection between them is on lines not visible here;
 * presumably it depends on the third parameter.
 * When no page-cache page could be taken a temporary page is allocated
 * instead, tagged with the page index, and the buffer is flagged
 * N_LOCAL_TEMP_PAGE so the data can be copied into the real page later.
 * Otherwise the page is prepared for writing the byte range of this buffer.
 * The result code is also stored in lnb->rc.
 */
2083 int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
2086 unsigned long index = lnb->offset >> PAGE_SHIFT;
2087 struct address_space *mapping = inode->i_mapping;
2091 //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
2093 page = grab_cache_page_nowait(mapping, index); /* locked page */
2095 page = grab_cache_page(mapping, index); /* locked page */
2098 /* This page is currently locked, so get a temporary page instead. */
2100 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
2101 page = alloc_pages(GFP_KERNEL, 0); /* locked page */
2103 CERROR("no memory for a temp page\n");
2104 GOTO(err, rc = -ENOMEM);
2106 page->index = index;
2108 lnb->flags |= N_LOCAL_TEMP_PAGE;
2109 } else if (!IS_ERR(page)) {
2112 rc = mapping->a_ops->prepare_write(NULL, page,
2113 lnb->offset & ~PAGE_MASK,
2117 CERROR("page index %lu, rc = %d\n", index, rc);
2118 GOTO(err_unlock, rc);
2120 /* XXX not sure if we need this if we are overwriting page */
2121 if (PageError(page)) {
2122 CERROR("error on page index %lu, rc = %d\n", index, rc);
2124 GOTO(err_unlock, rc = -EIO);
2133 lustre_put_page(page);
2135 return lnb->rc = rc;
/*
 * Commit, or on error neutralise, a page that was prepared for writing.
 *
 * On kernels older than 2.5.0 the buffer heads of the page are walked and
 * every buffer flagged as new has its part of the page cleared with
 * memset().
 * NOTE(review): presumably this walk only runs when err is non-zero; the
 * guarding condition is on a line not visible here.
 * In all cases the page is finally committed through lustre_commit_write().
 */
2139 * We need to balance prepare_write() calls with commit_write() calls.
2140 * If the page has been prepared, but we have no data for it, we don't
2141 * want to overwrite valid data on disk, but we still need to zero out
2142 * data for space which was newly allocated. Like part of what happens
2143 * in __block_prepare_write() for newly allocated blocks.
2145 * XXX currently __block_prepare_write() creates buffers for all the
2146 * pages, and the filesystems mark these buffers as BH_New if they
2147 * were newly allocated from disk. We use the BH_New flag similarly.
2149 static int filter_commit_write(struct niobuf_local *lnb, int err)
2151 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2153 unsigned block_start, block_end;
2154 struct buffer_head *bh, *head = lnb->page->buffers;
2155 unsigned blocksize = head->b_size;
2157 /* debugging: just seeing if this ever happens */
2158 CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
2159 "called for ino %lu:%lu on err %d\n",
2160 lnb->page->mapping->host->i_ino, lnb->page->index, err);
2162 /* Currently one buffer per page, but in the future... */
/* The extra test on block_start lets the first pass through the ring run. */
2163 for (bh = head, block_start = 0; bh != head || !block_start;
2164 block_start = block_end, bh = bh->b_this_page) {
2165 block_end = block_start + blocksize;
2166 if (buffer_new(bh)) {
2167 memset(kmap(lnb->page) + block_start, 0,
2174 return lustre_commit_write(lnb);
/*
 * Prepare the local buffers for a bulk read or write.
 *
 * res is cleared and one fsfilt_objinfo per object is allocated.  Exactly
 * one object per call is asserted.  For each object the dentry is looked up
 * by id; a dentry without an inode fails with -ENOENT.  For writes the
 * inode semaphore is taken before fsfilt_brw_start() opens the transaction
 * whose handle is returned through desc_private.
 *
 * Each remote buffer description is then copied into the matching local
 * buffer.  Writes obtain their page through filter_get_page_write(); reads
 * stop early at end of file or start the page read.  tot_bytes accumulates
 * the buffer lengths and is added to the read or write byte counter.  For
 * reads the started pages are finally waited for, last buffer first.
 *
 * Error unwinding is split into out_pages, which releases what was set up
 * per page, and out_objinfo, which releases what was set up per object.
 * Several steps log a message when more than 15 seconds have passed since
 * entry.
 */
2177 static int filter_preprw(int cmd, struct obd_export *exp,
2178 int objcount, struct obd_ioobj *obj,
2179 int niocount, struct niobuf_remote *nb,
2180 struct niobuf_local *res, void **desc_private,
2181 struct obd_trans_info *oti)
2183 struct obd_run_ctxt saved;
2184 struct obd_device *obd;
2185 struct obd_ioobj *o;
2186 struct niobuf_remote *rnb;
2187 struct niobuf_local *lnb;
2188 struct fsfilt_objinfo *fso;
2189 struct dentry *dentry;
2190 struct inode *inode;
2191 int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
2192 unsigned long now = jiffies;
2195 memset(res, 0, niocount * sizeof(*res));
2201 // theoretically we support multi-obj BRW RPCs, but until then...
2202 LASSERT(objcount == 1);
2204 OBD_ALLOC(fso, objcount * sizeof(*fso));
2208 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/* First pass: look up every object and, for writes, take its i_sem. */
2210 for (i = 0, o = obj; i < objcount; i++, o++) {
2211 struct filter_dentry_data *fdd;
2213 LASSERT(o->ioo_bufcnt);
2215 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
2218 GOTO(out_objinfo, rc = PTR_ERR(dentry));
2220 fso[i].fso_dentry = dentry;
2221 fso[i].fso_bufcnt = o->ioo_bufcnt;
2223 if (!dentry->d_inode) {
2224 CERROR("trying to BRW to non-existent file "LPU64"\n",
2227 GOTO(out_objinfo, rc = -ENOENT);
2230 /* If we ever start to support mutli-object BRW RPCs, we will
2231 * need to get locks on mulitple inodes (in order) or use the
2232 * DLM to do the locking for us (and use the same locking in
2233 * filter_setattr() for truncate). That isn't all, because
2234 * there still exists the possibility of a truncate starting
2235 * a new transaction while holding the ext3 rwsem = write
2236 * while some writes (which have started their transactions
2237 * here) blocking on the ext3 rwsem = read => lock inversion.
2239 * The handling gets very ugly when dealing with locked pages.
2240 * It may be easier to just get rid of the locked page code
2241 * (which has problems of its own) and either discover we do
2242 * not need it anymore (i.e. it was a symptom of another bug)
2243 * or ensure we get the page locks in an appropriate order.
2245 if (cmd & OBD_BRW_WRITE)
2246 down(&dentry->d_inode->i_sem);
2247 fdd = dentry->d_fsdata;
2248 if (!fdd || !atomic_read(&fdd->fdd_open_count))
2249 CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
2253 if (time_after(jiffies, now + 15*HZ))
2254 CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
2256 if (cmd & OBD_BRW_WRITE) {
2257 *desc_private = fsfilt_brw_start(obd, objcount, fso,
2259 if (IS_ERR(*desc_private)) {
2260 rc = PTR_ERR(*desc_private);
2261 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2262 "error starting transaction: rc = %d\n", rc);
2263 *desc_private = NULL;
2264 GOTO(out_objinfo, rc);
2268 for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
2269 dentry = fso[i].fso_dentry;
2270 inode = dentry->d_inode;
2272 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
2274 lnb->dentry = dentry;
2276 lnb->dentry = dget(dentry);
2278 lnb->offset = rnb->offset;
2279 lnb->len = rnb->len;
2280 lnb->flags = rnb->flags;
2281 lnb->start = jiffies;
2283 if (cmd & OBD_BRW_WRITE) {
2284 rc = filter_get_page_write(inode,lnb,&pglocked);
2286 up(&dentry->d_inode->i_sem);
2287 } else if (inode->i_size <= rnb->offset) {
2288 /* If there's no more data, abort early.
2289 * lnb->page == NULL and lnb->rc == 0, so it's
2290 * easy to detect later. */
2295 rc = filter_start_page_read(inode, lnb);
2299 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
2300 "page err %u@"LPU64" %u/%u %p: rc %d\n",
2301 lnb->len, lnb->offset, j, o->ioo_bufcnt,
2304 GOTO(out_pages, rc);
2307 tot_bytes += lnb->len;
2309 if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
2310 /* Likewise with a partial read */
2316 if (time_after(jiffies, now + 15*HZ))
2317 CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
/* Reads: account the bytes, then wait for the pages in reverse order. */
2319 if (cmd & OBD_BRW_READ) {
2320 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
2322 while (lnb-- > res) {
2323 rc = filter_finish_page_read(lnb);
2325 CERROR("error page %u@"LPU64" %u %p: rc %d\n",
2326 lnb->len, lnb->offset, lnb - res,
2328 f_dput(lnb->dentry);
2329 GOTO(out_pages, rc);
2333 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
2336 if (time_after(jiffies, now + 15*HZ))
2337 CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
/* Common exit: the transaction handle must not stay attached to the task. */
2341 OBD_FREE(fso, objcount * sizeof(*fso));
2342 current->journal_info = NULL;
2343 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/* Error unwinding per page, walking back from the failing buffer. */
2347 while (lnb-- > res) {
2348 if (cmd & OBD_BRW_WRITE) {
2349 filter_commit_write(lnb, rc);
2350 up(&lnb->dentry->d_inode->i_sem);
2352 lustre_put_page(lnb->page);
2354 f_dput(lnb->dentry);
2356 if (cmd & OBD_BRW_WRITE) {
2357 filter_finish_transno(exp, *desc_private, oti, rc);
2359 filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
2362 goto out; /* dropped the dentry refs already (one per page) */
/* Error unwinding per object, used before any page was set up. */
2365 for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
2366 if (cmd & OBD_BRW_WRITE)
2367 up(&fso[i].fso_dentry->d_inode->i_sem);
2368 f_dput(fso[i].fso_dentry);
/*
 * Write out a buffer that was staged in a temporary page because the real
 * page-cache page was locked when the write was prepared.
 *
 * The real page is obtained with lustre_get_page_write(); a whole page of
 * data is then copied from the temporary page into it, the temporary page
 * is released and the write is committed with lustre_commit_write().
 * Failures to get or to commit the page are logged together with the page
 * index.
 * NOTE(review): the lines that switch lnb->page over to the real page and
 * that unmap the pages are not visible here.
 */
2373 static int filter_write_locked_page(struct niobuf_local *lnb)
2381 lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
2382 if (IS_ERR(lpage)) {
2383 /* It is highly unlikely that we would ever get an error here.
2384 * The page we want to get was previously locked, so it had to
2385 * have already allocated the space, and we were just writing
2386 * over the same data, so there would be no hole in the file.
2388 * XXX: possibility of a race with truncate could exist, need
2389 * to check that. There are no guarantees w.r.t.
2390 * write order even on a local filesystem, although the
2391 * normal response would be to return the number of bytes
2392 * successfully written and leave the rest to the app.
2394 rc = PTR_ERR(lpage);
2395 CERROR("error getting locked page index %ld: rc = %d\n",
2396 lnb->page->index, rc);
2398 lustre_commit_write(lnb);
2402 /* 2 kmaps == vanishingly small deadlock opportunity */
2403 lpage_addr = kmap(lpage);
2404 lnb_addr = kmap(lnb->page);
/* The whole page is copied, not only the range described by lnb. */
2406 memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
2411 lustre_put_page(lnb->page);
2414 rc = lustre_commit_write(lnb);
2416 CERROR("error committing locked page %ld: rc = %d\n",
2417 lnb->page->index, rc);
/*
 * Sync the filesystem backing the device of exp and return the result of
 * fsfilt_sync().
 */
2422 static int filter_syncfs(struct obd_export *exp)
2424 struct obd_device *obd = exp->exp_obd;
2427 RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
/*
 * Second half of a bulk transfer: commit or release the pages that
 * filter_preprw() set up.
 *
 * The transaction handle in desc_private is attached to the current task
 * for the duration of the call.  For writes the inode times are updated and
 * the inode semaphore taken in filter_preprw() is released.  The first loop
 * handles every buffer that has a regular page: writes are committed with
 * filter_commit_write(), other pages are released, and the dentry reference
 * is dropped.  Buffers flagged N_LOCAL_TEMP_PAGE are handled by the second
 * loop through filter_write_locked_page(); that loop only runs while
 * found_locked is positive.  For writes the transaction number is recorded
 * and the transaction committed; with obd_sync_filter set the transaction
 * is asserted to be committed already.  Slow steps, longer than 15 seconds,
 * are logged.
 */
2430 static int filter_commitrw(int cmd, struct obd_export *exp,
2431 int objcount, struct obd_ioobj *obj,
2432 int niocount, struct niobuf_local *res,
2433 void *desc_private, struct obd_trans_info *oti)
2435 struct obd_run_ctxt saved;
2436 struct obd_ioobj *o;
2437 struct niobuf_local *lnb;
2438 struct obd_device *obd = exp->exp_obd;
2439 int found_locked = 0, rc = 0, i;
2440 unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
2443 push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/* Re-attach the transaction started in filter_preprw() to this task. */
2445 LASSERT(!current->journal_info);
2446 current->journal_info = desc_private;
2448 for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
2451 if (cmd & OBD_BRW_WRITE) {
2452 inode_update_time(lnb->dentry->d_inode, 1);
2453 up(&lnb->dentry->d_inode->i_sem);
2455 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2456 if (lnb->page == NULL) {
2460 if (lnb->flags & N_LOCAL_TEMP_PAGE) {
2465 if (time_after(jiffies, lnb->start + 15*HZ))
2466 CERROR("slow commitrw %lus\n",
2467 (jiffies - lnb->start) / HZ);
2469 if (cmd & OBD_BRW_WRITE) {
2470 int err = filter_commit_write(lnb, 0);
2475 lustre_put_page(lnb->page);
2478 f_dput(lnb->dentry);
2479 if (time_after(jiffies, lnb->start + 15*HZ))
2480 CERROR("slow commit_write %lus\n",
2481 (jiffies - lnb->start) / HZ);
/* Second pass: only the buffers that were staged in temporary pages. */
2485 for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
2488 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
2490 if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
2493 if (time_after(jiffies, lnb->start + 15*HZ))
2494 CERROR("slow commitrw locked %lus\n",
2495 (jiffies - lnb->start) / HZ);
2497 err = filter_write_locked_page(lnb);
2500 f_dput(lnb->dentry);
2503 if (time_after(jiffies, lnb->start + 15*HZ))
2504 CERROR("slow commit_write locked %lus\n",
2505 (jiffies - lnb->start) / HZ);
2509 if (cmd & OBD_BRW_WRITE) {
2510 /* We just want any dentry for the commit, for now */
2511 struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
2514 rc = filter_finish_transno(exp, desc_private, oti, rc);
2515 err = fsfilt_commit(obd, dparent->d_inode, desc_private,
2519 if (obd_sync_filter)
2520 LASSERT(oti->oti_transno <= obd->obd_last_committed);
2522 if (time_after(jiffies, now + 15*HZ))
2523 CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
/* The commit is expected to have detached the handle from the task again. */
2526 LASSERT(!current->journal_info);
2528 pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
/*
 * Local bulk read/write entry point working on caller-supplied pages.
 *
 * One local and one remote buffer description are allocated per page in
 * pga; the remote ones are filled from the page offsets and counts.  A
 * single S_IFREG object with the id from lsm is described in ioo and passed
 * to filter_preprw().  The data is then copied between the caller pages and
 * the filter pages, direction chosen by OBD_BRW_WRITE in cmd, at the same
 * in-page offset.  filter_commitrw() finishes the transfer.  Both arrays
 * are freed and the export reference dropped on the way out; allocation
 * failure yields -ENOMEM.
 * NOTE(review): the unmapping of the pages and the checks of ret are on
 * lines not visible here.
 */
2532 static int filter_brw(int cmd, struct lustre_handle *conn,
2533 struct lov_stripe_md *lsm, obd_count oa_bufs,
2534 struct brw_page *pga, struct obd_trans_info *oti)
2536 struct obd_export *export = class_conn2export(conn);
2537 struct obd_ioobj ioo;
2538 struct niobuf_local *lnb;
2539 struct niobuf_remote *rnb;
2548 OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
2549 OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
2551 if (lnb == NULL || rnb == NULL)
2552 GOTO(out, ret = -ENOMEM);
2554 for (i = 0; i < oa_bufs; i++) {
2555 rnb[i].offset = pga[i].off;
2556 rnb[i].len = pga[i].count;
2559 ioo.ioo_id = lsm->lsm_object_id;
2561 ioo.ioo_type = S_IFREG;
2562 ioo.ioo_bufcnt = oa_bufs;
2564 ret = filter_preprw(cmd, export, 1, &ioo, oa_bufs, rnb, lnb,
2565 &desc_private, oti);
2569 for (i = 0; i < oa_bufs; i++) {
2570 void *virt = kmap(pga[i].pg);
2571 obd_off off = pga[i].off & ~PAGE_MASK;
2572 void *addr = kmap(lnb[i].page);
2574 /* 2 kmaps == vanishingly small deadlock opportunity */
2576 if (cmd & OBD_BRW_WRITE)
2577 memcpy(addr + off, virt + off, pga[i].count);
2579 memcpy(virt + off, addr + off, pga[i].count);
2585 ret = filter_commitrw(cmd, export, 1, &ioo, oa_bufs, lnb, desc_private,
2590 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
2592 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
2593 class_export_put(export);
/*
 * SAN variant of the prepare step: translate file offsets into block
 * numbers instead of setting up pages.
 *
 * For every object the dentry is looked up by id; a missing inode fails
 * with -ENOENT.  For every buffer the file offset is converted to a logical
 * block number using the inode block size bits.  When cmd equals
 * OBD_BRW_READ the block is mapped through the bmap method of the mapping;
 * otherwise fs_prep_san_write() is called for the block.  The resulting
 * block number replaces the offset in the remote buffer description, so the
 * nb array is modified in place.
 */
2597 static int filter_san_preprw(int cmd, struct lustre_handle *conn,
2598 int objcount, struct obd_ioobj *obj,
2599 int niocount, struct niobuf_remote *nb)
2601 struct obd_device *obd;
2602 struct obd_ioobj *o = obj;
2603 struct niobuf_remote *rnb = nb;
2608 obd = class_conn2obd(conn);
2610 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2615 for (i = 0; i < objcount; i++, o++) {
2616 struct dentry *dentry;
2617 struct inode *inode;
2618 int (*fs_bmap)(struct address_space *, long);
2621 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
2623 GOTO(out, rc = PTR_ERR(dentry));
2624 inode = dentry->d_inode;
2626 CERROR("trying to BRW to non-existent file "LPU64"\n",
2629 GOTO(out, rc = -ENOENT);
2631 fs_bmap = inode->i_mapping->a_ops->bmap;
2633 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
2636 block = rnb->offset >> inode->i_blkbits;
2638 if (cmd == OBD_BRW_READ) {
2639 block = fs_bmap(inode->i_mapping, block);
2641 loff_t newsize = rnb->offset + rnb->len;
2642 /* fs_prep_san_write will also update inode
2644 * (1) new alloced block
2645 * (2) existed block but size extented
2647 /* FIXME We could call fs_prep_san_write()
2648 * only once for all the blocks allocation.
2649 * Now call it once for each block, for
2650 * simplicity. And if error happens, we
2651 * probably need to release previous alloced
2653 rc = fs_prep_san_write(obd, inode, &block,
2659 rnb->offset = block;
/*
 * filter_statfs - report filesystem statistics for the device behind conn.
 *
 * Resolves conn to its obd_device and returns the result of
 * fsfilt_statfs() on the filter's superblock (obd->u.filter.fo_sb),
 * which fills in *osfs.
 */
2667 static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2669 struct obd_device *obd;
2672 obd = class_conn2obd(conn);
2674 RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
/*
 * filter_get_info - answer a keyed query about the backing filesystem.
 *
 * Recognised keys (matched by exact length and content, so the key does
 * not need a trailing NUL):
 *   "blocksize"      - stores the superblock's s_blocksize in *val
 *   "blocksize_bits" - stores the superblock's s_blocksize_bits in *val
 * In both cases the value is written as a __u32 and *vallen is set to
 * sizeof(__u32).  A key matching neither reaches the "invalid key"
 * debug message.
 *
 * NOTE(review): *vallen is overwritten without first being compared
 * against the size of the caller's buffer in the code shown; confirm
 * that callers always supply at least sizeof(__u32) bytes in val.
 */
2677 static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
2678 void *key, __u32 *vallen, void *val)
2680 struct obd_device *obd;
2683 obd = class_conn2obd(conn);
2685 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
/* Key "blocksize": filesystem block size in bytes. */
2690 if (keylen == strlen("blocksize") &&
2691 memcmp(key, "blocksize", keylen) == 0) {
2692 __u32 *blocksize = val;
2693 *vallen = sizeof(*blocksize);
2694 *blocksize = obd->u.filter.fo_sb->s_blocksize;
/* Key "blocksize_bits": the superblock's s_blocksize_bits field. */
2698 if (keylen == strlen("blocksize_bits") &&
2699 memcmp(key, "blocksize_bits", keylen) == 0) {
2700 __u32 *blocksize_bits = val;
2701 *vallen = sizeof(*blocksize_bits);
2702 *blocksize_bits = obd->u.filter.fo_sb->s_blocksize_bits;
2706 CDEBUG(D_IOCTL, "invalid key\n");
/*
 * filter_copy_data - copy object data from src to dst one page at a time.
 *
 * The function currently LBUG()s on entry; the author's remark next to
 * the LBUG marks the code as not correct.
 *
 * The code after the LBUG allocates a single page with
 * alloc_page(GFP_USER) and uses it as a bounce buffer: for every page
 * index covering src->o_size the page is read from src_conn with
 * obd_brw(OBD_BRW_READ) and written to dst_conn with
 * obd_brw(OBD_BRW_WRITE), with pg.flag set to OBD_BRW_CREATE for the
 * write.  dst's size and block count are then copied from src and marked
 * valid with OBD_MD_FLSIZE | OBD_MD_FLBLOCKS.
 *
 * NOTE(review): pg.off is computed from page->index before page->index
 * is assigned the current loop index, so the offset used in an iteration
 * reflects the previous value of page->index.
 * NOTE(review): the shift producing pg.off is presumably evaluated in the
 * width of page->index before being widened; confirm it cannot overflow
 * on 32-bit builds.
 * NOTE(review): the count and offset parameters are not referenced in
 * the code shown.
 */
2710 int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
2711 struct lustre_handle *src_conn, struct obdo *src,
2712 obd_size count, obd_off offset, struct obd_trans_info *oti)
2715 struct lov_stripe_md srcmd, dstmd;
2716 unsigned long index = 0;
2719 LBUG(); /* THIS CODE IS NOT CORRECT -phil */
/* Minimal stripe descriptors: only the object id is filled in. */
2721 memset(&srcmd, 0, sizeof(srcmd));
2722 memset(&dstmd, 0, sizeof(dstmd));
2723 srcmd.lsm_object_id = src->o_id;
2724 dstmd.lsm_object_id = dst->o_id;
2727 CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
2728 ", dst: ino "LPU64"\n",
2729 src->o_id, src->o_blocks, src->o_size, dst->o_id);
2730 page = alloc_page(GFP_USER);
2736 /* XXX with brw vector I/O, we could batch up reads and writes here,
2737 * all we need to do is allocate multiple pages to handle the I/Os
2738 * and arrays to handle the request parameters.
2740 while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
2744 pg.count = PAGE_SIZE;
2745 pg.off = (page->index) << PAGE_SHIFT;
2748 page->index = index;
2749 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, NULL);
2755 pg.flag = OBD_BRW_CREATE;
2756 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
2758 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, oti);
2760 /* XXX should handle dst->o_size, dst->o_blocks here */
2766 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
/* Propagate size and block count to the destination attributes. */
2770 dst->o_size = src->o_size;
2771 dst->o_blocks = src->o_blocks;
2772 dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
/*
 * filter_iocontrol - ioctl dispatcher for the filter device behind conn.
 *
 * The command visible here is OBD_IOC_ABORT_RECOVERY, which logs the
 * device name with CERROR and calls target_abort_recovery() on the
 * device.  len, karg and uarg are not used by that case.
 */
2779 int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2780 int len, void *karg, void *uarg)
2782 struct obd_device *obd = class_conn2obd(conn);
2785 case OBD_IOC_ABORT_RECOVERY:
2786 CERROR("aborting recovery for device %s\n", obd->obd_name);
2787 target_abort_recovery(obd);
/*
 * Method table for the regular filter OBD type.  obdfilter_init()
 * registers it under OBD_FILTER_DEVICENAME.  Entries use the GNU
 * "field: value" initializer form, so their order is not significant.
 */
2797 static struct obd_ops filter_obd_ops = {
2798 o_owner: THIS_MODULE,
2799 o_attach: filter_attach,
2800 o_detach: filter_detach,
2801 o_get_info: filter_get_info,
2802 o_setup: filter_setup,
2803 o_cleanup: filter_cleanup,
2804 o_connect: filter_connect,
2805 o_disconnect: filter_disconnect,
2806 o_statfs: filter_statfs,
2807 o_syncfs: filter_syncfs,
2808 o_getattr: filter_getattr,
2809 o_create: filter_create,
2810 o_setattr: filter_setattr,
2811 o_destroy: filter_destroy,
2812 o_open: filter_open,
2813 o_close: filter_close,
2815 o_punch: filter_truncate,
2816 o_preprw: filter_preprw,
2817 o_commitrw: filter_commitrw,
2818 o_destroy_export: filter_destroy_export,
2819 o_iocontrol: filter_iocontrol,
2821 o_san_preprw: filter_san_preprw,
2822 o_preallocate: filter_preallocate_inodes,
2823 o_migrate: filter_migrate,
2824 o_copy: filter_copy_data,
2825 o_iterate: filter_iterate
/*
 * Method table for the SAN flavour of the filter OBD type.
 * obdfilter_init() registers it under OBD_FILTER_SAN_DEVICENAME.  In the
 * entries shown it differs from filter_obd_ops by using
 * filter_san_setup for o_setup and by not listing o_syncfs.
 */
2829 static struct obd_ops filter_sanobd_ops = {
2830 o_owner: THIS_MODULE,
2831 o_attach: filter_attach,
2832 o_detach: filter_detach,
2833 o_get_info: filter_get_info,
2834 o_setup: filter_san_setup,
2835 o_cleanup: filter_cleanup,
2836 o_connect: filter_connect,
2837 o_disconnect: filter_disconnect,
2838 o_statfs: filter_statfs,
2839 o_getattr: filter_getattr,
2840 o_create: filter_create,
2841 o_setattr: filter_setattr,
2842 o_destroy: filter_destroy,
2843 o_open: filter_open,
2844 o_close: filter_close,
2846 o_punch: filter_truncate,
2847 o_preprw: filter_preprw,
2848 o_commitrw: filter_commitrw,
2849 o_san_preprw: filter_san_preprw,
2850 o_destroy_export: filter_destroy_export,
2851 o_iocontrol: filter_iocontrol,
2853 o_preallocate: filter_preallocate_inodes,
2854 o_migrate: filter_migrate,
2855 o_copy: filter_copy_data,
2856 o_iterate: filter_iterate
/*
 * obdfilter_init - module entry point.
 *
 * Prints the driver banner, initialises the lprocfs variables, and
 * registers two OBD types that share lvars.module_vars:
 * filter_obd_ops under OBD_FILTER_DEVICENAME and filter_sanobd_ops
 * under OBD_FILTER_SAN_DEVICENAME.
 *
 * NOTE(review): the class_unregister_type() call after the second
 * registration is presumably the rollback for a failed SAN
 * registration; confirm the guarding condition.
 */
2861 static int __init obdfilter_init(void)
2863 struct lprocfs_static_vars lvars;
2866 printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
2868 lprocfs_init_vars(&lvars);
2870 rc = class_register_type(&filter_obd_ops, lvars.module_vars,
2871 OBD_FILTER_DEVICENAME);
2875 rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
2876 OBD_FILTER_SAN_DEVICENAME);
2878 class_unregister_type(OBD_FILTER_DEVICENAME);
/*
 * obdfilter_exit - module exit point.
 *
 * Unregisters the two OBD types, SAN type first, i.e. in the reverse of
 * the order in which obdfilter_init() registers them.
 */
2882 static void __exit obdfilter_exit(void)
2884 class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
2885 class_unregister_type(OBD_FILTER_DEVICENAME);
/* Module metadata, followed by the hooks that bind the init/exit entry points. */
2888 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2889 MODULE_DESCRIPTION("Lustre Filtering OBD driver");
2890 MODULE_LICENSE("GPL");
2892 module_init(obdfilter_init);
2893 module_exit(obdfilter_exit);